1/*  File:    canny/redis.pl
    2    Author:  Roy Ratcliffe
    3    Created: Sep 24 2022
    4    Purpose: Canny Redis
    5
    6Copyright (c) 2022, Roy Ratcliffe, Northumberland, United Kingdom
    7
    8Permission is hereby granted, free of charge,  to any person obtaining a
    9copy  of  this  software  and    associated   documentation  files  (the
   10"Software"), to deal in  the   Software  without  restriction, including
   11without limitation the rights to  use,   copy,  modify,  merge, publish,
   12distribute, sublicense, and/or sell  copies  of   the  Software,  and to
   13permit persons to whom the Software is   furnished  to do so, subject to
   14the following conditions:
   15
   16    The above copyright notice and this permission notice shall be
   17    included in all copies or substantial portions of the Software.
   18
   19THE SOFTWARE IS PROVIDED "AS IS", WITHOUT  WARRANTY OF ANY KIND, EXPRESS
   20OR  IMPLIED,  INCLUDING  BUT  NOT   LIMITED    TO   THE   WARRANTIES  OF
   21MERCHANTABILITY, FITNESS FOR A PARTICULAR   PURPOSE AND NONINFRINGEMENT.
   22IN NO EVENT SHALL THE AUTHORS  OR   COPYRIGHT  HOLDERS BE LIABLE FOR ANY
   23CLAIM, DAMAGES OR OTHER LIABILITY,  WHETHER   IN  AN ACTION OF CONTRACT,
   24TORT OR OTHERWISE, ARISING FROM,  OUT  OF   OR  IN  CONNECTION  WITH THE
   25SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
   26
   27*/
   28
   29:- module(canny_redis,
   30          [ redis_last_streams/2,               % +Reads,-Streams:list
   31            redis_last_streams/3,               % +Reads,?Tag,-Streams:dict
   32            redis_last_stream_entry/3,          % +Streams,-StreamId,-Fields
   33            redis_last_stream_entry/4,          % +Streams,-StreamId,?Tag,-Fields
   34            redis_keys_and_stream_ids/4,        % +Streams,?Tag,-Keys,-StreamIds
   35            redis_keys_and_stream_ids/3,        % +Pairs,-Keys,-StreamIds,
   36            redis_stream_read/4,                % +Reads,-Key,-StreamId,-Fields
   37            redis_stream_read/5,                % +Reads,-Key,-StreamId,?Tag,-Fields
   38            redis_stream_entry/3,               % +Entries,-StreamId,-Fields
   39            redis_stream_entry/4,               % +Entries,-StreamId,?Tag,-Fields
   40            redis_stream_id/1,                  % ?RedisTimeSeqPair
   41            redis_stream_id/2,                  % ?StreamId,?RedisTimeSeqPair
   42            redis_stream_id/3,                  % ?StreamId,?RedisTime,?Seq
   43            redis_time/1,                       % +RedisTime
   44            redis_date_time/3                   % +RedisTime,-DateTime,+TimeZone
   45          ]).   46:- autoload(library(lists), [member/2]).   47:- autoload(library(redis), [redis_array_dict/3]).   48:- autoload(library(apply), [maplist/3]).   49
   50                /*******************************
   51                *       S t r e a m s          *
   52                *******************************/
 redis_last_streams(+Reads, -Streams:list) is det
 redis_last_streams(+Reads, ?Tag, -Streams:dict) is det
Collates the last Streams for a given list of Reads, the reply from an XREAD command. The implementation assumes that each stream's read reply has one entry at least, else the stream does not present a reply.
   62redis_last_streams(Reads, Streams) :-
   63    maplist(redis_last_stream, Reads, Streams).
   64
   65redis_last_stream([Key, Entries], Key-StreamId) :-
   66    redis_last_stream_entry(Entries, StreamId, _).
   67
   68redis_last_streams(Reads, Tag, Streams) :-
   69    redis_last_streams(Reads, Streams0),
   70    dict_create(Streams, Tag, Streams0).
 redis_last_stream_entry(+Entries, -StreamId, -Fields) is semidet
 redis_last_stream_entry(+Entries:list(list), -StreamId:atom, ?Tag:atom, -Fields:dict) is semidet
Unifies with the last StreamId and Fields. It fails for empty Entries. Each entry comprises a StreamId and a set of Fields.
   79redis_last_stream_entry([[StreamId, Fields]], StreamId, Fields) :-
   80    !.
   81redis_last_stream_entry([_|Entries], StreamId, Fields) :-
   82    redis_last_stream_entry(Entries, StreamId, Fields).
   83
   84redis_last_stream_entry(Entries, StreamId, Tag, Fields) :-
   85    redis_last_stream_entry(Entries, StreamId, Fields0),
   86    redis_array_dict(Fields0, Tag, Fields).
 redis_keys_and_stream_ids(+Streams, ?Tag, -Keys, -StreamIds) is det
 redis_keys_and_stream_ids(+Pairs, -Keys, -StreamIds) is det
 Streams or Pairs of Keys and StreamIds. Arity-4 exists with Tag in order to unify with a dictionary by Tag.
