41
42:- module(redis,
43 [ redis_server/3, 44 redis_connect/1, 45 redis_connect/3, 46 redis_disconnect/1, 47 redis_disconnect/2, 48 49 redis/1, 50 redis/2, 51 redis/3, 52 53 redis_get_list/3, 54 redis_get_list/4, 55 redis_set_list/3, 56 redis_get_hash/3, 57 redis_set_hash/3, 58 redis_scan/3, 59 redis_sscan/4, 60 redis_hscan/4, 61 redis_zscan/4, 62 63 redis_subscribe/4, 64 redis_subscribe/2, 65 redis_unsubscribe/2, 66 redis_current_subscription/2, 67 redis_write/2, 68 redis_read/2, 69 70 redis_array_dict/3, 71 72 redis_property/2, 73 redis_current_command/2, 74 redis_current_command/3 75 ]). 76:- autoload(library(socket), [tcp_connect/3]). 77:- autoload(library(apply), [maplist/2, convlist/3, maplist/3, maplist/5]). 78:- autoload(library(broadcast), [broadcast/1]). 79:- autoload(library(error),
80 [ must_be/2,
81 type_error/2,
82 instantiation_error/1,
83 uninstantiation_error/1,
84 existence_error/2,
85 existence_error/3
86 ]). 87:- autoload(library(lazy_lists), [lazy_list/2]). 88:- autoload(library(lists), [append/3, member/2]). 89:- autoload(library(option), [merge_options/3, option/2,
90 option/3, select_option/4]). 91:- autoload(library(pairs), [group_pairs_by_key/2]). 92:- autoload(library(time), [call_with_time_limit/2]). 93:- use_module(library(debug), [debug/3, assertion/1]). 94:- use_module(library(settings), [setting/4, setting/2]). 95:- if(exists_source(library(ssl))). 96:- autoload(library(ssl), [ssl_context/3, ssl_negotiate/5]). 97:- endif. 98
99:- use_foreign_library(foreign(redis4pl)). 100
% Tunables for reconnecting and sentinel discovery.  The two wait
% values end up in sleep/1 and call_with_time_limit/2, so they are
% expressed in seconds.

:- setting(max_retry_count, nonneg, 8640,
           "Max number of retries").
:- setting(max_retry_wait, number, 10,
           "Max time to wait between recovery attempts").
:- setting(sentinel_timeout, number, 0.2,
           "Time to wait for a sentinel").
% Option declarations for the exported predicates.  Predicates that
% merely forward their option list refer to the predicate that
% actually processes it using pass_to/2.

:- predicate_options(redis_server/3, 3,
                     [ pass_to(redis:redis_connect/3, 3)
                     ]).
:- predicate_options(redis_connect/3, 3,
                     [ reconnect(boolean),
                       user(atom),
                       password(atomic),
                       version(between(2,3))
                     ]).
:- predicate_options(redis_disconnect/2, 2,
                     [ force(boolean)
                     ]).
:- predicate_options(redis_scan/3, 3,
                     [ match(atomic),
                       count(nonneg),
                       type(atom)
                     ]).
:- predicate_options(redis_sscan/4, 4,
                     [ pass_to(redis:redis_scan/3, 3)
                     ]).
:- predicate_options(redis_hscan/4, 4,
                     [ pass_to(redis:redis_scan/3, 3)
                     ]).
:- predicate_options(redis_zscan/4, 4,
                     [ pass_to(redis:redis_scan/3, 3)
                     ]).
130
148
149:- dynamic server/3. 150
151:- dynamic ( connection/2, 152 sentinel/2 153 ) as volatile. 154
166
%!  redis_server(+Alias, +Address, +Options) is det.
%
%   Register Address together with its default connection Options
%   under Alias.  Any earlier registration for the same alias is
%   replaced.
%
%   @error instantiation_error if Alias is not ground.

redis_server(Name, Addr, Opts) :-
    must_be(ground, Name),
    retractall(server(Name, _, _)),
    asserta(server(Name, Addr, Opts)).

% Predefined alias `default`: a server on localhost, port 6379,
% without any default options.
server(default, localhost:6379, []).
173
227
%!  redis_connect(-Connection) is det.
%!  redis_connect(+Address, -Connection, +Options) is det.
%!  redis_connect(-Connection, +Host, +Port) is det.
%
%   Open a connection.  redis_connect/1 connects to the server
%   registered as `default`.  In redis_connect/3 the first argument
%   is either a server alias (an atom known to server/3, whose
%   default options are merged with Options) or an address that is
%   used as is.  When the first argument is unbound and the other
%   two are ground, the call is read as (Connection, Host, Port).
%
%   @error existence_error(redis_server, Alias) if the first argument
%          is an atom that is not a registered server.

redis_connect(Connection) :-
    redis_connect(default, Connection, []).

redis_connect(Connection, Host, Port) :-
    var(Connection),
    ground(Host:Port),
    !,
    redis_connect(Host:Port, Connection, []).
redis_connect(Alias, Connection, CallOptions) :-
    atom(Alias),
    !,
    (   server(Alias, Address, ServerOptions)
    ->  true
    ;   existence_error(redis_server, Alias)
    ),
    % Options of the call take precedence over the registered ones.
    merge_options(CallOptions, ServerOptions, Merged),
    do_connect(Alias, Address, Connection, [address(Address)|Merged]).
redis_connect(Address, Connection, CallOptions) :-
    do_connect(Address, Address, Connection,
               [address(Address)|CallOptions]).
246
252
%   do_connect(+Id, +Address, -Connection, +Options)
%
%   Establish the actual connection.  An address of the form
%   sentinel(Pool) is resolved through the sentinel network; any
%   other address is opened as a socket, optionally upgraded to
%   TLS, wrapped in a redis_connection/4 term (with a failure count
%   of 0) and greeted using hello/2.

do_connect(Id, sentinel(Pool), Connection, Options) =>
    sentinel_master(Id, Pool, Connection, Options).
do_connect(Id, Spec, Connection, Options) =>
    tcp_address(Spec, SocketAddress),
    tcp_connect(SocketAddress, RawStream, Options),
    tls_upgrade(SocketAddress, RawStream, Stream, Options),
    Connection = redis_connection(Id, Stream, 0, Options),
    hello(Connection, Options).
261
%   tcp_address(+Spec, -Address)
%
%   Translate a connection specification into what is handed to
%   tcp_connect/3: unix(Path) is reduced to Path, anything else is
%   passed unchanged.

