From b2375ce3faae6ddcfbfb910759525a966cc0ed38 Mon Sep 17 00:00:00 2001 From: Jordan Lawrence Date: Mon, 19 Aug 2024 16:41:19 +0100 Subject: [PATCH 1/2] chore(core): introduce improvements to efficiency of listeners to preview store --- packages/sanity/package.json | 1 + .../sanity/src/core/preview/availability.ts | 8 +- .../sanity/src/core/preview/constants.tsx | 6 + .../src/core/preview/createGlobalListener.ts | 32 +++++ .../src/core/preview/createObserveDocument.ts | 87 ++++++++++++++ .../src/core/preview/createPathObserver.ts | 9 +- .../src/core/preview/createPreviewObserver.ts | 22 ++-- .../sanity/src/core/preview/documentPair.ts | 4 +- .../src/core/preview/documentPreviewStore.ts | 103 ++++++++++++---- packages/sanity/src/core/preview/index.ts | 1 + .../src/core/preview/liveDocumentIdSet.ts | 112 ++++++++++++++++++ .../sanity/src/core/preview/observeFields.ts | 90 ++++---------- packages/sanity/src/core/preview/types.ts | 11 ++ .../src/core/preview/useLiveDocumentIdSet.ts | 47 ++++++++ .../src/core/preview/useLiveDocumentSet.ts | 34 ++++++ .../src/core/preview/useObserveDocument.ts | 32 +++++ .../src/core/preview/useValuePreview.ts | 11 +- .../core/preview/utils/applyMendozaPatch.ts | 18 +++ .../core/preview/utils/replayLatest.test.ts | 37 ++++++ .../core/preview/utils/shareReplayLatest.ts | 70 +++++++++++ 20 files changed, 630 insertions(+), 105 deletions(-) create mode 100644 packages/sanity/src/core/preview/createGlobalListener.ts create mode 100644 packages/sanity/src/core/preview/createObserveDocument.ts create mode 100644 packages/sanity/src/core/preview/liveDocumentIdSet.ts create mode 100644 packages/sanity/src/core/preview/useLiveDocumentIdSet.ts create mode 100644 packages/sanity/src/core/preview/useLiveDocumentSet.ts create mode 100644 packages/sanity/src/core/preview/useObserveDocument.ts create mode 100644 packages/sanity/src/core/preview/utils/applyMendozaPatch.ts create mode 100644 packages/sanity/src/core/preview/utils/replayLatest.test.ts create mode 100644 packages/sanity/src/core/preview/utils/shareReplayLatest.ts diff --git a/packages/sanity/package.json b/packages/sanity/package.json index a4240242c5e..be89f65223d 100644 --- a/packages/sanity/package.json +++ b/packages/sanity/package.json @@ -252,6 +252,7 @@ "rimraf": "^3.0.2", "rxjs": "^7.8.0", "rxjs-exhaustmap-with-trailing": "^2.1.1", + "rxjs-mergemap-array": "^0.1.0", "sanity-diff-patch": "^3.0.2", "scroll-into-view-if-needed": "^3.0.3", "semver": "^7.3.5", diff --git a/packages/sanity/src/core/preview/availability.ts b/packages/sanity/src/core/preview/availability.ts index e77537678e8..acabe99a4b6 100644 --- a/packages/sanity/src/core/preview/availability.ts +++ b/packages/sanity/src/core/preview/availability.ts @@ -67,17 +67,21 @@ function mutConcat(array: T[], chunks: T[]) { return array } -export function create_preview_availability( +export function createPreviewAvailabilityObserver( versionedClient: SanityClient, observePaths: ObservePathsFn, ): { - observeDocumentPairAvailability(id: string): Observable + observeDocumentPairAvailability( + id: string, + options?: {version?: string}, + ): Observable } { /** * Returns an observable of metadata for a given drafts model document */ function observeDocumentPairAvailability( id: string, + {version}: {version?: string} = {}, ): Observable { const draftId = getDraftId(id) const publishedId = getPublishedId(id) diff --git a/packages/sanity/src/core/preview/constants.tsx b/packages/sanity/src/core/preview/constants.tsx index a21b6825738..cd3785e79ae 100644 --- a/packages/sanity/src/core/preview/constants.tsx +++ b/packages/sanity/src/core/preview/constants.tsx @@ -4,6 +4,12 @@ import {type PreviewValue} from '@sanity/types' export const INCLUDE_FIELDS_QUERY = ['_id', '_rev', '_type'] export const INCLUDE_FIELDS = [...INCLUDE_FIELDS_QUERY, '_key'] +/** + * How long to wait after the last subscriber has unsubscribed before resetting the observable and disconnecting the listener + * We want to keep the listener alive for a short while after the last subscriber has unsubscribed to avoid unnecessary reconnects + */ +export const LISTENER_RESET_DELAY = 2000 + export const AVAILABILITY_READABLE = { available: true, reason: 'READABLE', diff --git a/packages/sanity/src/core/preview/createGlobalListener.ts b/packages/sanity/src/core/preview/createGlobalListener.ts new file mode 100644 index 00000000000..7be43f93e31 --- /dev/null +++ b/packages/sanity/src/core/preview/createGlobalListener.ts @@ -0,0 +1,32 @@ +import {type SanityClient} from '@sanity/client' +import {timer} from 'rxjs' + +import {LISTENER_RESET_DELAY} from './constants' +import {shareReplayLatest} from './utils/shareReplayLatest' + +/** + * @internal + * Creates a listener that will emit 'welcome' for all new subscribers immediately, and thereafter emit at every mutation event + */ +export function createGlobalListener(client: SanityClient) { + return client + .listen( + '*[!(_id in path("_.**"))]', + {}, + { + events: ['welcome', 'mutation', 'reconnect'], + includeResult: false, + includePreviousRevision: false, + includeMutations: false, + visibility: 'query', + effectFormat: 'mendoza', + tag: 'preview.global', + }, + ) + .pipe( + shareReplayLatest({ + predicate: (event) => event.type === 'welcome' || event.type === 'reconnect', + resetOnRefCountZero: () => timer(LISTENER_RESET_DELAY), + }), + ) +} diff --git a/packages/sanity/src/core/preview/createObserveDocument.ts b/packages/sanity/src/core/preview/createObserveDocument.ts new file mode 100644 index 00000000000..6b384a7fbf8 --- /dev/null +++ b/packages/sanity/src/core/preview/createObserveDocument.ts @@ -0,0 +1,87 @@ +import {type MutationEvent, type SanityClient, type WelcomeEvent} from '@sanity/client' +import {type SanityDocument} from '@sanity/types' +import {memoize, uniq} from 'lodash' +import {EMPTY, finalize, type Observable, of} from 'rxjs' +import {concatMap, map, scan, shareReplay} from 'rxjs/operators' + +import {type ApiConfig} from './types' +import {applyMendozaPatch} from './utils/applyMendozaPatch' +import {debounceCollect} from './utils/debounceCollect' + +export function createObserveDocument({ + mutationChannel, + client, +}: { + client: SanityClient + mutationChannel: Observable +}) { + const getBatchFetcher = memoize( + function getBatchFetcher(apiConfig: {dataset: string; projectId: string}) { + const _client = client.withConfig(apiConfig) + + function batchFetchDocuments(ids: [string][]) { + return _client.observable + .fetch(`*[_id in $ids]`, {ids: uniq(ids.flat())}, {tag: 'preview.observe-document'}) + .pipe( + // eslint-disable-next-line max-nested-callbacks + map((result) => ids.map(([id]) => result.find((r: {_id: string}) => r._id === id))), + ) + } + return debounceCollect(batchFetchDocuments, 100) + }, + (apiConfig) => apiConfig.dataset + apiConfig.projectId, + ) + + const MEMO: Record> = {} + + function observeDocument(id: string, apiConfig?: ApiConfig) { + const _apiConfig = apiConfig || { + dataset: client.config().dataset!, + projectId: client.config().projectId!, + } + const fetchDocument = getBatchFetcher(_apiConfig) + return mutationChannel.pipe( + concatMap((event) => { + if (event.type === 'welcome') { + return fetchDocument(id).pipe(map((document) => ({type: 'sync' as const, document}))) + } + return event.documentId === id ? of(event) : EMPTY + }), + scan((current: SanityDocument | undefined, event) => { + if (event.type === 'sync') { + return event.document + } + if (event.type === 'mutation') { + return applyMutationEvent(current, event) + } + //@ts-expect-error - this should never happen + throw new Error(`Unexpected event type: "${event.type}"`) + }, undefined), + ) + } + return function memoizedObserveDocument(id: string, apiConfig?: ApiConfig) { + const key = apiConfig ? `${id}-${JSON.stringify(apiConfig)}` : id + if (!(key in MEMO)) { + MEMO[key] = observeDocument(id, apiConfig).pipe( + finalize(() => delete MEMO[key]), + shareReplay({bufferSize: 1, refCount: true}), + ) + } + return MEMO[key] + } +} + +function applyMutationEvent(current: SanityDocument | undefined, event: MutationEvent) { + if (event.previousRev !== current?._rev) { + console.warn('Document out of sync, skipping mutation') + return current + } + if (!event.effects) { + throw new Error( + 'Mutation event is missing effects. Is the listener set up with effectFormat=mendoza?', + ) + } + const next = applyMendozaPatch(current, event.effects.apply) + // next will be undefined in case of deletion + return next ? {...next, _rev: event.resultRev} : undefined +} diff --git a/packages/sanity/src/core/preview/createPathObserver.ts b/packages/sanity/src/core/preview/createPathObserver.ts index 773a0338434..0c006b9af9f 100644 --- a/packages/sanity/src/core/preview/createPathObserver.ts +++ b/packages/sanity/src/core/preview/createPathObserver.ts @@ -119,8 +119,13 @@ function normalizePaths(path: (FieldName | PreviewPath)[]): PreviewPath[] { ) } -export function createPathObserver(context: {observeFields: ObserveFieldsFn}) { - const {observeFields} = context +/** + * Creates a function that allows observing nested paths on a document. + * If the path includes a reference, the reference will be "followed", allowing for selecting paths within the referenced document. + * @param options - Options - Requires a function that can observe fields on a document + * */ +export function createPathObserver(options: {observeFields: ObserveFieldsFn}) { + const {observeFields} = options return { observePaths( diff --git a/packages/sanity/src/core/preview/createPreviewObserver.ts b/packages/sanity/src/core/preview/createPreviewObserver.ts index 08b60271829..3b3338adef6 100644 --- a/packages/sanity/src/core/preview/createPreviewObserver.ts +++ b/packages/sanity/src/core/preview/createPreviewObserver.ts @@ -33,16 +33,20 @@ export function createPreviewObserver(context: { }): ( value: Previewable, type: PreviewableType, - viewOptions?: PrepareViewOptions, - apiConfig?: ApiConfig, + options?: { + viewOptions?: PrepareViewOptions + apiConfig?: ApiConfig + }, ) => Observable { const {observeDocumentTypeFromId, observePaths} = context return function observeForPreview( value: Previewable, type: PreviewableType, - viewOptions?: PrepareViewOptions, - apiConfig?: ApiConfig, + options?: { + viewOptions?: PrepareViewOptions + apiConfig?: ApiConfig + }, ): Observable { if (isCrossDatasetReferenceSchemaType(type)) { // if the value is of type crossDatasetReference, but has no _ref property, we cannot prepare any value for the preview @@ -57,7 +61,7 @@ export function createPreviewObserver(context: { switchMap((typeName) => { if (typeName) { const refType = type.to.find((toType) => toType.type === typeName) - return observeForPreview(value, refType as any, {}, refApiConfig) + return observeForPreview(value, refType as any, {apiConfig: refApiConfig}) } return observableOf({snapshot: undefined}) }), @@ -88,10 +92,10 @@ export function createPreviewObserver(context: { } const paths = getPreviewPaths(type.preview) if (paths) { - return observePaths(value, paths, apiConfig).pipe( + return observePaths(value, paths, options?.apiConfig).pipe( map((snapshot) => ({ type: type, - snapshot: snapshot && prepareForPreview(snapshot, type as any, viewOptions), + snapshot: snapshot && prepareForPreview(snapshot, type as any, options?.viewOptions), })), ) } @@ -103,7 +107,9 @@ export function createPreviewObserver(context: { return observableOf({ type, snapshot: - value && isRecord(value) ? invokePrepare(type, value, viewOptions).returnValue : null, + value && isRecord(value) + ? invokePrepare(type, value, options?.viewOptions).returnValue + : null, }) } } diff --git a/packages/sanity/src/core/preview/documentPair.ts b/packages/sanity/src/core/preview/documentPair.ts index d797a147141..6c8e682258c 100644 --- a/packages/sanity/src/core/preview/documentPair.ts +++ b/packages/sanity/src/core/preview/documentPair.ts @@ -4,7 +4,7 @@ import {combineLatest, type Observable, of} from 'rxjs' import {map, switchMap} from 'rxjs/operators' import {getIdPair, isRecord} from '../util' -import {create_preview_availability} from './availability' +import {createPreviewAvailabilityObserver} from './availability' import {type DraftsModelDocument, type ObservePathsFn, type PreviewPath} from './types' export function create_preview_documentPair( @@ -16,7 +16,7 @@ export function create_preview_documentPair( paths: PreviewPath[], ) => Observable> } { - const {observeDocumentPairAvailability} = create_preview_availability( + const {observeDocumentPairAvailability} = createPreviewAvailabilityObserver( versionedClient, observePaths, ) diff --git a/packages/sanity/src/core/preview/documentPreviewStore.ts b/packages/sanity/src/core/preview/documentPreviewStore.ts index e715cee925d..7f4343819ef 100644 --- a/packages/sanity/src/core/preview/documentPreviewStore.ts +++ b/packages/sanity/src/core/preview/documentPreviewStore.ts @@ -1,14 +1,22 @@ -import {type SanityClient} from '@sanity/client' +import { + type MutationEvent, + type QueryParams, + type SanityClient, + type WelcomeEvent, +} from '@sanity/client' import {type PrepareViewOptions, type SanityDocument} from '@sanity/types' -import {type Observable} from 'rxjs' -import {distinctUntilChanged, map} from 'rxjs/operators' +import {combineLatest, type Observable} from 'rxjs' +import {distinctUntilChanged, filter, map} from 'rxjs/operators' import {isRecord} from '../util' -import {create_preview_availability} from './availability' +import {createPreviewAvailabilityObserver} from './availability' +import {createGlobalListener} from './createGlobalListener' +import {createObserveDocument} from './createObserveDocument' import {createPathObserver} from './createPathObserver' import {createPreviewObserver} from './createPreviewObserver' import {create_preview_documentPair} from './documentPair' -import {create_preview_observeFields} from './observeFields' +import {createDocumentIdSetObserver, type DocumentIdSetObserverState} from './liveDocumentIdSet' +import {createObserveFields} from './observeFields' import { type ApiConfig, type DraftsModelDocument, @@ -26,11 +34,15 @@ import { export type ObserveForPreviewFn = ( value: Previewable, type: PreviewableType, - viewOptions?: PrepareViewOptions, - apiConfig?: ApiConfig, + options?: {viewOptions?: PrepareViewOptions; apiConfig?: ApiConfig}, ) => Observable /** + * The document preview store supports subscribing to content for previewing purposes. + * Documents observed by this store will be kept in sync and receive real-time updates from all collaborators, + * but has no support for optimistic updates, so any local edits will require a server round-trip before becoming visible, + * which means this store is less suitable for real-time editing scenarios. + * * @hidden * @beta */ export interface DocumentPreviewStore { @@ -45,12 +57,51 @@ export interface DocumentPreviewStore { */ unstable_observeDocumentPairAvailability: ( id: string, + options?: {version?: string}, ) => Observable unstable_observePathsDocumentPair: ( id: string, paths: PreviewPath[], + options?: {version?: string}, ) => Observable> + + /** + * Observes a set of document IDs that matches the given groq-filter. The document ids are returned in ascending order and will update in real-time + * Whenever a document appears or disappears from the set, a new array with the updated set of IDs will be pushed to subscribers. + * The query is performed once, initially, and thereafter the set of ids are patched based on the `appear` and `disappear` + * transitions on the received listener events. + * This provides a lightweight way of subscribing to a list of ids for simple cases where you just want to subscribe to a set of documents ids + * that matches a particular filter. + * @hidden + * @beta + * @param filter - A groq filter to use for the document set + * @param params - Parameters to use with the groq filter + * @param options - Options for the observer + */ + unstable_observeDocumentIdSet: ( + filter: string, + params?: QueryParams, + options?: { + /** + * Where to insert new items into the set. Defaults to 'sorted' which is based on the lexicographic order of the id + */ + insert?: 'sorted' | 'prepend' | 'append' + }, + ) => Observable + + /** + * Observe a complete document with the given ID + * @hidden + * @beta + */ + unstable_observeDocument: (id: string) => Observable + /** + * Observe a list of complete documents with the given IDs + * @hidden + * @beta + */ + unstable_observeDocuments: (ids: string[]) => Observable<(SanityDocument | undefined)[]> } /** @internal */ @@ -63,18 +114,19 @@ export function createDocumentPreviewStore({ client, }: DocumentPreviewStoreOptions): DocumentPreviewStore { const versionedClient = client.withConfig({apiVersion: '1'}) + const globalListener = createGlobalListener(versionedClient).pipe( + filter( + (event): event is MutationEvent | WelcomeEvent => + // ignore reconnect events for now until we know that downstream consumers can handle them + event.type === 'mutation' || event.type === 'welcome', + ), + ) + const invalidationChannel = globalListener.pipe( + map((event) => (event.type === 'welcome' ? {type: 'connected' as const} : event)), + ) - // NOTE: this is workaroudn for circumventing a circular dependency between `observePaths` and - // `observeFields`. - // eslint-disable-next-line camelcase - const __proxy_observePaths: ObservePathsFn = (value, paths, apiConfig) => { - return observePaths(value, paths, apiConfig) - } - - const {observeFields} = create_preview_observeFields({ - observePaths: __proxy_observePaths, - versionedClient, - }) + const observeDocument = createObserveDocument({client, mutationChannel: globalListener}) + const observeFields = createObserveFields({client: versionedClient, invalidationChannel}) const {observePaths} = createPathObserver({observeFields}) @@ -82,27 +134,34 @@ export function createDocumentPreviewStore({ id: string, apiConfig?: ApiConfig, ): Observable { - return observePaths({_type: 'reference', _ref: id}, ['_type'], apiConfig).pipe( + return observePaths({_type: 'reference', _ref: id}, ['_type', '_version'], apiConfig).pipe( map((res) => (isRecord(res) && typeof res._type === 'string' ? res._type : undefined)), distinctUntilChanged(), ) } - // const {createPreviewObserver} = create_preview_createPreviewObserver(observeDocumentTypeFromId) + const observeDocumentIdSet = createDocumentIdSetObserver( + versionedClient.withConfig({apiVersion: '2024-07-22'}), + ) + const observeForPreview = createPreviewObserver({observeDocumentTypeFromId, observePaths}) - const {observeDocumentPairAvailability} = create_preview_availability( + const {observeDocumentPairAvailability} = createPreviewAvailabilityObserver( versionedClient, observePaths, ) const {observePathsDocumentPair} = create_preview_documentPair(versionedClient, observePaths) // @todo: explain why the API is like this now, and that it should not be like this in the future! + return { observePaths, observeForPreview, observeDocumentTypeFromId, - // eslint-disable-next-line camelcase + unstable_observeDocumentIdSet: observeDocumentIdSet, + unstable_observeDocument: observeDocument, + unstable_observeDocuments: (ids: string[]) => + combineLatest(ids.map((id) => observeDocument(id))), unstable_observeDocumentPairAvailability: observeDocumentPairAvailability, unstable_observePathsDocumentPair: observePathsDocumentPair, } diff --git a/packages/sanity/src/core/preview/index.ts b/packages/sanity/src/core/preview/index.ts index 5a283024f4d..7e9a3043d57 100644 --- a/packages/sanity/src/core/preview/index.ts +++ b/packages/sanity/src/core/preview/index.ts @@ -3,6 +3,7 @@ export * from './components/PreviewLoader' export * from './components/SanityDefaultPreview' export * from './documentPreviewStore' export * from './types' +export {useObserveDocument as unstable_useObserveDocument} from './useObserveDocument' export * from './useValuePreview' export {getPreviewPaths} from './utils/getPreviewPaths' export {getPreviewStateObservable} from './utils/getPreviewStateObservable' diff --git a/packages/sanity/src/core/preview/liveDocumentIdSet.ts b/packages/sanity/src/core/preview/liveDocumentIdSet.ts new file mode 100644 index 00000000000..49d8401cde1 --- /dev/null +++ b/packages/sanity/src/core/preview/liveDocumentIdSet.ts @@ -0,0 +1,112 @@ +import {type QueryParams, type SanityClient} from '@sanity/client' +import {sortedIndex} from 'lodash' +import {of} from 'rxjs' +import {distinctUntilChanged, filter, map, mergeMap, scan, tap} from 'rxjs/operators' + +export type DocumentIdSetObserverState = { + status: 'reconnecting' | 'connected' + documentIds: string[] +} + +interface LiveDocumentIdSetOptions { + insert?: 'sorted' | 'prepend' | 'append' +} + +export function createDocumentIdSetObserver(client: SanityClient) { + return function observe( + queryFilter: string, + params?: QueryParams, + options: LiveDocumentIdSetOptions = {}, + ) { + const {insert: insertOption = 'sorted'} = options + + const query = `*[${queryFilter}]._id` + function fetchFilter() { + return client.observable + .fetch(query, params, { + tag: 'preview.observe-document-set.fetch', + }) + .pipe( + tap((result) => { + if (!Array.isArray(result)) { + throw new Error( + `Expected query to return array of documents, but got ${typeof result}`, + ) + } + }), + ) + } + return client.observable + .listen(query, params, { + visibility: 'transaction', + events: ['welcome', 'mutation', 'reconnect'], + includeResult: false, + includeMutations: false, + tag: 'preview.observe-document-set.listen', + }) + .pipe( + mergeMap((event) => { + return event.type === 'welcome' + ? fetchFilter().pipe(map((result) => ({type: 'fetch' as const, result}))) + : of(event) + }), + scan( + ( + state: DocumentIdSetObserverState | undefined, + event, + ): DocumentIdSetObserverState | undefined => { + if (event.type === 'reconnect') { + return { + documentIds: state?.documentIds || [], + ...state, + status: 'reconnecting' as const, + } + } + if (event.type === 'fetch') { + return {...state, status: 'connected' as const, documentIds: event.result} + } + if (event.type === 'mutation') { + if (event.transition === 'update') { + // ignore updates, as we're only interested in documents appearing and disappearing from the set + return state + } + if (event.transition === 'appear') { + return { + status: 'connected', + documentIds: insert(state?.documentIds || [], event.documentId, insertOption), + } + } + if (event.transition === 'disappear') { + return { + status: 'connected', + documentIds: state?.documentIds + ? state.documentIds.filter((id) => id !== event.documentId) + : [], + } + } + } + return state + }, + undefined, + ), + distinctUntilChanged(), + filter( + (state: DocumentIdSetObserverState | undefined): state is DocumentIdSetObserverState => + state !== undefined, + ), + ) + } +} + +function insert(array: T[], element: T, strategy: 'sorted' | 'prepend' | 'append') { + let index + if (strategy === 'prepend') { + index = 0 + } else if (strategy === 'append') { + index = array.length + } else { + index = sortedIndex(array, element) + } + + return array.toSpliced(index, 0, element) +} diff --git a/packages/sanity/src/core/preview/observeFields.ts b/packages/sanity/src/core/preview/observeFields.ts index b86a038da20..de9ea564b53 100644 --- a/packages/sanity/src/core/preview/observeFields.ts +++ b/packages/sanity/src/core/preview/observeFields.ts @@ -5,7 +5,6 @@ import { concat, defer, EMPTY, - from, fromEvent, merge, type Observable, @@ -29,7 +28,7 @@ import { type ApiConfig, type FieldName, type Id, - type ObservePathsFn, + type InvalidationChannelEvent, type PreviewPath, type Selection, } from './types' @@ -48,58 +47,19 @@ type Cache = { [id: string]: CachedFieldObserver[] } -export function create_preview_observeFields(context: { - observePaths: ObservePathsFn - versionedClient: SanityClient +/** + * Creates a function that allows observing individual fields on a document. + * It will automatically debounce and batch requests, and maintain an in-memory cache of the latest field values + * @param options - Options to use when creating the observer + */ +export function createObserveFields(options: { + client: SanityClient + invalidationChannel: Observable }) { - const {observePaths, versionedClient} = context - - let _globalListener: any - - const getGlobalEvents = () => { - if (!_globalListener) { - const allEvents$ = from( - versionedClient.listen( - '*[!(_id in path("_.**"))]', - {}, - { - events: ['welcome', 'mutation'], - includeResult: false, - visibility: 'query', - tag: 'preview.global', - }, - ), - ).pipe(share()) - - // This is a stream of welcome events from the server, each telling us that we have established listener connection - // We map these to snapshot fetch/sync. It is good to wait for the first welcome event before fetching any snapshots as, we may miss - // events that happens in the time period after initial fetch and before the listener is established. - const welcome$ = allEvents$.pipe( - filter((event: any) => event.type === 'welcome'), - shareReplay({refCount: true, bufferSize: 1}), - ) - - // This will keep the listener active forever and in turn reduce the number of initial fetches - // as less 'welcome' events will be emitted. - // @todo: see if we can delay unsubscribing or connect with some globally defined shared listener - welcome$.subscribe() - - const mutations$ = allEvents$.pipe(filter((event: any) => event.type === 'mutation')) - - _globalListener = { - welcome$, - mutations$, - } - } - - return _globalListener - } - + const {client: currentDatasetClient, invalidationChannel} = options function listen(id: Id) { - const globalEvents = getGlobalEvents() - return merge( - globalEvents.welcome$, - globalEvents.mutations$.pipe(filter((event: any) => event.documentId === id)), + return invalidationChannel.pipe( + filter((event) => event.type === 'connected' || event.documentId === id), ) } @@ -112,13 +72,20 @@ export function create_preview_observeFields(context: { } } - const fetchDocumentPathsFast = debounceCollect(fetchAllDocumentPathsWith(versionedClient), 100) - const fetchDocumentPathsSlow = debounceCollect(fetchAllDocumentPathsWith(versionedClient), 1000) + const fetchDocumentPathsFast = debounceCollect( + fetchAllDocumentPathsWith(currentDatasetClient), + 100, + ) + + const fetchDocumentPathsSlow = debounceCollect( + fetchAllDocumentPathsWith(currentDatasetClient), + 1000, + ) function currentDatasetListenFields(id: Id, fields: PreviewPath[]) { return listen(id).pipe( - switchMap((event: any) => { - if (event.type === 'welcome' || event.visibility === 'query') { + switchMap((event) => { + if (event.type === 'connected' || event.visibility === 'query') { return fetchDocumentPathsFast(id, fields as any).pipe( mergeMap((result) => { return concat( @@ -137,17 +104,11 @@ export function create_preview_observeFields(context: { ) } - // keep for debugging purposes for now - // function fetchDocumentPaths(id, selection) { - // return client.observable.fetch(`*[_id==$id]{_id,_type,${selection.join(',')}}`, {id}) - // .map(result => result[0]) - // } - const CACHE: Cache = {} // todo: use a LRU cache instead (e.g. hashlru or quick-lru) const getBatchFetcherForDataset = memoize( function getBatchFetcherForDataset(apiConfig: ApiConfig) { - const client = versionedClient.withConfig(apiConfig) + const client = currentDatasetClient.withConfig(apiConfig) const fetchAll = fetchAllDocumentPathsWith(client) return debounceCollect(fetchAll, 10) }, @@ -226,8 +187,7 @@ export function create_preview_observeFields(context: { ) } - // API - return {observeFields: cachedObserveFields} + return cachedObserveFields function pickFrom(objects: Record[], fields: string[]) { return [...INCLUDE_FIELDS, ...fields].reduce((result, fieldName) => { diff --git a/packages/sanity/src/core/preview/types.ts b/packages/sanity/src/core/preview/types.ts index c4adb85bab6..dbd82de69fa 100644 --- a/packages/sanity/src/core/preview/types.ts +++ b/packages/sanity/src/core/preview/types.ts @@ -112,6 +112,17 @@ export interface DraftsModelDocument + documentPreviewStore.unstable_observeDocumentIdSet(filter, params, options).pipe( + scan( + (currentState: LiveDocumentSetState, nextState) => ({ + ...currentState, + ...nextState, + }), + INITIAL_STATE, + ), + ), + [documentPreviewStore, filter, params, options], + ) + return useObservable(observable, INITIAL_STATE) +} diff --git a/packages/sanity/src/core/preview/useLiveDocumentSet.ts b/packages/sanity/src/core/preview/useLiveDocumentSet.ts new file mode 100644 index 00000000000..16c5c27be24 --- /dev/null +++ b/packages/sanity/src/core/preview/useLiveDocumentSet.ts @@ -0,0 +1,34 @@ +import {type QueryParams} from '@sanity/client' +import {type SanityDocument} from '@sanity/types' +import {useMemo} from 'react' +import {useObservable} from 'react-rx' +import {map} from 'rxjs/operators' +import {mergeMapArray} from 'rxjs-mergemap-array' + +import {useDocumentPreviewStore} from '../store' + +const INITIAL_VALUE = {loading: true, documents: []} + +/** + * @internal + * @beta + * + * Observes a set of documents matching the filter and returns an array of complete documents + * A new array will be pushed whenever a document in the set changes + * Document ids are returned in ascending order + * Any sorting beyond that must happen client side + */ +export function useLiveDocumentSet( + groqFilter: string, + params?: QueryParams, +): {loading: boolean; documents: SanityDocument[]} { + const documentPreviewStore = useDocumentPreviewStore() + const observable = useMemo(() => { + return documentPreviewStore.unstable_observeDocumentIdSet(groqFilter, params).pipe( + map((state) => (state.documentIds || []) as string[]), + mergeMapArray((id) => documentPreviewStore.unstable_observeDocument(id)), + map((docs) => ({loading: false, documents: docs as SanityDocument[]})), + ) + }, [documentPreviewStore, groqFilter, params]) + return useObservable(observable, INITIAL_VALUE) +} diff --git a/packages/sanity/src/core/preview/useObserveDocument.ts b/packages/sanity/src/core/preview/useObserveDocument.ts new file mode 100644 index 00000000000..7d386265984 --- /dev/null +++ b/packages/sanity/src/core/preview/useObserveDocument.ts @@ -0,0 +1,32 @@ +import {type SanityDocument} from '@sanity/types' +import {useMemo} from 'react' +import {useObservable} from 'react-rx' +import {map} from 'rxjs/operators' + +import {useDocumentPreviewStore} from '../store/_legacy/datastores' + +const INITIAL_STATE = {loading: true, document: null} + +/** + * @internal + * @beta + * + * Observes a document by its ID and returns the document and loading state + * it will listen to the document changes. + */ +export function useObserveDocument( + documentId: string, +): { + document: T | null + loading: boolean +} { + const documentPreviewStore = useDocumentPreviewStore() + const observable = useMemo( + () => + documentPreviewStore + .unstable_observeDocument(documentId) + .pipe(map((document) => ({loading: false, document: document as T}))), + [documentId, documentPreviewStore], + ) + return useObservable(observable, INITIAL_STATE) +} diff --git a/packages/sanity/src/core/preview/useValuePreview.ts b/packages/sanity/src/core/preview/useValuePreview.ts index 5b70deb514f..863dc2a54e7 100644 --- a/packages/sanity/src/core/preview/useValuePreview.ts +++ b/packages/sanity/src/core/preview/useValuePreview.ts @@ -1,7 +1,7 @@ import {type PreviewValue, type SchemaType, type SortOrdering} from '@sanity/types' import {useMemo} from 'react' import {useObservable} from 'react-rx' -import {of} from 'rxjs' +import {type Observable, of} from 'rxjs' import {catchError, map} from 'rxjs/operators' import {useDocumentPreviewStore} from '../store' @@ -32,13 +32,16 @@ function useDocumentPreview(props: { }): State { const {enabled = true, ordering, schemaType, value: previewValue} = props || {} const {observeForPreview} = useDocumentPreviewStore() - const observable = useMemo(() => { + const observable = useMemo>(() => { if (!enabled || !previewValue || !schemaType) return of(PENDING_STATE) - return observeForPreview(previewValue as Previewable, schemaType, {ordering}).pipe( + return observeForPreview(previewValue as Previewable, schemaType, { + viewOptions: {ordering: ordering}, + }).pipe( map((event) => ({isLoading: false, value: event.snapshot || undefined})), catchError((error) => of({isLoading: false, error})), ) - }, [enabled, observeForPreview, ordering, previewValue, schemaType]) + }, [enabled, previewValue, schemaType, observeForPreview, ordering]) + return useObservable(observable, INITIAL_STATE) } diff --git a/packages/sanity/src/core/preview/utils/applyMendozaPatch.ts b/packages/sanity/src/core/preview/utils/applyMendozaPatch.ts new file mode 100644 index 00000000000..0c1be69450c --- /dev/null +++ b/packages/sanity/src/core/preview/utils/applyMendozaPatch.ts @@ -0,0 +1,18 @@ +import {type SanityDocument} from '@sanity/types' +import {applyPatch, type RawPatch} from 'mendoza' + +function omitRev(document: SanityDocument | undefined) { + if (document === undefined) { + return undefined + } + const {_rev, ...doc} = document + return doc +} + +export function applyMendozaPatch( + document: SanityDocument | undefined, + patch: RawPatch, +): SanityDocument | undefined { + const next = applyPatch(omitRev(document), patch) + return next === null ? undefined : next +} diff --git a/packages/sanity/src/core/preview/utils/replayLatest.test.ts b/packages/sanity/src/core/preview/utils/replayLatest.test.ts new file mode 100644 index 00000000000..03796d77b38 --- /dev/null +++ b/packages/sanity/src/core/preview/utils/replayLatest.test.ts @@ -0,0 +1,37 @@ +import {expect, test} from '@jest/globals' +import {concat, from, lastValueFrom, of, share, timer} from 'rxjs' +import {concatMap, delay, mergeMap, take, toArray} from 'rxjs/operators' + +import {shareReplayLatest} from './shareReplayLatest' + +test('replayLatest() replays matching value to new subscribers', async () => { + const observable = from(['foo', 'bar', 'baz']).pipe( + concatMap((value) => of(value).pipe(delay(100))), + share(), + shareReplayLatest((v) => v === 'foo'), + ) + + const result = observable.pipe( + mergeMap((value) => + value === 'bar' ? concat(of(value), observable.pipe(take(1))) : of(value), + ), + toArray(), + ) + expect(await lastValueFrom(result)).toEqual(['foo', 'bar', 'foo', 'baz']) +}) + +test('replayLatest() doesnt keep the replay value after resets', async () => { + const observable = timer(0, 10).pipe( + shareReplayLatest({ + resetOnRefCountZero: true, + resetOnComplete: true, + predicate: (v) => v < 2, + }), + ) + + const result = observable.pipe(take(5), toArray()) + expect(await lastValueFrom(result)).toEqual([0, 1, 2, 3, 4]) + + const resultAfter = observable.pipe(take(5), toArray()) + expect(await lastValueFrom(resultAfter)).toEqual([0, 1, 2, 3, 4]) +}) diff --git a/packages/sanity/src/core/preview/utils/shareReplayLatest.ts b/packages/sanity/src/core/preview/utils/shareReplayLatest.ts new file mode 100644 index 00000000000..7707c410fe7 --- /dev/null +++ b/packages/sanity/src/core/preview/utils/shareReplayLatest.ts @@ -0,0 +1,70 @@ +import { + finalize, + merge, + type MonoTypeOperatorFunction, + Observable, + share, + type ShareConfig, + tap, +} from 'rxjs' + +export type ShareReplayLatestConfig = ShareConfig & {predicate: (value: T) => boolean} + +/** + * A variant of share that takes a predicate function to determine which value to replay to new subscribers + * @param predicate - Predicate function to determine which value to replay + */ +export function shareReplayLatest(predicate: (value: T) => boolean): MonoTypeOperatorFunction + +/** + * A variant of share that takes a predicate function to determine which value to replay to new subscribers + * @param config - ShareConfig with additional predicate function + */ +export function shareReplayLatest( + config: ShareReplayLatestConfig, +): MonoTypeOperatorFunction + +/** + * A variant of share that takes a predicate function to determine which value to replay to new subscribers + * @param configOrPredicate - Predicate function to determine which value to replay + * @param config - Optional ShareConfig + */ +export function shareReplayLatest( + configOrPredicate: ShareReplayLatestConfig | ShareReplayLatestConfig['predicate'], + config?: ShareConfig, +) { + return _shareReplayLatest( + typeof configOrPredicate === 'function' + ? {predicate: configOrPredicate, ...config} + : configOrPredicate, + ) +} +function _shareReplayLatest(config: ShareReplayLatestConfig): MonoTypeOperatorFunction { + return (source: Observable) => { + let latest: T | undefined + let emitted = false + + const {predicate, ...shareConfig} = config + + const wrapped = source.pipe( + tap((value) => { + if (config.predicate(value)) { + emitted = true + latest = value + } + }), + finalize(() => { + emitted = false + latest = undefined + }), + share(shareConfig), + ) + const emitLatest = new Observable((subscriber) => { + if (emitted) { + subscriber.next(latest) + } + subscriber.complete() + }) + return merge(wrapped, emitLatest) + } +} From 9673eafc377044c2b2a260eca0933cfca499c918 Mon Sep 17 00:00:00 2001 From: Jordan Lawrence Date: Mon, 19 Aug 2024 16:41:45 +0100 Subject: [PATCH 2/2] chore(core): introduce improvements to efficiency of listeners to preview store --- pnpm-lock.yaml | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index d00dbd2e45f..67c2bad7851 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1659,6 +1659,9 @@ importers: rxjs-exhaustmap-with-trailing: specifier: ^2.1.1 version: 2.1.1(rxjs@7.8.1) + rxjs-mergemap-array: + specifier: ^0.1.0 + version: 0.1.0(rxjs@7.8.1) sanity-diff-patch: specifier: ^3.0.2 version: 3.0.2 @@ -10016,6 +10019,12 @@ packages: peerDependencies: rxjs: 7.x + rxjs-mergemap-array@0.1.0: + resolution: {integrity: sha512-19fXxPXN4X8LPWu7fg/nyX+nr0G97qSNXhEvF32cdgWuoyUVQ4MrFr+UL4HGip6iO5kbZOL4puAjPeQ/D5qSlA==} + engines: {node: '>=18.0.0'} + peerDependencies: + rxjs: 7.x + rxjs@6.6.7: resolution: {integrity: sha512-hTdwr+7yYNIT5n4AMYp85KA6yw2Va0FLa3Rguvbpa4W3I5xynaBZo41cM3XM+4Q6fRMj3sBYIR1VAmZMXYJvRQ==} engines: {npm: '>=2.0.0'} @@ -21452,6 +21461,10 @@ snapshots: dependencies: rxjs: 7.8.1 + rxjs-mergemap-array@0.1.0(rxjs@7.8.1): + dependencies: + rxjs: 7.8.1 + rxjs@6.6.7: dependencies: tslib: 1.14.1