From e4522e52a720cc4721600a03cf924c3bf043a883 Mon Sep 17 00:00:00 2001 From: Sean Klein Date: Tue, 9 Jan 2024 14:35:59 -0800 Subject: [PATCH] [schema][test] Add a data migration validation test (#4783) Adds a schema test with "before" / "after" hooks, and adds an example specifically for the "23.0.0" migration. My intent is that this can be used for any other schema migrations that would like to execute arbitrary SQL checks against the new schema too. Fixes https://github.com/oxidecomputer/omicron/issues/4747 --- nexus/tests/integration_tests/schema.rs | 280 ++++++++++++++++++++++-- 1 file changed, 265 insertions(+), 15 deletions(-) diff --git a/nexus/tests/integration_tests/schema.rs b/nexus/tests/integration_tests/schema.rs index 6feafe415d..21ed99e010 100644 --- a/nexus/tests/integration_tests/schema.rs +++ b/nexus/tests/integration_tests/schema.rs @@ -5,6 +5,7 @@ use camino::Utf8PathBuf; use chrono::{DateTime, Utc}; use dropshot::test_util::LogContext; +use futures::future::BoxFuture; use nexus_db_model::schema::SCHEMA_VERSION as LATEST_SCHEMA_VERSION; use nexus_db_queries::db::datastore::{ all_sql_for_version_migration, EARLIEST_SUPPORTED_VERSION, @@ -14,7 +15,7 @@ use omicron_common::api::external::SemverVersion; use omicron_common::api::internal::shared::SwitchLocation; use omicron_common::nexus_config::Config; use omicron_common::nexus_config::SchemaConfig; -use omicron_test_utils::dev::db::CockroachInstance; +use omicron_test_utils::dev::db::{Client, CockroachInstance}; use pretty_assertions::{assert_eq, assert_ne}; use similar_asserts; use slog::Logger; @@ -163,6 +164,18 @@ async fn query_crdb_schema_version(crdb: &CockroachInstance) -> String { version } +#[derive(PartialEq, Clone, Debug)] +struct SqlEnum { + name: String, + variant: String, +} + +impl From<(&str, &str)> for SqlEnum { + fn from((name, variant): (&str, &str)) -> Self { + Self { name: name.to_string(), variant: variant.to_string() } + } +} + // A newtype wrapper around a string, which allows us to more liberally // interpret SQL types. // @@ -170,19 +183,57 @@ async fn query_crdb_schema_version(crdb: &CockroachInstance) -> String { // the contents of the database, merely the schema and equality of contained data. #[derive(PartialEq, Clone, Debug)] enum AnySqlType { - DateTime, - String(String), Bool(bool), - Uuid(Uuid), - Int8(i64), + DateTime, + Enum(SqlEnum), Float4(f32), + Int8(i64), + Json(serde_json::value::Value), + String(String), TextArray(Vec), + Uuid(Uuid), // TODO: This isn't exhaustive, feel free to add more. // // These should only be necessary for rows where the database schema changes also choose to // populate data. } +impl From for AnySqlType { + fn from(b: bool) -> Self { + Self::Bool(b) + } +} + +impl From for AnySqlType { + fn from(value: SqlEnum) -> Self { + Self::Enum(value) + } +} + +impl From for AnySqlType { + fn from(value: f32) -> Self { + Self::Float4(value) + } +} + +impl From for AnySqlType { + fn from(value: i64) -> Self { + Self::Int8(value) + } +} + +impl From for AnySqlType { + fn from(value: String) -> Self { + Self::String(value) + } +} + +impl From for AnySqlType { + fn from(value: Uuid) -> Self { + Self::Uuid(value) + } +} + impl AnySqlType { fn as_str(&self) -> &str { match self { @@ -218,15 +269,33 @@ impl<'a> tokio_postgres::types::FromSql<'a> for AnySqlType { if f32::accepts(ty) { return Ok(AnySqlType::Float4(f32::from_sql(ty, raw)?)); } + if serde_json::value::Value::accepts(ty) { + return Ok(AnySqlType::Json(serde_json::value::Value::from_sql( + ty, raw, + )?)); + } if Vec::::accepts(ty) { return Ok(AnySqlType::TextArray(Vec::::from_sql( ty, raw, )?)); } - Err(anyhow::anyhow!( - "Cannot parse type {ty}. If you're trying to use this type in a table which is populated \ -during a schema migration, consider adding it to `AnySqlType`." - ).into()) + + use tokio_postgres::types::Kind; + match ty.kind() { + Kind::Enum(_) => { + Ok(AnySqlType::Enum(SqlEnum { + name: ty.name().to_string(), + variant: std::str::from_utf8(raw)?.to_string(), + })) + }, + _ => { + Err(anyhow::anyhow!( + "Cannot parse type {ty:?}. \ + If you're trying to use this type in a table which is populated \ + during a schema migration, consider adding it to `AnySqlType`." + ).into()) + } + } } fn accepts(_ty: &tokio_postgres::types::Type) -> bool { @@ -234,15 +303,19 @@ during a schema migration, consider adding it to `AnySqlType`." } } +// It's a little redunant to include the column name alongside each value, +// but it results in a prettier diff. #[derive(PartialEq, Debug)] -struct NamedSqlValue { - // It's a little redunant to include the column name alongside each value, - // but it results in a prettier diff. +struct ColumnValue { column: String, value: Option, } -impl NamedSqlValue { +impl ColumnValue { + fn new>(column: &str, value: V) -> Self { + Self { column: String::from(column), value: Some(value.into()) } + } + fn expect(&self, column: &str) -> Option<&AnySqlType> { assert_eq!(self.column, column); self.value.as_ref() @@ -252,7 +325,7 @@ impl NamedSqlValue { // A generic representation of a row of SQL data #[derive(PartialEq, Debug)] struct Row { - values: Vec, + values: Vec, } impl Row { @@ -278,7 +351,7 @@ fn process_rows(rows: &Vec) -> Vec { let mut row_result = Row::new(); for i in 0..row.len() { let column_name = row.columns()[i].name(); - row_result.values.push(NamedSqlValue { + row_result.values.push(ColumnValue { column: column_name.to_string(), value: row.get(i), }); @@ -849,6 +922,183 @@ async fn dbinit_equals_sum_of_all_up() { logctx.cleanup_successful(); } +type BeforeFn = for<'a> fn(client: &'a Client) -> BoxFuture<'a, ()>; +type AfterFn = for<'a> fn(client: &'a Client) -> BoxFuture<'a, ()>; + +// Describes the operations which we might take before and after +// migrations to check that they worked. +struct DataMigrationFns { + before: Option, + after: AfterFn, +} + +// "51F0" -> "Silo" +const SILO1: Uuid = Uuid::from_u128(0x111151F0_5c3d_4647_83b0_8f3515da7be1); +const SILO2: Uuid = Uuid::from_u128(0x222251F0_5c3d_4647_83b0_8f3515da7be1); + +// "6001" -> "Pool" +const POOL1: Uuid = Uuid::from_u128(0x11116001_5c3d_4647_83b0_8f3515da7be1); +const POOL2: Uuid = Uuid::from_u128(0x22226001_5c3d_4647_83b0_8f3515da7be1); +const POOL3: Uuid = Uuid::from_u128(0x33336001_5c3d_4647_83b0_8f3515da7be1); + +fn before_23_0_0(client: &Client) -> BoxFuture<'_, ()> { + Box::pin(async move { + // Create two silos + client.batch_execute(&format!("INSERT INTO silo + (id, name, description, time_created, time_modified, time_deleted, discoverable, authentication_mode, user_provision_type, mapped_fleet_roles, rcgen) VALUES + ('{SILO1}', 'silo1', '', now(), now(), NULL, false, 'local', 'jit', '{{}}', 1), + ('{SILO2}', 'silo2', '', now(), now(), NULL, false, 'local', 'jit', '{{}}', 1); + ")).await.expect("Failed to create silo"); + + // Create an IP pool for each silo, and a third "fleet pool" which has + // no corresponding silo. + client.batch_execute(&format!("INSERT INTO ip_pool + (id, name, description, time_created, time_modified, time_deleted, rcgen, silo_id, is_default) VALUES + ('{POOL1}', 'pool1', '', now(), now(), NULL, 1, '{SILO1}', true), + ('{POOL2}', 'pool2', '', now(), now(), NULL, 1, '{SILO2}', false), + ('{POOL3}', 'pool3', '', now(), now(), NULL, 1, null, true); + ")).await.expect("Failed to create IP Pool"); + }) +} + +fn after_23_0_0(client: &Client) -> BoxFuture<'_, ()> { + Box::pin(async { + // Confirm that the ip_pool_resource objects have been created + // by the migration. + let rows = client + .query("SELECT * FROM ip_pool_resource ORDER BY ip_pool_id", &[]) + .await + .expect("Failed to query ip pool resource"); + let ip_pool_resources = process_rows(&rows); + + assert_eq!(ip_pool_resources.len(), 4); + + let type_silo = SqlEnum::from(("ip_pool_resource_type", "silo")); + + // pool1, which referenced silo1 in the "ip_pool" table, has a newly + // created resource. + // + // The same relationship is true for pool2 / silo2. + assert_eq!( + ip_pool_resources[0].values, + vec![ + ColumnValue::new("ip_pool_id", POOL1), + ColumnValue::new("resource_type", type_silo.clone()), + ColumnValue::new("resource_id", SILO1), + ColumnValue::new("is_default", true), + ], + ); + assert_eq!( + ip_pool_resources[1].values, + vec![ + ColumnValue::new("ip_pool_id", POOL2), + ColumnValue::new("resource_type", type_silo.clone()), + ColumnValue::new("resource_id", SILO2), + ColumnValue::new("is_default", false), + ], + ); + + // pool3 did not previously have a corresponding silo, so now it's associated + // with both silos as a new resource in each. + // + // Additionally, silo1 already had a default pool (pool1), but silo2 did + // not have one. As a result, pool3 becomes the new default pool for silo2. + assert_eq!( + ip_pool_resources[2].values, + vec![ + ColumnValue::new("ip_pool_id", POOL3), + ColumnValue::new("resource_type", type_silo.clone()), + ColumnValue::new("resource_id", SILO1), + ColumnValue::new("is_default", false), + ], + ); + assert_eq!( + ip_pool_resources[3].values, + vec![ + ColumnValue::new("ip_pool_id", POOL3), + ColumnValue::new("resource_type", type_silo.clone()), + ColumnValue::new("resource_id", SILO2), + ColumnValue::new("is_default", true), + ], + ); + }) +} + +// Lazily initializes all migration checks. The combination of Rust function +// pointers and async makes defining a static table fairly painful, so we're +// using lazy initialization instead. +// +// Each "check" is implemented as a pair of {before, after} migration function +// pointers, called precisely around the migration under test. +fn get_migration_checks() -> BTreeMap { + let mut map = BTreeMap::new(); + + map.insert( + SemverVersion(semver::Version::parse("23.0.0").unwrap()), + DataMigrationFns { before: Some(before_23_0_0), after: after_23_0_0 }, + ); + + map +} + +// Performs all schema changes and runs version-specific assertions. +// +// HOW TO ADD A MIGRATION CHECK: +// - Add a new "map.insert" line to "get_migration_checks", with the semver of +// the version you'd like to inspect before / after. +// - Define your "before" (optional) and "after" (required) functions. These +// act on a connection to CockroachDB, and can observe and mutate arbitrary +// state. +// +// ADVICE FOR MIGRATION CHECKS: +// - Your migration check will run in the same test as all other migration +// checks, because performing schema migrations isn't that fast. If you +// perform an operation that could be disruptive to subsequent checks, I +// recommend cleaning up after yourself (e.g., DELETE relevant rows). +// - I recommend using schema checks that are NOT strongly-typed. When you +// add a migration check, it'll happen to match the "latest" static schemas +// defined by Nexus, but that won't always be the case. As the schema +// continues to change (maybe a table you're trying to check gets a new column +// in a later version), your code should continue operating on the OLD version, +// and as such, should avoid needing any updates. +#[tokio::test] +async fn validate_data_migration() { + let config = load_test_config(); + let logctx = LogContext::new("validate_data_migration", &config.pkg.log); + let log = &logctx.log; + + let populate = false; + let mut crdb = test_setup_just_crdb(&logctx.log, populate).await; + let client = crdb.connect().await.expect("Failed to access CRDB client"); + + let all_versions = read_all_schema_versions().await; + let all_checks = get_migration_checks(); + + // Go from the first version to the latest version. + for version in &all_versions { + // If this check has preconditions (or setup), run them. + let checks = all_checks.get(version); + if let Some(before) = checks.and_then(|check| check.before) { + before(&client).await; + } + + apply_update(log, &crdb, &version.to_string(), 1).await; + assert_eq!(version.to_string(), query_crdb_schema_version(&crdb).await); + + // If this check has postconditions (or cleanup), run them. + if let Some(after) = checks.map(|check| check.after) { + after(&client).await; + } + } + assert_eq!( + LATEST_SCHEMA_VERSION.to_string(), + query_crdb_schema_version(&crdb).await + ); + + crdb.cleanup().await.unwrap(); + logctx.cleanup_successful(); +} + // Returns the InformationSchema object for a database populated via `sql`. async fn get_information_schema(log: &Logger, sql: &str) -> InformationSchema { let populate = false;