View source with raw comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker and Sean Charles
    4    E-mail:        jan@swi-prolog.org and <sean at objitsu dot com>
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2013-2022, Sean Charles
    7                              SWI-Prolog Solutions b.v.
    8    All rights reserved.
    9
   10    Redistribution and use in source and binary forms, with or without
   11    modification, are permitted provided that the following conditions
   12    are met:
   13
   14    1. Redistributions of source code must retain the above copyright
   15       notice, this list of conditions and the following disclaimer.
   16
   17    2. Redistributions in binary form must reproduce the above copyright
   18       notice, this list of conditions and the following disclaimer in
   19       the documentation and/or other materials provided with the
   20       distribution.
   21
   22    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   23    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   24    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   25    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   26    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   27    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   28    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   29    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   30    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   31    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   32    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   33    POSSIBILITY OF SUCH DAMAGE.
   34
   35    NOTE
   36
   37    The original code was subject to the MIT licence and written by
   38    Sean Charles.  Re-licenced to standard SWI-Prolog BSD-2 with
   39    permission from Sean Charles.
   40*/
   41
   42:- module(redis,
   43          [ redis_server/3,             % +Alias, +Address, +Options
   44            redis_connect/1,            % -Connection
   45            redis_connect/3,            % -Connection, +Host, +Port
   46            redis_disconnect/1,         % +Connection
   47            redis_disconnect/2,         % +Connection, +Options
   48                                        % Queries
   49            redis/1,                    % +Request
   50            redis/2,                    % +Connection, +Request
   51            redis/3,                    % +Connection, +Request, -Reply
   52                                        % High level queries
   53            redis_get_list/3,           % +Redis, +Key, -List
   54            redis_get_list/4,           % +Redis, +Key, +ChunkSize, -List
   55            redis_set_list/3,           % +Redis, +Key, +List
   56            redis_get_hash/3,           % +Redis, +Key, -Data:dict
   57            redis_set_hash/3,           % +Redis, +Key, +Data:dict
   58            redis_scan/3,               % +Redis, -LazyList, +Options
   59            redis_sscan/4,              % +Redis, +Set, -LazyList, +Options
   60            redis_hscan/4,              % +Redis, +Hash, -LazyList, +Options
   61            redis_zscan/4,              % +Redis, +Set, -LazyList, +Options
   62                                        % Publish/Subscribe
   63            redis_subscribe/4,          % +Redis, +Channels, -Id, +Options
   64            redis_subscribe/2,          % +Id, +Channels
   65            redis_unsubscribe/2,        % +Id, +Channels
   66            redis_current_subscription/2, % ?Id,?Channels
   67            redis_write/2,              % +Redis, +Command
   68            redis_read/2,               % +Redis, -Reply
   69                                        % Building blocks
   70            redis_array_dict/3,         % ?Array, ?Tag, ?Dict
   71                                        % Admin stuff
   72            redis_property/2,           % +Reply, ?Property
   73            redis_current_command/2,    % +Redis,?Command
   74            redis_current_command/3     % +Redis, +Command, -Properties
   75          ]).   76:- autoload(library(socket), [tcp_connect/3]).   77:- autoload(library(apply), [maplist/2, convlist/3, maplist/3, maplist/5]).   78:- autoload(library(broadcast), [broadcast/1]).   79:- autoload(library(error),
   80            [ must_be/2,
   81	      type_error/2,
   82              instantiation_error/1,
   83              uninstantiation_error/1,
   84              existence_error/2,
   85              existence_error/3
   86            ]).   87:- autoload(library(lazy_lists), [lazy_list/2]).   88:- autoload(library(lists), [append/3, member/2]).   89:- autoload(library(option), [merge_options/3, option/2,
   90			      option/3, select_option/4]).   91:- autoload(library(pairs), [group_pairs_by_key/2]).   92:- autoload(library(time), [call_with_time_limit/2]).   93:- use_module(library(debug), [debug/3, assertion/1]).   94:- use_module(library(settings), [setting/4, setting/2]).   95:- if(exists_source(library(ssl))).   96:- autoload(library(ssl), [ssl_context/3, ssl_negotiate/5]).   97:- endif.   98
   99:- use_foreign_library(foreign(redis4pl)).  100
  101:- setting(max_retry_count, nonneg, 8640, % one day
  102           "Max number of retries").  103:- setting(max_retry_wait, number, 10,
  104           "Max time to wait between recovery attempts").  105:- setting(sentinel_timeout, number, 0.2,
  106	   "Time to wait for a sentinel").  107
  108:- predicate_options(redis_server/3, 3,
  109                     [ pass_to(redis:redis_connect/3, 3)
  110                     ]).  111:- predicate_options(redis_connect/3, 3,
  112                     [ reconnect(boolean),
  113                       user(atom),
  114                       password(atomic),
  115                       version(between(2,3))
  116                     ]).  117:- predicate_options(redis_disconnect/2, 2,
  118                     [ force(boolean)
  119                     ]).  120:- predicate_options(redis_scan/3, 3,
  121                     [ match(atomic),
  122                       count(nonneg),
  123                       type(atom)
  124                     ]).  125% Actually not passing, but the same
  126:- predicate_options(redis_sscan/4, 4, [pass_to(redis:redis_scan/3, 3)]).  127:- predicate_options(redis_hscan/4, 4, [pass_to(redis:redis_scan/3, 3)]).  128:- predicate_options(redis_zscan/4, 4, [pass_to(redis:redis_scan/3, 3)]).

Redis client

This library is a client to Redis, a popular key value store to deal with caching and communication between micro services.

In the typical use case we register the details of one or more Redis servers using redis_server/3. Subsequenly, redis/2-3 is used to issue commands on the server. For example:

?- redis_server(default, redis:6379, [password("secret")]).
?- redis(default, set(user, "Bob")).
?- redis(default, get(user), User).
User = "Bob"

*/

  149:- dynamic server/3.  150
  151:- dynamic ( connection/2,              % ServerName, Stream
  152	     sentinel/2			% Pool, Address
  153           ) as volatile.
 redis_server(+ServerName, +Address, +Options) is det
Register a redis server without connecting to it. The ServerName acts as a lazy connection alias. Initially the ServerName default points at localhost:6379 with no connect options. The default server is used for redis/1 and redis/2 and may be changed using this predicate. Options are described with redis_connect/3.