tcp_address(Spec, Address) :-
    (   Spec = unix(Path),
        Address = Path
    ->  true
    ;   Address = Spec
    ).
265
269
%   tls_upgrade(+Address, +Raw, -Stream, +Options)
%
%   Wrap the raw socket stream pair in TLS if the option tls(true)
%   is given (and SSL support is available).  This requires the
%   options cacert(File), key(File) and cert(File).  Without
%   tls(true), Stream is simply Raw.
%
%   @error existence_error(option, Opt, Options) if a required TLS
%          option is missing.

:- if(current_predicate(ssl_context/3)).
tls_upgrade(Host:_Port, Raw, Stream, Options) :-
    option(tls(true), Options),
    !,
    must_have_option(cacert(CACertFile), Options),
    must_have_option(key(PrivateKeyFile), Options),
    must_have_option(cert(CertificateFile), Options),
    ContextOptions = [ host(Host),
                       certificate_file(CertificateFile),
                       key_file(PrivateKeyFile),
                       cacerts([file(CACertFile)]),
                       cert_verify_hook(tls_verify),
                       close_parent(true)
                     ],
    ssl_context(client, Context, ContextOptions),
    stream_pair(Raw, PlainIn, PlainOut),
    ssl_negotiate(Context, PlainIn, PlainOut, SecureIn, SecureOut),
    stream_pair(Stream, SecureIn, SecureOut).
:- endif.
tls_upgrade(_, Stream, Stream, _).
290
:- if(current_predicate(ssl_context/3)).

%   tls_verify(+SSL, +ProblemCert, +AllCerts, +FirstCert, +Status)
%
%   Certificate verification hook installed by tls_upgrade/4.
%   Succeeds for the status `verified` and also for
%   `hostname_mismatch`; every other status is rejected.
%
%   NOTE(review): accepting `hostname_mismatch` means the certificate
%   is not tied to the host name that was connected to.

:- public tls_verify/5.
tls_verify(_SSL, _ProblemCert, _AllCerts, _FirstCert, Status) :-
    memberchk(Status, [verified, hostname_mismatch]).

:- endif.
312
%   sentinel_master(+Id, +Pool, -Master, +Options)
%
%   Connect to the master of the sentinel-monitored Pool.  Options
%   must contain sentinels(List).  Sentinels learned earlier
%   (sentinel/2) are tried before the configured ones; each attempt
%   is limited by the setting `sentinel_timeout` and failures are
%   reported as warnings.  The first reachable sentinel is asked for
%   the master address, after which the claimed master is contacted
%   and its role verified.  If it does not report the role `master`
%   the connection is closed and, after waiting `sentinel_timeout`
%   seconds, the whole procedure is repeated.
%
%   Fails if no sentinel can be reached.
%
%   @error existence_error(option, sentinels(_), Options) if the
%          list of sentinels is not provided.

sentinel_master(Id, Pool, Master, Options) :-
    must_have_option(sentinels(Sentinels), Options),
    sentinel_auth(Options, Options1),
    setting(sentinel_timeout, TMO),
    (   sentinel(Pool, Sentinel)
    ;   member(Sentinel, Sentinels)
    ),
    catch(call_with_time_limit(
              TMO,
              do_connect(Id, Sentinel, Conn,
                         [sentinel(true)|Options1])),
          Error,
          (print_message(warning, Error),fail)),
    !,
    debug(redis(sentinel), 'Connected to sentinel at ~p', [Sentinel]),
    call_cleanup(
        query_sentinel(Pool, Conn, Sentinel, MasterAddr),
        redis_disconnect(Conn)),
    debug(redis(sentinel), 'Sentinel claims master is at ~p', [MasterAddr]),
    % Connect into a fresh variable.  Master must stay unbound until
    % the role is verified: if it were bound to a rejected connection,
    % the retry below could never unify it with the new connection.
    do_connect(Id, MasterAddr, Candidate, Options),
    debug(redis(sentinel), 'Connected to claimed master', []),
    redis(Candidate, role, Role),
    (   Role = [master|_Slaves]
    ->  debug(redis(sentinel), 'Verified role at ~p', [MasterAddr]),
        Master = Candidate
    ;   redis_disconnect(Candidate),
        debug(redis(sentinel), '~p is not the master: ~p', [MasterAddr, Role]),
        sleep(TMO),
        sentinel_master(Id, Pool, Master, Options)
    ).
342
%   sentinel_auth(+Options0, -Options)
%
%   Derive the options for talking to a sentinel.  If both
%   sentinel_user(User) and sentinel_password(Password) are given
%   they replace user/1 and password/1.  Otherwise a password/1
%   option, if any, is removed.

sentinel_auth(Options0, Options) :-
    (   option(sentinel_user(User), Options0),
        option(sentinel_password(Password), Options0)
    ->  merge_options([user(User), password(Password)], Options0, Options)
    ;   select_option(password(_), Options0, Options, _)
    ).
350
351
%   query_sentinel(+Pool, +Conn, +Sentinel, -MasterAddress)
%
%   Ask the sentinel reachable over Conn for the master of Pool,
%   returned as Host:Port.  As a side effect the list of known
%   sentinels for Pool is refreshed from the peers this sentinel
%   reports.

query_sentinel(Pool, Conn, Sentinel, MasterAddress) :-
    MasterAddress = Host:Port,
    redis(Conn, sentinel('get-master-addr-by-name', Pool), Reply),
    Reply = [Host,Port],
    redis(Conn, sentinel(sentinels, Pool), PeerList),
    transaction(update_known_sentinels(Pool, Sentinel, PeerList)).
357
%   update_known_sentinels(+Pool, +Sentinel, +Peers)
%
%   Replace the remembered sentinels of Pool.  Each peer is added
%   in front of the ones before it and Sentinel, the one that just
%   answered, is added last so that it is tried first next time.

update_known_sentinels(Pool, Current, PeerList) :-
    retractall(sentinel(Pool, _)),
    maplist(update_peer_sentinel(Pool), PeerList),
    asserta(sentinel(Pool, Current)).

%   update_peer_sentinel(+Pool, +Attrs)
%
%   Remember a peer sentinel described by a list of Name-Value
%   attributes that holds at least `ip` and `port`.

update_peer_sentinel(Pool, Properties),
    memberchk(ip-PeerHost, Properties),
    memberchk(port-PeerPort, Properties) =>
    asserta(sentinel(Pool, PeerHost:PeerPort)).
