Skip to content

Commit

Permalink
refactor: refacto serialization and deserialization of wal options
Browse files Browse the repository at this point in the history
  • Loading branch information
niebayes committed Dec 18, 2023
1 parent 56c325d commit c430bc9
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 39 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/common/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ prost.workspace = true
regex.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_with = "3"
snafu.workspace = true
store-api.workspace = true
strum.workspace = true
Expand Down
4 changes: 3 additions & 1 deletion src/common/meta/src/ddl/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,9 @@ impl TableCreator {
// Encodes each wal options.
let wal_options_map = wal_options_map
.into_iter()
.map(|(region_number, wal_options)| (region_number, wal_options.into()))
.map(|(region_number, wal_options)| {
(region_number, serde_json::to_string(&wal_options).unwrap())
})
.collect();

Self {
Expand Down
62 changes: 26 additions & 36 deletions src/common/meta/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ pub mod options_allocator;
use std::collections::HashMap;

use serde::{Deserialize, Serialize};
use serde_with::with_prefix;

use crate::error::Result;
use crate::wal::kafka::{KafkaConfig, Topic as KafkaTopic};
use crate::wal::kafka::KafkaConfig;
pub use crate::wal::kafka::{KafkaOptions as KafkaWalOptions, Topic as KafkaWalTopic};
pub use crate::wal::options_allocator::WalOptionsAllocator;

pub const WAL_OPTIONS_KEY: &str = "wal_options";
Expand All @@ -36,32 +38,21 @@ pub enum WalConfig {
Kafka(KafkaConfig),
}

/// Wal options for a region.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)]
#[serde(tag = "provider")]
/// Wal options allocated to a region.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
#[serde(tag = "wal.provider")]
pub enum WalOptions {
#[default]
#[serde(rename = "raft-engine")]
RaftEngine,
#[serde(rename = "kafka")]
Kafka { topic: KafkaTopic },
}

pub type EncodedWalOptions = String;

impl From<WalOptions> for EncodedWalOptions {
fn from(value: WalOptions) -> Self {
EncodedWalOptions::default()
}
#[serde(with = "prefix_wal_kafka")]
Kafka(KafkaWalOptions),
}

impl TryFrom<EncodedWalOptions> for WalOptions {
type Error = crate::error::Error;
with_prefix!(prefix_wal_kafka "wal.kafka.");

fn try_from(value: EncodedWalOptions) -> Result<Self> {
todo!()
}
}
pub type EncodedWalOptions = String;

#[cfg(test)]
mod tests {
Expand Down Expand Up @@ -111,24 +102,23 @@ mod tests {
#[test]
fn test_serde_wal_options() {
// Test serde raft-engine wal options.
let toml_str = r#"
provider = "raft-engine"
"#;
let wal_options: WalOptions = toml::from_str(toml_str).unwrap();
assert_eq!(wal_options, WalOptions::RaftEngine);
let wal_options = WalOptions::RaftEngine;
let encoded = serde_json::to_string(&wal_options).unwrap();
let expected = r#"{"wal.provider":"raft-engine"}"#;
assert_eq!(&encoded, expected);

let decoded: WalOptions = serde_json::from_str(&encoded).unwrap();
assert_eq!(decoded, wal_options);

// Test serde kafka wal options.
let toml_str = r#"
provider = "kafka"
topic = "test_topic"
"#;
let wal_options: WalOptions = toml::from_str(toml_str).unwrap();
let expected_kafka_topic = "test_topic".to_string();
assert_eq!(
wal_options,
WalOptions::Kafka {
topic: expected_kafka_topic
}
);
let wal_options = WalOptions::Kafka(KafkaWalOptions {
topic: "test_topic".to_string(),
});
let encoded = serde_json::to_string(&wal_options).unwrap();
let expected = r#"{"wal.provider":"kafka","wal.kafka.topic":"test_topic"}"#;
assert_eq!(&encoded, expected);

let decoded: WalOptions = serde_json::from_str(&encoded).unwrap();
assert_eq!(decoded, wal_options);
}
}
17 changes: 17 additions & 0 deletions src/common/meta/src/wal/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,20 @@ impl Default for KafkaConfig {
}
}
}

/// Kafka wal options allocated to a region.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct KafkaOptions {
/// Kafka wal topic.
pub topic: Topic,
}

impl Default for KafkaOptions {
fn default() -> Self {
Self {
// To indicates a default deserialized topic is invalid.
topic: "invalid_topic".to_string(),
}
}
}
4 changes: 2 additions & 2 deletions src/common/meta/src/wal/options_allocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::sync::Arc;

use crate::error::Result;
use crate::kv_backend::KvBackendRef;
use crate::wal::kafka::TopicManager as KafkaTopicManager;
use crate::wal::kafka::{KafkaOptions, TopicManager as KafkaTopicManager};
use crate::wal::{WalConfig, WalOptions};

#[derive(Default)]
Expand Down Expand Up @@ -52,7 +52,7 @@ impl WalOptionsAllocator {
let topics = topic_manager.select_batch(num_regions);
topics
.into_iter()
.map(|topic| WalOptions::Kafka { topic })
.map(|topic| WalOptions::Kafka(KafkaOptions { topic }))
.collect()
}
}
Expand Down

0 comments on commit c430bc9

Please sign in to comment.