Connections established this way are by default automatically reconnected if the connection is lost for some reason unless a reconnect(false) option is specified.

  167redis_server(Alias, Address, Options) :-
  168    must_be(ground, Alias),
  169    retractall(server(Alias, _, _)),
  170    asserta(server(Alias, Address, Options)).
  171
  172server(default, localhost:6379, []).
 redis_connect(-Connection) is det
 redis_connect(+Address, -Connection, +Options) is det
redis_connect(-Connection, +Host, +Port) is det
Connect to a redis server. The main mode is redis_connect(+Address, -Connection, +Options). redis_connect/1 is equivalent to redis_connect(localhost:6379, Connection, []). Options:
reconnect(+Boolean)
If true, try to reconnect to the service when the connection seems lost. Default is true for connections specified using redis_server/3 and false for explictly opened connections.
user(+User)
If version(3) and password(Password) are specified, these are used to authenticate using the HELLO command.
password(+Password)
Authenticate using Password
version(+Version)
Specify the connection protocol version. Initially this is version 2. Redis 6 also supports version 3. When specified as 3, the HELLO command is used to upgrade the protocol.
tls(true)
When specified, initiate a TLS connection. If this option is specified we must also specify the cacert, key and cert options.
cacert(+File)
CA Certificate file to verify with.
cert(+File)
Client certificate to authenticate with.
key(+File)
Private key file to authenticate with.
sentinels(+ListOfAddresses)
Used together with an Address of the form sentinel(MasterName) to enable contacting a network of Redis servers guarded by a sentinel network.
sentinel_user(+User)
sentinel_password(+Password)
Authentication information for the senitels. When omitted we try to connect withour authentication.

Instead of using these predicates, redis/2 and redis/3 are normally used with a server name argument registered using redis_server/3. These predicates are meant for creating a temporary paralel connection or using a connection with a blocking call.

Arguments:
Address- is a term Host:Port, unix(File) or the name of a server registered using redis_server/3. The latter realises a new connection that is typically used for blocking redis commands such as listening for published messages, waiting on a list or stream.
Compatibility
- redis_connect(-Connection, +Host, +Port) provides compatibility to the original GNU-Prolog interface and is equivalent to redis_connect(Host:Port, Connection, []).
  228redis_connect(Conn) :-
  229    redis_connect(default, Conn, []).
  230
  231redis_connect(Conn, Host, Port) :-
  232    var(Conn),
  233    ground(Host), ground(Port),
  234    !,                                  % GNU-Prolog compatibility
  235    redis_connect(Host:Port, Conn, []).
  236redis_connect(Server, Conn, Options) :-
  237    atom(Server),
  238    !,
  239    (   server(Server, Address, DefaultOptions)
  240    ->  merge_options(Options, DefaultOptions, Options2),
  241        do_connect(Server, Address, Conn, [address(Address)|Options2])
  242    ;   existence_error(redis_server, Server)
  243    ).
  244redis_connect(Address, Conn, Options) :-
  245    do_connect(Address, Address, Conn, [address(Address)|Options]).
 do_connect(+Id, +Address, -Conn, +Options)
Open the connection. A connection is a compound term of the shape
redis_connection(Id, Stream, Failures, Options)
  253do_connect(Id, sentinel(Pool), Conn, Options) =>
  254    sentinel_master(Id, Pool, Conn, Options).
  255do_connect(Id, Address0, Conn, Options) =>
  256    tcp_address(Address0, Address),
  257    tcp_connect(Address, Stream0, Options),
  258    tls_upgrade(Address, Stream0, Stream, Options),
  259    Conn = redis_connection(Id, Stream, 0, Options),
  260    hello(Conn, Options).
  261
  262tcp_address(unix(Path), Path) :-
  263    !.                                  % Using an atom is ambiguous
  264tcp_address(Address, Address).
 tls_upgrade(+Address, +Raw, -Stream, +Options) is det
Upgrade to a TLS connection when tls(true) is specified.
  270:- if(current_predicate(ssl_context/3)).  271tls_upgrade(Host:_Port, Raw, Stream, Options) :-
  272    option(tls(true), Options),
  273    !,
  274    must_have_option(cacert(CacertFile), Options),
  275    must_have_option(key(KeyFile), Options),
  276    must_have_option(cert(CertFile), Options),
  277    ssl_context(client, SSL,
  278		[ host(Host),
  279		  certificate_file(CertFile),
  280		  key_file(KeyFile),
  281		  cacerts([file(CacertFile)]),
  282		  cert_verify_hook(tls_verify),
  283		  close_parent(true)
  284		]),
  285    stream_pair(Raw, RawRead, RawWrite),
  286    ssl_negotiate(SSL, RawRead, RawWrite, Read, Write),
  287    stream_pair(Stream, Read, Write).
  288:- endif.  289tls_upgrade(_, Stream, Stream, _).
  290
  291:- if(current_predicate(ssl_context/3)).
 tls_verify(+SSL, +ProblemCert, +AllCerts, +FirstCert, +Status) is semidet
Accept or reject the certificate verification. Similar to the Redis command line client (redis-cli), we accept the certificate as long as it is signed, not verifying the hostname.
  299:- public tls_verify/5.  300tls_verify(_SSL, _ProblemCert, _AllCerts, _FirstCert, verified) :-
  301    !.
  302tls_verify(_SSL, _ProblemCert, _AllCerts, _FirstCert, hostname_mismatch) :-
  303    !.
  304tls_verify(_SSL, _ProblemCert, _AllCerts, _FirstCert, _Error) :-
  305    fail.
  306
  307:- endif.
 sentinel_master(+ServerId, +SetinelPool, -Connection, +Options) is det
