33
34:- module(mqi,
35 [ mqi_start/0,
36 mqi_start/1, 37 mqi_stop/1, 38 mqi_version/2 39 ]). 40
:- use_module(library(socket)).
:- use_module(library(http/json)).
:- use_module(library(http/json_convert)).
:- use_module(library(http/http_stream)).
:- use_module(library(option)).
:- use_module(library(term_to_json)).
:- use_module(library(debug)).
:- use_module(library(error), [must_be/2]).
:- use_module(library(filesex)).
:- use_module(library(gensym)).
:- use_module(library(lists)).
:- use_module(library(main)).
:- use_module(library(make)).
:- use_module(library(prolog_source)).
:- use_module(library(time)).
:- use_module(library(uuid)).
% Run-time bookkeeping facts of the module.  Shapes as asserted in this file:
%
%   mqi_thread(Server_Thread_ID, Unix_Domain_Socket_Path_And_File, Socket)
%   mqi_worker_threads(Server_Thread_ID, Communication_Thread_ID, Goal_Thread_ID)
%   mqi_socket(Server_Thread_ID, Communication_Thread_ID, Socket,
%              Read_Stream, Write_Stream)
%   query_in_progress(Goal_Thread_ID)
%   safe_to_cancel(Goal_Thread_ID)
:- dynamic
    mqi_thread/3,
    mqi_worker_threads/3,
    mqi_socket/5,
    query_in_progress/1,
    safe_to_cancel/1.
%!  mqi_version(?Major_Version, ?Minor_Version) is semidet.
%
%   Version of the MQI protocol spoken by this module (1.0).  It is sent to
%   the client as version(Major, Minor) once the password has been accepted.
mqi_version(1, 0).
176
177
%!  mqi_start(+Options:list) is semidet.
%
%   Bind a socket and start an MQI server on it.  Options read here (with
%   the default used when the option is absent):
%
%     * pending_connections(Count)       (5)
%     * query_timeout(Seconds)           (-1, i.e. no time limit)
%     * port(Port)                       (unbound)
%     * run_server_on_thread(Bool)       (true)
%     * exit_main_on_failure(Bool)       (false)
%     * write_connection_values(Bool)    (false)
%     * unix_domain_socket(PathAndFile)  (unbound: use TCP/IP on 127.0.0.1)
%     * server_thread(Alias)             (generated with gensym/2)
%     * password(Password)               (generated randomly)
%     * write_output_to_file(File)       (unbound: no redirection)
%
%   Unbound option arguments supplied by the caller are unified with the
%   values chosen here.
mqi_start(Options) :-
    Encoding = utf8,
    option(pending_connections(Backlog), Options, 5),
    option(query_timeout(Query_Timeout), Options, -1),
    option(port(Port), Options, _),
    option(run_server_on_thread(On_Thread), Options, true),
    option(exit_main_on_failure(Exit_Main_On_Failure), Options, false),
    option(write_connection_values(Write_Values), Options, false),
    option(unix_domain_socket(Socket_File), Options, _),
    % unix_domain_socket(_) was requested without a name: invent one.  Only
    % in that case does Socket_Dir become bound.
    (   memberchk(unix_domain_socket(_), Options),
        var(Socket_File)
    ->  unix_domain_socket_path(Socket_Dir, Socket_File)
    ;   true
    ),
    option(server_thread(Server_Thread_ID), Options, _),
    (   nonvar(Server_Thread_ID)
    ->  true
    ;   gensym(mqi, Server_Thread_ID)
    ),
    option(password(Password), Options, _),
    (   nonvar(Password)
    ->  true
    ;   (   current_prolog_flag(bounded, false)
        ->  uuid(UUID, [format(integer)])
        ;   UUID is random(1<<62)
        ),
        format(string(Password), '~d', [UUID])
    ),
    % The stored password carries the '.\n' terminator so that it can be
    % compared directly with what is read from the client.
    string_concat(Password, '.\n', Final_Password),
    bind_socket(Server_Thread_ID, Socket_File, Port, Socket, Client_Address),
    send_client_startup_data(Write_Values, user_output, Socket_File, Client_Address, Password),
    option(write_output_to_file(File), Options, _),
    (   nonvar(File)
    ->  write_output_to_file(File)
    ;   true
    ),
    Server_Goal =
        (   catch(server_thread(Server_Thread_ID, Socket, Client_Address, Final_Password, Backlog, Encoding, Query_Timeout, Exit_Main_On_Failure), error(E1, E2), true),
            debug(mqi(protocol), "Stopped MQI on thread: ~w due to exception: ~w", [Server_Thread_ID, error(E1, E2)])
        ),
    start_server_thread(On_Thread, Server_Thread_ID, Server_Goal, Socket_Dir, Socket_File).
223
%!  opt_type(?Flag, ?Option, ?Type) is nondet.
%
%   Command line option declarations in the opt_type/3 form used by
%   argv_options/3, which mqi_start/0 calls to parse the command line.
%   Flag is the command line name, Option the resulting option name and
%   Type the expected value type.
opt_type(port,                      port,                      natural).
opt_type(create_unix_domain_socket, create_unix_domain_socket, boolean).
opt_type(unix_domain_socket,        unix_domain_socket,        file(write)).
opt_type(password,                  password,                  string).
opt_type(pending_connections,       pending_connections,       nonneg).
opt_type(query_timeout,             query_timeout,             float).
opt_type(run_server_on_thread,      run_server_on_thread,      boolean).
opt_type(exit_main_on_failure,      exit_main_on_failure,      boolean).
opt_type(write_connection_values,   write_connection_values,   boolean).
opt_type(write_output_to_file,      write_output_to_file,      file(write)).
234
%!  opt_help(?Option, ?Help) is nondet.
%
%   One-line help text for each command line option declared by opt_type/3.
opt_help(port,
         "TCP/IP port for clients to connect to").
opt_help(create_unix_domain_socket,
         "Create a Unix domain socket for clients to connect to").
opt_help(unix_domain_socket,
         "File path for the Unix domain socket").
opt_help(password,
         "Connection password").
