Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(meta): record inject or collect barrier failure in event table #13624

Merged
merged 3 commits into from
Nov 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions .config/nextest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,22 @@ fail-fast = false
[profile.ci.junit]
path = "junit-nextest.xml"
report-name = "nextest-run"

# Named test groups for cargo-nextest.
[test-groups]
# Tests in this group run at most one at a time.
# NOTE(review): presumably because fail points are process-global state,
# so concurrent failpoint tests would interfere — confirm.
failpoint-limited = { max-threads = 1 }

# Route every test whose name matches `failpoint_limited::` into the
# single-threaded group, for each profile in use.
[[profile.default.overrides]]
filter = 'test(failpoint_limited::)'
test-group = 'failpoint-limited'

[[profile.ci.overrides]]
filter = 'test(failpoint_limited::)'
test-group = 'failpoint-limited'

[[profile.ci-sim.overrides]]
filter = 'test(failpoint_limited::)'
test-group = 'failpoint-limited'

[[profile.ci.junit.overrides]]
filter = 'test(failpoint_limited::)'
test-group = 'failpoint-limited'
12 changes: 12 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,16 @@ message EventLog {
string command = 4;
string barrier_kind = 5;
}
// Recorded when the meta node fails to inject a barrier.
message EventInjectBarrierFail {
// Epoch preceding the failed barrier.
uint64 prev_epoch = 1;
// Epoch of the failed barrier.
uint64 cur_epoch = 2;
// Stringified error describing the injection failure.
string error = 3;
}
// Recorded when the meta node fails to collect a barrier.
message EventCollectBarrierFail {
// Epoch preceding the failed barrier.
uint64 prev_epoch = 1;
// Epoch of the failed barrier.
uint64 cur_epoch = 2;
// Stringified error describing the collection failure.
string error = 3;
}
// Event logs identifier, which should be populated by event log service.
optional string unique_id = 1;
// Processing time, which should be populated by event log service.
Expand All @@ -600,6 +610,8 @@ message EventLog {
EventDirtyStreamJobClear dirty_stream_job_clear = 4;
EventMetaNodeStart meta_node_start = 5;
EventBarrierComplete barrier_complete = 6;
EventInjectBarrierFail inject_barrier_fail = 7;
EventCollectBarrierFail collect_barrier_fail = 8;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ fn event_type(e: &Event) -> String {
Event::DirtyStreamJobClear(_) => "DIRTY_STREAM_JOB_CLEAR",
Event::MetaNodeStart(_) => "META_NODE_START",
Event::BarrierComplete(_) => "BARRIER_COMPLETE",
Event::InjectBarrierFail(_) => "INJECT_BARRIER_FAIL",
Event::CollectBarrierFail(_) => "COLLECT_BARRIER_FAIL",
}
.into()
}
32 changes: 30 additions & 2 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,7 @@ impl GlobalBarrierManager {
Ok(node_need_collect) => {
// todo: the collect handler should be abort when recovery.
tokio::spawn(Self::collect_barrier(
self.env.clone(),
node_need_collect,
self.env.stream_client_pool_ref(),
command_context,
Expand Down Expand Up @@ -851,12 +852,25 @@ impl GlobalBarrierManager {
.into()
}
});
try_join_all(inject_futures).await?;
try_join_all(inject_futures).await.inspect_err(|e| {
// Record failure in event log.
use risingwave_pb::meta::event_log;
use thiserror_ext::AsReport;
let event = event_log::EventInjectBarrierFail {
prev_epoch: command_context.prev_epoch.value().0,
cur_epoch: command_context.curr_epoch.value().0,
error: e.to_report_string(),
};
self.env
.event_log_manager_ref()
.add_event_logs(vec![event_log::Event::InjectBarrierFail(event)]);
})?;
Ok(node_need_collect)
}

/// Send barrier-complete-rpc and wait for responses from all CNs
async fn collect_barrier(
env: MetaSrvEnv,
node_need_collect: HashMap<WorkerId, bool>,
client_pool_ref: StreamClientPoolRef,
command_context: Arc<CommandContext>,
Expand Down Expand Up @@ -894,7 +908,21 @@ impl GlobalBarrierManager {
}
});