367
%   must_have_option(?Opt, +Options)
%
%   As option/2, but raise an exception instead of failing if Opt
%   is not in Options.
%
%   @error existence_error(option, Opt, Options)

must_have_option(Opt, Options) :-
    (   option(Opt, Options)
    ->  true
    ;   existence_error(option, Opt, Options)
    ).
373
378
%   hello(+Connection, +Options)
%
%   Initial handshake on a fresh connection.  With version(V),
%   V >= 3, a `HELLO 3` is sent, including `AUTH user password`
%   when both user/1 and password/1 are given.  Otherwise, a
%   password/1 option leads to a plain `AUTH password`.  With none
%   of these options nothing is sent.

hello(Con, Options) :-
    (   option(version(Version), Options),
        Version >= 3
    ->  (   option(user(User), Options),
            option(password(Secret), Options)
        ->  redis(Con, hello(3, auth, User, Secret))
        ;   redis(Con, hello(3))
        )
    ;   option(password(Password), Options)
    ->  redis(Con, auth(Password))
    ;   true
    ).
393
400
%   redis_stream(+Redis, -Stream, +Connect)
%
%   Stream is the stream pair for talking to Redis.  Redis is
%   either a server alias or a redis_connection/4 term.
%
%     - For an alias, the stream cached in connection/2 is used.  If
%       there is none and Connect is `true`, a connection is made
%       based on server/3 and cached.
%     - For a connection term whose stream slot is `-` (closed), the
%       connection is re-established using the address stored in
%       its options and the term is updated destructively.
%
%   @error instantiation_error if Redis is unbound
%   @error uninstantiation_error(Stream) if Stream is bound
%   @error existence_error(redis_server, Alias) if no stream can be
%          obtained for the alias.

redis_stream(Redis, _, _) :-
    var(Redis),
    !,
    instantiation_error(Redis).
redis_stream(_, Stream, _) :-
    nonvar(Stream),
    !,
    uninstantiation_error(Stream).
redis_stream(Alias, Stream, Connect) :-
    atom(Alias),
    !,
    (   connection(Alias, Cached)
    ->  Stream = Cached
    ;   Connect == true,
        server(Alias, Address, Options)
    ->  redis_connect(Address, Connection, Options),
        redis_stream(Connection, Stream, false),
        asserta(connection(Alias, Stream))
    ;   existence_error(redis_server, Alias)
    ).
redis_stream(redis_connection(_,Current,_,_), Stream, _) :-
    Current \== (-),
    !,
    Stream = Current.
redis_stream(Redis, Stream, _) :-
    Redis = redis_connection(Id,-,_,Options),
    option(address(Address), Options),
    do_connect(Id, Address, Fresh, Options),
    arg(2, Fresh, NewStream),
    % Store the new stream in the caller's term such that it
    % survives backtracking.
    nb_setarg(2, Redis, NewStream),
    Stream = NewStream.
430
%   has_redis_stream(+Redis, -Stream)
%
%   True when Redis currently has an open stream.  Unlike
%   redis_stream/3 this never creates a connection: an alias must
%   appear in connection/2 and a connection term must have a stream
%   slot other than `-`.
%
%   @error instantiation_error if Redis is unbound.

has_redis_stream(Redis, Stream) :-
    (   var(Redis)
    ->  instantiation_error(Redis)
    ;   atom(Redis)
    ->  connection(Redis, Stream)
    ;   Redis = redis_connection(_,Stream,_,_),
        Stream \== (-)
    ).
441
442
455
%!  redis_disconnect(+Connection) is det.
%!  redis_disconnect(+Connection, +Options) is det.
%
%   Close a connection.  Options:
%
%     - force(true)
%       Close the stream using close/2 with force(true) and succeed
%       silently if there is no open stream.  A connection term gets
%       its stream slot reset to `-`; for an alias the cached
%       connection/2 fact is removed.
%
%   Without force(true) the stream is looked up using
%   redis_stream/3 and closed normally.

redis_disconnect(Redis) :-
    redis_disconnect(Redis, []).

redis_disconnect(Redis, Options) :-
    option(force(true), Options),
    !,
    force_disconnect(Redis).
redis_disconnect(Redis, _Options) :-
    redis_stream(Redis, Stream, false),
    close(Stream),
    retractall(connection(_, Stream)).

%   force_disconnect(+Redis)
%
%   Implements the force(true) case of redis_disconnect/2.

force_disconnect(Redis) :-
    (   Redis = redis_connection(_Id, Stream, _, _Opts)
    ->  (   Stream \== (-)
        ->  close(Stream, [force(true)]),
            nb_setarg(2, Redis, -)
        ;   true
        )
    ;   has_redis_stream(Redis, Stream)
    ->  close(Stream, [force(true)]),
        retractall(connection(_, Stream))
    ;   true
    ).
477
519
%!  redis(+Connection, +Request) is semidet.
%
%   Run Request, ignoring the reply.  If Request is a proper list
%   it is executed as a pipeline (see redis_pipeline/2); otherwise
%   it is a single command handled by redis/3.

redis(Redis, Request) :-
    (   is_list(Request)
    ->  redis_pipeline(Redis, Request)
    ;   redis(Redis, Request, _)
    ).
526
644
%!  redis(+Connection, +Command, -Reply) is semidet.
%
%   Send Command and unify Reply with the answer.  Reply may be of
%   the form `Value as Type`.  The call fails if the resulting value
%   is the atom `nil`.

redis(Redis, Command, Reply) :-
    out_val(Reply, Value),
    redis1(Redis, Command, Reply),
    Value \== nil.
649
%   out_val(+Out, -Val)
%
%   Val is the variable (or term) in the reply specification Out
%   that receives the actual value: the left side of `Val as Type`
%   or Out itself.

out_val(Out, Val) :-
    nonvar(Out),
    Out = (Val as _),
    !.
out_val(Out, Out).
656
%   redis1(+Redis, +Req, -Out)
%
%   Run a single request.  If this raises an error(_, _) exception,
%   hand it to recover/3, which either retries this same goal after
%   reconnecting or re-throws the exception.

redis1(Redis, Req, Out) :-
    Caught = error(Formal, _),
    catch(redis2(Redis, Req, Out), Caught, true),
    (   nonvar(Formal)
    ->  recover(Caught, Redis, redis1(Redis, Req, Out))
    ;   true
    ).
