From accb31315f6a08bf3ddc4d713bff4938f45eb45c Mon Sep 17 00:00:00 2001 From: Benjamin Naecker Date: Wed, 10 Jul 2024 21:09:23 +0000 Subject: [PATCH 1/2] Support deleting timeseries by name during schema upgrades This adds support for listing timeseries by name in a schema upgrade directory, and deleting all records (schema and data) from those timeseries during an offline ClickHouse database upgrade. The main goal here is a relatively simple but effective mechanism to clean up abandoned timeseries, while we figure out how to implement breaking changes more robustly. We alreay have examples of these abandonded timeseries in some existing installations. The existing effort to move timeseries to TOML also presents an opportunity to make one-time breaking changes for individual timeseries. Both of these can be supported with this mechanism. Fixes #5266 --- oximeter/db/src/client/mod.rs | 455 ++++++++++++++++++++++++++++++++++ oximeter/db/src/lib.rs | 20 ++ 2 files changed, 475 insertions(+) diff --git a/oximeter/db/src/client/mod.rs b/oximeter/db/src/client/mod.rs index 0c372cedae..8650dc1c76 100644 --- a/oximeter/db/src/client/mod.rs +++ b/oximeter/db/src/client/mod.rs @@ -42,6 +42,7 @@ use std::collections::btree_map::Entry; use std::collections::BTreeMap; use std::collections::BTreeSet; use std::convert::TryFrom; +use std::io::ErrorKind; use std::net::SocketAddr; use std::num::NonZeroU32; use std::ops::Bound; @@ -489,6 +490,29 @@ impl Client { } } } + + // Check if we have a list of timeseries that should be deleted, and + // remove them from the history books. + let to_delete = Self::read_timeseries_to_delete( + replicated, + next_version, + schema_dir, + ) + .await?; + if to_delete.is_empty() { + debug!( + self.log, + "schema upgrade contained timeseries list file, \ + but it did not contain any timeseries names", + ); + } else { + debug!( + self.log, + "schema upgrade includes list of timeseries to be deleted"; + "n_timeseries" => to_delete.len(), + ); + self.expunge_timeseries_by_name(replicated, &to_delete).await?; + } Ok(()) } @@ -960,6 +984,125 @@ impl Client { } Ok(()) } + + /// Given a list of timeseries by name, delete their schema and any + /// associated data records from all tables. + async fn expunge_timeseries_by_name( + &self, + replicated: bool, + to_delete: &[TimeseriesName], + ) -> Result<(), Error> { + // The version table should not have any matching data, but let's avoid + // it entirely anyway. + let tables = self + .list_oximeter_database_tables(ListDetails { + include_version: false, + replicated, + }) + .await?; + + const DELETE_BATCH_SIZE: usize = 1000; + let maybe_on_cluster = if replicated { + format!("ON CLUSTER {}", crate::CLUSTER_NAME) + } else { + String::new() + }; + for chunk in to_delete.chunks(DELETE_BATCH_SIZE) { + let names = chunk + .iter() + .map(|name| format!("'{name}'")) + .collect::>() + .join(","); + debug!( + self.log, + "deleting chunk of timeseries"; + "timeseries_names" => &names, + "n_timeseries" => chunk.len(), + ); + for table in tables.iter() { + let sql = format!( + "ALTER TABLE {}.{} \ + {} \ + DELETE WHERE timeseries_name in ({})", + crate::DATABASE_NAME, + table, + maybe_on_cluster, + names, + ); + debug!( + self.log, + "deleting timeseries from next table"; + "table_name" => table, + "n_timeseries" => chunk.len(), + ); + self.execute(sql).await?; + } + } + Ok(()) + } + + async fn read_timeseries_to_delete( + replicated: bool, + next_version: u64, + schema_dir: &Path, + ) -> Result, Error> { + let version_schema_dir = + Self::full_upgrade_path(replicated, next_version, schema_dir); + let filename = + version_schema_dir.join(crate::TIMESERIES_TO_DELETE_FILE); + match fs::read_to_string(&filename).await { + Ok(contents) => contents + .lines() + .map(|line| line.trim().parse().map_err(Error::from)) + .collect(), + Err(e) if e.kind() == ErrorKind::NotFound => Ok(vec![]), + Err(err) => Err(Error::ReadTimeseriesToDelete { err }), + } + } + + /// List tables in the oximeter database. + async fn list_oximeter_database_tables( + &self, + ListDetails { include_version, replicated }: ListDetails, + ) -> Result, Error> { + let mut sql = format!( + "SELECT name FROM system.tables WHERE database = '{}'", + crate::DATABASE_NAME, + ); + if !include_version { + sql.push_str(" AND name != '"); + sql.push_str(crate::VERSION_TABLE_NAME); + sql.push('\''); + } + // On a cluster, we need to operate on the "local" replicated tables. + if replicated { + sql.push_str(" AND engine = 'ReplicatedMergeTree'"); + } + self.execute_with_body(sql).await.map(|(_summary, body)| { + body.lines().map(ToString::to_string).collect() + }) + } +} + +/// Helper argument to `Client::list_oximeter_database_tables`. +#[derive(Clone, Copy, Debug, PartialEq)] +struct ListDetails { + /// If true, include the version table in the output. + include_version: bool, + /// If true, list tables to operate on in a replicated cluster configuration. + /// + /// NOTE: We would like to always operate on the "top-level table", e.g. + /// `oximeter.measurements_u64`, regardless of whether we're working on the + /// cluster or a single-node setup. Otherwise, we need to know which cluster + /// we're working with, and then query either `measurements_u64` or + /// `measurements_u64_local` based on that. + /// + /// However, while that works for the local tables (even replicated ones), + /// it does _not_ work for the `Distributed` tables that we use as those + /// "top-level tables" in a cluster setup. That table engine does not + /// support mutations. Instead, we need to run those operations on the + /// `*_local` tables. + replicated: bool, } // A regex used to validate supported schema updates. @@ -4422,4 +4565,316 @@ mod tests { }) .collect() } + + // Helper to write a test file containing timeseries to delete. + async fn write_timeseries_to_delete_file( + schema_dir: &Path, + replicated: bool, + version: u64, + names: &[TimeseriesName], + ) { + let subdir = schema_dir + .join(if replicated { "replicated" } else { "single-node" }) + .join(version.to_string()); + tokio::fs::create_dir_all(&subdir) + .await + .expect("failed to make subdirectories"); + let filename = subdir.join(crate::TIMESERIES_TO_DELETE_FILE); + let contents = names + .iter() + .map(ToString::to_string) + .collect::>() + .join("\n"); + tokio::fs::write(&filename, contents) + .await + .expect("failed to write test timeseries to delete file"); + } + + #[tokio::test] + async fn test_read_timeseries_to_delete() { + let names: Vec = + vec!["a:b".parse().unwrap(), "c:d".parse().unwrap()]; + let schema_dir = + tempfile::TempDir::new().expect("failed to make temp dir"); + const VERSION: u64 = 7; + write_timeseries_to_delete_file( + schema_dir.path(), + false, + VERSION, + &names, + ) + .await; + let read = Client::read_timeseries_to_delete( + false, + VERSION, + schema_dir.path(), + ) + .await + .expect("Failed to read timeseries to delete"); + assert_eq!(names, read, "Read incorrect list of timeseries to delete",); + } + + #[tokio::test] + async fn test_read_timeseries_to_delete_empty_file_is_ok() { + let schema_dir = + tempfile::TempDir::new().expect("failed to make temp dir"); + const VERSION: u64 = 7; + write_timeseries_to_delete_file(schema_dir.path(), false, VERSION, &[]) + .await; + let read = Client::read_timeseries_to_delete( + false, + VERSION, + schema_dir.path(), + ) + .await + .expect("Failed to read timeseries to delete"); + assert!(read.is_empty(), "Read incorrect list of timeseries to delete",); + } + + #[tokio::test] + async fn test_read_timeseries_to_delete_nonexistent_file_is_ok() { + let path = PathBuf::from("/this/file/better/not/exist"); + let read = Client::read_timeseries_to_delete(false, 1000000, &path) + .await + .expect("Failed to read timeseries to delete"); + assert!(read.is_empty(), "Read incorrect list of timeseries to delete",); + } + + #[tokio::test] + async fn test_expunge_timeseries_by_name_single_node() { + const TEST_NAME: &str = "test_expunge_timeseries_by_name_single_node"; + let logctx = test_setup_log(TEST_NAME); + let log = &logctx.log; + let mut db = ClickHouseInstance::new_single_node(&logctx, 0) + .await + .expect("Failed to start ClickHouse"); + let address = SocketAddr::new(Ipv6Addr::LOCALHOST.into(), db.port()); + test_expunge_timeseries_by_name_impl(log, address, false).await; + db.cleanup().await.expect("Failed to cleanup ClickHouse server"); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_expunge_timeseries_by_name_replicated() { + const TEST_NAME: &str = "test_expunge_timeseries_by_name_replicated"; + let logctx = test_setup_log(TEST_NAME); + let mut cluster = create_cluster(&logctx).await; + let address = cluster.replica_1.address; + test_expunge_timeseries_by_name_impl(&logctx.log, address, true).await; + + // TODO-cleanup: These should be arrays. + // See https://github.com/oxidecomputer/omicron/issues/4460. + cluster + .keeper_1 + .cleanup() + .await + .expect("Failed to cleanup ClickHouse keeper 1"); + cluster + .keeper_2 + .cleanup() + .await + .expect("Failed to cleanup ClickHouse keeper 2"); + cluster + .keeper_3 + .cleanup() + .await + .expect("Failed to cleanup ClickHouse keeper 3"); + cluster + .replica_1 + .cleanup() + .await + .expect("Failed to cleanup ClickHouse server 1"); + cluster + .replica_2 + .cleanup() + .await + .expect("Failed to cleanup ClickHouse server 2"); + logctx.cleanup_successful(); + } + + // Implementation of the test for expunging timeseries by name during an + // upgrade. + async fn test_expunge_timeseries_by_name_impl( + log: &Logger, + address: SocketAddr, + replicated: bool, + ) { + usdt::register_probes().unwrap(); + let client = Client::new(address, &log); + + const STARTING_VERSION: u64 = 1; + const NEXT_VERSION: u64 = 2; + const VERSIONS: [u64; 2] = [STARTING_VERSION, NEXT_VERSION]; + + // We need to actually have the oximeter DB here, and the version table, + // since `ensure_schema()` writes out versions to the DB as they're + // applied. + client + .initialize_db_with_version(replicated, STARTING_VERSION) + .await + .expect("failed to initialize test DB"); + + // Let's insert a few samples from two different timeseries. The + // timeseries share some field types and have others that are distinct + // between them, so that we can test that we don't touch tables we + // shouldn't, and only delete the parts we should. + let samples = generate_expunge_timeseries_samples(); + client + .insert_samples(&samples) + .await + .expect("failed to insert test samples"); + let all_timeseries: BTreeSet = samples + .iter() + .map(|s| s.timeseries_name.parse().unwrap()) + .collect(); + assert_eq!(all_timeseries.len(), 2); + + // Count the number of records in all tables, by timeseries. + let mut records_by_timeseries: BTreeMap<_, Vec<_>> = BTreeMap::new(); + let all_tables = client + .list_oximeter_database_tables(ListDetails { + include_version: false, + replicated, + }) + .await + .unwrap(); + for table in all_tables.iter() { + let sql = format!( + "SELECT * FROM {}.{} FORMAT JSONEachRow", + crate::DATABASE_NAME, + table, + ); + let body = client.execute_with_body(sql).await.unwrap().1; + for line in body.lines() { + let json: serde_json::Value = + serde_json::from_str(line.trim()).unwrap(); + let name = json["timeseries_name"].to_string(); + records_by_timeseries.entry(name).or_default().push(json); + } + } + + // Even though we don't need SQL, we need the directory for the first + // version too. + let (schema_dir, _version_dirs) = + create_test_upgrade_schema_directory(replicated, &VERSIONS).await; + + // We don't actually need any SQL files in the version we're upgrading + // to. The function `ensure_schema` will apply any SQL and any + // timeseries to be deleted independently. We're just testing the + // latter. + let to_delete = vec![all_timeseries.first().unwrap().clone()]; + write_timeseries_to_delete_file( + schema_dir.path(), + replicated, + NEXT_VERSION, + &to_delete, + ) + .await; + + // Let's run the "schema upgrade", which should only delete these + // particular timeseries. + client + .ensure_schema(replicated, NEXT_VERSION, schema_dir.path()) + .await + .unwrap(); + + // Look over all tables. + // + // First, we should have zero mentions of the timeseries we've deleted. + for table in all_tables.iter() { + let sql = format!( + "SELECT COUNT() \ + FROM {}.{} \ + WHERE timeseries_name = '{}' + FORMAT CSV", + crate::DATABASE_NAME, + table, + &to_delete[0].to_string(), + ); + let count: u64 = client + .execute_with_body(sql) + .await + .expect("failed to get count of timeseries") + .1 + .trim() + .parse() + .expect("invalid record count from query"); + assert_eq!( + count, 0, + "Should not have any rows associated with the deleted \ + but found {count} records in table {table}", + ); + } + + // We should also still have all the records from the timeseries that we + // did _not_ expunge. + let mut found: BTreeMap<_, Vec<_>> = BTreeMap::new(); + for table in all_tables.iter() { + let sql = format!( + "SELECT * FROM {}.{} FORMAT JSONEachRow", + crate::DATABASE_NAME, + table, + ); + let body = client.execute_with_body(sql).await.unwrap().1; + for line in body.lines() { + let json: serde_json::Value = + serde_json::from_str(line.trim()).unwrap(); + let name = json["timeseries_name"].to_string(); + found.entry(name).or_default().push(json); + } + } + + // Check that all records we found exist in the previous set of found + // records, and that they are identical. + for (name, records) in found.iter() { + let existing_records = records_by_timeseries + .get(name) + .expect("expected to find previous records for timeseries"); + assert_eq!( + records, existing_records, + "Some records from timeseries {name} were removed, \ + but should not have been" + ); + } + } + + fn generate_expunge_timeseries_samples() -> Vec { + #[derive(oximeter::Target)] + struct FirstTarget { + first_field: String, + second_field: Uuid, + } + + #[derive(oximeter::Target)] + struct SecondTarget { + first_field: String, + second_field: bool, + } + + #[derive(oximeter::Metric)] + struct SharedMetric { + datum: u64, + } + + let ft = FirstTarget { + first_field: String::from("foo"), + second_field: Uuid::new_v4(), + }; + let st = SecondTarget { + first_field: String::from("foo"), + second_field: false, + }; + let mut m = SharedMetric { datum: 0 }; + + let mut out = Vec::with_capacity(8); + for i in 0..4 { + m.datum = i; + out.push(Sample::new(&ft, &m).unwrap()); + } + for i in 4..8 { + m.datum = i; + out.push(Sample::new(&st, &m).unwrap()); + } + out + } } diff --git a/oximeter/db/src/lib.rs b/oximeter/db/src/lib.rs index bbf29653e9..be71c7ecb9 100644 --- a/oximeter/db/src/lib.rs +++ b/oximeter/db/src/lib.rs @@ -138,6 +138,12 @@ pub enum Error { #[error("Schema update versions must be sequential without gaps")] NonSequentialSchemaVersions, + #[error("Could not read timeseries_to_delete file")] + ReadTimeseriesToDelete { + #[source] + err: io::Error, + }, + #[cfg(any(feature = "sql", test))] #[error("SQL error")] Sql(#[from] sql::Error), @@ -296,6 +302,20 @@ const DATABASE_TIMESTAMP_FORMAT: &str = "%Y-%m-%d %H:%M:%S%.9f"; // The name of the database storing all metric information. const DATABASE_NAME: &str = "oximeter"; +// The name of the oximeter cluster, in the case of a replicated database. +// +// This must match what is used in the replicated SQL files when created the +// database itself, and the XML files describing the cluster. +const CLUSTER_NAME: &str = "oximeter_cluster"; + +// The name of the table storing database version information. +const VERSION_TABLE_NAME: &str = "version"; + +// During schema upgrades, it is possible to list timeseries that should be +// deleted, rather than deleting the entire database. These must be listed one +// per line, in the file inside the schema version directory with this name. +const TIMESERIES_TO_DELETE_FILE: &str = "timeseries-to-delete.txt"; + // The output format used for the result of select queries // // See https://clickhouse.com/docs/en/interfaces/formats/#jsoneachrow for details. From c9bd27c108e87ddb496f552f1a7570a9f47a2c54 Mon Sep 17 00:00:00 2001 From: Benjamin Naecker Date: Thu, 11 Jul 2024 15:51:52 +0000 Subject: [PATCH 2/2] Review feedback - Rename error variant for clarity - Comment on deletion batch size --- oximeter/db/src/client/mod.rs | 5 ++++- oximeter/db/src/lib.rs | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/oximeter/db/src/client/mod.rs b/oximeter/db/src/client/mod.rs index 8650dc1c76..364c1ec46c 100644 --- a/oximeter/db/src/client/mod.rs +++ b/oximeter/db/src/client/mod.rs @@ -1001,6 +1001,9 @@ impl Client { }) .await?; + // This size is arbitrary, and just something to avoid enormous requests + // to ClickHouse. It's unlikely that we'll hit this in practice anyway, + // given that we have far fewer than 1000 timeseries today. const DELETE_BATCH_SIZE: usize = 1000; let maybe_on_cluster = if replicated { format!("ON CLUSTER {}", crate::CLUSTER_NAME) @@ -1056,7 +1059,7 @@ impl Client { .map(|line| line.trim().parse().map_err(Error::from)) .collect(), Err(e) if e.kind() == ErrorKind::NotFound => Ok(vec![]), - Err(err) => Err(Error::ReadTimeseriesToDelete { err }), + Err(err) => Err(Error::ReadTimeseriesToDeleteFile { err }), } } diff --git a/oximeter/db/src/lib.rs b/oximeter/db/src/lib.rs index be71c7ecb9..123e4f15d4 100644 --- a/oximeter/db/src/lib.rs +++ b/oximeter/db/src/lib.rs @@ -139,7 +139,7 @@ pub enum Error { NonSequentialSchemaVersions, #[error("Could not read timeseries_to_delete file")] - ReadTimeseriesToDelete { + ReadTimeseriesToDeleteFile { #[source] err: io::Error, },