diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index f560d9938428..ae54a00d986a 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -65,6 +65,7 @@ pub struct GreptimeDbCluster { pub frontend: Arc, } +#[derive(Clone)] pub struct GreptimeDbClusterBuilder { cluster_name: String, kv_backend: KvBackendRef, diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 4e6187fc7ffb..20348c462aa0 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -43,6 +43,7 @@ pub struct GreptimeDbStandalone { pub guard: TestGuard, } +#[derive(Clone)] pub struct GreptimeDbStandaloneBuilder { instance_name: String, wal_config: WalConfig, diff --git a/tests-integration/src/tests.rs b/tests-integration/src/tests.rs index 692a9de8d6ab..8d1e421738aa 100644 --- a/tests-integration/src/tests.rs +++ b/tests-integration/src/tests.rs @@ -14,6 +14,8 @@ mod instance_test; mod promql_test; +// TODO(weny): Remove it. +#[allow(dead_code, unused_macros)] mod test_util; use std::collections::HashMap; diff --git a/tests-integration/src/tests/test_util.rs b/tests-integration/src/tests/test_util.rs index edf21ba7601d..32be423e69bb 100644 --- a/tests-integration/src/tests/test_util.rs +++ b/tests-integration/src/tests/test_util.rs @@ -12,10 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::env; use std::sync::Arc; +use common_config::wal::KafkaConfig; +use common_config::WalConfig; use common_query::Output; use common_recordbatch::util; +use common_telemetry::warn; use common_test_util::find_workspace_path; use frontend::instance::Instance; use rstest_reuse::{self, template}; @@ -25,7 +29,13 @@ use crate::standalone::{GreptimeDbStandalone, GreptimeDbStandaloneBuilder}; use crate::test_util::StorageType; use crate::tests::{create_distributed_instance, MockDistributedInstance}; -pub(crate) trait MockInstance { +#[async_trait::async_trait] +pub(crate) trait RebuildableMockInstance: MockInstance { + // Rebuilds the instance and returns rebuilt frontend instance. + async fn rebuild(&mut self) -> Arc; +} + +pub(crate) trait MockInstance: Sync + Send { fn frontend(&self) -> Arc; fn is_distributed_mode(&self) -> bool; @@ -51,6 +61,54 @@ impl MockInstance for MockDistributedInstance { } } +pub(crate) enum MockInstanceBuilder { + Standalone(GreptimeDbStandaloneBuilder), + Distributed(GreptimeDbClusterBuilder), +} + +impl MockInstanceBuilder { + async fn build(&self) -> Arc { + match self { + MockInstanceBuilder::Standalone(builder) => Arc::new(builder.clone().build().await), + MockInstanceBuilder::Distributed(builder) => { + Arc::new(MockDistributedInstance(builder.clone().build().await)) + } + } + } +} + +pub(crate) struct TestContext { + instance: Arc, + builder: MockInstanceBuilder, +} + +impl TestContext { + async fn new(builder: MockInstanceBuilder) -> Self { + let instance = builder.build().await; + + Self { instance, builder } + } +} + +#[async_trait::async_trait] +impl RebuildableMockInstance for TestContext { + async fn rebuild(&mut self) -> Arc { + let instance = self.builder.build().await; + self.instance = instance; + self.instance.frontend() + } +} + +impl MockInstance for TestContext { + fn frontend(&self) -> Arc { + self.instance.frontend() + } + + fn is_distributed_mode(&self) -> bool { + self.instance.is_distributed_mode() + } +} + pub(crate) async fn standalone() -> Arc { let test_name = uuid::Uuid::new_v4().to_string(); let instance = GreptimeDbStandaloneBuilder::new(&test_name).build().await; @@ -86,6 +144,61 @@ pub(crate) async fn distributed_with_multiple_object_stores() -> Arc Option> { + let _ = dotenv::dotenv(); + let endpoints = env::var("GT_KAFKA_ENDPOINTS").unwrap_or_default(); + common_telemetry::init_default_ut_logging(); + if endpoints.is_empty() { + warn!("The endpoints is empty, skipping the test"); + return None; + } + + let endpoints = endpoints.split(',').map(|s| s.trim().to_string()).collect(); + let test_name = uuid::Uuid::new_v4().to_string(); + let builder = GreptimeDbStandaloneBuilder::new(&test_name).with_wal_config(WalConfig::Kafka( + KafkaConfig { + broker_endpoints: endpoints, + ..Default::default() + }, + )); + let instance = TestContext::new(MockInstanceBuilder::Standalone(builder)).await; + Some(Box::new(instance)) +} + +pub(crate) async fn distributed_with_kafka_wal() -> Option> { + let _ = dotenv::dotenv(); + let endpoints = env::var("GT_KAFKA_ENDPOINTS").unwrap_or_default(); + common_telemetry::init_default_ut_logging(); + if endpoints.is_empty() { + warn!("The endpoints is empty, skipping the test"); + return None; + } + + let endpoints = endpoints.split(',').map(|s| s.trim().to_string()).collect(); + let test_name = uuid::Uuid::new_v4().to_string(); + let builder = GreptimeDbClusterBuilder::new(&test_name) + .await + .with_wal_config(WalConfig::Kafka(KafkaConfig { + broker_endpoints: endpoints, + ..Default::default() + })); + let instance = TestContext::new(MockInstanceBuilder::Distributed(builder)).await; + Some(Box::new(instance)) +} + +#[template] +#[rstest] +#[case::test_with_standalone(standalone_with_kafka_wal())] +#[case::test_with_distributed(distributed_with_kafka_wal())] +#[awt] +#[tokio::test(flavor = "multi_thread")] +pub(crate) fn both_instances_cases_with_kafka_wal( + #[future] + #[case] + instance: Arc, +) { +} + #[template] #[rstest] #[case::test_with_standalone(standalone_with_multiple_object_stores())]