1:- encoding(utf8).
37
% Module header: the public predicates exported by the pengines library.
:- module(pengines,
          [ pengine_create/1,
            pengine_ask/3,
            pengine_next/2,
            pengine_stop/2,
            pengine_event/2,
            pengine_input/2,
            pengine_output/1,
            pengine_respond/3,
            pengine_debug/2,
            pengine_self/1,
            pengine_pull_response/2,
            pengine_destroy/1,
            pengine_destroy/2,
            pengine_abort/1,
            pengine_application/1,
            current_pengine_application/1,
            pengine_property/2,
            pengine_user/1,
            pengine_event_loop/2,
            pengine_rpc/2,
            pengine_rpc/3
          ]).
70
71:- autoload(library(aggregate),[aggregate_all/3]). 72:- autoload(library(apply),[maplist/2,partition/4,exclude/3,maplist/3]). 73:- autoload(library(broadcast),[broadcast/1]). 74:- autoload(library(charsio),[open_chars_stream/2]). 75:- use_module(library(debug),[debug/1,debugging/1,debug/3,assertion/1]). 76:- autoload(library(error),
77 [ must_be/2,
78 existence_error/2,
79 permission_error/3,
80 domain_error/2
81 ]). 82:- autoload(library(filesex),[directory_file_path/3]). 83:- autoload(library(listing),[listing/1]). 84:- autoload(library(lists),[member/2,flatten/2,select/3,append/3]). 85:- autoload(library(modules),[in_temporary_module/3]). 86:- autoload(library(occurs),[sub_term/2]). 87:- autoload(library(option),
88 [select_option/3,option/2,option/3,select_option/4]). 89:- autoload(library(prolog_stack),[print_prolog_backtrace/2]). 90:- autoload(library(statistics),[thread_statistics/2]). 91:- autoload(library(term_to_json),[term_to_json/2]). 92:- autoload(library(thread_pool),
93 [thread_pool_create/3,thread_create_in_pool/4]). 94:- autoload(library(time),[alarm/4,call_with_time_limit/2]). 95:- autoload(library(uri),
96 [ uri_components/2,
97 uri_query_components/2,
98 uri_data/3,
99 uri_data/4,
100 uri_edit/3,
101 uri_encoded/3
102 ]). 103:- autoload(library(http/http_client),[http_read_data/3]). 104:- autoload(library(http/http_cors),[cors_enable/0,cors_enable/2]). 105:- use_module(library(http/http_dispatch),
106 [http_handler/3,http_404/2,http_reply_file/3]). 107:- autoload(library(http/http_open),[http_open/3]). 108:- autoload(library(http/http_parameters),[http_parameters/2]). 109:- autoload(library(http/http_stream),[is_cgi_stream/1]). 110:- autoload(library(http/http_wrapper),[http_peer/2]). 111
112:- use_module(library(settings),[setting/2,setting/4]). 113:- use_module(library(http/http_json),
114 [http_read_json_dict/2,reply_json_dict/1]). 115:- use_module(library(sandbox),[safe_goal/1]). 116
117:- if(exists_source(library(uuid))). 118:- autoload(library(uuid), [uuid/2]). 119:- endif. 120
121
% Module-sensitive arguments of the exported predicates.
:- meta_predicate
    pengine_create(:),
    pengine_rpc(+, +, :),
    pengine_event_loop(1, +).

% Hook predicates: other files may contribute clauses to these.
:- multifile
    write_result/3,
    event_to_json/3,
    prepare_module/3,
    prepare_goal/3,
    authentication_hook/3,
    not_sandboxed/2,
    pengine_flush_output_hook/0.
% Option declarations for library(predicate_options).  Each directive
% names a predicate, the argument position that holds its option list
% and the options accepted there.  pass_to(PI, Arg) states that the
% list is handed on to argument Arg of predicate PI.

:- predicate_options(pengine_create/1, 1,
                     [ id(-atom),
                       alias(atom),
                       application(atom),
                       destroy(boolean),
                       server(atom),
                       ask(compound),
                       template(compound),
                       chunk(integer;oneof([false])),
                       bindings(list),
                       src_list(list),
                       src_text(any),
                       src_url(atom),
                       src_predicates(list)
                     ]).
:- predicate_options(pengine_ask/3, 3,
                     [ template(any),
                       chunk(integer;oneof([false])),
                       bindings(list)
                     ]).
:- predicate_options(pengine_next/2, 2,
                     [ chunk(integer),
                       pass_to(pengine_send/3, 3)
                     ]).
:- predicate_options(pengine_stop/2, 2,
                     [ pass_to(pengine_send/3, 3)
                     ]).
% The option list of pengine_respond(Pengine, Input, Options) is its
% third argument; it was previously declared as argument 2, which is
% the Input term.
:- predicate_options(pengine_respond/3, 3,
                     [ pass_to(pengine_send/3, 3)
                     ]).
:- predicate_options(pengine_rpc/3, 3,
                     [ chunk(integer;oneof([false])),
                       pass_to(pengine_create/1, 1)
                     ]).
:- predicate_options(pengine_send/3, 3,
                     [ delay(number)
                     ]).
:- predicate_options(pengine_event/2, 2,
                     [ listen(atom),
                       pass_to(system:thread_get_message/3, 3)
                     ]).
:- predicate_options(pengine_pull_response/2, 2,
                     [ pass_to(http_open/3, 3)
                     ]).
:- predicate_options(pengine_event_loop/2, 2,
                     []).
% Switch on the pengine(debug) debug topic when this file is loaded.
:- debug(pengine(debug)).

%!  goal_expansion(+Goal, -Expanded) is semidet.
%
%   Compile-time rewrite of the marker goal `random_delay`.  It is
%   replaced by do_random_delay/0 if the debug topic pengine(delay) is
%   active while compiling and by `true` otherwise.

goal_expansion(random_delay, Expanded) :-
    (   \+ debugging(pengine(delay))
    ->  Expanded = true
    ;   Expanded = do_random_delay
    ).

%!  do_random_delay is det.
%
%   Sleep for random(20)/1000 seconds.

do_random_delay :-
    Ticks is random(20),
    Delay is Ticks/1000,
    sleep(Delay).
195
% Module-sensitive arguments of internal predicates.
:- meta_predicate
    solve(+, ?, 0, +),
    findnsols_no_empty(+, ?, 0, -),
    pengine_event_loop(+, 1, +).
252
253
%!  pengine_create(:Options) is det.
%
%   Create a new pengine.  Local source options are first folded into
%   a single src_text/1 option by translate_local_sources/3.  If the
%   resulting options hold server(URL), the pengine is created
%   remotely with the remaining options; otherwise it is created
%   locally.

pengine_create(Module:RawOptions) :-
    translate_local_sources(RawOptions, Options, Module),
    (   select_option(server(ServerURL), Options, OtherOptions)
    ->  remote_pengine_create(ServerURL, OtherOptions)
    ;   local_pengine_create(Options)
    ).
260
272
%!  translate_local_sources(+OptionsIn, -Options, +Module) is semidet.
%
%   Replace all src_predicates/1, src_list/1 and src_text/1 options in
%   OptionsIn by at most one src_text/1 option at the head of Options.
%   Multiple sources are concatenated using atomics_to_string/2.  All
%   other options are kept in their original order.

translate_local_sources(OptionsIn, Options, Module) :-
    translate_local_sources(OptionsIn, Texts, Others, Module),
    (   Texts == []
    ->  Options = Others
    ;   Texts = [Text]
    ->  Options = [src_text(Text)|Others]
    ;   atomics_to_string(Texts, Text)
    ->  Options = [src_text(Text)|Others]
    ).

%!  translate_local_sources(+OptionsIn, -Texts, -Others, +Module)
%
%   Split OptionsIn into the source texts produced by
%   translate_local_source/3 and the remaining options.  Unbound list
%   elements are passed through as ordinary options.

translate_local_sources([], [], [], _).
translate_local_sources([Opt|Opts], [Text|Texts], Others, Module) :-
    nonvar(Opt),
    translate_local_source(Opt, Text, Module),
    !,
    translate_local_sources(Opts, Texts, Others, Module).
translate_local_sources([Opt|Opts], Texts, [Opt|Others], Module) :-
    translate_local_sources(Opts, Texts, Others, Module).

%!  translate_local_source(+Option, -Source, +Module) is semidet.
%
%   Source is the program text for a source option: the listing of
%   the given predicate indicators in Module, the given terms written
%   using the ~k format directive, or the text itself.

translate_local_source(src_predicates(PIs), Source, Module) :-
    must_be(list, PIs),
    with_output_to(string(Source),
                   maplist(list_in_module(Module), PIs)).
translate_local_source(src_list(Terms), Source, _) :-
    must_be(list, Terms),
    with_output_to(string(Source),
                   forall(member(Term, Terms),
                          format('~k .~n', [Term]))).
translate_local_source(src_text(Source), Source, _).

%!  list_in_module(+Module, +PI)
%
%   Call listing/1 for PI qualified with Module.

list_in_module(Module, PI) :-
    listing(Module:PI).
305
310
%!  pengine_send(+NameOrID, +Event) is det.
%
%   Same as pengine_send/3 with an empty option list.

pengine_send(Target, Event) :-
    pengine_send(Target, Event, []).

%!  pengine_send(+NameOrID, +Event, +Options) is det.
%
%   Send Event to the pengine Target, which must be an atom.  The
%   atom `self` addresses the message queue of the calling thread.

pengine_send(Target, Event, Options) :-
    must_be(atom, Target),
    pengine_send2(Target, Event, Options).

%   pengine_send2(+NameOrID, +Event, +Options)
%
%   Map an alias registered in child/2 to its pengine id and hand the
%   event to delay_message/3.  Names without a child/2 entry are used
%   as the pengine id directly.

pengine_send2(self, Event, Options) :-
    !,
    thread_self(Me),
    delay_message(queue(Me), Event, Options).
pengine_send2(NameOrId, Event, Options) :-
    (   child(NameOrId, Id)
    ->  true
    ;   Id = NameOrId
    ),
    delay_message(pengine(Id), Event, Options).
341
%   delay_message(+Target, +Event, +Options)
%
%   Deliver Event to Target.  With option delay(Delay) the delivery is
%   scheduled through alarm/4; otherwise the message is sent right
%   away, preceded by the `random_delay` debugging hook.

delay_message(Target, Event, Options) :-
    (   option(delay(Delay), Options)
    ->  alarm(Delay,
              send_message(Target, Event, Options),
              _AlarmID,
              [remove(true)])
    ;   random_delay,
        send_message(Target, Event, Options)
    ).

%   send_message(+Target, +Event, +Options)
%
%   Target is queue(Queue) or pengine(Id).  A pengine known as remote
%   is reached through remote_pengine_send/4, a local one through its
%   thread.  Raises existence_error(pengine, Id) if Id is neither.

send_message(queue(Queue), Event, _) :-
    thread_send_message(Queue, pengine_request(Event)).
send_message(pengine(Id), Event, Options) :-
    (   pengine_remote(Id, Server)
    ->  remote_pengine_send(Server, Id, Event, Options)
    ;   pengine_thread(Id, Thread)
    ->  thread_send_message(Thread, pengine_request(Event))
    ;   existence_error(pengine, Id)
    ).
362
370
%!  pengine_request(-Request) is det.
%
%   Fetch the next pengine_request(Request) message from the queue of
%   the calling thread.  The first attempt waits one second.  If
%   nothing arrives, the wait continues inside thread_idle/2 for the
%   application's idle_limit setting minus one second; if that also
%   times out, Request is unified with `destroy`.

pengine_request(Request) :-
    thread_self(Me),
    (   thread_get_message(Me, pengine_request(Request), [timeout(1)])
    ->  true
    ;   pengine_idle_request(Request)
    ).

pengine_idle_request(Request) :-
    pengine_self(Self),
    get_pengine_application(Self, Application),
    setting(Application:idle_limit, Limit),
    Remaining is Limit-1,
    thread_self(Me),
    (   thread_idle(thread_get_message(Me, pengine_request(Request),
                                       [timeout(Remaining)]),
                    long)
    ->  true
    ;   Request = destroy
    ).
387
388
398
%!  pengine_reply(+Event) is det.
%!  pengine_reply(+Queue, +Event) is det.
%
%   Send Event as pengine_event(ID, Event) to Queue, where ID is the
%   first argument of Event.  Queue defaults to the queue returned by
%   pengine_parent/1.  Nothing is sent once the global variable
%   `pengine_idle_limit_exceeded` is `true`.
%
%   If the caller is the pengine ID itself and is not detached, the
%   send is limited by the application's idle_limit setting.  On
%   timeout the flag above is set, the thread is detached and the
%   computation is aborted.

pengine_reply(Event) :-
    pengine_parent(Queue),
    pengine_reply(Queue, Event).

pengine_reply(_Queue, _Event) :-
    nb_current(pengine_idle_limit_exceeded, true),
    !.
pengine_reply(Queue, Event0) :-
    arg(1, Event0, ID),
    wrap_first_answer(ID, Event0, Event),
    random_delay,
    debug(pengine(event), 'Reply to ~p: ~p', [Queue, Event]),
    Message = pengine_event(ID, Event),
    (   pengine_self(ID),
        \+ pengine_detached(ID, _)
    ->  get_pengine_application(ID, Application),
        setting(Application:idle_limit, IdleLimit),
        debug(pengine(reply), 'Sending ~p, timeout: ~q', [Event, IdleLimit]),
        (   thread_send_message(Queue, Message, [timeout(IdleLimit)])
        ->  true
        ;   pengine_reply_timed_out(ID)
        )
    ;   thread_send_message(Queue, Message)
    ).

%   The parent did not accept our message in time: remember that,
%   detach the thread and abort.

pengine_reply_timed_out(ID) :-
    thread_self(Me),
    debug(pengine(reply), 'pengine_reply: timeout for ~q (thread ~q)',
          [ID, Me]),
    nb_setval(pengine_idle_limit_exceeded, true),
    thread_detach(Me),
    abort.
429
%   wrap_first_answer(+ID, +Event0, -Event)
%
%   If a pending create event for ID is stored in the thread-local
%   wrap_first_answer_in_create_event/2, unify its extra list with
%   [answer(Event0)], return the create event and remove the stored
%   fact.  Otherwise Event is Event0.

wrap_first_answer(ID, Event0, CreateEvent) :-
    Pending = wrap_first_answer_in_create_event(CreateEvent,
                                                [answer(Event0)]),
    call(Pending),
    arg(1, CreateEvent, ID),
    !,
    retract(Pending).
wrap_first_answer(_ID, Event, Event).
436
437
%!  empty_queue is det.
%
%   Discard all messages currently in the queue returned by
%   pengine_parent/1 and report the count on the pengine(abort) debug
%   topic.

empty_queue :-
    pengine_parent(Queue),
    empty_queue(Queue, 0, Discarded),
    debug(pengine(abort), 'Abort: discarded ~D messages', [Discarded]).

%   empty_queue(+Queue, +Count0, -Count)
%
%   Count is Count0 plus the number of messages removed from Queue.
%   Messages are fetched without waiting (timeout 0).

empty_queue(Queue, Count0, Count) :-
    (   thread_get_message(Queue, _Term, [timeout(0)])
    ->  Count1 is Count0+1,
        empty_queue(Queue, Count1, Count)
    ;   Count = Count0
    ).
449
450
516
%!  pengine_ask(+NameOrID, @Query, +Options) is det.
%
%   Send ask(Query, AskOptions) to the pengine ID.  Options accepted
%   by pengine_ask_option/1 travel inside the ask/2 event; all other
%   options are passed to pengine_send/3.

pengine_ask(ID, Query, Options) :-
    partition(pengine_ask_option, Options, ForAsk, ForSend),
    pengine_send(ID, ask(Query, ForAsk), ForSend).

%   pengine_ask_option(?Option)
%
%   True for options that belong in the ask/2 event.

pengine_ask_option(template(_)).
pengine_ask_option(chunk(_)).
pengine_ask_option(bindings(_)).
pengine_ask_option(breakpoints(_)).
526
527
569
%!  pengine_next(+NameOrID, +Options) is det.
%
%   Ask pengine ID for more solutions.  With option chunk(Count) the
%   event is next(Count) and the chunk option is removed before the
%   options reach pengine_send/3; otherwise the event is `next`.

pengine_next(ID, Options) :-
    (   select_option(chunk(Count), Options, SendOptions)
    ->  pengine_send(ID, next(Count), SendOptions)
    ;   pengine_send(ID, next, Options)
    ).

%!  pengine_stop(+NameOrID, +Options) is det.
%
%   Send the `stop` event to pengine ID.

