View source with formatted comments or as raw
    1:- encoding(utf8).
    2/*  Part of SWI-Prolog
    3
    4    Author:        Torbjörn Lager and Jan Wielemaker
    5    E-mail:        J.Wielemaker@vu.nl
    6    WWW:           http://www.swi-prolog.org
    7    Copyright (C): 2014-2025, Torbjörn Lager,
    8                              VU University Amsterdam
    9                              SWI-Prolog Solutions b.v.
   10    All rights reserved.
   11
   12    Redistribution and use in source and binary forms, with or without
   13    modification, are permitted provided that the following conditions
   14    are met:
   15
   16    1. Redistributions of source code must retain the above copyright
   17       notice, this list of conditions and the following disclaimer.
   18
   19    2. Redistributions in binary form must reproduce the above copyright
   20       notice, this list of conditions and the following disclaimer in
   21       the documentation and/or other materials provided with the
   22       distribution.
   23
   24    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   25    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   26    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   27    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   28    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   29    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   30    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   31    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   32    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   33    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   34    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   35    POSSIBILITY OF SUCH DAMAGE.
   36*/
   37
   38:- module(pengines,
   39          [ pengine_create/1,                   % +Options
   40            pengine_ask/3,                      % +Pengine, :Query, +Options
   41            pengine_next/2,                     % +Pengine, +Options
   42            pengine_stop/2,                     % +Pengine, +Options
   43            pengine_event/2,                    % -Event, +Options
   44            pengine_input/2,                    % +Prompt, -Term
   45            pengine_output/1,                   % +Term
   46            pengine_respond/3,                  % +Pengine, +Input, +Options
   47            pengine_debug/2,                    % +Format, +Args
   48            pengine_self/1,                     % -Pengine
   49            pengine_pull_response/2,            % +Pengine, +Options
   50            pengine_destroy/1,                  % +Pengine
   51            pengine_destroy/2,                  % +Pengine, +Options
   52            pengine_abort/1,                    % +Pengine
   53            pengine_application/1,              % +Application
   54            current_pengine_application/1,      % ?Application
   55            pengine_property/2,                 % ?Pengine, ?Property
   56            pengine_user/1,                     % -User
   57            pengine_event_loop/2,               % :Closure, +Options
   58            pengine_rpc/2,                      % +Server, :Goal
   59            pengine_rpc/3                       % +Server, :Goal, +Options
   60          ]).   61
   62/** <module> Pengines: Web Logic Programming Made Easy
   63
   64The library(pengines) provides an  infrastructure   for  creating Prolog
   65engines in a (remote) pengine server  and accessing these engines either
   66from Prolog or JavaScript.
   67
   68@author Torbjörn Lager and Jan Wielemaker
   69*/
   70
   71:- autoload(library(aggregate),[aggregate_all/3]).   72:- autoload(library(apply),[maplist/2,partition/4,exclude/3,maplist/3]).   73:- autoload(library(broadcast),[broadcast/1]).   74:- autoload(library(charsio),[open_chars_stream/2]).   75:- use_module(library(debug),[debug/1,debugging/1,debug/3,assertion/1]).   76:- autoload(library(error),
   77	    [ must_be/2,
   78	      existence_error/2,
   79	      permission_error/3,
   80	      domain_error/2
   81	    ]).   82:- autoload(library(filesex),[directory_file_path/3]).   83:- autoload(library(listing),[listing/1]).   84:- autoload(library(lists),[member/2,flatten/2,select/3,append/3]).   85:- autoload(library(modules),[in_temporary_module/3]).   86:- autoload(library(occurs),[sub_term/2]).   87:- autoload(library(option),
   88	    [select_option/3,option/2,option/3,select_option/4]).   89:- autoload(library(prolog_stack),[print_prolog_backtrace/2]).   90:- autoload(library(statistics),[thread_statistics/2]).   91:- autoload(library(term_to_json),[term_to_json/2]).   92:- autoload(library(thread_pool),
   93	    [thread_pool_create/3,thread_create_in_pool/4]).   94:- autoload(library(time),[alarm/4,call_with_time_limit/2]).   95:- autoload(library(uri),
   96	    [ uri_components/2,
   97	      uri_query_components/2,
   98	      uri_data/3,
   99	      uri_data/4,
  100	      uri_edit/3,
  101	      uri_encoded/3
  102	    ]).  103:- autoload(library(http/http_client),[http_read_data/3]).  104:- autoload(library(http/http_cors),[cors_enable/0,cors_enable/2]).  105:- use_module(library(http/http_dispatch),
  106              [http_handler/3,http_404/2,http_reply_file/3]).  107:- autoload(library(http/http_open),[http_open/3]).  108:- autoload(library(http/http_parameters),[http_parameters/2]).  109:- autoload(library(http/http_stream),[is_cgi_stream/1]).  110:- autoload(library(http/http_wrapper),[http_peer/2]).  111
  112:- use_module(library(settings),[setting/2,setting/4]).  113:- use_module(library(http/http_json),
  114              [http_read_json_dict/2,reply_json_dict/1]).  115:- use_module(library(sandbox),[safe_goal/1]).  116
  117:- if(exists_source(library(uuid))).  118:- autoload(library(uuid), [uuid/2]).  119:- endif.  120
  121
  122:- meta_predicate
  123    pengine_create(:),
  124    pengine_rpc(+, +, :),
  125    pengine_event_loop(1, +).  126
  127:- multifile
  128    write_result/3,                 % +Format, +Event, +Dict
  129    event_to_json/3,                % +Event, -JSON, +Format
  130    prepare_module/3,               % +Module, +Application, +Options
  131    prepare_goal/3,                 % +GoalIn, -GoalOut, +Options
  132    authentication_hook/3,          % +Request, +Application, -User
  133    not_sandboxed/2,                % +User, +App
  134    pengine_flush_output_hook/0.  135
  % Option declarations for the public API.  The second argument of
  % predicate_options/3 is the position of the option list in the
  % argument vector of the predicate; pass_to(PI, Arg) states that the
  % remaining options are handed to argument Arg of PI.

  :- predicate_options(pengine_create/1, 1,
                       [ id(-atom),
                         alias(atom),
                         application(atom),
                         destroy(boolean),
                         server(atom),
                         ask(compound),
                         template(compound),
                         chunk(integer;oneof([false])),
                         bindings(list),
                         src_list(list),
                         src_text(any),           % text
                         src_url(atom),
                         src_predicates(list)
                       ]).
  :- predicate_options(pengine_ask/3, 3,
                       [ template(any),
                         chunk(integer;oneof([false])),
                         bindings(list)
                       ]).
  :- predicate_options(pengine_next/2, 2,
                       [ chunk(integer),
                         pass_to(pengine_send/3, 3)
                       ]).
  :- predicate_options(pengine_stop/2, 2,
                       [ pass_to(pengine_send/3, 3)
                       ]).
  % pengine_respond(+Pengine, +Input, +Options): the option list is the
  % third argument (this was declared as argument 2, i.e. +Input).
  :- predicate_options(pengine_respond/3, 3,
                       [ pass_to(pengine_send/3, 3)
                       ]).
  :- predicate_options(pengine_rpc/3, 3,
                       [ chunk(integer;oneof([false])),
                         pass_to(pengine_create/1, 1)
                       ]).
  :- predicate_options(pengine_send/3, 3,
                       [ delay(number)
                       ]).
  :- predicate_options(pengine_event/2, 2,
                       [ listen(atom),
                         pass_to(system:thread_get_message/3, 3)
                       ]).
  :- predicate_options(pengine_pull_response/2, 2,
                       [ pass_to(http_open/3, 3)
                       ]).
  :- predicate_options(pengine_event_loop/2, 2,
                       []).                       % not yet implemented
  182
  183% :- debug(pengine(transition)).
  184:- debug(pengine(debug)).               % handle pengine_debug in pengine_rpc/3.
  185
  % Compile-time switch for random_delay/0: the goal is rewritten to
  % do_random_delay/0 when the debug topic pengine(delay) is active
  % while this file is compiled, and to `true` otherwise.
  goal_expansion(random_delay, Replacement) :-
      (   \+ debugging(pengine(delay))
      ->  Replacement = true
      ;   Replacement = do_random_delay
      ).
  191
  % Sleep for random(20)/1000 seconds, i.e. less than 20 milliseconds.
  do_random_delay :-
      Millis is random(20),
      Seconds is Millis/1000,
      sleep(Seconds).
  195
  196:- meta_predicate                       % internal meta predicates
  197    solve(+, ?, 0, +),
  198    findnsols_no_empty(+, ?, 0, -),
  199    pengine_event_loop(+, 1, +).  200
  201/**  pengine_create(:Options) is det.
  202
  203    Creates a new pengine. Valid options are:
  204
  205    * id(-ID)
  206      ID gets instantiated to the id of the created pengine.  ID is
  207      atomic.
  208
  209    * alias(+Name)
  210      The pengine is named Name (an atom). A slave pengine (child) can
  211      subsequently be referred to by this name.
  212
  213    * application(+Application)
  214      Application in which the pengine runs.  See pengine_application/1.
  215
  216    * server(+URL)
  217      The pengine will run in (and in the Prolog context of) the pengine
  218      server located at URL.
  219
  220    * src_list(+List_of_clauses)
  221      Inject a list of Prolog clauses into the pengine.
  222
  223    * src_text(+Atom_or_string)
  224      Inject the clauses specified by a source text into the pengine.
  225
  226    * src_url(+URL)
  227      Inject the clauses specified in the file located at URL into the
  228      pengine.
  229
  230    * src_predicates(+List)
  231      Send the local predicates denoted by List to the remote pengine.
  232      List is a list of predicate indicators.
  233
  234Remaining  options  are  passed  to  http_open/3  (meaningful  only  for
  235non-local pengines) and thread_create/3. Note   that for thread_create/3
  236only options changing the stack-sizes can be used. In particular, do not
  237pass the detached or alias options.
  238
  239Successful creation of a pengine will return an _event term_ of the
  240following form:
  241
  242    * create(ID, Term)
  243      ID is the id of the pengine that was created.
  244      Term is not used at the moment.
  245
  246An error will be returned if the pengine could not be created:
  247
  248    * error(ID, Term)
  249      ID is invalid, since no pengine was created.
  250      Term is the exception's error term.
  251*/
  252
  253
  % pengine_create(:Options): first rewrite the local source options
  % into src_text (relative to the calling module M), then create the
  % pengine remotely if a server(URL) option is present and locally
  % otherwise.
  pengine_create(M:RawOptions) :-
      translate_local_sources(RawOptions, Options, M),
      pengine_create_local_or_remote(Options).

  % The server(BaseURL) option is removed from the options passed on
  % to remote_pengine_create/2.
  pengine_create_local_or_remote(Options) :-
      select_option(server(BaseURL), Options, RemoteOptions),
      !,
      remote_pengine_create(BaseURL, RemoteOptions).
  pengine_create_local_or_remote(Options) :-
      local_pengine_create(Options).
  260
  %!  translate_local_sources(+OptionsIn, -Options, +Module) is det.
  %
  %   Replace the `src_predicates`, `src_list` and `src_text` options of
  %   OptionsIn by at most one `src_text` option that is put in front of
  %   the remaining options.  If several sources are given their texts
  %   are concatenated.  Remote pengines need the source as text anyway;
  %   for local pengines transferring source is rarely needed because
  %   they can call any predicate made visible to the application.

  translate_local_sources(OptionsIn, Options, Module) :-
      translate_local_sources(OptionsIn, Texts, OtherOptions, Module),
      (   Texts == []
      ->  Options = OtherOptions
      ;   (   Texts = [Text]
          ->  true
          ;   atomics_to_string(Texts, Text)
          )
      ->  Options = [src_text(Text)|OtherOptions]
      ).
  282
  % translate_local_sources(+OptionsIn, -Sources, -Options, +Module)
  %
  % Split OptionsIn into the texts of the source options (Sources) and
  % all other options (Options), keeping the original order.  Unbound
  % list elements and options that translate_local_source/3 does not
  % accept are copied to Options.
  translate_local_sources([], [], [], _).
  translate_local_sources([Option|Rest], Sources, Options, M) :-
      (   nonvar(Option),
          translate_local_source(Option, Source, M)
      ->  Sources = [Source|MoreSources],
          Options = MoreOptions
      ;   Sources = MoreSources,
          Options = [Option|MoreOptions]
      ),
      translate_local_sources(Rest, MoreSources, MoreOptions, M).
  291
  % translate_local_source(+Option, -Source, +Module)
  %
  % Source is the program text denoted by a source option:
  %
  %   - src_predicates(PIs): the output of listing/1 on each predicate
  %     indicator, qualified with Module.
  %   - src_list(Terms): each term written using format ~k, followed
  %     by " ." and a newline.
  %   - src_text(Text): Text itself.
  translate_local_source(src_predicates(Indicators), Text, M) :-
      must_be(list, Indicators),
      with_output_to(string(Text),
                     maplist(list_in_module(M), Indicators)).
  translate_local_source(src_list(Clauses), Text, _) :-
      must_be(list, Clauses),
      with_output_to(string(Text),
                     write_source_clauses(Clauses)).
  translate_local_source(src_text(Text), Text, _).

  % Write every clause of a proper list in canonical form.
  write_source_clauses([]).
  write_source_clauses([Clause|Clauses]) :-
      format('~k .~n', [Clause]),
      write_source_clauses(Clauses).
  302
  % List the predicate PredInd of Module on the current output.
  list_in_module(Module, PredInd) :-
      Qualified = Module:PredInd,
      listing(Qualified).
  305
  /**  pengine_send(+NameOrID, +Term) is det

  Send Term to the pengine NameOrID without options.  Equivalent to
  pengine_send(NameOrID, Term, []).
  */

  pengine_send(NameOrID, Term) :-
      NoOptions = [],
      pengine_send(NameOrID, Term, NoOptions).
  313
  314
  /**  pengine_send(+NameOrID, +Term, +Options) is det

  Place Term in the queue of the pengine NameOrID and succeed
  immediately.  NameOrID must be an atom.  Options processed:

     * delay(+Time)
       Postpone the actual sending by Time seconds.  Time is an integer
       or a float.

  All other options are passed to http_open/3.
  */

  pengine_send(NameOrID, Term, Options) :-
      must_be(atom, NameOrID),
      pengine_send2(NameOrID, Term, Options).
  330
  % pengine_send2(+Target, +Event, +Options)
  %
  % Resolve Target (an atom, as verified by pengine_send/3) and hand the
  % event to delay_message/3:
  %
  %   - `self` addresses the message queue of the calling thread;
  %   - a name registered in child/2 addresses that child pengine;
  %   - anything else is taken to be a pengine id.
  pengine_send2(Target, Event, Options) :-
      (   Target == self
      ->  thread_self(Queue),
          delay_message(queue(Queue), Event, Options)
      ;   child(Target, Child)
      ->  delay_message(pengine(Child), Event, Options)
      ;   delay_message(pengine(Target), Event, Options)
      ).
  341
  % delay_message(+Target, +Event, +Options)
  %
  % With the option delay(Delay), schedule send_message/3 using an
  % alarm that is removed after it fired.  Otherwise send right away
  % (after random_delay, which is expanded at compile time).
  delay_message(Target, Event, Options) :-
      (   option(delay(Delay), Options)
      ->  alarm(Delay,
                send_message(Target, Event, Options),
                _AlarmID,
                [remove(true)])
      ;   random_delay,
          send_message(Target, Event, Options)
      ).
  352
  % send_message(+Target, +Event, +Options)
  %
  % Deliver Event wrapped as pengine_request(Event).  Target is either
  % queue(Queue) or pengine(Id).  A pengine known as remote is reached
  % through remote_pengine_send/4, a pengine with a thread through the
  % message queue of that thread.  Raises existence_error(pengine, Id)
  % if the pengine is neither.
  send_message(queue(Queue), Event, _) :-
      thread_send_message(Queue, pengine_request(Event)).
  send_message(pengine(Id), Event, Options) :-
      pengine_remote(Id, Server),
      !,
      remote_pengine_send(Server, Id, Event, Options).
  send_message(pengine(Id), Event, _) :-
      pengine_thread(Id, Thread),
      !,
      thread_send_message(Thread, pengine_request(Event)).
  send_message(pengine(Id), _, _) :-
      existence_error(pengine, Id).
  362
  %!  pengine_request(-Request) is det.
  %
  %   Used by a pengine to wait for its next request, i.e., a message
  %   pengine_request(Request) in the queue of the calling thread.  The
  %   first second is spent in a plain thread_get_message/3.  If nothing
  %   arrived, wait for the remainder of the `idle_limit` setting of the
  %   application inside thread_idle/2 to minimise resources.  If that
  %   wait does not succeed either, Request is unified with `destroy`.

  pengine_request(Request) :-
      thread_self(Me),
      (   thread_get_message(Me, pengine_request(Request), [timeout(1)])
      ->  true
      ;   pengine_self(Self),
          get_pengine_application(Self, Application),
          setting(Application:idle_limit, IdleLimit),
          Remaining is IdleLimit-1,       % one second is already spent
          (   thread_idle(thread_get_message(Me, pengine_request(Request),
                                             [timeout(Remaining)]),
                          long)
          ->  true
          ;   Request = destroy
          )
      ).
  387
  388
  %!  pengine_reply(+Event) is det.
  %!  pengine_reply(+Queue, +Event) is det.
  %
  %   Send Event to Queue, or to the queue of the parent of the current
  %   pengine for pengine_reply/1.  The other side reads such events
  %   using pengine_event/2.
  %
  %   When called in the thread of the pengine identified by the event
  %   and this pengine is not detached, sending is limited to the
  %   `idle_limit` setting of the application.  On timeout the global
  %   variable `pengine_idle_limit_exceeded` is set, the thread is
  %   detached and the pengine is aborted.  Once that variable is set,
  %   events are silently dropped.

  pengine_reply(Event) :-
      pengine_parent(Queue),
      pengine_reply(Queue, Event).

  pengine_reply(Queue, Event0) :-
      (   nb_current(pengine_idle_limit_exceeded, true)
      ->  true
      ;   arg(1, Event0, ID),
          wrap_first_answer(ID, Event0, Event),
          random_delay,
          debug(pengine(event), 'Reply to ~p: ~p', [Queue, Event]),
          (   pengine_self(ID),
              \+ pengine_detached(ID, _)
          ->  pengine_reply_or_abort(Queue, ID, Event)
          ;   thread_send_message(Queue, pengine_event(ID, Event))
          )
      ).

  % Send Event for pengine ID to Queue, giving up after the idle_limit
  % of the application of the pengine.
  pengine_reply_or_abort(Queue, ID, Event) :-
      get_pengine_application(ID, Application),
      setting(Application:idle_limit, IdleLimit),
      debug(pengine(reply), 'Sending ~p, timeout: ~q', [Event, IdleLimit]),
      (   thread_send_message(Queue, pengine_event(ID, Event),
                              [ timeout(IdleLimit)
                              ])
      ->  true
      ;   thread_self(Me),
          debug(pengine(reply), 'pengine_reply: timeout for ~q (thread ~q)',
                [ID, Me]),
          nb_setval(pengine_idle_limit_exceeded, true),
          thread_detach(Me),
          abort
      ).
  429
  % wrap_first_answer(+ID, +Event0, -Event)
  %
  % If a wrap_first_answer_in_create_event/2 fact exists whose create
  % event has ID as its first argument, Event is that create event with
  % Event0 filled in as its answer, and the fact is retracted.
  % Otherwise Event is Event0.
  wrap_first_answer(ID, Event0, Event) :-
      (   wrap_first_answer_in_create_event(Event, [answer(Event0)]),
          arg(1, Event, ID)
      ->  retract(wrap_first_answer_in_create_event(Event, [answer(Event0)]))
      ;   Event = Event0
      ).
  436
  437
  % empty_queue: discard all messages waiting in the queue of the
  % parent and report the number on the debug topic pengine(abort).
  empty_queue :-
      pengine_parent(Queue),
      empty_queue(Queue, 0, Discarded),
      debug(pengine(abort), 'Abort: discarded ~D messages', [Discarded]).

  % empty_queue(+Queue, +Count0, -Count): remove messages from Queue
  % without waiting until none is left.  Count is Count0 plus the
  % number of messages removed.
  empty_queue(Queue, Count0, Count) :-
      (   thread_get_message(Queue, _Message, [timeout(0)])
      ->  Count1 is Count0+1,
          empty_queue(Queue, Count1, Count)
      ;   Count = Count0
      ).
  449
  450
  451/** pengine_ask(+NameOrID, @Query, +Options) is det
  452
  453Asks pengine NameOrID a query Query.
  454
  455Options is a list of options:
  456
  457    * template(+Template)
  458      Template is a variable (or a term containing variables) shared
  459      with the query. By default, the template is identical to the
  460      query.
  461
  462    * chunk(+IntegerOrFalse)
  463      Retrieve solutions in chunks of Integer rather than one by one. 1
  464      means no chunking (default). Other integers indicate the maximum
  465      number of solutions to retrieve in one chunk.  If `false`, the
  466      Pengine goal is not executed using findall/3 and friends and
  467      we do not backtrack immediately over the goal.  As a result,
  468      changes to backtrackable global state are retained.  This is
  469      similar to using set_prolog_flag(toplevel_mode, recursive).
  470
  471    * bindings(+Bindings)
  472      Sets the global variable '$variable_names' to a list of
  473      `Name = Var` terms, providing access to the actual variable
  474      names.
  475
  476Any remaining options are passed to pengine_send/3.
  477
  478Note that the predicate pengine_ask/3 is deterministic, even for queries
  479that have more than one solution. Also,  the variables in Query will not
  480be bound. Instead, results will  be  returned   in  the  form  of _event
  481terms_.
  482
  483    * success(ID, Terms, Projection, Time, More)
  484      ID is the id of the pengine that succeeded in solving the query.
  485      Terms is a list holding instantiations of `Template`.  Projection
  486      is a list of variable names that should be displayed. Time is
  487      the CPU time used to produce the results and finally, More
  488      is either `true` or `false`, indicating whether or not we can
  489      expect the pengine to return more solutions if we were to call
  490      pengine_next/2.
  491
  492    * failure(ID)
  493      ID is the id of the pengine that failed for lack of solutions.
  494
  495    * error(ID, Term)
  496      ID is the id of the pengine throwing the exception.
  497      Term is the exception's error term.
  498
  499    * output(ID, Term)
  500      ID is the id of a pengine running the query that called
  501      pengine_output/1. Term is the term that was passed in the first
  502      argument of pengine_output/1 when it was called.
  503
  504    * prompt(ID, Term)
  505      ID is the id of the pengine that called pengine_input/2 and Term is
  506      the prompt.
  507
  508Defined in terms of pengine_send/3, like so:
  509
  510==
  511pengine_ask(ID, Query, Options) :-
  512    partition(pengine_ask_option, Options, AskOptions, SendOptions),
  513    pengine_send(ID, ask(Query, AskOptions), SendOptions).
  514==
  515*/
  516
  % Options accepted by pengine_ask_option/1 travel inside the ask/2
  % event; all other options are passed to pengine_send/3.
  pengine_ask(ID, Query, Options) :-
      partition(pengine_ask_option, Options, AskOptions, SendOptions),
      Request = ask(Query, AskOptions),
      pengine_send(ID, Request, SendOptions).
  520
  521
  % True if Option is an option that pengine_ask/3 sends along with the
  % query instead of passing it to pengine_send/3.
  pengine_ask_option(Option) :-
      member(Option,
             [ template(_),
               chunk(_),
               bindings(_),
               breakpoints(_)
             ]).
  526
  527
  528/** pengine_next(+NameOrID, +Options) is det
  529
  530Asks pengine NameOrID for the  next  solution   to  a  query  started by
  531pengine_ask/3. Defined options are:
  532
  533    * chunk(+Count)
  534    Modify the chunk-size to Count before asking the next set of
  535    solutions.  This may not be used if the goal was started with
  536    chunk(false).
  537
  538Remaining  options  are  passed  to    pengine_send/3.   The  result  of
  539re-executing the current goal is returned  to the caller's message queue
  540in the form of _event terms_.
  541
  542    * success(ID, Terms, Projection, Time, More)
  543      See pengine_ask/3.
  544
  545    * failure(ID)
  546      ID is the id of the pengine that failed for lack of more solutions.
  547
  548    * error(ID, Term)
  549      ID is the id of the pengine throwing the exception.
  550      Term is the exception's error term.
  551
  552    * output(ID, Term)
  553      ID is the id of a pengine running the query that called
  554      pengine_output/1. Term is the term that was passed in the first
  555      argument of pengine_output/1 when it was called.
  556
  557    * prompt(ID, Term)
  558      ID is the id of the pengine that called pengine_input/2 and Term
  559      is the prompt.
  560
  561Defined in terms of pengine_send/3, as follows:
  562
  563==
  564pengine_next(ID, Options) :-
  565    pengine_send(ID, next, Options).
  566==
  567
  568*/
  569
  % With a chunk(Count) option the event sent is next(Count) and the
  % option is removed from the options passed to pengine_send/3.
  % Without it the event is the plain atom `next`.
  pengine_next(ID, Options) :-
      (   select_option(chunk(Count), Options, SendOptions)
      ->  pengine_send(ID, next(Count), SendOptions)
      ;   pengine_send(ID, next, Options)
      ).
  576
  577
  578/** pengine_stop(+NameOrID, +Options) is det
  579
  580Tells pengine NameOrID to stop looking  for   more  solutions to a query
  581started by pengine_ask/3. Options are passed to pengine_send/3.
  582
  583Defined in terms of pengine_send/3, like so:
  584
  585==
  586pengine_stop(ID, Options) :-
  587    pengine_send(ID, stop, Options).
  588==
  589*/
  590
  % Send the event `stop` to pengine ID.
  pengine_stop(ID, Options) :-
      Event = stop,
      pengine_send(ID, Event, Options).
  592
  593
  594/** pengine_abort(+NameOrID) is det
  595
  596Aborts the running query. The pengine goes   back  to state `2', waiting
  597for new queries.
  598
  599@see pengine_destroy/1.
  600*/
  601
  % Name is either the name of a child registered in child/2 or a
  % pengine id.  A remote pengine is aborted through
  % remote_pengine_abort/3.  A local one gets throw(abort_query)
  % signalled to its thread; errors raised while signalling are
  % ignored.
  pengine_abort(Name) :-
      (   child(Name, Child)
      ->  Pengine = Child
      ;   Pengine = Name
      ),
      pengine_abort_query(Pengine).

  pengine_abort_query(Pengine) :-
      pengine_remote(Pengine, Server),
      !,
      remote_pengine_abort(Server, Pengine, []).
  pengine_abort_query(Pengine) :-
      pengine_thread(Pengine, Thread),
      debug(pengine(abort), 'Signalling thread ~p', [Thread]),
      catch(thread_signal(Thread, throw(abort_query)), _, true).
  613
  614
  615/** pengine_destroy(+NameOrID) is det.
  616    pengine_destroy(+NameOrID, +Options) is det.
  617
  618Destroys the pengine NameOrID.  With the option force(true), the pengine
  619is killed using abort/0 and pengine_destroy/2 succeeds.
  620*/
  621
  % Destroy pengine ID using the default (empty) option list.
  pengine_destroy(ID) :-
      NoOptions = [],
      pengine_destroy(ID, NoOptions).
  624
  % pengine_destroy(+NameOrID, +Options)
  %
  % With force(true), resolve a child name using child/2 and signal
  % `abort` to the thread of the pengine, if it has one.  A thread that
  % disappeared in the meanwhile is not an error.  Without force(true),
  % send the event `destroy`.  If that raises an existence error for
  % the pengine, the child/2 registrations for it are removed instead.
  pengine_destroy(NameOrID, Options) :-
      (   option(force(true), Options)
      ->  (   child(NameOrID, Child)
          ->  ID = Child
          ;   ID = NameOrID
          ),
          (   pengine_thread(ID, Thread)
          ->  catch(thread_signal(Thread, abort),
                    error(existence_error(thread, _), _), true)
          ;   true
          )
      ;   catch(pengine_send(NameOrID, destroy, Options),
                error(existence_error(pengine, NameOrID), _),
                retractall(child(_,NameOrID)))
      ).
  641
  642
  643/*================= pengines administration =======================
  644*/
  645
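  646%!  current_pengine(?Id, ?Parent, ?Thread, ?URL, ?Application, ?Destroy)
  647%
  648%   Dynamic predicate that registers our known pengines.  Id is
  649%   an atomic unique datatype.  Parent is the message queue of
  650%   the parent.  Thread is the thread running a local pengine, or
  651%   `0` for a remote pengine, which is located at URL.
  652%   Application and Destroy are stored as passed to
  653%   pengine_register_local/6 or pengine_register_remote/4.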
  654
  655:- dynamic
  656    current_pengine/6,              % Id, ParentId, Thread, URL, App, Destroy
  657    pengine_queue/4,                % Id, Queue, TimeOut, Time
  658    output_queue/3,                 % Id, Queue, Time
  659    pengine_user/2,                 % Id, User
  660    pengine_data/2,                 % Id, Data
  661    pengine_detached/2.             % Id, Data
  662:- volatile
  663    current_pengine/6,
  664    pengine_queue/4,
  665    output_queue/3,
  666    pengine_user/2,
  667    pengine_data/2,
  668    pengine_detached/2.  669
  670:- thread_local
  671    child/2.                        % ?Name, ?Child
  672
  %!  pengine_register_local(+Id, +Thread, +Queue, +URL, +App, +Destroy) is det.
  %!  pengine_register_remote(+Id, +URL, +App, +Destroy) is det.
  %
  %   Add a current_pengine/6 fact for pengine Id in front of the
  %   existing facts.  A local pengine is stored with the given Queue
  %   and Thread.  A remote pengine is stored with the calling thread
  %   as queue and `0` as thread.

  pengine_register_local(Id, Thread, Queue, URL, Application, Destroy) :-
      Fact = current_pengine(Id, Queue, Thread, URL, Application, Destroy),
      asserta(Fact).

  pengine_register_remote(Id, URL, Application, Destroy) :-
      thread_self(Queue),
      Fact = current_pengine(Id, Queue, 0, URL, Application, Destroy),
      asserta(Fact).
  683
  %!  pengine_unregister(+Id)
  %
  %   Called when the thread of a pengine is destroyed.  If the pengine
  %   is registered for the calling thread with URL =http=, its queue
  %   (the message queue used to send events to the HTTP workers) is
  %   first handed to sync_destroy_queue_from_pengine/2 while holding
  %   the mutex `pengine`.  Then the registration for the calling thread
  %   and the user and data facts of Id are removed.

  pengine_unregister(Id) :-
      thread_self(Me),
      pengine_release_http_queue(Id, Me),
      retractall(current_pengine(Id, _, Me, _, _, _)),
      retractall(pengine_user(Id, _)),
      retractall(pengine_data(Id, _)).

  pengine_release_http_queue(Id, Thread) :-
      current_pengine(Id, Queue, Thread, http, _, _),
      !,
      with_mutex(pengine, sync_destroy_queue_from_pengine(Id, Queue)).
  pengine_release_http_queue(_, _).
  699
  % Remove the registrations of Id that have `0` as thread, which is
  % how pengine_register_remote/4 stores remote pengines.
  pengine_unregister_remote(Id) :-
      Remote = current_pengine(Id, _Parent, 0, _URL, _App, _Destroy),
      retractall(Remote).
  702
  %!  pengine_self(-Id) is det.
  %
  %   True if the calling thread is registered in current_pengine/6 as
  %   the thread of pengine Id.  Fails if there is no such
  %   registration.

  pengine_self(Id) :-
      thread_self(Me),
      current_pengine(Id, _Parent, Me, _URL, _Application, _Destroy).
  710
  % Parent is the value of the global variable `pengine_parent`.
  pengine_parent(Parent) :-
      Key = pengine_parent,
      nb_getval(Key, Parent).
  713
  % Thread is the thread of Pengine according to the first
  % registration whose thread is not `0`, the value that is stored for
  % remote pengines.
  pengine_thread(Pengine, Thread) :-
      once(( current_pengine(Pengine, _Parent, Thread, _URL, _App, _Destroy),
             Thread \== 0
           )).
  718
  % True if Pengine is registered with `0` as thread, i.e., as done by
  % pengine_register_remote/4.  URL is the URL stored with it.
  pengine_remote(Pengine, URL) :-
      RemoteThread = 0,
      current_pengine(Pengine, _Parent, RemoteThread, URL, _App, _Destroy).
  721
  % Application is the application stored in the first registration
  % found for Pengine.
  get_pengine_application(Pengine, Application) :-
      once(current_pengine(Pengine, _Parent, _Thread, _URL, Application, _Destroy)).
  725
  % The module of a pengine is named after the pengine itself.
  get_pengine_module(Pengine, Module) :-
      Module = Pengine.
  727
  % pengine_uuid(-Id): create an identifier for a new pengine.  If
  % uuid/2 is available when this file is compiled, Id is a version 4
  % UUID.  Otherwise Id is an atom holding a random non-negative
  % integer.

  :- if(current_predicate(uuid/2)).
  pengine_uuid(Id) :-
      Options = [version(4)],             % Version 4 is random.
      uuid(Id, Options).
  :- else.
  pengine_uuid(Id) :-
      (   current_prolog_flag(max_integer, MaxInt)
      ->  Upper is MaxInt-1
      ;   Upper is 1<<128
      ),
      random_between(0, Upper, Random),
      atom_number(Id, Random).
  :- endif.

  %!  protect_pengine(+Id, :Goal) is semidet.
  %
  %   Run Goal while holding one out of a pool of 64 mutexes, selected
  %   by the term hash of Id.  The HTTP I/O routines use this to avoid
  %   that the module of the pengine disappears while I/O is in
  %   progress.  A pool is used because output routines may hold the
  %   lock relatively long.
  %
  %   Goal is also run if the pengine no longer has a thread, which
  %   deals with pengines terminated through destroy_or_continue/1.
  %
  %   @bug After destroy_or_continue/1 takes the destroy route, the
  %   module may drop out at any point in time, resulting in a possible
  %   crash.  It seems the only safe way out is to do (de)serialization
  %   inside the pengine.

  :- meta_predicate protect_pengine(+, 0).

  protect_pengine(Id, Goal) :-
      term_hash(Id, Hash),
      Slot is Hash mod 64,
      atom_concat(pengine_done_, Slot, Mutex),
      with_mutex(Mutex,
                 (   pengine_thread(Id, _)
                 ->  Goal
                 ;   Goal                 % pengine is gone: run anyway
                 )).
  767
  768
  769/** pengine_application(+Application) is det.
  770
  771Directive that must be used to declare a pengine application module. The
  772module must not be associated to any   file.  The default application is
  773=pengine_sandbox=.  The  example  below  creates    a   new  application
  774=address_book=  and  imports  the  API  defined    in  the  module  file
  775=adress_book_api.pl= into the application.
  776
  777  ==
  778  :- pengine_application(address_book).
  779  :- use_module(address_book:adress_book_api).
  780  ==
  781*/
  782
  % Calling pengine_application/1 as a goal always raises a context
  % error.
  pengine_application(Application) :-
      Culprit = pengine_application(Application),
      throw(error(context_error(nodirective, Culprit), _)).
  786
  787:- multifile
  788    system:term_expansion/2,
  789    current_application/1.  790
  %!  current_pengine_application(?Application) is nondet.
  %
  %   True when Application is known to the multifile predicate
  %   current_application/1.
  %
  %   @see pengine_application/1

  current_pengine_application(App) :-
      current_application(App).
  799
  800
  801% Default settings for all applications
  802
  803:- setting(thread_pool_size, integer, 100,
  804           'Maximum number of pengines this application can run.').  805:- setting(thread_pool_stacks, list(compound), [],
  806           'Maximum stack sizes for pengines this application can run.').  807:- setting(slave_limit, integer, 3,
  808           'Maximum number of slave pengines a master pengine can create.').  809:- setting(time_limit, number, 300,
  810           'Maximum time to wait for output').  811:- setting(idle_limit, number, 300,
  812           'Pengine auto-destroys when idle for this time').  813:- setting(safe_goal_limit, number, 10,
  814           'Maximum time to try proving safety of the goal').  815:- setting(program_space, integer, 100_000_000,
  816           'Maximum memory used by predicates').  817:- setting(allow_from, list(atom), [*],
  818           'IP addresses from which remotes are allowed to connect').  819:- setting(deny_from, list(atom), [],
  820           'IP addresses from which remotes are NOT allowed to connect').  821:- setting(debug_info, boolean, false,
  822           'Keep information to support source-level debugging').  823
  824
  825system:term_expansion((:- pengine_application(Application)), Expanded) :-
  826    must_be(atom, Application),
  827    (   module_property(Application, file(_))
  828    ->  permission_error(create, pengine_application, Application)
  829    ;   true
  830    ),
  831    expand_term((:- setting(Application:thread_pool_size, integer,
  832                            setting(pengines:thread_pool_size),
  833                            'Maximum number of pengines this \c
  834                            application can run.')),
  835                ThreadPoolSizeSetting),
  836    expand_term((:- setting(Application:thread_pool_stacks, list(compound),
  837                            setting(pengines:thread_pool_stacks),
  838                            'Maximum stack sizes for pengines \c
  839                            this application can run.')),
  840                ThreadPoolStacksSetting),
  841    expand_term((:- setting(Application:slave_limit, integer,
  842                            setting(pengines:slave_limit),
  843                            'Maximum number of local slave pengines \c
  844                            a master pengine can create.')),
  845                SlaveLimitSetting),
  846    expand_term((:- setting(Application:time_limit, number,
  847                            setting(pengines:time_limit),
  848                            'Maximum time to wait for output')),
  849                TimeLimitSetting),
  850    expand_term((:- setting(Application:idle_limit, number,
  851                            setting(pengines:idle_limit),
  852                            'Pengine auto-destroys when idle for this time')),
  853                IdleLimitSetting),
  854    expand_term((:- setting(Application:safe_goal_limit, number,
  855                            setting(pengines:safe_goal_limit),
  856                            'Maximum time to try proving safety of the goal')),
  857                SafeGoalLimitSetting),
  858    expand_term((:- setting(Application:program_space, integer,
  859                            setting(pengines:program_space),
  860                            'Maximum memory used by predicates')),
  861                ProgramSpaceSetting),
  862    expand_term((:- setting(Application:allow_from, list(atom),
  863                            setting(pengines:allow_from),
  864                            'IP addresses from which remotes are allowed \c
  865                            to connect')),
  866                AllowFromSetting),
  867    expand_term((:- setting(Application:deny_from, list(atom),
  868                            setting(pengines:deny_from),
  869                            'IP addresses from which remotes are NOT \c
  870                            allowed to connect')),
  871                DenyFromSetting),
  872    expand_term((:- setting(Application:debug_info, boolean,
  873                            setting(pengines:debug_info),
  874                            'Keep information to support source-level \c
  875                            debugging')),
  876                DebugInfoSetting),
  877    flatten([ pengines:current_application(Application),
  878              ThreadPoolSizeSetting,
  879              ThreadPoolStacksSetting,
  880              SlaveLimitSetting,
  881              TimeLimitSetting,
  882              IdleLimitSetting,
  883              SafeGoalLimitSetting,
  884              ProgramSpaceSetting,
  885              AllowFromSetting,
  886              DenyFromSetting,
  887              DebugInfoSetting
  888            ], Expanded).
  889
  890% Register default application
  891
  892:- pengine_application(pengine_sandbox).  893
  894
  895/** pengine_property(?Pengine, ?Property) is nondet.
  896
  897True when Property is a property of   the  given Pengine. Enumerates all
  898pengines  that  are  known  to  the   calling  Prolog  process.  Defined
  899properties are:
  900
  901  * self(ID)
  902    Identifier of the pengine.  This is the same as the first argument,
  903    and can be used to enumerate all known pengines.
  904  * alias(Name)
  905    Name is the alias name of the pengine, as provided through the
  906    `alias` option when creating the pengine.
  907  * thread(Thread)
  908    If the pengine is a local pengine, Thread is the Prolog thread
  909    identifier of the pengine.
  910  * remote(Server)
  911    If the pengine is remote, the URL of the server.
  912  * application(Application)
  913    Pengine runs the given application
  914  * module(Module)
  915    Temporary module used for running the Pengine.
  916  * destroy(Destroy)
  917    Destroy is =true= if the pengines is destroyed automatically
  918    after completing the query.
  919  * parent(Queue)
  920    Message queue to which the (local) pengine reports.
  921  * source(?SourceID, ?Source)
  922    Source is the source code with the given SourceID. May be present if
  923    the setting `debug_info` is present.
  924  * detached(?Time)
  925    Pengine was detached at Time.
  926*/
  927
  928
  929pengine_property(Id, Prop) :-
  930    nonvar(Id), nonvar(Prop),
  931    pengine_property2(Prop, Id),
  932    !.
  933pengine_property(Id, Prop) :-
  934    pengine_property2(Prop, Id).
  935
  936pengine_property2(self(Id), Id) :-
  937    current_pengine(Id, _Parent, _Thread, _URL, _Application, _Destroy).
  938pengine_property2(module(Id), Id) :-
  939    current_pengine(Id, _Parent, _Thread, _URL, _Application, _Destroy).
  940pengine_property2(alias(Alias), Id) :-
  941    child(Alias, Id),
  942    Alias \== Id.
  943pengine_property2(thread(Thread), Id) :-
  944    current_pengine(Id, _Parent, Thread, _URL, _Application, _Destroy),
  945    Thread \== 0.
  946pengine_property2(remote(Server), Id) :-
  947    current_pengine(Id, _Parent, 0, Server, _Application, _Destroy).
  948pengine_property2(application(Application), Id) :-
  949    current_pengine(Id, _Parent, _Thread, _Server, Application, _Destroy).
  950pengine_property2(destroy(Destroy), Id) :-
  951    current_pengine(Id, _Parent, _Thread, _Server, _Application, Destroy).
  952pengine_property2(parent(Parent), Id) :-
  953    current_pengine(Id, Parent, _Thread, _URL, _Application, _Destroy).
  954pengine_property2(source(SourceID, Source), Id) :-
  955    pengine_data(Id, source(SourceID, Source)).
  956pengine_property2(detached(When), Id) :-
  957    pengine_detached(Id, When).
  958
  959/** pengine_output(+Term) is det
  960
  961Sends Term to the parent pengine or thread.
  962*/
  963
  964pengine_output(Term) :-
  965    pengine_self(Me),
  966    pengine_reply(output(Me, Term)).
  967
  968
  969/** pengine_debug(+Format, +Args) is det
  970
  971Create a message using format/3 from Format   and  Args and send this to
  972the    client.    The    default    JavaScript    client    will    call
  973=|console.log(Message)|=  if  there  is   a    console.   The  predicate
  974pengine_rpc/3 calls debug(pengine(debug), '~w',   [Message]).  The debug
  975topic pengine(debug) is enabled by default.
  976
  977@see debug/1 and nodebug/1 for controlling the pengine(debug) topic
  978@see format/2 for format specifications
  979*/
  980
  981pengine_debug(Format, Args) :-
  982    pengine_parent(Queue),
  983    pengine_self(Self),
  984    catch(safe_goal(format(atom(_), Format, Args)), E, true),
  985    (   var(E)
  986    ->  format(atom(Message), Format, Args)
  987    ;   message_to_string(E, Message)
  988    ),
  989    pengine_reply(Queue, debug(Self, Message)).
  990
  991
  992/*================= Local pengine =======================
  993*/
  994
  995%!  local_pengine_create(+Options)
  996%
  997%   Creates  a  local   Pengine,   which    is   a   thread  running
  998%   pengine_main/2.  It maintains two predicates:
  999%
 1000%     - The global dynamic predicate id/2 relates Pengines to their
 1001%       childs.
 1002%     - The local predicate id/2 maps named childs to their ids.
 1003
 1004local_pengine_create(Options) :-
 1005    thread_self(Self),
 1006    option(application(Application), Options, pengine_sandbox),
 1007    create(Self, Child, Options, local, Application),
 1008    option(alias(Name), Options, Child),
 1009    assert(child(Name, Child)).
 1010
 1011
 1012%!  thread_pool:create_pool(+Application) is det.
 1013%
 1014%   On demand creation of a thread pool for a pengine application.
 1015
 1016:- multifile thread_pool:create_pool/1. 1017
 1018thread_pool:create_pool(Application) :-
 1019    current_application(Application),
 1020    setting(Application:thread_pool_size, Size),
 1021    setting(Application:thread_pool_stacks, Stacks),
 1022    thread_pool_create(Application, Size, Stacks).
 1023
 1024%!  create(+Queue, -Child, +Options, +URL, +Application) is det.
 1025%
 1026%   Create a new pengine thread.
 1027%
 1028%   @arg Queue is the queue (or thread handle) to report to
 1029%   @arg Child is the identifier of the created pengine.
 1030%   @arg URL is one of =local= or =http=
 1031
 1032create(Queue, Child, Options, local, Application) :-
 1033    !,
 1034    pengine_child_id(Child),
 1035    create0(Queue, Child, Options, local, Application).
 1036create(Queue, Child, Options, URL, Application) :-
 1037    pengine_child_id(Child),
 1038    catch(create0(Queue, Child, Options, URL, Application),
 1039          Error,
 1040          create_error(Queue, Child, Error)).
 1041
 1042pengine_child_id(Child) :-
 1043    (   nonvar(Child)
 1044    ->  true
 1045    ;   pengine_uuid(Child)
 1046    ).
 1047
 1048create_error(Queue, Child, Error) :-
 1049    pengine_reply(Queue, error(Child, Error)).
 1050
 1051create0(Queue, Child, Options, URL, Application) :-
 1052    (  current_application(Application)
 1053    -> true
 1054    ;  existence_error(pengine_application, Application)
 1055    ),
 1056    (   URL \== http                    % pengine is _not_ a child of the
 1057                                        % HTTP server thread
 1058    ->  aggregate_all(count, child(_,_), Count),
 1059        setting(Application:slave_limit, Max),
 1060        (   Count >= Max
 1061        ->  throw(error(resource_error(max_pengines), _))
 1062        ;   true
 1063        )
 1064    ;   true
 1065    ),
 1066    partition(pengine_create_option, Options, PengineOptions, RestOptions),
 1067    thread_create_in_pool(
 1068        Application,
 1069        pengine_main(Queue, PengineOptions, Application), ChildThread,
 1070        [ at_exit(pengine_done)
 1071        | RestOptions
 1072        ]),
 1073    option(destroy(Destroy), PengineOptions, true),
 1074    pengine_register_local(Child, ChildThread, Queue, URL, Application, Destroy),
 1075    thread_send_message(ChildThread, pengine_registered(Child)),
 1076    (   option(id(Id), Options)
 1077    ->  Id = Child
 1078    ;   true
 1079    ).
 1080
 1081pengine_create_option(src_text(_)).
 1082pengine_create_option(src_url(_)).
 1083pengine_create_option(application(_)).
 1084pengine_create_option(destroy(_)).
 1085pengine_create_option(ask(_)).
 1086pengine_create_option(template(_)).
 1087pengine_create_option(bindings(_)).
 1088pengine_create_option(chunk(_)).
 1089pengine_create_option(alias(_)).
 1090pengine_create_option(user(_)).
 1091
 1092
 1093%!  pengine_done is det.
 1094%
 1095%   Called from the pengine thread   `at_exit`  option. Destroys _child_
 1096%   pengines  using  pengine_destroy/1.  Cleaning  up   the  Pengine  is
 1097%   synchronised by the `pengine_done` mutex. See read_event/6.
 1098
 1099:- public
 1100    pengine_done/0. 1101
 1102pengine_done :-
 1103    thread_self(Me),
 1104    (   thread_property(Me, status(exception(Ex))),
 1105        abort_exception(Ex),
 1106        thread_detach(Me),
 1107        pengine_self(Pengine)
 1108    ->  catch(pengine_reply(destroy(Pengine, abort(Pengine))),
 1109              error(_,_), true)
 1110    ;   true
 1111    ),
 1112    forall(child(_Name, Child),
 1113           pengine_destroy(Child)),
 1114    pengine_self(Id),
 1115    protect_pengine(Id, pengine_unregister(Id)).
 1116
 1117abort_exception('$aborted').
 1118abort_exception(unwind(abort)).
 1119
 1120%!  pengine_main(+Parent, +Options, +Application)
 1121%
 1122%   Run a pengine main loop. First acknowledges its creation and run
 1123%   pengine_main_loop/1.
 1124
 1125:- thread_local wrap_first_answer_in_create_event/2. 1126
 1127:- meta_predicate
 1128    pengine_prepare_source(:, +). 1129
 1130pengine_main(Parent, Options, Application) :-
 1131    fix_streams,
 1132    thread_get_message(pengine_registered(Self)),
 1133    nb_setval(pengine_parent, Parent),
 1134    pengine_register_user(Options),
 1135    set_prolog_flag(mitigate_spectre, true),
 1136    catch(in_temporary_module(
 1137              Self,
 1138              pengine_prepare_source(Application, Options),
 1139              pengine_create_and_loop(Self, Application, Options)),
 1140          prepare_source_failed,
 1141          pengine_terminate(Self)).
 1142
 1143pengine_create_and_loop(Self, Application, Options) :-
 1144    setting(Application:slave_limit, SlaveLimit),
 1145    CreateEvent = create(Self, [slave_limit(SlaveLimit)|Extra]),
 1146    (   option(ask(Query0), Options)
 1147    ->  asserta(wrap_first_answer_in_create_event(CreateEvent, Extra)),
 1148        (   string(Query0)                      % string is not callable
 1149        ->  (   option(template(TemplateS), Options)
 1150            ->  Ask2 = Query0-TemplateS
 1151            ;   Ask2 = Query0
 1152            ),
 1153            catch(ask_to_term(Ask2, Self, Query, Template, Bindings),
 1154                  Error, true),
 1155            (   var(Error)
 1156            ->  true
 1157            ;   send_error(Error),
 1158                throw(prepare_source_failed)
 1159            )
 1160        ;   Query = Query0,
 1161            option(template(Template), Options, Query),
 1162            option(bindings(Bindings), Options, [])
 1163        ),
 1164        option(chunk(Chunk), Options, 1),
 1165        pengine_ask(Self, Query,
 1166                    [ template(Template),
 1167                      chunk(Chunk),
 1168                      bindings(Bindings)
 1169                    ])
 1170    ;   Extra = [],
 1171        pengine_reply(CreateEvent)
 1172    ),
 1173    pengine_main_loop(Self).
 1174
 1175
 1176%!  ask_to_term(+AskSpec, +Module, -Options, OptionsTail) is det.
 1177%
 1178%   Translate the AskSpec into a query, template and bindings. The trick
 1179%   is that we must parse using the  operator declarations of the source
 1180%   and we must make sure  variable   sharing  between  query and answer
 1181%   template are known.
 1182
 1183ask_to_term(Ask-Template, Module, Ask1, Template1, Bindings) :-
 1184    !,
 1185    format(string(AskTemplate), 't((~s),(~s))', [Template, Ask]),
 1186    term_string(t(Template1,Ask1), AskTemplate,
 1187                [ variable_names(Bindings0),
 1188                  module(Module)
 1189                ]),
 1190    phrase(template_bindings(Template1, Bindings0), Bindings).
 1191ask_to_term(Ask, Module, Ask1, Template, Bindings1) :-
 1192    term_string(Ask1, Ask,
 1193                [ variable_names(Bindings),
 1194                  module(Module)
 1195                ]),
 1196    exclude(anon, Bindings, Bindings1),
 1197    dict_create(Template, swish_default_template, Bindings1).
 1198
 1199template_bindings(Var, Bindings) -->
 1200    { var(Var) }, !,
 1201    (   { var_binding(Bindings, Var, Binding)
 1202        }
 1203    ->  [Binding]
 1204    ;   []
 1205    ).
 1206template_bindings([H|T], Bindings) -->
 1207    !,
 1208    template_bindings(H, Bindings),
 1209    template_bindings(T, Bindings).
 1210template_bindings(Compoound, Bindings) -->
 1211    { compound(Compoound), !,
 1212      compound_name_arguments(Compoound, _, Args)
 1213    },
 1214    template_bindings(Args, Bindings).
 1215template_bindings(_, _) --> [].
 1216
 1217var_binding(Bindings, Var, Binding) :-
 1218    member(Binding, Bindings),
 1219    arg(2, Binding, V),
 1220    V == Var, !.
 1221
 1222%!  fix_streams is det.
 1223%
 1224%   If we are a pengine that is   created  from a web server thread,
 1225%   the current output points to a CGI stream.
 1226
 1227fix_streams :-
 1228    fix_stream(current_output).
 1229
 1230fix_stream(Name) :-
 1231    is_cgi_stream(Name),
 1232    !,
 1233    debug(pengine(stream), '~w is a CGI stream!', [Name]),
 1234    set_stream(user_output, alias(Name)).
 1235fix_stream(_).
 1236
 1237%!  pengine_prepare_source(:Application, +Options) is det.
 1238%
 1239%   Load the source into the pengine's module.
 1240%
 1241%   @throws =prepare_source_failed= if it failed to prepare the
 1242%           sources.
 1243
 1244pengine_prepare_source(Module:Application, Options) :-
 1245    setting(Application:program_space, SpaceLimit),
 1246    set_module(Module:program_space(SpaceLimit)),
 1247    delete_import_module(Module, user),
 1248    add_import_module(Module, Application, start),
 1249    catch(prep_module(Module, Application, Options), Error, true),
 1250    (   var(Error)
 1251    ->  true
 1252    ;   send_error(Error),
 1253        throw(prepare_source_failed)
 1254    ).
 1255
 1256prep_module(Module, Application, Options) :-
 1257    maplist(copy_flag(Module, Application), [var_prefix]),
 1258    forall(prepare_module(Module, Application, Options), true),
 1259    setup_call_cleanup(
 1260        '$set_source_module'(OldModule, Module),
 1261        maplist(process_create_option(Module), Options),
 1262        '$set_source_module'(OldModule)).
 1263
 1264copy_flag(Module, Application, Flag) :-
 1265    current_prolog_flag(Application:Flag, Value),
 1266    !,
 1267    set_prolog_flag(Module:Flag, Value).
 1268copy_flag(_, _, _).
 1269
 1270process_create_option(Application, src_text(Text)) :-
 1271    !,
 1272    pengine_src_text(Text, Application).
 1273process_create_option(Application, src_url(URL)) :-
 1274    !,
 1275    pengine_src_url(URL, Application).
 1276process_create_option(_, _).
 1277
 1278
%!  prepare_module(+Module, +Application, +Options) is semidet.
%
%   Hook, called to initialize  the   temporary  private module that
%   provides the working context of a pengine. This hook is executed
%   by the pengine's thread.  Preparing the source consists of three
%   steps:
%
%     1. Add Application as (first) default import module for Module
%     2. Call this hook
%     3. Compile the source provided by the `src_text` and
%        `src_url` options
%
%   @arg    Module is a new temporary module (see
%           in_temporary_module/3) that may be (further) prepared
%           by this hook.
%   @arg    Application (also a module) associated to the pengine.
%   @arg    Options is passed from the environment and should
%           (currently) be ignored.
 1297
 1298
 1299pengine_main_loop(ID) :-
 1300    catch(guarded_main_loop(ID), abort_query, pengine_aborted(ID)).
 1301
 1302pengine_aborted(ID) :-
 1303    thread_self(Self),
 1304    debug(pengine(abort), 'Aborting ~p (thread ~p)', [ID, Self]),
 1305    empty_queue,
 1306    destroy_or_continue(abort(ID)).
 1307
 1308
 1309%!  guarded_main_loop(+Pengine) is det.
 1310%
 1311%   Executes state `2' of  the  pengine,   where  it  waits  for two
 1312%   events:
 1313%
 1314%     - destroy
 1315%     Terminate the pengine
 1316%     - ask(:Goal, +Options)
 1317%     Solve Goal.
 1318
 1319guarded_main_loop(ID) :-
 1320    pengine_request(Request),
 1321    (   Request = destroy
 1322    ->  debug(pengine(transition), '~q: 2 = ~q => 1', [ID, destroy]),
 1323        pengine_terminate(ID)
 1324    ;   Request = ask(Goal, Options)
 1325    ->  debug(pengine(transition), '~q: 2 = ~q => 3', [ID, ask(Goal)]),
 1326        ask(ID, Goal, Options)
 1327    ;   debug(pengine(transition), '~q: 2 = ~q => 2', [ID, protocol_error]),
 1328        pengine_reply(error(ID, error(protocol_error, _))),
 1329        guarded_main_loop(ID)
 1330    ).
 1331
 1332
 1333pengine_terminate(ID) :-
 1334    pengine_reply(destroy(ID)),
 1335    thread_self(Me),            % Make the thread silently disappear
 1336    thread_detach(Me).
 1337
 1338
 1339%!  solve(+Chunk, +Template, :Goal, +ID) is det.
 1340%
 1341%   Solve Goal. Note that because we can ask for a new goal in state
 1342%   `6', we must provide for an ancesteral cut (prolog_cut_to/1). We
 1343%   need to be sure to  have  a   choice  point  before  we can call
 1344%   prolog_current_choice/1. This is the reason   why this predicate
 1345%   has two clauses.
 1346
 1347solve(Chunk, Template, Goal, ID) :-
 1348    prolog_current_choice(Choice),
 1349    (   integer(Chunk)
 1350    ->  State = count(Chunk)
 1351    ;   Chunk == false
 1352    ->  State = no_chunk
 1353    ;   domain_error(chunk, Chunk)
 1354    ),
 1355    statistics(cputime, Epoch),
 1356    Time = time(Epoch),
 1357    nb_current('$variable_names', Bindings),
 1358    filter_template(Template, Bindings, Template2),
 1359    '$current_typein_module'(CurrTypeIn),
 1360    (   '$set_typein_module'(ID),
 1361        call_cleanup(catch(findnsols_no_empty(State, Template2,
 1362                                              set_projection(Goal, Bindings),
 1363                                              Result),
 1364                           Error, true),
 1365                     query_done(Det, CurrTypeIn)),
 1366        arg(1, Time, T0),
 1367        statistics(cputime, T1),
 1368        CPUTime is T1-T0,
 1369        forall(pengine_flush_output_hook, true),
 1370        (   var(Error)
 1371        ->  projection(Projection),
 1372            (   var(Det)
 1373            ->  pengine_reply(success(ID, Result, Projection,
 1374                                      CPUTime, true)),
 1375                more_solutions(ID, Choice, State, Time)
 1376            ;   !,                      % commit
 1377                destroy_or_continue(success(ID, Result, Projection,
 1378                                            CPUTime, false))
 1379            )
 1380        ;   !,                          % commit
 1381            (   Error == abort_query
 1382            ->  throw(Error)
 1383            ;   destroy_or_continue(error(ID, Error))
 1384            )
 1385        )
 1386    ;   !,                              % commit
 1387        arg(1, Time, T0),
 1388        statistics(cputime, T1),
 1389        CPUTime is T1-T0,
 1390        destroy_or_continue(failure(ID, CPUTime))
 1391    ).
 1392solve(_, _, _, _).                      % leave a choice point
 1393
 1394query_done(true, CurrTypeIn) :-
 1395    '$set_typein_module'(CurrTypeIn).
 1396
 1397
 1398%!  set_projection(:Goal, +Bindings)
 1399%
 1400%   findnsols_no_empty/4  copies  its  goal  and    template   to  avoid
 1401%   instantiation thereof when it stops after finding N solutions. Using
 1402%   this helper we can a renamed version of Bindings that we can set.
 1403
 1404set_projection(Goal, Bindings) :-
 1405    b_setval('$variable_names', Bindings),
 1406    call(Goal).
 1407
 1408projection(Projection) :-
 1409    nb_current('$variable_names', Bindings),
 1410    !,
 1411    maplist(var_name, Bindings, Projection).
 1412projection([]).
 1413
 1414%!  filter_template(+Template0, +Bindings, -Template) is det.
 1415%
 1416%   Establish the final template. This is   there  because hooks such as
 1417%   goal_expansion/2 and the SWISH query  hooks   can  modify the set of
 1418%   bindings.
 1419%
 1420%   @bug Projection and template handling is pretty messy.
 1421
 1422filter_template(Template0, Bindings, Template) :-
 1423    is_dict(Template0, swish_default_template),
 1424    !,
 1425    dict_create(Template, swish_default_template, Bindings).
 1426filter_template(Template, _Bindings, Template).
 1427
 1428findnsols_no_empty(no_chunk, Template, Goal, List) =>
 1429    List = [Template],
 1430    call(Goal).
 1431findnsols_no_empty(State, Template, Goal, List) =>
 1432    findnsols(State, Template, Goal, List),
 1433    List \== [].
 1434
 1435destroy_or_continue(Event) :-
 1436    arg(1, Event, ID),
 1437    (   pengine_property(ID, destroy(true))
 1438    ->  thread_self(Me),
 1439        thread_detach(Me),
 1440        pengine_reply(destroy(ID, Event))
 1441    ;   pengine_reply(Event),
 1442        guarded_main_loop(ID)
 1443    ).
 1444
 1445%!  more_solutions(+Pengine, +Choice, +State, +Time)
 1446%
 1447%   Called after a solution was found while  there can be more. This
 1448%   is state `6' of the state machine. It processes these events:
 1449%
 1450%     * stop
 1451%     Go back via state `7' to state `2' (guarded_main_loop/1)
 1452%     * next
 1453%     Fail.  This causes solve/3 to backtrack on the goal asked,
 1454%     providing at most the current `chunk` solutions.
 1455%     * next(Count)
 1456%     As `next`, but sets the new chunk-size to Count.
 1457%     * ask(Goal, Options)
 1458%     Ask another goal.  Note that we must commit the choice point
 1459%     of the previous goal asked for.
 1460
 1461more_solutions(ID, Choice, State, Time) :-
 1462    pengine_request(Event),
 1463    more_solutions(Event, ID, Choice, State, Time).
 1464
 1465more_solutions(stop, ID, _Choice, _State, _Time) :-
 1466    !,
 1467    debug(pengine(transition), '~q: 6 = ~q => 7', [ID, stop]),
 1468    destroy_or_continue(stop(ID)).
 1469more_solutions(next, ID, _Choice, _State, Time) :-
 1470    !,
 1471    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next]),
 1472    statistics(cputime, T0),
 1473    nb_setarg(1, Time, T0),
 1474    fail.
 1475more_solutions(next(Count), ID, _Choice, State, Time) :-
 1476    Count > 0,
 1477    State = count(_),                   % else fallthrough to protocol error
 1478    !,
 1479    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next(Count)]),
 1480    nb_setarg(1, State, Count),
 1481    statistics(cputime, T0),
 1482    nb_setarg(1, Time, T0),
 1483    fail.
 1484more_solutions(ask(Goal, Options), ID, Choice, _State, _Time) :-
 1485    !,
 1486    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, ask(Goal)]),
 1487    prolog_cut_to(Choice),
 1488    ask(ID, Goal, Options).
 1489more_solutions(destroy, ID, _Choice, _State, _Time) :-
 1490    !,
 1491    debug(pengine(transition), '~q: 6 = ~q => 1', [ID, destroy]),
 1492    pengine_terminate(ID).
 1493more_solutions(Event, ID, Choice, State, Time) :-
 1494    debug(pengine(transition), '~q: 6 = ~q => 6', [ID, protocol_error(Event)]),
 1495    pengine_reply(error(ID, error(protocol_error, _))),
 1496    more_solutions(ID, Choice, State, Time).
 1497
 1498%!  ask(+Pengine, :Goal, +Options)
 1499%
 1500%   Migrate from state `2' to `3'.  This predicate validates that it
 1501%   is safe to call Goal using safe_goal/1 and then calls solve/3 to
 1502%   prove the goal. It takes care of the chunk(N) option.
 1503
 1504ask(ID, Goal, Options) :-
 1505    catch(prepare_goal(ID, Goal, Goal1, Options), Error, true),
 1506    !,
 1507    (   var(Error)
 1508    ->  option(template(Template), Options, Goal),
 1509        option(chunk(N), Options, 1),
 1510        solve(N, Template, Goal1, ID)
 1511    ;   pengine_reply(error(ID, Error)),
 1512        guarded_main_loop(ID)
 1513    ).
 1514
 1515%!  prepare_goal(+Pengine, +GoalIn, -GoalOut, +Options) is det.
 1516%
 1517%   Prepare GoalIn for execution in Pengine.   This  implies we must
 1518%   perform goal expansion and, if the   system  is sandboxed, check
 1519%   the sandbox.
 1520%
 1521%   Note that expand_goal(Module:GoalIn, GoalOut) is  what we'd like
 1522%   to write, but this does not work correctly if the user wishes to
 1523%   expand `X:Y` while interpreting `X` not   as the module in which
 1524%   to run `Y`. This happens in the  CQL package. Possibly we should
 1525%   disallow this reinterpretation?
 1526
 1527prepare_goal(ID, Goal0, Module:Goal, Options) :-
 1528    option(bindings(Bindings), Options, []),
 1529    b_setval('$variable_names', Bindings),
 1530    (   prepare_goal(Goal0, Goal1, Options)
 1531    ->  true
 1532    ;   Goal1 = Goal0
 1533    ),
 1534    get_pengine_module(ID, Module),
 1535    setup_call_cleanup(
 1536        '$set_source_module'(Old, Module),
 1537        expand_goal(Goal1, Goal),
 1538        '$set_source_module'(_, Old)),
 1539    (   pengine_not_sandboxed(ID)
 1540    ->  true
 1541    ;   get_pengine_application(ID, App),
 1542        setting(App:safe_goal_limit, Limit),
 1543        catch(call_with_time_limit(
 1544                  Limit,
 1545                  safe_goal(Module:Goal)), E, true)
 1546    ->  (   var(E)
 1547        ->  true
 1548        ;   E = time_limit_exceeded
 1549        ->  throw(error(sandbox(time_limit_exceeded, Limit),_))
 1550        ;   throw(E)
 1551        )
 1552    ).
 1553
 1554
 1555%%  prepare_goal(+Goal0, -Goal1, +Options) is semidet.
 1556%
 1557%   Pre-preparation hook for running Goal0. The hook runs in the context
 1558%   of the pengine. Goal is the raw   goal  given to _ask_. The returned
 1559%   Goal1 is subject  to  goal   expansion  (expand_goal/2)  and sandbox
 1560%   validation (safe_goal/1) prior to  execution.   If  this goal fails,
 1561%   Goal0 is used for further processing.
 1562%
 1563%   @arg Options provides the options as given to _ask_
 1564
 1565
 1566%%  pengine_not_sandboxed(+Pengine) is semidet.
 1567%
 1568%   True when pengine does not operate in sandboxed mode. This implies a
 1569%   user must be  registered  by   authentication_hook/3  and  the  hook
 1570%   pengines:not_sandboxed(User, Application) must succeed.
 1571
 1572pengine_not_sandboxed(ID) :-
 1573    pengine_user(ID, User),
 1574    pengine_property(ID, application(App)),
 1575    not_sandboxed(User, App),
 1576    !.
 1577
%%  not_sandboxed(+User, +Application) is semidet.
%
%   This hook is called to see whether the Pengine must be executed in a
%   protected environment. It is only called after authentication_hook/3
%   has confirmed the authenticity of the  current  user.  If this hook
%   succeeds, both loading the code and executing the query are executed
%   without enforcing sandbox security.  Typically, one should:
%
%     1. Provide a safe user authentication hook.
%     2. Enable HTTPS in the server or put it behind an HTTPS proxy and
%        ensure that the network between the proxy and the pengine
%        server can be trusted.
 1590
 1591
 1592/** pengine_pull_response(+Pengine, +Options) is det
 1593
 1594Pulls a response (an event term) from the  slave Pengine if Pengine is a
 1595remote process, else does nothing at all.
 1596*/
 1597
 1598pengine_pull_response(Pengine, Options) :-
 1599    pengine_remote(Pengine, Server),
 1600    !,
 1601    remote_pengine_pull_response(Server, Pengine, Options).
 1602pengine_pull_response(_ID, _Options).
 1603
 1604
 1605/** pengine_input(+Prompt, -Term) is det
 1606
 1607Sends Prompt to the master (parent) pengine and waits for input. Note that Prompt may be
 1608any term, compound as well as atomic.
 1609*/
 1610
 1611pengine_input(Prompt, Term) :-
 1612    pengine_self(Self),
 1613    pengine_parent(Parent),
 1614    pengine_reply(Parent, prompt(Self, Prompt)),
 1615    pengine_request(Request),
 1616    (   Request = input(Input)
 1617    ->  Term = Input
 1618    ;   Request == destroy
 1619    ->  abort
 1620    ;   throw(error(protocol_error,_))
 1621    ).
 1622
 1623
 1624/** pengine_respond(+Pengine, +Input, +Options) is det
 1625
 1626Sends a response in the form of the term Input to a slave (child) pengine
 1627that has prompted its master (parent) for input.
 1628
 1629Defined in terms of pengine_send/3, as follows:
 1630
 1631==
 1632pengine_respond(Pengine, Input, Options) :-
 1633    pengine_send(Pengine, input(Input), Options).
 1634==
 1635
 1636*/
 1637
 1638pengine_respond(Pengine, Input, Options) :-
 1639    pengine_send(Pengine, input(Input), Options).
 1640
 1641
 1642%!  send_error(+Error) is det.
 1643%
 1644%   Send an error to my parent.   Remove non-readable blobs from the
 1645%   error term first using replace_blobs/2. If  the error contains a
 1646%   stack-trace, this is resolved to a string before sending.
 1647
 1648send_error(error(Formal, context(prolog_stack(Frames), Message))) :-
 1649    is_list(Frames),
 1650    !,
 1651    with_output_to(string(Stack),
 1652                   print_prolog_backtrace(current_output, Frames)),
 1653    pengine_self(Self),
 1654    replace_blobs(Formal, Formal1),
 1655    replace_blobs(Message, Message1),
 1656    pengine_reply(error(Self, error(Formal1,
 1657                                    context(prolog_stack(Stack), Message1)))).
 1658send_error(Error) :-
 1659    pengine_self(Self),
 1660    replace_blobs(Error, Error1),
 1661    pengine_reply(error(Self, Error1)).
 1662
 1663%!  replace_blobs(Term0, Term) is det.
 1664%
 1665%   Copy Term0 to Term, replacing non-text   blobs. This is required
 1666%   for error messages that may hold   streams  and other handles to
 1667%   non-readable objects.
 1668
 1669replace_blobs(Blob, Atom) :-
 1670    blob(Blob, Type), Type \== text,
 1671    !,
 1672    format(atom(Atom), '~p', [Blob]).
 1673replace_blobs(Term0, Term) :-
 1674    compound(Term0),
 1675    !,
 1676    compound_name_arguments(Term0, Name, Args0),
 1677    maplist(replace_blobs, Args0, Args),
 1678    compound_name_arguments(Term, Name, Args).
 1679replace_blobs(Term, Term).
 1680
 1681
 1682/*================= Remote pengines =======================
 1683*/
 1684
 1685
 1686remote_pengine_create(BaseURL, Options) :-
 1687    partition(pengine_create_option, Options, PengineOptions0, RestOptions),
 1688    (   option(ask(Query), PengineOptions0),
 1689        \+ option(template(_Template), PengineOptions0)
 1690    ->  PengineOptions = [template(Query)|PengineOptions0]
 1691    ;   PengineOptions = PengineOptions0
 1692    ),
 1693    options_to_dict(PengineOptions, PostData),
 1694    remote_post_rec(BaseURL, create, PostData, Reply, RestOptions),
 1695    arg(1, Reply, ID),
 1696    (   option(id(ID2), Options)
 1697    ->  ID = ID2
 1698    ;   true
 1699    ),
 1700    option(alias(Name), Options, ID),
 1701    assert(child(Name, ID)),
 1702    (   (   functor(Reply, create, _)   % actually created
 1703        ;   functor(Reply, output, _)   % compiler messages
 1704        )
 1705    ->  option(application(Application), PengineOptions, pengine_sandbox),
 1706        option(destroy(Destroy), PengineOptions, true),
 1707        pengine_register_remote(ID, BaseURL, Application, Destroy)
 1708    ;   true
 1709    ),
 1710    thread_self(Queue),
 1711    pengine_reply(Queue, Reply).
 1712
 1713options_to_dict(Options, Dict) :-
 1714    select_option(ask(Ask), Options, Options1),
 1715    select_option(template(Template), Options1, Options2),
 1716    !,
 1717    no_numbered_var_in(Ask+Template),
 1718    findall(AskString-TemplateString,
 1719            ask_template_to_strings(Ask, Template, AskString, TemplateString),
 1720            [ AskString-TemplateString ]),
 1721    options_to_dict(Options2, Dict0),
 1722    Dict = Dict0.put(_{ask:AskString,template:TemplateString}).
 1723options_to_dict(Options, Dict) :-
 1724    maplist(prolog_option, Options, Options1),
 1725    dict_create(Dict, _, Options1).
 1726
 1727no_numbered_var_in(Term) :-
 1728    sub_term(Sub, Term),
 1729    subsumes_term('$VAR'(_), Sub),
 1730    !,
 1731    domain_error(numbered_vars_free_term, Term).
 1732no_numbered_var_in(_).
 1733
 1734ask_template_to_strings(Ask, Template, AskString, TemplateString) :-
 1735    numbervars(Ask+Template, 0, _),
 1736    WOpts = [ numbervars(true), ignore_ops(true), quoted(true) ],
 1737    format(string(AskTemplate), '~W\n~W', [ Ask, WOpts,
 1738                                            Template, WOpts
 1739                                          ]),
 1740    split_string(AskTemplate, "\n", "", [AskString, TemplateString]).
 1741
 1742prolog_option(Option0, Option) :-
 1743    create_option_type(Option0, term),
 1744    !,
 1745    Option0 =.. [Name,Value],
 1746    format(string(String), '~k', [Value]),
 1747    Option =.. [Name,String].
 1748prolog_option(Option, Option).
 1749
 1750create_option_type(ask(_),         term).
 1751create_option_type(template(_),    term).
 1752create_option_type(application(_), atom).
 1753
 1754remote_pengine_send(BaseURL, ID, Event, Options) :-
 1755    remote_send_rec(BaseURL, send, ID, [event=Event], Reply, Options),
 1756    thread_self(Queue),
 1757    pengine_reply(Queue, Reply).
 1758
 1759remote_pengine_pull_response(BaseURL, ID, Options) :-
 1760    remote_send_rec(BaseURL, pull_response, ID, [], Reply, Options),
 1761    thread_self(Queue),
 1762    pengine_reply(Queue, Reply).
 1763
 1764remote_pengine_abort(BaseURL, ID, Options) :-
 1765    remote_send_rec(BaseURL, abort, ID, [], Reply, Options),
 1766    thread_self(Queue),
 1767    pengine_reply(Queue, Reply).
 1768
 1769%!  remote_send_rec(+Server, +Action, +ID, +Params, -Reply, +Options)
 1770%
 1771%   Issue a GET request on Server and   unify Reply with the replied
 1772%   term.
 1773
 1774remote_send_rec(Server, Action, ID, [event=Event], Reply, Options) :-
 1775    !,
 1776    server_url(Server, Action, [id=ID], URL),
 1777    http_open(URL, Stream,              % putting this in setup_call_cleanup/3
 1778              [ post(prolog(Event))     % makes it impossible to interrupt.
 1779              | Options
 1780              ]),
 1781    call_cleanup(
 1782        read_prolog_reply(Stream, Reply),
 1783        close(Stream)).
 1784remote_send_rec(Server, Action, ID, Params, Reply, Options) :-
 1785    server_url(Server, Action, [id=ID|Params], URL),
 1786    http_open(URL, Stream, Options),
 1787    call_cleanup(
 1788        read_prolog_reply(Stream, Reply),
 1789        close(Stream)).
 1790
 1791remote_post_rec(Server, Action, Data, Reply, Options) :-
 1792    server_url(Server, Action, [], URL),
 1793    probe(Action, URL, Options),
 1794    http_open(URL, Stream,
 1795              [ post(json(Data))
 1796              | Options
 1797              ]),
 1798    call_cleanup(
 1799        read_prolog_reply(Stream, Reply),
 1800        close(Stream)).
 1801
 1802%!  probe(+Action, +URL) is det.
 1803%
 1804%   Probe the target. This is a  good   idea  before posting a large
 1805%   document and be faced with an authentication challenge. Possibly
 1806%   we should make this an option for simpler scenarios.
 1807
 1808probe(create, URL, Options) :-
 1809    !,
 1810    http_open(URL, Stream, [method(options)|Options]),
 1811    close(Stream).
 1812probe(_, _, _).
 1813
 1814read_prolog_reply(In, Reply) :-
 1815    set_stream(In, encoding(utf8)),
 1816    read(In, Reply0),
 1817    rebind_cycles(Reply0, Reply).
 1818
 1819rebind_cycles(@(Reply, Bindings), Reply) :-
 1820    is_list(Bindings),
 1821    !,
 1822    maplist(bind, Bindings).
 1823rebind_cycles(Reply, Reply).
 1824
 1825bind(Var = Value) :-
 1826    Var = Value.
 1827
 1828server_url(Server, Action, Params, URL) :-
 1829    atom_concat('pengine/', Action, PAction),
 1830    uri_edit([ path(PAction),
 1831               search(Params)
 1832             ], Server, URL).
 1833
 1834
 1835/** pengine_event(?EventTerm) is det.
 1836    pengine_event(?EventTerm, +Options) is det.
 1837
 1838Examines the pengine's event queue  and   if  necessary blocks execution
 1839until a term that unifies to Term  arrives   in  the queue. After a term
 1840from the queue has been unified to Term,   the  term is deleted from the
 1841queue.
 1842
 1843   Valid options are:
 1844
 1845   * timeout(+Time)
 1846     Time is a float or integer and specifies the maximum time to wait
 1847     in seconds. If no event has arrived before the time is up EventTerm
 1848     is bound to the atom =timeout=.
 1849   * listen(+Id)
 1850     Only listen to events from the pengine identified by Id.
 1851*/
 1852
 1853pengine_event(Event) :-
 1854    pengine_event(Event, []).
 1855
 1856pengine_event(Event, Options) :-
 1857    thread_self(Self),
 1858    option(listen(Id), Options, _),
 1859    (   thread_get_message(Self, pengine_event(Id, Event), Options)
 1860    ->  true
 1861    ;   Event = timeout
 1862    ),
 1863    update_remote_destroy(Event).
 1864
 1865update_remote_destroy(Event) :-
 1866    destroy_event(Event),
 1867    arg(1, Event, Id),
 1868    pengine_remote(Id, _Server),
 1869    !,
 1870    pengine_unregister_remote(Id).
 1871update_remote_destroy(_).
 1872
 1873destroy_event(destroy(_)).
 1874destroy_event(destroy(_,_)).
 1875destroy_event(create(_,Features)) :-
 1876    memberchk(answer(Answer), Features),
 1877    !,
 1878    nonvar(Answer),
 1879    destroy_event(Answer).
 1880
 1881
 1882/** pengine_event_loop(:Closure, +Options) is det
 1883
 1884Starts an event loop accepting event terms   sent to the current pengine
 1885or thread. For each such  event   E,  calls  ignore(call(Closure, E)). A
 1886closure thus acts as a _handler_  for   the  event. Some events are also
 1887treated specially:
 1888
 1889   * create(ID, Term)
 1890     The ID is placed in a list of active pengines.
 1891
 1892   * destroy(ID)
 1893     The ID is removed from the list of active pengines. When the last
 1894     pengine ID is removed, the loop terminates.
 1895
 1896   * output(ID, Term)
 1897     The predicate pengine_pull_response/2 is called.
 1898
 1899Valid options are:
 1900
 1901   * autoforward(+To)
 1902     Forwards received event terms to slaves. To is either =all=,
 1903     =all_but_sender= or a Prolog list of NameOrIDs. [not yet
 1904     implemented]
 1905
 1906*/
 1907
 1908pengine_event_loop(Closure, Options) :-
 1909    child(_,_),
 1910    !,
 1911    pengine_event(Event),
 1912    (   option(autoforward(all), Options) % TODO: Implement all_but_sender and list of IDs
 1913    ->  forall(child(_,ID), pengine_send(ID, Event))
 1914    ;   true
 1915    ),
 1916    pengine_event_loop(Event, Closure, Options).
 1917pengine_event_loop(_, _).
 1918
 1919:- meta_predicate
 1920    pengine_process_event(+, 1, -, +). 1921
 1922pengine_event_loop(Event, Closure, Options) :-
 1923    pengine_process_event(Event, Closure, Continue, Options),
 1924    (   Continue == true
 1925    ->  pengine_event_loop(Closure, Options)
 1926    ;   true
 1927    ).
 1928
 1929pengine_process_event(create(ID, T), Closure, Continue, Options) :-
 1930    debug(pengine(transition), '~q: 1 = /~q => 2', [ID, create(T)]),
 1931    (   select(answer(First), T, T1)
 1932    ->  ignore(call(Closure, create(ID, T1))),
 1933        pengine_process_event(First, Closure, Continue, Options)
 1934    ;   ignore(call(Closure, create(ID, T))),
 1935        Continue = true
 1936    ).
 1937pengine_process_event(output(ID, Msg), Closure, true, _Options) :-
 1938    debug(pengine(transition), '~q: 3 = /~q => 4', [ID, output(Msg)]),
 1939    ignore(call(Closure, output(ID, Msg))),
 1940    pengine_pull_response(ID, []).
 1941pengine_process_event(debug(ID, Msg), Closure, true, _Options) :-
 1942    debug(pengine(transition), '~q: 3 = /~q => 4', [ID, debug(Msg)]),
 1943    ignore(call(Closure, debug(ID, Msg))),
 1944    pengine_pull_response(ID, []).
 1945pengine_process_event(prompt(ID, Term), Closure, true, _Options) :-
 1946    debug(pengine(transition), '~q: 3 = /~q => 5', [ID, prompt(Term)]),
 1947    ignore(call(Closure, prompt(ID, Term))).
 1948pengine_process_event(success(ID, Sol, _Proj, _Time, More), Closure, true, _) :-
 1949    debug(pengine(transition), '~q: 3 = /~q => 6/2', [ID, success(Sol, More)]),
 1950    ignore(call(Closure, success(ID, Sol, More))).
 1951pengine_process_event(failure(ID, _Time), Closure, true, _Options) :-
 1952    debug(pengine(transition), '~q: 3 = /~q => 2', [ID, failure]),
 1953    ignore(call(Closure, failure(ID))).
 1954pengine_process_event(error(ID, Error), Closure, Continue, _Options) :-
 1955    debug(pengine(transition), '~q: 3 = /~q => 2', [ID, error(Error)]),
 1956    (   call(Closure, error(ID, Error))
 1957    ->  Continue = true
 1958    ;   forall(child(_,Child), pengine_destroy(Child)),
 1959        throw(Error)
 1960    ).
 1961pengine_process_event(stop(ID), Closure, true, _Options) :-
 1962    debug(pengine(transition), '~q: 7 = /~q => 2', [ID, stop]),
 1963    ignore(call(Closure, stop(ID))).
 1964pengine_process_event(destroy(ID, Event), Closure, Continue, Options) :-
 1965    pengine_process_event(Event, Closure, _, Options),
 1966    pengine_process_event(destroy(ID), Closure, Continue, Options).
 1967pengine_process_event(destroy(ID), Closure, true, _Options) :-
 1968    retractall(child(_,ID)),
 1969    debug(pengine(transition), '~q: 1 = /~q => 0', [ID, destroy]),
 1970    ignore(call(Closure, destroy(ID))).
 1971
 1972
 1973/** pengine_rpc(+URL, +Query) is nondet.
 1974    pengine_rpc(+URL, +Query, +Options) is nondet.
 1975
 1976Semantically equivalent to the sequence below,  except that the query is
 1977executed in (and in the Prolog context   of) the pengine server referred
 1978to by URL, rather than locally.
 1979
 1980  ==
 1981    copy_term_nat(Query, Copy),  % attributes are not copied to the server
 1982    call(Copy),			 % executed on server at URL
 1983    Query = Copy.
 1984  ==
 1985
 1986Valid options are:
 1987
 1988    - chunk(+IntegerOrFalse)
 1989      Can be used to reduce the number of network roundtrips being made.
 1990      See pengine_ask/3.
 1991    - timeout(+Time)
 1992      Wait at most Time seconds for the next event from the server.
 1993      The default is defined by the setting `pengines:time_limit`.
 1994
 1995Remaining  options  (except   the   server    option)   are   passed  to
 1996pengine_create/1.
 1997*/
 1998
 1999pengine_rpc(URL, Query) :-
 2000    pengine_rpc(URL, Query, []).
 2001
 2002pengine_rpc(URL, Query, M:Options0) :-
 2003    translate_local_sources(Options0, Options1, M),
 2004    (  option(timeout(_), Options1)
 2005    -> Options = Options1
 2006    ;  setting(time_limit, Limit),
 2007       Options = [timeout(Limit)|Options1]
 2008    ),
 2009    term_variables(Query, Vars),
 2010    Template =.. [v|Vars],
 2011    State = destroy(true),              % modified by process_event/4
 2012    setup_call_catcher_cleanup(
 2013        pengine_create([ ask(Query),
 2014                         template(Template),
 2015                         server(URL),
 2016                         id(Id)
 2017                       | Options
 2018                       ]),
 2019        wait_event(Template, State, [listen(Id)|Options]),
 2020        Why,
 2021        pengine_destroy_and_wait(State, Id, Why, Options)).
 2022
 2023pengine_destroy_and_wait(destroy(true), Id, Why, Options) :-
 2024    !,
 2025    debug(pengine(rpc), 'Destroying RPC because of ~p', [Why]),
 2026    pengine_destroy(Id, Options),
 2027    wait_destroy(Id, 10).
 2028pengine_destroy_and_wait(_, _, Why, _) :-
 2029    debug(pengine(rpc), 'Not destroying RPC (~p)', [Why]).
 2030
 2031wait_destroy(Id, _) :-
 2032    \+ child(_, Id),
 2033    !.
 2034wait_destroy(Id, N) :-
 2035    pengine_event(Event, [listen(Id),timeout(10)]),
 2036    !,
 2037    (   destroy_event(Event)
 2038    ->  retractall(child(_,Id))
 2039    ;   succ(N1, N)
 2040    ->  wait_destroy(Id, N1)
 2041    ;   debug(pengine(rpc), 'RPC did not answer to destroy ~p', [Id]),
 2042        pengine_unregister_remote(Id),
 2043        retractall(child(_,Id))
 2044    ).
 2045
 2046wait_event(Template, State, Options) :-
 2047    pengine_event(Event, Options),
 2048    debug(pengine(event), 'Received ~p', [Event]),
 2049    process_event(Event, Template, State, Options).
 2050
 2051process_event(create(_ID, Features), Template, State, Options) :-
 2052    memberchk(answer(First), Features),
 2053    process_event(First, Template, State, Options).
 2054process_event(error(_ID, Error), _Template, _, _Options) :-
 2055    throw(Error).
 2056process_event(failure(_ID, _Time), _Template, _, _Options) :-
 2057    fail.
 2058process_event(prompt(ID, Prompt), Template, State, Options) :-
 2059    pengine_rpc_prompt(ID, Prompt, Reply),
 2060    pengine_send(ID, input(Reply)),
 2061    wait_event(Template, State, Options).
 2062process_event(output(ID, Term), Template, State, Options) :-
 2063    pengine_rpc_output(ID, Term),
 2064    pengine_pull_response(ID, Options),
 2065    wait_event(Template, State, Options).
 2066process_event(debug(ID, Message), Template, State, Options) :-
 2067    debug(pengine(debug), '~w', [Message]),
 2068    pengine_pull_response(ID, Options),
 2069    wait_event(Template, State, Options).
 2070process_event(success(_ID, Solutions, _Proj, _Time, false),
 2071              Template, _, _Options) :-
 2072    !,
 2073    member(Template, Solutions).
 2074process_event(success(ID, Solutions, _Proj, _Time, true),
 2075              Template, State, Options) :-
 2076    (   member(Template, Solutions)
 2077    ;   pengine_next(ID, Options),
 2078        wait_event(Template, State, Options)
 2079    ).
 2080process_event(destroy(ID, Event), Template, State, Options) :-
 2081    !,
 2082    retractall(child(_,ID)),
 2083    nb_setarg(1, State, false),
 2084    debug(pengine(destroy), 'State: ~p~n', [State]),
 2085    process_event(Event, Template, State, Options).
 2086% compatibility with older versions of the protocol.
 2087process_event(success(ID, Solutions, Time, More),
 2088              Template, State, Options) :-
 2089    process_event(success(ID, Solutions, _Proj, Time, More),
 2090                  Template, State, Options).
 2091
 2092
 2093pengine_rpc_prompt(ID, Prompt, Term) :-
 2094    prompt(ID, Prompt, Term0),
 2095    !,
 2096    Term = Term0.
 2097pengine_rpc_prompt(_ID, Prompt, Term) :-
 2098    setup_call_cleanup(
 2099        prompt(Old, Prompt),
 2100        read(Term),
 2101        prompt(_, Old)).
 2102
 2103pengine_rpc_output(ID, Term) :-
 2104    output(ID, Term),
 2105    !.
 2106pengine_rpc_output(_ID, Term) :-
 2107    print(Term).
 2108
 2109%%  prompt(+ID, +Prompt, -Term) is semidet.
 2110%
 2111%   Hook to handle pengine_input/2 from the remote pengine. If the hooks
 2112%   fails, pengine_rpc/3 calls read/1 using the current prompt.
 2113
 2114:- multifile prompt/3. 2115
 2116%%  output(+ID, +Term) is semidet.
 2117%
 2118%   Hook to handle pengine_output/1 from the remote pengine. If the hook
 2119%   fails, it calls print/1 on Term.
 2120
 2121:- multifile output/2. 2122
 2123
 2124/*================= HTTP handlers =======================
 2125*/
 2126
%   Declare the HTTP locations we serve and how. Note that we use
%   time_limit(infinite) because pengines have their own timeout. Also
%   note that we use spawn. This is needed because we can easily get
%   many clients waiting for some action on a pengine to complete.
%   Without spawning, we would quickly exhaust the worker pool of the
%   HTTP server.
%
%   FIXME: probably we should wait for a short time for the pengine on
%   the default worker thread. Only if that time has expired should we
%   call http_spawn/2 to continue waiting on a new thread. That would
%   improve the performance and reduce the usage of threads.
 2138
 2139:- multifile http:location/3. 2140http:location(pengine, root(pengine), [-100]).
 2141
 2142:- http_handler(pengine(.),             http_404([]),
 2143                [ id(pengines) ]). 2144:- http_handler(pengine(create),        http_pengine_create,
 2145                [ time_limit(infinite), spawn([]) ]). 2146:- http_handler(pengine(send),          http_pengine_send,
 2147                [ time_limit(infinite), spawn([]) ]). 2148:- http_handler(pengine(pull_response), http_pengine_pull_response,
 2149                [ time_limit(infinite), spawn([]) ]). 2150:- http_handler(pengine(abort),         http_pengine_abort,         []). 2151:- http_handler(pengine(detach),        http_pengine_detach,        []). 2152:- http_handler(pengine(list),          http_pengine_list,          []). 2153:- http_handler(pengine(ping),          http_pengine_ping,          []). 2154:- http_handler(pengine(destroy_all),   http_pengine_destroy_all,   []). 2155
 2156:- http_handler(pengine('pengines.js'),
 2157                http_reply_file(library('http/web/js/pengines.js'), []), []). 2158:- http_handler(pengine('plterm.css'),
 2159                http_reply_file(library('http/web/css/plterm.css'), []), []). 2160
 2161
 2162%%  http_pengine_create(+Request)
 2163%
 2164%   HTTP POST handler  for  =/pengine/create=.   This  API  accepts  the
 2165%   pengine  creation  parameters  both  as  =application/json=  and  as
 2166%   =www-form-encoded=.  Accepted parameters:
 2167%
 2168%     | **Parameter** | **Default**       | **Comment**                |
 2169%     |---------------|-------------------|----------------------------|
 2170%     | format        | `prolog`          | Output format              |
 2171%     | application   | `pengine_sandbox` | Pengine application        |
 2172%     | chunk         | 1                 | Chunk-size for results     |
 2173%     | collate       | 0 (off)           | Join output events         |
 2174%     | solutions     | chunked           | If `all`, emit all results |
 2175%     | ask           | -                 | The query                  |
 2176%     | template      | -                 | Output template            |
 2177%     | src_text      | ""                | Program                    |
 2178%     | src_url       | -                 | Program to download        |
 2179%     | disposition   | -                 | Download location          |
 2180%
 2181%     Note that solutions=all internally  uses   chunking  to obtain the
 2182%     results from the pengine, but the results are combined in a single
 2183%     HTTP reply. This is currently only  implemented by the CSV backend
 2184%     that is part of SWISH for   downloading unbounded result sets with
 2185%     limited memory resources.
 2186%
 2187%     Using  `chunk=false`  simulates  the   _recursive  toplevel_.  See
 2188%     pengine_ask/3.
 2189
 2190http_pengine_create(Request) :-
 2191    reply_options(Request, [post]),
 2192    !.
 2193http_pengine_create(Request) :-
 2194    memberchk(content_type(CT), Request),
 2195    sub_atom(CT, 0, _, _, 'application/json'),
 2196    !,
 2197    http_read_json_dict(Request, Dict),
 2198    dict_atom_option(format, Dict, Format, prolog),
 2199    dict_atom_option(application, Dict, Application, pengine_sandbox),
 2200    http_pengine_create(Request, Application, Format, Dict).
 2201http_pengine_create(Request) :-
 2202    Optional = [optional(true)],
 2203    OptString = [string|Optional],
 2204    Form = [ format(Format, [default(prolog)]),
 2205             application(Application, [default(pengine_sandbox)]),
 2206             chunk(_, [nonneg;oneof([false]), default(1)]),
 2207             collate(_, [number, default(0)]),
 2208             solutions(_, [oneof([all,chunked]), default(chunked)]),
 2209             ask(_, OptString),
 2210             template(_, OptString),
 2211             src_text(_, OptString),
 2212             disposition(_, OptString),
 2213             src_url(_, Optional)
 2214           ],
 2215    http_parameters(Request, Form),
 2216    form_dict(Form, Dict),
 2217    http_pengine_create(Request, Application, Format, Dict).
 2218
 2219dict_atom_option(Key, Dict, Atom, Default) :-
 2220    (   get_dict(Key, Dict, String)
 2221    ->  atom_string(Atom, String)
 2222    ;   Atom = Default
 2223    ).
 2224
 2225form_dict(Form, Dict) :-
 2226    form_values(Form, Pairs),
 2227    dict_pairs(Dict, _, Pairs).
 2228
 2229form_values([], []).
 2230form_values([H|T], Pairs) :-
 2231    arg(1, H, Value),
 2232    nonvar(Value),
 2233    !,
 2234    functor(H, Name, _),
 2235    Pairs = [Name-Value|PairsT],
 2236    form_values(T, PairsT).
 2237form_values([_|T], Pairs) :-
 2238    form_values(T, Pairs).
 2239
 2240%!  http_pengine_create(+Request, +Application, +Format, +OptionsDict)
 2241
 2242
 2243http_pengine_create(Request, Application, Format, Dict) :-
 2244    current_application(Application),
 2245    !,
 2246    allowed(Request, Application),
 2247    authenticate(Request, Application, UserOptions),
 2248    dict_to_options(Dict, Application, CreateOptions0),
 2249    append(UserOptions, CreateOptions0, CreateOptions),
 2250    pengine_uuid(Pengine),
 2251    message_queue_create(Queue, [max_size(25)]),
 2252    setting(Application:time_limit, TimeLimit),
 2253    get_time(Now),
 2254    asserta(pengine_queue(Pengine, Queue, TimeLimit, Now)),
 2255    broadcast(pengine(create(Pengine, Application, CreateOptions))),
 2256    create(Queue, Pengine, CreateOptions, http, Application),
 2257    create_wait_and_output_result(Pengine, Queue, Format,
 2258                                  TimeLimit, Dict),
 2259    gc_abandoned_queues.
 2260http_pengine_create(_Request, Application, Format, _Dict) :-
 2261    Error = existence_error(pengine_application, Application),
 2262    pengine_uuid(ID),
 2263    output_result(ID, Format, error(ID, error(Error, _))).
 2264
 2265
 2266dict_to_options(Dict, Application, CreateOptions) :-
 2267    dict_pairs(Dict, _, Pairs),
 2268    pairs_create_options(Pairs, Application, CreateOptions).
 2269
 2270pairs_create_options([], _, []) :- !.
 2271pairs_create_options([N-V0|T0], App, [Opt|T]) :-
 2272    Opt =.. [N,V],
 2273    pengine_create_option(Opt), N \== user,
 2274    !,
 2275    (   create_option_type(Opt, atom)
 2276    ->  atom_string(V, V0)               % term creation must be done if
 2277    ;   V = V0                           % we created the source and know
 2278    ),                                   % the operators.
 2279    pairs_create_options(T0, App, T).
 2280pairs_create_options([_|T0], App, T) :-
 2281    pairs_create_options(T0, App, T).
 2282
 2283%!  wait_and_output_result(+Pengine, +Queue,
 2284%!                         +Format, +TimeLimit, +Collate) is det.
 2285%
 2286%   Wait for the Pengine's Queue and if  there is a message, send it
 2287%   to the requester using  output_result/1.   If  Pengine  does not
 2288%   answer within the time specified   by  the setting =time_limit=,
 2289%   Pengine is aborted and the  result is error(time_limit_exceeded,
 2290%   _).
 2291
 2292wait_and_output_result(Pengine, Queue, Format, TimeLimit, Collate0) :-
 2293    Collate is min(Collate0, TimeLimit/10),
 2294    get_time(Epoch),
 2295    (   catch(thread_get_message(Queue, pengine_event(_, Event),
 2296                                 [ timeout(TimeLimit)
 2297                                 ]),
 2298              Error, true)
 2299    ->  (   var(Error)
 2300        ->  debug(pengine(wait), 'Got ~q from ~q', [Event, Queue]),
 2301            (   collating_event(Collate, Event)
 2302            ->  Deadline is Epoch+TimeLimit,
 2303                collect_events(Pengine, Collate, Queue, Deadline, 100, More),
 2304                Events = [Event|More],
 2305                ignore(destroy_queue_from_http(Pengine, Events, Queue)),
 2306                protect_pengine(Pengine, output_result(Pengine, Format, Events))
 2307            ;   ignore(destroy_queue_from_http(Pengine, Event, Queue)),
 2308                protect_pengine(Pengine, output_result(Pengine, Format, Event))
 2309            )
 2310        ;   output_result(Pengine, Format, died(Pengine))
 2311        )
 2312    ;   time_limit_exceeded(Pengine, Format)
 2313    ).
 2314
 2315%!  collect_events(+Pengine, +CollateTime, +Queue, +Deadline, +Max, -Events)
 2316%
 2317%   Collect more events as long as they   are not separated by more than
 2318%   CollateTime seconds and collect at most Max.
 2319
 2320collect_events(_Pengine, _Collate, _Queue, _Deadline, 0, []) :-
 2321    !.
 2322collect_events(Pengine, Collate, Queue, Deadline, Max, Events) :-
 2323    debug(pengine(wait), 'Waiting to collate events', []),
 2324    (   catch(thread_get_message(Queue, pengine_event(_, Event),
 2325                                 [ timeout(Collate)
 2326                                 ]),
 2327              Error, true)
 2328    ->  (   var(Error)
 2329        ->  debug(pengine(wait), 'Got ~q from ~q', [Event, Queue]),
 2330            Events = [Event|More],
 2331            (   collating_event(Collate, Event)
 2332            ->  Max2 is Max - 1,
 2333                collect_events(Pengine, Collate, Queue, Deadline, Max2, More)
 2334            ;   More = []
 2335            )
 2336        ;   Events = [died(Pengine)]
 2337        )
 2338    ;   get_time(Now),
 2339        Now > Deadline
 2340    ->  time_limit_event(Pengine, TimeLimitEvent),
 2341        Events = [TimeLimitEvent]
 2342    ;   Events = []
 2343    ).
 2344
 2345collating_event(0, _) :-
 2346    !,
 2347    fail.
 2348collating_event(_, output(_,_)).
 2349
 2350%!  create_wait_and_output_result(+Pengine, +Queue, +Format,
 2351%!                                +TimeLimit, +Dict) is det.
 2352%
 2353%   Intercepts  the  `solutions=all'  case    used  for  downloading
 2354%   results. Dict may contain a  `disposition`   key  to  denote the
 2355%   download location.
 2356
 2357create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, Dict) :-
 2358    get_dict(solutions, Dict, all),
 2359    !,
 2360    between(1, infinite, Page),
 2361    (   catch(thread_get_message(Queue, pengine_event(_, Event),
 2362                                 [ timeout(TimeLimit)
 2363                                 ]),
 2364              Error, true)
 2365    ->  (   var(Error)
 2366        ->  debug(pengine(wait), 'Page ~D: got ~q from ~q', [Page, Event, Queue]),
 2367            (   destroy_queue_from_http(Pengine, Event, Queue)
 2368            ->  !,
 2369                protect_pengine(Pengine,
 2370                                output_result_2(Format, page(Page, Event), Dict))
 2371            ;   is_more_event(Event)
 2372            ->  pengine_thread(Pengine, Thread),
 2373                thread_send_message(Thread, pengine_request(next)),
 2374                protect_pengine(Pengine,
 2375                                output_result_2(Format, page(Page, Event), Dict)),
 2376                fail
 2377            ;   !,
 2378                protect_pengine(Pengine,
 2379                                output_result_2(Format, page(Page, Event), Dict))
 2380            )
 2381        ;   !, output_result(Pengine, Format, died(Pengine))
 2382        )
 2383    ;   !, time_limit_exceeded(Pengine, Format)
 2384    ),
 2385    !.
 2386create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, Dict) :-
 2387    wait_and_output_result(Pengine, Queue, Format, TimeLimit, Dict.get(collate,0)).
 2388
 2389is_more_event(success(_Id, _Answers, _Projection, _Time, true)).
 2390is_more_event(create(_, Options)) :-
 2391    memberchk(answer(Event), Options),
 2392    is_more_event(Event).
 2393
 2394
 2395
 2396%!  time_limit_exceeded(+Pengine, +Format)
 2397%
 2398%   The Pengine did not reply within its time limit. Send a reply to the
 2399%   client in the requested format and interrupt the Pengine.
 2400%
 2401%   @bug Ideally, if the Pengine has `destroy` set to `false`, we should
 2402%   get the Pengine back to its main   loop.  Unfortunately we only have
 2403%   normal exceptions that may be  caught   by  the  Pengine and `abort`
 2404%   which cannot be caught and thus destroys the Pengine.
 2405
 2406time_limit_exceeded(Pengine, Format) :-
 2407    time_limit_event(Pengine, Event),
 2408    call_cleanup(
 2409        pengine_destroy(Pengine, [force(true)]),
 2410        output_result(Pengine, Format, Event)).
 2411
 2412time_limit_event(Pengine,
 2413                 destroy(Pengine, error(Pengine, time_limit_exceeded))).
 2414
 2415destroy_pengine_after_output(Pengine, Events) :-
 2416    is_list(Events),
 2417    last(Events, Last),
 2418    time_limit_event(Pengine,  Last),
 2419    !,
 2420    catch(ignore(pengine_destroy(Pengine, [force(true)])), error(_,_), true).
 2421destroy_pengine_after_output(_, _).
 2422
 2423
 2424%!  destroy_queue_from_http(+Pengine, +Event, +Queue) is semidet.
 2425%
 2426%   Consider destroying the output queue   for Pengine after sending
 2427%   Event back to the HTTP client. We can destroy the queue if
 2428%
 2429%     - The pengine already died (output_queue/3 is present) and
 2430%       the queue is empty.
 2431%     - This is a final (destroy) event.
 2432%
 2433%   @tbd    If the client did not request all output, the queue will
 2434%           not be destroyed.  We need some timeout and GC for that.
 2435
 2436destroy_queue_from_http(ID, _, Queue) :-
 2437    output_queue(ID, Queue, _),
 2438    !,
 2439    destroy_queue_if_empty(Queue).
 2440destroy_queue_from_http(ID, Event, Queue) :-
 2441    debug(pengine(destroy), 'DESTROY? ~p', [Event]),
 2442    is_destroy_event(Event),
 2443    !,
 2444    message_queue_property(Queue, size(Waiting)),
 2445    debug(pengine(destroy), 'Destroy ~p (waiting ~D)', [Queue, Waiting]),
 2446    with_mutex(pengine, sync_destroy_queue_from_http(ID, Queue)).
 2447
 2448is_destroy_event(destroy(_)).
 2449is_destroy_event(destroy(_,_)).
 2450is_destroy_event(create(_, Options)) :-
 2451    memberchk(answer(Event), Options),
 2452    is_destroy_event(Event).
 2453
 2454destroy_queue_if_empty(Queue) :-
 2455    thread_peek_message(Queue, _),
 2456    !.
 2457destroy_queue_if_empty(Queue) :-
 2458    retractall(output_queue(_, Queue, _)),
 2459    message_queue_destroy(Queue).
 2460
 2461%!  gc_abandoned_queues
 2462%
 2463%   Check whether there are queues  that   have  been abadoned. This
 2464%   happens if the stream contains output events and not all of them
 2465%   are read by the client.
 2466
 2467:- dynamic
 2468    last_gc/1. 2469
 2470gc_abandoned_queues :-
 2471    consider_queue_gc,
 2472    !,
 2473    get_time(Now),
 2474    (   output_queue(_, Queue, Time),
 2475        Now-Time > 15*60,
 2476        retract(output_queue(_, Queue, Time)),
 2477        message_queue_destroy(Queue),
 2478        fail
 2479    ;   retractall(last_gc(_)),
 2480        asserta(last_gc(Now))
 2481    ).
 2482gc_abandoned_queues.
 2483
 2484consider_queue_gc :-
 2485    predicate_property(output_queue(_,_,_), number_of_clauses(N)),
 2486    N > 100,
 2487    (   last_gc(Time),
 2488        get_time(Now),
 2489        Now-Time > 5*60
 2490    ->  true
 2491    ;   \+ last_gc(_)
 2492    ).
 2493
 2494%!  sync_destroy_queue_from_http(+Pengine, +Queue) is det.
 2495%!  sync_delay_destroy_queue(+Pengine, +Queue) is det.
 2496%
 2497%   Handle destruction of the message queue connecting the HTTP side
 2498%   to the pengine. We cannot delete the queue when the pengine dies
 2499%   because the queue may contain output  events. Termination of the
 2500%   pengine and finishing the  HTTP  exchange   may  happen  in both
 2501%   orders. This means we need handle this using synchronization.
 2502%
 2503%     * sync_destroy_queue_from_pengine(+Pengine, +Queue)
 2504%     Called (indirectly) from pengine_done/1 if the pengine's
 2505%     thread dies.
 2506%     * sync_destroy_queue_from_http(+Pengine, +Queue)
 2507%     Called from destroy_queue/3, from wait_and_output_result/5,
 2508%     i.e., from the HTTP side.
 2509
 2510:- dynamic output_queue_destroyed/1. 2511
 2512sync_destroy_queue_from_http(ID, Queue) :-
 2513    (   output_queue(ID, Queue, _)
 2514    ->  destroy_queue_if_empty(Queue)
 2515    ;   thread_peek_message(Queue, pengine_event(_, output(_,_)))
 2516    ->  debug(pengine(destroy), 'Delay destruction of ~p because of output',
 2517              [Queue]),
 2518        get_time(Now),
 2519        asserta(output_queue(ID, Queue, Now))
 2520    ;   message_queue_destroy(Queue),
 2521        asserta(output_queue_destroyed(Queue))
 2522    ).
 2523
 2524%!  sync_destroy_queue_from_pengine(+Pengine, +Queue)
 2525%
 2526%   Called  from  pengine_unregister/1  when    the  pengine  thread
 2527%   terminates. It is called while the mutex `pengine` held.
 2528
 2529sync_destroy_queue_from_pengine(ID, Queue) :-
 2530    (   retract(output_queue_destroyed(Queue))
 2531    ->  true
 2532    ;   get_time(Now),
 2533        asserta(output_queue(ID, Queue, Now))
 2534    ),
 2535    retractall(pengine_queue(ID, Queue, _, _)).
 2536
 2537
 2538http_pengine_send(Request) :-
 2539    reply_options(Request, [get,post]),
 2540    !.
 2541http_pengine_send(Request) :-
 2542    http_parameters(Request,
 2543                    [ id(ID, [ type(atom) ]),
 2544                      event(EventString, [optional(true)]),
 2545                      collate(Collate, [number, default(0)]),
 2546                      format(Format, [default(prolog)])
 2547                    ]),
 2548    catch(read_event(ID, Request, Format, EventString, Event),
 2549          Error,
 2550          true),
 2551    (   var(Error)
 2552    ->  debug(pengine(event), 'HTTP send: ~p', [Event]),
 2553        (   pengine_thread(ID, Thread)
 2554        ->  pengine_queue(ID, Queue, TimeLimit, _),
 2555            random_delay,
 2556            broadcast(pengine(send(ID, Event))),
 2557            thread_send_message(Thread, pengine_request(Event)),
 2558            wait_and_output_result(ID, Queue, Format, TimeLimit, Collate)
 2559        ;   atom(ID)
 2560        ->  pengine_died(Format, ID)
 2561        ;   http_404([], Request)
 2562        )
 2563    ;   Error = error(existence_error(pengine, ID), _)
 2564    ->  pengine_died(Format, ID)
 2565    ;   output_result(ID, Format, error(ID, Error))
 2566    ).
 2567
 2568pengine_died(Format, Pengine) :-
 2569    output_result(Pengine, Format,
 2570                  error(Pengine, error(existence_error(pengine, Pengine),_))).
 2571
 2572
 2573%!  read_event(+Pengine, +Request, +Format, +EventString, -Event) is det
 2574%
 2575%   Read an event on behalve of Pengine.  Note that the pengine's module
 2576%   should not be  deleted  while  we   are  reading  using  its  syntax
 2577%   (module). This is ensured using the `pengine_done` mutex.
 2578%
 2579%   @see pengine_done/0.
 2580
 2581read_event(Pengine, Request, Format, EventString, Event) :-
 2582    protect_pengine(
 2583        Pengine,
 2584        ( get_pengine_module(Pengine, Module),
 2585          read_event_2(Request, EventString, Module, Event0, Bindings)
 2586        )),
 2587    !,
 2588    fix_bindings(Format, Event0, Bindings, Event).
 2589read_event(Pengine, Request, _Format, _EventString, _Event) :-
 2590    debug(pengine(event), 'Pengine ~q vanished', [Pengine]),
 2591    discard_post_data(Request),
 2592    existence_error(pengine, Pengine).
 2593
 2594
 2595%%  read_event_(+Request, +EventString, +Module, -Event, -Bindings)
 2596%
 2597%   Read the sent event. The event is a   Prolog  term that is either in
 2598%   the =event= parameter or as a posted document.
 2599
 2600read_event_2(_Request, EventString, Module, Event, Bindings) :-
 2601    nonvar(EventString),
 2602    !,
 2603    term_string(Event, EventString,
 2604                [ variable_names(Bindings),
 2605                  module(Module)
 2606                ]).
 2607read_event_2(Request, _EventString, Module, Event, Bindings) :-
 2608    option(method(post), Request),
 2609    http_read_data(Request,     Event,
 2610                   [ content_type('application/x-prolog'),
 2611                     module(Module),
 2612                     variable_names(Bindings)
 2613                   ]).
 2614
 2615%%  discard_post_data(+Request) is det.
 2616%
 2617%   If this is a POST request, discard the posted data.
 2618
 2619discard_post_data(Request) :-
 2620    option(method(post), Request),
 2621    !,
 2622    setup_call_cleanup(
 2623        open_null_stream(NULL),
 2624        http_read_data(Request, _, [to(stream(NULL))]),
 2625        close(NULL)).
 2626discard_post_data(_).
 2627
 2628%!  fix_bindings(+Format, +EventIn, +Bindings, -Event) is det.
 2629%
 2630%   Generate the template for json(-s) Format  from the variables in
 2631%   the asked Goal. Variables starting  with an underscore, followed
 2632%   by an capital letter are ignored from the template.
 2633
 2634fix_bindings(Format,
 2635             ask(Goal, Options0), Bindings,
 2636             ask(Goal, NewOptions)) :-
 2637    json_lang(Format),
 2638    !,
 2639    exclude(anon, Bindings, NamedBindings),
 2640    template(NamedBindings, Template, Options0, Options1),
 2641    select_option(chunk(Paging), Options1, Options2, 1),
 2642    NewOptions = [ template(Template),
 2643                   chunk(Paging),
 2644                   bindings(NamedBindings)
 2645                 | Options2
 2646                 ].
 2647fix_bindings(_, Command, _, Command).
 2648
 2649template(_, Template, Options0, Options) :-
 2650    select_option(template(Template), Options0, Options),
 2651    !.
 2652template(Bindings, Template, Options, Options) :-
 2653    dict_create(Template, swish_default_template, Bindings).
 2654
 2655anon(Name=_) :-
 2656    sub_atom(Name, 0, _, _, '_'),
 2657    sub_atom(Name, 1, 1, _, Next),
 2658    char_type(Next, prolog_var_start).
 2659
 2660var_name(Name=_, Name).
 2661
 2662
 2663%!  json_lang(+Format) is semidet.
 2664%
 2665%   True if Format is a JSON variation.
 2666
 2667json_lang(json) :- !.
 2668json_lang(Format) :-
 2669    sub_atom(Format, 0, _, _, 'json-').
 2670
 2671%!  http_pengine_pull_response(+Request)
 2672%
 2673%   HTTP handler for /pengine/pull_response.  Pulls possible pending
 2674%   messages from the pengine.
 2675
 2676http_pengine_pull_response(Request) :-
 2677    reply_options(Request, [get]),
 2678    !.
 2679http_pengine_pull_response(Request) :-
 2680    http_parameters(Request,
 2681            [   id(ID, []),
 2682                format(Format, [default(prolog)]),
 2683                collate(Collate, [number, default(0)])
 2684            ]),
 2685    reattach(ID),
 2686    (   (   pengine_queue(ID, Queue, TimeLimit, _)
 2687        ->  true
 2688        ;   output_queue(ID, Queue, _),
 2689            TimeLimit = 0
 2690        )
 2691    ->  wait_and_output_result(ID, Queue, Format, TimeLimit, Collate)
 2692    ;   http_404([], Request)
 2693    ).
 2694
 2695%!  http_pengine_abort(+Request)
 2696%
 2697%   HTTP handler for /pengine/abort. Note that  abort may be sent at
 2698%   any time and the reply may  be   handled  by a pull_response. In
 2699%   that case, our  pengine  has  already   died  before  we  get to
 2700%   wait_and_output_result/5.
 2701
 2702http_pengine_abort(Request) :-
 2703    reply_options(Request, [get,post]),
 2704    !.
 2705http_pengine_abort(Request) :-
 2706    http_parameters(Request,
 2707            [   id(ID, [])
 2708            ]),
 2709    (   pengine_thread(ID, _Thread)
 2710    ->  broadcast(pengine(abort(ID))),
 2711        abort_pending_output(ID),
 2712        pengine_abort(ID),
 2713        reply_json_dict(true)
 2714    ;   http_404([], Request)
 2715    ).
 2716
 2717%!  http_pengine_detach(+Request)
 2718%
 2719%   Detach a Pengine while keeping it running.  This has the following
 2720%   consequences:
 2721%
 2722%     - `/destroy_all` including the id of this pengine is ignored.
 2723%     - Output from the pengine is stored in the queue without
 2724%       waiting for the queue to drain.
 2725%     - The Pengine becomes available through `/list`
 2726
 2727http_pengine_detach(Request) :-
 2728    reply_options(Request, [post]),
 2729    !.
 2730http_pengine_detach(Request) :-
 2731    http_parameters(Request,
 2732                    [ id(ID, [])
 2733                    ]),
 2734    http_read_json_dict(Request, ClientData),
 2735    (   pengine_property(ID, application(Application)),
 2736        allowed(Request, Application),
 2737        authenticate(Request, Application, _UserOptions)
 2738    ->  broadcast(pengine(detach(ID))),
 2739        get_time(Now),
 2740        assertz(pengine_detached(ID, ClientData.put(time, Now))),
 2741        pengine_queue(ID, Queue, _TimeLimit, _Now),
 2742        message_queue_set(Queue, max_size(1000)),
 2743        pengine_reply(Queue, detached(ID)),
 2744        reply_json_dict(true)
 2745    ;   http_404([], Request)
 2746    ).
 2747
 2748reattach(ID) :-
 2749    (   retract(pengine_detached(ID, _Data)),
 2750        pengine_queue(ID, Queue, _TimeLimit, _Now)
 2751    ->  message_queue_set(Queue, max_size(25))
 2752    ;   true
 2753    ).
 2754
 2755
 2756%!  http_pengine_destroy_all(+Request)
 2757%
 2758%   Destroy a list of pengines. Normally   called  by pengines.js if the
 2759%   browser window is closed.
 2760
 2761http_pengine_destroy_all(Request) :-
 2762    reply_options(Request, [get,post]),
 2763    !.
 2764http_pengine_destroy_all(Request) :-
 2765    http_parameters(Request,
 2766                    [ ids(IDsAtom, [])
 2767                    ]),
 2768    atomic_list_concat(IDs, ',', IDsAtom),
 2769    forall(( member(ID, IDs),
 2770             \+ pengine_detached(ID, _)
 2771           ),
 2772           pengine_destroy(ID, [force(true)])),
 2773    reply_json_dict("ok").
 2774
 2775%!  http_pengine_ping(+Request)
 2776%
 2777%   HTTP handler for /pengine/ping.  If   the  requested  Pengine is
 2778%   alive and event status(Pengine, Stats) is created, where `Stats`
 2779%   is the return of thread_statistics/2.
 2780
 2781http_pengine_ping(Request) :-
 2782    reply_options(Request, [get]),
 2783    !.
 2784http_pengine_ping(Request) :-
 2785    http_parameters(Request,
 2786                    [ id(Pengine, []),
 2787                      format(Format, [default(prolog)])
 2788                    ]),
 2789    (   pengine_thread(Pengine, Thread),
 2790        Error = error(_,_),
 2791        catch(thread_statistics(Thread, Stats), Error, fail)
 2792    ->  output_result(Pengine, Format, ping(Pengine, Stats))
 2793    ;   output_result(Pengine, Format, died(Pengine))
 2794    ).
 2795
 2796%!  http_pengine_list(+Request)
 2797%
 2798%   HTTP  handler  for  `/pengine/list`,   providing  information  about
 2799%   running Pengines.
 2800%
 2801%   @tbd Only list detached Pengines associated to the logged in user.
 2802
 2803http_pengine_list(Request) :-
 2804    reply_options(Request, [get]),
 2805    !.
 2806http_pengine_list(Request) :-
 2807    http_parameters(Request,
 2808                    [ status(Status, [default(detached), oneof([detached])]),
 2809                      application(Application, [default(pengine_sandbox)])
 2810                    ]),
 2811    allowed(Request, Application),
 2812    authenticate(Request, Application, _UserOptions),
 2813    findall(Term, listed_pengine(Application, Status, Term), Terms),
 2814    reply_json_dict(json{pengines: Terms}).
 2815
 2816listed_pengine(Application, detached, State) :-
 2817    State = pengine{id:Id,
 2818                    detached:Time,
 2819                    queued:Queued,
 2820                    stats:Stats},
 2821
 2822    pengine_property(Id, application(Application)),
 2823    pengine_property(Id, detached(Time)),
 2824    pengine_queue(Id, Queue, _TimeLimit, _Now),
 2825    message_queue_property(Queue, size(Queued)),
 2826    (   pengine_thread(Id, Thread),
 2827        catch(thread_statistics(Thread, Stats), _, fail)
 2828    ->  true
 2829    ;   Stats = thread{status:died}
 2830    ).
 2831
 2832
 2833%!  output_result(+Pengine, +Format, +EventTerm) is det.
 2834%!  output_result(+Pengine, +Format, +EventTerm, +OptionsDict) is det.
 2835%
 2836%   Formulate an HTTP response from a pengine event term. Format is
 2837%   one of =prolog=, =json= or =json-s=.
 2838%
 2839%   @arg EventTerm is either a single event or a list of events.
 2840
 2841:- dynamic
 2842    pengine_replying/2.             % +Pengine, +Thread
 2843
 2844output_result(Pengine, Format, Event) :-
 2845    thread_self(Thread),
 2846    cors_enable,            % contingent on http:cors setting
 2847    disable_client_cache,
 2848    setup_call_cleanup(
 2849        asserta(pengine_replying(Pengine, Thread), Ref),
 2850        catch(output_result_2(Format, Event, _{}),
 2851              pengine_abort_output,
 2852              true),
 2853        erase(Ref)),
 2854    destroy_pengine_after_output(Pengine, Event).
 2855
 2856output_result_2(Lang, Event, Dict) :-
 2857    write_result(Lang, Event, Dict),
 2858    !.
 2859output_result_2(prolog, Event, _) :-
 2860    !,
 2861    format('Content-type: text/x-prolog; charset=UTF-8~n~n'),
 2862    write_term(Event,
 2863               [ quoted(true),
 2864                 ignore_ops(true),
 2865                 fullstop(true),
 2866                 blobs(portray),
 2867                 portray_goal(portray_blob),
 2868                 nl(true)
 2869               ]).
 2870output_result_2(Lang, Event, _) :-
 2871    json_lang(Lang),
 2872    !,
 2873    (   event_term_to_json_data(Event, JSON, Lang)
 2874    ->  reply_json_dict(JSON)
 2875    ;   assertion(event_term_to_json_data(Event, _, Lang))
 2876    ).
 2877output_result_2(Lang, _Event, _) :-    % FIXME: allow for non-JSON format
 2878    domain_error(pengine_format, Lang).
 2879
 2880%!  portray_blob(+Blob, +Options) is det.
 2881%
 2882%   Portray non-text blobs that may  appear   in  output  terms. Not
 2883%   really sure about that. Basically such  terms need to be avoided
 2884%   as they are meaningless outside the process. The generated error
 2885%   is hard to debug though, so now we send them as `'$BLOB'(Type)`.
 2886%   Future versions may include more info, depending on `Type`.
 2887
 2888:- public portray_blob/2.               % called from write-term
 2889portray_blob(Blob, _Options) :-
 2890    blob(Blob, Type),
 2891    writeq('$BLOB'(Type)).
 2892
 2893%!  abort_pending_output(+Pengine) is det.
 2894%
 2895%   If we get an abort, it is possible that output is being produced
 2896%   for the client.  This predicate aborts these threads.
 2897
 2898abort_pending_output(Pengine) :-
 2899    forall(pengine_replying(Pengine, Thread),
 2900           abort_output_thread(Thread)).
 2901
 2902abort_output_thread(Thread) :-
 2903    catch(thread_signal(Thread, throw(pengine_abort_output)),
 2904          error(existence_error(thread, _), _),
 2905          true).
 2906
 2907%!  write_result(+Lang, +Event, +Dict) is semidet.
 2908%
 2909%   Hook that allows for different output formats. The core Pengines
 2910%   library supports `prolog` and various   JSON  dialects. The hook
 2911%   event_to_json/3 can be used to refine   the  JSON dialects. This
 2912%   hook must be used if  a   completely  different output format is
 2913%   desired.
 2914
 2915%!  disable_client_cache
 2916%
 2917%   Make sure the client will not cache our page.
 2918%
 2919%   @see http://stackoverflow.com/questions/49547/making-sure-a-web-page-is-not-cached-across-all-browsers
 2920
 2921disable_client_cache :-
 2922    format('Cache-Control: no-cache, no-store, must-revalidate\r\n\c
 2923            Pragma: no-cache\r\n\c
 2924            Expires: 0\r\n').
 2925
 2926event_term_to_json_data(Events, JSON, Lang) :-
 2927    is_list(Events),
 2928    !,
 2929    events_to_json_data(Events, JSON, Lang).
 2930event_term_to_json_data(Event, JSON, Lang) :-
 2931    event_to_json(Event, JSON, Lang),
 2932    !.
 2933event_term_to_json_data(success(ID, Bindings0, Projection, Time, More),
 2934                        json{event:success, id:ID, time:Time,
 2935                             data:Bindings, more:More, projection:Projection},
 2936                        json) :-
 2937    !,
 2938    term_to_json(Bindings0, Bindings).
 2939event_term_to_json_data(destroy(ID, Event),
 2940                        json{event:destroy, id:ID, data:JSON},
 2941                        Style) :-
 2942    !,
 2943    event_term_to_json_data(Event, JSON, Style).
 2944event_term_to_json_data(create(ID, Features0), JSON, Style) :-
 2945    !,
 2946    (   select(answer(First0), Features0, Features1)
 2947    ->  event_term_to_json_data(First0, First, Style),
 2948        Features = [answer(First)|Features1]
 2949    ;   Features = Features0
 2950    ),
 2951    dict_create(JSON, json, [event(create), id(ID)|Features]).
 2952event_term_to_json_data(destroy(ID, Event),
 2953                        json{event:destroy, id:ID, data:JSON}, Style) :-
 2954    !,
 2955    event_term_to_json_data(Event, JSON, Style).
 2956event_term_to_json_data(error(ID, ErrorTerm), Error, _Style) :-
 2957    !,
 2958    Error0 = json{event:error, id:ID, data:Message},
 2959    add_error_details(ErrorTerm, Error0, Error),
 2960    message_to_string(ErrorTerm, Message).
 2961event_term_to_json_data(failure(ID, Time),
 2962                        json{event:failure, id:ID, time:Time}, _) :-
 2963    !.
 2964event_term_to_json_data(EventTerm, json{event:F, id:ID}, _) :-
 2965    functor(EventTerm, F, 1),
 2966    !,
 2967    arg(1, EventTerm, ID).
 2968event_term_to_json_data(EventTerm, json{event:F, id:ID, data:JSON}, _) :-
 2969    functor(EventTerm, F, 2),
 2970    arg(1, EventTerm, ID),
 2971    arg(2, EventTerm, Data),
 2972    term_to_json(Data, JSON).
 2973
 2974events_to_json_data([], [], _).
 2975events_to_json_data([E|T0], [J|T], Lang) :-
 2976    event_term_to_json_data(E, J, Lang),
 2977    events_to_json_data(T0, T, Lang).
 2978
 2979:- public add_error_details/3. 2980
 2981%%  add_error_details(+Error, +JSON0, -JSON)
 2982%
 2983%   Add format error code and  location   information  to an error. Also
 2984%   used by pengines_io.pl.
 2985
 2986add_error_details(Error, JSON0, JSON) :-
 2987    add_error_code(Error, JSON0, JSON1),
 2988    add_error_location(Error, JSON1, JSON).
 2989
 2990%%  add_error_code(+Error, +JSON0, -JSON) is det.
 2991%
 2992%   Add a =code= field to JSON0 of Error is an ISO error term. The error
 2993%   code is the functor name of  the   formal  part  of the error, e.g.,
 2994%   =syntax_error=,  =type_error=,  etc.   Some    errors   carry   more
 2995%   information:
 2996%
 2997%     - existence_error(Type, Obj)
 2998%     {arg1:Type, arg2:Obj}, where Obj is stringified of it is not
 2999%     atomic.
 3000
 3001add_error_code(error(existence_error(Type, Obj), _), Error0, Error) :-
 3002    atom(Type),
 3003    !,
 3004    to_atomic(Obj, Value),
 3005    Error = Error0.put(_{code:existence_error, arg1:Type, arg2:Value}).
 3006add_error_code(error(Formal, _), Error0, Error) :-
 3007    callable(Formal),
 3008    !,
 3009    functor(Formal, Code, _),
 3010    Error = Error0.put(code, Code).
 3011add_error_code(_, Error, Error).
 3012
 3013% What to do with large integers?
 3014to_atomic(Obj, Atomic) :- atom(Obj),   !, Atomic = Obj.
 3015to_atomic(Obj, Atomic) :- number(Obj), !, Atomic = Obj.
 3016to_atomic(Obj, Atomic) :- string(Obj), !, Atomic = Obj.
 3017to_atomic(Obj, Atomic) :- term_string(Obj, Atomic).
 3018
 3019
 3020%%  add_error_location(+Error, +JSON0, -JSON) is det.
 3021%
 3022%   Add a =location= property if the  error   can  be  associated with a
 3023%   source location. The location is an   object  with properties =file=
 3024%   and =line= and, if available, the character location in the line.
 3025
 3026add_error_location(error(_, file(Path, Line, -1, _CharNo)), Term0, Term) :-
 3027    atom(Path), integer(Line),
 3028    !,
 3029    Term = Term0.put(_{location:_{file:Path, line:Line}}).
 3030add_error_location(error(_, file(Path, Line, Ch, _CharNo)), Term0, Term) :-
 3031    atom(Path), integer(Line), integer(Ch),
 3032    !,
 3033    Term = Term0.put(_{location:_{file:Path, line:Line, ch:Ch}}).
 3034add_error_location(_, Term, Term).
 3035
 3036
 3037%!  event_to_json(+Event, -JSONTerm, +Lang) is semidet.
 3038%
 3039%   Hook that translates a Pengine event  structure into a term suitable
 3040%   for reply_json_dict/1, according to the language specification Lang.
 3041%   This can be used to massage general Prolog terms, notably associated
 3042%   with `success(ID, Bindings, Projection, Time, More)` and `output(ID,
 3043%   Term)` into a format suitable for processing at the client side.
 3044
 3045%:- multifile pengines:event_to_json/3.
 3046
 3047
 3048                 /*******************************
 3049                 *        ACCESS CONTROL        *
 3050                 *******************************/
 3051
 3052%!  allowed(+Request, +Application) is det.
 3053%
 3054%   Check whether the peer is allowed to connect.  Returns a
 3055%   =forbidden= header if contact is not allowed.
 3056
 3057allowed(Request, Application) :-
 3058    setting(Application:allow_from, Allow),
 3059    match_peer(Request, Allow),
 3060    setting(Application:deny_from, Deny),
 3061    \+ match_peer(Request, Deny),
 3062    !.
 3063allowed(Request, _Application) :-
 3064    memberchk(request_uri(Here), Request),
 3065    throw(http_reply(forbidden(Here))).
 3066
 3067match_peer(_, Allowed) :-
 3068    memberchk(*, Allowed),
 3069    !.
 3070match_peer(_, []) :- !, fail.
 3071match_peer(Request, Allowed) :-
 3072    http_peer(Request, Peer),
 3073    debug(pengine(allow), 'Peer: ~q, Allow: ~q', [Peer, Allowed]),
 3074    (   memberchk(Peer, Allowed)
 3075    ->  true
 3076    ;   member(Pattern, Allowed),
 3077        match_peer_pattern(Pattern, Peer)
 3078    ).
 3079
 3080match_peer_pattern(Pattern, Peer) :-
 3081    ip_term(Pattern, IP),
 3082    ip_term(Peer, IP),
 3083    !.
 3084
 3085ip_term(Peer, Pattern) :-
 3086    split_string(Peer, ".", "", PartStrings),
 3087    ip_pattern(PartStrings, Pattern).
 3088
 3089ip_pattern([], []).
 3090ip_pattern([*], _) :- !.
 3091ip_pattern([S|T0], [N|T]) :-
 3092    number_string(N, S),
 3093    ip_pattern(T0, T).
 3094
 3095
 3096%%  authenticate(+Request, +Application, -UserOptions:list) is det.
 3097%
 3098%   Call authentication_hook/3, returning either `[user(User)]`, `[]` or
 3099%   an exception.
 3100
 3101authenticate(Request, Application, UserOptions) :-
 3102    authentication_hook(Request, Application, User),
 3103    !,
 3104    must_be(ground, User),
 3105    UserOptions = [user(User)].
 3106authenticate(_, _, []).
 3107
 3108%%  authentication_hook(+Request, +Application, -User) is semidet.
 3109%
 3110%   This hook is called  from  the   =/pengine/create=  HTTP  handler to
 3111%   discover whether the server is accessed   by  an authorized user. It
 3112%   can react in three ways:
 3113%
 3114%     - Succeed, binding User to a ground term.  The authenticity of the
 3115%       user is available through pengine_user/1.
 3116%     - Fail.  The =/create= succeeds, but the pengine is not associated
 3117%       with a user.
 3118%     - Throw an exception to prevent creation of the pengine.  Two
 3119%       meaningful exceptions are:
 3120%         - throw(http_reply(authorise(basic(Realm))))
 3121%         Start a normal HTTP login challenge (reply 401)
 3122%         - throw(http_reply(forbidden(Path)))
 3123%         Reject the request using a 403 reply.
 3124%
 3125%   @see http_authenticate/3 can be used to implement this hook using
 3126%        default HTTP authentication data.
 3127
 3128pengine_register_user(Options) :-
 3129    option(user(User), Options),
 3130    !,
 3131    pengine_self(Me),
 3132    asserta(pengine_user(Me, User)).
 3133pengine_register_user(_).
 3134
 3135
 3136%%  pengine_user(-User) is semidet.
 3137%
 3138%   True when the pengine was create by  an HTTP request that authorized
 3139%   User.
 3140%
 3141%   @see authentication_hook/3 can be used to extract authorization from
 3142%        the HTTP header.
 3143
 3144pengine_user(User) :-
 3145    pengine_self(Me),
 3146    pengine_user(Me, User).
 3147
 3148%!  reply_options(+Request, +Methods) is semidet.
 3149%
 3150%   Reply the HTTP OPTIONS request
 3151
 3152reply_options(Request, Allowed) :-
 3153    option(method(options), Request),
 3154    !,
 3155    cors_enable(Request,
 3156                [ methods(Allowed)
 3157                ]),
 3158    format('Content-type: text/plain\r\n'),
 3159    format('~n').                   % empty body
 3160
 3161
 3162                 /*******************************
 3163                 *        COMPILE SOURCE        *
 3164                 *******************************/
 3165
 3166/** pengine_src_text(+SrcText, +Module) is det
 3167
 3168Asserts the clauses defined in SrcText in   the  private database of the
 3169current Pengine. This  predicate  processes   the  `src_text'  option of
 3170pengine_create/1.
 3171*/
 3172
 3173pengine_src_text(Src, Module) :-
 3174    pengine_self(Self),
 3175    format(atom(ID), 'pengine://~w/src', [Self]),
 3176    extra_load_options(Self, Options),
 3177    setup_call_cleanup(
 3178        open_chars_stream(Src, Stream),
 3179        load_files(Module:ID,
 3180                   [ stream(Stream),
 3181                     module(Module),
 3182                     silent(true)
 3183                   | Options
 3184                   ]),
 3185        close(Stream)),
 3186    keep_source(Self, ID, Src).
 3187
 3188system:'#file'(File, _Line) :-
 3189    prolog_load_context(stream, Stream),
 3190    set_stream(Stream, file_name(File)),
 3191    set_stream(Stream, record_position(false)),
 3192    set_stream(Stream, record_position(true)).
 3193
 3194%%   pengine_src_url(+URL, +Module) is det
 3195%
 3196%    Asserts the clauses defined in URL in   the private database of the
 3197%    current Pengine. This predicate processes   the `src_url' option of
 3198%    pengine_create/1.
 3199%
 3200%    @tbd: make a sensible guess at the encoding.
 3201
 3202pengine_src_url(URL, Module) :-
 3203    pengine_self(Self),
 3204    uri_encoded(path, URL, Path),
 3205    format(atom(ID), 'pengine://~w/url/~w', [Self, Path]),
 3206    extra_load_options(Self, Options),
 3207    (   get_pengine_application(Self, Application),
 3208        setting(Application:debug_info, false)
 3209    ->  setup_call_cleanup(
 3210            http_open(URL, Stream, []),
 3211            ( set_stream(Stream, encoding(utf8)),
 3212              load_files(Module:ID,
 3213                         [ stream(Stream),
 3214                           module(Module)
 3215                         | Options
 3216                         ])
 3217            ),
 3218            close(Stream))
 3219    ;   setup_call_cleanup(
 3220            http_open(URL, TempStream, []),
 3221            ( set_stream(TempStream, encoding(utf8)),
 3222              read_string(TempStream, _, Src)
 3223            ),
 3224            close(TempStream)),
 3225        setup_call_cleanup(
 3226            open_chars_stream(Src, Stream),
 3227            load_files(Module:ID,
 3228                       [ stream(Stream),
 3229                         module(Module)
 3230                       | Options
 3231                       ]),
 3232            close(Stream)),
 3233        keep_source(Self, ID, Src)
 3234    ).
 3235
 3236
 3237extra_load_options(Pengine, Options) :-
 3238    pengine_not_sandboxed(Pengine),
 3239    !,
 3240    Options = [].
 3241extra_load_options(_, [sandboxed(true)]).
 3242
 3243
 3244keep_source(Pengine, ID, SrcText) :-
 3245    get_pengine_application(Pengine, Application),
 3246    setting(Application:debug_info, true),
 3247    !,
 3248    to_string(SrcText, SrcString),
 3249    assertz(pengine_data(Pengine, source(ID, SrcString))).
 3250keep_source(_, _, _).
 3251
 3252to_string(String, String) :-
 3253    string(String),
 3254    !.
 3255to_string(Atom, String) :-
 3256    atom_string(Atom, String),
 3257    !.
 3258
 3259		 /*******************************
 3260		 *            SANDBOX		*
 3261		 *******************************/
 3262
 3263:- multifile
 3264    sandbox:safe_primitive/1. 3265
 3266sandbox:safe_primitive(pengines:pengine_input(_, _)).
 3267sandbox:safe_primitive(pengines:pengine_output(_)).
 3268sandbox:safe_primitive(pengines:pengine_debug(_,_)).
 3269
 3270
 3271                 /*******************************
 3272                 *            MESSAGES          *
 3273                 *******************************/
 3274
 3275prolog:error_message(sandbox(time_limit_exceeded, Limit)) -->
 3276    [ 'Could not prove safety of your goal within ~f seconds.'-[Limit], nl,
 3277      'This is normally caused by an insufficiently instantiated'-[], nl,
 3278      'meta-call (e.g., call(Var)) for which it is too expensive to'-[], nl,
 3279      'find all possible instantations of Var.'-[]
 3280    ]