From 1cf6a0f49d8a10ca15d6e0b1529d1c1606ad5398 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Karen=20C=C3=A1rcamo?=
Date: Wed, 1 Nov 2023 10:04:49 +1300
Subject: [PATCH] [oximeter] Port all single node ClickHouse tests to a
 replicated setup (#4372)

This is a follow-up to https://github.com/oxidecomputer/omicron/pull/4149.
In this commit all single-node tests now run against a clustered setup.
Additionally, a few bugs in the replicated init SQL file have been fixed.

Note: All of the individual field and measurement tests now run under a
single test. The reason for this is test times. Each test needs to wipe out
its database before the next test runs. In a replicated installation, this
means that all nodes must sync, which takes time. Before I merged the
field/measurement tests into a single one, the testing time for a replicated
setup was about 10 minutes, which really is too much. Now it takes ~3.5
minutes. That is still a lot, but this is only a temporary measure until we
can run these tests in parallel again. An unintended bonus of this approach
is that it also verifies that the data is inserted into the correct table.

Related: https://github.com/oxidecomputer/omicron/issues/4148
Related: https://github.com/oxidecomputer/omicron/issues/4001
---
 oximeter/db/src/client.rs              | 1101 +++++++++++++++---
 oximeter/db/src/db-replicated-init.sql |   90 ++
 oximeter/db/src/db-wipe-replicated.sql |    2 +-
 test-utils/src/dev/clickhouse.rs       |   40 +-
 4 files changed, 793 insertions(+), 440 deletions(-)

diff --git a/oximeter/db/src/client.rs b/oximeter/db/src/client.rs
index 92b9ed96bd..69e91f888a 100644
--- a/oximeter/db/src/client.rs
+++ b/oximeter/db/src/client.rs
@@ -838,12 +838,46 @@ mod tests {
     use oximeter::FieldValue;
     use oximeter::Metric;
     use oximeter::Target;
-    use std::net::Ipv4Addr;
     use std::net::Ipv6Addr;
     use std::time::Duration;
     use tokio::time::sleep;
     use uuid::Uuid;
 
+    pub enum InstallationType {
+        Cluster,
+        SingleNode,
+    }
+
+    impl InstallationType {
+        pub async fn init_db(&self, client: &Client) -> Result<(), Error> {
+            match *self {
+                InstallationType::SingleNode => client
+                    .init_single_node_db()
+                    .await
+                    .expect("Failed to initialize timeseries database"),
+                InstallationType::Cluster => client
+                    .init_replicated_db()
+                    .await
+                    .expect("Failed to initialize timeseries database"),
+            }
+            Ok(())
+        }
+
+        pub async fn wipe_db(&self, client: &Client) -> Result<(), Error> {
+            match *self {
+                InstallationType::SingleNode => client
+                    .wipe_single_node_db()
+                    .await
+                    .expect("Failed to remove timeseries database"),
+                InstallationType::Cluster => client
+                    .wipe_replicated_db()
+                    .await
+                    .expect("Failed to remove timeseries database"),
+            }
+            Ok(())
+        }
+    }
+
     // NOTE: Each test requires a clean slate. Because of this, tests run sequentially.
// // This is at least partially because ClickHouse by default provides pretty weak consistency @@ -854,198 +888,524 @@ mod tests { #[tokio::test] async fn test_single_node() { - let logctx = test_setup_log("test_single_node"); - let log = &logctx.log; - // Let the OS assign a port and discover it after ClickHouse starts let mut db = ClickHouseInstance::new_single_node(0) .await .expect("Failed to start ClickHouse"); - let address = SocketAddr::new("::1".parse().unwrap(), db.port()); - // Test bad database connection - let client = Client::new("127.0.0.1:443".parse().unwrap(), &log); - assert!(matches!( - client.ping().await, - Err(Error::DatabaseUnavailable(_)) - )); + // Tests that the expected error is returned on a wrong address + bad_db_connection_test().await.unwrap(); // Tests that a new client has started and it is not part of a cluster - is_not_oximeter_cluster_test(address).await.unwrap(); + is_oximeter_cluster_test(db.address, InstallationType::SingleNode) + .await + .unwrap(); // Tests that data can be inserted via the client - insert_samples_test(address).await.unwrap(); + insert_samples_test(db.address, InstallationType::SingleNode) + .await + .unwrap(); // Tests for a schema mismatch - schema_mismatch_test(address).await.unwrap(); + schema_mismatch_test(db.address, InstallationType::SingleNode) + .await + .unwrap(); // Tests for a schema update - schema_updated_test(address).await.unwrap(); + schema_updated_test(db.address, InstallationType::SingleNode) + .await + .unwrap(); // Tests for specific timeseries selection - client_select_timeseries_one_test(address).await.unwrap(); + client_select_timeseries_one_test( + db.address, + InstallationType::SingleNode, + ) + .await + .unwrap(); // Tests for specific timeseries selection - field_record_count_test(address).await.unwrap(); + field_record_count_test(db.address, InstallationType::SingleNode) + .await + .unwrap(); // ClickHouse regression test - unquoted_64bit_integers_test(address).await.unwrap(); + unquoted_64bit_integers_test(db.address, InstallationType::SingleNode) + .await + .unwrap(); // Tests to verify that we can distinguish between metrics by name - differentiate_by_timeseries_name_test(address).await.unwrap(); + differentiate_by_timeseries_name_test( + db.address, + InstallationType::SingleNode, + ) + .await + .unwrap(); // Tests selecting a single timeseries - select_timeseries_with_select_one_test(address).await.unwrap(); + select_timeseries_with_select_one_test( + db.address, + InstallationType::SingleNode, + ) + .await + .unwrap(); // Tests selecting two timeseries select_timeseries_with_select_one_field_with_multiple_values_test( - address, + db.address, + InstallationType::SingleNode, ) .await .unwrap(); // Tests selecting multiple timeseries - select_timeseries_with_select_multiple_fields_with_multiple_values_test(address).await.unwrap(); + select_timeseries_with_select_multiple_fields_with_multiple_values_test(db.address, InstallationType::SingleNode).await.unwrap(); // Tests selecting all timeseries - select_timeseries_with_all_test(address).await.unwrap(); + select_timeseries_with_all_test( + db.address, + InstallationType::SingleNode, + ) + .await + .unwrap(); // Tests selecting all timeseries with start time - select_timeseries_with_start_time_test(address).await.unwrap(); + select_timeseries_with_start_time_test( + db.address, + InstallationType::SingleNode, + ) + .await + .unwrap(); // Tests selecting all timeseries with start time - select_timeseries_with_limit_test(address).await.unwrap(); + 
select_timeseries_with_limit_test( + db.address, + InstallationType::SingleNode, + ) + .await + .unwrap(); // Tests selecting all timeseries with order - select_timeseries_with_order_test(address).await.unwrap(); + select_timeseries_with_order_test( + db.address, + InstallationType::SingleNode, + ) + .await + .unwrap(); // Tests schema does not change - get_schema_no_new_values_test(address).await.unwrap(); + get_schema_no_new_values_test(db.address, InstallationType::SingleNode) + .await + .unwrap(); // Tests listing timeseries schema - timeseries_schema_list_test(address).await.unwrap(); + timeseries_schema_list_test(db.address, InstallationType::SingleNode) + .await + .unwrap(); // Tests listing timeseries - list_timeseries_test(address).await.unwrap(); + list_timeseries_test(db.address, InstallationType::SingleNode) + .await + .unwrap(); // Tests no changes are made when version is not updated - database_version_update_idempotent_test(address).await.unwrap(); + database_version_update_idempotent_test( + db.address, + InstallationType::SingleNode, + ) + .await + .unwrap(); // Tests that downgrading is impossible - database_version_will_not_downgrade_test(address).await.unwrap(); + database_version_will_not_downgrade_test( + db.address, + InstallationType::SingleNode, + ) + .await + .unwrap(); // Tests old data is dropped if version is updated - database_version_wipes_old_version_test(address).await.unwrap(); + database_version_wipes_old_version_test( + db.address, + InstallationType::SingleNode, + ) + .await + .unwrap(); // Tests schema cache is updated when a new sample is inserted - update_schema_cache_on_new_sample_test(address).await.unwrap(); + update_schema_cache_on_new_sample_test( + db.address, + InstallationType::SingleNode, + ) + .await + .unwrap(); // Tests that we can successfully query all extant datum types from the schema table. - select_all_datum_types_test(address).await.unwrap(); + select_all_datum_types_test(db.address, InstallationType::SingleNode) + .await + .unwrap(); // Tests that, when cache new schema but _fail_ to insert them, // we also remove them from the internal cache. 
- new_schema_removed_when_not_inserted_test(address).await.unwrap(); + new_schema_removed_when_not_inserted_test( + db.address, + InstallationType::SingleNode, + ) + .await + .unwrap(); // Tests for fields and measurements - recall_field_value_bool_test(address).await.unwrap(); + recall_of_all_fields_test(db.address, InstallationType::SingleNode) + .await + .unwrap(); - recall_field_value_u8_test(address).await.unwrap(); + db.cleanup().await.expect("Failed to cleanup ClickHouse server"); + } - recall_field_value_i8_test(address).await.unwrap(); + #[tokio::test] + async fn test_replicated() { + let mut cluster = ClickHouseCluster::new() + .await + .expect("Failed to initialise ClickHouse Cluster"); - recall_field_value_u16_test(address).await.unwrap(); + // Tests that the expected error is returned on a wrong address + bad_db_connection_test().await.unwrap(); - recall_field_value_i16_test(address).await.unwrap(); + // Tests data is replicated in a cluster + data_is_replicated_test(&cluster).await.unwrap(); - recall_field_value_u32_test(address).await.unwrap(); + // Tests that a new client has started and it is part of a cluster + is_oximeter_cluster_test( + cluster.replica_1.address, + InstallationType::Cluster, + ) + .await + .unwrap(); - recall_field_value_i32_test(address).await.unwrap(); + // Tests that data can be inserted via the client + insert_samples_test( + cluster.replica_1.address, + InstallationType::Cluster, + ) + .await + .unwrap(); - recall_field_value_u64_test(address).await.unwrap(); + // Tests for a schema mismatch + schema_mismatch_test( + cluster.replica_1.address, + InstallationType::Cluster, + ) + .await + .unwrap(); - recall_field_value_i64_test(address).await.unwrap(); + // Tests for a schema update + schema_updated_test( + cluster.replica_1.address, + InstallationType::Cluster, + ) + .await + .unwrap(); - recall_field_value_string_test(address).await.unwrap(); + // Tests for specific timeseries selection + client_select_timeseries_one_test( + cluster.replica_1.address, + InstallationType::Cluster, + ) + .await + .unwrap(); - recall_field_value_ipv4addr_test(address).await.unwrap(); + // Tests for specific timeseries selection + field_record_count_test( + cluster.replica_1.address, + InstallationType::Cluster, + ) + .await + .unwrap(); - recall_field_value_ipv6addr_test(address).await.unwrap(); + // ClickHouse regression test + unquoted_64bit_integers_test( + cluster.replica_1.address, + InstallationType::Cluster, + ) + .await + .unwrap(); - recall_field_value_uuid_test(address).await.unwrap(); + // Tests to verify that we can distinguish between metrics by name + differentiate_by_timeseries_name_test( + cluster.replica_1.address, + InstallationType::Cluster, + ) + .await + .unwrap(); - recall_measurement_bool_test(address).await.unwrap(); + // Tests selecting a single timeseries + select_timeseries_with_select_one_test( + cluster.replica_1.address, + InstallationType::Cluster, + ) + .await + .unwrap(); - recall_measurement_i8_test(address).await.unwrap(); + // Tests selecting two timeseries + select_timeseries_with_select_one_field_with_multiple_values_test( + cluster.replica_1.address, + InstallationType::Cluster, + ) + .await + .unwrap(); - recall_measurement_u8_test(address).await.unwrap(); + // Tests selecting multiple timeseries + select_timeseries_with_select_multiple_fields_with_multiple_values_test( + cluster.replica_1.address, + InstallationType::Cluster, + ) + .await + .unwrap(); - recall_measurement_i16_test(address).await.unwrap(); + // Tests selecting 
all timeseries + select_timeseries_with_all_test( + cluster.replica_1.address, + InstallationType::Cluster, + ) + .await + .unwrap(); - recall_measurement_u16_test(address).await.unwrap(); + // Tests selecting all timeseries with start time + select_timeseries_with_start_time_test( + cluster.replica_1.address, + InstallationType::Cluster, + ) + .await + .unwrap(); - recall_measurement_i32_test(address).await.unwrap(); + // Tests selecting all timeseries with start time + select_timeseries_with_limit_test( + cluster.replica_1.address, + InstallationType::Cluster, + ) + .await + .unwrap(); - recall_measurement_u32_test(address).await.unwrap(); + // Tests selecting all timeseries with order + select_timeseries_with_order_test( + cluster.replica_1.address, + InstallationType::Cluster, + ) + .await + .unwrap(); - recall_measurement_i64_test(address).await.unwrap(); + // Tests schema does not change + get_schema_no_new_values_test( + cluster.replica_1.address, + InstallationType::Cluster, + ) + .await + .unwrap(); - recall_measurement_u64_test(address).await.unwrap(); + // Tests listing timeseries schema + timeseries_schema_list_test( + cluster.replica_1.address, + InstallationType::Cluster, + ) + .await + .unwrap(); - recall_measurement_f32_test(address).await.unwrap(); + // Tests listing timeseries + list_timeseries_test( + cluster.replica_1.address, + InstallationType::Cluster, + ) + .await + .unwrap(); - recall_measurement_f64_test(address).await.unwrap(); + // Tests no changes are made when version is not updated + database_version_update_idempotent_test( + cluster.replica_1.address, + InstallationType::Cluster, + ) + .await + .unwrap(); - recall_measurement_cumulative_i64_test(address).await.unwrap(); + // Tests that downgrading is impossible + database_version_will_not_downgrade_test( + cluster.replica_1.address, + InstallationType::Cluster, + ) + .await + .unwrap(); - recall_measurement_cumulative_u64_test(address).await.unwrap(); + // Tests old data is dropped if version is updated + database_version_wipes_old_version_test( + cluster.replica_1.address, + InstallationType::Cluster, + ) + .await + .unwrap(); - recall_measurement_cumulative_f64_test(address).await.unwrap(); + // Tests schema cache is updated when a new sample is inserted + update_schema_cache_on_new_sample_test( + cluster.replica_1.address, + InstallationType::Cluster, + ) + .await + .unwrap(); - recall_measurement_histogram_i8_test(address).await.unwrap(); + // Tests that we can successfully query all extant datum types from the schema table. + select_all_datum_types_test( + cluster.replica_1.address, + InstallationType::Cluster, + ) + .await + .unwrap(); - recall_measurement_histogram_u8_test(address).await.unwrap(); + // Tests that, when cache new schema but _fail_ to insert them, + // we also remove them from the internal cache. 
+ new_schema_removed_when_not_inserted_test( + cluster.replica_1.address, + InstallationType::Cluster, + ) + .await + .unwrap(); - recall_measurement_histogram_i16_test(address).await.unwrap(); + // Tests for fields and measurements + recall_of_all_fields_test( + cluster.replica_1.address, + InstallationType::Cluster, + ) + .await + .unwrap(); - recall_measurement_histogram_u16_test(address).await.unwrap(); + cluster + .keeper_1 + .cleanup() + .await + .expect("Failed to cleanup ClickHouse keeper 1"); + cluster + .keeper_2 + .cleanup() + .await + .expect("Failed to cleanup ClickHouse keeper 2"); + cluster + .keeper_3 + .cleanup() + .await + .expect("Failed to cleanup ClickHouse keeper 3"); + cluster + .replica_1 + .cleanup() + .await + .expect("Failed to cleanup ClickHouse server 1"); + cluster + .replica_2 + .cleanup() + .await + .expect("Failed to cleanup ClickHouse server 2"); + } - recall_measurement_histogram_i32_test(address).await.unwrap(); + async fn bad_db_connection_test() -> Result<(), Error> { + let logctx = test_setup_log("test_bad_db_connection"); + let log = &logctx.log; - recall_measurement_histogram_u32_test(address).await.unwrap(); + let client = Client::new("127.0.0.1:443".parse().unwrap(), &log); + assert!(matches!( + client.ping().await, + Err(Error::DatabaseUnavailable(_)) + )); - recall_measurement_histogram_i64_test(address).await.unwrap(); + logctx.cleanup_successful(); + Ok(()) + } - recall_measurement_histogram_u64_test(address).await.unwrap(); + async fn data_is_replicated_test( + cluster: &ClickHouseCluster, + ) -> Result<(), Error> { + let logctx = test_setup_log("test_data_is_replicated"); + let log = &logctx.log; + + // Create database in node 1 + let client_1 = Client::new(cluster.replica_1.address, &log); + assert!(client_1.is_oximeter_cluster().await.unwrap()); + client_1 + .init_replicated_db() + .await + .expect("Failed to initialize timeseries database"); - recall_measurement_histogram_f64_test(address).await.unwrap(); + // Verify database exists in node 2 + let client_2 = Client::new(cluster.replica_2.address, &log); + assert!(client_2.is_oximeter_cluster().await.unwrap()); + let sql = String::from("SHOW DATABASES FORMAT JSONEachRow;"); - db.cleanup().await.expect("Failed to cleanup ClickHouse server"); + // Try a few times to make sure data has been synchronised. 
+ let mut result = String::from(""); + let tries = 5; + for _ in 0..tries { + result = client_2.execute_with_body(sql.clone()).await.unwrap(); + if !result.contains("oximeter") { + sleep(Duration::from_secs(1)).await; + continue; + } else { + break; + } + } + + assert!(result.contains("oximeter")); + + // Insert row into one of the tables + let sql = String::from( + "INSERT INTO oximeter.measurements_string (datum) VALUES ('hiya');", + ); + client_2.execute_with_body(sql).await.unwrap(); + + // Make sure replicas are synched + let sql = String::from( + "SYSTEM SYNC REPLICA oximeter.measurements_string_local;", + ); + client_1.execute_with_body(sql).await.unwrap(); + + // Make sure data exists in the other replica + let sql = String::from( + "SELECT * FROM oximeter.measurements_string FORMAT JSONEachRow;", + ); + let result = client_1.execute_with_body(sql).await.unwrap(); + assert!(result.contains("hiya")); + + client_1.wipe_replicated_db().await?; logctx.cleanup_successful(); + Ok(()) } - async fn is_not_oximeter_cluster_test( + async fn is_oximeter_cluster_test( address: SocketAddr, + db_type: InstallationType, ) -> Result<(), Error> { - let logctx = test_setup_log("test_is_not_oximeter_cluster"); + let logctx = test_setup_log("test_is_oximeter_cluster"); let log = &logctx.log; let client = Client::new(address, &log); - assert!(!client.is_oximeter_cluster().await.unwrap()); - client.wipe_single_node_db().await?; + + match db_type { + InstallationType::Cluster => { + assert!(client.is_oximeter_cluster().await.unwrap()); + client.wipe_replicated_db().await?; + } + InstallationType::SingleNode => { + assert!(!client.is_oximeter_cluster().await.unwrap()); + client.wipe_single_node_db().await?; + } + } logctx.cleanup_successful(); Ok(()) } - async fn insert_samples_test(address: SocketAddr) -> Result<(), Error> { + async fn insert_samples_test( + address: SocketAddr, + db_type: InstallationType, + ) -> Result<(), Error> { let logctx = test_setup_log("test_insert_samples"); let log = &logctx.log; let client = Client::new(address, &log); - client - .init_single_node_db() - .await - .expect("Failed to initialize timeseries database"); + db_type.init_db(&client).await.unwrap(); let samples = { let mut s = Vec::with_capacity(8); for _ in 0..s.capacity() { @@ -1054,7 +1414,7 @@ mod tests { s }; client.insert_samples(&samples).await?; - client.wipe_single_node_db().await?; + db_type.wipe_db(&client).await.unwrap(); logctx.cleanup_successful(); Ok(()) } @@ -1079,15 +1439,15 @@ mod tests { } } - async fn schema_mismatch_test(address: SocketAddr) -> Result<(), Error> { + async fn schema_mismatch_test( + address: SocketAddr, + db_type: InstallationType, + ) -> Result<(), Error> { let logctx = test_setup_log("test_schema_mismatch"); let log = &logctx.log; let client = Client::new(address, &log); - client - .init_single_node_db() - .await - .expect("Failed to initialize timeseries database"); + db_type.init_db(&client).await.unwrap(); let sample = test_util::make_sample(); client.insert_samples(&[sample]).await.unwrap(); @@ -1104,20 +1464,20 @@ mod tests { let sample = Sample::new(&bad_name, &metric).unwrap(); let result = client.verify_or_cache_sample_schema(&sample).await; assert!(matches!(result, Err(Error::SchemaMismatch { .. 
}))); - client.wipe_single_node_db().await?; + db_type.wipe_db(&client).await.unwrap(); logctx.cleanup_successful(); Ok(()) } - async fn schema_updated_test(address: SocketAddr) -> Result<(), Error> { + async fn schema_updated_test( + address: SocketAddr, + db_type: InstallationType, + ) -> Result<(), Error> { let logctx = test_setup_log("test_schema_updated"); let log = &logctx.log; let client = Client::new(address, &log); - client - .init_single_node_db() - .await - .expect("Failed to initialize timeseries database"); + db_type.init_db(&client).await.unwrap(); let sample = test_util::make_sample(); // Verify that this sample is considered new, i.e., we return rows to update the timeseries @@ -1179,22 +1539,20 @@ mod tests { .collect::>(); assert_eq!(schema.len(), 1); assert_eq!(expected_schema, schema[0]); - client.wipe_single_node_db().await?; + db_type.wipe_db(&client).await.unwrap(); logctx.cleanup_successful(); Ok(()) } async fn client_select_timeseries_one_test( address: SocketAddr, + db_type: InstallationType, ) -> Result<(), Error> { let logctx = test_setup_log("test_client_select_timeseries_one"); let log = &logctx.log; - - let client = Client::new(address, &log); - client - .init_single_node_db() - .await - .expect("Failed to initialize timeseries database"); + + let client = Client::new(address, &log); + db_type.init_db(&client).await.unwrap(); let samples = test_util::generate_test_samples(2, 2, 2, 2); client.insert_samples(&samples).await?; @@ -1266,12 +1624,15 @@ mod tests { .iter() .all(|field| field_cmp(field, sample.metric_fields())); - client.wipe_single_node_db().await?; + db_type.wipe_db(&client).await.unwrap(); logctx.cleanup_successful(); Ok(()) } - async fn field_record_count_test(address: SocketAddr) -> Result<(), Error> { + async fn field_record_count_test( + address: SocketAddr, + db_type: InstallationType, + ) -> Result<(), Error> { let logctx = test_setup_log("test_field_record_count"); let log = &logctx.log; @@ -1280,10 +1641,7 @@ mod tests { // Because of the schema change, inserting field records per field per unique timeseries, // we'd like to exercise the logic of ClickHouse's replacing merge tree engine. let client = Client::new(address, &log); - client - .init_single_node_db() - .await - .expect("Failed to initialize timeseries database"); + db_type.init_db(&client).await.unwrap(); let samples = test_util::generate_test_samples(2, 2, 2, 2); client.insert_samples(&samples).await?; @@ -1320,7 +1678,7 @@ mod tests { ) .await; - client.wipe_single_node_db().await?; + db_type.wipe_db(&client).await.unwrap(); logctx.cleanup_successful(); Ok(()) } @@ -1333,16 +1691,14 @@ mod tests { // details. This test verifies that we get back _unquoted_ integers from the database. 
async fn unquoted_64bit_integers_test( address: SocketAddr, + db_type: InstallationType, ) -> Result<(), Error> { use serde_json::Value; let logctx = test_setup_log("test_unquoted_64bit_integers"); let log = &logctx.log; let client = Client::new(address, &log); - client - .init_single_node_db() - .await - .expect("Failed to initialize timeseries database"); + db_type.init_db(&client).await.unwrap(); let output = client .execute_with_body( "SELECT toUInt64(1) AS foo FORMAT JSONEachRow;".to_string(), @@ -1352,13 +1708,14 @@ mod tests { let json: Value = serde_json::from_str(&output).unwrap(); assert_eq!(json["foo"], Value::Number(1u64.into())); - client.wipe_single_node_db().await?; + db_type.wipe_db(&client).await.unwrap(); logctx.cleanup_successful(); Ok(()) } async fn differentiate_by_timeseries_name_test( address: SocketAddr, + db_type: InstallationType, ) -> Result<(), Error> { let logctx = test_setup_log("test_differentiate_by_timeseries_name"); let log = &logctx.log; @@ -1382,10 +1739,7 @@ mod tests { } let client = Client::new(address, &log); - client - .init_single_node_db() - .await - .expect("Failed to initialize timeseries database"); + db_type.init_db(&client).await.unwrap(); let target = MyTarget::default(); let first_metric = FirstMetric::default(); @@ -1422,13 +1776,14 @@ mod tests { assert_eq!(timeseries.target.name, "my_target"); assert_eq!(timeseries.metric.name, "second_metric"); - client.wipe_single_node_db().await?; + db_type.wipe_db(&client).await.unwrap(); logctx.cleanup_successful(); Ok(()) } async fn select_timeseries_with_select_one_test( address: SocketAddr, + db_type: InstallationType, ) -> Result<(), Error> { let logctx = test_setup_log("test_select_timeseries_with_select_one"); let log = &logctx.log; @@ -1436,10 +1791,7 @@ mod tests { let (target, metrics, samples) = setup_select_test(); let client = Client::new(address, &log); - client - .init_single_node_db() - .await - .expect("Failed to initialize timeseries database"); + db_type.init_db(&client).await.unwrap(); client .insert_samples(&samples) .await @@ -1484,13 +1836,14 @@ mod tests { verify_target(×eries.target, &target); verify_metric(×eries.metric, metrics.get(0).unwrap()); - client.wipe_single_node_db().await?; + db_type.wipe_db(&client).await.unwrap(); logctx.cleanup_successful(); Ok(()) } async fn select_timeseries_with_select_one_field_with_multiple_values_test( address: SocketAddr, + db_type: InstallationType, ) -> Result<(), Error> { let logctx = test_setup_log( "test_select_timeseries_with_select_one_field_with_multiple_values", @@ -1500,10 +1853,7 @@ mod tests { let (target, metrics, samples) = setup_select_test(); let client = Client::new(address, &log); - client - .init_single_node_db() - .await - .expect("Failed to initialize timeseries database"); + db_type.init_db(&client).await.unwrap(); client .insert_samples(&samples) .await @@ -1554,13 +1904,14 @@ mod tests { verify_metric(&ts.metric, metric); } - client.wipe_single_node_db().await?; + db_type.wipe_db(&client).await.unwrap(); logctx.cleanup_successful(); Ok(()) } async fn select_timeseries_with_select_multiple_fields_with_multiple_values_test( address: SocketAddr, + db_type: InstallationType, ) -> Result<(), Error> { let logctx = test_setup_log("test_select_timeseries_with_select_multiple_fields_with_multiple_values"); let log = &logctx.log; @@ -1568,10 +1919,7 @@ mod tests { let (target, metrics, samples) = setup_select_test(); let client = Client::new(address, &log); - client - .init_single_node_db() - .await - .expect("Failed to 
initialize timeseries database"); + db_type.init_db(&client).await.unwrap(); client .insert_samples(&samples) .await @@ -1630,13 +1978,14 @@ mod tests { } } - client.wipe_single_node_db().await?; + db_type.wipe_db(&client).await.unwrap(); logctx.cleanup_successful(); Ok(()) } async fn select_timeseries_with_all_test( address: SocketAddr, + db_type: InstallationType, ) -> Result<(), Error> { let logctx = test_setup_log("test_select_timeseries_with_all"); let log = &logctx.log; @@ -1644,10 +1993,7 @@ mod tests { let (target, metrics, samples) = setup_select_test(); let client = Client::new(address, &log); - client - .init_single_node_db() - .await - .expect("Failed to initialize timeseries database"); + db_type.init_db(&client).await.unwrap(); client .insert_samples(&samples) .await @@ -1691,13 +2037,14 @@ mod tests { verify_metric(&ts.metric, metrics.get(i).unwrap()); } - client.wipe_single_node_db().await?; + db_type.wipe_db(&client).await.unwrap(); logctx.cleanup_successful(); Ok(()) } async fn select_timeseries_with_start_time_test( address: SocketAddr, + db_type: InstallationType, ) -> Result<(), Error> { let logctx = test_setup_log("test_select_timeseries_with_start_time"); let log = &logctx.log; @@ -1705,10 +2052,7 @@ mod tests { let (_, metrics, samples) = setup_select_test(); let client = Client::new(address, &log); - client - .init_single_node_db() - .await - .expect("Failed to initialize timeseries database"); + db_type.init_db(&client).await.unwrap(); client .insert_samples(&samples) .await @@ -1742,23 +2086,21 @@ mod tests { } } - client.wipe_single_node_db().await?; + db_type.wipe_db(&client).await.unwrap(); logctx.cleanup_successful(); Ok(()) } async fn select_timeseries_with_limit_test( address: SocketAddr, + db_type: InstallationType, ) -> Result<(), Error> { let logctx = test_setup_log("test_select_timeseries_with_limit"); let log = &logctx.log; let (_, _, samples) = setup_select_test(); let client = Client::new(address, &log); - client - .init_single_node_db() - .await - .expect("Failed to initialize timeseries database"); + db_type.init_db(&client).await.unwrap(); client .insert_samples(&samples) .await @@ -1860,23 +2202,21 @@ mod tests { timeseries.measurements ); - client.wipe_single_node_db().await?; + db_type.wipe_db(&client).await.unwrap(); logctx.cleanup_successful(); Ok(()) } async fn select_timeseries_with_order_test( address: SocketAddr, + db_type: InstallationType, ) -> Result<(), Error> { let logctx = test_setup_log("test_select_timeseries_with_order"); let log = &logctx.log; let (_, _, samples) = setup_select_test(); let client = Client::new(address, &log); - client - .init_single_node_db() - .await - .expect("Failed to initialize timeseries database"); + db_type.init_db(&client).await.unwrap(); client .insert_samples(&samples) .await @@ -1961,22 +2301,20 @@ mod tests { timeseries_asc.last().unwrap() ); - client.wipe_single_node_db().await?; + db_type.wipe_db(&client).await.unwrap(); logctx.cleanup_successful(); Ok(()) } async fn get_schema_no_new_values_test( address: SocketAddr, + db_type: InstallationType, ) -> Result<(), Error> { let logctx = test_setup_log("test_get_schema_no_new_values"); let log = &logctx.log; let client = Client::new(address, &log); - client - .init_single_node_db() - .await - .expect("Failed to initialize timeseries database"); + db_type.init_db(&client).await.unwrap(); let samples = test_util::generate_test_samples(2, 2, 2, 2); client.insert_samples(&samples).await?; @@ -1988,22 +2326,20 @@ mod tests { .expect("Failed to get 
timeseries schema"); assert_eq!(&original_schema, &*schema, "Schema shouldn't change"); - client.wipe_single_node_db().await?; + db_type.wipe_db(&client).await.unwrap(); logctx.cleanup_successful(); Ok(()) } async fn timeseries_schema_list_test( address: SocketAddr, + db_type: InstallationType, ) -> Result<(), Error> { let logctx = test_setup_log("test_timeseries_schema_list"); let log = &logctx.log; let client = Client::new(address, &log); - client - .init_single_node_db() - .await - .expect("Failed to initialize timeseries database"); + db_type.init_db(&client).await.unwrap(); let samples = test_util::generate_test_samples(2, 2, 2, 2); client.insert_samples(&samples).await?; @@ -2025,20 +2361,20 @@ mod tests { result.next_page.is_none(), "Expected the next page token to be None" ); - client.wipe_single_node_db().await?; + db_type.wipe_db(&client).await.unwrap(); logctx.cleanup_successful(); Ok(()) } - async fn list_timeseries_test(address: SocketAddr) -> Result<(), Error> { + async fn list_timeseries_test( + address: SocketAddr, + db_type: InstallationType, + ) -> Result<(), Error> { let logctx = test_setup_log("test_list_timeseries"); let log = &logctx.log; let client = Client::new(address, &log); - client - .init_single_node_db() - .await - .expect("Failed to initialize timeseries database"); + db_type.init_db(&client).await.unwrap(); let samples = test_util::generate_test_samples(2, 2, 2, 2); client.insert_samples(&samples).await?; @@ -2101,148 +2437,109 @@ mod tests { "Paginating should pick up where it left off" ); - client.wipe_single_node_db().await?; + db_type.wipe_db(&client).await.unwrap(); logctx.cleanup_successful(); Ok(()) } async fn recall_field_value_bool_test( - address: SocketAddr, + client: &Client, ) -> Result<(), Error> { let field = FieldValue::Bool(true); let as_json = serde_json::Value::from(1_u64); - test_recall_field_value_impl(address, field, as_json).await?; + test_recall_field_value_impl(field, as_json, client).await?; Ok(()) } - async fn recall_field_value_u8_test( - address: SocketAddr, - ) -> Result<(), Error> { + async fn recall_field_value_u8_test(client: &Client) -> Result<(), Error> { let field = FieldValue::U8(1); let as_json = serde_json::Value::from(1_u8); - test_recall_field_value_impl(address, field, as_json).await?; + test_recall_field_value_impl(field, as_json, client).await?; Ok(()) } - async fn recall_field_value_i8_test( - address: SocketAddr, - ) -> Result<(), Error> { + async fn recall_field_value_i8_test(client: &Client) -> Result<(), Error> { let field = FieldValue::I8(1); let as_json = serde_json::Value::from(1_i8); - test_recall_field_value_impl(address, field, as_json).await?; + test_recall_field_value_impl(field, as_json, client).await?; Ok(()) } - async fn recall_field_value_u16_test( - address: SocketAddr, - ) -> Result<(), Error> { + async fn recall_field_value_u16_test(client: &Client) -> Result<(), Error> { let field = FieldValue::U16(1); let as_json = serde_json::Value::from(1_u16); - test_recall_field_value_impl(address, field, as_json).await?; + test_recall_field_value_impl(field, as_json, client).await?; Ok(()) } - async fn recall_field_value_i16_test( - address: SocketAddr, - ) -> Result<(), Error> { + async fn recall_field_value_i16_test(client: &Client) -> Result<(), Error> { let field = FieldValue::I16(1); let as_json = serde_json::Value::from(1_i16); - test_recall_field_value_impl(address, field, as_json).await?; + test_recall_field_value_impl(field, as_json, client).await?; Ok(()) } - async fn 
recall_field_value_u32_test( - address: SocketAddr, - ) -> Result<(), Error> { + async fn recall_field_value_u32_test(client: &Client) -> Result<(), Error> { let field = FieldValue::U32(1); let as_json = serde_json::Value::from(1_u32); - test_recall_field_value_impl(address, field, as_json).await?; + test_recall_field_value_impl(field, as_json, client).await?; Ok(()) } - async fn recall_field_value_i32_test( - address: SocketAddr, - ) -> Result<(), Error> { + async fn recall_field_value_i32_test(client: &Client) -> Result<(), Error> { let field = FieldValue::I32(1); let as_json = serde_json::Value::from(1_i32); - test_recall_field_value_impl(address, field, as_json).await?; + test_recall_field_value_impl(field, as_json, client).await?; Ok(()) } - async fn recall_field_value_u64_test( - address: SocketAddr, - ) -> Result<(), Error> { + async fn recall_field_value_u64_test(client: &Client) -> Result<(), Error> { let field = FieldValue::U64(1); let as_json = serde_json::Value::from(1_u64); - test_recall_field_value_impl(address, field, as_json).await?; + test_recall_field_value_impl(field, as_json, client).await?; Ok(()) } - async fn recall_field_value_i64_test( - address: SocketAddr, - ) -> Result<(), Error> { + async fn recall_field_value_i64_test(client: &Client) -> Result<(), Error> { let field = FieldValue::I64(1); let as_json = serde_json::Value::from(1_i64); - test_recall_field_value_impl(address, field, as_json).await?; + test_recall_field_value_impl(field, as_json, client).await?; Ok(()) } async fn recall_field_value_string_test( - address: SocketAddr, + client: &Client, ) -> Result<(), Error> { let field = FieldValue::String("foo".into()); let as_json = serde_json::Value::from("foo"); - test_recall_field_value_impl(address, field, as_json).await?; - Ok(()) - } - - async fn recall_field_value_ipv4addr_test( - address: SocketAddr, - ) -> Result<(), Error> { - let field = FieldValue::from(Ipv4Addr::LOCALHOST); - let as_json = serde_json::Value::from( - Ipv4Addr::LOCALHOST.to_ipv6_mapped().to_string(), - ); - test_recall_field_value_impl(address, field, as_json).await?; + test_recall_field_value_impl(field, as_json, client).await?; Ok(()) } async fn recall_field_value_ipv6addr_test( - address: SocketAddr, + client: &Client, ) -> Result<(), Error> { let field = FieldValue::from(Ipv6Addr::LOCALHOST); let as_json = serde_json::Value::from(Ipv6Addr::LOCALHOST.to_string()); - test_recall_field_value_impl(address, field, as_json).await?; + test_recall_field_value_impl(field, as_json, client).await?; Ok(()) } async fn recall_field_value_uuid_test( - address: SocketAddr, + client: &Client, ) -> Result<(), Error> { let id = Uuid::new_v4(); let field = FieldValue::from(id); let as_json = serde_json::Value::from(id.to_string()); - test_recall_field_value_impl(address, field, as_json).await?; + test_recall_field_value_impl(field, as_json, client).await?; Ok(()) } async fn test_recall_field_value_impl( - address: SocketAddr, field_value: FieldValue, as_json: serde_json::Value, + client: &Client, ) -> Result<(), Error> { - let logctx = test_setup_log( - format!("test_recall_field_value_{}", field_value.field_type()) - .as_str(), - ); - let log = &logctx.log; - - let client = Client::new(address, log); - client - .init_single_node_db() - .await - .expect("Failed to initialize timeseries database"); - // Insert a record from this field. 
const TIMESERIES_NAME: &str = "foo:bar"; const TIMESERIES_KEY: u64 = 101; @@ -2282,157 +2579,134 @@ mod tests { actual_row, inserted_row, "Actual and expected field rows do not match" ); - - client.wipe_single_node_db().await?; - logctx.cleanup_successful(); Ok(()) } async fn recall_measurement_bool_test( - address: SocketAddr, + client: &Client, ) -> Result<(), Error> { let datum = Datum::Bool(true); let as_json = serde_json::Value::from(1_u64); - test_recall_measurement_impl::(address, datum, None, as_json) + test_recall_measurement_impl::(datum, None, as_json, client) .await?; Ok(()) } - async fn recall_measurement_i8_test( - address: SocketAddr, - ) -> Result<(), Error> { + async fn recall_measurement_i8_test(client: &Client) -> Result<(), Error> { let datum = Datum::I8(1); let as_json = serde_json::Value::from(1_i8); - test_recall_measurement_impl::(address, datum, None, as_json) + test_recall_measurement_impl::(datum, None, as_json, client) .await?; Ok(()) } - async fn recall_measurement_u8_test( - address: SocketAddr, - ) -> Result<(), Error> { + async fn recall_measurement_u8_test(client: &Client) -> Result<(), Error> { let datum = Datum::U8(1); let as_json = serde_json::Value::from(1_u8); - test_recall_measurement_impl::(address, datum, None, as_json) + test_recall_measurement_impl::(datum, None, as_json, client) .await?; Ok(()) } - async fn recall_measurement_i16_test( - address: SocketAddr, - ) -> Result<(), Error> { + async fn recall_measurement_i16_test(client: &Client) -> Result<(), Error> { let datum = Datum::I16(1); let as_json = serde_json::Value::from(1_i16); - test_recall_measurement_impl::(address, datum, None, as_json) + test_recall_measurement_impl::(datum, None, as_json, client) .await?; Ok(()) } - async fn recall_measurement_u16_test( - address: SocketAddr, - ) -> Result<(), Error> { + async fn recall_measurement_u16_test(client: &Client) -> Result<(), Error> { let datum = Datum::U16(1); let as_json = serde_json::Value::from(1_u16); - test_recall_measurement_impl::(address, datum, None, as_json) + test_recall_measurement_impl::(datum, None, as_json, client) .await?; Ok(()) } - async fn recall_measurement_i32_test( - address: SocketAddr, - ) -> Result<(), Error> { + async fn recall_measurement_i32_test(client: &Client) -> Result<(), Error> { let datum = Datum::I32(1); let as_json = serde_json::Value::from(1_i32); - test_recall_measurement_impl::(address, datum, None, as_json) + test_recall_measurement_impl::(datum, None, as_json, client) .await?; Ok(()) } - async fn recall_measurement_u32_test( - address: SocketAddr, - ) -> Result<(), Error> { + async fn recall_measurement_u32_test(client: &Client) -> Result<(), Error> { let datum = Datum::U32(1); let as_json = serde_json::Value::from(1_u32); - test_recall_measurement_impl::(address, datum, None, as_json) + test_recall_measurement_impl::(datum, None, as_json, client) .await?; Ok(()) } - async fn recall_measurement_i64_test( - address: SocketAddr, - ) -> Result<(), Error> { + async fn recall_measurement_i64_test(client: &Client) -> Result<(), Error> { let datum = Datum::I64(1); let as_json = serde_json::Value::from(1_i64); - test_recall_measurement_impl::(address, datum, None, as_json) + test_recall_measurement_impl::(datum, None, as_json, client) .await?; Ok(()) } - async fn recall_measurement_u64_test( - address: SocketAddr, - ) -> Result<(), Error> { + async fn recall_measurement_u64_test(client: &Client) -> Result<(), Error> { let datum = Datum::U64(1); let as_json = serde_json::Value::from(1_u64); - 
test_recall_measurement_impl::(address, datum, None, as_json) + test_recall_measurement_impl::(datum, None, as_json, client) .await?; Ok(()) } - async fn recall_measurement_f32_test( - address: SocketAddr, - ) -> Result<(), Error> { + async fn recall_measurement_f32_test(client: &Client) -> Result<(), Error> { const VALUE: f32 = 1.1; let datum = Datum::F32(VALUE); // NOTE: This is intentionally an f64. let as_json = serde_json::Value::from(1.1_f64); - test_recall_measurement_impl::(address, datum, None, as_json) + test_recall_measurement_impl::(datum, None, as_json, client) .await?; Ok(()) } - async fn recall_measurement_f64_test( - address: SocketAddr, - ) -> Result<(), Error> { + async fn recall_measurement_f64_test(client: &Client) -> Result<(), Error> { const VALUE: f64 = 1.1; let datum = Datum::F64(VALUE); let as_json = serde_json::Value::from(VALUE); - test_recall_measurement_impl::(address, datum, None, as_json) + test_recall_measurement_impl::(datum, None, as_json, client) .await?; Ok(()) } async fn recall_measurement_cumulative_i64_test( - address: SocketAddr, + client: &Client, ) -> Result<(), Error> { let datum = Datum::CumulativeI64(1.into()); let as_json = serde_json::Value::from(1_i64); - test_recall_measurement_impl::(address, datum, None, as_json) + test_recall_measurement_impl::(datum, None, as_json, client) .await?; Ok(()) } async fn recall_measurement_cumulative_u64_test( - address: SocketAddr, + client: &Client, ) -> Result<(), Error> { let datum = Datum::CumulativeU64(1.into()); let as_json = serde_json::Value::from(1_u64); - test_recall_measurement_impl::(address, datum, None, as_json) + test_recall_measurement_impl::(datum, None, as_json, client) .await?; Ok(()) } async fn recall_measurement_cumulative_f64_test( - address: SocketAddr, + client: &Client, ) -> Result<(), Error> { let datum = Datum::CumulativeF64(1.1.into()); let as_json = serde_json::Value::from(1.1_f64); - test_recall_measurement_impl::(address, datum, None, as_json) + test_recall_measurement_impl::(datum, None, as_json, client) .await?; Ok(()) } async fn histogram_test_impl( - address: SocketAddr, + client: &Client, hist: Histogram, ) -> Result<(), Error> where @@ -2445,72 +2719,72 @@ mod tests { let as_json = serde_json::Value::Array( counts.into_iter().map(Into::into).collect(), ); - test_recall_measurement_impl(address, datum, Some(bins), as_json) + test_recall_measurement_impl(datum, Some(bins), as_json, client) .await?; Ok(()) } async fn recall_measurement_histogram_i8_test( - address: SocketAddr, + client: &Client, ) -> Result<(), Error> { let hist = Histogram::new(&[0i8, 1, 2]).unwrap(); - histogram_test_impl(address, hist).await?; + histogram_test_impl(client, hist).await?; Ok(()) } async fn recall_measurement_histogram_u8_test( - address: SocketAddr, + client: &Client, ) -> Result<(), Error> { let hist = Histogram::new(&[0u8, 1, 2]).unwrap(); - histogram_test_impl(address, hist).await?; + histogram_test_impl(client, hist).await?; Ok(()) } async fn recall_measurement_histogram_i16_test( - address: SocketAddr, + client: &Client, ) -> Result<(), Error> { let hist = Histogram::new(&[0i16, 1, 2]).unwrap(); - histogram_test_impl(address, hist).await?; + histogram_test_impl(client, hist).await?; Ok(()) } async fn recall_measurement_histogram_u16_test( - address: SocketAddr, + client: &Client, ) -> Result<(), Error> { let hist = Histogram::new(&[0u16, 1, 2]).unwrap(); - histogram_test_impl(address, hist).await?; + histogram_test_impl(client, hist).await?; Ok(()) } async fn 
recall_measurement_histogram_i32_test( - address: SocketAddr, + client: &Client, ) -> Result<(), Error> { let hist = Histogram::new(&[0i32, 1, 2]).unwrap(); - histogram_test_impl(address, hist).await?; + histogram_test_impl(client, hist).await?; Ok(()) } async fn recall_measurement_histogram_u32_test( - address: SocketAddr, + client: &Client, ) -> Result<(), Error> { let hist = Histogram::new(&[0u32, 1, 2]).unwrap(); - histogram_test_impl(address, hist).await?; + histogram_test_impl(client, hist).await?; Ok(()) } async fn recall_measurement_histogram_i64_test( - address: SocketAddr, + client: &Client, ) -> Result<(), Error> { let hist = Histogram::new(&[0i64, 1, 2]).unwrap(); - histogram_test_impl(address, hist).await?; + histogram_test_impl(client, hist).await?; Ok(()) } async fn recall_measurement_histogram_u64_test( - address: SocketAddr, + client: &Client, ) -> Result<(), Error> { let hist = Histogram::new(&[0u64, 1, 2]).unwrap(); - histogram_test_impl(address, hist).await?; + histogram_test_impl(client, hist).await?; Ok(()) } @@ -2528,38 +2802,27 @@ mod tests { // discussion. #[allow(dead_code)] async fn recall_measurement_histogram_f32_test( - address: SocketAddr, + client: &Client, ) -> Result<(), Error> { let hist = Histogram::new(&[0.1f32, 0.2, 0.3]).unwrap(); - histogram_test_impl(address, hist).await?; + histogram_test_impl(client, hist).await?; Ok(()) } async fn recall_measurement_histogram_f64_test( - address: SocketAddr, + client: &Client, ) -> Result<(), Error> { let hist = Histogram::new(&[0.1f64, 0.2, 0.3]).unwrap(); - histogram_test_impl(address, hist).await?; + histogram_test_impl(client, hist).await?; Ok(()) } async fn test_recall_measurement_impl + Copy>( - address: SocketAddr, datum: Datum, maybe_bins: Option>, json_datum: serde_json::Value, + client: &Client, ) -> Result<(), Error> { - let logctx = test_setup_log( - format!("test_recall_measurement_{}", datum.datum_type()).as_str(), - ); - let log = &logctx.log; - - let client = Client::new(address, log); - client - .init_single_node_db() - .await - .expect("Failed to initialize timeseries database"); - // Insert a record from this datum. const TIMESERIES_NAME: &str = "foo:bar"; const TIMESERIES_KEY: u64 = 101; @@ -2609,7 +2872,7 @@ mod tests { // Select it exactly back out. 
let select_sql = format!( - "SELECT * FROM oximeter.{} LIMIT 1 FORMAT {};", + "SELECT * FROM oximeter.{} LIMIT 2 FORMAT {};", measurement_table, crate::DATABASE_SELECT_FORMAT, ); @@ -2617,6 +2880,7 @@ mod tests { .execute_with_body(select_sql) .await .expect("Failed to select measurement row"); + println!("{}", body); let actual_row: serde_json::Value = serde_json::from_str(&body) .expect("Failed to parse measurement row JSON"); println!("{actual_row:?}"); @@ -2625,8 +2889,6 @@ mod tests { actual_row, inserted_row, "Actual and expected measurement rows do not match" ); - client.wipe_single_node_db().await?; - logctx.cleanup_successful(); Ok(()) } @@ -2642,8 +2904,94 @@ mod tests { .count() } + async fn recall_of_all_fields_test( + address: SocketAddr, + db_type: InstallationType, + ) -> Result<(), Error> { + let logctx = test_setup_log("test_recall_of_all_fields"); + let log = &logctx.log; + + let client = Client::new(address, log); + db_type.init_db(&client).await.unwrap(); + + recall_measurement_bool_test(&client).await.unwrap(); + + recall_measurement_i8_test(&client).await.unwrap(); + + recall_measurement_u8_test(&client).await.unwrap(); + + recall_measurement_i16_test(&client).await.unwrap(); + + recall_measurement_u16_test(&client).await.unwrap(); + + recall_measurement_i32_test(&client).await.unwrap(); + + recall_measurement_u32_test(&client).await.unwrap(); + + recall_measurement_i64_test(&client).await.unwrap(); + + recall_measurement_u64_test(&client).await.unwrap(); + + recall_measurement_f32_test(&client).await.unwrap(); + + recall_measurement_f64_test(&client).await.unwrap(); + + recall_measurement_cumulative_i64_test(&client).await.unwrap(); + + recall_measurement_cumulative_u64_test(&client).await.unwrap(); + + recall_measurement_cumulative_f64_test(&client).await.unwrap(); + + recall_measurement_histogram_i8_test(&client).await.unwrap(); + + recall_measurement_histogram_u8_test(&client).await.unwrap(); + + recall_measurement_histogram_i16_test(&client).await.unwrap(); + + recall_measurement_histogram_u16_test(&client).await.unwrap(); + + recall_measurement_histogram_i32_test(&client).await.unwrap(); + + recall_measurement_histogram_u32_test(&client).await.unwrap(); + + recall_measurement_histogram_i64_test(&client).await.unwrap(); + + recall_measurement_histogram_u64_test(&client).await.unwrap(); + + recall_measurement_histogram_f64_test(&client).await.unwrap(); + + recall_field_value_bool_test(&client).await.unwrap(); + + recall_field_value_u8_test(&client).await.unwrap(); + + recall_field_value_i8_test(&client).await.unwrap(); + + recall_field_value_u16_test(&client).await.unwrap(); + + recall_field_value_i16_test(&client).await.unwrap(); + + recall_field_value_u32_test(&client).await.unwrap(); + + recall_field_value_i32_test(&client).await.unwrap(); + + recall_field_value_u64_test(&client).await.unwrap(); + + recall_field_value_i64_test(&client).await.unwrap(); + + recall_field_value_string_test(&client).await.unwrap(); + + recall_field_value_ipv6addr_test(&client).await.unwrap(); + + recall_field_value_uuid_test(&client).await.unwrap(); + + db_type.wipe_db(&client).await.unwrap(); + logctx.cleanup_successful(); + Ok(()) + } + async fn database_version_update_idempotent_test( address: SocketAddr, + db_type: InstallationType, ) -> Result<(), Error> { let logctx = test_setup_log("test_database_version_update_idempotent"); let log = &logctx.log; @@ -2674,13 +3022,14 @@ mod tests { assert_eq!(1, get_schema_count(&client).await); - client.wipe_single_node_db().await?; + 
db_type.wipe_db(&client).await.unwrap(); logctx.cleanup_successful(); Ok(()) } async fn database_version_will_not_downgrade_test( address: SocketAddr, + db_type: InstallationType, ) -> Result<(), Error> { let logctx = test_setup_log("test_database_version_will_not_downgrade"); let log = &logctx.log; @@ -2709,13 +3058,14 @@ mod tests { .await .expect_err("Should have failed, downgrades are not supported"); - client.wipe_single_node_db().await?; + db_type.wipe_db(&client).await.unwrap(); logctx.cleanup_successful(); Ok(()) } async fn database_version_wipes_old_version_test( address: SocketAddr, + db_type: InstallationType, ) -> Result<(), Error> { let logctx = test_setup_log("test_database_version_wipes_old_version"); let log = &logctx.log; @@ -2745,23 +3095,21 @@ mod tests { .expect("Should have initialized database successfully"); assert_eq!(0, get_schema_count(&client).await); - client.wipe_single_node_db().await?; + db_type.wipe_db(&client).await.unwrap(); logctx.cleanup_successful(); Ok(()) } async fn update_schema_cache_on_new_sample_test( address: SocketAddr, + db_type: InstallationType, ) -> Result<(), Error> { usdt::register_probes().unwrap(); let logctx = test_setup_log("test_update_schema_cache_on_new_sample"); let log = &logctx.log; let client = Client::new(address, &log); - client - .init_single_node_db() - .await - .expect("Failed to initialize timeseries database"); + db_type.init_db(&client).await.unwrap(); let samples = [test_util::make_sample()]; client.insert_samples(&samples).await.unwrap(); @@ -2793,7 +3141,7 @@ mod tests { "Expected exactly 1 schema again" ); assert_eq!(client.schema.lock().await.len(), 1); - client.wipe_single_node_db().await?; + db_type.wipe_db(&client).await.unwrap(); logctx.cleanup_successful(); Ok(()) } @@ -2805,6 +3153,7 @@ mod tests { // succeed. async fn select_all_datum_types_test( address: SocketAddr, + db_type: InstallationType, ) -> Result<(), Error> { use strum::IntoEnumIterator; usdt::register_probes().unwrap(); @@ -2812,10 +3161,7 @@ mod tests { let log = &logctx.log; let client = Client::new(address, &log); - client - .init_single_node_db() - .await - .expect("Failed to initialize timeseries database"); + db_type.init_db(&client).await.unwrap(); // Attempt to select all schema with each datum type. for ty in oximeter::DatumType::iter() { @@ -2830,7 +3176,7 @@ mod tests { let count = res.trim().parse::().unwrap(); assert_eq!(count, 0); } - client.wipe_single_node_db().await?; + db_type.wipe_db(&client).await.unwrap(); logctx.cleanup_successful(); Ok(()) } @@ -2841,16 +3187,14 @@ mod tests { // remove them from the internal cache. async fn new_schema_removed_when_not_inserted_test( address: SocketAddr, + db_type: InstallationType, ) -> Result<(), Error> { usdt::register_probes().unwrap(); let logctx = test_setup_log("test_update_schema_cache_on_new_sample"); let log = &logctx.log; let client = Client::new(address, &log); - client - .init_single_node_db() - .await - .expect("Failed to initialize timeseries database"); + db_type.init_db(&client).await.unwrap(); let samples = [test_util::make_sample()]; // We're using the components of the `insert_samples()` method here, @@ -2866,7 +3210,7 @@ mod tests { // Next, we'll kill the database, and then try to insert the schema. // That will fail, since the DB is now inaccessible. 
- client.wipe_single_node_db().await?; + db_type.wipe_db(&client).await.unwrap(); let res = client.save_new_schema_or_remove(new_schema).await; assert!(res.is_err(), "Should have failed since the DB is gone"); assert!( @@ -2878,87 +3222,6 @@ mod tests { Ok(()) } - #[tokio::test] - async fn test_build_replicated() { - let logctx = test_setup_log("test_build_replicated"); - let log = &logctx.log; - - let mut cluster = ClickHouseCluster::new() - .await - .expect("Failed to initialise ClickHouse Cluster"); - - // Create database in node 1 - let client_1 = Client::new(cluster.replica_1.address.unwrap(), &log); - assert!(client_1.is_oximeter_cluster().await.unwrap()); - client_1 - .init_replicated_db() - .await - .expect("Failed to initialize timeseries database"); - - // Verify database exists in node 2 - let client_2 = Client::new(cluster.replica_2.address.unwrap(), &log); - assert!(client_2.is_oximeter_cluster().await.unwrap()); - let sql = String::from("SHOW DATABASES FORMAT JSONEachRow;"); - let result = client_2.execute_with_body(sql).await.unwrap(); - - // Try a few times to make sure data has been synchronised. - let tries = 5; - for _ in 0..tries { - if !result.contains("oximeter") { - sleep(Duration::from_secs(1)).await; - continue; - } else { - break; - } - } - - assert!(result.contains("oximeter")); - - // Insert row into one of the tables - let sql = String::from( - "INSERT INTO oximeter.measurements_string (datum) VALUES ('hiya');", - ); - client_2.execute_with_body(sql).await.unwrap(); - - let sql = String::from( - "SELECT * FROM oximeter.measurements_string FORMAT JSONEachRow;", - ); - let result = client_2.execute_with_body(sql.clone()).await.unwrap(); - assert!(result.contains("hiya")); - - // TODO(https://github.com/oxidecomputer/omicron/issues/4001): With distributed - // engine, it can take a long time to sync the data. This means it's tricky to - // test that the data exists on both nodes. 
- - cluster - .keeper_1 - .cleanup() - .await - .expect("Failed to cleanup ClickHouse keeper 1"); - cluster - .keeper_2 - .cleanup() - .await - .expect("Failed to cleanup ClickHouse keeper 2"); - cluster - .keeper_3 - .cleanup() - .await - .expect("Failed to cleanup ClickHouse keeper 3"); - cluster - .replica_1 - .cleanup() - .await - .expect("Failed to cleanup ClickHouse server 1"); - cluster - .replica_2 - .cleanup() - .await - .expect("Failed to cleanup ClickHouse server 2"); - - logctx.cleanup_successful(); - } - // Testing helper functions #[derive(Debug, Clone, oximeter::Target)] diff --git a/oximeter/db/src/db-replicated-init.sql b/oximeter/db/src/db-replicated-init.sql index 21a647b1a5..ec11854e44 100644 --- a/oximeter/db/src/db-replicated-init.sql +++ b/oximeter/db/src/db-replicated-init.sql @@ -188,6 +188,26 @@ CREATE TABLE IF NOT EXISTS oximeter.measurements_u64 ON CLUSTER oximeter_cluster ) ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_u64_local', xxHash64(splitByChar(':', timeseries_name)[1])); -- +CREATE TABLE IF NOT EXISTS oximeter.measurements_f32_local ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum Float32 +) +ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/measurements_f32_local', '{replica}') +ORDER BY (timeseries_name, timeseries_key, timestamp) +TTL toDateTime(timestamp) + INTERVAL 30 DAY; +-- +CREATE TABLE IF NOT EXISTS oximeter.measurements_f32 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + timestamp DateTime64(9, 'UTC'), + datum Float32 +) +ENGINE = Distributed('oximeter_cluster', 'oximeter', 'measurements_f32_local', xxHash64(splitByChar(':', timeseries_name)[1])); +-- CREATE TABLE IF NOT EXISTS oximeter.measurements_f64_local ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -586,6 +606,66 @@ CREATE TABLE IF NOT EXISTS oximeter.fields_bool ON CLUSTER oximeter_cluster ENGINE = ReplicatedReplacingMergeTree() ORDER BY (timeseries_name, field_name, field_value, timeseries_key); -- +CREATE TABLE IF NOT EXISTS oximeter.fields_i8 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + field_name String, + field_value Int8 +) +ENGINE = ReplicatedReplacingMergeTree() +ORDER BY (timeseries_name, field_name, field_value, timeseries_key); +-- +CREATE TABLE IF NOT EXISTS oximeter.fields_u8 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + field_name String, + field_value UInt8 +) +ENGINE = ReplicatedReplacingMergeTree() +ORDER BY (timeseries_name, field_name, field_value, timeseries_key); +-- +CREATE TABLE IF NOT EXISTS oximeter.fields_i16 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + field_name String, + field_value Int16 +) +ENGINE = ReplicatedReplacingMergeTree() +ORDER BY (timeseries_name, field_name, field_value, timeseries_key); +-- +CREATE TABLE IF NOT EXISTS oximeter.fields_u16 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + field_name String, + field_value UInt16 +) +ENGINE = ReplicatedReplacingMergeTree() +ORDER BY (timeseries_name, field_name, field_value, timeseries_key); +-- +CREATE TABLE IF NOT EXISTS oximeter.fields_i32 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + field_name String, + field_value Int32 +) +ENGINE = ReplicatedReplacingMergeTree() +ORDER BY (timeseries_name, field_name, field_value, timeseries_key); +-- +CREATE TABLE IF NOT 
EXISTS oximeter.fields_u32 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + field_name String, + field_value UInt32 +) +ENGINE = ReplicatedReplacingMergeTree() +ORDER BY (timeseries_name, field_name, field_value, timeseries_key); +-- CREATE TABLE IF NOT EXISTS oximeter.fields_i64 ON CLUSTER oximeter_cluster ( timeseries_name String, @@ -596,6 +676,16 @@ CREATE TABLE IF NOT EXISTS oximeter.fields_i64 ON CLUSTER oximeter_cluster ENGINE = ReplicatedReplacingMergeTree() ORDER BY (timeseries_name, field_name, field_value, timeseries_key); -- +CREATE TABLE IF NOT EXISTS oximeter.fields_u64 ON CLUSTER oximeter_cluster +( + timeseries_name String, + timeseries_key UInt64, + field_name String, + field_value UInt64 +) +ENGINE = ReplicatedReplacingMergeTree() +ORDER BY (timeseries_name, field_name, field_value, timeseries_key); +-- CREATE TABLE IF NOT EXISTS oximeter.fields_ipaddr ON CLUSTER oximeter_cluster ( timeseries_name String, diff --git a/oximeter/db/src/db-wipe-replicated.sql b/oximeter/db/src/db-wipe-replicated.sql index 1ed4d270b7..5874da7561 100644 --- a/oximeter/db/src/db-wipe-replicated.sql +++ b/oximeter/db/src/db-wipe-replicated.sql @@ -1 +1 @@ -DROP DATABASE IF EXISTS oximeter ON CLUSTER oximeter_cluster; +DROP DATABASE IF EXISTS oximeter ON CLUSTER oximeter_cluster SYNC; diff --git a/test-utils/src/dev/clickhouse.rs b/test-utils/src/dev/clickhouse.rs index e96f969bbc..6fb495627f 100644 --- a/test-utils/src/dev/clickhouse.rs +++ b/test-utils/src/dev/clickhouse.rs @@ -9,7 +9,7 @@ use std::process::Stdio; use std::time::Duration; use anyhow::{anyhow, Context}; -use std::net::SocketAddr; +use std::net::{IpAddr, Ipv6Addr, SocketAddr}; use tempfile::{Builder, TempDir}; use thiserror::Error; use tokio::{ @@ -32,7 +32,7 @@ pub struct ClickHouseInstance { // The HTTP port the server is listening on port: u16, // The address the server is listening on - pub address: Option, + pub address: SocketAddr, // Full list of command-line arguments args: Vec, // Subprocess handle @@ -109,11 +109,13 @@ impl ClickHouseInstance { let data_path = data_dir.path().to_path_buf(); let port = wait_for_port(log_path).await?; + let address = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), port); + Ok(Self { data_dir: Some(data_dir), data_path, port, - address: None, + address: address, args, child: Some(child), }) @@ -161,21 +163,21 @@ impl ClickHouseInstance { .env("CH_USER_LOCAL_DIR", access_path) .env("CH_FORMAT_SCHEMA_PATH", format_schemas_path) .env("CH_REPLICA_NUMBER", r_number) - // There seems to be a bug using ipv6 with a replicated set up - // when installing all servers and coordinator nodes on the same - // server. For this reason we will be using ipv4 for testing. - .env("CH_REPLICA_HOST_01", "127.0.0.1") - .env("CH_REPLICA_HOST_02", "127.0.0.1") - .env("CH_KEEPER_HOST_01", "127.0.0.1") - .env("CH_KEEPER_HOST_02", "127.0.0.1") - .env("CH_KEEPER_HOST_03", "127.0.0.1") + .env("CH_REPLICA_HOST_01", "::1") + .env("CH_REPLICA_HOST_02", "::1") + // ClickHouse servers have a small quirk, where when setting the keeper hosts as IPv6 localhost + // addresses in the replica configuration file, they must be wrapped in square brackets + // Otherwise, when running any query, a "Service not found" error appears. 
+ .env("CH_KEEPER_HOST_01", "[::1]") + .env("CH_KEEPER_HOST_02", "[::1]") + .env("CH_KEEPER_HOST_03", "[::1]") .spawn() .with_context(|| { format!("failed to spawn `clickhouse` (with args: {:?})", &args) })?; let data_path = data_dir.path().to_path_buf(); - let address = SocketAddr::new("127.0.0.1".parse().unwrap(), port); + let address = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), port); let result = wait_for_ready(log_path).await; match result { @@ -183,7 +185,7 @@ impl ClickHouseInstance { data_dir: Some(data_dir), data_path, port, - address: Some(address), + address: address, args, child: Some(child), }), @@ -237,12 +239,9 @@ impl ClickHouseInstance { .env("CH_KEEPER_ID_01", "1") .env("CH_KEEPER_ID_02", "2") .env("CH_KEEPER_ID_03", "3") - // There seems to be a bug using ipv6 and localhost with a replicated - // set up when installing all servers and coordinator nodes on the same - // server. For this reason we will be using ipv4 for testing. - .env("CH_KEEPER_HOST_01", "127.0.0.1") - .env("CH_KEEPER_HOST_02", "127.0.0.1") - .env("CH_KEEPER_HOST_03", "127.0.0.1") + .env("CH_KEEPER_HOST_01", "::1") + .env("CH_KEEPER_HOST_02", "::1") + .env("CH_KEEPER_HOST_03", "::1") .spawn() .with_context(|| { format!( @@ -252,6 +251,7 @@ impl ClickHouseInstance { })?; let data_path = data_dir.path().to_path_buf(); + let address = SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), port); let result = wait_for_ready(log_path).await; match result { @@ -259,7 +259,7 @@ impl ClickHouseInstance { data_dir: Some(data_dir), data_path, port, - address: None, + address: address, args, child: Some(child), }),