diff --git a/server/src/instant/jdbc/failover.clj b/server/src/instant/jdbc/failover.clj index b2d3b4c2..24e10062 100644 --- a/server/src/instant/jdbc/failover.clj +++ b/server/src/instant/jdbc/failover.clj @@ -41,7 +41,7 @@ (Thread/sleep 2500) (println "Canceling in-progress transactions" (count @(:stmts sql/default-statement-tracker))) - (sql/cancel-in-progress @(:stmts sql/default-statement-tracker)) + (sql/cancel-in-progress sql/default-statement-tracker) ;; Create a transaction we can use as a proxy for everything syncing over to ;; the new instance (let [tx (transaction-model/create! aurora/-conn-pool diff --git a/server/src/instant/jdbc/sql.clj b/server/src/instant/jdbc/sql.clj index 53a85dc1..dad3838b 100644 --- a/server/src/instant/jdbc/sql.clj +++ b/server/src/instant/jdbc/sql.clj @@ -167,8 +167,22 @@ (when remove (remove rw cancelable))) :stmts stmts})) -(defn cancel-in-progress [stmts] - (doseq [stmt stmts] +(defn make-top-level-statement-tracker + "Creates a statement tracker that ignores all intermediate trackers, except + for the top-level default tracker." + [] + (let [{:keys [add remove]} default-statement-tracker + stmts (atom #{})] + {:add (fn [rw cancelable] + (swap! stmts conj cancelable) + (when add (add rw cancelable))) + :remove (fn [rw cancelable] + (swap! stmts disj cancelable) + (when remove (remove rw cancelable))) + :stmts stmts})) + +(defn cancel-in-progress [{:keys [stmts]}] + (doseq [stmt @stmts] (cancel stmt))) (defn register-in-progress diff --git a/server/src/instant/reactive/query.clj b/server/src/instant/reactive/query.clj index 9529886f..d316e24b 100644 --- a/server/src/instant/reactive/query.clj +++ b/server/src/instant/reactive/query.clj @@ -19,9 +19,7 @@ "Returns the result of a datalog query. Leverages atom and delay to ensure queries are only run once in the face of concurrent requests." [store-conn {:keys [app-id] :as ctx} datalog-query] - (let [delayed-call (delay (d/query ctx datalog-query)) - delayed (rs/swap-datalog-cache-delay! store-conn app-id datalog-query delayed-call)] - @delayed)) + (rs/swap-datalog-cache! store-conn app-id d/query ctx datalog-query)) (comment (def ctx {:db {:conn-pool (aurora/conn-pool)} diff --git a/server/src/instant/reactive/session.clj b/server/src/instant/reactive/session.clj index cd72df7d..d5396545 100644 --- a/server/src/instant/reactive/session.clj +++ b/server/src/instant/reactive/session.clj @@ -476,19 +476,19 @@ debug-info))) pending-handler {:future event-fut :op (:op event) - :in-progress-stmts (:stmts in-progress-stmts) + :in-progress-stmts in-progress-stmts :silence-exceptions silence-exceptions}] (swap! pending-handlers conj pending-handler) (tracer/add-data! {:attributes {:concurrent-handler-count (count @pending-handlers)}}) (try (let [ret (deref event-fut handle-receive-timeout-ms :timeout)] (when (= :timeout ret) - (let [in-progress @(:stmts in-progress-stmts) - _ (sql/cancel-in-progress in-progress) + (let [in-progress-count (count @(:stmts in-progress-stmts)) + _ (sql/cancel-in-progress in-progress-stmts) cancel-res (future-cancel event-fut)] (tracer/add-data! {:attributes {:timedout true - :in-progress-query-count (count in-progress) + :in-progress-query-count in-progress-count ;; If false, then canceling the queries let ;; the future complete before we could cancel it :future-cancel-result cancel-res}})) @@ -663,12 +663,12 @@ future silence-exceptions in-progress-stmts]} @pending-handlers - :let [in-progress @in-progress-stmts]] + :let [in-progress-count (count @(:stmts in-progress-stmts))]] (tracer/with-span! {:name "cancel-pending-handler" :attributes {:op op - :in-progress-query-count (count in-progress)}} + :in-progress-query-count in-progress-count}} (silence-exceptions true) - (sql/cancel-in-progress in-progress) + (sql/cancel-in-progress in-progress-stmts) (future-cancel future))) (let [app-id (-> (rs/get-auth @store-conn id) diff --git a/server/src/instant/reactive/store.clj b/server/src/instant/reactive/store.clj index c97b5077..92f9b470 100644 --- a/server/src/instant/reactive/store.clj +++ b/server/src/instant/reactive/store.clj @@ -17,10 +17,15 @@ (:require [clojure.string :as string] [datascript.core :as d] - [instant.util.coll :as ucoll] + [instant.jdbc.sql :as sql] [instant.lib.ring.websocket :as ws] - [instant.util.tracer :as tracer] - [instant.util.exception :as ex])) + [instant.util.async :as ua] + [instant.util.coll :as ucoll] + [instant.util.exception :as ex] + [instant.util.tracer :as tracer]) + (:import + (java.lang InterruptedException) + (java.util.concurrent CancellationException))) (declare store-conn) @@ -342,23 +347,110 @@ ;; ------ ;; datalog cache -(defn swap-datalog-cache-delay! [conn app-id datalog-query delayed-call] +(defn swap-datalog-cache! [conn app-id datalog-query-fn ctx datalog-query] (let [lookup-ref [:datalog-query/app-id+query [app-id datalog-query]] - + watcher-id (Object.) + this-result-delay (atom { ;; Promise holds the result of the query + :promise (promise) + ;; Watchers keep track of who started listening + ;; while the query was running, so that we can + ;; safely cancel the query if all listeners cancel + :watchers #{watcher-id} + :cancel-signal (promise) + :aborted? false}) {:keys [db-after]} - (transact! "store/swap-datalog-cache-delay!" + (transact! "store/swap-datalog-cache!" conn - [[:db.fn/call (fn [db] - (if-let [existing (d/entity db lookup-ref)] - (when-not (:datalog-query/delayed-call existing) - [[:db/add - (:db/id existing) - :datalog-query/delayed-call delayed-call]]) - [{:datalog-query/app-id app-id - :datalog-query/query datalog-query - :datalog-query/delayed-call delayed-call}]))]])] - - (:datalog-query/delayed-call (d/entity db-after lookup-ref)))) + [[:db.fn/call + (fn [db] + (if-let [existing (d/entity db lookup-ref)] + (if (not (:datalog-query/delayed-call existing)) + [[:db/add + (:db/id existing) + :datalog-query/delayed-call this-result-delay]] + (let [{:keys [watchers]} + (swap! (:datalog-query/delayed-call existing) + (fn [state] + (if (:aborted? state) + state + (update state :watchers conj watcher-id))))] + (when-not (contains? watchers watcher-id) + [[:db/add + (:db/id existing) + :datalog-query/delayed-call this-result-delay]]))) + [{:datalog-query/app-id app-id + :datalog-query/query datalog-query + :datalog-query/delayed-call this-result-delay}]))]]) + result-delay (:datalog-query/delayed-call (d/entity db-after lookup-ref)) + unwrap-result (fn [] + (let [res @(:promise @result-delay)] + (assert (:result res) "Missing result") + (if (:ok res) + (:result res) + (throw (:result res))))) + + cancel! (fn [] + (tracer/with-span! {:name "store/datalog-query-cancel!"} + (deliver (:cancel-signal @result-delay) true)))] + + (tracer/add-data! {:attributes {:cache-hit (not= this-result-delay result-delay) + :realized (realized? (:promise @result-delay))}}) + + (when (= this-result-delay result-delay) + ;; We added it, so we must execute it + (let [stmt-tracker (sql/make-top-level-statement-tracker) + result-promise (:promise @result-delay) + work-fut (binding [ua/*child-vfutures* nil ;; Move future to a new "call-stack" + ;; Don't let our statements get canceled + sql/*in-progress-stmts* stmt-tracker] + (ua/vfuture + (try + (deliver result-promise + {:ok true + :result (datalog-query-fn ctx + datalog-query)}) + (catch Throwable t + (deliver result-promise + {:ok false + :result t})) + (finally + ;; noop if we already delivered + (deliver result-promise + {:ok false + :result + (Exception. "Did not deliver promise!")}) + (deliver (:cancel-signal @result-delay) + false))))) + _cancel-fut (binding [ua/*child-vfutures* nil] + (ua/vfuture + (when @(:cancel-signal @result-delay) + (sql/cancel-in-progress stmt-tracker) + (future-cancel work-fut))))])) + (try + (if (realized? (:promise @result-delay)) + ;; The work is already done, so we don't need to listen for cancellation + (unwrap-result) + ;; Start a tracked future to watch for cancelation + (let [wait-fut (ua/vfuture (unwrap-result))] + (try + @wait-fut + (catch Throwable t + (when (and (not (realized? (:promise @result-delay))) + (or (instance? InterruptedException t) + (instance? CancellationException t))) + (let [{:keys [aborted?]} + (swap! result-delay + (fn [{:keys [watchers] :as state}] + (let [new-watchers (disj watchers watcher-id)] + (cond-> state + true (assoc :watchers new-watchers) + (empty? new-watchers) (assoc :aborted? true)))))] + + (when aborted? + (cancel!)))) + (throw t))))) + (finally + (swap! result-delay update :watchers disj watcher-id))))) ;; -------------- ;; datalog loader diff --git a/server/src/instant/util/logging_exporter.clj b/server/src/instant/util/logging_exporter.clj index 33f05633..a2cf6e80 100644 --- a/server/src/instant/util/logging_exporter.clj +++ b/server/src/instant/util/logging_exporter.clj @@ -125,7 +125,7 @@ "handle-refresh/send-event!" "store/record-datalog-query-finish!" "store/record-datalog-query-start!" - "store/swap-datalog-cache-delay!" + "store/swap-datalog-cache!" "store/bump-instaql-version!" "store/add-instaql-query!") true diff --git a/server/test/instant/jdbc/sql_test.clj b/server/test/instant/jdbc/sql_test.clj index 70808625..644548fe 100644 --- a/server/test/instant/jdbc/sql_test.clj +++ b/server/test/instant/jdbc/sql_test.clj @@ -20,7 +20,7 @@ 1000) (is (= 1 (count @(:stmts in-progress)))) (is (not (future-done? query))) - (sql/cancel-in-progress @(:stmts in-progress)) + (sql/cancel-in-progress in-progress) (wait-for (fn [] (future-done? query)) 1000) diff --git a/server/test/instant/reactive/store_test.clj b/server/test/instant/reactive/store_test.clj index 2966fd15..bfbc67b5 100644 --- a/server/test/instant/reactive/store_test.clj +++ b/server/test/instant/reactive/store_test.clj @@ -1,7 +1,8 @@ (ns instant.reactive.store-test (:require - [clojure.test :as test :refer [deftest is]] - [instant.reactive.store :as rs])) + [clojure.test :as test :refer [deftest is testing]] + [instant.reactive.store :as rs] + [instant.util.async :as ua])) (deftest match-topic? (is (true? @@ -21,5 +22,109 @@ '[:eav #{3} _ _] '[:eav #{1} #{2} #{3}])))) +(deftest swap-datalog-cache! + (let [store (rs/init-store) + app-id (random-uuid)] + (testing "store returns cached data" + (let [q [[:ea (random-uuid)]]] + (is (= :a (rs/swap-datalog-cache! store + app-id + (fn [_ctx _query] + :a) + nil + q))) + (is (= :a (rs/swap-datalog-cache! store + app-id + (fn [_ctx _query] + :b) + nil + q))))) + + (testing "store returns cached data with delay" + (let [q [[:ea (random-uuid)]]] + (is (= :a (rs/swap-datalog-cache! store + app-id + (fn [_ctx _query] + (Thread/sleep 100) + :a) + nil + q))) + (is (= :a (rs/swap-datalog-cache! store + app-id + (fn [_ctx _query] + :b) + nil + q))))) + + (testing "work is canceled with no listeners" + (let [q [[:ea (random-uuid)]] + err (promise) + started (promise) + f1 (ua/vfuture (rs/swap-datalog-cache! store + app-id + (fn [_ctx _query] + (try + (deliver started true) + @(promise) + (catch Throwable t + (deliver err t)))) + nil + q))] + @started + (future-cancel f1) + (is (instance? java.lang.InterruptedException (deref err 100 :timeout))))) + + (dotimes [x 100] + (testing "work isn't canceled if there are still listeners" + (let [q [[:ea (random-uuid)]] + err (promise) + started (promise) + wait (promise) + f1 (ua/vfuture (try (rs/swap-datalog-cache! store + app-id + (fn [_ctx _query] + (deliver started true) + ;; It looks like the promise is getting canceled somehow? + @wait) + nil + q) + (catch Throwable t + (deliver err t)))) + _ @started + f2 (ua/vfuture (rs/swap-datalog-cache! store + app-id + (fn [_ctx _query] + @wait) + nil + q))] + (future-cancel f1) + (is (or (instance? java.lang.InterruptedException (deref err 100 :timeout)) + (instance? java.util.concurrent.CancellationException (deref err 100 :timeout)))) + (Thread/sleep 10) + (deliver wait :a) + (is (= (deref f2 100 :timeout) :a))))) + + (testing "propagates failures" + (let [q [[:ea (random-uuid)]] + r1 (try (rs/swap-datalog-cache! store + app-id + (fn [_ctx _query] + (throw (Exception. "oops"))) + nil + q) + (catch Exception e + e))] + + (is (instance? Exception + r1)) + + (is (thrown? Exception + (rs/swap-datalog-cache! store + app-id + (fn [_ctx _query] + :shouldn't-be-executed) + nil + q))))))) + (comment (test/run-tests *ns*))