View source with formatted comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker
    4    E-mail:        J.Wielemaker@vu.nl
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2007-2020, University of Amsterdam
    7                              VU University Amsterdam
    8                              CWI, Amsterdam
    9    All rights reserved.
   10
   11    Redistribution and use in source and binary forms, with or without
   12    modification, are permitted provided that the following conditions
   13    are met:
   14
   15    1. Redistributions of source code must retain the above copyright
   16       notice, this list of conditions and the following disclaimer.
   17
   18    2. Redistributions in binary form must reproduce the above copyright
   19       notice, this list of conditions and the following disclaimer in
   20       the documentation and/or other materials provided with the
   21       distribution.
   22
   23    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   24    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   25    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   26    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   27    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   28    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   29    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   30    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   31    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   32    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   33    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   34    POSSIBILITY OF SUCH DAMAGE.
   35*/
   36
   37:- module(thread,
   38          [ concurrent/3,               % +Threads, :Goals, +Options
   39            concurrent_maplist/2,       % :Goal, +List
   40            concurrent_maplist/3,       % :Goal, ?List1, ?List2
   41            concurrent_maplist/4,       % :Goal, ?List1, ?List2, ?List3
   42            concurrent_forall/2,        % :Generate, :Test
   43            concurrent_forall/3,        % :Generate, :Test, +Options
   44            concurrent_and/2,           % :Generator,:Test
   45            concurrent_and/3,           % :Generator,:Test,+Options
   46            first_solution/3,           % -Var, :Goals, +Options
   47
   48            call_in_thread/2            % +Thread, :Goal
   49          ]).   50:- autoload(library(apply), [maplist/2, maplist/3, maplist/4, maplist/5]).   51:- autoload(library(error), [must_be/2]).   52:- autoload(library(lists), [subtract/3, same_length/2, nth0/3]).   53:- autoload(library(option), [option/2, option/3]).   54:- autoload(library(ordsets), [ord_intersection/3, ord_union/3]).   55:- use_module(library(debug), [debug/3, assertion/1]).   56
   57%:- debug(concurrent).
   58
   59:- meta_predicate
   60    concurrent(+, :, +),
   61    concurrent_maplist(1, +),
   62    concurrent_maplist(2, ?, ?),
   63    concurrent_maplist(3, ?, ?, ?),
   64    concurrent_forall(0, 0),
   65    concurrent_forall(0, 0, +),
   66    concurrent_and(0, 0),
   67    concurrent_and(0, 0, +),
   68    first_solution(-, :, +),
   69    call_in_thread(+, 0).   70
   71
   72:- predicate_options(concurrent/3, 3,
   73                     [ pass_to(system:thread_create/3, 3)
   74                     ]).   75:- predicate_options(concurrent_forall/3, 3,
   76                     [ threads(nonneg)
   77                     ]).   78:- predicate_options(concurrent_and/3, 3,
   79                     [ threads(nonneg)
   80                     ]).   81:- predicate_options(first_solution/3, 3,
   82                     [ on_fail(oneof([stop,continue])),
   83                       on_error(oneof([stop,continue])),
   84                       pass_to(system:thread_create/3, 3)
   85                     ]).   86
   87/** <module> High level thread primitives
   88
   89This  module  defines  simple  to  use   predicates  for  running  goals
   90concurrently.  Where  the  core  multi-threaded    API  is  targeted  at
   91communicating long-living threads, the predicates   here  are defined to
   92run goals concurrently without having to   deal with thread creation and
   93maintenance explicitely.
   94
   95Note that these predicates run goals   concurrently  and therefore these
   96goals need to be thread-safe. As  the   predicates  in  this module also
   97abort branches of the computation that  are no longer needed, predicates
   98that have side-effect must act properly.  In   a  nutshell, this has the
   99following consequences:
  100
  101  * Nice clean Prolog code without side-effects (but with cut) works
  102    fine.
  103  * Side-effects are bad news.  If you really need assert to store
  104    intermediate results, use the thread_local/1 declaration.  This
  105    also guarantees cleanup of left-over clauses if the thread is
  106    cancelled.  For other side-effects, make sure to use call_cleanup/2
  107    to undo them should the thread be cancelled.
  108  * Global variables are ok as they are thread-local and destroyed
  109    on thread cancellation.  Note however that global variables in
  110    the calling thread are *not* available in the threads that are
  111    created.  You have to pass the value as an argument and initialise
  112    the variable in the new thread.
  113  * Thread-cancellation uses thread_signal/2.  Using this code
  114    with long-blocking foreign predicates may result in long delays,
  115    even if another thread asks for cancellation.
  116
  117@author Jan Wielemaker
  118*/
  119
  120%!  concurrent(+N, :Goals, +Options) is semidet.
  121%
  122%   Run Goals in parallel using N   threads.  This call blocks until
  123%   all work has been done.  The   Goals  must  be independent. They
  124%   should not communicate using shared  variables   or  any form of
  125%   global data. All Goals must be thread-safe.
  126%
  127%   Execution succeeds if all goals  have   succeeded.  If  one goal
  128%   fails or throws an exception,  other   workers  are abandoned as
  129%   soon as possible and the entire   computation fails or re-throws
  130%   the exception. Note that if  multiple   goals  fail  or raise an
  131%   error it is not defined which error or failure is reported.
  132%
  133%   On successful completion, variable bindings   are returned. Note
  134%   however that threads have independent   stacks and therefore the
  135%   goal is copied to the worker  thread   and  the result is copied
  136%   back to the caller of concurrent/3.
  137%
  138%   Choosing the right number of threads is not always obvious. Here
  139%   are some scenarios:
  140%
  141%     * If the goals are CPU intensive and normally all succeeding,
  142%     typically the number of CPUs is the optimal number of
  143%     threads.  Less does not use all CPUs, more wastes time in
  144%     context switches and also uses more memory.
  145%
  146%     * If the tasks are I/O bound the number of threads is
  147%     typically higher than the number of CPUs.
  148%
  149%     * If one or more of the goals may fail or produce an error,
  150%     using a higher number of threads may find this earlier.
  151%
  152%   @arg N Number of worker-threads to create. Using 1, no threads
  153%        are created.  If N is larger than the number of Goals we
  154%        create exactly as many threads as there are Goals.
  155%   @arg Goals List of callable terms.
  156%   @arg Options Passed to thread_create/3 for creating the
  157%        workers.  Only options changing the stack-sizes can
  158%        be used. In particular, do not pass the detached or alias
  159%        options.
  160%   @see In many cases, concurrent_maplist/2 and friends
  161%        is easier to program and is tractable to program
  162%        analysis.
  163
  164concurrent(1, M:List, _) :-
  165    !,
  166    maplist(once_in_module(M), List).
  167concurrent(N, M:List, Options) :-
  168    must_be(positive_integer, N),
  169    must_be(list(callable), List),
  170    length(List, JobCount),
  171    message_queue_create(Done),
  172    message_queue_create(Queue),
  173    WorkerCount is min(N, JobCount),
  174    create_workers(WorkerCount, Queue, Done, Workers, Options),
  175    submit_goals(List, 1, M, Queue, VarList),
  176    forall(between(1, WorkerCount, _),
  177           thread_send_message(Queue, done)),
  178    VT =.. [vars|VarList],
  179    concur_wait(JobCount, Done, VT, cleanup(Workers, Queue),
  180                Result, [], Exitted),
  181    subtract(Workers, Exitted, RemainingWorkers),
  182    concur_cleanup(Result, RemainingWorkers, [Queue, Done]),
  183    (   Result == true
  184    ->  true
  185    ;   Result = false
  186    ->  fail
  187    ;   Result = exception(Error)
  188    ->  throw(Error)
  189    ).
  190
  191once_in_module(M, Goal) :-
  192    call(M:Goal), !.
  193
  194%!  submit_goals(+List, +Id0, +Module, +Queue, -Vars) is det.
  195%
  196%   Send all jobs from List to Queue. Each goal is added to Queue as
  197%   a term goal(Id, Goal, Vars). Vars  is   unified  with  a list of
  198%   lists of free variables appearing in each goal.
  199
  200submit_goals([], _, _, _, []).
  201submit_goals([H|T], I, M, Queue, [Vars|VT]) :-
  202    term_variables(H, Vars),
  203    thread_send_message(Queue, goal(I, M:H, Vars)),
  204    I2 is I + 1,
  205    submit_goals(T, I2, M, Queue, VT).
  206
  207
  208%!  concur_wait(+N, +Done:queue, +VT:compound, +Cleanup,
  209%!              -Result, +Exitted0, -Exitted) is semidet.
  210%
  211%   Wait for completion, failure or error.
  212%
  213%   @arg Exited List of thread-ids with threads that completed
  214%   before all work was done.
  215
  216concur_wait(0, _, _, _, true, Exited, Exited) :- !.
  217concur_wait(N, Done, VT, Cleanup, Status, Exitted0, Exitted) :-
  218    debug(concurrent, 'Concurrent: waiting for workers ...', []),
  219    catch(thread_get_message(Done, Exit), Error,
  220          concur_abort(Error, Cleanup, Done, Exitted0)),
  221    debug(concurrent, 'Waiting: received ~p', [Exit]),
  222    (   Exit = done(Id, Vars)
  223    ->  debug(concurrent, 'Concurrent: Job ~p completed with ~p', [Id, Vars]),
  224        arg(Id, VT, Vars),
  225        N2 is N - 1,
  226        concur_wait(N2, Done, VT, Cleanup, Status, Exitted0, Exitted)
  227    ;   Exit = finished(Thread)
  228    ->  thread_join(Thread, JoinStatus),
  229        debug(concurrent, 'Concurrent: waiter ~p joined: ~p',
  230              [Thread, JoinStatus]),
  231        (   JoinStatus == true
  232        ->  concur_wait(N, Done, VT, Cleanup, Status, [Thread|Exitted0], Exitted)
  233        ;   Status = JoinStatus,
  234            Exitted = [Thread|Exitted0]
  235        )
  236    ).
  237
  238concur_abort(Error, cleanup(Workers, Queue), Done, Exitted) :-
  239    debug(concurrent, 'Concurrent: got ~p', [Error]),
  240    subtract(Workers, Exitted, RemainingWorkers),
  241    concur_cleanup(Error, RemainingWorkers, [Queue, Done]),
  242    throw(Error).
  243
  244create_workers(N, Queue, Done, [Id|Ids], Options) :-
  245    N > 0,
  246    !,
  247    thread_create(worker(Queue, Done), Id,
  248                  [ at_exit(thread_send_message(Done, finished(Id)))
  249                  | Options
  250                  ]),
  251    N2 is N - 1,
  252    create_workers(N2, Queue, Done, Ids, Options).
  253create_workers(_, _, _, [], _).
  254
  255
  256%!  worker(+WorkQueue, +DoneQueue) is det.
  257%
  258%   Process jobs from WorkQueue and send the results to DoneQueue.
  259
  260worker(Queue, Done) :-
  261    thread_get_message(Queue, Message),
  262    debug(concurrent, 'Worker: received ~p', [Message]),
  263    (   Message = goal(Id, Goal, Vars)
  264    ->  (   Goal
  265        ->  thread_send_message(Done, done(Id, Vars)),
  266            worker(Queue, Done)
  267        )
  268    ;   true
  269    ).
  270
  271
  272%!  concur_cleanup(+Result, +Workers:list, +Queues:list) is det.
  273%
  274%   Cleanup the concurrent workers and message  queues. If Result is
  275%   not =true=, signal all workers to make them stop prematurely. If
  276%   result is true we assume  all   workers  have been instructed to
  277%   stop or have stopped themselves.
  278
  279concur_cleanup(Result, Workers, Queues) :-
  280    !,
  281    (   Result == true
  282    ->  true
  283    ;   kill_workers(Workers)
  284    ),
  285    join_all(Workers),
  286    maplist(message_queue_destroy, Queues).
  287
  288kill_workers([]).
  289kill_workers([Id|T]) :-
  290    debug(concurrent, 'Signalling ~w', [Id]),
  291    catch(thread_signal(Id, abort), _, true),
  292    kill_workers(T).
  293
  294join_all([]).
  295join_all([Id|T]) :-
  296    thread_join(Id, _),
  297    join_all(T).
  298
  299
  300		 /*******************************
  301		 *             FORALL		*
  302		 *******************************/
  303
  304%!  concurrent_forall(:Generate, :Action) is semidet.
  305%!  concurrent_forall(:Generate, :Action, +Options) is semidet.
  306%
  307%   True when Action is true for all solutions of Generate. This has the
  308%   same semantics as forall/2, but  the   Action  goals are executed in
  309%   multiple threads. Notable a failing Action   or a Action throwing an
  310%   exception signals the calling  thread  which   in  turn  aborts  all
  311%   workers and fails or re-throws the generated error. Options:
  312%
  313%     - threads(+Count)
  314%       Number of threads to use.  The default is determined by the
  315%       Prolog flag `cpu_count`.
  316%
  317%   @tbd Ideally we would grow the   set of workers dynamically, similar
  318%   to dynamic scheduling of  HTTP  worker   threads.  This  would avoid
  319%   creating threads that are never used if Generate is too slow or does
  320%   not provide enough answers and  would   further  raise the number of
  321%   threads if Action is I/O bound rather than CPU bound.
  322
  323:- dynamic
  324    fa_aborted/1.  325
  326concurrent_forall(Generate, Test) :-
  327    concurrent_forall(Generate, Test, []).
  328
  329concurrent_forall(Generate, Test, Options) :-
  330    jobs(Jobs, Options),
  331    Jobs > 1,
  332    !,
  333    term_variables(Generate, GVars),
  334    term_variables(Test, TVars),
  335    sort(GVars, GVarsS),
  336    sort(TVars, TVarsS),
  337    ord_intersection(GVarsS, TVarsS, Shared),
  338    Templ =.. [v|Shared],
  339    MaxSize is Jobs*4,
  340    message_queue_create(Q, [max_size(MaxSize)]),
  341    length(Workers, Jobs),
  342    thread_self(Me),
  343    maplist(thread_create(fa_worker(Q, Me, Templ, Test)), Workers),
  344    catch(( forall(Generate,
  345                   thread_send_message(Q, job(Templ))),
  346            forall(between(1, Jobs, _),
  347                   thread_send_message(Q, done)),
  348            maplist(thread_join, Workers),
  349            message_queue_destroy(Q)
  350          ),
  351          Error,
  352          fa_cleanup(Error, Workers, Q)).
  353concurrent_forall(Generate, Test, _) :-
  354    forall(Generate, Test).
  355
  356fa_cleanup(Error, Workers, Q) :-
  357    maplist(safe_abort, Workers),
  358    debug(concurrent(fail), 'Joining workers', []),
  359    maplist(safe_join, Workers),
  360    debug(concurrent(fail), 'Destroying queue', []),
  361    retractall(fa_aborted(Q)),
  362    message_queue_destroy(Q),
  363    (   Error = fa_worker_failed(_0Test, Why)
  364    ->  debug(concurrent(fail), 'Test ~p failed: ~p', [_0Test, Why]),
  365        (   Why == false
  366        ->  fail
  367        ;   Why = error(E)
  368        ->  throw(E)
  369        ;   assertion(fail)
  370        )
  371    ;   throw(Error)
  372    ).
  373
  374fa_worker(Queue, Main, Templ, Test) :-
  375    repeat,
  376    thread_get_message(Queue, Msg),
  377    (   Msg == done
  378    ->  !
  379    ;   Msg = job(Templ),
  380        debug(concurrent, 'Running test ~p', [Test]),
  381        (   catch_with_backtrace(Test, E, true)
  382        ->  (   var(E)
  383            ->  fail
  384            ;   fa_stop(Queue, Main, fa_worker_failed(Test, error(E)))
  385            )
  386        ;   !,
  387            fa_stop(Queue, Main, fa_worker_failed(Test, false))
  388        )
  389    ).
  390
  391fa_stop(Queue, Main, Why) :-
  392    with_mutex('$concurrent_forall',
  393               fa_stop_sync(Queue, Main, Why)).
  394
  395fa_stop_sync(Queue, _Main, _Why) :-
  396    fa_aborted(Queue),
  397    !.
  398fa_stop_sync(Queue, Main, Why) :-
  399    asserta(fa_aborted(Queue)),
  400    debug(concurrent(fail), 'Stop due to ~p. Signalling ~q', [Why, Main]),
  401    thread_signal(Main, throw(Why)).
  402
  403jobs(Jobs, Options) :-
  404    (   option(threads(Jobs), Options)
  405    ->  true
  406    ;   current_prolog_flag(cpu_count, Jobs)
  407    ->  true
  408    ;   Jobs = 1
  409    ).
  410
  411safe_abort(Thread) :-
  412    catch(thread_signal(Thread, abort), error(_,_), true).
  413safe_join(Thread) :-
  414    E = error(_,_),
  415    catch(thread_join(Thread, _Status), E, true).
  416
  417
  418		 /*******************************
  419		 *              AND		*
  420		 *******************************/
  421
  422%!  concurrent_and(:Generator, :Test).
  423%!  concurrent_and(:Generator, :Test, +Options).
  424%
  425%   Concurrent version of `(Generator,Test)`. This   predicate creates a
  426%   thread providing solutions for Generator that   are handed to a pool
  427%   of threads that run Test for   the different instantiations provided
  428%   by Generator concurrently. The predicate  is logically equivalent to
  429%   a simple conjunction except for two  aspects: (1) terms are _copied_
  430%   from Generator to the test  Test   threads  while answers are copied
  431%   back to the calling thread and (2)   answers  may be produced out of
  432%   order.
  433%
  434%   If   the   evaluation   of   some    Test   raises   an   exception,
  435%   concurrent_and/2,3 is terminated with this  exception. If the caller
  436%   commits  after  a  given  answer  or    raises  an  exception  while
  437%   concurrent_and/2,3  is  active  with  pending   choice  points,  all
  438%   involved resources are reclaimed.
  439%
  440%   Options:
  441%
  442%     - threads(+Count)
  443%       Create a worker pool holding Count threads.  The default is
  444%       the Prolog flag `cpu_count`.
  445%
  446%   This    predicate    was    proposed     by      Jan     Burse    as
  447%   balance((Generator,Test)).
  448
  449concurrent_and(Gen, Test) :-
  450    concurrent_and(Gen, Test, []).
  451
  452concurrent_and(Gen, Test, Options) :-
  453    jobs(Jobs, Options),
  454    MaxSize is Jobs*4,
  455    message_queue_create(JobQueue, [max_size(MaxSize)]),
  456    message_queue_create(AnswerQueue, [max_size(MaxSize)]),
  457    ca_template(Gen, Test, Templ),
  458    term_variables(Gen+Test, AllVars),
  459    ReplyTempl =.. [v|AllVars],
  460    length(Workers, Jobs),
  461    Alive is 1<<Jobs-1,
  462    maplist(thread_create(ca_worker(JobQueue, AnswerQueue,
  463                                    Templ, Test, ReplyTempl)),
  464            Workers),
  465    thread_create(ca_generator(Gen, Templ, JobQueue, AnswerQueue),
  466                  GenThread),
  467    State = state(Alive),
  468    call_cleanup(
  469        ca_gather(State, AnswerQueue, ReplyTempl, Workers),
  470        ca_cleanup(GenThread, Workers, JobQueue, AnswerQueue)).
  471
  472ca_gather(State, AnswerQueue, ReplyTempl, Workers) :-
  473    repeat,
  474       thread_get_message(AnswerQueue, Msg),
  475       (   Msg = true(ReplyTempl)
  476       ->  true
  477       ;   Msg = done(Worker)
  478       ->  nth0(Done, Workers, Worker),
  479           arg(1, State, Alive0),
  480           Alive1 is Alive0 /\ \(1<<Done),
  481           debug(concurrent(and), 'Alive = ~2r', [Alive1]),
  482           (   Alive1 =:= 0
  483           ->  !,
  484               fail
  485           ;   nb_setarg(1, State, Alive1),
  486               fail
  487           )
  488       ;   Msg = error(E)
  489       ->  throw(E)
  490       ).
  491
  492ca_template(Gen, Test, Templ) :-
  493    term_variables(Gen,  GVars),
  494    term_variables(Test, TVars),
  495    sort(GVars, GVarsS),
  496    sort(TVars, TVarsS),
  497    ord_intersection(GVarsS, TVarsS, Shared),
  498    ord_union(GVarsS, Shared, TemplVars),
  499    Templ =.. [v|TemplVars].
  500
  501ca_worker(JobQueue, AnswerQueue, Templ, Test, ReplyTempl) :-
  502    thread_self(Me),
  503    EG = error(existence_error(message_queue, _), _),
  504    repeat,
  505    catch(thread_get_message(JobQueue, Req), EG, Req=all_done),
  506    (   Req = job(Templ)
  507    ->  (   catch(Test, E, true),
  508            (   var(E)
  509            ->  thread_send_message(AnswerQueue, true(ReplyTempl))
  510            ;   thread_send_message(AnswerQueue, error(E))
  511            ),
  512            fail
  513        )
  514    ;   Req == done
  515    ->  !,
  516        message_queue_destroy(JobQueue),
  517        thread_send_message(AnswerQueue, done(Me))
  518    ;   assertion(Req == all_done)
  519    ->  !,
  520        thread_send_message(AnswerQueue, done(Me))
  521    ).
  522
  523ca_generator(Gen, Templ, JobQueue, AnswerQueue) :-
  524    (   catch(Gen, E, true),
  525        (   var(E)
  526        ->  thread_send_message(JobQueue, job(Templ))
  527        ;   thread_send_message(AnswerQueue, error(E))
  528        ),
  529        fail
  530    ;   thread_send_message(JobQueue, done)
  531    ).
  532
  533ca_cleanup(GenThread, Workers, JobQueue, AnswerQueue) :-
  534    safe_abort(GenThread),
  535    safe_join(GenThread),
  536    maplist(safe_abort, Workers),
  537    maplist(safe_join, Workers),
  538    message_queue_destroy(AnswerQueue),
  539    catch(message_queue_destroy(JobQueue), error(_,_), true).
  540
  541
  542                 /*******************************
  543                 *             MAPLIST          *
  544                 *******************************/
  545
  546%!  concurrent_maplist(:Goal, +List) is semidet.
  547%!  concurrent_maplist(:Goal, +List1, +List2) is semidet.
  548%!  concurrent_maplist(:Goal, +List1, +List2, +List3) is semidet.
  549%
  550%   Concurrent version of maplist/2. This   predicate uses concurrent/3,
  551%   using multiple _worker_ threads.  The  number   of  threads  is  the
  552%   minimum of the list length and the   number  of cores available. The
  553%   number of cores is determined using  the prolog flag =cpu_count=. If
  554%   this flag is absent or 1 or List   has  less than two elements, this
  555%   predicate calls the corresponding maplist/N  version using a wrapper
  556%   based on once/1. Note that all goals   are executed as if wrapped in
  557%   once/1 and therefore these predicates are _semidet_.
  558%
  559%   Note that the the overhead  of   this  predicate is considerable and
  560%   therefore Goal must  be  fairly  expensive   before  one  reaches  a
  561%   speedup.
  562
  563concurrent_maplist(Goal, List) :-
  564    workers(List, WorkerCount),
  565    !,
  566    maplist(ml_goal(Goal), List, Goals),
  567    concurrent(WorkerCount, Goals, []).
  568concurrent_maplist(M:Goal, List) :-
  569    maplist(once_in_module(M, Goal), List).
  570
  571once_in_module(M, Goal, Arg) :-
  572    call(M:Goal, Arg), !.
  573
  574ml_goal(Goal, Elem, call(Goal, Elem)).
  575
  576concurrent_maplist(Goal, List1, List2) :-
  577    same_length(List1, List2),
  578    workers(List1, WorkerCount),
  579    !,
  580    maplist(ml_goal(Goal), List1, List2, Goals),
  581    concurrent(WorkerCount, Goals, []).
  582concurrent_maplist(M:Goal, List1, List2) :-
  583    maplist(once_in_module(M, Goal), List1, List2).
  584
  585once_in_module(M, Goal, Arg1, Arg2) :-
  586    call(M:Goal, Arg1, Arg2), !.
  587
  588ml_goal(Goal, Elem1, Elem2, call(Goal, Elem1, Elem2)).
  589
  590concurrent_maplist(Goal, List1, List2, List3) :-
  591    same_length(List1, List2, List3),
  592    workers(List1, WorkerCount),
  593    !,
  594    maplist(ml_goal(Goal), List1, List2, List3, Goals),
  595    concurrent(WorkerCount, Goals, []).
  596concurrent_maplist(M:Goal, List1, List2, List3) :-
  597    maplist(once_in_module(M, Goal), List1, List2, List3).
  598
  599once_in_module(M, Goal, Arg1, Arg2, Arg3) :-
  600    call(M:Goal, Arg1, Arg2, Arg3), !.
  601
  602ml_goal(Goal, Elem1, Elem2, Elem3, call(Goal, Elem1, Elem2, Elem3)).
  603
  604workers(List, Count) :-
  605    current_prolog_flag(cpu_count, Cores),
  606    Cores > 1,
  607    length(List, Len),
  608    Count is min(Cores,Len),
  609    Count > 1,
  610    !.
  611
  612same_length([], [], []).
  613same_length([_|T1], [_|T2], [_|T3]) :-
  614    same_length(T1, T2, T3).
  615
  616
  617                 /*******************************
  618                 *             FIRST            *
  619                 *******************************/
  620
  621%!  first_solution(-X, :Goals, +Options) is semidet.
  622%
  623%   Try  alternative  solvers  concurrently,   returning  the  first
  624%   answer. In a typical scenario, solving any of the goals in Goals
  625%   is satisfactory for the application to  continue. As soon as one
  626%   of the tried alternatives is  successful,   all  the others are
  627%   killed and first_solution/3 succeeds.
  628%
  629%   For example, if it is unclear whether   it is better to search a
  630%   graph breadth-first or depth-first we can use:
  631%
  632%   ==
  633%   search_graph(Grap, Path) :-
  634%            first_solution(Path, [ breadth_first(Graph, Path),
  635%                                   depth_first(Graph, Path)
  636%                                 ],
  637%                           []).
  638%   ==
  639%
  640%   Options include thread stack-sizes passed   to thread_create, as
  641%   well as the options =on_fail= and   =on_error= that specify what
  642%   to do if a  solver  fails  or   triggers  an  error.  By default
  643%   execution of all  solvers  is  terminated   and  the  result  is
  644%   returned. Sometimes one may wish to  continue. One such scenario
  645%   is if one of the solvers may run  out of resources or one of the
  646%   solvers is known to be incomplete.
  647%
  648%           * on_fail(Action)
  649%           If =stop= (default), terminate all threads and stop with
  650%           the failure.  If =continue=, keep waiting.
  651%           * on_error(Action)
  652%           As above, re-throwing the error if an error appears.
  653%
  654%   @bug    first_solution/3 cannot deal with non-determinism.  There
  655%           is no obvious way to fit non-determinism into it.  If multiple
  656%           solutions are needed wrap the solvers in findall/3.
  657
  658
  659first_solution(X, M:List, Options) :-
  660    message_queue_create(Done),
  661    thread_options(Options, ThreadOptions, RestOptions),
  662    length(List, JobCount),
  663    create_solvers(List, M, X, Done, Solvers, ThreadOptions),
  664    wait_for_one(JobCount, Done, Result, RestOptions),
  665    concur_cleanup(kill, Solvers, [Done]),
  666    (   Result = done(_, Var)
  667    ->  X = Var
  668    ;   Result = error(_, Error)
  669    ->  throw(Error)
  670    ).
  671
  672create_solvers([], _, _, _, [], _).
  673create_solvers([H|T], M, X, Done, [Id|IDs], Options) :-
  674    thread_create(solve(M:H, X, Done), Id, Options),
  675    create_solvers(T, M, X, Done, IDs, Options).
  676
  677solve(Goal, Var, Queue) :-
  678    thread_self(Me),
  679    (   catch(Goal, E, true)
  680    ->  (   var(E)
  681        ->  thread_send_message(Queue, done(Me, Var))
  682        ;   thread_send_message(Queue, error(Me, E))
  683        )
  684    ;   thread_send_message(Queue, failed(Me))
  685    ).
  686
  687wait_for_one(0, _, failed, _) :- !.
  688wait_for_one(JobCount, Queue, Result, Options) :-
  689    thread_get_message(Queue, Msg),
  690    LeftCount is JobCount - 1,
  691    (   Msg = done(_, _)
  692    ->  Result = Msg
  693    ;   Msg = failed(_)
  694    ->  (   option(on_fail(stop), Options, stop)
  695        ->  Result = Msg
  696        ;   wait_for_one(LeftCount, Queue, Result, Options)
  697        )
  698    ;   Msg = error(_, _)
  699    ->  (   option(on_error(stop), Options, stop)
  700        ->  Result = Msg
  701        ;   wait_for_one(LeftCount, Queue, Result, Options)
  702        )
  703    ).
  704
  705
  706%!  thread_options(+Options, -ThreadOptions, -RestOptions) is det.
  707%
  708%   Split the option  list  over   thread(-size)  options  and other
  709%   options.
  710
  711thread_options([], [], []).
  712thread_options([H|T], [H|Th], O) :-
  713    thread_option(H),
  714    !,
  715    thread_options(T, Th, O).
  716thread_options([H|T], Th, [H|O]) :-
  717    thread_options(T, Th, O).
  718
  719thread_option(local(_)).
  720thread_option(global(_)).
  721thread_option(trail(_)).
  722thread_option(argument(_)).
  723thread_option(stack(_)).
  724
  725
  726%!  call_in_thread(+Thread, :Goal) is semidet.
  727%
  728%   Run Goal as an interrupt in the context  of Thread. This is based on
  729%   thread_signal/2. If waiting times  out,   we  inject  a stop(Reason)
  730%   exception into Goal. Interrupts can be   nested, i.e., it is allowed
  731%   to run a call_in_thread/2 while the target thread is processing such
  732%   an interrupt.
  733%
  734%   This predicate is primarily intended   for  debugging and inspection
  735%   tasks.
  736
  737call_in_thread(Thread, Goal) :-
  738    thread_self(Thread),
  739    !,
  740    once(Goal).
  741call_in_thread(Thread, Goal) :-
  742    term_variables(Goal, Vars),
  743    thread_self(Me),
  744    A is random(1 000 000 000),
  745    thread_signal(Thread, run_in_thread(Goal,Vars,A,Me)),
  746    catch(thread_get_message(in_thread(A,Result)),
  747          Error,
  748          forward_exception(Thread, A, Error)),
  749    (   Result = true(Vars)
  750    ->  true
  751    ;   Result = error(Error)
  752    ->  throw(Error)
  753    ;   fail
  754    ).
  755
  756run_in_thread(Goal, Vars, Id, Sender) :-
  757    (   catch_with_backtrace(call(Goal), Error, true)
  758    ->  (   var(Error)
  759        ->  thread_send_message(Sender, in_thread(Id, true(Vars)))
  760        ;   Error = stop(_)
  761        ->  true
  762        ;   thread_send_message(Sender, in_thread(Id, error(Error)))
  763        )
  764    ;   thread_send_message(Sender, in_thread(Id, false))
  765    ).
  766
  767forward_exception(Thread, Id, Error) :-
  768    kill_with(Error, Kill),
  769    thread_signal(Thread, kill_task(Id, Kill)),
  770    throw(Error).
  771
  772kill_with(time_limit_exceeded, stop(time_limit_exceeded)) :-
  773    !.
  774kill_with(_, stop(interrupt)).
  775
  776kill_task(Id, Exception) :-
  777    prolog_current_frame(Frame),
  778    prolog_frame_attribute(Frame, parent_goal,
  779                           run_in_thread(_Goal, _Vars, Id, _Sender)),
  780    !,
  781    throw(Exception).
  782kill_task(_, _)