View source with raw comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jeffrey Rosenwald
    4    E-mail:        jeffrose@acm.org
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2009-2013, Jeffrey Rosenwald
    7    All rights reserved.
    8
    9    Redistribution and use in source and binary forms, with or without
   10    modification, are permitted provided that the following conditions
   11    are met:
   12
   13    1. Redistributions of source code must retain the above copyright
   14       notice, this list of conditions and the following disclaimer.
   15
   16    2. Redistributions in binary form must reproduce the above copyright
   17       notice, this list of conditions and the following disclaimer in
   18       the documentation and/or other materials provided with the
   19       distribution.
   20
   21    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   22    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   23    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   24    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   25    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   26    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   27    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   28    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   29    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   30    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   31    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   32    POSSIBILITY OF SUCH DAMAGE.
   33*/
   34
   35:- module(tipc_broadcast,
   36          [ tipc_host_to_address/2,             % ?Host, ?Address
   37            tipc_initialize/0
   38          ]).   39:- use_module(library(tipc/tipc),[tipc_initialize/0]).

A TIPC Broadcast Bridge

SWI-Prolog's broadcast library provides a means that may be used to facilitate publish and subscribe communication regimes between anonymous members of a community of interest. The members of the community are however, necessarily limited to a single instance of Prolog. The TIPC broadcast library removes that restriction. With this library loaded, any member of a TIPC network that also has this library loaded may hear and respond to your broadcasts. Using TIPC Broadcast, it becomes a nearly trivial matter to build an instance of supercomputer that researchers within the High Performance Computer community refer to as "Beowulf Class Cluster Computers."

This module has no public predicates. When this module is initialized, it does three things:

A broadcast/1 or broadcast_request/1 that is not directed to one of the six listeners above, behaves as usual and is confined to the instance of Prolog that originated it. But when so directed, the broadcast will be sent to all participating systems, including itself, by way of TIPC's multicast addressing facility. A TIPC broadcast or broadcast request takes the typical form: broadcast(tipc_node(+Term, +Timeout)). The principal functors tipc_node, tipc_cluster, and tipc_zone, specify the scope of the broadcast. The functor tipc_node, specifies that the broadcast is to be confined to members of a present TIPC node. Likewise, tipc_cluster and tipc_zone, specify that the traffic should be confined to members of a present TIPC cluster and zone, respectively. To prevent the potential for feedback loops, the scope qualifier is stripped from the message before transmission. The timeout is optional. It specifies the amount to time to wait for replies to arrive in response to a broadcast_request. The default period is 0.250 seconds. The timeout is ignored for broadcasts.

An example of three separate processes cooperating on the same Node:

Process A:

   ?- listen(number(X), between(1, 5, X)).
   true.

   ?-

Process B:

   ?- listen(number(X), between(7, 9, X)).
   true.

   ?-

Process C:

   ?- findall(X, broadcast_request(tipc_node(number(X))), Xs).
   Xs = [1, 2, 3, 4, 5, 7, 8, 9].

   ?-

It is also possible to carry on a private dialog with a single responder. To do this, you supply a compound of the form, Term:PortId, to a TIPC scoped broadcast/1 or broadcast_request/1, where PortId is the port-id of the intended listener. If you supply an unbound variable, PortId, to broadcast_request, it will be unified with the address of the listener that responds to Term. You may send a directed broadcast to a specific member by simply providing this address in a similarly structured compound to a TIPC scoped broadcast/1. The message is sent via unicast to that member only by way of the member's broadcast listener. It is received by the listener just as any other broadcast would be. The listener does not know the difference.

Although this capability is needed under some circumstances, it has a tendency to compromise the resilience of the broadcast model. You should not rely on it too heavily, or fault tolerance will suffer.

For example, in order to discover who responded with a particular value:

Process A:

   ?- listen(number(X), between(1, 3, X)).
   true.

   ?-

Process B:

   ?- listen(number(X), between(7, 9, X)).
   true.

   ?-

Process C:

   ?- broadcast_request(tipc_node(number(X):From)).
   X = 7,
   From = port_id('<1.1.1:3971170279>') ;
   X = 8,
   From = port_id('<1.1.1:3971170279>') ;
   X = 9,
   From = port_id('<1.1.1:3971170279>') ;
   X = 1,
   From = port_id('<1.1.1:3971170280>') ;
   X = 2,
   From = port_id('<1.1.1:3971170280>') ;
   X = 3,
   From = port_id('<1.1.1:3971170280>') ;
   false.

?-

Caveats

While the implementation is mostly transparent, there are some important and subtle differences that must be taken into consideration:

