diff --git a/packages/nitro-node/src/node/engine/chainservice/eth-chainservice.ts b/packages/nitro-node/src/node/engine/chainservice/eth-chainservice.ts index 86bd8ef4..5010c8fb 100644 --- a/packages/nitro-node/src/node/engine/chainservice/eth-chainservice.ts +++ b/packages/nitro-node/src/node/engine/chainservice/eth-chainservice.ts @@ -6,7 +6,9 @@ import { WaitGroup } from '@jpwilliams/waitgroup'; import type { ReadChannel, ReadWriteChannel } from '@cerc-io/ts-channel'; import type { Log } from '@ethersproject/abstract-provider'; import Channel from '@cerc-io/ts-channel'; -import { EthClient, go, hex2Bytes } from '@cerc-io/nitro-util'; +import { + EthClient, go, hex2Bytes, Context, +} from '@cerc-io/nitro-util'; import { ChainService, ChainEvent, DepositedEvent, ConcludedEvent, AllocationUpdatedEvent, @@ -68,9 +70,9 @@ export class EthChainService implements ChainService { private logger: debug.Debugger; - private ctx: AbortController; + private ctx: Context; - private cancel: (reason ?: any) => void; + private cancel: () => void; private wg?: WaitGroup; @@ -83,7 +85,7 @@ export class EthChainService implements ChainService { txSigner: ethers.Signer, out: ReadWriteChannel, logger: debug.Debugger, - ctx: AbortController, + ctx: Context, cancel: () => void, wg: WaitGroup, ) { @@ -150,8 +152,8 @@ export class EthChainService implements ChainService { txSigner: ethers.Signer, logDestination?: WritableStream, ): EthChainService { - const ctx = new AbortController(); - const cancelCtx = ctx.abort.bind(ctx); + const ctx = new Context(); + const cancelCtx = ctx.withCancel(); const out = Channel(10); @@ -182,21 +184,15 @@ export class EthChainService implements ChainService { // listenForErrors listens for errors on the error channel and attempts to handle them if they occur. // TODO: Currently "handle" is panicking - private async listenForErrors(ctx: AbortController, errChan: ReadChannel): Promise { - // Channel to implement ctx.Done() - const ctxDone = Channel(); - this.ctx.signal.addEventListener('abort', (event) => { - ctxDone.close(); - }); - + private async listenForErrors(ctx: Context, errChan: ReadChannel): Promise { /* eslint-disable no-await-in-loop */ /* eslint-disable default-case */ while (true) { switch (await Channel.select([ - ctxDone.shift(), + this.ctx.done.shift(), errChan.shift(), ])) { - case ctxDone: { + case this.ctx.done: { this.wg!.done(); return; } @@ -361,12 +357,6 @@ export class EthChainService implements ChainService { query: ethers.providers.EventType, listener: (eventLog: Log) => void, ) { - // Channel to implement ctx.Done() - const ctxDone = Channel(); - this.ctx.signal.addEventListener('abort', (event) => { - ctxDone.close(); - }); - /* eslint-disable no-restricted-syntax */ /* eslint-disable no-labels */ out: @@ -374,11 +364,11 @@ export class EthChainService implements ChainService { /* eslint-disable no-await-in-loop */ /* eslint-disable default-case */ switch (await Channel.select([ - ctxDone.shift(), + this.ctx.done.shift(), subErr.shift(), logs.shift(), ])) { - case ctxDone: { + case this.ctx.done: { subUnsubscribe(); this.wg!.done(); return; diff --git a/packages/nitro-node/src/node/engine/engine.ts b/packages/nitro-node/src/node/engine/engine.ts index 40664930..66e02e6a 100644 --- a/packages/nitro-node/src/node/engine/engine.ts +++ b/packages/nitro-node/src/node/engine/engine.ts @@ -8,7 +8,7 @@ import { WaitGroup } from '@jpwilliams/waitgroup'; import Channel from '@cerc-io/ts-channel'; import type { ReadChannel, ReadWriteChannel } from '@cerc-io/ts-channel'; -import { JSONbigNative, go } from '@cerc-io/nitro-util'; +import { JSONbigNative, go, Context } from '@cerc-io/nitro-util'; import { MessageService } from './messageservice/messageservice'; import { ChainService, ChainEvent, ChainEventHandler } from './chainservice/chainservice'; @@ -217,8 +217,8 @@ export class Engine { e.wg = new WaitGroup(); - const ctx = new AbortController(); - e.cancel = ctx.abort.bind(ctx); + const ctx = new Context(); + e.cancel = ctx.withCancel(); e.wg.add(1); go(e.run.bind(e), ctx); @@ -255,7 +255,7 @@ export class Engine { // run kicks of an infinite loop that waits for communications on the supplied channels, and handles them accordingly // The loop exits when the context is cancelled. - async run(ctx: AbortController): Promise { + async run(ctx: Context): Promise { assert(this.objectiveRequestsFromAPI); assert(this.paymentRequestsFromAPI); assert(this.fromChain); @@ -263,12 +263,6 @@ export class Engine { assert(this.fromLedger); assert(this._toApi); - // Channel to implement ctx.Done() - const ctxDone = Channel(); - ctx.signal.addEventListener('abort', (event) => { - ctxDone.close(); - }); - while (true) { let res = new EngineEvent(); let err: Error | null = null; @@ -287,7 +281,7 @@ export class Engine { this.fromChain.shift(), this.fromMsg.shift(), this.fromLedger.shift(), - ctxDone.shift(), + ctx.done.shift(), ])) { case this.objectiveRequestsFromAPI: [res, err] = await this.handleObjectiveRequest(this.objectiveRequestsFromAPI.value()); @@ -309,7 +303,7 @@ export class Engine { [res, err] = await this.handleProposal(this.fromLedger.value()); break; - case ctxDone: { + case ctx.done: { this.wg!.done(); return; } diff --git a/packages/nitro-node/src/node/node.ts b/packages/nitro-node/src/node/node.ts index 11e93582..46f263f1 100644 --- a/packages/nitro-node/src/node/node.ts +++ b/packages/nitro-node/src/node/node.ts @@ -5,7 +5,7 @@ import { WaitGroup } from '@jpwilliams/waitgroup'; import type { ReadChannel, ReadWriteChannel } from '@cerc-io/ts-channel'; import Channel from '@cerc-io/ts-channel'; -import { go, randUint64 } from '@cerc-io/nitro-util'; +import { go, randUint64, Context } from '@cerc-io/nitro-util'; import { MessageService } from './engine/messageservice/messageservice'; import { ChainService } from './engine/chainservice/chainservice'; @@ -105,8 +105,8 @@ export class Node { c.channelNotifier = ChannelNotifier.newChannelNotifier(store, c.vm); - const ctx = new AbortController(); - c.cancelEventHandler = ctx.abort.bind(ctx); + const ctx = new Context(); + c.cancelEventHandler = ctx.withCancel(); c.wg = new WaitGroup(); c.wg.add(1); @@ -204,20 +204,16 @@ export class Node { } // handleEngineEvents is responsible for monitoring the ToApi channel on the engine. - // It parses events from the ToApi chan and then dispatches events to the necessary node chan. - private async handleEngineEvents(ctx: AbortController) { - // Channel to implement ctx.Done() - const ctxDone = Channel(); - ctx.signal.onabort = () => { ctxDone.close(); }; - + // It parses events from the ToApi chan and then dispatches events to the necessary client chan. + private async handleEngineEvents(ctx: Context) { /* eslint-disable no-await-in-loop */ /* eslint-disable default-case */ while (true) { switch (await Channel.select([ - ctxDone.shift(), + ctx.done.shift(), this.engine.toApi.shift(), ])) { - case ctxDone: { + case ctx.done: { this.wg!.done(); return; } diff --git a/packages/nitro-util/package.json b/packages/nitro-util/package.json index d37f0059..1b07ea40 100644 --- a/packages/nitro-util/package.json +++ b/packages/nitro-util/package.json @@ -36,6 +36,7 @@ }, "dependencies": { "@statechannels/nitro-protocol": "^2.0.0-alpha.4", + "@cerc-io/ts-channel": "1.0.3-ts-nitro-0.1.1", "assert": "^2.0.0", "debug": "^4.3.4", "ethers": "^5.7.2", diff --git a/packages/nitro-util/src/context.ts b/packages/nitro-util/src/context.ts new file mode 100644 index 00000000..ae5a6ee2 --- /dev/null +++ b/packages/nitro-util/src/context.ts @@ -0,0 +1,23 @@ +import type { ReadWriteChannel } from '@cerc-io/ts-channel'; +import Channel from '@cerc-io/ts-channel'; + +export class Context { + ctx: AbortController; + + done: ReadWriteChannel; + + constructor() { + this.ctx = new AbortController(); + this.done = Channel(); + + this.ctx.signal.addEventListener('abort', () => { + this.done.close(); + }); + } + + withCancel(): () => void { + return () => { + this.ctx.abort(); + }; + } +} diff --git a/packages/nitro-util/src/index.ts b/packages/nitro-util/src/index.ts index 531e9231..b705d202 100644 --- a/packages/nitro-util/src/index.ts +++ b/packages/nitro-util/src/index.ts @@ -7,6 +7,7 @@ export * from './contract-bindings'; export * from './types'; export * from './constants'; export * from './deploy-contracts'; +export * from './context'; export { INitroTypes, ExitFormat, DepositedEventObject, AllocationUpdatedEventObject, ConcludedEventObject,