Skip to content

Commit

Permalink
feat(bundles): add duplicated data item count
Browse files Browse the repository at this point in the history
  • Loading branch information
karlprieb committed Jan 17, 2025
1 parent 27f0ee3 commit 972f101
Show file tree
Hide file tree
Showing 8 changed files with 16 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Adds the per-bundle counter of data items skipped as duplicate IDs.
-- The name must match the column referenced by the bundle upsert statement.
ALTER TABLE bundles ADD COLUMN duplicated_data_item_count INTEGER;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
-- Reverts the migration by dropping the duplicate data item counter column.
ALTER TABLE bundles DROP COLUMN duplicated_data_item_count;
4 changes: 3 additions & 1 deletion src/database/sql/bundles/import.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ INSERT INTO bundles (
first_skipped_at, last_skipped_at,
first_unbundled_at, last_unbundled_at,
first_fully_indexed_at, last_fully_indexed_at,
import_attempt_count
import_attempt_count, duplicated_data_item_count
) VALUES (
@id, @root_transaction_id, @format_id,
@unbundle_filter_id, @index_filter_id,
Expand All @@ -16,10 +16,12 @@ INSERT INTO bundles (
@skipped_at, @skipped_at,
@unbundled_at, @unbundled_at,
@fully_indexed_at, @fully_indexed_at,
-- Values are positional: import_attempt_count comes first, then
-- duplicated_data_item_count, matching the order of the column list above.
CASE WHEN @queued_at IS NOT NULL THEN 1 ELSE 0 END,
@duplicated_data_item_count
) ON CONFLICT DO UPDATE SET
data_item_count = IFNULL(@data_item_count, data_item_count),
matched_data_item_count = IFNULL(@matched_data_item_count, matched_data_item_count),
duplicated_data_item_count = IFNULL(@duplicated_data_item_count, duplicated_data_item_count),
unbundle_filter_id = IFNULL(@unbundle_filter_id, unbundle_filter_id),
index_filter_id = IFNULL(@index_filter_id, index_filter_id),
first_queued_at = IFNULL(first_queued_at, @queued_at),
Expand Down
2 changes: 2 additions & 0 deletions src/database/standalone-sqlite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -918,6 +918,7 @@ export class StandaloneSqliteDatabaseWorker {
indexFilter,
dataItemCount,
matchedDataItemCount,
duplicatedDataItemCount,
queuedAt,
skippedAt,
unbundledAt,
Expand All @@ -936,6 +937,7 @@ export class StandaloneSqliteDatabaseWorker {
index_filter_id: this.getFilterId(indexFilter),
data_item_count: dataItemCount,
matched_data_item_count: matchedDataItemCount,
duplicated_data_item_count: duplicatedDataItemCount,
queued_at: queuedAt,
skipped_at: skippedAt,
unbundled_at: unbundledAt,
Expand Down
6 changes: 6 additions & 0 deletions src/lib/ans-104.ts
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,7 @@ if (!isMainThread) {
const iterable = await processBundleStream(stream);
const bundleLength = iterable.length;
let matchedItemCount = 0;
let duplicatedItemCount = 0;

const fnLog = log.child({ rootTxId, parentId, bundleLength });
fnLog.info('Unbundling ANS-104 bundle stream data items...');
Expand All @@ -445,13 +446,17 @@ if (!isMainThread) {

if (processedDataItemIds.has(dataItem.id)) {
diLog.warn('Skipping duplicate data item ID.');
duplicatedItemCount++;
continue;
}

if (!dataItem.dataOffset) {
diLog.warn('Skipping data item with missing data offset.');
continue;
}

processedDataItemIds.add(dataItem.id);

// compute the hash of the data item data
const dataItemHash = await hashDataItemData(
bundlePath,
Expand Down Expand Up @@ -485,6 +490,7 @@ if (!isMainThread) {
parentId: parentId as string,
itemCount: bundleLength,
matchedItemCount,
duplicatedItemCount,
});
} catch (error: any) {
log.error('Error unbundling ANS-104 bundle stream', {
Expand Down
1 change: 1 addition & 0 deletions src/system.ts
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,7 @@ eventEmitter.on(events.ANS104_UNBUNDLE_COMPLETE, async (bundleEvent: any) => {
format: 'ans-104',
dataItemCount: bundleEvent.itemCount,
matchedDataItemCount: bundleEvent.matchedItemCount,
duplicatedDataItemCount: bundleEvent.duplicatedItemCount,
unbundledAt: currentUnixTimestamp(),
});
} catch (error: any) {
Expand Down
1 change: 1 addition & 0 deletions src/types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ export interface BundleRecord {
indexFilter?: string;
dataItemCount?: number;
matchedDataItemCount?: number;
duplicatedDataItemCount?: number;
queuedAt?: number;
skippedAt?: number;
unbundledAt?: number;
Expand Down
2 changes: 1 addition & 1 deletion test/bundles-schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ CREATE TABLE bundles (
last_unbundled_at INTEGER,
first_fully_indexed_at INTEGER,
last_fully_indexed_at INTEGER
, root_transaction_id BLOB, import_attempt_count INTEGER);
, root_transaction_id BLOB, import_attempt_count INTEGER, duplicated_data_item_count INTEGER);
CREATE INDEX bundles_format_id_idx ON bundles (format_id);
CREATE INDEX bundles_last_queued_at_idx
ON bundles (last_queued_at);
Expand Down

0 comments on commit 972f101

Please sign in to comment.