From e01f1552098070616b1f8c695d5d42c9733d0727 Mon Sep 17 00:00:00 2001 From: Abhishek Radhakrishnan Date: Tue, 25 Jun 2024 09:16:13 -0700 Subject: [PATCH] Add missing `delta-storage` dependency and class loader workaround to Delta table ingestion (#16648) * Workaround for ingesting from Delta table in 3.2.0. With the upgrade to Kernel 3.2.0, the Druid Delta connector extension isn't able to ingest from Delta tables successfully. Please see https://github.com/delta-io/delta/issues/3299 The underlying problem seems to be coming from https://github.com/delta-io/delta/blob/master/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/logstore/LogStoreProvider.java#L99 This patch is a workaround that sets the thread context class loader explicitly. The Kernel community may consider a fix in the next release as it has affected another connector as well. * Address review comment: clear the CL after the Thread CL is set. --- .../druid-deltalake-extensions/pom.xml | 7 ++++++- .../druid/delta/input/DeltaInputSource.java | 21 +++++++++++++++++-- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/extensions-contrib/druid-deltalake-extensions/pom.xml b/extensions-contrib/druid-deltalake-extensions/pom.xml index 053f6556573c..71a167c315ec 100644 --- a/extensions-contrib/druid-deltalake-extensions/pom.xml +++ b/extensions-contrib/druid-deltalake-extensions/pom.xml @@ -44,12 +44,17 @@ delta-kernel-api ${delta-kernel.version} - io.delta delta-kernel-defaults ${delta-kernel.version} + + io.delta + delta-storage + ${delta-kernel.version} + + org.apache.hadoop hadoop-client-api diff --git a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSource.java b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSource.java index 7d126caef349..01a18e9bc857 100644 --- a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSource.java +++ 
b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSource.java @@ -42,6 +42,7 @@ import io.delta.kernel.types.StructType; import io.delta.kernel.utils.CloseableIterator; import io.delta.kernel.utils.FileStatus; +import io.delta.storage.LogStore; import org.apache.druid.data.input.ColumnsFilter; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputRowSchema; @@ -151,7 +152,8 @@ public InputSourceReader reader( } } else { final Table table = Table.forPath(engine, tablePath); - final Snapshot latestSnapshot = table.getLatestSnapshot(engine); + final Snapshot latestSnapshot = getLatestSnapshotForTable(table, engine); + final StructType fullSnapshotSchema = latestSnapshot.getSchema(engine); final StructType prunedSchema = pruneSchema( fullSnapshotSchema, @@ -207,7 +209,7 @@ public Stream> createSplits(InputFormat inputFormat, @Nul final Snapshot latestSnapshot; final Table table = Table.forPath(engine, tablePath); try { - latestSnapshot = table.getLatestSnapshot(engine); + latestSnapshot = getLatestSnapshotForTable(table, engine); } catch (TableNotFoundException e) { throw InvalidInput.exception(e, "tablePath[%s] not found.", tablePath); @@ -331,6 +333,21 @@ private CloseableIterator getTransformedDataIterator( ); } + private Snapshot getLatestSnapshotForTable(final Table table, final Engine engine) + { + // Setting the LogStore class loader before calling the Delta Kernel snapshot API is required as a workaround with + // the 3.2.0 Delta Kernel because the Kernel library cannot instantiate the LogStore class otherwise. Please see + // https://github.com/delta-io/delta/issues/3299 for details. This workaround can be removed once the issue is fixed. 
+ final ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader(); + try { + Thread.currentThread().setContextClassLoader(LogStore.class.getClassLoader()); + return table.getLatestSnapshot(engine); + } + finally { + Thread.currentThread().setContextClassLoader(currCtxCl); + } + } + @VisibleForTesting String getTablePath() {