Skip to content

Commit

Permalink
feat: integrate webhooks in the indexer
Browse files Browse the repository at this point in the history
  • Loading branch information
amateima committed Nov 22, 2024
1 parent e543d4e commit 89cb8c1
Show file tree
Hide file tree
Showing 24 changed files with 451 additions and 106 deletions.
4 changes: 2 additions & 2 deletions packages/indexer-api/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ export async function Main(
const redis = await initializeRedis(redisConfig, logger);
const webhooks = Webhooks.WebhookFactory(
{
requireApiKey: false,
enabledWebhooks: [Webhooks.WebhookTypes.DepositStatus],
enabledWebhookRequestWorkers: false,
},
{ postgres, logger },
{ postgres, logger, redis },
);

const allRouters: Record<string, Router> = {
Expand Down
11 changes: 6 additions & 5 deletions packages/indexer-database/src/entities/WebhookClient.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import { Entity, PrimaryColumn, Column, PrimaryGeneratedColumn } from "typeorm";
import { Entity, Column, PrimaryGeneratedColumn, Unique } from "typeorm";

@Entity()
@Unique("UK_webhook_client_api_key", ["apiKey"])
export class WebhookClient {
@PrimaryGeneratedColumn()
id: number;

@Column()
name: string;

@PrimaryGeneratedColumn()
id: string;

@Column({ unique: true })
@Column()
apiKey: string;

@Column("jsonb")
Expand Down
18 changes: 15 additions & 3 deletions packages/indexer-database/src/entities/WebhookRequest.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,28 @@
import { Entity, PrimaryColumn, Column } from "typeorm";
import {
Entity,
PrimaryColumn,
Column,
Unique,
CreateDateColumn,
Index,
} from "typeorm";

@Entity()
@Unique("UK_webhook_request_clientId_filter", ["clientId", "filter"])
@Index("IX_webhook_request_filter", ["filter"])
export class WebhookRequest {
@PrimaryColumn()
id: string;

@Column({ type: "integer" })
clientId: number;

@Column()
url: string;

@Column()
filter: string;

@Column({ type: "text", nullable: true, default: undefined })
clientId?: string | undefined;
@CreateDateColumn()
createdAt: Date;
}
3 changes: 3 additions & 0 deletions packages/indexer-database/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ export const createDataSource = (config: DatabaseConfig): DataSource => {
entities.RootBundleExecutedJoinTable,
// Others
entities.RelayHashInfo,
// Webhooks
entities.WebhookRequest,
entities.WebhookClient,
],
migrationsTableName: "_migrations",
migrations: ["migrations/*.ts"],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import { MigrationInterface, QueryRunner } from "typeorm";

export class WebhookClient1732297474910 implements MigrationInterface {
name = "WebhookClient1732297474910";

public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`
CREATE TABLE "webhook_client" (
"id" SERIAL NOT NULL,
"name" character varying NOT NULL,
"apiKey" character varying NOT NULL,
"domains" jsonb NOT NULL,
CONSTRAINT "UK_webhook_client_api_key" UNIQUE ("apiKey"),
CONSTRAINT "PK_f7330fb3bdb2e19534eae691d44" PRIMARY KEY ("id")
)`);
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`DROP TABLE "webhook_client"`);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { MigrationInterface, QueryRunner } from "typeorm";

export class WebhookRequest1732297948190 implements MigrationInterface {
name = "WebhookRequest1732297948190";

public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`
CREATE TABLE "webhook_request" (
"id" character varying NOT NULL,
"clientId" integer NOT NULL,
"url" character varying NOT NULL,
"filter" character varying NOT NULL,
"createdAt" TIMESTAMP NOT NULL DEFAULT now(),
CONSTRAINT "UK_webhook_request_clientId_filter" UNIQUE ("clientId", "filter"),
CONSTRAINT "PK_67a7784045de2d1b7139b611b93" PRIMARY KEY ("id")
)`);
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`DROP TABLE "webhook_request"`);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import { MigrationInterface, QueryRunner } from "typeorm";

export class WebhookRequest1732310112989 implements MigrationInterface {
name = "WebhookRequest1732310112989";

public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(
`CREATE INDEX "IX_webhook_request_filter" ON "webhook_request" ("filter") `,
);
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`DROP INDEX "public"."IX_webhook_request_filter"`);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Logger } from "winston";

import { DataSource } from "@repo/indexer-database";
import { eventProcessorManager } from "@repo/webhooks";

import { Config } from "../../parseEnv";
import { HubPoolRepository } from "../../database/HubPoolRepository";
Expand Down Expand Up @@ -39,6 +40,7 @@ export class AcrossIndexerManager {
private spokePoolRepository: SpokePoolRepository,
private redisCache: RedisCache,
private indexerQueuesService: IndexerQueuesService,
private webhookWriteFn?: eventProcessorManager.WebhookWriteFn,
) {}

