diff --git a/.config/nextest.toml b/.config/nextest.toml
index 898b335fb8f86..15de353595390 100644
--- a/.config/nextest.toml
+++ b/.config/nextest.toml
@@ -19,3 +19,22 @@ fail-fast = false
 [profile.ci.junit]
 path = "junit-nextest.xml"
 report-name = "nextest-run"
+
+[test-groups]
+failpoint-limited = { max-threads = 1 }
+
+[[profile.default.overrides]]
+filter = 'test(failpoint_limited::)'
+test-group = 'failpoint-limited'
+
+[[profile.ci.overrides]]
+filter = 'test(failpoint_limited::)'
+test-group = 'failpoint-limited'
+
+[[profile.ci-sim.overrides]]
+filter = 'test(failpoint_limited::)'
+test-group = 'failpoint-limited'
+
+[[profile.ci.junit.overrides]]
+filter = 'test(failpoint_limited::)'
+test-group = 'failpoint-limited'
\ No newline at end of file
diff --git a/proto/meta.proto b/proto/meta.proto
index 0243bc88e3833..d9187c2afd27f 100644
--- a/proto/meta.proto
+++ b/proto/meta.proto
@@ -591,6 +591,16 @@ message EventLog {
     string command = 4;
     string barrier_kind = 5;
   }
+  message EventInjectBarrierFail {
+    uint64 prev_epoch = 1;
+    uint64 cur_epoch = 2;
+    string error = 3;
+  }
+  message EventCollectBarrierFail {
+    uint64 prev_epoch = 1;
+    uint64 cur_epoch = 2;
+    string error = 3;
+  }
   // Event logs identifier, which should be populated by event log service.
   optional string unique_id = 1;
   // Processing time, which should be populated by event log service.
@@ -600,6 +610,8 @@ message EventLog {
     EventDirtyStreamJobClear dirty_stream_job_clear = 4;
     EventMetaNodeStart meta_node_start = 5;
     EventBarrierComplete barrier_complete = 6;
+    EventInjectBarrierFail inject_barrier_fail = 7;
+    EventCollectBarrierFail collect_barrier_fail = 8;
   }
 }
 
diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_event_logs.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_event_logs.rs
index 5d43f969add23..0c801548c956c 100644
--- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_event_logs.rs
+++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_event_logs.rs
@@ -64,6 +64,8 @@ fn event_type(e: &Event) -> String {
         Event::DirtyStreamJobClear(_) => "DIRTY_STREAM_JOB_CLEAR",
         Event::MetaNodeStart(_) => "META_NODE_START",
         Event::BarrierComplete(_) => "BARRIER_COMPLETE",
+        Event::InjectBarrierFail(_) => "INJECT_BARRIER_FAIL",
+        Event::CollectBarrierFail(_) => "COLLECT_BARRIER_FAIL",
     }
     .into()
 }
diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs
index 55dd66a2ed532..f7dc78e35c7a3 100644
--- a/src/meta/src/barrier/mod.rs
+++ b/src/meta/src/barrier/mod.rs
@@ -784,6 +784,7 @@ impl GlobalBarrierManager {
                 Ok(node_need_collect) => {
                     // todo: the collect handler should be abort when recovery.
                     tokio::spawn(Self::collect_barrier(
+                        self.env.clone(),
                         node_need_collect,
                         self.env.stream_client_pool_ref(),
                         command_context,
@@ -851,12 +852,26 @@ impl GlobalBarrierManager {
                 .into()
             }
         });
-        try_join_all(inject_futures).await?;
+        try_join_all(inject_futures).await.inspect_err(|e| {
+            // Record failure in event log.
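+            // The epochs identify the failed barrier; `to_report_string` renders the error with its source chain.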
+            use risingwave_pb::meta::event_log;
+            use thiserror_ext::AsReport;
+            let event = event_log::EventInjectBarrierFail {
+                prev_epoch: command_context.prev_epoch.value().0,
+                cur_epoch: command_context.curr_epoch.value().0,
+                error: e.to_report_string(),
+            };
+            self.env
+                .event_log_manager_ref()
+                .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]);
+        })?;
         Ok(node_need_collect)
     }
 
     /// Send barrier-complete-rpc and wait for responses from all CNs
     async fn collect_barrier(
+        env: MetaSrvEnv,
         node_need_collect: HashMap<WorkerId, bool>,
         client_pool_ref: StreamClientPoolRef,
         command_context: Arc<CommandContext>,
@@ -894,7 +908,21 @@ impl GlobalBarrierManager {
             }
         });
 
-        let result = try_join_all(collect_futures).await.map_err(Into::into);
+        let result = try_join_all(collect_futures)
+            .await
+            .inspect_err(|e| {
+                // Record failure in event log.
+                use risingwave_pb::meta::event_log;
+                use thiserror_ext::AsReport;
+                let event = event_log::EventCollectBarrierFail {
+                    prev_epoch: command_context.prev_epoch.value().0,
+                    cur_epoch: command_context.curr_epoch.value().0,
+                    error: e.to_report_string(),
+                };
+                env.event_log_manager_ref()
+                    .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]);
+            })
+            .map_err(Into::into);
         let _ = barrier_complete_tx
             .send(BarrierCompletion { prev_epoch, result })
             .inspect_err(|err| tracing::warn!("failed to complete barrier: {err}"));
diff --git a/src/meta/src/manager/event_log.rs b/src/meta/src/manager/event_log.rs
index 35840bd912ea0..b54f0a17dbec7 100644
--- a/src/meta/src/manager/event_log.rs
+++ b/src/meta/src/manager/event_log.rs
@@ -168,6 +168,8 @@ impl From<&EventLog> for ChannelId {
             Event::DirtyStreamJobClear(_) => 2,
             Event::MetaNodeStart(_) => 3,
             Event::BarrierComplete(_) => 4,
+            Event::InjectBarrierFail(_) => 5,
+            Event::CollectBarrierFail(_) => 6,
         }
     }
 }
diff --git a/src/stream/src/executor/actor.rs b/src/stream/src/executor/actor.rs
index dc8f71eb6ce9f..3933efcbcaf30 100644
--- a/src/stream/src/executor/actor.rs
+++ b/src/stream/src/executor/actor.rs
@@ -200,6 +200,11 @@ where
                 Err(err) => break Err(err),
             };
 
+            fail::fail_point!("collect_actors_err", id == 10, |_| Err(anyhow::anyhow!(
+                "intentional collect_actors_err"
+            )
+            .into()));
+
             // Collect barriers to local barrier manager
             self.context.lock_barrier_manager().collect(id, &barrier);
 
diff --git a/src/tests/simulation/tests/integration_tests/recovery/event_log.rs b/src/tests/simulation/tests/integration_tests/recovery/event_log.rs
index f2f929c823908..f5ccddb2fd9c8 100644
--- a/src/tests/simulation/tests/integration_tests/recovery/event_log.rs
+++ b/src/tests/simulation/tests/integration_tests/recovery/event_log.rs
@@ -40,7 +40,7 @@ event_log_flush_interval_ms = 10\
     Configuration {
         config_path: ConfigPath::Temp(config_path.into()),
         frontend_nodes: 1,
-        compute_nodes: 1,
+        compute_nodes: 3,
         meta_nodes: 1,
         compactor_nodes: 0,
         compute_node_cores: 2,
@@ -126,9 +126,9 @@ async fn test_create_fail(
     assert_latest_event(session.clone(), name, create_should_fail, error).await;
 }
 
-/// Tests event log do record info of stream job creation failure, for CREATE TABLE/MV/INDEX/SINK.
+/// Tests that the event log records info about stream job creation failures, for CREATE TABLE/MV/INDEX/SINK.
 #[tokio::test]
-async fn test_create_stream_job_fail() -> Result<()> {
+async fn failpoint_limited_test_create_stream_job_fail() -> Result<()> {
     let mut cluster = Cluster::start(cluster_config()).await.unwrap();
     let mut session = cluster.start_session();
     const WAIT_RECOVERY_SEC: u64 = 5;
@@ -217,3 +217,48 @@
 
     Ok(())
 }
+
+/// Tests that the event log records info about barrier collection failures.
+///
+/// In theory this test could be flaky: the error reported by a compute node, or the one chosen
+/// by the meta node, may not be the injected root cause.
+/// In practice, runs with MADSIM_TEST_SEED from 1..1000 always succeeded.
+#[tokio::test]
+async fn failpoint_limited_test_collect_barrier_failure() -> Result<()> {
+    let mut cluster = Cluster::start(cluster_config()).await.unwrap();
+    let mut session = cluster.start_session();
+    session
+        .run("create table t (c int primary key)")
+        .await
+        .unwrap();
+    session
+        .run("create materialized view mv as select * from t")
+        .await
+        .unwrap();
+
+    let fp_collect_actor_err = "collect_actors_err";
+    fail::cfg(fp_collect_actor_err, "return").unwrap();
+    // Wait until barrier fails.
+    tokio::time::sleep(Duration::from_secs(3)).await;
+    fail::remove(fp_collect_actor_err);
+
+    let event_type = "COLLECT_BARRIER_FAIL";
+    let info = session
+        .run(format!(
+            "select info from rw_event_logs where event_type='{}' order by timestamp desc limit 1",
+            event_type
+        ))
+        .await
+        .unwrap();
+    let json = serde_json::from_str::<serde_json::Value>(&info).unwrap();
+    let inner = json.get("collectBarrierFail").unwrap();
+    assert!(inner
+        .get("error")
+        .unwrap()
+        .as_str()
+        .unwrap()
+        .find("intentional collect_actors_err")
+        .is_some());
+
+    Ok(())
+}