Skip to content

Commit

Permalink
fix comm
Browse files Browse the repository at this point in the history
fix comm
  • Loading branch information
xxhZs committed Oct 31, 2024
1 parent 13dcf9d commit a9871cc
Showing 1 changed file with 4 additions and 8 deletions.
12 changes: 4 additions & 8 deletions src/connector/src/sink/starrocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ pub struct StarrocksSinkWriter {
pk_indices: Vec<usize>,
is_append_only: bool,
client: Option<StarrocksClient>,
txn_client: StarrocksTxnClient,
txn_client: Arc<StarrocksTxnClient>,
row_encoder: JsonEncoder,
executor_id: u64,
curr_txn_label: Option<String>,
Expand Down Expand Up @@ -432,7 +432,7 @@ impl StarrocksSinkWriter {
pk_indices,
is_append_only,
client: None,
txn_client: StarrocksTxnClient::new(txn_request_builder),
txn_client: Arc::new(StarrocksTxnClient::new(txn_request_builder)),
row_encoder: JsonEncoder::new_with_starrocks(schema, None),
executor_id,
curr_txn_label: None,
Expand Down Expand Up @@ -530,12 +530,8 @@ impl StarrocksSinkWriter {
impl Drop for StarrocksSinkWriter {
fn drop(&mut self) {
if let Some(txn_label) = self.curr_txn_label.take() {
tokio::runtime::Builder::new_current_thread()
.enable_time()
.build()
.unwrap()
.block_on(self.txn_client.rollback(txn_label))
.ok();
let txn_client = self.txn_client.clone();
tokio::spawn(async move { txn_client.rollback(txn_label).await.ok() });
}
}
}
Expand Down

0 comments on commit a9871cc

Please sign in to comment.