Discover the master and connect to it.
  313sentinel_master(Id, Pool, Master, Options) :-
  314    must_have_option(sentinels(Sentinels), Options),
  315    sentinel_auth(Options, Options1),
  316    setting(sentinel_timeout, TMO),
  317    (   sentinel(Pool, Sentinel)
  318    ;   member(Sentinel, Sentinels)
  319    ),
  320    catch(call_with_time_limit(
  321	      TMO,
  322	      do_connect(Id, Sentinel, Conn,
  323			 [sentinel(true)|Options1])),
  324	  Error,
  325	  (print_message(warning, Error),fail)),
  326    !,
  327    debug(redis(sentinel), 'Connected to sentinel at ~p', [Sentinel]),
  328    call_cleanup(
  329	query_sentinel(Pool, Conn, Sentinel, MasterAddr),
  330	redis_disconnect(Conn)),
  331    debug(redis(sentinel), 'Sentinel claims master is at ~p', [MasterAddr]),
  332    do_connect(Id, MasterAddr, Master, Options),
  333    debug(redis(sentinel), 'Connected to claimed master', []),
  334    redis(Master, role, Role),
  335    (   Role = [master|_Slaves]
  336    ->  debug(redis(sentinel), 'Verified role at ~p', [MasterAddr])
  337    ;   redis_disconnect(Master),
  338	debug(redis(sentinel), '~p is not the master: ~p', [MasterAddr, Role]),
  339	sleep(TMO),
  340	sentinel_master(Id, Pool, Master, Options)
  341    ).
  342
  343sentinel_auth(Options0, Options) :-
  344    option(sentinel_user(User), Options0),
  345    option(sentinel_password(Passwd), Options0),
  346    !,
  347    merge_options([user(User), password(Passwd)], Options0, Options).
  348sentinel_auth(Options0, Options) :-
  349    select_option(password(_), Options0, Options, _).
  350
  351
  352query_sentinel(Pool, Conn, Sentinel, Host:Port) :-
  353    redis(Conn, sentinel('get-master-addr-by-name', Pool), MasterData),
  354    MasterData = [Host,Port],
  355    redis(Conn, sentinel(sentinels, Pool), Peers),
  356    transaction(update_known_sentinels(Pool, Sentinel, Peers)).
  357
  358update_known_sentinels(Pool, Sentinel, Peers) :-
  359    retractall(sentinel(Pool, _)),
  360    maplist(update_peer_sentinel(Pool), Peers),
  361    asserta(sentinel(Pool, Sentinel)).
  362
  363update_peer_sentinel(Pool, Attrs),
  364  memberchk(ip-Host, Attrs),
  365  memberchk(port-Port, Attrs) =>
  366    asserta(sentinel(Pool, Host:Port)).
  367
  368must_have_option(Opt, Options) :-
  369    option(Opt, Options),
  370    !.
  371must_have_option(Opt, Options) :-
  372    existence_error(option, Opt, Options).
 hello(+Connection, +Option)
Initialize the connection. This is used to upgrade to the RESP3 protocol and/or to authenticate.
  379hello(Con, Options) :-
  380    option(version(V), Options),
  381    V >= 3,
  382    !,
  383    (   option(user(User), Options),
  384        option(password(Password), Options)
  385    ->  redis(Con, hello(3, auth, User, Password))
  386    ;   redis(Con, hello(3))
  387    ).
  388hello(Con, Options) :-
  389    option(password(Password), Options),
  390    !,
  391    redis(Con, auth(Password)).
  392hello(_, _).
 redis_stream(+Spec, --Stream, +DoConnect) is det
Get the stream to a Redis server from Spec. Spec is either the name of a registered server or a term redis_connection(Id,Stream,Failures,Options). If the stream is disconnected it will be reconnected.
  401redis_stream(Var, S, _) :-
  402    (   var(Var)
  403    ->  !, instantiation_error(Var)
  404    ;   nonvar(S)
  405    ->  !, uninstantiation_error(S)
  406    ).
  407redis_stream(ServerName, S, Connect) :-
  408    atom(ServerName),
  409    !,
  410    (   connection(ServerName, S0)
  411    ->  S = S0
  412    ;   Connect == true,
  413        server(ServerName, Address, Options)
  414    ->  redis_connect(Address, Connection, Options),
  415        redis_stream(Connection, S, false),
  416        asserta(connection(ServerName, S))
  417    ;   existence_error(redis_server, ServerName)
  418    ).
  419redis_stream(redis_connection(_,S0,_,_), S, _) :-
  420    S0 \== (-),
  421    !,
  422    S = S0.
  423redis_stream(Redis, S, _) :-
  424    Redis = redis_connection(Id,-,_,Options),
  425    option(address(Address), Options),
  426    do_connect(Id,Address,Redis2,Options),
  427    arg(2, Redis2, S0),
  428    nb_setarg(2, Redis, S0),
  429    S = S0.
  430
  431has_redis_stream(Var, _) :-
  432    var(Var),
  433    !,
  434    instantiation_error(Var).
  435has_redis_stream(Alias, S) :-
  436    atom(Alias),
  437    !,
  438    connection(Alias, S).
  439has_redis_stream(redis_connection(_,S,_,_), S) :-
  440    S \== (-).
 redis_disconnect(+Connection) is det
 redis_disconnect(+Connection, +Options) is det
Disconnect from a redis server. The second form takes one option, similar to close/2:
force(Force)
When true (default false), do not raise any errors if Connection does not exist or closing the connection raises a network or I/O related exception. This version is used internally if a connection is in a broken state, either due to a protocol error or a network issue.
  456redis_disconnect(Redis) :-
  457    redis_disconnect(Redis, []).
  458
  459redis_disconnect(Redis, Options) :-
  460    option(force(true), Options),
  461    !,
  462    (   Redis = redis_connection(_Id, S, _, _Opts)
  463    ->  (   S == (-)
  464        ->  true
  465        ;   close(S, [force(true)]),
  466            nb_setarg(2, Redis, -)
  467        )
  468    ;   has_redis_stream(Redis, S)
  469    ->  close(S, [force(true)]),
  470        retractall(connection(_,S))
  471    ;   true
  472    ).
  473redis_disconnect(Redis, _Options) :-
  474    redis_stream(Redis, S, false),
  475    close(S),
  476    retractall(connection(_,S)).
 redis(+Connection, +Request) is semidet
This predicate is overloaded to handle two types of requests. First, it is a shorthand for redis(Connection, Command, _) and second, it can be used to exploit Redis pipelines and transactions. The second form is acticated if Request is a list. In that case, each element of the list is either a term Command -> Reply or a simple Command. Semantically this represents a sequence of redis/3 and redis/2 calls. It differs in the following aspects:

Procedurally, the process takes the following steps:

  1. Send all commands
  2. Read all replies and push messages
  3. Handle all callbacks from push messages
  4. Check whether one of the replies is an error. If so, raise this error (subsequent errors are lost)
  5. Bind all replies for the Command -> Reply terms.

Examples

?- redis(default,
         [ lpush(li,1),
           lpush(li,2),
           lrange(li,0,-1) -> List
         ]).
