Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(remote_wal): implement topic allocation #2970

Merged
merged 10 commits into from
Dec 22, 2023
Merged
1 change: 1 addition & 0 deletions .github/workflows/develop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ jobs:
- name: Setup etcd server
working-directory: tests-integration/fixtures/etcd
run: docker compose -f docker-compose-standalone.yml up -d --wait
# TODO(niebayes): Add a step to set up Kafka clusters. Maybe add a docker file for starting Kafka clusters.
- name: Run nextest cases
run: cargo llvm-cov nextest --workspace --lcov --output-path lcov.info -F pyo3_backend -F dashboard
env:
Expand Down
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 11 additions & 1 deletion config/metasrv.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,21 @@ provider = "raft_engine"
# - "round_robin" (default)
# selector_type = "round_robin"
# A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
# topic_name_prefix = "greptimedb_kafka_wal"
# topic_name_prefix = "greptimedb_wal_kafka"
# Number of partitions per topic.
# num_partitions = 1
# Expected number of replicas of each partition.
# replication_factor = 3
# The timeout above which a topic creation operation will be cancelled.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use more concise words.

Nit(Non-blocking):

Suggested change
# Above which a topic creation operation will be cancelled.
# The timeout of topic creation

# create_topic_timeout = "30s"
# The initial backoff for kafka clients.
# backoff_init = "500ms"
# The maximum backoff for kafka clients.
# backoff_max = "10s"
# Exponential backoff rate, i.e. next backoff = base * current backoff.
# backoff_base = 2.0
# Stop reconnecting if the total wait time reaches the deadline. If this config is missing, reconnecting never stops.
# backoff_deadline = "5mins"

# Metasrv exports the metrics generated by itself
# encoded to Prometheus remote-write format
Expand Down
7 changes: 7 additions & 0 deletions src/cmd/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ pub enum Error {
source: common_procedure::error::Error,
},

#[snafu(display("Failed to start wal options allocator"))]
StartWalOptionsAllocator {
location: Location,
source: common_meta::error::Error,
},

