Merge branch 'rss-datasets' into put-zones-doesnt-put-datasets
smklein committed Nov 7, 2024
2 parents ce7ec38 + 2aa4a0e commit 0fb92ed
Showing 43 changed files with 1,434 additions and 328 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 4 additions & 1 deletion clickhouse-admin/src/clickhouse_cli.rs
@@ -147,7 +147,10 @@ impl ClickhouseCli {
self.client_non_interactive(
ClickhouseClientType::Server,
format!(
"SELECT * FROM system.distributed_ddl_queue WHERE cluster = '{}' FORMAT JSONEachRow",
"SELECT * FROM system.distributed_ddl_queue WHERE cluster = '{}'
SETTINGS date_time_output_format = 'iso',
output_format_json_quote_64bit_integers = '0'
FORMAT JSONEachRow",
OXIMETER_CLUSTER
).as_str(),
"Retrieve information about distributed ddl queries (ON CLUSTER clause)
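
The two SETTINGS added to this query appear to be what lets its output deserialize into the stronger types introduced below in clickhouse-admin/types: date_time_output_format = 'iso' makes ClickHouse print DateTime columns as ISO 8601, and output_format_json_quote_64bit_integers = '0' keeps 64-bit integers unquoted in the JSONEachRow output. A minimal, hypothetical sketch of the difference (the Row struct is illustrative, not the real DistributedDdlQueue; assumes serde, serde_json, and chrono with its serde feature):

use chrono::{DateTime, Utc};
use serde::Deserialize;

// Illustrative stand-in for two of the columns in the real row type.
#[derive(Deserialize)]
struct Row {
    query_create_time: DateTime<Utc>,
    query_duration_ms: u64,
}

fn main() {
    // With the settings above, ClickHouse emits ISO timestamps and bare
    // integers, which parse directly:
    let iso = r#"{"query_create_time":"2024-11-01T16:16:45Z","query_duration_ms":4}"#;
    assert!(serde_json::from_str::<Row>(iso).is_ok());

    // The default output ("2024-11-01 16:16:45" and a quoted "4") would not
    // parse into DateTime<Utc> / u64:
    let default = r#"{"query_create_time":"2024-11-01 16:16:45","query_duration_ms":"4"}"#;
    assert!(serde_json::from_str::<Row>(default).is_err());
}
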
1 change: 1 addition & 0 deletions clickhouse-admin/types/Cargo.toml
@@ -12,6 +12,7 @@ anyhow.workspace = true
atomicwrites.workspace = true
camino.workspace = true
camino-tempfile.workspace = true
chrono.workspace = true
derive_more.workspace = true
itertools.workspace = true
omicron-common.workspace = true
28 changes: 15 additions & 13 deletions clickhouse-admin/types/src/lib.rs
@@ -5,6 +5,7 @@
use anyhow::{bail, Context, Error, Result};
use atomicwrites::AtomicFile;
use camino::Utf8PathBuf;
use chrono::{DateTime, Utc};
use derive_more::{Add, AddAssign, Display, From};
use itertools::Itertools;
use omicron_common::api::external::Generation;
@@ -995,7 +996,7 @@ pub struct DistributedDdlQueue {
/// Settings used in the DDL operation
pub settings: BTreeMap<String, String>,
/// Query created time
pub query_create_time: String,
pub query_create_time: DateTime<Utc>,
/// Hostname
pub host: Ipv6Addr,
/// Host Port
@@ -1007,9 +1008,9 @@ pub struct DistributedDdlQueue {
/// Exception message
pub exception_text: String,
/// Query finish time
pub query_finish_time: String,
pub query_finish_time: DateTime<Utc>,
/// Duration of query execution (in milliseconds)
pub query_duration_ms: String,
pub query_duration_ms: u64,
}

impl DistributedDdlQueue {
@@ -1036,6 +1037,7 @@ impl DistributedDdlQueue {
mod tests {
use camino::Utf8PathBuf;
use camino_tempfile::Builder;
use chrono::{DateTime, Utc};
use slog::{o, Drain};
use slog_term::{FullFormat, PlainDecorator, TestStdoutWriter};
use std::collections::BTreeMap;
@@ -1809,8 +1811,8 @@ snapshot_storage_disk=LocalSnapshotDisk
fn test_distributed_ddl_queries_parse_success() {
let log = log();
let data =
"{\"entry\":\"query-0000000000\",\"entry_version\":5,\"initiator_host\":\"ixchel\",\"initiator_port\":22001,\"cluster\":\"oximeter_cluster\",\"query\":\"CREATE DATABASE IF NOT EXISTS db1 UUID 'a49757e4-179e-42bd-866f-93ac43136e2d' ON CLUSTER oximeter_cluster\",\"settings\":{\"load_balancing\":\"random\"},\"query_create_time\":\"2024-11-01 16:16:45\",\"host\":\"::1\",\"port\":22001,\"status\":\"Finished\",\"exception_code\":0,\"exception_text\":\"\",\"query_finish_time\":\"2024-11-01 16:16:45\",\"query_duration_ms\":\"4\"}
{\"entry\":\"query-0000000000\",\"entry_version\":5,\"initiator_host\":\"ixchel\",\"initiator_port\":22001,\"cluster\":\"oximeter_cluster\",\"query\":\"CREATE DATABASE IF NOT EXISTS db1 UUID 'a49757e4-179e-42bd-866f-93ac43136e2d' ON CLUSTER oximeter_cluster\",\"settings\":{\"load_balancing\":\"random\"},\"query_create_time\":\"2024-11-01 16:16:45\",\"host\":\"::1\",\"port\":22002,\"status\":\"Finished\",\"exception_code\":0,\"exception_text\":\"\",\"query_finish_time\":\"2024-11-01 16:16:45\",\"query_duration_ms\":\"4\"}
"{\"entry\":\"query-0000000000\",\"entry_version\":5,\"initiator_host\":\"ixchel\",\"initiator_port\":22001,\"cluster\":\"oximeter_cluster\",\"query\":\"CREATE DATABASE IF NOT EXISTS db1 UUID 'a49757e4-179e-42bd-866f-93ac43136e2d' ON CLUSTER oximeter_cluster\",\"settings\":{\"load_balancing\":\"random\"},\"query_create_time\":\"2024-11-01T16:16:45Z\",\"host\":\"::1\",\"port\":22001,\"status\":\"Finished\",\"exception_code\":0,\"exception_text\":\"\",\"query_finish_time\":\"2024-11-01T16:16:45Z\",\"query_duration_ms\":4}
{\"entry\":\"query-0000000000\",\"entry_version\":5,\"initiator_host\":\"ixchel\",\"initiator_port\":22001,\"cluster\":\"oximeter_cluster\",\"query\":\"CREATE DATABASE IF NOT EXISTS db1 UUID 'a49757e4-179e-42bd-866f-93ac43136e2d' ON CLUSTER oximeter_cluster\",\"settings\":{\"load_balancing\":\"random\"},\"query_create_time\":\"2024-11-01T16:16:45Z\",\"host\":\"::1\",\"port\":22002,\"status\":\"Finished\",\"exception_code\":0,\"exception_text\":\"\",\"query_finish_time\":\"2024-11-01T16:16:45Z\",\"query_duration_ms\":4}
"
.as_bytes();
let ddl = DistributedDdlQueue::parse(&log, data).unwrap();
@@ -1826,14 +1828,14 @@ snapshot_storage_disk=LocalSnapshotDisk
settings: BTreeMap::from([
("load_balancing".to_string(), "random".to_string()),
]),
query_create_time: "2024-11-01 16:16:45".to_string(),
query_create_time: "2024-11-01T16:16:45Z".parse::<DateTime::<Utc>>().unwrap(),
host: Ipv6Addr::from_str("::1").unwrap(),
port: 22001,
exception_code: 0,
exception_text: "".to_string(),
status: "Finished".to_string(),
query_finish_time: "2024-11-01 16:16:45".to_string(),
query_duration_ms: "4".to_string(),
query_finish_time: "2024-11-01T16:16:45Z".parse::<DateTime::<Utc>>().unwrap(),
query_duration_ms: 4,
},
DistributedDdlQueue{
entry: "query-0000000000".to_string(),
@@ -1845,16 +1847,16 @@ snapshot_storage_disk=LocalSnapshotDisk
settings: BTreeMap::from([
("load_balancing".to_string(), "random".to_string()),
]),
query_create_time: "2024-11-01 16:16:45".to_string(),
query_create_time: "2024-11-01T16:16:45Z".parse::<DateTime::<Utc>>().unwrap(),
host: Ipv6Addr::from_str("::1").unwrap(),
port: 22002,
exception_code: 0,
exception_text: "".to_string(),
status: "Finished".to_string(),
query_finish_time: "2024-11-01 16:16:45".to_string(),
query_duration_ms: "4".to_string(),
query_finish_time: "2024-11-01T16:16:45Z".parse::<DateTime::<Utc>>().unwrap(),
query_duration_ms: 4,
},
];
assert!(ddl == expected_result);
}

@@ -1872,7 +1874,7 @@ snapshot_storage_disk=LocalSnapshotDisk
fn test_misshapen_distributed_ddl_queries_parse_fail() {
let log = log();
let data =
"{\"entry\":\"query-0000000000\",\"initiator_host\":\"ixchel\",\"initiator_port\":22001,\"cluster\":\"oximeter_cluster\",\"query\":\"CREATE DATABASE IF NOT EXISTS db1 UUID 'a49757e4-179e-42bd-866f-93ac43136e2d' ON CLUSTER oximeter_cluster\",\"settings\":{\"load_balancing\":\"random\"},\"query_create_time\":\"2024-11-01 16:16:45\",\"host\":\"::1\",\"port\":22001,\"status\":\"Finished\",\"exception_code\":0,\"exception_text\":\"\",\"query_finish_time\":\"2024-11-01 16:16:45\",\"query_duration_ms\":\"4\"}
"{\"entry\":\"query-0000000000\",\"initiator_host\":\"ixchel\",\"initiator_port\":22001,\"cluster\":\"oximeter_cluster\",\"query\":\"CREATE DATABASE IF NOT EXISTS db1 UUID 'a49757e4-179e-42bd-866f-93ac43136e2d' ON CLUSTER oximeter_cluster\",\"settings\":{\"load_balancing\":\"random\"},\"query_create_time\":\"2024-11-01T16:16:45Z\",\"host\":\"::1\",\"port\":22001,\"status\":\"Finished\",\"exception_code\":0,\"exception_text\":\"\",\"query_finish_time\":\"2024-11-01T16:16:45Z\",\"query_duration_ms\":4}
"
.as_bytes();
let result = DistributedDdlQueue::parse(&log, data);
30 changes: 0 additions & 30 deletions clients/sled-agent-client/src/lib.rs
@@ -391,36 +391,6 @@ impl From<omicron_common::api::internal::shared::NetworkInterfaceKind>
}
}

impl From<omicron_common::api::internal::shared::SledIdentifiers>
for types::SledIdentifiers
{
fn from(
value: omicron_common::api::internal::shared::SledIdentifiers,
) -> Self {
Self {
model: value.model,
rack_id: value.rack_id,
revision: value.revision,
serial: value.serial,
sled_id: value.sled_id,
}
}
}

impl From<types::SledIdentifiers>
for omicron_common::api::internal::shared::SledIdentifiers
{
fn from(value: types::SledIdentifiers) -> Self {
Self {
model: value.model,
rack_id: value.rack_id,
revision: value.revision,
serial: value.serial,
sled_id: value.sled_id,
}
}
}

/// Exposes additional [`Client`] interfaces for use by the test suite. These
/// are bonus endpoints, not generated in the real client.
#[async_trait]
15 changes: 6 additions & 9 deletions dev-tools/omdb/src/bin/omdb/db.rs
@@ -1299,10 +1299,10 @@ async fn lookup_project(
#[derive(Tabled)]
#[tabled(rename_all = "SCREAMING_SNAKE_CASE")]
struct DiskIdentity {
name: String,
id: Uuid,
size: String,
state: String,
name: String,
}

impl From<&'_ db::model::Disk> for DiskIdentity {
@@ -3325,17 +3325,14 @@ async fn cmd_db_instance_info(
println!("\n{:=<80}", "== ATTACHED DISKS ");

check_limit(&disks, fetch_opts.fetch_limit, ctx);
let table = if fetch_opts.include_deleted {
let mut table = if fetch_opts.include_deleted {
tabled::Table::new(disks.iter().map(MaybeDeletedDiskRow::from))
.with(tabled::settings::Style::empty())
.with(tabled::settings::Padding::new(0, 1, 0, 0))
.to_string()
} else {
tabled::Table::new(disks.iter().map(DiskRow::from))
.with(tabled::settings::Style::empty())
.with(tabled::settings::Padding::new(0, 1, 0, 0))
.to_string()
};
table
.with(tabled::settings::Style::empty())
.with(tabled::settings::Padding::new(0, 1, 0, 0));
println!("{table}");
}

@@ -3502,11 +3499,11 @@ struct VmmStateRow {
#[tabled(rename_all = "SCREAMING_SNAKE_CASE")]
struct CustomerInstanceRow {
id: String,
name: String,
state: String,
propolis_id: MaybePropolisId,
sled_id: MaybeSledId,
host_serial: String,
name: String,
}

/// Run `omdb db instances`: list data about customer VMs.
4 changes: 2 additions & 2 deletions dev-tools/omdb/tests/successes.out
@@ -2,7 +2,7 @@ EXECUTING COMMAND: omdb ["db", "disks", "list"]
termination: Exited(0)
---------------------------------------------
stdout:
NAME ID SIZE STATE ATTACHED_TO
ID SIZE STATE NAME ATTACHED_TO
---------------------------------------------
stderr:
note: using database URL postgresql://root@[::1]:REDACTED_PORT/omicron?sslmode=disable
@@ -52,7 +52,7 @@ EXECUTING COMMAND: omdb ["db", "instances"]
termination: Exited(0)
---------------------------------------------
stdout:
ID NAME STATE PROPOLIS_ID SLED_ID HOST_SERIAL
ID STATE PROPOLIS_ID SLED_ID HOST_SERIAL NAME
---------------------------------------------
stderr:
note: using database URL postgresql://root@[::1]:REDACTED_PORT/omicron?sslmode=disable
26 changes: 23 additions & 3 deletions nexus-config/src/nexus_config.rs
@@ -250,8 +250,12 @@ pub struct SchemaConfig {
/// Optional configuration for the timeseries database.
#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
pub struct TimeseriesDbConfig {
/// The HTTP address of the ClickHouse server.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub address: Option<SocketAddr>,
/// The native TCP address of the ClickHouse server.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub native_address: Option<SocketAddr>,
}

/// Configuration for the `Dendrite` dataplane daemon.
@@ -774,7 +778,9 @@ impl std::fmt::Display for SchemeName {
mod test {
use super::*;

use omicron_common::address::{Ipv6Subnet, RACK_PREFIX};
use omicron_common::address::{
Ipv6Subnet, CLICKHOUSE_HTTP_PORT, CLICKHOUSE_TCP_PORT, RACK_PREFIX,
};
use omicron_common::api::internal::shared::SwitchLocation;

use camino::{Utf8Path, Utf8PathBuf};
@@ -784,7 +790,7 @@ mod test {
use dropshot::ConfigLoggingLevel;
use std::collections::HashMap;
use std::fs;
use std::net::{Ipv6Addr, SocketAddr};
use std::net::{Ipv6Addr, SocketAddr, SocketAddrV6};
use std::str::FromStr;
use std::time::Duration;

@@ -889,6 +895,7 @@ mod test {
if_exists = "fail"
[timeseries_db]
address = "[::1]:8123"
native_address = "[::1]:9000"
[updates]
trusted_root = "/path/to/root.json"
[tunables]
@@ -1007,7 +1014,20 @@ mod test {
path: "/nonexistent/path".into()
},
timeseries_db: TimeseriesDbConfig {
address: Some("[::1]:8123".parse().unwrap())
address: Some(SocketAddr::V6(SocketAddrV6::new(
Ipv6Addr::LOCALHOST,
CLICKHOUSE_HTTP_PORT,
0,
0,
))),
native_address: Some(SocketAddr::V6(
SocketAddrV6::new(
Ipv6Addr::LOCALHOST,
CLICKHOUSE_TCP_PORT,
0,
0,
)
)),
},
updates: Some(UpdatesConfig {
trusted_root: Utf8PathBuf::from("/path/to/root.json"),
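
Both ClickHouse addresses are now optional under [timeseries_db], so a config that sets only address (or neither) still deserializes. A small sketch of the accepted shapes, assuming the struct is loaded with the toml crate and reusing the localhost addresses from the test above:

// Any of these parses into TimeseriesDbConfig; unset fields default to None.
let both: TimeseriesDbConfig = toml::from_str(
    "address = \"[::1]:8123\"\nnative_address = \"[::1]:9000\"",
).unwrap();
let http_only: TimeseriesDbConfig = toml::from_str("address = \"[::1]:8123\"").unwrap();
let neither: TimeseriesDbConfig = toml::from_str("").unwrap();
assert!(both.native_address.is_some());
assert!(http_only.native_address.is_none());
assert!(neither.address.is_none() && neither.native_address.is_none());
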
6 changes: 4 additions & 2 deletions nexus/src/app/metrics.rs
@@ -115,7 +115,8 @@ impl super::Nexus {
.timeseries_schema_list(&pagination.page, limit)
.await
.map_err(|e| match e {
oximeter_db::Error::DatabaseUnavailable(_) => {
oximeter_db::Error::DatabaseUnavailable(_)
| oximeter_db::Error::Connection(_) => {
Error::ServiceUnavailable {
internal_message: e.to_string(),
}
@@ -150,7 +151,8 @@ impl super::Nexus {
result.tables
})
.map_err(|e| match e {
oximeter_db::Error::DatabaseUnavailable(_) => {
oximeter_db::Error::DatabaseUnavailable(_)
| oximeter_db::Error::Connection(_) => {
Error::ServiceUnavailable {
internal_message: e.to_string(),
}
33 changes: 25 additions & 8 deletions nexus/src/app/mod.rs
@@ -24,6 +24,7 @@ use nexus_db_queries::authn;
use nexus_db_queries::authz;
use nexus_db_queries::context::OpContext;
use nexus_db_queries::db;
use omicron_common::address::CLICKHOUSE_HTTP_PORT;
use omicron_common::address::CLICKHOUSE_TCP_PORT;
use omicron_common::address::DENDRITE_PORT;
use omicron_common::address::MGD_PORT;
@@ -411,13 +412,12 @@ impl Nexus {
.map_err(|e| e.to_string())?;

// Client to the ClickHouse database.
let timeseries_client =
if let Some(http_address) = &config.pkg.timeseries_db.address {
let native_address =
SocketAddr::new(http_address.ip(), CLICKHOUSE_TCP_PORT);
oximeter_db::Client::new(*http_address, native_address, &log)
} else {
// TODO-cleanup: Remove this when we remove the HTTP client.
// TODO-cleanup: Simplify this when we remove the HTTP client.
let timeseries_client = match (
&config.pkg.timeseries_db.address,
&config.pkg.timeseries_db.native_address,
) {
(None, None) => {
let http_resolver =
qorb_resolver.for_service(ServiceName::Clickhouse);
let native_resolver =
@@ -427,7 +427,24 @@
native_resolver,
&log,
)
};
}
(maybe_http, maybe_native) => {
let (http_address, native_address) =
match (maybe_http, maybe_native) {
(None, None) => unreachable!("handled above"),
(None, Some(native)) => (
SocketAddr::new(native.ip(), CLICKHOUSE_HTTP_PORT),
*native,
),
(Some(http), None) => (
*http,
SocketAddr::new(http.ip(), CLICKHOUSE_TCP_PORT),
),
(Some(http), Some(native)) => (*http, *native),
};
oximeter_db::Client::new(http_address, native_address, &log)
}
};

// TODO-cleanup We may want to make the populator a first-class
// background task.
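
The match above picks the ClickHouse client's two addresses from the optional config fields: with neither set it falls back to DNS-based qorb resolvers, and with only one set it reuses that address's IP together with the conventional port for the missing protocol. A standalone sketch of just the defaulting rule (the helper name and inlined port values are illustrative; the real code uses the omicron_common::address constants, which the test config above pairs with 8123 for HTTP and 9000 for native TCP):

use std::net::SocketAddr;

// Illustrative stand-ins for omicron_common::address::{CLICKHOUSE_HTTP_PORT, CLICKHOUSE_TCP_PORT}.
const CLICKHOUSE_HTTP_PORT: u16 = 8123;
const CLICKHOUSE_TCP_PORT: u16 = 9000;

/// Returns None when neither address is configured (the DNS-resolver path);
/// otherwise fills in whichever address is missing from the other's IP.
fn clickhouse_addrs(
    http: Option<SocketAddr>,
    native: Option<SocketAddr>,
) -> Option<(SocketAddr, SocketAddr)> {
    match (http, native) {
        (None, None) => None,
        (Some(h), Some(n)) => Some((h, n)),
        (Some(h), None) => Some((h, SocketAddr::new(h.ip(), CLICKHOUSE_TCP_PORT))),
        (None, Some(n)) => Some((SocketAddr::new(n.ip(), CLICKHOUSE_HTTP_PORT), n)),
    }
}
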
3 changes: 2 additions & 1 deletion nexus/src/app/oximeter.rs
@@ -241,7 +241,8 @@ pub(crate) async fn unassign_producer(

fn map_oximeter_err(error: oximeter_db::Error) -> Error {
match error {
oximeter_db::Error::DatabaseUnavailable(_) => {
oximeter_db::Error::DatabaseUnavailable(_)
| oximeter_db::Error::Connection(_) => {
Error::ServiceUnavailable { internal_message: error.to_string() }
}
_ => Error::InternalError { internal_message: error.to_string() },