Skip to content

Commit

Permalink
Moved diffs back to room level, proper consolidate for refresh-presen…
Browse files Browse the repository at this point in the history
…ce events
  • Loading branch information
tonsky committed Dec 27, 2024
1 parent ddb2db7 commit 11b51ac
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 97 deletions.
51 changes: 17 additions & 34 deletions server/src/instant/reactive/ephemeral.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
(:require
[clojure.core.async :as a]
[clojure.set :as set]
[datascript.core :refer [squuid-time-millis]]
[editscript.core :as editscript]
[instant.config :as config]
[instant.flags :as flags]
[instant.gauges :as gauges]
Expand All @@ -24,7 +24,6 @@
EntryRemovedListener
EntryUpdatedListener)
(com.hazelcast.topic ITopic MessageListener Message)
(java.util AbstractMap$SimpleImmutableEntry)
(java.util.concurrent LinkedBlockingQueue)))

;; ------
Expand All @@ -40,7 +39,8 @@
;; on session close) and some info about the rooms we're subscribed to on
;; this machine
;; {:sessions {<session-id>: #{<room-id>}}
;; :rooms {<{app-id: app-id, room-id: room-id}> {:session-ids #{<sess-id>}}}}
;; :rooms {<{app-id: app-id, room-id: room-id}> {:session-ids #{<sess-id>}
;; :last-data <...>}}}
(defonce room-maps (atom {}))

(declare handle-event)
Expand Down Expand Up @@ -176,18 +176,25 @@
;; Hazelcast

(defn handle-event [store-conn ^DataAwareEntryEvent event]
(let [{:keys [app-id room-id] :as room-key} (.getKey event)]
(let [room-key (.getKey event)
{:keys [app-id room-id]} room-key
{:keys [session-ids last-data]} (get-in @room-maps [:rooms room-key])]
(when (flags/use-hazelcast? app-id)
(when (seq (get-in @room-maps [:rooms room-key :session-ids]))
(let [room-data (.get (get-hz-rooms-map) room-key)]
(when (seq session-ids)
(let [room-data (.get (get-hz-rooms-map) room-key)
edits (when last-data
(editscript/get-edits
(editscript/diff last-data room-data {:algo :a-star :str-diff :none})))]
(swap! room-maps assoc-in [:rooms room-key :last-data] room-data)
(doseq [[sess-id _] room-data
:let [q (:receive-q (rs/get-socket @store-conn sess-id))]
:when q]
(receive-queue/enqueue->receive-q q
{:op :refresh-presence
:app-id app-id
:room-id room-id
:data room-data
{:op :refresh-presence
:app-id app-id
:room-id room-id
:data room-data
:edits edits
:session-id sess-id})))))))

(defn handle-broadcast-message
Expand Down Expand Up @@ -252,30 +259,6 @@

(hz-util/remove-session! (get-hz-rooms-map) room-key sess-id)))

(defn clean-old-sessions []
(let [oldest-timestamp (aws-util/oldest-instance-timestamp)
hz-map (get-hz-rooms-map)]
(when-not oldest-timestamp
(throw (Exception. "Could not determine oldest instance timestamp")))
(doseq [^AbstractMap$SimpleImmutableEntry entry (.entrySet hz-map)
:let [{:keys [app-id room-id]} (.getKey entry)
v (.getValue entry)]
:when (and app-id room-id)
sess-id (keys v)
:let [squuid-timestamp (squuid-time-millis sess-id)]
:when (< squuid-timestamp oldest-timestamp)]
(tracer/with-span! {:name "clean-old-session"
:attributes {:app-id app-id
:room-id room-id
:session-id sess-id
:squuid-timestamp squuid-timestamp}}
(remove-session! app-id room-id sess-id)))
(doseq [^AbstractMap$SimpleImmutableEntry entry (.entrySet hz-map)
:let [{:keys [app-id room-id]} (.getKey entry)
v (.getValue entry)]
:when (and app-id room-id (empty? v))]
(remove-session! app-id room-id (random-uuid)))))

;; ----------
;; Public API

Expand Down
102 changes: 41 additions & 61 deletions server/src/instant/reactive/session.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
commands."
(:require
[clojure.main :refer [root-cause]]
[editscript.core :as editscript]
[instant.db.datalog :as d]
[instant.db.model.attr :as attr-model]
[instant.db.permissioned-transaction :as permissioned-tx]
Expand Down Expand Up @@ -316,21 +315,12 @@
:room-id room-id
:client-event-id client-event-id})))

; {sess-id -> {room-id -> data}}
(defonce last-sent-presence-data
(atom {}))

(defn- handle-refresh-presence! [store-conn sess-id {:keys [app-id room-id data]}]
(let [prev-data (get-in @last-sent-presence-data [sess-id room-id])
edits (when prev-data
(editscript/get-edits
(editscript/diff prev-data data {:algo :a-star :str-diff :none})))]
(swap! last-sent-presence-data assoc-in [sess-id room-id] data)
(rs/send-event! store-conn app-id sess-id {:op :refresh-presence
:room-id room-id
:data (when (nil? edits)
data)
:edits edits})))
(defn- handle-refresh-presence! [store-conn sess-id {:keys [app-id room-id data edits]}]
(rs/send-event! store-conn app-id sess-id
(cond-> {:op :refresh-presence
:room-id room-id}
(some? edits) (assoc :edits edits)
(nil? edits) (assoc :data data))))

(defn- handle-client-broadcast!
"Broadcasts a client message to other sessions in the room"
Expand Down Expand Up @@ -566,52 +556,43 @@
:session-id (:session-id (:item entry))
:item entry)}))))