public async start() {
Expand Down Expand Up @@ -93,7 +95,12 @@ export class AcrossIndexerManager {
this.hubPoolClientFactory,
this.spokePoolClientFactory,
this.spokePoolRepository,
new SpokePoolProcessor(this.postgres, this.logger, chainId),
new SpokePoolProcessor(
this.postgres,
this.logger,
chainId,
this.webhookWriteFn,
),
this.indexerQueuesService,
);
const spokePoolIndexer = new Indexer(
Expand Down
5 changes: 3 additions & 2 deletions packages/indexer/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) {
// Call write to kick off webhook calls
const { write } = WebhookFactory(
{
requireApiKey: false,
enabledWebhooks: [WebhookTypes.DepositStatus],
enabledWebhookRequestWorkers: true,
},
{ postgres, logger },
{ postgres, logger, redis },
);
// Retry providers factory
const retryProvidersFactory = new RetryProvidersFactory(
Expand Down Expand Up @@ -96,6 +96,7 @@ export async function Main(config: parseEnv.Config, logger: winston.Logger) {
new SpokePoolRepository(postgres, logger),
redisCache,
indexerQueuesService,
write,
);
const bundleServicesManager = new BundleServicesManager(
config,
Expand Down
65 changes: 56 additions & 9 deletions packages/indexer/src/services/spokePoolProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import { utils } from "@across-protocol/sdk";
import winston from "winston";

import {
DataSource,
entities,
utils as dbUtils,
SaveQueryResultType,
} from "@repo/indexer-database";
import winston from "winston";
import { WebhookTypes, eventProcessorManager } from "@repo/webhooks";

import { RelayStatus } from "../../../indexer-database/dist/src/entities";
import { StoreEventsResult } from "../data-indexing/service/SpokePoolIndexerDataHandler";

Expand All @@ -22,6 +25,7 @@ export class SpokePoolProcessor {
private readonly postgres: DataSource,
private readonly logger: winston.Logger,
private readonly chainId: number,
private readonly webhookWriteFn?: eventProcessorManager.WebhookWriteFn,
) {}

public async process(events: StoreEventsResult) {
Expand All @@ -37,9 +41,19 @@ export class SpokePoolProcessor {
SpokePoolEvents.V3FundsDeposited,
[...newDeposits, ...updatedDeposits],
);
// TODO: for new deposits, notify status change to unfilled
// here...

// Notify webhook of new deposits
newDeposits.forEach((deposit) => {
this.webhookWriteFn?.({
type: WebhookTypes.DepositStatus,
event: {
depositId: deposit.id,
originChainId: deposit.originChainId,
depositTxHash: deposit.transactionHash,
status: RelayStatus.Unfilled,
},
});
});
const newSlowFillRequests = dbUtils.filterSaveQueryResults(
events.slowFillRequests,
SaveQueryResultType.Inserted,
Expand All @@ -52,8 +66,19 @@ export class SpokePoolProcessor {
SpokePoolEvents.RequestedV3SlowFill,
[...newSlowFillRequests, ...updatedSlowFillRequests],
);
// TODO: for new slow fill requests, notify status change to slow fill requested
// here...

// Notify webhook of new slow fill requests
newSlowFillRequests.forEach((deposit) => {
this.webhookWriteFn?.({
type: WebhookTypes.DepositStatus,
event: {
depositId: deposit.id,
originChainId: deposit.originChainId,
depositTxHash: deposit.transactionHash,
status: RelayStatus.SlowFillRequested,
},
});
});

const newFills = dbUtils.filterSaveQueryResults(
events.fills,
Expand All @@ -67,16 +92,38 @@ export class SpokePoolProcessor {
...newFills,
...updatedFills,
]);
// TODO: for new fills, notify status change to filled
// here...

// Notify webhook of new fills
newFills.forEach((fill) => {
this.webhookWriteFn?.({
type: WebhookTypes.DepositStatus,
event: {
depositId: fill.depositId,
originChainId: fill.originChainId,
depositTxHash: fill.transactionHash,
status: RelayStatus.Filled,
},
});
});

const expiredDeposits = await this.updateExpiredRelays();
// TODO: for expired deposits, notify status change to expired
// here...

const refundedDeposits = await this.updateRefundedDepositsStatus();
// TODO: for refunded deposits, notify status change to refunded
// here...

// Notify webhook of refunded deposits
refundedDeposits.forEach((deposit) => {
this.webhookWriteFn?.({
type: WebhookTypes.DepositStatus,
event: {
depositId: deposit.depositId,
originChainId: deposit.originChainId,
depositTxHash: deposit.depositTxHash,
status: RelayStatus.Refunded,
},
});
});
}

/**
Expand Down
5 changes: 2 additions & 3 deletions packages/webhooks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ The `factory.ts` file provides a `WebhookFactory` function that sets up the webh
To use the `WebhookFactory`, you need to provide a configuration object and dependencies:

- **Config**: This object should include:
- requireApiKey: boolean; - Should registration of new webhooks require an api key
- enabledWebhooks: WebhookTypes[]; - What event processors should be enabled, like 'DepositStatus'

- **Dependencies**: This object should include:
Expand All @@ -27,8 +26,8 @@ import { DataSource } from "@repo/indexer-database";

const webhooks = WebhookFactory(
{
requireApiKey: false,
enableWebhooks: [WebhookTypes.DepositStatus],
enabledWebhooks: [WebhookTypes.DepositStatus],
enabledWebhookRequestWorkers: false,
},
{ postgres, logger },
);
Expand Down
2 changes: 2 additions & 0 deletions packages/webhooks/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
"license": "ISC",
"dependencies": {
"@repo/indexer-database": "workspace:*",
"bullmq": "^5.12.12",
"express": "^4.19.2",
"express-bearer-token": "^3.0.0",
"ioredis": "^5.4.1",
"redis": "^4.7.0",
"superstruct": "2.0.3-1",
"uuid": "^11.0.3"
Expand Down
Loading

0 comments on commit 89cb8c1

Please sign in to comment.