Skip to content

Commit

Permalink
feat: distinguish topic content by format (#18)
Browse files Browse the repository at this point in the history
Signed-off-by: tison <[email protected]>
  • Loading branch information
tisonkun authored Nov 15, 2024
1 parent b44c9cb commit 5e7f976
Show file tree
Hide file tree
Showing 11 changed files with 524 additions and 301 deletions.
584 changes: 437 additions & 147 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ gix-discover = { version = "0.36" }
insta = { version = "1.40", features = ["json"] }
local-ip-address = { version = "0.6" }
log = { version = "0.4", features = ["kv_unstable_serde", "serde"] }
logforth = { version = "0.14" }
mea = { version = "0.0.4" }
logforth = { version = "0.18" }
mea = { version = "0.0.6" }
mime = { version = "0.3" }
opendal = { version = "0.50" }
pin-project = { version = "1.1" }
Expand All @@ -70,7 +70,7 @@ reqwest = { version = "0.12", features = ["json", "rustls-tls"] }
scopeguard = { version = "1.2" }
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0" }
shadow-rs = { version = "0.35.1" }
shadow-rs = { version = "0.36" }
sqlx = { version = "0.8", features = [
"json",
"postgres",
Expand All @@ -80,13 +80,13 @@ sqlx = { version = "0.8", features = [
tempfile = { version = "3.13" }
test-harness = { version = "0.3" }
testcontainers = { version = "0.23", features = ["blocking"] }
thiserror = { version = "1.0" }
thiserror = { version = "2.0" }
tokio = { version = "1.41", features = ["full"] }
tokio-util = { version = "0.7", features = ["compat"] }
toml = { version = "0.8" }
url = { version = "2.5" }
uuid = { version = "1.11", features = ["v4"] }
which = { version = "6.0" }
which = { version = "7.0" }

# workspace dependencies
kafka-api = { version = "0.4.1", path = "api/kafka-api" }
Expand Down
9 changes: 9 additions & 0 deletions api/protos/src/property/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,18 @@ use serde::Serialize;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopicProps {
pub format: TopicFormat,
pub storage: StorageProps,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TopicFormat {
#[serde(rename = "kafka")]
Kafka,
#[serde(rename = "wal")]
WAL,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "scheme")]
pub enum StorageProps {
Expand Down
2 changes: 2 additions & 0 deletions crates/kafka-broker/src/broker/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use kafka_api::schemata::metadata_response::MetadataResponse;
use kafka_api::schemata::metadata_response::MetadataResponseBroker;
use kafka_api::schemata::metadata_response::MetadataResponsePartition;
use kafka_api::schemata::metadata_response::MetadataResponseTopic;
use morax_protos::property::TopicFormat;
use morax_protos::property::TopicProps;

use crate::broker::Broker;
Expand Down Expand Up @@ -143,6 +144,7 @@ impl Broker {
partitions: topic.num_partitions.max(1),
properties: TopicProps {
storage: self.fallback_storage.clone(),
format: TopicFormat::Kafka,
},
};

Expand Down
23 changes: 22 additions & 1 deletion crates/kafka-broker/src/broker/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ use kafka_api::schemata::offset_fetch_response::OffsetFetchResponsePartitions;
use kafka_api::schemata::offset_fetch_response::OffsetFetchResponseTopic;
use kafka_api::schemata::offset_fetch_response::OffsetFetchResponseTopics;
use kafka_api::schemata::request_header::RequestHeader;
use morax_meta::Topic;
use morax_meta::TopicPartitionSplit;
use morax_protos::property::TopicFormat;
use morax_storage::TopicStorage;

use crate::broker::Broker;
Expand Down Expand Up @@ -159,7 +161,26 @@ impl Broker {
} else {
self.meta.get_topics_by_name(topic.topic.clone()).await
} {
Ok(topic) => TopicStorage::new(topic.properties.0.storage),
Ok(Topic { properties, .. }) => match &properties.format {
TopicFormat::Kafka => TopicStorage::new(properties.0.storage),
format => {
log::error!("unsupported topic format: {format:?}");
let mut partitions = vec![];
for _ in topic.partitions.iter() {
partitions.push(PartitionData {
error_code: ErrorCode::UNSUPPORTED_FOR_MESSAGE_FORMAT.code(),
..Default::default()
});
}
responses.push(FetchableTopicResponse {
topic: topic.topic.clone(),
topic_id: topic.topic_id,
partitions,
..Default::default()
});
continue;
}
},
Err(err) => {
log::error!("failed to fetch topic metadata: {err:?}");
let mut partitions = vec![];
Expand Down
27 changes: 26 additions & 1 deletion crates/kafka-broker/src/broker/produce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use kafka_api::schemata::produce_request::ProduceRequest;
use kafka_api::schemata::produce_response::PartitionProduceResponse;
use kafka_api::schemata::produce_response::ProduceResponse;
use kafka_api::schemata::produce_response::TopicProduceResponse;
use morax_meta::Topic;
use morax_protos::property::TopicFormat;
use morax_storage::TopicStorage;

use crate::broker::Broker;
Expand Down Expand Up @@ -51,7 +53,30 @@ impl Broker {
for topic in request.topic_data {
let topic_name = topic.name.clone();
let topic_storage = match self.meta.get_topics_by_name(topic_name.clone()).await {
Ok(topic) => TopicStorage::new(topic.properties.0.storage),
Ok(Topic { properties, .. }) => match &properties.format {
TopicFormat::Kafka => TopicStorage::new(properties.0.storage),
format => {
log::error!("unsupported topic format: {format:?}");
let mut partition_responses = vec![];
for partition in topic.partition_data {
if partition.records.is_some() {
partition_responses.push(PartitionProduceResponse {
error_code: ErrorCode::UNSUPPORTED_FOR_MESSAGE_FORMAT.code(),
error_message: Some(format!(
"unsupported topic format: {format:?}"
)),
..Default::default()
});
}
}
responses.push(TopicProduceResponse {
name: topic_name,
partition_responses,
..Default::default()
});
continue;
}
},
Err(err) => {
log::error!("malformed record batches: {err:?}");
let mut partition_responses = vec![];
Expand Down
2 changes: 0 additions & 2 deletions crates/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,3 @@ pub use crate::global::*;

mod runtime;
pub use crate::runtime::*;

pub mod scheduled_task;
135 changes: 0 additions & 135 deletions crates/runtime/src/scheduled_task.rs

This file was deleted.

17 changes: 7 additions & 10 deletions crates/telemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,22 @@
// limitations under the License.

use logforth::append;
use logforth::filter::env::EnvFilterBuilder;
use logforth::filter::env_filter::EnvFilterBuilder;
use logforth::filter::EnvFilter;
use logforth::Dispatch;
use logforth::Logger;
use morax_protos::config::TelemetryConfig;

pub fn init(config: &TelemetryConfig) {
let mut logger = Logger::new();
let mut logger = logforth::builder();

// stderr logger
if let Some(ref stderr) = config.log.stderr {
logger = logger.dispatch(
Dispatch::new()
.filter(make_rust_log_filter_with_default_env(&stderr.filter))
.append(append::Stderr::default()),
);
logger = logger.dispatch(|d| {
d.filter(make_rust_log_filter_with_default_env(&stderr.filter))
.append(append::Stderr::default())
});
}

let _ = logger.apply();
logger.apply();
}

fn make_rust_log_filter(filter: &str) -> EnvFilter {
Expand Down
14 changes: 14 additions & 0 deletions crates/wal-broker/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ use std::sync::Arc;

use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use error_stack::bail;
use error_stack::Result;
use error_stack::ResultExt;
use morax_meta::CommitRecordBatchesRequest;
use morax_meta::CreateTopicRequest;
use morax_meta::FetchRecordBatchesRequest;
use morax_meta::PostgresMetaService;
use morax_protos::property::TopicFormat;
use morax_protos::rpc::AppendLogRequest;
use morax_protos::rpc::AppendLogResponse;
use morax_protos::rpc::CreateLogRequest;
Expand Down Expand Up @@ -80,6 +82,12 @@ impl WALBroker {
.get_topics_by_name(name.clone())
.await
.change_context_lazy(make_error)?;
if !matches!(topic.properties.format, TopicFormat::WAL) {
bail!(BrokerError(format!(
"unsupported topic format: {:?}",
topic.properties.format
)));
}
let topic_storage = TopicStorage::new(topic.properties.0.storage);

let splits = self
Expand Down Expand Up @@ -133,6 +141,12 @@ impl WALBroker {
.get_topics_by_name(name.clone())
.await
.change_context_lazy(make_error)?;
if !matches!(topic.properties.format, TopicFormat::WAL) {
bail!(BrokerError(format!(
"unsupported topic format: {:?}",
topic.properties.format
)));
}
let topic_storage = TopicStorage::new(topic.properties.0.storage);

let entry_cnt = request.entries.len();
Expand Down
2 changes: 2 additions & 0 deletions tests/wal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::process::ExitCode;
use morax_protos::config::LogConfig;
use morax_protos::config::StderrAppenderConfig;
use morax_protos::config::TelemetryConfig;
use morax_protos::property::TopicFormat;
use morax_protos::property::TopicProps;
use tests_toolkit::make_test_name;

Expand Down Expand Up @@ -53,6 +54,7 @@ where
client,
topic_props: TopicProps {
storage: state.env_props.storage,
format: TopicFormat::WAL,
},
})
.await
Expand Down

0 comments on commit 5e7f976

Please sign in to comment.