Skip to content

Latest commit

 

History

History
168 lines (124 loc) · 6.2 KB

building-your-own-event-storage-adapter.md

File metadata and controls

168 lines (124 loc) · 6.2 KB

Building your own EventStorageAdapter

You can build your own EventStorageAdapter class by following the EventStorageAdapter interface:

import type { StorageAdapter } from '@castore/core';

export class CustomStorageAdapter implements StorageAdapter {
  getEvents: StorageAdapter['getEvents'];
  pushEvent: StorageAdapter['pushEvent'];
  listAggregateIds: StorageAdapter['listAggregateIds'];
  // 👇 Not used for the moment
  putSnapshot: StorageAdapter['putSnapshot'];
  getLastSnapshot: StorageAdapter['getLastSnapshot'];
  listSnapshots: StorageAdapter['listSnapshots'];

  constructor(
    ... // Add required inputs
  ) {
    // Build your adapter
  }
}

You can also take a look at the InMemoryStorageAdapter implementation which is quite simple.

The required methods are the following:

  • getEvents ((aggregateId: string, opt?: OptionsObj = {}) => Promise<ResponseObj>): Retrieves the events of an aggregate, ordered by version. Returns an empty array if no event is found for this aggregateId.

    OptionsObj contains the following attributes:

    • minVersion (?number): To retrieve events above a certain version
    • maxVersion (?number): To retrieve events below a certain version
    • limit (?number): Maximum number of events to retrieve
    • reverse (?boolean = false): To retrieve events in reverse order (does not require to swap minVersion and maxVersion)

    ResponseObj contains the following attributes:

    • events (EventDetail[]): The aggregate events (possibly empty)
const { events: allEvents } = await customStorageAdapter.getEvents(aggregateId);

// 👇 Retrieve a range of events
const { events: rangedEvents } = await customStorageAdapter.getEvents(
  aggregateId,
  { minVersion: 2, maxVersion: 5 },
);

// 👇 Retrieve the last event of the aggregate
const { events: onlyLastEvent } = await customStorageAdapter.getEvents(
  aggregateId,
  { reverse: true, limit: 1 },
);
  • pushEvent ((eventDetail: EventDetail) => Promise<void>): Pushes a new event to the event store.
await customStorageAdapter.pushEvent({
  aggregateId,
  version: lastVersion + 1,
  timestamp: new Date().toISOString(),
  type: 'USER_CREATED',
  payload,
  metadata,
});

The pushEvent method should check at write time that an event doesn't already exist for the given aggregateId and version. If one exists, it should throw a custom error implementing the EventAlreadyExistsError interface for the corresponding aggregateId and version.

import {
  eventAlreadyExistsErrorCode,
  EventAlreadyExistsError,
} from '@castore/core';

class CustomEventAlreadyExistsError
  extends Error
  implements EventAlreadyExistsError
{
  code: typeof eventAlreadyExistsErrorCode;
  aggregateId: string;
  version: number;

  constructor({
    eventStoreId = '',
    aggregateId,
    version,
  }: {
    eventStoreId?: string;
    aggregateId: string;
    version: number;
  }) {
    // 👇 Custom error message
    super(
      `Event already exists for ${eventStoreId} aggregate ${aggregateId} and version ${version}`,
    );

    // 👇 Ensures Error is correctly handled
    this.code = eventAlreadyExistsErrorCode;
    this.aggregateId = aggregateId;
    this.version = version;
  }
}

This ensures that executed Commands are not subject to race conditions and are accordingly retried.

  • listAggregateIds ((opt?: OptionsObj = {}) => Promise<ResponseObj>): Retrieves the list of aggregateId of an event store, ordered by timestamp of their first event. Returns an empty array if no aggregate is found.

    OptionsObj contains the following attributes:

    • limit (?number): Maximum number of aggregate ids to retrieve
    • pageToken (?string): To retrieve a paginated result of aggregate ids

    ResponseObj contains the following attributes:

    • aggregateIds (string[]): The list of aggregate ids
    • nextPageToken (?string): A token for the next page of aggregate ids if one exists
const accAggregateIds: string = [];
const { aggregateIds: firstPage, nextPageToken } =
  await customStorageAdapter.listAggregateIds({ limit: 20 });

accAggregateIds.push(...firstPage);

if (nextPageToken) {
  const { aggregateIds: secondPage } =
    await customStorageAdapter.listAggregateIds({
      pageToken: nextPageToken,
    });
  accAggregateIds.push(...secondPage);
}
  • putSnapshot ((aggregate: Aggregate) => Promise<void>): Saves a snapshot of an aggregate.

⚠️ Snapshot methods are a work in progress. Don't use them in production yet!

  • getLastSnapshot ((aggregateId: string, opt?: OptionsObj = {}) => Promise<ResponseObj>): Fetches the last snapshot of an aggregate.

    OptionsObj contains the following attributes:

    • maxVersion (?number): To retrieve snapshot below a certain version

    ResponseObj contains the following attributes:

    • snapshot (?Aggregate): The snapshot (possibly undefined)

⚠️ Snapshot methods are a work in progress. Don't use them in production yet!

  • listSnapshots ((aggregateId: string, opt?: OptionsObj = {}) => Promise<ResponseObj>): Fetches all snapshots of an aggregate.

    OptionsObj contains the following attributes:

    • minVersion (?number): To retrieve snapshots above a certain version
    • maxVersion (?number): To retrieve snapshots below a certain version
    • limit (?number): Maximum number of snapshots to retrieve
    • reverse (?boolean = false): To retrieve snapshots in reverse order (does not require to swap minVersion and maxVersion)

    ResponseObj contains the following attributes:

    • snapshots (Aggregate[]): The list of snapshots (possibly empty)

⚠️ Snapshot methods are a work in progress. Don't use them in production yet!