1:- module(jolog_manager, [ manager_create/2
    2                         , manager_destroy/1
    3                         , manager_loop/1
    4                         ]).    5
    6manager_loop(Module) :-
    7    manager_loop(Module, 0).
    8
    9manager_loop(Module, Outstanding) :-
   10    iterate_patterns(Module, Outstanding).
   11
   12
   13% create a Jolog manager
   14manager_create(Module,StartupMessage) :-
   15    message_queue_create(ManagerQueue),
   16    jolog:set_meta(Module, manager_queue, ManagerQueue),
   17
   18    message_queue_create(WorkQueue),
   19    jolog:set_meta(Module, work_queue, WorkQueue),
   20
   21    current_prolog_flag(cpu_count,CoreCount),
   22    WorkerCount is 2*CoreCount,
   23    jolog:set_meta(Module, worker_count, WorkerCount),
   24    length(Workers,WorkerCount),
   25    debug(jolog,"Jolog: starting ~d worker threads.",[WorkerCount]),
   26    maplist(jolog:create_worker(Module,WorkQueue),Workers),
   27    jolog:set_meta(Module, workers, Workers),
   28
   29    Module:send(StartupMessage).
   30
   31
   32% destroy a Jolog manager created with manager_create/2
   33manager_destroy(Module) :-
   34    jolog:meta(Module,workers,Workers),
   35    maplist(destroy_worker,Workers),
   36
   37    jolog:meta(Module,work_queue,WorkQueue),
   38    message_queue_destroy(WorkQueue),
   39
   40    jolog:meta(Module,manager_queue,ManagerQueue),
   41    message_queue_destroy(ManagerQueue),
   42
   43    % don't leave messages or metadata lying around
   44    retractall(jolog:meta(Module,_,_)),
   45    retractall(jolog:channels(_,_)).
   46
   47destroy_worker(ThreadId) :-
   48    debug(jolog,'Signaling worker ~d to stop',[ThreadId]),
   49    thread_signal(ThreadId,thread_exit(halt)).
   50
   51
   52% match as many join patterns as possible
   53iterate_patterns(Module, Outstanding) :-
   54    debug(jolog, '~w', [manager(iterate_patterns,Outstanding)]),
   55    Module:'$jolog_code',
   56    !,
   57    iterate_patterns(Module, Outstanding).
   58iterate_patterns(Module, Outstanding) :-
   59    iterate_events(Module, Outstanding).
   60
   61
   62% process as many manager events as possible
   63iterate_events(Module, Outstanding) :-
   64    take_event_no_block(Module,Event),
   65    debug(jolog,'~w',[manager(event, Event)]),
   66    handle_event(Event, Module, Outstanding).
   67
   68handle_event(send_message(Msg), Module, Outstanding) :-
   69    debug(jolog,"Sending message: ~w",[Msg]),
   70    jolog:assert(channels(Module,Msg)),
   71    iterate_patterns(Module, Outstanding).  % patterns might match now
   72handle_event(active(N), Module, Outstanding0) :-
   73    Outstanding is Outstanding0 + N,
   74    debug(jolog,"Outstanding workers: ~d -> ~d",[Outstanding0,Outstanding]),
   75    iterate_events(Module, Outstanding).
   76handle_event(none, Module, Outstanding) :-
   77    ( Outstanding > 0 ->  % block until pending workers are done
   78        debug(jolog, 'manager blocking', []),
   79        take_event_block(Module,Event),
   80        handle_event(Event, Module, Outstanding)
   81    ; true ->   % no chance of forward progress; stop Jolog
   82        debug(jolog,"No events, no outstanding workers: sending halt",[]),
   83        handle_event(send_message(halt), Module, Outstanding)
   84    ).
   85handle_event(halt, _, _).  % no more recursion
   86
   87
   88% Takes the next event that's available for the manager thread.
   89% Blocks if there are no events available. Should only be called by
   90% the manager thread.
   91take_event_block(Module,Event) :-
   92    jolog:meta(Module,manager_queue,ManagerQueue),
   93    thread_get_message(ManagerQueue, Event).
   94
   95
   96% Like take_event_block but binds Event to 'none' instead of blocking.
   97% Should only be called by the manager thread.
   98take_event_no_block(Module,Event) :-
   99    jolog:meta(Module,manager_queue,ManagerQueue),
  100    ( thread_get_message(ManagerQueue, Message, [timeout(0)]) ->
  101        Event = Message
  102    ; % otherwise ->
  103        Event = none
  104    )