. (utf8) 2/* Part of SWI-Prolog 3 4 Author: Torbjörn Lager and Jan Wielemaker 5 E-mail: J.Wielemaker@vu.nl 6 WWW: http://www.swi-prolog.org 7 Copyright (C): 2014-2024, Torbjörn Lager, 8 VU University Amsterdam 9 SWI-Prolog Solutions b.v. 10 All rights reserved. 11 12 Redistribution and use in source and binary forms, with or without 13 modification, are permitted provided that the following conditions 14 are met: 15 16 1. Redistributions of source code must retain the above copyright 17 notice, this list of conditions and the following disclaimer. 18 19 2. Redistributions in binary form must reproduce the above copyright 20 notice, this list of conditions and the following disclaimer in 21 the documentation and/or other materials provided with the 22 distribution. 23 24 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 25 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 26 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 27 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 28 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 29 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 30 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 31 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 32 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 33 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 34 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 35 POSSIBILITY OF SUCH DAMAGE. 36*/ 37 38:- module(pengines, 39 [ pengine_create/1, % +Options 40 pengine_ask/3, % +Pengine, :Query, +Options 41 pengine_next/2, % +Pengine. +Options 42 pengine_stop/2, % +Pengine. 
+Options 43 pengine_event/2, % -Event, +Options 44 pengine_input/2, % +Prompt, -Term 45 pengine_output/1, % +Term 46 pengine_respond/3, % +Pengine, +Input, +Options 47 pengine_debug/2, % +Format, +Args 48 pengine_self/1, % -Pengine 49 pengine_pull_response/2, % +Pengine, +Options 50 pengine_destroy/1, % +Pengine 51 pengine_destroy/2, % +Pengine, +Options 52 pengine_abort/1, % +Pengine 53 pengine_application/1, % +Application 54 current_pengine_application/1, % ?Application 55 pengine_property/2, % ?Pengine, ?Property 56 pengine_user/1, % -User 57 pengine_event_loop/2, % :Closure, +Options 58 pengine_rpc/2, % +Server, :Goal 59 pengine_rpc/3 % +Server, :Goal, +Options 60 ]).
71:- autoload(library(aggregate),[aggregate_all/3]). 72:- autoload(library(apply),[maplist/2,partition/4,exclude/3,maplist/3]). 73:- autoload(library(broadcast),[broadcast/1]). 74:- autoload(library(charsio),[open_chars_stream/2]). 75:- use_module(library(debug),[debug/1,debugging/1,debug/3,assertion/1]). 76:- autoload(library(error), 77 [ must_be/2, 78 existence_error/2, 79 permission_error/3, 80 domain_error/2 81 ]). 82:- autoload(library(filesex),[directory_file_path/3]). 83:- autoload(library(listing),[listing/1]). 84:- autoload(library(lists),[member/2,flatten/2,select/3,append/3]). 85:- autoload(library(modules),[in_temporary_module/3]). 86:- autoload(library(occurs),[sub_term/2]). 87:- autoload(library(option), 88 [select_option/3,option/2,option/3,select_option/4]). 89:- autoload(library(prolog_stack),[print_prolog_backtrace/2]). 90:- autoload(library(sandbox),[safe_goal/1]). 91:- autoload(library(statistics),[thread_statistics/2]). 92:- autoload(library(term_to_json),[term_to_json/2]). 93:- autoload(library(thread_pool), 94 [thread_pool_create/3,thread_create_in_pool/4]). 95:- autoload(library(time),[alarm/4,call_with_time_limit/2]). 96:- autoload(library(uri), 97 [ uri_components/2, 98 uri_query_components/2, 99 uri_data/3, 100 uri_data/4, 101 uri_encoded/3 102 ]). 103:- autoload(library(http/http_client),[http_read_data/3]). 104:- autoload(library(http/http_cors),[cors_enable/0,cors_enable/2]). 105:- autoload(library(http/http_dispatch), 106 [http_handler/3,http_404/2,http_reply_file/3]). 107:- autoload(library(http/http_open),[http_open/3]). 108:- autoload(library(http/http_parameters),[http_parameters/2]). 109:- autoload(library(http/http_stream),[is_cgi_stream/1]). 110:- autoload(library(http/http_wrapper),[http_peer/2]). 111 112:- use_module(library(settings),[setting/2,setting/4]). 113:- use_module(library(http/http_json), 114 [http_read_json_dict/2,reply_json_dict/1]). 115 116:- if(exists_source(library(uuid))). 
:- autoload(library(uuid), [uuid/2]).
:- endif.


% Meta-argument specifications of the exported meta predicates.
% pengine_create/1 receives module-qualified options (its clause head
% is pengine_create(M:Options0)); pengine_rpc/3 takes a goal as second
% argument.  NOTE(review): pengine_event_loop/2 is assumed to call its
% closure with one extra argument (the event) - confirm against
% pengine_event_loop/3.
:- meta_predicate
    pengine_create(:),
    pengine_rpc(+, 0, +),
    pengine_event_loop(1, +).

% Hooks that other modules may extend.
:- multifile
    write_result/3,                 % +Format, +Event, +Dict
    event_to_json/3,                % +Event, -JSON, +Format
    prepare_module/3,               % +Module, +Application, +Options
    prepare_goal/3,                 % +GoalIn, -GoalOut, +Options
    authentication_hook/3,          % +Request, +Application, -User
    not_sandboxed/2,                % +User, +App
    pengine_flush_output_hook/0.

% Option declarations for the exported predicates.  The second argument
% of predicate_options/3 is the index of the argument holding the
% option list.
:- predicate_options(pengine_create/1, 1,
                     [ id(-atom),
                       alias(atom),
                       application(atom),
                       destroy(boolean),
                       server(atom),
                       ask(compound),
                       template(compound),
                       chunk(integer;oneof([false])),
                       bindings(list),
                       src_list(list),
                       src_text(any),           % text
                       src_url(atom),
                       src_predicates(list)
                     ]).
:- predicate_options(pengine_ask/3, 3,
                     [ template(any),
                       chunk(integer;oneof([false])),
                       bindings(list)
                     ]).
:- predicate_options(pengine_next/2, 2,
                     [ chunk(integer),
                       pass_to(pengine_send/3, 3)
                     ]).
:- predicate_options(pengine_stop/2, 2,
                     [ pass_to(pengine_send/3, 3)
                     ]).
% Options are the 3rd argument of pengine_respond(+Pengine, +Input,
% +Options).
:- predicate_options(pengine_respond/3, 3,
                     [ pass_to(pengine_send/3, 3)
                     ]).
:- predicate_options(pengine_rpc/3, 3,
                     [ chunk(integer;oneof([false])),
                       pass_to(pengine_create/1, 1)
                     ]).
:- predicate_options(pengine_send/3, 3,
                     [ delay(number)
                     ]).
:- predicate_options(pengine_event/2, 2,
                     [ listen(atom),
                       pass_to(system:thread_get_message/3, 3)
                     ]).
:- predicate_options(pengine_pull_response/2, 2,
                     [ pass_to(http_open/3, 3)
                     ]).
:- predicate_options(pengine_event_loop/2, 2,
                     []).                       % not yet implemented

% :- debug(pengine(transition)).
:- debug(pengine(debug)).               % handle pengine_debug in pengine_rpc/3.

%   Compile-time expansion of the goal `random_delay`: it becomes a call
%   to do_random_delay/0 when the debug topic pengine(delay) is enabled
%   while this file is compiled and `true` otherwise.
goal_expansion(random_delay, Expanded) :-
    (   debugging(pengine(delay))
    ->  Expanded = do_random_delay
    ;   Expanded = true
    ).

%!  do_random_delay is det.
%
%   Sleep for random(20)/1000 seconds, i.e. less than 20 milliseconds.
%   Calls to `random_delay` are expanded into this predicate when the
%   debug topic pengine(delay) is active at compile time.

do_random_delay :-
    Delay is random(20)/1000,
    sleep(Delay).

% Goal arguments follow from the clauses of solve/4 and
% findnsols_no_empty/4.  NOTE(review): pengine_event_loop/3 is assumed
% to call its closure with one extra argument - confirm against its
% definition.
:- meta_predicate                       % internal meta predicates
    solve(+, ?, 0, +),
    findnsols_no_empty(+, ?, 0, -),
    pengine_event_loop(+, 1, +).
Remaining options are passed to http_open/3 (meaningful only for non-local pengines) and thread_create/3. Note that for thread_create/3 only options changing the stack-sizes can be used. In particular, do not pass the detached or alias options.
Successful creation of a pengine will return an event term of the following form:
An error will be returned if the pengine could not be created:
%!  pengine_create(:Options) is det.
%
%   Create a pengine.  Local source options are first folded into a
%   single src_text(Text) option.  If the resulting options contain
%   server(BaseURL), a remote pengine is created on that server using
%   the remaining options; otherwise a local pengine is created.

pengine_create(Module:RawOptions) :-
    translate_local_sources(RawOptions, Options, Module),
    (   select_option(server(BaseURL), Options, RemoteOptions)
    ->  remote_pengine_create(BaseURL, RemoteOptions)
    ;   local_pengine_create(Options)
    ).
src_predicates
and src_list
options into
src_text
. We need to do that anyway for remote pengines. For
local pengines, we could avoid this step, but there is very
little point in transferring source to a local pengine anyway as
local pengines can access any Prolog predicate that you make
visible to the application.
Multiple sources are concatenated to end up with a single src_text option.
%!  translate_local_sources(+OptionsIn, -Options, +Module) is det.
%
%   Replace all src_predicates(PIs), src_list(Terms) and src_text(Text)
%   options in OptionsIn by at most one src_text(Text) option that is
%   placed in front of the remaining options.  Multiple sources are
%   concatenated in the order in which they appear.

translate_local_sources(OptionsIn, Options, Module) :-
    translate_local_sources(OptionsIn, Sources, Others, Module),
    sources_to_options(Sources, Others, Options).

%   sources_to_options(+Sources, +Others, -Options)
%
%   Add a src_text/1 option to Others if Sources is not empty.

sources_to_options([], Others, Options) :-
    !,
    Options = Others.
sources_to_options([Text], Others, Options) :-
    !,
    Options = [src_text(Text)|Others].
sources_to_options(Texts, Others, Options) :-
    atomics_to_string(Texts, Text),
    Options = [src_text(Text)|Others].

%   translate_local_sources(+OptionsIn, -Sources, -Others, +Module)
%
%   Split OptionsIn into the source texts of the source options and the
%   list of other options.  Unbound list elements are passed on as
%   other options.

translate_local_sources([], [], [], _).
translate_local_sources([Option|Options], Sources, Others, Module) :-
    (   nonvar(Option),
        translate_local_source(Option, Source, Module)
    ->  Sources = [Source|Sources1],
        Others = Others1
    ;   Sources = Sources1,
        Others = [Option|Others1]
    ),
    translate_local_sources(Options, Sources1, Others1, Module).

%   translate_local_source(+Option, -Source, +Module)
%
%   Source is the program text for a single source option:
%   src_predicates/1 is rendered using listing/1 in Module, src_list/1
%   by writing each term with `~k`, and src_text/1 is used as is.

translate_local_source(src_predicates(PIs), Source, Module) :-
    must_be(list, PIs),
    with_output_to(string(Source),
                   maplist(list_in_module(Module), PIs)).
translate_local_source(src_list(Terms), Source, _) :-
    must_be(list, Terms),
    with_output_to(string(Source),
                   forall(member(Term, Terms),
                          format('~k .~n', [Term]))).
translate_local_source(src_text(Source), Source, _).

%   list_in_module(+Module, +PI): list predicate PI as found in Module.

list_in_module(Module, PI) :-
    listing(Module:PI).
pengine_send(NameOrID, Term, [])
.
*/
%!  pengine_send(+NameOrID, +Term) is det.
%
%   Send Term to the pengine NameOrID using no options.  Shorthand for
%   pengine_send(NameOrID, Term, []).

pengine_send(NameOrID, Term) :-
    pengine_send(NameOrID, Term, []).
Any remaining options are passed to http_open/3. */
%!  pengine_send(+NameOrID, +Term, +Options) is det.
%
%   Send Term as a request to the pengine NameOrID, which must be an
%   atom.  The name `self` addresses the message queue of the calling
%   thread; a name registered in child/2 is translated into the id of
%   that child.  If Options contains delay(Seconds), the message is
%   sent from an alarm after Seconds.

pengine_send(Target, Event, Options) :-
    must_be(atom, Target),
    pengine_send2(Target, Event, Options).

%   pengine_send2(+Name, +Event, +Options)
%
%   Resolve Name into a destination and hand the event to
%   delay_message/3.

pengine_send2(Name, Event, Options) :-
    (   Name = self
    ->  thread_self(Queue),
        Destination = queue(Queue)
    ;   child(Name, Id)
    ->  Destination = pengine(Id)
    ;   Destination = pengine(Name)
    ),
    delay_message(Destination, Event, Options).

%   delay_message(+Destination, +Event, +Options)
%
%   Send Event now or, if the delay(Delay) option is given, schedule
%   sending it using alarm/4.

delay_message(Destination, Event, Options) :-
    (   option(delay(Delay), Options)
    ->  alarm(Delay,
              send_message(Destination, Event, Options),
              _AlarmID,
              [remove(true)])
    ;   random_delay,
        send_message(Destination, Event, Options)
    ).

%   send_message(+Destination, +Event, +Options)
%
%   Deliver Event as pengine_request(Event).  Destination is either
%   queue(Queue) or pengine(Id).  A pengine is reached through its
%   server if it is registered as remote and through its thread
%   otherwise.  Raises an existence error if the pengine is unknown.

send_message(queue(Queue), Event, _) :-
    thread_send_message(Queue, pengine_request(Event)).
send_message(pengine(Id), Event, Options) :-
    (   pengine_remote(Id, Server)
    ->  remote_pengine_send(Server, Id, Event, Options)
    ;   pengine_thread(Id, Thread)
    ->  thread_send_message(Thread, pengine_request(Event))
    ;   existence_error(pengine, Id)
    ).
%!  pengine_request(-Request) is det.
%
%   Wait for the next pengine_request(Request) message in the queue of
%   the calling thread.  If no request arrives within one second, wait
%   for the remainder of the `idle_limit` setting of the application
%   while using thread_idle/2 to minimise resources.  If there is still
%   no request, Request is unified with `destroy`.

pengine_request(Request) :-
    thread_self(Me),
    (   thread_get_message(Me, pengine_request(Request), [timeout(1)])
    ->  true
    ;   pengine_self(Pengine),
        get_pengine_application(Pengine, Application),
        setting(Application:idle_limit, Limit),
        Remaining is Limit-1,           % one second has been spent above
        thread_self(Thread),
        (   thread_idle(thread_get_message(Thread, pengine_request(Request),
                                           [timeout(Remaining)]),
                        long)
        ->  true
        ;   Request = destroy
        )
    ).
If the message cannot be sent within the idle_limit
setting of
the pengine, abort the pengine.
%!  pengine_reply(+Event) is det.
%!  pengine_reply(+Queue, +Event) is det.
%
%   Send Event as pengine_event(ID, Event) to Queue, where ID is the
%   first argument of Event.  pengine_reply/1 uses the queue stored in
%   the global variable `pengine_parent`.
%
%   Nothing is sent once the global variable
%   `pengine_idle_limit_exceeded` is `true`.  If the calling thread is
%   the pengine ID and that pengine is not detached, sending uses the
%   `idle_limit` setting of its application as timeout.  On timeout
%   the flag above is set, the thread is detached and abort/0 is
%   called.

pengine_reply(Event) :-
    pengine_parent(Queue),
    pengine_reply(Queue, Event).

pengine_reply(Queue, Event0) :-
    (   nb_current(pengine_idle_limit_exceeded, true)
    ->  true
    ;   arg(1, Event0, ID),
        wrap_first_answer(ID, Event0, Event),
        random_delay,
        debug(pengine(event), 'Reply to ~p: ~p', [Queue, Event]),
        Message = pengine_event(ID, Event),
        (   pengine_self(ID),
            \+ pengine_detached(ID, _)
        ->  get_pengine_application(ID, Application),
            setting(Application:idle_limit, IdleLimit),
            debug(pengine(reply), 'Sending ~p, timeout: ~q', [Event, IdleLimit]),
            (   thread_send_message(Queue, Message, [timeout(IdleLimit)])
            ->  true
            ;   thread_self(Me),
                debug(pengine(reply), 'pengine_reply: timeout for ~q (thread ~q)',
                      [ID, Me]),
                nb_setval(pengine_idle_limit_exceeded, true),
                thread_detach(Me),
                abort
            )
        ;   thread_send_message(Queue, Message)
        )
    ).

