
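redis_streams.pl -- Using Redis streams

A Redis stream is a set of messages consisting of key-value pairs that are identified by a time and sequence number. Streams are powerful objects that can roughly be used for three purposes:

- Maintain and query a log of events, i.e., a timeline.
- Provide an alternative to Redis' publish/subscribe API that ensures messages get delivered to all clients, even if they are offline at the moment an event is published.
- Distribute messages over a group of clients, where each client confirms the messages it has handled.

This library abstracts the latter two scenarios. The main predicates are

- xadd/4 to add a message to a stream
- xlisten/3 to read and broadcast messages from a stream
- xlisten_group/5 to act as a consumer in a consumer group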
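xstream_set(+Redis, +Key, +Option)
Set an option for the stream Key on Redis. The option maxlen(Count) makes xadd/4 add a MAXLEN ~ Count option to the XADD
command, capping the length of the stream. See also the section
"Redis as a message brokering system".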
xadd(+Redis, +Key, ?Id, +Data:dict) is det
Add a message to the stream Key on Redis. The length of the stream can be capped using the xstream_set/3 option maxlen(Count). If Id is
unbound, generating the id is left to the server and Id is unified
with the returned id. The returned id is a string consisting of the
time stamp in milliseconds and a sequence number. See the Redis docs for
details.
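A minimal sketch of adding a message, assuming a Redis server is reachable under the alias default. The stream name events, the cap of 1000 and the helper names init_events/0 and log_event/3 are hypothetical and chosen only for illustration.

```prolog
:- use_module(library(redis)).
:- use_module(library(redis_streams)).

% Hypothetical setup step: cap the stream `events` at roughly
% 1000 messages for subsequent xadd/4 calls.
init_events :-
    xstream_set(default, events, maxlen(1000)).

% Hypothetical helper: add one message. Id is left unbound, so the
% server generates it and Id is unified with the returned id.
log_event(Type, User, Id) :-
    xadd(default, events, Id, _{type:Type, user:User}).
```

After calling init_events, a query such as `?- log_event(login, "bob", Id).` should bind Id to the server-generated message id.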
xlisten(+Redis, +Streams, +Options)
Listen using XREAD on one or more Streams on the server Redis.
For each message that arrives, call broadcast/1 as shown here, where Data is a
dict representing the message.

    broadcast(redis(Redis, Stream, Id, Data))

Options:
start(+Start): normally either 0 to get all messages from the epoch
or $ to get messages starting with the last. Default is $.
Note that this predicate does not terminate. It is normally
executed in a thread. The following call listens to the streams
key1 and key2 on the default Redis server. Using
reconnect(true), the client will try to re-establish the connection if
it gets lost.

    ?- redis_connect(default, C, [reconnect(true)]),
       thread_create(xlisten(C, [key1, key2], [start($)]),
                     _, [detached(true)]).
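To act on the broadcast messages, a handler can be registered with listen/2 from library(broadcast) before the listener thread is started. The following is a sketch under that assumption; the names on_stream_message/3 and start_listener/0 are hypothetical.

```prolog
:- use_module(library(broadcast)).
:- use_module(library(redis)).
:- use_module(library(redis_streams)).

% Hypothetical handler: print every message broadcast by xlisten/3.
on_stream_message(Stream, Id, Data) :-
    format("~w ~w: ~p~n", [Stream, Id, Data]).

% Register the handler, then run xlisten/3 in a detached thread.
start_listener :-
    listen(redis(_Redis, Stream, Id, Data),
           on_stream_message(Stream, Id, Data)),
    redis_connect(default, C, [reconnect(true)]),
    thread_create(xlisten(C, [key1, key2], [start($)]),
                  _, [detached(true)]).
```

The handler runs in the listener thread, so long-running work in the handler delays the processing of subsequent messages.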
xlisten(+Redis, +Streams, +OnBroadCast, +OnIdle, +Options) [private]
Generalization of xlisten/3 and xlisten_group/5. OnBroadCast is called for each received message and OnIdle is called after XREAD
or XREADGROUP has returned and the messages are processed. These
callbacks are called as follows:

    call(OnBroadCast, +Redis, +Stream, +MessageId, +Dict)
    call(OnIdle, +Redis, +Streams, +Starts, +NewStarts, +Options)

Both callbacks must succeed and not leave any open choice points. Failure or an exception causes xlisten/5 to stop.
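The callback contract can be illustrated with two deterministic predicates that take the extra arguments listed above. This is only a sketch: show_message/4 and report_idle/5 are hypothetical names, and because xlisten/5 is private they are not meant as a documented way to call it.

```prolog
% Hypothetical OnBroadCast callback: succeeds exactly once per message.
show_message(_Redis, Stream, MessageId, Dict) :-
    format("~w ~w: ~p~n", [Stream, MessageId, Dict]).

% Hypothetical OnIdle callback: identical start lists mean that no
% message was processed in this round. The if-then-else leaves no
% choice point in either branch.
report_idle(_Redis, Streams, Starts, NewStarts, _Options) :-
    (   Starts == NewStarts
    ->  format("no new messages on ~p~n", [Streams])
    ;   true
    ).
```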
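xlisten_group(+Redis, +Group, +Consumer, +Streams, +Options)
Listen to Streams as Consumer in Group. This is similar to xlisten/3, except that a message is only considered processed if it is handled successfully, in which case an XACK
is sent to the server.

Options processed: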
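block(+Seconds): causes XREADGROUP to return with timeout when no messages
arrive within Seconds. On a timeout, xidle_group/5 is called,
which will try to handle messages to other consumers that have been pending
longer than Seconds. Choosing the time depends on the
application. Notably, a time shorter than the required processing time makes a job migrate from consumer to consumer until
max_deliveries(Count) is exceeded. Note that the original
receiver does not notice that the job is claimed and thus
multiple consumers may ultimately answer the message.
max_deliveries(+Count): re-deliver (using XCLAIM) a message at most Count times.
Exceeding this calls xhook/2. Default Count is 3.
max_claim(+Count): do not claim more than Count messages during a single idle action. Default is 10.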
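A sketch of starting a consumer, assuming a Redis server under the alias default. The stream jobs, the group workers and the helpers ensure_group/3 and start_worker/1 are hypothetical. The option name block(Seconds) for the XREADGROUP timeout is an assumption, as is creating the group through the plain XGROUP CREATE command with redis/2. How a consumer registers its message handler is not shown here.

```prolog
:- use_module(library(redis)).
:- use_module(library(redis_streams)).

% Create the consumer group, creating the stream if needed.
% XGROUP CREATE raises an error if the group already exists; this
% sketch ignores any error, which also hides other failures.
ensure_group(Redis, Stream, Group) :-
    catch(redis(Redis, xgroup(create, Stream, Group, $, mkstream)),
          error(_, _),
          true).

% Run xlisten_group/5 in a detached thread. block(10) is the assumed
% name of the timeout option; max_deliveries(3) matches the default.
start_worker(Consumer) :-
    redis_connect(default, C, [reconnect(true)]),
    ensure_group(C, jobs, workers),
    thread_create(xlisten_group(C, workers, Consumer, [jobs],
                                [ block(10),
                                  max_deliveries(3)
                                ]),
                  _, [detached(true)]).
```

Starting several workers with distinct Consumer names, for example `?- start_worker(alice), start_worker(bob).`, distributes the messages of the stream over them.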
xidle_group(+Redis, +Streams, +Starts, +NewStarts, +Options) is det [private]
Called after XREADGROUP returns and the returned messages (if
any) have been processed. If Starts == NewStarts, no messages have
been processed, indicating a timeout.
This implementation looks for idle messages on other consumers and tries to claim them.
check_limit_deliveries(+Redis, +Stream, +Delivered, +Id, +Options) [private]
xleave_group(+Redis, +Group, +Consumer, +Streams) is det [private]
xconsumer_stop(+Leave)
May be called in response to a message to stop the consumer by throwing redis(stop(Leave)), which is caught
by xlisten_group/5.
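The stop mechanism is plain exception handling. The following self-contained sketch mimics it with catch/3 and throw/1; stop_request/1 and run_until_stopped/2 are hypothetical stand-ins for illustration, not the library's implementation.

```prolog
% Stand-in for xconsumer_stop/1: throw the stop request.
stop_request(Leave) :-
    throw(redis(stop(Leave))).

% Stand-in for the listener: run Goal and catch a stop request.
% Leave is unified with the thrown value, or with `none` when
% Goal completes without requesting a stop.
run_until_stopped(Goal, Leave) :-
    catch(( call(Goal), Leave = none ),
          redis(stop(Leave)),
          true).
```

For example, `?- run_until_stopped(stop_request(true), Leave).` binds Leave to true, while `?- run_until_stopped(true, Leave).` binds it to none.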
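xhook(+Stream, +Event) [multifile]
This multifile predicate is called on certain stream events. The event delivery_failed(Id, Group, Delivered) signals that a message was delivered more often than allowed by the max_deliveries(Count) option of xlisten_group/5. If the hook fails, the message is acknowledged using XACK. From the introduction
to streams: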
"So once the deliveries counter reaches a given large number that you chose, it is probably wiser to put such messages in another stream and send a notification to the system administrator. This is basically the way that Redis streams implement the concept of the dead letter."
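A hook that notifies the administrator could look roughly like the sketch below. It assumes that the hook lives in the module redis_streams and that the event term is delivery_failed(Id, Group, Delivered); both should be checked against the library source before use.

```prolog
:- use_module(library(redis_streams)).

:- multifile redis_streams:xhook/2.

% Assumed event term: delivery_failed(Id, Group, Delivered).
% Print a warning and then fail, so that the message is
% acknowledged using XACK.
redis_streams:xhook(Stream, delivery_failed(Id, Group, Delivered)) :-
    print_message(warning,
                  format("Stream ~w, group ~w: message ~w delivered ~w times",
                         [Stream, Group, Id, Delivered])),
    fail.
```

A hook that implements a dead-letter stream would instead record the message id on another stream with xadd/4 before failing.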