View source with raw comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker
    4    E-mail:        J.Wielemaker@vu.nl
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2007-2025, University of Amsterdam
    7                              VU University Amsterdam
    8                              CWI, Amsterdam
    9                              SWI-Prolog Solutions b.v.
   10    All rights reserved.
   11
   12    Redistribution and use in source and binary forms, with or without
   13    modification, are permitted provided that the following conditions
   14    are met:
   15
   16    1. Redistributions of source code must retain the above copyright
   17       notice, this list of conditions and the following disclaimer.
   18
   19    2. Redistributions in binary form must reproduce the above copyright
   20       notice, this list of conditions and the following disclaimer in
   21       the documentation and/or other materials provided with the
   22       distribution.
   23
   24    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   25    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   26    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   27    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   28    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   29    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   30    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   31    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   32    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   33    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   34    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   35    POSSIBILITY OF SUCH DAMAGE.
   36*/
   37
   38:- module(thread,
   39          [ concurrent/3,               % +Threads, :Goals, +Options
   40            concurrent_maplist/2,       % :Goal, +List
   41            concurrent_maplist/3,       % :Goal, ?List1, ?List2
   42            concurrent_maplist/4,       % :Goal, ?List1, ?List2, ?List3
   43            concurrent_forall/2,        % :Generate, :Test
   44            concurrent_forall/3,        % :Generate, :Test, +Options
   45            concurrent_and/2,           % :Generator,:Test
   46            concurrent_and/3,           % :Generator,:Test,+Options
   47            first_solution/3,           % -Var, :Goals, +Options
   48
   49            call_in_thread/2,           % +Thread, :Goal
   50            call_in_thread/3            % +Thread, :Goal, +Options
   51          ]).   52:- autoload(library(apply), [maplist/2, maplist/3, maplist/4, maplist/5]).   53:- autoload(library(error), [must_be/2, instantiation_error/1]).   54:- autoload(library(lists), [subtract/3, same_length/2, nth0/3]).   55:- autoload(library(option), [option/2, option/3, meta_options/3]).   56:- autoload(library(ordsets), [ord_intersection/3, ord_union/3]).   57:- use_module(library(debug), [debug/3, assertion/1]).   58
   59%:- debug(concurrent).
   60
   61:- meta_predicate
   62    concurrent(+, :, +),
   63    concurrent_maplist(1, +),
   64    concurrent_maplist(2, ?, ?),
   65    concurrent_maplist(3, ?, ?, ?),
   66    concurrent_forall(0, 0),
   67    concurrent_forall(0, 0, +),
   68    concurrent_and(0, 0),
   69    concurrent_and(0, 0, +),
   70    first_solution(-, :, +),
   71    call_in_thread(+, 0),
   72    call_in_thread(+, 0, :).   73
   74
   75:- predicate_options(concurrent/3, 3,
   76                     [ pass_to(system:thread_create/3, 3)
   77                     ]).   78:- predicate_options(concurrent_forall/3, 3,
   79                     [ threads(nonneg)
   80                     ]).   81:- predicate_options(concurrent_and/3, 3,
   82                     [ threads(nonneg)
   83                     ]).   84:- predicate_options(first_solution/3, 3,
   85                     [ on_fail(oneof([stop,continue])),
   86                       on_error(oneof([stop,continue])),
   87                       pass_to(system:thread_create/3, 3)
   88                     ]).

High level thread primitives

This module defines simple to use predicates for running goals concurrently. Where the core multi-threaded API is targeted at communicating long-living threads, the predicates here are defined to run goals concurrently without having to deal with thread creation and maintenance explicitly.

Note that these predicates run goals concurrently and therefore these goals need to be thread-safe. As the predicates in this module also abort branches of the computation that are no longer needed, predicates that have side effects must behave properly when they are aborted. In a nutshell, this has the following consequences:

author
- Jan Wielemaker */
 concurrent(+N, :Goals, +Options) is semidet
Run Goals in parallel using N threads. This call blocks until all work has been done. The Goals must be independent. They should not communicate using shared variables or any form of global data. All Goals must be thread-safe.

Execution succeeds if all goals have succeeded. If one goal fails or throws an exception, other workers are abandoned as soon as possible and the entire computation fails or re-throws the exception. Note that if multiple goals fail or raise an error it is not defined which error or failure is reported.

On successful completion, variable bindings are returned. Note however that threads have independent stacks and therefore the goal is copied to the worker thread and the result is copied back to the caller of concurrent/3.

Choosing the right number of threads is not always obvious. Here are some scenarios:

