From e85bd739ee0d51ddab5c8854ddbd31b0b784c327 Mon Sep 17 00:00:00 2001 From: Nikita Prokopov Date: Thu, 26 Dec 2024 19:51:25 +0100 Subject: [PATCH] Calculate diffs after batching, support '-' in edits (dissocIn), clean up session data after disconnect --- client/packages/core/src/Reactor.js | 7 +++--- client/packages/core/src/utils/object.js | 30 +++++++++++++++++++++++ server/src/instant/reactive/ephemeral.clj | 12 +-------- server/src/instant/reactive/session.clj | 26 ++++++++++++++------ 4 files changed, 53 insertions(+), 22 deletions(-) diff --git a/client/packages/core/src/Reactor.js b/client/packages/core/src/Reactor.js index 0a98a670d..274432e0b 100644 --- a/client/packages/core/src/Reactor.js +++ b/client/packages/core/src/Reactor.js @@ -13,7 +13,7 @@ import { buildPresenceSlice, hasPresenceResponseChanged } from "./presence"; import { Deferred } from "./utils/Deferred"; import { PersistedObject } from "./utils/PersistedObject"; import { extractTriples } from "./model/instaqlResult"; -import { areObjectsDeepEqual, assocIn } from "./utils/object"; +import { areObjectsDeepEqual, assocIn, dissocIn } from "./utils/object"; import { createLinkIndex } from "./utils/linkIndex"; import version from "./version"; @@ -1687,11 +1687,12 @@ export default class Reactor { ); sessions[this._sessionId] = {data: this._presence[roomId]?.result?.user}; for (let [path, op, value] of edits) { - console.log('edit', path, op, value); if (op === '+' || op === 'r') { sessions = assocIn(sessions, path, value); } - // TODO op === '-' + if (op === '-') { + sessions = dissocIn(sessions, path); + } } this._setPresencePeers(roomId, sessions); diff --git a/client/packages/core/src/utils/object.js b/client/packages/core/src/utils/object.js index dff75f177..10d875205 100644 --- a/client/packages/core/src/utils/object.js +++ b/client/packages/core/src/utils/object.js @@ -99,3 +99,33 @@ export function assocIn(obj, path, value) { current[path[path.length - 1]] = value; return obj; } + +export function dissocIn(obj, path) { + if (path.length === 0) { + return undefined; + } + + const [key, ...restPath] = path; + + if (!(key in obj)) { + return obj; + } + + if (restPath.length === 0) { + delete obj[key]; + return isEmpty(obj) ? undefined : obj; + } + + const child = dissocIn(obj[key], restPath); + + if (child === undefined) { + delete obj[key]; + return isEmpty(obj) ? undefined : obj; + } + + return obj; +} + +function isEmpty(obj) { + return obj && Object.keys(obj).length === 0; +} \ No newline at end of file diff --git a/server/src/instant/reactive/ephemeral.clj b/server/src/instant/reactive/ephemeral.clj index e177d0483..d7d42e715 100644 --- a/server/src/instant/reactive/ephemeral.clj +++ b/server/src/instant/reactive/ephemeral.clj @@ -4,7 +4,6 @@ [clojure.core.async :as a] [clojure.set :as set] [datascript.core :refer [squuid-time-millis]] - [editscript.core :as editscript] [instant.config :as config] [instant.flags :as flags] [instant.gauges :as gauges] @@ -176,19 +175,11 @@ ;; --------- ;; Hazelcast -(def last-sent-state - (atom {})) - (defn handle-event [store-conn ^DataAwareEntryEvent event] (let [{:keys [app-id room-id] :as room-key} (.getKey event)] (when (flags/use-hazelcast? app-id) (when (seq (get-in @room-maps [:rooms room-key :session-ids])) - (let [room-data (.get (get-hz-rooms-map) room-key) - prev-room-data (get @last-sent-state room-key) - edits (when prev-room-data - (editscript/get-edits - (editscript/diff prev-room-data room-data {:algo :a-star :str-diff :none})))] - (swap! last-sent-state assoc room-key room-data) + (let [room-data (.get (get-hz-rooms-map) room-key)] (doseq [[sess-id _] room-data :let [q (:receive-q (rs/get-socket @store-conn sess-id))] :when q] @@ -197,7 +188,6 @@ :app-id app-id :room-id room-id :data room-data - :edits edits :session-id sess-id}))))))) (defn handle-broadcast-message diff --git a/server/src/instant/reactive/session.clj b/server/src/instant/reactive/session.clj index 9b5fa3dee..10ed0e478 100644 --- a/server/src/instant/reactive/session.clj +++ b/server/src/instant/reactive/session.clj @@ -8,6 +8,7 @@ commands." (:require [clojure.main :refer [root-cause]] + [editscript.core :as editscript] [instant.db.datalog :as d] [instant.db.model.attr :as attr-model] [instant.db.permissioned-transaction :as permissioned-tx] @@ -315,13 +316,21 @@ :room-id room-id :client-event-id client-event-id}))) -(defn- handle-refresh-presence! - [store-conn sess-id {:keys [app-id room-id data edits]}] - (rs/send-event! store-conn app-id sess-id {:op :refresh-presence - :room-id room-id - :data (when (nil? edits) - data) - :edits edits})) +; {sess-id -> {room-id -> data}} +(defonce last-sent-presence-data + (atom {})) + +(defn- handle-refresh-presence! [store-conn sess-id {:keys [app-id room-id data edits]}] + (let [prev-data (get-in @last-sent-presence-data [sess-id room-id]) + edits (when prev-data + (editscript/get-edits + (editscript/diff prev-data data {:algo :a-star :str-diff :none})))] + (swap! last-sent-presence-data assoc-in [sess-id room-id] data) + (rs/send-event! store-conn app-id sess-id {:op :refresh-presence + :room-id room-id + :data (when (nil? edits) + data) + :edits edits}))) (defn- handle-client-broadcast! "Broadcasts a client message to other sessions in the room" @@ -659,6 +668,7 @@ :escaping? false}))) (defn on-close [store-conn eph-store-atom {:keys [id pending-handlers]}] + (swap! last-sent-presence-data dissoc id) (tracer/with-span! {:name "socket/on-close" :attributes {:session-id id}} (doseq [{:keys [op @@ -713,7 +723,7 @@ ;; System (defn group-fn [{:keys [item] :as _input}] - (let [{:keys [session-id op room-id]} item] + (let [{:keys [session-id op app-id room-id]} item] (case op :transact [:transact session-id]