664
%   redis2(+Redis, +Req, -Out)
%
%   Write the request and read the reply.  If Redis is a server
%   alias the write/read pair runs while holding the mutex named
%   after the alias; connection terms are used without locking.

redis2(Redis, Req, Out) :-
    redis_stream(Redis, Stream, true),
    (   atom(Redis)
    ->  with_mutex(Redis,
                   (   redis_write_msg(Stream, Req),
                       redis_read_stream(Redis, Stream, Out)
                   ))
    ;   redis_write_msg(Stream, Req),
        redis_read_stream(Redis, Stream, Out)
    ).
677
679
%   redis_pipeline(+Redis, +PipeLine)
%
%   Run a list of commands as a pipeline.  Error handling is the
%   same as for a single request: an error(_, _) exception is given
%   to recover/3, which may retry the complete pipeline.

redis_pipeline(Redis, PipeLine) :-
    Caught = error(Formal, _),
    catch(redis_pipeline2(Redis, PipeLine), Caught, true),
    (   nonvar(Formal)
    ->  recover(Caught, Redis, redis_pipeline(Redis, PipeLine))
    ;   true
    ).

%   redis_pipeline2(+Redis, +PipeLine)
%
%   Obtain the stream and run the pipeline, holding the alias mutex
%   if Redis is a server alias.

redis_pipeline2(Redis, PipeLine) :-
    redis_stream(Redis, Stream, true),
    (   atom(Redis)
    ->  with_mutex(Redis,
                   redis_pipeline3(Redis, Stream, PipeLine))
    ;   redis_pipeline3(Redis, Stream, PipeLine)
    ).

%   redis_pipeline3(+Redis, +Stream, +PipeLine)
%
%   Write all commands, flush once and then read all replies.

redis_pipeline3(Redis, Stream, PipeLine) :-
    maplist(write_pipeline(Stream), PipeLine),
    flush_output(Stream),
    read_pipeline(Redis, Stream, PipeLine).
702
%   write_pipeline(+Stream, +Item)
%
%   Write one pipeline element without flushing.  Item is either
%   `Command -> Reply` or a bare command.

write_pipeline(Stream, Item) :-
    (   Item = (Command -> _Reply)
    ->  true
    ;   Command = Item
    ),
    redis_write_msg_no_flush(Stream, Command).
708
%   read_pipeline(+Redis, +Stream, +PipeLine)
%
%   Read the replies for all elements of PipeLine.  If reading
%   raises an error(Formal, _) exception the connection is cleaned
%   up before the exception is re-thrown:
%
%     - If Formal is a connection-level problem according to
%       reconnect_error/1 the connection is closed forcefully.
%     - Otherwise resync/1 is used to get the stream back in sync
%       with the server.

read_pipeline(Redis, S, PipeLine) :-
    E = error(Formal,_),
    catch(read_pipeline2(Redis, S, PipeLine), E, true),
    (   var(Formal)
    ->  true
    ;   % reconnect_error/1 classifies the formal part of the
        % exception, not the complete error(Formal, Context) term.
        reconnect_error(Formal)
    ->  redis_disconnect(Redis, [force(true)]),
        throw(E)
    ;   resync(Redis),
        throw(E)
    ).
720
%   read_pipeline2(+Redis, +Stream, +PipeLine)
%
%   Read one reply per pipeline element.  Only after all replies
%   are read are push messages dispatched, the first server error
%   (if any) thrown and, finally, the reply variables bound.

read_pipeline2(Redis, Stream, PipeLine) :-
    maplist(redis_read_msg3(Stream), PipeLine, Values, Failures, Pushes),
    maplist(handle_push(Redis), Pushes),
    maplist(handle_error, Failures),
    maplist(bind_reply, PipeLine, Values).

%   redis_read_msg3(+Stream, +Item, -Reply, -Error, -Push)
%
%   Read the reply for one pipeline element.  The reply
%   specification is the right side of `Command -> Reply`, or the
%   element itself if it is not of that form.

redis_read_msg3(Stream, Item, Reply, Error, Push) :-
    (   Item = (_Command -> Spec)
    ->  true
    ;   Spec = Item
    ),
    redis_read_msg(Stream, Spec, Reply, Error, Push).
732
%   handle_push(+Redis, +Pushed)
%
%   Argument-swapped handle_push_messages/2 for use with maplist.

handle_push(Redis, Messages) :-
    handle_push_messages(Messages, Redis).

%   handle_error(?Error)
%
%   Throw Error unless it is unbound.

handle_error(Error) :-
    var(Error),
    !.
handle_error(Error) :-
    throw(Error).

%   bind_reply(+Item, +Reply)
%
%   Unify the reply slot of a `Command -> Reply0` pipeline element
%   with Reply.  Elements without a reply slot are ignored.

bind_reply(Item, Reply) :-
    (   Item = (_Command -> Slot)
    ->  Slot = Reply
    ;   true
    ).
744
745
751
:- meta_predicate recover(+, +, 0).

%   recover(+Error, +Redis, :Goal)
%
%   Error was raised while talking to Redis.  If it indicates a
%   broken connection (reconnect_error/1) and Redis allows for
%   reconnecting (auto_reconnect/1), close the connection and wait
%   according to wait_to_retry/2.  Then run Goal and, on success,
%   clear the failure/2 record for Redis.  In all other cases, and
%   when no more retries are allowed, Error is re-thrown.

recover(Error, Redis, Goal) :-
    (   Error = error(Formal, _),
        reconnect_error(Formal),
        auto_reconnect(Redis)
    ->  debug(redis(recover), '~p: got error ~p; trying to reconnect',
              [Redis, Error]),
        redis_disconnect(Redis, [force(true)]),
        (   wait_to_retry(Redis, Error)
        ->  call(Goal),
            retractall(failure(Redis, _))
        ;   throw(Error)
        )
    ;   throw(Error)
    ).
769
%   auto_reconnect(+Redis)
%
%   True if a lost connection to Redis may be re-established.  A
%   connection term must carry reconnect(true) in its options.  For
%   a registered server reconnect defaults to `true`.

auto_reconnect(Redis) :-
    (   Redis = redis_connection(_,_,_,ConnOptions)
    ->  option(reconnect(true), ConnOptions)
    ;   ground(Redis),
        server(Redis, _, ServerOptions),
        option(reconnect(true), ServerOptions, true)
    ).

%   reconnect_error(?Formal)
%
%   Formal is the formal part of an error(Formal, _) exception
%   that is treated as a lost connection.