Arguments:
N- Number of worker-threads to create. Using 1, no threads are created. If N is larger than the number of Goals we create exactly as many threads as there are Goals.
Goals- List of callable terms.
Options- Passed to thread_create/3 for creating the workers. Only options changing the stack-sizes can be used. In particular, do not pass the detached or alias options.
See also
- In many cases, concurrent_maplist/2 and friends is easier to program and is tractable to program analysis.
  %!  concurrent(+N, :Goals, +Options) is semidet.
  %
  %   Run each goal of Goals once, using at most N worker threads.
  %   With N == 1 the goals are run sequentially in the calling thread
  %   and no argument validation takes place. Otherwise min(N, #Goals)
  %   workers are created, all goals are posted on a work queue and the
  %   caller waits on a second queue for the results. The outcome
  %   reported by concur_wait/7 decides the result: `true` succeeds,
  %   `false` fails and `exception(E)` re-throws E.
  %
  %   @arg Options is handed to create_workers/5.

  concurrent(1, Module:Goals, _) :-
      !,
      maplist(once_in_module(Module), Goals).
  concurrent(N, Module:Goals, Options) :-
      must_be(positive_integer, N),
      must_be(list(callable), Goals),
      length(Goals, NJobs),
      message_queue_create(DoneQ),
      message_queue_create(WorkQ),
      NWorkers is min(N, NJobs),
      create_workers(NWorkers, WorkQ, DoneQ, Workers, Options),
      submit_goals(Goals, 1, Module, WorkQ, VarLists),
      % One `done` marker per worker so that every worker terminates.
      forall(between(1, NWorkers, _),
             thread_send_message(WorkQ, done)),
      % vars/N term: argument I receives the bindings of job I.
      Bindings =.. [vars|VarLists],
      concur_wait(NJobs, DoneQ, Bindings, cleanup(Workers, WorkQ),
                  Outcome, [], Joined),
      % Workers joined by concur_wait/7 must not be joined again.
      subtract(Workers, Joined, StillRunning),
      concur_cleanup(Outcome, StillRunning, [WorkQ, DoneQ]),
      (   Outcome == true
      ->  true
      ;   Outcome = false
      ->  fail
      ;   Outcome = exception(Error)
      ->  throw(Error)
      ).
  193
  %!  once_in_module(+Module, :Goal) is semidet.
  %
  %   Run Goal in Module, committing to its first solution.

  once_in_module(Module, Goal) :-
      once(Module:Goal).
 submit_goals(+List, +Id0, +Module, +Queue, -Vars) is det
Send all jobs from List to Queue. Each goal is added to Queue as a term goal(Id, Goal, Vars). Vars is unified with a list of lists of free variables appearing in each goal.
  %!  submit_goals(+Goals, +Id0, +Module, +Queue, -VarLists) is det.
  %
  %   Post every goal of Goals on Queue as goal(Id, Module:Goal, Vars),
  %   numbering the jobs consecutively from Id0. VarLists holds, for
  %   each goal in order, the list of its free variables.

  submit_goals([], _, _, _, []).
  submit_goals([Goal|Goals], Id, Module, Queue, [Vars|VarLists]) :-
      term_variables(Goal, Vars),
      thread_send_message(Queue, goal(Id, Module:Goal, Vars)),
      NextId is Id + 1,
      submit_goals(Goals, NextId, Module, Queue, VarLists).
 concur_wait(+N, +Done:queue, +VT:compound, +Cleanup, -Result, +Exitted0, -Exitted) is semidet
Wait for completion, failure or error.
Arguments:
Exited- List of thread-ids with threads that completed before all work was done.
  %!  concur_wait(+Pending, +DoneQ, +Bindings, +Cleanup,
  %!              -Outcome, +Joined0, -Joined) is semidet.
  %
  %   Read messages from DoneQ until Pending jobs have reported
  %   completion or a worker terminates with a status other than
  %   `true`.
  %
  %     - done(Id, Vars) unifies Vars with argument Id of Bindings and
  %       decrements the number of pending jobs.
  %     - finished(Thread) joins Thread. A `true` join status keeps
  %       waiting; any other status becomes Outcome.
  %
  %   If waiting for a message raises an exception, concur_abort/4 is
  %   called with Cleanup. Joined extends Joined0 with all threads
  %   joined here.

  concur_wait(0, _, _, _, true, Joined, Joined) :- !.
  concur_wait(Pending, DoneQ, Bindings, Cleanup, Outcome, Joined0, Joined) :-
      debug(concurrent, 'Concurrent: waiting for workers ...', []),
      catch(thread_get_message(DoneQ, Msg), Error,
            concur_abort(Error, Cleanup, DoneQ, Joined0)),
      debug(concurrent, 'Waiting: received ~p', [Msg]),
      (   Msg = done(JobId, Vars)
      ->  debug(concurrent, 'Concurrent: Job ~p completed with ~p', [JobId, Vars]),
          arg(JobId, Bindings, Vars),
          Pending1 is Pending - 1,
          concur_wait(Pending1, DoneQ, Bindings, Cleanup, Outcome, Joined0, Joined)
      ;   Msg = finished(Worker)
      ->  thread_join(Worker, JoinStatus),
          debug(concurrent, 'Concurrent: waiter ~p joined: ~p',
                [Worker, JoinStatus]),
          (   JoinStatus == true
          ->  concur_wait(Pending, DoneQ, Bindings, Cleanup, Outcome,
                          [Worker|Joined0], Joined)
          ;   Outcome = JoinStatus,
              Joined = [Worker|Joined0]
          )
      ).
  240
  %!  concur_abort(+Error, +Cleanup, +DoneQ, +Joined).
  %
  %   Recovery for an exception raised while waiting for results:
  %   clean up the workers that have not been joined yet together with
  %   both queues and re-throw Error. Never succeeds.

  concur_abort(Error, cleanup(Workers, WorkQ), DoneQ, Joined) :-
      debug(concurrent, 'Concurrent: got ~p', [Error]),
      subtract(Workers, Joined, StillRunning),
      concur_cleanup(Error, StillRunning, [WorkQ, DoneQ]),
      throw(Error).
  246
  %!  create_workers(+Count, +WorkQ, +DoneQ, -Workers, +Options) is det.
  %
  %   Create Count threads running worker/2 on WorkQ and DoneQ. Each
  %   thread gets an at_exit/1 hook that posts finished(Id) on DoneQ;
  %   Options is appended to the thread_create/3 option list.

  create_workers(Count, WorkQ, DoneQ, [Worker|Workers], Options) :-
      Count > 0,
      !,
      thread_create(worker(WorkQ, DoneQ), Worker,
                    [ at_exit(thread_send_message(DoneQ, finished(Worker)))
                    | Options
                    ]),
      Left is Count - 1,
      create_workers(Left, WorkQ, DoneQ, Workers, Options).
  create_workers(_, _, _, [], _).
 worker(+WorkQueue, +DoneQueue) is det
Process jobs from WorkQueue and send the results to DoneQueue.
  %!  worker(+WorkQ, +DoneQ) is semidet.
  %
  %   Main loop of a concurrent/3 worker. Takes goal(Id, Goal, Vars)
  %   messages from WorkQ, runs Goal and, if it succeeds, posts
  %   done(Id, Vars) on DoneQ before continuing with the next message.
  %   The worker fails when Goal fails and succeeds on any message
  %   that is not a goal/3 term.

  worker(WorkQ, DoneQ) :-
      thread_get_message(WorkQ, Msg),
      debug(concurrent, 'Worker: received ~p', [Msg]),
      (   Msg = goal(JobId, Goal, Vars)
      ->  (   Goal
          ->  thread_send_message(DoneQ, done(JobId, Vars)),
              worker(WorkQ, DoneQ)
          )
      ;   true
      ).
 concur_cleanup(+Result, +Workers:list, +Queues:list) is det
Cleanup the concurrent workers and message queues. If Result is not true, signal all workers to make them stop prematurely. If result is true we assume all workers have been instructed to stop or have stopped themselves.
  %!  concur_cleanup(+Result, +Workers:list, +Queues:list) is det.
  %
  %   Reclaim Workers and Queues. Unless Result is `true`, the workers
  %   are first signalled to abort. All workers are then joined and
  %   all queues destroyed.

  concur_cleanup(Result, Workers, Queues) :-
      (   Result == true
      ->  true
      ;   kill_workers(Workers)
      ),
      join_all(Workers),
      maplist(message_queue_destroy, Queues).
  290
  %!  kill_workers(+Workers:list) is det.
  %
  %   Send an `abort` signal to every thread in Workers. Any exception
  %   raised by thread_signal/2 is ignored.

  kill_workers([]).
  kill_workers([Worker|Workers]) :-
      debug(concurrent, 'Signalling ~w', [Worker]),
      catch(thread_signal(Worker, abort), _, true),
      kill_workers(Workers).
  296
  %!  join_all(+Threads:list) is det.
  %
  %   Join every thread in Threads, discarding the exit status.

  join_all([]).
  join_all([Thread|Threads]) :-
      thread_join(Thread, _),
      join_all(Threads).
  301
  302
  303		 /*******************************
  304		 *             FORALL		*
  305		 *******************************/
 concurrent_forall(:Generate, :Action) is semidet
 concurrent_forall(:Generate, :Action, +Options) is semidet
True when Action is true for all solutions of Generate. This has the same semantics as forall/2, but the Action goals are executed in multiple threads. Notably, a failing Action or an Action throwing an exception signals the calling thread, which in turn aborts all workers and fails or re-throws the generated error. Options:
threads(+Count)
Number of threads to use. The default is determined by the Prolog flag cpu_count.
To be done
- Ideally we would grow the set of workers dynamically, similar to dynamic scheduling of HTTP worker threads. This would avoid creating threads that are never used if Generate is too slow or does not provide enough answers and would further raise the number of threads if Action is I/O bound rather than CPU bound.
  326:- dynamic
  327    fa_aborted/1.  328
  %!  concurrent_forall(:Generate, :Test) is semidet.
  %
  %   Same as concurrent_forall/3 with an empty option list.

  concurrent_forall(Generate, Test) :-
      Options = [],
      concurrent_forall(Generate, Test, Options).
  331
  %!  concurrent_forall(:Generate, :Test, +Options) is semidet.
  %
  %   True when Test holds for every solution of Generate. If jobs/2
  %   yields more than one thread, a pool of fa_worker/4 threads is
  %   created and each solution of Generate is posted on a bounded
  %   queue as job(Templ), where Templ holds the variables shared by
  %   Generate and Test. After the generator is exhausted one `done`
  %   message per worker is posted, the workers are joined and the
  %   queue is destroyed. Any exception arriving in the meantime is
  %   passed to fa_cleanup/3. Otherwise this is plain forall/2.

  concurrent_forall(Generate, Test, Options) :-
      jobs(NWorkers, Options),
      NWorkers > 1,
      !,
      term_variables(Generate, GenVars0),
      term_variables(Test, TestVars0),
      sort(GenVars0, GenVars),
      sort(TestVars0, TestVars),
      ord_intersection(GenVars, TestVars, Shared),
      Templ =.. [v|Shared],
      QueueSize is NWorkers*4,
      message_queue_create(Queue, [max_size(QueueSize)]),
      length(Workers, NWorkers),
      thread_self(Main),
      maplist(thread_create(fa_worker(Queue, Main, Templ, Test)), Workers),
      catch(( forall(Generate,
                     thread_send_message(Queue, job(Templ))),
              forall(between(1, NWorkers, _),
                     thread_send_message(Queue, done)),
              maplist(thread_join, Workers),
              message_queue_destroy(Queue)
            ),
            Error,
            fa_cleanup(Error, Workers, Queue)).
  concurrent_forall(Generate, Test, _) :-
      forall(Generate, Test).
  358
  %!  fa_cleanup(+Error, +Workers, +Queue) is failure.
  %
  %   Tear down a concurrent_forall/3 run after Error reached the main
  %   thread: abort and join all Workers, drop the fa_aborted/1 mark
  %   for Queue and destroy Queue. If Error is
  %   fa_worker_failed(Test, Why), a Why of `false` makes the call
  %   fail and error(E) re-throws E. Any other Error is re-thrown
  %   as is.

  fa_cleanup(Error, Workers, Queue) :-
      maplist(safe_abort, Workers),
      debug(concurrent(fail), 'Joining workers', []),
      maplist(safe_join, Workers),
      debug(concurrent(fail), 'Destroying queue', []),
      retractall(fa_aborted(Queue)),
      message_queue_destroy(Queue),
      (   Error = fa_worker_failed(_0Failed, Why)
      ->  debug(concurrent(fail), 'Test ~p failed: ~p', [_0Failed, Why]),
          (   Why == false
          ->  fail
          ;   Why = error(Ex)
          ->  throw(Ex)
          ;   assertion(fail)
          )
      ;   throw(Error)
      ).
  376
  %!  fa_worker(+Queue, +Main, +Templ, :Test) is det.
  %
  %   Worker loop of concurrent_forall/3. Repeatedly takes a message
  %   from Queue. `done` ends the loop. job(Templ) instantiates the
  %   shared variables and runs Test: success backtracks into the loop
  %   for the next message, while failure or an exception is reported
  %   to Main through fa_stop/3.

  fa_worker(Queue, Main, Templ, Test) :-
      repeat,
      thread_get_message(Queue, Msg),
      (   Msg == done
      ->  !
      ;   Msg = job(Templ),
          debug(concurrent, 'Running test ~p', [Test]),
          (   catch_with_backtrace(Test, Ex, true)
          ->  (   var(Ex)
              ->  fail                    % Test passed: fetch next job
              ;   fa_stop(Queue, Main, fa_worker_failed(Test, error(Ex)))
              )
          ;   !,
              fa_stop(Queue, Main, fa_worker_failed(Test, false))
          )
      ).
  393
  %!  fa_stop(+Queue, +Main, +Why) is det.
  %
  %   Run fa_stop_sync/3 while holding the mutex '$concurrent_forall'.

  fa_stop(Queue, Main, Why) :-
      Stop = fa_stop_sync(Queue, Main, Why),
      with_mutex('$concurrent_forall', Stop).
  397
  %!  fa_stop_sync(+Queue, +Main, +Why) is det.
  %
  %   If no fa_aborted(Queue) fact exists yet, record one and signal
  %   Main to throw Why. Later calls for the same Queue do nothing.

  fa_stop_sync(Queue, Main, Why) :-
      (   fa_aborted(Queue)
      ->  true
      ;   asserta(fa_aborted(Queue)),
          debug(concurrent(fail), 'Stop due to ~p. Signalling ~q', [Why, Main]),
          thread_signal(Main, throw(Why))
      ).
  405
  %!  jobs(-Jobs, +Options) is det.
  %
  %   Jobs is the number of threads to use: the value of the
  %   threads(Jobs) option if present, else the Prolog flag
  %   `cpu_count` if defined, else 1.

  jobs(Jobs, Options) :-
      option(threads(Jobs), Options),
      !.
  jobs(Jobs, _) :-
      current_prolog_flag(cpu_count, Jobs),
      !.
  jobs(1, _).
  413
  %!  safe_abort(+Thread) is det.
  %
  %   Signal Thread to abort, ignoring any error(_,_) exception raised
  %   by thread_signal/2.

  safe_abort(Thread) :-
      Ignored = error(_,_),
      catch(thread_signal(Thread, abort), Ignored, true).
  %!  safe_join(+Thread) is det.
  %
  %   Join Thread, discarding its exit status and ignoring any
  %   error(_,_) exception raised by thread_join/2.

  safe_join(Thread) :-
      catch(thread_join(Thread, _Status), error(_,_), true).
  419
  420
  421		 /*******************************
  422		 *              AND		*
  423		 *******************************/
 concurrent_and(:Generator, :Test)
 concurrent_and(:Generator, :Test, +Options)
Concurrent version of (Generator,Test). This predicate creates a thread providing solutions for Generator that are handed to a pool of threads that run Test for the different instantiations provided by Generator concurrently. The predicate is logically equivalent to a simple conjunction except for two aspects: (1) terms are copied from Generator to the test Test threads while answers are copied back to the calling thread and (2) answers may be produced out of order.

If the evaluation of some Test raises an exception, concurrent_and/2,3 is terminated with this exception. If the caller commits after a given answer or raises an exception while concurrent_and/2,3 is active with pending choice points, all involved resources are reclaimed.

Options:

threads(+Count)
Create a worker pool holding Count threads. The default is the Prolog flag cpu_count.

This predicate was proposed by Jan Burse as balance((Generator,Test)).

  %!  concurrent_and(:Generator, :Test) is nondet.
  %
  %   Same as concurrent_and/3 with an empty option list.

  concurrent_and(Generator, Test) :-
      Options = [],
      concurrent_and(Generator, Test, Options).
  454
  %!  concurrent_and(:Generator, :Test, +Options) is nondet.
  %
  %   Concurrent version of (Generator,Test). A generator thread posts
  %   job(Templ) on a bounded job queue for every solution of
  %   Generator; a pool of ca_worker/5 threads runs Test for each job
  %   and posts the answers on a bounded answer queue, from which
  %   ca_gather/4 produces the solutions on backtracking.
  %   ca_cleanup/4 reclaims all threads and queues when the call
  %   completes, is committed or is aborted.
  %
  %   The pool always holds at least one worker. jobs/2 can return 0
  %   or a negative number through threads(Count). With no workers
  %   nothing would ever consume the job queue or post done/1 on the
  %   answer queue, so ca_gather/4 would block forever.

  concurrent_and(Gen, Test, Options) :-
      jobs(Jobs0, Options),
      Jobs is max(1, Jobs0),              % never run with an empty pool
      MaxSize is Jobs*4,
      message_queue_create(JobQueue, [max_size(MaxSize)]),
      message_queue_create(AnswerQueue, [max_size(MaxSize)]),
      ca_template(Gen, Test, Templ),
      term_variables(Gen+Test, AllVars),
      ReplyTempl =.. [v|AllVars],
      length(Workers, Jobs),
      % Bitmask with one bit per worker; a bit is cleared by
      % ca_gather/4 when the corresponding worker reports done/1.
      Alive is 1<<Jobs-1,
      maplist(thread_create(ca_worker(JobQueue, AnswerQueue,
                                      Templ, Test, ReplyTempl)),
              Workers),
      thread_create(ca_generator(Gen, Templ, JobQueue, AnswerQueue),
                    GenThread),
      State = state(Alive),
      call_cleanup(
          ca_gather(State, AnswerQueue, ReplyTempl, Workers),
          ca_cleanup(GenThread, Workers, JobQueue, AnswerQueue)).
  474
  %!  ca_gather(+State, +AnswerQueue, ?ReplyTempl, +Workers) is nondet.
  %
  %   Produce the answers of concurrent_and/3 on backtracking. Reads
  %   AnswerQueue in a repeat loop:
  %
  %     - true(ReplyTempl) is returned as a solution.
  %     - done(Worker) clears the bit of Worker (its 0-based position
  %       in Workers) in the alive mask stored in State. The loop ends
  %       with failure once the mask reaches 0.
  %     - error(E) re-throws E.

  ca_gather(State, AnswerQueue, ReplyTempl, Workers) :-
      repeat,
      thread_get_message(AnswerQueue, Msg),
      (   Msg = true(ReplyTempl)
      ->  true
      ;   Msg = done(Worker)
      ->  nth0(Index, Workers, Worker),
          arg(1, State, Mask0),
          Mask is Mask0 /\ \(1<<Index),
          debug(concurrent(and), 'Alive = ~2r', [Mask]),
          (   Mask =:= 0
          ->  !,
              fail
          ;   nb_setarg(1, State, Mask),
              fail
          )
      ;   Msg = error(Ex)
      ->  throw(Ex)
      ).
  494
  %!  ca_template(:Gen, :Test, -Templ) is det.
  %
  %   Templ is a term v(V1, ...) holding the variables of Gen in
  %   standard order. It is the term the generator sends to the
  %   workers for every solution of Gen.
  %
  %   Earlier code also intersected these variables with those of Test
  %   and took the union of that intersection with the Gen variables.
  %   The intersection is a subset of the Gen variables, so the union
  %   is always the Gen variable set itself. The computation is
  %   therefore dropped; the result is unchanged.

  ca_template(Gen, _Test, Templ) :-
      term_variables(Gen, GVars),
      sort(GVars, TemplVars),
      Templ =.. [v|TemplVars].
  503
  %!  ca_worker(+JobQueue, +AnswerQueue, +Templ, :Test, +ReplyTempl).
  %
  %   Worker loop of concurrent_and/3. Reads requests from JobQueue in
  %   a repeat loop:
  %
  %     - job(Templ) runs Test and posts true(ReplyTempl) on
  %       AnswerQueue for each solution, or error(E) if Test raises E.
  %     - `done` destroys JobQueue and posts done(Me) on AnswerQueue.
  %     - An existence error on JobQueue is mapped to `all_done`,
  %       which also posts done(Me).

  ca_worker(JobQueue, AnswerQueue, Templ, Test, ReplyTempl) :-
      thread_self(Me),
      QueueGone = error(existence_error(message_queue, _), _),
      repeat,
      catch(thread_get_message(JobQueue, Req), QueueGone, Req=all_done),
      (   Req = job(Templ)
      ->  (   catch(Test, Ex, true),
              (   var(Ex)
              ->  thread_send_message(AnswerQueue, true(ReplyTempl))
              ;   thread_send_message(AnswerQueue, error(Ex))
              ),
              fail                        % next solution or next request
          )
      ;   Req == done
      ->  !,
          message_queue_destroy(JobQueue),
          thread_send_message(AnswerQueue, done(Me))
      ;   assertion(Req == all_done)
      ->  !,
          thread_send_message(AnswerQueue, done(Me))
      ).
  525
  %!  ca_generator(:Gen, +Templ, +JobQueue, +AnswerQueue) is det.
  %
  %   Body of the generator thread of concurrent_and/3. Posts
  %   job(Templ) on JobQueue for every solution of Gen, or error(E) on
  %   AnswerQueue if Gen raises E, and finally posts `done` on
  %   JobQueue.

  ca_generator(Gen, Templ, JobQueue, AnswerQueue) :-
      (   catch(Gen, Ex, true),
          (   var(Ex)
          ->  thread_send_message(JobQueue, job(Templ))
          ;   thread_send_message(AnswerQueue, error(Ex))
          ),
          fail
      ;   thread_send_message(JobQueue, done)
      ).
  535
  %!  ca_cleanup(+GenThread, +Workers, +JobQueue, +AnswerQueue) is det.
  %
  %   Reclaim everything created by concurrent_and/3: abort and join
  %   the generator thread and then the workers, destroy AnswerQueue
  %   and destroy JobQueue, ignoring the error raised if a worker has
  %   already destroyed it.

  ca_cleanup(GenThread, Workers, JobQueue, AnswerQueue) :-
      safe_abort(GenThread),
      safe_join(GenThread),
      maplist(safe_abort, Workers),
      maplist(safe_join, Workers),
      message_queue_destroy(AnswerQueue),
      AlreadyGone = error(_,_),
      catch(message_queue_destroy(JobQueue), AlreadyGone, true).
  543
  544
  545                 /*******************************
  546                 *             MAPLIST          *
  547                 *******************************/
 concurrent_maplist(:Goal, +List) is semidet
 concurrent_maplist(:Goal, +List1, +List2) is semidet
 concurrent_maplist(:Goal, +List1, +List2, +List3) is semidet
Concurrent version of maplist/2. This predicate uses concurrent/3, using multiple worker threads. The number of threads is the minimum of the list length and the number of cores available. The number of cores is determined using the prolog flag cpu_count. If this flag is absent or 1 or List has less than two elements, this predicate calls the corresponding maplist/N version using a wrapper based on once/1. Note that all goals are executed as if wrapped in once/1 and therefore these predicates are semidet.

Note that the overhead of this predicate is considerable and therefore Goal must be fairly expensive before one reaches a speedup.

  %!  concurrent_maplist(:Goal, +List) is semidet.
  %
  %   Concurrent maplist/2. If workers/2 grants more than one worker,
  %   a call(Goal, Elem) job is built for every element and run by
  %   concurrent/3. Otherwise the elements are processed sequentially,
  %   committing to the first solution of Goal for each element.

  concurrent_maplist(Goal, List) :-
      workers(List, NWorkers),
      !,
      maplist(ml_goal(Goal), List, Jobs),
      concurrent(NWorkers, Jobs, []).
  concurrent_maplist(Module:Goal, List) :-
      maplist(once_in_module(Module, Goal), List).
  573
  %!  once_in_module(+Module, :Goal, ?Arg) is semidet.
  %
  %   Call Goal in Module with one extra argument, committing to the
  %   first solution.

  once_in_module(Module, Goal, Arg) :-
      once(call(Module:Goal, Arg)).
  576
  %!  ml_goal(+Goal, ?Elem, -Call) is det.
  %
  %   Call is the job term call(Goal, Elem).

  ml_goal(Goal, Elem, Call) :-
      Call = call(Goal, Elem).
  578
  %!  concurrent_maplist(:Goal, ?List1, ?List2) is semidet.
  %
  %   Concurrent maplist/3. The lists are first made the same length.
  %   If workers/2 grants more than one worker, one
  %   call(Goal, E1, E2) job per element pair is run by concurrent/3.
  %   Otherwise the pairs are processed sequentially, committing to
  %   the first solution of Goal for each pair.

  concurrent_maplist(Goal, List1, List2) :-
      same_length(List1, List2),
      workers(List1, NWorkers),
      !,
      maplist(ml_goal(Goal), List1, List2, Jobs),
      concurrent(NWorkers, Jobs, []).
  concurrent_maplist(Module:Goal, List1, List2) :-
      maplist(once_in_module(Module, Goal), List1, List2).
  587
  %!  once_in_module(+Module, :Goal, ?Arg1, ?Arg2) is semidet.
  %
  %   Call Goal in Module with two extra arguments, committing to the
  %   first solution.

  once_in_module(Module, Goal, Arg1, Arg2) :-
      once(call(Module:Goal, Arg1, Arg2)).
  590
  %!  ml_goal(+Goal, ?Elem1, ?Elem2, -Call) is det.
  %
  %   Call is the job term call(Goal, Elem1, Elem2).

  ml_goal(Goal, Elem1, Elem2, Call) :-
      Call = call(Goal, Elem1, Elem2).
  592
  %!  concurrent_maplist(:Goal, ?List1, ?List2, ?List3) is semidet.
  %
  %   Concurrent maplist/4. The three lists are first made the same
  %   length. If workers/2 grants more than one worker, one
  %   call(Goal, E1, E2, E3) job per element triple is run by
  %   concurrent/3. Otherwise the triples are processed sequentially,
  %   committing to the first solution of Goal for each triple.

  concurrent_maplist(Goal, List1, List2, List3) :-
      same_length(List1, List2, List3),
      workers(List1, NWorkers),
      !,
      maplist(ml_goal(Goal), List1, List2, List3, Jobs),
      concurrent(NWorkers, Jobs, []).
  concurrent_maplist(Module:Goal, List1, List2, List3) :-
      maplist(once_in_module(Module, Goal), List1, List2, List3).
  601
  %!  once_in_module(+Module, :Goal, ?Arg1, ?Arg2, ?Arg3) is semidet.
  %
  %   Call Goal in Module with three extra arguments, committing to
  %   the first solution.

  once_in_module(Module, Goal, Arg1, Arg2, Arg3) :-
      once(call(Module:Goal, Arg1, Arg2, Arg3)).
  604
  %!  ml_goal(+Goal, ?Elem1, ?Elem2, ?Elem3, -Call) is det.
  %
  %   Call is the job term call(Goal, Elem1, Elem2, Elem3).

  ml_goal(Goal, Elem1, Elem2, Elem3, Call) :-
      Call = call(Goal, Elem1, Elem2, Elem3).
  606
  %!  workers(+List, -Count) is semidet.
  %
  %   Count is the number of workers to use for List: the minimum of
  %   the `cpu_count` Prolog flag and the length of List. Fails unless
  %   both the flag and Count exceed 1.

  workers(List, Count) :-
      current_prolog_flag(cpu_count, Cores),
      Cores > 1,
      length(List, Length),
      Count is min(Cores, Length),
      Count > 1,
      !.
  614
  %!  same_length(?List1, ?List2, ?List3) is nondet.
  %
  %   True when the three lists have the same number of elements.

  same_length([], [], []).
  same_length([_|Tail1], [_|Tail2], [_|Tail3]) :-
      same_length(Tail1, Tail2, Tail3).
  618
  619
  620                 /*******************************
  621                 *             FIRST            *
  622                 *******************************/
 first_solution(-X, :Goals, +Options) is semidet
Try alternative solvers concurrently, returning the first answer. In a typical scenario, solving any of the goals in Goals is satisfactory for the application to continue. As soon as one of the tried alternatives is successful, all the others are killed and first_solution/3 succeeds.

For example, if it is unclear whether it is better to search a graph breadth-first or depth-first we can use:

search_graph(Graph, Path) :-
         first_solution(Path, [ breadth_first(Graph, Path),
                                depth_first(Graph, Path)
                              ],
                        []).

Options include thread stack-sizes passed to thread_create, as well as the options on_fail and on_error that specify what to do if a solver fails or triggers an error. By default execution of all solvers is terminated and the result is returned. Sometimes one may wish to continue. One such scenario is if one of the solvers may run out of resources or one of the solvers is known to be incomplete.

on_fail(Action)
If stop (default), terminate all threads and stop with the failure. If continue, keep waiting.
on_error(Action)
As above, re-throwing the error if an error appears.
bug
- first_solution/3 cannot deal with non-determinism. There is no obvious way to fit non-determinism into it. If multiple solutions are needed wrap the solvers in findall/3.
  %!  first_solution(-X, :Goals, +Options) is semidet.
  %
  %   Run all Goals concurrently, one solver thread each, and wait for
  %   a result as decided by wait_for_one/4. All solvers are then
  %   killed and the queue is destroyed. A done(_, Value) result
  %   unifies X with Value, an error(_, E) result re-throws E and any
  %   other result makes the call fail.

  first_solution(X, Module:Goals, Options) :-
      message_queue_create(DoneQ),
      thread_options(Options, ThreadOptions, OtherOptions),
      length(Goals, NJobs),
      create_solvers(Goals, Module, X, DoneQ, Solvers, ThreadOptions),
      wait_for_one(NJobs, DoneQ, Outcome, OtherOptions),
      concur_cleanup(kill, Solvers, [DoneQ]),
      (   Outcome = done(_, Value)
      ->  X = Value
      ;   Outcome = error(_, Error)
      ->  throw(Error)
      ).
  674
  %!  create_solvers(+Goals, +Module, +X, +DoneQ, -Solvers, +Options) is det.
  %
  %   Create one thread per goal in Goals, each running solve/3 on
  %   Module:Goal with Options passed to thread_create/3.

  create_solvers([], _, _, _, [], _).
  create_solvers([Goal|Goals], Module, X, DoneQ, [Solver|Solvers], Options) :-
      thread_create(solve(Module:Goal, X, DoneQ), Solver, Options),
      create_solvers(Goals, Module, X, DoneQ, Solvers, Options).
  679
  %!  solve(:Goal, ?Var, +Queue) is det.
  %
  %   Run Goal once and report on Queue, tagged with the id of the
  %   running thread: done(Me, Var) on success, error(Me, E) if Goal
  %   raised E and failed(Me) if Goal failed.

  solve(Goal, Var, Queue) :-
      thread_self(Me),
      (   catch(Goal, Ex, true)
      ->  (   var(Ex)
          ->  Report = done(Me, Var)
          ;   Report = error(Me, Ex)
          ),
          thread_send_message(Queue, Report)
      ;   thread_send_message(Queue, failed(Me))
      ).
  689
  %!  wait_for_one(+Pending, +Queue, -Result, +Options) is det.
  %
  %   Wait on Queue for the message that decides first_solution/3.
  %   Result is `failed` when no solver is left. A done/2 message is
  %   always the result. A failed/1 or error/2 message is the result
  %   if the on_fail or on_error option is `stop` (the default);
  %   otherwise waiting continues with one solver less.

  wait_for_one(0, _, failed, _) :- !.
  wait_for_one(Pending, Queue, Result, Options) :-
      thread_get_message(Queue, Msg),
      Left is Pending - 1,
      (   Msg = done(_, _)
      ->  Result = Msg
      ;   Msg = failed(_)
      ->  (   option(on_fail(stop), Options, stop)
          ->  Result = Msg
          ;   wait_for_one(Left, Queue, Result, Options)
          )
      ;   Msg = error(_, _)
      ->  (   option(on_error(stop), Options, stop)
          ->  Result = Msg
          ;   wait_for_one(Left, Queue, Result, Options)
          )
      ).
 thread_options(+Options, -ThreadOptions, -RestOptions) is det
Split the option list over thread(-size) options and other options.
  %!  thread_options(+Options, -ThreadOptions, -RestOptions) is det.
  %
  %   Partition Options, preserving order: options accepted by
  %   thread_option/1 go to ThreadOptions, all others to RestOptions.

  thread_options([], [], []).
  thread_options([Opt|Opts], [Opt|ThreadOpts], RestOpts) :-
      thread_option(Opt),
      !,
      thread_options(Opts, ThreadOpts, RestOpts).
  thread_options([Opt|Opts], ThreadOpts, [Opt|RestOpts]) :-
      thread_options(Opts, ThreadOpts, RestOpts).
  721
  %!  thread_option(?Option) is nondet.
  %
  %   True when Option is a stack-size option that is forwarded to
  %   thread_create/3. Besides the original per-stack options this
  %   also accepts stack_limit(_) and c_stack(_), so that these are
  %   forwarded as well instead of being treated as non-thread
  %   options.
  %
  %   NOTE(review): stack_limit/1 and c_stack/1 are taken from the
  %   thread_create/3 documentation of current SWI-Prolog; confirm
  %   against the versions this library must support.

  thread_option(local(_)).
  thread_option(global(_)).
  thread_option(trail(_)).
  thread_option(argument(_)).
  thread_option(stack(_)).
  thread_option(stack_limit(_)).
  thread_option(c_stack(_)).
 call_in_thread(+Thread, :Goal) is semidet
 call_in_thread(+Thread, :Goal, +Options) is semidet
Run Goal as an interrupt in the context of Thread. This is based on thread_signal/2. If waiting times out, we inject a stop(Reason) exception into Goal. Interrupts can be nested, i.e., it is allowed to run a call_in_thread/2 while the target thread is processing such an interrupt.

Options are passed to thread_get_message/3 and notably allow for specifying a timeout. If a timeout is reached, this predicate will attempt to kill Goal in Thread and act according to the option on_timeout.

on_timeout(:Goal)
If waiting terminates due to a timeout(Time), or deadline(Stamp) option, call Goal. The default is throw(time_limit_exceeded).

This predicate is primarily intended for debugging and inspection tasks.

  %!  call_in_thread(+Thread, :Goal) is semidet.
  %
  %   Same as call_in_thread/3 with an empty option list.

  call_in_thread(Thread, Goal) :-
      Options = [],
      call_in_thread(Thread, Goal, Options).
  752
  %!  call_in_thread(+Thread, :Goal, +Options) is semidet.
  %
  %   Run Goal once in Thread and wait for the result.
  %
  %     - An unbound Thread raises an instantiation error.
  %     - If Thread is the calling thread, Goal is called directly.
  %     - Otherwise Thread is signalled to run run_in_thread/4, tagged
  %       with a random task id, and the caller waits for the matching
  %       in_thread(Id, Result) message using Options. true(Vars)
  %       binds the variables of Goal, error(E) re-throws E and any
  %       other result fails.
  %
  %   An exception while waiting is handled by forward_exception/3.
  %   If waiting fails, the task is signalled to stop and the
  %   on_timeout(Action) option is called; it defaults to
  %   throw(time_limit_exceeded).

  call_in_thread(Thread, Goal, _) :-
      must_be(callable, Goal),
      var(Thread),
      !,
      instantiation_error(Thread).
  call_in_thread(Thread, Goal, _) :-
      thread_self(Thread),
      !,
      once(Goal).
  call_in_thread(Thread, Goal, Options) :-
      meta_options(is_meta, Options, QOptions),
      term_variables(Goal, Vars),
      thread_self(Me),
      TaskId is random(1 000 000 000),
      thread_signal(Thread, run_in_thread(Goal,Vars,TaskId,Me)),
      (   catch(thread_get_message(Me, in_thread(TaskId,Result), QOptions),
                Error,
                forward_exception(Thread, TaskId, Error))
      ->  (   Result = true(Vars)
          ->  true
          ;   Result = error(Error)
          ->  throw(Error)
          ;   fail
          )
      ;   thread_signal(Thread, kill_task(TaskId, stop(time_limit_exceeded))),
          option(on_timeout(Action), QOptions, throw(time_limit_exceeded)),
          call(Action)
      ).
  781
  %!  is_meta(?OptionName) is semidet.
  %
  %   Names the options of call_in_thread/3 that hold a goal;
  %   passed to meta_options/3.

  is_meta(on_timeout).
  783
  %!  run_in_thread(:Goal, +Vars, +Id, +Sender) is det.
  %
  %   Run Goal once on behalf of call_in_thread/3 and post the result
  %   to Sender as in_thread(Id, Result). Result is true(Vars) on
  %   success, `false` on failure and error(E) if Goal raised E.
  %   Nothing is posted when the exception is stop(_).

  run_in_thread(Goal, Vars, Id, Sender) :-
      (   catch_with_backtrace(call(Goal), Ex, true)
      ->  (   var(Ex)
          ->  thread_send_message(Sender, in_thread(Id, true(Vars)))
          ;   Ex = stop(_)
          ->  true
          ;   thread_send_message(Sender, in_thread(Id, error(Ex)))
          )
      ;   thread_send_message(Sender, in_thread(Id, false))
      ).
  794
  %!  forward_exception(+Thread, +Id, +Error).
  %
  %   Handle an exception raised in the caller while it waits for
  %   task Id: signal Thread to kill the task with the exception
  %   chosen by kill_with/2, then re-throw Error. Never succeeds.

  forward_exception(Thread, Id, Error) :-
      kill_with(Error, StopException),
      thread_signal(Thread, kill_task(Id, StopException)),
      throw(Error).
  799
  %!  kill_with(+Error, -Kill) is det.
  %
  %   Kill is the exception used to stop a remote task after Error
  %   was raised in the waiting thread: stop(time_limit_exceeded) for
  %   `time_limit_exceeded`, stop(interrupt) for anything else.

  kill_with(time_limit_exceeded, stop(time_limit_exceeded)) :-
      !.
  kill_with(_Other, stop(interrupt)).
  803
  804kill_task(Id, Exception) :-
  805    prolog_current_frame(Frame),
  806    prolog_frame_attribute(Frame, parent_goal,
  807                           run_in_thread(_Goal, _Vars, Id, _Sender)),
  808    !,
  809    throw(Exception).
  810kill_task(_, _)