Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sink): support jdbc sink async truncate #13190

Merged
merged 23 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
6b0e60b
refactor(sink): reimplement remote sink without writer
wenym1 Oct 29, 2023
b2e62a4
keep polling response stream
wenym1 Oct 30, 2023
9b8ca15
Merge branch 'main' into yiming/remote-log-sinker-refactor
wenym1 Oct 31, 2023
b74e04b
use offset as the consuming state and reset batch id to null in java
wenym1 Oct 31, 2023
4535b9b
start epoch
wenym1 Oct 31, 2023
d937d44
Merge branch 'main' into yiming/remote-log-sinker-refactor
wenym1 Oct 31, 2023
30a5386
Merge branch 'main' into yiming/remote-log-sinker-refactor
wenym1 Oct 31, 2023
3761540
remove RemoteSinkWriter
wenym1 Oct 31, 2023
5381129
feat(sink): support jdbc sink async truncate
wenym1 Nov 1, 2023
c03963e
start epoch also for state just receive barrier
wenym1 Nov 2, 2023
0224932
Merge branch 'main' into yiming/remote-log-sinker-refactor
wenym1 Nov 2, 2023
0416c0f
Merge branch 'main' into yiming/remote-log-sinker-refactor
wenym1 Nov 2, 2023
14bea90
Merge branch 'yiming/remote-log-sinker-refactor' into yiming/jdbc-asy…
wenym1 Nov 2, 2023
c2b3ba0
fix
wenym1 Nov 2, 2023
1449692
fix comment
wenym1 Nov 3, 2023
4a708eb
Merge branch 'main' into yiming/remote-log-sinker-refactor
wenym1 Nov 6, 2023
06b4475
Merge branch 'yiming/remote-log-sinker-refactor' into yiming/jdbc-asy…
wenym1 Nov 6, 2023
7d64ae5
remove is_async_truncate flag
wenym1 Nov 6, 2023
e41af89
enable kv log store for append only jdbc sink by default
wenym1 Nov 7, 2023
2568f82
Merge branch 'main' into yiming/jdbc-async-truncate
wenym1 Nov 7, 2023
85340c8
port the async truncate logic to jdbc sink
wenym1 Nov 8, 2023
f038997
add java doc for SinkWriter interface
wenym1 Nov 8, 2023
095d91f
minor rename
wenym1 Nov 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
public interface SinkWriter {
void beginEpoch(long epoch);

void write(Iterable<SinkRow> rows);
boolean write(Iterable<SinkRow> rows);
wenym1 marked this conversation as resolved.
Show resolved Hide resolved

Optional<ConnectorServiceProto.SinkMetadata> barrier(boolean isCheckpoint);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,12 @@ public SinkWriterV1 getInner() {
public void beginEpoch(long epoch) {}

@Override
public void write(Iterable<SinkRow> rows) {
public boolean write(Iterable<SinkRow> rows) {
if (!hasBegun) {
hasBegun = true;
}
this.inner.write(rows.iterator());
return false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,29 @@ public void onNext(ConnectorServiceProto.SinkWriterStreamRequest sinkTask) {
.asRuntimeException();
}

boolean batchWritten;

try (CloseableIterable<SinkRow> rowIter = deserializer.deserialize(batch)) {
sink.write(
new MonitoredRowIterable(
rowIter, connectorName, String.valueOf(sinkId)));
batchWritten =
sink.write(
new MonitoredRowIterable(
rowIter, connectorName, String.valueOf(sinkId)));
}

currentBatchId = batch.getBatchId();

if (batchWritten) {
responseObserver.onNext(
ConnectorServiceProto.SinkWriterStreamResponse.newBuilder()
.setBatch(
ConnectorServiceProto.SinkWriterStreamResponse
.BatchWrittenResponse.newBuilder()
.setEpoch(currentEpoch)
.setBatchId(currentBatchId)
.build())
.build());
}

LOG.debug("Batch {} written to epoch {}", currentBatchId, batch.getEpoch());
} else if (sinkTask.hasBarrier()) {
if (!isInitialized()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import static org.junit.Assert.*;

import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.risingwave.connector.JDBCSink;
import com.risingwave.connector.JDBCSinkConfig;
Expand All @@ -26,6 +25,7 @@
import com.risingwave.proto.Data.DataType.TypeName;
import com.risingwave.proto.Data.Op;
import java.sql.*;
import java.util.Arrays;
import org.junit.Test;
import org.testcontainers.containers.JdbcDatabaseContainer;
import org.testcontainers.containers.MySQLContainer;
Expand Down Expand Up @@ -84,7 +84,7 @@ static void testJDBCSync(JdbcDatabaseContainer<?> container, TestType testType)
Connection conn = sink.getConn();

sink.write(
Iterators.forArray(
Arrays.asList(
new ArraySinkRow(
Op.INSERT,
1,
Expand All @@ -94,7 +94,7 @@ static void testJDBCSync(JdbcDatabaseContainer<?> container, TestType testType)
new Timestamp(1000000000),
"{\"key\": \"password\", \"value\": \"Singularity123\"}",
"I want to sleep".getBytes())));
sink.sync();
sink.barrier(true);

Statement stmt = conn.createStatement();
try (var rs = stmt.executeQuery(String.format("SELECT * FROM %s", tableName))) {
Expand All @@ -106,7 +106,7 @@ static void testJDBCSync(JdbcDatabaseContainer<?> container, TestType testType)
}

sink.write(
Iterators.forArray(
Arrays.asList(
new ArraySinkRow(
Op.INSERT,
2,
Expand All @@ -116,7 +116,7 @@ static void testJDBCSync(JdbcDatabaseContainer<?> container, TestType testType)
new Timestamp(1000000000),
"{\"key\": \"password\", \"value\": \"Singularity123\"}",
"I want to sleep".getBytes())));
sink.sync();
sink.barrier(true);
try (var rs = stmt.executeQuery(String.format("SELECT * FROM %s", tableName))) {
int count;
for (count = 0; rs.next(); ) {
Expand All @@ -126,7 +126,7 @@ static void testJDBCSync(JdbcDatabaseContainer<?> container, TestType testType)
}
stmt.close();

sink.sync();
sink.barrier(true);
sink.drop();
}

Expand All @@ -144,7 +144,7 @@ static void testJDBCWrite(JdbcDatabaseContainer<?> container, TestType testType)
Statement stmt = conn.createStatement();

sink.write(
Iterators.forArray(
Arrays.asList(
new ArraySinkRow(
Op.INSERT,
1,
Expand All @@ -171,7 +171,7 @@ static void testJDBCWrite(JdbcDatabaseContainer<?> container, TestType testType)
}

sink.write(
Iterators.forArray(
Arrays.asList(
new ArraySinkRow(
Op.UPDATE_DELETE,
1,
Expand Down Expand Up @@ -216,7 +216,7 @@ static void testJDBCWrite(JdbcDatabaseContainer<?> container, TestType testType)
assertFalse(rs.next());
}

sink.sync();
sink.barrier(true);
stmt.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public AppendOnlyIcebergSinkWriter(
}

@Override
public void write(Iterable<SinkRow> rows) {
public boolean write(Iterable<SinkRow> rows) {
for (SinkRow row : rows) {
switch (row.getOp()) {
case INSERT:
Expand Down Expand Up @@ -106,6 +106,7 @@ public void write(Iterable<SinkRow> rows) {
.asRuntimeException();
}
}
return false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ private List<Comparable<Object>> getKeyFromRow(SinkRow row) {
}

@Override
public void write(Iterable<SinkRow> rows) {
public boolean write(Iterable<SinkRow> rows) {
for (SinkRow row : rows) {
if (row.size() != tableSchema.getColumnNames().length) {
throw Status.FAILED_PRECONDITION
Expand Down Expand Up @@ -188,6 +188,7 @@ public void write(Iterable<SinkRow> rows) {
.asRuntimeException();
}
}
return false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,26 @@

import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.api.sink.SinkRow;
import com.risingwave.connector.api.sink.SinkWriterBase;
import com.risingwave.connector.api.sink.SinkWriter;
import com.risingwave.connector.jdbc.JdbcDialect;
import com.risingwave.proto.ConnectorServiceProto;
import com.risingwave.proto.Data;
import io.grpc.Status;
import java.sql.*;
import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JDBCSink extends SinkWriterBase {
public class JDBCSink implements SinkWriter {
private static final String ERROR_REPORT_TEMPLATE = "Error when exec %s, message %s";

private final JdbcDialect jdbcDialect;
private final JDBCSinkConfig config;
private final Connection conn;
private final List<String> pkColumnNames;

private final TableSchema tableSchema;

public static final String JDBC_COLUMN_NAME_KEY = "COLUMN_NAME";
public static final String JDBC_DATA_TYPE_KEY = "DATA_TYPE";

Expand All @@ -45,7 +48,7 @@ public class JDBCSink extends SinkWriterBase {
private static final Logger LOG = LoggerFactory.getLogger(JDBCSink.class);

public JDBCSink(JDBCSinkConfig config, TableSchema tableSchema) {
super(tableSchema);
this.tableSchema = tableSchema;

var jdbcUrl = config.getJdbcUrl().toLowerCase();
var factory = JdbcUtils.getDialectFactory(jdbcUrl);
Expand Down Expand Up @@ -175,7 +178,7 @@ private PreparedStatement prepareInsertStatement(SinkRow row) {
}
try {
var preparedStmt = insertPreparedStmt;
jdbcDialect.bindInsertIntoStatement(preparedStmt, conn, getTableSchema(), row);
jdbcDialect.bindInsertIntoStatement(preparedStmt, conn, tableSchema, row);
preparedStmt.addBatch();
return preparedStmt;
} catch (SQLException e) {
Expand All @@ -192,15 +195,15 @@ private PreparedStatement prepareUpsertStatement(SinkRow row) {
var preparedStmt = upsertPreparedStmt;
switch (row.getOp()) {
case INSERT:
jdbcDialect.bindUpsertStatement(preparedStmt, conn, getTableSchema(), row);
jdbcDialect.bindUpsertStatement(preparedStmt, conn, tableSchema, row);
break;
case UPDATE_INSERT:
if (!updateFlag) {
throw Status.FAILED_PRECONDITION
.withDescription("an UPDATE_DELETE should precede an UPDATE_INSERT")
.asRuntimeException();
}
jdbcDialect.bindUpsertStatement(preparedStmt, conn, getTableSchema(), row);
jdbcDialect.bindUpsertStatement(preparedStmt, conn, tableSchema, row);
updateFlag = false;
break;
default:
Expand Down Expand Up @@ -235,7 +238,7 @@ private PreparedStatement prepareDeleteStatement(SinkRow row) {
try {
int placeholderIdx = 1;
for (String primaryKey : pkColumnNames) {
Object fromRow = getTableSchema().getFromRow(primaryKey, row);
Object fromRow = tableSchema.getFromRow(primaryKey, row);
deletePreparedStmt.setObject(placeholderIdx++, fromRow);
}
deletePreparedStmt.addBatch();
Expand All @@ -249,13 +252,15 @@ private PreparedStatement prepareDeleteStatement(SinkRow row) {
}

@Override
public void write(Iterator<SinkRow> rows) {
public void beginEpoch(long epoch) {}

@Override
public boolean write(Iterable<SinkRow> rows) {
PreparedStatement deleteStatement = null;
PreparedStatement upsertStatement = null;
PreparedStatement insertStatement = null;

while (rows.hasNext()) {
SinkRow row = rows.next();
for (SinkRow row : rows) {
if (row.getOp() == Data.Op.UPDATE_DELETE) {
updateFlag = true;
continue;
Expand Down Expand Up @@ -285,6 +290,7 @@ public void write(Iterator<SinkRow> rows) {
String.format(ERROR_REPORT_TEMPLATE, e.getSQLState(), e.getMessage()))
.asRuntimeException();
}
return true;
}

private void executeStatement(PreparedStatement stmt) throws SQLException {
Expand All @@ -297,13 +303,14 @@ private void executeStatement(PreparedStatement stmt) throws SQLException {
}

@Override
public void sync() {
public Optional<ConnectorServiceProto.SinkMetadata> barrier(boolean isCheckpoint) {
if (updateFlag) {
throw Status.FAILED_PRECONDITION
.withDescription(
"expected UPDATE_INSERT to complete an UPDATE operation, got `sync`")
.asRuntimeException();
}
return Optional.empty();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.api.sink.SinkFactory;
import com.risingwave.connector.api.sink.SinkWriter;
import com.risingwave.connector.api.sink.SinkWriterV1;
import com.risingwave.proto.Catalog.SinkType;
import io.grpc.Status;
import java.sql.*;
Expand All @@ -40,7 +39,7 @@ public class JDBCSinkFactory implements SinkFactory {
public SinkWriter createWriter(TableSchema tableSchema, Map<String, String> tableProperties) {
ObjectMapper mapper = new ObjectMapper();
JDBCSinkConfig config = mapper.convertValue(tableProperties, JDBCSinkConfig.class);
return new SinkWriterV1.Adapter(new JDBCSink(config, tableSchema));
return new JDBCSink(config, tableSchema);
}

@Override
Expand Down
6 changes: 6 additions & 0 deletions proto/connector_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,15 @@ message SinkWriterStreamResponse {
SinkMetadata metadata = 2;
}

message BatchWrittenResponse {
uint64 epoch = 1;
uint64 batch_id = 2;
StrikeW marked this conversation as resolved.
Show resolved Hide resolved
}

oneof response {
StartResponse start = 1;
CommitResponse commit = 2;
BatchWrittenResponse batch = 3;
}
}

Expand Down
Loading
Loading