1:- module(chan, [new/2, send/2, recv/2, recvd/2, close/1]).    2
    3:- use_module(library(error),[]).    4
    5% define types for library(error), etc.
    :- multifile error:has_type/2.

% A term has type `chan` when it is accepted as either channel endpoint:
% a transmit channel (tx_chan) or a receive channel (rx_chan).
% Succeeds at most once.
error:has_type(chan, Chan) :-
    (   error:has_type(tx_chan, Chan)
    ->  true
    ;   error:has_type(rx_chan, Chan)
    ->  true
    ).
   % A transmit channel is a term tx_chan(Queue, State) whose Queue is accepted
   % by is_message_queue/1 and whose State is ground and is `open` or `closed`.
error:has_type(tx_chan, Chan) :-
    Chan = tx_chan(Queue, State),
    is_message_queue(Queue),
    ground(State),
    % State is ground here, so identity tests are equivalent to unifying
    % it against each allowed status in turn.
    (   State == open
    ->  true
    ;   State == closed
    ).
   % A receive channel is a term rx_chan(Queue) whose Queue is accepted by
   % is_message_queue/1.
error:has_type(rx_chan, Chan) :-
    Chan = rx_chan(Queue),
    is_message_queue(Queue).
 new(-Tx:tx_chan, -Rx:rx_chan) is det
Create a new channel for transmitting (Tx) and receiving (Rx) terms. The channel has capacity to hold one term at a time.
   23new(tx_chan(Q,open),rx_chan(Q)) :-
   24    message_queue_create(Q,[max_size(1)]).
 send(+Tx:ch_chan, +Term) is det
Send Term down the Tx channel. Blocks if the channel is full. Throws an exception if called with a receive channel.
   31send(tx_chan(Q,Status),Term) :-
   32    ( Status=open ->
   33        thread_send_message(Q,msg(Term),[])
   34    ; Status=closed ->
   35        throw(not_allowed_on_channel(send,tx_closed))
   36    ).
   37send(rx_chan(_)) :-
   38    throw(not_allowed_on_channel(send,rx)).
 recv(+Rx:rx_chan, +Term) is semidet
Receive the next Term from an Rx channel. Blocks until a term arrives. Fails for a closed, receive channel. Throws an exception for a transmit channel.
   46recv(rx_chan(Q),Term) :-
   47    catch(thread_get_message(Q,MaybeTerm),_,fail),
   48    ( MaybeTerm=close ->
   49        message_queue_destroy(Q),
   50        fail
   51    ; MaybeTerm=msg(Term) ->
   52        true
   53    ; otherwise ->
   54        throw(unexpected_channel_term(MaybeTerm))
   55    ).
   56recv(tx_chan(_,_)) :-
   57    throw(not_allowed_on_channel(recv,tx)).
 recvd(+Rx:rx_chan, -Term) is multi
True if Term is a message received on Rx. Iterates terms on backtracking until Rx closes.
   64recvd(Rx,Term) :-
   65    ( recv(Rx,Term) -> true; !, fail ).
   66recvd(Rx,Term) :-
   67    recvd(Rx,Term).
 close(+Tx:tx_chan) is det
Close the Tx channel. Closing an already closed channel throws an exception. Trying to close a receive channel throws an exception.
   74:- redefine_system_predicate(chan:close(_)).   75close(Tx) :-
   76    Tx=tx_chan(Q,Status),  % for nb_setarg/3
   77    !,
   78    ( Status=open ->
   79        thread_send_message(Q,close),
   80        nb_setarg(2,Tx,closed)
   81    ; Status=closed ->
   82        throw(not_allowed_on_channel(close,tx_closed))
   83    ; otherwise ->
   84        throw(unexpected_channel_status(Status))
   85    ).
   86close(rx_chan(_)) :-
   87    throw(not_allowed_on_channel(close,rx)).
   88
   89
   90/*
   91
   92not sure this API can make sense without a blocking message_queue_peek to handle
   93empty queues.
   94
   95%% is_closed(+Chan:chan) is semidet.
   96%
   97%  True if Chan is closed.  Never blocks for a transmit channel.  May block
   98%  for an empty receive channel.
   99is_closed(tx_chan(_,closed)).
  100is_closed(rx_chan(Q)) :-
  101    catch(thread_get_message(Q,closed,[timeout(0)]),E,true),
  102    once( nonvar(E)                % queue is gone, so channel is closed
  103        ; message_queue_destroy(Q) % fetched 'closed', channel is now closed
  104        ).
  105*/
  106
  107
  108% true if Q is a message queue
  109is_message_queue(Q) :-
  110    ground(Q),
  111    catch(message_queue_property(Q,size(_)),_,fail)