Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[oximeter] Add minimal versioning support #4246

Merged
merged 9 commits into from
Oct 12, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions oximeter/collector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use omicron_common::backoff;
use omicron_common::FileKv;
use oximeter::types::ProducerResults;
use oximeter::types::ProducerResultsItem;
use oximeter_db::model::OXIMETER_VERSION;
use oximeter_db::Client;
use oximeter_db::DbWrite;
use serde::Deserialize;
Expand Down Expand Up @@ -455,11 +456,7 @@ impl OximeterAgent {
};
let client = Client::new(db_address, &log);
let replicated = client.is_oximeter_cluster().await?;
if !replicated {
client.init_single_node_db().await?;
} else {
client.init_replicated_db().await?;
}
client.initialize_db_with_version(replicated, OXIMETER_VERSION).await?;

// Spawn the task for aggregating and inserting all metrics
tokio::spawn(async move {
Expand Down
41 changes: 41 additions & 0 deletions oximeter/db/preprocessed_configs/config.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<!-- This file was generated automatically.
smklein marked this conversation as resolved.
Show resolved Hide resolved
Do not edit it: it is likely to be discarded and generated again before it's read next time.
Files used to generate this file:
config.xml -->

<!-- Config that is used when server is run without config file. --><clickhouse>
<logger>
<level>trace</level>
<console>true</console>
</logger>

<http_port>8123</http_port>
<tcp_port>9000</tcp_port>
<mysql_port>9004</mysql_port>

<path>./</path>

<mlock_executable>true</mlock_executable>

<users>
<default>
<password/>

<networks>
<ip>::/0</ip>
</networks>

<profile>default</profile>
<quota>default</quota>
<access_management>1</access_management>
</default>
</users>

<profiles>
<default/>
</profiles>

<quotas>
<default/>
</quotas>
</clickhouse>
212 changes: 211 additions & 1 deletion oximeter/db/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ use dropshot::WhichPage;
use oximeter::types::Sample;
use slog::debug;
use slog::error;
use slog::info;
use slog::trace;
use slog::warn;
use slog::Logger;
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
Expand Down Expand Up @@ -269,7 +271,95 @@ impl Client {
.map_err(|e| Error::Database(e.to_string()))
}

// Verifies if instance is part of oximeter_cluster
/// Validates that the schema used by the DB matches the version used by
/// the executable using it.
///
/// This function will wipe metrics data if the version stored within
smklein marked this conversation as resolved.
Show resolved Hide resolved
/// the DB does not match the schema version of Oximeter.
///
/// NOTE: This function is not safe for concurrent usage!
pub async fn initialize_db_with_version(
&self,
replicated: bool,
expected: u64,
smklein marked this conversation as resolved.
Show resolved Hide resolved
) -> Result<(), Error> {
info!(self.log, "reading db version");

// Read the version from the DB
let version = self.read_latest_version().await?;
info!(self.log, "read oximeter database version"; "version" => version);

// Decide how to conform the on-disk version with this version of
// Oximeter.
if version < expected {
info!(self.log, "wiping and re-initializing oximeter schema");
// If the on-storage version is less than the constant embedded into
// this binary, the DB is out-of-date. Drop it, and re-populate it
// later.
if !replicated {
self.wipe_single_node_db().await?;
self.init_single_node_db().await?;
} else {
self.wipe_replicated_db().await?;
self.init_replicated_db().await?;
}
} else if version > expected {
// If the on-storage version is greater than the constant embedded
// into this binary, we may have downgraded.
return Err(Error::Database(
format!(
"Expected version {expected}, saw {version}. Downgrading is not supported.",
)
));
} else {
// If the version matches, we don't need to update the DB
return Ok(());
}

info!(self.log, "inserting current version"; "version" => expected);
self.insert_version(expected).await?;
Ok(())
}

async fn read_latest_version(&self) -> Result<u64, Error> {
let sql = format!(
"SELECT MAX(value) FROM {db_name}.version;",
db_name = crate::DATABASE_NAME,
);

let version = match self.execute_with_body(sql).await {
Ok(body) if body.is_empty() => {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just checking my understanding, but this case should be hit exactly once, when we first add the version table, right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this case is actually unlikely, but I figured I'd include it regardless.

Here are the cases I'd expect us to use:

  • When we upgrade the in-field databases, which exist, but don't have the "version" table, we'll see a database error containing Table oximeter.version doesn't exist.
  • When we scan for versions in databases that haven't been formatted, we'll see a database error containing Database oximeter doesn't exist

This case of "no versions exist" would happen if we formatted the schema, but bailed out before we managed to insert the version (or crashed before we got that far).

I went ahead and updated the code + comments to make this a bit more explicit.

warn!(
self.log,
"no version in database (treated as 'version 0')"
);
0
}
Ok(body) => body.trim().parse::<u64>().map_err(|err| {
Error::Database(format!("Cannot read version: {err}"))
})?,
Err(Error::Database(err))
if err.contains("Database oximeter doesn't exist") =>
{
0
}
Err(err) => {
return Err(err);
}
};
Ok(version)
}

/// Records `version` as a new row in the `version` table, timestamped with
/// the database server's `now()`.
///
/// Any error from executing the statement is returned to the caller.
async fn insert_version(&self, version: u64) -> Result<(), Error> {
    let statement = format!(
        "INSERT INTO {db_name}.version (*) VALUES ({version}, now());",
        db_name = crate::DATABASE_NAME,
    );
    // Only success or failure matters here; the response body is discarded.
    self.execute_with_body(statement).await.map(|_| ())
}

/// Verifies if instance is part of oximeter_cluster
pub async fn is_oximeter_cluster(&self) -> Result<bool, Error> {
let sql = String::from("SHOW CLUSTERS FORMAT JSONEachRow;");
let res = self.execute_with_body(sql).await?;
Expand Down Expand Up @@ -1339,6 +1429,126 @@ mod tests {
db.cleanup().await.expect("Failed to cleanup ClickHouse server");
}

// Counts the rows currently stored in `oximeter.timeseries_schema`.
//
// The table is selected in `JSONEachRow` format and the lines of the
// response body are counted. Panics if the query fails.
async fn get_schema_count(client: &Client) -> usize {
    let body = client
        .execute_with_body(
            "SELECT * FROM oximeter.timeseries_schema FORMAT JSONEachRow;",
        )
        .await
        .expect("Failed to SELECT from database");
    body.lines().count()
}

#[tokio::test]
async fn test_database_version_update_idempotent() {
let log = slog::Logger::root(slog::Discard, o!());
smklein marked this conversation as resolved.
Show resolved Hide resolved
let mut db = ClickHouseInstance::new_single_node(0)
.await
.expect("Failed to start ClickHouse");
let address = SocketAddr::new("::1".parse().unwrap(), db.port());

let replicated = false;

// Initialize the database...
let client = Client::new(address, &log);
client
.initialize_db_with_version(replicated, model::OXIMETER_VERSION)
.await
.expect("Failed to initialize timeseries database");

// Insert data here so we can verify it still exists later.
//
// The values here don't matter much, we just want to check that
// the database data hasn't been dropped.
assert_eq!(0, get_schema_count(&client).await);
let sample = test_util::make_sample();
client.insert_samples(&[sample.clone()]).await.unwrap();
assert_eq!(1, get_schema_count(&client).await);

// Re-initialize the database, see that our data still exists
client
.initialize_db_with_version(replicated, model::OXIMETER_VERSION)
.await
.expect("Failed to initialize timeseries database");

assert_eq!(1, get_schema_count(&client).await);

db.cleanup().await.expect("Failed to cleanup ClickHouse server");
}

#[tokio::test]
async fn test_database_version_will_not_downgrade() {
let log = slog::Logger::root(slog::Discard, o!());
smklein marked this conversation as resolved.
Show resolved Hide resolved
let mut db = ClickHouseInstance::new_single_node(0)
.await
.expect("Failed to start ClickHouse");
let address = SocketAddr::new("::1".parse().unwrap(), db.port());

let replicated = false;

// Initialize the database
let client = Client::new(address, &log);
client
.initialize_db_with_version(replicated, model::OXIMETER_VERSION)
.await
.expect("Failed to initialize timeseries database");

// Bump the version of the database to a "too new" version
client
.insert_version(model::OXIMETER_VERSION + 1)
.await
.expect("Failed to insert very new DB version");

// Expect a failure re-initializing the client.
//
// This will attempt to initialize the client with "version =
// model::OXIMETER_VERSION", which is "too old".
client
.initialize_db_with_version(replicated, model::OXIMETER_VERSION)
.await
.expect_err("Should have failed, downgrades are not supported");

db.cleanup().await.expect("Failed to cleanup ClickHouse server");
}

#[tokio::test]
async fn test_database_version_wipes_old_version() {
let log = slog::Logger::root(slog::Discard, o!());
smklein marked this conversation as resolved.
Show resolved Hide resolved
let mut db = ClickHouseInstance::new_single_node(0)
.await
.expect("Failed to start ClickHouse");
let address = SocketAddr::new("::1".parse().unwrap(), db.port());

let replicated = false;

// Initialize the Client
let client = Client::new(address, &log);
client
.initialize_db_with_version(replicated, model::OXIMETER_VERSION)
.await
.expect("Failed to initialize timeseries database");

// Insert data here so we can remove it later.
//
// The values here don't matter much, we just want to check that
// the database data gets dropped later.
assert_eq!(0, get_schema_count(&client).await);
let sample = test_util::make_sample();
client.insert_samples(&[sample.clone()]).await.unwrap();
assert_eq!(1, get_schema_count(&client).await);

// If we try to upgrade to a newer version, we'll drop old data.
client
.initialize_db_with_version(replicated, model::OXIMETER_VERSION + 1)
.await
.expect("Should have initialized database successfully");
assert_eq!(0, get_schema_count(&client).await);

db.cleanup().await.expect("Failed to cleanup ClickHouse server");
}

#[tokio::test]
async fn test_schema_update() {
let log = slog::Logger::root(slog::Discard, o!());
Expand Down
8 changes: 8 additions & 0 deletions oximeter/db/src/db-replicated-init.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
CREATE DATABASE IF NOT EXISTS oximeter ON CLUSTER oximeter_cluster;
--
-- Schema version history for the oximeter database (replicated deployment).
-- The oximeter client inserts a row after it (re-)initializes the schema and
-- reads MAX(value) to find the version currently stored.
CREATE TABLE IF NOT EXISTS oximeter.version ON CLUSTER oximeter_cluster
(
-- Schema version number
value UInt64,
-- Time the row was inserted (the client passes now())
timestamp DateTime64(9, 'UTC')
)
ENGINE = ReplicatedMergeTree()
ORDER BY (value, timestamp);
--
CREATE TABLE IF NOT EXISTS oximeter.measurements_bool_local ON CLUSTER oximeter_cluster
(
timeseries_name String,
Expand Down
8 changes: 8 additions & 0 deletions oximeter/db/src/db-single-node-init.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
CREATE DATABASE IF NOT EXISTS oximeter;
--
-- Schema version history for the oximeter database (single-node deployment).
-- The oximeter client inserts a row after it (re-)initializes the schema and
-- reads MAX(value) to find the version currently stored.
CREATE TABLE IF NOT EXISTS oximeter.version
(
-- Schema version number
value UInt64,
-- Time the row was inserted (the client passes now())
timestamp DateTime64(9, 'UTC')
)
ENGINE = MergeTree()
ORDER BY (value, timestamp);
--
CREATE TABLE IF NOT EXISTS oximeter.measurements_bool
(
timeseries_name String,
Expand Down
3 changes: 3 additions & 0 deletions oximeter/db/src/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ use std::net::IpAddr;
use std::net::Ipv6Addr;
use uuid::Uuid;

/// Describes the version of the Oximeter database.
///
/// The collector passes this value to
/// `Client::initialize_db_with_version` at startup, where it is compared
/// with the latest version stored in the database: a lower stored version
/// causes the database to be wiped and re-initialized, and a higher stored
/// version is rejected as an unsupported downgrade.
pub const OXIMETER_VERSION: u64 = 1;

// Wrapper type to represent a boolean in the database.
//
// ClickHouse's type system lacks a boolean, and uses `u8` to represent them. This is a safe wrapper
Expand Down