Skip to content

Commit

Permalink
Merge pull request #21 from embedded-insurance/migration-logging
Browse files Browse the repository at this point in the history
fix(migration): use custom logger
  • Loading branch information
alex-dixon authored Jul 13, 2024
2 parents 9c8372e + c69ed44 commit 415e240
Show file tree
Hide file tree
Showing 7 changed files with 200 additions and 76 deletions.
9 changes: 9 additions & 0 deletions .changeset/sixty-years-sit.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
'toaster': patch
'@diachronic/migrate': patch
'@diachronic/ci': patch
'@diachronic/feature-flag-client': patch
'@diachronic/toolbox': patch
---

migrate package logging
3 changes: 3 additions & 0 deletions examples/toaster/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
"webpack": "^5.88.2"
},
"dependencies": {
"@diachronic/migrate": "workspace:^",
"@diachronic/util": "workspace:^",
"@diachronic/workflow": "workspace:^",
"@effect/schema": "0.61.5",
"@temporalio/testing": "1.8.6",
"@temporalio/worker": "1.8.6",
Expand Down
72 changes: 67 additions & 5 deletions packages/migrate/src/clock.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { SimulatedClock } from 'xstate'
import * as Clock from 'effect/Clock'
import { ClockTypeId } from 'effect/Clock'
import * as Effect from 'effect/Effect'
Expand All @@ -17,7 +16,7 @@ export interface XStateClockInterface {
*/
export class CustomClock implements XStateClockInterface, Clock.Clock {
readonly [ClockTypeId]: ClockTypeId
public timeouts: Map<NodeJS.Timeout, any> = new Map()
public timeouts: Map<NodeJS.Timeout | number, any> = new Map()

public getNowMs: () => number

Expand Down Expand Up @@ -48,9 +47,9 @@ export class CustomClock implements XStateClockInterface, Clock.Clock {
return id
}

private _id: number = 0
public _id: number = 0

private getId() {
public getId() {
return this._id++
}

Expand Down Expand Up @@ -89,11 +88,70 @@ export class CustomClock implements XStateClockInterface, Clock.Clock {
}
}

// copied from xstate with public members
class SimulatedClock {
public timeouts: Map<NodeJS.Timeout | number, any>
public _now: number
public _id: number

constructor() {
this.timeouts = new Map()
this._now = 0
this._id = 0
}
now() {
return this._now
}
getId() {
return this._id++
}
setTimeout(fn: (...args: any[]) => void, timeout: number) {
const id = this.getId()
this.timeouts.set(id, {
start: this.now(),
timeout,
fn,
})
return id
}
clearTimeout(id: number) {
this.timeouts.delete(id)
}
set(time: number) {
if (this._now > time) {
throw new Error('Unable to travel back in time')
}
this._now = time
this.flushTimeouts()
}
flushTimeouts() {
;[...this.timeouts]
.sort(([_idA, timeoutA], [_idB, timeoutB]) => {
const endA = timeoutA.start + timeoutA.timeout
const endB = timeoutB.start + timeoutB.timeout
return endB > endA ? -1 : 1
})
.forEach(([id, timeout]) => {
if (this.now() - timeout.start >= timeout.timeout) {
this.timeouts.delete(id)
timeout.fn.call(null)
}
})
}
increment(ms: number) {
this._now += ms
this.flushTimeouts()
}
}

/**
* A clock that gets its current time value from a variable.
* Timers are eligible to fire only on calls to `set` or `increment`.
*/
export class TestClock extends SimulatedClock implements Clock.Clock {
export class TestClock
extends SimulatedClock
implements Clock.Clock, CustomClock
{
readonly [ClockTypeId]: ClockTypeId

constructor() {
Expand Down Expand Up @@ -145,4 +203,8 @@ export class TestClock extends SimulatedClock implements Clock.Clock {
unsafeCurrentTimeMillis(): number {
return this.now()
}

getNowMs() {
return this.now()
}
}
28 changes: 17 additions & 11 deletions packages/migrate/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import { pipe } from 'effect/Function'
import { dissoc } from 'ramda'
import { decode } from '@diachronic/util/decode'
import { ArrayFormatter } from '@effect/schema'
import { defaultLogImpl, Logger } from './logger'

/**
* Signal sent to a workflow that will cause it to migrate to a new version via ContinueAsNew
Expand Down Expand Up @@ -80,12 +81,17 @@ export const getSnapshot = defineQuery<WorkflowSnapshot>(getSnapshotQueryName)
const getContinuationXStateNode = (
state: StateValue,
context: any,
machine: AnyStateMachine
machine: AnyStateMachine,
log: Logger
): AnyState | Error => {
try {
return machine.resolveStateValue(state, context)
} catch (e) {
console.error('Could not resolve continuation state', e)
log.error(
'Could not resolve continuation state',
{ error: e },
'getContinuationXStateNode'
)
return e as Error
}
}
Expand Down Expand Up @@ -232,11 +238,7 @@ export type DbFns<DbSnapshot, WorkflowContext> = {
*/
onNewDbSnapshot: (dbSnapshotValue: DbSnapshot) => Promise<any>
}
const defaultLogImpl = {
debug: () => {},
info: () => {},
error: () => {},
}

/**
* Transforms Temporal signals into the form diachronic workflows consume
* Request/response signals have their metadata mapped to an extra `meta` property when present
Expand Down Expand Up @@ -356,7 +358,8 @@ export const makeWorkflow = <
const continuationState = getContinuationXStateNode(
continuationData.state,
continuationData.context,
machine
machine,
log
)

if (continuationState instanceof Error) {
Expand All @@ -372,6 +375,7 @@ export const makeWorkflow = <
interpreter = interpret(machine, {
state: continuationState,
clock: new CustomClock(),
log,
})
const initialState = interpreter.getPersistedState()

Expand Down Expand Up @@ -403,7 +407,8 @@ export const makeWorkflow = <
interpreter,
machine,
continuationState,
delayEventsInStateNotRestored
delayEventsInStateNotRestored,
log
)
} else {
log.info(
Expand All @@ -423,7 +428,8 @@ export const makeWorkflow = <
interpreter,
continuationData.timers,
initialState.value,
machine
machine,
log
)
} else {
log.debug('Migration function provided no timers to restore')
Expand All @@ -433,7 +439,7 @@ export const makeWorkflow = <
currentContext = initialState.context
} else {
// todo. add clock to arguments?
interpreter = interpret(machine, { clock: new CustomClock() })
interpreter = interpret(machine, { clock: new CustomClock(), log })
const initialState = interpreter.getPersistedState()
if (!initialState) {
throw new Error('Could not get persisted state')
Expand Down
Loading

0 comments on commit 415e240

Please sign in to comment.