36
37:- module(gitty_driver_files,
38 [ gitty_open/2, 39 gitty_close/1, 40 gitty_file/4, 41
42 gitty_update_head/5, 43 44 delete_head/2, 45 set_head/3, 46 store_object/4, 47 delete_object/2, 48
49 gitty_hash/2, 50 load_plain_commit/3, 51 load_object/5, 52 gitty_object_file/3, 53
54 repack_objects/2, 55 pack_objects/6, 56 57 unpack_packs/1, 58 unpack_pack/2, 59
60 attach_pack/2, 61 gitty_fsck/1, 62 fsck_pack/1, 63 load_object_from_pack/4, 64
65 gitty_rescan/1 66 ]). 67:- use_module(library(apply)). 68:- use_module(library(zlib)). 69:- use_module(library(filesex)). 70:- use_module(library(lists)). 71:- use_module(library(apply)). 72:- use_module(library(error)). 73:- use_module(library(debug)). 74:- use_module(library(zlib)). 75:- use_module(library(hash_stream)). 76:- use_module(library(option)). 77:- use_module(library(dcg/basics)). 78:- use_module(library(redis)). 79:- use_module(library(redis_streams)). 80:- use_module(gitty, [is_gitty_hash/1]). 81
82:- use_module(swish_redis). 83
101
% In-memory state of the driver.  The argument roles listed below follow
% from how the facts are asserted elsewhere in this file.
:- dynamic
    head/4,                         % Store, Name, Ext, Hash
    store/2,                        % Store, Time of scan
    commit/3,                       % Store, Hash, Meta
    heads_input_stream_cache/2,     % Store, Stream
    pack_object/6,                  % Hash, Type, Size, Offset, Store, PackFile
    attached_packs/1,               % Store
    attached_pack/2,                % Store, PackFile
    redis_db/4.                     % Store, DB, RO, Prefix

% None of the caches is kept in a saved state; redis_db/4 is deliberately
% not in this list.
:- volatile
    head/4,
    store/2,
    commit/3,
    heads_input_stream_cache/2,
    pack_object/6,
    attached_packs/1,
    attached_pack/2.

% Hook called by check_object/6 while checking a pack.
:- multifile
    gitty:check_object/4.
127
%!  remote_sync(?Boolean)
%
%   Compile-time switch for the clauses guarded by `:- if(remote_sync(_))`
%   further down (heads-file logging and replay).  It is `false` on
%   Windows and `true` everywhere else.

:- if(current_prolog_flag(windows, true)).
remote_sync(false).
:- else.
remote_sync(true).
:- endif.
146
%!  gitty_open(+Store, +Options) is det.
%
%   Driver hook for opening Store.  When Options holds redis(DB), the
%   connection is registered in redis_db/4 (the read-only server defaults
%   to DB and the key prefix to `swish`) and a detached thread is started
%   that runs gitty_scan/1.  Without a redis(DB) option this is a no-op.

gitty_open(Store, Options) :-
    option(redis(Server), Options),
    !,
    option(redis_ro(ReadOnly), Options, Server),
    option(redis_prefix(KeyPrefix), Options, swish),
    asserta(redis_db(Store, Server, ReadOnly, KeyPrefix)),
    thread_create(gitty_scan(Store), _, [detached(true)]).
gitty_open(_, _).
155
156
160
%!  gitty_close(+Store) is det.
%
%   Close Store: close the cached heads input stream, if there is one,
%   and drop the in-memory heads, the scan marker and the pack index
%   that belong to Store.

gitty_close(Store) :-
    (   retract(heads_input_stream_cache(Store, In))
    ->  close(In)
    ;   true
    ),
    retractall(head(Store,_,_,_)),
    retractall(store(Store,_)),
    retractall(pack_object(_,_,_,_,Store,_)),
    % The pack index is gone, so the "already attached" markers must go
    % as well.  Otherwise attach_pack/2 and gitty_attach_packs/1 succeed
    % without re-reading the pack headers and the packed objects stay
    % invisible when the store is used again.
    retractall(attached_pack(Store, _)),
    retractall(attached_packs(Store)).
169
170
175
%!  gitty_file(+Store, ?Head, ?Ext, ?Hash) is nondet.
%
%   True when Hash is the head commit of file Head, whose extension is
%   Ext.  The store is scanned first; the answer then comes from Redis
%   if Store is registered in redis_db/4 and from head/4 otherwise.

gitty_file(Store, Head, Ext, Hash) :-
    (   redis_db(Store, _, _, _)
    ->  gitty_scan(Store),
        redis_file(Store, Head, Ext, Hash)
    ;   gitty_scan(Store),
        head(Store, Head, Ext, Hash)
    ).
184
193
%!  load_plain_commit(+Store, +Hash, -Meta:dict) is semidet.
%
%   Meta is the term stored in commit object Hash.  Parsed commits are
%   cached in commit/3; a cache miss loads the object, parses its text
%   and adds it to the cache under the `gitty_commit_cache` mutex.
%
%   @error type_error(atom, X) if Store or Hash is not an atom.

load_plain_commit(Store, Hash, Meta) :-
    must_be(atom, Store),
    must_be(atom, Hash),
    commit(Store, Hash, Meta),
    !.
load_plain_commit(Store, Hash, Meta) :-
    load_object(Store, Hash, Text, _, _),
    term_string(Parsed, Text, []),
    with_mutex(gitty_commit_cache,
               assert_cached_commit(Store, Hash, Parsed)),
    Meta = Parsed.
205
%   assert_cached_commit(+Store, +Hash, +Meta)
%
%   Add Meta to the commit/3 cache unless an entry for Hash exists.  An
%   existing entry is asserted to be a variant of Meta.

assert_cached_commit(Store, Hash, Meta) :-
    (   commit(Store, Hash, Cached)
    ->  assertion(Cached =@= Meta)
    ;   assertz(commit(Store, Hash, Meta))
    ).
212
217
%!  store_object(+Store, +Hash, +Hdr:string, +Data:string) is det.
%
%   Store an object as a gzip-compressed file holding Hdr followed by
%   Data.  Nothing is written when the object is already in an attached
%   pack or its file already exists.

store_object(Store, Hash, Hdr, Data) :-
    (   pack_object(Hash, _Type, _Size, _Offset, Store, _Pack)
    ->  true
    ;   gitty_object_file(Store, Hash, Path),
        with_mutex(gitty_file, exists_or_create(Path, Raw)),
        (   var(Raw)                    % file existed: nothing to write
        ->  true
        ;   setup_call_cleanup(
                zopen(Raw, Zipped, [format(gzip)]),
                format(Zipped, '~s~s', [Hdr, Data]),
                close(Zipped))
        )
    ).
231
%   exists_or_create(+Path, -Out)
%
%   If Path exists, succeed and leave Out unbound.  Otherwise create the
%   parent directories and open Path for writing (UTF-8, write lock),
%   binding Out to the new stream.

exists_or_create(Path, Out) :-
    (   exists_file(Path)
    ->  true
    ;   file_directory_name(Path, ParentDir),
        make_directory_path(ParentDir),
        open(Path, write, Out, [encoding(utf8), lock(write)])
    ).
239
% Only define ensure_directory/1 if no such predicate is visible here.
:- if(\+current_predicate(ensure_directory/1)).
%   ensure_directory(+Dir)
%
%   Create Dir unless it already exists.
ensure_directory(Dir) :-
    (   exists_directory(Dir)
    ->  true
    ;   make_directory(Dir)
    ).
:- endif.
253
%!  store_object_raw(+Store, +Hash, +Bytes:string, -New) is det.
%
%   Store an object from its already compressed representation Bytes.
%   New is `true` if a new object file was created and `false` if the
%   object was already in a pack or on disk.

