36
37:- module(swish_diagnostics,
38 [ pengine_stale_module/1, 39 pengine_stale_module/2, 40 stale_pengine/1, 41 swish_statistics/1, 42 start_swish_stat_collector/0,
43 swish_stats/2, 44 swish_save_stats/1, 45 swish_died_thread/2, 46 redis_consumer_status/2, 47 swish_cluster_member/2
48 ]). 49:- use_module(library(pengines)). 50:- use_module(library(broadcast)). 51:- use_module(library(lists)). 52:- use_module(library(apply)). 53:- use_module(library(debug)). 54:- use_module(library(aggregate)). 55:- use_module(library(settings)). 56:- use_module(procps). 57:- use_module(highlight). 58:- if(exists_source(library(mallocinfo))). 59:- use_module(library(mallocinfo)). 60:- export(malloc_info/1). 61:- endif. 62:- use_module(swish_redis). 63:- use_module(config). 64
% Persistence of the sliding statistics: target file and save period.
:- setting(stats_file, callable, data('stats.db'),
           "Save statistics to achieve a long term view").
:- setting(stats_interval, integer, 300,
           "Save stats every N seconds").

% must_succeed/1 calls its argument as a goal.
:- meta_predicate
    must_succeed(0).
%!  redis_key(+What, -Server, -Key) is semidet.
%
%   Key for What that belongs to this instance's redis consumer.

redis_key(What, Server, Key) :-
    redis_consumer(Self),
    redis_key(Self, What, Server, Key).

%!  redis_key(+Consumer, +What, -Server, -Key) is semidet.
%
%   Key is the atom Prefix:What:Consumer on the configured redis Server.

redis_key(Consumer, What, Server, Key) :-
    swish_config(redis, Server),
    swish_config(redis_prefix, Prefix),
    Parts = [Prefix, What, Consumer],
    atomic_list_concat(Parts, :, Key).
81
%!  wait_redis_key(-Server, -Key) is semidet.
%
%   Try redis_key(stat, Server, Key) up to 10 times.  After each failed
%   attempt, sleep 0.2, 0.4, 0.8, ... seconds (doubling every time).
%   Fail if all attempts fail.

wait_redis_key(Server, Key) :-
    between(1, 10, Attempt),
    (   redis_key(stat, Server, Key)
    ->  true
    ;   Delay is 0.1 * (1 << Attempt),
        sleep(Delay),
        fail
    ),
    !.
90
%!  use_redis is semidet.
%
%   True when a redis server is configured for SWISH.

use_redis :-
    swish_config(redis, _).

%!  redis_publish_stats(+Time, +Stat) is det.
%
%   When Time is a multiple of 10 and redis is in use, store Stat
%   (extended with time:Time) under this instance's `status` key.
%   Always succeeds.  Storage errors and failures are reported by
%   must_succeed/1.

redis_publish_stats(Time, Stat) :-
    0 =:= Time mod 10,
    use_redis,
    redis_key(status, Server, StatusKey),
    !,
    must_succeed(redis(Server,
                       set(StatusKey, Stat.put(time, Time) as prolog))).
redis_publish_stats(_, _).
101
106
% Answer swish(backend_status(Consumer, Stat)) broadcast requests.
:- listen(swish(backend_status(Consumer, Stat)),
          redis_consumer_status(Consumer, Stat)).

%!  redis_consumer_status(+Consumer, -Stat) is semidet.
%
%   Stat is the value held in redis under Consumer's `status` key.

redis_consumer_status(Consumer, Stat) :-
    redis_key(Consumer, status, Server, StatusKey),
    redis(Server, get(StatusKey), Stat).
113
115
%!  swish_cluster_member(?Consumer, -Status) is nondet.
%
%   Enumerate cluster members with their redis status dict, extended
%   with url:URL taken from swish_cluster/1.

swish_cluster_member(Consumer, Status) :-
    swish_cluster(Members),
    member(Consumer-URL, Members),
    redis_consumer_status(Consumer, Stored),
    put_dict(url, Stored, URL, Status).
121
122
126
%!  stale_pengine(-Pengine) is nondet.
%
%   Pengine has a thread whose status is not `running`.  This includes
%   threads that no longer exist, because errors from
%   thread_property/2 are mapped to failure.

stale_pengine(Pengine) :-
    pengine_property(Pengine, thread(Tid)),
    \+ catch(thread_property(Tid, status(running)), _Error, fail).
