From ec7c5a4f4b0418ddc20a47d05bc4c4c20e45cefb Mon Sep 17 00:00:00 2001 From: Matt Kaschula Date: Mon, 18 Mar 2024 08:09:21 +0000 Subject: [PATCH 1/6] add error handling when sequenceId is undefined --- src/Model.test.ts | 129 +++++++++++++++++++++++++++++++++++++++++++++- src/Model.ts | 37 ++++++++++++- 2 files changed, 163 insertions(+), 3 deletions(-) diff --git a/src/Model.test.ts b/src/Model.test.ts index b57af97..ded2036 100644 --- a/src/Model.test.ts +++ b/src/Model.test.ts @@ -3,6 +3,7 @@ import pino from 'pino'; import { Subject } from 'rxjs'; import { it, describe, expect, afterEach, vi, beforeEach } from 'vitest'; +import { StreamDiscontinuityError } from './Errors.js'; import Model from './Model.js'; import { defaultSyncOptions, defaultEventBufferOptions, defaultOptimisticEventOptions } from './Options.js'; import { IStream } from './stream/Stream.js'; @@ -126,6 +127,69 @@ describe('Model', () => { expect([undefined, { current: 'ready', previous: 'syncing', reason: undefined }]).toContain(syncResult); }); + it('it fails sync when sequenceId is undefined and does not retry', async ({ + channelName, + ably, + logger, + }) => { + const configNumRetries = 5; + const sync = vi.fn(async () => ({ data: simpleTestData, sequenceId: undefined })); + const merge = vi.fn(); + const erroredListener = vi.fn(); + + const model = new Model( + 'test', + { sync, merge }, + { + ably, + channelName, + logger, + syncOptions: { ...defaultSyncOptions, retryStrategy: fixedRetryStrategy(10, configNumRetries) }, + optimisticEventOptions: defaultOptimisticEventOptions, + eventBufferOptions: defaultEventBufferOptions, + }, + ); + model.on('errored', erroredListener); + + await model + .sync() + .catch((err) => expect(err.message).toEqual('The sync function returned an undefined sequenceId')); + expect(sync).toHaveBeenCalledOnce(); // sync is not retried + expect(merge).not.toHaveBeenCalled(); + expect(erroredListener).toHaveBeenCalledOnce(); + }); + + it('it fails sync when sequenceId is undefined with no retryable', async ({ + channelName, + ably, + logger, + }) => { + const sync = vi.fn(async () => ({ data: simpleTestData, sequenceId: undefined })); + const merge = vi.fn(); + const erroredListener = vi.fn(); + + const model = new Model( + 'test', + { sync, merge }, + { + ably, + channelName, + logger, + syncOptions: { ...defaultSyncOptions, retryStrategy: () => -1 }, + optimisticEventOptions: defaultOptimisticEventOptions, + eventBufferOptions: defaultEventBufferOptions, + }, + ); + model.on('errored', erroredListener); + + await model + .sync() + .catch((err) => expect(err.message).toEqual('The sync function returned an undefined sequenceId')); + expect(sync).toHaveBeenCalledOnce(); + expect(merge).not.toHaveBeenCalled(); + expect(erroredListener).toHaveBeenCalledOnce(); + }); + it('allows sync to be called manually, with params', async ({ channelName, ably, logger }) => { let completeSync: (...args: any[]) => void = () => { throw new Error('completeSync not defined'); @@ -1543,6 +1607,68 @@ describe('Model', () => { expect(lis).toHaveBeenCalledTimes(0); }); + it('stream message error can handle the a the StreamDiscontinuityError', async ({ + channelName, + ably, + logger, + streams, + }) => { + const events = { channelEvents: new Subject() }; + + const streamDiscontinuityError = new StreamDiscontinuityError('stream error'); + streams.newStream({ channelName }).subscribe = vi.fn(async (callback) => { + events.channelEvents.subscribe(() => callback(streamDiscontinuityError)); + }); + + let syncCalled = 0; + const sync = vi.fn(async () => { + if (syncCalled > 0) { + syncCalled++; + throw new Error('Syncing Error'); + } + + // first call successful + syncCalled++; + return { data: 'data_0', sequenceId: '0' }; + }); + + const merge = vi.fn((_, event) => event.data); + const model = new Model( + 'test', + { sync, merge }, + { + ably, + channelName, + logger, + syncOptions: { ...defaultSyncOptions, retryStrategy: fixedRetryStrategy(1, 1) }, + optimisticEventOptions: defaultOptimisticEventOptions, + eventBufferOptions: defaultEventBufferOptions, + }, + ); + let subscription = new Subject(); + const subscriptionCall = getEventPromises(subscription, 1)[0]; + + const subscriptionListener = vi.fn(() => subscription.next()); + const erroredListener = vi.fn(); + + // initialize and sync + model.on('errored', erroredListener); + await model.subscribe(subscriptionListener); + expect(sync).toHaveBeenCalledOnce(); + + await subscriptionCall; // wait for first event to propagate + expect(subscriptionListener).toHaveBeenCalledTimes(1); + events.channelEvents.next(createMessage(1)); + + await statePromise(model, 'paused'); // pause before attempting to recover from error + await statePromise(model, 'syncing'); // resume() -> resync() to recover from error + await statePromise(model, 'errored'); // resync failed + + expect(merge).toHaveBeenCalledTimes(0); // message fails to process, merge not called + expect(sync).toHaveBeenCalledTimes(2); + expect(erroredListener).toHaveBeenCalledTimes(1); + }); + it('if merge function fails on confirmed event, and subsequent replay fails, set model to errored', async ({ channelName, ably, @@ -1550,7 +1676,6 @@ describe('Model', () => { streams, }) => { const s1 = streams.newStream({ channelName: channelName }); - s1.subscribe = vi.fn(); const events = new Subject(); s1.subscribe = vi.fn(async (callback) => { @@ -1588,7 +1713,7 @@ describe('Model', () => { eventBufferOptions: defaultEventBufferOptions, }, ); - model.sync(); + await model.sync(); expect(syncFn).toHaveReturnedTimes(1); const lis: EventListener = vi.fn(); diff --git a/src/Model.ts b/src/Model.ts index 20f56bd..a53b35d 100644 --- a/src/Model.ts +++ b/src/Model.ts @@ -196,6 +196,7 @@ export default class Model extends EventEmitter} params - The parameters to pass to the sync function. * @returns A promise that resolves when the model has successfully re-synchronised its state and is ready to start emitting updates. + * @throws {Error} If there is an error during the resync operation. */ public async sync(...params: Parameters) { this.logger.trace({ ...this.baseLogContext, action: 'sync()', params }); @@ -299,6 +300,8 @@ export default class Model extends EventEmitter} callback - The callback to invoke when the model data changes. * @param {SubscriptionOptions} options - Optional subscription options that can be used to specify whether to subscribe to * optimistic or only confirmed updates. Defaults to optimistic. + * @return A promise that resolves when the subscription has been set up and if in an initialized state successfully synchronised + * @throws {Error} If there is an error during the sync operation. */ public async subscribe(callback: SubscriptionCallback, options: SubscriptionOptions = { optimistic: true }) { if (typeof callback !== 'function') { @@ -367,6 +370,7 @@ export default class Model extends EventEmitter} callback - The callback to unsubscribe. + * @throws {Error} If callback is not a function */ public unsubscribe(callback: SubscriptionCallback) { if (typeof callback !== 'function') { @@ -405,6 +409,10 @@ export default class Model extends EventEmitter extends EventEmitter 0) { try { + if (this.state === 'errored') { + return; + } + await fn(); return; } catch (err) { + if (this.state === 'errored') { + throw err; + } + delay = retries(++i); if (delay < 0) { throw err; @@ -458,7 +478,16 @@ export default class Model extends EventEmitter))); + if (!sequenceId) { + const err = Error('The sync function returned an undefined sequenceId'); + // we set the state to errored here to ensure that this function is not retried by the Model.retryable() + // this avoids a sync function that returns the wrong response structure from being retried. + this.setState('errored', err); + throw err; + } + this.setConfirmedData(data); await this.computeState(this.confirmedData, this.optimisticData, this.optimisticEvents); await this.addStream(sequenceId); @@ -470,6 +499,7 @@ export default class Model extends EventEmitter extends EventEmitter) { From 58f4bb10e794cecd5e396252c7e1cfc6dcba17a1 Mon Sep 17 00:00:00 2001 From: Matt Kaschula Date: Mon, 18 Mar 2024 08:10:11 +0000 Subject: [PATCH 2/6] add support when serquenceId is 0 and channel history is present --- src/stream/Middleware.test.ts | 65 +++++++++++++++---- src/stream/Middleware.ts | 22 ++++++- src/stream/Stream.test.ts | 114 ++++++++++++++++++++++++++++++++++ src/stream/Stream.ts | 8 +++ 4 files changed, 196 insertions(+), 13 deletions(-) diff --git a/src/stream/Middleware.test.ts b/src/stream/Middleware.test.ts index f8ff4bb..60c8dbf 100644 --- a/src/stream/Middleware.test.ts +++ b/src/stream/Middleware.test.ts @@ -181,13 +181,13 @@ describe('OrderedHistoryResumer', () => { }); it('orders numerically', () => { - const sequenceId = 0; + const sequenceId = 1; const middleware = new OrderedHistoryResumer(`${sequenceId}`, 0); const subscription = vi.fn(); middleware.subscribe(subscription); // construct history page newest to oldest - let history: Types.Message[] = [createMessage(10), createMessage(2), createMessage(1), createMessage(0)]; + let history: Types.Message[] = [createMessage(10), createMessage(3), createMessage(2), createMessage(1)]; // shuffle as the middleware should be resilient to some out-of-orderiness by sequenceId due to CGO expect(middleware.addHistoricalMessages(shuffle(history))).toBe(true); expect(() => middleware.addHistoricalMessages(history)).toThrowError( @@ -201,13 +201,13 @@ describe('OrderedHistoryResumer', () => { }); it('orders lexicographically', () => { - const sequenceId = 0; + const sequenceId = 1; const middleware = new OrderedHistoryResumer(`${sequenceId}`, 0, lexicographicOrderer); const subscription = vi.fn(); middleware.subscribe(subscription); // construct history page newest to oldest - let history: Types.Message[] = [createMessage(10), createMessage(2), createMessage(1), createMessage(0)]; + let history: Types.Message[] = [createMessage(10), createMessage(3), createMessage(2), createMessage(1)]; // shuffle as the middleware should be resilient to some out-of-orderiness by sequenceId due to CGO expect(middleware.addHistoricalMessages(shuffle(history))).toBe(true); expect(() => middleware.addHistoricalMessages(history)).toThrowError( @@ -215,9 +215,9 @@ describe('OrderedHistoryResumer', () => { ); expect(subscription).toHaveBeenCalledTimes(3); - expect(subscription).toHaveBeenNthCalledWith(1, null, history[2]); - expect(subscription).toHaveBeenNthCalledWith(2, null, history[0]); - expect(subscription).toHaveBeenNthCalledWith(3, null, history[1]); + expect(subscription).toHaveBeenNthCalledWith(1, null, history[0]); // id: 10 + expect(subscription).toHaveBeenNthCalledWith(2, null, history[2]); // id: 2 + expect(subscription).toHaveBeenNthCalledWith(3, null, history[1]); // id: 3 }); it('emits messages after the boundary with sparse sequence', () => { @@ -266,25 +266,26 @@ describe('OrderedHistoryResumer', () => { expect(subscription).toHaveBeenNthCalledWith(2, null, history[0]); }); - it('flushes when empty history page reached', () => { - const sequenceId = 0; // out of reach + it('flushes but still in seeking state when empty history page reached', () => { + const sequenceId = 1; // out of reach const middleware = new OrderedHistoryResumer(`${sequenceId}`, 0); const subscription = vi.fn(); middleware.subscribe(subscription); let history: Types.Message[] = [ + createMessage(6), createMessage(5), createMessage(4), createMessage(3), createMessage(2), - createMessage(1), ]; const page1 = history; const page2 = []; expect(middleware.addHistoricalMessages(shuffle(page1))).toBe(false); - expect(middleware.addHistoricalMessages(shuffle(page2))).toBe(true); + expect(middleware.addHistoricalMessages(page2)).toBe(true); + expect(middleware.state).toBe('seeking'); expect(subscription).toHaveBeenCalledTimes(5); expect(subscription).toHaveBeenNthCalledWith(1, null, history[4]); expect(subscription).toHaveBeenNthCalledWith(2, null, history[3]); @@ -293,6 +294,37 @@ describe('OrderedHistoryResumer', () => { expect(subscription).toHaveBeenNthCalledWith(5, null, history[0]); }); + it('successfully applies history when messages is empty but sequenceId is 0', () => { + const sequenceId = 0; + const middleware = new OrderedHistoryResumer(`${sequenceId}`, 0); + const subscription = vi.fn(); + middleware.subscribe(subscription); + + let history: Types.Message[] = [createMessage(3), createMessage(2), createMessage(1)]; + const page1 = history; + const page2 = []; + + expect(middleware.addHistoricalMessages(shuffle(page1))).toBe(false); + expect(middleware.addHistoricalMessages(page2)).toBe(true); + + expect(middleware.state).toEqual('success'); + expect(subscription).toHaveBeenCalledTimes(3); + expect(subscription).toHaveBeenNthCalledWith(1, null, history[2]); + expect(subscription).toHaveBeenNthCalledWith(2, null, history[1]); + expect(subscription).toHaveBeenNthCalledWith(3, null, history[0]); + }); + + it('state is successful with empty page and no history', () => { + const sequenceId = 1; + const middleware = new OrderedHistoryResumer(`${sequenceId}`, 0); + const subscription = vi.fn(); + middleware.subscribe(subscription); + + expect(middleware.addHistoricalMessages([])).toBe(true); + expect(middleware.state).toEqual('success'); + expect(subscription).not.toHaveBeenCalled(); + }); + it('merges historical messages with live messages', () => { const sequenceId = 3; const middleware = new OrderedHistoryResumer(`${sequenceId}`, 0); @@ -354,4 +386,15 @@ describe('OrderedHistoryResumer', () => { expect(subscription).toHaveBeenNthCalledWith(5, null, live[2]); expect(subscription).toHaveBeenNthCalledWith(6, null, live[3]); }); + + it('no seeking preformed if sequenceId is 0', () => { + const sequenceId = 0; + const middleware = new OrderedHistoryResumer(`${sequenceId}`, 0); + const subscription = vi.fn(); + middleware.subscribe(subscription); + + let history: Types.Message[] = [createMessage(1), createMessage(0)]; + expect(middleware.addHistoricalMessages(shuffle(history))).toBe(false); + expect(subscription).not.toHaveBeenCalled(); + }); }); diff --git a/src/stream/Middleware.ts b/src/stream/Middleware.ts index f141ffb..2d0cda1 100644 --- a/src/stream/Middleware.ts +++ b/src/stream/Middleware.ts @@ -133,6 +133,15 @@ export class OrderedHistoryResumer extends MiddlewareBase { return this.eventOrderer(a, b) <= 0; } + public applyHistory() { + if (this.historicalMessages.length === 0 || this.currentState === 'success') { + return; + } + + this.flush(); + this.currentState = 'success'; + } + public get state() { return this.currentState; } @@ -145,10 +154,12 @@ export class OrderedHistoryResumer extends MiddlewareBase { // the messages expired before the next page was requested. if (messages.length === 0) { // If there were some messages in history then there have definitely been changes to the state - // and we can't reach back far enough to resume from the correct point. + // and we can't reach back far enough to resume from the correct point. If the sequenceId is + // '0' then we assume this due to SQL coalesce and no actual message will ever be found so + // flush() will reply what history is there. const noHistory = this.historicalMessages.length === 0; this.flush(); - if (noHistory) { + if (noHistory || this.sequenceId === '0') { this.currentState = 'success'; } return true; @@ -169,6 +180,13 @@ export class OrderedHistoryResumer extends MiddlewareBase { // This is sufficiently low likelihood that this can be ignored for now. this.historicalMessages.sort((a, b) => this.reverseOrderer(a.id, b.id)); + // A sequenceId of 0 is a cursor that represents the position before the start of the stream. + // There is no such historical message to seek to, instead we should paginate through all the + // of history to obtain all messages that were published since the sequenceId was obtained. + if (this.sequenceId === '0') { + return false; + } + // Seek backwards through history until we reach a message id <= the specified sequenceId. // Discard anything older (>= sequenceId) and flush out the remaining messages. for (let i = 0; i < this.historicalMessages.length; i++) { diff --git a/src/stream/Stream.test.ts b/src/stream/Stream.test.ts index 150c0ab..fafa80a 100644 --- a/src/stream/Stream.test.ts +++ b/src/stream/Stream.test.ts @@ -270,6 +270,120 @@ describe('Stream', () => { }); }); + it('successfully to syncs if sequenceId is 0 with multiple pages of history', async ({ + ably, + logger, + channelName, + }) => { + const subscribeListener = vi.fn(); + const channel = ably.channels.get(channelName); + ably.channels.release = vi.fn(); + channel.subscribe = vi.fn( + async (): Promise => ({ + current: 'attached', + previous: 'attaching', + resumed: false, + hasBacklog: false, + }), + ); + let i = 0; + channel.history = vi.fn(async (): Promise>> => { + i++; + if (i === 1) { + return { + items: [createMessage(7), createMessage(6), createMessage(5)], + hasNext: () => true, + }; + } + return { + items: [createMessage(4), createMessage(3), createMessage(2)], + hasNext: () => false, + }; + }); + + const stream = new Stream({ + ably, + logger, + channelName: 'foobar', + syncOptions: defaultSyncOptions, + eventBufferOptions: defaultEventBufferOptions, + }); + stream.subscribe(subscribeListener); + let replayPromise = stream.replay('0'); + + await statePromise(stream, 'seeking'); + await expect(replayPromise).resolves.toBeUndefined(); + + expect(channel.subscribe).toHaveBeenCalledOnce(); + expect(channel.history).toHaveBeenCalledTimes(2); + expect(channel.history).toHaveBeenNthCalledWith(1, { + untilAttach: true, + limit: defaultSyncOptions.historyPageSize, + }); + expect(channel.history).toHaveBeenNthCalledWith(2, { + untilAttach: true, + limit: defaultSyncOptions.historyPageSize, + }); + expect(subscribeListener).toHaveBeenCalledTimes(6); + }); + + it('successfully to syncs if sequenceId is 0 with 2 pages of history, second one empty', async ({ + ably, + logger, + channelName, + }) => { + const subscribeListener = vi.fn(); + const channel = ably.channels.get(channelName); + ably.channels.release = vi.fn(); + channel.subscribe = vi.fn( + async (): Promise => ({ + current: 'attached', + previous: 'attaching', + resumed: false, + hasBacklog: false, + }), + ); + let i = 0; + channel.history = vi.fn(async (): Promise>> => { + i++; + if (i === 1) { + return { + items: [createMessage(7), createMessage(6), createMessage(5)], + hasNext: () => true, + }; + } + return { + items: [], + hasNext: () => false, + }; + }); + + const stream = new Stream({ + ably, + logger, + channelName: 'foobar', + syncOptions: defaultSyncOptions, + eventBufferOptions: defaultEventBufferOptions, + }); + stream.subscribe(subscribeListener); + let replayPromise = stream.replay('0'); + + await statePromise(stream, 'seeking'); + await expect(replayPromise).resolves.toBeUndefined(); + + expect(channel.subscribe).toHaveBeenCalledOnce(); + expect(channel.history).toHaveBeenCalledTimes(2); + expect(channel.history).toHaveBeenNthCalledWith(1, { + untilAttach: true, + limit: defaultSyncOptions.historyPageSize, + }); + expect(channel.history).toHaveBeenNthCalledWith(2, { + untilAttach: true, + limit: defaultSyncOptions.historyPageSize, + }); + expect(subscribeListener).toHaveBeenCalledTimes(3); + }); + it('subscribes to messages', async ({ ably, logger, channelName }) => { const channel = ably.channels.get(channelName); channel.history = vi.fn( diff --git a/src/stream/Stream.ts b/src/stream/Stream.ts index b3869fa..b7514c2 100644 --- a/src/stream/Stream.ts +++ b/src/stream/Stream.ts @@ -197,6 +197,14 @@ export default class Stream extends EventEmitter 0 && page.hasNext() && !done); + if (sequenceId === '0' && this.middleware.state !== 'success') { + // The sequenceId is 0 there will be no message in the history to match it. + // The middleware is not in success which means there is some history so we apply it + // The situation occurs when history has been added in the time between the sync function resolving the stream + // getting to this point + this.middleware.applyHistory(); + } + // If the middleware is not in the success state it means there were some history messages and we never reached the target sequenceId. // This means the target sequenceId was too old and a re-sync from a newer state snapshot is required. if (this.middleware.state !== 'success') { From a53c505845329beb8dc9a2a4d0cfa10798da7a8d Mon Sep 17 00:00:00 2001 From: Matt Kaschula Date: Mon, 18 Mar 2024 15:16:55 +0000 Subject: [PATCH 3/6] feat: add integration for sequence history test - failing test skipped --- package-lock.json | 20 ++ package.json | 1 + src/Model.integration.test.ts | 552 ++++++++++++++++++++------------- src/Model.test.ts | 2 +- src/Model.ts | 2 +- src/stream/Stream.test.ts | 4 +- src/types/model.ts | 2 +- src/utilities/test/promises.ts | 8 + 8 files changed, 368 insertions(+), 223 deletions(-) diff --git a/package-lock.json b/package-lock.json index 3f95b7a..25e11f1 100644 --- a/package-lock.json +++ b/package-lock.json @@ -30,6 +30,7 @@ "prettier": "^3.1.0", "typedoc": "^0.25.3", "typescript": "^5.1.6", + "uuid": "^9.0.1", "vitest": "^0.34.1" } }, @@ -4927,6 +4928,19 @@ "punycode": "^2.1.0" } }, + "node_modules/uuid": { + "version": "9.0.1", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-9.0.1.tgz", + "integrity": "sha512-b+1eJOlsR9K8HJpow9Ok3fiWOWSIcIzXodvv0rQjVoOVNpWMpxf1wZNpt4y9h10odCNrqnYp1OBzRktckBe3sA==", + "dev": true, + "funding": [ + "https://github.com/sponsors/broofa", + "https://github.com/sponsors/ctavan" + ], + "bin": { + "uuid": "dist/bin/uuid" + } + }, "node_modules/v8-to-istanbul": { "version": "9.1.0", "resolved": "https://registry.npmjs.org/v8-to-istanbul/-/v8-to-istanbul-9.1.0.tgz", @@ -8663,6 +8677,12 @@ "punycode": "^2.1.0" } }, + "uuid": { + "version": "9.0.1", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-9.0.1.tgz", + "integrity": "sha512-b+1eJOlsR9K8HJpow9Ok3fiWOWSIcIzXodvv0rQjVoOVNpWMpxf1wZNpt4y9h10odCNrqnYp1OBzRktckBe3sA==", + "dev": true + }, "v8-to-istanbul": { "version": "9.1.0", "resolved": "https://registry.npmjs.org/v8-to-istanbul/-/v8-to-istanbul-9.1.0.tgz", diff --git a/package.json b/package.json index a154a8d..e9462d9 100644 --- a/package.json +++ b/package.json @@ -55,6 +55,7 @@ "prettier": "^3.1.0", "typedoc": "^0.25.3", "typescript": "^5.1.6", + "uuid": "^9.0.1", "vitest": "^0.34.1" }, "dependencies": { diff --git a/src/Model.integration.test.ts b/src/Model.integration.test.ts index b0051b1..5a3e8dc 100644 --- a/src/Model.integration.test.ts +++ b/src/Model.integration.test.ts @@ -1,13 +1,15 @@ import { Realtime } from 'ably/promises'; import pino from 'pino'; import { Subject } from 'rxjs'; +import { v4 as uuid } from 'uuid'; import { it, describe, expect, beforeEach, afterEach, vi } from 'vitest'; import Model from './Model.js'; import { defaultSyncOptions, defaultEventBufferOptions, defaultOptimisticEventOptions } from './Options.js'; import type { ModelOptions } from './types/model.js'; +import { fixedRetryStrategy } from './utilities/retries.js'; import { createAblyApp } from './utilities/test/createAblyApp.js'; -import { getEventPromises } from './utilities/test/promises.js'; +import { foreachSync, getEventPromises } from './utilities/test/promises.js'; interface TestContext extends ModelOptions { model: Model; @@ -20,262 +22,376 @@ interface TestContext extends ModelOptions { syncData: Record; } +interface TestStreamContext extends ModelOptions { + model: Model; + channelName: string; +} + describe('Model integration', () => { - beforeEach(async (context) => { - const channelName = 'test-channel'; - const data = await createAblyApp({ - keys: [{}], - namespaces: [{ id: channelName, persisted: true }], - channels: [ + describe('model state and optimistic events', () => { + beforeEach(async (context) => { + const channelName = 'test-channel'; + const data = await createAblyApp({ + keys: [{}], + namespaces: [{ id: channelName, persisted: true }], + channels: [ + { + name: channelName, + presence: [ + { clientId: 'John', data: 'john@test.com' }, + { clientId: 'Dave', data: 'dave@test.com' }, + ], + }, + ], + }); + const syncData = { + 1: { + data: { + name: 'John', + email: 'john@test.io', + }, + sequenceId: '1', + }, + 2: { + data: { + city: 'London', + country: 'Canada', + }, + sequenceId: '2', + }, + }; + const ably = new Realtime({ + key: data.keys[0].keyStr, + environment: 'sandbox', + }); + const logger = pino({ level: 'debug' }); + const model = new Model( + channelName, { - name: channelName, - presence: [ - { clientId: 'John', data: 'john@test.com' }, - { clientId: 'Dave', data: 'dave@test.com' }, - ], + sync: async (id: string) => syncData[id], + merge: (state: object, event) => (state ? { ...state, ...event.data } : event.data), }, - ], - }); - const syncData = { - 1: { - data: { - name: 'John', - email: 'john@test.io', + { + ably, + channelName, + logger, + syncOptions: defaultSyncOptions, + optimisticEventOptions: defaultOptimisticEventOptions, + eventBufferOptions: defaultEventBufferOptions, }, - sequenceId: '1', - }, - 2: { + ); + + context.model = model; + context.channelName = channelName; + context.ably = ably; + + context.eventData = { + name: 'update', data: { - city: 'London', - country: 'Canada', + foo: 34, }, - sequenceId: '2', - }, - }; - const ably = new Realtime({ - key: data.keys[0].keyStr, - environment: 'sandbox', + mutationId: 'some-id-1', + }; + context.syncData = syncData; }); - const logger = pino({ level: 'debug' }); - const model = new Model( - channelName, - { - sync: async (id: string) => syncData[id], - merge: (state, event) => (state ? { ...state, ...event.data } : event.data), - }, - { - ably, - channelName, - logger, - syncOptions: defaultSyncOptions, - optimisticEventOptions: defaultOptimisticEventOptions, - eventBufferOptions: defaultEventBufferOptions, - }, - ); - - context.model = model; - context.channelName = channelName; - context.ably = ably; - - context.eventData = { - name: 'update', - data: { - foo: 34, - }, - mutationId: 'some-id-1', - }; - context.syncData = syncData; - }); - afterEach(async ({ model }) => { - await model.dispose(); - vi.restoreAllMocks(); - }); + afterEach(async ({ model }) => { + await model.dispose(); + vi.restoreAllMocks(); + }); - it('changes state on sync, pause, resume, dispose', async ({ model }) => { - expect(model.state).toEqual('initialized'); + it('changes state on sync, pause, resume, dispose', async ({ model }) => { + expect(model.state).toEqual('initialized'); - const sync = model.sync(1); - expect(model.state).toEqual('syncing'); + const sync = model.sync(1); + expect(model.state).toEqual('syncing'); - await sync; - expect(model.state).toEqual('ready'); + await sync; + expect(model.state).toEqual('ready'); - await model.pause(); - expect(model.state).toEqual('paused'); + await model.pause(); + expect(model.state).toEqual('paused'); - await model.resume(); - expect(model.state).toEqual('ready'); + await model.resume(); + expect(model.state).toEqual('ready'); - await model.dispose(); - expect(model.state).toEqual('disposed'); - }); + await model.dispose(); + expect(model.state).toEqual('disposed'); + }); - it('successfully sets the data from the event in optimistic()', async ({ - ably, - model, - channelName, - eventData, - syncData, - }) => { - await model.sync(1); - expect(model.data.optimistic).toEqual(syncData[1].data); - - let subscription = new Subject(); - const subscriptionCalls = getEventPromises(subscription, 3); - const subscriptionSpy = vi.fn(() => subscription.next()); - const finalData = { ...syncData[1].data, ...eventData.data }; - - model.subscribe(subscriptionSpy); - - await subscriptionCalls[0]; - expect(model.data.confirmed).toEqual(syncData[1].data); - - const [confirmation] = await model.optimistic(eventData); - - await subscriptionCalls[1]; - expect(model.data.optimistic).toEqual(finalData); - - const channel = ably.channels.get(channelName); - await channel.publish({ - data: eventData.data, - name: 'update', - extras: { - headers: { - 'x-ably-models-event-uuid': eventData.mutationId, + it('successfully sets the data from the event in optimistic()', async ({ + ably, + model, + channelName, + eventData, + syncData, + }) => { + await model.sync(1); + expect(model.data.optimistic).toEqual(syncData[1].data); + + let subscription = new Subject(); + const subscriptionCalls = getEventPromises(subscription, 3); + const subscriptionSpy = vi.fn(() => subscription.next()); + const finalData = { ...syncData[1].data, ...eventData.data }; + + model.subscribe(subscriptionSpy); + + await subscriptionCalls[0]; + expect(model.data.confirmed).toEqual(syncData[1].data); + + const [confirmation] = await model.optimistic(eventData); + + await subscriptionCalls[1]; + expect(model.data.optimistic).toEqual(finalData); + + const channel = ably.channels.get(channelName); + await channel.publish({ + data: eventData.data, + name: 'update', + extras: { + headers: { + 'x-ably-models-event-uuid': eventData.mutationId, + }, }, - }, - }); + }); - await subscriptionCalls[2]; - await confirmation; + await subscriptionCalls[2]; + await confirmation; - expect(subscriptionSpy).toHaveBeenCalledTimes(3); - expect(subscriptionSpy).toHaveBeenNthCalledWith(3, null, finalData); - expect(model.data.confirmed).toEqual(finalData); - }); + expect(subscriptionSpy).toHaveBeenCalledTimes(3); + expect(subscriptionSpy).toHaveBeenNthCalledWith(3, null, finalData); + expect(model.data.confirmed).toEqual(finalData); + }); - it('rejects the data in optimistic() and rolls back the changes if back-end rejects the data', async ({ - ably, - model, - channelName, - eventData, - syncData, - }) => { - await model.sync(1); - let subscription = new Subject(); - const subscriptionCalls = getEventPromises(subscription, 2); - const subscriptionSpy = vi.fn(() => subscription.next()); - const finalData = { ...syncData[1].data, ...eventData.data }; - - model.subscribe(subscriptionSpy); - await model.optimistic(eventData); - expect(model.data.optimistic).toEqual(finalData); - - const channel = ably.channels.get(channelName); - await channel.publish({ - data: eventData.data, - name: 'update', - extras: { - headers: { - 'x-ably-models-event-uuid': eventData.mutationId, - 'x-ably-models-reject': true, + it('rejects the data in optimistic() and rolls back the changes if back-end rejects the data', async ({ + ably, + model, + channelName, + eventData, + syncData, + }) => { + await model.sync(1); + let subscription = new Subject(); + const subscriptionCalls = getEventPromises(subscription, 2); + const subscriptionSpy = vi.fn(() => subscription.next()); + const finalData = { ...syncData[1].data, ...eventData.data }; + + model.subscribe(subscriptionSpy); + await model.optimistic(eventData); + expect(model.data.optimistic).toEqual(finalData); + + const channel = ably.channels.get(channelName); + await channel.publish({ + data: eventData.data, + name: 'update', + extras: { + headers: { + 'x-ably-models-event-uuid': eventData.mutationId, + 'x-ably-models-reject': 'true', + }, }, - }, + }); + + await subscriptionCalls[0]; + expect(model.data.confirmed).toEqual(syncData[1].data); + + await subscriptionCalls[1]; + + expect(subscriptionSpy).toHaveBeenCalledTimes(2); + expect(subscriptionSpy).toHaveBeenNthCalledWith(2, null, finalData); + expect(model.data.confirmed).toEqual(syncData[1].data); }); - await subscriptionCalls[0]; - expect(model.data.confirmed).toEqual(syncData[1].data); + it('rejects the data and rolls back the changes if optimistic() timeouts', async ({ + model, + eventData, + syncData, + }) => { + await model.sync(1); - await subscriptionCalls[1]; + let subscription = new Subject(); + const subscriptionCalls = getEventPromises(subscription, 2); + const subscriptionSpy = vi.fn(() => subscription.next()); - expect(subscriptionSpy).toHaveBeenCalledTimes(2); - expect(subscriptionSpy).toHaveBeenNthCalledWith(2, null, finalData); - expect(model.data.confirmed).toEqual(syncData[1].data); - }); + model.subscribe(subscriptionSpy); - it('rejects the data and rolls back the changes if optimistic() timeouts', async ({ - model, - eventData, - syncData, - }) => { - await model.sync(1); + await subscriptionCalls[0]; + expect(model.data.confirmed).toEqual(syncData[1].data); - let subscription = new Subject(); - const subscriptionCalls = getEventPromises(subscription, 2); - const subscriptionSpy = vi.fn(() => subscription.next()); + const [confirmation] = await model.optimistic(eventData, { timeout: 10 }); + expect(model.data.confirmed).toEqual(syncData[1].data); + expect(model.data.optimistic).toEqual({ ...syncData[1].data, ...eventData.data }); - model.subscribe(subscriptionSpy); + await subscriptionCalls[1]; + expect(model.data.confirmed).toEqual(syncData[1].data); + await expect(confirmation).rejects.toThrow('timed out waiting for event confirmation'); + }); - await subscriptionCalls[0]; - expect(model.data.confirmed).toEqual(syncData[1].data); + it('rebases optimistic data on top of subsequent event data', async ({ + ably, + model, + channelName, + eventData, + syncData, + }) => { + const channel = ably.channels.get(channelName); + const otherEvent = { + data: { + comment: "I'm blazingly fast!", + }, + mutationId: 'some-id-2', + name: 'updateComment', + }; + await model.sync(1); + + let subscription = new Subject(); + const subscriptionCalls = getEventPromises(subscription, 5); + const subscriptionSpy = vi.fn(() => subscription.next()); + + model.subscribe(subscriptionSpy); + + const [confirmation] = await model.optimistic(eventData); + expect(model.data.optimistic).toEqual({ ...syncData[1].data, ...eventData.data }); + + await channel.publish({ + ...otherEvent, + extras: { + headers: { + 'x-ably-models-event-uuid': otherEvent.mutationId, + }, + }, + }); + + await subscriptionCalls[2]; + expect(model.data.confirmed).toEqual({ + ...syncData[1].data, + ...otherEvent.data, + }); + + await channel.publish({ + ...eventData, + extras: { + headers: { + 'x-ably-models-event-uuid': eventData.mutationId, + }, + }, + }); - const [confirmation] = await model.optimistic(eventData, { timeout: 10 }); - expect(model.data.confirmed).toEqual(syncData[1].data); - expect(model.data.optimistic).toEqual({ ...syncData[1].data, ...eventData.data }); + await subscriptionCalls[5]; + await confirmation; - await subscriptionCalls[1]; - expect(model.data.confirmed).toEqual(syncData[1].data); - await expect(confirmation).rejects.toThrow('timed out waiting for event confirmation'); + expect(model.data.confirmed).toEqual({ + ...syncData[1].data, + ...eventData.data, + ...otherEvent.data, + }); + }); }); - it('rebases optimistic data on top of subsequent event data', async ({ - ably, - model, - channelName, - eventData, - syncData, - }) => { - const channel = ably.channels.get(channelName); - const otherEvent = { - data: { - comment: "I'm blazingly fast!", - }, - mutationId: 'some-id-2', - name: 'updateComment', - }; - await model.sync(1); - - let subscription = new Subject(); - const subscriptionCalls = getEventPromises(subscription, 5); - const subscriptionSpy = vi.fn(() => subscription.next()); - - model.subscribe(subscriptionSpy); - - const [confirmation] = await model.optimistic(eventData); - expect(model.data.optimistic).toEqual({ ...syncData[1].data, ...eventData.data }); - - await channel.publish({ - ...otherEvent, - extras: { - headers: { - 'x-ably-models-event-uuid': otherEvent.mutationId, + describe('channel stream', () => { + beforeEach(async (context) => { + const channelName = 'test-channel-stream-' + uuid(); + const data = await createAblyApp({ + keys: [{}], + namespaces: [{ id: channelName, persisted: true }], + channels: [ + { + name: channelName, + presence: [], + }, + ], + }); + + const ably = new Realtime({ + key: data.keys[0].keyStr, + environment: 'sandbox', + }); + const logger = pino({ level: 'debug' }); + const model = new Model( + channelName, + { + // sync: async (id: string) => ( { data: { value: id}, sequenceId: id}), + sync: async (id: string) => { + return { data: { value: id }, sequenceId: id }; + }, + merge: (state: object, event) => (state ? { ...state, ...event.data } : event.data), }, - }, - }); + { + ably, + channelName, + logger, + syncOptions: { ...defaultSyncOptions, retryStrategy: fixedRetryStrategy(1, 1) }, + optimisticEventOptions: defaultOptimisticEventOptions, + eventBufferOptions: defaultEventBufferOptions, + }, + ); - await subscriptionCalls[2]; - expect(model.data.confirmed).toEqual({ - ...syncData[1].data, - ...otherEvent.data, + context.model = model; + context.channelName = channelName; + context.ably = ably; }); - await channel.publish({ - ...eventData, - extras: { - headers: { - 'x-ably-models-event-uuid': eventData.mutationId, - }, - }, + afterEach(async ({ model }) => { + await model.dispose(); + vi.restoreAllMocks(); }); - await subscriptionCalls[5]; - await confirmation; + it('message seeking correctly applies only the latest messages', async ({ + model, + ably, + channelName, + }) => { + const subscriptionEventCounter = new Subject(); + const subscriptionEvents = getEventPromises(subscriptionEventCounter, 2); + const subscribeListener = vi.fn(() => subscriptionEventCounter.next()); + + const syncSequenceId = 2; + const messages = [ + { id: '1', data: { value: 1 }, extras: { headers: { 'x-ably-models-event-uuid': '1' } } }, + { id: '2', data: { value: 2 }, extras: { headers: { 'x-ably-models-event-uuid': '2' } } }, // sequenceId matches this event + { id: '3', data: { value: 3 }, extras: { headers: { 'x-ably-models-event-uuid': '3' } } }, // subscription event 1 + { id: '4', data: { value: 4 }, extras: { headers: { 'x-ably-models-event-uuid': '4' } } }, // subscription event 2 + ]; + const channel = ably.channels.get(channelName); + await foreachSync(messages, ({ id, data, extras }) => + channel.publish({ id, data, name: 'model-mutation', extras }), + ); + + await model.sync(syncSequenceId); + await model.subscribe(subscribeListener); + + await subscriptionEvents[0]; + await subscriptionEvents[1]; + + expect(subscribeListener).toHaveBeenCalledTimes(2); + expect(subscribeListener).toHaveBeenNthCalledWith(1, null, { value: 3 }); + expect(subscribeListener).toHaveBeenNthCalledWith(2, null, { value: 4 }); + }); - expect(model.data.confirmed).toEqual({ - ...syncData[1].data, - ...eventData.data, - ...otherEvent.data, + it('sequence id 0 applies all history', async ({ model, ably, channelName }) => { + const subscriptionEventCounter = new Subject(); + const subscriptionEvents = getEventPromises(subscriptionEventCounter, 2); + const subscribeListener = vi.fn(() => subscriptionEventCounter.next()); + + const syncSequenceId = '0'; + const messages = [ + { id: '7', data: { value: 7 }, extras: { headers: { 'x-ably-models-event-uuid': '7' } } }, + { id: '3', data: { value: 3 }, extras: { headers: { 'x-ably-models-event-uuid': '3' } } }, + ]; + const channel = ably.channels.get(channelName); + await foreachSync(messages, ({ id, data, extras }) => + channel.publish({ id, data, name: 'model-mutation', extras }), + ); + + await model.sync(syncSequenceId); + await model.subscribe(subscribeListener); + + await subscriptionEvents[0]; + await subscriptionEvents[1]; + + expect(subscribeListener).toHaveBeenCalledTimes(2); + expect(subscribeListener).toHaveBeenNthCalledWith(1, null, { value: 3 }); + expect(subscribeListener).toHaveBeenNthCalledWith(2, null, { value: 7 }); }); }); }); diff --git a/src/Model.test.ts b/src/Model.test.ts index ded2036..a345783 100644 --- a/src/Model.test.ts +++ b/src/Model.test.ts @@ -1607,7 +1607,7 @@ describe('Model', () => { expect(lis).toHaveBeenCalledTimes(0); }); - it('stream message error can handle the a the StreamDiscontinuityError', async ({ + it('stream message error can handle the StreamDiscontinuityError', async ({ channelName, ably, logger, diff --git a/src/Model.ts b/src/Model.ts index a53b35d..54a0d7e 100644 --- a/src/Model.ts +++ b/src/Model.ts @@ -607,7 +607,7 @@ export default class Model extends EventEmitter { }); }); - it('successfully to syncs if sequenceId is 0 with multiple pages of history', async ({ + it('successfully syncs if sequenceId is 0 with multiple pages of history', async ({ ably, logger, channelName, @@ -327,7 +327,7 @@ describe('Stream', () => { expect(subscribeListener).toHaveBeenCalledTimes(6); }); - it('successfully to syncs if sequenceId is 0 with 2 pages of history, second one empty', async ({ + it('successfully syncs if sequenceId is 0 with 2 pages of history, second one empty', async ({ ably, logger, channelName, diff --git a/src/types/model.ts b/src/types/model.ts index 434b617..a660505 100644 --- a/src/types/model.ts +++ b/src/types/model.ts @@ -145,7 +145,7 @@ export type ConfirmedEvent = Event & { * If true, indicates that the backend is (asynchronously) explicitly rejecting this optimistic change. * This is useful if the server cannot reject the change synchronously with the mutation request * (such as if the rejection occurred after the backend sent a response). - * This field is set to `true` iff. there is an `x-ably-models-reject: true` header in the message extras. + * This field is set to `true` if. there is an `x-ably-models-reject: true` header in the message extras. * @see https://ably.com/docs/api/realtime-sdk/messages?lang=nodejs#extras */ rejected: boolean; diff --git a/src/utilities/test/promises.ts b/src/utilities/test/promises.ts index 9b24e09..9a61143 100644 --- a/src/utilities/test/promises.ts +++ b/src/utilities/test/promises.ts @@ -9,3 +9,11 @@ export const getEventPromises = (subject: Subject, n: number) => { } return promises; }; + +export async function foreachSync(data: any[], callback: Function): Promise { + for (let i = 0; i < data.length; i++) { + await callback(data[i]); + } + + return; +} From 25ec24a15ef94126c9ec44a2efdbfbc618320a99 Mon Sep 17 00:00:00 2001 From: Matt Kaschula Date: Tue, 19 Mar 2024 14:43:53 +0000 Subject: [PATCH 4/6] feat: update sequenceId type to be string or number --- src/Model.test.ts | 76 ++++++++++++++++++++++++++++++++++++++- src/Model.ts | 5 +-- src/stream/Middleware.ts | 9 ++--- src/stream/Stream.test.ts | 59 +++++++++++++++++++++++++++++- src/stream/Stream.ts | 9 ++--- src/types/model.ts | 6 +++- src/types/optimistic.ts | 6 ++-- 7 files changed, 154 insertions(+), 16 deletions(-) diff --git a/src/Model.test.ts b/src/Model.test.ts index a345783..8e53eab 100644 --- a/src/Model.test.ts +++ b/src/Model.test.ts @@ -92,7 +92,11 @@ describe('Model', () => { vi.restoreAllMocks(); }); - it('enters ready state after sync', async ({ channelName, ably, logger }) => { + it('enters ready state after sync with a string sequenceId ', async ({ + channelName, + ably, + logger, + }) => { // the promise returned by the subscribe method resolves when we have successfully attached to the channel let completeSync: (...args: any[]) => void = () => { throw new Error('completeSync not defined'); @@ -127,6 +131,45 @@ describe('Model', () => { expect([undefined, { current: 'ready', previous: 'syncing', reason: undefined }]).toContain(syncResult); }); + it('eenters ready state after sync with a number sequenceId', async ({ + channelName, + ably, + logger, + }) => { + // the promise returned by the subscribe method resolves when we have successfully attached to the channel + let completeSync: (...args: any[]) => void = () => { + throw new Error('completeSync not defined'); + }; + const synchronised = new Promise((resolve) => (completeSync = resolve)); + const sync = vi.fn(async () => { + await synchronised; + return { data: simpleTestData, sequenceId: 0 }; + }); + const model = new Model( + 'test', + { sync: sync, merge: () => simpleTestData }, + { + ably, + channelName, + logger, + syncOptions: defaultSyncOptions, + optimisticEventOptions: defaultOptimisticEventOptions, + eventBufferOptions: defaultEventBufferOptions, + }, + ); + await statePromise(model, 'initialized'); + const modelSynced = model.sync(); + + await statePromise(model, 'syncing'); + completeSync(); + await statePromise(model, 'ready'); + expect(sync).toHaveBeenCalledOnce(); + expect(model.data.optimistic).toEqual(simpleTestData); + expect(model.data.confirmed).toEqual(simpleTestData); + const syncResult = await modelSynced; + expect([undefined, { current: 'ready', previous: 'syncing', reason: undefined }]).toContain(syncResult); + }); + it('it fails sync when sequenceId is undefined and does not retry', async ({ channelName, ably, @@ -190,6 +233,37 @@ describe('Model', () => { expect(erroredListener).toHaveBeenCalledOnce(); }); + it('it fails sync when sequenceId is null with no retryable', async ({ + channelName, + ably, + logger, + }) => { + const sync = vi.fn(async () => ({ data: simpleTestData, sequenceId: null })); + const merge = vi.fn(); + const erroredListener = vi.fn(); + + const model = new Model( + 'test', + { sync, merge }, + { + ably, + channelName, + logger, + syncOptions: { ...defaultSyncOptions, retryStrategy: () => -1 }, + optimisticEventOptions: defaultOptimisticEventOptions, + eventBufferOptions: defaultEventBufferOptions, + }, + ); + model.on('errored', erroredListener); + + await model + .sync() + .catch((err) => expect(err.message).toEqual('The sync function returned an undefined sequenceId')); + expect(sync).toHaveBeenCalledOnce(); + expect(merge).not.toHaveBeenCalled(); + expect(erroredListener).toHaveBeenCalledOnce(); + }); + it('allows sync to be called manually, with params', async ({ channelName, ably, logger }) => { let completeSync: (...args: any[]) => void = () => { throw new Error('completeSync not defined'); diff --git a/src/Model.ts b/src/Model.ts index 54a0d7e..24c304a 100644 --- a/src/Model.ts +++ b/src/Model.ts @@ -22,6 +22,7 @@ import type { ConfirmedEvent, ExtractData, SyncFuncConstraint, + MessageId, } from './types/model.js'; import { MODELS_EVENT_REJECT_HEADER, @@ -480,7 +481,7 @@ export default class Model extends EventEmitter))); - if (!sequenceId) { + if (sequenceId === null || sequenceId === undefined) { const err = Error('The sync function returned an undefined sequenceId'); // we set the state to errored here to ensure that this function is not retried by the Model.retryable() // this avoids a sync function that returns the wrong response structure from being retried. @@ -519,7 +520,7 @@ export default class Model extends EventEmitter { }); }); - it('successfully syncs if sequenceId is 0 with multiple pages of history', async ({ + it('successfully syncs if sequenceId is "0" with multiple pages of history', async ({ ably, logger, channelName, @@ -327,6 +327,63 @@ describe('Stream', () => { expect(subscribeListener).toHaveBeenCalledTimes(6); }); + it('successfully syncs if sequenceId is 0 with multiple pages of history', async ({ + ably, + logger, + channelName, + }) => { + const subscribeListener = vi.fn(); + const channel = ably.channels.get(channelName); + ably.channels.release = vi.fn(); + channel.subscribe = vi.fn( + async (): Promise => ({ + current: 'attached', + previous: 'attaching', + resumed: false, + hasBacklog: false, + }), + ); + let i = 0; + channel.history = vi.fn(async (): Promise>> => { + i++; + if (i === 1) { + return { + items: [createMessage(7), createMessage(6), createMessage(5)], + hasNext: () => true, + }; + } + return { + items: [createMessage(4), createMessage(3), createMessage(2)], + hasNext: () => false, + }; + }); + + const stream = new Stream({ + ably, + logger, + channelName: 'foobar', + syncOptions: defaultSyncOptions, + eventBufferOptions: defaultEventBufferOptions, + }); + stream.subscribe(subscribeListener); + let replayPromise = stream.replay(0); + + await statePromise(stream, 'seeking'); + await expect(replayPromise).resolves.toBeUndefined(); + + expect(channel.subscribe).toHaveBeenCalledOnce(); + expect(channel.history).toHaveBeenCalledTimes(2); + expect(channel.history).toHaveBeenNthCalledWith(1, { + untilAttach: true, + limit: defaultSyncOptions.historyPageSize, + }); + expect(channel.history).toHaveBeenNthCalledWith(2, { + untilAttach: true, + limit: defaultSyncOptions.historyPageSize, + }); + expect(subscribeListener).toHaveBeenCalledTimes(6); + }); + it('successfully syncs if sequenceId is 0 with 2 pages of history, second one empty', async ({ ably, logger, diff --git a/src/stream/Stream.ts b/src/stream/Stream.ts index b7514c2..9502b56 100644 --- a/src/stream/Stream.ts +++ b/src/stream/Stream.ts @@ -5,6 +5,7 @@ import { Subject, Subscription } from 'rxjs'; import { OrderedHistoryResumer } from './Middleware.js'; import { StreamDiscontinuityError } from '../Errors.js'; import type { StandardCallback } from '../types/callbacks'; +import { MessageId } from '../types/model.js'; import type { StreamStateChange, StreamOptions, StreamState } from '../types/stream.js'; import EventEmitter from '../utilities/EventEmitter.js'; import { VERSION } from '../version.js'; @@ -14,7 +15,7 @@ export interface IStream { get channelName(): string; reset(): Promise; - replay(sequenceId: string): Promise; + replay(sequenceId: MessageId): Promise; subscribe(callback: StandardCallback): void; unsubscribe(callback: StandardCallback): void; dispose(reason?: AblyTypes.ErrorInfo | string): Promise; @@ -106,7 +107,7 @@ export default class Stream extends EventEmitter 0 && page.hasNext() && !done); - if (sequenceId === '0' && this.middleware.state !== 'success') { + if ((sequenceId === '0' || sequenceId === 0) && this.middleware.state !== 'success') { // The sequenceId is 0 there will be no message in the history to match it. // The middleware is not in success which means there is some history so we apply it // The situation occurs when history has been added in the time between the sync function resolving the stream diff --git a/src/types/model.ts b/src/types/model.ts index a660505..3529435 100644 --- a/src/types/model.ts +++ b/src/types/model.ts @@ -129,6 +129,10 @@ export type OptimisticEvent = Event & { confirmed: false; }; +/** + * A message ID used to identify an event. Used for message seeking. + */ +export type MessageId = string | number; /** * An event received from the backend over Ably that represents a confirmed change on the underlying state in the database. */ @@ -174,7 +178,7 @@ export type SyncFunc = F; * @returns {Promise<{data: T, sequenceId: string}>} A promise containing the data from the backend and a sequenceId. * @interface */ -export type SyncReturnType = Promise<{ data: T; sequenceId: string }>; +export type SyncReturnType = Promise<{ data: T; sequenceId: MessageId }>; /** * Type constraint for a sync function. diff --git a/src/types/optimistic.ts b/src/types/optimistic.ts index 5fad58c..9cb9786 100644 --- a/src/types/optimistic.ts +++ b/src/types/optimistic.ts @@ -13,11 +13,11 @@ export type EventComparator = (optimistic: Event, confirmed: Event) => boolean; * EventOrderer is used to determine the order of elements in the event buffer. It expects * to return a negative value of the first argument is less than the second argument, zero * if they are equal, and a positive value otherwise. - * @param {string} a - The first event ID. - * @param {string} b - The second event ID. + * @param {string | number} a - The first event ID. + * @param {string | number} b - The second event ID. * @param {number} - A negative value if a < b, zero if a == b, a positive value otherwise. */ -export type EventOrderer = (a: string, b: string) => number; +export type EventOrderer = (a: string | number, b: string | number) => number; /** * OptimisticEventOptions can be used to configure options on individual optimistic events. From b3c7880aabcaa56273b8118cd56653b8a55cdc36 Mon Sep 17 00:00:00 2001 From: Matt Kaschula Date: Tue, 19 Mar 2024 10:49:38 +0000 Subject: [PATCH 5/6] chore: add missing types dev deps for uuid package --- package-lock.json | 13 +++++++++++++ package.json | 1 + 2 files changed, 14 insertions(+) diff --git a/package-lock.json b/package-lock.json index 25e11f1..5c3ff3a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -17,6 +17,7 @@ "typedoc-plugin-missing-exports": "^2.1.0" }, "devDependencies": { + "@types/uuid": "^9.0.8", "@typescript-eslint/eslint-plugin": "^6.1.0", "@typescript-eslint/parser": "^6.1.0", "@vitest/coverage-c8": "^0.33.0", @@ -894,6 +895,12 @@ "integrity": "sha512-dn1l8LaMea/IjDoHNd9J52uBbInB796CDffS6VdIxvqYCPSG0V0DzHp76GpaWnlhg88uYyPbXCDIowa86ybd5A==", "dev": true }, + "node_modules/@types/uuid": { + "version": "9.0.8", + "resolved": "https://registry.npmjs.org/@types/uuid/-/uuid-9.0.8.tgz", + "integrity": "sha512-jg+97EGIcY9AGHJJRaaPVgetKDsrTgbRjQ5Msgjh/DQKEFl0DtyRr/VCOyD1T2R1MNeWPK/u7JoGhlDZnKBAfA==", + "dev": true + }, "node_modules/@typescript-eslint/eslint-plugin": { "version": "6.13.1", "resolved": "https://registry.npmjs.org/@typescript-eslint/eslint-plugin/-/eslint-plugin-6.13.1.tgz", @@ -5801,6 +5808,12 @@ "integrity": "sha512-dn1l8LaMea/IjDoHNd9J52uBbInB796CDffS6VdIxvqYCPSG0V0DzHp76GpaWnlhg88uYyPbXCDIowa86ybd5A==", "dev": true }, + "@types/uuid": { + "version": "9.0.8", + "resolved": "https://registry.npmjs.org/@types/uuid/-/uuid-9.0.8.tgz", + "integrity": "sha512-jg+97EGIcY9AGHJJRaaPVgetKDsrTgbRjQ5Msgjh/DQKEFl0DtyRr/VCOyD1T2R1MNeWPK/u7JoGhlDZnKBAfA==", + "dev": true + }, "@typescript-eslint/eslint-plugin": { "version": "6.13.1", "resolved": "https://registry.npmjs.org/@typescript-eslint/eslint-plugin/-/eslint-plugin-6.13.1.tgz", diff --git a/package.json b/package.json index e9462d9..0b815ff 100644 --- a/package.json +++ b/package.json @@ -42,6 +42,7 @@ "collaboration" ], "devDependencies": { + "@types/uuid": "^9.0.8", "@typescript-eslint/eslint-plugin": "^6.1.0", "@typescript-eslint/parser": "^6.1.0", "@vitest/coverage-c8": "^0.33.0", From e95124ced9934c4e54d0ac63839adcd8585c4a73 Mon Sep 17 00:00:00 2001 From: Matt Kaschula Date: Tue, 19 Mar 2024 12:01:49 +0000 Subject: [PATCH 6/6] chore: add confirmation rejects expectation --- src/Model.integration.test.ts | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/src/Model.integration.test.ts b/src/Model.integration.test.ts index 5a3e8dc..8bb12c2 100644 --- a/src/Model.integration.test.ts +++ b/src/Model.integration.test.ts @@ -30,7 +30,7 @@ interface TestStreamContext extends ModelOptions { describe('Model integration', () => { describe('model state and optimistic events', () => { beforeEach(async (context) => { - const channelName = 'test-channel'; + const channelName = 'test-channel-' + uuid(); const data = await createAblyApp({ keys: [{}], namespaces: [{ id: channelName, persisted: true }], @@ -168,22 +168,23 @@ describe('Model integration', () => { model, channelName, eventData, - syncData, + syncData: mockSyncResponses, }) => { await model.sync(1); let subscription = new Subject(); - const subscriptionCalls = getEventPromises(subscription, 2); + const subscriptionCalls = getEventPromises(subscription, 3); const subscriptionSpy = vi.fn(() => subscription.next()); - const finalData = { ...syncData[1].data, ...eventData.data }; + const syncData = mockSyncResponses[1].data; + const optimisticallyUpdatedData = { ...syncData, ...eventData.data }; - model.subscribe(subscriptionSpy); - await model.optimistic(eventData); - expect(model.data.optimistic).toEqual(finalData); + await model.subscribe(subscriptionSpy); + const [confirmation] = await model.optimistic(eventData); + expect(model.data.optimistic).toEqual(optimisticallyUpdatedData); const channel = ably.channels.get(channelName); await channel.publish({ data: eventData.data, - name: 'update', + name: eventData.name, extras: { headers: { 'x-ably-models-event-uuid': eventData.mutationId, @@ -193,13 +194,15 @@ describe('Model integration', () => { }); await subscriptionCalls[0]; - expect(model.data.confirmed).toEqual(syncData[1].data); + expect(model.data.confirmed).toEqual(syncData); await subscriptionCalls[1]; - - expect(subscriptionSpy).toHaveBeenCalledTimes(2); - expect(subscriptionSpy).toHaveBeenNthCalledWith(2, null, finalData); - expect(model.data.confirmed).toEqual(syncData[1].data); + await subscriptionCalls[2]; + expect(subscriptionSpy).toHaveBeenCalledTimes(3); + expect(subscriptionSpy).toHaveBeenNthCalledWith(2, null, optimisticallyUpdatedData); + expect(subscriptionSpy).toHaveBeenNthCalledWith(3, null, syncData); + expect(model.data.confirmed).toEqual(syncData); + expect(confirmation).rejects.toThrow('events contain rejections: name:update'); }); it('rejects the data and rolls back the changes if optimistic() timeouts', async ({