diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index 342e10bfe19f..b17bf313d38a 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -50,10 +50,10 @@ read_batch_size = 128
 sync_write = false
 
 # Kafka wal options, see `standalone.example.toml`.
-# broker_endpoints = ["127.0.0.1:9090"]
+# broker_endpoints = ["127.0.0.1:9092"]
 # max_batch_size = "4MB"
 # linger = "200ms"
-# max_wait_time = "100ms"
+# produce_record_timeout = "100ms"
 # backoff_init = "500ms"
 # backoff_max = "10s"
 # backoff_base = 2.0
diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml
index 120f19255f3a..313ed4b10ac2 100644
--- a/config/metasrv.example.toml
+++ b/config/metasrv.example.toml
@@ -52,8 +52,8 @@ provider = "raft_engine"
 # There're none raft-engine wal config since meta srv only involves in remote wal currently.
 
 # Kafka wal config.
-# The broker endpoints of the Kafka cluster. ["127.0.0.1:9090"] by default.
-# broker_endpoints = ["127.0.0.1:9090"]
+# The broker endpoints of the Kafka cluster. ["127.0.0.1:9092"] by default.
+# broker_endpoints = ["127.0.0.1:9092"]
 # Number of topics to be created upon start.
 # num_topics = 64
 # Topic selector type.
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index 713f8ef79edb..a7a7ea0aedf6 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -80,22 +80,40 @@ enable = true
 
 # Whether to enable Prometheus remote write and read in HTTP API, true by default.
 enable = true
-# WAL options.
 [wal]
 # Available wal providers:
-# - "RaftEngine" (default)
-# - "Kafka"
+# - "raft_engine" (default)
+# - "kafka"
 provider = "raft_engine"
+# There is no raft-engine wal config here since metasrv is only involved in remote wal currently.
+
 # Kafka wal options.
-# The broker endpoints of the Kafka cluster. ["127.0.0.1:9090"] by default.
-# broker_endpoints = ["127.0.0.1:9090"]
+# The broker endpoints of the Kafka cluster. ["127.0.0.1:9092"] by default.
+# broker_endpoints = ["127.0.0.1:9092"]
+
+# Number of topics to be created upon start.
+# num_topics = 64
+# Topic selector type.
+# Available selector types:
+# - "round_robin" (default)
+# selector_type = "round_robin"
+# A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
+# topic_name_prefix = "greptimedb_wal_topic"
+# Number of partitions per topic.
+# num_partitions = 1
+# Expected number of replicas of each partition.
+# replication_factor = 3
+
 # The maximum log size a kafka batch producer could buffer.
 # max_batch_size = "4MB"
 # The linger duration of a kafka batch producer.
 # linger = "200ms"
 # The maximum amount of time (in milliseconds) to wait for Kafka records to be returned.
-# max_wait_time = "100ms"
+# produce_record_timeout = "100ms"
+# The timeout above which a topic creation operation will be cancelled.
+# create_topic_timeout = "30s"
+
 # The initial backoff for kafka clients.
 # backoff_init = "500ms"
 # The maximum backoff for kafka clients.
 # backoff_max = "10s"
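The `provider` key in the `[wal]` sections above is a serde tag: it selects which config variant gets deserialized, via the `#[serde(tag = "provider", rename_all = "snake_case")]` attribute introduced in the source changes below. A minimal, self-contained sketch of that parsing behavior (hypothetical type, not the crate's actual definition; assumes the `serde` and `toml` crates):

```rust
use serde::Deserialize;

// Internally tagged enum: the `provider` key picks the variant, and
// `rename_all = "snake_case"` maps "kafka" -> Kafka, "raft_engine" -> RaftEngine.
#[derive(Debug, Deserialize, PartialEq)]
#[serde(tag = "provider", rename_all = "snake_case")]
enum WalConfig {
    RaftEngine,
    Kafka { broker_endpoints: Vec<String> },
}

fn main() {
    let toml_str = r#"
        provider = "kafka"
        broker_endpoints = ["127.0.0.1:9092"]
    "#;
    let config: WalConfig = toml::from_str(toml_str).unwrap();
    assert_eq!(
        config,
        WalConfig::Kafka {
            broker_endpoints: vec!["127.0.0.1:9092".to_string()]
        }
    );
}
```

Because the `RaftEngine` variant carries no payload in this sketch, stray kafka keys under `provider = "raft_engine"` are simply ignored — the behavior the "raft-engine wal config with extra other wal config" test case in `src/common/meta/src/wal.rs` below relies on.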
diff --git a/src/cmd/src/options.rs b/src/cmd/src/options.rs
index 4c6b04752a33..9270d8d52a6b 100644
--- a/src/cmd/src/options.rs
+++ b/src/cmd/src/options.rs
@@ -14,6 +14,7 @@
 
 use clap::ArgMatches;
 use common_config::KvBackendConfig;
+use common_meta::wal::WalConfig as MetaSrvWalConfig;
 use common_telemetry::logging::{LoggingOptions, TracingOptions};
 use config::{Config, Environment, File, FileFormat};
 use datanode::config::{DatanodeOptions, ProcedureConfig};
@@ -37,6 +38,7 @@ pub struct MixOptions {
     pub frontend: FrontendOptions,
     pub datanode: DatanodeOptions,
     pub logging: LoggingOptions,
+    pub wal_meta: MetaSrvWalConfig,
 }
 
 impl From<MixOptions> for FrontendOptions {
diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs
index b283dc59fb2e..812b64d301e1 100644
--- a/src/cmd/src/standalone.rs
+++ b/src/cmd/src/standalone.rs
@@ -18,7 +18,8 @@ use std::{fs, path};
 use async_trait::async_trait;
 use clap::Parser;
 use common_catalog::consts::MIN_USER_TABLE_ID;
-use common_config::{metadata_store_dir, KvBackendConfig, WalConfig};
+use common_config::wal::StandaloneWalConfig;
+use common_config::{metadata_store_dir, KvBackendConfig};
 use common_meta::cache_invalidator::DummyCacheInvalidator;
 use common_meta::datanode_manager::DatanodeManagerRef;
 use common_meta::ddl::{DdlTaskExecutorRef, TableMetadataAllocatorRef};
@@ -104,7 +105,7 @@ pub struct StandaloneOptions {
     pub opentsdb: OpentsdbOptions,
     pub influxdb: InfluxdbOptions,
     pub prom_store: PromStoreOptions,
-    pub wal: WalConfig,
+    pub wal: StandaloneWalConfig,
     pub storage: StorageConfig,
     pub metadata_store: KvBackendConfig,
     pub procedure: ProcedureConfig,
@@ -127,7 +128,7 @@ impl Default for StandaloneOptions {
             opentsdb: OpentsdbOptions::default(),
             influxdb: InfluxdbOptions::default(),
             prom_store: PromStoreOptions::default(),
-            wal: WalConfig::default(),
+            wal: StandaloneWalConfig::default(),
             storage: StorageConfig::default(),
             metadata_store: KvBackendConfig::default(),
             procedure: ProcedureConfig::default(),
@@ -166,7 +167,7 @@ impl StandaloneOptions {
         DatanodeOptions {
             node_id: Some(0),
             enable_telemetry: self.enable_telemetry,
-            wal: self.wal,
+            wal: self.wal.into(),
             storage: self.storage,
             region_engine: self.region_engine,
             rpc_addr: self.grpc.addr,
@@ -338,7 +339,8 @@ impl StartCommand {
         let procedure = opts.procedure.clone();
         let frontend = opts.clone().frontend_options();
         let logging = opts.logging.clone();
-        let datanode = opts.datanode_options();
+        let wal_meta = opts.wal.clone().into();
+        let datanode = opts.datanode_options().clone();
 
         Ok(Options::Standalone(Box::new(MixOptions {
             procedure,
@@ -347,6 +349,7 @@ impl StartCommand {
             frontend,
             datanode,
             logging,
+            wal_meta,
         })))
     }
 
@@ -392,9 +395,8 @@ impl StartCommand {
                 .step(10)
                 .build(),
         );
-        // TODO(niebayes): add a wal config into the MixOptions and pass it to the allocator builder.
         let wal_options_allocator = Arc::new(WalOptionsAllocator::new(
-            common_meta::wal::WalConfig::default(),
+            opts.wal_meta.clone(),
             kv_backend.clone(),
         ));
         let table_meta_allocator = Arc::new(StandaloneTableMetadataAllocator::new(
@@ -479,6 +481,7 @@ mod tests {
 
     use auth::{Identity, Password, UserProviderRef};
     use common_base::readable_size::ReadableSize;
+    use common_config::WalConfig;
    use common_test_util::temp_dir::create_named_temp_file;
    use datanode::config::{FileConfig, GcsConfig};
    use servers::Mode;
@@ -529,6 +532,7 @@ mod tests {
            purge_interval = "10m"
            read_batch_size = 128
            sync_write = false
+
            [storage]
            data_home = "/tmp/greptimedb/"
            type = "File"
diff --git a/src/common/config/src/wal.rs b/src/common/config/src/wal.rs
index 60128d14b35e..f9c492758e63 100644
--- a/src/common/config/src/wal.rs
+++ b/src/common/config/src/wal.rs
@@ -18,7 +18,9 @@ pub mod raft_engine;
 use serde::{Deserialize, Serialize};
 use serde_with::with_prefix;
 
-pub use crate::wal::kafka::{KafkaConfig, KafkaOptions as KafkaWalOptions, Topic as KafkaWalTopic};
+pub use crate::wal::kafka::{
+    KafkaConfig, KafkaOptions as KafkaWalOptions, StandaloneKafkaConfig, Topic as KafkaWalTopic,
+};
 pub use crate::wal::raft_engine::RaftEngineConfig;
 
 /// An encoded wal options will be wrapped into a (WAL_OPTIONS_KEY, encoded wal options) key-value pair
@@ -27,30 +29,49 @@ pub const WAL_OPTIONS_KEY: &str = "wal_options";
 
 /// Wal config for datanode.
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
-#[serde(tag = "provider")]
+#[serde(tag = "provider", rename_all = "snake_case")]
 pub enum WalConfig {
-    #[serde(rename = "raft_engine")]
     RaftEngine(RaftEngineConfig),
-    #[serde(rename = "kafka")]
     Kafka(KafkaConfig),
 }
 
+impl From<StandaloneWalConfig> for WalConfig {
+    fn from(value: StandaloneWalConfig) -> Self {
+        match value {
+            StandaloneWalConfig::RaftEngine(config) => WalConfig::RaftEngine(config),
+            StandaloneWalConfig::Kafka(config) => WalConfig::Kafka(config.base),
+        }
+    }
+}
+
 impl Default for WalConfig {
     fn default() -> Self {
         WalConfig::RaftEngine(RaftEngineConfig::default())
     }
 }
 
+/// Wal config for standalone.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+#[serde(tag = "provider", rename_all = "snake_case")]
+pub enum StandaloneWalConfig {
+    RaftEngine(RaftEngineConfig),
+    Kafka(StandaloneKafkaConfig),
+}
+
+impl Default for StandaloneWalConfig {
+    fn default() -> Self {
+        StandaloneWalConfig::RaftEngine(RaftEngineConfig::default())
+    }
+}
+
 /// Wal options allocated to a region.
 /// A wal options is encoded by metasrv with `serde_json::to_string`, and then decoded
 /// by datanode with `serde_json::from_str`.
 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
-#[serde(tag = "wal.provider")]
+#[serde(tag = "wal.provider", rename_all = "snake_case")]
 pub enum WalOptions {
     #[default]
-    #[serde(rename = "raft_engine")]
     RaftEngine,
-    #[serde(rename = "kafka")]
     #[serde(with = "prefix_wal_kafka")]
     Kafka(KafkaWalOptions),
 }
@@ -64,15 +85,16 @@ mod tests {
     use common_base::readable_size::ReadableSize;
     use rskafka::client::partition::Compression as RsKafkaCompression;
 
+    use crate::wal::kafka::KafkaBackoffConfig;
     use crate::wal::{KafkaConfig, KafkaWalOptions, WalOptions};
 
     #[test]
     fn test_serde_kafka_config() {
         let toml_str = r#"
-            broker_endpoints = ["127.0.0.1:9090"]
+            broker_endpoints = ["127.0.0.1:9092"]
             max_batch_size = "4MB"
             linger = "200ms"
-            max_wait_time = "100ms"
+            produce_record_timeout = "100ms"
             backoff_init = "500ms"
             backoff_max = "10s"
             backoff_base = 2
@@ -80,15 +102,17 @@ mod tests {
         "#;
         let decoded: KafkaConfig = toml::from_str(toml_str).unwrap();
         let expected = KafkaConfig {
-            broker_endpoints: vec!["127.0.0.1:9090".to_string()],
+            broker_endpoints: vec!["127.0.0.1:9092".to_string()],
             compression: RsKafkaCompression::default(),
             max_batch_size: ReadableSize::mb(4),
             linger: Duration::from_millis(200),
-            max_wait_time: Duration::from_millis(100),
-            backoff_init: Duration::from_millis(500),
-            backoff_max: Duration::from_secs(10),
-            backoff_base: 2,
-            backoff_deadline: Some(Duration::from_secs(60 * 5)),
+            produce_record_timeout: Duration::from_millis(100),
+            backoff: KafkaBackoffConfig {
+                init: Duration::from_millis(500),
+                max: Duration::from_secs(10),
+                base: 2,
+                deadline: Some(Duration::from_secs(60 * 5)),
+            },
         };
         assert_eq!(decoded, expected);
     }
diff --git a/src/common/config/src/wal/kafka.rs b/src/common/config/src/wal/kafka.rs
index eb6795054141..d1d1a615a370 100644
--- a/src/common/config/src/wal/kafka.rs
+++ b/src/common/config/src/wal/kafka.rs
@@ -17,12 +17,21 @@ use std::time::Duration;
 use common_base::readable_size::ReadableSize;
 use rskafka::client::partition::Compression as RsKafkaCompression;
 use serde::{Deserialize, Serialize};
+use serde_with::with_prefix;
 
 /// Topic name prefix.
 pub const TOPIC_NAME_PREFIX: &str = "greptimedb_wal_topic";
 /// Kafka wal topic.
 pub type Topic = String;
 
+/// The type of the topic selector, i.e. with which strategy to select a topic.
+#[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+#[serde(rename_all = "snake_case")]
+pub enum TopicSelectorType {
+    #[default]
+    RoundRobin,
+}
+
 /// Configurations for kafka wal.
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 #[serde(default)]
@@ -40,34 +49,86 @@ pub struct KafkaConfig {
     pub linger: Duration,
     /// The maximum amount of time (in milliseconds) to wait for Kafka records to be returned.
     #[serde(with = "humantime_serde")]
-    pub max_wait_time: Duration,
+    pub produce_record_timeout: Duration,
+    /// The backoff config.
+    #[serde(flatten, with = "kafka_backoff")]
+    pub backoff: KafkaBackoffConfig,
+}
+
+impl Default for KafkaConfig {
+    fn default() -> Self {
+        Self {
+            broker_endpoints: vec!["127.0.0.1:9092".to_string()],
+            compression: RsKafkaCompression::NoCompression,
+            max_batch_size: ReadableSize::mb(4),
+            linger: Duration::from_millis(200),
+            produce_record_timeout: Duration::from_millis(100),
+            backoff: KafkaBackoffConfig::default(),
+        }
+    }
+}
+
+with_prefix!(pub kafka_backoff "backoff_");
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+#[serde(default)]
+pub struct KafkaBackoffConfig {
     /// The initial backoff for kafka clients.
     #[serde(with = "humantime_serde")]
-    pub backoff_init: Duration,
+    pub init: Duration,
     /// The maximum backoff for kafka clients.
     #[serde(with = "humantime_serde")]
-    pub backoff_max: Duration,
+    pub max: Duration,
     /// Exponential backoff rate, i.e. next backoff = base * current backoff.
     // Sets to u32 type since some structs containing the KafkaConfig need to derive the Eq trait.
-    pub backoff_base: u32,
+    pub base: u32,
     /// Stop reconnecting if the total wait time reaches the deadline.
     /// If it's None, the reconnecting won't terminate.
     #[serde(with = "humantime_serde")]
-    pub backoff_deadline: Option<Duration>,
+    pub deadline: Option<Duration>,
 }
 
-impl Default for KafkaConfig {
+impl Default for KafkaBackoffConfig {
     fn default() -> Self {
         Self {
-            broker_endpoints: vec!["127.0.0.1:9090".to_string()],
-            compression: RsKafkaCompression::NoCompression,
-            max_batch_size: ReadableSize::mb(4),
-            linger: Duration::from_millis(200),
-            max_wait_time: Duration::from_millis(100),
-            backoff_init: Duration::from_millis(500),
-            backoff_max: Duration::from_secs(10),
-            backoff_base: 2,
-            backoff_deadline: Some(Duration::from_secs(60 * 5)), // 5 mins
+            init: Duration::from_millis(500),
+            max: Duration::from_secs(10),
+            base: 2,
+            deadline: Some(Duration::from_secs(60 * 5)), // 5 mins
         }
     }
 }
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+#[serde(default)]
+pub struct StandaloneKafkaConfig {
+    #[serde(flatten)]
+    pub base: KafkaConfig,
+    /// Number of topics to be created upon start.
+    pub num_topics: usize,
+    /// The type of the topic selector with which to select a topic for a region.
+    pub selector_type: TopicSelectorType,
+    /// Topic name prefix.
+    pub topic_name_prefix: String,
+    /// Number of partitions per topic.
+    pub num_partitions: i32,
+    /// The replication factor of each topic.
+    pub replication_factor: i16,
+    /// The timeout above which a topic creation operation will be cancelled.
+    #[serde(with = "humantime_serde")]
+    pub create_topic_timeout: Duration,
+}
+
+impl Default for StandaloneKafkaConfig {
+    fn default() -> Self {
+        Self {
+            base: KafkaConfig::default(),
+            num_topics: 64,
+            selector_type: TopicSelectorType::RoundRobin,
+            topic_name_prefix: "greptimedb_wal_topic".to_string(),
+            num_partitions: 1,
+            replication_factor: 3,
+            create_topic_timeout: Duration::from_secs(30),
+        }
+    }
+}
diff --git a/src/common/meta/src/wal.rs b/src/common/meta/src/wal.rs
index f34a5224a87e..1e394e847985 100644
--- a/src/common/meta/src/wal.rs
+++ b/src/common/meta/src/wal.rs
@@ -17,6 +17,7 @@ pub mod options_allocator;
 
 use std::collections::HashMap;
 
+use common_config::wal::StandaloneWalConfig;
 use serde::{Deserialize, Serialize};
 use serde_with::with_prefix;
 
@@ -29,21 +30,38 @@ pub use crate::wal::options_allocator::{
 
 /// Wal config for metasrv.
 #[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
-#[serde(tag = "provider")]
+#[serde(tag = "provider", rename_all = "snake_case")]
 pub enum WalConfig {
     #[default]
-    #[serde(rename = "raft_engine")]
     RaftEngine,
-    #[serde(rename = "kafka")]
     Kafka(KafkaConfig),
 }
 
+impl From<StandaloneWalConfig> for WalConfig {
+    fn from(value: StandaloneWalConfig) -> Self {
+        match value {
+            StandaloneWalConfig::RaftEngine(_) => WalConfig::RaftEngine,
+            StandaloneWalConfig::Kafka(config) => WalConfig::Kafka(KafkaConfig {
+                broker_endpoints: config.base.broker_endpoints,
+                num_topics: config.num_topics,
+                selector_type: config.selector_type,
+                topic_name_prefix: config.topic_name_prefix,
+                num_partitions: config.num_partitions,
+                replication_factor: config.replication_factor,
+                create_topic_timeout: config.create_topic_timeout,
+                backoff: config.base.backoff,
+            }),
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::time::Duration;
 
+    use common_config::wal::kafka::{KafkaBackoffConfig, TopicSelectorType};
+
     use super::*;
-    use crate::wal::kafka::topic_selector::SelectorType as KafkaTopicSelectorType;
 
     #[test]
     fn test_serde_wal_config() {
@@ -57,7 +75,7 @@ mod tests {
         // Test serde raft-engine wal config with extra other wal config.
         let toml_str = r#"
             provider = "raft_engine"
-            broker_endpoints = ["127.0.0.1:9090"]
+            broker_endpoints = ["127.0.0.1:9092"]
             num_topics = 32
         "#;
         let wal_config: WalConfig = toml::from_str(toml_str).unwrap();
@@ -66,7 +84,7 @@ mod tests {
 
         // Test serde kafka wal config.
         let toml_str = r#"
             provider = "kafka"
-            broker_endpoints = ["127.0.0.1:9090"]
+            broker_endpoints = ["127.0.0.1:9092"]
             num_topics = 32
             selector_type = "round_robin"
             topic_name_prefix = "greptimedb_wal_topic"
@@ -80,20 +98,20 @@ mod tests {
         "#;
         let wal_config: WalConfig = toml::from_str(toml_str).unwrap();
         let expected_kafka_config = KafkaConfig {
-            broker_endpoints: vec!["127.0.0.1:9090".to_string()],
+            broker_endpoints: vec!["127.0.0.1:9092".to_string()],
             num_topics: 32,
-            selector_type: KafkaTopicSelectorType::RoundRobin,
+            selector_type: TopicSelectorType::RoundRobin,
             topic_name_prefix: "greptimedb_wal_topic".to_string(),
             num_partitions: 1,
             replication_factor: 3,
             create_topic_timeout: Duration::from_secs(30),
-            backoff_init: Duration::from_millis(500),
-            backoff_max: Duration::from_secs(10),
-            backoff_base: 2,
-            backoff_deadline: Some(Duration::from_secs(60 * 5)),
+            backoff: KafkaBackoffConfig {
+                init: Duration::from_millis(500),
+                max: Duration::from_secs(10),
+                base: 2,
+                deadline: Some(Duration::from_secs(60 * 5)),
+            },
         };
         assert_eq!(wal_config, WalConfig::Kafka(expected_kafka_config));
     }
-
-    // TODO(niebayes): the integrate test needs to test that the example config file can be successfully parsed.
 }
diff --git a/src/common/meta/src/wal/kafka.rs b/src/common/meta/src/wal/kafka.rs
index 173a74662d95..8c9cdea92594 100644
--- a/src/common/meta/src/wal/kafka.rs
+++ b/src/common/meta/src/wal/kafka.rs
@@ -18,11 +18,12 @@ pub mod topic_selector;
 
 use std::time::Duration;
 
+use common_config::wal::kafka::{kafka_backoff, KafkaBackoffConfig, TopicSelectorType};
+use common_config::wal::StandaloneWalConfig;
 use serde::{Deserialize, Serialize};
 
 pub use crate::wal::kafka::topic::Topic;
 pub use crate::wal::kafka::topic_manager::TopicManager;
-use crate::wal::kafka::topic_selector::SelectorType as TopicSelectorType;
 
 /// Configurations for kafka wal.
 #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
@@ -42,36 +43,22 @@ pub struct KafkaConfig {
     /// Above which a topic creation operation will be cancelled.
#[serde(with = "humantime_serde")] pub create_topic_timeout: Duration, - /// The initial backoff for kafka clients. - #[serde(with = "humantime_serde")] - pub backoff_init: Duration, - /// The maximum backoff for kafka clients. - #[serde(with = "humantime_serde")] - pub backoff_max: Duration, - /// Exponential backoff rate, i.e. next backoff = base * current backoff. - // Sets to u32 type since the `backoff_base` field in the KafkaConfig for datanode is of type u32, - // and we want to unify their types. - pub backoff_base: u32, - /// Stop reconnecting if the total wait time reaches the deadline. - /// If it's None, the reconnecting won't terminate. - #[serde(with = "humantime_serde")] - pub backoff_deadline: Option, + /// The backoff config. + #[serde(flatten, with = "kafka_backoff")] + pub backoff: KafkaBackoffConfig, } impl Default for KafkaConfig { fn default() -> Self { Self { - broker_endpoints: vec!["127.0.0.1:9090".to_string()], + broker_endpoints: vec!["127.0.0.1:9092".to_string()], num_topics: 64, selector_type: TopicSelectorType::RoundRobin, topic_name_prefix: "greptimedb_wal_topic".to_string(), num_partitions: 1, replication_factor: 3, create_topic_timeout: Duration::from_secs(30), - backoff_init: Duration::from_millis(500), - backoff_max: Duration::from_secs(10), - backoff_base: 2, - backoff_deadline: Some(Duration::from_secs(60 * 5)), // 5 mins + backoff: KafkaBackoffConfig::default(), } } } diff --git a/src/common/meta/src/wal/kafka/topic_manager.rs b/src/common/meta/src/wal/kafka/topic_manager.rs index 995d8c1393d0..1e7964f85601 100644 --- a/src/common/meta/src/wal/kafka/topic_manager.rs +++ b/src/common/meta/src/wal/kafka/topic_manager.rs @@ -16,6 +16,7 @@ use std::collections::HashSet; use std::sync::Arc; use std::time::Duration; +use common_config::wal::kafka::TopicSelectorType; use common_telemetry::debug; use rskafka::client::ClientBuilder; use rskafka::BackoffConfig; @@ -28,7 +29,7 @@ use crate::error::{ use crate::kv_backend::KvBackendRef; use crate::rpc::store::PutRequest; use crate::wal::kafka::topic::Topic; -use crate::wal::kafka::topic_selector::{RoundRobinTopicSelector, SelectorType, TopicSelectorRef}; +use crate::wal::kafka::topic_selector::{RoundRobinTopicSelector, TopicSelectorRef}; use crate::wal::kafka::KafkaConfig; const CREATED_TOPICS_KEY: &str = "__created_wal_topics/kafka/"; @@ -51,7 +52,7 @@ impl TopicManager { .collect::>(); let selector = match config.selector_type { - SelectorType::RoundRobin => RoundRobinTopicSelector::with_shuffle(), + TopicSelectorType::RoundRobin => RoundRobinTopicSelector::with_shuffle(), }; Self { @@ -103,10 +104,10 @@ impl TopicManager { async fn try_create_topics(&self, topics: &[Topic], to_be_created: &[usize]) -> Result<()> { // Builds an kafka controller client for creating topics. let backoff_config = BackoffConfig { - init_backoff: self.config.backoff_init, - max_backoff: self.config.backoff_max, - base: self.config.backoff_base as f64, - deadline: self.config.backoff_deadline, + init_backoff: self.config.backoff.init, + max_backoff: self.config.backoff.max, + base: self.config.backoff.base as f64, + deadline: self.config.backoff.deadline, }; let client = ClientBuilder::new(self.config.broker_endpoints.clone()) .backoff_config(backoff_config) @@ -130,7 +131,7 @@ impl TopicManager { ) }) .collect::>(); - // TODO(niebayes): Determine how rskafka handles an already-exist topic. Check if an error would be raised. + // FIXME(niebayes): try to create an already-exist topic would raise an error. 
         futures::future::try_join_all(tasks)
             .await
             .context(CreateKafkaWalTopicSnafu)
diff --git a/src/common/meta/src/wal/kafka/topic_selector.rs b/src/common/meta/src/wal/kafka/topic_selector.rs
index 6764cadcc990..fe7517bfd0b5 100644
--- a/src/common/meta/src/wal/kafka/topic_selector.rs
+++ b/src/common/meta/src/wal/kafka/topic_selector.rs
@@ -22,14 +22,6 @@ use snafu::ensure;
 use crate::error::{EmptyTopicPoolSnafu, Result};
 use crate::wal::kafka::topic::Topic;
 
-/// The type of the topic selector, i.e. with which strategy to select a topic.
-#[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq)]
-pub enum SelectorType {
-    #[default]
-    #[serde(rename = "round_robin")]
-    RoundRobin,
-}
-
 /// Controls topic selection.
 pub(crate) trait TopicSelector: Send + Sync {
     /// Selects a topic from the topic pool.
diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs
index 9aa27bf1b3fd..e272840201bb 100644
--- a/src/log-store/src/kafka/client_manager.rs
+++ b/src/log-store/src/kafka/client_manager.rs
@@ -75,10 +75,10 @@ impl ClientManager {
     pub(crate) async fn try_new(config: &KafkaConfig) -> Result<Self> {
         // Sets backoff config for the top-level kafka client and all clients constructed by it.
         let backoff_config = BackoffConfig {
-            init_backoff: config.backoff_init,
-            max_backoff: config.backoff_max,
-            base: config.backoff_base as f64,
-            deadline: config.backoff_deadline,
+            init_backoff: config.backoff.init,
+            max_backoff: config.backoff.max,
+            base: config.backoff.base as f64,
+            deadline: config.backoff.deadline,
         };
         let client = ClientBuilder::new(config.broker_endpoints.clone())
             .backoff_config(backoff_config)
diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs
index 4ff054712ff3..b9f06e06c723 100644
--- a/src/log-store/src/kafka/log_store.rs
+++ b/src/log-store/src/kafka/log_store.rs
@@ -135,7 +135,7 @@ impl LogStore for KafkaLogStore {
         let offset = Offset::try_from(entry_id)?.0;
         let mut stream_consumer = StreamConsumerBuilder::new(client, StartOffset::At(offset))
             .with_max_batch_size(self.config.max_batch_size.as_bytes() as i32)
-            .with_max_wait_ms(self.config.max_wait_time.as_millis() as i32)
+            .with_max_wait_ms(self.config.produce_record_timeout.as_millis() as i32)
             .build();
         let stream = async_stream::stream!({
             while let Some(consume_result) = stream_consumer.next().await {
diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs
index 8cbd70260cde..14afcb2ca19a 100644
--- a/tests-integration/src/standalone.rs
+++ b/tests-integration/src/standalone.rs
@@ -23,7 +23,7 @@ use common_meta::ddl_manager::DdlManager;
 use common_meta::key::TableMetadataManager;
 use common_meta::region_keeper::MemoryRegionKeeper;
 use common_meta::sequence::SequenceBuilder;
-use common_meta::wal::WalOptionsAllocator;
+use common_meta::wal::{WalConfig as MetaSrvWalConfig, WalOptionsAllocator};
 use common_procedure::options::ProcedureConfig;
 use common_telemetry::logging::LoggingOptions;
 use datanode::config::DatanodeOptions;
@@ -118,9 +118,9 @@ impl GreptimeDbStandaloneBuilder {
                 .step(10)
                 .build(),
         );
-        // TODO(niebayes): add a wal config into the MixOptions and pass it to the allocator builder.
+        let wal_meta = MetaSrvWalConfig::default();
         let wal_options_allocator = Arc::new(WalOptionsAllocator::new(
-            common_meta::wal::WalConfig::default(),
+            wal_meta.clone(),
             kv_backend.clone(),
         ));
         let table_meta_allocator = Arc::new(StandaloneTableMetadataAllocator::new(
@@ -163,6 +163,7 @@ impl GreptimeDbStandaloneBuilder {
                 frontend: FrontendOptions::default(),
                 datanode: opts,
                 logging: LoggingOptions::default(),
+                wal_meta,
             },
             guard,
         }
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index 5a8f2aa4aaf7..0decb3821951 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -824,7 +824,10 @@ write_interval = "30s"
 [datanode.export_metrics.headers]
 
 [logging]
-enable_otlp_tracing = false"#,
+enable_otlp_tracing = false
+
+[wal_meta]
+provider = "raft_engine""#,
         store_type,
     );
     let body_text = drop_lines_with_inconsistent_results(res_get.text().await);
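Taken together, the backoff refactor keeps the on-disk TOML flat while grouping the fields in code: `serde_with::with_prefix!` plus `#[serde(flatten)]` routes the `backoff_*` keys into the shared `KafkaBackoffConfig`. A minimal, self-contained sketch of that mechanism (hypothetical struct names mirroring the patch; assumes the `serde`, `serde_with`, `humantime-serde`, and `toml` crates):

```rust
use std::time::Duration;

use serde::{Deserialize, Serialize};
use serde_with::with_prefix;

// Maps the flat `backoff_init` / `backoff_max` / `backoff_base` TOML keys
// onto the fields of the nested struct below.
with_prefix!(kafka_backoff "backoff_");

#[derive(Debug, Serialize, Deserialize)]
#[serde(default)]
struct BackoffConfig {
    #[serde(with = "humantime_serde")]
    init: Duration,
    #[serde(with = "humantime_serde")]
    max: Duration,
    base: u32,
}

impl Default for BackoffConfig {
    fn default() -> Self {
        Self {
            init: Duration::from_millis(500),
            max: Duration::from_secs(10),
            base: 2,
        }
    }
}

#[derive(Debug, Default, Serialize, Deserialize)]
#[serde(default)]
struct KafkaConfig {
    broker_endpoints: Vec<String>,
    // Flattened: the TOML stays flat while the Rust struct is nested.
    #[serde(flatten, with = "kafka_backoff")]
    backoff: BackoffConfig,
}

fn main() {
    let toml_str = r#"
        broker_endpoints = ["127.0.0.1:9092"]
        backoff_init = "1s"
        backoff_base = 3
    "#;
    let config: KafkaConfig = toml::from_str(toml_str).unwrap();
    assert_eq!(config.backoff.init, Duration::from_secs(1));
    assert_eq!(config.backoff.base, 3);
    // Keys not present fall back to the nested struct's defaults.
    assert_eq!(config.backoff.max, Duration::from_secs(10));
}
```

This is why the renamed fields (`init`, `max`, `base`, `deadline`) still surface as `backoff_init`, `backoff_max`, and so on in every example config file above, and why both the metasrv `TopicManager` and the datanode `ClientManager` can translate one shared struct into rskafka's `BackoffConfig`.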