1/* Part of SWISH 2 3 Author: Jan Wielemaker 4 E-mail: J.Wielemaker@vu.nl 5 WWW: http://www.swi-prolog.org 6 Copyright (c) 2015-2020, VU University Amsterdam 7 CWI Amsterdam 8 SWI-Prolog Solutions b.v. 9 All rights reserved. 10 11 Redistribution and use in source and binary forms, with or without 12 modification, are permitted provided that the following conditions 13 are met: 14 15 1. Redistributions of source code must retain the above copyright 16 notice, this list of conditions and the following disclaimer. 17 18 2. Redistributions in binary form must reproduce the above copyright 19 notice, this list of conditions and the following disclaimer in 20 the documentation and/or other materials provided with the 21 distribution. 22 23 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 24 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 25 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 26 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 27 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 28 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 29 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 30 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 31 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 32 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 33 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 34 POSSIBILITY OF SUCH DAMAGE. 
35*/ 36 37:- module(gitty_driver_files, 38 [ gitty_open/2, % +Store, +Options 39 gitty_close/1, % +Store 40 gitty_file/4, % +Store, ?Name, ?Ext, ?Hash 41 42 gitty_update_head/5, % +Store, +Name, +OldCommit, +NewCommit 43 % +DataHash 44 delete_head/2, % +Store, +Name 45 set_head/3, % +Store, +Name, +Hash 46 store_object/4, % +Store, +Hash, +Header, +Data 47 delete_object/2, % +Store, +Hash 48 49 gitty_hash/2, % +Store, ?Hash 50 load_plain_commit/3, % +Store, +Hash, -Meta 51 load_object/5, % +Store, +Hash, -Data, -Type, -Size 52 gitty_object_file/3, % +Store, +Hash, -File 53 54 repack_objects/2, % +Store, +Options 55 pack_objects/6, % +Store, +Objs, +Packs, +PackDir, 56 % -File, +Opts 57 unpack_packs/1, % +Store 58 unpack_pack/2, % +Store, +PackFile 59 60 attach_pack/2, % +Store, +PackFile 61 gitty_fsck/1, % +Store 62 fsck_pack/1, % +PackFile 63 load_object_from_pack/4, % +Hash, -Data, -Type, -Size 64 65 gitty_rescan/1 % Store 66 ]). 67:- use_module(library(apply)). 68:- use_module(library(zlib)). 69:- use_module(library(filesex)). 70:- use_module(library(lists)). 71:- use_module(library(apply)). 72:- use_module(library(error)). 73:- use_module(library(debug)). 74:- use_module(library(zlib)). 75:- use_module(library(hash_stream)). 76:- use_module(library(option)). 77:- use_module(library(dcg/basics)). 78:- use_module(library(redis)). 79:- use_module(library(redis_streams)). 80:- use_module(gitty, [is_gitty_hash/1]). 81 82:- use_module(swish_redis).
% In-memory state of the file-based gitty driver.  Apart from
% redis_db/4, which records the Redis settings given to gitty_open/2,
% these are caches keyed by Store.
:- dynamic
    head/4,                         % Store, Name, Ext, Hash
    store/2,                        % Store, Updated
    commit/3,                       % Store, Hash, Meta
    heads_input_stream_cache/2,     % Store, Stream
    pack_object/6,                  % Hash, Type, Size, Offset, Store, PackFile
    attached_packs/1,               % Store
    attached_pack/2,                % Store, PackFile
    redis_db/4.                     % Store, DB, RO, Prefix

% The caches must not end up in a saved state.  Note that redis_db/4
% is intentionally absent from this list.
:- volatile
    head/4,
    store/2,
    commit/3,
    heads_input_stream_cache/2,
    pack_object/6,
    attached_packs/1,
    attached_pack/2.

% Hook called from check_object/6 while checking pack files.
:- multifile
    gitty:check_object/4.

%   remote_sync(?Bool)
%
%   Whether servers that share the same file store keep their heads in
%   sync through a shared heads file.  That needs shared file access,
%   which does not work on Windows, so it is disabled there.

:- if(\+ current_prolog_flag(windows, true)).
remote_sync(true).
:- else.
remote_sync(false).
:- endif.
%!  gitty_open(+Store, +Options) is det.
%
%   Driver specific initialization.  When Options contains redis(DB),
%   the store is bound to that Redis database and the heads are
%   scanned in a detached thread.  Further options processed:
%
%     - redis_ro(+DB)
%       Read-only Redis DB.  Default is the value of redis(DB).
%     - redis_prefix(+Prefix)
%       Prefix for all keys.  Default is `swish`.

gitty_open(Store, Options) :-
    (   option(redis(WriteDB), Options)
    ->  option(redis_ro(ReadDB), Options, WriteDB),
        option(redis_prefix(KeyPrefix), Options, swish),
        asserta(redis_db(Store, WriteDB, ReadDB, KeyPrefix)),
        thread_create(gitty_scan(Store), _Thread, [detached(true)])
    ;   true
    ).
%!  gitty_close(+Store) is det.
%
%   Release the in-memory state for Store: close and forget the cached
%   heads input stream, and remove the head/4 and store/2 facts, the
%   pack_object/6 index and the pack attachment bookkeeping.
%
%   attached_pack/2 and attached_packs/1 must be removed together with
%   pack_object/6.  Otherwise gitty_attach_packs/1 still finds
%   attached_packs(Store) after the store is reopened, skips attaching
%   the packs, and every packed object becomes invisible.

gitty_close(Store) :-
    (   retract(heads_input_stream_cache(Store, In))
    ->  close(In)
    ;   true
    ),
    retractall(head(Store,_,_,_)),
    retractall(store(Store,_)),
    retractall(pack_object(_,_,_,_,Store,_)),
    retractall(attached_pack(Store,_)),
    retractall(attached_packs(Store)).
%!  gitty_file(+Store, ?Name, ?Ext, ?Hash) is nondet.
%
%   True when Hash is the head commit of the file Name with extension
%   Ext.  The store is scanned first.  Heads are taken from Redis if
%   Store is bound to a Redis database and from head/4 otherwise.

gitty_file(Store, Head, Ext, Hash) :-
    (   redis_db(Store, _, _, _)
    ->  gitty_scan(Store),
        redis_file(Store, Head, Ext, Hash)
    ;   gitty_scan(Store),
        head(Store, Head, Ext, Hash)
    ).
%!  load_plain_commit(+Store, +Hash, -Meta:dict) is semidet.
%
%   Load the commit object Hash and parse it as a Prolog term.  Parsed
%   commits are cached in commit/3.

load_plain_commit(Store, Hash, Meta) :-
    must_be(atom, Store),
    must_be(atom, Hash),
    (   commit(Store, Hash, Meta)
    ->  true
    ;   load_object(Store, Hash, Text, _, _),
        term_string(Parsed, Text, []),
        with_mutex(gitty_commit_cache,
                   assert_cached_commit(Store, Hash, Parsed)),
        Meta = Parsed
    ).

%   assert_cached_commit(+Store, +Hash, +Meta)
%
%   Add Meta to the commit cache unless an entry already exists.  If it
%   does, the cached and the new term must be variants.

assert_cached_commit(Store, Hash, Meta) :-
    (   commit(Store, Hash, Known)
    ->  assertion(Known =@= Meta)
    ;   assertz(commit(Store, Hash, Meta))
    ).
