36
37:- module(chat_store,
38 [ chat_store/1, 39 chat_messages/3 40 ]). 41:- use_module(library(settings)). 42:- use_module(library(filesex)). 43:- use_module(library(option)). 44:- use_module(library(sha)). 45:- use_module(library(apply)). 46:- use_module(library(http/http_dispatch)). 47:- use_module(library(http/http_parameters)). 48:- use_module(library(http/http_json)). 49
50:- use_module(config). 51
% HTTP end points below the `swish` root: one to fetch stored messages
% and one to query message counts.
:- http_handler(swish(chat/messages),
                chat_messages,
                [ id(chat_messages) ]).
:- http_handler(swish(chat/status),
                chat_status,
                [ id(chat_status) ]).

% Directory used by the file-based store.
:- setting(directory, callable, data(chat),
           'The directory for storing chat messages.').

% Hook that exposes the number of chat messages stored for a document.
:- multifile swish_config:chat_count_about/2.

% Open the store when the http(pre_server_start) event is broadcasted.
:- listen(http(pre_server_start), open_chatstore).
74
%!  redis_docid_key(+DocId, -Server, -Key)
%
%   True when Server is the configured Redis server and Key is the key
%   under which the chat for DocId is kept. Key has the shape
%   `Prefix:chat:docid:DocId`, where Prefix is the `redis_prefix`
%   configuration value. Fails if swish_config/2 has no `redis` or no
%   `redis_prefix` value.

redis_docid_key(DocId, Server, Key) :-
    swish_config(redis, Server),
    swish_config(redis_prefix, KeyPrefix),
    atomic_list_concat([KeyPrefix, chat, docid, DocId], ':', Key).
79
%!  uses_redis
%
%   True when swish_config/2 defines a `redis` server.

uses_redis :- swish_config(redis, _Server).
82
:- dynamic  storage_dir/1.
:- volatile storage_dir/1.

%!  open_chatstore
%
%   Make sure the store can be used. Nothing is done when Redis is
%   configured or when storage_dir/1 is already known. Otherwise the
%   storage directory is established while holding the `chat_store`
%   mutex. Fails if open_chatstore_guarded/0 cannot establish a
%   directory.

open_chatstore :-
    (   uses_redis
    ->  true
    ;   storage_dir(_)                  % cheap test without locking
    ->  true
    ;   with_mutex(chat_store, open_chatstore_guarded)
    ).
94
%!  open_chatstore_guarded
%
%   Establish storage_dir/1. Called by open_chatstore/0 while holding
%   the `chat_store` mutex. The clauses are tried in this order:
%
%     1. storage_dir/1 is already known (re-tested under the lock).
%     2. The `directory` setting resolves to an existing, writable
%        directory.
%     3. One of the locations the setting may resolve to does not exist
%        yet and can be created. A permission error while creating a
%        candidate makes us try the next one; other errors propagate.
%
%   Fails if none of the above applies.

open_chatstore_guarded :-
    storage_dir(_),
    !.
open_chatstore_guarded :-
    setting(directory, DirSpec),
    absolute_file_name(DirSpec, Existing,
                       [ file_type(directory),
                         access(write),
                         file_errors(fail)
                       ]),
    !,
    asserta(storage_dir(Existing)).
open_chatstore_guarded :-
    setting(directory, DirSpec),
    absolute_file_name(DirSpec, Candidate, [solutions(all)]),
    \+ exists_directory(Candidate),
    catch(make_directory(Candidate),
          error(permission_error(create, directory, Candidate), _),
          fail),
    !,
    asserta(storage_dir(Candidate)).
116
120
%!  chat_dir_file(+DocID, -Path, -File)
%
%   Path is the directory and File the file that holds the chat for
%   DocID in the file-based store. The hexadecimal SHA hash of DocID is
%   split into two two-character directory levels below the storage
%   directory; the remaining characters form the file name. Fails if
%   the store cannot be opened or no storage directory is known.

chat_dir_file(DocID, Path, File) :-
    open_chatstore,
    sha_hash(DocID, HashBytes, []),
    hash_atom(HashBytes, Hex),
    sub_atom(Hex, 0, 2, _, Level1),
    sub_atom(Hex, 2, 2, _, Level2),
    sub_atom(Hex, 4, _, 0, Base),       % everything after the two levels
    storage_dir(Root),
    atomic_list_concat([Root, Level1, Level2], '/', Path),
    atomic_list_concat([Path, Base], '/', File).
131
136
%!  existing_chat_file(+DocID, -File)
%
%   True when File is the chat file for DocID and this file exists.

existing_chat_file(DocID, File) :-
    chat_dir_file(DocID, _Dir, File),
    exists_file(File).