List = ["2", "1"].
  520redis(Redis, PipeLine) :-
  521    is_list(PipeLine),
  522    !,
  523    redis_pipeline(Redis, PipeLine).
  524redis(Redis, Req) :-
  525    redis(Redis, Req, _).
 redis(+Connection, +Command, -Reply) is semidet
Execute a redis Command on Connnection. Next, bind Reply to the returned result. Command is a callable term whose functor is the name of the Redis command and whose arguments are translated to Redis arguments according to the rules below. Note that all text is always represented using UTF-8 encoding.

Reply is either a plain term (often a variable) or a term Value as Type. In the latter form, Type dictates how the Redis bulk reply is translated to Prolog. The default equals to auto, i.e., as a number of the content satisfies the Prolog number syntax and as an atom otherwise.

Redis bulk replies are translated depending on the as Type as explained above.

string
string(Encoding)
Create a SWI-Prolog string object interpreting the blob as following Encoding. Encoding is a restricted set of SWI-Prolog's encodings: bytes (iso_latin_1), utf8 and text (the current locale translation).
atom
atom(Encoding)
As above, producing an atom.
codes
codes(Encoding)
As above, producing a list of integers (Unicode code points)
chars
chars(Encoding)
As above, producing a list of one-character atoms.
integer
float
rational
number
Interpret the bytes as a string representing a number. If the string does not represent a number of the requested type a type_error(Type, String) is raised.
tagged_integer
Same as integer, but demands the value to be between the Prolog flags min_tagged_integer and max_tagged_integer, allowing the value to be used as a dict key.
auto
Same as auto(atom, number)
auto(AsText, AsNumber)
If the bulk string confirms the syntax of AsNumber, convert the value to the requested numberical type. Else convert the value to text according to AsText. This is similar to the Prolog predicate name/2.
dict_key
Alias for auto(atom,tagged_integer). This allows the value to be used as a key for a SWI-Prolog dict.
pairs(AsKey, AsValue)
Convert a map or array of even length into pairs for which the key satisfies AsKey and the value AsValue. The pairs type can also be applied to a Redis array. In this case the array length must be even. This notably allows fetching a Redis hash as pairs using HGETALL using version 2 of the Redis protocol.
dict(AsKey, AsValue)
Similar to pairs(AsKey, AsValue), but convert the resulting pair list into a SWI-Prolog dict. AsKey must convert to a valid dict key, i.e., an atom or tagged integer. See dict_key.
dict(AsValue)
Shorthand for dict(dict_key, AsValue).

Here are some simple examples

?- redis(default, set(a, 42), X).
X = status("OK").
?- redis(default, get(a), X).
X = "42".
?- redis(default, get(a), X as integer).
X = 42.
?- redis(default, get(a), X as float).
X = 42.0.
?- redis(default, set(swipl:version, 8)).
true.
?- redis(default, incr(swipl:version), X).
X = 9.
Errors
- redis_error(Code, String)
  645redis(Redis, Req, Out) :-
  646    out_val(Out, Val),
  647    redis1(Redis, Req, Out),
  648    Val \== nil.
  649
  650out_val(Out, Val) :-
  651    (   nonvar(Out),
  652        Out = (Val as _)
  653    ->  true
  654    ;   Val = Out
  655    ).
  656
  657redis1(Redis, Req, Out) :-
  658    Error = error(Formal, _),
  659    catch(redis2(Redis, Req, Out), Error, true),
  660    (   var(Formal)
  661    ->  true
  662    ;   recover(Error, Redis, redis1(Redis, Req, Out))
  663    ).
  664
  665redis2(Redis, Req, Out) :-
  666    atom(Redis),
  667    !,
  668    redis_stream(Redis, S, true),
  669    with_mutex(Redis,
  670               ( redis_write_msg(S, Req),
  671                 redis_read_stream(Redis, S, Out)
  672               )).
  673redis2(Redis, Req, Out) :-
  674    redis_stream(Redis, S, true),
  675    redis_write_msg(S, Req),
  676    redis_read_stream(Redis, S, Out).
 redis_pipeline(+Redis, +PipeLine)
  680redis_pipeline(Redis, PipeLine) :-
  681    Error = error(Formal, _),
  682    catch(redis_pipeline2(Redis, PipeLine), Error, true),
  683    (   var(Formal)
  684    ->  true
  685    ;   recover(Error, Redis, redis_pipeline(Redis, PipeLine))
  686    ).
  687
  688redis_pipeline2(Redis, PipeLine) :-
  689    atom(Redis),
  690    !,
  691    redis_stream(Redis, S, true),
  692    with_mutex(Redis,
  693               redis_pipeline3(Redis, S, PipeLine)).
  694redis_pipeline2(Redis, PipeLine) :-
  695    redis_stream(Redis, S, true),
  696    redis_pipeline3(Redis, S, PipeLine).
  697
  698redis_pipeline3(Redis, S, PipeLine) :-
  699    maplist(write_pipeline(S), PipeLine),
  700    flush_output(S),
  701    read_pipeline(Redis, S, PipeLine).
  702
  703write_pipeline(S, Command -> _Reply) :-
  704    !,
  705    redis_write_msg_no_flush(S, Command).
  706write_pipeline(S, Command) :-
  707    redis_write_msg_no_flush(S, Command).
  708
  709read_pipeline(Redis, S, PipeLine) :-
  710    E = error(Formal,_),
  711    catch(read_pipeline2(Redis, S, PipeLine), E, true),
  712    (   var(Formal)
  713    ->  true
  714    ;   reconnect_error(E)
  715    ->  redis_disconnect(Redis, [force(true)]),
  716        throw(E)
  717    ;   resync(Redis),
  718        throw(E)
  719    ).
  720
  721read_pipeline2(Redis, S, PipeLine) :-
  722    maplist(redis_read_msg3(S), PipeLine, Replies, Errors, Pushed),
  723    maplist(handle_push(Redis), Pushed),
  724    maplist(handle_error, Errors),
  725    maplist(bind_reply, PipeLine, Replies).
  726
  727redis_read_msg3(S, _Command -> ReplyIn, Reply, Error, Push) :-
  728    !,
  729    redis_read_msg(S, ReplyIn, Reply, Error, Push).
  730redis_read_msg3(S, Var, Reply, Error, Push) :-
  731    redis_read_msg(S, Var, Reply, Error, Push).
  732
  733handle_push(Redis, Pushed) :-
  734    handle_push_messages(Pushed, Redis).
  735handle_error(Error) :-
  736    (   var(Error)
  737    ->  true
  738    ;   throw(Error)
  739    ).
  740bind_reply(_Command -> Reply0, Reply) :-
  741    !,
  742    Reply0 = Reply.
  743bind_reply(_Command, _).
 recover(+Error, +Redis, :Goal)
