Skip to content

Commit

Permalink
Read / write timeseries schema with the native client
Browse files Browse the repository at this point in the history
- Add methods for converting an array of objects into a data block, or
  deserializing an array out of one.
- Use new to/from block methods to read and write timeseries schema in
  the native format
- Add native `Connection` method for inserting data in a block.
- Implement deserialization of the `TableColumns` server message type,
  which is sent to client when it tries to insert data, and is used to
  check that the block to be inserted matches the structure of the table
  into which the insertion is directed.
- Add a build script, which reads the `db-init.sql` file to extract the
  enum data type definitions into constants. This is used in the
  serialization of timeseries schema rows into a block.
- Add better handling of HTTP / native addresses in Nexus setup
- Closes #6942
  • Loading branch information
bnaecker committed Oct 31, 2024
1 parent 2c51979 commit fa3b05f
Show file tree
Hide file tree
Showing 25 changed files with 1,183 additions and 231 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 23 additions & 3 deletions nexus-config/src/nexus_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,12 @@ pub struct SchemaConfig {
/// Optional configuration for the timeseries database.
#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
pub struct TimeseriesDbConfig {
/// The HTTP address of the ClickHouse server.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub address: Option<SocketAddr>,
/// The native TCP address of the ClickHouse server.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub native_address: Option<SocketAddr>,
}

/// Configuration for the `Dendrite` dataplane daemon.
Expand Down Expand Up @@ -774,7 +778,9 @@ impl std::fmt::Display for SchemeName {
mod test {
use super::*;

use omicron_common::address::{Ipv6Subnet, RACK_PREFIX};
use omicron_common::address::{
Ipv6Subnet, CLICKHOUSE_HTTP_PORT, CLICKHOUSE_TCP_PORT, RACK_PREFIX,
};
use omicron_common::api::internal::shared::SwitchLocation;

use camino::{Utf8Path, Utf8PathBuf};
Expand All @@ -784,7 +790,7 @@ mod test {
use dropshot::ConfigLoggingLevel;
use std::collections::HashMap;
use std::fs;
use std::net::{Ipv6Addr, SocketAddr};
use std::net::{Ipv6Addr, SocketAddr, SocketAddrV6};
use std::str::FromStr;
use std::time::Duration;

Expand Down Expand Up @@ -889,6 +895,7 @@ mod test {
if_exists = "fail"
[timeseries_db]
address = "[::1]:8123"
native_address = "[::1]:9000"
[updates]
trusted_root = "/path/to/root.json"
[tunables]
Expand Down Expand Up @@ -1007,7 +1014,20 @@ mod test {
path: "/nonexistent/path".into()
},
timeseries_db: TimeseriesDbConfig {
address: Some("[::1]:8123".parse().unwrap())
address: Some(SocketAddr::V6(SocketAddrV6::new(
Ipv6Addr::LOCALHOST,
CLICKHOUSE_HTTP_PORT,
0,
0,
))),
native_address: Some(SocketAddr::V6(
SocketAddrV6::new(
Ipv6Addr::LOCALHOST,
CLICKHOUSE_TCP_PORT,
0,
0,
)
)),
},
updates: Some(UpdatesConfig {
trusted_root: Utf8PathBuf::from("/path/to/root.json"),
Expand Down
6 changes: 4 additions & 2 deletions nexus/src/app/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ impl super::Nexus {
.timeseries_schema_list(&pagination.page, limit)
.await
.map_err(|e| match e {
oximeter_db::Error::DatabaseUnavailable(_) => {
oximeter_db::Error::DatabaseUnavailable(_)
| oximeter_db::Error::Connection(_) => {
Error::ServiceUnavailable {
internal_message: e.to_string(),
}
Expand Down Expand Up @@ -150,7 +151,8 @@ impl super::Nexus {
result.tables
})
.map_err(|e| match e {
oximeter_db::Error::DatabaseUnavailable(_) => {
oximeter_db::Error::DatabaseUnavailable(_)
| oximeter_db::Error::Connection(_) => {
Error::ServiceUnavailable {
internal_message: e.to_string(),
}
Expand Down
33 changes: 25 additions & 8 deletions nexus/src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use nexus_db_queries::authn;
use nexus_db_queries::authz;
use nexus_db_queries::context::OpContext;
use nexus_db_queries::db;
use omicron_common::address::CLICKHOUSE_HTTP_PORT;
use omicron_common::address::CLICKHOUSE_TCP_PORT;
use omicron_common::address::DENDRITE_PORT;
use omicron_common::address::MGD_PORT;
Expand Down Expand Up @@ -411,13 +412,12 @@ impl Nexus {
.map_err(|e| e.to_string())?;

// Client to the ClickHouse database.
let timeseries_client =
if let Some(http_address) = &config.pkg.timeseries_db.address {
let native_address =
SocketAddr::new(http_address.ip(), CLICKHOUSE_TCP_PORT);
oximeter_db::Client::new(*http_address, native_address, &log)
} else {
// TODO-cleanup: Remove this when we remove the HTTP client.
// TODO-cleanup: Simplify this when we remove the HTTP client.
let timeseries_client = match (
&config.pkg.timeseries_db.address,
&config.pkg.timeseries_db.native_address,
) {
(None, None) => {
let http_resolver =
qorb_resolver.for_service(ServiceName::Clickhouse);
let native_resolver =
Expand All @@ -427,7 +427,24 @@ impl Nexus {
native_resolver,
&log,
)
};
}
(maybe_http, maybe_native) => {
let (http_address, native_address) =
match (maybe_http, maybe_native) {
(None, None) => unreachable!("handled above"),
(None, Some(native)) => (
SocketAddr::new(native.ip(), CLICKHOUSE_HTTP_PORT),
*native,
),
(Some(http), None) => (
*http,
SocketAddr::new(http.ip(), CLICKHOUSE_TCP_PORT),
),
(Some(http), Some(native)) => (*http, *native),
};
oximeter_db::Client::new(http_address, native_address, &log)
}
};

// TODO-cleanup We may want to make the populator a first-class
// background task.
Expand Down
3 changes: 2 additions & 1 deletion nexus/src/app/oximeter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,8 @@ pub(crate) async fn unassign_producer(

fn map_oximeter_err(error: oximeter_db::Error) -> Error {
match error {
oximeter_db::Error::DatabaseUnavailable(_) => {
oximeter_db::Error::DatabaseUnavailable(_)
| oximeter_db::Error::Connection(_) => {
Error::ServiceUnavailable { internal_message: error.to_string() }
}
_ => Error::InternalError { internal_message: error.to_string() },
Expand Down
3 changes: 3 additions & 0 deletions nexus/test-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,7 @@ impl<'a, N: NexusServer> ControlPlaneTestContextBuilder<'a, N> {
let dataset_id = Uuid::new_v4();
let http_address = clickhouse.http_address();
let http_port = http_address.port();
let native_address = clickhouse.native_address();
self.rack_init_builder.add_clickhouse_dataset(
zpool_id,
dataset_id,
Expand All @@ -503,6 +504,8 @@ impl<'a, N: NexusServer> ControlPlaneTestContextBuilder<'a, N> {
.as_mut()
.expect("Tests expect to set a port of Clickhouse")
.set_port(http_port);
self.config.pkg.timeseries_db.native_address =
Some(native_address.into());

let pool_name = illumos_utils::zpool::ZpoolName::new_external(zpool_id)
.to_string()
Expand Down
7 changes: 7 additions & 0 deletions oximeter/db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ camino.workspace = true
chrono.workspace = true
chrono-tz.workspace = true
clap.workspace = true
const_format.workspace = true
clickward.workspace = true
debug-ignore.workspace = true
dropshot.workspace = true
Expand All @@ -32,6 +33,7 @@ omicron-common.workspace = true
omicron-workspace-hack.workspace = true
oximeter.workspace = true
oxql-types.workspace = true
parse-display.workspace = true
qorb.workspace = true
regex.workspace = true
serde.workspace = true
Expand Down Expand Up @@ -93,6 +95,11 @@ optional = true
workspace = true
features = [ "rt-multi-thread", "macros" ]

[build-dependencies]
anyhow.workspace = true
nom.workspace = true
quote.workspace = true

[dev-dependencies]
camino-tempfile.workspace = true
criterion = { workspace = true, features = [ "async_tokio" ] }
Expand Down
100 changes: 100 additions & 0 deletions oximeter/db/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright 2024 Oxide Computer Company

use anyhow::Context as _;
use nom::IResult;

/// Build script for generating native type representations from the
/// ground-truth SQL definitions.
fn main() -> anyhow::Result<()> {
const INIT_FILE: &str =
concat!(env!("CARGO_MANIFEST_DIR"), "/schema/single-node/db-init.sql");
let contents = std::fs::read_to_string(INIT_FILE)
.with_context(|| format!("Failed to read SQL file: '{INIT_FILE}'"))?;
let field_type_enum =
find_enum(&contents, "type").context("failed to find column 'type'")?;
let field_source_enum = find_enum(&contents, "source")
.context("failed to find column 'source'")?;
let datum_type_enum = find_enum(&contents, "datum_type")
.context("failed to find column 'datum_type'")?;
std::fs::write(
format!("{}/enum_defs.rs", std::env::var("OUT_DIR")?),
[field_type_enum, field_source_enum, datum_type_enum].join("\n"),
)
.context("writing output file")?;
Ok(())
}

// Find an enum in the `timeseries_schema` table definition for the named
// column, and return the corresponding `DataType::Enum8()` definition for it.
fn find_enum(contents: &str, column: &str) -> Option<String> {
let needle = format!("{column} Enum(\n");
let start = contents.find(&needle)? + needle.len();
let s = &contents[start..].trim();
let (variants, names): (Vec<i8>, Vec<String>) =
variant_list(s).ok()?.1.into_iter().unzip();
let enum_map = quote::format_ident!("{}_ENUM_MAP", column.to_uppercase());
let enum_rev_map =
quote::format_ident!("{}_ENUM_REV_MAP", column.to_uppercase());
let enum_type =
quote::format_ident!("{}_ENUM_DATA_TYPE", column.to_uppercase());
let parsed_type = if column == "type" {
quote::quote! { ::oximeter::FieldType }
} else if column == "source" {
quote::quote! { ::oximeter::FieldSource }
} else if column == "datum_type" {
quote::quote! { ::oximeter::DatumType }
} else {
unreachable!();
};
Some(quote::quote! {
/// Mapping from the variant index to the string form.
#[allow(dead_code)]
static #enum_map: ::std::sync::LazyLock<::indexmap::IndexMap<i8, String>> = ::std::sync::LazyLock::new(|| {
::indexmap::IndexMap::from([
#((#variants, String::from(#names))),*
])
});
/// Reverse mapping, from the _parsed_ form to the variant index.
#[allow(dead_code)]
static #enum_rev_map: ::std::sync::LazyLock<::indexmap::IndexMap<#parsed_type, i8>> = ::std::sync::LazyLock::new(|| {
::indexmap::IndexMap::from([
#((<#parsed_type as ::std::str::FromStr>::from_str(#names).unwrap(), #variants)),*
])
});
/// Actual DataType::Enum8(_) with the contained variant-to-name mapping.
#[allow(dead_code)]
static #enum_type: ::std::sync::LazyLock<crate::native::block::DataType> = ::std::sync::LazyLock::new(|| {
crate::native::block::DataType::Enum8(
::indexmap::IndexMap::from([
#((#variants, String::from(#names))),*
])
)
});
}.to_string())
}

fn variant_list(s: &str) -> IResult<&str, Vec<(i8, String)>> {
nom::multi::separated_list1(
nom::bytes::complete::is_a(" ,\n"),
single_variant,
)(s)
}

fn single_variant(s: &str) -> IResult<&str, (i8, String)> {
nom::combinator::map(
nom::sequence::separated_pair(
nom::sequence::delimited(
nom::bytes::complete::tag("'"),
nom::character::complete::alphanumeric1,
nom::bytes::complete::tag("'"),
),
nom::bytes::complete::tag(" = "),
nom::character::complete::i8,
),
|(name, variant): (&str, i8)| (variant, name.to_string()),
)(s)
}
Loading

0 comments on commit fa3b05f

Please sign in to comment.