#[snafu(display("Failed to start datanode"))]
StartDatanode {
location: Location,
Expand Down Expand Up @@ -270,6 +276,7 @@ impl ErrorExt for Error {

Error::StartProcedureManager { source, .. }
| Error::StopProcedureManager { source, .. } => source.status_code(),
Error::StartWalOptionsAllocator { source, .. } => source.status_code(),
Error::ReplCreation { .. } | Error::Readline { .. } => StatusCode::Internal,
Error::RequestDatabase { source, .. } => source.status_code(),
Error::CollectRecordBatches { source, .. }
Expand Down
25 changes: 15 additions & 10 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use async_trait::async_trait;
use clap::Parser;
use common_catalog::consts::MIN_USER_TABLE_ID;
use common_config::{metadata_store_dir, KvBackendConfig, WalConfig};
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::ddl::{DdlTaskExecutorRef, TableMetadataAllocatorRef};
Expand All @@ -28,7 +27,7 @@ use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal::build_wal_options_allocator;
use common_meta::wal::{WalOptionsAllocator, WalOptionsAllocatorRef};
use common_procedure::ProcedureManagerRef;
use common_telemetry::info;
use common_telemetry::logging::LoggingOptions;
Expand All @@ -51,9 +50,9 @@ use servers::Mode;
use snafu::ResultExt;

use crate::error::{
CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu, InitMetadataSnafu, OtherSnafu, Result,
CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu, InitMetadataSnafu, Result,
ShutdownDatanodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu,
StartProcedureManagerSnafu, StopProcedureManagerSnafu,
StartProcedureManagerSnafu, StartWalOptionsAllocatorSnafu, StopProcedureManagerSnafu,
};
use crate::options::{CliOptions, MixOptions, Options};
use crate::App;
Expand Down Expand Up @@ -180,6 +179,7 @@ pub struct Instance {
datanode: Datanode,
frontend: FeInstance,
procedure_manager: ProcedureManagerRef,
wal_options_allocator: WalOptionsAllocatorRef,
}

#[async_trait]
Expand All @@ -196,6 +196,11 @@ impl App for Instance {
.await
.context(StartProcedureManagerSnafu)?;

self.wal_options_allocator
.start()
.await
.context(StartWalOptionsAllocatorSnafu)?;

self.frontend.start().await.context(StartFrontendSnafu)?;
Ok(())
}
Expand Down Expand Up @@ -388,14 +393,13 @@ impl StartCommand {
.build(),
);
// TODO(niebayes): add a wal config into the MixOptions and pass it to the allocator builder.
let wal_options_allocator =
build_wal_options_allocator(&common_meta::wal::WalConfig::default(), &kv_backend)
.await
.map_err(BoxedError::new)
.context(OtherSnafu)?;
let wal_options_allocator = Arc::new(WalOptionsAllocator::new(
common_meta::wal::WalConfig::default(),
kv_backend.clone(),
));
let table_meta_allocator = Arc::new(StandaloneTableMetadataAllocator::new(
table_id_sequence,
wal_options_allocator,
wal_options_allocator.clone(),
));

let ddl_task_executor = Self::create_ddl_task_executor(
Expand Down Expand Up @@ -425,6 +429,7 @@ impl StartCommand {
datanode,
frontend,
procedure_manager,
wal_options_allocator,
})
}

Expand Down
2 changes: 2 additions & 0 deletions src/common/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ humantime-serde.workspace = true
lazy_static.workspace = true
prometheus.workspace = true
prost.workspace = true
rand.workspace = true
regex.workspace = true
rskafka = "0.5"
serde.workspace = true
serde_json.workspace = true
serde_with = "3"
Expand Down
44 changes: 42 additions & 2 deletions src/common/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,12 +290,46 @@ pub enum Error {
"Failed to encode a wal options to json string, wal_options: {:?}",
wal_options
))]
EncodeWalOptionsToJson {
EncodeWalOptions {
wal_options: WalOptions,
#[snafu(source)]
error: serde_json::Error,
location: Location,
},

#[snafu(display("Invalid number of topics {}", num_topics))]
InvalidNumTopics {
num_topics: usize,
location: Location,
},

#[snafu(display(
"Failed to build a kafka client, broker endpoints: {:?}",
broker_endpoints
))]
BuildKafkaClient {
broker_endpoints: Vec<String>,
location: Location,
#[snafu(source)]
error: rskafka::client::error::Error,
},

#[snafu(display("Failed to build a kafka controller client"))]
BuildKafkaCtrlClient {
location: Location,
#[snafu(source)]
error: rskafka::client::error::Error,
},

#[snafu(display("Failed to create a kafka wal topic"))]
CreateKafkaWalTopic {
location: Location,
#[snafu(source)]
error: rskafka::client::error::Error,
},

#[snafu(display("The topic pool is empty"))]
EmptyTopicPool { location: Location },
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -331,7 +365,11 @@ impl ErrorExt for Error {
| TableRouteNotFound { .. }
| ConvertRawTableInfo { .. }
| RegionOperatingRace { .. }
| EncodeWalOptionsToJson { .. } => StatusCode::Unexpected,
| EncodeWalOptions { .. }
| BuildKafkaClient { .. }
| BuildKafkaCtrlClient { .. }
| CreateKafkaWalTopic { .. }
| EmptyTopicPool { .. } => StatusCode::Unexpected,

SendMessage { .. }
| GetKvCache { .. }
Expand All @@ -356,6 +394,8 @@ impl ErrorExt for Error {
RetryLater { source, .. } => source.status_code(),
InvalidCatalogValue { source, .. } => source.status_code(),
ConvertAlterTableRequest { source, .. } => source.status_code(),

InvalidNumTopics { .. } => StatusCode::InvalidArguments,
}
}

