diff --git a/src/connector/src/sink/starrocks.rs b/src/connector/src/sink/starrocks.rs index 456415a62e0d6..35d1033d8ee1b 100644 --- a/src/connector/src/sink/starrocks.rs +++ b/src/connector/src/sink/starrocks.rs @@ -371,7 +371,7 @@ pub struct StarrocksSinkWriter { pk_indices: Vec, is_append_only: bool, client: Option, - txn_client: StarrocksTxnClient, + txn_client: Arc, row_encoder: JsonEncoder, executor_id: u64, curr_txn_label: Option, @@ -432,7 +432,7 @@ impl StarrocksSinkWriter { pk_indices, is_append_only, client: None, - txn_client: StarrocksTxnClient::new(txn_request_builder), + txn_client: Arc::new(StarrocksTxnClient::new(txn_request_builder)), row_encoder: JsonEncoder::new_with_starrocks(schema, None), executor_id, curr_txn_label: None, @@ -527,6 +527,23 @@ impl StarrocksSinkWriter { } } +impl Drop for StarrocksSinkWriter { + fn drop(&mut self) { + if let Some(txn_label) = self.curr_txn_label.take() { + let txn_client = self.txn_client.clone(); + tokio::spawn(async move { + if let Err(e) = txn_client.rollback(txn_label.clone()).await { + tracing::error!( + "starrocks rollback transaction error: {:?}, txn label: {}", + e.as_report(), + txn_label + ); + } + }); + } + } +} + #[async_trait] impl SinkWriter for StarrocksSinkWriter { type CommitMetadata = Option;