Add tests for ClickHouse schema upgrades
- Adds a check that verifies the SQL contained in each upgrade. This
  performs basic sanity checks (one statement per file, no data
  modifications) but doesn't actually apply the upgrades; a rough sketch
  of these checks follows below.
- Adds another test that ensures that the set of tables we arrive at by
  upgrading is the same as the set we get by directly initializing the
  latest database version.
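
The validity check itself (Client::verify_schema_upgrades) is defined
elsewhere in this module and is not part of this diff. The following is a
rough, hypothetical sketch of the per-file rules described above; the
function name and exact checks are illustrative assumptions, not the
actual implementation.

    fn verify_upgrade_sql(contents: &str) -> Result<(), String> {
        // Ignore SQL comment lines so they don't count as statements.
        let sql = contents
            .lines()
            .filter(|line| !line.trim_start().starts_with("--"))
            .collect::<Vec<_>>()
            .join("\n");

        // One statement per file: exactly one non-empty,
        // semicolon-terminated statement.
        let statements: Vec<_> =
            sql.split(';').map(str::trim).filter(|s| !s.is_empty()).collect();
        if statements.len() != 1 {
            return Err(format!(
                "expected exactly 1 statement, found {}",
                statements.len()
            ));
        }

        // No data modifications: upgrades should only change the schema,
        // never insert, delete, or rewrite rows.
        let upper = statements[0].to_uppercase();
        for verb in ["INSERT ", "DELETE ", "UPDATE ", "TRUNCATE "] {
            if upper.contains(verb) {
                return Err(format!(
                    "statement appears to modify data: {}",
                    verb.trim()
                ));
            }
        }
        Ok(())
    }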
bnaecker committed Jun 22, 2024
1 parent 964f6eb commit 470d99f
Showing 1 changed file with 238 additions and 0 deletions.
238 changes: 238 additions & 0 deletions oximeter/db/src/client/mod.rs
@@ -4185,4 +4185,242 @@ mod tests {
db.cleanup().await.unwrap();
logctx.cleanup_successful();
}

// The schema directory, used in tests. The actual updater uses the
// zone-image files copied in during construction.
const SCHEMA_DIR: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/schema");

#[tokio::test]
async fn check_actual_schema_upgrades_are_valid_single_node() {
check_actual_schema_upgrades_are_valid_impl(false).await;
}

#[tokio::test]
async fn check_actual_schema_upgrades_are_valid_replicated() {
check_actual_schema_upgrades_are_valid_impl(true).await;
}

// NOTE: This does not actually run the upgrades, only checks them for
// validity.
async fn check_actual_schema_upgrades_are_valid_impl(replicated: bool) {
let name = format!(
"check_actual_schema_upgrades_are_valid_{}",
if replicated { "replicated" } else { "single_node" }
);
let logctx = test_setup_log(&name);
let log = &logctx.log;

// We really started tracking the database version at version 2. However,
// that set of files is not valid by construction, since it just creates
// the full database from scratch as an "upgrade". So we'll start by
// checking version 3, and then do all later ones.
const FIRST_VERSION: u64 = 3;
for version in FIRST_VERSION..=OXIMETER_VERSION {
let upgrade_file_contents = Client::read_schema_upgrade_sql_files(
log, false, version, SCHEMA_DIR,
)
.await
.expect("failed to read schema upgrade files");

if let Err(e) =
Client::verify_schema_upgrades(&upgrade_file_contents)
{
panic!(
"Schema update files for version {version} \
are not valid: {e:?}"
);
}
}
logctx.cleanup_successful();
}

// Either a cluster or single node.
//
// How does this not already exist...
enum ClickHouse {
Cluster(ClickHouseCluster),
Single(ClickHouseInstance),
}

impl ClickHouse {
fn port(&self) -> u16 {
match self {
ClickHouse::Cluster(cluster) => cluster.replica_1.port(),
ClickHouse::Single(node) => node.port(),
}
}

async fn cleanup(mut self) {
match &mut self {
ClickHouse::Cluster(cluster) => {
cluster
.keeper_1
.cleanup()
.await
.expect("Failed to cleanup ClickHouse keeper 1");
cluster
.keeper_2
.cleanup()
.await
.expect("Failed to cleanup ClickHouse keeper 2");
cluster
.keeper_3
.cleanup()
.await
.expect("Failed to cleanup ClickHouse keeper 3");
cluster
.replica_1
.cleanup()
.await
.expect("Failed to cleanup ClickHouse server 1");
cluster
.replica_2
.cleanup()
.await
.expect("Failed to cleanup ClickHouse server 2");
}
ClickHouse::Single(node) => node
.cleanup()
.await
.expect("Failed to cleanup single node ClickHouse"),
}
}
}

#[tokio::test]
async fn check_db_init_is_sum_of_all_up_single_node() {
check_db_init_is_sum_of_all_up_impl(false).await;
}

#[tokio::test]
async fn check_db_init_is_sum_of_all_up_replicated() {
check_db_init_is_sum_of_all_up_impl(true).await;
}

// Check that the set of tables we arrive at through upgrades equals those
// we get by creating the latest version directly.
async fn check_db_init_is_sum_of_all_up_impl(replicated: bool) {
let name = format!(
"check_db_init_is_sum_of_all_up_{}",
if replicated { "replicated" } else { "single_node" }
);
let logctx = test_setup_log(&name);
let log = &logctx.log;
let db = if replicated {
ClickHouse::Cluster(create_cluster(&logctx).await)
} else {
ClickHouse::Single(
ClickHouseInstance::new_single_node(&logctx, 0)
.await
.expect("Failed to start ClickHouse"),
)
};
let address = SocketAddr::new(Ipv6Addr::LOCALHOST.into(), db.port());
let client = Client::new(address, &log);

// Let's start with version 2, which is the first tracked version and
// contains the full SQL files we need to populate the DB.
client
.initialize_db_with_version(replicated, 2)
.await
.expect("Failed to initialize timeseries database");

// Now let's apply all the SQL updates from here to the latest.
for version in 3..=OXIMETER_VERSION {
client
.ensure_schema(replicated, version, SCHEMA_DIR)
.await
.expect("Failed to ensure schema");
}

// Fetch all the tables as a JSON blob.
let tables_through_upgrades =
fetch_oximeter_table_details(&client).await;

// We'll completely re-init the DB with the real version now.
if replicated {
client.wipe_replicated_db().await.unwrap()
} else {
client.wipe_single_node_db().await.unwrap()
}
client
.initialize_db_with_version(replicated, OXIMETER_VERSION)
.await
.expect("Failed to initialize timeseries database");

// Fetch the tables again and compare.
let tables = fetch_oximeter_table_details(&client).await;

// This is an annoying comparison. Since the tables are quite
// complicated, we want to be careful about which errors we show.
// Iterate through all the expected tables (from the direct creation)
// and check that each expected field matches. Then also check that
// the tables from the upgrade path don't contain anything else.
for (name, json) in tables.iter() {
let upgrade_table =
tables_through_upgrades.get(name).unwrap_or_else(|| {
panic!("The tables via upgrade are missing table '{name}'")
});
for (key, value) in json.iter() {
let other_value = upgrade_table.get(key).unwrap_or_else(|| {
panic!("Upgrade table is missing key '{key}'")
});
assert_eq!(
value,
other_value,
"{} database table {name} disagree on the value \
of the column {key} between the direct table creation \
and the upgrade path.\nDirect:\n\n{value} \
\n\nUpgrade:\n\n{other_value}",
if replicated { "Replicated" } else { "Single-node" },
);
}
}

// Check there are zero keys in the upgrade path that don't appear in
// the direct path.
let extra_keys: Vec<_> = tables_through_upgrades
.keys()
.cloned()
.filter(|k| !tables.contains_key(k))
.collect();
assert!(
extra_keys.is_empty(),
"The oximeter database contains tables in the upgrade path \
that are not in the direct path: {extra_keys:?}"
);

db.cleanup().await;
logctx.cleanup_successful();
}

// Read the relevant table details from the `oximeter` database, and
// return them keyed by table name.
async fn fetch_oximeter_table_details(
client: &Client,
) -> BTreeMap<String, serde_json::Map<String, serde_json::Value>> {
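// The query uses ClickHouse's `FORMAT JSONEachRow`, which returns one
// JSON object per line; each line is parsed below into a map keyed by
// column name.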
let out = client
.execute_with_body(format!(
"SELECT \
name,
engine_full,
create_table_query,
sorting_key,
primary_key
FROM system.tables \
WHERE database = 'oximeter'\
FORMAT JSONEachRow;"
))
.await
.unwrap()
.1;
out.lines()
.map(|line| {
let json: serde_json::Map<String, serde_json::Value> =
serde_json::from_str(&line).unwrap();
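// Note: `Value::to_string()` keeps the JSON quotes around the table
// name; that's fine here, since both maps being compared are built
// the same way.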
let name = json.get("name").unwrap().to_string();
(name, json)
})
.collect()
}
}
