Skip to content

Commit

Permalink
[CT-521] move funding handler from unordered_handlers to ordered_hand…
Browse files Browse the repository at this point in the history
…lers (#1097)
  • Loading branch information
dydxwill authored Feb 27, 2024
1 parent 37cf36d commit e5cb3ef
Showing 11 changed files with 209 additions and 62 deletions.
43 changes: 43 additions & 0 deletions indexer/services/ender/__tests__/handlers/funding-handler.test.ts
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@ import { FundingHandler } from '../../src/handlers/funding-handler';
import {
defaultFundingRateEvent,
defaultFundingUpdateSampleEvent,
defaultFundingUpdateSampleEventWithAdditionalMarket,
defaultHeight,
defaultPreviousHeight,
defaultTime,
@@ -167,6 +168,48 @@ describe('fundingHandler', () => {
);
});

it('successfully processes and clears cache for a new funding rate with both existing/non-existent market',
async () => {
const kafkaMessage: KafkaMessage = createKafkaMessageFromFundingEvents({
fundingEvents: [defaultFundingUpdateSampleEventWithAdditionalMarket],
height: defaultHeight,
time: defaultTime,
});

await onMessage(kafkaMessage);

await expectNextFundingRate(
'BTC-USD',
new Big(protocolTranslations.funding8HourValuePpmTo1HourRate(
defaultFundingUpdateSampleEvent.updates[0].fundingValuePpm,
)),
);

const kafkaMessage2: KafkaMessage = createKafkaMessageFromFundingEvents({
fundingEvents: [defaultFundingRateEvent],
height: 4,
time: defaultTime,
});

await onMessage(kafkaMessage2);
await expectNextFundingRate(
'BTC-USD',
undefined,
);
const fundingIndices: FundingIndexUpdatesFromDatabase[] = await
FundingIndexUpdatesTable.findAll({}, [], {});

expect(fundingIndices.length).toEqual(1);
expect(fundingIndices[0]).toEqual(expect.objectContaining({
perpetualId: '0',
rate: '0.00000125',
oraclePrice: '10000',
fundingIndex: '0.1',
}));
expect(stats.gauge).toHaveBeenCalledWith('ender.funding_index_update_event', 0.1, { ticker: 'BTC-USD' });
expect(stats.gauge).toHaveBeenCalledWith('ender.funding_index_update', 0.1, { ticker: 'BTC-USD' });
});

it('successfully processes and clears cache for a new funding rate', async () => {
const kafkaMessage: KafkaMessage = createKafkaMessageFromFundingEvents({
fundingEvents: [defaultFundingUpdateSampleEvent],
Original file line number Diff line number Diff line change
@@ -38,6 +38,7 @@ import {
} from '../helpers/constants';
import { updateBlockCache } from '../../src/caches/block-cache';
import { createPostgresFunctions } from '../../src/helpers/postgres/postgres-functions';
import { expectPerpetualMarketMatchesEvent } from '../helpers/postgres-helpers';

describe('perpetualMarketHandler', () => {
beforeAll(async () => {
@@ -164,23 +165,6 @@ describe('perpetualMarketHandler', () => {
});
});

function expectPerpetualMarketMatchesEvent(
perpetual: PerpetualMarketCreateEventV1,
perpetualMarket: PerpetualMarketFromDatabase,
) {
expect(perpetualMarket).toEqual(expect.objectContaining({
id: perpetual.id.toString(),
clobPairId: perpetual.clobPairId.toString(),
ticker: perpetual.ticker,
marketId: perpetual.marketId,
quantumConversionExponent: perpetual.quantumConversionExponent,
atomicResolution: perpetual.atomicResolution,
subticksPerTick: perpetual.subticksPerTick,
stepBaseQuantums: Number(perpetual.stepBaseQuantums),
liquidityTierId: perpetual.liquidityTier,
}));
}

function createKafkaMessageFromPerpetualMarketEvent({
perpetualMarketEvent,
transactionIndex,
16 changes: 16 additions & 0 deletions indexer/services/ender/__tests__/helpers/constants.ts
Original file line number Diff line number Diff line change
@@ -72,6 +72,22 @@ export const defaultFundingUpdateSampleEvent: FundingEventMessage = {
],
};

export const defaultFundingUpdateSampleEventWithAdditionalMarket: FundingEventMessage = {
type: FundingEventV1_Type.TYPE_PREMIUM_SAMPLE,
updates: [
{
perpetualId: 0,
fundingValuePpm: 10,
fundingIndex: bigIntToBytes(BigInt(0)),
},
{
perpetualId: 99999,
fundingValuePpm: 10,
fundingIndex: bigIntToBytes(BigInt(0)),
},
],
};

export const defaultFundingRateEvent: FundingEventMessage = {
type: FundingEventV1_Type.TYPE_FUNDING_RATE_AND_INDEX,
updates: [
19 changes: 19 additions & 0 deletions indexer/services/ender/__tests__/helpers/postgres-helpers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { PerpetualMarketFromDatabase } from '@dydxprotocol-indexer/postgres';
import { PerpetualMarketCreateEventV1 } from '@dydxprotocol-indexer/v4-protos';

export function expectPerpetualMarketMatchesEvent(
perpetual: PerpetualMarketCreateEventV1,
perpetualMarket: PerpetualMarketFromDatabase,
) {
expect(perpetualMarket).toEqual(expect.objectContaining({
id: perpetual.id.toString(),
clobPairId: perpetual.clobPairId.toString(),
ticker: perpetual.ticker,
marketId: perpetual.marketId,
quantumConversionExponent: perpetual.quantumConversionExponent,
atomicResolution: perpetual.atomicResolution,
subticksPerTick: perpetual.subticksPerTick,
stepBaseQuantums: Number(perpetual.stepBaseQuantums),
liquidityTierId: perpetual.liquidityTier,
}));
}
80 changes: 80 additions & 0 deletions indexer/services/ender/__tests__/lib/on-message.test.ts
Original file line number Diff line number Diff line change
@@ -7,7 +7,11 @@ import {
dbHelpers,
IsoString,
LiquidityTiersTable,
liquidityTierRefresher,
MarketTable,
Ordering,
PerpetualMarketColumns,
PerpetualMarketFromDatabase,
perpetualMarketRefresher,
PerpetualMarketTable,
TendermintEventFromDatabase,
@@ -22,6 +26,7 @@ import {
IndexerTendermintBlock,
IndexerTendermintEvent,
MarketEventV1,
PerpetualMarketCreateEventV1,
SubaccountMessage,
SubaccountUpdateEventV1,
Timestamp,
@@ -41,12 +46,14 @@ import {
defaultFundingUpdateSampleEvent,
defaultHeight,
defaultMarketModify,
defaultPerpetualMarketCreateEvent,
defaultPreviousHeight,
defaultSubaccountMessage,
} from '../helpers/constants';
import { updateBlockCache } from '../../src/caches/block-cache';
import Long from 'long';
import { createPostgresFunctions } from '../../src/helpers/postgres/postgres-functions';
import { expectPerpetualMarketMatchesEvent } from '../helpers/postgres-helpers';

describe('on-message', () => {
let producerSendMock: jest.SpyInstance;
@@ -126,6 +133,12 @@ describe('on-message', () => {
defaultMarketModify,
).finish());

const defaultPerpetualMarketEventBinary: Uint8Array = Uint8Array.from(
PerpetualMarketCreateEventV1.encode(
defaultPerpetualMarketCreateEvent,
).finish(),
);

it('successfully processes block with transaction event', async () => {
const transactionIndex: number = 0;
const eventIndex: number = 0;
@@ -239,6 +252,73 @@ describe('on-message', () => {
expect.any(Number), 1, { success: 'true' });
});

it('successfully processes block with market create and its funding events', async () => {
await Promise.all([
MarketTable.create(testConstants.defaultMarket),
MarketTable.create(testConstants.defaultMarket2),
]);
await Promise.all([
LiquidityTiersTable.create(testConstants.defaultLiquidityTier),
LiquidityTiersTable.create(testConstants.defaultLiquidityTier2),
]);
await Promise.all([
PerpetualMarketTable.create(testConstants.defaultPerpetualMarket2),
]);
await Promise.all([
perpetualMarketRefresher.updatePerpetualMarkets(),
liquidityTierRefresher.updateLiquidityTiers(),
]);

const transactionIndex: number = -1;
const eventIndex: number = 0;
const events: IndexerTendermintEvent[] = [
createIndexerTendermintEvent(
DydxIndexerSubtypes.PERPETUAL_MARKET,
defaultPerpetualMarketEventBinary,
0,
eventIndex,
),
createIndexerTendermintEvent(
DydxIndexerSubtypes.FUNDING,
defaultFundingEventBinary,
transactionIndex,
eventIndex + 1,
),
];

const block: IndexerTendermintBlock = createIndexerTendermintBlock(
defaultHeight,
defaultTime,
events,
[defaultTxHash],
);
const binaryBlock: Uint8Array = Uint8Array.from(IndexerTendermintBlock.encode(block).finish());
const kafkaMessage: KafkaMessage = createKafkaMessage(Buffer.from(binaryBlock));

await onMessage(kafkaMessage);
await Promise.all([
expectTendermintEvent(defaultHeight.toString(), 0, eventIndex),
expectTendermintEvent(defaultHeight.toString(), transactionIndex, eventIndex + 1),
expectTransactionWithHash([defaultTxHash]),
expectBlock(defaultHeight.toString(), defaultDateTime.toISO()),
]);

const newPerpetualMarkets: PerpetualMarketFromDatabase[] = await PerpetualMarketTable.findAll(
{},
[], {
orderBy: [[PerpetualMarketColumns.id, Ordering.ASC]],
});
expect(newPerpetualMarkets.length).toEqual(2);
expectPerpetualMarketMatchesEvent(defaultPerpetualMarketCreateEvent, newPerpetualMarkets[0]);

expect(stats.increment).toHaveBeenCalledWith('ender.received_kafka_message', 1);
expect(stats.timing).toHaveBeenCalledWith(
'ender.message_time_in_queue', expect.any(Number), 1, { topic: KafkaTopics.TO_ENDER });
expect(stats.gauge).toHaveBeenCalledWith('ender.processing_block_height', expect.any(Number));
expect(stats.timing).toHaveBeenCalledWith('ender.processed_block.timing',
expect.any(Number), 1, { success: 'true' });
});

it('successfully processes block with funding event', async () => {
await Promise.all([
MarketTable.create(testConstants.defaultMarket),
Original file line number Diff line number Diff line change
@@ -49,50 +49,50 @@ describe('funding-validator', () => {
expectDidntLogError();
});

it.each([
// Base Validation Errors
[
'does not specify valid type',
{
type: FundingEventV1_Type.TYPE_UNSPECIFIED,
updates: [
{
perpetualId: 0,
fundingValuePpm: 10,
fundingIndex: bigIntToBytes(BigInt(0)),
},
],
} as FundingEventV1,
'Invalid FundingEvent, type must be TYPE_PREMIUM_SAMPLE or TYPE_FUNDING_RATE_AND_INDEX',
],
// Perpetual market doesn't exist
[
'perpetual market does not exist',
{
type: FundingEventV1_Type.TYPE_FUNDING_RATE_AND_INDEX,
updates: [
{
perpetualId: 10,
fundingValuePpm: 10,
fundingIndex: bigIntToBytes(BigInt(0)),
},
],
} as FundingEventV1,
'Invalid FundingEvent, perpetualId does not exist',
],
])('throws error if event %s', (_message: string, event: FundingEventV1, message: string) => {
it('does not throw error if perpetualId does not exist', () => {
const event: FundingEventV1 = {
type: FundingEventV1_Type.TYPE_FUNDING_RATE_AND_INDEX,
updates: [
{
perpetualId: 10,
fundingValuePpm: 10,
fundingIndex: bigIntToBytes(BigInt(0)),
},
],
} as FundingEventV1;
const validator: FundingValidator = new FundingValidator(
event,
createBlock(event),
0,
);

const errMsg: string = 'Invalid FundingEvent, perpetualId does not exist';
expect(() => validator.validate()).not.toThrow(new ParseMessageError(errMsg));
expect(logger.error).toHaveBeenCalledWith({
at: `${FundingValidator.name}#validate`,
message: errMsg,
blockHeight: defaultHeight,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
event,
});
});

it('throws error if event does not specify valid type', () => {
const event = {
type: FundingEventV1_Type.TYPE_UNSPECIFIED,
updates: [
{
perpetualId: 0,
fundingValuePpm: 10,
fundingIndex: bigIntToBytes(BigInt(0)),
},
],
} as FundingEventV1;
const message = 'Invalid FundingEvent, type must be TYPE_PREMIUM_SAMPLE or TYPE_FUNDING_RATE_AND_INDEX';
const validator = new FundingValidator(event, createBlock(event), 0);

expect(() => validator.validate()).toThrow(new ParseMessageError(message));
expectLoggedParseMessageError(
FundingValidator.name,
message,
{ event },
);
expectLoggedParseMessageError(FundingValidator.name, message, { event });
});
});
});
1 change: 1 addition & 0 deletions indexer/services/ender/src/lib/sync-handlers.ts
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@ export const SYNCHRONOUS_SUBTYPES: DydxIndexerSubtypes[] = [
DydxIndexerSubtypes.PERPETUAL_MARKET,
DydxIndexerSubtypes.UPDATE_PERPETUAL,
DydxIndexerSubtypes.UPDATE_CLOB_PAIR,
DydxIndexerSubtypes.FUNDING,
];

/**
Original file line number Diff line number Diff line change
@@ -53,6 +53,8 @@ BEGIN
rval[i] = dydx_update_perpetual_handler(event_data);
WHEN '"update_clob_pair"'::jsonb THEN
rval[i] = dydx_update_clob_pair_handler(event_data);
WHEN '"funding_values"'::jsonb THEN
rval[i] = dydx_funding_handler(block_height, block_time, event_data, event_index, transaction_index);
ELSE
NULL;
END CASE;
Original file line number Diff line number Diff line change
@@ -56,8 +56,6 @@ BEGIN
rval[i] = dydx_transfer_handler(block_height, block_time, event_data, event_index, transaction_index, jsonb_array_element_text(block->'txHashes', transaction_index));
WHEN '"stateful_order"'::jsonb THEN
rval[i] = dydx_stateful_order_handler(block_height, block_time, event_data);
WHEN '"funding_values"'::jsonb THEN
rval[i] = dydx_funding_handler(block_height, block_time, event_data, event_index, transaction_index);
WHEN '"deleveraging"'::jsonb THEN
rval[i] = dydx_deleveraging_handler(block_height, block_time, event_data, event_index, transaction_index, jsonb_array_element_text(block->'txHashes', transaction_index));
WHEN '"trading_reward"'::jsonb THEN
Original file line number Diff line number Diff line change
@@ -38,7 +38,8 @@ BEGIN
perpetual_market_id = (funding_update->'perpetualId')::bigint;
SELECT * INTO perpetual_market_record FROM perpetual_markets WHERE "id" = perpetual_market_id;
IF NOT FOUND THEN
errors_response = array_append(errors_response, 'Received FundingUpdate with unknown perpetualId.');
errors_response = array_append(errors_response, '"Received FundingUpdate with unknown perpetualId."'::jsonb);
CONTINUE;
END IF;

perpetual_markets_response = jsonb_set(perpetual_markets_response, ARRAY[(perpetual_market_record."id")::text], dydx_to_jsonb(perpetual_market_record));
@@ -81,7 +82,7 @@ BEGIN
funding_update_response = jsonb_set(funding_update_response, ARRAY[(funding_index_updates_record."perpetualId")::text], dydx_to_jsonb(funding_index_updates_record));

ELSE
errors_response = array_append(errors_response, 'Received unknown FundingEvent type.');
errors_response = array_append(errors_response, '"Received unknown FundingEvent type."'::jsonb);
CONTINUE;
END CASE;

Loading

0 comments on commit e5cb3ef

Please sign in to comment.