140
147
%!  chat_store(+Message)
%
%   Add Message to the chat of the document it refers to. If Message
%   holds `create:false`, it is only stored when the chat already
%   exists (the Redis key or the chat file). Succeeds without storing
%   anything if the message cannot be prepared or may not be stored.
%
%   With Redis, the message is added to a sorted set using its time in
%   milliseconds as score. Otherwise it is appended as a quoted term to
%   the chat file and the cached message count is incremented, both
%   while holding the `chat_store` mutex.

chat_store(Message0) :-
    uses_redis,
    !,
    (   chat_redis_target(Message0, Server, Key, Message)
    ->  Score is integer(Message.time*1000),
        redis(Server, zadd(Key, nx, Score, Message as prolog))
    ;   true
    ).
chat_store(Message0) :-
    prepare_message(Message0, DocID, Create, Message),
    chat_dir_file(DocID, Dir, File),
    chat_file_accepts(Create, File),
    !,
    make_directory_path(Dir),
    with_mutex(chat_store,
               ( chat_append_term(File, Message),
                 increment_message_count(DocID)
               )).
chat_store(_).

%   chat_redis_target(+Message0, -Server, -Key, -Message)
%
%   Message is the prepared message that must be stored under Key on
%   Server. Fails if a `create:false` message refers to a missing key.

chat_redis_target(Message0, Server, Key, Message) :-
    prepare_message(Message0, DocID, Create, Message),
    redis_docid_key(DocID, Server, Key),
    (   Create == false
    ->  redis(Server, exists(Key), 1)
    ;   true
    ).

%   chat_file_accepts(+Create, +File)
%
%   True when a message may be appended to File: File must exist if
%   Create is `false`.

chat_file_accepts(Create, File) :-
    (   Create == false
    ->  exists_file(File)
    ;   true
    ).

%   chat_append_term(+File, +Message)
%
%   Append Message as a quoted, full-stop terminated term to File.

chat_append_term(File, Message) :-
    setup_call_cleanup(
        open(File, append, Out, [encoding(utf8)]),
        format(Out, '~q.~n', [Message]),
        close(Out)).
178
%!  prepare_message(+Message0, -DocID, -Create, -Message)
%
%   Extract the storage details from the `chat` dict Message0. DocID is
%   the `docid` of the message as an atom. Create is `false` if
%   Message0 holds `create:false` and `true` otherwise. Message is
%   Message0 without a `create:false` pair and without the `wsid` of
%   its user (see strip_chat/2). Fails if Message0 is not a `chat` dict
%   holding a `docid`.

prepare_message(Message0, DocID, Create, Message) :-
    chat{docid:DocIdString} :< Message0,
    atom_string(DocID, DocIdString),
    take_create_flag(Message0, Create, Message1),
    strip_chat(Message1, Message).

%   take_create_flag(+Message0, -Create, -Message)
%
%   Remove `create:false` from Message0 if present. Any other message
%   is passed unmodified with Create set to `true`.

take_create_flag(Message0, Create, Message) :-
    del_dict(create, Message0, false, Message),
    !,
    Create = false.
take_create_flag(Message, true, Message).
188
189
190
195
%!  strip_chat(+Message0, -Message)
%
%   Message is Message0 with its `user` replaced by the result of
%   strip_chat_user/2. A message without `user` is passed unmodified.

strip_chat(Message0, Message) :-
    (   strip_chat_user(Message0.get(user), StrippedUser)
    ->  Message = Message0.put(user, StrippedUser)
    ;   Message = Message0
    ).
201
%!  strip_chat_user(+User0, -User)
%
%   User is the dict User0 without its `wsid` key. A dict that has no
%   `wsid` is passed unmodified.

strip_chat_user(User0, User) :-
    (   del_dict(wsid, User0, _, User)
    ->  true
    ;   User = User0
    ).
206
207
216
%!  chat_messages(+DocID, -Messages, +Options)
%
%   Messages is the list of chat messages stored for DocID, in storage
%   order. Redis is used when redis_docid_key/3 succeeds, the file
%   store otherwise. Options:
%
%     - max(+Max)
%       Only return the Max most recent messages. `max(0)` yields the
%       empty list.
%     - after(+Time)
%       Only return messages with a time stamp later than Time.

chat_messages(_DocID, Messages, Options) :-
    option(max(Max), Options),
    Max == 0,
    !,
    % Without this guard max(0) returns *all* messages: in Redis
    % `Start is -Max` gives 0 and zrange(Key, 0, -1) selects the whole
    % set, and the file reader cannot find the 0-th line from the end
    % and falls back to reading the whole file.
    Messages = [].
