diff --git a/src/local/listener/createDocumentObserver.ts b/src/local/listener/createDocumentObserver.ts new file mode 100644 index 0000000..3a8ef84 --- /dev/null +++ b/src/local/listener/createDocumentObserver.ts @@ -0,0 +1,67 @@ +import {type SanityClient} from '@sanity/client' +import {catchError, concatMap, map, of, throwError} from 'rxjs' + +import {type SanityDocumentBase} from '../../mutations/types' +import {type ListenerSyncEvent} from '../types' +import {type GlobalMutationEventStream} from './createGlobalMutationEventsListener' +import {FetchError, isClientError} from './errors' +import {sequentializeListenerEvents} from './sequentializeListenerEvents' + +/** + * Creates a resilient document observer that will always do it's best to maintain a local copy of the latest document from a sanity dataset + * Features + * - builtin retrying and connection recovery (track disconnected state by listening for `reconnect` events) + * - builtin mutation event ordering (they might arrive out of order), lost events detection (listen endpoint doesn't guarantee delivery) and recovery + * - discards already-applied mutation events received while fetching the initial document snapshot + * @param options + */ +export function createDocumentObserver(options: { + client: SanityClient + globalEvents: GlobalMutationEventStream +}) { + const {client, globalEvents} = options + return function observe(documentId: string) { + return globalEvents.pipe( + concatMap(event => + event.type === 'welcome' + ? client.observable.getDocument(documentId).pipe( + map( + (doc: undefined | Doc): ListenerSyncEvent => ({ + type: 'sync', + transactionId: doc?._id, + document: doc, + }), + ), + catchError((err: unknown) => { + const error = toError(err) + if (isClientError(error)) { + return throwError(() => error) + } + return throwError( + () => + new FetchError( + `An unexpected error occurred while fetching document: ${error?.message}`, + {cause: error}, + ), + ) + }), + ) + : of(event), + ), + sequentializeListenerEvents({ + maxBufferSize: 10, + resolveChainDeadline: 10_000, + }), + ) + } +} + +function toError(maybeErr: unknown) { + if (maybeErr instanceof Error) { + return maybeErr + } + if (typeof maybeErr === 'object' && maybeErr) { + return Object.assign(new Error(), maybeErr) + } + return new Error(String(maybeErr)) +} diff --git a/src/local/listener/documentObserver.ts b/src/local/listener/documentObserver.ts deleted file mode 100644 index 6160b37..0000000 --- a/src/local/listener/documentObserver.ts +++ /dev/null @@ -1,35 +0,0 @@ -import {type SanityClient} from '@sanity/client' -import {concatMap, map, of} from 'rxjs' - -import {type SanityDocumentLike} from '../../mutations/types' -import {type ListenerSyncEvent} from '../types' -import {type GlobalMutationEventStream} from './createGlobalMutationEventsListener' -import {sequentializeListenerEvents} from './sequentializeListenerEvents' - -export function createDocumentObserver(options: { - client: SanityClient - globalEvents: GlobalMutationEventStream -}) { - const {client, globalEvents} = options - return function observe(documentId: string) { - return globalEvents.pipe( - concatMap(event => - event.type === 'welcome' - ? client.observable.getDocument(documentId).pipe( - map( - (doc: undefined | Doc): ListenerSyncEvent => ({ - type: 'sync', - transactionId: doc?._id, - document: doc, - }), - ), - ) - : of(event), - ), - sequentializeListenerEvents({ - maxBufferSize: 10, - resolveChainDeadline: 10_000, - }), - ) - } -} diff --git a/src/local/listener/errors.ts b/src/local/listener/errors.ts index b229cac..a42f55a 100644 --- a/src/local/listener/errors.ts +++ b/src/local/listener/errors.ts @@ -1,17 +1,36 @@ +import {ClientError as SanityClientError} from '@sanity/client' + import {type ListenerSequenceState} from './sequentializeListenerEvents' +/* + * This file should include all errors that can be thrown by the document observer + */ + +export const ClientError = SanityClientError + +export class FetchError extends Error { + cause?: Error + constructor(message: string, extra?: {cause?: Error}) { + super(message) + this.cause = extra?.cause + this.name = 'FetchError' + } +} + export class ChannelError extends Error { constructor(message: string) { super(message) this.name = 'ChannelError' } } + export class DisconnectError extends Error { constructor(message: string) { super(message) this.name = 'DisconnectError' } } + export class OutOfSyncError extends Error { /** * Attach state to the error for debugging/reporting @@ -36,3 +55,9 @@ export class MaxBufferExceededError extends OutOfSyncError { this.name = 'MaxBufferExceededError' } } + +export function isClientError(e: unknown): e is SanityClientError { + if (typeof e !== 'object') return false + if (!e) return false + return 'statusCode' in e && 'response' in e +}