Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(remote_wal): introduce kafka remote wal #3001

Merged
merged 20 commits into from
Dec 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ sync_write = false
# produce_record_timeout = "100ms"
# backoff_init = "500ms"
# backoff_max = "10s"
# backoff_base = 2.0
# backoff_base = 2
# backoff_deadline = "5mins"

# Storage options, see `standalone.example.toml`.
Expand Down
4 changes: 2 additions & 2 deletions config/metasrv.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,15 @@ provider = "raft_engine"
# Number of partitions per topic.
# num_partitions = 1
# Expected number of replicas of each partition.
# replication_factor = 3
# replication_factor = 1
# The timeout beyond which a topic creation operation will be cancelled.
# create_topic_timeout = "30s"
# The initial backoff for kafka clients.
# backoff_init = "500ms"
# The maximum backoff for kafka clients.
# backoff_max = "10s"
# Exponential backoff rate, i.e. next backoff = base * current backoff.
# backoff_base = 2.0
# backoff_base = 2
# Stop reconnecting if the total wait time reaches the deadline. If this config is missing, reconnection attempts won't terminate.
# backoff_deadline = "5mins"

Expand Down
4 changes: 2 additions & 2 deletions config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ provider = "raft_engine"
# Number of partitions per topic.
# num_partitions = 1
# Expected number of replicas of each partition.
# replication_factor = 3
# replication_factor = 1

# The maximum log size a kafka batch producer could buffer.
# max_batch_size = "4MB"
Expand All @@ -119,7 +119,7 @@ provider = "raft_engine"
# The maximum backoff for kafka clients.
# backoff_max = "10s"
# Exponential backoff rate, i.e. next backoff = base * current backoff.
# backoff_base = 2.0
# backoff_base = 2
# Stop reconnecting if the total wait time reaches the deadline. If this config is missing, reconnection attempts won't terminate.
# backoff_deadline = "5mins"