store_object_raw(Store, Hash, _Bytes, false) :-
    pack_object(Hash, _Type, _Size, _Offset, Store, _Pack),
    !.
store_object_raw(Store, Hash, Bytes, New) :-
    gitty_object_file(Store, Hash, ObjectFile),
    with_mutex(gitty_file, exists_or_create(ObjectFile, Stream)),
    (   var(Stream)                     % file was already there
    ->  New = false
    ;   call_cleanup(
            (   set_stream(Stream, type(binary)),
                write(Stream, Bytes)
            ),
            close(Stream)),
        New = true
    ).
269
273
%!  load_object(+Store, +Hash, -Data, -Type, -Size) is semidet.
%
%   Load the object Hash.  Sources are tried in this order: an attached
%   pack, the plain object file, and finally, for Redis-backed stores
%   only, a copy requested from another node via redis_replicate_get/2.

load_object(_Store, Hash, Data, Type, Size) :-
    load_object_from_pack(Hash, PData, PType, PSize),
    !,
    Data = PData,
    Type = PType,
    Size = PSize.
load_object(Store, Hash, Data, Type, Size) :-
    load_object_file(Store, Hash, FData, FType, FSize),
    !,
    Data = FData,
    Type = FType,
    Size = FSize.
load_object(Store, Hash, Data, Type, Size) :-
    redis_db(Store, _, _, _),
    redis_replicate_get(Store, Hash),
    load_object_file(Store, Hash, Data, Type, Size).
286
%   load_object_file(+Store, +Hash, -Data, -Type, -Size) is semidet.
%
%   Read the object from its plain gzip file.  Fails if the file does
%   not exist.

load_object_file(Store, Hash, Data, Type, Size) :-
    gitty_object_file(Store, Hash, ObjectFile),
    exists_file(ObjectFile),
    !,
    setup_call_cleanup(
        gzopen(ObjectFile, read, Stream, [encoding(utf8)]),
        read_object(Stream, Data, Type, Size),
        close(Stream)).
295
299
%!  has_object(+Store, +Hash) is semidet.
%
%   True if Hash is available locally, either in an attached pack or as
%   a plain object file.

has_object(Store, Hash) :-
    (   pack_object(Hash, _Type, _Size, _Offset, Store, _Pack)
    ->  true
    ;   gitty_object_file(Store, Hash, ObjectFile),
        exists_file(ObjectFile)
    ).
306
310
%!  load_object_raw(+Store, +Hash, -Bytes) is semidet.
%
%   Bytes is the compressed (on-disk) representation of the object.  A
%   packed object is loaded and compressed again by object_bytes/4; a
%   plain object file is read verbatim in binary mode.

load_object_raw(_Store, Hash, Bytes) :-
    load_object_from_pack(Hash, Content, Type, Size),
    !,
    object_bytes(Type, Size, Content, Bytes).
load_object_raw(Store, Hash, Bytes) :-
    gitty_object_file(Store, Hash, ObjectFile),
    exists_file(ObjectFile),
    !,
    setup_call_cleanup(
        open(ObjectFile, read, Stream, [type(binary)]),
        read_string(Stream, _, Bytes),
        close(Stream)).
323
327
%!  object_bytes(+Type, +Size, +Data, -Bytes) is det.
%
%   Bytes is the gzip-compressed form of "Type Size\0Data", i.e. the
%   header layout that read_object_hdr/3 parses, followed by the
%   content.  The text is UTF-8 encoded before compression and the
%   result is returned as a string of octets.

object_bytes(Type, Size, Data, Bytes) :-
    setup_call_cleanup(
        new_memory_file(MemFile),
        (   setup_call_cleanup(
                open_memory_file(MemFile, write, Raw, [encoding(octet)]),
                setup_call_cleanup(
                    % keep Raw open; the enclosing cleanup closes it
                    zopen(Raw, Zipped, [format(gzip), close_parent(false)]),
                    (   set_stream(Zipped, encoding(utf8)),
                        format(Zipped, '~w ~d\u0000~w', [Type, Size, Data])
                    ),
                    close(Zipped)),
                close(Raw)),
            memory_file_to_string(MemFile, Bytes, octet)
        ),
        free_memory_file(MemFile)).
343
344
348
%!  load_object_header(+Store, +Hash, -Type, -Size) is det.
%
%   Read only the header of the plain object file for Hash.  Type and
%   Size are the values from the "Type Size\0" prefix of the
%   uncompressed data.  Used by object_info/3 when building a pack.

load_object_header(Store, Hash, Type, Size) :-
    gitty_object_file(Store, Hash, Path),
    setup_call_cleanup(
        gzopen(Path, read, In, [encoding(utf8)]),
        read_object_hdr(In, Type, Size),
        close(In)).
355
%   read_object(+In, -Data, -Type, -Size)
%
%   Read the header and then the remaining content of In as a string.

read_object(In, Data, Type, Size) :-
    read_object_hdr(In, Type, Size),
    read_string(In, _, Data).

%   read_object_hdr(+In, -Type, -Size)
%
%   Parse the "Type Size" header, which is terminated by a 0 code.

read_object_hdr(In, Type, Size) :-
    get_code(In, First),
    read_hdr(First, In, HdrCodes),
    phrase((nonblanks(TypeCodes), " ", integer(Size)), HdrCodes),
    atom_codes(Type, TypeCodes).

%   read_hdr(+Code, +In, -Codes)
%
%   Collect codes up to, but excluding, the first code that is not
%   greater than zero: the 0 terminator or -1 at end of file.

read_hdr(Code, In, [Code|Rest]) :-
    Code > 0,
    !,
    get_code(In, Next),
    read_hdr(Next, In, Rest).
read_hdr(_, _, []).
372
377
%!  gitty_rescan(?Store) is det.
%
%   Forget that Store was scanned by removing its store/2 marker, so
%   the next gitty_scan/1 goes through gitty_scan_sync/1 again.

gitty_rescan(Store) :-
    retractall(store(Store, _LastScan)).
380
389
%!  gitty_scan(+Store) is det.
%
%   Make the head information for Store available.  A store that was
%   already scanned only picks up remote updates; otherwise the initial
%   scan runs under the `gitty` mutex.

gitty_scan(Store) :-
    (   store(Store, _)
    ->  remote_updates(Store)
    ;   with_mutex(gitty, gitty_scan_sync(Store))
    ).
396
% latest(Name, Hash, Time): scratch facts used by gitty_scan_latest/1.
:- thread_local
    latest/3.

%   gitty_scan_sync(+Store)
%
%   Initial scan, called with the `gitty` mutex held.  Clauses are tried
%   in order: already scanned; Redis-backed store; heads file (only
%   compiled if remote_sync(true)); full scan of the commit objects.

gitty_scan_sync(Store) :-
    store(Store, _),                    % scanned while we waited
    !.
gitty_scan_sync(Store) :-
    redis_db(Store, _, _, _),
    !,
    gitty_attach_packs(Store),
    redis_ensure_heads(Store),
    get_time(ScanTime),
    assertz(store(Store, ScanTime)).
:- if(remote_sync(true)).
gitty_scan_sync(Store) :-
    remote_sync(true),
    !,
    gitty_attach_packs(Store),
    restore_heads_from_remote(Store).
:- endif.
gitty_scan_sync(Store) :-
    gitty_attach_packs(Store),
    read_heads_from_objects(Store).
420
425
%!  read_heads_from_objects(+Store) is det.
%
%   Rebuild head/4 for Store from the latest/3 facts produced by
%   gitty_scan_latest/1, then record the scan time in store/2.

read_heads_from_objects(Store) :-
    gitty_scan_latest(Store),
    forall(retract(latest(File, Commit, _Time)),
           assert_head(Store, File, Commit)),
    get_time(ScanTime),
    assertz(store(Store, ScanTime)).