Error happened while running Goal on Redis. If this is a recoverable error (i.e., a network or disconnected peer), wait a little and try running Goal again.
  752:- meta_predicate recover(+, +, 0).  753
  754recover(Error, Redis, Goal) :-
  755    reconnect_error(Error),
  756    auto_reconnect(Redis),
  757    !,
  758    debug(redis(recover), '~p: got error ~p; trying to reconnect',
  759          [Redis, Error]),
  760    redis_disconnect(Redis, [force(true)]),
  761    (   wait_to_retry(Redis, Error)
  762    ->  call(Goal),
  763        retractall(failure(Redis, _))
  764    ;   throw(Error)
  765    ).
  766recover(Error, _, _) :-
  767    throw(Error).
  768
  769auto_reconnect(redis_connection(_,_,_,Options)) :-
  770    !,
  771    option(reconnect(true), Options).
  772auto_reconnect(Server) :-
  773    ground(Server),
  774    server(Server, _, Options),
  775    option(reconnect(true), Options, true).
  776
  777reconnect_error(error(io_error(_Action, _On),_)).
  778reconnect_error(error(socket_error(_Code, _),_)).
  779reconnect_error(error(syntax_error(unexpected_eof),_)).
 wait(+Redis, +Error)
Wait for some time after a failure. First we wait for 10ms. This is doubled on each failure upto the setting max_retry_wait. If the setting max_retry_count is exceeded we fail and the called signals an exception.
  788:- dynamic failure/2 as volatile.  789
  790wait_to_retry(Redis, Error) :-
  791    redis_failures(Redis, Failures),
  792    setting(max_retry_count, Count),
  793    Failures < Count,
  794    Failures2 is Failures+1,
  795    redis_set_failures(Redis, Failures2),
  796    setting(max_retry_wait, MaxWait),
  797    Wait is min(MaxWait*100, 1<<Failures)/100.0,
  798    debug(redis(recover), '  Sleeping ~p seconds', [Wait]),
  799    retry_message_level(Failures, Level),
  800    print_message(Level, redis(retry(Redis, Failures, Wait, Error))),
  801    sleep(Wait).
  802
  803redis_failures(redis_connection(_,_,Failures0,_), Failures) :-
  804    !,
  805    Failures = Failures0.
  806redis_failures(Server, Failures) :-
  807    atom(Server),
  808    (   failure(Server, Failures)
  809    ->  true
  810    ;   Failures = 0
  811    ).
  812
  813redis_set_failures(Connection, Count) :-
  814    compound(Connection),
  815    !,
  816    nb_setarg(3, Connection, Count).
  817redis_set_failures(Server, Count) :-
  818    atom(Server),
  819    retractall(failure(Server, _)),
  820    asserta(failure(Server, Count)).
  821
  822retry_message_level(0, warning) :- !.
  823retry_message_level(_, silent).
 redis(+Request)
Connect to the default redis server, call redist/3 using Request, disconnect and print the result. This predicate is intended for interactive usage.
  832redis(Req) :-
  833    setup_call_cleanup(
  834        redis_connect(default, C, []),
  835        redis1(C, Req, Out),
  836        redis_disconnect(C)),
  837    print(Out).
 redis_write(+Redis, +Command) is det
 redis_read(+Redis, -Reply) is det
Write command and read replies from a Redis server. These are building blocks for subscribing to event streams.
  845redis_write(Redis, Command) :-
  846    redis_stream(Redis, S, true),
  847    redis_write_msg(S, Command).
  848
  849redis_read(Redis, Reply) :-
  850    redis_stream(Redis, S, true),
  851    redis_read_stream(Redis, S, Reply).
  852
  853
  854		 /*******************************
  855		 *      HIGH LEVEL ACCESS	*
  856		 *******************************/
 redis_get_list(+Redis, +Key, -List) is det
 redis_get_list(+Redis, +Key, +ChunkSize, -List) is det
Get the content of a Redis list in List. If ChunkSize is given and smaller than the list length, List is returned as a lazy list. The actual values are requested using redis LRANGE requests. Note that this results in O(N^2) complexity. Using a lazy list is most useful for relatively short lists holding possibly large items.

Note that values retrieved are strings, unless the value was added using Term as prolog.

See also
- lazy_list/2 for a discussion on the difference between lazy lists and normal lists.
  873redis_get_list(Redis, Key, List) :-
  874    redis_get_list(Redis, Key, -1, List).
  875
  876redis_get_list(Redis, Key, Chunk, List) :-
  877    redis(Redis, llen(Key), Len),
  878    (   (   Chunk >= Len
  879        ;   Chunk == -1
  880        )
  881    ->  (   Len == 0
  882        ->  List = []
  883        ;   End is Len-1,
  884            list_range(Redis, Key, 0, End, List)
  885        )
  886    ;   lazy_list(rlist_next(s(Redis,Key,0,Chunk,Len)), List)
  887    ).
  888
  889rlist_next(State, List, Tail) :-
  890    State = s(Redis,Key,Offset,Slice,Len),
  891    End is min(Len-1, Offset+Slice-1),
  892    list_range(Redis, Key, Offset, End, Elems),
  893    (   End =:= Len-1
  894    ->  List = Elems,
  895        Tail = []
  896    ;   Offset2 is Offset+Slice,
  897        nb_setarg(3, State, Offset2),
  898        append(Elems, Tail, List)
  899    ).
  900
  901% Redis LRANGE demands End > Start and returns inclusive.
  902
  903list_range(DB, Key, Start, Start, [Elem]) :-
  904    !,
  905    redis(DB, lindex(Key, Start), Elem).
  906list_range(DB, Key, Start, End, List) :-
  907    !,
  908    redis(DB, lrange(Key, Start, End), List).
 redis_set_list(+Redis, +Key, +List) is det
