1:- module(jolog,[ op(1200,xfx,&-)
    2                , op(1100,xfy,&)
    3                , jolog_import_sentinel/0
    4                , send/1
    5                , start_jolog/1
    6                , start_jolog/2
    7                ]).    8:- use_module(library(debug), [debug/3]).    9:- use_module(library(list_util), [split/3, xfy_list/3]).   10:- use_module(library(lists), [same_length/2]).   11:- use_module(library(error), [domain_error/2]).   12
   13:- use_module(library(jolog/manager)).   14
   15
   16:- thread_local channels/2.  % channels(Module, Message)
   17
   18/************* Jolog runtime code *******************/
 start_jolog(+Module, +Main) is det
Start running Jolog code defined in Module. After creating the runtime environment, this predicate sends the Main message. Use that message to trigger the rest of your application. For example,
main :-  % starting point for Prolog
    start_jolog(user, go).
go &-  % starting point for Jolog
    ( one_process
    & another_process
    ).
...

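start_jolog/2 returns when one of the following is true (the list of conditions is missing from this listing — TODO restore it from the original documentation). In terms of the code below, it returns once manager_loop/1 finishes; manager_destroy/1 then runs as the cleanup step of setup_call_cleanup/3.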

   % Run the Jolog code of Module, using Main as the first message.
   % The three phases are named and handed to setup_call_cleanup/3:
   % manager_create/2 is the setup step, manager_loop/1 the goal that is
   % run, and manager_destroy/1 the cleanup step.
   start_jolog(Module, Main) :-
       Setup   = manager_create(Module, Main),
       Loop    = manager_loop(Module),
       Cleanup = manager_destroy(Module),
       setup_call_cleanup(Setup, Loop, Cleanup).
   45
   % Start one detached thread that runs worker_loop/2 for Module with
   % Queue as its job queue. ThreadId is bound by thread_create/3.
   create_worker(Module, Queue, ThreadId) :-
       WorkerGoal = worker_loop(Module, Queue),
       thread_create(WorkerGoal, ThreadId, [detached(true)]).
 start_jolog(+Module) is det
Like start_jolog/2 using main as the first message.
   % Convenience wrapper around start_jolog/2: the atom `main` is used
   % as the first message.
   start_jolog(Module) :-
       FirstMessage = main,
       start_jolog(Module, FirstMessage).
 set_meta(+Module, +Name, +Value) is det
Set a named meta value for a Jolog runtime defined in Module.
   % Store Value under Name for the runtime of Module. Every meta/3 fact
   % already recorded for this Module/Name pair is retracted first, so
   % only the new value is left for the pair.
   set_meta(Module, Name, Value) :-
       Outdated = meta(Module, Name, _),
       Current  = meta(Module, Name, Value),
       retractall(Outdated),
       assertz(Current).
 meta(+Module, ?Name, ?Value)
True if Jolog runtime defined in Module has a named meta value with Name and Value.
   69:- dynamic meta/3.   70
   71
   % spawn_process(+Module, +Process)
   %
   % Called by macro-expanded Jolog code to spawn a new, parallel process.
   % Should only be called by the manager thread.
   %
   % Both queues are looked up before anything is sent. The manager is
   % told about the extra active worker *before* the job is queued: a
   % worker that picks the job up and finishes at once posts active(-1)
   % to the same manager queue (see worker_loop/2), and that message
   % must not be able to overtake the matching active(+1).
   spawn_process(Module, Process) :-
       % argument list form, consistent with the other debug/3 calls
       debug(jolog, '~w', [spawn_process(Module, Process)]),
       meta(Module, work_queue, WorkQueue),
       meta(Module, manager_queue, ManagerQueue),

       % notify the manager that one more worker is active
       thread_send_message(ManagerQueue, active(+1)),

       % put process code in the workers' queue
       thread_send_message(WorkQueue, run_process(Process)).
 send(+Message) is det
Sends a Jolog message to the relevant channel. Message should be a ground term whose functor indicates the channel and whose arguments indicate the message. For example, the following are legitimate messages:
send(hello)         % hello/0 channel
send(hello(world))  % hello/1 channel
send(foo(alpha,beta,gamma,delta))  % foo/4 channel
   :- meta_predicate send(:).
   % Deliver Message to the channel named by its functor and arity.
   % If that channel has been recorded for Module (defined_channel/3),
   % the message is forwarded to the module's manager queue; otherwise a
   % jolog_nobody_listening warning is printed and nothing is sent.
   send(Module:Message) :-
       functor(Message, Name, Arity),
       (   defined_channel(Module, Name, Arity)
       ->  % someone listens on this channel; send the message
           debug(jolog, '~w', [send(Module,Message)]),
           meta(Module, manager_queue, ManagerQueue),
           thread_send_message(ManagerQueue, send_message(Message))
       ;   % nobody listens on this channel; generate a warning
           print_message(warning, jolog_nobody_listening(Module, Message))
       ).
  108
  % Loop executed by Jolog worker threads: block on Queue for the next
  % job, act on it, then wait for another one.
  worker_loop(Module, Queue) :-
      debug(jolog,'~w',[worker(waiting)]),
      thread_get_message(Queue, Job),
      debug(jolog,'~w', [worker(job(Job))]),
      worker_handle_job(Job, Module),
      worker_loop(Module, Queue).

  % Act on one job taken from the work queue.
  %   halt              - leave the thread through thread_exit/1
  %   run_process(Goal) - run Goal once in Module, ignoring failure and
  %                       reporting exceptions, then tell the manager
  %                       that one worker fewer is active
  %   anything else     - domain error
  worker_handle_job(halt, _Module) :-
      !,
      debug(jolog,'worker exiting',[]),
      thread_exit(halt).
  worker_handle_job(run_process(Goal), Module) :-
      !,
      catch(Module:ignore(Goal),Ex,report_exception(Module,Goal,Ex)),
      meta(Module, manager_queue, ManagerQueue),
      debug(jolog,'~w',[worker(finished)]),
      thread_send_message(ManagerQueue, active(-1)).
  worker_handle_job(Job, _Module) :-
      domain_error(jolog_worker_message, Job).
  126
  127
  % Print two warnings for a worker whose process goal raised Ex: first
  % which Module:Goal was running, then the exception term itself.
  report_exception(Module, Goal, Ex) :-
      Crash = jolog_worker_crashed(Module, Goal),
      print_message(warning, Crash),
      print_message(warning, Ex).
  131
  132
  % Message translations for the warnings this module prints through
  % print_message/2. prolog:message//1 is a hook that many files add
  % clauses to, so it is declared multifile before clauses are added.
  :- multifile prolog:message//1.

  % printed by report_exception/3 when a worker's process goal throws
  prolog:message(jolog_worker_crashed(Module,Goal)) -->
     ["Caught exception in Jolog worker running ~q."-[Module:Goal]].
  % printed by send/1 when no join pattern listens on the channel; no
  % other translation for this term is visible in this file
  prolog:message(jolog_nobody_listening(Module,Message)) -->
     ["Nobody listens for Jolog message ~q."-[Module:Message]].
  135
  136
  137/*************************** Macro expansion code ***********************/
 jolog_import_sentinel
Nothing to see here. This is a junk predicate to keep Jolog macro expansion isolated to those modules that want it.
  % Deliberately empty marker. It is exported so that
  % wants_jolog_expansion/0 can tell, from where a module imported it,
  % whether that module asked for Jolog macro expansion.
  jolog_import_sentinel.
  144
  145
  % True if the module that is currently being loaded imported
  % jolog_import_sentinel/0 from jolog, i.e. it wants Jolog macro
  % expansion.
  wants_jolog_expansion :-
      prolog_load_context(module, Loading),
      Sentinel = Loading:jolog_import_sentinel,
      predicate_property(Sentinel, imported_from(jolog)).
  150
  151
  % parse_join_clause(+Clause, -Patterns, -Guards, -Processes)
  %
  % Parse a Jolog clause `Head &- Body` into its constituent parts.
  %   Patterns  - the comma-separated join patterns of Head
  %   Guards    - the body goals before the `then` goal; [] when the
  %               body has no `then`
  %   Processes - the process goals: a single process term is split on
  %               '&'; several comma-separated terms are joined back
  %               into one conjunction that forms a single process
  %
  % Raises a domain error (jolog_clause_body) when Body contains more
  % than one `then` goal. Failing instead would make the caller's term
  % expansion fail, which leaves the clause unexpanded without any
  % diagnostic.
  parse_join_clause((Head &- Body), Patterns, Guards, Processes) :-
      % separate head into individual join patterns
      xfy_list(',', Head, Patterns),

      % separate body into guards and process terms
      xfy_list(',', Body, Goals),
      split(Goals, then, BodyParts),
      (   BodyParts = [Guards, ProcessTerms]
      ->  true
      ;   BodyParts = [ProcessTerms]   % missing 'then' goal
      ->  Guards = []
      ;   % more than one 'then' goal: no sensible reading
          domain_error(jolog_clause_body, Body)
      ),

      % build process goals from process terms
      (   ProcessTerms = []
      ->  Processes = []
      ;   ProcessTerms = [ProcessDisjunction]
      ->  xfy_list('&', ProcessDisjunction, Processes)
      ;   % otherwise: treat the whole conjunction as one process
          xfy_list(',', Process, ProcessTerms),
          Processes = [Process]
      ).
 build_peek_goal(+Module, +Pattern, -MessageRef, -PeekGoal)
Convert a jolog join pattern of Module into a goal which checks whether the pattern matches (PeekGoal). If calling PeekGoal succeeds, it binds MessageRef to a reference to a clause which represents the matching message. This reference can be used with erase/1 to consume the message.
  % build_peek_goal(+Module, +Pattern, -MessageRef, -PeekGoal)
  % PeekGoal is a clause/3 call on jolog's channels/2 store for Module
  % and Pattern; when it succeeds, MessageRef is the reference of the
  % matching clause.
  build_peek_goal(Module, Pattern, MessageRef,
                  jolog:clause(channels(Module,Pattern), true, MessageRef)).
 remember_channel(+Module, +JoinPattern)
Makes a note that Module has a join pattern which refers to a specific channel.
  :- dynamic defined_channel/3.
  % Record that Module has a join pattern on the Name/Arity channel of
  % Pattern. A channel that is already recorded is not added again.
  remember_channel(Module, Pattern) :-
      functor(Pattern, Name, Arity),
      Channel = defined_channel(Module, Name, Arity),
      (   \+ Channel
      ->  assertz(Channel)
      ;   true
      ).
  200
  201
  % Expand a Jolog rule `Head &- Body`, read while loading a module that
  % wants Jolog expansion, into a clause of that module's
  % '$jolog_code'/0. The clause peeks at one stored message per join
  % pattern, runs the guards, commits, erases the matched messages and
  % spawns the processes.
  user:term_expansion((Head &- Body), ('$jolog_code' :- Goals)) :-
      wants_jolog_expansion,
      parse_join_clause((Head &- Body), Patterns, Guards, Processes),
      prolog_load_context(module, Module),

      % build goals to peek at messages: one goal and one message
      % reference per join pattern
      same_length(Patterns, MessageRefs),
      maplist(build_peek_goal(Module), Patterns, MessageRefs, PeekList),

      % remember which channels have been defined
      maplist(remember_channel(Module), Patterns),

      % build jolog clause body; an empty guard list becomes `true`
      xfy_list(',', PeekGoals, PeekList),
      (   Guards = []
      ->  GuardGoals = true
      ;   xfy_list(',', GuardGoals, Guards)
      ),
      Module:dynamic('$jolog_code'/0),
      Goals = (
          debug(jolog,'Does head match? ~w', [Head]),
          PeekGoals,
          GuardGoals,
          !,
          maplist(erase, MessageRefs),
          maplist(jolog:spawn_process(Module), Processes)
      ).
  % At the end of a file loaded into a module that wants Jolog expansion,
  % create the Jolog clause that handles system halt. The clause always
  % ends in failure, so end_of_file itself is never rewritten here.
  user:term_expansion(end_of_file, _) :-
      wants_jolog_expansion,
      prolog_load_context(module, Module),

      % only add a 'halt' rule if there are other rules
      Module:once(clause('$jolog_code', _)),

      % Jolog rule for the halt/0 channel: its guards post `halt` to the
      % manager queue of Module, and it spawns no processes
      HaltRule = (
          halt &-
               debug(jolog, 'halting', []),
               jolog:meta(Module, manager_queue, ManagerQueue),
               thread_send_message(ManagerQueue, halt),
               then
      ),
      term_expansion(HaltRule, HaltClause),
      Module:asserta(HaltClause),  % halt clause goes first
      remember_channel(Module, halt),

      fail.  % let others have a chance to expand end_of_file