Skip to content

Commit

Permalink
[clickhouse] Retrieve distributed ddl queue info (#6986)
Browse files Browse the repository at this point in the history
## Overview

This commit implements a new clickhouse-admin endpoint to retrieve and
parse information from
[`system.distributed_ddl_queue`](https://clickhouse.com/docs/en/operations/system-tables/distributed_ddl_queue).

## Purpose

As part of [stage 1](https://rfd.shared.oxide.computer/rfd/0468) of
rolling out replicated ClickHouse, we'll need to monitor the replicated
cluster that is installed but not yet visible on our dogfood rack. This
endpoint will aid in said monitoring by providing information about
distributed DDL queries.

## Testing

```console
$ cargo run --bin=clickhouse-admin-server -- run -c ./smf/clickhouse-admin-server/config.toml -a [::1]:8888 -l [::1]:22001 -b /Users/karcar/src/omicron/out/clickhouse/clickhouse
   Compiling omicron-clickhouse-admin v0.1.0 (/Users/karcar/src/omicron/clickhouse-admin)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 3.79s
     Running `target/debug/clickhouse-admin-server run -c ./smf/clickhouse-admin-server/config.toml -a '[::1]:8888' -l '[::1]:22001' -b /Users/karcar/src/omicron/out/clickhouse/clickhouse`
note: configured to log to "/dev/stdout"
{"msg":"listening","v":0,"name":"clickhouse-admin-server","level":30,"time":"2024-11-04T07:29:41.746887Z","hostname":"ixchel","pid":55286,"local_addr":"[::1]:8888","component":"dropshot","file":"/Users/karcar/.cargo/registry/src/index.crates.io-6f17d22bba15001f/dropshot-0.12.0/src/server.rs:197"}
{"msg":"accepted connection","v":0,"name":"clickhouse-admin-server","level":30,"time":"2024-11-04T07:29:55.674044Z","hostname":"ixchel","pid":55286,"local_addr":"[::1]:8888","component":"dropshot","file":"/Users/karcar/.cargo/registry/src/index.crates.io-6f17d22bba15001f/dropshot-0.12.0/src/server.rs:1105","remote_addr":"[::1]:59735"}
{"msg":"Retrieved data from `system.distributed_ddl_queue`","v":0,"name":"clickhouse-admin-server","level":30,"time":"2024-11-04T07:29:56.148241Z","hostname":"ixchel","pid":55286,"component":"ClickhouseCli","file":"clickhouse-admin/types/src/lib.rs:1018","output":"\"{\\\"entry\\\":\\\"query-0000000001\\\",\\\"entry_version\\\":5,\\\"initiator_host\\\":\\\"ixchel\\\",\\\"initiator_port\\\":22001,\\\"cluster\\\":\\\"oximeter_cluster\\\",\\\"query\\\":\\\"CREATE DATABASE IF NOT EXISTS db1 UUID '701a3dd3-10f0-4f5d-b5b2-0ad11bcf2b17' ON CLUSTER oximeter_cluster\\\",\\\"settings\\\":{\\\"load_balancing\\\":\\\"random\\\"},\\\"query_create_time\\\":\\\"2024-11-01 16:17:08\\\",\\\"host\\\":\\\"::1\\\",\\\"port\\\":22001,\\\"status\\\":\\\"Finished\\\",\\\"exception_code\\\":0,\\\"exception_text\\\":\\\"\\\",\\\"query_finish_time\\\":\\\"2024-11-01 16:17:08\\\",\\\"query_duration_ms\\\":\\\"3\\\"}\\n{\\\"entry\\\":\\\"query-0000000001\\\",\\\"entry_version\\\":5,\\\"initiator_host\\\":\\\"ixchel\\\",\\\"initiator_port\\\":22001,\\\"cluster\\\":\\\"oximeter_cluster\\\",\\\"query\\\":\\\"CREATE DATABASE IF NOT EXISTS db1 UUID '701a3dd3-10f0-4f5d-b5b2-0ad11bcf2b17' ON CLUSTER oximeter_cluster\\\",\\\"settings\\\":{\\\"load_balancing\\\":\\\"random\\\"},\\\"query_create_time\\\":\\\"2024-11-01 16:17:08\\\",\\\"host\\\":\\\"::1\\\",\\\"port\\\":22002,\\\"status\\\":\\\"Finished\\\",\\\"exception_code\\\":0,\\\"exception_text\\\":\\\"\\\",\\\"query_finish_time\\\":\\\"2024-11-01 16:17:08\\\",\\\"query_duration_ms\\\":\\\"4\\\"}\\n{\\\"entry\\\":\\\"query-0000000000\\\",\\\"entry_version\\\":5,\\\"initiator_host\\\":\\\"ixchel\\\",\\\"initiator_port\\\":22001,\\\"cluster\\\":\\\"oximeter_cluster\\\",\\\"query\\\":\\\"CREATE DATABASE IF NOT EXISTS db1 UUID 'a49757e4-179e-42bd-866f-93ac43136e2d' ON CLUSTER oximeter_cluster\\\",\\\"settings\\\":{\\\"load_balancing\\\":\\\"random\\\",\\\"output_format_json_quote_64bit_integers\\\":\\\"0\\\"},\\\"query_create_time\\\":\\\"2024-11-01 
16:16:45\\\",\\\"host\\\":\\\"::1\\\",\\\"port\\\":22002,\\\"status\\\":\\\"Finished\\\",\\\"exception_code\\\":0,\\\"exception_text\\\":\\\"\\\",\\\"query_finish_time\\\":\\\"2024-11-01 16:16:45\\\",\\\"query_duration_ms\\\":\\\"4\\\"}\\n{\\\"entry\\\":\\\"query-0000000000\\\",\\\"entry_version\\\":5,\\\"initiator_host\\\":\\\"ixchel\\\",\\\"initiator_port\\\":22001,\\\"cluster\\\":\\\"oximeter_cluster\\\",\\\"query\\\":\\\"CREATE DATABASE IF NOT EXISTS db1 UUID 'a49757e4-179e-42bd-866f-93ac43136e2d' ON CLUSTER oximeter_cluster\\\",\\\"settings\\\":{\\\"load_balancing\\\":\\\"random\\\",\\\"output_format_json_quote_64bit_integers\\\":\\\"0\\\"},\\\"query_create_time\\\":\\\"2024-11-01 16:16:45\\\",\\\"host\\\":\\\"::1\\\",\\\"port\\\":22001,\\\"status\\\":\\\"Finished\\\",\\\"exception_code\\\":0,\\\"exception_text\\\":\\\"\\\",\\\"query_finish_time\\\":\\\"2024-11-01 16:16:45\\\",\\\"query_duration_ms\\\":\\\"4\\\"}\\n\""}
{"msg":"request completed","v":0,"name":"clickhouse-admin-server","level":30,"time":"2024-11-04T07:29:56.148459Z","hostname":"ixchel","pid":55286,"uri":"/distributed-ddl-queue","method":"GET","req_id":"fb46b182-2573-4daa-a791-118dad20a3c3","remote_addr":"[::1]:59735","local_addr":"[::1]:8888","component":"dropshot","file":"/Users/karcar/.cargo/registry/src/index.crates.io-6f17d22bba15001f/dropshot-0.12.0/src/server.rs:950","latency_us":473916,"response_code":"200"}
```

```console
$ curl http://[::1]:8888/distributed-ddl-queue
[{"entry":"query-0000000001","entry_version":5,"initiator_host":"ixchel","initiator_port":22001,"cluster":"oximeter_cluster","query":"CREATE DATABASE IF NOT EXISTS db1 UUID '701a3dd3-10f0-4f5d-b5b2-0ad11bcf2b17' ON CLUSTER oximeter_cluster","settings":{"load_balancing":"random"},"query_create_time":"2024-11-01 16:17:08","host":"::1","port":22001,"status":"Finished","exception_code":0,"exception_text":"","query_finish_time":"2024-11-01 16:17:08","query_duration_ms":"3"},{"entry":"query-0000000001","entry_version":5,"initiator_host":"ixchel","initiator_port":22001,"cluster":"oximeter_cluster","query":"CREATE DATABASE IF NOT EXISTS db1 UUID '701a3dd3-10f0-4f5d-b5b2-0ad11bcf2b17' ON CLUSTER oximeter_cluster","settings":{"load_balancing":"random"},"query_create_time":"2024-11-01 16:17:08","host":"::1","port":22002,"status":"Finished","exception_code":0,"exception_text":"","query_finish_time":"2024-11-01 16:17:08","query_duration_ms":"4"},{"entry":"query-0000000000","entry_version":5,"initiator_host":"ixchel","initiator_port":22001,"cluster":"oximeter_cluster","query":"CREATE DATABASE IF NOT EXISTS db1 UUID 'a49757e4-179e-42bd-866f-93ac43136e2d' ON CLUSTER oximeter_cluster","settings":{"load_balancing":"random","output_format_json_quote_64bit_integers":"0"},"query_create_time":"2024-11-01 16:16:45","host":"::1","port":22002,"status":"Finished","exception_code":0,"exception_text":"","query_finish_time":"2024-11-01 16:16:45","query_duration_ms":"4"},{"entry":"query-0000000000","entry_version":5,"initiator_host":"ixchel","initiator_port":22001,"cluster":"oximeter_cluster","query":"CREATE DATABASE IF NOT EXISTS db1 UUID 'a49757e4-179e-42bd-866f-93ac43136e2d' ON CLUSTER oximeter_cluster","settings":{"load_balancing":"random","output_format_json_quote_64bit_integers":"0"},"query_create_time":"2024-11-01 16:16:45","host":"::1","port":22001,"status":"Finished","exception_code":0,"exception_text":"","query_finish_time":"2024-11-01 16:16:45","query_duration_ms":"4"}]
```

### Caveat

I have purposely not written an integration test, as the port situation
is getting out of hand 😅 I'm working on a better implementation of
running these integration tests that involves fewer hardcoded ports. I
will include an integration test for this endpoint in that PR.

Related: #6953
  • Loading branch information
karencfv authored Nov 6, 2024
1 parent 15b4c45 commit 76bff3f
Show file tree
Hide file tree
Showing 6 changed files with 344 additions and 14 deletions.
14 changes: 12 additions & 2 deletions clickhouse-admin/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

use clickhouse_admin_types::{
ClickhouseKeeperClusterMembership, KeeperConf, KeeperConfig,
KeeperConfigurableSettings, Lgif, RaftConfig, ReplicaConfig,
ClickhouseKeeperClusterMembership, DistributedDdlQueue, KeeperConf,
KeeperConfig, KeeperConfigurableSettings, Lgif, RaftConfig, ReplicaConfig,
ServerConfigurableSettings,
};
use dropshot::{
Expand Down Expand Up @@ -105,4 +105,14 @@ pub trait ClickhouseAdminServerApi {
rqctx: RequestContext<Self::Context>,
body: TypedBody<ServerConfigurableSettings>,
) -> Result<HttpResponseCreated<ReplicaConfig>, HttpError>;

/// Contains information about distributed ddl queries (ON CLUSTER clause)
/// that were executed on a cluster.
///
/// Backed by ClickHouse's `system.distributed_ddl_queue` system table; the
/// sample output shows one entry per (query, host) pair, so a single DDL
/// statement may appear once for each replica it ran on.
#[endpoint {
    method = GET,
    path = "/distributed-ddl-queue",
}]
async fn distributed_ddl_queue(
    rqctx: RequestContext<Self::Context>,
) -> Result<HttpResponseOk<Vec<DistributedDdlQueue>>, HttpError>;
}
50 changes: 44 additions & 6 deletions clickhouse-admin/src/clickhouse_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@
use anyhow::Result;
use camino::Utf8PathBuf;
use clickhouse_admin_types::{
ClickhouseKeeperClusterMembership, KeeperConf, KeeperId, Lgif, RaftConfig,
ClickhouseKeeperClusterMembership, DistributedDdlQueue, KeeperConf,
KeeperId, Lgif, RaftConfig, OXIMETER_CLUSTER,
};
use dropshot::HttpError;
use illumos_utils::{output_to_exec_error, ExecutionError};
use slog::Logger;
use slog_error_chain::{InlineErrorChain, SlogInlineError};
use std::collections::BTreeSet;
use std::ffi::OsStr;
use std::fmt::Display;
use std::io;
use std::net::SocketAddrV6;
use tokio::process::Command;
Expand Down Expand Up @@ -56,6 +58,21 @@ impl From<ClickhouseCliError> for HttpError {
}
}

