Skip to content

Commit

Permalink
feat(remote_wal): integrate remote wal to standalone (#3001)
Browse files Browse the repository at this point in the history
* feat: integrate remote wal to standalone

* fix: test

* chore: ready to debug kafka remote wal

* fix: test

* chore: add some logs for remote wal

* chore: add logs for topic manager

* fix: properly terminate stream consumer

* fix: properly handle TopicAlreadyExists error

* fix: parse config file error

* fix: properly handle last entry id

* chore: prepare for merge

* fix: test

* fix: typo

* fix: set replication_factor properly

* fix: CR
  • Loading branch information
niebayes authored Dec 26, 2023
1 parent 8ce8a8f commit c797241
Show file tree
Hide file tree
Showing 13 changed files with 172 additions and 80 deletions.
2 changes: 1 addition & 1 deletion config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ sync_write = false
# produce_record_timeout = "100ms"
# backoff_init = "500ms"
# backoff_max = "10s"
# backoff_base = 2.0
# backoff_base = 2
# backoff_deadline = "5mins"

# Storage options, see `standalone.example.toml`.
Expand Down
4 changes: 2 additions & 2 deletions config/metasrv.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,15 @@ provider = "raft_engine"
# Number of partitions per topic.
# num_partitions = 1
# Expected number of replicas of each partition.
# replication_factor = 3
# replication_factor = 1
# The timeout beyond which a topic creation operation will be cancelled.
# create_topic_timeout = "30s"
# The initial backoff for kafka clients.
# backoff_init = "500ms"
# The maximum backoff for kafka clients.
# backoff_max = "10s"
# Exponential backoff rate, i.e. next backoff = base * current backoff.
# backoff_base = 2.0
# backoff_base = 2
# Stop reconnecting if the total wait time reaches the deadline. If this config is missing, reconnecting will not terminate.
# backoff_deadline = "5mins"

Expand Down
4 changes: 2 additions & 2 deletions config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ provider = "raft_engine"
# Number of partitions per topic.
# num_partitions = 1
# Expected number of replicas of each partition.
# replication_factor = 3
# replication_factor = 1

# The maximum log size a kafka batch producer could buffer.
# max_batch_size = "4MB"
Expand All @@ -119,7 +119,7 @@ provider = "raft_engine"
# The maximum backoff for kafka clients.
# backoff_max = "10s"
# Exponential backoff rate, i.e. next backoff = base * current backoff.
# backoff_base = 2.0
# backoff_base = 2
# Stop reconnecting if the total wait time reaches the deadline. If this config is missing, reconnecting will not terminate.
# backoff_deadline = "5mins"

Expand Down
7 changes: 5 additions & 2 deletions src/common/config/src/wal/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,16 @@ pub struct StandaloneKafkaConfig {

impl Default for StandaloneKafkaConfig {
fn default() -> Self {
let base = KafkaConfig::default();
let replication_factor = base.broker_endpoints.len() as i16;

Self {
base: KafkaConfig::default(),
base,
num_topics: 64,
selector_type: TopicSelectorType::RoundRobin,
topic_name_prefix: "greptimedb_wal_topic".to_string(),
num_partitions: 1,
replication_factor: 3,
replication_factor,
create_topic_timeout: Duration::from_secs(30),
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/common/meta/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ mod tests {
selector_type = "round_robin"
topic_name_prefix = "greptimedb_wal_topic"
num_partitions = 1
replication_factor = 3
replication_factor = 1
create_topic_timeout = "30s"
backoff_init = "500ms"
backoff_max = "10s"
Expand All @@ -103,7 +103,7 @@ mod tests {
selector_type: TopicSelectorType::RoundRobin,
topic_name_prefix: "greptimedb_wal_topic".to_string(),
num_partitions: 1,
replication_factor: 3,
replication_factor: 1,
create_topic_timeout: Duration::from_secs(30),
backoff: KafkaBackoffConfig {
init: Duration::from_millis(500),
Expand Down
7 changes: 5 additions & 2 deletions src/common/meta/src/wal/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,16 @@ pub struct KafkaConfig {

impl Default for KafkaConfig {
fn default() -> Self {
let broker_endpoints = vec!["127.0.0.1:9092".to_string()];
let replication_factor = broker_endpoints.len() as i16;

Self {
broker_endpoints: vec!["127.0.0.1:9092".to_string()],
broker_endpoints,
num_topics: 64,
selector_type: TopicSelectorType::RoundRobin,
topic_name_prefix: "greptimedb_wal_topic".to_string(),
num_partitions: 1,
replication_factor: 3,
replication_factor,
create_topic_timeout: Duration::from_secs(30),
backoff: KafkaBackoffConfig::default(),
}
Expand Down
63 changes: 45 additions & 18 deletions src/common/meta/src/wal/kafka/topic_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ use std::sync::Arc;
use std::time::Duration;

use common_config::wal::kafka::TopicSelectorType;
use common_telemetry::debug;
use common_telemetry::{debug, error, info};
use rskafka::client::controller::ControllerClient;
use rskafka::client::error::Error as RsKafkaError;
use rskafka::client::error::ProtocolError::TopicAlreadyExists;
use rskafka::client::ClientBuilder;
use rskafka::BackoffConfig;
use snafu::{ensure, ResultExt};
use snafu::{ensure, AsErrorSource, ResultExt};

use crate::error::{
BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, CreateKafkaWalTopicSnafu, DecodeJsonSnafu,
Expand Down Expand Up @@ -79,7 +82,6 @@ impl TopicManager {
.await?
.into_iter()
.collect::<HashSet<Topic>>();
debug!("Restored {} topics", created_topics.len());

// Creates missing topics.
let to_be_created = topics
Expand All @@ -92,10 +94,10 @@ impl TopicManager {
Some(i)
})
.collect::<Vec<_>>();

if !to_be_created.is_empty() {
self.try_create_topics(topics, &to_be_created).await?;
Self::persist_created_topics(topics, &self.kv_backend).await?;
debug!("Persisted {} topics", topics.len());
}
Ok(())
}
Expand All @@ -119,23 +121,12 @@ impl TopicManager {
.controller_client()
.context(BuildKafkaCtrlClientSnafu)?;

// Spawns tokio tasks for creating missing topics.
// Try to create missing topics.
let tasks = to_be_created
.iter()
.map(|i| {
client.create_topic(
topics[*i].clone(),
self.config.num_partitions,
self.config.replication_factor,
self.config.create_topic_timeout.as_millis() as i32,
)
})
.map(|i| self.try_create_topic(&topics[*i], &client))
.collect::<Vec<_>>();
// FIXME(niebayes): try to create an already-exist topic would raise an error.
futures::future::try_join_all(tasks)
.await
.context(CreateKafkaWalTopicSnafu)
.map(|_| ())
futures::future::try_join_all(tasks).await.map(|_| ())
}

/// Selects one topic from the topic pool through the topic selector.
Expand All @@ -150,6 +141,32 @@ impl TopicManager {
.collect()
}

async fn try_create_topic(&self, topic: &Topic, client: &ControllerClient) -> Result<()> {
match client
.create_topic(
topic.clone(),
self.config.num_partitions,
self.config.replication_factor,
self.config.create_topic_timeout.as_millis() as i32,
)
.await
{
Ok(_) => {
info!("Successfully created topic {}", topic);
Ok(())
}
Err(e) => {
if Self::is_topic_already_exist_err(&e) {
info!("The topic {} already exists", topic);
Ok(())
} else {
error!("Failed to create a topic {}, error {:?}", topic, e);
Err(e).context(CreateKafkaWalTopicSnafu)
}
}
}
}

async fn restore_created_topics(kv_backend: &KvBackendRef) -> Result<Vec<Topic>> {
kv_backend
.get(CREATED_TOPICS_KEY.as_bytes())
Expand All @@ -171,6 +188,16 @@ impl TopicManager {
.await
.map(|_| ())
}

fn is_topic_already_exist_err(e: &RsKafkaError) -> bool {
matches!(
e,
&RsKafkaError::ServerError {
protocol_error: TopicAlreadyExists,
..
}
)
}
}

#[cfg(test)]
Expand Down
21 changes: 12 additions & 9 deletions src/log-store/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use common_macro::stack_trace_debug;
use common_runtime::error::Error as RuntimeError;
use snafu::{Location, Snafu};

use crate::kafka::NamespaceImpl as KafkaNamespace;

#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
Expand Down Expand Up @@ -152,16 +154,17 @@ pub enum Error {
error: rskafka::client::producer::Error,
},

#[snafu(display(
"Failed to read a record from Kafka, topic: {}, region_id: {}, offset: {}",
topic,
region_id,
offset,
))]
#[snafu(display("Failed to read a record from Kafka, ns: {}", ns))]
ConsumeRecord {
topic: String,
region_id: u64,
offset: i64,
ns: KafkaNamespace,
location: Location,
#[snafu(source)]
error: rskafka::client::error::Error,
},

#[snafu(display("Failed to get the latest offset, ns: {}", ns))]
GetOffset {
ns: KafkaNamespace,
location: Location,
#[snafu(source)]
error: rskafka::client::error::Error,
Expand Down
20 changes: 20 additions & 0 deletions src/log-store/src/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ pub mod log_store;
mod offset;
mod record_utils;

use std::fmt::Display;

use common_meta::wal::KafkaWalTopic as Topic;
use serde::{Deserialize, Serialize};
use store_api::logstore::entry::{Entry, Id as EntryId};
Expand All @@ -37,6 +39,12 @@ impl Namespace for NamespaceImpl {
}
}

impl Display for NamespaceImpl {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}/{}", self.topic, self.region_id)
}
}

/// Kafka Entry implementation.
#[derive(Debug, PartialEq, Clone)]
pub struct EntryImpl {
Expand Down Expand Up @@ -64,3 +72,15 @@ impl Entry for EntryImpl {
self.ns.clone()
}
}

impl Display for EntryImpl {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Entry (ns: {}, id: {}, data_len: {})",
self.ns,
self.id,
self.data.len()
)
}
}
Loading

0 comments on commit c797241

Please sign in to comment.