View source with formatted comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker and Sean Charles
    4    E-mail:        jan@swi-prolog.org and <sean at objitsu dot com>
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2013-2022, Sean Charles
    7                              SWI-Prolog Solutions b.v.
    8    All rights reserved.
    9
   10    Redistribution and use in source and binary forms, with or without
   11    modification, are permitted provided that the following conditions
   12    are met:
   13
   14    1. Redistributions of source code must retain the above copyright
   15       notice, this list of conditions and the following disclaimer.
   16
   17    2. Redistributions in binary form must reproduce the above copyright
   18       notice, this list of conditions and the following disclaimer in
   19       the documentation and/or other materials provided with the
   20       distribution.
   21
   22    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   23    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   24    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   25    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   26    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   27    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   28    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   29    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   30    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   31    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   32    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   33    POSSIBILITY OF SUCH DAMAGE.
   34
   35    NOTE
   36
   37    The original code was subject to the MIT licence and written by
   38    Sean Charles.  Re-licenced to standard SWI-Prolog BSD-2 with
   39    permission from Sean Charles.
   40*/
   41
   42:- module(redis,
   43          [ redis_server/3,             % +Alias, +Address, +Options
   44            redis_connect/1,            % -Connection
   45            redis_connect/3,            % -Connection, +Host, +Port
   46            redis_disconnect/1,         % +Connection
   47            redis_disconnect/2,         % +Connection, +Options
   48                                        % Queries
   49            redis/1,                    % +Request
   50            redis/2,                    % +Connection, +Request
   51            redis/3,                    % +Connection, +Request, -Reply
   52                                        % High level queries
   53            redis_get_list/3,           % +Redis, +Key, -List
   54            redis_get_list/4,           % +Redis, +Key, +ChunkSize, -List
   55            redis_set_list/3,           % +Redis, +Key, +List
   56            redis_get_hash/3,           % +Redis, +Key, -Data:dict
   57            redis_set_hash/3,           % +Redis, +Key, +Data:dict
   58            redis_scan/3,               % +Redis, -LazyList, +Options
   59            redis_sscan/4,              % +Redis, +Set, -LazyList, +Options
   60            redis_hscan/4,              % +Redis, +Hash, -LazyList, +Options
   61            redis_zscan/4,              % +Redis, +Set, -LazyList, +Options
   62                                        % Publish/Subscribe
   63            redis_subscribe/4,          % +Redis, +Channels, -Id, +Options
   64            redis_subscribe/2,          % +Id, +Channels
   65            redis_unsubscribe/2,        % +Id, +Channels
   66            redis_current_subscription/2, % ?Id,?Channels
   67            redis_write/2,              % +Redis, +Command
   68            redis_read/2,               % +Redis, -Reply
   69                                        % Building blocks
   70            redis_array_dict/3,         % ?Array, ?Tag, ?Dict
   71                                        % Admin stuff
   72            redis_property/2,           % +Reply, ?Property
   73            redis_current_command/2,    % +Redis,?Command
   74            redis_current_command/3     % +Redis, +Command, -Properties
   75          ]).   76:- autoload(library(socket), [tcp_connect/3]).   77:- autoload(library(apply), [maplist/2, convlist/3, maplist/3, maplist/5]).   78:- autoload(library(broadcast), [broadcast/1]).   79:- autoload(library(error),
   80            [ must_be/2,
   81	      type_error/2,
   82              instantiation_error/1,
   83              uninstantiation_error/1,
   84              existence_error/2,
   85              existence_error/3
   86            ]).   87:- autoload(library(lazy_lists), [lazy_list/2]).   88:- autoload(library(lists), [append/3, member/2]).   89:- autoload(library(option), [merge_options/3, option/2,
   90			      option/3, select_option/4]).   91:- autoload(library(pairs), [group_pairs_by_key/2]).   92:- autoload(library(time), [call_with_time_limit/2]).   93:- use_module(library(debug), [debug/3, assertion/1]).   94:- use_module(library(settings), [setting/4, setting/2]).   95:- if(exists_source(library(ssl))).   96:- autoload(library(ssl), [ssl_context/3, ssl_negotiate/5]).   97:- endif.   98
   99:- use_foreign_library(foreign(redis4pl)).  100
  101:- setting(max_retry_count, nonneg, 8640, % one day
  102           "Max number of retries").  103:- setting(max_retry_wait, number, 10,
  104           "Max time to wait between recovery attempts").  105:- setting(sentinel_timeout, number, 0.2,
  106	   "Time to wait for a sentinel").  107
  108:- predicate_options(redis_server/3, 3,
  109                     [ pass_to(redis:redis_connect/3, 3)
  110                     ]).  111:- predicate_options(redis_connect/3, 3,
  112                     [ reconnect(boolean),
  113                       user(atom),
  114                       password(atomic),
  115                       version(between(2,3))
  116                     ]).  117:- predicate_options(redis_disconnect/2, 2,
  118                     [ force(boolean)
  119                     ]).  120:- predicate_options(redis_scan/3, 3,
  121                     [ match(atomic),
  122                       count(nonneg),
  123                       type(atom)
  124                     ]).  125% Actually not passing, but the same
  126:- predicate_options(redis_sscan/4, 4, [pass_to(redis:redis_scan/3, 3)]).  127:- predicate_options(redis_hscan/4, 4, [pass_to(redis:redis_scan/3, 3)]).  128:- predicate_options(redis_zscan/4, 4, [pass_to(redis:redis_scan/3, 3)]).  129
  130
  131/** <module> Redis client
  132
  133This library is a client  to   [Redis](https://redis.io),  a popular key
  134value store to  deal  with  caching   and  communication  between  micro
  135services.
  136
  137In the typical use case we register  the   details  of one or more Redis
  138servers using redis_server/3. Subsequently, redis/2-3  is  used to issue
  139commands on the server.  For example:
  140
  141```
  142?- redis_server(default, redis:6379, [password("secret")]).
  143?- redis(default, set(user, "Bob")).
  144?- redis(default, get(user), User).
  145User = "Bob"
  146```
  147*/
  148
  149:- dynamic server/3.  150
  151:- dynamic ( connection/2,              % ServerName, Stream
  152	     sentinel/2			% Pool, Address
  153           ) as volatile.  154
  155%!  redis_server(+ServerName, +Address, +Options) is det.
  156%
  157%   Register a redis server without  connecting   to  it. The ServerName
  158%   acts as a lazy connection alias.  Initially the ServerName `default`
  159%   points at `localhost:6379` with no   connect  options. The `default`
  160%   server is used for redis/1 and redis/2 and may be changed using this
  161%   predicate.  Options are described with redis_connect/3.
  162%
  163%   Connections  established  this  way  are  by  default  automatically
  164%   reconnected if the connection  is  lost   for  some  reason unless a
  165%   reconnect(false) option is specified.
  166
  167redis_server(Alias, Address, Options) :-
  168    must_be(ground, Alias),
  169    retractall(server(Alias, _, _)),
  170    asserta(server(Alias, Address, Options)).
  171
  172server(default, localhost:6379, []).
  173
  174%!  redis_connect(-Connection) is det.
  175%!  redis_connect(+Address, -Connection, +Options) is det.
  176%!  redis_connect(-Connection, +Host, +Port) is det.
  177%
  178%   Connect to a redis server. The  main mode is redis_connect(+Address,
  179%   -Connection,   +Options).   redis_connect/1   is     equivalent   to
  180%   redis_connect(localhost:6379, Connection, []).  Options:
  181%
  182%     - reconnect(+Boolean)
  183%       If `true`, try to reconnect to the service when the connection
  184%       seems lost.  Default is `true` for connections specified using
  185%       redis_server/3 and `false` for explicitly opened connections.
  186%     - user(+User)
  187%       If version(3) and password(Password) are specified, these
  188%       are used to authenticate using the `HELLO` command.
  189%     - password(+Password)
  190%       Authenticate using Password
  191%     - version(+Version)
  192%       Specify the connection protocol version.  Initially this is
  193%       version 2.  Redis 6 also supports version 3.  When specified
  194%       as `3`, the `HELLO` command is used to upgrade the protocol.
  195%     - tls(true)
  196%       When specified, initiate a TLS connection.  If this option is
  197%       specified we must also specify the `cacert`, `key` and `cert`
  198%       options.
  199%     - cacert(+File)
  200%       CA Certificate file to verify with.
  201%     - cert(+File)
  202%       Client certificate to authenticate with.
  203%     - key(+File)
  204%       Private key file to authenticate with.
  205%     - sentinels(+ListOfAddresses)
  206%       Used together with an Address of the form sentinel(MasterName)
  207%       to enable contacting a network of Redis servers guarded by a
  208%       sentinel network.
  209%     - sentinel_user(+User)
  210%     - sentinel_password(+Password)
  211%       Authentication information for the sentinels.  When omitted we
  212%       try to connect without authentication.
  213%
  214%   Instead of using these predicates, redis/2  and redis/3 are normally
  215%   used with a _server name_  argument registered using redis_server/3.
  216%   These  predicates  are  meant  for  creating  a  temporary  parallel
  217%   connection or using a connection with a _blocking_ call.
  218%
  219%   @compat   redis_connect(-Connection,   +Host,     +Port)    provides
  220%   compatibility to the original GNU-Prolog interface and is equivalent
  221%   to redis_connect(Host:Port, Connection, []).
  222%
  223%   @arg Address is a term Host:Port, unix(File) or the name of a server
  224%   registered  using  redis_server/3.  The  latter   realises  a  _new_
  225%   connection that is typically used for   blocking redis commands such
  226%   as listening for published messages, waiting on a list or stream.
  227
  228redis_connect(Conn) :-
  229    redis_connect(default, Conn, []).
  230
  231redis_connect(Conn, Host, Port) :-
  232    var(Conn),
  233    ground(Host), ground(Port),
  234    !,                                  % GNU-Prolog compatibility
  235    redis_connect(Host:Port, Conn, []).
  236redis_connect(Server, Conn, Options) :-
  237    atom(Server),
  238    !,
  239    (   server(Server, Address, DefaultOptions)
  240    ->  merge_options(Options, DefaultOptions, Options2),
  241        do_connect(Server, Address, Conn, [address(Address)|Options2])
  242    ;   existence_error(redis_server, Server)
  243    ).
  244redis_connect(Address, Conn, Options) :-
  245    do_connect(Address, Address, Conn, [address(Address)|Options]).
  246
  247%!  do_connect(+Id, +Address, -Conn, +Options)
  248%
  249%   Open the connection.  A connection is a compound term of the shape
  250%
  251%       redis_connection(Id, Stream, Failures, Options)
  252
  253do_connect(Id, sentinel(Pool), Conn, Options) =>
  254    sentinel_master(Id, Pool, Conn, Options).
  255do_connect(Id, Address0, Conn, Options) =>
  256    tcp_address(Address0, Address),
  257    tcp_connect(Address, Stream0, Options),
  258    tls_upgrade(Address, Stream0, Stream, Options),
  259    Conn = redis_connection(Id, Stream, 0, Options),
  260    hello(Conn, Options).
  261
  262tcp_address(unix(Path), Path) :-
  263    !.                                  % Using an atom is ambiguous
  264tcp_address(Address, Address).
  265
  266%!  tls_upgrade(+Address, +Raw, -Stream, +Options) is det.
  267%
  268%   Upgrade to a TLS connection when tls(true) is specified.
  269
  270:- if(current_predicate(ssl_context/3)).  271tls_upgrade(Host:_Port, Raw, Stream, Options) :-
  272    option(tls(true), Options),
  273    !,
  274    must_have_option(cacert(CacertFile), Options),
  275    must_have_option(key(KeyFile), Options),
  276    must_have_option(cert(CertFile), Options),
  277    ssl_context(client, SSL,
  278		[ host(Host),
  279		  certificate_file(CertFile),
  280		  key_file(KeyFile),
  281		  cacerts([file(CacertFile)]),
  282		  cert_verify_hook(tls_verify),
  283		  close_parent(true)
  284		]),
  285    stream_pair(Raw, RawRead, RawWrite),
  286    ssl_negotiate(SSL, RawRead, RawWrite, Read, Write),
  287    stream_pair(Stream, Read, Write).
  288:- endif.  289tls_upgrade(_, Stream, Stream, _).
  290
  291:- if(current_predicate(ssl_context/3)).  292
  293%!  tls_verify(+SSL, +ProblemCert, +AllCerts, +FirstCert, +Status) is semidet.
  294%
  295%   Accept  or reject  the certificate  verification.  Similar  to the
  296%   Redis  command   line  client   (``redis-cli``),  we   accept  the
  297%   certificate as long as it is signed, not verifying the hostname.
  298
  299:- public tls_verify/5.  300tls_verify(_SSL, _ProblemCert, _AllCerts, _FirstCert, verified) :-
  301    !.
  302tls_verify(_SSL, _ProblemCert, _AllCerts, _FirstCert, hostname_mismatch) :-
  303    !.
  304tls_verify(_SSL, _ProblemCert, _AllCerts, _FirstCert, _Error) :-
  305    fail.
  306
  307:- endif.  308
  309%!  sentinel_master(+ServerId, +SetinelPool, -Connection, +Options) is det.
  310%
  311%   Discover the master and connect to it.
  312
  313sentinel_master(Id, Pool, Master, Options) :-
  314    must_have_option(sentinels(Sentinels), Options),
  315    sentinel_auth(Options, Options1),
  316    setting(sentinel_timeout, TMO),
  317    (   sentinel(Pool, Sentinel)
  318    ;   member(Sentinel, Sentinels)
  319    ),
  320    catch(call_with_time_limit(
  321	      TMO,
  322	      do_connect(Id, Sentinel, Conn,
  323			 [sentinel(true)|Options1])),
  324	  Error,
  325	  (print_message(warning, Error),fail)),
  326    !,
  327    debug(redis(sentinel), 'Connected to sentinel at ~p', [Sentinel]),
  328    call_cleanup(
  329	query_sentinel(Pool, Conn, Sentinel, MasterAddr),
  330	redis_disconnect(Conn)),
  331    debug(redis(sentinel), 'Sentinel claims master is at ~p', [MasterAddr]),
  332    do_connect(Id, MasterAddr, Master, Options),
  333    debug(redis(sentinel), 'Connected to claimed master', []),
  334    redis(Master, role, Role),
  335    (   Role = [master|_Slaves]
  336    ->  debug(redis(sentinel), 'Verified role at ~p', [MasterAddr])
  337    ;   redis_disconnect(Master),
  338	debug(redis(sentinel), '~p is not the master: ~p', [MasterAddr, Role]),
  339	sleep(TMO),
  340	sentinel_master(Id, Pool, Master, Options)
  341    ).
  342
  343sentinel_auth(Options0, Options) :-
  344    option(sentinel_user(User), Options0),
  345    option(sentinel_password(Passwd), Options0),
  346    !,
  347    merge_options([user(User), password(Passwd)], Options0, Options).
  348sentinel_auth(Options0, Options) :-
  349    select_option(password(_), Options0, Options, _).
  350
  351
  352query_sentinel(Pool, Conn, Sentinel, Host:Port) :-
  353    redis(Conn, sentinel('get-master-addr-by-name', Pool), MasterData),
  354    MasterData = [Host,Port],
  355    redis(Conn, sentinel(sentinels, Pool), Peers),
  356    transaction(update_known_sentinels(Pool, Sentinel, Peers)).
  357
  358update_known_sentinels(Pool, Sentinel, Peers) :-
  359    retractall(sentinel(Pool, _)),
  360    maplist(update_peer_sentinel(Pool), Peers),
  361    asserta(sentinel(Pool, Sentinel)).
  362
  363update_peer_sentinel(Pool, Attrs),
  364  memberchk(ip-Host, Attrs),
  365  memberchk(port-Port, Attrs) =>
  366    asserta(sentinel(Pool, Host:Port)).
  367
  368must_have_option(Opt, Options) :-
  369    option(Opt, Options),
  370    !.
  371must_have_option(Opt, Options) :-
  372    existence_error(option, Opt, Options).
  373
  374%!  hello(+Connection, +Option)
  375%
  376%   Initialize the connection. This is  used   to  upgrade  to the RESP3
  377%   protocol and/or to authenticate.
  378
  379hello(Con, Options) :-
  380    option(version(V), Options),
  381    V >= 3,
  382    !,
  383    (   option(user(User), Options),
  384        option(password(Password), Options)
  385    ->  redis(Con, hello(3, auth, User, Password))
  386    ;   redis(Con, hello(3))
  387    ).
  388hello(Con, Options) :-
  389    option(password(Password), Options),
  390    !,
  391    redis(Con, auth(Password)).
  392hello(_, _).
  393
  394%!  redis_stream(+Spec, --Stream, +DoConnect) is det.
  395%
  396%   Get the stream to a Redis server from  Spec. Spec is either the name
  397%   of       a       registered       server       or       a       term
  398%   redis_connection(Id,Stream,Failures,Options).  If  the    stream  is
  399%   disconnected it will be reconnected.
  400
  401redis_stream(Var, S, _) :-
  402    (   var(Var)
  403    ->  !, instantiation_error(Var)
  404    ;   nonvar(S)
  405    ->  !, uninstantiation_error(S)
  406    ).
  407redis_stream(ServerName, S, Connect) :-
  408    atom(ServerName),
  409    !,
  410    (   connection(ServerName, S0)
  411    ->  S = S0
  412    ;   Connect == true,
  413        server(ServerName, Address, Options)
  414    ->  redis_connect(Address, Connection, Options),
  415        redis_stream(Connection, S, false),
  416        asserta(connection(ServerName, S))
  417    ;   existence_error(redis_server, ServerName)
  418    ).
  419redis_stream(redis_connection(_,S0,_,_), S, _) :-
  420    S0 \== (-),
  421    !,
  422    S = S0.
  423redis_stream(Redis, S, _) :-
  424    Redis = redis_connection(Id,-,_,Options),
  425    option(address(Address), Options),
  426    do_connect(Id,Address,Redis2,Options),
  427    arg(2, Redis2, S0),
  428    nb_setarg(2, Redis, S0),
  429    S = S0.
  430
  431has_redis_stream(Var, _) :-
  432    var(Var),
  433    !,
  434    instantiation_error(Var).
  435has_redis_stream(Alias, S) :-
  436    atom(Alias),
  437    !,
  438    connection(Alias, S).
  439has_redis_stream(redis_connection(_,S,_,_), S) :-
  440    S \== (-).
  441
  442
  443%!  redis_disconnect(+Connection) is det.
  444%!  redis_disconnect(+Connection, +Options) is det.
  445%
  446%   Disconnect from a redis server. The   second  form takes one option,
  447%   similar to close/2:
  448%
  449%     - force(Force)
  450%       When `true` (default `false`), do not raise any errors if
  451%       Connection does not exist or closing the connection raises
  452%       a network or I/O related exception.  This version is used
  453%       internally if a connection is in a broken state, either due
  454%       to a protocol error or a network issue.
  455
  456redis_disconnect(Redis) :-
  457    redis_disconnect(Redis, []).
  458
  459redis_disconnect(Redis, Options) :-
  460    option(force(true), Options),
  461    !,
  462    (   Redis = redis_connection(_Id, S, _, _Opts)
  463    ->  (   S == (-)
  464        ->  true
  465        ;   close(S, [force(true)]),
  466            nb_setarg(2, Redis, -)
  467        )
  468    ;   has_redis_stream(Redis, S)
  469    ->  close(S, [force(true)]),
  470        retractall(connection(_,S))
  471    ;   true
  472    ).
  473redis_disconnect(Redis, _Options) :-
  474    redis_stream(Redis, S, false),
  475    close(S),
  476    retractall(connection(_,S)).
  477
  478%!  redis(+Connection, +Request) is semidet.
  479%
  480%   This predicate is overloaded to handle two types of requests. First,
  481%   it is a shorthand for `redis(Connection, Command, _)` and second, it
  482%   can be used to exploit  Redis   _pipelines_  and _transactions_. The
  483%   second form is activated if Request is  a _list_. In that case, each
  484%   element of the list is either a term  `Command -> Reply` or a simple
  485%   `Command`. Semantically this represents a   sequence  of redis/3 and
  486%   redis/2 calls.  It differs in the following aspects:
  487%
  488%     - All commands are sent in one batch, after which all replies are
  489%       read.  This reduces the number of _round trips_ and typically
  490%       greatly improves performance.
  491%     - If the first command is `multi` and the last `exec`, the
  492%       commands are executed as a Redis _transaction_, i.e., they
  493%       are executed _atomically_.
  494%     - If one of the commands returns an error, the subsequent commands
  495%       __are still executed__.
  496%     - You can not use variables from commands earlier in the list for
  497%       commands later in the list as a result of the above execution
  498%       order.
  499%
  500%   Procedurally, the process takes the following steps:
  501%
  502%     1. Send all commands
  503%     2. Read all replies and push messages
  504%     3. Handle all callbacks from push messages
  505%     4. Check whether one of the replies is an error.  If so,
  506%        raise this error (subsequent errors are lost)
  507%     5. Bind all replies for the `Command -> Reply` terms.
  508%
  509%   Examples
  510%
  511%   ```
  512%   ?- redis(default,
  513%            [ lpush(li,1),
  514%              lpush(li,2),
  515%              lrange(li,0,-1) -> List
  516%            ]).
  517%   List = ["2", "1"].
  518%   ```
  519
  520redis(Redis, PipeLine) :-
  521    is_list(PipeLine),
  522    !,
  523    redis_pipeline(Redis, PipeLine).
  524redis(Redis, Req) :-
  525    redis(Redis, Req, _).
  526
  527%!  redis(+Connection, +Command, -Reply) is semidet.
  528%
  529%   Execute a redis Command on  Connection.   Next,  bind   Reply to the
  530%   returned result. Command is a  callable   term  whose functor is the
  531%   name of the Redis command  and   whose  arguments  are translated to
  532%   Redis arguments according to the rules below.  Note that all text is
  533%   always represented using UTF-8 encoding.
  534%
  535%     - Atomic values are emitted verbatim
  536%     - A term A:B:... where all arguments are either atoms,
  537%       strings or integers (__no floats__) is translated into
  538%       a string `"A:B:..."`.  This is a common shorthand for
  539%       representing Redis keys.
  540%     - A term Term as prolog is emitted as "\u0000T\u0000" followed
  541%       by Term in canonical form.
  542%     - Any other term is emitted as write/1.
  543%
  544%   Reply is either a plain term (often a  variable) or a term `Value as
  545%   Type`. In the latter form,  `Type`   dictates  how  the Redis _bulk_
  546%   reply is translated to Prolog. The default is `auto`, i.e.,
  547%   as a number if the content satisfies the Prolog number syntax and
  548%   as an atom otherwise.
  549%
  550%     - status(Atom)
  551%       Returned if the server replies with ``+ Status``.  Atom
  552%       is the textual value of `Status` converted to lower case,
  553%       e.g., status(ok) or status(pong).
  554%     - `nil`
  555%       This atom is returned for a NIL/NULL value.  Note that if
  556%       the reply is only `nil`, redis/3 _fails_.  The `nil` value
  557%       may be embedded inside lists or maps.
  558%     - A number
  559%       Returned if the server replies an integer (":Int"), double
  560%       (",Num") or big integer ("(Num")
  561%     - A string
  562%       Returned on a _bulk_ reply.  Bulk replies are supposed to be
  563%       in UTF-8 encoding.  If the bulk reply starts with
  564%       "\u0000T\u0000" it is supposed to be a Prolog term.
  565%       Note that this interpretation means it is __not__ possible
  566%       to read arbitrary binary blobs.
  567%     - A list of replies.  A list may also contain `nil`.  If Reply
  568%       as a whole would be `nil` the call fails.
  569%     - A list of _pairs_.  This is returned for the redis version 3
  570%       protocol "%Map".  Both the key and value respect the same
  571%       rules as above.
  572%
  573%   Redis _bulk_ replies are translated depending  on the `as` `Type` as
  574%   explained above.
  575%
  576%     - string
  577%     - string(Encoding)
  578%       Create a SWI-Prolog string object interpreting the blob as
  579%       following Encoding. Encoding is a restricted set of SWI-Prolog's
  580%       encodings: `bytes` (`iso_latin_1`), `utf8` and `text` (the
  581%       current locale translation).
  582%     - atom
  583%     - atom(Encoding)
  584%       As above, producing an atom.
  585%     - codes
  586%     - codes(Encoding)
  587%       As above, producing a list of integers (Unicode code points)
  588%     - chars
  589%     - chars(Encoding)
  590%       As above, producing a list of one-character atoms.
  591%     - integer
  592%     - float
  593%     - rational
  594%     - number
  595%       Interpret the bytes as a string representing a number.  If
  596%       the string does not represent a number of the requested type
  597%       a type_error(Type, String) is raised.
  598%     - tagged_integer
  599%       Same as integer, but demands the value to be between the Prolog
  600%       flags `min_tagged_integer` and `max_tagged_integer`, allowing
  601%       the value to be used as a dict key.
  602%     - auto
  603%       Same as auto(atom, number)
  604%     - auto(AsText,AsNumber)
  605%       If the bulk string conforms to the syntax of AsNumber, convert
  606%       the value to the requested numerical type.  Else convert
  607%       the value to text according to AsText.  This is similar to
  608%       the Prolog predicate name/2.
  609%     - dict_key
  610%       Alias for auto(atom,tagged_integer).  This allows the value
  611%       to be used as a key for a SWI-Prolog dict.
  612%     - pairs(AsKey, AsValue)
  613%       Convert a map or array of even length into pairs for which the
  614%       key satisfies AsKey and the value AsValue.  The `pairs` type
  615%       can also be applied to a Redis array.  In this case the array
  616%       length must be even.  This notably allows fetching a Redis
  617%       _hash_ as pairs using ``HGETALL`` using version 2 of the
  618%       Redis protocol.
  619%     - dict(AsKey, AsValue)
  620%       Similar to pairs(AsKey, AsValue), but convert the resulting
  621%       pair list into a SWI-Prolog dict.  AsKey must convert to a
  622%       valid dict key, i.e., an atom or tagged integer. See `dict_key`.
  623%     - dict(AsValue)
  624%       Shorthand for dict(dict_key, AsValue).
  625%
  626%   Here are some simple examples
  627%
  628%   ```
  629%   ?- redis(default, set(a, 42), X).
  630%   X = status("OK").
  631%   ?- redis(default, get(a), X).
  632%   X = "42".
  633%   ?- redis(default, get(a), X as integer).
  634%   X = 42.
  635%   ?- redis(default, get(a), X as float).
  636%   X = 42.0.
  637%   ?- redis(default, set(swipl:version, 8)).
  638%   true.
  639%   ?- redis(default, incr(swipl:version), X).
  640%   X = 9.
  641%   ```
  642%
  643%   @error redis_error(Code, String)
  644
  645redis(Redis, Req, Out) :-
  646    out_val(Out, Val),
  647    redis1(Redis, Req, Out),
  648    Val \== nil.
  649
  650out_val(Out, Val) :-
  651    (   nonvar(Out),
  652        Out = (Val as _)
  653    ->  true
  654    ;   Val = Out
  655    ).
  656
  657redis1(Redis, Req, Out) :-
  658    Error = error(Formal, _),
  659    catch(redis2(Redis, Req, Out), Error, true),
  660    (   var(Formal)
  661    ->  true
  662    ;   recover(Error, Redis, redis1(Redis, Req, Out))
  663    ).
  664
  665redis2(Redis, Req, Out) :-
  666    atom(Redis),
  667    !,
  668    redis_stream(Redis, S, true),
  669    with_mutex(Redis,
  670               ( redis_write_msg(S, Req),
  671                 redis_read_stream(Redis, S, Out)
  672               )).
  673redis2(Redis, Req, Out) :-
  674    redis_stream(Redis, S, true),
  675    redis_write_msg(S, Req),
  676    redis_read_stream(Redis, S, Out).
  677
  678%!  redis_pipeline(+Redis, +PipeLine)
  679
  680redis_pipeline(Redis, PipeLine) :-
  681    Error = error(Formal, _),
  682    catch(redis_pipeline2(Redis, PipeLine), Error, true),
  683    (   var(Formal)
  684    ->  true
  685    ;   recover(Error, Redis, redis_pipeline(Redis, PipeLine))
  686    ).
  687
  688redis_pipeline2(Redis, PipeLine) :-
  689    atom(Redis),
  690    !,
  691    redis_stream(Redis, S, true),
  692    with_mutex(Redis,
  693               redis_pipeline3(Redis, S, PipeLine)).
  694redis_pipeline2(Redis, PipeLine) :-
  695    redis_stream(Redis, S, true),
  696    redis_pipeline3(Redis, S, PipeLine).
  697
  698redis_pipeline3(Redis, S, PipeLine) :-
  699    maplist(write_pipeline(S), PipeLine),
  700    flush_output(S),
  701    read_pipeline(Redis, S, PipeLine).
  702
  703write_pipeline(S, Command -> _Reply) :-
  704    !,
  705    redis_write_msg_no_flush(S, Command).
  706write_pipeline(S, Command) :-
  707    redis_write_msg_no_flush(S, Command).
  708
  709read_pipeline(Redis, S, PipeLine) :-
  710    E = error(Formal,_),
  711    catch(read_pipeline2(Redis, S, PipeLine), E, true),
  712    (   var(Formal)
  713    ->  true
  714    ;   reconnect_error(E)
  715    ->  redis_disconnect(Redis, [force(true)]),
  716        throw(E)
  717    ;   resync(Redis),
  718        throw(E)
  719    ).
  720
  721read_pipeline2(Redis, S, PipeLine) :-
  722    maplist(redis_read_msg3(S), PipeLine, Replies, Errors, Pushed),
  723    maplist(handle_push(Redis), Pushed),
  724    maplist(handle_error, Errors),
  725    maplist(bind_reply, PipeLine, Replies).
  726
  727redis_read_msg3(S, _Command -> ReplyIn, Reply, Error, Push) :-
  728    !,
  729    redis_read_msg(S, ReplyIn, Reply, Error, Push).
  730redis_read_msg3(S, Var, Reply, Error, Push) :-
  731    redis_read_msg(S, Var, Reply, Error, Push).
  732
  733handle_push(Redis, Pushed) :-
  734    handle_push_messages(Pushed, Redis).
  735handle_error(Error) :-
  736    (   var(Error)
  737    ->  true
  738    ;   throw(Error)
  739    ).
  740bind_reply(_Command -> Reply0, Reply) :-
  741    !,
  742    Reply0 = Reply.
  743bind_reply(_Command, _).
  744
  745
  746%!  recover(+Error, +Redis, :Goal)
  747%
  748%   Error happened while running Goal on Redis. If this is a recoverable
  749%   error (i.e., a network or disconnected peer),  wait a little and try
  750%   running Goal again.
  751
  752:- meta_predicate recover(+, +, 0).  753
  754recover(Error, Redis, Goal) :-
  755    reconnect_error(Error),
  756    auto_reconnect(Redis),
  757    !,
  758    debug(redis(recover), '~p: got error ~p; trying to reconnect',
  759          [Redis, Error]),
  760    redis_disconnect(Redis, [force(true)]),
  761    (   wait_to_retry(Redis, Error)
  762    ->  call(Goal),
  763        retractall(failure(Redis, _))
  764    ;   throw(Error)
  765    ).
  766recover(Error, _, _) :-
  767    throw(Error).
  768
  769auto_reconnect(redis_connection(_,_,_,Options)) :-
  770    !,
  771    option(reconnect(true), Options).
  772auto_reconnect(Server) :-
  773    ground(Server),
  774    server(Server, _, Options),
  775    option(reconnect(true), Options, true).
  776
  % reconnect_error(?Error)
  %
  % True if Error is an error(Formal, Context) term whose Formal is an
  % I/O error, a socket error or a syntax error caused by an unexpected
  % end of file.

  reconnect_error(error(Formal, _Context)) :-
      (   Formal = io_error(_Action, _On)
      ;   Formal = socket_error(_Code, _Message)
      ;   Formal = syntax_error(unexpected_eof)
      ).
  780
  %!  wait_to_retry(+Redis, +Error) is semidet.
  %
  %   Sleep for some time after a failure on Redis. With N earlier
  %   failures the delay is (1<<N)/100 seconds, i.e., 10ms after the
  %   first failure and doubling on each subsequent one, up to the
  %   setting `max_retry_wait`. If the number of earlier failures has
  %   reached the setting `max_retry_count` we fail without sleeping and
  %   the caller signals an exception.

  :- dynamic failure/2 as volatile.

  wait_to_retry(Redis, Error) :-
      redis_failures(Redis, Seen),
      setting(max_retry_count, MaxCount),
      Seen < MaxCount,
      Next is Seen+1,
      redis_set_failures(Redis, Next),
      setting(max_retry_wait, MaxWait),
      Ticks is min(MaxWait*100, 1<<Seen),     % in units of 10ms
      Wait is Ticks/100.0,
      debug(redis(recover), '  Sleeping ~p seconds', [Wait]),
      retry_message_level(Seen, Level),
      print_message(Level, redis(retry(Redis, Seen, Wait, Error))),
      sleep(Wait).
  802
  % redis_failures(+Redis, -Failures)
  %
  % Failures is the current failure count for Redis. For a
  % redis_connection/4 term this is its third argument. For a server
  % name (an atom) it is taken from failure/2, defaulting to 0.

  redis_failures(Redis, Failures) :-
      (   Redis = redis_connection(_, _, Count, _)
      ->  Failures = Count
      ;   atom(Redis),
          (   failure(Redis, Failures)
          ->  true
          ;   Failures = 0
          )
      ).
  812
  % redis_set_failures(+Redis, +Count)
  %
  % Store Count as the failure count for Redis. A compound (connection)
  % term is updated in place using nb_setarg/3 on its third argument; a
  % server name (an atom) is recorded as a single failure/2 fact.

  redis_set_failures(Redis, Count) :-
      (   compound(Redis)
      ->  nb_setarg(3, Redis, Count)
      ;   atom(Redis),
          retractall(failure(Redis, _)),
          asserta(failure(Redis, Count))
      ).
  821
  % retry_message_level(+Failures, -Level)
  %
  % Message level for reporting a retry: `warning` if there were no
  % earlier failures and `silent` for every subsequent retry.

  retry_message_level(Failures, Level) :-
      (   Failures = 0,
          Level = warning
      ->  true
      ;   Level = silent
      ).
  824
  825
  %!  redis(+Request)
  %
  %   Connect to the `default` Redis server, run Request on the new
  %   connection, disconnect and print the reply using print/1. This
  %   predicate is intended for interactive usage.

  redis(Request) :-
      setup_call_cleanup(
          redis_connect(default, Conn, []),
          redis1(Conn, Request, Reply),
          redis_disconnect(Conn)),
      print(Reply).
  838
  839%!  redis_write(+Redis, +Command) is det.
  840%!  redis_read(+Redis, -Reply) is det.
  841%
  842%   Write command and read replies from a Redis server. These are
  843%   building blocks for subscribing to event streams.
  844
  % Obtain the stream for Redis through redis_stream/3 and write
  % Command to it.
  redis_write(Redis, Command) :-
      redis_stream(Redis, Stream, true),
      redis_write_msg(Stream, Command).
  848
  % Obtain the stream for Redis through redis_stream/3 and read the
  % next message from it using redis_read_stream/3.
  redis_read(Redis, Reply) :-
      redis_stream(Redis, Stream, true),
      redis_read_stream(Redis, Stream, Reply).
  852
  853
  854		 /*******************************
  855		 *      HIGH LEVEL ACCESS	*
  856		 *******************************/
  857
  858%!  redis_get_list(+Redis, +Key, -List) is det.
  859%!  redis_get_list(+Redis, +Key, +ChunkSize, -List) is det.
  860%
  861%   Get the content of a Redis list in   List. If ChunkSize is given and
  862%   smaller than the list length, List is returned as a _lazy list_. The
  863%   actual values are requested using   redis  ``LRANGE`` requests. Note
  864%   that this results in O(N^2) complexity. Using   a  lazy list is most
  865%   useful for relatively short lists holding possibly large items.
  866%
  867%   Note that values retrieved are _strings_, unless the value was added
  868%   using `Term as prolog`.
  869%
  870%   @see lazy_list/2 for a discussion  on   the  difference between lazy
  871%   lists and normal lists.
  872
  % redis_get_list/3 fetches the whole list at once; -1 is the
  % "no chunking" marker understood by redis_get_list/4.
  redis_get_list(Redis, Key, List) :-
      redis_get_list(Redis, Key, -1, List).

  % redis_get_list(+Redis, +Key, +Chunk, -List)
  %
  % The list length is obtained using LLEN. The list is fetched eagerly
  % if Chunk covers the whole list or if Chunk is not a positive number.
  % Otherwise List is a lazy list that is materialized by rlist_next/3
  % in slices of Chunk elements.
  %
  % Any Chunk =< 0 (which includes the -1 used by redis_get_list/3) is
  % treated as "no chunking". With such a slice size rlist_next/3 would
  % compute an end index below the start index and never reach the last
  % element, so the lazy list would never terminate.
  redis_get_list(Redis, Key, Chunk, List) :-
      redis(Redis, llen(Key), Len),
      (   (   Chunk >= Len
          ;   Chunk =< 0
          )
      ->  (   Len == 0
          ->  List = []
          ;   End is Len-1,
              list_range(Redis, Key, 0, End, List)
          )
      ;   lazy_list(rlist_next(s(Redis,Key,0,Chunk,Len)), List)
      ).
  888
  % rlist_next(+State, -List, -Tail)
  %
  % Producer for lazy_list/2. State is s(Redis,Key,Offset,Slice,Len).
  % Fetches the next Slice elements starting at Offset. When the last
  % index (Len-1) has been fetched the list is closed; otherwise the
  % offset in State is advanced destructively and List is left open at
  % Tail.

  rlist_next(State, List, Tail) :-
      State = s(Redis, Key, Offset, Slice, Len),
      Last is Len-1,
      End is min(Last, Offset+Slice-1),
      list_range(Redis, Key, Offset, End, Elems),
      (   End =\= Last
      ->  NextOffset is Offset+Slice,
          nb_setarg(3, State, NextOffset),
          append(Elems, Tail, List)
      ;   List = Elems,
          Tail = []
      ).
  900
  % list_range(+DB, +Key, +Start, +End, -List)
  %
  % Fetch the inclusive range Start..End of the list stored at Key. A
  % range of exactly one element is fetched using LINDEX; anything else
  % goes through LRANGE.

  list_range(DB, Key, Start, End, List) :-
      (   End = Start,
          List = [Elem]
      ->  redis(DB, lindex(Key, Start), Elem)
      ;   redis(DB, lrange(Key, Start, End), List)
      ).
  909
  910
  911
  %!  redis_set_list(+Redis, +Key, +List) is det.
  %
  %   Make Key hold exactly the elements of List. Key is deleted first
  %   and, unless List is `[]`, the elements are added using a single
  %   ``RPUSH``. As Redis has no concept of an empty list, an empty List
  %   simply leaves Key deleted. Note that key values are always strings
  %   in Redis. The same conversion rules as for redis/1-3 apply.

  redis_set_list(Redis, Key, List) :-
      redis(Redis, del(Key), _),
      (   List \== []
      ->  Push =.. [rpush,Key|List],
          redis(Redis, Push, _Count)
      ;   true
      ).
  926
  927
  928%!  redis_get_hash(+Redis, +Key, -Data:dict) is det.
  929%!  redis_set_hash(+Redis, +Key, +Data:dict) is det.
  930%
  931%   Put/get a Redis hash as a Prolog  dict. Putting a dict first deletes
  932%   Key. Note that in many cases   applications will manage Redis hashes
  933%   by key. redis_get_hash/3 is notably a   user friendly alternative to
  934%   the Redis ``HGETALL`` command. If the  Redis   hash  is  not used by
  935%   other (non-Prolog) applications one  may   also  consider  using the
  936%   `Term as prolog` syntax to store the Prolog dict as-is.
  937
  % Run HGETALL on Key and request the reply as a dict.
  redis_get_hash(Redis, Key, Dict) :-
      Reply = (Dict as dict(auto)),
      redis(Redis, hgetall(Key), Reply).
  940
  % redis_set_hash(+Redis, +Key, +Dict)
  %
  % Replace the Redis hash at Key by the content of Dict. Dict is first
  % translated into a list of alternating field names and values, so an
  % invalid Dict raises before Key is touched. Next Key is deleted and
  % the fields are written using a single HSET.
  %
  % An empty Dict only deletes Key: HSET needs at least one field/value
  % pair, so sending `hset(Key)` would make the server report an error.
  % This mirrors how redis_set_list/3 deals with `[]`.
  redis_set_hash(Redis, Key, Dict) :-
      redis_array_dict(Array, _, Dict),
      redis(Redis, del(Key), _),
      (   Array == []
      ->  true
      ;   Term =.. [hset,Key|Array],
          redis(Redis, Term, _Count)
      ).
  946
  %!  redis_array_dict(?Array, ?Tag, ?Dict) is det.
  %
  %   Relate a Redis reply holding hash data to a SWI-Prolog dict. If
  %   Array is nonvar it is translated into Dict; Array may then be a
  %   list of alternating keys and values or a list of _pairs_. If Array
  %   is unbound, Dict is translated into a list of alternating keys and
  %   values.
  %
  %   @arg Tag is the SWI-Prolog dict tag.

  redis_array_dict(Array, Tag, Dict) :-
      (   nonvar(Array)
      ->  array_to_pairs(Array, Pairs),
          dict_pairs(Dict, Tag, Pairs)
      ;   dict_pairs(Dict, Tag, Pairs),
          pairs_to_array(Pairs, Array)
      ).
  964
  % array_to_pairs(+Array, -Pairs)
  %
  % Translate hash data into Name-Value pairs where Name is an atom.
  % Each step consumes either one `NameS-Value` element (RESP3 returns
  % a map as pairs) or two consecutive elements NameS, Value.

  array_to_pairs(Array, Pairs) :-
      (   Array = [],
          Pairs = []
      ->  true
      ;   Array = [NameS-Value|Rest],
          Pairs = [Name-Value|PairsT]
      ->  atom_string(Name, NameS),
          array_to_pairs(Rest, PairsT)
      ;   Array = [NameS,Value|Rest],
          Pairs = [Name-Value|PairsT],
          atom_string(Name, NameS),
          array_to_pairs(Rest, PairsT)
      ).
  974
  % pairs_to_array(+Pairs, -Array)
  %
  % Flatten Name-Value pairs into a list of alternating names and
  % values, where each name is converted to a string.

  pairs_to_array(Pairs, Array) :-
      (   Pairs = [],
          Array = []
      ->  true
      ;   Pairs = [Name-Value|PairsT],
          Array = [NameS,Value|ArrayT],
          atom_string(Name, NameS),
          pairs_to_array(PairsT, ArrayT)
      ).
  980
  981%!  redis_scan(+Redis, -LazyList, +Options) is det.
  982%!  redis_sscan(+Redis, +Set, -LazyList, +Options) is det.
  983%!  redis_hscan(+Redis, +Hash, -LazyList, +Options) is det.
  984%!  redis_zscan(+Redis, +Set, -LazyList, +Options) is det.
  985%
  986%   Map the Redis ``SCAN``, ``SSCAN``, ``HSCAN`` and ``ZSCAN`` commands
  987%   into a _lazy list_. For redis_scan/3 and redis_sscan/4 the result is
  988%   a list of strings. For redis_hscan/4   and redis_zscan/4, the result
  989%   is a list of _pairs_.   Options processed:
  990%
  991%     - match(Pattern)
  992%       Adds the ``MATCH`` subcommand, only returning matches for
  993%       Pattern.
  994%     - count(Count)
  995%       Adds the ``COUNT`` subcommand, giving a hint to the size of the
  996%       chunks fetched.
  997%     - type(Type)
  998%       Adds the ``TYPE`` subcommand, only returning answers of the
  999%       indicated type.
 1000%
 1001%   @see lazy_list/2.
 1002
 % The four scan predicates only differ in the command that is sent.
 % The command (`scan` or a term holding the key) goes into the state
 % of the lazy list producer scan_next/3.

 redis_scan(Redis, LazyList, Options) :-
     scan_lazy_list(scan, Redis, Options, LazyList).

 redis_sscan(Redis, Set, LazyList, Options) :-
     scan_lazy_list(sscan(Set), Redis, Options, LazyList).

 redis_hscan(Redis, Hash, LazyList, Options) :-
     scan_lazy_list(hscan(Hash), Redis, Options, LazyList).

 redis_zscan(Redis, Set, LazyList, Options) :-
     scan_lazy_list(zscan(Set), Redis, Options, LazyList).

 % scan_lazy_list(+Command, +Redis, +Options, -LazyList)
 %
 % Collect the match/count/type options and create the lazy list with
 % the cursor starting at 0.

 scan_lazy_list(Command, Redis, Options, LazyList) :-
     scan_options([match,count,type], Options, Parms),
     lazy_list(scan_next(s(Command,Redis,0,Parms)), LazyList).
 1018
 % scan_options(+Names, +Options, -Parms)
 %
 % For each Name in Names for which Options holds Name(Value), Parms
 % gets the two elements Name, Value. Names that do not appear in
 % Options are skipped.

 scan_options([], _, []).
 scan_options([Name|Names], Options, Parms) :-
     (   Parms = [Name,Value|ParmsT],
         Probe =.. [Name,Value],
         option(Probe, Options)
     ->  scan_options(Names, Options, ParmsT)
     ;   scan_options(Names, Options, Parms)
     ).
 1027
 1028
 % scan_next(+State, -List, -Tail)
 %
 % Producer for lazy_list/2. State is s(Command,Redis,Cursor,Params).
 % The request is Command extended with the current cursor and the
 % option parameters. If the server answers with cursor 0 the list is
 % closed; otherwise the new cursor is stored destructively in State
 % and List is left open at Tail.

 scan_next(State, List, Tail) :-
     State = s(Command, Redis, Cursor, Params),
     Command =.. [Name|Args0],
     append(Args0, [Cursor|Params], Args),
     Request =.. [Name|Args],
     redis(Redis, Request, [NewCursor,Elems0]),
     scan_pairs(Command, Elems0, Elems),
     (   NewCursor \== 0
     ->  nb_setarg(3, State, NewCursor),
         append(Elems, Tail, List)
     ;   List = Elems,
         Tail = []
     ).
 1042
 % scan_pairs(+Command, +List, -Result)
 %
 % Replies to hscan(_) and zscan(_) are translated into pairs using
 % scan_pairs/2. Replies to other commands are returned unchanged.

 scan_pairs(Command, List, Result) :-
     (   (   Command = hscan(_)
         ;   Command = zscan(_)
         )
     ->  scan_pairs(List, Result)
     ;   Result = List
     ).
 1050
 % scan_pairs(+List, -Pairs)
 %
 % Translate the elements of an HSCAN/ZSCAN reply into Key-Value pairs.
 % The reply is either a flat list of alternating keys and values or a
 % list that already holds Key-Value pairs.
 %
 % The clause for ready-made pairs must come first. If the flat clause
 % were tried first, two consecutive pairs `K1-V1, K2-V2` would match
 % its `[Key,Value|_]` pattern and be combined into the single bogus
 % pair `(K1-V1)-(K2-V2)`. This is the same clause order as used by
 % array_to_pairs/2.

 scan_pairs([], []).
 scan_pairs([Key-Value|T0], [Key-Value|T]) :-
     !,
     scan_pairs(T0, T).
 scan_pairs([Key,Value|T0], [Key-Value|T]) :-
     scan_pairs(T0, T).
 1057
 1058
 1059		 /*******************************
 1060		 *              ABOUT		*
 1061		 *******************************/
 1062
 1063%!  redis_current_command(+Redis, ?Command) is nondet.
 1064%!  redis_current_command(+Redis, ?Command, -Properties) is nondet.
 1065%
 1066%   True when Command has Properties. Fails   if Command is not defined.
 1067%   The redis_current_command/3 version  returns   the  command argument
 1068%   specification. See Redis documentation for an explanation.
 1069
 % The arity 2 version ignores the properties.
 redis_current_command(Redis, Command) :-
     redis_current_command(Redis, Command, _).

 % If Command is given, ask the server about just that command using
 % `COMMAND INFO`. Otherwise fetch the full command table and
 % enumerate its entries on backtracking, returning each name as an
 % atom.
 redis_current_command(Redis, Command, Properties) :-
     (   nonvar(Command)
     ->  redis(Redis, command(info, Command), [[_|Properties]])
     ;   redis(Redis, command, Table),
         member([Name|Properties], Table),
         atom_string(Command, Name)
     ).
 1081
 %!  redis_property(+Redis, ?Property) is nondet.
 %
 %   True if Property is a property of the Redis server. The properties
 %   are obtained by running the ``INFO`` command and parsing the reply
 %   into terms Name(Value). As this is for machine usage, properties
 %   whose name ends in `_human` are skipped.

 redis_property(Redis, Property) :-
     redis(Redis, info, Text),
     info_terms(Text, Properties),
     member(Property, Properties).
 1092
 % info_terms(+Info, -Terms)
 %
 % Split Info into lines, stripping carriage returns, newlines and
 % spaces from both ends of each line, and collect the Name(Value)
 % term of every line accepted by info_line_term/2.

 info_terms(Info, Terms) :-
     split_string(Info, "\n", "\r\n ", Lines),
     convlist(info_line_term, Lines, Terms).

 % info_line_term(+Line, -Term)
 %
 % Translate a line `Name:Value` into Name(Value), splitting at the
 % first colon. Value is a number if the text after the colon can be
 % read as one and a string otherwise. Fails for lines without a colon
 % and for names ending in `_human`.

 info_line_term(Line, Term) :-
     once(sub_string(Line, Before, _, After, :)),
     sub_atom(Line, 0, Before, _, Name),
     \+ sub_atom(Name, _, _, 0, '_human'),
     sub_string(Line, _, After, 0, Text),
     (   number_string(Number, Text)
     ->  Value = Number
     ;   Value = Text
     ),
     Term =.. [Name,Value].
 1108
 1109
 1110		 /*******************************
 1111		 *            SUBSCRIBE		*
 1112		 *******************************/
 1113
 1114%!  redis_subscribe(+Redis, +Channels, -Id, +Options) is det.
 1115%
 1116%   Subscribe to one or more  Redis   PUB/SUB  channels.  This predicate
 1117%   creates a thread using thread_create/3 with  the given Options. Once
 1118%   running, the thread listens for messages.   The message content is a
 1119%   string or Prolog term  as  described   in  redis/3.  On  receiving a
 1120%   message, the following message is broadcasted:
 1121%
 1122%       redis(Redis, Channel, Data)
 1123%
 1124%   If redis_unsubscribe/2 removes the  last   subscription,  the thread
 1125%   terminates.
 1126%
 1127%   To simply print the incoming messages use e.g.
 1128%
 1129%       ?- listen(redis(_, Channel, Data),
 1130%                 format('Channel ~p got ~p~n', [Channel,Data])).
 1131%       true.
 1132%       ?- redis_subscribe(default, test, Id, []).
 1133%       Id = redis_pubsub_3,
 1134%       ?- redis(publish(test, "Hello world")).
 1135%       Channel test got "Hello world"
 1136%       1
 1137%       true.
 1138%
 1139%   @arg Id is the thread identifier of  the listening thread. Note that
 1140%   the Options alias(Name) can be used to get a system wide name.
 1141
 1142:- dynamic ( subscription/2,            % Id, Channel
 1143             listening/3                % Id, Connection, Thread
 1144           ) as volatile. 1145
 % If Redis is a server name (an atom), the listening thread opens its
 % own connection with reconnect(true) and closes it when done.
 % Otherwise Redis is used as the connection itself.

 redis_subscribe(Redis, Spec, Id, Options) :-
     (   atom(Redis)
     ->  Goal = setup_call_cleanup(
                    redis_connect(Redis, Conn, [reconnect(true)]),
                    redis_subscribe1(Redis, Conn, Channels),
                    redis_disconnect(Conn))
     ;   Goal = redis_subscribe1(Redis, Redis, Channels)
     ),
     channels(Spec, Channels),
     pubsub_thread_options(ThreadOptions, Options),
     thread_create(Goal, Thread, ThreadOptions),
     pubsub_id(Thread, Id).
 1165
 % pubsub_thread_options(-ThreadOptions, +Options)
 %
 % ThreadOptions is Options with detached(true) added as default, so a
 % detached/1 option given by the user takes precedence.

 pubsub_thread_options(ThreadOptions, Options) :-
     Defaults = [detached(true)],
     merge_options(Options, Defaults, ThreadOptions).
 1168
 % pubsub_id(?Thread, ?Id)
 %
 % The subscription id is the thread itself. The disabled clause below
 % would derive an atom `redis_pubsub_<TID>` from the thread instead.

 pubsub_id(Thread, Id) :-
     Id = Thread.
 %pubsub_id(Thread, Id) :-
 %    thread_property(Thread, id(TID)),
 %    atom_concat('redis_pubsub_', TID, Id).
 1173
 % redis_subscribe1(+Redis, +Conn, +Channels)
 %
 % Subscribe to Channels and listen until there are no subscriptions
 % left. If an error(_,_) exception is caught, recover/3 is asked to
 % re-establish the connection (it re-throws if it cannot), after
 % which we start over using the channels currently registered for
 % this thread.

 redis_subscribe1(Redis, Conn, Channels) :-
     Caught = error(Formal, _),
     catch(redis_subscribe2(Redis, Conn, Channels), Caught, true),
     (   nonvar(Formal)
     ->  recover(Caught, Conn, redis1(Conn, echo("reconnect"), _)),
         thread_self(Me),
         pubsub_id(Me, Id),
         findall(Channel, subscription(Id, Channel), Current),
         redis_subscribe1(Redis, Conn, Current)
     ;   true
     ).
 1185
 % Send the subscribe request for Channels on Connection and then
 % enter the listen loop.
 redis_subscribe2(Redis, Connection, Channels) :-
     redis_subscribe3(Connection, Channels),
     redis_listen(Redis, Connection).
 1189
 % redis_subscribe3(+Conn, +Channels)
 %
 % Arrange for pubsub_clean/1 to run when this thread exits, register
 % Channels as subscriptions of this thread and write the `subscribe`
 % request to the stream of Conn. The reply is not read here.

 redis_subscribe3(Conn, Channels) :-
     thread_self(Self),
     pubsub_id(Self, Id),
     prolog_listen(this_thread_exit, pubsub_clean(Id)),
     maplist(register_subscription(Id), Channels),
     redis_stream(Conn, Stream, true),
     Request =.. [subscribe|Channels],
     redis_write_msg(Stream, Request).
 1198
 % Remove all listening/3 and subscription/2 facts that belong to Id.
 pubsub_clean(Id) :-
     retractall(listening(Id, _, _)),
     retractall(subscription(Id, _)).
 1202
 1203%!  redis_subscribe(+Id, +Channels) is det.
 1204%!  redis_unsubscribe(+Id, +Channels) is det.
 1205%
 1206%   Add/remove channels for the subscription. If no subscriptions
 1207%   remain, the listening thread terminates.
 1208%
 1209%   @arg Channels is either a single  channel   or  a list thereof. Each
 1210%   channel specification is either an atom   or a term `A:B:...`, where
 1211%   all parts are atoms.
 1212
 % Channels are normalised first, so an invalid Spec raises before the
 % listener is looked up. The request is written on the connection of
 % the listening thread.
 redis_subscribe(Id, Spec) :-
     channels(Spec, Channels),
     (   listening(Id, Conn, _)
     ->  true
     ;   existence_error(redis_pubsub, Id)
     ),
     maplist(register_subscription(Id), Channels),
     redis_stream(Conn, Stream, true),
     Request =.. [subscribe|Channels],
     redis_write_msg(Stream, Request).
 1223
 % Channels are normalised first, so an invalid Spec raises before the
 % listener is looked up. The local registrations are removed before
 % the `unsubscribe` request is written on the listener's connection.
 redis_unsubscribe(Id, Spec) :-
     channels(Spec, Channels),
     (   listening(Id, Conn, _)
     ->  true
     ;   existence_error(redis_pubsub, Id)
     ),
     maplist(unregister_subscription(Id), Channels),
     redis_stream(Conn, Stream, true),
     Request =.. [unsubscribe|Channels],
     redis_write_msg(Stream, Request).
 1234
 %!  redis_current_subscription(?Id, ?Channels)
 %
 %   True when the PUB/SUB subscription Id has Channels registered.
 %   Channels is the list of all channels recorded for Id. Enumerates
 %   all subscriptions on backtracking if Id is unbound.

 redis_current_subscription(Id, Channels) :-
     findall(Id-Channel, subscription(Id, Channel), Found),
     keysort(Found, ById),
     group_pairs_by_key(ById, Groups),
     member(Id-Channels, Groups).
 1244
 % channels(+Spec, -Names)
 %
 % Names is the list of channel names for Spec, which is either a
 % single channel specification or a list thereof.

 channels(Spec, Names) :-
     (   is_list(Spec)
     ->  maplist(channel_name, Spec, Names)
     ;   Names = [Name],
         channel_name(Spec, Name)
     ).

 % channel_name(+Key, -Name)
 %
 % An atom is its own channel name. A term `A:B:...` of atoms is
 % turned into a single atom with the parts separated by colons.
 % Anything else raises a type error `redis_key`.

 channel_name(Key, Name) :-
     (   Name = Key,
         atom(Key)
     ->  true
     ;   phrase(key_parts(Key), Parts)
     ->  atomic_list_concat(Parts, :, Name)
     ;   type_error(redis_key, Key)
     ).

 % key_parts(+Key)//
 %
 % Emit the atoms of a (possibly nested) `A:B` term from left to
 % right. Fails if Key holds a variable or a non-atom part.

 key_parts(Key) -->
     { nonvar(Key) },
     (   { atom(Key) }
     ->  [Key]
     ;   { Key = Left:Right },
         key_parts(Left),
         key_parts(Right)
     ).
 1271
 1272
 1273
 1274
 % Record that Id listens on Channel, unless this is already known.
 register_subscription(Id, Channel) :-
     (   subscription(Id, Channel)
     ->  true
     ;   Fact = subscription(Id, Channel),
         assertz(Fact)
     ).

 % Forget that Id listens on Channel.
 unregister_subscription(Id, Channel) :-
     Fact = subscription(Id, Channel),
     retractall(Fact).
 1283
 % redis_listen(+Redis, +Conn)
 %
 % Run the listen loop for the calling thread. A listening/3 fact
 % relating the subscription id to Conn and this thread exists for the
 % duration of the loop and is erased when the loop ends in any way.

 redis_listen(Redis, Conn) :-
     thread_self(Self),
     pubsub_id(Self, Id),
     setup_call_cleanup(
         assertz(listening(Id, Conn, Self), FactRef),
         redis_listen_loop(Redis, Id, Conn),
         erase(FactRef)).
 1291
 % redis_listen_loop(+Redis, +Id, +Conn)
 %
 % As long as Id has at least one subscription: read a message from
 % the stream of Conn, broadcast it and continue. The stream is looked
 % up again on every iteration.

 redis_listen_loop(Redis, Id, Conn) :-
     redis_stream(Conn, Stream, true),
     (   subscription(Id, _)
     ->  redis_read_stream(Redis, Stream, Message),
         redis_broadcast(Redis, Message),
         redis_listen_loop(Redis, Id, Conn)
     ;   true                           % nothing left to listen for
     ).
 1300
 % redis_broadcast(+Redis, +Message)
 %
 % Act on a message received while listening:
 %
 %   - `[subscribe, Channel, N]` is ignored.
 %   - `[message, Channel, Data]` is broadcasted as
 %     redis(Redis, Channel, Data). Exceptions raised by listeners
 %     are printed and otherwise ignored.
 %   - Anything else is asserted to be a three element list starting
 %     with two atoms and reported on the debug topic redis(warning).

 redis_broadcast(Redis, Message) :-
     (   Message = [subscribe, _Channel, _N]
     ->  true
     ;   Message = [message, Channel, Data]
     ->  catch(broadcast(redis(Redis, Channel, Data)),
               Error,
               print_message(error, Error))
     ;   assertion((Message = [Type, Chan, _Data],
                    atom(Type),
                    atom(Chan))),
         debug(redis(warning), '~p: Unknown message while listening: ~p',
               [Redis,Message])
     ).
 1314
 1315
 1316		 /*******************************
 1317		 *          READ/WRITE		*
 1318		 *******************************/
 1319
 1320%!  redis_read_stream(+Redis, +Stream, -Term) is det.
 1321%
 1322%   Read a message from a Redis stream.  Term is one of
 1323%
 1324%     - A list of terms (array)
 1325%     - A list of pairs (map, RESP3 only)
 1326%     - The atom `nil`
 1327%     - A number
 1328%     - A term status(String)
 1329%     - A string
 1330%     - A boolean (`true` or `false`).  RESP3 only.
 1331%
 1332%   If something goes wrong, the connection   is closed and an exception
 1333%   is raised.
 1334
 % Two kinds of failure are distinguished. An error(_,_) exception
 % raised while reading forces a disconnect before it is re-thrown.
 % An error reported through the Error argument of redis_read_msg/5
 % leaves the connection open: push messages are handled, the stream
 % is re-synchronized and the error is thrown.

 redis_read_stream(Redis, Stream, Out) :-
     Caught = error(Formal, _),
     catch(redis_read_msg(Stream, Out, Message, Error, Push), Caught, true),
     (   nonvar(Formal)
     ->  redis_disconnect(Redis, [force(true)]),
         throw(Caught)
     ;   handle_push_messages(Push, Redis),
         (   nonvar(Error)
         ->  resync(Redis),
             throw(Error)
         ;   Out = Message
         )
     ).
 1348
 % handle_push_messages(+Messages, +Redis)
 %
 % Process each push message in order. Handling is best effort: an
 % exception is printed as a warning and failure is ignored, so one
 % bad message does not stop the remaining ones from being handled.

 handle_push_messages([], _).
 handle_push_messages([Message|Rest], Redis) :-
     ignore(catch(handle_push_message(Message, Redis),
                  Error,
                  print_message(warning, Error))),
     handle_push_messages(Rest, Redis).
 1357
 % handle_push_message(+Message, +Redis)
 %
 % A push message with a "pubsub" header is broadcasted without that
 % header. Some protocol version 3 push messages (such as
 % __keyspace@* events) seem to come directly without a pubsub
 % header; these start with `message` and are broadcasted as is.

 handle_push_message(["pubsub"|Payload], Redis) :-
     redis_broadcast(Redis, Payload).
 handle_push_message(Message, Redis) :-
     Message = [message|_],
     redis_broadcast(Redis, Message).
 1365
 1366
 %!  resync(+Redis) is det.
 %
 %   Re-synchronize after an error. This may be needed if some type
 %   conversion fails and we have read a partial reply. It is hard to
 %   figure out what to read from where we are, so do_resync/1 echoes a
 %   random magic number and reads until it finds the reply. If that
 %   raises an error(_,_) exception the connection is forcibly closed
 %   instead.

 resync(Redis) :-
     Caught = error(Formal, _),
     catch(do_resync(Redis), Caught, true),
     (   nonvar(Formal)
     ->  redis_disconnect(Redis, [force(true)])
     ;   true
     ).
 1381
 % do_resync(+Redis)
 %
 % Send `echo Magic` for a random integer Magic and let
 % '$redis_resync'/2 process the stream for at most 0.2 seconds. The
 % time limit exception is mapped to error(time_limit_exceeded, _),
 % such that resync/1 catches it.

 do_resync(Redis) :-
     Magic is random(1_000_000_000),
     redis_stream(Redis, Stream, true),
     redis_write_msg(Stream, echo(Magic)),
     catch(call_with_time_limit(0.2, '$redis_resync'(Stream, Magic)),
           time_limit_exceeded,
           throw(error(time_limit_exceeded,_))).
 1389
 1390
 1391%!  redis_read_msg(+Stream, ?Reply, -Message, -Error, -PushMessages) is det.
 1392%!  redis_write_msg(+Stream, +Message) is det.
 1393%
 1394%   Read/write a Redis message. Both these predicates are in the foreign
 1395%   module `redis4pl`.
 1396%
 1397%   @arg PushMessages is a list of push   messages that may be non-[] if
 1398%   protocol version 3 (see redis_connect/3) is selected. Using protocol
 1399%   version 2 this list is always empty.
 1400
 1401
 1402
 1403		 /*******************************
 1404		 *            MESSAGES		*
 1405		 *******************************/
 1406
 1407:- multifile
 1408    prolog:error_message//1,
 1409    prolog:message//1. 1410
 % Message for the formal error term redis_error(Code, String): the
 % code is printed using ~w and the string using ~s.
 prolog:error_message(redis_error(ErrCode, Text)) -->
     [ 'REDIS: ~w: ~s'-[ErrCode, Text] ].
 1413
 1414prolog:message(redis(retry(_Redis, _Failures, Wait, Error))) -->
 1415    [ 'REDIS: connection error.  Retrying in ~2f seconds'-[Wait], nl ],
 1416    [ '    '-[] ], '$messages':translate_message(Error)