Skip to content

Commit

Permalink
refactored event processor interface
Browse files Browse the repository at this point in the history
Signed-off-by: david <[email protected]>
  • Loading branch information
daywiss authored and amateima committed Nov 26, 2024
1 parent d7b02f6 commit 180a47e
Show file tree
Hide file tree
Showing 12 changed files with 282 additions and 84 deletions.
8 changes: 4 additions & 4 deletions packages/indexer-database/src/entities/WebhookClient.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import { Entity, PrimaryColumn, Column } from "typeorm";
import { Entity, PrimaryColumn, Column, PrimaryGeneratedColumn } from "typeorm";

@Entity()
export class WebhookClient {
@Column()
name: string;

@PrimaryColumn()
@PrimaryGeneratedColumn()
id: string;

@Column()
@Column({ unique: true })
apiKey: string;

@Column("simple-array")
@Column("jsonb")
domains: string[];
}
3 changes: 3 additions & 0 deletions packages/indexer-database/src/entities/WebhookRequest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,7 @@ export class WebhookRequest {

@Column()
filter: string;

@Column({ type: "text", nullable: true, default: undefined })
clientId?: string | undefined;
}
175 changes: 175 additions & 0 deletions packages/indexer-database/src/migrations/1732293783614-Webhook.ts

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion packages/webhooks/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
"express": "^4.19.2",
"express-bearer-token": "^3.0.0",
"redis": "^4.7.0",
"superstruct": "2.0.3-1"
"superstruct": "2.0.3-1",
"uuid": "^11.0.3"
},
"exports": {
".": "./dist/index.js"
Expand Down
11 changes: 7 additions & 4 deletions packages/webhooks/src/database/webhookClientRepository.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { entities, DataSource } from "@repo/indexer-database";
import assert from "assert";

// This class is intended to store integration clients allowed to use the webhook service.
export class WebhookClientRepository {
Expand All @@ -15,7 +16,7 @@ export class WebhookClientRepository {
if (existingClient) {
throw new Error(`Client with id ${client.id} already exists.`);
}
await this.repository.save(client);
await this.repository.insert(client);
}

public async unregisterClient(clientId: string): Promise<void> {
Expand All @@ -40,9 +41,11 @@ export class WebhookClientRepository {
return this.repository.find();
}

public async findClientsByApiKey(
public async getClientByApiKey(
apiKey: string,
): Promise<entities.WebhookClient[]> {
return this.repository.find({ where: { apiKey } });
): Promise<entities.WebhookClient> {
const result = await this.repository.findOne({ where: { apiKey } });
assert(result, "Invalid api key");
return result;
}
}
26 changes: 15 additions & 11 deletions packages/webhooks/src/database/webhookRequestRepository.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { entities, DataSource } from "@repo/indexer-database";
import assert from "assert";
import { exists } from "../utils";

export class WebhookRequestRepository {
private repository;
Expand All @@ -14,7 +16,7 @@ export class WebhookRequestRepository {
if (existingWebhook) {
throw new Error(`Webhook with id ${webhook.id} already exists.`);
}
await this.repository.save(webhook);
await this.repository.insert(webhook);
}

public async unregister(webhookId: string): Promise<void> {
Expand All @@ -27,26 +29,28 @@ export class WebhookRequestRepository {
await this.repository.delete({ id: webhookId });
}

public async getWebhook(
public async getWebhookRequest(
webhookId: string,
): Promise<entities.WebhookRequest | undefined> {
return (
(await this.repository.findOne({ where: { id: webhookId } })) ?? undefined
);
): Promise<entities.WebhookRequest> {
const result = await this.repository.findOne({ where: { id: webhookId } });
assert(result, "Webhook request not found");
return result;
}

public async listWebhooks(): Promise<entities.WebhookRequest[]> {
public async listWebhookRequests(): Promise<entities.WebhookRequest[]> {
return this.repository.find();
}

public async filterWebhooks(
public async findWebhookRequestsByFilter(
filter: string,
): Promise<entities.WebhookRequest[]> {
return this.repository.find({ where: { filter } });
}

public async hasWebhook(webhookId: string): Promise<boolean> {
const count = await this.repository.count({ where: { id: webhookId } });
return count > 0;
public async hasWebhookRequest(webhookId: string): Promise<boolean> {
const result = await this.repository.findOne({
where: { id: webhookId },
});
return exists(result);
}
}
34 changes: 20 additions & 14 deletions packages/webhooks/src/eventProcessorManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ export class EventProcessorManager {
}
// Register a new type of webhook processor able to be written to
public registerEventProcessor(name: string, webhook: IEventProcessor) {
this.logger.info(
this.logger.debug(
`Attempting to register event processor with name: ${name}`,
);
assert(
!this.processors.has(name),
`Webhook with that name already exists: ${name}`,
);
this.processors.set(name, webhook);
this.logger.info(
this.logger.debug(
`Successfully registered event processor with name: ${name}`,
);
}
Expand All @@ -60,32 +60,36 @@ export class EventProcessorManager {
};

async registerWebhook(
id: string,
params: { type: string; url: string; filter: JSONValue },
apiKey?: string,
) {
this.logger.info(
this.logger.debug(
`Attempting to register webhook of type: ${params.type} with URL: ${params.url}`,
);
let client;
if (this.config.requireApiKey) {
if (apiKey === undefined) throw new Error("Api Key required");
const clients = await this.clientRepository.findClientsByApiKey(apiKey);
assert(clients.length > 0, "Invalid api key");
client = await this.clientRepository.getClientByApiKey(apiKey);
const urlDomain = new URL(params.url).hostname;
const isDevDomain =
urlDomain === "localhost" || urlDomain.startsWith("127.");
if (!isDevDomain) {
const isDomainValid = clients.some((client) =>
client.domains.includes(urlDomain),
);
const isDomainValid = client.domains.includes(urlDomain);
assert(
isDomainValid,
"The base URL of the provided webhook does not match any of the client domains",
);
}
}
const webhook = this.getEventProcessor(params.type);
const result = await webhook.register(params.url, params.filter);
this.logger.info(`Successfully registered webhook with ID: ${result}`);
const result = await webhook.register(
id,
params.url,
params.filter,
client?.id,
);
this.logger.debug(`Successfully registered webhook with ID: ${result}`);
return result;
}

Expand All @@ -94,17 +98,19 @@ export class EventProcessorManager {
params: { type: string; id: string },
apiKey?: string,
) {
this.logger.info(
this.logger.debug(
`Attempting to unregister webhook of type: ${params.type} with ID: ${params.id}`,
);
const webhook = this.getEventProcessor(params.type);
await webhook.unregister(params.id);
this.logger.info(`Successfully unregistered webhook with ID: ${params.id}`);
this.logger.debug(
`Successfully unregistered webhook with ID: ${params.id}`,
);
}

async registerClient(client: entities.WebhookClient) {
this.logger.info(`Attempting to register client with ID: ${client.id}`);
this.logger.debug(`Attempting to register client with ID: ${client.id}`);
await this.clientRepository.registerClient(client);
this.logger.info(`Successfully registered client with ID: ${client.id}`);
this.logger.debug(`Successfully registered client with ID: ${client.id}`);
}
}
43 changes: 24 additions & 19 deletions packages/webhooks/src/eventProcessors/depositStatus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,22 @@ export class DepositStatusProcessor implements IEventProcessor {
private webhookRequests: WebhookRequestRepository;
private notify: (params: NotificationPayload) => void;
private postgres: DataSource;
// Type shoudl be uniqe across all event processors, this is to avoid colliding with multiple
// processors writing to the same tables
static type = "DepositStatus";

constructor(deps: Dependencies) {
constructor(
deps: Dependencies,
private type: string = "DepositStatus",
) {
this.webhookRequests = new WebhookRequestRepository(deps.postgres);
this.notify = deps.notify;
this.postgres = deps.postgres;
}
private async _write(event: DepositStatusEvent): Promise<void> {
const filter = customId(
DepositStatusProcessor.type,
this.type,
event.originChainId,
event.depositTxHash,
);
const hooks = await this.webhookRequests.filterWebhooks(filter);
const hooks =
await this.webhookRequests.findWebhookRequestsByFilter(filter);
//TODO: unregister any hooks where event has reached terminal state
await Promise.all(
hooks.map((hook) => {
Expand All @@ -61,28 +61,28 @@ export class DepositStatusProcessor implements IEventProcessor {
);
}
private async _register(
id: string,
url: string,
params: DepositStatusFilter,
clientId?: string,
): Promise<string> {
const id = customId(
DepositStatusProcessor.type,
url,
params.originChainId,
params.depositTxHash,
);
const filter = customId(
DepositStatusProcessor.type,
this.type,
clientId ?? "",
params.originChainId,
params.depositTxHash,
);
const existingFilters =
await this.webhookRequests.findWebhookRequestsByFilter(filter);
assert(
!(await this.webhookRequests.hasWebhook(id)),
"This webhook already exists",
existingFilters.length === 0,
"Webhook already exists for this filter",
);
await this.webhookRequests.register({
id,
filter,
url,
clientId,
});
const relayHashInfoRepository = this.postgres.getRepository(
entities.RelayHashInfo,
Expand All @@ -98,12 +98,17 @@ export class DepositStatusProcessor implements IEventProcessor {
});
return id;
}
async register(url: string, params: unknown) {
return this._register(url, ss.create(params, DepositStatusFilter));
async register(id: string, url: string, params: unknown, clientId?: string) {
return this._register(
id,
url,
ss.create(params, DepositStatusFilter),
clientId,
);
}
async unregister(id: string): Promise<void> {
assert(
await this.webhookRequests.hasWebhook(id),
await this.webhookRequests.hasWebhookRequest(id),
"This webhook does not exist",
);
await this.webhookRequests.unregister(id);
Expand Down
13 changes: 8 additions & 5 deletions packages/webhooks/src/factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,16 @@ export function WebhookFactory(config: Config, deps: Dependencies) {
config.enabledWebhooks.forEach((name) => {
switch (name) {
// add more webhook types here
case "DepositStatus": {
case WebhookTypes.DepositStatus: {
eventProcessorManager.registerEventProcessor(
name,
new DepositStatusProcessor({
postgres,
notify: notifier.notify,
}),
new DepositStatusProcessor(
{
postgres,
notify: notifier.notify,
},
WebhookTypes.DepositStatus,
),
);
break;
}
Expand Down
5 changes: 4 additions & 1 deletion packages/webhooks/src/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import express from "express";
import { EventProcessorManager } from "./eventProcessorManager";
import * as ss from "superstruct";
import bearerToken from "express-bearer-token";
import { v4 as uuidv4 } from "uuid";

type Dependencies = {
eventProcessorManager: EventProcessorManager;
Expand Down Expand Up @@ -32,7 +33,9 @@ export function WebhookRouter(deps: Dependencies): express.Router {
) => {
try {
const parsedBody = RegistrationParams.create(req.body);
const id = await deps.eventProcessorManager.registerWebhook(
const id = uuidv4();
await deps.eventProcessorManager.registerWebhook(
id,
parsedBody,
req.token,
);
Expand Down
7 changes: 6 additions & 1 deletion packages/webhooks/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@ import * as ss from "superstruct";

export interface IEventProcessor {
write(event: JSONValue): void;
register(url: string, params: JSONValue): Promise<string>;
register(
id: string,
url: string,
params: JSONValue,
clientId?: string,
): Promise<string>;
unregister(id: string): Promise<void>;
}

Expand Down
Loading

0 comments on commit 180a47e

Please sign in to comment.