From ef031eebaada9979e3c957e0767111ba24ecef8d Mon Sep 17 00:00:00 2001 From: niebayes Date: Fri, 29 Dec 2023 17:11:41 +0800 Subject: [PATCH] tmp --- Cargo.lock | 7 +- src/common/test-util/Cargo.toml | 5 +- src/common/test-util/src/wal.rs | 5 + src/common/test-util/src/wal/kafka.rs | 5 +- .../common/test-util/src/wal}/kafka/config.rs | 0 .../common/test-util/src/wal}/kafka/image.rs | 9 +- src/log-store/Cargo.toml | 4 +- src/log-store/src/kafka.rs | 2 +- src/log-store/src/kafka/client_manager.rs | 103 +++- src/log-store/src/kafka/log_store.rs | 55 +- src/log-store/src/kafka/record_utils.rs | 5 + src/log-store/src/test_util.rs | 1 + .../log-store/src/test_util/kafka.rs | 15 +- .../src/test_util}/kafka/entry_builder.rs | 5 +- .../src/test_util/kafka/topic_builder.rs | 94 ++++ tests-integration/src/lib.rs | 1 - tests-integration/src/wal_util/kafka.rs | 16 - tests-integration/tests/main.rs | 2 - tests-integration/tests/wal.rs | 521 +++++++++--------- 19 files changed, 540 insertions(+), 315 deletions(-) rename {tests-integration/src/wal_util => src/common/test-util/src/wal}/kafka/config.rs (100%) rename {tests-integration/src/wal_util => src/common/test-util/src/wal}/kafka/image.rs (95%) rename tests-integration/src/wal_util.rs => src/log-store/src/test_util/kafka.rs (53%) rename src/{common/test-util/src/wal => log-store/src/test_util}/kafka/entry_builder.rs (97%) create mode 100644 src/log-store/src/test_util/kafka/topic_builder.rs delete mode 100644 tests-integration/src/wal_util/kafka.rs diff --git a/Cargo.lock b/Cargo.lock index 09fbe9275128..072d2a5559d8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1983,11 +1983,12 @@ dependencies = [ name = "common-test-util" version = "0.5.0" dependencies = [ - "log-store", "once_cell", "rand", - "store-api", + "rskafka", "tempfile", + "testcontainers", + "tokio", ] [[package]] @@ -4542,6 +4543,7 @@ dependencies = [ "async-trait", "byteorder", "bytes", + "chrono", "common-base", "common-config", "common-error", @@ -4550,7 +4552,6 @@ dependencies = [ "common-runtime", "common-telemetry", "common-test-util", - "dashmap", "futures", "futures-util", "protobuf", diff --git a/src/common/test-util/Cargo.toml b/src/common/test-util/Cargo.toml index a1f8def92d01..e3caa859d21f 100644 --- a/src/common/test-util/Cargo.toml +++ b/src/common/test-util/Cargo.toml @@ -5,8 +5,9 @@ edition.workspace = true license.workspace = true [dependencies] -log-store.workspace = true once_cell.workspace = true rand.workspace = true +rskafka.workspace = true tempfile.workspace = true -store-api.workspace = true +testcontainers = "0.15.0" +tokio.workspace = true diff --git a/src/common/test-util/src/wal.rs b/src/common/test-util/src/wal.rs index 28c04633b640..c1085a8f1e17 100644 --- a/src/common/test-util/src/wal.rs +++ b/src/common/test-util/src/wal.rs @@ -13,3 +13,8 @@ // limitations under the License. pub mod kafka; + +pub use testcontainers::clients::Cli as DockerCli; + +pub use crate::wal::kafka::config::KAFKA_ADVERTISED_LISTENER_PORT as DEFAULT_EXPOSED_PORT; +pub use crate::wal::kafka::image::Image as KafkaImage; diff --git a/src/common/test-util/src/wal/kafka.rs b/src/common/test-util/src/wal/kafka.rs index bd615f8a4af3..427cbea89a71 100644 --- a/src/common/test-util/src/wal/kafka.rs +++ b/src/common/test-util/src/wal/kafka.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-pub mod entry_builder; +pub mod config; +pub mod image; -pub use crate::wal::kafka::entry_builder::EntryBuilder; +pub const BROKER_ENDPOINTS_KEY: &str = "GT_KAFKA_ENDPOINTS"; diff --git a/tests-integration/src/wal_util/kafka/config.rs b/src/common/test-util/src/wal/kafka/config.rs similarity index 100% rename from tests-integration/src/wal_util/kafka/config.rs rename to src/common/test-util/src/wal/kafka/config.rs diff --git a/tests-integration/src/wal_util/kafka/image.rs b/src/common/test-util/src/wal/kafka/image.rs similarity index 95% rename from tests-integration/src/wal_util/kafka/image.rs rename to src/common/test-util/src/wal/kafka/image.rs index f509d5f60a00..3428549a4c4b 100644 --- a/tests-integration/src/wal_util/kafka/image.rs +++ b/src/common/test-util/src/wal/kafka/image.rs @@ -14,7 +14,7 @@ use testcontainers::core::{ContainerState, ExecCommand, WaitFor}; -use crate::wal_util::kafka::config::{ +use crate::wal::kafka::config::{ Config, KAFKA_ADVERTISED_LISTENER_PORT, KAFKA_LISTENER_PORT, ZOOKEEPER_PORT, }; @@ -103,15 +103,14 @@ impl testcontainers::Image for Image { #[cfg(test)] mod tests { - use chrono::TimeZone; - use rskafka::chrono::Utc; + use rskafka::chrono::{TimeZone, Utc}; use rskafka::client::partition::UnknownTopicHandling; use rskafka::client::ClientBuilder; use rskafka::record::Record; use testcontainers::clients::Cli as DockerCli; - use crate::wal_util::kafka::config::KAFKA_ADVERTISED_LISTENER_PORT; - use crate::wal_util::kafka::image::Image; + use crate::wal::kafka::config::KAFKA_ADVERTISED_LISTENER_PORT; + use crate::wal::kafka::image::Image; #[tokio::test] async fn test_image() { diff --git a/src/log-store/Cargo.toml b/src/log-store/Cargo.toml index 7941cd11e918..afc3ddbab934 100644 --- a/src/log-store/Cargo.toml +++ b/src/log-store/Cargo.toml @@ -14,6 +14,7 @@ async-stream.workspace = true async-trait.workspace = true byteorder = "1.4" bytes.workspace = true +chrono.workspace = true common-base.workspace = true common-config.workspace = true common-error.workspace = true @@ -21,11 +22,12 @@ common-macro.workspace = true common-meta.workspace = true common-runtime.workspace = true common-telemetry.workspace = true -dashmap.workspace = true +common-test-util.workspace = true futures-util.workspace = true futures.workspace = true protobuf = { version = "2", features = ["bytes"] } raft-engine.workspace = true +rand.workspace = true rskafka.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/src/log-store/src/kafka.rs b/src/log-store/src/kafka.rs index 01448ce91cdb..9b9493d4fe9e 100644 --- a/src/log-store/src/kafka.rs +++ b/src/log-store/src/kafka.rs @@ -77,7 +77,7 @@ impl Display for EntryImpl { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, - "Entry (ns: {}, id: {}, data_len: {})", + "Entry [ns: {}, id: {}, data_len: {}]", self.ns, self.id, self.data.len() diff --git a/src/log-store/src/kafka/client_manager.rs b/src/log-store/src/kafka/client_manager.rs index e272840201bb..1e49f977de4d 100644 --- a/src/log-store/src/kafka/client_manager.rs +++ b/src/log-store/src/kafka/client_manager.rs @@ -12,17 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::collections::hash_map::Entry;
+use std::collections::HashMap;
 use std::sync::Arc;
 
 use common_config::wal::{KafkaConfig, KafkaWalTopic as Topic};
-use dashmap::mapref::entry::Entry as DashMapEntry;
-use dashmap::DashMap;
+use common_telemetry::debug;
 use rskafka::client::partition::{PartitionClient, UnknownTopicHandling};
 use rskafka::client::producer::aggregator::RecordAggregator;
 use rskafka::client::producer::{BatchProducer, BatchProducerBuilder};
 use rskafka::client::{Client as RsKafkaClient, ClientBuilder};
 use rskafka::BackoffConfig;
 use snafu::ResultExt;
+use tokio::sync::Mutex as TokioMutex;
 
 use crate::error::{BuildClientSnafu, BuildPartitionClientSnafu, Result};
 
@@ -67,7 +69,7 @@ pub(crate) struct ClientManager {
     client_factory: RsKafkaClient,
     /// A pool maintaining a collection of clients.
     /// Key: a topic. Value: the associated client of the topic.
-    client_pool: DashMap<Topic, Client>,
+    client_pool: TokioMutex<HashMap<Topic, Client>>,
 }
 
 impl ClientManager {
@@ -88,26 +90,28 @@ impl ClientManager {
                 broker_endpoints: config.broker_endpoints.clone(),
             })?;
 
+        debug!("Created a ClientManager");
+
         Ok(Self {
             config: config.clone(),
             client_factory: client,
-            client_pool: DashMap::new(),
+            client_pool: TokioMutex::new(HashMap::new()),
         })
     }
 
     /// Gets the client associated with the topic. If the client does not exist, a new one will
     /// be created and returned.
     pub(crate) async fn get_or_insert(&self, topic: &Topic) -> Result<Client> {
-        match self.client_pool.entry(topic.to_string()) {
-            DashMapEntry::Occupied(entry) => Ok(entry.get().clone()),
-            DashMapEntry::Vacant(entry) => {
-                let topic_client = self.try_create_client(topic).await?;
-                Ok(entry.insert(topic_client).clone())
-            }
+        let mut client_pool = self.client_pool.lock().await;
+        if let Entry::Vacant(entry) = client_pool.entry(topic.to_string()) {
+            entry.insert(self.try_create_client(topic).await?);
         }
+        Ok(client_pool[topic].clone())
     }
 
     async fn try_create_client(&self, topic: &Topic) -> Result<Client> {
+        debug!("Trying to create a client for topic {}", topic);
+
        // Sets the topic handling to `Retry` so that connecting is retried if the Kafka cluster
        // replies with an UnknownTopic error. That's because the topic is believed to exist,
        // as the metasrv is expected to create all required topics upon start.
        // The retries won't stop until they succeed or a different error is returned.
         let client = self
@@ -121,6 +125,85 @@ impl ClientManager {
             })
             .map(Arc::new)?;
 
+        debug!("Created a client for topic {}", topic);
+
         Ok(Client::new(raw_client, &self.config))
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use common_test_util::wal::kafka::BROKER_ENDPOINTS_KEY;
+
+    use super::*;
+    use crate::test_util::kafka::topic_builder::{Affix, TopicBuilder};
+
+    /// Checks that a client has been created for each of the given topics.
+    async fn ensure_topics_exist(topics: &[Topic], client_manager: &ClientManager) {
+        let client_pool = client_manager.client_pool.lock().await;
+        let all_exist = topics.iter().all(|topic| client_pool.contains_key(topic));
+        assert!(all_exist);
+    }
+
+    /// Sends `get_or_insert` requests sequentially to the client manager, and checks that it handles them correctly.
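+    ///
+    /// Note: like the other Kafka tests, this requires a reachable Kafka cluster whose
+    /// broker endpoints are exposed through the `GT_KAFKA_ENDPOINTS` env variable, e.g.
+    /// (with a hypothetical local broker):
+    /// `GT_KAFKA_ENDPOINTS=localhost:9092 cargo test -p log-store client_manager`.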
+    #[tokio::test]
+    async fn test_sequential() {
+        // Skips the test if no Kafka cluster is provided.
+        let Ok(broker_endpoints) = std::env::var(BROKER_ENDPOINTS_KEY) else {
+            return;
+        };
+        let broker_endpoints = broker_endpoints
+            .split(',')
+            .map(ToString::to_string)
+            .collect::<Vec<_>>();
+
+        let (manager, topics) = setup("test_sequential", broker_endpoints).await;
+
+        // Gets all clients sequentially.
+        for topic in topics.iter() {
+            manager.get_or_insert(topic).await.unwrap();
+        }
+        ensure_topics_exist(&topics, &manager).await;
+    }
+
+    /// Creates a set of uniquely-named topics on the Kafka cluster and a client manager for them.
+    async fn setup(test_name: &str, broker_endpoints: Vec<String>) -> (ClientManager, Vec<Topic>) {
+        let client = ClientBuilder::new(broker_endpoints.clone())
+            .build()
+            .await
+            .unwrap();
+        let ctrl_client = client.controller_client().unwrap();
+
+        let mut topic_builder = TopicBuilder::default()
+            .with_prefix(Affix::Fixed(test_name.to_string()))
+            .with_suffix(Affix::TimeNow);
+
+        // Constructs a collection of mock topics and creates them on the Kafka cluster.
+        // Uses a replication factor of 1 since the test cluster may have a single broker.
+        let num_topics = 256;
+        let topics = (0..num_topics)
+            .map(|i| topic_builder.build(&format!("topic_{i}")))
+            .collect::<Vec<_>>();
+        let tasks = topics
+            .iter()
+            .map(|topic| ctrl_client.create_topic(topic.as_str(), 1, 1, 5_000))
+            .collect::<Vec<_>>();
+        futures::future::try_join_all(tasks).await.unwrap();
+
+        let config = KafkaConfig {
+            broker_endpoints,
+            ..Default::default()
+        };
+        let manager = ClientManager::try_new(&config).await.unwrap();
+        (manager, topics)
+    }
+
+    /// Sends `get_or_insert` requests in parallel to the client manager, and checks that it handles them correctly.
+    #[tokio::test]
+    async fn test_parallel() {
+        // Skips the test if no Kafka cluster is provided.
+        let Ok(broker_endpoints) = std::env::var(BROKER_ENDPOINTS_KEY) else {
+            return;
+        };
+        let broker_endpoints = broker_endpoints
+            .split(',')
+            .map(ToString::to_string)
+            .collect::<Vec<_>>();
+
+        let (manager, topics) = setup("test_parallel", broker_endpoints).await;
+
+        // Gets all clients in parallel.
+        let tasks = topics
+            .iter()
+            .map(|topic| manager.get_or_insert(topic))
+            .collect::<Vec<_>>();
+        futures::future::try_join_all(tasks).await.unwrap();
+        ensure_topics_exist(&topics, &manager).await;
+    }
+}
diff --git a/src/log-store/src/kafka/log_store.rs b/src/log-store/src/kafka/log_store.rs
index 73b0fe1de2a9..f0170245e5e2 100644
--- a/src/log-store/src/kafka/log_store.rs
+++ b/src/log-store/src/kafka/log_store.rs
@@ -85,7 +85,7 @@ impl LogStore for KafkaLogStore {
     /// Appends a batch of entries and returns a response containing a map where the key is a region id
     /// while the value is the id of the last successfully written entry of the region.
     async fn append_batch(&self, entries: Vec<Self::Entry>) -> Result<AppendBatchResponse> {
-        debug!("LogStore handles append_batch with entries {:?}", entries);
+        println!("LogStore handles append_batch with entries {:?}", entries);
 
         if entries.is_empty() {
             return Ok(AppendBatchResponse::default());
@@ -101,12 +101,10 @@ impl LogStore for KafkaLogStore {
         }
 
         // Builds a record from entries belonging to a region and produces them to the kafka server.
-        let region_ids = producers.keys().cloned().collect::<Vec<_>>();
-
-        let tasks = producers
-            .into_values()
-            .map(|producer| producer.produce(&self.client_manager))
-            .collect::<Vec<_>>();
+        let (region_ids, tasks): (Vec<_>, Vec<_>) = producers
+            .into_iter()
+            .map(|(id, producer)| (id, producer.produce(&self.client_manager)))
+            .unzip();
 
         // Each produce operation returns a kafka offset of the produced record.
         // The offsets are then converted to entry ids.
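+        // Note: `produce` encodes all entries of a region into a single record, so each
+        // task yields exactly one offset, which zips one-to-one with `region_ids` below.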
         let entry_ids = futures::future::try_join_all(tasks)
             .await?
             .into_iter()
             .map(TryInto::try_into)
             .collect::<Result<Vec<_>>>()?;
-        debug!("The entries are appended at offsets {:?}", entry_ids);
+        println!("The entries are appended at offsets {:?}", entry_ids);
 
-        Ok(AppendBatchResponse {
-            last_entry_ids: region_ids.into_iter().zip(entry_ids).collect(),
-        })
+        let last_entry_ids = region_ids
+            .into_iter()
+            .zip(entry_ids)
+            .collect::<HashMap<_, _>>();
+        for (region, last_entry_id) in last_entry_ids.iter() {
+            println!(
+                "The entries for region {} are appended at offset {}",
+                region, last_entry_id
+            );
+        }
+
+        Ok(AppendBatchResponse { last_entry_ids })
     }
 
     /// Creates a new `EntryStream` to asynchronously generate `Entry` with entry ids
@@ -148,14 +155,19 @@ impl LogStore for KafkaLogStore {
             .await
             .context(GetOffsetSnafu { ns: ns.clone() })?
             - 1;
-        // Reads entries with offsets in the range [start_offset, end_offset).
+        // Reads entries with offsets in the range [start_offset, end_offset].
         let start_offset = Offset::try_from(entry_id)?.0;
 
+        println!(
+            "Start reading entries in range [{}, {}] for ns {}",
+            start_offset, end_offset, ns
+        );
+
         // Aborts if there are no new entries.
         // FIXME(niebayes): how could this case happen?
         if start_offset > end_offset {
-            warn!(
-                "No new entries for ns {} in range [{}, {})",
+            println!(
+                "No new entries for ns {} in range [{}, {}]",
                 ns, start_offset, end_offset
             );
             return Ok(futures_util::stream::empty().boxed());
@@ -166,8 +178,8 @@ impl LogStore for KafkaLogStore {
             .with_max_wait_ms(self.config.produce_record_timeout.as_millis() as i32)
             .build();
 
-        debug!(
-            "Built a stream consumer for ns {} to consume entries in range [{}, {})",
+        println!(
+            "Built a stream consumer for ns {} to consume entries in range [{}, {}]",
             ns, start_offset, end_offset
         );
 
@@ -181,7 +193,7 @@ impl LogStore for KafkaLogStore {
                     ns: ns_clone.clone(),
                 })?;
                 let record_offset = record.offset;
-                debug!(
+                println!(
                     "Read a record at offset {} for ns {}, high watermark: {}",
                     record_offset, ns_clone, high_watermark
                 );
@@ -192,14 +204,19 @@ impl LogStore for KafkaLogStore {
                 if let Some(entry) = entries.first()
                     && entry.ns.region_id == region_id
                 {
+                    println!("{} entries are yielded for ns {}", entries.len(), ns_clone);
                     yield Ok(entries);
                 } else {
-                    yield Ok(vec![]);
+                    println!(
+                        "{} entries are filtered out for ns {}",
+                        entries.len(),
+                        ns_clone
+                    );
                 }
 
                 // Terminates the stream if the entry with the end offset was read.
                 if record_offset >= end_offset {
-                    debug!(
+                    println!(
                         "Stream consumer for ns {} terminates at offset {}",
                         ns_clone, record_offset
                     );
diff --git a/src/log-store/src/kafka/record_utils.rs b/src/log-store/src/kafka/record_utils.rs
index 3707b873f3e3..2ef5b2db81b2 100644
--- a/src/log-store/src/kafka/record_utils.rs
+++ b/src/log-store/src/kafka/record_utils.rs
@@ -89,6 +89,8 @@ impl RecordProducer {
     pub(crate) async fn produce(self, client_manager: &ClientManagerRef) -> Result<Offset> {
         ensure!(!self.entries.is_empty(), EmptyEntriesSnafu);
 
+        println!("Start producing a record for ns {}", self.ns);
+
         // Produces the record through a client. The client determines when to send the record to kafka server.
         let client = client_manager
             .get_or_insert(&self.ns.topic)
             .await
             .map_err(|e| {
                 GetClientSnafu {
                     topic: &self.ns.topic,
                     error: e.to_string(),
                 }
                 .build()
             })?;
+
+        println!("Got the client for ns {}", self.ns);
+
         client
             .producer
             .produce(encode_to_record(self.ns.clone(), self.entries)?)
diff --git a/src/log-store/src/test_util.rs b/src/log-store/src/test_util.rs
index 973d6d3f9720..52ffcca39370 100644
--- a/src/log-store/src/test_util.rs
+++ b/src/log-store/src/test_util.rs
@@ -12,4 +12,5 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+pub mod kafka;
 pub mod log_store_util;
diff --git a/tests-integration/src/wal_util.rs b/src/log-store/src/test_util/kafka.rs
similarity index 53%
rename from tests-integration/src/wal_util.rs
rename to src/log-store/src/test_util/kafka.rs
index 150cc065a471..b66e692836d7 100644
--- a/tests-integration/src/wal_util.rs
+++ b/src/log-store/src/test_util/kafka.rs
@@ -12,9 +12,16 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-pub mod kafka;
+pub mod entry_builder;
+pub mod topic_builder;
 
-pub use testcontainers::clients::Cli as DockerCli;
+use common_config::wal::KafkaWalTopic as Topic;
+use rskafka::client::ClientBuilder;
 
-pub use crate::wal_util::kafka::config::KAFKA_ADVERTISED_LISTENER_PORT as DEFAULT_EXPOSED_PORT;
-pub use crate::wal_util::kafka::image::Image as KafkaImage;
+pub use crate::test_util::kafka::entry_builder::EntryBuilder;
+pub use crate::test_util::kafka::topic_builder::TopicBuilder;
+
+/// Creates `num_topics` topics with the given TopicBuilder.
+/// Requests creating these topics on the Kafka cluster if `broker_endpoints` is not empty.
+pub async fn create_topics(
+    num_topics: usize,
+    mut builder: TopicBuilder,
+    broker_endpoints: Vec<String>,
+) -> Vec<Topic> {
+    // A minimal implementation following the doc comment; the `topic_{i}` base names are
+    // illustrative defaults.
+    let topics = (0..num_topics)
+        .map(|i| builder.build(&format!("topic_{i}")))
+        .collect::<Vec<_>>();
+
+    if !broker_endpoints.is_empty() {
+        let ctrl_client = ClientBuilder::new(broker_endpoints)
+            .build()
+            .await
+            .unwrap()
+            .controller_client()
+            .unwrap();
+        let tasks = topics
+            .iter()
+            .map(|topic| ctrl_client.create_topic(topic.as_str(), 1, 1, 5_000))
+            .collect::<Vec<_>>();
+        futures::future::try_join_all(tasks).await.unwrap();
+    }
+    topics
+}
diff --git a/src/common/test-util/src/wal/kafka/entry_builder.rs b/src/log-store/src/test_util/kafka/entry_builder.rs
similarity index 97%
rename from src/common/test-util/src/wal/kafka/entry_builder.rs
rename to src/log-store/src/test_util/kafka/entry_builder.rs
index 09910c6c7077..3e234c78d6e4 100644
--- a/src/common/test-util/src/wal/kafka/entry_builder.rs
+++ b/src/log-store/src/test_util/kafka/entry_builder.rs
@@ -15,12 +15,13 @@
 use std::sync::atomic::{AtomicU64 as AtomicEntryId, Ordering};
 use std::sync::Mutex;
 
-use log_store::kafka::{EntryImpl, NamespaceImpl};
 use rand::rngs::ThreadRng;
 use rand::seq::SliceRandom;
 use rand::{thread_rng, Rng};
 use store_api::logstore::EntryId;
 
+use crate::kafka::{EntryImpl, NamespaceImpl};
+
 const DEFAULT_DATA: &[u8; 10] = b"[greptime]";
 
 /// A builder for building entries for a namespace.
@@ -108,3 +109,5 @@ impl EntryBuilder {
             .collect()
     }
 }
+
+// TODO(niebayes): add tests for EntryBuilder.
diff --git a/src/log-store/src/test_util/kafka/topic_builder.rs b/src/log-store/src/test_util/kafka/topic_builder.rs
new file mode 100644
index 000000000000..485b846cc6d4
--- /dev/null
+++ b/src/log-store/src/test_util/kafka/topic_builder.rs
@@ -0,0 +1,94 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashSet;
+
+use common_config::wal::KafkaWalTopic as Topic;
+
+/// Things to be inserted at the front or the back of a topic.
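+///
+/// A minimal illustration of how affixes compose (hypothetical values; `TopicBuilder`
+/// is defined below):
+///
+/// ```ignore
+/// let mut builder = TopicBuilder::default()
+///     .with_prefix(Affix::Fixed("test".to_string()));
+/// // `build` renders "{prefix}_{topic}_{suffix}"; an `Affix::Nothing` suffix is empty.
+/// assert_eq!(builder.build(&"wal_topic".to_string()), "test_wal_topic_");
+/// ```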
+#[derive(Debug, Default)]
+pub enum Affix {
+    /// Inserts a provided string to each topic.
+    Fixed(String),
+    /// Computes the current time for each topic and inserts it into the topic.
+    TimeNow,
+    /// Nothing to be inserted.
+    #[default]
+    Nothing,
+}
+
+impl ToString for Affix {
+    fn to_string(&self) -> String {
+        match self {
+            Affix::Fixed(s) => s.to_string(),
+            // Uses a numeric timestamp so that the affix only contains characters legal
+            // in Kafka topic names.
+            Affix::TimeNow => chrono::Local::now().timestamp_micros().to_string(),
+            Affix::Nothing => String::default(),
+        }
+    }
+}
+
+/// A builder for building topics on demand.
+pub struct TopicBuilder {
+    /// A prefix to be inserted at the front of each topic.
+    prefix: Affix,
+    /// A suffix to be inserted at the back of each topic.
+    suffix: Affix,
+    /// Topics built so far. Used to filter out duplicate topics.
+    created: HashSet<Topic>,
+}
+
+impl Default for TopicBuilder {
+    fn default() -> Self {
+        Self {
+            prefix: Affix::Nothing,
+            suffix: Affix::Nothing,
+            created: HashSet::with_capacity(256),
+        }
+    }
+}
+
+impl TopicBuilder {
+    /// Overrides the current prefix with the given prefix.
+    pub fn with_prefix(self, prefix: Affix) -> Self {
+        Self { prefix, ..self }
+    }
+
+    /// Overrides the current suffix with the given suffix.
+    pub fn with_suffix(self, suffix: Affix) -> Self {
+        Self { suffix, ..self }
+    }
+
+    /// Builds a topic by inserting the prefix and the suffix into the given topic.
+    pub fn build(&mut self, topic: &Topic) -> Topic {
+        const ITERS: usize = 24;
+        for _ in 0..ITERS {
+            let topic = format!(
+                "{}_{}_{}",
+                self.prefix.to_string(),
+                topic,
+                self.suffix.to_string()
+            );
+            if self.created.insert(topic.clone()) {
+                return topic;
+            }
+        }
+        unreachable!("Building a topic should complete within {} iterations", ITERS)
+    }
+}
+
+// TODO(niebayes): add tests for TopicBuilder.
diff --git a/tests-integration/src/lib.rs b/tests-integration/src/lib.rs
index 8dc4bc5a9829..b0b28ba4651c 100644
--- a/tests-integration/src/lib.rs
+++ b/tests-integration/src/lib.rs
@@ -20,7 +20,6 @@ mod opentsdb;
 mod otlp;
 mod prom_store;
 pub mod test_util;
-pub mod wal_util;
 
 mod standalone;
 #[cfg(test)]
diff --git a/tests-integration/src/wal_util/kafka.rs b/tests-integration/src/wal_util/kafka.rs
deleted file mode 100644
index b4289768ed83..000000000000
--- a/tests-integration/src/wal_util/kafka.rs
+++ /dev/null
@@ -1,16 +0,0 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-pub mod config;
-pub mod image;
diff --git a/tests-integration/tests/main.rs b/tests-integration/tests/main.rs
index 65dfcc8cf7c4..8b1e064579c8 100644
--- a/tests-integration/tests/main.rs
+++ b/tests-integration/tests/main.rs
@@ -25,5 +25,3 @@ grpc_tests!(File, S3, S3WithCache, Oss, Azblob, Gcs);
 http_tests!(File, S3, S3WithCache, Oss, Azblob, Gcs);
 // region_failover_tests!(File, S3, S3WithCache, Oss, Azblob);
 sql_tests!(File);
-
-// TODO(niebayes): add integration tests for remote wal.
diff --git a/tests-integration/tests/wal.rs b/tests-integration/tests/wal.rs
index 30695217087d..94c6c3b12cb3 100644
--- a/tests-integration/tests/wal.rs
+++ b/tests-integration/tests/wal.rs
@@ -1,248 +1,273 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use std::collections::HashMap;
-use std::sync::Arc;
-
-use common_config::wal::KafkaConfig as DatanodeKafkaConfig;
-use common_config::{KafkaWalOptions, WalOptions};
-use common_meta::kv_backend::memory::MemoryKvBackend;
-use common_meta::kv_backend::KvBackendRef;
-use common_meta::wal::kafka::{
-    KafkaConfig as MetaSrvKafkaConfig, TopicManager as KafkaTopicManager,
-};
-use common_meta::wal::{allocate_region_wal_options, WalConfig, WalOptionsAllocator};
-use futures::StreamExt;
-use log_store::kafka::log_store::KafkaLogStore;
-use log_store::kafka::{EntryImpl, NamespaceImpl};
-use rskafka::client::controller::ControllerClient;
-use rskafka::client::ClientBuilder;
-use store_api::logstore::entry::Id as EntryId;
-use store_api::logstore::LogStore;
-
-// Notice: the following tests are literally unit tests. They are placed at here since
-// it seems too heavy to start a Kafka cluster for each unit test.
-
-// The key of an env variable that stores a series of Kafka broker endpoints.
-const BROKER_ENDPOINTS_KEY: &str = "GT_KAFKA_ENDPOINTS";
-
-// Tests that the TopicManager allocates topics in a round-robin manner.
-#[tokio::test]
-async fn test_kafka_alloc_topics() {
-    let broker_endpoints = std::env::var(BROKER_ENDPOINTS_KEY)
-        .unwrap()
-        .split(',')
-        .map(ToString::to_string)
-        .collect::<Vec<_>>();
-    let config = MetaSrvKafkaConfig {
-        topic_name_prefix: "__test_kafka_alloc_topics".to_string(),
-        replication_factor: broker_endpoints.len() as i16,
-        broker_endpoints,
-        ..Default::default()
-    };
-    let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
-    let manager = KafkaTopicManager::new(config.clone(), kv_backend);
-    manager.start().await.unwrap();
-
-    // Topics should be created.
-    let topics = (0..config.num_topics)
-        .map(|topic_id| format!("{}_{topic_id}", config.topic_name_prefix))
-        .collect::<Vec<_>>();
-
-    // Selects exactly the number of `num_topics` topics one by one.
-    for expected in topics.iter() {
-        let got = manager.select().unwrap();
-        assert_eq!(got, expected);
-    }
-
-    // Selects exactly the number of `num_topics` topics in a batching manner.
-    let got = manager
-        .select_batch(config.num_topics)
-        .unwrap()
-        .into_iter()
-        .map(ToString::to_string)
-        .collect::<Vec<_>>();
-    assert_eq!(got, topics);
-
-    // Selects none.
-    let got = manager.select_batch(config.num_topics).unwrap();
-    assert!(got.is_empty());
-
-    // Selects more than the number of `num_topics` topics.
-    let got = manager
-        .select_batch(2 * config.num_topics)
-        .unwrap()
-        .into_iter()
-        .map(ToString::to_string)
-        .collect::<Vec<_>>();
-    let expected = vec![topics.clone(); 2]
-        .into_iter()
-        .flatten()
-        .collect::<Vec<_>>();
-    assert_eq!(got, expected);
-}
-
-// Tests that the wal options allocator could successfully allocate Kafka wal options.
-#[tokio::test]
-async fn test_kafka_options_allocator() {
-    let broker_endpoints = std::env::var(BROKER_ENDPOINTS_KEY)
-        .unwrap()
-        .split(',')
-        .map(ToString::to_string)
-        .collect::<Vec<_>>();
-    let config = MetaSrvKafkaConfig {
-        topic_name_prefix: "__test_kafka_options_allocator".to_string(),
-        replication_factor: broker_endpoints.len() as i16,
-        broker_endpoints,
-        ..Default::default()
-    };
-    let wal_config = WalConfig::Kafka(config.clone());
-    let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
-    let allocator = WalOptionsAllocator::new(wal_config, kv_backend);
-    allocator.start().await.unwrap();
-
-    let num_regions = 32;
-    let regions = (0..num_regions).collect::<Vec<_>>();
-    let got = allocate_region_wal_options(regions.clone(), &allocator).unwrap();
-
-    // Topics should be allocated.
-    let topics = (0..num_regions)
-        .map(|topic_id| format!("{}_{topic_id}", config.topic_name_prefix))
-        .collect::<Vec<_>>();
-    // Check the allocated wal options contain the expected topics.
-    let expected = (0..num_regions)
-        .map(|i| {
-            let options = WalOptions::Kafka(KafkaWalOptions {
-                topic: topics[i as usize].clone(),
-            });
-            (i, serde_json::to_string(&options).unwrap())
-        })
-        .collect::<HashMap<_, _>>();
-    assert_eq!(got, expected);
-}
-
-fn new_test_entry<D: AsRef<[u8]>>(data: D, entry_id: EntryId, ns: NamespaceImpl) -> EntryImpl {
-    EntryImpl {
-        data: data.as_ref().to_vec(),
-        id: entry_id,
-        ns,
-    }
-}
-
-async fn create_topic(topic: &str, replication_factor: i16, client: &ControllerClient) {
-    client
-        .create_topic(topic, 1, replication_factor, 5_000)
-        .await
-        .unwrap();
-}
-
-async fn check_entries(
-    ns: &NamespaceImpl,
-    start_offset: EntryId,
-    expected: Vec<EntryImpl>,
-    logstore: &KafkaLogStore,
-) {
-    let mut stream = logstore.read(ns, start_offset).await.unwrap();
-    for entry in expected {
-        let got = stream.next().await.unwrap().unwrap();
-        assert_eq!(entry, got[0]);
-    }
-}
-
-// Tests that the Kafka log store is able to write and read log entries from Kafka.
-#[tokio::test]
-async fn test_kafka_log_store() {
-    let broker_endpoints = std::env::var(BROKER_ENDPOINTS_KEY)
-        .unwrap()
-        .split(',')
-        .map(ToString::to_string)
-        .collect::<Vec<_>>();
-    let config = DatanodeKafkaConfig {
-        broker_endpoints,
-        ..Default::default()
-    };
-    let logstore = KafkaLogStore::try_new(&config).await.unwrap();
-
-    let client = ClientBuilder::new(config.broker_endpoints.clone())
-        .build()
-        .await
-        .unwrap()
-        .controller_client()
-        .unwrap();
-
-    // Appends one entry.
-    let topic = "__test_kafka_log_store_topic_append";
-    create_topic(topic, config.broker_endpoints.len() as i16, &client).await;
-    let ns = NamespaceImpl {
-        region_id: 0,
-        topic: topic.to_string(),
-    };
-    let entry = new_test_entry(b"0", 0, ns.clone());
-    let last_entry_id = logstore.append(entry.clone()).await.unwrap().last_entry_id;
-    check_entries(&ns, last_entry_id, vec![entry], &logstore).await;
-
-    // Appends a batch of entries.
-    // Region 1, 2 are mapped to topic 1,
-    let topic = "__test_kafka_log_store_topic_append_batch_1";
-    create_topic(topic, config.broker_endpoints.len() as i16, &client).await;
-    let ns_1 = NamespaceImpl {
-        region_id: 1,
-        topic: topic.to_string(),
-    };
-    let ns_2 = NamespaceImpl {
-        region_id: 2,
-        topic: topic.to_string(),
-    };
-
-    // Region 3 is mapped to topic 2.
-    let topic = "__test_kafka_log_store_topic_append_batch_2";
-    create_topic(topic, config.broker_endpoints.len() as i16, &client).await;
-    let ns_3 = NamespaceImpl {
-        region_id: 3,
-        topic: topic.to_string(),
-    };
-
-    // Constructs a batch of entries.
-    let entries_1 = vec![
-        new_test_entry(b"1", 0, ns_1.clone()),
-        new_test_entry(b"1", 1, ns_1.clone()),
-    ];
-    let entries_2 = vec![
-        new_test_entry(b"2", 2, ns_2.clone()),
-        new_test_entry(b"2", 3, ns_2.clone()),
-    ];
-    let entries_3 = vec![
-        new_test_entry(b"3", 7, ns_3.clone()),
-        new_test_entry(b"3", 8, ns_3.clone()),
-    ];
-    let entries = vec![entries_1.clone(), entries_2.clone(), entries_3.clone()]
-        .into_iter()
-        .flatten()
-        .collect::<Vec<_>>();
-
-    let last_entry_ids = logstore
-        .append_batch(entries.clone())
-        .await
-        .unwrap()
-        .last_entry_ids;
-
-    // Reads entries for region 1.
-    check_entries(&ns_1, last_entry_ids[&1], entries_1, &logstore).await;
-    // Reads entries from region 2.
-    check_entries(&ns_2, last_entry_ids[&2], entries_2, &logstore).await;
-    // Reads entries from region 3.
-    check_entries(&ns_3, last_entry_ids[&3], entries_3, &logstore).await;
-}
-
-// TODO(niebayes): add more integration tests.
+// // Copyright 2023 Greptime Team
+// //
+// // Licensed under the Apache License, Version 2.0 (the "License");
+// // you may not use this file except in compliance with the License.
+// // You may obtain a copy of the License at
+// //
+// //     http://www.apache.org/licenses/LICENSE-2.0
+// //
+// // Unless required by applicable law or agreed to in writing, software
+// // distributed under the License is distributed on an "AS IS" BASIS,
+// // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// // See the License for the specific language governing permissions and
+// // limitations under the License.
+
+// use std::collections::HashMap;
+// use std::sync::Arc;
+
+// use common_config::wal::KafkaConfig as DatanodeKafkaConfig;
+// use common_config::{KafkaWalOptions, WalOptions};
+// use common_meta::kv_backend::memory::MemoryKvBackend;
+// use common_meta::kv_backend::KvBackendRef;
+// use common_meta::wal::kafka::{
+//     KafkaConfig as MetaSrvKafkaConfig, TopicManager as KafkaTopicManager,
+// };
+// use common_meta::wal::{allocate_region_wal_options, WalConfig, WalOptionsAllocator};
+// use futures::StreamExt;
+// use log_store::error::Result as LogStoreResult;
+// use log_store::kafka::log_store::KafkaLogStore;
+// use log_store::kafka::{EntryImpl, NamespaceImpl};
+// use rskafka::client::controller::ControllerClient;
+// use rskafka::client::ClientBuilder;
+// use store_api::logstore::entry::Id as EntryId;
+// use store_api::logstore::LogStore;
+// use tests_integration::wal_util::{DockerCli, KafkaImage, DEFAULT_EXPOSED_PORT};

+// // Notice: the following tests are literally unit tests. They are placed at here since
+// // it seems too heavy to start a Kafka cluster for each unit test.

+// // The key of an env variable that stores a series of Kafka broker endpoints.
+// const BROKER_ENDPOINTS_KEY: &str = "GT_KAFKA_ENDPOINTS";

+// // Tests that the TopicManager allocates topics in a round-robin manner.
+// #[tokio::test]
+// async fn test_kafka_alloc_topics() {
+//     let broker_endpoints = std::env::var(BROKER_ENDPOINTS_KEY)
+//         .unwrap()
+//         .split(',')
+//         .map(ToString::to_string)
+//         .collect::<Vec<_>>();
+//     let config = MetaSrvKafkaConfig {
+//         topic_name_prefix: "__test_kafka_alloc_topics".to_string(),
+//         replication_factor: broker_endpoints.len() as i16,
+//         broker_endpoints,
+//         ..Default::default()
+//     };
+//     let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
+//     let manager = KafkaTopicManager::new(config.clone(), kv_backend);
+//     manager.start().await.unwrap();

+//     // Topics should be created.
+//     let topics = (0..config.num_topics)
+//         .map(|topic_id| format!("{}_{topic_id}", config.topic_name_prefix))
+//         .collect::<Vec<_>>();

+//     // Selects exactly the number of `num_topics` topics one by one.
+//     for expected in topics.iter() {
+//         let got = manager.select().unwrap();
+//         assert_eq!(got, expected);
+//     }

+//     // Selects exactly the number of `num_topics` topics in a batching manner.
+//     let got = manager
+//         .select_batch(config.num_topics)
+//         .unwrap()
+//         .into_iter()
+//         .map(ToString::to_string)
+//         .collect::<Vec<_>>();
+//     assert_eq!(got, topics);

+//     // Selects none.
+//     let got = manager.select_batch(config.num_topics).unwrap();
+//     assert!(got.is_empty());

+//     // Selects more than the number of `num_topics` topics.
+//     let got = manager
+//         .select_batch(2 * config.num_topics)
+//         .unwrap()
+//         .into_iter()
+//         .map(ToString::to_string)
+//         .collect::<Vec<_>>();
+//     let expected = vec![topics.clone(); 2]
+//         .into_iter()
+//         .flatten()
+//         .collect::<Vec<_>>();
+//     assert_eq!(got, expected);
+// }

+// // Tests that the wal options allocator could successfully allocate Kafka wal options.
+// #[tokio::test]
+// async fn test_kafka_options_allocator() {
+//     let broker_endpoints = std::env::var(BROKER_ENDPOINTS_KEY)
+//         .unwrap()
+//         .split(',')
+//         .map(ToString::to_string)
+//         .collect::<Vec<_>>();
+//     let config = MetaSrvKafkaConfig {
+//         topic_name_prefix: "__test_kafka_options_allocator".to_string(),
+//         replication_factor: broker_endpoints.len() as i16,
+//         broker_endpoints,
+//         ..Default::default()
+//     };
+//     let wal_config = WalConfig::Kafka(config.clone());
+//     let kv_backend = Arc::new(MemoryKvBackend::new()) as KvBackendRef;
+//     let allocator = WalOptionsAllocator::new(wal_config, kv_backend);
+//     allocator.start().await.unwrap();

+//     let num_regions = 32;
+//     let regions = (0..num_regions).collect::<Vec<_>>();
+//     let got = allocate_region_wal_options(regions.clone(), &allocator).unwrap();

+//     // Topics should be allocated.
+//     let topics = (0..num_regions)
+//         .map(|topic_id| format!("{}_{topic_id}", config.topic_name_prefix))
+//         .collect::<Vec<_>>();
+//     // Check the allocated wal options contain the expected topics.
+//     let expected = (0..num_regions)
+//         .map(|i| {
+//             let options = WalOptions::Kafka(KafkaWalOptions {
+//                 topic: topics[i as usize].clone(),
+//             });
+//             (i, serde_json::to_string(&options).unwrap())
+//         })
+//         .collect::<HashMap<_, _>>();
+//     assert_eq!(got, expected);
+// }

+// async fn create_topic(topic: &str, replication_factor: i16, client: &ControllerClient) {
+//     client
+//         .create_topic(topic, 1, replication_factor, 500)
+//         .await
+//         .unwrap();
+// }

+// async fn check_entries(
+//     ns: &NamespaceImpl,
+//     start_offset: EntryId,
+//     expected: Vec<EntryImpl>,
+//     logstore: &KafkaLogStore,
+// ) {
+//     let stream = logstore.read(ns, start_offset).await.unwrap();
+//     let got = stream
+//         .collect::<Vec<_>>()
+//         .await
+//         .into_iter()
+//         .flat_map(|x| x.unwrap())
+//         .collect::<Vec<_>>();
+//     assert_eq!(expected, got);
+//     // for entry in expected {
+//     //     let got = stream.next().await.unwrap().unwrap();
+//     // }
+// }

+// // Tests that the Kafka log store is able to write and read log entries from Kafka.
+// // #[tokio::test]
+// // async fn test_kafka_log_store() {
+// //     println!("Start running test");

+// //     // Starts a Kafka container.
+// //     let docker = DockerCli::default();
+// //     let container = docker.run(KafkaImage::default());

+// //     println!("Started the container");

+// //     let broker_endpoints = std::env::var(BROKER_ENDPOINTS_KEY)
+// //         .unwrap_or(format!(
+// //             "localhost:{}",
+// //             container.get_host_port_ipv4(DEFAULT_EXPOSED_PORT)
+// //         ))
+// //         .split(',')
+// //         .map(ToString::to_string)
+// //         .collect::<Vec<_>>();
+// //     let config = DatanodeKafkaConfig {
+// //         broker_endpoints,
+// //         ..Default::default()
+// //     };
+// //     let logstore = KafkaLogStore::try_new(&config).await.unwrap();

+// //     println!("Started the log store");

+// //     let client = ClientBuilder::new(config.broker_endpoints.clone())
+// //         .build()
+// //         .await
+// //         .unwrap()
+// //         .controller_client()
+// //         .unwrap();

+// //     println!("Created a client");

+// //     // Appends one entry.
+// //     let topic = "__test_kafka_log_store_topic_append";
+// //     create_topic(topic, config.broker_endpoints.len() as i16, &client).await;

+// //     println!("Created a topic");

+// //     let ns = NamespaceImpl {
+// //         region_id: 0,
+// //         topic: topic.to_string(),
+// //     };
+// //     let entry = new_test_entry(b"0", 0, ns.clone());
+// //     let last_entry_id = logstore.append(entry.clone()).await.unwrap().last_entry_id;

+// //     println!("Appended an entry");

+// //     check_entries(&ns, last_entry_id, vec![entry], &logstore).await;

+// //     // Appends a batch of entries.
+// //     // Region 1, 2 are mapped to topic 1,
+// //     let topic = "__test_kafka_log_store_topic_append_batch_1";
+// //     create_topic(topic, config.broker_endpoints.len() as i16, &client).await;

+// //     println!("Created a topic");

+// //     let ns_1 = NamespaceImpl {
+// //         region_id: 1,
+// //         topic: topic.to_string(),
+// //     };
+// //     let ns_2 = NamespaceImpl {
+// //         region_id: 2,
+// //         topic: topic.to_string(),
+// //     };

+// //     // Region 3 is mapped to topic 2.
+// //     let topic = "__test_kafka_log_store_topic_append_batch_2";
+// //     create_topic(topic, config.broker_endpoints.len() as i16, &client).await;

+// //     println!("Created a topic");

+// //     let ns_3 = NamespaceImpl {
+// //         region_id: 3,
+// //         topic: topic.to_string(),
+// //     };

+// //     // Constructs a batch of entries.
+// //     let entries_1 = vec![
+// //         new_test_entry(b"1", 0, ns_1.clone()),
+// //         new_test_entry(b"1", 1, ns_1.clone()),
+// //     ];
+// //     let entries_2 = vec![
+// //         new_test_entry(b"2", 2, ns_2.clone()),
+// //         new_test_entry(b"2", 3, ns_2.clone()),
+// //     ];
+// //     let entries_3 = vec![
+// //         new_test_entry(b"3", 7, ns_3.clone()),
+// //         new_test_entry(b"3", 8, ns_3.clone()),
+// //     ];
+// //     let entries = vec![entries_1.clone(), entries_2.clone(), entries_3.clone()]
+// //         .into_iter()
+// //         .flatten()
+// //         .collect::<Vec<_>>();

+// //     let last_entry_ids = logstore
+// //         .append_batch(entries.clone())
+// //         .await
+// //         .unwrap()
+// //         .last_entry_ids;

+// //     // Reads entries for region 1.
+// //     check_entries(&ns_1, last_entry_ids[&1], entries_1, &logstore).await;
+// //     // Reads entries from region 2.
+// //     check_entries(&ns_2, last_entry_ids[&2], entries_2, &logstore).await;
+// //     // Reads entries from region 3.
+// //     check_entries(&ns_3, last_entry_ids[&3], entries_3, &logstore).await;
+// // }