%   wrap_first_answer(+ID, +Event0, -Event)
%
%   If a create event for ID is pending in
%   wrap_first_answer_in_create_event/2, Event is that create event
%   with answer(Event0) as its extra data and the pending fact is
%   removed.  Otherwise Event is Event0.

wrap_first_answer(ID, Event0, Event) :-
    (   wrap_first_answer_in_create_event(Create, [answer(Event0)]),
        arg(1, Create, ID)
    ->  retract(wrap_first_answer_in_create_event(Create, [answer(Event0)])),
        Event = Create
    ;   Event = Event0
    ).


%   empty_queue
%
%   Discard all messages that are currently in the parent queue and
%   report their number on the debug topic pengine(abort).

empty_queue :-
    pengine_parent(Queue),
    empty_queue(Queue, 0, Discarded),
    debug(pengine(abort), 'Abort: discarded ~D messages', [Discarded]).

empty_queue(Queue, Count0, Count) :-
    (   thread_get_message(Queue, _Term, [timeout(0)])
    ->  Count1 is Count0+1,
        empty_queue(Queue, Count1, Count)
    ;   Count = Count0
    ).
Options is a list of options:
false
, the
Pengine goal is not executed using findall/3 and friends and
we do not backtrack immediately over the goal. As a result,
changes to backtrackable global state are retained. This is
similar to using set_prolog_flag(toplevel_mode, recursive)
.Name = Var
terms, providing access to the actual variable
names.Any remaining options are passed to pengine_send/3.
Note that the predicate pengine_ask/3 is deterministic, even for queries that have more than one solution. Also, the variables in Query will not be bound. Instead, results will be returned in the form of event terms.
true
or false
, indicating whether we can expect the
pengine to be able to return more solutions or not, were we to call
pengine_next/2.Defined in terms of pengine_send/3, like so:
pengine_ask(ID, Query, Options) :- partition(pengine_ask_option, Options, AskOptions, SendOptions), pengine_send(ID, ask(Query, AskOptions), SendOptions).
*/
%!  pengine_ask(+NameOrID, @Query, +Options) is det.
%
%   Ask the pengine NameOrID to run Query.  The options template/1,
%   chunk/1, bindings/1 and breakpoints/1 travel with the query in the
%   ask(Query, AskOptions) request.  All other options are passed to
%   pengine_send/3.

pengine_ask(ID, Query, Options) :-
    partition(pengine_ask_option, Options, ForAsk, ForSend),
    Request = ask(Query, ForAsk),
    pengine_send(ID, Request, ForSend).


%   pengine_ask_option(?Option): Option is part of the ask request.

pengine_ask_option(template(_)).
pengine_ask_option(chunk(_)).
pengine_ask_option(bindings(_)).
pengine_ask_option(breakpoints(_)).
chunk(false)
.Remaining options are passed to pengine_send/3. The result of re-executing the current goal is returned to the caller's message queue in the form of event terms.
Defined in terms of pengine_send/3, as follows:
pengine_next(ID, Options) :- pengine_send(ID, next, Options).
*/
%!  pengine_next(+NameOrID, +Options) is det.
%
%   Ask the pengine NameOrID for more solutions.  If Options contains
%   chunk(Count), the request is next(Count) and the chunk option is
%   removed from the options; otherwise the request is `next`.  The
%   (remaining) options are passed to pengine_send/3.

pengine_next(ID, Options) :-
    (   select_option(chunk(Count), Options, SendOptions)
    ->  pengine_send(ID, next(Count), SendOptions)
    ;   pengine_send(ID, next, Options)
    ).
Defined in terms of pengine_send/3, like so:
pengine_stop(ID, Options) :- pengine_send(ID, stop, Options).
*/
%!  pengine_stop(+NameOrID, +Options) is det.
%
%   Send the request `stop` to the pengine NameOrID.  Options are
%   passed to pengine_send/3.

pengine_stop(ID, Options) :-
    pengine_send(ID, stop, Options).
%!  pengine_abort(+NameOrID) is semidet.
%
%   Abort the query that is running in the pengine NameOrID.  A name
%   registered in child/2 is first translated into the pengine id.  A
%   remote pengine is aborted through its server.  For a local pengine
%   the exception `abort_query` is raised in its thread; errors from
%   signalling the thread are ignored.  Fails if the pengine is neither
%   registered as remote nor has a thread.

pengine_abort(NameOrID) :-
    (   child(NameOrID, Id)
    ->  true
    ;   Id = NameOrID
    ),
    (   pengine_remote(Id, Server)
    ->  remote_pengine_abort(Server, Id, [])
    ;   pengine_thread(Id, Thread),
        debug(pengine(abort), 'Signalling thread ~p', [Thread]),
        catch(thread_signal(Thread, throw(abort_query)), _, true)
    ).
%!  pengine_destroy(+NameOrID) is det.
%!  pengine_destroy(+NameOrID, +Options) is det.
%
%   Destroy the pengine NameOrID.  If Options contains force(true),
%   the pengine is killed using abort/0 and pengine_destroy/2
%   succeeds: a name registered in child/2 is translated into the
%   pengine id and, if that pengine has a thread, abort/0 is signalled
%   to it.  A thread that no longer exists is ignored.
%
%   Without force(true), the request `destroy` is sent using
%   pengine_send/3.  If that raises an existence error for the pengine,
%   the child/2 registration is removed instead.

pengine_destroy(NameOrID) :-
    pengine_destroy(NameOrID, []).

pengine_destroy(NameOrID, Options) :-
    (   option(force(true), Options)
    ->  (   child(NameOrID, Id)
        ->  true
        ;   Id = NameOrID
        ),
        (   pengine_thread(Id, Thread)
        ->  catch(thread_signal(Thread, abort),
                  error(existence_error(thread, _), _),
                  true)
        ;   true
        )
    ;   catch(pengine_send(NameOrID, destroy, Options),
              error(existence_error(pengine, NameOrID), _),
              retractall(child(_, NameOrID)))
    ).


/*================= pengines administration =======================
*/
thread(ThreadId)
remote(URL)
% Administration of the known pengines.  The tables are dynamic and
% volatile.
:- dynamic
    current_pengine/6,              % Id, ParentId, Thread, URL, App, Destroy
    pengine_queue/4,                % Id, Queue, TimeOut, Time
    output_queue/3,                 % Id, Queue, Time
    pengine_user/2,                 % Id, User
    pengine_data/2,                 % Id, Data
    pengine_detached/2.             % Id, Data
:- volatile
    current_pengine/6,
    pengine_queue/4,
    output_queue/3,
    pengine_user/2,
    pengine_data/2,
    pengine_detached/2.

% The children of a pengine are local to the thread that created them.
:- thread_local
    child/2.                        % ?Name, ?Child
%!  pengine_register_local(+Id, +Thread, +Queue, +URL, +Application, +Destroy) is det.
%!  pengine_register_remote(+Id, +URL, +Application, +Destroy) is det.
%
%   Add pengine Id as first entry of current_pengine/6.  A local
%   pengine is stored with its Thread and the Queue of its parent.  A
%   remote pengine is stored with thread `0` and the calling thread as
%   parent queue.

pengine_register_local(Id, Thread, Queue, URL, Application, Destroy) :-
    Entry = current_pengine(Id, Queue, Thread, URL, Application, Destroy),
    asserta(Entry).

pengine_register_remote(Id, URL, Application, Destroy) :-
    thread_self(Parent),
    Entry = current_pengine(Id, Parent, 0, URL, Application, Destroy),
    asserta(Entry).
%!  pengine_unregister(+Id) is det.
%
%   Remove the registration of pengine Id that runs in the calling
%   thread, together with its user and data records.  If the pengine
%   is registered with URL `http`, the queue is the message queue used
%   to send events to the HTTP workers; it is first handed to
%   sync_destroy_queue_from_pengine/2 while holding the mutex
%   `pengine`.

pengine_unregister(Id) :-
    thread_self(Thread),
    (   current_pengine(Id, Queue, Thread, http, _, _)
    ->  with_mutex(pengine, sync_destroy_queue_from_pengine(Id, Queue))
    ;   true
    ),
    retractall(current_pengine(Id, _, Thread, _, _, _)),
    retractall(pengine_user(Id, _)),
    retractall(pengine_data(Id, _)).

%!  pengine_unregister_remote(+Id) is det.
%
%   Remove the registration(s) of the remote pengine Id, i.e. those
%   that have thread `0`.

pengine_unregister_remote(Id) :-
    retractall(current_pengine(Id, _Parent, 0, _URL, _App, _Destroy)).
%!  pengine_self(-Id) is semidet.
%
%   True when Id is the pengine that is registered for the calling
%   thread.

pengine_self(Id) :-
    thread_self(Me),
    current_pengine(Id, _, Me, _, _, _).

%   pengine_parent(-Parent): Parent is the value of the global variable
%   `pengine_parent`.

pengine_parent(Parent) :-
    nb_getval(pengine_parent, Parent).

%   pengine_thread(+Pengine, -Thread): Thread is the thread of the
%   local pengine Pengine.  Remote pengines are registered with thread
%   `0` and are skipped.

pengine_thread(Pengine, Thread) :-
    current_pengine(Pengine, _, Thread, _, _, _),
    Thread \== 0,
    !.

%   pengine_remote(?Pengine, ?URL): Pengine is registered as a remote
%   pengine (thread `0`) at URL.

pengine_remote(Pengine, URL) :-
    current_pengine(Pengine, _, 0, URL, _, _).

%   get_pengine_application(+Pengine, -Application): Application is
%   the application of the first registration of Pengine.

get_pengine_application(Pengine, Application) :-
    current_pengine(Pengine, _, _, _, Application, _),
    !.

%   get_pengine_module(+Pengine, -Module): the module of a pengine is
%   its id.

get_pengine_module(Pengine, Pengine).

%   pengine_uuid(-Id): Id is a new random identifier.  Uses uuid/2 if
%   that predicate is available when this file is loaded.  Otherwise Id
%   is the text of a random integer in 0..max_integer-1, or in
%   0..2^128 if the flag max_integer does not exist.

:- if(current_predicate(uuid/2)).
pengine_uuid(Id) :-
    uuid(Id, [version(4)]).             % Version 4 is random.
:- else.
pengine_uuid(Id) :-
    (   current_prolog_flag(max_integer, MaxInt)
    ->  High is MaxInt-1
    ;   High is 1<<128
    ),
    random_between(0, High, Number),
    atom_number(Id, Number).
:- endif.
This also runs Goal if the Pengine no longer exists. This deals with Pengines terminated through destroy_or_continue/1.
% The second argument is called as a goal (see the clause body).
:- meta_predicate protect_pengine(+, 0).

%!  protect_pengine(+Id, :Goal) is semidet.
%
%   Run Goal while holding one of 64 mutexes named `pengine_done_<N>`,
%   where N is the term hash of Id modulo 64.  Goal is called both if
%   pengine Id still has a thread and if it does not.

protect_pengine(Id, Goal) :-
    term_hash(Id, Hash),
    LockN is Hash mod 64,
    atom_concat(pengine_done_, LockN, Lock),
    with_mutex(Lock,
               (   pengine_thread(Id, _)
               ->  Goal
               ;   Goal
               )).
pengine_sandbox
. The example below creates a new application
address_book
and imports the API defined in the module file
adress_book_api.pl
into the application.
:- pengine_application(address_book). :- use_module(address_book:adress_book_api).
*/
%!  pengine_application(+Application) is det.
%
%   This predicate may only be used as a directive, which is handled
%   by term expansion.  Calling it as a goal raises a context error
%   `nodirective`.

pengine_application(Application) :-
    Culprit = pengine_application(Application),
    throw(error(context_error(nodirective, Culprit), _)).

:- multifile
    system:term_expansion/2,
    current_application/1.
%!  current_pengine_application(?Application) is nondet.
%
%   True when Application is a known pengine application, i.e. there
%   is a current_application/1 fact for it.

current_pengine_application(Application) :-
    current_application(Application).


% Default settings for all applications

:- setting(thread_pool_size, integer, 100,
           'Maximum number of pengines this application can run.').
:- setting(thread_pool_stacks, list(compound), [],
           'Maximum stack sizes for pengines this application can run.').
:- setting(slave_limit, integer, 3,
           'Maximum number of slave pengines a master pengine can create.').
:- setting(time_limit, number, 300,
           'Maximum time to wait for output').
:- setting(idle_limit, number, 300,
           'Pengine auto-destroys when idle for this time').
:- setting(safe_goal_limit, number, 10,
           'Maximum time to try proving safety of the goal').
:- setting(program_space, integer, 100_000_000,
           'Maximum memory used by predicates').
:- setting(allow_from, list(atom), [*],
           'IP addresses from which remotes are allowed to connect').
:- setting(deny_from, list(atom), [],
           'IP addresses from which remotes are NOT allowed to connect').
:- setting(debug_info, boolean, false,
           'Keep information to support source-level debugging').


%   Expand the directive `:- pengine_application(Application)`.
%
%   The expansion is a flat list holding the fact
%   pengines:current_application(Application) and the expansion of one
%   setting/4 directive for each pengine setting.  Each setting is
%   declared in the module Application and takes its default from the
%   corresponding setting of the module `pengines`.  Raises a
%   permission error if Application is a module that is loaded from a
%   file.  The hook is defined in the module `system`, where
%   term_expansion/2 is declared multifile.

