Skip to content

Commit

Permalink
feat(meta): record inject or collect barrier failure in event table (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 authored Nov 24, 2023
1 parent 93ec8d3 commit 0d5bf77
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 5 deletions.
19 changes: 19 additions & 0 deletions .config/nextest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,22 @@ fail-fast = false
[profile.ci.junit]
path = "junit-nextest.xml"
report-name = "nextest-run"

# Failpoints are process-global state (`fail::cfg` affects every test in the
# same process), so tests that configure them must never run concurrently.
# This group caps them at a single thread.
[test-groups]
failpoint-limited = { max-threads = 1 }

# Route failpoint tests into the serial group for the default profile.
# NOTE(review): the filter matches test *names* containing `failpoint_limited::`;
# verify this substring actually appears in the binaries' test paths — the tests
# added in this change are free functions prefixed `failpoint_limited_test_...`.
[[profile.default.overrides]]
filter = 'test(failpoint_limited::)'
test-group = 'failpoint-limited'

# Same routing for the CI profile.
[[profile.ci.overrides]]
filter = 'test(failpoint_limited::)'
test-group = 'failpoint-limited'

# Same routing for the CI simulation profile.
[[profile.ci-sim.overrides]]
filter = 'test(failpoint_limited::)'
test-group = 'failpoint-limited'

# NOTE(review): `profile.ci.junit` is a JUnit report section; an `overrides`
# array under it may not be recognized by nextest — confirm this table is valid
# or whether it was meant to be another profile's overrides.
[[profile.ci.junit.overrides]]
filter = 'test(failpoint_limited::)'
test-group = 'failpoint-limited'
12 changes: 12 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,16 @@ message EventLog {
string command = 4;
string barrier_kind = 5;
}
// Recorded when injecting a barrier into compute nodes fails.
message EventInjectBarrierFail {
// Epoch of the barrier preceding the failed one.
uint64 prev_epoch = 1;
// Epoch of the barrier whose injection failed.
uint64 cur_epoch = 2;
// Human-readable report of the injection error.
string error = 3;
}
// Recorded when collecting a barrier from compute nodes fails.
message EventCollectBarrierFail {
// Epoch of the barrier preceding the failed one.
uint64 prev_epoch = 1;
// Epoch of the barrier whose collection failed.
uint64 cur_epoch = 2;
// Human-readable report of the collection error.
string error = 3;
}
// Event logs identifier, which should be populated by event log service.
optional string unique_id = 1;
// Processing time, which should be populated by event log service.
Expand All @@ -600,6 +610,8 @@ message EventLog {
EventDirtyStreamJobClear dirty_stream_job_clear = 4;
EventMetaNodeStart meta_node_start = 5;
EventBarrierComplete barrier_complete = 6;
EventInjectBarrierFail inject_barrier_fail = 7;
EventCollectBarrierFail collect_barrier_fail = 8;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ fn event_type(e: &Event) -> String {
Event::DirtyStreamJobClear(_) => "DIRTY_STREAM_JOB_CLEAR",
Event::MetaNodeStart(_) => "META_NODE_START",
Event::BarrierComplete(_) => "BARRIER_COMPLETE",
Event::InjectBarrierFail(_) => "INJECT_BARRIER_FAIL",
Event::CollectBarrierFail(_) => "COLLECT_BARRIER_FAIL",
}
.into()
}
32 changes: 30 additions & 2 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,7 @@ impl GlobalBarrierManager {
Ok(node_need_collect) => {
// todo: the collect handler should be abort when recovery.
tokio::spawn(Self::collect_barrier(
self.env.clone(),
node_need_collect,
self.env.stream_client_pool_ref(),
command_context,
Expand Down Expand Up @@ -851,12 +852,25 @@ impl GlobalBarrierManager {
.into()
}
});
try_join_all(inject_futures).await?;
try_join_all(inject_futures).await.inspect_err(|e| {
// Record failure in event log.
use risingwave_pb::meta::event_log;
use thiserror_ext::AsReport;
let event = event_log::EventInjectBarrierFail {
prev_epoch: command_context.prev_epoch.value().0,
cur_epoch: command_context.curr_epoch.value().0,
error: e.to_report_string(),
};
self.env
.event_log_manager_ref()
.add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]);
})?;
Ok(node_need_collect)
}

/// Send barrier-complete-rpc and wait for responses from all CNs
async fn collect_barrier(
env: MetaSrvEnv,
node_need_collect: HashMap<WorkerId, bool>,
client_pool_ref: StreamClientPoolRef,
command_context: Arc<CommandContext>,
Expand Down Expand Up @@ -894,7 +908,21 @@ impl GlobalBarrierManager {
}
});