opt_help(pending_connections,
         "Max number of queued connections (5)").
opt_help(query_timeout,
         "Max query runtime in seconds (default infinite)").
opt_help(run_server_on_thread,
         "Run server in a background thread (true)").
opt_help(exit_main_on_failure,
         "Exit the process on a failure").
opt_help(write_connection_values,
         "Print info for clients to connect").
opt_help(write_output_to_file,
         "Write stdout and stderr to file").
245
281
282
%!  mqi_start is det.
%
%   Start the server from the command line: options are parsed from the
%   `argv` flag with argv_options/3.  exit_main_on_failure(true) is merged
%   in as the default, and create_unix_domain_socket(true) is translated
%   into unix_domain_socket(_) so that mqi_start/1 invents the socket name.
%   After starting the server the calling thread blocks until the
%   `quit_mqi` message arrives, which quit/1 sends (installed here as the
%   handler for the `int` signal).
%
%   @throws domain_error(cannot_be_set_in_embedded_mode, run_server_on_thread)
%           if run_server_on_thread is given with a value other than true.
mqi_start :-
    current_prolog_flag(argv, Argv),
    argv_options(Argv, _Positional, Cli_Options),
    merge_options(Cli_Options, [exit_main_on_failure(true)], With_Exit_Default),
    select_option(create_unix_domain_socket(Create_Socket), With_Exit_Default, Remaining, false),
    (   Create_Socket == true
    ->  merge_options(Remaining, [unix_domain_socket(_)], Final_Options)
    ;   Final_Options = Remaining
    ),
    option(run_server_on_thread(On_Thread), Final_Options, true),
    (   On_Thread \== true
    ->  throw(domain_error(cannot_be_set_in_embedded_mode, run_server_on_thread))
    ;   true
    ),
    mqi_start(Final_Options),
    on_signal(int, _, quit),
    thread_get_message(quit_mqi).
307
308
%!  quit(+Signal) is det.
%
%   Ask the `main` thread to leave mqi_start/0 by posting `quit_mqi` to its
%   message queue.  The argument is ignored; it exists so that the
%   predicate can be installed with on_signal/3.
quit(_Signal) :-
    thread_send_message(main, quit_mqi).
311
312
318
%!  mqi_stop(?Server_Thread_ID) is det.
%
%   Stop every server recorded in mqi_thread/3 that matches
%   Server_Thread_ID: its listening socket is closed and the server thread
%   is sent `abort`.  Afterwards the communication and goal threads
%   recorded in mqi_worker_threads/3 for it are aborted as well.  Failures
%   while closing or signalling are only reported through
%   debug(mqi(protocol)).
mqi_stop(Server_Thread_ID) :-
    forall(retract(mqi_thread(Server_Thread_ID, _, Listen_Socket)),
           (   debug(mqi(protocol), "Found server: ~w", [Server_Thread_ID]),
               catch(tcp_close_socket(Listen_Socket), Close_Error, true),
               abortSilentExit(Server_Thread_ID, Server_Abort_Error),
               debug(mqi(protocol), "Stopped server thread: ~w, socket_close_exception(~w), stop_thread_exception(~w)", [Server_Thread_ID, Close_Error, Server_Abort_Error])
           )),
    forall(retract(mqi_worker_threads(Server_Thread_ID, Comm_Thread, Goal_Thread)),
           (   abortSilentExit(Comm_Thread, Comm_Abort_Error),
               debug(mqi(protocol), "Stopped server: ~w communication thread: ~w, exception(~w)", [Server_Thread_ID, Comm_Thread, Comm_Abort_Error]),
               abortSilentExit(Goal_Thread, Goal_Abort_Error),
               debug(mqi(protocol), "Stopped server: ~w goal thread: ~w, exception(~w)", [Server_Thread_ID, Goal_Thread, Goal_Abort_Error])
           )).
339
340
%!  start_server_thread(+Run_Server_On_Thread, +Server_Thread_ID, +Server_Goal,
%!                      ?Unix_Domain_Socket_Path, ?Unix_Domain_Socket_Path_And_File)
%
%   Run Server_Goal.  If Run_Server_On_Thread (called as a goal) succeeds, a
%   new thread with alias Server_Thread_ID is created for it; when that
%   thread exits it removes the Unix domain socket file/directory and
%   detaches itself if its exit status was expected.  Otherwise Server_Goal
%   runs in the calling thread and the socket file is removed afterwards.
start_server_thread(Run_Server_On_Thread, Server_Thread_ID, Server_Goal, Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File) :-
    (   Run_Server_On_Thread
    ->  At_Exit = ( delete_unix_domain_socket_file(Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File),
                    detach_if_expected(Server_Thread_ID)
                  ),
        thread_create(Server_Goal, _, [alias(Server_Thread_ID), at_exit(At_Exit)]),
        debug(mqi(protocol), "Started server on thread: ~w", [Server_Thread_ID])
    ;   Server_Goal,
        delete_unix_domain_socket_file(Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File),
        debug(mqi(protocol), "Halting.", [])
    ).
355
356
%!  delete_unix_domain_socket_file(?Unix_Domain_Socket_Path,
%!                                 ?Unix_Domain_Socket_Path_And_File) is det.
%
%   Best-effort cleanup of the Unix domain socket.  If the directory is
%   known it is removed together with its contents; otherwise, if only the
%   socket file is known, that file is removed.  With both unbound nothing
%   happens.  Errors of the form error(_, _) are ignored.
delete_unix_domain_socket_file(Unix_Domain_Socket_Path, Unix_Domain_Socket_Path_And_File) :-
    (   nonvar(Unix_Domain_Socket_Path)
    ->  catch(delete_directory_and_contents(Unix_Domain_Socket_Path), error(_, _), true)
    ;   nonvar(Unix_Domain_Socket_Path_And_File)
    ->  catch(delete_file(Unix_Domain_Socket_Path_And_File), error(_, _), true)
    ;   true
    ).
