Add missing delta-storage dependency and class loader workaround to Delta table ingestion (#16648)

* Workaround for ingesting from Delta tables with Kernel 3.2.0.

With the upgrade to Kernel 3.2.0, the Druid Delta connector extension
isn't able to ingest from Delta tables successfully.

Please see delta-io/delta#3299

The underlying problem seems to be coming from
https://github.com/delta-io/delta/blob/master/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/logstore/LogStoreProvider.java#L99

This patch works around the problem by setting the thread context class loader explicitly.
The Kernel community may consider a fix in the next release, as the issue has affected another
connector as well.

* Address review comment: restore the original context class loader (CL) after the thread CL is set.
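
The workaround described above follows a general swap-and-restore pattern for the thread context class loader. The sketch below is a minimal, self-contained illustration and not the connector's code: `ContextClassLoaderSketch`, `withContextClassLoader`, and `canLoadViaContext` are hypothetical names, and `canLoadViaContext` only stands in for a library that resolves a class by name through the context class loader, which is assumed (not confirmed here) to be what the Kernel's LogStore lookup does.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Supplier;

class ContextClassLoaderSketch
{
  // Hypothetical helper: runs `action` with `loader` installed as the thread context
  // class loader, then restores the previously installed loader even if `action` throws.
  static <T> T withContextClassLoader(final ClassLoader loader, final Supplier<T> action)
  {
    final Thread thread = Thread.currentThread();
    final ClassLoader previous = thread.getContextClassLoader();
    try {
      thread.setContextClassLoader(loader);
      return action.get();
    }
    finally {
      thread.setContextClassLoader(previous);
    }
  }

  // Stand-in (assumption) for a library call that looks a class up by name
  // through the current thread's context class loader.
  static boolean canLoadViaContext(final String className)
  {
    try {
      Class.forName(className, false, Thread.currentThread().getContextClassLoader());
      return true;
    }
    catch (ClassNotFoundException e) {
      return false;
    }
  }

  public static void main(String[] args)
  {
    final String name = ContextClassLoaderSketch.class.getName();
    // A loader with no URLs and no parent delegates only to the bootstrap loader,
    // so it cannot see application classes; this mimics a mismatched context loader.
    final ClassLoader isolated = new URLClassLoader(new URL[0], null);
    final ClassLoader own = ContextClassLoaderSketch.class.getClassLoader();
    final ClassLoader before = Thread.currentThread().getContextClassLoader();

    final boolean viaIsolated = withContextClassLoader(isolated, () -> canLoadViaContext(name));
    final boolean viaOwn = withContextClassLoader(own, () -> canLoadViaContext(name));
    final boolean restored = Thread.currentThread().getContextClassLoader() == before;

    System.out.println("lookup via isolated loader succeeded: " + viaIsolated);
    System.out.println("lookup via the class's own loader succeeded: " + viaOwn);
    System.out.println("original context loader restored: " + restored);
  }
}
```

Restoring the previous loader in `finally` keeps the swap scoped to the single call, so other code running later on the same thread does not see a changed context class loader.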
abhishekrb19 authored Jun 25, 2024
1 parent b43f406 commit e01f155
Showing 2 changed files with 25 additions and 3 deletions.
7 changes: 6 additions & 1 deletion extensions-contrib/druid-deltalake-extensions/pom.xml
@@ -44,12 +44,17 @@
<artifactId>delta-kernel-api</artifactId>
<version>${delta-kernel.version}</version>
</dependency>

<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-kernel-defaults</artifactId>
<version>${delta-kernel.version}</version>
</dependency>
+<dependency>
+<groupId>io.delta</groupId>
+<artifactId>delta-storage</artifactId>
+<version>${delta-kernel.version}</version>
+</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-api</artifactId>
@@ -42,6 +42,7 @@
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.FileStatus;
+import io.delta.storage.LogStore;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;
@@ -151,7 +152,8 @@ public InputSourceReader reader(
}
} else {
final Table table = Table.forPath(engine, tablePath);
-final Snapshot latestSnapshot = table.getLatestSnapshot(engine);
+final Snapshot latestSnapshot = getLatestSnapshotForTable(table, engine);

final StructType fullSnapshotSchema = latestSnapshot.getSchema(engine);
final StructType prunedSchema = pruneSchema(
fullSnapshotSchema,
@@ -207,7 +209,7 @@ public Stream<InputSplit<DeltaSplit>> createSplits(InputFormat inputFormat, @Nul
final Snapshot latestSnapshot;
final Table table = Table.forPath(engine, tablePath);
try {
-latestSnapshot = table.getLatestSnapshot(engine);
+latestSnapshot = getLatestSnapshotForTable(table, engine);
}
catch (TableNotFoundException e) {
throw InvalidInput.exception(e, "tablePath[%s] not found.", tablePath);
@@ -331,6 +333,21 @@ private CloseableIterator<FilteredColumnarBatch> getTransformedDataIterator(
);
}

+private Snapshot getLatestSnapshotForTable(final Table table, final Engine engine)
+{
+  // Setting the LogStore class loader before calling the Delta Kernel snapshot API is required as a workaround with
+  // the 3.2.0 Delta Kernel because the Kernel library cannot instantiate the LogStore class otherwise. Please see
+  // https://github.com/delta-io/delta/issues/3299 for details. This workaround can be removed once the issue is fixed.
+  final ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
+  try {
+    Thread.currentThread().setContextClassLoader(LogStore.class.getClassLoader());
+    return table.getLatestSnapshot(engine);
+  }
+  finally {
+    Thread.currentThread().setContextClassLoader(currCtxCl);
+  }
+}

@VisibleForTesting
String getTablePath()
{