130
131
138
%!  pengine_stale_module(-M) is nondet.
%
%   M is a UUID-named module that has no running pengine and no
%   highlight state.

pengine_stale_module(M) :-
    current_module(M),
    is_uuid(M),
    \+ (   live_module(M)
       ;   current_highlight_state(M, _)
       ).

%!  pengine_stale_module(-M, -State) is nondet.
%
%   As pengine_stale_module/1; State is a `stale` dict describing M.

pengine_stale_module(M, State) :-
    pengine_stale_module(M),
    stale_module_state(M, State).
148
%!  live_module(+M) is nondet.
%
%   Some pengine using module M has a thread with status `running`.

live_module(Module) :-
    pengine_property(P, module(Module)),
    pengine_property(P, thread(Tid)),
    catch(thread_property(Tid, status(running)), _Error, fail).

%!  stale_module_state(+M, -State) is det.
%
%   State is a dict tagged `stale` built from all
%   stale_module_property/3 solutions for M.

stale_module_state(Module, State) :-
    findall(Name-Value,
            stale_module_property(Module, Name, Value),
            Pairs),
    dict_create(State, stale, Pairs).
157
%!  stale_module_property(+Module, ?Property, -Value) is nondet.
%
%   Enumerate diagnostic properties of Module.  These cover the
%   pengine(s) whose module is Module, their queues, thread and thread
%   status.  They also cover the module's class, program space and
%   size, its predicates and its highlight state.
%
%   NOTE(review): pengines-internal predicates are reached through
%   member/2 + call/1 so that they are only called dynamically;
%   presumably this avoids compile-time/xref warnings.  Keep the trick.

stale_module_property(Module, pengine, P) :-
    pengine_property(P, module(Module)).
stale_module_property(Module, pengine_queue, Q) :-
    pengine_property(P, module(Module)),
    member(Goal, [pengines:pengine_queue(P, Q, _TimeOut, _Time)]),
    call(Goal).
stale_module_property(Module, pengine_pending_queue, Q) :-
    pengine_property(P, module(Module)),
    member(Goal, [pengines:output_queue(P, Q, _Time)]),
    call(Goal).
stale_module_property(Module, thread, Tid) :-
    pengine_property(P, module(Module)),
    member(Goal, [pengines:pengine_property(P, thread(Tid))]),
    call(Goal).
stale_module_property(Module, thread_status, ThreadStatus) :-
    pengine_property(P, module(Module)),
    pengine_property(P, thread(Tid)),
    catch(thread_property(Tid, status(ThreadStatus)), _, fail).
stale_module_property(Module, module_class, Class) :-
    module_property(Module, class(Class)).
stale_module_property(Module, program_space, Space) :-
    module_property(Module, program_space(Space)).
stale_module_property(Module, program_size, Size) :-
    module_property(Module, program_size(Size)).
stale_module_property(Module, predicates, PIs) :-
    current_module(Module),
    findall(PI, pi_in_module(Module, PI), PIs).
stale_module_property(Module, highlight_state, HState) :-
    current_highlight_state(Module, HState).

%!  pi_in_module(+Module, -PI) is nondet.
%
%   PI (Name/Arity) is a predicate found in Module by the internal
%   '$c_current_predicate'/2.

pi_in_module(Module, Name/Arity) :-
    '$c_current_predicate'(_, Module:Head),
    functor(Head, Name, Arity).
191
195
%!  swish_statistics(?Statistic) is nondet.
%
%   Enumerate simple SWISH counters: the number of highlight states,
%   local pengines (those with a thread), remote pengines, and pengines
%   created since startup.  flag/3 yields 0 for a key that was never
%   set, so pengines_created needs no fallback.

swish_statistics(highlight_states(N)) :-
    aggregate_all(count, current_highlight_state(_, _), N).
swish_statistics(pengines(N)) :-
    aggregate_all(count, pengine_property(_, thread(_)), N).
swish_statistics(remote_pengines(N)) :-
    aggregate_all(count, pengine_property(_, remote(_)), N).
swish_statistics(pengines_created(N)) :-
    flag(pengines_created, Current, Current),
    N = Current.
207
% Track pengine life-cycle broadcasts.
:- listen(pengine(Action), swish_update_stats(Action)).

%!  swish_update_stats(+Action) is semidet.
%
%   Increment the `pengines_created` flag on create/3 actions.
%   send/2 actions are accepted without effect; other actions fail.

