Skip to content

Commit

Permalink
Add support for ingesting older iceberg snapshots (apache#15348)
Browse files Browse the repository at this point in the history
This patch introduces a param snapshotTime in the iceberg inputsource spec that allows the user to ingest data files associated with the most recent snapshot as of the given time. This helps the user ingest data based on older snapshots by specifying the associated snapshot time.
This patch also upgrades the iceberg core version to 1.4.1
  • Loading branch information
a2l007 authored Nov 17, 2023
1 parent 7b5790c commit a291478
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 33 deletions.
12 changes: 7 additions & 5 deletions docs/ingestion/input-sources.md
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,8 @@ The following is a sample spec for a HDFS warehouse source:
},
"warehouseSource": {
"type": "hdfs"
}
},
"snapshotTime": "2023-06-01T00:00:00.000Z",
},
"inputFormat": {
"type": "parquet"
Expand Down Expand Up @@ -937,10 +938,11 @@ The following is a sample spec for a S3 warehouse source:
|--------|-----------|---------|
|type|Set the value to `iceberg`.|yes|
|tableName|The Iceberg table name configured in the catalog.|yes|
|namespace|The Iceberg namespace associated with the table|yes|
|icebergFilter|The JSON Object that filters data files within a snapshot|no|
|icebergCatalog|The JSON Object used to define the catalog that manages the configured Iceberg table|yes|
|warehouseSource|The JSON Object that defines the native input source for reading the data files from the warehouse|yes|
|namespace|The Iceberg namespace associated with the table.|yes|
|icebergFilter|The JSON Object that filters data files within a snapshot.|no|
|icebergCatalog|The JSON Object used to define the catalog that manages the configured Iceberg table.|yes|
|warehouseSource|The JSON Object that defines the native input source for reading the data files from the warehouse.|yes|
|snapshotTime|Timestamp in ISO8601 DateTime format that will be used to fetch the most recent snapshot as of this time.|no|

###Catalog Object

Expand Down
2 changes: 1 addition & 1 deletion extensions-contrib/druid-iceberg-extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
<modelVersion>4.0.0</modelVersion>

<properties>
<iceberg.core.version>1.4.0</iceberg.core.version>
<iceberg.core.version>1.4.1</iceberg.core.version>
<hive.version>3.1.3</hive.version>
</properties>
<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.CloseableIterable;
import org.joda.time.DateTime;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -54,12 +55,15 @@ public abstract class IcebergCatalog
*
* @param tableNamespace The catalog namespace under which the table is defined
* @param tableName The iceberg table name
* @param icebergFilter The iceberg filter that needs to be applied before reading the files
* @param snapshotTime Datetime that will be used to fetch the most recent snapshot as of this time
* @return a list of data file paths
*/
public List<String> extractSnapshotDataFiles(
String tableNamespace,
String tableName,
IcebergFilter icebergFilter
IcebergFilter icebergFilter,
DateTime snapshotTime
)
{
Catalog catalog = retrieveCatalog();
Expand All @@ -85,7 +89,9 @@ public List<String> extractSnapshotDataFiles(
if (icebergFilter != null) {
tableScan = icebergFilter.filter(tableScan);
}

if (snapshotTime != null) {
tableScan = tableScan.asOfTime(snapshotTime.getMillis());
}
CloseableIterable<FileScanTask> tasks = tableScan.planFiles();
CloseableIterable.transform(tasks, FileScanTask::file)
.forEach(dataFile -> dataFilePaths.add(dataFile.path().toString()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.druid.iceberg.filter.IcebergFilter;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import java.io.File;
Expand Down Expand Up @@ -68,6 +69,9 @@ public class IcebergInputSource implements SplittableInputSource<List<String>>
@JsonProperty
private InputSourceFactory warehouseSource;

@JsonProperty
private final DateTime snapshotTime;

private boolean isLoaded = false;

private SplittableInputSource delegateInputSource;
Expand All @@ -78,14 +82,16 @@ public IcebergInputSource(
@JsonProperty("namespace") String namespace,
@JsonProperty("icebergFilter") @Nullable IcebergFilter icebergFilter,
@JsonProperty("icebergCatalog") IcebergCatalog icebergCatalog,
@JsonProperty("warehouseSource") InputSourceFactory warehouseSource
@JsonProperty("warehouseSource") InputSourceFactory warehouseSource,
@JsonProperty("snapshotTime") @Nullable DateTime snapshotTime
)
{
this.tableName = Preconditions.checkNotNull(tableName, "tableName cannot be null");
this.namespace = Preconditions.checkNotNull(namespace, "namespace cannot be null");
this.icebergCatalog = Preconditions.checkNotNull(icebergCatalog, "icebergCatalog cannot be null");
this.icebergFilter = icebergFilter;
this.warehouseSource = Preconditions.checkNotNull(warehouseSource, "warehouseSource cannot be null");
this.snapshotTime = snapshotTime;
}

@Override
Expand Down Expand Up @@ -164,6 +170,13 @@ public IcebergFilter getIcebergFilter()
return icebergFilter;
}

@Nullable
@JsonProperty
public DateTime getSnapshotTime()
{
return snapshotTime;
}

public SplittableInputSource getDelegateInputSource()
{
return delegateInputSource;
Expand All @@ -174,7 +187,8 @@ protected void retrieveIcebergDatafiles()
List<String> snapshotDataFiles = icebergCatalog.extractSnapshotDataFiles(
getNamespace(),
getTableName(),
getIcebergFilter()
getIcebergFilter(),
getSnapshotTime()
);
if (snapshotDataFiles.isEmpty()) {
delegateInputSource = new EmptyInputSource();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.data.input.impl.LocalInputSourceFactory;
import org.apache.druid.iceberg.filter.IcebergEqualsFilter;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Files;
Expand All @@ -43,7 +44,9 @@
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Types;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
Expand All @@ -62,32 +65,38 @@ public class IcebergInputSourceTest
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();

IcebergCatalog testCatalog;
private IcebergCatalog testCatalog;
private TableIdentifier tableIdentifier;

Schema tableSchema = new Schema(
private Schema tableSchema = new Schema(
Types.NestedField.required(1, "id", Types.StringType.get()),
Types.NestedField.required(2, "name", Types.StringType.get())
);
Map<String, Object> tableData = ImmutableMap.of("id", "123988", "name", "Foo");
private Map<String, Object> tableData = ImmutableMap.of("id", "123988", "name", "Foo");

private static final String NAMESPACE = "default";
private static final String TABLENAME = "foosTable";

@Test
public void testInputSource() throws IOException
@Before
public void setup() throws IOException
{
final File warehouseDir = FileUtils.createTempDir();
testCatalog = new LocalCatalog(warehouseDir.getPath(), new HashMap<>());
TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(NAMESPACE), TABLENAME);
tableIdentifier = TableIdentifier.of(Namespace.of(NAMESPACE), TABLENAME);

createAndLoadTable(tableIdentifier);
}

@Test
public void testInputSource() throws IOException
{
IcebergInputSource inputSource = new IcebergInputSource(
TABLENAME,
NAMESPACE,
null,
testCatalog,
new LocalInputSourceFactory()
new LocalInputSourceFactory(),
null
);
Stream<InputSplit<List<String>>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null));
List<File> localInputSourceList = splits.map(inputSource::withSplit)
Expand All @@ -111,45 +120,33 @@ public void testInputSource() throws IOException
Assert.assertEquals(tableData.get("id"), record.get(0));
Assert.assertEquals(tableData.get("name"), record.get(1));
}
dropTableFromCatalog(tableIdentifier);
}

@Test
public void testInputSourceWithEmptySource() throws IOException
{
final File warehouseDir = FileUtils.createTempDir();
testCatalog = new LocalCatalog(warehouseDir.getPath(), new HashMap<>());
TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(NAMESPACE), TABLENAME);

createAndLoadTable(tableIdentifier);

IcebergInputSource inputSource = new IcebergInputSource(
TABLENAME,
NAMESPACE,
new IcebergEqualsFilter("id", "0000"),
testCatalog,
new LocalInputSourceFactory()
new LocalInputSourceFactory(),
null
);
Stream<InputSplit<List<String>>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null));
Assert.assertEquals(0, splits.count());
dropTableFromCatalog(tableIdentifier);
}