%   assert_head(+Store, +Name, +Hash)
%
%   Add a head/4 fact; the extension is derived from Name.

assert_head(Store, Name, Hash) :-
    file_name_extension(_Base, Ext, Name),
    assertz(head(Store, Name, Ext, Hash)).
436
437
442
%!  gitty_scan_latest(+Store) is det.
%
%   Enumerate all commit objects of Store and leave one latest/3 fact
%   per file name, holding the commit with the newest time.  The head/4
%   facts of Store and any old latest/3 facts are removed first.

gitty_scan_latest(Store) :-
    retractall(head(Store, _, _, _)),
    retractall(latest(_, _, _)),
    forall(( gitty_hash(Store, Hash),
             load_object(Store, Hash, Text, commit, _Size),
             term_string(Meta, Text, []),
             _{name:Name, time:Time} :< Meta
           ),
           (   latest(Name, _, KnownTime),
               KnownTime > Time
           ->  true                     % we already hold a newer commit
           ;   retractall(latest(Name, _, _)),
               assertz(latest(Name, Hash, Time))
           )).
459
460
464
%!  gitty_hash(+Store, ?Hash) is nondet.
%
%   True when Hash is an object in Store.  With Hash unbound this
%   enumerates the packed objects and then the plain object files; with
%   Hash bound it is a single existence check.

gitty_hash(Store, Hash) :-
    var(Hash),
    !,
    (   gitty_attach_packs(Store),
        pack_object(Hash, _Type, _Size, _Offset, Store, _Pack)
    ;   gitty_file_object(Store, Hash)
    ).
gitty_hash(Store, Hash) :-
    (   gitty_attach_packs(Store),
        pack_object(Hash, _Type, _Size, _Offset, Store, _Pack)
    ->  true
    ;   gitty_object_file(Store, Hash, ObjectFile),
        exists_file(ObjectFile)
    ).
479
%   gitty_file_object(+Store, -Hash) is nondet.
%
%   Enumerate the objects stored as plain files.  The layout is
%   Store/XX/YY/Rest with two 2-character directory levels and a
%   36-character file name; Hash is the concatenation XXYYRest.

gitty_file_object(Store, Hash) :-
    access_file(Store, exist),
    directory_files(Store, TopEntries),
    member(D1, TopEntries),
    D1 \== '..',
    atom_length(D1, 2),
    directory_file_path(Store, D1, Path1),
    directory_files(Path1, SubEntries),
    member(D2, SubEntries),
    D2 \== '..',
    atom_length(D2, 2),
    directory_file_path(Path1, D2, Path2),
    directory_files(Path2, Leaves),
    member(Leaf, Leaves),
    atom_length(Leaf, 36),
    atomic_list_concat([D1,D2,Leaf], Hash).
496
500
%!  delete_object(+Store, +Hash)
%
%   Delete the plain object file that holds Hash.

delete_object(Store, Hash) :-
    gitty_object_file(Store, Hash, ObjectFile),
    delete_file(ObjectFile).
504
509
%!  gitty_object_file(+Store, +Hash, -Path) is det.
%
%   Path is the file for object Hash: Store/XX/YY/Rest, where XX and YY
%   are the first and second pair of characters of Hash and Rest is the
%   remainder.

gitty_object_file(Store, Hash, Path) :-
    sub_string(Hash, 0, 2, _, Level1),
    sub_string(Hash, 2, 2, _, Level2),
    sub_string(Hash, 4, _, 0, Leaf),
    atomic_list_concat([Store, Level1, Level2, Leaf], /, Path).
515
516
517 520
530
%!  gitty_update_head(+Store, +Name, +OldCommit, +NewCommit, +DataHash)
%
%   Move the head of file Name from OldCommit to NewCommit.  Redis-backed
%   stores delegate to redis_update_head/5; otherwise the update runs
%   under the `gitty` mutex and DataHash is not used.

gitty_update_head(Store, Name, OldCommit, NewCommit, DataHash) :-
    (   redis_db(Store, _, _, _)
    ->  redis_update_head(Store, Name, OldCommit, NewCommit, DataHash)
    ;   with_mutex(gitty,
                   gitty_update_head_sync(Store, Name, OldCommit, NewCommit))
    ).
538
%   gitty_update_head_sync(+Store, +Name, +OldCommit, +NewCommit)
%
%   Update the head with the `gitty` mutex held.  If remote_sync(true)
%   was compiled in, the change is also appended to the heads file.

:- if(remote_sync(true)).
gitty_update_head_sync(Store, Name, OldCommit, NewCommit) :-
    remote_sync(true),
    !,
    setup_call_cleanup(
        heads_output_stream(Store, Log),
        gitty_update_head_sync(Store, Name, OldCommit, NewCommit, Log),
        close(Log)).
:- endif.
gitty_update_head_sync(Store, Name, OldCommit, NewCommit) :-
    gitty_update_head_sync2(Store, Name, OldCommit, NewCommit).

%   gitty_update_head_sync(+Store, +Name, +OldCommit, +NewCommit, +HeadsOut)
%
%   Update the in-memory head first, then log the transition as a
%   head(Name, OldCommit, NewCommit) term on HeadsOut.

gitty_update_head_sync(Store, Name, OldCommit, NewCommit, HeadsOut) :-
    gitty_update_head_sync2(Store, Name, OldCommit, NewCommit),
    format(HeadsOut, '~q.~n', [head(Name, OldCommit, NewCommit)]).
554
%   gitty_update_head_sync2(+Store, +Name, +OldCommit, +NewCommit)
%
%   Update head/4 after fetching remote changes.  OldCommit is `-` for a
%   new file.
%
%   @error gitty(file_exists(Name)) if OldCommit is `-` and Name already
%          has a head.
%   @error gitty(not_at_head(Name, OldCommit)) if the current head of
%          Name is not OldCommit.

gitty_update_head_sync2(Store, Name, OldCommit, NewCommit) :-
    gitty_scan(Store),                  % fetch remote changes
    (   OldCommit == (-)
    ->  (   head(Store, Name, _, _)
        % A misplaced parenthesis used to make this error(gitty(_,_)),
        % a one-argument error term.  Use the error(Formal, Context)
        % shape, like the not_at_head case below.
        ->  throw(error(gitty(file_exists(Name)), _))
        ;   assert_head(Store, Name, NewCommit)
        )
    ;   (   retract(head(Store, Name, _, OldCommit))
        ->  assert_head(Store, Name, NewCommit)
        ;   throw(error(gitty(not_at_head(Name, OldCommit)), _))
        )
    ).
567
572
% last_remote_sync(Store, Time): when the heads file was last polled.
:- dynamic
    last_remote_sync/2.

%!  remote_updates(+Store) is det.
%
%   Apply head updates written to the heads file by other processes.
%   This is a no-op when compiled with remote_sync(false), and nothing
%   is done if the file was polled less than a second ago (see
%   remote_up_to_data/1).

:- if(remote_sync(false)).
remote_updates(_) :-
    remote_sync(false),
    !.
:- endif.
remote_updates(Store) :-
    remote_up_to_data(Store),
    !.
remote_updates(Store) :-
    with_mutex(gitty, remote_updates_sync(Store)).
586
%   remote_updates_sync(+Store)
%
%   Called with the `gitty` mutex held.  Checks again whether another
%   thread already polled; if not, records the poll time and processes
%   the new terms of the heads file.

remote_updates_sync(Store) :-
    (   remote_up_to_data(Store)
    ->  true
    ;   retractall(last_remote_sync(Store, _)),
        get_time(PollTime),
        asserta(last_remote_sync(Store, PollTime)),
        remote_update(Store)
    ).

%   remote_up_to_data(+Store) is semidet.
%
%   True if Store was polled less than one second ago.

