diff --git a/src/connector/src/sink/starrocks.rs b/src/connector/src/sink/starrocks.rs index 2bfa9af6016c9..23f373cd92dfc 100644 --- a/src/connector/src/sink/starrocks.rs +++ b/src/connector/src/sink/starrocks.rs @@ -371,7 +371,7 @@ pub struct StarrocksSinkWriter { pk_indices: Vec, is_append_only: bool, client: Option, - txn_client: StarrocksTxnClient, + txn_client: Arc, row_encoder: JsonEncoder, executor_id: u64, curr_txn_label: Option, @@ -432,7 +432,7 @@ impl StarrocksSinkWriter { pk_indices, is_append_only, client: None, - txn_client: StarrocksTxnClient::new(txn_request_builder), + txn_client: Arc::new(StarrocksTxnClient::new(txn_request_builder)), row_encoder: JsonEncoder::new_with_starrocks(schema, None), executor_id, curr_txn_label: None, @@ -530,12 +530,8 @@ impl StarrocksSinkWriter { impl Drop for StarrocksSinkWriter { fn drop(&mut self) { if let Some(txn_label) = self.curr_txn_label.take() { - tokio::runtime::Builder::new_current_thread() - .enable_time() - .build() - .unwrap() - .block_on(self.txn_client.rollback(txn_label)) - .ok(); + let txn_client = self.txn_client.clone(); + tokio::spawn(async move { txn_client.rollback(txn_label).await.ok() }); } } }