Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(remote_wal): add skeleton for remote wal related to datanode #2941

Merged
merged 26 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
9cf7837
refactor: refactor wal config
niebayes Dec 14, 2023
c66e46c
test: update tests related to wal
niebayes Dec 14, 2023
6fdcec9
feat: introduce kafka wal config
niebayes Dec 14, 2023
88abd86
chore: augment proto with wal options
niebayes Dec 14, 2023
9992a9a
feat: augment region open request with wal options
niebayes Dec 14, 2023
5aaa4ad
feat: augment mito region with wal options
niebayes Dec 14, 2023
4dd851b
feat: augment region create request with wal options
niebayes Dec 14, 2023
c493c38
refactor: refactor log store trait
niebayes Dec 13, 2023
ceb6118
feat: add skeleton for kafka log store
niebayes Dec 14, 2023
d0c6810
feat: generalize building log store when starting datanode
niebayes Dec 14, 2023
1dc70d3
feat: integrate wal options to region write
niebayes Dec 15, 2023
1ef6e86
chore: minor update
niebayes Dec 15, 2023
37a925b
refactor: remove wal options from region create/open requests
niebayes Dec 15, 2023
7f090e6
fix: compliation issues
niebayes Dec 16, 2023
fd690af
chore: insert wal options into region options upon initializing regio…
niebayes Dec 16, 2023
7c0308f
chore: integrate wal options into region options
niebayes Dec 16, 2023
8499692
chore: fill in kafka wal config
niebayes Dec 16, 2023
b6950a9
chore: reuse namespaces while writing to wal
niebayes Dec 16, 2023
3a79101
chore: minor update
niebayes Dec 16, 2023
4f2204f
chore: fetch wal options from region while handling truncate/flush
niebayes Dec 16, 2023
0940828
fix: region options test
niebayes Dec 18, 2023
26ecded
fix: resolve some review conversations
niebayes Dec 19, 2023
075184b
fix: resolve conflicts with develop branch
niebayes Dec 19, 2023
a117a08
refactor: serde with wal options
niebayes Dec 20, 2023
338dc64
fix: conflicts with develop branch
niebayes Dec 20, 2023
c80369c
fix: resolve some review conversations
niebayes Dec 20, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 48 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 21 additions & 1 deletion config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,36 @@ connect_timeout = "1s"
# `TCP_NODELAY` option for accepted connections, true by default.
tcp_nodelay = true

# WAL options, see `standalone.example.toml`.
# WAL options.
# Uncomments all other wal options except the one you chose.
[wal]
# WAL data directory
provider = "raft-engine"
niebayes marked this conversation as resolved.
Show resolved Hide resolved

# Raft-engine wal options, see `standalone.example.toml`
niebayes marked this conversation as resolved.
Show resolved Hide resolved
# dir = "/tmp/greptimedb/wal"
file_size = "256MB"
purge_threshold = "4GB"
purge_interval = "10m"
read_batch_size = 128
sync_write = false

# Kafka wal options.
# The broker endpoints of the Kafka cluster. ["127.0.0.1:9090"] by default.
broker_endpoints = ["127.0.0.1:9090"]
# Number of topics shall be created beforehand.
num_topics = 64
# Topic name prefix.
topic_name_prefix = "greptimedb_wal_kafka_topic"
# Number of partitions per topic.
num_partitions = 1
# The maximum log size an rskafka batch producer could buffer.
max_batch_size = "4MB"
# The linger duration of an rskafka batch producer.
linger = "200ms"
# The maximum amount of time (in milliseconds) to wait for Kafka records to be returned.
max_wait_time = "100ms"

niebayes marked this conversation as resolved.
Show resolved Hide resolved
# Storage options, see `standalone.example.toml`.
[storage]
# The working home directory.
Expand Down
7 changes: 7 additions & 0 deletions config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ enable = true

# WAL options.
[wal]
# Available wal providers:
# - "RaftEngine" (default)
# - "Kafka"
provider = "raft-engine"

# There's no kafka wal config for standalone mode.
niebayes marked this conversation as resolved.
Show resolved Hide resolved

# WAL data directory
# dir = "/tmp/greptimedb/wal"
# WAL file size in bytes.
Expand Down
41 changes: 31 additions & 10 deletions src/cmd/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::time::Duration;
use async_trait::async_trait;
use catalog::kvbackend::MetaKvBackend;
use clap::Parser;
use common_config::WalConfig;
use common_telemetry::logging;
use datanode::config::DatanodeOptions;
use datanode::datanode::{Datanode, DatanodeBuilder};
Expand Down Expand Up @@ -167,7 +168,13 @@ impl StartCommand {
}

if let Some(wal_dir) = &self.wal_dir {
opts.wal.dir = Some(wal_dir.clone());
// `wal_dir` only affects raft-engine config.
match &mut opts.wal {
WalConfig::RaftEngine(raft_engine_config) => {
raft_engine_config.dir.replace(wal_dir.clone());
niebayes marked this conversation as resolved.
Show resolved Hide resolved
}
WalConfig::Kafka(_) => {}
}
}
niebayes marked this conversation as resolved.
Show resolved Hide resolved