pengine_stop(ID, Options) :-
    pengine_send(ID, stop, Options).
592
593
601
%!  pengine_abort(+NameOrID) is semidet.
%
%   Abort the query running in a pengine.  Name may be an alias from
%   child/2 or a pengine id.  Remote pengines are aborted through
%   remote_pengine_abort/3.  For a local pengine, `abort_query` is
%   thrown in its thread using thread_signal/2; errors from
%   signalling are ignored.  Fails if the pengine is neither remote
%   nor has a thread.

pengine_abort(Name) :-
    (   child(Name, Id)
    ->  true
    ;   Id = Name
    ),
    (   pengine_remote(Id, Server)
    ->  remote_pengine_abort(Server, Id, [])
    ;   pengine_thread(Id, Thread),
        debug(pengine(abort), 'Signalling thread ~p', [Thread]),
        catch(thread_signal(Thread, throw(abort_query)), _, true)
    ).
613
614
621
%!  pengine_destroy(+NameOrID) is det.
%!  pengine_destroy(+NameOrID, +Options) is det.
%
%   Destroy a pengine.  Name is either an alias registered in child/2
%   or a pengine id; it is resolved to the id first, whether or not
%   force(true) is given.
%
%   With option force(true), a local pengine thread is signalled with
%   `abort`; a vanished thread or a pengine without a thread is
%   ignored.  Otherwise the `destroy` event is sent using
%   pengine_send/3 with the same Options.  If that raises
%   existence_error(pengine, ID), the child/2 entry for ID is removed.
%
%   Previously the alias was only resolved on the force(true) path.
%   The error raised for a vanished pengine carries the real id, so
%   the handler could not match a call made with an alias: the error
%   escaped and the child/2 fact stayed behind.

pengine_destroy(ID) :-
    pengine_destroy(ID, []).

pengine_destroy(Name, Options) :-
    (   child(Name, ID)
    ->  true
    ;   ID = Name
    ),
    (   option(force(true), Options)
    ->  (   pengine_thread(ID, Thread)
        ->  catch(thread_signal(Thread, abort),
                  error(existence_error(thread, _), _), true)
        ;   true
        )
    ;   catch(pengine_send(ID, destroy, Options),
              error(existence_error(pengine, ID), _),
              retractall(child(_, ID)))
    ).
641
642
645
654
% Global pengine bookkeeping.  These predicates are dynamic and
% volatile.
:- dynamic
    current_pengine/6,
    pengine_queue/4,
    output_queue/3,
    pengine_user/2,
    pengine_data/2,
    pengine_detached/2.
:- volatile
    current_pengine/6,
    pengine_queue/4,
    output_queue/3,
    pengine_user/2,
    pengine_data/2,
    pengine_detached/2.

% child(Name, Id): thread-local table of the pengines created by this
% thread.
:- thread_local
    child/2.
676
%   pengine_register_local(+Id, +Thread, +Queue, +URL, +Application, +Destroy)
%
%   Record a local pengine.  The fact is stored as
%   current_pengine(Id, Queue, Thread, URL, Application, Destroy).

pengine_register_local(Id, Thread, Queue, URL, Application, Destroy) :-
    Fact = current_pengine(Id, Queue, Thread, URL, Application, Destroy),
    asserta(Fact).

%   pengine_register_remote(+Id, +URL, +Application, +Destroy)
%
%   Record a remote pengine.  The calling thread is stored as its
%   queue and the thread field is 0.

pengine_register_remote(Id, URL, Application, Destroy) :-
    thread_self(Queue),
    Fact = current_pengine(Id, Queue, 0, URL, Application, Destroy),
    asserta(Fact).

%   pengine_unregister(+Id)
%
%   Remove the registration of pengine Id that runs in the calling
%   thread, plus its pengine_user/2 and pengine_data/2 facts.  If the
%   pengine was registered with URL `http`,
%   sync_destroy_queue_from_pengine/2 is called first while holding
%   the mutex `pengine`.

pengine_unregister(Id) :-
    thread_self(Me),
    (   current_pengine(Id, Queue, Me, http, _, _)
    ->  with_mutex(pengine, sync_destroy_queue_from_pengine(Id, Queue))
    ;   true
    ),
    retractall(current_pengine(Id, _, Me, _, _, _)),
    retractall(pengine_user(Id, _)),
    retractall(pengine_data(Id, _)).

%   pengine_unregister_remote(+Id)
%
%   Remove the registration of remote pengine Id (thread field 0).

pengine_unregister_remote(Id) :-
    retractall(current_pengine(Id, _Parent, 0, _, _, _)).
702
706
%!  pengine_self(-Id) is semidet.
%
%   True if Id is a pengine registered for the calling thread.

pengine_self(Id) :-
    thread_self(Me),
    current_pengine(Id, _Parent, Me, _URL, _Application, _Destroy).

%   pengine_parent(-Parent)
%
%   Parent is the value of the global variable `pengine_parent`.

pengine_parent(Parent) :-
    nb_getval(pengine_parent, Parent).

%   pengine_thread(?Pengine, ?Thread)
%
%   First registration of Pengine whose thread field is not 0, i.e.
%   not a remote registration.

pengine_thread(Pengine, Thread) :-
    current_pengine(Pengine, _Parent, Thread, _URL, _Application, _Destroy),
    Thread \== 0,
    !.

%   pengine_remote(?Pengine, ?URL)
%
%   Pengine is registered as remote (thread field 0) at URL.

pengine_remote(Pengine, URL) :-
    current_pengine(Pengine, _Parent, 0, URL, _Application, _Destroy).

%   get_pengine_application(?Pengine, ?Application)
%
%   Application of the first matching registration of Pengine.

get_pengine_application(Pengine, Application) :-
    current_pengine(Pengine, _Parent, _Thread, _URL, Application, _Destroy),
    !.

%   get_pengine_module(?Pengine, ?Module)
%
%   The module of a pengine has the same name as the pengine.

get_pengine_module(Pengine, Pengine).
727
%   pengine_uuid(-Id)
%
%   Id is a fresh identifier.  If uuid/2 is defined when this file is
%   compiled, a version 4 UUID is used.  Otherwise Id is the text of
%   a random integer between 0 and the `max_integer` flag minus one
%   (1<<128 when that flag is absent).

:- if(current_predicate(uuid/2)).
pengine_uuid(Id) :-
    uuid(Id, [version(4)]).
:- else.
pengine_uuid(Id) :-
    (   current_prolog_flag(max_integer, MaxInt)
    ->  Upper is MaxInt-1
    ;   Upper is 1<<128
    ),
    random_between(0, Upper, Number),
    atom_number(Id, Number).
:- endif.
755
:- meta_predicate protect_pengine(+, 0).

%!  protect_pengine(+Id, :Goal) is semidet.
%
%   Run Goal as once/1 while holding a mutex derived from Id.  The
%   mutex is one of 64 named `pengine_done_<N>`, where N is the term
%   hash of Id modulo 64, so calls for the same Id are serialised
%   against each other.
%
%   The earlier version looked up pengine_thread/2 under the lock but
%   ran Goal on both outcomes.  That lookup had no effect and has been
%   removed.

protect_pengine(Id, Goal) :-
    term_hash(Id, Hash),
    LockN is Hash mod 64,
    atom_concat(pengine_done_, LockN, Lock),
    with_mutex(Lock, Goal).
767
768
782
%!  pengine_application(+Application) is det.
%
%   Intended for use as a directive, where it is handled by term
%   expansion.  Calling it as a goal raises
%   context_error(nodirective, pengine_application(Application)).

pengine_application(Application) :-
    Culprit = pengine_application(Application),
    throw(error(context_error(nodirective, Culprit), _)).

:- multifile
    system:term_expansion/2,
    current_application/1.

%!  current_pengine_application(?Application) is nondet.
%
%   True if Application is recorded in current_application/1.

current_pengine_application(Application) :-
    current_application(Application).
799
800
802
% Global default settings.  The settings declared for each pengine
% application default to these values.

:- setting(thread_pool_size, integer, 100,
           'Maximum number of pengines this application can run.').
:- setting(thread_pool_stacks, list(compound), [],
           'Maximum stack sizes for pengines this application can run.').
:- setting(slave_limit, integer, 3,
           'Maximum number of slave pengines a master pengine can create.').
:- setting(time_limit, number, 300,
           'Maximum time to wait for output').
:- setting(idle_limit, number, 300,
           'Pengine auto-destroys when idle for this time').
:- setting(safe_goal_limit, number, 10,
           'Maximum time to try proving safety of the goal').
:- setting(program_space, integer, 100_000_000,
           'Maximum memory used by predicates').
:- setting(allow_from, list(atom), [*],
           'IP addresses from which remotes are allowed to connect').
:- setting(deny_from, list(atom), [],
           'IP addresses from which remotes are NOT allowed to connect').
:- setting(debug_info, boolean, false,
           'Keep information to support source-level debugging').
824
%   pengine_application_setting(?Name, ?Type, ?Comment)
%
%   Table of the settings created for every pengine application, in
%   the order in which they are expanded.  Each one defaults to the
%   setting with the same name in module `pengines`.

pengine_application_setting(thread_pool_size, integer,
    'Maximum number of pengines this application can run.').
pengine_application_setting(thread_pool_stacks, list(compound),
    'Maximum stack sizes for pengines this application can run.').
pengine_application_setting(slave_limit, integer,
    'Maximum number of local slave pengines a master pengine can create.').
pengine_application_setting(time_limit, number,
    'Maximum time to wait for output').
pengine_application_setting(idle_limit, number,
    'Pengine auto-destroys when idle for this time').
pengine_application_setting(safe_goal_limit, number,
    'Maximum time to try proving safety of the goal').
pengine_application_setting(program_space, integer,
    'Maximum memory used by predicates').
pengine_application_setting(allow_from, list(atom),
    'IP addresses from which remotes are allowed to connect').
pengine_application_setting(deny_from, list(atom),
    'IP addresses from which remotes are NOT allowed to connect').
pengine_application_setting(debug_info, boolean,
    'Keep information to support source-level debugging').

%   expand_application_setting(+Application, +Spec, -Expansion)
%
%   Expansion is the result of expand_term/2 on the setting/4
%   directive that declares Spec for Application.

expand_application_setting(Application,
                           setting(Name, Type, Comment),
                           Expansion) :-
    expand_term((:- setting(Application:Name, Type,
                            setting(pengines:Name),
                            Comment)),
                Expansion).

%   system:term_expansion(+Directive, -Expanded)
%
%   Expand `:- pengine_application(Application)`.  Application must be
%   an atom and must not name a module that is associated with a
%   file; otherwise a permission error is raised.  Expanded is a flat
%   list: the fact pengines:current_application(Application) followed
%   by the expansion of one setting/4 directive per row of
%   pengine_application_setting/3.

system:term_expansion((:- pengine_application(Application)), Expanded) :-
    must_be(atom, Application),
    (   module_property(Application, file(_))
    ->  permission_error(create, pengine_application, Application)
    ;   true
    ),
    findall(setting(Name, Type, Comment),
            pengine_application_setting(Name, Type, Comment),
            Specs),
    maplist(expand_application_setting(Application), Specs, Settings),
    flatten([ pengines:current_application(Application)
            | Settings
            ], Expanded).
889
891
% Declare `pengine_sandbox` as a pengine application.  This directive
% is rewritten by the system:term_expansion/2 clause for
% pengine_application/1.  `pengine_sandbox` is also the default
% application used by local_pengine_create/1.
:- pengine_application(pengine_sandbox).
894
927
928
%!  pengine_property(?Pengine, ?Property) is nondet.
%
%   True when Property holds for Pengine.  The call is committed to
%   the first answer if both arguments are nonvar on entry.
%   Properties are:
%
%     - self(Id), module(Id): Id is the pengine id itself
%     - alias(Alias): Alias from child/2 that differs from the id
%     - thread(Thread): thread of a pengine whose thread field is not 0
%     - remote(Server): server of a pengine whose thread field is 0
%     - application(Application), destroy(Destroy), parent(Parent):
%       fields of the current_pengine/6 registration
%     - source(SourceID, Source): taken from pengine_data/2
%     - detached(When): taken from pengine_detached/2

pengine_property(Pengine, Property) :-
    nonvar(Pengine), nonvar(Property),
    pengine_property2(Property, Pengine),
    !.
pengine_property(Pengine, Property) :-
    pengine_property2(Property, Pengine).

pengine_property2(self(Pengine), Pengine) :-
    current_pengine(Pengine, _, _, _, _, _).
pengine_property2(module(Pengine), Pengine) :-
    current_pengine(Pengine, _, _, _, _, _).
pengine_property2(alias(Alias), Pengine) :-
    child(Alias, Pengine),
    Alias \== Pengine.
pengine_property2(thread(Thread), Pengine) :-
    current_pengine(Pengine, _, Thread, _, _, _),
    Thread \== 0.
pengine_property2(remote(Server), Pengine) :-
    current_pengine(Pengine, _, 0, Server, _, _).
pengine_property2(application(Application), Pengine) :-
    current_pengine(Pengine, _, _, _, Application, _).
pengine_property2(destroy(Destroy), Pengine) :-
    current_pengine(Pengine, _, _, _, _, Destroy).
pengine_property2(parent(Parent), Pengine) :-
    current_pengine(Pengine, Parent, _, _, _, _).
pengine_property2(source(SourceID, Source), Pengine) :-
    pengine_data(Pengine, source(SourceID, Source)).
pengine_property2(detached(When), Pengine) :-
    pengine_detached(Pengine, When).
958
963
%!  pengine_output(+Term) is det.
%
%   Send the event output(Self, Term) to the parent of the current
%   pengine.

pengine_output(Term) :-
    pengine_self(Self),
    pengine_reply(output(Self, Term)).

%!  pengine_debug(+Format, +Args) is det.
%
%   Send the event debug(Self, Message) to the parent.  Message is
%   produced by format/3 from Format and Args.  That format call is
%   first checked using safe_goal/1; if the check raises an
%   exception, Message is the text of that exception instead.

pengine_debug(Format, Args) :-
    pengine_parent(Queue),
    pengine_self(Self),
    catch(safe_goal(format(atom(_), Format, Args)), Unsafe, true),
    (   nonvar(Unsafe)
    ->  message_to_string(Unsafe, Message)
    ;   format(atom(Message), Format, Args)
    ),
    pengine_reply(Queue, debug(Self, Message)).
990
991
994
1003
%   local_pengine_create(+Options)
%
%   Create a pengine in this process whose parent queue is the
%   calling thread.  The application defaults to `pengine_sandbox`.
%   The new pengine is recorded in child/2 under option alias(Name),
%   or under its own id if no alias is given.

local_pengine_create(Options) :-
    thread_self(Parent),
    option(application(Application), Options, pengine_sandbox),
    create(Parent, Id, Options, local, Application),
    option(alias(Alias), Options, Id),
    assert(child(Alias, Id)).

:- multifile thread_pool:create_pool/1.

%   thread_pool:create_pool(+Application)
%
%   Hook clause for library(thread_pool): create the pool for a known
%   pengine application from its thread_pool_size and
%   thread_pool_stacks settings.

thread_pool:create_pool(Application) :-
    current_application(Application),
    setting(Application:thread_pool_size, PoolSize),
    setting(Application:thread_pool_stacks, PoolStacks),
    thread_pool_create(Application, PoolSize, PoolStacks).
1023
1031
%   create(+Queue, ?Child, +Options, +URL, +Application)
%
%   Create pengine Child, generating an id if Child is unbound.  For
%   URL `local`, errors from create0/5 propagate to the caller.  For
%   any other URL they are caught and reported to Queue as
%   error(Child, Error).

create(Queue, Child, Options, local, Application) :-
    !,
    pengine_child_id(Child),
    create0(Queue, Child, Options, local, Application).
create(Queue, Child, Options, URL, Application) :-
    pengine_child_id(Child),
    catch(create0(Queue, Child, Options, URL, Application),
          Error,
          create_error(Queue, Child, Error)).

%   pengine_child_id(?Child)
%
%   Bind Child to a fresh id from pengine_uuid/1 unless it is already
%   instantiated.

pengine_child_id(Child) :-
    (   var(Child)
    ->  pengine_uuid(Child)
    ;   true
    ).

%   create_error(+Queue, +Child, +Error)
%
%   Report a failed creation of Child to Queue.

create_error(Queue, Child, Error) :-
    pengine_reply(Queue, error(Child, Error)).