%!  store_object(+Store, +Hash, +Header, +Data) is det.
%
%   Store an object as a gzip-compressed loose object file.  Nothing is
%   done if the object is in a pack or a file for Hash already exists.
%
%   If writing raises an exception, the partially written file is
%   deleted before the exception is re-raised.  Otherwise a truncated
%   file would remain.  Because the mere existence of the file counts
%   as "stored", that file would never be rewritten.

store_object(Store, Hash, _Hdr, _Data) :-
    pack_object(Hash, _Type, _Size, _Offset, Store, _Pack),
    !.
store_object(Store, Hash, Hdr, Data) :-
    gitty_object_file(Store, Hash, Path),
    with_mutex(gitty_file, exists_or_create(Path, Out0)),
    (   var(Out0)
    ->  true
    ;   catch(write_object_data(Out0, Hdr, Data),
              Error,
              ( catch(delete_file(Path), _, true),
                throw(Error)
              ))
    ).

%   write_object_data(+Out0, +Hdr, +Data) is det.
%
%   Write Hdr followed by Data, gzip-compressed, to Out0.  The
%   compressed stream does not own Out0 (close_parent(false)), so Out0
%   is closed by the outer cleanup even if zopen/3 itself fails.

write_object_data(Out0, Hdr, Data) :-
    call_cleanup(
        setup_call_cleanup(
            zopen(Out0, Out, [format(gzip), close_parent(false)]),
            format(Out, '~s~s', [Hdr, Data]),
            close(Out)),
        close(Out0)).

%   exists_or_create(+Path, -Out) is det.
%
%   Leave Out unbound if Path exists.  Otherwise create its directory
%   and open Path for writing (UTF-8, write-locked), returning the
%   stream in Out.

exists_or_create(Path, _Out) :-
    exists_file(Path),
    !.
exists_or_create(Path, Out) :-
    file_directory_name(Path, Dir),
    make_directory_path(Dir),
    open(Path, write, Out, [encoding(utf8), lock(write)]).

:- if(\+current_predicate(ensure_directory/1)).
% in Library as of SWI-Prolog 9.1.20
ensure_directory(Dir) :-
    exists_directory(Dir),
    !.
ensure_directory(Dir) :-
    make_directory(Dir).
:- endif.
%!  store_object_raw(+Store, +Hash, +Bytes, -New) is det.
%
%   Store an object from its raw (already compressed) bytes.  New is
%   `true` if a new file was written and `false` if the object was
%   already packed or already present as a file.

store_object_raw(Store, Hash, _Bytes, false) :-
    pack_object(Hash, _, _, _, Store, _),
    !.
store_object_raw(Store, Hash, Bytes, New) :-
    gitty_object_file(Store, Hash, Path),
    with_mutex(gitty_file, exists_or_create(Path, Out)),
    (   nonvar(Out)
    ->  call_cleanup(
            ( set_stream(Out, type(binary)),
              write(Out, Bytes)
            ),
            close(Out)),
        New = true
    ;   New = false
    ).
%!  load_object(+Store, +Hash, -Data, -Type, -Size) is semidet.
%
%   Load an object.  Packs are tried first, then loose files.  For a
%   Redis-bound store, as a last resort the object is requested from
%   other nodes (redis_replicate_get/2) and read from the file again.

load_object(Store, Hash, Data, Type, Size) :-
    (   load_object_from_pack(Hash, D, T, S)
    ->  true
    ;   load_object_file(Store, Hash, D, T, S)
    ->  true
    ;   redis_db(Store, _, _, _),
        redis_replicate_get(Store, Hash),
        load_object_file(Store, Hash, D, T, S)
    ),
    Data = D,
    Type = T,
    Size = S.

%   load_object_file(+Store, +Hash, -Data, -Type, -Size) is semidet.
%
%   Read the loose object file for Hash; fails if it does not exist.

load_object_file(Store, Hash, Data, Type, Size) :-
    gitty_object_file(Store, Hash, File),
    exists_file(File),
    !,
    setup_call_cleanup(gzopen(File, read, Stream, [encoding(utf8)]),
                       read_object(Stream, Data, Type, Size),
                       close(Stream)).
%!  has_object(+Store, +Hash) is semidet.
%
%   True if Hash is in an attached pack or exists as a loose file.

has_object(Store, Hash) :-
    (   pack_object(Hash, _, _, _, Store, _)
    ->  true
    ;   gitty_object_file(Store, Hash, Path),
        exists_file(Path)
    ).
%!  load_object_raw(+Store, +Hash, -Bytes) is semidet.
%
%   Bytes is the gzip-compressed object as an octet string.  For a
%   loose object these are the file's bytes.  For a packed object they
%   are re-created with object_bytes/4.

load_object_raw(Store, Hash, Bytes) :-
    (   load_object_from_pack(Hash, Data, Type, Size)
    ->  object_bytes(Type, Size, Data, Bytes)
    ;   gitty_object_file(Store, Hash, Path),
        exists_file(Path),
        setup_call_cleanup(
            open(Path, read, In, [type(binary)]),
            read_string(In, _, Bytes),
            close(In))
    ).
%!  object_bytes(+Type, +Size, +Data, -Bytes) is det.
%
%   Bytes is an octet string holding the gzip compression of the text
%   "Type Size\0Data", with Data written as UTF-8.

object_bytes(Type, Size, Data, Bytes) :-
    setup_call_cleanup(
        new_memory_file(MemFile),
        ( write_object_bytes(MemFile, Type, Size, Data),
          memory_file_to_string(MemFile, Bytes, octet)
        ),
        free_memory_file(MemFile)).

%   write_object_bytes(+MemFile, +Type, +Size, +Data)
%
%   Write the compressed object into MemFile.  The gzip stream does not
%   own the raw stream; both are closed here.

write_object_bytes(MemFile, Type, Size, Data) :-
    setup_call_cleanup(
        open_memory_file(MemFile, write, Raw, [encoding(octet)]),
        setup_call_cleanup(
            zopen(Raw, Zipped, [format(gzip), close_parent(false)]),
            ( set_stream(Zipped, encoding(utf8)),
              format(Zipped, '~w ~d\u0000~w', [Type, Size, Data])
            ),
            close(Zipped)),
        close(Raw)).
%!  load_object_header(+Store, +Hash, -Type, -Size) is det.
%
%   Read only the header of the loose object file for Hash.

load_object_header(Store, Hash, Type, Size) :-
    gitty_object_file(Store, Hash, Path),
    setup_call_cleanup(gzopen(Path, read, In, [encoding(utf8)]),
                       read_object_hdr(In, Type, Size),
                       close(In)).

%   read_object(+In, -Data, -Type, -Size) is det.
%
%   Read an object "Type Size\0Data" from In.  Data is the rest of the
%   stream as a string.

read_object(In, Data, Type, Size) :-
    read_object_hdr(In, Type, Size),
    read_string(In, _Length, Data).

read_object_hdr(In, Type, Size) :-
    get_code(In, First),
    read_hdr(First, In, Codes),
    phrase((nonblanks(TypeCodes), " ", integer(Size)), Codes),
    atom_codes(Type, TypeCodes).

%   read_hdr(+Code, +In, -Codes)
%
%   Collect codes up to, but not including, the NUL terminator or end
%   of file (-1).

read_hdr(Code, In, Codes) :-
    (   Code > 0
    ->  Codes = [Code|Rest],
        get_code(In, Next),
        read_hdr(Next, In, Rest)
    ;   Codes = []
    ).
