From c797241be9f0d2c5cc77e4b1faaf34a94d226595 Mon Sep 17 00:00:00 2001 From: niebayes Date: Tue, 26 Dec 2023 17:57:46 +0800 Subject: [PATCH] feat(remote_wal): integrate remote wal to standalone (#3001) * feat: integrate remote wal to standalone * fix: test * chore: ready to debug kafka remote wal * fix: test * chore: add some logs for remote wal * chore: add logs for topic manager * fix: properly terminate stream consumer * fix: properly handle TopicAlreadyExists error * fix: parse config file error * fix: properly handle last entry id * chore: prepare for merge * fix: test * fix: typo * fix: set replication_factor properly * fix: CR --- config/datanode.example.toml | 2 +- config/metasrv.example.toml | 4 +- config/standalone.example.toml | 4 +- src/common/config/src/wal/kafka.rs | 7 +- src/common/meta/src/wal.rs | 4 +- src/common/meta/src/wal/kafka.rs | 7 +- .../meta/src/wal/kafka/topic_manager.rs | 63 ++++++++++----- src/log-store/src/error.rs | 21 ++--- src/log-store/src/kafka.rs | 20 +++++ src/log-store/src/kafka/log_store.rs | 76 +++++++++++++++++-- src/log-store/src/kafka/record_utils.rs | 39 +--------- src/mito2/src/region/opener.rs | 4 + src/store-api/src/logstore.rs | 1 - 13 files changed, 172 insertions(+), 80 deletions(-) diff --git a/config/datanode.example.toml b/config/datanode.example.toml index b17bf313d38a..bd3f8fc2eec9 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -56,7 +56,7 @@ sync_write = false # produce_record_timeout = "100ms" # backoff_init = "500ms" # backoff_max = "10s" -# backoff_base = 2.0 +# backoff_base = 2 # backoff_deadline = "5mins" # Storage options, see `standalone.example.toml`. diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index 313ed4b10ac2..ff05a9c095e8 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -65,7 +65,7 @@ provider = "raft_engine" # Number of partitions per topic. # num_partitions = 1 # Expected number of replicas of each partition. -# replication_factor = 3 +# replication_factor = 1 # Above which a topic creation operation will be cancelled. # create_topic_timeout = "30s" # The initial backoff for kafka clients. @@ -73,7 +73,7 @@ provider = "raft_engine" # The maximum backoff for kafka clients. # backoff_max = "10s" # Exponential backoff rate, i.e. next backoff = base * current backoff. -# backoff_base = 2.0 +# backoff_base = 2 # Stop reconnecting if the total wait time reaches the deadline. If this config is missing, the reconnecting won't terminate. # backoff_deadline = "5mins" diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 77f4b4c3cdb5..7db8477ec78e 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -103,7 +103,7 @@ provider = "raft_engine" # Number of partitions per topic. # num_partitions = 1 # Expected number of replicas of each partition. -# replication_factor = 3 +# replication_factor = 1 # The maximum log size a kafka batch producer could buffer. # max_batch_size = "4MB" @@ -119,7 +119,7 @@ provider = "raft_engine" # The maximum backoff for kafka clients. # backoff_max = "10s" # Exponential backoff rate, i.e. next backoff = base * current backoff. -# backoff_base = 2.0 +# backoff_base = 2 # Stop reconnecting if the total wait time reaches the deadline. If this config is missing, the reconnecting won't terminate. 
# backoff_deadline = "5mins" diff --git a/src/common/config/src/wal/kafka.rs b/src/common/config/src/wal/kafka.rs index d1d1a615a370..e93aa6cb2271 100644 --- a/src/common/config/src/wal/kafka.rs +++ b/src/common/config/src/wal/kafka.rs @@ -121,13 +121,16 @@ pub struct StandaloneKafkaConfig { impl Default for StandaloneKafkaConfig { fn default() -> Self { + let base = KafkaConfig::default(); + let replication_factor = base.broker_endpoints.len() as i16; + Self { - base: KafkaConfig::default(), + base, num_topics: 64, selector_type: TopicSelectorType::RoundRobin, topic_name_prefix: "greptimedb_wal_topic".to_string(), num_partitions: 1, - replication_factor: 3, + replication_factor, create_topic_timeout: Duration::from_secs(30), } } diff --git a/src/common/meta/src/wal.rs b/src/common/meta/src/wal.rs index 1e394e847985..853c6fa5df63 100644 --- a/src/common/meta/src/wal.rs +++ b/src/common/meta/src/wal.rs @@ -89,7 +89,7 @@ mod tests { selector_type = "round_robin" topic_name_prefix = "greptimedb_wal_topic" num_partitions = 1 - replication_factor = 3 + replication_factor = 1 create_topic_timeout = "30s" backoff_init = "500ms" backoff_max = "10s" @@ -103,7 +103,7 @@ mod tests { selector_type: TopicSelectorType::RoundRobin, topic_name_prefix: "greptimedb_wal_topic".to_string(), num_partitions: 1, - replication_factor: 3, + replication_factor: 1, create_topic_timeout: Duration::from_secs(30), backoff: KafkaBackoffConfig { init: Duration::from_millis(500), diff --git a/src/common/meta/src/wal/kafka.rs b/src/common/meta/src/wal/kafka.rs index 8c9cdea92594..0a61b6015dfc 100644 --- a/src/common/meta/src/wal/kafka.rs +++ b/src/common/meta/src/wal/kafka.rs @@ -50,13 +50,16 @@ pub struct KafkaConfig { impl Default for KafkaConfig { fn default() -> Self { + let broker_endpoints = vec!["127.0.0.1:9092".to_string()]; + let replication_factor = broker_endpoints.len() as i16; + Self { - broker_endpoints: vec!["127.0.0.1:9092".to_string()], + broker_endpoints, num_topics: 64, selector_type: TopicSelectorType::RoundRobin, topic_name_prefix: "greptimedb_wal_topic".to_string(), num_partitions: 1, - replication_factor: 3, + replication_factor, create_topic_timeout: Duration::from_secs(30), backoff: KafkaBackoffConfig::default(), } diff --git a/src/common/meta/src/wal/kafka/topic_manager.rs b/src/common/meta/src/wal/kafka/topic_manager.rs index 1e7964f85601..860192b97071 100644 --- a/src/common/meta/src/wal/kafka/topic_manager.rs +++ b/src/common/meta/src/wal/kafka/topic_manager.rs @@ -17,10 +17,13 @@ use std::sync::Arc; use std::time::Duration; use common_config::wal::kafka::TopicSelectorType; -use common_telemetry::debug; +use common_telemetry::{debug, error, info}; +use rskafka::client::controller::ControllerClient; +use rskafka::client::error::Error as RsKafkaError; +use rskafka::client::error::ProtocolError::TopicAlreadyExists; use rskafka::client::ClientBuilder; use rskafka::BackoffConfig; -use snafu::{ensure, ResultExt}; +use snafu::{ensure, AsErrorSource, ResultExt}; use crate::error::{ BuildKafkaClientSnafu, BuildKafkaCtrlClientSnafu, CreateKafkaWalTopicSnafu, DecodeJsonSnafu, @@ -79,7 +82,6 @@ impl TopicManager { .await? .into_iter() .collect::>(); - debug!("Restored {} topics", created_topics.len()); // Creates missing topics. 
let to_be_created = topics @@ -92,10 +94,10 @@ impl TopicManager { Some(i) }) .collect::>(); + if !to_be_created.is_empty() { self.try_create_topics(topics, &to_be_created).await?; Self::persist_created_topics(topics, &self.kv_backend).await?; - debug!("Persisted {} topics", topics.len()); } Ok(()) } @@ -119,23 +121,12 @@ impl TopicManager { .controller_client() .context(BuildKafkaCtrlClientSnafu)?; - // Spawns tokio tasks for creating missing topics. + // Try to create missing topics. let tasks = to_be_created .iter() - .map(|i| { - client.create_topic( - topics[*i].clone(), - self.config.num_partitions, - self.config.replication_factor, - self.config.create_topic_timeout.as_millis() as i32, - ) - }) + .map(|i| self.try_create_topic(&topics[*i], &client)) .collect::>(); - // FIXME(niebayes): try to create an already-exist topic would raise an error. - futures::future::try_join_all(tasks) - .await - .context(CreateKafkaWalTopicSnafu) - .map(|_| ()) + futures::future::try_join_all(tasks).await.map(|_| ()) } /// Selects one topic from the topic pool through the topic selector. @@ -150,6 +141,32 @@ impl TopicManager { .collect() } + async fn try_create_topic(&self, topic: &Topic, client: &ControllerClient) -> Result<()> { + match client + .create_topic( + topic.clone(), + self.config.num_partitions, + self.config.replication_factor, + self.config.create_topic_timeout.as_millis() as i32, + ) + .await + { + Ok(_) => { + info!("Successfully created topic {}", topic); + Ok(()) + } + Err(e) => { + if Self::is_topic_already_exist_err(&e) { + info!("The topic {} already exists", topic); + Ok(()) + } else { + error!("Failed to create a topic {}, error {:?}", topic, e); + Err(e).context(CreateKafkaWalTopicSnafu) + } + } + } + } + async fn restore_created_topics(kv_backend: &KvBackendRef) -> Result> { kv_backend .get(CREATED_TOPICS_KEY.as_bytes()) @@ -171,6 +188,16 @@ impl TopicManager { .await .map(|_| ()) } + + fn is_topic_already_exist_err(e: &RsKafkaError) -> bool { + matches!( + e, + &RsKafkaError::ServerError { + protocol_error: TopicAlreadyExists, + .. 
+ } + ) + } } #[cfg(test)] diff --git a/src/log-store/src/error.rs b/src/log-store/src/error.rs index 1ee344046adc..7f475e2076a8 100644 --- a/src/log-store/src/error.rs +++ b/src/log-store/src/error.rs @@ -20,6 +20,8 @@ use common_macro::stack_trace_debug; use common_runtime::error::Error as RuntimeError; use snafu::{Location, Snafu}; +use crate::kafka::NamespaceImpl as KafkaNamespace; + #[derive(Snafu)] #[snafu(visibility(pub))] #[stack_trace_debug] @@ -152,16 +154,17 @@ pub enum Error { error: rskafka::client::producer::Error, }, - #[snafu(display( - "Failed to read a record from Kafka, topic: {}, region_id: {}, offset: {}", - topic, - region_id, - offset, - ))] + #[snafu(display("Failed to read a record from Kafka, ns: {}", ns))] ConsumeRecord { - topic: String, - region_id: u64, - offset: i64, + ns: KafkaNamespace, + location: Location, + #[snafu(source)] + error: rskafka::client::error::Error, + }, + + #[snafu(display("Failed to get the latest offset, ns: {}", ns))] + GetOffset { + ns: KafkaNamespace, location: Location, #[snafu(source)] error: rskafka::client::error::Error, diff --git a/src/log-store/src/kafka.rs b/src/log-store/src/kafka.rs index 5fd4fe326eed..fefa7823c5c2 100644 --- a/src/log-store/src/kafka.rs +++ b/src/log-store/src/kafka.rs @@ -17,6 +17,8 @@ pub mod log_store; mod offset; mod record_utils; +use std::fmt::Display; + use common_meta::wal::KafkaWalTopic as Topic; use serde::{Deserialize, Serialize}; use store_api::logstore::entry::{Entry, Id as EntryId}; @@ -37,6 +39,12 @@ impl Namespace for NamespaceImpl { } } +impl Display for NamespaceImpl { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}/{}", self.topic, self.region_id) + } +} + /// Kafka Entry implementation. #[derive(Debug, PartialEq, Clone)] pub struct EntryImpl { @@ -64,3 +72,15 @@ impl Entry for EntryImpl { self.ns.clone() } } + +impl Display for EntryImpl { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Entry (ns: {}, id: {}, data_len: {})", + self.ns, + self.id, + self.data.len() + ) + } +} diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs index b9f06e06c723..03b2dbbaf0ae 100644 --- a/src/log-store/src/kafka/log_store.rs +++ b/src/log-store/src/kafka/log_store.rs @@ -16,17 +16,20 @@ use std::collections::HashMap; use std::sync::Arc; use common_config::wal::{KafkaConfig, WalOptions}; +use common_telemetry::{debug, warn}; use futures_util::StreamExt; use rskafka::client::consumer::{StartOffset, StreamConsumerBuilder}; +use rskafka::client::partition::OffsetAt; +use snafu::ResultExt; use store_api::logstore::entry::Id as EntryId; use store_api::logstore::entry_stream::SendableEntryStream; use store_api::logstore::namespace::Id as NamespaceId; use store_api::logstore::{AppendBatchResponse, AppendResponse, LogStore}; -use crate::error::{Error, Result}; +use crate::error::{ConsumeRecordSnafu, Error, GetOffsetSnafu, Result}; use crate::kafka::client_manager::{ClientManager, ClientManagerRef}; use crate::kafka::offset::Offset; -use crate::kafka::record_utils::{handle_consume_result, RecordProducer}; +use crate::kafka::record_utils::{decode_from_record, RecordProducer}; use crate::kafka::{EntryImpl, NamespaceImpl}; /// A log store backed by Kafka. @@ -82,6 +85,8 @@ impl LogStore for KafkaLogStore { /// Appends a batch of entries and returns a response containing a map where the key is a region id /// while the value is the id of the last successfully written entry of the region. 
async fn append_batch(&self, entries: Vec) -> Result { + debug!("LogStore handles append_batch with entries {:?}", entries); + if entries.is_empty() { return Ok(AppendBatchResponse::default()); } @@ -97,6 +102,7 @@ impl LogStore for KafkaLogStore { // Builds a record from entries belonging to a region and produces them to the kafka server. let region_ids = producers.keys().cloned().collect::>(); + let tasks = producers .into_values() .map(|producer| producer.produce(&self.client_manager)) @@ -108,6 +114,8 @@ impl LogStore for KafkaLogStore { .into_iter() .map(TryInto::try_into) .collect::>>()?; + debug!("The entries are appended at offsets {:?}", entry_ids); + Ok(AppendBatchResponse { last_entry_ids: region_ids.into_iter().zip(entry_ids).collect(), }) @@ -131,15 +139,71 @@ impl LogStore for KafkaLogStore { .raw_client .clone(); - // Reads the entries starting from exactly the specified offset. - let offset = Offset::try_from(entry_id)?.0; - let mut stream_consumer = StreamConsumerBuilder::new(client, StartOffset::At(offset)) + // Gets the offset of the latest record in the topic, i.e. the latest record of the topic's single partition. + // The read operation terminates when this record is consumed. + // Warning: `get_offset` returns the end offset, i.e. one past the latest record, so it is decremented here. + let end_offset = client + .get_offset(OffsetAt::Latest) + .await + .context(GetOffsetSnafu { ns: ns.clone() })? + - 1; + // Reads entries with offsets in the inclusive range [start_offset, end_offset]. + let start_offset = Offset::try_from(entry_id)?.0; + + // Aborts if there are no new entries. + // FIXME(niebayes): how come this case happens? + if start_offset > end_offset { + warn!( + "No new entries for ns {} in range [{}, {}]", + ns, start_offset, end_offset + ); + return Ok(futures_util::stream::empty().boxed()); + } + + let mut stream_consumer = StreamConsumerBuilder::new(client, StartOffset::At(start_offset)) .with_max_batch_size(self.config.max_batch_size.as_bytes() as i32) .with_max_wait_ms(self.config.produce_record_timeout.as_millis() as i32) .build(); + + debug!( + "Built a stream consumer for ns {} to consume entries in range [{}, {}]", + ns, start_offset, end_offset + ); + + let ns_clone = ns.clone(); let stream = async_stream::stream!({ while let Some(consume_result) = stream_consumer.next().await { - yield handle_consume_result(consume_result, &topic, region_id, offset); + // Each call to `next` produces a `RecordAndOffset` and a high watermark offset. + // The `RecordAndOffset` contains the record data and its start offset. + // The high watermark offset is the end offset of the latest record in the partition. + let (record, high_watermark) = consume_result.context(ConsumeRecordSnafu { + ns: ns_clone.clone(), + })?; + let record_offset = record.offset; + debug!( + "Read a record at offset {} for ns {}, high watermark: {}", + record_offset, ns_clone, high_watermark + ); + + let entries = decode_from_record(record.record)?; + + // Only yields entries belonging to the given region. Since a record contains entries from a single region, it suffices to check the first entry. + if let Some(entry) = entries.first() + && entry.ns.region_id == region_id + { + yield Ok(entries); + } else { + yield Ok(vec![]); + } + + // Terminates the stream once the record at the end offset has been read.
+ if record_offset >= end_offset { + debug!( + "Stream consumer for ns {} terminates at offset {}", + ns_clone, record_offset + ); + break; + } } }); Ok(Box::pin(stream)) diff --git a/src/log-store/src/kafka/record_utils.rs b/src/log-store/src/kafka/record_utils.rs index 37a66acfbdb3..3707b873f3e3 100644 --- a/src/log-store/src/kafka/record_utils.rs +++ b/src/log-store/src/kafka/record_utils.rs @@ -12,21 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -use common_config::wal::KafkaWalTopic as Topic; -use rskafka::record::{Record, RecordAndOffset}; +use rskafka::record::Record; use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; use crate::error::{ - ConsumeRecordSnafu, DecodeMetaSnafu, EmptyEntriesSnafu, EncodeMetaSnafu, GetClientSnafu, - MissingKeySnafu, MissingValueSnafu, ProduceRecordSnafu, Result, + DecodeMetaSnafu, EmptyEntriesSnafu, EncodeMetaSnafu, GetClientSnafu, MissingKeySnafu, + MissingValueSnafu, ProduceRecordSnafu, Result, }; use crate::kafka::client_manager::ClientManagerRef; use crate::kafka::offset::Offset; use crate::kafka::{EntryId, EntryImpl, NamespaceImpl}; -type ConsumeResult = std::result::Result<(RecordAndOffset, i64), rskafka::client::error::Error>; - /// Record metadata which will be serialized/deserialized to/from the `key` of a Record. #[derive(Debug, Serialize, Deserialize, PartialEq)] struct RecordMeta { @@ -125,7 +122,7 @@ fn encode_to_record(ns: NamespaceImpl, entries: Vec) -> Result Result> { +pub(crate) fn decode_from_record(record: Record) -> Result> { let key = record.key.context(MissingKeySnafu)?; let value = record.value.context(MissingValueSnafu)?; let meta: RecordMeta = serde_json::from_slice(&key).context(DecodeMetaSnafu)?; @@ -144,34 +141,6 @@ fn decode_from_record(record: Record) -> Result> { Ok(entries) } -/// Handles the result of a consume operation on a kafka topic. -pub(crate) fn handle_consume_result( - result: ConsumeResult, - topic: &Topic, - region_id: u64, - offset: i64, -) -> Result> { - match result { - Ok((record_and_offset, _)) => { - // Only produces entries belong to the region with the given region id. - // Since a record only contains entries from a single region, it suffices to check the first entry only. - let entries = decode_from_record(record_and_offset.record)?; - if let Some(entry) = entries.first() - && entry.id == region_id - { - Ok(entries) - } else { - Ok(vec![]) - } - } - Err(e) => Err(e).context(ConsumeRecordSnafu { - topic, - region_id, - offset, - }), - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index ffb3696a97e5..7b969d578d00 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -256,6 +256,10 @@ impl RegionOpener { let flushed_entry_id = version.flushed_entry_id; let version_control = Arc::new(VersionControl::new(version)); if !self.skip_wal_replay { + info!( + "Start replaying memtable at flushed_entry_id {} for region {}", + flushed_entry_id, region_id + ); replay_memtable( wal, &wal_options, diff --git a/src/store-api/src/logstore.rs b/src/store-api/src/logstore.rs index 98d7c00ab366..16809c26b1a1 100644 --- a/src/store-api/src/logstore.rs +++ b/src/store-api/src/logstore.rs @@ -49,7 +49,6 @@ pub trait LogStore: Send + Sync + 'static + std::fmt::Debug { /// Creates a new `EntryStream` to asynchronously generates `Entry` with ids /// starting from `id`. - // TODO(niebayes): update docs for entry id. 
async fn read( &self, ns: &Self::Namespace,
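
For reference, a standalone deployment can opt into the Kafka-backed remote WAL that this change wires up by switching the `[wal]` provider in `config/standalone.example.toml`. A minimal sketch, assuming the key names mirror the commented-out examples above and the `StandaloneKafkaConfig` fields (the broker address is illustrative only):

[wal]
provider = "kafka"
# Assumed key name, matching the `broker_endpoints` config field; point it at the actual brokers.
broker_endpoints = ["127.0.0.1:9092"]
num_topics = 64
num_partitions = 1
# Should not exceed the number of broker endpoints.
replication_factor = 1
create_topic_timeout = "30s"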