368
%!  optional_unix_domain_socket(-Socket) is det.
%
%   Create a Unix domain socket when unix_domain_socket/1 is available at
%   compile time.  Where it is not, the predicate still exists but leaves
%   Socket unbound.
:- if(current_predicate(unix_domain_socket/1)).
optional_unix_domain_socket(Socket) :-
    unix_domain_socket(Socket).
:- else.
optional_unix_domain_socket(_Socket).
:- endif.
%!  bind_socket(+Server_Thread_ID, ?Unix_Domain_Socket_Path_And_File, ?Port,
%!              -Socket, -Client_Address) is det.
%
%   Create and bind the server socket and record it as mqi_thread/3.
%   Without a Unix domain socket name a TCP socket is bound to
%   '127.0.0.1':Port with `reuseaddr` set, and Client_Address is Port.
%   With a name, any existing file of that name is deleted first (errors
%   ignored), a Unix domain socket is bound to it, and Client_Address is
%   that name.
bind_socket(Server_Thread_ID, Unix_Domain_Socket_Path_And_File, Port, Socket, Client_Address) :-
    (   var(Unix_Domain_Socket_Path_And_File)
    ->  Local_Address = '127.0.0.1':Port,
        tcp_socket(Socket),
        tcp_setopt(Socket, reuseaddr),
        tcp_bind(Socket, Local_Address),
        debug(mqi(protocol), "Using TCP/IP port: ~w", [Local_Address]),
        Client_Address = Port
    ;   debug(mqi(protocol), "Using Unix domain socket name: ~w", [Unix_Domain_Socket_Path_And_File]),
        optional_unix_domain_socket(Socket),
        catch(delete_file(Unix_Domain_Socket_Path_And_File), error(_, _), true),
        tcp_bind(Socket, Unix_Domain_Socket_Path_And_File),
        Client_Address = Unix_Domain_Socket_Path_And_File
    ),
    assert(mqi_thread(Server_Thread_ID, Unix_Domain_Socket_Path_And_File, Socket)).
393
%!  send_client_startup_data(+Write_Connection_Values, +Stream,
%!                           ?Unix_Domain_Socket_Path_And_File, +Port, +Password) is det.
%
%   If Write_Connection_Values (called as a goal) succeeds, write two lines
%   to Stream and flush it: first the Unix domain socket name if one is
%   bound, otherwise Port as an integer; then Password.  Otherwise do
%   nothing.
send_client_startup_data(Write_Connection_Values, Stream, Unix_Domain_Socket_Path_And_File, Port, Password) :-
    (   Write_Connection_Values
    ->  (   nonvar(Unix_Domain_Socket_Path_And_File)
        ->  format(Stream, "~w\n", [Unix_Domain_Socket_Path_And_File])
        ;   format(Stream, "~d\n", [Port])
        ),
        format(Stream, "~w\n", [Password]),
        flush_output(Stream)
    ;   true
    ).
407
408
%!  server_thread(+Server_Thread_ID, +Socket, +Address, +Password,
%!                +Connection_Count, +Encoding, +Query_Timeout, +Exit_Main_On_Failure)
%
%   Accept loop of a server.  Each iteration listens on Socket with a
%   backlog of Connection_Count, opens the socket, hands the accept stream
%   to create_connection/6 to set up one client, and then recurses.  The
%   loop has no exit of its own; it ends through failure or an exception.
server_thread(Server_Thread_ID, Socket, Address, Password, Connection_Count, Encoding, Query_Timeout, Exit_Main_On_Failure) :-
    debug(mqi(protocol), "Listening on address: ~w", [Address]),
    tcp_listen(Socket, Connection_Count),
    tcp_open_socket(Socket, Accept_Stream, _),
    create_connection(Server_Thread_ID, Accept_Stream, Password, Encoding, Query_Timeout, Exit_Main_On_Failure),
    server_thread(Server_Thread_ID, Socket, Address, Password, Connection_Count, Encoding, Query_Timeout, Exit_Main_On_Failure).
418
419
%!  create_connection(+Server_Thread_ID, +AcceptFd, +Password, +Encoding,
%!                    +Query_Timeout, +Exit_Main_On_Failure) is det.
%
%   Wait for one client on AcceptFd and create the two threads that serve
%   it: a goal thread running goal_thread/1 and a communication thread
%   running communication_thread/7.  Their aliases are
%   `<Server>_<conn>_goal` and `<Server>_<conn>_comm`; a mutex with the
%   goal thread's alias is created as well.  The pair is recorded in
%   mqi_worker_threads/3 before the threads are started.
create_connection(Server_Thread_ID, AcceptFd, Password, Encoding, Query_Timeout, Exit_Main_On_Failure) :-
    debug(mqi(protocol), "Waiting for client connection...", []),
    tcp_accept(AcceptFd, Client_Socket, _Peer),
    debug(mqi(protocol), "Client connected", []),
    gensym('conn', Connection_Name),
    atomic_list_concat([Server_Thread_ID, "_", Connection_Name, '_comm'], Comm_Alias),
    atomic_list_concat([Server_Thread_ID, "_", Connection_Name, '_goal'], Goal_Alias),
    mutex_create(Goal_Alias, [alias(Goal_Alias)]),
    assert(mqi_worker_threads(Server_Thread_ID, Comm_Alias, Goal_Alias)),
    Goal_Thread_Options = [alias(Goal_Alias), at_exit(detach_if_expected(Goal_Alias))],
    thread_create(goal_thread(Comm_Alias), _, Goal_Thread_Options),
    Comm_Thread_Options = [alias(Comm_Alias), at_exit(detach_if_expected(Comm_Alias))],
    thread_create(communication_thread(Password, Client_Socket, Encoding, Server_Thread_ID, Goal_Alias, Query_Timeout, Exit_Main_On_Failure),
                  _,
                  Comm_Thread_Options).
