Skip to content

Commit

Permalink
Use the grouped queue in the invalidator (#714)
Browse files Browse the repository at this point in the history
  • Loading branch information
dwwoelfel authored Jan 15, 2025
1 parent 527ba63 commit abb7946
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 87 deletions.
120 changes: 89 additions & 31 deletions server/src/instant/reactive/invalidator.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
[instant.config :as config]
[instant.db.model.attr :as attr-model]
[instant.db.pg-introspect :as pg-introspect]
[instant.gauges :as gauges]
[instant.grouped-queue :as grouped-queue]
[instant.jdbc.aurora :as aurora]
[instant.jdbc.wal :as wal]
[instant.model.app :as app-model]
Expand All @@ -19,7 +21,7 @@
[instant.db.model.triple :as triple-model])
(:import
(java.sql Timestamp)
(java.time Instant)
(java.time Duration Instant)
(java.time.temporal ChronoUnit)
(java.util UUID)
(org.postgresql.replication LogSequenceNumber)))
Expand Down Expand Up @@ -149,7 +151,8 @@
(defn- invalidate!
"Given a collection of changes, stales all relevant queries and returns
sockets to be refreshed."
[store-conn {:keys [app-id tx-id] :as wal-record}]
;; process-id used for tests
[_process-id store-conn {:keys [app-id tx-id] :as wal-record}]
(let [topics (topics-for-changes wal-record)
[db session-ids] (rs/mark-stale-topics! store-conn app-id tx-id topics)
sockets (keep (partial rs/get-socket db) session-ids)]
Expand Down Expand Up @@ -276,6 +279,8 @@
(let [tx-id (extract-tx-id transactions-change)]
(e2e-tracer/invalidator-tracking-step! {:tx-id tx-id
:name "transform-wal-record"})
;; n.b. make sure to update combine-wal-records below if new
;; items are added to this map
{:attr-changes attrs
:ident-changes idents
:triple-changes triples
Expand All @@ -288,6 +293,30 @@
[]
(keep #'transform-wal-record))

(defn combine-wal-records
  "Collapses a sequence of wal-records into one wal-record.

   The first record seeds the result. For each later record, its
   :attr-changes, :ident-changes and :triple-changes are appended to the
   accumulated vectors and the result's :tx-id is replaced with that
   record's :tx-id. All other keys keep the value from the first record.
   A one-element sequence is returned as-is.

   Asserts that every record has the same :app-id as the accumulator."
  [wal-records]
  (letfn [(absorb [acc {:keys [app-id tx-id] :as incoming}]
            ;; Fail loudly instead of silently merging changes that belong
            ;; to different apps.
            (assert (= (:app-id acc) app-id) "app-id mismatch in combine-wal-records")
            ;; The tx-id currently on the accumulator is about to be
            ;; replaced by the incoming one, so record it as skipped.
            (e2e-tracer/invalidator-tracking-step! {:tx-id (:tx-id acc)
                                                    :name "skipped-in-combined-wal-record"})
            ;; Only the change lists and :tx-id are touched; in particular
            ;; the first record's tx-created-at survives, so wal-latency-ms
            ;; reflects the oldest (worst-case) record.
            (-> (reduce (fn [merged change-key]
                          (update merged
                                  change-key
                                  (fnil into [])
                                  (get incoming change-key)))
                        acc
                        [:attr-changes :ident-changes :triple-changes])
                (assoc :tx-id tx-id)))]
    (reduce absorb wal-records)))

(defn transform-byop-wal-record [{:keys [changes nextlsn]}]
;; TODO(byop): if change is empty, then there might be changes to the schema
(let [triple-changes (filter (fn [c]
Expand All @@ -308,34 +337,64 @@
(when tx-created-at
(.between ChronoUnit/MILLIS tx-created-at (Instant/now))))

(defn start-worker [store-conn wal-chan]
(defn process-wal-record
  "Runs invalidation for `wal-record` and enqueues a :refresh message for
   every socket returned by invalidate!.

   process-id   - passed straight through to invalidate!
   store-conn   - store passed to invalidate!; dereferenced only on failure
   record-count - reported on the span as :wal-record-count
   wal-record   - map with at least :app-id and :tx-id

   Any Throwable raised while invalidating or enqueueing is recorded on the
   tracer and not rethrown."
  [process-id store-conn record-count wal-record]
  (let [app-id (:app-id wal-record)
        tx-id (:tx-id wal-record)]
    (tracer/with-span! {:name "invalidator/work"
                        :attributes {:app-id app-id
                                     :tx-id tx-id
                                     :wal-record-count record-count
                                     :wal-latency-ms (wal-latency-ms wal-record)}}
      (try
        (let [stale-sockets (invalidate! process-id store-conn wal-record)
              socket-count (count stale-sockets)]
          (tracer/add-data! {:attributes {:num-sockets socket-count}})
          (e2e-tracer/invalidator-tracking-step! {:tx-id tx-id
                                                  :name "send-refreshes"
                                                  :attributes {:num-sockets socket-count}})
          (tracer/with-span! {:name "invalidator/send-refreshes"}
            ;; One refresh message per affected socket.
            (run! (fn [{:keys [id]}]
                    (receive-queue/enqueue->receive-q {:op :refresh
                                                       :session-id id
                                                       :tx-id tx-id}))
                  stale-sockets)))
        (catch Throwable t
          ;; Stash the failing record and the store value in namespace
          ;; vars -- presumably so they can be inspected from a REPL.
          (def -wal-record wal-record)
          (def -store-value @store-conn)
          (tracer/add-exception! t {:escaping? false}))))))

(defn invalidator-q-metrics
  "Builds the gauge entries for the invalidator queue.

   Takes a map with :grouped-queue and :get-worker-count (a zero-argument
   fn) and returns a vector of {:path ... :value ...} maps for the queue
   size, the milliseconds elapsed since the :put-at of the item returned by
   grouped-queue/peek (0 when peek returns nothing), and the worker count."
  [{:keys [grouped-queue get-worker-count]}]
  (let [queue-size (grouped-queue/size grouped-queue)
        {:keys [put-at] :as head} (grouped-queue/peek grouped-queue)
        longest-waiting-ms (if head
                             (.toMillis (Duration/between put-at (Instant/now)))
                             0)
        worker-count (get-worker-count)]
    [{:path "instant.reactive.invalidator.q.size"
      :value queue-size}
     {:path "instant.reactive.invalidator.q.longest-waiting-ms"
      :value longest-waiting-ms}
     {:path "instant.reactive.invalidator.q.worker-count"
      :value worker-count}]))

;; Starts the invalidation worker for wal-chan. process-id and store-conn
;; are handed to process-wal-record for every batch that gets processed.
(defn start-worker [process-id store-conn wal-chan]
(tracer/record-info! {:name "invalidation-worker/start"})
;; NOTE(review): this page is a rendered diff without +/- markers. The
;; `(loop [] ...)` form below reads wal-chan with a blocking take and calls
;; invalidate! with two arguments, so it looks like the pre-change body
;; that this commit removes -- confirm against the committed file.
(loop []
(let [wal-record (a/<!! wal-chan)]
(if-not wal-record
(tracer/record-info! {:name "invalidation-worker/shutdown"})
(let [{:keys [app-id tx-id]} wal-record]
(tracer/with-span! {:name "invalidator/work"
:attributes {:app-id app-id
:tx-id tx-id
:wal-latency-ms (wal-latency-ms wal-record)}}

(try
(let [sockets (invalidate! store-conn wal-record)]
(tracer/add-data! {:attributes {:num-sockets (count sockets)}})
(e2e-tracer/invalidator-tracking-step! {:tx-id tx-id
:name "send-refreshes"
:attributes {:num-sockets (count sockets)}})
(tracer/with-span! {:name "invalidator/send-refreshes"}
(doseq [{:keys [id]} sockets]
(receive-queue/enqueue->receive-q {:op :refresh
:session-id id
:tx-id tx-id}))))
(catch Throwable t
(def -wal-record wal-record)
(def -store-value @store-conn)
(tracer/add-exception! t {:escaping? false}))))
(recur))))))
;; Grouped-queue body: records are grouped by :app-id, and the :process-fn
;; merges the wal-records it is given with combine-wal-records before
;; processing the merged record once (passing along how many were merged).
;; NOTE(review): the meaning of :reserve-fn / (inflight-queue-reserve 100 q)
;; and :max-workers 10 is defined in instant.grouped-queue -- verify there.
(let [queue-with-workers
(grouped-queue/start-grouped-queue-with-workers
{:group-fn :app-id
:reserve-fn (fn [_ q] (grouped-queue/inflight-queue-reserve 100 q))
:process-fn (fn [_key wal-records]
(process-wal-record process-id
store-conn
(count wal-records)
(combine-wal-records wal-records)))
:max-workers 10})
grouped-queue (:grouped-queue queue-with-workers)
;; add-gauge-metrics-fn returns a fn that is invoked on shutdown below.
cleanup-gauges (gauges/add-gauge-metrics-fn
(fn [_] (invalidator-q-metrics queue-with-workers)))]
;; Move every wal-record from wal-chan onto the grouped queue. A nil take
;; (core.async returns nil once the channel is closed and drained) ends the
;; loop: cleanup-gauges is called and the shutdown is logged.
(a/go
(loop []
(let [wal-record (a/<! wal-chan)]
(if-not wal-record
(do
(cleanup-gauges)
(tracer/record-info! {:name "invalidation-worker/shutdown"}))
(do (grouped-queue/put! grouped-queue wal-record)
(recur))))))))

(defn handle-byop-record [table-info app-id store-conn wal-record]
(when-let [record (transform-byop-wal-record wal-record)]
Expand Down Expand Up @@ -425,8 +484,7 @@

@(:started-promise wal-opts)

(ua/fut-bg
(start-worker rs/store-conn worker-chan))
(start-worker process-id rs/store-conn worker-chan)

(when byop-chan
(ua/fut-bg
Expand Down
105 changes: 49 additions & 56 deletions server/test/instant/reactive/invalidator_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -384,11 +384,6 @@
(inv/topics-for-changes {:ident-changes delete-ident-changes
:attr-changes delete-attr-changes})))))

;; Use this to distinguish calls to `invalidate!` from our invalidator
;; in the with-redefs. Without this, we might get changes from the
;; global invalidator process.
(def ^:dynamic *inside* nil)

(defn ->md5 [s]
(-> s
crypt-util/str->md5
Expand All @@ -401,61 +396,59 @@
(with-zeneca-app
(fn [app r]
(let [invalidate! (var-get #'inv/invalidate!)
records (atom [])]
records (atom [])
machine-id (string/replace (str "test-" (random-uuid))
#"-"
"_")]
(with-redefs [inv/invalidate!
(fn [store-conn {:keys [app-id tx-id] :as wal-record}]
(if (and (= (:id app) app-id)
*inside*)
(fn [process-id store-conn {:keys [app-id tx-id] :as wal-record}]
(if (and (= machine-id process-id) (= (:id app) app-id))
(swap! records conj wal-record)
(invalidate! store-conn wal-record)))]
(binding [*inside* true]
(let [machine-id (string/replace (str "test-" (random-uuid))
#"-"
"_")
process (inv/start machine-id)
uid (random-uuid)]
(try
(tx/transact! (aurora/conn-pool)
(attr-model/get-by-app-id (:id app))
(:id app)
[[:add-triple uid (resolvers/->uuid r :users/id) uid]
[:add-triple uid (resolvers/->uuid r :users/handle) "dww"]])
(wait-for (fn []
(< 0 (count @records)))
1000)
(is (= 1 (count @records)))
(let [rec (first @records)]
(is (pos? (:tx-id rec)))
(is (= (set (map (fn [change]
(-> change
xform-change
(dissoc "created_at")))
(:triple-changes rec)))
#{{"eav" false,
"av" true,
"ave" true,
"value_md5" "057a88732b390295a8623cfd3cb799d9",
"entity_id" (str uid)
"attr_id" (str (resolvers/->uuid r :users/handle))
"ea" true,
"value" "\"dww\"",
"vae" false,
"app_id" (str (:id app))
"checked_data_type" nil}
{"eav" true,
"av" true,
"ave" true,
"value_md5" (->md5 (->json (str uid)))
"entity_id" (str uid)
"attr_id" (str (resolvers/->uuid r :users/id))
"ea" true,
"value" (->json (str uid))
"vae" true,
"app_id" (str (:id app))
"checked_data_type" nil}})))
(let [process (inv/start machine-id)
uid (random-uuid)]
(try
(tx/transact! (aurora/conn-pool)
(attr-model/get-by-app-id (:id app))
(:id app)
[[:add-triple uid (resolvers/->uuid r :users/id) uid]
[:add-triple uid (resolvers/->uuid r :users/handle) "dww"]])
(wait-for (fn []
(< 0 (count @records)))
1000)
(is (= 1 (count @records)))
(let [rec (first @records)]
(is (pos? (:tx-id rec)))
(is (= (set (map (fn [change]
(-> change
xform-change
(dissoc "created_at")))
(:triple-changes rec)))
#{{"eav" false,
"av" true,
"ave" true,
"value_md5" "057a88732b390295a8623cfd3cb799d9",
"entity_id" (str uid)
"attr_id" (str (resolvers/->uuid r :users/handle))
"ea" true,
"value" "\"dww\"",
"vae" false,
"app_id" (str (:id app))
"checked_data_type" nil}
{"eav" true,
"av" true,
"ave" true,
"value_md5" (->md5 (->json (str uid)))
"entity_id" (str uid)
"attr_id" (str (resolvers/->uuid r :users/id))
"ea" true,
"value" (->json (str uid))
"vae" true,
"app_id" (str (:id app))
"checked_data_type" nil}})))

(finally
(inv/stop process))))))))))
(finally
(inv/stop process)))))))))

;; Not evaluated when the namespace loads (the form sits inside `comment`);
;; evaluate the inner form by hand to call test/run-tests on the current
;; namespace.
(comment
(test/run-tests *ns*))

0 comments on commit abb7946

Please sign in to comment.