-
Notifications
You must be signed in to change notification settings - Fork 6
/
transaction.lisp
291 lines (269 loc) · 9.71 KB
/
transaction.lisp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
;; Everything in this file lives in the VIVACE-GRAPH-V2 package.
(in-package #:vivace-graph-v2)
;; The transaction in effect for the current dynamic extent, or NIL when no
;; transaction is active.  EXECUTE-TX binds it to a fresh TRANSACTION struct
;; and WITH-GRAPH-TRANSACTION inspects it to detect nested use.
(defparameter *current-transaction* nil)
;; NOTE(review): not referenced anywhere in this file's visible code;
;; presumably an upper bound on log file size -- confirm against callers.
(defparameter *max-log-file-length* 10000000)
;; Counter that RECORD-TX increments and appends to each transaction log file
;; name.  The logger thread started by START-LOGGER rebinds it to 0.
(defparameter *file-counter* 0)
(defun print-transaction (tx stream depth)
  "Print function for the TRANSACTION struct: writes TX to STREAM as
#<TX-id>, where id is the transaction's TX-ID.  DEPTH is ignored."
  (declare (ignore depth))
  (let ((id (tx-id tx)))
    (format stream "#<TX-~A>" id)))
;; In-memory record of one graph transaction.  Accessors are prefixed TX-
;; (TX-ID, TX-QUEUE, ...), the type predicate is TRANSACTION?, and instances
;; print via PRINT-TRANSACTION.
(defstruct (transaction
(:print-function print-transaction)
(:conc-name tx-)
(:predicate transaction?))
;; Identifier; defaults to a fresh value from MAKE-UUID.
(id (make-uuid))
;; Pending actions.  RECORD-TX and DUMP-TRANSACTION do nothing when this is
;; NIL, and EXECUTE-TX only hands the transaction to the logger when it is
;; non-NIL.
(queue nil)
;; List of zero-argument functions that ROLLBACK-TX calls to undo work.
(rollback nil)
;; A fresh sb-concurrency mailbox per transaction.
;; NOTE(review): its consumer is not visible in this file -- confirm usage.
(mailbox (sb-concurrency:make-mailbox))
;; The thread that created the transaction.
(thread (current-thread))
;; The store this transaction operates on; EXECUTE-TX sets it via :STORE.
(store nil)
;; List of (pattern-or-triple lock kind) entries pushed by ENQUEUE-LOCK and
;; released by RELEASE-ALL-LOCKS.
(locks nil))
(defun find-newest-snapshot (store)
  "Look through the directory named by STORE's location for files matching
\"snap-*\" and pick the one with the most recent FILE-WRITE-DATE.
Returns two values: the snapshot pathname and its write date.  Both values
are NIL when the directory holds no snapshot file."
  (let* ((place (location store))
         ;; MAKE-PATHNAME is given the directory as a string, so a pathname
         ;; location is converted to its namestring first.
         (dir (if (pathnamep place) (namestring place) place))
         (newest nil))
    (loop for candidate in (directory (make-pathname :directory dir
                                                     :name :wild
                                                     :type :wild))
          do (when (and (pathname-match-p candidate "snap-*")
                        (or (null newest)
                            (> (file-write-date candidate)
                               (file-write-date newest))))
               (setq newest candidate)))
    (values newest
            (and newest (file-write-date newest)))))
(defun find-transactions (store timestamp)
  "Return the transaction log files found in STORE's location directory,
sorted oldest first.

A file qualifies when it matches \"tx-*\" and either TIMESTAMP is NIL or
TIMESTAMP is a number smaller than the file's FILE-WRITE-DATE.  Progress is
reported on standard output.

File names are expected to look like tx-<time> or tx-<time>-<counter>.
Files are ordered by <time>, with ties broken by <counter>; a name without
a counter (the form ROLL-LOGFILE produces) sorts as counter 0 instead of
signalling an error from PARSE-INTEGER on NIL."
  (let ((transaction-logs nil)
        (location (if (pathnamep (location store))
                      (namestring (location store))
                      (location store))))
    (format t "Looking for transactions to restore...~%")
    (dolist (file (directory (make-pathname :directory location
                                            :name :wild :type :wild)))
      (when (and (pathname-match-p file "tx-*")
                 (or (null timestamp)
                     (and (numberp timestamp)
                          (> (file-write-date file) timestamp))))
        (format t "Found transaction file ~A~%" file)
        (push file transaction-logs)))
    (flet ((name-pieces (file)
             ;; "tx-123-4" => ("tx" "123" "4")
             (cl-ppcre:split "-" (pathname-name file)))
           (stamp (pieces)
             (parse-integer (nth 1 pieces)))
           (counter (pieces)
             ;; The counter component is optional; treat a missing one as 0.
             (let ((text (nth 2 pieces)))
               (if text (parse-integer text) 0))))
      (sort transaction-logs
            #'(lambda (x y)
                (let ((stamp-x (stamp x))
                      (stamp-y (stamp y)))
                  (or (< stamp-x stamp-y)
                      ;; Counters are only parsed when the timestamps tie.
                      (and (= stamp-x stamp-y)
                           (< (counter x) (counter y))))))
            :key #'name-pieces))))
(defun replay-transactions (file &optional (store *store*))
  "Replay the transaction log FILE against STORE (default: *STORE*).

FILE is opened as a stream of (unsigned-byte 8).  Its first byte must equal
+TRANSACTION+; the rest of the stream is then handed to DESERIALIZE-ACTION
with *STORE* bound to STORE.

Signals TRANSACTION-ERROR when the first byte is anything else, including
the case of an empty file."
  (let ((*store* store))
    (with-open-file (stream file :element-type '(unsigned-byte 8))
      (let ((magic-byte (read-byte stream nil :eof)))
        ;; EQL rather than =: on an empty file MAGIC-BYTE is the keyword
        ;; :EOF, and = would signal a type error instead of the intended
        ;; TRANSACTION-ERROR.
        (unless (eql +transaction+ magic-byte)
          (error 'transaction-error
                 :reason (format nil "~A is not a tx file!" file)))
        (deserialize-action magic-byte stream)))))
(defun restore-triple-store (store)
  "Rebuild STORE from disk while holding the lock on its main index.

Steps, in order:
  1. Load the newest snapshot file, if one exists, by reading a code byte
     and calling DESERIALIZE on it until end of file or a zero byte.
  2. Replay every transaction log returned by FIND-TRANSACTIONS for the
     snapshot's write date (all logs when there is no snapshot).
  3. Call DO-INDEXING on STORE.
STORE is the last form evaluated inside the index lock."
  (let ((*store* store))
    (with-locked-index ((main-idx store))
      (multiple-value-bind (snapshot-file timestamp)
          (find-newest-snapshot store)
        (when snapshot-file
          (format t "Restoring from snapshot file ~A~%" snapshot-file)
          (with-open-file (stream snapshot-file
                                  :element-type '(unsigned-byte 8))
            (loop for code = (read-byte stream nil :eof)
                  ;; A zero byte is the terminator SNAPSHOT writes.
                  until (or (eql code :eof) (null code) (= code 0))
                  do (deserialize code stream))))
        (loop for tx-file in (find-transactions store timestamp)
              do (format t "REPLAYING TX ~A~%" tx-file)
                 (replay-transactions tx-file))
        (do-indexing store)
        store))))
(defun snapshot (store)
  "Write a snapshot of STORE to <location>/snap-<universal-time>.

While holding the lock on the main index, every triple in the :ID-IDX table
for which PERSISTENT? is true is serialized to the file.  A single zero
byte is then written as the end marker and the stream is flushed."
  (let ((snapshot-path (format nil "~A/snap-~A"
                               (location store) (get-universal-time))))
    (with-open-file (stream snapshot-path
                            :direction :output
                            :element-type '(unsigned-byte 8)
                            :if-exists :overwrite
                            :if-does-not-exist :create)
      (with-locked-index ((main-idx store))
        (loop for triple being the hash-values
                of (gethash :id-idx (index-table (main-idx store)))
              when (persistent? triple)
                do (logger :debug "serializing ~A: ~A"
                           (triple-id triple) triple)
                   (serialize triple stream)))
      (logger :debug "Recording null byte")
      (write-byte 0 stream)
      (force-output stream))))
(defun roll-logfile (store stream)
  "Close STREAM when it is a stream that is still open, then open and return
a new binary output stream on <location>/tx-<universal-time>.  An existing
file of that name is renamed; a missing one is created."
  (when (streamp stream)
    (when (open-stream-p stream)
      (close stream)))
  (let ((logfile (format nil "~A/tx-~A"
                         (location store) (get-universal-time))))
    (open logfile
          :element-type '(unsigned-byte 8)
          :direction :output
          :if-exists :rename
          :if-does-not-exist :create)))
(defun set-dirty (store)
  "Mark STORE as dirty by writing the current GETTIMEOFDAY value into the
file <location>/.dirty, creating the file if needed."
  (let ((marker (format nil "~A/.dirty" (location store))))
    (with-open-file (stream marker
                            :direction :output
                            :if-exists :overwrite
                            :if-does-not-exist :create)
      (format stream "~A" (gettimeofday)))))
(defun set-clean (store)
  "Mark STORE as clean by deleting <location>/.dirty if that file exists.
Returns NIL when there was nothing to delete."
  (let ((marker (format nil "~A/.dirty" (location store))))
    (and (probe-file marker)
         (delete-file marker))))
(defun clear-tx-log (store)
  "Delete every file matching \"tx-*\" in STORE's location directory.

The location may be a string or a pathname; a pathname is converted to its
namestring before being used as the :DIRECTORY argument, in the same way
the snapshot and transaction lookup functions in this file do."
  (let ((location (if (pathnamep (location store))
                      (namestring (location store))
                      (location store))))
    (dolist (file (directory
                   (make-pathname :directory location
                                  :name :wild :type :wild)))
      (when (pathname-match-p file "tx-*")
        (delete-file file)))))
(defun clear-snapshots (store)
  "Delete every file matching \"snap-*\" in STORE's location directory.

The location may be a string or a pathname; a pathname is converted to its
namestring before being used as the :DIRECTORY argument, in the same way
the snapshot and transaction lookup functions in this file do."
  (let ((location (if (pathnamep (location store))
                      (namestring (location store))
                      (location store))))
    (dolist (file (directory
                   (make-pathname :directory location
                                  :name :wild :type :wild)))
      (when (pathname-match-p file "snap-*")
        (delete-file file)))))
(defun dump-transaction (stream tx)
  "Serialize TX to STREAM as a :TRANSACTION action and flush the stream.
Does nothing unless TX is a transaction with a non-empty queue."
  (when (transaction? tx)
    (when (tx-queue tx)
      (logger :debug "Dumping tx ~A to ~A" tx stream)
      (serialize-action :transaction stream tx)
      (force-output stream))))
(defun record-tx (tx store)
  "Persist TX to its own log file under STORE's location.

Does nothing unless TX is a transaction with a non-empty queue.  Otherwise
a file named tx-<universal-time>-<counter> is opened (the counter comes
from incrementing *FILE-COUNTER*), the store is marked dirty, and the
transaction is dumped into the file.  Any ERROR raised along the way is
logged at :ERR level rather than propagated."
  (when (transaction? tx)
    (when (tx-queue tx)
      (logger :debug "Recording tx ~A~%" (reverse (tx-queue tx)))
      (handler-case
          ;; The path is computed inside HANDLER-CASE so that failures while
          ;; building it are logged like any other error.
          (let ((log-path (format nil "~A/tx-~A-~A"
                                  (location store)
                                  (get-universal-time)
                                  (incf *file-counter*))))
            (with-open-file (stream log-path
                                    :element-type '(unsigned-byte 8)
                                    :direction :output
                                    :if-exists :rename
                                    :if-does-not-exist :create)
              (set-dirty store)
              (dump-transaction stream tx)))
        (error (c)
          (logger :err "Unhandled error in record-tx: ~A" c))))))
(defun stop-logger (store)
  "Send :SHUTDOWN to STORE's log mailbox, then join STORE's logger thread."
  (let ((mailbox (log-mailbox store)))
    (sb-concurrency:send-message mailbox :shutdown))
  (let ((worker (logger-thread store)))
    (join-thread worker)))
;; Start the transaction-log thread for STORE and return the value of
;; MAKE-THREAD.  The thread is named "tx-log thread for <store>".
;;
;; The thread creates its own mailbox, installs it as (log-mailbox store),
;; and then loops on messages received from that mailbox:
;;   - a TRANSACTION struct  -> written to disk with RECORD-TX
;;   - :shutdown-and-clear   -> delete tx logs, snapshots and the dirty
;;                              marker, then call QUIT
;;   - :shutdown             -> record any transactions still pending in the
;;                              mailbox, then leave the loop
;;   - :snapshot             -> write a snapshot and mark the store clean
;;   - any other keyword     -> logged as unknown
;; Messages that are neither transactions nor keywords match no TYPECASE
;; clause and are dropped.  An ERROR raised while handling one message is
;; logged and the loop moves on to the next message.
;;
;; NOTE(review): the mailbox is installed from inside the new thread, so
;; (log-mailbox store) may not be set yet when this function returns --
;; confirm callers do not send messages immediately.
(defun start-logger (store)
(make-thread
#'(lambda ()
;; *FILE-COUNTER* is rebound here, so the counter used by RECORD-TX in this
;; thread starts from 0.
;; NOTE(review): LAST-SNAPSHOT is assigned below but never read in the
;; visible code.
(let ((mailbox (sb-concurrency:make-mailbox)) (*file-counter* 0)
(last-snapshot (gettimeofday)))
(setf (log-mailbox store) mailbox)
(loop
;; HANDLER-CASE sits inside the LOOP so one failing message does not end
;; the thread.
(handler-case
(let ((msg (sb-concurrency:receive-message mailbox)))
(logger :debug "tx-log thread received message ~A" msg)
(typecase msg
(transaction (record-tx msg store))
(keyword
(case msg
(:shutdown-and-clear
(clear-tx-log store)
(clear-snapshots store)
(set-clean store)
;; NOTE(review): which QUIT this resolves to, and whether it ends only
;; this thread or the whole image, is not visible here -- confirm.
(quit))
(:shutdown
(logger :debug "Processing all pending messages.")
;; Drain whatever is still queued; only transactions are acted on.
(dolist
(msg
(sb-concurrency:receive-pending-messages
mailbox))
(logger :debug "Processing message ~A" msg)
(when (transaction? msg)
(record-tx msg store)))
;;(logger :info "Snapshotting the store.")
;;(snapshot store)
;;(logger :info "Marking the store clean.")
;;(set-clean store)
(logger :info "Logger thread quitting.")
;; Leaves the enclosing LOOP, which ends the thread function.
(return t))
(:snapshot
(logger :info "Snapshot commencing")
(snapshot store)
(logger :debug "Snapshot complete. Set store CLEAN")
(set-clean store)
(logger :debug "Store set CLEAN")
(setq last-snapshot (gettimeofday))
(logger :info "Snapshot finished"))
(otherwise
(logger :info "Unknown msg to tx-log thread: ~A"
msg))))))
(error (condition)
(logger :err "Unhandled error in tx logger for ~A: ~A"
store condition))))))
:name (format nil "tx-log thread for ~A" store)))
(defun release-all-locks (tx)
  "Release every lock recorded in TX's lock list while holding the lock on
the (locks *store*) hash table.

Each entry has the form (pattern-or-triple lock kind).  A triple is
released with UNLOCK-TRIPLE; anything else is treated as a four-element
pattern whose elements are passed to UNLOCK-PATTERN."
  (sb-ext:with-locked-hash-table ((locks *store*))
    (loop for entry in (tx-locks tx)
          do (destructuring-bind (target lock kind) entry
               (declare (ignore lock))
               (if (triple? target)
                   (unlock-triple target :kind kind)
                   (unlock-pattern (first target)
                                   (second target)
                                   (third target)
                                   (fourth target)
                                   :kind kind))))))
(defun enqueue-lock (pattern lock kind)
  "Record a lock on *CURRENT-TRANSACTION* by pushing the entry
(PATTERN LOCK KIND) onto its lock list.  Returns the updated list."
  (let ((entry (list pattern lock kind)))
    (push entry (tx-locks *current-transaction*))))
(defun rollback-tx (tx)
  "Call each function in TX's rollback list with no arguments, walking the
list in reverse of its stored order.  Returns NIL."
  (loop for undo in (reverse (tx-rollback tx))
        do (funcall undo)))
(defun execute-tx (store fn timeout max-tries retries)
  "Run the zero-argument function FN as a transaction against STORE.

A fresh TRANSACTION is bound to *CURRENT-TRANSACTION* and FN is called
under a timeout of TIMEOUT (passed to SB-EXT:WITH-TIMEOUT).

  - On timeout the transaction is rolled back, its locks are released and
    FN is retried with RETRIES incremented.
  - On any other ERROR the transaction is rolled back, its locks are
    released and TRANSACTION-ERROR is signalled.
  - On success the transaction is sent to STORE's log mailbox when its
    queue is non-empty, its locks are released, and every value FN
    returned is returned.

Signals TRANSACTION-ERROR immediately when RETRIES >= MAX-TRIES."
  (if (>= retries max-tries)
      (error 'transaction-error
             :reason
             (format nil
                     "Unable to execute transaction. Too many retries (~A)."
                     retries))
      (let ((*current-transaction* (make-transaction :store store)))
        (logger :debug "~A execute-tx starting" *current-transaction*)
        (handler-case
            (sb-ext:with-timeout timeout
              (funcall fn))
          (sb-ext:timeout (condition)
            (logger :debug "~A execute-tx timeout ~A"
                    *current-transaction* condition)
            (rollback-tx *current-transaction*)
            (release-all-locks *current-transaction*)
            (execute-tx store fn timeout max-tries (1+ retries)))
          (error (condition)
            (logger :debug "~A execute-tx error ~A"
                    *current-transaction* condition)
            (rollback-tx *current-transaction*)
            (release-all-locks *current-transaction*)
            (error 'transaction-error
                   :reason
                   (format nil "Unable to execute transaction: ~A"
                           condition)))
          ;; &REST rather than a single required parameter: the values of
          ;; the protected form are applied to this lambda list, so a body
          ;; returning zero or several values would otherwise signal an
          ;; argument-count error here, outside the handlers above, and
          ;; leave the locks held.
          (:no-error (&rest results)
            (logger :debug "~A execute-tx success (~A)"
                    *current-transaction* (first results))
            (when (tx-queue *current-transaction*)
              (sb-concurrency:send-message
               (log-mailbox store) *current-transaction*))
            (release-all-locks *current-transaction*)
            (values-list results))))))
(defmacro with-graph-transaction ((store &key (timeout 10) (max-tries 10))
                                  &body body)
  "Run BODY as a transaction against STORE.

When a transaction on the same store (compared by STORE-NAME) is already
active, BODY simply runs inside it.  When a transaction on a different
store is active, TRANSACTION-ERROR is signalled.  Otherwise BODY is handed
to EXECUTE-TX with TIMEOUT and MAX-TRIES, starting at zero retries."
  ;; body must be idempotent: EXECUTE-TX calls it again after a timeout.
  (with-gensyms (thunk)
    `(let ((,thunk (lambda () ,@body)))
       (if (transaction? *current-transaction*)
           (if (equal (store-name (tx-store *current-transaction*))
                      (store-name ,store))
               (funcall ,thunk)
               (error 'transaction-error
                      :reason
                      "Transactions cannot currently span multiple stores."))
           (execute-tx ,store ,thunk ,timeout ,max-tries 0)))))