439
440
%!  goal_thread(+Respond_To_Thread_ID)
%
%   Main loop of a goal thread.  It waits for a
%   goal(Goal, Binding_List, Query_Timeout, Find_All) message on its own
%   queue, runs the goal in module `user` and posts the outcome to
%   Respond_To_Thread_ID with send_next_result/4:
%
%     * Find_All true: all solutions are collected with findall/3 and sent
%       as a single result.
%     * Find_All false: each solution is sent as it is found; a `false`
%       result is sent if there was none, and finally a `no_more_results`
%       marker is sent.
%     * An exception from the goal (including one raised by
%       call_with_time_limit/2 when Query_Timeout is not -1) is sent as an
%       error result.
%
%   The goal runs inside run_cancellable_goal/2.  The loop then recurses.
goal_thread(Respond_To_Thread_ID) :-
    thread_self(Self_ID),
    throw_if_testing(Self_ID),
    thread_get_message(Self_ID, goal(Unexpanded_Goal, Binding_List, Query_Timeout, Find_All)),
    expand_goal(Unexpanded_Goal, Goal),
    debug(mqi(query), "Received Findall = ~w, Query_Timeout = ~w, binding list: ~w, unexpanded: ~w, goal: ~w", [Find_All, Query_Timeout, Binding_List, Unexpanded_Goal, Goal]),
    User_Goal = @(user:Goal, user),
    (   Find_All
    ->  Query = findall(Binding_List, User_Goal, Answers)
    ;   Query = ( findall(One_Answer,
                          (   User_Goal,
                              One_Answer = [Binding_List],
                              send_next_result(Respond_To_Thread_ID, One_Answer, _, Find_All)
                          ),
                          Answers),
                  (   Answers == []
                  ->  send_next_result(Respond_To_Thread_ID, [], _, Find_All)
                  ;   true
                  )
                )
    ),
    Cancellable_Goal = run_cancellable_goal(Self_ID, Query),
    (   Query_Timeout == -1
    ->  Guarded_Goal = Cancellable_Goal
    ;   Guarded_Goal = call_with_time_limit(Query_Timeout, Cancellable_Goal)
    ),
    catch(Guarded_Goal, Top_Exception, true),
    (   nonvar(Top_Exception)
    ->  send_next_result(Respond_To_Thread_ID, [], Top_Exception, true)
    ;   Find_All
    ->  send_next_result(Respond_To_Thread_ID, Answers, _, Find_All)
    ;   send_next_result(Respond_To_Thread_ID, [], no_more_results, Find_All)
    ),
    goal_thread(Respond_To_Thread_ID).
482
483
%!  throw_if_testing(+Self_ID) is det.
%
%   Test hook: if a testThrow(Exception) message is waiting in the queue
%   of Self_ID, throw Exception.  The message is only peeked at, not
%   removed.  Succeeds silently when no such message exists.
throw_if_testing(Self_ID) :-
    (   thread_peek_message(Self_ID, testThrow(Test_Exception))
    ->  debug(mqi(query), "TESTING: Throwing test exception: ~w", [Test_Exception]),
        throw(Test_Exception)
    ;   true
    ).
492
493
%!  run_cancellable_goal(+Mutex_ID, :Goal)
%
%   Run Goal while a safe_to_cancel(Thread) fact exists for the calling
%   thread.  The fact is asserted before Goal starts and erased, while
%   holding Mutex_ID, when Goal finishes in any way.  The `cancel_async`
%   command checks this fact under the same mutex before signalling the
%   goal thread.
run_cancellable_goal(Mutex_ID, Goal) :-
    thread_self(This_Thread),
    setup_call_cleanup(
        assert(safe_to_cancel(This_Thread), Marker),
        Goal,
        with_mutex(Mutex_ID, erase(Marker))).
508
509
%!  communication_thread(+Password, +Socket, +Encoding, +Server_Thread_ID,
%!                       +Goal_Thread_ID, +Query_Timeout, +Exit_Main_On_Failure)
%
%   Body of a communication thread.  Runs the session with
%   communication_thread_listen/6; exceptions of the form error(_, _) are
%   caught.  When the session ends by success or by such an exception, the
%   goal thread is aborted and this connection's mqi_worker_threads/3 fact
%   is removed.
%
%   The server is then asked to halt (via quit/1) when either
%     * the session failed (e.g. the `quit` command), or
%     * it ended with an error(_, _) exception and Exit_Main_On_Failure
%       holds.
%   Otherwise only the client socket is closed.
communication_thread(Password, Socket, Encoding, Server_Thread_ID, Goal_Thread_ID, Query_Timeout, Exit_Main_On_Failure) :-
    thread_self(Self_ID),
    Session_Error = error(Formal, Context),
    (   catch(communication_thread_listen(Password, Socket, Encoding, Server_Thread_ID, Goal_Thread_ID, Query_Timeout), error(Formal, Context), true),
        debug(mqi(protocol), "Session finished. Communication thread exception: ~w", [Session_Error]),
        abortSilentExit(Goal_Thread_ID, _),
        retractall(mqi_worker_threads(Server_Thread_ID, Self_ID, Goal_Thread_ID))
    ->  % Formal is bound only if an exception was caught above.
        Halt = (nonvar(Formal), Exit_Main_On_Failure)
    ;   Halt = true
    ),
    (   Halt
    ->  debug(mqi(protocol), "Ending session and halting Prolog server due to thread ~w: exception(~w)", [Self_ID, Session_Error]),
        quit(_)
    ;   debug(mqi(protocol), "Ending session ~w", [Self_ID]),
        catch(tcp_close_socket(Socket), error(_, _), true)
    ).
