1:- module(chan, [new/2, send/2, recv/2, recvd/2, close/1]).    2
    3:- use_module(library(error),[]).    4
    6:- multifile error:has_type/2.    7error:has_type(chan,Chan) :-
    8    once( error:has_type(tx_chan,Chan)
    9        ; error:has_type(rx_chan,Chan)
   10        ).
   11error:has_type(tx_chan,tx_chan(Q,Status)) :-
   12    is_message_queue(Q),
   13    ground(Status),
   14    memberchk(Status,[open,closed]).
   15error:has_type(rx_chan,rx_chan(Q)) :-
   16    is_message_queue(Q).
   23new(tx_chan(Q,open),rx_chan(Q)) :-
   24    message_queue_create(Q,[max_size(1)]).
   31send(tx_chan(Q,Status),Term) :-
   32    ( Status=open ->
   33        thread_send_message(Q,msg(Term),[])
   34    ; Status=closed ->
   35        throw(not_allowed_on_channel(send,tx_closed))
   36    ).
   37send(rx_chan(_)) :-
   38    throw(not_allowed_on_channel(send,rx)).
   46recv(rx_chan(Q),Term) :-
   47    catch(thread_get_message(Q,MaybeTerm),_,fail),
   48    ( MaybeTerm=close ->
   49        message_queue_destroy(Q),
   50        fail
   51    ; MaybeTerm=msg(Term) ->
   52        true
   53    ; otherwise ->
   54        throw(unexpected_channel_term(MaybeTerm))
   55    ).
   56recv(tx_chan(_,_)) :-
   57    throw(not_allowed_on_channel(recv,tx)).
   64recvd(Rx,Term) :-
   65    ( recv(Rx,Term) -> true; !, fail ).
   66recvd(Rx,Term) :-
   67    recvd(Rx,Term).
   74:- redefine_system_predicate(chan:close(_)).   75close(Tx) :-
   76    Tx=tx_chan(Q,Status),     77    !,
   78    ( Status=open ->
   79        thread_send_message(Q,close),
   80        nb_setarg(2,Tx,closed)
   81    ; Status=closed ->
   82        throw(not_allowed_on_channel(close,tx_closed))
   83    ; otherwise ->
   84        throw(unexpected_channel_status(Status))
   85    ).
   86close(rx_chan(_)) :-
   87    throw(not_allowed_on_channel(close,rx)).
   88
   89
  106
  107
  109is_message_queue(Q) :-
  110    ground(Q),
  111    catch(message_queue_property(Q,size(_)),_,fail)