1/* Part of SWI-Prolog 2 3 Author: Jeffrey Rosenwald 4 E-mail: jeffrose@acm.org 5 WWW: http://www.swi-prolog.org 6 Copyright (c) 2009-2013, Jeffrey Rosenwald 7 All rights reserved. 8 9 Redistribution and use in source and binary forms, with or without 10 modification, are permitted provided that the following conditions 11 are met: 12 13 1. Redistributions of source code must retain the above copyright 14 notice, this list of conditions and the following disclaimer. 15 16 2. Redistributions in binary form must reproduce the above copyright 17 notice, this list of conditions and the following disclaimer in 18 the documentation and/or other materials provided with the 19 distribution. 20 21 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 29 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 30 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 31 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 32 POSSIBILITY OF SUCH DAMAGE. 33*/ 34 35:- module(tipc_broadcast, 36 [ tipc_host_to_address/2, % ?Host, ?Address 37 tipc_initialize/0 38 ]). 39:- use_module(library(tipc/tipc),[tipc_initialize/0]). 40 41/** <module> A TIPC Broadcast Bridge 42 43SWI-Prolog's broadcast library provides a means that may be used to 44facilitate publish and subscribe communication regimes between anonymous 45members of a community of interest. The members of the community are 46however, necessarily limited to a single instance of Prolog. The TIPC 47broadcast library removes that restriction. With this library loaded, 48any member of a TIPC network that also has this library loaded may hear 49and respond to your broadcasts. Using TIPC Broadcast, it becomes a 50nearly trivial matter to build an instance of supercomputer that 51researchers within the High Performance Computer community refer to as 52"Beowulf Class Cluster Computers." 53 54This module has no public predicates. When this module is initialized, 55it does three things: 56 57 * It starts a listener daemon thread that listens for 58 broadcasts from others, received as TIPC datagrams, and 59 60 * It registers three listeners: tipc_node/1, tipc_cluster/1, and 61 tipc_zone/1, and 62 63 * It registers three listeners: tipc_node/2, tipc_cluster/2, and 64 tipc_zone/2. 65 66A broadcast/1 or broadcast_request/1 that is not directed to one of the 67six listeners above, behaves as usual and is confined to the instance of 68Prolog that originated it. But when so directed, the broadcast will be 69sent to all participating systems, including itself, by way of TIPC's 70multicast addressing facility. A TIPC broadcast or broadcast 71request takes the typical form: =|broadcast(tipc_node(+Term, 72+Timeout))|=. The principal functors =tipc_node=, =tipc_cluster=, and 73=tipc_zone=, specify the scope of the broadcast. The functor 74=tipc_node=, specifies that the broadcast is to be confined to members 75of a present TIPC node. Likewise, =tipc_cluster= and =tipc_zone=, 76specify that the traffic should be confined to members of a present TIPC 77cluster and zone, respectively. To prevent the potential for feedback 78loops, the scope qualifier is stripped from the message before 79transmission. The timeout is optional. It specifies the amount to time 80to wait for replies to arrive in response to a broadcast_request. The 81default period is 0.250 seconds. The timeout is ignored for broadcasts. 82 83An example of three separate processes cooperating on the same Node: 84 85== 86Process A: 87 88 ?- listen(number(X), between(1, 5, X)). 89 true. 90 91 ?- 92 93Process B: 94 95 ?- listen(number(X), between(7, 9, X)). 96 true. 97 98 ?- 99 100Process C: 101 102 ?- findall(X, broadcast_request(tipc_node(number(X))), Xs). 103 Xs = [1, 2, 3, 4, 5, 7, 8, 9]. 104 105 ?- 106== 107 108It is also possible to carry on a private dialog with a single 109responder. To do this, you supply a compound of the form, Term:PortId, 110to a TIPC scoped broadcast/1 or broadcast_request/1, where PortId is the 111port-id of the intended listener. If you supply an unbound variable, 112PortId, to broadcast_request, it will be unified with the address of the 113listener that responds to Term. You may send a directed broadcast to a 114specific member by simply providing this address in a similarly 115structured compound to a TIPC scoped broadcast/1. The message is sent 116via unicast to that member only by way of the member's broadcast 117listener. It is received by the listener just as any other broadcast 118would be. The listener does not know the difference. 119 120Although this capability is needed under some circumstances, it has a 121tendency to compromise the resilience of the broadcast model. You should 122not rely on it too heavily, or fault tolerance will suffer. 123 124For example, in order to discover who responded with a particular value: 125 126== 127Process A: 128 129 ?- listen(number(X), between(1, 3, X)). 130 true. 131 132 ?- 133 134Process B: 135 136 ?- listen(number(X), between(7, 9, X)). 137 true. 138 139 ?- 140 141Process C: 142 143 ?- broadcast_request(tipc_node(number(X):From)). 144 X = 7, 145 From = port_id('<1.1.1:3971170279>') ; 146 X = 8, 147 From = port_id('<1.1.1:3971170279>') ; 148 X = 9, 149 From = port_id('<1.1.1:3971170279>') ; 150 X = 1, 151 From = port_id('<1.1.1:3971170280>') ; 152 X = 2, 153 From = port_id('<1.1.1:3971170280>') ; 154 X = 3, 155 From = port_id('<1.1.1:3971170280>') ; 156 false. 157 158?- 159 160== 161 162## Caveats {#tipc-caveats} 163 164While the implementation is mostly transparent, there are some important 165and subtle differences that must be taken into consideration: 166 167 * TIPC broadcast now requires an initialization step in order to 168 launch the broadcast listener daemon. See tipc_initialize/0. 169 170 * Prolog's broadcast_request/1 is nondet. It sends the request, 171 then evaluates the replies synchronously, backtracking as needed 172 until a satisfactory reply is received. The remaining potential 173 replies are not evaluated. This is not so when TIPC is involved. 174 175 * A TIPC broadcast/1 is completely asynchronous. 176 177 * A TIPC broadcast_request/1 is partially synchronous. A 178 broadcast_request/1 is sent, then the sender balks for a period of 179 time (default: 250 ms) while the replies are collected. Any reply 180 that is received after this period is silently discarded. An 181 optional second argument is provided so that a sender may specify 182 more (or less) time for replies. 183 184 * Replies are _|no longer|_ collected using findall/3. Replies are 185 presented to the user as a choice point on arrival, until the 186 broadcast request timer finally expires. This change allows 187 traffic to propagate through the system faster and provides the 188 requestor with the opportunity to terminate a broadcast request 189 early if desired, by simply cutting choice points. 190 191 * Please beware that broadcast request transactions will now remain 192 active and resources consumed until broadcast_request finally fails 193 on backtracking, an uncaught exception occurs, or until choice 194 points are cut. Failure to properly manage this will likely result 195 in chronic exhaustion of TIPC sockets. 196 197 * If a listener is connected to a generator that always succeeds 198 (e.g. a random number generator), then the broadcast request will 199 never terminate and trouble is bound to ensue. 200 201 * broadcast_request/1 with TIPC scope is _not_ reentrant (at 202 least, not now anyway). If a listener performs a broadcast_request/1 203 with TIPC scope recursively, then disaster looms certain. This 204 caveat does not apply to a TIPC scoped broadcast/1, which can safely 205 be performed from a listener context. 206 207 * TIPC's capacity is not infinite. While TIPC can tolerate 208 substantial bursts of activity, it is designed for short bursts of 209 small messages. It can tolerate several thousand replies in response 210 to a broadcast_request/1 without trouble, but it will begin to 211 encounter congestion beyond that. And in congested conditions, 212 things will start to become unreliable as TIPC begins prioritizing 213 and/or discarding traffic. 214 215 * A TIPC broadcast_request/1 term that is grounded is considered to 216 be a broadcast only. No replies are collected unless the there is at 217 least one unbound variable to unify. 218 219 * A TIPC broadcast/1 always succeeds, even if there are no 220 listeners. 221 222 * A TIPC broadcast_request/1 that receives no replies will fail. 223 224 * Replies may be coming from many different places in the network 225 (or none at all). No ordering of replies is implied. 226 227 * Prolog terms are sent to others after first converting them to 228 atoms using term_to_atom/2. Passing real numbers this way may 229 result in a substantial truncation of precision. See prolog flag 230 option, 'float_format', of current_prolog_flag/2. 231 232@author Jeffrey Rosenwald (JeffRose@acm.org) 233@license LGPL 234@see tipc.pl 235@compat Linux only 236*/ 237 238:- autoload(tipc, 239 [ tipc_get_name/2, 240 tipc_send/4, 241 tipc_socket/2, 242 tipc_close_socket/1, 243 tipc_setopt/2, 244 tipc_bind/3, 245 tipc_receive/4 246 ]). 247:- autoload(library(broadcast), 248 [broadcast_request/1,broadcast/1,listen/3,unlisten/1]). 249:- use_module(library(debug),[assertion/1]). 250:- autoload(library(time), 251 [call_with_time_limit/2,alarm/3,remove_alarm/1]). 252 253:- require([ thread_self/1 254 , forall/2 255 , term_to_atom/2 256 , thread_send_message/2 257 , catch/3 258 , setup_call_cleanup/3 259 , thread_create/3 260 ]). 261 262:- meta_predicate safely( ), eventually_implies( , ), ~>( , ). 263 264tipc_broadcast_service(node, name_seq(20005, 0, 0)). 265tipc_broadcast_service(cluster, name_seq(20005, 1, 1)). 266tipc_broadcast_service(zone, name_seq(20005, 2, 2)). 267 268% 269% Here's a TIPC bridge to Prolog's broadcast library 270% 271% A sender may confine a broadcast to a subset of a TIPC network by 272% specifying a scoping qualifier in his/her broadcast. The qualifier 273% has the effect of selecting the appropriate multi-cast address for 274% the transmission. Thus, the sender of the message has control over 275% the scope of his/her traffic on a per-message basis. 276% 277% All in-scope listeners receive the broadcast and simply rebroadcast 278% the message locally. All broadcast replies, if any, are sent directly 279% to the sender via the port-id that was received with the broadcast. 280% No additional multiplexing is required. 281% 282 283safely(Predicate) :- 284 catch(Predicate, Err, caught_safely(Err)). 285 286caught_safely('$aborted'). 287caught_safely(unwind(abort)). 288caught_safely(Error) :- 289 print_message(error, Error), 290 fail. 291 292%! ~>(:P, :Q) is semidet. 293%! eventually_implies(:P, :Q) is semidet. 294% asserts temporal Liveness (something good happens, eventually) and 295% Safety (nothing bad ever happens) properties. Analogous to the 296% "leads-to" operator of Owicki and Lamport, 1982. Provides a sort of 297% lazy implication described informally as: 298% 299% * Liveness: For all possible outcomes, P -> Q, eventually. 300% * Safety: For all possible outcomes, (\+P ; Q), is invariant. 301% 302% Described practically: 303% 304% P ~> Q, declares that if P is true, then Q must be true, now or at 305% some point in the future. 306% 307 308eventually_implies(P, Q) :- 309 setup_call_cleanup(P, ( Solution = yes ; Solution = no ), assertion(Q)), 310 Solution = yes. 311 312:- op(950, xfy, ~>). 313 314~>(P, Q) :- 315 eventually_implies(P, Q). 316 317ld_dispatch(S, '$tipc_request'(wru(Name)), From) :- 318 !, tipc_get_name(S, Name), 319 term_to_atom(wru(Name), Atom), 320 tipc_send(S, Atom, From, []). 321 322ld_dispatch(S, '$tipc_request'(Term), From) :- 323 !, forall(broadcast_request(Term), 324 ( term_to_atom(Term, Atom), 325 tipc_send(S, Atom, From, []))). 326 327ld_dispatch(_S, Term, _From) :- 328 safely(broadcast(Term)). 329 330tipc_listener_daemon(Parent) :- 331 tipc_socket(S, rdm) ~> tipc_close_socket(S), 332 333% tipc_setopt(S, importance(medium)), 334 tipc_setopt(S, dest_droppable(true)), % discard if not deliverable 335 336 forall(tipc_broadcast_service(Scope, Address), 337 tipc_bind(S, Address, scope(Scope))), 338 339 listen(tipc_broadcast, Head, broadcast_listener(Head)) 340 ~> unlisten(tipc_broadcast), 341 342 thread_send_message(Parent, tipc_listener_daemon_ready), 343 344 repeat, 345 safely(dispatch_traffic(S)). 346 347dispatch_traffic(S) :- 348 tipc_receive(S, Data, From, [as(atom)]), 349 term_to_atom(Term, Data), 350 ld_dispatch(S, Term, From), 351 !, 352 dispatch_traffic(S). 353 354start_tipc_listener_daemon :- 355 catch(thread_property(tipc_listener_daemon, status(running)),_, fail), 356 !. 357 358start_tipc_listener_daemon :- 359 thread_self(Self), 360 thread_create(tipc_listener_daemon(Self), _, 361 [alias(tipc_listener_daemon), detached(true)]), 362 call_with_time_limit(6.0, 363 thread_get_message(tipc_listener_daemon_ready)). 364 365:- multifile tipc:host_to_address/2. 366% 367broadcast_listener(tipc_host_to_address(Host, Addr)) :- 368 tipc:host_to_address(Host, Addr). 369 370broadcast_listener(tipc_broadcast_service(Class, Addr)) :- 371 tipc_broadcast_service(Class, Addr). 372 373broadcast_listener(tipc_node(X)) :- 374 tipc_broadcast(X, node, 0.250). 375 376broadcast_listener(tipc_cluster(X)) :- 377 tipc_broadcast(X, cluster, 0.250). 378 379broadcast_listener(tipc_zone(X)) :- 380 tipc_broadcast(X, zone, 0.250). 381 382broadcast_listener(tipc_node(X, Timeout)) :- 383 tipc_broadcast(X, node, Timeout). 384 385broadcast_listener(tipc_cluster(X, Timeout)) :- 386 tipc_broadcast(X, cluster, Timeout). 387 388broadcast_listener(tipc_zone(X, Timeout)) :- 389 tipc_broadcast(X, zone, Timeout). 390 391% 392% 393 394tipc_basic_broadcast(S, Term, Address) :- 395 tipc_socket(S, rdm) ~> tipc_close_socket(S), 396% tipc_setopt(S, importance(medium)), 397 term_to_atom(Term, Atom), 398 safely(tipc_send(S, Atom, Address, [])). 399 400% directed broadcast to a single listener 401tipc_broadcast(Term:To, _Scope, _Timeout) :- 402 ground(Term), ground(To), 403 !, 404 tipc_basic_broadcast(_S, Term, To), 405 !. 406 407% broadcast to all listeners 408tipc_broadcast(Term, Scope, _Timeout) :- 409 ground(Term), 410 !, 411 tipc_broadcast_service(Scope, Address), 412 tipc_basic_broadcast(_S, Term, Address), 413 !. 414 415% directed broadcast_request to a single listener 416tipc_broadcast(Term:Address, _Scope, Timeout) :- 417 ground(Address), 418 !, 419 tipc_basic_broadcast(S, '$tipc_request'(Term), Address), 420 tipc_br_collect_replies(S, Timeout, Term:Address). 421 422% broadcast_request to all listeners returning responder port-id 423tipc_broadcast(Term:From, Scope, Timeout) :- 424 !, tipc_broadcast_service(Scope, Address), 425 tipc_basic_broadcast(S, '$tipc_request'(Term), Address), 426 tipc_br_collect_replies(S, Timeout, Term:From). 427 428% broadcast_request to all listeners ignoring responder port-id 429tipc_broadcast(Term, Scope, Timeout) :- 430 tipc_broadcast(Term:_, Scope, Timeout). 431 432tipc_br_send_timeout(Port) :- 433 tipc_socket(S, rdm) ~> tipc_close_socket(S), 434 435 tipc_setopt(S, importance(critical)), 436 tipc_send(S, '$tipc_br_timeout', Port, []), 437 !. 438 439tipc_br_collect_replies(S, Timeout, Term:From) :- 440 tipc_get_name(S, Port), 441 alarm(Timeout, tipc_br_send_timeout(Port), Id) 442 ~> remove_alarm(Id), 443 tipc_setopt(S, dispatch(false)), 444 repeat, 445 tipc_receive(S, Atom, From1, [as(atom)]), 446 ( (Atom \== '$tipc_br_timeout') 447 -> (From1 = From, safely(term_to_atom(Term, Atom))) 448 ; (!, fail)). 449 450%! tipc_host_to_address(?Service, ?Address) is nondet. 451% 452% locates a TIPC service by name. Service is an atom or grounded term 453% representing the common name of the service. Address is a TIPC 454% address structure. A server may advertise its services by name by 455% including the fact, tipc:host_to_address(+Service, +Address), 456% somewhere in its source. This predicate can also be used to perform 457% reverse searches. That is it will also resolve an Address to a 458% Service name. The search is zone-wide. Locating a service however, 459% does not imply that the service is actually reachable from any 460% particular node within the zone. 461% 462 463tipc_host_to_address(Host, Address) :- 464 broadcast_request(tipc_zone(tipc_host_to_address(Host, Address))). 465 466%! tipc_initialize is semidet. 467% See tipc:tipc_initialize/0 468% 469:- multifile tipc:tipc_stack_initialize/0. 470 471 472% tipc_stack_initialize() is det. causes any required runtime 473% initialization to occur. This called as a side-effect of 474% tipc_initialize/0, which is now required to be included in an 475% applications intialization directive. 476% 477tipctipc_stack_initialize :- 478 start_tipc_listener_daemon