539
540
%!  communication_thread_listen(+Password, +Socket, +Encoding, +Server_Thread_ID,
%!                              +Goal_Thread_ID, +Query_Timeout)
%
%   Serve one client session on Socket.  The socket's streams are opened,
%   recorded in mqi_socket/5 and set to Encoding.  The first message must
%   be identical (==) to Password; on success the client receives the
%   communication/goal thread ids and the protocol version, after which
%   messages are processed until the session ends.
%
%   @throws password_mismatch after replying with the same error when the
%           first message does not match Password.
communication_thread_listen(Password, Socket, Encoding, Server_Thread_ID, Goal_Thread_ID, Query_Timeout) :-
    tcp_open_socket(Socket, Read_Stream, Write_Stream),
    thread_self(Communication_Thread_ID),
    assert(mqi_socket(Server_Thread_ID, Communication_Thread_ID, Socket, Read_Stream, Write_Stream)),
    set_stream(Read_Stream, encoding(Encoding)),
    set_stream(Write_Stream, encoding(Encoding)),
    read_message(Read_Stream, Sent_Password),
    (   Password \== Sent_Password
    ->  debug(mqi(protocol), "Password mismatch, failing. ~w", [Sent_Password]),
        reply_error(Write_Stream, password_mismatch),
        throw(password_mismatch)
    ;   debug(mqi(protocol), "Password matched.", []),
        mqi_version(Major, Minor),
        reply(Write_Stream, true([[threads(Communication_Thread_ID, Goal_Thread_ID), version(Major, Minor)]]))
    ),
    process_mqi_messages(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout),
    debug(mqi(protocol), "Session finished.", []).
565
566
%!  process_mqi_messages(+Read_Stream, +Write_Stream, +Goal_Thread_ID,
%!                       +Query_Timeout) is semidet.
%
%   Message loop of a session.  Handles one message at a time until the
%   client sends `close` (the loop succeeds) or `quit` (the loop fails).
%   It also fails when process_mqi_message/5 fails.
process_mqi_messages(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout) :-
    process_mqi_message(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout, Command),
    (   Command == close
    ->  debug(mqi(protocol), "Command: close. Client closed the connection cleanly.", [])
    ;   Command == quit
    ->  debug(mqi(protocol), "Command: quit.", []),
        false
    ;   process_mqi_messages(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout)
    ).
595
%!  process_mqi_message(+Read_Stream, +Write_Stream, +Goal_Thread_ID,
%!                      +Query_Timeout, -Command) is semidet.
%
%   Read, parse and execute a single message.  Fails if no raw message
%   could be received.  A message that cannot be parsed is not an error
%   here: state_parse_command/4 has already replied to the client and
%   Command is left as that predicate left it.
process_mqi_message(Read_Stream, Write_Stream, Goal_Thread_ID, Query_Timeout, Command) :-
    debug(mqi(protocol), "Waiting for next message ...", []),
    once(state_receive_raw_message(Read_Stream, Message_String)),
    (   state_parse_command(Write_Stream, Message_String, Command, Binding_List)
    ->  state_process_command(Write_Stream, Goal_Thread_ID, Query_Timeout, Command, Binding_List)
    ;   true
    ).
622
623
%!  state_receive_raw_message(+Read, -Command_String)
%
%   Read the next length-prefixed message from Read using read_message/2
%   and log it on the mqi(protocol) debug topic.
state_receive_raw_message(Read, Command_String) :-
    read_message(Read, Raw_Message),
    Command_String = Raw_Message,
    debug(mqi(protocol), "Valid message: ~w", [Command_String]).
631
632
%!  state_parse_command(+Write_Stream, +Command_String, -Parsed_Command,
%!                      -Binding_List) is semidet.
%
%   Parse Command_String as a term in module `user`; Binding_List receives
%   the variable_names/1 bindings.  If parsing raises an exception, that
%   exception is reported to the client; if parsing fails, a
%   couldNotParseCommand error is reported.  In both cases the predicate
%   fails.
state_parse_command(Write_Stream, Command_String, Parsed_Command, Binding_List) :-
    Read_Options = [variable_names(Binding_List), module(user)],
    (   catch(read_term_from_atom(Command_String, Parsed_Command, Read_Options), Parse_Exception, true)
    ->  (   nonvar(Parse_Exception)
        ->  reply_error(Write_Stream, Parse_Exception),
            fail
        ;   debug(mqi(protocol), "Parse Success: ~w", [Parsed_Command])
        )
    ;   reply_error(Write_Stream, error(couldNotParseCommand, _)),
        fail
    ).
654
655
%   state_process_command(+Stream, +Goal_Thread_ID, +Query_Timeout, +Command,
%                         +Binding_List)
%
%   run(Goal, Timeout): synchronous query.  Results of an earlier query
%   that were never retrieved are drained first.  The goal is then sent to
%   the goal thread in find-all mode, heartbeats are written while
%   waiting, and the answers are sent to the client.
state_process_command(Stream, Goal_Thread_ID, Query_Timeout, run(Goal, Timeout), Binding_List) :-
    !,
    debug(mqi(protocol), "Command: run/1. Timeout: ~w", [Timeout]),
    Drain_One =
        (   query_in_progress(Goal_Thread_ID),
            debug(mqi(protocol), "Draining unretrieved result for ~w", [Goal_Thread_ID]),
            heartbeat_until_result(Goal_Thread_ID, Stream, Unused_Answer),
            debug(mqi(protocol), "Drained result for ~w", [Goal_Thread_ID]),
            debug(mqi(query), "    Discarded answer: ~w", [Unused_Answer])
        ),
    repeat_until_false(Drain_One),
    debug(mqi(protocol), "All previous results drained", []),
    send_goal_to_thread(Stream, Goal_Thread_ID, Query_Timeout, Timeout, Goal, Binding_List, true),
    heartbeat_until_result(Goal_Thread_ID, Stream, Answers),
    reply_with_result(Goal_Thread_ID, Stream, Answers).
681
682
%   run_async(Goal, Timeout, Find_All): asynchronous query.  Unretrieved
%   results of an earlier query are drained, the goal is handed to the
%   goal thread and the client immediately gets true([[]]).  The results
%   themselves are fetched later with the `async_result` command.
state_process_command(Stream, Goal_Thread_ID, Query_Timeout, run_async(Goal, Timeout, Find_All), Binding_List) :-
    !,
    debug(mqi(protocol), "Command: run_async/1.", []),
    debug(mqi(query), "   Goal: ~w", [Goal]),
    Drain_One =
        (   query_in_progress(Goal_Thread_ID),
            debug(mqi(protocol), "Draining unretrieved result for ~w", [Goal_Thread_ID]),
            heartbeat_until_result(Goal_Thread_ID, Stream, Unused_Answer),
            debug(mqi(protocol), "Drained result for ~w", [Goal_Thread_ID]),
            debug(mqi(query), "    Discarded answer: ~w", [Unused_Answer])
        ),
    repeat_until_false(Drain_One),
    debug(mqi(protocol), "All previous results drained", []),
    send_goal_to_thread(Stream, Goal_Thread_ID, Query_Timeout, Timeout, Goal, Binding_List, Find_All),
    reply(Stream, true([[]])).
