diff --git a/Cargo.lock b/Cargo.lock
index 24c5532091e..5bdee84c16c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1792,6 +1792,12 @@ version = "0.15.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b"
 
+[[package]]
+name = "downcast"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1"
+
 [[package]]
 name = "ed25519"
 version = "2.2.3"
@@ -1995,6 +2001,12 @@ dependencies = [
  "percent-encoding",
 ]
 
+[[package]]
+name = "fragile"
+version = "2.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"
+
 [[package]]
 name = "futures"
 version = "0.3.31"
@@ -2789,7 +2801,8 @@ version = "0.1.0"
 dependencies = [
  "futures",
  "futures-util",
- "influxdb3_write",
+ "mockall",
+ "mockito",
  "num",
  "observability_deps",
  "parking_lot",
@@ -2867,6 +2880,7 @@ dependencies = [
  "influxdb-line-protocol",
  "influxdb3_catalog",
  "influxdb3_id",
+ "influxdb3_telemetry",
  "influxdb3_test_helpers",
  "influxdb3_wal",
  "insta",
@@ -3478,6 +3492,32 @@ dependencies = [
  "windows-sys 0.52.0",
 ]
 
+[[package]]
+name = "mockall"
+version = "0.13.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d4c28b3fb6d753d28c20e826cd46ee611fda1cf3cde03a443a974043247c065a"
+dependencies = [
+ "cfg-if",
+ "downcast",
+ "fragile",
+ "mockall_derive",
+ "predicates",
+ "predicates-tree",
+]
+
+[[package]]
+name = "mockall_derive"
+version = "0.13.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "341014e7f530314e9a1fdbc7400b244efea7122662c96bfa248c31da5bfb2020"
+dependencies = [
+ "cfg-if",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.79",
+]
+
 [[package]]
 name = "mockito"
 version = "1.5.0"
diff --git a/Cargo.toml b/Cargo.toml
index 0f0266d5f64..a3b20c951ac 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -74,6 +74,7 @@ indexmap = { version = "2.2.6" }
 libc = { version = "0.2" }
 mime = "0.3.17"
 mockito = { version = "1.4.0", default-features = false }
+mockall = { version = "0.13.0" }
 num_cpus = "1.16.0"
 object_store = "0.10.2"
 parking_lot = "0.12.1"
diff --git a/influxdb3_server/src/lib.rs b/influxdb3_server/src/lib.rs
index 3c4beb541f7..8629c8f547c 100644
--- a/influxdb3_server/src/lib.rs
+++ b/influxdb3_server/src/lib.rs
@@ -236,10 +236,12 @@ mod tests {
     use influxdb3_id::{DbId, TableId};
     use influxdb3_telemetry::store::TelemetryStore;
     use influxdb3_wal::WalConfig;
-    use influxdb3_write::last_cache::LastCacheProvider;
     use influxdb3_write::parquet_cache::test_cached_obj_store_and_oracle;
     use influxdb3_write::persister::Persister;
     use influxdb3_write::WriteBuffer;
+    use influxdb3_write::{
+        last_cache::LastCacheProvider, write_buffer::persisted_files::PersistedFiles,
+    };
     use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig};
     use iox_time::{MockProvider, Time};
     use object_store::DynObjectStore;
@@ -787,9 +789,10 @@ mod tests {
             .unwrap(),
         );
 
-        let dummy_telem_store = TelemetryStore::new_without_background_runners(Arc::clone(
-            &write_buffer_impl.persisted_files(),
-        ));
+        let parquet_metrics_provider: Arc<PersistedFiles> =
+            Arc::clone(&write_buffer_impl.persisted_files());
+        let dummy_telem_store =
+            TelemetryStore::new_without_background_runners(parquet_metrics_provider);
         let write_buffer: Arc<dyn WriteBuffer> = write_buffer_impl;
         let common_state = crate::CommonServerState::new(
             Arc::clone(&metrics),
diff --git a/influxdb3_server/src/query_executor.rs b/influxdb3_server/src/query_executor.rs
index e45b176a9fd..7d5d29dcc6f 100644
--- a/influxdb3_server/src/query_executor.rs
+++ b/influxdb3_server/src/query_executor.rs
@@ -604,8 +604,11 @@ mod tests {
     use influxdb3_telemetry::store::TelemetryStore;
     use influxdb3_wal::{Gen1Duration, WalConfig};
     use influxdb3_write::{
-        last_cache::LastCacheProvider, parquet_cache::test_cached_obj_store_and_oracle,
-        persister::Persister, write_buffer::WriteBufferImpl, WriteBuffer,
+        last_cache::LastCacheProvider,
+        parquet_cache::test_cached_obj_store_and_oracle,
+        persister::Persister,
+        write_buffer::{persisted_files::PersistedFiles, WriteBufferImpl},
+        WriteBuffer,
     };
     use iox_query::exec::{DedicatedExecutor, Executor, ExecutorConfig};
     use iox_time::{MockProvider, Time};
@@ -651,7 +654,7 @@ mod tests {
         let host_id = Arc::from("dummy-host-id");
         let instance_id = Arc::from("instance-id");
         let catalog = Arc::new(Catalog::new(host_id, instance_id));
-        let write_buffer = Arc::new(
+        let write_buffer_impl = Arc::new(
             WriteBufferImpl::new(
                 Arc::clone(&persister),
                 Arc::clone(&catalog),
@@ -670,10 +673,9 @@ mod tests {
             .unwrap(),
         );
 
-        let dummy_telem_store = TelemetryStore::new_without_background_runners(Arc::clone(
-            &write_buffer.persisted_files(),
-        ));
-        let write_buffer: Arc<dyn WriteBuffer> = write_buffer;
+        let persisted_files: Arc<PersistedFiles> = Arc::clone(&write_buffer_impl.persisted_files());
+        let dummy_telem_store = TelemetryStore::new_without_background_runners(persisted_files);
+        let write_buffer: Arc<dyn WriteBuffer> = write_buffer_impl;
         let metrics = Arc::new(Registry::new());
         let df_config = Arc::new(Default::default());
         let query_executor = QueryExecutorImpl::new(
diff --git a/influxdb3_telemetry/Cargo.toml b/influxdb3_telemetry/Cargo.toml
index 1317011f02f..f16f38c4b45 100644
--- a/influxdb3_telemetry/Cargo.toml
+++ b/influxdb3_telemetry/Cargo.toml
@@ -18,10 +18,9 @@ sysinfo.workspace = true
 num.workspace = true
 thiserror.workspace = true
 
-# Local Deps
-influxdb3_write = { path = "../influxdb3_write" }
-
 [dev-dependencies]
 test-log.workspace = true
 proptest.workspace = true
+mockito.workspace = true
+mockall.workspace = true
 
diff --git a/influxdb3_telemetry/src/lib.rs b/influxdb3_telemetry/src/lib.rs
index 43ef8fb0f94..fee6b5e80f5 100644
--- a/influxdb3_telemetry/src/lib.rs
+++ b/influxdb3_telemetry/src/lib.rs
@@ -20,3 +20,7 @@ pub enum TelemetryError {
 }
 
 pub type Result<T, E = TelemetryError> = std::result::Result<T, E>;
+
+pub trait ParquetMetrics: Send + Sync + std::fmt::Debug + 'static {
+    fn get_metrics(&self) -> (u64, f64, u64);
+}
diff --git a/influxdb3_telemetry/src/sampler.rs b/influxdb3_telemetry/src/sampler.rs
index 9f4128eeb60..6600415274a 100644
--- a/influxdb3_telemetry/src/sampler.rs
+++ b/influxdb3_telemetry/src/sampler.rs
@@ -1,24 +1,38 @@
 use std::{sync::Arc, time::Duration};
 
+#[cfg(test)]
+use mockall::{automock, predicate::*};
 use observability_deps::tracing::debug;
-use sysinfo::{ProcessRefreshKind, System};
+use sysinfo::{Pid, ProcessRefreshKind, System};
 
+use crate::store::TelemetryStore;
 use crate::Result;
-use crate::{store::TelemetryStore, TelemetryError};
 
-struct CpuAndMemorySampler {
+#[cfg_attr(test, automock)]
+pub trait SystemInfoProvider: Send + Sync + 'static {
+    fn refresh_metrics(&mut self, pid: Pid);
+
+    fn get_pid(&self) -> Result<Pid, &'static str>;
+
+    fn get_process_specific_metrics(&self, pid: Pid) -> Option<(f32, u64)>;
+}
+
+struct SystemInfo {
     system: System,
 }
 
-impl CpuAndMemorySampler {
-    pub fn new(system: System) -> Self {
-        Self { system }
+impl SystemInfo {
+    pub fn new() -> SystemInfo {
+        Self {
+            system: System::new(),
+        }
     }
+}
 
+impl SystemInfoProvider for SystemInfo {
     /// This method picks the memory and cpu usage for this process using the
     /// pid.
-    pub fn get_cpu_and_mem_used(&mut self) -> Result<(f32, u64)> {
-        let pid = sysinfo::get_current_pid().map_err(TelemetryError::CannotGetPid)?;
+    fn refresh_metrics(&mut self, pid: Pid) {
         self.system.refresh_pids_specifics(
             &[pid],
             ProcessRefreshKind::new()
@@ -26,21 +40,42 @@ impl CpuAndMemorySampler {
                 .with_memory()
                 .with_disk_usage(),
         );
+    }
 
-        let process = self
-            .system
-            .process(pid)
-            .unwrap_or_else(|| panic!("cannot get process with pid: {}", pid));
+    fn get_pid(&self) -> Result<Pid, &'static str> {
+        sysinfo::get_current_pid()
+    }
+
+    fn get_process_specific_metrics<'a>(&self, pid: Pid) -> Option<(f32, u64)> {
+        let process = self.system.process(pid)?;
 
-        let memory_used = process.memory();
         let cpu_used = process.cpu_usage();
+        let memory_used = process.memory();
+        Some((cpu_used, memory_used))
+    }
+}
+
+struct CpuAndMemorySampler {
+    system: Box<dyn SystemInfoProvider>,
+}
+impl CpuAndMemorySampler {
+    pub fn new(system: impl SystemInfoProvider) -> Self {
+        Self {
+            system: Box::new(system),
+        }
+    }
+
+    pub fn get_cpu_and_mem_used(&mut self) -> Option<(f32, u64)> {
+        let pid = self.system.get_pid().ok()?;
+        self.system.refresh_metrics(pid);
+        let (cpu_used, memory_used) = self.system.get_process_specific_metrics(pid)?;
 
         debug!(
-            mem_used = ?memory_used,
             cpu_used = ?cpu_used,
+            mem_used = ?memory_used,
             "trying to sample data for cpu/memory");
 
-        Ok((cpu_used, memory_used))
+        Some((cpu_used, memory_used))
     }
 }
 
@@ -49,7 +84,7 @@ pub(crate) async fn sample_metrics(
     duration_secs: Duration,
 ) -> tokio::task::JoinHandle<()> {
     tokio::spawn(async move {
-        let mut sampler = CpuAndMemorySampler::new(System::new());
+        let mut sampler = CpuAndMemorySampler::new(SystemInfo::new());
 
         // sample every minute
         let mut interval = tokio::time::interval(duration_secs);
@@ -57,10 +92,73 @@ pub(crate) async fn sample_metrics(
 
         loop {
             interval.tick().await;
-            if let Ok((cpu_used, memory_used)) = sampler.get_cpu_and_mem_used() {
-                store.add_cpu_and_memory(cpu_used, memory_used);
-                store.rollup_events();
-            }
+            sample_all_metrics(&mut sampler, &store);
         }
     })
 }
+
+fn sample_all_metrics(sampler: &mut CpuAndMemorySampler, store: &Arc<TelemetryStore>) {
+    if let Some((cpu_used, memory_used)) = sampler.get_cpu_and_mem_used() {
+        store.add_cpu_and_memory(cpu_used, memory_used);
+    } else {
+        debug!("Cannot get cpu/mem usage stats for this process");
+    }
+    store.rollup_events();
+}
+
+#[cfg(test)]
+mod tests {
+
+    use crate::ParquetMetrics;
+
+    use super::*;
+
+    #[derive(Debug)]
+    struct MockParquetMetrics;
+
+    impl ParquetMetrics for MockParquetMetrics {
+        fn get_metrics(&self) -> (u64, f64, u64) {
+            (10, 20.0, 30)
+        }
+    }
+
+    #[test]
+    fn test_sample_all_metrics() {
+        let mut mock_sys_info_provider = MockSystemInfoProvider::new();
+        let store = TelemetryStore::new_without_background_runners(Arc::from(MockParquetMetrics));
+
+        mock_sys_info_provider
+            .expect_get_pid()
+            .return_const(Ok(Pid::from(5)));
+        mock_sys_info_provider
+            .expect_refresh_metrics()
+            .return_const(());
+        mock_sys_info_provider
+            .expect_get_process_specific_metrics()
+            .return_const(Some((10.0f32, 100u64)));
+
+        let mut sampler = CpuAndMemorySampler::new(mock_sys_info_provider);
+
+        sample_all_metrics(&mut sampler, &store);
+    }
+
+    #[test]
+    fn test_sample_all_metrics_with_call_failure() {
+        let mut mock_sys_info_provider = MockSystemInfoProvider::new();
+        let store = TelemetryStore::new_without_background_runners(Arc::from(MockParquetMetrics));
+
+        mock_sys_info_provider
+            .expect_get_pid()
+            .return_const(Ok(Pid::from(5)));
+        mock_sys_info_provider
+            .expect_refresh_metrics()
+            .return_const(());
+        mock_sys_info_provider
+            .expect_get_process_specific_metrics()
+            .return_const(None);
+
+        let mut sampler = CpuAndMemorySampler::new(mock_sys_info_provider);
+
+        sample_all_metrics(&mut sampler, &store);
+    }
+}
diff --git a/influxdb3_telemetry/src/sender.rs b/influxdb3_telemetry/src/sender.rs
index d8eb7ef7888..0fd413b795c 100644
--- a/influxdb3_telemetry/src/sender.rs
+++ b/influxdb3_telemetry/src/sender.rs
@@ -1,6 +1,7 @@
 use std::{sync::Arc, time::Duration};
 
 use observability_deps::tracing::debug;
+use reqwest::{IntoUrl, Url};
 use serde::Serialize;
 
 use crate::store::TelemetryStore;
@@ -8,19 +9,27 @@ use crate::{Result, TelemetryError};
 
 pub(crate) struct TelemetrySender {
     client: reqwest::Client,
-    req_path: String,
+    full_url: Url,
 }
 
 impl TelemetrySender {
-    pub fn new(client: reqwest::Client, req_path: String) -> Self {
-        Self { client, req_path }
+    pub fn new(client: reqwest::Client, base_url: impl IntoUrl) -> Self {
+        let base_url = base_url
+            .into_url()
+            .expect("Cannot parse telemetry sender url");
+        Self {
+            client,
+            full_url: base_url
+                .join("/api/v3")
+                .expect("Cannot set the telemetry request path"),
+        }
     }
 
-    pub async fn try_sending(&self, telemetry: &TelemetryPayload) -> Result<()> {
+    pub async fn try_sending(&mut self, telemetry: &TelemetryPayload) -> Result<()> {
         debug!(telemetry = ?telemetry, "trying to send data to telemetry server");
         let json = serde_json::to_vec(&telemetry).map_err(TelemetryError::CannotSerializeJson)?;
         self.client
-            .post(self.req_path.as_str())
+            .post(self.full_url.as_str())
             .body(json)
             .send()
             .await
@@ -77,24 +86,90 @@ pub(crate) async fn send_telemetry_in_background(
     duration_secs: Duration,
 ) -> tokio::task::JoinHandle<()> {
     tokio::spawn(async move {
-        let telem_sender = TelemetrySender::new(
+        let mut telem_sender = TelemetrySender::new(
             reqwest::Client::new(),
-            "https://telemetry.influxdata.foo.com".to_owned(),
+            "https://telemetry.influxdata.foo.com",
         );
         let mut interval = tokio::time::interval(duration_secs);
         interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
 
         loop {
             interval.tick().await;
-            let telemetry = store.snapshot();
-            if let Err(e) = telem_sender.try_sending(&telemetry).await {
-                // TODO: change to error! - until endpoint is decided keep
-                // this as debug log
-                debug!(error = ?e, "Cannot send telemetry");
-            }
-            // if we tried sending and failed, we currently still reset the
-            // metrics, it is ok to miss few samples
-            store.reset_metrics();
+            send_telemetry(&store, &mut telem_sender).await;
         }
     })
 }
+
+async fn send_telemetry(store: &Arc<TelemetryStore>, telem_sender: &mut TelemetrySender) {
+    let telemetry = store.snapshot();
+    if let Err(e) = telem_sender.try_sending(&telemetry).await {
+        // Not able to send telemetry is not a crucial error
+        // leave it as debug
+        debug!(error = ?e, "Cannot send telemetry");
+    }
+    // if we tried sending and failed, we currently still reset the
+    // metrics, it is ok to miss few samples
+    store.reset_metrics();
+}
+
+#[cfg(test)]
+mod tests {
+    use mockito::Server;
+    use reqwest::Url;
+    use std::sync::Arc;
+
+    use crate::sender::{TelemetryPayload, TelemetrySender};
+
+    #[test_log::test(tokio::test)]
+    async fn test_sending_telemetry() {
+        let client = reqwest::Client::new();
+        let mut mock_server = Server::new_async().await;
+        let mut sender = TelemetrySender::new(client, mock_server.url());
+        let mock = mock_server.mock("POST", "/api/v3").create_async().await;
+        let telem_payload = create_dummy_payload();
+
+        let result = sender.try_sending(&telem_payload).await;
+
+        assert!(result.is_ok());
+        mock.assert_async().await;
+    }
+
+    #[test]
+    fn test_url_join() {
+        let url = Url::parse("https://foo.com/").unwrap();
+        let new_url = url.join("/foo").unwrap();
+        assert_eq!("https://foo.com/foo", new_url.as_str());
+    }
+
+    fn create_dummy_payload() -> TelemetryPayload {
+        TelemetryPayload {
+            os: Arc::from("dummy-str"),
+            version: Arc::from("dummy-str"),
+            storage_type: Arc::from("dummy-str"),
+            instance_id: Arc::from("dummy-str"),
+            cores: 10,
+            product_type: "OSS",
+            cpu_utilization_percent_min: 100.0,
+            cpu_utilization_percent_max: 100.0,
+            cpu_utilization_percent_avg: 100.0,
+            memory_used_mb_min: 250,
+            memory_used_mb_max: 250,
+            memory_used_mb_avg: 250,
+            write_requests_min: 100,
+            write_requests_max: 100,
+            write_requests_avg: 100,
+            write_lines_min: 200_000,
+            write_lines_max: 200_000,
+            write_lines_avg: 200_000,
+            write_mb_min: 15,
+            write_mb_max: 15,
+            write_mb_avg: 15,
+            query_requests_min: 15,
+            query_requests_max: 15,
+            query_requests_avg: 15,
+            parquet_file_count: 100,
+            parquet_file_size_mb: 100.0,
+            parquet_row_count: 100,
+        }
+    }
+}
diff --git a/influxdb3_telemetry/src/stats.rs b/influxdb3_telemetry/src/stats.rs
index f89ad4d8493..9e5c751f52e 100644
--- a/influxdb3_telemetry/src/stats.rs
+++ b/influxdb3_telemetry/src/stats.rs
@@ -102,8 +102,7 @@ impl Stats {
 /// It calculates min/max/avg by using already calculated min/max/avg for
 /// possibly a higher resolution.
 ///
-/// For eg.
-///
+/// # Example
 /// Let's say we're looking at the stats for number of lines written.
 /// And we have 1st sample's minimum was 20 and the 3rd sample's
 /// minimum was 10. This means in the 1st sample for a whole minute
diff --git a/influxdb3_telemetry/src/store.rs b/influxdb3_telemetry/src/store.rs
index b3c7e1ead93..b0874bbcf19 100644
--- a/influxdb3_telemetry/src/store.rs
+++ b/influxdb3_telemetry/src/store.rs
@@ -1,6 +1,5 @@
 use std::{sync::Arc, time::Duration};
 
-use influxdb3_write::write_buffer::persisted_files::PersistedFiles;
 use num::Float;
 use observability_deps::tracing::{debug, warn};
 
@@ -9,6 +8,7 @@ use crate::{
     metrics::{Cpu, Memory, Queries, Writes},
     sampler::sample_metrics,
     sender::{send_telemetry_in_background, TelemetryPayload},
+    ParquetMetrics,
 };
 
 /// This store is responsible for holding all the stats which will be sent in the background
@@ -26,7 +26,7 @@ use crate::{
 #[derive(Debug)]
 pub struct TelemetryStore {
     inner: parking_lot::Mutex,
-    persisted_files: Arc<PersistedFiles>,
+    persisted_files: Arc<dyn ParquetMetrics>,
 }
 
 const SAMPLER_INTERVAL_SECS: u64 = 60;
@@ -39,7 +39,7 @@ impl TelemetryStore {
         influx_version: Arc<str>,
         storage_type: Arc<str>,
         cores: usize,
-        persisted_files: Arc<PersistedFiles>,
+        persisted_files: Arc<dyn ParquetMetrics>,
     ) -> Arc<Self> {
         debug!(
             instance_id = ?instance_id,
@@ -66,7 +66,7 @@ impl TelemetryStore {
         store
     }
 
-    pub fn new_without_background_runners(persisted_files: Arc<PersistedFiles>) -> Arc<Self> {
+    pub fn new_without_background_runners(persisted_files: Arc<dyn ParquetMetrics>) -> Arc<Self> {
         let instance_id = Arc::from("dummy-instance-id");
         let os = Arc::from("Linux");
         let influx_version = Arc::from("influxdb3-0.1.0");
@@ -284,20 +284,26 @@ mod tests {
 
     use super::*;
 
+    #[derive(Debug)]
+    struct DummyParquetMetrics;
+
+    impl ParquetMetrics for DummyParquetMetrics {
+        fn get_metrics(&self) -> (u64, f64, u64) {
+            (200, 500.25, 100)
+        }
+    }
+
     #[test_log::test(tokio::test)]
     async fn test_telemetry_store_cpu_mem() {
-        let persisted_snapshots = Vec::new();
         // create store
-        let persisted_files = Arc::from(PersistedFiles::new_from_persisted_snapshots(
-            persisted_snapshots,
-        ));
+        let parqet_file_metrics = Arc::new(DummyParquetMetrics);
         let store: Arc<TelemetryStore> = TelemetryStore::new(
             Arc::from("some-instance-id"),
             Arc::from("Linux"),
             Arc::from("OSS-v3.0"),
             Arc::from("Memory"),
             10,
-            persisted_files,
+            parqet_file_metrics,
         )
         .await;
         // check snapshot
@@ -327,9 +333,9 @@ mod tests {
         assert_eq!(expected_mem_in_mb, snapshot.memory_used_mb_min);
         assert_eq!(128, snapshot.memory_used_mb_max);
         assert_eq!(122, snapshot.memory_used_mb_avg);
-        assert_eq!(0, snapshot.parquet_file_count);
-        assert_eq!(0.0, snapshot.parquet_file_size_mb);
-        assert_eq!(0, snapshot.parquet_row_count);
+        assert_eq!(200, snapshot.parquet_file_count);
+        assert_eq!(500.25, snapshot.parquet_file_size_mb);
+        assert_eq!(100, snapshot.parquet_row_count);
 
         // add some writes
         store.add_write_metrics(100, 100);
diff --git a/influxdb3_write/Cargo.toml b/influxdb3_write/Cargo.toml
index 359735b8e9d..366cc19d68c 100644
--- a/influxdb3_write/Cargo.toml
+++ b/influxdb3_write/Cargo.toml
@@ -23,6 +23,7 @@ influxdb3_catalog = { path = "../influxdb3_catalog" }
 influxdb3_id = { path = "../influxdb3_id" }
 influxdb3_test_helpers = { path = "../influxdb3_test_helpers" }
 influxdb3_wal = { path = "../influxdb3_wal" }
+influxdb3_telemetry = { path = "../influxdb3_telemetry" }
 
 # crates.io dependencies
 anyhow.workspace = true
diff --git a/influxdb3_write/src/write_buffer/persisted_files.rs b/influxdb3_write/src/write_buffer/persisted_files.rs
index ea742de97cd..bbd7b1833cc 100644
--- a/influxdb3_write/src/write_buffer/persisted_files.rs
+++ b/influxdb3_write/src/write_buffer/persisted_files.rs
@@ -6,6 +6,7 @@ use crate::{ParquetFile, PersistedSnapshot};
 use hashbrown::HashMap;
 use influxdb3_id::DbId;
 use influxdb3_id::TableId;
+use influxdb3_telemetry::ParquetMetrics;
 use parking_lot::RwLock;
 
 type DatabaseToTables = HashMap;
@@ -55,9 +56,11 @@ impl PersistedFiles {
         files
     }
+}
 
+impl ParquetMetrics for PersistedFiles {
     /// Get parquet file metrics, file count, row count and size in MB
-    pub fn get_metrics(&self) -> (u64, f64, u64) {
+    fn get_metrics(&self) -> (u64, f64, u64) {
         let inner = self.inner.read();
         (
             inner.parquet_files_count,
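
Note on the shape of this change: `influxdb3_telemetry` now owns the `ParquetMetrics` trait and `influxdb3_write` implements it for `PersistedFiles`, so the telemetry crate no longer depends on the write buffer (hence the `Cargo.lock`/`Cargo.toml` edits above). A minimal sketch of what that buys a caller, mirroring the `DummyParquetMetrics`/`MockParquetMetrics` test doubles in the diff — the `StaticParquetMetrics` type and its numbers are invented for illustration:

```rust
use std::sync::Arc;

use influxdb3_telemetry::store::TelemetryStore;
use influxdb3_telemetry::ParquetMetrics;

/// Hypothetical metrics source: anything that can report
/// (parquet file count, total size in MB, row count) will do.
#[derive(Debug)]
struct StaticParquetMetrics;

impl ParquetMetrics for StaticParquetMetrics {
    fn get_metrics(&self) -> (u64, f64, u64) {
        // (parquet_file_count, parquet_file_size_mb, parquet_row_count)
        (42, 512.5, 1_000_000)
    }
}

fn build_store_for_tests() -> Arc<TelemetryStore> {
    // Test-only constructor from the diff: skips the background sampler/sender tasks.
    // Arc<StaticParquetMetrics> coerces to the Arc<dyn ParquetMetrics> the store expects.
    TelemetryStore::new_without_background_runners(Arc::new(StaticParquetMetrics))
}
```

The same unsized coercion is why the server tests above can keep handing over an `Arc<PersistedFiles>` unchanged.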
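The `MockSystemInfoProvider` used in the sampler tests is not hand-written anywhere in this patch: `#[cfg_attr(test, automock)]` makes mockall generate a `Mock` + trait-name struct with one `expect_*` builder per trait method. A self-contained sketch of the same pattern, using a made-up single-method trait:

```rust
use mockall::automock;

#[automock]
pub trait MetricsSource {
    fn value(&self) -> u64;
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn automock_generates_the_mock_struct() {
        // automock emits `MockMetricsSource` with an `expect_value()` builder;
        // `return_const` fixes the canned return value, just as the sampler tests
        // do with `expect_get_pid()` and `expect_get_process_specific_metrics()`.
        let mut source = MockMetricsSource::new();
        source.expect_value().return_const(7u64);
        assert_eq!(7, source.value());
    }
}
```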
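`TelemetrySender::new` now resolves the full endpoint eagerly via `Url::join("/api/v3")`. Because the joined path starts with a slash, it replaces whatever path the base URL carried — the behaviour `test_url_join` pins down, and the reason joining onto `mock_server.url()` still posts to `/api/v3`. A small sketch of that `url` crate behaviour; the host name here is illustrative only:

```rust
use reqwest::Url;

fn main() {
    let base = Url::parse("https://telemetry.example.com/some/base").unwrap();

    // Leading slash: the joined path is absolute and discards "/some/base".
    assert_eq!(
        "https://telemetry.example.com/api/v3",
        base.join("/api/v3").unwrap().as_str()
    );

    // No leading slash: only the last segment of the base path is replaced.
    assert_eq!(
        "https://telemetry.example.com/some/api/v3",
        base.join("api/v3").unwrap().as_str()
    );
}
```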