diff --git a/extensions-contrib/druid-deltalake-extensions/pom.xml b/extensions-contrib/druid-deltalake-extensions/pom.xml index 053f6556573c..71a167c315ec 100644 --- a/extensions-contrib/druid-deltalake-extensions/pom.xml +++ b/extensions-contrib/druid-deltalake-extensions/pom.xml @@ -44,12 +44,17 @@ delta-kernel-api ${delta-kernel.version} - io.delta delta-kernel-defaults ${delta-kernel.version} + + io.delta + delta-storage + ${delta-kernel.version} + + org.apache.hadoop hadoop-client-api diff --git a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSource.java b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSource.java index 7d126caef349..01a18e9bc857 100644 --- a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSource.java +++ b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSource.java @@ -42,6 +42,7 @@ import io.delta.kernel.types.StructType; import io.delta.kernel.utils.CloseableIterator; import io.delta.kernel.utils.FileStatus; +import io.delta.storage.LogStore; import org.apache.druid.data.input.ColumnsFilter; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputRowSchema; @@ -151,7 +152,8 @@ public InputSourceReader reader( } } else { final Table table = Table.forPath(engine, tablePath); - final Snapshot latestSnapshot = table.getLatestSnapshot(engine); + final Snapshot latestSnapshot = getLatestSnapshotForTable(table, engine); + final StructType fullSnapshotSchema = latestSnapshot.getSchema(engine); final StructType prunedSchema = pruneSchema( fullSnapshotSchema, @@ -207,7 +209,7 @@ public Stream> createSplits(InputFormat inputFormat, @Nul final Snapshot latestSnapshot; final Table table = Table.forPath(engine, tablePath); try { - latestSnapshot = table.getLatestSnapshot(engine); + latestSnapshot = getLatestSnapshotForTable(table, engine); } catch (TableNotFoundException e) { throw InvalidInput.exception(e, "tablePath[%s] not found.", tablePath); @@ -331,6 +333,21 @@ private CloseableIterator getTransformedDataIterator( ); } + private Snapshot getLatestSnapshotForTable(final Table table, final Engine engine) + { + // Setting the LogStore class loader before calling the Delta Kernel snapshot API is required as a workaround with + // the 3.2.0 Delta Kernel because the Kernel library cannot instantiate the LogStore class otherwise. Please see + // https://github.com/delta-io/delta/issues/3299 for details. This workaround can be removed once the issue is fixed. + final ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(LogStore.class.getClassLoader()); + return table.getLatestSnapshot(engine); + } + finally { + Thread.currentThread().setContextClassLoader(currCtxCl); + } + } + @VisibleForTesting String getTablePath() {