Skip to content

Commit

Permalink
fix(sink): Fix excessive number of starrocks txn (#19217)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs authored Nov 4, 2024
1 parent e7e4a2c commit 271faac
Showing 1 changed file with 19 additions and 2 deletions.
21 changes: 19 additions & 2 deletions src/connector/src/sink/starrocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ pub struct StarrocksSinkWriter {
pk_indices: Vec<usize>,
is_append_only: bool,
client: Option<StarrocksClient>,
txn_client: StarrocksTxnClient,
txn_client: Arc<StarrocksTxnClient>,
row_encoder: JsonEncoder,
executor_id: u64,
curr_txn_label: Option<String>,
Expand Down Expand Up @@ -432,7 +432,7 @@ impl StarrocksSinkWriter {
pk_indices,
is_append_only,
client: None,
txn_client: StarrocksTxnClient::new(txn_request_builder),
txn_client: Arc::new(StarrocksTxnClient::new(txn_request_builder)),
row_encoder: JsonEncoder::new_with_starrocks(schema, None),
executor_id,
curr_txn_label: None,
Expand Down Expand Up @@ -527,6 +527,23 @@ impl StarrocksSinkWriter {
}
}

impl Drop for StarrocksSinkWriter {
fn drop(&mut self) {
if let Some(txn_label) = self.curr_txn_label.take() {
let txn_client = self.txn_client.clone();
tokio::spawn(async move {
if let Err(e) = txn_client.rollback(txn_label.clone()).await {
tracing::error!(
"starrocks rollback transaction error: {:?}, txn label: {}",
e.as_report(),
txn_label
);
}
});
}
}
}

#[async_trait]
impl SinkWriter for StarrocksSinkWriter {
type CommitMetadata = Option<SinkMetadata>;
Expand Down

0 comments on commit 271faac

Please sign in to comment.