1050
%   create0(+Queue, +Child, +Options, +URL, +Application)
%
%   Start the thread for pengine Child in the thread pool named
%   Application and register the pengine.
%
%   Raises existence_error(pengine_application, Application) for an
%   unknown application.  Unless URL is `http`, also raises
%   resource_error(max_pengines) if this thread already has at least
%   slave_limit children.  Options accepted by
%   pengine_create_option/1 go to pengine_main/3; the rest are passed
%   to thread_create_in_pool/4.  After registration the thread is
%   sent pengine_registered(Child), and option id(Id) is unified with
%   Child.

create0(Queue, Child, Options, URL, Application) :-
    (   current_application(Application)
    ->  true
    ;   existence_error(pengine_application, Application)
    ),
    check_slave_limit(URL, Application),
    partition(pengine_create_option, Options, PengineOptions, ThreadOptions),
    thread_create_in_pool(
        Application,
        pengine_main(Queue, PengineOptions, Application), Thread,
        [ at_exit(pengine_done)
        | ThreadOptions
        ]),
    option(destroy(Destroy), PengineOptions, true),
    pengine_register_local(Child, Thread, Queue, URL, Application, Destroy),
    thread_send_message(Thread, pengine_registered(Child)),
    (   option(id(Id), Options)
    ->  Id = Child
    ;   true
    ).

%   check_slave_limit(+URL, +Application)
%
%   Enforce the slave_limit setting of Application on the number of
%   child/2 facts.  Nothing is checked when URL is `http`.

check_slave_limit(URL, Application) :-
    (   URL == http
    ->  true
    ;   aggregate_all(count, child(_,_), Count),
        setting(Application:slave_limit, Max),
        (   Count >= Max
        ->  throw(error(resource_error(max_pengines), _))
        ;   true
        )
    ).

%   pengine_create_option(?Option)
%
%   True for options that are handled by the pengine itself.

pengine_create_option(src_text(_)).
pengine_create_option(src_url(_)).
pengine_create_option(application(_)).
pengine_create_option(destroy(_)).
pengine_create_option(ask(_)).
pengine_create_option(template(_)).
pengine_create_option(bindings(_)).
pengine_create_option(chunk(_)).
pengine_create_option(alias(_)).
pengine_create_option(user(_)).
1091
1092
1098
:- public
    pengine_done/0.

%!  pengine_done is det.
%
%   Exit handler installed by create0/5 using the at_exit/1 thread
%   option.  If the thread ended with an abort exception, the thread
%   is detached and destroy(Pengine, abort(Pengine)) is sent to the
%   parent; errors while sending are ignored.  Then every pengine in
%   child/2 is destroyed and this pengine is unregistered under
%   protect_pengine/2.

pengine_done :-
    thread_self(Me),
    (   thread_property(Me, status(exception(Exception))),
        abort_exception(Exception),
        thread_detach(Me),
        pengine_self(Pengine)
    ->  catch(pengine_reply(destroy(Pengine, abort(Pengine))),
              error(_,_), true)
    ;   true
    ),
    forall(child(_Alias, Child),
           pengine_destroy(Child)),
    pengine_self(Self),
    protect_pengine(Self, pengine_unregister(Self)).

%   abort_exception(?Exception)
%
%   Exception terms that are treated as an abort.

abort_exception('$aborted').
abort_exception(unwind(abort)).
1119
1124
:- thread_local wrap_first_answer_in_create_event/2.

:- meta_predicate
    pengine_prepare_source(:, +).

%   pengine_main(+Parent, +Options, +Application)
%
%   Goal run by the pengine thread.  It waits for the
%   pengine_registered(Self) message, stores Parent in the global
%   variable `pengine_parent`, registers the user and enables the
%   `mitigate_spectre` flag.  The source is then prepared and the main
%   loop run inside the temporary module Self.  If preparation throws
%   `prepare_source_failed`, the pengine terminates.

pengine_main(Parent, Options, Application) :-
    fix_streams,
    thread_get_message(pengine_registered(Self)),
    nb_setval(pengine_parent, Parent),
    pengine_register_user(Options),
    set_prolog_flag(mitigate_spectre, true),
    Setup = pengine_prepare_source(Application, Options),
    Loop  = pengine_create_and_loop(Self, Application, Options),
    catch(in_temporary_module(Self, Setup, Loop),
          prepare_source_failed,
          pengine_terminate(Self)).
1142
%   pengine_create_and_loop(+Self, +Application, +Options)
%
%   Announce the new pengine and enter the main loop.  The create
%   event is create(Self, [slave_limit(Limit)|Extra]).
%
%   Without an ask/1 option, Extra is [] and the event is sent at
%   once.  With ask(Query), the event is stored in
%   wrap_first_answer_in_create_event/2 so that pengine_reply/2 can
%   wrap the first answer in it, and the query is posted to Self.  A
%   query given as a string is parsed by ask_to_term/5, together with
%   the template if there is one.  A parse error is sent to the
%   parent and `prepare_source_failed` is thrown.  The chunk size
%   defaults to 1.

pengine_create_and_loop(Self, Application, Options) :-
    setting(Application:slave_limit, SlaveLimit),
    CreateEvent = create(Self, [slave_limit(SlaveLimit)|Extra]),
    (   option(ask(Ask), Options)
    ->  asserta(wrap_first_answer_in_create_event(CreateEvent, Extra)),
        (   string(Ask)
        ->  (   option(template(TemplateText), Options)
            ->  ToParse = Ask-TemplateText
            ;   ToParse = Ask
            ),
            catch(ask_to_term(ToParse, Self, Query, Template, Bindings),
                  Error, true),
            (   var(Error)
            ->  true
            ;   send_error(Error),
                throw(prepare_source_failed)
            )
        ;   Query = Ask,
            option(template(Template), Options, Query),
            option(bindings(Bindings), Options, [])
        ),
        option(chunk(Chunk), Options, 1),
        pengine_ask(Self, Query,
                    [ template(Template),
                      chunk(Chunk),
                      bindings(Bindings)
                    ])
    ;   Extra = [],
        pengine_reply(CreateEvent)
    ),
    pengine_main_loop(Self).
1174
1175
1182
%   ask_to_term(+AskSpec, +Module, -Goal, -Template, -Bindings)
%
%   Parse a textual query in Module.  AskSpec is either Ask-Template
%   (two pieces of text) or a single Ask text.
%
%   In the first form both texts are parsed as one term so that they
%   share variables.  Bindings holds the Name=Var pairs for the
%   variables that occur in the template.  In the second form
%   Bindings holds all variables not excluded by anon/1, and Template
%   is a dict with tag `swish_default_template` built from them.

ask_to_term(AskText-TemplateText, Module, Goal, Template, Bindings) :-
    !,
    format(string(Combined), 't((~s),(~s))', [TemplateText, AskText]),
    term_string(t(Template,Goal), Combined,
                [ variable_names(AllBindings),
                  module(Module)
                ]),
    phrase(template_bindings(Template, AllBindings), Bindings).
ask_to_term(AskText, Module, Goal, Template, Bindings) :-
    term_string(Goal, AskText,
                [ variable_names(AllBindings),
                  module(Module)
                ]),
    exclude(anon, AllBindings, Bindings),
    dict_create(Template, swish_default_template, Bindings).
1198
%   template_bindings(+Term, +Bindings)//
%
%   Emit, in left-to-right order, the element of Bindings for every
%   variable occurrence in Term that has one.  Lists and compound
%   terms are walked recursively; atomic terms and variables without
%   a binding contribute nothing.

template_bindings(Var, Bindings) -->
    { var(Var) }, !,
    (   { var_binding(Bindings, Var, Binding)
        }
    ->  [Binding]
    ;   []
    ).
template_bindings([Head|Tail], Bindings) -->
    !,
    template_bindings(Head, Bindings),
    template_bindings(Tail, Bindings).
template_bindings(Compound, Bindings) -->
    { compound(Compound), !,
      compound_name_arguments(Compound, _Name, Arguments)
    },
    template_bindings(Arguments, Bindings).
template_bindings(_, _) --> [].

%   var_binding(+Bindings, +Var, -Binding)
%
%   Binding is the first element of Bindings whose second argument is
%   identical (==) to Var.

var_binding(Bindings, Var, Binding) :-
    member(Binding, Bindings),
    arg(2, Binding, Value),
    Value == Var, !.
1221
1226
%   fix_streams
%
%   If `current_output` is a CGI stream, make that alias refer to
%   `user_output` instead.

fix_streams :-
    fix_stream(current_output).

fix_stream(Alias) :-
    is_cgi_stream(Alias),
    !,
    debug(pengine(stream), '~w is a CGI stream!', [Alias]),
    set_stream(user_output, alias(Alias)).
fix_stream(_).
1236
1243
%   pengine_prepare_source(:Application, +Options)
%
%   Set up the pengine module, which is the module qualifier of the
%   first argument.  Its program space is limited to the
%   application's program_space setting, and `user` is replaced as
%   import module by Application.  The module is then populated by
%   prep_module/3.  An error while doing so is sent to the parent and
%   `prepare_source_failed` is thrown.

pengine_prepare_source(Module:Application, Options) :-
    setting(Application:program_space, SpaceLimit),
    set_module(Module:program_space(SpaceLimit)),
    delete_import_module(Module, user),
    add_import_module(Module, Application, start),
    catch(prep_module(Module, Application, Options), Error, true),
    (   nonvar(Error)
    ->  send_error(Error),
        throw(prepare_source_failed)
    ;   true
    ).

%   prep_module(+Module, +Application, +Options)
%
%   Copy the `var_prefix` flag from Application, run all solutions of
%   the prepare_module/3 hook and process the source options with
%   Module as the source module.

prep_module(Module, Application, Options) :-
    maplist(copy_flag(Module, Application), [var_prefix]),
    forall(prepare_module(Module, Application, Options), true),
    setup_call_cleanup(
        '$set_source_module'(Saved, Module),
        maplist(process_create_option(Module), Options),
        '$set_source_module'(Saved)).

%   copy_flag(+Module, +Application, +Flag)
%
%   Copy the value of the module flag Flag from Application to
%   Module.  Does nothing if Application has no such flag.

copy_flag(Module, Application, Flag) :-
    current_prolog_flag(Application:Flag, Value),
    !,
    set_prolog_flag(Module:Flag, Value).
copy_flag(_, _, _).

%   process_create_option(+Module, +Option)
%
%   Load src_text/1 and src_url/1 options into Module.  All other
%   options are ignored.

process_create_option(Module, src_text(Text)) :-
    !,
    pengine_src_text(Text, Module).
process_create_option(Module, src_url(URL)) :-
    !,
    pengine_src_url(URL, Module).
process_create_option(_, _).
1277
1278
1297
1298
%   pengine_main_loop(+ID)
%
%   Run guarded_main_loop/1.  If `abort_query` is thrown, handle it
%   with pengine_aborted/1.

pengine_main_loop(ID) :-
    catch(guarded_main_loop(ID),
          abort_query,
          pengine_aborted(ID)).

%   pengine_aborted(+ID)
%
%   Discard pending messages with empty_queue/0 and report abort(ID)
%   through destroy_or_continue/1.

pengine_aborted(ID) :-
    thread_self(Me),
    debug(pengine(abort), 'Aborting ~p (thread ~p)', [ID, Me]),
    empty_queue,
    destroy_or_continue(abort(ID)).
1307
1308
1318
%   guarded_main_loop(+ID)
%
%   Wait for a request while no query is running.  `destroy`
%   terminates the pengine and ask(Goal, Options) starts a query.
%   Any other request is answered with a protocol_error event, after
%   which the loop continues.

guarded_main_loop(ID) :-
    pengine_request(Request),
    (   Request = destroy
    ->  debug(pengine(transition), '~q: 2 = ~q => 1', [ID, destroy]),
        pengine_terminate(ID)
    ;   Request = ask(Goal, Options)
    ->  debug(pengine(transition), '~q: 2 = ~q => 3', [ID, ask(Goal)]),
        ask(ID, Goal, Options)
    ;   debug(pengine(transition), '~q: 2 = ~q => 2', [ID, protocol_error]),
        pengine_reply(error(ID, error(protocol_error, _))),
        guarded_main_loop(ID)
    ).

%   pengine_terminate(+ID)
%
%   Send destroy(ID) to the parent and detach the calling thread.

pengine_terminate(ID) :-
    pengine_reply(destroy(ID)),
    thread_self(Me),
    thread_detach(Me).
1337
1338
1346
%   solve(+Chunk, +Template, :Goal, +ID)
%
%   Solve Goal and report the outcome to the parent.  Chunk is an
%   integer or `false`; anything else raises a domain error.  The
%   reported events are:
%
%     - success(ID, Answers, Projection, CPUTime, More), where More is
%       `true` if query_done/2 has not run yet and `false` otherwise
%     - failure(ID, CPUTime)
%     - error(ID, Error)
%
%   `abort_query` is re-thrown instead of being reported.  After a
%   success with More `true`, more_solutions/4 waits for the next
%   request; when that fails, execution backtracks into the goal for
%   the next chunk.  The choice point recorded as the first step is
%   handed to more_solutions/4.  The second clause leaves a choice
%   point on the solve/4 call.
%
%   The typein module is set to ID while the goal runs;
%   query_done/2 restores it.  The statement order and the placement
%   of the cuts matter here.

solve(Chunk, Template, Goal, ID) :-
    prolog_current_choice(Choice),
    (   integer(Chunk)
    ->  State = count(Chunk)
    ;   Chunk == false
    ->  State = no_chunk
    ;   domain_error(chunk, Chunk)
    ),
    statistics(cputime, Epoch),
    Time = time(Epoch),
    nb_current('$variable_names', Bindings),
    filter_template(Template, Bindings, Template2),
    '$current_typein_module'(SavedTypeIn),
    (   '$set_typein_module'(ID),
        call_cleanup(catch(findnsols_no_empty(State, Template2,
                                              set_projection(Goal, Bindings),
                                              Answers),
                           Error, true),
                     query_done(Det, SavedTypeIn)),
        solve_cputime(Time, CPUTime),
        forall(pengine_flush_output_hook, true),
        (   var(Error)
        ->  projection(Projection),
            (   var(Det)
            ->  pengine_reply(success(ID, Answers, Projection,
                                      CPUTime, true)),
                more_solutions(ID, Choice, State, Time)
            ;   !,
                destroy_or_continue(success(ID, Answers, Projection,
                                            CPUTime, false))
            )
        ;   !,
            (   Error == abort_query
            ->  throw(Error)
            ;   destroy_or_continue(error(ID, Error))
            )
        )
    ;   !,
        solve_cputime(Time, CPUTime),
        destroy_or_continue(failure(ID, CPUTime))
    ).
solve(_, _, _, _).

%   solve_cputime(+Time, -CPUTime)
%
%   CPUTime is the CPU time used since the start time stored in the
%   first argument of Time.

solve_cputime(Time, CPUTime) :-
    arg(1, Time, Start),
    statistics(cputime, Now),
    CPUTime is Now-Start.

%   query_done(-Det, +TypeIn)
%
%   Cleanup handler of solve/4: bind Det to `true` and restore the
%   typein module.

query_done(true, TypeIn) :-
    '$set_typein_module'(TypeIn).
1396
1397
1403
%   set_projection(:Goal, +Bindings)
%
%   Call Goal with the backtrackable global variable
%   `$variable_names` set to Bindings.

set_projection(Goal, Bindings) :-
    b_setval('$variable_names', Bindings),
    call(Goal).

%   projection(-Projection)
%
%   Projection is the result of mapping var_name/2 over the current
%   value of `$variable_names`, or [] if that variable does not
%   exist.

projection(Projection) :-
    nb_current('$variable_names', Bindings),
    !,
    maplist(var_name, Bindings, Projection).
projection([]).

%   filter_template(+Template0, +Bindings, -Template)
%
%   A dict with tag `swish_default_template` is rebuilt from
%   Bindings.  Any other template is returned unchanged.

filter_template(Template0, Bindings, Template) :-
    is_dict(Template0, swish_default_template),
    !,
    dict_create(Template, swish_default_template, Bindings).
filter_template(Template, _Bindings, Template).

%   findnsols_no_empty(+State, ?Template, :Goal, -List)
%
%   With State `no_chunk`, List is [Template] for each solution of
%   Goal.  Otherwise this is findnsols/4, except that it fails
%   instead of returning an empty list.

findnsols_no_empty(no_chunk, Template, Goal, Answers) =>
    Answers = [Template],
    call(Goal).
findnsols_no_empty(State, Template, Goal, Answers) =>
    findnsols(State, Template, Goal, Answers),
    Answers \== [].
1434
%   destroy_or_continue(+Event)
%
%   Report Event after a query has ended.  The pengine id is the
%   first argument of Event.  If the pengine has property
%   destroy(true), the thread is detached and Event is sent wrapped
%   as destroy(ID, Event).  Otherwise Event is sent as is and the
%   pengine returns to guarded_main_loop/1.

