Skip to content

Commit

Permalink
Minimize test replicated schema and add test
Browse files Browse the repository at this point in the history
Test covers that the schema files for a replicated setup are disjoint.
  • Loading branch information
andrewjstone committed Jul 31, 2024
1 parent 29bd934 commit c5e293f
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 57 deletions.
76 changes: 20 additions & 56 deletions oximeter/db/schema/replicated/db-init-1.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,23 @@
/* We purposefully split our DB schema into two *disjoint* files:
* `db-init-1.sql` and `db-init-2.sql`. The purpose of this split is to shorten
* the duration of our replicated tests. These tests only use a subset of the
* tables defined in the full schema. We put the tables used by the replicated
* tests in `db-init-1.sql`, and the remainder of the tables in `db-init-2.sql`.
* This minimizes test time by reducing the cost to load a schema. In
* production, we load `db-init-1.sql` followed by `db-init-2.sql` so we have
* the full schema. If we end up needing to use more tables in replicated tests
* we can go ahead and move them into `db-init-1.sql`, removing them from
* `db-init-2.sql`. Conversely, if we stop using given tables in our tests we
* can move them from `db-init-1.sql` into `db-init-2.sql` and keep our test
* times minimal.
* The reason to keep the two files disjoint is so that we don't have to
* maintain consistency between table definitions. All tables are defined
* once. However, in order to write a test that ensures the tables are in fact
* disjoint, we must create the `oximeter` database in both files so we can load
* them in isolation.
*/

CREATE DATABASE IF NOT EXISTS oximeter ON CLUSTER oximeter_cluster;