699
700
%   async_result(Timeout): fetch the next result of an asynchronous query.
%   An unbound Timeout or -1 waits without limit.  Replies with
%     * the result, if one arrives in time,
%     * the error `result_not_available`, if none arrives in time,
%     * the error `no_query`, if no query is in progress.
state_process_command(Stream, Goal_Thread_ID, _, async_result(Timeout), _) :-
    !,
    debug(mqi(protocol), "Command: async_result, timeout: ~w.", [Timeout]),
    (   (   var(Timeout)
        ;   Timeout == -1
        )
    ->  Wait_Options = []
    ;   Wait_Options = [timeout(Timeout)]
    ),
    (   query_in_progress(Goal_Thread_ID)
    ->  (   debug(mqi(protocol), "Pending query results exist for ~w", [Goal_Thread_ID]),
            get_next_result(Goal_Thread_ID, Stream, Wait_Options, Result)
        ->  reply_with_result(Goal_Thread_ID, Stream, Result)
        ;   reply_error(Stream, result_not_available)
        )
    ;   debug(mqi(protocol), "No pending query results for ~w", [Goal_Thread_ID]),
        reply_error(Stream, no_query)
    ).
720
721
%   cancel_async: cancel the running asynchronous query.  Everything
%   happens while holding the mutex named after the goal thread, the same
%   mutex run_cancellable_goal/2 takes before removing safe_to_cancel/1.
%     * Goal currently running: signal throw(cancel_goal) to the goal
%       thread and reply true([[]]).
%     * Goal finished but results still pending: reply true([[]]).
%     * No query at all: reply with the error `no_query`.
state_process_command(Stream, Goal_Thread_ID, _, cancel_async, _) :-
    !,
    debug(mqi(protocol), "Command: cancel_async/0.", []),
    with_mutex(Goal_Thread_ID,
               (   safe_to_cancel(Goal_Thread_ID)
               ->  thread_signal(Goal_Thread_ID, throw(cancel_goal)),
                   reply(Stream, true([[]]))
               ;   query_in_progress(Goal_Thread_ID)
               ->  debug(mqi(protocol), "Pending query results exist for ~w", [Goal_Thread_ID]),
                   reply(Stream, true([[]]))
               ;   debug(mqi(protocol), "No pending query results for ~w", [Goal_Thread_ID]),
                   reply_error(Stream, no_query)
               )).
746
747
%   testThrowGoalThread(Exception): test hook.  Queues a
%   testThrow(Exception) message for the goal thread (picked up by
%   throw_if_testing/1) and then runs the trivial query run(true, -1).
state_process_command(Stream, Goal_Thread_ID, Query_Timeout, testThrowGoalThread(Test_Exception), Binding_List) :-
    !,
    debug(mqi(protocol), "TESTING: requested goal thread unhandled exception", []),
    thread_send_message(Goal_Thread_ID, testThrow(Test_Exception)),
    Trivial_Query = run(true, -1),
    state_process_command(Stream, Goal_Thread_ID, Query_Timeout, Trivial_Query, Binding_List).
756
757
%   close: acknowledge with true([[]]).  The session itself is ended by
%   process_mqi_messages/4, which inspects the command afterwards.
state_process_command(Stream, _, _, close, _) :-
    !,
    Acknowledgement = true([[]]),
    reply(Stream, Acknowledgement).


%   quit: acknowledge with true([[]]).  process_mqi_messages/4 fails on
%   this command after the reply has been sent.
state_process_command(Stream, _, _, quit, _) :-
    !,
    Acknowledgement = true([[]]),
    reply(Stream, Acknowledgement).


%   Anything else: report the error `unknownCommand` to the client.
state_process_command(Stream, _, _, Unknown, _) :-
    debug(mqi(protocol), "Unknown command ~w", [Unknown]),
    reply_error(Stream, unknownCommand).
772
773
%!  heartbeat_until_result(+Goal_Thread_ID, +Stream, -Answers)
%
%   Wait for the next result from the goal thread.  Each wait uses a 2
%   second timeout; whenever a wait yields nothing, a heartbeat character
%   is written to Stream and the wait is repeated.
heartbeat_until_result(Goal_Thread_ID, Stream, Answers) :-
    Poll_Options = [timeout(2)],
    (   get_next_result(Goal_Thread_ID, Stream, Poll_Options, Answers)
    ->  debug(mqi(query), "Received answer from goal thread: ~w", [Answers])
    ;   debug(mqi(protocol), "heartbeat...", []),
        write_heartbeat(Stream),
        heartbeat_until_result(Goal_Thread_ID, Stream, Answers)
    ).
787
788
%!  write_heartbeat(+Stream) is det.
%
%   Write a single '.' to Stream and flush it.
write_heartbeat(Stream) :-
    Heartbeat = '.',
    put_char(Stream, Heartbeat),
    flush_output(Stream).