Expand Down
7 changes: 5 additions & 2 deletions src/common/config/src/wal/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,16 @@ pub struct StandaloneKafkaConfig {

impl Default for StandaloneKafkaConfig {
/// Builds the default standalone Kafka configuration.
///
/// The replication factor is derived from the number of default broker
/// endpoints so that every configured broker holds a replica by default.
fn default() -> Self {
    let base = KafkaConfig::default();

    Self {
        // Computed before `base` is moved into the struct literal;
        // field initializers evaluate in source order.
        replication_factor: base.broker_endpoints.len() as i16,
        base,
        num_topics: 64,
        selector_type: TopicSelectorType::RoundRobin,
        topic_name_prefix: "greptimedb_wal_topic".to_string(),
        num_partitions: 1,
        create_topic_timeout: Duration::from_secs(30),
    }
}
Expand Down
4 changes: 2 additions & 2 deletions src/common/meta/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ mod tests {
selector_type = "round_robin"
topic_name_prefix = "greptimedb_wal_topic"
num_partitions = 1
replication_factor = 3
replication_factor = 1
create_topic_timeout = "30s"
backoff_init = "500ms"
backoff_max = "10s"
Expand All @@ -103,7 +103,7 @@ mod tests {
selector_type: TopicSelectorType::RoundRobin,
topic_name_prefix: "greptimedb_wal_topic".to_string(),
num_partitions: 1,
replication_factor: 3,
replication_factor: 1,
create_topic_timeout: Duration::from_secs(30),
backoff: KafkaBackoffConfig {
init: Duration::from_millis(500),
Expand Down
7 changes: 5 additions & 2 deletions src/common/meta/src/wal/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,16 @@ pub struct KafkaConfig {

impl Default for KafkaConfig {
fn default() -> Self {
let broker_endpoints = vec!["127.0.0.1:9092".to_string()];
let replication_factor = broker_endpoints.len() as i16;

Self {
broker_endpoints: vec!["127.0.0.1:9092".to_string()],
broker_endpoints,
num_topics: 64,
selector_type: TopicSelectorType::RoundRobin,
topic_name_prefix: "greptimedb_wal_topic".to_string(),
num_partitions: 1,
replication_factor: 3,
replication_factor,
create_topic_timeout: Duration::from_secs(30),
backoff: KafkaBackoffConfig::default(),
}
Expand Down
63 changes: 45 additions & 18 deletions src/common/meta/src/wal/kafka/topic_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ use std::sync::Arc;
use std::time::Duration;

use common_config::wal::kafka::TopicSelectorType;
use common_telemetry::debug;
use common_telemetry::{debug, error, info};
use rskafka::client::controller::ControllerClient;
use rskafka::client::error::Error as RsKafkaError;
use rskafka::client::error::ProtocolError::TopicAlreadyExists;
use rskafka::client::ClientBuilder;
use rskafka::BackoffConfig;
use snafu::{ensure, ResultExt};
use snafu::{ensure, AsErrorSource, ResultExt};

use crate::error::{
BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, CreateKafkaWalTopicSnafu, DecodeJsonSnafu,
Expand Down Expand Up @@ -79,7 +82,6 @@ impl TopicManager {
.await?
.into_iter()
.collect::<HashSet<Topic>>();
debug!("Restored {} topics", created_topics.len());

// Creates missing topics.
let to_be_created = topics
Expand All @@ -92,10 +94,10 @@ impl TopicManager {
Some(i)
})
.collect::<Vec<_>>();

if !to_be_created.is_empty() {
self.try_create_topics(topics, &to_be_created).await?;
Self::persist_created_topics(topics, &self.kv_backend).await?;
debug!("Persisted {} topics", topics.len());
}
Ok(())
}
Expand All @@ -119,23 +121,12 @@ impl TopicManager {
.controller_client()
.context(BuildKafkaCtrlClientSnafu)?;

// Spawns tokio tasks for creating missing topics.
// Try to create missing topics.
let tasks = to_be_created
.iter()
.map(|i| {
client.create_topic(
topics[*i].clone(),
self.config.num_partitions,
self.config.replication_factor,
self.config.create_topic_timeout.as_millis() as i32,
)
})
.map(|i| self.try_create_topic(&topics[*i], &client))
.collect::<Vec<_>>();
// FIXME(niebayes): try to create an already-exist topic would raise an error.
futures::future::try_join_all(tasks)
.await
.context(CreateKafkaWalTopicSnafu)
.map(|_| ())
futures::future::try_join_all(tasks).await.map(|_| ())
}

/// Selects one topic from the topic pool through the topic selector.
Expand All @@ -150,6 +141,32 @@ impl TopicManager {
.collect()
}

/// Attempts to create `topic` on the Kafka cluster.
///
/// An already-existing topic is treated as success (another creator, or a
/// pre-provisioned cluster, may have created it first); any other broker
/// error is logged and propagated as `CreateKafkaWalTopic`.
async fn try_create_topic(&self, topic: &Topic, client: &ControllerClient) -> Result<()> {
    let creation = client
        .create_topic(
            topic.clone(),
            self.config.num_partitions,
            self.config.replication_factor,
            self.config.create_topic_timeout.as_millis() as i32,
        )
        .await;

    match creation {
        Ok(_) => {
            info!("Successfully created topic {}", topic);
            Ok(())
        }
        // Idempotent path: the topic being present already is fine.
        Err(e) if Self::is_topic_already_exist_err(&e) => {
            info!("The topic {} already exists", topic);
            Ok(())
        }
        Err(e) => {
            error!("Failed to create a topic {}, error {:?}", topic, e);
            Err(e).context(CreateKafkaWalTopicSnafu)
        }
    }
}

async fn restore_created_topics(kv_backend: &KvBackendRef) -> Result<Vec<Topic>> {
kv_backend
.get(CREATED_TOPICS_KEY.as_bytes())
Expand All @@ -171,6 +188,16 @@ impl TopicManager {
.await
.map(|_| ())
}

/// Returns `true` iff `e` is the broker's `TopicAlreadyExists` protocol error.
fn is_topic_already_exist_err(e: &RsKafkaError) -> bool {
    match e {
        RsKafkaError::ServerError {
            protocol_error: TopicAlreadyExists,
            ..
        } => true,
        _ => false,
    }
}
}

#[cfg(test)]
Expand Down
21 changes: 12 additions & 9 deletions src/log-store/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use common_macro::stack_trace_debug;
use common_runtime::error::Error as RuntimeError;
use snafu::{Location, Snafu};

use crate::kafka::NamespaceImpl as KafkaNamespace;

#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
Expand Down Expand Up @@ -152,16 +154,17 @@ pub enum Error {
error: rskafka::client::producer::Error,
},

#[snafu(display(
"Failed to read a record from Kafka, topic: {}, region_id: {}, offset: {}",
topic,
region_id,
offset,
))]
#[snafu(display("Failed to read a record from Kafka, ns: {}", ns))]
ConsumeRecord {
topic: String,
region_id: u64,
offset: i64,
ns: KafkaNamespace,
location: Location,
#[snafu(source)]
error: rskafka::client::error::Error,
},

#[snafu(display("Failed to get the latest offset, ns: {}", ns))]
GetOffset {
ns: KafkaNamespace,
location: Location,
#[snafu(source)]
error: rskafka::client::error::Error,
Expand Down
20 changes: 20 additions & 0 deletions src/log-store/src/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ pub mod log_store;
mod offset;
mod record_utils;

use std::fmt::Display;

use common_meta::wal::KafkaWalTopic as Topic;
use serde::{Deserialize, Serialize};
use store_api::logstore::entry::{Entry, Id as EntryId};
Expand All @@ -37,6 +39,12 @@ impl Namespace for NamespaceImpl {
}
}

impl Display for NamespaceImpl {
    /// Renders the namespace as `<topic>/<region_id>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("{}/{}", self.topic, self.region_id))
    }
}

/// Kafka Entry implementation.
#[derive(Debug, PartialEq, Clone)]
pub struct EntryImpl {
Expand Down Expand Up @@ -64,3 +72,15 @@ impl Entry for EntryImpl {
self.ns.clone()
}
}

impl Display for EntryImpl {
    /// Renders the entry as `Entry (ns: .., id: .., data_len: ..)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!(
            "Entry (ns: {}, id: {}, data_len: {})",
            self.ns,
            self.id,
            self.data.len()
        ))
    }
}
Loading
Loading