/*  Part of SWI-Prolog

    Author:        Jan Wielemaker
    E-mail:        J.Wielemaker@vu.nl
    WWW:           http://www.swi-prolog.org
    Copyright (c)  2007-2025, University of Amsterdam
                              VU University Amsterdam
                              CWI, Amsterdam
                              SWI-Prolog Solutions b.v.
    All rights reserved.

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions
    are met:

    1. Redistributions of source code must retain the above copyright
       notice, this list of conditions and the following disclaimer.

    2. Redistributions in binary form must reproduce the above copyright
       notice, this list of conditions and the following disclaimer in
       the documentation and/or other materials provided with the
       distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
    POSSIBILITY OF SUCH DAMAGE.
*/

:- module(thread,
          [ concurrent/3,               % +Threads, :Goals, +Options
            concurrent_maplist/2,       % :Goal, +List
            concurrent_maplist/3,       % :Goal, ?List1, ?List2
            concurrent_maplist/4,       % :Goal, ?List1, ?List2, ?List3
            concurrent_forall/2,        % :Generate, :Test
            concurrent_forall/3,        % :Generate, :Test, +Options
            concurrent_and/2,           % :Generator,:Test
            concurrent_and/3,           % :Generator,:Test,+Options
            first_solution/3,           % -Var, :Goals, +Options

            call_in_thread/2,           % +Thread, :Goal
            call_in_thread/3            % +Thread, :Goal, +Options
          ]).
:- autoload(library(apply), [maplist/2, maplist/3, maplist/4, maplist/5]).
:- autoload(library(error), [must_be/2, instantiation_error/1]).
:- autoload(library(lists), [subtract/3, same_length/2, nth0/3]).
:- autoload(library(option), [option/2, option/3, meta_options/3]).
:- autoload(library(ordsets), [ord_intersection/3, ord_union/3]).
:- use_module(library(debug), [debug/3, assertion/1]).

%:- debug(concurrent).

% The argument specifiers below were lost in extraction; they are
% reconstructed from the export comments and the clause heads.
:- meta_predicate
    concurrent(+, :, +),
    concurrent_maplist(1, +),
    concurrent_maplist(2, ?, ?),
    concurrent_maplist(3, ?, ?, ?),
    concurrent_forall(0, 0),
    concurrent_forall(0, 0, +),
    concurrent_and(0, 0),
    concurrent_and(0, 0, +),
    first_solution(-, :, +),
    call_in_thread(+, 0),
    call_in_thread(+, 0, :).


:- predicate_options(concurrent/3, 3,
                     [ pass_to(system:thread_create/3, 3)
                     ]).
:- predicate_options(concurrent_forall/3, 3,
                     [ threads(nonneg)
                     ]).
:- predicate_options(concurrent_and/3, 3,
                     [ threads(nonneg)
                     ]).
:- predicate_options(first_solution/3, 3,
                     [ on_fail(oneof([stop,continue])),
                       on_error(oneof([stop,continue])),
                       pass_to(system:thread_create/3, 3)
                     ]).
Execution succeeds if all goals have succeeded. If one goal fails or throws an exception, other workers are abandoned as soon as possible and the entire computation fails or re-throws the exception. Note that if multiple goals fail or raise an error it is not defined which error or failure is reported.
On successful completion, variable bindings are returned. Note however that threads have independent stacks and therefore the goal is copied to the worker thread and the result is copied back to the caller of concurrent/3.
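A minimal sketch of how bindings come back from the workers, assuming this library is loaded as library(thread). The helper square/2 and the wrapper demo_concurrent/3 are hypothetical names used only for this illustration.

```prolog
:- use_module(library(thread)).

% square/2 is a hypothetical, side-effect free helper.
square(X, Y) :-
    Y is X*X.

% Run three goals on at most two worker threads. Each goal is copied
% to a worker; the bindings for A, B and C are copied back and unified
% with the caller's variables.
demo_concurrent(A, B, C) :-
    concurrent(2,
               [ square(2, A),
                 square(3, B),
                 square(4, C)
               ],
               []).
```

Calling `demo_concurrent(A, B, C)` binds A = 4, B = 9 and C = 16. Because terms are copied in both directions, large arguments or large results add overhead that can outweigh the gain from running in parallel.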
Choosing the right number of threads is not always obvious. Here are some scenarios:
concurrent(1, M:List, _) :-
    !,
    maplist(once_in_module(M), List).
concurrent(N, M:List, Options) :-
    must_be(positive_integer, N),
    must_be(list(callable), List),
    length(List, JobCount),
    message_queue_create(Done),
    message_queue_create(Queue),
    WorkerCount is min(N, JobCount),
    create_workers(WorkerCount, Queue, Done, Workers, Options),
    submit_goals(List, 1, M, Queue, VarList),
    forall(between(1, WorkerCount, _),
           thread_send_message(Queue, done)),
    VT =.. [vars|VarList],
    concur_wait(JobCount, Done, VT, cleanup(Workers, Queue),
                Result, [], Exitted),
    subtract(Workers, Exitted, RemainingWorkers),
    concur_cleanup(Result, RemainingWorkers, [Queue, Done]),
    (   Result == true
    ->  true
    ;   Result = false
    ->  fail
    ;   Result = exception(Error)
    ->  throw(Error)
    ).

once_in_module(M, Goal) :-
    call(M:Goal), !.
Each goal is sent to Queue as a term goal(Id, Goal, Vars). Vars is unified with a list of
lists of free variables appearing in each goal.

submit_goals([], _, _, _, []).
submit_goals([H|T], I, M, Queue, [Vars|VT]) :-
    term_variables(H, Vars),
    thread_send_message(Queue, goal(I, M:H, Vars)),
    I2 is I + 1,
    submit_goals(T, I2, M, Queue, VT).
concur_wait(0, _, _, _, true, Exited, Exited) :- !.
concur_wait(N, Done, VT, Cleanup, Status, Exitted0, Exitted) :-
    debug(concurrent, 'Concurrent: waiting for workers ...', []),
    catch(thread_get_message(Done, Exit), Error,
          concur_abort(Error, Cleanup, Done, Exitted0)),
    debug(concurrent, 'Waiting: received ~p', [Exit]),
    (   Exit = done(Id, Vars)
    ->  debug(concurrent, 'Concurrent: Job ~p completed with ~p', [Id, Vars]),
        arg(Id, VT, Vars),
        N2 is N - 1,
        concur_wait(N2, Done, VT, Cleanup, Status, Exitted0, Exitted)
    ;   Exit = finished(Thread)
    ->  thread_join(Thread, JoinStatus),
        debug(concurrent, 'Concurrent: waiter ~p joined: ~p',
              [Thread, JoinStatus]),
        (   JoinStatus == true
        ->  concur_wait(N, Done, VT, Cleanup, Status, [Thread|Exitted0], Exitted)
        ;   Status = JoinStatus,
            Exitted = [Thread|Exitted0]
        )
    ).

concur_abort(Error, cleanup(Workers, Queue), Done, Exitted) :-
    debug(concurrent, 'Concurrent: got ~p', [Error]),
    subtract(Workers, Exitted, RemainingWorkers),
    concur_cleanup(Error, RemainingWorkers, [Queue, Done]),
    throw(Error).

create_workers(N, Queue, Done, [Id|Ids], Options) :-
    N > 0,
    !,
    thread_create(worker(Queue, Done), Id,
                  [ at_exit(thread_send_message(Done, finished(Id)))
                  | Options
                  ]),
    N2 is N - 1,
    create_workers(N2, Queue, Done, Ids, Options).
create_workers(_, _, _, [], _).
worker(Queue, Done) :-
    thread_get_message(Queue, Message),
    debug(concurrent, 'Worker: received ~p', [Message]),
    (   Message = goal(Id, Goal, Vars)
    ->  (   Goal
        ->  thread_send_message(Done, done(Id, Vars)),
            worker(Queue, Done)
        )
    ;   true
    ).

If Result is not true, signal all workers to make them stop prematurely. If
Result is true we assume all workers have been instructed to
stop or have stopped themselves.

concur_cleanup(Result, Workers, Queues) :-
    !,
    (   Result == true
    ->  true
    ;   kill_workers(Workers)
    ),
    join_all(Workers),
    maplist(message_queue_destroy, Queues).

kill_workers([]).
kill_workers([Id|T]) :-
    debug(concurrent, 'Signalling ~w', [Id]),
    catch(thread_signal(Id, abort), _, true),
    kill_workers(T).

join_all([]).
join_all([Id|T]) :-
    thread_join(Id, _),
    join_all(T).


                 /*******************************
                 *             FORALL           *
                 *******************************/
threads(Count): number of worker threads. The default is taken from the Prolog flag cpu_count.

:- dynamic
    fa_aborted/1.

concurrent_forall(Generate, Test) :-
    concurrent_forall(Generate, Test, []).

concurrent_forall(Generate, Test, Options) :-
    jobs(Jobs, Options),
    Jobs > 1,
    !,
    term_variables(Generate, GVars),
    term_variables(Test, TVars),
    sort(GVars, GVarsS),
    sort(TVars, TVarsS),
    ord_intersection(GVarsS, TVarsS, Shared),
    Templ =.. [v|Shared],
    MaxSize is Jobs*4,
    message_queue_create(Q, [max_size(MaxSize)]),
    length(Workers, Jobs),
    thread_self(Me),
    maplist(thread_create(fa_worker(Q, Me, Templ, Test)), Workers),
    catch(( forall(Generate,
                   thread_send_message(Q, job(Templ))),
            forall(between(1, Jobs, _),
                   thread_send_message(Q, done)),
            maplist(thread_join, Workers),
            message_queue_destroy(Q)
          ),
          Error,
          fa_cleanup(Error, Workers, Q)).
concurrent_forall(Generate, Test, _) :-
    forall(Generate, Test).

fa_cleanup(Error, Workers, Q) :-
    maplist(safe_abort, Workers),
    debug(concurrent(fail), 'Joining workers', []),
    maplist(safe_join, Workers),
    debug(concurrent(fail), 'Destroying queue', []),
    retractall(fa_aborted(Q)),
    message_queue_destroy(Q),
    (   Error = fa_worker_failed(_0Test, Why)
    ->  debug(concurrent(fail), 'Test ~p failed: ~p', [_0Test, Why]),
        (   Why == false
        ->  fail
        ;   Why = error(E)
        ->  throw(E)
        ;   assertion(fail)
        )
    ;   throw(Error)
    ).

fa_worker(Queue, Main, Templ, Test) :-
    repeat,
    thread_get_message(Queue, Msg),
    (   Msg == done
    ->  !
    ;   Msg = job(Templ),
        debug(concurrent, 'Running test ~p', [Test]),
        (   catch_with_backtrace(Test, E, true)
        ->  (   var(E)
            ->  fail
            ;   fa_stop(Queue, Main, fa_worker_failed(Test, error(E)))
            )
        ;   !,
            fa_stop(Queue, Main, fa_worker_failed(Test, false))
        )
    ).

fa_stop(Queue, Main, Why) :-
    with_mutex('$concurrent_forall',
               fa_stop_sync(Queue, Main, Why)).

fa_stop_sync(Queue, _Main, _Why) :-
    fa_aborted(Queue),
    !.
fa_stop_sync(Queue, Main, Why) :-
    asserta(fa_aborted(Queue)),
    debug(concurrent(fail), 'Stop due to ~p. Signalling ~q', [Why, Main]),
    thread_signal(Main, throw(Why)).

jobs(Jobs, Options) :-
    (   option(threads(Jobs), Options)
    ->  true
    ;   current_prolog_flag(cpu_count, Jobs)
    ->  true
    ;   Jobs = 1
    ).

safe_abort(Thread) :-
    catch(thread_signal(Thread, abort), error(_,_), true).
safe_join(Thread) :-
    E = error(_,_),
    catch(thread_join(Thread, _Status), E, true).


                 /*******************************
                 *              AND             *
                 *******************************/
concurrent_and/2,3 is a concurrent version of the conjunction (Generator,Test). This predicate creates a
thread providing solutions for Generator that are handed to a pool
of threads that run Test for the different instantiations provided
by Generator concurrently. The predicate is logically equivalent to
a simple conjunction except for two aspects: (1) terms are copied
from Generator to the threads running Test while answers are copied
back to the calling thread and (2) answers may be produced out of
order.
If the evaluation of some Test raises an exception, concurrent_and/2,3 is terminated with this exception. If the caller commits after a given answer or raises an exception while concurrent_and/2,3 is active with pending choice points, all involved resources are reclaimed.
Options:
threads(Count): number of worker threads. The default is taken from the Prolog flag cpu_count.
This predicate was proposed by Jan Burse as
balance((Generator,Test)).
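A small sketch of the out-of-order behaviour described above, assuming this library is loaded as library(thread). The predicates slow_square/2 and demo_and/1 are hypothetical and exist only for this illustration; a real Test would need to be much more expensive to benefit.

```prolog
:- use_module(library(thread)).
:- use_module(library(lists)).

% slow_square/2 is a hypothetical stand-in for an expensive test.
slow_square(X, Y) :-
    Y is X*X.

% Collect all answers of the concurrent conjunction. Answers may
% arrive in any order, so they are sorted before being returned.
demo_and(Squares) :-
    findall(X-Y,
            concurrent_and(member(X, [1,2,3,4]),
                           slow_square(X, Y),
                           [threads(2)]),
            Pairs),
    msort(Pairs, Squares).
```

After sorting, `demo_and(Squares)` yields `[1-1, 2-4, 3-9, 4-16]`. Code that depends on the order in which the plain conjunction would produce answers needs an explicit sort or an index carried through Generator.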
concurrent_and(Gen, Test) :-
    concurrent_and(Gen, Test, []).

concurrent_and(Gen, Test, Options) :-
    jobs(Jobs, Options),
    MaxSize is Jobs*4,
    message_queue_create(JobQueue, [max_size(MaxSize)]),
    message_queue_create(AnswerQueue, [max_size(MaxSize)]),
    ca_template(Gen, Test, Templ),
    term_variables(Gen+Test, AllVars),
    ReplyTempl =.. [v|AllVars],
    length(Workers, Jobs),
    Alive is 1<<Jobs-1,
    maplist(thread_create(ca_worker(JobQueue, AnswerQueue,
                                    Templ, Test, ReplyTempl)),
            Workers),
    thread_create(ca_generator(Gen, Templ, JobQueue, AnswerQueue),
                  GenThread),
    State = state(Alive),
    call_cleanup(
        ca_gather(State, AnswerQueue, ReplyTempl, Workers),
        ca_cleanup(GenThread, Workers, JobQueue, AnswerQueue)).

ca_gather(State, AnswerQueue, ReplyTempl, Workers) :-
    repeat,
    thread_get_message(AnswerQueue, Msg),
    (   Msg = true(ReplyTempl)
    ->  true
    ;   Msg = done(Worker)
    ->  nth0(Done, Workers, Worker),
        arg(1, State, Alive0),
        Alive1 is Alive0 /\ \(1<<Done),
        debug(concurrent(and), 'Alive = ~2r', [Alive1]),
        (   Alive1 =:= 0
        ->  !,
            fail
        ;   nb_setarg(1, State, Alive1),
            fail
        )
    ;   Msg = error(E)
    ->  throw(E)
    ).

ca_template(Gen, Test, Templ) :-
    term_variables(Gen, GVars),
    term_variables(Test, TVars),
    sort(GVars, GVarsS),
    sort(TVars, TVarsS),
    ord_intersection(GVarsS, TVarsS, Shared),
    ord_union(GVarsS, Shared, TemplVars),
    Templ =.. [v|TemplVars].

ca_worker(JobQueue, AnswerQueue, Templ, Test, ReplyTempl) :-
    thread_self(Me),
    EG = error(existence_error(message_queue, _), _),
    repeat,
    catch(thread_get_message(JobQueue, Req), EG, Req=all_done),
    (   Req = job(Templ)
    ->  (   catch(Test, E, true),
            (   var(E)
            ->  thread_send_message(AnswerQueue, true(ReplyTempl))
            ;   thread_send_message(AnswerQueue, error(E))
            ),
            fail
        )
    ;   Req == done
    ->  !,
        message_queue_destroy(JobQueue),
        thread_send_message(AnswerQueue, done(Me))
    ;   assertion(Req == all_done)
    ->  !,
        thread_send_message(AnswerQueue, done(Me))
    ).

ca_generator(Gen, Templ, JobQueue, AnswerQueue) :-
    (   catch(Gen, E, true),
        (   var(E)
        ->  thread_send_message(JobQueue, job(Templ))
        ;   thread_send_message(AnswerQueue, error(E))
        ),
        fail
    ;   thread_send_message(JobQueue, done)
    ).

ca_cleanup(GenThread, Workers, JobQueue, AnswerQueue) :-
    safe_abort(GenThread),
    safe_join(GenThread),
    maplist(safe_abort, Workers),
    maplist(safe_join, Workers),
    message_queue_destroy(AnswerQueue),
    catch(message_queue_destroy(JobQueue), error(_,_), true).


                 /*******************************
                 *             MAPLIST          *
                 *******************************/
The number of worker threads is taken from the Prolog flag cpu_count. If
this flag is absent or 1, or List has fewer than two elements, this
predicate calls the corresponding maplist/N version using a wrapper
based on once/1. Note that all goals are executed as if wrapped in
once/1 and therefore these predicates are semidet.
Note that the overhead of this predicate is considerable and therefore Goal must be fairly expensive before one reaches a speedup.
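As an illustration of a goal that is expensive enough to be worth distributing, the sketch below maps a deliberately naive Fibonacci over a short list. It assumes this library is loaded as library(thread); fib/2 and demo_maplist/1 are hypothetical names introduced only for this example.

```prolog
:- use_module(library(thread)).

% fib/2 is a deliberately naive, CPU-bound workload.
fib(0, 0).
fib(1, 1).
fib(N, F) :-
    N > 1,
    N1 is N-1,
    N2 is N-2,
    fib(N1, F1),
    fib(N2, F2),
    F is F1+F2.

% Each call fib(N, F) runs as if wrapped in once/1, possibly on a
% different worker thread; the results are copied back into Fs.
demo_maplist(Fs) :-
    concurrent_maplist(fib, [20, 21, 22], Fs).
```

`demo_maplist(Fs)` binds Fs to `[6765, 10946, 17711]`, regardless of whether the goals ran on worker threads or through the sequential maplist/3 fallback. Whether this is faster than plain maplist/3 depends on the machine and on how costly each call is.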
concurrent_maplist(Goal, List) :-
    workers(List, WorkerCount),
    !,
    maplist(ml_goal(Goal), List, Goals),
    concurrent(WorkerCount, Goals, []).
concurrent_maplist(M:Goal, List) :-
    maplist(once_in_module(M, Goal), List).

once_in_module(M, Goal, Arg) :-
    call(M:Goal, Arg), !.

ml_goal(Goal, Elem, call(Goal, Elem)).

concurrent_maplist(Goal, List1, List2) :-
    same_length(List1, List2),
    workers(List1, WorkerCount),
    !,
    maplist(ml_goal(Goal), List1, List2, Goals),
    concurrent(WorkerCount, Goals, []).
concurrent_maplist(M:Goal, List1, List2) :-
    maplist(once_in_module(M, Goal), List1, List2).

once_in_module(M, Goal, Arg1, Arg2) :-
    call(M:Goal, Arg1, Arg2), !.

ml_goal(Goal, Elem1, Elem2, call(Goal, Elem1, Elem2)).

concurrent_maplist(Goal, List1, List2, List3) :-
    same_length(List1, List2, List3),
    workers(List1, WorkerCount),
    !,
    maplist(ml_goal(Goal), List1, List2, List3, Goals),
    concurrent(WorkerCount, Goals, []).
concurrent_maplist(M:Goal, List1, List2, List3) :-
    maplist(once_in_module(M, Goal), List1, List2, List3).

once_in_module(M, Goal, Arg1, Arg2, Arg3) :-
    call(M:Goal, Arg1, Arg2, Arg3), !.

ml_goal(Goal, Elem1, Elem2, Elem3, call(Goal, Elem1, Elem2, Elem3)).

workers(List, Count) :-
    current_prolog_flag(cpu_count, Cores),
    Cores > 1,
    length(List, Len),
    Count is min(Cores,Len),
    Count > 1,
    !.

same_length([], [], []).
same_length([_|T1], [_|T2], [_|T3]) :-
    same_length(T1, T2, T3).


                 /*******************************
                 *             FIRST            *
                 *******************************/
For example, if it is unclear whether it is better to search a graph breadth-first or depth-first we can use:
search_graph(Graph, Path) :-
    first_solution(Path, [ breadth_first(Graph, Path),
                           depth_first(Graph, Path)
                         ],
                   []).
Options include thread stack-sizes passed to thread_create, as
well as the options on_fail and on_error that specify what
to do if a solver fails or triggers an error. By default
execution of all solvers is terminated and the result is
returned. Sometimes one may wish to continue. One such scenario
is if one of the solvers may run out of resources or one of the
solvers is known to be incomplete.
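The effect of on_fail(continue) can be sketched with one solver that gives up and one that succeeds. This assumes the library is loaded as library(thread); incomplete_solver/2, complete_solver/2 and demo_first/1 are hypothetical names used only for this illustration.

```prolog
:- use_module(library(thread)).

% A hypothetical incomplete solver: it never finds an answer.
incomplete_solver(_, _) :-
    fail.

% A hypothetical complete solver.
complete_solver(N, Double) :-
    Double is 2*N.

% With on_fail(continue), the failure of incomplete_solver/2 does not
% end the computation; we keep waiting for complete_solver/2.
demo_first(Double) :-
    first_solution(Double,
                   [ incomplete_solver(21, Double),
                     complete_solver(21, Double)
                   ],
                   [ on_fail(continue) ]).
```

`demo_first(Double)` binds Double to 42. With the default on_fail(stop), the same call may fail instead, depending on which solver reports first.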
on_fail(Action): if stop (default), terminate all threads and stop with
the failure. If continue, keep waiting.
on_error(Action): as on_fail, but applies when a solver raises an error.

first_solution(X, M:List, Options) :-
    message_queue_create(Done),
    thread_options(Options, ThreadOptions, RestOptions),
    length(List, JobCount),
    create_solvers(List, M, X, Done, Solvers, ThreadOptions),
    wait_for_one(JobCount, Done, Result, RestOptions),
    concur_cleanup(kill, Solvers, [Done]),
    (   Result = done(_, Var)
    ->  X = Var
    ;   Result = error(_, Error)
    ->  throw(Error)
    ).

create_solvers([], _, _, _, [], _).
create_solvers([H|T], M, X, Done, [Id|IDs], Options) :-
    thread_create(solve(M:H, X, Done), Id, Options),
    create_solvers(T, M, X, Done, IDs, Options).

solve(Goal, Var, Queue) :-
    thread_self(Me),
    (   catch(Goal, E, true)
    ->  (   var(E)
        ->  thread_send_message(Queue, done(Me, Var))
        ;   thread_send_message(Queue, error(Me, E))
        )
    ;   thread_send_message(Queue, failed(Me))
    ).

wait_for_one(0, _, failed, _) :- !.
wait_for_one(JobCount, Queue, Result, Options) :-
    thread_get_message(Queue, Msg),
    LeftCount is JobCount - 1,
    (   Msg = done(_, _)
    ->  Result = Msg
    ;   Msg = failed(_)
    ->  (   option(on_fail(stop), Options, stop)
        ->  Result = Msg
        ;   wait_for_one(LeftCount, Queue, Result, Options)
        )
    ;   Msg = error(_, _)
    ->  (   option(on_error(stop), Options, stop)
        ->  Result = Msg
        ;   wait_for_one(LeftCount, Queue, Result, Options)
        )
    ).
Split Options into thread(-size) options and other
options.

thread_options([], [], []).
thread_options([H|T], [H|Th], O) :-
    thread_option(H),
    !,
    thread_options(T, Th, O).
thread_options([H|T], Th, [H|O]) :-
    thread_options(T, Th, O).

thread_option(local(_)).
thread_option(global(_)).
thread_option(trail(_)).
thread_option(argument(_)).
thread_option(stack(_)).
If waiting times out or is interrupted, a stop(Reason)
exception is injected into Goal. Interrupts can be nested, i.e., it is allowed
to run a call_in_thread/2 while the target thread is processing such
an interrupt.
Options are passed to thread_get_message/3 and notably allow for
specifying a timeout. If a timeout is reached, this predicate will
attempt to kill Goal in Thread and act according to the option
on_timeout.
on_timeout(Goal): if waiting is terminated due to a timeout(Time) or deadline(Stamp)
option, call Goal. The default is throw(time_limit_exceeded).

This predicate is primarily intended for debugging and inspection tasks.
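A minimal inspection sketch, assuming the library is loaded as library(thread) and that the target thread is blocked in thread_get_message/1, where it can process signals. The predicate demo_call_in_thread/1 is a hypothetical name for this illustration.

```prolog
:- use_module(library(thread)).

% Start an idle thread, ask it for its own identity from the calling
% thread, then tell it to stop and join it.
demo_call_in_thread(Id) :-
    thread_create(thread_get_message(stop), T, []),
    call_in_thread(T, thread_self(Id)),   % thread_self/1 runs inside T
    thread_send_message(T, stop),
    thread_join(T, _).
```

Here Id should be the identifier of the idle thread rather than that of the caller, because Goal runs as an interrupt inside the target thread and only its bindings are sent back.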
call_in_thread(Thread, Goal) :-
    call_in_thread(Thread, Goal, []).

call_in_thread(Thread, Goal, _) :-
    must_be(callable, Goal),
    var(Thread),
    !,
    instantiation_error(Thread).
call_in_thread(Thread, Goal, _) :-
    thread_self(Thread),
    !,
    once(Goal).
call_in_thread(Thread, Goal, Options) :-
    meta_options(is_meta, Options, Options1),
    term_variables(Goal, Vars),
    thread_self(Me),
    A is random(1 000 000 000),
    thread_signal(Thread, run_in_thread(Goal,Vars,A,Me)),
    (   catch(thread_get_message(Me, in_thread(A,Result), Options1),
              Error,
              forward_exception(Thread, A, Error))
    ->  (   Result = true(Vars)
        ->  true
        ;   Result = error(Error)
        ->  throw(Error)
        ;   fail
        )
    ;   thread_signal(Thread, kill_task(A, stop(time_limit_exceeded))),
        option(on_timeout(Action), Options1, throw(time_limit_exceeded)),
        call(Action)
    ).

is_meta(on_timeout).

run_in_thread(Goal, Vars, Id, Sender) :-
    (   catch_with_backtrace(call(Goal), Error, true)
    ->  (   var(Error)
        ->  thread_send_message(Sender, in_thread(Id, true(Vars)))
        ;   Error = stop(_)
        ->  true
        ;   thread_send_message(Sender, in_thread(Id, error(Error)))
        )
    ;   thread_send_message(Sender, in_thread(Id, false))
    ).

forward_exception(Thread, Id, Error) :-
    kill_with(Error, Kill),
    thread_signal(Thread, kill_task(Id, Kill)),
    throw(Error).

kill_with(time_limit_exceeded, stop(time_limit_exceeded)) :-
    !.
kill_with(_, stop(interrupt)).

kill_task(Id, Exception) :-
    prolog_current_frame(Frame),
    prolog_frame_attribute(Frame, parent_goal,
                           run_in_thread(_Goal, _Vars, Id, _Sender)),
    !,
    throw(Exception).
kill_task(_, _).
High level thread primitives
This module defines simple-to-use predicates for running goals concurrently. Where the core multi-threaded API is targeted at communicating long-living threads, the predicates here are defined to run goals concurrently without having to deal with thread creation and maintenance explicitly.
Note that these predicates run goals concurrently and therefore these goals need to be thread-safe. As the predicates in this module also abort branches of the computation that are no longer needed, predicates that have side effects must act properly. In a nutshell, this has the following consequences: