Skip to content

Commit

Permalink
Upgrade delta kernel from 3.1.0 to 3.2.0 (#16513)
Browse files Browse the repository at this point in the history
Upstream release: https://github.com/delta-io/delta/releases/tag/v3.2.0

- Upgrade kernel dependency to 3.2.0
- Notable breaking changes introduced upstream that affect the Druid extension:
 - Rename TableClient -> Engine
 - Rename DefaultTableClient -> DefaultEngine
 - Exceptions moved to a separate package
 - Table.forPath() no longer throws TableNotFoundException. Instead, the exception is thrown
   when getting snapshot info from the Table object (e.g. Table.getLatestSnapshot())
  • Loading branch information
abhishekrb19 authored May 29, 2024
1 parent b3b62ac commit 75937c9
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 68 deletions.
2 changes: 1 addition & 1 deletion extensions-contrib/druid-deltalake-extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
<modelVersion>4.0.0</modelVersion>

<properties>
<delta-kernel.version>3.1.0</delta-kernel.version>
<delta-kernel.version>3.2.0</delta-kernel.version>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@
import io.delta.kernel.ScanBuilder;
import io.delta.kernel.Snapshot;
import io.delta.kernel.Table;
import io.delta.kernel.TableNotFoundException;
import io.delta.kernel.client.TableClient;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.defaults.client.DefaultTableClient;
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.internal.InternalScanFileUtils;
import io.delta.kernel.internal.data.ScanStateRow;
Expand Down Expand Up @@ -120,7 +120,7 @@ public boolean needsFormat()

/**
* Instantiates a {@link DeltaInputSourceReader} to read the Delta table rows. If a {@link DeltaSplit} is supplied,
* the Delta files and schema are obtained from it to instantiate the reader. Otherwise, a Delta table client is
* the Delta files and schema are obtained from it to instantiate the reader. Otherwise, the Delta engine is
* instantiated with the supplied configuration to read the table.
*
* @param inputRowSchema schema for {@link org.apache.druid.data.input.InputRow}
Expand All @@ -134,40 +134,40 @@ public InputSourceReader reader(
File temporaryDirectory
)
{
final TableClient tableClient = createTableClient();
final Engine engine = createDeltaEngine();
try {
final List<CloseableIterator<FilteredColumnarBatch>> scanFileDataIters = new ArrayList<>();

if (deltaSplit != null) {
final Row scanState = deserialize(tableClient, deltaSplit.getStateRow());
final Row scanState = deserialize(engine, deltaSplit.getStateRow());
final StructType physicalReadSchema =
ScanStateRow.getPhysicalDataReadSchema(tableClient, scanState);
ScanStateRow.getPhysicalDataReadSchema(engine, scanState);

for (String file : deltaSplit.getFiles()) {
final Row scanFile = deserialize(tableClient, file);
final Row scanFile = deserialize(engine, file);
scanFileDataIters.add(
getTransformedDataIterator(tableClient, scanState, scanFile, physicalReadSchema, Optional.empty())
getTransformedDataIterator(engine, scanState, scanFile, physicalReadSchema, Optional.empty())
);
}
} else {
final Table table = Table.forPath(tableClient, tablePath);
final Snapshot latestSnapshot = table.getLatestSnapshot(tableClient);
final StructType fullSnapshotSchema = latestSnapshot.getSchema(tableClient);
final Table table = Table.forPath(engine, tablePath);
final Snapshot latestSnapshot = table.getLatestSnapshot(engine);
final StructType fullSnapshotSchema = latestSnapshot.getSchema(engine);
final StructType prunedSchema = pruneSchema(
fullSnapshotSchema,
inputRowSchema.getColumnsFilter()
);

final ScanBuilder scanBuilder = latestSnapshot.getScanBuilder(tableClient);
final ScanBuilder scanBuilder = latestSnapshot.getScanBuilder(engine);
if (filter != null) {
scanBuilder.withFilter(tableClient, filter.getFilterPredicate(fullSnapshotSchema));
scanBuilder.withFilter(engine, filter.getFilterPredicate(fullSnapshotSchema));
}
final Scan scan = scanBuilder.withReadSchema(tableClient, prunedSchema).build();
final CloseableIterator<FilteredColumnarBatch> scanFilesIter = scan.getScanFiles(tableClient);
final Row scanState = scan.getScanState(tableClient);
final Scan scan = scanBuilder.withReadSchema(engine, prunedSchema).build();
final CloseableIterator<FilteredColumnarBatch> scanFilesIter = scan.getScanFiles(engine);
final Row scanState = scan.getScanState(engine);

final StructType physicalReadSchema =
ScanStateRow.getPhysicalDataReadSchema(tableClient, scanState);
ScanStateRow.getPhysicalDataReadSchema(engine, scanState);

while (scanFilesIter.hasNext()) {
final FilteredColumnarBatch scanFileBatch = scanFilesIter.next();
Expand All @@ -176,7 +176,7 @@ public InputSourceReader reader(
while (scanFileRows.hasNext()) {
final Row scanFile = scanFileRows.next();
scanFileDataIters.add(
getTransformedDataIterator(tableClient, scanState, scanFile, physicalReadSchema, scan.getRemainingFilter())
getTransformedDataIterator(engine, scanState, scanFile, physicalReadSchema, scan.getRemainingFilter())
);
}
}
Expand All @@ -203,26 +203,26 @@ public Stream<InputSplit<DeltaSplit>> createSplits(InputFormat inputFormat, @Nul
return Stream.of(new InputSplit<>(deltaSplit));
}

final TableClient tableClient = createTableClient();
final Engine engine = createDeltaEngine();
final Snapshot latestSnapshot;
final Table table = Table.forPath(engine, tablePath);
try {
final Table table = Table.forPath(tableClient, tablePath);
latestSnapshot = table.getLatestSnapshot(tableClient);
latestSnapshot = table.getLatestSnapshot(engine);
}
catch (TableNotFoundException e) {
throw InvalidInput.exception(e, "tablePath[%s] not found.", tablePath);
}
final StructType fullSnapshotSchema = latestSnapshot.getSchema(tableClient);
final StructType fullSnapshotSchema = latestSnapshot.getSchema(engine);

final ScanBuilder scanBuilder = latestSnapshot.getScanBuilder(tableClient);
final ScanBuilder scanBuilder = latestSnapshot.getScanBuilder(engine);
if (filter != null) {
scanBuilder.withFilter(tableClient, filter.getFilterPredicate(fullSnapshotSchema));
scanBuilder.withFilter(engine, filter.getFilterPredicate(fullSnapshotSchema));
}
final Scan scan = scanBuilder.withReadSchema(tableClient, fullSnapshotSchema).build();
final Scan scan = scanBuilder.withReadSchema(engine, fullSnapshotSchema).build();
// scan files iterator for the current snapshot
final CloseableIterator<FilteredColumnarBatch> scanFilesIterator = scan.getScanFiles(tableClient);
final CloseableIterator<FilteredColumnarBatch> scanFilesIterator = scan.getScanFiles(engine);

final Row scanState = scan.getScanState(tableClient);
final Row scanState = scan.getScanState(engine);
final String scanStateStr = RowSerde.serializeRowToJson(scanState);

Iterator<DeltaSplit> deltaSplitIterator = Iterators.transform(
Expand Down Expand Up @@ -256,9 +256,9 @@ public InputSource withSplit(InputSplit<DeltaSplit> split)
);
}

private Row deserialize(TableClient tableClient, String row)
private Row deserialize(Engine engine, String row)
{
return RowSerde.deserializeRowFromJson(tableClient, row);
return RowSerde.deserializeRowFromJson(engine, row);
}

/**
Expand All @@ -285,17 +285,17 @@ private StructType pruneSchema(final StructType baseSchema, final ColumnsFilter
}

/**
* @return a table client where the client is initialized with {@link Configuration} class that uses the class's
* @return a Delta engine initialized with {@link Configuration} class that uses the class's
* class loader instead of the context classloader. The latter by default doesn't know about the extension classes,
* so the table client cannot load runtime classes resulting in {@link ClassNotFoundException}.
* so the Delta engine cannot load runtime classes resulting in {@link ClassNotFoundException}.
*/
private TableClient createTableClient()
private Engine createDeltaEngine()
{
final ClassLoader currCtxClassloader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
final Configuration conf = new Configuration();
return DefaultTableClient.create(conf);
return DefaultEngine.create(conf);
}
finally {
Thread.currentThread().setContextClassLoader(currCtxClassloader);
Expand All @@ -308,7 +308,7 @@ private TableClient createTableClient()
* SingleThreadedTableReader.java</a>.
*/
private CloseableIterator<FilteredColumnarBatch> getTransformedDataIterator(
final TableClient tableClient,
final Engine engine,
final Row scanState,
final Row scanFile,
final StructType physicalReadSchema,
Expand All @@ -317,14 +317,14 @@ private CloseableIterator<FilteredColumnarBatch> getTransformedDataIterator(
{
final FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(scanFile);

final CloseableIterator<ColumnarBatch> physicalDataIter = tableClient.getParquetHandler().readParquetFiles(
final CloseableIterator<ColumnarBatch> physicalDataIter = engine.getParquetHandler().readParquetFiles(
Utils.singletonCloseableIterator(fileStatus),
physicalReadSchema,
optionalPredicate
);

return Scan.transformPhysicalData(
tableClient,
engine,
scanState,
scanFile,
physicalDataIter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.delta.kernel.client.TableClient;
import io.delta.kernel.data.Row;
import io.delta.kernel.defaults.internal.data.DefaultJsonRow;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.util.VectorUtils;
import io.delta.kernel.types.ArrayType;
import io.delta.kernel.types.BooleanType;
Expand Down Expand Up @@ -84,13 +84,12 @@ public static String serializeRowToJson(Row row)
/**
* Utility method to deserialize a {@link Row} object from the JSON form.
*/
public static Row deserializeRowFromJson(TableClient tableClient, String jsonRowWithSchema)
public static Row deserializeRowFromJson(Engine engine, String jsonRowWithSchema)
{
try {
JsonNode jsonNode = OBJECT_MAPPER.readTree(jsonRowWithSchema);
JsonNode schemaNode = jsonNode.get("schema");
StructType schema =
tableClient.getJsonHandler().deserializeStructType(schemaNode.asText());
StructType schema = engine.getJsonHandler().deserializeStructType(schemaNode.asText());
return parseRowFromJsonWithSchema((ObjectNode) jsonNode.get("row"), schema);
}
catch (JsonProcessingException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
package org.apache.druid.delta.input;

import io.delta.kernel.Scan;
import io.delta.kernel.TableNotFoundException;
import io.delta.kernel.client.TableClient;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.defaults.client.DefaultTableClient;
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.internal.InternalScanFileUtils;
import io.delta.kernel.internal.data.ScanStateRow;
import io.delta.kernel.internal.util.Utils;
Expand Down Expand Up @@ -68,13 +68,13 @@ public void testDeltaInputRow(
final List<Map<String, Object>> expectedRows
) throws TableNotFoundException, IOException
{
final TableClient tableClient = DefaultTableClient.create(new Configuration());
final Scan scan = DeltaTestUtils.getScan(tableClient, deltaTablePath);
final Engine engine = DefaultEngine.create(new Configuration());
final Scan scan = DeltaTestUtils.getScan(engine, deltaTablePath);

final Row scanState = scan.getScanState(tableClient);
final StructType physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(tableClient, scanState);
final Row scanState = scan.getScanState(engine);
final StructType physicalReadSchema = ScanStateRow.getPhysicalDataReadSchema(engine, scanState);

final CloseableIterator<FilteredColumnarBatch> scanFileIter = scan.getScanFiles(tableClient);
final CloseableIterator<FilteredColumnarBatch> scanFileIter = scan.getScanFiles(engine);
int totalRecordCount = 0;
while (scanFileIter.hasNext()) {
final FilteredColumnarBatch scanFileBatch = scanFileIter.next();
Expand All @@ -84,13 +84,13 @@ public void testDeltaInputRow(
final Row scanFile = scanFileRows.next();
final FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(scanFile);

final CloseableIterator<ColumnarBatch> physicalDataIter = tableClient.getParquetHandler().readParquetFiles(
final CloseableIterator<ColumnarBatch> physicalDataIter = engine.getParquetHandler().readParquetFiles(
Utils.singletonCloseableIterator(fileStatus),
physicalReadSchema,
Optional.empty()
);
final CloseableIterator<FilteredColumnarBatch> dataIter = Scan.transformPhysicalData(
tableClient,
engine,
scanState,
scanFile,
physicalDataIter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,19 @@
import io.delta.kernel.ScanBuilder;
import io.delta.kernel.Snapshot;
import io.delta.kernel.Table;
import io.delta.kernel.TableNotFoundException;
import io.delta.kernel.client.TableClient;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.types.StructType;

public class DeltaTestUtils
{
public static Scan getScan(final TableClient tableClient, final String deltaTablePath) throws TableNotFoundException
public static Scan getScan(final Engine engine, final String deltaTablePath) throws TableNotFoundException
{
final Table table = Table.forPath(tableClient, deltaTablePath);
final Snapshot snapshot = table.getLatestSnapshot(tableClient);
final StructType readSchema = snapshot.getSchema(tableClient);
final ScanBuilder scanBuilder = snapshot.getScanBuilder(tableClient)
.withReadSchema(tableClient, readSchema);
final Table table = Table.forPath(engine, deltaTablePath);
final Snapshot snapshot = table.getLatestSnapshot(engine);
final StructType readSchema = snapshot.getSchema(engine);
final ScanBuilder scanBuilder = snapshot.getScanBuilder(engine)
.withReadSchema(engine, readSchema);
return scanBuilder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
package org.apache.druid.delta.input;

import io.delta.kernel.Scan;
import io.delta.kernel.TableNotFoundException;
import io.delta.kernel.data.Row;
import io.delta.kernel.defaults.client.DefaultTableClient;
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.exceptions.TableNotFoundException;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

Expand All @@ -46,13 +46,13 @@ public static Collection<Object[]> data()
@ParameterizedTest(name = "{index}:with context {0}")
public void testSerializeDeserializeRoundtrip(final String tablePath) throws TableNotFoundException
{
final DefaultTableClient tableClient = DefaultTableClient.create(new Configuration());
final Scan scan = DeltaTestUtils.getScan(tableClient, tablePath);
final Row scanState = scan.getScanState(tableClient);
final DefaultEngine engine = DefaultEngine.create(new Configuration());
final Scan scan = DeltaTestUtils.getScan(engine, tablePath);
final Row scanState = scan.getScanState(engine);

final String rowJson = RowSerde.serializeRowToJson(scanState);
final Row row = RowSerde.deserializeRowFromJson(tableClient, rowJson);
final Row row = RowSerde.deserializeRowFromJson(engine, rowJson);

Assert.assertEquals(scanState.getSchema(), row.getSchema());
Assertions.assertEquals(scanState.getSchema(), row.getSchema());
}
}

0 comments on commit 75937c9

Please sign in to comment.