diff --git a/packages/server/src/ServerLibrary.ts b/packages/server/src/ServerLibrary.ts index f56193df..0b5a6b90 100644 --- a/packages/server/src/ServerLibrary.ts +++ b/packages/server/src/ServerLibrary.ts @@ -25,6 +25,7 @@ import { LibraryInfo, StoredDocumentBaseline, StoredOperation, + StoredReplicaInfo, } from './types.js'; export type ServerLibraryEvents = { @@ -170,13 +171,27 @@ export class ServerLibrary extends EventSubscriber { message.operations, ); - // TODO: can we defer this and rebroadcast in parallel? // insert patches into history - await this.storage.operations.insertAll( + + // first, record the replica's serverOrder before insertion. + const replicaInfo = await this.storage.replicas.getOrCreate( + info.libraryId, + message.replicaId, + { + userId: info.userId, + type: info.type, + }, + ); + const newServerOrder = await this.storage.operations.insertAll( info.libraryId, message.replicaId, message.operations, ); + this.preemptiveUpdateServerOrder( + replicaInfo.replicaInfo, + newServerOrder, + message.operations.length, + ); this.enqueueRebase(info.libraryId); @@ -195,6 +210,35 @@ export class ServerLibrary extends EventSubscriber { this.emit(`changes`, info, message.operations, []); }; + /** + * If a replica inserts operations when it's already + * up to date, we can preemptively update its ackedServerOrder + * to avoid a round trip. + */ + private preemptiveUpdateServerOrder = async ( + replicaInfo: StoredReplicaInfo, + newServerOrder: number, + insertedOperationsCount: number, + ) => { + if ( + newServerOrder - replicaInfo.ackedServerOrder === + insertedOperationsCount + ) { + await this.storage.replicas.updateAckedServerOrder( + replicaInfo.libraryId, + replicaInfo.id, + newServerOrder, + ); + this.log( + 'debug', + 'Preemptively updated ackedServerOrder for', + replicaInfo.id, + 'to', + newServerOrder, + ); + } + }; + private rebroadcastOperations = async ( libraryId: string, clientKey: string, @@ -268,6 +312,10 @@ export class ServerLibrary extends EventSubscriber { await this.storage.operations.getLatestServerOrder(info.libraryId); if (clientReplicaInfo.ackedServerOrder >= latestServerOrder) { overrideTruant = true; + this.log( + 'debug', + 'Overriding truant reset; truant replica has up-to-date server order', + ); } } @@ -303,6 +351,7 @@ export class ServerLibrary extends EventSubscriber { clientReplicaInfo.ackedServerOrder === 0 && ops.length === 0 && baselines.length === 0; + if (isEmptyLibrary) { this.log('info', 'Received sync from new library', replicaId); @@ -316,10 +365,12 @@ export class ServerLibrary extends EventSubscriber { // don't reset replica if library is empty... let overwriteReplicaData = replicaShouldReset && !isEmptyLibrary; + // EARLY RETURN -- // if the local library is empty and the replica is new to us, // but the replica is providing a "since" timestamp, this // suggests the local data is incomplete or gone. we request - // this replica should respond with a full history. + // this replica should respond with a full history. We will retry + // sync after replica has updated us with full history. if (isEmptyLibrary && status === 'new' && message.since !== null) { this.log( 'info', @@ -371,11 +422,16 @@ export class ServerLibrary extends EventSubscriber { message.operations.length, 'operations', ); - await this.storage.operations.insertAll( + const newServerOrder = await this.storage.operations.insertAll( info.libraryId, replicaId, message.operations, ); + this.preemptiveUpdateServerOrder( + clientReplicaInfo, + newServerOrder, + message.operations.length, + ); await this.storage.replicas.updateAcknowledgedLogicalTime( info.libraryId, replicaId, @@ -403,7 +459,13 @@ export class ServerLibrary extends EventSubscriber { } // create the nonce by encoding the server order of the last operation - const ackThisNonce = this.createAckNonce(ops); + const serverOrderToAck = ops.length + ? ops[ops.length - 1].serverOrder + : undefined; + const ackThisNonce = + serverOrderToAck !== undefined + ? this.createAckNonce(serverOrderToAck) + : undefined; // respond to client @@ -435,12 +497,8 @@ export class ServerLibrary extends EventSubscriber { }); }; - private createAckNonce = (ops: StoredOperation[]): string | undefined => { - return ops.length - ? Buffer.from(JSON.stringify(ops[ops.length - 1].serverOrder)).toString( - 'base64', - ) - : undefined; + private createAckNonce = (serverOrder: number): string | undefined => { + return Buffer.from(JSON.stringify(serverOrder)).toString('base64'); }; private handleAck = async ( diff --git a/packages/server/src/storage/Storage.ts b/packages/server/src/storage/Storage.ts index 9d8d8efa..111bdcbf 100644 --- a/packages/server/src/storage/Storage.ts +++ b/packages/server/src/storage/Storage.ts @@ -69,7 +69,7 @@ export interface OperationStorage { libraryId: string, replicaId: string, operations: Operation[], - ): Promise; + ): Promise; deleteAll(libraryId: string): Promise; delete(libraryId: string, operations: Operation[]): Promise; } diff --git a/packages/server/src/storage/sql/SqlOperations.ts b/packages/server/src/storage/sql/SqlOperations.ts index 4f79f4d5..1dd3bcd2 100644 --- a/packages/server/src/storage/sql/SqlOperations.ts +++ b/packages/server/src/storage/sql/SqlOperations.ts @@ -93,10 +93,10 @@ export class SqlOperations implements OperationStorage { libraryId: string, replicaId: string, operations: Operation[], - ): Promise => { + ): Promise => { // inserts all operations and updates server order // FIXME: this whole thing is kinda sus - await this.db.transaction().execute(async (tx): Promise => { + return await this.db.transaction().execute(async (tx): Promise => { let orderResult = await tx .selectFrom('OperationHistory') .select('serverOrder') @@ -106,7 +106,8 @@ export class SqlOperations implements OperationStorage { .executeTakeFirst(); let currentServerOrder = orderResult?.serverOrder ?? 0; for (const item of operations) { - await tx + // utilizing returned serverOrder accommodates for conflicts + const result = await tx .insertInto('OperationHistory') .values({ libraryId, @@ -114,7 +115,7 @@ export class SqlOperations implements OperationStorage { data: JSON.stringify(item.data), timestamp: item.timestamp, replicaId, - serverOrder: ++currentServerOrder, + serverOrder: currentServerOrder + 1, authz: item.authz, }) .onConflict((cb) => @@ -122,8 +123,16 @@ export class SqlOperations implements OperationStorage { .columns(['libraryId', 'replicaId', 'oid', 'timestamp']) .doNothing(), ) - .execute(); + .returning('serverOrder') + .executeTakeFirst(); + if (result) { + currentServerOrder = result.serverOrder; + } else { + // on conflict, nothing is returned. + // this would mean an operation was synced twice + } } + return currentServerOrder; }); }; diff --git a/packages/server/src/storage/sqlShard/SqlOperations.ts b/packages/server/src/storage/sqlShard/SqlOperations.ts index 2bcfea3b..8ff5b16a 100644 --- a/packages/server/src/storage/sqlShard/SqlOperations.ts +++ b/packages/server/src/storage/sqlShard/SqlOperations.ts @@ -91,11 +91,11 @@ export class SqlOperations implements OperationStorage { libraryId: string, replicaId: string, operations: Operation[], - ): Promise => { + ): Promise => { const db = await this.dbs.get(libraryId); // inserts all operations and updates server order // FIXME: this whole thing is kinda sus - await db.transaction().execute(async (tx): Promise => { + return await db.transaction().execute(async (tx): Promise => { let orderResult = await tx .selectFrom('OperationHistory') .select('serverOrder') @@ -104,21 +104,31 @@ export class SqlOperations implements OperationStorage { .executeTakeFirst(); let currentServerOrder = orderResult?.serverOrder ?? 0; for (const item of operations) { - await tx + // utilizing returned serverOrder accommodates for conflicts + const result = await tx .insertInto('OperationHistory') .values({ oid: item.oid, data: JSON.stringify(item.data), timestamp: item.timestamp, replicaId, - serverOrder: ++currentServerOrder, + serverOrder: currentServerOrder + 1, authz: item.authz, }) .onConflict((cb) => cb.columns(['replicaId', 'oid', 'timestamp']).doNothing(), ) - .execute(); + .returning('serverOrder') + .executeTakeFirst(); + if (result) { + currentServerOrder = result.serverOrder; + } else { + // on conflict, nothing is returned. + // this would mean an operation was synced twice. + } } + + return currentServerOrder; }); }; diff --git a/test/tests/push.test.ts b/test/tests/push.test.ts index 66f9577c..df809675 100644 --- a/test/tests/push.test.ts +++ b/test/tests/push.test.ts @@ -3,7 +3,6 @@ import { createTestContext } from '../lib/createTestContext.js'; import { ReplicaType } from '@verdant-web/server'; import { waitForCondition, - waitForOnline, waitForQueryResult, waitForSync, } from '../lib/waits.js'; @@ -11,8 +10,8 @@ import { assert } from '@a-type/utils'; const context = createTestContext({ // serverLog: true, - // keepDb: true, // testLog: true, + // keepDb: true, }); it("doesn't receive back its own ops after pushing them", async () => { @@ -80,16 +79,9 @@ it("doesn't receive back its own ops after pushing them", async () => { assert(!!log); expect(log[0].includes('\\"operations\\": []')).toBe(true); - // expect(log[0].includes('"baselines": []')).toBe(true); logWatcher.mockClear(); } - // const log = logWatcher.mock.calls.find((args) => - // args[0].includes('sync-resp'), - // ); - // assert(!!log); - // should not receive info about Apples, which we created - // expect(!log[0].includes('Apples')).toBe(true); logWatcher.mockClear(); await client.items.put({ @@ -112,7 +104,7 @@ it("doesn't receive back its own ops after pushing them", async () => { client.sync.start(); - await waitForOnline(client); + await waitForSync(client); context.log('Client 1 online again'); context.log('Begin wait for no operations 2'); @@ -120,7 +112,7 @@ it("doesn't receive back its own ops after pushing them", async () => { context.log('End wait for no operations 2'); clientB.sync.start(); - await waitForOnline(clientB); + await waitForSync(clientB); // this seems to time out sometimes? await waitForQueryResult(client.items.get(pears.get('id'))); diff --git a/test/tests/reset.test.ts b/test/tests/reset.test.ts index d18d1722..381c9daf 100644 --- a/test/tests/reset.test.ts +++ b/test/tests/reset.test.ts @@ -15,12 +15,14 @@ import { createMigration } from '@verdant-web/common'; const ctx = createTestContext({ // testLog: true, + // serverLog: true, }); async function connectAndSeedData(library = 'reset-1') { const clientA = await ctx.createTestClient({ library, user: 'User A', + // logId: 'A', }); const clientB = await ctx.createTestClient({ library, @@ -119,7 +121,7 @@ it('can re-initialize from replica after resetting server-side while replicas ar const pearId = b_pear.get('id'); clientA.sync.start(); - await waitForOnline(clientA); + await waitForSync(clientA); ctx.log('Client A online'); // client A should now "win" and re-initialize server data @@ -128,6 +130,7 @@ it('can re-initialize from replica after resetting server-side while replicas ar clientB.sync.start(); ctx.log('Waiting for client B to re-initialize'); + await waitForSync(clientB); await waitForQueryResult(clientB.items.get(a_unknownItem.get('id'))); await waitForQueryResult(clientB.items.get(a_banana.get('id'))); diff --git a/test/tests/shardTransition.test.ts b/test/tests/shardTransition.test.ts index 28c5190b..b9134c88 100644 --- a/test/tests/shardTransition.test.ts +++ b/test/tests/shardTransition.test.ts @@ -95,7 +95,7 @@ it('migrates data from unified to sharded databases on launch', async () => { replicas: [ { ackedLogicalTime: expect.any(String), - ackedServerOrder: 0, + ackedServerOrder: 6, id: expect.any(String), profile: { id: 'A', @@ -114,7 +114,7 @@ it('migrates data from unified to sharded databases on launch', async () => { replicas: [ { ackedLogicalTime: expect.any(String), - ackedServerOrder: 0, + ackedServerOrder: 6, id: expect.any(String), profile: { id: 'B', diff --git a/test/tests/truancy.test.ts b/test/tests/truancy.test.ts index 4e2b2e0c..02f180ca 100644 --- a/test/tests/truancy.test.ts +++ b/test/tests/truancy.test.ts @@ -4,6 +4,7 @@ import { waitForCondition, waitForQueryResult, waitForSync, + waitForTime, } from '../lib/waits.js'; import { assert } from '@verdant-web/common'; import { getPersistence } from '../lib/persistence.js'; @@ -18,14 +19,14 @@ const ctx = createTestContext({ }); it('should reset truant replicas upon their reconnection', async () => { + const library = 'truant'; const persistence = getPersistence(); const startTime = Date.now(); vi.setSystemTime(startTime); const truantClient = await ctx.createTestClient({ - library: 'truant', - user: 'truant', - // logId: 'A', + library, + user: 'truant-1', persistence, }); @@ -66,8 +67,8 @@ it('should reset truant replicas upon their reconnection', async () => { ctx.log('system time set to', startTime + 5 * 60 * 1000); const currentClient = await ctx.createTestClient({ - library: 'truant', - user: 'current', + library, + user: 'current-1', persistence, // logId: 'current', }); @@ -109,14 +110,15 @@ it('should reset truant replicas upon their reconnection', async () => { }); it('should not reset truant replicas with up to date server order', async () => { + const library = 'truant-up-to-date'; const persistence = getPersistence(); const startTime = Date.now(); vi.setSystemTime(startTime); const truantClient = await ctx.createTestClient({ - library: 'truant-up-to-date', - user: 'truant', - // logId: 'A', + library, + user: 'truant-2', + // logId: 'truant', persistence, }); @@ -157,8 +159,8 @@ it('should not reset truant replicas with up to date server order', async () => ctx.log('system time set to', startTime + 5 * 60 * 1000); const currentClient = await ctx.createTestClient({ - library: 'truant-up-to-date', - user: 'current', + library, + user: 'current-2', persistence, // logId: 'current', }); @@ -177,10 +179,21 @@ it('should not reset truant replicas with up to date server order', async () => const onReset = vi.fn(); truantClient.subscribe('resetToServer', onReset); + + // push some preemptive changes before sync, to simulate the truant client immediately + // making changes on launch + const item3 = await truantClient.items.put({ + content: 'item 3', + }); + item3.set('content', 'item 3 updated'); + await truantClient.entities.flushAllBatches(); + await truantClient.sync.start(); await waitForSync(truantClient); - await waitForQueryResult(truantClient.items.findAll(), (r) => r.length === 2); + await waitForQueryResult(truantClient.items.findAll(), (r) => r.length === 3); + + await waitForTime(300); // this would indicate the replica was found truant and reset. expect(onReset).not.toHaveBeenCalled(); });