Associate a Redis key with a list. As Redis has no concept of an empty list, if List is [], Key is deleted. Note that key values are always strings in Redis. The same conversion rules as for redis/1-3 apply.
  919redis_set_list(Redis, Key, List) :-
  920    redis(Redis, del(Key), _),
  921    (   List == []
  922    ->  true
  923    ;   Term =.. [rpush,Key|List],
  924        redis(Redis, Term, _Count)
  925    ).
 redis_get_hash(+Redis, +Key, -Data:dict) is det
 redis_set_hash(+Redis, +Key, +Data:dict) is det
Put/get a Redis hash as a Prolog dict. Putting a dict first deletes Key. Note that in many cases applications will manage Redis hashes by key. redis_get_hash/3 is notably a user friendly alternative to the Redis HGETALL command. If the Redis hash is not used by other (non-Prolog) applications one may also consider using the Term as prolog syntax to store the Prolog dict as-is.
  938redis_get_hash(Redis, Key, Dict) :-
  939    redis(Redis, hgetall(Key), Dict as dict(auto)).
  940
  941redis_set_hash(Redis, Key, Dict) :-
  942    redis_array_dict(Array, _, Dict),
  943    Term =.. [hset,Key|Array],
  944    redis(Redis, del(Key), _),
  945    redis(Redis, Term, _Count).
 redis_array_dict(?Array, ?Tag, ?Dict) is det
Translate a Redis reply representing hash data into a SWI-Prolog dict. Array is either a list of alternating keys and values or a list of pairs. When translating to an array, this is always a list of alternating keys and values.
Arguments:
Tag- is the SWI-Prolog dict tag.
  956redis_array_dict(Array, Tag, Dict) :-
  957    nonvar(Array),
  958    !,
  959    array_to_pairs(Array, Pairs),
  960    dict_pairs(Dict, Tag, Pairs).
  961redis_array_dict(TwoList, Tag, Dict) :-
  962    dict_pairs(Dict, Tag, Pairs),
  963    pairs_to_array(Pairs, TwoList).
  964
  965array_to_pairs([], []) :-
  966    !.
  967array_to_pairs([NameS-Value|T0], [Name-Value|T]) :-
  968    !,                                  % RESP3 returns a map as pairs.
  969    atom_string(Name, NameS),
  970    array_to_pairs(T0, T).
  971array_to_pairs([NameS,Value|T0], [Name-Value|T]) :-
  972    atom_string(Name, NameS),
  973    array_to_pairs(T0, T).
  974
  975pairs_to_array([], []) :-
  976    !.
  977pairs_to_array([Name-Value|T0], [NameS,Value|T]) :-
  978    atom_string(Name, NameS),
  979    pairs_to_array(T0, T).
 redis_scan(+Redis, -LazyList, +Options) is det
 redis_sscan(+Redis, +Set, -LazyList, +Options) is det
 redis_hscan(+Redis, +Hash, -LazyList, +Options) is det
 redis_zscan(+Redis, +Set, -LazyList, +Options) is det