@Test
public void testInputSourceWithFilter() throws IOException
{
final File warehouseDir = FileUtils.createTempDir();
testCatalog = new LocalCatalog(warehouseDir.getPath(), new HashMap<>());
TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(NAMESPACE), TABLENAME);

createAndLoadTable(tableIdentifier);

IcebergInputSource inputSource = new IcebergInputSource(
TABLENAME,
NAMESPACE,
new IcebergEqualsFilter("id", "123988"),
testCatalog,
new LocalInputSourceFactory()
new LocalInputSourceFactory(),
null
);
Stream<InputSplit<List<String>>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null));
List<File> localInputSourceList = splits.map(inputSource::withSplit)
Expand All @@ -173,6 +170,26 @@ public void testInputSourceWithFilter() throws IOException
Assert.assertEquals(tableData.get("id"), record.get(0));
Assert.assertEquals(tableData.get("name"), record.get(1));
}
}

@Test
public void testInputSourceReadFromLatestSnapshot() throws IOException
{
IcebergInputSource inputSource = new IcebergInputSource(
TABLENAME,
NAMESPACE,
null,
testCatalog,
new LocalInputSourceFactory(),
DateTimes.nowUtc()
);
Stream<InputSplit<List<String>>> splits = inputSource.createSplits(null, new MaxSizeSplitHintSpec(null, null));
Assert.assertEquals(1, splits.count());
}

@After
public void tearDown()
{
dropTableFromCatalog(tableIdentifier);
}

Expand Down
1 change: 1 addition & 0 deletions website/.spelling
Original file line number Diff line number Diff line change
Expand Up @@ -1281,6 +1281,7 @@ numShards
IngestSegment
IngestSegmentFirehose
maxSizes
snapshotTime
windowPeriod
2012-01-01T00
2012-01-03T00
Expand Down

0 comments on commit a291478

Please sign in to comment.