Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add logs and metrics #2858

Merged
merged 6 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/datanode/src/alive_keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,12 @@ impl RegionAliveKeeper {
let (role, region_id) = (region.role().into(), RegionId::from(region.region_id));
if let Some(handle) = self.find_handle(region_id).await {
handle.reset_deadline(role, deadline).await;
} else {
warn!(
"Trying to renew the lease for region {region_id}, the keeper handler is not found!"
);
// Else the region alive keeper might be triggered by lagging messages, we can safely ignore it.
}
// Else the region alive keeper might be triggered by lagging messages, we can safely ignore it.
}
}

Expand Down
29 changes: 27 additions & 2 deletions src/datanode/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use crate::alive_keeper::RegionAliveKeeper;
use crate::config::DatanodeOptions;
use crate::error::{self, MetaClientInitSnafu, Result};
use crate::event_listener::RegionServerEventReceiver;
use crate::metrics;
use crate::region_server::RegionServer;

pub(crate) mod handler;
Expand Down Expand Up @@ -72,9 +73,9 @@ impl HeartbeatTask {
opts.heartbeat.interval.as_millis() as u64,
));
let resp_handler_executor = Arc::new(HandlerGroupExecutor::new(vec![
region_alive_keeper.clone(),
Arc::new(ParseMailboxMessageHandler),
Arc::new(RegionHeartbeatResponseHandler::new(region_server.clone())),
region_alive_keeper.clone(),
]));

