View source with formatted comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jeffrey Rosenwald
    4    E-mail:        jeffrose@acm.org
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2009-2013, Jeffrey Rosenwald
    7    All rights reserved.
    8
    9    Redistribution and use in source and binary forms, with or without
   10    modification, are permitted provided that the following conditions
   11    are met:
   12
   13    1. Redistributions of source code must retain the above copyright
   14       notice, this list of conditions and the following disclaimer.
   15
   16    2. Redistributions in binary form must reproduce the above copyright
   17       notice, this list of conditions and the following disclaimer in
   18       the documentation and/or other materials provided with the
   19       distribution.
   20
   21    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   22    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   23    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   24    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   25    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   26    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   27    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   28    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   29    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   30    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   31    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   32    POSSIBILITY OF SUCH DAMAGE.
   33*/
   34
   35:- module(tipc_broadcast,
   36          [ tipc_host_to_address/2,             % ?Host, ?Address
   37            tipc_initialize/0
   38          ]).   39:- use_module(library(tipc/tipc),[tipc_initialize/0]).   40
   41/** <module> A TIPC Broadcast Bridge
   42
   43SWI-Prolog's broadcast library provides a  means   that  may  be used to
   44facilitate publish and subscribe communication regimes between anonymous
   45members of a community of interest.  The   members  of the community are
   46however, necessarily limited to a single   instance  of Prolog. The TIPC
   47broadcast library removes that restriction.   With  this library loaded,
   48any member of a TIPC network that also  has this library loaded may hear
   49and respond to your broadcasts. Using TIPC Broadcast, it becomes a
   50nearly trivial matter to build an instance of supercomputer that
   51researchers within the High Performance Computer community refer to as
   52"Beowulf Class Cluster Computers."
   53
   54This module has no public predicates. When this module is initialized,
   55it does three things:
   56
   57    * It starts  a  listener  daemon   thread  that  listens for
   58    broadcasts from others, received as TIPC datagrams, and
   59
   60    * It registers three listeners: tipc_node/1, tipc_cluster/1, and
   61    tipc_zone/1, and
   62
   63    * It registers three listeners: tipc_node/2, tipc_cluster/2, and
   64    tipc_zone/2.
   65
   66A broadcast/1 or broadcast_request/1 that is not  directed to one of the
   67six listeners above, behaves as usual and is confined to the instance of
   68Prolog that originated it. But when so   directed, the broadcast will be
   69sent to all participating systems, including   itself,  by way of TIPC's
   70multicast addressing facility. A TIPC broadcast or broadcast
   71request takes the typical form: =|broadcast(tipc_node(+Term,
   72+Timeout))|=. The principal functors =tipc_node=, =tipc_cluster=, and
   73=tipc_zone=, specify the scope of the broadcast. The functor
   74=tipc_node=, specifies that the broadcast is to be confined to members
   75of a present TIPC node. Likewise, =tipc_cluster= and =tipc_zone=,
   76specify that the traffic should be confined to members of a present TIPC
   77cluster and zone, respectively. To prevent the potential for feedback
   78loops, the scope qualifier is stripped from the message before
   79transmission. The timeout is optional. It specifies the amount to time
   80to wait for replies to arrive in response to a broadcast_request. The
   81default period is 0.250 seconds. The timeout is ignored for broadcasts.
   82
   83An example of three separate processes cooperating on the same Node:
   84
   85==
   86Process A:
   87
   88   ?- listen(number(X), between(1, 5, X)).
   89   true.
   90
   91   ?-
   92
   93Process B:
   94
   95   ?- listen(number(X), between(7, 9, X)).
   96   true.
   97
   98   ?-
   99
  100Process C:
  101
  102   ?- findall(X, broadcast_request(tipc_node(number(X))), Xs).
  103   Xs = [1, 2, 3, 4, 5, 7, 8, 9].
  104
  105   ?-
  106==
  107
  108It is also  possible  to  carry  on   a  private  dialog  with  a single
  109responder. To do this, you supply a   compound of the form, Term:PortId,
  110to a TIPC scoped broadcast/1 or broadcast_request/1, where PortId is the
  111port-id of the intended listener.  If   you  supply an unbound variable,
  112PortId, to broadcast_request, it will be unified with the address of the
  113listener that responds to Term. You may   send a directed broadcast to a
  114specific  member  by  simply  providing  this  address  in  a  similarly
  115structured compound to a TIPC scoped   broadcast/1.  The message is sent
  116via unicast to that  member  only  by   way  of  the  member's broadcast
  117listener. It is received by the  listener   just  as any other broadcast
  118would be. The listener does not know the difference.
  119
  120Although this capability is needed under   some  circumstances, it has a
  121tendency to compromise the resilience of the broadcast model. You should
  122not rely on it too heavily, or fault tolerance will suffer.
  123
  124For example, in order to discover who responded with a particular value:
  125
  126==
  127Process A:
  128
  129   ?- listen(number(X), between(1, 3, X)).
  130   true.
  131
  132   ?-
  133
  134Process B:
  135
  136   ?- listen(number(X), between(7, 9, X)).
  137   true.
  138
  139   ?-
  140
  141Process C:
  142
  143   ?- broadcast_request(tipc_node(number(X):From)).
  144   X = 7,
  145   From = port_id('<1.1.1:3971170279>') ;
  146   X = 8,
  147   From = port_id('<1.1.1:3971170279>') ;
  148   X = 9,
  149   From = port_id('<1.1.1:3971170279>') ;
  150   X = 1,
  151   From = port_id('<1.1.1:3971170280>') ;
  152   X = 2,
  153   From = port_id('<1.1.1:3971170280>') ;
  154   X = 3,
  155   From = port_id('<1.1.1:3971170280>') ;
  156   false.
  157
  158?-
  159
  160==
  161
  162## Caveats {#tipc-caveats}
  163
  164While the implementation is mostly transparent, there are some important
  165and subtle differences that must be taken into consideration:
  166
  167    * TIPC broadcast now requires an initialization step in order to
  168    launch the broadcast listener daemon. See tipc_initialize/0.
  169
  170    * Prolog's broadcast_request/1 is nondet. It sends the request,
  171    then evaluates the replies synchronously, backtracking as needed
  172    until a satisfactory reply is received. The remaining potential
  173    replies are not evaluated. This is not so when TIPC is involved.
  174
  175    * A TIPC broadcast/1 is completely asynchronous.
  176
  177    * A  TIPC broadcast_request/1 is partially synchronous. A
  178    broadcast_request/1 is sent, then the sender balks for a period of
  179    time (default: 250 ms) while the replies are collected. Any reply
  180    that is received after this period is silently discarded. An
  181    optional second argument is provided so that a sender may specify
  182    more (or less) time for replies.
  183
  184    * Replies are _|no longer|_ collected using findall/3. Replies are
  185    presented to the user as a choice point on arrival, until the
  186    broadcast request timer finally expires. This change allows
  187    traffic to propagate through the system faster and provides the
  188    requestor with the opportunity to terminate a broadcast request
  189    early if desired, by simply cutting choice points.
  190
  191    * Please beware that broadcast request transactions will now remain
  192    active and resources consumed until broadcast_request finally fails
  193    on backtracking, an uncaught exception occurs, or until choice
  194    points are cut. Failure to properly manage this will likely result
  195    in chronic exhaustion of TIPC sockets.
  196
  197    * If a listener is connected to a generator that always succeeds
  198    (e.g. a random number generator), then the broadcast request will
  199    never terminate and trouble is bound to ensue.
  200
  201    * broadcast_request/1 with TIPC scope is _not_ reentrant (at
  202    least, not now anyway). If a listener performs a broadcast_request/1
  203    with TIPC scope recursively, then disaster looms certain. This
  204    caveat does not apply to a TIPC scoped broadcast/1, which can safely
  205    be performed from a listener context.
  206
  207    * TIPC's capacity is not infinite. While TIPC can tolerate
  208    substantial bursts of activity, it is designed for short bursts of
  209    small messages. It can tolerate several thousand replies in response
  210    to a broadcast_request/1 without trouble, but it will begin to
  211    encounter congestion beyond that. And in congested conditions,
  212    things will start to become unreliable as TIPC begins prioritizing
  213    and/or discarding traffic.
  214
  215    * A TIPC broadcast_request/1 term that is grounded is considered to
  216    be a broadcast only. No replies are collected unless the there is at
  217    least one unbound variable to unify.
  218
  219    * A TIPC broadcast/1 always succeeds, even if there are no
  220    listeners.
  221
  222    * A TIPC broadcast_request/1 that receives no replies will fail.
  223
  224    * Replies may be coming from many different places in the network
  225    (or none at all). No ordering of replies is implied.
  226
  227    * Prolog terms are sent to others after first converting them to
  228    atoms using term_to_atom/2. Passing real numbers this way may
  229    result in a substantial truncation of precision. See prolog flag
  230    option, 'float_format', of current_prolog_flag/2.
  231
  232@author    Jeffrey Rosenwald (JeffRose@acm.org)
  233@license   LGPL
  234@see       tipc.pl
  235@compat    Linux only
  236*/
  237
  238:- autoload(tipc,
  239	    [ tipc_get_name/2,
  240	      tipc_send/4,
  241	      tipc_socket/2,
  242	      tipc_close_socket/1,
  243	      tipc_setopt/2,
  244	      tipc_bind/3,
  245	      tipc_receive/4
  246	    ]).  247:- autoload(library(broadcast),
  248	    [broadcast_request/1,broadcast/1,listen/3,unlisten/1]).  249:- use_module(library(debug),[assertion/1]).  250:- autoload(library(time),
  251	    [call_with_time_limit/2,alarm/3,remove_alarm/1]).  252
  253:- require([ thread_self/1
  254           , forall/2
  255           , term_to_atom/2
  256           , thread_send_message/2
  257           , catch/3
  258           , setup_call_cleanup/3
  259           , thread_create/3
  260           ]).  261
  262:- meta_predicate safely(0), eventually_implies(0,0), ~>(0,0).  263
  264tipc_broadcast_service(node,            name_seq(20005, 0, 0)).
  265tipc_broadcast_service(cluster,         name_seq(20005, 1, 1)).
  266tipc_broadcast_service(zone,            name_seq(20005, 2, 2)).
  267
  268%
  269%  Here's a TIPC bridge to Prolog's broadcast library
  270%
  271%  A sender may confine a broadcast to  a   subset  of a TIPC network by
  272%  specifying a scoping qualifier in   his/her  broadcast. The qualifier
  273%  has the effect of selecting the   appropriate  multi-cast address for
  274%  the transmission. Thus, the sender of   the  message has control over
  275%  the scope of his/her traffic on a per-message basis.
  276%
  277%  All in-scope listeners receive the   broadcast and simply rebroadcast
  278%  the message locally. All broadcast replies, if any, are sent directly
  279%  to the sender via the port-id that   was received with the broadcast.
  280%  No additional multiplexing is required.
  281%
  282
  283safely(Predicate) :-
  284    catch(Predicate, Err,
  285          (Err == '$aborted' -> (!, fail);
  286          print_message(error, Err), fail)).
  287
  288%!  ~>(:P, :Q) is semidet.
  289%!  eventually_implies(:P, :Q) is semidet.
  290%    asserts temporal Liveness (something good happens, eventually) and
  291%    Safety (nothing bad ever happens) properties. Analogous to the
  292%    "leads-to" operator of Owicki and Lamport, 1982. Provides a sort of
  293%    lazy implication described informally as:
  294%
  295%    * Liveness: For all possible outcomes, P -> Q, eventually.
  296%    * Safety: For all possible outcomes, (\+P ; Q), is invariant.
  297%
  298%  Described practically:
  299%
  300%    P ~> Q, declares that if P is true, then Q must be true, now or at
  301%    some point in the future.
  302%
  303
  304eventually_implies(P, Q) :-
  305    setup_call_cleanup(P, ( Solution = yes ; Solution = no ), assertion(Q)),
  306    Solution = yes.
  307
  308:- op(950, xfy, ~>).  309
  310~>(P, Q) :-
  311    eventually_implies(P, Q).
  312
  313ld_dispatch(S, '$tipc_request'(wru(Name)), From) :-
  314    !, tipc_get_name(S, Name),
  315    term_to_atom(wru(Name), Atom),
  316    tipc_send(S, Atom, From, []).
  317
  318ld_dispatch(S, '$tipc_request'(Term), From) :-
  319    !, forall(broadcast_request(Term),
  320          (   term_to_atom(Term, Atom),
  321              tipc_send(S, Atom, From, []))).
  322
  323ld_dispatch(_S, Term, _From) :-
  324    safely(broadcast(Term)).
  325
  326tipc_listener_daemon(Parent) :-
  327    tipc_socket(S, rdm) ~> tipc_close_socket(S),
  328
  329%       tipc_setopt(S, importance(medium)),
  330    tipc_setopt(S, dest_droppable(true)),  % discard if not deliverable
  331
  332    forall(tipc_broadcast_service(Scope, Address),
  333         tipc_bind(S, Address, scope(Scope))),
  334
  335    listen(tipc_broadcast, Head, broadcast_listener(Head))
  336         ~> unlisten(tipc_broadcast),
  337
  338    thread_send_message(Parent, tipc_listener_daemon_ready),
  339
  340    repeat,
  341    safely(dispatch_traffic(S)).
  342
  343dispatch_traffic(S) :-
  344    tipc_receive(S, Data, From, [as(atom)]),
  345    term_to_atom(Term, Data),
  346    ld_dispatch(S, Term, From),
  347    !,
  348    dispatch_traffic(S).
  349
  350start_tipc_listener_daemon :-
  351    catch(thread_property(tipc_listener_daemon, status(running)),_, fail),
  352    !.
  353
  354start_tipc_listener_daemon :-
  355    thread_self(Self),
  356    thread_create(tipc_listener_daemon(Self), _,
  357           [alias(tipc_listener_daemon), detached(true)]),
  358    call_with_time_limit(6.0,
  359                         thread_get_message(tipc_listener_daemon_ready)).
  360
  361:- multifile tipc:host_to_address/2.  362%
  363broadcast_listener(tipc_host_to_address(Host, Addr)) :-
  364    tipc:host_to_address(Host, Addr).
  365
  366broadcast_listener(tipc_broadcast_service(Class, Addr)) :-
  367    tipc_broadcast_service(Class, Addr).
  368
  369broadcast_listener(tipc_node(X)) :-
  370    tipc_broadcast(X, node, 0.250).
  371
  372broadcast_listener(tipc_cluster(X)) :-
  373    tipc_broadcast(X, cluster, 0.250).
  374
  375broadcast_listener(tipc_zone(X)) :-
  376    tipc_broadcast(X, zone, 0.250).
  377
  378broadcast_listener(tipc_node(X, Timeout)) :-
  379    tipc_broadcast(X, node, Timeout).
  380
  381broadcast_listener(tipc_cluster(X, Timeout)) :-
  382    tipc_broadcast(X, cluster, Timeout).
  383
  384broadcast_listener(tipc_zone(X, Timeout)) :-
  385    tipc_broadcast(X, zone, Timeout).
  386
  387%
  388%
  389
  390tipc_basic_broadcast(S, Term, Address) :-
  391    tipc_socket(S, rdm) ~> tipc_close_socket(S),
  392%   tipc_setopt(S, importance(medium)),
  393    term_to_atom(Term, Atom),
  394    safely(tipc_send(S, Atom, Address, [])).
  395
  396% directed broadcast to a single listener
  397tipc_broadcast(Term:To, _Scope, _Timeout) :-
  398    ground(Term), ground(To),
  399    !,
  400    tipc_basic_broadcast(_S, Term, To),
  401    !.
  402
  403% broadcast to all listeners
  404tipc_broadcast(Term, Scope, _Timeout) :-
  405    ground(Term),
  406    !,
  407    tipc_broadcast_service(Scope, Address),
  408    tipc_basic_broadcast(_S, Term, Address),
  409    !.
  410
  411% directed broadcast_request to a single listener
  412tipc_broadcast(Term:Address, _Scope, Timeout) :-
  413    ground(Address),
  414    !,
  415    tipc_basic_broadcast(S, '$tipc_request'(Term), Address),
  416    tipc_br_collect_replies(S, Timeout, Term:Address).
  417
  418% broadcast_request to all listeners returning responder port-id
  419tipc_broadcast(Term:From, Scope, Timeout) :-
  420    !, tipc_broadcast_service(Scope, Address),
  421    tipc_basic_broadcast(S, '$tipc_request'(Term), Address),
  422    tipc_br_collect_replies(S, Timeout, Term:From).
  423
  424% broadcast_request to all listeners ignoring responder port-id
  425tipc_broadcast(Term, Scope, Timeout) :-
  426    tipc_broadcast(Term:_, Scope, Timeout).
  427
  428tipc_br_send_timeout(Port) :-
  429    tipc_socket(S, rdm) ~> tipc_close_socket(S),
  430
  431    tipc_setopt(S, importance(critical)),
  432    tipc_send(S, '$tipc_br_timeout', Port, []),
  433    !.
  434
  435tipc_br_collect_replies(S, Timeout, Term:From) :-
  436    tipc_get_name(S, Port),
  437    alarm(Timeout, tipc_br_send_timeout(Port), Id)
  438         ~> remove_alarm(Id),
  439    tipc_setopt(S, dispatch(false)),
  440    repeat,
  441    tipc_receive(S, Atom, From1, [as(atom)]),
  442    (   (Atom \== '$tipc_br_timeout')
  443        -> (From1 = From, safely(term_to_atom(Term, Atom)))
  444        ;  (!, fail)).
  445
  446%!  tipc_host_to_address(?Service, ?Address) is nondet.
  447%
  448%   locates a TIPC service by name. Service  is an atom or grounded term
  449%   representing the common name  of  the   service.  Address  is a TIPC
  450%   address structure. A server may advertise   its  services by name by
  451%   including  the  fact,    tipc:host_to_address(+Service,   +Address),
  452%   somewhere in its source. This predicate can  also be used to perform
  453%   reverse searches. That is it  will  also   resolve  an  Address to a
  454%   Service name. The search is zone-wide. Locating a service however,
  455%   does not imply that the service is actually reachable from any
  456%   particular node within the zone.
  457%
  458
  459tipc_host_to_address(Host, Address) :-
  460    broadcast_request(tipc_zone(tipc_host_to_address(Host, Address))).
  461
  462%!  tipc_initialize is semidet.
  463%   See tipc:tipc_initialize/0
  464%
  465:- multifile tipc:tipc_stack_initialize/0.  466
  467
  468%   tipc_stack_initialize() is det. causes any required runtime
  469%   initialization to occur. This called as a side-effect of
  470%   tipc_initialize/0, which is now required to be included in an
  471%   applications intialization directive.
  472%
  473tipc:tipc_stack_initialize :-
  474    start_tipc_listener_daemon