/// Selects which non-interactive ClickHouse CLI subcommand to invoke: the
/// server client (`client`) or the keeper client (`keeper-client`).
enum ClickhouseClientType {
    Server,
    Keeper,
}

impl Display for ClickhouseClientType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Render the exact subcommand string passed to the clickhouse binary.
        f.write_str(match self {
            ClickhouseClientType::Server => "client",
            ClickhouseClientType::Keeper => "keeper-client",
        })
    }
}

#[derive(Debug)]
pub struct ClickhouseCli {
/// Path to where the clickhouse binary is located
Expand All @@ -76,7 +93,8 @@ impl ClickhouseCli {
}

pub async fn lgif(&self) -> Result<Lgif, ClickhouseCliError> {
self.keeper_client_non_interactive(
self.client_non_interactive(
ClickhouseClientType::Keeper,
"lgif",
"Retrieve logically grouped information file",
Lgif::parse,
Expand All @@ -86,7 +104,8 @@ impl ClickhouseCli {
}

pub async fn raft_config(&self) -> Result<RaftConfig, ClickhouseCliError> {
self.keeper_client_non_interactive(
self.client_non_interactive(
ClickhouseClientType::Keeper,
"get /keeper/config",
"Retrieve raft configuration information",
RaftConfig::parse,
Expand All @@ -96,7 +115,8 @@ impl ClickhouseCli {
}

pub async fn keeper_conf(&self) -> Result<KeeperConf, ClickhouseCliError> {
self.keeper_client_non_interactive(
self.client_non_interactive(
ClickhouseClientType::Keeper,
"conf",
"Retrieve keeper node configuration information",
KeeperConf::parse,
Expand All @@ -121,8 +141,26 @@ impl ClickhouseCli {
})
}

async fn keeper_client_non_interactive<F, T>(
/// Retrieves entries from `system.distributed_ddl_queue`: distributed ddl
/// queries (ON CLUSTER clause) that were executed on the oximeter cluster.
pub async fn distributed_ddl_queue(
    &self,
) -> Result<Vec<DistributedDdlQueue>, ClickhouseCliError> {
    self.client_non_interactive(
        // This queries the server (not the keeper), so use the `client`
        // subcommand.
        ClickhouseClientType::Server,
        format!(
            "SELECT * FROM system.distributed_ddl_queue WHERE cluster = '{}' FORMAT JSONEachRow",
            OXIMETER_CLUSTER
        ).as_str(),
        "Retrieve information about distributed ddl queries (ON CLUSTER clause)
        that were executed on a cluster",
        DistributedDdlQueue::parse,
        // NOTE(review): assumes the logger was set before any request is
        // served; `unwrap` panics otherwise — confirm initialization order.
        self.log.clone().unwrap(),
    )
    .await
}

async fn client_non_interactive<F, T>(
&self,
client: ClickhouseClientType,
query: &str,
subcommand_description: &'static str,
parse: F,
Expand All @@ -133,7 +171,7 @@ impl ClickhouseCli {
{
let mut command = Command::new(&self.binary_path);
command
.arg("keeper-client")
.arg(client.to_string())
.arg("--host")
.arg(&format!("[{}]", self.listen_address.ip()))
.arg("--port")
Expand Down
12 changes: 10 additions & 2 deletions clickhouse-admin/src/http_entrypoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
use crate::context::ServerContext;
use clickhouse_admin_api::*;
use clickhouse_admin_types::{
ClickhouseKeeperClusterMembership, KeeperConf, KeeperConfig,
KeeperConfigurableSettings, Lgif, RaftConfig, ReplicaConfig,
ClickhouseKeeperClusterMembership, DistributedDdlQueue, KeeperConf,
KeeperConfig, KeeperConfigurableSettings, Lgif, RaftConfig, ReplicaConfig,
ServerConfigurableSettings,
};
use dropshot::{
Expand Down Expand Up @@ -47,6 +47,14 @@ impl ClickhouseAdminServerApi for ClickhouseAdminServerImpl {

Ok(HttpResponseCreated(output))
}

/// Endpoint handler: returns the parsed contents of
/// `system.distributed_ddl_queue` for the cluster.
async fn distributed_ddl_queue(
    rqctx: RequestContext<Self::Context>,
) -> Result<HttpResponseOk<Vec<DistributedDdlQueue>>, HttpError> {
    let queue =
        rqctx.context().clickhouse_cli().distributed_ddl_queue().await?;
    Ok(HttpResponseOk(queue))
}
}

enum ClickhouseAdminKeeperImpl {}
Expand Down
157 changes: 153 additions & 4 deletions clickhouse-admin/types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use schemars::{
};
use serde::{Deserialize, Serialize};
use slog::{info, Logger};
use std::collections::BTreeSet;
use std::collections::{BTreeMap, BTreeSet};
use std::fs::create_dir;
use std::io::{ErrorKind, Write};
use std::net::Ipv6Addr;
Expand Down Expand Up @@ -965,19 +965,87 @@ pub struct ClickhouseKeeperClusterMembership {
pub raft_config: BTreeSet<KeeperId>,
}

#[derive(
    Clone,
    Debug,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Deserialize,
    Serialize,
    JsonSchema,
)]
#[serde(rename_all = "snake_case")]
/// Contains information about distributed ddl queries (ON CLUSTER clause) that were
/// executed on a cluster.
///
/// Each value mirrors one row of ClickHouse's `system.distributed_ddl_queue`
/// system table, deserialized from `FORMAT JSONEachRow` output.
pub struct DistributedDdlQueue {
    /// Query id
    pub entry: String,
    /// Version of the entry
    pub entry_version: u64,
    /// Host that initiated the DDL operation
    pub initiator_host: String,
    /// Port used by the initiator
    pub initiator_port: u16,
    /// Cluster name
    pub cluster: String,
    /// Query executed
    pub query: String,
    /// Settings used in the DDL operation
    pub settings: BTreeMap<String, String>,
    /// Query created time
    pub query_create_time: String,
    /// IPv6 address of the host the query ran on (ClickHouse reports this
    /// field as `host`)
    pub host: Ipv6Addr,
    /// Host Port
    pub port: u16,
    /// Status of the query
    pub status: String,
    /// Exception code
    pub exception_code: u64,
    /// Exception message (empty when the query succeeded)
    pub exception_text: String,
    /// Query finish time
    pub query_finish_time: String,
    /// Duration of query execution (in milliseconds); ClickHouse emits this
    /// 64-bit value as a JSON string in the sample output, hence `String`
    pub query_duration_ms: String,
}

impl DistributedDdlQueue {
    /// Parses `FORMAT JSONEachRow` output from a query against
    /// `system.distributed_ddl_queue` into a list of entries.
    ///
    /// Each non-blank line of `data` must be a single JSON object matching
    /// the [`DistributedDdlQueue`] schema; whitespace-only lines (e.g. a
    /// stray trailing newline) are skipped.
    ///
    /// # Errors
    ///
    /// Returns an error if any non-blank line fails to deserialize.
    pub fn parse(log: &Logger, data: &[u8]) -> Result<Vec<Self>> {
        let s = String::from_utf8_lossy(data);
        info!(
            log,
            "Retrieved data from `system.distributed_ddl_queue`";
            "output" => ?s
        );

        let mut ddl = vec![];

        for line in s.lines() {
            // Robustness: skip whitespace-only lines rather than failing to
            // deserialize them as JSON.
            if line.trim().is_empty() {
                continue;
            }
            let item: DistributedDdlQueue = serde_json::from_str(line)?;
            ddl.push(item);
        }

        Ok(ddl)
    }
}

#[cfg(test)]
mod tests {
use camino::Utf8PathBuf;
use camino_tempfile::Builder;
use slog::{o, Drain};
use slog_term::{FullFormat, PlainDecorator, TestStdoutWriter};
use std::collections::BTreeMap;
use std::net::{Ipv4Addr, Ipv6Addr};
use std::str::FromStr;

use crate::{
ClickhouseHost, KeeperConf, KeeperId, KeeperServerInfo,
KeeperServerType, KeeperSettings, Lgif, LogLevel, RaftConfig,
RaftServerSettings, ServerId, ServerSettings,
ClickhouseHost, DistributedDdlQueue, KeeperConf, KeeperId,
KeeperServerInfo, KeeperServerType, KeeperSettings, Lgif, LogLevel,
RaftConfig, RaftServerSettings, ServerId, ServerSettings,
};

fn log() -> slog::Logger {
Expand Down Expand Up @@ -1736,4 +1804,85 @@ snapshot_storage_disk=LocalSnapshotDisk
"Extracted key `\"session_timeout_fake\"` from output differs from expected key `session_timeout_ms`"
);
}

#[test]
fn test_distributed_ddl_queries_parse_success() {
    let log = log();
    // Two JSONEachRow lines for the same query, one per replica host/port.
    let data =
        "{\"entry\":\"query-0000000000\",\"entry_version\":5,\"initiator_host\":\"ixchel\",\"initiator_port\":22001,\"cluster\":\"oximeter_cluster\",\"query\":\"CREATE DATABASE IF NOT EXISTS db1 UUID 'a49757e4-179e-42bd-866f-93ac43136e2d' ON CLUSTER oximeter_cluster\",\"settings\":{\"load_balancing\":\"random\"},\"query_create_time\":\"2024-11-01 16:16:45\",\"host\":\"::1\",\"port\":22001,\"status\":\"Finished\",\"exception_code\":0,\"exception_text\":\"\",\"query_finish_time\":\"2024-11-01 16:16:45\",\"query_duration_ms\":\"4\"}
{\"entry\":\"query-0000000000\",\"entry_version\":5,\"initiator_host\":\"ixchel\",\"initiator_port\":22001,\"cluster\":\"oximeter_cluster\",\"query\":\"CREATE DATABASE IF NOT EXISTS db1 UUID 'a49757e4-179e-42bd-866f-93ac43136e2d' ON CLUSTER oximeter_cluster\",\"settings\":{\"load_balancing\":\"random\"},\"query_create_time\":\"2024-11-01 16:16:45\",\"host\":\"::1\",\"port\":22002,\"status\":\"Finished\",\"exception_code\":0,\"exception_text\":\"\",\"query_finish_time\":\"2024-11-01 16:16:45\",\"query_duration_ms\":\"4\"}
"
        .as_bytes();
    let ddl = DistributedDdlQueue::parse(&log, data).unwrap();

    let expected_result = vec![
        DistributedDdlQueue{
            entry: "query-0000000000".to_string(),
            entry_version: 5,
            initiator_host: "ixchel".to_string(),
            initiator_port: 22001,
            cluster: "oximeter_cluster".to_string(),
            query: "CREATE DATABASE IF NOT EXISTS db1 UUID 'a49757e4-179e-42bd-866f-93ac43136e2d' ON CLUSTER oximeter_cluster".to_string(),
            settings: BTreeMap::from([
                ("load_balancing".to_string(), "random".to_string()),
            ]),
            query_create_time: "2024-11-01 16:16:45".to_string(),
            host: Ipv6Addr::from_str("::1").unwrap(),
            port: 22001,
            exception_code: 0,
            exception_text: "".to_string(),
            status: "Finished".to_string(),
            query_finish_time: "2024-11-01 16:16:45".to_string(),
            query_duration_ms: "4".to_string(),
        },
        DistributedDdlQueue{
            entry: "query-0000000000".to_string(),
            entry_version: 5,
            initiator_host: "ixchel".to_string(),
            initiator_port: 22001,
            cluster: "oximeter_cluster".to_string(),
            query: "CREATE DATABASE IF NOT EXISTS db1 UUID 'a49757e4-179e-42bd-866f-93ac43136e2d' ON CLUSTER oximeter_cluster".to_string(),
            settings: BTreeMap::from([
                ("load_balancing".to_string(), "random".to_string()),
            ]),
            query_create_time: "2024-11-01 16:16:45".to_string(),
            host: Ipv6Addr::from_str("::1").unwrap(),
            port: 22002,
            exception_code: 0,
            exception_text: "".to_string(),
            status: "Finished".to_string(),
            query_finish_time: "2024-11-01 16:16:45".to_string(),
            query_duration_ms: "4".to_string(),
        },
    ];
    // `assert_eq!` (rather than `assert!(a == b)`) prints both sides on
    // failure, which matters with payloads this large.
    assert_eq!(ddl, expected_result);
}

#[test]
fn test_empty_distributed_ddl_queries_parse_success() {
    let log = log();
    // An empty queue produces no output at all; parsing must yield an
    // empty list rather than an error.
    let data = "".as_bytes();
    let ddl = DistributedDdlQueue::parse(&log, data).unwrap();

    assert!(ddl.is_empty());
}

#[test]
fn test_misshapen_distributed_ddl_queries_parse_fail() {
    let log = log();
    // This row omits the required `entry_version` field, so
    // deserialization must fail.
    let data =
        "{\"entry\":\"query-0000000000\",\"initiator_host\":\"ixchel\",\"initiator_port\":22001,\"cluster\":\"oximeter_cluster\",\"query\":\"CREATE DATABASE IF NOT EXISTS db1 UUID 'a49757e4-179e-42bd-866f-93ac43136e2d' ON CLUSTER oximeter_cluster\",\"settings\":{\"load_balancing\":\"random\"},\"query_create_time\":\"2024-11-01 16:16:45\",\"host\":\"::1\",\"port\":22001,\"status\":\"Finished\",\"exception_code\":0,\"exception_text\":\"\",\"query_finish_time\":\"2024-11-01 16:16:45\",\"query_duration_ms\":\"4\"}
"
        .as_bytes();
    let result = DistributedDdlQueue::parse(&log, data);

    let error = result.unwrap_err();
    let root_cause = error.root_cause();

    assert_eq!(
        // Idiom: `to_string()` over `format!("{}", ...)`.
        root_cause.to_string(),
        "missing field `entry_version` at line 1 column 454",
    );
}
}
1 change: 1 addition & 0 deletions clients/clickhouse-admin-server-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

//! Interface for making API requests to a clickhouse-admin-server server
//! running in an omicron zone.
use std::clone::Clone;

progenitor::generate_api!(
spec = "../../openapi/clickhouse-admin-server.json",
Expand Down
Loading

0 comments on commit 76bff3f

Please sign in to comment.