Ok(Self {
Expand All @@ -101,8 +102,10 @@ impl HeartbeatTask {
quit_signal: Arc<Notify>,
) -> Result<HeartbeatSender> {
let client_id = meta_client.id();

let (tx, mut rx) = meta_client.heartbeat().await.context(MetaClientInitSnafu)?;

let mut _last_received_lease = Instant::now();
WenyXu marked this conversation as resolved.
Show resolved Hide resolved

let _handle = common_runtime::spawn_bg(async move {
while let Some(res) = match rx.message().await {
Ok(m) => m,
Expand All @@ -114,6 +117,28 @@ impl HeartbeatTask {
if let Some(msg) = res.mailbox_message.as_ref() {
info!("Received mailbox message: {msg:?}, meta_client id: {client_id:?}");
}
if let Some(lease) = res.region_lease.as_ref() {
metrics::LAST_RECEIVED_HEARTBEAT_ELAPSED
.set(_last_received_lease.elapsed().as_millis() as i64);
// Resets the timer.
_last_received_lease = Instant::now();
WenyXu marked this conversation as resolved.
Show resolved Hide resolved

let mut leader_region_lease_count = 0;
let mut follower_region_lease_count = 0;
for lease in &lease.regions {
match lease.role() {
RegionRole::Leader => leader_region_lease_count += 1,
RegionRole::Follower => follower_region_lease_count += 1,
}
}

metrics::HEARTBEAT_REGION_LEASES
.with_label_values(&["leader"])
.set(leader_region_lease_count);
metrics::HEARTBEAT_REGION_LEASES
.with_label_values(&["follower"])
.set(follower_region_lease_count);
}
let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), res);
if let Err(e) = Self::handle_response(ctx, handler_executor.clone()).await {
error!(e; "Error while handling heartbeat response");
Expand Down
15 changes: 15 additions & 0 deletions src/datanode/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ use prometheus::*;
/// Region request type label.
pub const REGION_REQUEST_TYPE: &str = "datanode_region_request_type";

pub const REGION_ROLE: &str = "region_role";

lazy_static! {
/// The elapsed time of handling a request in the region_server.
pub static ref HANDLE_REGION_REQUEST_ELAPSED: HistogramVec = register_histogram_vec!(
Expand All @@ -26,4 +28,17 @@ lazy_static! {
&[REGION_REQUEST_TYPE]
)
.unwrap();
/// The elapsed time since the last received heartbeat.
pub static ref LAST_RECEIVED_HEARTBEAT_ELAPSED: IntGauge = register_int_gauge!(
"last_received_heartbeat_lease_elapsed",
"last received heartbeat lease elapsed",
)
.unwrap();
/// The received region leases via heartbeat.
pub static ref HEARTBEAT_REGION_LEASES: IntGaugeVec = register_int_gauge_vec!(
"heartbeat_region_leases",
"received region leases via heartbeat",
&[REGION_ROLE]
)
.unwrap();
}
13 changes: 13 additions & 0 deletions src/meta-srv/src/handler/region_lease_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::sync::Arc;
use api::v1::meta::{HeartbeatRequest, RegionLease, Role};
use async_trait::async_trait;
use common_meta::key::TableMetadataManagerRef;
use common_telemetry::info;
use store_api::region_engine::{GrantedRegion, RegionRole};
use store_api::storage::RegionId;

Expand Down Expand Up @@ -123,6 +124,12 @@ impl HeartbeatHandler for RegionLeaseHandler {
&leaders,
RegionRole::Leader,
);
if !closable.is_empty() {
info!(
"Granting region lease, found closable leader regions: {:?} on datanode {}",
closable, datanode_id
);
}
inactive_regions.extend(closable);

let followers = followers.into_iter().flatten().collect::<Vec<_>>();
Expand All @@ -144,6 +151,12 @@ impl HeartbeatHandler for RegionLeaseHandler {
&followers,
RegionRole::Follower,
);
if !closable.is_empty() {
info!(
"Granting region lease, found closable follower regions {:?} on datanode {}",
closable, datanode_id
);
}
inactive_regions.extend(closable);

acc.inactive_region_ids = inactive_regions;
Expand Down
10 changes: 10 additions & 0 deletions src/meta-srv/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,14 @@ lazy_static! {
pub static ref METRIC_META_LEADER_CACHED_KV_LOAD: HistogramVec =
register_histogram_vec!("meta_leader_cache_kv_load", "meta load cache", &["prefix"])
.unwrap();
pub static ref METRIC_META_LOAD_FOLLOWER_METADATA: Histogram = register_histogram!(
"meta_load_follower_metadata",
"meta load follower regions metadata elapsed"
)
.unwrap();
pub static ref METRIC_META_LOAD_LEADER_METADATA: Histogram = register_histogram!(
"meta_load_leader_metadata",
"meta load leader regions metadata elapsed"
)
.unwrap();
}
22 changes: 20 additions & 2 deletions src/meta-srv/src/region/lease_keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ use std::sync::{Arc, RwLock};
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::TableMetadataManagerRef;
use common_meta::DatanodeId;
use common_telemetry::warn;
use snafu::ResultExt;
use store_api::storage::{RegionId, TableId};

use self::mito::find_staled_leader_regions;
use crate::error::{self, Result};
use crate::metrics;
use crate::region::lease_keeper::utils::find_staled_follower_regions;

pub type RegionLeaseKeeperRef = Arc<RegionLeaseKeeper>;
Expand Down Expand Up @@ -89,7 +91,11 @@ impl RegionLeaseKeeper {
) -> Result<(HashSet<RegionId>, HashSet<RegionId>)> {
let tables = self.collect_tables(datanode_regions);
let table_ids = tables.keys().copied().collect::<Vec<_>>();
let metadata_subset = self.collect_tables_metadata(&table_ids).await?;

let metadata_subset = {
let _timer = metrics::METRIC_META_LOAD_LEADER_METADATA.start_timer();
self.collect_tables_metadata(&table_ids).await?
};

let mut closable_set = HashSet::new();
let mut downgradable_set = HashSet::new();
Expand All @@ -104,6 +110,10 @@ impl RegionLeaseKeeper {
downgradable_set.extend(downgradable);
closable_set.extend(closable);
} else {
warn!(
"The table {} metadata is not found, appends closable leader regions: {:?}",
table_id, regions
);
// If table metadata is not found.
closable_set.extend(regions);
}
Expand All @@ -128,7 +138,11 @@ impl RegionLeaseKeeper {
) -> Result<(HashSet<RegionId>, HashSet<RegionId>)> {
let tables = self.collect_tables(datanode_regions);
let table_ids = tables.keys().copied().collect::<Vec<_>>();
let metadata_subset = self.collect_tables_metadata(&table_ids).await?;

let metadata_subset = {
let _timer = metrics::METRIC_META_LOAD_FOLLOWER_METADATA.start_timer();
self.collect_tables_metadata(&table_ids).await?
};

let mut upgradable_set = HashSet::new();
let mut closable_set = HashSet::new();
Expand All @@ -143,6 +157,10 @@ impl RegionLeaseKeeper {
upgradable_set.extend(upgradable);
closable_set.extend(closable);
} else {
warn!(
"The table {} metadata is not found, appends closable followers regions: {:?}",
table_id, regions
);
// If table metadata is not found.
closable_set.extend(regions);
}
Expand Down
Loading