Skip to content

Commit

Permalink
Merge pull request #9 from rfhayashi/snapshot-v2
Browse files Browse the repository at this point in the history
Add support for writting and reading snapshots using streams (Klaus will merge)
  • Loading branch information
klauswuestefeld authored Sep 5, 2024
2 parents f187f11 + 5891dbd commit 209236d
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 22 deletions.
4 changes: 3 additions & 1 deletion deps.edn
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
com.cognitect.aws/endpoints {:mvn/version "1.1.12.230"}
com.cognitect.aws/dynamodb {:mvn/version "821.2.1107.0"}
com.cognitect.aws/s3 {:mvn/version "822.2.1145.0"}
base64-clj/base64-clj {:mvn/version "0.1.1"}}
base64-clj/base64-clj {:mvn/version "0.1.1"}
com.amazonaws/aws-java-sdk-core {:mvn/version "1.12.388"}
com.amazonaws/aws-java-sdk-s3 {:mvn/version "1.12.388"}}
:aliases {:test {:extra-paths ["test"]
:extra-deps {clj-test-containers/clj-test-containers {:mvn/version "0.7.2"}
org.clojure/test.check {:mvn/version "1.1.1"}
Expand Down
56 changes: 44 additions & 12 deletions src/prevayler_clj_aws/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
[prevayler-clj-aws.util :as util])
(:import
[clojure.lang IDeref]
[java.io ByteArrayOutputStream Closeable]))
[java.io ByteArrayOutputStream Closeable]
[com.amazonaws.services.s3.model GetObjectRequest PutObjectRequest ObjectMetadata]
[com.amazonaws.services.s3 AmazonS3ClientBuilder]))

