feat(java-binding): store java binding row in iter (#12533) (#12727)
Co-authored-by: William Wen <[email protected]>
github-actions[bot] and wenym1 authored Oct 10, 2023
1 parent fdaedaf commit a2a9886
Showing 23 changed files with 410 additions and 526 deletions.
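
The hunks below share one theme: SinkRow stops extending AutoCloseable, and ownership of the underlying binding row moves into the iterator that produced it, so sink writers iterate plainly instead of wrapping every row in try-with-resources. A minimal, self-contained sketch of that ownership model (hypothetical names, not the actual java-binding classes):

// Sketch only: hypothetical stand-ins for the binding's iterator and row.
import java.util.Iterator;
import java.util.List;

public class RowOwnershipSketch {

    // Mirrors the slimmed-down interface: no AutoCloseable, no close().
    interface SinkRow {
        Object get(int index);
        int size();
    }

    // The iterator, not the row, is closeable, and it stores the row it
    // most recently handed out ("store java binding row in iter").
    static final class OwningIterator implements Iterator<SinkRow>, AutoCloseable {
        private final Iterator<SinkRow> inner;
        private SinkRow row;

        OwningIterator(Iterator<SinkRow> inner) {
            this.inner = inner;
        }

        @Override
        public boolean hasNext() {
            return inner.hasNext();
        }

        @Override
        public SinkRow next() {
            // The previously returned row is invalidated here; a real binding
            // would release its native state before fetching the next one.
            row = inner.next();
            return row;
        }

        @Override
        public void close() {
            // A real binding would free the native iterator and current row.
            row = null;
        }
    }

    public static void main(String[] args) {
        List<SinkRow> data = List.of(constant(1), constant(2));
        try (OwningIterator rows = new OwningIterator(data.iterator())) {
            while (rows.hasNext()) {
                SinkRow row = rows.next(); // plain read, no per-row close
                System.out.println(row.get(0));
            }
        } // closing the iterator releases whatever the current row held
    }

    private static SinkRow constant(int value) {
        return new SinkRow() {
            @Override
            public Object get(int index) {
                return value;
            }

            @Override
            public int size() {
                return 1;
            }
        };
    }
}

One pay-off shows up in every writer below: since close() no longer exists to throw a checked Exception, the catch-all wrapper around each row disappears.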
@@ -39,7 +39,4 @@ public Data.Op getOp() {
     public int size() {
         return values.length;
     }
-
-    @Override
-    public void close() throws Exception {}
 }
@@ -16,7 +16,7 @@
 
 import com.risingwave.proto.Data;
 
-public interface SinkRow extends AutoCloseable {
+public interface SinkRow {
     Object get(int index);
 
     Data.Op getOp();
@@ -60,28 +60,25 @@ public FileSink(FileSinkConfig config, TableSchema tableSchema) {
     @Override
     public void write(Iterator<SinkRow> rows) {
         while (rows.hasNext()) {
-            try (SinkRow row = rows.next()) {
-                switch (row.getOp()) {
-                    case INSERT:
-                        String buf =
-                                new Gson()
-                                        .toJson(
-                                                IntStream.range(0, row.size())
-                                                        .mapToObj(row::get)
-                                                        .toArray());
-                        try {
-                            sinkWriter.write(buf + System.lineSeparator());
-                        } catch (IOException e) {
-                            throw INTERNAL.withCause(e).asRuntimeException();
-                        }
-                        break;
-                    default:
-                        throw UNIMPLEMENTED
-                                .withDescription("unsupported operation: " + row.getOp())
-                                .asRuntimeException();
-                }
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
+            SinkRow row = rows.next();
+            switch (row.getOp()) {
+                case INSERT:
+                    String buf =
+                            new Gson()
+                                    .toJson(
+                                            IntStream.range(0, row.size())
+                                                    .mapToObj(row::get)
+                                                    .toArray());
+                    try {
+                        sinkWriter.write(buf + System.lineSeparator());
+                    } catch (IOException e) {
+                        throw INTERNAL.withCause(e).asRuntimeException();
+                    }
+                    break;
+                default:
+                    throw UNIMPLEMENTED
+                            .withDescription("unsupported operation: " + row.getOp())
+                            .asRuntimeException();
+            }
         }
     }
@@ -251,14 +251,12 @@ public CloseableIterator<SinkRow> deserialize(
 
     static class StreamChunkRowWrapper implements SinkRow {
 
-        private boolean isClosed;
         private final StreamChunkRow inner;
         private final ValueGetter[] valueGetters;
 
         StreamChunkRowWrapper(StreamChunkRow inner, ValueGetter[] valueGetters) {
            this.inner = inner;
            this.valueGetters = valueGetters;
-           this.isClosed = false;
         }
 
         @Override
@@ -275,14 +273,6 @@ public Data.Op getOp() {
         public int size() {
             return valueGetters.length;
         }
-
-        @Override
-        public void close() {
-            if (!isClosed) {
-                this.isClosed = true;
-                inner.close();
-            }
-        }
     }
 
     static class StreamChunkIteratorWrapper implements CloseableIterator<SinkRow> {
@@ -299,13 +289,6 @@ public StreamChunkIteratorWrapper(StreamChunkIterator iter, ValueGetter[] valueG
         @Override
         public void close() {
             iter.close();
-            try {
-                if (row != null) {
-                    row.close();
-                }
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
         }
 
         @Override
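
The wrapper's next() is outside this excerpt, but given the retained row field it is presumably where each freshly wrapped row gets stored in the iterator, which is the "store java binding row in iter" of the commit title. A hedged sketch of what that could look like (the shape of the iter.next() call is an assumption, not the actual binding API):

        @Override
        public SinkRow next() {
            // Assumption: the native iterator yields the next StreamChunkRow.
            // Keeping the wrapper in 'row' ties its lifetime to this iterator,
            // replacing the per-row close() that this diff removes.
            row = new StreamChunkRowWrapper(iter.next(), valueGetters);
            return row;
        }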
@@ -75,27 +75,24 @@ public void write(Iterator<SinkRow> rows) {
             }
         }
         while (rows.hasNext()) {
-            try (SinkRow row = rows.next()) {
-                switch (row.getOp()) {
-                    case INSERT:
-                        GenericRecord record = new GenericData.Record(this.sinkSchema);
-                        for (int i = 0; i < this.sinkSchema.getFields().size(); i++) {
-                            record.put(i, row.get(i));
-                        }
-                        try {
-                            this.parquetWriter.write(record);
-                            this.numOutputRows += 1;
-                        } catch (IOException ioException) {
-                            throw INTERNAL.withCause(ioException).asRuntimeException();
-                        }
-                        break;
-                    default:
-                        throw UNIMPLEMENTED
-                                .withDescription("unsupported operation: " + row.getOp())
-                                .asRuntimeException();
-                }
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
+            SinkRow row = rows.next();
+            switch (row.getOp()) {
+                case INSERT:
+                    GenericRecord record = new GenericData.Record(this.sinkSchema);
+                    for (int i = 0; i < this.sinkSchema.getFields().size(); i++) {
+                        record.put(i, row.get(i));
+                    }
+                    try {
+                        this.parquetWriter.write(record);
+                        this.numOutputRows += 1;
+                    } catch (IOException ioException) {
+                        throw INTERNAL.withCause(ioException).asRuntimeException();
+                    }
+                    break;
+                default:
+                    throw UNIMPLEMENTED
+                            .withDescription("unsupported operation: " + row.getOp())
+                            .asRuntimeException();
+            }
         }
     }
@@ -276,10 +276,11 @@ private void writeRow(SinkRow row) throws JsonMappingException, JsonProcessingEx
     @Override
     public void write(Iterator<SinkRow> rows) {
         while (rows.hasNext()) {
-            try (SinkRow row = rows.next()) {
+            SinkRow row = rows.next();
+            try {
                 writeRow(row);
-            } catch (Exception e) {
-                throw new RuntimeException(e);
+            } catch (Exception ex) {
+                throw new RuntimeException(ex);
             }
         }
     }
@@ -55,62 +55,57 @@ public AppendOnlyIcebergSinkWriter(
     @Override
     public void write(Iterator<SinkRow> rows) {
         while (rows.hasNext()) {
-            try (SinkRow row = rows.next()) {
-                switch (row.getOp()) {
-                    case INSERT:
-                        Record record = GenericRecord.create(rowSchema);
-                        if (row.size() != tableSchema.getColumnNames().length) {
-                            throw INTERNAL.withDescription("row values do not match table schema")
-                                    .asRuntimeException();
-                        }
-                        for (int i = 0; i < rowSchema.columns().size(); i++) {
-                            record.set(i, row.get(i));
-                        }
-                        PartitionKey partitionKey =
-                                new PartitionKey(icebergTable.spec(), icebergTable.schema());
-                        partitionKey.partition(record);
-                        DataWriter<Record> dataWriter;
-                        if (dataWriterMap.containsKey(partitionKey)) {
-                            dataWriter = dataWriterMap.get(partitionKey);
-                        } else {
-                            try {
-                                String filename =
-                                        fileFormat.addExtension(UUID.randomUUID().toString());
-                                OutputFile outputFile =
-                                        icebergTable
-                                                .io()
-                                                .newOutputFile(
-                                                        icebergTable.location()
-                                                                + "/data/"
-                                                                + icebergTable
-                                                                        .spec()
-                                                                        .partitionToPath(
-                                                                                partitionKey)
-                                                                + "/"
-                                                                + filename);
-                                dataWriter =
-                                        Parquet.writeData(outputFile)
-                                                .schema(rowSchema)
-                                                .withSpec(icebergTable.spec())
-                                                .withPartition(partitionKey)
-                                                .createWriterFunc(GenericParquetWriter::buildWriter)
-                                                .overwrite()
-                                                .build();
-                            } catch (Exception e) {
-                                throw INTERNAL.withDescription("failed to create dataWriter")
-                                        .asRuntimeException();
-                            }
-                            dataWriterMap.put(partitionKey, dataWriter);
-                        }
-                        dataWriter.write(record);
-                        break;
-                    default:
-                        throw UNIMPLEMENTED
-                                .withDescription("unsupported operation: " + row.getOp())
-                                .asRuntimeException();
-                }
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
+            SinkRow row = rows.next();
+            switch (row.getOp()) {
+                case INSERT:
+                    Record record = GenericRecord.create(rowSchema);
+                    if (row.size() != tableSchema.getColumnNames().length) {
+                        throw INTERNAL.withDescription("row values do not match table schema")
+                                .asRuntimeException();
+                    }
+                    for (int i = 0; i < rowSchema.columns().size(); i++) {
+                        record.set(i, row.get(i));
+                    }
+                    PartitionKey partitionKey =
+                            new PartitionKey(icebergTable.spec(), icebergTable.schema());
+                    partitionKey.partition(record);
+                    DataWriter<Record> dataWriter;
+                    if (dataWriterMap.containsKey(partitionKey)) {
+                        dataWriter = dataWriterMap.get(partitionKey);
+                    } else {
+                        try {
+                            String filename = fileFormat.addExtension(UUID.randomUUID().toString());
+                            OutputFile outputFile =
+                                    icebergTable
+                                            .io()
+                                            .newOutputFile(
+                                                    icebergTable.location()
+                                                            + "/data/"
+                                                            + icebergTable
+                                                                    .spec()
+                                                                    .partitionToPath(partitionKey)
+                                                            + "/"
+                                                            + filename);
+                            dataWriter =
+                                    Parquet.writeData(outputFile)
+                                            .schema(rowSchema)
+                                            .withSpec(icebergTable.spec())
+                                            .withPartition(partitionKey)
+                                            .createWriterFunc(GenericParquetWriter::buildWriter)
+                                            .overwrite()
+                                            .build();
+                        } catch (Exception e) {
+                            throw INTERNAL.withDescription("failed to create dataWriter")
+                                    .asRuntimeException();
+                        }
+                        dataWriterMap.put(partitionKey, dataWriter);
+                    }
+                    dataWriter.write(record);
+                    break;
+                default:
+                    throw UNIMPLEMENTED
+                            .withDescription("unsupported operation: " + row.getOp())
+                            .asRuntimeException();
+            }
         }
     }
@@ -142,57 +142,52 @@ private List<Comparable<Object>> getKeyFromRow(SinkRow row) {
     @Override
     public void write(Iterator<SinkRow> rows) {
         while (rows.hasNext()) {
-            try (SinkRow row = rows.next()) {
-                if (row.size() != tableSchema.getColumnNames().length) {
-                    throw Status.FAILED_PRECONDITION
-                            .withDescription("row values do not match table schema")
-                            .asRuntimeException();
-                }
-                Record record = newRecord(rowSchema, row);
-                PartitionKey partitionKey =
-                        new PartitionKey(icebergTable.spec(), icebergTable.schema());
-                partitionKey.partition(record);
-                SinkRowMap sinkRowMap;
-                if (sinkRowMapByPartition.containsKey(partitionKey)) {
-                    sinkRowMap = sinkRowMapByPartition.get(partitionKey);
-                } else {
-                    sinkRowMap = new SinkRowMap();
-                    sinkRowMapByPartition.put(partitionKey, sinkRowMap);
-                }
-                switch (row.getOp()) {
-                    case INSERT:
-                        sinkRowMap.insert(getKeyFromRow(row), newRecord(rowSchema, row));
-                        break;
-                    case DELETE:
-                        sinkRowMap.delete(getKeyFromRow(row), newRecord(deleteRowSchema, row));
-                        break;
-                    case UPDATE_DELETE:
-                        if (updateBufferExists) {
-                            throw Status.FAILED_PRECONDITION
-                                    .withDescription(
-                                            "an UPDATE_INSERT should precede an UPDATE_DELETE")
-                                    .asRuntimeException();
-                        }
-                        sinkRowMap.delete(getKeyFromRow(row), newRecord(deleteRowSchema, row));
-                        updateBufferExists = true;
-                        break;
-                    case UPDATE_INSERT:
-                        if (!updateBufferExists) {
-                            throw Status.FAILED_PRECONDITION
-                                    .withDescription(
-                                            "an UPDATE_INSERT should precede an UPDATE_DELETE")
-                                    .asRuntimeException();
-                        }
-                        sinkRowMap.insert(getKeyFromRow(row), newRecord(rowSchema, row));
-                        updateBufferExists = false;
-                        break;
-                    default:
-                        throw UNIMPLEMENTED
-                                .withDescription("unsupported operation: " + row.getOp())
-                                .asRuntimeException();
-                }
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
+            SinkRow row = rows.next();
+            if (row.size() != tableSchema.getColumnNames().length) {
+                throw Status.FAILED_PRECONDITION
+                        .withDescription("row values do not match table schema")
+                        .asRuntimeException();
+            }
+            Record record = newRecord(rowSchema, row);
+            PartitionKey partitionKey =
+                    new PartitionKey(icebergTable.spec(), icebergTable.schema());
+            partitionKey.partition(record);
+            SinkRowMap sinkRowMap;
+            if (sinkRowMapByPartition.containsKey(partitionKey)) {
+                sinkRowMap = sinkRowMapByPartition.get(partitionKey);
+            } else {
+                sinkRowMap = new SinkRowMap();
+                sinkRowMapByPartition.put(partitionKey, sinkRowMap);
+            }
+            switch (row.getOp()) {
+                case INSERT:
+                    sinkRowMap.insert(getKeyFromRow(row), newRecord(rowSchema, row));
+                    break;
+                case DELETE:
+                    sinkRowMap.delete(getKeyFromRow(row), newRecord(deleteRowSchema, row));
+                    break;
+                case UPDATE_DELETE:
+                    if (updateBufferExists) {
+                        throw Status.FAILED_PRECONDITION
+                                .withDescription("an UPDATE_INSERT should precede an UPDATE_DELETE")
+                                .asRuntimeException();
+                    }
+                    sinkRowMap.delete(getKeyFromRow(row), newRecord(deleteRowSchema, row));
+                    updateBufferExists = true;
+                    break;
+                case UPDATE_INSERT:
+                    if (!updateBufferExists) {
+                        throw Status.FAILED_PRECONDITION
+                                .withDescription("an UPDATE_INSERT should precede an UPDATE_DELETE")
+                                .asRuntimeException();
+                    }
+                    sinkRowMap.insert(getKeyFromRow(row), newRecord(rowSchema, row));
+                    updateBufferExists = false;
+                    break;
+                default:
+                    throw UNIMPLEMENTED
+                            .withDescription("unsupported operation: " + row.getOp())
+                            .asRuntimeException();
+            }
        }
    }
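
A consequence visible in this last hunk: a row handed out by the iterator can only be assumed valid until the next call, so anything buffered across iterations (here the upsert writer's sinkRowMap) is populated with copies. Assuming newRecord copies the row's values into a fresh Iceberg Record, which the buffering suggests, the safe pattern is:

        while (rows.hasNext()) {
            SinkRow row = rows.next();
            // Copy values out before the next iteration invalidates 'row';
            // 'buffer' is a hypothetical stand-in for sinkRowMap.
            Record copy = newRecord(rowSchema, row);
            buffer.add(copy);
        }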