Skip to content

Commit

Permalink
refactor: update docs and tests for the telemetry crate (#25432)
Browse files Browse the repository at this point in the history
- Introduced traits, `ParquetMetrics` and `SystemInfoProvider` to enable
  writing easier tests
- Uses mockito for code that depends on reqwest::Client and also uses
  mockall to generally mock any traits like `SystemInfoProvider`
- Minor updates to docs
  • Loading branch information
praveen-influx authored Oct 8, 2024
1 parent c4534b0 commit 1f1125c
Show file tree
Hide file tree
Showing 12 changed files with 297 additions and 66 deletions.
42 changes: 41 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ indexmap = { version = "2.2.6" }
libc = { version = "0.2" }
mime = "0.3.17"
mockito = { version = "1.4.0", default-features = false }
mockall = { version = "0.13.0" }
num_cpus = "1.16.0"
object_store = "0.10.2"
parking_lot = "0.12.1"
Expand Down
11 changes: 7 additions & 4 deletions influxdb3_server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,12 @@ mod tests {
use influxdb3_id::{DbId, TableId};
use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_wal::WalConfig;
use influxdb3_write::last_cache::LastCacheProvider;
use influxdb3_write::parquet_cache::test_cached_obj_store_and_oracle;
use influxdb3_write::persister::Persister;
use influxdb3_write::WriteBuffer;
use influxdb3_write::{
last_cache::LastCacheProvider, write_buffer::persisted_files::PersistedFiles,
};
use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig};
use iox_time::{MockProvider, Time};
use object_store::DynObjectStore;
Expand Down Expand Up @@ -787,9 +789,10 @@ mod tests {
.unwrap(),
);

let dummy_telem_store = TelemetryStore::new_without_background_runners(Arc::clone(
&write_buffer_impl.persisted_files(),
));
let parquet_metrics_provider: Arc<PersistedFiles> =
Arc::clone(&write_buffer_impl.persisted_files());
let dummy_telem_store =
TelemetryStore::new_without_background_runners(parquet_metrics_provider);
let write_buffer: Arc<dyn WriteBuffer> = write_buffer_impl;
let common_state = crate::CommonServerState::new(
Arc::clone(&metrics),
Expand Down
16 changes: 9 additions & 7 deletions influxdb3_server/src/query_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -604,8 +604,11 @@ mod tests {
use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_wal::{Gen1Duration, WalConfig};
use influxdb3_write::{
last_cache::LastCacheProvider, parquet_cache::test_cached_obj_store_and_oracle,
persister::Persister, write_buffer::WriteBufferImpl, WriteBuffer,
last_cache::LastCacheProvider,
parquet_cache::test_cached_obj_store_and_oracle,
persister::Persister,
write_buffer::{persisted_files::PersistedFiles, WriteBufferImpl},
WriteBuffer,
};
use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig};
use iox_time::{MockProvider, Time};
Expand Down Expand Up @@ -651,7 +654,7 @@ mod tests {
let host_id = Arc::from("dummy-host-id");
let instance_id = Arc::from("instance-id");
let catalog = Arc::new(Catalog::new(host_id, instance_id));
let write_buffer = Arc::new(
let write_buffer_impl = Arc::new(
WriteBufferImpl::new(
Arc::clone(&persister),
Arc::clone(&catalog),
Expand All @@ -670,10 +673,9 @@ mod tests {
.unwrap(),
);

let dummy_telem_store = TelemetryStore::new_without_background_runners(Arc::clone(
&write_buffer.persisted_files(),
));
let write_buffer: Arc<dyn WriteBuffer> = write_buffer;
let persisted_files: Arc<PersistedFiles> = Arc::clone(&write_buffer_impl.persisted_files());
let dummy_telem_store = TelemetryStore::new_without_background_runners(persisted_files);
let write_buffer: Arc<dyn WriteBuffer> = write_buffer_impl;
let metrics = Arc::new(Registry::new());
let df_config = Arc::new(Default::default());
let query_executor = QueryExecutorImpl::new(
Expand Down
5 changes: 2 additions & 3 deletions influxdb3_telemetry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ sysinfo.workspace = true
num.workspace = true
thiserror.workspace = true

# Local Deps
influxdb3_write = { path = "../influxdb3_write" }

[dev-dependencies]
test-log.workspace = true
proptest.workspace = true
mockito.workspace = true
mockall.workspace = true

4 changes: 4 additions & 0 deletions influxdb3_telemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,7 @@ pub enum TelemetryError {
}

pub type Result<T, E = TelemetryError> = std::result::Result<T, E>;

/// Abstraction over a source of parquet metrics, so that consumers can be
/// handed either a real implementation or a hand-written stand-in in tests.
///
/// Implementors must be `Send + Sync + Debug + 'static`.
pub trait ParquetMetrics: Send + Sync + std::fmt::Debug + 'static {
/// Returns the current metrics as a `(u64, f64, u64)` tuple.
///
/// NOTE(review): the meaning of each tuple element is not visible from this
/// trait definition; confirm against the implementors before relying on it.
fn get_metrics(&self) -> (u64, f64, u64);
}
138 changes: 118 additions & 20 deletions influxdb3_telemetry/src/sampler.rs
Original file line number Diff line number Diff line change
@@ -1,46 +1,81 @@
use std::{sync::Arc, time::Duration};

#[cfg(test)]
use mockall::{automock, predicate::*};
use observability_deps::tracing::debug;
use sysinfo::{ProcessRefreshKind, System};
use sysinfo::{Pid, ProcessRefreshKind, System};

use crate::store::TelemetryStore;
use crate::Result;
use crate::{store::TelemetryStore, TelemetryError};

struct CpuAndMemorySampler {
/// Abstraction over the process/system queries the sampler needs.
///
/// In test builds `automock` is applied to this trait, which is what provides
/// the `MockSystemInfoProvider` type used by the tests in this module.
#[cfg_attr(test, automock)]
pub trait SystemInfoProvider: Send + Sync + 'static {
/// Refreshes whatever cached data the provider holds for the process `pid`.
fn refresh_metrics(&mut self, pid: Pid);

/// Returns the pid to sample, or a static error message when it cannot be
/// determined.
fn get_pid(&self) -> Result<Pid, &'static str>;

/// Returns `(cpu_used, memory_used)` for the process `pid`, or `None` when
/// no data is available for that pid.
fn get_process_specific_metrics(&self, pid: Pid) -> Option<(f32, u64)>;
}

/// `SystemInfoProvider` implementation backed by a `sysinfo::System` handle.
struct SystemInfo {
// Handle that is refreshed and then queried for per-process data.
system: System,
}

impl CpuAndMemorySampler {
pub fn new(system: System) -> Self {
Self { system }
impl SystemInfo {
    /// Creates a provider wrapping a freshly constructed `System`
    /// (via `System::new()`).
    pub fn new() -> SystemInfo {
        let system = System::new();
        Self { system }
    }
}

impl SystemInfoProvider for SystemInfo {
/// This method picks the memory and cpu usage for this process using the
/// pid.
pub fn get_cpu_and_mem_used(&mut self) -> Result<(f32, u64)> {
let pid = sysinfo::get_current_pid().map_err(TelemetryError::CannotGetPid)?;
fn refresh_metrics(&mut self, pid: Pid) {
self.system.refresh_pids_specifics(
&[pid],
ProcessRefreshKind::new()
.with_cpu()
.with_memory()
.with_disk_usage(),
);
}

let process = self
.system
.process(pid)
.unwrap_or_else(|| panic!("cannot get process with pid: {}", pid));
fn get_pid(&self) -> Result<Pid, &'static str> {
sysinfo::get_current_pid()
}

fn get_process_specific_metrics<'a>(&self, pid: Pid) -> Option<(f32, u64)> {
let process = self.system.process(pid)?;

let memory_used = process.memory();
let cpu_used = process.cpu_usage();
let memory_used = process.memory();
Some((cpu_used, memory_used))
}
}

/// Samples cpu and memory usage through a boxed `SystemInfoProvider`, so the
/// concrete provider can be swapped out (for example with a mock in tests).
struct CpuAndMemorySampler {
// Provider that supplies the pid and the per-process metrics.
system: Box<dyn SystemInfoProvider>,
}

impl CpuAndMemorySampler {
pub fn new(system: impl SystemInfoProvider) -> Self {
Self {
system: Box::new(system),
}
}

pub fn get_cpu_and_mem_used(&mut self) -> Option<(f32, u64)> {
let pid = self.system.get_pid().ok()?;
self.system.refresh_metrics(pid);
let (cpu_used, memory_used) = self.system.get_process_specific_metrics(pid)?;
debug!(
mem_used = ?memory_used,
cpu_used = ?cpu_used,
mem_used = ?memory_used,
"trying to sample data for cpu/memory");

Ok((cpu_used, memory_used))
Some((cpu_used, memory_used))
}
}

Expand All @@ -49,18 +84,81 @@ pub(crate) async fn sample_metrics(
duration_secs: Duration,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut sampler = CpuAndMemorySampler::new(System::new());
let mut sampler = CpuAndMemorySampler::new(SystemInfo::new());

// sample every minute
let mut interval = tokio::time::interval(duration_secs);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

loop {
interval.tick().await;
if let Ok((cpu_used, memory_used)) = sampler.get_cpu_and_mem_used() {
store.add_cpu_and_memory(cpu_used, memory_used);
store.rollup_events();
}
sample_all_metrics(&mut sampler, &store);
}
})
}

/// Runs one sampling pass.
///
/// Asks `sampler` for the current cpu/memory usage and, when a reading is
/// available, records it in `store`; otherwise only a debug message is logged.
/// `store.rollup_events()` is invoked in both cases.
fn sample_all_metrics(sampler: &mut CpuAndMemorySampler, store: &Arc<TelemetryStore>) {
    match sampler.get_cpu_and_mem_used() {
        Some((cpu_used, memory_used)) => {
            store.add_cpu_and_memory(cpu_used, memory_used);
        }
        None => {
            debug!("Cannot get cpu/mem usage stats for this process");
        }
    }
    store.rollup_events();
}

#[cfg(test)]
mod tests {
    use crate::ParquetMetrics;

    use super::*;

    /// Stand-in `ParquetMetrics` source that always reports the same fixed
    /// values, so a `TelemetryStore` can be built without a real backend.
    #[derive(Debug)]
    struct MockParquetMetrics;

    impl ParquetMetrics for MockParquetMetrics {
        fn get_metrics(&self) -> (u64, f64, u64) {
            (10, 20.0, 30)
        }
    }

    /// Builds a sampler around a mocked `SystemInfoProvider` whose pid lookup
    /// succeeds and whose per-process lookup yields `metrics`.
    ///
    /// Every expectation is pinned to exactly one call: a single sampling pass
    /// must look up the pid, refresh, and read the metrics once each. The mock
    /// verifies these counts when it is dropped at the end of the test.
    fn sampler_returning(metrics: Option<(f32, u64)>) -> CpuAndMemorySampler {
        let mut mock_sys_info_provider = MockSystemInfoProvider::new();

        mock_sys_info_provider
            .expect_get_pid()
            .times(1)
            .return_const(Ok(Pid::from(5)));
        mock_sys_info_provider
            .expect_refresh_metrics()
            .times(1)
            .return_const(());
        mock_sys_info_provider
            .expect_get_process_specific_metrics()
            .times(1)
            .return_const(metrics);

        CpuAndMemorySampler::new(mock_sys_info_provider)
    }

    /// A successful reading: the pass must complete without panicking and hit
    /// each provider method exactly once.
    #[test]
    fn test_sample_all_metrics() {
        let store = TelemetryStore::new_without_background_runners(Arc::from(MockParquetMetrics));
        let mut sampler = sampler_returning(Some((10.0f32, 100u64)));

        sample_all_metrics(&mut sampler, &store);
    }

    /// The provider has no data for the pid: the pass must still complete
    /// without panicking and hit each provider method exactly once.
    #[test]
    fn test_sample_all_metrics_with_call_failure() {
        let store = TelemetryStore::new_without_background_runners(Arc::from(MockParquetMetrics));
        let mut sampler = sampler_returning(None);

        sample_all_metrics(&mut sampler, &store);
    }
}
Loading

0 comments on commit 1f1125c

Please sign in to comment.