Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add offline schema updates for ClickHouse #4394

Merged
merged 13 commits into from
Nov 10, 2023
12 changes: 12 additions & 0 deletions .config/nextest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,15 @@ fail-fast = false

[script.crdb-seed]
command = 'cargo run -p crdb-seed'

# The ClickHouse cluster tests currently rely on a hard-coded set of ports for
# the nodes in the cluster. We would like to relax this in the future, at which
# point this test-group configuration can be removed or at least loosened to
# support testing in parallel. For now, enforce strict serialization for all
# tests with `replicated` in the name.
[test-groups]
clickhouse-cluster = { max-threads = 1 }

[[profile.default.overrides]]
filter = 'package(oximeter-db) and test(replicated)'
test-group = 'clickhouse-cluster'
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions oximeter/collector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ license = "MPL-2.0"

[dependencies]
anyhow.workspace = true
camino.workspace = true
clap.workspace = true
dropshot.workspace = true
futures.workspace = true
Expand Down
126 changes: 126 additions & 0 deletions oximeter/collector/src/bin/clickhouse-schema-updater.rs
bnaecker marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

//! CLI tool to apply offline updates to ClickHouse schema.

// Copyright 2023 Oxide Computer Company

use anyhow::anyhow;
use anyhow::Context;
use camino::Utf8PathBuf;
use clap::Parser;
use clap::Subcommand;
use omicron_common::address::CLICKHOUSE_PORT;
use oximeter_db::model::OXIMETER_VERSION;
use oximeter_db::Client;
use slog::Drain;
use slog::Level;
use slog::LevelFilter;
use slog::Logger;
use std::net::Ipv6Addr;
use std::net::SocketAddr;
use std::net::SocketAddrV6;

const DEFAULT_HOST: SocketAddr = SocketAddr::V6(SocketAddrV6::new(
Ipv6Addr::LOCALHOST,
CLICKHOUSE_PORT,
0,
0,
));

fn parse_log_level(s: &str) -> anyhow::Result<Level> {
s.parse().map_err(|_| anyhow!("Invalid log level"))
}

/// Tool to apply offline updates to ClickHouse schema.
#[derive(Clone, Debug, Parser)]
struct Args {
/// IP address and port at which to access ClickHouse.
#[arg(long, default_value_t = DEFAULT_HOST, env = "CLICKHOUSE_HOST")]
host: SocketAddr,

/// Directory from which to read schema files for each version.
#[arg(
short = 's',
long,
default_value_t = Utf8PathBuf::from("/opt/oxide/oximeter/schema")
)]
schema_directory: Utf8PathBuf,

/// The log level while running the command.
#[arg(
short,
long,
value_parser = parse_log_level,
default_value_t = Level::Warning
)]
log_level: Level,

#[command(subcommand)]
cmd: Cmd,
}

#[derive(Clone, Debug, Subcommand)]
enum Cmd {
/// List all schema in the directory available for an upgrade
#[clap(visible_alias = "ls")]
List,
/// Apply an upgrade to a specific version
#[clap(visible_aliases = ["up", "apply"])]
Upgrade {
/// The version to which to upgrade.
#[arg(default_value_t = OXIMETER_VERSION)]
version: u64,
bnaecker marked this conversation as resolved.
Show resolved Hide resolved
},
}

fn build_logger(level: Level) -> Logger {
let decorator = slog_term::TermDecorator::new().build();
let drain = slog_term::FullFormat::new(decorator).build().fuse();
let drain = slog_async::Async::new(drain).build().fuse();
let drain = LevelFilter::new(drain, level).fuse();
Logger::root(drain, slog::o!("unit" => "clickhouse_schema_updater"))
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args = Args::parse();
let log = build_logger(args.log_level);
let client = Client::new(args.host, &log);
let is_replicated = client.is_oximeter_cluster().await?;
match args.cmd {
Cmd::List => {
let latest = client
.read_latest_version()
.await
.context("Failed to read latest version")?;
let available_versions = Client::read_available_schema_versions(
&log,
is_replicated,
&args.schema_directory,
)
.await?;
println!("Latest version: {latest}");
println!("Available versions:");
for ver in available_versions {
print!(" {ver}");
if ver == latest {
print!(" (reported by database)");
}
if ver == OXIMETER_VERSION {
print!(" (expected by oximeter)");
}
println!();
}
}
Cmd::Upgrade { version } => {
client
.ensure_schema(is_replicated, version, args.schema_directory)
.await
.context("Failed to upgrade schema")?;
println!("Upgrade to oximeter database version {version} complete");
}
}
Ok(())
}
41 changes: 37 additions & 4 deletions oximeter/collector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ use omicron_common::backoff;
use omicron_common::FileKv;
use oximeter::types::ProducerResults;
use oximeter::types::ProducerResultsItem;
use oximeter_db::model::OXIMETER_VERSION;
use oximeter_db::Client;
use oximeter_db::DbWrite;
use serde::Deserialize;
Expand Down Expand Up @@ -454,9 +453,39 @@ impl OximeterAgent {
CLICKHOUSE_PORT,
)
};

