diff --git a/Cargo.lock b/Cargo.lock index cf177126c0ee..4fae5e0b48c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1812,6 +1812,7 @@ dependencies = [ "regex", "serde", "serde_json", + "serde_with", "snafu", "store-api", "strum 0.25.0", diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index a4f2a6f4a10e..2ea0f34c9dfa 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -34,6 +34,7 @@ prost.workspace = true regex.workspace = true serde.workspace = true serde_json.workspace = true +serde_with = "3" snafu.workspace = true store-api.workspace = true strum.workspace = true diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index c3c52b44ec49..0549f02797fa 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -334,7 +334,9 @@ impl TableCreator { // Encodes each wal options. let wal_options_map = wal_options_map .into_iter() - .map(|(region_number, wal_options)| (region_number, wal_options.into())) + .map(|(region_number, wal_options)| { + (region_number, serde_json::to_string(&wal_options).unwrap()) + }) .collect(); Self { diff --git a/src/common/meta/src/wal.rs b/src/common/meta/src/wal.rs index 26763276aafc..bf71aa055d87 100644 --- a/src/common/meta/src/wal.rs +++ b/src/common/meta/src/wal.rs @@ -18,9 +18,11 @@ pub mod options_allocator; use std::collections::HashMap; use serde::{Deserialize, Serialize}; +use serde_with::with_prefix; use crate::error::Result; -use crate::wal::kafka::{KafkaConfig, Topic as KafkaTopic}; +use crate::wal::kafka::KafkaConfig; +pub use crate::wal::kafka::{KafkaOptions as KafkaWalOptions, Topic as KafkaWalTopic}; pub use crate::wal::options_allocator::WalOptionsAllocator; pub const WAL_OPTIONS_KEY: &str = "wal_options"; @@ -36,32 +38,21 @@ pub enum WalConfig { Kafka(KafkaConfig), } -/// Wal options for a region. -#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)] -#[serde(tag = "provider")] +/// Wal options allocated to a region. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)] +#[serde(tag = "wal.provider")] pub enum WalOptions { #[default] #[serde(rename = "raft-engine")] RaftEngine, #[serde(rename = "kafka")] - Kafka { topic: KafkaTopic }, -} - -pub type EncodedWalOptions = String; - -impl From for EncodedWalOptions { - fn from(value: WalOptions) -> Self { - EncodedWalOptions::default() - } + #[serde(with = "prefix_wal_kafka")] + Kafka(KafkaWalOptions), } -impl TryFrom for WalOptions { - type Error = crate::error::Error; +with_prefix!(prefix_wal_kafka "wal.kafka."); - fn try_from(value: EncodedWalOptions) -> Result { - todo!() - } -} +pub type EncodedWalOptions = String; #[cfg(test)] mod tests { @@ -111,24 +102,23 @@ mod tests { #[test] fn test_serde_wal_options() { // Test serde raft-engine wal options. - let toml_str = r#" - provider = "raft-engine" - "#; - let wal_options: WalOptions = toml::from_str(toml_str).unwrap(); - assert_eq!(wal_options, WalOptions::RaftEngine); + let wal_options = WalOptions::RaftEngine; + let encoded = serde_json::to_string(&wal_options).unwrap(); + let expected = r#"{"wal.provider":"raft-engine"}"#; + assert_eq!(&encoded, expected); + + let decoded: WalOptions = serde_json::from_str(&encoded).unwrap(); + assert_eq!(decoded, wal_options); // Test serde kafka wal options. - let toml_str = r#" - provider = "kafka" - topic = "test_topic" - "#; - let wal_options: WalOptions = toml::from_str(toml_str).unwrap(); - let expected_kafka_topic = "test_topic".to_string(); - assert_eq!( - wal_options, - WalOptions::Kafka { - topic: expected_kafka_topic - } - ); + let wal_options = WalOptions::Kafka(KafkaWalOptions { + topic: "test_topic".to_string(), + }); + let encoded = serde_json::to_string(&wal_options).unwrap(); + let expected = r#"{"wal.provider":"kafka","wal.kafka.topic":"test_topic"}"#; + assert_eq!(&encoded, expected); + + let decoded: WalOptions = serde_json::from_str(&encoded).unwrap(); + assert_eq!(decoded, wal_options); } } diff --git a/src/common/meta/src/wal/kafka.rs b/src/common/meta/src/wal/kafka.rs index db00b338c905..189a4190ae9b 100644 --- a/src/common/meta/src/wal/kafka.rs +++ b/src/common/meta/src/wal/kafka.rs @@ -51,3 +51,20 @@ impl Default for KafkaConfig { } } } + +/// Kafka wal options allocated to a region. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(default)] +pub struct KafkaOptions { + /// Kafka wal topic. + pub topic: Topic, +} + +impl Default for KafkaOptions { + fn default() -> Self { + Self { + // To indicates a default deserialized topic is invalid. + topic: "invalid_topic".to_string(), + } + } +} diff --git a/src/common/meta/src/wal/options_allocator.rs b/src/common/meta/src/wal/options_allocator.rs index b3ed1c216833..c690cf5a379c 100644 --- a/src/common/meta/src/wal/options_allocator.rs +++ b/src/common/meta/src/wal/options_allocator.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use crate::error::Result; use crate::kv_backend::KvBackendRef; -use crate::wal::kafka::TopicManager as KafkaTopicManager; +use crate::wal::kafka::{KafkaOptions, TopicManager as KafkaTopicManager}; use crate::wal::{WalConfig, WalOptions}; #[derive(Default)] @@ -52,7 +52,7 @@ impl WalOptionsAllocator { let topics = topic_manager.select_batch(num_regions); topics .into_iter() - .map(|topic| WalOptions::Kafka { topic }) + .map(|topic| WalOptions::Kafka(KafkaOptions { topic })) .collect() } }