Skip to content

Commit

Permalink
[OTE-135] add roundtable task for CLOSE_ONLY -> BLOCKED compliance st…
Browse files Browse the repository at this point in the history
…atus transition (#1034)
  • Loading branch information
dydxwill authored Feb 5, 2024
1 parent 087e939 commit 7ec713e
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 0 deletions.
1 change: 1 addition & 0 deletions indexer/packages/postgres/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export * as FundingIndexUpdatesTable from './stores/funding-index-updates-table'
export * as LiquidityTiersTable from './stores/liquidity-tiers-table';
export * as WalletTable from './stores/wallet-table';
export * as ComplianceTable from './stores/compliance-table';
export * as ComplianceStatusTable from './stores/compliance-status-table';
export * as TradingRewardTable from './stores/trading-reward-table';
export * as TradingRewardAggregationTable from './stores/trading-reward-aggregation-table';

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import {
dbHelpers,
testConstants,
ComplianceStatusTable,
ComplianceStatus,
testMocks,
} from '@dydxprotocol-indexer/postgres';
import performComplianceStatusTransitionsTask from '../../src/tasks/perform-compliance-status-transitions';
import { logger, stats } from '@dydxprotocol-indexer/base';
import config from '../../src/config';
import { DateTime } from 'luxon';

describe('update-close-only-status', () => {
beforeAll(async () => {
await dbHelpers.migrate();
await dbHelpers.clearData();
});

beforeEach(async () => {
await testMocks.seedData();
jest.spyOn(stats, 'increment');
jest.spyOn(stats, 'timing');
jest.spyOn(stats, 'gauge');
jest.spyOn(logger, 'error');
});

afterEach(async () => {
await dbHelpers.clearData();
jest.clearAllMocks();
});

afterAll(async () => {
await dbHelpers.teardown();
jest.resetAllMocks();
});

it('succeeds with no CLOSE_ONLY addresses', async () => {
await performComplianceStatusTransitionsTask();

// Assert no addresses were updated
expect(stats.gauge).toHaveBeenCalledWith(
`${config.SERVICE_NAME}.num_stale_close_only_updated.count`,
0,
);
expect(stats.gauge).toHaveBeenCalledWith(
`${config.SERVICE_NAME}.num_stale_close_only.count`,
0,
);
});

it('updates CLOSE_ONLY addresses older than 7 days to BLOCKED', async () => {
config.CLOSE_ONLY_TO_BLOCKED_DAYS = 7;
// Seed database with CLOSE_ONLY compliance status older than 7 days
const oldUpdatedAt = DateTime.utc().minus({ days: 8 }).toISO();
const newTs = DateTime.utc().toISO();
await Promise.all([
ComplianceStatusTable.create({
address: testConstants.blockedAddress,
status: ComplianceStatus.CLOSE_ONLY,
createdAt: oldUpdatedAt,
updatedAt: oldUpdatedAt,
}),
ComplianceStatusTable.create({
address: testConstants.defaultAddress,
status: ComplianceStatus.CLOSE_ONLY,
createdAt: newTs,
updatedAt: newTs,
}),
]);

await performComplianceStatusTransitionsTask();

// Assert the status was updated to BLOCKED
const updatedStatus = await ComplianceStatusTable.findAll(
{ address: [testConstants.blockedAddress] },
[],
{},
);
expect(updatedStatus[0].status).toEqual(ComplianceStatus.BLOCKED);
expect(updatedStatus[0].updatedAt).not.toEqual(oldUpdatedAt);
const nonUpdatedStatus = await ComplianceStatusTable.findAll(
{ address: [testConstants.defaultAddress] },
[],
{},
);
expect(nonUpdatedStatus[0]).toEqual(expect.objectContaining({
address: testConstants.defaultAddress,
status: ComplianceStatus.CLOSE_ONLY,
createdAt: newTs,
updatedAt: newTs,
}));

// Assert the stats were correctly recorded
expect(stats.gauge).toHaveBeenCalledWith(
`${config.SERVICE_NAME}.num_stale_close_only_updated.count`,
1,
);
expect(stats.gauge).toHaveBeenCalledWith(
`${config.SERVICE_NAME}.num_stale_close_only.count`,
1,
);
});
});
4 changes: 4 additions & 0 deletions indexer/services/roundtable/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ export const configSchema = {
LOOPS_INTERVAL_MS_AGGREGATE_TRADING_REWARDS: parseInteger({
default: THIRTY_SECONDS_IN_MILLISECONDS,
}),
LOOPS_INTERVAL_MS_PERFORM_COMPLIANCE_STATUS_TRANSITIONS: parseInteger({
default: ONE_HOUR_IN_MILLISECONDS,
}),

// Start delay
START_DELAY_ENABLED: parseBoolean({ default: true }),
Expand Down Expand Up @@ -141,6 +144,7 @@ export const configSchema = {
MAX_COMPLIANCE_DATA_QUERY_PER_LOOP: parseInteger({ default: 100 }),
COMPLIANCE_PROVIDER_QUERY_BATCH_SIZE: parseInteger({ default: 100 }),
COMPLIANCE_PROVIDER_QUERY_DELAY_MS: parseInteger({ default: ONE_SECOND_IN_MILLISECONDS }),
CLOSE_ONLY_TO_BLOCKED_DAYS: parseInteger({ default: 7 }),

// Remove old cached order updates
OLD_CACHED_ORDER_UPDATES_WINDOW_MS: parseInteger({ default: 30 * ONE_SECOND_IN_MILLISECONDS }),
Expand Down
7 changes: 7 additions & 0 deletions indexer/services/roundtable/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import deleteOldFastSyncSnapshots from './tasks/delete-old-fast-sync-snapshots';
import deleteZeroPriceLevelsTask from './tasks/delete-zero-price-levels';
import marketUpdaterTask from './tasks/market-updater';
import orderbookInstrumentationTask from './tasks/orderbook-instrumentation';
import performComplianceStatusTransitionsTask from './tasks/perform-compliance-status-transitions';
import removeExpiredOrdersTask from './tasks/remove-expired-orders';
import removeOldOrderUpdatesTask from './tasks/remove-old-order-updates';
import takeFastSyncSnapshotTask from './tasks/take-fast-sync-snapshot';
Expand Down Expand Up @@ -126,6 +127,12 @@ async function start(): Promise<void> {
config.LOOPS_INTERVAL_MS_UPDATE_COMPLIANCE_DATA,
);

startLoop(
() => performComplianceStatusTransitionsTask(),
'update_compliance_status',
config.LOOPS_INTERVAL_MS_PERFORM_COMPLIANCE_STATUS_TRANSITIONS,
);

if (config.LOOPS_ENABLED_TRACK_LAG) {
startLoop(
trackLag,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import {
ONE_DAY_IN_MILLISECONDS,
stats,
} from '@dydxprotocol-indexer/base';
import {
ComplianceStatusFromDatabase,
ComplianceStatusTable,
ComplianceStatus,
} from '@dydxprotocol-indexer/postgres';

import config from '../config';

// eslint-disable-next-line max-len
const CLOSE_ONLY_TO_BLOCKED_DAYS_IN_MS: number = config.CLOSE_ONLY_TO_BLOCKED_DAYS * ONE_DAY_IN_MILLISECONDS;

export default async function runTask(): Promise<void> {
const queryStart: number = Date.now();

// Query for addresses with status CLOSE_ONLY and updatedAt less than NOW() - INTERVAL days
const staleCloseOnlyAddresses: ComplianceStatusFromDatabase[] = await
ComplianceStatusTable.findAll(
{
status: ComplianceStatus.CLOSE_ONLY,
updatedBeforeOrAt: new Date(
queryStart - CLOSE_ONLY_TO_BLOCKED_DAYS_IN_MS,
).toISOString(),
},
[],
{
readReplica: true,
},
);
stats.timing(`${config.SERVICE_NAME}.query_stale_close_only.timing`, Date.now() - queryStart);

const updateStart: number = Date.now();
const addressesToUpdate: string[] = staleCloseOnlyAddresses.map(
(record: ComplianceStatusFromDatabase) => record.address,
);

// Update addresses status to BLOCKED
const updatedAddresses: ComplianceStatusFromDatabase[] = await Promise.all(
addressesToUpdate.map((address) => ComplianceStatusTable.update({
address,
status: ComplianceStatus.BLOCKED,
updatedAt: new Date().toISOString(),
}),
),
) as ComplianceStatusFromDatabase[];

stats.timing(
`${config.SERVICE_NAME}.update_stale_close_only.timing`,
Date.now() - updateStart,
);
stats.gauge(`${config.SERVICE_NAME}.num_stale_close_only.count`, addressesToUpdate.length);
stats.gauge(`${config.SERVICE_NAME}.num_stale_close_only_updated.count`, updatedAddresses.length);
}

0 comments on commit 7ec713e

Please sign in to comment.