diff --git a/oximeter/db/src/client/mod.rs b/oximeter/db/src/client/mod.rs index 9a2b7b1bd3..0db71d195a 100644 --- a/oximeter/db/src/client/mod.rs +++ b/oximeter/db/src/client/mod.rs @@ -4185,4 +4185,242 @@ mod tests { db.cleanup().await.unwrap(); logctx.cleanup_successful(); } + + // The schema directory, used in tests. The actual updater uses the + // zone-image files copied in during construction. + const SCHEMA_DIR: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/schema"); + + #[tokio::test] + async fn check_actual_schema_upgrades_are_valid_single_node() { + check_actual_schema_upgrades_are_valid_impl(false).await; + } + + #[tokio::test] + async fn check_actual_schema_upgrades_are_valid_replicated() { + check_actual_schema_upgrades_are_valid_impl(true).await; + } + + // NOTE: This does not actually run the upgrades, only checks them for + // validity. + async fn check_actual_schema_upgrades_are_valid_impl(replicated: bool) { + let name = format!( + "check_actual_schema_upgrades_are_valid_{}", + if replicated { "replicated" } else { "single_node" } + ); + let logctx = test_setup_log(&name); + let log = &logctx.log; + + // We really started tracking the database version in 2. However, that + // set of files is not valid by construction, since we were just + // creating the full database from scratch as an "upgrade". So we'll + // start by applying version 3, and then do all later ones. + const FIRST_VERSION: u64 = 3; + for version in FIRST_VERSION..=OXIMETER_VERSION { + let upgrade_file_contents = Client::read_schema_upgrade_sql_files( + log, false, version, SCHEMA_DIR, + ) + .await + .expect("failed to read schema upgrade files"); + + if let Err(e) = + Client::verify_schema_upgrades(&upgrade_file_contents) + { + panic!( + "Schema update files for version {version} \ + are not valid: {e:?}" + ); + } + } + logctx.cleanup_successful(); + } + + // Either a cluster or single node. + // + // How does this not already exist... + enum ClickHouse { + Cluster(ClickHouseCluster), + Single(ClickHouseInstance), + } + + impl ClickHouse { + fn port(&self) -> u16 { + match self { + ClickHouse::Cluster(cluster) => cluster.replica_1.port(), + ClickHouse::Single(node) => node.port(), + } + } + + async fn cleanup(mut self) { + match &mut self { + ClickHouse::Cluster(cluster) => { + cluster + .keeper_1 + .cleanup() + .await + .expect("Failed to cleanup ClickHouse keeper 1"); + cluster + .keeper_2 + .cleanup() + .await + .expect("Failed to cleanup ClickHouse keeper 2"); + cluster + .keeper_3 + .cleanup() + .await + .expect("Failed to cleanup ClickHouse keeper 3"); + cluster + .replica_1 + .cleanup() + .await + .expect("Failed to cleanup ClickHouse server 1"); + cluster + .replica_2 + .cleanup() + .await + .expect("Failed to cleanup ClickHouse server 2"); + } + ClickHouse::Single(node) => node + .cleanup() + .await + .expect("Failed to cleanup single node ClickHouse"), + } + } + } + + #[tokio::test] + async fn check_db_init_is_sum_of_all_up_single_node() { + check_db_init_is_sum_of_all_up_impl(false).await; + } + + #[tokio::test] + async fn check_db_init_is_sum_of_all_up_replicated() { + check_db_init_is_sum_of_all_up_impl(true).await; + } + + // Check that the set of tables we arrive at through upgrades equals those + // we get by creating the latest version directly. + async fn check_db_init_is_sum_of_all_up_impl(replicated: bool) { + let name = format!( + "check_db_init_is_sum_of_all_up_{}", + if replicated { "replicated" } else { "single_node" } + ); + let logctx = test_setup_log(&name); + let log = &logctx.log; + let db = if replicated { + ClickHouse::Cluster(create_cluster(&logctx).await) + } else { + ClickHouse::Single( + ClickHouseInstance::new_single_node(&logctx, 0) + .await + .expect("Failed to start ClickHouse"), + ) + }; + let address = SocketAddr::new(Ipv6Addr::LOCALHOST.into(), db.port()); + let client = Client::new(address, &log); + + // Let's start with version 2, which is the first tracked and contains + // the full SQL files we need to populate the DB. + client + .initialize_db_with_version(replicated, 2) + .await + .expect("Failed to initialize timeseries database"); + + // Now let's apply all the SQL updates from here to the latest. + for version in 3..=OXIMETER_VERSION { + client + .ensure_schema(replicated, version, SCHEMA_DIR) + .await + .expect("Failed to ensure schema"); + } + + // Fetch all the tables as a JSON blob. + let tables_through_upgrades = + fetch_oximeter_table_details(&client).await; + + // We'll completely re-init the DB with the real version now. + if replicated { + client.wipe_replicated_db().await.unwrap() + } else { + client.wipe_single_node_db().await.unwrap() + } + client + .initialize_db_with_version(replicated, OXIMETER_VERSION) + .await + .expect("Failed to initialize timeseries database"); + + // Fetch the tables again and compare. + let tables = fetch_oximeter_table_details(&client).await; + + // This is an annoying comparison. Since the tables are quite + // complicated, we want to really be careful about what errors we show. + // Iterate through all the expected tables (from the direct creation), + // and check each expected field matches. Then we also check that the + // tables from the upgrade path don't have anything else in them. + for (name, json) in tables.iter() { + let upgrade_table = + tables_through_upgrades.get(name).unwrap_or_else(|| { + panic!("The tables via upgrade are missing table '{name}'") + }); + for (key, value) in json.iter() { + let other_value = upgrade_table.get(key).unwrap_or_else(|| { + panic!("Upgrade table is missing key '{key}'") + }); + assert_eq!( + value, + other_value, + "{} database table {name} disagree on the value \ + of the column {key} between the direct table creation \ + and the upgrade path.\nDirect:\n\n{value} \ + \n\nUpgrade:\n\n{other_value}", + if replicated { "Replicated" } else { "Single-node" }, + ); + } + } + + // Check there are zero keys in the upgrade path that don't appear in + // the direct path. + let extra_keys: Vec<_> = tables_through_upgrades + .keys() + .filter(|k| !tables.contains_key(k.as_str())) + .cloned() + .collect(); + assert!( + extra_keys.is_empty(), + "The oximeter database contains tables in the upgrade path \ + that are not in the direct path: {extra_keys:?}" + ); + + db.cleanup().await; + logctx.cleanup_successful(); + } + + // Read the relevant table details from the `oximeter` database, and return + // it keyed on the table name. + async fn fetch_oximeter_table_details( + client: &Client, + ) -> BTreeMap> { + let out = client + .execute_with_body( + "SELECT \ + name, + engine_full, + create_table_query, + sorting_key, + primary_key + FROM system.tables \ + WHERE database = 'oximeter'\ + FORMAT JSONEachRow;", + ) + .await + .unwrap() + .1; + out.lines() + .map(|line| { + let json: serde_json::Map = + serde_json::from_str(&line).unwrap(); + let name = json.get("name").unwrap().to_string(); + (name, json) + }) + .collect() + } }