
Merge branch 'main' into nexus-zone-filesystems-2
smklein committed Jun 24, 2024
2 parents 684932d + 895f280 commit 87b8df9
Showing 10 changed files with 452 additions and 94 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

17 changes: 17 additions & 0 deletions dev-tools/xtask/src/check_workspace_deps.rs
@@ -146,6 +146,23 @@ pub fn run_cmd() -> Result<()> {
}
}

let mut seen_bins = BTreeSet::new();
for package in &workspace.packages {
if workspace.workspace_members.contains(&package.id) {
for target in &package.targets {
if target.is_bin() {
if !seen_bins.insert(&target.name) {
eprintln!(
"error: bin target {:?} seen multiple times",
target.name
);
nerrors += 1;
}
}
}
}
}

eprintln!(
"check-workspace-deps: errors: {}, warnings: {}",
nerrors, nwarnings
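For context on the new xtask check above: it relies on `BTreeSet::insert` returning `false` when the value is already present. A minimal standalone sketch of that pattern, with purely illustrative bin names (not taken from the workspace):

use std::collections::BTreeSet;

fn main() {
    // Hypothetical bin target names; two packages both define `shared-name`.
    let bin_targets = ["pkg-a-cli", "shared-name", "shared-name"];

    let mut seen = BTreeSet::new();
    let mut errors = 0;
    for name in bin_targets {
        // `insert` returns false when the set already contains the value,
        // which is how the check flags a duplicate bin target.
        if !seen.insert(name) {
            eprintln!("error: bin target {name:?} seen multiple times");
            errors += 1;
        }
    }
    assert_eq!(errors, 1);
}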
2 changes: 0 additions & 2 deletions installinator/Cargo.toml
@@ -57,5 +57,3 @@ tokio-stream.workspace = true

[features]
image-standard = []
image-trampoline = []
rack-topology-single-sled = []
238 changes: 238 additions & 0 deletions oximeter/db/src/client/mod.rs
@@ -4185,4 +4185,242 @@ mod tests {
db.cleanup().await.unwrap();
logctx.cleanup_successful();
}

// The schema directory, used in tests. The actual updater uses the
// zone-image files copied in during construction.
const SCHEMA_DIR: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/schema");

#[tokio::test]
async fn check_actual_schema_upgrades_are_valid_single_node() {
check_actual_schema_upgrades_are_valid_impl(false).await;
}

#[tokio::test]
async fn check_actual_schema_upgrades_are_valid_replicated() {
check_actual_schema_upgrades_are_valid_impl(true).await;
}

// NOTE: This does not actually run the upgrades, only checks them for
// validity.
async fn check_actual_schema_upgrades_are_valid_impl(replicated: bool) {
let name = format!(
"check_actual_schema_upgrades_are_valid_{}",
if replicated { "replicated" } else { "single_node" }
);
let logctx = test_setup_log(&name);
let log = &logctx.log;

// We really started tracking the database version in 2. However, that
// set of files is not valid by construction, since we were just
// creating the full database from scratch as an "upgrade". So we'll
// start by applying version 3, and then do all later ones.
const FIRST_VERSION: u64 = 3;
for version in FIRST_VERSION..=OXIMETER_VERSION {
let upgrade_file_contents = Client::read_schema_upgrade_sql_files(
log, false, version, SCHEMA_DIR,
)
.await
.expect("failed to read schema upgrade files");

if let Err(e) =
Client::verify_schema_upgrades(&upgrade_file_contents)
{
panic!(
"Schema update files for version {version} \
are not valid: {e:?}"
);
}
}
logctx.cleanup_successful();
}

// Either a cluster or single node.
//
// How does this not already exist...
enum ClickHouse {
Cluster(ClickHouseCluster),
Single(ClickHouseInstance),
}

impl ClickHouse {
fn port(&self) -> u16 {
match self {
ClickHouse::Cluster(cluster) => cluster.replica_1.port(),
ClickHouse::Single(node) => node.port(),
}
}

async fn cleanup(mut self) {
match &mut self {
ClickHouse::Cluster(cluster) => {
cluster
.keeper_1
.cleanup()
.await
.expect("Failed to cleanup ClickHouse keeper 1");
cluster
.keeper_2
.cleanup()
.await
.expect("Failed to cleanup ClickHouse keeper 2");
cluster
.keeper_3
.cleanup()
.await
.expect("Failed to cleanup ClickHouse keeper 3");
cluster
.replica_1
.cleanup()
.await
.expect("Failed to cleanup ClickHouse server 1");
cluster
.replica_2
.cleanup()
.await
.expect("Failed to cleanup ClickHouse server 2");
}
ClickHouse::Single(node) => node
.cleanup()
.await
.expect("Failed to cleanup single node ClickHouse"),
}
}
}

#[tokio::test]
async fn check_db_init_is_sum_of_all_up_single_node() {
check_db_init_is_sum_of_all_up_impl(false).await;
}

#[tokio::test]
async fn check_db_init_is_sum_of_all_up_replicated() {
check_db_init_is_sum_of_all_up_impl(true).await;
}

// Check that the set of tables we arrive at through upgrades equals those
// we get by creating the latest version directly.
async fn check_db_init_is_sum_of_all_up_impl(replicated: bool) {
let name = format!(
"check_db_init_is_sum_of_all_up_{}",
if replicated { "replicated" } else { "single_node" }
);
let logctx = test_setup_log(&name);
let log = &logctx.log;
let db = if replicated {
ClickHouse::Cluster(create_cluster(&logctx).await)
} else {
ClickHouse::Single(
ClickHouseInstance::new_single_node(&logctx, 0)
.await
.expect("Failed to start ClickHouse"),
)
};
let address = SocketAddr::new(Ipv6Addr::LOCALHOST.into(), db.port());
let client = Client::new(address, &log);

// Let's start with version 2, which is the first tracked and contains
// the full SQL files we need to populate the DB.
client
.initialize_db_with_version(replicated, 2)
.await
.expect("Failed to initialize timeseries database");

// Now let's apply all the SQL updates from here to the latest.
for version in 3..=OXIMETER_VERSION {
client
.ensure_schema(replicated, version, SCHEMA_DIR)
.await
.expect("Failed to ensure schema");
}

// Fetch all the tables as a JSON blob.
let tables_through_upgrades =
fetch_oximeter_table_details(&client).await;

// We'll completely re-init the DB with the real version now.
if replicated {
client.wipe_replicated_db().await.unwrap()
} else {
client.wipe_single_node_db().await.unwrap()
}
client
.initialize_db_with_version(replicated, OXIMETER_VERSION)
.await
.expect("Failed to initialize timeseries database");

// Fetch the tables again and compare.
let tables = fetch_oximeter_table_details(&client).await;

// This is an annoying comparison. Since the tables are quite
// complicated, we want to really be careful about what errors we show.
// Iterate through all the expected tables (from the direct creation),
// and check each expected field matches. Then we also check that the
// tables from the upgrade path don't have anything else in them.
for (name, json) in tables.iter() {
let upgrade_table =
tables_through_upgrades.get(name).unwrap_or_else(|| {
panic!("The tables via upgrade are missing table '{name}'")
});
for (key, value) in json.iter() {
let other_value = upgrade_table.get(key).unwrap_or_else(|| {
panic!("Upgrade table is missing key '{key}'")
});
assert_eq!(
value,
other_value,
"{} database table {name} disagree on the value \
of the column {key} between the direct table creation \
and the upgrade path.\nDirect:\n\n{value} \
\n\nUpgrade:\n\n{other_value}",
if replicated { "Replicated" } else { "Single-node" },
);
}
}

// Check there are zero keys in the upgrade path that don't appear in
// the direct path.
let extra_keys: Vec<_> = tables_through_upgrades
.keys()
.filter(|k| !tables.contains_key(k.as_str()))
.cloned()
.collect();
assert!(
extra_keys.is_empty(),
"The oximeter database contains tables in the upgrade path \
that are not in the direct path: {extra_keys:?}"
);

db.cleanup().await;
logctx.cleanup_successful();
}

// Read the relevant table details from the `oximeter` database, and return
// it keyed on the table name.
async fn fetch_oximeter_table_details(
client: &Client,
) -> BTreeMap<String, serde_json::Map<String, serde_json::Value>> {
let out = client
.execute_with_body(
"SELECT \
name,
engine_full,
create_table_query,
sorting_key,
primary_key
FROM system.tables \
WHERE database = 'oximeter' \
FORMAT JSONEachRow;",
)
.await
.unwrap()
.1;
out.lines()
.map(|line| {
let json: serde_json::Map<String, serde_json::Value> =
serde_json::from_str(&line).unwrap();
let name = json.get("name").unwrap().to_string();
(name, json)
})
.collect()
}
}
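The `fetch_oximeter_table_details` helper above relies on ClickHouse's `FORMAT JSONEachRow`, which emits one JSON object per result row, one row per line. A minimal sketch of that line-by-line parsing, assuming `serde_json` is available; the table names and engine strings are hypothetical, not real `oximeter` tables:

use std::collections::BTreeMap;

fn main() {
    // A hypothetical JSONEachRow body: one JSON object per line.
    let body = r#"{"name":"example_table","engine_full":"MergeTree ORDER BY (a, b)"}
{"name":"another_table","engine_full":"ReplacingMergeTree ORDER BY (a)"}"#;

    let tables: BTreeMap<String, serde_json::Map<String, serde_json::Value>> = body
        .lines()
        .map(|line| {
            let json: serde_json::Map<String, serde_json::Value> =
                serde_json::from_str(line).unwrap();
            // `Value::to_string` keeps the JSON quotes around the name; both
            // maps in the test above are keyed the same way, so lookups and
            // comparisons still line up.
            let name = json.get("name").unwrap().to_string();
            (name, json)
        })
        .collect();

    assert_eq!(tables.len(), 2);
}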
8 changes: 7 additions & 1 deletion oximeter/producer/src/lib.rs
@@ -9,7 +9,6 @@
use dropshot::endpoint;
use dropshot::ApiDescription;
use dropshot::ConfigDropshot;
use dropshot::ConfigLogging;
use dropshot::HttpError;
use dropshot::HttpResponseOk;
use dropshot::HttpServer;
@@ -42,6 +41,13 @@ use std::time::Duration;
use thiserror::Error;
use uuid::Uuid;

// Our public interface depends directly or indirectly on these types; we
// export them so that consumers need not depend on dropshot themselves and
// to simplify how we stage incompatible upgrades.
pub use dropshot::ConfigLogging;
pub use dropshot::ConfigLoggingIfExists;
pub use dropshot::ConfigLoggingLevel;

#[derive(Debug, Clone, Error)]
pub enum Error {
#[error("Error running producer HTTP server: {0}")]
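The re-exports added above mean a consumer of the producer crate can build a dropshot logging configuration without depending on dropshot directly. A minimal sketch, under the assumption that the crate is imported as `oximeter_producer` and that dropshot's `StderrTerminal` variant is what the caller wants:

// Paths and variant names here are assumptions based on the diff above,
// not verified against the published crate.
use oximeter_producer::{ConfigLogging, ConfigLoggingLevel};

fn logging_config() -> ConfigLogging {
    // Configure stderr logging at `info` level; note that `dropshot` never
    // appears in the consumer's Cargo.toml.
    ConfigLogging::StderrTerminal { level: ConfigLoggingLevel::Info }
}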
4 changes: 2 additions & 2 deletions package/Cargo.toml
@@ -11,11 +11,13 @@ workspace = true
[dependencies]
anyhow.workspace = true
camino.workspace = true
cargo_metadata.workspace = true
clap.workspace = true
futures.workspace = true
hex.workspace = true
illumos-utils.workspace = true
indicatif.workspace = true
omicron-workspace-hack.workspace = true
omicron-zone-package.workspace = true
petgraph.workspace = true
rayon.workspace = true
@@ -30,13 +32,11 @@ slog-bunyan.workspace = true
slog-term.workspace = true
smf.workspace = true
strum.workspace = true
swrite.workspace = true
tar.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = [ "full" ] }
toml.workspace = true
walkdir.workspace = true
omicron-workspace-hack.workspace = true

[dev-dependencies]
expectorate.workspace = true