(defn- marshal [value]
(-> (nippy/freeze value)
Expand All @@ -29,20 +31,48 @@
:Contents
(some #(= snapshot-path (:Key %)))))

(defn- read-snapshot [s3-cli bucket snapshot-path]
(defn- snapshot-v2-path [snapshot-path]
(str snapshot-path "-v2"))

(defn- unmarshal-from-in [in]
(-> in
java.io.BufferedInputStream.
java.io.DataInputStream.
nippy/thaw-from-in!))

(defn- read-object [s3-sdk-cli bucket path unmarshal-fn]
(-> (.getObject s3-sdk-cli (GetObjectRequest. bucket path))
(.getObjectContent)
unmarshal-fn))

(defn- read-snapshot [s3-cli s3-sdk-cli bucket snapshot-path]
(if (snapshot-exists? s3-cli bucket snapshot-path)
(-> (util/aws-invoke s3-cli {:op :GetObject
:request {:Bucket bucket
:Key snapshot-path}})
:Body
unmarshal)
(let [v2-path (snapshot-v2-path snapshot-path)
snap1 (read-object s3-sdk-cli bucket snapshot-path unmarshal)]
(try
(when (snapshot-exists? s3-cli bucket v2-path)
(let [snap2 (read-object s3-sdk-cli bucket v2-path unmarshal-from-in)]
(println "Snapshot v1" (if (= snap1 snap2) "IS" "IS NOT") "equal to v2")))
(catch Exception e
(.printStackTrace e)))
snap1)
{:partkey 0}))

(defn- save-snapshot! [s3-cli bucket snapshot-path snapshot]
(defn- save-snapshot! [s3-cli s3-sdk-cli bucket snapshot-path snapshot]
(util/aws-invoke s3-cli {:op :PutObject
:request {:Bucket bucket
:Key snapshot-path
:Body (marshal snapshot)}}))
:Body (marshal snapshot)}})
(try
(let [v2-path (snapshot-v2-path snapshot-path)
temp-file (java.io.File/createTempFile "snapshot" "")]
(with-open [temp-out (-> (java.io.FileOutputStream. temp-file) java.io.BufferedOutputStream. java.io.DataOutputStream.)]
(nippy/freeze-to-out! temp-out snapshot))
(with-open [temp-in (java.io.BufferedInputStream. (java.io.FileInputStream. temp-file))]
(.putObject s3-sdk-cli (PutObjectRequest. bucket v2-path temp-in (doto (ObjectMetadata.)
(.setContentLength (.length temp-file)))))))
(catch Exception e
(.printStackTrace e))))

(defn- read-items [dynamo-cli table partkey page-size]
(letfn [(read-page [exclusive-start-key]
Expand Down Expand Up @@ -104,13 +134,15 @@
[{:keys [initial-state business-fn timestamp-fn aws-opts]
:or {initial-state {}
timestamp-fn #(System/currentTimeMillis)}}]
(let [{:keys [dynamodb-client s3-client dynamodb-table snapshot-path s3-bucket page-size]
(let [{:keys [dynamodb-client s3-client s3-sdk-cli dynamodb-table snapshot-path s3-bucket page-size]
:or {dynamodb-client (aws/client {:api :dynamodb})
s3-client (aws/client {:api :s3})
s3-sdk-cli (-> (AmazonS3ClientBuilder/standard)
(.build))
snapshot-path "snapshot"
page-size 1000}} aws-opts
_ (println "Reading snapshot bucket...")
{state :state snapshot-index :partkey} (read-snapshot s3-client s3-bucket snapshot-path)
{state :state snapshot-index :partkey} (read-snapshot s3-client s3-sdk-cli s3-bucket snapshot-path)
_ (println "Reading snapshot bucket done.")
state-atom (atom (or state initial-state))
snapshot-index-atom (atom snapshot-index)]
Expand All @@ -124,7 +156,7 @@
(println "Saving snapshot to bucket...")
; Since s3 update is atomic, if saving snapshot fails next prevayler will pick the previous state
; and restore events from the previous partkey
(save-snapshot! s3-client s3-bucket snapshot-path {:state @state-atom
(save-snapshot! s3-client s3-sdk-cli s3-bucket snapshot-path {:state @state-atom
:partkey (inc @snapshot-index-atom)})
(println "Snapshot done.")
(swap! snapshot-index-atom inc)
Expand Down
40 changes: 31 additions & 9 deletions test/prevayler_clj_aws/core_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@
[com.gfredericks.test.chuck.generators :as genc]
[clojure.test.check.generators :as gen]
[cognitect.aws.client.api :as aws]
[cognitect.aws.credentials :as credentials]
[meta-merge.core :refer [meta-merge]]
[matcher-combinators.test :refer [match?]]))
[matcher-combinators.test :refer [match?]])
(:import [com.amazonaws.services.s3 AmazonS3ClientBuilder]
[com.amazonaws.client.builder AwsClientBuilder$EndpointConfiguration]))

(defonce localstack-port
(memoize
Expand All @@ -26,9 +29,16 @@
(let [s3-bucket (gen-name)
dynamodb-table (gen-name)
hostname (or (System/getenv "LOCALSTACK_HOST") "localhost")
endpoint-override {:protocol "http" :hostname hostname :port (localstack-port)}
s3-cli (aws/client {:api :s3 :endpoint-override endpoint-override})
dynamodb-cli (aws/client {:api :dynamodb :endpoint-override endpoint-override})]
port (localstack-port)
endpoint-override {:protocol "http" :hostname hostname :port port}
endpoint (str "http://" hostname ":" port)
credentials-provider (credentials/basic-credentials-provider {:access-key-id "dumb" :secret-access-key "dumb"})
s3-cli (aws/client {:api :s3 :endpoint-override endpoint-override :region "us-east-1" :credentials-provider credentials-provider})
dynamodb-cli (aws/client {:api :dynamodb :endpoint-override endpoint-override :region "us-east-1" :credentials-provider credentials-provider})
s3-sdk-cli (-> (AmazonS3ClientBuilder/standard)
(.withEndpointConfiguration (AwsClientBuilder$EndpointConfiguration. endpoint "us-east-1"))
(.withPathStyleAccessEnabled true)
(.build))]
(util/aws-invoke s3-cli {:op :CreateBucket :request {:Bucket s3-bucket}})
(util/aws-invoke dynamodb-cli {:op :CreateTable :request {:TableName dynamodb-table
:AttributeDefinitions [{:AttributeName "partkey"
Expand All @@ -43,7 +53,8 @@
(meta-merge {:aws-opts {:s3-bucket s3-bucket
:dynamodb-table dynamodb-table
:s3-client s3-cli
:dynamodb-client dynamodb-cli}}
:dynamodb-client dynamodb-cli
:s3-sdk-cli s3-sdk-cli}}
opts)))

(defn prev!
Expand All @@ -66,15 +77,15 @@
(is (= :timestamp
(prevayler/timestamp prevayler)))))

(testing "snapshot is the default snapshot file name"
(testing "snapshot-v2 is the default snapshot file name"
(let [{{:keys [s3-client s3-bucket]} :aws-opts :as opts} (gen-opts)
_ (prev! opts)]
(is (match? [{:Key "snapshot"}] (list-objects s3-client s3-bucket)))))
(is (match? [{:Key "snapshot"} {:Key "snapshot-v2"}] (list-objects s3-client s3-bucket)))))

(testing "can override snapshot file name"
(let [{{:keys [s3-client s3-bucket]} :aws-opts :as opts} (gen-opts :aws-opts {:snapshot-path "my-path"})
_ (prev! opts)]
(is (match? [{:Key "my-path"}] (list-objects s3-client s3-bucket)))))
(is (match? [{:Key "my-path"} {:Key "my-path-v2"}] (list-objects s3-client s3-bucket)))))

(testing "default initial state is empty map"
(let [prevayler (prev! (gen-opts))]
Expand Down Expand Up @@ -143,4 +154,15 @@
(prevayler/snapshot! prev1)
(prevayler/snapshot! prev1)
(let [prev2 (prev! (assoc opts :business-fn (constantly "rubbish")))]
(is (= ["A" "B" "C" "D"] @prev2))))))
(is (= ["A" "B" "C" "D"] @prev2)))))

(testing "it generates snapshot v2"
(let [{{:keys [s3-client s3-sdk-cli s3-bucket]} :aws-opts :as opts} (gen-opts)
_ (util/aws-invoke s3-client {:op :PutObject
:request {:Bucket s3-bucket
:Key "snapshot"
:Body (#'core/marshal {:partkey 0
:state :state})}})
prev (prev! opts)] ;; saves snapshot-v2
(is (= :state @prev))
(is (= {:state :state :partkey 1} (#'core/read-object s3-sdk-cli s3-bucket "snapshot-v2" #'core/unmarshal-from-in))))))

0 comments on commit 209236d

Please sign in to comment.