destroy_or_continue(Event) :-
    arg(1, Event, ID),
    (   pengine_property(ID, destroy(true))
    ->  thread_self(Me),
        thread_detach(Me),
        pengine_reply(destroy(ID, Event))
    ;   pengine_reply(Event),
        guarded_main_loop(ID)
    ).
1444
1460
%   more_solutions(+ID, +Choice, +State, +Time)
%
%   Wait for the next request after a chunk of answers was sent while
%   the goal can still produce more.  The requests are handled as
%   follows:
%
%     - `stop`: report stop(ID) through destroy_or_continue/1.
%     - `next`: reset the start time in Time and fail, which makes
%       the caller backtrack for the next chunk.
%     - next(Count): as `next`, after storing Count in State.  This
%       applies only if Count > 0 and State is count(_).
%     - ask(Goal, Options): cut back to Choice and start the new
%       query.
%     - `destroy`: terminate the pengine.
%     - anything else: reply with a protocol_error event and wait
%       again.

more_solutions(ID, Choice, State, Time) :-
    pengine_request(Request),
    more_solutions(Request, ID, Choice, State, Time).

more_solutions(stop, ID, _Choice, _State, _Time) :-
    !,
    debug(pengine(transition), '~q: 6 = ~q => 7', [ID, stop]),
    destroy_or_continue(stop(ID)).
more_solutions(next, ID, _Choice, _State, Time) :-
    !,
    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next]),
    statistics(cputime, Now),
    nb_setarg(1, Time, Now),
    fail.
more_solutions(next(Count), ID, _Choice, State, Time) :-
    Count > 0,
    State = count(_),
    !,
    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, next(Count)]),
    nb_setarg(1, State, Count),
    statistics(cputime, Now),
    nb_setarg(1, Time, Now),
    fail.
more_solutions(ask(Goal, Options), ID, Choice, _State, _Time) :-
    !,
    debug(pengine(transition), '~q: 6 = ~q => 3', [ID, ask(Goal)]),
    prolog_cut_to(Choice),
    ask(ID, Goal, Options).
more_solutions(destroy, ID, _Choice, _State, _Time) :-
    !,
    debug(pengine(transition), '~q: 6 = ~q => 1', [ID, destroy]),
    pengine_terminate(ID).
more_solutions(Request, ID, Choice, State, Time) :-
    debug(pengine(transition), '~q: 6 = ~q => 6', [ID, protocol_error(Request)]),
    pengine_reply(error(ID, error(protocol_error, _))),
    more_solutions(ID, Choice, State, Time).
1497
1503
%   ask(+ID, +Goal, +Options)
%
%   Prepare Goal with prepare_goal/4 and solve it.  The template
%   defaults to Goal and the chunk size to 1.  If preparation raises
%   an error, it is reported as error(ID, Error) and the pengine
%   returns to guarded_main_loop/1.  Fails if preparation fails.

ask(ID, Goal, Options) :-
    catch(prepare_goal(ID, Goal, Prepared, Options), Error, true),
    !,
    (   nonvar(Error)
    ->  pengine_reply(error(ID, Error)),
        guarded_main_loop(ID)
    ;   option(template(Template), Options, Goal),
        option(chunk(Chunk), Options, 1),
        solve(Chunk, Template, Prepared, ID)
    ).
1514
1526
%   prepare_goal(+ID, +Goal0, -ModuleGoal, +Options)
%
%   Turn Goal0 into the goal Module:Goal to be run by pengine ID.
%   The steps are:
%
%     1. `$variable_names` is set to the bindings(Bindings) option,
%        which defaults to [].
%     2. The prepare_goal/3 hook may rewrite the goal.
%     3. The goal is expanded with the pengine module as source
%        module.
%     4. Unless pengine_not_sandboxed/1 holds, the goal is checked
%        using safe_goal/1 within the application's safe_goal_limit
%        seconds.
%
%   Exceeding the limit raises sandbox(time_limit_exceeded, Limit).
%   Any other exception from the safety check is re-thrown.

prepare_goal(ID, Goal0, Module:Goal, Options) :-
    option(bindings(Bindings), Options, []),
    b_setval('$variable_names', Bindings),
    (   prepare_goal(Goal0, Hooked, Options)
    ->  true
    ;   Hooked = Goal0
    ),
    get_pengine_module(ID, Module),
    setup_call_cleanup(
        '$set_source_module'(Saved, Module),
        expand_goal(Hooked, Goal),
        '$set_source_module'(_, Saved)),
    (   pengine_not_sandboxed(ID)
    ->  true
    ;   get_pengine_application(ID, Application),
        setting(Application:safe_goal_limit, Limit),
        catch(call_with_time_limit(
                  Limit,
                  safe_goal(Module:Goal)), Error, true)
    ->  (   var(Error)
        ->  true
        ;   Error = time_limit_exceeded
        ->  throw(error(sandbox(time_limit_exceeded, Limit),_))
        ;   throw(Error)
        )
    ).
1553
1554
1564
1565
1571
%   pengine_not_sandboxed(+Pengine)
%
%   True if Pengine has a registered user for whom the
%   not_sandboxed/2 hook succeeds on the pengine's application.

pengine_not_sandboxed(Pengine) :-
    pengine_user(Pengine, User),
    pengine_property(Pengine, application(Application)),
    not_sandboxed(User, Application),
    !.

%!  pengine_pull_response(+Pengine, +Options) is det.
%
%   Pull a response from a remote pengine using
%   remote_pengine_pull_response/3.  Succeeds without doing anything
%   for pengines that are not registered as remote.

pengine_pull_response(Pengine, Options) :-
    pengine_remote(Pengine, Server),
    !,
    remote_pengine_pull_response(Server, Pengine, Options).
pengine_pull_response(_Pengine, _Options).
1603
1604
1610
%!  pengine_input(+Prompt, -Term) is det.
%
%   Send prompt(Self, Prompt) to the parent of the current pengine and
%   wait for the next request.  An input(Input) request unifies Term
%   with Input, a `destroy` request calls abort/0 and anything else
%   raises error(protocol_error, _).

pengine_input(Prompt, Term) :-
    pengine_self(Me),
    pengine_parent(Owner),
    pengine_reply(Owner, prompt(Me, Prompt)),
    pengine_request(Incoming),
    (   Incoming = input(Answer)
    ->  Term = Answer
    ;   Incoming == destroy
    ->  abort
    ;   throw(error(protocol_error, _))
    ).
1622
1623
1637
%!  pengine_respond(+Pengine, +Input, +Options) is det.
%
%   Send the event input(Input) to Pengine using pengine_send/3.

pengine_respond(Pengine, Input, Options) :-
    Event = input(Input),
    pengine_send(Pengine, Event, Options).
1640
1641
1647
%!  send_error(+Error) is det.
%
%   Report Error on behalf of the current pengine using
%   pengine_reply/1.  If Error carries a prolog_stack(Frames) context
%   in which Frames is a proper list, the frames are first rendered to
%   a string with print_prolog_backtrace/2.  Non-text blobs in the
%   error are replaced by atoms using replace_blobs/2.

send_error(Error) :-
    (   Error = error(Formal, context(prolog_stack(Frames), Message)),
        is_list(Frames)
    ->  with_output_to(string(Backtrace),
                       print_prolog_backtrace(current_output, Frames)),
        pengine_self(Me),
        replace_blobs(Formal, SafeFormal),
        replace_blobs(Message, SafeMessage),
        Context = context(prolog_stack(Backtrace), SafeMessage),
        pengine_reply(error(Me, error(SafeFormal, Context)))
    ;   pengine_self(Me),
        replace_blobs(Error, SafeError),
        pengine_reply(error(Me, SafeError))
    ).
1662
1668
%!  replace_blobs(+Term0, -Term) is det.
%
%   Term is a copy of Term0 in which every blob whose type is not
%   `text` is replaced by an atom holding its printed (`~p`)
%   representation.  Compound terms are processed argument by
%   argument; everything else is passed through unchanged.

replace_blobs(Term0, Term) :-
    (   blob(Term0, BlobType),
        BlobType \== text
    ->  format(atom(Term), '~p', [Term0])
    ;   compound(Term0)
    ->  compound_name_arguments(Term0, Functor, Args0),
        maplist(replace_blobs, Args0, Args),
        compound_name_arguments(Term, Functor, Args)
    ;   Term = Term0
    ).
1680
1681
1684
1685
%!  remote_pengine_create(+BaseURL, +Options) is det.
%
%   Create a pengine on the server at BaseURL.  Options accepted by
%   pengine_create_option/1 are posted to the server (as a dict built
%   by options_to_dict/2); the remaining options are passed to the
%   HTTP layer.  If an ask(Query) option is given without a template,
%   the query itself is used as template.
%
%   The first argument of the reply is taken as the pengine id, which
%   is unified with the id(ID) option if present and recorded as
%   child(Name, ID), where Name comes from the alias(Name) option
%   (default: the id).  When the reply is a `create` or `output` term
%   the pengine is registered with pengine_register_remote/4.  Finally
%   the reply is forwarded to the calling thread with pengine_reply/2.

remote_pengine_create(BaseURL, Options) :-
    partition(pengine_create_option, Options, CreateOpts0, HTTPOptions),
    (   option(ask(Query), CreateOpts0),
        \+ option(template(_), CreateOpts0)
    ->  CreateOpts = [template(Query)|CreateOpts0]
    ;   CreateOpts = CreateOpts0
    ),
    options_to_dict(CreateOpts, PostData),
    remote_post_rec(BaseURL, create, PostData, Reply, HTTPOptions),
    arg(1, Reply, ID),
    (   option(id(RequestedID), Options)
    ->  ID = RequestedID
    ;   true
    ),
    option(alias(Alias), Options, ID),
    assert(child(Alias, ID)),
    (   functor(Reply, ReplyName, _),
        memberchk(ReplyName, [create, output])
    ->  option(application(Application), CreateOpts, pengine_sandbox),
        option(destroy(Destroy), CreateOpts, true),
        pengine_register_remote(ID, BaseURL, Application, Destroy)
    ;   true
    ),
    thread_self(Me),
    pengine_reply(Me, Reply).
1712
%!  options_to_dict(+Options, -Dict) is det.
%
%   Translate a list of create options into a dict.  If both ask(Ask)
%   and template(Template) are present they are removed from the
%   list, checked to be free of '$VAR'(_) terms and written to strings
%   _together_ by ask_template_to_strings/4; findall/3 is used so that
%   the variable numbering done there does not affect the caller's
%   terms.  The remaining options are converted recursively and the
%   two strings are added under the keys `ask` and `template`.
%   Otherwise each option is mapped through prolog_option/2 and the
%   result is handed to dict_create/3.

options_to_dict(Options, Dict) :-
    select_option(ask(Ask), Options, NoAsk),
    select_option(template(Template), NoAsk, Others),
    !,
    no_numbered_var_in(Ask+Template),
    findall(AskText-TemplateText,
            ask_template_to_strings(Ask, Template, AskText, TemplateText),
            [ AskString-TemplateString ]),
    options_to_dict(Others, OthersDict),
    put_dict(_{ask:AskString, template:TemplateString}, OthersDict, Dict).
options_to_dict(Options, Dict) :-
    maplist(prolog_option, Options, Translated),
    dict_create(Dict, _, Translated).
1726
%!  no_numbered_var_in(+Term) is det.
%
%   Succeeds if Term has no sub term of the shape '$VAR'(_).
%
%   @error domain_error(numbered_vars_free_term, Term) if such a sub
%          term exists.

no_numbered_var_in(Term) :-
    (   sub_term(Sub, Term),
        subsumes_term('$VAR'(_), Sub)
    ->  domain_error(numbered_vars_free_term, Term)
    ;   true
    ).
1733
%!  ask_template_to_strings(+Ask, +Template, -AskString, -TemplateString)
%
%   Write Ask and Template to strings such that shared variables get
%   the same name in both.  The variables of Ask+Template are numbered
%   with numbervars/3 (this binds them), after which both terms are
%   written quoted, ignoring operators, separated by a newline.  The
%   result is split on that newline and must consist of exactly two
%   parts.

ask_template_to_strings(Ask, Template, AskString, TemplateString) :-
    numbervars(Ask+Template, 0, _),
    WriteOptions = [ numbervars(true), ignore_ops(true), quoted(true) ],
    format(string(Both), '~W\n~W',
           [ Ask, WriteOptions, Template, WriteOptions ]),
    split_string(Both, "\n", "", Parts),
    Parts = [AskString, TemplateString].
1741
%!  prolog_option(+Option0, -Option) is det.
%
%   If create_option_type/2 classifies Option0 as `term`, Option is
%   the same option with its value written to a string using `~k`
%   (write_canonical/1).  Any other option is passed unchanged.

prolog_option(Option0, Option) :-
    (   create_option_type(Option0, term)
    ->  Option0 =.. [Name, Value],
        format(string(Text), '~k', [Value]),
        Option =.. [Name, Text]
    ;   Option = Option0
    ).
1749
%!  create_option_type(?Option, ?Type) is nondet.
%
%   Table that classifies create options as holding a `term` or an
%   `atom`.  Options that are not listed have no type.

create_option_type(ask(_),         term).
create_option_type(template(_),    term).
create_option_type(application(_), atom).
1753
%!  remote_pengine_send(+BaseURL, +ID, +Event, +Options) is det.
%
%   Submit Event to pengine ID on the server BaseURL using the `send`
%   action and forward the server's reply to the calling thread.

remote_pengine_send(BaseURL, ID, Event, Options) :-
    Params = [event=Event],
    remote_send_rec(BaseURL, send, ID, Params, Reply, Options),
    thread_self(Me),
    pengine_reply(Me, Reply).
1758
%!  remote_pengine_pull_response(+BaseURL, +ID, +Options) is det.
%
%   Run the `pull_response` action for pengine ID on the server
%   BaseURL and forward the server's reply to the calling thread.

remote_pengine_pull_response(BaseURL, ID, Options) :-
    NoParams = [],
    remote_send_rec(BaseURL, pull_response, ID, NoParams, Reply, Options),
    thread_self(Me),
    pengine_reply(Me, Reply).
1763
%!  remote_pengine_abort(+BaseURL, +ID, +Options) is det.
%
%   Run the `abort` action for pengine ID on the server BaseURL and
%   forward the server's reply to the calling thread.

remote_pengine_abort(BaseURL, ID, Options) :-
    NoParams = [],
    remote_send_rec(BaseURL, abort, ID, NoParams, Reply, Options),
    thread_self(Me),
    pengine_reply(Me, Reply).
1768
1773
%!  remote_send_rec(+Server, +Action, +ID, +Params, -Reply, +Options)
%
%   Perform Action for pengine ID on Server and read the Prolog reply.
%   If Params is exactly `[event=Event]`, the event is POSTed as a
%   Prolog term and only the id is put in the query string.  In all
%   other cases Params are added to the query string of a plain
%   request.  The connection is opened first; call_cleanup/2 then
%   guarantees the stream is closed after reading the reply.

remote_send_rec(Server, Action, ID, [event=Event], Reply, Options) :-
    !,
    server_url(Server, Action, [id=ID], URL),
    PostOptions = [post(prolog(Event))|Options],
    http_open(URL, In, PostOptions),
    call_cleanup(read_prolog_reply(In, Reply),
                 close(In)).
remote_send_rec(Server, Action, ID, Params, Reply, Options) :-
    Query = [id=ID|Params],
    server_url(Server, Action, Query, URL),
    http_open(URL, In, Options),
    call_cleanup(read_prolog_reply(In, Reply),
                 close(In)).
1790
%!  remote_post_rec(+Server, +Action, +Data, -Reply, +Options)
%
%   POST Data as JSON to the URL for Action on Server and read the
%   Prolog reply.  probe/3 is called on the URL before the actual
%   request is made.

remote_post_rec(Server, Action, Data, Reply, Options) :-
    server_url(Server, Action, [], URL),
    probe(Action, URL, Options),
    PostOptions = [post(json(Data))|Options],
    http_open(URL, In, PostOptions),
    call_cleanup(read_prolog_reply(In, Reply),
                 close(In)).
1801
1807
%!  probe(+Action, +URL, +Options) is det.
%
%   For the `create` action, issue an HTTP request with
%   method(options) on URL and close the resulting stream right away.
%   All other actions are not probed.

probe(Action, URL, Options) :-
    (   Action = create
    ->  http_open(URL, In, [method(options)|Options]),
        close(In)
    ;   true
    ).
