Skip to content

Commit

Permalink
refactor(sink): remove unnecessary StartEpoch of remote sink (#13806)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Dec 6, 2023
1 parent 59d56c3 commit 27fbce9
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 162 deletions.
2 changes: 1 addition & 1 deletion ci/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ steps:
retry: *auto-retry

- label: "connector node integration test Java {{matrix.java_version}}"
if: build.pull_request.labels includes "ci/run-java-connector-node-integration-tests" || build.env("CI_STEPS") =~ /(^|,)java-connector-node-integration-tests?(,|$$)/
if: build.pull_request.labels includes "ci/run-connector-node-integration-tests" || build.env("CI_STEPS") =~ /(^|,)java-connector-node-integration-tests?(,|$$)/
command: "ci/scripts/connector-node-integration-test.sh -p ci-dev -v {{matrix.java_version}}"
depends_on:
- "build"
Expand Down
7 changes: 0 additions & 7 deletions java/connector-node/python-client/integration_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,6 @@ def test_sink(prop, format, payload_input, table_schema, is_coordinated=False):
epoch_list = []
# construct request
for payload in payload_input:
request_list.append(
connector_service_pb2.SinkWriterStreamRequest(
begin_epoch=connector_service_pb2.SinkWriterStreamRequest.BeginEpoch(
epoch=epoch
)
)
)
request_list.append(
connector_service_pb2.SinkWriterStreamRequest(
write_batch=connector_service_pb2.SinkWriterStreamRequest.WriteBatch(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,48 @@ public class SinkWriterStreamObserver
private boolean finished = false;

private boolean epochStarted;
private long currentEpoch;
private Long currentEpoch;
private Long currentBatchId;

private Deserializer deserializer;
private final StreamObserver<ConnectorServiceProto.SinkWriterStreamResponse> responseObserver;

private static final Logger LOG = LoggerFactory.getLogger(SinkWriterStreamObserver.class);

public boolean isInitialized() {
private boolean isInitialized() {
return sink != null;
}

/**
 * Checks the epoch attached to an incoming request and, when no epoch is currently open,
 * opens it on the sink.
 *
 * <p>While an epoch is open, the incoming epoch must equal the current one. When no epoch is
 * open, the incoming epoch must be strictly greater than the last epoch seen (if any); it is
 * then passed to {@code sink.beginEpoch} and recorded as the current, started epoch.
 *
 * @param epoch epoch carried by the request being processed
 * @param context label of the request kind, used only to build error messages
 * @throws RuntimeException built from {@code FAILED_PRECONDITION} when the sink is not
 *     initialized or a new epoch is not higher than the current one, and from
 *     {@code INVALID_ARGUMENT} when the epoch differs from the one already started
 */
private void receiveEpoch(long epoch, String context) {
    if (!isInitialized()) {
        throw FAILED_PRECONDITION
                .withDescription("Sink is not initialized. Invoke `CreateSink` first.")
                .asRuntimeException();
    }

    if (epochStarted) {
        // An epoch is already open: the request must target exactly that epoch.
        if (epoch == currentEpoch) {
            return;
        }
        String mismatch =
                String.format(
                        "in [%s] invalid epoch: expected write to epoch %s, got %s",
                        context, currentEpoch, epoch);
        throw INVALID_ARGUMENT.withDescription(mismatch).asRuntimeException();
    }

    // No epoch is open: the request implicitly begins a new one, which must move forward.
    boolean notAdvancing = currentEpoch != null && epoch <= currentEpoch;
    if (notAdvancing) {
        String stale =
                String.format(
                        "in [%s], expect a new epoch higher than current epoch %s but got %s",
                        context, currentEpoch, epoch);
        throw FAILED_PRECONDITION.withDescription(stale).asRuntimeException();
    }

    // Update local state only after the sink has accepted the new epoch.
    sink.beginEpoch(epoch);
    epochStarted = true;
    currentEpoch = epoch;
}

public SinkWriterStreamObserver(
StreamObserver<ConnectorServiceProto.SinkWriterStreamResponse> responseObserver) {
this.responseObserver = responseObserver;
Expand All @@ -72,49 +102,19 @@ public void onNext(ConnectorServiceProto.SinkWriterStreamRequest sinkTask) {
}
sinkId = sinkTask.getStart().getSinkParam().getSinkId();
bindSink(sinkTask.getStart().getSinkParam(), sinkTask.getStart().getFormat());
currentEpoch = null;
currentBatchId = null;
epochStarted = false;
responseObserver.onNext(
ConnectorServiceProto.SinkWriterStreamResponse.newBuilder()
.setStart(
ConnectorServiceProto.SinkWriterStreamResponse.StartResponse
.newBuilder())
.build());
} else if (sinkTask.hasBeginEpoch()) {
if (!isInitialized()) {
throw FAILED_PRECONDITION
.withDescription("sink is not initialized, please call start first")
.asRuntimeException();
}
if (epochStarted && sinkTask.getBeginEpoch().getEpoch() <= currentEpoch) {
throw INVALID_ARGUMENT
.withDescription(
"invalid epoch: new epoch ID should be larger than current epoch")
.asRuntimeException();
}
epochStarted = true;
currentEpoch = sinkTask.getBeginEpoch().getEpoch();
LOG.debug("Epoch {} started", currentEpoch);
} else if (sinkTask.hasWriteBatch()) {
if (!isInitialized()) {
throw FAILED_PRECONDITION
.withDescription("Sink is not initialized. Invoke `CreateSink` first.")
.asRuntimeException();
}
if (!epochStarted) {
throw FAILED_PRECONDITION
.withDescription("Epoch is not started. Invoke `StartEpoch` first.")
.asRuntimeException();
}
ConnectorServiceProto.SinkWriterStreamRequest.WriteBatch batch =
sinkTask.getWriteBatch();
if (batch.getEpoch() != currentEpoch) {
throw INVALID_ARGUMENT
.withDescription(
"invalid epoch: expected write to epoch "
+ currentEpoch
+ ", got "
+ sinkTask.getWriteBatch().getEpoch())
.asRuntimeException();
}
receiveEpoch(batch.getEpoch(), "WriteBatch");
if (currentBatchId != null && batch.getBatchId() <= currentBatchId) {
throw INVALID_ARGUMENT
.withDescription(
Expand Down Expand Up @@ -150,29 +150,11 @@ public void onNext(ConnectorServiceProto.SinkWriterStreamRequest sinkTask) {

LOG.debug("Batch {} written to epoch {}", currentBatchId, batch.getEpoch());
} else if (sinkTask.hasBarrier()) {
if (!isInitialized()) {
throw FAILED_PRECONDITION
.withDescription("Sink is not initialized. Invoke `Start` first.")
.asRuntimeException();
}
if (!epochStarted) {
throw FAILED_PRECONDITION
.withDescription("Epoch is not started. Invoke `StartEpoch` first.")
.asRuntimeException();
}
if (sinkTask.getBarrier().getEpoch() != currentEpoch) {
throw INVALID_ARGUMENT
.withDescription(
"invalid epoch: expected sync to epoch "
+ currentEpoch
+ ", got "
+ sinkTask.getBarrier().getEpoch())
.asRuntimeException();
}
boolean isCheckpoint = sinkTask.getBarrier().getIsCheckpoint();
ConnectorServiceProto.SinkWriterStreamRequest.Barrier barrier =
sinkTask.getBarrier();
receiveEpoch(barrier.getEpoch(), "Barrier");
boolean isCheckpoint = barrier.getIsCheckpoint();
Optional<ConnectorServiceProto.SinkMetadata> metadata = sink.barrier(isCheckpoint);
currentEpoch = sinkTask.getBarrier().getEpoch();
currentBatchId = null;
LOG.debug("Epoch {} barrier {}", currentEpoch, isCheckpoint);
if (isCheckpoint) {
ConnectorServiceProto.SinkWriterStreamResponse.CommitResponse.Builder builder =
Expand All @@ -185,6 +167,8 @@ public void onNext(ConnectorServiceProto.SinkWriterStreamRequest sinkTask) {
.setCommit(builder)
.build());
}
currentBatchId = null;
epochStarted = false;
} else {
throw INVALID_ARGUMENT.withDescription("invalid sink task").asRuntimeException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,53 +149,11 @@ public void testOnNext_startEpochValidation() {
.setIsCheckpoint(true)
.build())
.build();
ConnectorServiceProto.SinkWriterStreamRequest startEpoch =
ConnectorServiceProto.SinkWriterStreamRequest.newBuilder()
.setBeginEpoch(
ConnectorServiceProto.SinkWriterStreamRequest.BeginEpoch
.newBuilder()
.setEpoch(0)
.build())
.build();
ConnectorServiceProto.SinkWriterStreamRequest duplicateStartEpoch =
ConnectorServiceProto.SinkWriterStreamRequest.newBuilder()
.setBeginEpoch(
ConnectorServiceProto.SinkWriterStreamRequest.BeginEpoch
.newBuilder()
.setEpoch(0)
.build())
.build();

// test validation of start epoch
boolean exceptionThrown = false;
try {
sinkWriterStreamObserver = getMockSinkStreamObserver(createNoisyFailResponseObserver());
sinkWriterStreamObserver.onNext(startSink);
sinkWriterStreamObserver.onNext(firstSync);
} catch (RuntimeException e) {
exceptionThrown = true;
Assert.assertTrue(e.getMessage().toLowerCase().contains("epoch is not started"));
}
if (!exceptionThrown) {
Assert.fail(
"Expected exception not thrown: `Epoch is not started. Invoke `StartEpoch`.`");
}

exceptionThrown = false;
try {
sinkWriterStreamObserver = getMockSinkStreamObserver(createNoisyFailResponseObserver());
sinkWriterStreamObserver.onNext(startSink);
sinkWriterStreamObserver.onNext(startEpoch);
sinkWriterStreamObserver.onNext(duplicateStartEpoch);
} catch (RuntimeException e) {
exceptionThrown = true;
Assert.assertTrue(
e.getMessage().toLowerCase().contains("new epoch id should be larger"));
}
if (!exceptionThrown) {
Assert.fail(
"Expected exception not thrown: `invalid epoch: new epoch ID should be larger than current epoch`");
}
sinkWriterStreamObserver = getMockSinkStreamObserver(createNoisyFailResponseObserver());
sinkWriterStreamObserver.onNext(startSink);
sinkWriterStreamObserver.onNext(firstSync);
}

@Test
Expand All @@ -209,14 +167,6 @@ public void testOnNext_writeValidation() {
.setFormat(ConnectorServiceProto.SinkPayloadFormat.JSON)
.setSinkParam(fileSinkParam))
.build();
ConnectorServiceProto.SinkWriterStreamRequest firstStartEpoch =
ConnectorServiceProto.SinkWriterStreamRequest.newBuilder()
.setBeginEpoch(
ConnectorServiceProto.SinkWriterStreamRequest.BeginEpoch
.newBuilder()
.setEpoch(0)
.build())
.build();

ConnectorServiceProto.SinkWriterStreamRequest firstWrite =
ConnectorServiceProto.SinkWriterStreamRequest.newBuilder()
Expand Down Expand Up @@ -249,22 +199,35 @@ public void testOnNext_writeValidation() {
.build())
.build();

ConnectorServiceProto.SinkWriterStreamRequest secondStartEpoch =
ConnectorServiceProto.SinkWriterStreamRequest secondWrite =
ConnectorServiceProto.SinkWriterStreamRequest.newBuilder()
.setBeginEpoch(
ConnectorServiceProto.SinkWriterStreamRequest.BeginEpoch
.setWriteBatch(
ConnectorServiceProto.SinkWriterStreamRequest.WriteBatch
.newBuilder()
.setEpoch(1)
.setBatchId(2)
.setJsonPayload(
ConnectorServiceProto.SinkWriterStreamRequest
.WriteBatch.JsonPayload.newBuilder()
.addRowOps(
ConnectorServiceProto
.SinkWriterStreamRequest
.WriteBatch.JsonPayload
.RowOp.newBuilder()
.setOpType(Op.INSERT)
.setLine(
"{\"id\": 2, \"name\": \"test\"}")
.build()))
.build())
.build();

ConnectorServiceProto.SinkWriterStreamRequest secondWrite =
ConnectorServiceProto.SinkWriterStreamRequest secondWriteWrongEpoch =
ConnectorServiceProto.SinkWriterStreamRequest.newBuilder()
.setWriteBatch(
ConnectorServiceProto.SinkWriterStreamRequest.WriteBatch
.newBuilder()
.setEpoch(0)
.setBatchId(2)
.setEpoch(2)
.setBatchId(3)
.setJsonPayload(
ConnectorServiceProto.SinkWriterStreamRequest
.WriteBatch.JsonPayload.newBuilder()
Expand All @@ -284,7 +247,6 @@ public void testOnNext_writeValidation() {
try {
sinkWriterStreamObserver = getMockSinkStreamObserver(createNoisyFailResponseObserver());
sinkWriterStreamObserver.onNext(startSink);
sinkWriterStreamObserver.onNext(firstStartEpoch);
sinkWriterStreamObserver.onNext(firstWrite);
sinkWriterStreamObserver.onNext(firstWrite);
} catch (RuntimeException e) {
Expand All @@ -299,18 +261,17 @@ public void testOnNext_writeValidation() {
try {
sinkWriterStreamObserver = getMockSinkStreamObserver(createNoisyFailResponseObserver());
sinkWriterStreamObserver.onNext(startSink);
sinkWriterStreamObserver.onNext(firstStartEpoch);
sinkWriterStreamObserver.onNext(firstWrite);
sinkWriterStreamObserver.onNext(firstSync);
sinkWriterStreamObserver.onNext(secondStartEpoch);
sinkWriterStreamObserver.onNext(secondWrite); // with mismatched epoch
sinkWriterStreamObserver.onNext(secondWriteWrongEpoch);
} catch (RuntimeException e) {
exceptionThrown = true;
Assert.assertTrue(e.getMessage().toLowerCase().contains("invalid epoch"));
}
if (!exceptionThrown) {
Assert.fail(
"Expected exception not thrown: `invalid epoch: expected write to epoch 1, got 0`");
"Expected exception not thrown: `invalid epoch: expected write to epoch 2, got 1`");
}
}
}
10 changes: 5 additions & 5 deletions proto/connector_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ message SinkWriterStreamRequest {
SinkPayloadFormat format = 2;
}

message BeginEpoch {
uint64 epoch = 1;
}

message WriteBatch {
message JsonPayload {
message RowOp {
Expand Down Expand Up @@ -76,9 +72,13 @@ message SinkWriterStreamRequest {
bool is_checkpoint = 2;
}

// Reserve the field number and name previously used by BeginEpoch so that removing it
// is not a protobuf breaking change (the removal itself should do no harm).
reserved 2;
reserved "begin_epoch";

oneof request {
StartSink start = 1;
BeginEpoch begin_epoch = 2;
WriteBatch write_batch = 3;
Barrier barrier = 4;
}
Expand Down
16 changes: 0 additions & 16 deletions src/connector/src/sink/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,14 +381,6 @@ impl LogSinker for RemoteLogSinker {
futures::future::Either::Right(result) => {
let (epoch, item): (u64, LogStoreReadItem) = result?;

match &prev_offset {
Some(TruncateOffset::Barrier { .. }) | None => {
// TODO: this start epoch is actually unnecessary
request_tx.start_epoch(epoch).await?;
}
_ => {}
}

match item {
LogStoreReadItem::StreamChunk { chunk, chunk_id } => {
let offset = TruncateOffset::Chunk { epoch, chunk_id };
Expand Down Expand Up @@ -577,7 +569,6 @@ impl SinkWriter for CoordinatedRemoteSinkWriter {
}

async fn begin_epoch(&mut self, epoch: u64) -> Result<()> {
self.stream_handle.start_epoch(epoch).await?;
self.epoch = Some(epoch);
Ok(())
}
Expand Down Expand Up @@ -834,11 +825,6 @@ mod test {
sink.begin_epoch(2022).await.unwrap();
assert_eq!(sink.epoch, Some(2022));

request_receiver
.recv()
.await
.expect("test failed: failed to construct start_epoch request");

sink.write_batch(chunk_a.clone()).await.unwrap();
assert_eq!(sink.epoch, Some(2022));
assert_eq!(sink.batch_id, 1);
Expand Down Expand Up @@ -882,8 +868,6 @@ mod test {

// begin another epoch
sink.begin_epoch(2023).await.unwrap();
// simply keep the channel empty since we've tested begin_epoch
let _ = request_receiver.recv().await.unwrap();
assert_eq!(sink.epoch, Some(2023));

// test another write
Expand Down
Loading

0 comments on commit 27fbce9

Please sign in to comment.