Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Protect datalog cache from early cancellation #657

Merged
merged 6 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/src/instant/jdbc/failover.clj
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
(Thread/sleep 2500)
(println "Canceling in-progress transactions"
(count @(:stmts sql/default-statement-tracker)))
(sql/cancel-in-progress @(:stmts sql/default-statement-tracker))
(sql/cancel-in-progress sql/default-statement-tracker)
;; Create a transaction we can use as a proxy for everything syncing over to
;; the new instance
(let [tx (transaction-model/create! aurora/-conn-pool
Expand Down
18 changes: 16 additions & 2 deletions server/src/instant/jdbc/sql.clj
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,22 @@
(when remove (remove rw cancelable)))
:stmts stmts}))

(defn cancel-in-progress [stmts]
(doseq [stmt stmts]
(defn make-top-level-statement-tracker
  "Builds a fresh statement tracker with its own atom of in-progress
   cancelables. It ignores any intermediate trackers: each add/remove is
   recorded locally and then forwarded only to the handlers of the top-level
   `default-statement-tracker` (when those handlers are present)."
  []
  (let [in-flight (atom #{})
        forward-add (:add default-statement-tracker)
        forward-remove (:remove default-statement-tracker)]
    {:stmts in-flight
     :add (fn [rw cancelable]
            ;; Record locally first, then let the default tracker know.
            (swap! in-flight conj cancelable)
            (when forward-add
              (forward-add rw cancelable)))
     :remove (fn [rw cancelable]
               (swap! in-flight disj cancelable)
               (when forward-remove
                 (forward-remove rw cancelable)))}))

(defn cancel-in-progress
  "Calls `cancel` on every statement currently held in the `:stmts` atom of
   the given statement tracker."
  [{:keys [stmts]}]
  (let [snapshot @stmts]
    (doseq [in-flight snapshot]
      (cancel in-flight))))

(defn register-in-progress
Expand Down
4 changes: 1 addition & 3 deletions server/src/instant/reactive/query.clj
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@
"Returns the result of a datalog query. Leverages atom and
delay to ensure queries are only run once in the face of concurrent requests."
[store-conn {:keys [app-id] :as ctx} datalog-query]
(let [delayed-call (delay (d/query ctx datalog-query))
delayed (rs/swap-datalog-cache-delay! store-conn app-id datalog-query delayed-call)]
@delayed))
(rs/swap-datalog-cache! store-conn app-id d/query ctx datalog-query))

(comment
(def ctx {:db {:conn-pool (aurora/conn-pool)}
Expand Down
14 changes: 7 additions & 7 deletions server/src/instant/reactive/session.clj
Original file line number Diff line number Diff line change
Expand Up @@ -476,19 +476,19 @@
debug-info)))
pending-handler {:future event-fut
:op (:op event)
:in-progress-stmts (:stmts in-progress-stmts)
:in-progress-stmts in-progress-stmts
:silence-exceptions silence-exceptions}]
(swap! pending-handlers conj pending-handler)
(tracer/add-data! {:attributes {:concurrent-handler-count (count @pending-handlers)}})
(try
(let [ret (deref event-fut handle-receive-timeout-ms :timeout)]
(when (= :timeout ret)
(let [in-progress @(:stmts in-progress-stmts)
_ (sql/cancel-in-progress in-progress)
(let [in-progress-count (count @(:stmts in-progress-stmts))
_ (sql/cancel-in-progress in-progress-stmts)
cancel-res (future-cancel event-fut)]
(tracer/add-data! {:attributes
{:timedout true
:in-progress-query-count (count in-progress)
:in-progress-query-count in-progress-count
;; If false, then canceling the queries let
;; the future complete before we could cancel it
:future-cancel-result cancel-res}}))
Expand Down Expand Up @@ -663,12 +663,12 @@
future
silence-exceptions
in-progress-stmts]} @pending-handlers
:let [in-progress @in-progress-stmts]]
:let [in-progress-count (count @(:stmts in-progress-stmts))]]
(tracer/with-span! {:name "cancel-pending-handler"
:attributes {:op op
:in-progress-query-count (count in-progress)}}
:in-progress-query-count in-progress-count}}
(silence-exceptions true)
(sql/cancel-in-progress in-progress)
(sql/cancel-in-progress in-progress-stmts)
(future-cancel future)))

