Skip to content

Commit

Permalink
Initial: Send diffs for presence
Browse files Browse the repository at this point in the history
  • Loading branch information
tonsky committed Dec 24, 2024
1 parent 835a282 commit 35632a7
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 43 deletions.
29 changes: 24 additions & 5 deletions client/packages/core/src/Reactor.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { buildPresenceSlice, hasPresenceResponseChanged } from "./presence";
import { Deferred } from "./utils/Deferred";
import { PersistedObject } from "./utils/PersistedObject";
import { extractTriples } from "./model/instaqlResult";
import { areObjectsDeepEqual } from "./utils/object";
import { areObjectsDeepEqual, assocIn } from "./utils/object";
import { createLinkIndex } from "./utils/linkIndex";
import version from "./version";

Expand Down Expand Up @@ -460,7 +460,11 @@ export default class Reactor {
break;
case "refresh-presence":
const roomId = msg["room-id"];
this._setPresencePeers(roomId, msg.data);
if (msg["edits"]) {
this._updatePresencePeers(roomId, msg["edits"]);
} else {
this._setPresencePeers(roomId, msg["data"]);
}
this._notifyPresenceSubs(roomId);
break;
case "server-broadcast":
Expand Down Expand Up @@ -1676,6 +1680,23 @@ export default class Reactor {
handler.cb(slice);
}

_updatePresencePeers(roomId, edits) {
const peers = this._presence[roomId]?.result?.peers || {};
let sessions = Object.fromEntries(
Object.entries(peers).map(([k, v]) => [k, {data: v}]),
);
sessions[this._sessionId] = {data: this._presence[roomId]?.result?.user};
for (let [path, op, value] of edits) {
console.log('edit', path, op, value);
if (op === '+' || op === 'r') {
sessions = assocIn(sessions, path, value);
}
// TODO op === '-'
}

this._setPresencePeers(roomId, sessions);
}

_setPresencePeers(roomId, data) {
const sessions = { ...data };
// no need to keep track of `user`
Expand All @@ -1684,9 +1705,7 @@ export default class Reactor {
Object.entries(sessions).map(([k, v]) => [k, v.data]),
);

this._presence[roomId] = this._presence[roomId] || {};
this._presence[roomId].result = this._presence[roomId].result || {};
this._presence[roomId].result.peers = peers;
this._presence = assocIn(this._presence, [roomId, 'result', 'peers'], peers);
}

// --------
Expand Down
18 changes: 18 additions & 0 deletions client/packages/core/src/utils/object.js
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,21 @@ export function immutableDeepReplace(target, replaceValue, replacementValue) {
export function isObject(val) {
return typeof val === "object" && val !== null && !Array.isArray(val);
}

export function assocIn(obj, path, value) {
if (path.length === 0) {
return value;
}

let current = obj || {};
for (let i = 0; i < path.length - 1; i++) {
const key = path[i];
if (!(key in current) || typeof current[key] !== 'object') {
current[key] = typeof path[i + 1] === 'number' ? [] : {};
}
current = current[key];
}

current[path[path.length - 1]] = value;
return obj;
}
1 change: 1 addition & 0 deletions server/deps.edn
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
org.slf4j/slf4j-api {:mvn/version "2.0.13"}
cider/cider-nrepl {:mvn/version "0.45.0"}
amazonica/amazonica {:mvn/version "0.3.167"}
juji/editscript {:mvn/version "0.6.4"}

org.bouncycastle/bcprov-jdk15on {:mvn/version "1.70"}
incanter/incanter {:mvn/version "1.9.3"
Expand Down
55 changes: 19 additions & 36 deletions server/src/instant/reactive/ephemeral.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
[clojure.core.async :as a]
[clojure.set :as set]
[datascript.core :refer [squuid-time-millis]]
[editscript.core :as editscript]
[instant.config :as config]
[instant.flags :as flags]
[instant.gauges :as gauges]
Expand Down Expand Up @@ -31,7 +32,6 @@
;; Setup

(declare room-refresh-ch)
(defonce refresh-map-ch (a/chan 1024))
;; Channel we use to keep the hazelcast maps in sync for
;; apps that aren't using hazelcast. This can go away when
;; we fully migrate to hazelcast
Expand All @@ -49,7 +49,7 @@

(defn init-hz [store-conn]
(-> (java.util.logging.Logger/getLogger "com.hazelcast.system.logo")
(.setLevel java.util.logging.Level/WARNING))
(.setLevel java.util.logging.Level/WARNING))
(System/setProperty "hazelcast.shutdownhook.enabled" "false")
(let [config (Config.)
network-config (.getNetworkConfig config)
Expand Down Expand Up @@ -176,50 +176,28 @@
;; ---------
;; Hazelcast

(defn handle-refresh-event [store-conn room-key room-id]
(let [room-data (.get (get-hz-rooms-map) room-key)
session-ids (filter (fn [sess-id]
(rs/get-session @store-conn sess-id))
(keys room-data))]
(doseq [sess-id session-ids
:let [q (:receive-q (rs/get-socket @store-conn sess-id))]
:when q]
(receive-queue/enqueue->receive-q q
{:op :refresh-presence
:app-id (:app-id room-key)
:room-id room-id
:data room-data
:session-id sess-id}))))

(defn straight-jacket-refresh-event!
[store-conn {:keys [room-key room-id on-sent]}]
(try
(handle-refresh-event store-conn room-key room-id)
(catch Throwable t
(tracer/record-exception-span! t {:name "rooms-refresh-map/straight-jacket"}))
(finally (on-sent))))

(defn start-refresh-map-worker [store-conn ch]
(loop [event (a/<!! ch)]
(if (nil? event)
(tracer/record-info! {:name "room-refresh-map/closed"})
(do
(straight-jacket-refresh-event! store-conn event)
(recur (a/<!! ch))))))
(def last-sent-state
(atom {}))

(defn handle-event [store-conn ^DataAwareEntryEvent event]
(let [{:keys [app-id room-id] :as room-key} (.getKey event)]
(when (flags/use-hazelcast? app-id)
(when (seq (get-in @room-maps [:rooms room-key :session-ids]))
(let [room-data (.get (get-hz-rooms-map) room-key)]
(doseq [sess-id (keys room-data)
(let [room-data (.get (get-hz-rooms-map) room-key)
prev-room-data (get @last-sent-state room-key)
edits (when prev-room-data
(editscript/get-edits
(editscript/diff prev-room-data room-data {:algo :a-star :str-diff :none})))]
(swap! last-sent-state assoc room-key room-data)
(doseq [[sess-id _] room-data
:let [q (:receive-q (rs/get-socket @store-conn sess-id))]
:when q]
(receive-queue/enqueue->receive-q q
{:op :refresh-presence
:app-id app-id
:room-id room-id
:data room-data
:edits edits
:session-id sess-id})))))))

(defn handle-broadcast-message
Expand Down Expand Up @@ -408,7 +386,7 @@
(defn refresh-rooms! [store-conn old-v new-v]
(let [old-apps-rooms (get-in old-v [:rooms])
new-apps-rooms (get-in new-v [:rooms])
changed-rooms (get-changed-rooms old-apps-rooms new-apps-rooms)]
changed-rooms (get-changed-rooms old-apps-rooms new-apps-rooms)]
(when (seq changed-rooms)
(tracer/with-span! {:name "refresh-rooms"
:attributes {:room-ids (pr-str (map first changed-rooms))}}
Expand Down Expand Up @@ -470,7 +448,6 @@

(defn stop []
(a/close! room-refresh-ch)
(a/close! refresh-map-ch)
(when-let [q ^LinkedBlockingQueue @hz-ops-q]
(.put q close-sentinel))
(reset! hz-ops-q nil)
Expand All @@ -481,3 +458,9 @@
(defn restart []
(stop)
(start))

(defn before-ns-unload []
(stop))

(defn after-ns-reload []
(start))
6 changes: 4 additions & 2 deletions server/src/instant/reactive/session.clj
Original file line number Diff line number Diff line change
Expand Up @@ -316,10 +316,12 @@
:client-event-id client-event-id})))

(defn- handle-refresh-presence!
[store-conn sess-id {:keys [app-id room-id data]}]
[store-conn sess-id {:keys [app-id room-id data edits]}]
(rs/send-event! store-conn app-id sess-id {:op :refresh-presence
:room-id room-id
:data data}))
:data (when (nil? edits)
data)
:edits edits}))

(defn- handle-client-broadcast!
"Broadcasts a client message to other sessions in the room"
Expand Down

0 comments on commit 35632a7

Please sign in to comment.