diff --git a/.changeset/sixty-years-sit.md b/.changeset/sixty-years-sit.md new file mode 100644 index 0000000..6de0081 --- /dev/null +++ b/.changeset/sixty-years-sit.md @@ -0,0 +1,9 @@ +--- +'toaster': patch +'@diachronic/migrate': patch +'@diachronic/ci': patch +'@diachronic/feature-flag-client': patch +'@diachronic/toolbox': patch +--- + +migrate package logging diff --git a/examples/toaster/package.json b/examples/toaster/package.json index f8ee240..8467fe9 100644 --- a/examples/toaster/package.json +++ b/examples/toaster/package.json @@ -36,6 +36,9 @@ "webpack": "^5.88.2" }, "dependencies": { + "@diachronic/migrate": "workspace:^", + "@diachronic/util": "workspace:^", + "@diachronic/workflow": "workspace:^", "@effect/schema": "0.61.5", "@temporalio/testing": "1.8.6", "@temporalio/worker": "1.8.6", diff --git a/packages/migrate/src/clock.ts b/packages/migrate/src/clock.ts index 701030b..05f36fc 100644 --- a/packages/migrate/src/clock.ts +++ b/packages/migrate/src/clock.ts @@ -1,4 +1,3 @@ -import { SimulatedClock } from 'xstate' import * as Clock from 'effect/Clock' import { ClockTypeId } from 'effect/Clock' import * as Effect from 'effect/Effect' @@ -17,7 +16,7 @@ export interface XStateClockInterface { */ export class CustomClock implements XStateClockInterface, Clock.Clock { readonly [ClockTypeId]: ClockTypeId - public timeouts: Map = new Map() + public timeouts: Map = new Map() public getNowMs: () => number @@ -48,9 +47,9 @@ export class CustomClock implements XStateClockInterface, Clock.Clock { return id } - private _id: number = 0 + public _id: number = 0 - private getId() { + public getId() { return this._id++ } @@ -89,11 +88,70 @@ export class CustomClock implements XStateClockInterface, Clock.Clock { } } +// copied from xstate with public members +class SimulatedClock { + public timeouts: Map + public _now: number + public _id: number + + constructor() { + this.timeouts = new Map() + this._now = 0 + this._id = 0 + } + now() { + return this._now + } + getId() { + return this._id++ + } + setTimeout(fn: (...args: any[]) => void, timeout: number) { + const id = this.getId() + this.timeouts.set(id, { + start: this.now(), + timeout, + fn, + }) + return id + } + clearTimeout(id: number) { + this.timeouts.delete(id) + } + set(time: number) { + if (this._now > time) { + throw new Error('Unable to travel back in time') + } + this._now = time + this.flushTimeouts() + } + flushTimeouts() { + ;[...this.timeouts] + .sort(([_idA, timeoutA], [_idB, timeoutB]) => { + const endA = timeoutA.start + timeoutA.timeout + const endB = timeoutB.start + timeoutB.timeout + return endB > endA ? -1 : 1 + }) + .forEach(([id, timeout]) => { + if (this.now() - timeout.start >= timeout.timeout) { + this.timeouts.delete(id) + timeout.fn.call(null) + } + }) + } + increment(ms: number) { + this._now += ms + this.flushTimeouts() + } +} + /** * A clock that gets its current time value from a variable. * Timers are eligible to fire only on calls to `set` or `increment`. */ -export class TestClock extends SimulatedClock implements Clock.Clock { +export class TestClock + extends SimulatedClock + implements Clock.Clock, CustomClock +{ readonly [ClockTypeId]: ClockTypeId constructor() { @@ -145,4 +203,8 @@ export class TestClock extends SimulatedClock implements Clock.Clock { unsafeCurrentTimeMillis(): number { return this.now() } + + getNowMs() { + return this.now() + } } diff --git a/packages/migrate/src/index.ts b/packages/migrate/src/index.ts index 24b8ebf..9122b92 100644 --- a/packages/migrate/src/index.ts +++ b/packages/migrate/src/index.ts @@ -32,6 +32,7 @@ import { pipe } from 'effect/Function' import { dissoc } from 'ramda' import { decode } from '@diachronic/util/decode' import { ArrayFormatter } from '@effect/schema' +import { defaultLogImpl, Logger } from './logger' /** * Signal sent to a workflow that will cause it to migrate to a new version via ContinueAsNew @@ -80,12 +81,17 @@ export const getSnapshot = defineQuery(getSnapshotQueryName) const getContinuationXStateNode = ( state: StateValue, context: any, - machine: AnyStateMachine + machine: AnyStateMachine, + log: Logger ): AnyState | Error => { try { return machine.resolveStateValue(state, context) } catch (e) { - console.error('Could not resolve continuation state', e) + log.error( + 'Could not resolve continuation state', + { error: e }, + 'getContinuationXStateNode' + ) return e as Error } } @@ -232,11 +238,7 @@ export type DbFns = { */ onNewDbSnapshot: (dbSnapshotValue: DbSnapshot) => Promise } -const defaultLogImpl = { - debug: () => {}, - info: () => {}, - error: () => {}, -} + /** * Transforms Temporal signals into the form diachronic workflows consume * Request/response signals have their metadata mapped to an extra `meta` property when present @@ -356,7 +358,8 @@ export const makeWorkflow = < const continuationState = getContinuationXStateNode( continuationData.state, continuationData.context, - machine + machine, + log ) if (continuationState instanceof Error) { @@ -372,6 +375,7 @@ export const makeWorkflow = < interpreter = interpret(machine, { state: continuationState, clock: new CustomClock(), + log, }) const initialState = interpreter.getPersistedState() @@ -403,7 +407,8 @@ export const makeWorkflow = < interpreter, machine, continuationState, - delayEventsInStateNotRestored + delayEventsInStateNotRestored, + log ) } else { log.info( @@ -423,7 +428,8 @@ export const makeWorkflow = < interpreter, continuationData.timers, initialState.value, - machine + machine, + log ) } else { log.debug('Migration function provided no timers to restore') @@ -433,7 +439,7 @@ export const makeWorkflow = < currentContext = initialState.context } else { // todo. add clock to arguments? - interpreter = interpret(machine, { clock: new CustomClock() }) + interpreter = interpret(machine, { clock: new CustomClock(), log }) const initialState = interpreter.getPersistedState() if (!initialState) { throw new Error('Could not get persisted state') diff --git a/packages/migrate/src/interpreter.ts b/packages/migrate/src/interpreter.ts index 4f4d2b2..8abaa0a 100644 --- a/packages/migrate/src/interpreter.ts +++ b/packages/migrate/src/interpreter.ts @@ -1,4 +1,10 @@ -import { Actor, AnyState, EventFromLogic, StateValue } from 'xstate' +import { + Actor, + ActorOptions, + AnyState, + EventFromLogic, + StateValue, +} from 'xstate' import * as S from '@effect/schema/Schema' // @ts-ignore import { ActorStatus } from 'xstate/dist/declarations/src/interpreter' @@ -7,7 +13,6 @@ import { AnyActorRef, AnyStateMachine, EventObject, - InterpreterOptions, // @ts-ignore } from 'xstate/dist/declarations/src/types' import { CustomClock } from './clock' @@ -16,10 +21,12 @@ import { getDelayFunctionName, XStateTimerId, } from './analysis' +import { defaultLogImpl, Logger } from './logger' export interface CustomInterpreterOptions - extends InterpreterOptions { + extends ActorOptions { clock?: CustomClock + log?: Logger } export type TimerDataEntry = { @@ -39,6 +46,7 @@ export class CustomInterpreter< public data = { timers: {} as TimerData, } + log: Logger public override clock: CustomClock public _statusExtra: 'STOPPING' | undefined @@ -58,35 +66,39 @@ export class CustomInterpreter< constructor(machine: TLogic, options?: CustomInterpreterOptions) { super(machine, options) this.clock = options?.clock || new CustomClock() + this.log = options?.log || defaultLogImpl // I think xstate clears timers using this method // // Sometimes XState calls this for timers that were never scheduled, // but have a definition in a state node this['cancel'] = (sendId: XStateTimerId) => { - console.log('[interpreter] status', this.status) - // TODO. when/how does this get called typically...? - console.log('[interpreter] cancelling timer', sendId, { - 'interpreter.status': this.status, - 'interpreter.statusExtra': this.statusExtra, - }) + this.log.debug( + 'Cancelling timer', + { + sendId, + status: this.status, + statusExtra: this.statusExtra, + }, + 'interpreter.cancel' + ) - // FIXME. what does this method do? - // aren't there timers in the clock also? + // Invoke the default xstate timer cancellation behavior super.cancel(sendId) if (!this.data.timers[sendId]) { - console.debug( - 'Timer data not found for timer id', - sendId, - 'Timers are:', - this.data.timers + this.log.debug( + 'Timer data not found for timer id ' + sendId, + { timers: this.data.timers }, + 'interpreter.cancel' ) return } if (this.statusExtra !== 'STOPPING') { delete this.data.timers[sendId] - console.log('Timer data deleted. Timers are now:', this.data.timers) + this.log.debug('Timer data deleted', { + updatedTimers: this.data.timers, + }) } } } @@ -101,8 +113,6 @@ export class CustomInterpreter< delay: number to?: AnyActorRef }) { - // TODO. custom logger - // console.log('delaySend', sendAction) super.delaySend(sendAction) const now = this.clock.now() if (S.is(XStateTimerId)(sendAction.id)) { @@ -121,17 +131,19 @@ export class CustomInterpreter< undefined, } } else { - console.error('Failed to parse delay event:', sendAction) + this.log.error( + 'Failed to parse delay event: ' + sendAction, + undefined, + 'interpreter.delaySend' + ) } } send(event: TEvent) { - // console.log('gonna send this', event) super.send(event) } stop(): this { - // console.log('gonna stop this') this.setStatusExtra('STOPPING') const ok = super.stop() this.setStatusExtra(undefined) @@ -149,12 +161,14 @@ export const getTimeLeft = (t: TimerDataEntry, now: number) => * @param machine * @param initialState * @param delayEventsInStateNotRestored + * @param log */ export const startTimersOnStart = ( interpreter: CustomInterpreter, machine: AnyStateMachine, initialState: AnyState, - delayEventsInStateNotRestored: Array + delayEventsInStateNotRestored: Array, + log: Logger ) => { const sub = interpreter.subscribe((x) => { if (!x.matches(initialState.value)) { @@ -163,42 +177,42 @@ export const startTimersOnStart = ( ) } - console.log( + log.info( 'Interpreter reached start state. Sending new delay events', - delayEventsInStateNotRestored + { delayEventsInStateNotRestored }, + 'startTimersOnStart' ) delayEventsInStateNotRestored.forEach((delayEvent) => { const fnName = getDelayFunctionName(delayEvent) const delayFn = machine.implementations.delays[fnName] if (!machine.implementations.delays[fnName]) { - console.error( - 'Expected to find a delay function named ', - fnName, - 'but did not' + log.error( + `Expected to find a delay function named ${fnName} but did not`, + { delayEvent, fnName, delayEventsInStateNotRestored }, + 'startTimersOnStart' ) throw new Error( 'Missing implementation for delay function named ' + fnName ) } - console.log( + log.debug( `Computing delay value for delay function "${fnName}" and state "${x.value}"` ) let delayValue: number try { delayValue = delayFn(initialState.context) - console.debug( - 'Computed delay value', - delayValue, - 'for delay function', - fnName + log.debug( + `Computed delay value ${delayValue} for delay function ${fnName}`, + undefined, + 'startTimersOnStart' ) } catch (e) { - console.error( + log.error( 'Error computing delay value for delay function', - fnName, - e + { fnName, error: e }, + 'startTimersOnStart' ) throw e } @@ -225,6 +239,7 @@ export const startTimersOnStart = ( * @param startState - StateValue representing the machine's start state. * Used to detect when the machine has reached this state upon start * @param machine + * @param log */ export const restoreTimersOnStart = < Interpreter extends CustomInterpreter @@ -232,7 +247,8 @@ export const restoreTimersOnStart = < interpreter: Interpreter, timers: TimerData, startState: StateValue, - machine: AnyStateMachine + machine: AnyStateMachine, + log: Logger ) => { // restore timers once machine in start state // machine must be in this state to receive timer fired event @@ -243,32 +259,38 @@ export const restoreTimersOnStart = < ) } - console.log('[restore-timers] Machine resumed in expected state:', { - state: x.value, - context: x.context, - }) + log.debug( + 'Machine resumed in expected state:', + { + state: x.value, + context: x.context, + }, + 'restore-timers' + ) - console.log('[restore-timers] Reinitializing timers...') + log.debug('Reinitializing timers...', undefined, 'restore-timers') Object.values(timers).forEach((timer) => { // todo. should we check if the subject state of this timer still exists in the machine? // and only schedule it if it does? // the state is on the right side of the timer id and should be parsed as stateId // in the `timer` var here (TimerDataEntry type) - console.log('[restore-timers] Attempting to restore timer', timer) + log.debug('Attempting to restore timer', { timer }, 'restore-timers') const now = interpreter.clock.now() const timeElapsed = now - timer.start const timeLeft = timer.delay - timeElapsed - console.log('[restore-timers] Existing timer has time left', timeLeft) + log.debug('Existing timer has time left', { timeLeft }, 'restore-timers') const delayFunctionName = getDelayFunctionName(timer.id) if (!machine.implementations.delays[delayFunctionName]) { - console.error( + log.error( `Previous timer with delay function named "${delayFunctionName}" not found. The existing timer that would have fired at ${new Date( interpreter.clock.now() + timeLeft - ).toISOString()} will not be scheduled as part of this continuation.` + ).toISOString()} will not be scheduled as part of this continuation.`, + undefined, + 'restore-timers' ) return } @@ -279,28 +301,33 @@ export const restoreTimersOnStart = < let delayValueToSet = timeLeft <= 0 ? 1 : timeLeft if (newlyComputedDelay !== timer.delay) { - console.log( - '[restore-timers] Delay function returned new delay', - newlyComputedDelay + log.debug( + 'Delay function returned new delay', + { newlyComputedDelay }, + 'restore-timers' ) - console.log('Calculating a new fire at time...') + log.debug('Calculating a new fire at time...') const fireAt = timer.start - timeElapsed + newlyComputedDelay - console.log( + log.debug( `Timer ${timer.id} should fire at ${new Date( fireAt - ).toISOString()} according to new delay value (${newlyComputedDelay}ms)` + ).toISOString()} according to new delay value (${newlyComputedDelay}ms)`, + {}, + 'restore-timers' ) const distance = fireAt - now - console.log( + log.debug( `...that's approximately ${distance}ms ${ distance < 0 ? 'ago' : distance > 0 ? 'from now' : 'now' - }` + }`, + {}, + 'restore-timers' ) const newDelayValue = distance <= 0 ? 1 : distance - console.log('Setting new delay value to:', newDelayValue) + log.debug('Setting new delay value to:', { newDelayValue }) delayValueToSet = newDelayValue } @@ -320,14 +347,17 @@ export const restoreTimersOnStart = < export function interpret( machine: T, - options?: InterpreterOptions + options?: CustomInterpreterOptions ): CustomInterpreter export function interpret( machine: TLogic, - options?: InterpreterOptions + options?: CustomInterpreterOptions ): CustomInterpreter> -export function interpret(a: any, options?: InterpreterOptions): any { +export function interpret( + a: any, + options?: CustomInterpreterOptions +): any { return new CustomInterpreter(a, options) } diff --git a/packages/migrate/src/logger.ts b/packages/migrate/src/logger.ts new file mode 100644 index 0000000..40de6ce --- /dev/null +++ b/packages/migrate/src/logger.ts @@ -0,0 +1,11 @@ +export type Logger = { + debug: (msg: string, args?: Record, span?: string) => void + info: (msg: string, args?: Record, span?: string) => void + error: (msg: string, args?: Record, span?: string) => void +} + +export const defaultLogImpl = { + debug: () => {}, + info: () => {}, + error: () => {}, +} satisfies Logger diff --git a/yarn.lock b/yarn.lock index f38a35f..0aa594d 100644 --- a/yarn.lock +++ b/yarn.lock @@ -8221,6 +8221,9 @@ __metadata: version: 0.0.0-use.local resolution: "toaster@workspace:examples/toaster" dependencies: + "@diachronic/migrate": "workspace:^" + "@diachronic/util": "workspace:^" + "@diachronic/workflow": "workspace:^" "@effect/schema": "npm:0.61.5" "@temporalio/testing": "npm:1.8.6" "@temporalio/worker": "npm:1.8.6"