Skip to content

Commit

Permalink
fix tests for truant resets, then fix behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
a-type committed Nov 11, 2024
1 parent 046bff1 commit 52b3eef
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 47 deletions.
80 changes: 69 additions & 11 deletions packages/server/src/ServerLibrary.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import {
LibraryInfo,
StoredDocumentBaseline,
StoredOperation,
StoredReplicaInfo,
} from './types.js';

export type ServerLibraryEvents = {
Expand Down Expand Up @@ -170,13 +171,27 @@ export class ServerLibrary extends EventSubscriber<ServerLibraryEvents> {
message.operations,
);

// TODO: can we defer this and rebroadcast in parallel?
// insert patches into history
await this.storage.operations.insertAll(

// first, record the replica's serverOrder before insertion.
const replicaInfo = await this.storage.replicas.getOrCreate(
info.libraryId,
message.replicaId,
{
userId: info.userId,
type: info.type,
},
);
const newServerOrder = await this.storage.operations.insertAll(
info.libraryId,
message.replicaId,
message.operations,
);
this.preemptiveUpdateServerOrder(
replicaInfo.replicaInfo,
newServerOrder,
message.operations.length,
);

this.enqueueRebase(info.libraryId);

Expand All @@ -195,6 +210,35 @@ export class ServerLibrary extends EventSubscriber<ServerLibraryEvents> {
this.emit(`changes`, info, message.operations, []);
};

/**
* If a replica inserts operations when it's already
* up to date, we can preemptively update its ackedServerOrder
* to avoid a round trip.
*/
private preemptiveUpdateServerOrder = async (
replicaInfo: StoredReplicaInfo,
newServerOrder: number,
insertedOperationsCount: number,
) => {
if (
newServerOrder - replicaInfo.ackedServerOrder ===
insertedOperationsCount
) {
await this.storage.replicas.updateAckedServerOrder(
replicaInfo.libraryId,
replicaInfo.id,
newServerOrder,
);
this.log(
'debug',
'Preemptively updated ackedServerOrder for',
replicaInfo.id,
'to',
newServerOrder,
);
}
};

private rebroadcastOperations = async (
libraryId: string,
clientKey: string,
Expand Down Expand Up @@ -268,6 +312,10 @@ export class ServerLibrary extends EventSubscriber<ServerLibraryEvents> {
await this.storage.operations.getLatestServerOrder(info.libraryId);
if (clientReplicaInfo.ackedServerOrder >= latestServerOrder) {
overrideTruant = true;
this.log(
'debug',
'Overriding truant reset; truant replica has up-to-date server order',
);
}
}

Expand Down Expand Up @@ -303,6 +351,7 @@ export class ServerLibrary extends EventSubscriber<ServerLibraryEvents> {
clientReplicaInfo.ackedServerOrder === 0 &&
ops.length === 0 &&
baselines.length === 0;

if (isEmptyLibrary) {
this.log('info', 'Received sync from new library', replicaId);

Expand All @@ -316,10 +365,12 @@ export class ServerLibrary extends EventSubscriber<ServerLibraryEvents> {
// don't reset replica if library is empty...
let overwriteReplicaData = replicaShouldReset && !isEmptyLibrary;

// EARLY RETURN --
// if the local library is empty and the replica is new to us,
// but the replica is providing a "since" timestamp, this
// suggests the local data is incomplete or gone. we request
// this replica should respond with a full history.
// this replica should respond with a full history. We will retry
// sync after replica has updated us with full history.
if (isEmptyLibrary && status === 'new' && message.since !== null) {
this.log(
'info',
Expand Down Expand Up @@ -371,11 +422,16 @@ export class ServerLibrary extends EventSubscriber<ServerLibraryEvents> {
message.operations.length,
'operations',
);
await this.storage.operations.insertAll(
const newServerOrder = await this.storage.operations.insertAll(
info.libraryId,
replicaId,
message.operations,
);
this.preemptiveUpdateServerOrder(
clientReplicaInfo,
newServerOrder,
message.operations.length,
);
await this.storage.replicas.updateAcknowledgedLogicalTime(
info.libraryId,
replicaId,
Expand Down Expand Up @@ -403,7 +459,13 @@ export class ServerLibrary extends EventSubscriber<ServerLibraryEvents> {
}

// create the nonce by encoding the server order of the last operation
const ackThisNonce = this.createAckNonce(ops);
const serverOrderToAck = ops.length
? ops[ops.length - 1].serverOrder
: undefined;
const ackThisNonce =
serverOrderToAck !== undefined
? this.createAckNonce(serverOrderToAck)
: undefined;

// respond to client

Expand Down Expand Up @@ -435,12 +497,8 @@ export class ServerLibrary extends EventSubscriber<ServerLibraryEvents> {
});
};

private createAckNonce = (ops: StoredOperation[]): string | undefined => {
return ops.length
? Buffer.from(JSON.stringify(ops[ops.length - 1].serverOrder)).toString(
'base64',
)
: undefined;
private createAckNonce = (serverOrder: number): string | undefined => {
return Buffer.from(JSON.stringify(serverOrder)).toString('base64');
};

private handleAck = async (
Expand Down
2 changes: 1 addition & 1 deletion packages/server/src/storage/Storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ export interface OperationStorage {
libraryId: string,
replicaId: string,
operations: Operation[],
): Promise<void>;
): Promise<number>;
deleteAll(libraryId: string): Promise<void>;
delete(libraryId: string, operations: Operation[]): Promise<void>;
}
Expand Down
19 changes: 14 additions & 5 deletions packages/server/src/storage/sql/SqlOperations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,10 @@ export class SqlOperations implements OperationStorage {
libraryId: string,
replicaId: string,
operations: Operation[],
): Promise<void> => {
): Promise<number> => {
// inserts all operations and updates server order
// FIXME: this whole thing is kinda sus
await this.db.transaction().execute(async (tx): Promise<void> => {
return await this.db.transaction().execute(async (tx): Promise<number> => {
let orderResult = await tx
.selectFrom('OperationHistory')
.select('serverOrder')
Expand All @@ -106,24 +106,33 @@ export class SqlOperations implements OperationStorage {
.executeTakeFirst();
let currentServerOrder = orderResult?.serverOrder ?? 0;
for (const item of operations) {
await tx
// utilizing returned serverOrder accommodates for conflicts
const result = await tx
.insertInto('OperationHistory')
.values({
libraryId,
oid: item.oid,
data: JSON.stringify(item.data),
timestamp: item.timestamp,
replicaId,
serverOrder: ++currentServerOrder,
serverOrder: currentServerOrder + 1,
authz: item.authz,
})
.onConflict((cb) =>
cb
.columns(['libraryId', 'replicaId', 'oid', 'timestamp'])
.doNothing(),
)
.execute();
.returning('serverOrder')
.executeTakeFirst();
if (result) {
currentServerOrder = result.serverOrder;
} else {
// on conflict, nothing is returned.
// this would mean an operation was synced twice
}
}
return currentServerOrder;
});
};

Expand Down
20 changes: 15 additions & 5 deletions packages/server/src/storage/sqlShard/SqlOperations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,11 @@ export class SqlOperations implements OperationStorage {
libraryId: string,
replicaId: string,
operations: Operation[],
): Promise<void> => {
): Promise<number> => {
const db = await this.dbs.get(libraryId);
// inserts all operations and updates server order
// FIXME: this whole thing is kinda sus
await db.transaction().execute(async (tx): Promise<void> => {
return await db.transaction().execute(async (tx): Promise<number> => {
let orderResult = await tx
.selectFrom('OperationHistory')
.select('serverOrder')
Expand All @@ -104,21 +104,31 @@ export class SqlOperations implements OperationStorage {
.executeTakeFirst();
let currentServerOrder = orderResult?.serverOrder ?? 0;
for (const item of operations) {
await tx
// utilizing returned serverOrder accommodates for conflicts
const result = await tx
.insertInto('OperationHistory')
.values({
oid: item.oid,
data: JSON.stringify(item.data),
timestamp: item.timestamp,
replicaId,
serverOrder: ++currentServerOrder,
serverOrder: currentServerOrder + 1,
authz: item.authz,
})
.onConflict((cb) =>
cb.columns(['replicaId', 'oid', 'timestamp']).doNothing(),
)
.execute();
.returning('serverOrder')
.executeTakeFirst();
if (result) {
currentServerOrder = result.serverOrder;
} else {
// on conflict, nothing is returned.
// this would mean an operation was synced twice.
}
}

return currentServerOrder;
});
};

Expand Down
14 changes: 3 additions & 11 deletions test/tests/push.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,15 @@ import { createTestContext } from '../lib/createTestContext.js';
import { ReplicaType } from '@verdant-web/server';
import {
waitForCondition,
waitForOnline,
waitForQueryResult,
waitForSync,
} from '../lib/waits.js';
import { assert } from '@a-type/utils';

const context = createTestContext({
// serverLog: true,
// keepDb: true,
// testLog: true,
// keepDb: true,
});

it("doesn't receive back its own ops after pushing them", async () => {
Expand Down Expand Up @@ -80,16 +79,9 @@ it("doesn't receive back its own ops after pushing them", async () => {
assert(!!log);

expect(log[0].includes('\\"operations\\": []')).toBe(true);
// expect(log[0].includes('"baselines": []')).toBe(true);
logWatcher.mockClear();
}

// const log = logWatcher.mock.calls.find((args) =>
// args[0].includes('sync-resp'),
// );
// assert(!!log);
// should not receive info about Apples, which we created
// expect(!log[0].includes('Apples')).toBe(true);
logWatcher.mockClear();

await client.items.put({
Expand All @@ -112,15 +104,15 @@ it("doesn't receive back its own ops after pushing them", async () => {

client.sync.start();

await waitForOnline(client);
await waitForSync(client);
context.log('Client 1 online again');

context.log('Begin wait for no operations 2');
await waitAndAssertNoOperationsReturned();
context.log('End wait for no operations 2');

clientB.sync.start();
await waitForOnline(clientB);
await waitForSync(clientB);

// this seems to time out sometimes?
await waitForQueryResult(client.items.get(pears.get('id')));
Expand Down
5 changes: 4 additions & 1 deletion test/tests/reset.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ import { createMigration } from '@verdant-web/common';

const ctx = createTestContext({
// testLog: true,
// serverLog: true,
});

async function connectAndSeedData(library = 'reset-1') {
const clientA = await ctx.createTestClient({
library,
user: 'User A',
// logId: 'A',
});
const clientB = await ctx.createTestClient({
library,
Expand Down Expand Up @@ -119,7 +121,7 @@ it('can re-initialize from replica after resetting server-side while replicas ar
const pearId = b_pear.get('id');

clientA.sync.start();
await waitForOnline(clientA);
await waitForSync(clientA);
ctx.log('Client A online');
// client A should now "win" and re-initialize server data

Expand All @@ -128,6 +130,7 @@ it('can re-initialize from replica after resetting server-side while replicas ar

clientB.sync.start();
ctx.log('Waiting for client B to re-initialize');
await waitForSync(clientB);

await waitForQueryResult(clientB.items.get(a_unknownItem.get('id')));
await waitForQueryResult(clientB.items.get(a_banana.get('id')));
Expand Down
4 changes: 2 additions & 2 deletions test/tests/shardTransition.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ it('migrates data from unified to sharded databases on launch', async () => {
replicas: [
{
ackedLogicalTime: expect.any(String),
ackedServerOrder: 0,
ackedServerOrder: 6,
id: expect.any(String),
profile: {
id: 'A',
Expand All @@ -114,7 +114,7 @@ it('migrates data from unified to sharded databases on launch', async () => {
replicas: [
{
ackedLogicalTime: expect.any(String),
ackedServerOrder: 0,
ackedServerOrder: 6,
id: expect.any(String),
profile: {
id: 'B',
Expand Down
Loading

0 comments on commit 52b3eef

Please sign in to comment.