Skip to content

Commit

Permalink
Support/text (#1)
Browse files Browse the repository at this point in the history
* Support text patch

Signed-off-by: Marcos Candeia <[email protected]>

* Allow set to null

Signed-off-by: Marcos Candeia <[email protected]>

---------

Signed-off-by: Marcos Candeia <[email protected]>
  • Loading branch information
mcandeia authored Mar 8, 2024
1 parent 139f0d0 commit 7cc65f5
Show file tree
Hide file tree
Showing 5 changed files with 314 additions and 46 deletions.
32 changes: 32 additions & 0 deletions src/bit.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
export class BinaryIndexedTree {
bit: number[];

constructor(size: number) {
this.bit = new Array(size + 1).fill(0);
}

// Updates the value at index i by adding delta to it
update(idx: number, delta: number): void {
idx++; // Convert 0-based indexing to 1-based indexing
while (idx < this.bit.length) {
this.bit[idx] += delta;
idx += idx & -idx; // Move to next index
}
}

// Returns the sum of values in the range [0, i]
query(r: number): number {
r++; // Convert 0-based indexing to 1-based indexing
let ret = 0;
while (r > 0) {
ret += this.bit[r];
r -= r & -r; // Move to parent index
}
return ret;
}

// Returns the sum of values in the range [left, right]
rangeQuery(left: number, right: number): number {
return this.query(right) - this.query(left - 1);
}
}
5 changes: 3 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { getObjectFor } from "./realtime.ts";
import { createRouter } from "./router.ts";
import { wellKnownJWKSHandler } from "./security/identity.ts";
import { setFromString } from "./security/keys.ts";
import { getObjectFor } from "./realtime.ts";

export interface Env {
REALTIME: DurableObjectNamespace;
EPHEMERAL_REALTIME: DurableObjectNamespace;
WORKER_PUBLIC_KEY: string;
WORKER_PRIVATE_KEY: string;
}
Expand All @@ -30,4 +31,4 @@ export default {
},
};

export { Realtime } from "./realtime.ts";
export { EphemeralRealtime, Realtime } from "./realtime.ts";
220 changes: 176 additions & 44 deletions src/realtime.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,63 @@
import { applyReducer, type Operation } from "fast-json-patch";
import { BinaryIndexedTree } from "./bit.ts";
import { type Env } from "./index.ts";
import { createRouter, Router, Routes } from "./router.ts";

