chore: add logs and metrics (#2858)
* chore: add logs and metrics

* feat: add the timer to track heartbeat interval

* feat: add the gauge to track region leases

* refactor: use gauge instead of the timer

* chore: apply suggestions from CR

* feat: add hit rate and etcd txn metrics
WenyXu authored Dec 4, 2023
1 parent 781f242 commit c26f2f9
Showing 8 changed files with 105 additions and 5 deletions.
6 changes: 6 additions & 0 deletions src/common/meta/src/kv_backend/etcd.rs
@@ -68,6 +68,9 @@ impl EtcdStore {
async fn do_multi_txn(&self, txn_ops: Vec<TxnOp>) -> Result<Vec<TxnResponse>> {
if txn_ops.len() < MAX_TXN_SIZE {
// fast path
let _timer = METRIC_META_TXN_REQUEST
.with_label_values(&["etcd", "txn"])
.start_timer();
let txn = Txn::new().and_then(txn_ops);
let txn_res = self
.client
@@ -81,6 +84,9 @@ impl EtcdStore {
let txns = txn_ops
.chunks(MAX_TXN_SIZE)
.map(|part| async move {
let _timer = METRIC_META_TXN_REQUEST
.with_label_values(&["etcd", "txn"])
.start_timer();
let txn = Txn::new().and_then(part);
self.client.kv_client().txn(txn).await
})
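
The METRIC_META_TXN_REQUEST histogram itself is defined outside this hunk. A minimal, self-contained sketch of how such a HistogramVec could be declared and used with the prometheus and lazy_static crates; the metric name and the label names "target"/"op" below are illustrative assumptions, not taken from this commit:

use lazy_static::lazy_static;
use prometheus::{register_histogram_vec, HistogramVec};

lazy_static! {
    // Hypothetical declaration; the real metric lives elsewhere in the repository.
    pub static ref METRIC_META_TXN_REQUEST: HistogramVec = register_histogram_vec!(
        "meta_txn_request_elapsed",
        "elapsed time of kv-backend txn requests",
        &["target", "op"]
    )
    .unwrap();
}

fn timed_txn() {
    // The returned HistogramTimer records the elapsed seconds into the
    // histogram when `_timer` is dropped at the end of this scope.
    let _timer = METRIC_META_TXN_REQUEST
        .with_label_values(&["etcd", "txn"])
        .start_timer();
    // ... perform the etcd transaction here ...
}
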
6 changes: 5 additions & 1 deletion src/datanode/src/alive_keeper.rs
@@ -129,8 +129,12 @@ impl RegionAliveKeeper {
let (role, region_id) = (region.role().into(), RegionId::from(region.region_id));
if let Some(handle) = self.find_handle(region_id).await {
handle.reset_deadline(role, deadline).await;
} else {
warn!(
"Trying to renew the lease for region {region_id}, the keeper handler is not found!"
);
// Else the region alive keeper might be triggered by lagging messages, we can safely ignore it.
}
// Else the region alive keeper might be triggered by lagging messages, we can safely ignore it.
}
}

29 changes: 27 additions & 2 deletions src/datanode/src/heartbeat.rs
@@ -37,6 +37,7 @@ use crate::alive_keeper::RegionAliveKeeper;
use crate::config::DatanodeOptions;
use crate::error::{self, MetaClientInitSnafu, Result};
use crate::event_listener::RegionServerEventReceiver;
use crate::metrics;
use crate::region_server::RegionServer;

pub(crate) mod handler;
@@ -72,9 +73,9 @@ impl HeartbeatTask {
opts.heartbeat.interval.as_millis() as u64,
));
let resp_handler_executor = Arc::new(HandlerGroupExecutor::new(vec![
region_alive_keeper.clone(),
Arc::new(ParseMailboxMessageHandler),
Arc::new(RegionHeartbeatResponseHandler::new(region_server.clone())),
region_alive_keeper.clone(),
]));

Ok(Self {
@@ -101,8 +102,10 @@ impl HeartbeatTask {
quit_signal: Arc<Notify>,
) -> Result<HeartbeatSender> {
let client_id = meta_client.id();

let (tx, mut rx) = meta_client.heartbeat().await.context(MetaClientInitSnafu)?;

let mut last_received_lease = Instant::now();

let _handle = common_runtime::spawn_bg(async move {
while let Some(res) = match rx.message().await {
Ok(m) => m,
@@ -114,6 +117,28 @@ impl HeartbeatTask {
if let Some(msg) = res.mailbox_message.as_ref() {
info!("Received mailbox message: {msg:?}, meta_client id: {client_id:?}");
}
if let Some(lease) = res.region_lease.as_ref() {
metrics::LAST_RECEIVED_HEARTBEAT_ELAPSED
.set(last_received_lease.elapsed().as_millis() as i64);
// Resets the timer.
last_received_lease = Instant::now();

let mut leader_region_lease_count = 0;
let mut follower_region_lease_count = 0;
for lease in &lease.regions {
match lease.role() {
RegionRole::Leader => leader_region_lease_count += 1,
RegionRole::Follower => follower_region_lease_count += 1,
}
}

metrics::HEARTBEAT_REGION_LEASES
.with_label_values(&["leader"])
.set(leader_region_lease_count);
metrics::HEARTBEAT_REGION_LEASES
.with_label_values(&["follower"])
.set(follower_region_lease_count);
}
let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), res);
if let Err(e) = Self::handle_response(ctx, handler_executor.clone()).await {
error!(e; "Error while handling heartbeat response");
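
A self-contained sketch of the lease-counting logic above, assuming only the prometheus crate; Role and the locally built gauge stand in for the real RegionRole and metrics::HEARTBEAT_REGION_LEASES:

use prometheus::{IntGaugeVec, Opts};

#[derive(Clone, Copy)]
enum Role {
    Leader,
    Follower,
}

fn record_leases(gauge: &IntGaugeVec, roles: &[Role]) {
    let mut leaders = 0;
    let mut followers = 0;
    for role in roles {
        match role {
            Role::Leader => leaders += 1,
            Role::Follower => followers += 1,
        }
    }
    // set() overwrites the previous value, so each heartbeat reports a fresh count.
    gauge.with_label_values(&["leader"]).set(leaders);
    gauge.with_label_values(&["follower"]).set(followers);
}

fn main() {
    let gauge = IntGaugeVec::new(
        Opts::new("heartbeat_region_leases", "received region leases via heartbeat"),
        &["region_role"],
    )
    .unwrap();
    record_leases(&gauge, &[Role::Leader, Role::Leader, Role::Follower]);
    assert_eq!(gauge.with_label_values(&["leader"]).get(), 2);
}
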
15 changes: 15 additions & 0 deletions src/datanode/src/metrics.rs
@@ -18,6 +18,8 @@ use prometheus::*;
/// Region request type label.
pub const REGION_REQUEST_TYPE: &str = "datanode_region_request_type";

pub const REGION_ROLE: &str = "region_role";

lazy_static! {
/// The elapsed time of handling a request in the region_server.
pub static ref HANDLE_REGION_REQUEST_ELAPSED: HistogramVec = register_histogram_vec!(
@@ -26,4 +28,17 @@ lazy_static! {
&[REGION_REQUEST_TYPE]
)
.unwrap();
/// The elapsed time since the last received heartbeat.
pub static ref LAST_RECEIVED_HEARTBEAT_ELAPSED: IntGauge = register_int_gauge!(
"last_received_heartbeat_lease_elapsed",
"last received heartbeat lease elapsed",
)
.unwrap();
/// The received region leases via heartbeat.
pub static ref HEARTBEAT_REGION_LEASES: IntGaugeVec = register_int_gauge_vec!(
"heartbeat_region_leases",
"received region leases via heartbeat",
&[REGION_ROLE]
)
.unwrap();
}
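
These lazy_static metrics register into the default prometheus registry on first use. A short sketch, not from this commit, of rendering the whole registry in the Prometheus text exposition format:

use prometheus::{Encoder, TextEncoder};

fn render_metrics() -> String {
    let metric_families = prometheus::gather();
    let mut buffer = Vec::new();
    let encoder = TextEncoder::new();
    // Encodes every metric registered in the default registry, including the
    // gauges and histograms added in this commit once they have been touched.
    encoder.encode(&metric_families, &mut buffer).unwrap();
    String::from_utf8(buffer).unwrap()
}
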
13 changes: 13 additions & 0 deletions src/meta-srv/src/handler/region_lease_handler.rs
@@ -18,6 +18,7 @@ use std::sync::Arc;
use api::v1::meta::{HeartbeatRequest, RegionLease, Role};
use async_trait::async_trait;
use common_meta::key::TableMetadataManagerRef;
use common_telemetry::info;
use store_api::region_engine::{GrantedRegion, RegionRole};
use store_api::storage::RegionId;

@@ -123,6 +124,12 @@ impl HeartbeatHandler for RegionLeaseHandler {
&leaders,
RegionRole::Leader,
);
if !closable.is_empty() {
info!(
"Granting region lease, found closable leader regions: {:?} on datanode {}",
closable, datanode_id
);
}
inactive_regions.extend(closable);

let followers = followers.into_iter().flatten().collect::<Vec<_>>();
@@ -144,6 +151,12 @@ impl HeartbeatHandler for RegionLeaseHandler {
&followers,
RegionRole::Follower,
);
if !closable.is_empty() {
info!(
"Granting region lease, found closable follower regions {:?} on datanode {}",
closable, datanode_id
);
}
inactive_regions.extend(closable);

acc.inactive_region_ids = inactive_regions;
15 changes: 15 additions & 0 deletions src/meta-srv/src/metrics.rs
@@ -34,4 +34,19 @@ lazy_static! {
pub static ref METRIC_META_LEADER_CACHED_KV_LOAD: HistogramVec =
register_histogram_vec!("meta_leader_cache_kv_load", "meta load cache", &["prefix"])
.unwrap();
pub static ref METRIC_META_LOAD_FOLLOWER_METADATA: Histogram = register_histogram!(
"meta_load_follower_metadata",
"meta load follower regions metadata elapsed"
)
.unwrap();
pub static ref METRIC_META_LOAD_LEADER_METADATA: Histogram = register_histogram!(
"meta_load_leader_metadata",
"meta load leader regions metadata elapsed"
)
.unwrap();
pub static ref METRIC_META_KV_CACHE_BATCH_GET_HIT_RATE: Gauge = register_gauge!(
"meta_kv_cache_batch_get_hit_rate",
"meta kv cache batch get hit rate"
)
.unwrap();
}
22 changes: 20 additions & 2 deletions src/meta-srv/src/region/lease_keeper.rs
@@ -21,11 +21,13 @@ use std::sync::{Arc, RwLock};
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::TableMetadataManagerRef;
use common_meta::DatanodeId;
use common_telemetry::warn;
use snafu::ResultExt;
use store_api::storage::{RegionId, TableId};

use self::mito::find_staled_leader_regions;
use crate::error::{self, Result};
use crate::metrics;
use crate::region::lease_keeper::utils::find_staled_follower_regions;

pub type RegionLeaseKeeperRef = Arc<RegionLeaseKeeper>;
@@ -89,7 +91,11 @@ impl RegionLeaseKeeper {
) -> Result<(HashSet<RegionId>, HashSet<RegionId>)> {
let tables = self.collect_tables(datanode_regions);
let table_ids = tables.keys().copied().collect::<Vec<_>>();
let metadata_subset = self.collect_tables_metadata(&table_ids).await?;

let metadata_subset = {
let _timer = metrics::METRIC_META_LOAD_LEADER_METADATA.start_timer();
self.collect_tables_metadata(&table_ids).await?
};

let mut closable_set = HashSet::new();
let mut downgradable_set = HashSet::new();
@@ -104,6 +110,10 @@ impl RegionLeaseKeeper {
downgradable_set.extend(downgradable);
closable_set.extend(closable);
} else {
warn!(
"The table {} metadata is not found, appends closable leader regions: {:?}",
table_id, regions
);
// If table metadata is not found.
closable_set.extend(regions);
}
@@ -128,7 +138,11 @@ impl RegionLeaseKeeper {
) -> Result<(HashSet<RegionId>, HashSet<RegionId>)> {
let tables = self.collect_tables(datanode_regions);
let table_ids = tables.keys().copied().collect::<Vec<_>>();
let metadata_subset = self.collect_tables_metadata(&table_ids).await?;

let metadata_subset = {
let _timer = metrics::METRIC_META_LOAD_FOLLOWER_METADATA.start_timer();
self.collect_tables_metadata(&table_ids).await?
};

let mut upgradable_set = HashSet::new();
let mut closable_set = HashSet::new();
@@ -143,6 +157,10 @@ impl RegionLeaseKeeper {
upgradable_set.extend(upgradable);
closable_set.extend(closable);
} else {
warn!(
"The table {} metadata is not found, appends closable followers regions: {:?}",
table_id, regions
);
// If table metadata is not found.
closable_set.extend(regions);
}
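
The `{ let _timer = ...; ... }` blocks above rely on the timer observing its elapsed time when it is dropped at the end of the block. A minimal sketch of that pattern, assuming the prometheus crate; the histogram here is a hypothetical stand-in for METRIC_META_LOAD_LEADER_METADATA:

use prometheus::Histogram;

async fn load_metadata() -> Vec<u8> {
    Vec::new()
}

async fn load_metadata_timed(histogram: &Histogram) -> Vec<u8> {
    let metadata = {
        // The timer starts here and observes the elapsed seconds into the
        // histogram when `_timer` is dropped at the closing brace, so only
        // the metadata load itself is measured.
        let _timer = histogram.start_timer();
        load_metadata().await
    };
    metadata
}
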
4 changes: 4 additions & 0 deletions src/meta-srv/src/service/store/cached_kv.rs
@@ -260,6 +260,10 @@ impl KvBackend for LeaderCachedKvBackend {
.iter()
.map(|kv| kv.key.clone())
.collect::<HashSet<_>>();

let hit_rate = hit_keys.len() as f64 / req.keys.len() as f64;
metrics::METRIC_META_KV_CACHE_BATCH_GET_HIT_RATE.set(hit_rate);

let missed_keys = req
.keys
.iter()
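
A self-contained sketch of the hit-rate computation above, assuming only the prometheus crate; the empty-request guard is an illustration-only addition and is not part of this diff:

use prometheus::Gauge;

fn record_hit_rate(gauge: &Gauge, hit_keys: usize, requested_keys: usize) {
    // Guard against an empty batch-get request so the gauge never becomes NaN.
    if requested_keys > 0 {
        gauge.set(hit_keys as f64 / requested_keys as f64);
    }
}

fn main() {
    let gauge = Gauge::new(
        "meta_kv_cache_batch_get_hit_rate",
        "meta kv cache batch get hit rate",
    )
    .unwrap();
    record_hit_rate(&gauge, 3, 4);
    assert_eq!(gauge.get(), 0.75);
}
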
