From 1affa35b298c212e5c83c0bdf0e9166c8b36d4d5 Mon Sep 17 00:00:00 2001 From: Abhishek Radhakrishnan Date: Tue, 6 Feb 2024 07:55:17 -0800 Subject: [PATCH] Bump up Delta Lake Kernel to 3.1.0 (#15842) This patch bumps Delta Lake Kernel dependency from 3.0.0 to 3.1.0, which was released last week - please see https://github.com/delta-io/delta/releases/tag/v3.1.0 for release notes. There were a few "breaking" API changes in 3.1.0; you can find the rationale for some of those changes here. Next up in this extension: add and expose filter predicates. --- .../druid-deltalake-extensions/pom.xml | 2 +- .../druid/delta/input/DeltaInputSource.java | 78 ++++++++++++++----- .../delta/input/DeltaInputSourceReader.java | 39 +++++++--- .../apache/druid/delta/input/RowSerde.java | 5 +- .../druid/delta/input/DeltaInputRowTest.java | 46 +++++++---- .../src/test/resources/requirements.txt | 2 +- 6 files changed, 122 insertions(+), 50 deletions(-) diff --git a/extensions-contrib/druid-deltalake-extensions/pom.xml b/extensions-contrib/druid-deltalake-extensions/pom.xml index 267d21936928..93a2e1c339f4 100644 --- a/extensions-contrib/druid-deltalake-extensions/pom.xml +++ b/extensions-contrib/druid-deltalake-extensions/pom.xml @@ -35,7 +35,7 @@ 4.0.0 - 3.0.0 + 3.1.0 diff --git a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSource.java b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSource.java index 936e2dd70d6b..cbce419291fe 100644 --- a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSource.java +++ b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSource.java @@ -28,13 +28,17 @@ import io.delta.kernel.Table; import io.delta.kernel.TableNotFoundException; import io.delta.kernel.client.TableClient; +import io.delta.kernel.data.ColumnarBatch; import 
io.delta.kernel.data.FilteredColumnarBatch; import io.delta.kernel.data.Row; import io.delta.kernel.defaults.client.DefaultTableClient; +import io.delta.kernel.internal.InternalScanFileUtils; +import io.delta.kernel.internal.data.ScanStateRow; import io.delta.kernel.internal.util.Utils; import io.delta.kernel.types.StructField; import io.delta.kernel.types.StructType; import io.delta.kernel.utils.CloseableIterator; +import io.delta.kernel.utils.FileStatus; import org.apache.druid.data.input.ColumnsFilter; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputRowSchema; @@ -113,15 +117,19 @@ public InputSourceReader reader( { final TableClient tableClient = createTableClient(); try { - final Row scanState; - final List scanRowList; + final List> scanFileDataIters = new ArrayList<>(); if (deltaSplit != null) { - scanState = deserialize(tableClient, deltaSplit.getStateRow()); - scanRowList = deltaSplit.getFiles() - .stream() - .map(row -> deserialize(tableClient, row)) - .collect(Collectors.toList()); + final Row scanState = deserialize(tableClient, deltaSplit.getStateRow()); + final StructType physicalReadSchema = + ScanStateRow.getPhysicalDataReadSchema(tableClient, scanState); + + for (String file : deltaSplit.getFiles()) { + final Row scanFile = deserialize(tableClient, file); + scanFileDataIters.add( + getTransformedDataIterator(tableClient, scanState, scanFile, physicalReadSchema) + ); + } } else { final Table table = Table.forPath(tableClient, tablePath); final Snapshot latestSnapshot = table.getLatestSnapshot(tableClient); @@ -130,25 +138,28 @@ public InputSourceReader reader( inputRowSchema.getColumnsFilter() ); final Scan scan = latestSnapshot.getScanBuilder(tableClient).withReadSchema(tableClient, prunedSchema).build(); - final CloseableIterator scanFiles = scan.getScanFiles(tableClient); + final CloseableIterator scanFilesIter = scan.getScanFiles(tableClient); + final Row scanState = scan.getScanState(tableClient); - 
scanState = scan.getScanState(tableClient); - scanRowList = new ArrayList<>(); + final StructType physicalReadSchema = + ScanStateRow.getPhysicalDataReadSchema(tableClient, scanState); - while (scanFiles.hasNext()) { - final FilteredColumnarBatch scanFileBatch = scanFiles.next(); + while (scanFilesIter.hasNext()) { + final FilteredColumnarBatch scanFileBatch = scanFilesIter.next(); final CloseableIterator scanFileRows = scanFileBatch.getRows(); - scanFileRows.forEachRemaining(scanRowList::add); + + while (scanFileRows.hasNext()) { + final Row scanFile = scanFileRows.next(); + scanFileDataIters.add( + getTransformedDataIterator(tableClient, scanState, scanFile, physicalReadSchema) + ); + } } } + return new DeltaInputSourceReader( - Scan.readData( - tableClient, - scanState, - Utils.toCloseableIterator(scanRowList.iterator()), - Optional.empty() - ), - inputRowSchema + scanFileDataIters.iterator(), + inputRowSchema ); } catch (TableNotFoundException e) { @@ -258,4 +269,31 @@ private TableClient createTableClient() Thread.currentThread().setContextClassLoader(currCtxClassloader); } } + + /** + * Utility to get the transformed data iterator from the scan file. Code borrowed and modified from + * + * SingleThreadedTableReader.java. 
+ */ + private CloseableIterator getTransformedDataIterator( + final TableClient tableClient, + final Row scanState, + final Row scanFile, + final StructType physicalReadSchema + ) throws IOException + { + final FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(scanFile); + + final CloseableIterator physicalDataIter = tableClient.getParquetHandler().readParquetFiles( + Utils.singletonCloseableIterator(fileStatus), + physicalReadSchema, + Optional.empty() + ); + return Scan.transformPhysicalData( + tableClient, + scanState, + scanFile, + physicalDataIter + ); + } } diff --git a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSourceReader.java b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSourceReader.java index d0fc4780d001..02421997f432 100644 --- a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSourceReader.java +++ b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSourceReader.java @@ -29,6 +29,7 @@ import org.apache.druid.java.util.common.parsers.CloseableIterator; import java.io.IOException; +import java.util.Iterator; import java.util.NoSuchElementException; /** @@ -38,28 +39,29 @@ */ public class DeltaInputSourceReader implements InputSourceReader { - private final io.delta.kernel.utils.CloseableIterator filteredColumnarBatchCloseableIterator; + private final Iterator> filteredColumnarBatchIterators; private final InputRowSchema inputRowSchema; public DeltaInputSourceReader( - io.delta.kernel.utils.CloseableIterator filteredColumnarBatchCloseableIterator, + Iterator> filteredColumnarBatchIterators, InputRowSchema inputRowSchema + ) { - this.filteredColumnarBatchCloseableIterator = filteredColumnarBatchCloseableIterator; + this.filteredColumnarBatchIterators = filteredColumnarBatchIterators; this.inputRowSchema = inputRowSchema; } @Override 
public CloseableIterator read() { - return new DeltaInputSourceIterator(filteredColumnarBatchCloseableIterator, inputRowSchema); + return new DeltaInputSourceIterator(filteredColumnarBatchIterators, inputRowSchema); } @Override public CloseableIterator read(InputStats inputStats) { - return new DeltaInputSourceIterator(filteredColumnarBatchCloseableIterator, inputRowSchema); + return new DeltaInputSourceIterator(filteredColumnarBatchIterators, inputRowSchema); } @Override @@ -92,17 +94,17 @@ public InputRowListPlusRawValues next() private static class DeltaInputSourceIterator implements CloseableIterator { - private final io.delta.kernel.utils.CloseableIterator filteredColumnarBatchCloseableIterator; + private final Iterator> filteredColumnarBatchIterators; private io.delta.kernel.utils.CloseableIterator currentBatch = null; private final InputRowSchema inputRowSchema; public DeltaInputSourceIterator( - io.delta.kernel.utils.CloseableIterator filteredColumnarBatchCloseableIterator, + Iterator> filteredColumnarBatchCloseableIterator, InputRowSchema inputRowSchema ) { - this.filteredColumnarBatchCloseableIterator = filteredColumnarBatchCloseableIterator; + this.filteredColumnarBatchIterators = filteredColumnarBatchCloseableIterator; this.inputRowSchema = inputRowSchema; } @@ -110,10 +112,19 @@ public DeltaInputSourceIterator( public boolean hasNext() { while (currentBatch == null || !currentBatch.hasNext()) { - if (!filteredColumnarBatchCloseableIterator.hasNext()) { + if (!filteredColumnarBatchIterators.hasNext()) { return false; // No more batches or records to read! 
} - currentBatch = filteredColumnarBatchCloseableIterator.next().getRows(); + + final io.delta.kernel.utils.CloseableIterator filteredBatchIterator = + filteredColumnarBatchIterators.next(); + + while (filteredBatchIterator.hasNext()) { + currentBatch = filteredBatchIterator.next().getRows(); + if (currentBatch.hasNext()) { + return true; + } + } } return true; } @@ -132,7 +143,13 @@ public InputRow next() @Override public void close() throws IOException { - filteredColumnarBatchCloseableIterator.close(); + if (currentBatch != null) { + currentBatch.close(); + } + + if (filteredColumnarBatchIterators.hasNext()) { + filteredColumnarBatchIterators.next().close(); + } } } } diff --git a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/RowSerde.java b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/RowSerde.java index f10ac0574e9f..e37c5d503314 100644 --- a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/RowSerde.java +++ b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/RowSerde.java @@ -26,7 +26,6 @@ import io.delta.kernel.client.TableClient; import io.delta.kernel.data.Row; import io.delta.kernel.defaults.internal.data.DefaultJsonRow; -import io.delta.kernel.internal.types.TableSchemaSerDe; import io.delta.kernel.internal.util.VectorUtils; import io.delta.kernel.types.ArrayType; import io.delta.kernel.types.BooleanType; @@ -73,7 +72,7 @@ public static String serializeRowToJson(Row row) Map rowObject = convertRowToJsonObject(row); try { Map rowWithSchema = new HashMap<>(); - rowWithSchema.put("schema", TableSchemaSerDe.toJson(row.getSchema())); + rowWithSchema.put("schema", row.getSchema().toJson()); rowWithSchema.put("row", rowObject); return OBJECT_MAPPER.writeValueAsString(rowWithSchema); } @@ -91,7 +90,7 @@ public static Row deserializeRowFromJson(TableClient tableClient, String jsonRow JsonNode 
jsonNode = OBJECT_MAPPER.readTree(jsonRowWithSchema); JsonNode schemaNode = jsonNode.get("schema"); StructType schema = - TableSchemaSerDe.fromJson(tableClient.getJsonHandler(), schemaNode.asText()); + tableClient.getJsonHandler().deserializeStructType(schemaNode.asText()); return parseRowFromJsonWithSchema((ObjectNode) jsonNode.get("row"), schema); } catch (JsonProcessingException e) { diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputRowTest.java b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputRowTest.java index 6f68774dbd61..7069d79b55c6 100644 --- a/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputRowTest.java +++ b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputRowTest.java @@ -22,10 +22,16 @@ import io.delta.kernel.Scan; import io.delta.kernel.TableNotFoundException; import io.delta.kernel.client.TableClient; +import io.delta.kernel.data.ColumnarBatch; import io.delta.kernel.data.FilteredColumnarBatch; import io.delta.kernel.data.Row; import io.delta.kernel.defaults.client.DefaultTableClient; +import io.delta.kernel.internal.InternalScanFileUtils; +import io.delta.kernel.internal.data.ScanStateRow; +import io.delta.kernel.internal.util.Utils; +import io.delta.kernel.types.StructType; import io.delta.kernel.utils.CloseableIterator; +import io.delta.kernel.utils.FileStatus; import org.apache.hadoop.conf.Configuration; import org.junit.Assert; import org.junit.Test; @@ -42,23 +48,35 @@ public void testDeltaInputRow() throws TableNotFoundException, IOException final TableClient tableClient = DefaultTableClient.create(new Configuration()); final Scan scan = DeltaTestUtils.getScan(tableClient); - CloseableIterator scanFileIter = scan.getScanFiles(tableClient); + final Row scanState = scan.getScanState(tableClient); + final StructType physicalReadSchema 
= ScanStateRow.getPhysicalDataReadSchema(tableClient, scanState); + + final CloseableIterator scanFileIter = scan.getScanFiles(tableClient); int totalRecordCount = 0; while (scanFileIter.hasNext()) { - try (CloseableIterator data = - Scan.readData( - tableClient, - scan.getScanState(tableClient), - scanFileIter.next().getRows(), - Optional.empty() - )) { - while (data.hasNext()) { - FilteredColumnarBatch dataReadResult = data.next(); + final FilteredColumnarBatch scanFileBatch = scanFileIter.next(); + final CloseableIterator scanFileRows = scanFileBatch.getRows(); + + while (scanFileRows.hasNext()) { + final Row scanFile = scanFileRows.next(); + final FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(scanFile); + + final CloseableIterator physicalDataIter = tableClient.getParquetHandler().readParquetFiles( + Utils.singletonCloseableIterator(fileStatus), + physicalReadSchema, + Optional.empty() + ); + final CloseableIterator dataIter = Scan.transformPhysicalData( + tableClient, + scanState, + scanFile, + physicalDataIter + ); + + while (dataIter.hasNext()) { + FilteredColumnarBatch dataReadResult = dataIter.next(); Row next = dataReadResult.getRows().next(); - DeltaInputRow deltaInputRow = new DeltaInputRow( - next, - DeltaTestUtils.FULL_SCHEMA - ); + DeltaInputRow deltaInputRow = new DeltaInputRow(next, DeltaTestUtils.FULL_SCHEMA); Assert.assertNotNull(deltaInputRow); Assert.assertEquals(DeltaTestUtils.DIMENSIONS, deltaInputRow.getDimensions()); diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/requirements.txt b/extensions-contrib/druid-deltalake-extensions/src/test/resources/requirements.txt index 8a846d26385d..bffa41335c2b 100644 --- a/extensions-contrib/druid-deltalake-extensions/src/test/resources/requirements.txt +++ b/extensions-contrib/druid-deltalake-extensions/src/test/resources/requirements.txt @@ -1,2 +1,2 @@ -delta-spark==3.0.0 +delta-spark==3.1.0 pyspark==3.5.0 \ No newline at end of file