From ee0674314d3d131a9f681be40b008e65f4859f3a Mon Sep 17 00:00:00 2001 From: StrikeW Date: Tue, 18 Jun 2024 14:20:33 +0800 Subject: [PATCH] fix(jdbc-sink): relax the check for UPDATE_DELETE op (#17289) --- .../main/java/com/risingwave/connector/JDBCSink.java | 10 ++-------- src/stream/src/executor/sink.rs | 3 +++ 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java index 399a312758e3b..10aa371c50aec 100644 --- a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java +++ b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java @@ -254,10 +254,7 @@ public void prepareUpsert(SinkRow row) { break; case UPDATE_INSERT: if (!updateFlag) { - throw Status.FAILED_PRECONDITION - .withDescription( - "an UPDATE_DELETE should precede an UPDATE_INSERT") - .asRuntimeException(); + LOG.warn("Missing an UPDATE_DELETE precede an UPDATE_INSERT"); } jdbcDialect.bindUpsertStatement(upsertStatement, conn, tableSchema, row); updateFlag = false; @@ -364,10 +361,7 @@ public void beginEpoch(long epoch) {} @Override public Optional barrier(boolean isCheckpoint) { if (updateFlag) { - throw Status.FAILED_PRECONDITION - .withDescription( - "expected UPDATE_INSERT to complete an UPDATE operation, got `sync`") - .asRuntimeException(); + LOG.warn("expect an UPDATE_INSERT to complete an UPDATE operation, got `sync`"); } return Optional.empty(); } diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index f7c7770c0bd7d..c0ee1330805bf 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -185,6 +185,9 @@ impl SinkExecutor { && !self.sink_param.downstream_pk.is_empty(); // Don't compact chunk for blackhole sink for better benchmark performance. let compact_chunk = !self.sink.is_blackhole(); + tracing::info!("Sink info: sink_id: {} actor_id: {}, need_advance_delete: {}, re_construct_with_sink_pk: {}", + sink_id, actor_id, need_advance_delete, re_construct_with_sink_pk); + let processed_input = Self::process_msg( input, self.sink_param.sink_type,