%!  gitty_rescan(+Store) is det.
%
%   Forget that Store has been scanned, so that the next gitty_scan/1
%   does a full scan again.

gitty_rescan(Store) :-
    retractall(store(Store, _ScanTime)).
%!  gitty_scan(+Store) is det.
%
%   Make sure the heads of Store are known.  The first call does the
%   full scan, serialized by the `gitty` mutex.  Later calls only pull
%   head updates from other processes (remote_updates/1).
%
%   @tbd Possibly we need to maintain a cached version of this index to
%   avoid having to open all objects of the gitty store.

gitty_scan(Store) :-
    (   store(Store, _)
    ->  remote_updates(Store)
    ;   with_mutex(gitty, gitty_scan_sync(Store))
    ).

:- thread_local
    latest/3.

%   gitty_scan_sync(+Store)
%
%   Do the scan unless another thread did it in the meantime.  Redis
%   stores initialise the Redis heads.  With remote_sync/1 enabled the
%   heads come from the shared heads file.  Otherwise all commit
%   objects are read.

gitty_scan_sync(Store) :-
    (   store(Store, _)
    ->  true
    ;   redis_db(Store, _, _, _)
    ->  gitty_attach_packs(Store),
        redis_ensure_heads(Store),
        get_time(Now),
        assertz(store(Store, Now))
    ;   remote_sync(true)
    ->  gitty_attach_packs(Store),
        restore_heads_from_remote(Store)
    ;   gitty_attach_packs(Store),
        read_heads_from_objects(Store)
    ).
%!  read_heads_from_objects(+Store) is det.
%
%   Establish the head(Store,File,Ext,Hash) relation by reading all
%   objects and adding a fact for the most recent commit of each file.
%   Then mark Store as scanned.

read_heads_from_objects(Store) :-
    gitty_scan_latest(Store),
    findall(Name-Hash, retract(latest(Name, Hash, _Time)), Latest),
    forall(member(Name-Hash, Latest),
           assert_head(Store, Name, Hash)),
    get_time(Now),
    assertz(store(Store, Now)).

%   assert_head(+Store, +Name, +Hash)
%
%   Add a head/4 fact, deriving the extension from Name.

assert_head(Store, Name, Hash) :-
    file_name_extension(_Base, Extension, Name),
    assertz(head(Store, Name, Extension, Hash)).
%!  gitty_scan_latest(+Store) is det.
%
%   Scan all objects of Store.  For every commit object, record in the
%   thread-local latest/3 the newest commit of each file name.  The
%   head/4 facts of Store and all latest/3 facts are cleared first.

gitty_scan_latest(Store) :-
    retractall(head(Store, _, _, _)),
    retractall(latest(_, _, _)),
    forall(( gitty_hash(Store, Hash),
             load_object(Store, Hash, Data, commit, _Size),
             term_string(Meta, Data, []),
             _{name:Name, time:Time} :< Meta
           ),
           (   latest(Name, _, Seen),
               Seen > Time
           ->  true
           ;   retractall(latest(Name, _, _)),
               assertz(latest(Name, Hash, Time))
           )).
%!  gitty_hash(+Store, ?Hash) is nondet.
%
%   True when Hash is an object in Store, either packed or as a loose
%   file.  With Hash unbound, enumerates packed objects followed by
%   loose objects.

gitty_hash(Store, Hash) :-
    (   var(Hash)
    ->  (   gitty_attach_packs(Store),
            pack_object(Hash, _, _, _, Store, _)
        ;   gitty_file_object(Store, Hash)
        )
    ;   (   gitty_attach_packs(Store),
            pack_object(Hash, _, _, _, Store, _)
        ->  true
        ;   gitty_object_file(Store, Hash, File),
            exists_file(File)
        )
    ).

%   gitty_file_object(+Store, -Hash) is nondet.
%
%   Enumerate loose objects stored as Store/XX/YY/<36 chars>.

gitty_file_object(Store, Hash) :-
    access_file(Store, exist),
    gitty_hash_dir(Store, Dir1, Part1),
    gitty_hash_dir(Dir1, Dir2, Part2),
    directory_files(Dir2, Entries),
    member(Rest, Entries),
    atom_length(Rest, 36),
    atomic_list_concat([Part1, Part2, Rest], Hash).

%   gitty_hash_dir(+Parent, -Dir, -Name) is nondet.
%
%   Dir is a two-character subdirectory Name of Parent, skipping `..`.

gitty_hash_dir(Parent, Dir, Name) :-
    directory_files(Parent, Entries),
    member(Name, Entries),
    Name \== '..',
    atom_length(Name, 2),
    directory_file_path(Parent, Name, Dir).
%!  delete_object(+Store, +Hash) is det.
%
%   Delete the loose object file for Hash.  delete_file/1 raises an
%   error if the file does not exist.

delete_object(Store, Hash) :-
    gitty_object_file(Store, Hash, Path),
    delete_file(Path).
%!  gitty_object_file(+Store, +Hash, -Path) is det.
%
%   Path of the loose object file for Hash: the first two characters
%   and the next two characters of Hash form two directory levels, and
%   the rest is the file name.

gitty_object_file(Store, Hash, Path) :-
    sub_string(Hash, 0, 2, _, Level1),
    sub_string(Hash, 2, 2, _, Level2),
    sub_string(Hash, 4, _, 0, Base),
    atomic_list_concat([Store, Level1, Level2, Base], /, Path).


                 /*******************************
                 *            SYNCING           *
                 *******************************/
%!  gitty_update_head(+Store, +Name, +OldCommit, +NewCommit,
%!                    +DataHash) is det.
%
%   Update the head of a gitty store for Name.  OldCommit is the
%   current head and NewCommit is the new head.  If Name is created,
%   and thus there is no head, OldCommit must be `-`.
%
%   This operation fails with an exception if another writer has
%   updated the head, either in the same process or in another one.
%   For the file-based driver this is error(gitty(file_exists(Name)), _)
%   or error(gitty(not_at_head(Name, OldCommit)), _).

gitty_update_head(Store, Name, OldCommit, NewCommit, DataHash) :-
    redis_db(Store, _, _, _),
    !,
    redis_update_head(Store, Name, OldCommit, NewCommit, DataHash).
gitty_update_head(Store, Name, OldCommit, NewCommit, _) :-
    with_mutex(gitty,
               gitty_update_head_sync(Store, Name, OldCommit, NewCommit)).

:- if(remote_sync(true)).
% With remote syncing, also append the change to the shared heads file
% that other processes read in remote_updates/1.
gitty_update_head_sync(Store, Name, OldCommit, NewCommit) :-
    remote_sync(true),
    !,
    setup_call_cleanup(
        heads_output_stream(Store, HeadsOut),
        gitty_update_head_sync(Store, Name, OldCommit, NewCommit, HeadsOut),
        close(HeadsOut)).
:- endif.
gitty_update_head_sync(Store, Name, OldCommit, NewCommit) :-
    gitty_update_head_sync2(Store, Name, OldCommit, NewCommit).

gitty_update_head_sync(Store, Name, OldCommit, NewCommit, HeadsOut) :-
    gitty_update_head_sync2(Store, Name, OldCommit, NewCommit),
    format(HeadsOut, '~q.~n', [head(Name, OldCommit, NewCommit)]).