remote_up_to_data(Store) :-
    last_remote_sync(Store, Last),
    get_time(Now),
    Elapsed is Now-Last,
    Elapsed < 1.
600
%   remote_update(+Store)
%
%   Read the new head(Name, Old, New) terms and apply each of them.

remote_update(Store) :-
    remote_updates(Store, Updates),
    maplist(update_head(Store), Updates).

%   update_head(+Store, +HeadTerm)
%
%   Apply one logged transition.  It is applied only if our state
%   matches the old side (no head at all for `-`, otherwise a head equal
%   to OldCommit); any other term is silently ignored.

update_head(Store, head(Name, OldCommit, NewCommit)) :-
    (   OldCommit == (-)
    ->  \+ head(Store, Name, _, _)
    ;   retract(head(Store, Name, _, OldCommit))
    ),
    !,
    assert_head(Store, Name, NewCommit).
update_head(_, _).
613
620
%!  remote_updates(+Store, -List) is det.
%
%   List holds the terms added to the heads file since the previous
%   read.  The stream comes from heads_input_stream/2 and is kept open
%   between calls.  Reading is done inside a `gitty_sync` input context.

remote_updates(Store, List) :-
    heads_input_stream(Store, HeadsIn),
    setup_call_cleanup(
        '$push_input_context'(gitty_sync),
        read_new_terms(HeadsIn, List),
        '$pop_input_context').
627
%   read_new_terms(+Stream, -Terms)
%
%   Read all terms from Stream up to end of file.

read_new_terms(Stream, Terms) :-
    read(Stream, Term0),
    read_new_terms(Term0, Stream, Terms).

%   read_new_terms(+Term, +Stream, -Terms)
%
%   Worker: Term is the term that was read most recently.

read_new_terms(end_of_file, _, Terms) :-
    !,
    Terms = [].
read_new_terms(Term, Stream, [Term|Rest]) :-
    read(Stream, Next),
    read_new_terms(Next, Stream, Rest).
638
%   heads_output_stream(+Store, -Out)
%
%   Open the heads file of Store in append mode (UTF-8) with an
%   exclusive lock.

heads_output_stream(Store, Out) :-
    heads_file(Store, File),
    open(File, append, Out,
         [ encoding(utf8),
           lock(exclusive)
         ]).
645
%   heads_input_stream(+Store, -Stream)
%
%   Stream reads the heads file of Store.  It is opened once with
%   eof_action(reset), so that later reads see appended data, and is
%   then cached in heads_input_stream_cache/2.  If opening fails, the
%   file is created and the open is tried a second time.

heads_input_stream(Store, Stream) :-
    heads_input_stream_cache(Store, Cached),
    !,
    Stream = Cached.
heads_input_stream(Store, Stream) :-
    heads_file(Store, HeadsFile),
    between(1, 2, _),                   % at most two attempts
    catch(open(HeadsFile, read, Opened,
               [ encoding(utf8),
                 eof_action(reset)
               ]),
          _,
          create_heads_file(Store)),    % always fails: forces the retry
    !,
    assert(heads_input_stream_cache(Store, Opened)),
    Stream = Opened.
662
%   create_heads_file(+Store) is failure.
%
%   Make sure the heads file exists by opening it in append mode and
%   closing it at once.  It fails on purpose, so the between/3 loop in
%   heads_input_stream/2 tries the open again.

create_heads_file(Store) :-
    % Open in the setup phase, so close/1 only runs once Out is bound.
    % The old call_cleanup/2 form ran close(Out) on an unbound variable
    % whenever heads_output_stream/2 raised.
    setup_call_cleanup(
        heads_output_stream(Store, Out),
        true,
        close(Out)),
    fail.
%   heads_file(+Store, -HeadsFile)
%
%   HeadsFile is Store/ref/head.  Both directories are created if they
%   do not exist.

heads_file(Store, HeadsFile) :-
    ensure_directory(Store),
    directory_file_path(Store, ref, Refs),
    ensure_directory(Refs),
    directory_file_path(Refs, head, HeadsFile).
674
678
%!  restore_heads_from_remote(+Store)
%
%   Load the heads from the heads file if it exists and can be
%   restored.  Otherwise rebuild the heads by scanning the objects and
%   write a fresh heads file.

restore_heads_from_remote(Store) :-
    heads_file(Store, HeadsFile),
    exists_file(HeadsFile),
    setup_call_cleanup(
        open(HeadsFile, read, Stream, [encoding(utf8)]),
        restore_heads(Store, Stream),
        close(Stream)),
    !,
    get_time(ScanTime),
    assertz(store(Store, ScanTime)).
restore_heads_from_remote(Store) :-
    read_heads_from_objects(Store),
    heads_file(Store, HeadsFile),
    setup_call_cleanup(
        open(HeadsFile, write, Stream, [encoding(utf8)]),
        save_heads(Store, Stream),
        close(Stream)),
    !.
697
%   restore_heads(+Store, +In)
%
%   Replay a heads file.  Its first term must be epoch(_); the remaining
%   terms are head(File, Old, Hash) transitions.

restore_heads(Store, In) :-
    read(In, Header),
    Header = epoch(_),
    read(In, First),
    restore_heads(First, In, Store).

%   restore_heads(+Term, +In, +Store)
%
%   Worker: for each head/3 term, make Hash the only head of File.
%   Fails on any term that is neither head/3 nor end_of_file.

restore_heads(end_of_file, _, _) :- !.
restore_heads(head(File, _, Hash), In, Store) :-
    retractall(head(Store, File, _, _)),
    assert_head(Store, File, Hash),
    read(In, Next),
    restore_heads(Next, In, Store).
710
%   save_heads(+Store, +Out)
%
%   Write an epoch(Time) header and then one head(File, -, Hash) term
%   for every known head of Store.

save_heads(Store, Out) :-
    get_time(Stamp),
    format(Out, 'epoch(~0f).~n~n', [Stamp]),
    forall(head(Store, Name, _, Commit),
           format(Out, '~q.~n', [head(Name, -, Commit)])).
716
717
723
%!  delete_head(+Store, +Head) is det.
%
%   Remove the head entry for file Head, either from Redis or from the
%   in-memory head/4 table.

delete_head(Store, Head) :-
    (   redis_db(Store, _, _, _)
    ->  redis_delete_head(Store, Head)
    ;   retractall(head(Store, Head, _, _))
    ).
730
734
%!  set_head(+Store, +File, +Hash) is det.
%
%   Make Hash the head of File without checking the previous head.  In
%   the in-memory case the new fact is added before the old one is
%   removed.

set_head(Store, File, Hash) :-
    redis_db(Store, _, _, _),
    !,
    redis_set_head(Store, File, Hash).
set_head(Store, File, Hash) :-
    file_name_extension(_, Ext, File),
    (   head(Store, File, _, Current)
    ->  (   Hash == Current
        ->  true                        % already at this commit
        ;   asserta(head(Store, File, Ext, Hash)),
            retractall(head(Store, File, _, Current))
        )
    ;   asserta(head(Store, File, Ext, Hash))
    ).
749
750
751 754
762
%   pack_version(-Version)
%
%   Version number written into the gitty(Version) signature of a pack.

pack_version(1).
764
777
:- debug(gitty(pack)).

%!  repack_objects(+Store, +Options) is det.
%
%   Pack the plain object files of Store, merging existing small packs
%   into the new pack.  Options:
%
%     - min_files(+Count)
%       Do nothing if there are fewer plain objects.  Default 1,000.
%     - small_pack(+Bytes)
%       Existing packs smaller than this are merged.  Default 10 MB.

