Skip to content

Commit

Permalink
chore: fill in kafka config
Browse files Browse the repository at this point in the history
  • Loading branch information
niebayes committed Dec 15, 2023
1 parent 29cbacf commit 0ea5461
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 4 deletions.
21 changes: 21 additions & 0 deletions config/metasrv.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,25 @@ first_heartbeat_estimate = "1000ms"
# tcp_nodelay = true

[wal]
# Available wal providers:
# - "raft-engine" (default)
# - "kafka"
provider = "raft-engine"

# There're none raft-engine wal config since meta srv only involves in remote wal currently.

# Kafka wal config.
# The broker endpoints of the Kafka cluster. ["127.0.0.1:9090"] by default.
broker_endpoints = ["127.0.0.1:9090"]
# Number of topics to be created upon start.
num_topics = 64
# Topic selector type.
# Available selector types:
# - "round-robin" (default)
selector_type = "round-robin"
# A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
topic_name_prefix = "greptimedb_kafka_wal"
# Number of partitions per topic.
num_partitions = 1
# Expected number of replicas of each partition.
replication_factor = 3
33 changes: 30 additions & 3 deletions src/common/meta/src/wal/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,40 @@ mod topic_selector;
use serde::{Deserialize, Serialize};

use crate::wal::kafka::topic::Topic;
use crate::wal::kafka::topic_selector::SelectorType as TopicSelectorType;

/// Configurations for bootstraping a kafka wal.

Check warning on line 24 in src/common/meta/src/wal/kafka.rs

View workflow job for this annotation

GitHub Actions / Spell Check with Typos

"bootstraping" should be "bootstrapping".

Check warning on line 24 in src/common/meta/src/wal/kafka.rs

View workflow job for this annotation

GitHub Actions / Spell Check with Typos

"bootstraping" should be "bootstrapping".
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
pub struct KafkaConfig;
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct KafkaConfig {
/// The broker endpoints of the Kafka cluster.
pub broker_endpoints: Vec<String>,
/// Number of topics to be created upon start.
pub num_topics: usize,
/// The type of the topic selector with which to select a topic for a region.
pub selector_type: TopicSelectorType,
/// Topic name prefix.
pub topic_name_prefix: String,
/// Number of partitions per topic.
pub num_partitions: i32,
/// The replication factor of each topic.
pub replication_factor: i16,
}

impl Default for KafkaConfig {
fn default() -> Self {
Self {
broker_endpoints: vec!["127.0.0.1:9090".to_string()],
num_topics: 64,
selector_type: TopicSelectorType::RoundRobin,
topic_name_prefix: "greptimedb_wal".to_string(),
num_partitions: 1,
replication_factor: 3,
}
}
}

/// Kafka wal options allocated to a region.
#[derive(Serialize, Deserialize, Default, Debug, Clone, PartialEq)]
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct KafkaWalOptions {
/// Kafka wal topic.
/// Publishers publish log entries to the topic while subscribers pull log entries from the topic.
Expand Down
7 changes: 6 additions & 1 deletion src/common/meta/src/wal/kafka/topic_selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::default;
use std::sync::Arc;

use serde::{Deserialize, Serialize};

use crate::wal::kafka::topic::Topic;

/// The type of the topic selector, i.e. with which strategy to select a topic.
pub(super) enum SelectorType {
#[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum SelectorType {
#[default]
RoundRobin,
}

Expand Down

0 comments on commit 0ea5461

Please sign in to comment.