(defn drop-consecutive-set-presence [batch]
(let [{:keys [entries-to-process last-entry]}
(reduce (fn [{:keys [entries-to-process
last-entry]}
entry]
(if (and (= :set-presence
(-> last-entry :item :op))
(= :set-presence
(-> entry :item :op)))
{:entries-to-process entries-to-process
:last-entry (assoc entry
:skipped-size
(inc (or (:skipped-size last-entry) 0)))}
{:entries-to-process (conj entries-to-process last-entry)
:last-entry entry}))
{:entries-to-process []
:last-entry (first batch)}
(rest batch))]
(conj entries-to-process last-entry)))
(defmulti consolidate
(fn [type batch]
(if (= 1 (count batch))
:default
type)))

(defmethod consolidate :default [_ batch]
batch)

(defmethod consolidate :refresh [_ batch]
[(-> (first batch)
(assoc :skipped-size (dec (count batch))))])

(defmethod consolidate :refresh-presence [_ batch]
[(-> (last batch)
(assoc :skipped-size (dec (count batch)))
(assoc-in [:item :edits]
(into [] (mapcat #(get-in % [:item :edits])) batch)))])

(defmethod consolidate :room [_ batch]
(loop [last-entry (first batch)
batch (next batch)
acc (transient [])]
(if (empty? batch)
(persistent! (conj! acc last-entry))
(let [entry (first batch)
last-entry' (if (and (= :set-presence (-> last-entry :item :op))
(= :set-presence (-> entry :item :op)))
(assoc entry :skipped-size (inc (:skipped-size last-entry 0)))
entry)]
(recur last-entry' (next batch) acc)))))

(defn straight-jacket-process-receive-q-batch [store-conn eph-store-atom batch metadata]
(try
(let [t (-> metadata :group-key first)]
(if (= 1 (count batch))
(process-receive-q-entry store-conn eph-store-atom (first batch) metadata)
(case t
:refresh
(process-receive-q-entry store-conn
eph-store-atom
(assoc (first batch)
:skipped-size (dec (count batch)))
metadata)

:refresh-presence
(process-receive-q-entry store-conn
eph-store-atom
(assoc (last batch)
:skipped-size (dec (count batch)))
metadata)

:room
(doseq [entry (drop-consecutive-set-presence batch)]
(process-receive-q-entry store-conn
eph-store-atom
entry
metadata)))))
(let [type (-> metadata :group-key first)]
(doseq [entry (consolidate type batch)]
(process-receive-q-entry store-conn eph-store-atom entry metadata)))
(catch Throwable e
(tracer/record-exception-span! e {:name "receive-worker/handle-receive-batch-straight-jacket"
:attributes (assoc metadata
Expand Down Expand Up @@ -668,7 +649,6 @@
:escaping? false})))

(defn on-close [store-conn eph-store-atom {:keys [id pending-handlers]}]
(swap! last-sent-presence-data dissoc id)
(tracer/with-span! {:name "socket/on-close"
:attributes {:session-id id}}
(doseq [{:keys [op
Expand Down Expand Up @@ -723,7 +703,7 @@
;; System

(defn group-fn [{:keys [item] :as _input}]
(let [{:keys [session-id op room-id]} item]
(let [{:keys [op session-id room-id]} item]
(case op
:transact
[:transact session-id]
Expand Down
4 changes: 2 additions & 2 deletions server/src/tool.clj
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@
el ^StackTraceElement (nth trace 4)]
(str "[" (Compiler/demunge (.getClassName el)) " " (.getFileName el) ":" (.getLineNumber el) "]")))

(defn p-impl [position form res]
(defn p-impl [_position form res]
(let [form (walk/postwalk
(fn [form]
(if (and
Expand All @@ -191,7 +191,7 @@
form))
form)]
(locking p-lock
(println (str position " #p " form " => " (pr-str res))))
(println (str #_position "#p " form " => " (pr-str res))))
res))

(defn p
Expand Down

0 comments on commit 11b51ac

Please sign in to comment.