// Determine the version of the database.
//
// There are three cases
//
// - The database exists and is at the expected version. Continue in
// this case.
//
// - The database exists and is at a lower-than-expected version. We
// fail back to the caller here, which will retry indefinitely until the
// DB has been updated.
bnaecker marked this conversation as resolved.
Show resolved Hide resolved
//
// - The DB doesn't exist at all. This reports a version number of 0. We
// need to create the DB here, at the latest version. This is used in
// fresh installations and tests.
let client = Client::new(db_address, &log);
let replicated = client.is_oximeter_cluster().await?;
client.initialize_db_with_version(replicated, OXIMETER_VERSION).await?;
match client.check_db_is_at_expected_version().await {
Ok(_) => {}
Err(oximeter_db::Error::DatabaseVersionMismatch {
found: 0,
..
}) => {
debug!(log, "oximeter database does not exist, creating");
let replicated = client.is_oximeter_cluster().await?;
client
.initialize_db_with_version(
replicated,
oximeter_db::OXIMETER_VERSION,
)
.await?;
}
Err(e) => return Err(Error::from(e)),
}

// Spawn the task for aggregating and inserting all metrics
tokio::spawn(async move {
Expand Down Expand Up @@ -712,6 +741,9 @@ impl Oximeter {
///
/// This can be used to override / ignore the logging configuration in
/// `config`, using `log` instead.
///
/// Note that this blocks until the ClickHouse database is available **and
/// at the expected version**.
pub async fn with_logger(
config: &Config,
args: &OximeterArguments,
Expand Down Expand Up @@ -743,7 +775,8 @@ impl Oximeter {
let log_client_failure = |error, delay| {
warn!(
log,
"failed to initialize ClickHouse database, will retry in {:?}", delay;
"failed to create ClickHouse client";
"retry_after" => ?delay,
"error" => ?error,
);
};
Expand Down
3 changes: 3 additions & 0 deletions oximeter/db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ anyhow.workspace = true
async-trait.workspace = true
bcs.workspace = true
bytes = { workspace = true, features = [ "serde" ] }
camino.workspace = true
chrono.workspace = true
clap.workspace = true
dropshot.workspace = true
highway.workspace = true
omicron-common.workspace = true
oximeter.workspace = true
regex.workspace = true
reqwest = { workspace = true, features = [ "json" ] }
Expand All @@ -35,6 +37,7 @@ itertools.workspace = true
omicron-test-utils.workspace = true
slog-dtrace.workspace = true
strum.workspace = true
tempfile.workspace = true

[[bin]]
name = "oxdb"
Expand Down
39 changes: 39 additions & 0 deletions oximeter/db/schema/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# ClickHouse schema files

This directory contains the SQL files for different versions of the ClickHouse
timeseries database used by `oximeter`. In general, schema are expected to be
applied while the database is online, but no other clients exist. This is
similar to the current situation for _offline upgrade_ we use when updating the
main control plane database in CockroachDB.

## Constraints, or why ClickHouse is weird

While this tool is modeled after the mechanism for applying updates in
CockroachDB, ClickHouse is a significantly different DBMS. There are no
transactions; no unique primary keys; a single DB server can house both
replicated and single-node tables. This means we need to be pretty careful when
updating the schema. Changes must be idempotent, as with the CRDB schema, but at
this point we do not support inserting or modifying data at all.

Similar to the CRDB offline update tool, we assume no non-update modifications
of the database are running concurrently. However, given ClickHouse's lack of
transactions, we actually require that there are no writes of any kind. In
practice, this means `oximeter` **must not** be running when this is called.
Similarly, there must be only a single instance of this program at a time.

To run this program:

- Ensure the ClickHouse server is running, and grab its IP address;
```bash
$ pfexec zlogin oxz_clickhouse_e449eb80-3371-40a6-a316-d6e64b039357 'ipadm show-addr -o addrobj,addr | grep omicron6'
oxControlService20/omicron6 fd00:1122:3344:101::e/64
```
- Log into the `oximeter` zone, `zlogin oxz_oximeter_<UUID>`
- Run this tool, pointing it at the desired schema directory, e.g.:

```bash
# /opt/oxide/oximeter/bin/clickhouse-schema-updater \
--host <ADDR_FROM_ABOVE> \
--schema-dir /opt/oxide/oximeter/sql
up VERSION
```
Loading
Loading