/*  Part of SWI-Prolog

    Author:        Jeffrey Rosenwald
    E-mail:        jeffrose@acm.org
    WWW:           http://www.swi-prolog.org
    Copyright (c)  2009-2013, Jeffrey Rosenwald
    All rights reserved.

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions
    are met:

    1. Redistributions of source code must retain the above copyright
       notice, this list of conditions and the following disclaimer.

    2. Redistributions in binary form must reproduce the above copyright
       notice, this list of conditions and the following disclaimer in
       the documentation and/or other materials provided with the
       distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
    POSSIBILITY OF SUCH DAMAGE.
*/

:- module(tipc_broadcast,
          [ tipc_host_to_address/2,     % ?Host, ?Address
            tipc_initialize/0
          ]).
:- use_module(library(tipc/tipc),[tipc_initialize/0]).
:- autoload(tipc,
            [ tipc_get_name/2,
              tipc_send/4,
              tipc_socket/2,
              tipc_close_socket/1,
              tipc_setopt/2,
              tipc_bind/3,
              tipc_receive/4
            ]).
:- autoload(library(broadcast),
            [broadcast_request/1,broadcast/1,listen/3,unlisten/1]).
:- use_module(library(debug),[assertion/1]).
:- autoload(library(time),
            [call_with_time_limit/2,alarm/3,remove_alarm/1]).

:- require([ thread_self/1
           , forall/2
           , term_to_atom/2
           , thread_send_message/2
           , catch/3
           , setup_call_cleanup/3
           , thread_create/3
           ]).

:- meta_predicate safely(0), eventually_implies(0,0), ~>(0,0).

tipc_broadcast_service(node,    name_seq(20005, 0, 0)).
tipc_broadcast_service(cluster, name_seq(20005, 1, 1)).
tipc_broadcast_service(zone,    name_seq(20005, 2, 2)).

%
%  Here's a TIPC bridge to Prolog's broadcast library
%
%  A sender may confine a broadcast to a subset of a TIPC network by
%  specifying a scoping qualifier in his/her broadcast. The qualifier
%  has the effect of selecting the appropriate multi-cast address for
%  the transmission. Thus, the sender of the message has control over
%  the scope of his/her traffic on a per-message basis.
%
%  All in-scope listeners receive the broadcast and simply rebroadcast
%  the message locally. All broadcast replies, if any, are sent directly
%  to the sender via the port-id that was received with the broadcast.
%  No additional multiplexing is required.
%

safely(Predicate) :-
    catch(Predicate, Err, caught_safely(Err)).

caught_safely('$aborted').
caught_safely(unwind(abort)).
caught_safely(Error) :-
    print_message(error, Error),
    fail.
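Described practically:
P ~> Q declares that if P is true, then Q must be true, now or at some point in the future. In this file, P acquires a resource (for example, a socket or an alarm) and Q releases it once the enclosing clause finishes.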
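The deferred behaviour of ~> can be seen in a small self-contained sketch. It re-declares the operator with the same shape as eventually_implies/2 in this file so that it runs on its own; note/1, event/1, and demo/1 are hypothetical names introduced only for illustration.

```prolog
:- use_module(library(debug)).          % assertion/1

:- op(950, xfy, ~>).
:- meta_predicate ~>(0, 0).
:- dynamic event/1.

% Same shape as eventually_implies/2 in this file: P runs now, Q runs
% when the goal holding the open choice point is finished.
P ~> Q :-
    setup_call_cleanup(P, ( Solution = yes ; Solution = no ), assertion(Q)),
    Solution = yes.

% note/1 is a hypothetical helper that records the order of calls.
note(Event) :-
    assertz(event(Event)).

demo(Events) :-
    retractall(event(_)),
    (   note(acquire) ~> note(release),  % release is deferred
        note(work),
        fail                             % backtracking finishes the clause
    ;   true
    ),
    findall(E, event(E), Events).
```

Calling demo(Events) yields Events = [acquire, work, release]: the release step runs only after the work that follows it in the clause body, which is why the listener daemon can write tipc_socket(S, rdm) ~> tipc_close_socket(S) at the top of a clause.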
eventually_implies(P, Q) :-
    setup_call_cleanup(P, ( Solution = yes ; Solution = no ), assertion(Q)),
    Solution = yes.

:- op(950, xfy, ~>).

~>(P, Q) :-
    eventually_implies(P, Q).

ld_dispatch(S, '$tipc_request'(wru(Name)), From) :-
    !, tipc_get_name(S, Name),
    term_to_atom(wru(Name), Atom),
    tipc_send(S, Atom, From, []).

ld_dispatch(S, '$tipc_request'(Term), From) :-
    !, forall(broadcast_request(Term),
              (   term_to_atom(Term, Atom),
                  tipc_send(S, Atom, From, []))).

ld_dispatch(_S, Term, _From) :-
    safely(broadcast(Term)).

tipc_listener_daemon(Parent) :-
    tipc_socket(S, rdm) ~> tipc_close_socket(S),

%   tipc_setopt(S, importance(medium)),
    tipc_setopt(S, dest_droppable(true)),  % discard if not deliverable

    forall(tipc_broadcast_service(Scope, Address),
           tipc_bind(S, Address, scope(Scope))),

    listen(tipc_broadcast, Head, broadcast_listener(Head))
         ~> unlisten(tipc_broadcast),

    thread_send_message(Parent, tipc_listener_daemon_ready),

    repeat,
    safely(dispatch_traffic(S)).

dispatch_traffic(S) :-
    tipc_receive(S, Data, From, [as(atom)]),
    term_to_atom(Term, Data),
    ld_dispatch(S, Term, From),
    !,
    dispatch_traffic(S).

start_tipc_listener_daemon :-
    catch(thread_property(tipc_listener_daemon, status(running)),_, fail),
    !.

start_tipc_listener_daemon :-
    thread_self(Self),
    thread_create(tipc_listener_daemon(Self), _,
                  [alias(tipc_listener_daemon), detached(true)]),
    call_with_time_limit(6.0,
                         thread_get_message(tipc_listener_daemon_ready)).

:- multifile tipc:host_to_address/2.
%
broadcast_listener(tipc_host_to_address(Host, Addr)) :-
    tipc:host_to_address(Host, Addr).

broadcast_listener(tipc_broadcast_service(Class, Addr)) :-
    tipc_broadcast_service(Class, Addr).

broadcast_listener(tipc_node(X)) :-
    tipc_broadcast(X, node, 0.250).

broadcast_listener(tipc_cluster(X)) :-
    tipc_broadcast(X, cluster, 0.250).

broadcast_listener(tipc_zone(X)) :-
    tipc_broadcast(X, zone, 0.250).

broadcast_listener(tipc_node(X, Timeout)) :-
    tipc_broadcast(X, node, Timeout).

broadcast_listener(tipc_cluster(X, Timeout)) :-
    tipc_broadcast(X, cluster, Timeout).

broadcast_listener(tipc_zone(X, Timeout)) :-
    tipc_broadcast(X, zone, Timeout).

%
%

tipc_basic_broadcast(S, Term, Address) :-
    tipc_socket(S, rdm) ~> tipc_close_socket(S),
%   tipc_setopt(S, importance(medium)),
    term_to_atom(Term, Atom),
    safely(tipc_send(S, Atom, Address, [])).

% directed broadcast to a single listener
tipc_broadcast(Term:To, _Scope, _Timeout) :-
    ground(Term), ground(To),
    !,
    tipc_basic_broadcast(_S, Term, To),
    !.

% broadcast to all listeners
tipc_broadcast(Term, Scope, _Timeout) :-
    ground(Term),
    !,
    tipc_broadcast_service(Scope, Address),
    tipc_basic_broadcast(_S, Term, Address),
    !.

% directed broadcast_request to a single listener
tipc_broadcast(Term:Address, _Scope, Timeout) :-
    ground(Address),
    !,
    tipc_basic_broadcast(S, '$tipc_request'(Term), Address),
    tipc_br_collect_replies(S, Timeout, Term:Address).

% broadcast_request to all listeners returning responder port-id
tipc_broadcast(Term:From, Scope, Timeout) :-
    !, tipc_broadcast_service(Scope, Address),
    tipc_basic_broadcast(S, '$tipc_request'(Term), Address),
    tipc_br_collect_replies(S, Timeout, Term:From).

% broadcast_request to all listeners ignoring responder port-id
tipc_broadcast(Term, Scope, Timeout) :-
    tipc_broadcast(Term:_, Scope, Timeout).

tipc_br_send_timeout(Port) :-
    tipc_socket(S, rdm) ~> tipc_close_socket(S),

    tipc_setopt(S, importance(critical)),
    tipc_send(S, '$tipc_br_timeout', Port, []),
    !.

tipc_br_collect_replies(S, Timeout, Term:From) :-
    tipc_get_name(S, Port),
    alarm(Timeout, tipc_br_send_timeout(Port), Id)
         ~> remove_alarm(Id),
    tipc_setopt(S, dispatch(false)),
    repeat,
    tipc_receive(S, Atom, From1, [as(atom)]),
    (   (Atom \== '$tipc_br_timeout')
    ->  (From1 = From, safely(term_to_atom(Term, Atom)))
    ;   (!, fail)).
tipc_host_to_address(?Host, ?Address) resolves a service name to a TIPC address. A server advertises a service by including a fact of the form tipc:host_to_address(+Service, +Address) somewhere in its source. This predicate can also be used to perform reverse searches; that is, it will also resolve an Address to a Service name. The search is zone-wide. Locating a service, however, does not imply that the service is actually reachable from any particular node within the zone.
tipc_host_to_address(Host, Address) :-
    broadcast_request(tipc_zone(tipc_host_to_address(Host, Address))).
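A minimal sketch of how a server might advertise a service and how a client might look it up. The service name time_server and the address name(20010, 0, 0) are hypothetical, and the name(Type, Instance, Domain) address form is assumed from the TIPC socket library rather than taken from this file. It needs a running TIPC stack on every participating node.

```prolog
:- use_module(library(tipc/tipc_broadcast)).

% Server side: advertise a (hypothetical) service under a TIPC name.
:- multifile tipc:host_to_address/2.
tipc:host_to_address(time_server, name(20010, 0, 0)).

% Both sides: start the listener daemon through tipc_initialize/0.
:- initialization(tipc_initialize).

% Client side, at the toplevel of another process in the same zone:
%
%   ?- tipc_host_to_address(time_server, Address).          % forward lookup
%   ?- tipc_host_to_address(Service, name(20010, 0, 0)).    % reverse lookup
```

Because the lookup is a zone-scoped broadcast_request/1, it can yield one answer per process that declares a matching fact, and it simply fails when no reply arrives within the default timeout.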
:- multifile tipc:tipc_stack_initialize/0.


%   tipc_stack_initialize() is det. Causes any required runtime
%   initialization to occur. This is called as a side effect of
%   tipc_initialize/0, which must now be included in an
%   application's initialization directive.
%
tipc:tipc_stack_initialize :-
    start_tipc_listener_daemon.
A TIPC Broadcast Bridge
SWI-Prolog's broadcast library supports publish-and-subscribe communication between anonymous members of a community of interest. The members of the community are, however, necessarily limited to a single instance of Prolog. The TIPC broadcast library removes that restriction. With this library loaded, any member of a TIPC network that also has this library loaded may hear and respond to your broadcasts. Using TIPC Broadcast, it becomes a nearly trivial matter to build the kind of supercomputer that researchers in the high-performance computing community refer to as "Beowulf Class Cluster Computers."
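Apart from tipc_host_to_address/2 and tipc_initialize/0, this module has no public predicates; it is driven through broadcast/1 and broadcast_request/1. When this module is initialized, it does three things: it starts a listener daemon thread that receives broadcasts from other members as TIPC datagrams; it registers the three listeners tipc_node/1, tipc_cluster/1, and tipc_zone/1; and it registers the three listeners tipc_node/2, tipc_cluster/2, and tipc_zone/2, which take an additional timeout.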
A broadcast/1 or broadcast_request/1 that is not directed to one of the six listeners above behaves as usual and is confined to the instance of Prolog that originated it. When so directed, the broadcast is sent to all participating systems, including the sender itself, by way of TIPC's multicast addressing facility. A TIPC broadcast or broadcast request takes the typical form:
broadcast(tipc_node(+Term, +Timeout)). The principal functors tipc_node, tipc_cluster, and tipc_zone specify the scope of the broadcast. The functor tipc_node specifies that the broadcast is to be confined to members of the present TIPC node. Likewise, tipc_cluster and tipc_zone specify that the traffic should be confined to members of the present TIPC cluster and zone, respectively. To prevent the potential for feedback loops, the scope qualifier is stripped from the message before transmission. The timeout is optional. It specifies the amount of time to wait for replies to arrive in response to a broadcast_request. The default period is 0.250 seconds. The timeout is ignored for broadcasts.

An example of three separate processes cooperating on the same Node:
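A sketch of such a session, assuming three Prolog processes on one TIPC node, each with this library loaded and initialized. The event name number/1 and the ranges are illustrative; listen/2 comes from library(broadcast).

```prolog
% In every process: load the bridge and start the listener daemon.
:- use_module(library(tipc/tipc_broadcast)).
:- initialization(tipc_initialize).

% Process A: answers requests for number/1 with 1..5.
?- listen(number(X), between(1, 5, X)).

% Process B: answers requests for number/1 with 7..9.
?- listen(number(X), between(7, 9, X)).

% Process C: collects every reply that arrives from the node within
% the default 0.250 second window.
?- findall(X, broadcast_request(tipc_node(number(X))), Xs).

% Process C again, waiting up to one second for replies instead.
?- findall(X, broadcast_request(tipc_node(number(X), 1.0)), Xs).
```

Process C should see the numbers contributed by A and B combined. The order of the list depends on the order in which replies arrive, so it should not be relied upon.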
It is also possible to carry on a private dialog with a single responder. To do this, you supply a compound of the form, Term:PortId, to a TIPC scoped broadcast/1 or broadcast_request/1, where PortId is the port-id of the intended listener. If you supply an unbound variable, PortId, to broadcast_request, it will be unified with the address of the listener that responds to Term. You may send a directed broadcast to a specific member by simply providing this address in a similarly structured compound to a TIPC scoped broadcast/1. The message is sent via unicast to that member only by way of the member's broadcast listener. It is received by the listener just as any other broadcast would be. The listener does not know the difference.
Although this capability is needed under some circumstances, it has a tendency to compromise the resilience of the broadcast model. You should not rely on it too heavily, or fault tolerance will suffer.
For example, in order to discover who responded with a particular value:
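A sketch under the same assumptions as a node with listeners on number/1 already registered in other processes. The predicate notify_owner/1 and the message found/1 are hypothetical names used only for illustration.

```prolog
:- use_module(library(tipc/tipc_broadcast)).
:- initialization(tipc_initialize).

% Ask every listener on the node for number/1 and keep the port-id of
% the first responder that answered with Value.
who_has(Value, From) :-
    broadcast_request(tipc_node(number(X):From)),
    X == Value,
    !.

% Send a follow-up message to that responder only. Because both the
% term and the port-id are ground, this goes out as a directed
% (unicast) broadcast.
notify_owner(Value) :-
    who_has(Value, From),
    broadcast(tipc_node(found(Value):From)).
```

The port-id in From is only meaningful while the responding process is alive, which is one reason directed messages weaken the fault tolerance of the plain broadcast model.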
Caveats
While the implementation is mostly transparent, there are some important and subtle differences that must be taken into consideration: