diff --git a/Cargo.lock b/Cargo.lock index 9c615745bb104..386b2ee98206b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8110,6 +8110,7 @@ dependencies = [ "sea-orm", "serde", "serde_json", + "strum", "sync-point", "thiserror", "thiserror-ext", diff --git a/proto/meta.proto b/proto/meta.proto index 19fb29c260736..0243bc88e3833 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -584,6 +584,13 @@ message EventLog { string definition = 3; string error = 4; } + message EventBarrierComplete { + uint64 prev_epoch = 1; + uint64 cur_epoch = 2; + double duration_sec = 3; + string command = 4; + string barrier_kind = 5; + } // Event logs identifier, which should be populated by event log service. optional string unique_id = 1; // Processing time, which should be populated by event log service. @@ -592,6 +599,7 @@ message EventLog { EventCreateStreamJobFail create_stream_job_fail = 3; EventDirtyStreamJobClear dirty_stream_job_clear = 4; EventMetaNodeStart meta_node_start = 5; + EventBarrierComplete barrier_complete = 6; } } diff --git a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_event_logs.rs b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_event_logs.rs index ab012eefd1241..5d43f969add23 100644 --- a/src/frontend/src/catalog/system_catalog/rw_catalog/rw_event_logs.rs +++ b/src/frontend/src/catalog/system_catalog/rw_catalog/rw_event_logs.rs @@ -42,11 +42,12 @@ impl SysCatalogReaderImpl { .await? .into_iter() .sorted_by(|a, b| a.timestamp.cmp(&b.timestamp)) - .map(|e| { - let ts = Timestamptz::from_millis(e.timestamp.unwrap() as i64).unwrap(); + .map(|mut e| { + let id = e.unique_id.take().unwrap().into(); + let ts = Timestamptz::from_millis(e.timestamp.take().unwrap() as i64).unwrap(); let event_type = event_type(e.event.as_ref().unwrap()); OwnedRow::new(vec![ - Some(ScalarImpl::Utf8(e.unique_id.to_owned().unwrap().into())), + Some(ScalarImpl::Utf8(id)), Some(ScalarImpl::Timestamptz(ts)), Some(ScalarImpl::Utf8(event_type.into())), Some(ScalarImpl::Jsonb(json!(e).into())), @@ -62,6 +63,7 @@ fn event_type(e: &Event) -> String { Event::CreateStreamJobFail(_) => "CREATE_STREAM_JOB_FAIL", Event::DirtyStreamJobClear(_) => "DIRTY_STREAM_JOB_CLEAR", Event::MetaNodeStart(_) => "META_NODE_START", + Event::BarrierComplete(_) => "BARRIER_COMPLETE", } .into() } diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml index 32b5ea9a5a3fa..20c506eb36f81 100644 --- a/src/meta/Cargo.toml +++ b/src/meta/Cargo.toml @@ -65,6 +65,7 @@ sea-orm = { version = "0.12.0", features = [ ] } serde = { version = "1", features = ["derive"] } serde_json = "1" +strum = { version = "0.25", features = ["derive"] } sync-point = { path = "../utils/sync-point" } thiserror = "1" thiserror-ext = { workspace = true } diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index ce5ec2c09f040..4375ff0c33358 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -76,7 +76,7 @@ pub struct Reschedule { /// [`Command`] is the action of [`crate::barrier::GlobalBarrierManager`]. For different commands, /// we'll build different barriers to send, and may do different stuffs after the barrier is /// collected. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, strum::Display)] pub enum Command { /// `Plain` command generates a barrier with the mutation it carries. /// diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 4f5b115968eaa..55dd66a2ed532 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -1110,9 +1110,24 @@ impl GlobalBarrierManager { self.scheduled_barriers.force_checkpoint_in_next_barrier(); } - node.timer.take().unwrap().observe_duration(); + let duration_sec = node.timer.take().unwrap().stop_and_record(); node.wait_commit_timer.take().unwrap().observe_duration(); + { + // Record barrier latency in event log. + use risingwave_pb::meta::event_log; + let event = event_log::EventBarrierComplete { + prev_epoch: node.command_ctx.prev_epoch.value().0, + cur_epoch: node.command_ctx.curr_epoch.value().0, + duration_sec, + command: node.command_ctx.command.to_string(), + barrier_kind: node.command_ctx.kind.as_str_name().to_string(), + }; + self.env + .event_log_manager_ref() + .add_event_logs(vec![event_log::Event::BarrierComplete(event)]); + } + Ok(()) } InFlight => unreachable!(), diff --git a/src/meta/src/manager/event_log.rs b/src/meta/src/manager/event_log.rs index df7bf3daa14cf..35840bd912ea0 100644 --- a/src/meta/src/manager/event_log.rs +++ b/src/meta/src/manager/event_log.rs @@ -167,6 +167,7 @@ impl From<&EventLog> for ChannelId { Event::CreateStreamJobFail(_) => 1, Event::DirtyStreamJobClear(_) => 2, Event::MetaNodeStart(_) => 3, + Event::BarrierComplete(_) => 4, } } }