diff --git a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/TableStatsCollectorUtil.java b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/TableStatsCollectorUtil.java
index 665c1b50f..661fd760c 100644
--- a/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/TableStatsCollectorUtil.java
+++ b/apps/spark/src/main/java/com/linkedin/openhouse/jobs/util/TableStatsCollectorUtil.java
@@ -4,7 +4,13 @@
 import com.linkedin.openhouse.common.stats.model.IcebergTableStats;
 import java.io.IOException;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.LinkedHashMap;
+import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.fs.FileSystem;
@@ -114,6 +120,9 @@ protected static IcebergTableStats populateStatsForSnapshots(
         .min(Long::compareTo)
         .orElse(null);
 
+    Map<String, Long> snapshotCountByDay =
+        getSnapshotDistributionPerDay(table, spark, MetadataTableType.SNAPSHOTS);
+
     return stats
         .toBuilder()
         .currentSnapshotId(currentSnapshotId)
@@ -121,9 +130,38 @@
         .oldestSnapshotTimestamp(oldestSnapshotTimestamp)
         .numCurrentSnapshotReferencedDataFiles(countOfDataFiles)
         .totalCurrentSnapshotReferencedDataFilesSizeInBytes(sumOfFileSizeBytes)
+        .snapshotCountByDay(snapshotCountByDay)
         .build();
   }
 
+  /**
+   * Day-bucket formatter, cached once: {@link DateTimeFormatter} is immutable and thread-safe,
+   * unlike SimpleDateFormat. Uses the JVM default zone to match prior stats behavior.
+   */
+  private static final DateTimeFormatter DAY_FORMATTER =
+      DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.systemDefault());
+
+  /**
+   * Computes the number of distinct snapshots committed on each day for the given table,
+   * keyed by "yyyy-MM-dd" (JVM default time zone), preserving commit order of first appearance.
+   *
+   * @param table table whose snapshot metadata is scanned
+   * @param spark active Spark session used to load the metadata table
+   * @param metadataTableType metadata table to read (SNAPSHOTS for this stat)
+   * @return insertion-ordered map from day string to snapshot count
+   */
+  private static Map<String, Long> getSnapshotDistributionPerDay(
+      Table table, SparkSession spark, MetadataTableType metadataTableType) {
+    return SparkTableUtil.loadMetadataTable(spark, table, metadataTableType)
+        .selectExpr("snapshot_id", "committed_at")
+        .dropDuplicates("snapshot_id", "committed_at")
+        .collectAsList()
+        .stream()
+        .collect(
+            Collectors.toMap(
+                row -> DAY_FORMATTER.format(Instant.ofEpochMilli(row.getTimestamp(1).getTime())),
+                row -> 1L,
+                Long::sum,
+                LinkedHashMap::new));
+  }
+
   /**
    * Collect storage stats for a given fully-qualified table name.
    *
diff --git a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OperationsTest.java b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OperationsTest.java
index 7c9be6906..644128415 100644
--- a/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OperationsTest.java
+++ b/apps/spark/src/test/java/com/linkedin/openhouse/jobs/spark/OperationsTest.java
@@ -9,9 +9,10 @@
 import com.linkedin.openhouse.tables.client.model.Retention;
 import com.linkedin.openhouse.tablestest.OpenHouseSparkITest;
 import io.opentelemetry.api.metrics.Meter;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 import lombok.extern.slf4j.Slf4j;
@@ -566,6 +568,14 @@ public void testCollectTableStats() throws Exception {
             + stats.getNumExistingMetadataJsonFiles()
             + stats.getNumReferencedManifestFiles()
             + stats.getNumReferencedManifestLists());
+    // Every snapshot must land in exactly one day bucket. Comparing the bucket total against
+    // the snapshot count avoids assuming every snapshot was committed "today", which would be
+    // flaky for a test run that crosses a midnight boundary.
+    long snapshotCount =
+        StreamSupport.stream(table.snapshots().spliterator(), false).count();
+    long bucketedCount =
+        stats.getSnapshotCountByDay().values().stream().mapToLong(Long::longValue).sum();
+    Assertions.assertEquals(snapshotCount, bucketedCount);
   }
 }
diff --git a/services/common/src/main/java/com/linkedin/openhouse/common/stats/model/IcebergTableStats.java b/services/common/src/main/java/com/linkedin/openhouse/common/stats/model/IcebergTableStats.java
index 8ef896c1e..e76f6738a 100644
--- a/services/common/src/main/java/com/linkedin/openhouse/common/stats/model/IcebergTableStats.java
+++ b/services/common/src/main/java/com/linkedin/openhouse/common/stats/model/IcebergTableStats.java
@@ -1,5 +1,6 @@
 package com.linkedin.openhouse.common.stats.model;
 
+import java.util.Map;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
@@ -35,4 +36,9 @@ public class IcebergTableStats extends BaseTableMetadata {
 
   private Long numReferencedManifestFiles;
   private Long numReferencedManifestLists;
+
+  /**
+   * Snapshot count per commit day, keyed by "yyyy-MM-dd" in the stats collector's default zone.
+   */
+  private Map<String, Long> snapshotCountByDay;
 }