Skip to content

Commit

Permalink
feat(meta): record barrier latency in event table (#13633)
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 authored Nov 24, 2023
1 parent 3f5180c commit f707c5f
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 5 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,13 @@ message EventLog {
string definition = 3;
string error = 4;
}
message EventBarrierComplete {
uint64 prev_epoch = 1;
uint64 cur_epoch = 2;
double duration_sec = 3;
string command = 4;
string barrier_kind = 5;
}
// Event logs identifier, which should be populated by event log service.
optional string unique_id = 1;
// Processing time, which should be populated by event log service.
Expand All @@ -592,6 +599,7 @@ message EventLog {
EventCreateStreamJobFail create_stream_job_fail = 3;
EventDirtyStreamJobClear dirty_stream_job_clear = 4;
EventMetaNodeStart meta_node_start = 5;
EventBarrierComplete barrier_complete = 6;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,12 @@ impl SysCatalogReaderImpl {
.await?
.into_iter()
.sorted_by(|a, b| a.timestamp.cmp(&b.timestamp))
.map(|e| {
let ts = Timestamptz::from_millis(e.timestamp.unwrap() as i64).unwrap();
.map(|mut e| {
let id = e.unique_id.take().unwrap().into();
let ts = Timestamptz::from_millis(e.timestamp.take().unwrap() as i64).unwrap();
let event_type = event_type(e.event.as_ref().unwrap());
OwnedRow::new(vec![
Some(ScalarImpl::Utf8(e.unique_id.to_owned().unwrap().into())),
Some(ScalarImpl::Utf8(id)),
Some(ScalarImpl::Timestamptz(ts)),
Some(ScalarImpl::Utf8(event_type.into())),
Some(ScalarImpl::Jsonb(json!(e).into())),
Expand All @@ -62,6 +63,7 @@ fn event_type(e: &Event) -> String {
Event::CreateStreamJobFail(_) => "CREATE_STREAM_JOB_FAIL",
Event::DirtyStreamJobClear(_) => "DIRTY_STREAM_JOB_CLEAR",
Event::MetaNodeStart(_) => "META_NODE_START",
Event::BarrierComplete(_) => "BARRIER_COMPLETE",
}
.into()
}
1 change: 1 addition & 0 deletions src/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ sea-orm = { version = "0.12.0", features = [
] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
strum = { version = "0.25", features = ["derive"] }
sync-point = { path = "../utils/sync-point" }
thiserror = "1"
thiserror-ext = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ pub struct Reschedule {
/// [`Command`] is the action of [`crate::barrier::GlobalBarrierManager`]. For different commands,
/// we'll build different barriers to send, and may do different stuffs after the barrier is
/// collected.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, strum::Display)]
pub enum Command {
/// `Plain` command generates a barrier with the mutation it carries.
///
Expand Down
17 changes: 16 additions & 1 deletion src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1110,9 +1110,24 @@ impl GlobalBarrierManager {
self.scheduled_barriers.force_checkpoint_in_next_barrier();
}

node.timer.take().unwrap().observe_duration();
let duration_sec = node.timer.take().unwrap().stop_and_record();
node.wait_commit_timer.take().unwrap().observe_duration();

{
// Record barrier latency in event log.
use risingwave_pb::meta::event_log;
let event = event_log::EventBarrierComplete {
prev_epoch: node.command_ctx.prev_epoch.value().0,
cur_epoch: node.command_ctx.curr_epoch.value().0,
duration_sec,
command: node.command_ctx.command.to_string(),
barrier_kind: node.command_ctx.kind.as_str_name().to_string(),
};
self.env
.event_log_manager_ref()
.add_event_logs(vec![event_log::Event::BarrierComplete(event)]);
}

Ok(())
}
InFlight => unreachable!(),
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/manager/event_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ impl From<&EventLog> for ChannelId {
Event::CreateStreamJobFail(_) => 1,
Event::DirtyStreamJobClear(_) => 2,
Event::MetaNodeStart(_) => 3,
Event::BarrierComplete(_) => 4,
}
}
}

0 comments on commit f707c5f

Please sign in to comment.