% The file_exists error must be error/2 (formal, context), like
% not_at_head.  The old error(gitty(file_exists(Name),_)) term was
% error/1 and did not match handlers for error(_, _).
gitty_update_head_sync2(Store, Name, OldCommit, NewCommit) :-
    gitty_scan(Store),                  % fetch remote changes
    (   OldCommit == (-)
    ->  (   head(Store, Name, _, _)
        ->  throw(error(gitty(file_exists(Name)), _))
        ;   assert_head(Store, Name, NewCommit)
        )
    ;   (   retract(head(Store, Name, _, OldCommit))
        ->  assert_head(Store, Name, NewCommit)
        ;   throw(error(gitty(not_at_head(Name, OldCommit)), _))
        )
    ).
%   remote_updates(+Store)
%
%   Pull head updates that other processes sharing the store appended
%   to the heads file.  This is done at most once per second, and never
%   if remote_sync/1 is false.

:- dynamic
    last_remote_sync/2.             % Store, Time

remote_updates(Store) :-
    (   remote_sync(false)
    ->  true
    ;   remote_up_to_data(Store)
    ->  true
    ;   with_mutex(gitty, remote_updates_sync(Store))
    ).

remote_updates_sync(Store) :-
    (   remote_up_to_data(Store)
    ->  true
    ;   retractall(last_remote_sync(Store, _)),
        get_time(Now),
        asserta(last_remote_sync(Store, Now)),
        remote_update(Store)
    ).

% True if we synced less than a second ago.
remote_up_to_data(Store) :-
    get_time(Now),
    last_remote_sync(Store, Last),
    Now-Last < 1.

remote_update(Store) :-
    remote_updates(Store, Updates),
    forall(member(Update, Updates),
           update_head(Store, Update)).

%   update_head(+Store, +Update)
%
%   Apply a head(Name, Old, New) record if it is consistent with our
%   view: Old is `-` and there is no head yet, or Old is the current
%   head.  Other records are ignored.

update_head(Store, Update) :-
    (   Update = head(Name, OldCommit, NewCommit),
        (   OldCommit == (-)
        ->  \+ head(Store, Name, _, _)
        ;   retract(head(Store, Name, _, OldCommit))
        )
    ->  assert_head(Store, Name, NewCommit)
    ;   true
    ).
%!  remote_updates(+Store, -List) is det.
%
%   List holds the terms appended to the shared heads file since the
%   last read.  The input context is pushed around the read,
%   presumably so that messages raised while reading are not attributed
%   to the caller's input.

remote_updates(Store, List) :-
    heads_input_stream(Store, In),
    setup_call_cleanup(
        '$push_input_context'(gitty_sync),
        read_new_terms(In, List),
        '$pop_input_context').

read_new_terms(Stream, Terms) :-
    read(Stream, First),
    read_new_terms(First, Stream, Terms).

read_new_terms(Term, Stream, Terms) :-
    (   Term = end_of_file
    ->  Terms = []
    ;   Terms = [Term|More],
        read(Stream, Next),
        read_new_terms(Next, Stream, More)
    ).

% Open the heads file for appending under an exclusive lock.
heads_output_stream(Store, Out) :-
    heads_file(Store, File),
    open(File, append, Out,
         [ encoding(utf8),
           lock(exclusive)
         ]).

%   heads_input_stream(+Store, -Stream)
%
%   Cached input stream on the heads file.  eof_action(reset) allows
%   reading again after the file grows.  If the first open fails, the
%   file is created and the open is retried once.

heads_input_stream(Store, Stream) :-
    (   heads_input_stream_cache(Store, Cached)
    ->  Stream = Cached
    ;   heads_file(Store, File),
        once(( between(1, 2, _),
               catch(open(File, read, In,
                          [ encoding(utf8),
                            eof_action(reset)
                          ]),
                     _,
                     create_heads_file(Store))
             )),
        assert(heads_input_stream_cache(Store, In)),
        Stream = In
    ).

% Create an empty heads file.  Always fails, so that the catch/3 above
% fails and the open is retried.
create_heads_file(Store) :-
    call_cleanup(heads_output_stream(Store, Out),
                 close(Out)),
    fail.

heads_file(Store, File) :-
    ensure_directory(Store),
    directory_file_path(Store, ref, RefDir),
    ensure_directory(RefDir),
    directory_file_path(RefDir, head, File).
%!  restore_heads_from_remote(+Store) is det.
%
%   Restore head/4 from the shared heads file if it exists and can be
%   read.  Otherwise rebuild the heads from the objects and write a
%   fresh heads file.

restore_heads_from_remote(Store) :-
    heads_file(Store, File),
    (   exists_file(File),
        setup_call_cleanup(
            open(File, read, In, [encoding(utf8)]),
            restore_heads(Store, In),
            close(In))
    ->  get_time(Now),
        assertz(store(Store, Now))
    ;   read_heads_from_objects(Store),
        setup_call_cleanup(
            open(File, write, Out, [encoding(utf8)]),
            save_heads(Store, Out),
            close(Out))
    ).

% The file starts with epoch(Time) followed by head(File, Old, New)
% records; the last record for a file wins.
restore_heads(Store, In) :-
    read(In, epoch(_)),
    read(In, Term),
    restore_heads(Term, In, Store).

restore_heads(Term, In, Store) :-
    (   Term = end_of_file
    ->  true
    ;   Term = head(File, _Old, Hash),
        retractall(head(Store, File, _, _)),
        assert_head(Store, File, Hash),
        read(In, Next),
        restore_heads(Next, In, Store)
    ).

save_heads(Store, Out) :-
    get_time(Now),
    format(Out, 'epoch(~0f).~n~n', [Now]),
    forall(head(Store, File, _Ext, Hash),
           format(Out, '~q.~n', [head(File, -, Hash)])).
%!  delete_head(+Store, +Head) is det.
%
%   Remove the head of file Head, from Redis or from head/4.

delete_head(Store, Head) :-
    (   redis_db(Store, _, _, _)
    ->  redis_delete_head(Store, Head)
    ;   retractall(head(Store, Head, _, _))
    ).
%!  set_head(+Store, +File, +Hash) is det.
%
%   Set the head of File to Hash, in Redis or in head/4.

set_head(Store, File, Hash) :-
    (   redis_db(Store, _, _, _)
    ->  redis_set_head(Store, File, Hash)
    ;   file_name_extension(_, Ext, File),
        (   head(Store, File, _, Current)
        ->  (   Current == Hash
            ->  true
            ;   asserta(head(Store, File, Ext, Hash)),
                retractall(head(Store, File, _, Current))
            )
        ;   asserta(head(Store, File, Ext, Hash))
        )
    ).


                 /*******************************
                 *             PACKS            *
                 *******************************/

/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -

<pack file> := <header>
               <file>*
<header>    := "gitty(Version).\n" <object>* "end_of_header.\n"
<object>    := "obj(Hash, Type, Size, FileSize).\n"
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */

pack_version(1).
:- debug(gitty(pack)).

%!  repack_objects(+Store, +Options) is det.
%
%   Pack the loose objects of Store together with the existing small
%   packs.  Options:
%
%     - min_files(+Count)
%       Do nothing unless there are at least Count loose objects.
%       Default 1,000.
%     - small_pack(+Bytes)
%       Merge existing packs smaller than Bytes.  Default 10,000,000.