let result = try_join_all(collect_futures).await.map_err(Into::into);
let result = try_join_all(collect_futures)
.await
.inspect_err(|e| {
// Record failure in event log.
use risingwave_pb::meta::event_log;
use thiserror_ext::AsReport;
let event = event_log::EventCollectBarrierFail {
prev_epoch: command_context.prev_epoch.value().0,
cur_epoch: command_context.curr_epoch.value().0,
error: e.to_report_string(),
};
env.event_log_manager_ref()
.add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]);
})
.map_err(Into::into);
let _ = barrier_complete_tx
.send(BarrierCompletion { prev_epoch, result })
.inspect_err(|err| tracing::warn!("failed to complete barrier: {err}"));
Expand Down
2 changes: 2 additions & 0 deletions src/meta/src/manager/event_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ impl From<&EventLog> for ChannelId {
Event::DirtyStreamJobClear(_) => 2,
Event::MetaNodeStart(_) => 3,
Event::BarrierComplete(_) => 4,
Event::InjectBarrierFail(_) => 5,
Event::CollectBarrierFail(_) => 6,
}
}
}
5 changes: 5 additions & 0 deletions src/stream/src/executor/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,11 @@ where
Err(err) => break Err(err),
};

fail::fail_point!("collect_actors_err", id == 10, |_| Err(anyhow::anyhow!(
"intentional collect_actors_err"
)
.into()));

// Collect barriers to local barrier manager
self.context.lock_barrier_manager().collect(id, &barrier);

Expand Down
51 changes: 48 additions & 3 deletions src/tests/simulation/tests/integration_tests/recovery/event_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ event_log_flush_interval_ms = 10\
Configuration {
config_path: ConfigPath::Temp(config_path.into()),
frontend_nodes: 1,
compute_nodes: 1,
compute_nodes: 3,
meta_nodes: 1,
compactor_nodes: 0,
compute_node_cores: 2,
Expand Down Expand Up @@ -126,9 +126,9 @@ async fn test_create_fail(
assert_latest_event(session.clone(), name, create_should_fail, error).await;
}

/// Tests event log do record info of stream job creation failure, for CREATE TABLE/MV/INDEX/SINK.
/// Tests event log can record info of stream job creation failure, for CREATE TABLE/MV/INDEX/SINK.
#[tokio::test]
async fn test_create_stream_job_fail() -> Result<()> {
async fn failpoint_limited_test_create_stream_job_fail() -> Result<()> {
let mut cluster = Cluster::start(cluster_config()).await.unwrap();
let mut session = cluster.start_session();
const WAIT_RECOVERY_SEC: u64 = 5;
Expand Down Expand Up @@ -217,3 +217,48 @@ async fn test_create_stream_job_fail() -> Result<()> {

Ok(())
}

/// Tests that the event log records info of a barrier collection failure.
///
/// This test is theoretically flaky: the error reported by compute nodes, or the one
/// chosen by the meta node, may not be the injected root cause. In practice, runs with
/// `MADSIM_TEST_SEED` from 1..1000 all succeeded.
#[tokio::test]
async fn failpoint_limited_test_collect_barrier_failure() -> Result<()> {
    let mut cluster = Cluster::start(cluster_config()).await.unwrap();
    let mut session = cluster.start_session();
    // Set up a streaming job so barriers flow through actors.
    session
        .run("create table t (c int primary key)")
        .await
        .unwrap();
    session
        .run("create materialized view mv as select * from t")
        .await
        .unwrap();

    // Make barrier collection fail on compute nodes via the failpoint added in actor.rs.
    let fp_collect_actor_err = "collect_actors_err";
    fail::cfg(fp_collect_actor_err, "return").unwrap();
    // Wait until a barrier fails.
    tokio::time::sleep(Duration::from_secs(3)).await;
    fail::remove(fp_collect_actor_err);

    // The failure should have been written to the event log table by the meta node.
    let event_type = "COLLECT_BARRIER_FAIL";
    let info = session
        .run(format!(
            "select info from rw_event_logs where event_type='{}' order by timestamp desc limit 1",
            event_type
        ))
        .await
        .unwrap();
    let json = serde_json::from_str::<serde_json::Value>(&info).unwrap();
    let inner = json.get("collectBarrierFail").unwrap();
    // The recorded error should carry the injected failpoint message.
    // `contains` replaces the non-idiomatic `.find(..).is_some()` (clippy::search_is_some).
    assert!(inner
        .get("error")
        .unwrap()
        .as_str()
        .unwrap()
        .contains("intentional collect_actors_err"));

    Ok(())
}

0 comments on commit 0d5bf77

Please sign in to comment.