Skip to content

Commit

Permalink
chore: add logs and metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Dec 2, 2023
1 parent 781f242 commit 667b60f
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 5 deletions.
6 changes: 5 additions & 1 deletion src/datanode/src/alive_keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,12 @@ impl RegionAliveKeeper {
let (role, region_id) = (region.role().into(), RegionId::from(region.region_id));
if let Some(handle) = self.find_handle(region_id).await {
handle.reset_deadline(role, deadline).await;
} else {
warn!(
"Trying to renew the lease for region {region_id}, the keeper handler is not found!"
);
// Else the region alive keeper might be triggered by lagging messages, we can safely ignore it.
}
// Else the region alive keeper might be triggered by lagging messages, we can safely ignore it.
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/datanode/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ impl HeartbeatTask {
opts.heartbeat.interval.as_millis() as u64,
));
let resp_handler_executor = Arc::new(HandlerGroupExecutor::new(vec![
region_alive_keeper.clone(),
Arc::new(ParseMailboxMessageHandler),
Arc::new(RegionHeartbeatResponseHandler::new(region_server.clone())),
region_alive_keeper.clone(),
]));

Ok(Self {
Expand Down
9 changes: 9 additions & 0 deletions src/meta-srv/src/handler/region_lease_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::sync::Arc;
use api::v1::meta::{HeartbeatRequest, RegionLease, Role};
use async_trait::async_trait;
use common_meta::key::TableMetadataManagerRef;
use common_telemetry::info;
use store_api::region_engine::{GrantedRegion, RegionRole};
use store_api::storage::RegionId;

Expand Down Expand Up @@ -123,6 +124,10 @@ impl HeartbeatHandler for RegionLeaseHandler {
&leaders,
RegionRole::Leader,
);
info!(
"Granting region lease, found closable leader regions: {:?} on datanode {}",
closable, datanode_id
);
inactive_regions.extend(closable);

let followers = followers.into_iter().flatten().collect::<Vec<_>>();
Expand All @@ -144,6 +149,10 @@ impl HeartbeatHandler for RegionLeaseHandler {
&followers,
RegionRole::Follower,
);
info!(
"Granting region lease, found closable follower regions {:?} on datanode {}",
closable, datanode_id
);
inactive_regions.extend(closable);

acc.inactive_region_ids = inactive_regions;
Expand Down
16 changes: 16 additions & 0 deletions src/meta-srv/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,20 @@ lazy_static! {
pub static ref METRIC_META_LEADER_CACHED_KV_LOAD: HistogramVec =
register_histogram_vec!("meta_leader_cache_kv_load", "meta load cache", &["prefix"])
.unwrap();
pub static ref METRIC_META_LOAD_FOLLOWER_METADATA: Histogram = register_histogram!(
"meta_load_follower_metadata",
"meta load follower regions metadata elapsed"
)
.unwrap();
pub static ref METRIC_META_LOAD_LEADER_METADATA: Histogram = register_histogram!(
"meta_load_leader_metadata",
"meta load leader regions metadata elapsed"
)
.unwrap();
pub static ref METRIC_META_HANDLE_HEARTBEAT: HistogramVec = register_histogram_vec!(
"meta_handle_heartbeat",
"meta handle heartbeat elapsed",
&["pusher"]
)
.unwrap();
}
22 changes: 20 additions & 2 deletions src/meta-srv/src/region/lease_keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ use std::sync::{Arc, RwLock};
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::TableMetadataManagerRef;
use common_meta::DatanodeId;
use common_telemetry::warn;
use snafu::ResultExt;
use store_api::storage::{RegionId, TableId};

use self::mito::find_staled_leader_regions;
use crate::error::{self, Result};
use crate::metrics;
use crate::region::lease_keeper::utils::find_staled_follower_regions;

pub type RegionLeaseKeeperRef = Arc<RegionLeaseKeeper>;
Expand Down Expand Up @@ -89,7 +91,11 @@ impl RegionLeaseKeeper {
) -> Result<(HashSet<RegionId>, HashSet<RegionId>)> {
let tables = self.collect_tables(datanode_regions);
let table_ids = tables.keys().copied().collect::<Vec<_>>();
let metadata_subset = self.collect_tables_metadata(&table_ids).await?;

let metadata_subset = {
let _timer = metrics::METRIC_META_LOAD_LEADER_METADATA.start_timer();
self.collect_tables_metadata(&table_ids).await?
};

let mut closable_set = HashSet::new();
let mut downgradable_set = HashSet::new();
Expand All @@ -104,6 +110,10 @@ impl RegionLeaseKeeper {
downgradable_set.extend(downgradable);
closable_set.extend(closable);
} else {
warn!(
"The table {} metadata is not found, appends closable leader regions: {:?}",
table_id, regions
);
// If table metadata is not found.
closable_set.extend(regions);
}
Expand All @@ -128,7 +138,11 @@ impl RegionLeaseKeeper {
) -> Result<(HashSet<RegionId>, HashSet<RegionId>)> {
let tables = self.collect_tables(datanode_regions);
let table_ids = tables.keys().copied().collect::<Vec<_>>();
let metadata_subset = self.collect_tables_metadata(&table_ids).await?;

let metadata_subset = {
let _timer = metrics::METRIC_META_LOAD_FOLLOWER_METADATA.start_timer();
self.collect_tables_metadata(&table_ids).await?
};

let mut upgradable_set = HashSet::new();
let mut closable_set = HashSet::new();
Expand All @@ -143,6 +157,10 @@ impl RegionLeaseKeeper {
upgradable_set.extend(upgradable);
closable_set.extend(closable);
} else {
warn!(
"The table {} metadata is not found, appends closable followers regions: {:?}",
table_id, regions
);
// If table metadata is not found.
closable_set.extend(regions);
}
Expand Down
5 changes: 4 additions & 1 deletion src/meta-srv/src/service/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Streaming};

use crate::error;
use crate::error::Result;
use crate::handler::Pusher;
use crate::metasrv::{Context, MetaSrv};
use crate::service::{GrpcResult, GrpcStream};
use crate::{error, metrics};

#[async_trait::async_trait]
impl heartbeat_server::Heartbeat for MetaSrv {
Expand Down Expand Up @@ -69,6 +69,9 @@ impl heartbeat_server::Heartbeat for MetaSrv {
handler_group.register(&key, pusher).await;
pusher_key = Some(key);
}
// Safety: must exist.
let _timer = metrics::METRIC_META_HANDLE_HEARTBEAT
.with_label_values(&[pusher_key.as_ref().unwrap()]);

let res = handler_group
.handle(req, ctx.clone())
Expand Down

0 comments on commit 667b60f

Please sign in to comment.