1/* Part of SWI-Prolog 2 3 Author: Jan Wielemaker and Sean Charles 4 E-mail: jan@swi-prolog.org and <sean at objitsu dot com> 5 WWW: http://www.swi-prolog.org 6 Copyright (c) 2013-2022, Sean Charles 7 SWI-Prolog Solutions b.v. 8 All rights reserved. 9 10 Redistribution and use in source and binary forms, with or without 11 modification, are permitted provided that the following conditions 12 are met: 13 14 1. Redistributions of source code must retain the above copyright 15 notice, this list of conditions and the following disclaimer. 16 17 2. Redistributions in binary form must reproduce the above copyright 18 notice, this list of conditions and the following disclaimer in 19 the documentation and/or other materials provided with the 20 distribution. 21 22 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 23 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 24 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 25 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 26 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 27 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 28 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 29 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 30 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 31 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 32 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 33 POSSIBILITY OF SUCH DAMAGE. 34 35 NOTE 36 37 The original code was subject to the MIT licence and written by 38 Sean Charles. Re-licenced to standard SWI-Prolog BSD-2 with 39 permission from Sean Charles. 40*/ 41 42:- module(redis, 43 [ redis_server/3, % +Alias, +Address, +Options 44 redis_connect/1, % -Connection 45 redis_connect/3, % -Connection, +Host, +Port 46 redis_disconnect/1, % +Connection 47 redis_disconnect/2, % +Connection, +Options 48 % Queries 49 redis/1, % +Request 50 redis/2, % +Connection, +Request 51 redis/3, % +Connection, +Request, -Reply 52 % High level queries 53 redis_get_list/3, % +Redis, +Key, -List 54 redis_get_list/4, % +Redis, +Key, +ChunkSize, -List 55 redis_set_list/3, % +Redis, +Key, +List 56 redis_get_hash/3, % +Redis, +Key, -Data:dict 57 redis_set_hash/3, % +Redis, +Key, +Data:dict 58 redis_scan/3, % +Redis, -LazyList, +Options 59 redis_sscan/4, % +Redis, +Set, -LazyList, +Options 60 redis_hscan/4, % +Redis, +Hash, -LazyList, +Options 61 redis_zscan/4, % +Redis, +Set, -LazyList, +Options 62 % Publish/Subscribe 63 redis_subscribe/4, % +Redis, +Channels, -Id, +Options 64 redis_subscribe/2, % +Id, +Channels 65 redis_unsubscribe/2, % +Id, +Channels 66 redis_current_subscription/2, % ?Id,?Channels 67 redis_write/2, % +Redis, +Command 68 redis_read/2, % +Redis, -Reply 69 % Building blocks 70 redis_array_dict/3, % ?Array, ?Tag, ?Dict 71 % Admin stuff 72 redis_property/2, % +Reply, ?Property 73 redis_current_command/2, % +Redis,?Command 74 redis_current_command/3 % +Redis, +Command, -Properties 75 ]). 76:- autoload(library(socket), [tcp_connect/3]). 77:- autoload(library(apply), [maplist/2, convlist/3, maplist/3, maplist/5]). 78:- autoload(library(broadcast), [broadcast/1]). 79:- autoload(library(error), 80 [ must_be/2, 81 type_error/2, 82 instantiation_error/1, 83 uninstantiation_error/1, 84 existence_error/2, 85 existence_error/3 86 ]). 87:- autoload(library(lazy_lists), [lazy_list/2]). 88:- autoload(library(lists), [append/3, member/2]). 89:- autoload(library(option), [merge_options/3, option/2, 90 option/3, select_option/4]). 91:- autoload(library(pairs), [group_pairs_by_key/2]). 92:- autoload(library(time), [call_with_time_limit/2]). 93:- use_module(library(debug), [debug/3, assertion/1]). 94:- use_module(library(settings), [setting/4, setting/2]). 95:- if(exists_source(library(ssl))). 96:- autoload(library(ssl), [ssl_context/3, ssl_negotiate/5]). 97:- endif. 98 99:- use_foreign_library(foreign(redis4pl)). 100 101:- setting(max_retry_count, nonneg, 8640, % one day 102 "Max number of retries"). 103:- setting(max_retry_wait, number, 10, 104 "Max time to wait between recovery attempts"). 105:- setting(sentinel_timeout, number, 0.2, 106 "Time to wait for a sentinel"). 107 108:- predicate_options(redis_server/3, 3, 109 [ pass_to(redis:redis_connect/3, 3) 110 ]). 111:- predicate_options(redis_connect/3, 3, 112 [ reconnect(boolean), 113 user(atom), 114 password(atomic), 115 version(between(2,3)) 116 ]). 117:- predicate_options(redis_disconnect/2, 2, 118 [ force(boolean) 119 ]). 120:- predicate_options(redis_scan/3, 3, 121 [ match(atomic), 122 count(nonneg), 123 type(atom) 124 ]). 125% Actually not passing, but the same 126:- predicate_options(redis_sscan/4, 4, [pass_to(redis:redis_scan/3, 3)]). 127:- predicate_options(redis_hscan/4, 4, [pass_to(redis:redis_scan/3, 3)]). 128:- predicate_options(redis_zscan/4, 4, [pass_to(redis:redis_scan/3, 3)]).
149:- dynamic server/3. 150 151:- dynamic ( connection/2, % ServerName, Stream 152 sentinel/2 % Pool, Address 153 ) as volatile.
default
points at localhost:6379
with no connect options. The default
server is used for redis/1 and redis/2 and may be changed using this
predicate. Options are described with redis_connect/3.
Connections established this way are by default automatically
reconnected if the connection is lost for some reason unless a
reconnect(false)
option is specified.
167redis_server(Alias, Address, Options) :- 168 must_be(ground, Alias), 169 retractall(server(Alias, _, _)), 170 asserta(server(Alias, Address, Options)). 171 172server(default, localhost:6379, []).
redis_connect(+Address,
-Connection, +Options)
. redis_connect/1 is equivalent to
redis_connect(localhost:6379, Connection, [])
. Options:
true
, try to reconnect to the service when the connection
seems lost. Default is true
for connections specified using
redis_server/3 and false
for explictly opened connections.version(3)
and password(Password)
are specified, these
are used to authenticate using the HELLO command.3
, the HELLO command is used to upgrade the protocol.cacert
, key
and cert
options.sentinel(MasterName)
to enable contacting a network of Redis servers guarded by a
sentinel network.Instead of using these predicates, redis/2 and redis/3 are normally used with a server name argument registered using redis_server/3. These predicates are meant for creating a temporary paralel connection or using a connection with a blocking call.
228redis_connect(Conn) :- 229 redis_connect(default, Conn, []). 230 231redis_connect(Conn, Host, Port) :- 232 var(Conn), 233 ground(Host), ground(Port), 234 !, % GNU-Prolog compatibility 235 redis_connect(Host:Port, Conn, []). 236redis_connect(Server, Conn, Options) :- 237 atom(Server), 238 !, 239 ( server(Server, Address, DefaultOptions) 240 -> merge_options(Options, DefaultOptions, Options2), 241 do_connect(Server, Address, Conn, [address(Address)|Options2]) 242 ; existence_error(redis_server, Server) 243 ). 244redis_connect(Address, Conn, Options) :- 245 do_connect(Address, Address, Conn, [address(Address)|Options]).
redis_connection(Id, Stream, Failures, Options)
253do_connect(Id, sentinel(Pool), Conn, Options) => 254 sentinel_master(Id, Pool, Conn, Options). 255do_connect(Id, Address0, Conn, Options) => 256 tcp_address(Address0, Address), 257 tcp_connect(Address, Stream0, Options), 258 tls_upgrade(Address, Stream0, Stream, Options), 259 Conn = redis_connection(Id, Stream, 0, Options), 260 hello(Conn, Options). 261 262tcp_address(unix(Path), Path) :- 263 !. % Using an atom is ambiguous 264tcp_address(Address, Address).
tls(true)
is specified.270:- if(current_predicate(ssl_context/3)). 271tls_upgrade(Host:_Port, Raw, Stream, Options) :- 272 option(tls(true), Options), 273 !, 274 must_have_option(cacert(CacertFile), Options), 275 must_have_option(key(KeyFile), Options), 276 must_have_option(cert(CertFile), Options), 277 ssl_context(client, SSL, 278 [ host(Host), 279 certificate_file(CertFile), 280 key_file(KeyFile), 281 cacerts([file(CacertFile)]), 282 cert_verify_hook(tls_verify), 283 close_parent(true) 284 ]), 285 stream_pair(Raw, RawRead, RawWrite), 286 ssl_negotiate(SSL, RawRead, RawWrite, Read, Write), 287 stream_pair(Stream, Read, Write). 288:- endif. 289tls_upgrade(_, Stream, Stream, _). 290 291:- if(current_predicate(ssl_context/3)).
redis-cli
), we accept the
certificate as long as it is signed, not verifying the hostname.299:- public tls_verify/5. 300tls_verify(_SSL, _ProblemCert, _AllCerts, _FirstCert, verified) :- 301 !. 302tls_verify(_SSL, _ProblemCert, _AllCerts, _FirstCert, hostname_mismatch) :- 303 !. 304tls_verify(_SSL, _ProblemCert, _AllCerts, _FirstCert, _Error) :- 305 fail. 306 307:- endif.
313sentinel_master(Id, Pool, Master, Options) :- 314 must_have_option(sentinels(Sentinels), Options), 315 sentinel_auth(Options, Options1), 316 setting(sentinel_timeout, TMO), 317 ( sentinel(Pool, Sentinel) 318 ; member(Sentinel, Sentinels) 319 ), 320 catch(call_with_time_limit( 321 TMO, 322 do_connect(Id, Sentinel, Conn, 323 [sentinel(true)|Options1])), 324 Error, 325 (print_message(warning, Error),fail)), 326 !, 327 debug(redis(sentinel), 'Connected to sentinel at ~p', [Sentinel]), 328 call_cleanup( 329 query_sentinel(Pool, Conn, Sentinel, MasterAddr), 330 redis_disconnect(Conn)), 331 debug(redis(sentinel), 'Sentinel claims master is at ~p', [MasterAddr]), 332 do_connect(Id, MasterAddr, Master, Options), 333 debug(redis(sentinel), 'Connected to claimed master', []), 334 redis(Master, role, Role), 335 ( Role = [master|_Slaves] 336 -> debug(redis(sentinel), 'Verified role at ~p', [MasterAddr]) 337 ; redis_disconnect(Master), 338 debug(redis(sentinel), '~p is not the master: ~p', [MasterAddr, Role]), 339 sleep(TMO), 340 sentinel_master(Id, Pool, Master, Options) 341 ). 342 343sentinel_auth(Options0, Options) :- 344 option(sentinel_user(User), Options0), 345 option(sentinel_password(Passwd), Options0), 346 !, 347 merge_options([user(User), password(Passwd)], Options0, Options). 348sentinel_auth(Options0, Options) :- 349 select_option(password(_), Options0, Options, _). 350 351 352query_sentinel(Pool, Conn, Sentinel, Host:Port) :- 353 redis(Conn, sentinel('get-master-addr-by-name', Pool), MasterData), 354 MasterData = [Host,Port], 355 redis(Conn, sentinel(sentinels, Pool), Peers), 356 transaction(update_known_sentinels(Pool, Sentinel, Peers)). 357 358update_known_sentinels(Pool, Sentinel, Peers) :- 359 retractall(sentinel(Pool, _)), 360 maplist(update_peer_sentinel(Pool), Peers), 361 asserta(sentinel(Pool, Sentinel)). 362 363update_peer_sentinel(Pool, Attrs), 364 memberchk(ip-Host, Attrs), 365 memberchk(port-Port, Attrs) => 366 asserta(sentinel(Pool, Host:Port)). 367 368must_have_option(Opt, Options) :- 369 option(Opt, Options), 370 !. 371must_have_option(Opt, Options) :- 372 existence_error(option, Opt, Options).
379hello(Con, Options) :- 380 option(version(V), Options), 381 V >= 3, 382 !, 383 ( option(user(User), Options), 384 option(password(Password), Options) 385 -> redis(Con, hello(3, auth, User, Password)) 386 ; redis(Con, hello(3)) 387 ). 388hello(Con, Options) :- 389 option(password(Password), Options), 390 !, 391 redis(Con, auth(Password)). 392hello(_, _).
redis_connection(Id,Stream,Failures,Options)
. If the stream is
disconnected it will be reconnected.401redis_stream(Var, S, _) :- 402 ( var(Var) 403 -> !, instantiation_error(Var) 404 ; nonvar(S) 405 -> !, uninstantiation_error(S) 406 ). 407redis_stream(ServerName, S, Connect) :- 408 atom(ServerName), 409 !, 410 ( connection(ServerName, S0) 411 -> S = S0 412 ; Connect == true, 413 server(ServerName, Address, Options) 414 -> redis_connect(Address, Connection, Options), 415 redis_stream(Connection, S, false), 416 asserta(connection(ServerName, S)) 417 ; existence_error(redis_server, ServerName) 418 ). 419redis_stream(redis_connection(_,S0,_,_), S, _) :- 420 S0 \== (-), 421 !, 422 S = S0. 423redis_stream(Redis, S, _) :- 424 Redis = redis_connection(Id,-,_,Options), 425 option(address(Address), Options), 426 do_connect(Id,Address,Redis2,Options), 427 arg(2, Redis2, S0), 428 nb_setarg(2, Redis, S0), 429 S = S0. 430 431has_redis_stream(Var, _) :- 432 var(Var), 433 !, 434 instantiation_error(Var). 435has_redis_stream(Alias, S) :- 436 atom(Alias), 437 !, 438 connection(Alias, S). 439has_redis_stream(redis_connection(_,S,_,_), S) :- 440 S \== (-).
true
(default false
), do not raise any errors if
Connection does not exist or closing the connection raises
a network or I/O related exception. This version is used
internally if a connection is in a broken state, either due
to a protocol error or a network issue.456redis_disconnect(Redis) :- 457 redis_disconnect(Redis, []). 458 459redis_disconnect(Redis, Options) :- 460 option(force(true), Options), 461 !, 462 ( Redis = redis_connection(_Id, S, _, _Opts) 463 -> ( S == (-) 464 -> true 465 ; close(S, [force(true)]), 466 nb_setarg(2, Redis, -) 467 ) 468 ; has_redis_stream(Redis, S) 469 -> close(S, [force(true)]), 470 retractall(connection(_,S)) 471 ; true 472 ). 473redis_disconnect(Redis, _Options) :- 474 redis_stream(Redis, S, false), 475 close(S), 476 retractall(connection(_,S)).
redis(Connection, Command, _)
and second, it
can be used to exploit Redis pipelines and transactions. The
second form is acticated if Request is a list. In that case, each
element of the list is either a term Command -> Reply
or a simple
Command. Semantically this represents a sequence of redis/3 and
redis/2 calls. It differs in the following aspects:
multi
and the last exec
, the
commands are executed as a Redis transaction, i.e., they
are executed atomically.Procedurally, the process takes the following steps:
Command -> Reply
terms.Examples
?- redis(default, [ lpush(li,1), lpush(li,2), lrange(li,0,-1) -> List ]). List = ["2", "1"].
520redis(Redis, PipeLine) :- 521 is_list(PipeLine), 522 !, 523 redis_pipeline(Redis, PipeLine). 524redis(Redis, Req) :- 525 redis(Redis, Req, _).
"A:B:..."
. This is a common shorthand for
representing Redis keys.
Reply is either a plain term (often a variable) or a term Value as
Type
. In the latter form, Type dictates how the Redis bulk
reply is translated to Prolog. The default equals to auto
, i.e.,
as a number of the content satisfies the Prolog number syntax and
as an atom otherwise.
status(Atom)
Returned if the server replies with + Status
. Atom
is the textual value of Status converted to lower case,
e.g., status(ok)
or status(pong)
.nil
This atom is returned for a NIL/NULL value. Note that if
the reply is only nil
, redis/3 fails. The nil
value
may be embedded inside lists or maps.nil
. If Reply
as a whole would be nil
the call fails.
Redis bulk replies are translated depending on the as
Type as
explained above.
bytes
(iso_latin_1
), utf8
and text
(the
current locale translation).type_error(Type, String)
is raised.min_tagged_integer
and max_tagged_integer
, allowing
the value to be used as a dict key.auto(atom, number)
auto(atom,tagged_integer)
. This allows the value
to be used as a key for a SWI-Prolog dict.pairs
type
can also be applied to a Redis array. In this case the array
length must be even. This notably allows fetching a Redis
hash as pairs using HGETALL
using version 2 of the
Redis protocol.pairs(AsKey, AsValue)
, but convert the resulting
pair list into a SWI-Prolog dict. AsKey must convert to a
valid dict key, i.e., an atom or tagged integer. See dict_key
.dict(dict_key, AsValue)
.Here are some simple examples
?- redis(default, set(a, 42), X). X = status("OK"). ?- redis(default, get(a), X). X = "42". ?- redis(default, get(a), X as integer). X = 42. ?- redis(default, get(a), X as float). X = 42.0. ?- redis(default, set(swipl:version, 8)). true. ?- redis(default, incr(swipl:version), X). X = 9.
645redis(Redis, Req, Out) :- 646 out_val(Out, Val), 647 redis1(Redis, Req, Out), 648 Val \== nil. 649 650out_val(Out, Val) :- 651 ( nonvar(Out), 652 Out = (Val as _) 653 -> true 654 ; Val = Out 655 ). 656 657redis1(Redis, Req, Out) :- 658 Error = error(Formal, _), 659 catch(redis2(Redis, Req, Out), Error, true), 660 ( var(Formal) 661 -> true 662 ; recover(Error, Redis, redis1(Redis, Req, Out)) 663 ). 664 665redis2(Redis, Req, Out) :- 666 atom(Redis), 667 !, 668 redis_stream(Redis, S, true), 669 with_mutex(Redis, 670 ( redis_write_msg(S, Req), 671 redis_read_stream(Redis, S, Out) 672 )). 673redis2(Redis, Req, Out) :- 674 redis_stream(Redis, S, true), 675 redis_write_msg(S, Req), 676 redis_read_stream(Redis, S, Out).
680redis_pipeline(Redis, PipeLine) :- 681 Error = error(Formal, _), 682 catch(redis_pipeline2(Redis, PipeLine), Error, true), 683 ( var(Formal) 684 -> true 685 ; recover(Error, Redis, redis_pipeline(Redis, PipeLine)) 686 ). 687 688redis_pipeline2(Redis, PipeLine) :- 689 atom(Redis), 690 !, 691 redis_stream(Redis, S, true), 692 with_mutex(Redis, 693 redis_pipeline3(Redis, S, PipeLine)). 694redis_pipeline2(Redis, PipeLine) :- 695 redis_stream(Redis, S, true), 696 redis_pipeline3(Redis, S, PipeLine). 697 698redis_pipeline3(Redis, S, PipeLine) :- 699 maplist(write_pipeline(S), PipeLine), 700 flush_output(S), 701 read_pipeline(Redis, S, PipeLine). 702 703write_pipeline(S, Command -> _Reply) :- 704 !, 705 redis_write_msg_no_flush(S, Command). 706write_pipeline(S, Command) :- 707 redis_write_msg_no_flush(S, Command). 708 709read_pipeline(Redis, S, PipeLine) :- 710 E = error(Formal,_), 711 catch(read_pipeline2(Redis, S, PipeLine), E, true), 712 ( var(Formal) 713 -> true 714 ; reconnect_error(E) 715 -> redis_disconnect(Redis, [force(true)]), 716 throw(E) 717 ; resync(Redis), 718 throw(E) 719 ). 720 721read_pipeline2(Redis, S, PipeLine) :- 722 maplist(redis_read_msg3(S), PipeLine, Replies, Errors, Pushed), 723 maplist(handle_push(Redis), Pushed), 724 maplist(handle_error, Errors), 725 maplist(bind_reply, PipeLine, Replies). 726 727redis_read_msg3(S, _Command -> ReplyIn, Reply, Error, Push) :- 728 !, 729 redis_read_msg(S, ReplyIn, Reply, Error, Push). 730redis_read_msg3(S, Var, Reply, Error, Push) :- 731 redis_read_msg(S, Var, Reply, Error, Push). 732 733handle_push(Redis, Pushed) :- 734 handle_push_messages(Pushed, Redis). 735handle_error(Error) :- 736 ( var(Error) 737 -> true 738 ; throw(Error) 739 ). 740bind_reply(_Command -> Reply0, Reply) :- 741 !, 742 Reply0 = Reply. 743bind_reply(_Command, _).
752:- meta_predicate recover( , , ). 753 754recover(Error, Redis, Goal) :- 755 Error = error(Formal, _), 756 reconnect_error(Formal), 757 auto_reconnect(Redis), 758 !, 759 debug(redis(recover), '~p: got error ~p; trying to reconnect', 760 [Redis, Error]), 761 redis_disconnect(Redis, [force(true)]), 762 ( wait_to_retry(Redis, Error) 763 -> call(Goal), 764 retractall(failure(Redis, _)) 765 ; throw(Error) 766 ). 767recover(Error, _, _) :- 768 throw(Error). 769 770auto_reconnect(redis_connection(_,_,_,Options)) :- 771 !, 772 option(reconnect(true), Options). 773auto_reconnect(Server) :- 774 ground(Server), 775 server(Server, _, Options), 776 option(reconnect(true), Options, true). 777 778reconnect_error(io_error(_Action, _On)). 779reconnect_error(socket_error(_Code, _)). 780reconnect_error(syntax_error(unexpected_eof)). 781reconnect_error(existence_error(stream, _)).
max_retry_wait
. If the
setting max_retry_count
is exceeded we fail and the called signals
an exception.790:- dynamic failure/2 as volatile. 791 792wait_to_retry(Redis, Error) :- 793 redis_failures(Redis, Failures), 794 setting(max_retry_count, Count), 795 Failures < Count, 796 Failures2 is Failures+1, 797 redis_set_failures(Redis, Failures2), 798 setting(max_retry_wait, MaxWait), 799 Wait is min(MaxWait*100, 1<<Failures)/100.0, 800 debug(redis(recover), ' Sleeping ~p seconds', [Wait]), 801 retry_message_level(Failures, Level), 802 print_message(Level, redis(retry(Redis, Failures, Wait, Error))), 803 sleep(Wait). 804 805redis_failures(redis_connection(_,_,Failures0,_), Failures) :- 806 !, 807 Failures = Failures0. 808redis_failures(Server, Failures) :- 809 atom(Server), 810 ( failure(Server, Failures) 811 -> true 812 ; Failures = 0 813 ). 814 815redis_set_failures(Connection, Count) :- 816 compound(Connection), 817 !, 818 nb_setarg(3, Connection, Count). 819redis_set_failures(Server, Count) :- 820 atom(Server), 821 retractall(failure(Server, _)), 822 asserta(failure(Server, Count)). 823 824retry_message_level(0, warning) :- !. 825retry_message_level(_, silent).
834redis(Req) :-
835 setup_call_cleanup(
836 redis_connect(default, C, []),
837 redis1(C, Req, Out),
838 redis_disconnect(C)),
839 print(Out).
847redis_write(Redis, Command) :- 848 redis_stream(Redis, S, true), 849 redis_write_msg(S, Command). 850 851redis_read(Redis, Reply) :- 852 redis_stream(Redis, S, true), 853 redis_read_stream(Redis, S, Reply). 854 855 856 /******************************* 857 * HIGH LEVEL ACCESS * 858 *******************************/
LRANGE
requests. Note
that this results in O(N^2) complexity. Using a lazy list is most
useful for relatively short lists holding possibly large items.
Note that values retrieved are strings, unless the value was added
using Term as prolog
.
875redis_get_list(Redis, Key, List) :- 876 redis_get_list(Redis, Key, -1, List). 877 878redis_get_list(Redis, Key, Chunk, List) :- 879 redis(Redis, llen(Key), Len), 880 ( ( Chunk >= Len 881 ; Chunk == -1 882 ) 883 -> ( Len == 0 884 -> List = [] 885 ; End is Len-1, 886 list_range(Redis, Key, 0, End, List) 887 ) 888 ; lazy_list(rlist_next(s(Redis,Key,0,Chunk,Len)), List) 889 ). 890 891rlist_next(State, List, Tail) :- 892 State = s(Redis,Key,Offset,Slice,Len), 893 End is min(Len-1, Offset+Slice-1), 894 list_range(Redis, Key, Offset, End, Elems), 895 ( End =:= Len-1 896 -> List = Elems, 897 Tail = [] 898 ; Offset2 is Offset+Slice, 899 nb_setarg(3, State, Offset2), 900 append(Elems, Tail, List) 901 ). 902 903% Redis LRANGE demands End > Start and returns inclusive. 904 905list_range(DB, Key, Start, Start, [Elem]) :- 906 !, 907 redis(DB, lindex(Key, Start), Elem). 908list_range(DB, Key, Start, End, List) :- 909 !, 910 redis(DB, lrange(Key, Start, End), List).
[]
, Key is deleted. Note that key values
are always strings in Redis. The same conversion rules as for
redis/1-3 apply.
921redis_set_list(Redis, Key, List) :-
922 redis(Redis, del(Key), _),
923 ( List == []
924 -> true
925 ; Term =.. [rpush,Key|List],
926 redis(Redis, Term, _Count)
927 ).
HGETALL
command. If the Redis hash is not used by
other (non-Prolog) applications one may also consider using the
Term as prolog
syntax to store the Prolog dict as-is.940redis_get_hash(Redis, Key, Dict) :- 941 redis(Redis, hgetall(Key), Dict as dict(auto)). 942 943redis_set_hash(Redis, Key, Dict) :- 944 redis_array_dict(Array, _, Dict), 945 Term =.. [hset,Key|Array], 946 redis(Redis, del(Key), _), 947 redis(Redis, Term, _Count).
958redis_array_dict(Array, Tag, Dict) :- 959 nonvar(Array), 960 !, 961 array_to_pairs(Array, Pairs), 962 dict_pairs(Dict, Tag, Pairs). 963redis_array_dict(TwoList, Tag, Dict) :- 964 dict_pairs(Dict, Tag, Pairs), 965 pairs_to_array(Pairs, TwoList). 966 967array_to_pairs([], []) :- 968 !. 969array_to_pairs([NameS-Value|T0], [Name-Value|T]) :- 970 !, % RESP3 returns a map as pairs. 971 atom_string(Name, NameS), 972 array_to_pairs(T0, T). 973array_to_pairs([NameS,Value|T0], [Name-Value|T]) :- 974 atom_string(Name, NameS), 975 array_to_pairs(T0, T). 976 977pairs_to_array([], []) :- 978 !. 979pairs_to_array([Name-Value|T0], [NameS,Value|T]) :- 980 atom_string(Name, NameS), 981 pairs_to_array(T0, T).
SCAN
, SSCAN
, HSCAN
and ZSCAN` commands
into a lazy list. For redis_scan/3 and redis_sscan/4 the result is
a list of strings. For redis_hscan/4 and redis_zscan/4, the result
is a list of pairs. Options processed:
MATCH
subcommand, only returning matches for
Pattern.COUNT
subcommand, giving a hint to the size of the
chunks fetched.TYPE
subcommand, only returning answers of the
indicated type.1005redis_scan(Redis, LazyList, Options) :- 1006 scan_options([match,count,type], Options, Parms), 1007 lazy_list(scan_next(s(scan,Redis,0,Parms)), LazyList). 1008 1009redis_sscan(Redis, Set, LazyList, Options) :- 1010 scan_options([match,count,type], Options, Parms), 1011 lazy_list(scan_next(s(sscan(Set),Redis,0,Parms)), LazyList). 1012 1013redis_hscan(Redis, Hash, LazyList, Options) :- 1014 scan_options([match,count,type], Options, Parms), 1015 lazy_list(scan_next(s(hscan(Hash),Redis,0,Parms)), LazyList). 1016 1017redis_zscan(Redis, Set, LazyList, Options) :- 1018 scan_options([match,count,type], Options, Parms), 1019 lazy_list(scan_next(s(zscan(Set),Redis,0,Parms)), LazyList). 1020 1021scan_options([], _, []). 1022scan_options([H|T0], Options, [H,V|T]) :- 1023 Term =.. [H,V], 1024 option(Term, Options), 1025 !, 1026 scan_options(T0, Options, T). 1027scan_options([_|T0], Options, T) :- 1028 scan_options(T0, Options, T). 1029 1030 1031scan_next(State, List, Tail) :- 1032 State = s(Command,Redis,Cursor,Params), 1033 Command =.. CList, 1034 append(CList, [Cursor|Params], CList2), 1035 Term =.. CList2, 1036 redis(Redis, Term, [NewCursor,Elems0]), 1037 scan_pairs(Command, Elems0, Elems), 1038 ( NewCursor == 0 1039 -> List = Elems, 1040 Tail = [] 1041 ; nb_setarg(3, State, NewCursor), 1042 append(Elems, Tail, List) 1043 ). 1044 1045scan_pairs(hscan(_), List, Pairs) :- 1046 !, 1047 scan_pairs(List, Pairs). 1048scan_pairs(zscan(_), List, Pairs) :- 1049 !, 1050 scan_pairs(List, Pairs). 1051scan_pairs(_, List, List). 1052 1053scan_pairs([], []). 1054scan_pairs([Key,Value|T0], [Key-Value|T]) :- 1055 !, 1056 scan_pairs(T0, T). 1057scan_pairs([Key-Value|T0], [Key-Value|T]) :- 1058 scan_pairs(T0, T). 1059 1060 1061 /******************************* 1062 * ABOUT * 1063 *******************************/
1072redis_current_command(Redis, Command) :- 1073 redis_current_command(Redis, Command, _). 1074 1075redis_current_command(Redis, Command, Properties) :- 1076 nonvar(Command), 1077 !, 1078 redis(Redis, command(info, Command), [[_|Properties]]). 1079redis_current_command(Redis, Command, Properties) :- 1080 redis(Redis, command, Commands), 1081 member([Name|Properties], Commands), 1082 atom_string(Command, Name).
redis(info, String)
and parses the result. As this is for machine
usage, properties names *_human are skipped.1090redis_property(Redis, Property) :- 1091 redis(Redis, info, String), 1092 info_terms(String, Terms), 1093 member(Property, Terms). 1094 1095info_terms(Info, Pairs) :- 1096 split_string(Info, "\n", "\r\n ", Lines), 1097 convlist(info_line_term, Lines, Pairs). 1098 1099info_line_term(Line, Term) :- 1100 sub_string(Line, B, _, A, :), 1101 !, 1102 sub_atom(Line, 0, B, _, Name), 1103 \+ sub_atom(Name, _, _, 0, '_human'), 1104 sub_string(Line, _, A, 0, ValueS), 1105 ( number_string(Value, ValueS) 1106 -> true 1107 ; Value = ValueS 1108 ), 1109 Term =.. [Name,Value]. 1110 1111 1112 /******************************* 1113 * SUBSCRIBE * 1114 *******************************/
redis(Id, Channel, Data)
If redis_unsubscribe/2 removes the last subscription, the thread terminates.
To simply print the incomming messages use e.g.
?- listen(redis(_, Channel, Data), format('Channel ~p got ~p~n', [Channel,Data])). true. ?- redis_subscribe(default, test, Id, []). Id = redis_pubsub_3, ?- redis(publish(test, "Hello world")). Channel test got "Hello world" 1 true.
1144:- dynamic ( subscription/2, % Id, Channel 1145 listening/3 % Id, Connection, Thread 1146 ) as volatile. 1147 1148redis_subscribe(Redis, Spec, Id, Options) :- 1149 atom(Redis), 1150 !, 1151 channels(Spec, Channels), 1152 pubsub_thread_options(ThreadOptions, Options), 1153 thread_create(setup_call_cleanup( 1154 redis_connect(Redis, Conn, [reconnect(true)]), 1155 redis_subscribe1(Redis, Conn, Channels), 1156 redis_disconnect(Conn)), 1157 Thread, 1158 ThreadOptions), 1159 pubsub_id(Thread, Id). 1160redis_subscribe(Redis, Spec, Id, Options) :- 1161 channels(Spec, Channels), 1162 pubsub_thread_options(ThreadOptions, Options), 1163 thread_create(redis_subscribe1(Redis, Redis, Channels), 1164 Thread, 1165 ThreadOptions), 1166 pubsub_id(Thread, Id). 1167 1168pubsub_thread_options(ThreadOptions, Options) :- 1169 merge_options(Options, [detached(true)], ThreadOptions). 1170 1171pubsub_id(Thread, Thread). 1172%pubsub_id(Thread, Id) :- 1173% thread_property(Thread, id(TID)), 1174% atom_concat('redis_pubsub_', TID, Id). 1175 1176redis_subscribe1(Redis, Conn, Channels) :- 1177 Error = error(Formal, _), 1178 catch(redis_subscribe2(Redis, Conn, Channels), Error, true), 1179 ( var(Formal) 1180 -> true 1181 ; recover(Error, Conn, redis1(Conn, echo("reconnect"), _)), 1182 thread_self(Me), 1183 pubsub_id(Me, Id), 1184 findall(Channel, subscription(Id, Channel), CurrentChannels), 1185 redis_subscribe1(Redis, Conn, CurrentChannels) 1186 ). 1187 1188redis_subscribe2(Redis, Conn, Channels) :- 1189 redis_subscribe3(Conn, Channels), 1190 redis_listen(Redis, Conn). 1191 1192redis_subscribe3(Conn, Channels) :- 1193 thread_self(Me), 1194 pubsub_id(Me, Id), 1195 prolog_listen(this_thread_exit, pubsub_clean(Id)), 1196 maplist(register_subscription(Id), Channels), 1197 redis_stream(Conn, S, true), 1198 Req =.. [subscribe|Channels], 1199 redis_write_msg(S, Req). 1200 1201pubsub_clean(Id) :- 1202 retractall(listening(Id, _Connection, _Thread)), 1203 retractall(subscription(Id, _Channel)).
1215redis_subscribe(Id, Spec) :- 1216 channels(Spec, Channels), 1217 ( listening(Id, Connection, _Thread) 1218 -> true 1219 ; existence_error(redis_pubsub, Id) 1220 ), 1221 maplist(register_subscription(Id), Channels), 1222 redis_stream(Connection, S, true), 1223 Req =.. [subscribe|Channels], 1224 redis_write_msg(S, Req). 1225 1226redis_unsubscribe(Id, Spec) :- 1227 channels(Spec, Channels), 1228 ( listening(Id, Connection, _Thread) 1229 -> true 1230 ; existence_error(redis_pubsub, Id) 1231 ), 1232 maplist(unregister_subscription(Id), Channels), 1233 redis_stream(Connection, S, true), 1234 Req =.. [unsubscribe|Channels], 1235 redis_write_msg(S, Req).
1241redis_current_subscription(Id, Channels) :- 1242 findall(Id-Channel, subscription(Id, Channel), Pairs), 1243 keysort(Pairs, Sorted), 1244 group_pairs_by_key(Sorted, Grouped), 1245 member(Id-Channels, Grouped). 1246 1247channels(Spec, List) :- 1248 is_list(Spec), 1249 !, 1250 maplist(channel_name, Spec, List). 1251channels(Ch, [Key]) :- 1252 channel_name(Ch, Key). 1253 1254channel_name(Atom, Atom) :- 1255 atom(Atom), 1256 !. 1257channel_name(Key, Atom) :- 1258 phrase(key_parts(Key), Parts), 1259 !, 1260 atomic_list_concat(Parts, :, Atom). 1261channel_name(Key, _) :- 1262 type_error(redis_key, Key). 1263 1264key_parts(Var) --> 1265 { var(Var), !, fail }. 1266key_parts(Atom) --> 1267 { atom(Atom) }, 1268 !, 1269 [Atom]. 1270key_parts(A:B) --> 1271 key_parts(A), 1272 key_parts(B). 1273 1274 1275 1276 1277register_subscription(Id, Channel) :- 1278 ( subscription(Id, Channel) 1279 -> true 1280 ; assertz(subscription(Id, Channel)) 1281 ). 1282 1283unregister_subscription(Id, Channel) :- 1284 retractall(subscription(Id, Channel)). 1285 1286redis_listen(Redis, Conn) :- 1287 thread_self(Me), 1288 pubsub_id(Me, Id), 1289 setup_call_cleanup( 1290 assertz(listening(Id, Conn, Me), Ref), 1291 redis_listen_loop(Redis, Id, Conn), 1292 erase(Ref)). 1293 1294redis_listen_loop(Redis, Id, Conn) :- 1295 redis_stream(Conn, S, true), 1296 ( subscription(Id, _) 1297 -> redis_read_stream(Redis, S, Reply), 1298 redis_broadcast(Redis, Reply), 1299 redis_listen_loop(Redis, Id, Conn) 1300 ; true 1301 ). 1302 1303redis_broadcast(_, [subscribe, _Channel, _N]) :- 1304 !. 1305redis_broadcast(Redis, [message, Channel, Data]) :- 1306 !, 1307 catch(broadcast(redis(Redis, Channel, Data)), 1308 Error, 1309 print_message(error, Error)). 1310redis_broadcast(Redis, Message) :- 1311 assertion((Message = [Type, Channel, _Data], 1312 atom(Type), 1313 atom(Channel))), 1314 debug(redis(warning), '~p: Unknown message while listening: ~p', 1315 [Redis,Message]). 1316 1317 1318 /******************************* 1319 * READ/WRITE * 1320 *******************************/
nil
status(String)
true
or false
). RESP3 only.If something goes wrong, the connection is closed and an exception is raised.
1337redis_read_stream(Redis, SI, Out) :- 1338 E = error(Formal,_), 1339 catch(redis_read_msg(SI, Out, Out0, Error, Push), E, true), 1340 ( var(Formal) 1341 -> handle_push_messages(Push, Redis), 1342 ( var(Error) 1343 -> Out = Out0 1344 ; resync(Redis), 1345 throw(Error) 1346 ) 1347 ; redis_disconnect(Redis, [force(true)]), 1348 throw(E) 1349 ). 1350 1351handle_push_messages([], _). 1352handle_push_messages([H|T], Redis) :- 1353 ( catch(handle_push_message(H, Redis), E, 1354 print_message(warning, E)) 1355 -> true 1356 ; true 1357 ), 1358 handle_push_messages(T, Redis). 1359 1360handle_push_message(["pubsub"|List], Redis) :- 1361 redis_broadcast(Redis, List). 1362% some protocol version 3 push messages (such as 1363% __keyspace@* events) seem to come directly 1364% without a pubsub header 1365handle_push_message([message|List], Redis) :- 1366 redis_broadcast(Redis, [message|List]).
1376resync(Redis) :- 1377 E = error(Formal,_), 1378 catch(do_resync(Redis), E, true), 1379 ( var(Formal) 1380 -> true 1381 ; redis_disconnect(Redis, [force(true)]) 1382 ). 1383 1384do_resync(Redis) :- 1385 A is random(1_000_000_000), 1386 redis_stream(Redis, S, true), 1387 redis_write_msg(S, echo(A)), 1388 catch(call_with_time_limit(0.2, '$redis_resync'(S, A)), 1389 time_limit_exceeded, 1390 throw(error(time_limit_exceeded,_))).
redis4pl
.
1405 /******************************* 1406 * MESSAGES * 1407 *******************************/ 1408 1409:- multifile 1410 prolog:error_message//1, 1411 prolog:message//1. 1412 1413prologerror_message(redis_error(Code, String)) --> 1414 [ 'REDIS: ~w: ~s'-[Code, String] ]. 1415 1416prologmessage(redis(retry(_Redis, _Failures, Wait, Error))) --> 1417 [ 'REDIS: connection error. Retrying in ~2f seconds'-[Wait], nl ], 1418 [ ' '-[] ], '$messages':translate_message(Error)
Redis client
This library is a client to Redis, a popular key value store to deal with caching and communication between micro services.
In the typical use case we register the details of one or more Redis servers using redis_server/3. Subsequenly, redis/2-3 is used to issue commands on the server. For example:
*/