diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index bab945190b2c..342e10bfe19f 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -41,7 +41,7 @@ tcp_nodelay = true
 # WAL data directory
 provider = "raft_engine"
 
-# Raft-engine wal options, see `standalone.example.toml`
+# Raft-engine wal options, see `standalone.example.toml`.
 # dir = "/tmp/greptimedb/wal"
 file_size = "256MB"
 purge_threshold = "4GB"
@@ -49,21 +49,15 @@ purge_interval = "10m"
 read_batch_size = 128
 sync_write = false
 
-# Kafka wal options.
-# The broker endpoints of the Kafka cluster. ["127.0.0.1:9090"] by default.
+# Kafka wal options, see `standalone.example.toml`.
 # broker_endpoints = ["127.0.0.1:9090"]
-# Number of topics shall be created beforehand.
-# num_topics = 64
-# Topic name prefix.
-# topic_name_prefix = "greptimedb_wal_kafka_topic"
-# Number of partitions per topic.
-# num_partitions = 1
-# The maximum log size an rskafka batch producer could buffer.
 # max_batch_size = "4MB"
-# The linger duration of an rskafka batch producer.
 # linger = "200ms"
-# The maximum amount of time (in milliseconds) to wait for Kafka records to be returned.
 # max_wait_time = "100ms"
+# backoff_init = "500ms"
+# backoff_max = "10s"
+# backoff_base = 2
+# backoff_deadline = "5mins"
 
 # Storage options, see `standalone.example.toml`.
 [storage]
diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml
index b057d7f6b677..120f19255f3a 100644
--- a/config/metasrv.example.toml
+++ b/config/metasrv.example.toml
@@ -61,7 +61,7 @@ provider = "raft_engine"
 # - "round_robin" (default)
 # selector_type = "round_robin"
 # A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`.
-# topic_name_prefix = "greptimedb_wal_kafka"
+# topic_name_prefix = "greptimedb_wal_topic"
 # Number of partitions per topic.
 # num_partitions = 1
 # Expected number of replicas of each partition.
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index c8930a9646ca..713f8ef79edb 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -87,7 +87,23 @@ enable = true
 # - "Kafka"
 provider = "raft_engine"
 
-# There's no kafka wal config for standalone mode.
+# Kafka wal options.
+# The broker endpoints of the Kafka cluster. ["127.0.0.1:9090"] by default.
+# broker_endpoints = ["127.0.0.1:9090"]
+# The maximum log size a kafka batch producer could buffer.
+# max_batch_size = "4MB"
+# The linger duration of a kafka batch producer.
+# linger = "200ms"
+# The maximum amount of time (in milliseconds) to wait for Kafka records to be returned.
+# max_wait_time = "100ms"
+# The initial backoff for kafka clients.
+# backoff_init = "500ms"
+# The maximum backoff for kafka clients.
+# backoff_max = "10s"
+# Exponential backoff rate, i.e. next backoff = base * current backoff.
+# backoff_base = 2
+# Stop reconnecting if the total wait time reaches the deadline. If this config is missing, the reconnecting won't terminate.
+# backoff_deadline = "5mins" # WAL data directory # dir = "/tmp/greptimedb/wal" diff --git a/src/common/config/src/wal.rs b/src/common/config/src/wal.rs index 56e04c55547c..60128d14b35e 100644 --- a/src/common/config/src/wal.rs +++ b/src/common/config/src/wal.rs @@ -70,23 +70,25 @@ mod tests { fn test_serde_kafka_config() { let toml_str = r#" broker_endpoints = ["127.0.0.1:9090"] - num_topics = 32 - topic_name_prefix = "greptimedb_wal_kafka_topic" - num_partitions = 1 max_batch_size = "4MB" linger = "200ms" max_wait_time = "100ms" + backoff_init = "500ms" + backoff_max = "10s" + backoff_base = 2 + backoff_deadline = "5mins" "#; let decoded: KafkaConfig = toml::from_str(toml_str).unwrap(); let expected = KafkaConfig { broker_endpoints: vec!["127.0.0.1:9090".to_string()], - num_topics: 32, - topic_name_prefix: "greptimedb_wal_kafka_topic".to_string(), - num_partitions: 1, compression: RsKafkaCompression::default(), max_batch_size: ReadableSize::mb(4), linger: Duration::from_millis(200), max_wait_time: Duration::from_millis(100), + backoff_init: Duration::from_millis(500), + backoff_max: Duration::from_secs(10), + backoff_base: 2, + backoff_deadline: Some(Duration::from_secs(60 * 5)), }; assert_eq!(decoded, expected); } diff --git a/src/common/config/src/wal/kafka.rs b/src/common/config/src/wal/kafka.rs index c645f7c3607b..eb6795054141 100644 --- a/src/common/config/src/wal/kafka.rs +++ b/src/common/config/src/wal/kafka.rs @@ -19,7 +19,7 @@ use rskafka::client::partition::Compression as RsKafkaCompression; use serde::{Deserialize, Serialize}; /// Topic name prefix. -pub const TOPIC_NAME_PREFIX: &str = "greptimedb_wal_kafka_topic"; +pub const TOPIC_NAME_PREFIX: &str = "greptimedb_wal_topic"; /// Kafka wal topic. pub type Topic = String; @@ -29,37 +29,45 @@ pub type Topic = String; pub struct KafkaConfig { /// The broker endpoints of the Kafka cluster. pub broker_endpoints: Vec, - /// Number of topics shall be created beforehand. - pub num_topics: usize, - /// Topic name prefix. - pub topic_name_prefix: String, - /// Number of partitions per topic. - pub num_partitions: i32, /// The compression algorithm used to compress log entries. #[serde(skip)] #[serde(default)] pub compression: RsKafkaCompression, - /// The maximum log size an rskakfa batch producer could buffer. + /// The maximum log size a kakfa batch producer could buffer. pub max_batch_size: ReadableSize, - /// The linger duration of an rskafka batch producer. + /// The linger duration of a kafka batch producer. #[serde(with = "humantime_serde")] pub linger: Duration, /// The maximum amount of time (in milliseconds) to wait for Kafka records to be returned. #[serde(with = "humantime_serde")] pub max_wait_time: Duration, + /// The initial backoff for kafka clients. + #[serde(with = "humantime_serde")] + pub backoff_init: Duration, + /// The maximum backoff for kafka clients. + #[serde(with = "humantime_serde")] + pub backoff_max: Duration, + /// Exponential backoff rate, i.e. next backoff = base * current backoff. + // Sets to u32 type since some structs containing the KafkaConfig need to derive the Eq trait. + pub backoff_base: u32, + /// Stop reconnecting if the total wait time reaches the deadline. + /// If it's None, the reconnecting won't terminate. 
+ #[serde(with = "humantime_serde")] + pub backoff_deadline: Option, } impl Default for KafkaConfig { fn default() -> Self { Self { broker_endpoints: vec!["127.0.0.1:9090".to_string()], - num_topics: 64, - topic_name_prefix: TOPIC_NAME_PREFIX.to_string(), - num_partitions: 1, compression: RsKafkaCompression::NoCompression, max_batch_size: ReadableSize::mb(4), linger: Duration::from_millis(200), max_wait_time: Duration::from_millis(100), + backoff_init: Duration::from_millis(500), + backoff_max: Duration::from_secs(10), + backoff_base: 2, + backoff_deadline: Some(Duration::from_secs(60 * 5)), // 5 mins } } } diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 79c3e9316c59..519d8ec7a1af 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -304,7 +304,7 @@ pub enum Error { }, #[snafu(display( - "Failed to build a kafka client, broker endpoints: {:?}", + "Failed to build a Kafka client, broker endpoints: {:?}", broker_endpoints ))] BuildKafkaClient { @@ -314,14 +314,14 @@ pub enum Error { error: rskafka::client::error::Error, }, - #[snafu(display("Failed to build a kafka controller client"))] + #[snafu(display("Failed to build a Kafka controller client"))] BuildKafkaCtrlClient { location: Location, #[snafu(source)] error: rskafka::client::error::Error, }, - #[snafu(display("Failed to create a kafka wal topic"))] + #[snafu(display("Failed to create a Kafka wal topic"))] CreateKafkaWalTopic { location: Location, #[snafu(source)] diff --git a/src/common/meta/src/wal.rs b/src/common/meta/src/wal.rs index 0e1576c3645e..f34a5224a87e 100644 --- a/src/common/meta/src/wal.rs +++ b/src/common/meta/src/wal.rs @@ -69,13 +69,13 @@ mod tests { broker_endpoints = ["127.0.0.1:9090"] num_topics = 32 selector_type = "round_robin" - topic_name_prefix = "greptimedb_wal_kafka" + topic_name_prefix = "greptimedb_wal_topic" num_partitions = 1 replication_factor = 3 create_topic_timeout = "30s" backoff_init = "500ms" backoff_max = "10s" - backoff_base = 2.0 + backoff_base = 2 backoff_deadline = "5mins" "#; let wal_config: WalConfig = toml::from_str(toml_str).unwrap(); @@ -83,13 +83,13 @@ mod tests { broker_endpoints: vec!["127.0.0.1:9090".to_string()], num_topics: 32, selector_type: KafkaTopicSelectorType::RoundRobin, - topic_name_prefix: "greptimedb_wal_kafka".to_string(), + topic_name_prefix: "greptimedb_wal_topic".to_string(), num_partitions: 1, replication_factor: 3, create_topic_timeout: Duration::from_secs(30), backoff_init: Duration::from_millis(500), backoff_max: Duration::from_secs(10), - backoff_base: 2.0, + backoff_base: 2, backoff_deadline: Some(Duration::from_secs(60 * 5)), }; assert_eq!(wal_config, WalConfig::Kafka(expected_kafka_config)); diff --git a/src/common/meta/src/wal/kafka.rs b/src/common/meta/src/wal/kafka.rs index 709c5a4f9879..173a74662d95 100644 --- a/src/common/meta/src/wal/kafka.rs +++ b/src/common/meta/src/wal/kafka.rs @@ -49,7 +49,9 @@ pub struct KafkaConfig { #[serde(with = "humantime_serde")] pub backoff_max: Duration, /// Exponential backoff rate, i.e. next backoff = base * current backoff. - pub backoff_base: f64, + // Sets to u32 type since the `backoff_base` field in the KafkaConfig for datanode is of type u32, + // and we want to unify their types. + pub backoff_base: u32, /// Stop reconnecting if the total wait time reaches the deadline. /// If it's None, the reconnecting won't terminate. 
#[serde(with = "humantime_serde")] @@ -62,13 +64,13 @@ impl Default for KafkaConfig { broker_endpoints: vec!["127.0.0.1:9090".to_string()], num_topics: 64, selector_type: TopicSelectorType::RoundRobin, - topic_name_prefix: "greptimedb_wal_kafka".to_string(), + topic_name_prefix: "greptimedb_wal_topic".to_string(), num_partitions: 1, replication_factor: 3, create_topic_timeout: Duration::from_secs(30), backoff_init: Duration::from_millis(500), backoff_max: Duration::from_secs(10), - backoff_base: 2.0, + backoff_base: 2, backoff_deadline: Some(Duration::from_secs(60 * 5)), // 5 mins } } diff --git a/src/common/meta/src/wal/kafka/topic_manager.rs b/src/common/meta/src/wal/kafka/topic_manager.rs index 4773871ae134..9b86d2a382f9 100644 --- a/src/common/meta/src/wal/kafka/topic_manager.rs +++ b/src/common/meta/src/wal/kafka/topic_manager.rs @@ -105,7 +105,7 @@ impl TopicManager { let backoff_config = BackoffConfig { init_backoff: self.config.backoff_init, max_backoff: self.config.backoff_max, - base: self.config.backoff_base, + base: self.config.backoff_base as f64, deadline: self.config.backoff_deadline, }; let client = ClientBuilder::new(self.config.broker_endpoints.clone()) @@ -181,7 +181,7 @@ mod tests { #[tokio::test] async fn test_restore_persisted_topics() { let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef; - let topic_name_prefix = "greptimedb_wal_kafka"; + let topic_name_prefix = "greptimedb_wal_topic"; let num_topics = 16; // Constructs mock topics. diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 97001b8b5408..0a0206eddc66 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -504,8 +504,11 @@ impl DatanodeBuilder { /// Builds [KafkaLogStore]. async fn build_kafka_log_store(config: &KafkaConfig) -> Result> { - let _ = config; - todo!() + KafkaLogStore::try_new(config) + .await + .map_err(Box::new) + .context(OpenLogStoreSnafu) + .map(Arc::new) } /// Builds [ObjectStoreManager] diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs index a24a48d84be2..1ee344046adc 100644 --- a/src/log-store/src/error.rs +++ b/src/log-store/src/error.rs @@ -14,6 +14,7 @@ use std::any::Any; +use common_config::wal::KafkaWalTopic; use common_error::ext::ErrorExt; use common_macro::stack_trace_debug; use common_runtime::error::Error as RuntimeError; @@ -84,6 +85,90 @@ pub enum Error { attempt_index: u64, location: Location, }, + + #[snafu(display( + "Failed to build a Kafka client, broker endpoints: {:?}", + broker_endpoints + ))] + BuildClient { + broker_endpoints: Vec, + location: Location, + #[snafu(source)] + error: rskafka::client::error::Error, + }, + + #[snafu(display( + "Failed to build a Kafka partition client, topic: {}, partition: {}", + topic, + partition + ))] + BuildPartitionClient { + topic: String, + partition: i32, + location: Location, + #[snafu(source)] + error: rskafka::client::error::Error, + }, + + #[snafu(display( + "Failed to get a Kafka topic client, topic: {}, source: {}", + topic, + error + ))] + GetClient { + topic: KafkaWalTopic, + location: Location, + error: String, + }, + + #[snafu(display("Failed to encode a record meta"))] + EncodeMeta { + location: Location, + #[snafu(source)] + error: serde_json::Error, + }, + + #[snafu(display("Failed to decode a record meta"))] + DecodeMeta { + location: Location, + #[snafu(source)] + error: serde_json::Error, + }, + + #[snafu(display("Missing required key in a record"))] + MissingKey { location: Location }, + + #[snafu(display("Missing required 
value in a record"))] + MissingValue { location: Location }, + + #[snafu(display("Cannot build a record from empty entries"))] + EmptyEntries { location: Location }, + + #[snafu(display("Failed to produce records to Kafka, topic: {}", topic))] + ProduceRecord { + topic: KafkaWalTopic, + location: Location, + #[snafu(source)] + error: rskafka::client::producer::Error, + }, + + #[snafu(display( + "Failed to read a record from Kafka, topic: {}, region_id: {}, offset: {}", + topic, + region_id, + offset, + ))] + ConsumeRecord { + topic: String, + region_id: u64, + offset: i64, + location: Location, + #[snafu(source)] + error: rskafka::client::error::Error, + }, + + #[snafu(display("Failed to do a cast"))] + Cast { location: Location }, } impl ErrorExt for Error { diff --git a/src/log-store/src/kafka.rs b/src/log-store/src/kafka.rs index ea661c380e0c..5fd4fe326eed 100644 --- a/src/log-store/src/kafka.rs +++ b/src/log-store/src/kafka.rs @@ -12,35 +12,25 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod client_manager; pub mod log_store; +mod offset; +mod record_utils; use common_meta::wal::KafkaWalTopic as Topic; +use serde::{Deserialize, Serialize}; use store_api::logstore::entry::{Entry, Id as EntryId}; use store_api::logstore::namespace::Namespace; use crate::error::Error; /// Kafka Namespace implementation. -#[derive(Debug, PartialEq, Eq, Hash, Clone)] +#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)] pub struct NamespaceImpl { region_id: u64, topic: Topic, } -impl NamespaceImpl { - fn new(region_id: u64, topic: Topic) -> Self { - Self { region_id, topic } - } - - fn region_id(&self) -> u64 { - self.region_id - } - - fn topic(&self) -> &Topic { - &self.topic - } -} - impl Namespace for NamespaceImpl { fn id(&self) -> u64 { self.region_id @@ -48,6 +38,7 @@ impl Namespace for NamespaceImpl { } /// Kafka Entry implementation. +#[derive(Debug, PartialEq, Clone)] pub struct EntryImpl { /// Entry payload. data: Vec, @@ -57,16 +48,6 @@ pub struct EntryImpl { ns: NamespaceImpl, } -impl EntryImpl { - fn new(data: Vec, entry_id: EntryId, ns: NamespaceImpl) -> Self { - Self { - data, - id: entry_id, - ns, - } - } -} - impl Entry for EntryImpl { type Error = Error; type Namespace = NamespaceImpl; diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs new file mode 100644 index 000000000000..9aa27bf1b3fd --- /dev/null +++ b/src/log-store/src/kafka/client_manager.rs @@ -0,0 +1,126 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::sync::Arc; + +use common_config::wal::{KafkaConfig, KafkaWalTopic as Topic}; +use dashmap::mapref::entry::Entry as DashMapEntry; +use dashmap::DashMap; +use rskafka::client::partition::{PartitionClient, UnknownTopicHandling}; +use rskafka::client::producer::aggregator::RecordAggregator; +use rskafka::client::producer::{BatchProducer, BatchProducerBuilder}; +use rskafka::client::{Client as RsKafkaClient, ClientBuilder}; +use rskafka::BackoffConfig; +use snafu::ResultExt; + +use crate::error::{BuildClientSnafu, BuildPartitionClientSnafu, Result}; + +// Each topic only has one partition for now. +// The `DEFAULT_PARTITION` refers to the index of the partition. +const DEFAULT_PARTITION: i32 = 0; + +/// Arc wrapper of ClientManager. +pub(crate) type ClientManagerRef = Arc; + +/// A client through which to contact Kafka cluster. Each client associates with one partition of a topic. +/// Since a topic only has one partition in our design, the mapping between clients and topics are one-one. +#[derive(Debug, Clone)] +pub(crate) struct Client { + /// A raw client used to construct a batch producer and/or a stream consumer for a specific topic. + pub(crate) raw_client: Arc, + /// A producer used to buffer log entries for a specific topic before sending them in a batching manner. + pub(crate) producer: Arc>, +} + +impl Client { + /// Creates a Client from the raw client. + pub(crate) fn new(raw_client: Arc, config: &KafkaConfig) -> Self { + let record_aggregator = RecordAggregator::new(config.max_batch_size.as_bytes() as usize); + let batch_producer = BatchProducerBuilder::new(raw_client.clone()) + .with_compression(config.compression) + .with_linger(config.linger) + .build(record_aggregator); + + Self { + raw_client, + producer: Arc::new(batch_producer), + } + } +} + +/// Manages client construction and accesses. +#[derive(Debug)] +pub(crate) struct ClientManager { + config: KafkaConfig, + /// Top-level client in kafka. All clients are constructed by this client. + client_factory: RsKafkaClient, + /// A pool maintaining a collection of clients. + /// Key: a topic. Value: the associated client of the topic. + client_pool: DashMap, +} + +impl ClientManager { + /// Tries to create a ClientManager. + pub(crate) async fn try_new(config: &KafkaConfig) -> Result { + // Sets backoff config for the top-level kafka client and all clients constructed by it. + let backoff_config = BackoffConfig { + init_backoff: config.backoff_init, + max_backoff: config.backoff_max, + base: config.backoff_base as f64, + deadline: config.backoff_deadline, + }; + let client = ClientBuilder::new(config.broker_endpoints.clone()) + .backoff_config(backoff_config) + .build() + .await + .with_context(|_| BuildClientSnafu { + broker_endpoints: config.broker_endpoints.clone(), + })?; + + Ok(Self { + config: config.clone(), + client_factory: client, + client_pool: DashMap::new(), + }) + } + + /// Gets the client associated with the topic. If the client does not exist, a new one will + /// be created and returned. + pub(crate) async fn get_or_insert(&self, topic: &Topic) -> Result { + match self.client_pool.entry(topic.to_string()) { + DashMapEntry::Occupied(entry) => Ok(entry.get().clone()), + DashMapEntry::Vacant(entry) => { + let topic_client = self.try_create_client(topic).await?; + Ok(entry.insert(topic_client).clone()) + } + } + } + + async fn try_create_client(&self, topic: &Topic) -> Result { + // Sets to Retry to retry connecting if the kafka cluter replies with an UnknownTopic error. 
+ // That's because the topic is believed to exist as the metasrv is expected to create required topics upon start. + // The reconnecting won't stop until succeed or a different error returns. + let raw_client = self + .client_factory + .partition_client(topic, DEFAULT_PARTITION, UnknownTopicHandling::Retry) + .await + .context(BuildPartitionClientSnafu { + topic, + partition: DEFAULT_PARTITION, + }) + .map(Arc::new)?; + + Ok(Client::new(raw_client, &self.config)) + } +} diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs index 5aa2d9ec7d52..4ff054712ff3 100644 --- a/src/log-store/src/kafka/log_store.rs +++ b/src/log-store/src/kafka/log_store.rs @@ -13,22 +13,37 @@ // limitations under the License. use std::collections::HashMap; +use std::sync::Arc; use common_config::wal::{KafkaConfig, WalOptions}; +use futures_util::StreamExt; +use rskafka::client::consumer::{StartOffset, StreamConsumerBuilder}; use store_api::logstore::entry::Id as EntryId; use store_api::logstore::entry_stream::SendableEntryStream; use store_api::logstore::namespace::Id as NamespaceId; use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore}; use crate::error::{Error, Result}; +use crate::kafka::client_manager::{ClientManager, ClientManagerRef}; +use crate::kafka::offset::Offset; +use crate::kafka::record_utils::{handle_consume_result, RecordProducer}; use crate::kafka::{EntryImpl, NamespaceImpl}; +/// A log store backed by Kafka. #[derive(Debug)] -pub struct KafkaLogStore; +pub struct KafkaLogStore { + config: KafkaConfig, + /// Manages kafka clients through which the log store contact the Kafka cluster. + client_manager: ClientManagerRef, +} impl KafkaLogStore { - pub async fn try_new(config: KafkaConfig) -> Result { - todo!() + /// Tries to create a Kafka log store. + pub async fn try_new(config: &KafkaConfig) -> Result { + Ok(Self { + client_manager: Arc::new(ClientManager::try_new(config).await?), + config: config.clone(), + }) } } @@ -38,68 +53,135 @@ impl LogStore for KafkaLogStore { type Entry = EntryImpl; type Namespace = NamespaceImpl; - /// Create an entry of the associate Entry type. + /// Creates an entry of the associated Entry type. fn entry>( &self, data: D, entry_id: EntryId, ns: Self::Namespace, ) -> Self::Entry { - EntryImpl::new(data.as_ref().to_vec(), entry_id, ns) + EntryImpl { + data: data.as_ref().to_vec(), + id: entry_id, + ns, + } } - /// Append an `Entry` to WAL with given namespace and return append response containing - /// the entry id. + /// Appends an entry to the log store and returns a response containing the entry id of the appended entry. async fn append(&self, entry: Self::Entry) -> Result { - todo!() + let entry_id = RecordProducer::new(entry.ns.clone()) + .with_entries(vec![entry]) + .produce(&self.client_manager) + .await + .map(TryInto::try_into)??; + Ok(AppendResponse { + last_entry_id: entry_id, + }) } - /// For a batch of log entries belonging to multiple regions, each assigned to a specific topic, - /// we need to determine the minimum log offset returned for each region in this batch. - /// During replay, we use this offset to fetch log entries for a region from its assigned topic. - /// After fetching, we filter the entries to obtain log entries relevant to that specific region. + /// Appends a batch of entries and returns a response containing a map where the key is a region id + /// while the value is the id of the last successfully written entry of the region. 
async fn append_batch(&self, entries: Vec) -> Result { - todo!() + if entries.is_empty() { + return Ok(AppendBatchResponse::default()); + } + + // Groups entries by region id and pushes them to an associated record producer. + let mut producers = HashMap::with_capacity(entries.len()); + for entry in entries { + producers + .entry(entry.ns.region_id) + .or_insert(RecordProducer::new(entry.ns.clone())) + .push(entry); + } + + // Builds a record from entries belong to a region and produces them to kafka server. + let region_ids = producers.keys().cloned().collect::>(); + let tasks = producers + .into_values() + .map(|producer| producer.produce(&self.client_manager)) + .collect::>(); + // Each produce operation returns a kafka offset of the produced record. + // The offsets are then converted to entry ids. + let entry_ids = futures::future::try_join_all(tasks) + .await? + .into_iter() + .map(TryInto::try_into) + .collect::>>()?; + Ok(AppendBatchResponse { + last_entry_ids: region_ids.into_iter().zip(entry_ids).collect(), + }) } - /// Create a new `EntryStream` to asynchronously generates `Entry` with ids - /// starting from `id`. The generated entries will be filtered by the namespace. + /// Creates a new `EntryStream` to asynchronously generates `Entry` with entry ids + /// starting from `entry_id`. The generated entries will be filtered by the namespace. async fn read( &self, ns: &Self::Namespace, entry_id: EntryId, ) -> Result> { - todo!() + let topic = ns.topic.clone(); + let region_id = ns.region_id; + + // Gets the client associated with the topic. + let client = self + .client_manager + .get_or_insert(&topic) + .await? + .raw_client + .clone(); + + // Reads the entries starting from exactly the specified offset. + let offset = Offset::try_from(entry_id)?.0; + let mut stream_consumer = StreamConsumerBuilder::new(client, StartOffset::At(offset)) + .with_max_batch_size(self.config.max_batch_size.as_bytes() as i32) + .with_max_wait_ms(self.config.max_wait_time.as_millis() as i32) + .build(); + let stream = async_stream::stream!({ + while let Some(consume_result) = stream_consumer.next().await { + yield handle_consume_result(consume_result, &topic, region_id, offset); + } + }); + Ok(Box::pin(stream)) } - /// Create a namespace of the associate Namespace type + /// Creates a namespace of the associated Namespace type. fn namespace(&self, ns_id: NamespaceId, wal_options: &WalOptions) -> Self::Namespace { - todo!() + // Safety: upon start, the datanode checks the consistency of the wal providers in the wal config of the + // datanode and that of the metasrv. Therefore, the wal options passed into the kafka log store + // must be of type WalOptions::Kafka. + let WalOptions::Kafka(kafka_options) = wal_options else { + unreachable!() + }; + NamespaceImpl { + region_id: ns_id, + topic: kafka_options.topic.clone(), + } } - /// Create a new `Namespace`. + /// Creates a new `Namespace` from the given ref. async fn create_namespace(&self, _ns: &Self::Namespace) -> Result<()> { Ok(()) } - /// Delete an existing `Namespace` with given ref. + /// Deletes an existing `Namespace` specified by the given ref. async fn delete_namespace(&self, _ns: &Self::Namespace) -> Result<()> { Ok(()) } - /// List all existing namespaces. + /// Lists all existing namespaces. async fn list_namespaces(&self) -> Result> { Ok(vec![]) } - /// Mark all entry ids `<=id` of given `namespace` as obsolete so that logstore can safely delete - /// the log files if all entries inside are obsolete. 
This method may not delete log - /// files immediately. + /// Marks all entries with ids `<=entry_id` of the given `namespace` as obsolete, + /// so that the log store can safely delete those entries. This method does not guarantee + /// that the obsolete entries are deleted immediately. async fn obsolete(&self, _ns: Self::Namespace, _entry_id: EntryId) -> Result<()> { Ok(()) } - /// Stop components of logstore. + /// Stops components of the logstore. async fn stop(&self) -> Result<()> { Ok(()) } diff --git a/src/log-store/src/kafka/offset.rs b/src/log-store/src/kafka/offset.rs new file mode 100644 index 000000000000..8c1c66b9f9f5 --- /dev/null +++ b/src/log-store/src/kafka/offset.rs @@ -0,0 +1,37 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::error::{CastSnafu, Result}; +use crate::kafka::EntryId; + +/// A wrapper of kafka offset. +pub(crate) struct Offset(pub i64); + +impl TryFrom for EntryId { + type Error = crate::error::Error; + + fn try_from(offset: Offset) -> Result { + EntryId::try_from(offset.0).map_err(|_| CastSnafu.build()) + } +} + +impl TryFrom for Offset { + type Error = crate::error::Error; + + fn try_from(entry_id: EntryId) -> Result { + i64::try_from(entry_id) + .map(Offset) + .map_err(|_| CastSnafu.build()) + } +} diff --git a/src/log-store/src/kafka/record_utils.rs b/src/log-store/src/kafka/record_utils.rs new file mode 100644 index 000000000000..37a66acfbdb3 --- /dev/null +++ b/src/log-store/src/kafka/record_utils.rs @@ -0,0 +1,219 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_config::wal::KafkaWalTopic as Topic; +use rskafka::record::{Record, RecordAndOffset}; +use serde::{Deserialize, Serialize}; +use snafu::{ensure, OptionExt, ResultExt}; + +use crate::error::{ + ConsumeRecordSnafu, DecodeMetaSnafu, EmptyEntriesSnafu, EncodeMetaSnafu, GetClientSnafu, + MissingKeySnafu, MissingValueSnafu, ProduceRecordSnafu, Result, +}; +use crate::kafka::client_manager::ClientManagerRef; +use crate::kafka::offset::Offset; +use crate::kafka::{EntryId, EntryImpl, NamespaceImpl}; + +type ConsumeResult = std::result::Result<(RecordAndOffset, i64), rskafka::client::error::Error>; + +/// Record metadata which will be serialized/deserialized to/from the `key` of a Record. +#[derive(Debug, Serialize, Deserialize, PartialEq)] +struct RecordMeta { + /// Meta version. Used for backward compatibility. + version: u32, + /// The namespace of the entries wrapped in the record. 
+ ns: NamespaceImpl, + /// Ids of the entries built into the record. + entry_ids: Vec, + /// entry_offsets[i] is the end offset (exclusive) of the data of the i-th entry in the record value. + entry_offsets: Vec, +} + +impl RecordMeta { + fn new(ns: NamespaceImpl, entries: &[EntryImpl]) -> Self { + Self { + version: 0, + ns, + entry_ids: entries.iter().map(|entry| entry.id).collect(), + entry_offsets: entries + .iter() + .map(|entry| entry.data.len()) + .scan(0, |presum, x| { + *presum += x; + Some(*presum) + }) + .collect(), + } + } +} + +/// Produces a record to a kafka topic. +pub(crate) struct RecordProducer { + /// The namespace of the entries. + ns: NamespaceImpl, + /// Entries are buffered before being built into a record. + entries: Vec, +} + +impl RecordProducer { + /// Creates a new producer for producing entries with the given namespace. + pub(crate) fn new(ns: NamespaceImpl) -> Self { + Self { + ns, + entries: Vec::new(), + } + } + + /// Populates the entry buffer with the given entries. + pub(crate) fn with_entries(self, entries: Vec) -> Self { + Self { entries, ..self } + } + + /// Pushes an entry into the entry buffer. + pub(crate) fn push(&mut self, entry: EntryImpl) { + self.entries.push(entry); + } + + /// Produces the buffered entries to kafka sever as a kafka record. + /// Returns the kafka offset of the produced record. + // TODO(niebayes): since the total size of a region's entries may be way-too large, + // the producer may need to support splitting entries into multiple records. + pub(crate) async fn produce(self, client_manager: &ClientManagerRef) -> Result { + ensure!(!self.entries.is_empty(), EmptyEntriesSnafu); + + // Produces the record through a client. The client determines when to send the record to kafka server. + let client = client_manager + .get_or_insert(&self.ns.topic) + .await + .map_err(|e| { + GetClientSnafu { + topic: &self.ns.topic, + error: e.to_string(), + } + .build() + })?; + client + .producer + .produce(encode_to_record(self.ns.clone(), self.entries)?) + .await + .map(Offset) + .context(ProduceRecordSnafu { + topic: &self.ns.topic, + }) + } +} + +fn encode_to_record(ns: NamespaceImpl, entries: Vec) -> Result { + let meta = RecordMeta::new(ns, &entries); + let data = entries.into_iter().flat_map(|entry| entry.data).collect(); + Ok(Record { + key: Some(serde_json::to_vec(&meta).context(EncodeMetaSnafu)?), + value: Some(data), + timestamp: rskafka::chrono::Utc::now(), + headers: Default::default(), + }) +} + +fn decode_from_record(record: Record) -> Result> { + let key = record.key.context(MissingKeySnafu)?; + let value = record.value.context(MissingValueSnafu)?; + let meta: RecordMeta = serde_json::from_slice(&key).context(DecodeMetaSnafu)?; + + let mut entries = Vec::with_capacity(meta.entry_ids.len()); + let mut start_offset = 0; + for (i, end_offset) in meta.entry_offsets.iter().enumerate() { + entries.push(EntryImpl { + // TODO(niebayes): try to avoid the clone. + data: value[start_offset..*end_offset].to_vec(), + id: meta.entry_ids[i], + ns: meta.ns.clone(), + }); + start_offset = *end_offset; + } + Ok(entries) +} + +/// Handles the result of a consume operation on a kafka topic. +pub(crate) fn handle_consume_result( + result: ConsumeResult, + topic: &Topic, + region_id: u64, + offset: i64, +) -> Result> { + match result { + Ok((record_and_offset, _)) => { + // Only produces entries belong to the region with the given region id. + // Since a record only contains entries from a single region, it suffices to check the first entry only. 
+ let entries = decode_from_record(record_and_offset.record)?; + if let Some(entry) = entries.first() + && entry.id == region_id + { + Ok(entries) + } else { + Ok(vec![]) + } + } + Err(e) => Err(e).context(ConsumeRecordSnafu { + topic, + region_id, + offset, + }), + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn new_test_entry>(data: D, entry_id: EntryId, ns: NamespaceImpl) -> EntryImpl { + EntryImpl { + data: data.as_ref().to_vec(), + id: entry_id, + ns, + } + } + + #[test] + fn test_serde_record_meta() { + let ns = NamespaceImpl { + region_id: 1, + topic: "test_topic".to_string(), + }; + let entries = vec![ + new_test_entry(b"111", 1, ns.clone()), + new_test_entry(b"2222", 2, ns.clone()), + new_test_entry(b"33333", 3, ns.clone()), + ]; + let meta = RecordMeta::new(ns, &entries); + let encoded = serde_json::to_vec(&meta).unwrap(); + let decoded: RecordMeta = serde_json::from_slice(&encoded).unwrap(); + assert_eq!(meta, decoded); + } + + #[test] + fn test_encdec_record() { + let ns = NamespaceImpl { + region_id: 1, + topic: "test_topic".to_string(), + }; + let entries = vec![ + new_test_entry(b"111", 1, ns.clone()), + new_test_entry(b"2222", 2, ns.clone()), + new_test_entry(b"33333", 3, ns.clone()), + ]; + let record = encode_to_record(ns, entries.clone()).unwrap(); + let decoded_entries = decode_from_record(record).unwrap(); + assert_eq!(entries, decoded_entries); + } +} diff --git a/src/log-store/src/lib.rs b/src/log-store/src/lib.rs index 901a202f2a48..7c57d6d2e766 100644 --- a/src/log-store/src/lib.rs +++ b/src/log-store/src/lib.rs @@ -15,7 +15,6 @@ #![feature(let_chains)] pub mod error; -#[allow(unused)] pub mod kafka; mod noop; pub mod raft_engine; diff --git a/src/log-store/src/noop.rs b/src/log-store/src/noop.rs index 694641156fa4..48439333acba 100644 --- a/src/log-store/src/noop.rs +++ b/src/log-store/src/noop.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; - use common_config::wal::WalOptions; use store_api::logstore::entry::{Entry, Id as EntryId}; use store_api::logstore::namespace::{Id as NamespaceId, Namespace}; @@ -65,15 +63,11 @@ impl LogStore for NoopLogStore { } async fn append(&self, mut _e: Self::Entry) -> Result { - Ok(AppendResponse { - last_entry_id: Default::default(), - }) + Ok(AppendResponse::default()) } async fn append_batch(&self, _e: Vec) -> Result { - Ok(AppendBatchResponse { - last_entry_ids: HashMap::new(), - }) + Ok(AppendBatchResponse::default()) } async fn read( diff --git a/src/store-api/src/logstore.rs b/src/store-api/src/logstore.rs index fd08f2d6522b..98d7c00ab366 100644 --- a/src/store-api/src/logstore.rs +++ b/src/store-api/src/logstore.rs @@ -49,6 +49,7 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug { /// Creates a new `EntryStream` to asynchronously generates `Entry` with ids /// starting from `id`. + // TODO(niebayes): update docs for entry id. async fn read( &self, ns: &Self::Namespace, @@ -79,7 +80,7 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug { } /// The response of an `append` operation. -#[derive(Debug)] +#[derive(Debug, Default)] pub struct AppendResponse { /// The id of the entry appended to the log store. pub last_entry_id: EntryId,
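
Note on the record layout used by `record_utils.rs` above: the entries of one region are concatenated into a single Kafka record value, and `RecordMeta::entry_offsets` stores the cumulative, exclusive end offset of each entry so the value can be split back apart during decoding. The following is a minimal, self-contained sketch of that pack/unpack idea only; the function names are hypothetical, and it deliberately omits the serde-encoded metadata key, namespaces, and error handling that the real `encode_to_record`/`decode_from_record` use.

// Hypothetical sketch, not part of the patch.

/// Concatenates entry payloads into one value and records each entry's exclusive end offset,
/// mirroring the `scan` prefix-sum in `RecordMeta::new`.
fn pack_entries(entries: &[Vec<u8>]) -> (Vec<u8>, Vec<usize>) {
    let mut value = Vec::new();
    let mut end_offsets = Vec::with_capacity(entries.len());
    for data in entries {
        value.extend_from_slice(data);
        end_offsets.push(value.len());
    }
    (value, end_offsets)
}

/// Splits a packed value back into per-entry payloads using the recorded end offsets,
/// as `decode_from_record` does with `meta.entry_offsets`.
fn unpack_entries(value: &[u8], end_offsets: &[usize]) -> Vec<Vec<u8>> {
    let mut entries = Vec::with_capacity(end_offsets.len());
    let mut start = 0;
    for &end in end_offsets {
        entries.push(value[start..end].to_vec());
        start = end;
    }
    entries
}

fn main() {
    // Same payloads as in `test_encdec_record`: "111", "2222", "33333".
    let entries = vec![b"111".to_vec(), b"2222".to_vec(), b"33333".to_vec()];
    let (value, end_offsets) = pack_entries(&entries);
    assert_eq!(end_offsets, vec![3, 7, 12]);
    assert_eq!(unpack_entries(&value, &end_offsets), entries);
}

Storing cumulative end offsets rather than per-entry lengths lets the decoder slice the value in a single pass, which matches how `RecordMeta::new` builds `entry_offsets` with a running prefix sum.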