Skip to content

Commit

Permalink
refactor(common): remove dead code and simplify (#17585)
Browse files Browse the repository at this point in the history
Signed-off-by: xxchan <[email protected]>
  • Loading branch information
xxchan authored Jul 9, 2024
1 parent b3e1fad commit c9a6a1f
Show file tree
Hide file tree
Showing 24 changed files with 53 additions and 387 deletions.
12 changes: 8 additions & 4 deletions src/common/common_service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@
#![feature(impl_trait_in_assoc_type)]
#![feature(error_generic_member_access)]

pub mod metrics_manager;
pub mod observer_manager;
mod metrics_manager;
mod observer_manager;
mod tracing;

pub use metrics_manager::MetricsManager;

pub mod tracing;
pub use observer_manager::{
Channel, NotificationClient, ObserverError, ObserverManager, ObserverState,
RpcNotificationClient,
};
pub use tracing::TracingExtractLayer;
49 changes: 3 additions & 46 deletions src/common/common_service/src/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,42 +22,6 @@ use thiserror_ext::AsReport;
use tokio::task::JoinHandle;
use tonic::{Status, Streaming};

pub trait SubscribeTypeEnum {
fn subscribe_type() -> SubscribeType;
}

pub struct SubscribeFrontend {}

impl SubscribeTypeEnum for SubscribeFrontend {
fn subscribe_type() -> SubscribeType {
SubscribeType::Frontend
}
}

pub struct SubscribeHummock {}

impl SubscribeTypeEnum for SubscribeHummock {
fn subscribe_type() -> SubscribeType {
SubscribeType::Hummock
}
}

pub struct SubscribeCompactor {}

impl SubscribeTypeEnum for SubscribeCompactor {
fn subscribe_type() -> SubscribeType {
SubscribeType::Compactor
}
}

pub struct SubscribeCompute {}

impl SubscribeTypeEnum for SubscribeCompute {
fn subscribe_type() -> SubscribeType {
SubscribeType::Compute
}
}

/// `ObserverManager` is used to update data based on notification from meta.
/// Call `start` to spawn a new asynchronous task
/// We can write the notification logic by implementing `ObserverNodeImpl`.
Expand All @@ -68,7 +32,7 @@ pub struct ObserverManager<T: NotificationClient, S: ObserverState> {
}

pub trait ObserverState: Send + 'static {
type SubscribeType: SubscribeTypeEnum;
fn subscribe_type() -> SubscribeType;
/// modify data after receiving notification from meta
fn handle_notification(&mut self, resp: SubscribeResponse);

Expand Down Expand Up @@ -109,10 +73,7 @@ where
S: ObserverState,
{
pub async fn new(client: T, observer_states: S) -> Self {
let rx = client
.subscribe(S::SubscribeType::subscribe_type())
.await
.unwrap();
let rx = client.subscribe(S::subscribe_type()).await.unwrap();
Self {
rx,
client,
Expand Down Expand Up @@ -214,11 +175,7 @@ where
/// `re_subscribe` is used to re-subscribe to the meta's notification.
async fn re_subscribe(&mut self) {
loop {
match self
.client
.subscribe(S::SubscribeType::subscribe_type())
.await
{
match self.client.subscribe(S::subscribe_type()).await {
Ok(rx) => {
tracing::debug!("re-subscribe success");
self.rx = rx;
Expand Down
22 changes: 0 additions & 22 deletions src/common/metrics/src/monitor/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -587,28 +587,6 @@ impl<L> tonic::transport::server::Router<L> {
}
}

#[cfg(not(madsim))]
pub fn monitored_tcp_incoming(
listen_addr: std::net::SocketAddr,
connection_type: impl Into<String>,
config: TcpConfig,
) -> Result<
MonitoredConnection<tonic::transport::server::TcpIncoming, MonitorNewConnectionImpl>,
Box<dyn std::error::Error + Send + Sync>,
> {
let incoming = tonic::transport::server::TcpIncoming::new(
listen_addr,
config.tcp_nodelay,
config.keepalive_duration,
)?;
Ok(MonitoredConnection::new(
incoming,
MonitorNewConnectionImpl {
connection_type: connection_type.into(),
},
))
}

#[derive(Clone)]
pub struct MonitorNewConnectionImpl {
connection_type: String,
Expand Down
71 changes: 8 additions & 63 deletions src/common/metrics/src/monitor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod connection;
pub mod my_stats;
pub mod process;
pub mod rwlock;
pub use connection::{monitor_connector, EndpointExt, RouterExt, TcpConfig};
pub use rwlock::MonitoredRwLock;

use std::sync::LazyLock;
mod connection;
mod process;
mod rwlock;

use prometheus::core::{
AtomicI64, AtomicU64, Collector, GenericCounter, GenericCounterVec, GenericGauge, Metric,
};
use prometheus::{Histogram, HistogramVec, Registry};
use std::sync::LazyLock;

use crate::monitor::my_stats::MyHistogram;
use crate::monitor::process::monitor_process;
use prometheus::Registry;

#[cfg(target_os = "linux")]
static PAGESIZE: std::sync::LazyLock<i64> =
Expand All @@ -35,59 +31,8 @@ static PAGESIZE: std::sync::LazyLock<i64> =
pub static CLOCK_TICK: std::sync::LazyLock<u64> =
std::sync::LazyLock::new(|| unsafe { libc::sysconf(libc::_SC_CLK_TCK) as u64 });

/// Define extension method `print` used in `print_statistics`.
pub trait Print {
fn print(&self);
}

impl Print for GenericCounter<AtomicU64> {
fn print(&self) {
let desc = &self.desc()[0].fq_name;
let counter = self.metric().get_counter().get_value() as u64;
println!("{desc} COUNT : {counter}");
}
}

impl Print for GenericGauge<AtomicI64> {
fn print(&self) {
let desc = &self.desc()[0].fq_name;
let counter = self.get();
println!("{desc} COUNT : {counter}");
}
}

impl Print for Histogram {
fn print(&self) {
let desc = &self.desc()[0].fq_name;

let histogram = MyHistogram::from_prom_hist(self.metric().get_histogram());
let p50 = histogram.get_percentile(50.0);
let p95 = histogram.get_percentile(95.0);
let p99 = histogram.get_percentile(99.0);
let p100 = histogram.get_percentile(100.0);

let sample_count = self.get_sample_count();
let sample_sum = self.get_sample_sum();
println!("{desc} P50 : {p50} P95 : {p95} P99 : {p99} P100 : {p100} COUNT : {sample_count} SUM : {sample_sum}");
}
}

impl Print for HistogramVec {
fn print(&self) {
let desc = &self.desc()[0].fq_name;
println!("{desc} {:?}", self);
}
}

impl Print for GenericCounterVec<AtomicU64> {
fn print(&self) {
let desc = &self.desc()[0].fq_name;
println!("{desc} {:?}", self);
}
}

pub static GLOBAL_METRICS_REGISTRY: LazyLock<Registry> = LazyLock::new(|| {
let registry = Registry::new();
monitor_process(&registry);
process::monitor_process(&registry);
registry
});
Loading

0 comments on commit c9a6a1f

Please sign in to comment.