Skip to content

Commit

Permalink
Calculate diffs after batching, support '-' in edits (dissocIn), clea…
Browse files Browse the repository at this point in the history
…n up session data after disconnect
  • Loading branch information
tonsky committed Dec 26, 2024
1 parent 35632a7 commit e85bd73
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 22 deletions.
7 changes: 4 additions & 3 deletions client/packages/core/src/Reactor.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { buildPresenceSlice, hasPresenceResponseChanged } from "./presence";
import { Deferred } from "./utils/Deferred";
import { PersistedObject } from "./utils/PersistedObject";
import { extractTriples } from "./model/instaqlResult";
import { areObjectsDeepEqual, assocIn } from "./utils/object";
import { areObjectsDeepEqual, assocIn, dissocIn } from "./utils/object";
import { createLinkIndex } from "./utils/linkIndex";
import version from "./version";

Expand Down Expand Up @@ -1687,11 +1687,12 @@ export default class Reactor {
);
sessions[this._sessionId] = {data: this._presence[roomId]?.result?.user};
for (let [path, op, value] of edits) {
console.log('edit', path, op, value);
if (op === '+' || op === 'r') {
sessions = assocIn(sessions, path, value);
}
// TODO op === '-'
if (op === '-') {
sessions = dissocIn(sessions, path);
}
}

this._setPresencePeers(roomId, sessions);
Expand Down
30 changes: 30 additions & 0 deletions client/packages/core/src/utils/object.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,33 @@ export function assocIn(obj, path, value) {
current[path[path.length - 1]] = value;
return obj;
}

export function dissocIn(obj, path) {
if (path.length === 0) {
return undefined;
}

const [key, ...restPath] = path;

if (!(key in obj)) {
return obj;
}

if (restPath.length === 0) {
delete obj[key];
return isEmpty(obj) ? undefined : obj;
}

const child = dissocIn(obj[key], restPath);

if (child === undefined) {
delete obj[key];
return isEmpty(obj) ? undefined : obj;
}

return obj;
}

function isEmpty(obj) {
return obj && Object.keys(obj).length === 0;
}
12 changes: 1 addition & 11 deletions server/src/instant/reactive/ephemeral.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
[clojure.core.async :as a]
[clojure.set :as set]
[datascript.core :refer [squuid-time-millis]]
[editscript.core :as editscript]
[instant.config :as config]
[instant.flags :as flags]
[instant.gauges :as gauges]
Expand Down Expand Up @@ -176,19 +175,11 @@
;; ---------
;; Hazelcast

(def last-sent-state
(atom {}))

(defn handle-event [store-conn ^DataAwareEntryEvent event]
(let [{:keys [app-id room-id] :as room-key} (.getKey event)]
(when (flags/use-hazelcast? app-id)
(when (seq (get-in @room-maps [:rooms room-key :session-ids]))
(let [room-data (.get (get-hz-rooms-map) room-key)
prev-room-data (get @last-sent-state room-key)
edits (when prev-room-data
(editscript/get-edits
(editscript/diff prev-room-data room-data {:algo :a-star :str-diff :none})))]
(swap! last-sent-state assoc room-key room-data)
(let [room-data (.get (get-hz-rooms-map) room-key)]
(doseq [[sess-id _] room-data
:let [q (:receive-q (rs/get-socket @store-conn sess-id))]
:when q]
Expand All @@ -197,7 +188,6 @@
:app-id app-id
:room-id room-id
:data room-data
:edits edits
:session-id sess-id})))))))

(defn handle-broadcast-message
Expand Down
26 changes: 18 additions & 8 deletions server/src/instant/reactive/session.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
commands."
(:require
[clojure.main :refer [root-cause]]
[editscript.core :as editscript]
[instant.db.datalog :as d]
[instant.db.model.attr :as attr-model]
[instant.db.permissioned-transaction :as permissioned-tx]
Expand Down Expand Up @@ -315,13 +316,21 @@
:room-id room-id
:client-event-id client-event-id})))

(defn- handle-refresh-presence!
[store-conn sess-id {:keys [app-id room-id data edits]}]
(rs/send-event! store-conn app-id sess-id {:op :refresh-presence
:room-id room-id
:data (when (nil? edits)
data)
:edits edits}))
; {sess-id -> {room-id -> data}}
(defonce last-sent-presence-data
(atom {}))

(defn- handle-refresh-presence! [store-conn sess-id {:keys [app-id room-id data edits]}]

Check warning on line 323 in server/src/instant/reactive/session.clj

View workflow job for this annotation

GitHub Actions / lint

unused binding edits

Check warning on line 323 in server/src/instant/reactive/session.clj

View workflow job for this annotation

GitHub Actions / lint

unused binding edits
(let [prev-data (get-in @last-sent-presence-data [sess-id room-id])
edits (when prev-data
(editscript/get-edits
(editscript/diff prev-data data {:algo :a-star :str-diff :none})))]
(swap! last-sent-presence-data assoc-in [sess-id room-id] data)
(rs/send-event! store-conn app-id sess-id {:op :refresh-presence
:room-id room-id
:data (when (nil? edits)
data)
:edits edits})))

(defn- handle-client-broadcast!
"Broadcasts a client message to other sessions in the room"
Expand Down Expand Up @@ -659,6 +668,7 @@
:escaping? false})))

(defn on-close [store-conn eph-store-atom {:keys [id pending-handlers]}]
(swap! last-sent-presence-data dissoc id)
(tracer/with-span! {:name "socket/on-close"
:attributes {:session-id id}}
(doseq [{:keys [op
Expand Down Expand Up @@ -713,7 +723,7 @@
;; System

(defn group-fn [{:keys [item] :as _input}]
(let [{:keys [session-id op room-id]} item]
(let [{:keys [session-id op app-id room-id]} item]

Check warning on line 726 in server/src/instant/reactive/session.clj

View workflow job for this annotation

GitHub Actions / lint

unused binding app-id

Check warning on line 726 in server/src/instant/reactive/session.clj

View workflow job for this annotation

GitHub Actions / lint

unused binding app-id
(case op
:transact
[:transact session-id]
Expand Down

0 comments on commit e85bd73

Please sign in to comment.