Skip to content

Commit

Permalink
refactor(remote_wal): add StandaloneWalConfig (#3002)
Browse files Browse the repository at this point in the history
* feat: integrate remote wal to standalone

* fix: test

* refactor: refactor standalone wal config

* chore: change default kafka port to 9092

* chore: apply suggestions from CR

---------

Co-authored-by: niebayes <[email protected]>
  • Loading branch information
WenyXu and niebayes authored Dec 26, 2023
1 parent 95f172e commit c902d43
Show file tree
Hide file tree
Showing 15 changed files with 216 additions and 105 deletions.
4 changes: 2 additions & 2 deletions config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ read_batch_size = 128
sync_write = false

# Kafka wal options, see `standalone.example.toml`.
# broker_endpoints = ["127.0.0.1:9090"]
# broker_endpoints = ["127.0.0.1:9092"]
# max_batch_size = "4MB"
# linger = "200ms"
# max_wait_time = "100ms"
# produce_record_timeout = "100ms"
# backoff_init = "500ms"
# backoff_max = "10s"
# backoff_base = 2.0
Expand Down
4 changes: 2 additions & 2 deletions config/metasrv.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ provider = "raft_engine"
# There're none raft-engine wal config since meta srv only involves in remote wal currently.

# Kafka wal config.
# The broker endpoints of the Kafka cluster. ["127.0.0.1:9090"] by default.
# broker_endpoints = ["127.0.0.1:9090"]
# The broker endpoints of the Kafka cluster. ["127.0.0.1:9092"] by default.
# broker_endpoints = ["127.0.0.1:9092"]
# Number of topics to be created upon start.
# num_topics = 64
# Topic selector type.
Expand Down
30 changes: 24 additions & 6 deletions config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,22 +80,40 @@ enable = true
# Whether to enable Prometheus remote write and read in HTTP API, true by default.
enable = true

# WAL options.
[wal]
# Available wal providers:
# - "RaftEngine" (default)
# - "Kafka"
# - "raft_engine" (default)
# - "kafka"
provider = "raft_engine"

# There're none raft-engine wal config since meta srv only involves in remote wal currently.

# Kafka wal options.
# The broker endpoints of the Kafka cluster. ["127.0.0.1:9090"] by default.
# broker_endpoints = ["127.0.0.1:9090"]
# The broker endpoints of the Kafka cluster. ["127.0.0.1:9092"] by default.
# broker_endpoints = ["127.0.0.1:9092"]

# Number of topics to be created upon start.
# num_topics = 64
# Topic selector type.
# Available selector types:
# - "round_robin" (default)
# selector_type = "round_robin"
# A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
# topic_name_prefix = "greptimedb_wal_topic"
# Number of partitions per topic.
# num_partitions = 1
# Expected number of replicas of each partition.
# replication_factor = 3

# The maximum log size a kafka batch producer could buffer.
# max_batch_size = "4MB"
# The linger duration of a kafka batch producer.
# linger = "200ms"
# The maximum amount of time (in milliseconds) to wait for Kafka records to be returned.
# max_wait_time = "100ms"
# produce_record_timeout = "100ms"
# Above which a topic creation operation will be cancelled.
# create_topic_timeout = "30s"

# The initial backoff for kafka clients.
# backoff_init = "500ms"
# The maximum backoff for kafka clients.
Expand Down
2 changes: 2 additions & 0 deletions src/cmd/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use clap::ArgMatches;
use common_config::KvBackendConfig;
use common_meta::wal::WalConfig as MetaSrvWalConfig;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use config::{Config, Environment, File, FileFormat};
use datanode::config::{DatanodeOptions, ProcedureConfig};
Expand All @@ -37,6 +38,7 @@ pub struct MixOptions {
pub frontend: FrontendOptions,
pub datanode: DatanodeOptions,
pub logging: LoggingOptions,
pub wal_meta: MetaSrvWalConfig,
}

impl From<MixOptions> for FrontendOptions {
Expand Down
18 changes: 11 additions & 7 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ use std::{fs, path};
use async_trait::async_trait;
use clap::Parser;
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_config::{metadata_store_dir, KvBackendConfig, WalConfig};
use common_config::wal::StandaloneWalConfig;
use common_config::{metadata_store_dir, KvBackendConfig};
use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::ddl::{DdlTaskExecutorRef, TableMetadataAllocatorRef};
Expand Down Expand Up @@ -104,7 +105,7 @@ pub struct StandaloneOptions {
pub opentsdb: OpentsdbOptions,
pub influxdb: InfluxdbOptions,
pub prom_store: PromStoreOptions,
pub wal: WalConfig,
pub wal: StandaloneWalConfig,
pub storage: StorageConfig,
pub metadata_store: KvBackendConfig,
pub procedure: ProcedureConfig,
Expand All @@ -127,7 +128,7 @@ impl Default for StandaloneOptions {
opentsdb: OpentsdbOptions::default(),
influxdb: InfluxdbOptions::default(),
prom_store: PromStoreOptions::default(),
wal: WalConfig::default(),
wal: StandaloneWalConfig::default(),
storage: StorageConfig::default(),
metadata_store: KvBackendConfig::default(),
procedure: ProcedureConfig::default(),
Expand Down Expand Up @@ -166,7 +167,7 @@ impl StandaloneOptions {
DatanodeOptions {
node_id: Some(0),
enable_telemetry: self.enable_telemetry,
wal: self.wal,
wal: self.wal.into(),
storage: self.storage,
region_engine: self.region_engine,
rpc_addr: self.grpc.addr,
Expand Down Expand Up @@ -338,7 +339,8 @@ impl StartCommand {
let procedure = opts.procedure.clone();
let frontend = opts.clone().frontend_options();
let logging = opts.logging.clone();
let datanode = opts.datanode_options();
let wal_meta = opts.wal.clone().into();
let datanode = opts.datanode_options().clone();

Ok(Options::Standalone(Box::new(MixOptions {
procedure,
Expand All @@ -347,6 +349,7 @@ impl StartCommand {
frontend,
datanode,
logging,
wal_meta,
})))
}

Expand Down Expand Up @@ -392,9 +395,8 @@ impl StartCommand {
.step(10)
.build(),
);
// TODO(niebayes): add a wal config into the MixOptions and pass it to the allocator builder.
let wal_options_allocator = Arc::new(WalOptionsAllocator::new(
common_meta::wal::WalConfig::default(),
opts.wal_meta.clone(),
kv_backend.clone(),
));
let table_meta_allocator = Arc::new(StandaloneTableMetadataAllocator::new(
Expand Down Expand Up @@ -479,6 +481,7 @@ mod tests {

use auth::{Identity, Password, UserProviderRef};
use common_base::readable_size::ReadableSize;
use common_config::WalConfig;
use common_test_util::temp_dir::create_named_temp_file;
use datanode::config::{FileConfig, GcsConfig};
use servers::Mode;
Expand Down Expand Up @@ -529,6 +532,7 @@ mod tests {
purge_interval = "10m"
read_batch_size = 128
sync_write = false
[storage]
data_home = "/tmp/greptimedb/"
type = "File"
Expand Down
54 changes: 39 additions & 15 deletions src/common/config/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ pub mod raft_engine;
use serde::{Deserialize, Serialize};
use serde_with::with_prefix;

pub use crate::wal::kafka::{KafkaConfig, KafkaOptions as KafkaWalOptions, Topic as KafkaWalTopic};
pub use crate::wal::kafka::{
KafkaConfig, KafkaOptions as KafkaWalOptions, StandaloneKafkaConfig, Topic as KafkaWalTopic,
};
pub use crate::wal::raft_engine::RaftEngineConfig;

/// An encoded wal options will be wrapped into a (WAL_OPTIONS_KEY, encoded wal options) key-value pair
Expand All @@ -27,30 +29,49 @@ pub const WAL_OPTIONS_KEY: &str = "wal_options";

/// Wal config for datanode.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "provider")]
#[serde(tag = "provider", rename_all = "snake_case")]
pub enum WalConfig {
#[serde(rename = "raft_engine")]
RaftEngine(RaftEngineConfig),
#[serde(rename = "kafka")]
Kafka(KafkaConfig),
}

impl From<StandaloneWalConfig> for WalConfig {
fn from(value: StandaloneWalConfig) -> Self {
match value {
StandaloneWalConfig::RaftEngine(config) => WalConfig::RaftEngine(config),
StandaloneWalConfig::Kafka(config) => WalConfig::Kafka(config.base),
}
}
}

impl Default for WalConfig {
fn default() -> Self {
WalConfig::RaftEngine(RaftEngineConfig::default())
}
}

/// Wal config for datanode.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "provider", rename_all = "snake_case")]
pub enum StandaloneWalConfig {
RaftEngine(RaftEngineConfig),
Kafka(StandaloneKafkaConfig),
}

impl Default for StandaloneWalConfig {
fn default() -> Self {
StandaloneWalConfig::RaftEngine(RaftEngineConfig::default())
}
}

/// Wal options allocated to a region.
/// A wal options is encoded by metasrv with `serde_json::to_string`, and then decoded
/// by datanode with `serde_json::from_str`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(tag = "wal.provider")]
#[serde(tag = "wal.provider", rename_all = "snake_case")]
pub enum WalOptions {
#[default]
#[serde(rename = "raft_engine")]
RaftEngine,
#[serde(rename = "kafka")]
#[serde(with = "prefix_wal_kafka")]
Kafka(KafkaWalOptions),
}
Expand All @@ -64,31 +85,34 @@ mod tests {
use common_base::readable_size::ReadableSize;
use rskafka::client::partition::Compression as RsKafkaCompression;

use crate::wal::kafka::KafkaBackoffConfig;
use crate::wal::{KafkaConfig, KafkaWalOptions, WalOptions};

#[test]
fn test_serde_kafka_config() {
let toml_str = r#"
broker_endpoints = ["127.0.0.1:9090"]
broker_endpoints = ["127.0.0.1:9092"]
max_batch_size = "4MB"
linger = "200ms"
max_wait_time = "100ms"
produce_record_timeout = "100ms"
backoff_init = "500ms"
backoff_max = "10s"
backoff_base = 2
backoff_deadline = "5mins"
"#;
let decoded: KafkaConfig = toml::from_str(toml_str).unwrap();
let expected = KafkaConfig {
broker_endpoints: vec!["127.0.0.1:9090".to_string()],
broker_endpoints: vec!["127.0.0.1:9092".to_string()],
compression: RsKafkaCompression::default(),
max_batch_size: ReadableSize::mb(4),
linger: Duration::from_millis(200),
max_wait_time: Duration::from_millis(100),
backoff_init: Duration::from_millis(500),
backoff_max: Duration::from_secs(10),
backoff_base: 2,
backoff_deadline: Some(Duration::from_secs(60 * 5)),
produce_record_timeout: Duration::from_millis(100),
backoff: KafkaBackoffConfig {
init: Duration::from_millis(500),
max: Duration::from_secs(10),
base: 2,
deadline: Some(Duration::from_secs(60 * 5)),
},
};
assert_eq!(decoded, expected);
}
Expand Down
Loading

0 comments on commit c902d43

Please sign in to comment.