author
- Jeffrey Rosenwald (JeffRose@acm.org)
See also
- tipc.pl
Compatibility
- Linux only */
license
- LGPL
  238:- autoload(tipc,
  239	    [ tipc_get_name/2,
  240	      tipc_send/4,
  241	      tipc_socket/2,
  242	      tipc_close_socket/1,
  243	      tipc_setopt/2,
  244	      tipc_bind/3,
  245	      tipc_receive/4
  246	    ]).  247:- autoload(library(broadcast),
  248	    [broadcast_request/1,broadcast/1,listen/3,unlisten/1]).  249:- use_module(library(debug),[assertion/1]).  250:- autoload(library(time),
  251	    [call_with_time_limit/2,alarm/3,remove_alarm/1]).  252
  253:- require([ thread_self/1
  254           , forall/2
  255           , term_to_atom/2
  256           , thread_send_message/2
  257           , catch/3
  258           , setup_call_cleanup/3
  259           , thread_create/3
  260           ]).  261
  262:- meta_predicate safely(0), eventually_implies(0,0), ~>(0,0).  263
  264tipc_broadcast_service(node,            name_seq(20005, 0, 0)).
  265tipc_broadcast_service(cluster,         name_seq(20005, 1, 1)).
  266tipc_broadcast_service(zone,            name_seq(20005, 2, 2)).
  267
  268%
  269%  Here's a TIPC bridge to Prolog's broadcast library
  270%
  271%  A sender may confine a broadcast to  a   subset  of a TIPC network by
  272%  specifying a scoping qualifier in   his/her  broadcast. The qualifier
  273%  has the effect of selecting the   appropriate  multi-cast address for
  274%  the transmission. Thus, the sender of   the  message has control over
  275%  the scope of his/her traffic on a per-message basis.
  276%
  277%  All in-scope listeners receive the   broadcast and simply rebroadcast
  278%  the message locally. All broadcast replies, if any, are sent directly
  279%  to the sender via the port-id that   was received with the broadcast.
  280%  No additional multiplexing is required.
  281%
  282
  283safely(Predicate) :-
  284    catch(Predicate, Err,
  285          (Err == '$aborted' -> (!, fail);
  286          print_message(error, Err), fail)).
 ~>(:P, :Q) is semidet
 eventually_implies(:P, :Q) is semidet
asserts temporal Liveness (something good happens, eventually) and Safety (nothing bad ever happens) properties. Analogous to the "leads-to" operator of Owicki and Lamport, 1982. Provides a sort of lazy implication described informally as:

Described practically:

