View source with formatted comments or as raw
    1:- encoding(utf8).
    2/*  Part of SWI-Prolog
    3
    4    Author:        Torbjörn Lager and Jan Wielemaker
    5    E-mail:        J.Wielemaker@vu.nl
    6    WWW:           http://www.swi-prolog.org
    7    Copyright (C): 2014-2024, Torbjörn Lager,
    8                              VU University Amsterdam
    9                              SWI-Prolog Solutions b.v.
   10    All rights reserved.
   11
   12    Redistribution and use in source and binary forms, with or without
   13    modification, are permitted provided that the following conditions
   14    are met:
   15
   16    1. Redistributions of source code must retain the above copyright
   17       notice, this list of conditions and the following disclaimer.
   18
   19    2. Redistributions in binary form must reproduce the above copyright
   20       notice, this list of conditions and the following disclaimer in
   21       the documentation and/or other materials provided with the
   22       distribution.
   23
   24    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   25    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   26    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   27    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   28    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   29    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   30    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   31    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   32    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   33    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   34    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   35    POSSIBILITY OF SUCH DAMAGE.
   36*/
   37
   38:- module(pengines,
   39          [ pengine_create/1,                   % +Options
   40            pengine_ask/3,                      % +Pengine, :Query, +Options
   41            pengine_next/2,                     % +Pengine. +Options
   42            pengine_stop/2,                     % +Pengine. +Options
   43            pengine_event/2,                    % -Event, +Options
   44            pengine_input/2,                    % +Prompt, -Term
   45            pengine_output/1,                   % +Term
   46            pengine_respond/3,                  % +Pengine, +Input, +Options
   47            pengine_debug/2,                    % +Format, +Args
   48            pengine_self/1,                     % -Pengine
   49            pengine_pull_response/2,            % +Pengine, +Options
   50            pengine_destroy/1,                  % +Pengine
   51            pengine_destroy/2,                  % +Pengine, +Options
   52            pengine_abort/1,                    % +Pengine
   53            pengine_application/1,              % +Application
   54            current_pengine_application/1,      % ?Application
   55            pengine_property/2,                 % ?Pengine, ?Property
   56            pengine_user/1,                     % -User
   57            pengine_event_loop/2,               % :Closure, +Options
   58            pengine_rpc/2,                      % +Server, :Goal
   59            pengine_rpc/3                       % +Server, :Goal, +Options
   60          ]).   61
   62/** <module> Pengines: Web Logic Programming Made Easy
   63
   64The library(pengines) provides an  infrastructure   for  creating Prolog
   65engines in a (remote) pengine server  and accessing these engines either
   66from Prolog or JavaScript.
   67
   68@author Torbjörn Lager and Jan Wielemaker
   69*/
   70
   71:- autoload(library(aggregate),[aggregate_all/3]).   72:- autoload(library(apply),[maplist/2,partition/4,exclude/3,maplist/3]).   73:- autoload(library(broadcast),[broadcast/1]).   74:- autoload(library(charsio),[open_chars_stream/2]).   75:- use_module(library(debug),[debug/1,debugging/1,debug/3,assertion/1]).   76:- autoload(library(error),
   77	    [ must_be/2,
   78	      existence_error/2,
   79	      permission_error/3,
   80	      domain_error/2
   81	    ]).   82:- autoload(library(filesex),[directory_file_path/3]).   83:- autoload(library(listing),[listing/1]).   84:- autoload(library(lists),[member/2,flatten/2,select/3,append/3]).   85:- autoload(library(modules),[in_temporary_module/3]).   86:- autoload(library(occurs),[sub_term/2]).   87:- autoload(library(option),
   88	    [select_option/3,option/2,option/3,select_option/4]).   89:- autoload(library(prolog_stack),[print_prolog_backtrace/2]).   90:- autoload(library(sandbox),[safe_goal/1]).   91:- autoload(library(statistics),[thread_statistics/2]).   92:- autoload(library(term_to_json),[term_to_json/2]).   93:- autoload(library(thread_pool),
   94	    [thread_pool_create/3,thread_create_in_pool/4]).   95:- autoload(library(time),[alarm/4,call_with_time_limit/2]).   96:- autoload(library(uri),
   97	    [ uri_components/2,
   98	      uri_query_components/2,
   99	      uri_data/3,
  100	      uri_data/4,
  101	      uri_encoded/3
  102	    ]).  103:- autoload(library(http/http_client),[http_read_data/3]).  104:- autoload(library(http/http_cors),[cors_enable/0,cors_enable/2]).  105:- autoload(library(http/http_dispatch),
  106	    [http_handler/3,http_404/2,http_reply_file/3]).  107:- autoload(library(http/http_open),[http_open/3]).  108:- autoload(library(http/http_parameters),[http_parameters/2]).  109:- autoload(library(http/http_stream),[is_cgi_stream/1]).  110:- autoload(library(http/http_wrapper),[http_peer/2]).  111
  112:- use_module(library(settings),[setting/2,setting/4]).  113:- use_module(library(http/http_json),
  114              [http_read_json_dict/2,reply_json_dict/1]).  115
  116:- if(exists_source(library(uuid))).  117:- autoload(library(uuid), [uuid/2]).  118:- endif.  119
  120
  121:- meta_predicate
  122    pengine_create(:),
  123    pengine_rpc(+, +, :),
  124    pengine_event_loop(1, +).  125
  126:- multifile
  127    write_result/3,                 % +Format, +Event, +Dict
  128    event_to_json/3,                % +Event, -JSON, +Format
  129    prepare_module/3,               % +Module, +Application, +Options
  130    prepare_goal/3,                 % +GoalIn, -GoalOut, +Options
  131    authentication_hook/3,          % +Request, +Application, -User
  132    not_sandboxed/2,                % +User, +App
  133    pengine_flush_output_hook/0.  134
  135:- predicate_options(pengine_create/1, 1,
  136                     [ id(-atom),
  137                       alias(atom),
  138                       application(atom),
  139                       destroy(boolean),
  140                       server(atom),
  141                       ask(compound),
  142                       template(compound),
  143                       chunk(integer;oneof([false])),
  144                       bindings(list),
  145                       src_list(list),
  146                       src_text(any),           % text
  147                       src_url(atom),
  148                       src_predicates(list)
  149                     ]).  150:- predicate_options(pengine_ask/3, 3,
  151                     [ template(any),
  152                       chunk(integer;oneof([false])),
  153                       bindings(list)
  154                     ]).  155:- predicate_options(pengine_next/2, 2,
  156                     [ chunk(integer),
  157                       pass_to(pengine_send/3, 3)
  158                     ]).  159:- predicate_options(pengine_stop/2, 2,
  160                     [ pass_to(pengine_send/3, 3)
  161                     ]).  162:- predicate_options(pengine_respond/3, 2,
  163                     [ pass_to(pengine_send/3, 3)
  164                     ]).  165:- predicate_options(pengine_rpc/3, 3,
  166                     [ chunk(integer;oneof([false])),
  167                       pass_to(pengine_create/1, 1)
  168                     ]).  169:- predicate_options(pengine_send/3, 3,
  170                     [ delay(number)
  171                     ]).  172:- predicate_options(pengine_event/2, 2,
  173                     [ listen(atom),
  174                       pass_to(system:thread_get_message/3, 3)
  175                     ]).  176:- predicate_options(pengine_pull_response/2, 2,
  177                     [ pass_to(http_open/3, 3)
  178                     ]).  179:- predicate_options(pengine_event_loop/2, 2,
  180                     []).                       % not yet implemented
  181
  182% :- debug(pengine(transition)).
  183:- debug(pengine(debug)).               % handle pengine_debug in pengine_rpc/3.
  184
  185goal_expansion(random_delay, Expanded) :-
  186    (   debugging(pengine(delay))
  187    ->  Expanded = do_random_delay
  188    ;   Expanded = true
  189    ).
  190
  191do_random_delay :-
  192    Delay is random(20)/1000,
  193    sleep(Delay).
  194
  195:- meta_predicate                       % internal meta predicates
  196    solve(+, ?, 0, +),
  197    findnsols_no_empty(+, ?, 0, -),
  198    pengine_event_loop(+, 1, +).  199
  200/**  pengine_create(:Options) is det.
  201
  202    Creates a new pengine. Valid options are:
  203
  204    * id(-ID)
  205      ID gets instantiated to the id of the created pengine.  ID is
  206      atomic.
  207
  208    * alias(+Name)
  209      The pengine is named Name (an atom). A slave pengine (child) can
  210      subsequently be referred to by this name.
  211
  212    * application(+Application)
  213      Application in which the pengine runs.  See pengine_application/1.
  214
  215    * server(+URL)
  216      The pengine will run in (and in the Prolog context of) the pengine
  217      server located at URL.
  218
  219    * src_list(+List_of_clauses)
  220      Inject a list of Prolog clauses into the pengine.
  221
  222    * src_text(+Atom_or_string)
  223      Inject the clauses specified by a source text into the pengine.
  224
  225    * src_url(+URL)
  226      Inject the clauses specified in the file located at URL into the
  227      pengine.
  228
  229    * src_predicates(+List)
  230      Send the local predicates denoted by List to the remote pengine.
  231      List is a list of predicate indicators.
  232
  233Remaining  options  are  passed  to  http_open/3  (meaningful  only  for
  234non-local pengines) and thread_create/3. Note   that for thread_create/3
  235only options changing the stack-sizes can be used. In particular, do not
  236pass the detached or alias options..
  237
  238Successful creation of a pengine will return an _event term_ of the
  239following form:
  240
  241    * create(ID, Term)
  242      ID is the id of the pengine that was created.
  243      Term is not used at the moment.
  244
  245An error will be returned if the pengine could not be created:
  246
  247    * error(ID, Term)
  248      ID is invalid, since no pengine was created.
  249      Term is the exception's error term.
  250*/
  251
  252
  253pengine_create(M:Options0) :-
  254    translate_local_sources(Options0, Options, M),
  255    (   select_option(server(BaseURL), Options, RestOptions)
  256    ->  remote_pengine_create(BaseURL, RestOptions)
  257    ;   local_pengine_create(Options)
  258    ).
  259
  260%!  translate_local_sources(+OptionsIn, -Options, +Module) is det.
  261%
  262%   Translate  the  `src_predicates`  and  `src_list`  options  into
  263%   `src_text`. We need to do that   anyway for remote pengines. For
  264%   local pengines, we could avoid  this   step,  but  there is very
  265%   little point in transferring source to a local pengine anyway as
  266%   local pengines can access any  Prolog   predicate  that you make
  267%   visible to the application.
  268%
  269%   Multiple sources are concatenated  to  end   up  with  a  single
  270%   src_text option.
  271
  272translate_local_sources(OptionsIn, Options, Module) :-
  273    translate_local_sources(OptionsIn, Sources, Options2, Module),
  274    (   Sources == []
  275    ->  Options = Options2
  276    ;   Sources = [Source]
  277    ->  Options = [src_text(Source)|Options2]
  278    ;   atomics_to_string(Sources, Source)
  279    ->  Options = [src_text(Source)|Options2]
  280    ).
  281
  282translate_local_sources([], [], [], _).
  283translate_local_sources([H0|T], [S0|S], Options, M) :-
  284    nonvar(H0),
  285    translate_local_source(H0, S0, M),
  286    !,
  287    translate_local_sources(T, S, Options, M).
  288translate_local_sources([H|T0], S, [H|T], M) :-
  289    translate_local_sources(T0, S, T, M).
  290
  291translate_local_source(src_predicates(PIs), Source, M) :-
  292    must_be(list, PIs),
  293    with_output_to(string(Source),
  294                   maplist(list_in_module(M), PIs)).
  295translate_local_source(src_list(Terms), Source, _) :-
  296    must_be(list, Terms),
  297    with_output_to(string(Source),
  298                   forall(member(Term, Terms),
  299                          format('~k .~n', [Term]))).
  300translate_local_source(src_text(Source), Source, _).
  301
  302list_in_module(M, PI) :-
  303    listing(M:PI).
  304
  305/**  pengine_send(+NameOrID, +Term) is det
  306
  307Same as pengine_send(NameOrID, Term, []).
  308*/
  309
  310pengine_send(Target, Event) :-
  311    pengine_send(Target, Event, []).
  312
  313
  314/**  pengine_send(+NameOrID, +Term, +Options) is det
  315
  316Succeeds immediately and  places  Term  in   the  queue  of  the pengine
  317NameOrID. Options is a list of options:
  318
  319   * delay(+Time)
  320     The actual sending is delayed by Time seconds. Time is an integer
  321     or a float.
  322
  323Any remaining options are passed to http_open/3.
  324*/
  325
  326pengine_send(Target, Event, Options) :-
  327    must_be(atom, Target),
  328    pengine_send2(Target, Event, Options).
  329
  330pengine_send2(self, Event, Options) :-
  331    !,
  332    thread_self(Queue),
  333    delay_message(queue(Queue), Event, Options).
  334pengine_send2(Name, Event, Options) :-
  335    child(Name, Target),
  336    !,
  337    delay_message(pengine(Target), Event, Options).
  338pengine_send2(Target, Event, Options) :-
  339    delay_message(pengine(Target), Event, Options).
  340
  341delay_message(Target, Event, Options) :-
  342    option(delay(Delay), Options),
  343    !,
  344    alarm(Delay,
  345          send_message(Target, Event, Options),
  346          _AlarmID,
  347          [remove(true)]).
  348delay_message(Target, Event, Options) :-
  349    random_delay,
  350    send_message(Target, Event, Options).
  351
  352send_message(queue(Queue), Event, _) :-
  353    thread_send_message(Queue, pengine_request(Event)).
  354send_message(pengine(Pengine), Event, Options) :-
  355    (   pengine_remote(Pengine, Server)
  356    ->  remote_pengine_send(Server, Pengine, Event, Options)
  357    ;   pengine_thread(Pengine, Thread)
  358    ->  thread_send_message(Thread, pengine_request(Event))
  359    ;   existence_error(pengine, Pengine)
  360    ).
  361
  362%!  pengine_request(-Request) is det.
  363%
  364%   To be used by a pengine to wait  for the next request. Such messages
  365%   are placed in the  queue  by   pengine_send/2.  Keeps  the thread in
  366%   normal state if an event arrives within a second. Otherwise it waits
  367%   for the `idle_limit` setting while   using  thread_idle/2 to minimis
  368%   resources.
  369
  370pengine_request(Request) :-
  371    thread_self(Me),
  372    thread_get_message(Me, pengine_request(Request), [timeout(1)]),
  373    !.
  374pengine_request(Request) :-
  375    pengine_self(Self),
  376    get_pengine_application(Self, Application),
  377    setting(Application:idle_limit, IdleLimit0),
  378    IdleLimit is IdleLimit0-1,
  379    thread_self(Me),
  380    (   thread_idle(thread_get_message(Me, pengine_request(Request),
  381                                       [timeout(IdleLimit)]),
  382                    long)
  383    ->  true
  384    ;   Request = destroy
  385    ).
  386
  387
  388%!  pengine_reply(+Event) is det.
  389%!  pengine_reply(+Queue, +Event) is det.
  390%
  391%   Reply Event to the parent of the   current  Pengine or the given
  392%   Queue.  Such  events  are  read   by    the   other   side  with
  393%   pengine_event/1.
  394%
  395%   If the message cannot be sent within the `idle_limit` setting of
  396%   the pengine, abort the pengine.
  397
  398pengine_reply(Event) :-
  399    pengine_parent(Queue),
  400    pengine_reply(Queue, Event).
  401
  402pengine_reply(_Queue, _Event0) :-
  403    nb_current(pengine_idle_limit_exceeded, true),
  404    !.
  405pengine_reply(Queue, Event0) :-
  406    arg(1, Event0, ID),
  407    wrap_first_answer(ID, Event0, Event),
  408    random_delay,
  409    debug(pengine(event), 'Reply to ~p: ~p', [Queue, Event]),
  410    (   pengine_self(ID),
  411        \+ pengine_detached(ID, _)
  412    ->  get_pengine_application(ID, Application),
  413        setting(Application:idle_limit, IdleLimit),
  414        debug(pengine(reply), 'Sending ~p, timeout: ~q', [Event, IdleLimit]),
  415        (   thread_send_message(Queue, pengine_event(ID, Event),
  416                                [ timeout(IdleLimit)
  417                                ])
  418        ->  true
  419        ;   thread_self(Me),
  420            debug(pengine(reply), 'pengine_reply: timeout for ~q (thread ~q)',
  421                  [ID, Me]),
  422            nb_setval(pengine_idle_limit_exceeded, true),
  423            thread_detach(Me),
  424            abort
  425        )
  426    ;   thread_send_message(Queue, pengine_event(ID, Event))
  427    ).
  428
  429wrap_first_answer(ID, Event0, CreateEvent) :-
  430    wrap_first_answer_in_create_event(CreateEvent, [answer(Event0)]),
  431    arg(1, CreateEvent, ID),
  432    !,
  433    retract(wrap_first_answer_in_create_event(CreateEvent, [answer(Event0)])).
  434wrap_first_answer(_ID, Event, Event).
  435
  436
  437empty_queue :-
  438    pengine_parent(Queue),
  439    empty_queue(Queue, 0, Discarded),
  440    debug(pengine(abort), 'Abort: discarded ~D messages', [Discarded]).
  441
  442empty_queue(Queue, C0, C) :-
  443    thread_get_message(Queue, _Term, [timeout(0)]),
  444    !,
  445    C1 is C0+1,
  446    empty_queue(Queue, C1, C).
  447empty_queue(_, C, C).
  448
  449
  450/** pengine_ask(+NameOrID, @Query, +Options) is det
  451
  452Asks pengine NameOrID a query Query.
  453
  454Options is a list of options:
  455
  456    * template(+Template)
  457      Template is a variable (or a term containing variables) shared
  458      with the query. By default, the template is identical to the
  459      query.
  460
  461    * chunk(+IntegerOrFalse)
  462      Retrieve solutions in chunks of Integer rather than one by one. 1
  463      means no chunking (default). Other integers indicate the maximum
  464      number of solutions to retrieve in one chunk.  If `false`, the
  465      Pengine goal is not executed using findall/3 and friends and
  466      we do not backtrack immediately over the goal.  As a result,
  467      changes to backtrackable global state are retained.  This is
  468      similar that using set_prolog_flag(toplevel_mode, recursive).
  469
  470    * bindings(+Bindings)
  471      Sets the global variable '$variable_names' to a list of
  472      `Name = Var` terms, providing access to the actual variable
  473      names.
  474
  475Any remaining options are passed to pengine_send/3.
  476
  477Note that the predicate pengine_ask/3 is deterministic, even for queries
  478that have more than one solution. Also,  the variables in Query will not
  479be bound. Instead, results will  be  returned   in  the  form  of _event
  480terms_.
  481
  482    * success(ID, Terms, Projection, Time, More)
  483      ID is the id of the pengine that succeeded in solving the query.
  484      Terms is a list holding instantiations of `Template`.  Projection
  485      is a list of variable names that should be displayed. Time is
  486      the CPU time used to produce the results and finally, More
  487      is either `true` or `false`, indicating whether we can expect the
  488      pengine to be able to return more solutions or not, would we call
  489      pengine_next/2.
  490
  491    * failure(ID)
  492      ID is the id of the pengine that failed for lack of a solutions.
  493
  494    * error(ID, Term)
  495      ID is the id of the pengine throwing the exception.
  496      Term is the exception's error term.
  497
  498    * output(ID, Term)
  499      ID is the id of a pengine running the query that called
  500      pengine_output/1. Term is the term that was passed in the first
  501      argument of pengine_output/1 when it was called.
  502
  503    * prompt(ID, Term)
  504      ID is the id of the pengine that called pengine_input/2 and Term is
  505      the prompt.
  506
  507Defined in terms of pengine_send/3, like so:
  508
  509==
  510pengine_ask(ID, Query, Options) :-
  511    partition(pengine_ask_option, Options, AskOptions, SendOptions),
  512    pengine_send(ID, ask(Query, AskOptions), SendOptions).
  513==
  514*/
  515
  516pengine_ask(ID, Query, Options) :-
  517    partition(pengine_ask_option, Options, AskOptions, SendOptions),
  518    pengine_send(ID, ask(Query, AskOptions), SendOptions).
  519
  520
  521pengine_ask_option(template(_)).
  522pengine_ask_option(chunk(_)).
  523pengine_ask_option(bindings(_)).
  524pengine_ask_option(breakpoints(_)).
  525
  526
  527/** pengine_next(+NameOrID, +Options) is det
  528
  529Asks pengine NameOrID for the  next  solution   to  a  query  started by
  530pengine_ask/3. Defined options are:
  531
  532    * chunk(+Count)
  533    Modify the chunk-size to Count before asking the next set of
  534    solutions.  This may not be used if the goal was started with
  535    chunk(false).
  536
  537Remaining  options  are  passed  to    pengine_send/3.   The  result  of
  538re-executing the current goal is returned  to the caller's message queue
  539in the form of _event terms_.
  540
  541    * success(ID, Terms, Projection, Time, More)
  542      See pengine_ask/3.
  543
  544    * failure(ID)
  545      ID is the id of the pengine that failed for lack of more solutions.
  546
  547    * error(ID, Term)
  548      ID is the id of the pengine throwing the exception.
  549      Term is the exception's error term.
  550
  551    * output(ID, Term)
  552      ID is the id of a pengine running the query that called
  553      pengine_output/1. Term is the term that was passed in the first
  554      argument of pengine_output/1 when it was called.
  555
  556    * prompt(ID, Term)
  557      ID is the id of the pengine that called pengine_input/2 and Term
  558      is the prompt.
  559
  560Defined in terms of pengine_send/3, as follows:
  561
  562==
  563pengine_next(ID, Options) :-
  564    pengine_send(ID, next, Options).
  565==
  566
  567*/
  568
  569pengine_next(ID, Options) :-
  570    select_option(chunk(Count), Options, Options1),
  571    !,
  572    pengine_send(ID, next(Count), Options1).
  573pengine_next(ID, Options) :-
  574    pengine_send(ID, next, Options).
  575
  576
  577/** pengine_stop(+NameOrID, +Options) is det
  578
  579Tells pengine NameOrID to stop looking  for   more  solutions to a query
  580started by pengine_ask/3. Options are passed to pengine_send/3.
  581
  582Defined in terms of pengine_send/3, like so:
  583
  584==
  585pengine_stop(ID, Options) :-
  586    pengine_send(ID, stop, Options).
  587==
  588*/
  589
  590pengine_stop(ID, Options) :- pengine_send(ID, stop, Options).
  591
  592
  593/** pengine_abort(+NameOrID) is det
  594
  595Aborts the running query. The pengine goes   back  to state `2', waiting
  596for new queries.
  597
  598@see pengine_destroy/1.
  599*/
  600
  601pengine_abort(Name) :-
  602    (   child(Name, Pengine)
  603    ->  true
  604    ;   Pengine = Name
  605    ),
  606    (   pengine_remote(Pengine, Server)
  607    ->  remote_pengine_abort(Server, Pengine, [])
  608    ;   pengine_thread(Pengine, Thread),
  609        debug(pengine(abort), 'Signalling thread ~p', [Thread]),
  610        catch(thread_signal(Thread, throw(abort_query)), _, true)
  611    ).
  612
  613
  614/** pengine_destroy(+NameOrID) is det.
  615    pengine_destroy(+NameOrID, +Options) is det.
  616
  617Destroys the pengine NameOrID.  With the option force(true), the pengine
  618is killed using abort/0 and pengine_destroy/2 succeeds.
  619*/
  620
  621pengine_destroy(ID) :-
  622    pengine_destroy(ID, []).
  623
  624pengine_destroy(Name, Options) :-
  625    (   child(Name, ID)
  626    ->  true
  627    ;   ID = Name
  628    ),
  629    option(force(true), Options),
  630    !,
  631    (   pengine_thread(ID, Thread)
  632    ->  catch(thread_signal(Thread, abort),
  633              error(existence_error(thread, _), _), true)
  634    ;   true
  635    ).
  636pengine_destroy(ID, Options) :-
  637    catch(pengine_send(ID, destroy, Options),
  638          error(existence_error(pengine, ID), _),
  639          retractall(child(_,ID))).
  640
  641
  642/*================= pengines administration =======================
  643*/
  644
  645%!  current_pengine(?Id, ?Parent, ?Location)
  646%
  647%   Dynamic predicate that registers our known pengines.  Id is
  648%   an atomic unique datatype.  Parent is the id of our parent
  649%   pengine.  Location is one of
  650%
  651%     - thread(ThreadId)
  652%     - remote(URL)
  653
  654:- dynamic
  655    current_pengine/6,              % Id, ParentId, Thread, URL, App, Destroy
  656    pengine_queue/4,                % Id, Queue, TimeOut, Time
  657    output_queue/3,                 % Id, Queue, Time
  658    pengine_user/2,                 % Id, User
  659    pengine_data/2,                 % Id, Data
  660    pengine_detached/2.             % Id, Data
  661:- volatile
  662    current_pengine/6,
  663    pengine_queue/4,
  664    output_queue/3,
  665    pengine_user/2,
  666    pengine_data/2,
  667    pengine_detached/2.  668
  669:- thread_local
  670    child/2.                        % ?Name, ?Child
  671
  672%!  pengine_register_local(+Id, +Thread, +Queue, +URL, +App, +Destroy) is det.
  673%!  pengine_register_remote(+Id, +URL, +Queue, +App, +Destroy) is det.
  674%!  pengine_unregister(+Id) is det.
  675
  676pengine_register_local(Id, Thread, Queue, URL, Application, Destroy) :-
  677    asserta(current_pengine(Id, Queue, Thread, URL, Application, Destroy)).
  678
  679pengine_register_remote(Id, URL, Application, Destroy) :-
  680    thread_self(Queue),
  681    asserta(current_pengine(Id, Queue, 0, URL, Application, Destroy)).
  682
  683%!  pengine_unregister(+Id)
  684%
  685%   Called by the pengine thread  destruction.   If  we are a remote
  686%   pengine thread, our URL  equals  =http=   and  the  queue is the
  687%   message queue used to send events to the HTTP workers.
  688
  689pengine_unregister(Id) :-
  690    thread_self(Me),
  691    (   current_pengine(Id, Queue, Me, http, _, _)
  692    ->  with_mutex(pengine, sync_destroy_queue_from_pengine(Id, Queue))
  693    ;   true
  694    ),
  695    retractall(current_pengine(Id, _, Me, _, _, _)),
  696    retractall(pengine_user(Id, _)),
  697    retractall(pengine_data(Id, _)).
  698
  699pengine_unregister_remote(Id) :-
  700    retractall(current_pengine(Id, _Parent, 0, _, _, _)).
  701
  702%!  pengine_self(-Id) is det.
  703%
  704%   True if the current thread is a pengine with Id.
  705
  706pengine_self(Id) :-
  707    thread_self(Thread),
  708    current_pengine(Id, _Parent, Thread, _URL, _Application, _Destroy).
  709
  710pengine_parent(Parent) :-
  711    nb_getval(pengine_parent, Parent).
  712
  713pengine_thread(Pengine, Thread) :-
  714    current_pengine(Pengine, _Parent, Thread, _URL, _Application, _Destroy),
  715    Thread \== 0,
  716    !.
  717
  718pengine_remote(Pengine, URL) :-
  719    current_pengine(Pengine, _Parent, 0, URL, _Application, _Destroy).
  720
  721get_pengine_application(Pengine, Application) :-
  722    current_pengine(Pengine, _Parent, _, _URL, Application, _Destroy),
  723    !.
  724
  725get_pengine_module(Pengine, Pengine).
  726
  727:- if(current_predicate(uuid/2)).  728pengine_uuid(Id) :-
  729    uuid(Id, [version(4)]).             % Version 4 is random.
  730:- else.  731pengine_uuid(Id) :-
  732    (   current_prolog_flag(max_integer, Max1)
  733    ->  Max is Max1-1
  734    ;   Max is 1<<128
  735    ),
  736    random_between(0, Max, Num),
  737    atom_number(Id, Num).
  738:- endif.  739
  740%!  protect_pengine(+Id, :Goal) is semidet.
  741%
  742%   Run Goal while protecting the Pengine  Id from being destroyed. Used
  743%   by the HTTP  I/O  routines  to   avoid  that  the  Pengine's  module
  744%   disappears while I/O is in progress. We  use a pool of locks because
  745%   the lock may be held relatively long by output routines.
  746%
  747%   This also runs Goal if the Pengine no longer exists. This deals with
  748%   Pengines terminated through destroy_or_continue/1.
  749%
  750%   @bug After destroy_or_continue/1 takes the destroy route, the module
  751%   may drop-out at any point in time,   resulting  in a possible crash.
  752%   Seems the only safe way out is   to  do (de)serialization inside the
  753%   Pengine.
  754
  755:- meta_predicate protect_pengine(+, 0).  756
  757protect_pengine(Id, Goal) :-
  758    term_hash(Id, Hash),
  759    LockN is Hash mod 64,
  760    atom_concat(pengine_done_, LockN, Lock),
  761    with_mutex(Lock,
  762               (   pengine_thread(Id, _)
  763               ->  Goal
  764               ;   Goal
  765               )).
  766
  767
  768/** pengine_application(+Application) is det.
  769
  770Directive that must be used to declare a pengine application module. The
  771module must not be associated to any   file.  The default application is
  772=pengine_sandbox=.  The  example  below  creates    a   new  application
  773=address_book=  and  imports  the  API  defined    in  the  module  file
  774=adress_book_api.pl= into the application.
  775
  776  ==
  777  :- pengine_application(address_book).
  778  :- use_module(address_book:adress_book_api).
  779  ==
  780*/
  781
  782pengine_application(Application) :-
  783    throw(error(context_error(nodirective,
  784                             pengine_application(Application)), _)).
  785
  786:- multifile
  787    system:term_expansion/2,
  788    current_application/1.  789
  790%!  current_pengine_application(?Application) is nondet.
  791%
  792%   True when Application is a currently defined application.
  793%
  794%   @see pengine_application/1
  795
  796current_pengine_application(Application) :-
  797    current_application(Application).
  798
  799
  800% Default settings for all applications
  801
  802:- setting(thread_pool_size, integer, 100,
  803           'Maximum number of pengines this application can run.').  804:- setting(thread_pool_stacks, list(compound), [],
  805           'Maximum stack sizes for pengines this application can run.').  806:- setting(slave_limit, integer, 3,
  807           'Maximum number of slave pengines a master pengine can create.').  808:- setting(time_limit, number, 300,
  809           'Maximum time to wait for output').  810:- setting(idle_limit, number, 300,
  811           'Pengine auto-destroys when idle for this time').  812:- setting(safe_goal_limit, number, 10,
  813           'Maximum time to try proving safety of the goal').  814:- setting(program_space, integer, 100_000_000,
  815           'Maximum memory used by predicates').  816:- setting(allow_from, list(atom), [*],
  817           'IP addresses from which remotes are allowed to connect').  818:- setting(deny_from, list(atom), [],
  819           'IP addresses from which remotes are NOT allowed to connect').  820:- setting(debug_info, boolean, false,
  821           'Keep information to support source-level debugging').  822
  823
  824system:term_expansion((:- pengine_application(Application)), Expanded) :-
  825    must_be(atom, Application),
  826    (   module_property(Application, file(_))
  827    ->  permission_error(create, pengine_application, Application)
  828    ;   true
  829    ),
  830    expand_term((:- setting(Application:thread_pool_size, integer,
  831                            setting(pengines:thread_pool_size),
  832                            'Maximum number of pengines this \c
  833                            application can run.')),
  834                ThreadPoolSizeSetting),
  835    expand_term((:- setting(Application:thread_pool_stacks, list(compound),
  836                            setting(pengines:thread_pool_stacks),
  837                            'Maximum stack sizes for pengines \c
  838                            this application can run.')),
  839                ThreadPoolStacksSetting),
  840    expand_term((:- setting(Application:slave_limit, integer,
  841                            setting(pengines:slave_limit),
  842                            'Maximum number of local slave pengines \c
  843                            a master pengine can create.')),
  844                SlaveLimitSetting),
  845    expand_term((:- setting(Application:time_limit, number,
  846                            setting(pengines:time_limit),
  847                            'Maximum time to wait for output')),
  848                TimeLimitSetting),
  849    expand_term((:- setting(Application:idle_limit, number,
  850                            setting(pengines:idle_limit),
  851                            'Pengine auto-destroys when idle for this time')),
  852                IdleLimitSetting),
  853    expand_term((:- setting(Application:safe_goal_limit, number,
  854                            setting(pengines:safe_goal_limit),
  855                            'Maximum time to try proving safety of the goal')),
  856                SafeGoalLimitSetting),
  857    expand_term((:- setting(Application:program_space, integer,
  858                            setting(pengines:program_space),
  859                            'Maximum memory used by predicates')),
  860                ProgramSpaceSetting),
  861    expand_term((:- setting(Application:allow_from, list(atom),
  862                            setting(pengines:allow_from),
  863                            'IP addresses from which remotes are allowed \c
  864                            to connect')),
  865                AllowFromSetting),
  866    expand_term((:- setting(Application:deny_from, list(atom),
  867                            setting(pengines:deny_from),
  868                            'IP addresses from which remotes are NOT \c
  869                            allowed to connect')),
  870                DenyFromSetting),
  871    expand_term((:- setting(Application:debug_info, boolean,
  872                            setting(pengines:debug_info),
  873                            'Keep information to support source-level \c
  874                            debugging')),
  875                DebugInfoSetting),
  876    flatten([ pengines:current_application(Application),
  877              ThreadPoolSizeSetting,
  878              ThreadPoolStacksSetting,
  879              SlaveLimitSetting,
  880              TimeLimitSetting,
  881              IdleLimitSetting,
  882              SafeGoalLimitSetting,
  883              ProgramSpaceSetting,
  884              AllowFromSetting,
  885              DenyFromSetting,
  886              DebugInfoSetting
  887            ], Expanded).
  888
  889% Register default application
  890
  891:- pengine_application(pengine_sandbox).  892
  893
  894/** pengine_property(?Pengine, ?Property) is nondet.
  895
  896True when Property is a property of   the  given Pengine. Enumerates all
  897pengines  that  are  known  to  the   calling  Prolog  process.  Defined
  898properties are:
  899
  900  * self(ID)
  901    Identifier of the pengine.  This is the same as the first argument,
  902    and can be used to enumerate all known pengines.
  903  * alias(Name)
  904    Name is the alias name of the pengine, as provided through the
  905    `alias` option when creating the pengine.
  906  * thread(Thread)
  907    If the pengine is a local pengine, Thread is the Prolog thread
  908    identifier of the pengine.
  909  * remote(Server)
  910    If the pengine is remote, the URL of the server.
  911  * application(Application)
  912    Pengine runs the given application
  913  * module(Module)
  914    Temporary module used for running the Pengine.
  915  * destroy(Destroy)
  916    Destroy is =true= if the pengines is destroyed automatically
  917    after completing the query.
  918  * parent(Queue)
  919    Message queue to which the (local) pengine reports.
  920  * source(?SourceID, ?Source)
  921    Source is the source code with the given SourceID. May be present if
  922    the setting `debug_info` is present.
  923  * detached(?Time)
  924    Pengine was detached at Time.
  925*/
  926
  927
  928pengine_property(Id, Prop) :-
  929    nonvar(Id), nonvar(Prop),
  930    pengine_property2(Prop, Id),
  931    !.
  932pengine_property(Id, Prop) :-
  933    pengine_property2(Prop, Id).
  934
  935pengine_property2(self(Id), Id) :-
  936    current_pengine(Id, _Parent, _Thread, _URL, _Application, _Destroy).
  937pengine_property2(module(Id), Id) :-
  938    current_pengine(Id, _Parent, _Thread, _URL, _Application, _Destroy).
  939pengine_property2(alias(Alias), Id) :-
  940    child(Alias, Id),
  941    Alias \== Id.
  942pengine_property2(thread(Thread), Id) :-
  943    current_pengine(Id, _Parent, Thread, _URL, _Application, _Destroy),
  944    Thread \== 0.
  945pengine_property2(remote(Server), Id) :-
  946    current_pengine(Id, _Parent, 0, Server, _Application, _Destroy).
  947pengine_property2(application(Application), Id) :-
  948    current_pengine(Id, _Parent, _Thread, _Server, Application, _Destroy).
  949pengine_property2(destroy(Destroy), Id) :-
  950    current_pengine(Id, _Parent, _Thread, _Server, _Application, Destroy).
  951pengine_property2(parent(Parent), Id) :-
  952    current_pengine(Id, Parent, _Thread, _URL, _Application, _Destroy).
  953pengine_property2(source(SourceID, Source), Id) :-
  954    pengine_data(Id, source(SourceID, Source)).
  955pengine_property2(detached(When), Id) :-
  956    pengine_detached(Id, When).
  957
  958/** pengine_output(+Term) is det
  959
  960Sends Term to the parent pengine or thread.
  961*/
  962
  963pengine_output(Term) :-
  964    pengine_self(Me),
  965    pengine_reply(output(Me, Term)).
  966
  967
  968/** pengine_debug(+Format, +Args) is det
  969
  970Create a message using format/3 from Format   and  Args and send this to
  971the    client.    The    default    JavaScript    client    will    call
  972=|console.log(Message)|=  if  there  is   a    console.   The  predicate
  973pengine_rpc/3 calls debug(pengine(debug), '~w',   [Message]).  The debug
  974topic pengine(debug) is enabled by default.
  975
  976@see debug/1 and nodebug/1 for controlling the pengine(debug) topic
  977@see format/2 for format specifications
  978*/
  979
  980pengine_debug(Format, Args) :-
  981    pengine_parent(Queue),
  982    pengine_self(Self),
  983    catch(safe_goal(format(atom(_), Format, Args)), E, true),
  984    (   var(E)
  985    ->  format(atom(Message), Format, Args)
  986    ;   message_to_string(E, Message)
  987    ),
  988    pengine_reply(Queue, debug(Self, Message)).
  989
  990
  991/*================= Local pengine =======================
  992*/
  993
  994%!  local_pengine_create(+Options)
  995%
  996%   Creates  a  local   Pengine,   which    is   a   thread  running
  997%   pengine_main/2.  It maintains two predicates:
  998%
  999%     - The global dynamic predicate id/2 relates Pengines to their
 1000%       childs.
 1001%     - The local predicate id/2 maps named childs to their ids.
 1002
 1003local_pengine_create(Options) :-
 1004    thread_self(Self),
 1005    option(application(Application), Options, pengine_sandbox),
 1006    create(Self, Child, Options, local, Application),
 1007    option(alias(Name), Options, Child),
 1008    assert(child(Name, Child)).
 1009
 1010
 1011%!  thread_pool:create_pool(+Application) is det.
 1012%
 1013%   On demand creation of a thread pool for a pengine application.
 1014
 1015:- multifile thread_pool:create_pool/1. 1016
 1017thread_pool:create_pool(Application) :-
 1018    current_application(Application),
 1019    setting(Application:thread_pool_size, Size),
 1020    setting(Application:thread_pool_stacks, Stacks),
 1021    thread_pool_create(Application, Size, Stacks).
 1022
 1023%!  create(+Queue, -Child, +Options, +URL, +Application) is det.
 1024%
 1025%   Create a new pengine thread.
 1026%
 1027%   @arg Queue is the queue (or thread handle) to report to
 1028%   @arg Child is the identifier of the created pengine.
 1029%   @arg URL is one of =local= or =http=
 1030
 1031create(Queue, Child, Options, local, Application) :-
 1032    !,
 1033    pengine_child_id(Child),
 1034    create0(Queue, Child, Options, local, Application).
 1035create(Queue, Child, Options, URL, Application) :-
 1036    pengine_child_id(Child),
 1037    catch(create0(Queue, Child, Options, URL, Application),
 1038          Error,
 1039          create_error(Queue, Child, Error)).
 1040
 1041pengine_child_id(Child) :-
 1042    (   nonvar(Child)
 1043    ->  true
 1044    ;   pengine_uuid(Child)
 1045    ).
 1046
 1047create_error(Queue, Child, Error) :-
 1048    pengine_reply(Queue, error(Child, Error)).
 1049
 1050create0(Queue, Child, Options, URL, Application) :-
 1051    (  current_application(Application)
 1052    -> true
 1053    ;  existence_error(pengine_application, Application)
 1054    ),
 1055    (   URL \== http                    % pengine is _not_ a child of the
 1056                                        % HTTP server thread
 1057    ->  aggregate_all(count, child(_,_), Count),
 1058        setting(Application:slave_limit, Max),
 1059        (   Count >= Max
 1060        ->  throw(error(resource_error(max_pengines), _))
 1061        ;   true
 1062        )
 1063    ;   true
 1064    ),
 1065    partition(pengine_create_option, Options, PengineOptions, RestOptions),
 1066    thread_create_in_pool(
 1067        Application,
 1068        pengine_main(Queue, PengineOptions, Application), ChildThread,
 1069        [ at_exit(pengine_done)
 1070        | RestOptions
 1071        ]),
 1072    option(destroy(Destroy), PengineOptions, true),
 1073    pengine_register_local(Child, ChildThread, Queue, URL, Application, Destroy),
 1074    thread_send_message(ChildThread, pengine_registered(Child)),
 1075    (   option(id(Id), Options)
 1076    ->  Id = Child
 1077    ;   true
 1078    ).
 1079
 1080pengine_create_option(src_text(_)).
 1081pengine_create_option(src_url(_)).
 1082pengine_create_option(application(_)).
 1083pengine_create_option(destroy(_)).
 1084pengine_create_option(ask(_)).
 1085pengine_create_option(template(_)).
 1086pengine_create_option(bindings(_)).
 1087pengine_create_option(chunk(_)).
 1088pengine_create_option(alias(_)).
 1089pengine_create_option(user(_)).
 1090
 1091
 1092%!  pengine_done is det.
 1093%
 1094%   Called from the pengine thread   `at_exit`  option. Destroys _child_
 1095%   pengines  using  pengine_destroy/1.  Cleaning  up   the  Pengine  is
 1096%   synchronised by the `pengine_done` mutex. See read_event/6.
 1097
 1098:- public
 1099    pengine_done/0. 1100
 1101pengine_done :-
 1102    thread_self(Me),
 1103    (   thread_property(Me, status(exception(Ex))),
 1104        abort_exception(Ex),
 1105        thread_detach(Me),
 1106        pengine_self(Pengine)
 1107    ->  catch(pengine_reply(destroy(Pengine, abort(Pengine))),
 1108              error(_,_), true)
 1109    ;   true
 1110    ),
 1111    forall(child(_Name, Child),
 1112           pengine_destroy(Child)),
 1113    pengine_self(Id),
 1114    protect_pengine(Id, pengine_unregister(Id)).
 1115
 1116abort_exception('$aborted').
 1117abort_exception(unwind(abort)).
 1118
 1119%!  pengine_main(+Parent, +Options, +Application)
 1120%
 1121%   Run a pengine main loop. First acknowledges its creation and run
 1122%   pengine_main_loop/1.
 1123
 1124:- thread_local wrap_first_answer_in_create_event/2. 1125
 1126:- meta_predicate
 1127    pengine_prepare_source(:, +). 1128
 1129pengine_main(Parent, Options, Application) :-
 1130    fix_streams,
 1131    thread_get_message(pengine_registered(Self)),
 1132    nb_setval(pengine_parent, Parent),
 1133    pengine_register_user(Options),
 1134    set_prolog_flag(mitigate_spectre, true),
 1135    catch(in_temporary_module(
 1136              Self,
 1137              pengine_prepare_source(Application, Options),
 1138              pengine_create_and_loop(Self, Application, Options)),
 1139          prepare_source_failed,
 1140          pengine_terminate(Self)).
 1141
 1142pengine_create_and_loop(Self, Application, Options) :-
 1143    setting(Application:slave_limit, SlaveLimit),
 1144    CreateEvent = create(Self, [slave_limit(SlaveLimit)|Extra]),
 1145    (   option(ask(Query0), Options)
 1146    ->  asserta(wrap_first_answer_in_create_event(CreateEvent, Extra)),
 1147        (   string(Query0)                      % string is not callable
 1148        ->  (   option(template(TemplateS), Options)
 1149            ->  Ask2 = Query0-TemplateS
 1150            ;   Ask2 = Query0
 1151            ),
 1152            catch(ask_to_term(Ask2, Self, Query, Template, Bindings),
 1153                  Error, true),
 1154            (   var(Error)
 1155            ->  true
 1156            ;   send_error(Error),
 1157                throw(prepare_source_failed)
 1158            )
 1159        ;   Query = Query0,
 1160            option(template(Template), Options, Query),
 1161            option(bindings(Bindings), Options, [])
 1162        ),
 1163        option(chunk(Chunk), Options, 1),
 1164        pengine_ask(Self, Query,
 1165                    [ template(Template),
 1166                      chunk(Chunk),
 1167                      bindings(Bindings)
 1168                    ])
 1169    ;   Extra = [],
 1170        pengine_reply(CreateEvent)
 1171    ),
 1172    pengine_main_loop(Self).
 1173
 1174
 1175%!  ask_to_term(+AskSpec, +Module, -Options, OptionsTail) is det.
 1176%
 1177%   Translate the AskSpec into a query, template and bindings. The trick
 1178%   is that we must parse using the  operator declarations of the source
 1179%   and we must make sure  variable   sharing  between  query and answer
 1180%   template are known.
 1181
 1182ask_to_term(Ask-Template, Module, Ask1, Template1, Bindings) :-
 1183    !,
 1184    format(string(AskTemplate), 't((~s),(~s))', [Template, Ask]),
 1185    term_string(t(Template1,Ask1), AskTemplate,
 1186                [ variable_names(Bindings0),
 1187                  module(Module)
 1188                ]),
 1189    phrase(template_bindings(Template1, Bindings0), Bindings).
 1190ask_to_term(Ask, Module, Ask1, Template, Bindings1) :-
 1191    term_string(Ask1, Ask,
 1192                [ variable_names(Bindings),
 1193                  module(Module)
 1194                ]),
 1195    exclude(anon, Bindings, Bindings1),
 1196    dict_create(Template, swish_default_template, Bindings1).
 1197
 1198template_bindings(Var, Bindings) -->
 1199    { var(Var) }, !,
 1200    (   { var_binding(Bindings, Var, Binding)
 1201        }
 1202    ->  [Binding]
 1203    ;   []
 1204    ).
 1205template_bindings([H|T], Bindings) -->
 1206    !,
 1207    template_bindings(H, Bindings),
 1208    template_bindings(T, Bindings).
 1209template_bindings(Compoound, Bindings) -->
 1210    { compound(Compoound), !,
 1211      compound_name_arguments(Compoound, _, Args)
 1212    },
 1213    template_bindings(Args, Bindings).
 1214template_bindings(_, _) --> [].
 1215
 1216var_binding(Bindings, Var, Binding) :-
 1217    member(Binding, Bindings),
 1218    arg(2, Binding, V),
 1219    V == Var, !.
 1220
 1221%!  fix_streams is det.
 1222%
 1223%   If we are a pengine that is   created  from a web server thread,
 1224%   the current output points to a CGI stream.
 1225
 1226fix_streams :-
 1227    fix_stream(current_output).
 1228
 1229fix_stream(Name) :-
 1230    is_cgi_stream(Name),
 1231    !,
 1232    debug(pengine(stream), '~w is a CGI stream!', [Name]),
 1233    set_stream(user_output, alias(Name)).
 1234fix_stream(_).
 1235
 1236%!  pengine_prepare_source(:Application, +Options) is det.
 1237%
 1238%   Load the source into the pengine's module.
 1239%
 1240%   @throws =prepare_source_failed= if it failed to prepare the
 1241%           sources.
 1242
 1243pengine_prepare_source(Module:Application, Options) :-
 1244    setting(Application:program_space, SpaceLimit),
 1245    set_module(Module:program_space(SpaceLimit)),
 1246    delete_import_module(Module, user),
 1247    add_import_module(Module, Application, start),
 1248    catch(prep_module(Module, Application, Options), Error, true),
 1249    (   var(Error)
 1250    ->  true
 1251    ;   send_error(Error),
 1252        throw(prepare_source_failed)
 1253    ).
 1254
 1255prep_module(Module, Application, Options) :-
 1256    maplist(copy_flag(Module, Application), [var_prefix]),
 1257    forall(prepare_module(Module, Application, Options), true),
 1258    setup_call_cleanup(
 1259        '$set_source_module'(OldModule, Module),
 1260        maplist(process_create_option(Module), Options),
 1261        '$set_source_module'(OldModule)).
 1262
 1263copy_flag(Module, Application, Flag) :-
 1264    current_prolog_flag(Application:Flag, Value),
 1265    !,
 1266    set_prolog_flag(Module:Flag, Value).
 1267copy_flag(_, _, _).
 1268
 1269process_create_option(Application, src_text(Text)) :-
 1270    !,
 1271    pengine_src_text(Text, Application).
 1272process_create_option(Application, src_url(URL)) :-
 1273    !,
 1274    pengine_src_url(URL, Application).
 1275process_create_option(_, _).
 1276
 1277
 1278%!  prepare_module(+Module, +Application, +Options) is semidet.
 1279%
 1280%   Hook, called to initialize  the   temporary  private module that
 1281%   provides the working context of a pengine. This hook is executed
 1282%   by the pengine's thread.  Preparing the source consists of three
 1283%   steps:
 1284%
 1285%     1. Add Application as (first) default import module for Module
 1286%     2. Call this hook
 1287%     3. Compile the source provided by the the `src_text` and
 1288%        `src_url` options
 1289%
 1290%   @arg    Module is a new temporary module (see
 1291%           in_temporary_module/3) that may be (further) prepared
 1292%           by this hook.
 1293%   @arg    Application (also a module) associated to the pengine.
 1294%   @arg    Options is passed from the environment and should
 1295%           (currently) be ignored.
 1296
 1297
 1298pengine_main_loop(ID) :-
 1299    catch(guarded_main_loop(ID), abort_query, pengine_aborted(ID)).
 1300
 1301pengine_aborted(ID) :-
 1302    thread_self(Self),
 1303    debug(pengine(abort), 'Aborting ~p (thread ~p)', [ID, Self]),
 1304    empty_queue,
 1305    destroy_or_continue(abort(ID)).
 1306
 1307
 1308%!  guarded_main_loop(+Pengine) is det.
 1309%
 1310%   Executes state `2' of  the  pengine,   where  it  waits  for two
 1311%   events:
 1312%
 1313%     - destroy
 1314%     Terminate the pengine
 1315%     - ask(:Goal, +Options)
 1316%     Solve Goal.
 1317
 1318guarded_main_loop(ID) :-
 1319    pengine_request(Request),
 1320    (   Request = destroy
 1321    ->  debug(pengine(transition), '~q: 2 = ~q => 1', [ID, destroy]),
 1322        pengine_terminate(ID)
 1323    ;   Request = ask(Goal, Options)
 1324    ->  debug(pengine(transition), '~q: 2 = ~q => 3', [ID, ask(Goal)]),
 1325        ask(ID, Goal, Options)
 1326    ;   debug(pengine(transition), '~q: 2 = ~q => 2', [ID, protocol_error]),
 1327        pengine_reply(error(ID, error(protocol_error, _))),
 1328        guarded_main_loop(ID)
 1329    ).
 1330
 1331
 1332pengine_terminate(ID) :-
 1333    pengine_reply(destroy(ID)),
 1334    thread_self(Me),            % Make the thread silently disappear
 1335    thread_detach(Me).
 1336
 1337
 1338%!  solve(+Chunk, +Template, :Goal, +ID) is det.
 1339%
 1340%   Solve Goal. Note that because we can ask for a new goal in state
 1341%   `6', we must provide for an ancesteral cut (prolog_cut_to/1). We
 1342%   need to be sure to  have  a   choice  point  before  we can call
 1343%   prolog_current_choice/1. This is the reason   why this predicate
 1344%   has two clauses.
 1345
 1346solve(Chunk, Template, Goal, ID) :-
 1347    prolog_current_choice(Choice),
 1348    (   integer(Chunk)
 1349    ->  State = count(Chunk)
 1350    ;   Chunk == false
 1351    ->  State = no_chunk
 1352    ;   domain_error(chunk, Chunk)
 1353    ),
 1354    statistics(cputime, Epoch),
 1355    Time = time(Epoch),
 1356    nb_current('$variable_names', Bindings),
 1357    filter_template(Template, Bindings, Template2),
 1358    '$current_typein_module'(CurrTypeIn),
 1359    (   '$set_typein_module'(ID),
 1360        call_cleanup(catch(findnsols_no_empty(State, Template2,
 1361                                              set_projection(Goal, Bindings),
 1362                                              Result),
 1363                           Error, true),
 1364                     query_done(Det, CurrTypeIn)),
 1365        arg(1, Time, T0),
 1366        statistics(cputime, T1),
 1367        CPUTime is T1-T0,
 1368        forall(pengine_flush_output_hook, true),
 1369        (   var(Error)
 1370        ->  projection(Projection),
 1371            (   var(Det)
 1372            ->  pengine_reply(success(ID, Result, Projection,
 1373                                      CPUTime, true)),
 1374                more_solutions(ID, Choice, State, Time)
 1375            ;   !,                      % commit
 1376                destroy_or_continue(success(ID, Result, Projection,
 1377                                            CPUTime, false))
 1378            )
 1379        ;   !,                          % commit
 1380            (   Error == abort_query
 1381            ->  throw(Error)
 1382            ;   destroy_or_continue(error(ID, Error))
 1383            )
 1384        )
 1385    ;   !,                              % commit
 1386        arg(1, Time, T0),
 1387        statistics(cputime, T1),
 1388        CPUTime is T1-T0,
 1389        destroy_or_continue(failure(ID, CPUTime))
 1390    ).
 1391solve(_, _, _, _).                      % leave a choice point
 1392
 1393query_done(true, CurrTypeIn) :-
 1394    '$set_typein_module'(CurrTypeIn).
 1395
 1396
 1397%!  set_projection(:Goal, +Bindings)
 1398%
 1399%   findnsols_no_empty/4  copies  its  goal  and    template   to  avoid
 1400%   instantiation thereof when it stops after finding N solutions. Using
 1401%   this helper we can a renamed version of Bindings that we can set.
 1402
 1403set_projection(Goal, Bindings) :-
 1404    b_setval('$variable_names', Bindings),
 1405    call(Goal).
 1406
 1407projection(Projection) :-
 1408    nb_current('$variable_names', Bindings),
 1409    !,
 1410    maplist(var_name, Bindings, Projection).
 1411projection([]).
 1412
 1413%!  filter_template(+Template0, +Bindings, -Template) is det.
 1414%
 1415%   Establish the final template. This is   there  because hooks such as
 1416%   goal_expansion/2 and the SWISH query  hooks   can  modify the set of
 1417%   bindings.
 1418%
 1419%   @bug Projection and template handling is pretty messy.
 1420
 1421filter_template(Template0, Bindings, Template) :-
 1422    is_dict(Template0, swish_default_template),
 1423    !,
 1424    dict_create(Template, swish_default_template, Bindings).
 1425filter_template(Template, _Bindings, Template).
 1426
 1427findnsols_no_empty(no_chunk, Template, Goal, List) =>
 1428    List = [Template],
 1429    call(Goal).
 1430findnsols_no_empty(State, Template, Goal, List) =>
 1431    findnsols(State, Template, Goal, List),
 1432    List \== [].
 1433
 1434destroy_or_continue(Event) :-
 1435    arg(1, Event, ID),
 1436    (   pengine_property(ID, destroy(true))
 1437    ->  thread_self(Me),
 1438        thread_detach(Me),
 1439        pengine_reply(destroy(ID, Event))
 1440    ;   pengine_reply(Event),
 1441        guarded_main_loop(ID)
 1442    ).
 1443
 1444%!  more_solutions(+Pengine, +Choice, +State, +Time)
 1445%
 1446%   Called after a solution was found while  there can be more. This
 1447%   is state `6' of the state machine. It processes these events:
 1448%
 1449%     * stop
 1450%     Go back via state `7' to state `2' (guarded_main_loop/1)
 1451%     * next
 1452%     Fail.  This causes solve/3 to backtrack on the goal asked,
 1453%     providing at most the current `chunk` solutions.
 1454%     * next(Count)
 1455%     As `next`, but sets the new chunk-size to Count.
 1456%     * ask(Goal, Options)
 1457%     Ask another goal.  Note that we must commit the choice point
 1458%     of the previous goal asked for.
 1459
 1460more_solutions(ID, Choice, State, Time) :-
 1461    pengine_request(Event),
 1462    more_solutions(Event, ID, Choice, State, Time).
 1463
 1464more_solutions(stop, ID, _Choice, _State, _Time) :-
 1465    !,
 1466    debug(pengine(transition), '~q: 6 = ~q => 7', [ID, stop]),
 1467    destroy_or_continue(stop(ID)).
 1468more_solutions(next, ID, _Choice, _State, Time) :-
 1469    !,
 1470    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next]),
 1471    statistics(cputime, T0),
 1472    nb_setarg(1, Time, T0),
 1473    fail.
 1474more_solutions(next(Count), ID, _Choice, State, Time) :-
 1475    Count > 0,
 1476    State = count(_),                   % else fallthrough to protocol error
 1477    !,
 1478    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next(Count)]),
 1479    nb_setarg(1, State, Count),
 1480    statistics(cputime, T0),
 1481    nb_setarg(1, Time, T0),
 1482    fail.
 1483more_solutions(ask(Goal, Options), ID, Choice, _State, _Time) :-
 1484    !,
 1485    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, ask(Goal)]),
 1486    prolog_cut_to(Choice),
 1487    ask(ID, Goal, Options).
 1488more_solutions(destroy, ID, _Choice, _State, _Time) :-
 1489    !,
 1490    debug(pengine(transition), '~q: 6 = ~q => 1', [ID, destroy]),
 1491    pengine_terminate(ID).
 1492more_solutions(Event, ID, Choice, State, Time) :-
 1493    debug(pengine(transition), '~q: 6 = ~q => 6', [ID, protocol_error(Event)]),
 1494    pengine_reply(error(ID, error(protocol_error, _))),
 1495    more_solutions(ID, Choice, State, Time).
 1496
 1497%!  ask(+Pengine, :Goal, +Options)
 1498%
 1499%   Migrate from state `2' to `3'.  This predicate validates that it
 1500%   is safe to call Goal using safe_goal/1 and then calls solve/3 to
 1501%   prove the goal. It takes care of the chunk(N) option.
 1502
 1503ask(ID, Goal, Options) :-
 1504    catch(prepare_goal(ID, Goal, Goal1, Options), Error, true),
 1505    !,
 1506    (   var(Error)
 1507    ->  option(template(Template), Options, Goal),
 1508        option(chunk(N), Options, 1),
 1509        solve(N, Template, Goal1, ID)
 1510    ;   pengine_reply(error(ID, Error)),
 1511        guarded_main_loop(ID)
 1512    ).
 1513
 1514%!  prepare_goal(+Pengine, +GoalIn, -GoalOut, +Options) is det.
 1515%
 1516%   Prepare GoalIn for execution in Pengine.   This  implies we must
 1517%   perform goal expansion and, if the   system  is sandboxed, check
 1518%   the sandbox.
 1519%
 1520%   Note that expand_goal(Module:GoalIn, GoalOut) is  what we'd like
 1521%   to write, but this does not work correctly if the user wishes to
 1522%   expand `X:Y` while interpreting `X` not   as the module in which
 1523%   to run `Y`. This happens in the  CQL package. Possibly we should
 1524%   disallow this reinterpretation?
 1525
 1526prepare_goal(ID, Goal0, Module:Goal, Options) :-
 1527    option(bindings(Bindings), Options, []),
 1528    b_setval('$variable_names', Bindings),
 1529    (   prepare_goal(Goal0, Goal1, Options)
 1530    ->  true
 1531    ;   Goal1 = Goal0
 1532    ),
 1533    get_pengine_module(ID, Module),
 1534    setup_call_cleanup(
 1535        '$set_source_module'(Old, Module),
 1536        expand_goal(Goal1, Goal),
 1537        '$set_source_module'(_, Old)),
 1538    (   pengine_not_sandboxed(ID)
 1539    ->  true
 1540    ;   get_pengine_application(ID, App),
 1541        setting(App:safe_goal_limit, Limit),
 1542        catch(call_with_time_limit(
 1543                  Limit,
 1544                  safe_goal(Module:Goal)), E, true)
 1545    ->  (   var(E)
 1546        ->  true
 1547        ;   E = time_limit_exceeded
 1548        ->  throw(error(sandbox(time_limit_exceeded, Limit),_))
 1549        ;   throw(E)
 1550        )
 1551    ).
 1552
 1553
 1554%%  prepare_goal(+Goal0, -Goal1, +Options) is semidet.
 1555%
 1556%   Pre-preparation hook for running Goal0. The hook runs in the context
 1557%   of the pengine. Goal is the raw   goal  given to _ask_. The returned
 1558%   Goal1 is subject  to  goal   expansion  (expand_goal/2)  and sandbox
 1559%   validation (safe_goal/1) prior to  execution.   If  this goal fails,
 1560%   Goal0 is used for further processing.
 1561%
 1562%   @arg Options provides the options as given to _ask_
 1563
 1564
 1565%%  pengine_not_sandboxed(+Pengine) is semidet.
 1566%
 1567%   True when pengine does not operate in sandboxed mode. This implies a
 1568%   user must be  registered  by   authentication_hook/3  and  the  hook
 1569%   pengines:not_sandboxed(User, Application) must succeed.
 1570
 1571pengine_not_sandboxed(ID) :-
 1572    pengine_user(ID, User),
 1573    pengine_property(ID, application(App)),
 1574    not_sandboxed(User, App),
 1575    !.
 1576
 1577%%  not_sandboxed(+User, +Application) is semidet.
 1578%
 1579%   This hook is called to see whether the Pengine must be executed in a
 1580%   protected environment. It is only called after authentication_hook/3
 1581%   has confirmed the authentity  of  the   current  user.  If this hook
 1582%   succeeds, both loading the code and  executing the query is executed
 1583%   without enforcing sandbox security.  Typically, one should:
 1584%
 1585%     1. Provide a safe user authentication hook.
 1586%     2. Enable HTTPS in the server or put it behind an HTTPS proxy and
 1587%        ensure that the network between the proxy and the pengine
 1588%        server can be trusted.
 1589
 1590
 1591/** pengine_pull_response(+Pengine, +Options) is det
 1592
 1593Pulls a response (an event term) from the  slave Pengine if Pengine is a
 1594remote process, else does nothing at all.
 1595*/
 1596
 1597pengine_pull_response(Pengine, Options) :-
 1598    pengine_remote(Pengine, Server),
 1599    !,
 1600    remote_pengine_pull_response(Server, Pengine, Options).
 1601pengine_pull_response(_ID, _Options).
 1602
 1603
 1604/** pengine_input(+Prompt, -Term) is det
 1605
 1606Sends Prompt to the master (parent) pengine and waits for input. Note that Prompt may be
 1607any term, compound as well as atomic.
 1608*/
 1609
 1610pengine_input(Prompt, Term) :-
 1611    pengine_self(Self),
 1612    pengine_parent(Parent),
 1613    pengine_reply(Parent, prompt(Self, Prompt)),
 1614    pengine_request(Request),
 1615    (   Request = input(Input)
 1616    ->  Term = Input
 1617    ;   Request == destroy
 1618    ->  abort
 1619    ;   throw(error(protocol_error,_))
 1620    ).
 1621
 1622
 1623/** pengine_respond(+Pengine, +Input, +Options) is det
 1624
 1625Sends a response in the form of the term Input to a slave (child) pengine
 1626that has prompted its master (parent) for input.
 1627
 1628Defined in terms of pengine_send/3, as follows:
 1629
 1630==
 1631pengine_respond(Pengine, Input, Options) :-
 1632    pengine_send(Pengine, input(Input), Options).
 1633==
 1634
 1635*/
 1636
 1637pengine_respond(Pengine, Input, Options) :-
 1638    pengine_send(Pengine, input(Input), Options).
 1639
 1640
 1641%!  send_error(+Error) is det.
 1642%
 1643%   Send an error to my parent.   Remove non-readable blobs from the
 1644%   error term first using replace_blobs/2. If  the error contains a
 1645%   stack-trace, this is resolved to a string before sending.
 1646
 1647send_error(error(Formal, context(prolog_stack(Frames), Message))) :-
 1648    is_list(Frames),
 1649    !,
 1650    with_output_to(string(Stack),
 1651                   print_prolog_backtrace(current_output, Frames)),
 1652    pengine_self(Self),
 1653    replace_blobs(Formal, Formal1),
 1654    replace_blobs(Message, Message1),
 1655    pengine_reply(error(Self, error(Formal1,
 1656                                    context(prolog_stack(Stack), Message1)))).
 1657send_error(Error) :-
 1658    pengine_self(Self),
 1659    replace_blobs(Error, Error1),
 1660    pengine_reply(error(Self, Error1)).
 1661
 1662%!  replace_blobs(Term0, Term) is det.
 1663%
 1664%   Copy Term0 to Term, replacing non-text   blobs. This is required
 1665%   for error messages that may hold   streams  and other handles to
 1666%   non-readable objects.
 1667
 1668replace_blobs(Blob, Atom) :-
 1669    blob(Blob, Type), Type \== text,
 1670    !,
 1671    format(atom(Atom), '~p', [Blob]).
 1672replace_blobs(Term0, Term) :-
 1673    compound(Term0),
 1674    !,
 1675    compound_name_arguments(Term0, Name, Args0),
 1676    maplist(replace_blobs, Args0, Args),
 1677    compound_name_arguments(Term, Name, Args).
 1678replace_blobs(Term, Term).
 1679
 1680
 1681/*================= Remote pengines =======================
 1682*/
 1683
 1684
 1685remote_pengine_create(BaseURL, Options) :-
 1686    partition(pengine_create_option, Options, PengineOptions0, RestOptions),
 1687        (       option(ask(Query), PengineOptions0),
 1688                \+ option(template(_Template), PengineOptions0)
 1689        ->      PengineOptions = [template(Query)|PengineOptions0]
 1690        ;       PengineOptions = PengineOptions0
 1691        ),
 1692    options_to_dict(PengineOptions, PostData),
 1693    remote_post_rec(BaseURL, create, PostData, Reply, RestOptions),
 1694    arg(1, Reply, ID),
 1695    (   option(id(ID2), Options)
 1696    ->  ID = ID2
 1697    ;   true
 1698    ),
 1699    option(alias(Name), Options, ID),
 1700    assert(child(Name, ID)),
 1701    (   (   functor(Reply, create, _)   % actually created
 1702        ;   functor(Reply, output, _)   % compiler messages
 1703        )
 1704    ->  option(application(Application), PengineOptions, pengine_sandbox),
 1705        option(destroy(Destroy), PengineOptions, true),
 1706        pengine_register_remote(ID, BaseURL, Application, Destroy)
 1707    ;   true
 1708    ),
 1709    thread_self(Queue),
 1710    pengine_reply(Queue, Reply).
 1711
 1712options_to_dict(Options, Dict) :-
 1713    select_option(ask(Ask), Options, Options1),
 1714    select_option(template(Template), Options1, Options2),
 1715    !,
 1716    no_numbered_var_in(Ask+Template),
 1717    findall(AskString-TemplateString,
 1718            ask_template_to_strings(Ask, Template, AskString, TemplateString),
 1719            [ AskString-TemplateString ]),
 1720    options_to_dict(Options2, Dict0),
 1721    Dict = Dict0.put(_{ask:AskString,template:TemplateString}).
 1722options_to_dict(Options, Dict) :-
 1723    maplist(prolog_option, Options, Options1),
 1724    dict_create(Dict, _, Options1).
 1725
 1726no_numbered_var_in(Term) :-
 1727    sub_term(Sub, Term),
 1728    subsumes_term('$VAR'(_), Sub),
 1729    !,
 1730    domain_error(numbered_vars_free_term, Term).
 1731no_numbered_var_in(_).
 1732
 1733ask_template_to_strings(Ask, Template, AskString, TemplateString) :-
 1734    numbervars(Ask+Template, 0, _),
 1735    WOpts = [ numbervars(true), ignore_ops(true), quoted(true) ],
 1736    format(string(AskTemplate), '~W\n~W', [ Ask, WOpts,
 1737                                            Template, WOpts
 1738                                          ]),
 1739    split_string(AskTemplate, "\n", "", [AskString, TemplateString]).
 1740
 1741prolog_option(Option0, Option) :-
 1742    create_option_type(Option0, term),
 1743    !,
 1744    Option0 =.. [Name,Value],
 1745    format(string(String), '~k', [Value]),
 1746    Option =.. [Name,String].
 1747prolog_option(Option, Option).
 1748
 1749create_option_type(ask(_),         term).
 1750create_option_type(template(_),    term).
 1751create_option_type(application(_), atom).
 1752
 1753remote_pengine_send(BaseURL, ID, Event, Options) :-
 1754    remote_send_rec(BaseURL, send, ID, [event=Event], Reply, Options),
 1755    thread_self(Queue),
 1756    pengine_reply(Queue, Reply).
 1757
 1758remote_pengine_pull_response(BaseURL, ID, Options) :-
 1759    remote_send_rec(BaseURL, pull_response, ID, [], Reply, Options),
 1760    thread_self(Queue),
 1761    pengine_reply(Queue, Reply).
 1762
 1763remote_pengine_abort(BaseURL, ID, Options) :-
 1764    remote_send_rec(BaseURL, abort, ID, [], Reply, Options),
 1765    thread_self(Queue),
 1766    pengine_reply(Queue, Reply).
 1767
 1768%!  remote_send_rec(+Server, +Action, +ID, +Params, -Reply, +Options)
 1769%
 1770%   Issue a GET request on Server and   unify Reply with the replied
 1771%   term.
 1772
 1773remote_send_rec(Server, Action, ID, [event=Event], Reply, Options) :-
 1774    !,
 1775    server_url(Server, Action, [id=ID], URL),
 1776    http_open(URL, Stream,              % putting this in setup_call_cleanup/3
 1777              [ post(prolog(Event))     % makes it impossible to interrupt.
 1778              | Options
 1779              ]),
 1780    call_cleanup(
 1781        read_prolog_reply(Stream, Reply),
 1782        close(Stream)).
 1783remote_send_rec(Server, Action, ID, Params, Reply, Options) :-
 1784    server_url(Server, Action, [id=ID|Params], URL),
 1785    http_open(URL, Stream, Options),
 1786    call_cleanup(
 1787        read_prolog_reply(Stream, Reply),
 1788        close(Stream)).
 1789
 1790remote_post_rec(Server, Action, Data, Reply, Options) :-
 1791    server_url(Server, Action, [], URL),
 1792    probe(Action, URL, Options),
 1793    http_open(URL, Stream,
 1794              [ post(json(Data))
 1795              | Options
 1796              ]),
 1797    call_cleanup(
 1798        read_prolog_reply(Stream, Reply),
 1799        close(Stream)).
 1800
 1801%!  probe(+Action, +URL) is det.
 1802%
 1803%   Probe the target. This is a  good   idea  before posting a large
 1804%   document and be faced with an authentication challenge. Possibly
 1805%   we should make this an option for simpler scenarios.
 1806
 1807probe(create, URL, Options) :-
 1808    !,
 1809    http_open(URL, Stream, [method(options)|Options]),
 1810    close(Stream).
 1811probe(_, _, _).
 1812
 1813read_prolog_reply(In, Reply) :-
 1814    set_stream(In, encoding(utf8)),
 1815    read(In, Reply0),
 1816    rebind_cycles(Reply0, Reply).
 1817
 1818rebind_cycles(@(Reply, Bindings), Reply) :-
 1819    is_list(Bindings),
 1820    !,
 1821    maplist(bind, Bindings).
 1822rebind_cycles(Reply, Reply).
 1823
 1824bind(Var = Value) :-
 1825    Var = Value.
 1826
 1827server_url(Server, Action, Params, URL) :-
 1828    atom_concat('pengine/', Action, PAction),
 1829    uri_edit([ path(PAction),
 1830               search(Params)
 1831             ], Server, URL).
 1832
 1833
 1834/** pengine_event(?EventTerm) is det.
 1835    pengine_event(?EventTerm, +Options) is det.
 1836
 1837Examines the pengine's event queue  and   if  necessary blocks execution
 1838until a term that unifies to Term  arrives   in  the queue. After a term
 1839from the queue has been unified to Term,   the  term is deleted from the
 1840queue.
 1841
 1842   Valid options are:
 1843
 1844   * timeout(+Time)
 1845     Time is a float or integer and specifies the maximum time to wait
 1846     in seconds. If no event has arrived before the time is up EventTerm
 1847     is bound to the atom =timeout=.
 1848   * listen(+Id)
 1849     Only listen to events from the pengine identified by Id.
 1850*/
 1851
 1852pengine_event(Event) :-
 1853    pengine_event(Event, []).
 1854
 1855pengine_event(Event, Options) :-
 1856    thread_self(Self),
 1857    option(listen(Id), Options, _),
 1858    (   thread_get_message(Self, pengine_event(Id, Event), Options)
 1859    ->  true
 1860    ;   Event = timeout
 1861    ),
 1862    update_remote_destroy(Event).
 1863
 1864update_remote_destroy(Event) :-
 1865    destroy_event(Event),
 1866    arg(1, Event, Id),
 1867    pengine_remote(Id, _Server),
 1868    !,
 1869    pengine_unregister_remote(Id).
 1870update_remote_destroy(_).
 1871
 1872destroy_event(destroy(_)).
 1873destroy_event(destroy(_,_)).
 1874destroy_event(create(_,Features)) :-
 1875    memberchk(answer(Answer), Features),
 1876    !,
 1877    nonvar(Answer),
 1878    destroy_event(Answer).
 1879
 1880
 1881/** pengine_event_loop(:Closure, +Options) is det
 1882
 1883Starts an event loop accepting event terms   sent to the current pengine
 1884or thread. For each such  event   E,  calls  ignore(call(Closure, E)). A
 1885closure thus acts as a _handler_  for   the  event. Some events are also
 1886treated specially:
 1887
 1888   * create(ID, Term)
 1889     The ID is placed in a list of active pengines.
 1890
 1891   * destroy(ID)
 1892     The ID is removed from the list of active pengines. When the last
 1893     pengine ID is removed, the loop terminates.
 1894
 1895   * output(ID, Term)
 1896     The predicate pengine_pull_response/2 is called.
 1897
 1898Valid options are:
 1899
 1900   * autoforward(+To)
 1901     Forwards received event terms to slaves. To is either =all=,
 1902     =all_but_sender= or a Prolog list of NameOrIDs. [not yet
 1903     implemented]
 1904
 1905*/
 1906
 1907pengine_event_loop(Closure, Options) :-
 1908    child(_,_),
 1909    !,
 1910    pengine_event(Event),
 1911    (   option(autoforward(all), Options) % TODO: Implement all_but_sender and list of IDs
 1912    ->  forall(child(_,ID), pengine_send(ID, Event))
 1913    ;   true
 1914    ),
 1915    pengine_event_loop(Event, Closure, Options).
 1916pengine_event_loop(_, _).
 1917
 1918:- meta_predicate
 1919    pengine_process_event(+, 1, -, +). 1920
 1921pengine_event_loop(Event, Closure, Options) :-
 1922    pengine_process_event(Event, Closure, Continue, Options),
 1923    (   Continue == true
 1924    ->  pengine_event_loop(Closure, Options)
 1925    ;   true
 1926    ).
 1927
 1928pengine_process_event(create(ID, T), Closure, Continue, Options) :-
 1929    debug(pengine(transition), '~q: 1 = /~q => 2', [ID, create(T)]),
 1930    (   select(answer(First), T, T1)
 1931    ->  ignore(call(Closure, create(ID, T1))),
 1932        pengine_process_event(First, Closure, Continue, Options)
 1933    ;   ignore(call(Closure, create(ID, T))),
 1934        Continue = true
 1935    ).
 1936pengine_process_event(output(ID, Msg), Closure, true, _Options) :-
 1937    debug(pengine(transition), '~q: 3 = /~q => 4', [ID, output(Msg)]),
 1938    ignore(call(Closure, output(ID, Msg))),
 1939    pengine_pull_response(ID, []).
 1940pengine_process_event(debug(ID, Msg), Closure, true, _Options) :-
 1941    debug(pengine(transition), '~q: 3 = /~q => 4', [ID, debug(Msg)]),
 1942    ignore(call(Closure, debug(ID, Msg))),
 1943    pengine_pull_response(ID, []).
 1944pengine_process_event(prompt(ID, Term), Closure, true, _Options) :-
 1945    debug(pengine(transition), '~q: 3 = /~q => 5', [ID, prompt(Term)]),
 1946    ignore(call(Closure, prompt(ID, Term))).
 1947pengine_process_event(success(ID, Sol, _Proj, _Time, More), Closure, true, _) :-
 1948    debug(pengine(transition), '~q: 3 = /~q => 6/2', [ID, success(Sol, More)]),
 1949    ignore(call(Closure, success(ID, Sol, More))).
 1950pengine_process_event(failure(ID, _Time), Closure, true, _Options) :-
 1951    debug(pengine(transition), '~q: 3 = /~q => 2', [ID, failure]),
 1952    ignore(call(Closure, failure(ID))).
 1953pengine_process_event(error(ID, Error), Closure, Continue, _Options) :-
 1954    debug(pengine(transition), '~q: 3 = /~q => 2', [ID, error(Error)]),
 1955    (   call(Closure, error(ID, Error))
 1956    ->  Continue = true
 1957    ;   forall(child(_,Child), pengine_destroy(Child)),
 1958        throw(Error)
 1959    ).
 1960pengine_process_event(stop(ID), Closure, true, _Options) :-
 1961    debug(pengine(transition), '~q: 7 = /~q => 2', [ID, stop]),
 1962    ignore(call(Closure, stop(ID))).
 1963pengine_process_event(destroy(ID, Event), Closure, Continue, Options) :-
 1964    pengine_process_event(Event, Closure, _, Options),
 1965    pengine_process_event(destroy(ID), Closure, Continue, Options).
 1966pengine_process_event(destroy(ID), Closure, true, _Options) :-
 1967    retractall(child(_,ID)),
 1968    debug(pengine(transition), '~q: 1 = /~q => 0', [ID, destroy]),
 1969    ignore(call(Closure, destroy(ID))).
 1970
 1971
 1972/** pengine_rpc(+URL, +Query) is nondet.
 1973    pengine_rpc(+URL, +Query, +Options) is nondet.
 1974
 1975Semantically equivalent to the sequence below,  except that the query is
 1976executed in (and in the Prolog context   of) the pengine server referred
 1977to by URL, rather than locally.
 1978
 1979  ==
 1980    copy_term_nat(Query, Copy),  % attributes are not copied to the server
 1981    call(Copy),			 % executed on server at URL
 1982    Query = Copy.
 1983  ==
 1984
 1985Valid options are:
 1986
 1987    - chunk(+IntegerOrFalse)
 1988      Can be used to reduce the number of network roundtrips being made.
 1989      See pengine_ask/3.
 1990    - timeout(+Time)
 1991      Wait at most Time seconds for the next event from the server.
 1992      The default is defined by the setting `pengines:time_limit`.
 1993
 1994Remaining  options  (except   the   server    option)   are   passed  to
 1995pengine_create/1.
 1996*/
 1997
 1998pengine_rpc(URL, Query) :-
 1999    pengine_rpc(URL, Query, []).
 2000
 2001pengine_rpc(URL, Query, M:Options0) :-
 2002    translate_local_sources(Options0, Options1, M),
 2003    (  option(timeout(_), Options1)
 2004    -> Options = Options1
 2005    ;  setting(time_limit, Limit),
 2006       Options = [timeout(Limit)|Options1]
 2007    ),
 2008    term_variables(Query, Vars),
 2009    Template =.. [v|Vars],
 2010    State = destroy(true),              % modified by process_event/4
 2011    setup_call_catcher_cleanup(
 2012        pengine_create([ ask(Query),
 2013                         template(Template),
 2014                         server(URL),
 2015                         id(Id)
 2016                       | Options
 2017                       ]),
 2018        wait_event(Template, State, [listen(Id)|Options]),
 2019        Why,
 2020        pengine_destroy_and_wait(State, Id, Why, Options)).
 2021
 2022pengine_destroy_and_wait(destroy(true), Id, Why, Options) :-
 2023    !,
 2024    debug(pengine(rpc), 'Destroying RPC because of ~p', [Why]),
 2025    pengine_destroy(Id, Options),
 2026    wait_destroy(Id, 10).
 2027pengine_destroy_and_wait(_, _, Why, _) :-
 2028    debug(pengine(rpc), 'Not destroying RPC (~p)', [Why]).
 2029
 2030wait_destroy(Id, _) :-
 2031    \+ child(_, Id),
 2032    !.
 2033wait_destroy(Id, N) :-
 2034    pengine_event(Event, [listen(Id),timeout(10)]),
 2035    !,
 2036    (   destroy_event(Event)
 2037    ->  retractall(child(_,Id))
 2038    ;   succ(N1, N)
 2039    ->  wait_destroy(Id, N1)
 2040    ;   debug(pengine(rpc), 'RPC did not answer to destroy ~p', [Id]),
 2041        pengine_unregister_remote(Id),
 2042        retractall(child(_,Id))
 2043    ).
 2044
 2045wait_event(Template, State, Options) :-
 2046    pengine_event(Event, Options),
 2047    debug(pengine(event), 'Received ~p', [Event]),
 2048    process_event(Event, Template, State, Options).
 2049
 2050process_event(create(_ID, Features), Template, State, Options) :-
 2051    memberchk(answer(First), Features),
 2052    process_event(First, Template, State, Options).
 2053process_event(error(_ID, Error), _Template, _, _Options) :-
 2054    throw(Error).
 2055process_event(failure(_ID, _Time), _Template, _, _Options) :-
 2056    fail.
 2057process_event(prompt(ID, Prompt), Template, State, Options) :-
 2058    pengine_rpc_prompt(ID, Prompt, Reply),
 2059    pengine_send(ID, input(Reply)),
 2060    wait_event(Template, State, Options).
 2061process_event(output(ID, Term), Template, State, Options) :-
 2062    pengine_rpc_output(ID, Term),
 2063    pengine_pull_response(ID, Options),
 2064    wait_event(Template, State, Options).
 2065process_event(debug(ID, Message), Template, State, Options) :-
 2066    debug(pengine(debug), '~w', [Message]),
 2067    pengine_pull_response(ID, Options),
 2068    wait_event(Template, State, Options).
 2069process_event(success(_ID, Solutions, _Proj, _Time, false),
 2070              Template, _, _Options) :-
 2071    !,
 2072    member(Template, Solutions).
 2073process_event(success(ID, Solutions, _Proj, _Time, true),
 2074              Template, State, Options) :-
 2075    (   member(Template, Solutions)
 2076    ;   pengine_next(ID, Options),
 2077        wait_event(Template, State, Options)
 2078    ).
 2079process_event(destroy(ID, Event), Template, State, Options) :-
 2080    !,
 2081    retractall(child(_,ID)),
 2082    nb_setarg(1, State, false),
 2083    debug(pengine(destroy), 'State: ~p~n', [State]),
 2084    process_event(Event, Template, State, Options).
 2085% compatibility with older versions of the protocol.
 2086process_event(success(ID, Solutions, Time, More),
 2087              Template, State, Options) :-
 2088    process_event(success(ID, Solutions, _Proj, Time, More),
 2089                  Template, State, Options).
 2090
 2091
 2092pengine_rpc_prompt(ID, Prompt, Term) :-
 2093    prompt(ID, Prompt, Term0),
 2094    !,
 2095    Term = Term0.
 2096pengine_rpc_prompt(_ID, Prompt, Term) :-
 2097    setup_call_cleanup(
 2098        prompt(Old, Prompt),
 2099        read(Term),
 2100        prompt(_, Old)).
 2101
 2102pengine_rpc_output(ID, Term) :-
 2103    output(ID, Term),
 2104    !.
 2105pengine_rpc_output(_ID, Term) :-
 2106    print(Term).
 2107
 2108%%  prompt(+ID, +Prompt, -Term) is semidet.
 2109%
 2110%   Hook to handle pengine_input/2 from the remote pengine. If the hooks
 2111%   fails, pengine_rpc/3 calls read/1 using the current prompt.
 2112
 2113:- multifile prompt/3. 2114
 2115%%  output(+ID, +Term) is semidet.
 2116%
 2117%   Hook to handle pengine_output/1 from the remote pengine. If the hook
 2118%   fails, it calls print/1 on Term.
 2119
 2120:- multifile output/2. 2121
 2122
 2123/*================= HTTP handlers =======================
 2124*/
 2125
 2126%   Declare  HTTP  locations  we  serve  and   how.  Note  that  we  use
 2127%   time_limit(inifinite) because pengines have their  own timeout. Also
 2128%   note that we use spawn. This  is   needed  because we can easily get
 2129%   many clients waiting for  some  action   on  a  pengine to complete.
 2130%   Without spawning, we would quickly exhaust   the  worker pool of the
 2131%   HTTP server.
 2132%
 2133%   FIXME: probably we should wait for a   short time for the pengine on
 2134%   the default worker thread. Only if  that   time  has expired, we can
 2135%   call http_spawn/2 to continue waiting on   a  new thread. That would
 2136%   improve the performance and reduce the usage of threads.
 2137
 2138:- multifile http:location/3. 2139http:location(pengine, root(pengine), [-100]).
 2140
 2141:- http_handler(pengine(.),             http_404([]),
 2142                [ id(pengines) ]). 2143:- http_handler(pengine(create),        http_pengine_create,
 2144                [ time_limit(infinite), spawn([]) ]). 2145:- http_handler(pengine(send),          http_pengine_send,
 2146                [ time_limit(infinite), spawn([]) ]). 2147:- http_handler(pengine(pull_response), http_pengine_pull_response,
 2148                [ time_limit(infinite), spawn([]) ]). 2149:- http_handler(pengine(abort),         http_pengine_abort,         []). 2150:- http_handler(pengine(detach),        http_pengine_detach,        []). 2151:- http_handler(pengine(list),          http_pengine_list,          []). 2152:- http_handler(pengine(ping),          http_pengine_ping,          []). 2153:- http_handler(pengine(destroy_all),   http_pengine_destroy_all,   []). 2154
 2155:- http_handler(pengine('pengines.js'),
 2156                http_reply_file(library('http/web/js/pengines.js'), []), []). 2157:- http_handler(pengine('plterm.css'),
 2158                http_reply_file(library('http/web/css/plterm.css'), []), []). 2159
 2160
 2161%%  http_pengine_create(+Request)
 2162%
 2163%   HTTP POST handler  for  =/pengine/create=.   This  API  accepts  the
 2164%   pengine  creation  parameters  both  as  =application/json=  and  as
 2165%   =www-form-encoded=.  Accepted parameters:
 2166%
 2167%     | **Parameter** | **Default**       | **Comment**                |
 2168%     |---------------|-------------------|----------------------------|
 2169%     | format        | `prolog`          | Output format              |
 2170%     | application   | `pengine_sandbox` | Pengine application        |
 2171%     | chunk         | 1                 | Chunk-size for results     |
 2172%     | collate       | 0 (off)           | Join output events         |
 2173%     | solutions     | chunked           | If `all`, emit all results |
 2174%     | ask           | -                 | The query                  |
 2175%     | template      | -                 | Output template            |
 2176%     | src_text      | ""                | Program                    |
 2177%     | src_url       | -                 | Program to download        |
 2178%     | disposition   | -                 | Download location          |
 2179%
 2180%     Note that solutions=all internally  uses   chunking  to obtain the
 2181%     results from the pengine, but the results are combined in a single
 2182%     HTTP reply. This is currently only  implemented by the CSV backend
 2183%     that is part of SWISH for   downloading unbounded result sets with
 2184%     limited memory resources.
 2185%
 2186%     Using  `chunk=false`  simulates  the   _recursive  toplevel_.  See
 2187%     pengine_ask/3.
 2188
 2189http_pengine_create(Request) :-
 2190    reply_options(Request, [post]),
 2191    !.
 2192http_pengine_create(Request) :-
 2193    memberchk(content_type(CT), Request),
 2194    sub_atom(CT, 0, _, _, 'application/json'),
 2195    !,
 2196    http_read_json_dict(Request, Dict),
 2197    dict_atom_option(format, Dict, Format, prolog),
 2198    dict_atom_option(application, Dict, Application, pengine_sandbox),
 2199    http_pengine_create(Request, Application, Format, Dict).
 2200http_pengine_create(Request) :-
 2201    Optional = [optional(true)],
 2202    OptString = [string|Optional],
 2203    Form = [ format(Format, [default(prolog)]),
 2204             application(Application, [default(pengine_sandbox)]),
 2205             chunk(_, [nonneg;oneof([false]), default(1)]),
 2206             collate(_, [number, default(0)]),
 2207             solutions(_, [oneof([all,chunked]), default(chunked)]),
 2208             ask(_, OptString),
 2209             template(_, OptString),
 2210             src_text(_, OptString),
 2211             disposition(_, OptString),
 2212             src_url(_, Optional)
 2213           ],
 2214    http_parameters(Request, Form),
 2215    form_dict(Form, Dict),
 2216    http_pengine_create(Request, Application, Format, Dict).
 2217
 2218dict_atom_option(Key, Dict, Atom, Default) :-
 2219    (   get_dict(Key, Dict, String)
 2220    ->  atom_string(Atom, String)
 2221    ;   Atom = Default
 2222    ).
 2223
 2224form_dict(Form, Dict) :-
 2225    form_values(Form, Pairs),
 2226    dict_pairs(Dict, _, Pairs).
 2227
 2228form_values([], []).
 2229form_values([H|T], Pairs) :-
 2230    arg(1, H, Value),
 2231    nonvar(Value),
 2232    !,
 2233    functor(H, Name, _),
 2234    Pairs = [Name-Value|PairsT],
 2235    form_values(T, PairsT).
 2236form_values([_|T], Pairs) :-
 2237    form_values(T, Pairs).
 2238
 2239%!  http_pengine_create(+Request, +Application, +Format, +OptionsDict)
 2240
 2241
 2242http_pengine_create(Request, Application, Format, Dict) :-
 2243    current_application(Application),
 2244    !,
 2245    allowed(Request, Application),
 2246    authenticate(Request, Application, UserOptions),
 2247    dict_to_options(Dict, Application, CreateOptions0),
 2248    append(UserOptions, CreateOptions0, CreateOptions),
 2249    pengine_uuid(Pengine),
 2250    message_queue_create(Queue, [max_size(25)]),
 2251    setting(Application:time_limit, TimeLimit),
 2252    get_time(Now),
 2253    asserta(pengine_queue(Pengine, Queue, TimeLimit, Now)),
 2254    broadcast(pengine(create(Pengine, Application, CreateOptions))),
 2255    create(Queue, Pengine, CreateOptions, http, Application),
 2256    create_wait_and_output_result(Pengine, Queue, Format,
 2257                                  TimeLimit, Dict),
 2258    gc_abandoned_queues.
 2259http_pengine_create(_Request, Application, Format, _Dict) :-
 2260    Error = existence_error(pengine_application, Application),
 2261    pengine_uuid(ID),
 2262    output_result(ID, Format, error(ID, error(Error, _))).
 2263
 2264
 2265dict_to_options(Dict, Application, CreateOptions) :-
 2266    dict_pairs(Dict, _, Pairs),
 2267    pairs_create_options(Pairs, Application, CreateOptions).
 2268
 2269pairs_create_options([], _, []) :- !.
 2270pairs_create_options([N-V0|T0], App, [Opt|T]) :-
 2271    Opt =.. [N,V],
 2272    pengine_create_option(Opt), N \== user,
 2273    !,
 2274    (   create_option_type(Opt, atom)
 2275    ->  atom_string(V, V0)               % term creation must be done if
 2276    ;   V = V0                           % we created the source and know
 2277    ),                                   % the operators.
 2278    pairs_create_options(T0, App, T).
 2279pairs_create_options([_|T0], App, T) :-
 2280    pairs_create_options(T0, App, T).
 2281
 2282%!  wait_and_output_result(+Pengine, +Queue,
 2283%!                         +Format, +TimeLimit, +Collate) is det.
 2284%
 2285%   Wait for the Pengine's Queue and if  there is a message, send it
 2286%   to the requester using  output_result/1.   If  Pengine  does not
 2287%   answer within the time specified   by  the setting =time_limit=,
 2288%   Pengine is aborted and the  result is error(time_limit_exceeded,
 2289%   _).
 2290
 2291wait_and_output_result(Pengine, Queue, Format, TimeLimit, Collate0) :-
 2292    Collate is min(Collate0, TimeLimit/10),
 2293    get_time(Epoch),
 2294    (   catch(thread_get_message(Queue, pengine_event(_, Event),
 2295                                 [ timeout(TimeLimit)
 2296                                 ]),
 2297              Error, true)
 2298    ->  (   var(Error)
 2299        ->  debug(pengine(wait), 'Got ~q from ~q', [Event, Queue]),
 2300            (   collating_event(Collate, Event)
 2301            ->  Deadline is Epoch+TimeLimit,
 2302                collect_events(Pengine, Collate, Queue, Deadline, 100, More),
 2303                Events = [Event|More],
 2304                ignore(destroy_queue_from_http(Pengine, Events, Queue)),
 2305                protect_pengine(Pengine, output_result(Pengine, Format, Events))
 2306            ;   ignore(destroy_queue_from_http(Pengine, Event, Queue)),
 2307                protect_pengine(Pengine, output_result(Pengine, Format, Event))
 2308            )
 2309        ;   output_result(Pengine, Format, died(Pengine))
 2310        )
 2311    ;   time_limit_exceeded(Pengine, Format)
 2312    ).
 2313
 2314%!  collect_events(+Pengine, +CollateTime, +Queue, +Deadline, +Max, -Events)
 2315%
 2316%   Collect more events as long as they   are not separated by more than
 2317%   CollateTime seconds and collect at most Max.
 2318
 2319collect_events(_Pengine, _Collate, _Queue, _Deadline, 0, []) :-
 2320    !.
 2321collect_events(Pengine, Collate, Queue, Deadline, Max, Events) :-
 2322    debug(pengine(wait), 'Waiting to collate events', []),
 2323    (   catch(thread_get_message(Queue, pengine_event(_, Event),
 2324                                 [ timeout(Collate)
 2325                                 ]),
 2326              Error, true)
 2327    ->  (   var(Error)
 2328        ->  debug(pengine(wait), 'Got ~q from ~q', [Event, Queue]),
 2329            Events = [Event|More],
 2330            (   collating_event(Collate, Event)
 2331            ->  Max2 is Max - 1,
 2332                collect_events(Pengine, Collate, Queue, Deadline, Max2, More)
 2333            ;   More = []
 2334            )
 2335        ;   Events = [died(Pengine)]
 2336        )
 2337    ;   get_time(Now),
 2338        Now > Deadline
 2339    ->  time_limit_event(Pengine, TimeLimitEvent),
 2340        Events = [TimeLimitEvent]
 2341    ;   Events = []
 2342    ).
 2343
 2344collating_event(0, _) :-
 2345    !,
 2346    fail.
 2347collating_event(_, output(_,_)).
 2348
 2349%!  create_wait_and_output_result(+Pengine, +Queue, +Format,
 2350%!                                +TimeLimit, +Dict) is det.
 2351%
 2352%   Intercepts  the  `solutions=all'  case    used  for  downloading
 2353%   results. Dict may contain a  `disposition`   key  to  denote the
 2354%   download location.
 2355
 2356create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, Dict) :-
 2357    get_dict(solutions, Dict, all),
 2358    !,
 2359    between(1, infinite, Page),
 2360    (   catch(thread_get_message(Queue, pengine_event(_, Event),
 2361                                 [ timeout(TimeLimit)
 2362                                 ]),
 2363              Error, true)
 2364    ->  (   var(Error)
 2365        ->  debug(pengine(wait), 'Page ~D: got ~q from ~q', [Page, Event, Queue]),
 2366            (   destroy_queue_from_http(Pengine, Event, Queue)
 2367            ->  !,
 2368                protect_pengine(Pengine,
 2369                                output_result_2(Format, page(Page, Event), Dict))
 2370            ;   is_more_event(Event)
 2371            ->  pengine_thread(Pengine, Thread),
 2372                thread_send_message(Thread, pengine_request(next)),
 2373                protect_pengine(Pengine,
 2374                                output_result_2(Format, page(Page, Event), Dict)),
 2375                fail
 2376            ;   !,
 2377                protect_pengine(Pengine,
 2378                                output_result_2(Format, page(Page, Event), Dict))
 2379            )
 2380        ;   !, output_result(Pengine, Format, died(Pengine))
 2381        )
 2382    ;   !, time_limit_exceeded(Pengine, Format)
 2383    ),
 2384    !.
 2385create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, Dict) :-
 2386    wait_and_output_result(Pengine, Queue, Format, TimeLimit, Dict.get(collate,0)).
 2387
 2388is_more_event(success(_Id, _Answers, _Projection, _Time, true)).
 2389is_more_event(create(_, Options)) :-
 2390    memberchk(answer(Event), Options),
 2391    is_more_event(Event).
 2392
 2393
 2394
 2395%!  time_limit_exceeded(+Pengine, +Format)
 2396%
 2397%   The Pengine did not reply within its time limit. Send a reply to the
 2398%   client in the requested format and interrupt the Pengine.
 2399%
 2400%   @bug Ideally, if the Pengine has `destroy` set to `false`, we should
 2401%   get the Pengine back to its main   loop.  Unfortunately we only have
 2402%   normal exceptions that may be  caught   by  the  Pengine and `abort`
 2403%   which cannot be caught and thus destroys the Pengine.
 2404
 2405time_limit_exceeded(Pengine, Format) :-
 2406    time_limit_event(Pengine, Event),
 2407    call_cleanup(
 2408        pengine_destroy(Pengine, [force(true)]),
 2409        output_result(Pengine, Format, Event)).
 2410
 2411time_limit_event(Pengine,
 2412                 destroy(Pengine, error(Pengine, time_limit_exceeded))).
 2413
 2414destroy_pengine_after_output(Pengine, Events) :-
 2415    is_list(Events),
 2416    last(Events, Last),
 2417    time_limit_event(Pengine,  Last),
 2418    !,
 2419    catch(ignore(pengine_destroy(Pengine, [force(true)])), error(_,_), true).
 2420destroy_pengine_after_output(_, _).
 2421
 2422
 2423%!  destroy_queue_from_http(+Pengine, +Event, +Queue) is semidet.
 2424%
 2425%   Consider destroying the output queue   for Pengine after sending
 2426%   Event back to the HTTP client. We can destroy the queue if
 2427%
 2428%     - The pengine already died (output_queue/3 is present) and
 2429%       the queue is empty.
 2430%     - This is a final (destroy) event.
 2431%
 2432%   @tbd    If the client did not request all output, the queue will
 2433%           not be destroyed.  We need some timeout and GC for that.
 2434
 2435destroy_queue_from_http(ID, _, Queue) :-
 2436    output_queue(ID, Queue, _),
 2437    !,
 2438    destroy_queue_if_empty(Queue).
 2439destroy_queue_from_http(ID, Event, Queue) :-
 2440    debug(pengine(destroy), 'DESTROY? ~p', [Event]),
 2441    is_destroy_event(Event),
 2442    !,
 2443    message_queue_property(Queue, size(Waiting)),
 2444    debug(pengine(destroy), 'Destroy ~p (waiting ~D)', [Queue, Waiting]),
 2445    with_mutex(pengine, sync_destroy_queue_from_http(ID, Queue)).
 2446
 2447is_destroy_event(destroy(_)).
 2448is_destroy_event(destroy(_,_)).
 2449is_destroy_event(create(_, Options)) :-
 2450    memberchk(answer(Event), Options),
 2451    is_destroy_event(Event).
 2452
 2453destroy_queue_if_empty(Queue) :-
 2454    thread_peek_message(Queue, _),
 2455    !.
 2456destroy_queue_if_empty(Queue) :-
 2457    retractall(output_queue(_, Queue, _)),
 2458    message_queue_destroy(Queue).
 2459
 2460%!  gc_abandoned_queues
 2461%
 2462%   Check whether there are queues  that   have  been abadoned. This
 2463%   happens if the stream contains output events and not all of them
 2464%   are read by the client.
 2465
 2466:- dynamic
 2467    last_gc/1. 2468
 2469gc_abandoned_queues :-
 2470    consider_queue_gc,
 2471    !,
 2472    get_time(Now),
 2473    (   output_queue(_, Queue, Time),
 2474        Now-Time > 15*60,
 2475        retract(output_queue(_, Queue, Time)),
 2476        message_queue_destroy(Queue),
 2477        fail
 2478    ;   retractall(last_gc(_)),
 2479        asserta(last_gc(Now))
 2480    ).
 2481gc_abandoned_queues.
 2482
 2483consider_queue_gc :-
 2484    predicate_property(output_queue(_,_,_), number_of_clauses(N)),
 2485    N > 100,
 2486    (   last_gc(Time),
 2487        get_time(Now),
 2488        Now-Time > 5*60
 2489    ->  true
 2490    ;   \+ last_gc(_)
 2491    ).
 2492
 2493%!  sync_destroy_queue_from_http(+Pengine, +Queue) is det.
 2494%!  sync_delay_destroy_queue(+Pengine, +Queue) is det.
 2495%
 2496%   Handle destruction of the message queue connecting the HTTP side
 2497%   to the pengine. We cannot delete the queue when the pengine dies
 2498%   because the queue may contain output  events. Termination of the
 2499%   pengine and finishing the  HTTP  exchange   may  happen  in both
 2500%   orders. This means we need handle this using synchronization.
 2501%
 2502%     * sync_destroy_queue_from_pengine(+Pengine, +Queue)
 2503%     Called (indirectly) from pengine_done/1 if the pengine's
 2504%     thread dies.
 2505%     * sync_destroy_queue_from_http(+Pengine, +Queue)
 2506%     Called from destroy_queue/3, from wait_and_output_result/5,
 2507%     i.e., from the HTTP side.
 2508
 2509:- dynamic output_queue_destroyed/1. 2510
 2511sync_destroy_queue_from_http(ID, Queue) :-
 2512    (   output_queue(ID, Queue, _)
 2513    ->  destroy_queue_if_empty(Queue)
 2514    ;   thread_peek_message(Queue, pengine_event(_, output(_,_)))
 2515    ->  debug(pengine(destroy), 'Delay destruction of ~p because of output',
 2516              [Queue]),
 2517        get_time(Now),
 2518        asserta(output_queue(ID, Queue, Now))
 2519    ;   message_queue_destroy(Queue),
 2520        asserta(output_queue_destroyed(Queue))
 2521    ).
 2522
 2523%!  sync_destroy_queue_from_pengine(+Pengine, +Queue)
 2524%
 2525%   Called  from  pengine_unregister/1  when    the  pengine  thread
 2526%   terminates. It is called while the mutex `pengine` held.
 2527
 2528sync_destroy_queue_from_pengine(ID, Queue) :-
 2529    (   retract(output_queue_destroyed(Queue))
 2530    ->  true
 2531    ;   get_time(Now),
 2532        asserta(output_queue(ID, Queue, Now))
 2533    ),
 2534    retractall(pengine_queue(ID, Queue, _, _)).
 2535
 2536
 2537http_pengine_send(Request) :-
 2538    reply_options(Request, [get,post]),
 2539    !.
 2540http_pengine_send(Request) :-
 2541    http_parameters(Request,
 2542                    [ id(ID, [ type(atom) ]),
 2543                      event(EventString, [optional(true)]),
 2544                      collate(Collate, [number, default(0)]),
 2545                      format(Format, [default(prolog)])
 2546                    ]),
 2547    catch(read_event(ID, Request, Format, EventString, Event),
 2548          Error,
 2549          true),
 2550    (   var(Error)
 2551    ->  debug(pengine(event), 'HTTP send: ~p', [Event]),
 2552        (   pengine_thread(ID, Thread)
 2553        ->  pengine_queue(ID, Queue, TimeLimit, _),
 2554            random_delay,
 2555            broadcast(pengine(send(ID, Event))),
 2556            thread_send_message(Thread, pengine_request(Event)),
 2557            wait_and_output_result(ID, Queue, Format, TimeLimit, Collate)
 2558        ;   atom(ID)
 2559        ->  pengine_died(Format, ID)
 2560        ;   http_404([], Request)
 2561        )
 2562    ;   Error = error(existence_error(pengine, ID), _)
 2563    ->  pengine_died(Format, ID)
 2564    ;   output_result(ID, Format, error(ID, Error))
 2565    ).
 2566
 2567pengine_died(Format, Pengine) :-
 2568    output_result(Pengine, Format,
 2569                  error(Pengine, error(existence_error(pengine, Pengine),_))).
 2570
 2571
 2572%!  read_event(+Pengine, +Request, +Format, +EventString, -Event) is det
 2573%
 2574%   Read an event on behalve of Pengine.  Note that the pengine's module
 2575%   should not be  deleted  while  we   are  reading  using  its  syntax
 2576%   (module). This is ensured using the `pengine_done` mutex.
 2577%
 2578%   @see pengine_done/0.
 2579
 2580read_event(Pengine, Request, Format, EventString, Event) :-
 2581    protect_pengine(
 2582        Pengine,
 2583        ( get_pengine_module(Pengine, Module),
 2584          read_event_2(Request, EventString, Module, Event0, Bindings)
 2585        )),
 2586    !,
 2587    fix_bindings(Format, Event0, Bindings, Event).
 2588read_event(Pengine, Request, _Format, _EventString, _Event) :-
 2589    debug(pengine(event), 'Pengine ~q vanished', [Pengine]),
 2590    discard_post_data(Request),
 2591    existence_error(pengine, Pengine).
 2592
 2593
 2594%%  read_event_(+Request, +EventString, +Module, -Event, -Bindings)
 2595%
 2596%   Read the sent event. The event is a   Prolog  term that is either in
 2597%   the =event= parameter or as a posted document.
 2598
 2599read_event_2(_Request, EventString, Module, Event, Bindings) :-
 2600    nonvar(EventString),
 2601    !,
 2602    term_string(Event, EventString,
 2603                [ variable_names(Bindings),
 2604                  module(Module)
 2605                ]).
 2606read_event_2(Request, _EventString, Module, Event, Bindings) :-
 2607    option(method(post), Request),
 2608    http_read_data(Request,     Event,
 2609                   [ content_type('application/x-prolog'),
 2610                     module(Module),
 2611                     variable_names(Bindings)
 2612                   ]).
 2613
 2614%%  discard_post_data(+Request) is det.
 2615%
 2616%   If this is a POST request, discard the posted data.
 2617
 2618discard_post_data(Request) :-
 2619    option(method(post), Request),
 2620    !,
 2621    setup_call_cleanup(
 2622        open_null_stream(NULL),
 2623        http_read_data(Request, _, [to(stream(NULL))]),
 2624        close(NULL)).
 2625discard_post_data(_).
 2626
 2627%!  fix_bindings(+Format, +EventIn, +Bindings, -Event) is det.
 2628%
 2629%   Generate the template for json(-s) Format  from the variables in
 2630%   the asked Goal. Variables starting  with an underscore, followed
 2631%   by an capital letter are ignored from the template.
 2632
 2633fix_bindings(Format,
 2634             ask(Goal, Options0), Bindings,
 2635             ask(Goal, NewOptions)) :-
 2636    json_lang(Format),
 2637    !,
 2638    exclude(anon, Bindings, NamedBindings),
 2639    template(NamedBindings, Template, Options0, Options1),
 2640    select_option(chunk(Paging), Options1, Options2, 1),
 2641    NewOptions = [ template(Template),
 2642                   chunk(Paging),
 2643                   bindings(NamedBindings)
 2644                 | Options2
 2645                 ].
 2646fix_bindings(_, Command, _, Command).
 2647
 2648template(_, Template, Options0, Options) :-
 2649    select_option(template(Template), Options0, Options),
 2650    !.
 2651template(Bindings, Template, Options, Options) :-
 2652    dict_create(Template, swish_default_template, Bindings).
 2653
 2654anon(Name=_) :-
 2655    sub_atom(Name, 0, _, _, '_'),
 2656    sub_atom(Name, 1, 1, _, Next),
 2657    char_type(Next, prolog_var_start).
 2658
 2659var_name(Name=_, Name).
 2660
 2661
 2662%!  json_lang(+Format) is semidet.
 2663%
 2664%   True if Format is a JSON variation.
 2665
 2666json_lang(json) :- !.
 2667json_lang(Format) :-
 2668    sub_atom(Format, 0, _, _, 'json-').
 2669
 2670%!  http_pengine_pull_response(+Request)
 2671%
 2672%   HTTP handler for /pengine/pull_response.  Pulls possible pending
 2673%   messages from the pengine.
 2674
 2675http_pengine_pull_response(Request) :-
 2676    reply_options(Request, [get]),
 2677    !.
 2678http_pengine_pull_response(Request) :-
 2679    http_parameters(Request,
 2680            [   id(ID, []),
 2681                format(Format, [default(prolog)]),
 2682                collate(Collate, [number, default(0)])
 2683            ]),
 2684    reattach(ID),
 2685    (   (   pengine_queue(ID, Queue, TimeLimit, _)
 2686        ->  true
 2687        ;   output_queue(ID, Queue, _),
 2688            TimeLimit = 0
 2689        )
 2690    ->  wait_and_output_result(ID, Queue, Format, TimeLimit, Collate)
 2691    ;   http_404([], Request)
 2692    ).
 2693
 2694%!  http_pengine_abort(+Request)
 2695%
 2696%   HTTP handler for /pengine/abort. Note that  abort may be sent at
 2697%   any time and the reply may  be   handled  by a pull_response. In
 2698%   that case, our  pengine  has  already   died  before  we  get to
 2699%   wait_and_output_result/5.
 2700
 2701http_pengine_abort(Request) :-
 2702    reply_options(Request, [get,post]),
 2703    !.
 2704http_pengine_abort(Request) :-
 2705    http_parameters(Request,
 2706            [   id(ID, [])
 2707            ]),
 2708    (   pengine_thread(ID, _Thread)
 2709    ->  broadcast(pengine(abort(ID))),
 2710        abort_pending_output(ID),
 2711        pengine_abort(ID),
 2712        reply_json_dict(true)
 2713    ;   http_404([], Request)
 2714    ).
 2715
 2716%!  http_pengine_detach(+Request)
 2717%
 2718%   Detach a Pengine while keeping it running.  This has the following
 2719%   consequences:
 2720%
 2721%     - `/destroy_all` including the id of this pengine is ignored.
 2722%     - Output from the pengine is stored in the queue without
 2723%       waiting for the queue to drain.
 2724%     - The Pengine becomes available through `/list`
 2725
 2726http_pengine_detach(Request) :-
 2727    reply_options(Request, [post]),
 2728    !.
 2729http_pengine_detach(Request) :-
 2730    http_parameters(Request,
 2731                    [ id(ID, [])
 2732                    ]),
 2733    http_read_json_dict(Request, ClientData),
 2734    (   pengine_property(ID, application(Application)),
 2735        allowed(Request, Application),
 2736        authenticate(Request, Application, _UserOptions)
 2737    ->  broadcast(pengine(detach(ID))),
 2738        get_time(Now),
 2739        assertz(pengine_detached(ID, ClientData.put(time, Now))),
 2740        pengine_queue(ID, Queue, _TimeLimit, _Now),
 2741        message_queue_set(Queue, max_size(1000)),
 2742        pengine_reply(Queue, detached(ID)),
 2743        reply_json_dict(true)
 2744    ;   http_404([], Request)
 2745    ).
 2746
 2747reattach(ID) :-
 2748    (   retract(pengine_detached(ID, _Data)),
 2749        pengine_queue(ID, Queue, _TimeLimit, _Now)
 2750    ->  message_queue_set(Queue, max_size(25))
 2751    ;   true
 2752    ).
 2753
 2754
 2755%!  http_pengine_destroy_all(+Request)
 2756%
 2757%   Destroy a list of pengines. Normally   called  by pengines.js if the
 2758%   browser window is closed.
 2759
 2760http_pengine_destroy_all(Request) :-
 2761    reply_options(Request, [get,post]),
 2762    !.
 2763http_pengine_destroy_all(Request) :-
 2764    http_parameters(Request,
 2765                    [ ids(IDsAtom, [])
 2766                    ]),
 2767    atomic_list_concat(IDs, ',', IDsAtom),
 2768    forall(( member(ID, IDs),
 2769             \+ pengine_detached(ID, _)
 2770           ),
 2771           pengine_destroy(ID, [force(true)])),
 2772    reply_json_dict("ok").
 2773
 2774%!  http_pengine_ping(+Request)
 2775%
 2776%   HTTP handler for /pengine/ping.  If   the  requested  Pengine is
 2777%   alive and event status(Pengine, Stats) is created, where `Stats`
 2778%   is the return of thread_statistics/2.
 2779
 2780http_pengine_ping(Request) :-
 2781    reply_options(Request, [get]),
 2782    !.
 2783http_pengine_ping(Request) :-
 2784    http_parameters(Request,
 2785                    [ id(Pengine, []),
 2786                      format(Format, [default(prolog)])
 2787                    ]),
 2788    (   pengine_thread(Pengine, Thread),
 2789        Error = error(_,_),
 2790        catch(thread_statistics(Thread, Stats), Error, fail)
 2791    ->  output_result(Pengine, Format, ping(Pengine, Stats))
 2792    ;   output_result(Pengine, Format, died(Pengine))
 2793    ).
 2794
 2795%!  http_pengine_list(+Request)
 2796%
 2797%   HTTP  handler  for  `/pengine/list`,   providing  information  about
 2798%   running Pengines.
 2799%
 2800%   @tbd Only list detached Pengines associated to the logged in user.
 2801
 2802http_pengine_list(Request) :-
 2803    reply_options(Request, [get]),
 2804    !.
 2805http_pengine_list(Request) :-
 2806    http_parameters(Request,
 2807                    [ status(Status, [default(detached), oneof([detached])]),
 2808                      application(Application, [default(pengine_sandbox)])
 2809                    ]),
 2810    allowed(Request, Application),
 2811    authenticate(Request, Application, _UserOptions),
 2812    findall(Term, listed_pengine(Application, Status, Term), Terms),
 2813    reply_json_dict(json{pengines: Terms}).
 2814
 2815listed_pengine(Application, detached, State) :-
 2816    State = pengine{id:Id,
 2817                    detached:Time,
 2818                    queued:Queued,
 2819                    stats:Stats},
 2820
 2821    pengine_property(Id, application(Application)),
 2822    pengine_property(Id, detached(Time)),
 2823    pengine_queue(Id, Queue, _TimeLimit, _Now),
 2824    message_queue_property(Queue, size(Queued)),
 2825    (   pengine_thread(Id, Thread),
 2826        catch(thread_statistics(Thread, Stats), _, fail)
 2827    ->  true
 2828    ;   Stats = thread{status:died}
 2829    ).
 2830
 2831
 2832%!  output_result(+Pengine, +Format, +EventTerm) is det.
 2833%!  output_result(+Pengine, +Format, +EventTerm, +OptionsDict) is det.
 2834%
 2835%   Formulate an HTTP response from a pengine event term. Format is
 2836%   one of =prolog=, =json= or =json-s=.
 2837%
 2838%   @arg EventTerm is either a single event or a list of events.
 2839
 2840:- dynamic
 2841    pengine_replying/2.             % +Pengine, +Thread
 2842
 2843output_result(Pengine, Format, Event) :-
 2844    thread_self(Thread),
 2845    cors_enable,            % contingent on http:cors setting
 2846    disable_client_cache,
 2847    setup_call_cleanup(
 2848        asserta(pengine_replying(Pengine, Thread), Ref),
 2849        catch(output_result_2(Format, Event, _{}),
 2850              pengine_abort_output,
 2851              true),
 2852        erase(Ref)),
 2853    destroy_pengine_after_output(Pengine, Event).
 2854
 2855output_result_2(Lang, Event, Dict) :-
 2856    write_result(Lang, Event, Dict),
 2857    !.
 2858output_result_2(prolog, Event, _) :-
 2859    !,
 2860    format('Content-type: text/x-prolog; charset=UTF-8~n~n'),
 2861    write_term(Event,
 2862               [ quoted(true),
 2863                 ignore_ops(true),
 2864                 fullstop(true),
 2865                 blobs(portray),
 2866                 portray_goal(portray_blob),
 2867                 nl(true)
 2868               ]).
 2869output_result_2(Lang, Event, _) :-
 2870    json_lang(Lang),
 2871    !,
 2872    (   event_term_to_json_data(Event, JSON, Lang)
 2873    ->  reply_json_dict(JSON)
 2874    ;   assertion(event_term_to_json_data(Event, _, Lang))
 2875    ).
 2876output_result_2(Lang, _Event, _) :-    % FIXME: allow for non-JSON format
 2877    domain_error(pengine_format, Lang).
 2878
 2879%!  portray_blob(+Blob, +Options) is det.
 2880%
 2881%   Portray non-text blobs that may  appear   in  output  terms. Not
 2882%   really sure about that. Basically such  terms need to be avoided
 2883%   as they are meaningless outside the process. The generated error
 2884%   is hard to debug though, so now we send them as `'$BLOB'(Type)`.
 2885%   Future versions may include more info, depending on `Type`.
 2886
 2887:- public portray_blob/2.               % called from write-term
 2888portray_blob(Blob, _Options) :-
 2889    blob(Blob, Type),
 2890    writeq('$BLOB'(Type)).
 2891
 2892%!  abort_pending_output(+Pengine) is det.
 2893%
 2894%   If we get an abort, it is possible that output is being produced
 2895%   for the client.  This predicate aborts these threads.
 2896
 2897abort_pending_output(Pengine) :-
 2898    forall(pengine_replying(Pengine, Thread),
 2899           abort_output_thread(Thread)).
 2900
 2901abort_output_thread(Thread) :-
 2902    catch(thread_signal(Thread, throw(pengine_abort_output)),
 2903          error(existence_error(thread, _), _),
 2904          true).
 2905
 2906%!  write_result(+Lang, +Event, +Dict) is semidet.
 2907%
 2908%   Hook that allows for different output formats. The core Pengines
 2909%   library supports `prolog` and various   JSON  dialects. The hook
 2910%   event_to_json/3 can be used to refine   the  JSON dialects. This
 2911%   hook must be used if  a   completely  different output format is
 2912%   desired.
 2913
 2914%!  disable_client_cache
 2915%
 2916%   Make sure the client will not cache our page.
 2917%
 2918%   @see http://stackoverflow.com/questions/49547/making-sure-a-web-page-is-not-cached-across-all-browsers
 2919
 2920disable_client_cache :-
 2921    format('Cache-Control: no-cache, no-store, must-revalidate\r\n\c
 2922            Pragma: no-cache\r\n\c
 2923            Expires: 0\r\n').
 2924
 2925event_term_to_json_data(Events, JSON, Lang) :-
 2926    is_list(Events),
 2927    !,
 2928    events_to_json_data(Events, JSON, Lang).
 2929event_term_to_json_data(Event, JSON, Lang) :-
 2930    event_to_json(Event, JSON, Lang),
 2931    !.
 2932event_term_to_json_data(success(ID, Bindings0, Projection, Time, More),
 2933                        json{event:success, id:ID, time:Time,
 2934                             data:Bindings, more:More, projection:Projection},
 2935                        json) :-
 2936    !,
 2937    term_to_json(Bindings0, Bindings).
 2938event_term_to_json_data(destroy(ID, Event),
 2939                        json{event:destroy, id:ID, data:JSON},
 2940                        Style) :-
 2941    !,
 2942    event_term_to_json_data(Event, JSON, Style).
 2943event_term_to_json_data(create(ID, Features0), JSON, Style) :-
 2944    !,
 2945    (   select(answer(First0), Features0, Features1)
 2946    ->  event_term_to_json_data(First0, First, Style),
 2947        Features = [answer(First)|Features1]
 2948    ;   Features = Features0
 2949    ),
 2950    dict_create(JSON, json, [event(create), id(ID)|Features]).
 2951event_term_to_json_data(destroy(ID, Event),
 2952                        json{event:destroy, id:ID, data:JSON}, Style) :-
 2953    !,
 2954    event_term_to_json_data(Event, JSON, Style).
 2955event_term_to_json_data(error(ID, ErrorTerm), Error, _Style) :-
 2956    !,
 2957    Error0 = json{event:error, id:ID, data:Message},
 2958    add_error_details(ErrorTerm, Error0, Error),
 2959    message_to_string(ErrorTerm, Message).
 2960event_term_to_json_data(failure(ID, Time),
 2961                        json{event:failure, id:ID, time:Time}, _) :-
 2962    !.
 2963event_term_to_json_data(EventTerm, json{event:F, id:ID}, _) :-
 2964    functor(EventTerm, F, 1),
 2965    !,
 2966    arg(1, EventTerm, ID).
 2967event_term_to_json_data(EventTerm, json{event:F, id:ID, data:JSON}, _) :-
 2968    functor(EventTerm, F, 2),
 2969    arg(1, EventTerm, ID),
 2970    arg(2, EventTerm, Data),
 2971    term_to_json(Data, JSON).
 2972
 2973events_to_json_data([], [], _).
 2974events_to_json_data([E|T0], [J|T], Lang) :-
 2975    event_term_to_json_data(E, J, Lang),
 2976    events_to_json_data(T0, T, Lang).
 2977
 2978:- public add_error_details/3. 2979
 2980%%  add_error_details(+Error, +JSON0, -JSON)
 2981%
 2982%   Add format error code and  location   information  to an error. Also
 2983%   used by pengines_io.pl.
 2984
 2985add_error_details(Error, JSON0, JSON) :-
 2986    add_error_code(Error, JSON0, JSON1),
 2987    add_error_location(Error, JSON1, JSON).
 2988
 2989%%  add_error_code(+Error, +JSON0, -JSON) is det.
 2990%
 2991%   Add a =code= field to JSON0 of Error is an ISO error term. The error
 2992%   code is the functor name of  the   formal  part  of the error, e.g.,
 2993%   =syntax_error=,  =type_error=,  etc.   Some    errors   carry   more
 2994%   information:
 2995%
 2996%     - existence_error(Type, Obj)
 2997%     {arg1:Type, arg2:Obj}, where Obj is stringified of it is not
 2998%     atomic.
 2999
 3000add_error_code(error(existence_error(Type, Obj), _), Error0, Error) :-
 3001    atom(Type),
 3002    !,
 3003    to_atomic(Obj, Value),
 3004    Error = Error0.put(_{code:existence_error, arg1:Type, arg2:Value}).
 3005add_error_code(error(Formal, _), Error0, Error) :-
 3006    callable(Formal),
 3007    !,
 3008    functor(Formal, Code, _),
 3009    Error = Error0.put(code, Code).
 3010add_error_code(_, Error, Error).
 3011
 3012% What to do with large integers?
 3013to_atomic(Obj, Atomic) :- atom(Obj),   !, Atomic = Obj.
 3014to_atomic(Obj, Atomic) :- number(Obj), !, Atomic = Obj.
 3015to_atomic(Obj, Atomic) :- string(Obj), !, Atomic = Obj.
 3016to_atomic(Obj, Atomic) :- term_string(Obj, Atomic).
 3017
 3018
 3019%%  add_error_location(+Error, +JSON0, -JSON) is det.
 3020%
 3021%   Add a =location= property if the  error   can  be  associated with a
 3022%   source location. The location is an   object  with properties =file=
 3023%   and =line= and, if available, the character location in the line.
 3024
 3025add_error_location(error(_, file(Path, Line, -1, _CharNo)), Term0, Term) :-
 3026    atom(Path), integer(Line),
 3027    !,
 3028    Term = Term0.put(_{location:_{file:Path, line:Line}}).
 3029add_error_location(error(_, file(Path, Line, Ch, _CharNo)), Term0, Term) :-
 3030    atom(Path), integer(Line), integer(Ch),
 3031    !,
 3032    Term = Term0.put(_{location:_{file:Path, line:Line, ch:Ch}}).
 3033add_error_location(_, Term, Term).
 3034
 3035
 3036%!  event_to_json(+Event, -JSONTerm, +Lang) is semidet.
 3037%
 3038%   Hook that translates a Pengine event  structure into a term suitable
 3039%   for reply_json_dict/1, according to the language specification Lang.
 3040%   This can be used to massage general Prolog terms, notably associated
 3041%   with `success(ID, Bindings, Projection, Time, More)` and `output(ID,
 3042%   Term)` into a format suitable for processing at the client side.
 3043
 3044%:- multifile pengines:event_to_json/3.
 3045
 3046
 3047                 /*******************************
 3048                 *        ACCESS CONTROL        *
 3049                 *******************************/
 3050
 3051%!  allowed(+Request, +Application) is det.
 3052%
 3053%   Check whether the peer is allowed to connect.  Returns a
 3054%   =forbidden= header if contact is not allowed.
 3055
 3056allowed(Request, Application) :-
 3057    setting(Application:allow_from, Allow),
 3058    match_peer(Request, Allow),
 3059    setting(Application:deny_from, Deny),
 3060    \+ match_peer(Request, Deny),
 3061    !.
 3062allowed(Request, _Application) :-
 3063    memberchk(request_uri(Here), Request),
 3064    throw(http_reply(forbidden(Here))).
 3065
 3066match_peer(_, Allowed) :-
 3067    memberchk(*, Allowed),
 3068    !.
 3069match_peer(_, []) :- !, fail.
 3070match_peer(Request, Allowed) :-
 3071    http_peer(Request, Peer),
 3072    debug(pengine(allow), 'Peer: ~q, Allow: ~q', [Peer, Allowed]),
 3073    (   memberchk(Peer, Allowed)
 3074    ->  true
 3075    ;   member(Pattern, Allowed),
 3076        match_peer_pattern(Pattern, Peer)
 3077    ).
 3078
 3079match_peer_pattern(Pattern, Peer) :-
 3080    ip_term(Pattern, IP),
 3081    ip_term(Peer, IP),
 3082    !.
 3083
 3084ip_term(Peer, Pattern) :-
 3085    split_string(Peer, ".", "", PartStrings),
 3086    ip_pattern(PartStrings, Pattern).
 3087
 3088ip_pattern([], []).
 3089ip_pattern([*], _) :- !.
 3090ip_pattern([S|T0], [N|T]) :-
 3091    number_string(N, S),
 3092    ip_pattern(T0, T).
 3093
 3094
 3095%%  authenticate(+Request, +Application, -UserOptions:list) is det.
 3096%
 3097%   Call authentication_hook/3, returning either `[user(User)]`, `[]` or
 3098%   an exception.
 3099
 3100authenticate(Request, Application, UserOptions) :-
 3101    authentication_hook(Request, Application, User),
 3102    !,
 3103    must_be(ground, User),
 3104    UserOptions = [user(User)].
 3105authenticate(_, _, []).
 3106
 3107%%  authentication_hook(+Request, +Application, -User) is semidet.
 3108%
 3109%   This hook is called  from  the   =/pengine/create=  HTTP  handler to
 3110%   discover whether the server is accessed   by  an authorized user. It
 3111%   can react in three ways:
 3112%
 3113%     - Succeed, binding User to a ground term.  The authentity of the
 3114%       user is available through pengine_user/1.
 3115%     - Fail.  The =/create= succeeds, but the pengine is not associated
 3116%       with a user.
 3117%     - Throw an exception to prevent creation of the pengine.  Two
 3118%       meaningful exceptions are:
 3119%         - throw(http_reply(authorise(basic(Realm))))
 3120%         Start a normal HTTP login challenge (reply 401)
 3121%         - throw(http_reply(forbidden(Path))))
 3122%         Reject the request using a 403 repply.
 3123%
 3124%   @see http_authenticate/3 can be used to implement this hook using
 3125%        default HTTP authentication data.
 3126
 3127pengine_register_user(Options) :-
 3128    option(user(User), Options),
 3129    !,
 3130    pengine_self(Me),
 3131    asserta(pengine_user(Me, User)).
 3132pengine_register_user(_).
 3133
 3134
 3135%%  pengine_user(-User) is semidet.
 3136%
 3137%   True when the pengine was create by  an HTTP request that authorized
 3138%   User.
 3139%
 3140%   @see authentication_hook/3 can be used to extract authorization from
 3141%        the HTTP header.
 3142
 3143pengine_user(User) :-
 3144    pengine_self(Me),
 3145    pengine_user(Me, User).
 3146
 3147%!  reply_options(+Request, +Methods) is semidet.
 3148%
 3149%   Reply the HTTP OPTIONS request
 3150
 3151reply_options(Request, Allowed) :-
 3152    option(method(options), Request),
 3153    !,
 3154    cors_enable(Request,
 3155                [ methods(Allowed)
 3156                ]),
 3157    format('Content-type: text/plain\r\n'),
 3158    format('~n').                   % empty body
 3159
 3160
 3161                 /*******************************
 3162                 *        COMPILE SOURCE        *
 3163                 *******************************/
 3164
 3165/** pengine_src_text(+SrcText, +Module) is det
 3166
 3167Asserts the clauses defined in SrcText in   the  private database of the
 3168current Pengine. This  predicate  processes   the  `src_text'  option of
 3169pengine_create/1.
 3170*/
 3171
 3172pengine_src_text(Src, Module) :-
 3173    pengine_self(Self),
 3174    format(atom(ID), 'pengine://~w/src', [Self]),
 3175    extra_load_options(Self, Options),
 3176    setup_call_cleanup(
 3177        open_chars_stream(Src, Stream),
 3178        load_files(Module:ID,
 3179                   [ stream(Stream),
 3180                     module(Module),
 3181                     silent(true)
 3182                   | Options
 3183                   ]),
 3184        close(Stream)),
 3185    keep_source(Self, ID, Src).
 3186
 3187system:'#file'(File, _Line) :-
 3188    prolog_load_context(stream, Stream),
 3189    set_stream(Stream, file_name(File)),
 3190    set_stream(Stream, record_position(false)),
 3191    set_stream(Stream, record_position(true)).
 3192
 3193%%   pengine_src_url(+URL, +Module) is det
 3194%
 3195%    Asserts the clauses defined in URL in   the private database of the
 3196%    current Pengine. This predicate processes   the `src_url' option of
 3197%    pengine_create/1.
 3198%
 3199%    @tbd: make a sensible guess at the encoding.
 3200
 3201pengine_src_url(URL, Module) :-
 3202    pengine_self(Self),
 3203    uri_encoded(path, URL, Path),
 3204    format(atom(ID), 'pengine://~w/url/~w', [Self, Path]),
 3205    extra_load_options(Self, Options),
 3206    (   get_pengine_application(Self, Application),
 3207        setting(Application:debug_info, false)
 3208    ->  setup_call_cleanup(
 3209            http_open(URL, Stream, []),
 3210            ( set_stream(Stream, encoding(utf8)),
 3211              load_files(Module:ID,
 3212                         [ stream(Stream),
 3213                           module(Module)
 3214                         | Options
 3215                         ])
 3216            ),
 3217            close(Stream))
 3218    ;   setup_call_cleanup(
 3219            http_open(URL, TempStream, []),
 3220            ( set_stream(TempStream, encoding(utf8)),
 3221              read_string(TempStream, _, Src)
 3222            ),
 3223            close(TempStream)),
 3224        setup_call_cleanup(
 3225            open_chars_stream(Src, Stream),
 3226            load_files(Module:ID,
 3227                       [ stream(Stream),
 3228                         module(Module)
 3229                       | Options
 3230                       ]),
 3231            close(Stream)),
 3232        keep_source(Self, ID, Src)
 3233    ).
 3234
 3235
 3236extra_load_options(Pengine, Options) :-
 3237    pengine_not_sandboxed(Pengine),
 3238    !,
 3239    Options = [].
 3240extra_load_options(_, [sandboxed(true)]).
 3241
 3242
 3243keep_source(Pengine, ID, SrcText) :-
 3244    get_pengine_application(Pengine, Application),
 3245    setting(Application:debug_info, true),
 3246    !,
 3247    to_string(SrcText, SrcString),
 3248    assertz(pengine_data(Pengine, source(ID, SrcString))).
 3249keep_source(_, _, _).
 3250
 3251to_string(String, String) :-
 3252    string(String),
 3253    !.
 3254to_string(Atom, String) :-
 3255    atom_string(Atom, String),
 3256    !.
 3257
 3258		 /*******************************
 3259		 *            SANDBOX		*
 3260		 *******************************/
 3261
 3262:- multifile
 3263    sandbox:safe_primitive/1. 3264
 3265sandbox:safe_primitive(pengines:pengine_input(_, _)).
 3266sandbox:safe_primitive(pengines:pengine_output(_)).
 3267sandbox:safe_primitive(pengines:pengine_debug(_,_)).
 3268
 3269
 3270                 /*******************************
 3271                 *            MESSAGES          *
 3272                 *******************************/
 3273
 3274prolog:error_message(sandbox(time_limit_exceeded, Limit)) -->
 3275    [ 'Could not prove safety of your goal within ~f seconds.'-[Limit], nl,
 3276      'This is normally caused by an insufficiently instantiated'-[], nl,
 3277      'meta-call (e.g., call(Var)) for which it is too expensive to'-[], nl,
 3278      'find all possible instantations of Var.'-[]
 3279    ]