diff --git a/Cargo.lock b/Cargo.lock
index 18fa00347ff4..d722b1e9f2ca 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5834,6 +5834,7 @@ dependencies = [
  "datafusion-common 38.0.0",
  "datafusion-expr 38.0.0",
  "datatypes",
+ "dotenv",
  "futures",
  "humantime-serde",
  "index",
@@ -5851,6 +5852,9 @@ dependencies = [
  "puffin",
  "rand",
  "regex",
+ "rskafka",
+ "rstest",
+ "rstest_reuse",
  "serde",
  "serde_json",
  "serde_with",
@@ -8256,6 +8260,12 @@ dependencies = [
  "memchr",
 ]
 
+[[package]]
+name = "relative-path"
+version = "1.9.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ba39f3699c378cd8970968dcbff9c43159ea4cfbd88d43c00b22f2ef10a435d2"
+
 [[package]]
 name = "rend"
 version = "0.4.2"
@@ -8522,9 +8532,9 @@ dependencies = [
 
 [[package]]
 name = "rstest"
-version = "0.17.0"
+version = "0.21.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "de1bb486a691878cd320c2f0d319ba91eeaa2e894066d8b5f8f117c000e9d962"
+checksum = "9afd55a67069d6e434a95161415f5beeada95a01c7b815508a82dcb0e1593682"
 dependencies = [
  "futures",
  "futures-timer",
@@ -8534,28 +8544,31 @@ dependencies = [
 
 [[package]]
 name = "rstest_macros"
-version = "0.17.0"
+version = "0.21.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "290ca1a1c8ca7edb7c3283bd44dc35dd54fdec6253a3912e201ba1072018fca8"
+checksum = "4165dfae59a39dd41d8dec720d3cbfbc71f69744efb480a3920f5d4e0cc6798d"
 dependencies = [
  "cfg-if",
+ "glob",
+ "proc-macro-crate 3.1.0",
  "proc-macro2",
  "quote",
+ "regex",
+ "relative-path",
  "rustc_version",
- "syn 1.0.109",
+ "syn 2.0.66",
  "unicode-ident",
 ]
 
 [[package]]
 name = "rstest_reuse"
-version = "0.5.0"
+version = "0.7.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "45f80dcc84beab3a327bbe161f77db25f336a1452428176787c8c79ac79d7073"
+checksum = "b3a8fb4672e840a587a66fc577a5491375df51ddb88f2a2c2a792598c326fe14"
 dependencies = [
  "quote",
  "rand",
- "rustc_version",
- "syn 1.0.109",
+ "syn 2.0.66",
 ]
 
 [[package]]
diff --git a/Cargo.toml b/Cargo.toml
index 9623dfb90116..bdfa94e8a639 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -153,6 +153,8 @@ reqwest = { version = "0.12", default-features = false, features = [
     "multipart",
 ] }
 rskafka = "0.5"
+rstest = "0.21"
+rstest_reuse = "0.7"
 rust_decimal = "1.33"
 schemars = "0.8"
 serde = { version = "1.0", features = ["derive"] }
diff --git a/src/log-store/src/test_util/log_store_util.rs b/src/log-store/src/test_util/log_store_util.rs
index c25b411665b3..e714088d89ec 100644
--- a/src/log-store/src/test_util/log_store_util.rs
+++ b/src/log-store/src/test_util/log_store_util.rs
@@ -13,10 +13,13 @@
 // limitations under the License.
 
 use std::path::Path;
+use std::time::Duration;
 
 use common_base::readable_size::ReadableSize;
+use common_wal::config::kafka::DatanodeKafkaConfig;
 use common_wal::config::raft_engine::RaftEngineConfig;
 
+use crate::kafka::log_store::KafkaLogStore;
 use crate::raft_engine::log_store::RaftEngineLogStore;
 
 /// Create a write log for the provided path, used for test.
@@ -28,3 +31,14 @@ pub async fn create_tmp_local_file_log_store<P: AsRef<Path>>(path: P) -> RaftEngineLogStore {
     };
     RaftEngineLogStore::try_new(path, cfg).await.unwrap()
 }
+
+/// Create a [KafkaLogStore].
+pub async fn create_kafka_log_store(broker_endpoints: Vec<String>) -> KafkaLogStore {
+    KafkaLogStore::try_new(&DatanodeKafkaConfig {
+        broker_endpoints,
+        linger: Duration::from_millis(1),
+        ..Default::default()
+    })
+    .await
+    .unwrap()
+}
diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml
index f9fdb5b574a4..f3a747dcd02f 100644
--- a/src/mito2/Cargo.toml
+++ b/src/mito2/Cargo.toml
@@ -6,7 +6,7 @@ license.workspace = true
 
 [features]
 default = []
-test = ["common-test-util", "log-store"]
+test = ["common-test-util", "log-store", "rstest", "rstest_reuse", "rskafka"]
 
 [lints]
 workspace = true
@@ -37,6 +37,7 @@ datafusion.workspace = true
 datafusion-common.workspace = true
 datafusion-expr.workspace = true
 datatypes.workspace = true
+dotenv.workspace = true
 futures.workspace = true
 humantime-serde.workspace = true
 index.workspace = true
@@ -54,6 +55,9 @@ prost.workspace = true
 puffin.workspace = true
 rand.workspace = true
 regex = "1.5"
+rskafka = { workspace = true, optional = true }
+rstest = { workspace = true, optional = true }
+rstest_reuse = { workspace = true, optional = true }
 serde.workspace = true
 serde_json.workspace = true
 serde_with.workspace = true
@@ -73,6 +77,9 @@ common-test-util.workspace = true
 criterion = "0.4"
 log-store.workspace = true
 object-store = { workspace = true, features = ["services-memory"] }
+rskafka.workspace = true
+rstest.workspace = true
+rstest_reuse.workspace = true
 toml.workspace = true
 
 [[bench]]
diff --git a/src/mito2/src/engine/basic_test.rs b/src/mito2/src/engine/basic_test.rs
index 9a5cca209b7a..439b3a2fe0d3 100644
--- a/src/mito2/src/engine/basic_test.rs
+++ b/src/mito2/src/engine/basic_test.rs
@@ -22,8 +22,11 @@ use common_base::readable_size::ReadableSize;
 use common_error::ext::ErrorExt;
 use common_error::status_code::StatusCode;
 use common_recordbatch::RecordBatches;
+use common_wal::options::WAL_OPTIONS_KEY;
 use datatypes::prelude::ConcreteDataType;
 use datatypes::schema::ColumnSchema;
+use rstest::rstest;
+use rstest_reuse::{self, apply};
 use store_api::metadata::ColumnMetadata;
 use store_api::region_request::{RegionCreateRequest, RegionOpenRequest, RegionPutRequest};
 use store_api::storage::RegionId;
@@ -32,7 +35,9 @@ use super::*;
 use crate::region::version::VersionControlData;
 use crate::test_util::{
     build_delete_rows_for_key, build_rows, build_rows_for_key, delete_rows, delete_rows_schema,
-    flush_region, put_rows, reopen_region, rows_schema, CreateRequestBuilder, TestEnv,
+    flush_region, kafka_log_store_factory, multiple_log_store_factories,
+    prepare_test_for_kafka_log_store, put_rows, raft_engine_log_store_factory, reopen_region,
+    rows_schema, CreateRequestBuilder, LogStoreFactory, TestEnv,
 };
 
 #[tokio::test]
@@ -83,14 +88,24 @@ async fn test_write_to_region() {
     put_rows(&engine, region_id, rows).await;
 }
 
-#[tokio::test]
-async fn test_region_replay() {
+#[apply(multiple_log_store_factories)]
+
+async fn test_region_replay(factory: Option<LogStoreFactory>) {
+    use common_wal::options::{KafkaWalOptions, WalOptions};
+
     common_telemetry::init_default_ut_logging();
-    let mut env = TestEnv::with_prefix("region-replay");
+    let Some(factory) = factory else {
+        return;
+    };
+    let mut env = TestEnv::with_prefix("region-replay").with_log_store_factory(factory.clone());
     let engine = env.create_engine(MitoConfig::default()).await;
 
     let region_id = RegionId::new(1, 1);
-    let request = CreateRequestBuilder::new().build();
+    let topic = prepare_test_for_kafka_log_store(&factory).await;
+    let request = CreateRequestBuilder::new()
+        .kafka_topic(topic.clone())
+        .build();
+
     let region_dir = request.region_dir.clone();
 
     let column_schemas = rows_schema(&request);
@@ -113,13 +128,24 @@
 
     let engine = env.reopen_engine(engine, MitoConfig::default()).await;
 
+    let mut options = HashMap::new();
+    if let Some(topic) = &topic {
+        options.insert(
+            WAL_OPTIONS_KEY.to_string(),
+            serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
+                topic: topic.to_string(),
+            }))
+            .unwrap(),
+        );
+    };
+
     let result = engine
         .handle_request(
             region_id,
             RegionRequest::Open(RegionOpenRequest {
                 engine: String::new(),
                 region_dir,
-                options: HashMap::default(),
+                options,
                 skip_wal_replay: false,
             }),
         )
diff --git a/src/mito2/src/engine/flush_test.rs b/src/mito2/src/engine/flush_test.rs
index 89d44dc76129..52fb46dfab6a 100644
--- a/src/mito2/src/engine/flush_test.rs
+++ b/src/mito2/src/engine/flush_test.rs
@@ -21,6 +21,9 @@ use std::time::Duration;
 use api::v1::Rows;
 use common_recordbatch::RecordBatches;
 use common_time::util::current_time_millis;
+use common_wal::options::WAL_OPTIONS_KEY;
+use rstest::rstest;
+use rstest_reuse::{self, apply};
 use store_api::region_engine::RegionEngine;
 use store_api::region_request::RegionRequest;
 use store_api::storage::{RegionId, ScanRequest};
@@ -28,8 +31,10 @@ use store_api::storage::{RegionId, ScanRequest};
 use crate::config::MitoConfig;
 use crate::engine::listener::{FlushListener, StallListener};
 use crate::test_util::{
-    build_rows, build_rows_for_key, flush_region, put_rows, reopen_region, rows_schema,
-    CreateRequestBuilder, MockWriteBufferManager, TestEnv,
+    build_rows, build_rows_for_key, flush_region, kafka_log_store_factory,
+    multiple_log_store_factories, prepare_test_for_kafka_log_store, put_rows,
+    raft_engine_log_store_factory, reopen_region, rows_schema, CreateRequestBuilder,
+    LogStoreFactory, MockWriteBufferManager, TestEnv,
 };
 use crate::time_provider::TimeProvider;
 use crate::worker::MAX_INITIAL_CHECK_DELAY_SECS;
@@ -231,13 +236,25 @@ async fn test_flush_empty() {
     assert_eq!(expected, batches.pretty_print().unwrap());
 }
 
-#[tokio::test]
-async fn test_flush_reopen_region() {
-    let mut env = TestEnv::new();
+#[apply(multiple_log_store_factories)]
+async fn test_flush_reopen_region(factory: Option<LogStoreFactory>) {
+    use std::collections::HashMap;
+
+    use common_wal::options::{KafkaWalOptions, WalOptions};
+
+    common_telemetry::init_default_ut_logging();
+    let Some(factory) = factory else {
+        return;
+    };
+
+    let mut env = TestEnv::new().with_log_store_factory(factory.clone());
     let engine = env.create_engine(MitoConfig::default()).await;
 
     let region_id = RegionId::new(1, 1);
-    let request = CreateRequestBuilder::new().build();
+    let topic = prepare_test_for_kafka_log_store(&factory).await;
+    let request = CreateRequestBuilder::new()
+        .kafka_topic(topic.clone())
+        .build();
     let region_dir = request.region_dir.clone();
 
     let column_schemas = rows_schema(&request);
@@ -263,7 +280,17 @@ async fn test_flush_reopen_region() {
     };
     check_region();
 
-    reopen_region(&engine, region_id, region_dir, true, Default::default()).await;
+    let mut options = HashMap::new();
+    if let Some(topic) = &topic {
+        options.insert(
+            WAL_OPTIONS_KEY.to_string(),
+            serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
+                topic: topic.to_string(),
+            }))
+            .unwrap(),
+        );
+    };
+    reopen_region(&engine, region_id, region_dir, true, options).await;
     check_region();
 
     // Puts again.
diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs
index d7c671962c03..3ceff8a297b2 100644
--- a/src/mito2/src/test_util.rs
+++ b/src/mito2/src/test_util.rs
@@ -33,16 +33,23 @@ use api::v1::value::ValueData;
 use api::v1::{OpType, Row, Rows, SemanticType};
 use common_base::readable_size::ReadableSize;
 use common_datasource::compression::CompressionType;
+use common_telemetry::warn;
 use common_test_util::temp_dir::{create_temp_dir, TempDir};
+use common_wal::options::{KafkaWalOptions, WalOptions, WAL_OPTIONS_KEY};
 use datatypes::arrow::array::{TimestampMillisecondArray, UInt64Array, UInt8Array};
 use datatypes::prelude::ConcreteDataType;
 use datatypes::schema::ColumnSchema;
+use log_store::kafka::log_store::KafkaLogStore;
 use log_store::raft_engine::log_store::RaftEngineLogStore;
 use log_store::test_util::log_store_util;
 use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
 use object_store::services::Fs;
 use object_store::util::join_dir;
 use object_store::ObjectStore;
+use rskafka::client::partition::{Compression, UnknownTopicHandling};
+use rskafka::client::{Client, ClientBuilder};
+use rskafka::record::Record;
+use rstest_reuse::template;
 use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
 use store_api::region_engine::RegionEngine;
 use store_api::region_request::{
@@ -75,11 +82,110 @@ pub(crate) fn new_noop_file_purger() -> FilePurgerRef {
     Arc::new(NoopFilePurger {})
 }
 
+pub(crate) fn raft_engine_log_store_factory() -> Option<LogStoreFactory> {
+    Some(LogStoreFactory::RaftEngine(RaftEngineLogStoreFactory))
+}
+
+pub(crate) fn kafka_log_store_factory() -> Option<LogStoreFactory> {
+    let _ = dotenv::dotenv();
+    let Ok(broker_endpoints) = std::env::var("GT_KAFKA_ENDPOINTS") else {
+        warn!("env GT_KAFKA_ENDPOINTS not found");
+        return None;
+    };
+
+    let broker_endpoints = broker_endpoints
+        .split(',')
+        .map(|s| s.trim().to_string())
+        .collect::<Vec<_>>();
+
+    Some(LogStoreFactory::Kafka(KafkaLogStoreFactory {
+        broker_endpoints,
+    }))
+}
+
+#[template]
+#[rstest]
+#[case::with_raft_engine(raft_engine_log_store_factory())]
+#[case::with_kafka(kafka_log_store_factory())]
+#[tokio::test]
+pub(crate) fn multiple_log_store_factories(#[case] factory: Option<LogStoreFactory>) {}
+
+#[derive(Clone)]
+pub(crate) struct RaftEngineLogStoreFactory;
+
+impl RaftEngineLogStoreFactory {
+    async fn create_log_store<P: AsRef<Path>>(&self, wal_path: P) -> RaftEngineLogStore {
+        log_store_util::create_tmp_local_file_log_store(wal_path).await
+    }
+}
+
+pub(crate) async fn prepare_test_for_kafka_log_store(factory: &LogStoreFactory) -> Option<String> {
+    if let LogStoreFactory::Kafka(factory) = factory {
+        let topic = uuid::Uuid::new_v4().to_string();
+        let client = factory.client().await;
+        append_noop_record(&client, &topic).await;
+
+        Some(topic)
+    } else {
+        None
+    }
+}
+
+pub(crate) async fn append_noop_record(client: &Client, topic: &str) {
+    let partition_client = client
+        .partition_client(topic, 0, UnknownTopicHandling::Retry)
+        .await
+        .unwrap();
+
+    partition_client
+        .produce(
+            vec![Record {
+                key: None,
+                value: None,
+                timestamp: rskafka::chrono::Utc::now(),
+                headers: Default::default(),
+            }],
+            Compression::NoCompression,
+        )
+        .await
+        .unwrap();
+}
+
+#[derive(Clone)]
+pub(crate) struct KafkaLogStoreFactory {
+    broker_endpoints: Vec<String>,
+}
+
+impl KafkaLogStoreFactory {
+    async fn create_log_store(&self) -> KafkaLogStore {
+        log_store_util::create_kafka_log_store(self.broker_endpoints.clone()).await
+    }
+
+    pub(crate) async fn client(&self) -> Client {
+        ClientBuilder::new(self.broker_endpoints.clone())
+            .build()
+            .await
+            .unwrap()
+    }
+}
+
+#[derive(Clone)]
+pub(crate) enum LogStoreFactory {
+    RaftEngine(RaftEngineLogStoreFactory),
+    Kafka(KafkaLogStoreFactory),
+}
+
+#[derive(Clone)]
+pub(crate) enum LogStoreImpl {
+    RaftEngine(Arc<RaftEngineLogStore>),
+    Kafka(Arc<KafkaLogStore>),
+}
+
 /// Env to test mito engine.
 pub struct TestEnv {
     /// Path to store data.
     data_home: TempDir,
-    logstore: Option<Arc<RaftEngineLogStore>>,
+    log_store: Option<LogStoreImpl>,
+    log_store_factory: LogStoreFactory,
     object_store_manager: Option<ObjectStoreManagerRef>,
 }
 
@@ -94,7 +200,8 @@ impl TestEnv {
     pub fn new() -> TestEnv {
         TestEnv {
             data_home: create_temp_dir(""),
-            logstore: None,
+            log_store: None,
+            log_store_factory: LogStoreFactory::RaftEngine(RaftEngineLogStoreFactory),
             object_store_manager: None,
         }
     }
@@ -103,7 +210,8 @@ impl TestEnv {
     pub fn with_prefix(prefix: &str) -> TestEnv {
         TestEnv {
             data_home: create_temp_dir(prefix),
-            logstore: None,
+            log_store: None,
+            log_store_factory: LogStoreFactory::RaftEngine(RaftEngineLogStoreFactory),
             object_store_manager: None,
         }
     }
@@ -112,13 +220,16 @@ impl TestEnv {
     pub fn with_data_home(data_home: TempDir) -> TestEnv {
         TestEnv {
             data_home,
-            logstore: None,
+            log_store: None,
+            log_store_factory: LogStoreFactory::RaftEngine(RaftEngineLogStoreFactory),
             object_store_manager: None,
         }
     }
 
-    pub fn get_logstore(&self) -> Option<Arc<RaftEngineLogStore>> {
-        self.logstore.clone()
+    /// Overwrites the original `log_store_factory`.
+    pub(crate) fn with_log_store_factory(mut self, log_store_factory: LogStoreFactory) -> TestEnv {
+        self.log_store_factory = log_store_factory;
+        self
     }
 
     pub fn get_object_store(&self) -> Option<ObjectStore> {
@@ -139,24 +250,41 @@ impl TestEnv {
     pub async fn create_engine(&mut self, config: MitoConfig) -> MitoEngine {
         let (log_store, object_store_manager) = self.create_log_and_object_store_manager().await;
 
-        let logstore = Arc::new(log_store);
         let object_store_manager = Arc::new(object_store_manager);
-        self.logstore = Some(logstore.clone());
+        self.log_store = Some(log_store.clone());
         self.object_store_manager = Some(object_store_manager.clone());
 
         let data_home = self.data_home().display().to_string();
-        MitoEngine::new(&data_home, config, logstore, object_store_manager)
-            .await
-            .unwrap()
+
+        match log_store {
+            LogStoreImpl::RaftEngine(log_store) => {
+                MitoEngine::new(&data_home, config, log_store, object_store_manager)
+                    .await
+                    .unwrap()
+            }
+            LogStoreImpl::Kafka(log_store) => {
+                MitoEngine::new(&data_home, config, log_store, object_store_manager)
+                    .await
+                    .unwrap()
+            }
+        }
     }
 
     /// Creates a new engine with specific config and existing logstore and object store manager.
     pub async fn create_follower_engine(&mut self, config: MitoConfig) -> MitoEngine {
-        let logstore = self.logstore.as_ref().unwrap().clone();
         let object_store_manager = self.object_store_manager.as_ref().unwrap().clone();
         let data_home = self.data_home().display().to_string();
-        MitoEngine::new(&data_home, config, logstore, object_store_manager)
-            .await
-            .unwrap()
+        match self.log_store.as_ref().unwrap().clone() {
+            LogStoreImpl::RaftEngine(log_store) => {
+                MitoEngine::new(&data_home, config, log_store, object_store_manager)
+                    .await
+                    .unwrap()
+            }
+            LogStoreImpl::Kafka(log_store) => {
+                MitoEngine::new(&data_home, config, log_store, object_store_manager)
+                    .await
+                    .unwrap()
+            }
+        }
     }
 
     /// Creates a new engine with specific config and manager/listener/purge_scheduler under this env.
@@ -168,24 +296,36 @@ impl TestEnv {
     ) -> MitoEngine {
         let (log_store, object_store_manager) = self.create_log_and_object_store_manager().await;
 
-        let logstore = Arc::new(log_store);
         let object_store_manager = Arc::new(object_store_manager);
-        self.logstore = Some(logstore.clone());
+        self.log_store = Some(log_store.clone());
         self.object_store_manager = Some(object_store_manager.clone());
 
         let data_home = self.data_home().display().to_string();
-        MitoEngine::new_for_test(
-            &data_home,
-            config,
-            logstore,
-            object_store_manager,
-            manager,
-            listener,
-            Arc::new(StdTimeProvider),
-        )
-        .await
-        .unwrap()
+        match log_store {
+            LogStoreImpl::RaftEngine(log_store) => MitoEngine::new_for_test(
+                &data_home,
+                config,
+                log_store,
+                object_store_manager,
+                manager,
+                listener,
+                Arc::new(StdTimeProvider),
+            )
+            .await
+            .unwrap(),
+            LogStoreImpl::Kafka(log_store) => MitoEngine::new_for_test(
+                &data_home,
+                config,
+                log_store,
+                object_store_manager,
+                manager,
+                listener,
+                Arc::new(StdTimeProvider),
+            )
+            .await
+            .unwrap(),
+        }
     }
 
     pub async fn create_engine_with_multiple_object_stores(
@@ -195,7 +335,8 @@
         listener: Option<EventListenerRef>,
         custom_storage_names: &[&str],
     ) -> MitoEngine {
-        let (logstore, mut object_store_manager) = self.create_log_and_object_store_manager().await;
+        let (log_store, mut object_store_manager) =
+            self.create_log_and_object_store_manager().await;
         for storage_name in custom_storage_names {
             let data_path = self
                 .data_home
@@ -210,23 +351,35 @@
             let object_store = ObjectStore::new(builder).unwrap().finish();
             object_store_manager.add(storage_name, object_store);
         }
-        let logstore = Arc::new(logstore);
         let object_store_manager = Arc::new(object_store_manager);
-        self.logstore = Some(logstore.clone());
+        self.log_store = Some(log_store.clone());
         self.object_store_manager = Some(object_store_manager.clone());
 
         let data_home = self.data_home().display().to_string();
-        MitoEngine::new_for_test(
-            &data_home,
-            config,
-            logstore,
-            object_store_manager,
-            manager,
-            listener,
-            Arc::new(StdTimeProvider),
-        )
-        .await
-        .unwrap()
+        match log_store {
+            LogStoreImpl::RaftEngine(log_store) => MitoEngine::new_for_test(
+                &data_home,
+                config,
+                log_store,
+                object_store_manager,
+                manager,
+                listener,
+                Arc::new(StdTimeProvider),
+            )
+            .await
+            .unwrap(),
+            LogStoreImpl::Kafka(log_store) => MitoEngine::new_for_test(
+                &data_home,
+                config,
+                log_store,
+                object_store_manager,
+                manager,
+                listener,
+                Arc::new(StdTimeProvider),
+            )
+            .await
+            .unwrap(),
+        }
     }
 
     /// Creates a new engine with specific config and manager/listener/time provider under this env.
@@ -239,50 +392,82 @@ impl TestEnv { ) -> MitoEngine { let (log_store, object_store_manager) = self.create_log_and_object_store_manager().await; - let logstore = Arc::new(log_store); let object_store_manager = Arc::new(object_store_manager); - self.logstore = Some(logstore.clone()); + self.log_store = Some(log_store.clone()); self.object_store_manager = Some(object_store_manager.clone()); let data_home = self.data_home().display().to_string(); - MitoEngine::new_for_test( - &data_home, - config, - logstore, - object_store_manager, - manager, - listener, - time_provider.clone(), - ) - .await - .unwrap() + match log_store { + LogStoreImpl::RaftEngine(log_store) => MitoEngine::new_for_test( + &data_home, + config, + log_store, + object_store_manager, + manager, + listener, + time_provider.clone(), + ) + .await + .unwrap(), + LogStoreImpl::Kafka(log_store) => MitoEngine::new_for_test( + &data_home, + config, + log_store, + object_store_manager, + manager, + listener, + time_provider.clone(), + ) + .await + .unwrap(), + } } /// Reopen the engine. pub async fn reopen_engine(&mut self, engine: MitoEngine, config: MitoConfig) -> MitoEngine { engine.stop().await.unwrap(); - MitoEngine::new( - &self.data_home().display().to_string(), - config, - self.logstore.clone().unwrap(), - self.object_store_manager.clone().unwrap(), - ) - .await - .unwrap() + match self.log_store.as_ref().unwrap().clone() { + LogStoreImpl::RaftEngine(log_store) => MitoEngine::new( + &self.data_home().display().to_string(), + config, + log_store, + self.object_store_manager.clone().unwrap(), + ) + .await + .unwrap(), + LogStoreImpl::Kafka(log_store) => MitoEngine::new( + &self.data_home().display().to_string(), + config, + log_store, + self.object_store_manager.clone().unwrap(), + ) + .await + .unwrap(), + } } /// Open the engine. pub async fn open_engine(&mut self, config: MitoConfig) -> MitoEngine { - MitoEngine::new( - &self.data_home().display().to_string(), - config, - self.logstore.clone().unwrap(), - self.object_store_manager.clone().unwrap(), - ) - .await - .unwrap() + match self.log_store.as_ref().unwrap().clone() { + LogStoreImpl::RaftEngine(log_store) => MitoEngine::new( + &self.data_home().display().to_string(), + config, + log_store, + self.object_store_manager.clone().unwrap(), + ) + .await + .unwrap(), + LogStoreImpl::Kafka(log_store) => MitoEngine::new( + &self.data_home().display().to_string(), + config, + log_store, + self.object_store_manager.clone().unwrap(), + ) + .await + .unwrap(), + } } /// Only initializes the object store manager, returns the default object store. @@ -297,25 +482,44 @@ impl TestEnv { let data_home = self.data_home().display().to_string(); config.sanitize(&data_home).unwrap(); - WorkerGroup::start( - Arc::new(config), - Arc::new(log_store), - Arc::new(object_store_manager), - ) - .await - .unwrap() + + match log_store { + LogStoreImpl::RaftEngine(log_store) => { + WorkerGroup::start(Arc::new(config), log_store, Arc::new(object_store_manager)) + .await + .unwrap() + } + LogStoreImpl::Kafka(log_store) => { + WorkerGroup::start(Arc::new(config), log_store, Arc::new(object_store_manager)) + .await + .unwrap() + } + } } /// Returns the log store and object store manager. 
-    async fn create_log_and_object_store_manager(
-        &self,
-    ) -> (RaftEngineLogStore, ObjectStoreManager) {
+    async fn create_log_and_object_store_manager(&self) -> (LogStoreImpl, ObjectStoreManager) {
         let data_home = self.data_home.path();
         let wal_path = data_home.join("wal");
-        let log_store = log_store_util::create_tmp_local_file_log_store(&wal_path).await;
         let object_store_manager = self.create_object_store_manager();
-        (log_store, object_store_manager)
+
+        match &self.log_store_factory {
+            LogStoreFactory::RaftEngine(factory) => {
+                let log_store = factory.create_log_store(wal_path).await;
+                (
+                    LogStoreImpl::RaftEngine(Arc::new(log_store)),
+                    object_store_manager,
+                )
+            }
+            LogStoreFactory::Kafka(factory) => {
+                let log_store = factory.create_log_store().await;
+
+                (
+                    LogStoreImpl::Kafka(Arc::new(log_store)),
+                    object_store_manager,
+                )
+            }
+        }
     }
 
     fn create_object_store_manager(&self) -> ObjectStoreManager {
@@ -399,6 +603,8 @@ pub struct CreateRequestBuilder {
     all_not_null: bool,
     engine: String,
     ts_type: ConcreteDataType,
+    /// kafka topic name
+    kafka_topic: Option<String>,
 }
 
 impl Default for CreateRequestBuilder {
@@ -412,6 +618,7 @@ impl Default for CreateRequestBuilder {
             all_not_null: false,
             engine: MITO_ENGINE_NAME.to_string(),
             ts_type: ConcreteDataType::timestamp_millisecond_datatype(),
+            kafka_topic: None,
         }
     }
 }
@@ -464,6 +671,12 @@ impl CreateRequestBuilder {
         self
     }
 
+    #[must_use]
+    pub fn kafka_topic(mut self, topic: Option<String>) -> Self {
+        self.kafka_topic = topic;
+        self
+    }
+
     pub fn build(&self) -> RegionCreateRequest {
         let mut column_id = 0;
         let mut column_metadatas = Vec::with_capacity(self.tag_num + self.field_num + 1);
@@ -504,12 +717,21 @@ impl CreateRequestBuilder {
             semantic_type: SemanticType::Timestamp,
             column_id,
         });
-
+        let mut options = self.options.clone();
+        if let Some(topic) = &self.kafka_topic {
+            let wal_options = WalOptions::Kafka(KafkaWalOptions {
+                topic: topic.to_string(),
+            });
+            options.insert(
+                WAL_OPTIONS_KEY.to_string(),
+                serde_json::to_string(&wal_options).unwrap(),
+            );
+        }
         RegionCreateRequest {
             engine: self.engine.to_string(),
             column_metadatas,
             primary_key: self.primary_key.clone().unwrap_or(primary_key),
-            options: self.options.clone(),
+            options,
             region_dir: self.region_dir.clone(),
         }
     }
diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml
index 2473cfc320ad..887f04a3b218 100644
--- a/tests-integration/Cargo.toml
+++ b/tests-integration/Cargo.toml
@@ -53,8 +53,8 @@ object-store.workspace = true
 operator.workspace = true
 prost.workspace = true
 query.workspace = true
-rstest = "0.17"
-rstest_reuse = "0.5"
+rstest.workspace = true
+rstest_reuse.workspace = true
 serde_json.workspace = true
 servers = { workspace = true, features = ["testing"] }
 session.workspace = true
diff --git a/tests-integration/src/lib.rs b/tests-integration/src/lib.rs
index d3e700151345..5def9351d0c9 100644
--- a/tests-integration/src/lib.rs
+++ b/tests-integration/src/lib.rs
@@ -26,8 +26,3 @@ pub mod test_util;
 pub mod standalone;
 #[cfg(test)]
 mod tests;
-
-#[cfg(test)]
-// allowed because https://docs.rs/rstest_reuse/0.5.0/rstest_reuse/#use-rstest_reuse-at-the-top-of-your-crate
-#[allow(clippy::single_component_path_imports)]
-use rstest_reuse;
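
Usage sketch (illustrative only; the test name is hypothetical, all other names come from the helpers added in this patch): an engine test opts into both WAL backends through the `multiple_log_store_factories` template. `kafka_log_store_factory` returns `None` unless `GT_KAFKA_ENDPOINTS` is set (optionally via a `.env` file loaded by `dotenv`), so the Kafka case is skipped on machines without a broker:

    #[apply(multiple_log_store_factories)]
    async fn test_something(factory: Option<LogStoreFactory>) {
        // The Kafka case yields None when no broker endpoints are configured.
        let Some(factory) = factory else {
            return;
        };
        let mut env = TestEnv::new().with_log_store_factory(factory.clone());
        // For the Kafka factory this creates a UUID-named topic, seeds it with
        // a noop record, and returns it; for raft-engine it returns None.
        let topic = prepare_test_for_kafka_log_store(&factory).await;
        // kafka_topic(Some(..)) routes the region's WAL to that topic.
        let request = CreateRequestBuilder::new().kafka_topic(topic).build();
        // ... create the engine, write, flush, reopen, and assert as usual ...
    }

The Kafka cases can then be exercised locally with, e.g., `GT_KAFKA_ENDPOINTS=127.0.0.1:9092 cargo test -p mito2`.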