Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(core): introduce improvements to efficiency of listeners to preview store #7383

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/sanity/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@
"rimraf": "^3.0.2",
"rxjs": "^7.8.0",
"rxjs-exhaustmap-with-trailing": "^2.1.1",
"rxjs-mergemap-array": "^0.1.0",
"sanity-diff-patch": "^3.0.2",
"scroll-into-view-if-needed": "^3.0.3",
"semver": "^7.3.5",
Expand Down
8 changes: 6 additions & 2 deletions packages/sanity/src/core/preview/availability.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,21 @@ function mutConcat<T>(array: T[], chunks: T[]) {
return array
}

export function create_preview_availability(
export function createPreviewAvailabilityObserver(
versionedClient: SanityClient,
observePaths: ObservePathsFn,
): {
observeDocumentPairAvailability(id: string): Observable<DraftsModelDocumentAvailability>
observeDocumentPairAvailability(
id: string,
options?: {version?: string},
): Observable<DraftsModelDocumentAvailability>
} {
/**
* Returns an observable of metadata for a given drafts model document
*/
function observeDocumentPairAvailability(
id: string,
{version}: {version?: string} = {},
): Observable<DraftsModelDocumentAvailability> {
const draftId = getDraftId(id)
const publishedId = getPublishedId(id)
Expand Down
6 changes: 6 additions & 0 deletions packages/sanity/src/core/preview/constants.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ import {type PreviewValue} from '@sanity/types'
export const INCLUDE_FIELDS_QUERY = ['_id', '_rev', '_type']
export const INCLUDE_FIELDS = [...INCLUDE_FIELDS_QUERY, '_key']

/**
* How long to wait after the last subscriber has unsubscribed before resetting the observable and disconnecting the listener
* We want to keep the listener alive for a short while after the last subscriber has unsubscribed to avoid unnecessary reconnects
*/
export const LISTENER_RESET_DELAY = 2000

export const AVAILABILITY_READABLE = {
available: true,
reason: 'READABLE',
Expand Down
32 changes: 32 additions & 0 deletions packages/sanity/src/core/preview/createGlobalListener.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import {type SanityClient} from '@sanity/client'
import {timer} from 'rxjs'

import {LISTENER_RESET_DELAY} from './constants'
import {shareReplayLatest} from './utils/shareReplayLatest'

/**
* @internal
* Creates a listener that will emit 'welcome' for all new subscribers immediately, and thereafter emit at every mutation event
*/
export function createGlobalListener(client: SanityClient) {
return client
.listen(
'*[!(_id in path("_.**"))]',
{},
{
events: ['welcome', 'mutation', 'reconnect'],
includeResult: false,
includePreviousRevision: false,
includeMutations: false,
visibility: 'query',
effectFormat: 'mendoza',
tag: 'preview.global',
},
)
.pipe(
shareReplayLatest({
predicate: (event) => event.type === 'welcome' || event.type === 'reconnect',
resetOnRefCountZero: () => timer(LISTENER_RESET_DELAY),
}),
)
}
87 changes: 87 additions & 0 deletions packages/sanity/src/core/preview/createObserveDocument.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import {type MutationEvent, type SanityClient, type WelcomeEvent} from '@sanity/client'
import {type SanityDocument} from '@sanity/types'
import {memoize, uniq} from 'lodash'
import {EMPTY, finalize, type Observable, of} from 'rxjs'
import {concatMap, map, scan, shareReplay} from 'rxjs/operators'

import {type ApiConfig} from './types'
import {applyMendozaPatch} from './utils/applyMendozaPatch'
import {debounceCollect} from './utils/debounceCollect'

export function createObserveDocument({
mutationChannel,
client,
}: {
client: SanityClient
mutationChannel: Observable<WelcomeEvent | MutationEvent>
}) {
const getBatchFetcher = memoize(
function getBatchFetcher(apiConfig: {dataset: string; projectId: string}) {
const _client = client.withConfig(apiConfig)

function batchFetchDocuments(ids: [string][]) {
return _client.observable
.fetch(`*[_id in $ids]`, {ids: uniq(ids.flat())}, {tag: 'preview.observe-document'})
.pipe(
// eslint-disable-next-line max-nested-callbacks
map((result) => ids.map(([id]) => result.find((r: {_id: string}) => r._id === id))),
)
}
return debounceCollect(batchFetchDocuments, 100)
},
(apiConfig) => apiConfig.dataset + apiConfig.projectId,
)

const MEMO: Record<string, Observable<SanityDocument | undefined>> = {}

function observeDocument(id: string, apiConfig?: ApiConfig) {
const _apiConfig = apiConfig || {
dataset: client.config().dataset!,
projectId: client.config().projectId!,
}
const fetchDocument = getBatchFetcher(_apiConfig)
return mutationChannel.pipe(
concatMap((event) => {
if (event.type === 'welcome') {
return fetchDocument(id).pipe(map((document) => ({type: 'sync' as const, document})))
}
return event.documentId === id ? of(event) : EMPTY
}),
scan((current: SanityDocument | undefined, event) => {
if (event.type === 'sync') {
return event.document
}
if (event.type === 'mutation') {
return applyMutationEvent(current, event)
}
//@ts-expect-error - this should never happen
throw new Error(`Unexpected event type: "${event.type}"`)
}, undefined),
)
}
return function memoizedObserveDocument(id: string, apiConfig?: ApiConfig) {
const key = apiConfig ? `${id}-${JSON.stringify(apiConfig)}` : id
if (!(key in MEMO)) {
MEMO[key] = observeDocument(id, apiConfig).pipe(
finalize(() => delete MEMO[key]),
shareReplay({bufferSize: 1, refCount: true}),
)
}
return MEMO[key]
}
}

function applyMutationEvent(current: SanityDocument | undefined, event: MutationEvent) {
if (event.previousRev !== current?._rev) {
console.warn('Document out of sync, skipping mutation')
return current
}
if (!event.effects) {
throw new Error(
'Mutation event is missing effects. Is the listener set up with effectFormat=mendoza?',
)
}
const next = applyMendozaPatch(current, event.effects.apply)
// next will be undefined in case of deletion
return next ? {...next, _rev: event.resultRev} : undefined
}
9 changes: 7 additions & 2 deletions packages/sanity/src/core/preview/createPathObserver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,13 @@ function normalizePaths(path: (FieldName | PreviewPath)[]): PreviewPath[] {
)
}

export function createPathObserver(context: {observeFields: ObserveFieldsFn}) {
const {observeFields} = context
/**
* Creates a function that allows observing nested paths on a document.
* If the path includes a reference, the reference will be "followed", allowing for selecting paths within the referenced document.
* @param options - Options - Requires a function that can observe fields on a document
* */
export function createPathObserver(options: {observeFields: ObserveFieldsFn}) {
const {observeFields} = options

return {
observePaths(
Expand Down
22 changes: 14 additions & 8 deletions packages/sanity/src/core/preview/createPreviewObserver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,20 @@ export function createPreviewObserver(context: {
}): (
value: Previewable,
type: PreviewableType,
viewOptions?: PrepareViewOptions,
apiConfig?: ApiConfig,
options?: {
viewOptions?: PrepareViewOptions
apiConfig?: ApiConfig
},
) => Observable<PreparedSnapshot> {
const {observeDocumentTypeFromId, observePaths} = context

return function observeForPreview(
value: Previewable,
type: PreviewableType,
viewOptions?: PrepareViewOptions,
apiConfig?: ApiConfig,
options?: {
viewOptions?: PrepareViewOptions
apiConfig?: ApiConfig
},
): Observable<PreparedSnapshot> {
if (isCrossDatasetReferenceSchemaType(type)) {
// if the value is of type crossDatasetReference, but has no _ref property, we cannot prepare any value for the preview
Expand All @@ -57,7 +61,7 @@ export function createPreviewObserver(context: {
switchMap((typeName) => {
if (typeName) {
const refType = type.to.find((toType) => toType.type === typeName)
return observeForPreview(value, refType as any, {}, refApiConfig)
return observeForPreview(value, refType as any, {apiConfig: refApiConfig})
}
return observableOf({snapshot: undefined})
}),
Expand Down Expand Up @@ -88,10 +92,10 @@ export function createPreviewObserver(context: {
}
const paths = getPreviewPaths(type.preview)
if (paths) {
return observePaths(value, paths, apiConfig).pipe(
return observePaths(value, paths, options?.apiConfig).pipe(
map((snapshot) => ({
type: type,
snapshot: snapshot && prepareForPreview(snapshot, type as any, viewOptions),
snapshot: snapshot && prepareForPreview(snapshot, type as any, options?.viewOptions),
})),
)
}
Expand All @@ -103,7 +107,9 @@ export function createPreviewObserver(context: {
return observableOf({
type,
snapshot:
value && isRecord(value) ? invokePrepare(type, value, viewOptions).returnValue : null,
value && isRecord(value)
? invokePrepare(type, value, options?.viewOptions).returnValue
: null,
})
}
}
4 changes: 2 additions & 2 deletions packages/sanity/src/core/preview/documentPair.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {combineLatest, type Observable, of} from 'rxjs'
import {map, switchMap} from 'rxjs/operators'

import {getIdPair, isRecord} from '../util'
import {create_preview_availability} from './availability'
import {createPreviewAvailabilityObserver} from './availability'
import {type DraftsModelDocument, type ObservePathsFn, type PreviewPath} from './types'

export function create_preview_documentPair(
Expand All @@ -16,7 +16,7 @@ export function create_preview_documentPair(
paths: PreviewPath[],
) => Observable<DraftsModelDocument<T>>
} {
const {observeDocumentPairAvailability} = create_preview_availability(
const {observeDocumentPairAvailability} = createPreviewAvailabilityObserver(
versionedClient,
observePaths,
)
Expand Down
Loading
Loading