Skip to content

Commit

Permalink
Protect datalog cache from early cancellation (#657)
Browse files Browse the repository at this point in the history
  • Loading branch information
dwwoelfel authored Jan 2, 2025
1 parent 2a70a2d commit 6f24813
Show file tree
Hide file tree
Showing 8 changed files with 243 additions and 34 deletions.
2 changes: 1 addition & 1 deletion server/src/instant/jdbc/failover.clj
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
(Thread/sleep 2500)
(println "Canceling in-progress transactions"
(count @(:stmts sql/default-statement-tracker)))
(sql/cancel-in-progress @(:stmts sql/default-statement-tracker))
(sql/cancel-in-progress sql/default-statement-tracker)
;; Create a transaction we can use as a proxy for everything syncing over to
;; the new instance
(let [tx (transaction-model/create! aurora/-conn-pool
Expand Down
18 changes: 16 additions & 2 deletions server/src/instant/jdbc/sql.clj
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,22 @@
(when remove (remove rw cancelable)))
:stmts stmts}))

(defn cancel-in-progress [stmts]
(doseq [stmt stmts]
(defn make-top-level-statement-tracker
  "Builds a fresh statement tracker that is chained directly to
   `default-statement-tracker`, bypassing any intermediate tracker.

   Returns a map with:
     :add    - fn of [rw cancelable]; records `cancelable` in this tracker's
               set, then forwards to the default tracker's :add (if present).
     :remove - fn of [rw cancelable]; drops `cancelable` from this tracker's
               set, then forwards to the default tracker's :remove (if present).
     :stmts  - atom holding the set of cancelables currently tracked here."
  []
  (let [tracked (atom #{})
        parent-add (:add default-statement-tracker)
        parent-remove (:remove default-statement-tracker)]
    {:add (fn [rw cancelable]
            (swap! tracked conj cancelable)
            (when parent-add
              (parent-add rw cancelable)))
     :remove (fn [rw cancelable]
               (swap! tracked disj cancelable)
               (when parent-remove
                 (parent-remove rw cancelable)))
     :stmts tracked}))

(defn cancel-in-progress
  "Calls `cancel` on every statement currently held in the tracker's
   `:stmts` atom. Takes the tracker map itself (not the dereferenced set).
   Returns nil."
  [{:keys [stmts]}]
  ;; Snapshot the set once, then cancel each entry for its side effect.
  (run! cancel @stmts))

(defn register-in-progress
Expand Down
4 changes: 1 addition & 3 deletions server/src/instant/reactive/query.clj
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@
"Returns the result of a datalog query. Leverages atom and
delay to ensure queries are only run once in the face of concurrent requests."
[store-conn {:keys [app-id] :as ctx} datalog-query]
(let [delayed-call (delay (d/query ctx datalog-query))
delayed (rs/swap-datalog-cache-delay! store-conn app-id datalog-query delayed-call)]
@delayed))
(rs/swap-datalog-cache! store-conn app-id d/query ctx datalog-query))

(comment
(def ctx {:db {:conn-pool (aurora/conn-pool)}
Expand Down
14 changes: 7 additions & 7 deletions server/src/instant/reactive/session.clj
Original file line number Diff line number Diff line change
Expand Up @@ -476,19 +476,19 @@
debug-info)))
pending-handler {:future event-fut
:op (:op event)
:in-progress-stmts (:stmts in-progress-stmts)
:in-progress-stmts in-progress-stmts
:silence-exceptions silence-exceptions}]
(swap! pending-handlers conj pending-handler)
(tracer/add-data! {:attributes {:concurrent-handler-count (count @pending-handlers)}})
(try
(let [ret (deref event-fut handle-receive-timeout-ms :timeout)]
(when (= :timeout ret)
(let [in-progress @(:stmts in-progress-stmts)
_ (sql/cancel-in-progress in-progress)
(let [in-progress-count (count @(:stmts in-progress-stmts))
_ (sql/cancel-in-progress in-progress-stmts)
cancel-res (future-cancel event-fut)]
(tracer/add-data! {:attributes
{:timedout true
:in-progress-query-count (count in-progress)
:in-progress-query-count in-progress-count
;; If false, then canceling the queries let
;; the future complete before we could cancel it
:future-cancel-result cancel-res}}))
Expand Down Expand Up @@ -663,12 +663,12 @@
future
silence-exceptions
in-progress-stmts]} @pending-handlers
:let [in-progress @in-progress-stmts]]
:let [in-progress-count (count @(:stmts in-progress-stmts))]]
(tracer/with-span! {:name "cancel-pending-handler"
:attributes {:op op
:in-progress-query-count (count in-progress)}}
:in-progress-query-count in-progress-count}}
(silence-exceptions true)
(sql/cancel-in-progress in-progress)
(sql/cancel-in-progress in-progress-stmts)
(future-cancel future)))

