From 336475a419ab5fb21c1ccd8784625b83b9e437e7 Mon Sep 17 00:00:00 2001 From: Benjamin Naecker Date: Mon, 30 Oct 2023 02:59:33 +0000 Subject: [PATCH] Add offline schema updates for ClickHouse - Some cleanup around issuing multiple SQL statements from a file - Create directory structure for storing schema updates modeled after CRDB up.sql files, but using integer versions, and move all existing SQL into version 2 - Add version 3, which fixes https://github.com/oxidecomputer/omicron/issues/4369, but does not apply it yet - Add methods in the client for listing, reading, and applying one or more updates to the oximeter database from the upgrade files - Add tests for upgrade application - Add `clickhouse-schema-updater` binary for running them on demand - Modify `oximeter-collector` to _not_ wipe / reinit the DB on startup if the version has change, but instead wait for the version to be equal to what it is compiled against. This relies on updates from the developer being applied before `oximeter` will continue. --- Cargo.lock | 4 + oximeter/collector/Cargo.toml | 1 + .../src/bin/clickhouse-schema-updater.rs | 126 +++ oximeter/collector/src/lib.rs | 40 +- oximeter/db/Cargo.toml | 3 + oximeter/db/schema/README.md | 40 + .../replicated/2/up.sql} | 167 ++-- oximeter/db/schema/replicated/3/up.sql | 22 + oximeter/db/schema/replicated/db-init.sql | 709 +++++++++++++++++ .../replicated/db-wipe.sql} | 0 .../single-node/2/up.sql} | 115 ++- oximeter/db/schema/single-node/3/up.sql | 22 + oximeter/db/schema/single-node/db-init.sql | 540 +++++++++++++ .../single-node/db-wipe.sql} | 0 oximeter/db/src/client.rs | 725 +++++++++++++++++- oximeter/db/src/lib.rs | 35 +- oximeter/db/src/model.rs | 9 +- package-manifest.toml | 7 +- 18 files changed, 2416 insertions(+), 149 deletions(-) create mode 100644 oximeter/collector/src/bin/clickhouse-schema-updater.rs create mode 100644 oximeter/db/schema/README.md rename oximeter/db/{src/db-replicated-init.sql => schema/replicated/2/up.sql} (93%) create mode 100644 oximeter/db/schema/replicated/3/up.sql create mode 100644 oximeter/db/schema/replicated/db-init.sql rename oximeter/db/{src/db-wipe-replicated.sql => schema/replicated/db-wipe.sql} (100%) rename oximeter/db/{src/db-single-node-init.sql => schema/single-node/2/up.sql} (88%) create mode 100644 oximeter/db/schema/single-node/3/up.sql create mode 100644 oximeter/db/schema/single-node/db-init.sql rename oximeter/db/{src/db-wipe-single-node.sql => schema/single-node/db-wipe.sql} (100%) diff --git a/Cargo.lock b/Cargo.lock index 2df98809a1c..131e60ea5e4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5759,6 +5759,7 @@ name = "oximeter-collector" version = "0.1.0" dependencies = [ "anyhow", + "camino", "clap 4.4.3", "dropshot", "expectorate", @@ -5798,12 +5799,14 @@ dependencies = [ "async-trait", "bcs", "bytes", + "camino", "chrono", "clap 4.4.3", "dropshot", "expectorate", "highway", "itertools 0.11.0", + "omicron-common 0.1.0", "omicron-test-utils", "omicron-workspace-hack", "oximeter 0.1.0", @@ -5817,6 +5820,7 @@ dependencies = [ "slog-dtrace", "slog-term", "strum", + "tempfile", "thiserror", "tokio", "usdt", diff --git a/oximeter/collector/Cargo.toml b/oximeter/collector/Cargo.toml index 470d9db312b..ad0ae8e330d 100644 --- a/oximeter/collector/Cargo.toml +++ b/oximeter/collector/Cargo.toml @@ -7,6 +7,7 @@ license = "MPL-2.0" [dependencies] anyhow.workspace = true +camino.workspace = true clap.workspace = true dropshot.workspace = true futures.workspace = true diff --git a/oximeter/collector/src/bin/clickhouse-schema-updater.rs b/oximeter/collector/src/bin/clickhouse-schema-updater.rs new file mode 100644 index 00000000000..20780c37e07 --- /dev/null +++ b/oximeter/collector/src/bin/clickhouse-schema-updater.rs @@ -0,0 +1,126 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! CLI tool to apply offline updates to ClickHouse schema. + +// Copyright 2023 Oxide Computer Company + +use anyhow::anyhow; +use anyhow::Context; +use camino::Utf8PathBuf; +use clap::Parser; +use clap::Subcommand; +use omicron_common::address::CLICKHOUSE_PORT; +use oximeter_db::model::OXIMETER_VERSION; +use oximeter_db::Client; +use slog::Drain; +use slog::Level; +use slog::LevelFilter; +use slog::Logger; +use std::net::Ipv6Addr; +use std::net::SocketAddr; +use std::net::SocketAddrV6; + +const DEFAULT_HOST: SocketAddr = SocketAddr::V6(SocketAddrV6::new( + Ipv6Addr::LOCALHOST, + CLICKHOUSE_PORT, + 0, + 0, +)); + +fn parse_log_level(s: &str) -> anyhow::Result { + s.parse().map_err(|_| anyhow!("Invalid log level")) +} + +/// Tool to apply offline updates to ClickHouse schema. +#[derive(Clone, Debug, Parser)] +struct Args { + /// IP address and port at which to access ClickHouse. + #[arg(long, default_value_t = DEFAULT_HOST, env = "CLICKHOUSE_HOST")] + host: SocketAddr, + + /// Directory from which to read schema files for each version. + #[arg( + short = 's', + long, + default_value_t = Utf8PathBuf::from("/opt/oxide/oximeter/schema") + )] + schema_directory: Utf8PathBuf, + + /// The log level while running the command. + #[arg( + short, + long, + value_parser = parse_log_level, + default_value_t = Level::Warning + )] + log_level: Level, + + #[command(subcommand)] + cmd: Cmd, +} + +#[derive(Clone, Debug, Subcommand)] +enum Cmd { + /// List all schema in the directory available for an upgrade + #[clap(visible_alias = "ls")] + List, + /// Apply an upgrade to a specific version + #[clap(visible_aliases = ["up", "apply"])] + Upgrade { + /// The version to which to upgrade. + #[arg(default_value_t = OXIMETER_VERSION)] + version: u64, + }, +} + +fn build_logger(level: Level) -> Logger { + let decorator = slog_term::TermDecorator::new().build(); + let drain = slog_term::FullFormat::new(decorator).build().fuse(); + let drain = slog_async::Async::new(drain).build().fuse(); + let drain = LevelFilter::new(drain, level).fuse(); + Logger::root(drain, slog::o!("unit" => "clickhouse_schema_updater")) +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let args = Args::parse(); + let log = build_logger(args.log_level); + let client = Client::new(args.host, &log); + let is_replicated = client.is_oximeter_cluster().await?; + match args.cmd { + Cmd::List => { + let latest = client + .read_latest_version() + .await + .context("Failed to read latest version")?; + let available_versions = Client::read_available_schema_versions( + &log, + is_replicated, + &args.schema_directory, + ) + .await?; + println!("Latest version: {latest}"); + println!("Available versions:"); + for ver in available_versions { + print!(" {ver}"); + if ver == latest { + print!(" (reported by database)"); + } + if ver == OXIMETER_VERSION { + print!(" (expected by oximeter)"); + } + println!(); + } + } + Cmd::Upgrade { version } => { + client + .ensure_schema(is_replicated, version, args.schema_directory) + .await + .context("Failed to upgrade schema")?; + println!("Upgrade to oximeter database version {version} complete"); + } + } + Ok(()) +} diff --git a/oximeter/collector/src/lib.rs b/oximeter/collector/src/lib.rs index b7a14cec453..4ec6188026b 100644 --- a/oximeter/collector/src/lib.rs +++ b/oximeter/collector/src/lib.rs @@ -35,7 +35,6 @@ use omicron_common::backoff; use omicron_common::FileKv; use oximeter::types::ProducerResults; use oximeter::types::ProducerResultsItem; -use oximeter_db::model::OXIMETER_VERSION; use oximeter_db::Client; use oximeter_db::DbWrite; use serde::Deserialize; @@ -454,9 +453,38 @@ impl OximeterAgent { CLICKHOUSE_PORT, ) }; + + // Determine the version of the database. + // + // There are three cases + // + // - The database exists and is at the expected version. Continue in + // this case. + // + // - The database exists and is at a lower-than-expected version. We + // fail back to the caller here, which will retry indefinitely until the + // DB has been updated. + // + // - The DB doesn't exist at all. This reports a version number of 0. We + // need to create the DB here, at the latest version. This is used in + // fresh installations and tests. let client = Client::new(db_address, &log); - let replicated = client.is_oximeter_cluster().await?; - client.initialize_db_with_version(replicated, OXIMETER_VERSION).await?; + match client.check_db_is_at_expected_version().await { + Ok(_) => {} + Err(oximeter_db::Error::DatabaseVersionMismatch { + found, .. + }) if found == 0 => { + debug!(log, "oximeter database does not exist, creating"); + let replicated = client.is_oximeter_cluster().await?; + client + .initialize_db_with_version( + replicated, + oximeter_db::OXIMETER_VERSION, + ) + .await?; + } + Err(e) => return Err(Error::from(e)), + } // Spawn the task for aggregating and inserting all metrics tokio::spawn(async move { @@ -712,6 +740,9 @@ impl Oximeter { /// /// This can be used to override / ignore the logging configuration in /// `config`, using `log` instead. + /// + /// Note that this blocks until the ClickHouse database is available **and + /// at the expected version**. pub async fn with_logger( config: &Config, args: &OximeterArguments, @@ -743,7 +774,8 @@ impl Oximeter { let log_client_failure = |error, delay| { warn!( log, - "failed to initialize ClickHouse database, will retry in {:?}", delay; + "failed to create ClickHouse client"; + "retry_after" => ?delay, "error" => ?error, ); }; diff --git a/oximeter/db/Cargo.toml b/oximeter/db/Cargo.toml index d37c57ccced..4d53869d0dc 100644 --- a/oximeter/db/Cargo.toml +++ b/oximeter/db/Cargo.toml @@ -10,10 +10,12 @@ anyhow.workspace = true async-trait.workspace = true bcs.workspace = true bytes = { workspace = true, features = [ "serde" ] } +camino.workspace = true chrono.workspace = true clap.workspace = true dropshot.workspace = true highway.workspace = true +omicron-common.workspace = true oximeter.workspace = true regex.workspace = true reqwest = { workspace = true, features = [ "json" ] } @@ -35,6 +37,7 @@ itertools.workspace = true omicron-test-utils.workspace = true slog-dtrace.workspace = true strum.workspace = true +tempfile.workspace = true [[bin]] name = "oxdb" diff --git a/oximeter/db/schema/README.md b/oximeter/db/schema/README.md new file mode 100644 index 00000000000..e18804c8575 --- /dev/null +++ b/oximeter/db/schema/README.md @@ -0,0 +1,40 @@ +# ClickHouse schema files + +This directory contains the SQL files for different versions of the ClickHouse +timeseries database used by `oximeter`. In general, schema are expected to be +applied while the database is online, but no other clients exist. This is +similar to the current situation for _offline upgrade_ we use when updating the +main control plane database in CockroachDB. + +## Constraints, or why ClickHouse is weird + +While this tool is modeled after the mechanism for applying updates in +CockroachDB, ClickHouse is a significantly different DBMS. There are no +transactions; no unique primary keys; a single DB server can house both +replicated and single-node tables. This means we need to be pretty careful when +updating the schema. Changes must be idempotent, as with the CRDB schema, but at +this point we do not support inserting or modifying data at all. + +Similar to the CRDB offline update tool, we assume no non-update modifications +of the database are running concurrently. However, given ClickHouse's lack of +transactions, we actually require that there are no writes of any kind. In +practice, this means `oximeter` **must not** be running when this is called. +Similarly, there must be only a single instance of this program at a time. + +To run this program: + +- Ensure the ClickHouse server is running, and grab its IP address; + ```bash + $ pfexec zlogin oxz_clickhouse_e449eb80-3371-40a6-a316-d6e64b039357 'ipadm show-addr -o addrobj,addr | grep omicron6' + oxControlService20/omicron6 fd00:1122:3344:101::e/64 + ``` +- Log into the `oximeter` zone, `zlogin oxz_oximeter_` +- Ensure `oximeter` is _not_ running, e.g., `svcadm disable oximeter` +- Run this tool, pointing it at the desired schema directory, e.g.: + +```bash +# /opt/oxide/oximeter/bin/clickhouse-schema-updater \ + --host \ + --schema-dir /opt/oxide/oximeter/sql + up VERSION +``` diff --git a/oximeter/db/src/db-replicated-init.sql b/oximeter/db/schema/replicated/2/up.sql similarity index 93% rename from oximeter/db/src/db-replicated-init.sql rename to oximeter/db/schema/replicated/2/up.sql index ec11854e446..4348271d789 100644 --- a/oximeter/db/src/db-replicated-init.sql +++ b/oximeter/db/schema/replicated/2/up.sql @@ -1,5 +1,6 @@ CREATE DATABASE IF NOT EXISTS oximeter ON CLUSTER oximeter_cluster; --- + +/* The version table contains metadata about the `oximeter` database */ CREATE TABLE IF NOT EXISTS oximeter.version ON CLUSTER oximeter_cluster ( value UInt64, @@ -7,7 +8,17 @@ CREATE TABLE IF NOT EXISTS oximeter.version ON CLUSTER oximeter_cluster ) ENGINE = ReplicatedMergeTree() ORDER BY (value, timestamp); --- + +/* The measurement tables contain all individual samples from each timeseries. + * + * Each table stores a single datum type, and otherwise contains nearly the same + * structure. The primary sorting key is on the timeseries name, key, and then + * timestamp, so that all timeseries from the same schema are grouped, followed + * by all samples from the same timeseries. + * + * This reflects that one usually looks up the _key_ in one or more field table, + * and then uses that to index quickly into the measurements tables. + */ CREATE TABLE IF NOT EXISTS oximeter.measurements_bool_local ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -18,7 +29,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_bool_local ON CLUSTER oximeter_ ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_bool_local', '{replica}') ORDER BY (timeseries_name, timeseries_key, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_bool ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -27,7 +38,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_bool ON CLUSTER oximeter_cluste datum UInt8 ) ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_bool_local', xxHash64(splitByChar(':', timeseries_name)[1])); --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_i8_local ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -38,7 +49,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_i8_local ON CLUSTER oximeter_cl ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_i8_local', '{replica}') ORDER BY (timeseries_name, timeseries_key, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_i8 ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -47,7 +58,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_i8 ON CLUSTER oximeter_cluster datum Int8 ) ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_i8_local', xxHash64(splitByChar(':', timeseries_name)[1])); --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_u8_local ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -58,7 +69,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_u8_local ON CLUSTER oximeter_cl ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_u8_local', '{replica}') ORDER BY (timeseries_name, timeseries_key, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_u8 ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -67,7 +78,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_u8 ON CLUSTER oximeter_cluster datum UInt8 ) ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_u8_local', xxHash64(splitByChar(':', timeseries_name)[1])); --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_i16_local ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -78,7 +89,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_i16_local ON CLUSTER oximeter_c ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_i16_local', '{replica}') ORDER BY (timeseries_name, timeseries_key, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_i16 ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -87,7 +98,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_i16 ON CLUSTER oximeter_cluster datum Int16 ) ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_i16_local', xxHash64(splitByChar(':', timeseries_name)[1])); --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_u16_local ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -98,7 +109,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_u16_local ON CLUSTER oximeter_c ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_u16_local', '{replica}') ORDER BY (timeseries_name, timeseries_key, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_u16 ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -107,7 +118,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_u16 ON CLUSTER oximeter_cluster datum UInt16 ) ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_u16_local', xxHash64(splitByChar(':', timeseries_name)[1])); --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_i32_local ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -118,7 +129,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_i32_local ON CLUSTER oximeter_c ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_i32_local', '{replica}') ORDER BY (timeseries_name, timeseries_key, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_i32 ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -127,7 +138,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_i32 ON CLUSTER oximeter_cluster datum Int32 ) ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_i32_local', xxHash64(splitByChar(':', timeseries_name)[1])); --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_u32_local ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -138,7 +149,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_u32_local ON CLUSTER oximeter_c ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_u32_local', '{replica}') ORDER BY (timeseries_name, timeseries_key, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_u32 ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -147,7 +158,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_u32 ON CLUSTER oximeter_cluster datum UInt32 ) ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_u32_local', xxHash64(splitByChar(':', timeseries_name)[1])); --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_i64_local ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -158,7 +169,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_i64_local ON CLUSTER oximeter_c ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_i64_local', '{replica}') ORDER BY (timeseries_name, timeseries_key, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_i64 ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -167,7 +178,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_i64 ON CLUSTER oximeter_cluster datum Int64 ) ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_i64_local', xxHash64(splitByChar(':', timeseries_name)[1])); --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_u64_local ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -178,7 +189,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_u64_local ON CLUSTER oximeter_c ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_u64_local', '{replica}') ORDER BY (timeseries_name, timeseries_key, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_u64 ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -187,7 +198,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_u64 ON CLUSTER oximeter_cluster datum UInt64 ) ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_u64_local', xxHash64(splitByChar(':', timeseries_name)[1])); --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_f32_local ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -198,7 +209,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_f32_local ON CLUSTER oximeter_c ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_f32_local', '{replica}') ORDER BY (timeseries_name, timeseries_key, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_f32 ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -207,7 +218,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_f32 ON CLUSTER oximeter_cluster datum Float32 ) ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_f32_local', xxHash64(splitByChar(':', timeseries_name)[1])); --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_f64_local ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -218,7 +229,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_f64_local ON CLUSTER oximeter_c ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_f64_local', '{replica}') ORDER BY (timeseries_name, timeseries_key, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_f64 ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -227,7 +238,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_f64 ON CLUSTER oximeter_cluster datum Float64 ) ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_f64_local', xxHash64(splitByChar(':', timeseries_name)[1])); --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_string_local ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -238,7 +249,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_string_local ON CLUSTER oximete ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_string_local', '{replica}') ORDER BY (timeseries_name, timeseries_key, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_string ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -247,7 +258,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_string ON CLUSTER oximeter_clus datum String ) ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_string_local', xxHash64(splitByChar(':', timeseries_name)[1])); --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_bytes_local ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -258,7 +269,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_bytes_local ON CLUSTER oximeter ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_bytes_local', '{replica}') ORDER BY (timeseries_name, timeseries_key, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_bytes ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -267,7 +278,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_bytes ON CLUSTER oximeter_clust datum Array(UInt8) ) ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_bytes_local', xxHash64(splitByChar(':', timeseries_name)[1])); --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativei64_local ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -279,7 +290,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativei64_local ON CLUSTER ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_cumulativei64_local', '{replica}') ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativei64 ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -289,7 +300,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativei64 ON CLUSTER oximet datum Int64 ) ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_cumulativei64_local', xxHash64(splitByChar(':', timeseries_name)[1])); --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativeu64_local ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -301,7 +312,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativeu64_local ON CLUSTER ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_cumulativeu64_local', '{replica}') ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativeu64 ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -311,7 +322,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativeu64 ON CLUSTER oximet datum UInt64 ) ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_cumulativeu64_local', xxHash64(splitByChar(':', timeseries_name)[1])); --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativef32_local ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -323,7 +334,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativef32_local ON CLUSTER ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_cumulativef32_local', '{replica}') ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativef32 ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -333,7 +344,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativef32 ON CLUSTER oximet datum Float32 ) ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_cumulativef32_local', xxHash64(splitByChar(':', timeseries_name)[1])); --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativef64_local ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -345,7 +356,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativef64_local ON CLUSTER ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_cumulativef64_local', '{replica}') ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativef64 ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -355,7 +366,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativef64 ON CLUSTER oximet datum Float64 ) ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_cumulativef64_local', xxHash64(splitByChar(':', timeseries_name)[1])); --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_histogrami8_local ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -368,7 +379,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_histogrami8_local ON CLUSTER ox ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_histogrami8_local', '{replica}') ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_histogrami8 ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -379,7 +390,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_histogrami8 ON CLUSTER oximeter counts Array(UInt64) ) ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_histogrami8_local', xxHash64(splitByChar(':', timeseries_name)[1])); --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramu8_local ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -392,7 +403,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramu8_local ON CLUSTER ox ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_histogramu8_local', '{replica}') ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramu8 ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -403,7 +414,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramu8 ON CLUSTER oximeter counts Array(UInt64) ) ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_histogramu8_local', xxHash64(splitByChar(':', timeseries_name)[1])); --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_histogrami16_local ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -416,7 +427,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_histogrami16_local ON CLUSTER o ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_histogrami16_local', '{replica}') ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_histogrami16 ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -427,7 +438,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_histogrami16 ON CLUSTER oximete counts Array(UInt64) ) ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_histogrami16_local', xxHash64(splitByChar(':', timeseries_name)[1])); --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramu16_local ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -440,7 +451,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramu16_local ON CLUSTER o ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_histogramu16_local', '{replica}') ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramu16 ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -451,7 +462,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramu16 ON CLUSTER oximete counts Array(UInt64) ) ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_histogramu16_local', xxHash64(splitByChar(':', timeseries_name)[1])); --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_histogrami32_local ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -464,7 +475,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_histogrami32_local ON CLUSTER o ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_histogrami32_local', '{replica}') ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_histogrami32 ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -475,7 +486,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_histogrami32 ON CLUSTER oximete counts Array(UInt64) ) ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_histogrami32_local', xxHash64(splitByChar(':', timeseries_name)[1])); --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramu32_local ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -488,7 +499,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramu32_local ON CLUSTER o ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_histogramu32_local', '{replica}') ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramu32 ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -499,7 +510,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramu32 ON CLUSTER oximete counts Array(UInt64) ) ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_histogramu32_local', xxHash64(splitByChar(':', timeseries_name)[1])); --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_histogrami64_local ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -512,7 +523,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_histogrami64_local ON CLUSTER o ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_histogrami64_local', '{replica}') ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_histogrami64 ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -523,7 +534,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_histogrami64 ON CLUSTER oximete counts Array(UInt64) ) ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_histogrami64_local', xxHash64(splitByChar(':', timeseries_name)[1])); --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramu64_local ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -536,7 +547,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramu64_local ON CLUSTER o ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_histogramu64_local', '{replica}') ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramu64 ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -547,7 +558,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramu64 ON CLUSTER oximete counts Array(UInt64) ) ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_histogramu64_local', xxHash64(splitByChar(':', timeseries_name)[1])); --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramf32_local ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -560,7 +571,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramf32_local ON CLUSTER o ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_histogramf32_local', '{replica}') ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramf32 ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -571,7 +582,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramf32 ON CLUSTER oximete counts Array(UInt64) ) ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_histogramf32_local', xxHash64(splitByChar(':', timeseries_name)[1])); --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramf64_local ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -584,7 +595,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramf64_local ON CLUSTER o ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_histogramf64_local', '{replica}') ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramf64 ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -595,7 +606,24 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramf64 ON CLUSTER oximete counts Array(UInt64) ) ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_histogramf64_local', xxHash64(splitByChar(':', timeseries_name)[1])); --- + +/* The field tables store named dimensions of each timeseries. + * + * As with the measurement tables, there is one field table for each field data + * type. Fields are deduplicated by using the "replacing merge tree", though + * this behavior **must not** be relied upon for query correctness. + * + * The index for the fields differs from the measurements, however. Rows are + * sorted by timeseries name, then field name, field value, and finally + * timeseries key. This reflects the most common pattern for looking them up: + * by field name and possibly value, within a timeseries. The resulting keys are + * usually then used to look up measurements. + * + * NOTE: We may want to consider a secondary index on these tables, sorting by + * timeseries name and then key, since it would improve lookups where one + * already has the key. Realistically though, these tables are quite small and + * so performance benefits will be low in absolute terms. + */ CREATE TABLE IF NOT EXISTS oximeter.fields_bool ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -605,7 +633,7 @@ CREATE TABLE IF NOT EXISTS oximeter.fields_bool ON CLUSTER oximeter_cluster ) ENGINE = ReplicatedReplacingMergeTree() ORDER BY (timeseries_name, field_name, field_value, timeseries_key); --- + CREATE TABLE IF NOT EXISTS oximeter.fields_i8 ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -615,7 +643,7 @@ CREATE TABLE IF NOT EXISTS oximeter.fields_i8 ON CLUSTER oximeter_cluster ) ENGINE = ReplicatedReplacingMergeTree() ORDER BY (timeseries_name, field_name, field_value, timeseries_key); --- + CREATE TABLE IF NOT EXISTS oximeter.fields_u8 ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -625,7 +653,7 @@ CREATE TABLE IF NOT EXISTS oximeter.fields_u8 ON CLUSTER oximeter_cluster ) ENGINE = ReplicatedReplacingMergeTree() ORDER BY (timeseries_name, field_name, field_value, timeseries_key); --- + CREATE TABLE IF NOT EXISTS oximeter.fields_i16 ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -635,7 +663,7 @@ CREATE TABLE IF NOT EXISTS oximeter.fields_i16 ON CLUSTER oximeter_cluster ) ENGINE = ReplicatedReplacingMergeTree() ORDER BY (timeseries_name, field_name, field_value, timeseries_key); --- + CREATE TABLE IF NOT EXISTS oximeter.fields_u16 ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -645,7 +673,7 @@ CREATE TABLE IF NOT EXISTS oximeter.fields_u16 ON CLUSTER oximeter_cluster ) ENGINE = ReplicatedReplacingMergeTree() ORDER BY (timeseries_name, field_name, field_value, timeseries_key); --- + CREATE TABLE IF NOT EXISTS oximeter.fields_i32 ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -655,7 +683,7 @@ CREATE TABLE IF NOT EXISTS oximeter.fields_i32 ON CLUSTER oximeter_cluster ) ENGINE = ReplicatedReplacingMergeTree() ORDER BY (timeseries_name, field_name, field_value, timeseries_key); --- + CREATE TABLE IF NOT EXISTS oximeter.fields_u32 ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -665,7 +693,7 @@ CREATE TABLE IF NOT EXISTS oximeter.fields_u32 ON CLUSTER oximeter_cluster ) ENGINE = ReplicatedReplacingMergeTree() ORDER BY (timeseries_name, field_name, field_value, timeseries_key); --- + CREATE TABLE IF NOT EXISTS oximeter.fields_i64 ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -675,7 +703,7 @@ CREATE TABLE IF NOT EXISTS oximeter.fields_i64 ON CLUSTER oximeter_cluster ) ENGINE = ReplicatedReplacingMergeTree() ORDER BY (timeseries_name, field_name, field_value, timeseries_key); --- + CREATE TABLE IF NOT EXISTS oximeter.fields_u64 ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -685,7 +713,7 @@ CREATE TABLE IF NOT EXISTS oximeter.fields_u64 ON CLUSTER oximeter_cluster ) ENGINE = ReplicatedReplacingMergeTree() ORDER BY (timeseries_name, field_name, field_value, timeseries_key); --- + CREATE TABLE IF NOT EXISTS oximeter.fields_ipaddr ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -695,7 +723,7 @@ CREATE TABLE IF NOT EXISTS oximeter.fields_ipaddr ON CLUSTER oximeter_cluster ) ENGINE = ReplicatedReplacingMergeTree() ORDER BY (timeseries_name, field_name, field_value, timeseries_key); --- + CREATE TABLE IF NOT EXISTS oximeter.fields_string ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -705,7 +733,7 @@ CREATE TABLE IF NOT EXISTS oximeter.fields_string ON CLUSTER oximeter_cluster ) ENGINE = ReplicatedReplacingMergeTree() ORDER BY (timeseries_name, field_name, field_value, timeseries_key); --- + CREATE TABLE IF NOT EXISTS oximeter.fields_uuid ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -715,7 +743,10 @@ CREATE TABLE IF NOT EXISTS oximeter.fields_uuid ON CLUSTER oximeter_cluster ) ENGINE = ReplicatedReplacingMergeTree() ORDER BY (timeseries_name, field_name, field_value, timeseries_key); --- + +/* The timeseries schema table stores the extracted schema for the samples + * oximeter collects. + */ CREATE TABLE IF NOT EXISTS oximeter.timeseries_schema ON CLUSTER oximeter_cluster ( timeseries_name String, diff --git a/oximeter/db/schema/replicated/3/up.sql b/oximeter/db/schema/replicated/3/up.sql new file mode 100644 index 00000000000..073d643564f --- /dev/null +++ b/oximeter/db/schema/replicated/3/up.sql @@ -0,0 +1,22 @@ +/* This adds missing field types to the timeseries schema table field.type + * column, by augmentin the enum to capture new values. Note that the existing + * variants can't be moved or changed, so the new ones are added at the end. The + * client never sees this discriminant, only the string, so it should not + * matter. + */ +ALTER TABLE oximeter.timeseries_schema + MODIFY COLUMN IF EXISTS fields.type + Array(Enum( + 'Bool' = 1, + 'I64' = 2, + 'IpAddr' = 3, + 'String' = 4, + 'Uuid' = 6, + 'I8' = 7, + 'U8' = 8, + 'I16' = 9, + 'U16' = 10, + 'I32' = 11, + 'U32' = 12, + 'U64' = 13 + )); diff --git a/oximeter/db/schema/replicated/db-init.sql b/oximeter/db/schema/replicated/db-init.sql new file mode 100644 index 00000000000..d71e3b5c6e5 --- /dev/null +++ b/oximeter/db/schema/replicated/db-init.sql @@ -0,0 +1,709 @@ +CREATE DATABASE IF NOT EXISTS oximeter ON CLUSTER oximeter_cluster; + +/* The version table contains metadata about the `oximeter` database */ +CREATE TABLE IF NOT EXISTS oximeter.version ON CLUSTER oximeter_cluster +( + value UInt64, + timestamp DateTime64(9, 'UTC') +) +ENGINE = ReplicatedMergeTree() +ORDER BY (value, timestamp); + +/* The measurement tables contain all individual samples from each timeseries. + * + * Each table stores a single datum type, and otherwise contains nearly the same + * structure. The primary sorting key is on the timeseries name, key, and then + * timestamp, so that all timeseries from the same schema are grouped, followed + * by all samples from the same timeseries. + * + * This reflects that one usually looks up the _key_ in one or more field table, + * and then uses that to index quickly into the measurements tables. + */ +CREATE TABLE IF NOT EXISTS oximeter.measurements_bool_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum UInt8 +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_bool_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_bool ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum UInt8 +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_bool_local', xxHash64(splitByChar(':', timeseries_name)[1])); + +CREATE TABLE IF NOT EXISTS oximeter.measurements_i8_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum Int8 +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_i8_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_i8 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum Int8 +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_i8_local', xxHash64(splitByChar(':', timeseries_name)[1])); + +CREATE TABLE IF NOT EXISTS oximeter.measurements_u8_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum UInt8 +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_u8_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_u8 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum UInt8 +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_u8_local', xxHash64(splitByChar(':', timeseries_name)[1])); + +CREATE TABLE IF NOT EXISTS oximeter.measurements_i16_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum Int16 +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_i16_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_i16 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum Int16 +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_i16_local', xxHash64(splitByChar(':', timeseries_name)[1])); + +CREATE TABLE IF NOT EXISTS oximeter.measurements_u16_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum UInt16 +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_u16_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_u16 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum UInt16 +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_u16_local', xxHash64(splitByChar(':', timeseries_name)[1])); + +CREATE TABLE IF NOT EXISTS oximeter.measurements_i32_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum Int32 +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_i32_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_i32 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum Int32 +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_i32_local', xxHash64(splitByChar(':', timeseries_name)[1])); + +CREATE TABLE IF NOT EXISTS oximeter.measurements_u32_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum UInt32 +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_u32_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_u32 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum UInt32 +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_u32_local', xxHash64(splitByChar(':', timeseries_name)[1])); + +CREATE TABLE IF NOT EXISTS oximeter.measurements_i64_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum Int64 +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_i64_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_i64 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum Int64 +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_i64_local', xxHash64(splitByChar(':', timeseries_name)[1])); + +CREATE TABLE IF NOT EXISTS oximeter.measurements_u64_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum UInt64 +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_u64_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_u64 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum UInt64 +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_u64_local', xxHash64(splitByChar(':', timeseries_name)[1])); + +CREATE TABLE IF NOT EXISTS oximeter.measurements_f64_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum Float64 +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_f64_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_f64 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum Float64 +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_f64_local', xxHash64(splitByChar(':', timeseries_name)[1])); + +CREATE TABLE IF NOT EXISTS oximeter.measurements_string_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum String +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_string_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_string ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum String +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_string_local', xxHash64(splitByChar(':', timeseries_name)[1])); + +CREATE TABLE IF NOT EXISTS oximeter.measurements_bytes_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum Array(UInt8) +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_bytes_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_bytes ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum Array(UInt8) +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_bytes_local', xxHash64(splitByChar(':', timeseries_name)[1])); + +CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativei64_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + datum Int64 +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_cumulativei64_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativei64 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + datum Int64 +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_cumulativei64_local', xxHash64(splitByChar(':', timeseries_name)[1])); + +CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativeu64_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + datum UInt64 +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_cumulativeu64_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativeu64 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + datum UInt64 +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_cumulativeu64_local', xxHash64(splitByChar(':', timeseries_name)[1])); + +CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativef32_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + datum Float32 +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_cumulativef32_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativef32 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + datum Float32 +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_cumulativef32_local', xxHash64(splitByChar(':', timeseries_name)[1])); + +CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativef64_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + datum Float64 +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_cumulativef64_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativef64 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + datum Float64 +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_cumulativef64_local', xxHash64(splitByChar(':', timeseries_name)[1])); + +CREATE TABLE IF NOT EXISTS oximeter.measurements_histogrami8_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + bins Array(Int8), + counts Array(UInt64) +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_histogrami8_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_histogrami8 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + bins Array(Int8), + counts Array(UInt64) +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_histogrami8_local', xxHash64(splitByChar(':', timeseries_name)[1])); + +CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramu8_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + bins Array(UInt8), + counts Array(UInt64) +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_histogramu8_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramu8 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + bins Array(UInt8), + counts Array(UInt64) +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_histogramu8_local', xxHash64(splitByChar(':', timeseries_name)[1])); + +CREATE TABLE IF NOT EXISTS oximeter.measurements_histogrami16_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + bins Array(Int16), + counts Array(UInt64) +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_histogrami16_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_histogrami16 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + bins Array(Int16), + counts Array(UInt64) +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_histogrami16_local', xxHash64(splitByChar(':', timeseries_name)[1])); + +CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramu16_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + bins Array(UInt16), + counts Array(UInt64) +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_histogramu16_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramu16 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + bins Array(UInt16), + counts Array(UInt64) +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_histogramu16_local', xxHash64(splitByChar(':', timeseries_name)[1])); + +CREATE TABLE IF NOT EXISTS oximeter.measurements_histogrami32_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + bins Array(Int32), + counts Array(UInt64) +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_histogrami32_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_histogrami32 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + bins Array(Int32), + counts Array(UInt64) +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_histogrami32_local', xxHash64(splitByChar(':', timeseries_name)[1])); + +CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramu32_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + bins Array(UInt32), + counts Array(UInt64) +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_histogramu32_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramu32 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + bins Array(UInt32), + counts Array(UInt64) +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_histogramu32_local', xxHash64(splitByChar(':', timeseries_name)[1])); + +CREATE TABLE IF NOT EXISTS oximeter.measurements_histogrami64_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + bins Array(Int64), + counts Array(UInt64) +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_histogrami64_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_histogrami64 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + bins Array(Int64), + counts Array(UInt64) +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_histogrami64_local', xxHash64(splitByChar(':', timeseries_name)[1])); + +CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramu64_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + bins Array(UInt64), + counts Array(UInt64) +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_histogramu64_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramu64 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + bins Array(UInt64), + counts Array(UInt64) +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_histogramu64_local', xxHash64(splitByChar(':', timeseries_name)[1])); + +CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramf32_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + bins Array(Float32), + counts Array(UInt64) +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_histogramf32_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramf32 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + bins Array(Float32), + counts Array(UInt64) +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_histogramf32_local', xxHash64(splitByChar(':', timeseries_name)[1])); + +CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramf64_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + bins Array(Float64), + counts Array(UInt64) +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_histogramf64_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramf64 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + bins Array(Float64), + counts Array(UInt64) +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_histogramf64_local', xxHash64(splitByChar(':', timeseries_name)[1])); + +/* The field tables store named dimensions of each timeseries. + * + * As with the measurement tables, there is one field table for each field data + * type. Fields are deduplicated by using the "replacing merge tree", though + * this behavior **must not** be relied upon for query correctness. + * + * The index for the fields differs from the measurements, however. Rows are + * sorted by timeseries name, then field name, field value, and finally + * timeseries key. This reflects the most common pattern for looking them up: + * by field name and possibly value, within a timeseries. The resulting keys are + * usually then used to look up measurements. + * + * NOTE: We may want to consider a secondary index on these tables, sorting by + * timeseries name and then key, since it would improve lookups where one + * already has the key. Realistically though, these tables are quite small and + * so performance benefits will be low in absolute terms. + */ +CREATE TABLE IF NOT EXISTS oximeter.fields_bool ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + field_name String, + field_value UInt8 +) +ENGINE = ReplicatedReplacingMergeTree() +ORDER BY (timeseries_name, field_name, field_value, timeseries_key); + +CREATE TABLE IF NOT EXISTS oximeter.fields_i64 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + field_name String, + field_value Int64 +) +ENGINE = ReplicatedReplacingMergeTree() +ORDER BY (timeseries_name, field_name, field_value, timeseries_key); + +CREATE TABLE IF NOT EXISTS oximeter.fields_ipaddr ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + field_name String, + field_value IPv6 +) +ENGINE = ReplicatedReplacingMergeTree() +ORDER BY (timeseries_name, field_name, field_value, timeseries_key); + +CREATE TABLE IF NOT EXISTS oximeter.fields_string ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + field_name String, + field_value String +) +ENGINE = ReplicatedReplacingMergeTree() +ORDER BY (timeseries_name, field_name, field_value, timeseries_key); + +CREATE TABLE IF NOT EXISTS oximeter.fields_uuid ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + field_name String, + field_value UUID +) +ENGINE = ReplicatedReplacingMergeTree() +ORDER BY (timeseries_name, field_name, field_value, timeseries_key); + +/* The timeseries schema table stores the extracted schema for the samples + * oximeter collects. + */ +CREATE TABLE IF NOT EXISTS oximeter.timeseries_schema ON CLUSTER oximeter_cluster +( + timeseries_name String, + fields Nested( + name String, + type Enum( + 'Bool' = 1, + 'I64' = 2, + 'IpAddr' = 3, + 'String' = 4, + 'Uuid' = 6 + ), + source Enum( + 'Target' = 1, + 'Metric' = 2 + ) + ), + datum_type Enum( + 'Bool' = 1, + 'I64' = 2, + 'F64' = 3, + 'String' = 4, + 'Bytes' = 5, + 'CumulativeI64' = 6, + 'CumulativeF64' = 7, + 'HistogramI64' = 8, + 'HistogramF64' = 9, + 'I8' = 10, + 'U8' = 11, + 'I16' = 12, + 'U16' = 13, + 'I32' = 14, + 'U32' = 15, + 'U64' = 16, + 'F32' = 17, + 'CumulativeU64' = 18, + 'CumulativeF32' = 19, + 'HistogramI8' = 20, + 'HistogramU8' = 21, + 'HistogramI16' = 22, + 'HistogramU16' = 23, + 'HistogramI32' = 24, + 'HistogramU32' = 25, + 'HistogramU64' = 26, + 'HistogramF32' = 27 + ), + created DateTime64(9, 'UTC') +) +ENGINE = ReplicatedMergeTree() +ORDER BY (timeseries_name, fields.name); diff --git a/oximeter/db/src/db-wipe-replicated.sql b/oximeter/db/schema/replicated/db-wipe.sql similarity index 100% rename from oximeter/db/src/db-wipe-replicated.sql rename to oximeter/db/schema/replicated/db-wipe.sql diff --git a/oximeter/db/src/db-single-node-init.sql b/oximeter/db/schema/single-node/2/up.sql similarity index 88% rename from oximeter/db/src/db-single-node-init.sql rename to oximeter/db/schema/single-node/2/up.sql index 2fb5c363977..4756e2897d0 100644 --- a/oximeter/db/src/db-single-node-init.sql +++ b/oximeter/db/schema/single-node/2/up.sql @@ -1,5 +1,6 @@ CREATE DATABASE IF NOT EXISTS oximeter; --- + +/* The version table contains metadata about the `oximeter` database */ CREATE TABLE IF NOT EXISTS oximeter.version ( value UInt64, @@ -7,7 +8,17 @@ CREATE TABLE IF NOT EXISTS oximeter.version ) ENGINE = MergeTree() ORDER BY (value, timestamp); --- + +/* The measurement tables contain all individual samples from each timeseries. + * + * Each table stores a single datum type, and otherwise contains nearly the same + * structure. The primary sorting key is on the timeseries name, key, and then + * timestamp, so that all timeseries from the same schema are grouped, followed + * by all samples from the same timeseries. + * + * This reflects that one usually looks up the _key_ in one or more field table, + * and then uses that to index quickly into the measurements tables. + */ CREATE TABLE IF NOT EXISTS oximeter.measurements_bool ( timeseries_name String, @@ -18,7 +29,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_bool ENGINE = MergeTree() ORDER BY (timeseries_name, timeseries_key, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_i8 ( timeseries_name String, @@ -29,7 +40,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_i8 ENGINE = MergeTree() ORDER BY (timeseries_name, timeseries_key, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_u8 ( timeseries_name String, @@ -40,7 +51,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_u8 ENGINE = MergeTree() ORDER BY (timeseries_name, timeseries_key, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_i16 ( timeseries_name String, @@ -51,7 +62,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_i16 ENGINE = MergeTree() ORDER BY (timeseries_name, timeseries_key, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_u16 ( timeseries_name String, @@ -62,7 +73,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_u16 ENGINE = MergeTree() ORDER BY (timeseries_name, timeseries_key, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_i32 ( timeseries_name String, @@ -73,7 +84,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_i32 ENGINE = MergeTree() ORDER BY (timeseries_name, timeseries_key, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_u32 ( timeseries_name String, @@ -84,7 +95,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_u32 ENGINE = MergeTree() ORDER BY (timeseries_name, timeseries_key, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_i64 ( timeseries_name String, @@ -95,7 +106,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_i64 ENGINE = MergeTree() ORDER BY (timeseries_name, timeseries_key, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_u64 ( timeseries_name String, @@ -106,7 +117,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_u64 ENGINE = MergeTree() ORDER BY (timeseries_name, timeseries_key, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_f32 ( timeseries_name String, @@ -117,7 +128,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_f32 ENGINE = MergeTree() ORDER BY (timeseries_name, timeseries_key, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_f64 ( timeseries_name String, @@ -128,7 +139,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_f64 ENGINE = MergeTree() ORDER BY (timeseries_name, timeseries_key, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_string ( timeseries_name String, @@ -139,7 +150,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_string ENGINE = MergeTree() ORDER BY (timeseries_name, timeseries_key, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_bytes ( timeseries_name String, @@ -150,7 +161,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_bytes ENGINE = MergeTree() ORDER BY (timeseries_name, timeseries_key, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativei64 ( timeseries_name String, @@ -162,7 +173,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativei64 ENGINE = MergeTree() ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativeu64 ( timeseries_name String, @@ -174,7 +185,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativeu64 ENGINE = MergeTree() ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativef32 ( timeseries_name String, @@ -186,8 +197,8 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativef32 ENGINE = MergeTree() ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- --- + + CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativef64 ( timeseries_name String, @@ -199,7 +210,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativef64 ENGINE = MergeTree() ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_histogrami8 ( timeseries_name String, @@ -212,7 +223,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_histogrami8 ENGINE = MergeTree() ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramu8 ( timeseries_name String, @@ -225,7 +236,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramu8 ENGINE = MergeTree() ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_histogrami16 ( timeseries_name String, @@ -238,7 +249,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_histogrami16 ENGINE = MergeTree() ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramu16 ( timeseries_name String, @@ -251,7 +262,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramu16 ENGINE = MergeTree() ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_histogrami32 ( timeseries_name String, @@ -264,7 +275,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_histogrami32 ENGINE = MergeTree() ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramu32 ( timeseries_name String, @@ -277,7 +288,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramu32 ENGINE = MergeTree() ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_histogrami64 ( timeseries_name String, @@ -290,7 +301,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_histogrami64 ENGINE = MergeTree() ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramu64 ( timeseries_name String, @@ -303,7 +314,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramu64 ENGINE = MergeTree() ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramf32 ( timeseries_name String, @@ -316,7 +327,7 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramf32 ENGINE = MergeTree() ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramf64 ( timeseries_name String, @@ -329,7 +340,24 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramf64 ENGINE = MergeTree() ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) TTL toDateTime(timestamp) + INTERVAL 30 DAY; --- + +/* The field tables store named dimensions of each timeseries. + * + * As with the measurement tables, there is one field table for each field data + * type. Fields are deduplicated by using the "replacing merge tree", though + * this behavior **must not** be relied upon for query correctness. + * + * The index for the fields differs from the measurements, however. Rows are + * sorted by timeseries name, then field name, field value, and finally + * timeseries key. This reflects the most common pattern for looking them up: + * by field name and possibly value, within a timeseries. The resulting keys are + * usually then used to look up measurements. + * + * NOTE: We may want to consider a secondary index on these tables, sorting by + * timeseries name and then key, since it would improve lookups where one + * already has the key. Realistically though, these tables are quite small and + * so performance benefits will be low in absolute terms. + */ CREATE TABLE IF NOT EXISTS oximeter.fields_bool ( timeseries_name String, @@ -339,7 +367,7 @@ CREATE TABLE IF NOT EXISTS oximeter.fields_bool ) ENGINE = ReplacingMergeTree() ORDER BY (timeseries_name, field_name, field_value, timeseries_key); --- + CREATE TABLE IF NOT EXISTS oximeter.fields_i8 ( timeseries_name String, @@ -349,7 +377,7 @@ CREATE TABLE IF NOT EXISTS oximeter.fields_i8 ) ENGINE = ReplacingMergeTree() ORDER BY (timeseries_name, field_name, field_value, timeseries_key); --- + CREATE TABLE IF NOT EXISTS oximeter.fields_u8 ( timeseries_name String, @@ -359,7 +387,7 @@ CREATE TABLE IF NOT EXISTS oximeter.fields_u8 ) ENGINE = ReplacingMergeTree() ORDER BY (timeseries_name, field_name, field_value, timeseries_key); --- + CREATE TABLE IF NOT EXISTS oximeter.fields_i16 ( timeseries_name String, @@ -369,7 +397,7 @@ CREATE TABLE IF NOT EXISTS oximeter.fields_i16 ) ENGINE = ReplacingMergeTree() ORDER BY (timeseries_name, field_name, field_value, timeseries_key); --- + CREATE TABLE IF NOT EXISTS oximeter.fields_u16 ( timeseries_name String, @@ -379,7 +407,7 @@ CREATE TABLE IF NOT EXISTS oximeter.fields_u16 ) ENGINE = ReplacingMergeTree() ORDER BY (timeseries_name, field_name, field_value, timeseries_key); --- + CREATE TABLE IF NOT EXISTS oximeter.fields_i32 ( timeseries_name String, @@ -389,7 +417,7 @@ CREATE TABLE IF NOT EXISTS oximeter.fields_i32 ) ENGINE = ReplacingMergeTree() ORDER BY (timeseries_name, field_name, field_value, timeseries_key); --- + CREATE TABLE IF NOT EXISTS oximeter.fields_u32 ( timeseries_name String, @@ -399,7 +427,7 @@ CREATE TABLE IF NOT EXISTS oximeter.fields_u32 ) ENGINE = ReplacingMergeTree() ORDER BY (timeseries_name, field_name, field_value, timeseries_key); --- + CREATE TABLE IF NOT EXISTS oximeter.fields_i64 ( timeseries_name String, @@ -409,7 +437,7 @@ CREATE TABLE IF NOT EXISTS oximeter.fields_i64 ) ENGINE = ReplacingMergeTree() ORDER BY (timeseries_name, field_name, field_value, timeseries_key); --- + CREATE TABLE IF NOT EXISTS oximeter.fields_u64 ( timeseries_name String, @@ -419,7 +447,7 @@ CREATE TABLE IF NOT EXISTS oximeter.fields_u64 ) ENGINE = ReplacingMergeTree() ORDER BY (timeseries_name, field_name, field_value, timeseries_key); --- + CREATE TABLE IF NOT EXISTS oximeter.fields_ipaddr ( timeseries_name String, @@ -429,7 +457,7 @@ CREATE TABLE IF NOT EXISTS oximeter.fields_ipaddr ) ENGINE = ReplacingMergeTree() ORDER BY (timeseries_name, field_name, field_value, timeseries_key); --- + CREATE TABLE IF NOT EXISTS oximeter.fields_string ( timeseries_name String, @@ -439,7 +467,7 @@ CREATE TABLE IF NOT EXISTS oximeter.fields_string ) ENGINE = ReplacingMergeTree() ORDER BY (timeseries_name, field_name, field_value, timeseries_key); --- + CREATE TABLE IF NOT EXISTS oximeter.fields_uuid ( timeseries_name String, @@ -449,7 +477,10 @@ CREATE TABLE IF NOT EXISTS oximeter.fields_uuid ) ENGINE = ReplacingMergeTree() ORDER BY (timeseries_name, field_name, field_value, timeseries_key); --- + +/* The timeseries schema table stores the extracted schema for the samples + * oximeter collects. + */ CREATE TABLE IF NOT EXISTS oximeter.timeseries_schema ( timeseries_name String, diff --git a/oximeter/db/schema/single-node/3/up.sql b/oximeter/db/schema/single-node/3/up.sql new file mode 100644 index 00000000000..073d643564f --- /dev/null +++ b/oximeter/db/schema/single-node/3/up.sql @@ -0,0 +1,22 @@ +/* This adds missing field types to the timeseries schema table field.type + * column, by augmentin the enum to capture new values. Note that the existing + * variants can't be moved or changed, so the new ones are added at the end. The + * client never sees this discriminant, only the string, so it should not + * matter. + */ +ALTER TABLE oximeter.timeseries_schema + MODIFY COLUMN IF EXISTS fields.type + Array(Enum( + 'Bool' = 1, + 'I64' = 2, + 'IpAddr' = 3, + 'String' = 4, + 'Uuid' = 6, + 'I8' = 7, + 'U8' = 8, + 'I16' = 9, + 'U16' = 10, + 'I32' = 11, + 'U32' = 12, + 'U64' = 13 + )); diff --git a/oximeter/db/schema/single-node/db-init.sql b/oximeter/db/schema/single-node/db-init.sql new file mode 100644 index 00000000000..ee5e91c4b7e --- /dev/null +++ b/oximeter/db/schema/single-node/db-init.sql @@ -0,0 +1,540 @@ +CREATE DATABASE IF NOT EXISTS oximeter; + +/* The version table contains metadata about the `oximeter` database */ +CREATE TABLE IF NOT EXISTS oximeter.version +( + value UInt64, + timestamp DateTime64(9, 'UTC') +) +ENGINE = MergeTree() +ORDER BY (value, timestamp); + +/* The measurement tables contain all individual samples from each timeseries. + * + * Each table stores a single datum type, and otherwise contains nearly the same + * structure. The primary sorting key is on the timeseries name, key, and then + * timestamp, so that all timeseries from the same schema are grouped, followed + * by all samples from the same timeseries. + * + * This reflects that one usually looks up the _key_ in one or more field table, + * and then uses that to index quickly into the measurements tables. + */ +CREATE TABLE IF NOT EXISTS oximeter.measurements_bool +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum UInt8 +) +ENGINE = MergeTree() +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_i8 +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum Int8 +) +ENGINE = MergeTree() +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_u8 +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum UInt8 +) +ENGINE = MergeTree() +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_i16 +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum Int16 +) +ENGINE = MergeTree() +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_u16 +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum UInt16 +) +ENGINE = MergeTree() +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_i32 +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum Int32 +) +ENGINE = MergeTree() +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_u32 +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum UInt32 +) +ENGINE = MergeTree() +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_i64 +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum Int64 +) +ENGINE = MergeTree() +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_u64 +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum UInt64 +) +ENGINE = MergeTree() +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_f32 +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum Float32 +) +ENGINE = MergeTree() +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_f64 +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum Float64 +) +ENGINE = MergeTree() +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_string +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum String +) +ENGINE = MergeTree() +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_bytes +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum Array(UInt8) +) +ENGINE = MergeTree() +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativei64 +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + datum Int64 +) +ENGINE = MergeTree() +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativeu64 +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + datum UInt64 +) +ENGINE = MergeTree() +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativef32 +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + datum Float32 +) +ENGINE = MergeTree() +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + + +CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativef64 +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + datum Float64 +) +ENGINE = MergeTree() +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_histogrami8 +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + bins Array(Int8), + counts Array(UInt64) +) +ENGINE = MergeTree() +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramu8 +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + bins Array(UInt8), + counts Array(UInt64) +) +ENGINE = MergeTree() +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_histogrami16 +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + bins Array(Int16), + counts Array(UInt64) +) +ENGINE = MergeTree() +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramu16 +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + bins Array(UInt16), + counts Array(UInt64) +) +ENGINE = MergeTree() +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_histogrami32 +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + bins Array(Int32), + counts Array(UInt64) +) +ENGINE = MergeTree() +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramu32 +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + bins Array(UInt32), + counts Array(UInt64) +) +ENGINE = MergeTree() +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_histogrami64 +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + bins Array(Int64), + counts Array(UInt64) +) +ENGINE = MergeTree() +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramu64 +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + bins Array(UInt64), + counts Array(UInt64) +) +ENGINE = MergeTree() +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramf32 +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + bins Array(Float32), + counts Array(UInt64) +) +ENGINE = MergeTree() +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +CREATE TABLE IF NOT EXISTS oximeter.measurements_histogramf64 +( + timeseries_name String, + timeseries_key UInt64, + start_time DateTime64(9, 'UTC'), + timestamp DateTime64(9, 'UTC'), + bins Array(Float64), + counts Array(UInt64) +) +ENGINE = MergeTree() +ORDER BY (timeseries_name, timeseries_key, start_time, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; + +/* The field tables store named dimensions of each timeseries. + * + * As with the measurement tables, there is one field table for each field data + * type. Fields are deduplicated by using the "replacing merge tree", though + * this behavior **must not** be relied upon for query correctness. + * + * The index for the fields differs from the measurements, however. Rows are + * sorted by timeseries name, then field name, field value, and finally + * timeseries key. This reflects the most common pattern for looking them up: + * by field name and possibly value, within a timeseries. The resulting keys are + * usually then used to look up measurements. + * + * NOTE: We may want to consider a secondary index on these tables, sorting by + * timeseries name and then key, since it would improve lookups where one + * already has the key. Realistically though, these tables are quite small and + * so performance benefits will be low in absolute terms. + */ +CREATE TABLE IF NOT EXISTS oximeter.fields_bool +( + timeseries_name String, + timeseries_key UInt64, + field_name String, + field_value UInt8 +) +ENGINE = ReplacingMergeTree() +ORDER BY (timeseries_name, field_name, field_value, timeseries_key); + +CREATE TABLE IF NOT EXISTS oximeter.fields_i8 +( + timeseries_name String, + timeseries_key UInt64, + field_name String, + field_value Int8 +) +ENGINE = ReplacingMergeTree() +ORDER BY (timeseries_name, field_name, field_value, timeseries_key); + +CREATE TABLE IF NOT EXISTS oximeter.fields_u8 +( + timeseries_name String, + timeseries_key UInt64, + field_name String, + field_value UInt8 +) +ENGINE = ReplacingMergeTree() +ORDER BY (timeseries_name, field_name, field_value, timeseries_key); + +CREATE TABLE IF NOT EXISTS oximeter.fields_i16 +( + timeseries_name String, + timeseries_key UInt64, + field_name String, + field_value Int16 +) +ENGINE = ReplacingMergeTree() +ORDER BY (timeseries_name, field_name, field_value, timeseries_key); + +CREATE TABLE IF NOT EXISTS oximeter.fields_u16 +( + timeseries_name String, + timeseries_key UInt64, + field_name String, + field_value UInt16 +) +ENGINE = ReplacingMergeTree() +ORDER BY (timeseries_name, field_name, field_value, timeseries_key); + +CREATE TABLE IF NOT EXISTS oximeter.fields_i32 +( + timeseries_name String, + timeseries_key UInt64, + field_name String, + field_value Int32 +) +ENGINE = ReplacingMergeTree() +ORDER BY (timeseries_name, field_name, field_value, timeseries_key); + +CREATE TABLE IF NOT EXISTS oximeter.fields_u32 +( + timeseries_name String, + timeseries_key UInt64, + field_name String, + field_value UInt32 +) +ENGINE = ReplacingMergeTree() +ORDER BY (timeseries_name, field_name, field_value, timeseries_key); + +CREATE TABLE IF NOT EXISTS oximeter.fields_i64 +( + timeseries_name String, + timeseries_key UInt64, + field_name String, + field_value Int64 +) +ENGINE = ReplacingMergeTree() +ORDER BY (timeseries_name, field_name, field_value, timeseries_key); + +CREATE TABLE IF NOT EXISTS oximeter.fields_u64 +( + timeseries_name String, + timeseries_key UInt64, + field_name String, + field_value UInt64 +) +ENGINE = ReplacingMergeTree() +ORDER BY (timeseries_name, field_name, field_value, timeseries_key); + +CREATE TABLE IF NOT EXISTS oximeter.fields_ipaddr +( + timeseries_name String, + timeseries_key UInt64, + field_name String, + field_value IPv6 +) +ENGINE = ReplacingMergeTree() +ORDER BY (timeseries_name, field_name, field_value, timeseries_key); + +CREATE TABLE IF NOT EXISTS oximeter.fields_string +( + timeseries_name String, + timeseries_key UInt64, + field_name String, + field_value String +) +ENGINE = ReplacingMergeTree() +ORDER BY (timeseries_name, field_name, field_value, timeseries_key); + +CREATE TABLE IF NOT EXISTS oximeter.fields_uuid +( + timeseries_name String, + timeseries_key UInt64, + field_name String, + field_value UUID +) +ENGINE = ReplacingMergeTree() +ORDER BY (timeseries_name, field_name, field_value, timeseries_key); + +/* The timeseries schema table stores the extracted schema for the samples + * oximeter collects. + */ +CREATE TABLE IF NOT EXISTS oximeter.timeseries_schema +( + timeseries_name String, + fields Nested( + name String, + type Enum( + 'Bool' = 1, + 'I64' = 2, + 'IpAddr' = 3, + 'String' = 4, + 'Uuid' = 6, + 'I8' = 7, + 'U8' = 8, + 'I16' = 9, + 'U16' = 10, + 'I32' = 11, + 'U32' = 12, + 'U64' = 13 + ), + source Enum( + 'Target' = 1, + 'Metric' = 2 + ) + ), + datum_type Enum( + 'Bool' = 1, + 'I64' = 2, + 'F64' = 3, + 'String' = 4, + 'Bytes' = 5, + 'CumulativeI64' = 6, + 'CumulativeF64' = 7, + 'HistogramI64' = 8, + 'HistogramF64' = 9, + 'I8' = 10, + 'U8' = 11, + 'I16' = 12, + 'U16' = 13, + 'I32' = 14, + 'U32' = 15, + 'U64' = 16, + 'F32' = 17, + 'CumulativeU64' = 18, + 'CumulativeF32' = 19, + 'HistogramI8' = 20, + 'HistogramU8' = 21, + 'HistogramI16' = 22, + 'HistogramU16' = 23, + 'HistogramI32' = 24, + 'HistogramU32' = 25, + 'HistogramU64' = 26, + 'HistogramF32' = 27 + ), + created DateTime64(9, 'UTC') +) +ENGINE = MergeTree() +ORDER BY (timeseries_name, fields.name); diff --git a/oximeter/db/src/db-wipe-single-node.sql b/oximeter/db/schema/single-node/db-wipe.sql similarity index 100% rename from oximeter/db/src/db-wipe-single-node.sql rename to oximeter/db/schema/single-node/db-wipe.sql diff --git a/oximeter/db/src/client.rs b/oximeter/db/src/client.rs index 69e91f888af..6b51e50b26f 100644 --- a/oximeter/db/src/client.rs +++ b/oximeter/db/src/client.rs @@ -35,6 +35,10 @@ use std::collections::BTreeSet; use std::convert::TryFrom; use std::net::SocketAddr; use std::num::NonZeroU32; +use std::ops::Bound; +use std::path::Path; +use std::path::PathBuf; +use tokio::fs; use tokio::sync::Mutex; use uuid::Uuid; @@ -267,14 +271,301 @@ impl Client { .map_err(|e| Error::Database(e.to_string())) } + /// Read the available schema versions in the provided directory. + pub async fn read_available_schema_versions( + log: &Logger, + is_replicated: bool, + schema_dir: impl AsRef, + ) -> Result, Error> { + let dir = schema_dir.as_ref().join(if is_replicated { + "replicated" + } else { + "single-node" + }); + let mut rd = + fs::read_dir(&dir).await.map_err(|err| Error::ReadSchemaDir { + context: format!( + "Failed to read schema directory '{}'", + dir.display() + ), + err, + })?; + let mut versions = BTreeSet::new(); + debug!(log, "reading entries from schema dir"; "dir" => dir.display()); + while let Some(entry) = + rd.next_entry().await.map_err(|err| Error::ReadSchemaDir { + context: String::from("Failed to read directory entry"), + err, + })? + { + let name = entry + .file_name() + .into_string() + .map_err(|bad| Error::NonUtf8SchemaDirEntry(bad.to_owned()))?; + let md = + entry.metadata().await.map_err(|err| Error::ReadSchemaDir { + context: String::from("Failed to fetch entry metatdata"), + err, + })?; + if !md.is_dir() { + debug!(log, "skipping non-directory"; "name" => &name); + continue; + } + match name.parse() { + Ok(ver) => { + debug!(log, "valid version dir"; "ver" => ver); + assert!(versions.insert(ver), "Versions should be unique"); + } + Err(e) => warn!( + log, + "found directory with non-u64 name, skipping"; + "name" => name, + "error" => ?e, + ), + } + } + Ok(versions) + } + + /// Ensure that the database is upgraded to the desired version of the + /// schema. + /// + /// NOTE: This function is not safe for concurrent usage! + pub async fn ensure_schema( + &self, + replicated: bool, + desired_version: u64, + schema_dir: impl AsRef, + ) -> Result<(), Error> { + let schema_dir = schema_dir.as_ref(); + let latest = self.read_latest_version().await?; + if latest == desired_version { + debug!( + self.log, + "database already at desired version"; + "version" => latest, + ); + return Ok(()); + } + debug!( + self.log, + "starting upgrade to desired version {}", desired_version + ); + let available = Self::read_available_schema_versions( + &self.log, + replicated, + schema_dir.clone(), + ) + .await?; + // We explicitly ignore version 0, which implies the database doesn't + // exist at all. + if latest > 0 && !available.contains(&latest) { + return Err(Error::MissingSchemaVersion(latest)); + } + if !available.contains(&desired_version) { + return Err(Error::MissingSchemaVersion(desired_version)); + } + + // Walk through all changes between current version (exclusive) and + // the desired version (inclusive). + let range = (Bound::Excluded(latest), Bound::Included(desired_version)); + let versions_to_apply = available.range(range); + let mut current = latest; + for version in versions_to_apply { + if let Err(e) = self + .apply_one_schema_upgrade(replicated, *version, schema_dir) + .await + { + error!( + self.log, + "failed to apply schema upgrade"; + "current_version" => current, + "next_version" => *version, + "replicated" => replicated, + "schema_dir" => schema_dir.display(), + "error" => ?e, + ); + return Err(e); + } + current = *version; + self.insert_version(current).await?; + } + Ok(()) + } + + fn verify_schema_upgrades( + files: &BTreeMap, + ) -> Result<(), Error> { + let re = + regex::Regex::new("(INSERT INTO)|(ALTER TABLE .* DELETE)").unwrap(); + for (path, sql) in files.values() { + if re.is_match(&sql) { + return Err(Error::SchemaUpdateModifiesData { + path: path.clone(), + statement: sql.clone(), + }); + } + if sql.matches(';').count() > 1 { + return Err(Error::MultipleSqlStatementsInSchemaUpdate { + path: path.clone(), + }); + } + } + Ok(()) + } + + async fn apply_one_schema_upgrade( + &self, + replicated: bool, + next_version: u64, + schema_dir: impl AsRef, + ) -> Result<(), Error> { + let schema_dir = schema_dir.as_ref(); + let upgrade_file_contents = Self::read_schema_upgrade_sql_files( + &self.log, + replicated, + next_version, + schema_dir, + ) + .await?; + + // We need to be pretty careful at this point with any data-modifying + // statements. There should be no INSERT queries, for example, which we + // check here. ClickHouse doesn't support much in the way of data + // modification, which makes this pretty easy. + Self::verify_schema_upgrades(&upgrade_file_contents)?; + + // Apply each file in sequence in the upgrade directory. + for (name, (path, sql)) in upgrade_file_contents.into_iter() { + debug!( + self.log, + "apply schema upgrade file"; + "version" => next_version, + "path" => path.display(), + "filename" => &name, + ); + match self.execute(sql).await { + Ok(_) => debug!( + self.log, + "successfully applied schema upgrade file"; + "version" => next_version, + "path" => path.display(), + "name" => name, + ), + Err(e) => { + return Err(e); + } + } + } + Ok(()) + } + + // Read all SQL files, in order, in the schema directory for the provided + // version. + async fn read_schema_upgrade_sql_files( + log: &Logger, + replicated: bool, + version: u64, + schema_dir: impl AsRef, + ) -> Result, Error> { + let version_schema_dir = schema_dir + .as_ref() + .join(if replicated { "replicated" } else { "single-node" }) + .join(version.to_string()); + let mut rd = + fs::read_dir(&version_schema_dir).await.map_err(|err| { + Error::ReadSchemaDir { + context: format!( + "Failed to read schema directory '{}'", + version_schema_dir.display() + ), + err, + } + })?; + + let mut upgrade_files = BTreeMap::new(); + debug!(log, "reading SQL files from schema dir"; "dir" => version_schema_dir.display()); + while let Some(entry) = + rd.next_entry().await.map_err(|err| Error::ReadSchemaDir { + context: String::from("Failed to read directory entry"), + err, + })? + { + let path = entry.path(); + let Some(ext) = path.extension() else { + warn!( + log, + "skipping schema dir entry without an extension"; + "dir" => version_schema_dir.display(), + "path" => path.display(), + ); + continue; + }; + let Some(ext) = ext.to_str() else { + warn!( + log, + "skipping schema dir entry with non-UTF8 extension"; + "dir" => version_schema_dir.display(), + "path" => path.display(), + ); + continue; + }; + if ext.eq_ignore_ascii_case("sql") { + let Some(stem) = path.file_stem() else { + warn!( + log, + "skipping schema SQL file with no name"; + "dir" => version_schema_dir.display(), + "path" => path.display(), + ); + continue; + }; + let Some(name) = stem.to_str() else { + warn!( + log, + "skipping schema SQL file with non-UTF8 name"; + "dir" => version_schema_dir.display(), + "path" => path.display(), + ); + continue; + }; + let contents = + fs::read_to_string(&path).await.map_err(|err| { + Error::ReadSqlFile { + context: format!( + "Reading SQL file '{}' for upgrade", + path.display(), + ), + err, + } + })?; + upgrade_files + .insert(name.to_string(), (path.to_owned(), contents)); + } else { + warn!( + log, + "skipping non-SQL schema dir entry"; + "dir" => version_schema_dir.display(), + "path" => path.display(), + ); + continue; + } + } + Ok(upgrade_files) + } + /// Validates that the schema used by the DB matches the version used by /// the executable using it. /// - /// This function will wipe metrics data if the version stored within + /// This function will **wipe** metrics data if the version stored within /// the DB is less than the schema version of Oximeter. /// If the version in the DB is newer than what is known to Oximeter, an /// error is returned. /// + /// If you would like to non-destructively upgrade the database, then either + /// the included binary `clickhouse-schema-updater` or the method + /// [`Client::ensure_schema()`] should be used instead. + /// /// NOTE: This function is not safe for concurrent usage! pub async fn initialize_db_with_version( &self, @@ -304,11 +595,10 @@ impl Client { } else if version > expected_version { // If the on-storage version is greater than the constant embedded // into this binary, we may have downgraded. - return Err(Error::Database( - format!( - "Expected version {expected_version}, saw {version}. Downgrading is not supported.", - ) - )); + return Err(Error::DatabaseVersionMismatch { + expected: crate::model::OXIMETER_VERSION, + found: version, + }); } else { // If the version matches, we don't need to update the DB return Ok(()); @@ -319,7 +609,8 @@ impl Client { Ok(()) } - async fn read_latest_version(&self) -> Result { + /// Read the latest version applied in the database. + pub async fn read_latest_version(&self) -> Result { let sql = format!( "SELECT MAX(value) FROM {db_name}.version;", db_name = crate::DATABASE_NAME, @@ -354,6 +645,20 @@ impl Client { Ok(version) } + /// Return Ok if the DB is at exactly the version compatible with this + /// client. + pub async fn check_db_is_at_expected_version(&self) -> Result<(), Error> { + let ver = self.read_latest_version().await?; + if ver == crate::model::OXIMETER_VERSION { + Ok(()) + } else { + Err(Error::DatabaseVersionMismatch { + expected: crate::model::OXIMETER_VERSION, + found: ver, + }) + } + } + async fn insert_version(&self, version: u64) -> Result<(), Error> { let sql = format!( "INSERT INTO {db_name}.version (*) VALUES ({version}, now());", @@ -365,7 +670,7 @@ impl Client { /// Verifies if instance is part of oximeter_cluster pub async fn is_oximeter_cluster(&self) -> Result { - let sql = String::from("SHOW CLUSTERS FORMAT JSONEachRow;"); + let sql = "SHOW CLUSTERS FORMAT JSONEachRow;"; let res = self.execute_with_body(sql).await?; Ok(res.contains("oximeter_cluster")) } @@ -501,7 +806,11 @@ impl Client { S: AsRef, { let sql = sql.as_ref().to_string(); - trace!(self.log, "executing SQL query: {}", sql); + trace!( + self.log, + "executing SQL query"; + "sql" => &sql, + ); let id = usdt::UniqueId::new(); probes::query__start!(|| (&id, &sql)); let response = handle_db_response( @@ -720,6 +1029,20 @@ impl Client { // many as one per sample. It's not clear how to structure this in a way that's useful. Ok(()) } + + // Run one or more SQL statements. + // + // This is intended to be used for the methods which run SQL from one of the + // SQL files in the crate, e.g., the DB initialization or update files. + async fn run_many_sql_statements( + &self, + sql: impl AsRef, + ) -> Result<(), Error> { + for stmt in sql.as_ref().split(';').filter(|s| !s.trim().is_empty()) { + self.execute(stmt).await?; + } + Ok(()) + } } #[derive(Debug)] @@ -770,11 +1093,19 @@ impl DbWrite for Client { // The HTTP client doesn't support multiple statements per query, so we break them out here // manually. debug!(self.log, "initializing ClickHouse database"); - let sql = include_str!("./db-replicated-init.sql"); - for query in sql.split("\n--\n") { - self.execute(query.to_string()).await?; - } - Ok(()) + self.run_many_sql_statements(include_str!( + "../schema/replicated/db-init.sql" + )) + .await + } + + /// Wipe the ClickHouse database entirely from a replicated set up. + async fn wipe_replicated_db(&self) -> Result<(), Error> { + debug!(self.log, "wiping ClickHouse database"); + self.run_many_sql_statements(include_str!( + "../schema/replicated/db-wipe.sql" + )) + .await } /// Initialize a single node telemetry database, creating tables as needed. @@ -782,25 +1113,19 @@ impl DbWrite for Client { // The HTTP client doesn't support multiple statements per query, so we break them out here // manually. debug!(self.log, "initializing ClickHouse database"); - let sql = include_str!("./db-single-node-init.sql"); - for query in sql.split("\n--\n") { - self.execute(query.to_string()).await?; - } - Ok(()) + self.run_many_sql_statements(include_str!( + "../schema/single-node/db-init.sql" + )) + .await } /// Wipe the ClickHouse database entirely from a single node set up. async fn wipe_single_node_db(&self) -> Result<(), Error> { debug!(self.log, "wiping ClickHouse database"); - let sql = include_str!("./db-wipe-single-node.sql").to_string(); - self.execute(sql).await - } - - /// Wipe the ClickHouse database entirely from a replicated set up. - async fn wipe_replicated_db(&self) -> Result<(), Error> { - debug!(self.log, "wiping ClickHouse database"); - let sql = include_str!("./db-wipe-replicated.sql").to_string(); - self.execute(sql).await + self.run_many_sql_statements(include_str!( + "../schema/single-node/db-wipe.sql" + )) + .await } } @@ -839,7 +1164,9 @@ mod tests { use oximeter::Metric; use oximeter::Target; use std::net::Ipv6Addr; + use std::path::PathBuf; use std::time::Duration; + use tempfile::TempDir; use tokio::time::sleep; use uuid::Uuid; @@ -3338,4 +3665,346 @@ mod tests { ); } } + + async fn create_test_upgrade_schema_directory( + replicated: bool, + versions: &[u64], + ) -> (TempDir, Vec) { + assert!(!versions.is_empty()); + let schema_dir = TempDir::new().expect("failed to create tempdir"); + let mut paths = Vec::with_capacity(versions.len()); + for version in versions.iter() { + let version_dir = schema_dir + .path() + .join(if replicated { "replicated" } else { "single-node" }) + .join(version.to_string()); + fs::create_dir_all(&version_dir) + .await + .expect("failed to make version directory"); + paths.push(version_dir); + } + (schema_dir, paths) + } + + #[tokio::test] + async fn test_read_schema_upgrade_sql_files() { + let logctx = test_setup_log("test_read_schema_upgrade_sql_files"); + let log = &logctx.log; + const REPLICATED: bool = false; + const VERSION: u64 = 1; + let (schema_dir, version_dirs) = + create_test_upgrade_schema_directory(REPLICATED, &[VERSION]).await; + let version_dir = &version_dirs[0]; + + // Create a few SQL files in there. + const SQL: &str = "SELECT NOW();"; + let filenames: Vec<_> = (0..3).map(|i| format!("up-{i}.sql")).collect(); + for name in filenames.iter() { + let full_path = version_dir.join(name); + fs::write(full_path, SQL).await.expect("Failed to write dummy SQL"); + } + + let upgrade_files = Client::read_schema_upgrade_sql_files( + log, + REPLICATED, + VERSION, + schema_dir.path(), + ) + .await + .expect("Failed to read schema upgrade files"); + for filename in filenames.iter() { + let stem = filename.split_once('.').unwrap().0; + assert_eq!( + upgrade_files.get(stem).unwrap().1, + SQL, + "upgrade SQL file contents are not correct" + ); + } + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_apply_one_schema_upgrade() { + const TEST_NAME: &str = "test_apply_one_schema_upgrade"; + let logctx = test_setup_log(TEST_NAME); + let log = &logctx.log; + let mut db = ClickHouseInstance::new_single_node(0) + .await + .expect("Failed to start ClickHouse"); + let address = SocketAddr::new(Ipv6Addr::LOCALHOST.into(), db.port()); + let client = Client::new(address, &log); + + // We'll test moving from version 1, which just creates a database and + // table, to version 2, which adds two columns to that table in + // different SQL files. + const REPLICATED: bool = false; + client.execute(format!("CREATE DATABASE {TEST_NAME};")).await.unwrap(); + client + .execute(format!( + "\ + CREATE TABLE {TEST_NAME}.tbl (\ + `col0` UInt8 \ + )\ + ENGINE = MergeTree() + ORDER BY `col0`;\ + " + )) + .await + .unwrap(); + + // Write out the upgrading SQL files. + // + // Note that all of these statements are going in the version 2 schema + // directory. + let (schema_dir, version_dirs) = + create_test_upgrade_schema_directory(REPLICATED, &[NEXT_VERSION]) + .await; + const NEXT_VERSION: u64 = 2; + let first_sql = + format!("ALTER TABLE {TEST_NAME}.tbl ADD COLUMN `col1` UInt16;"); + let second_sql = + format!("ALTER TABLE {TEST_NAME}.tbl ADD COLUMN `col2` String;"); + let all_sql = [first_sql, second_sql]; + let version_dir = &version_dirs[0]; + for (i, sql) in all_sql.iter().enumerate() { + let path = version_dir.join(format!("up-{i}.sql")); + fs::write(path, sql) + .await + .expect("failed to write out upgrade SQL file"); + } + + // Apply the upgrade itself. + client + .apply_one_schema_upgrade( + REPLICATED, + NEXT_VERSION, + schema_dir.path(), + ) + .await + .expect("Failed to apply one schema upgrade"); + + // Check that it actually worked! + let body = client + .execute_with_body(format!( + "\ + SELECT name, type FROM system.columns \ + WHERE database = '{TEST_NAME}' AND table = 'tbl' \ + ORDER BY name \ + FORMAT CSV;\ + " + )) + .await + .unwrap(); + let mut lines = body.lines(); + assert_eq!(lines.next().unwrap(), "\"col0\",\"UInt8\""); + assert_eq!(lines.next().unwrap(), "\"col1\",\"UInt16\""); + assert_eq!(lines.next().unwrap(), "\"col2\",\"String\""); + assert!(lines.next().is_none()); + + db.cleanup().await.expect("Failed to cleanup ClickHouse server"); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_ensure_schema_with_missing_desired_schema_version_fails() { + let logctx = test_setup_log( + "test_ensure_schema_with_missing_desired_schema_version_fails", + ); + let log = &logctx.log; + let mut db = ClickHouseInstance::new_single_node(0) + .await + .expect("Failed to start ClickHouse"); + let address = SocketAddr::new(Ipv6Addr::LOCALHOST.into(), db.port()); + let client = Client::new(address, &log); + const REPLICATED: bool = false; + client + .initialize_db_with_version( + REPLICATED, + crate::model::OXIMETER_VERSION, + ) + .await + .expect("failed to initialize DB"); + + let (schema_dir, _) = create_test_upgrade_schema_directory( + REPLICATED, + &[crate::model::OXIMETER_VERSION], + ) + .await; + + let err = client.ensure_schema( + REPLICATED, + 1000, + schema_dir.path(), + ).await + .expect_err("Should have received an error when ensuring a non-existing version"); + let Error::MissingSchemaVersion(missing) = err else { + panic!("Expected an Error::MissingSchemaVersion, found {err:?}"); + }; + assert_eq!(missing, 1000); + + db.cleanup().await.expect("Failed to cleanup ClickHouse server"); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_ensure_schema_walks_through_multiple_steps() { + const TEST_NAME: &str = + "test_ensure_schema_walks_through_multiple_steps"; + let logctx = test_setup_log(TEST_NAME); + let log = &logctx.log; + let mut db = ClickHouseInstance::new_single_node(0) + .await + .expect("Failed to start ClickHouse"); + let address = SocketAddr::new(Ipv6Addr::LOCALHOST.into(), db.port()); + let client = Client::new(address, &log); + + // We need to actually have the oximeter DB here, and the version table, + // since `ensure_schema()` writes out versions to the DB as they're + // applied. + client + .execute(format!("CREATE DATABASE {}", crate::DATABASE_NAME)) + .await + .unwrap(); + client + .execute(format!( + "\ + CREATE TABLE {}.version ( \ + value UInt64, \ + timestamp DateTime64(9, 'UTC') \ + ) \ + ENGINE = MergeTree() \ + ORDER BY (value, timestamp);", + crate::DATABASE_NAME + )) + .await + .unwrap(); + + // We'll test moving from version 1, which just creates a database and + // table, to version 3, stopping off at version 2. This is similar to + // the `test_apply_one_schema_upgrade` test, but we split the two + // modifications over two versions, rather than as multiple schema + // upgrades in one version bump. + const REPLICATED: bool = false; + client.execute(format!("CREATE DATABASE {TEST_NAME};")).await.unwrap(); + client + .execute(format!( + "\ + CREATE TABLE {TEST_NAME}.tbl (\ + `col0` UInt8 \ + )\ + ENGINE = MergeTree() + ORDER BY `col0`;\ + " + )) + .await + .unwrap(); + + // Write out the upgrading SQL files. + // + // Note that each statements goes into a different version. + const VERSIONS: [u64; 2] = [2, 3]; + let (schema_dir, version_dirs) = + create_test_upgrade_schema_directory(REPLICATED, &VERSIONS).await; + let first_sql = + format!("ALTER TABLE {TEST_NAME}.tbl ADD COLUMN `col1` UInt16;"); + let second_sql = + format!("ALTER TABLE {TEST_NAME}.tbl ADD COLUMN `col2` String;"); + let all_sql = [first_sql, second_sql]; + for (version_dir, sql) in version_dirs.iter().zip(all_sql) { + let path = version_dir.join("up.sql"); + fs::write(path, sql) + .await + .expect("failed to write out upgrade SQL file"); + } + + // Apply the sequence of upgrades. + client + .ensure_schema(REPLICATED, VERSIONS[1], schema_dir.path()) + .await + .expect("Failed to apply one schema upgrade"); + + // Check that it actually worked! + let body = client + .execute_with_body(format!( + "\ + SELECT name, type FROM system.columns \ + WHERE database = '{TEST_NAME}' AND table = 'tbl' \ + ORDER BY name \ + FORMAT CSV;\ + " + )) + .await + .unwrap(); + let mut lines = body.lines(); + assert_eq!(lines.next().unwrap(), "\"col0\",\"UInt8\""); + assert_eq!(lines.next().unwrap(), "\"col1\",\"UInt16\""); + assert_eq!(lines.next().unwrap(), "\"col2\",\"String\""); + assert!(lines.next().is_none()); + + let latest_version = client.read_latest_version().await.unwrap(); + assert_eq!( + latest_version, VERSIONS[1], + "Updated version not written to the database" + ); + + db.cleanup().await.expect("Failed to cleanup ClickHouse server"); + logctx.cleanup_successful(); + } + + #[test] + fn test_verify_schema_upgrades() { + let mut map = BTreeMap::new(); + + // Check that we fail if the upgrade tries to insert data. + map.insert( + "up".into(), + ( + PathBuf::from("/foo/bar/up.sql"), + String::from( + "INSERT INTO oximeter.version (*) VALUES (100, now());", + ), + ), + ); + assert!(Client::verify_schema_upgrades(&map).is_err()); + + // Sanity check for the normal case. + map.clear(); + map.insert( + "up".into(), + ( + PathBuf::from("/foo/bar/up.sql"), + String::from("ALTER TABLE oximeter.measurements_bool ADD COLUMN foo UInt64;") + ), + ); + assert!(Client::verify_schema_upgrades(&map).is_ok()); + + // Check that we fail if the upgrade ties to delete any data. + map.clear(); + map.insert( + "up".into(), + ( + PathBuf::from("/foo/bar/up.sql"), + String::from("ALTER TABLE oximeter.measurements_bool DELETE WHERE timestamp < NOW();") + ), + ); + assert!(Client::verify_schema_upgrades(&map).is_err()); + + // Check that we fail if the upgrade contains multiple SQL statements. + map.clear(); + map.insert( + "up".into(), + ( + PathBuf::from("/foo/bar/up.sql"), + String::from( + "\ + ALTER TABLE oximeter.measurements_bool \ + ADD COLUMN foo UInt8; \ + ALTER TABLE oximeter.measurements_bool \ + ADD COLUMN bar UInt8; \ + ", + ), + ), + ); + assert!(Client::verify_schema_upgrades(&map).is_err()); + } } diff --git a/oximeter/db/src/lib.rs b/oximeter/db/src/lib.rs index 11ecbeddc87..6b7edfb720e 100644 --- a/oximeter/db/src/lib.rs +++ b/oximeter/db/src/lib.rs @@ -15,7 +15,9 @@ use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; use std::collections::BTreeSet; use std::convert::TryFrom; +use std::io; use std::num::NonZeroU32; +use std::path::PathBuf; use thiserror::Error; mod client; @@ -23,7 +25,9 @@ pub mod model; pub mod query; pub use client::{Client, DbWrite}; -#[derive(Clone, Debug, Error)] +pub use model::OXIMETER_VERSION; + +#[derive(Debug, Error)] pub enum Error { #[error("Oximeter core error: {0}")] Oximeter(#[from] oximeter::MetricsError), @@ -79,6 +83,35 @@ pub enum Error { #[error("Query must resolve to a single timeseries if limit is specified")] InvalidLimitQuery, + + #[error("Database is not at expected version")] + DatabaseVersionMismatch { expected: u64, found: u64 }, + + #[error("Could not read schema directory")] + ReadSchemaDir { + context: String, + #[source] + err: io::Error, + }, + + #[error("Could not read SQL file from path")] + ReadSqlFile { + context: String, + #[source] + err: io::Error, + }, + + #[error("Non-UTF8 schema directory entry")] + NonUtf8SchemaDirEntry(std::ffi::OsString), + + #[error("Missing desired schema version: {0}")] + MissingSchemaVersion(u64), + + #[error("Data-modifying operations are not supported in schema updates")] + SchemaUpdateModifiesData { path: PathBuf, statement: String }, + + #[error("Schema update SQL files should contain at most 1 statement")] + MultipleSqlStatementsInSchemaUpdate { path: PathBuf }, } /// A timeseries name. diff --git a/oximeter/db/src/model.rs b/oximeter/db/src/model.rs index 41c7ab9d249..715e025a04d 100644 --- a/oximeter/db/src/model.rs +++ b/oximeter/db/src/model.rs @@ -38,11 +38,12 @@ use uuid::Uuid; /// Describes the version of the Oximeter database. /// -/// See: [crate::Client::initialize_db_with_version] for usage. +/// For usage and details see: /// -/// TODO(#4271): The current implementation of versioning will wipe the metrics -/// database if this number is incremented. -pub const OXIMETER_VERSION: u64 = 2; +/// - [`crate::Client::initialize_db_with_version`] +/// - [`crate::Client::ensure_schema`] +/// - The `clickhouse-schema-updater` binary in this crate +pub const OXIMETER_VERSION: u64 = 3; // Wrapper type to represent a boolean in the database. // diff --git a/package-manifest.toml b/package-manifest.toml index b8ffb2756aa..d6693cb6d51 100644 --- a/package-manifest.toml +++ b/package-manifest.toml @@ -107,9 +107,12 @@ setup_hint = """ service_name = "oximeter" only_for_targets.image = "standard" source.type = "local" -source.rust.binary_names = ["oximeter"] +source.rust.binary_names = ["oximeter", "clickhouse-schema-updater"] source.rust.release = true -source.paths = [ { from = "smf/oximeter", to = "/var/svc/manifest/site/oximeter" } ] +source.paths = [ + { from = "smf/oximeter", to = "/var/svc/manifest/site/oximeter" }, + { from = "oximeter/db/schema", to = "/opt/oxide/oximeter/schema" }, +] output.type = "zone" [package.clickhouse]