if let Some(http_addr) = &self.http_addr {
Expand Down Expand Up @@ -256,6 +263,7 @@ mod tests {
tcp_nodelay = true

[wal]
provider = "raft-engine"
dir = "/other/wal"
file_size = "1GB"
purge_threshold = "50GB"
Expand Down Expand Up @@ -293,12 +301,18 @@ mod tests {

assert_eq!("127.0.0.1:3001".to_string(), options.rpc_addr);
assert_eq!(Some(42), options.node_id);
assert_eq!("/other/wal", options.wal.dir.unwrap());

assert_eq!(Duration::from_secs(600), options.wal.purge_interval);
assert_eq!(1024 * 1024 * 1024, options.wal.file_size.0);
assert_eq!(1024 * 1024 * 1024 * 50, options.wal.purge_threshold.0);
assert!(!options.wal.sync_write);
let WalConfig::RaftEngine(raft_engine_config) = options.wal else {
unreachable!()
};
assert_eq!("/other/wal", raft_engine_config.dir.unwrap());
assert_eq!(Duration::from_secs(600), raft_engine_config.purge_interval);
assert_eq!(1024 * 1024 * 1024, raft_engine_config.file_size.0);
assert_eq!(
1024 * 1024 * 1024 * 50,
raft_engine_config.purge_threshold.0
);
assert!(!raft_engine_config.sync_write);

let HeartbeatOptions {
interval: heart_beat_interval,
Expand Down Expand Up @@ -412,9 +426,10 @@ mod tests {
tcp_nodelay = true

[wal]
provider = "raft-engine"
file_size = "1GB"
purge_threshold = "50GB"
purge_interval = "10m"
purge_interval = "5m"
sync_write = false

[storage]
Expand Down Expand Up @@ -475,7 +490,10 @@ mod tests {
};

// Should be read from env, env > default values.
assert_eq!(opts.wal.read_batch_size, 100,);
let WalConfig::RaftEngine(raft_engine_config) = opts.wal else {
unreachable!()
};
assert_eq!(raft_engine_config.read_batch_size, 100);
assert_eq!(
opts.meta_client.unwrap().metasrv_addrs,
vec![
Expand All @@ -486,10 +504,13 @@ mod tests {
);

// Should be read from config file, config file > env > default values.
assert_eq!(opts.wal.purge_interval, Duration::from_secs(60 * 10));
assert_eq!(
raft_engine_config.purge_interval,
Duration::from_secs(60 * 5)
);

// Should be read from cli, cli > config file > env > default values.
assert_eq!(opts.wal.dir.unwrap(), "/other/wal/dir");
assert_eq!(raft_engine_config.dir.unwrap(), "/other/wal/dir");

// Should be default value.
assert_eq!(opts.http.addr, DatanodeOptions::default().http.addr);
Expand Down
7 changes: 6 additions & 1 deletion src/cmd/src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ impl Options {
mod tests {
use std::io::Write;

use common_config::WalConfig;
use common_test_util::temp_dir::create_named_temp_file;
use datanode::config::{DatanodeOptions, ObjectStoreConfig};

Expand All @@ -194,6 +195,7 @@ mod tests {
tcp_nodelay = true

[wal]
provider = "raft-engine"
dir = "/tmp/greptimedb/wal"
file_size = "1GB"
purge_threshold = "50GB"
Expand Down Expand Up @@ -277,7 +279,10 @@ mod tests {
);

// Should be the values from config file, not environment variables.
assert_eq!(opts.wal.dir.unwrap(), "/tmp/greptimedb/wal");
let WalConfig::RaftEngine(raft_engine_config) = opts.wal else {
unreachable!()
};
assert_eq!(raft_engine_config.dir.unwrap(), "/tmp/greptimedb/wal");

// Should be default values.
assert_eq!(opts.node_id, None);
Expand Down
6 changes: 5 additions & 1 deletion src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ mod tests {
enable_memory_catalog = true

[wal]
provider = "raft-engine"
dir = "/tmp/greptimedb/test/wal"
file_size = "1GB"
purge_threshold = "50GB"
Expand Down Expand Up @@ -544,7 +545,10 @@ mod tests {
assert_eq!(None, fe_opts.mysql.reject_no_database);
assert!(fe_opts.influxdb.enable);

assert_eq!("/tmp/greptimedb/test/wal", dn_opts.wal.dir.unwrap());
let WalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else {
unreachable!()
};
assert_eq!("/tmp/greptimedb/test/wal", raft_engine_config.dir.unwrap());

assert!(matches!(
&dn_opts.storage.store,
Expand Down
5 changes: 5 additions & 0 deletions src/common/config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,9 @@ license.workspace = true
[dependencies]
common-base.workspace = true
humantime-serde.workspace = true
itertools.workspace = true
rskafka = "0.5"
serde.workspace = true
serde_json.workspace = true
serde_with = "3"
toml.workspace = true
33 changes: 2 additions & 31 deletions src/common/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,41 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Duration;
pub mod wal;

use common_base::readable_size::ReadableSize;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct WalConfig {
// wal directory
pub dir: Option<String>,
// wal file size in bytes
pub file_size: ReadableSize,
// wal purge threshold in bytes
pub purge_threshold: ReadableSize,
// purge interval in seconds
#[serde(with = "humantime_serde")]
pub purge_interval: Duration,
// read batch size
pub read_batch_size: usize,
// whether to sync log file after every write
pub sync_write: bool,
}

impl Default for WalConfig {
fn default() -> Self {
Self {
dir: None,
file_size: ReadableSize::mb(256), // log file size 256MB
purge_threshold: ReadableSize::gb(4), // purge threshold 4GB
purge_interval: Duration::from_secs(600),
read_batch_size: 128,
sync_write: false,
}
}
}
pub use crate::wal::WalConfig;

pub fn metadata_store_dir(store_dir: &str) -> String {
format!("{store_dir}/metadata")
Expand Down
Loading
Loading