feat(java-binding): store java binding row in iter #12533

Merged · 13 commits · Oct 10, 2023
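
In short: SinkRow no longer extends AutoCloseable, every sink's write loop drops its per-row try-with-resources and blanket RuntimeException rethrow, and ownership of the underlying binding row moves into the iterator that produces it (see the StreamChunkIteratorWrapper hunks below). A minimal sketch of the consumer-side contract after this change; SinkRowSketch and SimpleSinkSketch are hypothetical stand-ins, with the proto-based getOp() elided:

import java.util.Iterator;

// Hypothetical stand-ins for SinkRow and a sink's write() after this PR:
// rows are plain values, so there is no per-row close() and no blanket
// catch (Exception e) rethrow around the loop body.
interface SinkRowSketch {
    Object get(int index);

    int size();
}

class SimpleSinkSketch {
    void write(Iterator<SinkRowSketch> rows) {
        while (rows.hasNext()) {
            // Before this PR: try (SinkRow row = rows.next()) { ... }
            SinkRowSketch row = rows.next();
            StringBuilder line = new StringBuilder();
            for (int i = 0; i < row.size(); i++) {
                if (i > 0) {
                    line.append(',');
                }
                line.append(row.get(i));
            }
            System.out.println(line);
            // Assumption implied by the PR title ("store java binding row in
            // iter"): the iterator may own and reuse the row, so it should not
            // be retained past the next call to rows.next() or past close().
        }
    }
}
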
@@ -39,7 +39,4 @@ public Data.Op getOp() {
     public int size() {
         return values.length;
     }
-
-    @Override
-    public void close() throws Exception {}
 }

@@ -16,7 +16,7 @@

 import com.risingwave.proto.Data;

-public interface SinkRow extends AutoCloseable {
+public interface SinkRow {
     Object get(int index);

     Data.Op getOp();
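
With AutoCloseable gone from the interface, in-memory rows reduce to plain accessors, as the first hunk above shows. A sketch of an array-backed row in that shape; the class name is hypothetical and Data.Op is again elided to keep it self-contained:

// Hypothetical array-backed row mirroring the trimmed interface: just
// accessors over an Object[], with no empty close() override to maintain.
class ArrayRowSketch {
    private final Object[] values;

    ArrayRowSketch(Object[] values) {
        this.values = values;
    }

    Object get(int index) {
        return values[index];
    }

    int size() {
        return values.length;
    }
}
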
@@ -60,28 +60,25 @@ public FileSink(FileSinkConfig config, TableSchema tableSchema) {
     @Override
     public void write(Iterator<SinkRow> rows) {
         while (rows.hasNext()) {
-            try (SinkRow row = rows.next()) {
-                switch (row.getOp()) {
-                    case INSERT:
-                        String buf =
-                                new Gson()
-                                        .toJson(
-                                                IntStream.range(0, row.size())
-                                                        .mapToObj(row::get)
-                                                        .toArray());
-                        try {
-                            sinkWriter.write(buf + System.lineSeparator());
-                        } catch (IOException e) {
-                            throw INTERNAL.withCause(e).asRuntimeException();
-                        }
-                        break;
-                    default:
-                        throw UNIMPLEMENTED
-                                .withDescription("unsupported operation: " + row.getOp())
-                                .asRuntimeException();
-                }
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
+            SinkRow row = rows.next();
+            switch (row.getOp()) {
+                case INSERT:
+                    String buf =
+                            new Gson()
+                                    .toJson(
+                                            IntStream.range(0, row.size())
+                                                    .mapToObj(row::get)
+                                                    .toArray());
+                    try {
+                        sinkWriter.write(buf + System.lineSeparator());
+                    } catch (IOException e) {
+                        throw INTERNAL.withCause(e).asRuntimeException();
+                    }
+                    break;
+                default:
+                    throw UNIMPLEMENTED
+                            .withDescription("unsupported operation: " + row.getOp())
+                            .asRuntimeException();
+            }
         }
     }

@@ -251,14 +251,12 @@ public CloseableIterator<SinkRow> deserialize(

     static class StreamChunkRowWrapper implements SinkRow {

-        private boolean isClosed;
         private final StreamChunkRow inner;
         private final ValueGetter[] valueGetters;

         StreamChunkRowWrapper(StreamChunkRow inner, ValueGetter[] valueGetters) {
             this.inner = inner;
             this.valueGetters = valueGetters;
-            this.isClosed = false;
         }

         @Override
@@ -275,14 +273,6 @@ public Data.Op getOp() {
         public int size() {
             return valueGetters.length;
         }
-
-        @Override
-        public void close() {
-            if (!isClosed) {
-                this.isClosed = true;
-                inner.close();
-            }
-        }
     }

     static class StreamChunkIteratorWrapper implements CloseableIterator<SinkRow> {
@@ -299,13 +289,6 @@ public StreamChunkIteratorWrapper(StreamChunkIterator iter, ValueGetter[] valueG
         @Override
         public void close() {
             iter.close();
-            try {
-                if (row != null) {
-                    row.close();
-                }
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
         }

         @Override
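
These hunks carry the PR's core idea: the row wrapper loses its close() and isClosed bookkeeping, and StreamChunkIteratorWrapper.close() now closes only the underlying StreamChunkIterator. A sketch of the implied ownership model; the diff only shows close() shrinking, so next() storing the row is an assumption, and Chunk/IterOwnsRowSketch are stand-ins rather than the real binding types:

import java.util.Iterator;
import java.util.NoSuchElementException;

// Sketch: the iterator owns the native chunk; rows are plain views whose
// memory lives and dies with the chunk, so only the iterator is closeable.
class IterOwnsRowSketch implements Iterator<Object[]>, AutoCloseable {
    interface Chunk extends AutoCloseable {
        Object[] nextRow(); // null once the chunk is exhausted

        @Override
        void close(); // frees the chunk and every row view it produced
    }

    private final Chunk chunk;
    private Object[] row; // current row, stored in the iterator

    IterOwnsRowSketch(Chunk chunk) {
        this.chunk = chunk;
    }

    @Override
    public boolean hasNext() {
        if (row == null) {
            row = chunk.nextRow();
        }
        return row != null;
    }

    @Override
    public Object[] next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        Object[] current = row;
        row = null;
        return current;
    }

    @Override
    public void close() {
        chunk.close(); // no per-row close(): rows are not resources anymore
    }
}
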
@@ -75,27 +75,24 @@ public void write(Iterator<SinkRow> rows) {
             }
         }
         while (rows.hasNext()) {
-            try (SinkRow row = rows.next()) {
-                switch (row.getOp()) {
-                    case INSERT:
-                        GenericRecord record = new GenericData.Record(this.sinkSchema);
-                        for (int i = 0; i < this.sinkSchema.getFields().size(); i++) {
-                            record.put(i, row.get(i));
-                        }
-                        try {
-                            this.parquetWriter.write(record);
-                            this.numOutputRows += 1;
-                        } catch (IOException ioException) {
-                            throw INTERNAL.withCause(ioException).asRuntimeException();
-                        }
-                        break;
-                    default:
-                        throw UNIMPLEMENTED
-                                .withDescription("unsupported operation: " + row.getOp())
-                                .asRuntimeException();
-                }
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
+            SinkRow row = rows.next();
+            switch (row.getOp()) {
+                case INSERT:
+                    GenericRecord record = new GenericData.Record(this.sinkSchema);
+                    for (int i = 0; i < this.sinkSchema.getFields().size(); i++) {
+                        record.put(i, row.get(i));
+                    }
+                    try {
+                        this.parquetWriter.write(record);
+                        this.numOutputRows += 1;
+                    } catch (IOException ioException) {
+                        throw INTERNAL.withCause(ioException).asRuntimeException();
+                    }
+                    break;
+                default:
+                    throw UNIMPLEMENTED
+                            .withDescription("unsupported operation: " + row.getOp())
+                            .asRuntimeException();
+            }
         }
     }

@@ -276,10 +276,11 @@ private void writeRow(SinkRow row) throws JsonMappingException, JsonProcessingEx
     @Override
     public void write(Iterator<SinkRow> rows) {
         while (rows.hasNext()) {
-            try (SinkRow row = rows.next()) {
+            SinkRow row = rows.next();
+            try {
                 writeRow(row);
-            } catch (Exception e) {
-                throw new RuntimeException(e);
+            } catch (Exception ex) {
+                throw new RuntimeException(ex);
             }
         }
     }

@@ -55,62 +55,57 @@ public AppendOnlyIcebergSinkWriter(
     @Override
     public void write(Iterator<SinkRow> rows) {
         while (rows.hasNext()) {
-            try (SinkRow row = rows.next()) {
-                switch (row.getOp()) {
-                    case INSERT:
-                        Record record = GenericRecord.create(rowSchema);
-                        if (row.size() != tableSchema.getColumnNames().length) {
-                            throw INTERNAL.withDescription("row values do not match table schema")
-                                    .asRuntimeException();
-                        }
-                        for (int i = 0; i < rowSchema.columns().size(); i++) {
-                            record.set(i, row.get(i));
-                        }
-                        PartitionKey partitionKey =
-                                new PartitionKey(icebergTable.spec(), icebergTable.schema());
-                        partitionKey.partition(record);
-                        DataWriter<Record> dataWriter;
-                        if (dataWriterMap.containsKey(partitionKey)) {
-                            dataWriter = dataWriterMap.get(partitionKey);
-                        } else {
-                            try {
-                                String filename =
-                                        fileFormat.addExtension(UUID.randomUUID().toString());
-                                OutputFile outputFile =
-                                        icebergTable
-                                                .io()
-                                                .newOutputFile(
-                                                        icebergTable.location()
-                                                                + "/data/"
-                                                                + icebergTable
-                                                                        .spec()
-                                                                        .partitionToPath(
-                                                                                partitionKey)
-                                                                + "/"
-                                                                + filename);
-                                dataWriter =
-                                        Parquet.writeData(outputFile)
-                                                .schema(rowSchema)
-                                                .withSpec(icebergTable.spec())
-                                                .withPartition(partitionKey)
-                                                .createWriterFunc(GenericParquetWriter::buildWriter)
-                                                .overwrite()
-                                                .build();
-                            } catch (Exception e) {
-                                throw INTERNAL.withDescription("failed to create dataWriter")
-                                        .asRuntimeException();
-                            }
-                            dataWriterMap.put(partitionKey, dataWriter);
-                        }
-                        dataWriter.write(record);
-                        break;
-                    default:
-                        throw UNIMPLEMENTED
-                                .withDescription("unsupported operation: " + row.getOp())
-                                .asRuntimeException();
-                }
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
+            SinkRow row = rows.next();
+            switch (row.getOp()) {
+                case INSERT:
+                    Record record = GenericRecord.create(rowSchema);
+                    if (row.size() != tableSchema.getColumnNames().length) {
+                        throw INTERNAL.withDescription("row values do not match table schema")
+                                .asRuntimeException();
+                    }
+                    for (int i = 0; i < rowSchema.columns().size(); i++) {
+                        record.set(i, row.get(i));
+                    }
+                    PartitionKey partitionKey =
+                            new PartitionKey(icebergTable.spec(), icebergTable.schema());
+                    partitionKey.partition(record);
+                    DataWriter<Record> dataWriter;
+                    if (dataWriterMap.containsKey(partitionKey)) {
+                        dataWriter = dataWriterMap.get(partitionKey);
+                    } else {
+                        try {
+                            String filename = fileFormat.addExtension(UUID.randomUUID().toString());
+                            OutputFile outputFile =
+                                    icebergTable
+                                            .io()
+                                            .newOutputFile(
+                                                    icebergTable.location()
+                                                            + "/data/"
+                                                            + icebergTable
+                                                                    .spec()
+                                                                    .partitionToPath(partitionKey)
+                                                            + "/"
+                                                            + filename);
+                            dataWriter =
+                                    Parquet.writeData(outputFile)
+                                            .schema(rowSchema)
+                                            .withSpec(icebergTable.spec())
+                                            .withPartition(partitionKey)
+                                            .createWriterFunc(GenericParquetWriter::buildWriter)
+                                            .overwrite()
+                                            .build();
+                        } catch (Exception e) {
+                            throw INTERNAL.withDescription("failed to create dataWriter")
+                                    .asRuntimeException();
+                        }
+                        dataWriterMap.put(partitionKey, dataWriter);
+                    }
+                    dataWriter.write(record);
+                    break;
+                default:
+                    throw UNIMPLEMENTED
+                            .withDescription("unsupported operation: " + row.getOp())
+                            .asRuntimeException();
+            }
         }
     }

@@ -142,57 +142,52 @@ private List<Comparable<Object>> getKeyFromRow(SinkRow row) {
     @Override
     public void write(Iterator<SinkRow> rows) {
         while (rows.hasNext()) {
-            try (SinkRow row = rows.next()) {
-                if (row.size() != tableSchema.getColumnNames().length) {
-                    throw Status.FAILED_PRECONDITION
-                            .withDescription("row values do not match table schema")
-                            .asRuntimeException();
-                }
-                Record record = newRecord(rowSchema, row);
-                PartitionKey partitionKey =
-                        new PartitionKey(icebergTable.spec(), icebergTable.schema());
-                partitionKey.partition(record);
-                SinkRowMap sinkRowMap;
-                if (sinkRowMapByPartition.containsKey(partitionKey)) {
-                    sinkRowMap = sinkRowMapByPartition.get(partitionKey);
-                } else {
-                    sinkRowMap = new SinkRowMap();
-                    sinkRowMapByPartition.put(partitionKey, sinkRowMap);
-                }
-                switch (row.getOp()) {
-                    case INSERT:
-                        sinkRowMap.insert(getKeyFromRow(row), newRecord(rowSchema, row));
-                        break;
-                    case DELETE:
-                        sinkRowMap.delete(getKeyFromRow(row), newRecord(deleteRowSchema, row));
-                        break;
-                    case UPDATE_DELETE:
-                        if (updateBufferExists) {
-                            throw Status.FAILED_PRECONDITION
-                                    .withDescription(
-                                            "an UPDATE_INSERT should precede an UPDATE_DELETE")
-                                    .asRuntimeException();
-                        }
-                        sinkRowMap.delete(getKeyFromRow(row), newRecord(deleteRowSchema, row));
-                        updateBufferExists = true;
-                        break;
-                    case UPDATE_INSERT:
-                        if (!updateBufferExists) {
-                            throw Status.FAILED_PRECONDITION
-                                    .withDescription(
-                                            "an UPDATE_INSERT should precede an UPDATE_DELETE")
-                                    .asRuntimeException();
-                        }
-                        sinkRowMap.insert(getKeyFromRow(row), newRecord(rowSchema, row));
-                        updateBufferExists = false;
-                        break;
-                    default:
-                        throw UNIMPLEMENTED
-                                .withDescription("unsupported operation: " + row.getOp())
-                                .asRuntimeException();
-                }
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
+            SinkRow row = rows.next();
+            if (row.size() != tableSchema.getColumnNames().length) {
+                throw Status.FAILED_PRECONDITION
+                        .withDescription("row values do not match table schema")
+                        .asRuntimeException();
+            }
+            Record record = newRecord(rowSchema, row);
+            PartitionKey partitionKey =
+                    new PartitionKey(icebergTable.spec(), icebergTable.schema());
+            partitionKey.partition(record);
+            SinkRowMap sinkRowMap;
+            if (sinkRowMapByPartition.containsKey(partitionKey)) {
+                sinkRowMap = sinkRowMapByPartition.get(partitionKey);
+            } else {
+                sinkRowMap = new SinkRowMap();
+                sinkRowMapByPartition.put(partitionKey, sinkRowMap);
+            }
+            switch (row.getOp()) {
+                case INSERT:
+                    sinkRowMap.insert(getKeyFromRow(row), newRecord(rowSchema, row));
+                    break;
+                case DELETE:
+                    sinkRowMap.delete(getKeyFromRow(row), newRecord(deleteRowSchema, row));
+                    break;
+                case UPDATE_DELETE:
+                    if (updateBufferExists) {
+                        throw Status.FAILED_PRECONDITION
+                                .withDescription("an UPDATE_INSERT should precede an UPDATE_DELETE")
+                                .asRuntimeException();
+                    }
+                    sinkRowMap.delete(getKeyFromRow(row), newRecord(deleteRowSchema, row));
+                    updateBufferExists = true;
+                    break;
+                case UPDATE_INSERT:
+                    if (!updateBufferExists) {
+                        throw Status.FAILED_PRECONDITION
+                                .withDescription("an UPDATE_INSERT should precede an UPDATE_DELETE")
+                                .asRuntimeException();
+                    }
+                    sinkRowMap.insert(getKeyFromRow(row), newRecord(rowSchema, row));
+                    updateBufferExists = false;
+                    break;
+                default:
+                    throw UNIMPLEMENTED
+                            .withDescription("unsupported operation: " + row.getOp())
+                            .asRuntimeException();
+            }
         }
     }