[schema][test] Add a data migration validation test (#4783)
Adds a schema test with "before" / "after" hooks, and adds an example
specifically for the "23.0.0" migration.

My intent is that this can also be used by any other schema migration that
wants to execute arbitrary SQL checks against the new schema.
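
For illustration, a check pairs an optional "before" hook with a required
"after" hook, registered against a specific schema version (this mirrors
get_migration_checks in the new test):

    map.insert(
        SemverVersion(semver::Version::parse("23.0.0").unwrap()),
        DataMigrationFns { before: Some(before_23_0_0), after: after_23_0_0 },
    );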

Fixes #4747
smklein authored Jan 9, 2024
1 parent a6245c4 commit e4522e5
Showing 1 changed file with 265 additions and 15 deletions.
nexus/tests/integration_tests/schema.rs
@@ -5,6 +5,7 @@
use camino::Utf8PathBuf;
use chrono::{DateTime, Utc};
use dropshot::test_util::LogContext;
use futures::future::BoxFuture;
use nexus_db_model::schema::SCHEMA_VERSION as LATEST_SCHEMA_VERSION;
use nexus_db_queries::db::datastore::{
all_sql_for_version_migration, EARLIEST_SUPPORTED_VERSION,
@@ -14,7 +15,7 @@ use omicron_common::api::external::SemverVersion;
use omicron_common::api::internal::shared::SwitchLocation;
use omicron_common::nexus_config::Config;
use omicron_common::nexus_config::SchemaConfig;
use omicron_test_utils::dev::db::CockroachInstance;
use omicron_test_utils::dev::db::{Client, CockroachInstance};
use pretty_assertions::{assert_eq, assert_ne};
use similar_asserts;
use slog::Logger;
@@ -163,26 +164,76 @@ async fn query_crdb_schema_version(crdb: &CockroachInstance) -> String {
version
}

#[derive(PartialEq, Clone, Debug)]
struct SqlEnum {
name: String,
variant: String,
}

impl From<(&str, &str)> for SqlEnum {
fn from((name, variant): (&str, &str)) -> Self {
Self { name: name.to_string(), variant: variant.to_string() }
}
}

// A permissive representation of a SQL value, which allows us to more
// liberally interpret SQL types.
//
// Note that for the purposes of schema comparisons, we don't care about parsing
// the contents of the database, merely the schema and equality of contained data.
#[derive(PartialEq, Clone, Debug)]
enum AnySqlType {
Bool(bool),
DateTime,
Enum(SqlEnum),
Float4(f32),
Int8(i64),
Json(serde_json::value::Value),
String(String),
TextArray(Vec<String>),
Uuid(Uuid),
// TODO: This isn't exhaustive, feel free to add more.
//
// These should only be necessary for cases where a schema migration also
// chooses to populate row data.
}

impl From<bool> for AnySqlType {
fn from(b: bool) -> Self {
Self::Bool(b)
}
}

impl From<SqlEnum> for AnySqlType {
fn from(value: SqlEnum) -> Self {
Self::Enum(value)
}
}

impl From<f32> for AnySqlType {
fn from(value: f32) -> Self {
Self::Float4(value)
}
}

impl From<i64> for AnySqlType {
fn from(value: i64) -> Self {
Self::Int8(value)
}
}

impl From<String> for AnySqlType {
fn from(value: String) -> Self {
Self::String(value)
}
}

impl From<Uuid> for AnySqlType {
fn from(value: Uuid) -> Self {
Self::Uuid(value)
}
}
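
// The `From` impls above exist so that test assertions can hand plain Rust
// values directly to `ColumnValue::new` below, e.g.
// `ColumnValue::new("is_default", true)` or `ColumnValue::new("ip_pool_id", POOL1)`.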

impl AnySqlType {
fn as_str(&self) -> &str {
match self {
@@ -218,31 +269,53 @@ impl<'a> tokio_postgres::types::FromSql<'a> for AnySqlType {
if f32::accepts(ty) {
return Ok(AnySqlType::Float4(f32::from_sql(ty, raw)?));
}
if serde_json::value::Value::accepts(ty) {
return Ok(AnySqlType::Json(serde_json::value::Value::from_sql(
ty, raw,
)?));
}
if Vec::<String>::accepts(ty) {
return Ok(AnySqlType::TextArray(Vec::<String>::from_sql(
ty, raw,
)?));
}

use tokio_postgres::types::Kind;
match ty.kind() {
Kind::Enum(_) => {
Ok(AnySqlType::Enum(SqlEnum {
name: ty.name().to_string(),
variant: std::str::from_utf8(raw)?.to_string(),
}))
},
_ => {
Err(anyhow::anyhow!(
"Cannot parse type {ty:?}. \
If you're trying to use this type in a table which is populated \
during a schema migration, consider adding it to `AnySqlType`."
).into())
}
}
}

fn accepts(_ty: &tokio_postgres::types::Type) -> bool {
true
}
}

// It's a little redundant to include the column name alongside each value,
// but it results in a prettier diff.
#[derive(PartialEq, Debug)]
struct ColumnValue {
column: String,
value: Option<AnySqlType>,
}

impl ColumnValue {
fn new<V: Into<AnySqlType>>(column: &str, value: V) -> Self {
Self { column: String::from(column), value: Some(value.into()) }
}

fn expect(&self, column: &str) -> Option<&AnySqlType> {
assert_eq!(self.column, column);
self.value.as_ref()
@@ -252,7 +325,7 @@ impl NamedSqlValue {
// A generic representation of a row of SQL data
#[derive(PartialEq, Debug)]
struct Row {
values: Vec<ColumnValue>,
}

impl Row {
Expand All @@ -278,7 +351,7 @@ fn process_rows(rows: &Vec<tokio_postgres::Row>) -> Vec<Row> {
let mut row_result = Row::new();
for i in 0..row.len() {
let column_name = row.columns()[i].name();
row_result.values.push(ColumnValue {
column: column_name.to_string(),
value: row.get(i),
});
@@ -849,6 +922,183 @@ async fn dbinit_equals_sum_of_all_up() {
logctx.cleanup_successful();
}

type BeforeFn = for<'a> fn(client: &'a Client) -> BoxFuture<'a, ()>;
type AfterFn = for<'a> fn(client: &'a Client) -> BoxFuture<'a, ()>;

// Describes the operations which we might take before and after
// migrations to check that they worked.
struct DataMigrationFns {
before: Option<BeforeFn>,
after: AfterFn,
}

// "51F0" -> "Silo"
const SILO1: Uuid = Uuid::from_u128(0x111151F0_5c3d_4647_83b0_8f3515da7be1);
const SILO2: Uuid = Uuid::from_u128(0x222251F0_5c3d_4647_83b0_8f3515da7be1);

// "6001" -> "Pool"
const POOL1: Uuid = Uuid::from_u128(0x11116001_5c3d_4647_83b0_8f3515da7be1);
const POOL2: Uuid = Uuid::from_u128(0x22226001_5c3d_4647_83b0_8f3515da7be1);
const POOL3: Uuid = Uuid::from_u128(0x33336001_5c3d_4647_83b0_8f3515da7be1);

fn before_23_0_0(client: &Client) -> BoxFuture<'_, ()> {
Box::pin(async move {
// Create two silos
client.batch_execute(&format!("INSERT INTO silo
(id, name, description, time_created, time_modified, time_deleted, discoverable, authentication_mode, user_provision_type, mapped_fleet_roles, rcgen) VALUES
('{SILO1}', 'silo1', '', now(), now(), NULL, false, 'local', 'jit', '{{}}', 1),
('{SILO2}', 'silo2', '', now(), now(), NULL, false, 'local', 'jit', '{{}}', 1);
")).await.expect("Failed to create silo");

// Create an IP pool for each silo, and a third "fleet pool" which has
// no corresponding silo.
client.batch_execute(&format!("INSERT INTO ip_pool
(id, name, description, time_created, time_modified, time_deleted, rcgen, silo_id, is_default) VALUES
('{POOL1}', 'pool1', '', now(), now(), NULL, 1, '{SILO1}', true),
('{POOL2}', 'pool2', '', now(), now(), NULL, 1, '{SILO2}', false),
('{POOL3}', 'pool3', '', now(), now(), NULL, 1, null, true);
")).await.expect("Failed to create IP Pool");
})
}

fn after_23_0_0(client: &Client) -> BoxFuture<'_, ()> {
Box::pin(async {
// Confirm that the ip_pool_resource objects have been created
// by the migration.
let rows = client
.query("SELECT * FROM ip_pool_resource ORDER BY ip_pool_id", &[])
.await
.expect("Failed to query ip pool resource");
let ip_pool_resources = process_rows(&rows);

assert_eq!(ip_pool_resources.len(), 4);

let type_silo = SqlEnum::from(("ip_pool_resource_type", "silo"));

// pool1, which referenced silo1 in the "ip_pool" table, has a newly
// created resource.
//
// The same relationship is true for pool2 / silo2.
assert_eq!(
ip_pool_resources[0].values,
vec![
ColumnValue::new("ip_pool_id", POOL1),
ColumnValue::new("resource_type", type_silo.clone()),
ColumnValue::new("resource_id", SILO1),
ColumnValue::new("is_default", true),
],
);
assert_eq!(
ip_pool_resources[1].values,
vec![
ColumnValue::new("ip_pool_id", POOL2),
ColumnValue::new("resource_type", type_silo.clone()),
ColumnValue::new("resource_id", SILO2),
ColumnValue::new("is_default", false),
],
);

// pool3 did not previously have a corresponding silo, so now it's associated
// with both silos as a new resource in each.
//
// Additionally, silo1 already had a default pool (pool1), but silo2 did
// not have one. As a result, pool3 becomes the new default pool for silo2.
assert_eq!(
ip_pool_resources[2].values,
vec![
ColumnValue::new("ip_pool_id", POOL3),
ColumnValue::new("resource_type", type_silo.clone()),
ColumnValue::new("resource_id", SILO1),
ColumnValue::new("is_default", false),
],
);
assert_eq!(
ip_pool_resources[3].values,
vec![
ColumnValue::new("ip_pool_id", POOL3),
ColumnValue::new("resource_type", type_silo.clone()),
ColumnValue::new("resource_id", SILO2),
ColumnValue::new("is_default", true),
],
);
})
}

// Lazily initializes all migration checks. The combination of Rust function
// pointers and async makes defining a static table fairly painful, so we're
// using lazy initialization instead.
//
// Each "check" is implemented as a pair of {before, after} migration function
// pointers, called precisely around the migration under test.
fn get_migration_checks() -> BTreeMap<SemverVersion, DataMigrationFns> {
let mut map = BTreeMap::new();

map.insert(
SemverVersion(semver::Version::parse("23.0.0").unwrap()),
DataMigrationFns { before: Some(before_23_0_0), after: after_23_0_0 },
);

map
}

// Performs all schema changes and runs version-specific assertions.
//
// HOW TO ADD A MIGRATION CHECK:
// - Add a new "map.insert" line to "get_migration_checks", with the semver of
// the version you'd like to inspect before / after.
// - Define your "before" (optional) and "after" (required) functions. These
// act on a connection to CockroachDB, and can observe and mutate arbitrary
// state.
//
// ADVICE FOR MIGRATION CHECKS:
// - Your migration check will run in the same test as all other migration
// checks, because performing schema migrations isn't that fast. If you
// perform an operation that could be disruptive to subsequent checks, I
// recommend cleaning up after yourself (e.g., DELETE relevant rows).
// - I recommend using schema checks that are NOT strongly-typed. When you
// add a migration check, it'll happen to match the "latest" static schemas
// defined by Nexus, but that won't always be the case. As the schema
// continues to change (maybe a table you're trying to check gets a new column
// in a later version), your code should continue operating on the OLD version,
// and as such, should avoid needing any updates.
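//
// For example (a sketch only; the version number and helper names below are
// hypothetical), a check for a future "24.0.0" migration would be registered
// in "get_migration_checks" like so:
//
//   map.insert(
//       SemverVersion(semver::Version::parse("24.0.0").unwrap()),
//       DataMigrationFns { before: Some(before_24_0_0), after: after_24_0_0 },
//   );
//
// where "before_24_0_0" and "after_24_0_0" are functions taking a &Client and
// returning a BoxFuture<'_, ()>, just like the 23.0.0 helpers above.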
#[tokio::test]
async fn validate_data_migration() {
let config = load_test_config();
let logctx = LogContext::new("validate_data_migration", &config.pkg.log);
let log = &logctx.log;

let populate = false;
let mut crdb = test_setup_just_crdb(&logctx.log, populate).await;
let client = crdb.connect().await.expect("Failed to access CRDB client");

let all_versions = read_all_schema_versions().await;
let all_checks = get_migration_checks();

// Go from the first version to the latest version.
for version in &all_versions {
// If this check has preconditions (or setup), run them.
let checks = all_checks.get(version);
if let Some(before) = checks.and_then(|check| check.before) {
before(&client).await;
}

apply_update(log, &crdb, &version.to_string(), 1).await;
assert_eq!(version.to_string(), query_crdb_schema_version(&crdb).await);

// If this check has postconditions (or cleanup), run them.
if let Some(after) = checks.map(|check| check.after) {
after(&client).await;
}
}
assert_eq!(
LATEST_SCHEMA_VERSION.to_string(),
query_crdb_schema_version(&crdb).await
);

crdb.cleanup().await.unwrap();
logctx.cleanup_successful();
}
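
// To run just this test locally, something like the following should work
// (assuming the usual omicron test runner and package name):
//
//   cargo nextest run -p omicron-nexus validate_data_migration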

// Returns the InformationSchema object for a database populated via `sql`.
async fn get_information_schema(log: &Logger, sql: &str) -> InformationSchema {
let populate = false;
