-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: migrate SpokePoolIndexer to new Indexer #64
Changes from 10 commits
6fe522d
e16254a
4aceea4
90795c5
44b2e1e
4788c6f
7a84ff2
2661bb6
c0a937b
6f74a82
9a6ca81
0a12b96
ed1db3e
c49c632
5f2e92f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
import { MigrationInterface, QueryRunner } from "typeorm"; | ||
|
||
export class SpokePoolFinalised1728296909794 implements MigrationInterface { | ||
name = "SpokePoolFinalised1728296909794"; | ||
|
||
public async up(queryRunner: QueryRunner): Promise<void> { | ||
await queryRunner.query( | ||
`ALTER TABLE "evm"."v3_funds_deposited" ADD "finalised" boolean NOT NULL`, | ||
); | ||
await queryRunner.query( | ||
`ALTER TABLE "evm"."filled_v3_relay" ADD "finalised" boolean NOT NULL`, | ||
); | ||
await queryRunner.query( | ||
`ALTER TABLE "evm"."requested_v3_slow_fill" ADD "finalised" boolean NOT NULL`, | ||
); | ||
await queryRunner.query( | ||
`ALTER TABLE "evm"."requested_speed_up_v3_deposit" ADD "finalised" boolean NOT NULL`, | ||
); | ||
await queryRunner.query( | ||
`ALTER TABLE "evm"."relayed_root_bundle" ADD "finalised" boolean NOT NULL`, | ||
); | ||
await queryRunner.query( | ||
`ALTER TABLE "evm"."executed_relayer_refund_root" ADD "finalised" boolean NOT NULL`, | ||
); | ||
await queryRunner.query( | ||
`ALTER TABLE "evm"."tokens_bridged" ADD "finalised" boolean NOT NULL`, | ||
); | ||
} | ||
|
||
public async down(queryRunner: QueryRunner): Promise<void> { | ||
await queryRunner.query( | ||
`ALTER TABLE "evm"."tokens_bridged" DROP COLUMN "finalised"`, | ||
); | ||
await queryRunner.query( | ||
`ALTER TABLE "evm"."executed_relayer_refund_root" DROP COLUMN "finalised"`, | ||
); | ||
await queryRunner.query( | ||
`ALTER TABLE "evm"."relayed_root_bundle" DROP COLUMN "finalised"`, | ||
); | ||
await queryRunner.query( | ||
`ALTER TABLE "evm"."requested_speed_up_v3_deposit" DROP COLUMN "finalised"`, | ||
); | ||
await queryRunner.query( | ||
`ALTER TABLE "evm"."requested_v3_slow_fill" DROP COLUMN "finalised"`, | ||
); | ||
await queryRunner.query( | ||
`ALTER TABLE "evm"."filled_v3_relay" DROP COLUMN "finalised"`, | ||
); | ||
await queryRunner.query( | ||
`ALTER TABLE "evm"."v3_funds_deposited" DROP COLUMN "finalised"`, | ||
); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,10 +7,9 @@ export class SpokePoolRepository extends utils.BaseRepository { | |
constructor( | ||
postgres: DataSource, | ||
logger: winston.Logger, | ||
throwError: boolean, | ||
private chunkSize = 2000, | ||
) { | ||
super(postgres, logger, throwError); | ||
super(postgres, logger, true); | ||
} | ||
|
||
private formatRelayData( | ||
|
@@ -34,28 +33,34 @@ export class SpokePoolRepository extends utils.BaseRepository { | |
v3FundsDepositedEvents: (across.interfaces.DepositWithBlock & { | ||
integratorId: string | undefined; | ||
})[], | ||
throwError?: boolean, | ||
lastFinalisedBlock: number, | ||
) { | ||
const formattedEvents = v3FundsDepositedEvents.map((event) => { | ||
return { | ||
...event, | ||
relayHash: getRelayHashFromEvent(event), | ||
...this.formatRelayData(event), | ||
quoteTimestamp: new Date(event.quoteTimestamp * 1000), | ||
finalised: event.blockNumber <= lastFinalisedBlock, | ||
}; | ||
}); | ||
const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize); | ||
const savedEvents = await Promise.all( | ||
chunkedEvents.map((eventsChunk) => | ||
this.insert(entities.V3FundsDeposited, eventsChunk, throwError), | ||
this.insertWithFinalisationCheck( | ||
entities.V3FundsDeposited, | ||
eventsChunk, | ||
["depositId", "originChainId"], | ||
lastFinalisedBlock, | ||
), | ||
), | ||
); | ||
return savedEvents.flat(); | ||
} | ||
|
||
public async formatAndSaveFilledV3RelayEvents( | ||
filledV3RelayEvents: across.interfaces.FillWithBlock[], | ||
throwError?: boolean, | ||
lastFinalisedBlock: number, | ||
) { | ||
const formattedEvents = filledV3RelayEvents.map((event) => { | ||
return { | ||
|
@@ -67,33 +72,47 @@ export class SpokePoolRepository extends utils.BaseRepository { | |
updatedOutputAmount: | ||
event.relayExecutionInfo.updatedOutputAmount.toString(), | ||
}, | ||
finalised: event.blockNumber <= lastFinalisedBlock, | ||
}; | ||
}); | ||
const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize); | ||
const savedEvents = await Promise.all( | ||
chunkedEvents.map((eventsChunk) => | ||
this.insert(entities.FilledV3Relay, eventsChunk, throwError), | ||
this.insertWithFinalisationCheck( | ||
entities.FilledV3Relay, | ||
eventsChunk, | ||
["depositId", "originChainId"], | ||
lastFinalisedBlock, | ||
), | ||
), | ||
); | ||
return savedEvents.flat(); | ||
} | ||
|
||
public async formatAndSaveRequestedV3SlowFillEvents( | ||
requestedV3SlowFillEvents: across.interfaces.SlowFillRequestWithBlock[], | ||
throwError?: boolean, | ||
lastFinalisedBlock: number, | ||
) { | ||
const formattedEvents = requestedV3SlowFillEvents.map((event) => { | ||
return { | ||
...event, | ||
relayHash: getRelayHashFromEvent(event), | ||
...this.formatRelayData(event), | ||
finalised: event.blockNumber <= lastFinalisedBlock, | ||
}; | ||
}); | ||
return this.insert( | ||
entities.RequestedV3SlowFill, | ||
formattedEvents, | ||
throwError, | ||
const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize); | ||
const savedEvents = await Promise.all( | ||
chunkedEvents.map((eventsChunk) => | ||
this.insertWithFinalisationCheck( | ||
entities.RequestedV3SlowFill, | ||
eventsChunk, | ||
["depositId", "originChainId"], | ||
lastFinalisedBlock, | ||
), | ||
), | ||
); | ||
return savedEvents.flat(); | ||
} | ||
|
||
public async formatAndSaveRequestedSpeedUpV3Events( | ||
|
@@ -102,7 +121,7 @@ export class SpokePoolRepository extends utils.BaseRepository { | |
[depositId: number]: across.interfaces.SpeedUpWithBlock[]; | ||
}; | ||
}, | ||
throwError?: boolean, | ||
lastFinalisedBlock: number, | ||
) { | ||
const formattedEvents = Object.values(requestedSpeedUpV3Events).flatMap( | ||
(eventsByDepositId) => | ||
|
@@ -111,56 +130,100 @@ export class SpokePoolRepository extends utils.BaseRepository { | |
return { | ||
...event, | ||
updatedOutputAmount: event.updatedOutputAmount.toString(), | ||
finalised: event.blockNumber <= lastFinalisedBlock, | ||
}; | ||
}), | ||
), | ||
); | ||
await this.insert( | ||
entities.RequestedSpeedUpV3Deposit, | ||
formattedEvents, | ||
throwError, | ||
const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize); | ||
const savedEvents = await Promise.all( | ||
chunkedEvents.map((eventsChunk) => | ||
this.insertWithFinalisationCheck( | ||
entities.RequestedSpeedUpV3Deposit, | ||
eventsChunk, | ||
["depositId", "originChainId"], | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we add updatedOutputAmount, updatedMessage and updatedRecipient? Taking into account there could be more than one speedUp event for a single deposit. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I added the transaction hash as the 3rd component of the unique key 👍 |
||
lastFinalisedBlock, | ||
), | ||
), | ||
); | ||
return savedEvents.flat(); | ||
} | ||
|
||
public async formatAndSaveRelayedRootBundleEvents( | ||
relayedRootBundleEvents: across.interfaces.RootBundleRelayWithBlock[], | ||
chainId: number, | ||
throwError?: boolean, | ||
lastFinalisedBlock: number, | ||
) { | ||
const formattedEvents = relayedRootBundleEvents.map((event) => { | ||
return { ...event, chainId }; | ||
return { | ||
...event, | ||
chainId, | ||
finalised: event.blockNumber <= lastFinalisedBlock, | ||
}; | ||
}); | ||
await this.insert(entities.RelayedRootBundle, formattedEvents, throwError); | ||
|
||
const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize); | ||
const savedEvents = await Promise.all( | ||
chunkedEvents.map((eventsChunk) => | ||
this.insertWithFinalisationCheck( | ||
entities.RelayedRootBundle, | ||
eventsChunk, | ||
["chainId", "rootBundleId"], | ||
lastFinalisedBlock, | ||
), | ||
), | ||
); | ||
return savedEvents.flat(); | ||
} | ||
|
||
public async formatAndSaveExecutedRelayerRefundRootEvents( | ||
executedRelayerRefundRootEvents: across.interfaces.RelayerRefundExecutionWithBlock[], | ||
throwError?: boolean, | ||
lastFinalisedBlock: number, | ||
) { | ||
const formattedEvents = executedRelayerRefundRootEvents.map((event) => { | ||
return { | ||
...event, | ||
amountToReturn: event.amountToReturn.toString(), | ||
refundAmounts: event.refundAmounts.map((amount) => amount.toString()), | ||
finalised: event.blockNumber <= lastFinalisedBlock, | ||
}; | ||
}); | ||
return this.insert( | ||
entities.ExecutedRelayerRefundRoot, | ||
formattedEvents, | ||
throwError, | ||
const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize); | ||
const savedEvents = await Promise.all( | ||
chunkedEvents.map((eventsChunk) => | ||
this.insertWithFinalisationCheck( | ||
entities.ExecutedRelayerRefundRoot, | ||
eventsChunk, | ||
["chainId", "rootBundleId", "leafId"], | ||
lastFinalisedBlock, | ||
), | ||
), | ||
); | ||
return savedEvents.flat(); | ||
} | ||
|
||
public async formatAndSaveTokensBridgedEvents( | ||
tokensBridgedEvents: across.interfaces.TokensBridged[], | ||
throwError?: boolean, | ||
lastFinalisedBlock: number, | ||
) { | ||
const formattedEvents = tokensBridgedEvents.map((event) => { | ||
return { | ||
...event, | ||
amountToReturn: event.amountToReturn.toString(), | ||
finalised: event.blockNumber <= lastFinalisedBlock, | ||
}; | ||
}); | ||
await this.insert(entities.TokensBridged, formattedEvents, throwError); | ||
const chunkedEvents = across.utils.chunk(formattedEvents, this.chunkSize); | ||
const savedEvents = await Promise.all( | ||
chunkedEvents.map((eventsChunk) => | ||
this.insertWithFinalisationCheck( | ||
entities.TokensBridged, | ||
eventsChunk, | ||
["chainId", "leafId", "l2TokenAddress", "transactionHash"], | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Good observation. and yes, the delete unfinalised query should handle it |
||
lastFinalisedBlock, | ||
), | ||
), | ||
); | ||
return savedEvents.flat(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this could override invalid fills that are stored in the database, should we use another set of keys? Maybe the
relayHash
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