/*  Part of SWI-Prolog

    Author:        Jeffrey Rosenwald
    E-mail:        jeffrose@acm.org
    WWW:           http://www.swi-prolog.org
    Copyright (c)  2009-2013, Jeffrey Rosenwald
    All rights reserved.

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions
    are met:

    1. Redistributions of source code must retain the above copyright
       notice, this list of conditions and the following disclaimer.

    2. Redistributions in binary form must reproduce the above copyright
       notice, this list of conditions and the following disclaimer in
       the documentation and/or other materials provided with the
       distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
    POSSIBILITY OF SUCH DAMAGE.
*/

:- module(tipc_broadcast,
          [ tipc_host_to_address/2,             % ?Host, ?Address
            tipc_initialize/0
          ]).
:- use_module(library(tipc/tipc),[tipc_initialize/0]).

A TIPC Broadcast Bridge

SWI-Prolog's broadcast library provides a means of publish-and-subscribe communication between anonymous members of a community of interest. The members of the community are, however, necessarily limited to a single instance of Prolog. The TIPC broadcast library removes that restriction. With this library loaded, any member of a TIPC network that also has this library loaded may hear and respond to your broadcasts. Using TIPC Broadcast, it becomes a nearly trivial matter to build an instance of the kind of supercomputer that researchers within the High Performance Computing community refer to as "Beowulf Class Cluster Computers."

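Apart from the two exported predicates, tipc_host_to_address/2 and tipc_initialize/0, this module has no public predicates. When this module is initialized, it does three things:

- It starts a listener daemon thread that listens for broadcasts from other members, received as TIPC datagrams.
- It registers three listeners: tipc_node/1, tipc_cluster/1, and tipc_zone/1.
- It registers three listeners: tipc_node/2, tipc_cluster/2, and tipc_zone/2.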

A broadcast/1 or broadcast_request/1 that is not directed to one of the six TIPC listeners (tipc_node, tipc_cluster, and tipc_zone, each with one or two arguments) behaves as usual and is confined to the instance of Prolog that originated it. When it is so directed, the broadcast is sent to all participating systems, including the sender itself, by way of TIPC's multicast addressing facility.

A TIPC broadcast or broadcast request takes the typical form: broadcast(tipc_node(+Term, +Timeout)). The principal functors tipc_node, tipc_cluster, and tipc_zone specify the scope of the broadcast. The functor tipc_node specifies that the broadcast is to be confined to members of the present TIPC node. Likewise, tipc_cluster and tipc_zone specify that the traffic should be confined to members of the present TIPC cluster and zone, respectively. To prevent feedback loops, the scope qualifier is stripped from the message before transmission.

The timeout is optional. It specifies the amount of time to wait for replies to arrive in response to a broadcast_request. The default period is 0.250 seconds. The timeout is ignored for broadcasts.
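The relationship between the six scoped forms, their scope, and the default timeout can be sketched as a small table of facts. This is only an illustration that runs without TIPC; scoped_term/4 is a hypothetical helper and is not part of the library.

```prolog
% scoped_term(+Scoped, -Scope, -Term, -Timeout)
% Hypothetical helper (not part of tipc_broadcast): strips the scope
% qualifier from a scoped term and reports the scope and the timeout
% in seconds. The one-argument forms use the 0.250 second default.
scoped_term(tipc_node(Term),             node,    Term, 0.250).
scoped_term(tipc_cluster(Term),          cluster, Term, 0.250).
scoped_term(tipc_zone(Term),             zone,    Term, 0.250).
scoped_term(tipc_node(Term, Timeout),    node,    Term, Timeout).
scoped_term(tipc_cluster(Term, Timeout), cluster, Term, Timeout).
scoped_term(tipc_zone(Term, Timeout),    zone,    Term, Timeout).
```

For instance, scoped_term(tipc_zone(hello, 1.5), Scope, Term, Timeout) binds Scope to zone, Term to hello, and Timeout to 1.5. A term without one of the three scope functors does not match at all, which corresponds to an ordinary, process-local broadcast.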

An example of three separate processes cooperating on the same Node:

Process A:

   ?- listen(number(X), between(1, 5, X)).
   true.

   ?-

Process B:

   ?- listen(number(X), between(7, 9, X)).
   true.

   ?-

Process C:

   ?- findall(X, broadcast_request(tipc_node(number(X))), Xs).
   Xs = [1, 2, 3, 4, 5, 7, 8, 9].

   ?-
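For comparison, the same exchange can be confined to a single Prolog process by using library(broadcast) alone, without a scope qualifier. The sketch below assumes nothing about TIPC; the listener names demo_a and demo_b and the predicate local_numbers/1 are hypothetical.

```prolog
:- use_module(library(broadcast)).

% local_numbers(-Xs)
% Registers two local listeners, collects every answer to an unscoped
% broadcast_request/1, and removes the listeners again.
local_numbers(Xs) :-
    setup_call_cleanup(
        ( listen(demo_a, number(A), between(1, 5, A)),
          listen(demo_b, number(B), between(7, 9, B))
        ),
        findall(X, broadcast_request(number(X)), Xs),
        ( unlisten(demo_a),
          unlisten(demo_b)
        )).
```

Calling local_numbers(Xs) collects the eight numbers answered by the two listeners. The order in which listeners answer should not be relied upon, so sort the result if the order matters.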

It is also possible to carry on a private dialog with a single responder. To do this, you supply a compound of the form Term:PortId to a TIPC scoped broadcast/1 or broadcast_request/1, where PortId is the port-id of the intended listener. If you supply an unbound variable, PortId, to broadcast_request, it will be unified with the address of the listener that responds to Term. You may send a directed broadcast to a specific member by simply providing this address in a similarly structured compound to a TIPC scoped broadcast/1. The message is sent via unicast to that member only, by way of the member's broadcast listener. It is received by the listener just as any other broadcast would be. The listener does not know the difference.

Although this capability is needed under some circumstances, it has a tendency to compromise the resilience of the broadcast model. You should not rely on it too heavily, or fault tolerance will suffer.

For example, in order to discover who responded with a particular value:

Process A:

   ?- listen(number(X), between(1, 3, X)).
   true.

   ?-

Process B:

   ?- listen(number(X), between(7, 9, X)).
   true.

   ?-

Process C:

   ?- broadcast_request(tipc_node(number(X):From)).
   X = 7,
   From = port_id('<1.1.1:3971170279>') ;
   X = 8,
   From = port_id('<1.1.1:3971170279>') ;
   X = 9,
   From = port_id('<1.1.1:3971170279>') ;
   X = 1,
   From = port_id('<1.1.1:3971170280>') ;
   X = 2,
   From = port_id('<1.1.1:3971170280>') ;
   X = 3,
   From = port_id('<1.1.1:3971170280>') ;
   false.

   ?-
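Which of these behaviours applies depends on which parts of the message are ground. The following sketch mirrors the order of the ground/1 tests in the clauses of tipc_broadcast/3 in this file, but only classifies a message and sends nothing. The predicates classify/2 and kind/2 and the kind names are hypothetical.

```prolog
% classify(+Message, -Kind)
% Hypothetical classifier: reports how a message (with the scope
% qualifier already stripped) would be treated. Only the first
% matching kind is reported.
classify(Message, Kind) :-
    kind(Message, Kind0),
    !,
    Kind = Kind0.

kind(Term:To, directed_broadcast) :- ground(Term), ground(To).
kind(Term,    broadcast)          :- ground(Term).
kind(_:To,    directed_request)   :- ground(To).
kind(_:_,     request_reporting_port).
kind(_,       request).
```

Under these assumptions, classify(number(X):From, Kind) yields request_reporting_port, while classify(hello, Kind) yields broadcast. The port-id in a directed message would normally be one obtained from an earlier request.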

Caveats

While the implementation is mostly transparent, there are some important and subtle differences that must be taken into consideration:

Author
- Jeffrey Rosenwald (JeffRose@acm.org)
See also
- tipc.pl
Compatibility
- Linux only
License
- LGPL
:- autoload(tipc,
            [ tipc_get_name/2,
              tipc_send/4,
              tipc_socket/2,
              tipc_close_socket/1,
              tipc_setopt/2,
              tipc_bind/3,
              tipc_receive/4
            ]).
:- autoload(library(broadcast),
            [broadcast_request/1,broadcast/1,listen/3,unlisten/1]).
:- use_module(library(debug),[assertion/1]).
:- autoload(library(time),
            [call_with_time_limit/2,alarm/3,remove_alarm/1]).

:- require([ thread_self/1
           , forall/2
           , term_to_atom/2
           , thread_send_message/2
           , catch/3
           , setup_call_cleanup/3
           , thread_create/3
           ]).

:- meta_predicate safely(0), eventually_implies(0,0), ~>(0,0).

tipc_broadcast_service(node,            name_seq(20005, 0, 0)).
tipc_broadcast_service(cluster,         name_seq(20005, 1, 1)).
tipc_broadcast_service(zone,            name_seq(20005, 2, 2)).

%
%  Here's a TIPC bridge to Prolog's broadcast library
%
%  A sender may confine a broadcast to  a   subset  of a TIPC network by
%  specifying a scoping qualifier in   his/her  broadcast. The qualifier
%  has the effect of selecting the   appropriate  multi-cast address for
%  the transmission. Thus, the sender of   the  message has control over
%  the scope of his/her traffic on a per-message basis.
%
%  All in-scope listeners receive the   broadcast and simply rebroadcast
%  the message locally. All broadcast replies, if any, are sent directly
%  to the sender via the port-id that   was received with the broadcast.
%  No additional multiplexing is required.
%

safely(Predicate) :-
    catch(Predicate, Err, caught_safely(Err)).

caught_safely('$aborted').
caught_safely(unwind(abort)).
caught_safely(Error) :-
    print_message(error, Error),
    fail.
 ~>(:P, :Q) is semidet
 eventually_implies(:P, :Q) is semidet
Asserts temporal liveness (something good happens, eventually) and safety (nothing bad ever happens) properties. Analogous to the "leads-to" operator of Owicki and Lamport, 1982. Provides a sort of lazy implication described informally as:

Described practically:

P ~> Q declares that if P is true, then Q must be true, now or at some point in the future.
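A minimal, self-contained sketch of this behaviour follows. It restates the operator locally, using the same setup_call_cleanup/3 pattern, so that it can be run without TIPC. The names event/1, demo/0, and events/1 are hypothetical and serve only to record the order in which things happen.

```prolog
:- use_module(library(debug), [assertion/1]).

:- op(950, xfy, ~>).
:- meta_predicate ~>(0,0).

% Local restatement of the operator for illustration only.
~>(P, Q) :-
    setup_call_cleanup(P, ( Solution = yes ; Solution = no ), assertion(Q)),
    Solution = yes.

:- dynamic event/1.

% demo/0 acquires a pretend resource, does some work, and then fails.
% Backtracking exhausts the choice point left by ~>, so the cleanup
% goal runs before demo/0 itself fails.
demo :-
    assertz(event(acquired)) ~> assertz(event(released)),
    assertz(event(working)),
    fail.

% events(-Es): run demo/0 and report the recorded events in order.
events(Es) :-
    retractall(event(_)),
    \+ demo,
    findall(E, event(E), Es).
```

Here events(Es) yields Es = [acquired, working, released]: the release happens only after the clause that made the declaration is finished, whether it ends by failure, exception, or by its choice point being pruned after success.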

eventually_implies(P, Q) :-
    setup_call_cleanup(P, ( Solution = yes ; Solution = no ), assertion(Q)),
    Solution = yes.

:- op(950, xfy, ~>).

~>(P, Q) :-
    eventually_implies(P, Q).

ld_dispatch(S, '$tipc_request'(wru(Name)), From) :-
    !, tipc_get_name(S, Name),
    term_to_atom(wru(Name), Atom),
    tipc_send(S, Atom, From, []).

ld_dispatch(S, '$tipc_request'(Term), From) :-
    !, forall(broadcast_request(Term),
          (   term_to_atom(Term, Atom),
              tipc_send(S, Atom, From, []))).

ld_dispatch(_S, Term, _From) :-
    safely(broadcast(Term)).

tipc_listener_daemon(Parent) :-
    tipc_socket(S, rdm) ~> tipc_close_socket(S),

%       tipc_setopt(S, importance(medium)),
    tipc_setopt(S, dest_droppable(true)),  % discard if not deliverable

    forall(tipc_broadcast_service(Scope, Address),
         tipc_bind(S, Address, scope(Scope))),

    listen(tipc_broadcast, Head, broadcast_listener(Head))
         ~> unlisten(tipc_broadcast),

    thread_send_message(Parent, tipc_listener_daemon_ready),

    repeat,
    safely(dispatch_traffic(S)).

dispatch_traffic(S) :-
    tipc_receive(S, Data, From, [as(atom)]),
    term_to_atom(Term, Data),
    ld_dispatch(S, Term, From),
    !,
    dispatch_traffic(S).

start_tipc_listener_daemon :-
    catch(thread_property(tipc_listener_daemon, status(running)),_, fail),
    !.

start_tipc_listener_daemon :-
    thread_self(Self),
    thread_create(tipc_listener_daemon(Self), _,
           [alias(tipc_listener_daemon), detached(true)]),
    call_with_time_limit(6.0,
                         thread_get_message(tipc_listener_daemon_ready)).

:- multifile tipc:host_to_address/2.
%
broadcast_listener(tipc_host_to_address(Host, Addr)) :-
    tipc:host_to_address(Host, Addr).

broadcast_listener(tipc_broadcast_service(Class, Addr)) :-
    tipc_broadcast_service(Class, Addr).

broadcast_listener(tipc_node(X)) :-
    tipc_broadcast(X, node, 0.250).

broadcast_listener(tipc_cluster(X)) :-
    tipc_broadcast(X, cluster, 0.250).

broadcast_listener(tipc_zone(X)) :-
    tipc_broadcast(X, zone, 0.250).

broadcast_listener(tipc_node(X, Timeout)) :-
    tipc_broadcast(X, node, Timeout).

broadcast_listener(tipc_cluster(X, Timeout)) :-
    tipc_broadcast(X, cluster, Timeout).

broadcast_listener(tipc_zone(X, Timeout)) :-
    tipc_broadcast(X, zone, Timeout).

%
%

tipc_basic_broadcast(S, Term, Address) :-
    tipc_socket(S, rdm) ~> tipc_close_socket(S),
%   tipc_setopt(S, importance(medium)),
    term_to_atom(Term, Atom),
    safely(tipc_send(S, Atom, Address, [])).

% directed broadcast to a single listener
tipc_broadcast(Term:To, _Scope, _Timeout) :-
    ground(Term), ground(To),
    !,
    tipc_basic_broadcast(_S, Term, To),
    !.

% broadcast to all listeners
tipc_broadcast(Term, Scope, _Timeout) :-
    ground(Term),
    !,
    tipc_broadcast_service(Scope, Address),
    tipc_basic_broadcast(_S, Term, Address),
    !.

% directed broadcast_request to a single listener
tipc_broadcast(Term:Address, _Scope, Timeout) :-
    ground(Address),
    !,
    tipc_basic_broadcast(S, '$tipc_request'(Term), Address),
    tipc_br_collect_replies(S, Timeout, Term:Address).

% broadcast_request to all listeners returning responder port-id
tipc_broadcast(Term:From, Scope, Timeout) :-
    !, tipc_broadcast_service(Scope, Address),
    tipc_basic_broadcast(S, '$tipc_request'(Term), Address),
    tipc_br_collect_replies(S, Timeout, Term:From).

% broadcast_request to all listeners ignoring responder port-id
tipc_broadcast(Term, Scope, Timeout) :-
    tipc_broadcast(Term:_, Scope, Timeout).

tipc_br_send_timeout(Port) :-
    tipc_socket(S, rdm) ~> tipc_close_socket(S),

    tipc_setopt(S, importance(critical)),
    tipc_send(S, '$tipc_br_timeout', Port, []),
    !.

tipc_br_collect_replies(S, Timeout, Term:From) :-
    tipc_get_name(S, Port),
    alarm(Timeout, tipc_br_send_timeout(Port), Id)
         ~> remove_alarm(Id),
    tipc_setopt(S, dispatch(false)),
    repeat,
    tipc_receive(S, Atom, From1, [as(atom)]),
    (   (Atom \== '$tipc_br_timeout')
        -> (From1 = From, safely(term_to_atom(Term, Atom)))
        ;  (!, fail)).
 tipc_host_to_address(?Service, ?Address) is nondet
Locates a TIPC service by name. Service is an atom or ground term representing the common name of the service. Address is a TIPC address structure. A server may advertise its services by name by including the fact tipc:host_to_address(+Service, +Address) somewhere in its source. This predicate can also be used to perform reverse searches. That is, it will also resolve an Address to a Service name. The search is zone-wide. Locating a service, however, does not imply that the service is actually reachable from any particular node within the zone.
tipc_host_to_address(Host, Address) :-
    broadcast_request(tipc_zone(tipc_host_to_address(Host, Address))).
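As a sketch of the advertising side, a server could include a fact such as the one below. The service name my_service and the service type 20100 are made up for illustration, and the name_seq/3 address shape is assumed from the addresses used elsewhere in this file. The zone-wide lookup itself needs a running TIPC network, so the example only exercises the local fact that the broadcast listener consults.

```prolog
:- multifile tipc:host_to_address/2.

% Hypothetical advertisement: my_service and the type number 20100
% are assumptions chosen for this example, not values defined by
% the library.
tipc:host_to_address(my_service, name_seq(20100, 0, 0)).
```

Locally, tipc:host_to_address(my_service, Address) then yields the address, and tipc:host_to_address(Service, name_seq(20100, 0, 0)) performs the reverse lookup. With TIPC available, other members of the zone would reach the same fact through tipc_host_to_address/2.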
 tipc_initialize is semidet
See tipc:tipc_initialize/0
:- multifile tipc:tipc_stack_initialize/0.


%   tipc_stack_initialize() is det. Causes any required runtime
%   initialization to occur. This is called as a side-effect of
%   tipc_initialize/0, which is now required to be included in an
%   application's initialization directive.
%
tipc:tipc_stack_initialize :-
    start_tipc_listener_daemon.