/*  Part of SWI-Prolog

    Author:        Jeffrey Rosenwald
    E-mail:        jeffrose@acm.org
    WWW:           http://www.swi-prolog.org
    Copyright (c)  2010-2013, Jeffrey Rosenwald
    All rights reserved.

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions
    are met:

    1. Redistributions of source code must retain the above copyright
       notice, this list of conditions and the following disclaimer.

    2. Redistributions in binary form must reproduce the above copyright
       notice, this list of conditions and the following disclaimer in
       the documentation and/or other materials provided with the
       distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
    POSSIBILITY OF SUCH DAMAGE.
*/

:- module(tipc_linda,
          [
              linda/0,                  %
              linda/1,                  % +Term
              linda_client/1,           % +Address
              close_client/0,           %
              linda_timeout/1,          % +Number
              linda_timeout/2,          % ?Number, ?Number
              out/1,                    % +Term
              in/1,                     % ?Term
              in_noblock/1,             % ?Term
              in/2,                     % +List, ?Term
              rd/1,                     % ?Term
              rd_noblock/1,             % ?Term
              rd/2,                     % +List, ?Term
              bagof_rd_noblock/3,       % +Template, ?Term, -Bag
              bagof_in_noblock/3,       % +Template, ?Term, -Bag
              linda_eval/1,             % :Head
              linda_eval/2,             % ?Head, :Body
              linda_eval_detached/1,    % :Head
              linda_eval_detached/2,    % ?Head, :Body
              tuple/1,                  % :Goal
              tuple/2,                  % ?Head, :Body
              tipc_linda_server/0,      %
              tipc_initialize/0
          ]).
:- use_module(library(tipc/tipc_broadcast),[tipc_initialize/0]).
:- autoload(library(broadcast),
            [listen/3,broadcast_request/1,broadcast/1,unlisten/3]).
:- use_module(library(debug),[assertion/1]).
:- autoload(library(error),[must_be/2]).
:- autoload(library(lists),[member/2]).

/** <module> A Process Communication Interface

Linda is a framework for building systems that are composed of programs
that cooperate among themselves in order to realize a larger goal. A
Linda application is composed of two or more processes acting in
concert. One process acts as a server and the others act as clients.
Fine-grained communication between client and server is provided by way
of message passing over sockets and support networks, TIPC sockets in
this case. Clients interact indirectly by way of the server. The server
is in principle an erasable blackboard that clients can use to write
(out/1), read (rd/1) and remove (in/1) messages called _|tuples|_. Some
predicates will fail if a requested tuple is not present on the
blackboard. Others will block until a tuple instance becomes available.
Tuple instances are made available to clients by writing them on the
blackboard using out/1.
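
A minimal sketch of the three blackboard operations issued from a single
client, assuming a server is already reachable on the cluster. The tuple
job/2 is hypothetical and chosen only for illustration:
==
?- out(job(1, compile)).        % write a tuple on the blackboard
?- rd_noblock(job(Id, Task)).   % read a match, leaving it in place;
                                % fails if none arrives within the timeout
?- in(job(Id, Task)).           % remove a match; blocks until one exists
==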

In TIPC Linda, there is a subtle difference between the =in= and the
=rd= predicates that is worth noting. The =in= predicates succeed
exactly once for each tuple placed in the tuple space. The tuple is
provided to exactly one requesting client. Clients can contend for
tuples in this way, thus enabling multi-server operations. The =rd=
predicates succeed nondeterministically, providing all matching tuples
in the tuple space at a given time to the requesting client as a choice
point without disturbing them.

TIPC Linda is inspired by and adapted from the SICStus Prolog API. But
unlike SICStus TCP Linda, TIPC Linda is connectionless. There is no
specific session between client and server. The server receives and
responds to datagrams originated by clients in an epiperiodic manner.

Example: A simple producer-consumer.

In client 1:
==
init_producer :-
    linda_client(global),
    producer.

producer :-
    produce(X),
    out(p(X)),
    producer.

produce(X) :- .....
==
In client 2:
==
init_consumer :-
    linda_client(global),
    consumer.

consumer :-
    in(p(A)),
    consume(A),
    consumer.

consume(A) :- .....
==

Example: Synchronization
==
    ...,
    in(ready),  %Waits here until someone does out(ready)
    ...,
==
Example: A critical region
==
    ...,
    in(region_free),  % wait for region to be free
    critical_part,
    out(region_free), % let next one in
    ...,
==
Example: Reading global data
==
    ...,
    rd(data(Data)),
    ...,
==
or, without blocking:
==
    ...,
    (rd_noblock(data(Data)) ->
        do_something(Data)
    ;   write('Data not available!'),nl
    ),
    ...,
==

Example: Waiting for any one of several events
==
    ...,
    in([e(1),e(2),...,e(n)], E),
%  Here is E instantiated to the first tuple that became available
    ...,
==

Example: Producers and Consumers in the same process using =linda_eval=
threads and/or =tuple= predicates

==
  consumer1 :-
    repeat,
    in([p(_), quit], Y),
    (   Y = p(Z) -> writeln(consuming(Z)); !),
    fail.

  producer1 :-
    forall(between(1,40, X), out(p(X))).

  producer_consumer1 :-
    linda_eval(consumer1),
    call_cleanup(producer1, out(quit)), !.
%
%
  consumer2 :-
    between(1,4,_),
    in_noblock(p(X)), !,
    writeln(consuming(X)),
    consumer2.

  producer2 :-
    linda_eval(p(X), between(1,40, X)).

  producer_consumer2 :-
    producer2,
    linda_eval(consumer2), !.
%
%
  consumer3 :-
    forall(rd_noblock(p(X)), writeln(consuming(X))).

  producer3 :-
    tuple(p(X), between(1,40, X)).

  producer_consumer3 :-
    producer3,
    linda_eval(done, consumer3),
    in(done), !.
==

## Servers {#tipc-linda-servers}

  The server is the process running the "blackboard process". It is
  part of TIPC Linda. It is a collection of predicates that are
  registered as tipc_broadcast listeners. The server process can be run
  on a separate machine if necessary.

  To load the package, enter the query:
  ==
  ?- use_module(library(tipc/tipc_linda)).

  ?- linda.
     TIPC Linda server now listening at: port_id('<1.1.1:3200515722>')
  true.
  ==

## Clients {#tipc-linda-clients}

  The clients are one or more Prolog processes that have connection(s)
  to the server.

  To load the package, enter the query:
  ==
  ?- use_module(library(tipc/tipc_linda)).

  ?- linda_client(global).
     TIPC Linda server listening at: port_id('<1.1.1:3200515722>')
  true.
  ==

@see Nicholas Carriero and David Gelernter. _|How to Write Parallel
Programs: A First Course.|_ The MIT Press, Cambridge, MA, 1990.

@author Jeffrey A. Rosenwald

@compat SWI-Prolog for Linux only
@compat tipc_broadcast library
*/

:- meta_predicate eventually_implies(0,0), ~>(0,0), safely(0).

safely(Goal) :-
    catch(Goal, Err, (print_message(error, Err), fail)).

eventually_implies(P, Q) :-
    setup_call_cleanup(P, (Foo = true; Foo = false), assertion(Q)),
    Foo == true.

:- op(950, xfy, ~>).

~>(P, Q) :- eventually_implies(P, Q).


:- dynamic(linda_data/1).

%
% This is the backend state machine
%

linda_action(rd(listening)) :- !.

linda_action(in(TupleList, Tuple)) :-
    member(Tuple, TupleList),
    retract(linda_data(Tuple)),
    !.

linda_action(in(Tuple)) :-
    retract(linda_data(Tuple)),
    !.

linda_action(out(Tuple)) :-
    assert(linda_data(Tuple)).

linda_action(rd(TupleList, Tuple)) :-
    member(Tuple, TupleList),
    linda_data(Tuple).

linda_action(rd(Tuple)) :-
    linda_data(Tuple).

linda_action(bagof_rd_noblock(Template, Var^Tuple, Bag)) :-
    !, bagof(Template, Var^linda_data(Tuple), Bag).

linda_action(bagof_rd_noblock(Template, Tuple, Bag)) :-
    !, bagof(Template, linda_data(Tuple), Bag).

linda_action(bagof_in_noblock(Template, Var^Tuple, Bag)) :-
    Datum = linda_data(Tuple),
    !, bagof(Template, Var^(Datum, retract(Datum)), Bag).

linda_action(bagof_in_noblock(Template, Tuple, Bag)) :-
    !, bagof(Template, retract(linda_data(Tuple)), Bag).

%
% This is the user interface
%

%!  linda is det.
%!  linda(:Goal) is det.
%   Starts a Linda-server in this process. The
%   network address is written to current output stream as a TIPC
%   port_id/2 reference (e.g. port_id('<1.1.1:3200515722>') ). This
%   predicate looks to see if a server is already listening on the
%   cluster. If so, it reports the address of the existing server.
%   Otherwise, it registers a new server and reports its address.
%
%   ==
%   ?- linda.
%      TIPC Linda server now listening at: port_id('<1.1.1:3200515722>')
%   true.
%
%   ?- linda.
%      TIPC Linda server still listening at: port_id('<1.1.1:3200515722>')
%   true.
%   ==
%
%   The following will call my_init/0 in the current module after the
%   server is successfully started or is found already listening.
%   my_init/0 could start client-processes, initialize the tuple space,
%   etc.
%
%   ==
%   ?- linda(my_init).
%   ==
%

linda_listening(Addr) :-
    basic_request(rd(listening), Addr),
    !.

linda :-
    linda_listening(Addr),
    !,
    format('TIPC Linda server still listening at: ~p~n', [Addr]).

linda :-
    listen(tipc_linda, '$linda'(Action), linda_action(Action)),
    linda_listening(Addr),
    !,
    format('TIPC Linda server now listening at: ~p~n', [Addr]).

:- meta_predicate linda(0).

linda(Hook) :-
    linda,
    call(Hook).

%!  linda_client(+Domain) is semidet.
%
%   Establishes a connection to a Linda-server providing a named tuple
%   space. Domain is an atom specifying a particular tuple-space,
%   selected from a universe of tuple-spaces. At present however, only
%   one tuple-space, =global=, is supported. A client may interact with
%   any server reachable on the TIPC cluster.
%   This predicate will fail if no server is reachable for that tuple
%   space.
%

linda_client(global) :-
    linda_listening(Addr),
    !,
    format('TIPC Linda server listening at: ~p~n', [Addr]).

%!  close_client is det.
%
%   Closes the connection to the Linda-server. Causes the server to
%   release resources associated with this client.

close_client :- true.   % Presently a noop

%!  linda_timeout(?OldTime, ?NewTime) is semidet.
%
%   Controls Linda's message-passing timeout. It specifies the time window
%   where clients will accept server replies in response to =in= and =rd=
%   requests. Replies arriving outside of this window are silently
%   ignored. OldTime is unified with the old timeout and then timeout is
%   set to NewTime. NewTime is of the form Seconds:Milliseconds. A
%   non-negative real number, seconds, is also recognized. The default is
%   0.250 seconds. This timeout is thread local and is _not_ inherited
%   from its parent. New threads are initialized to the default.
%
%   *|Note:|* The synchronous behavior afforded by in/1 and rd/1
%   is implemented by periodically polling the server. The poll rate is
%   set according to this timeout. Setting the timeout too small may
%   result in substantial network traffic that is of little value.
%
%   @throws error(feature_not_supported). SICStus Linda can
%   disable the timeout by specifying =off= as NewTime. This feature does
%   not exist for safety reasons.
%

:- thread_local linda_time_out/1.

linda_timeout(Time, Time) :-
    linda_time_out(Time),
    !.

linda_timeout(_OldTime, NewTime) :-
    NewTime == off,
    throw(error(feature_not_supported)).

linda_timeout(OldTime, NewTime) :-
    ground(NewTime),
    NewTime = Seconds:Milliseconds,
    NewTime1 is float(Seconds + (Milliseconds / 1000.0)),
    linda_timeout(OldTime, NewTime1),
    !.

linda_timeout(OldTime, NewTime) :-
    ground(NewTime),
    NewTime >= 0.020,
    clause(linda_time_out(OldTime), true, Ref),
    asserta(linda_time_out(NewTime)) -> erase(Ref),
    !.

linda_timeout(0.250, NewTime) :-
    NewTime >= 0.020,
    asserta(linda_time_out(NewTime)).


%!  linda_timeout(+NewTime) is semidet.
%
%   Temporarily sets Linda's timeout. Internally, the original timeout is
%   saved and then the timeout is set to NewTime. NewTime is as described
%   in linda_timeout/2. The original timeout is restored automatically on
%   cut of choice points, failure on backtracking, or uncaught exception.
%

linda_timeout(NewTime) :-
    linda_timeout(OldTime, NewTime) ~>
        linda_timeout(NewTime, OldTime).

basic_request(Action) :-
    basic_request(Action, _Addr).

basic_request(Action, Addr) :-
    linda_timeout(Time, Time),
    broadcast_request(tipc_cluster('$linda'(Action):Addr, Time)).

%!  out(+Tuple) is det.
%
%   Places a Tuple in Linda's tuple-space.
%

out(Tuple) :-
    broadcast(tipc_cluster('$linda'(out(Tuple)))),
    !.

%!  in(?Tuple) is det.
%
%   Atomically removes the tuple Tuple from Linda's tuple-space if it
%   is there. The tuple will be returned to exactly one requestor. If
%   no tuple is available, the predicate blocks until it is available
%   (that is, someone performs an out/1).

in(Tuple) :-
    repeat,
    in_noblock(Tuple),
    !.

%!  in_noblock(?Tuple) is semidet.
%
%   Atomically removes the tuple Tuple from Linda's tuple-space if it
%   is there. If not, the predicate fails. This predicate can fail due
%   to a timeout.

in_noblock(Tuple) :-
    basic_request(in(Tuple)),
    !.

%!  in(+TupleList, -Tuple) is det.
%
%   As in/1 but succeeds when any one of the tuples in TupleList is
%   available. Tuple is unified with the fetched tuple.

in(TupleList, Tuple) :-
    must_be(list, TupleList),
    repeat,
    basic_request(in(TupleList, Tuple)),
    !.

%!  rd(?Tuple) is nondet.
%
%   Succeeds nondeterministically if Tuple is available in the
%   tuple-space, suspends otherwise until it is available. Compare this
%   with in/1: the tuple is not removed.

rd(Tuple) :-
    repeat,
    rd_noblock(Tuple).

%!  rd_noblock(?Tuple) is nondet.
%
%   Succeeds nondeterministically if Tuple is available in the
%   tuple-space, fails otherwise. This predicate can fail due to a
%   timeout.

rd_noblock(Tuple) :-
    basic_request(rd(Tuple)).

%!  rd(?TupleList, -Tuple) is nondet.
%
%   As in/2 but provides a choice point that does not remove any
%   tuples.

rd(TupleList, Tuple) :-
    must_be(list, TupleList),
    repeat,
    basic_request(rd(TupleList, Tuple)).

%!  bagof_in_noblock(?Template, ?Tuple, -Bag) is nondet.
%!  bagof_rd_noblock(?Template, ?Tuple, -Bag) is nondet.
%
%   Bag is the list of all instances of Template such that Tuple exists
%   in the tuple-space. The behavior of variables in Tuple and Template
%   is as in bagof/3. The variables could be existentially quantified
%   with ^/2 as in bagof/3. The operation is performed as an atomic
%   operation. This predicate can fail due to a timeout. Example:
%   Assume that only one client is connected to the server and that the
%   tuple-space initially is empty.
%
%   ==
%   ?- out(x(a,3)), out(x(a,4)), out(x(b,3)), out(x(c,3)).
%
%   true.
%   ?- bagof_rd_noblock(C-N, x(C,N), L).
%
%   L = [a-3,a-4,b-3,c-3] .
%
%   true.
%   ?- bagof_rd_noblock(C, N^x(C,N), L).
%
%   L = [a,a,b,c] .
%
%   true.
%   ==

bagof_rd_noblock(Template, Tuple, Bag) :-
    !, basic_request(bagof_rd_noblock(Template, Tuple, Bag)).

bagof_in_noblock(Template, Tuple, Bag) :-
    !, basic_request(bagof_in_noblock(Template, Tuple, Bag)).
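
% A minimal sketch, not part of this library: bagof_in_noblock/3 can
% remove every tuple matching a pattern in one atomic request. The
% helper drain/2 is hypothetical and named here for illustration only.
% Pattern is assumed to be a non-variable tuple pattern without ^/2.
% Because the request also fails on a timeout, an empty result cannot
% be told apart from an unreachable server.
%
% ==
% drain(Pattern, Tuples) :-
%     (   bagof_in_noblock(Pattern, Pattern, Tuples)
%     ->  true
%     ;   Tuples = []        % nothing matched, or the request timed out
%     ).
%
% ?- drain(x(_, _), Tuples).
% ==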

:- meta_predicate
    linda_eval(?, 0),
    linda_eval(0),
    linda_eval_detached(?, 0),
    linda_eval_detached(0).

%!  linda_eval(:Goal) is det.
%!  linda_eval(?Head, :Goal) is det.
%!  linda_eval_detached(:Goal) is det.
%!  linda_eval_detached(?Head, :Goal) is det.
%
%   Causes Goal to be evaluated in parallel with a parent predicate. The
%   child thread is a full-fledged client, possessing the same
%   capabilities as the parent. Upon successful completion of Goal,
%   unbound variables are unified and the result is sent to the Linda
%   server via out/1, where it is made available to others. linda_eval/2
%   evaluates Goal, then unifies the result with Head, providing a means
%   of customizing the resulting output structure. In linda_eval/1, Head
%   and Goal are identical, except that the module name for Head is
%   stripped before output. If the child fails or receives an uncaught
%   exception, no such output occurs.
%
%   *|Joining Threads:|* Threads created using linda_eval/(1-2) are not
%   allowed to linger. They are joined (blocking the parent, if
%   necessary) under three conditions: backtracking on failure into a
%   linda_eval/(1-2), receipt of an uncaught exception, and cut of
%   choice-points. Goals are evaluated using forall/2. They are expected
%   to provide nondeterministic behavior. That is, they may succeed zero
%   or more times on backtracking. They must, however, eventually fail
%   or succeed deterministically. Otherwise, the thread will hang, which
%   will eventually hang the parent thread. Cutting choice points in the
%   parent's body has the effect of joining all children created by the
%   parent. This provides a barrier that guarantees that all child
%   instances of Goal have run to completion before the parent proceeds.
%   Detached threads behave as above, except that they operate
%   independently and cannot be joined.
%   They will continue to run while the host process continues to run.
%
%   Here is an example of a parallel quicksort:
%
%   ==
%   qksort([], []).
%
%   qksort([X | List], Sorted) :-
%       partition(@>(X), List, Less, More),
%       linda_eval(qksort(More, SortedMore)),
%       qksort(Less, SortedLess), !,
%       in_noblock(qksort(More, SortedMore)),
%       append(SortedLess, [X | SortedMore], Sorted).
%   ==
%
linda_eval(Head) :-
    linda_eval(Head, Head).

linda_eval(Head, Body) :-
    must_be(callable, Body),
    strip_module(Head, _Module, Plain),
    thread_create(forall(Body, out(Plain)), Id, []) ~>
        thread_join(Id, true).

linda_eval_detached(Head) :-
    linda_eval_detached(Head, Head).

linda_eval_detached(Head, Body) :-
    must_be(callable, Body),
    strip_module(Head, _Module, Plain),
    thread_create(forall(Body, out(Plain)), _Id, [detached(true)]).

%!  tuple(:Goal) is det.
%!  tuple(?Head, :Goal) is det.
%
%   Registers Head as a virtual tuple in TIPC Linda's tuple space. On
%   success, any client on the cluster may reference the tuple, Head,
%   using rd/1 or rd_noblock/1. On reference, Goal is executed by a
%   separate thread of execution in the host client's Prolog process. The
%   result is unified with Head, which is then returned to the guest
%   client. As in linda_eval/(1-2) above, Goal is evaluated using
%   forall/2. The virtual tuple is unregistered on backtracking into a
%   tuple/(1-2), receipt of uncaught exception, or cut of choice-points.
%   In tuple/1, Head and Goal are identical, except that the module name
%   is stripped from Head.
%
%   *|Note:|* A virtual tuple is an extension of the server. Even though
%   it is operating in the client's Prolog environment, it is restricted
%   in the server operations that it may perform.
%   It is generally safe for tuple predicates to perform out/1
%   operations, but it is unsafe for them to perform any variant of =in=
%   or =rd=, either directly or indirectly. This restriction is,
%   however, relaxed if the server and client are operating in separate
%   heavyweight processes (not threads) on the node or cluster. This is
%   most easily achieved by starting a stand-alone Linda server
%   somewhere on the cluster. See tipc_linda_server/0, below.
%
:- meta_predicate tuple(?, 0), tuple(0).

tuple(Head) :-
    tuple(Head, Head).

tuple(Head, Body) :-
    must_be(callable, Body),
    strip_module(Head, _Module, Plain),
    listen(user, '$linda'(rd(Plain)), Body) ~>
        unlisten(user, '$linda'(rd(Plain)), Body).

%%  tipc_linda_server is nondet.
%
%   Acts as a stand-alone Linda server. This predicate initializes the
%   TIPC stack and then starts a Linda server in the current thread. If
%   a client performs an =|out(server_quit)|=, the server's Prolog
%   process will exit via halt/1. It is intended for use in scripting as
%   follows:
%
%   ==
%   swipl -q -g 'use_module(library(tipc/tipc_linda)),
%          tipc_linda_server' -t 'halt(1)'
%   ==
%
%   See also manual section 2.10.2.1 Using PrologScript.
%
%   *|Note:|* Prolog will return a non-zero exit status if this
%   predicate is executed on a cluster that already has an active
%   server. An exit status of zero is returned on graceful shutdown.
%
%   @throws error(permission_error(halt,thread,2),context(halt/1,Only
%   from thread 'main')), if this predicate is executed in a thread
%   other than =main=.
%
%
wait_for_quit :-
    linda_timeout(6.0),
    in(server_quit),
    halt(0).

tipc_linda_server :-
%   detach_IO,             % become a daemon
    tipc_initialize,
    (   linda_client(global) -> true; linda(wait_for_quit)).

%!  tipc_initialize is semidet.
%
%   See tipc:tipc_initialize/0.
%