/* The version table contains metadata about the `oximeter` database */
Expand All @@ -19,35 +39,6 @@ ORDER BY (value, timestamp);
* This reflects that one usually looks up the _key_ in one or more field table,
* and then uses that to index quickly into the measurements tables.
*/
CREATE TABLE IF NOT EXISTS oximeter.measurements_i64_local ON CLUSTER oximeter_cluster
(
timeseries_name String,
timeseries_key UInt64,
timestamp DateTime64(9, 'UTC'),
datum Nullable(Int64)
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_i64_local', '{replica}')
ORDER BY (timeseries_name, timeseries_key, timestamp)
TTL toDateTime(timestamp) + INTERVAL 30 DAY;

CREATE TABLE IF NOT EXISTS oximeter.measurements_i64 ON CLUSTER oximeter_cluster
AS oximeter.measurements_i64_local
ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_i64_local', xxHash64(splitByChar(':', timeseries_name)[1]));

CREATE TABLE IF NOT EXISTS oximeter.measurements_f64_local ON CLUSTER oximeter_cluster
(
timeseries_name String,
timeseries_key UInt64,
timestamp DateTime64(9, 'UTC'),
datum Nullable(Float64)
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_f64_local', '{replica}')
ORDER BY (timeseries_name, timeseries_key, timestamp)
TTL toDateTime(timestamp) + INTERVAL 30 DAY;

CREATE TABLE IF NOT EXISTS oximeter.measurements_f64 ON CLUSTER oximeter_cluster
AS oximeter.measurements_f64_local
ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_f64_local', xxHash64(splitByChar(':', timeseries_name)[1]));

CREATE TABLE IF NOT EXISTS oximeter.measurements_cumulativef64_local ON CLUSTER oximeter_cluster
(
Expand Down Expand Up @@ -96,33 +87,6 @@ CREATE TABLE IF NOT EXISTS oximeter.fields_i64 ON CLUSTER oximeter_cluster
AS oximeter.fields_i64_local
ENGINE = Distributed('oximeter_cluster', 'oximeter', 'fields_i64_local', xxHash64(splitByChar(':', timeseries_name)[1]));

CREATE TABLE IF NOT EXISTS oximeter.fields_ipaddr_local ON CLUSTER oximeter_cluster
(
timeseries_name String,
timeseries_key UInt64,
field_name String,
field_value IPv6
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/fields_ipaddr_local', '{replica}')
ORDER BY (timeseries_name, field_name, field_value, timeseries_key);

CREATE TABLE IF NOT EXISTS oximeter.fields_ipaddr ON CLUSTER oximeter_cluster
AS oximeter.fields_ipaddr_local
ENGINE = Distributed('oximeter_cluster', 'oximeter', 'fields_ipaddr_local', xxHash64(splitByChar(':', timeseries_name)[1]));

CREATE TABLE IF NOT EXISTS oximeter.fields_string_local ON CLUSTER oximeter_cluster
(
timeseries_name String,
timeseries_key UInt64,
field_name String,
field_value String
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/fields_string_local', '{replica}')
ORDER BY (timeseries_name, field_name, field_value, timeseries_key);

CREATE TABLE IF NOT EXISTS oximeter.fields_string ON CLUSTER oximeter_cluster
AS oximeter.fields_string_local
ENGINE = Distributed('oximeter_cluster', 'oximeter', 'fields_string_local', xxHash64(splitByChar(':', timeseries_name)[1]));

CREATE TABLE IF NOT EXISTS oximeter.fields_uuid_local ON CLUSTER oximeter_cluster
(
Expand Down
80 changes: 80 additions & 0 deletions oximeter/db/schema/replicated/db-init-2.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,25 @@
/* We purposefully split our DB schema into two *disjoint* files:
* `db-init-1.sql` and `db-init-2.sql`. The purpose of this split is to shorten
* the duration of our replicated tests. These tests only use a subset of the
* tables defined in the full schema. We put the tables used by the replicated
* tests in `db-init-1.sql`, and the remainder of the tables in `db-init-2.sql`.
* This minimizes test time by reducing the cost to load a schema. In
* production, we load `db-init-1.sql` followed by `db-init-2.sql` so we have
* the full schema. If we end up needing to use more tables in replicated tests
* we can go ahead and move them into `db-init-1.sql`, removing them from
* `db-init-2.sql`. Conversely, if we stop using given tables in our tests we
* can move them from `db-init-1.sql` into `db-init-2.sql` and keep our test
* times minimal.
* The reason to keep the two files disjoint is so that we don't have to
* maintain consistency between table definitions. All tables are defined
* once. However, in order to write a test that ensures the tables are in fact
* disjoint, we must create the `oximeter` database in both files so we can load
* them in isolation.
*/

CREATE DATABASE IF NOT EXISTS oximeter ON CLUSTER oximeter_cluster;

/* The measurement tables contain all individual samples from each timeseries.
*
* Each table stores a single datum type, and otherwise contains nearly the same
Expand Down Expand Up @@ -113,6 +135,21 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_u32 ON CLUSTER oximeter_cluster
AS oximeter.measurements_u32_local
ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_u32_local', xxHash64(splitByChar(':', timeseries_name)[1]));

CREATE TABLE IF NOT EXISTS oximeter.measurements_i64_local ON CLUSTER oximeter_cluster
(
timeseries_name String,
timeseries_key UInt64,
timestamp DateTime64(9, 'UTC'),
datum Nullable(Int64)
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_i64_local', '{replica}')
ORDER BY (timeseries_name, timeseries_key, timestamp)
TTL toDateTime(timestamp) + INTERVAL 30 DAY;

CREATE TABLE IF NOT EXISTS oximeter.measurements_i64 ON CLUSTER oximeter_cluster
AS oximeter.measurements_i64_local
ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_i64_local', xxHash64(splitByChar(':', timeseries_name)[1]));

CREATE TABLE IF NOT EXISTS oximeter.measurements_u64_local ON CLUSTER oximeter_cluster
(
timeseries_name String,
Expand Down Expand Up @@ -143,6 +180,21 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_f32 ON CLUSTER oximeter_cluster
AS oximeter.measurements_f32_local
ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_f32_local', xxHash64(splitByChar(':', timeseries_name)[1]));

CREATE TABLE IF NOT EXISTS oximeter.measurements_f64_local ON CLUSTER oximeter_cluster
(
timeseries_name String,
timeseries_key UInt64,
timestamp DateTime64(9, 'UTC'),
datum Nullable(Float64)
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_f64_local', '{replica}')
ORDER BY (timeseries_name, timeseries_key, timestamp)
TTL toDateTime(timestamp) + INTERVAL 30 DAY;

CREATE TABLE IF NOT EXISTS oximeter.measurements_f64 ON CLUSTER oximeter_cluster
AS oximeter.measurements_f64_local
ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_f64_local', xxHash64(splitByChar(':', timeseries_name)[1]));

CREATE TABLE IF NOT EXISTS oximeter.measurements_string_local ON CLUSTER oximeter_cluster
(
timeseries_name String,
Expand Down Expand Up @@ -552,6 +604,34 @@ CREATE TABLE IF NOT EXISTS oximeter.fields_bool ON CLUSTER oximeter_cluster
AS oximeter.fields_bool_local
ENGINE = Distributed('oximeter_cluster', 'oximeter', 'fields_bool_local', xxHash64(splitByChar(':', timeseries_name)[1]));

CREATE TABLE IF NOT EXISTS oximeter.fields_ipaddr_local ON CLUSTER oximeter_cluster
(
timeseries_name String,
timeseries_key UInt64,
field_name String,
field_value IPv6
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/fields_ipaddr_local', '{replica}')
ORDER BY (timeseries_name, field_name, field_value, timeseries_key);

CREATE TABLE IF NOT EXISTS oximeter.fields_ipaddr ON CLUSTER oximeter_cluster
AS oximeter.fields_ipaddr_local
ENGINE = Distributed('oximeter_cluster', 'oximeter', 'fields_ipaddr_local', xxHash64(splitByChar(':', timeseries_name)[1]));

CREATE TABLE IF NOT EXISTS oximeter.fields_string_local ON CLUSTER oximeter_cluster
(
timeseries_name String,
timeseries_key UInt64,
field_name String,
field_value String
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/fields_string_local', '{replica}')
ORDER BY (timeseries_name, field_name, field_value, timeseries_key);

CREATE TABLE IF NOT EXISTS oximeter.fields_string ON CLUSTER oximeter_cluster
AS oximeter.fields_string_local
ENGINE = Distributed('oximeter_cluster', 'oximeter', 'fields_string_local', xxHash64(splitByChar(':', timeseries_name)[1]));

CREATE TABLE IF NOT EXISTS oximeter.fields_i8_local ON CLUSTER oximeter_cluster
(
timeseries_name String,
Expand Down
32 changes: 31 additions & 1 deletion oximeter/db/src/client/dbwrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use crate::client::Client;
use crate::model;
use crate::Error;
use camino::Utf8PathBuf;
use oximeter::Sample;
use oximeter::TimeseriesName;
use slog::debug;
Expand Down Expand Up @@ -112,8 +113,15 @@ impl DbWrite for Client {
/// testing
#[async_trait::async_trait]
pub trait TestDbWrite {
/// Initialize the replicated telemetry database, creating a subset of tables.
/// Initialize the replicated telemetry database, creating a subset of tables as described
/// in `db-init-1.sql`.
async fn init_test_minimal_replicated_db(&self) -> Result<(), Error>;

/// Initialize the replicated telemetry database with the given file id.
async fn init_replicated_db_from_file(
&self,
id: usize,
) -> Result<(), Error>;
}

#[async_trait::async_trait]
Expand All @@ -129,6 +137,28 @@ impl TestDbWrite for Client {
)))
.await
}

async fn init_replicated_db_from_file(
&self,
id: usize,
) -> Result<(), Error> {
let file = format!("db-init-{id}.sql");
debug!(self.log, "initializing ClickHouse database via {file}");
let path: Utf8PathBuf =
[env!("CARGO_MANIFEST_DIR"), "schema/replicated/", &file]
.into_iter()
.collect();
let sql = tokio::fs::read_to_string(&path).await.map_err(|err| {
Error::ReadSqlFile {
context: format!(
"Reading SQL file '{}' for test initialization",
path,
),
err,
}
})?;
self.run_many_sql_statements(sql).await
}
}

impl Client {
Expand Down
71 changes: 71 additions & 0 deletions oximeter/db/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use omicron_test_utils::dev::test_setup_log;
use oximeter::test_util;
use oximeter_db::{Client, DbWrite, OxqlResult, Sample, TestDbWrite};
use slog::{info, Logger};
use std::collections::BTreeSet;
use std::default::Default;
use std::time::Duration;

Expand All @@ -36,6 +37,76 @@ impl TestInput {
}
}

/// Ensure `db-init-1.sql` and `db-init-2.sql` contain disjoint sets of tables.
#[tokio::test]
async fn test_schemas_disjoint() -> anyhow::Result<()> {
let request_timeout = Duration::from_secs(15);
let logctx = test_setup_log("test_schemas_disjoint");
let log = &logctx.log;

let (parent_dir, prefix) = log_prefix_for_test(logctx.test_name());
let path = parent_dir.join(format!("{prefix}-oximeter-clickward-test"));
std::fs::create_dir(&path)?;

let mut deployment = Deployment::new_with_default_port_config(
path.clone(),
"oximeter_cluster".to_string(),
);

// We are not testing replication here. We are just testing that the tables
// loaded by each sql file are not loaded by the other sql file.
let num_keepers = 1;
let num_replicas = 1;
deployment
.generate_config(num_keepers, num_replicas)
.context("failed to generate config")?;
deployment.deploy().context("failed to deploy")?;

let client1 = Client::new_with_request_timeout(
deployment.http_addr(1.into())?,
log,
request_timeout,
);

wait_for_ping(&client1).await?;
wait_for_keepers(&deployment, vec![KeeperId(1)]).await?;

// Load all the tables inserted by `db-init-1.sql`
client1.init_replicated_db_from_file(1).await?;
let tables1 = client1
.list_replicated_tables()
.await
.expect("failed to read replicated tables");

// Drop the database so that we can insert the tables from `db-init-2.sql`
client1.wipe_replicated_db().await.expect("failed to wipe db");

// Load all the tables inserted by `db-init-2.sql`
client1.init_replicated_db_from_file(2).await?;
let tables2: BTreeSet<_> = client1
.list_replicated_tables()
.await
.expect("failed to read replicated tables")
.into_iter()
.collect();

// Ensure that tables1 and tables2 are disjoint
for table in &tables1 {
assert!(
!tables2.contains(table),
"table `{}` exists in both `db-init-1.sql` and `db-init-2.sql`",
table
);
}

info!(log, "Cleaning up test");
deployment.teardown()?;
std::fs::remove_dir_all(path)?;
logctx.cleanup_successful();

Ok(())
}

#[tokio::test]
async fn test_cluster() -> anyhow::Result<()> {
usdt::register_probes().unwrap();
Expand Down

0 comments on commit c5e293f

Please sign in to comment.