repack_objects(Store, Options) :-
    option(min_files(MinFiles), Options, 1_000),
    findall(Obj, gitty_file_object(Store, Obj), Objects),
    length(Objects, Count),
    debug(gitty(pack), 'Found ~D file objects', [Count]),
    (   Count < MinFiles
    ->  debug(gitty(pack), 'Nothing to do', [])
    ;   pack_files(Store, Existing),
        option(small_pack(MaxSize), Options, 10_000_000),
        include(small_file(MaxSize), Existing, Small),
        (   debugging(gitty(pack))
        ->  length(Small, SmallCount),
            debug(gitty(pack), 'Found ~D small packs', [SmallCount])
        ;   true
        ),
        directory_file_path(Store, pack, PackDir),
        make_directory_path(PackDir),
        pack_objects(Store, Objects, Small, PackDir, _PackFile, Options)
    ).

small_file(MaxSize, File) :-
    size_file(File, Bytes),
    Bytes < MaxSize.
%!  pack_objects(+Store, +Objects, +Packs, +PackDir,
%!               -PackFile, +Options) is det.
%
%   Pack the loose Objects and all objects of the existing Packs into a
%   new pack file in PackDir, named after its SHA1, and unify PackFile
%   with its path.  The new pack is attached, the packed loose files
%   are removed, and the merged packs other than the result are
%   detached and deleted.  Serialized by the mutex `gitty_pack`.
%
%   If there are no loose objects and exactly one pack, nothing is done
%   and PackFile is that pack.

pack_objects(Store, Objects, Packs, PackDir, PackFile, Options) :-
    with_mutex(gitty_pack,
               pack_objects_sync(Store, Objects, Packs, PackDir,
                                 PackFile, Options)).

% Nothing to merge.  Return the pack file itself, not the list [Pack],
% to be consistent with the general clause below.
pack_objects_sync(_Store, [], [Pack], _, Pack, _) :-
    !.
pack_objects_sync(Store, Objects, Packs, PackDir, PackFilePath, Options) :-
    length(Objects, ObjCount),
    length(Packs, PackCount),
    debug(gitty(pack), 'Repacking ~D objects and ~D packs',
          [ObjCount, PackCount]),
    maplist(object_info(Store), Objects, FileInfo),
    maplist(pack_info(Store), Packs, PackInfo),
    append([FileInfo|PackInfo], Info0),
    sort(1, @<, Info0, Info),           % remove possible duplicates
    (   debugging(gitty(pack))
    ->  (   PackCount > 0
        ->  length(Info, FinalObjCount),
            debug(gitty(pack), 'Total ~D objects', [FinalObjCount])
        ;   true
        )
    ;   true
    ),
    % Write to a temporary file while hashing; rename it once the SHA1
    % (and thus the final name) is known.
    directory_file_path(PackDir, 'pack-create', TmpPack),
    setup_call_cleanup(
        ( open(TmpPack, write, Out0, [type(binary), lock(write)]),
          open_hash_stream(Out0, Out, [algorithm(sha1)])
        ),
        ( write_signature(Out),
          maplist(write_header(Out), Info),
          format(Out, 'end_of_header.~n', []),
          maplist(add_file(Out, Store), Info),
          stream_hash(Out, SHA1)
        ),
        close(Out)),
    format(atom(PackFile), 'pack-~w.pack', [SHA1]),
    directory_file_path(PackDir, PackFile, PackFilePath),
    rename_file(TmpPack, PackFilePath),
    debug(gitty(pack), 'Attaching ~p', [PackFilePath]),
    attach_pack(Store, PackFilePath),
    remove_objects_after_pack(Store, Objects, Options),
    delete(Packs, PackFilePath, RmPacks),
    remove_repacked_packs(Store, RmPacks, Options),
    debug(gitty(pack), 'Packing completed', []).

%   object_info(+Store, +Object, -Info)
%
%   Info is obj(Hash, Type, Size, FileSize) for a loose object file.

object_info(Store, Object, obj(Object, Type, Size, FileSize)) :-
    gitty_object_file(Store, Object, File),
    load_object_header(Store, Object, Type, Size),
    size_file(File, FileSize).

%   pack_info(+Store, +PackFile, -Objects)
%
%   Attach PackFile and return the obj/4 terms of its header.

pack_info(Store, PackFile, Entries) :-
    attach_pack(Store, PackFile),
    pack_read_header(PackFile, _, _, Entries).

% Write the pack signature line gitty(Version).
write_signature(Stream) :-
    pack_version(V),
    format(Stream, "gitty(~d).~n", [V]).

% Write one header entry as obj(Hash,Type,Size,FileSize).
write_header(Stream, obj(Hash, Type, Size, Bytes)) :-
    format(Stream, 'obj(~q,~q,~d,~d).~n', [Hash, Type, Size, Bytes]).
%   add_file(+Out, +Store, +Info)
%
%   Copy the compressed bytes of an object to Out.  A loose file is
%   preferred; otherwise FileSize bytes are copied from the pack that
%   holds the object.

add_file(Out, Store, obj(Object, Type, Size, FileSize)) :-
    gitty_object_file(Store, Object, File),
    (   exists_file(File)
    ->  setup_call_cleanup(
            open(File, read, In, [type(binary)]),
            copy_stream_data(In, Out),
            close(In))
    ;   pack_object(Object, Type, Size, Offset, Store, PackFile),
        setup_call_cleanup(
            open(PackFile, read, In, [type(binary)]),
            ( seek(In, Offset, bof, Offset),
              copy_stream_data(In, Out, FileSize)
            ),
            close(In))
    ).
%!  gitty_fsck(+Store) is det.
%
%   Check the integrity of all pack files of Store.

gitty_fsck(Store) :-
    pack_files(Store, Packs),
    forall(member(Pack, Packs),
           fsck_pack(Pack)).
%!  fsck_pack(+File) is det.
%
%   Validate the pack file File.  Its SHA1 must match the hash in its
%   name, and each object must decompress and agree with the type and
%   size in the header.  Each object is also passed to the
%   gitty:check_object/4 hook.

fsck_pack(File) :-
    debug(gitty(pack), 'fsck ~p', [File]),
    check_pack_hash(File),
    debug(gitty(pack), 'fsck ~p: checking objects', [File]),
    check_pack_objects(File),
    debug(gitty(pack), 'fsck ~p: done', [File]).

check_pack_hash(File) :-
    file_base_name(File, Base),
    file_name_extension(Plain, Ext, Base),
    must_be(oneof([pack]), Ext),
    atom_concat('pack-', Expected, Plain),
    setup_call_cleanup(
        ( open(File, read, Raw, [type(binary)]),
          open_hash_stream(Raw, HashIn, [algorithm(sha1)])
        ),
        ( setup_call_cleanup(open_null_stream(Sink),
                             copy_stream_data(HashIn, Sink),
                             close(Sink)),
          stream_hash(HashIn, Actual)
        ),
        close(HashIn)),
    assertion(Expected == Actual).

check_pack_objects(PackFile) :-
    setup_call_cleanup(
        open(PackFile, read, Stream, [type(binary)]),
        ( read_header(Stream, Version, DataStart, Entries),
          set_stream(Stream, encoding(utf8)),
          foldl(check_object(Stream, PackFile, Version), Entries,
                DataStart, _)
        ),
        close(Stream)).

%   check_object(+In, +PackFile, +Version, +Entry, +Start, -End)
%
%   Check the object that starts at byte Start.  If the stream is not at
%   Start, print a warning and seek there.  End is Start+FileSize.

