From 7cc65f5397f67e750f20476a27aac292e1fd55fe Mon Sep 17 00:00:00 2001 From: Marcos Candeia Date: Thu, 7 Mar 2024 21:03:20 -0300 Subject: [PATCH] Support/text (#1) * Support text patch Signed-off-by: Marcos Candeia * Allow set to null Signed-off-by: Marcos Candeia --------- Signed-off-by: Marcos Candeia --- src/bit.ts | 32 +++++++ src/index.ts | 5 +- src/realtime.ts | 220 ++++++++++++++++++++++++++++++++++++++---------- test.ts | 95 +++++++++++++++++++++ wrangler.toml | 8 ++ 5 files changed, 314 insertions(+), 46 deletions(-) create mode 100644 src/bit.ts diff --git a/src/bit.ts b/src/bit.ts new file mode 100644 index 0000000..88dff8d --- /dev/null +++ b/src/bit.ts @@ -0,0 +1,32 @@ +export class BinaryIndexedTree { + bit: number[]; + + constructor(size: number) { + this.bit = new Array(size + 1).fill(0); + } + + // Updates the value at index i by adding delta to it + update(idx: number, delta: number): void { + idx++; // Convert 0-based indexing to 1-based indexing + while (idx < this.bit.length) { + this.bit[idx] += delta; + idx += idx & -idx; // Move to next index + } + } + + // Returns the sum of values in the range [0, i] + query(r: number): number { + r++; // Convert 0-based indexing to 1-based indexing + let ret = 0; + while (r > 0) { + ret += this.bit[r]; + r -= r & -r; // Move to parent index + } + return ret; + } + + // Returns the sum of values in the range [left, right] + rangeQuery(left: number, right: number): number { + return this.query(right) - this.query(left - 1); + } +} diff --git a/src/index.ts b/src/index.ts index 18e5cd1..b395dd3 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,10 +1,11 @@ +import { getObjectFor } from "./realtime.ts"; import { createRouter } from "./router.ts"; import { wellKnownJWKSHandler } from "./security/identity.ts"; import { setFromString } from "./security/keys.ts"; -import { getObjectFor } from "./realtime.ts"; export interface Env { REALTIME: DurableObjectNamespace; + EPHEMERAL_REALTIME: DurableObjectNamespace; WORKER_PUBLIC_KEY: string; WORKER_PRIVATE_KEY: string; } @@ -30,4 +31,4 @@ export default { }, }; -export { Realtime } from "./realtime.ts"; +export { EphemeralRealtime, Realtime } from "./realtime.ts"; diff --git a/src/realtime.ts b/src/realtime.ts index ef57832..acef7ea 100644 --- a/src/realtime.ts +++ b/src/realtime.ts @@ -1,19 +1,63 @@ import { applyReducer, type Operation } from "fast-json-patch"; +import { BinaryIndexedTree } from "./bit.ts"; import { type Env } from "./index.ts"; import { createRouter, Router, Routes } from "./router.ts"; -export const getObjectFor = (volume: string, ctx: { env: Env }) => - ctx.env.REALTIME.get( - ctx.env.REALTIME.idFromName(volume), +export const getObjectFor = (volume: string, ctx: { env: Env }) => { + const object = volume.startsWith("ephemeral:") + ? ctx.env.EPHEMERAL_REALTIME + : ctx.env.REALTIME; + return object.get( + object.idFromName(volume), ) as unknown as Realtime; +}; const encoder = new TextEncoder(); const decoder = new TextDecoder(); -export type FilePatch = JSONFilePatch; - -export interface JSONFilePatch { +export interface BaseFilePatch { path: string; +} +export type TextFilePatchOperation = InsertAtOperation | DeleteAtOperation; + +export interface TextFilePatch extends BaseFilePatch { + operations: TextFilePatchOperation[]; + timestamp: number; +} + +export interface TextFileSet extends BaseFilePatch { + content: string | null; +} + +export interface TextFielPatchOperationBase { + at: number; +} + +export interface InsertAtOperation extends TextFielPatchOperationBase { + text: string; +} + +export interface DeleteAtOperation extends TextFielPatchOperationBase { + length: number; +} + +const isDeleteOperation = ( + op: TextFilePatchOperation, +): op is DeleteAtOperation => { + return (op as DeleteAtOperation).length !== undefined; +}; + +export type FilePatch = JSONFilePatch | TextFilePatch | TextFileSet; + +export const isJSONFilePatch = (patch: FilePatch): patch is JSONFilePatch => { + return (patch as JSONFilePatch).patches !== undefined; +}; + +export const isTextFileSet = (patch: FilePatch): patch is TextFileSet => { + return (patch as TextFileSet).content !== undefined; +}; + +export interface JSONFilePatch extends BaseFilePatch { patches: Operation[]; } @@ -231,6 +275,7 @@ export interface VolumeListResponse { } export class Realtime implements DurableObject { + textState: Map; state: DurableObjectState; sessions: Array<{ socket: WebSocket }> = []; fs: MFFS; @@ -238,31 +283,11 @@ export class Realtime implements DurableObject { router: Router; timestamp: number; - constructor(state: DurableObjectState) { + constructor(state: DurableObjectState, ephemeral = false) { + this.textState = new Map(); this.state = state; - this.timestamp = Date.now(); - const durableFS = createDurableFS(state); - const memFS = createMemFS(); - - this.fs = tieredFS(memFS, durableFS); - - // init memFS with durable content - this.state.blockConcurrencyWhile(async () => { - const paths = await durableFS.readdir("/"); - - for (const path of paths) { - const content = await durableFS.readFile(path); - - if (!content) { - continue; - } - - await memFS.writeFile(path, content); - } - }); - const routes: Routes = { "/volumes/:id/files/*": { GET: async (req, { params }) => { @@ -329,27 +354,107 @@ export class Realtime implements DurableObject { const results: FilePatchResult[] = []; for (const patch of patches) { - const { path, patches: operations } = patch; - const content = - await this.fs.readFile(path).catch(ignore("ENOENT")) ?? "{}"; + if (isJSONFilePatch(patch)) { + const { path, patches: operations } = patch; + const content = + await this.fs.readFile(path).catch(ignore("ENOENT")) ?? "{}"; + + try { + const newContent = JSON.stringify( + operations.reduce(applyReducer, JSON.parse(content)), + ); + + results.push({ + accepted: true, + path, + content: newContent, + deleted: newContent === "null", + }); + } catch (error) { + results.push({ accepted: false, path, content }); + } + } else if (isTextFileSet(patch)) { + const { path, content } = patch; + try { + if (!content) { + await this.fs.writeFile(path, ""); + } else { + await this.fs.writeFile(path, content); + } + results.push({ accepted: true, path, content: content ?? "" }); + } catch (error) { + results.push({ accepted: false, path, content: content ?? "" }); + } + } else { + const { path, operations, timestamp } = patch; + const content = + await this.fs.readFile(path).catch(ignore("ENOENT")) ?? ""; + if (!this.textState.has(timestamp)) { // durable was restarted + results.push({ accepted: false, path, content }); + continue; + } + const rollbacks: Array<() => void> = []; + const bit = this.textState.get(timestamp) ?? + new BinaryIndexedTree(2 ** 8); + const [result, success] = operations.reduce( + ([txt, success], op) => { + if (!success) { + return [txt, success]; + } + if (isDeleteOperation(op)) { + const { at, length } = op; + const offset = bit.rangeQuery(0, at) + at; + if (offset < 0) { + return [txt, false]; + } + const before = txt.slice(0, offset); + const after = txt.slice(offset + length); + + // Update BIT for deletion operation + bit.update(at, -length); // Subtract length from the index + rollbacks.push(() => { + bit.update(at, length); + }); + return [`${before}${after}`, true]; + } + const { at, text } = op; + const offset = bit.rangeQuery(0, at) + at; + if (offset < 0) { + return [txt, false]; + } - try { - const newContent = JSON.stringify( - operations.reduce(applyReducer, JSON.parse(content)), + const before = txt.slice(0, offset); + const after = txt.slice(offset); // Use offset instead of at + + // Update BIT for insertion operation + bit.update(at, text.length); // Add length of text at the index + rollbacks.push(() => { + bit.update(at, -text.length); + }); + return [`${before}${text}${after}`, true]; + }, + [content, true], ); - - results.push({ - accepted: true, - path, - content: newContent, - deleted: newContent === "null", - }); - } catch (error) { - results.push({ accepted: false, path, content }); + if (success) { + this.textState.set(timestamp, bit); + results.push({ + accepted: true, + path, + content: result, + }); + } else { + rollbacks.map((rollback) => rollback()); + results.push({ + accepted: false, + path, + content, + }); + } } } this.timestamp = Date.now(); + this.textState.set(this.timestamp, new BinaryIndexedTree(2 ** 8)); const shouldWrite = results.every((r) => r.accepted); if (shouldWrite) { @@ -361,7 +466,6 @@ export class Realtime implements DurableObject { } else { await this.fs.writeFile(r.path, r.content!); } - this.fs; } catch (error) { console.error(error); r.accepted = false; @@ -391,6 +495,28 @@ export class Realtime implements DurableObject { "/volumes/:id": {}, }; this.router = createRouter(routes); + const memFS = createMemFS(); + + if (ephemeral) { + this.fs = memFS; + return; + } + const durableFS = createDurableFS(state); + this.fs = tieredFS(memFS, durableFS); + // init memFS with durable content + this.state.blockConcurrencyWhile(async () => { + const paths = await durableFS.readdir("/"); + + for (const path of paths) { + const content = await durableFS.readFile(path); + + if (!content) { + continue; + } + + await memFS.writeFile(path, content); + } + }); } broadcast(msg: ServerEvent) { @@ -403,3 +529,9 @@ export class Realtime implements DurableObject { return this.router(request); } } + +export class EphemeralRealtime extends Realtime { + constructor(state: DurableObjectState) { + super(state, true); + } +} diff --git a/test.ts b/test.ts index b9a1282..73d61f6 100644 --- a/test.ts +++ b/test.ts @@ -105,6 +105,10 @@ const tests = { path: "/pdp.json", patches: jp.compare({}, { "title": "pdp" }), }, + { + path: "/sections/ProductShelf.tsx", + content: `BC`, + }, ], }); @@ -121,6 +125,10 @@ const tests = { vlr.fs["/pdp.json"]?.content, JSON.stringify({ "title": "pdp" }), ); + assertEquals( + vlr.fs["/sections/ProductShelf.tsx"]?.content, + "BC", + ); }, "Should not return value": async () => { const vlr = await realtime.list({ path: "/" }); @@ -133,6 +141,10 @@ const tests = { vlr.fs["/pdp.json"]?.content, null, ); + assertEquals( + vlr.fs["/sections/ProductShelf.tsx"]?.content, + null, + ); }, "should return specific listing value": async () => { const vlr = await realtime.list({ path: "/home.json" }); @@ -140,6 +152,89 @@ const tests = { assertAll(vlr.fs["/home.json"]); assertEquals(vlr.fs["/pdp.json"], undefined); }, + "should accept text patch": async () => { + const shelf = "/sections/ProductShelf.tsx"; + const vlr = await realtime.list({ path: shelf, content: true }); + assertEquals(vlr.fs[shelf]?.content, "BC"); + const { results } = await realtime.patch({ + patches: [ + { + path: shelf, + operations: [{ + text: "A", + at: 0, + }], + timestamp: vlr.timestamp, + }, + ], + }); + const snapshot = JSON.stringify([{ + accepted: true, + path: shelf, + }]); + assertEquals(JSON.stringify(results), snapshot); + + const vlrUpdated = await realtime.list({ path: shelf, content: true }); + assertEquals(vlrUpdated.fs[shelf]?.content, "ABC"); + }, + "should accept multiple text patch": async () => { + const shelf = "/sections/ProductShelf.tsx"; + const vlr = await realtime.list({ path: shelf, content: true }); + assertEquals(vlr.fs[shelf]?.content, "ABC"); + const { results } = await realtime.patch({ + patches: [ + { + path: shelf, + operations: [{ + text: "!", + at: 0, + }, { + text: "Z", + at: 0, + }], + timestamp: vlr.timestamp, + }, + ], + }); + const snapshot = JSON.stringify([{ + accepted: true, + path: shelf, + }]); + assertEquals(JSON.stringify(results), snapshot); + + const vlrUpdated = await realtime.list({ path: shelf, content: true }); + assertEquals(vlrUpdated.fs[shelf]?.content, "!ZABC"); + + const { results: resultsWithOldTimestamp } = await realtime.patch({ + patches: [ + { + path: shelf, + operations: [{ + text: "!", + at: 3, + }, { + length: 1, + at: 2, + }], + timestamp: vlr.timestamp, // from an old timestamp insert ! at the end, AB! as result + }, + ], + }); + const snapShotWithOldTimestamp = JSON.stringify([{ + accepted: true, + path: shelf, + }]); + assertEquals( + JSON.stringify(resultsWithOldTimestamp), + snapShotWithOldTimestamp, + ); + + const vlrWithOldTimestamp = await realtime.list({ + path: shelf, + content: true, + }); + assertEquals(vlrWithOldTimestamp.fs[shelf]?.content, "!ZAB!"); + }, "should not accept patch because of conflicts": async () => { const { results } = await realtime.patch({ patches: [ diff --git a/wrangler.toml b/wrangler.toml index d5cee4d..d4aeca8 100644 --- a/wrangler.toml +++ b/wrangler.toml @@ -11,6 +11,10 @@ local_protocol = "http" name = "REALTIME" class_name = "Realtime" +[[durable_objects.bindings]] +name = "EPHEMERAL_REALTIME" +class_name = "EphemeralRealtime" + [[rules]] type = "ESModule" globs = ["**/*.ts"] @@ -18,3 +22,7 @@ globs = ["**/*.ts"] [[migrations]] tag = "v1" # Should be unique for each entry new_classes = ["Realtime"] + +[[migrations]] +tag = "v2" +new_classes = ["EphemeralRealtime"] \ No newline at end of file