Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: migrate SpokePoolIndexer to new Indexer #64

2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ services:
container_name: redis_cache
volumes:
- indexer-redis-volume:/data
ports:
- 6379:6379

# Indexer Service
indexer-scraper:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ export class ExecutedRelayerRefundRoot {
@Column()
blockNumber: number;

@Column()
finalised: boolean;

@CreateDateColumn()
createdAt: Date;
}
3 changes: 3 additions & 0 deletions packages/indexer-database/src/entities/evm/FilledV3Relay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ export class FilledV3Relay {
@Column()
blockNumber: number;

@Column()
finalised: boolean;

@CreateDateColumn()
createdAt: Date;
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ export class RelayedRootBundle {
@Column()
blockNumber: number;

@Column()
finalised: boolean;

@CreateDateColumn()
createdAt: Date;
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ export class RequestedSpeedUpV3Deposit {
@Column()
logIndex: number;

@Column()
finalised: boolean;

@Column()
blockNumber: number;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ export class RequestedV3SlowFill {
@Column()
blockNumber: number;

@Column()
finalised: boolean;

@CreateDateColumn()
createdAt: Date;
}
3 changes: 3 additions & 0 deletions packages/indexer-database/src/entities/evm/TokensBridged.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ export class TokensBridged {
@Column()
blockNumber: number;

@Column()
finalised: boolean;

@CreateDateColumn()
createdAt: Date;
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ export class V3FundsDeposited {
@Column()
blockNumber: number;

@Column()
finalised: boolean;

@CreateDateColumn()
createdAt: Date;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import { MigrationInterface, QueryRunner } from "typeorm";

export class SpokePoolFinalised1728296909794 implements MigrationInterface {
name = "SpokePoolFinalised1728296909794";

public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(
`ALTER TABLE "evm"."v3_funds_deposited" ADD "finalised" boolean NOT NULL`,
);
await queryRunner.query(
`ALTER TABLE "evm"."filled_v3_relay" ADD "finalised" boolean NOT NULL`,
);
await queryRunner.query(
`ALTER TABLE "evm"."requested_v3_slow_fill" ADD "finalised" boolean NOT NULL`,
);
await queryRunner.query(
`ALTER TABLE "evm"."requested_speed_up_v3_deposit" ADD "finalised" boolean NOT NULL`,
);
await queryRunner.query(
`ALTER TABLE "evm"."relayed_root_bundle" ADD "finalised" boolean NOT NULL`,
);
await queryRunner.query(
`ALTER TABLE "evm"."executed_relayer_refund_root" ADD "finalised" boolean NOT NULL`,
);
await queryRunner.query(
`ALTER TABLE "evm"."tokens_bridged" ADD "finalised" boolean NOT NULL`,
);
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(
`ALTER TABLE "evm"."tokens_bridged" DROP COLUMN "finalised"`,
);
await queryRunner.query(
`ALTER TABLE "evm"."executed_relayer_refund_root" DROP COLUMN "finalised"`,
);
await queryRunner.query(
`ALTER TABLE "evm"."relayed_root_bundle" DROP COLUMN "finalised"`,
);
await queryRunner.query(
`ALTER TABLE "evm"."requested_speed_up_v3_deposit" DROP COLUMN "finalised"`,
);
await queryRunner.query(
`ALTER TABLE "evm"."requested_v3_slow_fill" DROP COLUMN "finalised"`,
);
await queryRunner.query(
`ALTER TABLE "evm"."filled_v3_relay" DROP COLUMN "finalised"`,
);
await queryRunner.query(
`ALTER TABLE "evm"."v3_funds_deposited" DROP COLUMN "finalised"`,
);
}
}
26 changes: 26 additions & 0 deletions packages/indexer-database/src/utils/BaseRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,30 @@ export class BaseRepository {
}
}
}

protected async insertWithFinalisationCheck<Entity extends ObjectLiteral>(
entity: EntityTarget<Entity>,
data: Partial<Entity>[],
uniqueKeys: (keyof Entity)[],
lastFinalisedBlock: number,
) {
const repository = this.postgres.getRepository(entity);
const uniqueKeysAsStrings = uniqueKeys.map((key) => key.toString());

const savedData = await repository
.createQueryBuilder()
.insert()
.values(data)
.orUpdate(Object.keys(data[0] as any), uniqueKeysAsStrings)
.returning("*")
.execute();
await repository
.createQueryBuilder()
.delete()
.where("finalised = false")
.andWhere("blockNumber <= :lastFinalisedBlock", { lastFinalisedBlock })
.execute();

return savedData.generatedMaps as Entity[];
}
}
115 changes: 89 additions & 26 deletions packages/indexer/src/database/SpokePoolRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ export class SpokePoolRepository extends utils.BaseRepository {
constructor(
postgres: DataSource,
logger: winston.Logger,
throwError: boolean,
private chunkSize = 2000,
) {
super(postgres, logger, throwError);
super(postgres, logger, true);
}

private formatRelayData(
Expand All @@ -34,28 +33,34 @@ export class SpokePoolRepository extends utils.BaseRepository {
v3FundsDepositedEvents: (across.interfaces.DepositWithBlock & {
integratorId: string | undefined;
})[],
throwError?: boolean,
lastFinalisedBlock: number,
) {
const formattedEvents = v3FundsDepositedEvents.map((event) => {
return {
...event,
relayHash: getRelayHashFromEvent(event),
...this.formatRelayData(event),
quoteTimestamp: new Date(event.quoteTimestamp * 1000),
finalised: event.blockNumber <= lastFinalisedBlock,
};
});
const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize);
const savedEvents = await Promise.all(
chunkedEvents.map((eventsChunk) =>
this.insert(entities.V3FundsDeposited, eventsChunk, throwError),
this.insertWithFinalisationCheck(
entities.V3FundsDeposited,
eventsChunk,
["depositId", "originChainId"],
lastFinalisedBlock,
),
),
);
return savedEvents.flat();
}

public async formatAndSaveFilledV3RelayEvents(
filledV3RelayEvents: across.interfaces.FillWithBlock[],
throwError?: boolean,
lastFinalisedBlock: number,
) {
const formattedEvents = filledV3RelayEvents.map((event) => {
return {
Expand All @@ -67,33 +72,47 @@ export class SpokePoolRepository extends utils.BaseRepository {
updatedOutputAmount:
event.relayExecutionInfo.updatedOutputAmount.toString(),
},
finalised: event.blockNumber <= lastFinalisedBlock,
};
});
const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize);
const savedEvents = await Promise.all(
chunkedEvents.map((eventsChunk) =>
this.insert(entities.FilledV3Relay, eventsChunk, throwError),
this.insertWithFinalisationCheck(
entities.FilledV3Relay,
eventsChunk,
["depositId", "originChainId"],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this could override invalid fills that are stored in the database, should we use another set of keys? Maybe the relayHash

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

lastFinalisedBlock,
),
),
);
return savedEvents.flat();
}

public async formatAndSaveRequestedV3SlowFillEvents(
requestedV3SlowFillEvents: across.interfaces.SlowFillRequestWithBlock[],
throwError?: boolean,
lastFinalisedBlock: number,
) {
const formattedEvents = requestedV3SlowFillEvents.map((event) => {
return {
...event,
relayHash: getRelayHashFromEvent(event),
...this.formatRelayData(event),
finalised: event.blockNumber <= lastFinalisedBlock,
};
});
return this.insert(
entities.RequestedV3SlowFill,
formattedEvents,
throwError,
const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize);
const savedEvents = await Promise.all(
chunkedEvents.map((eventsChunk) =>
this.insertWithFinalisationCheck(
entities.RequestedV3SlowFill,
eventsChunk,
["depositId", "originChainId"],
lastFinalisedBlock,
),
),
);
return savedEvents.flat();
}

public async formatAndSaveRequestedSpeedUpV3Events(
Expand All @@ -102,7 +121,7 @@ export class SpokePoolRepository extends utils.BaseRepository {
[depositId: number]: across.interfaces.SpeedUpWithBlock[];
};
},
throwError?: boolean,
lastFinalisedBlock: number,
) {
const formattedEvents = Object.values(requestedSpeedUpV3Events).flatMap(
(eventsByDepositId) =>
Expand All @@ -111,56 +130,100 @@ export class SpokePoolRepository extends utils.BaseRepository {
return {
...event,
updatedOutputAmount: event.updatedOutputAmount.toString(),
finalised: event.blockNumber <= lastFinalisedBlock,
};
}),
),
);
await this.insert(
entities.RequestedSpeedUpV3Deposit,
formattedEvents,
throwError,
const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize);
const savedEvents = await Promise.all(
chunkedEvents.map((eventsChunk) =>
this.insertWithFinalisationCheck(
entities.RequestedSpeedUpV3Deposit,
eventsChunk,
["depositId", "originChainId"],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add updatedOutputAmount, updatedMessage and updatedRecipient? Taking into account there could be more than one speedUp event for a single deposit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the transaction hash as the 3rd component of the unique key 👍

lastFinalisedBlock,
),
),
);
return savedEvents.flat();
}

