Skip to content

Commit

Permalink
feat: add kafka wal integration test utils
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Jan 2, 2024
1 parent 757cbb6 commit 8b48624
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 1 deletion.
1 change: 1 addition & 0 deletions tests-integration/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ pub struct GreptimeDbCluster {
pub frontend: Arc<FeInstance>,
}

#[derive(Clone)]
pub struct GreptimeDbClusterBuilder {
cluster_name: String,
kv_backend: KvBackendRef,
Expand Down
1 change: 1 addition & 0 deletions tests-integration/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub struct GreptimeDbStandalone {
pub guard: TestGuard,
}

#[derive(Clone)]
pub struct GreptimeDbStandaloneBuilder {
instance_name: String,
wal_config: WalConfig,
Expand Down
2 changes: 2 additions & 0 deletions tests-integration/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

mod instance_test;
mod promql_test;
// TODO(weny): Remove it.
#[allow(dead_code, unused_macros)]
mod test_util;

use std::collections::HashMap;
Expand Down
115 changes: 114 additions & 1 deletion tests-integration/src/tests/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::env;
use std::sync::Arc;

use common_config::wal::KafkaConfig;
use common_config::WalConfig;
use common_query::Output;
use common_recordbatch::util;
use common_telemetry::warn;
use common_test_util::find_workspace_path;
use frontend::instance::Instance;
use rstest_reuse::{self, template};
Expand All @@ -25,7 +29,13 @@ use crate::standalone::{GreptimeDbStandalone, GreptimeDbStandaloneBuilder};
use crate::test_util::StorageType;
use crate::tests::{create_distributed_instance, MockDistributedInstance};

pub(crate) trait MockInstance {
#[async_trait::async_trait]
pub(crate) trait RebuildableMockInstance: MockInstance {
// Rebuilds the instance and returns rebuilt frontend instance.
async fn rebuild(&mut self) -> Arc<Instance>;
}

pub(crate) trait MockInstance: Sync + Send {
fn frontend(&self) -> Arc<Instance>;

fn is_distributed_mode(&self) -> bool;
Expand All @@ -51,6 +61,54 @@ impl MockInstance for MockDistributedInstance {
}
}

pub(crate) enum MockInstanceBuilder {
Standalone(GreptimeDbStandaloneBuilder),
Distributed(GreptimeDbClusterBuilder),
}

impl MockInstanceBuilder {
async fn build(&self) -> Arc<dyn MockInstance> {
match self {
MockInstanceBuilder::Standalone(builder) => Arc::new(builder.clone().build().await),
MockInstanceBuilder::Distributed(builder) => {
Arc::new(MockDistributedInstance(builder.clone().build().await))
}
}
}
}

pub(crate) struct TestContext {
instance: Arc<dyn MockInstance>,
builder: MockInstanceBuilder,
}

impl TestContext {
async fn new(builder: MockInstanceBuilder) -> Self {
let instance = builder.build().await;

Self { instance, builder }
}
}

#[async_trait::async_trait]
impl RebuildableMockInstance for TestContext {
async fn rebuild(&mut self) -> Arc<Instance> {
let instance = self.builder.build().await;
self.instance = instance;
self.instance.frontend()
}
}

impl MockInstance for TestContext {
fn frontend(&self) -> Arc<Instance> {
self.instance.frontend()
}

fn is_distributed_mode(&self) -> bool {
self.instance.is_distributed_mode()
}
}

pub(crate) async fn standalone() -> Arc<dyn MockInstance> {
let test_name = uuid::Uuid::new_v4().to_string();
let instance = GreptimeDbStandaloneBuilder::new(&test_name).build().await;
Expand Down Expand Up @@ -86,6 +144,61 @@ pub(crate) async fn distributed_with_multiple_object_stores() -> Arc<dyn MockIns
Arc::new(MockDistributedInstance(cluster))
}

pub(crate) async fn standalone_with_kafka_wal() -> Option<Box<dyn RebuildableMockInstance>> {
let _ = dotenv::dotenv();
let endpoints = env::var("GT_KAFKA_ENDPOINTS").unwrap_or_default();
common_telemetry::init_default_ut_logging();
if endpoints.is_empty() {
warn!("The endpoints is empty, skipping the test");
return None;
}

let endpoints = endpoints.split(',').map(|s| s.trim().to_string()).collect();
let test_name = uuid::Uuid::new_v4().to_string();
let builder = GreptimeDbStandaloneBuilder::new(&test_name).with_wal_config(WalConfig::Kafka(
KafkaConfig {
broker_endpoints: endpoints,
..Default::default()
},
));
let instance = TestContext::new(MockInstanceBuilder::Standalone(builder)).await;
Some(Box::new(instance))
}

pub(crate) async fn distributed_with_kafka_wal() -> Option<Box<dyn RebuildableMockInstance>> {
let _ = dotenv::dotenv();
let endpoints = env::var("GT_KAFKA_ENDPOINTS").unwrap_or_default();
common_telemetry::init_default_ut_logging();
if endpoints.is_empty() {
warn!("The endpoints is empty, skipping the test");
return None;
}

let endpoints = endpoints.split(',').map(|s| s.trim().to_string()).collect();
let test_name = uuid::Uuid::new_v4().to_string();
let builder = GreptimeDbClusterBuilder::new(&test_name)
.await
.with_wal_config(WalConfig::Kafka(KafkaConfig {
broker_endpoints: endpoints,
..Default::default()
}));
let instance = TestContext::new(MockInstanceBuilder::Distributed(builder)).await;
Some(Box::new(instance))
}

#[template]
#[rstest]
#[case::test_with_standalone(standalone_with_kafka_wal())]
#[case::test_with_distributed(distributed_with_kafka_wal())]
#[awt]
#[tokio::test(flavor = "multi_thread")]
pub(crate) fn both_instances_cases_with_kafka_wal(
#[future]
#[case]
instance: Arc<dyn MockInstance>,
) {
}

#[template]
#[rstest]
#[case::test_with_standalone(standalone_with_multiple_object_stores())]
Expand Down

0 comments on commit 8b48624

Please sign in to comment.