Skip to content

Commit

Permalink
replace wal config with wal options and update tests
Browse files Browse the repository at this point in the history
  • Loading branch information
niebayes committed Nov 30, 2023
1 parent 0c93a24 commit 7afcb66
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 34 deletions.
34 changes: 33 additions & 1 deletion config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,15 @@ connect_timeout = "1s"
# `TCP_NODELAY` option for accepted connections, true by default.
tcp_nodelay = true

# WAL options, see `standalone.example.toml`.
# WAL options.
[wal]
# Available wal providers:
# - "RaftEngine" (default)
# - "Kafka"
provider = "Kafka"

# Raft engine wal options, see `standalone.example.toml`.
[wal.raft_engine_opts]
# WAL data directory
# dir = "/tmp/greptimedb/wal"
file_size = "256MB"
Expand All @@ -39,6 +46,31 @@ purge_interval = "10m"
read_batch_size = 128
sync_write = false

# Kafka wal options.
[wal.kafka_opts]
# The broker endpoints of the Kafka cluster. ["127.0.0.1:9090"] by default.
broker_endpoints = ["127.0.0.1:9090"]
# Number of topics to be created beforehand.
num_topics = 64
# Topic name prefix.
topic_name_prefix = "greptime_topic"
# Number of partitions per topic.
num_partitions = 1
# The compression algorithm used to compress log entries.
# Available compression algorithms:
# - NoCompression (default)
# - Gzip
# - Lz4
# - Snappy
# - Zstd
compression = "Lz4"
# The maximum log size that an rskafka batch producer can buffer.
max_batch_size = "4MB"
# The linger duration of an rskafka batch producer.
linger = "200ms"
# The maximum amount of time to wait for Kafka records to be returned, e.g. "100ms".
max_wait_time = "100ms"

# Storage options, see `standalone.example.toml`.
[storage]
# The working home directory.
Expand Down
61 changes: 50 additions & 11 deletions src/cmd/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::time::Duration;

use catalog::kvbackend::MetaKvBackend;
use clap::Parser;
use common_config::wal::raft_engine::RaftEngineOptions;
use common_telemetry::logging;
use datanode::config::DatanodeOptions;
use datanode::datanode::{Datanode, DatanodeBuilder};
Expand Down Expand Up @@ -154,7 +155,11 @@ impl StartCommand {
}

if let Some(wal_dir) = &self.wal_dir {
opts.wal.dir = Some(wal_dir.clone());
opts.wal
.raft_engine_opts
.get_or_insert(RaftEngineOptions::default())
.dir
.replace(wal_dir.clone());
}