reconnect_error(io_error(_Action, _Stream)).
reconnect_error(socket_error(_Code, _Message)).
reconnect_error(syntax_error(unexpected_eof)).
reconnect_error(existence_error(stream, _Stream)).
782
789
790:- dynamic failure/2 as volatile. 791
%   wait_to_retry(+Redis, +Error)
%
%   Fails if Redis already failed `max_retry_count` times.
%   Otherwise the failure count is incremented and we sleep before
%   the caller retries.  The delay doubles with each failure,
%   starting at 0.01 seconds, and is capped at the setting
%   `max_retry_wait`.  Only the first failure is printed as a
%   warning; later ones use the level `silent`.

wait_to_retry(Redis, Error) :-
    redis_failures(Redis, Seen),
    setting(max_retry_count, Limit),
    Seen < Limit,
    Next is Seen+1,
    redis_set_failures(Redis, Next),
    setting(max_retry_wait, Ceiling),
    % Computed in 1/100 seconds; Ceiling is in seconds.
    Centis is min(Ceiling*100, 1<<Seen),
    Delay is Centis/100.0,
    retry_message_level(Seen, Level),
    debug(redis(recover), '  Sleeping ~p seconds', [Delay]),
    print_message(Level, redis(retry(Redis, Seen, Delay, Error))),
    sleep(Delay).
804
%   redis_failures(+Redis, -Failures)
%
%   Number of consecutive failures recorded for Redis.  For a
%   connection term this is its third argument.  For a server alias
%   it comes from failure/2 and defaults to 0.

redis_failures(Redis, Failures) :-
    (   Redis = redis_connection(_,_,Count,_)
    ->  Failures = Count
    ;   atom(Redis),
        (   failure(Redis, Failures)
        ->  true
        ;   Failures = 0
        )
    ).

%   redis_set_failures(+Redis, +Count)
%
%   Record Count as the number of failures of Redis, destructively
%   in a connection term or as a failure/2 fact for an alias.

redis_set_failures(Redis, Count) :-
    (   compound(Redis)
    ->  nb_setarg(3, Redis, Count)
    ;   atom(Redis),
        retractall(failure(Redis, _)),
        asserta(failure(Redis, Count))
    ).
823
%   retry_message_level(+Failures, -Level)
%
%   Message level for reporting a retry: `warning` if there were no
%   earlier failures and `silent` otherwise.

retry_message_level(Failures, Level) :-
    (   Failures = 0,
        Level = warning
    ->  true
    ;   Level = silent
    ).
826
827
833
%!  redis(+Request)
%
%   Run Request on a temporary connection to the `default` server
%   and print the reply.  The connection is closed afterwards, also
%   if the request fails or raises an exception.

redis(Request) :-
    setup_call_cleanup(
        redis_connect(default, Connection, []),
        redis1(Connection, Request, Reply),
        redis_disconnect(Connection)),
    print(Reply).
840
846
%!  redis_write(+Redis, +Command) is det.
%!  redis_read(+Redis, -Reply) is det.
%
%   Low-level access: write a command or read a reply without
%   pairing the two.  Both connect on demand.

redis_write(Redis, Command) :-
    redis_stream(Redis, Stream, true),
    redis_write_msg(Stream, Command).

redis_read(Redis, Reply) :-
    redis_stream(Redis, Stream, true),
    redis_read_stream(Redis, Stream, Reply).
854
855
856 859
874
%!  redis_get_list(+Redis, +Key, -List) is det.
%!  redis_get_list(+Redis, +Key, +ChunkSize, -List) is det.
%
%   Get the list stored at Key.  With a ChunkSize of -1 (used by
%   redis_get_list/3), or one that is not smaller than the length
%   of the list, all elements are fetched at once.  Otherwise List
%   is a lazy list that fetches ChunkSize elements at a time.

redis_get_list(Redis, Key, List) :-
    redis_get_list(Redis, Key, -1, List).

redis_get_list(Redis, Key, ChunkSize, List) :-
    redis(Redis, llen(Key), Length),
    (   (   ChunkSize == -1
        ;   ChunkSize >= Length
        )
    ->  (   Length \== 0
        ->  Last is Length-1,
            list_range(Redis, Key, 0, Last, List)
        ;   List = []
        )
    ;   lazy_list(rlist_next(s(Redis,Key,0,ChunkSize,Length)), List)
    ).
890
%   rlist_next(+State, -List, -Tail)
%
%   Lazy list iterator for redis_get_list/4.  State is a term
%   s(Redis, Key, Offset, ChunkSize, Length) whose offset is
%   advanced destructively.  Tail is [] once the last element of
%   the list has been fetched.

rlist_next(State, List, Tail) :-
    State = s(Redis,Key,From,Step,Length),
    Last is Length-1,
    To is min(Last, From+Step-1),
    list_range(Redis, Key, From, To, Chunk),
    (   To =:= Last
    ->  List = Chunk,
        Tail = []
    ;   NextFrom is From+Step,
        nb_setarg(3, State, NextFrom),
        append(Chunk, Tail, List)
    ).
902
904
%   list_range(+DB, +Key, +Start, +End, -List)
%
%   Fetch the elements Start..End (inclusive) of the list at Key.
%   A range of one element is fetched using LINDEX, anything else
%   using LRANGE.

list_range(DB, Key, Start, End, List) :-
    (   Start = End,
        List = [Single]
    ->  redis(DB, lindex(Key, Start), Single)
    ;   redis(DB, lrange(Key, Start, End), List)
    ).
911
912
913
920
%!  redis_set_list(+Redis, +Key, +List) is det.
%
%   Make Key hold exactly the elements of List: the key is deleted
%   and, unless List is empty, refilled with a single RPUSH.

redis_set_list(Redis, Key, List) :-
    redis(Redis, del(Key), _),
    (   List \== []
    ->  Push =.. [rpush,Key|List],
        redis(Redis, Push, _Length)
    ;   true
    ).
928
929
939
%!  redis_get_hash(+Redis, +Key, -Dict) is det.
%!  redis_set_hash(+Redis, +Key, +Dict) is det.
%
%   Exchange a Redis hash with a dict.  Reading uses HGETALL with
%   the reply type dict(auto).  Writing deletes the key and stores
%   all key-value pairs of Dict in a single HSET.

redis_get_hash(Redis, Key, Dict) :-
    redis(Redis, hgetall(Key), Dict as dict(auto)).

