diff --git a/oximeter/collector/src/lib.rs b/oximeter/collector/src/lib.rs index 6674d65ecd..b7a14cec45 100644 --- a/oximeter/collector/src/lib.rs +++ b/oximeter/collector/src/lib.rs @@ -35,6 +35,7 @@ use omicron_common::backoff; use omicron_common::FileKv; use oximeter::types::ProducerResults; use oximeter::types::ProducerResultsItem; +use oximeter_db::model::OXIMETER_VERSION; use oximeter_db::Client; use oximeter_db::DbWrite; use serde::Deserialize; @@ -455,11 +456,7 @@ impl OximeterAgent { }; let client = Client::new(db_address, &log); let replicated = client.is_oximeter_cluster().await?; - if !replicated { - client.init_single_node_db().await?; - } else { - client.init_replicated_db().await?; - } + client.initialize_db_with_version(replicated, OXIMETER_VERSION).await?; // Spawn the task for aggregating and inserting all metrics tokio::spawn(async move { diff --git a/oximeter/db/src/client.rs b/oximeter/db/src/client.rs index 8629e4b8ef..ffa5d97d52 100644 --- a/oximeter/db/src/client.rs +++ b/oximeter/db/src/client.rs @@ -25,7 +25,9 @@ use dropshot::WhichPage; use oximeter::types::Sample; use slog::debug; use slog::error; +use slog::info; use slog::trace; +use slog::warn; use slog::Logger; use std::collections::btree_map::Entry; use std::collections::BTreeMap; @@ -269,7 +271,103 @@ impl Client { .map_err(|e| Error::Database(e.to_string())) } - // Verifies if instance is part of oximeter_cluster + /// Validates that the schema used by the DB matches the version used by + /// the executable using it. + /// + /// This function will wipe metrics data if the version stored within + /// the DB is less than the schema version of Oximeter. + /// If the version in the DB is newer than what is known to Oximeter, an + /// error is returned. + /// + /// NOTE: This function is not safe for concurrent usage! + pub async fn initialize_db_with_version( + &self, + replicated: bool, + expected_version: u64, + ) -> Result<(), Error> { + info!(self.log, "reading db version"); + + // Read the version from the DB + let version = self.read_latest_version().await?; + info!(self.log, "read oximeter database version"; "version" => version); + + // Decide how to conform the on-disk version with this version of + // Oximeter. + if version < expected_version { + info!(self.log, "wiping and re-initializing oximeter schema"); + // If the on-storage version is less than the constant embedded into + // this binary, the DB is out-of-date. Drop it, and re-populate it + // later. + if !replicated { + self.wipe_single_node_db().await?; + self.init_single_node_db().await?; + } else { + self.wipe_replicated_db().await?; + self.init_replicated_db().await?; + } + } else if version > expected_version { + // If the on-storage version is greater than the constant embedded + // into this binary, we may have downgraded. + return Err(Error::Database( + format!( + "Expected version {expected_version}, saw {version}. Downgrading is not supported.", + ) + )); + } else { + // If the version matches, we don't need to update the DB + return Ok(()); + } + + info!(self.log, "inserting current version"; "version" => expected_version); + self.insert_version(expected_version).await?; + Ok(()) + } + + async fn read_latest_version(&self) -> Result { + let sql = format!( + "SELECT MAX(value) FROM {db_name}.version;", + db_name = crate::DATABASE_NAME, + ); + + let version = match self.execute_with_body(sql).await { + Ok(body) if body.is_empty() => { + warn!( + self.log, + "no version in database (treated as 'version 0')" + ); + 0 + } + Ok(body) => body.trim().parse::().map_err(|err| { + Error::Database(format!("Cannot read version: {err}")) + })?, + Err(Error::Database(err)) + // Case 1: The database has not been created. + if err.contains("Database oximeter doesn't exist") || + // Case 2: The database has been created, but it's old (exists + // prior to the version table). + err.contains("Table oximeter.version doesn't exist") => + { + warn!(self.log, "oximeter database does not exist, or is out-of-date"); + 0 + } + Err(err) => { + warn!(self.log, "failed to read version"; "error" => err.to_string()); + return Err(err); + } + }; + Ok(version) + } + + async fn insert_version(&self, version: u64) -> Result<(), Error> { + let sql = format!( + "INSERT INTO {db_name}.version (*) VALUES ({version}, now());", + db_name = crate::DATABASE_NAME, + ); + self.execute_with_body(sql).await?; + Ok(()) + } + + /// Verifies if instance is part of oximeter_cluster pub async fn is_oximeter_cluster(&self) -> Result { let sql = String::from("SHOW CLUSTERS FORMAT JSONEachRow;"); let res = self.execute_with_body(sql).await?; @@ -710,7 +808,8 @@ mod tests { // on the ubuntu CI job with "Failed to detect ClickHouse subprocess within timeout" #[ignore] async fn test_build_replicated() { - let log = slog::Logger::root(slog::Discard, o!()); + let logctx = test_setup_log("test_build_replicated"); + let log = &logctx.log; // Start all Keeper coordinator nodes let cur_dir = std::env::current_dir().unwrap(); @@ -819,11 +918,14 @@ mod tests { k3.cleanup().await.expect("Failed to cleanup ClickHouse keeper 3"); db_1.cleanup().await.expect("Failed to cleanup ClickHouse server 1"); db_2.cleanup().await.expect("Failed to cleanup ClickHouse server 2"); + + logctx.cleanup_successful(); } #[tokio::test] async fn test_client_insert() { - let log = slog::Logger::root(slog::Discard, o!()); + let logctx = test_setup_log("test_client_insert"); + let log = &logctx.log; // Let the OS assign a port and discover it after ClickHouse starts let mut db = ClickHouseInstance::new_single_node(0) @@ -845,6 +947,7 @@ mod tests { }; client.insert_samples(&samples).await.unwrap(); db.cleanup().await.expect("Failed to cleanup ClickHouse server"); + logctx.cleanup_successful(); } // This is a target with the same name as that in `lib.rs` used for other tests, but with a @@ -1307,7 +1410,8 @@ mod tests { #[tokio::test] async fn test_schema_mismatch() { - let log = slog::Logger::root(slog::Discard, o!()); + let logctx = test_setup_log("test_schema_mismatch"); + let log = &logctx.log; // Let the OS assign a port and discover it after ClickHouse starts let mut db = ClickHouseInstance::new_single_node(0) @@ -1337,11 +1441,141 @@ mod tests { let result = client.verify_sample_schema(&sample).await; assert!(matches!(result, Err(Error::SchemaMismatch { .. }))); db.cleanup().await.expect("Failed to cleanup ClickHouse server"); + logctx.cleanup_successful(); + } + + // Returns the number of timeseries schemas being used. + async fn get_schema_count(client: &Client) -> usize { + client + .execute_with_body( + "SELECT * FROM oximeter.timeseries_schema FORMAT JSONEachRow;", + ) + .await + .expect("Failed to SELECT from database") + .lines() + .count() + } + + #[tokio::test] + async fn test_database_version_update_idempotent() { + let logctx = test_setup_log("test_database_version_update_idempotent"); + let log = &logctx.log; + + let mut db = ClickHouseInstance::new_single_node(0) + .await + .expect("Failed to start ClickHouse"); + let address = SocketAddr::new("::1".parse().unwrap(), db.port()); + + let replicated = false; + + // Initialize the database... + let client = Client::new(address, &log); + client + .initialize_db_with_version(replicated, model::OXIMETER_VERSION) + .await + .expect("Failed to initialize timeseries database"); + + // Insert data here so we can verify it still exists later. + // + // The values here don't matter much, we just want to check that + // the database data hasn't been dropped. + assert_eq!(0, get_schema_count(&client).await); + let sample = test_util::make_sample(); + client.insert_samples(&[sample.clone()]).await.unwrap(); + assert_eq!(1, get_schema_count(&client).await); + + // Re-initialize the database, see that our data still exists + client + .initialize_db_with_version(replicated, model::OXIMETER_VERSION) + .await + .expect("Failed to initialize timeseries database"); + + assert_eq!(1, get_schema_count(&client).await); + + db.cleanup().await.expect("Failed to cleanup ClickHouse server"); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_database_version_will_not_downgrade() { + let logctx = test_setup_log("test_database_version_will_not_downgrade"); + let log = &logctx.log; + + let mut db = ClickHouseInstance::new_single_node(0) + .await + .expect("Failed to start ClickHouse"); + let address = SocketAddr::new("::1".parse().unwrap(), db.port()); + + let replicated = false; + + // Initialize the database + let client = Client::new(address, &log); + client + .initialize_db_with_version(replicated, model::OXIMETER_VERSION) + .await + .expect("Failed to initialize timeseries database"); + + // Bump the version of the database to a "too new" version + client + .insert_version(model::OXIMETER_VERSION + 1) + .await + .expect("Failed to insert very new DB version"); + + // Expect a failure re-initializing the client. + // + // This will attempt to initialize the client with "version = + // model::OXIMETER_VERSION", which is "too old". + client + .initialize_db_with_version(replicated, model::OXIMETER_VERSION) + .await + .expect_err("Should have failed, downgrades are not supported"); + + db.cleanup().await.expect("Failed to cleanup ClickHouse server"); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_database_version_wipes_old_version() { + let logctx = test_setup_log("test_database_version_wipes_old_version"); + let log = &logctx.log; + let mut db = ClickHouseInstance::new_single_node(0) + .await + .expect("Failed to start ClickHouse"); + let address = SocketAddr::new("::1".parse().unwrap(), db.port()); + + let replicated = false; + + // Initialize the Client + let client = Client::new(address, &log); + client + .initialize_db_with_version(replicated, model::OXIMETER_VERSION) + .await + .expect("Failed to initialize timeseries database"); + + // Insert data here so we can remove it later. + // + // The values here don't matter much, we just want to check that + // the database data gets dropped later. + assert_eq!(0, get_schema_count(&client).await); + let sample = test_util::make_sample(); + client.insert_samples(&[sample.clone()]).await.unwrap(); + assert_eq!(1, get_schema_count(&client).await); + + // If we try to upgrade to a newer version, we'll drop old data. + client + .initialize_db_with_version(replicated, model::OXIMETER_VERSION + 1) + .await + .expect("Should have initialized database successfully"); + assert_eq!(0, get_schema_count(&client).await); + + db.cleanup().await.expect("Failed to cleanup ClickHouse server"); + logctx.cleanup_successful(); } #[tokio::test] async fn test_schema_update() { - let log = slog::Logger::root(slog::Discard, o!()); + let logctx = test_setup_log("test_schema_update"); + let log = &logctx.log; // Let the OS assign a port and discover it after ClickHouse starts let mut db = ClickHouseInstance::new_single_node(0) @@ -1415,6 +1649,7 @@ mod tests { assert_eq!(expected_schema, schema[0]); db.cleanup().await.expect("Failed to cleanup ClickHouse server"); + logctx.cleanup_successful(); } async fn setup_filter_testcase() -> (ClickHouseInstance, Client, Vec) @@ -1589,12 +1824,14 @@ mod tests { #[tokio::test] async fn test_bad_database_connection() { - let log = slog::Logger::root(slog::Discard, o!()); + let logctx = test_setup_log("test_bad_database_connection"); + let log = &logctx.log; let client = Client::new("127.0.0.1:443".parse().unwrap(), &log); assert!(matches!( client.ping().await, Err(Error::DatabaseUnavailable(_)) )); + logctx.cleanup_successful(); } #[tokio::test] @@ -1617,7 +1854,8 @@ mod tests { datum: i64, } - let log = Logger::root(slog::Discard, o!()); + let logctx = test_setup_log("test_differentiate_by_timeseries_name"); + let log = &logctx.log; // Let the OS assign a port and discover it after ClickHouse starts let db = ClickHouseInstance::new_single_node(0) @@ -1665,6 +1903,7 @@ mod tests { ); assert_eq!(timeseries.target.name, "my_target"); assert_eq!(timeseries.metric.name, "second_metric"); + logctx.cleanup_successful(); } #[derive(Debug, Clone, oximeter::Target)] @@ -1980,7 +2219,8 @@ mod tests { .await .expect("Failed to start ClickHouse"); let address = SocketAddr::new("::1".parse().unwrap(), db.port()); - let log = Logger::root(slog::Discard, o!()); + let logctx = test_setup_log("test_select_timeseries_with_start_time"); + let log = &logctx.log; let client = Client::new(address, &log); client .init_single_node_db() @@ -2015,6 +2255,7 @@ mod tests { } } db.cleanup().await.expect("Failed to cleanup database"); + logctx.cleanup_successful(); } #[tokio::test] @@ -2024,7 +2265,8 @@ mod tests { .await .expect("Failed to start ClickHouse"); let address = SocketAddr::new("::1".parse().unwrap(), db.port()); - let log = Logger::root(slog::Discard, o!()); + let logctx = test_setup_log("test_select_timeseries_with_limit"); + let log = &logctx.log; let client = Client::new(address, &log); client .init_single_node_db() @@ -2133,6 +2375,7 @@ mod tests { ); db.cleanup().await.expect("Failed to cleanup database"); + logctx.cleanup_successful(); } #[tokio::test] @@ -2142,7 +2385,8 @@ mod tests { .await .expect("Failed to start ClickHouse"); let address = SocketAddr::new("::1".parse().unwrap(), db.port()); - let log = Logger::root(slog::Discard, o!()); + let logctx = test_setup_log("test_select_timeseries_with_order"); + let log = &logctx.log; let client = Client::new(address, &log); client .init_single_node_db() @@ -2234,6 +2478,7 @@ mod tests { ); db.cleanup().await.expect("Failed to cleanup database"); + logctx.cleanup_successful(); } #[tokio::test] diff --git a/oximeter/db/src/db-replicated-init.sql b/oximeter/db/src/db-replicated-init.sql index 7b92d967af..7b756f4b0d 100644 --- a/oximeter/db/src/db-replicated-init.sql +++ b/oximeter/db/src/db-replicated-init.sql @@ -1,5 +1,13 @@ CREATE DATABASE IF NOT EXISTS oximeter ON CLUSTER oximeter_cluster; -- +CREATE TABLE IF NOT EXISTS oximeter.version ON CLUSTER oximeter_cluster +( + value UInt64, + timestamp DateTime64(9, 'UTC') +) +ENGINE = ReplicatedMergeTree() +ORDER BY (value, timestamp); +-- CREATE TABLE IF NOT EXISTS oximeter.measurements_bool_local ON CLUSTER oximeter_cluster ( timeseries_name String, diff --git a/oximeter/db/src/db-single-node-init.sql b/oximeter/db/src/db-single-node-init.sql index 5f805f5725..1f648fd5d5 100644 --- a/oximeter/db/src/db-single-node-init.sql +++ b/oximeter/db/src/db-single-node-init.sql @@ -1,5 +1,13 @@ CREATE DATABASE IF NOT EXISTS oximeter; -- +CREATE TABLE IF NOT EXISTS oximeter.version +( + value UInt64, + timestamp DateTime64(9, 'UTC') +) +ENGINE = MergeTree() +ORDER BY (value, timestamp); +-- CREATE TABLE IF NOT EXISTS oximeter.measurements_bool ( timeseries_name String, diff --git a/oximeter/db/src/model.rs b/oximeter/db/src/model.rs index 1b3b75320f..1314c5c649 100644 --- a/oximeter/db/src/model.rs +++ b/oximeter/db/src/model.rs @@ -35,6 +35,14 @@ use std::net::IpAddr; use std::net::Ipv6Addr; use uuid::Uuid; +/// Describes the version of the Oximeter database. +/// +/// See: [crate::Client::initialize_db_with_version] for usage. +/// +/// TODO(#4271): The current implementation of versioning will wipe the metrics +/// database if this number is incremented. +pub const OXIMETER_VERSION: u64 = 1; + // Wrapper type to represent a boolean in the database. // // ClickHouse's type system lacks a boolean, and using `u8` to represent them. This a safe wrapper