795
796
%!  send_goal_to_thread(+Stream, +Goal_Thread_ID, +Default_Timeout, ?Timeout,
%!                      +Goal, ?Binding_List, +Find_All) is det.
%
%   Hand a query to the goal thread as a
%   goal(Goal, Binding_List, Timeout, Find_All) message.  An unbound
%   Timeout is unified with Default_Timeout and an unbound Binding_List
%   with [].  query_in_progress(Goal_Thread_ID) is asserted before the
%   message is sent.
%
%   If the message cannot be delivered, the query_in_progress/1 fact is
%   removed again (no query was started, so it must not stay behind),
%   `connection_failed` is reported on Stream and the exception raised by
%   thread_send_message/2 is re-thrown.
send_goal_to_thread(Stream, Goal_Thread_ID, Default_Timeout, Timeout, Goal, Binding_List, Find_All) :-
    (   var(Timeout)
    ->  Timeout = Default_Timeout
    ;   true
    ),
    (   var(Binding_List)
    ->  Binding_List = []
    ;   true
    ),
    debug(mqi(query), "Sending to goal thread with timeout = ~w: ~w", [Timeout, Goal]),
    assert(query_in_progress(Goal_Thread_ID)),
    catch(thread_send_message(Goal_Thread_ID, goal(Goal, Binding_List, Timeout, Find_All)), Send_Message_Exception, true),
    (   var(Send_Message_Exception)
    ->  true
    ;   % Undo the bookkeeping first, so it is cleaned up even if the
        % reply below throws as well.
        retractall(query_in_progress(Goal_Thread_ID)),
        reply_error(Stream, connection_failed),
        throw(Send_Message_Exception)
    ).
823
824
%!  send_next_result(+Respond_To_Thread_ID, +Answer, ?Exception_In_Goal,
%!                   +Find_All) is det.
%
%   Post a result(Result, Find_All) message to Respond_To_Thread_ID.
%   Result is
%     * error(Exception_In_Goal) if Exception_In_Goal is bound,
%     * `false` if Answer is [],
%     * true(Final_Answer) otherwise, where Final_Answer is Answer after
%       handle_constraints/2.
send_next_result(Respond_To_Thread_ID, Answer, Exception_In_Goal, Find_All) :-
    (   nonvar(Exception_In_Goal)
    ->  debug(mqi(query), "Sending result of goal to communication thread, Exception: ~w", [Exception_In_Goal]),
        Result = error(Exception_In_Goal)
    ;   debug(mqi(query), "Sending result of goal to communication thread, Result: ~w", [Answer]),
        (   Answer == []
        ->  Result = false
        ;   handle_constraints(Answer, Final_Answer),
            Result = true(Final_Answer)
        )
    ),
    thread_send_message(Respond_To_Thread_ID, result(Result, Find_All)).
839
840
%!  handle_constraints(+Answer, -Final_Answer) is det.
%
%   Make Answer safe to send when it contains attributed variables.
%   Without attributed variables Final_Answer is Answer itself.  Otherwise
%   every element of Answer (a list) is copied with copy_term/3 and the
%   resulting residual goals are put in front of the copy as
%   '$residuals' = Goals.
handle_constraints(Answer, Final_Answer) :-
    (   term_attvars(Answer, [])
    ->  Final_Answer = Answer
    ;   findall([ '$residuals' = Residual_Goals | Plain_Copy ],
                (   member(One_Solution, Answer),
                    copy_term(One_Solution, Plain_Copy, Residual_Goals)
                ),
                Final_Answer),
        debug(mqi(query), "Constraints detected, converted: ~w to ~w", [Answer, Final_Answer])
    ).
853
854
%!  get_next_result(+Goal_Thread_ID, +Stream, +Options, -Answers) is semidet.
%
%   Take the next result(Answers, Find_All) message from the calling
%   thread's queue.  Options is passed to thread_get_message/3; with a
%   timeout(T) option the predicate fails if nothing arrives in time.
%   The query_in_progress/1 fact for the goal thread is retracted when the
%   result is final: Find_All holds, or Answers is error(_).
%
%   @throws connection_failed after replying with the same error if the
%           goal thread is not in status `running`.
get_next_result(Goal_Thread_ID, Stream, Options, Answers) :-
    (   thread_property(Goal_Thread_ID, status(running))
    ->  true
    ;   reply_error(Stream, connection_failed),
        throw(connection_failed)
    ),
    thread_self(Receiver),
    thread_get_message(Receiver, result(Answers, Find_All), Options),
    (   Find_All
    ->  debug(mqi(protocol), "Query completed and answers drained for findall ~w", [Goal_Thread_ID]),
        retractall(query_in_progress(Goal_Thread_ID))
    ;   Answers = error(_)
    ->  debug(mqi(protocol), "Query completed and answers drained for non-findall ~w", [Goal_Thread_ID]),
        retractall(query_in_progress(Goal_Thread_ID))
    ;   true
    ).
881
882
%!  reply_with_result(+Goal_Thread_ID, +Stream, +Result) is det.
%
%   Send Result to the client.  error(Error) results go through
%   reply_error/2.  Any other result is sent with reply/2; if that raises
%   error(Exception, _), Exception is reported to the client as an error
%   instead.  The first argument is unused.
reply_with_result(_, Stream, error(Error)) :-
    !,
    reply_error(Stream, Error).
reply_with_result(_, Stream, Result) :-
    catch(reply(Stream, Result),
          error(Exception, _),
          reply_error(Stream, Exception)).
893
894
%!  reply(+Stream, +Term) is det.
%
%   Convert Term to a JSON string with term_to_json_string/2 and send it
%   to the client as one length-prefixed message.
reply(Stream, Term) :-
    debug(mqi(query), "Responding with Term: ~w", [Term]),
    term_to_json_string(Term, Payload),
    write_message(Stream, Payload).
901
902
%!  reply_error(+Stream, +Error_Term) is det.
%
%   Send exception(Value) to the client, where Value is derived from
%   Error_Term:
%
%     * error(Formal, _)  -> Formal
%     * an atom           -> the atom itself
%     * a compound term   -> its functor name
%     * any other atomic value (number, string, ...) -> its text as an atom
%
%   The last case matters for queries that throw e.g. a number or a
%   string: compound_name_arity/3 raises a type error for non-compound
%   terms, which used to terminate the communication thread instead of
%   reporting the exception to the client.
reply_error(Stream, Error_Term) :-
    debug(mqi(query), "Responding with exception: ~w", [Error_Term]),
    (   error(Error_Value, _) = Error_Term
    ->  Response = exception(Error_Value)
    ;   atom(Error_Term)
    ->  Response = exception(Error_Term)
    ;   compound(Error_Term)
    ->  compound_name_arity(Error_Term, Name, _),
        Response = exception(Name)
    ;   % Numbers, strings and other atomic values: send their text so the
        % client always receives a name-like value.
        format(atom(Text), "~w", [Error_Term]),
        Response = exception(Text)
    ),
    reply(Stream, Response).
