From d9f84dce20df80b126073c113b244bcac5fde545 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Thu, 23 Nov 2023 16:37:50 +0800 Subject: [PATCH 1/2] feat(meta): record inject or collect barrier failure in event table --- .config/nextest.toml | 19 +++++++ proto/meta.proto | 12 +++++ .../rw_catalog/rw_event_logs.rs | 2 + src/meta/src/barrier/mod.rs | 32 +++++++++++- src/meta/src/manager/event_log.rs | 2 + src/stream/src/executor/actor.rs | 5 ++ .../integration_tests/recovery/event_log.rs | 51 +++++++++++++++++-- 7 files changed, 118 insertions(+), 5 deletions(-) diff --git a/.config/nextest.toml b/.config/nextest.toml index 898b335fb8f86..15de353595390 100644 --- a/.config/nextest.toml +++ b/.config/nextest.toml @@ -19,3 +19,22 @@ fail-fast = false [profile.ci.junit] path = "junit-nextest.xml" report-name = "nextest-run" + +[test-groups] +failpoint-limited = { max-threads = 1 } + +[[profile.default.overrides]] +filter = 'test(failpoint_limited::)' +test-group = 'failpoint-limited' + +[[profile.ci.overrides]] +filter = 'test(failpoint_limited::)' +test-group = 'failpoint-limited' + +[[profile.ci-sim.overrides]] +filter = 'test(failpoint_limited::)' +test-group = 'failpoint-limited' + +[[profile.ci.junit.overrides]] +filter = 'test(failpoint_limited::)' +test-group = 'failpoint-limited' \ No newline at end of file diff --git a/proto/meta.proto b/proto/meta.proto index 19fb29c260736..43b381521e9ee 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -584,6 +584,16 @@ message EventLog { string definition = 3; string error = 4; } + message InjectBarrierFail { + uint64 prev_epoch = 1; + uint64 cur_epoch = 2; + string error = 3; + } + message CollectBarrierFail { + uint64 prev_epoch = 1; + uint64 cur_epoch = 2; + string error = 3; + } // Event logs identifier, which should be populated by event log service. optional string unique_id = 1; // Processing time, which should be populated by event log service. @@ -592,6 +602,8 @@ message EventLog { EventCreateStreamJobFail create_stream_job_fail = 3; EventDirtyStreamJobClear dirty_stream_job_clear = 4; EventMetaNodeStart meta_node_start = 5; + InjectBarrierFail inject_barrier_fail = 6; + CollectBarrierFail collect_barrier_fail = 7; } } diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_event_logs.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_event_logs.rs index ab012eefd1241..b11052e485775 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_event_logs.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_event_logs.rs @@ -62,6 +62,8 @@ fn event_type(e: &Event) -> String { Event::CreateStreamJobFail(_) => "CREATE_STREAM_JOB_FAIL", Event::DirtyStreamJobClear(_) => "DIRTY_STREAM_JOB_CLEAR", Event::MetaNodeStart(_) => "META_NODE_START", + Event::InjectBarrierFail(_) => "INJECT_BARRIER_FAIL", + Event::CollectBarrierFail(_) => "COLLECT_BARRIER_FAIL", } .into() } diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 4f5b115968eaa..6d83c7fc39b56 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -784,6 +784,7 @@ impl GlobalBarrierManager { Ok(node_need_collect) => { // todo: the collect handler should be abort when recovery. tokio::spawn(Self::collect_barrier( + self.env.clone(), node_need_collect, self.env.stream_client_pool_ref(), command_context, @@ -851,12 +852,25 @@ impl GlobalBarrierManager { .into() } }); - try_join_all(inject_futures).await?; + try_join_all(inject_futures).await.inspect_err(|e| { + // Record failure in event log. + use risingwave_pb::meta::event_log; + use thiserror_ext::AsReport; + let event = event_log::InjectBarrierFail { + prev_epoch: command_context.prev_epoch.value().0, + cur_epoch: command_context.curr_epoch.value().0, + error: e.to_report_string(), + }; + self.env + .event_log_manager_ref() + .add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]); + })?; Ok(node_need_collect) } /// Send barrier-complete-rpc and wait for responses from all CNs async fn collect_barrier( + env: MetaSrvEnv, node_need_collect: HashMap, client_pool_ref: StreamClientPoolRef, command_context: Arc, @@ -894,7 +908,21 @@ impl GlobalBarrierManager { } }); - let result = try_join_all(collect_futures).await.map_err(Into::into); + let result = try_join_all(collect_futures) + .await + .inspect_err(|e| { + // Record failure in event log. + use risingwave_pb::meta::event_log; + use thiserror_ext::AsReport; + let event = event_log::CollectBarrierFail { + prev_epoch: command_context.prev_epoch.value().0, + cur_epoch: command_context.curr_epoch.value().0, + error: e.to_report_string(), + }; + env.event_log_manager_ref() + .add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]); + }) + .map_err(Into::into); let _ = barrier_complete_tx .send(BarrierCompletion { prev_epoch, result }) .inspect_err(|err| tracing::warn!("failed to complete barrier: {err}")); diff --git a/src/meta/src/manager/event_log.rs b/src/meta/src/manager/event_log.rs index df7bf3daa14cf..843dfaa8226da 100644 --- a/src/meta/src/manager/event_log.rs +++ b/src/meta/src/manager/event_log.rs @@ -167,6 +167,8 @@ impl From<&EventLog> for ChannelId { Event::CreateStreamJobFail(_) => 1, Event::DirtyStreamJobClear(_) => 2, Event::MetaNodeStart(_) => 3, + Event::InjectBarrierFail(_) => 4, + Event::CollectBarrierFail(_) => 5, } } } diff --git a/src/stream/src/executor/actor.rs b/src/stream/src/executor/actor.rs index dc8f71eb6ce9f..3933efcbcaf30 100644 --- a/src/stream/src/executor/actor.rs +++ b/src/stream/src/executor/actor.rs @@ -200,6 +200,11 @@ where Err(err) => break Err(err), }; + fail::fail_point!("collect_actors_err", id == 10, |_| Err(anyhow::anyhow!( + "intentional collect_actors_err" + ) + .into())); + // Collect barriers to local barrier manager self.context.lock_barrier_manager().collect(id, &barrier); diff --git a/src/tests/simulation/tests/integration_tests/recovery/event_log.rs b/src/tests/simulation/tests/integration_tests/recovery/event_log.rs index f2f929c823908..f5ccddb2fd9c8 100644 --- a/src/tests/simulation/tests/integration_tests/recovery/event_log.rs +++ b/src/tests/simulation/tests/integration_tests/recovery/event_log.rs @@ -40,7 +40,7 @@ event_log_flush_interval_ms = 10\ Configuration { config_path: ConfigPath::Temp(config_path.into()), frontend_nodes: 1, - compute_nodes: 1, + compute_nodes: 3, meta_nodes: 1, compactor_nodes: 0, compute_node_cores: 2, @@ -126,9 +126,9 @@ async fn test_create_fail( assert_latest_event(session.clone(), name, create_should_fail, error).await; } -/// Tests event log do record info of stream job creation failure, for CREATE TABLE/MV/INDEX/SINK. +/// Tests event log can record info of stream job creation failure, for CREATE TABLE/MV/INDEX/SINK. #[tokio::test] -async fn test_create_stream_job_fail() -> Result<()> { +async fn failpoint_limited_test_create_stream_job_fail() -> Result<()> { let mut cluster = Cluster::start(cluster_config()).await.unwrap(); let mut session = cluster.start_session(); const WAIT_RECOVERY_SEC: u64 = 5; @@ -217,3 +217,48 @@ async fn test_create_stream_job_fail() -> Result<()> { Ok(()) } + +/// Tests event log can record info of barrier collection failure. +/// +/// This test is expected to be flaky, but is not according to my test: +/// Theoretically errors either reported by compute nodes, or chosen by meta node, may not be the root cause. +/// But during my tests with MADSIM_TEST_SEED from 1..1000, this test always succeeded. +#[tokio::test] +async fn failpoint_limited_test_collect_barrier_failure() -> Result<()> { + let mut cluster = Cluster::start(cluster_config()).await.unwrap(); + let mut session = cluster.start_session(); + session + .run("create table t (c int primary key)") + .await + .unwrap(); + session + .run("create materialized view mv as select * from t") + .await + .unwrap(); + + let fp_collect_actor_err = "collect_actors_err"; + fail::cfg(fp_collect_actor_err, "return").unwrap(); + // Wait until barrier fails. + tokio::time::sleep(Duration::from_secs(3)).await; + fail::remove(fp_collect_actor_err); + + let event_type = "COLLECT_BARRIER_FAIL"; + let info = session + .run(format!( + "select info from rw_event_logs where event_type='{}' order by timestamp desc limit 1", + event_type + )) + .await + .unwrap(); + let json = serde_json::from_str::(&info).unwrap(); + let inner = json.get("collectBarrierFail").unwrap(); + assert!(inner + .get("error") + .unwrap() + .as_str() + .unwrap() + .find("intentional collect_actors_err") + .is_some()); + + Ok(()) +} From e11e63312189a644607dbb3a7d41979889ec2b97 Mon Sep 17 00:00:00 2001 From: zwang28 <84491488@qq.com> Date: Fri, 24 Nov 2023 11:48:05 +0800 Subject: [PATCH 2/2] rename event proto --- proto/meta.proto | 8 ++++---- src/meta/src/barrier/mod.rs | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/proto/meta.proto b/proto/meta.proto index 43b381521e9ee..7616d1b8ffee9 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -584,12 +584,12 @@ message EventLog { string definition = 3; string error = 4; } - message InjectBarrierFail { + message EventInjectBarrierFail { uint64 prev_epoch = 1; uint64 cur_epoch = 2; string error = 3; } - message CollectBarrierFail { + message EventCollectBarrierFail { uint64 prev_epoch = 1; uint64 cur_epoch = 2; string error = 3; @@ -602,8 +602,8 @@ message EventLog { EventCreateStreamJobFail create_stream_job_fail = 3; EventDirtyStreamJobClear dirty_stream_job_clear = 4; EventMetaNodeStart meta_node_start = 5; - InjectBarrierFail inject_barrier_fail = 6; - CollectBarrierFail collect_barrier_fail = 7; + EventInjectBarrierFail inject_barrier_fail = 6; + EventCollectBarrierFail collect_barrier_fail = 7; } } diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 6d83c7fc39b56..dcda5e1b19c1f 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -856,7 +856,7 @@ impl GlobalBarrierManager { // Record failure in event log. use risingwave_pb::meta::event_log; use thiserror_ext::AsReport; - let event = event_log::InjectBarrierFail { + let event = event_log::EventInjectBarrierFail { prev_epoch: command_context.prev_epoch.value().0, cur_epoch: command_context.curr_epoch.value().0, error: e.to_report_string(), @@ -914,7 +914,7 @@ impl GlobalBarrierManager { // Record failure in event log. use risingwave_pb::meta::event_log; use thiserror_ext::AsReport; - let event = event_log::CollectBarrierFail { + let event = event_log::EventCollectBarrierFail { prev_epoch: command_context.prev_epoch.value().0, cur_epoch: command_context.curr_epoch.value().0, error: e.to_report_string(),