check_object(In, PackFile, _Version,
             obj(Object, Type, Size, FileSize),
             Start, End) :-
    End is Start+FileSize,
    byte_count(In, Here),
    (   Here == Start
    ->  true
    ;   print_message(warning, pack(reposition(Here, Start))),
        seek(In, Start, bof, Start)
    ),
    (   fsck_read_object(In, PackFile, Object, Data, FoundType, FoundSize)
    ->  byte_count(In, Reached),
        (   Reached == End
        ->  true
        ;   print_message(error,
                          gitty(PackFile, fsck(object_end(Object, Reached,
                                                          Start, End,
                                                          Data))))
        ),
        assertion(Type == FoundType),
        assertion(Size == FoundSize),
        gitty:check_object(Object, Data, Type, Size)
    ;   true
    ).

%   fsck_read_object(+In, +PackFile, +Object, -Data, -Type, -Size)
%
%   Decompress one object from In.  A read error is reported and turned
%   into failure.

fsck_read_object(In, PackFile, Object, Data, Type, Size) :-
    setup_call_cleanup(
        zopen(In, Zin, [multi_part(false), close_parent(false)]),
        catch(read_object(Zin, Data, Type, Size), E,
              ( print_message(error,
                              gitty(PackFile, fsck(read_object(Object, E)))),
                fail)),
        close(Zin)).
%!  gitty_attach_packs(+Store) is det.
%
%   Attach all pack files of Store once, serialized by the mutex
%   `gitty_attach_packs`.

gitty_attach_packs(Store) :-
    (   attached_packs(Store)
    ->  true
    ;   with_mutex(gitty_attach_packs, gitty_attach_packs_sync(Store))
    ).

gitty_attach_packs_sync(Store) :-
    (   attached_packs(Store)
    ->  true
    ;   pack_files(Store, Packs),
        forall(member(Pack, Packs), attach_pack(Store, Pack)),
        asserta(attached_packs(Store))
    ).

%   pack_files(+Store, -Packs)
%
%   Packs are the *.pack files in Store/pack, or [] if that directory
%   does not exist.

pack_files(Store, Packs) :-
    directory_file_path(Store, pack, PackDir),
    (   exists_directory(PackDir)
    ->  directory_files(PackDir, Entries),
        convlist(is_pack(PackDir), Entries, Packs)
    ;   Packs = []
    ).

is_pack(PackDir, Entry, Path) :-
    file_name_extension(_, pack, Entry),
    directory_file_path(PackDir, Entry, Path).
%!  attach_pack(+Store, +PackFile) is det.
%
%   Load the index of PackFile into pack_object/6.  Does nothing if
%   PackFile is already attached to Store.

attach_pack(Store, PackFile) :-
    attached_pack(Store, PackFile),
    !.
attach_pack(Store, PackFile) :-
    retractall(pack_object(_,_,_,_,_,PackFile)),
    pack_read_header(PackFile, Version, DataOffset, Objects),
    foldl(assert_object(Store, PackFile, Version), Objects, DataOffset, _),
    assertz(attached_pack(Store, PackFile)).

pack_read_header(PackFile, Version, DataOffset, Objects) :-
    setup_call_cleanup(
        open(PackFile, read, In, [type(binary)]),
        read_header(In, Version, DataOffset, Objects),
        close(In)).

%   read_header(+In, -Version, -DataOffset, -Objects) is det.
%
%   Read the pack header.  DataOffset is the byte offset just after the
%   newline that follows `end_of_header.`
%
%   @error domain_error(gitty_pack_file, Signature) if the file does not
%   start with gitty(Version).  The culprit used to be the unbound output
%   Objects, which made the error message useless.

