From 11b51ac00f7cd0d7d837215beccc8bfd07898661 Mon Sep 17 00:00:00 2001 From: Nikita Prokopov Date: Fri, 27 Dec 2024 20:08:17 +0100 Subject: [PATCH] Moved diffs back to room level, proper consolidate for refresh-presence events --- server/src/instant/reactive/ephemeral.clj | 51 ++++------- server/src/instant/reactive/session.clj | 102 +++++++++------------- server/src/tool.clj | 4 +- 3 files changed, 60 insertions(+), 97 deletions(-) diff --git a/server/src/instant/reactive/ephemeral.clj b/server/src/instant/reactive/ephemeral.clj index d7d42e715..99a1da141 100644 --- a/server/src/instant/reactive/ephemeral.clj +++ b/server/src/instant/reactive/ephemeral.clj @@ -3,7 +3,7 @@ (:require [clojure.core.async :as a] [clojure.set :as set] - [datascript.core :refer [squuid-time-millis]] + [editscript.core :as editscript] [instant.config :as config] [instant.flags :as flags] [instant.gauges :as gauges] @@ -24,7 +24,6 @@ EntryRemovedListener EntryUpdatedListener) (com.hazelcast.topic ITopic MessageListener Message) - (java.util AbstractMap$SimpleImmutableEntry) (java.util.concurrent LinkedBlockingQueue))) ;; ------ @@ -40,7 +39,8 @@ ;; on session close) and some info about the rooms we're subscribed to on ;; this machine ;; {:sessions {: #{}} -;; :rooms {<{app-id: app-id, room-id: room-id}> {:session-ids #{}}}} +;; :rooms {<{app-id: app-id, room-id: room-id}> {:session-ids #{} +;; :last-data <...>}}} (defonce room-maps (atom {})) (declare handle-event) @@ -176,18 +176,25 @@ ;; Hazelcast (defn handle-event [store-conn ^DataAwareEntryEvent event] - (let [{:keys [app-id room-id] :as room-key} (.getKey event)] + (let [room-key (.getKey event) + {:keys [app-id room-id]} room-key + {:keys [session-ids last-data]} (get-in @room-maps [:rooms room-key])] (when (flags/use-hazelcast? app-id) - (when (seq (get-in @room-maps [:rooms room-key :session-ids])) - (let [room-data (.get (get-hz-rooms-map) room-key)] + (when (seq session-ids) + (let [room-data (.get (get-hz-rooms-map) room-key) + edits (when last-data + (editscript/get-edits + (editscript/diff last-data room-data {:algo :a-star :str-diff :none})))] + (swap! room-maps assoc-in [:rooms room-key :last-data] room-data) (doseq [[sess-id _] room-data :let [q (:receive-q (rs/get-socket @store-conn sess-id))] :when q] (receive-queue/enqueue->receive-q q - {:op :refresh-presence - :app-id app-id - :room-id room-id - :data room-data + {:op :refresh-presence + :app-id app-id + :room-id room-id + :data room-data + :edits edits :session-id sess-id}))))))) (defn handle-broadcast-message @@ -252,30 +259,6 @@ (hz-util/remove-session! (get-hz-rooms-map) room-key sess-id))) -(defn clean-old-sessions [] - (let [oldest-timestamp (aws-util/oldest-instance-timestamp) - hz-map (get-hz-rooms-map)] - (when-not oldest-timestamp - (throw (Exception. "Could not determine oldest instance timestamp"))) - (doseq [^AbstractMap$SimpleImmutableEntry entry (.entrySet hz-map) - :let [{:keys [app-id room-id]} (.getKey entry) - v (.getValue entry)] - :when (and app-id room-id) - sess-id (keys v) - :let [squuid-timestamp (squuid-time-millis sess-id)] - :when (< squuid-timestamp oldest-timestamp)] - (tracer/with-span! {:name "clean-old-session" - :attributes {:app-id app-id - :room-id room-id - :session-id sess-id - :squuid-timestamp squuid-timestamp}} - (remove-session! app-id room-id sess-id))) - (doseq [^AbstractMap$SimpleImmutableEntry entry (.entrySet hz-map) - :let [{:keys [app-id room-id]} (.getKey entry) - v (.getValue entry)] - :when (and app-id room-id (empty? v))] - (remove-session! app-id room-id (random-uuid))))) - ;; ---------- ;; Public API diff --git a/server/src/instant/reactive/session.clj b/server/src/instant/reactive/session.clj index efe3b5938..27b39a4bc 100644 --- a/server/src/instant/reactive/session.clj +++ b/server/src/instant/reactive/session.clj @@ -8,7 +8,6 @@ commands." (:require [clojure.main :refer [root-cause]] - [editscript.core :as editscript] [instant.db.datalog :as d] [instant.db.model.attr :as attr-model] [instant.db.permissioned-transaction :as permissioned-tx] @@ -316,21 +315,12 @@ :room-id room-id :client-event-id client-event-id}))) -; {sess-id -> {room-id -> data}} -(defonce last-sent-presence-data - (atom {})) - -(defn- handle-refresh-presence! [store-conn sess-id {:keys [app-id room-id data]}] - (let [prev-data (get-in @last-sent-presence-data [sess-id room-id]) - edits (when prev-data - (editscript/get-edits - (editscript/diff prev-data data {:algo :a-star :str-diff :none})))] - (swap! last-sent-presence-data assoc-in [sess-id room-id] data) - (rs/send-event! store-conn app-id sess-id {:op :refresh-presence - :room-id room-id - :data (when (nil? edits) - data) - :edits edits}))) +(defn- handle-refresh-presence! [store-conn sess-id {:keys [app-id room-id data edits]}] + (rs/send-event! store-conn app-id sess-id + (cond-> {:op :refresh-presence + :room-id room-id} + (some? edits) (assoc :edits edits) + (nil? edits) (assoc :data data)))) (defn- handle-client-broadcast! "Broadcasts a client message to other sessions in the room" @@ -566,52 +556,43 @@ :session-id (:session-id (:item entry)) :item entry)})))) -(defn drop-consecutive-set-presence [batch] - (let [{:keys [entries-to-process last-entry]} - (reduce (fn [{:keys [entries-to-process - last-entry]} - entry] - (if (and (= :set-presence - (-> last-entry :item :op)) - (= :set-presence - (-> entry :item :op))) - {:entries-to-process entries-to-process - :last-entry (assoc entry - :skipped-size - (inc (or (:skipped-size last-entry) 0)))} - {:entries-to-process (conj entries-to-process last-entry) - :last-entry entry})) - {:entries-to-process [] - :last-entry (first batch)} - (rest batch))] - (conj entries-to-process last-entry))) +(defmulti consolidate + (fn [type batch] + (if (= 1 (count batch)) + :default + type))) + +(defmethod consolidate :default [_ batch] + batch) + +(defmethod consolidate :refresh [_ batch] + [(-> (first batch) + (assoc :skipped-size (dec (count batch))))]) + +(defmethod consolidate :refresh-presence [_ batch] + [(-> (last batch) + (assoc :skipped-size (dec (count batch))) + (assoc-in [:item :edits] + (into [] (mapcat #(get-in % [:item :edits])) batch)))]) + +(defmethod consolidate :room [_ batch] + (loop [last-entry (first batch) + batch (next batch) + acc (transient [])] + (if (empty? batch) + (persistent! (conj! acc last-entry)) + (let [entry (first batch) + last-entry' (if (and (= :set-presence (-> last-entry :item :op)) + (= :set-presence (-> entry :item :op))) + (assoc entry :skipped-size (inc (:skipped-size last-entry 0))) + entry)] + (recur last-entry' (next batch) acc))))) (defn straight-jacket-process-receive-q-batch [store-conn eph-store-atom batch metadata] (try - (let [t (-> metadata :group-key first)] - (if (= 1 (count batch)) - (process-receive-q-entry store-conn eph-store-atom (first batch) metadata) - (case t - :refresh - (process-receive-q-entry store-conn - eph-store-atom - (assoc (first batch) - :skipped-size (dec (count batch))) - metadata) - - :refresh-presence - (process-receive-q-entry store-conn - eph-store-atom - (assoc (last batch) - :skipped-size (dec (count batch))) - metadata) - - :room - (doseq [entry (drop-consecutive-set-presence batch)] - (process-receive-q-entry store-conn - eph-store-atom - entry - metadata))))) + (let [type (-> metadata :group-key first)] + (doseq [entry (consolidate type batch)] + (process-receive-q-entry store-conn eph-store-atom entry metadata))) (catch Throwable e (tracer/record-exception-span! e {:name "receive-worker/handle-receive-batch-straight-jacket" :attributes (assoc metadata @@ -668,7 +649,6 @@ :escaping? false}))) (defn on-close [store-conn eph-store-atom {:keys [id pending-handlers]}] - (swap! last-sent-presence-data dissoc id) (tracer/with-span! {:name "socket/on-close" :attributes {:session-id id}} (doseq [{:keys [op @@ -723,7 +703,7 @@ ;; System (defn group-fn [{:keys [item] :as _input}] - (let [{:keys [session-id op room-id]} item] + (let [{:keys [op session-id room-id]} item] (case op :transact [:transact session-id] diff --git a/server/src/tool.clj b/server/src/tool.clj index 87fecf0a2..5b827279b 100644 --- a/server/src/tool.clj +++ b/server/src/tool.clj @@ -181,7 +181,7 @@ el ^StackTraceElement (nth trace 4)] (str "[" (Compiler/demunge (.getClassName el)) " " (.getFileName el) ":" (.getLineNumber el) "]"))) -(defn p-impl [position form res] +(defn p-impl [_position form res] (let [form (walk/postwalk (fn [form] (if (and @@ -191,7 +191,7 @@ form)) form)] (locking p-lock - (println (str position " #p " form " => " (pr-str res)))) + (println (str #_position "#p " form " => " (pr-str res)))) res)) (defn p