Support for reading Delta Lake table snapshots (apache#17004)
Problem
Currently, the Delta input source can only read the latest snapshot of a given Delta Lake table. This is a known, documented limitation.

Description
Add support for reading a specific Delta snapshot. By default, the Druid-Delta connector still reads the latest snapshot of the Delta table, which preserves compatibility. Users can specify a snapshotVersion to ingest change data events from Delta tables into Druid.

In the future, we can also add support for time-based snapshot reads. The Delta API for reading time-based snapshots is currently unclear.
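
The selection rule described above (an explicit snapshotVersion wins, otherwise the latest snapshot is read) can be illustrated with a minimal, self-contained sketch. The class `VersionedTableSketch` and its methods are hypothetical stand-ins and are not part of the Delta Kernel or Druid APIs; the `IllegalArgumentException` only stands in for whatever error the Kernel raises for a missing version.

```java
import java.util.List;
import java.util.TreeMap;

// Hypothetical in-memory model of a versioned table; NOT the Delta Kernel API.
final class VersionedTableSketch
{
  // Maps each committed version to the rows visible at that version.
  private final TreeMap<Long, List<String>> snapshots = new TreeMap<>();

  void commit(long version, List<String> rows)
  {
    snapshots.put(version, List.copyOf(rows));
  }

  // A null snapshotVersion means "read the latest snapshot", mirroring the default behavior.
  List<String> read(Long snapshotVersion)
  {
    if (snapshots.isEmpty()) {
      throw new IllegalStateException("Table has no snapshots");
    }
    if (snapshotVersion == null) {
      return snapshots.lastEntry().getValue();
    }
    final List<String> rows = snapshots.get(snapshotVersion);
    if (rows == null) {
      // Stand-in for the error raised when the requested version does not exist.
      throw new IllegalArgumentException("Snapshot version " + snapshotVersion + " does not exist");
    }
    return rows;
  }

  public static void main(String[] args)
  {
    final VersionedTableSketch table = new VersionedTableSketch();
    table.commit(0L, List.of("a"));
    table.commit(1L, List.of("a", "b"));
    System.out.println(table.read(0L));   // rows as of version 0
    System.out.println(table.read(null)); // rows as of the latest version
  }
}
```

Keeping the version as a nullable `Long` rather than a primitive lets "not specified" be distinguished from version `0`, which is a valid snapshot version.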
abhishekrb19 authored Sep 9, 2024
1 parent 51fe3c0 commit aa833a7
Showing 31 changed files with 363 additions and 56 deletions.
7 changes: 1 addition & 6 deletions docs/development/extensions-contrib/delta-lake.md
Expand Up @@ -51,9 +51,4 @@ java \
-c "org.apache.druid.extensions.contrib:druid-deltalake-extensions:<VERSION>"
```

See [Loading community extensions](../../configuration/extensions.md#loading-community-extensions) for more information.

## Known limitations

This extension relies on the Delta Kernel API and can only read from the latest Delta table snapshot. Ability to read from
arbitrary snapshots is tracked [here](https://github.com/delta-io/delta/issues/2581).
See [Loading community extensions](../../configuration/extensions.md#loading-community-extensions) for more information.
19 changes: 11 additions & 8 deletions docs/ingestion/input-sources.md
Expand Up @@ -1147,11 +1147,12 @@ To use the Delta Lake input source, load the extension [`druid-deltalake-extensi
You can use the Delta input source to read data stored in a Delta Lake table. For a given table, the input source scans
the latest snapshot from the configured table. Druid ingests the underlying delta files from the table.

| Property|Description|Required|
|---------|-----------|--------|
| type|Set this value to `delta`.|yes|
| tablePath|The location of the Delta table.|yes|
| filter|The JSON Object that filters data files within a snapshot.|no|
|Property|Description|Default|Required|
|---------|-----------|--------|---------|
|type|Set this value to `delta`.|None|yes|
|tablePath|The location of the Delta table.|None|yes|
|filter|The JSON Object that filters data files within a snapshot.|None|no|
|snapshotVersion|The snapshot version to read from the Delta table. An integer value must be specified.|Latest|no|

### Delta filter object

Expand Down Expand Up @@ -1224,7 +1225,7 @@ filters on partitioned columns.
| column | The table column to apply the filter on. | yes |
| value | The value to use in the filter. | yes |

The following is a sample spec to read all records from the Delta table `/delta-table/foo`:
The following is a sample spec to read all records from the latest snapshot from Delta table `/delta-table/foo`:

```json
...
Expand All @@ -1237,7 +1238,8 @@ The following is a sample spec to read all records from the Delta table `/delta-
}
```

The following is a sample spec to read records from the Delta table `/delta-table/foo` to select records where `name = 'Employee4' and age >= 30`:
The following is a sample spec to read records from the Delta table `/delta-table/foo` snapshot version `3` to select records where
`name = 'Employee4' and age >= 30`:

```json
...
Expand All @@ -1260,7 +1262,8 @@ The following is a sample spec to read records from the Delta table `/delta-tabl
"value": "30"
}
]
}
},
"snapshotVersion": 3
},
}
```
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@
import java.util.stream.Stream;

/**
* Input source to ingest data from a Delta Lake. This input source reads the latest snapshot from a Delta table
* specified by {@code tablePath} parameter. If {@code filter} is specified, it's used at the Kernel level
* for data pruning. The filtering behavior is as follows:
* Input source to ingest data from a Delta Lake. This input source reads the given {@code snapshotVersion} from a Delta
* table specified by {@code tablePath} parameter, or the latest snapshot if it's not specified.
* If {@code filter} is specified, it's used at the Kernel level for data pruning. The filtering behavior is as follows:
* <ul>
* <li> When a filter is applied on a partitioned table using the partitioning columns, the filtering is guaranteed. </li>
* <li> When a filter is applied on non-partitioned columns, the filtering is best-effort as the Delta
Expand All @@ -79,7 +79,6 @@
* <p>
* We leverage the Delta Kernel APIs to interact with a Delta table. The Kernel API abstracts away the
* complexities of the Delta protocol itself.
* Note: currently, the Kernel table API only supports reading from the latest snapshot.
* </p>
*/
public class DeltaInputSource implements SplittableInputSource<DeltaSplit>
Expand All @@ -97,11 +96,15 @@ public class DeltaInputSource implements SplittableInputSource<DeltaSplit>
@Nullable
private final DeltaFilter filter;

@JsonProperty
private final Long snapshotVersion;

@JsonCreator
public DeltaInputSource(
@JsonProperty("tablePath") final String tablePath,
@JsonProperty("deltaSplit") @Nullable final DeltaSplit deltaSplit,
@JsonProperty("filter") @Nullable final DeltaFilter filter
@JsonProperty("filter") @Nullable final DeltaFilter filter,
@JsonProperty("snapshotVersion") @Nullable final Long snapshotVersion
)
{
if (tablePath == null) {
Expand All @@ -110,6 +113,7 @@ public DeltaInputSource(
this.tablePath = tablePath;
this.deltaSplit = deltaSplit;
this.filter = filter;
this.snapshotVersion = snapshotVersion;
}

@Override
Expand Down Expand Up @@ -152,15 +156,15 @@ public InputSourceReader reader(
}
} else {
final Table table = Table.forPath(engine, tablePath);
final Snapshot latestSnapshot = getLatestSnapshotForTable(table, engine);
final Snapshot snapshot = getSnapshotForTable(table, engine);

final StructType fullSnapshotSchema = latestSnapshot.getSchema(engine);
final StructType fullSnapshotSchema = snapshot.getSchema(engine);
final StructType prunedSchema = pruneSchema(
fullSnapshotSchema,
inputRowSchema.getColumnsFilter()
);

final ScanBuilder scanBuilder = latestSnapshot.getScanBuilder(engine);
final ScanBuilder scanBuilder = snapshot.getScanBuilder(engine);
if (filter != null) {
scanBuilder.withFilter(engine, filter.getFilterPredicate(fullSnapshotSchema));
}
Expand Down Expand Up @@ -206,17 +210,17 @@ public Stream<InputSplit<DeltaSplit>> createSplits(InputFormat inputFormat, @Nul
}

final Engine engine = createDeltaEngine();
final Snapshot latestSnapshot;
final Snapshot snapshot;
final Table table = Table.forPath(engine, tablePath);
try {
latestSnapshot = getLatestSnapshotForTable(table, engine);
snapshot = getSnapshotForTable(table, engine);
}
catch (TableNotFoundException e) {
throw InvalidInput.exception(e, "tablePath[%s] not found.", tablePath);
}
final StructType fullSnapshotSchema = latestSnapshot.getSchema(engine);
final StructType fullSnapshotSchema = snapshot.getSchema(engine);

final ScanBuilder scanBuilder = latestSnapshot.getScanBuilder(engine);
final ScanBuilder scanBuilder = snapshot.getScanBuilder(engine);
if (filter != null) {
scanBuilder.withFilter(engine, filter.getFilterPredicate(fullSnapshotSchema));
}
Expand Down Expand Up @@ -254,7 +258,8 @@ public InputSource withSplit(InputSplit<DeltaSplit> split)
return new DeltaInputSource(
tablePath,
split.get(),
filter
filter,
snapshotVersion
);
}

Expand Down Expand Up @@ -333,15 +338,19 @@ private CloseableIterator<FilteredColumnarBatch> getTransformedDataIterator(
);
}

private Snapshot getLatestSnapshotForTable(final Table table, final Engine engine)
private Snapshot getSnapshotForTable(final Table table, final Engine engine)
{
// Setting the LogStore class loader before calling the Delta Kernel snapshot API is required as a workaround with
// the 3.2.0 Delta Kernel because the Kernel library cannot instantiate the LogStore class otherwise. Please see
// https://github.com/delta-io/delta/issues/3299 for details. This workaround can be removed once the issue is fixed.
final ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(LogStore.class.getClassLoader());
return table.getLatestSnapshot(engine);
if (snapshotVersion != null) {
return table.getSnapshotAsOfVersion(engine, snapshotVersion);
} else {
return table.getLatestSnapshot(engine);
}
}
finally {
Thread.currentThread().setContextClassLoader(currCtxCl);
Expand All @@ -359,4 +368,10 @@ DeltaFilter getFilter()
{
return filter;
}

@VisibleForTesting
Long getSnapshotVersion()
{
return snapshotVersion;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ public static Collection<Object[]> data()
Object[][] data = new Object[][]{
{NonPartitionedDeltaTable.DELTA_TABLE_PATH, NonPartitionedDeltaTable.FULL_SCHEMA, NonPartitionedDeltaTable.DIMENSIONS, NonPartitionedDeltaTable.EXPECTED_ROWS},
{PartitionedDeltaTable.DELTA_TABLE_PATH, PartitionedDeltaTable.FULL_SCHEMA, PartitionedDeltaTable.DIMENSIONS, PartitionedDeltaTable.EXPECTED_ROWS},
{ComplexTypesDeltaTable.DELTA_TABLE_PATH, ComplexTypesDeltaTable.FULL_SCHEMA, ComplexTypesDeltaTable.DIMENSIONS, ComplexTypesDeltaTable.EXPECTED_ROWS}
{ComplexTypesDeltaTable.DELTA_TABLE_PATH, ComplexTypesDeltaTable.FULL_SCHEMA, ComplexTypesDeltaTable.DIMENSIONS, ComplexTypesDeltaTable.EXPECTED_ROWS},
{SnapshotDeltaTable.DELTA_TABLE_PATH, SnapshotDeltaTable.FULL_SCHEMA, SnapshotDeltaTable.DIMENSIONS, SnapshotDeltaTable.LATEST_SNAPSHOT_EXPECTED_ROWS}
};
return Arrays.asList(data);
}
Expand Down Expand Up @@ -124,7 +125,7 @@ public void testDeltaInputRow(
@ParameterizedTest(name = "{index}:with context {0}")
public void testReadNonExistentTable()
{
final DeltaInputSource deltaInputSource = new DeltaInputSource("non-existent-table", null, null);
final DeltaInputSource deltaInputSource = new DeltaInputSource("non-existent-table", null, null, null);

MatcherAssert.assertThat(
Assert.assertThrows(
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,18 @@ public void testDeltaInputSourceDeserializationWithNoFilterColumn()
exception.getCause().getMessage()
);
}

@Test
public void testDeltaInputSourceWithSnapshotVersion() throws JsonProcessingException
{
final String payload = "{\n"
+ " \"type\": \"delta\",\n"
+ " \"tablePath\": \"foo/bar\",\n"
+ " \"snapshotVersion\": 56\n"
+ " }";

final DeltaInputSource deltaInputSource = OBJECT_MAPPER.readValue(payload, DeltaInputSource.class);
Assert.assertEquals("foo/bar", deltaInputSource.getTablePath());
Assert.assertEquals((Long) 56L, deltaInputSource.getSnapshotVersion());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.delta.input;

import io.delta.kernel.exceptions.KernelException;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowListPlusRawValues;
import org.apache.druid.data.input.InputRowSchema;
Expand Down Expand Up @@ -68,27 +69,62 @@ public static Object[][] data()
{
NonPartitionedDeltaTable.DELTA_TABLE_PATH,
NonPartitionedDeltaTable.FULL_SCHEMA,
null,
NonPartitionedDeltaTable.EXPECTED_ROWS
},
{
NonPartitionedDeltaTable.DELTA_TABLE_PATH,
NonPartitionedDeltaTable.SCHEMA_1,
null,
NonPartitionedDeltaTable.EXPECTED_ROWS
},
{
NonPartitionedDeltaTable.DELTA_TABLE_PATH,
NonPartitionedDeltaTable.SCHEMA_2,
null,
NonPartitionedDeltaTable.EXPECTED_ROWS
},
{
PartitionedDeltaTable.DELTA_TABLE_PATH,
PartitionedDeltaTable.FULL_SCHEMA,
null,
PartitionedDeltaTable.EXPECTED_ROWS
},
{
ComplexTypesDeltaTable.DELTA_TABLE_PATH,
ComplexTypesDeltaTable.FULL_SCHEMA,
null,
ComplexTypesDeltaTable.EXPECTED_ROWS
},
{
SnapshotDeltaTable.DELTA_TABLE_PATH,
SnapshotDeltaTable.FULL_SCHEMA,
0L,
SnapshotDeltaTable.V0_SNAPSHOT_EXPECTED_ROWS
},
{
SnapshotDeltaTable.DELTA_TABLE_PATH,
SnapshotDeltaTable.FULL_SCHEMA,
1L,
SnapshotDeltaTable.V1_SNAPSHOT_EXPECTED_ROWS
},
{
SnapshotDeltaTable.DELTA_TABLE_PATH,
SnapshotDeltaTable.FULL_SCHEMA,
2L,
SnapshotDeltaTable.V2_SNAPSHOT_EXPECTED_ROWS
},
{
SnapshotDeltaTable.DELTA_TABLE_PATH,
SnapshotDeltaTable.FULL_SCHEMA,
3L,
SnapshotDeltaTable.LATEST_SNAPSHOT_EXPECTED_ROWS
},
{
SnapshotDeltaTable.DELTA_TABLE_PATH,
SnapshotDeltaTable.FULL_SCHEMA,
null,
SnapshotDeltaTable.LATEST_SNAPSHOT_EXPECTED_ROWS
}
};
}
Expand All @@ -98,12 +134,14 @@ public static Object[][] data()
@Parameterized.Parameter(1)
public InputRowSchema schema;
@Parameterized.Parameter(2)
public Long snapshotVersion;
@Parameterized.Parameter(3)
public List<Map<String, Object>> expectedRows;

@Test
public void testSampleDeltaTable() throws IOException
{
final DeltaInputSource deltaInputSource = new DeltaInputSource(deltaTablePath, null, null);
final DeltaInputSource deltaInputSource = new DeltaInputSource(deltaTablePath, null, null, snapshotVersion);
final InputSourceReader inputSourceReader = deltaInputSource.reader(schema, null, null);

List<InputRowListPlusRawValues> actualSampledRows = sampleAllRows(inputSourceReader);
Expand Down Expand Up @@ -137,7 +175,7 @@ public void testSampleDeltaTable() throws IOException
@Test
public void testReadDeltaTable() throws IOException
{
final DeltaInputSource deltaInputSource = new DeltaInputSource(deltaTablePath, null, null);
final DeltaInputSource deltaInputSource = new DeltaInputSource(deltaTablePath, null, null, snapshotVersion);
final InputSourceReader inputSourceReader = deltaInputSource.reader(schema, null, null);
final List<InputRow> actualReadRows = readAllRows(inputSourceReader);
validateRows(expectedRows, actualReadRows, schema);
Expand Down Expand Up @@ -269,7 +307,7 @@ public static Object[][] data()
@Test
public void testSampleDeltaTable() throws IOException
{
final DeltaInputSource deltaInputSource = new DeltaInputSource(deltaTablePath, null, filter);
final DeltaInputSource deltaInputSource = new DeltaInputSource(deltaTablePath, null, filter, null);
final InputSourceReader inputSourceReader = deltaInputSource.reader(schema, null, null);

List<InputRowListPlusRawValues> actualSampledRows = sampleAllRows(inputSourceReader);
Expand Down Expand Up @@ -311,7 +349,7 @@ private static List<Map<String, Object>> filterExpectedRows(
@Test
public void testReadDeltaTable() throws IOException
{
final DeltaInputSource deltaInputSource = new DeltaInputSource(deltaTablePath, null, filter);
final DeltaInputSource deltaInputSource = new DeltaInputSource(deltaTablePath, null, filter, null);
final InputSourceReader inputSourceReader = deltaInputSource.reader(schema, null, null);
final List<InputRow> actualReadRows = readAllRows(inputSourceReader);
validateRows(expectedRows, actualReadRows, schema);
Expand All @@ -326,7 +364,7 @@ public void testNullTable()
MatcherAssert.assertThat(
Assert.assertThrows(
DruidException.class,
() -> new DeltaInputSource(null, null, null)
() -> new DeltaInputSource(null, null, null, null)
),
DruidExceptionMatcher.invalidInput().expectMessageIs(
"tablePath cannot be null."
Expand All @@ -337,7 +375,7 @@ public void testNullTable()
@Test
public void testSplitNonExistentTable()
{
final DeltaInputSource deltaInputSource = new DeltaInputSource("non-existent-table", null, null);
final DeltaInputSource deltaInputSource = new DeltaInputSource("non-existent-table", null, null, null);

MatcherAssert.assertThat(
Assert.assertThrows(
Expand All @@ -353,7 +391,7 @@ public void testSplitNonExistentTable()
@Test
public void testReadNonExistentTable()
{
final DeltaInputSource deltaInputSource = new DeltaInputSource("non-existent-table", null, null);
final DeltaInputSource deltaInputSource = new DeltaInputSource("non-existent-table", null, null, null);

MatcherAssert.assertThat(
Assert.assertThrows(
Expand All @@ -365,6 +403,22 @@ public void testReadNonExistentTable()
)
);
}

@Test
public void testReadNonExistentSnapshot()
{
final DeltaInputSource deltaInputSource = new DeltaInputSource(
SnapshotDeltaTable.DELTA_TABLE_PATH,
null,
null,
100L
);

Assert.assertThrows(
KernelException.class,
() -> deltaInputSource.reader(null, null, null)
);
}
}

private static List<InputRowListPlusRawValues> sampleAllRows(InputSourceReader reader) throws IOException