Skip to content

Commit

Permalink
fix(jdbc-sink): relax the check for UPDATE_DELETE op (#17289)
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW authored Jun 18, 2024
1 parent ade66c7 commit ee06743
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -254,10 +254,7 @@ public void prepareUpsert(SinkRow row) {
break;
case UPDATE_INSERT:
if (!updateFlag) {
throw Status.FAILED_PRECONDITION
.withDescription(
"an UPDATE_DELETE should precede an UPDATE_INSERT")
.asRuntimeException();
LOG.warn("Missing an UPDATE_DELETE precede an UPDATE_INSERT");
}
jdbcDialect.bindUpsertStatement(upsertStatement, conn, tableSchema, row);
updateFlag = false;
Expand Down Expand Up @@ -364,10 +361,7 @@ public void beginEpoch(long epoch) {}
@Override
public Optional<ConnectorServiceProto.SinkMetadata> barrier(boolean isCheckpoint) {
if (updateFlag) {
throw Status.FAILED_PRECONDITION
.withDescription(
"expected UPDATE_INSERT to complete an UPDATE operation, got `sync`")
.asRuntimeException();
LOG.warn("expect an UPDATE_INSERT to complete an UPDATE operation, got `sync`");
}
return Optional.empty();
}
Expand Down
3 changes: 3 additions & 0 deletions src/stream/src/executor/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
&& !self.sink_param.downstream_pk.is_empty();
// Don't compact chunk for blackhole sink for better benchmark performance.
let compact_chunk = !self.sink.is_blackhole();
tracing::info!("Sink info: sink_id: {} actor_id: {}, need_advance_delete: {}, re_construct_with_sink_pk: {}",
sink_id, actor_id, need_advance_delete, re_construct_with_sink_pk);

let processed_input = Self::process_msg(
input,
self.sink_param.sink_type,
Expand Down

0 comments on commit ee06743

Please sign in to comment.