chat_messages(DocID, Messages, Options) :-
    redis_docid_key(DocID, Server, Key),
    !,
    (   option(max(Max), Options)
    ->  Start is -Max,                  % negative index: count from the end
        redis(Server, zrange(Key, Start, -1), Messages0),
        filter_old(Messages0, Messages, Options)
    ;   option(after(Time), Options)
    ->  Score is integer(Time*1000)+1,  % scores are times in milliseconds
        redis(Server, zrangebyscore(Key, Score, +inf), Messages)
    ;   redis(Server, zrange(Key, 0, -1), Messages)
    ).
chat_messages(DocID, Messages, Options) :-
    chat_messages_from_files(DocID, Messages, Options).
231
%!  chat_messages_from_files(+DocID, -Messages, +Options)
%
%   Messages is the list of messages read from the chat file of DocID,
%   restricted by Options as implemented by read_messages/3 and
%   filter_old/3. Messages is `[]` if DocID has no chat file.

chat_messages_from_files(DocID, Messages, Options) :-
    existing_chat_file(DocID, File),
    !,
    read_messages(File, Messages0, Options),
    filter_old(Messages0, Messages, Options).
chat_messages_from_files(_DocID, [], _Options).
238
%!  read_messages(+File, -Messages, +Options)
%
%   Messages is the list of terms read from the UTF-8 encoded File.
%   Options is passed to read_messages_from_stream/3. The file is
%   closed, also if reading raises an exception.

read_messages(File, Messages, Options) :-
    setup_call_cleanup(
        open(File, read, Stream, [encoding(utf8)]),
        read_messages_from_stream(Stream, Messages, Options),
        close(Stream)).
244
%!  read_messages_from_stream(+In, -Messages, +Options)
%
%   Messages is the list of terms read from In. If the option max(Max)
%   (default 25) is an integer and the stream can be positioned at the
%   start of the Max-th line from the end, only these last lines are
%   read. Otherwise all terms are read from the start of the stream.

read_messages_from_stream(In, Messages, Options) :-
    (   option(max(Max), Options, 25),
        integer(Max),
        seek_last_lines(In, Max)
    ->  true
    ;   seek(In, 0, bof, _)
    ),
    read_terms(In, Messages).

%   seek_last_lines(+In, +Max)
%
%   Position In at the start of the Max-th line from the end. Lines are
%   located using the `octet` encoding; the stream is switched back to
%   `utf8` on success as well as on failure.

seek_last_lines(In, Max) :-
    setup_call_cleanup(
        set_stream(In, encoding(octet)),
        ( seek(In, 0, eof, _),
          backskip_lines(In, Max)
        ),
        set_stream(In, encoding(utf8))).
259
%!  read_terms(+In, -Terms)
%
%   Terms is the list of all terms that can be read from the current
%   position of In up to the end of the stream.

read_terms(In, Terms) :-
    read_term(In, Term, []),
    read_terms_(Term, In, Terms).

%   Use ==/2 rather than head unification: a variable read from the
%   stream must not be mistaken for the end of the input.
read_terms_(Term, _In, Terms) :-
    Term == end_of_file,
    !,
    Terms = [].
read_terms_(Term, In, [Term|Rest]) :-
    read_terms(In, Rest).
267
%!  backskip_lines(+Stream, +Lines)
%
%   Position Stream at the start of the Lines-th line counted from the
%   current position backwards. The current position is expected to be
%   at the end of a line; read_messages_from_stream/3 calls this at the
%   end of the file.
%
%   The lines are located by scanning a window of 2^10 bytes before the
%   current position. The window is doubled, up to 2^20 bytes, until it
%   holds enough lines. Fails, leaving the stream position undefined,
%   if fewer than Lines lines are found.

