36
37:- module(swish_redis,
38 [ reinit_redis/0,
39 redis_swish_stream/2, 40 redis_consumer/1, 41 swish_cluster/1 42 ]). 43:- use_module(library(redis)). 44:- use_module(library(redis_streams)). 45:- use_module(library(broadcast)). 46:- use_module(library(option)). 47:- use_module(library(socket)). 48:- use_module(library(apply)). 49:- use_module(library(pairs)). 50:- use_module(library(http/http_path)). 51:- use_module(library(http/http_dispatch)). 52:- use_module(library(http/http_json)). 53
54:- use_module(config).
70:- multifile
71 stream/2. 72
73:- listen(http(pre_server_start(Port)),
74 init_redis(Port)). 75
76:- dynamic
77 port/1, 78 thread/1. 79
80init_redis(_Port) :-
81 \+ swish_config:config(redis, _),
82 !.
83init_redis(_Port) :-
84 catch(thread_property(redis_listener, id(_)), error(_,_), fail),
85 !.
86init_redis(Port) :-
87 init_pubsub,
88 retractall(port(_)),
89 asserta(port(Port)),
90 findall(Group-S, group_stream(S, Group), Pairs),
91 keysort(Pairs, Sorted),
92 group_pairs_by_key(Sorted, Grouped),
93 consumer(Port, Consumer),
94 maplist(create_listener(Consumer), Grouped),
95 publish_consumer(Consumer).
96
97create_listener(_, (-)-Streams) :-
98 !,
99 thread_create(xlisten(swish, Streams, []),
100 Id, [ alias(redis_no_group)
101 ]),
102 assertz(thread(Id)).
103create_listener(Consumer, Group-Streams) :-
104 atom_concat(redis_, Group, Alias),
105 thread_create(xlisten_group(swish, Group, Consumer, Streams,
106 [ block(1)
107 ]),
108 Id, [ alias(Alias)
109 ]),
110 assertz(thread(Id)).
117reinit_redis :-
118 forall(retract(thread(Id)),
119 catch(stop_listener(Id), error(_,_), true)),
120 port(Port),
121 init_redis(Port).
122
123stop_listener(Id) :-
124 thread_signal(Id, redis(stop(false))),
125 thread_join(Id, _).
126
127group_stream(Key, Group) :-
128 stream(Name, Options),
129 redis_swish_stream(Name, Key),
130 option(max_len(MaxLen), Options, 1000),
131 option(group(Group), Options, -),
132 add_consumer_group(Group, Key),
133 xstream_set(swish, Key, maxlen(MaxLen)).
134
135add_consumer_group(-, _) :-
136 !.
137add_consumer_group(Group, Key) :-
138 catch(redis(swish, xgroup(create, Key, Group, $, mkstream), _),
139 error(redis_error(busygroup,_),_),
140 true).
141
142redis_swish_stream(Name, Key) :-
143 swish_config(redis_prefix, Prefix),
144 atomic_list_concat([Prefix, Name], :, Key).
152:- dynamic consumer/1. 153
154consumer(_, Consumer) :-
155 consumer(Consumer0), !,
156 Consumer = Consumer0.
157consumer(Address, Consumer) :-
158 address_consumer(Address, Consumer0),
159 asserta(consumer(Consumer0)),
160 Consumer = Consumer0.
161
162address_consumer(_, Consumer) :-
163 swish_config(redis_consumer, Consumer),
164 !.
165address_consumer(Host:Port, Consumer) :-
166 !,
167 atomic_list_concat([Host,Port], :, Consumer).
168address_consumer(Port, Consumer) :-
169 gethostname(Host),
170 atomic_list_concat([Host,Port], :, Consumer).
176redis_consumer(Consumer) :-
177 consumer(Consumer).
178
179publish_consumer(Consumer) :-
180 http_absolute_uri(swish(.), URL),
181 consumer_key(Server, Key),
182 redis(Server, hset(Key:url, Consumer, URL)),
183 redis(Server, publish(swish:swish, joined(Consumer, URL) as prolog), Count),
184 print_message(informational, swish(redis_peers(Count))),
185 at_halt(publish_halt).
186
188:- listen(http(shutdown), publish_halt). 189
190publish_halt :-
191 redis_consumer(Consumer),
192 consumer_key(Server, Key),
193 ( redis(Server, hdel(Key:url, Consumer), 0)
194 -> true
195 ; redis(Server, publish(swish:swish, left(Consumer) as prolog), _Count)
196 ).
197
198consumer_key(swish, Key) :-
199 swish_config(redis_prefix, Prefix),
200 atomic_list_concat([Prefix, consumer], :, Key).
207swish_cluster(Pairs) :-
208 consumer_key(Server, Key),
209 redis(Server, hgetall(Key:url), Pairs).
210
211:- http_handler(swish(backends), backends, [id(backends)]). 212
213backends(_Request) :-
214 swish_cluster(Pairs),
215 maplist(backend_stats, Pairs, Pairs1),
216 dict_pairs(Dict, json, Pairs1),
217 reply_json(Dict).
218
219backend_stats(Consumer-URL, Consumer-Stat) :-
220 broadcast_request(swish(backend_status(Consumer, Stat0))),
221 !,
222 Stat = Stat0.put(url, URL).
223backend_stats(Consumer-URL, Consumer-json{url:URL}).
230init_pubsub :-
231 redis_current_subscription(redis_pubsub, _),
232 !.
233init_pubsub :-
234 redis_subscribe(swish,
235 [ swish:swish, 236 swish:chat, 237 swish:gitty 238 ],
239 _,
240 [ alias(redis_pubsub)
241 ]).
242
243:- initialization
244 listen(redis(_, 'swish:swish', Message),
245 swish_message(swish(Message))). 246
247swish_message(Message) :-
248 print_message(informational, Message).
249
250:- multifile prolog:message//1. 251
252prolog:message(swish(redis_peers(Count))) -->
253 [ 'Redis: the are ~d peers in the cluster'-[Count] ].
254prolog:message(swish(joined(Consumer, URL))) -->
255 ( { redis_consumer(Consumer) }
256 -> []
257 ; [ 'Redis: ~w joined the cluster, at ~w'-[Consumer, URL] ]
258 ).
259prolog:message(swish(left(Consumer))) -->
260 ( { redis_consumer(Consumer) }
261 -> []
262 ; [ 'Redis: ~w left the cluster'-[Consumer] ]
263 )
Redis stream connection
Setup to listening to redis events. We need all the push facilities of Redis:
XREAD
Note that config-available sets up the redis server using the alias
swish
. Streams (redis keys) to listen on are registered using the multifile predicate stream/2. */