From 014d597de093fdb78b72f5578083c655f7208060 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Tue, 18 Jun 2024 15:08:07 +0800 Subject: [PATCH] fix(jdbc-sink): relax the check for UPDATE_DELETE op (#17289) --- .../main/java/com/risingwave/connector/JDBCSink.java | 10 ++-------- src/stream/src/executor/sink.rs | 3 +++ 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java index 399a312758e3..10aa371c50ae 100644 --- a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java +++ b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java @@ -254,10 +254,7 @@ public void prepareUpsert(SinkRow row) { break; case UPDATE_INSERT: if (!updateFlag) { - throw Status.FAILED_PRECONDITION - .withDescription( - "an UPDATE_DELETE should precede an UPDATE_INSERT") - .asRuntimeException(); + LOG.warn("Missing an UPDATE_DELETE precede an UPDATE_INSERT"); } jdbcDialect.bindUpsertStatement(upsertStatement, conn, tableSchema, row); updateFlag = false; @@ -364,10 +361,7 @@ public void beginEpoch(long epoch) {} @Override public Optional barrier(boolean isCheckpoint) { if (updateFlag) { - throw Status.FAILED_PRECONDITION - .withDescription( - "expected UPDATE_INSERT to complete an UPDATE operation, got `sync`") - .asRuntimeException(); + LOG.warn("expect an UPDATE_INSERT to complete an UPDATE operation, got `sync`"); } return Optional.empty(); } diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index 1bd8d5e5c96a..562012165e66 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -189,6 +189,9 @@ impl SinkExecutor { let re_construct_with_sink_pk = need_advance_delete && self.sink_param.sink_type == SinkType::Upsert && !self.sink_param.downstream_pk.is_empty(); + tracing::info!("Sink info: sink_id: {} actor_id: {}, need_advance_delete: {}, re_construct_with_sink_pk: {}", + sink_id, actor_id, need_advance_delete, re_construct_with_sink_pk); + let processed_input = Self::process_msg( input, self.sink_param.sink_id,