Arguments:
Streams- is a dictionary of stream identifiers, indexed by stream key.
Keys- is a list of stream keys.
StreamIds- is a list of corrected stream identifiers. The predicate applies redis_stream_id/3 to the incoming identifiers, allowing for arbitrary milliseconds-sequence pairs including implied missing zero sequence number.
  104redis_keys_and_stream_ids(Streams, Tag, Keys, StreamIds) :-
  105    dict_pairs(Streams, Tag, Pairs),
  106    redis_keys_and_stream_ids(Pairs, Keys, StreamIds).
  107
  108redis_keys_and_stream_ids([], [], []).
  109redis_keys_and_stream_ids([Key-StreamId0|T0], [Key|T1], [RedisTime-Seq|T]) :-
  110    redis_stream_id(StreamId0, RedisTime, Seq),
  111    redis_keys_and_stream_ids(T0, T1, T).
 redis_stream_read(+Reads, -Key, -StreamId, -Fields) is nondet
 redis_stream_read(+Reads, -Key, -StreamId, ?Tag, -Fields) is nondet
Unifies with all Key, StreamId and array of Fields for all Reads.
Arguments:
Reads- is a list of [Key, Entries] lists, a list of lists. The sub-lists always have two items: the Key of the stream followed by another sub-list of stream entries.
  122redis_stream_read(Reads, Key, StreamId, Fields) :-
  123    member([Key, Entries], Reads),
  124    redis_stream_entry(Entries, StreamId, Fields).
  125
  126redis_stream_read(Reads, Key, StreamId, Tag, Fields) :-
  127    member([Key, Entries], Reads),
  128    redis_stream_entry(Entries, StreamId, Tag, Fields).
 redis_stream_entry(+Entries, -StreamId, -Fields) is nondet
 redis_stream_entry(+Entries:list, -StreamId:pair(nonneg,nonneg), ?Tag:atom, -Fields:dict) is nondet
 redis_stream_read(+Reads:list, -Key:atom, -StreamId:pair(nonneg,nonneg), ?Tag:atom, -Fields:dict) is nondet
Unifies non-deterministically with all Entries, or Fields dictionaries embedded with multi-stream Reads. Decodes the stream identifier and the Entry.
Arguments:
Entries- is a list of [StreamId, Fields] lists, another list of lists. Each sub-list describes an "entry" within the stream, a pairing between an identifier and some fields.
  144redis_stream_entry(Entries, StreamId, Fields) :-
  145    member([StreamId0, Fields], Entries),
  146    redis_stream_id(StreamId0, StreamId).
  147
  148redis_stream_entry(Entries, StreamId, Tag, Fields) :-
  149    redis_stream_entry(Entries, StreamId, Fields0),
  150    redis_array_dict(Fields0, Tag, Fields).
 redis_stream_id(?RedisTimeSeqPair) is semidet
 redis_stream_id(?StreamId:text, ?RedisTimeSeqPair) is semidet
 redis_stream_id(?StreamId:text, ?RedisTime:nonneg, ?Seq:nonneg) is semidet
Stream identifier to millisecond and sequence numbers. In practice, the numbers always convert to integers.

Deliberately validates incoming Redis time and sequence numbers. Both must be integers and both must be zero or more. The predicates fail otherwise. Internally, Redis stores stream identifiers as 128-bit unsigned integers split in half for the time and sequence values, each of 64 bits.

The 3-arity version of the predicate handles extraction of time and sequence integers from arbitrary stream identifiers: text or compound terms, including implied zero-sequence stream identifier with a single non-negative integer representing a millisecond Unix time.

Arguments:
StreamId- identifies a stream message or entry, element or item. All these terms apply to the contents of a stream, but Redis internally refers to the content as entries.
RedisTimeSeqPair- is a pair of non-negative integers, time and sequence. The Redis time equals Unix time multiplied by 1,000; in other words, Unix time in milliseconds.
  180redis_stream_id(RedisTime-Seq) :-
  181    redis_time(RedisTime),
  182    integer(Seq),
  183    Seq >= 0.
  184
  185redis_stream_id(StreamId, RedisTime-Seq) :-
  186    var(StreamId),
  187    !,
  188    redis_stream_id(RedisTime-Seq),
  189    atomic_list_concat([RedisTime, Seq], -, StreamId).
  190redis_stream_id(StreamId, RedisTime-Seq) :-
  191    (   atom(StreamId)
  192    ->  true
  193    ;   string(StreamId)
  194    ),
  195    split_string(StreamId, -, '', [RedisTime0, Seq0]),
  196    number_string(RedisTime, RedisTime0),
  197    number_string(Seq, Seq0),
  198    redis_stream_id(RedisTime-Seq).
  199
  200redis_stream_id(RedisTime-Seq, RedisTime, Seq) :-
  201    redis_stream_id(RedisTime-Seq),
  202    !.
  203redis_stream_id(RedisTime, RedisTime, 0) :-
  204    redis_time(RedisTime),
  205    !.
  206redis_stream_id(StreamId, RedisTime, Seq) :-
  207    redis_stream_id(StreamId, RedisTime-Seq).
 redis_time(+RedisTime) is semidet
 Successful when RedisTime is a non-negative integer. Redis times amount to millisecond-scale Unix times.
Arguments:
RedisTime- in milliseconds since 1970.
  216redis_time(RedisTime) :-
  217    integer(RedisTime),
  218    RedisTime >= 0.
 redis_date_time(+RedisTime, -DateTime, +TimeZone) is det
Converts RedisTime to DateTime within TimeZone.
  224redis_date_time(RedisTime, DateTime, TimeZone) :-
  225    Stamp is RedisTime / 1000,
  226    stamp_date_time(Stamp, DateTime, TimeZone)