diff --git a/client/packages/core/src/Reactor.js b/client/packages/core/src/Reactor.js index 4bde34e3..0a98a670 100644 --- a/client/packages/core/src/Reactor.js +++ b/client/packages/core/src/Reactor.js @@ -13,7 +13,7 @@ import { buildPresenceSlice, hasPresenceResponseChanged } from "./presence"; import { Deferred } from "./utils/Deferred"; import { PersistedObject } from "./utils/PersistedObject"; import { extractTriples } from "./model/instaqlResult"; -import { areObjectsDeepEqual } from "./utils/object"; +import { areObjectsDeepEqual, assocIn } from "./utils/object"; import { createLinkIndex } from "./utils/linkIndex"; import version from "./version"; @@ -460,7 +460,11 @@ export default class Reactor { break; case "refresh-presence": const roomId = msg["room-id"]; - this._setPresencePeers(roomId, msg.data); + if (msg["edits"]) { + this._updatePresencePeers(roomId, msg["edits"]); + } else { + this._setPresencePeers(roomId, msg["data"]); + } this._notifyPresenceSubs(roomId); break; case "server-broadcast": @@ -1676,6 +1680,23 @@ export default class Reactor { handler.cb(slice); } + _updatePresencePeers(roomId, edits) { + const peers = this._presence[roomId]?.result?.peers || {}; + let sessions = Object.fromEntries( + Object.entries(peers).map(([k, v]) => [k, {data: v}]), + ); + sessions[this._sessionId] = {data: this._presence[roomId]?.result?.user}; + for (let [path, op, value] of edits) { + console.log('edit', path, op, value); + if (op === '+' || op === 'r') { + sessions = assocIn(sessions, path, value); + } + // TODO op === '-' + } + + this._setPresencePeers(roomId, sessions); + } + _setPresencePeers(roomId, data) { const sessions = { ...data }; // no need to keep track of `user` @@ -1684,9 +1705,7 @@ export default class Reactor { Object.entries(sessions).map(([k, v]) => [k, v.data]), ); - this._presence[roomId] = this._presence[roomId] || {}; - this._presence[roomId].result = this._presence[roomId].result || {}; - this._presence[roomId].result.peers = peers; + this._presence = assocIn(this._presence, [roomId, 'result', 'peers'], peers); } // -------- diff --git a/client/packages/core/src/utils/object.js b/client/packages/core/src/utils/object.js index 39c1ebcd..dff75f17 100644 --- a/client/packages/core/src/utils/object.js +++ b/client/packages/core/src/utils/object.js @@ -81,3 +81,21 @@ export function immutableDeepReplace(target, replaceValue, replacementValue) { export function isObject(val) { return typeof val === "object" && val !== null && !Array.isArray(val); } + +export function assocIn(obj, path, value) { + if (path.length === 0) { + return value; + } + + let current = obj || {}; + for (let i = 0; i < path.length - 1; i++) { + const key = path[i]; + if (!(key in current) || typeof current[key] !== 'object') { + current[key] = typeof path[i + 1] === 'number' ? [] : {}; + } + current = current[key]; + } + + current[path[path.length - 1]] = value; + return obj; +} diff --git a/server/deps.edn b/server/deps.edn index 7f7e0a82..14a6e3f6 100644 --- a/server/deps.edn +++ b/server/deps.edn @@ -24,6 +24,7 @@ org.slf4j/slf4j-api {:mvn/version "2.0.13"} cider/cider-nrepl {:mvn/version "0.45.0"} amazonica/amazonica {:mvn/version "0.3.167"} + juji/editscript {:mvn/version "0.6.4"} org.bouncycastle/bcprov-jdk15on {:mvn/version "1.70"} incanter/incanter {:mvn/version "1.9.3" diff --git a/server/src/instant/reactive/ephemeral.clj b/server/src/instant/reactive/ephemeral.clj index 30de8455..e177d048 100644 --- a/server/src/instant/reactive/ephemeral.clj +++ b/server/src/instant/reactive/ephemeral.clj @@ -4,6 +4,7 @@ [clojure.core.async :as a] [clojure.set :as set] [datascript.core :refer [squuid-time-millis]] + [editscript.core :as editscript] [instant.config :as config] [instant.flags :as flags] [instant.gauges :as gauges] @@ -31,7 +32,6 @@ ;; Setup (declare room-refresh-ch) -(defonce refresh-map-ch (a/chan 1024)) ;; Channel we use to keep the hazelcast maps in sync for ;; apps that aren't using hazelcast. This can go away when ;; we fully migrate to hazelcast @@ -49,7 +49,7 @@ (defn init-hz [store-conn] (-> (java.util.logging.Logger/getLogger "com.hazelcast.system.logo") - (.setLevel java.util.logging.Level/WARNING)) + (.setLevel java.util.logging.Level/WARNING)) (System/setProperty "hazelcast.shutdownhook.enabled" "false") (let [config (Config.) network-config (.getNetworkConfig config) @@ -176,43 +176,20 @@ ;; --------- ;; Hazelcast -(defn handle-refresh-event [store-conn room-key room-id] - (let [room-data (.get (get-hz-rooms-map) room-key) - session-ids (filter (fn [sess-id] - (rs/get-session @store-conn sess-id)) - (keys room-data))] - (doseq [sess-id session-ids - :let [q (:receive-q (rs/get-socket @store-conn sess-id))] - :when q] - (receive-queue/enqueue->receive-q q - {:op :refresh-presence - :app-id (:app-id room-key) - :room-id room-id - :data room-data - :session-id sess-id})))) - -(defn straight-jacket-refresh-event! - [store-conn {:keys [room-key room-id on-sent]}] - (try - (handle-refresh-event store-conn room-key room-id) - (catch Throwable t - (tracer/record-exception-span! t {:name "rooms-refresh-map/straight-jacket"})) - (finally (on-sent)))) - -(defn start-refresh-map-worker [store-conn ch] - (loop [event (a/receive-q q @@ -220,6 +197,7 @@ :app-id app-id :room-id room-id :data room-data + :edits edits :session-id sess-id}))))))) (defn handle-broadcast-message @@ -408,7 +386,7 @@ (defn refresh-rooms! [store-conn old-v new-v] (let [old-apps-rooms (get-in old-v [:rooms]) new-apps-rooms (get-in new-v [:rooms]) - changed-rooms (get-changed-rooms old-apps-rooms new-apps-rooms)] + changed-rooms (get-changed-rooms old-apps-rooms new-apps-rooms)] (when (seq changed-rooms) (tracer/with-span! {:name "refresh-rooms" :attributes {:room-ids (pr-str (map first changed-rooms))}} @@ -470,7 +448,6 @@ (defn stop [] (a/close! room-refresh-ch) - (a/close! refresh-map-ch) (when-let [q ^LinkedBlockingQueue @hz-ops-q] (.put q close-sentinel)) (reset! hz-ops-q nil) @@ -481,3 +458,9 @@ (defn restart [] (stop) (start)) + +(defn before-ns-unload [] + (stop)) + +(defn after-ns-reload [] + (start)) diff --git a/server/src/instant/reactive/session.clj b/server/src/instant/reactive/session.clj index cd72df7d..9b5fa3de 100644 --- a/server/src/instant/reactive/session.clj +++ b/server/src/instant/reactive/session.clj @@ -316,10 +316,12 @@ :client-event-id client-event-id}))) (defn- handle-refresh-presence! - [store-conn sess-id {:keys [app-id room-id data]}] + [store-conn sess-id {:keys [app-id room-id data edits]}] (rs/send-event! store-conn app-id sess-id {:op :refresh-presence :room-id room-id - :data data})) + :data (when (nil? edits) + data) + :edits edits})) (defn- handle-client-broadcast! "Broadcasts a client message to other sessions in the room"