Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
klauswuestefeld committed May 21, 2024
1 parent 933da40 commit d1005aa
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 35 deletions.
73 changes: 40 additions & 33 deletions src/prevayler_clj_aws/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -110,41 +110,48 @@
snapshot-path "snapshot"
page-size 1000}} aws-opts
_ (println "Reading snapshot bucket...")
{state :state previous-snapshot-index :partkey} (read-snapshot s3-client s3-bucket snapshot-path)
{state :state snapshot-index :partkey} (read-snapshot s3-client s3-bucket snapshot-path)
_ (println "Reading snapshot bucket done.")
state-atom (atom (or state initial-state))
snapshot-index-atom (atom (inc previous-snapshot-index))
order-atom (atom 0)]

snapshot-index-atom (atom snapshot-index)]

(println "Restoring events...")
(restore-events! dynamodb-client business-fn state-atom dynamodb-table previous-snapshot-index page-size)
(restore-events! dynamodb-client business-fn state-atom dynamodb-table @snapshot-index-atom page-size)
(println "Restoring events done.")

; since s3 update is atomic, if saving snapshot fails next prevayler will pick the previous state
; and restore events from the previous partkey
(save-snapshot! s3-client s3-bucket snapshot-path {:state @state-atom :partkey @snapshot-index-atom})
(println "Saving snapshot done.")

(reify
Prevayler
(handle! [this event]
(locking this ; (I)solation: strict serializability.
(let [current-state @state-atom
timestamp (timestamp-fn)
new-state (business-fn current-state event timestamp)] ; (C)onsistency: must be guaranteed by the handler. The event won't be journalled when the handler throws an exception.)
(when-not (identical? new-state current-state)
(write-event! dynamodb-client dynamodb-table @snapshot-index-atom
(swap! order-atom inc) ; Skips an order number if there is an exception, but that's OK.
[timestamp event (hash new-state)]) ; (D)urability
(reset! state-atom new-state)) ; (A)tomicity
new-state)))

#_(snapshot! [this]
(locking this
(start-new-journal! journal-file data-out-atom @state-atom backup)))

(timestamp [_] (timestamp-fn))

IDeref (deref [_] @state-atom)

Closeable (close [_] (aws/stop dynamodb-client)))))
(let [order-atom (atom 0)
snapshot-fn! (fn []
(println "Saving snapshot to bucket...")
; Since s3 update is atomic, if saving snapshot fails next prevayler will pick the previous state
; and restore events from the previous partkey
(save-snapshot! s3-client s3-bucket snapshot-path {:state @state-atom
:partkey (inc @snapshot-index-atom)})
(println "Snapshot done.")
(swap! snapshot-index-atom inc)
(reset! order-atom 0))]

(snapshot-fn!)

(reify
Prevayler
(handle! [this event]
(locking this ; (I)solation: strict serializability.
(let [current-state @state-atom
timestamp (timestamp-fn)
new-state (business-fn current-state event timestamp)] ; (C)onsistency: must be guaranteed by the handler. The event won't be journalled when the handler throws an exception.)
(when-not (identical? new-state current-state)
(write-event! dynamodb-client dynamodb-table @snapshot-index-atom
(swap! order-atom inc) ; Skips an order number if there is an exception, but that's OK.
[timestamp event (hash new-state)]) ; (D)urability
(reset! state-atom new-state)) ; (A)tomicity
new-state)))

(snapshot! [this]
(locking this
(snapshot-fn!)))

(timestamp [_] (timestamp-fn))

IDeref (deref [_] @state-atom)

Closeable (close [_] (aws/stop dynamodb-client))))))
4 changes: 2 additions & 2 deletions test/prevayler_clj_aws/core_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
dynamodb-table (gen-name)
hostname (or (System/getenv "LOCALSTACK_HOST") "localhost")
endpoint-override {:protocol "http" :hostname hostname :port (localstack-port)}
s3-cli (aws/client {:api :s3 :endpoint-override endpoint-override #_#_:region "us-east-1"})
dynamodb-cli (aws/client {:api :dynamodb :endpoint-override endpoint-override #_#_:region "us-east-1"})]
s3-cli (aws/client {:api :s3 :endpoint-override endpoint-override})
dynamodb-cli (aws/client {:api :dynamodb :endpoint-override endpoint-override})]
(util/aws-invoke s3-cli {:op :CreateBucket :request {:Bucket s3-bucket}})
(util/aws-invoke dynamodb-cli {:op :CreateTable :request {:TableName dynamodb-table
:AttributeDefinitions [{:AttributeName "partkey"
Expand Down

0 comments on commit d1005aa

Please sign in to comment.