redis_set_hash(Redis, Key, Dict) :-
    redis_array_dict(Fields, _Tag, Dict),
    Store =.. [hset,Key|Fields],
    redis(Redis, del(Key), _),
    redis(Redis, Store, _Added).
948
957
%!  redis_array_dict(?Array, ?Tag, ?Dict) is det.
%
%   Translate between a Redis reply array and a dict.  If Array is
%   instantiated it may be a flat list of alternating names and
%   values or a list of Name-Value pairs; names are turned into
%   atoms.  In the other direction Dict is turned into a flat list
%   with the keys as strings.

redis_array_dict(Array, Tag, Dict) :-
    (   nonvar(Array)
    ->  array_to_pairs(Array, Pairs),
        dict_pairs(Dict, Tag, Pairs)
    ;   dict_pairs(Dict, Tag, Pairs),
        pairs_to_array(Pairs, Array)
    ).

%   array_to_pairs(+Array, -Pairs)
%
%   Array elements are consumed either as a single Name-Value
%   pair or as two consecutive elements Name, Value.

array_to_pairs([], []) :-
    !.
array_to_pairs([Text-Value|Rest0], [Key-Value|Rest]) :-
    !,
    atom_string(Key, Text),
    array_to_pairs(Rest0, Rest).
array_to_pairs([Text,Value|Rest0], [Key-Value|Rest]) :-
    atom_string(Key, Text),
    array_to_pairs(Rest0, Rest).

%   pairs_to_array(+Pairs, -Array)
%
%   Flatten Key-Value pairs into Key, Value, ... with string keys.

pairs_to_array([], []) :-
    !.
pairs_to_array([Key-Value|Rest0], [Text,Value|Rest]) :-
    atom_string(Key, Text),
    pairs_to_array(Rest0, Rest).
982
1004
%!  redis_scan(+Redis, -LazyList, +Options) is det.
%!  redis_sscan(+Redis, +Set, -LazyList, +Options) is det.
%!  redis_hscan(+Redis, +Hash, -LazyList, +Options) is det.
%!  redis_zscan(+Redis, +Set, -LazyList, +Options) is det.
%
%   Create a lazy list from the Redis commands SCAN, SSCAN, HSCAN
%   and ZSCAN.  The scan starts at cursor 0.  Options processed
%   are match(Pattern), count(Count) and type(Type).

redis_scan(Redis, LazyList, Options) :-
    scan_lazy_list(scan, Redis, Options, LazyList).

redis_sscan(Redis, Set, LazyList, Options) :-
    scan_lazy_list(sscan(Set), Redis, Options, LazyList).

redis_hscan(Redis, Hash, LazyList, Options) :-
    scan_lazy_list(hscan(Hash), Redis, Options, LazyList).

redis_zscan(Redis, Set, LazyList, Options) :-
    scan_lazy_list(zscan(Set), Redis, Options, LazyList).

%   scan_lazy_list(+Command, +Redis, +Options, -LazyList)
%
%   Shared implementation: Command is the scan command without its
%   cursor and option arguments.

scan_lazy_list(Command, Redis, Options, LazyList) :-
    scan_options([match,count,type], Options, Parms),
    lazy_list(scan_next(s(Command,Redis,0,Parms)), LazyList).
1020
%   scan_options(+Names, +Options, -Params)
%
%   Params is a flat list Name, Value, ... holding, in the order of
%   Names, each option Name(Value) that appears in Options.

scan_options([], _, []).
scan_options([Name|Names], Options, Params) :-
    (   Params = [Name,Value|More],
        Opt =.. [Name,Value],
        option(Opt, Options)
    ->  scan_options(Names, Options, More)
    ;   scan_options(Names, Options, Params)
    ).
1029
1030
%   scan_next(+State, -List, -Tail)
%
%   Lazy list iterator for the scan predicates.  State is a term
%   s(Command, Redis, Cursor, Params).  The request is Command
%   extended with the current cursor and Params.  The new cursor
%   is stored destructively in State, and a cursor of 0 ends the
%   list.

scan_next(State, List, Tail) :-
    State = s(Command,Redis,Cursor,Params),
    Command =.. Prefix,
    append(Prefix, [Cursor|Params], Parts),
    Request =.. Parts,
    redis(Redis, Request, [NextCursor,Raw]),
    scan_pairs(Command, Raw, Batch),
    (   NextCursor == 0
    ->  List = Batch,
        Tail = []
    ;   nb_setarg(3, State, NextCursor),
        append(Batch, Tail, List)
    ).
1044
%   scan_pairs(+Command, +List, -Result)
%
%   For HSCAN and ZSCAN the reply is turned into Key-Value pairs.
%   Replies to other commands are passed unchanged.

scan_pairs(Command, List, Result) :-
    (   (   Command = hscan(_)
        ;   Command = zscan(_)
        )
    ->  scan_pairs(List, Result)
    ;   Result = List
    ).

%   scan_pairs(+List, -Pairs)
%
%   Combine two consecutive elements into Key-Value.  An element
%   that already is a pair is kept as is.

scan_pairs([], []).
scan_pairs([K,V|Rest0], [K-V|Rest]) :-
    !,
    scan_pairs(Rest0, Rest).
scan_pairs([K-V|Rest0], [K-V|Rest]) :-
    scan_pairs(Rest0, Rest).
1059
1060
1061 1064
1071
%!  redis_current_command(+Redis, ?Command) is nondet.
%!  redis_current_command(+Redis, ?Command, -Properties) is nondet.
%
%   True when Command is a command known to the server and
%   Properties is the remainder of its description.  A given
%   Command is looked up using `COMMAND INFO`; otherwise all
%   commands are enumerated on backtracking.

redis_current_command(Redis, Command) :-
    redis_current_command(Redis, Command, _).

redis_current_command(Redis, Command, Properties) :-
    (   nonvar(Command)
    ->  redis(Redis, command(info, Command), [[_|Properties]])
    ;   redis(Redis, command, Descriptions),
        member([NameString|Properties], Descriptions),
        atom_string(Command, NameString)
    ).
1083
1089
%!  redis_property(+Redis, ?Property) is nondet.
%
%   True when Property is a term Name(Value) taken from the output
%   of the INFO command.

redis_property(Redis, Property) :-
    redis(Redis, info, InfoText),
    info_terms(InfoText, Properties),
    member(Property, Properties).
1094
%   info_terms(+Info, -Terms)
%
%   Split the text of an INFO reply into lines and translate each
%   line that info_line_term/2 accepts into a term Name(Value).