read_header(In, Version, DataOffset, Objects) :-
    read(In, Signature),
    (   Signature = gitty(Version)
    ->  true
    ;   domain_error(gitty_pack_file, Signature)
    ),
    read(In, Term),
    read_index(Term, In, Objects),
    get_code(In, Code),
    assertion(Code == 0'\n),
    byte_count(In, DataOffset).

%   read_index(+Term, +In, -Objects)
%
%   Collect the header terms up to end_of_header.  Reaching end of file
%   first means a truncated header.  read/2 keeps returning end_of_file
%   there, so without the second clause this would loop forever.

read_index(end_of_header, _, []) :-
    !.
read_index(end_of_file, _, _) :-
    !,
    syntax_error(gitty_pack_header_incomplete).
read_index(Object, In, [Object|T]) :-
    read(In, Term2),
    read_index(Term2, In, T).

assert_object(Store, Pack, _Version,
              obj(Object, Type, Size, FileSize),
              Offset0, Offset) :-
    Offset is Offset0+FileSize,
    assertz(pack_object(Object, Type, Size, Offset0, Store, Pack)).
%!  detach_pack(+Store, +Pack) is det.
%
%   Remove the in-memory index of Pack and mark it as no longer
%   attached to Store.

detach_pack(Store, Pack) :-
    retractall(pack_object(_Hash, _Type, _Size, _Offset, Store, Pack)),
    retractall(attached_pack(Store, Pack)).
%!  load_object_from_pack(+Hash, -Data, -Type, -Size) is semidet.
%
%   Load an object from any attached pack that contains Hash.

load_object_from_pack(Hash, Data, Type, Size) :-
    pack_object(Hash, Type, Size, Offset, _Store, PackFile),
    setup_call_cleanup(
        open(PackFile, read, Stream, [type(binary)]),
        read_object_at(Stream, Offset, Data, Type, Size),
        close(Stream)).

read_object_at(Stream, Offset, Data, Type, Size) :-
    seek(Stream, Offset, bof, Offset),
    read_object_here(Stream, Data, Type, Size).

%   read_object_here(+Stream, -Data, -Type, -Size)
%
%   Decompress one object at the current position.  The encoding of
%   Stream is switched to UTF-8 while reading and restored afterwards.

read_object_here(Stream, Data, Type, Size) :-
    stream_property(Stream, encoding(OldEncoding)),
    setup_call_cleanup(
        ( set_stream(Stream, encoding(utf8)),
          zopen(Stream, Zin, [multi_part(false), close_parent(false)])
        ),
        read_object(Zin, Data, Type, Size),
        ( close(Zin),
          set_stream(Stream, encoding(OldEncoding))
        )).
%!  unpack_packs(+Store) is det.
%
%   Convert all pack files of Store back into loose object files.

unpack_packs(Store) :-
    absolute_file_name(Store, Dir,
                       [ file_type(directory),
                         access(read)
                       ]),
    pack_files(Dir, Packs),
    forall(member(Pack, Packs),
           unpack_pack(Dir, Pack)).
%!  unpack_pack(+Store, +PackFile) is det.
%
%   Write every object of PackFile as a loose object file (skipping
%   objects that already exist as a file), then detach and delete the
%   pack.

unpack_pack(Store, PackFile) :-
    pack_read_header(PackFile, _Version, DataOffset, Objects),
    setup_call_cleanup(
        open(PackFile, read, In, [type(binary)]),
        foldl(create_file(Store, In), Objects, DataOffset, _),
        close(In)),
    detach_pack(Store, PackFile),
    delete_file(PackFile).

%   create_file(+Store, +In, +Entry, +Offset0, -Offset)
%
%   Copy the FileSize bytes at Offset0 to the object's loose file.
%   seek/4 used to be the *setup* of setup_call_cleanup/3, so an error
%   there left Out open.  Now the output is always closed, and a
%   partially written file is removed on error: later code treats an
%   existing file as a complete object.

create_file(Store, In, obj(Object, _Type, _Size, FileSize), Offset0, Offset) :-
    Offset is Offset0+FileSize,
    gitty_object_file(Store, Object, Path),
    with_mutex(gitty_file, exists_or_recreate(Path, Out)),
    (   var(Out)
    ->  true
    ;   catch(call_cleanup(
                  ( seek(In, Offset0, bof, Offset0),
                    copy_stream_data(In, Out, FileSize)
                  ),
                  close(Out)),
              Error,
              ( catch(delete_file(Path), _, true),
                throw(Error)
              ))
    ).

exists_or_recreate(Path, _Out) :-
    exists_file(Path),
    !.
exists_or_recreate(Path, Out) :-
    file_directory_name(Path, Dir),
    make_directory_path(Dir),
    open(Path, write, Out, [type(binary), lock(write)]).
%!  remove_objects_after_pack(+Store, +Objects, +Options) is det.
%
%   Delete the loose files of Objects that are now packed.  Empty
%   directories are then pruned, unless Options contains
%   prune_empty_directories with a value other than `true`.

remove_objects_after_pack(Store, Objects, Options) :-
    debug(gitty(pack), 'Deleting plain files', []),
    forall(member(Object, Objects),
           delete_object(Store, Object)),
    (   \+ option(prune_empty_directories(true), Options, true)
    ->  true
    ;   debug(gitty(pack), 'Pruning empty directories', []),
        prune_empty_directories(Store)
    ).
%!  remove_repacked_packs(+Store, +Packs, +Options) is det.
%
%   Detach and delete packs whose objects were merged into a new pack.

remove_repacked_packs(Store, Packs, Options) :-
    forall(member(Pack, Packs),
           remove_pack(Store, Options, Pack)).

remove_pack(Store, _Options, PackFile) :-
    detach_pack(Store, PackFile),
    delete_file(PackFile).
%!  prune_empty_directories(+Dir) is det.
%
%   Delete empty directories below Dir.  Dir itself (level 0) is never
%   deleted.

prune_empty_directories(Dir) :-
    prune_empty_directories(Dir, 0).

prune_empty_directories(Dir, Level) :-
    directory_files(Dir, AllFiles),
    exclude(hidden, AllFiles, Files),
    (   Files == [],
        Level > 0
    ->  delete_directory_async(Dir)
    ;   convlist(prune_empty_directories(Dir, Level), Files, Left),
        (   Left == [],
            Level > 0
        ->  delete_directory_async(Dir)
        ;   true
        )
    ).

%   hidden(+Entry) is semidet.
%
%   True for the directory entries `.` and `..`.  Both must be
%   excluded.  Otherwise prune_empty_directories/2 recurses into Dir/.
%   without end.  (The `.` fact was mangled and no longer matched.)

hidden('.').
hidden('..').

%   prune_empty_directories(+Parent, +Level, +Entry, -File)
%
%   convlist/3 helper: recurse into subdirectories and drop them from
%   the result; return plain files unchanged.

prune_empty_directories(Parent, Level0, File, _) :-
    directory_file_path(Parent, File, Path),
    exists_directory(Path),
    !,
    Level is Level0 + 1,
    prune_empty_directories(Path, Level),
    fail.
prune_empty_directories(_, _, File, File).

% Despite the name this runs synchronously, under the gitty_file mutex.
delete_directory_async(Dir) :-
    with_mutex(gitty_file, delete_directory_async2(Dir)).

% Succeed silently if deletion fails because Dir vanished or is no
% longer empty; re-throw other errors.
delete_directory_async2(Dir) :-
    catch(delete_directory(Dir), E,
          (   \+ exists_directory(Dir)
          ->  true
          ;   \+ empty_directory(Dir)
          ->  true
          ;   throw(E)
          )).

empty_directory(Dir) :-
    directory_files(Dir, AllFiles),
    exclude(hidden, AllFiles, []).


                 /*******************************
                 *       REDIS PRIMITIVES       *
                 *******************************/

redis_head_db(Store, DB, Key) :-
    redis_db(Store, DB, _, Prefix),
    string_concat(Prefix, ":gitty:head", Key).

redis_head_db_ro(Store, DB, Key) :-
    redis_db(Store, _, DB, Prefix),
    string_concat(Prefix, ":gitty:head", Key).
%!  redis_file(+Store, ?Name, ?Ext, ?Hash) is nondet.
%
%   Find heads in the Redis hash `<prefix>:gitty:head`.  The strategy
%   depends on which arguments are bound: look up by Name, scan by
%   Ext, resolve Hash via its commit, or enumerate all heads.

redis_file(Store, Name, Ext, Hash) :-
    (   nonvar(Name)
    ->  file_name_extension(_, Ext, Name),
        redis_head_db_ro(Store, DB, Heads),
        redis(DB, hget(Heads, Name), Hash as atom)
    ;   nonvar(Ext)
    ->  string_concat("*.", Ext, Pattern),
        redis_head_db_ro(Store, DB, Heads),
        redis_hscan(DB, Heads, LazyList, [match(Pattern)]),
        member(NameS-HashS, LazyList),
        atom_string(Name, NameS),
        atom_string(Hash, HashS)
    ;   nonvar(Hash)
    ->  load_plain_commit(Store, Hash, Commit),
        Name = Commit.name,
        file_name_extension(_, Ext, Name)
    ;   redis_head_db_ro(Store, DB, Heads),
        redis(DB, hgetall(Heads), Pairs as pairs(atom,atom)),
        member(Name-Hash, Pairs),
        file_name_extension(_, Ext, Name)
    ).
%!  redis_ensure_heads(+Store) is det.
%
%   If the Redis heads hash does not exist yet, fill it from a full
%   scan of the objects.

redis_ensure_heads(Store) :-
    (   redis_head_db_ro(Store, ReadDB, ReadKey),
        redis(ReadDB, exists(ReadKey), 1)
    ->  true
    ;   redis_head_db(Store, DB, Key),
        debug(gitty(redis), 'Initializing gitty heads in ~p ...', [Key]),
        gitty_scan_latest(Store),
        forall(retract(latest(Name, Hash, _Time)),
               redis(DB, hset(Key, Name, Hash))),
        debug(gitty(redis), '... finished gitty heads', [])
    ).
%   redis_update_head(+Store, +Name, +OldCommit, +NewCommit, +DataHash)
%
%   Set the head in Redis.  A new file (OldCommit is `-`) uses a plain
%   HSET; otherwise compare-and-swap against OldCommit.  Afterwards the
%   commit and data objects are published for replication.

redis_update_head(Store, Name, OldCommit, NewCommit, DataHash) :-
    redis_head_db(Store, DB, Key),
    (   OldCommit = (-)
    ->  redis(DB, hset(Key, Name, NewCommit))
    ;   redis_hcas(DB, Key, Name, OldCommit, NewCommit)
    ),
    publish_objects(Store, [NewCommit, DataHash]).
%   redis_delete_head(+Store, +Head)
%
%   Remove Head from the Redis heads hash.

redis_delete_head(Store, Head) :-
    redis_head_db(Store, Conn, HeadsKey),
    redis(Conn, hdel(HeadsKey, Head)).
%   redis_set_head(+Store, +File, +Hash)
%
%   Unconditionally set the head of File in Redis.

redis_set_head(Store, File, Hash) :-
    redis_head_db(Store, Conn, HeadsKey),
    redis(Conn, hset(HeadsKey, File, Hash)).

                 /*******************************
                 *          REPLICATE           *
                 *******************************/
%   This could be improved in two ways: (1) publish the hash in a
%   short-lived Redis key and make other nodes check that key, which is
%   likely to avoid many nodes sending the same object; or (2) check how
%   many nodes are in the pool and switch to a consumer-group based
%   approach if this number is high (in which case we are unlikely to be
%   asked for the missing hash ourselves).
:- multifile
    swish_redis:stream/2.

% Register the Redis stream used for eager replication, with
% maxlen(100).  This must define the hook swish_redis:stream/2 declared
% above; the mangled local swish_redisstream/2 was never called.
swish_redis:stream('gitty:replicate', [maxlen(100)]).

:- listen(http(pre_server_start(_)),
          init_replicator).

%   init_replicator
%
%   Listen to the replication stream (replicate/1) and the
%   `swish:gitty` pubsub channel (gitty_message/1).  Also create the
%   queue `gitty_queue` that signals the arrival of objects.

init_replicator :-
    redis_swish_stream('gitty:replicate', ReplKey),
    listen(redis(_Redis, ReplKey, _Id, Data),
           replicate(Data)),
    listen(redis(_, 'swish:gitty', Message),
           gitty_message(Message)),
    message_queue_create(_, [alias(gitty_queue)]).

:- debug(gitty(replicate)).

%   gitty_message(+Message)
%
%   Handle a message on `swish:gitty`:
%
%     - discover(Hash)
%       If we have the object, publish object(Hash, Data).
%     - object(Hash, Data)
%       Store the raw object.  If it is new, post Hash to `gitty_queue`
%       to wake up redis_replicate_get/2.

gitty_message(discover(Hash)) :-
    debug(gitty(replicate), 'Discover: ~p', [Hash]),
    store(Store, _),
    load_object_raw(Store, Hash, Data),
    debug(gitty(replicate), 'Sending object ~p', [Hash]),
    redis(swish, publish(swish:gitty, object(Hash, Data) as prolog)).
gitty_message(object(Hash, Data)) :-
    debug(gitty(replicate), 'Replicate: ~p', [Hash]),
    redis_db(Store, _DB, _RO, _Prefix),
    store_object_raw(Store, Hash, Data, New),
    debug(gitty(replicate), 'Received object ~p (new=~p)', [Hash, New]),
    (   New == true
    ->  thread_send_message(gitty_queue, Hash)
    ;   true
    ).
We publish a `discover` request for the hash. The replies are picked up by gitty_message/1 above.
The code may be subject to various race conditions, but fortunately objects are immutable. It also seems possible that the Redis stream gets lost; we are not sure when or how. For now, we restart it if we get no reply, but no more than once per minute.
%!  redis_replicate_get(+Store, +Hash) is semidet.
%
%   Ask the other nodes for the object Hash and wait for it.
%
%   A discover(Hash) request is published on `swish:gitty`.  This fails
%   immediately if at most one subscriber (ourselves) received it.
%   Otherwise we make up to 100 attempts.  Each attempt waits 0.1 s for
%   Hash on `gitty_queue` and then checks has_object/2.  If neither
%   succeeds, restart_pubsub/0 is called before the next attempt.
%   The predicate fails if the object did not arrive after all
%   attempts.

redis_replicate_get(Store, Hash) :-
    is_gitty_hash(Hash),
    redis(swish, publish(swish:gitty, discover(Hash) as prolog), Receivers),
    Receivers > 1,                      % alone, nobody can answer
    between(1, 100, _Attempt),
    (   thread_get_message(gitty_queue, Hash, [timeout(0.1)])
    ;   has_object(Store, Hash)
    ;   restart_pubsub,
        fail
    ),
    !.

:- dynamic
    restarted/1.

%!  restart_pubsub
%
%   Force the `redis_pubsub` thread to reconnect by signalling it with
%   an I/O read error, then sleep 0.05 s.  The restart time is recorded
%   in restarted/1.  If the last restart was less than 60 seconds ago,
%   this succeeds without doing anything.

restart_pubsub :-
    once(restarted(LastRestart)),
    get_time(Now),
    Now - LastRestart < 60,
    !.
restart_pubsub :-
    get_time(Now),
    transaction(( retractall(restarted(_)),
                  asserta(restarted(Now))
                )),
    thread_signal(redis_pubsub, throw(error(io_error(read, _), _))),
    sleep(0.05).
This realizes eager replication, as opposed to the code above (redis_replicate_get/2), which performs lazy replication. Eager replication ensures the object is stored in multiple places in case the node on which it was saved dies shortly afterwards.
Note that we also receive the object we just saved. That is unavoidable in a network where all nodes are equal.
%!  publish_objects(+Store, +Hashes)
%
%   Eager replication: add every object in Hashes to the
%   `gitty:replicate` Redis stream.  Fails if loading or adding any of
%   the objects fails.

publish_objects(Store, Hashes) :-
    redis_swish_stream('gitty:replicate', StreamKey),
    maplist(publish_object(Store, StreamKey), Hashes).

%!  publish_object(+Store, +Stream, +Hash)
%
%   Load the raw object Hash from Store and add it to Stream as a dict
%   with the keys `hash` and `data`.

publish_object(Store, Stream, Hash) :-
    load_object_raw(Store, Hash, Raw),
    debug(gitty(replicate), 'Sending ~p to ~p', [Hash, Stream]),
    Entry = _{hash:Hash, data:Raw},
    xadd(swish, Stream, _, Entry).
%!  replicate(+Data)
%
%   Handle one entry of the `gitty:replicate` stream.  Data is a dict
%   with the keys `hash` and `data`.  The hash is converted to an atom
%   and the raw object is stored in the Redis-backed store.

replicate(Data) :-
    redis_db(Store, _DB, _RO, _Prefix),
    HashString = Data.hash,
    atom_string(Hash, HashString),
    store_object_raw(Store, Hash, Data.data, _0IsNew),
    debug(gitty(replicate), 'Received object ~p (new=~p)',
          [Hash, _0IsNew]).


                 /*******************************
                 *          REDIS BASICS        *
                 *******************************/
%!  redis_hcas(+DB, +Hash, +Key, +Old, +New) is semidet.
%
%   Compare-and-set on a field of a Redis hash.  If field Key of the
%   Redis hash Hash currently holds Old, set it to New and succeed.
%   Otherwise leave the field unchanged and fail.  A field that does
%   not exist makes HGET return nil, so the comparison fails.
%
%   The test and the update run as one Lua script through EVAL, which
%   Redis executes atomically.  The script returns 1 on success and 0
%   otherwise; only a reply of 1 unifies with the expected result.

redis_hcas(DB, Hash, Key, Old, New) :-
    redis(DB, eval("if redis.call('HGET', KEYS[1], ARGV[1]) == ARGV[2] then \c
                       redis.call('HSET', KEYS[1], ARGV[1], ARGV[3]); \c
                       return 1; \c
                    end; \c
                    return 0\c
                   ",
                   1, Hash, Key, Old, New),
          1).
Gitty plain files driver
This version of the driver uses plain files to store the gitty data. It consists of a nested directory structure with files named after the hash. Objects and hash computation are the same as for `git`. The heads (files) are computed on startup by scanning all objects. There is a file `ref/head` that is updated if a head is updated. Other clients can watch this file and update their notion of the head. This implies that the store can handle multiple clients that access a shared file system, optionally shared using NFS from different machines. The store is simple and robust. The main disadvantages are long startup times as the store holds more objects, and relatively high disk usage due to rounding small objects up to disk allocation units.