swish_update_stats(create(_Pengine, _Application, _Options)) :-
    flag(pengines_created, N, N+1).
swish_update_stats(send(_Pengine, _Event)).
213
214
218
%!  is_uuid(@Term) is semidet.
%
%   True when Term is an atom laid out as a textual UUID: 36
%   characters, with '-' at offsets 8, 13, 18 and 23 and a hexadecimal
%   digit at every other offset.

is_uuid(M) :-
    atom(M),
    atom_length(M, 36),
    forall(sub_atom(M, S, 1, _, C),
           uuid_code(S, C)).

%   uuid_sep(?Offset): 0-based offsets of the '-' separators.

uuid_sep(8).
uuid_sep(13).
uuid_sep(18).
uuid_sep(23).

%   uuid_code(+Offset, +Char) is semidet.
%
%   A separator offset must hold '-'; any other offset must hold a hex
%   digit.  The separator check must be exclusive.  Otherwise a string
%   of 36 hex digits without dashes would be accepted as a UUID.

uuid_code(S, C) :-
    (   uuid_sep(S)
    ->  C == (-)
    ;   char_type(C, xdigit(_))
    ).
232
233 236
% If http_unix_daemon:http_daemon/0 is defined at load time, start the
% collector on the http(post_server_start) broadcast; otherwise start
% it as an initialization goal.
:- if(current_predicate(http_unix_daemon:http_daemon/0)).
:- use_module(library(broadcast)).
:- listen(http(post_server_start), start_swish_stat_collector).
:- else.
:- initialization
    start_swish_stat_collector.
:- endif.
249
%!  start_swish_stat_collector is det.
%
%   Start the `swish_stats` collector thread unless a thread with that
%   alias exists, sampling every second.  Also register
%   swish_save_stats/1 to run at halt.  Ring names follow stats_ring/2;
%   N/A means N slots whose last A entries are averaged into the next
%   ring.

start_swish_stat_collector :-
    thread_property(_, alias(swish_stats)),
    !.
start_swish_stat_collector :-
    persistent_stats(Persists),
    Dims = [ 60,            % minute
             60/10,         % hour
             24*6/6,        % day
             7*24/24,       % week
             52             % year
           ],
    swish_stat_collector(swish_stats, Dims, 1, Persists),
    at_halt(swish_save_stats(_)).
267
%!  swish_stat_collector(?NameOrThread, +Dims, +Interval, +Persists) is det.
%
%   Create a thread running stat_collect/3.  If the first argument is
%   an atom, it becomes the thread alias; otherwise it is unified with
%   the new thread handle.

swish_stat_collector(Name, Dims, Interval, Persists) :-
    Collect = stat_collect(Dims, Interval, Persists),
    (   atom(Name)
    ->  thread_create(Collect, _, [alias(Name)])
    ;   thread_create(Collect, Name, [])
    ).
274
%!  persistent_stats(-Save) is det.
%
%   Save is save(Location, Interval), where Interval is the
%   `stats_interval` setting.  Location comes from stats_location/1.
%   If Interval is not positive or no Location is found, Save is
%   save(-, 0).

persistent_stats(save(Path, Interval)) :-
    setting(stats_interval, Interval),
    Interval > 0,
    stats_location(Path),
    !.
persistent_stats(save(-, 0)).

%   stats_location(-Location) is nondet.
%
%   With redis in use, Location is redis(Server, Key) for the `stat`
%   key.  Otherwise it is a writable file for the `stats_file`
%   setting.  If the spec does not resolve, create a missing directory
%   for the spec's alias and try again.

stats_location(Path) :-
    use_redis,
    !,
    redis_key(stat, Server, Key),
    Path = redis(Server, Key).
stats_location(Path) :-
    setting(stats_file, Spec),
    Options = [access(write), file_errors(fail)],
    (   absolute_file_name(Spec, Path, Options)
    ->  true
    ;   Spec =.. [Alias, _],
        DirSpec =.. [Alias, '.'],
        absolute_file_name(DirSpec, Dir, [solutions(all)]),
        \+ exists_directory(Dir),
        catch(make_directory(Dir),
              error(permission_error(create, directory, Dir), _),
              fail),
        absolute_file_name(Spec, Path, Options)
    ).
304
305
306
329
%!  swish_stats(+Period, -Stats) is semidet.
%
%   Stats is the list of samples held by the collector for Period
%   (minute, hour, day, week or year), most recent first.