info_terms(Info, Terms) :-
    split_string(Info, "\n", "\r\n ", Rows),
    convlist(info_line_term, Rows, Terms).

%   info_line_term(+Line, -Term)
%
%   Translate "Name:Value" into Name(Value), splitting at the first
%   colon.  The value becomes a number if it can be parsed as one
%   and stays a string otherwise.  Fails for lines without a colon
%   and for names ending in `_human`.

info_line_term(Line, Term) :-
    sub_string(Line, Before, _, After, :),
    !,
    sub_atom(Line, 0, Before, _, Key),
    \+ sub_atom(Key, _, _, 0, '_human'),
    sub_string(Line, _, After, 0, Text),
    (   number_string(Number, Text)
    ->  Value = Number
    ;   Value = Text
    ),
    Term =.. [Key,Value].
1110
1111
1112 1115
1143
1144:- dynamic ( subscription/2, 1145 listening/3 1146 ) as volatile. 1147
%!  redis_subscribe(+Redis, +Channels, -Id, +Options) is det.
%
%   Create a thread that subscribes to Channels and listens for
%   messages.  If Redis is a server alias the thread uses its own
%   connection, opened with reconnect(true) and closed when the
%   thread ends.  Otherwise the given connection is used directly.
%   Options are passed to thread_create/3, with detached(true) as
%   default.  Id identifies the subscription.

redis_subscribe(Redis, Spec, Id, Options) :-
    (   atom(Redis)
    ->  Listener = setup_call_cleanup(
                       redis_connect(Redis, Conn, [reconnect(true)]),
                       redis_subscribe1(Redis, Conn, Channels),
                       redis_disconnect(Conn))
    ;   Listener = redis_subscribe1(Redis, Redis, Channels)
    ),
    channels(Spec, Channels),
    pubsub_thread_options(ThreadOptions, Options),
    thread_create(Listener, Thread, ThreadOptions),
    pubsub_id(Thread, Id).
1167
%   pubsub_thread_options(-ThreadOptions, +Options)
%
%   Options for creating the listening thread: the user options
%   with detached(true) as default.

pubsub_thread_options(ThreadOptions, UserOptions) :-
    Defaults = [detached(true)],
    merge_options(UserOptions, Defaults, ThreadOptions).

%   pubsub_id(+Thread, -Id)
%
%   The subscription id is the thread itself.

pubsub_id(Id, Id).
1175
%   redis_subscribe1(+Redis, +Conn, +Channels)
%
%   Body of the listening thread.  If subscribing or listening
%   raises an error(_, _) exception, try to recover the connection
%   (verified by an ECHO request) and start over with the channels
%   that are registered for this thread at that moment.

redis_subscribe1(Redis, Conn, Channels) :-
    Caught = error(Formal, _),
    catch(redis_subscribe2(Redis, Conn, Channels), Caught, true),
    (   nonvar(Formal)
    ->  recover(Caught, Conn, redis1(Conn, echo("reconnect"), _)),
        thread_self(Self),
        pubsub_id(Self, Id),
        findall(Ch, subscription(Id, Ch), Registered),
        redis_subscribe1(Redis, Conn, Registered)
    ;   true
    ).
1187
%   redis_subscribe2(+Redis, +Conn, +Channels)
%
%   Subscribe to Channels and process messages until the thread has
%   no subscriptions left.

redis_subscribe2(Redis, Conn, Channels) :-
    redis_subscribe3(Conn, Channels),
    redis_listen(Redis, Conn).

%   redis_subscribe3(+Conn, +Channels)
%
%   Register Channels for the calling thread, arrange for the
%   registration to be removed when the thread exits and send the
%   SUBSCRIBE request.  The replies are read by the listen loop.

redis_subscribe3(Conn, Channels) :-
    thread_self(Self),
    pubsub_id(Self, Id),
    prolog_listen(this_thread_exit, pubsub_clean(Id)),
    maplist(register_subscription(Id), Channels),
    redis_stream(Conn, Stream, true),
    Subscribe =.. [subscribe|Channels],
    redis_write_msg(Stream, Subscribe).

%   pubsub_clean(+Id)
%
%   Remove all administration for the subscription Id.

pubsub_clean(Id) :-
    retractall(listening(Id, _Conn, _Thread)),
    retractall(subscription(Id, _Ch)).
1204
1214
%!  redis_subscribe(+Id, +Channels) is det.
%!  redis_unsubscribe(+Id, +Channels) is det.
%
%   Add channels to, or remove channels from, the running
%   subscription Id.
%
%   @error existence_error(redis_pubsub, Id) if no thread is
%          listening for Id.

redis_subscribe(Id, Spec) :-
    pubsub_update(subscribe, register_subscription, Id, Spec).

redis_unsubscribe(Id, Spec) :-
    pubsub_update(unsubscribe, unregister_subscription, Id, Spec).

%   pubsub_update(+Command, +Update, +Id, +Spec)
%
%   Shared implementation.  Update is called as Update(Id, Channel)
%   for each channel to adjust the local administration, after
%   which Command is sent with all channels over the connection of
%   the listening thread.

pubsub_update(Command, Update, Id, Spec) :-
    channels(Spec, Channels),
    (   listening(Id, Connection, _Thread)
    ->  true
    ;   existence_error(redis_pubsub, Id)
    ),
    maplist(call(Update, Id), Channels),
    redis_stream(Connection, Stream, true),
    Request =.. [Command|Channels],
    redis_write_msg(Stream, Request).
1236
1240
%!  redis_current_subscription(?Id, ?Channels) is nondet.
%
%   True when the subscription Id is registered for the list
%   Channels.

redis_current_subscription(Id, Channels) :-
    findall(Id-Ch, subscription(Id, Ch), Unsorted),
    keysort(Unsorted, ById),
    group_pairs_by_key(ById, Groups),
    member(Id-Channels, Groups).
1246
%   channels(+Spec, -Channels)
%
%   Channels is a list of channel names.  Spec is a single channel
%   or a list of them.

channels(Spec, Channels) :-
    (   is_list(Spec)
    ->  maplist(channel_name, Spec, Channels)
    ;   channel_name(Spec, Name),
        Channels = [Name]
    ).

%   channel_name(+Key, -Atom)
%
%   A channel is either an atom or a term A:B:... over atoms, which
%   is translated into the atom 'A:B:...'.
%
%   @error type_error(redis_key, Key) for anything else.

