Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tests for ClickHouse schema upgrades #5932

Merged
merged 2 commits into from
Jun 22, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 238 additions & 0 deletions oximeter/db/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4185,4 +4185,242 @@ mod tests {
db.cleanup().await.unwrap();
logctx.cleanup_successful();
}

// The schema directory, used in tests. The actual updater uses the
// zone-image files copied in during construction.
const SCHEMA_DIR: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/schema");
bnaecker marked this conversation as resolved.
Show resolved Hide resolved

#[tokio::test]
async fn check_actual_schema_upgrades_are_valid_single_node() {
check_actual_schema_upgrades_are_valid_impl(false).await;
}

#[tokio::test]
async fn check_actual_schema_upgrades_are_valid_replicated() {
check_actual_schema_upgrades_are_valid_impl(true).await;
}

// NOTE: This does not actually run the upgrades, only checks them for
// validity.
async fn check_actual_schema_upgrades_are_valid_impl(replicated: bool) {
let name = format!(
"check_actual_schema_upgrades_are_valid_{}",
if replicated { "replicated" } else { "single_node" }
);
let logctx = test_setup_log(&name);
let log = &logctx.log;

// We really started tracking the database version in 2. However, that
// set of files is not valid by construction, since we were just
// creating the full database from scratch as an "upgrade". So we'll
// start by applying version 3, and then do all later ones.
const FIRST_VERSION: u64 = 3;
for version in FIRST_VERSION..=OXIMETER_VERSION {
let upgrade_file_contents = Client::read_schema_upgrade_sql_files(
log, false, version, SCHEMA_DIR,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It appears we're only testing single node and not replicated here?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Derp (on me). Good find. I set false to replicated in my branch: #5897, which runs a new series of migrations (and why @bnaecker added this), and things are passing correctly. That PR is almost ready for merge, so, I'll ensure this test fn is updated.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

awesome, thanks!

)
.await
.expect("failed to read schema upgrade files");

if let Err(e) =
Client::verify_schema_upgrades(&upgrade_file_contents)
{
panic!(
"Schema update files for version {version} \
are not valid: {e:?}"
);
}
}
logctx.cleanup_successful();
}

// Either a cluster or single node.
//
// How does this not already exist...
enum ClickHouse {
Cluster(ClickHouseCluster),
Single(ClickHouseInstance),
}

impl ClickHouse {
fn port(&self) -> u16 {
match self {
ClickHouse::Cluster(cluster) => cluster.replica_1.port(),
ClickHouse::Single(node) => node.port(),
}
}

async fn cleanup(mut self) {
match &mut self {
ClickHouse::Cluster(cluster) => {
cluster
.keeper_1
.cleanup()
.await
.expect("Failed to cleanup ClickHouse keeper 1");
cluster
.keeper_2
.cleanup()
.await
.expect("Failed to cleanup ClickHouse keeper 2");
cluster
.keeper_3
.cleanup()
.await
.expect("Failed to cleanup ClickHouse keeper 3");
cluster
.replica_1
.cleanup()
.await
.expect("Failed to cleanup ClickHouse server 1");
cluster
.replica_2
.cleanup()
.await
.expect("Failed to cleanup ClickHouse server 2");
}
ClickHouse::Single(node) => node
.cleanup()
.await
.expect("Failed to cleanup single node ClickHouse"),
}
}
}

#[tokio::test]
async fn check_db_init_is_sum_of_all_up_single_node() {
check_db_init_is_sum_of_all_up_impl(false).await;
}

#[tokio::test]
async fn check_db_init_is_sum_of_all_up_replicated() {
check_db_init_is_sum_of_all_up_impl(true).await;
}

// Check that the set of tables we arrive at through upgrades equals those
// we get by creating the latest version directly.
async fn check_db_init_is_sum_of_all_up_impl(replicated: bool) {
let name = format!(
"check_db_init_is_sum_of_all_up_{}",
if replicated { "replicated" } else { "single_node" }
);
let logctx = test_setup_log(&name);
let log = &logctx.log;
let db = if replicated {
ClickHouse::Cluster(create_cluster(&logctx).await)
} else {
ClickHouse::Single(
ClickHouseInstance::new_single_node(&logctx, 0)
.await
.expect("Failed to start ClickHouse"),
)
};
let address = SocketAddr::new(Ipv6Addr::LOCALHOST.into(), db.port());
let client = Client::new(address, &log);

// Let's start with version 2, which is the first tracked and contains
// the full SQL files we need to populate the DB.
client
.initialize_db_with_version(replicated, 2)
.await
.expect("Failed to initialize timeseries database");

// Now let's apply all the SQL updates from here to the latest.
for version in 3..=OXIMETER_VERSION {
client
.ensure_schema(replicated, version, SCHEMA_DIR)
.await
.expect("Failed to ensure schema");
}

// Fetch all the tables as a JSON blob.
let tables_through_upgrades =
fetch_oximeter_table_details(&client).await;

// We'll completely re-init the DB with the real version now.
if replicated {
client.wipe_replicated_db().await.unwrap()
} else {
client.wipe_single_node_db().await.unwrap()
}
client
.initialize_db_with_version(replicated, OXIMETER_VERSION)
.await
.expect("Failed to initialize timeseries database");

// Fetch the tables again and compare.
let tables = fetch_oximeter_table_details(&client).await;

// This is an annoying comparison. Since the tables are quite
bnaecker marked this conversation as resolved.
Show resolved Hide resolved
// complicated, we want to really be careful about what errors we show.
// Iterate through all the expected tables (from the direct creation),
// and check each expected field matches. Then we also check that the
// tables from the upgrade path don't have anything else in them.
for (name, json) in tables.iter() {
let upgrade_table =
tables_through_upgrades.get(name).unwrap_or_else(|| {
panic!("The tables via upgrade are missing table '{name}'")
});
for (key, value) in json.iter() {
let other_value = upgrade_table.get(key).unwrap_or_else(|| {
panic!("Upgrade table is missing key '{key}'")
});
assert_eq!(
value,
other_value,
"{} database table {name} disagree on the value \
of the column {key} between the direct table creation \
and the upgrade path.\nDirect:\n\n{value} \
\n\nUpgrade:\n\n{other_value}",
if replicated { "Replicated" } else { "Single-node" },
);
}
}

// Check there are zero keys in the upgrade path that don't appear in
// the direct path.
let extra_keys: Vec<_> = tables_through_upgrades
.keys()
.filter(|k| !tables.contains_key(k.as_str()))
.cloned()
.collect();
assert!(
extra_keys.is_empty(),
"The oximeter database contains tables in the upgrade path \
that are not in the direct path: {extra_keys:?}"
);

db.cleanup().await;
logctx.cleanup_successful();
}

// Read the relevant table details from the `oximeter` database, and return
// it keyed on the table name.
async fn fetch_oximeter_table_details(
client: &Client,
) -> BTreeMap<String, serde_json::Map<String, serde_json::Value>> {
let out = client
.execute_with_body(
"SELECT \
name,
engine_full,
create_table_query,
sorting_key,
primary_key
FROM system.tables \
WHERE database = 'oximeter'\
FORMAT JSONEachRow;",
)
.await
.unwrap()
.1;
out.lines()
.map(|line| {
let json: serde_json::Map<String, serde_json::Value> =
serde_json::from_str(&line).unwrap();
let name = json.get("name").unwrap().to_string();
(name, json)
})
.collect()
}
}
Loading