Expand Down
22 changes: 19 additions & 3 deletions src/common/meta/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ use serde_with::with_prefix;
use crate::error::Result;
use crate::wal::kafka::KafkaConfig;
pub use crate::wal::kafka::Topic as KafkaWalTopic;
pub use crate::wal::options_allocator::{build_wal_options_allocator, WalOptionsAllocator};
pub use crate::wal::options_allocator::{
allocate_region_wal_options, WalOptionsAllocator, WalOptionsAllocatorRef,
};

/// Wal config for metasrv.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)]
Expand All @@ -38,6 +40,8 @@ pub enum WalConfig {

#[cfg(test)]
mod tests {
use std::time::Duration;

use super::*;
use crate::wal::kafka::topic_selector::SelectorType as KafkaTopicSelectorType;

Expand Down Expand Up @@ -65,19 +69,31 @@ mod tests {
broker_endpoints = ["127.0.0.1:9090"]
num_topics = 32
selector_type = "round_robin"
topic_name_prefix = "greptimedb_kafka_wal"
topic_name_prefix = "greptimedb_wal_kafka"
num_partitions = 1
replication_factor = 3
create_topic_timeout = "30s"
backoff_init = "500ms"
backoff_max = "10s"
backoff_base = 2.0
backoff_deadline = "5mins"
"#;
let wal_config: WalConfig = toml::from_str(toml_str).unwrap();
let expected_kafka_config = KafkaConfig {
broker_endpoints: vec!["127.0.0.1:9090".to_string()],
num_topics: 32,
selector_type: KafkaTopicSelectorType::RoundRobin,
topic_name_prefix: "greptimedb_kafka_wal".to_string(),
topic_name_prefix: "greptimedb_wal_kafka".to_string(),
num_partitions: 1,
replication_factor: 3,
create_topic_timeout: Duration::from_secs(30),
backoff_init: Duration::from_millis(500),
backoff_max: Duration::from_secs(10),
backoff_base: 2.0,
backoff_deadline: Some(Duration::from_secs(60 * 5)),
};
assert_eq!(wal_config, WalConfig::Kafka(expected_kafka_config));
}

// TODO(niebayes): the integration test needs to verify that the example config file can be successfully parsed.
}
24 changes: 23 additions & 1 deletion src/common/meta/src/wal/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ pub mod topic;
pub mod topic_manager;
pub mod topic_selector;

use std::time::Duration;

use serde::{Deserialize, Serialize};

pub use crate::wal::kafka::topic::Topic;
Expand All @@ -37,6 +39,21 @@ pub struct KafkaConfig {
pub num_partitions: i32,
/// The replication factor of each topic.
pub replication_factor: i16,
/// The timeout above which a topic creation operation will be cancelled.
#[serde(with = "humantime_serde")]
pub create_topic_timeout: Duration,
/// The initial backoff for kafka clients.
#[serde(with = "humantime_serde")]
pub backoff_init: Duration,
/// The maximum backoff for kafka clients.
#[serde(with = "humantime_serde")]
pub backoff_max: Duration,
/// Exponential backoff rate, i.e. next backoff = base * current backoff.
pub backoff_base: f64,
/// Stop reconnecting if the total wait time reaches the deadline.
/// If it's None, the reconnecting won't terminate.
#[serde(with = "humantime_serde")]
pub backoff_deadline: Option<Duration>,
}

impl Default for KafkaConfig {
Expand All @@ -45,9 +62,14 @@ impl Default for KafkaConfig {
broker_endpoints: vec!["127.0.0.1:9090".to_string()],
num_topics: 64,
selector_type: TopicSelectorType::RoundRobin,
topic_name_prefix: "greptimedb_kafka_wal".to_string(),
topic_name_prefix: "greptimedb_wal_kafka".to_string(),
num_partitions: 1,
replication_factor: 3,
create_topic_timeout: Duration::from_secs(30),
backoff_init: Duration::from_millis(500),
backoff_max: Duration::from_secs(10),
backoff_base: 2.0,
backoff_deadline: Some(Duration::from_secs(60 * 5)), // 5 mins
}
}
}
Loading
Loading