channel_name(Key, Atom) :-
    (   Key = Atom,
        atom(Key)
    ->  true
    ;   phrase(key_parts(Key), Segments)
    ->  atomic_list_concat(Segments, :, Atom)
    ;   type_error(redis_key, Key)
    ).

%   key_parts(+Key)//
%
%   Describes the atoms of a key A:B:... from left to right.  Fails
%   if Key holds anything but atoms, including unbound parts.

key_parts(Key) -->
    { nonvar(Key) },
    (   { atom(Key) }
    ->  [Key]
    ;   { Key = Left:Right },
        key_parts(Left),
        key_parts(Right)
    ).
1273
1274
1275
1276
%   register_subscription(+Id, +Channel)
%
%   Record that Id listens to Channel, unless this is already
%   known.

register_subscription(Id, Channel) :-
    subscription(Id, Channel),
    !.
register_subscription(Id, Channel) :-
    assertz(subscription(Id, Channel)).

%   unregister_subscription(+Id, +Channel)
%
%   Forget that Id listens to Channel.

unregister_subscription(Id, Channel) :-
    retractall(subscription(Id, Channel)).
1285
%   redis_listen(+Redis, +Conn)
%
%   Run the listen loop on Conn.  While the loop runs, a
%   listening/3 fact ties the subscription id to the connection and
%   the calling thread.  The fact is removed when the loop ends,
%   whatever the reason.

redis_listen(Redis, Conn) :-
    thread_self(Self),
    pubsub_id(Self, Id),
    setup_call_cleanup(
        assertz(listening(Id, Conn, Self), ClauseRef),
        redis_listen_loop(Redis, Id, Conn),
        erase(ClauseRef)).
1293
%   redis_listen_loop(+Redis, +Id, +Conn)
%
%   Read and broadcast messages for as long as Id is subscribed to
%   at least one channel.

redis_listen_loop(Redis, Id, Conn) :-
    redis_stream(Conn, Stream, true),
    (   subscription(Id, _)
    ->  redis_read_stream(Redis, Stream, Message),
        redis_broadcast(Redis, Message),
        redis_listen_loop(Redis, Id, Conn)
    ;   true
    ).
1302
%   redis_broadcast(+Redis, +Message)
%
%   Handle a message received while listening.
%
%     - [subscribe, Channel, N] is ignored.
%     - [message, Channel, Data] is forwarded through broadcast/1 as
%       redis(Redis, Channel, Data).  Exceptions raised by listeners
%       are printed rather than propagated.
%     - Anything else is checked to be of the shape
%       [Type, Channel, Data] and reported on the debug topic
%       redis(warning).

redis_broadcast(Redis, Message) :-
    (   Message = [subscribe, _Channel, _N]
    ->  true
    ;   Message = [message, Channel, Data]
    ->  catch(broadcast(redis(Redis, Channel, Data)),
              Error,
              print_message(error, Error))
    ;   assertion((Message = [Type, OtherChannel, _Data],
                   atom(Type),
                   atom(OtherChannel))),
        debug(redis(warning), '~p: Unknown message while listening: ~p',
              [Redis,Message])
    ).
1316
1317
1318 1321
1336
%   redis_read_stream(+Redis, +Stream, -Out)
%
%   Read one reply from Stream.
%
%     - If reading raises an error(_, _) exception the connection is
%       closed forcefully and the exception is passed on.
%     - Otherwise push messages that arrived with the reply are
%       dispatched.  If the reply is an error reported by the
%       server, the stream is re-synchronised and the error is
%       thrown.

redis_read_stream(Redis, Stream, Out) :-
    Caught = error(Formal,_),
    catch(redis_read_msg(Stream, Out, Value, ServerError, Pushed),
          Caught, true),
    (   nonvar(Formal)
    ->  redis_disconnect(Redis, [force(true)]),
        throw(Caught)
    ;   handle_push_messages(Pushed, Redis),
        (   nonvar(ServerError)
        ->  resync(Redis),
            throw(ServerError)
        ;   Out = Value
        )
    ).
1350
%   handle_push_messages(+Messages, +Redis)
%
%   Dispatch each push message.  A message that is not understood
%   is skipped and an exception raised while handling one is
%   printed as a warning, so processing always reaches the end of
%   the list.

handle_push_messages([], _).
handle_push_messages([Message|Rest], Redis) :-
    ignore(catch(handle_push_message(Message, Redis),
                 Error,
                 print_message(warning, Error))),
    handle_push_messages(Rest, Redis).

%   handle_push_message(+Message, +Redis)
%
%   Push messages are broadcasted if they start with the string
%   "pubsub" (which is stripped) or directly with the atom
%   `message`.

handle_push_message(["pubsub"|Payload], Redis) :-
    redis_broadcast(Redis, Payload).
handle_push_message([message|Payload], Redis) :-
    redis_broadcast(Redis, [message|Payload]).
1367
1368
1375
%   resync(+Redis)
%
%   Try to get the connection back in a known state after an error
%   reply.  If that raises an error(_, _) exception the connection
%   is closed forcefully instead.

resync(Redis) :-
    Caught = error(Formal,_),
    catch(do_resync(Redis), Caught, true),
    (   nonvar(Formal)
    ->  redis_disconnect(Redis, [force(true)])
    ;   true
    ).

%   do_resync(+Redis)
%
%   Send an ECHO of a random number and let '$redis_resync'/2
%   process the stream with that number.  This is given 0.2
%   seconds; running out of time is turned into an error(_, _)
%   exception so that resync/1 catches it.

do_resync(Redis) :-
    Marker is random(1_000_000_000),
    redis_stream(Redis, Stream, true),
    redis_write_msg(Stream, echo(Marker)),
    catch(call_with_time_limit(0.2, '$redis_resync'(Stream, Marker)),
          time_limit_exceeded,
          throw(error(time_limit_exceeded,_))).
1391
1392
1402
1403
1404
1405 1408
1409:- multifile
1410 prolog:error_message//1,
1411 prolog:message//1. 1412
% Message for the formal error term redis_error(Code, String), where
% String is printed using ~s.
prolog:error_message(redis_error(ErrorCode, Text)) -->
    [ 'REDIS: ~w: ~s'-[ErrorCode, Text] ].
1415
1416prolog:message(redis(retry(_Redis, _Failures, Wait, Error))) -->
1417 [ 'REDIS: connection error. Retrying in ~2f seconds'-[Wait], nl ],
1418 [ ' '-[] ], '$messages':translate_message(Error)