From bc408191173c7a499aa081277fa27eb8f168b546 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Hl=C3=B6=C3=B0ver=20Sigur=C3=B0sson?=
Date: Tue, 28 Jan 2025 12:49:12 +0200
Subject: [PATCH] feat: queue chunk downloads when data roots don't match
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Rename BundleDataImporter to DataImporter
* Instantiate separate dataImporter and verificationDataImporter instances
* Update log messages to refer to “contiguous data” instead of “bundles”
* Only queue items for unbundling after download if an ans104 unbundler is passed
* Set ENABLE_BACKGROUND_DATA_VERIFICATION to true by default
* Add VERIFICATION_DATA_IMPORTER_QUEUE_SIZE for the dataVerification dataImport queue
---
 docker-compose.yaml                           |  2 +-
 docs/envs.md                                  |  2 +-
 src/config.ts                                 |  6 ++
 src/system.ts                                 | 15 ++++-
 ...importer.test.ts => data-importer.test.ts} | 14 ++---
 ...ndle-data-importer.ts => data-importer.ts} | 55 ++++++++++++-------
 src/workers/data-verification.ts              | 13 +++++
 7 files changed, 77 insertions(+), 30 deletions(-)
 rename src/workers/{bundle-data-importer.test.ts => data-importer.test.ts} (92%)
 rename src/workers/{bundle-data-importer.ts => data-importer.ts} (67%)

diff --git a/docker-compose.yaml b/docker-compose.yaml
index a3098d62..491e01a6 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -112,7 +112,7 @@ services:
       - MAX_DATA_ITEM_QUEUE_SIZE=${MAX_DATA_ITEM_QUEUE_SIZE:-}
       - TAG_SELECTIVITY=${TAG_SELECTIVITY:-}
       - MAX_EXPECTED_DATA_ITEM_INDEXING_INTERVAL_SECONDS=${MAX_EXPECTED_DATA_ITEM_INDEXING_INTERVAL_SECONDS:-}
-      - ENABLE_BACKGROUND_DATA_VERIFICATION=${ENABLE_BACKGROUND_DATA_VERIFICATION:-}
+      - ENABLE_BACKGROUND_DATA_VERIFICATION=${ENABLE_BACKGROUND_DATA_VERIFICATION:-true}
       - BACKGROUND_DATA_VERIFICATION_INTERVAL_SECONDS=${BACKGROUND_DATA_VERIFICATION_INTERVAL_SECONDS:-}
       - CLICKHOUSE_URL=${CLICKHOUSE_URL:-}
       - BUNDLE_DATA_IMPORTER_QUEUE_SIZE=${BUNDLE_DATA_IMPORTER_QUEUE_SIZE:-}
diff --git a/docs/envs.md b/docs/envs.md
index 0fcebe10..25a387a4 100644
--- a/docs/envs.md
+++ b/docs/envs.md
@@ -32,7 +32,7 @@ This document describes the environment variables that can be used to configure
 | WRITE_ANS104_DATA_ITEM_DB_SIGNATURES | Boolean | false | If true, the data item signatures will be written to the database. |
 | WRITE_TRANSACTION_DB_SIGNATURES | Boolean | false | If true, the transactions signatures will be written to the database. |
 | ENABLE_DATA_DB_WAL_CLEANUP | Boolean | false | If true, the data database WAL cleanup worker will be enabled |
-| ENABLE_BACKGROUND_DATA_VERIFICATION | Boolean | false | If true, unverified data will be verified in background |
+| ENABLE_BACKGROUND_DATA_VERIFICATION | Boolean | true | If true, unverified data will be verified in background |
 | MAX_DATA_ITEM_QUEUE_SIZE | Number | 100000 | Sets the maximum number of data items to queue for indexing before skipping indexing new data items |
 | ARNS_ROOT_HOST | String | undefined | Domain name for ArNS host |
 | SANDBOX_PROTOCOL | String | undefined | Protocol setting in process of creating sandbox domain in ArNS (ARNS_ROOT_HOST needs to be set for this env to have any effect) |
diff --git a/src/config.ts b/src/config.ts
index f93fdc22..f7567049 100644
--- a/src/config.ts
+++ b/src/config.ts
@@ -245,6 +245,12 @@ export const BUNDLE_DATA_IMPORTER_QUEUE_SIZE = +env.varOrDefault(
   '1000',
 );
 
+// The maximum number of data imports to queue for verification purposes
+export const VERIFICATION_DATA_IMPORTER_QUEUE_SIZE = +env.varOrDefault(
+  'VERIFICATION_DATA_IMPORTER_QUEUE_SIZE',
+  '1000',
+);
+
 // The maximum number of data items indexed to flush stable data items
 export const DATA_ITEM_FLUSH_COUNT_THRESHOLD = +env.varOrDefault(
   'DATA_ITEM_FLUSH_COUNT_THRESHOLD',
diff --git a/src/system.ts b/src/system.ts
index 7f48cb17..f81aa765 100644
--- a/src/system.ts
+++ b/src/system.ts
@@ -29,7 +29,7 @@ import { ReadThroughChunkDataCache } from './data/read-through-chunk-data-cache.js';
 import { ReadThroughDataCache } from './data/read-through-data-cache.js';
 import { SequentialDataSource } from './data/sequential-data-source.js';
 import { TxChunksDataSource } from './data/tx-chunks-data-source.js';
-import { BundleDataImporter } from './workers/bundle-data-importer.js';
+import { DataImporter } from './workers/data-importer.js';
 import { CompositeClickHouseDatabase } from './database/composite-clickhouse.js';
 import { StandaloneSqliteDatabase } from './database/standalone-sqlite.js';
 import * as events from './events.js';
@@ -472,7 +472,16 @@ metrics.registerQueueLengthGauge('ans104Unbundler', {
   length: () => ans104Unbundler.queueDepth(),
 });
 
-export const bundleDataImporter = new BundleDataImporter({
+export const verificationDataImporter = new DataImporter({
+  log,
+  contiguousDataSource: txChunksDataSource,
+  workerCount: config.ANS104_DOWNLOAD_WORKERS,
+  maxQueueSize: config.VERIFICATION_DATA_IMPORTER_QUEUE_SIZE,
+});
+metrics.registerQueueLengthGauge('verificationDataImporter', {
+  length: () => verificationDataImporter.queueDepth(),
+});
+export const bundleDataImporter = new DataImporter({
   log,
   contiguousDataSource: backgroundContiguousDataSource,
   ans104Unbundler,
@@ -682,6 +691,7 @@ const dataVerificationWorker = config.ENABLE_BACKGROUND_DATA_VERIFICATION
       log,
       contiguousDataIndex,
       contiguousDataSource: gatewaysDataSource,
+      dataImporter: verificationDataImporter,
     })
   : undefined;
 
@@ -720,6 +730,7 @@ export const shutdown = async (exitCode = 0) => {
   await txFetcher.stop();
   await txOffsetImporter.stop();
   await txOffsetRepairWorker.stop();
+  await verificationDataImporter.stop();
   await bundleDataImporter.stop();
   await bundleRepairWorker.stop();
   await ans104DataIndexer.stop();
diff --git a/src/workers/bundle-data-importer.test.ts b/src/workers/data-importer.test.ts
similarity index 92%
rename from src/workers/bundle-data-importer.test.ts
rename to src/workers/data-importer.test.ts
index 36672fd0..49440e57 100644
--- a/src/workers/bundle-data-importer.test.ts
+++ b/src/workers/data-importer.test.ts
@@ -28,7 +28,7 @@ import {
 } from 'node:test';
 import winston from 'winston';
 import { ContiguousDataSource } from '../types.js';
-import { BundleDataImporter } from './bundle-data-importer.js';
+import { DataImporter } from './data-importer.js';
 
 class Ans104UnbundlerStub {
   async queueItem(): Promise<void> {
@@ -44,10 +44,10 @@ class Ans104UnbundlerStub {
   }
 }
 
-describe('BundleDataImporter', () => {
+describe('DataImporter', () => {
   let log: winston.Logger;
-  let bundleDataImporter: BundleDataImporter;
-  let bundleDataImporterWithFullQueue: BundleDataImporter;
+  let bundleDataImporter: DataImporter;
+  let bundleDataImporterWithFullQueue: DataImporter;
   let contiguousDataSource: ContiguousDataSource;
   let ans104Unbundler: any;
   let mockItem: any;
@@ -76,14 +76,14 @@ describe('BundleDataImporter', () => {
 
   beforeEach(() => {
     ans104Unbundler = new Ans104UnbundlerStub();
-    bundleDataImporter = new BundleDataImporter({
+    bundleDataImporter = new DataImporter({
       log,
       contiguousDataSource,
       ans104Unbundler,
       workerCount: 1,
       maxQueueSize: 1,
     });
-    bundleDataImporterWithFullQueue = new BundleDataImporter({
+    bundleDataImporterWithFullQueue = new DataImporter({
       log,
       contiguousDataSource,
       ans104Unbundler,
@@ -142,7 +142,7 @@ describe('download', () => {
     it('should download and queue the item for unbundling', async () => {
       mock.method(ans104Unbundler, 'queueItem');
 
-      bundleDataImporter = new BundleDataImporter({
+      bundleDataImporter = new DataImporter({
        log,
        contiguousDataSource,
        ans104Unbundler: ans104Unbundler,
diff --git a/src/workers/bundle-data-importer.ts b/src/workers/data-importer.ts
similarity index 67%
rename from src/workers/bundle-data-importer.ts
rename to src/workers/data-importer.ts
index 69e80683..6123c153 100644
--- a/src/workers/bundle-data-importer.ts
+++ b/src/workers/data-importer.ts
@@ -31,25 +31,27 @@ interface IndexProperty {
   index: number;
 }
 
+type AnyContiguousData = { id: string };
 type UnbundleableItem = (NormalizedDataItem | PartialJsonTransaction) &
   IndexProperty;
+type ImportableItem = AnyContiguousData | UnbundleableItem;
 
-interface UnbundlingQueueItem {
-  item: UnbundleableItem;
+interface DataImporterQueueItem {
+  item: ImportableItem;
   prioritized: boolean | undefined;
   bypassFilter: boolean;
 }
 
-export class BundleDataImporter {
+export class DataImporter {
   // Dependencies
   private log: winston.Logger;
   private contiguousDataSource: ContiguousDataSource;
-  private ans104Unbundler: Ans104Unbundler;
+  private ans104Unbundler: Ans104Unbundler | undefined;
 
-  // Unbundling queue
+  // Contiguous data queue
   private workerCount: number;
   private maxQueueSize: number;
-  private queue: queueAsPromised<UnbundlingQueueItem, void>;
+  private queue: queueAsPromised<DataImporterQueueItem, void>;
 
   constructor({
     log,
     contiguousDataSource,
     ans104Unbundler,
     workerCount,
     maxQueueSize,
   }: {
     log: winston.Logger;
     contiguousDataSource: ContiguousDataSource;
-    ans104Unbundler: Ans104Unbundler;
+    ans104Unbundler?: Ans104Unbundler;
     workerCount: number;
     maxQueueSize?: number;
   }) {
     this.log = log.child({ class: this.constructor.name });
     this.contiguousDataSource = contiguousDataSource;
-    this.ans104Unbundler = ans104Unbundler;
+    if (ans104Unbundler) {
+      this.ans104Unbundler = ans104Unbundler;
+    }
     this.workerCount = workerCount;
     this.maxQueueSize = maxQueueSize;
     this.queue = fastq.promise(
       this.download.bind(this),
       this.workerCount,
     );
   }
 
   async queueItem(
-    item: UnbundleableItem,
+    item: ImportableItem,
     prioritized: boolean | undefined,
     bypassFilter = false,
   ): Promise<void> {
     const log = this.log.child({ method: 'queueItem', id: item.id });
     if (this.workerCount === 0) {
-      log.debug('Skipping bundle download, no workers.');
+      log.debug('Skipping contiguous-data download, no workers.');
       return;
     }
 
     if (prioritized === true) {
-      log.debug('Queueing prioritized bundle download...');
+      log.debug('Queueing prioritized contiguous data download...');
       this.queue.unshift({ item, prioritized, bypassFilter });
-      log.debug('Prioritized bundle download queued.');
+      log.debug('Prioritized contiguous data download queued.');
     } else if (this.queue.length() < this.maxQueueSize) {
-      log.debug('Queueing bundle download...');
+      log.debug('Queueing contiguous data download...');
       this.queue.push({ item, prioritized, bypassFilter });
-      log.debug('Bundle download queued.');
+      log.debug('Contiguous data download queued.');
     } else {
-      log.debug('Skipping bundle download, queue is full.');
+      log.debug('Skipping contiguous data download, queue is full.');
     }
   }
 
@@ -103,20 +107,29 @@
   async download({
     item,
     prioritized,
     bypassFilter,
-  }: UnbundlingQueueItem): Promise<void> {
+  }: DataImporterQueueItem): Promise<void> {
     const log = this.log.child({ method: 'download', id: item.id });
     const data = await this.contiguousDataSource.getData({ id: item.id });
 
     return new Promise((resolve, reject) => {
       data.stream.on('end', () => {
-        log.debug('Bundle data downloaded complete. Queuing for unbundling..');
-        this.ans104Unbundler.queueItem(item, prioritized, bypassFilter);
+        const isUnbundleableItem = this.isUnbundleableItem(item);
+        if (this.ans104Unbundler && isUnbundleableItem) {
+          log.debug('Data download completed. Queuing for unbundling...');
+          this.ans104Unbundler.queueItem(item, prioritized, bypassFilter);
+        } else {
+          log.debug(
+            isUnbundleableItem
+              ? 'Data download completed, skipping unbundling because unbundler is not available'
+              : 'Data download completed, marked as any contiguous tx/data-item, skipping unbundling',
+          );
+        }
         resolve();
       });
 
       data.stream.on('error', (error) => {
-        log.error('Error downloading bundle data.', {
+        log.error('Error downloading data.', {
           message: error.message,
           stack: error.stack,
         });
@@ -141,4 +154,8 @@
   async isQueueFull(): Promise<boolean> {
     return this.queue.length() >= this.maxQueueSize;
   }
+
+  isUnbundleableItem(item: ImportableItem): item is UnbundleableItem {
+    return Object.keys(item).length > 1 && 'index' in item;
+  }
 }
diff --git a/src/workers/data-verification.ts b/src/workers/data-verification.ts
index e91cb497..84319e8d 100644
--- a/src/workers/data-verification.ts
+++ b/src/workers/data-verification.ts
@@ -19,6 +19,7 @@ import { default as fastq } from 'fastq';
 import type { queueAsPromised } from 'fastq';
 import * as winston from 'winston';
 
+import { DataImporter } from './data-importer.js';
 import { ContiguousDataIndex, ContiguousDataSource } from '../types.js';
 import { DataRootComputer } from '../lib/data-root.js';
 import * as config from '../config.js';
@@ -28,6 +29,7 @@ export class DataVerificationWorker {
   private log: winston.Logger;
   private contiguousDataIndex: ContiguousDataIndex;
   private dataRootComputer: DataRootComputer;
+  private dataImporter: DataImporter | undefined;
   private workerCount: number;
   private queue: queueAsPromised;
 
@@ -38,6 +40,7 @@ constructor({
     log,
     contiguousDataIndex,
     contiguousDataSource,
+    dataImporter,
     workerCount = config.BACKGROUND_DATA_VERIFICATION_WORKER_COUNT,
     streamTimeout = config.BACKGROUND_DATA_VERIFICATION_STREAM_TIMEOUT_MS,
     interval = config.BACKGROUND_DATA_VERIFICATION_INTERVAL_SECONDS * 1000,
@@ -45,6 +48,7 @@
   }: {
     log: winston.Logger;
     contiguousDataIndex: ContiguousDataIndex;
     contiguousDataSource: ContiguousDataSource;
+    dataImporter?: DataImporter;
     workerCount?: number;
     streamTimeout?: number;
     interval?: number;
   }) {
@@ -63,6 +67,8 @@
       workerCount,
       streamTimeout,
     });
+
+    this.dataImporter = dataImporter;
   }
 
   async start(): Promise<void> {
@@ -129,6 +135,13 @@
         indexedDataRoot,
         computedDataRoot,
       });
+      if (this.dataImporter) {
+        log.debug(
+          'Data root mismatch detected; queueing item for reimport.',
+          { id },
+        );
+        await this.dataImporter.queueItem({ id }, true);
+      }
       return false;
     }
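
A minimal sketch (TypeScript, outside the patch, with simplified stand-in names rather than the gateway's actual API) of the dispatch rule the reworked DataImporter.download() implements: an item is queued for unbundling only when an ans104 unbundler was injected and the item is unbundleable (it carries the bundle index property); otherwise the download itself is the goal, which is what lets the verification worker reuse the class purely for re-fetching data.

import { default as fastq } from 'fastq';
import type { queueAsPromised } from 'fastq';

type AnyContiguousData = { id: string };
type UnbundleableItem = { id: string; index: number };
type ImportableItem = AnyContiguousData | UnbundleableItem;

interface Unbundler {
  queueItem(item: UnbundleableItem, prioritized?: boolean): void;
}

class DataImporterSketch {
  private queue: queueAsPromised<ImportableItem, void>;

  constructor(
    // Stand-in for contiguousDataSource.getData() plus stream consumption.
    private fetchData: (id: string) => Promise<void>,
    // Optional, mirroring the now-optional ans104Unbundler dependency.
    private unbundler?: Unbundler,
    workerCount = 1,
    private maxQueueSize = 1000,
  ) {
    this.queue = fastq.promise(
      (item: ImportableItem) => this.download(item),
      workerCount,
    );
  }

  queueItem(item: ImportableItem, prioritized = false): void {
    if (prioritized) {
      // Prioritized items (e.g. verification reimports) jump the queue.
      void this.queue.unshift(item);
    } else if (this.queue.length() < this.maxQueueSize) {
      void this.queue.push(item);
    }
    // Otherwise the item is dropped: the queue is full.
  }

  // Narrowing guard: only items carrying bundle metadata can be unbundled.
  private isUnbundleable(item: ImportableItem): item is UnbundleableItem {
    return 'index' in item;
  }

  private async download(item: ImportableItem): Promise<void> {
    // Downloading (and thereby caching) the contiguous data is the goal.
    await this.fetchData(item.id);
    // Unbundling is a follow-up step that needs both an unbundler and an
    // unbundleable item; plain { id } items stop here.
    if (this.unbundler && this.isUnbundleable(item)) {
      this.unbundler.queueItem(item);
    }
  }
}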
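On the verification side, the flow the patch adds boils down to: recompute the data root from the locally cached bytes, compare it with the indexed data root, and on mismatch queue a prioritized reimport instead of marking the data verified. A hedged sketch with hypothetical function and parameter names; the real logic lives in DataVerificationWorker:

// Illustrative only: names and signatures are assumptions, not the
// gateway's API.
async function verifyDataRoot(
  id: string,
  indexedDataRoot: string,
  computeDataRootFromCache: (id: string) => Promise<string>,
  verificationDataImporter?: {
    queueItem(item: { id: string }, prioritized?: boolean): void;
  },
): Promise<boolean> {
  const computedDataRoot = await computeDataRootFromCache(id);
  if (computedDataRoot !== indexedDataRoot) {
    // The cached bytes cannot be the ones committed to by the indexed
    // data root, so re-fetch them (prioritized) rather than marking the
    // id verified.
    verificationDataImporter?.queueItem({ id }, true);
    return false; // stays unverified; a later pass re-checks it
  }
  return true;
}

Wiring the verification importer to a chunk-backed source (txChunksDataSource in system.ts) is what makes the reimport trustworthy: chunks are validated against the transaction's data root as they are fetched.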