1813
%!  read_prolog_reply(+In, -Reply) is det.
%
%   Read a single term from stream In, which is first switched to
%   UTF-8 encoding, and post-process it with rebind_cycles/2.

read_prolog_reply(In, Reply) :-
    set_stream(In, encoding(utf8)),
    read(In, Raw),
    rebind_cycles(Raw, Reply).
1818
%!  rebind_cycles(+Reply0, -Reply) is det.
%
%   If Reply0 has the shape @(Reply, Bindings) where Bindings is a
%   proper list, each element `Var = Value` of Bindings is executed
%   as a unification and Reply is the first argument.  Any other term
%   is passed unchanged.

rebind_cycles(Reply0, Reply) :-
    (   Reply0 = @(Reply, Bindings),
        is_list(Bindings)
    ->  maplist(bind, Bindings)
    ;   Reply = Reply0
    ).

%!  bind(+Binding) is semidet.
%
%   Binding is a term `Var = Value`; unify both sides.

bind(Value = Value).
1827
%!  server_url(+Server, +Action, +Params, -URL) is det.
%
%   URL is Server edited (uri_edit/3) to have the path
%   `pengine/<Action>` and the query string Params.

server_url(Server, Action, Params, URL) :-
    atom_concat('pengine/', Action, Path),
    Edits = [path(Path), search(Params)],
    uri_edit(Edits, Server, URL).
1833
1834
1852
%!  pengine_event(?Event) is det.
%
%   Same as pengine_event/2 with an empty option list.

pengine_event(Event) :-
    NoOptions = [],
    pengine_event(Event, NoOptions).
1855
%!  pengine_event(?Event, +Options) is det.
%
%   Fetch the next pengine_event(Id, Event) message from the message
%   queue of the calling thread.  The listen(Id) option restricts the
%   pengine to listen to; without it any pengine matches.  Options is
%   also passed to thread_get_message/3; if that call fails, Event is
%   unified with `timeout`.  The event is subsequently processed by
%   update_remote_destroy/1.

pengine_event(Event, Options) :-
    thread_self(Me),
    option(listen(Source), Options, _),
    (   thread_get_message(Me, pengine_event(Source, Event), Options)
    ->  true
    ;   Event = timeout
    ),
    update_remote_destroy(Event).
1864
%!  update_remote_destroy(+Event) is det.
%
%   If Event is a destroy event (destroy_event/1) whose first argument
%   is a pengine known to pengine_remote/2, unregister that pengine
%   with pengine_unregister_remote/1.  Otherwise do nothing.

update_remote_destroy(Event) :-
    (   destroy_event(Event),
        arg(1, Event, Id),
        pengine_remote(Id, _Server)
    ->  pengine_unregister_remote(Id)
    ;   true
    ).
1872
%!  destroy_event(+Event) is semidet.
%
%   True if Event is destroy/1 or destroy/2, or a create/2 event
%   whose feature list has an instantiated answer(Answer) that is
%   itself a destroy event.

destroy_event(destroy(_)).
destroy_event(destroy(_, _)).
destroy_event(create(_, FeatureList)) :-
    memberchk(answer(FirstAnswer), FeatureList),
    !,
    nonvar(FirstAnswer),
    destroy_event(FirstAnswer).
1880
1881
1907
%!  pengine_event_loop(:Closure, +Options) is det.
%
%   While at least one child/2 fact exists, wait for the next event
%   and process it using pengine_event_loop/3.  With the option
%   autoforward(all) each event is first sent to all children using
%   pengine_send/2.  Succeeds immediately if there are no children.

pengine_event_loop(Closure, Options) :-
    (   child(_, _)
    ->  pengine_event(Event),
        (   option(autoforward(all), Options)
        ->  forall(child(_, Child), pengine_send(Child, Event))
        ;   true
        ),
        pengine_event_loop(Event, Closure, Options)
    ;   true
    ).
1918
:- meta_predicate pengine_process_event(+, 1, -, +).

%!  pengine_event_loop(+Event, :Closure, +Options) is det.
%
%   Process Event with pengine_process_event/4 and re-enter
%   pengine_event_loop/2 if that reports `true` for Continue.

pengine_event_loop(Event, Closure, Options) :-
    pengine_process_event(Event, Closure, KeepGoing, Options),
    (   KeepGoing \== true
    ->  true
    ;   pengine_event_loop(Closure, Options)
    ).
1928
%!  pengine_process_event(+Event, :Closure, -Continue, +Options)
%
%   Dispatch a single pengine event.  Each clause logs the state
%   transition on the debug topic pengine(transition) and calls
%   Closure with one extra argument describing the event.  For most
%   events the result of Closure is ignored.  The exceptions are:
%
%     - create(ID, Features): an answer(First) feature is removed
%       before calling Closure and First is then processed as an
%       event of its own.
%     - output/2 and debug/2: after calling Closure the next response
%       is requested using pengine_pull_response/2.
%     - success/5: Closure is called with success(ID, Sol, More).
%     - error(ID, Error): if Closure does not succeed, all children
%       are destroyed and Error is rethrown.
%     - destroy(ID, Event): Event is processed first, then
%       destroy(ID).
%     - destroy(ID): the child/2 facts for ID are removed.

pengine_process_event(create(ID, Features), Closure, Continue, Options) :-
    debug(pengine(transition), '~q: 1 = /~q => 2', [ID, create(Features)]),
    (   select(answer(FirstAnswer), Features, Others)
    ->  ignore(call(Closure, create(ID, Others))),
        pengine_process_event(FirstAnswer, Closure, Continue, Options)
    ;   ignore(call(Closure, create(ID, Features))),
        Continue = true
    ).
pengine_process_event(output(ID, Message), Closure, true, _Options) :-
    debug(pengine(transition), '~q: 3 = /~q => 4', [ID, output(Message)]),
    ignore(call(Closure, output(ID, Message))),
    pengine_pull_response(ID, []).
pengine_process_event(debug(ID, Message), Closure, true, _Options) :-
    debug(pengine(transition), '~q: 3 = /~q => 4', [ID, debug(Message)]),
    ignore(call(Closure, debug(ID, Message))),
    pengine_pull_response(ID, []).
pengine_process_event(prompt(ID, Question), Closure, true, _Options) :-
    debug(pengine(transition), '~q: 3 = /~q => 5', [ID, prompt(Question)]),
    ignore(call(Closure, prompt(ID, Question))).
pengine_process_event(success(ID, Answers, _Proj, _Time, More),
                      Closure, true, _Options) :-
    debug(pengine(transition), '~q: 3 = /~q => 6/2',
          [ID, success(Answers, More)]),
    ignore(call(Closure, success(ID, Answers, More))).
pengine_process_event(failure(ID, _Time), Closure, true, _Options) :-
    debug(pengine(transition), '~q: 3 = /~q => 2', [ID, failure]),
    ignore(call(Closure, failure(ID))).
pengine_process_event(error(ID, Error), Closure, Continue, _Options) :-
    debug(pengine(transition), '~q: 3 = /~q => 2', [ID, error(Error)]),
    (   call(Closure, error(ID, Error))
    ->  Continue = true
    ;   forall(child(_, Pengine), pengine_destroy(Pengine)),
        throw(Error)
    ).
pengine_process_event(stop(ID), Closure, true, _Options) :-
    debug(pengine(transition), '~q: 7 = /~q => 2', [ID, stop]),
    ignore(call(Closure, stop(ID))).
pengine_process_event(destroy(ID, Last), Closure, Continue, Options) :-
    pengine_process_event(Last, Closure, _, Options),
    pengine_process_event(destroy(ID), Closure, Continue, Options).
pengine_process_event(destroy(ID), Closure, true, _Options) :-
    retractall(child(_, ID)),
    debug(pengine(transition), '~q: 1 = /~q => 0', [ID, destroy]),
    ignore(call(Closure, destroy(ID))).
1971
1972
1998
%!  pengine_rpc(+URL, +Query) is nondet.
%
%   Same as pengine_rpc/3 with an empty option list.

pengine_rpc(URL, Query) :-
    NoOptions = [],
    pengine_rpc(URL, Query, NoOptions).
2001
%!  pengine_rpc(+URL, +Query, :Options) is nondet.
%
%   Run Query on a pengine created at the server URL.  The options
%   are first processed by translate_local_sources/3.  If no
%   timeout(_) option is present, one is added from the `time_limit`
%   setting.  The template sent to the server is `v(V1, ...)` over
%   the variables of Query; answers are collected by wait_event/3.
%
%   State is the term destroy(Bool) that tells the cleanup handler
%   pengine_destroy_and_wait/4 whether the pengine still needs to be
%   destroyed.  process_event/4 destructively sets it to
%   destroy(false) when the server reports that the pengine is gone.

pengine_rpc(URL, Query, Module:Options0) :-
    translate_local_sources(Options0, Translated, Module),
    (   option(timeout(_), Translated)
    ->  Options = Translated
    ;   setting(time_limit, DefaultTimeout),
        Options = [timeout(DefaultTimeout)|Translated]
    ),
    term_variables(Query, QueryVars),
    Template =.. [v|QueryVars],
    State = destroy(true),
    CreateOptions = [ ask(Query),
                      template(Template),
                      server(URL),
                      id(Id)
                    | Options
                    ],
    setup_call_catcher_cleanup(
        pengine_create(CreateOptions),
        wait_event(Template, State, [listen(Id)|Options]),
        Catcher,
        pengine_destroy_and_wait(State, Id, Catcher, Options)).
2022
%!  pengine_destroy_and_wait(+State, +Id, +Why, +Options) is det.
%
%   Cleanup handler of pengine_rpc/3.  If State is destroy(true),
%   destroy pengine Id and wait for the destruction to be confirmed
%   (wait_destroy/2 with counter 10).  Otherwise only print a debug
%   message on the topic pengine(rpc).

pengine_destroy_and_wait(State, Id, Why, Options) :-
    (   State = destroy(true)
    ->  debug(pengine(rpc), 'Destroying RPC because of ~p', [Why]),
        pengine_destroy(Id, Options),
        wait_destroy(Id, 10)
    ;   debug(pengine(rpc), 'Not destroying RPC (~p)', [Why])
    ).
2030
%!  wait_destroy(+Id, +N) is det.
%
%   Wait until pengine Id is destroyed.  Succeeds immediately if Id
%   is no longer registered as a child.  Otherwise events for Id are
%   read with a timeout of 10 seconds each.  A destroy event removes
%   the child registration.  Any other event (including `timeout`)
%   decrements N and retries; once N cannot be decremented any
%   further (succ/2 fails on 0), the pengine is unregistered and the
%   child registration is removed.

wait_destroy(Id, _) :-
    \+ child(_, Id),
    !.
wait_destroy(Id, Retries) :-
    pengine_event(Event, [listen(Id), timeout(10)]),
    !,
    (   destroy_event(Event)
    ->  retractall(child(_, Id))
    ;   succ(Fewer, Retries)
    ->  wait_destroy(Id, Fewer)
    ;   debug(pengine(rpc), 'RPC did not answer to destroy ~p', [Id]),
        pengine_unregister_remote(Id),
        retractall(child(_, Id))
    ).
2045
%!  wait_event(?Template, +State, +Options) is nondet.
%
%   Wait for the next event using pengine_event/2, log it on the
%   debug topic pengine(event) and hand it to process_event/4.

wait_event(Template, State, Options) :-
    pengine_event(Received, Options),
    debug(pengine(event), 'Received ~p', [Received]),
    process_event(Received, Template, State, Options).
2050
%!  process_event(+Event, ?Template, +State, +Options) is nondet.
%
%   Handle a single event on behalf of pengine_rpc/3:
%
%     - create/2: continue with the answer(First) feature; fails if
%       there is none.
%     - error/2: rethrow the error.
%     - failure/2: fail.
%     - prompt/2: obtain a reply with pengine_rpc_prompt/3, send it
%       as input and wait for the next event.
%     - output/2 and debug/2: report, pull the next response and wait
%       for the next event.
%     - success/5: enumerate the solutions by unifying them with
%       Template.  If more solutions may exist, the next chunk is
%       requested on backtracking.
%     - destroy/2: remove the child registration, destructively set
%       the first argument of State to `false` and process the
%       embedded event.
%     - success/4: handled as success/5 with an unbound projection.

process_event(create(_ID, Features), Template, State, Options) :-
    memberchk(answer(FirstAnswer), Features),
    process_event(FirstAnswer, Template, State, Options).
process_event(error(_ID, Error), _Template, _State, _Options) :-
    throw(Error).
process_event(failure(_ID, _Time), _Template, _State, _Options) :-
    fail.
process_event(prompt(ID, Question), Template, State, Options) :-
    pengine_rpc_prompt(ID, Question, Answer),
    pengine_send(ID, input(Answer)),
    wait_event(Template, State, Options).
process_event(output(ID, Message), Template, State, Options) :-
    pengine_rpc_output(ID, Message),
    pengine_pull_response(ID, Options),
    wait_event(Template, State, Options).
process_event(debug(ID, Text), Template, State, Options) :-
    debug(pengine(debug), '~w', [Text]),
    pengine_pull_response(ID, Options),
    wait_event(Template, State, Options).
process_event(success(_ID, Answers, _Proj, _Time, false),
              Template, _State, _Options) :-
    !,
    member(Template, Answers).
process_event(success(ID, Answers, _Proj, _Time, true),
              Template, State, Options) :-
    (   member(Template, Answers)
    ;   pengine_next(ID, Options),
        wait_event(Template, State, Options)
    ).
process_event(destroy(ID, Last), Template, State, Options) :-
    !,
    retractall(child(_, ID)),
    nb_setarg(1, State, false),
    debug(pengine(destroy), 'State: ~p~n', [State]),
    process_event(Last, Template, State, Options).
process_event(success(ID, Answers, Time, More),
              Template, State, Options) :-
    process_event(success(ID, Answers, _Proj, Time, More),
                  Template, State, Options).
2091
2092
%!  pengine_rpc_prompt(+ID, +Prompt, -Term) is det.
%
%   Obtain the reply to a prompt event.  The hook prompt/3 is tried
%   first.  If it fails, Term is read from the current input using
%   read/1 while Prompt is temporarily installed as prompt using
%   prompt/2.

pengine_rpc_prompt(ID, Prompt, Term) :-
    (   prompt(ID, Prompt, FromHook)
    ->  Term = FromHook
    ;   setup_call_cleanup(
            prompt(Saved, Prompt),
            read(Term),
            prompt(_, Saved))
    ).
2102
%!  pengine_rpc_output(+ID, +Term) is det.
%
%   Handle an output event.  The hook output/2 is tried first; if it
%   fails Term is printed using print/1.

pengine_rpc_output(ID, Term) :-
    (   output(ID, Term)
    ->  true
    ;   print(Term)
    ).
2108
2113
% Hooks consulted by pengine_rpc_prompt/3 (prompt/3) and
% pengine_rpc_output/2 (output/2) before they fall back to their
% default behaviour.

:- multifile
    prompt/3,
    output/2.

2123
2126
2138
% HTTP interface.  All handlers live below the location alias
% `pengine`, which is declared relative to `root` with priority -100.
% The `create`, `send` and `pull_response` handlers are spawned in
% their own thread and have no time limit.

:- multifile http:location/3.

http:location(pengine, root(pengine), [-100]).

:- http_handler(pengine(.),             http_404([]),
                [ id(pengines) ]).
:- http_handler(pengine(create),        http_pengine_create,
                [ time_limit(infinite), spawn([]) ]).
:- http_handler(pengine(send),          http_pengine_send,
                [ time_limit(infinite), spawn([]) ]).
:- http_handler(pengine(pull_response), http_pengine_pull_response,
                [ time_limit(infinite), spawn([]) ]).
:- http_handler(pengine(abort),         http_pengine_abort,         []).
:- http_handler(pengine(detach),        http_pengine_detach,        []).
:- http_handler(pengine(list),          http_pengine_list,          []).
:- http_handler(pengine(ping),          http_pengine_ping,          []).
:- http_handler(pengine(destroy_all),   http_pengine_destroy_all,   []).

:- http_handler(pengine('pengines.js'),
                http_reply_file(library('http/web/js/pengines.js'), []),
                []).
:- http_handler(pengine('plterm.css'),
                http_reply_file(library('http/web/css/plterm.css'), []),
                []).

2161
2189
%!  http_pengine_create(+Request)
%
%   HTTP handler for `pengine/create`.  The first clause lets
%   reply_options/2 deal with the request if it can.  A request whose
%   content type starts with `application/json` is read as a JSON
%   dict from which `format` (default `prolog`) and `application`
%   (default `pengine_sandbox`) are extracted.  Any other request is
%   decoded using http_parameters/2 and the parameters that were
%   actually provided are collected into a dict by form_dict/2.  Both
%   variations continue with http_pengine_create/4.

