refactor(remote_wal): add StandaloneWalConfig #3002

Merged
4 changes: 2 additions & 2 deletions config/datanode.example.toml
@@ -50,10 +50,10 @@ read_batch_size = 128
sync_write = false

# Kafka wal options, see `standalone.example.toml`.
-# broker_endpoints = ["127.0.0.1:9090"]
+# broker_endpoints = ["127.0.0.1:9092"]
# max_batch_size = "4MB"
# linger = "200ms"
-# max_wait_time = "100ms"
+# produce_record_timeout = "100ms"
# backoff_init = "500ms"
# backoff_max = "10s"
# backoff_base = 2.0
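The `backoff_*` keys describe a fairly standard exponential backoff policy for the Kafka client. A minimal sketch of the semantics these four knobs suggest (field names mirror the config keys; the actual retry loop lives in the client code and is not part of this PR):

```rust
use std::time::Duration;

/// Sketch of the policy implied by the `backoff_*` keys; not the PR's code.
struct Backoff {
    init: Duration,     // backoff_init: delay before the first retry
    max: Duration,      // backoff_max: cap on any single delay
    base: u32,          // backoff_base: multiplier between attempts
    deadline: Duration, // backoff_deadline: total time budget for retrying
}

impl Backoff {
    /// Delay before the n-th retry (0-indexed): min(init * base^n, max).
    fn delay(&self, attempt: u32) -> Duration {
        self.init
            .saturating_mul(self.base.saturating_pow(attempt))
            .min(self.max)
    }
}

fn main() {
    let backoff = Backoff {
        init: Duration::from_millis(500),
        max: Duration::from_secs(10),
        base: 2,
        deadline: Duration::from_secs(300),
    };
    let mut elapsed = Duration::ZERO;
    for attempt in 0u32.. {
        let delay = backoff.delay(attempt);
        if elapsed + delay > backoff.deadline {
            break; // stop retrying once the deadline would be exceeded
        }
        elapsed += delay;
        println!("retry {attempt} sleeps {delay:?}"); // 500ms, 1s, 2s, 4s, 8s, 10s, 10s, ...
    }
}
```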
4 changes: 2 additions & 2 deletions config/metasrv.example.toml
@@ -52,8 +52,8 @@ provider = "raft_engine"
# There is no raft-engine wal config here since metasrv only participates in remote wal currently.

# Kafka wal config.
-# The broker endpoints of the Kafka cluster. ["127.0.0.1:9090"] by default.
-# broker_endpoints = ["127.0.0.1:9090"]
+# The broker endpoints of the Kafka cluster. ["127.0.0.1:9092"] by default.
+# broker_endpoints = ["127.0.0.1:9092"]
# Number of topics to be created upon start.
# num_topics = 64
# Topic selector type.
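Per the comments above, metasrv pre-creates `num_topics` WAL topics named from `topic_name_prefix` and a `topic_id`, and hands them out to regions with the `round_robin` selector. A rough sketch of that selection with illustrative names (the PR's actual selector type lives in `common_meta` and is not shown in this diff; the underscore separator is an assumption):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Illustrative round-robin selector over pre-created WAL topics.
struct RoundRobinTopicSelector {
    topics: Vec<String>,
    cursor: AtomicUsize,
}

impl RoundRobinTopicSelector {
    fn new(topic_name_prefix: &str, num_topics: usize) -> Self {
        // A topic name is constructed by concatenating `topic_name_prefix` and `topic_id`.
        let topics = (0..num_topics)
            .map(|id| format!("{topic_name_prefix}_{id}"))
            .collect();
        Self {
            topics,
            cursor: AtomicUsize::new(0),
        }
    }

    /// Each call hands out the next topic, wrapping around at the end.
    fn select(&self) -> &str {
        let i = self.cursor.fetch_add(1, Ordering::Relaxed);
        &self.topics[i % self.topics.len()]
    }
}

fn main() {
    let selector = RoundRobinTopicSelector::new("greptimedb_wal_topic", 64);
    assert_eq!(selector.select(), "greptimedb_wal_topic_0");
    assert_eq!(selector.select(), "greptimedb_wal_topic_1");
}
```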
30 changes: 24 additions & 6 deletions config/standalone.example.toml
@@ -80,22 +80,40 @@ enable = true
# Whether to enable Prometheus remote write and read in HTTP API, true by default.
enable = true

# WAL options.
[wal]
# Available wal providers:
# - "RaftEngine" (default)
# - "Kafka"
# - "raft_engine" (default)
# - "kafka"
provider = "raft_engine"

# There is no raft-engine wal config here since metasrv only participates in remote wal currently.

# Kafka wal options.
-# The broker endpoints of the Kafka cluster. ["127.0.0.1:9090"] by default.
-# broker_endpoints = ["127.0.0.1:9090"]
+# The broker endpoints of the Kafka cluster. ["127.0.0.1:9092"] by default.
+# broker_endpoints = ["127.0.0.1:9092"]

# Number of topics to be created upon start.
# num_topics = 64
# Topic selector type.
# Available selector types:
# - "round_robin" (default)
# selector_type = "round_robin"
# A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
# topic_name_prefix = "greptimedb_wal_topic"
# Number of partitions per topic.
# num_partitions = 1
# Expected number of replicas of each partition.
# replication_factor = 3

# The maximum log size a kafka batch producer could buffer.
# max_batch_size = "4MB"
# The linger duration of a kafka batch producer.
# linger = "200ms"
# The maximum amount of time to wait for Kafka records to be returned.
-# max_wait_time = "100ms"
+# produce_record_timeout = "100ms"
# The timeout beyond which a topic creation operation will be cancelled.
# create_topic_timeout = "30s"

# The initial backoff for kafka clients.
# backoff_init = "500ms"
# The maximum backoff for kafka clients.
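With this layout, the `provider` key doubles as a serde tag that picks the WAL flavor, and the remaining keys fill the chosen variant (see the `StandaloneWalConfig` enum in `src/common/config/src/wal.rs` below). A minimal sketch of that round trip, using simplified stand-in structs with only a couple of representative fields rather than the PR's full config types:

```rust
use serde::Deserialize;

// Simplified stand-ins for the PR's config structs.
#[derive(Debug, Deserialize)]
struct RaftEngineConfig {
    #[serde(default)]
    sync_write: bool,
}

#[derive(Debug, Deserialize)]
struct StandaloneKafkaConfig {
    broker_endpoints: Vec<String>,
    num_topics: usize,
}

// Mirrors the PR: the `provider` key is the serde tag selecting the variant.
#[derive(Debug, Deserialize)]
#[serde(tag = "provider", rename_all = "snake_case")]
enum StandaloneWalConfig {
    RaftEngine(RaftEngineConfig),
    Kafka(StandaloneKafkaConfig),
}

fn main() {
    let wal: StandaloneWalConfig = toml::from_str(
        r#"
        provider = "kafka"
        broker_endpoints = ["127.0.0.1:9092"]
        num_topics = 64
        "#,
    )
    .unwrap();
    // Prints the Kafka variant populated from the keys above.
    println!("{wal:?}");
}
```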
2 changes: 2 additions & 0 deletions src/cmd/src/options.rs
@@ -14,6 +14,7 @@

use clap::ArgMatches;
use common_config::KvBackendConfig;
use common_meta::wal::WalConfig as MetaSrvWalConfig;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use config::{Config, Environment, File, FileFormat};
use datanode::config::{DatanodeOptions, ProcedureConfig};
@@ -37,6 +38,7 @@ pub struct MixOptions {
    pub frontend: FrontendOptions,
    pub datanode: DatanodeOptions,
    pub logging: LoggingOptions,
    pub wal_meta: MetaSrvWalConfig,
}

impl From<MixOptions> for FrontendOptions {
18 changes: 11 additions & 7 deletions src/cmd/src/standalone.rs
@@ -18,7 +18,8 @@ use std::{fs, path};
use async_trait::async_trait;
use clap::Parser;
use common_catalog::consts::MIN_USER_TABLE_ID;
-use common_config::{metadata_store_dir, KvBackendConfig, WalConfig};
+use common_config::wal::StandaloneWalConfig;
+use common_config::{metadata_store_dir, KvBackendConfig};
use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::ddl::{DdlTaskExecutorRef, TableMetadataAllocatorRef};
@@ -104,7 +105,7 @@ pub struct StandaloneOptions {
    pub opentsdb: OpentsdbOptions,
    pub influxdb: InfluxdbOptions,
    pub prom_store: PromStoreOptions,
-    pub wal: WalConfig,
+    pub wal: StandaloneWalConfig,
    pub storage: StorageConfig,
    pub metadata_store: KvBackendConfig,
    pub procedure: ProcedureConfig,
@@ -127,7 +128,7 @@ impl Default for StandaloneOptions {
            opentsdb: OpentsdbOptions::default(),
            influxdb: InfluxdbOptions::default(),
            prom_store: PromStoreOptions::default(),
-            wal: WalConfig::default(),
+            wal: StandaloneWalConfig::default(),
            storage: StorageConfig::default(),
            metadata_store: KvBackendConfig::default(),
            procedure: ProcedureConfig::default(),
@@ -166,7 +167,7 @@ impl StandaloneOptions {
        DatanodeOptions {
            node_id: Some(0),
            enable_telemetry: self.enable_telemetry,
-            wal: self.wal,
+            wal: self.wal.into(),
            storage: self.storage,
            region_engine: self.region_engine,
            rpc_addr: self.grpc.addr,
@@ -338,7 +339,8 @@ impl StartCommand {
        let procedure = opts.procedure.clone();
        let frontend = opts.clone().frontend_options();
        let logging = opts.logging.clone();
-        let datanode = opts.datanode_options();
+        let wal_meta = opts.wal.clone().into();
+        let datanode = opts.datanode_options().clone();

        Ok(Options::Standalone(Box::new(MixOptions {
            procedure,
@@ -347,6 +349,7 @@
            frontend,
            datanode,
            logging,
            wal_meta,
        })))
    }

@@ -392,9 +395,8 @@ impl StartCommand {
                .step(10)
                .build(),
        );
-        // TODO(niebayes): add a wal config into the MixOptions and pass it to the allocator builder.
        let wal_options_allocator = Arc::new(WalOptionsAllocator::new(
-            common_meta::wal::WalConfig::default(),
+            opts.wal_meta.clone(),
            kv_backend.clone(),
        ));
        let table_meta_allocator = Arc::new(StandaloneTableMetadataAllocator::new(
@@ -479,6 +481,7 @@ mod tests {

    use auth::{Identity, Password, UserProviderRef};
    use common_base::readable_size::ReadableSize;
    use common_config::WalConfig;
    use common_test_util::temp_dir::create_named_temp_file;
    use datanode::config::{FileConfig, GcsConfig};
    use servers::Mode;
@@ -529,6 +532,7 @@
            purge_interval = "10m"
            read_batch_size = 128
            sync_write = false

            [storage]
            data_home = "/tmp/greptimedb/"
            type = "File"
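Taken together, the `standalone.rs` changes parse the `[wal]` section once into a `StandaloneWalConfig` and then project it twice: `self.wal.into()` produces the datanode-side `WalConfig`, while `opts.wal.clone().into()` produces the metasrv-side config stored as `MixOptions::wal_meta` and handed to `WalOptionsAllocator`. A schematic sketch of that fan-out, with the types reduced to unit structs (the real `From` impl for the metasrv side lives in `common_meta` and is not shown in this diff):

```rust
// Schematic only: stand-in types showing how one standalone WAL config
// fans out to the datanode and metasrv halves of the process.
#[derive(Clone, Debug)]
struct StandaloneWalConfig; // parsed from the `[wal]` TOML section

#[derive(Debug)]
struct DatanodeWalConfig; // common_config::WalConfig in the PR

#[derive(Debug)]
struct MetaSrvWalConfig; // common_meta::wal::WalConfig in the PR

impl From<StandaloneWalConfig> for DatanodeWalConfig {
    fn from(_: StandaloneWalConfig) -> Self {
        DatanodeWalConfig // drops the standalone-only topic-management fields
    }
}

impl From<StandaloneWalConfig> for MetaSrvWalConfig {
    fn from(_: StandaloneWalConfig) -> Self {
        MetaSrvWalConfig // keeps the topic-management fields the allocator needs
    }
}

fn main() {
    let wal = StandaloneWalConfig; // one source of truth
    let datanode_wal: DatanodeWalConfig = wal.clone().into(); // `wal: self.wal.into()`
    let wal_meta: MetaSrvWalConfig = wal.into(); // `wal_meta` in MixOptions
    println!("{datanode_wal:?} / {wal_meta:?}");
}
```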
54 changes: 39 additions & 15 deletions src/common/config/src/wal.rs
@@ -18,7 +18,9 @@ pub mod raft_engine
use serde::{Deserialize, Serialize};
use serde_with::with_prefix;

-pub use crate::wal::kafka::{KafkaConfig, KafkaOptions as KafkaWalOptions, Topic as KafkaWalTopic};
+pub use crate::wal::kafka::{
+    KafkaConfig, KafkaOptions as KafkaWalOptions, StandaloneKafkaConfig, Topic as KafkaWalTopic,
+};
pub use crate::wal::raft_engine::RaftEngineConfig;

/// Encoded wal options are wrapped into a (WAL_OPTIONS_KEY, encoded wal options) key-value pair
@@ -27,30 +29,49 @@ pub const WAL_OPTIONS_KEY: &str = "wal_options";

/// Wal config for datanode.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "provider")]
#[serde(tag = "provider", rename_all = "snake_case")]
pub enum WalConfig {
-    #[serde(rename = "raft_engine")]
    RaftEngine(RaftEngineConfig),
-    #[serde(rename = "kafka")]
    Kafka(KafkaConfig),
}

impl From<StandaloneWalConfig> for WalConfig {
    fn from(value: StandaloneWalConfig) -> Self {
        match value {
            StandaloneWalConfig::RaftEngine(config) => WalConfig::RaftEngine(config),
            StandaloneWalConfig::Kafka(config) => WalConfig::Kafka(config.base),
        }
    }
}

impl Default for WalConfig {
    fn default() -> Self {
        WalConfig::RaftEngine(RaftEngineConfig::default())
    }
}

/// Wal config for standalone.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "provider", rename_all = "snake_case")]
pub enum StandaloneWalConfig {
    RaftEngine(RaftEngineConfig),
    Kafka(StandaloneKafkaConfig),
}

impl Default for StandaloneWalConfig {
    fn default() -> Self {
        StandaloneWalConfig::RaftEngine(RaftEngineConfig::default())
    }
}

/// Wal options allocated to a region.
/// Wal options are encoded by metasrv with `serde_json::to_string`, and then decoded
/// by datanode with `serde_json::from_str`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(tag = "wal.provider")]
#[serde(tag = "wal.provider", rename_all = "snake_case")]
pub enum WalOptions {
    #[default]
-    #[serde(rename = "raft_engine")]
    RaftEngine,
-    #[serde(rename = "kafka")]
    #[serde(with = "prefix_wal_kafka")]
    Kafka(KafkaWalOptions),
}
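As the doc comment above notes, metasrv encodes a region's `WalOptions` with `serde_json::to_string` and datanode decodes it back; the `tag = "wal.provider"` attribute plus the `with_prefix`-wrapped variant yield flat, namespaced JSON keys. A sketch of the resulting wire format; the `topic` field on `KafkaWalOptions` is an assumption, since that struct is not shown in this diff:

```rust
use serde::{Deserialize, Serialize};
use serde_with::with_prefix;

#[derive(Debug, PartialEq, Serialize, Deserialize)]
struct KafkaWalOptions {
    topic: String, // assumed field; the diff does not show this struct
}

with_prefix!(prefix_wal_kafka "wal.kafka.");

#[derive(Debug, PartialEq, Serialize, Deserialize)]
#[serde(tag = "wal.provider", rename_all = "snake_case")]
enum WalOptions {
    RaftEngine,
    #[serde(with = "prefix_wal_kafka")]
    Kafka(KafkaWalOptions),
}

fn main() {
    let options = WalOptions::Kafka(KafkaWalOptions {
        topic: "greptimedb_wal_topic_0".to_string(),
    });
    let encoded = serde_json::to_string(&options).unwrap();
    // Flat, prefixed keys:
    // {"wal.provider":"kafka","wal.kafka.topic":"greptimedb_wal_topic_0"}
    println!("{encoded}");

    // Datanode-side decode recovers the same value.
    let decoded: WalOptions = serde_json::from_str(&encoded).unwrap();
    assert_eq!(decoded, options);
}
```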
@@ -64,31 +85,34 @@ mod tests {
    use common_base::readable_size::ReadableSize;
    use rskafka::client::partition::Compression as RsKafkaCompression;

    use crate::wal::kafka::KafkaBackoffConfig;
    use crate::wal::{KafkaConfig, KafkaWalOptions, WalOptions};

    #[test]
    fn test_serde_kafka_config() {
        let toml_str = r#"
-            broker_endpoints = ["127.0.0.1:9090"]
+            broker_endpoints = ["127.0.0.1:9092"]
            max_batch_size = "4MB"
            linger = "200ms"
-            max_wait_time = "100ms"
+            produce_record_timeout = "100ms"
            backoff_init = "500ms"
            backoff_max = "10s"
            backoff_base = 2
            backoff_deadline = "5mins"
        "#;
        let decoded: KafkaConfig = toml::from_str(toml_str).unwrap();
        let expected = KafkaConfig {
-            broker_endpoints: vec!["127.0.0.1:9090".to_string()],
+            broker_endpoints: vec!["127.0.0.1:9092".to_string()],
            compression: RsKafkaCompression::default(),
            max_batch_size: ReadableSize::mb(4),
            linger: Duration::from_millis(200),
-            max_wait_time: Duration::from_millis(100),
-            backoff_init: Duration::from_millis(500),
-            backoff_max: Duration::from_secs(10),
-            backoff_base: 2,
-            backoff_deadline: Some(Duration::from_secs(60 * 5)),
+            produce_record_timeout: Duration::from_millis(100),
+            backoff: KafkaBackoffConfig {
+                init: Duration::from_millis(500),
+                max: Duration::from_secs(10),
+                base: 2,
+                deadline: Some(Duration::from_secs(60 * 5)),
+            },
        };
        assert_eq!(decoded, expected);
    }
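The updated test also shows the backoff knobs regrouped: the TOML keeps flat `backoff_*` keys, yet they land in a nested `KafkaBackoffConfig` under `backoff`. One common way to get that flat-to-nested mapping (plausibly what the PR does, though the attributes on `KafkaConfig` are not visible in this diff) is a flattened field wrapped by serde_with's `with_prefix`:

```rust
use std::time::Duration;

use serde::Deserialize;
use serde_with::with_prefix;

#[derive(Debug, Deserialize, PartialEq)]
struct KafkaBackoffConfig {
    #[serde(with = "humantime_serde")]
    init: Duration, // parsed from strings like "500ms"
    #[serde(with = "humantime_serde")]
    max: Duration,
    base: u32,
}

with_prefix!(prefix_backoff "backoff_");

#[derive(Debug, Deserialize, PartialEq)]
struct KafkaConfig {
    broker_endpoints: Vec<String>,
    // Flat `backoff_init`, `backoff_max`, `backoff_base` keys map into
    // the nested struct via the prefix module.
    #[serde(flatten, with = "prefix_backoff")]
    backoff: KafkaBackoffConfig,
}

fn main() {
    let config: KafkaConfig = toml::from_str(
        r#"
        broker_endpoints = ["127.0.0.1:9092"]
        backoff_init = "500ms"
        backoff_max = "10s"
        backoff_base = 2
        "#,
    )
    .unwrap();
    assert_eq!(config.backoff.base, 2);
    assert_eq!(config.backoff.init, Duration::from_millis(500));
}
```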