Skip to content

Commit

Permalink
Replace AbortSignal with "stop token" to rescue some aborting behavior (
Browse files Browse the repository at this point in the history
  • Loading branch information
cmdcolin authored Nov 19, 2024
1 parent 53da3f5 commit 1cccc87
Show file tree
Hide file tree
Showing 100 changed files with 1,201 additions and 1,427 deletions.
2 changes: 2 additions & 0 deletions config/jest/url.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
URL.createObjectURL = () => `${Math.random()}`
URL.revokeObjectURL = () => {}
53 changes: 34 additions & 19 deletions packages/core/assemblyManager/assembly.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,10 @@ async function loadRefNameMap(
assembly: Assembly,
adapterConfig: unknown,
options: BaseOptions,
signal?: AbortSignal,
stopToken?: string,
) {
const { sessionId } = options
await when(() => !!(assembly.regions && assembly.refNameAliases), {
signal,
name: 'when assembly ready',
})

Expand All @@ -71,7 +70,7 @@ async function loadRefNameMap(
'CoreGetRefNames',
{
adapterConfig,
signal,
stopToken,
...options,
},
{ timeout: 1000000 },
Expand Down Expand Up @@ -139,18 +138,14 @@ export default function assemblyFactory(
cache: new QuickLRU({ maxSize: 1000 }),

// @ts-expect-error
// TODO:ABORT (possible? desirable??)
async fill(
args: CacheData,
signal?: AbortSignal,
_stopToken?: string,
statusCallback?: (arg: string) => void,
) {
const { adapterConf, self, options } = args
return loadRefNameMap(
self,
adapterConf,
{ ...options, statusCallback },
signal,
)
return loadRefNameMap(self, adapterConf, { ...options, statusCallback })
},
})
return types
Expand All @@ -161,11 +156,29 @@ export default function assemblyFactory(
configuration: types.safeReference(assemblyConfigType),
})
.volatile(() => ({
/**
* #volatile
*/
error: undefined as unknown,
/**
* #volatile
*/
loadingP: undefined as Promise<void> | undefined,
/**
* #volatile
*/
volatileRegions: undefined as BasicRegion[] | undefined,
/**
* #volatile
*/
refNameAliases: undefined as RefNameAliases | undefined,
/**
* #volatile
*/
lowerCaseRefNameAliases: undefined as RefNameAliases | undefined,
/**
* #volatile
*/
cytobands: undefined as Feature[] | undefined,
}))
.views(self => ({
Expand All @@ -179,6 +192,8 @@ export default function assemblyFactory(
.views(self => ({
/**
* #getter
* this is a getter with a side effect of loading the data. not the best
* practice, but it helps to lazy load the assembly
*/
get initialized() {
// @ts-expect-error
Expand Down Expand Up @@ -216,7 +231,7 @@ export default function assemblyFactory(
return self.getConf('displayName')
},
/**
* #getter
* #method
*/
hasName(name: string) {
return this.allAliases.includes(name)
Expand Down Expand Up @@ -453,7 +468,7 @@ export default function assemblyFactory(
* #method
*/
getAdapterMapEntry(adapterConf: AdapterConf, options: BaseOptions) {
const { signal, statusCallback, ...rest } = options
const { stopToken, statusCallback, ...rest } = options
if (!options.sessionId) {
throw new Error('sessionId is required')
}
Expand All @@ -465,7 +480,7 @@ export default function assemblyFactory(
options: rest,
} as CacheData,

// signal intentionally not passed here, fixes issues like #2221.
// stopToken intentionally not passed here, fixes issues like #2221.
// alternative fix #2540 was proposed but non-working currently
undefined,
statusCallback,
Expand Down Expand Up @@ -504,11 +519,11 @@ export default function assemblyFactory(
async function getRefNameAliases({
config,
pluginManager,
signal,
stopToken,
}: {
config: AnyConfigurationModel
pluginManager: PluginManager
signal?: AbortSignal
stopToken?: string
}) {
const type = pluginManager.getAdapterType(config.type)!
const CLASS = await type.getAdapterClass()
Expand All @@ -517,7 +532,7 @@ async function getRefNameAliases({
undefined,
pluginManager,
) as BaseRefNameAliasAdapter
return adapter.getRefNameAliases({ signal })
return adapter.getRefNameAliases({ stopToken })
}

async function getCytobands({
Expand All @@ -538,16 +553,16 @@ async function getCytobands({
async function getAssemblyRegions({
config,
pluginManager,
signal,
stopToken,
}: {
config: AnyConfigurationModel
pluginManager: PluginManager
signal?: AbortSignal
stopToken?: string
}) {
const type = pluginManager.getAdapterType(config.type)!
const CLASS = await type.getAdapterClass()
const adapter = new CLASS(config, undefined, pluginManager) as RegionsAdapter
return adapter.getRegions({ signal })
return adapter.getRegions({ stopToken })
}

export type AssemblyModel = ReturnType<typeof assemblyFactory>
Expand Down
4 changes: 2 additions & 2 deletions packages/core/assemblyManager/assemblyManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ function assemblyManagerFactory(conf: IAnyType, pm: PluginManager) {
async getRefNameMapForAdapter(
adapterConf: AdapterConf,
assemblyName: string | undefined,
opts: { signal?: AbortSignal; sessionId: string },
opts: { stopToken?: string; sessionId: string },
) {
if (assemblyName) {
const asm = await this.waitForAssembly(assemblyName)
Expand All @@ -136,7 +136,7 @@ function assemblyManagerFactory(conf: IAnyType, pm: PluginManager) {
async getReverseRefNameMapForAdapter(
adapterConf: AdapterConf,
assemblyName: string | undefined,
opts: { signal?: AbortSignal; sessionId: string },
opts: { stopToken?: string; sessionId: string },
) {
if (assemblyName) {
const asm = await this.waitForAssembly(assemblyName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import { BaseAdapter } from './BaseAdapter'
import { BaseOptions } from './BaseOptions'
import { FeatureDensityStats } from './types'
import { ObservableCreate } from '../../util/rxjs'
import { checkAbortSignal, sum, max, min } from '../../util'
import { sum, max, min } from '../../util'
import { Feature } from '../../util/simpleFeature'
import { AugmentedRegion as Region } from '../../util/types'
import { blankStats, rectifyStats, scoresToStats } from '../../util/stats'
import { checkStopToken } from '../../util/stopToken'

/**
* Base class for feature adapters to extend. Defines some methods that
Expand Down Expand Up @@ -82,7 +83,7 @@ export abstract class BaseFeatureDataAdapter extends BaseAdapter {
public getFeaturesInRegion(region: Region, opts: BaseOptions = {}) {
return ObservableCreate<Feature>(async observer => {
const hasData = await this.hasDataForRefName(region.refName, opts)
checkAbortSignal(opts.signal)
checkStopToken(opts.stopToken)
if (!hasData) {
observer.complete()
} else {
Expand Down
4 changes: 2 additions & 2 deletions packages/core/data_adapters/BaseAdapter/BaseOptions.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
export interface BaseOptions {
signal?: AbortSignal
stopToken?: string
bpPerPx?: number
sessionId?: string
statusCallback?: (message: string) => void
Expand All @@ -12,7 +12,7 @@ export type SearchType = 'full' | 'prefix' | 'exact'
export interface BaseTextSearchArgs {
queryString: string
searchType?: SearchType
signal?: AbortSignal
stopToken?: string
limit?: number
pageNumber?: number
}
4 changes: 2 additions & 2 deletions packages/core/data_adapters/BaseAdapter/types.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
export interface BaseOptions {
signal?: AbortSignal
stopToken?: string
bpPerPx?: number
sessionId?: string
statusCallback?: (message: string) => void
Expand All @@ -12,7 +12,7 @@ export type SearchType = 'full' | 'prefix' | 'exact'
export interface BaseTextSearchArgs {
queryString: string
searchType?: SearchType
signal?: AbortSignal
stopToken?: string
limit?: number
pageNumber?: number
}
Expand Down
2 changes: 1 addition & 1 deletion packages/core/pluggableElementTypes/RpcMethodType.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ test('test serialize arguments with augmentLocationObject', async () => {
testLocation: locationInAdapter,
},
filters: [],
signal: 'teststring',
stopToken: 'teststring',
randomProperty: 'randomstring',
parentObject: {
nestedObject: {
Expand Down
28 changes: 6 additions & 22 deletions packages/core/pluggableElementTypes/RpcMethodType.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,6 @@ import {
UriLocation,
} from '../util/types'

import {
deserializeAbortSignal,
isRemoteAbortSignal,
RemoteAbortSignal,
} from '../rpc/remoteAbortSignals'

interface SerializedArgs {
signal?: RemoteAbortSignal
blobMap?: Record<string, File>
}
export type RpcMethodConstructor = new (pm: PluginManager) => RpcMethodType

export default abstract class RpcMethodType extends PluggableElementBase {
Expand Down Expand Up @@ -58,21 +48,15 @@ export default abstract class RpcMethodType extends PluggableElementBase {
return loc
}

async deserializeArguments<T extends SerializedArgs>(
serializedArgs: T,
async deserializeArguments<T>(
args: T & { blobMap?: Record<string, File> },
_rpcDriverClassName: string,
) {
if (serializedArgs.blobMap) {
setBlobMap(serializedArgs.blobMap)
): Promise<T> {
if (args.blobMap) {
setBlobMap(args.blobMap)
}
const { signal } = serializedArgs

return {
...serializedArgs,
signal: isRemoteAbortSignal(signal)
? deserializeAbortSignal(signal)
: undefined,
}
return args
}

abstract execute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import clone from 'clone'
import { firstValueFrom } from 'rxjs'

// locals
import { checkAbortSignal, iterMap } from '../../util'
import { iterMap } from '../../util'
import SimpleFeature, {
Feature,
SimpleFeatureSerialized,
Expand All @@ -20,6 +20,7 @@ import ServerSideRendererType, {
} from './ServerSideRendererType'
import { isFeatureAdapter } from '../../data_adapters/BaseAdapter'
import { AnyConfigurationModel } from '../../configuration'
import { checkStopToken } from '../../util/stopToken'

export interface RenderArgs extends ServerSideRenderArgs {
displayModel?: {
Expand Down Expand Up @@ -146,7 +147,7 @@ export default class FeatureRendererType extends ServerSideRendererType {
renderArgs: RenderArgsDeserialized,
): Promise<Map<string, Feature>> {
const pm = this.pluginManager
const { signal, regions, sessionId, adapterConfig } = renderArgs
const { stopToken, regions, sessionId, adapterConfig } = renderArgs
const { dataAdapter } = await getAdapter(pm, sessionId, adapterConfig)
if (!isFeatureAdapter(dataAdapter)) {
throw new Error('Adapter does not support retrieving features')
Expand Down Expand Up @@ -176,7 +177,7 @@ export default class FeatureRendererType extends ServerSideRendererType {
: dataAdapter.getFeaturesInMultipleRegions(requestRegions, renderArgs)

const feats = await firstValueFrom(featureObservable.pipe(toArray()))
checkAbortSignal(signal)
checkStopToken(stopToken)
return new Map<string, Feature>(
feats
.filter(feat => this.featurePassesFilters(renderArgs, feat))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
} from 'mobx-state-tree'

// locals
import { checkAbortSignal, getSerializedSvg, updateStatus } from '../../util'
import { getSerializedSvg, updateStatus } from '../../util'
import SerializableFilterChain, {
SerializedFilterChain,
} from './util/serializableFilterChain'
Expand All @@ -20,12 +20,13 @@ import { createJBrowseTheme } from '../../ui'

import RendererType, { RenderProps, RenderResults } from './RendererType'
import ServerSideRenderedContent from './ServerSideRenderedContent'
import { checkStopToken } from '../../util/stopToken'

interface BaseRenderArgs extends RenderProps {
sessionId: string
// Note that signal serialization happens after serializeArgsInClient and
// Note that stopToken serialization happens after serializeArgsInClient and
// deserialization happens before deserializeArgsInWorker
signal?: AbortSignal
stopToken?: string
theme: ThemeOptions
exportSVG?: {
rasterizeLayers?: boolean
Expand Down Expand Up @@ -189,13 +190,13 @@ export default class ServerSideRenderer extends RendererType {
* @param args - serialized render args
*/
async renderInWorker(args: RenderArgsSerialized): Promise<ResultsSerialized> {
const { signal, statusCallback = () => {} } = args
const { stopToken, statusCallback = () => {} } = args
const deserializedArgs = this.deserializeArgsInWorker(args)

const results = await updateStatus('Rendering plot', statusCallback, () =>
this.render(deserializedArgs),
)
checkAbortSignal(signal)
checkStopToken(stopToken)

// serialize the results for passing back to the main thread.
// these will be transmitted to the main process, and will come out
Expand Down
Loading

0 comments on commit 1cccc87

Please sign in to comment.