http_pengine_create(Request) :-
    reply_options(Request, [post]),
    !.
http_pengine_create(Request) :-
    memberchk(content_type(ContentType), Request),
    sub_atom(ContentType, 0, _, _, 'application/json'),
    !,
    http_read_json_dict(Request, Dict),
    dict_atom_option(format, Dict, Format, prolog),
    dict_atom_option(application, Dict, Application, pengine_sandbox),
    http_pengine_create(Request, Application, Format, Dict).
http_pengine_create(Request) :-
    Optional       = [optional(true)],
    OptionalString = [string|Optional],
    Form = [ format(Format, [default(prolog)]),
             application(Application, [default(pengine_sandbox)]),
             chunk(_, [nonneg;oneof([false]), default(1)]),
             collate(_, [number, default(0)]),
             solutions(_, [oneof([all,chunked]), default(chunked)]),
             ask(_, OptionalString),
             template(_, OptionalString),
             src_text(_, OptionalString),
             disposition(_, OptionalString),
             src_url(_, Optional)
           ],
    http_parameters(Request, Form),
    form_dict(Form, Dict),
    http_pengine_create(Request, Application, Format, Dict).
2218
%!  dict_atom_option(+Key, +Dict, -Atom, +Default) is det.
%
%   Atom is the value for Key in Dict converted to an atom using
%   atom_string/2, or Default if Dict has no such key.

dict_atom_option(Key, Dict, Atom, Default) :-
    (   get_dict(Key, Dict, Value)
    ->  atom_string(Atom, Value)
    ;   Atom = Default
    ).
2224
%!  form_dict(+Form, -Dict) is det.
%
%   Dict holds the Name-Value pairs that form_values/2 extracts from
%   the http_parameters/2 specification Form.

form_dict(Form, Dict) :-
    form_values(Form, NameValues),
    dict_pairs(Dict, _Tag, NameValues).
2228
%!  form_values(+Form, -Pairs) is det.
%
%   Pairs is a list Name-Value holding, for every element of Form
%   whose first argument is instantiated, the functor name of the
%   element and that first argument.  Elements whose first argument
%   is unbound are skipped.

form_values([], []).
form_values([Field|Fields], Pairs) :-
    (   arg(1, Field, Value),
        nonvar(Value)
    ->  functor(Field, Name, _),
        Pairs = [Name-Value|Rest]
    ;   Pairs = Rest
    ),
    form_values(Fields, Rest).
2239
2241
2242
%!  http_pengine_create(+Request, +Application, +Format, +Dict)
%
%   Create a pengine for an HTTP client.  If Application is a known
%   application (current_application/1), the request is checked by
%   allowed/2 and authenticate/3, a new pengine id and an output
%   message queue (at most 25 messages) are created and registered in
%   pengine_queue/4, the creation is broadcasted and the pengine is
%   started using create/5.  The first result is then sent to the
%   client and abandoned queues are garbage collected.  If
%   Application does not exist an existence error is sent to the
%   client under a fresh id.

http_pengine_create(Request, Application, Format, Dict) :-
    (   current_application(Application)
    ->  allowed(Request, Application),
        authenticate(Request, Application, UserOptions),
        dict_to_options(Dict, Application, DictOptions),
        append(UserOptions, DictOptions, CreateOptions),
        pengine_uuid(Pengine),
        message_queue_create(Queue, [max_size(25)]),
        setting(Application:time_limit, TimeLimit),
        get_time(Created),
        asserta(pengine_queue(Pengine, Queue, TimeLimit, Created)),
        broadcast(pengine(create(Pengine, Application, CreateOptions))),
        create(Queue, Pengine, CreateOptions, http, Application),
        create_wait_and_output_result(Pengine, Queue, Format,
                                      TimeLimit, Dict),
        gc_abandoned_queues
    ;   Formal = existence_error(pengine_application, Application),
        pengine_uuid(ID),
        output_result(ID, Format, error(ID, error(Formal, _)))
    ).
2264
2265
%!  dict_to_options(+Dict, +Application, -CreateOptions) is det.
%
%   Translate the key-value pairs of Dict into a list of create
%   options using pairs_create_options/3.

dict_to_options(Dict, Application, CreateOptions) :-
    dict_pairs(Dict, _Tag, KeyValues),
    pairs_create_options(KeyValues, Application, CreateOptions).
2269
%!  pairs_create_options(+Pairs, +Application, -Options) is det.
%
%   Translate Name-Value pairs into create options.  A pair is kept
%   if Name(Value) is accepted by pengine_create_option/1 and Name is
%   not `user`; all other pairs are dropped.  Values of options that
%   create_option_type/2 classifies as `atom` are converted to an
%   atom; other values are passed as they are.

pairs_create_options([], _, []) :- !.
pairs_create_options([Name-Raw|Pairs], App, [Option|Options]) :-
    Option =.. [Name, Value],
    pengine_create_option(Option),
    Name \== user,
    !,
    (   create_option_type(Option, atom)
    ->  atom_string(Value, Raw)
    ;   Value = Raw
    ),
    pairs_create_options(Pairs, App, Options).
pairs_create_options([_Dropped|Pairs], App, Options) :-
    pairs_create_options(Pairs, App, Options).
2282
2291
%!  wait_and_output_result(+Pengine, +Queue, +Format, +TimeLimit, +Collate0)
%
%   Wait for the next event on Queue and send it to the HTTP client
%   in Format.
%
%     - If no event arrives within TimeLimit seconds,
%       time_limit_exceeded/2 is called.
%     - If waiting raises an exception, died(Pengine) is sent.
%     - If the event is a collating event (see collating_event/2),
%       up to 100 further events are collected with collect_events/6
%       and the whole list is sent.
%     - Otherwise the single event is sent.
%
%   The collate time is the minimum of Collate0 and TimeLimit/10.
%   Before producing output, destroy_queue_from_http/3 gets a chance
%   to destroy Queue; its failure is ignored.

wait_and_output_result(Pengine, Queue, Format, TimeLimit, Collate0) :-
    Collate is min(Collate0, TimeLimit/10),
    get_time(Started),
    (   catch(thread_get_message(Queue, pengine_event(_, First),
                                 [ timeout(TimeLimit) ]),
              Caught, true)
    ->  (   var(Caught)
        ->  debug(pengine(wait), 'Got ~q from ~q', [First, Queue]),
            (   collating_event(Collate, First)
            ->  Deadline is Started+TimeLimit,
                collect_events(Pengine, Collate, Queue, Deadline, 100, Later),
                All = [First|Later],
                ignore(destroy_queue_from_http(Pengine, All, Queue)),
                protect_pengine(Pengine, output_result(Pengine, Format, All))
            ;   ignore(destroy_queue_from_http(Pengine, First, Queue)),
                protect_pengine(Pengine, output_result(Pengine, Format, First))
            )
        ;   output_result(Pengine, Format, died(Pengine))
        )
    ;   time_limit_exceeded(Pengine, Format)
    ).
2314
2319
%!  collect_events(+Pengine, +Collate, +Queue, +Deadline, +Max, -Events)
%
%   Collect at most Max further events from Queue, waiting at most
%   Collate seconds for each of them.  Collection continues as long
%   as the events are collating events (collating_event/2); the first
%   other event ends the list.  If waiting raises an exception the
%   list is `[died(Pengine)]`.  If waiting times out, the list is
%   empty unless the current time is beyond Deadline, in which case
%   it holds the event produced by time_limit_event/2.

collect_events(_Pengine, _Collate, _Queue, _Deadline, 0, []) :-
    !.
collect_events(Pengine, Collate, Queue, Deadline, Max, Events) :-
    debug(pengine(wait), 'Waiting to collate events', []),
    (   catch(thread_get_message(Queue, pengine_event(_, Next),
                                 [ timeout(Collate) ]),
              Caught, true)
    ->  (   var(Caught)
        ->  debug(pengine(wait), 'Got ~q from ~q', [Next, Queue]),
            Events = [Next|Later],
            (   collating_event(Collate, Next)
            ->  Left is Max - 1,
                collect_events(Pengine, Collate, Queue, Deadline, Left, Later)
            ;   Later = []
            )
        ;   Events = [died(Pengine)]
        )
    ;   get_time(Now),
        Now > Deadline
    ->  time_limit_event(Pengine, Expired),
        Events = [Expired]
    ;   Events = []
    ).
2344
%!  collating_event(+Collate, +Event) is semidet.
%
%   True if Event is an output/2 event and Collate does not unify
%   with the integer 0, i.e., collation is enabled.

collating_event(Collate, output(_, _)) :-
    Collate \= 0.
2349
2356
%!  create_wait_and_output_result(+Pengine, +Queue, +Format, +TimeLimit, +Dict)
%
%   Send the first result after creating a pengine.  If Dict has
%   `solutions` set to `all`, results are streamed as numbered pages:
%   between/3 drives a failure-driven loop in which each event is
%   sent as page(Page, Event).  While the event indicates that more
%   solutions exist (is_more_event/1), `next` is requested from the
%   pengine thread and the loop continues.  The loop is left by a cut
%   when the queue could be destroyed, when the event is final, when
%   waiting raised an exception (died(Pengine) is sent) or when
%   waiting timed out (time_limit_exceeded/2).
%
%   In all other cases wait_and_output_result/5 is called, using the
%   `collate` key of Dict (default 0) as collate time.

create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, Dict) :-
    get_dict(solutions, Dict, all),
    !,
    between(1, infinite, PageNo),
    (   catch(thread_get_message(Queue, pengine_event(_, Got),
                                 [ timeout(TimeLimit) ]),
              Caught, true)
    ->  (   var(Caught)
        ->  debug(pengine(wait), 'Page ~D: got ~q from ~q',
                  [PageNo, Got, Queue]),
            (   destroy_queue_from_http(Pengine, Got, Queue)
            ->  !,
                protect_pengine(Pengine,
                                output_result_2(Format, page(PageNo, Got), Dict))
            ;   is_more_event(Got)
            ->  pengine_thread(Pengine, Worker),
                thread_send_message(Worker, pengine_request(next)),
                protect_pengine(Pengine,
                                output_result_2(Format, page(PageNo, Got), Dict)),
                fail
            ;   !,
                protect_pengine(Pengine,
                                output_result_2(Format, page(PageNo, Got), Dict))
            )
        ;   !, output_result(Pengine, Format, died(Pengine))
        )
    ;   !, time_limit_exceeded(Pengine, Format)
    ),
    !.
create_wait_and_output_result(Pengine, Queue, Format, TimeLimit, Dict) :-
    wait_and_output_result(Pengine, Queue, Format, TimeLimit,
                           Dict.get(collate,0)).
2388
%!  is_more_event(+Event) is semidet.
%
%   True if Event is a success/5 event whose last argument is `true`,
%   or a create/2 event whose answer(Event) feature is such an event.

is_more_event(success(_Id, _Answers, _Projection, _Time, true)).
is_more_event(create(_Id, Features)) :-
    memberchk(answer(FirstAnswer), Features),
    is_more_event(FirstAnswer).
2393
2394
2395
2405
%!  time_limit_exceeded(+Pengine, +Format)
%
%   Forcefully destroy Pengine and, as cleanup of that call, send the
%   event defined by time_limit_event/2 to the client in Format.

time_limit_exceeded(Pengine, Format) :-
    time_limit_event(Pengine, Expired),
    call_cleanup(pengine_destroy(Pengine, [force(true)]),
                 output_result(Pengine, Format, Expired)).
2411
%!  time_limit_event(?Pengine, ?Event) is det.
%
%   Event is the term that tells a client that Pengine was destroyed
%   due to `time_limit_exceeded`.

time_limit_event(Pengine, Event) :-
    Event = destroy(Pengine, error(Pengine, time_limit_exceeded)).
2414
%!  destroy_pengine_after_output(+Pengine, +Events) is det.
%
%   If Events is a list whose last element is the time limit event
%   for Pengine (time_limit_event/2), forcefully destroy Pengine.
%   Failure of, as well as error(_,_) exceptions from, the destroy
%   call are suppressed.  In all other cases nothing is done.

destroy_pengine_after_output(Pengine, Events) :-
    (   is_list(Events),
        last(Events, Final),
        time_limit_event(Pengine, Final)
    ->  catch(ignore(pengine_destroy(Pengine, [force(true)])),
              error(_, _),
              true)
    ;   true
    ).
2422
2423
2435
%!  destroy_queue_from_http(+ID, +Event, +Queue) is semidet.
%
%   Called from the HTTP side before sending Event.  If Queue is
%   registered in output_queue/3 for ID it is destroyed if it holds
%   no more messages.  Otherwise, if Event is a destroy event
%   (is_destroy_event/1), sync_destroy_queue_from_http/2 is called
%   while holding the mutex `pengine`.  Fails if neither applies.

destroy_queue_from_http(ID, Event, Queue) :-
    (   output_queue(ID, Queue, _)
    ->  destroy_queue_if_empty(Queue)
    ;   debug(pengine(destroy), 'DESTROY? ~p', [Event]),
        is_destroy_event(Event)
    ->  message_queue_property(Queue, size(Pending)),
        debug(pengine(destroy), 'Destroy ~p (waiting ~D)', [Queue, Pending]),
        with_mutex(pengine, sync_destroy_queue_from_http(ID, Queue))
    ).
2447
%!  is_destroy_event(+Event) is semidet.
%
%   True if Event is destroy/1 or destroy/2, or a create/2 event
%   whose answer(Event) feature is a destroy event.

is_destroy_event(destroy(_)).
is_destroy_event(destroy(_, _)).
is_destroy_event(create(_, Features)) :-
    memberchk(answer(FirstAnswer), Features),
    is_destroy_event(FirstAnswer).
2453
%!  destroy_queue_if_empty(+Queue) is det.
%
%   If Queue still holds a message, leave it alone.  Otherwise remove
%   its output_queue/3 registration and destroy it.

destroy_queue_if_empty(Queue) :-
    (   thread_peek_message(Queue, _)
    ->  true
    ;   retractall(output_queue(_, Queue, _)),
        message_queue_destroy(Queue)
    ).
2460
2466
:- dynamic last_gc/1.

%!  gc_abandoned_queues is det.
%
%   If consider_queue_gc/0 agrees, destroy all queues registered in
%   output_queue/3 for more than 15 minutes and record the time of
%   this run in last_gc/1.  Otherwise do nothing.

gc_abandoned_queues :-
    (   consider_queue_gc
    ->  get_time(Now),
        (   output_queue(_, Queue, Registered),
            Now-Registered > 15*60,
            retract(output_queue(_, Queue, Registered)),
            message_queue_destroy(Queue),
            fail                        % failure-driven loop
        ;   retractall(last_gc(_)),
            asserta(last_gc(Now))
        )
    ;   true
    ).
2483
%!  consider_queue_gc is semidet.
%
%   True if output_queue/3 has more than 100 clauses and the last
%   garbage collection either never happened or happened more than 5
%   minutes ago.

consider_queue_gc :-
    predicate_property(output_queue(_, _, _), number_of_clauses(Count)),
    Count > 100,
    (   last_gc(Previous),
        get_time(Now),
        Now-Previous > 5*60
    ->  true
    ;   \+ last_gc(_)
    ).
2493
2509
:- dynamic output_queue_destroyed/1.

%!  sync_destroy_queue_from_http(+ID, +Queue) is det.
%
%   Decide what to do with Queue after a destroy event was seen on
%   the HTTP side:
%
%     - If Queue is registered in output_queue/3, destroy it if it is
%       empty.
%     - Else, if an output/2 event is still waiting in Queue, delay
%       the destruction by registering Queue in output_queue/3 with
%       the current time.
%     - Else destroy Queue and record this in
%       output_queue_destroyed/1.

sync_destroy_queue_from_http(ID, Queue) :-
    (   output_queue(ID, Queue, _)
    ->  destroy_queue_if_empty(Queue)
    ;   thread_peek_message(Queue, pengine_event(_, output(_, _)))
    ->  debug(pengine(destroy),
              'Delay destruction of ~p because of output',
              [Queue]),
        get_time(Stamp),
        asserta(output_queue(ID, Queue, Stamp))
    ;   message_queue_destroy(Queue),
        asserta(output_queue_destroyed(Queue))
    ).
2523
2528
%!  sync_destroy_queue_from_pengine(+ID, +Queue) is det.
%
%   If Queue was recorded in output_queue_destroyed/1, remove that
%   record.  Otherwise register Queue in output_queue/3 with the
%   current time.  In both cases the pengine_queue/4 registration for
%   ID and Queue is removed.