export const getObjectFor = (volume: string, ctx: { env: Env }) =>
ctx.env.REALTIME.get(
ctx.env.REALTIME.idFromName(volume),
export const getObjectFor = (volume: string, ctx: { env: Env }) => {
const object = volume.startsWith("ephemeral:")
? ctx.env.EPHEMERAL_REALTIME
: ctx.env.REALTIME;
return object.get(
object.idFromName(volume),
) as unknown as Realtime;
};

const encoder = new TextEncoder();
const decoder = new TextDecoder();

export type FilePatch = JSONFilePatch;

export interface JSONFilePatch {
export interface BaseFilePatch {
path: string;
}
export type TextFilePatchOperation = InsertAtOperation | DeleteAtOperation;

export interface TextFilePatch extends BaseFilePatch {
operations: TextFilePatchOperation[];
timestamp: number;
}

export interface TextFileSet extends BaseFilePatch {
content: string | null;
}

export interface TextFielPatchOperationBase {
at: number;
}

export interface InsertAtOperation extends TextFielPatchOperationBase {
text: string;
}

export interface DeleteAtOperation extends TextFielPatchOperationBase {
length: number;
}

const isDeleteOperation = (
op: TextFilePatchOperation,
): op is DeleteAtOperation => {
return (op as DeleteAtOperation).length !== undefined;
};

export type FilePatch = JSONFilePatch | TextFilePatch | TextFileSet;

export const isJSONFilePatch = (patch: FilePatch): patch is JSONFilePatch => {
return (patch as JSONFilePatch).patches !== undefined;
};

export const isTextFileSet = (patch: FilePatch): patch is TextFileSet => {
return (patch as TextFileSet).content !== undefined;
};

export interface JSONFilePatch extends BaseFilePatch {
patches: Operation[];
}

Expand Down Expand Up @@ -231,38 +275,19 @@ export interface VolumeListResponse {
}

export class Realtime implements DurableObject {
textState: Map<number, BinaryIndexedTree>;
state: DurableObjectState;
sessions: Array<{ socket: WebSocket }> = [];
fs: MFFS;

router: Router;
timestamp: number;

constructor(state: DurableObjectState) {
constructor(state: DurableObjectState, ephemeral = false) {
this.textState = new Map();
this.state = state;

this.timestamp = Date.now();

const durableFS = createDurableFS(state);
const memFS = createMemFS();

this.fs = tieredFS(memFS, durableFS);

// init memFS with durable content
this.state.blockConcurrencyWhile(async () => {
const paths = await durableFS.readdir("/");

for (const path of paths) {
const content = await durableFS.readFile(path);

if (!content) {
continue;
}

await memFS.writeFile(path, content);
}
});

const routes: Routes = {
"/volumes/:id/files/*": {
GET: async (req, { params }) => {
Expand Down Expand Up @@ -329,27 +354,107 @@ export class Realtime implements DurableObject {
const results: FilePatchResult[] = [];

for (const patch of patches) {
const { path, patches: operations } = patch;
const content =
await this.fs.readFile(path).catch(ignore("ENOENT")) ?? "{}";
if (isJSONFilePatch(patch)) {
const { path, patches: operations } = patch;
const content =
await this.fs.readFile(path).catch(ignore("ENOENT")) ?? "{}";

try {
const newContent = JSON.stringify(
operations.reduce(applyReducer, JSON.parse(content)),
);

results.push({
accepted: true,
path,
content: newContent,
deleted: newContent === "null",
});
} catch (error) {
results.push({ accepted: false, path, content });
}
} else if (isTextFileSet(patch)) {
const { path, content } = patch;
try {
if (!content) {
await this.fs.writeFile(path, "");
} else {
await this.fs.writeFile(path, content);
}
results.push({ accepted: true, path, content: content ?? "" });
} catch (error) {
results.push({ accepted: false, path, content: content ?? "" });
}
} else {
const { path, operations, timestamp } = patch;
const content =
await this.fs.readFile(path).catch(ignore("ENOENT")) ?? "";
if (!this.textState.has(timestamp)) { // durable was restarted
results.push({ accepted: false, path, content });
continue;
}
const rollbacks: Array<() => void> = [];
const bit = this.textState.get(timestamp) ??
new BinaryIndexedTree(2 ** 8);
const [result, success] = operations.reduce(
([txt, success], op) => {
if (!success) {
return [txt, success];
}
if (isDeleteOperation(op)) {
const { at, length } = op;
const offset = bit.rangeQuery(0, at) + at;
if (offset < 0) {
return [txt, false];
}
const before = txt.slice(0, offset);
const after = txt.slice(offset + length);

// Update BIT for deletion operation
bit.update(at, -length); // Subtract length from the index
rollbacks.push(() => {
bit.update(at, length);
});
return [`${before}${after}`, true];
}
const { at, text } = op;
const offset = bit.rangeQuery(0, at) + at;
if (offset < 0) {
return [txt, false];
}

try {
const newContent = JSON.stringify(
operations.reduce(applyReducer, JSON.parse(content)),
const before = txt.slice(0, offset);
const after = txt.slice(offset); // Use offset instead of at

// Update BIT for insertion operation
bit.update(at, text.length); // Add length of text at the index
rollbacks.push(() => {
bit.update(at, -text.length);
});
return [`${before}${text}${after}`, true];
},
[content, true],
);

results.push({
accepted: true,
path,
content: newContent,
deleted: newContent === "null",
});
} catch (error) {
results.push({ accepted: false, path, content });
if (success) {
this.textState.set(timestamp, bit);
results.push({
accepted: true,
path,
content: result,
});
} else {
rollbacks.map((rollback) => rollback());
results.push({
accepted: false,
path,
content,
});
}
}
}

this.timestamp = Date.now();
this.textState.set(this.timestamp, new BinaryIndexedTree(2 ** 8));
const shouldWrite = results.every((r) => r.accepted);

if (shouldWrite) {
Expand All @@ -361,7 +466,6 @@ export class Realtime implements DurableObject {
} else {
await this.fs.writeFile(r.path, r.content!);
}
this.fs;
} catch (error) {
console.error(error);
r.accepted = false;
Expand Down Expand Up @@ -391,6 +495,28 @@ export class Realtime implements DurableObject {
"/volumes/:id": {},
};
this.router = createRouter(routes);
const memFS = createMemFS();

if (ephemeral) {
this.fs = memFS;
return;
}
const durableFS = createDurableFS(state);
this.fs = tieredFS(memFS, durableFS);
// init memFS with durable content
this.state.blockConcurrencyWhile(async () => {
const paths = await durableFS.readdir("/");

for (const path of paths) {
const content = await durableFS.readFile(path);

if (!content) {
continue;
}

await memFS.writeFile(path, content);
}
});
}

broadcast(msg: ServerEvent) {
Expand All @@ -403,3 +529,9 @@ export class Realtime implements DurableObject {
return this.router(request);
}
}

export class EphemeralRealtime extends Realtime {
constructor(state: DurableObjectState) {
super(state, true);
}
}
Loading

0 comments on commit 7cc65f5

Please sign in to comment.