diff --git a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/ArraySinkRow.java b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/ArraySinkRow.java index e443a7d3e286e..9140558c41cd6 100644 --- a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/ArraySinkRow.java +++ b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/ArraySinkRow.java @@ -39,7 +39,4 @@ public Data.Op getOp() { public int size() { return values.length; } - - @Override - public void close() throws Exception {} } diff --git a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/SinkRow.java b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/SinkRow.java index 0ae0aa3facf7e..dcddfc07479b6 100644 --- a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/SinkRow.java +++ b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/SinkRow.java @@ -16,7 +16,7 @@ import com.risingwave.proto.Data; -public interface SinkRow extends AutoCloseable { +public interface SinkRow { Object get(int index); Data.Op getOp(); diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/FileSink.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/FileSink.java index 5ea2db204a06c..0959b389e55ca 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/FileSink.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/FileSink.java @@ -60,28 +60,25 @@ public FileSink(FileSinkConfig config, TableSchema tableSchema) { @Override public void write(Iterator rows) { while (rows.hasNext()) { - try (SinkRow row = rows.next()) { - switch (row.getOp()) { - case INSERT: - String buf = - new Gson() - .toJson( - IntStream.range(0, row.size()) - .mapToObj(row::get) - .toArray()); - try { - sinkWriter.write(buf + System.lineSeparator()); - } catch (IOException e) { - throw INTERNAL.withCause(e).asRuntimeException(); - } - break; - default: - throw UNIMPLEMENTED - .withDescription("unsupported operation: " + row.getOp()) - .asRuntimeException(); - } - } catch (Exception e) { - throw new RuntimeException(e); + SinkRow row = rows.next(); + switch (row.getOp()) { + case INSERT: + String buf = + new Gson() + .toJson( + IntStream.range(0, row.size()) + .mapToObj(row::get) + .toArray()); + try { + sinkWriter.write(buf + System.lineSeparator()); + } catch (IOException e) { + throw INTERNAL.withCause(e).asRuntimeException(); + } + break; + default: + throw UNIMPLEMENTED + .withDescription("unsupported operation: " + row.getOp()) + .asRuntimeException(); } } } diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/deserializer/StreamChunkDeserializer.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/deserializer/StreamChunkDeserializer.java index a8175bb2f738d..ab9a9068fabb9 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/deserializer/StreamChunkDeserializer.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/deserializer/StreamChunkDeserializer.java @@ -251,14 +251,12 @@ public CloseableIterator deserialize( static class StreamChunkRowWrapper implements SinkRow { - private boolean isClosed; private final StreamChunkRow inner; private final ValueGetter[] valueGetters; StreamChunkRowWrapper(StreamChunkRow inner, ValueGetter[] valueGetters) { this.inner = inner; this.valueGetters = valueGetters; - this.isClosed = false; } @Override @@ -275,14 +273,6 @@ public Data.Op getOp() { public int size() { return valueGetters.length; } - - @Override - public void close() { - if (!isClosed) { - this.isClosed = true; - inner.close(); - } - } } static class StreamChunkIteratorWrapper implements CloseableIterator { @@ -299,13 +289,6 @@ public StreamChunkIteratorWrapper(StreamChunkIterator iter, ValueGetter[] valueG @Override public void close() { iter.close(); - try { - if (row != null) { - row.close(); - } - } catch (Exception e) { - throw new RuntimeException(e); - } } @Override diff --git a/java/connector-node/risingwave-sink-deltalake/src/main/java/com/risingwave/connector/DeltaLakeSink.java b/java/connector-node/risingwave-sink-deltalake/src/main/java/com/risingwave/connector/DeltaLakeSink.java index 1b3a7c28d97a9..413edeb10df81 100644 --- a/java/connector-node/risingwave-sink-deltalake/src/main/java/com/risingwave/connector/DeltaLakeSink.java +++ b/java/connector-node/risingwave-sink-deltalake/src/main/java/com/risingwave/connector/DeltaLakeSink.java @@ -75,27 +75,24 @@ public void write(Iterator rows) { } } while (rows.hasNext()) { - try (SinkRow row = rows.next()) { - switch (row.getOp()) { - case INSERT: - GenericRecord record = new GenericData.Record(this.sinkSchema); - for (int i = 0; i < this.sinkSchema.getFields().size(); i++) { - record.put(i, row.get(i)); - } - try { - this.parquetWriter.write(record); - this.numOutputRows += 1; - } catch (IOException ioException) { - throw INTERNAL.withCause(ioException).asRuntimeException(); - } - break; - default: - throw UNIMPLEMENTED - .withDescription("unsupported operation: " + row.getOp()) - .asRuntimeException(); - } - } catch (Exception e) { - throw new RuntimeException(e); + SinkRow row = rows.next(); + switch (row.getOp()) { + case INSERT: + GenericRecord record = new GenericData.Record(this.sinkSchema); + for (int i = 0; i < this.sinkSchema.getFields().size(); i++) { + record.put(i, row.get(i)); + } + try { + this.parquetWriter.write(record); + this.numOutputRows += 1; + } catch (IOException ioException) { + throw INTERNAL.withCause(ioException).asRuntimeException(); + } + break; + default: + throw UNIMPLEMENTED + .withDescription("unsupported operation: " + row.getOp()) + .asRuntimeException(); } } } diff --git a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java index 178c76edf3e40..f9c266f0af117 100644 --- a/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java +++ b/java/connector-node/risingwave-sink-es-7/src/main/java/com/risingwave/connector/EsSink.java @@ -276,10 +276,11 @@ private void writeRow(SinkRow row) throws JsonMappingException, JsonProcessingEx @Override public void write(Iterator rows) { while (rows.hasNext()) { - try (SinkRow row = rows.next()) { + SinkRow row = rows.next(); + try { writeRow(row); - } catch (Exception e) { - throw new RuntimeException(e); + } catch (Exception ex) { + throw new RuntimeException(ex); } } } diff --git a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/AppendOnlyIcebergSinkWriter.java b/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/AppendOnlyIcebergSinkWriter.java index 6a6aad0a460e0..6b60eedd23d37 100644 --- a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/AppendOnlyIcebergSinkWriter.java +++ b/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/AppendOnlyIcebergSinkWriter.java @@ -55,62 +55,57 @@ public AppendOnlyIcebergSinkWriter( @Override public void write(Iterator rows) { while (rows.hasNext()) { - try (SinkRow row = rows.next()) { - switch (row.getOp()) { - case INSERT: - Record record = GenericRecord.create(rowSchema); - if (row.size() != tableSchema.getColumnNames().length) { - throw INTERNAL.withDescription("row values do not match table schema") + SinkRow row = rows.next(); + switch (row.getOp()) { + case INSERT: + Record record = GenericRecord.create(rowSchema); + if (row.size() != tableSchema.getColumnNames().length) { + throw INTERNAL.withDescription("row values do not match table schema") + .asRuntimeException(); + } + for (int i = 0; i < rowSchema.columns().size(); i++) { + record.set(i, row.get(i)); + } + PartitionKey partitionKey = + new PartitionKey(icebergTable.spec(), icebergTable.schema()); + partitionKey.partition(record); + DataWriter dataWriter; + if (dataWriterMap.containsKey(partitionKey)) { + dataWriter = dataWriterMap.get(partitionKey); + } else { + try { + String filename = fileFormat.addExtension(UUID.randomUUID().toString()); + OutputFile outputFile = + icebergTable + .io() + .newOutputFile( + icebergTable.location() + + "/data/" + + icebergTable + .spec() + .partitionToPath(partitionKey) + + "/" + + filename); + dataWriter = + Parquet.writeData(outputFile) + .schema(rowSchema) + .withSpec(icebergTable.spec()) + .withPartition(partitionKey) + .createWriterFunc(GenericParquetWriter::buildWriter) + .overwrite() + .build(); + } catch (Exception e) { + throw INTERNAL.withDescription("failed to create dataWriter") .asRuntimeException(); } - for (int i = 0; i < rowSchema.columns().size(); i++) { - record.set(i, row.get(i)); - } - PartitionKey partitionKey = - new PartitionKey(icebergTable.spec(), icebergTable.schema()); - partitionKey.partition(record); - DataWriter dataWriter; - if (dataWriterMap.containsKey(partitionKey)) { - dataWriter = dataWriterMap.get(partitionKey); - } else { - try { - String filename = - fileFormat.addExtension(UUID.randomUUID().toString()); - OutputFile outputFile = - icebergTable - .io() - .newOutputFile( - icebergTable.location() - + "/data/" - + icebergTable - .spec() - .partitionToPath( - partitionKey) - + "/" - + filename); - dataWriter = - Parquet.writeData(outputFile) - .schema(rowSchema) - .withSpec(icebergTable.spec()) - .withPartition(partitionKey) - .createWriterFunc(GenericParquetWriter::buildWriter) - .overwrite() - .build(); - } catch (Exception e) { - throw INTERNAL.withDescription("failed to create dataWriter") - .asRuntimeException(); - } - dataWriterMap.put(partitionKey, dataWriter); - } - dataWriter.write(record); - break; - default: - throw UNIMPLEMENTED - .withDescription("unsupported operation: " + row.getOp()) - .asRuntimeException(); - } - } catch (Exception e) { - throw new RuntimeException(e); + dataWriterMap.put(partitionKey, dataWriter); + } + dataWriter.write(record); + break; + default: + throw UNIMPLEMENTED + .withDescription("unsupported operation: " + row.getOp()) + .asRuntimeException(); } } } diff --git a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/UpsertIcebergSinkWriter.java b/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/UpsertIcebergSinkWriter.java index 10fca804acf64..e1d649f028bf8 100644 --- a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/UpsertIcebergSinkWriter.java +++ b/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/UpsertIcebergSinkWriter.java @@ -142,57 +142,52 @@ private List> getKeyFromRow(SinkRow row) { @Override public void write(Iterator rows) { while (rows.hasNext()) { - try (SinkRow row = rows.next()) { - if (row.size() != tableSchema.getColumnNames().length) { - throw Status.FAILED_PRECONDITION - .withDescription("row values do not match table schema") - .asRuntimeException(); - } - Record record = newRecord(rowSchema, row); - PartitionKey partitionKey = - new PartitionKey(icebergTable.spec(), icebergTable.schema()); - partitionKey.partition(record); - SinkRowMap sinkRowMap; - if (sinkRowMapByPartition.containsKey(partitionKey)) { - sinkRowMap = sinkRowMapByPartition.get(partitionKey); - } else { - sinkRowMap = new SinkRowMap(); - sinkRowMapByPartition.put(partitionKey, sinkRowMap); - } - switch (row.getOp()) { - case INSERT: - sinkRowMap.insert(getKeyFromRow(row), newRecord(rowSchema, row)); - break; - case DELETE: - sinkRowMap.delete(getKeyFromRow(row), newRecord(deleteRowSchema, row)); - break; - case UPDATE_DELETE: - if (updateBufferExists) { - throw Status.FAILED_PRECONDITION - .withDescription( - "an UPDATE_INSERT should precede an UPDATE_DELETE") - .asRuntimeException(); - } - sinkRowMap.delete(getKeyFromRow(row), newRecord(deleteRowSchema, row)); - updateBufferExists = true; - break; - case UPDATE_INSERT: - if (!updateBufferExists) { - throw Status.FAILED_PRECONDITION - .withDescription( - "an UPDATE_INSERT should precede an UPDATE_DELETE") - .asRuntimeException(); - } - sinkRowMap.insert(getKeyFromRow(row), newRecord(rowSchema, row)); - updateBufferExists = false; - break; - default: - throw UNIMPLEMENTED - .withDescription("unsupported operation: " + row.getOp()) + SinkRow row = rows.next(); + if (row.size() != tableSchema.getColumnNames().length) { + throw Status.FAILED_PRECONDITION + .withDescription("row values do not match table schema") + .asRuntimeException(); + } + Record record = newRecord(rowSchema, row); + PartitionKey partitionKey = + new PartitionKey(icebergTable.spec(), icebergTable.schema()); + partitionKey.partition(record); + SinkRowMap sinkRowMap; + if (sinkRowMapByPartition.containsKey(partitionKey)) { + sinkRowMap = sinkRowMapByPartition.get(partitionKey); + } else { + sinkRowMap = new SinkRowMap(); + sinkRowMapByPartition.put(partitionKey, sinkRowMap); + } + switch (row.getOp()) { + case INSERT: + sinkRowMap.insert(getKeyFromRow(row), newRecord(rowSchema, row)); + break; + case DELETE: + sinkRowMap.delete(getKeyFromRow(row), newRecord(deleteRowSchema, row)); + break; + case UPDATE_DELETE: + if (updateBufferExists) { + throw Status.FAILED_PRECONDITION + .withDescription("an UPDATE_INSERT should precede an UPDATE_DELETE") .asRuntimeException(); - } - } catch (Exception e) { - throw new RuntimeException(e); + } + sinkRowMap.delete(getKeyFromRow(row), newRecord(deleteRowSchema, row)); + updateBufferExists = true; + break; + case UPDATE_INSERT: + if (!updateBufferExists) { + throw Status.FAILED_PRECONDITION + .withDescription("an UPDATE_INSERT should precede an UPDATE_DELETE") + .asRuntimeException(); + } + sinkRowMap.insert(getKeyFromRow(row), newRecord(rowSchema, row)); + updateBufferExists = false; + break; + default: + throw UNIMPLEMENTED + .withDescription("unsupported operation: " + row.getOp()) + .asRuntimeException(); } } } diff --git a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java index fe23c7db5d846..db09572c30db8 100644 --- a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java +++ b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java @@ -219,22 +219,19 @@ public void write(Iterator rows) { PreparedStatement insertStatement = null; while (rows.hasNext()) { - try (SinkRow row = rows.next()) { - if (row.getOp() == Data.Op.UPDATE_DELETE) { - updateFlag = true; - continue; - } - if (config.isUpsertSink()) { - if (row.getOp() == Data.Op.DELETE) { - deleteStatement = prepareDeleteStatement(row); - } else { - upsertStatement = prepareUpsertStatement(row); - } + SinkRow row = rows.next(); + if (row.getOp() == Data.Op.UPDATE_DELETE) { + updateFlag = true; + continue; + } + if (config.isUpsertSink()) { + if (row.getOp() == Data.Op.DELETE) { + deleteStatement = prepareDeleteStatement(row); } else { - insertStatement = prepareInsertStatement(row); + upsertStatement = prepareUpsertStatement(row); } - } catch (Exception e) { - throw new RuntimeException(e); + } else { + insertStatement = prepareInsertStatement(row); } } diff --git a/java/java-binding-benchmark/src/main/java/com/risingwave/java/binding/StreamchunkBenchmark.java b/java/java-binding-benchmark/src/main/java/com/risingwave/java/binding/StreamchunkBenchmark.java index 628d1405c8d81..9ca6c4781983f 100644 --- a/java/java-binding-benchmark/src/main/java/com/risingwave/java/binding/StreamchunkBenchmark.java +++ b/java/java-binding-benchmark/src/main/java/com/risingwave/java/binding/StreamchunkBenchmark.java @@ -67,13 +67,12 @@ public void streamchunkTest() { var iter = iterOfIter.next(); int count = 0; while (true) { - try (StreamChunkRow row = iter.next()) { - if (row == null) { - break; - } - count += 1; - getValue(row); + StreamChunkRow row = iter.next(); + if (row == null) { + break; } + count += 1; + getValue(row); } if (count != loopTime) { throw new RuntimeException( diff --git a/java/java-binding-integration-test/src/main/java/com/risingwave/java/binding/HummockReadDemo.java b/java/java-binding-integration-test/src/main/java/com/risingwave/java/binding/HummockReadDemo.java index 9f4038cf3f9a3..f1996bb96f43d 100644 --- a/java/java-binding-integration-test/src/main/java/com/risingwave/java/binding/HummockReadDemo.java +++ b/java/java-binding-integration-test/src/main/java/com/risingwave/java/binding/HummockReadDemo.java @@ -72,13 +72,12 @@ public static void main(String[] args) { try (HummockIterator iter = new HummockIterator(readPlan)) { int count = 0; while (true) { - try (KeyedRow row = iter.next()) { - if (row == null) { - break; - } - count += 1; - validateRow(row); + KeyedRow row = iter.next(); + if (row == null) { + break; } + count += 1; + validateRow(row); } int expectedCount = 30000; if (count != expectedCount) { diff --git a/java/java-binding-integration-test/src/main/java/com/risingwave/java/binding/StreamChunkDemo.java b/java/java-binding-integration-test/src/main/java/com/risingwave/java/binding/StreamChunkDemo.java index 0cc6977de2f0c..ad59a74e4c20c 100644 --- a/java/java-binding-integration-test/src/main/java/com/risingwave/java/binding/StreamChunkDemo.java +++ b/java/java-binding-integration-test/src/main/java/com/risingwave/java/binding/StreamChunkDemo.java @@ -25,13 +25,12 @@ public static void main(String[] args) throws IOException { try (StreamChunkIterator iter = new StreamChunkIterator(payload)) { int count = 0; while (true) { - try (StreamChunkRow row = iter.next()) { - if (row == null) { - break; - } - count += 1; - validateRow(row); + StreamChunkRow row = iter.next(); + if (row == null) { + break; } + count += 1; + validateRow(row); } int expectedCount = 30000; if (count != expectedCount) { diff --git a/java/java-binding/src/main/java/com/risingwave/java/binding/BaseRow.java b/java/java-binding/src/main/java/com/risingwave/java/binding/BaseRow.java index a12978d92e995..d9fb28115b68c 100644 --- a/java/java-binding/src/main/java/com/risingwave/java/binding/BaseRow.java +++ b/java/java-binding/src/main/java/com/risingwave/java/binding/BaseRow.java @@ -14,75 +14,73 @@ package com.risingwave.java.binding; -public class BaseRow implements AutoCloseable { +public class BaseRow { protected final long pointer; - private boolean isClosed; protected BaseRow(long pointer) { this.pointer = pointer; - this.isClosed = false; } public boolean isNull(int index) { - return Binding.rowIsNull(pointer, index); + return Binding.iteratorIsNull(pointer, index); } public short getShort(int index) { - return Binding.rowGetInt16Value(pointer, index); + return Binding.iteratorGetInt16Value(pointer, index); } public int getInt(int index) { - return Binding.rowGetInt32Value(pointer, index); + return Binding.iteratorGetInt32Value(pointer, index); } public long getLong(int index) { - return Binding.rowGetInt64Value(pointer, index); + return Binding.iteratorGetInt64Value(pointer, index); } public float getFloat(int index) { - return Binding.rowGetFloatValue(pointer, index); + return Binding.iteratorGetFloatValue(pointer, index); } public double getDouble(int index) { - return Binding.rowGetDoubleValue(pointer, index); + return Binding.iteratorGetDoubleValue(pointer, index); } public boolean getBoolean(int index) { - return Binding.rowGetBooleanValue(pointer, index); + return Binding.iteratorGetBooleanValue(pointer, index); } public String getString(int index) { - return Binding.rowGetStringValue(pointer, index); + return Binding.iteratorGetStringValue(pointer, index); } public java.sql.Timestamp getTimestamp(int index) { - return Binding.rowGetTimestampValue(pointer, index); + return Binding.iteratorGetTimestampValue(pointer, index); } public java.sql.Time getTime(int index) { - return Binding.rowGetTimeValue(pointer, index); + return Binding.iteratorGetTimeValue(pointer, index); } public java.math.BigDecimal getDecimal(int index) { - return Binding.rowGetDecimalValue(pointer, index); + return Binding.iteratorGetDecimalValue(pointer, index); } public java.sql.Date getDate(int index) { - return Binding.rowGetDateValue(pointer, index); + return Binding.iteratorGetDateValue(pointer, index); } // string representation of interval: "2 mons 3 days 00:00:00.000004" or "P1Y2M3DT4H5M6.789123S" public String getInterval(int index) { - return Binding.rowGetIntervalValue(pointer, index); + return Binding.iteratorGetIntervalValue(pointer, index); } // string representation of jsonb: '{"key": "value"}' public String getJsonb(int index) { - return Binding.rowGetJsonbValue(pointer, index); + return Binding.iteratorGetJsonbValue(pointer, index); } public byte[] getBytea(int index) { - return Binding.rowGetByteaValue(pointer, index); + return Binding.iteratorGetByteaValue(pointer, index); } /** @@ -92,16 +90,8 @@ public byte[] getBytea(int index) { * Object[] elements) */ public Object[] getArray(int index, Class clazz) { - var val = Binding.rowGetArrayValue(pointer, index, clazz); + var val = Binding.iteratorGetArrayValue(pointer, index, clazz); assert (val instanceof Object[]); return (Object[]) val; } - - @Override - public void close() { - if (!isClosed) { - isClosed = true; - Binding.rowClose(pointer); - } - } } diff --git a/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java b/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java index f72c63ae6d3e6..732bb2e93c532 100644 --- a/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java +++ b/java/java-binding/src/main/java/com/risingwave/java/binding/Binding.java @@ -30,65 +30,52 @@ public class Binding { // hummock iterator method // Return a pointer to the iterator - static native long hummockIteratorNew(byte[] readPlan); + static native long iteratorNewHummock(byte[] readPlan); - // return a pointer to the next row - static native long hummockIteratorNext(long pointer); + static native boolean iteratorNext(long pointer); - // Since the underlying rust does not have garbage collection, we will have to manually call - // close on the iterator to release the iterator instance pointed by the pointer. - static native void hummockIteratorClose(long pointer); + static native void iteratorClose(long pointer); - // row method - static native byte[] rowGetKey(long pointer); + static native long iteratorNewFromStreamChunkPayload(byte[] streamChunkPayload); - static native int rowGetOp(long pointer); + static native long iteratorNewFromStreamChunkPretty(String str); - static native boolean rowIsNull(long pointer, int index); + static native byte[] iteratorGetKey(long pointer); - static native short rowGetInt16Value(long pointer, int index); + static native int iteratorGetOp(long pointer); - static native int rowGetInt32Value(long pointer, int index); + static native boolean iteratorIsNull(long pointer, int index); - static native long rowGetInt64Value(long pointer, int index); + static native short iteratorGetInt16Value(long pointer, int index); - static native float rowGetFloatValue(long pointer, int index); + static native int iteratorGetInt32Value(long pointer, int index); - static native double rowGetDoubleValue(long pointer, int index); + static native long iteratorGetInt64Value(long pointer, int index); - static native boolean rowGetBooleanValue(long pointer, int index); + static native float iteratorGetFloatValue(long pointer, int index); - static native String rowGetStringValue(long pointer, int index); + static native double iteratorGetDoubleValue(long pointer, int index); - static native java.sql.Timestamp rowGetTimestampValue(long pointer, int index); + static native boolean iteratorGetBooleanValue(long pointer, int index); - static native java.math.BigDecimal rowGetDecimalValue(long pointer, int index); + static native String iteratorGetStringValue(long pointer, int index); - static native java.sql.Time rowGetTimeValue(long pointer, int index); + static native java.sql.Timestamp iteratorGetTimestampValue(long pointer, int index); - static native java.sql.Date rowGetDateValue(long pointer, int index); + static native java.math.BigDecimal iteratorGetDecimalValue(long pointer, int index); - static native String rowGetIntervalValue(long pointer, int index); + static native java.sql.Time iteratorGetTimeValue(long pointer, int index); - static native String rowGetJsonbValue(long pointer, int index); + static native java.sql.Date iteratorGetDateValue(long pointer, int index); - static native byte[] rowGetByteaValue(long pointer, int index); + static native String iteratorGetIntervalValue(long pointer, int index); - // TODO: object or object array? - static native Object rowGetArrayValue(long pointer, int index, Class clazz); - - // Since the underlying rust does not have garbage collection, we will have to manually call - // close on the row to release the row instance pointed by the pointer. - static native void rowClose(long pointer); - - // stream chunk iterator method - static native long streamChunkIteratorNew(byte[] streamChunkPayload); + static native String iteratorGetJsonbValue(long pointer, int index); - static native long streamChunkIteratorNext(long pointer); + static native byte[] iteratorGetByteaValue(long pointer, int index); - static native void streamChunkIteratorClose(long pointer); - - static native long streamChunkIteratorFromPretty(String str); + // TODO: object or object array? + static native Object iteratorGetArrayValue(long pointer, int index, Class clazz); public static native boolean sendCdcSourceMsgToChannel(long channelPtr, byte[] msg); diff --git a/java/java-binding/src/main/java/com/risingwave/java/binding/HummockIterator.java b/java/java-binding/src/main/java/com/risingwave/java/binding/HummockIterator.java index ced034fd649d9..cf88068ddf615 100644 --- a/java/java-binding/src/main/java/com/risingwave/java/binding/HummockIterator.java +++ b/java/java-binding/src/main/java/com/risingwave/java/binding/HummockIterator.java @@ -21,13 +21,13 @@ public class HummockIterator implements AutoCloseable { private boolean isClosed; public HummockIterator(ReadPlan readPlan) { - this.pointer = Binding.hummockIteratorNew(readPlan.toByteArray()); + this.pointer = Binding.iteratorNewHummock(readPlan.toByteArray()); this.isClosed = false; } public KeyedRow next() { - long pointer = Binding.hummockIteratorNext(this.pointer); - if (pointer == 0) { + boolean hasNext = Binding.iteratorNext(this.pointer); + if (!hasNext) { return null; } return new KeyedRow(pointer); @@ -37,7 +37,7 @@ public KeyedRow next() { public void close() { if (!isClosed) { isClosed = true; - Binding.hummockIteratorClose(pointer); + Binding.iteratorClose(pointer); } } } diff --git a/java/java-binding/src/main/java/com/risingwave/java/binding/KeyedRow.java b/java/java-binding/src/main/java/com/risingwave/java/binding/KeyedRow.java index 6bbfdaafebabc..8f1e0b0117ac4 100644 --- a/java/java-binding/src/main/java/com/risingwave/java/binding/KeyedRow.java +++ b/java/java-binding/src/main/java/com/risingwave/java/binding/KeyedRow.java @@ -20,6 +20,6 @@ public KeyedRow(long pointer) { } public byte[] getKey() { - return Binding.rowGetKey(pointer); + return Binding.iteratorGetKey(pointer); } } diff --git a/java/java-binding/src/main/java/com/risingwave/java/binding/StreamChunkIterator.java b/java/java-binding/src/main/java/com/risingwave/java/binding/StreamChunkIterator.java index 89693befff700..5b300872bed51 100644 --- a/java/java-binding/src/main/java/com/risingwave/java/binding/StreamChunkIterator.java +++ b/java/java-binding/src/main/java/com/risingwave/java/binding/StreamChunkIterator.java @@ -19,7 +19,7 @@ public class StreamChunkIterator implements AutoCloseable { private boolean isClosed; public StreamChunkIterator(byte[] streamChunkPayload) { - this.pointer = Binding.streamChunkIteratorNew(streamChunkPayload); + this.pointer = Binding.iteratorNewFromStreamChunkPayload(streamChunkPayload); this.isClosed = false; } @@ -30,13 +30,13 @@ public StreamChunkIterator(byte[] streamChunkPayload) { * 40" */ public StreamChunkIterator(String str) { - this.pointer = Binding.streamChunkIteratorFromPretty(str); + this.pointer = Binding.iteratorNewFromStreamChunkPretty(str); this.isClosed = false; } public StreamChunkRow next() { - long pointer = Binding.streamChunkIteratorNext(this.pointer); - if (pointer == 0) { + boolean hasNext = Binding.iteratorNext(this.pointer); + if (!hasNext) { return null; } return new StreamChunkRow(pointer); @@ -46,7 +46,7 @@ public StreamChunkRow next() { public void close() { if (!isClosed) { isClosed = true; - Binding.streamChunkIteratorClose(pointer); + Binding.iteratorClose(pointer); } } } diff --git a/java/java-binding/src/main/java/com/risingwave/java/binding/StreamChunkRow.java b/java/java-binding/src/main/java/com/risingwave/java/binding/StreamChunkRow.java index 401d3d98f766d..2825d62a0b0ca 100644 --- a/java/java-binding/src/main/java/com/risingwave/java/binding/StreamChunkRow.java +++ b/java/java-binding/src/main/java/com/risingwave/java/binding/StreamChunkRow.java @@ -22,6 +22,6 @@ public StreamChunkRow(long pointer) { } public Data.Op getOp() { - return Data.Op.forNumber(Binding.rowGetOp(pointer)); + return Data.Op.forNumber(Binding.iteratorGetOp(pointer)); } } diff --git a/java/pom.xml b/java/pom.xml index 28d7a688a5aef..e72e831b798e5 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -60,6 +60,8 @@ 11 11 1.0.0 + UTF-8 + UTF-8 3.21.1 1.53.0 2.10 diff --git a/src/jni_core/src/hummock_iterator.rs b/src/jni_core/src/hummock_iterator.rs index 7395a0f82273f..5917d08582998 100644 --- a/src/jni_core/src/hummock_iterator.rs +++ b/src/jni_core/src/hummock_iterator.rs @@ -48,22 +48,6 @@ fn select_all_vnode_stream( pub struct HummockJavaBindingIterator { row_serde: EitherSerde, stream: SelectAllIterStream, - pub class_cache: Arc, -} - -pub struct KeyedRow { - key: Bytes, - row: OwnedRow, -} - -impl KeyedRow { - pub fn key(&self) -> &[u8] { - self.key.as_ref() - } - - pub fn row(&self) -> &OwnedRow { - &self.row - } } impl HummockJavaBindingIterator { @@ -136,24 +120,20 @@ impl HummockJavaBindingIterator { .into() }; - Ok(Self { - row_serde, - stream, - class_cache: Default::default(), - }) + Ok(Self { row_serde, stream }) } - pub async fn next(&mut self) -> StorageResult> { + pub async fn next(&mut self) -> StorageResult> { let item = self.stream.try_next().await?; Ok(match item { - Some((key, value)) => Some(KeyedRow { - key: key.user_key.table_key.0, - row: OwnedRow::new( + Some((key, value)) => Some(( + key.user_key.table_key.0, + OwnedRow::new( self.row_serde .deserialize(&value) .map_err(StorageError::DeserializeRow)?, ), - }), + )), None => None, }) } diff --git a/src/jni_core/src/lib.rs b/src/jni_core/src/lib.rs index 3e6a12943b49e..61fe4ae8f2b1f 100644 --- a/src/jni_core/src/lib.rs +++ b/src/jni_core/src/lib.rs @@ -27,10 +27,10 @@ use std::backtrace::Backtrace; use std::marker::PhantomData; use std::ops::{Deref, DerefMut}; use std::slice::from_raw_parts; -use std::sync::{Arc, LazyLock, OnceLock}; +use std::sync::{LazyLock, OnceLock}; +use bytes::Bytes; use cfg_or_panic::cfg_or_panic; -use hummock_iterator::{HummockJavaBindingIterator, KeyedRow}; use jni::objects::{ AutoElements, GlobalRef, JByteArray, JClass, JMethodID, JObject, JStaticMethodID, JString, JValue, JValueGen, JValueOwned, ReleaseMode, @@ -50,13 +50,15 @@ use risingwave_common::util::panic::rw_catch_unwind; use risingwave_pb::connector_service::{ GetEventStreamResponse, SinkWriterStreamRequest, SinkWriterStreamResponse, }; +use risingwave_pb::data::Op; use risingwave_storage::error::StorageError; use thiserror::Error; use tokio::runtime::Runtime; use tokio::sync::mpsc::{Receiver, Sender}; +use crate::hummock_iterator::HummockJavaBindingIterator; pub use crate::jvm_runtime::register_native_method_for_jvm; -use crate::stream_chunk_iterator::{StreamChunkIterator, StreamChunkRow}; +use crate::stream_chunk_iterator::{into_iter, StreamChunkRowIterator}; pub type GetEventStreamJniSender = Sender; static RUNTIME: LazyLock = LazyLock::new(|| tokio::runtime::Runtime::new().unwrap()); @@ -147,15 +149,6 @@ impl From for Pointer<'static, T> { } } -impl Pointer<'static, T> { - fn null() -> Self { - Pointer { - pointer: 0, - _phantom: PhantomData, - } - } -} - impl<'a, T> Pointer<'a, T> { fn as_ref(&self) -> &'a T { debug_assert!(self.pointer != 0); @@ -232,12 +225,8 @@ where } } -pub enum JavaBindingRowInner { - Keyed(KeyedRow), - StreamChunk(StreamChunkRow), -} #[derive(Default)] -pub struct JavaClassMethodCache { +struct JavaClassMethodCache { big_decimal_ctor: OnceLock<(GlobalRef, JMethodID)>, timestamp_ctor: OnceLock<(GlobalRef, JMethodID)>, @@ -245,177 +234,209 @@ pub struct JavaClassMethodCache { time_ctor: OnceLock<(GlobalRef, JStaticMethodID)>, } -pub struct JavaBindingRow { - inner: JavaBindingRowInner, - class_cache: Arc, +enum JavaBindingIteratorInner { + Hummock(HummockJavaBindingIterator), + StreamChunk(StreamChunkRowIterator), } -impl JavaBindingRow { - fn with_stream_chunk( - underlying: StreamChunkRow, - class_cache: Arc, - ) -> Self { - Self { - inner: JavaBindingRowInner::StreamChunk(underlying), - class_cache, - } - } +enum RowExtra { + Op(Op), + Key(Bytes), +} - fn with_keyed(underlying: KeyedRow, class_cache: Arc) -> Self { - Self { - inner: JavaBindingRowInner::Keyed(underlying), - class_cache, +impl RowExtra { + fn as_op(&self) -> Op { + match self { + RowExtra::Op(op) => *op, + RowExtra::Key(_) => unreachable!("should be op"), } } - fn as_keyed(&self) -> &KeyedRow { - match &self.inner { - JavaBindingRowInner::Keyed(r) => r, - _ => unreachable!("can only call as_keyed for KeyedRow"), + fn as_key(&self) -> &Bytes { + match self { + RowExtra::Key(key) => key, + RowExtra::Op(_) => unreachable!("should be key"), } } +} - fn as_stream_chunk(&self) -> &StreamChunkRow { - match &self.inner { - JavaBindingRowInner::StreamChunk(r) => r, - _ => unreachable!("can only call as_stream_chunk for StreamChunkRow"), - } - } +struct RowCursor { + row: OwnedRow, + extra: RowExtra, } -impl Deref for JavaBindingRow { +struct JavaBindingIterator { + inner: JavaBindingIteratorInner, + cursor: Option, + class_cache: JavaClassMethodCache, +} + +impl Deref for JavaBindingIterator { type Target = OwnedRow; fn deref(&self) -> &Self::Target { - match &self.inner { - JavaBindingRowInner::Keyed(r) => r.row(), - JavaBindingRowInner::StreamChunk(r) => r.row(), - } + &self + .cursor + .as_ref() + .expect("should exist when call row methods") + .row } } #[no_mangle] -pub extern "system" fn Java_com_risingwave_java_binding_Binding_vnodeCount( - _env: EnvParam<'_>, -) -> jint { +extern "system" fn Java_com_risingwave_java_binding_Binding_vnodeCount(_env: EnvParam<'_>) -> jint { VirtualNode::COUNT as jint } #[cfg_or_panic(not(madsim))] #[no_mangle] -pub extern "system" fn Java_com_risingwave_java_binding_Binding_hummockIteratorNew<'a>( +extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorNewHummock<'a>( env: EnvParam<'a>, read_plan: JByteArray<'a>, -) -> Pointer<'static, HummockJavaBindingIterator> { +) -> Pointer<'static, JavaBindingIterator> { execute_and_catch(env, move |env| { let read_plan = Message::decode(to_guarded_slice(&read_plan, env)?.deref())?; let iter = RUNTIME.block_on(HummockJavaBindingIterator::new(read_plan))?; + let iter = JavaBindingIterator { + inner: JavaBindingIteratorInner::Hummock(iter), + cursor: None, + class_cache: Default::default(), + }; Ok(iter.into()) }) } #[cfg_or_panic(not(madsim))] #[no_mangle] -pub extern "system" fn Java_com_risingwave_java_binding_Binding_hummockIteratorNext<'a>( +extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorNext<'a>( env: EnvParam<'a>, - mut pointer: Pointer<'a, HummockJavaBindingIterator>, -) -> Pointer<'static, JavaBindingRow> { + mut pointer: Pointer<'a, JavaBindingIterator>, +) -> jboolean { execute_and_catch(env, move |_env| { let iter = pointer.as_mut(); - match RUNTIME.block_on(iter.next())? { - None => Ok(Pointer::null()), - Some(row) => Ok(JavaBindingRow::with_keyed(row, iter.class_cache.clone()).into()), + match &mut iter.inner { + JavaBindingIteratorInner::Hummock(ref mut hummock_iter) => { + match RUNTIME.block_on(hummock_iter.next())? { + None => { + iter.cursor = None; + Ok(JNI_FALSE) + } + Some((key, row)) => { + iter.cursor = Some(RowCursor { + row, + extra: RowExtra::Key(key), + }); + Ok(JNI_TRUE) + } + } + } + JavaBindingIteratorInner::StreamChunk(ref mut stream_chunk_iter) => { + match stream_chunk_iter.next() { + None => { + iter.cursor = None; + Ok(JNI_FALSE) + } + Some((op, row)) => { + iter.cursor = Some(RowCursor { + row, + extra: RowExtra::Op(op), + }); + Ok(JNI_TRUE) + } + } + } } }) } #[no_mangle] -pub extern "system" fn Java_com_risingwave_java_binding_Binding_hummockIteratorClose( - _env: EnvParam<'_>, - pointer: Pointer<'_, HummockJavaBindingIterator>, +extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorClose<'a>( + _env: EnvParam<'a>, + pointer: Pointer<'a, JavaBindingIterator>, ) { - pointer.drop(); + pointer.drop() } #[no_mangle] -pub extern "system" fn Java_com_risingwave_java_binding_Binding_streamChunkIteratorNew<'a>( +extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorNewFromStreamChunkPayload< + 'a, +>( env: EnvParam<'a>, stream_chunk_payload: JByteArray<'a>, -) -> Pointer<'static, StreamChunkIterator> { +) -> Pointer<'static, JavaBindingIterator> { execute_and_catch(env, move |env| { let prost_stream_chumk = Message::decode(to_guarded_slice(&stream_chunk_payload, env)?.deref())?; - let iter = StreamChunkIterator::new(StreamChunk::from_protobuf(&prost_stream_chumk)?); + let iter = into_iter(StreamChunk::from_protobuf(&prost_stream_chumk)?); + let iter = JavaBindingIterator { + inner: JavaBindingIteratorInner::StreamChunk(iter), + cursor: None, + class_cache: Default::default(), + }; Ok(iter.into()) }) } #[no_mangle] -pub extern "system" fn Java_com_risingwave_java_binding_Binding_streamChunkIteratorFromPretty< - 'a, ->( +extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorNewFromStreamChunkPretty<'a>( env: EnvParam<'a>, str: JString<'a>, -) -> Pointer<'static, StreamChunkIterator> { +) -> Pointer<'static, JavaBindingIterator> { execute_and_catch(env, move |env: &mut EnvParam<'_>| { - let iter = StreamChunkIterator::new(StreamChunk::from_pretty( + let iter = into_iter(StreamChunk::from_pretty( env.get_string(&str) .expect("cannot get java string") .to_str() .unwrap(), )); + let iter = JavaBindingIterator { + inner: JavaBindingIteratorInner::StreamChunk(iter), + cursor: None, + class_cache: Default::default(), + }; Ok(iter.into()) }) } #[no_mangle] -pub extern "system" fn Java_com_risingwave_java_binding_Binding_streamChunkIteratorNext<'a>( - env: EnvParam<'a>, - mut pointer: Pointer<'a, StreamChunkIterator>, -) -> Pointer<'static, JavaBindingRow> { - execute_and_catch(env, move |_env| { - let iter = pointer.as_mut(); - match iter.next() { - None => Ok(Pointer::null()), - Some(row) => { - Ok(JavaBindingRow::with_stream_chunk(row, iter.class_cache.clone()).into()) - } - } - }) -} - -#[no_mangle] -pub extern "system" fn Java_com_risingwave_java_binding_Binding_streamChunkIteratorClose( - _env: EnvParam<'_>, - pointer: Pointer<'_, StreamChunkIterator>, -) { - pointer.drop(); -} - -#[no_mangle] -pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetKey<'a>( +extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetKey<'a>( env: EnvParam<'a>, - pointer: Pointer<'a, JavaBindingRow>, + pointer: Pointer<'a, JavaBindingIterator>, ) -> JByteArray<'a> { execute_and_catch(env, move |env: &mut EnvParam<'_>| { - Ok(env.byte_array_from_slice(pointer.as_ref().as_keyed().key())?) + Ok(env.byte_array_from_slice( + pointer + .as_ref() + .cursor + .as_ref() + .expect("should exists when call get key") + .extra + .as_key() + .as_ref(), + )?) }) } #[no_mangle] -pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetOp<'a>( +extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetOp<'a>( env: EnvParam<'a>, - pointer: Pointer<'a, JavaBindingRow>, + pointer: Pointer<'a, JavaBindingIterator>, ) -> jint { execute_and_catch(env, move |_env| { - Ok(pointer.as_ref().as_stream_chunk().op() as jint) + Ok(pointer + .as_ref() + .cursor + .as_ref() + .expect("should exist when call get op") + .extra + .as_op() as jint) }) } #[no_mangle] -pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowIsNull<'a>( +extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorIsNull<'a>( env: EnvParam<'a>, - pointer: Pointer<'a, JavaBindingRow>, + pointer: Pointer<'a, JavaBindingIterator>, idx: jint, ) -> jboolean { execute_and_catch(env, move |_env| { @@ -424,9 +445,9 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowIsNull<'a>( } #[no_mangle] -pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetInt16Value<'a>( +extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetInt16Value<'a>( env: EnvParam<'a>, - pointer: Pointer<'a, JavaBindingRow>, + pointer: Pointer<'a, JavaBindingIterator>, idx: jint, ) -> jshort { execute_and_catch(env, move |_env| { @@ -439,9 +460,9 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetInt16Value } #[no_mangle] -pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetInt32Value<'a>( +extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetInt32Value<'a>( env: EnvParam<'a>, - pointer: Pointer<'a, JavaBindingRow>, + pointer: Pointer<'a, JavaBindingIterator>, idx: jint, ) -> jint { execute_and_catch(env, move |_env| { @@ -454,9 +475,9 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetInt32Value } #[no_mangle] -pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetInt64Value<'a>( +extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetInt64Value<'a>( env: EnvParam<'a>, - pointer: Pointer<'a, JavaBindingRow>, + pointer: Pointer<'a, JavaBindingIterator>, idx: jint, ) -> jlong { execute_and_catch(env, move |_env| { @@ -469,9 +490,9 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetInt64Value } #[no_mangle] -pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetFloatValue<'a>( +extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetFloatValue<'a>( env: EnvParam<'a>, - pointer: Pointer<'a, JavaBindingRow>, + pointer: Pointer<'a, JavaBindingIterator>, idx: jint, ) -> jfloat { execute_and_catch(env, move |_env| { @@ -485,9 +506,9 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetFloatValue } #[no_mangle] -pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetDoubleValue<'a>( +extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetDoubleValue<'a>( env: EnvParam<'a>, - pointer: Pointer<'a, JavaBindingRow>, + pointer: Pointer<'a, JavaBindingIterator>, idx: jint, ) -> jdouble { execute_and_catch(env, move |_env| { @@ -501,9 +522,9 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetDoubleValu } #[no_mangle] -pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetBooleanValue<'a>( +extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetBooleanValue<'a>( env: EnvParam<'a>, - pointer: Pointer<'a, JavaBindingRow>, + pointer: Pointer<'a, JavaBindingIterator>, idx: jint, ) -> jboolean { execute_and_catch(env, move |_env| { @@ -512,9 +533,9 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetBooleanVal } #[no_mangle] -pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetStringValue<'a>( +extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetStringValue<'a>( env: EnvParam<'a>, - pointer: Pointer<'a, JavaBindingRow>, + pointer: Pointer<'a, JavaBindingIterator>, idx: jint, ) -> JString<'a> { execute_and_catch(env, move |env: &mut EnvParam<'a>| { @@ -523,9 +544,9 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetStringValu } #[no_mangle] -pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetIntervalValue<'a>( +extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetIntervalValue<'a>( env: EnvParam<'a>, - pointer: Pointer<'a, JavaBindingRow>, + pointer: Pointer<'a, JavaBindingIterator>, idx: jint, ) -> JString<'a> { execute_and_catch(env, move |env: &mut EnvParam<'a>| { @@ -540,9 +561,9 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetIntervalVa } #[no_mangle] -pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetJsonbValue<'a>( +extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetJsonbValue<'a>( env: EnvParam<'a>, - pointer: Pointer<'a, JavaBindingRow>, + pointer: Pointer<'a, JavaBindingIterator>, idx: jint, ) -> JString<'a> { execute_and_catch(env, move |env: &mut EnvParam<'_>| { @@ -557,9 +578,9 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetJsonbValue } #[no_mangle] -pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetTimestampValue<'a>( +extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetTimestampValue<'a>( env: EnvParam<'a>, - pointer: Pointer<'a, JavaBindingRow>, + pointer: Pointer<'a, JavaBindingIterator>, idx: jint, ) -> JObject<'a> { execute_and_catch(env, move |env: &mut EnvParam<'_>| { @@ -589,9 +610,9 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetTimestampV } #[no_mangle] -pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetDecimalValue<'a>( +extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetDecimalValue<'a>( env: EnvParam<'a>, - pointer: Pointer<'a, JavaBindingRow>, + pointer: Pointer<'a, JavaBindingIterator>, idx: jint, ) -> JObject<'a> { execute_and_catch(env, move |env: &mut EnvParam<'_>| { @@ -626,9 +647,9 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetDecimalVal } #[no_mangle] -pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetDateValue<'a>( +extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetDateValue<'a>( env: EnvParam<'a>, - pointer: Pointer<'a, JavaBindingRow>, + pointer: Pointer<'a, JavaBindingIterator>, idx: jint, ) -> JObject<'a> { execute_and_catch(env, move |env: &mut EnvParam<'_>| { @@ -672,9 +693,9 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetDateValue< } #[no_mangle] -pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetTimeValue<'a>( +extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetTimeValue<'a>( env: EnvParam<'a>, - pointer: Pointer<'a, JavaBindingRow>, + pointer: Pointer<'a, JavaBindingIterator>, idx: jint, ) -> JObject<'a> { execute_and_catch(env, move |env: &mut EnvParam<'_>| { @@ -718,9 +739,9 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetTimeValue< } #[no_mangle] -pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetByteaValue<'a>( +extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetByteaValue<'a>( env: EnvParam<'a>, - pointer: Pointer<'a, JavaBindingRow>, + pointer: Pointer<'a, JavaBindingIterator>, idx: jint, ) -> JByteArray<'a> { execute_and_catch(env, move |env: &mut EnvParam<'_>| { @@ -734,9 +755,9 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetByteaValue } #[no_mangle] -pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetArrayValue<'a>( +extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorGetArrayValue<'a>( env: EnvParam<'a>, - pointer: Pointer<'a, JavaBindingRow>, + pointer: Pointer<'a, JavaBindingIterator>, idx: jint, class: JClass<'a>, ) -> JObject<'a> { @@ -824,20 +845,12 @@ pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowGetArrayValue }) } -#[no_mangle] -pub extern "system" fn Java_com_risingwave_java_binding_Binding_rowClose<'a>( - _env: EnvParam<'a>, - pointer: Pointer<'a, JavaBindingRow>, -) { - pointer.drop() -} - /// Send messages to the channel received by `CdcSplitReader`. /// If msg is null, just check whether the channel is closed. /// Return true if sending is successful, otherwise, return false so that caller can stop /// gracefully. #[no_mangle] -pub extern "system" fn Java_com_risingwave_java_binding_Binding_sendCdcSourceMsgToChannel<'a>( +extern "system" fn Java_com_risingwave_java_binding_Binding_sendCdcSourceMsgToChannel<'a>( env: EnvParam<'a>, channel: Pointer<'a, GetEventStreamJniSender>, msg: JByteArray<'a>, @@ -910,7 +923,7 @@ mod tests { use risingwave_common::types::Timestamptz; /// make sure that the [`ScalarRefImpl::Int64`] received by - /// [`Java_com_risingwave_java_binding_Binding_rowGetTimestampValue`] + /// [`Java_com_risingwave_java_binding_Binding_iteratorGetTimestampValue`] /// is of type [`DataType::Timestamptz`] stored in microseconds #[test] fn test_timestamptz_to_i64() { diff --git a/src/jni_core/src/macros.rs b/src/jni_core/src/macros.rs index a4fe12f669193..be476c146d756 100644 --- a/src/jni_core/src/macros.rs +++ b/src/jni_core/src/macros.rs @@ -106,65 +106,52 @@ macro_rules! for_all_plain_native_methods { // hummock iterator method // Return a pointer to the iterator - static native long hummockIteratorNew(byte[] readPlan); + static native long iteratorNewHummock(byte[] readPlan); - // return a pointer to the next row - static native long hummockIteratorNext(long pointer); + static native boolean iteratorNext(long pointer); - // Since the underlying rust does not have garbage collection, we will have to manually call - // close on the iterator to release the iterator instance pointed by the pointer. - static native void hummockIteratorClose(long pointer); + static native void iteratorClose(long pointer); - // row method - static native byte[] rowGetKey(long pointer); + static native long iteratorNewFromStreamChunkPayload(byte[] streamChunkPayload); - static native int rowGetOp(long pointer); + static native long iteratorNewFromStreamChunkPretty(String str); - static native boolean rowIsNull(long pointer, int index); + static native byte[] iteratorGetKey(long pointer); - static native short rowGetInt16Value(long pointer, int index); + static native int iteratorGetOp(long pointer); - static native int rowGetInt32Value(long pointer, int index); + static native boolean iteratorIsNull(long pointer, int index); - static native long rowGetInt64Value(long pointer, int index); + static native short iteratorGetInt16Value(long pointer, int index); - static native float rowGetFloatValue(long pointer, int index); + static native int iteratorGetInt32Value(long pointer, int index); - static native double rowGetDoubleValue(long pointer, int index); + static native long iteratorGetInt64Value(long pointer, int index); - static native boolean rowGetBooleanValue(long pointer, int index); + static native float iteratorGetFloatValue(long pointer, int index); - static native String rowGetStringValue(long pointer, int index); + static native double iteratorGetDoubleValue(long pointer, int index); - static native java.sql.Timestamp rowGetTimestampValue(long pointer, int index); + static native boolean iteratorGetBooleanValue(long pointer, int index); - static native java.math.BigDecimal rowGetDecimalValue(long pointer, int index); + static native String iteratorGetStringValue(long pointer, int index); - static native java.sql.Time rowGetTimeValue(long pointer, int index); + static native java.sql.Timestamp iteratorGetTimestampValue(long pointer, int index); - static native java.sql.Date rowGetDateValue(long pointer, int index); + static native java.math.BigDecimal iteratorGetDecimalValue(long pointer, int index); - static native String rowGetIntervalValue(long pointer, int index); + static native java.sql.Time iteratorGetTimeValue(long pointer, int index); - static native String rowGetJsonbValue(long pointer, int index); + static native java.sql.Date iteratorGetDateValue(long pointer, int index); - static native byte[] rowGetByteaValue(long pointer, int index); + static native String iteratorGetIntervalValue(long pointer, int index); - // TODO: object or object array? - static native Object rowGetArrayValue(long pointer, int index, Class clazz); - - // Since the underlying rust does not have garbage collection, we will have to manually call - // close on the row to release the row instance pointed by the pointer. - static native void rowClose(long pointer); - - // stream chunk iterator method - static native long streamChunkIteratorNew(byte[] streamChunkPayload); + static native String iteratorGetJsonbValue(long pointer, int index); - static native long streamChunkIteratorNext(long pointer); + static native byte[] iteratorGetByteaValue(long pointer, int index); - static native void streamChunkIteratorClose(long pointer); - - static native long streamChunkIteratorFromPretty(String str); + // TODO: object or object array? + static native Object iteratorGetArrayValue(long pointer, int index, Class clazz); public static native boolean sendCdcSourceMsgToChannel(long channelPtr, byte[] msg); diff --git a/src/jni_core/src/stream_chunk_iterator.rs b/src/jni_core/src/stream_chunk_iterator.rs index d62117a0aa108..49d096d30339e 100644 --- a/src/jni_core/src/stream_chunk_iterator.rs +++ b/src/jni_core/src/stream_chunk_iterator.rs @@ -12,51 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - use itertools::Itertools; use risingwave_common::array::StreamChunk; use risingwave_common::row::{OwnedRow, Row}; use risingwave_pb::data::Op; -pub struct StreamChunkRow { - op: Op, - row: OwnedRow, -} - -impl StreamChunkRow { - pub fn op(&self) -> Op { - self.op - } - - pub fn row(&self) -> &OwnedRow { - &self.row - } -} - -type StreamChunkRowIterator = impl Iterator + 'static; - -pub struct StreamChunkIterator { - iter: StreamChunkRowIterator, - pub class_cache: Arc, -} - -impl StreamChunkIterator { - pub(crate) fn new(stream_chunk: StreamChunk) -> Self { - Self { - iter: stream_chunk - .rows() - .map(|(op, row_ref)| StreamChunkRow { - op: op.to_protobuf(), - row: row_ref.to_owned_row(), - }) - .collect_vec() - .into_iter(), - class_cache: Default::default(), - } - } +pub(crate) type StreamChunkRowIterator = impl Iterator + 'static; - pub(crate) fn next(&mut self) -> Option { - self.iter.next() - } +pub(crate) fn into_iter(stream_chunk: StreamChunk) -> StreamChunkRowIterator { + stream_chunk + .rows() + .map(|(op, row_ref)| (op.to_protobuf(), row_ref.to_owned_row())) + .collect_vec() + .into_iter() }