sync_destroy_queue_from_pengine(ID, Queue) :-
    (   retract(output_queue_destroyed(Queue))
    ->  true
    ;   get_time(Stamp),
        asserta(output_queue(ID, Queue, Stamp))
    ),
    retractall(pengine_queue(ID, Queue, _, _)).
2536
2537
%!  http_pengine_send(+Request)
%
%   HTTP handler for `pengine/send`.  The event is read using
%   read_event/5.  If that succeeds and the pengine has a thread, the
%   event is broadcasted, sent to that thread as pengine_request(Event)
%   and the next result is returned using wait_and_output_result/5.
%   Without a thread, pengine_died/2 is used if the id is an atom and
%   a 404 page is returned otherwise.  If reading raised an
%   existence error on the pengine, pengine_died/2 is used; any other
%   exception is reported to the client as an error event.

http_pengine_send(Request) :-
    reply_options(Request, [get,post]),
    !.
http_pengine_send(Request) :-
    http_parameters(Request,
                    [ id(ID, [ type(atom) ]),
                      event(EventText, [optional(true)]),
                      collate(Collate, [number, default(0)]),
                      format(Format, [default(prolog)])
                    ]),
    catch(read_event(ID, Request, Format, EventText, Event),
          Caught,
          true),
    (   var(Caught)
    ->  debug(pengine(event), 'HTTP send: ~p', [Event]),
        (   pengine_thread(ID, Worker)
        ->  pengine_queue(ID, Queue, TimeLimit, _),
            random_delay,
            broadcast(pengine(send(ID, Event))),
            thread_send_message(Worker, pengine_request(Event)),
            wait_and_output_result(ID, Queue, Format, TimeLimit, Collate)
        ;   atom(ID)
        ->  pengine_died(Format, ID)
        ;   http_404([], Request)
        )
    ;   Caught = error(existence_error(pengine, ID), _)
    ->  pengine_died(Format, ID)
    ;   output_result(ID, Format, error(ID, Caught))
    ).
2567
%!  pengine_died(+Format, +Pengine) is det.
%
%   Tell the client, in Format, that Pengine does not exist by
%   sending an existence_error(pengine, Pengine) error event.

pengine_died(Format, Pengine) :-
    Formal = existence_error(pengine, Pengine),
    output_result(Pengine, Format, error(Pengine, error(Formal, _))).
2571
2572
2580
%!  read_event(+Pengine, +Request, +Format, +EventString, -Event) is det.
%
%   Read the event that was sent to Pengine.  The event is read by
%   read_event_2/5 in the module of the pengine while the pengine is
%   guarded by protect_pengine/2, and is then adjusted by
%   fix_bindings/4.  If reading fails, the POST data is discarded and
%   an existence error is raised.
%
%   @error existence_error(pengine, Pengine)

read_event(Pengine, Request, Format, EventString, Event) :-
    (   protect_pengine(
            Pengine,
            ( get_pengine_module(Pengine, Module),
              read_event_2(Request, EventString, Module, Raw, Bindings)
            ))
    ->  fix_bindings(Format, Raw, Bindings, Event)
    ;   debug(pengine(event), 'Pengine ~q vanished', [Pengine]),
        discard_post_data(Request),
        existence_error(pengine, Pengine)
    ).
2593
2594
2599
%!  read_event_2(+Request, +EventString, +Module, -Event, -Bindings)
%
%   Read Event and its variable bindings in the context of Module.
%   If EventString is instantiated, the event is parsed from that
%   string.  Otherwise the request must be a POST request and the
%   event is read from its body as `application/x-prolog`.

read_event_2(Request, EventString, Module, Event, Bindings) :-
    (   nonvar(EventString)
    ->  term_string(Event, EventString,
                    [ variable_names(Bindings),
                      module(Module)
                    ])
    ;   option(method(post), Request),
        http_read_data(Request, Event,
                       [ content_type('application/x-prolog'),
                         module(Module),
                         variable_names(Bindings)
                       ])
    ).
2614
2618
%!  discard_post_data(+Request) is det.
%
%   If Request is a POST request, read its data into a null stream.
%   Other requests are left untouched.

discard_post_data(Request) :-
    (   option(method(post), Request)
    ->  setup_call_cleanup(
            open_null_stream(Sink),
            http_read_data(Request, _, [to(stream(Sink))]),
            close(Sink))
    ;   true
    ).
2627
2633
%!  fix_bindings(+Format, +Event0, +Bindings, -Event) is det.
%
%   For an ask(Goal, Options) event in a JSON format (json_lang/1),
%   replace the options by a list that starts with
%
%     - template(Template): the template from the options or, if
%       there is none, the default created by template/4 from the
%       named bindings;
%     - chunk(Paging): the chunk option, defaulting to 1;
%     - bindings(NamedBindings): Bindings without the variables
%       accepted by anon/1.
%
%   Any other event is returned unchanged.

fix_bindings(Format,
             ask(Goal, Options0), Bindings,
             ask(Goal, NewOptions)) :-
    json_lang(Format),
    !,
    exclude(anon, Bindings, Named),
    template(Named, Template, Options0, NoTemplate),
    select_option(chunk(Paging), NoTemplate, Others, 1),
    NewOptions = [ template(Template),
                   chunk(Paging),
                   bindings(Named)
                 | Others
                 ].
fix_bindings(_Format, Event, _Bindings, Event).
2648
%!  template(+Bindings, -Template, +Options0, -Options) is det.
%
%   If Options0 has a template(Template) option, extract it and
%   return the remaining options.  Otherwise Options is Options0 and
%   Template is a dict with tag `swish_default_template` created from
%   Bindings.

template(Bindings, Template, Options0, Options) :-
    (   select_option(template(Template), Options0, Options)
    ->  true
    ;   Options = Options0,
        dict_create(Template, swish_default_template, Bindings)
    ).
2654
%!  anon(+Binding) is semidet.
%
%   True if Binding is `Name=_` where Name starts with an underscore
%   that is followed by a character that may start a Prolog variable
%   (char_type/2 type `prolog_var_start`).

anon(Name=_Value) :-
    sub_atom(Name, 0, 1, _, '_'),
    sub_atom(Name, 1, 1, _, Second),
    char_type(Second, prolog_var_start).
2659
%!  var_name(+Binding, -Name) is det.
%
%   Name is the left-hand side of the binding `Name=Value`.

var_name(Binding, Name) :-
    Binding = (Name = _).
2661
2662
2666
%!  json_lang(+Format) is semidet.
%
%   True if Format is `json` or an atom that starts with `json-`.

json_lang(Format) :-
    (   Format = json
    ->  true
    ;   sub_atom(Format, 0, _, _, 'json-')
    ).
2670
2675
%!  http_pengine_pull_response(+Request)
%
%   HTTP handler for `pengine/pull_response`.  After reattach/1, the
%   queue to read from is taken from pengine_queue/4 or, if the
%   pengine has no such registration, from output_queue/3 using a
%   time limit of 0.  Replies with a 404 page if neither is known.

http_pengine_pull_response(Request) :-
    reply_options(Request, [get]),
    !.
http_pengine_pull_response(Request) :-
    http_parameters(Request,
                    [ id(ID, []),
                      format(Format, [default(prolog)]),
                      collate(Collate, [number, default(0)])
                    ]),
    reattach(ID),
    (   (   pengine_queue(ID, Queue, Limit, _)
        ->  true
        ;   output_queue(ID, Queue, _),
            Limit = 0
        )
    ->  wait_and_output_result(ID, Queue, Format, Limit, Collate)
    ;   http_404([], Request)
    ).
2694
2701
%!  http_pengine_abort(+Request)
%
%   HTTP handler for `pengine/abort`.  If the pengine has a thread,
%   the abort is broadcasted, pending output for the pengine is
%   aborted, the pengine itself is aborted and `true` is returned as
%   JSON.  Otherwise a 404 page is returned.

http_pengine_abort(Request) :-
    reply_options(Request, [get,post]),
    !.
http_pengine_abort(Request) :-
    http_parameters(Request, [ id(ID, []) ]),
    (   pengine_thread(ID, _Worker)
    ->  broadcast(pengine(abort(ID))),
        abort_pending_output(ID),
        pengine_abort(ID),
        reply_json_dict(true)
    ;   http_404([], Request)
    ).
2716
2726
%!  http_pengine_detach(+Request)
%
%   HTTP handler for `pengine/detach`.  The JSON body is kept as
%   client data.  If the pengine exists and the request passes
%   allowed/2 and authenticate/3 for its application, the detach is
%   broadcasted, the client data extended with the current time is
%   stored in pengine_detached/2, the size limit of the output queue
%   is raised to 1000 and a detached(ID) event is placed in that
%   queue.  Replies `true` as JSON, or a 404 page if the conditions
%   are not met.

http_pengine_detach(Request) :-
    reply_options(Request, [post]),
    !.
http_pengine_detach(Request) :-
    http_parameters(Request, [ id(ID, []) ]),
    http_read_json_dict(Request, ClientData),
    (   pengine_property(ID, application(Application)),
        allowed(Request, Application),
        authenticate(Request, Application, _UserOptions)
    ->  broadcast(pengine(detach(ID))),
        get_time(DetachedAt),
        put_dict(time, ClientData, DetachedAt, Data),
        assertz(pengine_detached(ID, Data)),
        pengine_queue(ID, Queue, _TimeLimit, _Created),
        message_queue_set(Queue, max_size(1000)),
        pengine_reply(Queue, detached(ID)),
        reply_json_dict(true)
    ;   http_404([], Request)
    ).
2747
%!  reattach(+ID) is det.
%
%   If pengine ID is registered as detached, remove that registration
%   and set the size limit of its output queue to 25.  Succeeds
%   without doing anything otherwise.

reattach(ID) :-
    (   retract(pengine_detached(ID, _ClientData)),
        pengine_queue(ID, Queue, _TimeLimit, _Created)
    ->  message_queue_set(Queue, max_size(25))
    ;   true
    ).
2754
2755
2760
%!  http_pengine_destroy_all(+Request)
%
%   HTTP handler for `pengine/destroy_all`.  The parameter `ids` is a
%   comma-separated list of pengine ids.  Each of these that is not
%   registered in pengine_detached/2 is forcefully destroyed.  Replies
%   the JSON string "ok".

http_pengine_destroy_all(Request) :-
    reply_options(Request, [get,post]),
    !.
http_pengine_destroy_all(Request) :-
    http_parameters(Request, [ ids(Joined, []) ]),
    atomic_list_concat(IDs, ',', Joined),
    forall(( member(ID, IDs),
             \+ pengine_detached(ID, _)
           ),
           pengine_destroy(ID, [force(true)])),
    reply_json_dict("ok").
2774
2780
%!  http_pengine_ping(+Request)
%
%   HTTP handler for `pengine/ping`.  Replies ping(Pengine, Stats) if
%   the pengine has a thread for which thread_statistics/2 succeeds
%   without raising an error(_,_) exception, and died(Pengine)
%   otherwise.

http_pengine_ping(Request) :-
    reply_options(Request, [get]),
    !.
http_pengine_ping(Request) :-
    http_parameters(Request,
                    [ id(Pengine, []),
                      format(Format, [default(prolog)])
                    ]),
    (   pengine_thread(Pengine, Worker),
        AnyError = error(_, _),
        catch(thread_statistics(Worker, Stats), AnyError, fail)
    ->  Reply = ping(Pengine, Stats)
    ;   Reply = died(Pengine)
    ),
    output_result(Pengine, Format, Reply).
2795
2802
%!  http_pengine_list(+Request)
%
%   HTTP handler for `pengine/list`.  After the request passed
%   allowed/2 and authenticate/3 for the application (default
%   `pengine_sandbox`), replies a JSON object whose `pengines` key
%   holds the descriptions produced by listed_pengine/3.  The only
%   supported (and default) value for `status` is `detached`.

http_pengine_list(Request) :-
    reply_options(Request, [get]),
    !.
http_pengine_list(Request) :-
    http_parameters(Request,
                    [ status(Status, [default(detached), oneof([detached])]),
                      application(Application, [default(pengine_sandbox)])
                    ]),
    allowed(Request, Application),
    authenticate(Request, Application, _UserOptions),
    findall(Info, listed_pengine(Application, Status, Info), Infos),
    reply_json_dict(json{pengines: Infos}).
2815
%!  listed_pengine(+Application, +Status, -State) is nondet.
%
%   State is a dict describing a detached pengine of Application.  It
%   holds the pengine id, the value of its detached(Time) property,
%   the number of messages waiting in its output queue and the
%   statistics of its thread.  If there is no thread or its
%   statistics cannot be obtained, `stats` is `thread{status:died}`.

listed_pengine(Application, detached, State) :-
    State = pengine{id:Id,
                    detached:DetachedAt,
                    queued:Waiting,
                    stats:ThreadStats},

    pengine_property(Id, application(Application)),
    pengine_property(Id, detached(DetachedAt)),
    pengine_queue(Id, Queue, _TimeLimit, _Created),
    message_queue_property(Queue, size(Waiting)),
    (   pengine_thread(Id, Worker),
        catch(thread_statistics(Worker, ThreadStats), _, fail)
    ->  true
    ;   ThreadStats = thread{status:died}
    ).
2831
2832
2840
:- dynamic pengine_replying/2.

%!  output_result(+Pengine, +Format, +Event) is det.
%
%   Send Event to the HTTP client in Format.  While the reply is
%   being written, the fact pengine_replying(Pengine, Thread) is
%   asserted for the calling thread, so that abort_pending_output/1
%   can find it.  The exception `pengine_abort_output` raised during
%   output is silently caught.  Afterwards
%   destroy_pengine_after_output/2 is called.

output_result(Pengine, Format, Event) :-
    thread_self(Me),
    cors_enable,
    disable_client_cache,
    setup_call_cleanup(
        asserta(pengine_replying(Pengine, Me), ClauseRef),
        catch(output_result_2(Format, Event, _{}),
              pengine_abort_output,
              true),
        erase(ClauseRef)),
    destroy_pengine_after_output(Pengine, Event).
2855
%!  output_result_2(+Lang, +Event, +Dict) is det.
%
%   Write Event as the HTTP reply.  The hook write_result/3 is tried
%   first.  If it fails:
%
%     - For `prolog`, Event is written as `text/x-prolog` using
%       write_term/2 with quoting, without operators and followed by
%       a full stop; blobs are written through portray_blob/2.
%     - For JSON formats (json_lang/1), Event is translated by
%       event_term_to_json_data/3 and sent with reply_json_dict/1.
%       Failure of the translation triggers an assertion failure.
%
%   @error domain_error(pengine_format, Lang) for any other format.

output_result_2(Lang, Event, Dict) :-
    write_result(Lang, Event, Dict),
    !.
output_result_2(prolog, Event, _Dict) :-
    !,
    format('Content-type: text/x-prolog; charset=UTF-8~n~n'),
    WriteOptions = [ quoted(true),
                     ignore_ops(true),
                     fullstop(true),
                     blobs(portray),
                     portray_goal(portray_blob),
                     nl(true)
                   ],
    write_term(Event, WriteOptions).
output_result_2(Lang, Event, _Dict) :-
    json_lang(Lang),
    !,
    (   event_term_to_json_data(Event, JSON, Lang)
    ->  reply_json_dict(JSON)
    ;   assertion(event_term_to_json_data(Event, _, Lang))
    ).
output_result_2(Lang, _Event, _Dict) :-
    domain_error(pengine_format, Lang).
2879
2887
:- public portray_blob/2.

%!  portray_blob(+Blob, +Options) is semidet.
%
%   Write Blob as '$BLOB'(Type), where Type is its blob type.  Used
%   as portray_goal by output_result_2/3.

portray_blob(Blob, _Options) :-
    blob(Blob, BlobType),
    writeq('$BLOB'(BlobType)).
2892
2897
%!  abort_pending_output(+Pengine) is det.
%
%   Call abort_output_thread/1 for every thread that is registered in
%   pengine_replying/2 as writing a reply for Pengine.

abort_pending_output(Pengine) :-
    forall(pengine_replying(Pengine, Writer),
           abort_output_thread(Writer)).
2901
%!  abort_output_thread(+Thread) is det.
%
%   Signal Thread to throw `pengine_abort_output`.  An existence
%   error on the thread is silently ignored.

abort_output_thread(Thread) :-
    NoSuchThread = error(existence_error(thread, _), _),
    catch(thread_signal(Thread, throw(pengine_abort_output)),
          NoSuchThread,
          true).
2906
2914
2920
%!  disable_client_cache is det.
%
%   Write the `Cache-Control`, `Pragma` and `Expires` header lines
%   that tell the client not to cache the reply to the current
%   output.