public async formatAndSaveRelayedRootBundleEvents(
relayedRootBundleEvents: across.interfaces.RootBundleRelayWithBlock[],
chainId: number,
throwError?: boolean,
lastFinalisedBlock: number,
) {
const formattedEvents = relayedRootBundleEvents.map((event) => {
return { ...event, chainId };
return {
...event,
chainId,
finalised: event.blockNumber <= lastFinalisedBlock,
};
});
await this.insert(entities.RelayedRootBundle, formattedEvents, throwError);

const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize);
const savedEvents = await Promise.all(
chunkedEvents.map((eventsChunk) =>
this.insertWithFinalisationCheck(
entities.RelayedRootBundle,
eventsChunk,
["chainId", "rootBundleId"],
lastFinalisedBlock,
),
),
);
return savedEvents.flat();
}

public async formatAndSaveExecutedRelayerRefundRootEvents(
executedRelayerRefundRootEvents: across.interfaces.RelayerRefundExecutionWithBlock[],
throwError?: boolean,
lastFinalisedBlock: number,
) {
const formattedEvents = executedRelayerRefundRootEvents.map((event) => {
return {
...event,
amountToReturn: event.amountToReturn.toString(),
refundAmounts: event.refundAmounts.map((amount) => amount.toString()),
finalised: event.blockNumber <= lastFinalisedBlock,
};
});
return this.insert(
entities.ExecutedRelayerRefundRoot,
formattedEvents,
throwError,
const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize);
const savedEvents = await Promise.all(
chunkedEvents.map((eventsChunk) =>
this.insertWithFinalisationCheck(
entities.ExecutedRelayerRefundRoot,
eventsChunk,
["chainId", "rootBundleId", "leafId"],
lastFinalisedBlock,
),
),
);
return savedEvents.flat();
}

public async formatAndSaveTokensBridgedEvents(
tokensBridgedEvents: across.interfaces.TokensBridged[],
throwError?: boolean,
lastFinalisedBlock: number,
) {
const formattedEvents = tokensBridgedEvents.map((event) => {
return {
...event,
amountToReturn: event.amountToReturn.toString(),
finalised: event.blockNumber <= lastFinalisedBlock,
};
});
await this.insert(entities.TokensBridged, formattedEvents, throwError);
const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize);
const savedEvents = await Promise.all(
chunkedEvents.map((eventsChunk) =>
this.insertWithFinalisationCheck(
entities.TokensBridged,
eventsChunk,
["chainId", "leafId", "l2TokenAddress", "transactionHash"],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If transactionHash changes due to the reorg, I'm thinking we might end having both the first event we saw and the finalised one in the database. But the delete unfinalised query should handle this case, right?

Copy link
Contributor Author

@amateima amateima Oct 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But the delete unfinalised query should handle this case, right?

Good observation. and yes, the delete unfinalised query should handle it

lastFinalisedBlock,
),
),
);
return savedEvents.flat();
}
}
Loading