diff --git a/Cargo.lock b/Cargo.lock index 97e7f48e922e..6a0893a1304e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8858,6 +8858,7 @@ dependencies = [ "common-recordbatch", "common-time", "serde", + "serde_json", "sqlness", "tinytemplate", "tokio", diff --git a/tests-integration/fixtures/kafka/README.md b/tests-integration/fixtures/kafka/README.md new file mode 100644 index 000000000000..1ca33bc36af3 --- /dev/null +++ b/tests-integration/fixtures/kafka/README.md @@ -0,0 +1,19 @@ +## Starts a standalone kafka +```bash +docker compose -f docker-compose-standalone.yml up kafka -d +``` + +## Lists running containers +```bash +docker compose -f docker-compose-standalone.yml ps +``` + +## Stops the standalone kafka +```bash +docker compose -f docker-compose-standalone.yml stop kafka +``` + +## Stops and removes a standalone kafka +```bash +docker compose -f docker-compose-standalone.yml down kafka +``` \ No newline at end of file diff --git a/tests/conf/datanode-test.toml.template b/tests/conf/datanode-test.toml.template index 55d0c2f1fe4c..9ef508fe4c45 100644 --- a/tests/conf/datanode-test.toml.template +++ b/tests/conf/datanode-test.toml.template @@ -6,12 +6,17 @@ rpc_hostname = '127.0.0.1' rpc_runtime_size = 8 [wal] +{{ if is_raft_engine }} provider = "raft_engine" file_size = '1GB' purge_interval = '10m' purge_threshold = '10GB' read_batch_size = 128 sync_write = false +{{ else }} +provider = "kafka" +broker_endpoints = {kafka_wal_broker_endpoints} +{{ endif }} [storage] type = 'File' diff --git a/tests/conf/standalone-test.toml.template b/tests/conf/standalone-test.toml.template index f0ddc38d048e..9857fd0c0566 100644 --- a/tests/conf/standalone-test.toml.template +++ b/tests/conf/standalone-test.toml.template @@ -3,12 +3,17 @@ enable_memory_catalog = false require_lease_before_startup = true [wal] +{{ if is_raft_engine }} provider = "raft_engine" file_size = '1GB' purge_interval = '10m' purge_threshold = '10GB' read_batch_size = 128 sync_write = false +{{ else }} +provider = "kafka" +broker_endpoints = {kafka_wal_broker_endpoints | unescaped} +{{ endif }} [storage] type = 'File' diff --git a/tests/runner/Cargo.toml b/tests/runner/Cargo.toml index 7b9141776fd8..b2757f479dd6 100644 --- a/tests/runner/Cargo.toml +++ b/tests/runner/Cargo.toml @@ -15,6 +15,7 @@ common-query.workspace = true common-recordbatch.workspace = true common-time.workspace = true serde.workspace = true +serde_json.workspace = true sqlness = { version = "0.5" } tinytemplate = "1.2" tokio.workspace = true diff --git a/tests/runner/src/env.rs b/tests/runner/src/env.rs index 1bd7ad36496a..7c3f3c9c853d 100644 --- a/tests/runner/src/env.rs +++ b/tests/runner/src/env.rs @@ -41,10 +41,17 @@ const METASRV_ADDR: &str = "127.0.0.1:3002"; const SERVER_ADDR: &str = "127.0.0.1:4001"; const DEFAULT_LOG_LEVEL: &str = "--log-level=debug,hyper=warn,tower=warn,datafusion=warn,reqwest=warn,sqlparser=warn,h2=info,opendal=info"; +#[derive(Clone)] +pub enum WalConfig { + RaftEngine, + Kafka { broker_endpoints: Vec }, +} + #[derive(Clone)] pub struct Env { data_home: PathBuf, server_addr: Option, + wal: WalConfig, } #[allow(clippy::print_stdout)] @@ -68,10 +75,11 @@ impl EnvController for Env { #[allow(clippy::print_stdout)] impl Env { - pub fn new(data_home: PathBuf, server_addr: Option) -> Self { + pub fn new(data_home: PathBuf, server_addr: Option, wal: WalConfig) -> Self { Self { data_home, server_addr, + wal, } } @@ -81,7 +89,7 @@ impl Env { } else { Self::build_db().await; - let db_ctx = GreptimeDBContext::new(); + let db_ctx = GreptimeDBContext::new(self.wal.clone()); let server_process = self.start_server("standalone", &db_ctx, true).await; @@ -106,7 +114,7 @@ impl Env { } else { Self::build_db().await; - let db_ctx = GreptimeDBContext::new(); + let db_ctx = GreptimeDBContext::new(self.wal.clone()); // start a distributed GreptimeDB let meta_server = self.start_server("metasrv", &db_ctx, true).await; @@ -145,6 +153,7 @@ impl Env { ctx: GreptimeDBContext { time: 0, datanode_id: Default::default(), + wal: self.wal.clone(), }, is_standalone: false, env: self.clone(), @@ -321,6 +330,8 @@ impl Env { wal_dir: String, data_home: String, procedure_dir: String, + is_raft_engine: bool, + kafka_wal_broker_endpoints: String, } let data_home = self @@ -334,6 +345,8 @@ impl Env { wal_dir, data_home: data_home.display().to_string(), procedure_dir, + is_raft_engine: db_ctx.is_raft_engine(), + kafka_wal_broker_endpoints: db_ctx.kafka_wal_broker_endpoints(), }; let rendered = tt.render(subcommand, &ctx).unwrap(); @@ -447,13 +460,28 @@ struct GreptimeDBContext { /// Start time in millisecond time: i64, datanode_id: AtomicU32, + wal: WalConfig, } impl GreptimeDBContext { - pub fn new() -> Self { + pub fn new(wal: WalConfig) -> Self { Self { time: common_time::util::current_time_millis(), datanode_id: AtomicU32::new(0), + wal, + } + } + + fn is_raft_engine(&self) -> bool { + matches!(self.wal, WalConfig::RaftEngine) + } + + fn kafka_wal_broker_endpoints(&self) -> String { + match &self.wal { + WalConfig::RaftEngine => String::new(), + WalConfig::Kafka { broker_endpoints } => { + serde_json::to_string(&broker_endpoints).unwrap() + } } } diff --git a/tests/runner/src/main.rs b/tests/runner/src/main.rs index 9c93e628a484..2165671261ac 100644 --- a/tests/runner/src/main.rs +++ b/tests/runner/src/main.rs @@ -14,13 +14,20 @@ use std::path::PathBuf; -use clap::Parser; -use env::Env; +use clap::{Parser, ValueEnum}; +use env::{Env, WalConfig}; use sqlness::{ConfigBuilder, Runner}; mod env; mod util; +#[derive(ValueEnum, Debug, Clone)] +#[clap(rename_all = "snake_case")] +enum Wal { + RaftEngine, + Kafka, +} + #[derive(Parser, Debug)] #[clap(author, version, about, long_about = None)] /// SQL Harness for GrepTimeDB @@ -41,9 +48,17 @@ struct Args { #[clap(short, long, default_value = ".*")] test_filter: String, - /// Address of the server + /// Address of the server. #[clap(short, long)] server_addr: Option, + + /// The type of Wal. + #[clap(short, long, default_value = "raft-engine")] + wal: Wal, + + /// The kafka wal broker endpoints. + #[clap(short, long, default_value = "127.0.0.1:9092")] + kafka_wal_broker_endpoints: String, } #[tokio::main] @@ -63,6 +78,18 @@ async fn main() { .env_config_file(args.env_config_file) .build() .unwrap(); - let runner = Runner::new(config, Env::new(data_home, args.server_addr)); + + let wal = match args.wal { + Wal::RaftEngine => WalConfig::RaftEngine, + Wal::Kafka => WalConfig::Kafka { + broker_endpoints: args + .kafka_wal_broker_endpoints + .split(',') + .map(|s| s.to_string()) + .collect(), + }, + }; + + let runner = Runner::new(config, Env::new(data_home, args.server_addr, wal)); runner.run().await.unwrap(); }