View source with formatted comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker
    4    E-mail:        jan@swi-prolog.org
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2020, SWI-Prolog Solutions b.v.
    7    All rights reserved.
    8
    9    Redistribution and use in source and binary forms, with or without
   10    modification, are permitted provided that the following conditions
   11    are met:
   12
   13    1. Redistributions of source code must retain the above copyright
   14       notice, this list of conditions and the following disclaimer.
   15
   16    2. Redistributions in binary form must reproduce the above copyright
   17       notice, this list of conditions and the following disclaimer in
   18       the documentation and/or other materials provided with the
   19       distribution.
   20
   21    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   22    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   23    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   24    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   25    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   26    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   27    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   28    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   29    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   30    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   31    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   32    POSSIBILITY OF SUCH DAMAGE.
   33*/
   34
   35:- module(redis_streams,
   36          [ xstream_set/3,              % +Redis, +Key, +Option
   37            xadd/4,                     % +Redis, +Key, ?Id, +Data:dict
   38            xlisten/3,                  % +Redis, +Streams, +Options
   39            xlisten_group/5,            % +Redis, +Group, +Consumer,
   40                                        % +Streams, +Options
   41            xconsumer_stop/1            % +Leave
   42          ]).   43:- use_module(library(redis)).   44:- use_module(library(error)).   45:- use_module(library(apply)).   46:- use_module(library(broadcast)).   47:- use_module(library(lists)).   48:- use_module(library(option)).   49:- use_module(library(debug)).   50
   51:- meta_predicate
   52    xlisten(+, +, 5, 5, +).   53
   54:- multifile
   55    xhook/2.                            % +Stream, +Event
   56
   57:- predicate_options(xlisten/3, 3,
   58                     [ count(nonneg),
   59                       start(one_of([$,0])),
   60                       starts(list)
   61                     ]).   62:- predicate_options(xlisten_group/5, 5,
   63                     [ block(number),
   64                       max_deliveries(nonneg),
   65                       max_claim(nonneg)
   66                     ]).   67
   68
   69/** <module> Using Redis streams
   70
   71A Redis stream is a set of   messages consisting of key-value pairs that
   72are identified by a  time  and   sequence  number.  Streams are powerful
   73objects that can roughly be used for three purposes:
   74
   75  - Maintain and query a log of events, i.e., a _timeline_.
   76
   77  - Provide an alternative to Redis' publish/subscribe API that ensures
   78    messages get delivered by all clients even if they are offline at
   79    the moment an event is published.
   80
   81  - Distribute messages over a group of clients.  This mode assigns
   82    messages to clients in a round-robin fashion.  Clients confirm
   83    a specific message is handled.  Living clients can inspect the
   84    stream for possibly dead clients and migrate the pending messages
   85    to other clients.
   86
   87This library abstracts the latter two scenarios. The main predicates are
   88
   89  - xadd/4 to add to a stream
   90  - xlisten/3 to read and broadcast messages from a stream
   91  - xlisten_group/5 to act as a _consumer_ in a consumer group.
   92
   93@see https://redis.io/topics/streams-intro
   94*/
   95
   96:- dynamic
   97    xstream_option/3.   98
   99%!  xstream_set(+Redis, +Key, +Option)
  100%
  101%   Set an option on for Key on Redis.  Currently supports:
  102%
  103%     - maxlen(+Count)
  104%       Make xadd/4 add a ``MAXLEN ~`` Count option to the ``XADD``
  105%       command, capping the length of the stream.  See also
  106%       [Redis as a message brokering system](#redis-brokering)
  107
  108xstream_set(Redis, KeyS, Option) :-
  109    must_be(atom, Redis),
  110    atom_string(Key, KeyS),
  111    valid_option(Option),
  112    functor(Option, Name, Arity),
  113    functor(Gen, Name, Arity),
  114    retractall(xstream_option(Redis, Key, Gen)),
  115    asserta(xstream_option(Redis, Key, Option)).
  116
  117valid_option(Option) :-
  118    stream_option(Option),
  119    !.
  120valid_option(Option) :-
  121    domain_error(redis_stream_option, Option).
  122
  123stream_option(maxlen(X)) :- must_be(integer, X).
  124
  125
  126%!  xadd(+Redis, +Key, ?Id, +Data:dict) is det.
  127%
  128%   Add a message to a the stream Key on Redis. The length of the stream
  129%   can be capped using the xstream_set/3 option maxlen(Count). If Id is
  130%   unbound, generating the id is left to   the server and Id is unified
  131%   with the returned id. The returned id  is a string consisting of the
  132%   time stamp in milliseconds and a sequence number. See Redis docs for
  133%   details.
  134
  135xadd(DB, Key, Id, Dict) :-
  136    redis_array_dict(Array, _, Dict),
  137    (   var(Id)
  138    ->  IdIn = '*'
  139    ;   IdIn = Id
  140    ),
  141    (   xstream_option(DB, Key, maxlen(MaxLen))
  142    ->  Command =.. [xadd, Key, maxlen, ~, MaxLen, IdIn|Array]
  143    ;   Command =.. [xadd, Key, IdIn|Array]
  144    ),
  145    redis(DB, Command, Reply),
  146    return_id(Id, Reply).
  147
  148return_id(Id, Reply) :-
  149    var(Id),
  150    !,
  151    Id = Time-Seq,
  152    split_string(Reply, "-", "", [TimeS,SeqS]),
  153    number_string(Time, TimeS),
  154    number_string(Seq, SeqS).
  155return_id(_, _).
  156
  157
  158		 /*******************************
  159		 *           SUBSCRIBE		*
  160		 *******************************/
  161
  162%!  xlisten(+Redis, +Streams, +Options).
  163%
  164%   Listen using ``XREAD`` on one or more   Streams on the server Redis.
  165%   For each message that arrives,  call   broadcast/1,  where Data is a
  166%   dict representing the message.
  167%
  168%       broadcast(redis(Redis, Stream, Id, Data))
  169%
  170%   Options:
  171%
  172%     - count(+Count)
  173%       Process at most Count messages per stream for each request.
  174%     - start(+Start)
  175%       Normally either `0` to start get all messages from the epoch
  176%       or `$` to get messages starting with the last.  Default is `$`.
  177%     - starts(+List)
  178%       May be used as an alternative to the start/1 option to specify
  179%       the start for each stream. This may be used to restart listening
  180%       if the application remembers the last processed id.
  181%
  182%   Note that this predicate does  __not   terminate__.  It  is normally
  183%   executed in a thread. The  following   call  listens  to the streams
  184%   `key1`   and   `key2`   on   the   default   Redis   server.   Using
  185%   reconnect(true), the client will try to re-establish a connection if
  186%   the collection got lost.
  187%
  188%
  189%   ```
  190%   ?- redis_connect(default, C, [reconnect(true)]),
  191%      thread_create(xlisten(C, [key1, key2], [start($)]),
  192%                    _, [detached(true)]).
  193%   ```
  194%
  195%   @arg Redis is either a Redis server name (see redis_server/3) or
  196%   an open connection.  If it is a server name, a new connection is
  197%   opened that is closed if xlisten/3 completes.
  198%   @see redis_subscribe/2 implements the classical   pub/sub  system of
  199%   Redis that does not have any memory.
  200
  201xlisten(Redis, Streams, Options) :-
  202    xlisten(Redis, Streams, xbroadcast, xidle, Options).
  203
  204xbroadcast(Redis, Stream, Id, Dict, _Options) :-
  205    redis_id(Redis, RedisId),
  206    catch(broadcast(redis(RedisId, Stream, Id, Dict)), Error,
  207          print_message(error, Error)).
  208
  209redis_id(redis(Id, _, _), Id) :-
  210    !.
  211redis_id(Id, Id).
  212
  213xidle(_Redis, _Streams, _Starts, _NewStarts, _Options).
  214
  215%!  xlisten(+Redis, +Streams, +OnBroadCast, +OnIdle, +Options).
  216%
  217%   Generalized version of xlisten/3 that is provided two callbacks: one
  218%   to handle a message and one after each time the underlying ``XREAD``
  219%   or ``XREADGROUP`` has returned and the messages are processed. These
  220%   callbacks are called as follows:
  221%
  222%       call(OnBroadCast, +Redis, +Stream, +MessageId, +Dict)
  223%       call(OnIdle, +Redis, +Streams, +Starts, +NewStarts, +Options)
  224%
  225%   Both callbacks __must__ succeeds  and  not   leave  any  open choice
  226%   points.  Failure or exception causes xlisten/5 to stop.
  227
  228xlisten(Redis, Streams, OnBroadcast, OnIdle, Options) :-
  229    atom(Redis),
  230    !,
  231    setup_call_cleanup(
  232        redis_connect(Redis, Conn, [reconnect(true)|Options]),
  233        xlisten_(Conn, Streams, OnBroadcast, OnIdle, Options),
  234        redis_disconnect(Conn)).
  235xlisten(Redis, Streams, OnBroadcast, OnIdle, Options) :-
  236    xlisten_(Redis, Streams, OnBroadcast, OnIdle, Options).
  237
  238xlisten_(Redis, Streams, OnBroadcast, OnIdle, Options) :-
  239    xread_command(Streams, Starts0, CommandTempl, Options),
  240    listen_loop(Redis, Starts0, CommandTempl,
  241                OnBroadcast, OnIdle, Streams, Options).
  242
  243xread_command(Streams, Starts0, Starts-Command, Options) :-
  244    option(group(Group-Consumer), Options),
  245    !,
  246    xread_command_args(Streams, Starts0, Starts, CmdArgs, Options),
  247    Command =.. [xreadgroup, group, Group, Consumer | CmdArgs].
  248xread_command(Streams, Starts0, Starts-Command, Options) :-
  249    xread_command_args(Streams, Starts0, Starts, CmdArgs, Options),
  250    Command =.. [xread|CmdArgs].
  251
  252xread_command_args(Streams, Starts0, Starts, CmdArgs, Options) :-
  253    option(count(Count), Options),
  254    !,
  255    option(block(Block), Options, 0),
  256    BlockMS is integer(Block*1000),
  257    start_templ(Streams, Starts0, Starts, StreamArgs, Options),
  258    CmdArgs = [count, Count, block, BlockMS, streams | StreamArgs].
  259xread_command_args(Streams, Starts0, Starts, CmdArgs, Options) :-
  260    option(block(Block), Options, 0),
  261    BlockMS is integer(Block*1000),
  262    start_templ(Streams, Starts0, Starts, StreamArgs, Options),
  263    CmdArgs = [block, BlockMS, streams | StreamArgs].
  264
  265start_templ(Streams, Starts0, Starts, StreamArgs, Options) :-
  266    option(starts(Starts0), Options),
  267    !,
  268    length(Streams, Len),
  269    length(Starts, Len),
  270    length(Starts0, LenS),
  271    (   LenS =:= Len
  272    ->  true
  273    ;   domain_error(xread_starts, Starts0)
  274    ),
  275    append(Streams, Starts, StreamArgs).
  276start_templ(Streams, Starts0, Starts, StreamArgs, Options) :-
  277    option(start(Start), Options, $),
  278    !,
  279    length(Streams, Len),
  280    length(Starts, Len),
  281    length(Starts0, Len),
  282    maplist(=(Start), Starts0),
  283    append(Streams, Starts, StreamArgs).
  284
  285listen_loop(Redis, Starts, CommandTempl, OnBroadcast, OnIdle, Streams, Options) :-
  286    copy_term(CommandTempl, Starts-Command),
  287    (   redis(Redis, Command, Reply),
  288        Reply \== nil
  289    ->  dispatch_streams(Reply, Redis, Streams, Starts, NewStarts,
  290                         OnBroadcast, OnIdle, Options)
  291    ;   NewStarts = Starts
  292    ),
  293    call(OnIdle, Redis, Streams, Starts, NewStarts, Options),
  294    listen_loop(Redis, NewStarts, CommandTempl,
  295                OnBroadcast, OnIdle, Streams, Options).
  296
  297dispatch_streams([], _, _, Starts, NewStarts, _, _, _) :-
  298    maplist(copy_start, Starts, NewStarts).
  299dispatch_streams([Tuple|T], Redis, Streams, Starts, NewStarts,
  300                 OnBroadcast, OnIdle, Options) :-
  301    stream_tuple(Tuple, StreamS, []),
  302    atom_string(Stream, StreamS),
  303    !,                                  % xreadgroup: no more old pending stuff
  304    set_start(Stream, _Start, >, Streams, Starts, NewStarts),
  305    dispatch_streams(T, Redis, Streams, Starts, NewStarts,
  306                     OnBroadcast, OnIdle, Options).
  307dispatch_streams([Tuple|T], Redis, Streams, Starts, NewStarts,
  308                 OnBroadcast, OnIdle, Options) :-
  309    stream_tuple(Tuple, StreamS, Messages),
  310    atom_string(Stream, StreamS),
  311    set_start(Stream, Start, NewStart, Streams, Starts, NewStarts),
  312    dispatch_messages(Messages, Stream, Redis, Start, NewStart,
  313                      OnBroadcast, Options),
  314    dispatch_streams(T, Redis, Streams, Starts, NewStarts,
  315                     OnBroadcast, OnIdle, Options).
  316
  317stream_tuple(Stream-Messages, Stream, Messages) :- !.
  318stream_tuple([Stream,Messages], Stream, Messages).
  319
  320set_start(Stream, Old, New, [Stream|_], [Old|_], [New|_]) :-
  321    !.
  322set_start(Stream, Old, New, [_|Streams], [_|OldStarts], [_|NewStarts]) :-
  323    set_start(Stream, Old, New, Streams, OldStarts, NewStarts).
  324
  325copy_start(Old, New) :-
  326    (   var(New)
  327    ->  Old = New
  328    ;   true
  329    ).
  330
  331%!  dispatch_messages(+Messages, +Stream, +Redis, +Start0, -Start) is det
  332
  333dispatch_messages([], _, _, Start, Start, _, _).
  334dispatch_messages([[Start,Data]|T], Stream, Redis, Start0, NewStart,
  335                  OnBroadcast, Options) :-
  336    dispatch_message(Data, Stream, Redis, Start, OnBroadcast, Options),
  337    join_starts(Start0, Start, Start1),
  338    dispatch_messages(T, Stream, Redis, Start1, NewStart, OnBroadcast, Options).
  339
  340dispatch_message(Data, Stream, Redis, Id, OnBroadcast, Options) :-
  341    (   Data == nil                     % when does this happen?
  342    ->  Dict = redis{}
  343    ;   redis_array_dict(Data, redis, Dict)
  344    ),
  345    call(OnBroadcast, Redis, Stream, Id, Dict, Options).
  346
  347join_starts(>, _Start, >) :-
  348    !.
  349join_starts(_Start0, Start, Start).
  350
  351		 /*******************************
  352		 *           CONSUMERS		*
  353		 *******************************/
  354
  355%!  xlisten_group(+Redis, +Group, +Consumer, +Streams, +Options)
  356%
  357%   Listen as Consumer to Group. This is  similar to xlisten/3, with the
  358%   following differences:
  359%
  360%     - Instead of using broadcast/1, broadcast_request/1 is used and
  361%       the message is only considered processed if broadcast_request/1
  362%       succeeds.  If the message is handled with success, an ``XACK``
  363%       is sent to the server.
  364%
  365%   Options processed:
  366%
  367%     - block(+Seconds)
  368%       Causes ``XREADGROUP`` to return with timeout when no messages
  369%       arrive within Seconds.  On a timeout, xidle_group/5 is called
  370%       which will try to handle messages to other consumers pending
  371%       longer than Seconds. Choosing the time depends on the
  372%       application.  Notably:
  373%         - Using a time shorter than the required processing time
  374%           will make the job migrate from consumer to consumer until
  375%           max_deliveries(Count) is exceeded.  Note that the original
  376%           receiver does not notice that the job is claimed and thus
  377%           multiple consumers may ultimately answer the message.
  378%         - Using a too long time causes an unnecessarily long delay
  379%           if a node fails.
  380%     - max_deliveries(+Count)
  381%       Re-deliver (using ``XCLAIM``) a message max Count times.
  382%       Exceeding this calls xhook/2.  Default Count is `3`.
  383%     - max_claim(+Count)
  384%       Do not claim more than Count messages during a single idle
  385%       action.  Default is `10`.
  386
  387xlisten_group(Redis, Group, Consumer, Streams, Options) :-
  388    catch(xlisten(Redis, Streams, xbroadcast_group, xidle_group,
  389                  [ group(Group-Consumer),
  390                    start(0)
  391                  | Options
  392                  ]),
  393          redis(stop(Leave)),
  394          true),
  395    (   Leave == true
  396    ->  xleave_group(Redis, Group, Consumer, Streams)
  397    ;   true
  398    ).
  399
  400xbroadcast_group(Connection, Stream, Id, Dict, Options) :-
  401    option(group(Group-Consumer), Options),
  402    redis_id(Connection, RedisId),
  403    (   catch(broadcast_request(redis_consume(Stream, Dict,
  404                                              _{redis:RedisId,
  405                                                message:Id,
  406                                                group:Group,
  407                                                consumer:Consumer})),
  408              Error, xbroadcast_error(Error, Connection, Stream, Group, Id))
  409    ->  redis(Connection, xack(Stream, Group, Id))
  410    ;   true
  411    ).
  412
  413xbroadcast_error(redis(stop(Unregister)), Connection, Stream, Group, Id) :-
  414    !,
  415    redis(Connection, xack(Stream, Group, Id), _),
  416    throw(redis(stop(Unregister))).
  417xbroadcast_error(Error, _Connection, _Stream, _Group, _Id) :-
  418    print_message(error, Error),
  419    fail.
  420
  421%!  xidle_group(+Redis, +Streams, +Starts, +NewStarts, +Options) is det.
  422%
  423%   Called after ``XREADGROUP`` returns and   the  returned messages (if
  424%   any) have been processed. If `Start   == NewStarts` no messages have
  425%   been processed, indicating a timeout.
  426%
  427%   This implementation looks for idle messages   on  other consumer and
  428%   will try to claim them.
  429%
  430%   @tbd: max time to work on other consumers messages?
  431
  432xidle_group(Redis, Streams, Starts, Starts, Options) :- % Idle time
  433    !,
  434    option(group(Group-_Consumer), Options),
  435    claim(Streams, Redis, Group, 0, _Claimed, Options).
  436xidle_group(_Redis, _Streams, _Starts, _NewStarts, _Options).
  437
  438claim([], _, _, Claimed, Claimed, _).
  439claim([Stream|ST], Redis, Group, Claimed0, Claimed, Options) :-
  440    claim_for_stream(Redis, Stream, Group, Claimed0, Claimed1, Options),
  441    claim(ST, Redis, Group, Claimed1, Claimed, Options).
  442
  443claim_for_stream(Redis, Stream, Group, Claimed0, Claimed, Options) :-
  444    check_limit_claimed(Claimed0, Options),
  445    redis(Redis, xpending(Stream, Group), [Count,_Oldest,_Newest, Cons]),
  446    Count > 0,
  447    !,
  448    debug(redis(claim), '~D pending messages on ~p for ~p (Consumers = ~p)',
  449          [Count, Stream, Group, Cons]),
  450    claim_for_stream_for_consumers(Cons, Redis, Stream, Group,
  451                                   Claimed0, Claimed, Options).
  452claim_for_stream(_Redis, _Stream, _Group, Claimed, Claimed, _Options).
  453
  454claim_for_stream_for_consumers([], _Redis, _Stream, _Group,
  455                               Claimed, Claimed, _Options).
  456claim_for_stream_for_consumers([C|T], Redis, Stream, Group,
  457                               Claimed0, Claimed, Options) :-
  458    claim_for_stream_for_consumer(Redis, Stream, Group, C,
  459                                  Claimed0, Claimed1, Options),
  460    claim_for_stream_for_consumers(T, Redis, Stream, Group,
  461                                   Claimed1, Claimed, Options).
  462
  463claim_for_stream_for_consumer(Redis, Stream, Group, [Consumer,_Pending],
  464                              Claimed0, Claimed, Options) :-
  465    redis(Redis, xpending(Stream, Group, -, +, 10, Consumer), Reply),
  466    claim_messages(Reply, Redis, Stream, Group, Claimed0, Claimed, Options).
  467
  468claim_messages([], _Redis, _Stream, _Group, Claimed, Claimed, _Options).
  469claim_messages([H|T], Redis, Stream, Group, Claimed0, Claimed, Options) :-
  470    debug(redis(claim), 'Found pending ~p', [H]),
  471    claim_message(H, Redis, Stream, Group, Claimed0, Claimed1, Options),
  472    claim_messages(T, Redis, Stream, Group, Claimed1, Claimed, Options).
  473
  474claim_message([Id,Consumer,Idle,Delivered], Redis, Stream, Group,
  475              Claimed0, Claimed, Options) :-
  476    option(block(Block), Options),
  477    BlockMS is integer(Block*1000),
  478    Idle > BlockMS,
  479    check_limit_deliveries(Redis, Stream, Delivered, Id, Options),
  480    check_limit_claimed(Claimed0, Options),
  481    option(group(Group-Me), Options),
  482    debug(redis(claim), '~p: trying to claim ~p from ~p (idle ~D)',
  483          [Me, Id, Consumer, Idle]),
  484    redis(Redis, xclaim(Stream, Group, Me, BlockMS, Id), ClaimedMsgs),
  485    !,
  486    Claimed is Claimed0 + 1,
  487    debug(redis(claimed), '~p: claimed ~p', [Me, ClaimedMsgs]),
  488    dispatch_claimed(ClaimedMsgs, Redis, Stream, Options).
  489claim_message(_Msg, _Redis, _Stream, _Group, Claimed, Claimed, _Options).
  490
  491dispatch_claimed([], _Redis, _Stream, _Options).
  492dispatch_claimed([[MsgId,MsgArray]|Msgs], Redis, Stream, Options) :-
  493    redis_array_dict(MsgArray, _, Dict),
  494    xbroadcast_group(Redis, Stream, MsgId, Dict, Options),
  495    dispatch_claimed(Msgs, Redis, Stream, Options).
  496
  497
  498%!  check_limit_deliveries(+Redis, +Stream, +Delivered, +Id, +Options)
  499%
  500%   If a message gets delivered to several   nodes and none of the nodes
  501%   is able to process it, we should stop  trying to do so at some point
  502%   because the failure is  most  likely   due  to  persistent error and
  503%   piling up such messages will harm the cluster.
  504
  505check_limit_deliveries(_Redis, _Stream, Delivered, _Id, Options) :-
  506    option(max_deliveries(Max), Options, 3),
  507    Delivered =< Max,
  508    !.
  509check_limit_deliveries(Redis, Stream, Delivered, Id, Options) :-
  510    option(group(Group-_Me), Options),
  511    (   xhook(Stream, delivery_failed(Id,Group,Delivered))
  512    ->  true
  513    ;   print_message(warning, redis(delivery_failed(Id,Group,Delivered))),
  514        redis(Redis, xack(Stream, Group, Id))
  515    ),
  516    fail.
  517
  518check_limit_claimed(Claimed0, Options) :-
  519    option(max_claim(Max), Options, 10),
  520    Claimed0 < Max.
  521
  522
  523%!  xleave_group(+Redis, +Group, +Consumer, +Streams) is det.
  524%
  525%   Remove Consumer from Group.
  526%
  527%   @tbd ``XGROUP DELCONSUMER`` only takes a single stream.  Why?
  528
  529xleave_group(Redis, Group, Consumer, [Stream|_]) :-
  530    redis(Redis, xgroup(delconsumer, Stream, Group, Consumer), _).
  531
  532%!  xconsumer_stop(+Leave)
  533%
  534%   May be called from a consumer listener   to  stop the consumer. This
  535%   predicate throws the exception redis(stop(Leave)),   which is caught
  536%   by xlisten_group/5.
  537
  538xconsumer_stop(Leave) :-
  539    throw(redis(stop(Leave))).
  540
  541
  542		 /*******************************
  543		 *             HOOKS		*
  544		 *******************************/
  545
  546%!  xhook(+Stream, +Event)
  547%
  548%   This multifile predicate is called on certain stream events. Defined
  549%   events are:
  550%
  551%     - delivery_failed(Id,Group,Delivered)
  552%       A message was delivered more than specified by max_deliveries/1
  553%       of xlisten_group/5.  Id is the message id, Group the group and
  554%       Delivered the current delivery count.  If the hooks fails, the
  555%       message is acknowledged using ``XACK``.  From [introduction
  556%       to streams](https://redis.io/topics/streams-intro):
  557%
  558%       > "So once the deliveries counter reaches a given large number
  559%       > that you chose, it is probably wiser to put such messages in
  560%       > another stream and send a notification to the system
  561%       > administrator. This is basically the way that Redis streams
  562%       > implement the concept of the dead letter."
  563
  564
  565		 /*******************************
  566		 *            MESSAGES		*
  567		 *******************************/
  568
  569:- multifile prolog:message//1.  570
  571prolog:message(redis(Message)) -->
  572    [ 'REDIS: '-[] ],
  573    redis_message(Message).
  574
  575redis_message(delivery_failed(Id,Group,Delivered)) -->
  576    [ 'Failed to deliver ~p to group ~p (tried ~D times)'-
  577      [Id, Group, Delivered]
  578    ]