View source with formatted comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jeffrey Rosenwald
    4    E-mail:        jeffrose@acm.org
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2009-2013, Jeffrey Rosenwald
    7    All rights reserved.
    8
    9    Redistribution and use in source and binary forms, with or without
   10    modification, are permitted provided that the following conditions
   11    are met:
   12
   13    1. Redistributions of source code must retain the above copyright
   14       notice, this list of conditions and the following disclaimer.
   15
   16    2. Redistributions in binary form must reproduce the above copyright
   17       notice, this list of conditions and the following disclaimer in
   18       the documentation and/or other materials provided with the
   19       distribution.
   20
   21    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   22    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   23    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   24    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   25    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   26    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   27    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   28    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   29    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   30    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   31    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   32    POSSIBILITY OF SUCH DAMAGE.
   33*/
   34
   35:- module(tipc_broadcast,
   36          [ tipc_host_to_address/2,             % ?Host, ?Address
   37            tipc_initialize/0
   38          ]).   39:- use_module(library(tipc/tipc),[tipc_initialize/0]).   40
   41/** <module> A TIPC Broadcast Bridge
   42
   43SWI-Prolog's broadcast library provides a  means   that  may  be used to
   44facilitate publish and subscribe communication regimes between anonymous
   45members of a community of interest.  The   members  of the community are
   46however, necessarily limited to a single   instance  of Prolog. The TIPC
   47broadcast library removes that restriction.   With  this library loaded,
   48any member of a TIPC network that also  has this library loaded may hear
   49and respond to your broadcasts. Using TIPC Broadcast, it becomes a
   50nearly trivial matter to build an instance of supercomputer that
   51researchers within the High Performance Computer community refer to as
   52"Beowulf Class Cluster Computers."
   53
   54This module has no public predicates. When this module is initialized,
   55it does three things:
   56
   57    * It starts  a  listener  daemon   thread  that  listens for
   58    broadcasts from others, received as TIPC datagrams, and
   59
   60    * It registers three listeners: tipc_node/1, tipc_cluster/1, and
   61    tipc_zone/1, and
   62
   63    * It registers three listeners: tipc_node/2, tipc_cluster/2, and
   64    tipc_zone/2.
   65
   66A broadcast/1 or broadcast_request/1 that is not  directed to one of the
   67six listeners above, behaves as usual and is confined to the instance of
   68Prolog that originated it. But when so   directed, the broadcast will be
   69sent to all participating systems, including   itself,  by way of TIPC's
   70multicast addressing facility. A TIPC broadcast or broadcast
   71request takes the typical form: =|broadcast(tipc_node(+Term,
   72+Timeout))|=. The principal functors =tipc_node=, =tipc_cluster=, and
   73=tipc_zone=, specify the scope of the broadcast. The functor
   74=tipc_node=, specifies that the broadcast is to be confined to members
   75of a present TIPC node. Likewise, =tipc_cluster= and =tipc_zone=,
   76specify that the traffic should be confined to members of a present TIPC
   77cluster and zone, respectively. To prevent the potential for feedback
   78loops, the scope qualifier is stripped from the message before
   79transmission. The timeout is optional. It specifies the amount to time
   80to wait for replies to arrive in response to a broadcast_request. The
   81default period is 0.250 seconds. The timeout is ignored for broadcasts.
   82
   83An example of three separate processes cooperating on the same Node:
   84
   85==
   86Process A:
   87
   88   ?- listen(number(X), between(1, 5, X)).
   89   true.
   90
   91   ?-
   92
   93Process B:
   94
   95   ?- listen(number(X), between(7, 9, X)).
   96   true.
   97
   98   ?-
   99
  100Process C:
  101
  102   ?- findall(X, broadcast_request(tipc_node(number(X))), Xs).
  103   Xs = [1, 2, 3, 4, 5, 7, 8, 9].
  104
  105   ?-
  106==
  107
  108It is also  possible  to  carry  on   a  private  dialog  with  a single
  109responder. To do this, you supply a   compound of the form, Term:PortId,
  110to a TIPC scoped broadcast/1 or broadcast_request/1, where PortId is the
  111port-id of the intended listener.  If   you  supply an unbound variable,
  112PortId, to broadcast_request, it will be unified with the address of the
  113listener that responds to Term. You may   send a directed broadcast to a
  114specific  member  by  simply  providing  this  address  in  a  similarly
  115structured compound to a TIPC scoped   broadcast/1.  The message is sent
  116via unicast to that  member  only  by   way  of  the  member's broadcast
  117listener. It is received by the  listener   just  as any other broadcast
  118would be. The listener does not know the difference.
  119
  120Although this capability is needed under   some  circumstances, it has a
  121tendency to compromise the resilience of the broadcast model. You should
  122not rely on it too heavily, or fault tolerance will suffer.
  123
  124For example, in order to discover who responded with a particular value:
  125
  126==
  127Process A:
  128
  129   ?- listen(number(X), between(1, 3, X)).
  130   true.
  131
  132   ?-
  133
  134Process B:
  135
  136   ?- listen(number(X), between(7, 9, X)).
  137   true.
  138
  139   ?-
  140
  141Process C:
  142
  143   ?- broadcast_request(tipc_node(number(X):From)).
  144   X = 7,
  145   From = port_id('<1.1.1:3971170279>') ;
  146   X = 8,
  147   From = port_id('<1.1.1:3971170279>') ;
  148   X = 9,
  149   From = port_id('<1.1.1:3971170279>') ;
  150   X = 1,
  151   From = port_id('<1.1.1:3971170280>') ;
  152   X = 2,
  153   From = port_id('<1.1.1:3971170280>') ;
  154   X = 3,
  155   From = port_id('<1.1.1:3971170280>') ;
  156   false.
  157
  158?-
  159
  160==
  161
  162## Caveats {#tipc-caveats}
  163
  164While the implementation is mostly transparent, there are some important
  165and subtle differences that must be taken into consideration:
  166
  167    * TIPC broadcast now requires an initialization step in order to
  168    launch the broadcast listener daemon. See tipc_initialize/0.
  169
  170    * Prolog's broadcast_request/1 is nondet. It sends the request,
  171    then evaluates the replies synchronously, backtracking as needed
  172    until a satisfactory reply is received. The remaining potential
  173    replies are not evaluated. This is not so when TIPC is involved.
  174
  175    * A TIPC broadcast/1 is completely asynchronous.
  176
  177    * A  TIPC broadcast_request/1 is partially synchronous. A
  178    broadcast_request/1 is sent, then the sender balks for a period of
  179    time (default: 250 ms) while the replies are collected. Any reply
  180    that is received after this period is silently discarded. An
  181    optional second argument is provided so that a sender may specify
  182    more (or less) time for replies.
  183
  184    * Replies are _|no longer|_ collected using findall/3. Replies are
  185    presented to the user as a choice point on arrival, until the
  186    broadcast request timer finally expires. This change allows
  187    traffic to propagate through the system faster and provides the
  188    requestor with the opportunity to terminate a broadcast request
  189    early if desired, by simply cutting choice points.
  190
  191    * Please beware that broadcast request transactions will now remain
  192    active and resources consumed until broadcast_request finally fails
  193    on backtracking, an uncaught exception occurs, or until choice
  194    points are cut. Failure to properly manage this will likely result
  195    in chronic exhaustion of TIPC sockets.
  196
  197    * If a listener is connected to a generator that always succeeds
  198    (e.g. a random number generator), then the broadcast request will
  199    never terminate and trouble is bound to ensue.
  200
  201    * broadcast_request/1 with TIPC scope is _not_ reentrant (at
  202    least, not now anyway). If a listener performs a broadcast_request/1
  203    with TIPC scope recursively, then disaster looms certain. This
  204    caveat does not apply to a TIPC scoped broadcast/1, which can safely
  205    be performed from a listener context.
  206
  207    * TIPC's capacity is not infinite. While TIPC can tolerate
  208    substantial bursts of activity, it is designed for short bursts of
  209    small messages. It can tolerate several thousand replies in response
  210    to a broadcast_request/1 without trouble, but it will begin to
  211    encounter congestion beyond that. And in congested conditions,
  212    things will start to become unreliable as TIPC begins prioritizing
  213    and/or discarding traffic.
  214
  215    * A TIPC broadcast_request/1 term that is grounded is considered to
  216    be a broadcast only. No replies are collected unless the there is at
  217    least one unbound variable to unify.
  218
  219    * A TIPC broadcast/1 always succeeds, even if there are no
  220    listeners.
  221
  222    * A TIPC broadcast_request/1 that receives no replies will fail.
  223
  224    * Replies may be coming from many different places in the network
  225    (or none at all). No ordering of replies is implied.
  226
  227    * Prolog terms are sent to others after first converting them to
  228    atoms using term_to_atom/2. Passing real numbers this way may
  229    result in a substantial truncation of precision. See prolog flag
  230    option, 'float_format', of current_prolog_flag/2.
  231
  232@author    Jeffrey Rosenwald (JeffRose@acm.org)
  233@license   LGPL
  234@see       tipc.pl
  235@compat    Linux only
  236*/
  237
  238:- autoload(tipc,
  239	    [ tipc_get_name/2,
  240	      tipc_send/4,
  241	      tipc_socket/2,
  242	      tipc_close_socket/1,
  243	      tipc_setopt/2,
  244	      tipc_bind/3,
  245	      tipc_receive/4
  246	    ]).  247:- autoload(library(broadcast),
  248	    [broadcast_request/1,broadcast/1,listen/3,unlisten/1]).  249:- use_module(library(debug),[assertion/1]).  250:- autoload(library(time),
  251	    [call_with_time_limit/2,alarm/3,remove_alarm/1]).  252
  253:- require([ thread_self/1
  254           , forall/2
  255           , term_to_atom/2
  256           , thread_send_message/2
  257           , catch/3
  258           , setup_call_cleanup/3
  259           , thread_create/3
  260           ]).  261
  262:- meta_predicate safely(0), eventually_implies(0,0), ~>(0,0).  263
  264tipc_broadcast_service(node,            name_seq(20005, 0, 0)).
  265tipc_broadcast_service(cluster,         name_seq(20005, 1, 1)).
  266tipc_broadcast_service(zone,            name_seq(20005, 2, 2)).
  267
  268%
  269%  Here's a TIPC bridge to Prolog's broadcast library
  270%
  271%  A sender may confine a broadcast to  a   subset  of a TIPC network by
  272%  specifying a scoping qualifier in   his/her  broadcast. The qualifier
  273%  has the effect of selecting the   appropriate  multi-cast address for
  274%  the transmission. Thus, the sender of   the  message has control over
  275%  the scope of his/her traffic on a per-message basis.
  276%
  277%  All in-scope listeners receive the   broadcast and simply rebroadcast
  278%  the message locally. All broadcast replies, if any, are sent directly
  279%  to the sender via the port-id that   was received with the broadcast.
  280%  No additional multiplexing is required.
  281%
  282
  283safely(Predicate) :-
  284    catch(Predicate, Err, caught_safely(Err)).
  285
  286caught_safely('$aborted').
  287caught_safely(unwind(abort)).
  288caught_safely(Error) :-
  289    print_message(error, Error),
  290    fail.
  291
  292%!  ~>(:P, :Q) is semidet.
  293%!  eventually_implies(:P, :Q) is semidet.
  294%    asserts temporal Liveness (something good happens, eventually) and
  295%    Safety (nothing bad ever happens) properties. Analogous to the
  296%    "leads-to" operator of Owicki and Lamport, 1982. Provides a sort of
  297%    lazy implication described informally as:
  298%
  299%    * Liveness: For all possible outcomes, P -> Q, eventually.
  300%    * Safety: For all possible outcomes, (\+P ; Q), is invariant.
  301%
  302%  Described practically:
  303%
  304%    P ~> Q, declares that if P is true, then Q must be true, now or at
  305%    some point in the future.
  306%
  307
  308eventually_implies(P, Q) :-
  309    setup_call_cleanup(P, ( Solution = yes ; Solution = no ), assertion(Q)),
  310    Solution = yes.
  311
  312:- op(950, xfy, ~>).  313
  314~>(P, Q) :-
  315    eventually_implies(P, Q).
  316
  317ld_dispatch(S, '$tipc_request'(wru(Name)), From) :-
  318    !, tipc_get_name(S, Name),
  319    term_to_atom(wru(Name), Atom),
  320    tipc_send(S, Atom, From, []).
  321
  322ld_dispatch(S, '$tipc_request'(Term), From) :-
  323    !, forall(broadcast_request(Term),
  324          (   term_to_atom(Term, Atom),
  325              tipc_send(S, Atom, From, []))).
  326
  327ld_dispatch(_S, Term, _From) :-
  328    safely(broadcast(Term)).
  329
  330tipc_listener_daemon(Parent) :-
  331    tipc_socket(S, rdm) ~> tipc_close_socket(S),
  332
  333%       tipc_setopt(S, importance(medium)),
  334    tipc_setopt(S, dest_droppable(true)),  % discard if not deliverable
  335
  336    forall(tipc_broadcast_service(Scope, Address),
  337         tipc_bind(S, Address, scope(Scope))),
  338
  339    listen(tipc_broadcast, Head, broadcast_listener(Head))
  340         ~> unlisten(tipc_broadcast),
  341
  342    thread_send_message(Parent, tipc_listener_daemon_ready),
  343
  344    repeat,
  345    safely(dispatch_traffic(S)).
  346
  347dispatch_traffic(S) :-
  348    tipc_receive(S, Data, From, [as(atom)]),
  349    term_to_atom(Term, Data),
  350    ld_dispatch(S, Term, From),
  351    !,
  352    dispatch_traffic(S).
  353
  354start_tipc_listener_daemon :-
  355    catch(thread_property(tipc_listener_daemon, status(running)),_, fail),
  356    !.
  357
  358start_tipc_listener_daemon :-
  359    thread_self(Self),
  360    thread_create(tipc_listener_daemon(Self), _,
  361           [alias(tipc_listener_daemon), detached(true)]),
  362    call_with_time_limit(6.0,
  363                         thread_get_message(tipc_listener_daemon_ready)).
  364
  365:- multifile tipc:host_to_address/2.  366%
  367broadcast_listener(tipc_host_to_address(Host, Addr)) :-
  368    tipc:host_to_address(Host, Addr).
  369
  370broadcast_listener(tipc_broadcast_service(Class, Addr)) :-
  371    tipc_broadcast_service(Class, Addr).
  372
  373broadcast_listener(tipc_node(X)) :-
  374    tipc_broadcast(X, node, 0.250).
  375
  376broadcast_listener(tipc_cluster(X)) :-
  377    tipc_broadcast(X, cluster, 0.250).
  378
  379broadcast_listener(tipc_zone(X)) :-
  380    tipc_broadcast(X, zone, 0.250).
  381
  382broadcast_listener(tipc_node(X, Timeout)) :-
  383    tipc_broadcast(X, node, Timeout).
  384
  385broadcast_listener(tipc_cluster(X, Timeout)) :-
  386    tipc_broadcast(X, cluster, Timeout).
  387
  388broadcast_listener(tipc_zone(X, Timeout)) :-
  389    tipc_broadcast(X, zone, Timeout).
  390
  391%
  392%
  393
  394tipc_basic_broadcast(S, Term, Address) :-
  395    tipc_socket(S, rdm) ~> tipc_close_socket(S),
  396%   tipc_setopt(S, importance(medium)),
  397    term_to_atom(Term, Atom),
  398    safely(tipc_send(S, Atom, Address, [])).
  399
  400% directed broadcast to a single listener
  401tipc_broadcast(Term:To, _Scope, _Timeout) :-
  402    ground(Term), ground(To),
  403    !,
  404    tipc_basic_broadcast(_S, Term, To),
  405    !.
  406
  407% broadcast to all listeners
  408tipc_broadcast(Term, Scope, _Timeout) :-
  409    ground(Term),
  410    !,
  411    tipc_broadcast_service(Scope, Address),
  412    tipc_basic_broadcast(_S, Term, Address),
  413    !.
  414
  415% directed broadcast_request to a single listener
  416tipc_broadcast(Term:Address, _Scope, Timeout) :-
  417    ground(Address),
  418    !,
  419    tipc_basic_broadcast(S, '$tipc_request'(Term), Address),
  420    tipc_br_collect_replies(S, Timeout, Term:Address).
  421
  422% broadcast_request to all listeners returning responder port-id
  423tipc_broadcast(Term:From, Scope, Timeout) :-
  424    !, tipc_broadcast_service(Scope, Address),
  425    tipc_basic_broadcast(S, '$tipc_request'(Term), Address),
  426    tipc_br_collect_replies(S, Timeout, Term:From).
  427
  428% broadcast_request to all listeners ignoring responder port-id
  429tipc_broadcast(Term, Scope, Timeout) :-
  430    tipc_broadcast(Term:_, Scope, Timeout).
  431
  432tipc_br_send_timeout(Port) :-
  433    tipc_socket(S, rdm) ~> tipc_close_socket(S),
  434
  435    tipc_setopt(S, importance(critical)),
  436    tipc_send(S, '$tipc_br_timeout', Port, []),
  437    !.
  438
  439tipc_br_collect_replies(S, Timeout, Term:From) :-
  440    tipc_get_name(S, Port),
  441    alarm(Timeout, tipc_br_send_timeout(Port), Id)
  442         ~> remove_alarm(Id),
  443    tipc_setopt(S, dispatch(false)),
  444    repeat,
  445    tipc_receive(S, Atom, From1, [as(atom)]),
  446    (   (Atom \== '$tipc_br_timeout')
  447        -> (From1 = From, safely(term_to_atom(Term, Atom)))
  448        ;  (!, fail)).
  449
  450%!  tipc_host_to_address(?Service, ?Address) is nondet.
  451%
  452%   locates a TIPC service by name. Service  is an atom or grounded term
  453%   representing the common name  of  the   service.  Address  is a TIPC
  454%   address structure. A server may advertise   its  services by name by
  455%   including  the  fact,    tipc:host_to_address(+Service,   +Address),
  456%   somewhere in its source. This predicate can  also be used to perform
  457%   reverse searches. That is it  will  also   resolve  an  Address to a
  458%   Service name. The search is zone-wide. Locating a service however,
  459%   does not imply that the service is actually reachable from any
  460%   particular node within the zone.
  461%
  462
  463tipc_host_to_address(Host, Address) :-
  464    broadcast_request(tipc_zone(tipc_host_to_address(Host, Address))).
  465
  466%!  tipc_initialize is semidet.
  467%   See tipc:tipc_initialize/0
  468%
  469:- multifile tipc:tipc_stack_initialize/0.  470
  471
  472%   tipc_stack_initialize() is det. causes any required runtime
  473%   initialization to occur. This called as a side-effect of
  474%   tipc_initialize/0, which is now required to be included in an
  475%   applications intialization directive.
  476%
  477tipc:tipc_stack_initialize :-
  478    start_tipc_listener_daemon