Skip to content

Commit

Permalink
Add config var to exclude specific stateful order ids from being proc…
Browse files Browse the repository at this point in the history
…essed. (backport #2513) (#2514)

Co-authored-by: vincentwschau <[email protected]>
  • Loading branch information
mergify[bot] and vincentwschau authored Oct 18, 2024
1 parent e2298c4 commit ec8b919
Show file tree
Hide file tree
Showing 11 changed files with 202 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,11 @@ import Long from 'long';
import { producer } from '@dydxprotocol-indexer/kafka';
import { ConditionalOrderPlacementHandler } from '../../../src/handlers/stateful-order/conditional-order-placement-handler';
import { createPostgresFunctions } from '../../../src/helpers/postgres/postgres-functions';
import config from '../../../src/config';

describe('conditionalOrderPlacementHandler', () => {
const prevSkippedOrderUUIDs: string = config.SKIP_STATEFUL_ORDER_UUIDS;

beforeAll(async () => {
await dbHelpers.migrate();
await createPostgresFunctions();
Expand All @@ -59,6 +62,7 @@ describe('conditionalOrderPlacementHandler', () => {
});

afterEach(async () => {
config.SKIP_STATEFUL_ORDER_UUIDS = prevSkippedOrderUUIDs;
await dbHelpers.clearData();
jest.clearAllMocks();
});
Expand Down Expand Up @@ -226,4 +230,22 @@ describe('conditionalOrderPlacementHandler', () => {
order!,
);
});

it.each([
['transaction event', 0],
['block event', -1],
])('successfully skips order (as %s)', async (
_name: string,
transactionIndex: number,
) => {
config.SKIP_STATEFUL_ORDER_UUIDS = OrderTable.orderIdToUuid(defaultOrder.orderId!);
const kafkaMessage: KafkaMessage = createKafkaMessageFromStatefulOrderEvent(
defaultStatefulOrderEvent,
transactionIndex,
);

await onMessage(kafkaMessage);
const order: OrderFromDatabase | undefined = await OrderTable.findById(orderId);
expect(order).toBeUndefined();
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@ import { ORDER_FLAG_CONDITIONAL } from '@dydxprotocol-indexer/v4-proto-parser';
import { ConditionalOrderTriggeredHandler } from '../../../src/handlers/stateful-order/conditional-order-triggered-handler';
import { defaultPerpetualMarket } from '@dydxprotocol-indexer/postgres/build/__tests__/helpers/constants';
import { createPostgresFunctions } from '../../../src/helpers/postgres/postgres-functions';
import config from '../../../src/config';

describe('conditionalOrderTriggeredHandler', () => {
const prevSkippedOrderUUIDs: string = config.SKIP_STATEFUL_ORDER_UUIDS;

beforeAll(async () => {
await dbHelpers.migrate();
await createPostgresFunctions();
Expand All @@ -53,6 +56,7 @@ describe('conditionalOrderTriggeredHandler', () => {
});

afterEach(async () => {
config.SKIP_STATEFUL_ORDER_UUIDS = prevSkippedOrderUUIDs;
await dbHelpers.clearData();
jest.clearAllMocks();
});
Expand Down Expand Up @@ -163,4 +167,40 @@ describe('conditionalOrderTriggeredHandler', () => {
`Unable to update order status with orderId: ${orderId}`,
);
});

it.each([
['transaction event', 0],
['block event', -1],
])('successfully skips order trigger event (as %s)', async (
_name: string,
transactionIndex: number,
) => {
config.SKIP_STATEFUL_ORDER_UUIDS = OrderTable.uuid(
testConstants.defaultOrderGoodTilBlockTime.subaccountId,
'0',
testConstants.defaultOrderGoodTilBlockTime.clobPairId,
testConstants.defaultOrderGoodTilBlockTime.orderFlags,
);
await OrderTable.create({
...testConstants.defaultOrderGoodTilBlockTime,
orderFlags: conditionalOrderId.orderFlags.toString(),
status: OrderStatus.UNTRIGGERED,
triggerPrice: '1000',
clientId: '0',
});
const kafkaMessage: KafkaMessage = createKafkaMessageFromStatefulOrderEvent(
defaultStatefulOrderEvent,
transactionIndex,
);

await onMessage(kafkaMessage);
const order: OrderFromDatabase | undefined = await OrderTable.findById(orderId);

expect(order).toBeDefined();
expect(order).toEqual(expect.objectContaining({
status: OrderStatus.OPEN,
updatedAt: defaultDateTime.toISO(),
updatedAtHeight: defaultHeight.toString(),
}));
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,11 @@ import { STATEFUL_ORDER_ORDER_FILL_EVENT_TYPE } from '../../../src/constants';
import { producer } from '@dydxprotocol-indexer/kafka';
import { ORDER_FLAG_LONG_TERM } from '@dydxprotocol-indexer/v4-proto-parser';
import { createPostgresFunctions } from '../../../src/helpers/postgres/postgres-functions';
import config from '../../../src/config';

describe('statefulOrderPlacementHandler', () => {
const prevSkippedOrderUUIDs: string = config.SKIP_STATEFUL_ORDER_UUIDS;

beforeAll(async () => {
await dbHelpers.migrate();
await createPostgresFunctions();
Expand All @@ -59,6 +62,7 @@ describe('statefulOrderPlacementHandler', () => {
});

afterEach(async () => {
config.SKIP_STATEFUL_ORDER_UUIDS = prevSkippedOrderUUIDs;
await dbHelpers.clearData();
jest.clearAllMocks();
});
Expand Down Expand Up @@ -250,4 +254,26 @@ describe('statefulOrderPlacementHandler', () => {
});
// TODO[IND-20]: Add tests for vulcan messages
});

it.each([
// TODO(IND-334): Remove after deprecating StatefulOrderPlacementEvent
['stateful order placement as txn event', defaultStatefulOrderEvent, 0],
['stateful long term order placement as txn event', defaultStatefulOrderLongTermEvent, 0],
['stateful order placement as block event', defaultStatefulOrderEvent, -1],
['stateful long term order placement as block event', defaultStatefulOrderLongTermEvent, -1],
])('successfully skips order with %s', async (
_name: string,
statefulOrderEvent: StatefulOrderEventV1,
transactionIndex: number,
) => {
config.SKIP_STATEFUL_ORDER_UUIDS = OrderTable.orderIdToUuid(defaultOrder.orderId!);
const kafkaMessage: KafkaMessage = createKafkaMessageFromStatefulOrderEvent(
statefulOrderEvent,
transactionIndex,
);

await onMessage(kafkaMessage);
const order: OrderFromDatabase | undefined = await OrderTable.findById(orderId);
expect(order).toBeUndefined();
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,11 @@ import { StatefulOrderRemovalHandler } from '../../../src/handlers/stateful-orde
import { STATEFUL_ORDER_ORDER_FILL_EVENT_TYPE } from '../../../src/constants';
import { producer } from '@dydxprotocol-indexer/kafka';
import { createPostgresFunctions } from '../../../src/helpers/postgres/postgres-functions';
import config from '../../../src/config';

describe('statefulOrderRemovalHandler', () => {
const prevSkippedOrderUUIDs: string = config.SKIP_STATEFUL_ORDER_UUIDS;

beforeAll(async () => {
await dbHelpers.migrate();
await createPostgresFunctions();
Expand All @@ -52,6 +55,7 @@ describe('statefulOrderRemovalHandler', () => {
});

afterEach(async () => {
config.SKIP_STATEFUL_ORDER_UUIDS = prevSkippedOrderUUIDs;
await dbHelpers.clearData();
jest.clearAllMocks();
});
Expand Down Expand Up @@ -153,4 +157,35 @@ describe('statefulOrderRemovalHandler', () => {
`Unable to update order status with orderId: ${orderId}`,
);
});

it.each([
['transaction event', 0],
['block event', -1],
])('successfully skips order removal event (as %s)', async (
_name: string,
transactionIndex: number,
) => {
config.SKIP_STATEFUL_ORDER_UUIDS = OrderTable.uuid(
testConstants.defaultOrder.subaccountId,
'0',
testConstants.defaultOrder.clobPairId,
testConstants.defaultOrder.orderFlags,
);
await OrderTable.create({
...testConstants.defaultOrder,
clientId: '0',
});
const kafkaMessage: KafkaMessage = createKafkaMessageFromStatefulOrderEvent(
defaultStatefulOrderEvent,
transactionIndex,
);

await onMessage(kafkaMessage);
const order: OrderFromDatabase | undefined = await OrderTable.findById(orderId);
expect(order).toBeDefined();
expect(order).toEqual(expect.objectContaining({
...testConstants.defaultOrder,
clientId: '0',
}));
});
});
8 changes: 8 additions & 0 deletions indexer/services/ender/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
parseSchema,
baseConfigSchema,
parseBoolean,
parseString,
} from '@dydxprotocol-indexer/base';
import {
kafkaConfigSchema,
Expand All @@ -23,6 +24,13 @@ export const configSchema = {
SEND_WEBSOCKET_MESSAGES: parseBoolean({
default: true,
}),
// Config var to skip processing stateful order events with specific uuids.
// Order UUIDs should be in a string delimited by commas.
// Only set if invalid order events are being included in a block and preventing ender from
// progressing.
SKIP_STATEFUL_ORDER_UUIDS: parseString({
default: '',
}),
};

export default parseSchema(configSchema);
20 changes: 18 additions & 2 deletions indexer/services/ender/src/lib/block-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import { KafkaPublisher } from './kafka-publisher';
import { SyncHandlers, SYNCHRONOUS_SUBTYPES } from './sync-handlers';
import {
ConsolidatedKafkaEvent,
DydxIndexerSubtypes, EventMessage, EventProtoWithTypeAndVersion, GroupedEvents,
DydxIndexerSubtypes, EventMessage, EventProtoWithTypeAndVersion, GroupedEvents, SKIPPED_EVENT_SUBTYPE,
} from './types';

const TXN_EVENT_SUBTYPE_VERSION_TO_VALIDATOR_MAPPING: Record<string, ValidatorInitializer> = {
Expand Down Expand Up @@ -216,12 +216,28 @@ export class BlockProcessor {
);
validator.validate();
this.sqlEventPromises[eventProto.blockEventIndex] = validator.getEventForBlockProcessor();
const handlers: Handler<EventMessage>[] = validator.createHandlers(
let handlers: Handler<EventMessage>[] = validator.createHandlers(
eventProto.indexerTendermintEvent,
this.txId,
this.messageReceivedTimestamp,
);

if (validator.shouldExcludeEvent()) {
// If the event should be excluded from being processed, set the subtype to a special value
// for skipped events.
this.block.events[eventProto.blockEventIndex] = {
...this.block.events[eventProto.blockEventIndex],
subtype: SKIPPED_EVENT_SUBTYPE,
};
// Set handlers to empty array if event is to be skipped.
handlers = [];
logger.info({
at: 'onMessage#shouldExcludeEvent',
message: 'Excluded event from processing',
eventProto,
});
}

_.map(handlers, (handler: Handler<EventMessage>) => {
if (SYNCHRONOUS_SUBTYPES.includes(eventProto.type as DydxIndexerSubtypes)) {
this.syncHandlers.addHandler(eventProto.type, handler);
Expand Down
2 changes: 2 additions & 0 deletions indexer/services/ender/src/lib/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ export enum DydxIndexerSubtypes {
UPSERT_VAULT = 'upsert_vault',
}

export const SKIPPED_EVENT_SUBTYPE = 'skipped_event';

// Generic interface used for creating the Handler objects
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export type EventMessage = any;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ BEGIN
rval[i] = dydx_funding_handler(block_height, block_time, event_data, event_index, transaction_index);
WHEN '"upsert_vault"'::jsonb THEN
rval[i] = dydx_vault_upsert_handler(block_time, event_data);
WHEN '"skipped_event"'::jsonb THEN
rval[i] = jsonb_build_object();
ELSE
NULL;
END CASE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ BEGIN
rval[i] = dydx_trading_rewards_handler(block_height, block_time, event_data, event_index, transaction_index, jsonb_array_element_text(block->'txHashes', transaction_index));
WHEN '"register_affiliate"'::jsonb THEN
rval[i] = dydx_register_affiliate_handler(block_height, event_data);
WHEN '"skipped_event"'::jsonb THEN
rval[i] = jsonb_build_object();
ELSE
NULL;
END CASE;
Expand Down
38 changes: 38 additions & 0 deletions indexer/services/ender/src/validators/stateful-order-validator.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { OrderTable } from '@dydxprotocol-indexer/postgres';
import { ORDER_FLAG_CONDITIONAL, ORDER_FLAG_LONG_TERM } from '@dydxprotocol-indexer/v4-proto-parser';
import {
IndexerTendermintEvent,
Expand All @@ -13,6 +14,7 @@ import {
} from '@dydxprotocol-indexer/v4-protos';
import Long from 'long';

import config from '../config';
import { Handler, HandlerInitializer } from '../handlers/handler';
import { ConditionalOrderPlacementHandler } from '../handlers/stateful-order/conditional-order-placement-handler';
import { ConditionalOrderTriggeredHandler } from '../handlers/stateful-order/conditional-order-triggered-handler';
Expand Down Expand Up @@ -233,4 +235,40 @@ export class StatefulOrderValidator extends Validator<StatefulOrderEventV1> {

return [handler];
}

/**
* Skip order uuids in config env var.
*/
public shouldExcludeEvent(): boolean {
const orderUUIDsToSkip: string[] = config.SKIP_STATEFUL_ORDER_UUIDS.split(',');
if (orderUUIDsToSkip.length === 0) {
return false;
}

const orderUUIDStoSkipSet: Set<string> = new Set(orderUUIDsToSkip);
if (orderUUIDStoSkipSet.has(this.getOrderUUId())) {
return true;
}

return false;
}

/**
* Gets order uuid for the event being validated.
* Assumes events are valid.
*/
private getOrderUUId(): string {
if (this.event.orderPlace !== undefined) {
return OrderTable.orderIdToUuid(this.event.orderPlace.order!.orderId!);
} else if (this.event.orderRemoval !== undefined) {
return OrderTable.orderIdToUuid(this.event.orderRemoval.removedOrderId!);
} else if (this.event.conditionalOrderPlacement !== undefined) {
return OrderTable.orderIdToUuid(this.event.conditionalOrderPlacement.order!.orderId!);
} else if (this.event.conditionalOrderTriggered !== undefined) {
return OrderTable.orderIdToUuid(this.event.conditionalOrderTriggered.triggeredOrderId!);
} else if (this.event.longTermOrderPlacement !== undefined) {
return OrderTable.orderIdToUuid(this.event.longTermOrderPlacement.order!.orderId!);
}
return '';
}
}
9 changes: 9 additions & 0 deletions indexer/services/ender/src/validators/validator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,13 @@ export abstract class Validator<T extends object> {
txId: number,
messageReceivedTimestamp: string,
): Handler<EventMessage>[];

/**
* Allows aribtrary logic to exclude events from being processed.
* Defaults to no events being excluded.
* @returns
*/
public shouldExcludeEvent(): boolean {
return false;
}
}

0 comments on commit ec8b919

Please sign in to comment.