diff --git a/Cargo.lock b/Cargo.lock index 91a040543474..78f7e5c7470e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1624,7 +1624,11 @@ version = "0.4.4" dependencies = [ "common-base", "humantime-serde", + "rskafka", "serde", + "serde_json", + "serde_with", + "toml 0.7.8", ] [[package]] @@ -1812,6 +1816,7 @@ dependencies = [ "bytes", "chrono", "common-catalog", + "common-config", "common-error", "common-grpc-expr", "common-macro", @@ -2152,6 +2157,15 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" +[[package]] +name = "crc32c" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8f48d60e5b4d2c53d5c2b1d8a58c849a70ae5e5509b08a48d047e3b65714a74" +dependencies = [ + "rustc_version", +] + [[package]] name = "crc32fast" version = "1.3.2" @@ -4083,6 +4097,12 @@ version = "3.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" +[[package]] +name = "integer-encoding" +version = "4.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "924df4f0e24e2e7f9cdd90babb0b96f93b20f3ecfa949ea9e6613756b8c8e1bf" + [[package]] name = "inventory" version = "0.3.13" @@ -4900,6 +4920,7 @@ dependencies = [ "chrono", "common-base", "common-catalog", + "common-config", "common-datasource", "common-decimal", "common-error", @@ -7382,6 +7403,30 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rskafka" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "132ecfa3cd9c3825208524a80881f115337762904ad3f0174e87975b2d79162c" +dependencies = [ + "async-trait", + "bytes", + "chrono", + "crc32c", + "flate2", + "futures", + "integer-encoding 4.0.0", + "lz4", + "parking_lot 0.12.1", + "pin-project-lite", + "rand", + "snap", + "thiserror", + "tokio", + "tracing", + "zstd 0.12.4", +] + [[package]] name = "rstest" version = "0.17.0" @@ -8976,6 +9021,7 @@ dependencies = [ "async-trait", "bytes", "common-base", + "common-config", "common-error", "common-macro", "common-query", @@ -9502,7 +9548,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09" dependencies = [ "byteorder", - "integer-encoding", + "integer-encoding 3.0.4", "ordered-float 2.10.1", ] diff --git a/config/datanode.example.toml b/config/datanode.example.toml index a0cc3601906e..bbb125adddd7 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -29,9 +29,15 @@ connect_timeout = "1s" # `TCP_NODELAY` option for accepted connections, true by default. tcp_nodelay = true -# WAL options, see `standalone.example.toml`. +# WAL options. +# Currently, users are expected to choose the wal through the provider field. +# When a wal provider is chose, the user should comment out all other wal config +# except those corresponding to the chosen one. [wal] # WAL data directory +provider = "raft_engine" + +# Raft-engine wal options, see `standalone.example.toml` # dir = "/tmp/greptimedb/wal" file_size = "256MB" purge_threshold = "4GB" @@ -39,6 +45,22 @@ purge_interval = "10m" read_batch_size = 128 sync_write = false +# Kafka wal options. +# The broker endpoints of the Kafka cluster. ["127.0.0.1:9090"] by default. +# broker_endpoints = ["127.0.0.1:9090"] +# Number of topics shall be created beforehand. +# num_topics = 64 +# Topic name prefix. +# topic_name_prefix = "greptimedb_wal_kafka_topic" +# Number of partitions per topic. +# num_partitions = 1 +# The maximum log size an rskafka batch producer could buffer. +# max_batch_size = "4MB" +# The linger duration of an rskafka batch producer. +# linger = "200ms" +# The maximum amount of time (in milliseconds) to wait for Kafka records to be returned. +# max_wait_time = "100ms" + # Storage options, see `standalone.example.toml`. [storage] # The working home directory. diff --git a/config/standalone.example.toml b/config/standalone.example.toml index fd6965ab3946..305043d95c41 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -82,6 +82,13 @@ enable = true # WAL options. [wal] +# Available wal providers: +# - "RaftEngine" (default) +# - "Kafka" +provider = "raft_engine" + +# There's no kafka wal config for standalone mode. + # WAL data directory # dir = "/tmp/greptimedb/wal" # WAL file size in bytes. diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index 0d069fa5e296..b3f0cbf8a111 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -18,7 +18,8 @@ use std::time::Duration; use async_trait::async_trait; use catalog::kvbackend::MetaKvBackend; use clap::Parser; -use common_telemetry::logging; +use common_config::WalConfig; +use common_telemetry::{info, logging}; use datanode::config::DatanodeOptions; use datanode::datanode::{Datanode, DatanodeBuilder}; use meta_client::MetaClientOptions; @@ -166,8 +167,18 @@ impl StartCommand { opts.storage.data_home = data_home.clone(); } - if let Some(wal_dir) = &self.wal_dir { - opts.wal.dir = Some(wal_dir.clone()); + // `wal_dir` only affects raft-engine config. + if let Some(wal_dir) = &self.wal_dir + && let WalConfig::RaftEngine(raft_engine_config) = &mut opts.wal + { + if raft_engine_config + .dir + .as_ref() + .is_some_and(|original_dir| original_dir != wal_dir) + { + info!("The wal dir of raft-engine is altered to {wal_dir}"); + } + raft_engine_config.dir.replace(wal_dir.clone()); } if let Some(http_addr) = &self.http_addr { @@ -256,6 +267,7 @@ mod tests { tcp_nodelay = true [wal] + provider = "raft_engine" dir = "/other/wal" file_size = "1GB" purge_threshold = "50GB" @@ -293,12 +305,18 @@ mod tests { assert_eq!("127.0.0.1:3001".to_string(), options.rpc_addr); assert_eq!(Some(42), options.node_id); - assert_eq!("/other/wal", options.wal.dir.unwrap()); - assert_eq!(Duration::from_secs(600), options.wal.purge_interval); - assert_eq!(1024 * 1024 * 1024, options.wal.file_size.0); - assert_eq!(1024 * 1024 * 1024 * 50, options.wal.purge_threshold.0); - assert!(!options.wal.sync_write); + let WalConfig::RaftEngine(raft_engine_config) = options.wal else { + unreachable!() + }; + assert_eq!("/other/wal", raft_engine_config.dir.unwrap()); + assert_eq!(Duration::from_secs(600), raft_engine_config.purge_interval); + assert_eq!(1024 * 1024 * 1024, raft_engine_config.file_size.0); + assert_eq!( + 1024 * 1024 * 1024 * 50, + raft_engine_config.purge_threshold.0 + ); + assert!(!raft_engine_config.sync_write); let HeartbeatOptions { interval: heart_beat_interval, @@ -412,9 +430,10 @@ mod tests { tcp_nodelay = true [wal] + provider = "raft_engine" file_size = "1GB" purge_threshold = "50GB" - purge_interval = "10m" + purge_interval = "5m" sync_write = false [storage] @@ -475,7 +494,10 @@ mod tests { }; // Should be read from env, env > default values. - assert_eq!(opts.wal.read_batch_size, 100,); + let WalConfig::RaftEngine(raft_engine_config) = opts.wal else { + unreachable!() + }; + assert_eq!(raft_engine_config.read_batch_size, 100); assert_eq!( opts.meta_client.unwrap().metasrv_addrs, vec![ @@ -486,10 +508,13 @@ mod tests { ); // Should be read from config file, config file > env > default values. - assert_eq!(opts.wal.purge_interval, Duration::from_secs(60 * 10)); + assert_eq!( + raft_engine_config.purge_interval, + Duration::from_secs(60 * 5) + ); // Should be read from cli, cli > config file > env > default values. - assert_eq!(opts.wal.dir.unwrap(), "/other/wal/dir"); + assert_eq!(raft_engine_config.dir.unwrap(), "/other/wal/dir"); // Should be default value. assert_eq!(opts.http.addr, DatanodeOptions::default().http.addr); diff --git a/src/cmd/src/lib.rs b/src/cmd/src/lib.rs index 66143492936d..52226e0441b3 100644 --- a/src/cmd/src/lib.rs +++ b/src/cmd/src/lib.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#![feature(assert_matches)] +#![feature(assert_matches, let_chains)] use async_trait::async_trait; use clap::arg; diff --git a/src/cmd/src/options.rs b/src/cmd/src/options.rs index 39a3d94e6de3..4c6b04752a33 100644 --- a/src/cmd/src/options.rs +++ b/src/cmd/src/options.rs @@ -171,6 +171,7 @@ impl Options { mod tests { use std::io::Write; + use common_config::WalConfig; use common_test_util::temp_dir::create_named_temp_file; use datanode::config::{DatanodeOptions, ObjectStoreConfig}; @@ -194,6 +195,7 @@ mod tests { tcp_nodelay = true [wal] + provider = "raft_engine" dir = "/tmp/greptimedb/wal" file_size = "1GB" purge_threshold = "50GB" @@ -277,7 +279,10 @@ mod tests { ); // Should be the values from config file, not environment variables. - assert_eq!(opts.wal.dir.unwrap(), "/tmp/greptimedb/wal"); + let WalConfig::RaftEngine(raft_engine_config) = opts.wal else { + unreachable!() + }; + assert_eq!(raft_engine_config.dir.unwrap(), "/tmp/greptimedb/wal"); // Should be default values. assert_eq!(opts.node_id, None); diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 55ed1d9817a5..fb0e23ebefd9 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -500,6 +500,7 @@ mod tests { enable_memory_catalog = true [wal] + provider = "raft_engine" dir = "/tmp/greptimedb/test/wal" file_size = "1GB" purge_threshold = "50GB" @@ -562,7 +563,10 @@ mod tests { assert_eq!(None, fe_opts.mysql.reject_no_database); assert!(fe_opts.influxdb.enable); - assert_eq!("/tmp/greptimedb/test/wal", dn_opts.wal.dir.unwrap()); + let WalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else { + unreachable!() + }; + assert_eq!("/tmp/greptimedb/test/wal", raft_engine_config.dir.unwrap()); assert!(matches!( &dn_opts.storage.store, diff --git a/src/common/config/Cargo.toml b/src/common/config/Cargo.toml index d4511f124cdf..d43626b7e960 100644 --- a/src/common/config/Cargo.toml +++ b/src/common/config/Cargo.toml @@ -7,4 +7,8 @@ license.workspace = true [dependencies] common-base.workspace = true humantime-serde.workspace = true +rskafka = "0.5" serde.workspace = true +serde_json.workspace = true +serde_with = "3" +toml.workspace = true diff --git a/src/common/config/src/lib.rs b/src/common/config/src/lib.rs index 4cf4a0804eb8..33b07c0498fd 100644 --- a/src/common/config/src/lib.rs +++ b/src/common/config/src/lib.rs @@ -12,41 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::time::Duration; +pub mod wal; use common_base::readable_size::ReadableSize; use serde::{Deserialize, Serialize}; -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -#[serde(default)] -pub struct WalConfig { - // wal directory - pub dir: Option, - // wal file size in bytes - pub file_size: ReadableSize, - // wal purge threshold in bytes - pub purge_threshold: ReadableSize, - // purge interval in seconds - #[serde(with = "humantime_serde")] - pub purge_interval: Duration, - // read batch size - pub read_batch_size: usize, - // whether to sync log file after every write - pub sync_write: bool, -} - -impl Default for WalConfig { - fn default() -> Self { - Self { - dir: None, - file_size: ReadableSize::mb(256), // log file size 256MB - purge_threshold: ReadableSize::gb(4), // purge threshold 4GB - purge_interval: Duration::from_secs(600), - read_batch_size: 128, - sync_write: false, - } - } -} +pub use crate::wal::{KafkaWalOptions, WalConfig, WalOptions, WAL_OPTIONS_KEY}; pub fn metadata_store_dir(store_dir: &str) -> String { format!("{store_dir}/metadata") diff --git a/src/common/config/src/wal.rs b/src/common/config/src/wal.rs new file mode 100644 index 000000000000..56e04c55547c --- /dev/null +++ b/src/common/config/src/wal.rs @@ -0,0 +1,116 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod kafka; +pub mod raft_engine; + +use serde::{Deserialize, Serialize}; +use serde_with::with_prefix; + +pub use crate::wal::kafka::{KafkaConfig, KafkaOptions as KafkaWalOptions, Topic as KafkaWalTopic}; +pub use crate::wal::raft_engine::RaftEngineConfig; + +/// An encoded wal options will be wrapped into a (WAL_OPTIONS_KEY, encoded wal options) key-value pair +/// and inserted into the options of a `RegionCreateRequest`. +pub const WAL_OPTIONS_KEY: &str = "wal_options"; + +/// Wal config for datanode. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(tag = "provider")] +pub enum WalConfig { + #[serde(rename = "raft_engine")] + RaftEngine(RaftEngineConfig), + #[serde(rename = "kafka")] + Kafka(KafkaConfig), +} + +impl Default for WalConfig { + fn default() -> Self { + WalConfig::RaftEngine(RaftEngineConfig::default()) + } +} + +/// Wal options allocated to a region. +/// A wal options is encoded by metasrv with `serde_json::to_string`, and then decoded +/// by datanode with `serde_json::from_str`. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)] +#[serde(tag = "wal.provider")] +pub enum WalOptions { + #[default] + #[serde(rename = "raft_engine")] + RaftEngine, + #[serde(rename = "kafka")] + #[serde(with = "prefix_wal_kafka")] + Kafka(KafkaWalOptions), +} + +with_prefix!(prefix_wal_kafka "wal.kafka."); + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use common_base::readable_size::ReadableSize; + use rskafka::client::partition::Compression as RsKafkaCompression; + + use crate::wal::{KafkaConfig, KafkaWalOptions, WalOptions}; + + #[test] + fn test_serde_kafka_config() { + let toml_str = r#" + broker_endpoints = ["127.0.0.1:9090"] + num_topics = 32 + topic_name_prefix = "greptimedb_wal_kafka_topic" + num_partitions = 1 + max_batch_size = "4MB" + linger = "200ms" + max_wait_time = "100ms" + "#; + let decoded: KafkaConfig = toml::from_str(toml_str).unwrap(); + let expected = KafkaConfig { + broker_endpoints: vec!["127.0.0.1:9090".to_string()], + num_topics: 32, + topic_name_prefix: "greptimedb_wal_kafka_topic".to_string(), + num_partitions: 1, + compression: RsKafkaCompression::default(), + max_batch_size: ReadableSize::mb(4), + linger: Duration::from_millis(200), + max_wait_time: Duration::from_millis(100), + }; + assert_eq!(decoded, expected); + } + + #[test] + fn test_serde_wal_options() { + // Test serde raft-engine wal options. + let wal_options = WalOptions::RaftEngine; + let encoded = serde_json::to_string(&wal_options).unwrap(); + let expected = r#"{"wal.provider":"raft_engine"}"#; + assert_eq!(&encoded, expected); + + let decoded: WalOptions = serde_json::from_str(&encoded).unwrap(); + assert_eq!(decoded, wal_options); + + // Test serde kafka wal options. + let wal_options = WalOptions::Kafka(KafkaWalOptions { + topic: "test_topic".to_string(), + }); + let encoded = serde_json::to_string(&wal_options).unwrap(); + let expected = r#"{"wal.provider":"kafka","wal.kafka.topic":"test_topic"}"#; + assert_eq!(&encoded, expected); + + let decoded: WalOptions = serde_json::from_str(&encoded).unwrap(); + assert_eq!(decoded, wal_options); + } +} diff --git a/src/common/config/src/wal/kafka.rs b/src/common/config/src/wal/kafka.rs new file mode 100644 index 000000000000..c645f7c3607b --- /dev/null +++ b/src/common/config/src/wal/kafka.rs @@ -0,0 +1,72 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Duration; + +use common_base::readable_size::ReadableSize; +use rskafka::client::partition::Compression as RsKafkaCompression; +use serde::{Deserialize, Serialize}; + +/// Topic name prefix. +pub const TOPIC_NAME_PREFIX: &str = "greptimedb_wal_kafka_topic"; +/// Kafka wal topic. +pub type Topic = String; + +/// Configurations for kafka wal. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(default)] +pub struct KafkaConfig { + /// The broker endpoints of the Kafka cluster. + pub broker_endpoints: Vec, + /// Number of topics shall be created beforehand. + pub num_topics: usize, + /// Topic name prefix. + pub topic_name_prefix: String, + /// Number of partitions per topic. + pub num_partitions: i32, + /// The compression algorithm used to compress log entries. + #[serde(skip)] + #[serde(default)] + pub compression: RsKafkaCompression, + /// The maximum log size an rskakfa batch producer could buffer. + pub max_batch_size: ReadableSize, + /// The linger duration of an rskafka batch producer. + #[serde(with = "humantime_serde")] + pub linger: Duration, + /// The maximum amount of time (in milliseconds) to wait for Kafka records to be returned. + #[serde(with = "humantime_serde")] + pub max_wait_time: Duration, +} + +impl Default for KafkaConfig { + fn default() -> Self { + Self { + broker_endpoints: vec!["127.0.0.1:9090".to_string()], + num_topics: 64, + topic_name_prefix: TOPIC_NAME_PREFIX.to_string(), + num_partitions: 1, + compression: RsKafkaCompression::NoCompression, + max_batch_size: ReadableSize::mb(4), + linger: Duration::from_millis(200), + max_wait_time: Duration::from_millis(100), + } + } +} + +/// Kafka wal options allocated to a region. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct KafkaOptions { + /// Kafka wal topic. + pub topic: Topic, +} diff --git a/src/common/config/src/wal/raft_engine.rs b/src/common/config/src/wal/raft_engine.rs new file mode 100644 index 000000000000..ac9ff3047783 --- /dev/null +++ b/src/common/config/src/wal/raft_engine.rs @@ -0,0 +1,50 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Duration; + +use common_base::readable_size::ReadableSize; +use serde::{Deserialize, Serialize}; + +/// Configurations for raft-engine wal. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(default)] +pub struct RaftEngineConfig { + // wal directory + pub dir: Option, + // wal file size in bytes + pub file_size: ReadableSize, + // wal purge threshold in bytes + pub purge_threshold: ReadableSize, + // purge interval in seconds + #[serde(with = "humantime_serde")] + pub purge_interval: Duration, + // read batch size + pub read_batch_size: usize, + // whether to sync log file after every write + pub sync_write: bool, +} + +impl Default for RaftEngineConfig { + fn default() -> Self { + Self { + dir: None, + file_size: ReadableSize::mb(256), + purge_threshold: ReadableSize::gb(4), + purge_interval: Duration::from_secs(600), + read_batch_size: 128, + sync_write: false, + } + } +} diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index 59c0c4b54d5e..7b0624748964 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -15,6 +15,7 @@ async-trait.workspace = true base64.workspace = true bytes.workspace = true common-catalog.workspace = true +common-config.workspace = true common-error.workspace = true common-grpc-expr.workspace = true common-macro.workspace = true diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index 1fefd4db62ca..35050643d3c2 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -21,6 +21,7 @@ use api::v1::region::{ use api::v1::{ColumnDef, CreateTableExpr, SemanticType}; use async_trait::async_trait; use common_catalog::consts::METRIC_ENGINE; +use common_config::WAL_OPTIONS_KEY; use common_error::ext::BoxedError; use common_procedure::error::{ ExternalSnafu, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu, @@ -47,7 +48,6 @@ use crate::rpc::ddl::CreateTableTask; use crate::rpc::router::{ find_leader_regions, find_leaders, operating_leader_regions, RegionRoute, }; -use crate::wal::WAL_OPTIONS_KEY; pub struct CreateTableProcedure { pub context: DdlContext, diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 4f3a9aca9742..1d8c0c3ecc26 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -14,6 +14,7 @@ use std::str::Utf8Error; +use common_config::wal::WalOptions; use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; @@ -23,7 +24,6 @@ use store_api::storage::{RegionId, RegionNumber}; use table::metadata::TableId; use crate::peer::Peer; -use crate::wal::WalOptions; use crate::DatanodeId; #[derive(Snafu)] diff --git a/src/common/meta/src/wal.rs b/src/common/meta/src/wal.rs index 8b3acb82520b..0ffaba6a22e0 100644 --- a/src/common/meta/src/wal.rs +++ b/src/common/meta/src/wal.rs @@ -22,14 +22,10 @@ use serde_with::with_prefix; use crate::error::Result; use crate::wal::kafka::KafkaConfig; -pub use crate::wal::kafka::{KafkaOptions as KafkaWalOptions, Topic as KafkaWalTopic}; +pub use crate::wal::kafka::Topic as KafkaWalTopic; pub use crate::wal::options_allocator::{build_wal_options_allocator, WalOptionsAllocator}; -/// An encoded wal options will be wrapped into a (WAL_OPTIONS_KEY, encoded wal options) key-value pair -/// and inserted into the options of a `RegionCreateRequest`. -pub const WAL_OPTIONS_KEY: &str = "wal_options"; - -/// Wal configurations for bootstrapping metasrv. +/// Wal config for metasrv. #[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)] #[serde(tag = "provider")] pub enum WalConfig { @@ -40,23 +36,6 @@ pub enum WalConfig { Kafka(KafkaConfig), } -/// Wal options allocated to a region. -/// A wal options is encoded by metasrv into a `String` with `serde_json::to_string`. -/// It's then decoded by datanode to a `HashMap` with `serde_json::from_str`. -/// Such a encoding/decoding scheme is inspired by the encoding/decoding of `RegionOptions`. -#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)] -#[serde(tag = "wal.provider")] -pub enum WalOptions { - #[default] - #[serde(rename = "raft_engine")] - RaftEngine, - #[serde(rename = "kafka")] - #[serde(with = "prefix_wal_kafka")] - Kafka(KafkaWalOptions), -} - -with_prefix!(prefix_wal_kafka "wal.kafka."); - #[cfg(test)] mod tests { use super::*; @@ -101,27 +80,4 @@ mod tests { }; assert_eq!(wal_config, WalConfig::Kafka(expected_kafka_config)); } - - #[test] - fn test_serde_wal_options() { - // Test serde raft-engine wal options. - let wal_options = WalOptions::RaftEngine; - let encoded = serde_json::to_string(&wal_options).unwrap(); - let expected = r#"{"wal.provider":"raft_engine"}"#; - assert_eq!(&encoded, expected); - - let decoded: WalOptions = serde_json::from_str(&encoded).unwrap(); - assert_eq!(decoded, wal_options); - - // Test serde kafka wal options. - let wal_options = WalOptions::Kafka(KafkaWalOptions { - topic: "test_topic".to_string(), - }); - let encoded = serde_json::to_string(&wal_options).unwrap(); - let expected = r#"{"wal.provider":"kafka","wal.kafka.topic":"test_topic"}"#; - assert_eq!(&encoded, expected); - - let decoded: WalOptions = serde_json::from_str(&encoded).unwrap(); - assert_eq!(decoded, wal_options); - } } diff --git a/src/common/meta/src/wal/kafka.rs b/src/common/meta/src/wal/kafka.rs index b24d14453186..90de795d5a73 100644 --- a/src/common/meta/src/wal/kafka.rs +++ b/src/common/meta/src/wal/kafka.rs @@ -22,7 +22,7 @@ pub use crate::wal::kafka::topic::Topic; pub use crate::wal::kafka::topic_manager::TopicManager; use crate::wal::kafka::topic_selector::SelectorType as TopicSelectorType; -/// Configurations for bootstrapping a kafka wal. +/// Configurations for kafka wal. #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct KafkaConfig { /// The broker endpoints of the Kafka cluster. @@ -51,10 +51,3 @@ impl Default for KafkaConfig { } } } - -/// Kafka wal options allocated to a region. -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct KafkaOptions { - /// Kafka wal topic. - pub topic: Topic, -} diff --git a/src/common/meta/src/wal/options_allocator.rs b/src/common/meta/src/wal/options_allocator.rs index b6518199be52..be9913db1121 100644 --- a/src/common/meta/src/wal/options_allocator.rs +++ b/src/common/meta/src/wal/options_allocator.rs @@ -15,13 +15,14 @@ use std::collections::HashMap; use std::sync::Arc; +use common_config::{KafkaWalOptions, WalOptions}; use snafu::ResultExt; use store_api::storage::RegionNumber; use crate::error::{EncodeWalOptionsToJsonSnafu, Result}; use crate::kv_backend::KvBackendRef; -use crate::wal::kafka::{KafkaOptions, TopicManager as KafkaTopicManager}; -use crate::wal::{WalConfig, WalOptions}; +use crate::wal::kafka::TopicManager as KafkaTopicManager; +use crate::wal::WalConfig; /// Allocates wal options in region granularity. #[derive(Default)] @@ -55,7 +56,7 @@ impl WalOptionsAllocator { let topics = topic_manager.select_batch(num_regions); topics .into_iter() - .map(|topic| WalOptions::Kafka(KafkaOptions { topic })) + .map(|topic| WalOptions::Kafka(KafkaWalOptions { topic })) .collect() } } diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 461186a8ada7..767a12c4acc7 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -21,6 +21,8 @@ use std::sync::Arc; use catalog::memory::MemoryCatalogManager; use common_base::Plugins; +use common_config::wal::{KafkaConfig, RaftEngineConfig}; +use common_config::{WalConfig, WAL_OPTIONS_KEY}; use common_error::ext::BoxedError; use common_greptimedb_telemetry::GreptimeDBTelemetryTask; use common_meta::key::datanode_table::DatanodeTableManager; @@ -32,9 +34,11 @@ use file_engine::engine::FileRegionEngine; use futures::future; use futures_util::future::try_join_all; use futures_util::StreamExt; +use log_store::kafka::log_store::KafkaLogStore; use log_store::raft_engine::log_store::RaftEngineLogStore; use meta_client::client::MetaClient; use metric_engine::engine::MetricEngine; +use mito2::config::MitoConfig; use mito2::engine::MitoEngine; use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef}; use object_store::util::normalize_dir; @@ -45,7 +49,6 @@ use servers::metrics_handler::MetricsHandler; use servers::server::{start_server, ServerHandler, ServerHandlers}; use servers::Mode; use snafu::{OptionExt, ResultExt}; -use store_api::logstore::LogStore; use store_api::path_utils::{region_dir, WAL_DIR}; use store_api::region_engine::RegionEngineRef; use store_api::region_request::{RegionOpenRequest, RegionRequest}; @@ -221,8 +224,6 @@ impl DatanodeBuilder { let kv_backend = self.kv_backend.take().context(MissingKvBackendSnafu)?; // build and initialize region server - let log_store = Self::build_log_store(&self.opts).await?; - let (region_event_listener, region_event_receiver) = if controlled_by_metasrv { let (tx, rx) = new_region_server_event_channel(); (Box::new(tx) as _, Some(rx)) @@ -230,9 +231,7 @@ impl DatanodeBuilder { (Box::new(NoopRegionServerEventListener) as _, None) }; - let region_server = self - .new_region_server(log_store, region_event_listener) - .await?; + let region_server = self.new_region_server(region_event_listener).await?; self.initialize_region_server(®ion_server, kv_backend, !controlled_by_metasrv) .await?; @@ -347,11 +346,21 @@ impl DatanodeBuilder { while let Some(table_value) = table_values.next().await { let table_value = table_value.context(GetMetadataSnafu)?; for region_number in table_value.regions { + // Augments region options with wal options if a wal options is provided. + let mut region_options = table_value.region_info.region_options.clone(); + table_value + .region_info + .region_wal_options + .get(®ion_number.to_string()) + .and_then(|wal_options| { + region_options.insert(WAL_OPTIONS_KEY.to_string(), wal_options.clone()) + }); + regions.push(( RegionId::new(table_value.table_id, region_number), table_value.region_info.engine.clone(), table_value.region_info.region_storage_path.clone(), - table_value.region_info.region_options.clone(), + region_options, )); } } @@ -394,7 +403,6 @@ impl DatanodeBuilder { async fn new_region_server( &self, - log_store: Arc, event_listener: RegionServerEventListenerRef, ) -> Result { let opts = &self.opts; @@ -426,7 +434,7 @@ impl DatanodeBuilder { ); let object_store_manager = Self::build_object_store_manager(opts).await?; - let engines = Self::build_store_engines(opts, log_store, object_store_manager).await?; + let engines = Self::build_store_engines(opts, object_store_manager).await?; for engine in engines { region_server.register_engine(engine); } @@ -436,48 +444,19 @@ impl DatanodeBuilder { // internal utils - /// Build [RaftEngineLogStore] - async fn build_log_store(opts: &DatanodeOptions) -> Result> { - let data_home = normalize_dir(&opts.storage.data_home); - let wal_dir = match &opts.wal.dir { - Some(dir) => dir.clone(), - None => format!("{}{WAL_DIR}", data_home), - }; - let wal_config = opts.wal.clone(); - - // create WAL directory - fs::create_dir_all(Path::new(&wal_dir)) - .await - .context(CreateDirSnafu { dir: &wal_dir })?; - info!( - "Creating logstore with config: {:?} and storage path: {}", - wal_config, &wal_dir - ); - let logstore = RaftEngineLogStore::try_new(wal_dir, wal_config) - .await - .map_err(Box::new) - .context(OpenLogStoreSnafu)?; - Ok(Arc::new(logstore)) - } - - /// Build [RegionEngineRef] from `store_engine` section in `opts` - async fn build_store_engines( + /// Builds [RegionEngineRef] from `store_engine` section in `opts` + async fn build_store_engines( opts: &DatanodeOptions, - log_store: Arc, object_store_manager: ObjectStoreManagerRef, - ) -> Result> - where - S: LogStore, - { + ) -> Result> { let mut engines = vec![]; for engine in &opts.region_engine { match engine { RegionEngineConfig::Mito(config) => { - let mito_engine: MitoEngine = MitoEngine::new( - config.clone(), - log_store.clone(), - object_store_manager.clone(), - ); + let mito_engine = + Self::build_mito_engine(opts, object_store_manager.clone(), config.clone()) + .await?; + let metric_engine = MetricEngine::new(mito_engine.clone()); engines.push(Arc::new(mito_engine) as _); engines.push(Arc::new(metric_engine) as _); @@ -494,6 +473,61 @@ impl DatanodeBuilder { Ok(engines) } + /// Builds [MitoEngine] according to options. + async fn build_mito_engine( + opts: &DatanodeOptions, + object_store_manager: ObjectStoreManagerRef, + config: MitoConfig, + ) -> Result { + let mito_engine = match &opts.wal { + WalConfig::RaftEngine(raft_engine_config) => MitoEngine::new( + config, + Self::build_raft_engine_log_store(&opts.storage.data_home, raft_engine_config) + .await?, + object_store_manager, + ), + WalConfig::Kafka(kafka_config) => MitoEngine::new( + config, + Self::build_kafka_log_store(kafka_config).await?, + object_store_manager, + ), + }; + Ok(mito_engine) + } + + /// Builds [RaftEngineLogStore]. + async fn build_raft_engine_log_store( + data_home: &str, + config: &RaftEngineConfig, + ) -> Result> { + let data_home = normalize_dir(data_home); + let wal_dir = match &config.dir { + Some(dir) => dir.clone(), + None => format!("{}{WAL_DIR}", data_home), + }; + + // create WAL directory + fs::create_dir_all(Path::new(&wal_dir)) + .await + .context(CreateDirSnafu { dir: &wal_dir })?; + info!( + "Creating raft-engine logstore with config: {:?} and storage path: {}", + config, &wal_dir + ); + let logstore = RaftEngineLogStore::try_new(wal_dir, config.clone()) + .await + .map_err(Box::new) + .context(OpenLogStoreSnafu)?; + + Ok(Arc::new(logstore)) + } + + /// Builds [KafkaLogStore]. + async fn build_kafka_log_store(config: &KafkaConfig) -> Result> { + let _ = config; + todo!() + } + /// Builds [ObjectStoreManager] async fn build_object_store_manager(opts: &DatanodeOptions) -> Result { let object_store = diff --git a/src/log-store/src/kafka.rs b/src/log-store/src/kafka.rs new file mode 100644 index 000000000000..ea661c380e0c --- /dev/null +++ b/src/log-store/src/kafka.rs @@ -0,0 +1,85 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod log_store; + +use common_meta::wal::KafkaWalTopic as Topic; +use store_api::logstore::entry::{Entry, Id as EntryId}; +use store_api::logstore::namespace::Namespace; + +use crate::error::Error; + +/// Kafka Namespace implementation. +#[derive(Debug, PartialEq, Eq, Hash, Clone)] +pub struct NamespaceImpl { + region_id: u64, + topic: Topic, +} + +impl NamespaceImpl { + fn new(region_id: u64, topic: Topic) -> Self { + Self { region_id, topic } + } + + fn region_id(&self) -> u64 { + self.region_id + } + + fn topic(&self) -> &Topic { + &self.topic + } +} + +impl Namespace for NamespaceImpl { + fn id(&self) -> u64 { + self.region_id + } +} + +/// Kafka Entry implementation. +pub struct EntryImpl { + /// Entry payload. + data: Vec, + /// The logical entry id. + id: EntryId, + /// The namespace used to identify and isolate log entries from different regions. + ns: NamespaceImpl, +} + +impl EntryImpl { + fn new(data: Vec, entry_id: EntryId, ns: NamespaceImpl) -> Self { + Self { + data, + id: entry_id, + ns, + } + } +} + +impl Entry for EntryImpl { + type Error = Error; + type Namespace = NamespaceImpl; + + fn data(&self) -> &[u8] { + &self.data + } + + fn id(&self) -> EntryId { + self.id + } + + fn namespace(&self) -> Self::Namespace { + self.ns.clone() + } +} diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs new file mode 100644 index 000000000000..5aa2d9ec7d52 --- /dev/null +++ b/src/log-store/src/kafka/log_store.rs @@ -0,0 +1,106 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use common_config::wal::{KafkaConfig, WalOptions}; +use store_api::logstore::entry::Id as EntryId; +use store_api::logstore::entry_stream::SendableEntryStream; +use store_api::logstore::namespace::Id as NamespaceId; +use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore}; + +use crate::error::{Error, Result}; +use crate::kafka::{EntryImpl, NamespaceImpl}; + +#[derive(Debug)] +pub struct KafkaLogStore; + +impl KafkaLogStore { + pub async fn try_new(config: KafkaConfig) -> Result { + todo!() + } +} + +#[async_trait::async_trait] +impl LogStore for KafkaLogStore { + type Error = Error; + type Entry = EntryImpl; + type Namespace = NamespaceImpl; + + /// Create an entry of the associate Entry type. + fn entry>( + &self, + data: D, + entry_id: EntryId, + ns: Self::Namespace, + ) -> Self::Entry { + EntryImpl::new(data.as_ref().to_vec(), entry_id, ns) + } + + /// Append an `Entry` to WAL with given namespace and return append response containing + /// the entry id. + async fn append(&self, entry: Self::Entry) -> Result { + todo!() + } + + /// For a batch of log entries belonging to multiple regions, each assigned to a specific topic, + /// we need to determine the minimum log offset returned for each region in this batch. + /// During replay, we use this offset to fetch log entries for a region from its assigned topic. + /// After fetching, we filter the entries to obtain log entries relevant to that specific region. + async fn append_batch(&self, entries: Vec) -> Result { + todo!() + } + + /// Create a new `EntryStream` to asynchronously generates `Entry` with ids + /// starting from `id`. The generated entries will be filtered by the namespace. + async fn read( + &self, + ns: &Self::Namespace, + entry_id: EntryId, + ) -> Result> { + todo!() + } + + /// Create a namespace of the associate Namespace type + fn namespace(&self, ns_id: NamespaceId, wal_options: &WalOptions) -> Self::Namespace { + todo!() + } + + /// Create a new `Namespace`. + async fn create_namespace(&self, _ns: &Self::Namespace) -> Result<()> { + Ok(()) + } + + /// Delete an existing `Namespace` with given ref. + async fn delete_namespace(&self, _ns: &Self::Namespace) -> Result<()> { + Ok(()) + } + + /// List all existing namespaces. + async fn list_namespaces(&self) -> Result> { + Ok(vec![]) + } + + /// Mark all entry ids `<=id` of given `namespace` as obsolete so that logstore can safely delete + /// the log files if all entries inside are obsolete. This method may not delete log + /// files immediately. + async fn obsolete(&self, _ns: Self::Namespace, _entry_id: EntryId) -> Result<()> { + Ok(()) + } + + /// Stop components of logstore. + async fn stop(&self) -> Result<()> { + Ok(()) + } +} diff --git a/src/log-store/src/lib.rs b/src/log-store/src/lib.rs index 4c1d80d5a4eb..901a202f2a48 100644 --- a/src/log-store/src/lib.rs +++ b/src/log-store/src/lib.rs @@ -15,6 +15,8 @@ #![feature(let_chains)] pub mod error; +#[allow(unused)] +pub mod kafka; mod noop; pub mod raft_engine; pub mod test_util; diff --git a/src/log-store/src/noop.rs b/src/log-store/src/noop.rs index 97b6f6ef66ea..1929e59a2365 100644 --- a/src/log-store/src/noop.rs +++ b/src/log-store/src/noop.rs @@ -12,9 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -use store_api::logstore::entry::{Entry, Id}; +use std::collections::HashMap; + +use common_config::wal::WalOptions; +use store_api::logstore::entry::{Entry, Id as EntryId}; use store_api::logstore::namespace::{Id as NamespaceId, Namespace}; -use store_api::logstore::{AppendResponse, LogStore}; +use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore}; use crate::error::{Error, Result}; @@ -42,7 +45,7 @@ impl Entry for EntryImpl { &[] } - fn id(&self) -> Id { + fn id(&self) -> EntryId { 0 } @@ -62,17 +65,22 @@ impl LogStore for NoopLogStore { } async fn append(&self, mut _e: Self::Entry) -> Result { - Ok(AppendResponse { entry_id: 0 }) + Ok(AppendResponse { + entry_id: 0, + offset: None, + }) } - async fn append_batch(&self, _e: Vec) -> Result<()> { - Ok(()) + async fn append_batch(&self, _e: Vec) -> Result { + Ok(AppendBatchResponse { + offsets: HashMap::new(), + }) } async fn read( &self, _ns: &Self::Namespace, - _id: Id, + _entry_id: EntryId, ) -> Result> { Ok(Box::pin(futures::stream::once(futures::future::ready(Ok( @@ -92,25 +100,31 @@ impl LogStore for NoopLogStore { Ok(vec![]) } - fn entry>(&self, data: D, id: Id, ns: Self::Namespace) -> Self::Entry { + fn entry>( + &self, + data: D, + entry_id: EntryId, + ns: Self::Namespace, + ) -> Self::Entry { let _ = data; - let _ = id; + let _ = entry_id; let _ = ns; EntryImpl } - fn namespace(&self, id: NamespaceId) -> Self::Namespace { - let _ = id; + fn namespace(&self, ns_id: NamespaceId, wal_options: &WalOptions) -> Self::Namespace { + let _ = ns_id; + let _ = wal_options; NamespaceImpl } async fn obsolete( &self, - namespace: Self::Namespace, - id: Id, + ns: Self::Namespace, + entry_id: EntryId, ) -> std::result::Result<(), Self::Error> { - let _ = namespace; - let _ = id; + let _ = ns; + let _ = entry_id; Ok(()) } } @@ -135,7 +149,7 @@ mod tests { store.create_namespace(&NamespaceImpl).await.unwrap(); assert_eq!(0, store.list_namespaces().await.unwrap().len()); store.delete_namespace(&NamespaceImpl).await.unwrap(); - assert_eq!(NamespaceImpl, store.namespace(0)); + assert_eq!(NamespaceImpl, store.namespace(0, &WalOptions::default())); store.obsolete(NamespaceImpl, 1).await.unwrap(); } } diff --git a/src/log-store/src/raft_engine.rs b/src/log-store/src/raft_engine.rs index 8e453c0d261c..fe761388f6dd 100644 --- a/src/log-store/src/raft_engine.rs +++ b/src/log-store/src/raft_engine.rs @@ -14,8 +14,8 @@ use std::hash::{Hash, Hasher}; -use store_api::logstore::entry::{Entry, Id}; -use store_api::logstore::namespace::Namespace; +use store_api::logstore::entry::{Entry, Id as EntryId}; +use store_api::logstore::namespace::{Id as NamespaceId, Namespace}; use crate::error::Error; use crate::raft_engine::protos::logstore::{EntryImpl, NamespaceImpl}; @@ -42,7 +42,7 @@ impl EntryImpl { } impl NamespaceImpl { - pub fn with_id(id: Id) -> Self { + pub fn with_id(id: NamespaceId) -> Self { Self { id, ..Default::default() @@ -60,7 +60,7 @@ impl Hash for NamespaceImpl { impl Eq for NamespaceImpl {} impl Namespace for NamespaceImpl { - fn id(&self) -> store_api::logstore::namespace::Id { + fn id(&self) -> NamespaceId { self.id } } @@ -73,7 +73,7 @@ impl Entry for EntryImpl { self.data.as_slice() } - fn id(&self) -> Id { + fn id(&self) -> EntryId { self.id } diff --git a/src/log-store/src/raft_engine/log_store.rs b/src/log-store/src/raft_engine/log_store.rs index 7f43a8b8c8e9..eb14bf0cf90a 100644 --- a/src/log-store/src/raft_engine/log_store.rs +++ b/src/log-store/src/raft_engine/log_store.rs @@ -16,15 +16,15 @@ use std::fmt::{Debug, Formatter}; use std::sync::Arc; use async_stream::stream; -use common_config::WalConfig; +use common_config::wal::{RaftEngineConfig, WalOptions}; use common_runtime::{RepeatedTask, TaskFunction}; use common_telemetry::{error, info}; use raft_engine::{Config, Engine, LogBatch, MessageExt, ReadableSize, RecoveryMode}; use snafu::{ensure, ResultExt}; -use store_api::logstore::entry::{Entry, Id}; +use store_api::logstore::entry::{Entry, Id as EntryId}; use store_api::logstore::entry_stream::SendableEntryStream; -use store_api::logstore::namespace::Namespace as NamespaceTrait; -use store_api::logstore::{AppendResponse, LogStore}; +use store_api::logstore::namespace::{Id as NamespaceId, Namespace as NamespaceTrait}; +use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore}; use crate::error; use crate::error::{ @@ -37,7 +37,7 @@ use crate::raft_engine::protos::logstore::{EntryImpl, NamespaceImpl as Namespace const NAMESPACE_PREFIX: &str = "$sys/"; pub struct RaftEngineLogStore { - config: WalConfig, + config: RaftEngineConfig, engine: Arc, gc_task: RepeatedTask, } @@ -72,7 +72,7 @@ impl TaskFunction for PurgeExpiredFilesFunction { } impl RaftEngineLogStore { - pub async fn try_new(dir: String, config: WalConfig) -> Result { + pub async fn try_new(dir: String, config: RaftEngineConfig) -> Result { let raft_engine_config = Config { dir, purge_threshold: ReadableSize(config.purge_threshold.0), @@ -153,7 +153,7 @@ impl LogStore for RaftEngineLogStore { self.gc_task.stop().await.context(StopGcTaskSnafu) } - /// Append an entry to logstore. Currently of existence of entry's namespace is not checked. + /// Appends an entry to logstore. Currently the existence of the entry's namespace is not checked. async fn append(&self, e: Self::Entry) -> Result { ensure!(self.started(), IllegalStateSnafu); let entry_id = e.id; @@ -178,15 +178,18 @@ impl LogStore for RaftEngineLogStore { .engine .write(&mut batch, self.config.sync_write) .context(RaftEngineSnafu)?; - Ok(AppendResponse { entry_id }) + Ok(AppendResponse { + entry_id, + offset: None, + }) } - /// Append a batch of entries to logstore. `RaftEngineLogStore` assures the atomicity of + /// Appends a batch of entries to logstore. `RaftEngineLogStore` assures the atomicity of /// batch append. - async fn append_batch(&self, entries: Vec) -> Result<()> { + async fn append_batch(&self, entries: Vec) -> Result { ensure!(self.started(), IllegalStateSnafu); if entries.is_empty() { - return Ok(()); + return Ok(AppendBatchResponse::default()); } let mut batch = LogBatch::with_capacity(entries.len()); @@ -203,7 +206,9 @@ impl LogStore for RaftEngineLogStore { .engine .write(&mut batch, self.config.sync_write) .context(RaftEngineSnafu)?; - Ok(()) + + // The user of raft-engine log store does not care about the response. + Ok(AppendBatchResponse::default()) } /// Create a stream of entries from logstore in the given namespace. The end of stream is @@ -211,18 +216,18 @@ impl LogStore for RaftEngineLogStore { async fn read( &self, ns: &Self::Namespace, - id: Id, + entry_id: EntryId, ) -> Result> { ensure!(self.started(), IllegalStateSnafu); let engine = self.engine.clone(); - let last_index = engine.last_index(ns.id).unwrap_or(0); - let mut start_index = id.max(engine.first_index(ns.id).unwrap_or(last_index + 1)); + let last_index = engine.last_index(ns.id()).unwrap_or(0); + let mut start_index = entry_id.max(engine.first_index(ns.id()).unwrap_or(last_index + 1)); info!( "Read logstore, namespace: {}, start: {}, span: {:?}", ns.id(), - id, + entry_id, self.span(ns) ); let max_batch_size = self.config.read_batch_size; @@ -322,31 +327,37 @@ impl LogStore for RaftEngineLogStore { Ok(namespaces) } - fn entry>(&self, data: D, id: Id, ns: Self::Namespace) -> Self::Entry { + fn entry>( + &self, + data: D, + entry_id: EntryId, + ns: Self::Namespace, + ) -> Self::Entry { EntryImpl { - id, + id: entry_id, data: data.as_ref().to_vec(), namespace_id: ns.id(), ..Default::default() } } - fn namespace(&self, id: store_api::logstore::namespace::Id) -> Self::Namespace { + fn namespace(&self, ns_id: NamespaceId, wal_options: &WalOptions) -> Self::Namespace { + let _ = wal_options; Namespace { - id, + id: ns_id, ..Default::default() } } - async fn obsolete(&self, namespace: Self::Namespace, id: Id) -> Result<()> { + async fn obsolete(&self, ns: Self::Namespace, entry_id: EntryId) -> Result<()> { ensure!(self.started(), IllegalStateSnafu); - let obsoleted = self.engine.compact_to(namespace.id(), id + 1); + let obsoleted = self.engine.compact_to(ns.id(), entry_id + 1); info!( "Namespace {} obsoleted {} entries, compacted index: {}, span: {:?}", - namespace.id(), + ns.id(), obsoleted, - id, - self.span(&namespace) + entry_id, + self.span(&ns) ); Ok(()) } @@ -386,7 +397,7 @@ mod tests { let dir = create_temp_dir("raft-engine-logstore-test"); let logstore = RaftEngineLogStore::try_new( dir.path().to_str().unwrap().to_string(), - WalConfig::default(), + RaftEngineConfig::default(), ) .await .unwrap(); @@ -399,7 +410,7 @@ mod tests { let dir = create_temp_dir("raft-engine-logstore-test"); let logstore = RaftEngineLogStore::try_new( dir.path().to_str().unwrap().to_string(), - WalConfig::default(), + RaftEngineConfig::default(), ) .await .unwrap(); @@ -425,7 +436,7 @@ mod tests { let dir = create_temp_dir("raft-engine-logstore-test"); let logstore = RaftEngineLogStore::try_new( dir.path().to_str().unwrap().to_string(), - WalConfig::default(), + RaftEngineConfig::default(), ) .await .unwrap(); @@ -466,7 +477,7 @@ mod tests { { let logstore = RaftEngineLogStore::try_new( dir.path().to_str().unwrap().to_string(), - WalConfig::default(), + RaftEngineConfig::default(), ) .await .unwrap(); @@ -486,7 +497,7 @@ mod tests { let logstore = RaftEngineLogStore::try_new( dir.path().to_str().unwrap().to_string(), - WalConfig::default(), + RaftEngineConfig::default(), ) .await .unwrap(); @@ -521,7 +532,7 @@ mod tests { let dir = create_temp_dir("raft-engine-logstore-test"); let path = dir.path().to_str().unwrap().to_string(); - let config = WalConfig { + let config = RaftEngineConfig { file_size: ReadableSize::mb(2), purge_threshold: ReadableSize::mb(4), purge_interval: Duration::from_secs(5), @@ -553,7 +564,7 @@ mod tests { let dir = create_temp_dir("raft-engine-logstore-test"); let path = dir.path().to_str().unwrap().to_string(); - let config = WalConfig { + let config = RaftEngineConfig { file_size: ReadableSize::mb(2), purge_threshold: ReadableSize::mb(4), purge_interval: Duration::from_secs(5), @@ -582,7 +593,7 @@ mod tests { let dir = create_temp_dir("logstore-append-batch-test"); let path = dir.path().to_str().unwrap().to_string(); - let config = WalConfig { + let config = RaftEngineConfig { file_size: ReadableSize::mb(2), purge_threshold: ReadableSize::mb(4), purge_interval: Duration::from_secs(5), @@ -613,7 +624,7 @@ mod tests { let dir = create_temp_dir("logstore-append-batch-test"); let path = dir.path().to_str().unwrap().to_string(); - let config = WalConfig { + let config = RaftEngineConfig { file_size: ReadableSize::mb(2), purge_threshold: ReadableSize::mb(4), purge_interval: Duration::from_secs(5), diff --git a/src/log-store/src/test_util/log_store_util.rs b/src/log-store/src/test_util/log_store_util.rs index 652c899c88e1..d6836c5c8c60 100644 --- a/src/log-store/src/test_util/log_store_util.rs +++ b/src/log-store/src/test_util/log_store_util.rs @@ -15,14 +15,14 @@ use std::path::Path; use common_base::readable_size::ReadableSize; -use common_config::WalConfig; +use common_config::wal::RaftEngineConfig; use crate::raft_engine::log_store::RaftEngineLogStore; /// Create a write log for the provided path, used for test. pub async fn create_tmp_local_file_log_store>(path: P) -> RaftEngineLogStore { let path = path.as_ref().display().to_string(); - let cfg = WalConfig { + let cfg = RaftEngineConfig { file_size: ReadableSize::kb(128), ..Default::default() }; diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 10de52cd1131..0ee17bb85861 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -20,6 +20,7 @@ bytes.workspace = true chrono.workspace = true common-base.workspace = true common-catalog.workspace = true +common-config.workspace = true common-datasource.workspace = true common-decimal.workspace = true common-error.workspace = true diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 083a42fb36c2..99e8348dc7ab 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -22,6 +22,7 @@ use std::collections::HashMap; use std::sync::atomic::{AtomicBool, AtomicI64, Ordering}; use std::sync::{Arc, RwLock}; +use common_config::wal::WalOptions; use common_telemetry::info; use common_time::util::current_time_millis; use snafu::{ensure, OptionExt}; @@ -74,6 +75,8 @@ pub(crate) struct MitoRegion { pub(crate) manifest_manager: RegionManifestManager, /// SST file purger. pub(crate) file_purger: FilePurgerRef, + /// Wal options of this region. + pub(crate) wal_options: WalOptions, /// Last flush time in millis. last_flush_millis: AtomicI64, /// Whether the region is writable. diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index ea387ef2c552..cf59c2b5b674 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -18,6 +18,7 @@ use std::collections::HashMap; use std::sync::atomic::{AtomicBool, AtomicI64}; use std::sync::Arc; +use common_config::wal::WalOptions; use common_telemetry::{debug, error, info, warn}; use common_time::util::current_time_millis; use futures::StreamExt; @@ -145,6 +146,8 @@ impl RegionOpener { } } let options = RegionOptions::try_from(&self.options)?; + let wal_options = options.wal_options.clone(); + let object_store = self.object_store(&options.storage)?.clone(); // Create a manifest manager for this region and writes regions to the manifest file. @@ -171,6 +174,7 @@ impl RegionOpener { access_layer, self.cache_manager, )), + wal_options, last_flush_millis: AtomicI64::new(current_time_millis()), // Region is writable after it is created. writable: AtomicBool::new(true), @@ -215,6 +219,8 @@ impl RegionOpener { wal: &Wal, ) -> Result> { let region_options = RegionOptions::try_from(&self.options)?; + let wal_options = region_options.wal_options.clone(); + let region_manifest_options = self.manifest_options(config, ®ion_options)?; let Some(manifest_manager) = RegionManifestManager::open(region_manifest_options).await? else { @@ -244,7 +250,14 @@ impl RegionOpener { let flushed_entry_id = version.flushed_entry_id; let version_control = Arc::new(VersionControl::new(version)); if !self.skip_wal_replay { - replay_memtable(wal, region_id, flushed_entry_id, &version_control).await?; + replay_memtable( + wal, + &wal_options, + region_id, + flushed_entry_id, + &version_control, + ) + .await?; } else { info!("Skip the WAL replay for region: {}", region_id); } @@ -255,6 +268,7 @@ impl RegionOpener { access_layer, manifest_manager, file_purger, + wal_options, last_flush_millis: AtomicI64::new(current_time_millis()), // Region is always opened in read only mode. writable: AtomicBool::new(false), @@ -346,6 +360,7 @@ pub(crate) fn check_recovered_region( /// Replays the mutations from WAL and inserts mutations to memtable of given region. async fn replay_memtable( wal: &Wal, + wal_options: &WalOptions, region_id: RegionId, flushed_entry_id: EntryId, version_control: &VersionControlRef, @@ -354,8 +369,8 @@ async fn replay_memtable( // Last entry id should start from flushed entry id since there might be no // data in the WAL. let mut last_entry_id = flushed_entry_id; - let mut region_write_ctx = RegionWriteCtx::new(region_id, version_control); - let mut wal_stream = wal.scan(region_id, flushed_entry_id)?; + let mut region_write_ctx = RegionWriteCtx::new(region_id, version_control, wal_options.clone()); + let mut wal_stream = wal.scan(region_id, flushed_entry_id, wal_options)?; while let Some(res) = wal_stream.next().await { let (entry_id, entry) = res?; last_entry_id = last_entry_id.max(entry_id); diff --git a/src/mito2/src/region/options.rs b/src/mito2/src/region/options.rs index 98f863168b08..c4415887f386 100644 --- a/src/mito2/src/region/options.rs +++ b/src/mito2/src/region/options.rs @@ -17,6 +17,8 @@ use std::collections::HashMap; use std::time::Duration; +use common_config::wal::WalOptions; +use common_config::WAL_OPTIONS_KEY; use serde::Deserialize; use serde_json::Value; use serde_with::{serde_as, with_prefix, DisplayFromStr}; @@ -37,6 +39,8 @@ pub struct RegionOptions { pub compaction: CompactionOptions, /// Custom storage. pub storage: Option, + /// Wal options. + pub wal_options: WalOptions, } impl TryFrom<&HashMap> for RegionOptions { @@ -53,10 +57,19 @@ impl TryFrom<&HashMap> for RegionOptions { serde_json::from_str(&json).context(JsonOptionsSnafu)?; let compaction: CompactionOptions = serde_json::from_str(&json).unwrap_or_default(); + // Tries to decode the wal options from the map or sets to the default if there's none wal options in the map. + let wal_options = options_map.get(WAL_OPTIONS_KEY).map_or_else( + || Ok(WalOptions::default()), + |encoded_wal_options| { + serde_json::from_str(encoded_wal_options).context(JsonOptionsSnafu) + }, + )?; + Ok(RegionOptions { ttl: options.ttl, compaction, storage: options.storage, + wal_options, }) } } @@ -161,6 +174,9 @@ fn options_map_to_value(options: &HashMap) -> Value { #[cfg(test)] mod tests { + use common_config::wal::KafkaWalOptions; + use common_config::WAL_OPTIONS_KEY; + use super::*; fn make_map(options: &[(&str, &str)]) -> HashMap { @@ -232,8 +248,34 @@ mod tests { assert_eq!(expect, options); } + fn test_with_wal_options(wal_options: &WalOptions) -> bool { + let encoded_wal_options = serde_json::to_string(&wal_options).unwrap(); + let map = make_map(&[(WAL_OPTIONS_KEY, &encoded_wal_options)]); + let got = RegionOptions::try_from(&map).unwrap(); + let expect = RegionOptions { + wal_options: wal_options.clone(), + ..Default::default() + }; + expect == got + } + + // No need to add compatible tests for RegionOptions since the above tests already check for compatibility. + #[test] + fn test_with_any_wal_options() { + let all_wal_options = vec![ + WalOptions::RaftEngine, + WalOptions::Kafka(KafkaWalOptions { + topic: "test_topic".to_string(), + }), + ]; + all_wal_options.iter().all(test_with_wal_options); + } + #[test] fn test_with_all() { + let wal_options = WalOptions::Kafka(KafkaWalOptions { + topic: "test_topic".to_string(), + }); let map = make_map(&[ ("ttl", "7d"), ("compaction.twcs.max_active_window_files", "8"), @@ -241,6 +283,10 @@ mod tests { ("compaction.twcs.time_window", "2h"), ("compaction.type", "twcs"), ("storage", "S3"), + ( + WAL_OPTIONS_KEY, + &serde_json::to_string(&wal_options).unwrap(), + ), ]); let options = RegionOptions::try_from(&map).unwrap(); let expect = RegionOptions { @@ -251,6 +297,7 @@ mod tests { time_window: Some(Duration::from_secs(3600 * 2)), }), storage: Some("s3".to_string()), + wal_options, }; assert_eq!(expect, options); } diff --git a/src/mito2/src/region_write_ctx.rs b/src/mito2/src/region_write_ctx.rs index 6756c83b64ea..8a6decefb4ac 100644 --- a/src/mito2/src/region_write_ctx.rs +++ b/src/mito2/src/region_write_ctx.rs @@ -16,6 +16,7 @@ use std::mem; use std::sync::Arc; use api::v1::{Mutation, OpType, Rows, WalEntry}; +use common_config::wal::WalOptions; use snafu::ResultExt; use store_api::logstore::LogStore; use store_api::storage::{RegionId, SequenceNumber}; @@ -86,6 +87,8 @@ pub(crate) struct RegionWriteCtx { /// We keep [WalEntry] instead of mutations to avoid taking mutations /// out of the context to construct the wal entry when we write to the wal. wal_entry: WalEntry, + /// Wal options of the region being written to. + wal_options: WalOptions, /// Notifiers to send write results to waiters. /// /// The i-th notify is for i-th mutation. @@ -102,7 +105,11 @@ pub(crate) struct RegionWriteCtx { impl RegionWriteCtx { /// Returns an empty context. - pub(crate) fn new(region_id: RegionId, version_control: &VersionControlRef) -> RegionWriteCtx { + pub(crate) fn new( + region_id: RegionId, + version_control: &VersionControlRef, + wal_options: WalOptions, + ) -> RegionWriteCtx { let VersionControlData { version, committed_sequence, @@ -117,6 +124,7 @@ impl RegionWriteCtx { next_sequence: committed_sequence + 1, next_entry_id: last_entry_id + 1, wal_entry: WalEntry::default(), + wal_options, notifiers: Vec::new(), failed: false, put_num: 0, @@ -153,7 +161,12 @@ impl RegionWriteCtx { &mut self, wal_writer: &mut WalWriter, ) -> Result<()> { - wal_writer.add_entry(self.region_id, self.next_entry_id, &self.wal_entry)?; + wal_writer.add_entry( + self.region_id, + self.next_entry_id, + &self.wal_entry, + &self.wal_options, + )?; // We only call this method one time, but we still bump next entry id for consistency. self.next_entry_id += 1; Ok(()) diff --git a/src/mito2/src/wal.rs b/src/mito2/src/wal.rs index 2171ceb7b1bd..3bbfefe96b93 100644 --- a/src/mito2/src/wal.rs +++ b/src/mito2/src/wal.rs @@ -14,11 +14,13 @@ //! Write ahead log of the engine. +use std::collections::HashMap; use std::mem; use std::sync::Arc; use api::v1::WalEntry; use async_stream::try_stream; +use common_config::wal::WalOptions; use common_error::ext::BoxedError; use futures::stream::BoxStream; use futures::StreamExt; @@ -60,13 +62,19 @@ impl Wal { store: self.store.clone(), entries: Vec::new(), entry_encode_buf: Vec::new(), + namespaces: HashMap::new(), } } /// Scan entries of specific region starting from `start_id` (inclusive). - pub fn scan(&self, region_id: RegionId, start_id: EntryId) -> Result { + pub fn scan<'a>( + &'a self, + region_id: RegionId, + start_id: EntryId, + wal_options: &'a WalOptions, + ) -> Result { let stream = try_stream!({ - let namespace = self.store.namespace(region_id.into()); + let namespace = self.store.namespace(region_id.into(), wal_options); let mut stream = self .store .read(&namespace, start_id) @@ -89,8 +97,13 @@ impl Wal { } /// Mark entries whose ids `<= last_id` as deleted. - pub async fn obsolete(&self, region_id: RegionId, last_id: EntryId) -> Result<()> { - let namespace = self.store.namespace(region_id.into()); + pub async fn obsolete( + &self, + region_id: RegionId, + last_id: EntryId, + wal_options: &WalOptions, + ) -> Result<()> { + let namespace = self.store.namespace(region_id.into(), wal_options); self.store .obsolete(namespace, last_id) .await @@ -117,6 +130,8 @@ pub struct WalWriter { entries: Vec, /// Buffer to encode WAL entry. entry_encode_buf: Vec, + /// Namespaces of regions being written into. + namespaces: HashMap, } impl WalWriter { @@ -126,8 +141,15 @@ impl WalWriter { region_id: RegionId, entry_id: EntryId, wal_entry: &WalEntry, + wal_options: &WalOptions, ) -> Result<()> { - let namespace = self.store.namespace(region_id.into()); + // Gets or inserts with a newly built namespace. + let namespace = self + .namespaces + .entry(region_id) + .or_insert_with(|| self.store.namespace(region_id.into(), wal_options)) + .clone(); + // Encode wal entry to log store entry. self.entry_encode_buf.clear(); wal_entry @@ -143,6 +165,7 @@ impl WalWriter { } /// Write all buffered entries to the WAL. + // TODO(niebayes): returns an `AppendBatchResponse` and handle it properly. pub async fn write_to_wal(&mut self) -> Result<()> { // TODO(yingwen): metrics. @@ -152,6 +175,7 @@ impl WalWriter { .await .map_err(BoxedError::new) .context(WriteWalSnafu) + .map(|_| ()) } } @@ -235,6 +259,7 @@ mod tests { async fn test_write_wal() { let env = WalEnv::new().await; let wal = env.new_wal(); + let wal_options = WalOptions::default(); let entry = WalEntry { mutations: vec![ @@ -244,11 +269,17 @@ mod tests { }; let mut writer = wal.writer(); // Region 1 entry 1. - writer.add_entry(RegionId::new(1, 1), 1, &entry).unwrap(); + writer + .add_entry(RegionId::new(1, 1), 1, &entry, &wal_options) + .unwrap(); // Region 2 entry 1. - writer.add_entry(RegionId::new(1, 2), 1, &entry).unwrap(); + writer + .add_entry(RegionId::new(1, 2), 1, &entry, &wal_options) + .unwrap(); // Region 1 entry 2. - writer.add_entry(RegionId::new(1, 1), 2, &entry).unwrap(); + writer + .add_entry(RegionId::new(1, 1), 2, &entry, &wal_options) + .unwrap(); // Test writing multiple region to wal. writer.write_to_wal().await.unwrap(); @@ -295,31 +326,32 @@ mod tests { async fn test_scan_wal() { let env = WalEnv::new().await; let wal = env.new_wal(); + let wal_options = WalOptions::default(); let entries = sample_entries(); let (id1, id2) = (RegionId::new(1, 1), RegionId::new(1, 2)); let mut writer = wal.writer(); - writer.add_entry(id1, 1, &entries[0]).unwrap(); + writer.add_entry(id1, 1, &entries[0], &wal_options).unwrap(); // Insert one entry into region2. Scan should not return this entry. - writer.add_entry(id2, 1, &entries[0]).unwrap(); - writer.add_entry(id1, 2, &entries[1]).unwrap(); - writer.add_entry(id1, 3, &entries[2]).unwrap(); - writer.add_entry(id1, 4, &entries[3]).unwrap(); + writer.add_entry(id2, 1, &entries[0], &wal_options).unwrap(); + writer.add_entry(id1, 2, &entries[1], &wal_options).unwrap(); + writer.add_entry(id1, 3, &entries[2], &wal_options).unwrap(); + writer.add_entry(id1, 4, &entries[3], &wal_options).unwrap(); writer.write_to_wal().await.unwrap(); // Scan all contents region1 - let stream = wal.scan(id1, 1).unwrap(); + let stream = wal.scan(id1, 1, &wal_options).unwrap(); let actual: Vec<_> = stream.try_collect().await.unwrap(); check_entries(&entries, 1, &actual); // Scan parts of contents - let stream = wal.scan(id1, 2).unwrap(); + let stream = wal.scan(id1, 2, &wal_options).unwrap(); let actual: Vec<_> = stream.try_collect().await.unwrap(); check_entries(&entries[1..], 2, &actual); // Scan out of range - let stream = wal.scan(id1, 5).unwrap(); + let stream = wal.scan(id1, 5, &wal_options).unwrap(); let actual: Vec<_> = stream.try_collect().await.unwrap(); assert!(actual.is_empty()); } @@ -328,26 +360,35 @@ mod tests { async fn test_obsolete_wal() { let env = WalEnv::new().await; let wal = env.new_wal(); + let wal_options = WalOptions::default(); let entries = sample_entries(); let mut writer = wal.writer(); let region_id = RegionId::new(1, 1); - writer.add_entry(region_id, 1, &entries[0]).unwrap(); - writer.add_entry(region_id, 2, &entries[1]).unwrap(); - writer.add_entry(region_id, 3, &entries[2]).unwrap(); + writer + .add_entry(region_id, 1, &entries[0], &wal_options) + .unwrap(); + writer + .add_entry(region_id, 2, &entries[1], &wal_options) + .unwrap(); + writer + .add_entry(region_id, 3, &entries[2], &wal_options) + .unwrap(); writer.write_to_wal().await.unwrap(); // Delete 1, 2. - wal.obsolete(region_id, 2).await.unwrap(); + wal.obsolete(region_id, 2, &wal_options).await.unwrap(); // Put 4. let mut writer = wal.writer(); - writer.add_entry(region_id, 4, &entries[3]).unwrap(); + writer + .add_entry(region_id, 4, &entries[3], &wal_options) + .unwrap(); writer.write_to_wal().await.unwrap(); // Scan all - let stream = wal.scan(region_id, 1).unwrap(); + let stream = wal.scan(region_id, 1, &wal_options).unwrap(); let actual: Vec<_> = stream.try_collect().await.unwrap(); check_entries(&entries[2..], 3, &actual); } diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index 8585adf31fae..043cb60cc9d3 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -201,7 +201,11 @@ impl RegionWorkerLoop { "Region {} flush finished, tries to bump wal to {}", region_id, request.flushed_entry_id ); - if let Err(e) = self.wal.obsolete(region_id, request.flushed_entry_id).await { + if let Err(e) = self + .wal + .obsolete(region_id, request.flushed_entry_id, ®ion.wal_options) + .await + { error!(e; "Failed to write wal, region: {}", region_id); request.on_failure(e); return; diff --git a/src/mito2/src/worker/handle_truncate.rs b/src/mito2/src/worker/handle_truncate.rs index ecb66817b30c..811a6f2c9993 100644 --- a/src/mito2/src/worker/handle_truncate.rs +++ b/src/mito2/src/worker/handle_truncate.rs @@ -59,7 +59,9 @@ impl RegionWorkerLoop { ); // Make all data obsolete. - self.wal.obsolete(region_id, truncated_entry_id).await?; + self.wal + .obsolete(region_id, truncated_entry_id, ®ion.wal_options) + .await?; info!( "Complete truncating region: {}, entry id: {} and sequence: {}.", region_id, truncated_entry_id, truncated_sequence diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index a04596a9ef51..97a481d7d4dc 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -133,7 +133,11 @@ impl RegionWorkerLoop { continue; }; - let region_ctx = RegionWriteCtx::new(region.region_id, ®ion.version_control); + let region_ctx = RegionWriteCtx::new( + region.region_id, + ®ion.version_control, + region.wal_options.clone(), + ); e.insert(region_ctx); } diff --git a/src/store-api/Cargo.toml b/src/store-api/Cargo.toml index 3e6b92c95f17..293302dcfde5 100644 --- a/src/store-api/Cargo.toml +++ b/src/store-api/Cargo.toml @@ -10,6 +10,7 @@ aquamarine.workspace = true async-trait.workspace = true bytes.workspace = true common-base.workspace = true +common-config.workspace = true common-error.workspace = true common-macro.workspace = true common-query.workspace = true diff --git a/src/store-api/src/logstore.rs b/src/store-api/src/logstore.rs index 975715074ccc..3fb81d9a624c 100644 --- a/src/store-api/src/logstore.rs +++ b/src/store-api/src/logstore.rs @@ -14,11 +14,14 @@ //! LogStore APIs. +use std::collections::HashMap; + +use common_config::wal::WalOptions; use common_error::ext::ErrorExt; -use crate::logstore::entry::{Entry, Id}; +use crate::logstore::entry::{Entry, Id as EntryId, Offset as EntryOffset}; use crate::logstore::entry_stream::SendableEntryStream; -use crate::logstore::namespace::Namespace; +use crate::logstore::namespace::{Id as NamespaceId, Namespace}; pub mod entry; pub mod entry_stream; @@ -36,17 +39,21 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug { /// Append an `Entry` to WAL with given namespace and return append response containing /// the entry id. - async fn append(&self, e: Self::Entry) -> Result; + async fn append(&self, entry: Self::Entry) -> Result; - /// Append a batch of entries atomically and return the offset of first entry. - async fn append_batch(&self, e: Vec) -> Result<(), Self::Error>; + /// Append a batch of entries and return an append batch response containing the start entry ids of + /// log entries written to each region. + async fn append_batch( + &self, + entries: Vec, + ) -> Result; /// Create a new `EntryStream` to asynchronously generates `Entry` with ids /// starting from `id`. async fn read( &self, ns: &Self::Namespace, - id: Id, + id: EntryId, ) -> Result, Self::Error>; /// Create a new `Namespace`. @@ -59,19 +66,33 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug { async fn list_namespaces(&self) -> Result, Self::Error>; /// Create an entry of the associate Entry type - fn entry>(&self, data: D, id: Id, ns: Self::Namespace) -> Self::Entry; + fn entry>(&self, data: D, entry_id: EntryId, ns: Self::Namespace) + -> Self::Entry; /// Create a namespace of the associate Namespace type // TODO(sunng87): confusion with `create_namespace` - fn namespace(&self, id: namespace::Id) -> Self::Namespace; + fn namespace(&self, ns_id: NamespaceId, wal_options: &WalOptions) -> Self::Namespace; /// Mark all entry ids `<=id` of given `namespace` as obsolete so that logstore can safely delete /// the log files if all entries inside are obsolete. This method may not delete log /// files immediately. - async fn obsolete(&self, namespace: Self::Namespace, id: Id) -> Result<(), Self::Error>; + async fn obsolete(&self, ns: Self::Namespace, entry_id: EntryId) -> Result<(), Self::Error>; } +/// The response of an `append` operation. #[derive(Debug)] pub struct AppendResponse { - pub entry_id: Id, + /// The entry id of the appended log entry. + pub entry_id: EntryId, + /// The start entry offset of the appended log entry. + /// Depends on the `LogStore` implementation, the entry offset may be missing. + pub offset: Option, +} + +/// The response of an `append_batch` operation. +#[derive(Debug, Default)] +pub struct AppendBatchResponse { + /// Key: region id (as u64). Value: the known minimum start offset of the appended log entries belonging to the region. + /// Depends on the `LogStore` implementation, the entry offsets may be missing. + pub offsets: HashMap, } diff --git a/src/store-api/src/logstore/entry.rs b/src/store-api/src/logstore/entry.rs index 68077efd0df3..cb2538086e6d 100644 --- a/src/store-api/src/logstore/entry.rs +++ b/src/store-api/src/logstore/entry.rs @@ -16,9 +16,10 @@ use common_error::ext::ErrorExt; use crate::logstore::namespace::Namespace; -pub type Offset = usize; -pub type Epoch = u64; +/// An entry's logical id, allocated by log store users. pub type Id = u64; +/// An entry's physical offset in the underlying log store. +pub type Offset = usize; /// Entry is the minimal data storage unit in `LogStore`. pub trait Entry: Send + Sync { diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index c26e840f20c0..c35070bf5a43 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -773,6 +773,7 @@ timeout = "30s" body_limit = "64MiB" [datanode.wal] +provider = "raft_engine" file_size = "256MiB" purge_threshold = "4GiB" purge_interval = "10m"