918
919
923
924
%!  write_message(+Stream, +String) is det.
%
%   Send one message: the length prefix produced by write_string_length/2,
%   followed by String itself, then flush Stream.
write_message(Stream, Payload) :-
    write_string_length(Stream, Payload),
    write(Stream, Payload),
    flush_output(Stream).
931
932
%!  read_message(+Stream, -String) is det.
%
%   Receive one message: read the length prefix with read_string_length/2,
%   then read exactly that many bytes through a temporary range stream
%   that uses the same encoding as Stream.  The range stream is always
%   closed again.
read_message(Stream, String) :-
    read_string_length(Stream, Byte_Count),
    stream_property(Stream, encoding(Encoding)),
    setup_call_cleanup(
        stream_range_open(Stream, Range, [size(Byte_Count)]),
        (   set_stream(Range, encoding(Encoding)),
            read_string(Range, _, String)
        ),
        close(Range)).
946
947
%!  write_string_length(+Stream, +String) is det.
%
%   Write the length prefix of a message: the number of bytes String
%   occupies in the encoding of Stream, followed by ".\n".
write_string_length(Stream, String) :-
    stream_property(Stream, encoding(Encoding)),
    string_encoding_length(String, Encoding, Byte_Count),
    format(Stream, "~d.\n", [Byte_Count]).
953
954
%!  read_string_length(+Stream, -Length) is det.
%
%   Read the length prefix of a message.  The prefix is read as a Prolog
%   term ("<Length>."), after which one more character (the newline that
%   follows the full stop) is consumed.
%
%   The prefix comes from the client and is therefore untrusted: whatever
%   term was read is checked to be a non-negative integer before it is
%   used as a byte count by the caller.
%
%   @throws error(type_error(nonneg, Term), _) or
%           error(instantiation_error, _) if the prefix is not a
%           non-negative integer.  This includes `end_of_file`, which
%           read_term/3 returns when the client closed the connection.
read_string_length(Stream, Length) :-
    read_term(Stream, Length, []),
    must_be(nonneg, Length),
    get_char(Stream, _).
960
961
%!  string_encoding_length(+String, +Encoding, -Length) is det.
%
%   Length is the number of bytes needed to write String in Encoding.  It
%   is measured by writing String to a null stream set to that encoding
%   and reading the stream's byte count.
string_encoding_length(String, Encoding, Length) :-
    setup_call_cleanup(
        open_null_stream(Sink),
        (   set_stream(Sink, encoding(Encoding)),
            write(Sink, String),
            byte_count(Sink, Length)
        ),
        close(Sink)).
971
972
%!  term_to_json_string(+Term, -Json_String) is det.
%
%   Convert Term to JSON with term_to_json/2 and serialise it with
%   json_write/2 into a string that ends with a newline.
term_to_json_string(Term, Json_String) :-
    term_to_json(Term, Json),
    Emit = ( current_output(Out),
             json_write(Out, Json),
             put(Out, '\n')
           ),
    with_output_to(string(Json_String), Emit).
982
983
%!  repeat_until_false(:Goal) is det.
%
%   Call Goal over and over until it fails.  Each call is wrapped in a
%   double negation, so variable bindings made by Goal never survive;
%   only its side effects do.
repeat_until_false(Goal) :-
    (   \+ \+ Goal
    ->  repeat_until_false(Goal)
    ;   true
    ).
989
990
%!  abortSilentExit(+Thread_ID, -Exception) is det.
%
%   Signal `abort` to Thread_ID.  If signalling raises error(Formal, _),
%   Exception is unified with Formal; otherwise Exception stays unbound.
%   The attempt is logged on the mqi(protocol) debug topic.
abortSilentExit(Thread_ID, Exception) :-
    Signal_Goal = thread_signal(Thread_ID, abort),
    catch(Signal_Goal, error(Exception, _), true),
    debug(mqi(protocol), "Attempting to abort thread: ~w. thread_signal_exception: ~w", [Thread_ID, Exception]).
1000
1001
%!  detach_if_expected(+Thread_ID) is semidet.
%
%   Used as an at_exit/1 hook.  If the status of Thread_ID is `true` or
%   `false` the thread is detached; for any other status it is left as it
%   is.  Fails if thread_property/2 yields no status for Thread_ID.
detach_if_expected(Thread_ID) :-
    thread_property(Thread_ID, status(Status)),
    debug(mqi(protocol), "Thread ~w exited with status ~w", [Thread_ID, Status]),
    (   memberchk(Status, [true, false])
    ->  debug(mqi(protocol), "Expected thread status, detaching thread ~w", [Thread_ID]),
        thread_detach(Thread_ID)
    ;   true
    ).
1017
1018
%!  write_output_to_file(+File) is det.
%
%   Open File for writing without buffering and install the resulting
%   stream as both the output and the error stream with set_prolog_IO/3;
%   `user_input` is kept as the input stream.
write_output_to_file(File) :-
    debug(mqi(protocol), "Writing all STDOUT and STDERR to file:~w", [File]),
    open(File, write, Log_Stream, [buffer(false)]),
    set_prolog_IO(user_input, Log_Stream, Log_Stream).
1023
1024
1043unix_domain_socket_path(Created_Directory, File_Path) :-
1044 tmp_file(udsock, Created_Directory),
1045 make_directory(Created_Directory),
1046 catch( chmod(Created_Directory, urwx),
1047 Exception,
1048 ( catch(delete_directory(Created_Directory), error(_, _), true),
1049 throw(Exception)
1050 )
1051 ),
1052 setup_call_cleanup( ( current_prolog_flag(tmp_dir, Save_Tmp_Dir),
1053 set_prolog_flag(tmp_dir, Created_Directory)
1054 ),
1055 tmp_file_stream(File_Path, Stream, []),
1056 set_prolog_flag(tmp_dir, Save_Tmp_Dir)
1057 ),
1058 close(Stream)