From a2914789d735b970bed32bc62cbf756661c103ad Mon Sep 17 00:00:00 2001
From: Atul Mohan
Date: Fri, 17 Nov 2023 12:32:28 +0530
Subject: [PATCH] Add support for ingesting older iceberg snapshots (#15348)

This patch introduces a snapshotTime parameter in the iceberg input
source spec that allows the user to ingest data files associated with
the most recent snapshot as of the given time. This lets users ingest
from older snapshots by specifying the associated snapshot time.

This patch also upgrades the iceberg core version to 1.4.1.
---
 docs/ingestion/input-sources.md               | 12 ++--
 .../druid-iceberg-extensions/pom.xml          |  2 +-
 .../druid/iceberg/input/IcebergCatalog.java   | 10 ++-
 .../iceberg/input/IcebergInputSource.java     | 18 +++++-
 .../iceberg/input/IcebergInputSourceTest.java | 63 ++++++++++++-------
 website/.spelling                             |  1 +
 6 files changed, 73 insertions(+), 33 deletions(-)
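
For context, an illustrative inputSource fragment showing where the new
parameter sits (this note region is ignored when the patch is applied;
the table, namespace, catalog, and warehouse values are placeholders,
and only snapshotTime is introduced by this patch):

  "inputSource": {
    "type": "iceberg",
    "tableName": "iceberg_table",
    "namespace": "iceberg_namespace",
    "icebergCatalog": { ... },
    "warehouseSource": { "type": "hdfs" },
    "snapshotTime": "2023-06-01T00:00:00.000Z"
  }

With snapshotTime set, ingestion reads the most recent snapshot committed
on or before that instant; omitting it keeps the current behavior of
reading the latest snapshot.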
diff --git a/docs/ingestion/input-sources.md b/docs/ingestion/input-sources.md
index e10b1c72474a..3e11734ee55f 100644
--- a/docs/ingestion/input-sources.md
+++ b/docs/ingestion/input-sources.md
@@ -858,7 +858,8 @@ The following is a sample spec for a HDFS warehouse source:
       },
       "warehouseSource": {
         "type": "hdfs"
-      }
+      },
+      "snapshotTime": "2023-06-01T00:00:00.000Z"
     },
     "inputFormat": {
       "type": "parquet"
@@ -937,10 +938,11 @@ The following is a sample spec for a S3 warehouse source:
 |--------|-----------|---------|
 |type|Set the value to `iceberg`.|yes|
 |tableName|The Iceberg table name configured in the catalog.|yes|
-|namespace|The Iceberg namespace associated with the table|yes|
-|icebergFilter|The JSON Object that filters data files within a snapshot|no|
-|icebergCatalog|The JSON Object used to define the catalog that manages the configured Iceberg table|yes|
-|warehouseSource|The JSON Object that defines the native input source for reading the data files from the warehouse|yes|
+|namespace|The Iceberg namespace associated with the table.|yes|
+|icebergFilter|The JSON Object that filters data files within a snapshot.|no|
+|icebergCatalog|The JSON Object used to define the catalog that manages the configured Iceberg table.|yes|
+|warehouseSource|The JSON Object that defines the native input source for reading the data files from the warehouse.|yes|
+|snapshotTime|Timestamp in ISO 8601 format. Selects the most recent snapshot as of this time; if omitted, the latest snapshot is read.|no|
 
 ### Catalog Object
 
diff --git a/extensions-contrib/druid-iceberg-extensions/pom.xml b/extensions-contrib/druid-iceberg-extensions/pom.xml
index 228ab952edbb..577d56978bc4 100644
--- a/extensions-contrib/druid-iceberg-extensions/pom.xml
+++ b/extensions-contrib/druid-iceberg-extensions/pom.xml
@@ -35,7 +35,7 @@
   <modelVersion>4.0.0</modelVersion>
 
   <properties>
-    <iceberg.core.version>1.4.0</iceberg.core.version>
+    <iceberg.core.version>1.4.1</iceberg.core.version>
     <hive.version>3.1.3</hive.version>
   </properties>
 
diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java
index e62ab830a6ad..07b41b6e10b1 100644
--- a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java
+++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java
@@ -32,6 +32,7 @@ import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.io.CloseableIterable;
+import org.joda.time.DateTime;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -54,12 +55,15 @@ public abstract class IcebergCatalog
    *
    * @param tableNamespace The catalog namespace under which the table is defined
    * @param tableName      The iceberg table name
+   * @param icebergFilter  The iceberg filter to apply before reading data files
+   * @param snapshotTime   Timestamp used to select the most recent snapshot as of this time
    * @return a list of data file paths
    */
   public List<String> extractSnapshotDataFiles(
       String tableNamespace,
       String tableName,
-      IcebergFilter icebergFilter
+      IcebergFilter icebergFilter,
+      DateTime snapshotTime
   )
   {
     Catalog catalog = retrieveCatalog();
@@ -85,7 +89,9 @@ public List<String> extractSnapshotDataFiles(
       if (icebergFilter != null) {
         tableScan = icebergFilter.filter(tableScan);
       }
-
+      if (snapshotTime != null) {
+        tableScan = tableScan.asOfTime(snapshotTime.getMillis());
+      }
       CloseableIterable<FileScanTask> tasks = tableScan.planFiles();
       CloseableIterable.transform(tasks, FileScanTask::file)
                        .forEach(dataFile -> dataFilePaths.add(dataFile.path().toString()));
diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java
index 1d7fc689b238..44df3e318611 100644
--- a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java
+++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java
@@ -36,6 +36,7 @@ import org.apache.druid.iceberg.filter.IcebergFilter;
 import org.apache.druid.java.util.common.CloseableIterators;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.joda.time.DateTime;
 
 import javax.annotation.Nullable;
 import java.io.File;
@@ -68,6 +69,9 @@ public class IcebergInputSource implements SplittableInputSource<List<String>>
   @JsonProperty
   private InputSourceFactory warehouseSource;
 
+  @JsonProperty
+  private final DateTime snapshotTime;
+
   private boolean isLoaded = false;
 
   private SplittableInputSource delegateInputSource;
@@ -78,7 +82,8 @@ public IcebergInputSource(
       @JsonProperty("namespace") String namespace,
       @JsonProperty("icebergFilter") @Nullable IcebergFilter icebergFilter,
       @JsonProperty("icebergCatalog") IcebergCatalog icebergCatalog,
-      @JsonProperty("warehouseSource") InputSourceFactory warehouseSource
+      @JsonProperty("warehouseSource") InputSourceFactory warehouseSource,
+      @JsonProperty("snapshotTime") @Nullable DateTime snapshotTime
   )
   {
     this.tableName = Preconditions.checkNotNull(tableName, "tableName cannot be null");
@@ -86,6 +91,7 @@ public IcebergInputSource(
     this.icebergCatalog = Preconditions.checkNotNull(icebergCatalog, "icebergCatalog cannot be null");
     this.icebergFilter = icebergFilter;
     this.warehouseSource = Preconditions.checkNotNull(warehouseSource, "warehouseSource cannot be null");
+    this.snapshotTime = snapshotTime;
   }
 
   @Override
@@ -164,6 +170,13 @@ public IcebergFilter getIcebergFilter()
     return icebergFilter;
   }
 
+  @Nullable
+  @JsonProperty
+  public DateTime getSnapshotTime()
+  {
+    return snapshotTime;
+  }
+
   public SplittableInputSource getDelegateInputSource()
   {
     return delegateInputSource;
@@ -174,7 +187,8 @@ protected void retrieveIcebergDatafiles()
     List<String> snapshotDataFiles = icebergCatalog.extractSnapshotDataFiles(
         getNamespace(),
         getTableName(),
-        getIcebergFilter()
+        getIcebergFilter(),
+        getSnapshotTime()
     );
     if (snapshotDataFiles.isEmpty()) {
       delegateInputSource = new EmptyInputSource();
diff --git a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java
index ec6d936d14bd..668a34b352b1 100644
--- a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java
+++ b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java
@@ -26,6 +26,7 @@ import org.apache.druid.data.input.impl.LocalInputSource;
 import org.apache.druid.data.input.impl.LocalInputSourceFactory;
 import org.apache.druid.iceberg.filter.IcebergEqualsFilter;
+import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.FileUtils;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.Files;
@@ -43,7 +44,9 @@ import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.types.Types;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -62,32 +65,38 @@ public class IcebergInputSourceTest
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
-  IcebergCatalog testCatalog;
+  private IcebergCatalog testCatalog;
+  private TableIdentifier tableIdentifier;
 
-  Schema tableSchema = new Schema(
+  private Schema tableSchema = new Schema(
       Types.NestedField.required(1, "id", Types.StringType.get()),
       Types.NestedField.required(2, "name", Types.StringType.get())
   );
-  Map<String, Object> tableData = ImmutableMap.of("id", "123988", "name", "Foo");
+  private Map<String, Object> tableData = ImmutableMap.of("id", "123988", "name", "Foo");
 
   private static final String NAMESPACE = "default";
   private static final String TABLENAME = "foosTable";
 
-  @Test
-  public void testInputSource() throws IOException
+  @Before
+  public void setup() throws IOException
   {
     final File warehouseDir = FileUtils.createTempDir();
     testCatalog = new LocalCatalog(warehouseDir.getPath(), new HashMap<>());
-    TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(NAMESPACE), TABLENAME);
+    tableIdentifier = TableIdentifier.of(Namespace.of(NAMESPACE), TABLENAME);
 
     createAndLoadTable(tableIdentifier);
+  }
 
+  @Test
+  public void testInputSource() throws IOException
+  {
     IcebergInputSource inputSource = new IcebergInputSource(
         TABLENAME,
         NAMESPACE,
         null,
         testCatalog,
-        new LocalInputSourceFactory()
+        new LocalInputSourceFactory(),
+        null
     );
     Stream<InputSplit<List<String>>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null));
     List<File> localInputSourceList = splits.map(inputSource::withSplit)
@@ -111,45 +120,33 @@ public void testInputSource() throws IOException
       Assert.assertEquals(tableData.get("id"), record.get(0));
       Assert.assertEquals(tableData.get("name"), record.get(1));
     }
-    dropTableFromCatalog(tableIdentifier);
   }
 
   @Test
   public void testInputSourceWithEmptySource() throws IOException
   {
-    final File warehouseDir = FileUtils.createTempDir();
-    testCatalog = new LocalCatalog(warehouseDir.getPath(), new HashMap<>());
-    TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(NAMESPACE), TABLENAME);
-
-    createAndLoadTable(tableIdentifier);
-
     IcebergInputSource inputSource = new IcebergInputSource(
         TABLENAME,
         NAMESPACE,
         new IcebergEqualsFilter("id", "0000"),
         testCatalog,
-        new LocalInputSourceFactory()
+        new LocalInputSourceFactory(),
+        null
     );
     Stream<InputSplit<List<String>>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null));
     Assert.assertEquals(0, splits.count());
-    dropTableFromCatalog(tableIdentifier);
   }
 
   @Test
   public void testInputSourceWithFilter() throws IOException
   {
-    final File warehouseDir = FileUtils.createTempDir();
-    testCatalog = new LocalCatalog(warehouseDir.getPath(), new HashMap<>());
-    TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(NAMESPACE), TABLENAME);
-
-    createAndLoadTable(tableIdentifier);
-
     IcebergInputSource inputSource = new IcebergInputSource(
         TABLENAME,
         NAMESPACE,
         new IcebergEqualsFilter("id", "123988"),
         testCatalog,
-        new LocalInputSourceFactory()
+        new LocalInputSourceFactory(),
+        null
     );
     Stream<InputSplit<List<String>>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null));
     List<File> localInputSourceList = splits.map(inputSource::withSplit)
@@ -173,6 +170,26 @@ public void testInputSourceWithFilter() throws IOException
       Assert.assertEquals(tableData.get("id"), record.get(0));
       Assert.assertEquals(tableData.get("name"), record.get(1));
     }
+  }
+
+  @Test
+  public void testInputSourceReadFromLatestSnapshot() throws IOException
+  {
+    IcebergInputSource inputSource = new IcebergInputSource(
+        TABLENAME,
+        NAMESPACE,
+        null,
+        testCatalog,
+        new LocalInputSourceFactory(),
+        DateTimes.nowUtc()
+    );
+    Stream<InputSplit<List<String>>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null));
+    Assert.assertEquals(1, splits.count());
+  }
+
+  @After
+  public void tearDown()
+  {
     dropTableFromCatalog(tableIdentifier);
   }
 
diff --git a/website/.spelling b/website/.spelling
index ec714615199b..db7e9961f29e 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1281,6 +1281,7 @@ numShards
 IngestSegment
 IngestSegmentFirehose
 maxSizes
+snapshotTime
 windowPeriod
 2012-01-01T00
 2012-01-03T00
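-- 
A minimal standalone sketch of the snapshot selection that snapshotTime
drives. TableScan.asOfTime() and planFiles() are the Iceberg 1.4.x calls
this patch uses; the class and helper names below are illustrative, and
table loading/catalog wiring is elided:

import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableIterable;
import org.joda.time.DateTime;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

class SnapshotTimeSketch
{
  // Mirrors IcebergCatalog#extractSnapshotDataFiles: plan the file scan
  // against the most recent snapshot as of snapshotTime, or against the
  // latest snapshot when snapshotTime is null.
  static List<String> dataFilePathsAsOf(Table table, DateTime snapshotTime)
  {
    TableScan tableScan = table.newScan();
    if (snapshotTime != null) {
      // Iceberg time travel: use the last snapshot whose commit timestamp
      // is <= the given instant
      tableScan = tableScan.asOfTime(snapshotTime.getMillis());
    }
    List<String> dataFilePaths = new ArrayList<>();
    try (CloseableIterable<FileScanTask> tasks = tableScan.planFiles()) {
      tasks.forEach(task -> dataFilePaths.add(task.file().path().toString()));
    }
    catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return dataFilePaths;
  }
}

Accepting a timestamp rather than a snapshot ID keeps the spec portable:
the same ingestion spec can be replayed against different tables without
first looking up each table's snapshot IDs.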