P ~> Q, declares that if P is true, then Q must be true, now or at some point in the future.

  304eventually_implies(P, Q) :-
  305    setup_call_cleanup(P, ( Solution = yes ; Solution = no ), assertion(Q)),
  306    Solution = yes.
  307
  308:- op(950, xfy, ~>).  309
  310~>(P, Q) :-
  311    eventually_implies(P, Q).
  312
  313ld_dispatch(S, '$tipc_request'(wru(Name)), From) :-
  314    !, tipc_get_name(S, Name),
  315    term_to_atom(wru(Name), Atom),
  316    tipc_send(S, Atom, From, []).
  317
  318ld_dispatch(S, '$tipc_request'(Term), From) :-
  319    !, forall(broadcast_request(Term),
  320          (   term_to_atom(Term, Atom),
  321              tipc_send(S, Atom, From, []))).
  322
  323ld_dispatch(_S, Term, _From) :-
  324    safely(broadcast(Term)).
  325
  326tipc_listener_daemon(Parent) :-
  327    tipc_socket(S, rdm) ~> tipc_close_socket(S),
  328
  329%       tipc_setopt(S, importance(medium)),
  330    tipc_setopt(S, dest_droppable(true)),  % discard if not deliverable
  331
  332    forall(tipc_broadcast_service(Scope, Address),
  333         tipc_bind(S, Address, scope(Scope))),
  334
  335    listen(tipc_broadcast, Head, broadcast_listener(Head))
  336         ~> unlisten(tipc_broadcast),
  337
  338    thread_send_message(Parent, tipc_listener_daemon_ready),
  339
  340    repeat,
  341    safely(dispatch_traffic(S)).
  342
  343dispatch_traffic(S) :-
  344    tipc_receive(S, Data, From, [as(atom)]),
  345    term_to_atom(Term, Data),
  346    ld_dispatch(S, Term, From),
  347    !,
  348    dispatch_traffic(S).
  349
  350start_tipc_listener_daemon :-
  351    catch(thread_property(tipc_listener_daemon, status(running)),_, fail),
  352    !.
  353
  354start_tipc_listener_daemon :-
  355    thread_self(Self),
  356    thread_create(tipc_listener_daemon(Self), _,
  357           [alias(tipc_listener_daemon), detached(true)]),
  358    call_with_time_limit(6.0,
  359                         thread_get_message(tipc_listener_daemon_ready)).
  360
  361:- multifile tipc:host_to_address/2.  362%
  363broadcast_listener(tipc_host_to_address(Host, Addr)) :-
  364    tipc:host_to_address(Host, Addr).
  365
  366broadcast_listener(tipc_broadcast_service(Class, Addr)) :-
  367    tipc_broadcast_service(Class, Addr).
  368
  369broadcast_listener(tipc_node(X)) :-
  370    tipc_broadcast(X, node, 0.250).
  371
  372broadcast_listener(tipc_cluster(X)) :-
  373    tipc_broadcast(X, cluster, 0.250).
  374
  375broadcast_listener(tipc_zone(X)) :-
  376    tipc_broadcast(X, zone, 0.250).
  377
  378broadcast_listener(tipc_node(X, Timeout)) :-
  379    tipc_broadcast(X, node, Timeout).
  380
  381broadcast_listener(tipc_cluster(X, Timeout)) :-
  382    tipc_broadcast(X, cluster, Timeout).
  383
  384broadcast_listener(tipc_zone(X, Timeout)) :-
  385    tipc_broadcast(X, zone, Timeout).
  386
  387%
  388%
  389
  390tipc_basic_broadcast(S, Term, Address) :-
  391    tipc_socket(S, rdm) ~> tipc_close_socket(S),
  392%   tipc_setopt(S, importance(medium)),
  393    term_to_atom(Term, Atom),
  394    safely(tipc_send(S, Atom, Address, [])).
  395
  396% directed broadcast to a single listener
  397tipc_broadcast(Term:To, _Scope, _Timeout) :-
  398    ground(Term), ground(To),
  399    !,
  400    tipc_basic_broadcast(_S, Term, To),
  401    !.
  402
  403% broadcast to all listeners
  404tipc_broadcast(Term, Scope, _Timeout) :-
  405    ground(Term),
  406    !,
  407    tipc_broadcast_service(Scope, Address),
  408    tipc_basic_broadcast(_S, Term, Address),
  409    !.
  410
  411% directed broadcast_request to a single listener
  412tipc_broadcast(Term:Address, _Scope, Timeout) :-
  413    ground(Address),
  414    !,
  415    tipc_basic_broadcast(S, '$tipc_request'(Term), Address),
  416    tipc_br_collect_replies(S, Timeout, Term:Address).
  417
  418% broadcast_request to all listeners returning responder port-id
  419tipc_broadcast(Term:From, Scope, Timeout) :-
  420    !, tipc_broadcast_service(Scope, Address),
  421    tipc_basic_broadcast(S, '$tipc_request'(Term), Address),
  422    tipc_br_collect_replies(S, Timeout, Term:From).
  423
  424% broadcast_request to all listeners ignoring responder port-id
  425tipc_broadcast(Term, Scope, Timeout) :-
  426    tipc_broadcast(Term:_, Scope, Timeout).
  427
  428tipc_br_send_timeout(Port) :-
  429    tipc_socket(S, rdm) ~> tipc_close_socket(S),
  430
  431    tipc_setopt(S, importance(critical)),
  432    tipc_send(S, '$tipc_br_timeout', Port, []),
  433    !.
  434
  435tipc_br_collect_replies(S, Timeout, Term:From) :-
  436    tipc_get_name(S, Port),
  437    alarm(Timeout, tipc_br_send_timeout(Port), Id)
  438         ~> remove_alarm(Id),
  439    tipc_setopt(S, dispatch(false)),
  440    repeat,
  441    tipc_receive(S, Atom, From1, [as(atom)]),
  442    (   (Atom \== '$tipc_br_timeout')
  443        -> (From1 = From, safely(term_to_atom(Term, Atom)))
  444        ;  (!, fail)).
 tipc_host_to_address(?Service, ?Address) is nondet
locates a TIPC service by name. Service is an atom or grounded term representing the common name of the service. Address is a TIPC address structure. A server may advertise its services by name by including the fact, tipc:host_to_address(+Service, +Address), somewhere in its source. This predicate can also be used to perform reverse searches. That is it will also resolve an Address to a Service name. The search is zone-wide. Locating a service however, does not imply that the service is actually reachable from any particular node within the zone.
  459tipc_host_to_address(Host, Address) :-
  460    broadcast_request(tipc_zone(tipc_host_to_address(Host, Address))).
 tipc_initialize is semidet
See tipc:tipc_initialize/0
  465:- multifile tipc:tipc_stack_initialize/0.  466
  467
  468%   tipc_stack_initialize() is det. causes any required runtime
  469%   initialization to occur. This called as a side-effect of
  470%   tipc_initialize/0, which is now required to be included in an
  471%   applications intialization directive.
  472%
  473tipc:tipc_stack_initialize :-
  474    start_tipc_listener_daemon