let result = try_join_all(collect_futures).await.map_err(Into::into);
let result = try_join_all(collect_futures)
.await
.inspect_err(|e| {
// Record failure in event log.
use risingwave_pb::meta::event_log;
use thiserror_ext::AsReport;
let event = event_log::EventCollectBarrierFail {
prev_epoch: command_context.prev_epoch.value().0,
cur_epoch: command_context.curr_epoch.value().0,
error: e.to_report_string(),
};
env.event_log_manager_ref()
.add_event_logs(vec![event_log::Event::CollectBarrierFail(event)]);
})
.map_err(Into::into);
let _ = barrier_complete_tx
.send(BarrierCompletion { prev_epoch, result })
.inspect_err(|err| tracing::warn!("failed to complete barrier: {err}"));
Expand Down
2 changes: 2 additions & 0 deletions src/meta/src/manager/event_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ impl From<&EventLog> for ChannelId {
Event::DirtyStreamJobClear(_) => 2,
Event::MetaNodeStart(_) => 3,
Event::BarrierComplete(_) => 4,
Event::InjectBarrierFail(_) => 5,
Event::CollectBarrierFail(_) => 6,
}
}
}
5 changes: 5 additions & 0 deletions src/stream/src/executor/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,11 @@ where
Err(err) => break Err(err),
};

fail::fail_point!("collect_actors_err", id == 10, |_| Err(anyhow::anyhow!(
"intentional collect_actors_err"
)
.into()));

// Collect barriers to local barrier manager
self.context.lock_barrier_manager().collect(id, &barrier);

Expand Down
51 changes: 48 additions & 3 deletions src/tests/simulation/tests/integration_tests/recovery/event_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ event_log_flush_interval_ms = 10\
Configuration {
config_path: ConfigPath::Temp(config_path.into()),
frontend_nodes: 1,
compute_nodes: 1,
compute_nodes: 3,
meta_nodes: 1,
compactor_nodes: 0,
compute_node_cores: 2,
Expand Down Expand Up @@ -126,9 +126,9 @@ async fn test_create_fail(
assert_latest_event(session.clone(), name, create_should_fail, error).await;
}

/// Tests event log do record info of stream job creation failure, for CREATE TABLE/MV/INDEX/SINK.
/// Tests event log can record info of stream job creation failure, for CREATE TABLE/MV/INDEX/SINK.
#[tokio::test]
async fn test_create_stream_job_fail() -> Result<()> {
async fn failpoint_limited_test_create_stream_job_fail() -> Result<()> {
let mut cluster = Cluster::start(cluster_config()).await.unwrap();
let mut session = cluster.start_session();
const WAIT_RECOVERY_SEC: u64 = 5;
Expand Down Expand Up @@ -217,3 +217,48 @@ async fn test_create_stream_job_fail() -> Result<()> {

Ok(())
}

/// Tests that the event log records info of a barrier collection failure.
///
/// This test could in theory be flaky, though it was not in practice:
/// the error reported by compute nodes, or the one chosen by the meta node,
/// may not be the injected root cause. However, runs with MADSIM_TEST_SEED
/// from 1..1000 always succeeded.
#[tokio::test]
async fn failpoint_limited_test_collect_barrier_failure() -> Result<()> {
    let mut cluster = Cluster::start(cluster_config()).await.unwrap();
    let mut session = cluster.start_session();
    // Set up a streaming job so that barriers flow through actors.
    session
        .run("create table t (c int primary key)")
        .await
        .unwrap();
    session
        .run("create materialized view mv as select * from t")
        .await
        .unwrap();

    // Make barrier collection fail on the compute side via the fail point
    // installed in `src/stream/src/executor/actor.rs`.
    let fp_collect_actor_err = "collect_actors_err";
    fail::cfg(fp_collect_actor_err, "return").unwrap();
    // Wait until a barrier fails.
    tokio::time::sleep(Duration::from_secs(3)).await;
    fail::remove(fp_collect_actor_err);

    // The failure should now be queryable from the event log system table.
    let event_type = "COLLECT_BARRIER_FAIL";
    let info = session
        .run(format!(
            "select info from rw_event_logs where event_type='{}' order by timestamp desc limit 1",
            event_type
        ))
        .await
        .unwrap();
    let json = serde_json::from_str::<serde_json::Value>(&info).unwrap();
    let inner = json.get("collectBarrierFail").unwrap();
    // The recorded error should mention the injected failure.
    // Idiom: `contains` instead of `find(..).is_some()` (clippy::search_is_some).
    assert!(inner
        .get("error")
        .unwrap()
        .as_str()
        .unwrap()
        .contains("intentional collect_actors_err"));

    Ok(())
}
Loading