Skip to content

Commit

Permalink
refactor(sink): reimplement remote sink without writer (#13137)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Nov 3, 2023
1 parent 244017c commit 453eed8
Show file tree
Hide file tree
Showing 6 changed files with 270 additions and 365 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import com.risingwave.java.binding.Binding;
import com.risingwave.proto.ConnectorServiceProto;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -26,21 +27,23 @@ public class JniSinkCoordinatorResponseObserver
LoggerFactory.getLogger(JniSinkCoordinatorResponseObserver.class);
private long responseTxPtr;

private boolean success;
private boolean success = true;

public JniSinkCoordinatorResponseObserver(long responseTxPtr) {
this.responseTxPtr = responseTxPtr;
}

@Override
public void onNext(ConnectorServiceProto.SinkCoordinatorStreamResponse response) {
this.success =
Binding.sendSinkCoordinatorResponseToChannel(
this.responseTxPtr, response.toByteArray());
if (!Binding.sendSinkCoordinatorResponseToChannel(
this.responseTxPtr, response.toByteArray())) {
throw Status.INTERNAL.withDescription("unable to send response").asRuntimeException();
}
}

@Override
public void onError(Throwable throwable) {
this.success = false;
LOG.error("JniSinkCoordinatorHandler onError: ", throwable);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import com.risingwave.java.binding.Binding;
import com.risingwave.proto.ConnectorServiceProto;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -25,22 +26,25 @@ public class JniSinkWriterResponseObserver
private static final Logger LOG = LoggerFactory.getLogger(JniSinkWriterResponseObserver.class);
private long responseTxPtr;

private boolean success;
private boolean success = true;

public JniSinkWriterResponseObserver(long responseTxPtr) {
this.responseTxPtr = responseTxPtr;
}

@Override
public void onNext(ConnectorServiceProto.SinkWriterStreamResponse response) {
this.success =
Binding.sendSinkWriterResponseToChannel(this.responseTxPtr, response.toByteArray());
if (!Binding.sendSinkWriterResponseToChannel(this.responseTxPtr, response.toByteArray())) {
throw Status.INTERNAL.withDescription("unable to send response").asRuntimeException();
}
}

@Override
public void onError(Throwable throwable) {
this.success =
Binding.sendSinkWriterErrorToChannel(this.responseTxPtr, throwable.getMessage());
if (!Binding.sendSinkWriterErrorToChannel(this.responseTxPtr, throwable.getMessage())) {
LOG.warn("unable to send error: {}", throwable.getMessage());
}
this.success = false;
LOG.error("JniSinkWriterHandler onError: ", throwable);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,15 +156,14 @@ public void onNext(ConnectorServiceProto.SinkWriterStreamRequest sinkTask) {
boolean isCheckpoint = sinkTask.getBarrier().getIsCheckpoint();
Optional<ConnectorServiceProto.SinkMetadata> metadata = sink.barrier(isCheckpoint);
currentEpoch = sinkTask.getBarrier().getEpoch();
currentBatchId = null;
LOG.debug("Epoch {} barrier {}", currentEpoch, isCheckpoint);
if (isCheckpoint) {
ConnectorServiceProto.SinkWriterStreamResponse.CommitResponse.Builder builder =
ConnectorServiceProto.SinkWriterStreamResponse.CommitResponse
.newBuilder()
.setEpoch(currentEpoch);
if (metadata.isPresent()) {
builder.setMetadata(metadata.get());
}
metadata.ifPresent(builder::setMetadata);
responseObserver.onNext(
ConnectorServiceProto.SinkWriterStreamResponse.newBuilder()
.setCommit(builder)
Expand All @@ -173,7 +172,7 @@ public void onNext(ConnectorServiceProto.SinkWriterStreamRequest sinkTask) {
} else {
throw INVALID_ARGUMENT.withDescription("invalid sink task").asRuntimeException();
}
} catch (Exception e) {
} catch (Throwable e) {
LOG.error("sink writer error: ", e);
cleanup();
responseObserver.onError(e);
Expand Down
12 changes: 12 additions & 0 deletions src/connector/src/sink/log_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,18 @@ impl TruncateOffset {
}
}

pub fn check_next_offset(&self, next_offset: TruncateOffset) -> anyhow::Result<()> {
if *self >= next_offset {
Err(anyhow!(
"next offset {:?} should be later than current offset {:?}",
next_offset,
self
))
} else {
Ok(())
}
}

pub fn check_next_item_epoch(&self, epoch: u64) -> LogStoreResult<()> {
match self {
TruncateOffset::Chunk {
Expand Down
Loading

0 comments on commit 453eed8

Please sign in to comment.