(let [app-id (-> (rs/get-auth @store-conn id)
Expand Down
126 changes: 109 additions & 17 deletions server/src/instant/reactive/store.clj
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@
(:require
[clojure.string :as string]
[datascript.core :as d]
[instant.util.coll :as ucoll]
[instant.jdbc.sql :as sql]
[instant.lib.ring.websocket :as ws]
[instant.util.tracer :as tracer]
[instant.util.exception :as ex]))
[instant.util.async :as ua]
[instant.util.coll :as ucoll]
[instant.util.exception :as ex]
[instant.util.tracer :as tracer])
(:import
(java.lang InterruptedException)
(java.util.concurrent CancellationException)))

(declare store-conn)

Expand Down Expand Up @@ -342,23 +347,110 @@
;; ------
;; datalog cache

(defn swap-datalog-cache-delay! [conn app-id datalog-query delayed-call]
(defn swap-datalog-cache!
  "Returns the result of `(datalog-query-fn ctx datalog-query)`, sharing a
   single execution between concurrent callers that ask for the same
   `[app-id datalog-query]` pair.

   The cache entry stored under `:datalog-query/delayed-call` is an atom with:
     :promise       - delivered with {:ok bool :result value-or-throwable}
     :watchers      - set of ids of callers currently waiting on the promise
     :cancel-signal - promise; `true` asks for the work to be canceled,
                      `false` means the work finished on its own
     :aborted?      - true once the last watcher gave up before a result

   The caller that installs the entry starts the work in a separate vfuture
   that is bound to its own top-level statement tracker. Other callers
   register as watchers on the existing entry, unless that entry is already
   aborted, in which case they install a fresh entry of their own.

   If the calling thread is interrupted or canceled before the result is
   realized, it removes itself from the watchers; when it was the last
   watcher, the entry is marked aborted and the work is canceled.

   Returns the query result, or throws the Throwable the query produced."
  [conn app-id datalog-query-fn ctx datalog-query]
  (let [lookup-ref [:datalog-query/app-id+query [app-id datalog-query]]

        watcher-id (Object.)
        this-result-delay (atom {;; Promise holds the result of the query
                                 :promise (promise)
                                 ;; Watchers keep track of who started listening
                                 ;; while the query was running, so that we can
                                 ;; safely cancel the query if all listeners cancel
                                 :watchers #{watcher-id}
                                 :cancel-signal (promise)
                                 :aborted? false})
        {:keys [db-after]}
        (transact! "store/swap-datalog-cache!"
                   conn
                   [[:db.fn/call
                     (fn [db]
                       (if-let [existing (d/entity db lookup-ref)]
                         (if (not (:datalog-query/delayed-call existing))
                           [[:db/add
                             (:db/id existing)
                             :datalog-query/delayed-call this-result-delay]]
                           ;; Try to join the existing entry as a watcher. An
                           ;; aborted entry is left untouched, so our id will
                           ;; be missing from the returned watchers.
                           (let [{:keys [watchers]}
                                 (swap! (:datalog-query/delayed-call existing)
                                        (fn [state]
                                          (if (:aborted? state)
                                            state
                                            (update state :watchers conj watcher-id))))]
                             ;; We could not join (entry aborted), so replace
                             ;; it with our own entry.
                             (when-not (contains? watchers watcher-id)
                               [[:db/add
                                 (:db/id existing)
                                 :datalog-query/delayed-call this-result-delay]])))
                         [{:datalog-query/app-id app-id
                           :datalog-query/query datalog-query
                           :datalog-query/delayed-call this-result-delay}]))]])
        result-delay (:datalog-query/delayed-call (d/entity db-after lookup-ref))
        unwrap-result (fn []
                        (let [res @(:promise @result-delay)]
                          ;; Check for the key rather than a truthy value so
                          ;; that a legitimate nil/false query result is not
                          ;; reported as missing.
                          (assert (contains? res :result) "Missing result")
                          (if (:ok res)
                            (:result res)
                            (throw (:result res)))))

        cancel! (fn []
                  (tracer/with-span! {:name "store/datalog-query-cancel!"}
                    (deliver (:cancel-signal @result-delay) true)))]

    (tracer/add-data! {:attributes {:cache-hit (not= this-result-delay result-delay)
                                    :realized (realized? (:promise @result-delay))}})

    (when (= this-result-delay result-delay)
      ;; We added it, so we must execute it
      (let [stmt-tracker (sql/make-top-level-statement-tracker)
            result-promise (:promise @result-delay)
            work-fut (binding [ua/*child-vfutures* nil ;; Move future to a new "call-stack"
                               ;; Don't let our statements get canceled
                               sql/*in-progress-stmts* stmt-tracker]
                       (ua/vfuture
                         (try
                           (deliver result-promise
                                    {:ok true
                                     :result (datalog-query-fn ctx
                                                               datalog-query)})
                           (catch Throwable t
                             (deliver result-promise
                                      {:ok false
                                       :result t}))
                           (finally
                             ;; noop if we already delivered
                             (deliver result-promise
                                      {:ok false
                                       :result
                                       (Exception. "Did not deliver promise!")})
                             ;; Release the cancel watcher below; a no-op if
                             ;; cancellation was already requested.
                             (deliver (:cancel-signal @result-delay)
                                      false)))))
            ;; Waits for the cancel signal; only a `true` signal cancels the
            ;; in-flight statements and the work future.
            _cancel-fut (binding [ua/*child-vfutures* nil]
                          (ua/vfuture
                            (when @(:cancel-signal @result-delay)
                              (sql/cancel-in-progress stmt-tracker)
                              (future-cancel work-fut))))]))
    (try
      (if (realized? (:promise @result-delay))
        ;; The work is already done, so we don't need to listen for cancellation
        (unwrap-result)
        ;; Start a tracked future to watch for cancelation
        (let [wait-fut (ua/vfuture (unwrap-result))]
          (try
            @wait-fut
            (catch Throwable t
              (when (and (not (realized? (:promise @result-delay)))
                         (or (instance? InterruptedException t)
                             (instance? CancellationException t)))
                ;; Stop watching; if nobody else is watching, mark the entry
                ;; aborted so later callers start a fresh query.
                (let [{:keys [aborted?]}
                      (swap! result-delay
                             (fn [{:keys [watchers] :as state}]
                               (let [new-watchers (disj watchers watcher-id)]
                                 (cond-> state
                                   true (assoc :watchers new-watchers)
                                   (empty? new-watchers) (assoc :aborted? true)))))]

                  (when aborted?
                    (cancel!))))
              (throw t)))))
      (finally
        (swap! result-delay update :watchers disj watcher-id)))))

;; --------------
;; datalog loader
Expand Down
2 changes: 1 addition & 1 deletion server/src/instant/util/logging_exporter.clj
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@
"handle-refresh/send-event!"
"store/record-datalog-query-finish!"
"store/record-datalog-query-start!"
"store/swap-datalog-cache-delay!"
"store/swap-datalog-cache!"
"store/bump-instaql-version!"
"store/add-instaql-query!") true

Expand Down
2 changes: 1 addition & 1 deletion server/test/instant/jdbc/sql_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
1000)
(is (= 1 (count @(:stmts in-progress))))
(is (not (future-done? query)))
(sql/cancel-in-progress @(:stmts in-progress))
(sql/cancel-in-progress in-progress)
(wait-for (fn []
(future-done? query))
1000)
Expand Down
109 changes: 107 additions & 2 deletions server/test/instant/reactive/store_test.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
(ns instant.reactive.store-test
(:require
[clojure.test :as test :refer [deftest is]]
[instant.reactive.store :as rs]))
[clojure.test :as test :refer [deftest is testing]]
[instant.reactive.store :as rs]
[instant.util.async :as ua]))

(deftest match-topic?
(is (true?
Expand All @@ -21,5 +22,109 @@
'[:eav #{3} _ _]
'[:eav #{1} #{2} #{3}]))))

(deftest swap-datalog-cache!
  (let [store (rs/init-store)
        app-id (random-uuid)
        ;; Each scenario gets its own query so cache entries never collide.
        fresh-query (fn [] [[:ea (random-uuid)]])
        ;; Runs `query-fn` through the datalog cache with a nil ctx.
        cached (fn [q query-fn]
                 (rs/swap-datalog-cache! store app-id query-fn nil q))]

    (testing "store returns cached data"
      (let [q (fresh-query)]
        (is (= :a (cached q (fn [_ctx _query] :a))))
        ;; Second call must hit the cache, so :b is never produced.
        (is (= :a (cached q (fn [_ctx _query] :b))))))

    (testing "store returns cached data with delay"
      (let [q (fresh-query)]
        (is (= :a (cached q (fn [_ctx _query]
                              (Thread/sleep 100)
                              :a))))
        (is (= :a (cached q (fn [_ctx _query] :b))))))

    (testing "work is canceled with no listeners"
      (let [q (fresh-query)
            err (promise)
            started (promise)
            ;; Blocks forever unless the worker thread is interrupted, and
            ;; reports whatever interrupted it through `err`.
            blocking-query (fn [_ctx _query]
                             (try
                               (deliver started true)
                               @(promise)
                               (catch Throwable t
                                 (deliver err t))))
            f1 (ua/vfuture (cached q blocking-query))]
        @started
        (future-cancel f1)
        (is (instance? java.lang.InterruptedException (deref err 100 :timeout)))))

    ;; Repeated to shake out races between the canceling and joining callers.
    (doseq [_ (range 100)]
      (testing "work isn't canceled if there are still listeners"
        (let [q (fresh-query)
              err (promise)
              started (promise)
              wait (promise)
              f1 (ua/vfuture
                   (try
                     (cached q (fn [_ctx _query]
                                 (deliver started true)
                                 ;; Held open until the test delivers `wait`.
                                 @wait))
                     (catch Throwable t
                       (deliver err t))))
              _ @started
              ;; Second listener on the same query, started while the work
              ;; is still in flight.
              f2 (ua/vfuture (cached q (fn [_ctx _query]
                                         @wait)))]
          (future-cancel f1)
          (is (or (instance? java.lang.InterruptedException (deref err 100 :timeout))
                  (instance? java.util.concurrent.CancellationException (deref err 100 :timeout))))
          (Thread/sleep 10)
          (deliver wait :a)
          (is (= (deref f2 100 :timeout) :a)))))

    (testing "propagates failures"
      (let [q (fresh-query)
            r1 (try
                 (cached q (fn [_ctx _query]
                             (throw (Exception. "oops"))))
                 (catch Exception e
                   e))]

        (is (instance? Exception
                       r1))

        ;; The failure is cached too: the second query fn must not run.
        (is (thrown? Exception
                     (cached q (fn [_ctx _query]
                                 :shouldn't-be-executed))))))))

(comment
(test/run-tests *ns*))

0 comments on commit 6f24813

Please sign in to comment.