swish_stats(Name, Stats) :-
    stats_ring(Name, Ring),
    swish_stats(swish_stats, Ring, Stats).

%   stats_ring(?Period, ?Index): ring index in the sliding_stats term.

stats_ring(minute, 1).
stats_ring(hour,   2).
stats_ring(day,    3).
stats_ring(week,   4).
stats_ring(year,   5).

%   swish_stats(+Collector, +Ring, -Stats)
%
%   Ask the Collector thread for the values of Ring and wait for the
%   reply.  If the send raises, stats_died/2 reports it, restarts the
%   collector and fails.

swish_stats(Collector, Ring, Stats) :-
    thread_self(Self),
    Request = Self-get_stats(Ring),
    catch(thread_send_message(Collector, Request),
          Error,
          stats_died(Collector, Error)),
    thread_get_message(get_stats(Ring, Stats)).
345
%!  stats_died(+Alias, +Error) is failure.
%
%   Report Error, join the collector thread and report its exit
%   status, restart the collector, then fail.

stats_died(Alias, Error) :-
    print_message(error, Error),
    thread_join(Alias, ExitStatus),
    print_message(error, swish_stats(died, ExitStatus)),
    start_swish_stat_collector,
    fail.
352
%!  stat_collect(+Dims, +Interval, +Persists)
%
%   Collector thread body.  Run stat_collect_/3; if it raises an
%   error(_,_) exception, print it and start over.

stat_collect(Dims, Interval, Persists) :-
    Error = error(_, _),
    catch_with_backtrace(stat_collect_(Dims, Interval, Persists),
                         Error,
                         print_message(error, Error)),
    stat_collect(Dims, Interval, Persists).

%   stat_collect_(+Dims, +Interval, +Persists)
%
%   Restore (or create) the sliding statistics and enter the sampling
%   loop with the current second as the first deadline.

stat_collect_(Dims, Interval, Persists) :-
    restart_sliding_stats(Persists, Dims, Sliding),
    get_time(Now),
    FirstDeadline is floor(Now),
    stat_loop(Sliding, _{}, FirstDeadline, Interval, Persists, [true]).
364
%!  stat_loop(+Sliding, +Prev, +Deadline, +Interval, +Persists, +Wrap)
%
%   Serve request messages until Deadline.  At the deadline, take a
%   sample, diff it against Prev, publish it to redis, push it into the
%   rings and save if due.  Then loop with the deadline moved forward
%   by Interval.

stat_loop(Sliding, Prev, Deadline, Interval, Persists, Wrap) :-
    thread_self(Self),
    (   thread_get_message(Self, Request, [deadline(Deadline)])
    ->  (   reply_stats_request(Request, Sliding)
        ->  true
        ;   debug(swish_stats, 'Failed to process ~p', [Request])
        ),
        stat_loop(Sliding, Prev, Deadline, Interval, Persists, Wrap)
    ;   get_stats(Wrap, Current),
        dif_stat(Current, Prev, Sample),
        redis_publish_stats(Deadline, Sample),
        push_sliding_stats(Sliding, Sample, NextWrap),
        NextDeadline is Deadline + Interval,
        save_stats(Persists, Sliding),
        stat_loop(Sliding, Current, NextDeadline, Interval, Persists,
                  NextWrap)
    ).
383
%!  dif_stat(+Stat1, +Stat0, -Stat) is det.
%
%   Stat is Stat1 extended with d_cpu and d_pengines_created, the
%   differences with Stat0.  If either key is missing in either dict
%   (e.g. Stat0 is _{}), Stat is Stat1 unchanged.

:- det(dif_stat/3).
dif_stat(Stat1, Stat0, Stat) :-
    Deltas = [ cpu - d_cpu,
               pengines_created - d_pengines_created
             ],
    maplist(dif_field(Stat1, Stat0), Deltas, Fields),
    !,
    dict_pairs(Extra, _, Fields),
    put_dict(Extra, Stat1, Stat).
dif_stat(Stat, _, Stat).

%   dif_field(+New, +Old, +Key-DKey, -DKey-Delta) is semidet.

dif_field(New, Old, Key-DKey, DKey-Delta) :-
    get_dict(Key, New, NewValue),
    get_dict(Key, Old, OldValue),
    Delta is NewValue - OldValue.