backskip_lines(Stream, Lines) :-
    byte_count(Stream, End),
    between(10, 20, Shift),
    Start is max(0, End-(1<<Shift)),
    seek(Stream, Start, bof, _),
    (   Start =:= 0
    ->  true                    % start of file is the start of a line
    ;   skip(Stream, 0'\n)      % drop the (possibly partial) first line
    ),
    line_starts(Stream, End, Starts),
    reverse(Starts, RevStarts),
    (   nth1(Lines, RevStarts, LineStart)
    ->  true
    ;   Start > 0               % a larger window may hold more lines
    ->  fail
    ;   !,                      % whole file scanned: retrying with a
        fail                    % larger window cannot find more lines
    ),
    !,
    seek(Stream, LineStart, bof, _).
279
%!  line_starts(+Stream, +To, -Starts)
%
%   Starts is the list of byte offsets at which a line starts, from the
%   current position of Stream up to, but not including, the byte
%   offset To. The current position is considered the start of a line.

line_starts(Stream, To, Starts) :-
    byte_count(Stream, Offset),
    (   Offset < To
    ->  Starts = [Offset|More],
        skip(Stream, 0'\n),
        line_starts(Stream, To, More)
    ;   Starts = []
    ).
288
%!  filter_old(+Messages0, -Messages, +Options)
%
%   If Options holds after(After) with After > 0, Messages holds the
%   members of Messages0 that satisfy after/2 for After. Otherwise
%   Messages is Messages0.

filter_old(Messages0, Messages, Options) :-
    (   option(after(After), Options),
        After > 0
    ->  include(after(After), Messages0, Messages)
    ;   Messages = Messages0
    ).
295
%!  after(+After, +Message)
%
%   True when Message is a dict holding a `time` that is later than
%   After. Fails for non-dicts and for dicts without `time`.

after(After, Message) :-
    is_dict(Message),
    get_dict(time, Message, Time),
    Time > After.
299
304
:- dynamic  message_count/2.
:- volatile message_count/2.

%!  chat_message_count(+DocID, -Count)
%
%   Count is the number of chat messages stored for DocID. With Redis,
%   the sorted set is counted. For the file store the count is kept in
%   message_count/2, which is filled by counting the lines of the chat
%   file on first use.

chat_message_count(DocID, Count) :-
    redis_docid_key(DocID, Server, Key),
    !,
    redis(Server, zcount(Key, 0, +inf), Count).
chat_message_count(DocID, Count) :-
    message_count(DocID, Count),
    !.
chat_message_count(DocID, Count) :-
    % Count and cache while holding the mutex that chat_store/1 holds
    % when it appends a message and increments the cached count. If a
    % message is appended between counting and asserting, the increment
    % finds no cached clause and is lost, after which a stale count is
    % cached.
    with_mutex(chat_store, cache_message_count(DocID, Count)).

%   cache_message_count(+DocID, -Count)
%
%   Fill message_count/2 for DocID, unless another thread did so while
%   we were waiting for the mutex.

cache_message_count(DocID, Count) :-
    message_count(DocID, Cached),
    !,
    Count = Cached.
cache_message_count(DocID, Count) :-
    count_messages(DocID, Count),
    asserta(message_count(DocID, Count)).
318
%!  count_messages(+DocID, -Count)
%
%   Count is the number of lines in the chat file of DocID, or 0 if
%   DocID has no chat file.

count_messages(DocID, Count) :-
    existing_chat_file(DocID, File),
    !,
    setup_call_cleanup(
        open(File, read, In, [encoding(iso_latin_1)]),
        ( skip(In, 256),        % ISO Latin 1 has no code 256: skips to EOF
          line_count(In, LineNo)
        ),
        close(In)),
    Count is LineNo - 1.        % line numbers start at 1
count_messages(_DocID, 0).
330
%!  increment_message_count(+DocID)
%
%   Add one to the cached message count for DocID. Does nothing if no
%   count is cached for DocID. The new count is asserted before the old
%   clause is erased.

increment_message_count(DocID) :-
    (   clause(message_count(DocID, Old), _, OldRef)
    ->  New is Old+1,
        asserta(message_count(DocID, New)),
        erase(OldRef)
    ;   true
    ).
338
342
%!  swish_config:chat_count_about(+DocID, -Count)
%
%   Hook implementation: Count is the number of chat messages stored
%   for DocID as computed by chat_message_count/2.

swish_config:chat_count_about(DocID, NMessages) :-
    chat_message_count(DocID, NMessages).
345
346
347 350
354
%!  chat_messages(+Request)
%
%   HTTP handler that replies with the chat messages for a document as
%   JSON. Parameters:
%
%     - docid
%       Document for which to return the messages (required).
%     - max
%       Optional non-negative integer, passed as max(Max).
%     - after
%       Optional number, passed as after(After).

chat_messages(Request) :-
    http_parameters(Request,
                    [ docid(DocID, []),
                      max(Max, [nonneg, optional(true)]),
                      after(After, [number, optional(true)])
                    ]),
    Candidates = [max(Max), after(After)],
    include(ground, Candidates, Options),   % drop parameters not supplied
    chat_messages(DocID, Messages, Options),
    reply_json_dict(Messages).
364
368
369chat_status(Request) :-
370 http_parameters(Request,
371 [ docid(DocID, []),
372 max(Max, [nonneg, optional(true)]),
373 after(After, [number, optional(true)])
374 ]),
375 include(ground, [max(Max), after(After)], Options),
376 chat_message_count(DocID, Total),
377 ( Options == []
378 -> Count = Total
379 ; chat_messages(DocID, Messages, Options),
380 length(Messages, Count)
381 ),
382 reply_json_dict(
383 json{docid: DocID,
384 total: Total,
385 count: Count
386 })