repack_objects(Store, Options) :-
    option(min_files(MinFiles), Options, 1_000),
    findall(Hash, gitty_file_object(Store, Hash), Loose),
    length(Loose, LooseCount),
    debug(gitty(pack), 'Found ~D file objects', [LooseCount]),
    (   LooseCount >= MinFiles
    ->  pack_files(Store, AllPacks),
        option(small_pack(MaxSize), Options, 10_000_000),
        include(small_file(MaxSize), AllPacks, SmallPacks),
        (   debugging(gitty(pack))
        ->  length(SmallPacks, SmallCount),
            debug(gitty(pack), 'Found ~D small packs', [SmallCount])
        ;   true
        ),
        directory_file_path(Store, pack, PackDir),
        make_directory_path(PackDir),
        pack_objects(Store, Loose, SmallPacks, PackDir, _PackFile, Options)
    ;   debug(gitty(pack), 'Nothing to do', [])
    ).
799
%   small_file(+MaxSize, +File) is semidet.
%
%   True if File is smaller than MaxSize bytes.

small_file(MaxSize, File) :-
    size_file(File, Bytes),
    Bytes < MaxSize.
803
808
%!  pack_objects(+Store, +Objects, +Packs, +PackDir, -PackFile, +Options)
%
%   Pack Objects (hashes of plain object files) and the existing pack
%   files Packs into one new pack in PackDir.  The work is done under
%   the `gitty_pack` mutex by pack_objects_sync/6.

pack_objects(Store, Objects, Packs, PackDir, PackFile, Options) :-
    with_mutex(gitty_pack,
               pack_objects_sync(Store, Objects, Packs, PackDir,
                                 PackFile, Options)).
813
%   pack_objects_sync(+Store, +Objects, +Packs, +PackDir, -PackFile, +Options)
%
%   Implementation of pack_objects/6, run with the `gitty_pack` mutex
%   held.  The pack is written to a temporary file in PackDir, renamed
%   to `pack-<SHA1>.pack` (SHA1 of its content) and attached.  Then the
%   packed plain files and the merged packs are removed.  PackFile is
%   the path of the resulting pack.

pack_objects_sync(_Store, [], [Pack], _, Pack, _) :-
    % No loose objects and a single pack: nothing to do.  The existing
    % pack is the result.  It is returned as a path, like the clause
    % below does; this clause used to return the list [Pack].
    !.
pack_objects_sync(Store, Objects, Packs, PackDir, PackFilePath, Options) :-
    length(Objects, ObjCount),
    length(Packs, PackCount),
    debug(gitty(pack), 'Repacking ~D objects and ~D packs',
          [ObjCount, PackCount]),
    maplist(object_info(Store), Objects, FileInfo),
    maplist(pack_info(Store), Packs, PackInfo),
    append([FileInfo|PackInfo], Info0),
    sort(1, @<, Info0, Info),           % drops duplicate object hashes
    (   debugging(gitty(pack))
    ->  (   PackCount > 0
        ->  length(Info, FinalObjCount),
            debug(gitty(pack), 'Total ~D objects', [FinalObjCount])
        ;   true
        )
    ;   true
    ),
    directory_file_path(PackDir, 'pack-create', TmpPack),
    setup_call_cleanup(
        (   open(TmpPack, write, Out0, [type(binary), lock(write)]),
            open_hash_stream(Out0, Out, [algorithm(sha1)])
        ),
        (   write_signature(Out),
            maplist(write_header(Out), Info),
            format(Out, 'end_of_header.~n', []),
            maplist(add_file(Out, Store), Info),
            stream_hash(Out, SHA1)
        ),
        close(Out)),
    format(atom(PackFile), 'pack-~w.pack', [SHA1]),
    directory_file_path(PackDir, PackFile, PackFilePath),
    rename_file(TmpPack, PackFilePath),
    debug(gitty(pack), 'Attaching ~p', [PackFilePath]),
    attach_pack(Store, PackFilePath),
    remove_objects_after_pack(Store, Objects, Options),
    % Never delete the pack we just produced, even if its name equals
    % that of one of the input packs.
    delete(Packs, PackFilePath, RmPacks),
    remove_repacked_packs(Store, RmPacks, Options),
    debug(gitty(pack), 'Packing completed', []).
854
%   object_info(+Store, +Object, -Info)
%
%   Info is obj(Object, Type, Size, FileSize).  Type and Size come from
%   the object header and FileSize is the size of the compressed file.

object_info(Store, Object, obj(Object, Type, Size, FileSize)) :-
    gitty_object_file(Store, Object, ObjectFile),
    load_object_header(Store, Object, Type, Size),
    size_file(ObjectFile, FileSize).

%   pack_info(+Store, +PackFile, -Objects)
%
%   Attach PackFile if needed; Objects is the list of obj/4 terms from
%   its header.

pack_info(Store, PackFile, Objects) :-
    attach_pack(Store, PackFile),
    pack_read_header(PackFile, _Version, _DataOffset, Objects).
863
%   write_signature(+Out)
%
%   Write the first term of a pack: gitty(Version).

write_signature(Out) :-
    pack_version(V),
    format(Out, "gitty(~d).~n", [V]).
867
%   write_header(+Out, +ObjInfo)
%
%   Write one index entry of the pack header as a readable Prolog term
%   obj(Object, Type, Size, FileSize).  Called through maplist/2 from
%   pack_objects_sync/6.

write_header(Out, obj(Object, Type, Size, FileSize)) :-
    format(Out, 'obj(~q,~q,~d,~d).~n', [Object, Type, Size, FileSize]).
870
874
%   add_file(+Out, +Store, +ObjInfo)
%
%   Append the compressed bytes of one object to the pack being
%   written.  The plain object file is used if it exists; otherwise
%   FileSize bytes are copied from the pack that currently holds it.

add_file(Out, Store, obj(Object, _Type, _Size, _FileSize)) :-
    gitty_object_file(Store, Object, ObjectFile),
    exists_file(ObjectFile),
    !,
    setup_call_cleanup(
        open(ObjectFile, read, Src, [type(binary)]),
        copy_stream_data(Src, Out),
        close(Src)).
add_file(Out, Store, obj(Object, Type, Size, FileSize)) :-
    pack_object(Object, Type, Size, Offset, Store, OldPack),
    setup_call_cleanup(
        open(OldPack, read, Src, [type(binary)]),
        (   seek(Src, Offset, bof, Offset),
            copy_stream_data(Src, Out, FileSize)
        ),
        close(Src)).
891
892
896
%!  gitty_fsck(+Store) is det.
%
%   Check every pack file of Store using fsck_pack/1.

gitty_fsck(Store) :-
    pack_files(Store, Packs),
    maplist(fsck_pack, Packs).
900
904
%!  fsck_pack(+File) is det.
%
%   Check one pack: first the SHA1 encoded in its file name, then every
%   object in it.

fsck_pack(PackFile) :-
    debug(gitty(pack), 'fsck ~p', [PackFile]),
    check_pack_hash(PackFile),
    debug(gitty(pack), 'fsck ~p: checking objects', [PackFile]),
    check_pack_objects(PackFile),
    debug(gitty(pack), 'fsck ~p: done', [PackFile]).
911
%   check_pack_hash(+File)
%
%   Assert that the SHA1 of the content of File equals the hash in its
%   name, `pack-<SHA1>.pack`.
%
%   @error type/domain error from must_be/2 if the extension is not
%          `pack`.

check_pack_hash(File) :-
    file_base_name(File, BaseName),
    file_name_extension(Stem, Ext, BaseName),
    must_be(oneof([pack]), Ext),
    atom_concat('pack-', NameHash, Stem),
    setup_call_cleanup(
        (   open(File, read, Raw, [type(binary)]),
            open_hash_stream(Raw, Hashed, [algorithm(sha1)])
        ),
        (   % read everything through the hash stream, discarding it
            setup_call_cleanup(
                open_null_stream(Sink),
                copy_stream_data(Hashed, Sink),
                close(Sink)),
            stream_hash(Hashed, SHA1)
        ),
        close(Hashed)),
    assertion(NameHash == SHA1).