398
%!  reply_stats_request(+Request, +SlidingStat) is semidet.
%
%   Handle a message sent to the collector thread:
%
%     - Client-get_stats(Period): reply get_stats(Period, Values),
%       where Values are the samples of ring Period, most recent first.
%     - Client-save_stats(File): save SlidingStat to File, or to the
%       configured location when File is unbound.  Reply
%       save_stats(File) on success or save_stats(error(E)) if saving
%       raised E.
%
%   Without a configured location, persistent_stats/1 yields
%   save(-, 0).  Saving to `-` would create a file literally named "-"
%   in the working directory, so in that case nothing is written and
%   the reply is save_stats(-).

reply_stats_request(Client-get_stats(Period), SlidingStat) :-
    !,
    arg(Period, SlidingStat, Ring),
    ring_values(Ring, Values),
    thread_send_message(Client, get_stats(Period, Values)).
reply_stats_request(Client-save_stats(File), SlidingStat) :-
    !,
    (   var(File)
    ->  persistent_stats(save(File, _Interval))
    ;   true
    ),
    (   File == (-)
    ->  thread_send_message(Client, save_stats(-))
    ;   catch(save_stats_file(File, SlidingStat), E, true),
        (   var(E)
        ->  thread_send_message(Client, save_stats(File))
        ;   thread_send_message(Client, save_stats(error(E)))
        )
    ).
415
416
420
%!  get_stats(+Wrap, -Stats) is det.
%
%   Take a `stats` sample: CPU (process CPU time minus this thread's
%   `cputime`), RSS (0 if procps_stat/1 raises), stacks, threads,
%   pengine counters and the time in whole seconds.  Then add the
%   optional fordblks, heap and visitor fields.

:- det(get_stats/2).
get_stats(Wrap, Stats) :-
    get_time(Now),
    Time is floor(Now),
    statistics(process_cputime, ProcessCPU),
    statistics(cputime, ThreadCPU),
    CPU is ProcessCPU - ThreadCPU,
    statistics(stack, Stack),
    statistics(threads, Threads),
    catch(procps_stat(Proc), _, Proc = stat{rss:0}),
    RSS = Proc.rss,
    swish_statistics(pengines(Pengines)),
    swish_statistics(pengines_created(Created)),
    Base = stats{ cpu:CPU,
                  rss:RSS,
                  stack:Stack,
                  pengines:Pengines,
                  threads:Threads,
                  pengines_created:Created,
                  time:Time
                },
    add_fordblks(Wrap, Base, WithFree),
    add_heap(WithFree, WithHeap),
    add_visitors(WithHeap, Stats).
446
% add_heap(+Stats0, -Stats): when malloc_property/1 is available at
% load time, add heap:Bytes using the allocator's
% 'generic.current_allocated_bytes'; otherwise Stats is Stats0.
:- if(current_predicate(malloc_property/1)).
add_heap(Stats0, Stats) :-
    malloc_property('generic.current_allocated_bytes'(Allocated)),
    put_dict(heap, Stats0, Allocated, Stats).
:- else.
add_heap(Stats, Stats).
:- endif.
%   add_fordblks(+Wrap, +Stats0, -Stats) is det.
%
%   Add fordblks:Bytes to Stats0 when an allocator interface is
%   available at load time; otherwise Stats is Stats0 (last clause).

:- if(current_predicate(malloc_property/1)).

% fordblks is computed as heap size minus currently allocated bytes.
add_fordblks(_, Stats0, Stats) :-
    malloc_property('generic.current_allocated_bytes'(Used)),
    malloc_property('generic.heap_size'(Heap)),
    !,
    FordBlks is Heap - Used,
    Stats = Stats0.put(fordblks, FordBlks).

:- elif(current_predicate(mallinfo/1)).

% Accumulated correction (multiples of 2^32) for fordblks wraparound.
:- dynamic fordblks_wrap/1.
fordblks_wrap(0).

%   add_wrap(+Amount): add Amount to the stored wrap correction.
add_wrap(0) :- !.
add_wrap(Amount) :-
    retract(fordblks_wrap(Wrap0)),
    Wrap1 is Wrap0+Amount,
    asserta(fordblks_wrap(Wrap1)).

%   fix_fordblks_wrap(+FordBlks0, -FordBlks) is det.
%
%   Correct the raw mallinfo value for 32-bit wraparound.  A jump from
%   near 2^32 to near 0 (or back) relative to the previous value
%   (global variable `fordblks`) adjusts the correction by +/-2^32.
%   FordBlks must be a number because it is stored in the stats dict,
%   hence is/2 rather than =/2.

