Skip to content

Commit

Permalink
fix(remote_wal): resolve review conversations
Browse files Browse the repository at this point in the history
  • Loading branch information
niebayes committed Nov 23, 2023
1 parent 5dd91b7 commit a8a0f7e
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 47 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion config/metasrv.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,6 @@ num_topics = 64
# Available selector types:
# - "RoundRobin" (default)
selector_type = "RoundRobin"
topic_name_prefix = "gt_kafka_topic"
topic_name_prefix = "greptime_wal"
num_partitions = 1
replication_factor = 3
1 change: 1 addition & 0 deletions src/common/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ humantime-serde.workspace = true
lazy_static.workspace = true
prometheus.workspace = true
prost.workspace = true
rand.workspace = true
regex.workspace = true
rskafka.workspace = true
serde.workspace = true
Expand Down
4 changes: 0 additions & 4 deletions src/common/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,9 +291,6 @@ pub enum Error {
error: JsonError,
},

#[snafu(display("Failed to persist Kafka topics"))]
PersistKafkaTopics { location: Location },

#[snafu(display(
"Failed to build a rskafka client, broker endpoints: {:?}",
broker_endpoints
Expand Down Expand Up @@ -388,7 +385,6 @@ impl ErrorExt for Error {
Error::MissingKafkaOpts { .. }
| Error::DeserKafkaTopics { .. }
| Error::SerKafkaTopics { .. }
| Error::PersistKafkaTopics { .. }
| Error::InvalidNumTopics { .. }
| Error::BuildKafkaClient { .. }
| Error::BuildKafkaCtrlClient { .. }
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/src/wal/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl Default for KafkaOptions {
broker_endpoints: vec!["127.0.0.1:9090".to_string()],
num_topics: 64,
selector_type: TopicSelectorType::RoundRobin,
topic_name_prefix: "gt_kafka_topic".to_string(),
topic_name_prefix: "greptime_wal".to_string(),
num_partitions: 1,
replication_factor: 3,
}
Expand Down
80 changes: 40 additions & 40 deletions src/common/meta/src/wal/kafka/topic_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,23 @@

use std::collections::HashSet;

use rand::seq::SliceRandom;
use rskafka::client::ClientBuilder;
use snafu::{ensure, OptionExt, ResultExt};

use crate::error::{
BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, CreateKafkaTopicSnafu, DeserKafkaTopicsSnafu,
InvalidNumTopicsSnafu, MissingKafkaOptsSnafu, PersistKafkaTopicsSnafu, Result,
SerKafkaTopicsSnafu, TooManyCreatedKafkaTopicsSnafu,
InvalidNumTopicsSnafu, MissingKafkaOptsSnafu, Result, SerKafkaTopicsSnafu,
TooManyCreatedKafkaTopicsSnafu,
};
use crate::kv_backend::KvBackendRef;
use crate::rpc::store::PutRequest;
use crate::wal::kafka::topic_selector::{build_topic_selector, TopicSelectorRef};
use crate::wal::kafka::KafkaOptions;
use crate::wal::kafka::{KafkaOptions, TopicSelectorType};

pub type Topic = String;

const TOPICS_KEY: &str = "gt_kafka_topics";
const METASRV_CREATED_TOPICS_KEY: &str = "metasrv_created_topics";
const CREATE_TOPIC_TIMEOUT: i32 = 5_000; // 5,000 ms.

pub struct TopicManager {
Expand All @@ -38,14 +40,22 @@ pub struct TopicManager {

impl TopicManager {
pub async fn try_new(
kafka_opts: &Option<KafkaOptions>,
kafka_opts: Option<&KafkaOptions>,
kv_backend: &KvBackendRef,
) -> Result<Self> {
let opts = kafka_opts.as_ref().context(MissingKafkaOptsSnafu)?;
let opts = kafka_opts.context(MissingKafkaOptsSnafu)?;
let topic_pool = build_topic_pool(opts, kv_backend).await?;
let topic_selector = build_topic_selector(&opts.selector_type);

// The cursor in the round-robin selector is not persisted, which may break the round-robin strategy across restarts.
// Introducing a shuffle may help mitigate this issue.
let topic_pool = match opts.selector_type {
TopicSelectorType::RoundRobin => shuffle_topic_pool(topic_pool),
};

Ok(Self {
topic_pool: build_topic_pool(opts, kv_backend).await?,
topic_selector: build_topic_selector(&opts.selector_type),
topic_pool,
topic_selector,
})
}

Expand All @@ -57,17 +67,10 @@ impl TopicManager {
}

async fn build_topic_pool(opts: &KafkaOptions, kv_backend: &KvBackendRef) -> Result<Vec<Topic>> {
let KafkaOptions {
broker_endpoints,
num_topics,
topic_name_prefix,
num_partitions,
replication_factor,
..
} = opts.clone();

let num_topics = opts.num_topics;
ensure!(num_topics > 0, InvalidNumTopicsSnafu { num_topics });

let broker_endpoints = opts.broker_endpoints.clone();
let kafka_client = ClientBuilder::new(broker_endpoints.clone())
.build()
.await
Expand All @@ -78,7 +81,7 @@ async fn build_topic_pool(opts: &KafkaOptions, kv_backend: &KvBackendRef) -> Res
.context(BuildKafkaCtrlClientSnafu)?;

let topics = (0..num_topics)
.map(|topic_id| format!("{topic_name_prefix}_{topic_id}"))
.map(|topic_id| format!("{}_{topic_id}", opts.topic_name_prefix))
.collect::<Vec<_>>();

let created_topics = restore_created_topics(kv_backend)
Expand All @@ -99,49 +102,46 @@ async fn build_topic_pool(opts: &KafkaOptions, kv_backend: &KvBackendRef) -> Res
return None;
}

// TODO(niebayes): Determine how rskafka handles an already-existing topic. Check if an error would be raised.
Some(kafka_ctrl_client.create_topic(
topic,
num_partitions,
replication_factor,
opts.num_partitions,
opts.replication_factor,
CREATE_TOPIC_TIMEOUT,
))
})
.collect::<Vec<_>>();

let _ = futures::future::try_join_all(create_topic_tasks)
futures::future::try_join_all(create_topic_tasks)
.await
.context(CreateKafkaTopicSnafu)?;

// FIXME(niebayes): current persistence strategy is all-or-none. Maybe we should increase the granularity.
persist_created_topics(&topics, kv_backend).await?;

Ok(topics)
}

/// Returns the given topic pool with its elements rearranged in a random order.
///
/// A fresh thread-local RNG is used for the shuffle, so the resulting order is
/// non-deterministic across calls.
fn shuffle_topic_pool(topic_pool: Vec<Topic>) -> Vec<Topic> {
    let mut shuffled = topic_pool;
    let rng = &mut rand::thread_rng();
    shuffled.shuffle(rng);
    shuffled
}

async fn restore_created_topics(kv_backend: &KvBackendRef) -> Result<Vec<Topic>> {
let raw_topics = kv_backend
.get(TOPICS_KEY.as_bytes())
kv_backend
.get(METASRV_CREATED_TOPICS_KEY.as_bytes())
.await?
.map(|key_value| key_value.value)
.unwrap_or_default();

serde_json::from_slice(&raw_topics).context(DeserKafkaTopicsSnafu)
.map(|key_value| serde_json::from_slice(&key_value.value).context(DeserKafkaTopicsSnafu))
.unwrap_or_else(|| Ok(vec![]))
}

async fn persist_created_topics(topics: &[Topic], kv_backend: &KvBackendRef) -> Result<()> {
let raw_topics = serde_json::to_string(topics).context(SerKafkaTopicsSnafu)?;
kv_backend
.put_conditionally(
TOPICS_KEY.as_bytes().to_vec(),
raw_topics.into_bytes(),
false,
)
.await
.and_then(|persisted| {
if !persisted {
PersistKafkaTopicsSnafu.fail()
} else {
Ok(())
}
.put(PutRequest {
key: METASRV_CREATED_TOPICS_KEY.as_bytes().to_vec(),
value: raw_topics.into_bytes(),
prev_kv: false,
})
.await
.map(|_| ())
}
2 changes: 1 addition & 1 deletion src/meta-srv/src/metasrv/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ impl MetaSrvBuilder {

let kafka_topic_manager = match options.wal.provider {
WalProvider::Kafka => Some(
KafkaTopicManager::try_new(&options.wal.kafka_opts, &kv_backend)
KafkaTopicManager::try_new(options.wal.kafka_opts.as_ref(), &kv_backend)
.await
.context(BuildKafkaTopicManagerSnafu)?,
),
Expand Down

0 comments on commit a8a0f7e

Please sign in to comment.