From 972f1013962e6d9f1343c452e8acf0be8c76cbab Mon Sep 17 00:00:00 2001 From: Karl Prieb Date: Fri, 17 Jan 2025 18:17:52 -0300 Subject: [PATCH] feat(bundles): add duplicated data item count --- ...1.17T21.06.52.bundles.add-duplicated_data_item_count.sql | 1 + ...1.17T21.06.52.bundles.add-duplicated_data_item_count.sql | 1 + src/database/sql/bundles/import.sql | 4 +++- src/database/standalone-sqlite.ts | 2 ++ src/lib/ans-104.ts | 6 ++++++ src/system.ts | 1 + src/types.d.ts | 1 + test/bundles-schema.sql | 2 +- 8 files changed, 16 insertions(+), 2 deletions(-) create mode 100644 migrations/2025.01.17T21.06.52.bundles.add-duplicated_data_item_count.sql create mode 100644 migrations/down/2025.01.17T21.06.52.bundles.add-duplicated_data_item_count.sql diff --git a/migrations/2025.01.17T21.06.52.bundles.add-duplicated_data_item_count.sql b/migrations/2025.01.17T21.06.52.bundles.add-duplicated_data_item_count.sql new file mode 100644 index 00000000..d6c18801 --- /dev/null +++ b/migrations/2025.01.17T21.06.52.bundles.add-duplicated_data_item_count.sql @@ -0,0 +1 @@ +ALTER TABLE bundles ADD COLUMN duplicated_data_item_count INTEGER; diff --git a/migrations/down/2025.01.17T21.06.52.bundles.add-duplicated_data_item_count.sql b/migrations/down/2025.01.17T21.06.52.bundles.add-duplicated_data_item_count.sql new file mode 100644 index 00000000..f91e764f --- /dev/null +++ b/migrations/down/2025.01.17T21.06.52.bundles.add-duplicated_data_item_count.sql @@ -0,0 +1 @@ +ALTER TABLE bundles DROP COLUMN duplicated_data_item_count; diff --git a/src/database/sql/bundles/import.sql b/src/database/sql/bundles/import.sql index 98c65fa8..d516b055 100644 --- a/src/database/sql/bundles/import.sql +++ b/src/database/sql/bundles/import.sql @@ -7,7 +7,7 @@ INSERT INTO bundles ( first_skipped_at, last_skipped_at, first_unbundled_at, last_unbundled_at, first_fully_indexed_at, last_fully_indexed_at, - import_attempt_count + import_attempt_count, duplicated_data_item_count ) VALUES ( @id, 
@root_transaction_id, @format_id, @unbundle_filter_id, @index_filter_id, @@ -16,10 +16,12 @@ INSERT INTO bundles ( @skipped_at, @skipped_at, @unbundled_at, @unbundled_at, @fully_indexed_at, @fully_indexed_at, - CASE WHEN @queued_at IS NOT NULL THEN 1 ELSE 0 END + CASE WHEN @queued_at IS NOT NULL THEN 1 ELSE 0 END, + @duplicated_data_item_count ) ON CONFLICT DO UPDATE SET data_item_count = IFNULL(@data_item_count, data_item_count), matched_data_item_count = IFNULL(@matched_data_item_count, matched_data_item_count), + duplicated_data_item_count = IFNULL(@duplicated_data_item_count, duplicated_data_item_count), unbundle_filter_id = IFNULL(@unbundle_filter_id, unbundle_filter_id), index_filter_id = IFNULL(@index_filter_id, index_filter_id), first_queued_at = IFNULL(first_queued_at, @queued_at), diff --git a/src/database/standalone-sqlite.ts b/src/database/standalone-sqlite.ts index 14ef296c..9d821fbe 100644 --- a/src/database/standalone-sqlite.ts +++ b/src/database/standalone-sqlite.ts @@ -918,6 +918,7 @@ export class StandaloneSqliteDatabaseWorker { indexFilter, dataItemCount, matchedDataItemCount, + duplicatedDataItemCount, queuedAt, skippedAt, unbundledAt, @@ -936,6 +937,7 @@ export class StandaloneSqliteDatabaseWorker { index_filter_id: this.getFilterId(indexFilter), data_item_count: dataItemCount, matched_data_item_count: matchedDataItemCount, + duplicated_data_item_count: duplicatedDataItemCount, queued_at: queuedAt, skipped_at: skippedAt, unbundled_at: unbundledAt, diff --git a/src/lib/ans-104.ts b/src/lib/ans-104.ts index a6d232e0..445f64bb 100644 --- a/src/lib/ans-104.ts +++ b/src/lib/ans-104.ts @@ -426,6 +426,7 @@ if (!isMainThread) { const iterable = await processBundleStream(stream); const bundleLength = iterable.length; let matchedItemCount = 0; + let duplicatedItemCount = 0; const fnLog = log.child({ rootTxId, parentId, bundleLength }); fnLog.info('Unbundling ANS-104 bundle stream data items...'); @@ -445,13 +446,17 @@ if (!isMainThread) { if (processedDataItemIds.has(dataItem.id)) { diLog.warn('Skipping 
duplicate data item ID.'); + duplicatedItemCount++; continue; } if (!dataItem.dataOffset) { diLog.warn('Skipping data item with missing data offset.'); + continue; } + processedDataItemIds.add(dataItem.id); + // compute the hash of the data item data const dataItemHash = await hashDataItemData( bundlePath, @@ -485,6 +490,7 @@ if (!isMainThread) { parentId: parentId as string, itemCount: bundleLength, matchedItemCount, + duplicatedItemCount, }); } catch (error: any) { log.error('Error unbundling ANS-104 bundle stream', { diff --git a/src/system.ts b/src/system.ts index 2003619b..7f48cb17 100644 --- a/src/system.ts +++ b/src/system.ts @@ -579,6 +579,7 @@ eventEmitter.on(events.ANS104_UNBUNDLE_COMPLETE, async (bundleEvent: any) => { format: 'ans-104', dataItemCount: bundleEvent.itemCount, matchedDataItemCount: bundleEvent.matchedItemCount, + duplicatedDataItemCount: bundleEvent.duplicatedItemCount, unbundledAt: currentUnixTimestamp(), }); } catch (error: any) { diff --git a/src/types.d.ts b/src/types.d.ts index d1277a7e..22f8f5d0 100644 --- a/src/types.d.ts +++ b/src/types.d.ts @@ -211,6 +211,7 @@ export interface BundleRecord { indexFilter?: string; dataItemCount?: number; matchedDataItemCount?: number; + duplicatedDataItemCount?: number; queuedAt?: number; skippedAt?: number; unbundledAt?: number; diff --git a/test/bundles-schema.sql b/test/bundles-schema.sql index b7cc6754..553c1b40 100644 --- a/test/bundles-schema.sql +++ b/test/bundles-schema.sql @@ -54,7 +54,7 @@ CREATE TABLE bundles ( last_unbundled_at INTEGER, first_fully_indexed_at INTEGER, last_fully_indexed_at INTEGER -, root_transaction_id BLOB, import_attempt_count INTEGER); +, root_transaction_id BLOB, import_attempt_count INTEGER, duplicated_data_item_count INTEGER); CREATE INDEX bundles_format_id_idx ON bundles (format_id); CREATE INDEX bundles_last_queued_at_idx ON bundles (last_queued_at);