Skip to content

Commit

Permalink
Merge branch 'main' into wangzheng/refactor_var
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 authored Nov 18, 2024
2 parents b059904 + 0b223b4 commit 8cf025c
Show file tree
Hide file tree
Showing 9 changed files with 240 additions and 246 deletions.
446 changes: 218 additions & 228 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion ci/workflows/main-cron.yml
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ steps:
# Ddl statements will randomly run with background_ddl.
- label: "background_ddl, arrangement_backfill recovery test (madsim)"
key: "background-ddl-arrangement-backfill-recovery-test-deterministic"
command: "TEST_NUM=12 KILL_RATE=1.0 BACKGROUND_DDL_RATE=0.8 USE_ARRANGEMENT_BACKFILL=true timeout 65m ci/scripts/deterministic-recovery-test.sh"
command: "TEST_NUM=12 KILL_RATE=1.0 BACKGROUND_DDL_RATE=0.8 USE_ARRANGEMENT_BACKFILL=true timeout 70m ci/scripts/deterministic-recovery-test.sh"
if: |
!(build.pull_request.labels includes "ci/main-cron/run-selected") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-recovery-test-deterministic-simulation"
Expand Down
17 changes: 6 additions & 11 deletions src/connector/src/sink/starrocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use serde_derive::Serialize;
use serde_json::Value;
use serde_with::{serde_as, DisplayFromStr};
use thiserror_ext::AsReport;
use tokio::task::JoinHandle;
use url::form_urlencoded;
use with_options::WithOptions;

Expand Down Expand Up @@ -898,16 +897,12 @@ impl SinkCommitCoordinator for StarrocksSinkCommitter {
tracing::debug!(?epoch, ?txn_labels, "commit transaction");

if !txn_labels.is_empty() {
let join_handles = txn_labels
.into_iter()
.map(|txn_label| {
let client = self.client.clone();
tokio::spawn(async move { client.commit(txn_label).await })
})
.collect::<Vec<JoinHandle<Result<String>>>>();
futures::future::try_join_all(join_handles)
.await
.map_err(|err| SinkError::DorisStarrocksConnect(anyhow!(err)))?;
futures::future::try_join_all(
txn_labels
.into_iter()
.map(|txn_label| self.client.commit(txn_label)),
)
.await?;
}
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion src/tests/simulation/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ risingwave_sqlsmith = { workspace = true }
serde = "1.0.188"
serde_derive = "1.0.188"
serde_json = "1.0.107"
sqllogictest = "0.22.0"
sqllogictest = "0.23.0"
tempfile = "3"
tikv-jemallocator = { workspace = true }
tokio = { version = "0.2", package = "madsim-tokio" }
Expand Down
2 changes: 1 addition & 1 deletion src/tests/simulation/tests/integration_tests/sink/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ async fn basic_test_inner(is_decouple: bool) -> Result<()> {
}
session.run(CREATE_SOURCE).await?;
session.run(CREATE_SINK).await?;
assert_eq!(6, test_sink.parallelism_counter.load(Relaxed));
test_sink.wait_initial_parallelism(6).await?;

let internal_tables = session.run("show internal tables").await?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async fn test_sink_decouple_err_isolation() -> Result<()> {
session.run("set sink_decouple = true").await?;
session.run(CREATE_SOURCE).await?;
session.run(CREATE_SINK).await?;
assert_eq!(6, test_sink.parallelism_counter.load(Relaxed));
test_sink.wait_initial_parallelism(6).await?;

test_sink.set_err_rate(0.002);

Expand Down Expand Up @@ -81,7 +81,7 @@ async fn test_sink_error_event_logs() -> Result<()> {
session.run("set sink_decouple = true").await?;
session.run(CREATE_SOURCE).await?;
session.run(CREATE_SINK).await?;
assert_eq!(6, test_sink.parallelism_counter.load(Relaxed));
test_sink.wait_initial_parallelism(6).await?;

test_sink.store.wait_for_err(1).await?;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ async fn recovery_test_inner(is_decouple: bool) -> Result<()> {
}
session.run(CREATE_SOURCE).await?;
session.run(CREATE_SINK).await?;
assert_eq!(6, test_sink.parallelism_counter.load(Relaxed));
test_sink.wait_initial_parallelism(6).await?;

let count = test_source.id_list.len();

Expand Down
2 changes: 1 addition & 1 deletion src/tests/simulation/tests/integration_tests/sink/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ async fn scale_test_inner(is_decouple: bool) -> Result<()> {
}
session.run(CREATE_SOURCE).await?;
session.run(CREATE_SINK).await?;
assert_eq!(6, test_sink.parallelism_counter.load(Relaxed));
test_sink.wait_initial_parallelism(6).await?;

let mut sink_fragments = cluster
.locate_fragments([identity_contains("Sink")])
Expand Down
9 changes: 9 additions & 0 deletions src/tests/simulation/tests/integration_tests/sink/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use risingwave_connector::source::test_source::{
registry_test_source, BoxSource, TestSourceRegistryGuard, TestSourceSplit,
};
use risingwave_simulation::cluster::{Cluster, ConfigPath, Configuration};
use tokio::task::yield_now;
use tokio::time::sleep;

use crate::{assert_eq_with_err_returned as assert_eq, assert_with_err_returned as assert};
Expand Down Expand Up @@ -244,6 +245,14 @@ impl SimulationTestSink {
let err_rate = u32::MAX as f64 * err_rate;
self.err_rate.store(err_rate as _, Relaxed);
}

pub async fn wait_initial_parallelism(&self, parallelism: usize) -> Result<()> {
while self.parallelism_counter.load(Relaxed) < parallelism {
yield_now().await;
}
assert_eq!(self.parallelism_counter.load(Relaxed), parallelism);
Ok(())
}
}

pub fn build_stream_chunk(
Expand Down

0 comments on commit 8cf025c

Please sign in to comment.