929
%   check_pack_objects(+PackFile)
%
%   Read the pack header and check each listed object in order,
%   threading the expected file offset through check_object/6.

check_pack_objects(PackFile) :-
    setup_call_cleanup(
        open(PackFile, read, Stream, [type(binary)]),
        (   read_header(Stream, Version, DataStart, Index),
            set_stream(Stream, encoding(utf8)),
            foldl(check_object(Stream, PackFile, Version), Index, DataStart, _)
        ),
        close(Stream)).
938
%   check_object(+In, +PackFile, +Version, +ObjInfo, +Offset0, -Offset)
%
%   Check the object that starts at Offset0 and occupies FileSize
%   bytes.  It repositions In, with a warning, if In is not at Offset0.
%   The object is decompressed and read; a read error is printed and
%   ends the check of this object.  Otherwise the end position is
%   compared with Offset, the header values are asserted to match the
%   index, and the gitty:check_object/4 hook is called.

check_object(In, PackFile, _Version,
             obj(Object, Type, Size, FileSize),
             Offset0, Offset) :-
    Offset is Offset0+FileSize,
    byte_count(In, Here),
    (   Here == Offset0
    ->  true
    ;   print_message(warning, pack(reposition(Here, Offset0))),
        seek(In, Offset0, bof, Offset0)
    ),
    (   setup_call_cleanup(
            zopen(In, ZIn, [multi_part(false), close_parent(false)]),
            catch(read_object(ZIn, Data, _0RType, _0RSize), Error,
                  (   print_message(error,
                                    gitty(PackFile, fsck(read_object(Object, Error)))),
                      fail)),
            close(ZIn))
    ->  byte_count(In, End),
        (   End == Offset
        ->  true
        ;   print_message(error,
                          gitty(PackFile, fsck(object_end(Object, End,
                                                          Offset0, Offset,
                                                          Data))))
        ),
        assertion(Type == _0RType),
        assertion(Size == _0RSize),
        gitty:check_object(Object, Data, Type, Size)
    ;   true
    ).
969
970
974
%!  gitty_attach_packs(+Store) is det.
%
%   Attach all pack files of Store, once.  The attached_packs/1 flag is
%   checked before taking the `gitty_attach_packs` mutex and again by
%   gitty_attach_packs_sync/1 while holding it.

gitty_attach_packs(Store) :-
    (   attached_packs(Store)
    ->  true
    ;   with_mutex(gitty_attach_packs,
                   gitty_attach_packs_sync(Store))
    ).

gitty_attach_packs_sync(Store) :-
    (   attached_packs(Store)
    ->  true
    ;   pack_files(Store, Packs),
        maplist(attach_pack(Store), Packs),
        asserta(attached_packs(Store))
    ).
989
%   pack_files(+Store, -Packs)
%
%   Packs are the paths of the `*.pack` files in Store/pack, or [] if
%   that directory does not exist.

pack_files(Store, Packs) :-
    directory_file_path(Store, pack, PackDir),
    (   exists_directory(PackDir)
    ->  directory_files(PackDir, Entries),
        convlist(is_pack(PackDir), Entries, Packs)
    ;   Packs = []
    ).

%   is_pack(+PackDir, +File, -Path) is semidet.
%
%   True if File has extension `pack`; Path is its path in PackDir.

is_pack(PackDir, File, Path) :-
    file_name_extension(_, pack, File),
    directory_file_path(PackDir, File, Path).
1001
1005
%!  attach_pack(+Store, +PackFile) is det.
%
%   Load the index of PackFile into pack_object/6, unless the pack is
%   already attached to Store.  Stale index entries for the same
%   PackFile are removed first.

attach_pack(Store, PackFile) :-
    (   attached_pack(Store, PackFile)
    ->  true
    ;   retractall(pack_object(_,_,_,_,_,PackFile)),
        pack_read_header(PackFile, Version, DataStart, Index),
        foldl(assert_object(Store, PackFile, Version), Index, DataStart, _),
        assertz(attached_pack(Store, PackFile))
    ).
1014
%!  pack_read_header(+PackFile, -Version, -DataOffset, -Objects) is det.
%
%   Open PackFile in binary mode and read its header.  Version comes
%   from the gitty(Version) signature, Objects is the list of obj/4
%   index terms and DataOffset is the byte offset at which the object
%   data starts.

pack_read_header(PackFile, Version, DataOffset, Objects) :-
    setup_call_cleanup(
        open(PackFile, read, In, [type(binary)]),
        read_header(In, Version, DataOffset, Objects),
        close(In)).
1020
%   read_header(+In, -Version, -DataOffset, -Objects)
%
%   Read the pack header from In: the gitty(Version) signature, the
%   obj/4 index terms up to `end_of_header`, and the newline after it.
%   DataOffset is the byte count of In after that newline.
%
%   @error domain_error(gitty_pack_file, Signature) if the first term
%          is not gitty(Version).

