You can build your own EventStorageAdapter class by following the EventStorageAdapter interface:
import type { StorageAdapter } from '@castore/core';

export class CustomStorageAdapter implements StorageAdapter {
  getEvents: StorageAdapter['getEvents'];
  pushEvent: StorageAdapter['pushEvent'];
  listAggregateIds: StorageAdapter['listAggregateIds'];
  // 👇 Not used for the moment
  putSnapshot: StorageAdapter['putSnapshot'];
  getLastSnapshot: StorageAdapter['getLastSnapshot'];
  listSnapshots: StorageAdapter['listSnapshots'];

  constructor(
    ... // Add required inputs
  ) {
    // Build your adapter
  }
}
You can also take a look at the InMemoryStorageAdapter implementation, which is quite simple.
The required methods are the following:
- getEvents ((aggregateId: string, opt?: OptionsObj = {}) => Promise<ResponseObj>): Retrieves the events of an aggregate, ordered by version. Returns an empty array if no event is found for this aggregateId.
  OptionsObj contains the following attributes:
  - minVersion (?number): To retrieve events above a certain version
  - maxVersion (?number): To retrieve events below a certain version
  - limit (?number): Maximum number of events to retrieve
  - reverse (?boolean = false): To retrieve events in reverse order (does not require swapping minVersion and maxVersion)
  ResponseObj contains the following attributes:
  - events (EventDetail[]): The aggregate events (possibly empty)
const { events: allEvents } = await customStorageAdapter.getEvents(aggregateId);

// 👇 Retrieve a range of events
const { events: rangedEvents } = await customStorageAdapter.getEvents(
  aggregateId,
  { minVersion: 2, maxVersion: 5 },
);

// 👇 Retrieve the last event of the aggregate
const { events: onlyLastEvent } = await customStorageAdapter.getEvents(
  aggregateId,
  { reverse: true, limit: 1 },
);
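Inside a custom adapter, these options can be applied in a fixed order: filter by version, optionally reverse, then truncate. The following is a minimal sketch for an adapter that keeps an aggregate's events in an array already sorted by ascending version. The names selectEvents, VersionedEvent and GetEventsOptions are hypothetical (they are not part of @castore/core), and treating minVersion and maxVersion as inclusive bounds is an assumption.

```typescript
// Hypothetical stand-ins for illustration; not the @castore/core types.
type VersionedEvent = { aggregateId: string; version: number; type: string };
type GetEventsOptions = {
  minVersion?: number;
  maxVersion?: number;
  limit?: number;
  reverse?: boolean;
};

// Assumes `events` is already sorted by ascending version.
const selectEvents = (
  events: VersionedEvent[],
  { minVersion, maxVersion, limit, reverse = false }: GetEventsOptions = {},
): VersionedEvent[] => {
  // Assumption: both bounds are inclusive.
  let selected = events.filter(
    ({ version }) =>
      (minVersion === undefined || version >= minVersion) &&
      (maxVersion === undefined || version <= maxVersion),
  );

  // Reverse before truncating, so that { reverse: true, limit: 1 } yields the last event.
  if (reverse) {
    selected = [...selected].reverse();
  }

  return limit === undefined ? selected : selected.slice(0, limit);
};
```

Reversing before applying the limit is what makes the "last event" query above work without swapping minVersion and maxVersion.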
- pushEvent ((eventDetail: EventDetail) => Promise<void>): Pushes a new event to the event store.
await customStorageAdapter.pushEvent({
  aggregateId,
  version: lastVersion + 1,
  timestamp: new Date().toISOString(),
  type: 'USER_CREATED',
  payload,
  metadata,
});
The pushEvent method should check at write time that an event doesn't already exist for the given aggregateId and version. If one exists, it should throw a custom error implementing the EventAlreadyExistsError interface for the corresponding aggregateId and version.
import {
  eventAlreadyExistsErrorCode,
  EventAlreadyExistsError,
} from '@castore/core';

class CustomEventAlreadyExistsError
  extends Error
  implements EventAlreadyExistsError
{
  code: typeof eventAlreadyExistsErrorCode;
  aggregateId: string;
  version: number;

  constructor({
    eventStoreId = '',
    aggregateId,
    version,
  }: {
    eventStoreId?: string;
    aggregateId: string;
    version: number;
  }) {
    // 👇 Custom error message
    super(
      `Event already exists for ${eventStoreId} aggregate ${aggregateId} and version ${version}`,
    );

    // 👇 Ensures Error is correctly handled
    this.code = eventAlreadyExistsErrorCode;
    this.aggregateId = aggregateId;
    this.version = version;
  }
}
This ensures that executed Commands are not subject to race conditions and are retried accordingly.
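How the check is enforced depends on the underlying storage. As a minimal sketch, an in-memory adapter can perform the check and the write in a single synchronous step, which is atomic in single-threaded JavaScript; with a real database, a conditional write or a uniqueness constraint on (aggregateId, version) would typically play that role. All names below (InMemoryEventLog, append, SketchEventAlreadyExistsError) are hypothetical, and the error code value is only a placeholder for the eventAlreadyExistsErrorCode exported by @castore/core.

```typescript
// Hypothetical stand-ins so the sketch is self-contained; a real adapter would
// import EventDetail and eventAlreadyExistsErrorCode from '@castore/core'.
type EventDetail = {
  aggregateId: string;
  version: number;
  type: string;
  timestamp: string;
  payload?: unknown;
  metadata?: unknown;
};
const sketchEventAlreadyExistsErrorCode = 'EVENT_ALREADY_EXISTS'; // placeholder value

class SketchEventAlreadyExistsError extends Error {
  code = sketchEventAlreadyExistsErrorCode;
  aggregateId: string;
  version: number;

  constructor({ aggregateId, version }: { aggregateId: string; version: number }) {
    super(`Event already exists for aggregate ${aggregateId} and version ${version}`);
    this.aggregateId = aggregateId;
    this.version = version;
  }
}

class InMemoryEventLog {
  private eventsByAggregateId = new Map<string, EventDetail[]>();

  // Check and write happen in one synchronous step: no other code can run in between.
  append(eventDetail: EventDetail): void {
    const { aggregateId, version } = eventDetail;
    const events = this.eventsByAggregateId.get(aggregateId) ?? [];

    if (events.some(event => event.version === version)) {
      throw new SketchEventAlreadyExistsError({ aggregateId, version });
    }

    this.eventsByAggregateId.set(aggregateId, [...events, eventDetail]);
  }

  // Promise-based wrapper matching the shape of the pushEvent method described above.
  async pushEvent(eventDetail: EventDetail): Promise<void> {
    this.append(eventDetail);
  }
}
```

Two writers that both read version N and both try to push version N + 1 cannot both succeed here: the second one gets the error and can re-read the aggregate before retrying.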
- listAggregateIds ((opt?: OptionsObj = {}) => Promise<ResponseObj>): Retrieves the aggregateIds of an event store, ordered by the timestamp of their first event. Returns an empty array if no aggregate is found.
  OptionsObj contains the following attributes:
  - limit (?number): Maximum number of aggregate ids to retrieve
  - pageToken (?string): To retrieve a paginated result of aggregate ids
  ResponseObj contains the following attributes:
  - aggregateIds (string[]): The list of aggregate ids
  - nextPageToken (?string): A token for the next page of aggregate ids if one exists
const accAggregateIds: string[] = [];

const { aggregateIds: firstPage, nextPageToken } =
  await customStorageAdapter.listAggregateIds({ limit: 20 });
accAggregateIds.push(...firstPage);

if (nextPageToken) {
  const { aggregateIds: secondPage } =
    await customStorageAdapter.listAggregateIds({
      pageToken: nextPageToken,
    });
  accAggregateIds.push(...secondPage);
}
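The format of pageToken is left to the adapter. One possible approach, sketched below for an adapter that already holds the aggregate ids in order, is to encode the offset of the next page in the token. The helper names (paginateAggregateIds, collectAllAggregateIds) and the JSON token format are assumptions made for illustration, not part of @castore/core.

```typescript
type ListAggregateIdsOptions = { limit?: number; pageToken?: string };
type ListAggregateIdsOutput = { aggregateIds: string[]; nextPageToken?: string };

// Assumes `orderedAggregateIds` is already sorted by the timestamp of each
// aggregate's first event, and that `limit` (when given) is a positive integer.
const paginateAggregateIds = (
  orderedAggregateIds: string[],
  { limit, pageToken }: ListAggregateIdsOptions = {},
): ListAggregateIdsOutput => {
  // Hypothetical token format: a JSON object holding the offset of the next page.
  const offset =
    pageToken === undefined
      ? 0
      : (JSON.parse(pageToken) as { offset: number }).offset;
  const end = limit === undefined ? orderedAggregateIds.length : offset + limit;
  const aggregateIds = orderedAggregateIds.slice(offset, end);

  // Only emit a token when some aggregate ids remain after this page.
  return end < orderedAggregateIds.length
    ? { aggregateIds, nextPageToken: JSON.stringify({ offset: end }) }
    : { aggregateIds };
};

// Usage: follow nextPageToken until it is undefined to collect every aggregate id.
const collectAllAggregateIds = (
  orderedAggregateIds: string[],
  limit: number,
): string[] => {
  const collected: string[] = [];
  let pageToken: string | undefined;

  do {
    const page = paginateAggregateIds(orderedAggregateIds, { limit, pageToken });
    collected.push(...page.aggregateIds);
    pageToken = page.nextPageToken;
  } while (pageToken !== undefined);

  return collected;
};
```

An offset-based token is only stable if the ordering does not change between calls; a storage layer with its own cursor would more likely wrap that cursor in the token instead.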
- putSnapshot ((aggregate: Aggregate) => Promise<void>): Saves a snapshot of an aggregate.
  ⚠️ Snapshot methods are a work in progress. Don't use them in production yet!
- getLastSnapshot ((aggregateId: string, opt?: OptionsObj = {}) => Promise<ResponseObj>): Fetches the last snapshot of an aggregate.
  OptionsObj contains the following attributes:
  - maxVersion (?number): To retrieve a snapshot below a certain version
  ResponseObj contains the following attributes:
  - snapshot (?Aggregate): The snapshot (possibly undefined)
  ⚠️ Snapshot methods are a work in progress. Don't use them in production yet!
- listSnapshots ((aggregateId: string, opt?: OptionsObj = {}) => Promise<ResponseObj>): Fetches all snapshots of an aggregate.
  OptionsObj contains the following attributes:
  - minVersion (?number): To retrieve snapshots above a certain version
  - maxVersion (?number): To retrieve snapshots below a certain version
  - limit (?number): Maximum number of snapshots to retrieve
  - reverse (?boolean = false): To retrieve snapshots in reverse order (does not require swapping minVersion and maxVersion)
  ResponseObj contains the following attributes:
  - snapshots (Aggregate[]): The list of snapshots (possibly empty)
  ⚠️ Snapshot methods are a work in progress. Don't use them in production yet!
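Although the snapshot methods are still a work in progress, the getLastSnapshot semantics described above can be illustrated with a small in-memory lookup. In this sketch, SnapshotLike and findLastSnapshot are hypothetical names, and maxVersion is assumed to be an inclusive bound.

```typescript
// Hypothetical minimal shape: only the fields this lookup needs.
type SnapshotLike = { aggregateId: string; version: number };

// Returns the snapshot with the highest version, optionally capped by maxVersion.
const findLastSnapshot = <S extends SnapshotLike>(
  snapshots: S[],
  { maxVersion }: { maxVersion?: number } = {},
): { snapshot?: S } => {
  let last: S | undefined;

  for (const candidate of snapshots) {
    // Assumption: maxVersion is inclusive.
    if (maxVersion !== undefined && candidate.version > maxVersion) continue;
    if (last === undefined || candidate.version > last.version) last = candidate;
  }

  // Leave `snapshot` undefined when nothing matches.
  return last === undefined ? {} : { snapshot: last };
};
```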