fix_fordblks_wrap(FordBlks0, FordBlks) :-
    fordblks_wrap(Wrap),
    FordBlks1 is FordBlks0+Wrap,
    (   nb_current(fordblks, Prev)
    ->  NW is FordBlks0 mod (1<<32),
        PW is Prev mod (1<<32),
        (   PW > (1<<32)-(1<<30),
            NW < (1<<30)
        ->  Add is 1<<32
        ;   NW > (1<<32)-(1<<30),
            PW < (1<<30)
        ->  Add is -(1<<32)
        ;   Add = 0
        ),
        add_wrap(Add),
        FordBlks is FordBlks1+Add
    ;   FordBlks = FordBlks1
    ).

% Only query mallinfo when the first ring wrapped (Wrap = [true|_]);
% otherwise reuse the last value kept in the `fordblks` global.
add_fordblks(Wrap, Stats0, Stats) :-
    (   Wrap = [true|_]
    ->  member(G, [mallinfo(MallInfo)]),
        call(G),
        FordBlks0 = MallInfo.get(fordblks),
        fix_fordblks_wrap(FordBlks0, FordBlks),
        b_setval(fordblks, FordBlks)
    ;   nb_current(fordblks, FordBlks)
    ),
    !,
    Stats = Stats0.put(fordblks, FordBlks).
:- endif.
add_fordblks(_, Stats, Stats).
506
%!  add_visitors(+Stats0, -Stats) is det.
%
%   With redis, add cluster-wide and local visitor counts; otherwise
%   add the single visitor count.  If no listener answers the
%   broadcast, Stats is Stats0.

add_visitors(Stats0, Stats) :-
    use_redis,
    broadcast_request(swish(visitor_count(Cluster, Local))),
    !,
    put_dict(_{visitors:Cluster, local_visitors:Local}, Stats0, Stats).
add_visitors(Stats0, Stats) :-
    broadcast_request(swish(visitor_count(Count))),
    !,
    put_dict(visitors, Stats0, Count, Stats).
add_visitors(Stats, Stats).
517
518
523
%!  new_sliding_stats(+Dims, -Stats) is det.
%
%   Stats is sliding_stats(Ring1, ...) with one empty ring per Dims
%   element.

new_sliding_stats(Dims, Stats) :-
    maplist(new_ring, Dims, Rings),
    compound_name_arguments(Stats, sliding_stats, Rings).

%!  push_sliding_stats(+Stats, +Values, -Wrap) is det.
%
%   Push Values into the first ring.  Whenever a ring wraps, its
%   window average is pushed into the next ring.  Wrap lists the wrap
%   flags (true/false) of the rings visited, first ring first.

:- det(push_sliding_stats/3).
push_sliding_stats(Stats, Values, Wrap) :-
    push_sliding_stats(1, Stats, Values, Wrap).

push_sliding_stats(I, Stats, Values, [Wrapped|Rest]) :-
    arg(I, Stats, Ring),
    push_ring(Ring, Values, Wrapped),
    (   Wrapped == true
    ->  average_ring(Ring, Avg),
        Next is I + 1,
        ignore(push_sliding_stats(Next, Stats, Avg, Rest))
    ;   Rest = []
    ).
544
%!  new_ring(+Spec, -Ring) is det.
%
%   Ring is ring(0, Avg, Data), where Data is a fresh compound with one
%   slot per sample.  Spec is either Size/Avg (Size evaluated, Avg kept
%   as given) or Size, in which case Avg is the evaluated Size.

new_ring(Spec, ring(0, Avg, Data)) :-
    (   Spec = Size0/Avg
    ->  Size is Size0
    ;   Size is Spec,
        Avg = Size
    ),
    compound_name_arity(Data, [], Size).

%!  push_ring(!Ring, +Value, -Wrap) is det.
%
%   Destructively store Value in the next slot and increment the push
%   counter.  Wrap is `true` when the slot index is a multiple of Avg.

push_ring(Ring, Value, Wrap) :-
    Ring = ring(Count0, Avg, Data),
    compound_name_arity(Data, _, Size),
    Slot is Count0 mod Size + 1,
    (   0 =:= Slot mod Avg
    ->  Wrap = true
    ;   Wrap = false
    ),
    nb_setarg(Slot, Data, Value),
    Count is Count0 + 1,
    nb_setarg(1, Ring, Count).
564
%!  ring_values(+Ring, -Values) is det.
%
%   Values are the stored samples, most recently pushed first.  There
%   are at most Size of them, fewer if the ring was not yet full.

