18
19:- module(concurrency, [concurrent_or/1, concurrent_or/3]). 20
21:- meta_predicate concurrent_or(-,:,+). 22:- meta_predicate concurrent_or(:).
%!  concurrent_or(:Goals) is nondet.
%
%   Convenience entry point: runs the alternatives in Goals through
%   concurrent_or/3 with an empty option list, using every variable
%   that occurs in Goals as the solution template.
concurrent_or(Alternatives) :-
    term_variables(Alternatives, Template),
    concurrent_or(Template, Alternatives, []).
%!  concurrent_or(-Vars, :List, +Options) is nondet.
%
%   Starts one worker thread per goal in List and yields the bindings
%   for Vars that the workers report through a shared message queue.
%
%   Options handled here (everything else is handed to thread_create/3
%   via create_worker/6):
%
%     * on_error(OnError)
%       Passed to wait_for_one/4; defaults to `stop`.
%     * queue_factor(K)
%       The result queue is created with max_size K times the number
%       of goals; defaults to 1.
%
%   Whenever the waiting goal finishes (exit, failure, exception or
%   being cut), the workers are signalled, the queue is drained, the
%   workers are joined and the queue is destroyed.
concurrent_or(Vars, M:List, Options) :-
    select_option(on_error(ErrorPolicy), Options, Rest0, stop),
    select_option(queue_factor(Factor), Rest0, ThreadOptions, 1),
    length(List, NumJobs),
    Capacity is Factor*NumJobs,
    message_queue_create(Results, [max_size(Capacity)]),
    setup_call_cleanup(
        maplist(create_worker(M, Vars, Results, ThreadOptions), List, Workers),
        wait_for_one(NumJobs, Results, Vars, ErrorPolicy),
        concurrent_or_shutdown(Workers, Results)
    ).

%   concurrent_or_shutdown(+Workers, +Queue)
%
%   Tear-down used by concurrent_or/3: signal every worker, empty the
%   queue, join all workers and finally destroy the queue.
concurrent_or_shutdown(Workers, Results) :-
    debug(concurrency,'Sending kill signal to workers',[]),
    maplist(kill_thread, Workers),
    % NOTE(review): presumably draining lets workers that are blocked
    % on a full queue proceed so they can act on the signal -- confirm.
    drain(Results),
    debug(concurrency,'Waiting for workers to die.',[]),
    maplist(thread_join, Workers, _),
    message_queue_destroy(Results).
65
%!  drain(+Queue) is det.
%
%   Removes messages from Queue, without waiting (timeout 0), until no
%   further message can be fetched.
drain(Queue) :-
    repeat,
    \+ thread_get_message(Queue, _, [timeout(0)]),
    !.
%!  kill_thread(+Id) is det.
%
%   Asks thread Id to raise `abort` by signalling it. Any exception
%   raised while sending the signal is deliberately ignored, so this
%   always succeeds.
kill_thread(ThreadId) :-
    Signal = throw(abort),
    catch(thread_signal(ThreadId, Signal), _Ignored, true).
%!  create_worker(+Module, ?Vars, +Queue, +Options, :Goal, -Id) is det.
%
%   Creates a thread, with thread_create/3 options Options, that runs
%   worker/3 on Module:Goal with the template Vars and the result
%   queue Queue. Id is unified with the new thread's identifier.
create_worker(Module, Template, Queue, ThreadOptions, Goal, ThreadId) :-
    thread_create(worker(Module:Goal, Template, Queue),
                  ThreadId,
                  ThreadOptions).
69
%!  wait_for_one(+N, +Queue, ?X, +OnError) is nondet.
%
%   Reads worker messages from Queue while N (the number of workers
%   that have not yet reported failed/error) is positive; fails once
%   N reaches 0.
%
%     * success(_, Var): unify X with Var; on backtracking keep
%       waiting with the same N.
%     * failed(_): continue with N-1.
%     * error(_, E): if OnError is `stop`, throw error(E); otherwise
%       print E as an error message and continue with N-1.
%
%   Any other message makes the predicate fail.
wait_for_one(N, Q, X, OnError) :-
    succ(Remaining, N),              % fails when N is 0
    thread_get_message(Q, Msg),
    wait_for_one_dispatch(Msg, N, Remaining, Q, X, OnError).

%   wait_for_one_dispatch(+Msg, +N, +Remaining, +Queue, ?X, +OnError)
%
%   Handles a single worker message on behalf of wait_for_one/4.
%   The cuts commit to the first matching message shape.
wait_for_one_dispatch(success(_, Solution), N, _, Q, X, OnError) :-
    !,
    (   X = Solution
    ;   wait_for_one(N, Q, X, OnError)
    ).
wait_for_one_dispatch(failed(_), _, Remaining, Q, X, OnError) :-
    !,
    wait_for_one(Remaining, Q, X, OnError).
wait_for_one_dispatch(error(_, E), _, Remaining, Q, X, OnError) :-
    !,
    (   OnError = stop
    ->  throw(error(E))
    ;   print_message(error, E),
        wait_for_one(Remaining, Q, X, OnError)
    ).
80
81worker(Goal,Var,Q) :-
82 thread_self(Me),
83 debug(concurrency,'Worker started on ~q.',[Goal]),
84 ( catch( Goal,E, (thread_send_message(Q,error(Me,E)), throw(error))),
85 thread_send_message(Q,success(Me,Var)), fail
86 ; thread_send_message(Q,failed(Me)),
87 debug(concurrency,'Worker finished normally.',[])
88 )