Add day-wise snapshot details
maluchari committed Jul 30, 2024
1 parent b65b434 commit c3c15c5
Showing 3 changed files with 49 additions and 0 deletions.
@@ -4,7 +4,12 @@

import com.linkedin.openhouse.common.stats.model.IcebergTableStats;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FileSystem;
@@ -114,16 +119,42 @@ protected static IcebergTableStats populateStatsForSnapshots(
.min(Long::compareTo)
.orElse(null);

Map<String, Long> snapshotCountByDay =
getSnapShotDistributionPerDay(table, spark, MetadataTableType.SNAPSHOTS);

return stats
.toBuilder()
.currentSnapshotId(currentSnapshotId)
.currentSnapshotTimestamp(currentSnapshotTimestamp)
.oldestSnapshotTimestamp(oldestSnapshotTimestamp)
.numCurrentSnapshotReferencedDataFiles(countOfDataFiles)
.totalCurrentSnapshotReferencedDataFilesSizeInBytes(sumOfFileSizeBytes)
.snapshotCountByDay(snapshotCountByDay)
.build();
}

/** Get snapshot distribution for a given table by date. */
private static Map<String, Long> getSnapShotDistributionPerDay(
Table table, SparkSession spark, MetadataTableType metadataTableType) {
Dataset<Row> snapShotDistribution =
SparkTableUtil.loadMetadataTable(spark, table, metadataTableType)
.selectExpr(new String[] {"snapshot_id", "committed_at"})
.dropDuplicates("snapshot_id", "committed_at");

Map<String, Long> snapshotCountByDay =
snapShotDistribution.collectAsList().stream()
.collect(
Collectors.toMap(
row -> {
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
return formatter.format(new Date(row.getTimestamp(1).getTime()));
},
row -> 1L,
Long::sum,
LinkedHashMap::new));
return snapshotCountByDay;
}

/**
* Collect storage stats for a given fully-qualified table name.
*
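For illustration, the per-day counting done in getSnapShotDistributionPerDay can be exercised on its own, independent of Spark and Iceberg. A minimal standalone sketch, using hypothetical commit timestamps in place of the "committed_at" column values read from the snapshots metadata table:

import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SnapshotCountDemo {
  public static void main(String[] args) {
    // Hypothetical commit timestamps in epoch millis; in the collector these come from
    // the "committed_at" column of the Iceberg snapshots metadata table.
    List<Long> committedAtMillis =
        Arrays.asList(1722297600000L, 1722301200000L, 1722384000000L);

    SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
    Map<String, Long> snapshotCountByDay =
        committedAtMillis.stream()
            .collect(
                Collectors.toMap(
                    millis -> formatter.format(new Date(millis)), // bucket by calendar day
                    millis -> 1L, // each snapshot contributes one
                    Long::sum, // snapshots on the same day are summed
                    LinkedHashMap::new)); // keep encounter order

    // Prints something like {2024-07-30=2, 2024-07-31=1}, depending on the JVM's default time zone.
    System.out.println(snapshotCountByDay);
  }
}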
@@ -9,9 +9,12 @@
import com.linkedin.openhouse.tables.client.model.Retention;
import com.linkedin.openhouse.tablestest.OpenHouseSparkITest;
import io.opentelemetry.api.metrics.Meter;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
@@ -566,6 +569,18 @@ public void testCollectTableStats() throws Exception {
+ stats.getNumExistingMetadataJsonFiles()
+ stats.getNumReferencedManifestFiles()
+ stats.getNumReferencedManifestLists());
AtomicLong snapShotCount = new AtomicLong();

table
.snapshots()
.forEach(
snapshot -> {
snapShotCount.getAndIncrement();
});

SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd");
Assertions.assertEquals(
stats.getSnapshotCountByDay().get(formatter.format(new Date())), snapShotCount.get());
}
}

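As an aside, the test's AtomicLong-plus-forEach loop is one way to count snapshots; the same count can be taken with a stream over the Iterable returned by the Iceberg API. A minimal sketch, assuming table is an org.apache.iceberg.Table:

// Count snapshots by streaming the Iterable<Snapshot> returned by table.snapshots().
long snapShotCount =
    java.util.stream.StreamSupport.stream(table.snapshots().spliterator(), false).count();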
@@ -1,5 +1,6 @@
package com.linkedin.openhouse.common.stats.model;

import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -35,4 +36,6 @@ public class IcebergTableStats extends BaseTableMetadata {
private Long numReferencedManifestFiles;

private Long numReferencedManifestLists;

private Map<String, Long> snapshotCountByDay;
}
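Because the new field is exposed through a Lombok-generated getter, downstream consumers of IcebergTableStats can read the per-day histogram directly. A minimal consumption sketch, assuming a stats instance produced by the collector and an slf4j logger named log:

// Log how many snapshots were committed on each day; the map may be null for stats
// objects produced before this field existed.
Map<String, Long> byDay = stats.getSnapshotCountByDay();
if (byDay != null) {
  byDay.forEach((day, count) -> log.info("{} snapshot(s) committed on {}", count, day));
}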
