feat: queue chunk downloads when data roots don't match
  * Rename BundleDataImporter to DataImporter
  * Instantiate a separate DataImporter instance, verificationDataImporter,
    for verification downloads
  * Update log messages to refer to “contiguous data” instead of “bundles”
  * Only queue items for unbundling after download if an ans104 unbundler
    is passed
  * Set ENABLE_BACKGROUND_DATA_VERIFICATION to true by default
  * Add VERIFICATION_DATA_IMPORTER_QUEUE_SIZE to configure the data
    verification import queue
hlolli committed Jan 28, 2025
1 parent 2325880 commit bc40819
Showing 7 changed files with 77 additions and 30 deletions.
2 changes: 1 addition & 1 deletion docker-compose.yaml
@@ -112,7 +112,7 @@ services:
- MAX_DATA_ITEM_QUEUE_SIZE=${MAX_DATA_ITEM_QUEUE_SIZE:-}
- TAG_SELECTIVITY=${TAG_SELECTIVITY:-}
- MAX_EXPECTED_DATA_ITEM_INDEXING_INTERVAL_SECONDS=${MAX_EXPECTED_DATA_ITEM_INDEXING_INTERVAL_SECONDS:-}
-      - ENABLE_BACKGROUND_DATA_VERIFICATION=${ENABLE_BACKGROUND_DATA_VERIFICATION:-}
+      - ENABLE_BACKGROUND_DATA_VERIFICATION=${ENABLE_BACKGROUND_DATA_VERIFICATION:-true}
- BACKGROUND_DATA_VERIFICATION_INTERVAL_SECONDS=${BACKGROUND_DATA_VERIFICATION_INTERVAL_SECONDS:-}
- CLICKHOUSE_URL=${CLICKHOUSE_URL:-}
- BUNDLE_DATA_IMPORTER_QUEUE_SIZE=${BUNDLE_DATA_IMPORTER_QUEUE_SIZE:-}
2 changes: 1 addition & 1 deletion docs/envs.md
@@ -32,7 +32,7 @@ This document describes the environment variables that can be used to configure
| WRITE_ANS104_DATA_ITEM_DB_SIGNATURES | Boolean | false | If true, the data item signatures will be written to the database. |
| WRITE_TRANSACTION_DB_SIGNATURES | Boolean | false | If true, the transactions signatures will be written to the database. |
| ENABLE_DATA_DB_WAL_CLEANUP | Boolean | false | If true, the data database WAL cleanup worker will be enabled |
-| ENABLE_BACKGROUND_DATA_VERIFICATION | Boolean | false | If true, unverified data will be verified in background |
+| ENABLE_BACKGROUND_DATA_VERIFICATION | Boolean | true | If true, unverified data will be verified in background |
| MAX_DATA_ITEM_QUEUE_SIZE | Number | 100000 | Sets the maximum number of data items to queue for indexing before skipping indexing new data items |
| ARNS_ROOT_HOST | String | undefined | Domain name for ArNS host |
| SANDBOX_PROTOCOL | String | undefined | Protocol setting in process of creating sandbox domain in ArNS (ARNS_ROOT_HOST needs to be set for this env to have any effect) |
6 changes: 6 additions & 0 deletions src/config.ts
@@ -245,6 +245,12 @@ export const BUNDLE_DATA_IMPORTER_QUEUE_SIZE = +env.varOrDefault(
'1000',
);

+// The maximum number of data imports to queue for verification purposes
+export const VERIFICATION_DATA_IMPORTER_QUEUE_SIZE = +env.varOrDefault(
+  'VERIFICATION_DATA_IMPORTER_QUEUE_SIZE',
+  '1000',
+);

// The maximum number of data items indexed to flush stable data items
export const DATA_ITEM_FLUSH_COUNT_THRESHOLD = +env.varOrDefault(
'DATA_ITEM_FLUSH_COUNT_THRESHOLD',
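For reference, the config pattern above coerces the environment string to a number with a unary `+`. A minimal sketch of the behavior, assuming `env.varOrDefault` simply falls back to the given default when the variable is unset:

```ts
// Sketch only: stand-in for the repo's env helper (assumed behavior).
const varOrDefault = (name: string, defaultValue: string): string =>
  process.env[name] ?? defaultValue;

// Unary `+` converts the string to a number, so with the variable
// unset this yields the numeric default 1000.
const VERIFICATION_DATA_IMPORTER_QUEUE_SIZE = +varOrDefault(
  'VERIFICATION_DATA_IMPORTER_QUEUE_SIZE',
  '1000',
);

console.log(VERIFICATION_DATA_IMPORTER_QUEUE_SIZE); // 1000 when unset
```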
15 changes: 13 additions & 2 deletions src/system.ts
@@ -29,7 +29,7 @@ import { ReadThroughChunkDataCache } from './data/read-through-chunk-data-cache.
import { ReadThroughDataCache } from './data/read-through-data-cache.js';
import { SequentialDataSource } from './data/sequential-data-source.js';
import { TxChunksDataSource } from './data/tx-chunks-data-source.js';
-import { BundleDataImporter } from './workers/bundle-data-importer.js';
+import { DataImporter } from './workers/data-importer.js';
import { CompositeClickHouseDatabase } from './database/composite-clickhouse.js';
import { StandaloneSqliteDatabase } from './database/standalone-sqlite.js';
import * as events from './events.js';
@@ -472,7 +472,7 @@ metrics.registerQueueLengthGauge('ans104Unbundler', {
length: () => ans104Unbundler.queueDepth(),
});

-export const bundleDataImporter = new BundleDataImporter({
+export const verificationDataImporter = new DataImporter({
+  log,
+  contiguousDataSource: txChunksDataSource,
+  workerCount: config.ANS104_DOWNLOAD_WORKERS,
+  maxQueueSize: config.VERIFICATION_DATA_IMPORTER_QUEUE_SIZE,
+});
+metrics.registerQueueLengthGauge('verificationDataImporter', {
+  length: () => verificationDataImporter.queueDepth(),
+});
+export const bundleDataImporter = new DataImporter({
log,
contiguousDataSource: backgroundContiguousDataSource,
ans104Unbundler,
@@ -682,6 +691,7 @@ const dataVerificationWorker = config.ENABLE_BACKGROUND_DATA_VERIFICATION
log,
contiguousDataIndex,
contiguousDataSource: gatewaysDataSource,
+      dataImporter: verificationDataImporter,
})
: undefined;

@@ -720,6 +730,7 @@ export const shutdown = async (exitCode = 0) => {
await txFetcher.stop();
await txOffsetImporter.stop();
await txOffsetRepairWorker.stop();
+  await verificationDataImporter.stop();
await bundleDataImporter.stop();
await bundleRepairWorker.stop();
await ans104DataIndexer.stop();
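The new importer is observable the same way as the existing bundle importer. As a rough sketch of the queue-length gauge pattern used above (the internals of the repo's `metrics` module are an assumption; this uses a prom-client style collect callback):

```ts
import { Gauge } from 'prom-client';

// Hypothetical stand-in for metrics.registerQueueLengthGauge; the
// real module may register against a custom registry.
function registerQueueLengthGauge(
  name: string,
  { length }: { length: () => number },
): void {
  new Gauge({
    name: `${name}_queue_length`, // hypothetical metric name
    help: `Current queue depth for ${name}`,
    collect() {
      // Called on each scrape; samples the live queue depth.
      this.set(length());
    },
  });
}

// Usage mirroring the diff:
// registerQueueLengthGauge('verificationDataImporter', {
//   length: () => verificationDataImporter.queueDepth(),
// });
```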
src/workers/data-importer.test.ts (renamed from src/workers/bundle-data-importer.test.ts)
@@ -28,7 +28,7 @@ import {
} from 'node:test';
import winston from 'winston';
import { ContiguousDataSource } from '../types.js';
-import { BundleDataImporter } from './bundle-data-importer.js';
+import { DataImporter } from './data-importer.js';

class Ans104UnbundlerStub {
async queueItem(): Promise<void> {
@@ -44,10 +44,10 @@ class Ans104UnbundlerStub {
}
}

-describe('BundleDataImporter', () => {
+describe('DataImporter', () => {
  let log: winston.Logger;
-  let bundleDataImporter: BundleDataImporter;
-  let bundleDataImporterWithFullQueue: BundleDataImporter;
+  let bundleDataImporter: DataImporter;
+  let bundleDataImporterWithFullQueue: DataImporter;
let contiguousDataSource: ContiguousDataSource;
let ans104Unbundler: any;
let mockItem: any;
@@ -76,14 +76,14 @@ describe('BundleDataImporter', () => {

beforeEach(() => {
ans104Unbundler = new Ans104UnbundlerStub();
-    bundleDataImporter = new BundleDataImporter({
+    bundleDataImporter = new DataImporter({
log,
contiguousDataSource,
ans104Unbundler,
workerCount: 1,
maxQueueSize: 1,
});
-    bundleDataImporterWithFullQueue = new BundleDataImporter({
+    bundleDataImporterWithFullQueue = new DataImporter({
log,
contiguousDataSource,
ans104Unbundler,
@@ -142,7 +142,7 @@ describe('BundleDataImporter', () => {
describe('download', () => {
it('should download and queue the item for unbundling', async () => {
mock.method(ans104Unbundler, 'queueItem');
-      bundleDataImporter = new BundleDataImporter({
+      bundleDataImporter = new DataImporter({
log,
contiguousDataSource,
ans104Unbundler: ans104Unbundler,
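One case the updated tests above do not exercise directly is the new optional-unbundler path. A hypothetical additional test (not part of this commit, reusing the file's existing `log` and `contiguousDataSource` fixtures) might look like:

```ts
it('should download without queueing for unbundling when no unbundler is set', async () => {
  // No ans104Unbundler passed: per the new behavior, the download
  // completes and unbundling is skipped.
  const importer = new DataImporter({
    log,
    contiguousDataSource,
    workerCount: 1,
    maxQueueSize: 1,
  });
  await importer.queueItem({ id: 'test-id' }, true);
  await importer.stop();
});
```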
src/workers/data-importer.ts (renamed from src/workers/bundle-data-importer.ts)
@@ -31,25 +31,27 @@ interface IndexProperty {
index: number;
}

+type AnyContiguousData = { id: string };
type UnbundleableItem = (NormalizedDataItem | PartialJsonTransaction) &
  IndexProperty;
+type ImportableItem = AnyContiguousData | UnbundleableItem;

-interface UnbundlingQueueItem {
-  item: UnbundleableItem;
+interface DataImporterQueueItem {
+  item: ImportableItem;
  prioritized: boolean | undefined;
  bypassFilter: boolean;
}

-export class BundleDataImporter {
+export class DataImporter {
// Dependencies
private log: winston.Logger;
private contiguousDataSource: ContiguousDataSource;
-  private ans104Unbundler: Ans104Unbundler;
+  private ans104Unbundler: Ans104Unbundler | undefined;

-  // Unbundling queue
+  // Contiguous data queue
private workerCount: number;
private maxQueueSize: number;
-  private queue: queueAsPromised<UnbundlingQueueItem, void>;
+  private queue: queueAsPromised<DataImporterQueueItem, void>;

constructor({
log,
@@ -60,13 +62,15 @@ export class BundleDataImporter {
}: {
log: winston.Logger;
contiguousDataSource: ContiguousDataSource;
-    ans104Unbundler: Ans104Unbundler;
+    ans104Unbundler?: Ans104Unbundler;
workerCount: number;
maxQueueSize?: number;
}) {
this.log = log.child({ class: this.constructor.name });
this.contiguousDataSource = contiguousDataSource;
-    this.ans104Unbundler = ans104Unbundler;
+    if (ans104Unbundler) {
+      this.ans104Unbundler = ans104Unbundler;
+    }
this.workerCount = workerCount;
this.maxQueueSize = maxQueueSize;
this.queue = fastq.promise(
@@ -76,47 +80,56 @@
}

async queueItem(
-    item: UnbundleableItem,
+    item: ImportableItem,
prioritized: boolean | undefined,
bypassFilter = false,
): Promise<void> {
const log = this.log.child({ method: 'queueItem', id: item.id });
if (this.workerCount === 0) {
-      log.debug('Skipping bundle download, no workers.');
+      log.debug('Skipping contiguous-data download, no workers.');
return;
}

if (prioritized === true) {
-      log.debug('Queueing prioritized bundle download...');
+      log.debug('Queueing prioritized contiguous data download...');
      this.queue.unshift({ item, prioritized, bypassFilter });
-      log.debug('Prioritized bundle download queued.');
+      log.debug('Prioritized contiguous data download queued.');
    } else if (this.queue.length() < this.maxQueueSize) {
-      log.debug('Queueing bundle download...');
+      log.debug('Queueing contiguous data download...');
      this.queue.push({ item, prioritized, bypassFilter });
-      log.debug('Bundle download queued.');
+      log.debug('Contiguous data download queued.');
    } else {
-      log.debug('Skipping bundle download, queue is full.');
+      log.debug('Skipping contiguous data download, queue is full.');
}
}

async download({
item,
prioritized,
bypassFilter,
-  }: UnbundlingQueueItem): Promise<void> {
+  }: DataImporterQueueItem): Promise<void> {
const log = this.log.child({ method: 'download', id: item.id });

const data = await this.contiguousDataSource.getData({ id: item.id });

return new Promise((resolve, reject) => {
data.stream.on('end', () => {
-        log.debug('Bundle data downloaded complete. Queuing for unbundling..');
-        this.ans104Unbundler.queueItem(item, prioritized, bypassFilter);
+        const isUnbundleableItem = this.isUnbundleableItem(item);
+        if (this.ans104Unbundler && isUnbundleableItem) {
+          log.debug('Data download completed. Queuing for unbundling...');
+          this.ans104Unbundler.queueItem(item, prioritized, bypassFilter);
+        } else {
+          log.debug(
+            isUnbundleableItem
+              ? 'Data download completed, skipping unbundling because unbundler is not available'
+              : 'Data download completed, marked as any contiguous tx/data-item, skipping unbundling',
+          );
+        }
resolve();
});

data.stream.on('error', (error) => {
-      log.error('Error downloading bundle data.', {
+      log.error('Error downloading data.', {
message: error.message,
stack: error.stack,
});
@@ -141,4 +154,8 @@
async isQueueFull(): Promise<boolean> {
return this.queue.length() >= this.maxQueueSize;
}

+  isUnbundleableItem(item: ImportableItem): item is UnbundleableItem {
+    return Object.keys(item).length > 1 && 'index' in item;
+  }
}
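The `isUnbundleableItem` guard above distinguishes the two item shapes by structure alone. A self-contained sketch of the same logic (the `UnbundleableItem` shape is simplified here; the real type intersects `NormalizedDataItem | PartialJsonTransaction` with `IndexProperty`):

```ts
type AnyContiguousData = { id: string };
type UnbundleableItem = { id: string; index: number }; // simplified shape
type ImportableItem = AnyContiguousData | UnbundleableItem;

function isUnbundleableItem(item: ImportableItem): item is UnbundleableItem {
  // A bare { id } has a single key and no `index`: download only.
  // A normalized item carries `index` (among other fields): unbundle too.
  return Object.keys(item).length > 1 && 'index' in item;
}

console.log(isUnbundleableItem({ id: 'abc' })); // false -> skip unbundling
console.log(isUnbundleableItem({ id: 'def', index: 0 })); // true -> queue for unbundler
```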
13 changes: 13 additions & 0 deletions src/workers/data-verification.ts
@@ -19,6 +19,7 @@
import { default as fastq } from 'fastq';
import type { queueAsPromised } from 'fastq';
import * as winston from 'winston';
+import { DataImporter } from './data-importer.js';
import { ContiguousDataIndex, ContiguousDataSource } from '../types.js';
import { DataRootComputer } from '../lib/data-root.js';
import * as config from '../config.js';
@@ -28,6 +29,7 @@ export class DataVerificationWorker {
private log: winston.Logger;
private contiguousDataIndex: ContiguousDataIndex;
private dataRootComputer: DataRootComputer;
+  private dataImporter: DataImporter | undefined;

private workerCount: number;
private queue: queueAsPromised<string, void | boolean>;
@@ -38,13 +40,15 @@
log,
contiguousDataIndex,
contiguousDataSource,
+    dataImporter,
workerCount = config.BACKGROUND_DATA_VERIFICATION_WORKER_COUNT,
streamTimeout = config.BACKGROUND_DATA_VERIFICATION_STREAM_TIMEOUT_MS,
interval = config.BACKGROUND_DATA_VERIFICATION_INTERVAL_SECONDS * 1000,
}: {
log: winston.Logger;
contiguousDataIndex: ContiguousDataIndex;
contiguousDataSource: ContiguousDataSource;
+    dataImporter?: DataImporter;
workerCount?: number;
streamTimeout?: number;
interval?: number;
@@ -63,6 +67,8 @@
workerCount,
streamTimeout,
});

+    this.dataImporter = dataImporter;
}

async start(): Promise<void> {
@@ -129,6 +135,13 @@
indexedDataRoot,
computedDataRoot,
});
+        if (this.dataImporter) {
+          log.debug(
+            'Because of data-root mismatch, we are queueing data item for reimport.',
+            { id },
+          );
+          await this.dataImporter.queueItem({ id }, true);
+        }
return false;
}

Expand Down
