Skip to content

Commit

Permalink
Merge pull request #11 from nubank/snapshot-streams2
Browse files Browse the repository at this point in the history
Snapshot streams2
  • Loading branch information
NodariAtNubank authored Nov 7, 2024
2 parents 17fffe8 + 8bcf26f commit 95f07b2
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 33 deletions.
26 changes: 7 additions & 19 deletions src/prevayler_clj_aws/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -46,24 +46,12 @@
unmarshal-fn))

(defn- read-snapshot [s3-cli s3-sdk-cli bucket snapshot-path]
(if (snapshot-exists? s3-cli bucket snapshot-path)
(let [v2-path (snapshot-v2-path snapshot-path)
snap1 (read-object s3-sdk-cli bucket snapshot-path unmarshal)]
(try
(if (snapshot-exists? s3-cli bucket v2-path)
(let [snap2 (read-object s3-sdk-cli bucket v2-path unmarshal-from-in)]
(println "Snapshot v1" (if (= snap1 snap2) "IS" "IS NOT") "equal to v2"))
(println v2-path "object not found in bucket."))
(catch Exception e
(.printStackTrace e)))
snap1)
{:partkey 0}))

(defn- save-snapshot! [s3-cli s3-sdk-cli bucket snapshot-path snapshot]
(util/aws-invoke s3-cli {:op :PutObject
:request {:Bucket bucket
:Key snapshot-path
:Body (marshal snapshot)}})
(let [v2-path (snapshot-v2-path snapshot-path)]
(if (snapshot-exists? s3-cli bucket v2-path)
(read-object s3-sdk-cli bucket v2-path unmarshal-from-in)
{:partkey 0})))

(defn- save-snapshot! [s3-sdk-cli bucket snapshot-path snapshot]
(try
(let [v2-path (snapshot-v2-path snapshot-path)
temp-file (java.io.File/createTempFile "snapshot" "")] ; We use an intermediary file to easily determine the length of the stream. Otherwise, to determine its length, Amazon's SDK would buffer the entire stream in RAM, defeating our purpose.
Expand Down Expand Up @@ -160,7 +148,7 @@
(println "Saving snapshot to bucket...")
; Since s3 update is atomic, if saving snapshot fails next prevayler will pick the previous state
; and restore events from the previous partkey
(save-snapshot! s3-client s3-sdk-cli s3-bucket snapshot-path {:state @state-atom
(save-snapshot! s3-sdk-cli s3-bucket snapshot-path {:state @state-atom
:partkey (inc @snapshot-index-atom)})
(println "Snapshot done.")
(swap! snapshot-index-atom inc)
Expand Down
17 changes: 3 additions & 14 deletions test/prevayler_clj_aws/core_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,12 @@
(testing "snapshot-v2 is the default snapshot file name"
(let [{{:keys [s3-client s3-bucket]} :aws-opts :as opts} (gen-opts)
_ (prev! opts)]
(is (match? [{:Key "snapshot"} {:Key "snapshot-v2"}] (list-objects s3-client s3-bucket)))))
(is (match? [{:Key "snapshot-v2"}] (list-objects s3-client s3-bucket)))))

(testing "can override snapshot file name"
(let [{{:keys [s3-client s3-bucket]} :aws-opts :as opts} (gen-opts :aws-opts {:snapshot-path "my-path"})
_ (prev! opts)]
(is (match? [{:Key "my-path"} {:Key "my-path-v2"}] (list-objects s3-client s3-bucket)))))
(is (match? [{:Key "my-path-v2"}] (list-objects s3-client s3-bucket)))))

(testing "default initial state is empty map"
(let [prevayler (prev! (gen-opts))]
Expand Down Expand Up @@ -154,15 +154,4 @@
(prevayler/snapshot! prev1)
(prevayler/snapshot! prev1)
(let [prev2 (prev! (assoc opts :business-fn (constantly "rubbish")))]
(is (= ["A" "B" "C" "D"] @prev2)))))

(testing "it generates snapshot v2"
(let [{{:keys [s3-client s3-sdk-cli s3-bucket]} :aws-opts :as opts} (gen-opts)
_ (util/aws-invoke s3-client {:op :PutObject
:request {:Bucket s3-bucket
:Key "snapshot"
:Body (#'core/marshal {:partkey 0
:state :state})}})
prev (prev! opts)] ;; saves snapshot-v2
(is (= :state @prev))
(is (= {:state :state :partkey 1} (#'core/read-object s3-sdk-cli s3-bucket "snapshot-v2" #'core/unmarshal-from-in))))))
(is (= ["A" "B" "C" "D"] @prev2))))))

0 comments on commit 95f07b2

Please sign in to comment.