(let [app-id (-> (rs/get-auth @store-conn id)
Expand Down
128 changes: 111 additions & 17 deletions server/src/instant/reactive/store.clj
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@
(:require
[clojure.string :as string]
[datascript.core :as d]
[instant.util.coll :as ucoll]
[instant.jdbc.sql :as sql]
[instant.lib.ring.websocket :as ws]
[instant.util.tracer :as tracer]
[instant.util.exception :as ex]))
[instant.util.async :as ua]
[instant.util.coll :as ucoll]
[instant.util.exception :as ex]
[instant.util.tracer :as tracer])
(:import
(java.lang InterruptedException)
(java.util.concurrent CancellationException)))

(declare store-conn)

Expand Down Expand Up @@ -342,23 +347,112 @@
;; ------
;; datalog cache

(defn swap-datalog-cache-delay! [conn app-id datalog-query delayed-call]
;; Returns the result of `(datalog-query-fn ctx datalog-query)`, sharing a single
;; execution among callers that ask for the same [app-id datalog-query].
;; The caller whose state atom ends up stored under :datalog-query/delayed-call
;; starts the query in its own vfuture; every caller then reads the shared promise.
;; When a waiting caller is interrupted/cancelled and no watchers remain, the
;; cancel signal is delivered, which cancels the tracked statements and the work future.
;; A Throwable raised by the query is stored in the promise and rethrown to callers.
(defn swap-datalog-cache! [conn app-id datalog-query-fn ctx datalog-query]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤯 I salute you sir.

(let [lookup-ref [:datalog-query/app-id+query [app-id datalog-query]]

watcher-id (Object.)
this-result-delay (atom { ;; Promise holds the result of the query
:promise (promise)
;; Watchers keep track of who started listening
;; while the query was running, so that we can
;; safely cancel the query if all listeners cancel
:watchers #{watcher-id}
:cancel-signal (promise)
:aborted? false})
{:keys [db-after]}
(transact! "store/swap-datalog-cache-delay!"
(transact! "store/swap-datalog-cache!"
conn
[[:db.fn/call (fn [db]
(if-let [existing (d/entity db lookup-ref)]
(when-not (:datalog-query/delayed-call existing)
[[:db/add
(:db/id existing)
:datalog-query/delayed-call delayed-call]])
[{:datalog-query/app-id app-id
:datalog-query/query datalog-query
:datalog-query/delayed-call delayed-call}]))]])]

(:datalog-query/delayed-call (d/entity db-after lookup-ref))))
[[:db.fn/call
(fn [db]
(if-let [existing (d/entity db lookup-ref)]
;; Entity exists but has no state atom yet: install ours.
(if (not (:datalog-query/delayed-call existing))
[[:db/add
(:db/id existing)
:datalog-query/delayed-call this-result-delay]]
;; Otherwise try to join the existing run as a watcher, unless it was aborted.
(let [{:keys [watchers]}
(swap! (:datalog-query/delayed-call existing)
(fn [state]
(if (:aborted? state)
state
(update state :watchers conj watcher-id))))]
;; Our id is missing only when the existing run was aborted: replace it with ours.
(when-not (contains? watchers watcher-id)
[[:db/add
(:db/id existing)
:datalog-query/delayed-call this-result-delay]])))
;; No entity for this query yet: create it with our state atom.
[{:datalog-query/app-id app-id
:datalog-query/query datalog-query
:datalog-query/delayed-call this-result-delay}]))]])
result-delay (:datalog-query/delayed-call (d/entity db-after lookup-ref))
;; Blocks on the shared promise, then returns the value or rethrows the stored Throwable.
unwrap-result (fn []
(let [res @(:promise @result-delay)]
;; NOTE(review): this assert also fires when the query fn legitimately
;; returns nil or false — confirm datalog-query-fn never does.
(assert (:result res) "Missing result")
(if (:ok res)
(:result res)
(throw (:result res)))))

;; Delivers true on the cancel signal that the cancel future (started by the
;; executing caller below) is waiting on.
cancel! (fn []
(tracer/with-span! {:name "store/datalog-query-cancel!"}
(deliver (:cancel-signal @result-delay) true)))]

