-
Notifications
You must be signed in to change notification settings - Fork 29
/
transactions.lisp
1402 lines (1212 loc) · 48.3 KB
/
transactions.lisp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
(in-package :graph-db)
;; The transaction currently in progress; NIL at top level.
;; CALL-WITH-TRANSACTION is documented to bind this to a new transaction.
(defvar *transaction* nil)
;; Names the action taken when a transaction ends; defaults to the
;; %COMMIT generic function.
;; NOTE(review): the code that consults this variable is not visible
;; here -- presumably it can be rebound to '%ROLLBACK; confirm.
(defvar *end-of-transaction-action* '%commit)
;; Retry limit for optimistic transactions.  The retry loop that reads
;; this parameter is not in this part of the file.
(defparameter *maximum-transaction-attempts* 8
  "The number of times a transaction is retried after failing
validation before it is forced to run within an exclusive lock.")
;; Passed as the :UNLESS-PRESENT argument of ADD-NODE-TO-INDEXES by the
;; TX-CREATE method of APPLY-TX-WRITE.
(defparameter *add-to-indexes-unless-present-p* nil
  "When true, add nodes to the type indexes with a check for
uniqueness in the index. Needed when potentially recovering from a
transaction multiple times, e.g. if the recovery crashes and has to
be restarted.")
;;; Let set-byte, get-byte and serialize-uint64 also work on plain Lisp arrays
(defmethod set-byte ((array array) offset byte)
  "Store BYTE in ARRAY at index OFFSET and return BYTE."
  (let ((index offset))
    (setf (aref array index) byte)))
(defmethod get-byte ((array array) offset)
  "Return the element of ARRAY stored at index OFFSET."
  (let ((index offset))
    (aref array index)))
(defmethod serialize-uint64 ((array array) int offset)
  "Write the low 64 bits of INT into ARRAY as eight octets, least
significant first, starting at index OFFSET.  Returns the last octet
stored (bits 56-63 of INT)."
  (let ((octet 0))
    (dotimes (i 8 octet)
      ;; Octet I holds bits [8*I, 8*I+8) of INT.
      (setf octet (ldb (byte 8 (* 8 i)) int)
            (aref array (+ offset i)) octet))))
;;; Object sets keep track of transaction read sets and write sets to
;;; aid in validating transactions.
;;;
;;; Initial implementation as a hash table is for simplicity. Many
;;; other data structures can be used for performance if needed.
;; Build and return a new object set holding the objects in
;; INITIAL-CONTENTS.
(defgeneric make-object-set (initial-contents))
;; Return the number of objects in SET.
(defgeneric object-set-count (set))
;; Return the objects in SET as a list.
(defgeneric object-set-list (set))
(defgeneric object-set-empty-p (set)
  (:documentation
   "Return true when SET contains no objects.")
  (:method (set)
    ;; Default: any set that can report a count is empty at count 0.
    (let ((size (object-set-count set)))
      (zerop size))))
;; Add OBJECT to SET.  The OBJECT-SET method files it under (ID OBJECT).
(defgeneric add-to-object-set (object set))
;; Return true when OBJECT is a member of SET.
(defgeneric object-set-member-p (object set))
;; Call FUN on each object in SET.  DO-OBJECT-SET is the macro front end.
(defgeneric call-for-object-set-objects (fun set))
(defmacro do-object-set ((object set) &body body)
  "Evaluate BODY once for each member of SET with OBJECT bound to that
member.  The iteration is wrapped in a NIL block, so BODY may use RETURN
to stop early and supply the value of the whole form."
  (let ((visitor `(lambda (,object) ,@body)))
    `(block nil
       (call-for-object-set-objects ,visitor ,set))))
(defgeneric object-sets-intersect-p (set1 set2)
  (:documentation
   "Return T as soon as some member of SET1 is also a member of SET2.")
  (:method (set1 set2)
    ;; RETURN exits the NIL block established by DO-OBJECT-SET.
    (do-object-set (candidate set1)
      (if (object-set-member-p candidate set2)
          (return t)))))
;; Object set backed by a table from MAKE-ID-TABLE.  The methods below
;; key it by (ID OBJECT) and access it with the hash-table operators.
(defclass object-set ()
  ((table
    :initform (make-id-table)
    :reader table)))
(defmethod object-set-list ((set object-set))
  "Return a list of the objects stored in SET's table."
  (let ((storage (table set)))
    (alexandria:hash-table-values storage)))
(defmethod object-set-count ((set object-set))
  "Return the number of entries in SET's table."
  (let ((storage (table set)))
    (hash-table-count storage)))
(defmethod print-object ((set object-set) stream)
  "Print SET unreadably: its members in brackets, or the word empty."
  (print-unreadable-object (set stream :type t)
    (if (plusp (object-set-count set))
        (format stream "[~{~A~^ ~}]" (object-set-list set))
        (format stream "empty"))))
(defmethod make-object-set (initial-contents)
  "Return a fresh OBJECT-SET containing each object of the list
INITIAL-CONTENTS."
  (loop with new-set = (make-instance 'object-set)
        for object in initial-contents
        do (add-to-object-set object new-set)
        finally (return new-set)))
(defmethod add-to-object-set (object (set object-set))
  "Store OBJECT in SET under its ID, replacing any entry with the same
ID.  Returns OBJECT."
  (let ((key (id object))
        (storage (table set)))
    (setf (gethash key storage) object)))
(defmethod object-set-member-p (object (set object-set))
  "Return true when SET has an entry under the ID of OBJECT."
  (multiple-value-bind (stored present-p)
      (gethash (id object) (table set))
    (declare (ignore stored))
    present-p))
(defmethod call-for-object-set-objects (fun (set object-set))
  "Call FUN with each object stored in SET; the table keys are not
passed along."
  (flet ((visit (key object)
           (declare (ignore key))
           (funcall fun object)))
    (maphash #'visit (table set))))
;;; Replication - the bulk is in transaction-streaming.lisp
;; Replicate TRANSACTION for GRAPH.  Called from the APPLY-TRANSACTION
;; :AFTER method specialized on MASTER-GRAPH.
(defgeneric replicate-transaction (transaction graph))
;;; Transaction conditions
;; Error carrying the transaction involved, available through
;; VALIDATION-CONFLICT-TRANSACTION.
;; NOTE(review): presumably signalled when VALIDATE fails -- the
;; signalling site is not visible here; confirm.
(define-condition validation-conflict (error)
  ((transaction
    :initarg :transaction
    :reader validation-conflict-transaction)))
;; Error with no slots and no custom report.
;; NOTE(review): by its name, for operations that need *TRANSACTION* to
;; be bound; the signalling site is not visible here.
(define-condition no-transaction-in-progress (error) ())
;; Warning counterpart of NO-TRANSACTION-IN-PROGRESS.  Its report is a
;; fixed message, so the condition object itself is ignored.
(define-condition no-transaction-in-progress-warning (warning) ()
  (:report
   (lambda (condition stream)
     (declare (ignore condition))
     (format stream "No transaction in progress; copy cannot be saved"))))
;; Error reporting that NODE was modified without being copied first;
;; the offending node is available through MODIFYING-NON-COPY-NODE.
(define-condition modifying-non-copy (error)
  ((node
    :initarg :node
    :reader modifying-non-copy-node))
  (:report (lambda (condition stream)
             (format stream "Modifying ~A without copying first"
                     (modifying-non-copy-node condition)))))
;;; Transaction manager
;; Create a transaction from TRANSACTION-MANAGER.
;; NOTE(review): no method is visible in this part of the file.
(defgeneric create-transaction (transaction-manager))
;; Clean up after TRANSACTION.
;; NOTE(review): what is cleaned up is defined by methods elsewhere.
(defgeneric cleanup-transaction (transaction))
;; Classes TX and TX-HEADER in this file supply methods through their
;; GRAPH slot reader and accessor.
(defgeneric graph (object)
  (:documentation
   "Return the associated graph of OBJECT."))
;; VALIDATE iterates over the result to look for conflicts with the
;; write set of TRANSACTION.
(defgeneric overlapping-transactions (transaction transaction-manager)
  (:documentation
   "Return a list of committed transactions that may affect
TRANSACTION."))
;; Return the lock of TRANSACTION.  Class TX stores one created by
;; MAKE-RW-LOCK; CALL-WITH-TRANSACTION-LOCK takes it as a write lock.
(defgeneric transaction-lock (transaction))
(defgeneric call-with-transaction-lock (transaction fun)
  (:documentation
   "Call FUN with no arguments while holding the lock of TRANSACTION
for writing, and return what FUN returns.")
  (:method (transaction fun)
    ;; TODO: This can have a coarse lock during recovery and fine
    ;; locks during normal use
    (let ((lock (transaction-lock transaction)))
      (with-write-lock (lock)
        (funcall fun)))))
(defmacro with-transaction-lock ((transaction) &body body)
  "Evaluate BODY inside CALL-WITH-TRANSACTION-LOCK for TRANSACTION."
  (let ((thunk (list* 'lambda '() body)))
    (list 'call-with-transaction-lock transaction thunk)))
(defgeneric assign-transaction-id (transaction transaction-manager)
  (:documentation
   "Give TRANSACTION the current value of TRANSACTION-MANAGER's
tx-id-counter as its transaction id, advance the counter by one, and
return the id that was assigned.")
  (:method (transaction transaction-manager)
    (let ((assigned (tx-id-counter transaction-manager)))
      (setf (transaction-id transaction) assigned
            (tx-id-counter transaction-manager)
            (1+ (tx-id-counter transaction-manager)))
      assigned)))
;;; Transactions
;; Class TX supplies a method through its TRANSACTION-MANAGER slot
;; reader; WITH-TRANSACTION defaults to (TRANSACTION-MANAGER *GRAPH*).
(defgeneric transaction-manager (object)
  (:documentation
   "Return the transaction manager of OBJECT."))
;;; STATE and SEQUENCE-NUMBER are read/write properties of a
;;; transaction; class TX implements them with slot accessors.
(defgeneric state (transaction))
;; A SETF generic function receives the new value as its first
;; argument, so the lambda list names it first, as (SETF WRITES) does.
(defgeneric (setf state) (new-value transaction))
(defgeneric sequence-number (transaction))
(defgeneric (setf sequence-number) (new-value transaction))
;; Readers for the per-transaction bookkeeping; class TX implements
;; each one with a slot reader.
;; Objects added by LOOKUP-OBJECT when it fetches them for TRANSACTION.
(defgeneric read-set (transaction))
;; Object set counted and listed first by WRITE-COUNT and WRITES.
(defgeneric create-set (transaction))
;; Object set checked against other transactions by VALIDATE.
(defgeneric write-set (transaction))
;; Table consulted first by LOOKUP-OBJECT, keyed by id.
(defgeneric local-cache (transaction))
;; Table consulted by LOOKUP-OBJECT when the local cache misses.
(defgeneric graph-cache (transaction))
(defgeneric lookup-object (id table transaction graph)
  (:documentation
   "Find the object with ID.  Without a transaction the lookup goes
straight to LOOKUP-NODE on TABLE and GRAPH.  Within a transaction the
local cache is tried first, then the graph cache, then LOOKUP-NODE on
the transaction's own graph; an object found outside the local cache is
added to the read set and to the local cache before it is returned.
Returns NIL when nothing is found.")
  (:method (id table (transaction null) graph)
    (lookup-node table id graph))
  (:method (id table transaction (graph t))
    (let ((local-cache (local-cache transaction))
          (graph-cache (graph-cache transaction)))
      (or (gethash id local-cache)
          (let ((found (or (gethash id graph-cache)
                           (lookup-node table id (graph transaction)))))
            (when found
              ;; First sighting in this transaction: record the read.
              (add-to-object-set found (read-set transaction))
              (setf (gethash id local-cache) found)))))))
;; Record OBJECT as written by TRANSACTION.
;; NOTE(review): no method is visible here; semantics taken from the
;; name -- confirm against the methods elsewhere.
(defgeneric write-object (object transaction))
(defgeneric writes (transaction)
  (:documentation
   "Return a list of the members of TRANSACTION's create set followed
by the members of its write set.")
  (:method (transaction)
    (let ((created (object-set-list (create-set transaction)))
          (updated (object-set-list (write-set transaction))))
      (append created updated))))
(defgeneric write-count (transaction)
  (:documentation
   "Return the combined size of TRANSACTION's create set and write
set.")
  (:method (transaction)
    (let ((created (object-set-count (create-set transaction)))
          (updated (object-set-count (write-set transaction))))
      (+ created updated))))
;; Writer paired with WRITES.  Class TX-HEADER supplies a method through
;; its WRITES accessor; LOAD-TX-FILE uses it to attach loaded writes.
(defgeneric (setf writes) (new-value transaction))
(defgeneric validate (transaction)
  (:documentation
   "Return true when TRANSACTION may commit: either its write set is
empty, or that write set shares no object with the read set or write
set of any transaction returned by OVERLAPPING-TRANSACTIONS.")
  (:method (transaction)
    (let ((pending-writes (write-set transaction)))
      (flet ((conflicts-with-p (other-transaction)
               (or (object-sets-intersect-p pending-writes
                                            (read-set other-transaction))
                   (object-sets-intersect-p pending-writes
                                            (write-set other-transaction)))))
        (or (zerop (object-set-count pending-writes))
            (loop for other-transaction in (overlapping-transactions
                                            transaction
                                            (transaction-manager transaction))
                  never (conflicts-with-p other-transaction)))))))
;; Low-level commit of TRANSACTION; its name is the default value of
;; *END-OF-TRANSACTION-ACTION*.  Methods are defined elsewhere.
(defgeneric %commit (transaction))
;; Low-level rollback of TRANSACTION.  Methods are defined elsewhere.
(defgeneric %rollback (transaction))
;; Functional form behind the WITH-TRANSACTION macro, which wraps its
;; body in a zero-argument lambda and passes it as FUN.
(defgeneric call-with-transaction (fun transaction-manager)
  (:documentation "Call FUN with *TRANSACTION* bound to a new
transaction created from TRANSACTION-MANAGER."))
(defmacro with-transaction ((&optional (transaction-manager '(transaction-manager *graph*)))
                            &body body)
  "Run BODY through CALL-WITH-TRANSACTION.  TRANSACTION-MANAGER is a
form evaluated at run time; it defaults to the transaction manager of
*GRAPH*."
  (let ((thunk (list* 'lambda '() body)))
    (list 'call-with-transaction thunk transaction-manager)))
;; A transaction.  The object sets, the lock, the local cache, the
;; copies table and the initial state get fresh values from the default
;; initargs at the bottom.  GRAPH-CACHE, GRAPH, TRANSACTION-MANAGER,
;; SEQUENCE-NUMBER, the tx-id slots and BYTES have no default, so they
;; stay unbound unless an initarg or a later SETF supplies them.
(defclass tx ()
  ((read-set
    :initarg :read-set
    :reader read-set)
   (create-set
    :initarg :create-set
    :reader create-set)
   (write-set
    :initarg :write-set
    :reader write-set)
   (transaction-lock
    :initarg :transaction-lock
    :reader transaction-lock)
   (local-cache
    :initarg :local-cache
    :reader local-cache)
   ;; NOTE(review): the default below is (MAKE-HASH-TABLE) with no :TEST,
   ;; which creates an EQL table, not the EQ table the docstring names.
   (copies
    :initarg :copies
    :reader copies
    :documentation "A node to be modified must first be copied via
COPY, which places it in this EQ hash table. UPDATE-NODE will
refer to this copy when persisting the transaction.")
   (graph-cache
    :initarg :graph-cache
    :reader graph-cache)
   (graph
    :initarg :graph
    :reader graph)
   (transaction-manager
    :initarg :transaction-manager
    :reader transaction-manager)
   (state
    :initarg :state
    :accessor state)
   (sequence-number
    :initarg :sequence-number
    :accessor sequence-number)
   (start-tx-id
    :initarg :start-tx-id
    :reader start-tx-id
    :documentation "The value of the tx-id-counter when this
transaction was created.")
   (finish-tx-id
    :initarg :finish-tx-id
    :accessor finish-tx-id
    :documentation "The value of the tx-id-counter when this
transaction is ended.")
   ;; The initarg is :TX-ID although the slot and accessor are named
   ;; TRANSACTION-ID.
   (transaction-id
    :initarg :tx-id
    :accessor transaction-id
    :documentation "A transaction-id is assigned from the transaction
manager tx-id-counter only after a transaction has been
validated.")
   ;; SAVE-BYTES-COMPONENT-VECTOR pushes onto this list, so it is kept
   ;; newest first; INITIALIZE-BYTES-FROM-COMPONENTS reverses it.
   (bytes-components
    :initarg :bytes-components
    :initform '()
    :accessor bytes-components
    :documentation "A list of vectors that will be concatenated to
form BYTES during persisting.")
   (bytes
    :initarg :bytes
    :accessor bytes
    :documentation "BYTES has a serialization of the transaction after
it has been committed."))
  (:default-initargs
   :read-set (make-object-set nil)
   :create-set (make-object-set nil)
   :write-set (make-object-set nil)
   :transaction-lock (make-rw-lock)
   :local-cache (make-id-table)
   :copies (make-hash-table)
   :state :init))
(defmethod print-object ((transaction tx) stream)
  "Print TRANSACTION unreadably as its sequence number, the sizes of
its read, create and write sets, and its state.

The SEQUENCE-NUMBER slot has neither an initform nor a default initarg,
so a newly made transaction may not have one yet.  A placeholder is
printed in that case rather than signalling UNBOUND-SLOT from inside
the printer."
  (print-unreadable-object (transaction stream :type t :identity t)
    (format stream "~D: ~D read~:P, ~D create~:P, ~D write~:P, ~S"
            ;; ~D prints a non-integer argument as ~A would.
            (if (slot-boundp transaction 'sequence-number)
                (sequence-number transaction)
                "unsequenced")
            (object-set-count (read-set transaction))
            (object-set-count (create-set transaction))
            (object-set-count (write-set transaction))
            (state transaction))))
;;; Applying transaction writes to the graph
(defun maybe-initialize-bytes (node)
  "Serialize NODE's data into its BYTES slot when NODE has data and
BYTES is still NIL or the marker :INIT.  Returns the new bytes, or NIL
when nothing was done."
  (let ((payload (data node))
        (current (bytes node)))
    (if (and payload
             (member current '(:init nil)))
        (setf (bytes node) (serialize payload))
        nil)))
(defun maybe-allocate-for-node (node graph)
  "Make sure NODE's bytes are initialized, then set its data pointer:
to a block allocated from GRAPH's heap, sized to the bytes, when NODE
has data, and to 0 otherwise.  Returns the value stored."
  (maybe-initialize-bytes node)
  (let ((pointer (if (data node)
                     (allocate (heap graph) (length (bytes node)))
                     0)))
    (setf (data-pointer node) pointer)))
;;; FIXME: Find a better home for this method
(defmethod set-bytes ((memory memory) vec offset length)
  "Copy the first LENGTH elements of VEC into MEMORY, one SET-BYTE call
per element, starting at OFFSET.  Returns VEC."
  (declare (type word offset length))
  (loop for i from 0 below length
        do (set-byte memory (+ i offset) (aref vec i)))
  vec)
(defun maybe-write-to-heap (node graph)
  "Allocate heap space for NODE if it has data and copy its bytes
there.  Nodes without data get a zero data pointer and are not written."
  (let ((pointer (maybe-allocate-for-node node graph)))
    (when (/= 0 pointer)
      (let ((payload (bytes node)))
        (set-bytes (heap graph) payload pointer (length payload))))))
(defun maybe-free-from-heap (node graph)
  "Free NODE's block in GRAPH's heap when its data pointer is non-zero.
Errors from FREE are logged, not propagated."
  (let ((pointer (data-pointer node)))
    (if (zerop pointer)
        nil
        (handler-case
            (free (heap graph) pointer)
          (error (condition)
            (log:error "Unable to free ~A (~A): ~A" (string-id node) pointer condition))))))
;; Add NODE to the indexes of GRAPH and mark each index as written on
;; the node.  UNLESS-PRESENT is passed through unchanged to every index
;; helper.
(defgeneric add-node-to-indexes (node graph &key unless-present)
  ;; Every node goes into the type index.
  (:method ((node node) graph &key unless-present)
    (add-to-type-index node graph :unless-present unless-present)
    (setf (type-idx-written-p node) t))
  ;; Edges first get the NODE treatment through CALL-NEXT-METHOD, then
  ;; go into the VEV and VE indexes, in that order.
  (:method ((node edge) graph &key unless-present)
    (call-next-method)
    (add-to-vev-index node graph :unless-present unless-present)
    (setf (vev-written-p node) t)
    (add-to-ve-index node graph :unless-present unless-present)
    (setf (ve-written-p node) t)))
;;; tx-writes have enough information to update the graph database and
;;; its views.
;; Base class for one write of a transaction.  NODE is the node the
;; write is about; ID on a TX-WRITE returns the id of that node.
(defclass tx-write ()
  ((node
    :initarg :node
    :reader node)))
(defmethod id ((write tx-write))
  "Return the id of the node that WRITE carries."
  (let ((target (node write)))
    (id target)))
(defmethod print-object ((write tx-write) stream)
  "Print WRITE unreadably with the class name of its node and the
string form of the node's id."
  (print-unreadable-object (write stream :type t)
    (let ((node-class (class-name (class-of (node write))))
          (id-string (string-id (id write))))
      (format stream "for ~A ~A" node-class id-string))))
;; A write whose node is applied by the TX-CREATE method of
;; APPLY-TX-WRITE.
(defclass tx-create (tx-write) ())
;; A write that also carries OLD-NODE, the version being replaced.
(defclass tx-update (tx-write)
  ((old-node
    :initarg :old-node
    :reader old-node)))
;; A subclass of TX-UPDATE, so it carries an OLD-NODE slot too and
;; inherits TX-UPDATE methods unless a more specific method exists.
(defclass tx-delete (tx-update) ())
(defgeneric tx-write-table (object graph)
  (:documentation
   "Return the table of GRAPH that stores OBJECT: the edge table for an
edge, the vertex table for a vertex.  A TX-WRITE is resolved through
its node.")
  (:method ((write tx-write) graph)
    (let ((target (node write)))
      (tx-write-table target graph)))
  (:method ((vertex vertex) graph)
    (vertex-table graph))
  (:method ((edge edge) graph)
    (edge-table graph)))
;; Apply one TX-WRITE to GRAPH.  The primary methods are specialized on
;; TX-CREATE and TX-UPDATE; an :AFTER method on TX-WRITE stores the node
;; in the graph cache.
(defgeneric apply-tx-write (tx-write graph))
(defmethod apply-tx-write :after ((write tx-write) graph)
  "After any write is applied, store its node in GRAPH's cache under
the node's id."
  (let* ((node (node write))
         (key (id node)))
    (setf (gethash key (cache graph)) node)))
(defmethod apply-tx-write ((write tx-create) graph)
  "Apply a creation: reset the node's revision to 0, write its data to
the heap, add it to the indexes, insert it into its table, and finalize
it.  Returns WRITE."
  (let ((table (tx-write-table write graph))
        (created (node write)))
    (setf (revision created) 0)
    (maybe-write-to-heap created graph)
    (add-node-to-indexes created graph
                         :unless-present *add-to-indexes-unless-present-p*)
    ;; A key that is already in the table is updated in place.
    (handler-case
        (lhash-insert table (id created) created)
      (duplicate-key-error ()
        (lhash-update table (id created) created)))
    (finalize-node created table graph))
  write)
(defmethod apply-tx-write ((write tx-update) graph)
  "Apply an update: give the new node the old node's revision plus one,
wrapped to 32 bits, re-serialize its data, write it to the heap, and
replace the table entry.  Returns WRITE."
  (let ((replacement (node write))
        (previous (old-node write))
        (table (tx-write-table write graph)))
    (setf (revision replacement) (logand #xFFFFFFFF (1+ (revision previous)))
          (bytes replacement) (serialize (data replacement)))
    (maybe-write-to-heap replacement graph)
    (lhash-update table (id replacement) replacement)
    ;; KTR: moved to post-view generation
    ;;(maybe-free-from-heap old-node graph)
    )
  write)
;;; Applying transaction view updates
(defgeneric call-for-applicable-views (fun graph node)
  (:documentation
   "Call FUN on every non-NIL view that LOOKUP-VIEWS returns for NODE
in GRAPH.")
  (:method (fun graph (node node))
    (dolist (view (lookup-views graph node))
      (when view
        (funcall fun view)))))
(defmacro do-applicable-views ((view graph node) &body body)
  "Evaluate BODY with VIEW bound to each view of GRAPH that applies to
NODE, by way of CALL-FOR-APPLICABLE-VIEWS."
  (let ((visitor (list* 'lambda (list view) body)))
    (list 'call-for-applicable-views visitor graph node)))
(defgeneric applicable-views (node graph)
  (:documentation
   "Return a list of the views of GRAPH that apply to NODE, in the order
CALL-FOR-APPLICABLE-VIEWS visits them.")
  (:method (node graph)
    (let ((collected '()))
      (call-for-applicable-views (lambda (view)
                                   (push view collected))
                                 graph node)
      (nreverse collected))))
;; Bring the views of GRAPH up to date with one TX-WRITE.  Methods are
;; specialized on TX-CREATE, TX-UPDATE and TX-DELETE.
(defgeneric apply-tx-write-to-views (write graph))
(defmethod apply-tx-write-to-views ((write tx-create) graph)
  "Log the write and add its node to the views of GRAPH."
  (let ((created (node write)))
    ;; (do-applicable-views (view graph node)
    ;; (add-to-view graph view node))))
    (log:debug "Apply ~A to views for ~A" write (type-of created))
    (add-to-views graph created)))
(defmethod apply-tx-write-to-views ((write tx-update) graph)
  "Log the write and update the views of GRAPH so that the old node of
WRITE is replaced by its new node."
  (let ((new-node (node write))
        (old-node (old-node write)))
    ;; (do-applicable-views (view graph new-node)
    ;; (remove-from-view graph view old-node)
    ;; (add-to-view graph view new-node))))
    ;; The log line must name NEW-NODE: no variable called NODE is bound
    ;; in this method, so referring to one is an unbound-variable error
    ;; whenever the argument is evaluated.
    (log:debug "Apply ~A to views for ~A" write (type-of new-node))
    (update-in-views graph new-node old-node)))
(defmethod apply-tx-write-to-views ((write tx-delete) graph)
  "Remove the node of WRITE from the views of GRAPH."
  ;; (do-applicable-views (view graph node)
  ;; (remove-from-view graph view node))))
  (let ((removed (node write)))
    (remove-from-views graph removed)))
;;; Applying the transaction
;; Recursive lock held by PERSIST-HIGHEST-TRANSACTION-ID while writing,
;; and by LOAD-HIGHEST-TRANSACTION-ID while probing and reading, the
;; transaction id file.
(defvar *highest-transaction-id-lock*
  (make-recursive-lock "transaction id file"))
(defgeneric highest-transaction-id-file (graph)
  (:documentation
   "Return the pathname of the file transaction-id.dat, resolved
against the location of GRAPH.")
  (:method (graph)
    (let ((base (location graph)))
      (make-pathname :defaults base
                     :name "transaction-id"
                     :type "dat"))))
(defgeneric persist-highest-transaction-id (transaction-id graph)
  (:documentation
   "Write TRANSACTION-ID as 8 bytes to the transaction id file of GRAPH,
creating the file if needed and overwriting from its start otherwise.
Returns TRANSACTION-ID.")
  (:method (transaction-id graph)
    (let ((path (highest-transaction-id-file graph))
          (octets (make-byte-vector 8)))
      (serialize-uint64 octets transaction-id 0)
      (with-open-file (out path
                           :element-type '(unsigned-byte 8)
                           :direction :output
                           :if-exists :overwrite
                           :if-does-not-exist :create)
        ;; Only the write is done under the lock; the file is opened
        ;; before the lock is taken.
        (with-recursive-lock-held (*highest-transaction-id-lock*)
          (write-sequence octets out)))
      transaction-id)))
(defgeneric load-highest-transaction-id (graph)
  (:documentation
   "Return the transaction id stored in the transaction id file of
GRAPH, or 0 when the file does not exist.  Signals an error when fewer
than 8 bytes can be read.")
  (:method (graph)
    (let ((path (highest-transaction-id-file graph))
          (octets (make-byte-vector 8)))
      (with-recursive-lock-held (*highest-transaction-id-lock*)
        (cond ((not (probe-file path))
               0)
              (t
               (with-open-file (in path
                                   :element-type '(unsigned-byte 8)
                                   :direction :input)
                 (let ((filled (read-sequence octets in)))
                   (when (/= filled (length octets))
                     (error "Bad read-sequence from transaction id file"))
                   (deserialize-uint64 octets 0)))))))))
(defgeneric apply-tx-writes (writes graph)
  (:documentation
   "Apply each element of the list WRITES to GRAPH, in order, with
APPLY-TX-WRITE.")
  (:method (writes graph)
    (loop for write in writes
          do (apply-tx-write write graph))))
(defgeneric apply-tx-writes-to-views (writes graph)
  (:documentation
   "Apply each element of the list WRITES to the views of GRAPH, in
order, with APPLY-TX-WRITE-TO-VIEWS.")
  (:method (writes graph)
    (loop for write in writes
          do (apply-tx-write-to-views write graph))))
(defgeneric garbage-collect-heap (writes graph)
  (:documentation
   "Free the heap space of the old node of every TX-UPDATE in the list
WRITES, visiting each distinct old node once.")
  (:method (writes graph)
    (let ((old-nodes (delete-duplicates
                      (loop for write in writes
                            when (typep write 'tx-update)
                              collect (old-node write)))))
      (dolist (old-node old-nodes)
        (maybe-free-from-heap old-node graph)))))
(defgeneric apply-transaction (transaction graph)
  (:documentation
   "While holding TRANSACTION's lock: apply its writes to GRAPH, then to
the views, free the heap space of replaced nodes, and persist its
transaction id as the highest one.")
  (:method (transaction graph)
    (call-with-transaction-lock
     transaction
     (lambda ()
       (let ((pending (writes transaction)))
         (apply-tx-writes pending graph)
         (apply-tx-writes-to-views pending graph)
         (garbage-collect-heap pending graph)
         (persist-highest-transaction-id (transaction-id transaction) graph))))))
(defmethod apply-transaction :after (transaction (graph master-graph))
  "On a master graph, replicate TRANSACTION once it has been applied."
  (let ((master graph))
    (replicate-transaction transaction master)))
;;;
;;; Serializing a transaction
;;;
;;; A transaction is serialized as a header chunk followed by a number
;;; of tx-write chunks.
;;;
;;; The transaction header is as follows:
;;;
;;; - 8 bytes for the header size (fixed)
;;;
;;; - 1 byte for flags; currently unused
;;;
;;; - 1 byte for type (#x74, the character code of #\t)
;;;
;;; - 8 bytes for the transaction id
;;;
;;; - 8 bytes for the tx-write count
;;;
;;; - 8 bytes for the total size of following tx-writes
;;;
;; In-memory form of a transaction file header.
;; DESERIALIZE-TX-HEADER-VECTOR fills in TRANSACTION-ID, WRITE-COUNT and
;; WRITE-SIZE; LOAD-TX-FILE sets WRITES afterwards.  No slot has a
;; default, so WRITES and GRAPH are unbound until they are set.
(defclass tx-header ()
  ((transaction-id
    :initarg :transaction-id
    :reader transaction-id)
   ;; Number of tx-write chunks that follow the header.
   (write-count
    :initarg :write-count
    :accessor write-count)
   (write-size
    :initarg :write-size
    :accessor write-size
    :documentation "The number of bytes of serialized tx-write data
following the tx-header.")
   ;; List of TX-WRITE objects loaded from the file.
   (writes
    :initarg :writes
    :accessor writes)
   (graph
    :initarg :graph
    :accessor graph)))
;; Header layout: 8 (size) + 1 (flags) + 1 (type) + 8 (transaction id)
;; + 8 (write count) + 8 (total write size) bytes.
(alexandria:define-constant +tx-header-size+ (+ 8 1 1 8 8 8))
;; Value of the type byte stored in a transaction header.
(alexandria:define-constant +tx-header-type-code+ (char-code #\t))
(defun make-tx-header-vector ()
  "Return a new byte vector of +TX-HEADER-SIZE+ elements."
  (let ((size +tx-header-size+))
    (make-byte-vector size)))
(defun serialize-tx-header (tx vector offset)
  "Write the header of TX into VECTOR starting at OFFSET: header size,
a skipped flags byte, the type code, the transaction id, the number of
writes, and the total serialized size of the writes.  Returns VECTOR
and the offset just past the header as two values."
  (let ((cursor offset))
    (flet ((emit-uint64 (value)
             (serialize-uint64 vector value cursor)
             (incf cursor 8)))
      (emit-uint64 +tx-header-size+)
      ;; Skip flags
      (incf cursor)
      (setf (aref vector cursor) +tx-header-type-code+)
      (incf cursor)
      (emit-uint64 (transaction-id tx))
      (let ((writes (writes tx)))
        (emit-uint64 (length writes))
        (emit-uint64 (reduce #'+ writes :key 'tx-write-vector-size)))
      (values vector cursor))))
(defun deserialize-tx-header-vector (vector)
  "Return a TX-HEADER built from the serialized header in VECTOR.  The
size, flags and type bytes are not examined."
  ;; The first 10 bytes are the 8 byte size, the flags and the type.
  (let* ((id-offset 10)
         (count-offset (+ id-offset 8))
         (size-offset (+ id-offset 16))
         (transaction-id (deserialize-uint64 vector id-offset))
         (write-count (deserialize-uint64 vector count-offset))
         (write-size (deserialize-uint64 vector size-offset)))
    (make-instance 'tx-header
                   :transaction-id transaction-id
                   :write-count write-count
                   :write-size write-size)))
(defun tx-header-vector (tx)
  "Return a new byte vector holding the serialized header of TX."
  ;; VALUES keeps only the vector and drops the end offset.
  (values (serialize-tx-header tx (make-tx-header-vector) 0)))
(defun read-uint64-sized-vector (stream)
  "Read one size-prefixed chunk from STREAM and return it as a byte
vector that includes its own 8 byte size prefix.

Returns NIL when STREAM is already at end of file.  Signals an error
when the size prefix is incomplete, when the size is smaller than the
8 bytes of the prefix itself, or when the stream ends before the whole
chunk has been read."
  (let* ((size-vector (make-byte-vector 8))
         (last-position (read-sequence size-vector stream)))
    (cond ((= 0 last-position)
           nil)
          ((/= 8 last-position)
           (error "Could not read size information from ~A" stream))
          (t
           (let ((size (deserialize-uint64 size-vector 0)))
             ;; The size counts the prefix, so anything below 8 is
             ;; corrupt.  Reject it here; otherwise READ-SEQUENCE would
             ;; be called with :START 8 on a shorter vector.
             (when (< size 8)
               (error "Invalid chunk size ~D (must be at least 8) read from ~A"
                      size stream))
             (let ((vector (make-byte-vector size)))
               (setf last-position (read-sequence vector stream :start 8))
               (unless (= last-position size)
                 (error "Could not read to ~D (got ~D) bytes from ~A"
                        size last-position stream))
               ;; Put the size prefix back at the front of the chunk.
               (replace vector size-vector)
               vector))))))
;;; Serializing tx-writes
;;;
;;; A tx-write consists of a header followed by one or two serialized
;;; nodes.
;;;
;;; tx-write header:
;;;
;;; - 8 bytes for total tx-write size
;;;
;;; - 1 byte for flags; currently unused
;;;
;;; - 1 byte for type (#x63 for create, #x75 for update, #x64 for delete)
;;;
;;; - 1 byte for node count
;;;
;;; A node:
;;;
;;; - 8 bytes for size
;;;
;;; - 1 byte flags; currently unused
;;;
;;; - 1 byte for type (#x65 for edge, #x76 for vertex)
;;;
;;; - 16 bytes for uuid
;;;
;;; - 1 byte for node header size
;;;
;;; - N bytes for node header
;;;
;;; - M bytes for node heap value
;;;
;;; Serialize a single node
;; Fixed bytes written in front of every serialized node.  They total
;; 27: 8 (size), 1 (flags), 1 (type), 16 (uuid), 1 (node header size).
(alexandria:define-constant +transaction-node-base-header-size+
    (+ 8 1 16 1 1))
;; Values of the node type byte.
(alexandria:define-constant +transaction-node-edge-code+ (char-code #\e))
(alexandria:define-constant +transaction-node-vertex-code+ (char-code #\v))
(defun transaction-node-header-size (node)
  "Return the size in bytes of the type-specific header of NODE, which
must be an edge or a vertex."
  (let ((size (etypecase node
                (edge +edge-header-size+)
                (vertex +node-header-size+))))
    size))
(defun transaction-node-vector-size (node)
  "Return the number of bytes NODE occupies when serialized: the base
header, the type-specific header, and the node's bytes when those are a
sequence."
  (let* ((payload (bytes node))
         (payload-size (if (typep payload 'sequence)
                           (length payload)
                           0)))
    (+ +transaction-node-base-header-size+
       (transaction-node-header-size node)
       payload-size)))
(defun transaction-node-type-code (node)
  "Return the type byte that identifies NODE, an edge or a vertex, in a
serialized transaction."
  (let ((code (etypecase node
                (edge +transaction-node-edge-code+)
                (vertex +transaction-node-vertex-code+))))
    code))
(defun serialize-transaction-node-header (node vector offset)
  "Write the type-specific header of NODE, an edge or a vertex, into
VECTOR at OFFSET with the serializer for its type."
  (etypecase node
    (edge (serialize-edge-head vector node offset))
    (vertex (serialize-node-head vector node offset))))
(defun serialize-transaction-uuid (uuid vector offset)
  "Copy the elements of UUID into VECTOR starting at OFFSET.  Returns
VECTOR."
  (replace vector uuid :start1 offset :start2 0))
(defun deserialize-transaction-uuid (vector offset)
  "Return a new 16 element sequence copied from VECTOR starting at
OFFSET."
  (let ((end (+ offset 16)))
    (subseq vector offset end)))
(defun serialize-transaction-node (node vector offset)
  "Serialize NODE into VECTOR starting at OFFSET and return the offset
just past the last byte written.  Layout: 8 byte total size, flags byte
\(written as 0), type byte, 16 byte uuid, header-size byte, the
type-specific header, then the node's bytes when they are a sequence."
  (let* ((total-size (transaction-node-vector-size node))
         (type-code (transaction-node-type-code node))
         (header-size (transaction-node-header-size node))
         (payload (bytes node))
         (cursor offset))
    (flet ((emit-byte (value)
             (set-byte vector cursor value)
             (incf cursor)))
      ;; 8 byte size
      (serialize-uint64 vector total-size cursor)
      (incf cursor 8)
      ;; 1 byte (unused) flags
      (emit-byte 0)
      ;; 1 byte type
      (emit-byte type-code)
      ;; 16 byte uuid
      (serialize-transaction-uuid (id node) vector cursor)
      (incf cursor 16)
      ;; 1 byte node header size
      (emit-byte header-size)
      ;; header-size bytes of node header
      (serialize-transaction-node-header node vector cursor)
      (incf cursor header-size)
      ;; (length payload) of node bytes
      (when (typep payload 'sequence)
        (replace vector payload :start1 cursor)
        (incf cursor (length payload)))
      cursor)))
(defun transaction-node-vector (node)
  "Return a new byte vector, sized exactly for NODE, holding its
serialized form."
  (let ((vector (make-byte-vector (transaction-node-vector-size node))))
    (serialize-transaction-node node vector 0)
    vector))
(defun deserialize-edge-transaction-node-vector (vector id
                                                 header-offset
                                                 data-offset
                                                 end)
  "Rebuild an edge from VECTOR: its header is read at HEADER-OFFSET, its
id is set to ID, and the bytes from DATA-OFFSET up to END become its
BYTES and, deserialized, its DATA.  With no data bytes DATA is NIL."
  (let ((edge (deserialize-edge-head vector header-offset))
        (payload (subseq vector data-offset end)))
    (setf (id edge) id)
    (cond ((plusp (length payload))
           (setf (data edge) (deserialize payload)
                 (bytes edge) payload))
          (t
           (setf (data edge) nil)))
    edge))
(defun deserialize-vertex-transaction-node-vector (vector id
                                                   header-offset
                                                   data-offset
                                                   end)
  "Rebuild a vertex from VECTOR: its header is read at HEADER-OFFSET,
its id is set to ID, and the bytes from DATA-OFFSET up to END become
its BYTES and, deserialized, its DATA.  With no data bytes DATA is NIL."
  (let ((vertex (deserialize-vertex-head vector header-offset))
        (payload (subseq vector data-offset end)))
    (setf (id vertex) id)
    (cond ((plusp (length payload))
           (setf (data vertex) (deserialize payload)
                 (bytes vertex) payload))
          (t
           (setf (data vertex) nil)))
    vertex))
(defun deserialize-transaction-node-vector (vector &optional (offset 0))
  "Return the edge or vertex serialized in VECTOR at OFFSET and, as a
second value, the offset just past it.  Signals an error for an unknown
node type byte."
  ;; Offsets relative to OFFSET: 0 size (8 bytes), 8 flags (skipped),
  ;; 9 type, 10 uuid (16 bytes), 26 header size, 27 node header.
  (let* ((size (deserialize-uint64 vector offset))
         (end (+ offset size))
         (type (get-byte vector (+ offset 9)))
         (uuid (deserialize-transaction-uuid vector (+ offset 10)))
         (header-size (get-byte vector (+ offset 26)))
         (header-offset (+ offset 27))
         (data-offset (+ header-offset header-size))
         (node
           (cond ((eql type +transaction-node-edge-code+)
                  (deserialize-edge-transaction-node-vector
                   vector uuid header-offset data-offset end))
                 ((eql type +transaction-node-vertex-code+)
                  (deserialize-vertex-transaction-node-vector
                   vector uuid header-offset data-offset end))
                 (t
                  (error "Unknown transaction node type ~S" type)))))
    (values node end)))
;;; Serialize a tx-write
;; tx-write header: 8 (total size) + 1 (flags) + 1 (type) + 1 (node
;; count) bytes.
(alexandria:define-constant +tx-write-header-size+ (+ 8 1 1 1))
;; Values of the tx-write type byte.
(alexandria:define-constant +tx-write-create-code+ (char-code #\c))
(alexandria:define-constant +tx-write-update-code+ (char-code #\u))
(alexandria:define-constant +tx-write-delete-code+ (char-code #\d))
(defun tx-write-vector-size (tx-write)
  "Return the number of bytes TX-WRITE occupies when serialized: the
tx-write header, its node, and its old node when it is a TX-UPDATE."
  (let ((node-size (transaction-node-vector-size (node tx-write)))
        (old-node-size (if (typep tx-write 'tx-update)
                           (transaction-node-vector-size (old-node tx-write))
                           0)))
    (+ +tx-write-header-size+ node-size old-node-size)))
(defgeneric tx-write-vector-code (tx-write)
  (:documentation
   "Return the type byte that identifies the class of TX-WRITE in a
serialized transaction.")
  (:method ((tx-write tx-delete))
    +tx-write-delete-code+)
  (:method ((tx-write tx-update))
    +tx-write-update-code+)
  (:method ((tx-write tx-create))
    +tx-write-create-code+))
(defgeneric tx-write-node-count (tx-write)
  (:documentation
   "Return how many nodes are serialized for TX-WRITE: 2 for a TX-UPDATE
\(the node and the old node), 1 for any other write.")
  (:method ((tx-write tx-update))
    2)
  (:method ((tx-write tx-write))
    1))
(defun tx-write-vector (tx-write)
  "Serialize TX-WRITE into a new byte vector.  Returns the vector and
the offset just past the last byte written as two values."
  (let* ((size (tx-write-vector-size tx-write))
         (vector (make-byte-vector size))
         (node-count (tx-write-node-count tx-write)))
    ;; Bytes 0-7: total size.
    (serialize-uint64 vector size 0)
    ;; Byte 8 is the unused flag byte; nothing is stored there.
    ;; Byte 9: type code.
    (set-byte vector 9 (tx-write-vector-code tx-write))
    ;; Byte 10: node count.
    (set-byte vector 10 node-count)
    ;; Nodes start right after the tx-write header.
    (let ((offset (serialize-transaction-node (node tx-write)
                                              vector
                                              +tx-write-header-size+)))
      (when (= 2 node-count)
        (setf offset (serialize-transaction-node (old-node tx-write)
                                                 vector
                                                 offset)))
      (values vector offset))))
(defun tx-write-class (type-code)
  "Return the class name designated by TYPE-CODE, or signal an error
for a code that names no tx-write class."
  (let ((entry (assoc type-code
                      (list (cons +tx-write-create-code+ 'tx-create)
                            (cons +tx-write-update-code+ 'tx-update)
                            (cons +tx-write-delete-code+ 'tx-delete)))))
    (if entry
        (cdr entry)
        (error "Unknown type code ~S" type-code))))
(defun deserialize-tx-write-vector (vector)
  "Return the TX-WRITE serialized in VECTOR.  When the node count is 2
the second node becomes the old node, and its data pointer is replaced
by the data pointer of the node with the same id found through
LOOKUP-VERTEX or LOOKUP-EDGE."
  ;; Bytes 0-8 (size and flags) are skipped; byte 9 is the type code,
  ;; byte 10 the node count, and the nodes start at byte 11.
  (let* ((class (tx-write-class (get-byte vector 9)))
         (count (get-byte vector 10)))
    (multiple-value-bind (node next-offset)
        (deserialize-transaction-node-vector vector 11)
      (cond ((/= count 2)
             (make-instance class :node node))
            (t
             (let ((old-node (deserialize-transaction-node-vector vector next-offset)))
               ;; get local data-pointer to replace the one read from the txn file
               (log:debug "FINDING PROPER DATA-POINTER FOR ~A (~A)"
                          (id old-node) (data-pointer old-node))
               (let ((local-old-node (if (vertex-p old-node)
                                         (lookup-vertex (id old-node))
                                         (lookup-edge (id old-node)))))
                 (log:debug "SETTING DATA-POINTER OF ~A TO ~A"
                            (id old-node)
                            (data-pointer local-old-node))
                 (setf (data-pointer old-node) (data-pointer local-old-node)))
               (make-instance class
                              :node node
                              :old-node old-node)))))))
;;; Saving tx-writes to a file
(defun save-bytes-component-vector (vector tx)
  "Put VECTOR at the front of TX's list of bytes components and return
the new list."
  (setf (bytes-components tx)
        (cons vector (bytes-components tx))))
(defun write-tx-writes-to-stream (tx stream)
  "Serialize each write of TX, remember the resulting vector as a bytes
component of TX, and write that same vector to STREAM."
  (dolist (write (writes tx))
    ;; Serialize once: the vector saved for TX's BYTES and the vector
    ;; written to STREAM are the same object.
    (let ((v (tx-write-vector write)))
      (save-bytes-component-vector v tx)
      (write-sequence v stream))))
(defun write-tx-header-to-stream (tx stream)
  "Serialize the header of TX, remember the vector as a bytes component
of TX, and write it to STREAM."
  (let ((header (tx-header-vector tx)))
    (save-bytes-component-vector header tx)
    (write-sequence header stream)))
(defun initialize-bytes-from-components (tx)
  "Concatenate the bytes components of TX, oldest first, into one new
byte vector; store it as TX's BYTES and clear the component list."
  (let* ((components (bytes-components tx))
         (total (reduce #'+ components :key 'length))
         (bytes (make-byte-vector total)))
    ;; Components were pushed, so the list is newest first.
    (loop with cursor = 0
          for component in (reverse components)
          do (replace bytes component :start1 cursor)
             (incf cursor (length component)))
    (setf (bytes-components tx) nil
          (bytes tx) bytes)))
(defgeneric persist-tx (transaction initial-file final-file)
  (:documentation
   "Persist TRANSACTION to INITIAL-FILE. After the transaction has been
completely written and INITIAL-FILE is closed, INITIAL-FILE is renamed
to FINAL-FILE. The intent is to make it atomically clear that the
transaction file has a complete set of related changes. After
TRANSACTION has persisted, its BYTES slot contains a serialization of
the transaction. Returns FINAL-FILE.")
  (:method (transaction initial-file final-file)
    ;; INITIAL-FILE must not exist yet; opening it signals an error
    ;; otherwise.
    (with-open-file (out initial-file
                         :element-type '(unsigned-byte 8)
                         :direction :output
                         :if-exists :error)
      (write-tx-header-to-stream transaction out)
      (write-tx-writes-to-stream transaction out))
    (initialize-bytes-from-components transaction)
    (rename-file initial-file final-file)
    final-file))
(defun load-tx-header (stream)
  "Read one size-prefixed chunk from STREAM and return it as a
TX-HEADER, or NIL when STREAM is at end of file."
  (let ((chunk (read-uint64-sized-vector stream)))
    (if chunk
        (deserialize-tx-header-vector chunk)
        nil)))
(defun load-one-tx-write (stream)
  "Read one size-prefixed chunk from STREAM and return it as a
TX-WRITE, or NIL when no writes are left in the stream."
  (let ((chunk (read-uint64-sized-vector stream)))
    (if chunk
        (deserialize-tx-write-vector chunk)
        nil)))
(defun load-tx-file (file)
  "Load the transaction stored in FILE and return its TX-HEADER with the
WRITES slot set to the list of writes read from the file.

Signals an error when FILE holds no header at all, or when it ends
before the number of writes announced in the header has been read."
  (with-open-file (stream file :element-type '(unsigned-byte 8))
    (let ((tx-header (load-tx-header stream)))
      ;; LOAD-TX-HEADER returns NIL for an empty file.  Report that
      ;; directly rather than failing on (WRITE-COUNT NIL).
      (unless tx-header
        (error "No transaction header in ~A: the file is empty" file))
      (let ((expected (write-count tx-header)))
        (setf (writes tx-header)
              ;; A single FOR ... BELOW clause counts the iterations, so
              ;; no FOR clause has to follow a REPEAT clause.
              (loop for i from 0 below expected
                    for write = (load-one-tx-write stream)
                    unless write
                      do (error "Too few writes in ~A: expected ~A, ended at ~A"
                                file
                                expected
                                i)
                    collect write)))
      tx-header)))
(defgeneric persistent-transaction-directory (graph)
  (:documentation
   "Return the pathname of the tx/ directory under the location of
GRAPH.")
  (:method (graph)
    (let ((base (location graph)))
      (merge-pathnames "tx/" base))))
(defgeneric transaction-pathname (transaction)
  (:documentation
   "Return the pathname of TRANSACTION's file: its transaction id as 16
zero-padded uppercase hexadecimal digits, with type txn, in the
persistent transaction directory of its graph.")
  (:method (transaction)
    (let ((hex-name (format nil "~16,'0X" (transaction-id transaction)))
          (tx-directory (persistent-transaction-directory
                         (graph transaction))))
      (make-pathname :name hex-name
                     :type "txn"
                     :defaults tx-directory))))
(defgeneric transaction-temporary-pathname (transaction)
  (:documentation
   "Return TRANSACTION's pathname with the type replaced by txn-tmp.")
  (:method (transaction)
    (let ((final-path (transaction-pathname transaction)))
      (make-pathname :defaults final-path
                     :type "txn-tmp"))))
(defgeneric persist-transaction (transaction)
(:method (transaction)
(persist-tx transaction
(transaction-temporary-pathname transaction)
(transaction-pathname transaction))
(let* ((transaction-manager (transaction-manager transaction))