diff --git a/packages/indexer-api/src/main.ts b/packages/indexer-api/src/main.ts index d8e32dd..f1b5646 100644 --- a/packages/indexer-api/src/main.ts +++ b/packages/indexer-api/src/main.ts @@ -87,10 +87,10 @@ export async function Main( const redis = await initializeRedis(redisConfig, logger); const webhooks = Webhooks.WebhookFactory( { - requireApiKey: false, enabledWebhooks: [Webhooks.WebhookTypes.DepositStatus], + enabledWebhookRequestWorkers: false, }, - { postgres, logger }, + { postgres, logger, redis }, ); const allRouters: Record = { diff --git a/packages/indexer-database/src/entities/WebhookClient.ts b/packages/indexer-database/src/entities/WebhookClient.ts index e267717..ecae8fa 100644 --- a/packages/indexer-database/src/entities/WebhookClient.ts +++ b/packages/indexer-database/src/entities/WebhookClient.ts @@ -1,14 +1,15 @@ -import { Entity, PrimaryColumn, Column, PrimaryGeneratedColumn } from "typeorm"; +import { Entity, Column, PrimaryGeneratedColumn, Unique } from "typeorm"; @Entity() +@Unique("UK_webhook_client_api_key", ["apiKey"]) export class WebhookClient { + @PrimaryGeneratedColumn() + id: number; + @Column() name: string; - @PrimaryGeneratedColumn() - id: string; - - @Column({ unique: true }) + @Column() apiKey: string; @Column("jsonb") diff --git a/packages/indexer-database/src/entities/WebhookRequest.ts b/packages/indexer-database/src/entities/WebhookRequest.ts index 7022f5f..4f35ef3 100644 --- a/packages/indexer-database/src/entities/WebhookRequest.ts +++ b/packages/indexer-database/src/entities/WebhookRequest.ts @@ -1,16 +1,28 @@ -import { Entity, PrimaryColumn, Column } from "typeorm"; +import { + Entity, + PrimaryColumn, + Column, + Unique, + CreateDateColumn, + Index, +} from "typeorm"; @Entity() +@Unique("UK_webhook_request_clientId_filter", ["clientId", "filter"]) +@Index("IX_webhook_request_filter", ["filter"]) export class WebhookRequest { @PrimaryColumn() id: string; + @Column({ type: "integer" }) + clientId: number; + @Column() url: string; @Column() filter: string; - @Column({ type: "text", nullable: true, default: undefined }) - clientId?: string | undefined; + @CreateDateColumn() + createdAt: Date; } diff --git a/packages/indexer-database/src/main.ts b/packages/indexer-database/src/main.ts index 7ba56df..96c9bf4 100644 --- a/packages/indexer-database/src/main.ts +++ b/packages/indexer-database/src/main.ts @@ -36,6 +36,9 @@ export const createDataSource = (config: DatabaseConfig): DataSource => { entities.RootBundleExecutedJoinTable, // Others entities.RelayHashInfo, + // Webhooks + entities.WebhookRequest, + entities.WebhookClient, ], migrationsTableName: "_migrations", migrations: ["migrations/*.ts"], diff --git a/packages/indexer-database/src/migrations/1732297474910-WebhookClient.ts b/packages/indexer-database/src/migrations/1732297474910-WebhookClient.ts new file mode 100644 index 0000000..3cfaf4c --- /dev/null +++ b/packages/indexer-database/src/migrations/1732297474910-WebhookClient.ts @@ -0,0 +1,21 @@ +import { MigrationInterface, QueryRunner } from "typeorm"; + +export class WebhookClient1732297474910 implements MigrationInterface { + name = "WebhookClient1732297474910"; + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(` + CREATE TABLE "webhook_client" ( + "id" SERIAL NOT NULL, + "name" character varying NOT NULL, + "apiKey" character varying NOT NULL, + "domains" jsonb NOT NULL, + CONSTRAINT "UK_webhook_client_api_key" UNIQUE ("apiKey"), + CONSTRAINT "PK_f7330fb3bdb2e19534eae691d44" PRIMARY KEY ("id") + )`); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`DROP TABLE "webhook_client"`); + } +} diff --git a/packages/indexer-database/src/migrations/1732297948190-WebhookRequest.ts b/packages/indexer-database/src/migrations/1732297948190-WebhookRequest.ts new file mode 100644 index 0000000..8c8190d --- /dev/null +++ b/packages/indexer-database/src/migrations/1732297948190-WebhookRequest.ts @@ -0,0 +1,22 @@ +import { MigrationInterface, QueryRunner } from "typeorm"; + +export class WebhookRequest1732297948190 implements MigrationInterface { + name = "WebhookRequest1732297948190"; + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(` + CREATE TABLE "webhook_request" ( + "id" character varying NOT NULL, + "clientId" integer NOT NULL, + "url" character varying NOT NULL, + "filter" character varying NOT NULL, + "createdAt" TIMESTAMP NOT NULL DEFAULT now(), + CONSTRAINT "UK_webhook_request_clientId_filter" UNIQUE ("clientId", "filter"), + CONSTRAINT "PK_67a7784045de2d1b7139b611b93" PRIMARY KEY ("id") + )`); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`DROP TABLE "webhook_request"`); + } +} diff --git a/packages/indexer-database/src/migrations/1732310112989-WebhookRequest.ts b/packages/indexer-database/src/migrations/1732310112989-WebhookRequest.ts new file mode 100644 index 0000000..bbf1d23 --- /dev/null +++ b/packages/indexer-database/src/migrations/1732310112989-WebhookRequest.ts @@ -0,0 +1,15 @@ +import { MigrationInterface, QueryRunner } from "typeorm"; + +export class WebhookRequest1732310112989 implements MigrationInterface { + name = "WebhookRequest1732310112989"; + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `CREATE INDEX "IX_webhook_request_filter" ON "webhook_request" ("filter") `, + ); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`DROP INDEX "public"."IX_webhook_request_filter"`); + } +} diff --git a/packages/indexer/src/data-indexing/service/AcrossIndexerManager.ts b/packages/indexer/src/data-indexing/service/AcrossIndexerManager.ts index 4a8c740..c5721c5 100644 --- a/packages/indexer/src/data-indexing/service/AcrossIndexerManager.ts +++ b/packages/indexer/src/data-indexing/service/AcrossIndexerManager.ts @@ -1,6 +1,7 @@ import { Logger } from "winston"; import { DataSource } from "@repo/indexer-database"; +import { eventProcessorManager } from "@repo/webhooks"; import { Config } from "../../parseEnv"; import { HubPoolRepository } from "../../database/HubPoolRepository"; @@ -39,6 +40,7 @@ export class AcrossIndexerManager { private spokePoolRepository: SpokePoolRepository, private redisCache: RedisCache, private indexerQueuesService: IndexerQueuesService, + private webhookWriteFn?: eventProcessorManager.WebhookWriteFn, ) {} public async start() { @@ -93,7 +95,12 @@ export class AcrossIndexerManager { this.hubPoolClientFactory, this.spokePoolClientFactory, this.spokePoolRepository, - new SpokePoolProcessor(this.postgres, this.logger, chainId), + new SpokePoolProcessor( + this.postgres, + this.logger, + chainId, + this.webhookWriteFn, + ), this.indexerQueuesService, ); const spokePoolIndexer = new Indexer( diff --git a/packages/indexer/src/main.ts b/packages/indexer/src/main.ts index 56847a5..3767271 100644 --- a/packages/indexer/src/main.ts +++ b/packages/indexer/src/main.ts @@ -57,10 +57,10 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) { // Call write to kick off webhook calls const { write } = WebhookFactory( { - requireApiKey: false, enabledWebhooks: [WebhookTypes.DepositStatus], + enabledWebhookRequestWorkers: true, }, - { postgres, logger }, + { postgres, logger, redis }, ); // Retry providers factory const retryProvidersFactory = new RetryProvidersFactory( @@ -96,6 +96,7 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) { new SpokePoolRepository(postgres, logger), redisCache, indexerQueuesService, + write, ); const bundleServicesManager = new BundleServicesManager( config, diff --git a/packages/indexer/src/services/spokePoolProcessor.ts b/packages/indexer/src/services/spokePoolProcessor.ts index b913d3b..9c9bcb5 100644 --- a/packages/indexer/src/services/spokePoolProcessor.ts +++ b/packages/indexer/src/services/spokePoolProcessor.ts @@ -1,11 +1,14 @@ import { utils } from "@across-protocol/sdk"; +import winston from "winston"; + import { DataSource, entities, utils as dbUtils, SaveQueryResultType, } from "@repo/indexer-database"; -import winston from "winston"; +import { WebhookTypes, eventProcessorManager } from "@repo/webhooks"; + import { RelayStatus } from "../../../indexer-database/dist/src/entities"; import { StoreEventsResult } from "../data-indexing/service/SpokePoolIndexerDataHandler"; @@ -22,6 +25,7 @@ export class SpokePoolProcessor { private readonly postgres: DataSource, private readonly logger: winston.Logger, private readonly chainId: number, + private readonly webhookWriteFn?: eventProcessorManager.WebhookWriteFn, ) {} public async process(events: StoreEventsResult) { @@ -37,9 +41,19 @@ export class SpokePoolProcessor { SpokePoolEvents.V3FundsDeposited, [...newDeposits, ...updatedDeposits], ); - // TODO: for new deposits, notify status change to unfilled - // here... + // Notify webhook of new deposits + newDeposits.forEach((deposit) => { + this.webhookWriteFn?.({ + type: WebhookTypes.DepositStatus, + event: { + depositId: deposit.id, + originChainId: deposit.originChainId, + depositTxHash: deposit.transactionHash, + status: RelayStatus.Unfilled, + }, + }); + }); const newSlowFillRequests = dbUtils.filterSaveQueryResults( events.slowFillRequests, SaveQueryResultType.Inserted, @@ -52,8 +66,19 @@ export class SpokePoolProcessor { SpokePoolEvents.RequestedV3SlowFill, [...newSlowFillRequests, ...updatedSlowFillRequests], ); - // TODO: for new slow fill requests, notify status change to slow fill requested - // here... + + // Notify webhook of new slow fill requests + newSlowFillRequests.forEach((deposit) => { + this.webhookWriteFn?.({ + type: WebhookTypes.DepositStatus, + event: { + depositId: deposit.id, + originChainId: deposit.originChainId, + depositTxHash: deposit.transactionHash, + status: RelayStatus.SlowFillRequested, + }, + }); + }); const newFills = dbUtils.filterSaveQueryResults( events.fills, @@ -67,16 +92,38 @@ export class SpokePoolProcessor { ...newFills, ...updatedFills, ]); - // TODO: for new fills, notify status change to filled - // here... + + // Notify webhook of new fills + newFills.forEach((fill) => { + this.webhookWriteFn?.({ + type: WebhookTypes.DepositStatus, + event: { + depositId: fill.depositId, + originChainId: fill.originChainId, + depositTxHash: fill.transactionHash, + status: RelayStatus.Filled, + }, + }); + }); const expiredDeposits = await this.updateExpiredRelays(); // TODO: for expired deposits, notify status change to expired // here... const refundedDeposits = await this.updateRefundedDepositsStatus(); - // TODO: for refunded deposits, notify status change to refunded - // here... + + // Notify webhook of refunded deposits + refundedDeposits.forEach((deposit) => { + this.webhookWriteFn?.({ + type: WebhookTypes.DepositStatus, + event: { + depositId: deposit.depositId, + originChainId: deposit.originChainId, + depositTxHash: deposit.depositTxHash, + status: RelayStatus.Refunded, + }, + }); + }); } /** diff --git a/packages/webhooks/README.md b/packages/webhooks/README.md index 189a516..0ec7ad7 100644 --- a/packages/webhooks/README.md +++ b/packages/webhooks/README.md @@ -11,7 +11,6 @@ The `factory.ts` file provides a `WebhookFactory` function that sets up the webh To use the `WebhookFactory`, you need to provide a configuration object and dependencies: - **Config**: This object should include: - - requireApiKey: boolean; - Should registration of new webhooks require an api key - enabledWebhooks: WebhookTypes[]; - What event processors should be enabled, like 'DepositStatus' - **Dependencies**: This object should include: @@ -27,8 +26,8 @@ import { DataSource } from "@repo/indexer-database"; const webhooks = WebhookFactory( { - requireApiKey: false, - enableWebhooks: [WebhookTypes.DepositStatus], + enabledWebhooks: [WebhookTypes.DepositStatus], + enabledWebhookRequestWorkers: false, }, { postgres, logger }, ); diff --git a/packages/webhooks/package.json b/packages/webhooks/package.json index 580c468..7ecb36e 100644 --- a/packages/webhooks/package.json +++ b/packages/webhooks/package.json @@ -22,8 +22,10 @@ "license": "ISC", "dependencies": { "@repo/indexer-database": "workspace:*", + "bullmq": "^5.12.12", "express": "^4.19.2", "express-bearer-token": "^3.0.0", + "ioredis": "^5.4.1", "redis": "^4.7.0", "superstruct": "2.0.3-1", "uuid": "^11.0.3" diff --git a/packages/webhooks/src/adapter/messaging/WebhookRequestWorker.ts b/packages/webhooks/src/adapter/messaging/WebhookRequestWorker.ts new file mode 100644 index 0000000..2956077 --- /dev/null +++ b/packages/webhooks/src/adapter/messaging/WebhookRequestWorker.ts @@ -0,0 +1,84 @@ +import Redis from "ioredis"; +import winston from "winston"; +import { Job, Worker } from "bullmq"; + +import { DataSource, entities } from "@repo/indexer-database"; + +import { WebhooksQueues } from "./WebhooksQueuesService"; +import { WebhookTypes } from "../../factory"; +import { WebhookWriteFn } from "../../eventProcessorManager"; + +export type WebhookRequestQueueJob = { + webhookRequestId: string; + depositTxHash: string; + originChainId: number; +}; + +export class WebhookRequestWorker { + public worker: Worker; + + constructor( + private redis: Redis, + private postgres: DataSource, + private logger: winston.Logger, + private webhookWriteFn: WebhookWriteFn, + ) { + this.setWorker(); + } + + public setWorker() { + this.worker = new Worker( + WebhooksQueues.WebhookRequest, + async (job: Job) => { + try { + this.logger.debug({ + at: "WebhookRequestWorker", + message: `Processing job for webhook request ${job.data.webhookRequestId}`, + }); + await this.run(job.data); + } catch (error) { + this.logger.error({ + at: "WebhookRequestWorker", + message: `Error processing job for webhook request ${job.data.webhookRequestId}`, + error, + }); + throw error; + } + }, + { connection: this.redis, concurrency: 10 }, + ); + } + + private async run(webhookRequestJob: WebhookRequestQueueJob) { + const { depositTxHash, originChainId } = webhookRequestJob; + const relayHashInfo = await this.postgres + .getRepository(entities.RelayHashInfo) + .findOne({ + where: { + depositTxHash, + originChainId, + }, + }); + if (!relayHashInfo) { + this.logger.warn({ + at: "WebhookRequestWorker", + message: `Relay hash info not found for webhook request ${webhookRequestJob.webhookRequestId}`, + webhookRequestJob, + }); + return; + } + this.webhookWriteFn({ + type: WebhookTypes.DepositStatus, + event: { + originChainId: relayHashInfo.originChainId, + depositTxHash: relayHashInfo.depositTxHash, + depositId: relayHashInfo.depositId, + status: relayHashInfo.status, + }, + }); + } + + public async close() { + return this.worker.close(); + } +} diff --git a/packages/webhooks/src/adapter/messaging/WebhooksQueuesService.ts b/packages/webhooks/src/adapter/messaging/WebhooksQueuesService.ts new file mode 100644 index 0000000..f25cd1d --- /dev/null +++ b/packages/webhooks/src/adapter/messaging/WebhooksQueuesService.ts @@ -0,0 +1,53 @@ +import Redis from "ioredis"; +import { Queue, JobsOptions, BulkJobOptions } from "bullmq"; + +export enum WebhooksQueues { + WebhookRequest = "WebhookRequest", +} + +export class WebhooksQueuesService { + private queues = {} as Record; + + constructor(private connection: Redis) { + this.initializeQueues(); + } + + private initializeQueues() { + const queueNames = Object.values(WebhooksQueues); + queueNames.forEach( + (queueName) => + (this.queues[queueName] = new Queue(queueName, { + connection: this.connection, + defaultJobOptions: { + attempts: Number.MAX_SAFE_INTEGER, + removeOnComplete: true, + }, + })), + ); + } + + public async publishMessage( + queue: WebhooksQueues, + message: T, + options: JobsOptions = {}, + ) { + const q = this.queues[queue]; + if (q) { + await q.add(queue, message, options); + } + } + + public async publishMessagesBulk( + queue: WebhooksQueues, + jobName: string, + messages: T[], + options: BulkJobOptions = {}, + ) { + const q = this.queues[queue]; + if (q) { + await q.addBulk( + messages.map((m) => ({ name: jobName, data: m, opts: options })), + ); + } + } +} diff --git a/packages/webhooks/src/database/webhookClientRepository.ts b/packages/webhooks/src/database/webhookClientRepository.ts index ec4c540..32035ae 100644 --- a/packages/webhooks/src/database/webhookClientRepository.ts +++ b/packages/webhooks/src/database/webhookClientRepository.ts @@ -19,7 +19,7 @@ export class WebhookClientRepository { await this.repository.insert(client); } - public async unregisterClient(clientId: string): Promise { + public async unregisterClient(clientId: number): Promise { const existingClient = await this.repository.findOne({ where: { id: clientId }, }); @@ -30,7 +30,7 @@ export class WebhookClientRepository { } public async getClient( - clientId: string, + clientId: number, ): Promise { return ( (await this.repository.findOne({ where: { id: clientId } })) ?? undefined @@ -48,4 +48,11 @@ export class WebhookClientRepository { assert(result, "Invalid api key"); return result; } + + public async getWebhookClientById( + id: number, + ): Promise { + const client = await this.repository.findOne({ where: { id } }); + return client ?? undefined; + } } diff --git a/packages/webhooks/src/database/webhookRequestRepository.ts b/packages/webhooks/src/database/webhookRequestRepository.ts index e6abc76..c632b33 100644 --- a/packages/webhooks/src/database/webhookRequestRepository.ts +++ b/packages/webhooks/src/database/webhookRequestRepository.ts @@ -9,7 +9,9 @@ export class WebhookRequestRepository { this.repository = this.dataSource.getRepository(entities.WebhookRequest); } - public async register(webhook: entities.WebhookRequest): Promise { + public async register( + webhook: Omit, + ): Promise { const existingWebhook = await this.repository.findOne({ where: { id: webhook.id }, }); @@ -47,6 +49,13 @@ export class WebhookRequestRepository { return this.repository.find({ where: { filter } }); } + public async findWebhookRequestsByFilterAndClient( + filter: string, + clientId: number, + ): Promise { + return this.repository.find({ where: { filter, clientId } }); + } + public async hasWebhookRequest(webhookId: string): Promise { const result = await this.repository.findOne({ where: { id: webhookId }, diff --git a/packages/webhooks/src/eventProcessorManager.ts b/packages/webhooks/src/eventProcessorManager.ts index 899f0ad..12aa53a 100644 --- a/packages/webhooks/src/eventProcessorManager.ts +++ b/packages/webhooks/src/eventProcessorManager.ts @@ -1,13 +1,23 @@ -import { WebhookClientRepository } from "./database/webhookClientRepository"; -import { DataSource, entities } from "@repo/indexer-database"; import { Logger } from "winston"; import assert from "assert"; + +import { DataSource, entities } from "@repo/indexer-database"; + +import { WebhookClientRepository } from "./database/webhookClientRepository"; import { JSONValue, IEventProcessor } from "./types"; +import { + WebhooksQueues, + WebhooksQueuesService, +} from "./adapter/messaging/WebhooksQueuesService"; +import { WebhookTypes } from "./factory"; +import { WebhookRequestQueueJob } from "./adapter/messaging/WebhookRequestWorker"; export type EventProcessorRecord = Record; -type EventType = { - type: string; +export type WebhookWriteFn = (event: EventType) => void; + +export type EventType = { + type: WebhookTypes; event: JSONValue; }; @@ -18,21 +28,22 @@ export type Config = { export type Dependencies = { postgres: DataSource; logger: Logger; + webhooksQueuesService: WebhooksQueuesService; }; export class EventProcessorManager { private logger: Logger; private clientRepository: WebhookClientRepository; - private processors = new Map(); + private processors = new Map(); + private webhooksQueuesService: WebhooksQueuesService; - constructor( - private config: Config, - deps: Dependencies, - ) { + constructor(deps: Dependencies) { this.logger = deps.logger; this.clientRepository = new WebhookClientRepository(deps.postgres); // Initialize the client manager + this.webhooksQueuesService = deps.webhooksQueuesService; } + // Register a new type of webhook processor able to be written to - public registerEventProcessor(name: string, webhook: IEventProcessor) { + public registerEventProcessor(name: WebhookTypes, webhook: IEventProcessor) { this.logger.debug( `Attempting to register event processor with name: ${name}`, ); @@ -46,15 +57,12 @@ export class EventProcessorManager { ); } - private getEventProcessor(name: string) { + private getEventProcessor(name: WebhookTypes) { const eventProcessor = this.processors.get(name); - assert( - eventProcessor, - "EventProcessor does not exist by type: ${event.type}", - ); + assert(eventProcessor, `EventProcessor does not exist by type: ${name}`); return eventProcessor; } - write = (event: EventType): void => { + write: WebhookWriteFn = (event: EventType): void => { const webhook = this.getEventProcessor(event.type); webhook.write(event.event); }; @@ -62,35 +70,39 @@ export class EventProcessorManager { async registerWebhook( id: string, params: { type: string; url: string; filter: JSONValue }, - apiKey?: string, + apiKey: string, ) { this.logger.debug( `Attempting to register webhook of type: ${params.type} with URL: ${params.url}`, ); - let client; - if (this.config.requireApiKey) { - if (apiKey === undefined) throw new Error("Api Key required"); - client = await this.clientRepository.getClientByApiKey(apiKey); - const urlDomain = new URL(params.url).hostname; - const isDevDomain = - urlDomain === "localhost" || urlDomain.startsWith("127."); - if (!isDevDomain) { - const isDomainValid = client.domains.includes(urlDomain); - assert( - isDomainValid, - "The base URL of the provided webhook does not match any of the client domains", - ); - } - } - const webhook = this.getEventProcessor(params.type); - const result = await webhook.register( + const client = await this.clientRepository.getClientByApiKey(apiKey); + const urlDomain = new URL(params.url).hostname; + const isDomainValid = client.domains.includes(urlDomain); + assert( + isDomainValid, + "The base URL of the provided webhook does not match any of the client domains", + ); + assert((params.filter as any).depositTxHash, "depositTxHash is required"); + assert((params.filter as any).originChainId, "originChainId is required"); + const webhook = this.getEventProcessor(params.type as WebhookTypes); + const webhookRequestId = await webhook.register( id, params.url, params.filter, - client?.id, + client.id, + ); + this.logger.debug( + `Successfully registered webhook with ID: ${webhookRequestId}`, + ); + this.webhooksQueuesService.publishMessage( + WebhooksQueues.WebhookRequest, + { + webhookRequestId, + depositTxHash: (params.filter as any).depositTxHash, + originChainId: (params.filter as any).originChainId, + }, ); - this.logger.debug(`Successfully registered webhook with ID: ${result}`); - return result; + return webhookRequestId; } // TODO: gaurd this with api key @@ -101,7 +113,7 @@ export class EventProcessorManager { this.logger.debug( `Attempting to unregister webhook of type: ${params.type} with ID: ${params.id}`, ); - const webhook = this.getEventProcessor(params.type); + const webhook = this.getEventProcessor(params.type as WebhookTypes); await webhook.unregister(params.id); this.logger.debug( `Successfully unregistered webhook with ID: ${params.id}`, diff --git a/packages/webhooks/src/eventProcessors/depositStatus.ts b/packages/webhooks/src/eventProcessors/depositStatus.ts index 895e001..20512c9 100644 --- a/packages/webhooks/src/eventProcessors/depositStatus.ts +++ b/packages/webhooks/src/eventProcessors/depositStatus.ts @@ -1,11 +1,13 @@ import assert from "assert"; import * as ss from "superstruct"; +import { Logger } from "winston"; import { DataSource, entities } from "@repo/indexer-database"; import { WebhookRequestRepository } from "../database/webhookRequestRepository"; import { customId } from "../utils"; import { IEventProcessor, NotificationPayload } from "../types"; +import { WebhookClientRepository } from "../database/webhookClientRepository"; export const DepositStatusEvent = ss.object({ originChainId: ss.number(), @@ -24,18 +26,22 @@ export type DepositStatusFilter = ss.Infer; export type Dependencies = { notify: (params: NotificationPayload) => void; postgres: DataSource; + logger: Logger; }; export class DepositStatusProcessor implements IEventProcessor { private webhookRequests: WebhookRequestRepository; + private webhookClientsRepository: WebhookClientRepository; private notify: (params: NotificationPayload) => void; - private postgres: DataSource; + private logger: Logger; + constructor( deps: Dependencies, private type: string = "DepositStatus", ) { this.webhookRequests = new WebhookRequestRepository(deps.postgres); + this.webhookClientsRepository = new WebhookClientRepository(deps.postgres); this.notify = deps.notify; - this.postgres = deps.postgres; + this.logger = deps.logger; } private async _write(event: DepositStatusEvent): Promise { const filter = customId( @@ -43,37 +49,67 @@ export class DepositStatusProcessor implements IEventProcessor { event.originChainId, event.depositTxHash, ); - const hooks = + const webhookRequests = await this.webhookRequests.findWebhookRequestsByFilter(filter); + const clientIds = webhookRequests.map((hook) => hook.clientId); + const clients = await Promise.all( + clientIds.map((id) => + this.webhookClientsRepository.getWebhookClientById(id), + ), + ); + const clientsMap = clients + .filter((client) => client !== undefined) + .reduce( + (acc, client) => { + acc[client.id] = client; + return acc; + }, + {} as Record, + ); + //TODO: unregister any hooks where event has reached terminal state await Promise.all( - hooks.map((hook) => { - this.notify({ - url: hook.url, - data: event, - }); + webhookRequests.map((hook) => { + const client = clientsMap[hook.clientId]; + if (client) { + this.notify({ + url: hook.url, + data: { ...event, webhookRequestId: hook.id }, + apiKey: client.apiKey, + }); + } else { + this.logger.error({ + at: "DepositStatusProcessor::_write", + message: `Client not found for webhook request ${hook.id}`, + webhookRequest: hook, + }); + } }), ); } + write(e: unknown) { this._write(ss.create(e, DepositStatusEvent)).catch((err) => console.error(err), ); } + private async _register( id: string, url: string, params: DepositStatusFilter, - clientId?: string, + clientId: number, ): Promise { const filter = customId( this.type, - clientId ?? "", params.originChainId, params.depositTxHash, ); const existingFilters = - await this.webhookRequests.findWebhookRequestsByFilter(filter); + await this.webhookRequests.findWebhookRequestsByFilterAndClient( + filter, + clientId, + ); assert( existingFilters.length === 0, "Webhook already exists for this filter", @@ -84,21 +120,9 @@ export class DepositStatusProcessor implements IEventProcessor { url, clientId, }); - // const relayHashInfoRepository = this.postgres.getRepository( - // entities.RelayHashInfo, - // ); - // const relayHashInfo = await relayHashInfoRepository.findOne({ - // where: params, - // }); - // if (relayHashInfo) - // this._write({ - // depositId: relayHashInfo.depositId, - // status: relayHashInfo.status, - // ...params, - // }); return id; } - async register(id: string, url: string, params: unknown, clientId?: string) { + async register(id: string, url: string, params: unknown, clientId: number) { return this._register( id, url, diff --git a/packages/webhooks/src/factory.ts b/packages/webhooks/src/factory.ts index 7a99fc0..1d5023e 100644 --- a/packages/webhooks/src/factory.ts +++ b/packages/webhooks/src/factory.ts @@ -1,38 +1,42 @@ import assert from "assert"; -import { EventProcessorManager } from "./eventProcessorManager"; -import { DataSource } from "@repo/indexer-database"; import { Logger } from "winston"; +import { Redis } from "ioredis"; + +import { DataSource } from "@repo/indexer-database"; +import { EventProcessorManager } from "./eventProcessorManager"; import { WebhookNotifier } from "./notifier"; import { DepositStatusProcessor } from "./eventProcessors"; import { WebhookRouter } from "./router"; +import { WebhooksQueuesService } from "./adapter/messaging/WebhooksQueuesService"; +import { WebhookRequestWorker } from "./adapter/messaging/WebhookRequestWorker"; export enum WebhookTypes { DepositStatus = "DepositStatus", } export type Config = { - requireApiKey: boolean; enabledWebhooks: WebhookTypes[]; + enabledWebhookRequestWorkers: boolean; }; type Dependencies = { postgres: DataSource; + redis: Redis; logger: Logger; }; export function WebhookFactory(config: Config, deps: Dependencies) { - const { logger, postgres } = deps; + const { logger, postgres, redis } = deps; const notifier = new WebhookNotifier({ logger }); assert( config.enabledWebhooks.length, "No webhooks enabled, specify one in config", ); - const eventProcessorManager = new EventProcessorManager( - config ?? { requireApiKey: false }, - { - postgres, - logger, - }, - ); + const webhooksQueuesService = new WebhooksQueuesService(redis); + const eventProcessorManager = new EventProcessorManager({ + postgres, + logger, + webhooksQueuesService, + }); config.enabledWebhooks.forEach((name) => { switch (name) { // add more webhook types here @@ -42,6 +46,7 @@ export function WebhookFactory(config: Config, deps: Dependencies) { new DepositStatusProcessor( { postgres, + logger, notify: notifier.notify, }, WebhookTypes.DepositStatus, @@ -54,6 +59,14 @@ export function WebhookFactory(config: Config, deps: Dependencies) { } } }); + if (config.enabledWebhookRequestWorkers) { + new WebhookRequestWorker( + redis, + postgres, + logger, + eventProcessorManager.write, + ); + } const router = WebhookRouter({ eventProcessorManager }); return { write: eventProcessorManager.write, diff --git a/packages/webhooks/src/notifier.ts b/packages/webhooks/src/notifier.ts index 810ed2b..7c89666 100644 --- a/packages/webhooks/src/notifier.ts +++ b/packages/webhooks/src/notifier.ts @@ -10,7 +10,9 @@ export type Dependencies = { export class BaseNotifier { private logger: Logger; - constructor(private deps: Dependencies) {} + constructor(private deps: Dependencies) { + this.logger = deps.logger; + } public notify = (payload: NotificationPayload): void => { this.deps.notify(payload).catch((error) => { diff --git a/packages/webhooks/src/router.ts b/packages/webhooks/src/router.ts index 60f677c..b7d7a2d 100644 --- a/packages/webhooks/src/router.ts +++ b/packages/webhooks/src/router.ts @@ -27,17 +27,20 @@ export function WebhookRouter(deps: Dependencies): express.Router { router.post( "/webhook", async ( - req: express.Request & { token?: string }, + req: express.Request, res: express.Response, next: express.NextFunction, ) => { try { const parsedBody = RegistrationParams.create(req.body); + if (!req.token) { + throw new Error("API Key required"); + } const id = uuidv4(); await deps.eventProcessorManager.registerWebhook( id, parsedBody, - req.token, + req.token as string, ); res.status(201).send(id); } catch (error) { diff --git a/packages/webhooks/src/types.ts b/packages/webhooks/src/types.ts index dedc0a5..cfb74bb 100644 --- a/packages/webhooks/src/types.ts +++ b/packages/webhooks/src/types.ts @@ -6,7 +6,7 @@ export interface IEventProcessor { id: string, url: string, params: JSONValue, - clientId?: string, + clientId: number, ): Promise; unregister(id: string): Promise; } @@ -22,6 +22,7 @@ export type JSONValue = export type NotificationPayload = { url: string; data: JSONValue; + apiKey: string; }; export const RegistrationParams = ss.object({ diff --git a/packages/webhooks/src/utils.ts b/packages/webhooks/src/utils.ts index dd6a0ca..2bde3ed 100644 --- a/packages/webhooks/src/utils.ts +++ b/packages/webhooks/src/utils.ts @@ -1,10 +1,11 @@ import { NotificationPayload } from "./types"; export async function post(params: NotificationPayload): Promise { - const { url, data } = params; + const { url, data, apiKey } = params; const response = await fetch(url, { method: "POST", headers: { "Content-Type": "application/json", + Authorization: `Bearer ${apiKey}`, }, body: JSON.stringify(data), }); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index c9d722f..04ef2f9 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -457,12 +457,18 @@ importers: '@repo/indexer-database': specifier: workspace:* version: link:../indexer-database + bullmq: + specifier: ^5.12.12 + version: 5.12.14 express: specifier: ^4.19.2 version: 4.21.1 express-bearer-token: specifier: ^3.0.0 version: 3.0.0 + ioredis: + specifier: ^5.4.1 + version: 5.4.1 redis: specifier: ^4.7.0 version: 4.7.0