
feat(remote_wal): impl kafka log store #2971

Merged: 25 commits, Dec 25, 2023
Changes from 23 commits
Commits
25 commits
594db45
feat: introduce client manager
niebayes Dec 21, 2023
ea3a077
chore: add errors for client manager
niebayes Dec 21, 2023
071b974
chore: add record utils
niebayes Dec 21, 2023
3b779b9
chore: impl kafka log store
niebayes Dec 21, 2023
eb597ce
chore: build kafka log store upon starting datanode
niebayes Dec 21, 2023
eed0def
chore: update comments for kafka log store
niebayes Dec 21, 2023
047941b
chore: add a todo for getting entry offset
niebayes Dec 21, 2023
27181d6
fix: typo
niebayes Dec 21, 2023
2260cf9
chore: remove unused
niebayes Dec 21, 2023
66f7d9c
chore: update comments
niebayes Dec 21, 2023
45ac518
fix: typo
niebayes Dec 21, 2023
8a5dab8
fix: resolve some review conversations
niebayes Dec 21, 2023
7241994
Merge branch 'develop' into feat/impl_kafka_log_store
niebayes Dec 22, 2023
bc7938d
chore: move commonly referenced crates to workspace Cargo.toml
niebayes Dec 22, 2023
d19178d
fix: style
niebayes Dec 22, 2023
f8307a3
Merge branch 'develop' into feat/impl_kafka_log_store
niebayes Dec 22, 2023
ffa4c4a
fix: style
niebayes Dec 22, 2023
4bc1cc1
chore: unify topic name prefix
niebayes Dec 22, 2023
94c53a4
chore: make backoff config configurable by users
niebayes Dec 22, 2023
ba21a0e
chore: properly use backoff config in wal config
niebayes Dec 22, 2023
a718a9a
refactor: read/write of kafka log store
niebayes Dec 23, 2023
ed4d2cf
fix: typo
niebayes Dec 23, 2023
62a8991
fix: typo
niebayes Dec 23, 2023
af419f0
fix: resolve conflicts and review conversations
niebayes Dec 25, 2023
0f4ef3e
fix: resolve review conversations
niebayes Dec 25, 2023
17 changes: 7 additions & 10 deletions config/datanode.example.toml
@@ -41,29 +41,26 @@ tcp_nodelay = true
# WAL data directory
provider = "raft_engine"

# Raft-engine wal options, see `standalone.example.toml`
# Raft-engine wal options, see `standalone.example.toml`.
# dir = "/tmp/greptimedb/wal"
file_size = "256MB"
purge_threshold = "4GB"
purge_interval = "10m"
read_batch_size = 128
sync_write = false

# Kafka wal options.
# The broker endpoints of the Kafka cluster. ["127.0.0.1:9090"] by default.
# Kafka wal options, see `standalone.example.toml`.
# broker_endpoints = ["127.0.0.1:9090"]
# Number of topics shall be created beforehand.
# num_topics = 64
# Topic name prefix.
# topic_name_prefix = "greptimedb_wal_kafka_topic"
# Number of partitions per topic.
# topic_name_prefix = "greptimedb_wal_topic"
# num_partitions = 1
# The maximum log size an rskafka batch producer could buffer.
# max_batch_size = "4MB"
# The linger duration of an rskafka batch producer.
# linger = "200ms"
# The maximum amount of time (in milliseconds) to wait for Kafka records to be returned.
# max_wait_time = "100ms"
# backoff_init = "500ms"
# backoff_max = "10s"
# backoff_base = 2.0
# backoff_deadline = "5mins"

# Storage options, see `standalone.example.toml`.
[storage]
2 changes: 1 addition & 1 deletion config/metasrv.example.toml
@@ -61,7 +61,7 @@ provider = "raft_engine"
# - "round_robin" (default)
# selector_type = "round_robin"
# A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
# topic_name_prefix = "greptimedb_wal_kafka"
# topic_name_prefix = "greptimedb_wal_topic"
# Number of partitions per topic.
# num_partitions = 1
# Expected number of replicas of each partition.
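The metasrv comment above says a topic name is `topic_name_prefix` concatenated with a `topic_id`. A minimal sketch of that naming rule (illustrative only — the underscore separator and the integer id type are assumptions, not taken from this diff):

```rust
// Hypothetical helper illustrating the naming rule described above;
// the separator and id type are assumptions.
fn topic_name(topic_name_prefix: &str, topic_id: u64) -> String {
    format!("{topic_name_prefix}_{topic_id}")
}

fn main() {
    // Using the default prefix from the config above.
    assert_eq!(topic_name("greptimedb_wal_topic", 42), "greptimedb_wal_topic_42");
}
```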
24 changes: 23 additions & 1 deletion config/standalone.example.toml
@@ -87,7 +87,29 @@ enable = true
# - "Kafka"
provider = "raft_engine"

# There's no kafka wal config for standalone mode.
# Kafka wal options.
# The broker endpoints of the Kafka cluster. ["127.0.0.1:9090"] by default.
# broker_endpoints = ["127.0.0.1:9090"]
# Number of topics to be created beforehand.
# num_topics = 64
# Topic name prefix.
# topic_name_prefix = "greptimedb_wal_topic"
# Number of partitions per topic.
# num_partitions = 1
# The maximum log size a kafka batch producer could buffer.
# max_batch_size = "4MB"
# The linger duration of a kafka batch producer.
# linger = "200ms"
# The maximum amount of time (in milliseconds) to wait for Kafka records to be returned.
# max_wait_time = "100ms"
# The initial backoff for kafka clients.
# backoff_init = "500ms"
# The maximum backoff for kafka clients.
# backoff_max = "10s"
# Exponential backoff rate, i.e. next backoff = base * current backoff.
# backoff_base = 2.0
# Stop reconnecting if the total wait time reaches the deadline. If this config is missing, reconnecting won't terminate.
# backoff_deadline = "5mins"

# WAL data directory
# dir = "/tmp/greptimedb/wal"
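To make the backoff comments above concrete: the client starts at `backoff_init`, multiplies the wait by `backoff_base` after each failed attempt, caps it at `backoff_max`, and stops retrying once the total wait reaches `backoff_deadline`. A self-contained sketch of that schedule (illustrative only; the real retry behavior lives in rskafka's BackoffConfig and may differ in details such as jitter):

```rust
use std::time::Duration;

// Compute the sequence of waits implied by the backoff options above.
fn backoff_schedule(init: Duration, max: Duration, base: u32, deadline: Duration) -> Vec<Duration> {
    let mut waits = Vec::new();
    let mut current = init;
    let mut total = Duration::ZERO;
    while total + current <= deadline {
        waits.push(current);
        total += current;
        // next backoff = base * current backoff, capped at `backoff_max`.
        current = (current * base).min(max);
    }
    waits
}

fn main() {
    // Defaults from the config above: 500ms init, 10s max, base 2, 5 min deadline.
    let schedule = backoff_schedule(
        Duration::from_millis(500),
        Duration::from_secs(10),
        2,
        Duration::from_secs(5 * 60),
    );
    println!("retries before giving up: {}, first waits: {:?}", schedule.len(), &schedule[..4]);
}
```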
12 changes: 10 additions & 2 deletions src/common/config/src/wal.rs
@@ -71,22 +71,30 @@ mod tests {
let toml_str = r#"
broker_endpoints = ["127.0.0.1:9090"]
num_topics = 32
topic_name_prefix = "greptimedb_wal_kafka_topic"
topic_name_prefix = "greptimedb_wal_topic"
num_partitions = 1
max_batch_size = "4MB"
linger = "200ms"
max_wait_time = "100ms"
backoff_init = "500ms"
backoff_max = "10s"
backoff_base = 2
backoff_deadline = "5mins"
"#;
let decoded: KafkaConfig = toml::from_str(toml_str).unwrap();
let expected = KafkaConfig {
broker_endpoints: vec!["127.0.0.1:9090".to_string()],
num_topics: 32,
topic_name_prefix: "greptimedb_wal_kafka_topic".to_string(),
topic_name_prefix: "greptimedb_wal_topic".to_string(),
num_partitions: 1,
compression: RsKafkaCompression::default(),
max_batch_size: ReadableSize::mb(4),
linger: Duration::from_millis(200),
max_wait_time: Duration::from_millis(100),
backoff_init: Duration::from_millis(500),
backoff_max: Duration::from_secs(10),
backoff_base: 2,
backoff_deadline: Some(Duration::from_secs(60 * 5)),
};
assert_eq!(decoded, expected);
}
23 changes: 20 additions & 3 deletions src/common/config/src/wal/kafka.rs
@@ -19,7 +19,7 @@ use rskafka::client::partition::Compression as RsKafkaCompression;
use serde::{Deserialize, Serialize};

/// Topic name prefix.
pub const TOPIC_NAME_PREFIX: &str = "greptimedb_wal_kafka_topic";
pub const TOPIC_NAME_PREFIX: &str = "greptimedb_wal_topic";
/// Kafka wal topic.
pub type Topic = String;

@@ -39,14 +39,27 @@ pub struct KafkaConfig {
#[serde(skip)]
#[serde(default)]
pub compression: RsKafkaCompression,
/// The maximum log size an rskakfa batch producer could buffer.
/// The maximum log size a kafka batch producer could buffer.
pub max_batch_size: ReadableSize,
/// The linger duration of an rskafka batch producer.
/// The linger duration of a kafka batch producer.
#[serde(with = "humantime_serde")]
pub linger: Duration,
/// The maximum amount of time (in milliseconds) to wait for Kafka records to be returned.
#[serde(with = "humantime_serde")]
pub max_wait_time: Duration,
/// The initial backoff for kafka clients.
#[serde(with = "humantime_serde")]
pub backoff_init: Duration,
/// The maximum backoff for kafka clients.
#[serde(with = "humantime_serde")]
pub backoff_max: Duration,
/// Exponential backoff rate, i.e. next backoff = base * current backoff.
// Set to u32 since some structs containing the KafkaConfig need to derive the Eq trait.
pub backoff_base: u32,
/// Stop reconnecting if the total wait time reaches the deadline.
/// If it's None, the reconnecting won't terminate.
#[serde(with = "humantime_serde")]
pub backoff_deadline: Option<Duration>,
}

impl Default for KafkaConfig {
@@ -60,6 +73,10 @@
max_batch_size: ReadableSize::mb(4),
linger: Duration::from_millis(200),
max_wait_time: Duration::from_millis(100),
backoff_init: Duration::from_millis(500),
backoff_max: Duration::from_secs(10),
backoff_base: 2,
backoff_deadline: Some(Duration::from_secs(60 * 5)), // 5 mins
}
}
}
8 changes: 4 additions & 4 deletions src/common/meta/src/wal.rs
@@ -69,27 +69,27 @@ mod tests {
broker_endpoints = ["127.0.0.1:9090"]
num_topics = 32
selector_type = "round_robin"
topic_name_prefix = "greptimedb_wal_kafka"
topic_name_prefix = "greptimedb_wal_topic"
num_partitions = 1
replication_factor = 3
create_topic_timeout = "30s"
backoff_init = "500ms"
backoff_max = "10s"
backoff_base = 2.0
backoff_base = 2
backoff_deadline = "5mins"
"#;
let wal_config: WalConfig = toml::from_str(toml_str).unwrap();
let expected_kafka_config = KafkaConfig {
broker_endpoints: vec!["127.0.0.1:9090".to_string()],
num_topics: 32,
selector_type: KafkaTopicSelectorType::RoundRobin,
topic_name_prefix: "greptimedb_wal_kafka".to_string(),
topic_name_prefix: "greptimedb_wal_topic".to_string(),
num_partitions: 1,
replication_factor: 3,
create_topic_timeout: Duration::from_secs(30),
backoff_init: Duration::from_millis(500),
backoff_max: Duration::from_secs(10),
backoff_base: 2.0,
backoff_base: 2,
backoff_deadline: Some(Duration::from_secs(60 * 5)),
};
assert_eq!(wal_config, WalConfig::Kafka(expected_kafka_config));
8 changes: 5 additions & 3 deletions src/common/meta/src/wal/kafka.rs
@@ -49,7 +49,9 @@ pub struct KafkaConfig {
#[serde(with = "humantime_serde")]
pub backoff_max: Duration,
/// Exponential backoff rate, i.e. next backoff = base * current backoff.
pub backoff_base: f64,
// Set to u32 since the `backoff_base` field in the datanode's KafkaConfig is of type u32,
// and we want to unify their types.
pub backoff_base: u32,
/// Stop reconnecting if the total wait time reaches the deadline.
/// If it's None, the reconnecting won't terminate.
#[serde(with = "humantime_serde")]
@@ -62,13 +64,13 @@ impl Default for KafkaConfig {
broker_endpoints: vec!["127.0.0.1:9090".to_string()],
num_topics: 64,
selector_type: TopicSelectorType::RoundRobin,
topic_name_prefix: "greptimedb_wal_kafka".to_string(),
topic_name_prefix: "greptimedb_wal_topic".to_string(),
num_partitions: 1,
replication_factor: 3,
create_topic_timeout: Duration::from_secs(30),
backoff_init: Duration::from_millis(500),
backoff_max: Duration::from_secs(10),
backoff_base: 2.0,
backoff_base: 2,
backoff_deadline: Some(Duration::from_secs(60 * 5)), // 5 mins
}
}
4 changes: 2 additions & 2 deletions src/common/meta/src/wal/kafka/topic_manager.rs
@@ -105,7 +105,7 @@ impl TopicManager {
let backoff_config = BackoffConfig {
init_backoff: self.config.backoff_init,
max_backoff: self.config.backoff_max,
base: self.config.backoff_base,
base: self.config.backoff_base as f64,
deadline: self.config.backoff_deadline,
};
let client = ClientBuilder::new(self.config.broker_endpoints.clone())
@@ -181,7 +181,7 @@ mod tests {
#[tokio::test]
async fn test_restore_persisted_topics() {
let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
let topic_name_prefix = "greptimedb_wal_kafka";
let topic_name_prefix = "greptimedb_wal_topic";
let num_topics = 16;

// Constructs mock topics.
7 changes: 5 additions & 2 deletions src/datanode/src/datanode.rs
@@ -504,8 +504,11 @@ impl DatanodeBuilder {

/// Builds [KafkaLogStore].
async fn build_kafka_log_store(config: &KafkaConfig) -> Result<Arc<KafkaLogStore>> {
let _ = config;
todo!()
KafkaLogStore::try_new(config)
.await
.map_err(Box::new)
.context(OpenLogStoreSnafu)
.map(Arc::new)
}

/// Builds [ObjectStoreManager]
85 changes: 85 additions & 0 deletions src/log-store/src/error.rs
@@ -14,6 +14,7 @@

use std::any::Any;

use common_config::wal::KafkaWalTopic;
use common_error::ext::ErrorExt;
use common_macro::stack_trace_debug;
use common_runtime::error::Error as RuntimeError;
@@ -84,6 +85,90 @@ pub enum Error {
attempt_index: u64,
location: Location,
},

#[snafu(display(
"Failed to build a kafka client, broker endpoints: {:?}",
broker_endpoints
))]
BuildClient {
broker_endpoints: Vec<String>,
location: Location,
#[snafu(source)]
error: rskafka::client::error::Error,
},

#[snafu(display(
"Failed to build a kafka partition client, topic: {}, partition: {}",
topic,
partition
))]
BuildPartitionClient {
topic: String,
partition: i32,
location: Location,
#[snafu(source)]
error: rskafka::client::error::Error,
},

#[snafu(display(
"Failed to get a Kafka topic client, topic: {}, source: {}",
topic,
error
))]
GetClient {
topic: KafkaWalTopic,
location: Location,
error: String,
},

#[snafu(display("Failed to encode a record meta"))]
EncodeMeta {
location: Location,
#[snafu(source)]
error: serde_json::Error,
},

#[snafu(display("Failed to decode a record meta"))]
DecodeMeta {
location: Location,
#[snafu(source)]
error: serde_json::Error,
},

#[snafu(display("Missing required key in a record"))]
MissingKey { location: Location },

#[snafu(display("Missing required value in a record"))]
MissingValue { location: Location },

#[snafu(display("Cannot build a record from empty entries"))]
EmptyEntries { location: Location },

#[snafu(display("Failed to produce records to Kafka, topic: {}", topic))]
ProduceRecord {
topic: KafkaWalTopic,
location: Location,
#[snafu(source)]
error: rskafka::client::producer::Error,
},

#[snafu(display(
"Failed to read a record from Kafka, topic: {}, region_id: {}, offset: {}",
topic,
region_id,
offset,
))]
ConsumeRecord {
topic: String,
region_id: u64,
offset: i64,
location: Location,
#[snafu(source)]
error: rskafka::client::error::Error,
},

#[snafu(display("Failed to do a cast"))]
Cast { location: Location },
}

impl ErrorExt for Error {
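For readers unfamiliar with snafu, here is a minimal self-contained sketch of the pattern the new variants above follow: the `#[snafu(source)]` field captures the underlying error, and `.context(...)` fills in the remaining fields at the call site. The function and input below are illustrative, not the PR's actual call sites (which also record a `Location`):

```rust
use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Failed to decode a record meta"))]
    DecodeMeta {
        // Captured automatically from the failing serde_json call.
        #[snafu(source)]
        error: serde_json::Error,
    },
}

// `.context(DecodeMetaSnafu)` converts a serde_json error into `Error::DecodeMeta`.
fn decode_meta(raw: &[u8]) -> Result<serde_json::Value, Error> {
    serde_json::from_slice(raw).context(DecodeMetaSnafu)
}

fn main() {
    assert!(decode_meta(br#"{"region_id": 1}"#).is_ok());
    assert!(decode_meta(b"not json").is_err());
}
```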