Map the Redis SCAN, SSCAN, HSCAN and ZSCAN` commands into a lazy list. For redis_scan/3 and redis_sscan/4 the result is a list of strings. For redis_hscan/4 and redis_zscan/4, the result is a list of pairs. Options processed:
match(Pattern)
Adds the MATCH subcommand, only returning matches for Pattern.
count(Count)
Adds the COUNT subcommand, giving a hint to the size of the chunks fetched.
type(Type)
Adds the TYPE subcommand, only returning answers of the indicated type.
See also
- lazy_list/2.
 1003redis_scan(Redis, LazyList, Options) :-
 1004    scan_options([match,count,type], Options, Parms),
 1005    lazy_list(scan_next(s(scan,Redis,0,Parms)), LazyList).
 1006
 1007redis_sscan(Redis, Set, LazyList, Options) :-
 1008    scan_options([match,count,type], Options, Parms),
 1009    lazy_list(scan_next(s(sscan(Set),Redis,0,Parms)), LazyList).
 1010
 1011redis_hscan(Redis, Hash, LazyList, Options) :-
 1012    scan_options([match,count,type], Options, Parms),
 1013    lazy_list(scan_next(s(hscan(Hash),Redis,0,Parms)), LazyList).
 1014
 1015redis_zscan(Redis, Set, LazyList, Options) :-
 1016    scan_options([match,count,type], Options, Parms),
 1017    lazy_list(scan_next(s(zscan(Set),Redis,0,Parms)), LazyList).
 1018
 1019scan_options([], _, []).
 1020scan_options([H|T0], Options, [H,V|T]) :-
 1021    Term =.. [H,V],
 1022    option(Term, Options),
 1023    !,
 1024    scan_options(T0, Options, T).
 1025scan_options([_|T0], Options, T) :-
 1026    scan_options(T0, Options, T).
 1027
 1028
 1029scan_next(State, List, Tail) :-
 1030    State = s(Command,Redis,Cursor,Params),
 1031    Command =.. CList,
 1032    append(CList, [Cursor|Params], CList2),
 1033    Term =.. CList2,
 1034    redis(Redis, Term, [NewCursor,Elems0]),
 1035    scan_pairs(Command, Elems0, Elems),
 1036    (   NewCursor == 0
 1037    ->  List = Elems,
 1038        Tail = []
 1039    ;   nb_setarg(3, State, NewCursor),
 1040        append(Elems, Tail, List)
 1041    ).
 1042
 1043scan_pairs(hscan(_), List, Pairs) :-
 1044    !,
 1045    scan_pairs(List, Pairs).
 1046scan_pairs(zscan(_), List, Pairs) :-
 1047    !,
 1048    scan_pairs(List, Pairs).
 1049scan_pairs(_, List, List).
 1050
 1051scan_pairs([], []).
 1052scan_pairs([Key,Value|T0], [Key-Value|T]) :-
 1053    !,
 1054    scan_pairs(T0, T).
 1055scan_pairs([Key-Value|T0], [Key-Value|T]) :-
 1056    scan_pairs(T0, T).
 1057
 1058
 1059		 /*******************************
 1060		 *              ABOUT		*
 1061		 *******************************/
 redis_current_command(+Redis, ?Command) is nondet
 redis_current_command(+Redis, ?Command, -Properties) is nondet
True when Command has Properties. Fails if Command is not defined. The redis_current_command/3 version returns the command argument specification. See Redis documentation for an explanation.
 1070redis_current_command(Redis, Command) :-
 1071    redis_current_command(Redis, Command, _).
 1072
 1073redis_current_command(Redis, Command, Properties) :-
 1074    nonvar(Command),
 1075    !,
 1076    redis(Redis, command(info, Command), [[_|Properties]]).
 1077redis_current_command(Redis, Command, Properties) :-
 1078    redis(Redis, command, Commands),
 1079    member([Name|Properties], Commands),
 1080    atom_string(Command, Name).
 redis_property(+Redis, ?Property) is nondet
True if Property is a property of the Redis server. Currently uses redis(info, String) and parses the result. As this is for machine usage, properties names *_human are skipped.
 1088redis_property(Redis, Property) :-
 1089    redis(Redis, info, String),
 1090    info_terms(String, Terms),
 1091    member(Property, Terms).
 1092
 1093info_terms(Info, Pairs) :-
 1094    split_string(Info, "\n", "\r\n ", Lines),
 1095    convlist(info_line_term, Lines, Pairs).
 1096
 1097info_line_term(Line, Term) :-
 1098    sub_string(Line, B, _, A, :),
 1099    !,
 1100    sub_atom(Line, 0, B, _, Name),
 1101    \+ sub_atom(Name, _, _, 0, '_human'),
 1102    sub_string(Line, _, A, 0, ValueS),
 1103    (   number_string(Value, ValueS)
 1104    ->  true
 1105    ;   Value = ValueS
 1106    ),
 1107    Term =.. [Name,Value].
 1108
 1109
 1110		 /*******************************
 1111		 *            SUBSCRIBE		*
 1112		 *******************************/
 redis_subscribe(+Redis, +Channels, -Id, +Options) is det
Subscribe to one or more Redis PUB/SUB channels. This predicate creates a thread using thread_create/3 with the given Options. Once running, the thread listens for messages. The message content is a string or Prolog term as described in redis/3. On receiving a message, the following message is broadcasted:
redis(Id, Channel, Data)

If redis_unsubscribe/2 removes the last subscription, the thread terminates.

To simply print the incomming messages use e.g.

?- listen(redis(_, Channel, Data),
          format('Channel ~p got ~p~n', [Channel,Data])).
true.
?- redis_subscribe(default, test, Id, []).
Id = redis_pubsub_3,
?- redis(publish(test, "Hello world")).
Channel test got "Hello world"
1
true.
Arguments:
Id- is the thread identifier of the listening thread. Note that the Options alias(Name) can be used to get a system wide name.
 1142:- dynamic ( subscription/2,            % Id, Channel
 1143             listening/3                % Id, Connection, Thread
 1144           ) as volatile. 1145
 1146redis_subscribe(Redis, Spec, Id, Options) :-
 1147    atom(Redis),
 1148    !,
 1149    channels(Spec, Channels),
 1150    pubsub_thread_options(ThreadOptions, Options),
 1151    thread_create(setup_call_cleanup(
 1152                      redis_connect(Redis, Conn, [reconnect(true)]),
 1153                      redis_subscribe1(Redis, Conn, Channels),
 1154                      redis_disconnect(Conn)),
 1155                  Thread,
 1156                  ThreadOptions),
 1157    pubsub_id(Thread, Id).
 1158redis_subscribe(Redis, Spec, Id, Options) :-
 1159    channels(Spec, Channels),
 1160    pubsub_thread_options(ThreadOptions, Options),
 1161    thread_create(redis_subscribe1(Redis, Redis, Channels),
 1162                  Thread,
 1163                  ThreadOptions),
 1164    pubsub_id(Thread, Id).
 1165
 1166pubsub_thread_options(ThreadOptions, Options) :-
 1167    merge_options(Options, [detached(true)], ThreadOptions).
 1168
 1169pubsub_id(Thread, Thread).
 1170%pubsub_id(Thread, Id) :-
 1171%    thread_property(Thread, id(TID)),
 1172%    atom_concat('redis_pubsub_', TID, Id).
 1173
 1174redis_subscribe1(Redis, Conn, Channels) :-
 1175    Error = error(Formal, _),
 1176    catch(redis_subscribe2(Redis, Conn, Channels), Error, true),
 1177    (   var(Formal)
 1178    ->  true
 1179    ;   recover(Error, Conn, redis1(Conn, echo("reconnect"), _)),
 1180        thread_self(Me),
 1181        pubsub_id(Me, Id),
 1182        findall(Channel, subscription(Id, Channel), CurrentChannels),
 1183        redis_subscribe1(Redis, Conn, CurrentChannels)
 1184    ).
 1185
 1186redis_subscribe2(Redis, Conn, Channels) :-
 1187    redis_subscribe3(Conn, Channels),
 1188    redis_listen(Redis, Conn).
 1189
 1190redis_subscribe3(Conn, Channels) :-
 1191    thread_self(Me),
 1192    pubsub_id(Me, Id),
 1193    prolog_listen(this_thread_exit, pubsub_clean(Id)),
 1194    maplist(register_subscription(Id), Channels),
 1195    redis_stream(Conn, S, true),
 1196    Req =.. [subscribe|Channels],
 1197    redis_write_msg(S, Req).
 1198
 1199pubsub_clean(Id) :-
 1200    retractall(listening(Id, _Connection, _Thread)),
 1201    retractall(subscription(Id, _Channel)).
 redis_subscribe(+Id, +Channels) is det
 redis_unsubscribe(+Id, +Channels) is det
Add/remove channels from for the subscription. If no subscriptions remain, the listening thread terminates.
Arguments:
Channels- is either a single channel or a list thereof. Each channel specification is either an atom or a term `A:B:...`, where all parts are atoms.
 1213redis_subscribe(Id, Spec) :-
 1214    channels(Spec, Channels),
 1215    (   listening(Id, Connection, _Thread)
 1216    ->  true
 1217    ;   existence_error(redis_pubsub, Id)
 1218    ),
 1219    maplist(register_subscription(Id), Channels),
 1220    redis_stream(Connection, S, true),
 1221    Req =.. [subscribe|Channels],
 1222    redis_write_msg(S, Req).
 1223
 1224redis_unsubscribe(Id, Spec) :-
 1225    channels(Spec, Channels),
 1226    (   listening(Id, Connection, _Thread)
 1227    ->  true
 1228    ;   existence_error(redis_pubsub, Id)
 1229    ),
 1230    maplist(unregister_subscription(Id), Channels),
 1231    redis_stream(Connection, S, true),
 1232    Req =.. [unsubscribe|Channels],
 1233    redis_write_msg(S, Req).
 redis_current_subscription(?Id, ?Channels)
True when a PUB/SUB subscription with Id is listening on Channels.
 1239redis_current_subscription(Id, Channels) :-
 1240    findall(Id-Channel, subscription(Id, Channel), Pairs),
 1241    keysort(Pairs, Sorted),
 1242    group_pairs_by_key(Sorted, Grouped),
 1243    member(Id-Channels, Grouped).
 1244
 1245channels(Spec, List) :-
 1246    is_list(Spec),
 1247    !,
 1248    maplist(channel_name, Spec, List).
 1249channels(Ch, [Key]) :-
 1250    channel_name(Ch, Key).
 1251
 1252channel_name(Atom, Atom) :-
 1253    atom(Atom),
 1254    !.
 1255channel_name(Key, Atom) :-
 1256    phrase(key_parts(Key), Parts),
 1257    !,
 1258    atomic_list_concat(Parts, :, Atom).
 1259channel_name(Key, _) :-
 1260    type_error(redis_key, Key).
 1261
 1262key_parts(Var) -->
 1263    { var(Var), !, fail }.
 1264key_parts(Atom) -->
 1265    { atom(Atom) },
 1266    !,
 1267    [Atom].
 1268key_parts(A:B) -->
 1269    key_parts(A),
 1270    key_parts(B).
 1271
 1272
 1273
 1274
 1275register_subscription(Id, Channel) :-
 1276    (   subscription(Id, Channel)
 1277    ->  true
 1278    ;   assertz(subscription(Id, Channel))
 1279    ).
 1280
 1281unregister_subscription(Id, Channel) :-
 1282    retractall(subscription(Id, Channel)).
 1283
 1284redis_listen(Redis, Conn) :-
 1285    thread_self(Me),
 1286    pubsub_id(Me, Id),
 1287    setup_call_cleanup(
 1288        assertz(listening(Id, Conn, Me), Ref),
 1289        redis_listen_loop(Redis, Id, Conn),
 1290        erase(Ref)).
 1291
 1292redis_listen_loop(Redis, Id, Conn) :-
 1293    redis_stream(Conn, S, true),
 1294    (   subscription(Id, _)
 1295    ->  redis_read_stream(Redis, S, Reply),
 1296        redis_broadcast(Redis, Reply),
 1297        redis_listen_loop(Redis, Id, Conn)
 1298    ;   true
 1299    ).
 1300
 1301redis_broadcast(_, [subscribe, _Channel, _N]) :-
 1302    !.
 1303redis_broadcast(Redis, [message, Channel, Data]) :-
 1304    !,
 1305    catch(broadcast(redis(Redis, Channel, Data)),
 1306          Error,
 1307          print_message(error, Error)).
 1308redis_broadcast(Redis, Message) :-
 1309    assertion((Message = [Type, Channel, _Data],
 1310               atom(Type),
 1311               atom(Channel))),
 1312    debug(redis(warning), '~p: Unknown message while listening: ~p',
 1313          [Redis,Message]).
 1314
 1315
 1316		 /*******************************
 1317		 *          READ/WRITE		*
 1318		 *******************************/
 redis_read_stream(+Redis, +Stream, -Term) is det
Read a message from a Redis stream. Term is one of

If something goes wrong, the connection is closed and an exception is raised.

 1335redis_read_stream(Redis, SI, Out) :-
 1336    E = error(Formal,_),
 1337    catch(redis_read_msg(SI, Out, Out0, Error, Push), E, true),
 1338    (   var(Formal)
 1339    ->  handle_push_messages(Push, Redis),
 1340        (   var(Error)
 1341        ->  Out = Out0
 1342        ;   resync(Redis),
 1343            throw(Error)
 1344        )
 1345    ;   redis_disconnect(Redis, [force(true)]),
 1346        throw(E)
 1347    ).
 1348
 1349handle_push_messages([], _).
 1350handle_push_messages([H|T], Redis) :-
 1351    (   catch(handle_push_message(H, Redis), E,
 1352              print_message(warning, E))
 1353    ->  true
 1354    ;   true
 1355    ),
 1356    handle_push_messages(T, Redis).
 1357
 1358handle_push_message(["pubsub"|List], Redis) :-
 1359    redis_broadcast(Redis, List).
 1360% some protocol version 3 push messages (such as
 1361% __keyspace@* events) seem to come directly
 1362% without a pubsub header
 1363handle_push_message([message|List], Redis) :-
 1364    redis_broadcast(Redis, [message|List]).
 resync(+Redis) is det
Re-synchronize after an error. This may happen if some type conversion fails and we have read a partial reply. It is hard to figure out what to read from where we are, so we echo a random magic sequence and read until we find the reply.
 1374resync(Redis) :-
 1375    E = error(Formal,_),
 1376    catch(do_resync(Redis), E, true),
 1377    (   var(Formal)
 1378    ->  true
 1379    ;   redis_disconnect(Redis, [force(true)])
 1380    ).
 1381
 1382do_resync(Redis) :-
 1383    A is random(1_000_000_000),
 1384    redis_stream(Redis, S, true),
 1385    redis_write_msg(S, echo(A)),
 1386    catch(call_with_time_limit(0.2, '$redis_resync'(S, A)),
 1387          time_limit_exceeded,
 1388          throw(error(time_limit_exceeded,_))).
 redis_read_msg(+Stream, -Message, -Error, -PushMessages) is det
 redis_write_msg(+Stream, +Message) is det
Read/write a Redis message. Both these predicates are in the foreign module redis4pl.
Arguments:
PushMessages- is a list of push messages that may be non-[] if protocol version 3 (see redis_connect/3) is selected. Using protocol version 2 this list is always empty.
 1403		 /*******************************
 1404		 *            MESSAGES		*
 1405		 *******************************/
 1406
 1407:- multifile
 1408    prolog:error_message//1,
 1409    prolog:message//1. 1410
 1411prolog:error_message(redis_error(Code, String)) -->
 1412    [ 'REDIS: ~w: ~s'-[Code, String] ].
 1413
 1414prolog:message(redis(retry(_Redis, _Failures, Wait, Error))) -->
 1415    [ 'REDIS: connection error.  Retrying in ~2f seconds'-[Wait], nl ],
 1416    [ '    '-[] ], '$messages':translate_message(Error)