system:term_expansion((:- pengine_application(Application)), Expanded) :-
    must_be(atom, Application),
    (   module_property(Application, file(_))
    ->  permission_error(create, pengine_application, Application)
    ;   true
    ),
    expand_term((:- setting(Application:thread_pool_size, integer,
                            setting(pengines:thread_pool_size),
                            'Maximum number of pengines this \c
                            application can run.')),
                ThreadPoolSizeSetting),
    expand_term((:- setting(Application:thread_pool_stacks, list(compound),
                            setting(pengines:thread_pool_stacks),
                            'Maximum stack sizes for pengines \c
                            this application can run.')),
                ThreadPoolStacksSetting),
    expand_term((:- setting(Application:slave_limit, integer,
                            setting(pengines:slave_limit),
                            'Maximum number of local slave pengines \c
                            a master pengine can create.')),
                SlaveLimitSetting),
    expand_term((:- setting(Application:time_limit, number,
                            setting(pengines:time_limit),
                            'Maximum time to wait for output')),
                TimeLimitSetting),
    expand_term((:- setting(Application:idle_limit, number,
                            setting(pengines:idle_limit),
                            'Pengine auto-destroys when idle for this time')),
                IdleLimitSetting),
    expand_term((:- setting(Application:safe_goal_limit, number,
                            setting(pengines:safe_goal_limit),
                            'Maximum time to try proving safety of the goal')),
                SafeGoalLimitSetting),
    expand_term((:- setting(Application:program_space, integer,
                            setting(pengines:program_space),
                            'Maximum memory used by predicates')),
                ProgramSpaceSetting),
    expand_term((:- setting(Application:allow_from, list(atom),
                            setting(pengines:allow_from),
                            'IP addresses from which remotes are allowed \c
                            to connect')),
                AllowFromSetting),
    expand_term((:- setting(Application:deny_from, list(atom),
                            setting(pengines:deny_from),
                            'IP addresses from which remotes are NOT \c
                            allowed to connect')),
                DenyFromSetting),
    expand_term((:- setting(Application:debug_info, boolean,
                            setting(pengines:debug_info),
                            'Keep information to support source-level \c
                            debugging')),
                DebugInfoSetting),
    flatten([ pengines:current_application(Application),
              ThreadPoolSizeSetting,
              ThreadPoolStacksSetting,
              SlaveLimitSetting,
              TimeLimitSetting,
              IdleLimitSetting,
              SafeGoalLimitSetting,
              ProgramSpaceSetting,
              AllowFromSetting,
              DenyFromSetting,
              DebugInfoSetting
            ], Expanded).

% Register default application

:- pengine_application(pengine_sandbox).
%!  pengine_property(?Pengine, ?Property) is nondet.
%
%   True when Property is a property of Pengine.  If both arguments
%   are instantiated only the first match is used.  Properties defined
%   below:
%
%     - self(Id), module(Id)
%       Both equal the id of a registered pengine.
%     - alias(Alias)
%       Name under which the pengine is registered in child/2 if this
%       differs from its id.  Presumably this is the `alias` option
%       when creating the pengine.
%     - thread(Thread)
%       Thread of a local pengine.
%     - remote(Server)
%       Server of a remote pengine.
%     - application(Application)
%     - destroy(Destroy)
%       Presumably `true` if the pengine is destroyed automatically
%       after completing the query.
%     - parent(Parent)
%     - source(SourceID, Source)
%       Taken from pengine_data/2.  NOTE(review): the documentation
%       remnant mentions "debug_info is present"; presumably this is
%       only recorded if the `debug_info` setting is enabled - confirm.
%     - detached(When)
%       Taken from pengine_detached/2.

pengine_property(Id, Property) :-
    (   nonvar(Id),
        nonvar(Property),
        pengine_property2(Property, Id)
    ->  true
    ;   pengine_property2(Property, Id)
    ).

pengine_property2(self(Id), Id) :-
    current_pengine(Id, _, _, _, _, _).
pengine_property2(module(Id), Id) :-
    current_pengine(Id, _, _, _, _, _).
pengine_property2(alias(Alias), Id) :-
    child(Alias, Id),
    Alias \== Id.
pengine_property2(thread(Thread), Id) :-
    current_pengine(Id, _, Thread, _, _, _),
    Thread \== 0.
pengine_property2(remote(Server), Id) :-
    current_pengine(Id, _, 0, Server, _, _).
pengine_property2(application(Application), Id) :-
    current_pengine(Id, _, _, _, Application, _).
pengine_property2(destroy(Destroy), Id) :-
    current_pengine(Id, _, _, _, _, Destroy).
pengine_property2(parent(Parent), Id) :-
    current_pengine(Id, Parent, _, _, _, _).
pengine_property2(source(SourceID, Source), Id) :-
    pengine_data(Id, source(SourceID, Source)).
pengine_property2(detached(When), Id) :-
    pengine_detached(Id, When).
%!  pengine_output(+Term) is det.
%
%   Send the event output(Self, Term) to the parent of the calling
%   pengine, where Self is the id of the calling pengine.

pengine_output(Term) :-
    pengine_self(Self),
    Event = output(Self, Term),
    pengine_reply(Event).
console.log(Message)
if there is a console. The predicate
pengine_rpc/3 calls debug(pengine(debug), '~w', [Message])
. The debug
topic pengine(debug)
is enabled by default.
%!  pengine_debug(+Format, +Args) is det.
%
%   Format a message from Format and Args and send it to the parent
%   queue as the event debug(Self, Message).  The format/3 call is
%   first checked using safe_goal/1.  If that check raises an
%   exception, the text of the exception is sent instead.

pengine_debug(Format, Args) :-
    pengine_parent(Queue),
    pengine_self(Self),
    catch(safe_goal(format(atom(_), Format, Args)), Error, true),
    (   nonvar(Error)
    ->  message_to_string(Error, Message)
    ;   format(atom(Message), Format, Args)
    ),
    pengine_reply(Queue, debug(Self, Message)).


/*================= Local pengine =======================
*/
%!  local_pengine_create(+Options) is det.
%
%   Create a pengine that runs in this process with the calling thread
%   as its parent.  The application defaults to `pengine_sandbox`.  The
%   new pengine is registered in child/2 under the name given by the
%   alias(Name) option or, without that option, under its own id.

local_pengine_create(Options) :-
    thread_self(Parent),
    option(application(Application), Options, pengine_sandbox),
    create(Parent, Id, Options, local, Application),
    option(alias(Name), Options, Id),
    assertz(child(Name, Id)).
:- multifile thread_pool:create_pool/1.

%   Hook clause for thread_pool:create_pool/1.  If Application is a
%   known pengine application, create a thread pool with that name
%   using the `thread_pool_size` and `thread_pool_stacks` settings of
%   the application.

thread_pool:create_pool(Application) :-
    current_application(Application),
    setting(Application:thread_pool_size, Size),
    setting(Application:thread_pool_stacks, Stacks),
    thread_pool_create(Application, Size, Stacks).
%!  create(+Queue, ?Child, +Options, +URL, +Application) is det.
%
%   Create a pengine Child that reports to Queue.  Child gets a fresh
%   id unless it is already instantiated.  If URL is `local`, errors
%   from creating the pengine are passed to the caller.  Otherwise
%   they are sent to Queue as the event error(Child, Error).

create(Queue, Child, Options, URL, Application) :-
    pengine_child_id(Child),
    (   URL = local
    ->  create0(Queue, Child, Options, local, Application)
    ;   catch(create0(Queue, Child, Options, URL, Application),
              Error,
              create_error(Queue, Child, Error))
    ).

%   pengine_child_id(?Child): make sure Child is instantiated.

pengine_child_id(Child) :-
    (   var(Child)
    ->  pengine_uuid(Child)
    ;   true
    ).

%   create_error(+Queue, +Child, +Error): report Error to Queue.

create_error(Queue, Child, Error) :-
    pengine_reply(Queue, error(Child, Error)).

%   create0(+Queue, +Child, +Options, +URL, +Application)
%
%   Do the actual work for create/5:
%
%     1. Verify that Application exists.
%     2. Unless URL is `http`, verify that the calling thread has
%        fewer children than the `slave_limit` setting.
%     3. Start pengine_main/3 in the thread pool Application, passing
%        the pengine options.  All other options are passed as thread
%        creation options.
%     4. Register the pengine and tell its thread its id.
%     5. Unify the id(Id) option, if present, with Child.

create0(Queue, Child, Options, URL, Application) :-
    (   current_application(Application)
    ->  true
    ;   existence_error(pengine_application, Application)
    ),
    (   URL == http                     % pengine is _not_ a child of the
                                        % HTTP server thread
    ->  true
    ;   aggregate_all(count, child(_,_), Count),
        setting(Application:slave_limit, Max),
        (   Count < Max
        ->  true
        ;   throw(error(resource_error(max_pengines), _))
        )
    ),
    partition(pengine_create_option, Options, PengineOptions, ThreadOptions),
    thread_create_in_pool(
        Application,
        pengine_main(Queue, PengineOptions, Application), Thread,
        [ at_exit(pengine_done)
        | ThreadOptions
        ]),
    option(destroy(Destroy), PengineOptions, true),
    pengine_register_local(Child, Thread, Queue, URL, Application, Destroy),
    thread_send_message(Thread, pengine_registered(Child)),
    (   option(id(Id), Options)
    ->  Id = Child
    ;   true
    ).

%   pengine_create_option(?Option): Option is passed to pengine_main/3.

pengine_create_option(src_text(_)).
pengine_create_option(src_url(_)).
pengine_create_option(application(_)).
pengine_create_option(destroy(_)).
pengine_create_option(ask(_)).
pengine_create_option(template(_)).
pengine_create_option(bindings(_)).
pengine_create_option(chunk(_)).
pengine_create_option(alias(_)).
pengine_create_option(user(_)).
%!  pengine_done is det.
%
%   Called from the pengine thread `at_exit` option.  Destroys child
%   pengines using pengine_destroy/1.  Cleaning up the Pengine is
%   synchronised by the `pengine_done` mutex.  See read_event/6.
%
%   If the thread terminated on an abort exception, the thread is
%   detached and the event destroy(Pengine, abort(Pengine)) is sent to
%   the parent, ignoring error(_,_) exceptions from doing so.

:- public
    pengine_done/0.

pengine_done :-
    thread_self(Me),
    (   thread_property(Me, status(exception(Exception))),
        abort_exception(Exception),
        thread_detach(Me),
        pengine_self(Pengine)
    ->  catch(pengine_reply(destroy(Pengine, abort(Pengine))),
              error(_,_),
              true)
    ;   true
    ),
    forall(child(_Name, Child),
           pengine_destroy(Child)),
    pengine_self(Id),
    protect_pengine(Id, pengine_unregister(Id)).

%   abort_exception(?Exception): Exception signals an abort.

abort_exception('$aborted').
abort_exception(unwind(abort)).
:- thread_local wrap_first_answer_in_create_event/2.

% pengine_prepare_source/2 is called through in_temporary_module/3 and
% its clause head is pengine_prepare_source(Module:Application,
% Options): the first argument is module sensitive.
:- meta_predicate
    pengine_prepare_source(:, +).

%!  pengine_main(+Parent, +Options, +Application)
%
%   Entry point of a pengine thread.  Waits for the
%   pengine_registered(Self) message that carries the id of the
%   pengine, stores Parent in the global variable `pengine_parent`,
%   registers the user and enables the flag `mitigate_spectre`.  It
%   then prepares the source and runs pengine_create_and_loop/3 in a
%   temporary module named Self.  If `prepare_source_failed` is
%   thrown, the pengine is terminated.

pengine_main(Parent, Options, Application) :-
    fix_streams,
    thread_get_message(pengine_registered(Self)),
    nb_setval(pengine_parent, Parent),
    pengine_register_user(Options),
    set_prolog_flag(mitigate_spectre, true),
    catch(in_temporary_module(
              Self,
              pengine_prepare_source(Application, Options),
              pengine_create_and_loop(Self, Application, Options)),
          prepare_source_failed,
          pengine_terminate(Self)).

%   pengine_create_and_loop(+Self, +Application, +Options)
%
%   Report the creation of the pengine and enter the main loop.
%   Without an ask(Query) option the create event is sent immediately.
%   With this option the create event is stored in
%   wrap_first_answer_in_create_event/2 and the query is asked; the
%   create event is then sent by wrap_first_answer/3 together with the
%   first answer.  A query that is a string is parsed, together with
%   the template option if present, using ask_to_term/5.  A parse
%   error is reported using send_error/1, after which
%   `prepare_source_failed` is thrown.  The chunk size defaults to 1.

pengine_create_and_loop(Self, Application, Options) :-
    setting(Application:slave_limit, SlaveLimit),
    CreateEvent = create(Self, [slave_limit(SlaveLimit)|Extra]),
    (   option(ask(Query0), Options)
    ->  asserta(wrap_first_answer_in_create_event(CreateEvent, Extra)),
        (   string(Query0)                      % string is not callable
        ->  (   option(template(TemplateS), Options)
            ->  Ask2 = Query0-TemplateS
            ;   Ask2 = Query0
            ),
            catch(ask_to_term(Ask2, Self, Query, Template, Bindings),
                  Error, true),
            (   var(Error)
            ->  true
            ;   send_error(Error),
                throw(prepare_source_failed)
            )
        ;   Query = Query0,
            option(template(Template), Options, Query),
            option(bindings(Bindings), Options, [])
        ),
        option(chunk(Chunk), Options, 1),
        pengine_ask(Self, Query,
                    [ template(Template),
                      chunk(Chunk),
                      bindings(Bindings)
                    ])
    ;   Extra = [],
        pengine_reply(CreateEvent)
    ),
    pengine_main_loop(Self).
%!  ask_to_term(+AskSpec, +Module, -Query, -Template, -Bindings) is det.
%
%   Parse a query that is provided as text.  AskSpec is either the
%   text of the query or a term Ask-Template holding the text of both.
%   Operators are resolved in Module.
%
%   If a template is given, Bindings are the `Name = Var` terms for
%   the variables of the template.  Otherwise Bindings are the
%   variables of the query that are not excluded by anon/1 and
%   Template is a dict with tag `swish_default_template` created from
%   these bindings.

ask_to_term(AskText-TemplateText, Module, Query, Template, Bindings) :-
    !,
    format(string(Text), 't((~s),(~s))', [TemplateText, AskText]),
    term_string(t(Template,Query), Text,
                [ variable_names(AllBindings),
                  module(Module)
                ]),
    phrase(template_bindings(Template, AllBindings), Bindings).
ask_to_term(AskText, Module, Query, Template, Bindings) :-
    term_string(Query, AskText,
                [ variable_names(AllBindings),
                  module(Module)
                ]),
    exclude(anon, AllBindings, Bindings),
    dict_create(Template, swish_default_template, Bindings).

%   template_bindings(+Term, +Bindings)//
%
%   Emit the binding for each occurrence of a variable in Term that is
%   named in Bindings, walking Term from left to right.

template_bindings(Var, Bindings) -->
    { var(Var) }, !,
    (   { var_binding(Bindings, Var, Binding)
        }
    ->  [Binding]
    ;   []
    ).
template_bindings([Head|Tail], Bindings) -->
    !,
    template_bindings(Head, Bindings),
    template_bindings(Tail, Bindings).
template_bindings(Compound, Bindings) -->
    { compound(Compound), !,
      compound_name_arguments(Compound, _Name, Args)
    },
    template_bindings(Args, Bindings).
template_bindings(_, _) --> [].

%   var_binding(+Bindings, +Var, -Binding): Binding is the first
%   element of Bindings whose second argument is identical to Var.

var_binding(Bindings, Var, Binding) :-
    member(Binding, Bindings),
    arg(2, Binding, Value),
    Value == Var,
    !.
%!  fix_streams is det.
%
%   If `current_output` is a CGI stream, give the stream `user_output`
%   the alias `current_output`.

fix_streams :-
    fix_stream(current_output).

fix_stream(Alias) :-
    (   is_cgi_stream(Alias)
    ->  debug(pengine(stream), '~w is a CGI stream!', [Alias]),
        set_stream(user_output, alias(Alias))
    ;   true
    ).
%!  pengine_prepare_source(:Application, +Options) is det.
%
%   Prepare the module of a pengine.  The first argument is
%   Module:Application, where Module is the module to prepare.  The
%   program space of Module is limited to the `program_space` setting
%   of Application, the import from `user` is removed and Application
%   is added as first import module.  Module is then filled by
%   prep_module/3.  If that raises an exception, the error is reported
%   using send_error/1 and `prepare_source_failed` is thrown.

pengine_prepare_source(Module:Application, Options) :-
    setting(Application:program_space, SpaceLimit),
    set_module(Module:program_space(SpaceLimit)),
    delete_import_module(Module, user),
    add_import_module(Module, Application, start),
    catch(prep_module(Module, Application, Options), Error, true),
    (   var(Error)
    ->  true
    ;   send_error(Error),
        throw(prepare_source_failed)
    ).

%   prep_module(+Module, +Application, +Options)
%
%   Copy the flag `var_prefix` from Application, call all solutions of
%   the hook prepare_module/3 and process the source options with
%   Module as source module.

prep_module(Module, Application, Options) :-
    maplist(copy_flag(Module, Application), [var_prefix]),
    forall(prepare_module(Module, Application, Options), true),
    setup_call_cleanup(
        '$set_source_module'(OldModule, Module),
        maplist(process_create_option(Module), Options),
        '$set_source_module'(OldModule)).

%   copy_flag(+Module, +Application, +Flag)
%
%   If the module specific flag Flag has a value in Application, set
%   it to the same value in Module.  Succeeds without doing anything
%   if Application has no such flag.

copy_flag(Module, Application, Flag) :-
    current_prolog_flag(Application:Flag, Value),
    !,
    set_prolog_flag(Module:Flag, Value).
copy_flag(_, _, _).

%   process_create_option(+Module, +Option)
%
%   Load the source for the options src_text(Text) and src_url(URL)
%   into Module.  All other options are ignored.

process_create_option(Application, src_text(Text)) :-
    !,
    pengine_src_text(Text, Application).
process_create_option(Application, src_url(URL)) :-
    !,
    pengine_src_url(URL, Application).
process_create_option(_, _).
%   (Tail of the preceding documentation:) ... the `src_text` and
%   `src_url` options

%!  pengine_main_loop(+Pengine) is det.
%
%   Run guarded_main_loop/1 for Pengine.  If the exception
%   `abort_query` is raised, continue with pengine_aborted/1.

pengine_main_loop(ID) :-
    catch(guarded_main_loop(ID),
          abort_query,
          pengine_aborted(ID)).

%   pengine_aborted(+Pengine)
%
%   Handle an aborted query: discard the pending messages using
%   empty_queue/0 and continue with destroy_or_continue/1 for the event
%   abort(Pengine).

pengine_aborted(ID) :-
    thread_self(Thread),
    debug(pengine(abort), 'Aborting ~p (thread ~p)', [ID, Thread]),
    empty_queue,
    destroy_or_continue(abort(ID)).
%!  guarded_main_loop(+Pengine) is det.
%
%   Wait for the next request and act on it:
%
%     - `destroy` terminates the pengine;
%     - ask(Goal, Options) starts answering Goal using ask/3;
%     - anything else is answered with a `protocol_error` error event,
%       after which the loop waits for the next request.

guarded_main_loop(ID) :-
    pengine_request(Request),
    (   Request = destroy
    ->  debug(pengine(transition), '~q: 2 = ~q => 1', [ID, destroy]),
        pengine_terminate(ID)
    ;   Request = ask(Goal, Options)
    ->  debug(pengine(transition), '~q: 2 = ~q => 3', [ID, ask(Goal)]),
        ask(ID, Goal, Options)
    ;   debug(pengine(transition), '~q: 2 = ~q => 2', [ID, protocol_error]),
        ProtocolError = error(ID, error(protocol_error, _)),
        pengine_reply(ProtocolError),
        guarded_main_loop(ID)
    ).


%   pengine_terminate(+Pengine)
%
%   Send the event destroy(Pengine) to the parent and detach the
%   calling thread.

pengine_terminate(ID) :-
    pengine_reply(destroy(ID)),
    thread_self(Thread),            % Make the thread silently disappear
    thread_detach(Thread).
%!  solve(+Chunk, +Template, :Goal, +ID) is det.
%
%   Solve Goal and report the results for pengine ID.  Chunk is
%   either an integer (answers are collected in chunks of that size)
%   or `false` (one answer at a time, without collecting).  Any other
%   value raises a domain error.
%
%   The choice point recorded in Choice and the mutable terms State
%   and Time are handed to more_solutions/4, which uses them to
%   handle follow-up requests.  The cuts marked "commit" are placed
%   in the then/else branches and thus commit the clause itself.

solve(Chunk, Template, Goal, ID) :-
    prolog_current_choice(Choice),
    (   integer(Chunk)
    ->  State = count(Chunk)
    ;   Chunk == false
    ->  State = no_chunk
    ;   domain_error(chunk, Chunk)
    ),
    statistics(cputime, Started),
    Time = time(Started),               % updated using nb_setarg/3
    nb_current('$variable_names', Bindings),
    filter_template(Template, Bindings, Filtered),
    '$current_typein_module'(OldTypeIn),
    (   '$set_typein_module'(ID),
        call_cleanup(
            catch(findnsols_no_empty(State, Filtered,
                                     set_projection(Goal, Bindings),
                                     Result),
                  Error, true),
            query_done(Det, OldTypeIn)),
        arg(1, Time, T0),
        statistics(cputime, T1),
        CPUTime is T1-T0,
        forall(pengine_flush_output_hook, true),
        (   var(Error)
        ->  projection(Projection),
            (   var(Det)                % cleanup did not run: more to come
            ->  pengine_reply(success(ID, Result, Projection,
                                      CPUTime, true)),
                more_solutions(ID, Choice, State, Time)
            ;   !,                      % commit
                destroy_or_continue(success(ID, Result, Projection,
                                            CPUTime, false))
            )
        ;   !,                          % commit
            (   Error == abort_query
            ->  throw(Error)
            ;   destroy_or_continue(error(ID, Error))
            )
        )
    ;   !,                              % commit
        arg(1, Time, T0),
        statistics(cputime, T1),
        CPUTime is T1-T0,
        destroy_or_continue(failure(ID, CPUTime))
    ).
solve(_, _, _, _).                      % leave a choice point

%!  query_done(-Det, +TypeIn) is det.
%
%   Cleanup handler for the query.  Binds Det to `true` and restores
%   the type-in module to TypeIn.

query_done(true, TypeIn) :-
    '$set_typein_module'(TypeIn).
%!  set_projection(:Goal, +Bindings)
%
%   Make Bindings available through the backtrackable global variable
%   '$variable_names' and then run Goal.

set_projection(Goal, VarNames) :-
    b_setval('$variable_names', VarNames),
    call(Goal).

%!  projection(-Projection) is det.
%
%   Projection is the list of names taken from the bindings stored in
%   '$variable_names', or the empty list if that variable is not set.

projection(Names) :-
    nb_current('$variable_names', VarNames),
    !,
    maplist(var_name, VarNames, Names).
projection([]).
%!  filter_template(+Template0, +Bindings, -Template) is det.
%
%   If Template0 is a dict with tag `swish_default_template`, replace
%   it by a dict with the same tag that is created from Bindings.
%   Any other template is passed unmodified.

filter_template(Default, Bindings, Template) :-
    is_dict(Default, swish_default_template),
    !,
    dict_create(Template, swish_default_template, Bindings).
filter_template(Template, _Bindings, Template).

%!  findnsols_no_empty(+State, ?Template, :Goal, -List) is nondet.
%
%   For State `no_chunk`, run Goal and return each answer as a
%   one-element list.  Otherwise call findnsols/4, discarding an
%   empty result list.

findnsols_no_empty(no_chunk, Template, Goal, Answers) =>
    Answers = [Template],
    call(Goal).
findnsols_no_empty(State, Template, Goal, Answers) =>
    findnsols(State, Template, Goal, Answers),
    Answers \== [].

%!  destroy_or_continue(+Event) is det.
%
%   Send Event, whose first argument is the pengine id.  If the
%   pengine has the property destroy(true), the thread is detached
%   and Event is wrapped in destroy(ID, Event).  Otherwise Event is
%   sent as is and we return to guarded_main_loop/1.

destroy_or_continue(Event) :-
    arg(1, Event, Pengine),
    (   pengine_property(Pengine, destroy(true))
    ->  thread_self(Self),
        thread_detach(Self),
        pengine_reply(destroy(Pengine, Event))
    ;   pengine_reply(Event),
        guarded_main_loop(Pengine)
    ).
chunk
solutions.next
% , but sets the new chunk-size to Count.

%!  more_solutions(+ID, +Choice, +State, +Time)
%
%   Called after an answer was sent that may have more solutions.
%   Waits for the next request and acts on it:
%
%     - `stop` ends the query.
%     - `next` resets the CPU time epoch in Time and fails, which
%       backtracks into the running goal.
%     - next(Count) does the same after storing the new chunk size
%       in State.  It requires Count > 0 and a count(_) State.
%     - ask(Goal, Options) prunes to Choice and starts a new query.
%     - `destroy` terminates the pengine.
%     - Anything else results in a protocol error, after which we
%       wait for the next request.

more_solutions(ID, Choice, State, Time) :-
    pengine_request(Request),
    more_solutions(Request, ID, Choice, State, Time).

more_solutions(stop, ID, _Choice, _State, _Time) :-
    !,
    debug(pengine(transition), '~q: 6 = ~q => 7', [ID, stop]),
    destroy_or_continue(stop(ID)).
more_solutions(next, ID, _Choice, _State, Time) :-
    !,
    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next]),
    statistics(cputime, Now),
    nb_setarg(1, Time, Now),
    fail.
more_solutions(next(Count), ID, _Choice, State, Time) :-
    Count > 0,
    State = count(_),                   % else fallthrough to protocol error
    !,
    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next(Count)]),
    nb_setarg(1, State, Count),
    statistics(cputime, Now),
    nb_setarg(1, Time, Now),
    fail.
more_solutions(ask(Goal, Options), ID, Choice, _State, _Time) :-
    !,
    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, ask(Goal)]),
    prolog_cut_to(Choice),
    ask(ID, Goal, Options).
more_solutions(destroy, ID, _Choice, _State, _Time) :-
    !,
    debug(pengine(transition), '~q: 6 = ~q => 1', [ID, destroy]),
    pengine_terminate(ID).
more_solutions(Request, ID, Choice, State, Time) :-
    debug(pengine(transition), '~q: 6 = ~q => 6',
          [ID, protocol_error(Request)]),
    pengine_reply(error(ID, error(protocol_error, _))),
    more_solutions(ID, Choice, State, Time).
chunk(N)
option.
%!  ask(+Pengine, +Goal, +Options)
%
%   Prepare Goal using prepare_goal/4 and solve it.  The template
%   defaults to Goal itself and the chunk size to 1.  If preparing
%   the goal raises an exception, the error is reported and we go
%   back to the main loop.

ask(Pengine, Goal, Options) :-
    catch(prepare_goal(Pengine, Goal, Prepared, Options), Error, true),
    !,
    (   var(Error)
    ->  option(template(Template), Options, Goal),
        option(chunk(Chunk), Options, 1),
        solve(Chunk, Template, Prepared, Pengine)
    ;   pengine_reply(error(Pengine, Error)),
        guarded_main_loop(Pengine)
    ).
Note that expand_goal(Module:GoalIn, GoalOut)
is what we'd like
to write, but this does not work correctly if the user wishes to
expand X:Y
while interpreting X not as the module in which
to run Y. This happens in the CQL package. Possibly we should
disallow this reinterpretation?
%!  prepare_goal(+Pengine, +GoalIn, -Goal, +Options) is semidet.
%
%   Turn GoalIn into the qualified goal Module:Goal to be executed,
%   where Module is the module of Pengine.  The steps are:
%
%     1. Store the bindings(Bindings) option in '$variable_names'.
%     2. Give the prepare_goal/3 hook a chance to rewrite the goal.
%     3. Run expand_goal/2 with Module as source module.
%     4. Unless the pengine is not sandboxed, validate the goal
%        using safe_goal/1 within the time configured by the
%        `safe_goal_limit` setting of the application.
%
%   @throws error(sandbox(time_limit_exceeded, Limit), _) if the
%           safety check does not complete in time.

prepare_goal(Pengine, GoalIn, Module:Goal, Options) :-
    option(bindings(Bindings), Options, []),
    b_setval('$variable_names', Bindings),
    (   prepare_goal(GoalIn, Prepared, Options)
    ->  true
    ;   Prepared = GoalIn
    ),
    get_pengine_module(Pengine, Module),
    setup_call_cleanup(
        '$set_source_module'(OldSource, Module),
        expand_goal(Prepared, Goal),
        '$set_source_module'(_, OldSource)),
    (   pengine_not_sandboxed(Pengine)
    ->  true
    ;   get_pengine_application(Pengine, App),
        setting(App:safe_goal_limit, Limit),
        catch(call_with_time_limit(
                  Limit,
                  safe_goal(Module:Goal)), Ex, true)
    ->  (   var(Ex)
        ->  true
        ;   Ex = time_limit_exceeded
        ->  throw(error(sandbox(time_limit_exceeded, Limit),_))
        ;   throw(Ex)
        )
    ).
not_sandboxed(User, Application)
must succeed.
%!  pengine_not_sandboxed(+Pengine) is semidet.
%
%   True when not_sandboxed/2 succeeds for the user and the
%   application that are associated with Pengine.

pengine_not_sandboxed(Pengine) :-
    pengine_user(Pengine, User),
    pengine_property(Pengine, application(Application)),
    not_sandboxed(User, Application),
    !.
%!  pengine_pull_response(+Pengine, +Options) is det.
%
%   If Pengine is registered as a remote pengine, ask its server for
%   the next response.  For any other pengine this is a no-op.

pengine_pull_response(Pengine, Options) :-
    pengine_remote(Pengine, BaseURL),
    !,
    remote_pengine_pull_response(BaseURL, Pengine, Options).
pengine_pull_response(_Pengine, _Options).
%!  pengine_input(+Prompt, -Term) is det.
%
%   Send prompt(Self, Prompt) to the parent of the calling pengine
%   and wait for the next request.  On input(Input), Term is unified
%   with Input.  A `destroy` request calls abort/0.
%
%   @throws error(protocol_error, _) on any other request.

pengine_input(Prompt, Term) :-
    pengine_self(Self),
    pengine_parent(Parent),
    pengine_reply(Parent, prompt(Self, Prompt)),
    pengine_request(Answer),
    (   Answer = input(Input)
    ->  Term = Input
    ;   Answer == destroy
    ->  abort
    ;   throw(error(protocol_error,_))
    ).
Defined in terms of pengine_send/3, as follows:
pengine_respond(Pengine, Input, Options) :- pengine_send(Pengine, input(Input), Options).
*/
%   Wrap the answer in input/1 and deliver it using pengine_send/3.

pengine_respond(Pengine, Answer, Options) :-
    pengine_send(Pengine, input(Answer), Options).
%!  send_error(+Error) is det.
%
%   Send Error as an error(Self, Error) event of the calling
%   pengine.  Blobs are replaced using replace_blobs/2.  If the
%   context holds a list of stack frames, the frames are replaced by
%   the backtrace printed to a string.

send_error(error(Formal, context(prolog_stack(Frames), Message))) :-
    is_list(Frames),
    !,
    with_output_to(string(Backtrace),
                   print_prolog_backtrace(current_output, Frames)),
    pengine_self(Self),
    replace_blobs(Formal, SafeFormal),
    replace_blobs(Message, SafeMessage),
    pengine_reply(error(Self,
                        error(SafeFormal,
                              context(prolog_stack(Backtrace),
                                      SafeMessage)))).
send_error(Error) :-
    pengine_self(Self),
    replace_blobs(Error, SafeError),
    pengine_reply(error(Self, SafeError)).
%!  replace_blobs(+Term0, -Term) is det.
%
%   Copy Term0 to Term, replacing each non-text blob by an atom that
%   holds its `~p` printed representation.

replace_blobs(Blob, Atom) :-
    blob(Blob, Type), Type \== text,
    !,
    format(atom(Atom), '~p', [Blob]).
replace_blobs(Compound0, Compound) :-
    compound(Compound0),
    !,
    compound_name_arguments(Compound0, Name, Args0),
    maplist(replace_blobs, Args0, Args),
    compound_name_arguments(Compound, Name, Args).
replace_blobs(Term, Term).


/*================= Remote pengines =======================
*/


%!  remote_pengine_create(+BaseURL, +Options) is det.
%
%   Create a pengine on the server at BaseURL.  The pengine create
%   options are posted to the server, the remaining options are used
%   for the HTTP request.  If there is an ask(Query) option but no
%   template, Query itself is used as template.  The child is
%   recorded under its alias (default: its id) and the reply is
%   forwarded to the calling thread as a pengine event.

remote_pengine_create(BaseURL, Options) :-
    partition(pengine_create_option, Options, CreateOptions0, HTTPOptions),
    (   option(ask(Query), CreateOptions0),
        \+ option(template(_Template), CreateOptions0)
    ->  CreateOptions = [template(Query)|CreateOptions0]
    ;   CreateOptions = CreateOptions0
    ),
    options_to_dict(CreateOptions, PostData),
    remote_post_rec(BaseURL, create, PostData, Reply, HTTPOptions),
    arg(1, Reply, ID),
    (   option(id(ID2), Options)
    ->  ID = ID2
    ;   true
    ),
    option(alias(Name), Options, ID),
    assert(child(Name, ID)),
    (   (   functor(Reply, create, _)   % actually created
        ;   functor(Reply, output, _)   % compiler messages
        )
    ->  option(application(Application), CreateOptions, pengine_sandbox),
        option(destroy(Destroy), CreateOptions, true),
        pengine_register_remote(ID, BaseURL, Application, Destroy)
    ;   true
    ),
    thread_self(Queue),
    pengine_reply(Queue, Reply).

%!  options_to_dict(+Options, -Dict) is det.
%
%   Translate the create options into a dict.  If both ask(Ask) and
%   template(Template) are present they are written together, so the
%   variables they share get the same name in both strings.

options_to_dict(Options, Dict) :-
    select_option(ask(Ask), Options, Options1),
    select_option(template(Template), Options1, Options2),
    !,
    no_numbered_var_in(Ask+Template),
    % findall/3 undoes the numbervars/3 bindings made while writing
    findall(AskString-TemplateString,
            ask_template_to_strings(Ask, Template, AskString, TemplateString),
            [ AskString-TemplateString ]),
    options_to_dict(Options2, Dict0),
    Dict = Dict0.put(_{ask:AskString,template:TemplateString}).
options_to_dict(Options, Dict) :-
    maplist(prolog_option, Options, Options1),
    dict_create(Dict, _, Options1).

%!  no_numbered_var_in(+Term) is det.
%
%   @throws domain_error(numbered_vars_free_term, Term) if Term has a
%           subterm of the shape '$VAR'(_).

no_numbered_var_in(Term) :-
    sub_term(Sub, Term),
    subsumes_term('$VAR'(_), Sub),
    !,
    domain_error(numbered_vars_free_term, Term).
no_numbered_var_in(_).

%!  ask_template_to_strings(+Ask, +Template, -AskString, -TemplateString)
%
%   Number the variables of Ask+Template and write both terms,
%   separated by a newline, into one string that is then split on
%   the newline.

ask_template_to_strings(Ask, Template, AskString, TemplateString) :-
    numbervars(Ask+Template, 0, _),
    WriteOptions = [ numbervars(true), ignore_ops(true), quoted(true) ],
    format(string(Both), '~W\n~W', [ Ask, WriteOptions,
                                     Template, WriteOptions
                                   ]),
    split_string(Both, "\n", "", [AskString, TemplateString]).

%!  prolog_option(+Option0, -Option) is det.
%
%   Options whose value is a term are replaced by an option whose
%   value is the term written to a string using `~k`.

prolog_option(Option0, Option) :-
    create_option_type(Option0, term),
    !,
    Option0 =.. [Name,Value],
    format(string(Text), '~k', [Value]),
    Option =.. [Name,Text].
prolog_option(Option, Option).

%!  create_option_type(?Option, ?Type) is nondet.
%
%   Type of the value of create options that need conversion.

create_option_type(ask(_),         term).
create_option_type(template(_),    term).
create_option_type(application(_), atom).

%!  remote_pengine_send(+BaseURL, +ID, +Event, +Options) is det.
%
%   Send Event to the remote pengine ID and forward the reply to the
%   calling thread.

remote_pengine_send(BaseURL, ID, Event, Options) :-
    remote_send_rec(BaseURL, send, ID, [event=Event], Reply, Options),
    thread_self(Queue),
    pengine_reply(Queue, Reply).

%!  remote_pengine_pull_response(+BaseURL, +ID, +Options) is det.
%
%   Ask the server for the next response of the remote pengine ID
%   and forward the reply to the calling thread.

remote_pengine_pull_response(BaseURL, ID, Options) :-
    remote_send_rec(BaseURL, pull_response, ID, [], Reply, Options),
    thread_self(Queue),
    pengine_reply(Queue, Reply).

%!  remote_pengine_abort(+BaseURL, +ID, +Options) is det.
%
%   Ask the server to abort the remote pengine ID and forward the
%   reply to the calling thread.

remote_pengine_abort(BaseURL, ID, Options) :-
    remote_send_rec(BaseURL, abort, ID, [], Reply, Options),
    thread_self(Queue),
    pengine_reply(Queue, Reply).
%!  remote_send_rec(+Server, +Action, +ID, +Params, -Reply, +Options)
%
%   Issue Action for pengine ID on Server and read the Prolog reply.
%   If Params is exactly [event=Event], the event is posted as a
%   Prolog term.  Otherwise Params are added to the query string of
%   the URL.

remote_send_rec(Server, Action, ID, [event=Event], Reply, Options) :-
    !,
    server_url(Server, Action, [id=ID], URL),
    http_open(URL, In,                  % putting this in setup_call_cleanup/3
              [ post(prolog(Event))     % makes it impossible to interrupt.
              | Options
              ]),
    call_cleanup(
        read_prolog_reply(In, Reply),
        close(In)).
remote_send_rec(Server, Action, ID, Params, Reply, Options) :-
    server_url(Server, Action, [id=ID|Params], URL),
    http_open(URL, In, Options),
    call_cleanup(
        read_prolog_reply(In, Reply),
        close(In)).

%!  remote_post_rec(+Server, +Action, +Data, -Reply, +Options)
%
%   Post Data as JSON to Action on Server and read the Prolog reply.
%   The URL is probed using probe/3 before posting.

remote_post_rec(Server, Action, Data, Reply, Options) :-
    server_url(Server, Action, [], URL),
    probe(Action, URL, Options),
    http_open(URL, In,
              [ post(json(Data))
              | Options
              ]),
    call_cleanup(
        read_prolog_reply(In, Reply),
        close(In)).
%!  probe(+Action, +URL, +Options) is det.
%
%   For the `create` action, first issue an HTTP OPTIONS request on
%   URL.  Other actions are not probed.

probe(create, URL, Options) :-
    !,
    http_open(URL, In, [method(options)|Options]),
    close(In).
probe(_, _, _).

%!  read_prolog_reply(+In, -Reply) is det.
%
%   Read a term from the stream In using UTF-8 encoding and apply
%   rebind_cycles/2 to it.

read_prolog_reply(In, Reply) :-
    set_stream(In, encoding(utf8)),
    read(In, Raw),
    rebind_cycles(Raw, Reply).

%!  rebind_cycles(+Reply0, -Reply) is det.
%
%   If Reply0 is Reply@Bindings, where Bindings is a list, perform
%   the Var = Value unifications in Bindings.  Other terms are
%   passed unmodified.

rebind_cycles(@(Reply, Bindings), Reply) :-
    is_list(Bindings),
    !,
    maplist(bind, Bindings).
rebind_cycles(Reply, Reply).

%!  bind(+Binding) is semidet.
%
%   Perform the unification denoted by Var = Value.

bind(Var = Value) :-
    Var = Value.

%!  server_url(+Server, +Action, +Params, -URL) is det.
%
%   URL is Server with its path edited to `pengine/Action` and its
%   search part to Params.

server_url(Server, Action, Params, URL) :-
    atom_concat('pengine/', Action, Path),
    uri_edit([ path(Path),
               search(Params)
             ], Server, URL).
Valid options are:
timeout
%!  pengine_event(?Event) is det.
%!  pengine_event(?Event, +Options) is det.
%
%   Wait for a pengine_event(Id, Event) message in the queue of the
%   calling thread.  The option listen(Id) restricts the pengine we
%   listen to.  Options are also passed to thread_get_message/3.  If
%   that fails, Event is unified with `timeout`.  A remote pengine
%   is unregistered when Event tells us it was destroyed.

pengine_event(Event) :-
    pengine_event(Event, []).

pengine_event(Event, Options) :-
    thread_self(Queue),
    option(listen(Id), Options, _),
    (   thread_get_message(Queue, pengine_event(Id, Event), Options)
    ->  true
    ;   Event = timeout
    ),
    update_remote_destroy(Event).

%!  update_remote_destroy(+Event) is det.
%
%   If Event is a destroy event of a registered remote pengine,
%   remove the registration.

update_remote_destroy(Event) :-
    destroy_event(Event),
    arg(1, Event, Id),
    pengine_remote(Id, _Server),
    !,
    pengine_unregister_remote(Id).
update_remote_destroy(_).

%!  destroy_event(+Event) is semidet.
%
%   True if Event is destroy/1 or destroy/2, or a create event whose
%   features hold a bound answer(Answer) that is a destroy event.

destroy_event(destroy(_)).
destroy_event(destroy(_,_)).
destroy_event(create(_,Features)) :-
    memberchk(answer(Answer), Features),
    !,
    nonvar(Answer),
    destroy_event(Answer).
ignore(call(Closure, E))
. A
closure thus acts as a handler for the event. Some events are also
treated specially:
Valid options are:
all
,
all_but_sender
or a Prolog list of NameOrIDs. [not yet
implemented]*/
%!  pengine_event_loop(:Closure, +Options) is det.
%
%   While there are children, wait for the next event and process
%   it.  If the option autoforward(all) is given, each event is
%   first forwarded to all children.  The loop stops when there are
%   no more children or when processing an event does not ask to
%   continue.

pengine_event_loop(Closure, Options) :-
    child(_,_),
    !,
    pengine_event(Event),
    (   option(autoforward(all), Options) % TODO: Implement all_but_sender and list of IDs
    ->  forall(child(_,ID), pengine_send(ID, Event))
    ;   true
    ),
    pengine_event_loop(Event, Closure, Options).
pengine_event_loop(_, _).

%   The closure is called as call(Closure, Event), so the second
%   argument is a meta-argument that gets one additional argument.
%   The other arguments are plain data.

:- meta_predicate
    pengine_process_event(+, 1, -, +).

%!  pengine_event_loop(+Event, :Closure, +Options) is det.
%
%   Process Event and continue the loop if the handler for the event
%   set Continue to `true`.

pengine_event_loop(Event, Closure, Options) :-
    pengine_process_event(Event, Closure, Continue, Options),
    (   Continue == true
    ->  pengine_event_loop(Closure, Options)
    ;   true
    ).

%!  pengine_process_event(+Event, :Closure, -Continue, +Options)
%
%   Handle a single event by calling Closure on it.  Failure of
%   Closure is ignored, except for error events.  Special cases:
%
%     - A create event that carries answer(First) is split: Closure
%       sees the create event without the answer, after which First
%       is processed as an event of its own.
%     - output and debug events are followed by a request for the
%       next response.
%     - success events are passed to Closure as success(ID, Sol, More).
%     - If Closure fails on an error event, all children are
%       destroyed and the error is thrown.
%     - destroy(ID, Event) first processes Event and then destroy(ID),
%       which removes the child registration.

pengine_process_event(create(ID, T), Closure, Continue, Options) :-
    debug(pengine(transition), '~q: 1 = /~q => 2', [ID, create(T)]),
    (   select(answer(First), T, T1)
    ->  ignore(call(Closure, create(ID, T1))),
        pengine_process_event(First, Closure, Continue, Options)
    ;   ignore(call(Closure, create(ID, T))),
        Continue = true
    ).
pengine_process_event(output(ID, Msg), Closure, true, _Options) :-
    debug(pengine(transition), '~q: 3 = /~q => 4', [ID, output(Msg)]),
    ignore(call(Closure, output(ID, Msg))),
    pengine_pull_response(ID, []).
pengine_process_event(debug(ID, Msg), Closure, true, _Options) :-
    debug(pengine(transition), '~q: 3 = /~q => 4', [ID, debug(Msg)]),
    ignore(call(Closure, debug(ID, Msg))),
    pengine_pull_response(ID, []).
pengine_process_event(prompt(ID, Term), Closure, true, _Options) :-
    debug(pengine(transition), '~q: 3 = /~q => 5', [ID, prompt(Term)]),
    ignore(call(Closure, prompt(ID, Term))).
pengine_process_event(success(ID, Sol, _Proj, _Time, More), Closure, true, _) :-
    debug(pengine(transition), '~q: 3 = /~q => 6/2', [ID, success(Sol, More)]),
    ignore(call(Closure, success(ID, Sol, More))).
pengine_process_event(failure(ID, _Time), Closure, true, _Options) :-
    debug(pengine(transition), '~q: 3 = /~q => 2', [ID, failure]),
    ignore(call(Closure, failure(ID))).
pengine_process_event(error(ID, Error), Closure, Continue, _Options) :-
    debug(pengine(transition), '~q: 3 = /~q => 2', [ID, error(Error)]),
    (   call(Closure, error(ID, Error))
    ->  Continue = true
    ;   forall(child(_,Child), pengine_destroy(Child)),
        throw(Error)
    ).
pengine_process_event(stop(ID), Closure, true, _Options) :-
    debug(pengine(transition), '~q: 7 = /~q => 2', [ID, stop]),
    ignore(call(Closure, stop(ID))).
pengine_process_event(destroy(ID, Event), Closure, Continue, Options) :-
    pengine_process_event(Event, Closure, _, Options),
    pengine_process_event(destroy(ID), Closure, Continue, Options).
pengine_process_event(destroy(ID), Closure, true, _Options) :-
    retractall(child(_,ID)),
    debug(pengine(transition), '~q: 1 = /~q => 0', [ID, destroy]),
    ignore(call(Closure, destroy(ID))).
copy_term_nat(Query, Copy), % attributes are not copied to the server call(Copy), % executed on server at URL Query = Copy.
Valid options are:
pengines:time_limit
.Remaining options (except the server option) are passed to pengine_create/1. */
%!  pengine_rpc(+URL, :Query) is nondet.
%!  pengine_rpc(+URL, :Query, +Options) is nondet.
%
%   Run Query on the pengine server at URL and enumerate its answers
%   by unifying the variables of Query.  Unless a timeout(_) option
%   is given, the `time_limit` setting is used as timeout.  The
%   pengine is destroyed on cleanup, unless the server has already
%   reported that it was destroyed.

pengine_rpc(URL, Query) :-
    pengine_rpc(URL, Query, []).

pengine_rpc(URL, Query, M:Options0) :-
    translate_local_sources(Options0, Options1, M),
    (   option(timeout(_), Options1)
    ->  Options = Options1
    ;   setting(time_limit, Limit),
        Options = [timeout(Limit)|Options1]
    ),
    term_variables(Query, Vars),
    Template =.. [v|Vars],
    State = destroy(true),              % modified by process_event/4
    setup_call_catcher_cleanup(
        pengine_create([ ask(Query),
                         template(Template),
                         server(URL),
                         id(Id)
                       | Options
                       ]),
        wait_event(Template, State, [listen(Id)|Options]),
        Why,
        pengine_destroy_and_wait(State, Id, Why, Options)).

%!  pengine_destroy_and_wait(+State, +Id, +Why, +Options) is det.
%
%   Cleanup for pengine_rpc/3.  Only if State is still destroy(true)
%   do we destroy the pengine and wait for it to confirm.

pengine_destroy_and_wait(destroy(true), Id, Why, Options) :-
    !,
    debug(pengine(rpc), 'Destroying RPC because of ~p', [Why]),
    pengine_destroy(Id, Options),
    wait_destroy(Id, 10).
pengine_destroy_and_wait(_, _, Why, _) :-
    debug(pengine(rpc), 'Not destroying RPC (~p)', [Why]).

%!  wait_destroy(+Id, +Retries) is det.
%
%   Wait until child Id is gone.  We consume events for Id until we
%   see a destroy event.  Retries limits the number of other events
%   we accept.  If it reaches zero we give up and remove the
%   registration of the pengine ourselves.

wait_destroy(Id, _) :-
    \+ child(_, Id),
    !.
wait_destroy(Id, Retries) :-
    pengine_event(Event, [listen(Id),timeout(10)]),
    !,
    (   destroy_event(Event)
    ->  retractall(child(_,Id))
    ;   succ(Left, Retries)
    ->  wait_destroy(Id, Left)
    ;   debug(pengine(rpc), 'RPC did not answer to destroy ~p', [Id]),
        pengine_unregister_remote(Id),
        retractall(child(_,Id))
    ).

%!  wait_event(?Template, +State, +Options) is nondet.
%
%   Wait for the next event and process it.

wait_event(Template, State, Options) :-
    pengine_event(Event, Options),
    debug(pengine(event), 'Received ~p', [Event]),
    process_event(Event, Template, State, Options).

%!  process_event(+Event, ?Template, +State, +Options) is nondet.
%
%   Act on an event received for an RPC call.  Solutions are
%   returned on backtracking by unifying Template with the members
%   of the solution list.  A destroy(ID, Event) event sets the first
%   argument of State to `false`, which tells the cleanup handler
%   that the pengine is already gone.

process_event(create(_ID, Features), Template, State, Options) :-
    memberchk(answer(First), Features),
    process_event(First, Template, State, Options).
process_event(error(_ID, Error), _Template, _, _Options) :-
    throw(Error).
process_event(failure(_ID, _Time), _Template, _, _Options) :-
    fail.
process_event(prompt(ID, Prompt), Template, State, Options) :-
    pengine_rpc_prompt(ID, Prompt, Reply),
    pengine_send(ID, input(Reply)),
    wait_event(Template, State, Options).
process_event(output(ID, Term), Template, State, Options) :-
    pengine_rpc_output(ID, Term),
    pengine_pull_response(ID, Options),
    wait_event(Template, State, Options).
process_event(debug(ID, Message), Template, State, Options) :-
    debug(pengine(debug), '~w', [Message]),
    pengine_pull_response(ID, Options),
    wait_event(Template, State, Options).
process_event(success(_ID, Solutions, _Proj, _Time, false),
              Template, _, _Options) :-
    !,
    member(Template, Solutions).
process_event(success(ID, Solutions, _Proj, _Time, true),
              Template, State, Options) :-
    (   member(Template, Solutions)
    ;   pengine_next(ID, Options),
        wait_event(Template, State, Options)
    ).
process_event(destroy(ID, Event), Template, State, Options) :-
    !,
    retractall(child(_,ID)),
    nb_setarg(1, State, false),
    debug(pengine(destroy), 'State: ~p~n', [State]),
    process_event(Event, Template, State, Options).
% compatibility with older versions of the protocol.
process_event(success(ID, Solutions, Time, More),
              Template, State, Options) :-
    process_event(success(ID, Solutions, _Proj, Time, More),
                  Template, State, Options).


%!  pengine_rpc_prompt(+ID, +Prompt, -Term) is det.
%
%   Get the answer to a prompt of the remote pengine.  The hook
%   prompt/3 is tried first.  If it fails, Term is read from the
%   current input with the prompt set to Prompt.

pengine_rpc_prompt(ID, Prompt, Term) :-
    prompt(ID, Prompt, Answer),
    !,
    Term = Answer.
pengine_rpc_prompt(_ID, Prompt, Term) :-
    setup_call_cleanup(
        prompt(Old, Prompt),
        read(Term),
        prompt(_, Old)).

%!  pengine_rpc_output(+ID, +Term) is det.
%
%   Handle output of the remote pengine.  The hook output/2 is tried
%   first.  If it fails, Term is printed using print/1.

pengine_rpc_output(ID, Term) :-
    output(ID, Term),
    !.
pengine_rpc_output(_ID, Term) :-
    print(Term).
2113:- multifile prompt/3.
:- multifile output/2.


/*================= HTTP handlers =======================
*/

%   Declare  HTTP locations  we serve  and how.   Note that  we use
%   time_limit(infinite) because pengines have  their own timeout.  Also
%   note that we use spawn.  This is needed because we can easily get
%   many clients waiting for some  action on a pengine to complete.
%   Without spawning,  we would quickly exhaust  the worker pool of the
%   HTTP server.
%
%   FIXME: probably we should wait for a short time for the pengine on
%   the default worker thread.  Only if  that time has expired, we can
%   call http_spawn/2 to continue waiting on a new thread.  That would
%   improve the performance and reduce the usage of threads.

%   The clause must be added to the multifile predicate
%   http:location/3.  Without the `http:` qualification it would
%   define an unrelated local predicate and the path alias
%   pengine(...) used by the handler declarations below would have
%   no definition.

:- multifile http:location/3.
http:location(pengine, root(pengine), [-100]).

:- http_handler(pengine(.),             http_404([]),
                [ id(pengines) ]).
:- http_handler(pengine(create),        http_pengine_create,
                [ time_limit(infinite), spawn([]) ]).
:- http_handler(pengine(send),          http_pengine_send,
                [ time_limit(infinite), spawn([]) ]).
:- http_handler(pengine(pull_response), http_pengine_pull_response,
                [ time_limit(infinite), spawn([]) ]).
:- http_handler(pengine(abort),         http_pengine_abort,         []).
:- http_handler(pengine(detach),        http_pengine_detach,        []).
:- http_handler(pengine(list),          http_pengine_list,          []).
:- http_handler(pengine(ping),          http_pengine_ping,          []).
:- http_handler(pengine(destroy_all),   http_pengine_destroy_all,   []).

:- http_handler(pengine('pengines.js'),
                http_reply_file(library('http/web/js/pengines.js'), []), []).
:- http_handler(pengine('plterm.css'),
                http_reply_file(library('http/web/css/plterm.css'), []), []).
application/json
and as
www-form-encoded
. Accepted parameters:
Parameter | Default | Comment |
---|---|---|
format | prolog | Output format |
application | pengine_sandbox | Pengine application |
chunk | 1 | Chunk-size for results |
collate | 0 (off) | Join output events |
solutions | chunked | If `all`, emit all results |
ask | - | The query |
template | - | Output template |
src_text | "" | Program |
src_url | - | Program to download |
disposition | - | Download location |
Note that solutions=all internally uses chunking to obtain the results from the pengine, but the results are combined in a single HTTP reply. This is currently only implemented by the CSV backend that is part of SWISH for downloading unbounded result sets with limited memory resources.
Using chunk=false
simulates the recursive toplevel. See
pengine_ask/3.
%!  http_pengine_create(+Request)
%
%   HTTP handler that creates a pengine.  The clauses deal with
%
%     1. A request that is fully answered by reply_options/2.
%     2. Parameters posted as an `application/json` document.
%     3. Parameters passed as HTTP form parameters.
%
%   In the last two cases the parameters are collected in a dict and
%   handed to http_pengine_create/4.

http_pengine_create(Request) :-
    reply_options(Request, [post]),
    !.
http_pengine_create(Request) :-
    memberchk(content_type(ContentType), Request),
    sub_atom(ContentType, 0, _, _, 'application/json'),
    !,
    http_read_json_dict(Request, Dict),
    dict_atom_option(format, Dict, Format, prolog),
    dict_atom_option(application, Dict, Application, pengine_sandbox),
    http_pengine_create(Request, Application, Format, Dict).
http_pengine_create(Request) :-
    Optional = [optional(true)],
    OptString = [string|Optional],
    Form = [ format(Format, [default(prolog)]),
             application(Application, [default(pengine_sandbox)]),
             chunk(_, [nonneg;oneof([false]), default(1)]),
             collate(_, [number, default(0)]),
             solutions(_, [oneof([all,chunked]), default(chunked)]),
             ask(_, OptString),
             template(_, OptString),
             src_text(_, OptString),
             disposition(_, OptString),
             src_url(_, Optional)
           ],
    http_parameters(Request, Form),
    form_dict(Form, Dict),
    http_pengine_create(Request, Application, Format, Dict).

%!  dict_atom_option(+Key, +Dict, -Atom, +Default) is det.
%
%   Atom is the value for Key in Dict converted to an atom, or
%   Default if Dict has no such key.

dict_atom_option(Key, Dict, Atom, Default) :-
    (   get_dict(Key, Dict, Text)
    ->  atom_string(Atom, Text)
    ;   Atom = Default
    ).

%!  form_dict(+Form, -Dict) is det.
%
%   Dict holds the parameters of Form that have a value.

form_dict(Form, Dict) :-
    form_values(Form, Pairs),
    dict_pairs(Dict, _, Pairs).

%!  form_values(+Form, -Pairs) is det.
%
%   Pairs is a list of Name-Value for each element of Form whose
%   first argument is bound.

form_values([], []).
form_values([Param|Params], Pairs) :-
    arg(1, Param, Value),
    nonvar(Value),
    !,
    functor(Param, Name, _),
    Pairs = [Name-Value|Rest],
    form_values(Params, Rest).
form_values([_|Params], Pairs) :-
    form_values(Params, Pairs).
%!  http_pengine_create(+Request, +Application, +Format, +Dict)
%
%   Create a pengine for a known Application after checking access
%   and authenticating the request.  The output queue is registered
%   in pengine_queue/4 before the pengine is created.  We then wait
%   for the first result, output it in Format and finally consider
%   garbage collecting abandoned queues.  If Application does not
%   exist, an existence error is reported on a fresh id.

http_pengine_create(Request, Application, Format, Dict) :-
    current_application(Application),
    !,
    allowed(Request, Application),
    authenticate(Request, Application, UserOptions),
    dict_to_options(Dict, Application, DictOptions),
    append(UserOptions, DictOptions, CreateOptions),
    pengine_uuid(Pengine),
    message_queue_create(Queue, [max_size(25)]),
    setting(Application:time_limit, TimeLimit),
    get_time(Now),
    asserta(pengine_queue(Pengine, Queue, TimeLimit, Now)),
    broadcast(pengine(create(Pengine, Application, CreateOptions))),
    create(Queue, Pengine, CreateOptions, http, Application),
    create_wait_and_output_result(Pengine, Queue, Format,
                                  TimeLimit, Dict),
    gc_abandoned_queues.
http_pengine_create(_Request, Application, Format, _Dict) :-
    Error = existence_error(pengine_application, Application),
    pengine_uuid(ID),
    output_result(ID, Format, error(ID, error(Error, _))).


%!  dict_to_options(+Dict, +Application, -CreateOptions) is det.
%
%   Translate the key-value pairs of Dict into pengine create
%   options.

dict_to_options(Dict, Application, CreateOptions) :-
    dict_pairs(Dict, _, Pairs),
    pairs_create_options(Pairs, Application, CreateOptions).

%!  pairs_create_options(+Pairs, +Application, -Options) is det.
%
%   Keep the pairs that form a valid create option, except for
%   `user`.  Values of options of type `atom` are converted to an
%   atom.  All other values are passed as they are.

pairs_create_options([], _, []) :- !.
pairs_create_options([Name-Value0|Pairs], App, [Option|Options]) :-
    Option =.. [Name,Value],
    pengine_create_option(Option), Name \== user,
    !,
    (   create_option_type(Option, atom)
    ->  atom_string(Value, Value0)      % term creation must be done if
    ;   Value = Value0                  % we created the source and know
    ),                                  % the operators.
    pairs_create_options(Pairs, App, Options).
pairs_create_options([_|Pairs], App, Options) :-
    pairs_create_options(Pairs, App, Options).
time_limit
,
Pengine is aborted and the result is error(time_limit_exceeded,
_)
.
%!  wait_and_output_result(+Pengine, +Queue, +Format,
%!                         +TimeLimit, +Collate0) is det.
%
%   Wait at most TimeLimit seconds for an event on Queue and output
%   it in Format.  If collating applies to the event, subsequent
%   events are collected and output as a list.  The collate time is
%   limited to a tenth of TimeLimit.  An exception while waiting is
%   reported as died(Pengine).  If no event arrives in time we call
%   time_limit_exceeded/2.

wait_and_output_result(Pengine, Queue, Format, TimeLimit, Collate0) :-
    Collate is min(Collate0, TimeLimit/10),
    get_time(Started),
    (   catch(thread_get_message(Queue, pengine_event(_, Event),
                                 [ timeout(TimeLimit)
                                 ]),
              Error, true)
    ->  (   var(Error)
        ->  debug(pengine(wait), 'Got ~q from ~q', [Event, Queue]),
            (   collating_event(Collate, Event)
            ->  Deadline is Started+TimeLimit,
                collect_events(Pengine, Collate, Queue, Deadline, 100, More),
                Events = [Event|More],
                ignore(destroy_queue_from_http(Pengine, Events, Queue)),
                protect_pengine(Pengine,
                                output_result(Pengine, Format, Events))
            ;   ignore(destroy_queue_from_http(Pengine, Event, Queue)),
                protect_pengine(Pengine,
                                output_result(Pengine, Format, Event))
            )
        ;   output_result(Pengine, Format, died(Pengine))
        )
    ;   time_limit_exceeded(Pengine, Format)
    ).
%!  collect_events(+Pengine, +Collate, +Queue, +Deadline, +Max, -Events)
%
%   Collect at most Max events from Queue, waiting Collate seconds
%   for each of them.  Collecting stops after an event to which
%   collating does not apply.  An exception while waiting adds
%   died(Pengine).  If waiting times out after Deadline has passed,
%   the time limit event is added.  Otherwise a timeout ends the
%   list.

collect_events(_Pengine, _Collate, _Queue, _Deadline, 0, []) :-
    !.
collect_events(Pengine, Collate, Queue, Deadline, Max, Events) :-
    debug(pengine(wait), 'Waiting to collate events', []),
    (   catch(thread_get_message(Queue, pengine_event(_, Event),
                                 [ timeout(Collate)
                                 ]),
              Error, true)
    ->  (   var(Error)
        ->  debug(pengine(wait), 'Got ~q from ~q', [Event, Queue]),
            Events = [Event|More],
            (   collating_event(Collate, Event)
            ->  Left is Max - 1,
                collect_events(Pengine, Collate, Queue, Deadline, Left, More)
            ;   More = []
            )
        ;   Events = [died(Pengine)]
        )
    ;   get_time(Now),
        Now > Deadline
    ->  time_limit_event(Pengine, TimeLimitEvent),
        Events = [TimeLimitEvent]
    ;   Events = []
    ).

%!  collating_event(+Collate, +Event) is semidet.
%
%   True if Event is an output event and Collate is not 0.

collating_event(0, _) :-
    !,
    fail.
collating_event(_, output(_,_)).
disposition
key to denote the
% download location.

%!  create_wait_and_output_result(+Pengine, +Queue, +Format,
%!                                +TimeLimit, +Dict) is det.
%
%   Output the results after creating a pengine.  If Dict holds
%   `solutions: all`, a failure driven loop over between/3 emits the
%   results page by page.  As long as the event has more solutions,
%   the pengine is asked for the next chunk and we fail to get the
%   next page.  Every other outcome cuts the loop.  Without
%   `solutions: all` we wait for a single result, using the
%   `collate` key of Dict (default 0).

create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, Dict) :-
    get_dict(solutions, Dict, all),
    !,
    between(1, infinite, Page),
    (   catch(thread_get_message(Queue, pengine_event(_, Event),
                                 [ timeout(TimeLimit)
                                 ]),
              Error, true)
    ->  (   var(Error)
        ->  debug(pengine(wait), 'Page ~D: got ~q from ~q',
                  [Page, Event, Queue]),
            (   destroy_queue_from_http(Pengine, Event, Queue)
            ->  !,
                protect_pengine(Pengine,
                                output_result_2(Format, page(Page, Event),
                                                Dict))
            ;   is_more_event(Event)
            ->  pengine_thread(Pengine, Thread),
                thread_send_message(Thread, pengine_request(next)),
                protect_pengine(Pengine,
                                output_result_2(Format, page(Page, Event),
                                                Dict)),
                fail
            ;   !,
                protect_pengine(Pengine,
                                output_result_2(Format, page(Page, Event),
                                                Dict))
            )
        ;   !, output_result(Pengine, Format, died(Pengine))
        )
    ;   !, time_limit_exceeded(Pengine, Format)
    ),
    !.
create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, Dict) :-
    wait_and_output_result(Pengine, Queue, Format, TimeLimit,
                           Dict.get(collate,0)).

%!  is_more_event(+Event) is semidet.
%
%   True if Event is a success event that may have more solutions,
%   or a create event that carries such an event as its answer.

is_more_event(success(_Id, _Answers, _Projection, _Time, true)).
is_more_event(create(_, Features)) :-
    memberchk(answer(Event), Features),
    is_more_event(Event).
%!  time_limit_exceeded(+Pengine, +Format)
%
%   Forcefully destroy Pengine and, as cleanup of that action,
%   output the time limit event in Format.

time_limit_exceeded(Pengine, Format) :-
    time_limit_event(Pengine, TimeLimitEvent),
    call_cleanup(
        pengine_destroy(Pengine, [force(true)]),
        output_result(Pengine, Format, TimeLimitEvent)).

%!  time_limit_event(?Pengine, ?Event) is det.
%
%   Event is the event that signals that Pengine was destroyed
%   because it exceeded its time limit.

time_limit_event(Pengine,
                 destroy(Pengine, error(Pengine, time_limit_exceeded))).

%!  destroy_pengine_after_output(+Pengine, +Events) is det.
%
%   If Events is a list that ends in the time limit event for
%   Pengine, forcefully destroy the pengine.  Failure and error(_,_)
%   exceptions of the destroy action are ignored.

destroy_pengine_after_output(Pengine, Events) :-
    is_list(Events),
    last(Events, Final),
    time_limit_event(Pengine, Final),
    !,
    catch(ignore(pengine_destroy(Pengine, [force(true)])), error(_,_), true).
destroy_pengine_after_output(_, _).
%!  destroy_queue_from_http(+ID, +Event, +Queue) is semidet.
%
%   Consider destroying the output Queue of pengine ID after the
%   HTTP side has read Event.  If the queue is registered in
%   output_queue/3, it is destroyed once it is empty.  Otherwise, if
%   Event is a destroy event, destruction is synchronised with the
%   pengine using the mutex `pengine`.  Fails for any other event.

destroy_queue_from_http(ID, _, Queue) :-
    output_queue(ID, Queue, _),
    !,
    destroy_queue_if_empty(Queue).
destroy_queue_from_http(ID, Event, Queue) :-
    debug(pengine(destroy), 'DESTROY? ~p', [Event]),
    is_destroy_event(Event),
    !,
    message_queue_property(Queue, size(Pending)),
    debug(pengine(destroy), 'Destroy ~p (waiting ~D)', [Queue, Pending]),
    with_mutex(pengine, sync_destroy_queue_from_http(ID, Queue)).

%!  is_destroy_event(+Event) is semidet.
%
%   True if Event is destroy/1 or destroy/2, or a create event that
%   carries a destroy event as its answer.

is_destroy_event(destroy(_)).
is_destroy_event(destroy(_,_)).
is_destroy_event(create(_, Features)) :-
    memberchk(answer(Event), Features),
    is_destroy_event(Event).

%!  destroy_queue_if_empty(+Queue) is det.
%
%   Destroy Queue and remove its output_queue/3 registration, unless
%   it still holds a message.

destroy_queue_if_empty(Queue) :-
    thread_peek_message(Queue, _),
    !.
destroy_queue_if_empty(Queue) :-
    retractall(output_queue(_, Queue, _)),
    message_queue_destroy(Queue).
:- dynamic
    last_gc/1.

%!  gc_abandoned_queues is det.
%
%   If consider_queue_gc/0 tells us it is time, destroy all output
%   queues that were registered more than 15 minutes ago and record
%   the time of this run in last_gc/1.

gc_abandoned_queues :-
    consider_queue_gc,
    !,
    get_time(Now),
    (   output_queue(_, Queue, Registered),
        Now-Registered > 15*60,
        retract(output_queue(_, Queue, Registered)),
        message_queue_destroy(Queue),
        fail
    ;   retractall(last_gc(_)),
        asserta(last_gc(Now))
    ).
gc_abandoned_queues.

%!  consider_queue_gc is semidet.
%
%   True if output_queue/3 has more than 100 clauses and the
%   previous collection, if any, is more than 5 minutes ago.

consider_queue_gc :-
    predicate_property(output_queue(_,_,_), number_of_clauses(Count)),
    Count > 100,
    (   last_gc(Previous),
        get_time(Now),
        Now-Previous > 5*60
    ->  true
    ;   \+ last_gc(_)
    ).
:- dynamic output_queue_destroyed/1.

%!  sync_destroy_queue_from_http(+ID, +Queue) is det.
%
%   Decide what to do with the output Queue of pengine ID:
%
%     - If it is already registered in output_queue/3, destroy it
%       if it is empty.
%     - If it still holds an output event, delay the destruction by
%       registering the queue with the current time.
%     - Otherwise destroy the queue and remember that we did so in
%       output_queue_destroyed/1.

sync_destroy_queue_from_http(ID, Queue) :-
    (   output_queue(ID, Queue, _)
    ->  destroy_queue_if_empty(Queue)
    ;   thread_peek_message(Queue, pengine_event(_, output(_,_)))
    ->  debug(pengine(destroy),
              'Delay destruction of ~p because of output',
              [Queue]),
        get_time(Now),
        asserta(output_queue(ID, Queue, Now))
    ;   message_queue_destroy(Queue),
        asserta(output_queue_destroyed(Queue))
    ).
pengine
% held.

%!  sync_destroy_queue_from_pengine(+ID, +Queue) is det.
%
%   If Queue was recorded as destroyed, remove that record.
%   Otherwise register the queue in output_queue/3 with the current
%   time.  In both cases the pengine_queue/4 fact for ID and Queue
%   is removed.

sync_destroy_queue_from_pengine(ID, Queue) :-
    (   retract(output_queue_destroyed(Queue))
    ->  true
    ;   get_time(Now),
        asserta(output_queue(ID, Queue, Now))
    ),
    retractall(pengine_queue(ID, Queue, _, _)).


%!  http_pengine_send(+Request)
%
%   HTTP handler that sends an event to a pengine and replies with
%   the next result.  If the pengine has no thread and its id is an
%   atom, we report that the pengine died.  An existence error for
%   the pengine while reading the event is reported in the same way.
%   Any other error is reported as an error event.

http_pengine_send(Request) :-
    reply_options(Request, [get,post]),
    !.
http_pengine_send(Request) :-
    http_parameters(Request,
                    [ id(ID, [ type(atom) ]),
                      event(EventString, [optional(true)]),
                      collate(Collate, [number, default(0)]),
                      format(Format, [default(prolog)])
                    ]),
    catch(read_event(ID, Request, Format, EventString, Event),
          Error,
          true),
    (   var(Error)
    ->  debug(pengine(event), 'HTTP send: ~p', [Event]),
        (   pengine_thread(ID, Thread)
        ->  pengine_queue(ID, Queue, TimeLimit, _),
            random_delay,
            broadcast(pengine(send(ID, Event))),
            thread_send_message(Thread, pengine_request(Event)),
            wait_and_output_result(ID, Queue, Format, TimeLimit, Collate)
        ;   atom(ID)
        ->  pengine_died(Format, ID)
        ;   http_404([], Request)
        )
    ;   Error = error(existence_error(pengine, ID), _)
    ->  pengine_died(Format, ID)
    ;   output_result(ID, Format, error(ID, Error))
    ).

%!  pengine_died(+Format, +Pengine) is det.
%
%   Output an existence error for Pengine in Format.

pengine_died(Format, Pengine) :-
    output_result(Pengine, Format,
                  error(Pengine,
                        error(existence_error(pengine, Pengine),_))).
%!  read_event(+Pengine, +Request, +Format, +EventString, -Event) is det.
%
%   Read the event sent to Pengine.  The term is read using the
%   pengine's module, under protect_pengine/2, and then post-processed
%   by fix_bindings/4.  If that fails the pengine is considered gone:
%   any posted data is discarded and an existence error is raised.
%
%   NOTE(review): the surviving documentation fragment mentions the
%   `pengine_done` mutex; presumably protect_pengine/2 uses it to keep
%   the module alive while reading -- confirm.
%
%   @error existence_error(pengine, Pengine)

read_event(Pengine, Request, Format, EventString, Event) :-
    protect_pengine(
        Pengine,
        (   get_pengine_module(Pengine, Module),
            read_event_2(Request, EventString, Module, RawEvent, VarNames)
        )),
    !,
    fix_bindings(Format, RawEvent, VarNames, Event).
read_event(Pengine, Request, _Format, _EventString, _Event) :-
    debug(pengine(event), 'Pengine ~q vanished', [Pengine]),
    discard_post_data(Request),
    existence_error(pengine, Pengine).
%!  read_event_2(+Request, +EventString, +Module, -Event, -Bindings)
%
%   Read the event either from the `event` parameter (EventString is
%   bound) or as a posted document of type `application/x-prolog`.  In
%   both cases the term is read in Module and Bindings is unified with
%   its variable names.  Fails if there is neither an event string nor
%   a POST request.

read_event_2(Request, EventString, Module, Event, Bindings) :-
    (   nonvar(EventString)
    ->  term_string(Event, EventString,
                    [ variable_names(Bindings),
                      module(Module)
                    ])
    ;   option(method(post), Request),
        http_read_data(Request, Event,
                       [ content_type('application/x-prolog'),
                         module(Module),
                         variable_names(Bindings)
                       ])
    ).
%!  discard_post_data(+Request) is det.
%
%   If Request is a POST request, read its data into a null stream so
%   that it is consumed and thrown away.  Does nothing for other
%   methods.

discard_post_data(Request) :-
    (   option(method(post), Request)
    ->  setup_call_cleanup(
            open_null_stream(Sink),
            http_read_data(Request, _, [to(stream(Sink))]),
            close(Sink))
    ;   true
    ).
%!  fix_bindings(+Format, +EventIn, +Bindings, -Event) is det.
%
%   Generate the template for json(-s) Format from the variables in
%   the asked Goal.  Variables starting with an underscore, followed
%   by a capital letter, are ignored from the template.  For an
%   `ask` event in a JSON format the options are extended with
%   template(Template), chunk(Chunk) (default 1) and bindings(Named).
%   Any other event is passed unchanged.

fix_bindings(Format,
             ask(Goal, OptionsIn), Bindings,
             ask(Goal, OptionsOut)) :-
    json_lang(Format),
    !,
    exclude(anon, Bindings, Named),
    template(Named, Template, OptionsIn, Options1),
    select_option(chunk(Chunk), Options1, Rest, 1),
    OptionsOut = [ template(Template),
                   chunk(Chunk),
                   bindings(Named)
                 | Rest
                 ].
fix_bindings(_, Command, _, Command).

%!  template(+Bindings, -Template, +Options0, -Options) is det.
%
%   Use the template(Template) option if given, removing it from the
%   options.  Otherwise build a dict with tag `swish_default_template`
%   from Bindings and leave the options unchanged.

template(_, Template, Options0, Options) :-
    select_option(template(Template), Options0, Options),
    !.
template(Bindings, Template, Options, Options) :-
    dict_create(Template, swish_default_template, Bindings).

%!  anon(+Binding) is semidet.
%
%   True when Binding is `Name=_` and Name starts with an underscore
%   whose next character can start a Prolog variable.

anon(Name=_) :-
    sub_atom(Name, 0, 1, _, '_'),
    sub_atom(Name, 1, 1, _, Second),
    char_type(Second, prolog_var_start).

%!  var_name(+Binding, -Name) is det.
%
%   Name is the name part of a `Name=Value` binding.

var_name(Name=_, Name).
%!  json_lang(+Format) is semidet.
%
%   True when Format is `json` or an atom starting with `json-`.

json_lang(Format) :-
    (   Format = json
    ->  true
    ;   sub_atom(Format, 0, _, _, 'json-')
    ).
%!  http_pengine_pull_response(+Request)
%
%   HTTP handler that waits for and outputs the next result of a
%   pengine.  An OPTIONS request is answered by reply_options/2.  The
%   pengine is re-attached first (reattach/1).  The queue is taken
%   from pengine_queue/4 or, failing that, from output_queue/3 with a
%   time limit of 0.  Replies 404 if no queue is known for the id.

http_pengine_pull_response(Request) :-
    reply_options(Request, [get]),
    !.
http_pengine_pull_response(Request) :-
    http_parameters(Request,
                    [ id(Pengine, []),
                      format(Format, [default(prolog)]),
                      collate(Collate, [number, default(0)])
                    ]),
    reattach(Pengine),
    (   (   pengine_queue(Pengine, Queue, TimeLimit, _)
        ->  true
        ;   output_queue(Pengine, Queue, _),
            TimeLimit = 0
        )
    ->  wait_and_output_result(Pengine, Queue, Format, TimeLimit, Collate)
    ;   http_404([], Request)
    ).
%!  http_pengine_abort(+Request)
%
%   HTTP handler that aborts a running pengine.  An OPTIONS request
%   is answered by reply_options/2.  If the pengine has a thread, the
%   abort is broadcast, pending output for it is aborted, the pengine
%   itself is aborted and JSON `true` is replied.  Otherwise a 404 is
%   replied.

http_pengine_abort(Request) :-
    reply_options(Request, [get,post]),
    !.
http_pengine_abort(Request) :-
    http_parameters(Request, [ id(Pengine, []) ]),
    (   pengine_thread(Pengine, _)
    ->  broadcast(pengine(abort(Pengine))),
        abort_pending_output(Pengine),
        pengine_abort(Pengine),
        reply_json_dict(true)
    ;   http_404([], Request)
    ).
%!  http_pengine_detach(+Request)
%
%   HTTP handler that detaches a client from a pengine.  An OPTIONS
%   request is answered by reply_options/2.  The posted JSON document
%   is stored, extended with the key `time`, in pengine_detached/2.
%   The pengine's queue is enlarged to 1000 messages and a
%   detached(Pengine) event is queued.  Replies JSON `true`, or 404 if
%   the pengine is unknown or access/authentication fails.

http_pengine_detach(Request) :-
    reply_options(Request, [post]),
    !.
http_pengine_detach(Request) :-
    http_parameters(Request, [ id(Pengine, []) ]),
    http_read_json_dict(Request, ClientData),
    (   pengine_property(Pengine, application(Application)),
        allowed(Request, Application),
        authenticate(Request, Application, _UserOptions)
    ->  broadcast(pengine(detach(Pengine))),
        get_time(Now),
        put_dict(time, ClientData, Now, Stamped),
        assertz(pengine_detached(Pengine, Stamped)),
        pengine_queue(Pengine, Queue, _TimeLimit, _Created),
        message_queue_set(Queue, max_size(1000)),
        pengine_reply(Queue, detached(Pengine)),
        reply_json_dict(true)
    ;   http_404([], Request)
    ).

%!  reattach(+Pengine) is det.
%
%   If Pengine was detached, remove the pengine_detached/2 record and
%   shrink its queue back to 25 messages.  Otherwise do nothing.

reattach(Pengine) :-
    (   retract(pengine_detached(Pengine, _)),
        pengine_queue(Pengine, Queue, _TimeLimit, _Created)
    ->  message_queue_set(Queue, max_size(25))
    ;   true
    ).
%!  http_pengine_destroy_all(+Request)
%
%   HTTP handler that destroys a set of pengines.  The `ids` parameter
%   is a comma-separated list of pengine ids.  Every listed pengine
%   that is not detached is destroyed with force(true).  Replies the
%   JSON string "ok".  An OPTIONS request is answered by
%   reply_options/2.

http_pengine_destroy_all(Request) :-
    reply_options(Request, [get,post]),
    !.
http_pengine_destroy_all(Request) :-
    http_parameters(Request, [ ids(IdsText, []) ]),
    atomic_list_concat(Pengines, ',', IdsText),
    forall(( member(Pengine, Pengines),
             \+ pengine_detached(Pengine, _)
           ),
           pengine_destroy(Pengine, [force(true)])),
    reply_json_dict("ok").
%!  http_pengine_ping(+Request)
%
%   HTTP handler that reports whether a pengine is alive.  If the
%   pengine has a thread whose statistics can be fetched, the event
%   ping(Pengine, Stats) is output, where Stats is the return of
%   thread_statistics/2.  Otherwise died(Pengine) is output.  An
%   OPTIONS request is answered by reply_options/2.

http_pengine_ping(Request) :-
    reply_options(Request, [get]),
    !.
http_pengine_ping(Request) :-
    http_parameters(Request,
                    [ id(Pengine, []),
                      format(Format, [default(prolog)])
                    ]),
    (   pengine_thread(Pengine, Thread),
        catch(thread_statistics(Thread, Stats), error(_,_), fail)
    ->  Reply = ping(Pengine, Stats)
    ;   Reply = died(Pengine)
    ),
    output_result(Pengine, Format, Reply).
%!  http_pengine_list(+Request)
%
%   HTTP handler that lists pengines as the JSON object
%   `{"pengines": [...]}`.  The `status` parameter only accepts
%   `detached` (the default); `application` defaults to
%   `pengine_sandbox`.  The request must pass allowed/2 and
%   authenticate/3.  An OPTIONS request is answered by
%   reply_options/2.

http_pengine_list(Request) :-
    reply_options(Request, [get]),
    !.
http_pengine_list(Request) :-
    http_parameters(Request,
                    [ status(Status, [default(detached), oneof([detached])]),
                      application(Application, [default(pengine_sandbox)])
                    ]),
    allowed(Request, Application),
    authenticate(Request, Application, _UserOptions),
    findall(State, listed_pengine(Application, Status, State), States),
    reply_json_dict(json{pengines: States}).

%!  listed_pengine(+Application, +Status, -State) is nondet.
%
%   State is a dict describing a detached pengine of Application: its
%   id, detach time, number of queued messages and thread statistics.
%   If the statistics cannot be obtained, `stats` is
%   `thread{status:died}`.

listed_pengine(Application, detached, State) :-
    State = pengine{id:Pengine,
                    detached:Since,
                    queued:Pending,
                    stats:Stats},

    pengine_property(Pengine, application(Application)),
    pengine_property(Pengine, detached(Since)),
    pengine_queue(Pengine, Queue, _TimeLimit, _Created),
    message_queue_property(Queue, size(Pending)),
    (   pengine_thread(Pengine, Thread),
        catch(thread_statistics(Thread, Stats), _, fail)
    ->  true
    ;   Stats = thread{status:died}
    ).
%!  output_result(+Pengine, +Format, +EventTerm) is det.
%
%   Formulate an HTTP response from a pengine event term.  Format is
%   one of `prolog`, `json` or `json-s`.  While the reply is being
%   written, the replying thread is registered in pengine_replying/2;
%   a `pengine_abort_output` exception thrown during output is
%   silently absorbed.  Finally destroy_pengine_after_output/2 is
%   called.

:- dynamic
    pengine_replying/2.             % +Pengine, +Thread

output_result(Pengine, Format, Event) :-
    thread_self(Me),
    cors_enable,                    % contingent on http:cors setting
    disable_client_cache,
    setup_call_cleanup(
        asserta(pengine_replying(Pengine, Me), Ref),
        catch(output_result_2(Format, Event, _{}),
              pengine_abort_output,
              true),
        erase(Ref)),
    destroy_pengine_after_output(Pengine, Event).

%!  output_result_2(+Format, +Event, +Dict)
%
%   Write Event in Format.  The hook write_result/3 is tried first.
%   Otherwise `prolog` writes the term as `text/x-prolog` and the JSON
%   formats reply the result of event_term_to_json_data/3.
%
%   @error domain_error(pengine_format, Format) for any other format.

output_result_2(Format, Event, Dict) :-
    write_result(Format, Event, Dict),
    !.
output_result_2(prolog, Event, _) :-
    !,
    format('Content-type: text/x-prolog; charset=UTF-8~n~n'),
    write_term(Event,
               [ quoted(true),
                 ignore_ops(true),
                 fullstop(true),
                 blobs(portray),
                 portray_goal(portray_blob),
                 nl(true)
               ]).
output_result_2(Format, Event, _) :-
    json_lang(Format),
    !,
    (   event_term_to_json_data(Event, JSON, Format)
    ->  reply_json_dict(JSON)
    ;   % conversion failed: re-run under assertion/1 to report it
        assertion(event_term_to_json_data(Event, _, Format))
    ).
output_result_2(Format, _Event, _) :-    % FIXME: allow for non-JSON format
    domain_error(pengine_format, Format).
%!  portray_blob(+Blob, +Options) is det.
%
%   Portray non-text blobs that may appear in output terms.  A blob is
%   written as `'$BLOB'(Type)`.  Future versions may include more
%   info, depending on Type.

:- public portray_blob/2.               % called from write-term
portray_blob(Blob, _Options) :-
    blob(Blob, BlobType),
    format('~q', ['$BLOB'(BlobType)]).
%!  abort_pending_output(+Pengine) is det.
%
%   Signal every thread registered in pengine_replying/2 for Pengine
%   to throw `pengine_abort_output`.

abort_pending_output(Pengine) :-
    forall(pengine_replying(Pengine, Replier),
           abort_output_thread(Replier)).

%!  abort_output_thread(+Thread) is det.
%
%   Make Thread throw `pengine_abort_output`.  A thread that no
%   longer exists is silently ignored.

abort_output_thread(Replier) :-
    catch(thread_signal(Replier, throw(pengine_abort_output)),
          error(existence_error(thread, _), _),
          true).
%   Output formats: the core library supports `prolog` and various
%   JSON dialects.  The hook event_to_json/3 can be used to refine the
%   JSON dialects.  A hook must be used if a completely different
%   output format is desired (presumably write_result/3, which
%   output_result_2/3 tries first -- confirm).

%!  disable_client_cache
%
%   Emit the HTTP headers `Cache-Control`, `Pragma` and `Expires` that
%   tell the client not to cache the reply.

disable_client_cache :-
    format('Cache-Control: no-cache, no-store, must-revalidate\r\n\c
            Pragma: no-cache\r\n\c
            Expires: 0\r\n').

%!  event_term_to_json_data(+Event, -JSON, +Lang) is semidet.
%
%   Translate a pengine event term (or a list of them) into a JSON
%   dict.  The hook event_to_json/3 is tried first for single events.
%   The built-in translation of `success` events only applies to the
%   plain `json` dialect.  Fails if no clause applies.
%
%   The clauses are tried in order and most of them commit, so the
%   generic arity-1/arity-2 fallbacks at the end only see events that
%   no specific clause handles.

event_term_to_json_data(Events, JSON, Lang) :-
    is_list(Events),
    !,
    events_to_json_data(Events, JSON, Lang).
event_term_to_json_data(Event, JSON, Lang) :-
    event_to_json(Event, JSON, Lang),
    !.
event_term_to_json_data(success(ID, Bindings0, Projection, Time, More),
                        json{event:success, id:ID, time:Time,
                             data:Bindings, more:More, projection:Projection},
                        json) :-
    !,
    term_to_json(Bindings0, Bindings).
event_term_to_json_data(destroy(ID, Event),
                        json{event:destroy, id:ID, data:JSON},
                        Style) :-
    !,
    event_term_to_json_data(Event, JSON, Style).
event_term_to_json_data(create(ID, Features0), JSON, Style) :-
    !,
    (   select(answer(First0), Features0, Features1)
    ->  % an embedded first answer is itself an event: translate it
        event_term_to_json_data(First0, First, Style),
        Features = [answer(First)|Features1]
    ;   Features = Features0
    ),
    dict_create(JSON, json, [event(create), id(ID)|Features]).
event_term_to_json_data(error(ID, ErrorTerm), Error, _Style) :-
    !,
    Error0 = json{event:error, id:ID, data:Message},
    add_error_details(ErrorTerm, Error0, Error),
    message_to_string(ErrorTerm, Message).
event_term_to_json_data(failure(ID, Time),
                        json{event:failure, id:ID, time:Time}, _) :-
    !.
event_term_to_json_data(EventTerm, json{event:F, id:ID}, _) :-
    functor(EventTerm, F, 1),
    !,
    arg(1, EventTerm, ID).
event_term_to_json_data(EventTerm, json{event:F, id:ID, data:JSON}, _) :-
    functor(EventTerm, F, 2),
    arg(1, EventTerm, ID),
    arg(2, EventTerm, Data),
    term_to_json(Data, JSON).

%!  events_to_json_data(+Events, -JSONList, +Lang) is semidet.
%
%   Translate each event of a list using event_term_to_json_data/3.

events_to_json_data([], [], _).
events_to_json_data([E|T0], [J|T], Lang) :-
    event_term_to_json_data(E, J, Lang),
    events_to_json_data(T0, T, Lang).

:- public add_error_details/3.
%!  add_error_details(+Error, +JSON0, -JSON)
%
%   Extend the JSON dict of an `error` event with details taken from
%   the error term: first the error code (add_error_code/3), then the
%   source location (add_error_location/3).
%
%   NOTE(review): the surviving documentation fragment mentions
%   `pengines_io.pl`; presumably this predicate is also called from
%   there, which would explain the `public` declaration -- confirm.

add_error_details(Error, JSON0, JSON) :-
    add_error_code(Error, JSON0, WithCode),
    add_error_location(Error, WithCode, JSON).
%!  add_error_code(+Error, +JSON0, -JSON) is det.
%
%   Add a `code` field to JSON0 if Error is an ISO error term.  The
%   error code is the functor name of the formal part of the error,
%   e.g., `syntax_error`, `type_error`, etc.  Some errors carry more
%   information:
%
%     - existence_error(Type, Obj)
%       With an atom Type, also adds `arg1` (Type) and `arg2` (Obj
%       made atomic by to_atomic/2).
%
%   Any other term leaves the dict unchanged.

add_error_code(error(existence_error(Type, Obj), _), JSON0, JSON) :-
    atom(Type),
    !,
    to_atomic(Obj, Value),
    put_dict(_{code:existence_error, arg1:Type, arg2:Value}, JSON0, JSON).
add_error_code(error(Formal, _), JSON0, JSON) :-
    callable(Formal),
    !,
    functor(Formal, Code, _),
    put_dict(code, JSON0, Code, JSON).
add_error_code(_, JSON, JSON).

%!  to_atomic(+Obj, -Atomic) is det.
%
%   Atoms, numbers and strings are passed unchanged; any other term is
%   converted to a string using term_string/2.

% What to do with large integers?
to_atomic(Obj, Atomic) :-
    (   (   atom(Obj)
        ;   number(Obj)
        ;   string(Obj)
        )
    ->  Atomic = Obj
    ;   term_string(Obj, Atomic)
    ).
%!  add_error_location(+Error, +JSON0, -JSON) is det.
%
%   Add a `location` property if the error can be associated with a
%   source location.  The location is an object with properties `file`
%   and `line` and, if available, the character location in the line
%   (`ch`).  A line position of -1 means the character location is
%   not available.  Errors without a file(Path, Line, Ch, CharNo)
%   context leave the dict unchanged.

add_error_location(error(_, file(Path, Line, -1, _CharNo)), JSON0, JSON) :-
    atom(Path), integer(Line),
    !,
    put_dict(_{location:_{file:Path, line:Line}}, JSON0, JSON).
add_error_location(error(_, file(Path, Line, Ch, _CharNo)), JSON0, JSON) :-
    atom(Path), integer(Line), integer(Ch),
    !,
    put_dict(_{location:_{file:Path, line:Line, ch:Ch}}, JSON0, JSON).
add_error_location(_, JSON, JSON).
%   Hook event_to_json/3: translate events such as
%   `success(ID, Bindings, Projection, Time, More)` and `output(ID,
%   Term)` into a format suitable for processing at the client side.

%:- multifile pengines:event_to_json/3.


                 /*******************************
                 *        ACCESS CONTROL        *
                 *******************************/
%!  allowed(+Request, +Application) is det.
%
%   Check whether the peer is allowed to connect.  The peer must match
%   the `allow_from` setting of Application and must not match its
%   `deny_from` setting.  Returns a `forbidden` header if contact is
%   not allowed.

allowed(Request, Application) :-
    setting(Application:allow_from, Allow),
    match_peer(Request, Allow),
    setting(Application:deny_from, Deny),
    \+ match_peer(Request, Deny),
    !.
allowed(Request, _Application) :-
    memberchk(request_uri(Here), Request),
    throw(http_reply(forbidden(Here))).

%!  match_peer(+Request, +Patterns) is semidet.
%
%   True if the peer of Request matches Patterns.  The atom `*` in the
%   list matches any peer and an empty list matches none.  Otherwise
%   the peer must be a member of the list or match one of its IP
%   patterns.

match_peer(_, Allowed) :-
    memberchk(*, Allowed),
    !.
match_peer(_, []) :- !, fail.
match_peer(Request, Allowed) :-
    http_peer(Request, Peer),
    debug(pengine(allow), 'Peer: ~q, Allow: ~q', [Peer, Allowed]),
    (   memberchk(Peer, Allowed)
    ->  true
    ;   member(Pattern, Allowed),
        match_peer_pattern(Pattern, Peer)
    ).

%!  match_peer_pattern(+Pattern, +Peer) is semidet.
%
%   True if the dotted address Peer matches Pattern.  Pattern is a
%   dotted address that may end in `*`, e.g. `'192.168.*'`, which
%   matches any remaining address parts.

match_peer_pattern(Pattern, Peer) :-
    ip_term(Pattern, IP),
    ip_term(Peer, IP),
    !.

%!  ip_term(+Text, -Parts) is semidet.
%
%   Split Text on "." and convert the parts to a list of numbers.
%   A final `*` part leaves the tail of the list unbound.

ip_term(Peer, Pattern) :-
    split_string(Peer, ".", "", PartStrings),
    ip_pattern(PartStrings, Pattern).

ip_pattern([], []).
ip_pattern([Part], _) :-
    % split_string/4 yields strings, so the wildcard arrives as the
    % string "*" and would never unify with the atom `*`.
    atom_string(*, Part),
    !.
ip_pattern([S|T0], [N|T]) :-
    number_string(N, S),
    ip_pattern(T0, T).
%!  authenticate(+Request, +Application, -UserOptions:list) is det.
%
%   Call authentication_hook/3, returning either `[user(User)]`, `[]`
%   or an exception.  UserOptions is `[]` if the hook fails.
%
%   @error instantiation_error if the hook returns a non-ground User.

authenticate(Request, Application, UserOptions) :-
    (   authentication_hook(Request, Application, User)
    ->  must_be(ground, User),
        UserOptions = [user(User)]
    ;   UserOptions = []
    ).
%   Documentation fragment for the authentication hook.  Exceptions
%   the hook may raise:
%
%     - throw(http_reply(authorise(basic(Realm))))
%       Start a normal HTTP login challenge (reply 401)
%     - throw(http_reply(forbidden(Path)))
%       Reject the request using a 403 reply.

%!  pengine_register_user(+Options) is det.
%
%   If Options contains user(User), record User for the current
%   pengine in pengine_user/2.  Otherwise do nothing.

pengine_register_user(Options) :-
    (   option(user(User), Options)
    ->  pengine_self(Self),
        asserta(pengine_user(Self, User))
    ;   true
    ).
%!  pengine_user(-User) is semidet.
%
%   True when User is the user recorded in pengine_user/2 for the
%   pengine running the calling thread (pengine_self/1).

pengine_user(User) :-
    pengine_self(Self),
    pengine_user(Self, User).
%!  reply_options(+Request, +Methods) is semidet.
%
%   Reply to an HTTP OPTIONS request: enable CORS for Methods and
%   send an empty `text/plain` document.  Fails for any other request
%   method.

reply_options(Request, Methods) :-
    option(method(options), Request),
    !,
    cors_enable(Request, [methods(Methods)]),
    format('Content-type: text/plain\r\n'),
    format('~n').                   % empty body


                 /*******************************
                 *        COMPILE SOURCE        *
                 *******************************/
%!  pengine_src_text(+SrcText, +Module) is det.
%
%   Load SrcText into Module.  The text is loaded from a character
%   stream under the source id `pengine://<Self>/src`, silently and
%   with the options from extra_load_options/2.  Afterwards the source
%   is handed to keep_source/3.

pengine_src_text(Src, Module) :-
    pengine_self(Self),
    format(atom(ID), 'pengine://~w/src', [Self]),
    extra_load_options(Self, Options),
    setup_call_cleanup(
        open_chars_stream(Src, Stream),
        load_files(Module:ID,
                   [ stream(Stream),
                     module(Module),
                     silent(true)
                   | Options
                   ]),
        close(Stream)),
    keep_source(Self, ID, Src).

%!  system:'#file'(+File, +Line)
%
%   Set the file name of the stream being loaded to File and restart
%   position recording on that stream.  The predicate is defined in
%   module `system`; without the module qualification the clause head
%   is not valid syntax.

system:'#file'(File, _Line) :-
    prolog_load_context(stream, Stream),
    set_stream(Stream, file_name(File)),
    % toggling record_position resets the recorded position
    set_stream(Stream, record_position(false)),
    set_stream(Stream, record_position(true)).
%!  pengine_src_url(+URL, +Module) is det.
%
%   Load the source at URL into Module under the source id
%   `pengine://<Self>/url/<EncodedURL>`.  If the application setting
%   `debug_info` is `false`, the source is loaded directly from the
%   HTTP stream.  Otherwise it is first read into a string, loaded
%   from that string and handed to keep_source/3.

pengine_src_url(URL, Module) :-
    pengine_self(Self),
    uri_encoded(path, URL, Path),
    format(atom(ID), 'pengine://~w/url/~w', [Self, Path]),
    extra_load_options(Self, Options),
    (   get_pengine_application(Self, Application),
        setting(Application:debug_info, false)
    ->  setup_call_cleanup(
            http_open(URL, In, []),
            (   set_stream(In, encoding(utf8)),
                load_files(Module:ID,
                           [ stream(In),
                             module(Module)
                           | Options
                           ])
            ),
            close(In))
    ;   setup_call_cleanup(
            http_open(URL, Remote, []),
            (   set_stream(Remote, encoding(utf8)),
                read_string(Remote, _, Src)
            ),
            close(Remote)),
        setup_call_cleanup(
            open_chars_stream(Src, In),
            load_files(Module:ID,
                       [ stream(In),
                         module(Module)
                       | Options
                       ]),
            close(In)),
        keep_source(Self, ID, Src)
    ).


%!  extra_load_options(+Pengine, -Options) is det.
%
%   Options is `[]` for a pengine that is not sandboxed and
%   `[sandboxed(true)]` otherwise.

extra_load_options(Pengine, Options) :-
    pengine_not_sandboxed(Pengine),
    !,
    Options = [].
extra_load_options(_, [sandboxed(true)]).


%!  keep_source(+Pengine, +ID, +SrcText) is det.
%
%   If the application setting `debug_info` is `true`, store the
%   source as a string in pengine_data/2.  Otherwise do nothing.

keep_source(Pengine, ID, SrcText) :-
    get_pengine_application(Pengine, Application),
    setting(Application:debug_info, true),
    !,
    to_string(SrcText, SrcString),
    assertz(pengine_data(Pengine, source(ID, SrcString))).
keep_source(_, _, _).

%!  to_string(+Text, -String) is det.
%
%   A string is passed unchanged; other text is converted using
%   atom_string/2.

to_string(String, String) :-
    string(String),
    !.
to_string(Text, String) :-
    atom_string(Text, String),
    !.

                 /*******************************
                 *            SANDBOX           *
                 *******************************/

:- multifile
    sandbox:safe_primitive/1.

sandbox:safe_primitive(pengines:pengine_input(_, _)).
sandbox:safe_primitive(pengines:pengine_output(_)).
sandbox:safe_primitive(pengines:pengine_debug(_,_)).
3268 3269 3270 /******************************* 3271 * MESSAGES * 3272 *******************************/ 3273 3274prologerror_message(sandbox(time_limit_exceeded, Limit)) --> 3275 [ 'Could not prove safety of your goal within ~f seconds.'-[Limit], nl, 3276 'This is normally caused by an insufficiently instantiated'-[], nl, 3277 'meta-call (e.g., call(Var)) for which it is too expensive to'-[], nl, 3278 'find all possible instantations of Var.'-[] 3279 ]
Pengines: Web Logic Programming Made Easy
The library(pengines) provides an infrastructure for creating Prolog engines in a (remote) pengine server and accessing these engines either from Prolog or JavaScript.