(tracer/add-data! {:attributes {:cache-hit (not= this-result-delay result-delay)
:realized (realized? (:promise @result-delay))}})

(when (= this-result-delay result-delay)
;; We added it, so we must execute it
(let [stmt-tracker (sql/make-top-level-statement-tracker)
result-promise (:promise @result-delay)
work-fut (binding [ua/*child-vfutures* nil ;; Move future to a new "call-stack"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Man, bind + statement-tracker makes for a really powerful abstraction.

;; Don't let our statements get canceled
sql/*in-progress-stmts* stmt-tracker]
(ua/vfuture
(try
(deliver result-promise
{:ok true
:result (datalog-query-fn ctx
datalog-query)})
(catch Throwable t
(deliver result-promise
{:ok false
:result t}))
(finally
;; noop if we already delivered
(deliver result-promise
{:ok false
:result
(Exception. "Did not deliver promise!")})
;; Delivering false unblocks the cancel watcher below without cancelling anything.
(deliver (:cancel-signal @result-delay)
false)))))
;; Blocks on the cancel signal; on true, cancels the tracked statements and then the work future.
_cancel-fut (binding [ua/*child-vfutures* nil]
(ua/vfuture
(when @(:cancel-signal @result-delay)
(sql/cancel-in-progress stmt-tracker)
(future-cancel work-fut))))]))
(try
(if (realized? (:promise @result-delay))
;; The work is already done, so we don't need to listen for cancellation
(unwrap-result)
;; Start a tracked future to watch for cancellation
(let [wait-fut
(ua/vfuture
(try
(unwrap-result)
(catch Throwable t
;; Only treat this as an abandoned watcher when the result was not yet
;; delivered and the throwable is an interrupt/cancellation.
(when (and (not (realized? (:promise @result-delay)))
(or (instance? InterruptedException t)
(instance? CancellationException t)))
(let [{:keys [aborted?]}
(swap! result-delay
(fn [{:keys [watchers] :as state}]
(let [new-watchers (disj watchers watcher-id)]
(cond-> state
true (assoc :watchers new-watchers)
(empty? new-watchers) (assoc :aborted? true)))))]

;; No watchers remain, so signal the underlying query to stop.
(when aborted?
(cancel!))))
(throw t))))]
@wait-fut))
(finally
;; Always drop our watcher id, whether we returned or threw.
(swap! result-delay update :watchers disj watcher-id)))))

;; --------------
;; datalog loader
Expand Down
2 changes: 1 addition & 1 deletion server/src/instant/util/logging_exporter.clj
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@
"handle-refresh/send-event!"
"store/record-datalog-query-finish!"
"store/record-datalog-query-start!"
"store/swap-datalog-cache-delay!"
"store/swap-datalog-cache!"
"store/bump-instaql-version!"
"store/add-instaql-query!") true

Expand Down
2 changes: 1 addition & 1 deletion server/test/instant/jdbc/sql_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
1000)
(is (= 1 (count @(:stmts in-progress))))
(is (not (future-done? query)))
(sql/cancel-in-progress @(:stmts in-progress))
(sql/cancel-in-progress in-progress)
(wait-for (fn []
(future-done? query))
1000)
Expand Down
109 changes: 107 additions & 2 deletions server/test/instant/reactive/store_test.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
(ns instant.reactive.store-test
(:require
[clojure.test :as test :refer [deftest is]]
[instant.reactive.store :as rs]))
[clojure.test :as test :refer [deftest is testing]]
[instant.reactive.store :as rs]
[instant.util.async :as ua]))

(deftest match-topic?
(is (true?
Expand All @@ -21,5 +22,109 @@
'[:eav #{3} _ _]
'[:eav #{1} #{2} #{3}]))))

;; Exercises rs/swap-datalog-cache!: cached results, cancellation with and
;; without remaining listeners, and propagation of query failures.
(deftest swap-datalog-cache!
(let [store (rs/init-store)
app-id (random-uuid)]
(testing "store returns cached data"
(let [q [[:ea (random-uuid)]]]
(is (= :a (rs/swap-datalog-cache! store
app-id
(fn [_ctx _query]
:a)
nil
q)))
;; Same query with a fn that would return :b; the test expects the first result, :a.
(is (= :a (rs/swap-datalog-cache! store
app-id
(fn [_ctx _query]
:b)
nil
q)))))

(testing "store returns cached data with delay"
(let [q [[:ea (random-uuid)]]]
(is (= :a (rs/swap-datalog-cache! store
app-id
(fn [_ctx _query]
(Thread/sleep 100)
:a)
nil
q)))
(is (= :a (rs/swap-datalog-cache! store
app-id
(fn [_ctx _query]
:b)
nil
q)))))

(testing "work is canceled with no listeners"
(let [q [[:ea (random-uuid)]]
err (promise)
started (promise)
f1 (ua/vfuture (rs/swap-datalog-cache! store
app-id
(fn [_ctx _query]
(try
(deliver started true)
;; Never-delivered promise: the query fn can only leave this deref via an exception.
@(promise)
(catch Throwable t
(deliver err t))))
nil
q))]
;; Wait until the query fn is running before cancelling its only listener.
@started
(future-cancel f1)
(is (instance? java.lang.InterruptedException (deref err 100 :timeout)))))

Check failure on line 75 in server/test/instant/reactive/store_test.clj

View workflow job for this annotation

GitHub Actions / clj-check

Test failure

Check failure on line 75 in server/test/instant/reactive/store_test.clj

View workflow job for this annotation

GitHub Actions / clj-check

Test failure
stopachka marked this conversation as resolved.
Show resolved Hide resolved

;; Repeated 100 times — presumably to surface timing-dependent failures; TODO confirm.
(dotimes [x 100]
(testing "work isn't canceled if there are still listeners"
(let [q [[:ea (random-uuid)]]
err (promise)
started (promise)
wait (promise)
f1 (ua/vfuture (try (rs/swap-datalog-cache! store
app-id
(fn [_ctx _query]
(deliver started true)
;; It looks like the promise is getting canceled somehow?
@wait)
nil
q)
(catch Throwable t
(deliver err t))))
_ @started
f2 (ua/vfuture (rs/swap-datalog-cache! store
app-id
(fn [_ctx _query]
@wait)
nil
q))]
;; NOTE(review): f2 runs asynchronously and nothing here waits for it to register
;; before f1 is cancelled; f2's own fn also returns the delivered :a, so the final
;; assertion can pass on either path — confirm this is intended.
(future-cancel f1)
(is (or (instance? java.lang.InterruptedException (deref err 100 :timeout))
(instance? java.util.concurrent.CancellationException (deref err 100 :timeout))))
(Thread/sleep 10)
(deliver wait :a)
(is (= (deref f2 100 :timeout) :a)))))

(testing "propagates failures"
(let [q [[:ea (random-uuid)]]
r1 (try (rs/swap-datalog-cache! store
app-id
(fn [_ctx _query]
(throw (Exception. "oops")))
nil
q)
(catch Exception e
e))]

(is (instance? Exception
r1))

;; Second call for the same query is expected to throw as well rather than return this fn's value.
(is (thrown? Exception
(rs/swap-datalog-cache! store
app-id
(fn [_ctx _query]
:shouldn't-be-executed)
nil
q)))))))

(comment
(test/run-tests *ns*))
Loading