/*  Part of SWI-Prolog

    Author:        Jeffrey Rosenwald
    E-mail:        jeffrose@acm.org
    WWW:           http://www.swi-prolog.org
    Copyright (c)  2010-2013, Jeffrey Rosenwald
    All rights reserved.

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions
    are met:

    1. Redistributions of source code must retain the above copyright
       notice, this list of conditions and the following disclaimer.

    2. Redistributions in binary form must reproduce the above copyright
       notice, this list of conditions and the following disclaimer in
       the documentation and/or other materials provided with the
       distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
    POSSIBILITY OF SUCH DAMAGE.
*/

:- module(tipc_linda,
          [
           linda/0,                   %
           linda/1,                   % +Term
           linda_client/1,            % +Address
           close_client/0,            %
           linda_timeout/1,           % +Number
           linda_timeout/2,           % ?Number, ?Number
           out/1,                     % +Term
           in/1,                      % ?Term
           in_noblock/1,              % ?Term
           in/2,                      % +List, ?Term
           rd/1,                      % ?Term
           rd_noblock/1,              % ?Term
           rd/2,                      % +List, ?Term
           bagof_rd_noblock/3,        % +Template, ?Term, -Bag
           bagof_in_noblock/3,        % +Template, ?Term, -Bag
           linda_eval/1,              % :Head
           linda_eval/2,              % ?Head, :Body
           linda_eval_detached/1,     % :Head
           linda_eval_detached/2,     % ?Head, :Body
           tuple/1,                   % :Goal
           tuple/2,                   % ?Head, :Body
           tipc_linda_server/0,       %
           tipc_initialize/0
          ]).
:- use_module(library(tipc/tipc_broadcast),[tipc_initialize/0]).
:- autoload(library(broadcast),
            [listen/3,broadcast_request/1,broadcast/1,unlisten/3]).
:- use_module(library(debug),[assertion/1]).
:- autoload(library(error),[must_be/2]).
:- autoload(library(lists),[member/2]).
:- meta_predicate eventually_implies(0,0), ~>(0,0), safely(0).

safely(Goal) :-
    catch(Goal, Err, (print_message(error, Err), fail)).

eventually_implies(P, Q) :-
    setup_call_cleanup(P, (Foo = true; Foo = false), assertion(Q)),
    Foo == true.

:- op(950, xfy, ~>).

~>(P, Q) :- eventually_implies(P, Q).


:- dynamic(linda_data/1).

%
% This is the backend state machine
%

linda_action(rd(listening)) :- !.

linda_action(in(TupleList, Tuple)) :-
    member(Tuple, TupleList),
    retract(linda_data(Tuple)),
    !.

linda_action(in(Tuple)) :-
    retract(linda_data(Tuple)),
    !.

linda_action(out(Tuple)) :-
    assert(linda_data(Tuple)).

linda_action(rd(TupleList, Tuple)) :-
    member(Tuple, TupleList),
    linda_data(Tuple).

linda_action(rd(Tuple)) :-
    linda_data(Tuple).

linda_action(bagof_rd_noblock(Template, Var^Tuple, Bag)) :-
    !, bagof(Template, Var^linda_data(Tuple), Bag).

linda_action(bagof_rd_noblock(Template, Tuple, Bag)) :-
    !, bagof(Template, linda_data(Tuple), Bag).

linda_action(bagof_in_noblock(Template, Var^Tuple, Bag)) :-
    Datum = linda_data(Tuple),
    !, bagof(Template, Var^(Datum, retract(Datum)), Bag).

linda_action(bagof_in_noblock(Template, Tuple, Bag)) :-
    !, bagof(Template, retract(linda_data(Tuple)), Bag).

%
% This is the user interface
%
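The backend clauses keep the tuple space in the dynamic predicate linda_data/1. The following is a minimal, network-free sketch of those semantics. The names local_out/1, local_in/1, local_rd/1 and demo/2 are hypothetical and exist only for this illustration; they are not part of the library.

```prolog
:- dynamic linda_data/1.

% Same shapes as the out/in/rd backend clauses, minus the network layer.
local_out(Tuple) :- assertz(linda_data(Tuple)).
local_in(Tuple)  :- retract(linda_data(Tuple)), !.   % removes exactly one tuple
local_rd(Tuple)  :- linda_data(Tuple).               % leaves the tuple in place

% demo(-Taken, -Remaining): write two tuples, remove one, read what is left.
demo(Taken, Remaining) :-
    retractall(linda_data(_)),
    local_out(x(a, 3)),
    local_out(x(b, 4)),
    local_in(x(Taken, _)),
    findall(T, local_rd(T), Remaining).
```

Calling demo(Taken, Remaining) binds Taken to a and Remaining to [x(b,4)]: the cut after retract/1 commits to the first matching tuple, while the rd-style clause enumerates tuples on backtracking without removing them.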
The server's network address is reported as a TIPC port_id reference
(e.g. port_id('<1.1.1:3200515722>')). This predicate checks whether a
server is already listening on the cluster. If so, it reports the
address of the existing server. Otherwise, it registers a new server
and reports its address.
?- linda.
TIPC Linda server now listening at: port_id('<1.1.1:3200515722>')
true.

?- linda.
TIPC Linda server still listening at: port_id('<1.1.1:3200515722>')
true.
The following will call my_init/0 in the current module after the server is successfully started or is found already listening. my_init/0 could start client-processes, initialize the tuple space, etc.
?- linda(my_init).
linda_listening(Addr) :-
    basic_request(rd(listening), Addr),
    !.

linda :-
    linda_listening(Addr),
    !,
    format('TIPC Linda server still listening at: ~p~n', [Addr]).

linda :-
    listen(tipc_linda, '$linda'(Action), linda_action(Action)),
    linda_listening(Addr),
    !,
    format('TIPC Linda server now listening at: ~p~n', [Addr]).

:- meta_predicate linda(0).

linda(Hook) :-
    linda,
    call(Hook).
At present only one tuple space, global, is supported. A client may
interact with any server reachable on the TIPC cluster. This predicate
fails if no server is reachable for that tuple space.
linda_client(global) :-
    linda_listening(Addr),
    !,
    format('TIPC Linda server listening at: ~p~n', [Addr]).

close_client :- true.   % Presently a noop
The timeout is the window within which a client accepts server replies
to in and rd requests. Replies arriving outside of this window are
silently ignored. OldTime is unified with the old timeout, and then the
timeout is set to NewTime. NewTime is of the form Seconds:Milliseconds.
A non-negative real number of seconds is also recognized. The default
is 0.250 seconds. This timeout is thread local and is not inherited
from the parent thread. New threads are initialized to the default.
Note: The synchronous behavior afforded by in/1 and rd/1 is implemented by periodically polling the server. The poll rate is set according to this timeout. Setting the timeout too small may result in substantial network traffic that is of little value.
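As a small illustration of the two accepted timeout forms, the sketch below normalizes either one to a float number of seconds, using the same arithmetic as the Seconds:Milliseconds clause of linda_timeout/2. The helper timeout_seconds/2 is hypothetical and not part of the library. Note also that the setter clauses in the source only accept values of at least 0.020 seconds.

```prolog
% timeout_seconds(+Spec, -Seconds)
% Hypothetical helper: Spec is Seconds:Milliseconds or a non-negative number.
timeout_seconds(Seconds:Milliseconds, Time) :-
    !,
    Time is float(Seconds + Milliseconds / 1000.0).
timeout_seconds(Seconds, Time) :-
    number(Seconds),
    Seconds >= 0,
    Time is float(Seconds).
```

For example, timeout_seconds(1:500, T) yields T = 1.5 and timeout_seconds(2, T) yields T = 2.0.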
:- thread_local linda_time_out/1.

linda_timeout(Time, Time) :-
    linda_time_out(Time),
    !.

linda_timeout(_OldTime, NewTime) :-
    NewTime == off,
    throw(error(feature_not_supported)).

linda_timeout(OldTime, NewTime) :-
    ground(NewTime),
    NewTime = Seconds:Milliseconds,
    NewTime1 is float(Seconds + (Milliseconds / 1000.0)),
    linda_timeout(OldTime, NewTime1),
    !.

linda_timeout(OldTime, NewTime) :-
    ground(NewTime),
    NewTime >= 0.020,
    clause(linda_time_out(OldTime), true, Ref),
    asserta(linda_time_out(NewTime)) -> erase(Ref),
    !.

linda_timeout(0.250, NewTime) :-
    NewTime >= 0.020,
    asserta(linda_time_out(NewTime)).
linda_timeout(NewTime) :-
    linda_timeout(OldTime, NewTime) ~>
        linda_timeout(NewTime, OldTime).

basic_request(Action) :-
    basic_request(Action, _Addr).

basic_request(Action, Addr) :-
    linda_timeout(Time, Time),
    broadcast_request(tipc_cluster('$linda'(Action):Addr, Time)).
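linda_timeout/1 relies on the ~> operator, which is defined in this file via eventually_implies/2: the left-hand goal runs immediately, and the right-hand goal runs later, when the choice point left behind is exhausted or pruned. The sketch below copies that definition into a standalone file and records the order of events. The predicates event/1 and demo/1 are hypothetical and serve only as instrumentation.

```prolog
:- use_module(library(debug), [assertion/1]).

:- meta_predicate eventually_implies(0, 0).

% Copied from the definition in this file.
eventually_implies(P, Q) :-
    setup_call_cleanup(P, (Foo = true ; Foo = false), assertion(Q)),
    Foo == true.

:- dynamic event/1.

% demo(-Events): record when the setup goal, the body, and the cleanup goal run.
demo(Events) :-
    retractall(event(_)),
    (   eventually_implies(assertz(event(setup)), assertz(event(cleanup))),
        assertz(event(body)),
        fail                       % backtrack into the pending choice point
    ;   true
    ),
    findall(E, event(E), Events).
```

Running demo(Events) gives Events = [setup, body, cleanup]. For linda_timeout/1 this means the new timeout is installed at once, and the old value is restored when the caller backtracks past the call or cuts its choice point.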
out(Tuple) :-
    broadcast(tipc_cluster('$linda'(out(Tuple)))),
    !.

in(Tuple) :-
    repeat,
    in_noblock(Tuple),
    !.

in_noblock(Tuple) :-
    basic_request(in(Tuple)),
    !.

in(TupleList, Tuple) :-
    must_be(list, TupleList),
    repeat,
    basic_request(in(TupleList, Tuple)),
    !.

rd(Tuple) :-
    repeat,
    rd_noblock(Tuple).

rd_noblock(Tuple) :-
    basic_request(rd(Tuple)).

rd(TupleList, Tuple) :-
    must_be(list, TupleList),
    repeat,
    basic_request(rd(TupleList, Tuple)).
?- out(x(a,3)), out(x(a,4)), out(x(b,3)), out(x(c,3)).
true.

?- bagof_rd_noblock(C-N, x(C,N), L).
L = [a-3,a-4,b-3,c-3] .
true.

?- bagof_rd_noblock(C, N^x(C,N), L).
L = [a,a,b,c] .
true.
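The backend clauses for bagof_rd_noblock delegate to bagof/3 over linda_data/1, so the role of the ^ in the tuple argument follows ordinary bagof/3 rules. The sketch below reproduces that behaviour locally, without a server. The predicates seed/0, all_names/1 and names_by_number/1 are hypothetical names introduced for the illustration.

```prolog
:- use_module(library(lists), [member/2]).

:- dynamic linda_data/1.

% Populate a local stand-in for the tuple space.
seed :-
    retractall(linda_data(_)),
    forall(member(T, [x(a,3), x(a,4), x(b,3), x(c,3)]),
           assertz(linda_data(T))).

% N is existentially quantified: one bag over all tuples.
all_names(L) :-
    bagof(C, N^linda_data(x(C, N)), L).

% N is left free: bagof/3 yields one bag per distinct N on backtracking.
names_by_number(Pairs) :-
    findall(N-L, bagof(C, linda_data(x(C, N)), L), Pairs).
```

After seed, all_names(L) gives L = [a,a,b,c], and names_by_number(P) gives P = [3-[a,b,c], 4-[a]]. Like bagof/3 itself, the bag predicates fail rather than return an empty list when nothing matches.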
bagof_rd_noblock(Template, Tuple, Bag) :-
    !, basic_request(bagof_rd_noblock(Template, Tuple, Bag)).

bagof_in_noblock(Template, Tuple, Bag) :-
    !, basic_request(bagof_in_noblock(Template, Tuple, Bag)).

:- meta_predicate
    linda_eval(?, 0),
    linda_eval(0),
    linda_eval_detached(?, 0),
    linda_eval_detached(0).
Joining Threads: Threads created using linda_eval/(1-2) are not allowed to linger. They are joined (blocking the parent, if necessary) under three conditions: backtracking on failure into a linda_eval/(1-2), receipt of an uncaught exception, and cut of choice points.

Goals are evaluated using forall/2. They are expected to provide nondeterministic behavior; that is, they may succeed zero or more times on backtracking. They must, however, eventually fail or succeed deterministically. Otherwise, the thread will hang, which will eventually hang the parent thread. Cutting choice points in the parent's body has the effect of joining all children created by the parent. This provides a barrier that guarantees that all child instances of Goal have run to completion before the parent proceeds.

Detached threads behave as above, except that they operate independently and cannot be joined. They will continue to run while the host process continues to run.
Here is an example of a parallel quicksort:
qksort([], []).

qksort([X | List], Sorted) :-
    partition(@>(X), List, Less, More),
    linda_eval(qksort(More, SortedMore)),
    qksort(Less, SortedLess), !,
    in_noblock(qksort(More, SortedMore)),
    append(SortedLess, [X | SortedMore], Sorted).
linda_eval(Head) :-
    linda_eval(Head, Head).

linda_eval(Head, Body) :-
    must_be(callable, Body),
    strip_module(Head, _Module, Plain),
    thread_create(forall(Body, out(Plain)), Id, []) ~>
        thread_join(Id, true).

linda_eval_detached(Head) :-
    linda_eval_detached(Head, Head).

linda_eval_detached(Head, Body) :-
    must_be(callable, Body),
    strip_module(Head, _Module, Plain),
    thread_create(forall(Body, out(Plain)), _Id, [detached(true)]).
Note: A virtual tuple is an extension of the server. Even though
it is operating in the client's Prolog environment, it is restricted
in the server operations that it may perform. It is generally safe
for tuple predicates to perform out/1 operations, but it is unsafe
for them to perform any variant of in or rd, either directly or
indirectly. This restriction is, however, relaxed if the server and
client are operating in separate heavyweight processes (not threads)
on the node or cluster. This is most easily achieved by starting a
stand-alone Linda server somewhere on the cluster. See
tipc_linda_server/0, below.
:- meta_predicate tuple(?, 0), tuple(0).

tuple(Head) :-
    tuple(Head, Head).

tuple(Head, Body) :-
    must_be(callable, Body),
    strip_module(Head, _Module, Plain),
    listen(user, '$linda'(rd(Plain)), Body) ~>
        unlisten(user, '$linda'(rd(Plain)), Body).
If a client performs out(server_quit), the server's Prolog process
will exit via halt/1. It is intended for use in scripting as follows:
swipl -q -g 'use_module(library(tipc/tipc_linda)), tipc_linda_server' -t 'halt(1)'
See also manual section 2.10.2.1 Using PrologScript.
Note: Prolog will return a non-zero exit status if this predicate is executed on a cluster that already has an active server. An exit status of zero is returned on graceful shutdown.
wait_for_quit :-
    linda_timeout(6.0),
    in(server_quit),
    halt(0).

tipc_linda_server :-
%   detach_IO,             % become a daemon
    tipc_initialize,
    (   linda_client(global) -> true; linda(wait_for_quit)).
A Process Communication Interface
Linda is a framework for building systems that are composed of programs that cooperate among themselves in order to realize a larger goal. A Linda application is composed of two or more processes acting in concert. One process acts as a server and the others act as clients. Fine-grained communication between client and server is provided by way of message passing over sockets and support networks, TIPC sockets in this case. Clients interact indirectly by way of the server. The server is in principle an erasable blackboard that clients can use to write (out/1), read (rd/1) and remove (in/1) messages called tuples. Some predicates will fail if a requested tuple is not present on the blackboard. Others will block until a tuple instance becomes available. Tuple instances are made available to clients by writing them on the blackboard using out/1.
In TIPC Linda, there is a subtle difference between the in and the rd predicates that is worth noting. The in predicates succeed exactly once for each tuple placed in the tuple space. The tuple is provided to exactly one requesting client. Clients can contend for tuples in this way, thus enabling multi-server operations. The rd predicates succeed nondeterministically, providing all matching tuples in the tuple space at a given time to the requesting client as a choice point without disturbing them.

TIPC Linda is inspired by and adapted from the SICStus Prolog API. But unlike SICStus TCP Linda, TIPC Linda is connectionless. There is no specific session between client and server. The server receives and responds to datagrams originated by clients in an epiperiodic manner.
Example: A simple producer-consumer.
In client 1:
In client 2:
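A minimal sketch of what the two clients might look like, using only predicates exported by this module. It assumes a Linda server is already running on the cluster, and it calls tipc_initialize/0 before linda_client/1 in the same way tipc_linda_server/0 does. The end marker p(done) and the predicate names init_producer/0, init_consumer/0 and consumer/0 are illustrative choices, and the sketch assumes tuples reach the server in the order they were written.

```prolog
:- use_module(library(tipc/tipc_linda)).

% Client 1: write three items, then an illustrative end marker.
init_producer :-
    tipc_initialize,
    linda_client(global),
    forall(between(1, 3, X), out(p(X))),
    out(p(done)).

% Client 2: remove items one at a time until the end marker arrives.
init_consumer :-
    tipc_initialize,
    linda_client(global),
    consumer.

consumer :-
    in(p(A)),                       % blocks (by polling) until a tuple is available
    (   A == done
    ->  true
    ;   format("consumed ~w~n", [A]),
        consumer
    ).
```

Each predicate is meant to be run in its own Prolog process. Because in/1 hands every tuple to exactly one requester, several consumers could be started against the same producer, although this sketch provides only a single end marker.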
Example: Synchronization
Example: A critical region
Example: Reading global data
or, without blocking:
Example: Waiting for any one of several events
Example: Producers and Consumers in the same process using linda_eval threads and/or tuple predicates
Servers
The server is the process running the "blackboard process". It is part of TIPC Linda. It is a collection of predicates that are registered as tipc_broadcast listeners. The server process can be run on a separate machine if necessary.
To load the package, enter the query:
Clients
The clients are one or more Prolog processes that have connection(s) to the server.
To load the package, enter the query: