diff --git a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java index 399a312758e3b..10aa371c50aec 100644 --- a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java +++ b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java @@ -254,10 +254,7 @@ public void prepareUpsert(SinkRow row) { break; case UPDATE_INSERT: if (!updateFlag) { - throw Status.FAILED_PRECONDITION - .withDescription( - "an UPDATE_DELETE should precede an UPDATE_INSERT") - .asRuntimeException(); + LOG.warn("Missing an UPDATE_DELETE precede an UPDATE_INSERT"); } jdbcDialect.bindUpsertStatement(upsertStatement, conn, tableSchema, row); updateFlag = false; @@ -364,10 +361,7 @@ public void beginEpoch(long epoch) {} @Override public Optional barrier(boolean isCheckpoint) { if (updateFlag) { - throw Status.FAILED_PRECONDITION - .withDescription( - "expected UPDATE_INSERT to complete an UPDATE operation, got `sync`") - .asRuntimeException(); + LOG.warn("expect an UPDATE_INSERT to complete an UPDATE operation, got `sync`"); } return Optional.empty(); } diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index 1bd8d5e5c96a4..562012165e66a 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -189,6 +189,9 @@ impl SinkExecutor { let re_construct_with_sink_pk = need_advance_delete && self.sink_param.sink_type == SinkType::Upsert && !self.sink_param.downstream_pk.is_empty(); + tracing::info!("Sink info: sink_id: {} actor_id: {}, need_advance_delete: {}, re_construct_with_sink_pk: {}", + sink_id, actor_id, need_advance_delete, re_construct_with_sink_pk); + let processed_input = Self::process_msg( input, self.sink_param.sink_id,