From 0f8efed57597fc5f0e689dc055cd5e660e756e10 Mon Sep 17 00:00:00 2001 From: vincentwschau <99756290+vincentwschau@users.noreply.github.com> Date: Thu, 17 Oct 2024 17:53:50 -0400 Subject: [PATCH] Add config var to exclude specific stateful order ids from being processed. (#2513) --- ...onditional-order-placement-handler.test.ts | 22 ++++++++++ ...onditional-order-triggered-handler.test.ts | 40 +++++++++++++++++++ .../stateful-order-placement-handler.test.ts | 26 ++++++++++++ .../stateful-order-removal-handler.test.ts | 35 ++++++++++++++++ indexer/services/ender/src/config.ts | 8 ++++ .../services/ender/src/lib/block-processor.ts | 20 +++++++++- indexer/services/ender/src/lib/types.ts | 2 + .../dydx_block_processor_ordered_handlers.sql | 2 + ...ydx_block_processor_unordered_handlers.sql | 2 + .../validators/stateful-order-validator.ts | 38 ++++++++++++++++++ .../ender/src/validators/validator.ts | 9 +++++ 11 files changed, 202 insertions(+), 2 deletions(-) diff --git a/indexer/services/ender/__tests__/handlers/stateful-order/conditional-order-placement-handler.test.ts b/indexer/services/ender/__tests__/handlers/stateful-order/conditional-order-placement-handler.test.ts index edc47dfd76..ca5fb27041 100644 --- a/indexer/services/ender/__tests__/handlers/stateful-order/conditional-order-placement-handler.test.ts +++ b/indexer/services/ender/__tests__/handlers/stateful-order/conditional-order-placement-handler.test.ts @@ -44,8 +44,11 @@ import Long from 'long'; import { producer } from '@dydxprotocol-indexer/kafka'; import { ConditionalOrderPlacementHandler } from '../../../src/handlers/stateful-order/conditional-order-placement-handler'; import { createPostgresFunctions } from '../../../src/helpers/postgres/postgres-functions'; +import config from '../../../src/config'; describe('conditionalOrderPlacementHandler', () => { + const prevSkippedOrderUUIDs: string = config.SKIP_STATEFUL_ORDER_UUIDS; + beforeAll(async () => { await dbHelpers.migrate(); await createPostgresFunctions(); @@ -59,6 +62,7 @@ describe('conditionalOrderPlacementHandler', () => { }); afterEach(async () => { + config.SKIP_STATEFUL_ORDER_UUIDS = prevSkippedOrderUUIDs; await dbHelpers.clearData(); jest.clearAllMocks(); }); @@ -226,4 +230,22 @@ describe('conditionalOrderPlacementHandler', () => { order!, ); }); + + it.each([ + ['transaction event', 0], + ['block event', -1], + ])('successfully skips order (as %s)', async ( + _name: string, + transactionIndex: number, + ) => { + config.SKIP_STATEFUL_ORDER_UUIDS = OrderTable.orderIdToUuid(defaultOrder.orderId!); + const kafkaMessage: KafkaMessage = createKafkaMessageFromStatefulOrderEvent( + defaultStatefulOrderEvent, + transactionIndex, + ); + + await onMessage(kafkaMessage); + const order: OrderFromDatabase | undefined = await OrderTable.findById(orderId); + expect(order).toBeUndefined(); + }); }); diff --git a/indexer/services/ender/__tests__/handlers/stateful-order/conditional-order-triggered-handler.test.ts b/indexer/services/ender/__tests__/handlers/stateful-order/conditional-order-triggered-handler.test.ts index cb5ad98721..9e17cbe521 100644 --- a/indexer/services/ender/__tests__/handlers/stateful-order/conditional-order-triggered-handler.test.ts +++ b/indexer/services/ender/__tests__/handlers/stateful-order/conditional-order-triggered-handler.test.ts @@ -38,8 +38,11 @@ import { ORDER_FLAG_CONDITIONAL } from '@dydxprotocol-indexer/v4-proto-parser'; import { ConditionalOrderTriggeredHandler } from '../../../src/handlers/stateful-order/conditional-order-triggered-handler'; import { defaultPerpetualMarket } from '@dydxprotocol-indexer/postgres/build/__tests__/helpers/constants'; import { createPostgresFunctions } from '../../../src/helpers/postgres/postgres-functions'; +import config from '../../../src/config'; describe('conditionalOrderTriggeredHandler', () => { + const prevSkippedOrderUUIDs: string = config.SKIP_STATEFUL_ORDER_UUIDS; + beforeAll(async () => { await dbHelpers.migrate(); await createPostgresFunctions(); @@ -53,6 +56,7 @@ describe('conditionalOrderTriggeredHandler', () => { }); afterEach(async () => { + config.SKIP_STATEFUL_ORDER_UUIDS = prevSkippedOrderUUIDs; await dbHelpers.clearData(); jest.clearAllMocks(); }); @@ -163,4 +167,40 @@ describe('conditionalOrderTriggeredHandler', () => { `Unable to update order status with orderId: ${orderId}`, ); }); + + it.each([ + ['transaction event', 0], + ['block event', -1], + ])('successfully skips order trigger event (as %s)', async ( + _name: string, + transactionIndex: number, + ) => { + config.SKIP_STATEFUL_ORDER_UUIDS = OrderTable.uuid( + testConstants.defaultOrderGoodTilBlockTime.subaccountId, + '0', + testConstants.defaultOrderGoodTilBlockTime.clobPairId, + testConstants.defaultOrderGoodTilBlockTime.orderFlags, + ); + await OrderTable.create({ + ...testConstants.defaultOrderGoodTilBlockTime, + orderFlags: conditionalOrderId.orderFlags.toString(), + status: OrderStatus.UNTRIGGERED, + triggerPrice: '1000', + clientId: '0', + }); + const kafkaMessage: KafkaMessage = createKafkaMessageFromStatefulOrderEvent( + defaultStatefulOrderEvent, + transactionIndex, + ); + + await onMessage(kafkaMessage); + const order: OrderFromDatabase | undefined = await OrderTable.findById(orderId); + + expect(order).toBeDefined(); + expect(order).toEqual(expect.objectContaining({ + status: OrderStatus.OPEN, + updatedAt: defaultDateTime.toISO(), + updatedAtHeight: defaultHeight.toString(), + })); + }); }); diff --git a/indexer/services/ender/__tests__/handlers/stateful-order/stateful-order-placement-handler.test.ts b/indexer/services/ender/__tests__/handlers/stateful-order/stateful-order-placement-handler.test.ts index b65e6fe6f6..daa3a746a1 100644 --- a/indexer/services/ender/__tests__/handlers/stateful-order/stateful-order-placement-handler.test.ts +++ b/indexer/services/ender/__tests__/handlers/stateful-order/stateful-order-placement-handler.test.ts @@ -44,8 +44,11 @@ import { STATEFUL_ORDER_ORDER_FILL_EVENT_TYPE } from '../../../src/constants'; import { producer } from '@dydxprotocol-indexer/kafka'; import { ORDER_FLAG_LONG_TERM } from '@dydxprotocol-indexer/v4-proto-parser'; import { createPostgresFunctions } from '../../../src/helpers/postgres/postgres-functions'; +import config from '../../../src/config'; describe('statefulOrderPlacementHandler', () => { + const prevSkippedOrderUUIDs: string = config.SKIP_STATEFUL_ORDER_UUIDS; + beforeAll(async () => { await dbHelpers.migrate(); await createPostgresFunctions(); @@ -59,6 +62,7 @@ describe('statefulOrderPlacementHandler', () => { }); afterEach(async () => { + config.SKIP_STATEFUL_ORDER_UUIDS = prevSkippedOrderUUIDs; await dbHelpers.clearData(); jest.clearAllMocks(); }); @@ -250,4 +254,26 @@ describe('statefulOrderPlacementHandler', () => { }); // TODO[IND-20]: Add tests for vulcan messages }); + + it.each([ + // TODO(IND-334): Remove after deprecating StatefulOrderPlacementEvent + ['stateful order placement as txn event', defaultStatefulOrderEvent, 0], + ['stateful long term order placement as txn event', defaultStatefulOrderLongTermEvent, 0], + ['stateful order placement as block event', defaultStatefulOrderEvent, -1], + ['stateful long term order placement as block event', defaultStatefulOrderLongTermEvent, -1], + ])('successfully skips order with %s', async ( + _name: string, + statefulOrderEvent: StatefulOrderEventV1, + transactionIndex: number, + ) => { + config.SKIP_STATEFUL_ORDER_UUIDS = OrderTable.orderIdToUuid(defaultOrder.orderId!); + const kafkaMessage: KafkaMessage = createKafkaMessageFromStatefulOrderEvent( + statefulOrderEvent, + transactionIndex, + ); + + await onMessage(kafkaMessage); + const order: OrderFromDatabase | undefined = await OrderTable.findById(orderId); + expect(order).toBeUndefined(); + }); }); diff --git a/indexer/services/ender/__tests__/handlers/stateful-order/stateful-order-removal-handler.test.ts b/indexer/services/ender/__tests__/handlers/stateful-order/stateful-order-removal-handler.test.ts index 565b361057..135f745256 100644 --- a/indexer/services/ender/__tests__/handlers/stateful-order/stateful-order-removal-handler.test.ts +++ b/indexer/services/ender/__tests__/handlers/stateful-order/stateful-order-removal-handler.test.ts @@ -37,8 +37,11 @@ import { StatefulOrderRemovalHandler } from '../../../src/handlers/stateful-orde import { STATEFUL_ORDER_ORDER_FILL_EVENT_TYPE } from '../../../src/constants'; import { producer } from '@dydxprotocol-indexer/kafka'; import { createPostgresFunctions } from '../../../src/helpers/postgres/postgres-functions'; +import config from '../../../src/config'; describe('statefulOrderRemovalHandler', () => { + const prevSkippedOrderUUIDs: string = config.SKIP_STATEFUL_ORDER_UUIDS; + beforeAll(async () => { await dbHelpers.migrate(); await createPostgresFunctions(); @@ -52,6 +55,7 @@ describe('statefulOrderRemovalHandler', () => { }); afterEach(async () => { + config.SKIP_STATEFUL_ORDER_UUIDS = prevSkippedOrderUUIDs; await dbHelpers.clearData(); jest.clearAllMocks(); }); @@ -153,4 +157,35 @@ describe('statefulOrderRemovalHandler', () => { `Unable to update order status with orderId: ${orderId}`, ); }); + + it.each([ + ['transaction event', 0], + ['block event', -1], + ])('successfully skips order removal event (as %s)', async ( + _name: string, + transactionIndex: number, + ) => { + config.SKIP_STATEFUL_ORDER_UUIDS = OrderTable.uuid( + testConstants.defaultOrder.subaccountId, + '0', + testConstants.defaultOrder.clobPairId, + testConstants.defaultOrder.orderFlags, + ); + await OrderTable.create({ + ...testConstants.defaultOrder, + clientId: '0', + }); + const kafkaMessage: KafkaMessage = createKafkaMessageFromStatefulOrderEvent( + defaultStatefulOrderEvent, + transactionIndex, + ); + + await onMessage(kafkaMessage); + const order: OrderFromDatabase | undefined = await OrderTable.findById(orderId); + expect(order).toBeDefined(); + expect(order).toEqual(expect.objectContaining({ + ...testConstants.defaultOrder, + clientId: '0', + })); + }); }); diff --git a/indexer/services/ender/src/config.ts b/indexer/services/ender/src/config.ts index c22122b318..201f7807ee 100644 --- a/indexer/services/ender/src/config.ts +++ b/indexer/services/ender/src/config.ts @@ -6,6 +6,7 @@ import { parseSchema, baseConfigSchema, parseBoolean, + parseString, } from '@dydxprotocol-indexer/base'; import { kafkaConfigSchema, @@ -23,6 +24,13 @@ export const configSchema = { SEND_WEBSOCKET_MESSAGES: parseBoolean({ default: true, }), + // Config var to skip processing stateful order events with specific uuids. + // Order UUIDs should be in a string delimited by commas. + // Only set if invalid order events are being included in a block and preventing ender from + // progressing. + SKIP_STATEFUL_ORDER_UUIDS: parseString({ + default: '', + }), }; export default parseSchema(configSchema); diff --git a/indexer/services/ender/src/lib/block-processor.ts b/indexer/services/ender/src/lib/block-processor.ts index f3ab8ba1e3..6051425b8e 100644 --- a/indexer/services/ender/src/lib/block-processor.ts +++ b/indexer/services/ender/src/lib/block-processor.ts @@ -37,7 +37,7 @@ import { KafkaPublisher } from './kafka-publisher'; import { SyncHandlers, SYNCHRONOUS_SUBTYPES } from './sync-handlers'; import { ConsolidatedKafkaEvent, - DydxIndexerSubtypes, EventMessage, EventProtoWithTypeAndVersion, GroupedEvents, + DydxIndexerSubtypes, EventMessage, EventProtoWithTypeAndVersion, GroupedEvents, SKIPPED_EVENT_SUBTYPE, } from './types'; const TXN_EVENT_SUBTYPE_VERSION_TO_VALIDATOR_MAPPING: Record = { @@ -216,12 +216,28 @@ export class BlockProcessor { ); validator.validate(); this.sqlEventPromises[eventProto.blockEventIndex] = validator.getEventForBlockProcessor(); - const handlers: Handler[] = validator.createHandlers( + let handlers: Handler[] = validator.createHandlers( eventProto.indexerTendermintEvent, this.txId, this.messageReceivedTimestamp, ); + if (validator.shouldExcludeEvent()) { + // If the event should be excluded from being processed, set the subtype to a special value + // for skipped events. + this.block.events[eventProto.blockEventIndex] = { + ...this.block.events[eventProto.blockEventIndex], + subtype: SKIPPED_EVENT_SUBTYPE, + }; + // Set handlers to empty array if event is to be skipped. + handlers = []; + logger.info({ + at: 'onMessage#shouldExcludeEvent', + message: 'Excluded event from processing', + eventProto, + }); + } + _.map(handlers, (handler: Handler) => { if (SYNCHRONOUS_SUBTYPES.includes(eventProto.type as DydxIndexerSubtypes)) { this.syncHandlers.addHandler(eventProto.type, handler); diff --git a/indexer/services/ender/src/lib/types.ts b/indexer/services/ender/src/lib/types.ts index 3150c464fc..b13797bf63 100644 --- a/indexer/services/ender/src/lib/types.ts +++ b/indexer/services/ender/src/lib/types.ts @@ -61,6 +61,8 @@ export enum DydxIndexerSubtypes { UPSERT_VAULT = 'upsert_vault', } +export const SKIPPED_EVENT_SUBTYPE = 'skipped_event'; + // Generic interface used for creating the Handler objects // eslint-disable-next-line @typescript-eslint/no-explicit-any export type EventMessage = any; diff --git a/indexer/services/ender/src/scripts/handlers/dydx_block_processor_ordered_handlers.sql b/indexer/services/ender/src/scripts/handlers/dydx_block_processor_ordered_handlers.sql index 1a6235d850..8d75ea9fe3 100644 --- a/indexer/services/ender/src/scripts/handlers/dydx_block_processor_ordered_handlers.sql +++ b/indexer/services/ender/src/scripts/handlers/dydx_block_processor_ordered_handlers.sql @@ -69,6 +69,8 @@ BEGIN rval[i] = dydx_funding_handler(block_height, block_time, event_data, event_index, transaction_index); WHEN '"upsert_vault"'::jsonb THEN rval[i] = dydx_vault_upsert_handler(block_time, event_data); + WHEN '"skipped_event"'::jsonb THEN + rval[i] = jsonb_build_object(); ELSE NULL; END CASE; diff --git a/indexer/services/ender/src/scripts/handlers/dydx_block_processor_unordered_handlers.sql b/indexer/services/ender/src/scripts/handlers/dydx_block_processor_unordered_handlers.sql index 9c985c79ca..4b82d32b5a 100644 --- a/indexer/services/ender/src/scripts/handlers/dydx_block_processor_unordered_handlers.sql +++ b/indexer/services/ender/src/scripts/handlers/dydx_block_processor_unordered_handlers.sql @@ -67,6 +67,8 @@ BEGIN rval[i] = dydx_trading_rewards_handler(block_height, block_time, event_data, event_index, transaction_index, jsonb_array_element_text(block->'txHashes', transaction_index)); WHEN '"register_affiliate"'::jsonb THEN rval[i] = dydx_register_affiliate_handler(block_height, event_data); + WHEN '"skipped_event"'::jsonb THEN + rval[i] = jsonb_build_object(); ELSE NULL; END CASE; diff --git a/indexer/services/ender/src/validators/stateful-order-validator.ts b/indexer/services/ender/src/validators/stateful-order-validator.ts index 8507fb6ca7..4561c3525b 100644 --- a/indexer/services/ender/src/validators/stateful-order-validator.ts +++ b/indexer/services/ender/src/validators/stateful-order-validator.ts @@ -1,3 +1,4 @@ +import { OrderTable } from '@dydxprotocol-indexer/postgres'; import { ORDER_FLAG_CONDITIONAL, ORDER_FLAG_LONG_TERM } from '@dydxprotocol-indexer/v4-proto-parser'; import { IndexerTendermintEvent, @@ -13,6 +14,7 @@ import { } from '@dydxprotocol-indexer/v4-protos'; import Long from 'long'; +import config from '../config'; import { Handler, HandlerInitializer } from '../handlers/handler'; import { ConditionalOrderPlacementHandler } from '../handlers/stateful-order/conditional-order-placement-handler'; import { ConditionalOrderTriggeredHandler } from '../handlers/stateful-order/conditional-order-triggered-handler'; @@ -233,4 +235,40 @@ export class StatefulOrderValidator extends Validator { return [handler]; } + + /** + * Skip order uuids in config env var. + */ + public shouldExcludeEvent(): boolean { + const orderUUIDsToSkip: string[] = config.SKIP_STATEFUL_ORDER_UUIDS.split(','); + if (orderUUIDsToSkip.length === 0) { + return false; + } + + const orderUUIDStoSkipSet: Set = new Set(orderUUIDsToSkip); + if (orderUUIDStoSkipSet.has(this.getOrderUUId())) { + return true; + } + + return false; + } + + /** + * Gets order uuid for the event being validated. + * Assumes events are valid. + */ + private getOrderUUId(): string { + if (this.event.orderPlace !== undefined) { + return OrderTable.orderIdToUuid(this.event.orderPlace.order!.orderId!); + } else if (this.event.orderRemoval !== undefined) { + return OrderTable.orderIdToUuid(this.event.orderRemoval.removedOrderId!); + } else if (this.event.conditionalOrderPlacement !== undefined) { + return OrderTable.orderIdToUuid(this.event.conditionalOrderPlacement.order!.orderId!); + } else if (this.event.conditionalOrderTriggered !== undefined) { + return OrderTable.orderIdToUuid(this.event.conditionalOrderTriggered.triggeredOrderId!); + } else if (this.event.longTermOrderPlacement !== undefined) { + return OrderTable.orderIdToUuid(this.event.longTermOrderPlacement.order!.orderId!); + } + return ''; + } } diff --git a/indexer/services/ender/src/validators/validator.ts b/indexer/services/ender/src/validators/validator.ts index f9c0a53662..e3235fb96f 100644 --- a/indexer/services/ender/src/validators/validator.ts +++ b/indexer/services/ender/src/validators/validator.ts @@ -58,4 +58,13 @@ export abstract class Validator { txId: number, messageReceivedTimestamp: string, ): Handler[]; + + /** + * Allows aribtrary logic to exclude events from being processed. + * Defaults to no events being excluded. + * @returns + */ + public shouldExcludeEvent(): boolean { + return false; + } }