From 594db453b8df992d8ae4f6d27cf3934b0977b162 Mon Sep 17 00:00:00 2001 From: niebayes Date: Thu, 21 Dec 2023 11:23:45 +0800 Subject: [PATCH 01/22] feat: introduce client manager --- Cargo.lock | 3 + src/log-store/Cargo.toml | 3 + src/log-store/src/kafka.rs | 1 + src/log-store/src/kafka/client_manager.rs | 130 ++++++++++++++++++++++ 4 files changed, 137 insertions(+) create mode 100644 src/log-store/src/kafka/client_manager.rs diff --git a/Cargo.lock b/Cargo.lock index 78f7e5c7470e..519af260d3f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4472,12 +4472,15 @@ dependencies = [ "common-runtime", "common-telemetry", "common-test-util", + "dashmap", "futures", "futures-util", "protobuf", "protobuf-build", "raft-engine", "rand", + "rskafka", + "serde_json", "snafu", "store-api", "tokio", diff --git a/src/log-store/Cargo.toml b/src/log-store/Cargo.toml index b3ce74640a21..1cb6f5ebc822 100644 --- a/src/log-store/Cargo.toml +++ b/src/log-store/Cargo.toml @@ -21,10 +21,13 @@ common-macro.workspace = true common-meta.workspace = true common-runtime.workspace = true common-telemetry.workspace = true +dashmap = "5.4" futures-util.workspace = true futures.workspace = true protobuf = { version = "2", features = ["bytes"] } raft-engine.workspace = true +rskafka = "0.5" +serde_json.workspace = true snafu.workspace = true store-api.workspace = true tokio-util.workspace = true diff --git a/src/log-store/src/kafka.rs b/src/log-store/src/kafka.rs index ea661c380e0c..a69fab00943a 100644 --- a/src/log-store/src/kafka.rs +++ b/src/log-store/src/kafka.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod client_manager; pub mod log_store; use common_meta::wal::KafkaWalTopic as Topic; diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs new file mode 100644 index 000000000000..76708aaadcd9 --- /dev/null +++ b/src/log-store/src/kafka/client_manager.rs @@ -0,0 +1,130 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; +use std::time::Duration; + +use common_config::wal::{KafkaConfig, KafkaWalOptions, KafkaWalTopic as Topic}; +use dashmap::mapref::entry::Entry as DashMapEntry; +use dashmap::DashMap; +use rskafka::client::partition::{PartitionClient, UnknownTopicHandling}; +use rskafka::client::producer::aggregator::RecordAggregator; +use rskafka::client::producer::{BatchProducer, BatchProducerBuilder}; +use rskafka::client::{Client as RsKafkaClient, ClientBuilder}; +use rskafka::BackoffConfig; +use snafu::ResultExt; + +use crate::error::{BuildClientSnafu, BuildPartitionClientSnafu, Result}; + +// Each topic only has one partition for now. +// The `DEFAULT_PARTITION` refers to the index of the partition. +const DEFAULT_PARTITION: i32 = 0; + +/// Arc wrapper of Client. +pub type ClientRef = Arc; +/// Arc wrapper of ClientManager. +pub type ClientManagerRef = Arc; + +/// A client through which to contact Kafka cluster. 
Each client associates with one partition of a topic. +/// Since a topic only has one partition in our design, the mapping between clients and topics are one-one. +#[derive(Debug)] +pub struct Client { + /// A raw client used to construct a batch producer and/or a stream consumer for a specific topic. + pub raw_client: Arc, + /// A producer used to buffer log entries for a specific topic before sending them in a batching manner. + pub producer: Arc>, +} + +impl Client { + /// Creates a Client from the raw client. + pub fn new(raw_client: Arc, config: &KafkaConfig) -> Self { + let record_aggregator = RecordAggregator::new(config.max_batch_size.as_bytes() as usize); + let batch_producer = BatchProducerBuilder::new(raw_client.clone()) + .with_compression(config.compression) + .with_linger(config.linger) + .build(record_aggregator); + + Self { + raw_client, + producer: Arc::new(batch_producer), + } + } +} + +/// Manages client construction and accesses. +#[derive(Debug)] +pub struct ClientManager { + config: KafkaConfig, + /// Top-level client in rskafka. All clients are constructed by this client. + client_factory: RsKafkaClient, + /// A pool maintaining a collection of clients. + /// Key: a topic. Value: the associated client of the topic. + client_pool: DashMap, +} + +impl ClientManager { + /// Tries to create a ClientManager. + pub async fn try_new(config: &KafkaConfig) -> Result { + // Sets backoff config for the top-level rskafka client and all clients constructed by it. + let backoff_config = BackoffConfig { + init_backoff: Duration::from_millis(500), + max_backoff: Duration::from_secs(10), + base: 2., + // Stop reconnecting if the total wait time reaches the deadline. + deadline: Some(Duration::from_secs(60 * 5)), + }; + let client = ClientBuilder::new(config.broker_endpoints.clone()) + .backoff_config(backoff_config) + .build() + .await + .with_context(|_| BuildClientSnafu { + broker_endpoints: config.broker_endpoints.clone(), + })?; + + Ok(Self { + config: config.clone(), + client_factory: client, + client_pool: DashMap::with_capacity(config.num_topics), + }) + } + + /// Gets the client associated with the topic. If the client does not exist, a new one will + /// be created and returned. + pub async fn get_or_insert(&self, topic: &Topic) -> Result { + match self.client_pool.entry(topic.to_string()) { + DashMapEntry::Occupied(entry) => Ok(entry.get().clone()), + DashMapEntry::Vacant(entry) => { + let topic_client = self.try_create_client(topic).await?; + Ok(entry.insert(topic_client).clone()) + } + } + } + + async fn try_create_client(&self, topic: &Topic) -> Result { + // Sets to Retry to retry connecting if the kafka cluter replies with an UnknownTopic error. + // That's because the topic is believed to exist as the metasrv is expected to create required topics upon start. + // The reconnecting won't stop until succeed or a different error returns. 
+ let raw_client = self + .client_factory + .partition_client(topic, DEFAULT_PARTITION, UnknownTopicHandling::Retry) + .await + .context(BuildPartitionClientSnafu { + topic, + partition: DEFAULT_PARTITION, + }) + .map(Arc::new)?; + + Ok(Arc::new(Client::new(raw_client, &self.config))) + } +} From ea3a07724f0daf66fdcacb7f76cbc5de3bc4848b Mon Sep 17 00:00:00 2001 From: niebayes Date: Thu, 21 Dec 2023 11:39:35 +0800 Subject: [PATCH 02/22] chore: add errors for client manager --- src/log-store/src/error.rs | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs index a24a48d84be2..981f8b96d7aa 100644 --- a/src/log-store/src/error.rs +++ b/src/log-store/src/error.rs @@ -84,6 +84,30 @@ pub enum Error { attempt_index: u64, location: Location, }, + + #[snafu(display( + "Failed to build a rskafka client, broker endpoints: {:?}", + broker_endpoints + ))] + BuildClient { + broker_endpoints: Vec, + location: Location, + #[snafu(source)] + error: rskafka::client::error::Error, + }, + + #[snafu(display( + "Failed to build a rskafka partition client, topic: {}, partition: {}", + topic, + partition + ))] + BuildPartitionClient { + topic: String, + partition: i32, + location: Location, + #[snafu(source)] + error: rskafka::client::error::Error, + }, } impl ErrorExt for Error { From 071b9742958604047720cb97e48a7753f09a7fb0 Mon Sep 17 00:00:00 2001 From: niebayes Date: Thu, 21 Dec 2023 15:22:53 +0800 Subject: [PATCH 03/22] chore: add record utils --- Cargo.lock | 1 + src/log-store/Cargo.toml | 1 + src/log-store/src/error.rs | 38 ++++++++++++ src/log-store/src/kafka.rs | 1 + src/log-store/src/kafka/client_manager.rs | 18 +++--- src/log-store/src/kafka/record_utils.rs | 73 +++++++++++++++++++++++ 6 files changed, 123 insertions(+), 9 deletions(-) create mode 100644 src/log-store/src/kafka/record_utils.rs diff --git a/Cargo.lock b/Cargo.lock index 519af260d3f6..10ca536c4c6f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4480,6 +4480,7 @@ dependencies = [ "raft-engine", "rand", "rskafka", + "serde", "serde_json", "snafu", "store-api", diff --git a/src/log-store/Cargo.toml b/src/log-store/Cargo.toml index 1cb6f5ebc822..953398b6d73d 100644 --- a/src/log-store/Cargo.toml +++ b/src/log-store/Cargo.toml @@ -27,6 +27,7 @@ futures.workspace = true protobuf = { version = "2", features = ["bytes"] } raft-engine.workspace = true rskafka = "0.5" +serde.workspace = true serde_json.workspace = true snafu.workspace = true store-api.workspace = true diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs index 981f8b96d7aa..3bfb98885395 100644 --- a/src/log-store/src/error.rs +++ b/src/log-store/src/error.rs @@ -13,11 +13,17 @@ // limitations under the License. 
use std::any::Any; +use std::num::TryFromIntError; +use common_config::wal::KafkaWalTopic; use common_error::ext::ErrorExt; use common_macro::stack_trace_debug; use common_runtime::error::Error as RuntimeError; use snafu::{Location, Snafu}; +use store_api::logstore::entry::Id as EntryId; +use store_api::logstore::namespace::Id as NamespaceId; + +use crate::kafka::record_utils::RecordKey; #[derive(Snafu)] #[snafu(visibility(pub))] @@ -108,6 +114,38 @@ pub enum Error { #[snafu(source)] error: rskafka::client::error::Error, }, + + #[snafu(display( + "Failed to get a Kafka topic client, topic: {}, source: {}", + topic, + error + ))] + GetClient { + topic: KafkaWalTopic, + location: Location, + error: String, + }, + + #[snafu(display("Failed to encode a record key, key: {:?}", key))] + EncodeKey { + key: RecordKey, + location: Location, + #[snafu(source)] + error: serde_json::Error, + }, + + #[snafu(display("Failed to decode a record key"))] + DecodeKey { + location: Location, + #[snafu(source)] + error: serde_json::Error, + }, + + #[snafu(display("Missing required key in a record"))] + MissingKey { location: Location }, + + #[snafu(display("Missing required value in a record"))] + MissingValue { location: Location }, } impl ErrorExt for Error { diff --git a/src/log-store/src/kafka.rs b/src/log-store/src/kafka.rs index a69fab00943a..373f52624612 100644 --- a/src/log-store/src/kafka.rs +++ b/src/log-store/src/kafka.rs @@ -14,6 +14,7 @@ mod client_manager; pub mod log_store; +pub(crate) mod record_utils; use common_meta::wal::KafkaWalTopic as Topic; use store_api::logstore::entry::{Entry, Id as EntryId}; diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs index 76708aaadcd9..f9859edea942 100644 --- a/src/log-store/src/kafka/client_manager.rs +++ b/src/log-store/src/kafka/client_manager.rs @@ -32,23 +32,23 @@ use crate::error::{BuildClientSnafu, BuildPartitionClientSnafu, Result}; const DEFAULT_PARTITION: i32 = 0; /// Arc wrapper of Client. -pub type ClientRef = Arc; +pub(super) type ClientRef = Arc; /// Arc wrapper of ClientManager. -pub type ClientManagerRef = Arc; +pub(super) type ClientManagerRef = Arc; /// A client through which to contact Kafka cluster. Each client associates with one partition of a topic. /// Since a topic only has one partition in our design, the mapping between clients and topics are one-one. #[derive(Debug)] -pub struct Client { +pub(super) struct Client { /// A raw client used to construct a batch producer and/or a stream consumer for a specific topic. - pub raw_client: Arc, + pub(super) raw_client: Arc, /// A producer used to buffer log entries for a specific topic before sending them in a batching manner. - pub producer: Arc>, + pub(super) producer: Arc>, } impl Client { /// Creates a Client from the raw client. - pub fn new(raw_client: Arc, config: &KafkaConfig) -> Self { + pub(super) fn new(raw_client: Arc, config: &KafkaConfig) -> Self { let record_aggregator = RecordAggregator::new(config.max_batch_size.as_bytes() as usize); let batch_producer = BatchProducerBuilder::new(raw_client.clone()) .with_compression(config.compression) @@ -64,7 +64,7 @@ impl Client { /// Manages client construction and accesses. #[derive(Debug)] -pub struct ClientManager { +pub(super) struct ClientManager { config: KafkaConfig, /// Top-level client in rskafka. All clients are constructed by this client. client_factory: RsKafkaClient, @@ -75,7 +75,7 @@ pub struct ClientManager { impl ClientManager { /// Tries to create a ClientManager. 
- pub async fn try_new(config: &KafkaConfig) -> Result { + pub(super) async fn try_new(config: &KafkaConfig) -> Result { // Sets backoff config for the top-level rskafka client and all clients constructed by it. let backoff_config = BackoffConfig { init_backoff: Duration::from_millis(500), @@ -101,7 +101,7 @@ impl ClientManager { /// Gets the client associated with the topic. If the client does not exist, a new one will /// be created and returned. - pub async fn get_or_insert(&self, topic: &Topic) -> Result { + pub(super) async fn get_or_insert(&self, topic: &Topic) -> Result { match self.client_pool.entry(topic.to_string()) { DashMapEntry::Occupied(entry) => Ok(entry.get().clone()), DashMapEntry::Vacant(entry) => { diff --git a/src/log-store/src/kafka/record_utils.rs b/src/log-store/src/kafka/record_utils.rs new file mode 100644 index 000000000000..5bed03328196 --- /dev/null +++ b/src/log-store/src/kafka/record_utils.rs @@ -0,0 +1,73 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::BTreeMap; + +use common_config::wal::KafkaWalTopic as Topic; +use rskafka::record::Record; +use serde::{Deserialize, Serialize}; +use snafu::{OptionExt, ResultExt}; + +use crate::error::{DecodeKeySnafu, EncodeKeySnafu, MissingKeySnafu, MissingValueSnafu, Result}; +use crate::kafka::{EntryId, EntryImpl, NamespaceImpl}; + +/// The key of a record. +/// An rskafka record consists of key, value, headers, and datetime. The value of a record +/// is the entry data. Either of the key or the headers can be chosen to store the entry metadata +/// including topic, region id, and entry id. Currently, the entry metadata is stored in the key. +#[derive(Debug, Serialize, Deserialize)] +pub(crate) struct RecordKey { + topic: Topic, + region_id: u64, + entry_id: EntryId, +} + +// When writing to a region, a wal entry is constructed from all mutations on the region. +// I.e., a wal entry is itself a log batch and hence no need to group multiple entries into a record. +// That's why the mapping between entries and records are one-one. 
+impl TryInto for EntryImpl { + type Error = crate::error::Error; + + fn try_into(self) -> Result { + let key = RecordKey { + topic: self.ns.topic, + region_id: self.ns.region_id, + entry_id: self.id, + }; + let raw_key = serde_json::to_vec(&key).context(EncodeKeySnafu { key })?; + + Ok(Record { + key: Some(raw_key), + value: Some(self.data), + headers: BTreeMap::default(), + timestamp: rskafka::chrono::Utc::now(), + }) + } +} + +impl TryFrom for EntryImpl { + type Error = crate::error::Error; + + fn try_from(record: Record) -> Result { + let raw_key = record.key.context(MissingKeySnafu)?; + let key: RecordKey = serde_json::from_slice(&raw_key).context(DecodeKeySnafu)?; + let data = record.value.context(MissingValueSnafu)?; + + Ok(Self { + id: key.entry_id, + ns: NamespaceImpl::new(key.region_id, key.topic), + data, + }) + } +} From 3b779b99da6124a590df428a5af80545fa6e8937 Mon Sep 17 00:00:00 2001 From: niebayes Date: Thu, 21 Dec 2023 17:27:02 +0800 Subject: [PATCH 04/22] chore: impl kafka log store --- src/log-store/src/error.rs | 56 ++++++- src/log-store/src/kafka.rs | 2 +- src/log-store/src/kafka/log_store.rs | 211 +++++++++++++++++++++--- src/log-store/src/kafka/record_utils.rs | 12 +- src/store-api/src/logstore.rs | 4 +- 5 files changed, 255 insertions(+), 30 deletions(-) diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs index 3bfb98885395..4484238b90d2 100644 --- a/src/log-store/src/error.rs +++ b/src/log-store/src/error.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::any::Any; -use std::num::TryFromIntError; use common_config::wal::KafkaWalTopic; use common_error::ext::ErrorExt; @@ -21,9 +20,6 @@ use common_macro::stack_trace_debug; use common_runtime::error::Error as RuntimeError; use snafu::{Location, Snafu}; use store_api::logstore::entry::Id as EntryId; -use store_api::logstore::namespace::Id as NamespaceId; - -use crate::kafka::record_utils::RecordKey; #[derive(Snafu)] #[snafu(visibility(pub))] @@ -126,9 +122,9 @@ pub enum Error { error: String, }, - #[snafu(display("Failed to encode a record key, key: {:?}", key))] + #[snafu(display("Failed to encode a record key, key: {}", key))] EncodeKey { - key: RecordKey, + key: String, location: Location, #[snafu(source)] error: serde_json::Error, @@ -146,6 +142,54 @@ pub enum Error { #[snafu(display("Missing required value in a record"))] MissingValue { location: Location }, + + #[snafu(display( + "Missing required entry offset, entry_id: {}, region_id: {}, topic: {}", + entry_id, + region_id, + topic + ))] + MissingOffset { + entry_id: EntryId, + region_id: u64, + topic: KafkaWalTopic, + location: Location, + }, + + #[snafu(display("Failed to produce entries to Kafka, topic: {}", topic))] + ProduceEntries { + topic: KafkaWalTopic, + location: Location, + #[snafu(source)] + error: rskafka::client::producer::Error, + }, + + #[snafu(display("The produce tasks return an empty offset vector, topic: {}", topic))] + EmptyOffsets { + topic: KafkaWalTopic, + location: Location, + }, + + #[snafu(display( + "Failed to cast an rskafka offset to entry offset, rskafka_offset: {}", + offset + ))] + CastOffset { offset: i64, location: Location }, + + #[snafu(display( + "Failed to read a record from Kafka, offset {}, topic: {}, region id: {}", + offset, + topic, + region_id, + ))] + ConsumeRecord { + offset: i64, + topic: String, + region_id: u64, + location: Location, + #[snafu(source)] + error: rskafka::client::error::Error, + }, } impl ErrorExt for Error { diff --git a/src/log-store/src/kafka.rs 
b/src/log-store/src/kafka.rs index 373f52624612..0a2506d9728b 100644 --- a/src/log-store/src/kafka.rs +++ b/src/log-store/src/kafka.rs @@ -14,7 +14,7 @@ mod client_manager; pub mod log_store; -pub(crate) mod record_utils; +mod record_utils; use common_meta::wal::KafkaWalTopic as Topic; use store_api::logstore::entry::{Entry, Id as EntryId}; diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs index 5aa2d9ec7d52..83f68964b897 100644 --- a/src/log-store/src/kafka/log_store.rs +++ b/src/log-store/src/kafka/log_store.rs @@ -13,22 +13,96 @@ // limitations under the License. use std::collections::HashMap; +use std::sync::Arc; -use common_config::wal::{KafkaConfig, WalOptions}; -use store_api::logstore::entry::Id as EntryId; +use common_config::wal::{KafkaConfig, KafkaWalOptions, KafkaWalTopic as Topic, WalOptions}; +use futures_util::StreamExt; +use rskafka::client::consumer::{StartOffset, StreamConsumerBuilder}; +use rskafka::record::{Record, RecordAndOffset}; +use snafu::{OptionExt, ResultExt}; +use store_api::logstore::entry::{Id as EntryId, Offset as EntryOffset}; use store_api::logstore::entry_stream::SendableEntryStream; use store_api::logstore::namespace::Id as NamespaceId; use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore}; -use crate::error::{Error, Result}; +use crate::error::{ + CastOffsetSnafu, ConsumeRecordSnafu, EmptyOffsetsSnafu, Error, GetClientSnafu, + MissingOffsetSnafu, ProduceEntriesSnafu, Result, +}; +use crate::kafka::client_manager::{ClientManager, ClientManagerRef}; use crate::kafka::{EntryImpl, NamespaceImpl}; +type ConsumeResult = std::result::Result<(RecordAndOffset, i64), rskafka::client::error::Error>; + +// TODO(niebayes): implement building log store upon start. + +/// A log store backed by Kafka. #[derive(Debug)] -pub struct KafkaLogStore; +pub struct KafkaLogStore { + config: KafkaConfig, + /// Manages rskafka clients through which the log store contact the Kafka cluster. + client_manager: ClientManagerRef, +} impl KafkaLogStore { - pub async fn try_new(config: KafkaConfig) -> Result { - todo!() + /// Tries to create a Kafka log store. + pub async fn try_new(config: &KafkaConfig) -> Result { + Ok(Self { + client_manager: Arc::new(ClientManager::try_new(config).await?), + config: config.clone(), + }) + } + + /// Appends a batch of entries to a topic. The entries may come from multiple regions. + /// Returns a tuple where the first element is the topic while the second is the minimum + /// start offset of the entries appended to the topic. + async fn append_batch_to_topic( + &self, + entries: Vec, + topic: Topic, + ) -> Result<(Topic, EntryOffset)> { + // Safety: the caller ensures the input entries is not empty. + assert!(!entries.is_empty()); + + let region_ids = entries + .iter() + .map(|entry| entry.ns.region_id) + .collect::>(); + + // Gets the client associated with the topic. + let client = self + .client_manager + .get_or_insert(&topic) + .await + .map_err(|e| { + GetClientSnafu { + topic: &topic, + error: e.to_string(), + } + .build() + })?; + + // Convert entries to records and produce them to Kafka. + let mut tasks = Vec::with_capacity(entries.len()); + for entry in entries { + let record: Record = entry.try_into()?; + let task = client.producer.produce(record); + tasks.push(task); + } + // Each produce task will return an offset to represent the minimum start offset of the entries produced by the task. 
+ let offsets = futures::future::try_join_all(tasks) + .await + .context(ProduceEntriesSnafu { topic: &topic })?; + + // Since the task completion order is not deterministic, a `min` operation is required to find the minimum offset. + let min_offset = offsets + .into_iter() + .min() + .context(EmptyOffsetsSnafu { topic: &topic })?; + let min_offset: EntryOffset = min_offset + .try_into() + .map_err(|_| CastOffsetSnafu { offset: min_offset }.build())?; + Ok((topic, min_offset)) } } @@ -38,7 +112,7 @@ impl LogStore for KafkaLogStore { type Entry = EntryImpl; type Namespace = NamespaceImpl; - /// Create an entry of the associate Entry type. + /// Creates an entry of the associated Entry type. fn entry>( &self, data: D, @@ -48,33 +122,103 @@ impl LogStore for KafkaLogStore { EntryImpl::new(data.as_ref().to_vec(), entry_id, ns) } - /// Append an `Entry` to WAL with given namespace and return append response containing - /// the entry id. + /// Appends an entry to the log store and returns a response containing the entry id and an optional entry offset. async fn append(&self, entry: Self::Entry) -> Result { - todo!() + let entry_id = entry.id; + let region_id = entry.ns.region_id; + let topic = entry.ns.topic.clone(); + + let offset = self.append_batch_to_topic(vec![entry], topic).await?.1; + Ok(AppendResponse { + entry_id, + offset: Some(offset), + }) } - /// For a batch of log entries belonging to multiple regions, each assigned to a specific topic, - /// we need to determine the minimum log offset returned for each region in this batch. - /// During replay, we use this offset to fetch log entries for a region from its assigned topic. - /// After fetching, we filter the entries to obtain log entries relevant to that specific region. + /// For a batch of entries belonging to multiple regions, each assigned to a specific topic, + /// we need to determine the minimum start offset returned for each region in this batch. + /// During replay a region, we use this offset to fetch entries for the region from its assigned topic. + /// After fetching, we filter the entries to obtain entries relevant to the region. async fn append_batch(&self, entries: Vec) -> Result { - todo!() + if entries.is_empty() { + return Ok(AppendBatchResponse::default()); + } + + // The entries are grouped by topic since the number of regions might be very large + // while the number of topics are well controlled, + let mut topic_entries: HashMap<_, Vec<_>> = HashMap::new(); + // A utility map used to construct the result response. + let mut topic_regions: HashMap<_, Vec<_>> = HashMap::new(); + for entry in entries { + let topic = entry.ns.topic.clone(); + let region_id = entry.ns.region_id; + topic_entries.entry(topic.clone()).or_default().push(entry); + topic_regions.entry(topic).or_default().push(region_id); + } + + // Appends each group of entries to the corresponding topic. + let tasks = topic_entries + .into_iter() + .map(|(topic, entries)| self.append_batch_to_topic(entries, topic)) + .collect::>(); + let topic_offset: HashMap<_, _> = futures::future::try_join_all(tasks) + .await? + .into_iter() + .collect(); + + let mut region_offset = HashMap::new(); + for (topic, regions) in topic_regions { + let offset = topic_offset[&topic]; + regions.into_iter().for_each(|region| { + region_offset.insert(region, offset); + }); + } + Ok(AppendBatchResponse { + offsets: region_offset, + }) } - /// Create a new `EntryStream` to asynchronously generates `Entry` with ids - /// starting from `id`. 
The generated entries will be filtered by the namespace. + /// Creates a new `EntryStream` to asynchronously generates `Entry` with entry ids + /// starting from `entry_id`. The generated entries will be filtered by the namespace. async fn read( &self, ns: &Self::Namespace, entry_id: EntryId, ) -> Result> { - todo!() + let topic = ns.topic.clone(); + let region_id = ns.region_id; + let offset = try_get_offset(entry_id)?; + + let raw_client = self + .client_manager + .get_or_insert(&topic) + .await? + .raw_client + .clone(); + // Reads the entries starting from exactly the specified offset. + let mut stream_consumer = StreamConsumerBuilder::new(raw_client, StartOffset::At(offset)) + .with_max_batch_size(self.config.max_batch_size.as_bytes() as i32) + .with_max_wait_ms(self.config.max_wait_time.as_millis() as i32) + .build(); + let stream = async_stream::stream!({ + while let Some(consume_result) = stream_consumer.next().await { + yield handle_consume_result(consume_result, &topic, region_id, offset); + } + }); + Ok(Box::pin(stream)) } /// Create a namespace of the associate Namespace type fn namespace(&self, ns_id: NamespaceId, wal_options: &WalOptions) -> Self::Namespace { - todo!() + // TODO(niebayes): we need to add necessary validations on wal options and wal config to ensure they're consistent. + // With the validations added, we can safely use unreachable here. + let WalOptions::Kafka(kafka_options) = wal_options else { + unreachable!() + }; + NamespaceImpl { + region_id: ns_id, + topic: kafka_options.topic.clone(), + } } /// Create a new `Namespace`. @@ -104,3 +248,32 @@ impl LogStore for KafkaLogStore { Ok(()) } } + +// Tries to get the physical offset of the entry with the given entry id. +fn try_get_offset(entry_id: EntryId) -> Result { + todo!() +} + +fn handle_consume_result( + result: ConsumeResult, + topic: &Topic, + region_id: u64, + offset: i64, +) -> Result> { + match result { + Ok((record_and_offset, _)) => { + let entry = EntryImpl::try_from(record_and_offset.record)?; + // Only produces entries belonging to the region with the given region id. + if entry.ns.region_id == region_id { + Ok(vec![entry]) + } else { + Ok(vec![]) + } + } + Err(e) => Err(e).context(ConsumeRecordSnafu { + offset, + topic, + region_id, + }), + } +} diff --git a/src/log-store/src/kafka/record_utils.rs b/src/log-store/src/kafka/record_utils.rs index 5bed03328196..0a1b59595671 100644 --- a/src/log-store/src/kafka/record_utils.rs +++ b/src/log-store/src/kafka/record_utils.rs @@ -27,12 +27,18 @@ use crate::kafka::{EntryId, EntryImpl, NamespaceImpl}; /// is the entry data. Either of the key or the headers can be chosen to store the entry metadata /// including topic, region id, and entry id. Currently, the entry metadata is stored in the key. #[derive(Debug, Serialize, Deserialize)] -pub(crate) struct RecordKey { +struct RecordKey { topic: Topic, region_id: u64, entry_id: EntryId, } +impl ToString for RecordKey { + fn to_string(&self) -> String { + format!("{}/{}/{}", self.topic, self.region_id, self.entry_id) + } +} + // When writing to a region, a wal entry is constructed from all mutations on the region. // I.e., a wal entry is itself a log batch and hence no need to group multiple entries into a record. // That's why the mapping between entries and records are one-one. 
@@ -45,7 +51,9 @@ impl TryInto for EntryImpl { region_id: self.ns.region_id, entry_id: self.id, }; - let raw_key = serde_json::to_vec(&key).context(EncodeKeySnafu { key })?; + let raw_key = serde_json::to_vec(&key).context(EncodeKeySnafu { + key: key.to_string(), + })?; Ok(Record { key: Some(raw_key), diff --git a/src/store-api/src/logstore.rs b/src/store-api/src/logstore.rs index 3fb81d9a624c..e63b230f201f 100644 --- a/src/store-api/src/logstore.rs +++ b/src/store-api/src/logstore.rs @@ -82,9 +82,9 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug { /// The response of an `append` operation. #[derive(Debug)] pub struct AppendResponse { - /// The entry id of the appended log entry. + /// The logical id of the appended log entry. pub entry_id: EntryId, - /// The start entry offset of the appended log entry. + /// The physical start offset of the appended log entry. /// Depends on the `LogStore` implementation, the entry offset may be missing. pub offset: Option, } From eb597ce8ddc2ec42745c923dcb020346a6f106ff Mon Sep 17 00:00:00 2001 From: niebayes Date: Thu, 21 Dec 2023 17:33:49 +0800 Subject: [PATCH 05/22] chore: build kafka log store upon starting datanode --- src/datanode/src/datanode.rs | 7 +++++-- src/log-store/src/kafka/log_store.rs | 2 -- src/store-api/src/logstore.rs | 1 + 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 604146a5d4a3..cb672a1ca064 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -493,8 +493,11 @@ impl DatanodeBuilder { /// Builds [KafkaLogStore]. async fn build_kafka_log_store(config: &KafkaConfig) -> Result> { - let _ = config; - todo!() + KafkaLogStore::try_new(config) + .await + .map_err(Box::new) + .context(OpenLogStoreSnafu) + .map(Arc::new) } /// Builds [ObjectStoreManager] diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs index 83f68964b897..3a409873e9f7 100644 --- a/src/log-store/src/kafka/log_store.rs +++ b/src/log-store/src/kafka/log_store.rs @@ -34,8 +34,6 @@ use crate::kafka::{EntryImpl, NamespaceImpl}; type ConsumeResult = std::result::Result<(RecordAndOffset, i64), rskafka::client::error::Error>; -// TODO(niebayes): implement building log store upon start. - /// A log store backed by Kafka. #[derive(Debug)] pub struct KafkaLogStore { diff --git a/src/store-api/src/logstore.rs b/src/store-api/src/logstore.rs index e63b230f201f..6d124a499bb5 100644 --- a/src/store-api/src/logstore.rs +++ b/src/store-api/src/logstore.rs @@ -94,5 +94,6 @@ pub struct AppendResponse { pub struct AppendBatchResponse { /// Key: region id (as u64). Value: the known minimum start offset of the appended log entries belonging to the region. /// Depends on the `LogStore` implementation, the entry offsets may be missing. + // TODO(niebayes): the offset seems shouldn't be exposed to users of wal. But for now, let's keep it. 
pub offsets: HashMap, } From eed0def0915cbb13d3d68019cbb8eda1e43d99ed Mon Sep 17 00:00:00 2001 From: niebayes Date: Thu, 21 Dec 2023 17:39:52 +0800 Subject: [PATCH 06/22] chore: update comments for kafka log store --- src/log-store/src/kafka/log_store.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs index 3a409873e9f7..9aab4dd207c6 100644 --- a/src/log-store/src/kafka/log_store.rs +++ b/src/log-store/src/kafka/log_store.rs @@ -208,8 +208,9 @@ impl LogStore for KafkaLogStore { /// Create a namespace of the associate Namespace type fn namespace(&self, ns_id: NamespaceId, wal_options: &WalOptions) -> Self::Namespace { - // TODO(niebayes): we need to add necessary validations on wal options and wal config to ensure they're consistent. - // With the validations added, we can safely use unreachable here. + // Warning: we assume the database manager, not the database itself, is responsible for ensuring that + // the wal config for metasrv and that for datanode are consistent, i.e. the wal provider should be identical. + // With such an assumption, the unreachable is safe here. let WalOptions::Kafka(kafka_options) = wal_options else { unreachable!() }; From 047941ba8eb59ddf988b5df284617f9a6e4423c2 Mon Sep 17 00:00:00 2001 From: niebayes Date: Thu, 21 Dec 2023 17:45:14 +0800 Subject: [PATCH 07/22] chore: add a todo for getting entry offset --- src/log-store/src/kafka/log_store.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs index 9aab4dd207c6..d870c8cf89bd 100644 --- a/src/log-store/src/kafka/log_store.rs +++ b/src/log-store/src/kafka/log_store.rs @@ -248,7 +248,15 @@ impl LogStore for KafkaLogStore { } } -// Tries to get the physical offset of the entry with the given entry id. +/// Tries to get the physical offset of the entry with the given entry id. +// TODO(niebayes): a mapping between entry id and entry offset needs to maintained at somewhere. +// One solution is to store the mapping at a specific Kafka topic. Each time the mapping is updated, +// a new record is constructed and appended to the topic. On initializing the log store, the latest +// record is pulled from Kafka cluster. +// +// Another solution is to store the mapping at the kv backend. We design a dedicated key, e.g. ID_TO_OFFSET_MAP_KEY. +// Each time the mapping is updated, the map is serialized into a vector of bytes and stored into the kv backend at the given key. +// On initializing the log store, the map is deserialized from the kv backend. 
fn try_get_offset(entry_id: EntryId) -> Result { todo!() } From 27181d65c744e4a0404f5ed358924ab37b3d1fe5 Mon Sep 17 00:00:00 2001 From: niebayes Date: Thu, 21 Dec 2023 17:57:00 +0800 Subject: [PATCH 08/22] fix: typo --- src/log-store/src/error.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs index 4484238b90d2..6fcc38687d42 100644 --- a/src/log-store/src/error.rs +++ b/src/log-store/src/error.rs @@ -88,7 +88,7 @@ pub enum Error { }, #[snafu(display( - "Failed to build a rskafka client, broker endpoints: {:?}", + "Failed to build an rskafka client, broker endpoints: {:?}", broker_endpoints ))] BuildClient { @@ -99,7 +99,7 @@ pub enum Error { }, #[snafu(display( - "Failed to build a rskafka partition client, topic: {}, partition: {}", + "Failed to build an rskafka partition client, topic: {}, partition: {}", topic, partition ))] From 2260cf9f55374820684dfba4cffb42c821f383f0 Mon Sep 17 00:00:00 2001 From: niebayes Date: Thu, 21 Dec 2023 18:05:58 +0800 Subject: [PATCH 09/22] chore: remove unused --- src/log-store/src/error.rs | 14 -------------- src/log-store/src/kafka.rs | 8 -------- src/log-store/src/kafka/client_manager.rs | 2 +- src/log-store/src/kafka/log_store.rs | 11 +++-------- src/log-store/src/lib.rs | 1 - 5 files changed, 4 insertions(+), 32 deletions(-) diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs index 6fcc38687d42..841cbe21a7b4 100644 --- a/src/log-store/src/error.rs +++ b/src/log-store/src/error.rs @@ -19,7 +19,6 @@ use common_error::ext::ErrorExt; use common_macro::stack_trace_debug; use common_runtime::error::Error as RuntimeError; use snafu::{Location, Snafu}; -use store_api::logstore::entry::Id as EntryId; #[derive(Snafu)] #[snafu(visibility(pub))] @@ -143,19 +142,6 @@ pub enum Error { #[snafu(display("Missing required value in a record"))] MissingValue { location: Location }, - #[snafu(display( - "Missing required entry offset, entry_id: {}, region_id: {}, topic: {}", - entry_id, - region_id, - topic - ))] - MissingOffset { - entry_id: EntryId, - region_id: u64, - topic: KafkaWalTopic, - location: Location, - }, - #[snafu(display("Failed to produce entries to Kafka, topic: {}", topic))] ProduceEntries { topic: KafkaWalTopic, diff --git a/src/log-store/src/kafka.rs b/src/log-store/src/kafka.rs index 0a2506d9728b..fe7d001d70a5 100644 --- a/src/log-store/src/kafka.rs +++ b/src/log-store/src/kafka.rs @@ -33,14 +33,6 @@ impl NamespaceImpl { fn new(region_id: u64, topic: Topic) -> Self { Self { region_id, topic } } - - fn region_id(&self) -> u64 { - self.region_id - } - - fn topic(&self) -> &Topic { - &self.topic - } } impl Namespace for NamespaceImpl { diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs index f9859edea942..74eea2538d3b 100644 --- a/src/log-store/src/kafka/client_manager.rs +++ b/src/log-store/src/kafka/client_manager.rs @@ -15,7 +15,7 @@ use std::sync::Arc; use std::time::Duration; -use common_config::wal::{KafkaConfig, KafkaWalOptions, KafkaWalTopic as Topic}; +use common_config::wal::{KafkaConfig, KafkaWalTopic as Topic}; use dashmap::mapref::entry::Entry as DashMapEntry; use dashmap::DashMap; use rskafka::client::partition::{PartitionClient, UnknownTopicHandling}; diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs index d870c8cf89bd..21f650d0fc23 100644 --- a/src/log-store/src/kafka/log_store.rs +++ b/src/log-store/src/kafka/log_store.rs @@ -15,7 +15,7 @@ use 
std::collections::HashMap; use std::sync::Arc; -use common_config::wal::{KafkaConfig, KafkaWalOptions, KafkaWalTopic as Topic, WalOptions}; +use common_config::wal::{KafkaConfig, KafkaWalTopic as Topic, WalOptions}; use futures_util::StreamExt; use rskafka::client::consumer::{StartOffset, StreamConsumerBuilder}; use rskafka::record::{Record, RecordAndOffset}; @@ -27,7 +27,7 @@ use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore}; use crate::error::{ CastOffsetSnafu, ConsumeRecordSnafu, EmptyOffsetsSnafu, Error, GetClientSnafu, - MissingOffsetSnafu, ProduceEntriesSnafu, Result, + ProduceEntriesSnafu, Result, }; use crate::kafka::client_manager::{ClientManager, ClientManagerRef}; use crate::kafka::{EntryImpl, NamespaceImpl}; @@ -62,11 +62,6 @@ impl KafkaLogStore { // Safety: the caller ensures the input entries is not empty. assert!(!entries.is_empty()); - let region_ids = entries - .iter() - .map(|entry| entry.ns.region_id) - .collect::>(); - // Gets the client associated with the topic. let client = self .client_manager @@ -123,7 +118,6 @@ impl LogStore for KafkaLogStore { /// Appends an entry to the log store and returns a response containing the entry id and an optional entry offset. async fn append(&self, entry: Self::Entry) -> Result { let entry_id = entry.id; - let region_id = entry.ns.region_id; let topic = entry.ns.topic.clone(); let offset = self.append_batch_to_topic(vec![entry], topic).await?.1; @@ -258,6 +252,7 @@ impl LogStore for KafkaLogStore { // Each time the mapping is updated, the map is serialized into a vector of bytes and stored into the kv backend at the given key. // On initializing the log store, the map is deserialized from the kv backend. fn try_get_offset(entry_id: EntryId) -> Result { + let _ = entry_id; todo!() } diff --git a/src/log-store/src/lib.rs b/src/log-store/src/lib.rs index 901a202f2a48..7c57d6d2e766 100644 --- a/src/log-store/src/lib.rs +++ b/src/log-store/src/lib.rs @@ -15,7 +15,6 @@ #![feature(let_chains)] pub mod error; -#[allow(unused)] pub mod kafka; mod noop; pub mod raft_engine; From 66f7d9c4390a75eac320867d4bb1dbf25685a176 Mon Sep 17 00:00:00 2001 From: niebayes Date: Thu, 21 Dec 2023 19:26:24 +0800 Subject: [PATCH 10/22] chore: update comments --- src/log-store/src/kafka/log_store.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs index 21f650d0fc23..83e2c5144a9c 100644 --- a/src/log-store/src/kafka/log_store.rs +++ b/src/log-store/src/kafka/log_store.rs @@ -202,8 +202,8 @@ impl LogStore for KafkaLogStore { /// Create a namespace of the associate Namespace type fn namespace(&self, ns_id: NamespaceId, wal_options: &WalOptions) -> Self::Namespace { - // Warning: we assume the database manager, not the database itself, is responsible for ensuring that - // the wal config for metasrv and that for datanode are consistent, i.e. the wal provider should be identical. + // Safety: we assume the database administrator, not the database itself, is responsible for ensuring that + // the wal config for metasrv and that for datanode are consistent, i.e. their wal providers should be identical. // With such an assumption, the unreachable is safe here. let WalOptions::Kafka(kafka_options) = wal_options else { unreachable!() @@ -248,9 +248,13 @@ impl LogStore for KafkaLogStore { // a new record is constructed and appended to the topic. On initializing the log store, the latest // record is pulled from Kafka cluster. 
// -// Another solution is to store the mapping at the kv backend. We design a dedicated key, e.g. ID_TO_OFFSET_MAP_KEY. +// The second solution is to store the mapping at the kv backend. We design a dedicated key, e.g. ID_TO_OFFSET_MAP_KEY. // Each time the mapping is updated, the map is serialized into a vector of bytes and stored into the kv backend at the given key. // On initializing the log store, the map is deserialized from the kv backend. +// +// The third solution is to store the offset for each region separately. More specifically, when flushed, the +// latest entry offset is stored in the RegionManifest and then persisted into the manifest file. On openning +// a region, the offset is restored and maintained at somewhere in memory. fn try_get_offset(entry_id: EntryId) -> Result { let _ = entry_id; todo!() From 45ac5181f3f6575d37b6566037f20821339014f6 Mon Sep 17 00:00:00 2001 From: niebayes Date: Thu, 21 Dec 2023 19:28:58 +0800 Subject: [PATCH 11/22] fix: typo --- src/log-store/src/kafka/log_store.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs index 83e2c5144a9c..7fce5c9373a2 100644 --- a/src/log-store/src/kafka/log_store.rs +++ b/src/log-store/src/kafka/log_store.rs @@ -253,7 +253,7 @@ impl LogStore for KafkaLogStore { // On initializing the log store, the map is deserialized from the kv backend. // // The third solution is to store the offset for each region separately. More specifically, when flushed, the -// latest entry offset is stored in the RegionManifest and then persisted into the manifest file. On openning +// latest entry offset is stored in the RegionManifest and then persisted into the manifest file. On opening // a region, the offset is restored and maintained at somewhere in memory. fn try_get_offset(entry_id: EntryId) -> Result { let _ = entry_id; From 8a5dab8226c841778027189944485d931f7687e2 Mon Sep 17 00:00:00 2001 From: niebayes Date: Thu, 21 Dec 2023 19:44:04 +0800 Subject: [PATCH 12/22] fix: resolve some review conversations --- src/log-store/src/kafka/client_manager.rs | 18 +++++++++--------- src/log-store/src/kafka/log_store.rs | 7 +++---- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs index 74eea2538d3b..d979c0a838e8 100644 --- a/src/log-store/src/kafka/client_manager.rs +++ b/src/log-store/src/kafka/client_manager.rs @@ -32,23 +32,23 @@ use crate::error::{BuildClientSnafu, BuildPartitionClientSnafu, Result}; const DEFAULT_PARTITION: i32 = 0; /// Arc wrapper of Client. -pub(super) type ClientRef = Arc; +pub(crate) type ClientRef = Arc; /// Arc wrapper of ClientManager. -pub(super) type ClientManagerRef = Arc; +pub(crate) type ClientManagerRef = Arc; /// A client through which to contact Kafka cluster. Each client associates with one partition of a topic. /// Since a topic only has one partition in our design, the mapping between clients and topics are one-one. #[derive(Debug)] -pub(super) struct Client { +pub(crate) struct Client { /// A raw client used to construct a batch producer and/or a stream consumer for a specific topic. - pub(super) raw_client: Arc, + pub(crate) raw_client: Arc, /// A producer used to buffer log entries for a specific topic before sending them in a batching manner. - pub(super) producer: Arc>, + pub(crate) producer: Arc>, } impl Client { /// Creates a Client from the raw client. 
- pub(super) fn new(raw_client: Arc, config: &KafkaConfig) -> Self { + pub(crate) fn new(raw_client: Arc, config: &KafkaConfig) -> Self { let record_aggregator = RecordAggregator::new(config.max_batch_size.as_bytes() as usize); let batch_producer = BatchProducerBuilder::new(raw_client.clone()) .with_compression(config.compression) @@ -64,7 +64,7 @@ impl Client { /// Manages client construction and accesses. #[derive(Debug)] -pub(super) struct ClientManager { +pub(crate) struct ClientManager { config: KafkaConfig, /// Top-level client in rskafka. All clients are constructed by this client. client_factory: RsKafkaClient, @@ -75,7 +75,7 @@ pub(super) struct ClientManager { impl ClientManager { /// Tries to create a ClientManager. - pub(super) async fn try_new(config: &KafkaConfig) -> Result { + pub(crate) async fn try_new(config: &KafkaConfig) -> Result { // Sets backoff config for the top-level rskafka client and all clients constructed by it. let backoff_config = BackoffConfig { init_backoff: Duration::from_millis(500), @@ -101,7 +101,7 @@ impl ClientManager { /// Gets the client associated with the topic. If the client does not exist, a new one will /// be created and returned. - pub(super) async fn get_or_insert(&self, topic: &Topic) -> Result { + pub(crate) async fn get_or_insert(&self, topic: &Topic) -> Result { match self.client_pool.entry(topic.to_string()) { DashMapEntry::Occupied(entry) => Ok(entry.get().clone()), DashMapEntry::Vacant(entry) => { diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs index 7fce5c9373a2..bba9490c352c 100644 --- a/src/log-store/src/kafka/log_store.rs +++ b/src/log-store/src/kafka/log_store.rs @@ -119,8 +119,7 @@ impl LogStore for KafkaLogStore { async fn append(&self, entry: Self::Entry) -> Result { let entry_id = entry.id; let topic = entry.ns.topic.clone(); - - let offset = self.append_batch_to_topic(vec![entry], topic).await?.1; + let (_, offset) = self.append_batch_to_topic(vec![entry], topic).await?; Ok(AppendResponse { entry_id, offset: Some(offset), @@ -153,10 +152,10 @@ impl LogStore for KafkaLogStore { .into_iter() .map(|(topic, entries)| self.append_batch_to_topic(entries, topic)) .collect::>(); - let topic_offset: HashMap<_, _> = futures::future::try_join_all(tasks) + let topic_offset = futures::future::try_join_all(tasks) .await? 
.into_iter() - .collect(); + .collect::>(); let mut region_offset = HashMap::new(); for (topic, regions) in topic_regions { From bc7938d396938b37f156ec13f02e0dc69ccbd0fb Mon Sep 17 00:00:00 2001 From: niebayes Date: Fri, 22 Dec 2023 16:56:56 +0800 Subject: [PATCH 13/22] chore: move commonly referenced crates to workspace Cargo.toml --- Cargo.toml | 2 ++ src/catalog/Cargo.toml | 4 ++-- src/common/config/Cargo.toml | 2 +- src/common/decimal/Cargo.toml | 2 +- src/common/grpc/Cargo.toml | 2 +- src/common/meta/Cargo.toml | 2 +- src/common/procedure/Cargo.toml | 2 +- src/common/time/Cargo.toml | 2 +- src/datanode/Cargo.toml | 4 ++-- src/datatypes/Cargo.toml | 2 +- src/file-engine/Cargo.toml | 2 +- src/frontend/Cargo.toml | 2 +- src/log-store/Cargo.toml | 4 ++-- src/meta-srv/Cargo.toml | 4 ++-- src/mito2/Cargo.toml | 2 +- src/operator/Cargo.toml | 2 +- src/partition/Cargo.toml | 2 +- src/query/Cargo.toml | 2 +- src/servers/Cargo.toml | 4 ++-- src/store-api/Cargo.toml | 2 +- tests-integration/Cargo.toml | 2 +- 21 files changed, 27 insertions(+), 25 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 87985d74935b..9d6508b0d569 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -77,6 +77,7 @@ bitflags = "2.4.1" bytemuck = "1.12" bytes = { version = "1.5", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] } +dashmap = "5.4" datafusion = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" } datafusion-common = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" } datafusion-expr = { git = "https://github.com/apache/arrow-datafusion.git", rev = "26e43acac3a96cec8dd4c8365f22dfb1a84306e9" } @@ -116,6 +117,7 @@ reqwest = { version = "0.11", default-features = false, features = [ "rustls-tls-native-roots", "stream", ] } +rskafka = "0.5" rust_decimal = "1.33" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index cb37e048b0c8..79f3603f4f19 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -24,7 +24,7 @@ common-recordbatch.workspace = true common-runtime.workspace = true common-telemetry.workspace = true common-time.workspace = true -dashmap = "5.4" +dashmap.workspace = true datafusion.workspace = true datatypes.workspace = true futures = "0.3" @@ -38,7 +38,7 @@ paste = "1.0" prometheus.workspace = true regex.workspace = true serde.workspace = true -serde_json = "1.0" +serde_json.workspace = true session.workspace = true snafu.workspace = true store-api.workspace = true diff --git a/src/common/config/Cargo.toml b/src/common/config/Cargo.toml index d43626b7e960..ce779d904313 100644 --- a/src/common/config/Cargo.toml +++ b/src/common/config/Cargo.toml @@ -7,7 +7,7 @@ license.workspace = true [dependencies] common-base.workspace = true humantime-serde.workspace = true -rskafka = "0.5" +rskafka.workspace = true serde.workspace = true serde_json.workspace = true serde_with = "3" diff --git a/src/common/decimal/Cargo.toml b/src/common/decimal/Cargo.toml index dd0ba90c440f..adf9b08446a8 100644 --- a/src/common/decimal/Cargo.toml +++ b/src/common/decimal/Cargo.toml @@ -11,5 +11,5 @@ common-error.workspace = true common-macro.workspace = true rust_decimal.workspace = true serde.workspace = true -serde_json = "1.0" +serde_json.workspace = true snafu.workspace = true diff --git a/src/common/grpc/Cargo.toml b/src/common/grpc/Cargo.toml index f323af866079..9c71d5786039 100644 --- 
a/src/common/grpc/Cargo.toml +++ b/src/common/grpc/Cargo.toml @@ -16,7 +16,7 @@ common-recordbatch.workspace = true common-runtime.workspace = true common-telemetry.workspace = true common-time.workspace = true -dashmap = "5.4" +dashmap.workspace = true datafusion.workspace = true datatypes.workspace = true flatbuffers = "23.1" diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index 6f95109196d8..5a15581f41c6 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -34,7 +34,7 @@ prometheus.workspace = true prost.workspace = true rand.workspace = true regex.workspace = true -rskafka = "0.5" +rskafka.workspace = true serde.workspace = true serde_json.workspace = true serde_with = "3" diff --git a/src/common/procedure/Cargo.toml b/src/common/procedure/Cargo.toml index ece649434b88..795df7eea9ea 100644 --- a/src/common/procedure/Cargo.toml +++ b/src/common/procedure/Cargo.toml @@ -19,7 +19,7 @@ futures.workspace = true humantime-serde.workspace = true object-store.workspace = true serde.workspace = true -serde_json = "1.0" +serde_json.workspace = true smallvec.workspace = true snafu.workspace = true tokio.workspace = true diff --git a/src/common/time/Cargo.toml b/src/common/time/Cargo.toml index 23be6519f4c2..5bdba94a7e9b 100644 --- a/src/common/time/Cargo.toml +++ b/src/common/time/Cargo.toml @@ -11,7 +11,7 @@ chrono.workspace = true common-error.workspace = true common-macro.workspace = true serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" +serde_json.workspace = true snafu.workspace = true [dev-dependencies] diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index ee568a3c0602..85afd709e4b8 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -32,7 +32,7 @@ common-recordbatch.workspace = true common-runtime.workspace = true common-telemetry.workspace = true common-time.workspace = true -dashmap = "5.4" +dashmap.workspace = true datafusion-common.workspace = true datafusion-expr.workspace = true datafusion.workspace = true @@ -55,7 +55,7 @@ query.workspace = true reqwest.workspace = true secrecy = { version = "0.8", features = ["serde", "alloc"] } serde.workspace = true -serde_json = "1.0" +serde_json.workspace = true servers.workspace = true session.workspace = true snafu.workspace = true diff --git a/src/datatypes/Cargo.toml b/src/datatypes/Cargo.toml index 79ec3099367b..c7674d9973ae 100644 --- a/src/datatypes/Cargo.toml +++ b/src/datatypes/Cargo.toml @@ -26,5 +26,5 @@ ordered-float = { version = "3.0", features = ["serde"] } paste = "1.0" rust_decimal = "1.32.0" serde.workspace = true -serde_json = "1.0" +serde_json.workspace = true snafu.workspace = true diff --git a/src/file-engine/Cargo.toml b/src/file-engine/Cargo.toml index 514a064508b4..f0938c545bad 100644 --- a/src/file-engine/Cargo.toml +++ b/src/file-engine/Cargo.toml @@ -26,7 +26,7 @@ datatypes.workspace = true futures.workspace = true object-store.workspace = true serde = { version = "1.0", features = ["derive"] } -serde_json = "1.0" +serde_json.workspace = true snafu.workspace = true store-api.workspace = true table.workspace = true diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index a95b557961ce..6df16182511c 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -62,7 +62,7 @@ raft-engine.workspace = true regex.workspace = true script = { workspace = true, features = ["python"], optional = true } serde.workspace = true -serde_json = "1.0" +serde_json.workspace = true servers.workspace = true 
session.workspace = true snafu.workspace = true diff --git a/src/log-store/Cargo.toml b/src/log-store/Cargo.toml index 953398b6d73d..7941cd11e918 100644 --- a/src/log-store/Cargo.toml +++ b/src/log-store/Cargo.toml @@ -21,12 +21,12 @@ common-macro.workspace = true common-meta.workspace = true common-runtime.workspace = true common-telemetry.workspace = true -dashmap = "5.4" +dashmap.workspace = true futures-util.workspace = true futures.workspace = true protobuf = { version = "2", features = ["bytes"] } raft-engine.workspace = true -rskafka = "0.5" +rskafka.workspace = true serde.workspace = true serde_json.workspace = true snafu.workspace = true diff --git a/src/meta-srv/Cargo.toml b/src/meta-srv/Cargo.toml index 2df19d8bea0e..01450f861d11 100644 --- a/src/meta-srv/Cargo.toml +++ b/src/meta-srv/Cargo.toml @@ -26,7 +26,7 @@ common-procedure.workspace = true common-runtime.workspace = true common-telemetry.workspace = true common-time.workspace = true -dashmap = "5.4" +dashmap.workspace = true datatypes.workspace = true derive_builder.workspace = true etcd-client.workspace = true @@ -43,7 +43,7 @@ prost.workspace = true rand.workspace = true regex.workspace = true serde.workspace = true -serde_json = "1.0" +serde_json.workspace = true servers.workspace = true snafu.workspace = true store-api.workspace = true diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 0ee17bb85861..8c3ef50ec2c7 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -32,7 +32,7 @@ common-runtime.workspace = true common-telemetry.workspace = true common-test-util = { workspace = true, optional = true } common-time.workspace = true -dashmap = "5.4" +dashmap.workspace = true datafusion-common.workspace = true datafusion-expr.workspace = true datafusion.workspace = true diff --git a/src/operator/Cargo.toml b/src/operator/Cargo.toml index a0d627b872d9..a2de8e0102ae 100644 --- a/src/operator/Cargo.toml +++ b/src/operator/Cargo.toml @@ -43,7 +43,7 @@ prometheus.workspace = true query.workspace = true regex.workspace = true serde.workspace = true -serde_json = "1.0" +serde_json.workspace = true servers.workspace = true session.workspace = true snafu.workspace = true diff --git a/src/partition/Cargo.toml b/src/partition/Cargo.toml index 9fd2321f178e..cf9ba0a16f2f 100644 --- a/src/partition/Cargo.toml +++ b/src/partition/Cargo.toml @@ -22,7 +22,7 @@ meta-client.workspace = true moka = { workspace = true, features = ["future"] } prometheus.workspace = true serde.workspace = true -serde_json = "1.0" +serde_json.workspace = true snafu.workspace = true store-api.workspace = true table.workspace = true diff --git a/src/query/Cargo.toml b/src/query/Cargo.toml index 2e61d7dd0a55..24010dd94011 100644 --- a/src/query/Cargo.toml +++ b/src/query/Cargo.toml @@ -47,7 +47,7 @@ promql-parser = "0.1.1" promql.workspace = true regex.workspace = true serde.workspace = true -serde_json = "1.0" +serde_json.workspace = true session.workspace = true snafu.workspace = true sql.workspace = true diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 45621a26e170..e81dfaf593c8 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -80,7 +80,7 @@ rustls-pki-types = "1.0" schemars = "0.8" secrecy = { version = "0.8", features = ["serde", "alloc"] } serde.workspace = true -serde_json = "1.0" +serde_json.workspace = true session.workspace = true sha1 = "0.10" snafu.workspace = true @@ -112,7 +112,7 @@ mysql_async = { version = "0.33", default-features = false, features = [ ] } rand.workspace = true script = { 
workspace = true, features = ["python"] } -serde_json = "1.0" +serde_json.workspace = true session = { workspace = true, features = ["testing"] } table.workspace = true tokio-postgres = "0.7" diff --git a/src/store-api/Cargo.toml b/src/store-api/Cargo.toml index 293302dcfde5..31b140ef7bdd 100644 --- a/src/store-api/Cargo.toml +++ b/src/store-api/Cargo.toml @@ -26,5 +26,5 @@ strum.workspace = true [dev-dependencies] async-stream.workspace = true -serde_json = "1.0" +serde_json.workspace = true tokio.workspace = true diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index 20812d397fba..1ba6f8e05d36 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -48,7 +48,7 @@ rstest = "0.17" rstest_reuse = "0.5" secrecy = "0.8" serde.workspace = true -serde_json = "1.0" +serde_json.workspace = true servers = { workspace = true, features = ["testing"] } session.workspace = true snafu.workspace = true From d19178dec9de4d0fcc6e0183cbf99545be0cecf8 Mon Sep 17 00:00:00 2001 From: niebayes Date: Fri, 22 Dec 2023 17:25:00 +0800 Subject: [PATCH 14/22] fix: style --- src/common/config/src/wal/kafka.rs | 2 +- src/log-store/src/error.rs | 6 +++--- src/log-store/src/kafka/client_manager.rs | 4 ++-- src/log-store/src/kafka/log_store.rs | 2 +- src/log-store/src/kafka/record_utils.rs | 2 +- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/common/config/src/wal/kafka.rs b/src/common/config/src/wal/kafka.rs index c645f7c3607b..02227a9540e6 100644 --- a/src/common/config/src/wal/kafka.rs +++ b/src/common/config/src/wal/kafka.rs @@ -41,7 +41,7 @@ pub struct KafkaConfig { pub compression: RsKafkaCompression, /// The maximum log size an rskakfa batch producer could buffer. pub max_batch_size: ReadableSize, - /// The linger duration of an rskafka batch producer. + /// The linger duration of a rskafka batch producer. #[serde(with = "humantime_serde")] pub linger: Duration, /// The maximum amount of time (in milliseconds) to wait for Kafka records to be returned. diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs index 841cbe21a7b4..875baafbfb86 100644 --- a/src/log-store/src/error.rs +++ b/src/log-store/src/error.rs @@ -87,7 +87,7 @@ pub enum Error { }, #[snafu(display( - "Failed to build an rskafka client, broker endpoints: {:?}", + "Failed to build a kafka client, broker endpoints: {:?}", broker_endpoints ))] BuildClient { @@ -98,7 +98,7 @@ pub enum Error { }, #[snafu(display( - "Failed to build an rskafka partition client, topic: {}, partition: {}", + "Failed to build a kafka partition client, topic: {}, partition: {}", topic, partition ))] @@ -157,7 +157,7 @@ pub enum Error { }, #[snafu(display( - "Failed to cast an rskafka offset to entry offset, rskafka_offset: {}", + "Failed to cast a kafka offset to entry offset, kafka_offset: {}", offset ))] CastOffset { offset: i64, location: Location }, diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs index d979c0a838e8..060124f8d280 100644 --- a/src/log-store/src/kafka/client_manager.rs +++ b/src/log-store/src/kafka/client_manager.rs @@ -66,7 +66,7 @@ impl Client { #[derive(Debug)] pub(crate) struct ClientManager { config: KafkaConfig, - /// Top-level client in rskafka. All clients are constructed by this client. + /// Top-level client in kafka. All clients are constructed by this client. client_factory: RsKafkaClient, /// A pool maintaining a collection of clients. /// Key: a topic. Value: the associated client of the topic. 
@@ -76,7 +76,7 @@ pub(crate) struct ClientManager { impl ClientManager { /// Tries to create a ClientManager. pub(crate) async fn try_new(config: &KafkaConfig) -> Result { - // Sets backoff config for the top-level rskafka client and all clients constructed by it. + // Sets backoff config for the top-level kafka client and all clients constructed by it. let backoff_config = BackoffConfig { init_backoff: Duration::from_millis(500), max_backoff: Duration::from_secs(10), diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs index bba9490c352c..f154e97ec2c5 100644 --- a/src/log-store/src/kafka/log_store.rs +++ b/src/log-store/src/kafka/log_store.rs @@ -38,7 +38,7 @@ type ConsumeResult = std::result::Result<(RecordAndOffset, i64), rskafka::client #[derive(Debug)] pub struct KafkaLogStore { config: KafkaConfig, - /// Manages rskafka clients through which the log store contact the Kafka cluster. + /// Manages kafka clients through which the log store contact the Kafka cluster. client_manager: ClientManagerRef, } diff --git a/src/log-store/src/kafka/record_utils.rs b/src/log-store/src/kafka/record_utils.rs index 0a1b59595671..9054ad9be515 100644 --- a/src/log-store/src/kafka/record_utils.rs +++ b/src/log-store/src/kafka/record_utils.rs @@ -23,7 +23,7 @@ use crate::error::{DecodeKeySnafu, EncodeKeySnafu, MissingKeySnafu, MissingValue use crate::kafka::{EntryId, EntryImpl, NamespaceImpl}; /// The key of a record. -/// An rskafka record consists of key, value, headers, and datetime. The value of a record +/// A kafka record consists of key, value, headers, and datetime. The value of a record /// is the entry data. Either of the key or the headers can be chosen to store the entry metadata /// including topic, region id, and entry id. Currently, the entry metadata is stored in the key. #[derive(Debug, Serialize, Deserialize)] From ffa4c4a31738e102b3ed9b6fe03dafe43534a516 Mon Sep 17 00:00:00 2001 From: niebayes Date: Fri, 22 Dec 2023 17:27:39 +0800 Subject: [PATCH 15/22] fix: style --- config/datanode.example.toml | 4 ++-- src/common/config/src/wal/kafka.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/config/datanode.example.toml b/config/datanode.example.toml index bab945190b2c..261ec25158fc 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -58,9 +58,9 @@ sync_write = false # topic_name_prefix = "greptimedb_wal_kafka_topic" # Number of partitions per topic. # num_partitions = 1 -# The maximum log size an rskafka batch producer could buffer. +# The maximum log size a kafka batch producer could buffer. # max_batch_size = "4MB" -# The linger duration of an rskafka batch producer. +# The linger duration of a kafka batch producer. # linger = "200ms" # The maximum amount of time (in milliseconds) to wait for Kafka records to be returned. # max_wait_time = "100ms" diff --git a/src/common/config/src/wal/kafka.rs b/src/common/config/src/wal/kafka.rs index 02227a9540e6..bd7574c1613a 100644 --- a/src/common/config/src/wal/kafka.rs +++ b/src/common/config/src/wal/kafka.rs @@ -39,9 +39,9 @@ pub struct KafkaConfig { #[serde(skip)] #[serde(default)] pub compression: RsKafkaCompression, - /// The maximum log size an rskakfa batch producer could buffer. + /// The maximum log size a kakfa batch producer could buffer. pub max_batch_size: ReadableSize, - /// The linger duration of a rskafka batch producer. + /// The linger duration of a kafka batch producer. 
#[serde(with = "humantime_serde")] pub linger: Duration, /// The maximum amount of time (in milliseconds) to wait for Kafka records to be returned. From 4bc1cc18b220f62fadef3c6f7931c13b850d4eb8 Mon Sep 17 00:00:00 2001 From: niebayes Date: Fri, 22 Dec 2023 17:33:14 +0800 Subject: [PATCH 16/22] chore: unify topic name prefix --- config/datanode.example.toml | 2 +- config/metasrv.example.toml | 2 +- src/common/config/src/wal.rs | 4 ++-- src/common/config/src/wal/kafka.rs | 2 +- src/common/meta/src/wal.rs | 4 ++-- src/common/meta/src/wal/kafka.rs | 2 +- src/common/meta/src/wal/kafka/topic_manager.rs | 2 +- 7 files changed, 9 insertions(+), 9 deletions(-) diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 261ec25158fc..1d1eec471e78 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -55,7 +55,7 @@ sync_write = false # Number of topics shall be created beforehand. # num_topics = 64 # Topic name prefix. -# topic_name_prefix = "greptimedb_wal_kafka_topic" +# topic_name_prefix = "greptimedb_wal_topic" # Number of partitions per topic. # num_partitions = 1 # The maximum log size a kafka batch producer could buffer. diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index b057d7f6b677..120f19255f3a 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -61,7 +61,7 @@ provider = "raft_engine" # - "round_robin" (default) # selector_type = "round_robin" # A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`. -# topic_name_prefix = "greptimedb_wal_kafka" +# topic_name_prefix = "greptimedb_wal_topic" # Number of partitions per topic. # num_partitions = 1 # Expected number of replicas of each partition. diff --git a/src/common/config/src/wal.rs b/src/common/config/src/wal.rs index 56e04c55547c..8fd7ef43bb23 100644 --- a/src/common/config/src/wal.rs +++ b/src/common/config/src/wal.rs @@ -71,7 +71,7 @@ mod tests { let toml_str = r#" broker_endpoints = ["127.0.0.1:9090"] num_topics = 32 - topic_name_prefix = "greptimedb_wal_kafka_topic" + topic_name_prefix = "greptimedb_wal_topic" num_partitions = 1 max_batch_size = "4MB" linger = "200ms" @@ -81,7 +81,7 @@ mod tests { let expected = KafkaConfig { broker_endpoints: vec!["127.0.0.1:9090".to_string()], num_topics: 32, - topic_name_prefix: "greptimedb_wal_kafka_topic".to_string(), + topic_name_prefix: "greptimedb_wal_topic".to_string(), num_partitions: 1, compression: RsKafkaCompression::default(), max_batch_size: ReadableSize::mb(4), diff --git a/src/common/config/src/wal/kafka.rs b/src/common/config/src/wal/kafka.rs index bd7574c1613a..2c2ab54f530a 100644 --- a/src/common/config/src/wal/kafka.rs +++ b/src/common/config/src/wal/kafka.rs @@ -19,7 +19,7 @@ use rskafka::client::partition::Compression as RsKafkaCompression; use serde::{Deserialize, Serialize}; /// Topic name prefix. -pub const TOPIC_NAME_PREFIX: &str = "greptimedb_wal_kafka_topic"; +pub const TOPIC_NAME_PREFIX: &str = "greptimedb_wal_topic"; /// Kafka wal topic. 
pub type Topic = String; diff --git a/src/common/meta/src/wal.rs b/src/common/meta/src/wal.rs index 0e1576c3645e..ff020bebb5d1 100644 --- a/src/common/meta/src/wal.rs +++ b/src/common/meta/src/wal.rs @@ -69,7 +69,7 @@ mod tests { broker_endpoints = ["127.0.0.1:9090"] num_topics = 32 selector_type = "round_robin" - topic_name_prefix = "greptimedb_wal_kafka" + topic_name_prefix = "greptimedb_wal_topic" num_partitions = 1 replication_factor = 3 create_topic_timeout = "30s" @@ -83,7 +83,7 @@ mod tests { broker_endpoints: vec!["127.0.0.1:9090".to_string()], num_topics: 32, selector_type: KafkaTopicSelectorType::RoundRobin, - topic_name_prefix: "greptimedb_wal_kafka".to_string(), + topic_name_prefix: "greptimedb_wal_topic".to_string(), num_partitions: 1, replication_factor: 3, create_topic_timeout: Duration::from_secs(30), diff --git a/src/common/meta/src/wal/kafka.rs b/src/common/meta/src/wal/kafka.rs index 709c5a4f9879..1d6d253c8658 100644 --- a/src/common/meta/src/wal/kafka.rs +++ b/src/common/meta/src/wal/kafka.rs @@ -62,7 +62,7 @@ impl Default for KafkaConfig { broker_endpoints: vec!["127.0.0.1:9090".to_string()], num_topics: 64, selector_type: TopicSelectorType::RoundRobin, - topic_name_prefix: "greptimedb_wal_kafka".to_string(), + topic_name_prefix: "greptimedb_wal_topic".to_string(), num_partitions: 1, replication_factor: 3, create_topic_timeout: Duration::from_secs(30), diff --git a/src/common/meta/src/wal/kafka/topic_manager.rs b/src/common/meta/src/wal/kafka/topic_manager.rs index 4773871ae134..0add7746ec26 100644 --- a/src/common/meta/src/wal/kafka/topic_manager.rs +++ b/src/common/meta/src/wal/kafka/topic_manager.rs @@ -181,7 +181,7 @@ mod tests { #[tokio::test] async fn test_restore_persisted_topics() { let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef; - let topic_name_prefix = "greptimedb_wal_kafka"; + let topic_name_prefix = "greptimedb_wal_topic"; let num_topics = 16; // Constructs mock topics. From 94c53a4a1be270f0f8dd9c1afee428e90f50e549 Mon Sep 17 00:00:00 2001 From: niebayes Date: Fri, 22 Dec 2023 17:47:36 +0800 Subject: [PATCH 17/22] chore: make backoff config configurable by users --- config/datanode.example.toml | 15 +++++------- config/standalone.example.toml | 24 ++++++++++++++++++- src/common/config/src/wal.rs | 8 +++++++ src/common/config/src/wal/kafka.rs | 16 +++++++++++++ src/common/meta/src/wal.rs | 4 ++-- src/common/meta/src/wal/kafka.rs | 4 ++-- .../meta/src/wal/kafka/topic_manager.rs | 2 +- src/log-store/src/kafka/client_manager.rs | 13 ++++------ 8 files changed, 63 insertions(+), 23 deletions(-) diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 1d1eec471e78..7343fbe47bf5 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -41,7 +41,7 @@ tcp_nodelay = true # WAL data directory provider = "raft_engine" -# Raft-engine wal options, see `standalone.example.toml` +# Raft-engine wal options, see `standalone.example.toml`. # dir = "/tmp/greptimedb/wal" file_size = "256MB" purge_threshold = "4GB" @@ -49,21 +49,18 @@ purge_interval = "10m" read_batch_size = 128 sync_write = false -# Kafka wal options. -# The broker endpoints of the Kafka cluster. ["127.0.0.1:9090"] by default. +# Kafka wal options, see `standalone.example.toml`. # broker_endpoints = ["127.0.0.1:9090"] -# Number of topics shall be created beforehand. # num_topics = 64 -# Topic name prefix. # topic_name_prefix = "greptimedb_wal_topic" -# Number of partitions per topic. 
# num_partitions = 1 -# The maximum log size a kafka batch producer could buffer. # max_batch_size = "4MB" -# The linger duration of a kafka batch producer. # linger = "200ms" -# The maximum amount of time (in milliseconds) to wait for Kafka records to be returned. # max_wait_time = "100ms" +# backoff_init = "500ms" +# backoff_max = "10s" +# backoff_base = 2.0 +# backoff_deadline = "5mins" # Storage options, see `standalone.example.toml`. [storage] diff --git a/config/standalone.example.toml b/config/standalone.example.toml index c8930a9646ca..7e12a6288ae0 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -87,7 +87,29 @@ enable = true # - "Kafka" provider = "raft_engine" -# There's no kafka wal config for standalone mode. +# Kafka wal options. +# The broker endpoints of the Kafka cluster. ["127.0.0.1:9090"] by default. +# broker_endpoints = ["127.0.0.1:9090"] +# Number of topics shall be created beforehand. +# num_topics = 64 +# Topic name prefix. +# topic_name_prefix = "greptimedb_wal_topic" +# Number of partitions per topic. +# num_partitions = 1 +# The maximum log size a kafka batch producer could buffer. +# max_batch_size = "4MB" +# The linger duration of a kafka batch producer. +# linger = "200ms" +# The maximum amount of time (in milliseconds) to wait for Kafka records to be returned. +# max_wait_time = "100ms" +# The initial backoff for kafka clients. +# backoff_init = "500ms" +# The maximum backoff for kafka clients. +# backoff_max = "10s" +# Exponential backoff rate, i.e. next backoff = base * current backoff. +# backoff_base = 2.0 +# Stop reconnecting if the total wait time reaches the deadline. If this config is missing, the reconnecting won't terminate. +# backoff_deadline = "5mins" # WAL data directory # dir = "/tmp/greptimedb/wal" diff --git a/src/common/config/src/wal.rs b/src/common/config/src/wal.rs index 8fd7ef43bb23..6c4955f4e63c 100644 --- a/src/common/config/src/wal.rs +++ b/src/common/config/src/wal.rs @@ -76,6 +76,10 @@ mod tests { max_batch_size = "4MB" linger = "200ms" max_wait_time = "100ms" + backoff_init = "500ms" + backoff_max = "10s" + backoff_base = 2 + backoff_deadline = "5mins" "#; let decoded: KafkaConfig = toml::from_str(toml_str).unwrap(); let expected = KafkaConfig { @@ -87,6 +91,10 @@ mod tests { max_batch_size: ReadableSize::mb(4), linger: Duration::from_millis(200), max_wait_time: Duration::from_millis(100), + backoff_init: Duration::from_millis(500), + backoff_max: Duration::from_secs(10), + backoff_base: 2, + backoff_deadline: Some(Duration::from_secs(60 * 5)), }; assert_eq!(decoded, expected); } diff --git a/src/common/config/src/wal/kafka.rs b/src/common/config/src/wal/kafka.rs index 2c2ab54f530a..1cf1abda492b 100644 --- a/src/common/config/src/wal/kafka.rs +++ b/src/common/config/src/wal/kafka.rs @@ -47,6 +47,18 @@ pub struct KafkaConfig { /// The maximum amount of time (in milliseconds) to wait for Kafka records to be returned. #[serde(with = "humantime_serde")] pub max_wait_time: Duration, + /// The initial backoff for kafka clients. + #[serde(with = "humantime_serde")] + pub backoff_init: Duration, + /// The maximum backoff for kafka clients. + #[serde(with = "humantime_serde")] + pub backoff_max: Duration, + /// Exponential backoff rate, i.e. next backoff = base * current backoff. + pub backoff_base: u32, + /// Stop reconnecting if the total wait time reaches the deadline. + /// If it's None, the reconnecting won't terminate. 
+ #[serde(with = "humantime_serde")] + pub backoff_deadline: Option, } impl Default for KafkaConfig { @@ -60,6 +72,10 @@ impl Default for KafkaConfig { max_batch_size: ReadableSize::mb(4), linger: Duration::from_millis(200), max_wait_time: Duration::from_millis(100), + backoff_init: Duration::from_millis(500), + backoff_max: Duration::from_secs(10), + backoff_base: 2, + backoff_deadline: Some(Duration::from_secs(60 * 5)), // 5 mins } } } diff --git a/src/common/meta/src/wal.rs b/src/common/meta/src/wal.rs index ff020bebb5d1..f34a5224a87e 100644 --- a/src/common/meta/src/wal.rs +++ b/src/common/meta/src/wal.rs @@ -75,7 +75,7 @@ mod tests { create_topic_timeout = "30s" backoff_init = "500ms" backoff_max = "10s" - backoff_base = 2.0 + backoff_base = 2 backoff_deadline = "5mins" "#; let wal_config: WalConfig = toml::from_str(toml_str).unwrap(); @@ -89,7 +89,7 @@ mod tests { create_topic_timeout: Duration::from_secs(30), backoff_init: Duration::from_millis(500), backoff_max: Duration::from_secs(10), - backoff_base: 2.0, + backoff_base: 2, backoff_deadline: Some(Duration::from_secs(60 * 5)), }; assert_eq!(wal_config, WalConfig::Kafka(expected_kafka_config)); diff --git a/src/common/meta/src/wal/kafka.rs b/src/common/meta/src/wal/kafka.rs index 1d6d253c8658..52846569e344 100644 --- a/src/common/meta/src/wal/kafka.rs +++ b/src/common/meta/src/wal/kafka.rs @@ -49,7 +49,7 @@ pub struct KafkaConfig { #[serde(with = "humantime_serde")] pub backoff_max: Duration, /// Exponential backoff rate, i.e. next backoff = base * current backoff. - pub backoff_base: f64, + pub backoff_base: u32, /// Stop reconnecting if the total wait time reaches the deadline. /// If it's None, the reconnecting won't terminate. #[serde(with = "humantime_serde")] @@ -68,7 +68,7 @@ impl Default for KafkaConfig { create_topic_timeout: Duration::from_secs(30), backoff_init: Duration::from_millis(500), backoff_max: Duration::from_secs(10), - backoff_base: 2.0, + backoff_base: 2, backoff_deadline: Some(Duration::from_secs(60 * 5)), // 5 mins } } diff --git a/src/common/meta/src/wal/kafka/topic_manager.rs b/src/common/meta/src/wal/kafka/topic_manager.rs index 0add7746ec26..9b86d2a382f9 100644 --- a/src/common/meta/src/wal/kafka/topic_manager.rs +++ b/src/common/meta/src/wal/kafka/topic_manager.rs @@ -105,7 +105,7 @@ impl TopicManager { let backoff_config = BackoffConfig { init_backoff: self.config.backoff_init, max_backoff: self.config.backoff_max, - base: self.config.backoff_base, + base: self.config.backoff_base as f64, deadline: self.config.backoff_deadline, }; let client = ClientBuilder::new(self.config.broker_endpoints.clone()) diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs index 060124f8d280..8cb2995bb608 100644 --- a/src/log-store/src/kafka/client_manager.rs +++ b/src/log-store/src/kafka/client_manager.rs @@ -31,14 +31,12 @@ use crate::error::{BuildClientSnafu, BuildPartitionClientSnafu, Result}; // The `DEFAULT_PARTITION` refers to the index of the partition. const DEFAULT_PARTITION: i32 = 0; -/// Arc wrapper of Client. -pub(crate) type ClientRef = Arc; /// Arc wrapper of ClientManager. pub(crate) type ClientManagerRef = Arc; /// A client through which to contact Kafka cluster. Each client associates with one partition of a topic. /// Since a topic only has one partition in our design, the mapping between clients and topics are one-one. 
-#[derive(Debug)] +#[derive(Debug, Clone)] pub(crate) struct Client { /// A raw client used to construct a batch producer and/or a stream consumer for a specific topic. pub(crate) raw_client: Arc, @@ -70,7 +68,7 @@ pub(crate) struct ClientManager { client_factory: RsKafkaClient, /// A pool maintaining a collection of clients. /// Key: a topic. Value: the associated client of the topic. - client_pool: DashMap, + client_pool: DashMap, } impl ClientManager { @@ -81,7 +79,6 @@ impl ClientManager { init_backoff: Duration::from_millis(500), max_backoff: Duration::from_secs(10), base: 2., - // Stop reconnecting if the total wait time reaches the deadline. deadline: Some(Duration::from_secs(60 * 5)), }; let client = ClientBuilder::new(config.broker_endpoints.clone()) @@ -101,7 +98,7 @@ impl ClientManager { /// Gets the client associated with the topic. If the client does not exist, a new one will /// be created and returned. - pub(crate) async fn get_or_insert(&self, topic: &Topic) -> Result { + pub(crate) async fn get_or_insert(&self, topic: &Topic) -> Result { match self.client_pool.entry(topic.to_string()) { DashMapEntry::Occupied(entry) => Ok(entry.get().clone()), DashMapEntry::Vacant(entry) => { @@ -111,7 +108,7 @@ impl ClientManager { } } - async fn try_create_client(&self, topic: &Topic) -> Result { + async fn try_create_client(&self, topic: &Topic) -> Result { // Sets to Retry to retry connecting if the kafka cluter replies with an UnknownTopic error. // That's because the topic is believed to exist as the metasrv is expected to create required topics upon start. // The reconnecting won't stop until succeed or a different error returns. @@ -125,6 +122,6 @@ impl ClientManager { }) .map(Arc::new)?; - Ok(Arc::new(Client::new(raw_client, &self.config))) + Ok(Client::new(raw_client, &self.config)) } } From ba21a0ebc84d65596a1d3006413b3decd6993daf Mon Sep 17 00:00:00 2001 From: niebayes Date: Fri, 22 Dec 2023 17:52:42 +0800 Subject: [PATCH 18/22] chore: properly use backoff config in wal config --- src/common/config/src/wal/kafka.rs | 1 + src/common/meta/src/wal/kafka.rs | 2 ++ src/log-store/src/kafka/client_manager.rs | 9 ++++----- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/common/config/src/wal/kafka.rs b/src/common/config/src/wal/kafka.rs index 1cf1abda492b..dc31bcec9e6c 100644 --- a/src/common/config/src/wal/kafka.rs +++ b/src/common/config/src/wal/kafka.rs @@ -54,6 +54,7 @@ pub struct KafkaConfig { #[serde(with = "humantime_serde")] pub backoff_max: Duration, /// Exponential backoff rate, i.e. next backoff = base * current backoff. + // Sets to u32 type since some structs containing the KafkaConfig need to derive the Eq trait. pub backoff_base: u32, /// Stop reconnecting if the total wait time reaches the deadline. /// If it's None, the reconnecting won't terminate. diff --git a/src/common/meta/src/wal/kafka.rs b/src/common/meta/src/wal/kafka.rs index 52846569e344..173a74662d95 100644 --- a/src/common/meta/src/wal/kafka.rs +++ b/src/common/meta/src/wal/kafka.rs @@ -49,6 +49,8 @@ pub struct KafkaConfig { #[serde(with = "humantime_serde")] pub backoff_max: Duration, /// Exponential backoff rate, i.e. next backoff = base * current backoff. + // Sets to u32 type since the `backoff_base` field in the KafkaConfig for datanode is of type u32, + // and we want to unify their types. pub backoff_base: u32, /// Stop reconnecting if the total wait time reaches the deadline. /// If it's None, the reconnecting won't terminate. 
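
A minimal sketch of how the four backoff knobs introduced in the last two patches (`backoff_init`, `backoff_max`, `backoff_base`, `backoff_deadline`) are intended to combine, based on the doc comments above. It is an illustration only, not rskafka's implementation:

use std::time::Duration;

/// Illustration only: the reconnect delays implied by `backoff_init`,
/// `backoff_max`, `backoff_base`, and `backoff_deadline`.
fn backoff_schedule(
    init: Duration,
    max: Duration,
    base: u32,
    deadline: Option<Duration>,
) -> Vec<Duration> {
    let mut delays = Vec::new();
    let mut current = init;
    let mut total_wait = Duration::ZERO;
    // Without a deadline the schedule never ends, mirroring the doc comment
    // "If it's None, the reconnecting won't terminate"; call this with Some(_).
    while deadline.map_or(true, |d| total_wait < d) {
        delays.push(current);
        total_wait += current;
        // Next backoff = base * current backoff, capped at `backoff_max`.
        current = (current * base).min(max);
    }
    delays
}

fn main() {
    // With the defaults introduced above: 500ms, 1s, 2s, 4s, 8s, then 10s
    // repeatedly until the accumulated wait time reaches the 5-minute deadline.
    let delays = backoff_schedule(
        Duration::from_millis(500),
        Duration::from_secs(10),
        2,
        Some(Duration::from_secs(60 * 5)),
    );
    println!("{} reconnect attempts before giving up: {delays:?}", delays.len());
}
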
diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs index 8cb2995bb608..56cdadabeccb 100644 --- a/src/log-store/src/kafka/client_manager.rs +++ b/src/log-store/src/kafka/client_manager.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::sync::Arc; -use std::time::Duration; use common_config::wal::{KafkaConfig, KafkaWalTopic as Topic}; use dashmap::mapref::entry::Entry as DashMapEntry; @@ -76,10 +75,10 @@ impl ClientManager { pub(crate) async fn try_new(config: &KafkaConfig) -> Result { // Sets backoff config for the top-level kafka client and all clients constructed by it. let backoff_config = BackoffConfig { - init_backoff: Duration::from_millis(500), - max_backoff: Duration::from_secs(10), - base: 2., - deadline: Some(Duration::from_secs(60 * 5)), + init_backoff: config.backoff_init, + max_backoff: config.backoff_max, + base: config.backoff_base as f64, + deadline: config.backoff_deadline, }; let client = ClientBuilder::new(config.broker_endpoints.clone()) .backoff_config(backoff_config) From a718a9a154da306011d896b1c4a3073f9fb762fd Mon Sep 17 00:00:00 2001 From: niebayes Date: Sat, 23 Dec 2023 17:42:26 +0800 Subject: [PATCH 19/22] refactor: read/write of kafka log store --- src/log-store/src/error.rs | 37 ++-- src/log-store/src/kafka.rs | 21 +- src/log-store/src/kafka/log_store.rs | 198 +++++------------ src/log-store/src/kafka/offset.rs | 37 ++++ src/log-store/src/kafka/record_utils.rs | 235 ++++++++++++++++----- src/log-store/src/noop.rs | 11 +- src/log-store/src/raft_engine/log_store.rs | 5 +- src/store-api/src/logstore.rs | 38 ++-- src/store-api/src/logstore/entry.rs | 14 +- 9 files changed, 319 insertions(+), 277 deletions(-) create mode 100644 src/log-store/src/kafka/offset.rs diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs index 875baafbfb86..ea54853dea09 100644 --- a/src/log-store/src/error.rs +++ b/src/log-store/src/error.rs @@ -121,16 +121,15 @@ pub enum Error { error: String, }, - #[snafu(display("Failed to encode a record key, key: {}", key))] - EncodeKey { - key: String, + #[snafu(display("Failed to encode a record meta"))] + EncodeMeta { location: Location, #[snafu(source)] error: serde_json::Error, }, - #[snafu(display("Failed to decode a record key"))] - DecodeKey { + #[snafu(display("Failed to decode a record meta"))] + DecodeMeta { location: Location, #[snafu(source)] error: serde_json::Error, @@ -142,40 +141,34 @@ pub enum Error { #[snafu(display("Missing required value in a record"))] MissingValue { location: Location }, - #[snafu(display("Failed to produce entries to Kafka, topic: {}", topic))] - ProduceEntries { + #[snafu(display("Cannot build a record from empty entries"))] + EmptyEntries { location: Location }, + + #[snafu(display("Failed to produce records to Kafka, topic: {}", topic))] + ProduceRecord { topic: KafkaWalTopic, location: Location, #[snafu(source)] error: rskafka::client::producer::Error, }, - #[snafu(display("The produce tasks return an empty offset vector, topic: {}", topic))] - EmptyOffsets { - topic: KafkaWalTopic, - location: Location, - }, - #[snafu(display( - "Failed to cast a kafka offset to entry offset, kafka_offset: {}", - offset - ))] - CastOffset { offset: i64, location: Location }, - - #[snafu(display( - "Failed to read a record from Kafka, offset {}, topic: {}, region id: {}", - offset, + "Failed to read a record from Kafka, topic: {}, region_id: {}, offset: {}", topic, region_id, + offset, ))] ConsumeRecord { - offset: i64, topic: String, region_id: u64, 
+ offset: i64, location: Location, #[snafu(source)] error: rskafka::client::error::Error, }, + + #[snafu(display("Failed to do a cast"))] + Cast { location: Location }, } impl ErrorExt for Error { diff --git a/src/log-store/src/kafka.rs b/src/log-store/src/kafka.rs index fe7d001d70a5..5fd4fe326eed 100644 --- a/src/log-store/src/kafka.rs +++ b/src/log-store/src/kafka.rs @@ -14,27 +14,23 @@ mod client_manager; pub mod log_store; +mod offset; mod record_utils; use common_meta::wal::KafkaWalTopic as Topic; +use serde::{Deserialize, Serialize}; use store_api::logstore::entry::{Entry, Id as EntryId}; use store_api::logstore::namespace::Namespace; use crate::error::Error; /// Kafka Namespace implementation. -#[derive(Debug, PartialEq, Eq, Hash, Clone)] +#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)] pub struct NamespaceImpl { region_id: u64, topic: Topic, } -impl NamespaceImpl { - fn new(region_id: u64, topic: Topic) -> Self { - Self { region_id, topic } - } -} - impl Namespace for NamespaceImpl { fn id(&self) -> u64 { self.region_id @@ -42,6 +38,7 @@ impl Namespace for NamespaceImpl { } /// Kafka Entry implementation. +#[derive(Debug, PartialEq, Clone)] pub struct EntryImpl { /// Entry payload. data: Vec, @@ -51,16 +48,6 @@ pub struct EntryImpl { ns: NamespaceImpl, } -impl EntryImpl { - fn new(data: Vec, entry_id: EntryId, ns: NamespaceImpl) -> Self { - Self { - data, - id: entry_id, - ns, - } - } -} - impl Entry for EntryImpl { type Error = Error; type Namespace = NamespaceImpl; diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs index f154e97ec2c5..6594029e3de0 100644 --- a/src/log-store/src/kafka/log_store.rs +++ b/src/log-store/src/kafka/log_store.rs @@ -15,25 +15,20 @@ use std::collections::HashMap; use std::sync::Arc; -use common_config::wal::{KafkaConfig, KafkaWalTopic as Topic, WalOptions}; +use common_config::wal::{KafkaConfig, WalOptions}; use futures_util::StreamExt; use rskafka::client::consumer::{StartOffset, StreamConsumerBuilder}; -use rskafka::record::{Record, RecordAndOffset}; -use snafu::{OptionExt, ResultExt}; -use store_api::logstore::entry::{Id as EntryId, Offset as EntryOffset}; +use store_api::logstore::entry::Id as EntryId; use store_api::logstore::entry_stream::SendableEntryStream; use store_api::logstore::namespace::Id as NamespaceId; use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore}; -use crate::error::{ - CastOffsetSnafu, ConsumeRecordSnafu, EmptyOffsetsSnafu, Error, GetClientSnafu, - ProduceEntriesSnafu, Result, -}; +use crate::error::{Error, Result}; use crate::kafka::client_manager::{ClientManager, ClientManagerRef}; +use crate::kafka::offset::Offset; +use crate::kafka::record_utils::{handle_consume_result, RecordProducer}; use crate::kafka::{EntryImpl, NamespaceImpl}; -type ConsumeResult = std::result::Result<(RecordAndOffset, i64), rskafka::client::error::Error>; - /// A log store backed by Kafka. #[derive(Debug)] pub struct KafkaLogStore { @@ -50,53 +45,6 @@ impl KafkaLogStore { config: config.clone(), }) } - - /// Appends a batch of entries to a topic. The entries may come from multiple regions. - /// Returns a tuple where the first element is the topic while the second is the minimum - /// start offset of the entries appended to the topic. - async fn append_batch_to_topic( - &self, - entries: Vec, - topic: Topic, - ) -> Result<(Topic, EntryOffset)> { - // Safety: the caller ensures the input entries is not empty. 
- assert!(!entries.is_empty()); - - // Gets the client associated with the topic. - let client = self - .client_manager - .get_or_insert(&topic) - .await - .map_err(|e| { - GetClientSnafu { - topic: &topic, - error: e.to_string(), - } - .build() - })?; - - // Convert entries to records and produce them to Kafka. - let mut tasks = Vec::with_capacity(entries.len()); - for entry in entries { - let record: Record = entry.try_into()?; - let task = client.producer.produce(record); - tasks.push(task); - } - // Each produce task will return an offset to represent the minimum start offset of the entries produced by the task. - let offsets = futures::future::try_join_all(tasks) - .await - .context(ProduceEntriesSnafu { topic: &topic })?; - - // Since the task completion order is not deterministic, a `min` operation is required to find the minimum offset. - let min_offset = offsets - .into_iter() - .min() - .context(EmptyOffsetsSnafu { topic: &topic })?; - let min_offset: EntryOffset = min_offset - .try_into() - .map_err(|_| CastOffsetSnafu { offset: min_offset }.build())?; - Ok((topic, min_offset)) - } } #[async_trait::async_trait] @@ -105,67 +53,62 @@ impl LogStore for KafkaLogStore { type Entry = EntryImpl; type Namespace = NamespaceImpl; - /// Creates an entry of the associated Entry type. + /// Creates an entry. fn entry>( &self, data: D, entry_id: EntryId, ns: Self::Namespace, ) -> Self::Entry { - EntryImpl::new(data.as_ref().to_vec(), entry_id, ns) + EntryImpl { + data: data.as_ref().to_vec(), + id: entry_id, + ns, + } } - /// Appends an entry to the log store and returns a response containing the entry id and an optional entry offset. + /// Appends an entry to the log store and returns a response containing the entry id of the appended entry. async fn append(&self, entry: Self::Entry) -> Result { - let entry_id = entry.id; - let topic = entry.ns.topic.clone(); - let (_, offset) = self.append_batch_to_topic(vec![entry], topic).await?; - Ok(AppendResponse { - entry_id, - offset: Some(offset), - }) + let entry_id = RecordProducer::new(entry.ns.clone()) + .with_entries(vec![entry]) + .produce(&self.client_manager) + .await + .map(TryInto::try_into)??; + Ok(AppendResponse { entry_id }) } - /// For a batch of entries belonging to multiple regions, each assigned to a specific topic, - /// we need to determine the minimum start offset returned for each region in this batch. - /// During replay a region, we use this offset to fetch entries for the region from its assigned topic. - /// After fetching, we filter the entries to obtain entries relevant to the region. + /// Appends a batch of entries to the log store. The response contains a map where the key + /// is a region id while the value if the id of the entry, the first entry of the entries belong to the region, + /// written into the log store. async fn append_batch(&self, entries: Vec) -> Result { if entries.is_empty() { return Ok(AppendBatchResponse::default()); } - // The entries are grouped by topic since the number of regions might be very large - // while the number of topics are well controlled, - let mut topic_entries: HashMap<_, Vec<_>> = HashMap::new(); - // A utility map used to construct the result response. - let mut topic_regions: HashMap<_, Vec<_>> = HashMap::new(); + // Groups entries by region id and push them to an associated record producer. 
+ let mut producers: HashMap<_, RecordProducer> = HashMap::with_capacity(entries.len()); for entry in entries { - let topic = entry.ns.topic.clone(); - let region_id = entry.ns.region_id; - topic_entries.entry(topic.clone()).or_default().push(entry); - topic_regions.entry(topic).or_default().push(region_id); + producers + .entry(entry.ns.region_id) + .or_insert(RecordProducer::new(entry.ns.clone())) + .push(entry); } - // Appends each group of entries to the corresponding topic. - let tasks = topic_entries - .into_iter() - .map(|(topic, entries)| self.append_batch_to_topic(entries, topic)) + // Builds a record from entries belong to a region and produces them to kafka server. + let region_ids = producers.keys().cloned().collect::>(); + let tasks = producers + .into_values() + .map(|producer| producer.produce(&self.client_manager)) .collect::>(); - let topic_offset = futures::future::try_join_all(tasks) + // Each produce operation returns a kafka offset of the produced record. + // The offsets are then converted to entry ids. + let entry_ids = futures::future::try_join_all(tasks) .await? .into_iter() - .collect::>(); - - let mut region_offset = HashMap::new(); - for (topic, regions) in topic_regions { - let offset = topic_offset[&topic]; - regions.into_iter().for_each(|region| { - region_offset.insert(region, offset); - }); - } + .map(TryInto::try_into) + .collect::>>()?; Ok(AppendBatchResponse { - offsets: region_offset, + entry_ids: region_ids.into_iter().zip(entry_ids).collect(), }) } @@ -178,16 +121,18 @@ impl LogStore for KafkaLogStore { ) -> Result> { let topic = ns.topic.clone(); let region_id = ns.region_id; - let offset = try_get_offset(entry_id)?; - let raw_client = self + // Gets the client associated with the topic. + let client = self .client_manager .get_or_insert(&topic) .await? .raw_client .clone(); + // Reads the entries starting from exactly the specified offset. - let mut stream_consumer = StreamConsumerBuilder::new(raw_client, StartOffset::At(offset)) + let offset = Offset::try_from(entry_id)?.0; + let mut stream_consumer = StreamConsumerBuilder::new(client, StartOffset::At(offset)) .with_max_batch_size(self.config.max_batch_size.as_bytes() as i32) .with_max_wait_ms(self.config.max_wait_time.as_millis() as i32) .build(); @@ -199,11 +144,11 @@ impl LogStore for KafkaLogStore { Ok(Box::pin(stream)) } - /// Create a namespace of the associate Namespace type + /// Creates a namespace. fn namespace(&self, ns_id: NamespaceId, wal_options: &WalOptions) -> Self::Namespace { - // Safety: we assume the database administrator, not the database itself, is responsible for ensuring that - // the wal config for metasrv and that for datanode are consistent, i.e. their wal providers should be identical. - // With such an assumption, the unreachable is safe here. + // Safety: upon start, the datanode checks the consistency of the wal providers in the wal config of the + // datanode and that of the metasrv. Therefore, the wal options passed into the kafka log store + // must be of type WalOptions::Kafka. let WalOptions::Kafka(kafka_options) = wal_options else { unreachable!() }; @@ -213,72 +158,27 @@ impl LogStore for KafkaLogStore { } } - /// Create a new `Namespace`. async fn create_namespace(&self, _ns: &Self::Namespace) -> Result<()> { Ok(()) } - /// Delete an existing `Namespace` with given ref. async fn delete_namespace(&self, _ns: &Self::Namespace) -> Result<()> { Ok(()) } - /// List all existing namespaces. 
async fn list_namespaces(&self) -> Result> { Ok(vec![]) } - /// Mark all entry ids `<=id` of given `namespace` as obsolete so that logstore can safely delete + /// Marks all entry ids `<=id` of given `namespace` as obsolete so that logstore can safely delete /// the log files if all entries inside are obsolete. This method may not delete log /// files immediately. async fn obsolete(&self, _ns: Self::Namespace, _entry_id: EntryId) -> Result<()> { Ok(()) } - /// Stop components of logstore. + /// Stops components of the logstore. async fn stop(&self) -> Result<()> { Ok(()) } } - -/// Tries to get the physical offset of the entry with the given entry id. -// TODO(niebayes): a mapping between entry id and entry offset needs to maintained at somewhere. -// One solution is to store the mapping at a specific Kafka topic. Each time the mapping is updated, -// a new record is constructed and appended to the topic. On initializing the log store, the latest -// record is pulled from Kafka cluster. -// -// The second solution is to store the mapping at the kv backend. We design a dedicated key, e.g. ID_TO_OFFSET_MAP_KEY. -// Each time the mapping is updated, the map is serialized into a vector of bytes and stored into the kv backend at the given key. -// On initializing the log store, the map is deserialized from the kv backend. -// -// The third solution is to store the offset for each region separately. More specifically, when flushed, the -// latest entry offset is stored in the RegionManifest and then persisted into the manifest file. On opening -// a region, the offset is restored and maintained at somewhere in memory. -fn try_get_offset(entry_id: EntryId) -> Result { - let _ = entry_id; - todo!() -} - -fn handle_consume_result( - result: ConsumeResult, - topic: &Topic, - region_id: u64, - offset: i64, -) -> Result> { - match result { - Ok((record_and_offset, _)) => { - let entry = EntryImpl::try_from(record_and_offset.record)?; - // Only produces entries belonging to the region with the given region id. - if entry.ns.region_id == region_id { - Ok(vec![entry]) - } else { - Ok(vec![]) - } - } - Err(e) => Err(e).context(ConsumeRecordSnafu { - offset, - topic, - region_id, - }), - } -} diff --git a/src/log-store/src/kafka/offset.rs b/src/log-store/src/kafka/offset.rs new file mode 100644 index 000000000000..8c1c66b9f9f5 --- /dev/null +++ b/src/log-store/src/kafka/offset.rs @@ -0,0 +1,37 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::error::{CastSnafu, Result}; +use crate::kafka::EntryId; + +/// A wrapper of kafka offset. 
+pub(crate) struct Offset(pub i64); + +impl TryFrom for EntryId { + type Error = crate::error::Error; + + fn try_from(offset: Offset) -> Result { + EntryId::try_from(offset.0).map_err(|_| CastSnafu.build()) + } +} + +impl TryFrom for Offset { + type Error = crate::error::Error; + + fn try_from(entry_id: EntryId) -> Result { + i64::try_from(entry_id) + .map(Offset) + .map_err(|_| CastSnafu.build()) + } +} diff --git a/src/log-store/src/kafka/record_utils.rs b/src/log-store/src/kafka/record_utils.rs index 9054ad9be515..de205df4d36c 100644 --- a/src/log-store/src/kafka/record_utils.rs +++ b/src/log-store/src/kafka/record_utils.rs @@ -12,70 +12,205 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; - use common_config::wal::KafkaWalTopic as Topic; -use rskafka::record::Record; +use rskafka::record::{Record, RecordAndOffset}; use serde::{Deserialize, Serialize}; -use snafu::{OptionExt, ResultExt}; +use snafu::{ensure, OptionExt, ResultExt}; -use crate::error::{DecodeKeySnafu, EncodeKeySnafu, MissingKeySnafu, MissingValueSnafu, Result}; +use crate::error::{ + ConsumeRecordSnafu, DecodeMetaSnafu, EmptyEntriesSnafu, EncodeMetaSnafu, GetClientSnafu, + MissingKeySnafu, MissingValueSnafu, ProduceRecordSnafu, Result, +}; +use crate::kafka::client_manager::ClientManagerRef; +use crate::kafka::offset::Offset; use crate::kafka::{EntryId, EntryImpl, NamespaceImpl}; -/// The key of a record. -/// A kafka record consists of key, value, headers, and datetime. The value of a record -/// is the entry data. Either of the key or the headers can be chosen to store the entry metadata -/// including topic, region id, and entry id. Currently, the entry metadata is stored in the key. -#[derive(Debug, Serialize, Deserialize)] -struct RecordKey { - topic: Topic, - region_id: u64, - entry_id: EntryId, +type ConsumeResult = std::result::Result<(RecordAndOffset, i64), rskafka::client::error::Error>; + +/// Record metadata which will be serialized/deserialized to/from the `key` of a Record. +#[derive(Debug, Serialize, Deserialize, PartialEq)] +struct RecordMeta { + /// The namespace of the entries wrapped in the record. + ns: NamespaceImpl, + /// Ids of the entries built into the record. + entry_ids: Vec, + /// entry_offsets[i] is the end offset (exclusive) of the data of the i-th entry in the record value. + entry_offsets: Vec, } -impl ToString for RecordKey { - fn to_string(&self) -> String { - format!("{}/{}/{}", self.topic, self.region_id, self.entry_id) +impl RecordMeta { + fn new(ns: NamespaceImpl, entries: &[EntryImpl]) -> Self { + Self { + ns, + entry_ids: entries.iter().map(|entry| entry.id).collect(), + entry_offsets: entries + .iter() + .map(|entry| entry.data.len()) + .scan(0, |presum, x| { + *presum += x; + Some(*presum) + }) + .collect(), + } } } -// When writing to a region, a wal entry is constructed from all mutations on the region. -// I.e., a wal entry is itself a log batch and hence no need to group multiple entries into a record. -// That's why the mapping between entries and records are one-one. 
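
To make the `entry_offsets` convention above concrete, here is a small standalone sketch of how the exclusive end offsets are built on encode and used to slice the record value back into entries on decode. It does not use the crate's own types, and the payload bytes are made up:

fn main() {
    // Payloads of three hypothetical entries that end up in one record.
    let datas: [&[u8]; 3] = [b"111", b"2222", b"33333"];

    // Encode: the record value is the concatenation of all entry payloads...
    let value: Vec<u8> = datas.concat();
    // ...and entry_offsets[i] is the exclusive end offset of the i-th payload.
    let entry_offsets: Vec<usize> = datas
        .iter()
        .map(|data| data.len())
        .scan(0, |prefix_sum, len| {
            *prefix_sum += len;
            Some(*prefix_sum)
        })
        .collect();
    assert_eq!(entry_offsets, vec![3, 7, 12]);

    // Decode: walk the offsets to slice each payload back out of the value.
    let mut start = 0;
    let decoded: Vec<&[u8]> = entry_offsets
        .iter()
        .map(|&end| {
            let data = &value[start..end];
            start = end;
            data
        })
        .collect();
    assert_eq!(decoded, datas.to_vec());
}
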
-impl TryInto for EntryImpl { - type Error = crate::error::Error; - - fn try_into(self) -> Result { - let key = RecordKey { - topic: self.ns.topic, - region_id: self.ns.region_id, - entry_id: self.id, - }; - let raw_key = serde_json::to_vec(&key).context(EncodeKeySnafu { - key: key.to_string(), - })?; - - Ok(Record { - key: Some(raw_key), - value: Some(self.data), - headers: BTreeMap::default(), - timestamp: rskafka::chrono::Utc::now(), - }) +/// Produces a record to a kafka topic. +pub(crate) struct RecordProducer { + /// The namespace of the entries. + ns: NamespaceImpl, + /// Entries are buffered before being built into a record. + entries: Vec, +} + +impl RecordProducer { + /// Creates a new producer for producing entries with the given namespace. + pub(crate) fn new(ns: NamespaceImpl) -> Self { + Self { + ns, + entries: Vec::new(), + } + } + + /// Populates the entry buffer with the given entries. + pub(crate) fn with_entries(self, entries: Vec) -> Self { + Self { entries, ..self } + } + + /// Pushes an entry into the entry buffer. + pub(crate) fn push(&mut self, entry: EntryImpl) { + self.entries.push(entry); + } + + /// Produces the buffered entries to kafka sever as a kafka record. + /// Returns the kafka offset of the produced record. + // TODO(niebayes): since the total size of a region's entries may be way-too large, + // the producer may need to support splitting entries into multiple records. + pub(crate) async fn produce(self, client_manager: &ClientManagerRef) -> Result { + ensure!(!self.entries.is_empty(), EmptyEntriesSnafu); + + // Produces the record through a client. The client determines when to send the record to kafka server. + let client = client_manager + .get_or_insert(&self.ns.topic) + .await + .map_err(|e| { + GetClientSnafu { + topic: &self.ns.topic, + error: e.to_string(), + } + .build() + })?; + client + .producer + .produce(encode_to_record(self.ns.clone(), self.entries)?) + .await + .map(Offset) + .context(ProduceRecordSnafu { + topic: &self.ns.topic, + }) + } +} + +fn encode_to_record(ns: NamespaceImpl, entries: Vec) -> Result { + let meta = RecordMeta::new(ns, &entries); + let data = entries.into_iter().flat_map(|entry| entry.data).collect(); + Ok(Record { + key: Some(serde_json::to_vec(&meta).context(EncodeMetaSnafu)?), + value: Some(data), + timestamp: rskafka::chrono::Utc::now(), + headers: Default::default(), + }) +} + +fn decode_from_record(record: Record) -> Result> { + let key = record.key.context(MissingKeySnafu)?; + let value = record.value.context(MissingValueSnafu)?; + let meta: RecordMeta = serde_json::from_slice(&key).context(DecodeMetaSnafu)?; + + let mut entries = Vec::with_capacity(meta.entry_ids.len()); + let mut start_offset = 0; + for (i, end_offset) in meta.entry_offsets.iter().enumerate() { + entries.push(EntryImpl { + // TODO(niebayes): try to avoid the clone. + data: value[start_offset..*end_offset].to_vec(), + id: meta.entry_ids[i], + ns: meta.ns.clone(), + }); + start_offset = *end_offset; + } + Ok(entries) +} + +/// Handles the result of a consume operation on a kafka topic. +pub(crate) fn handle_consume_result( + result: ConsumeResult, + topic: &Topic, + region_id: u64, + offset: i64, +) -> Result> { + match result { + Ok((record_and_offset, _)) => { + // Only produces entries belong to the region with the given region id. + // Since a record only contains entries from a single region, it suffices to check the first entry only. 
+ let entries = decode_from_record(record_and_offset.record)?; + if let Some(entry) = entries.first() + && entry.id == region_id + { + Ok(entries) + } else { + Ok(vec![]) + } + } + Err(e) => Err(e).context(ConsumeRecordSnafu { + topic, + region_id, + offset, + }), } } -impl TryFrom for EntryImpl { - type Error = crate::error::Error; +#[cfg(test)] +mod tests { + use super::*; - fn try_from(record: Record) -> Result { - let raw_key = record.key.context(MissingKeySnafu)?; - let key: RecordKey = serde_json::from_slice(&raw_key).context(DecodeKeySnafu)?; - let data = record.value.context(MissingValueSnafu)?; + fn new_test_entry>(data: D, entry_id: EntryId, ns: NamespaceImpl) -> EntryImpl { + EntryImpl { + data: data.as_ref().to_vec(), + id: entry_id, + ns, + } + } - Ok(Self { - id: key.entry_id, - ns: NamespaceImpl::new(key.region_id, key.topic), - data, - }) + #[test] + fn test_serde_record_meta() { + let ns = NamespaceImpl { + region_id: 1, + topic: "test_topic".to_string(), + }; + let entries = vec![ + new_test_entry(b"111", 1, ns.clone()), + new_test_entry(b"2222", 2, ns.clone()), + new_test_entry(b"33333", 3, ns.clone()), + ]; + let meta = RecordMeta::new(ns, &entries); + let encoded = serde_json::to_vec(&meta).unwrap(); + let decoded: RecordMeta = serde_json::from_slice(&encoded).unwrap(); + assert_eq!(meta, decoded); + } + + #[test] + fn test_encdec_record() { + let ns = NamespaceImpl { + region_id: 1, + topic: "test_topic".to_string(), + }; + let entries = vec![ + new_test_entry(b"111", 1, ns.clone()), + new_test_entry(b"2222", 2, ns.clone()), + new_test_entry(b"33333", 3, ns.clone()), + ]; + let record = encode_to_record(ns, entries.clone()).unwrap(); + let decoded_entries = decode_from_record(record).unwrap(); + assert_eq!(entries, decoded_entries); } } diff --git a/src/log-store/src/noop.rs b/src/log-store/src/noop.rs index 1929e59a2365..62ace38db977 100644 --- a/src/log-store/src/noop.rs +++ b/src/log-store/src/noop.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; - use common_config::wal::WalOptions; use store_api::logstore::entry::{Entry, Id as EntryId}; use store_api::logstore::namespace::{Id as NamespaceId, Namespace}; @@ -65,16 +63,11 @@ impl LogStore for NoopLogStore { } async fn append(&self, mut _e: Self::Entry) -> Result { - Ok(AppendResponse { - entry_id: 0, - offset: None, - }) + Ok(AppendResponse { entry_id: 0 }) } async fn append_batch(&self, _e: Vec) -> Result { - Ok(AppendBatchResponse { - offsets: HashMap::new(), - }) + Ok(AppendBatchResponse::default()) } async fn read( diff --git a/src/log-store/src/raft_engine/log_store.rs b/src/log-store/src/raft_engine/log_store.rs index eb14bf0cf90a..af0eb07b155e 100644 --- a/src/log-store/src/raft_engine/log_store.rs +++ b/src/log-store/src/raft_engine/log_store.rs @@ -178,10 +178,7 @@ impl LogStore for RaftEngineLogStore { .engine .write(&mut batch, self.config.sync_write) .context(RaftEngineSnafu)?; - Ok(AppendResponse { - entry_id, - offset: None, - }) + Ok(AppendResponse { entry_id }) } /// Appends a batch of entries to logstore. 
`RaftEngineLogStore` assures the atomicity of diff --git a/src/store-api/src/logstore.rs b/src/store-api/src/logstore.rs index 6d124a499bb5..ec5f9097041e 100644 --- a/src/store-api/src/logstore.rs +++ b/src/store-api/src/logstore.rs @@ -19,7 +19,7 @@ use std::collections::HashMap; use common_config::wal::WalOptions; use common_error::ext::ErrorExt; -use crate::logstore::entry::{Entry, Id as EntryId, Offset as EntryOffset}; +use crate::logstore::entry::{Entry, Id as EntryId}; use crate::logstore::entry_stream::SendableEntryStream; use crate::logstore::namespace::{Id as NamespaceId, Namespace}; @@ -34,46 +34,46 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug { type Namespace: Namespace; type Entry: Entry; - /// Stop components of logstore. + /// Stops components of the logstore. async fn stop(&self) -> Result<(), Self::Error>; - /// Append an `Entry` to WAL with given namespace and return append response containing - /// the entry id. + /// Appends an entry to the log store and returns a response containing the id of the append entry. async fn append(&self, entry: Self::Entry) -> Result; - /// Append a batch of entries and return an append batch response containing the start entry ids of - /// log entries written to each region. + /// Appends a batch of entries and returns a response containing a map where the key is a region id + /// while the value is the id of the entry, the first entry of the entries belong to the region, written into the log store. async fn append_batch( &self, entries: Vec, ) -> Result; - /// Create a new `EntryStream` to asynchronously generates `Entry` with ids + /// Creates a new `EntryStream` to asynchronously generates `Entry` with ids /// starting from `id`. + // TODO(niebayes): update docs for entry id. async fn read( &self, ns: &Self::Namespace, id: EntryId, ) -> Result, Self::Error>; - /// Create a new `Namespace`. + /// Creates a new `Namespace`. async fn create_namespace(&self, ns: &Self::Namespace) -> Result<(), Self::Error>; - /// Delete an existing `Namespace` with given ref. + /// Deletes an existing `Namespace` with given ref. async fn delete_namespace(&self, ns: &Self::Namespace) -> Result<(), Self::Error>; - /// List all existing namespaces. + /// Lists all existing namespaces. async fn list_namespaces(&self) -> Result, Self::Error>; - /// Create an entry of the associate Entry type + /// Creates an entry of the associate Entry type. fn entry>(&self, data: D, entry_id: EntryId, ns: Self::Namespace) -> Self::Entry; - /// Create a namespace of the associate Namespace type + /// Creates a namespace of the associates Namespace type. // TODO(sunng87): confusion with `create_namespace` fn namespace(&self, ns_id: NamespaceId, wal_options: &WalOptions) -> Self::Namespace; - /// Mark all entry ids `<=id` of given `namespace` as obsolete so that logstore can safely delete + /// Marks all entry ids `<=id` of given `namespace` as obsolete so that logstore can safely delete /// the log files if all entries inside are obsolete. This method may not delete log /// files immediately. async fn obsolete(&self, ns: Self::Namespace, entry_id: EntryId) -> Result<(), Self::Error>; @@ -82,18 +82,14 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug { /// The response of an `append` operation. #[derive(Debug)] pub struct AppendResponse { - /// The logical id of the appended log entry. + /// The id of the entry written into the log store. pub entry_id: EntryId, - /// The physical start offset of the appended log entry. 
- /// Depends on the `LogStore` implementation, the entry offset may be missing. - pub offset: Option, } /// The response of an `append_batch` operation. #[derive(Debug, Default)] pub struct AppendBatchResponse { - /// Key: region id (as u64). Value: the known minimum start offset of the appended log entries belonging to the region. - /// Depends on the `LogStore` implementation, the entry offsets may be missing. - // TODO(niebayes): the offset seems shouldn't be exposed to users of wal. But for now, let's keep it. - pub offsets: HashMap, + /// Key: region id (as u64). + /// Value: the id of the entry, the first entry of the entries belong to the region, written into the log store. + pub entry_ids: HashMap, } diff --git a/src/store-api/src/logstore/entry.rs b/src/store-api/src/logstore/entry.rs index cb2538086e6d..11b3136687b9 100644 --- a/src/store-api/src/logstore/entry.rs +++ b/src/store-api/src/logstore/entry.rs @@ -16,21 +16,25 @@ use common_error::ext::ErrorExt; use crate::logstore::namespace::Namespace; -/// An entry's logical id, allocated by log store users. +/// An entry's id. +/// An append operation tries to append an entry at the id. +/// A read operation tries to read an entry at the id. pub type Id = u64; -/// An entry's physical offset in the underlying log store. +/// An entry's offset. It's not used for now. pub type Offset = usize; -/// Entry is the minimal data storage unit in `LogStore`. +/// Entry is the minimal data storage unit through which users interact with the log store. +/// The log store implementation may have larger or smaller data storage unit than an entry. pub trait Entry: Send + Sync { type Error: ErrorExt + Send + Sync; type Namespace: Namespace; - /// Return contained data of entry. + /// Returns the contained data of the entry. fn data(&self) -> &[u8]; - /// Return entry id that monotonically increments. + /// Returns the id of the entry. fn id(&self) -> Id; + /// Returns the namespace of the entry. fn namespace(&self) -> Self::Namespace; } From ed4d2cfe7e44b285d6c43e746750b8c829965bac Mon Sep 17 00:00:00 2001 From: niebayes Date: Sat, 23 Dec 2023 17:46:34 +0800 Subject: [PATCH 20/22] fix: typo --- src/log-store/src/kafka/log_store.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs index 6594029e3de0..8718a0f7c4d4 100644 --- a/src/log-store/src/kafka/log_store.rs +++ b/src/log-store/src/kafka/log_store.rs @@ -78,7 +78,7 @@ impl LogStore for KafkaLogStore { } /// Appends a batch of entries to the log store. The response contains a map where the key - /// is a region id while the value if the id of the entry, the first entry of the entries belong to the region, + /// is a region id while the value is the id of the entry, the first entry of the entries belong to the region, /// written into the log store. 
     async fn append_batch(&self, entries: Vec) -> Result {
         if entries.is_empty() {
From 62a8991a986e585e41a84002c420a36724c1f58b Mon Sep 17 00:00:00 2001
From: niebayes
Date: Sat, 23 Dec 2023 17:49:05 +0800
Subject: [PATCH 21/22] fix: typo
---
 src/log-store/src/kafka/log_store.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs
index 8718a0f7c4d4..e1f97d6b2937 100644
--- a/src/log-store/src/kafka/log_store.rs
+++ b/src/log-store/src/kafka/log_store.rs
@@ -85,7 +85,7 @@ impl LogStore for KafkaLogStore {
             return Ok(AppendBatchResponse::default());
         }
-        // Groups entries by region id and push them to an associated record producer.
+        // Groups entries by region id and pushes them to an associated record producer.
         let mut producers: HashMap<_, RecordProducer> = HashMap::with_capacity(entries.len());
         for entry in entries {
             producers
From 0f4ef3e571b0f1eda1df1fa71439ea4c030717e3 Mon Sep 17 00:00:00 2001
From: niebayes
Date: Mon, 25 Dec 2023 17:05:34 +0800
Subject: [PATCH 22/22] fix: resolve review conversations
---
 config/datanode.example.toml              | 3 ---
 config/standalone.example.toml            | 6 ------
 src/common/config/src/wal.rs              | 6 ------
 src/common/config/src/wal/kafka.rs        | 9 ---------
 src/common/meta/src/error.rs              | 6 +++---
 src/log-store/src/error.rs                | 4 ++--
 src/log-store/src/kafka/client_manager.rs | 2 +-
 7 files changed, 6 insertions(+), 30 deletions(-)
diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index 7343fbe47bf5..342e10bfe19f 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -51,9 +51,6 @@ sync_write = false
 # Kafka wal options, see `standalone.example.toml`.
 # broker_endpoints = ["127.0.0.1:9090"]
-# num_topics = 64
-# topic_name_prefix = "greptimedb_wal_topic"
-# num_partitions = 1
 # max_batch_size = "4MB"
 # linger = "200ms"
 # max_wait_time = "100ms"
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index 7e12a6288ae0..713f8ef79edb 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -90,12 +90,6 @@ provider = "raft_engine"
 # Kafka wal options.
 # The broker endpoints of the Kafka cluster. ["127.0.0.1:9090"] by default.
 # broker_endpoints = ["127.0.0.1:9090"]
-# Number of topics shall be created beforehand.
-# num_topics = 64
-# Topic name prefix.
-# topic_name_prefix = "greptimedb_wal_topic"
-# Number of partitions per topic.
-# num_partitions = 1
 # The maximum log size a kafka batch producer could buffer.
 # max_batch_size = "4MB"
 # The linger duration of a kafka batch producer.
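For readers following the two typo-fix patches above: the contract they document is that `append_batch` buckets entries per region and the response maps each region id to the id of the first entry written for that region. Below is a minimal, self-contained sketch of just that grouping step; the `SketchEntry` struct, its field names, and `group_by_region` are invented for illustration, and the real `KafkaLogStore` derives the returned ids from the records it actually produces to Kafka rather than from the input entries.

use std::collections::HashMap;

/// Hypothetical, simplified stand-in for a log entry; the real `Entry` is a trait.
struct SketchEntry {
    region_id: u64,
    entry_id: u64,
}

/// Groups entries by region id and records, per region, the id of the first entry,
/// mirroring the shape of `AppendBatchResponse::entry_ids`.
fn group_by_region(entries: Vec<SketchEntry>) -> HashMap<u64, u64> {
    // Region id -> entries destined for that region.
    let mut grouped: HashMap<u64, Vec<SketchEntry>> = HashMap::new();
    for entry in entries {
        grouped.entry(entry.region_id).or_default().push(entry);
    }

    // Region id -> id of the first entry appended for that region.
    let mut entry_ids = HashMap::with_capacity(grouped.len());
    for (region_id, region_entries) in grouped {
        if let Some(first) = region_entries.first() {
            entry_ids.insert(region_id, first.entry_id);
        }
    }
    entry_ids
}

fn main() {
    let response = group_by_region(vec![
        SketchEntry { region_id: 1, entry_id: 10 },
        SketchEntry { region_id: 2, entry_id: 11 },
        SketchEntry { region_id: 1, entry_id: 12 },
    ]);
    // Region 1 received entries 10 and 12; only the first id (10) is reported.
    assert_eq!(response.get(&1), Some(&10));
    assert_eq!(response.get(&2), Some(&11));
}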
diff --git a/src/common/config/src/wal.rs b/src/common/config/src/wal.rs
index 6c4955f4e63c..60128d14b35e 100644
--- a/src/common/config/src/wal.rs
+++ b/src/common/config/src/wal.rs
@@ -70,9 +70,6 @@ mod tests {
     fn test_serde_kafka_config() {
         let toml_str = r#"
             broker_endpoints = ["127.0.0.1:9090"]
-            num_topics = 32
-            topic_name_prefix = "greptimedb_wal_topic"
-            num_partitions = 1
             max_batch_size = "4MB"
             linger = "200ms"
             max_wait_time = "100ms"
@@ -84,9 +81,6 @@ mod tests {
         let decoded: KafkaConfig = toml::from_str(toml_str).unwrap();
         let expected = KafkaConfig {
             broker_endpoints: vec!["127.0.0.1:9090".to_string()],
-            num_topics: 32,
-            topic_name_prefix: "greptimedb_wal_topic".to_string(),
-            num_partitions: 1,
             compression: RsKafkaCompression::default(),
             max_batch_size: ReadableSize::mb(4),
             linger: Duration::from_millis(200),
diff --git a/src/common/config/src/wal/kafka.rs b/src/common/config/src/wal/kafka.rs
index dc31bcec9e6c..eb6795054141 100644
--- a/src/common/config/src/wal/kafka.rs
+++ b/src/common/config/src/wal/kafka.rs
@@ -29,12 +29,6 @@ pub type Topic = String;
 pub struct KafkaConfig {
     /// The broker endpoints of the Kafka cluster.
     pub broker_endpoints: Vec,
-    /// Number of topics shall be created beforehand.
-    pub num_topics: usize,
-    /// Topic name prefix.
-    pub topic_name_prefix: String,
-    /// Number of partitions per topic.
-    pub num_partitions: i32,
     /// The compression algorithm used to compress log entries.
     #[serde(skip)]
     #[serde(default)]
@@ -66,9 +60,6 @@ impl Default for KafkaConfig {
     fn default() -> Self {
         Self {
             broker_endpoints: vec!["127.0.0.1:9090".to_string()],
-            num_topics: 64,
-            topic_name_prefix: TOPIC_NAME_PREFIX.to_string(),
-            num_partitions: 1,
             compression: RsKafkaCompression::NoCompression,
             max_batch_size: ReadableSize::mb(4),
             linger: Duration::from_millis(200),
diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs
index 79c3e9316c59..519d8ec7a1af 100644
--- a/src/common/meta/src/error.rs
+++ b/src/common/meta/src/error.rs
@@ -304,7 +304,7 @@ pub enum Error {
     },
     #[snafu(display(
-        "Failed to build a kafka client, broker endpoints: {:?}",
+        "Failed to build a Kafka client, broker endpoints: {:?}",
         broker_endpoints
     ))]
     BuildKafkaClient {
@@ -314,14 +314,14 @@
         error: rskafka::client::error::Error,
     },
-    #[snafu(display("Failed to build a kafka controller client"))]
+    #[snafu(display("Failed to build a Kafka controller client"))]
     BuildKafkaCtrlClient {
         location: Location,
         #[snafu(source)]
         error: rskafka::client::error::Error,
     },
-    #[snafu(display("Failed to create a kafka wal topic"))]
+    #[snafu(display("Failed to create a Kafka wal topic"))]
     CreateKafkaWalTopic {
         location: Location,
         #[snafu(source)]
diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs
index ea54853dea09..1ee344046adc 100644
--- a/src/log-store/src/error.rs
+++ b/src/log-store/src/error.rs
@@ -87,7 +87,7 @@ pub enum Error {
     },
     #[snafu(display(
-        "Failed to build a kafka client, broker endpoints: {:?}",
+        "Failed to build a Kafka client, broker endpoints: {:?}",
         broker_endpoints
     ))]
     BuildClient {
@@ -98,7 +98,7 @@
     },
     #[snafu(display(
-        "Failed to build a kafka partition client, topic: {}, partition: {}",
+        "Failed to build a Kafka partition client, topic: {}, partition: {}",
         topic,
         partition
     ))]
diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs
index 56cdadabeccb..9aa27bf1b3fd 100644
--- a/src/log-store/src/kafka/client_manager.rs
+++ b/src/log-store/src/kafka/client_manager.rs
@@ -91,7 +91,7 @@ impl ClientManager {
         Ok(Self {
             config: config.clone(),
             client_factory: client,
-            client_pool: DashMap::with_capacity(config.num_topics),
+            client_pool: DashMap::new(),
         })
     }
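One note on the final hunk, with a small illustration: now that `num_topics` is gone from `KafkaConfig`, the datanode can no longer size the client pool up front, so the pool starts empty (`DashMap::new()`) and fills in lazily as topics are first written to. The snippet below is only a stand-in sketch of that lazy-fill behaviour, using plain numeric ids instead of the real `Client` type and made-up topic names; it is not the actual `ClientManager` code.

use dashmap::DashMap;

fn main() {
    // Stand-in for the client pool: topic name -> "client" (just a numeric id here).
    let pool: DashMap<String, u64> = DashMap::new();
    let mut next_client_id = 0u64;

    // Clients are built on first use rather than being preallocated up front.
    for topic in ["wal_topic_a", "wal_topic_b", "wal_topic_a"] {
        let client_id = *pool.entry(topic.to_string()).or_insert_with(|| {
            // In the real manager, this is roughly where a new client would be created.
            let id = next_client_id;
            next_client_id += 1;
            id
        });
        println!("topic {topic} -> client #{client_id}");
    }

    // Two distinct topics were touched, so the pool ends up holding two clients.
    assert_eq!(pool.len(), 2);
}

Growing the map on demand trades a few rehashes for not having to carry a topic-count knob in the datanode configuration, which is the review feedback this last patch resolves.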