Add offline schema updates for ClickHouse
- Some cleanup around issuing multiple SQL statements from a file
- Create directory structure for storing schema updates modeled after
  CRDB up.sql files, but using integer versions, and move all existing
  SQL into version 2
- Add version 3, which fixes #4369,
  but does not apply it yet
- Add methods in the client for listing, reading, and applying one or
  more updates to the oximeter database from the upgrade files
- Add tests for upgrade application
- Add `clickhouse-schema-updater` binary for running them on demand
- Modify `oximeter-collector` to _not_ wipe / reinit the DB on startup
  if the version has changed, but instead wait for the version to be
  equal to what it is compiled against. This relies on updates from the
  developer being applied before `oximeter` will continue.
bnaecker committed Oct 31, 2023
1 parent dadbc22 commit 336475a
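
The commit message describes integer-versioned schema directories and client methods for applying one or more updates. The selection of which versions to apply can be sketched roughly as below. This is a minimal illustration, not the code from this commit: the helper names `parse_versions` and `versions_to_apply` are hypothetical, as are the assumptions that each version lives in a directory named by a bare integer and that upgrades must pass through every intermediate version.

```rust
/// Parse directory names as integer schema versions, skipping any name
/// that does not parse as a `u64` (for example `README.md`).
/// Hypothetical helper: the real directory layout may differ.
fn parse_versions(dir_names: &[&str]) -> Vec<u64> {
    let mut versions: Vec<u64> = dir_names
        .iter()
        .filter_map(|name| name.parse::<u64>().ok())
        .collect();
    versions.sort_unstable();
    versions.dedup();
    versions
}

/// Return the versions to apply, in order, to move from `current` to
/// `desired`. Assumes (hypothetically) that downgrades are rejected and
/// that every intermediate version must be available.
fn versions_to_apply(
    available: &[u64],
    current: u64,
    desired: u64,
) -> Result<Vec<u64>, String> {
    if desired < current {
        return Err(format!("cannot downgrade from {current} to {desired}"));
    }
    let mut to_apply = Vec::new();
    // Iterate over current+1 ..= desired without risking overflow.
    for version in (current..desired).map(|v| v + 1) {
        if !available.contains(&version) {
            return Err(format!("missing schema for version {version}"));
        }
        to_apply.push(version);
    }
    Ok(to_apply)
}
```

With directories `2`, `3`, and `4` available, moving from version 2 to version 4 would apply 3 and then 4, while a request for version 5 would fail because no such directory exists.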
Showing 18 changed files with 2,416 additions and 149 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

1 change: 1 addition & 0 deletions oximeter/collector/Cargo.toml
@@ -7,6 +7,7 @@ license = "MPL-2.0"

[dependencies]
anyhow.workspace = true
camino.workspace = true
clap.workspace = true
dropshot.workspace = true
futures.workspace = true
126 changes: 126 additions & 0 deletions oximeter/collector/src/bin/clickhouse-schema-updater.rs
@@ -0,0 +1,126 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

//! CLI tool to apply offline updates to ClickHouse schema.
// Copyright 2023 Oxide Computer Company

use anyhow::anyhow;
use anyhow::Context;
use camino::Utf8PathBuf;
use clap::Parser;
use clap::Subcommand;
use omicron_common::address::CLICKHOUSE_PORT;
use oximeter_db::model::OXIMETER_VERSION;
use oximeter_db::Client;
use slog::Drain;
use slog::Level;
use slog::LevelFilter;
use slog::Logger;
use std::net::Ipv6Addr;
use std::net::SocketAddr;
use std::net::SocketAddrV6;

const DEFAULT_HOST: SocketAddr = SocketAddr::V6(SocketAddrV6::new(
    Ipv6Addr::LOCALHOST,
    CLICKHOUSE_PORT,
    0,
    0,
));

fn parse_log_level(s: &str) -> anyhow::Result<Level> {
    s.parse().map_err(|_| anyhow!("Invalid log level"))
}

/// Tool to apply offline updates to ClickHouse schema.
#[derive(Clone, Debug, Parser)]
struct Args {
    /// IP address and port at which to access ClickHouse.
    #[arg(long, default_value_t = DEFAULT_HOST, env = "CLICKHOUSE_HOST")]
    host: SocketAddr,

    /// Directory from which to read schema files for each version.
    #[arg(
        short = 's',
        long,
        default_value_t = Utf8PathBuf::from("/opt/oxide/oximeter/schema")
    )]
    schema_directory: Utf8PathBuf,

    /// The log level while running the command.
    #[arg(
        short,
        long,
        value_parser = parse_log_level,
        default_value_t = Level::Warning
    )]
    log_level: Level,

    #[command(subcommand)]
    cmd: Cmd,
}

#[derive(Clone, Debug, Subcommand)]
enum Cmd {
    /// List all schema in the directory available for an upgrade
    #[clap(visible_alias = "ls")]
    List,
    /// Apply an upgrade to a specific version
    #[clap(visible_aliases = ["up", "apply"])]
    Upgrade {
        /// The version to which to upgrade.
        #[arg(default_value_t = OXIMETER_VERSION)]
        version: u64,
    },
}

fn build_logger(level: Level) -> Logger {
    let decorator = slog_term::TermDecorator::new().build();
    let drain = slog_term::FullFormat::new(decorator).build().fuse();
    let drain = slog_async::Async::new(drain).build().fuse();
    let drain = LevelFilter::new(drain, level).fuse();
    Logger::root(drain, slog::o!("unit" => "clickhouse_schema_updater"))
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let args = Args::parse();
    let log = build_logger(args.log_level);
    let client = Client::new(args.host, &log);
    let is_replicated = client.is_oximeter_cluster().await?;
    match args.cmd {
        Cmd::List => {
            let latest = client
                .read_latest_version()
                .await
                .context("Failed to read latest version")?;
            let available_versions = Client::read_available_schema_versions(
                &log,
                is_replicated,
                &args.schema_directory,
            )
            .await?;
            println!("Latest version: {latest}");
            println!("Available versions:");
            for ver in available_versions {
                print!(" {ver}");
                if ver == latest {
                    print!(" (reported by database)");
                }
                if ver == OXIMETER_VERSION {
                    print!(" (expected by oximeter)");
                }
                println!();
            }
        }
        Cmd::Upgrade { version } => {
            client
                .ensure_schema(is_replicated, version, args.schema_directory)
                .await
                .context("Failed to upgrade schema")?;
            println!("Upgrade to oximeter database version {version} complete");
        }
    }
    Ok(())
}
40 changes: 36 additions & 4 deletions oximeter/collector/src/lib.rs
@@ -35,7 +35,6 @@ use omicron_common::backoff;
use omicron_common::FileKv;
use oximeter::types::ProducerResults;
use oximeter::types::ProducerResultsItem;
-use oximeter_db::model::OXIMETER_VERSION;
use oximeter_db::Client;
use oximeter_db::DbWrite;
use serde::Deserialize;
@@ -454,9 +453,38 @@ impl OximeterAgent {
                CLICKHOUSE_PORT,
            )
        };

        // Determine the version of the database.
        //
        // There are three cases
        //
        // - The database exists and is at the expected version. Continue in
        //   this case.
        //
        // - The database exists and is at a lower-than-expected version. We
        //   fail back to the caller here, which will retry indefinitely until the
        //   DB has been updated.
        //
        // - The DB doesn't exist at all. This reports a version number of 0. We
        //   need to create the DB here, at the latest version. This is used in
        //   fresh installations and tests.
        let client = Client::new(db_address, &log);
-       let replicated = client.is_oximeter_cluster().await?;
-       client.initialize_db_with_version(replicated, OXIMETER_VERSION).await?;
        match client.check_db_is_at_expected_version().await {
            Ok(_) => {}
            Err(oximeter_db::Error::DatabaseVersionMismatch {
                found, ..
            }) if found == 0 => {
                debug!(log, "oximeter database does not exist, creating");
                let replicated = client.is_oximeter_cluster().await?;
                client
                    .initialize_db_with_version(
                        replicated,
                        oximeter_db::OXIMETER_VERSION,
                    )
                    .await?;
            }
            Err(e) => return Err(Error::from(e)),
        }

        // Spawn the task for aggregating and inserting all metrics
        tokio::spawn(async move {
@@ -712,6 +740,9 @@ impl Oximeter {
    ///
    /// This can be used to override / ignore the logging configuration in
    /// `config`, using `log` instead.
    ///
    /// Note that this blocks until the ClickHouse database is available **and
    /// at the expected version**.
    pub async fn with_logger(
        config: &Config,
        args: &OximeterArguments,
@@ -743,7 +774,8 @@ impl Oximeter {
        let log_client_failure = |error, delay| {
            warn!(
                log,
-               "failed to initialize ClickHouse database, will retry in {:?}", delay;
                "failed to create ClickHouse client";
                "retry_after" => ?delay,
                "error" => ?error,
            );
        };
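
The three startup cases described in the comment in this file's diff can be summarized as a small pure function. This is only a sketch under stated assumptions: `StartupAction` and `startup_action` are hypothetical names that do not appear in the commit, and the real code expresses the same decision by matching on `oximeter_db::Error::DatabaseVersionMismatch`.

```rust
/// What `oximeter` does at startup, given the version the database reports.
/// Hypothetical type, used only to illustrate the three cases.
#[derive(Debug, PartialEq, Eq)]
enum StartupAction {
    /// The database is at the expected version: continue.
    Proceed,
    /// The database reported version 0, meaning it does not exist: create it.
    CreateDatabase,
    /// Any other mismatch: fail back to the caller, which retries until
    /// the schema has been updated out of band.
    RetryLater { found: u64, expected: u64 },
}

fn startup_action(found: u64, expected: u64) -> StartupAction {
    if found == expected {
        StartupAction::Proceed
    } else if found == 0 {
        StartupAction::CreateDatabase
    } else {
        StartupAction::RetryLater { found, expected }
    }
}
```

The point of the design is that `oximeter` itself never upgrades or wipes an existing database. It only creates a missing one, and otherwise waits for the schema updater to bring the version in line.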
3 changes: 3 additions & 0 deletions oximeter/db/Cargo.toml
@@ -10,10 +10,12 @@ anyhow.workspace = true
async-trait.workspace = true
bcs.workspace = true
bytes = { workspace = true, features = [ "serde" ] }
camino.workspace = true
chrono.workspace = true
clap.workspace = true
dropshot.workspace = true
highway.workspace = true
omicron-common.workspace = true
oximeter.workspace = true
regex.workspace = true
reqwest = { workspace = true, features = [ "json" ] }
@@ -35,6 +37,7 @@ itertools.workspace = true
omicron-test-utils.workspace = true
slog-dtrace.workspace = true
strum.workspace = true
tempfile.workspace = true

[[bin]]
name = "oxdb"
40 changes: 40 additions & 0 deletions oximeter/db/schema/README.md
@@ -0,0 +1,40 @@
# ClickHouse schema files

This directory contains the SQL files for different versions of the ClickHouse
timeseries database used by `oximeter`. In general, schema updates are expected
to be applied while the database is online but has no other clients. This is
similar to the _offline upgrade_ process we currently use when updating the
main control plane database in CockroachDB.

## Constraints, or why ClickHouse is weird

While this tool is modeled after the mechanism for applying updates in
CockroachDB, ClickHouse is a significantly different DBMS. There are no
transactions; no unique primary keys; a single DB server can house both
replicated and single-node tables. This means we need to be pretty careful when
updating the schema. Changes must be idempotent, as with the CRDB schema, but at
this point we do not support inserting or modifying data at all.

Similar to the CRDB offline update tool, we assume no non-update modifications
of the database are running concurrently. However, given ClickHouse's lack of
transactions, we actually require that there are no writes of any kind. In
practice, this means `oximeter` **must not** be running when this is called.
Similarly, there must be only a single instance of this program at a time.

To run this program:

- Ensure the ClickHouse server is running, and grab its IP address:
```bash
$ pfexec zlogin oxz_clickhouse_e449eb80-3371-40a6-a316-d6e64b039357 'ipadm show-addr -o addrobj,addr | grep omicron6'
oxControlService20/omicron6 fd00:1122:3344:101::e/64
```
- Log into the `oximeter` zone, `zlogin oxz_oximeter_<UUID>`
- Ensure `oximeter` is _not_ running, e.g., `svcadm disable oximeter`
- Run this tool, pointing it at the desired schema directory, e.g.:

```bash
# /opt/oxide/oximeter/bin/clickhouse-schema-updater \
    --host <ADDR_FROM_ABOVE> \
    --schema-directory /opt/oxide/oximeter/sql \
    up VERSION
```
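
The constraints above (schema-only changes, no inserting or modifying data) could be enforced with a check along these lines before any statement is sent to the server. This is a rough sketch, not the check used by the tool: the function names are hypothetical, the statement splitter is deliberately naive (it would break on a `;` inside a string literal or comment), and the keyword test is a heuristic that may reject some harmless statements.

```rust
/// Split the contents of a SQL file into individual statements.
/// Naive: splits on every `;`, trims whitespace, and drops empty pieces.
fn split_statements(sql: &str) -> Vec<&str> {
    sql.split(';').map(str::trim).filter(|s| !s.is_empty()).collect()
}

/// Heuristically detect statements that insert or modify data:
/// `INSERT ...`, or `ALTER TABLE ...` containing `UPDATE` or `DELETE`.
fn modifies_data(statement: &str) -> bool {
    let upper = statement.to_uppercase();
    let words: Vec<&str> = upper.split_whitespace().collect();
    match words.as_slice() {
        ["INSERT", ..] => true,
        ["ALTER", "TABLE", rest @ ..] => {
            rest.iter().any(|w| *w == "UPDATE" || *w == "DELETE")
        }
        _ => false,
    }
}

/// Return the statements in an upgrade file, or an error naming the
/// first statement that appears to modify data.
fn check_upgrade_file(sql: &str) -> Result<Vec<&str>, String> {
    let statements = split_statements(sql);
    for stmt in &statements {
        if modifies_data(stmt) {
            return Err(format!("statement modifies data: {stmt}"));
        }
    }
    Ok(statements)
}
```

Idempotency still has to come from the SQL itself, for example by writing `CREATE ... IF NOT EXISTS` or `ADD COLUMN IF NOT EXISTS`, so that re-running a partially applied upgrade is harmless.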