ring_values(ring(Count, _, Data), Values) :-
    compound_name_arity(Data, _, Size),
    From is Count - 1,
    To is From - min(Count, Size),
    read_ring(From, To, Size, Data, Values).

%   read_ring(+I, +To, +Size, +Data, -Values)
%
%   Collect slots for push indices I, I-1, ... down to (excluding) To.

read_ring(To, To, _, _, []) :- !.
read_ring(I, To, Size, Data, [Value|Values]) :-
    Slot is I mod Size + 1,
    arg(Slot, Data, Value),
    Prev is I - 1,
    read_ring(Prev, To, Size, Data, Values).
578
%!  average_ring(+Ring, -Avg) is det.
%
%   Avg is the per-key average of the last Window (ring's Avg field)
%   samples ending at the most recently written slot.  The window
%   wraps around the ring if needed.

average_ring(ring(Count, Window, Data), Avg) :-
    compound_name_arity(Data, _, Size),
    Last is (Count - 1) mod Size + 1,
    First0 is Last - Window + 1,
    (   First0 < 1
    ->  First is First0 + Size
    ;   First = First0
    ),
    avg_window(First, Last, Size, Data, Dicts),
    average_dicts(Dicts, Avg).

%   avg_window(+I, +Last, +Size, +Data, -Dicts)
%
%   Collect slots I, I+1, ... up to Last, wrapping from Size to 1.

avg_window(Last, Last, _, Data, [Dict]) :-
    !,
    arg(Last, Data, Dict).
avg_window(I, Last, Size, Data, [Dict|Dicts]) :-
    arg(I, Data, Dict),
    Next is I mod Size + 1,
    avg_window(Next, Last, Size, Data, Dicts).

%!  average_dicts(+Dicts, -Avg) is det.
%
%   Avg has the tag of the first dict.  Each key holds the float
%   average over Dicts, with missing keys counted as 0.

average_dicts(Dicts, Avg) :-
    dicts_to_same_keys(Dicts, dict_fill(0), Filled),
    Filled = [First|_],
    is_dict(First, Tag),
    dict_keys(First, Keys),
    length(Filled, N),
    maplist(avg_key(Filled, N), Keys, Pairs),
    dict_pairs(Avg, Tag, Pairs).

avg_key(Dicts, N, Key, Key-Mean) :-
    maplist(get_dict(Key), Dicts, Values),
    sum_list(Values, Total),
    Mean is float(Total)/N.
615
619
%!  save_stats(+Persists, +Stats) is det.
%
%   When Persists is save(File, Interval) with Interval > 0 and the
%   first ring's push count is a multiple of Interval, write Stats to
%   File.  Errors and failures are reported by must_succeed/1.

save_stats(Persists, Stats) :-
    (   Persists = save(File, Interval),
        Interval > 0,
        arg(1, Stats, ring(Count, _, _)),
        0 =:= Count mod Interval,
        must_succeed(save_stats_file(File, Stats))
    ->  true
    ;   true
    ).

%!  save_stats_file(+Location, +Stats)
%
%   Location is redis(Server, Key) or a file name.

save_stats_file(redis(Server, Key), Stats) =>
    redis(Server, set(Key, Stats as prolog)).
save_stats_file(File, Stats) =>
    setup_call_cleanup(open(File, write, Out),
                       save_stats_stream(Stats, Out),
                       close(Out)).

%   save_stats_stream(+Stats, +Out)
%
%   Write stats(Time, Stats). with unbound slots numbered; the
%   double negation undoes those bindings afterwards.

save_stats_stream(Stats, Out) :-
    get_time(Now),
    \+ \+ ( numbervars(Stats, 0, _, [singletons(true)]),
            format(Out, 'stats(~1f, ~q).~n', [Now, Stats])
          ).
641
647
%!  restart_sliding_stats(+Persists, +Dims, -Stats) is det.
%
%   Restore the sliding statistics from redis (when in use) or from
%   the stats file.  The restored term is used only if it has the same
%   layout as a fresh one built from Dims; otherwise Stats is fresh.

restart_sliding_stats(save(_, _), Dims, Stats) :-
    use_redis,
    !,
    (   wait_redis_key(Server, Key),
        redis(Server, get(Key), Saved),
        new_sliding_stats(Dims, Template),
        compatible_sliding_stats(Saved, Template)
    ->  Stats = Saved
    ;   new_sliding_stats(Dims, Stats)
    ).
restart_sliding_stats(save(File, _), Dims, Stats) :-
    exists_file(File),
    Error = error(_, _),
    catch(setup_call_cleanup(open(File, read, In),
                             read(In, stats(_SavedAt, Stats)),
                             close(In)),
          Error,
          ( print_message(warning, Error), fail )),
    new_sliding_stats(Dims, Template),
    compatible_sliding_stats(Stats, Template),
    !.
restart_sliding_stats(_, Dims, Stats) :-
    new_sliding_stats(Dims, Stats).
671
%!  compatible_sliding_stats(@Stats1, @Stats2) is semidet.
%
%   True when both are compounds with the same name and the same
%   number of rings, and each pair of rings has equal Avg and equal
%   data name and arity.  A restored value may be any term (e.g. an
%   atom from redis or a corrupt file).  compound_name_arguments/3
%   and compound_name_arity/3 raise a type error on non-compounds, so
%   check first and simply fail ("not compatible").  Failing lets the
%   caller fall back to fresh statistics instead of raising on every
%   collector restart.

compatible_sliding_stats(Stats1, Stats2) :-
    compound(Stats1),
    compound(Stats2),
    compound_name_arguments(Stats1, Name, List1),
    compound_name_arguments(Stats2, Name, List2),
    maplist(compatible_window, List1, List2).

compatible_window(ring(_,Avg,Data1), ring(_,Avg,Data2)) :-
    compound(Data1),
    compound(Data2),
    compound_name_arity(Data1, Name, Dim),
    compound_name_arity(Data2, Name, Dim).
680
684
% Save the statistics when the HTTP server shuts down.
:- listen(http(shutdown), swish_save_stats(_)).

%!  swish_save_stats(?File) is semidet.
%
%   Ask the collector to save the statistics to File, or to the
%   configured location if File is unbound, and wait up to one second
%   for the reply.  File is unified with the location used.  An error
%   reported by the collector is re-thrown.

swish_save_stats(File) :-
    thread_self(Self),
    catch(thread_send_message(swish_stats, Self-save_stats(File)),
          SendError,
          stats_died(swish_stats, SendError)),
    thread_get_message(Self, save_stats(Result), [timeout(1)]),
    (   Result = error(SaveError)
    ->  throw(SaveError)
    ;   File = Result
    ).
695
696
701
%!  swish_died_thread(?TID, -Status) is nondet.
%
%   TID is the id of a thread whose status is not `running`.  Status
%   is that status, or an "ERROR: ..." string for exceptions.

swish_died_thread(TID, Status) :-
    findall(Id-ThreadStatus,
            ( thread_property(Thread, status(ThreadStatus)),
              ThreadStatus \== running,
              thread_property(Thread, id(Id))
            ),
            Dead),
    member(TID-Stat, Dead),
    status_message(Stat, Status).

%   status_message(+Status, -Message)

status_message(Status, Message) :-
    (   Status = exception(Ex)
    ->  message_to_string(Ex, Text),
        string_concat('ERROR: ', Text, Message)
    ;   Message = Status
    ).
714
715
%!  must_succeed(:Goal) is det.
%
%   Call Goal once.  An error(_,_) exception is printed as a warning.
%   Failure is reported as goal_failed(Goal).  Always succeeds.

must_succeed(Goal) :-
    Error = error(_, _),
    (   catch_with_backtrace(Goal, Error, print_message(warning, Error))
    ->  true
    ;   print_message(warning, goal_failed(Goal))
    ).
722
723
724 727
% Declare the read-only diagnostics as safe for sandboxed queries.
:- multifile
    sandbox:safe_primitive/1.

sandbox:safe_primitive(swish_diagnostics:pengine_stale_module(_)).
sandbox:safe_primitive(swish_diagnostics:pengine_stale_module(_,_)).
sandbox:safe_primitive(swish_diagnostics:stale_pengine(_)).
sandbox:safe_primitive(swish_diagnostics:swish_statistics(_)).
sandbox:safe_primitive(swish_diagnostics:swish_stats(_, _)).
sandbox:safe_primitive(swish_diagnostics:swish_died_thread(_, _)).
sandbox:safe_primitive(swish_diagnostics:swish_cluster_member(_,_)).

% malloc_info/1 only when library(mallocinfo) provided it.
:- if(current_predicate(malloc_info:malloc_info/1)).
sandbox:safe_primitive(malloc_info:malloc_info(_)).
:- endif.