if let Some(http_addr) = &self.http_addr {
Expand Down Expand Up @@ -243,13 +248,26 @@ mod tests {
tcp_nodelay = true
[wal]
provider = "RaftEngine"
[wal.raft_engine_opts]
dir = "/other/wal"
file_size = "1GB"
purge_threshold = "50GB"
purge_interval = "10m"
read_batch_size = 128
sync_write = false
[wal.kafka_opts]
broker_endpoints = ["127.0.0.1:9090"]
num_topics = 64
topic_name_prefix = "greptime_topic"
num_partitions = 1
compression = "Lz4"
max_batch_size = "4MB"
linger = "200ms"
max_wait_time = "100ms"
[storage]
type = "File"
data_home = "/tmp/greptimedb/"
Expand All @@ -272,12 +290,13 @@ mod tests {

assert_eq!("127.0.0.1:3001".to_string(), options.rpc_addr);
assert_eq!(Some(42), options.node_id);
assert_eq!("/other/wal", options.wal.dir.unwrap());

assert_eq!(Duration::from_secs(600), options.wal.purge_interval);
assert_eq!(1024 * 1024 * 1024, options.wal.file_size.0);
assert_eq!(1024 * 1024 * 1024 * 50, options.wal.purge_threshold.0);
assert!(!options.wal.sync_write);
let raft_engine_opts = options.wal.raft_engine_opts.unwrap();
assert_eq!("/other/wal", raft_engine_opts.dir.unwrap());
assert_eq!(Duration::from_secs(600), raft_engine_opts.purge_interval);
assert_eq!(1024 * 1024 * 1024, raft_engine_opts.file_size.0);
assert_eq!(1024 * 1024 * 1024 * 50, raft_engine_opts.purge_threshold.0);
assert!(!raft_engine_opts.sync_write);

let HeartbeatOptions {
interval: heart_beat_interval,
Expand Down Expand Up @@ -379,11 +398,24 @@ mod tests {
tcp_nodelay = true
[wal]
provider = "RaftEngine"
[wal.raft_engine_opts]
file_size = "1GB"
purge_threshold = "50GB"
purge_interval = "10m"
sync_write = false
[wal.kafka_opts]
broker_endpoints = ["127.0.0.1:9090"]
num_topics = 64
topic_name_prefix = "greptime_topic"
num_partitions = 1
compression = "Lz4"
max_batch_size = "4MB"
linger = "200ms"
max_wait_time = "100ms"
[storage]
type = "File"
data_home = "/tmp/greptimedb/"
Expand All @@ -398,20 +430,22 @@ mod tests {
temp_env::with_vars(
[
(
// wal.purge_interval = 1m
// wal.raft_engine_opts.purge_interval = 1m
[
env_prefix.to_string(),
"wal".to_uppercase(),
"raft_engine_opts".to_uppercase(),
"purge_interval".to_uppercase(),
]
.join(ENV_VAR_SEP),
Some("1m"),
),
(
// wal.read_batch_size = 100
// wal.raft_engine_opts.read_batch_size = 100
[
env_prefix.to_string(),
"wal".to_uppercase(),
"raft_engine_opts".to_uppercase(),
"read_batch_size".to_uppercase(),
]
.join(ENV_VAR_SEP),
Expand Down Expand Up @@ -443,7 +477,8 @@ mod tests {
};

// Should be read from env, env > default values.
assert_eq!(opts.wal.read_batch_size, 100,);
let raft_engine_opts = opts.wal.raft_engine_opts.unwrap();
assert_eq!(raft_engine_opts.read_batch_size, 100);
assert_eq!(
opts.meta_client.unwrap().metasrv_addrs,
vec![
Expand All @@ -454,10 +489,14 @@ mod tests {
);

// Should be read from config file, config file > env > default values.
assert_eq!(opts.wal.purge_interval, Duration::from_secs(60 * 10));
assert_eq!(
raft_engine_opts.purge_interval,
Duration::from_secs(60 * 10)
);

// Should be read from cli, cli > config file > env > default values.
assert_eq!(opts.wal.dir.unwrap(), "/other/wal/dir");
let wal_dir = raft_engine_opts.dir.unwrap();
assert_eq!(wal_dir, "/other/wal/dir");

// Should be default value.
assert_eq!(opts.http.addr, DatanodeOptions::default().http.addr);
Expand Down
10 changes: 9 additions & 1 deletion src/cmd/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::any::Any;

use common_config::wal::WalProvider;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
Expand Down Expand Up @@ -232,6 +233,12 @@ pub enum Error {
#[snafu(source)]
error: std::net::AddrParseError,
},

#[snafu(display("Unexpected wal provider {:?}", wal_provider))]
UnexpectedWalProvider {
location: Location,
wal_provider: WalProvider,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -260,7 +267,8 @@ impl ErrorExt for Error {
| Error::CreateDir { .. }
| Error::EmptyResult { .. }
| Error::InvalidDatabaseName { .. }
| Error::ParseAddr { .. } => StatusCode::InvalidArguments,
| Error::ParseAddr { .. }
| Error::UnexpectedWalProvider { .. } => StatusCode::InvalidArguments,

Error::StartProcedureManager { source, .. }
| Error::StopProcedureManager { source, .. } => source.status_code(),
Expand Down
19 changes: 17 additions & 2 deletions src/cmd/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,26 @@ mod tests {
tcp_nodelay = true
[wal]
provider = "RaftEngine"
[wal.raft_engine_opts]
dir = "/tmp/greptimedb/wal"
file_size = "1GB"
purge_threshold = "50GB"
purge_interval = "10m"
read_batch_size = 128
sync_write = false
[wal.kafka_opts]
broker_endpoints = ["127.0.0.1:9090"]
num_topics = 64
topic_name_prefix = "greptime_topic"
num_partitions = 1
compression = "Lz4"
max_batch_size = "4MB"
linger = "200ms"
max_wait_time = "100ms"
[logging]
level = "debug"
dir = "/tmp/greptimedb/test/logs"
Expand Down Expand Up @@ -209,10 +222,11 @@ mod tests {
Some("mybucket"),
),
(
// wal.dir = /other/wal/dir
// wal.raft_engine_opts.dir = /other/wal/dir
[
env_prefix.to_string(),
"wal".to_uppercase(),
"raft_engine_opts".to_uppercase(),
"dir".to_uppercase(),
]
.join(ENV_VAR_SEP),
Expand Down Expand Up @@ -254,7 +268,8 @@ mod tests {
);

// Should be the values from config file, not environment variables.
assert_eq!(opts.wal.dir.unwrap(), "/tmp/greptimedb/wal");
let wal_dir = opts.wal.raft_engine_opts.unwrap().dir.unwrap();
assert_eq!(wal_dir, "/tmp/greptimedb/wal");

// Should be default values.
assert_eq!(opts.node_id, None);
Expand Down
48 changes: 31 additions & 17 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ use catalog::kvbackend::KvBackendCatalogManager;
use catalog::CatalogManagerRef;
use clap::Parser;
use common_base::Plugins;
use common_config::{metadata_store_dir, KvBackendConfig, WalConfig};
use common_config::wal::{WalOptions, WalProvider};
use common_config::{metadata_store_dir, KvBackendConfig};
use common_meta::cache_invalidator::DummyKvCacheInvalidator;
use common_meta::kv_backend::KvBackendRef;
use common_procedure::ProcedureManagerRef;
Expand All @@ -39,12 +40,12 @@ use serde::{Deserialize, Serialize};
use servers::http::HttpOptions;
use servers::tls::{TlsMode, TlsOption};
use servers::Mode;
use snafu::ResultExt;
use snafu::{ensure, ResultExt};

use crate::error::{
CreateDirSnafu, IllegalConfigSnafu, InitMetadataSnafu, Result, ShutdownDatanodeSnafu,
ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu,
StopProcedureManagerSnafu,
StopProcedureManagerSnafu, UnexpectedWalProviderSnafu,
};
use crate::options::{MixOptions, Options, TopLevelOptions};

Expand Down Expand Up @@ -95,7 +96,7 @@ pub struct StandaloneOptions {
pub opentsdb: OpentsdbOptions,
pub influxdb: InfluxdbOptions,
pub prom_store: PromStoreOptions,
pub wal: WalConfig,
pub wal: WalOptions,
pub storage: StorageConfig,
pub metadata_store: KvBackendConfig,
pub procedure: ProcedureConfig,
Expand All @@ -117,7 +118,7 @@ impl Default for StandaloneOptions {
opentsdb: OpentsdbOptions::default(),
influxdb: InfluxdbOptions::default(),
prom_store: PromStoreOptions::default(),
wal: WalConfig::default(),
wal: WalOptions::default(),
storage: StorageConfig::default(),
metadata_store: KvBackendConfig::default(),
procedure: ProcedureConfig::default(),
Expand Down Expand Up @@ -329,6 +330,13 @@ impl StartCommand {

let dn_opts = opts.datanode.clone();

ensure!(
dn_opts.wal.provider == WalProvider::RaftEngine,
UnexpectedWalProviderSnafu {
wal_provider: dn_opts.wal.provider
}
);

info!("Standalone start command: {:#?}", self);

info!("Building standalone instance with {opts:#?}");
Expand Down Expand Up @@ -460,26 +468,29 @@ mod tests {
enable_memory_catalog = true
[wal]
provider = "RaftEngine"
[wal.raft_engine_opts]
dir = "/tmp/greptimedb/test/wal"
file_size = "1GB"
purge_threshold = "50GB"
purge_interval = "10m"
read_batch_size = 128
sync_write = false
[wal.kafka_opts]
broker_endpoints = ["127.0.0.1:9090"]
num_topics = 64
topic_name_prefix = "greptime_topic"
num_partitions = 1
compression = "Lz4"
max_batch_size = "4MB"
linger = "200ms"
max_wait_time = "100ms"
[storage]
type = "S3"
access_key_id = "access_key_id"
secret_access_key = "secret_access_key"
[storage.compaction]
max_inflight_tasks = 3
max_files_in_level0 = 7
max_purge_tasks = 32
[storage.manifest]
checkpoint_margin = 9
gc_duration = '7s'
data_home = "/tmp/greptimedb/"
[http]
addr = "127.0.0.1:4000"
Expand Down Expand Up @@ -515,7 +526,10 @@ mod tests {
assert_eq!(None, fe_opts.mysql.reject_no_database);
assert!(fe_opts.influxdb.enable);

assert_eq!("/tmp/greptimedb/test/wal", dn_opts.wal.dir.unwrap());
assert_eq!(
"/tmp/greptimedb/test/wal",
dn_opts.wal.raft_engine_opts.unwrap().dir.unwrap()
);

match &dn_opts.storage.store {
datanode::config::ObjectStoreConfig::S3(s3_config) => {
Expand Down
3 changes: 1 addition & 2 deletions src/datanode/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -446,8 +446,7 @@ pub enum Error {
#[snafu(display("Failed to build a Kafka log store"))]
BuildKafkaLogStore {
location: Location,
#[snafu(source)]
error: log_store::error::Error,
source: BoxedError,
},
}

Expand Down

0 comments on commit 7afcb66

Please sign in to comment.