disable_client_cache :-
    format('Cache-Control: no-cache, no-store, must-revalidate\r\n'),
    format('Pragma: no-cache\r\n'),
    format('Expires: 0\r\n').
2925
%!  event_term_to_json_data(+Event, -JSON, +Lang) is semidet.
%
%   Translate a pengine event (or a list of events) into a term that
%   can be sent as JSON.  The clauses are tried in this order:
%
%     1. A list of events is translated element by element.
%     2. The hook event_to_json/3 may translate any event.
%     3. success/5 (only for Lang = `json`): the bindings are
%        translated using term_to_json/2.
%     4. destroy(ID, Event): the embedded event is translated
%        recursively and placed under `data`.
%     5. create(ID, Features): an answer(First) feature is translated
%        recursively; the features become keys of the dict.
%     6. error(ID, ErrorTerm): the message text is placed under
%        `data` and details are added by add_error_details/3.
%     7. failure(ID, Time).
%     8. Any other event with one argument: only event name and id.
%     9. Any other event with two arguments: the second argument is
%        translated using term_to_json/2 and placed under `data`.
%
%   The original code had a second, identical clause for
%   destroy(ID, Event) after the create/2 clause.  It could never be
%   reached because clause 4 always commits, so it has been removed.

event_term_to_json_data(Events, JSON, Lang) :-
    is_list(Events),
    !,
    events_to_json_data(Events, JSON, Lang).
event_term_to_json_data(Event, JSON, Lang) :-
    event_to_json(Event, JSON, Lang),
    !.
event_term_to_json_data(success(ID, Bindings0, Projection, Time, More),
                        json{event:success, id:ID, time:Time,
                             data:Bindings, more:More, projection:Projection},
                        json) :-
    !,
    term_to_json(Bindings0, Bindings).
event_term_to_json_data(destroy(ID, Event),
                        json{event:destroy, id:ID, data:JSON},
                        Style) :-
    !,
    event_term_to_json_data(Event, JSON, Style).
event_term_to_json_data(create(ID, Features0), JSON, Style) :-
    !,
    (   select(answer(First0), Features0, Features1)
    ->  event_term_to_json_data(First0, First, Style),
        Features = [answer(First)|Features1]
    ;   Features = Features0
    ),
    dict_create(JSON, json, [event(create), id(ID)|Features]).
event_term_to_json_data(error(ID, ErrorTerm), Error, _Style) :-
    !,
    % Message is still unbound when the dict is built; it is filled
    % in by message_to_string/2 below.
    Error0 = json{event:error, id:ID, data:Message},
    add_error_details(ErrorTerm, Error0, Error),
    message_to_string(ErrorTerm, Message).
event_term_to_json_data(failure(ID, Time),
                        json{event:failure, id:ID, time:Time}, _) :-
    !.
event_term_to_json_data(EventTerm, json{event:F, id:ID}, _) :-
    functor(EventTerm, F, 1),
    !,
    arg(1, EventTerm, ID).
event_term_to_json_data(EventTerm, json{event:F, id:ID, data:JSON}, _) :-
    functor(EventTerm, F, 2),
    arg(1, EventTerm, ID),
    arg(2, EventTerm, Data),
    term_to_json(Data, JSON).
2973
%!  events_to_json_data(+Events, -JSONList, +Lang) is semidet.
%
%   Translate each element of the list Events using
%   event_term_to_json_data/3, preserving order.

events_to_json_data([], [], _).
events_to_json_data([Event|Events], [JSON|JSONs], Lang) :-
    event_term_to_json_data(Event, JSON, Lang),
    events_to_json_data(Events, JSONs, Lang).
2978
2979:- public add_error_details/3. 2980
2985
%!  add_error_details(+Error, +JSON0, -JSON)
%
%   Extend the error dict JSON0 with machine-readable details taken
%   from the term Error: first whatever add_error_code/3 adds, then
%   whatever add_error_location/3 adds.

add_error_details(Error, JSON0, JSON) :-
    add_error_code(Error, JSON0, WithCode),
    add_error_location(Error, WithCode, JSON).
2989
3000
%!  add_error_code(+Error, +Dict0, -Dict) is det.
%
%   Add a `code` key to Dict0 that identifies the formal part of Error:
%
%     - For error(existence_error(Type, Obj), _) with atomic Type, add
%       `code:existence_error`, `arg1:Type` and `arg2:Value`, where
%       Value is Obj as converted by to_atomic/2.
%     - For any other error(Formal, _) with callable Formal, add
%       `code:Name`, where Name is the functor name of Formal.
%     - Otherwise Dict is Dict0.

add_error_code(error(existence_error(Type, Obj), _), Dict0, Dict) :-
    atom(Type),
    !,
    to_atomic(Obj, Value),
    put_dict(_{code:existence_error, arg1:Type, arg2:Value}, Dict0, Dict).
add_error_code(error(Formal, _), Dict0, Dict) :-
    callable(Formal),
    !,
    functor(Formal, Code, _),
    put_dict(code, Dict0, Code, Dict).
add_error_code(_, Dict, Dict).
3012
%!  to_atomic(+Obj, -Atomic) is det.
%
%   Atomic is Obj itself if Obj is an atom, number or string.  Any
%   other term is converted to a string using term_string/2.

to_atomic(Obj, Atomic) :-
    (   (   atom(Obj)
        ;   number(Obj)
        ;   string(Obj)
        )
    ->  Atomic = Obj
    ;   term_string(Obj, Atomic)
    ).
3018
3019
3025
%!  add_error_location(+Error, +Dict0, -Dict) is det.
%
%   If the context of Error is file(Path, Line, Ch, CharNo) with atom
%   Path and integer Line, add a `location` key to Dict0 holding a
%   sub-dict with `file` and `line`.  The sub-dict also gets `ch` when
%   Ch is an integer other than -1.  Otherwise Dict is Dict0.

add_error_location(error(_, file(Path, Line, -1, _CharNo)), Dict0, Dict) :-
    atom(Path), integer(Line),
    !,
    Location = _{file:Path, line:Line},
    put_dict(_{location:Location}, Dict0, Dict).
add_error_location(error(_, file(Path, Line, Ch, _CharNo)), Dict0, Dict) :-
    atom(Path), integer(Line), integer(Ch),
    !,
    Location = _{file:Path, line:Line, ch:Ch},
    put_dict(_{location:Location}, Dict0, Dict).
add_error_location(_, Dict, Dict).
3035
3036
3044
3046
3047
3048 3051
3056
%!  allowed(+Request, +Application) is det.
%
%   True when the peer of Request matches the `allow_from` setting of
%   Application and does not match its `deny_from` setting.  Otherwise
%   a `forbidden` HTTP reply is thrown for the `request_uri` of
%   Request.
%
%   @throws http_reply(forbidden(URI)) if access is denied.

allowed(Request, Application) :-
    (   setting(Application:allow_from, Allow),
        match_peer(Request, Allow),
        setting(Application:deny_from, Deny),
        \+ match_peer(Request, Deny)
    ->  true
    ;   memberchk(request_uri(Here), Request),
        throw(http_reply(forbidden(Here)))
    ).
3066
%!  match_peer(+Request, +Allowed) is nondet.
%
%   True when the peer of Request is covered by the list Allowed:
%
%     - If Allowed contains `*`, every peer matches and the peer is not
%       even looked up.
%     - The empty list matches nothing.
%     - Otherwise the peer obtained from http_peer/2 must be a member of
%       Allowed or match one of its elements through
%       match_peer_pattern/2.

match_peer(Request, Allowed) :-
    (   memberchk(*, Allowed)
    ->  true
    ;   Allowed == []
    ->  fail
    ;   http_peer(Request, Peer),
        debug(pengine(allow), 'Peer: ~q, Allow: ~q', [Peer, Allowed]),
        (   memberchk(Peer, Allowed)
        ->  true
        ;   member(Pattern, Allowed),
            match_peer_pattern(Pattern, Peer)
        )
    ).
3079
%!  match_peer_pattern(+Pattern, +Peer) is semidet.
%
%   True when Pattern and Peer are both translated by ip_term/2 into
%   the same (unifiable) IP term.

match_peer_pattern(Pattern, Peer) :-
    once(( ip_term(Pattern, IP),
           ip_term(Peer, IP)
         )).
3084
%!  ip_term(+Peer, -Pattern) is semidet.
%
%   Split the text Peer at each "." and hand the resulting components
%   to ip_pattern/2 to obtain Pattern.

ip_term(Peer, Pattern) :-
    split_string(Peer, ".", "", Components),
    ip_pattern(Components, Pattern).
3088
%!  ip_pattern(+Components, -Pattern) is semidet.
%
%   Translate the list of dot-separated Components of an IP address or
%   pattern into Pattern, a list holding the numeric value of each
%   component.  If the last component is the wildcard `*`, the
%   remainder of Pattern is left unbound so that it unifies with any
%   tail.  Fails if a component is neither a number nor a final
%   wildcard.
%
%   Components normally come from split_string/4 and are therefore
%   strings.  The wildcard must be recognised as the string "*";
%   matching only the atom `*` made patterns such as '192.168.*'
%   unusable.  The atom is still accepted for compatibility.

ip_pattern([], []).
ip_pattern([Wildcard], _) :-
    (   Wildcard = '*'              % atom, as accepted before
    ;   Wildcard == "*"             % string, as produced by split_string/4
    ),
    !.
ip_pattern([S|T0], [N|T]) :-
    number_string(N, S),
    ip_pattern(T0, T).
3094
3095
3100
%!  authenticate(+Request, +Application, -UserOptions) is det.
%
%   Call authentication_hook/3.  If it succeeds, the resulting User
%   must be ground and UserOptions is `[user(User)]`.  If it fails,
%   UserOptions is the empty list.
%
%   @throws instantiation_error if the hook yields a non-ground User.

authenticate(Request, Application, UserOptions) :-
    (   authentication_hook(Request, Application, User)
    ->  must_be(ground, User),
        UserOptions = [user(User)]
    ;   UserOptions = []
    ).
3107
3127
%!  pengine_register_user(+Options) is semidet.
%
%   If Options contains user(User), record User for the current
%   pengine (as given by pengine_self/1) in pengine_user/2.  Succeeds
%   without doing anything when there is no user(User) option.

pengine_register_user(Options) :-
    (   option(user(User), Options)
    ->  pengine_self(Me),
        asserta(pengine_user(Me, User))
    ;   true
    ).
3134
3135
3143
%!  pengine_user(-User) is semidet.
%
%   True when User is the user recorded in pengine_user/2 for the
%   pengine that is running the calling thread (see pengine_self/1).

pengine_user(User) :-
    pengine_self(Pengine),
    pengine_user(Pengine, User).
3147
3151
%!  reply_options(+Request, +Allowed) is semidet.
%
%   True if Request uses the HTTP method `options`.  In that case the
%   request is answered: cors_enable/2 is called with Allowed as the
%   permitted methods, followed by an empty `text/plain` reply.  Fails
%   without side effects for any other method.

reply_options(Request, Allowed) :-
    once(option(method(options), Request)),
    cors_enable(Request, [methods(Allowed)]),
    format('Content-type: text/plain\r\n'),
    format('~n').
3161
3162 3165
3172
%!  pengine_src_text(+Src, +Module) is det.
%
%   Load the source text Src into Module.  The text is loaded silently
%   from a character stream under the source id
%   `pengine://<Self>/src`, where Self is the current pengine.  The
%   load options are extended with those of extra_load_options/2.
%   After loading, keep_source/3 is given the chance to store the text.

pengine_src_text(Src, Module) :-
    pengine_self(Self),
    format(atom(SourceId), 'pengine://~w/src', [Self]),
    extra_load_options(Self, ExtraOptions),
    LoadOptions = [ stream(Stream),
                    module(Module),
                    silent(true)
                  | ExtraOptions
                  ],
    setup_call_cleanup(
        open_chars_stream(Src, Stream),
        load_files(Module:SourceId, LoadOptions),
        close(Stream)),
    keep_source(Self, SourceId, Src).
3187
%!  system:'#file'(+File, +Line) is det.
%
%   Clause added to module `system`.  It fetches the stream that is
%   currently being loaded from prolog_load_context/2 and sets its
%   `file_name` property to File.  Position recording on the stream is
%   then switched off and on again.  The Line argument is ignored.
%
%   NOTE(review): toggling `record_position` presumably restarts the
%   position counters for the newly named file; confirm against the
%   set_stream/2 documentation before changing the call order.

system:'#file'(File, _Line) :-
    prolog_load_context(stream, Stream),
    set_stream(Stream, file_name(File)),
    set_stream(Stream, record_position(false)),
    set_stream(Stream, record_position(true)).
3193
3201
%!  pengine_src_url(+URL, +Module) is det.
%
%   Load Prolog source from URL into Module.  The source id is
%   `pengine://<Self>/url/<Path>`, where Self is the current pengine
%   and Path is URL encoded as a URI path.  The load options are
%   extended with those of extra_load_options/2.
%
%   If the `debug_info` setting of the pengine's application is
%   `false`, the source is loaded directly from the HTTP stream.
%   Otherwise the document is first read into a string, loaded from a
%   character stream and then passed to keep_source/3.  In both cases
%   the HTTP stream is read as UTF-8.

pengine_src_url(URL, Module) :-
    pengine_self(Self),
    uri_encoded(path, URL, Path),
    format(atom(SourceId), 'pengine://~w/url/~w', [Self, Path]),
    extra_load_options(Self, ExtraOptions),
    LoadOptions = [ stream(Stream),
                    module(Module)
                  | ExtraOptions
                  ],
    (   get_pengine_application(Self, Application),
        setting(Application:debug_info, false)
    ->  % No debug info wanted: load straight from the network.
        setup_call_cleanup(
            http_open(URL, Stream, []),
            (   set_stream(Stream, encoding(utf8)),
                load_files(Module:SourceId, LoadOptions)
            ),
            close(Stream))
    ;   % Fetch the text first so that it can be kept afterwards.
        setup_call_cleanup(
            http_open(URL, In, []),
            (   set_stream(In, encoding(utf8)),
                read_string(In, _Length, Src)
            ),
            close(In)),
        setup_call_cleanup(
            open_chars_stream(Src, Stream),
            load_files(Module:SourceId, LoadOptions),
            close(Stream)),
        keep_source(Self, SourceId, Src)
    ).
3235
3236
%!  extra_load_options(+Pengine, -Options) is det.
%
%   Options is the list of additional load_files/2 options to use when
%   loading source for Pengine: the empty list if
%   pengine_not_sandboxed/1 holds for Pengine and `[sandboxed(true)]`
%   otherwise.

extra_load_options(Pengine, Options) :-
    pengine_not_sandboxed(Pengine),
    !,
    Options = [].
extra_load_options(_, [sandboxed(true)]).
3242
3243
%!  keep_source(+Pengine, +ID, +SrcText) is semidet.
%
%   If the `debug_info` setting of the application of Pengine is
%   `true`, convert SrcText to a string and store it as
%   `source(ID, String)` in pengine_data/2 for Pengine.  Otherwise do
%   nothing.

keep_source(Pengine, ID, SrcText) :-
    (   get_pengine_application(Pengine, Application),
        setting(Application:debug_info, true)
    ->  to_string(SrcText, SrcString),
        assertz(pengine_data(Pengine, source(ID, SrcString)))
    ;   true
    ).
3251
%!  to_string(+Text, -String) is semidet.
%
%   String is Text if Text is already a string.  Otherwise String is
%   the result of atom_string/2 on Text.

to_string(Text, String) :-
    (   string(Text),
        String = Text
    ->  true
    ;   atom_string(Text, String)
    ->  true
    ).
3258
3259 3262
% Extend the multifile predicate sandbox:safe_primitive/1 with the
% pengine I/O predicates pengine_input/2, pengine_output/1 and
% pengine_debug/2 of this module, thereby declaring them as safe
% primitives to the sandbox.

:- multifile
    sandbox:safe_primitive/1.

sandbox:safe_primitive(pengines:pengine_input(_, _)).
sandbox:safe_primitive(pengines:pengine_output(_)).
sandbox:safe_primitive(pengines:pengine_debug(_,_)).
3269
3270
3271 3274
3275prolog:error_message(sandbox(time_limit_exceeded, Limit)) -->
3276 [ 'Could not prove safety of your goal within ~f seconds.'-[Limit], nl,
3277 'This is normally caused by an insufficiently instantiated'-[], nl,
3278 'meta-call (e.g., call(Var)) for which it is too expensive to'-[], nl,
3279 'find all possible instantations of Var.'-[]
3280 ]