read_header(In, Version, DataOffset, Objects) :-
    read(In, Signature),
    (   Signature = gitty(Version)
    ->  true
    ;   % Report the term that was actually read.  Objects is still
    %   unbound at this point and says nothing about the problem.
        domain_error(gitty_pack_file, Signature)
    ),
    read(In, Term),
    read_index(Term, In, Objects),
    get_code(In, Code),
    assertion(Code == 0'\n),
    byte_count(In, DataOffset).
1032
%   read_index(+Term, +In, -Objects)
%
%   Collect the index terms up to, but excluding, `end_of_header`.
%   Term is the term that was read most recently.

read_index(end_of_header, _, []) :-
    !.
read_index(Entry, In, [Entry|Rest]) :-
    read(In, Next),
    read_index(Next, In, Rest).
1038
%   assert_object(+Store, +Pack, +Version, +ObjInfo, +Offset0, -Offset)
%
%   foldl/4 step: record that the object starts at Offset0 in Pack and
%   compute where the next object starts.

assert_object(Store, Pack, _Version,
              obj(Object, Type, Size, FileSize),
              Start, Next) :-
    Next is Start+FileSize,
    assertz(pack_object(Object, Type, Size, Start, Store, Pack)).
1044
1048
%!  detach_pack(+Store, +Pack) is det.
%
%   Forget Pack: remove its index entries and its attached marker.

detach_pack(Store, Pack) :-
    retractall(pack_object(_Hash, _Type, _Size, _Offset, Store, Pack)),
    retractall(attached_pack(Store, Pack)).
1052
1056
%!  load_object_from_pack(+Hash, -Data, -Type, -Size) is semidet.
%
%   Load Hash from a pack listed in pack_object/6.  The lookup does not
%   restrict the store.

load_object_from_pack(Hash, Data, Type, Size) :-
    pack_object(Hash, Type, Size, Offset, _Store, PackFile),
    setup_call_cleanup(
        open(PackFile, read, Stream, [type(binary)]),
        read_object_at(Stream, Offset, Data, Type, Size),
        close(Stream)).
1063
%   read_object_at(+In, +Offset, -Data, -Type, -Size)
%
%   Seek to Offset, counted from the start of In, and read the object
%   stored there.

read_object_at(In, Offset, Data, Type, Size) :-
    seek(In, Offset, bof, Offset),
    read_object_here(In, Data, Type, Size).

%   read_object_here(+In, -Data, -Type, -Size)
%
%   Read one compressed object at the current position.  In is switched
%   to UTF-8 for the read, and its previous encoding is restored when
%   the decompression stream is closed.

read_object_here(In, Data, Type, Size) :-
    stream_property(In, encoding(SavedEnc)),
    setup_call_cleanup(
        (   set_stream(In, encoding(utf8)),
            zopen(In, ZIn, [multi_part(false), close_parent(false)])
        ),
        read_object(ZIn, Data, Type, Size),
        (   close(ZIn),
            set_stream(In, encoding(SavedEnc))
        )).
1078
1079
1083
%!  unpack_packs(+Store) is det.
%
%   Unpack every pack of Store into plain object files.  Store is first
%   resolved to an absolute, readable directory.

unpack_packs(Store) :-
    absolute_file_name(Store, StoreDir, [file_type(directory),
                                         access(read)]),
    pack_files(StoreDir, Packs),
    maplist(unpack_pack(StoreDir), Packs).
1089
1093
%!  unpack_pack(+Store, +PackFile) is det.
%
%   Write each object of PackFile as a plain object file, then detach
%   the pack and delete it.

unpack_pack(Store, PackFile) :-
    pack_read_header(PackFile, _Version, DataStart, Index),
    setup_call_cleanup(
        open(PackFile, read, Stream, [type(binary)]),
        foldl(create_file(Store, Stream), Index, DataStart, _),
        close(Stream)),
    detach_pack(Store, PackFile),
    delete_file(PackFile).
1102
%   create_file(+Store, +In, +ObjInfo, +Offset0, -Offset)
%
%   foldl/4 step of unpack_pack/2: copy the FileSize bytes at Offset0
%   of In to the plain object file, unless that file already exists.

create_file(Store, In, obj(Object, _Type, _Size, FileSize), Offset0, Offset) :-
    Offset is Offset0+FileSize,
    gitty_object_file(Store, Object, ObjectFile),
    with_mutex(gitty_file, exists_or_recreate(ObjectFile, Dest)),
    (   var(Dest)                       % file exists: keep it
    ->  true
    ;   setup_call_cleanup(
            seek(In, Offset0, bof, Offset0),
            copy_stream_data(In, Dest, FileSize),
            close(Dest))
    ).
1114
%   exists_or_recreate(+Path, -Out)
%
%   If Path exists, succeed and leave Out unbound.  Otherwise create the
%   parent directories and open Path for binary writing with a write
%   lock.

exists_or_recreate(Path, Out) :-
    (   exists_file(Path)
    ->  true
    ;   file_directory_name(Path, ParentDir),
        make_directory_path(ParentDir),
        open(Path, write, Out, [type(binary), lock(write)])
    ).
1122
1123
1127
%!  remove_objects_after_pack(+Store, +Objects, +Options) is det.
%
%   Delete the plain files of the packed Objects.  The directories left
%   empty are pruned unless Options holds
%   prune_empty_directories(false).

remove_objects_after_pack(Store, Objects, Options) :-
    debug(gitty(pack), 'Deleting plain files', []),
    maplist(delete_object(Store), Objects),
    (   option(prune_empty_directories(true), Options, true)
    ->  debug(gitty(pack), 'Pruning empty directories', []),
        prune_empty_directories(Store)
    ;   true
    ).
1136
1140
%!  remove_repacked_packs(+Store, +Packs, +Options)
%
%   Detach and delete every pack in Packs.  Options is passed through
%   to remove_pack/3, which does not use it.

remove_repacked_packs(Store, Packs, Options) :-
    maplist(remove_pack(Store, Options), Packs).

remove_pack(Store, _Options, PackFile) :-
    detach_pack(Store, PackFile),
    delete_file(PackFile).
1147
1152
%!  prune_empty_directories(+Dir) is det.
%
%   Recursively delete empty directories below Dir.  Dir itself is at
%   level 0 and is never deleted.

prune_empty_directories(Dir) :-
    prune_empty_directories(Dir, 0).

%   prune_empty_directories(+Dir, +Level)
%
%   Delete Dir if Level > 0 and it has no entries.  Otherwise prune its
%   sub directories and delete Dir if Level > 0 and no plain entries
%   are left.  Deletion goes through delete_directory_async/1.

prune_empty_directories(Dir, Level) :-
    directory_files(Dir, Entries),
    exclude(hidden, Entries, Visible),
    (   Visible == [],
        Level > 0
    ->  delete_directory_async(Dir)
    ;   convlist(prune_empty_directories(Dir, Level), Visible, Remaining),
        (   Remaining == [],
            Level > 0
        ->  delete_directory_async(Dir)
        ;   true
        )
    ).

%   hidden(?Entry): the directory entries that are always skipped.

hidden(.).
hidden(..).

%   prune_empty_directories(+Parent, +Level0, +File, -Left)
%
%   convlist/3 helper.  A sub directory is pruned recursively and then
%   dropped from the result by failing; any other entry is kept.

prune_empty_directories(Parent, Level0, File, _) :-
    directory_file_path(Parent, File, Path),
    exists_directory(Path),
    !,
    Level is Level0 + 1,
    prune_empty_directories(Path, Level),
    fail.
prune_empty_directories(_, _, File, File).
1181
%   delete_directory_async(+Dir)
%
%   Delete Dir while holding the `gitty_file` mutex, which is also the
%   mutex under which object files are created.

delete_directory_async(Dir) :-
    with_mutex(gitty_file, delete_directory_async2(Dir)).

%   delete_directory_async2(+Dir)
%
%   Delete Dir.  An error is ignored if Dir is already gone or is no
%   longer empty; any other error is re-thrown.

delete_directory_async2(Dir) :-
    catch(delete_directory(Dir), Error,
          (   \+ exists_directory(Dir)
          ->  true
          ;   \+ empty_directory(Dir)
          ->  true
          ;   throw(Error)
          )).

%   empty_directory(+Dir) is semidet.
%
%   True if Dir holds nothing but `.` and `..`.

empty_directory(Dir) :-
    directory_files(Dir, Entries),
    exclude(hidden, Entries, []).
1197
1198
1199 1202
%   redis_head_db(+Store, -DB, -Key)
%
%   DB is the read/write Redis server of Store.  Key is the name of the
%   hash that holds the heads: "<Prefix>:gitty:head".

redis_head_db(Store, DB, Key) :-
    redis_db(Store, DB, _ReadOnly, Prefix),
    string_concat(Prefix, ":gitty:head", Key).

%   redis_head_db_ro(+Store, -DB, -Key)
%
%   The same, but DB is the server registered for read-only access.

redis_head_db_ro(Store, DB, Key) :-
    redis_db(Store, _ReadWrite, DB, Prefix),
    string_concat(Prefix, ":gitty:head", Key).
1210
1211
1213
%!  redis_file(+Store, ?Name, ?Ext, ?Hash)
%
%   Redis version of the head/4 lookup.  The strategy depends on what
%   is bound: Name (one HGET), Ext (scan with pattern "*.Ext"), Hash
%   (take the name from the commit object), or nothing (enumerate all
%   heads).

redis_file(Store, Name, Ext, Hash) :-
    nonvar(Name),
    !,
    file_name_extension(_Base, Ext, Name),
    redis_head_db_ro(Store, Server, HeadsKey),
    redis(Server, hget(HeadsKey, Name), Hash as atom).
redis_file(Store, Name, Ext, Hash) :-
    nonvar(Ext),
    !,
    string_concat("*.", Ext, Pattern),
    redis_head_db_ro(Store, Server, HeadsKey),
    redis_hscan(Server, HeadsKey, Lazy, [match(Pattern)]),
    member(NameString-HashString, Lazy),
    atom_string(Name, NameString),
    atom_string(Hash, HashString).
redis_file(Store, Name, Ext, Hash) :-
    nonvar(Hash),
    !,
    load_plain_commit(Store, Hash, Commit),
    Name = Commit.name,
    file_name_extension(_Base, Ext, Name).
redis_file(Store, Name, Ext, Hash) :-
    redis_head_db_ro(Store, Server, HeadsKey),
    redis(Server, hgetall(HeadsKey), Pairs as pairs(atom,atom)),
    member(Name-Hash, Pairs),
    file_name_extension(_Base, Ext, Name).
1240
1245
%!  redis_ensure_heads(+Store)
%
%   Make sure the Redis heads hash exists.  If the key is missing, it
%   is filled from a scan of the commit objects.

redis_ensure_heads(Store) :-
    redis_head_db_ro(Store, Server, HeadsKey),
    redis(Server, exists(HeadsKey), 1),
    !.
redis_ensure_heads(Store) :-
    redis_head_db(Store, Server, HeadsKey),
    debug(gitty(redis), 'Initializing gitty heads in ~p ...', [HeadsKey]),
    gitty_scan_latest(Store),
    forall(retract(latest(File, Commit, _Time)),
           redis(Server, hset(HeadsKey, File, Commit))),
    debug(gitty(redis), '... finished gitty heads', []).
1257
1259
%!  redis_update_head(+Store, +Name, +OldCommit, +NewCommit, +DataHash)
%
%   Update the head in Redis and publish the commit and data objects.
%   A new file (OldCommit is `-`) is set without a check; otherwise the
%   head is replaced through redis_hcas/5, conditional on OldCommit.

redis_update_head(Store, Name, -, NewCommit, DataHash) :-
    !,
    redis_head_db(Store, Server, HeadsKey),
    redis(Server, hset(HeadsKey, Name, NewCommit)),
    publish_objects(Store, [NewCommit, DataHash]).
redis_update_head(Store, Name, OldCommit, NewCommit, DataHash) :-
    redis_head_db(Store, Server, HeadsKey),
    redis_hcas(Server, HeadsKey, Name, OldCommit, NewCommit),
    publish_objects(Store, [NewCommit, DataHash]).
1269
1273
%!  redis_delete_head(+Store, +Head) is det.
%
%   Remove the field Head from the Redis heads hash.

redis_delete_head(Store, Head) :-
    redis_head_db(Store, Server, HeadsKey),
    redis(Server, hdel(HeadsKey, Head)).

%!  redis_set_head(+Store, +File, +Hash) is det.
%
%   Set the head of File to Hash in the Redis heads hash, without any
%   check.

redis_set_head(Store, File, Hash) :-
    redis_head_db(Store, Server, HeadsKey),
    redis(Server, hset(HeadsKey, File, Hash)).
1283
1284 1287
1311
% Register the `gitty:replicate` stream with swish_redis, with
% maxlen(100).
:- multifile
    swish_redis:stream/2.

swish_redis:stream('gitty:replicate', [maxlen(100)]).

% Set up replication on the http(pre_server_start(_)) broadcast.
:- listen(http(pre_server_start(_)),
          init_replicator).

%   init_replicator
%
%   Listen to the replication stream and to `swish:gitty` messages, and
%   create the `gitty_queue` message queue that redis_replicate_get/2
%   waits on.

init_replicator :-
    redis_swish_stream('gitty:replicate', StreamKey),
    listen(redis(_Redis, StreamKey, _Id, Data),
           replicate(Data)),
    listen(redis(_, 'swish:gitty', Message),
           gitty_message(Message)),
    message_queue_create(_, [alias(gitty_queue)]).
1327
:- debug(gitty(replicate)).

%   gitty_message(+Message)
%
%   Handle a message received on `swish:gitty`:
%
%     - discover(Hash)
%       Another node asks for Hash.  If we can load it, publish it as
%       object(Hash, Data).
%     - object(Hash, Data)
%       Store the raw object.  If it is new here, send Hash to
%       `gitty_queue` to wake up a waiting redis_replicate_get/2.

gitty_message(discover(Hash)) :-
    debug(gitty(replicate), 'Discover: ~p', [Hash]),
    store(Store, _),
    load_object_raw(Store, Hash, Bytes),
    debug(gitty(replicate), 'Sending object ~p', [Hash]),
    redis(swish, publish(swish:gitty, object(Hash, Bytes) as prolog)).
gitty_message(object(Hash, Bytes)) :-
    debug(gitty(replicate), 'Replicate: ~p', [Hash]),
    redis_db(Store, _DB, _RO, _Prefix),
    store_object_raw(Store, Hash, Bytes, IsNew),
    debug(gitty(replicate), 'Received object ~p (new=~p)', [Hash, IsNew]),
    (   IsNew == true
    ->  thread_send_message(gitty_queue, Hash)
    ;   true
    ).
1345
1356
%!  redis_replicate_get(+Store, +Hash) is semidet.
%
%   Ask the other nodes for object Hash by publishing discover(Hash).
%   It fails immediately unless the publish reached more than one
%   subscriber.  It then polls up to 100 times, waiting 0.1 seconds per
%   round for Hash on `gitty_queue`, and succeeds as soon as the object
%   has arrived.  After each unsuccessful round restart_pubsub/0 is
%   called.

redis_replicate_get(Store, Hash) :-
    is_gitty_hash(Hash),
    redis(swish, publish(swish:gitty, discover(Hash) as prolog), Receivers),
    Receivers > 1,
    between(1, 100, _),
    (   thread_get_message(gitty_queue, Hash,
                           [ timeout(0.1)
                           ])
    ->  !
    ;   has_object(Store, Hash)
    ->  !
    ;   restart_pubsub,
        fail
    ).
1371
% restarted(Time): when the pubsub thread was last signalled.
:- dynamic
    restarted/1.

%   restart_pubsub
%
%   Signal the `redis_pubsub` thread with an I/O error and pause
%   briefly.  This is done at most once per 60 seconds; calls inside
%   that window succeed without doing anything.

restart_pubsub :-
    (   restarted(When)
    ->  get_time(Now),
        Now-When < 60,
        !
    ).
restart_pubsub :-
    get_time(Stamp),
    transaction(( retractall(restarted(_)),
                  asserta(restarted(Stamp)))),
    thread_signal(redis_pubsub, throw(error(io_error(read, _),_))),
    sleep(0.05).
1387
1388
1389
1402
%!  publish_objects(+Store, +Hashes)
%
%   Send the raw representation of each object in Hashes to the
%   `gitty:replicate` stream.

publish_objects(Store, Hashes) :-
    redis_swish_stream('gitty:replicate', StreamKey),
    maplist(publish_object(Store, StreamKey), Hashes).

%   publish_object(+Store, +Stream, +Hash)
%
%   Add a message with keys `hash` and `data` to Stream.

publish_object(Store, Stream, Hash) :-
    load_object_raw(Store, Hash, Bytes),
    debug(gitty(replicate), 'Sending ~p to ~p', [Hash, Stream]),
    xadd(swish, Stream, _, _{hash:Hash, data:Bytes}).
1411
1417
%!  replicate(+Data:dict) is det.
%
%   Handle a message from the replication stream.  Data is a dict with
%   keys `hash` and `data`; the object is stored in its raw form unless
%   we already have it.

replicate(Data) :-
    redis_db(Store, _DB, _RO, _Prefix),
    atom_string(Hash, Data.hash),
    store_object_raw(Store, Hash, Data.data, _0New),
    debug(gitty(replicate), 'Received object ~p (new=~p)',
          [Hash, _0New]).
1424
1425
1426 1429
1433
1434redis_hcas(DB, Hash, Key, Old, New) :-
1435 redis(DB, eval("if redis.call('HGET', KEYS[1], ARGV[1]) == ARGV[2] then \c
1436 redis.call('HSET', KEYS[1], ARGV[1], ARGV[3]); \c
1437 return 1; \c
1438 end; \c
1439 return 0\c
1440 ",
1441 1, Hash, Key, Old, New),
1442 1)