Commit: tmp

niebayes committed Dec 29, 2023
1 parent 1851393 commit ef031ee
Showing 19 changed files with 540 additions and 315 deletions.
7 changes: 4 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 3 additions & 2 deletions src/common/test-util/Cargo.toml
@@ -5,8 +5,9 @@ edition.workspace = true
license.workspace = true

[dependencies]
log-store.workspace = true
once_cell.workspace = true
rand.workspace = true
rskafka.workspace = true
tempfile.workspace = true
store-api.workspace = true
testcontainers = "0.15.0"
tokio.workspace = true
5 changes: 5 additions & 0 deletions src/common/test-util/src/wal.rs
@@ -13,3 +13,8 @@
// limitations under the License.

pub mod kafka;

pub use testcontainers::clients::Cli as DockerCli;

pub use crate::wal::kafka::config::KAFKA_ADVERTISED_LISTENER_PORT as DEFAULT_EXPOSED_PORT;
pub use crate::wal::kafka::image::Image as KafkaImage;
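These re-exports let tests in other crates spin up the Kafka test container without depending on testcontainers directly. A minimal sketch of the intended wiring, assuming `Image` implements `Default` and that `DEFAULT_EXPOSED_PORT` is the advertised listener port to map (both are assumptions, not verified details of this crate):

use common_test_util::wal::{DockerCli, KafkaImage, DEFAULT_EXPOSED_PORT};

#[tokio::test]
async fn starts_kafka_container() {
    // Sketch only: runs the Kafka image and derives the broker endpoint that a
    // test would hand to rskafka's ClientBuilder.
    let docker = DockerCli::default();
    let container = docker.run(KafkaImage::default());
    let port = container.get_host_port_ipv4(DEFAULT_EXPOSED_PORT);
    let broker_endpoint = format!("127.0.0.1:{port}");
    assert!(!broker_endpoint.is_empty());
}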
5 changes: 3 additions & 2 deletions src/common/test-util/src/wal/kafka.rs
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod entry_builder;
pub mod config;
pub mod image;

pub use crate::wal::kafka::entry_builder::EntryBuilder;
pub const BROKER_ENDPOINTS_KEY: &str = "GT_KAFKA_ENDPOINTS";
src/common/test-util/src/wal/kafka/image.rs
@@ -14,7 +14,7 @@

use testcontainers::core::{ContainerState, ExecCommand, WaitFor};

use crate::wal_util::kafka::config::{
use crate::wal::kafka::config::{
Config, KAFKA_ADVERTISED_LISTENER_PORT, KAFKA_LISTENER_PORT, ZOOKEEPER_PORT,
};

@@ -103,15 +103,14 @@ impl testcontainers::Image for Image {

#[cfg(test)]
mod tests {
use chrono::TimeZone;
use rskafka::chrono::Utc;
use rskafka::chrono::{TimeZone, Utc};
use rskafka::client::partition::UnknownTopicHandling;
use rskafka::client::ClientBuilder;
use rskafka::record::Record;
use testcontainers::clients::Cli as DockerCli;

use crate::wal_util::kafka::config::KAFKA_ADVERTISED_LISTENER_PORT;
use crate::wal_util::kafka::image::Image;
use crate::wal::kafka::config::KAFKA_ADVERTISED_LISTENER_PORT;
use crate::wal::kafka::image::Image;

#[tokio::test]
async fn test_image() {
4 changes: 3 additions & 1 deletion src/log-store/Cargo.toml
@@ -14,18 +14,20 @@ async-stream.workspace = true
async-trait.workspace = true
byteorder = "1.4"
bytes.workspace = true
chrono.workspace = true
common-base.workspace = true
common-config.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-meta.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
dashmap.workspace = true
common-test-util.workspace = true
futures-util.workspace = true
futures.workspace = true
protobuf = { version = "2", features = ["bytes"] }
raft-engine.workspace = true
rand.workspace = true
rskafka.workspace = true
serde.workspace = true
serde_json.workspace = true
2 changes: 1 addition & 1 deletion src/log-store/src/kafka.rs
@@ -77,7 +77,7 @@ impl Display for EntryImpl {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"Entry (ns: {}, id: {}, data_len: {})",
"Entry [ns: {}, id: {}, data_len: {}]",
self.ns,
self.id,
self.data.len()
103 changes: 93 additions & 10 deletions src/log-store/src/kafka/client_manager.rs
@@ -12,17 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::sync::Arc;

use common_config::wal::{KafkaConfig, KafkaWalTopic as Topic};
use dashmap::mapref::entry::Entry as DashMapEntry;
use dashmap::DashMap;
use common_telemetry::debug;
use rskafka::client::partition::{PartitionClient, UnknownTopicHandling};
use rskafka::client::producer::aggregator::RecordAggregator;
use rskafka::client::producer::{BatchProducer, BatchProducerBuilder};
use rskafka::client::{Client as RsKafkaClient, ClientBuilder};
use rskafka::BackoffConfig;
use snafu::ResultExt;
use tokio::sync::{Mutex as TokioMutex, RwLock as TokioRwLock};

use crate::error::{BuildClientSnafu, BuildPartitionClientSnafu, Result};

@@ -67,7 +69,7 @@ pub(crate) struct ClientManager {
client_factory: RsKafkaClient,
/// A pool maintaining a collection of clients.
/// Key: a topic. Value: the associated client of the topic.
client_pool: DashMap<Topic, Client>,
client_pool: TokioMutex<HashMap<Topic, Client>>,
}

impl ClientManager {
@@ -88,26 +90,28 @@ impl ClientManager {
broker_endpoints: config.broker_endpoints.clone(),
})?;

debug!("Created a ClientManager");

Ok(Self {
config: config.clone(),
client_factory: client,
client_pool: DashMap::new(),
client_pool: TokioMutex::new(HashMap::new()),
})
}

/// Gets the client associated with the topic. If the client does not exist, a new one will
/// be created and returned.
pub(crate) async fn get_or_insert(&self, topic: &Topic) -> Result<Client> {
match self.client_pool.entry(topic.to_string()) {
DashMapEntry::Occupied(entry) => Ok(entry.get().clone()),
DashMapEntry::Vacant(entry) => {
let topic_client = self.try_create_client(topic).await?;
Ok(entry.insert(topic_client).clone())
}
let mut client_pool = self.client_pool.lock().await;
if let Entry::Vacant(entry) = client_pool.entry(topic.to_string()) {
entry.insert(self.try_create_client(topic).await?);
}
Ok(client_pool[topic].clone())
}

async fn try_create_client(&self, topic: &Topic) -> Result<Client> {
debug!("Try to create client for topic {}", topic);

// Sets to Retry to retry connecting if the Kafka cluster replies with an UnknownTopic error.
// That's because the topic is believed to exist, as the metasrv is expected to create all required topics upon start.
// The retries won't stop until the connection succeeds or a different error is returned.
@@ -121,6 +125,85 @@ impl ClientManager {
})
.map(Arc::new)?;

debug!("Created a client for topic {}", topic);

Ok(Client::new(raw_client, &self.config))
}
}

#[cfg(test)]
mod tests {
use common_test_util::wal::kafka::BROKER_ENDPOINTS_KEY;

use super::*;
use crate::test_util::kafka::topic_builder::{Affix, TopicBuilder};

/// Checks that clients for the given topics have been created.
async fn ensure_topics_exist(topics: &[Topic], client_manager: &ClientManager) {
let client_pool = client_manager.client_pool.lock().await;
let all_exist = topics.iter().all(|topic| client_pool.contains_key(topic));
assert!(all_exist);
}

/// Sends `get_or_insert` requests sequentially to the client manager, and checks that it handles them correctly.
#[tokio::test]
async fn test_sequential() {
let broker_endpoints = std::env::var(BROKER_ENDPOINTS_KEY)
.unwrap_or_default()
.split(',')
.filter(|s| !s.is_empty())
.map(ToString::to_string)
.collect::<Vec<_>>();
if broker_endpoints.is_empty() {
return;
}

let client = ClientBuilder::new(broker_endpoints.clone())
.build()
.await
.unwrap();
let ctrl_client = client.controller_client().unwrap();

let topic_builder = TopicBuilder::default()
.with_prefix(Affix::Fixed("test_sequential".to_string()))
.with_suffix(Affix::TimeNow);

let config = KafkaConfig {
broker_endpoints,
..Default::default()
};
let manager = ClientManager::try_new(&config).await.unwrap();

// Constructs a collection of mock topics.
let num_topics = 256;
let topics = (0..num_topics)
.map(|i| format!("topic_{i}"))
.collect::<Vec<_>>();

// Gets all clients sequentially.
for topic in topics.iter() {
manager.get_or_insert(topic).await.unwrap();
}
ensure_topics_exist(&topics, &manager).await;
}

/// Sends `get_or_insert` requests in parallel to the client manager, and checks that it handles them correctly.
#[tokio::test]
async fn test_parallel() {
// Skips the test if Kafka broker endpoints are not configured through the environment.
let broker_endpoints = std::env::var(BROKER_ENDPOINTS_KEY)
.unwrap_or_default()
.split(',')
.filter(|s| !s.is_empty())
.map(ToString::to_string)
.collect::<Vec<_>>();
if broker_endpoints.is_empty() {
return;
}

let config = KafkaConfig {
broker_endpoints,
..Default::default()
};
let manager = ClientManager::try_new(&config).await.unwrap();

// Constructs a collection of mock topics.
let num_topics = 256;
let topics = (0..num_topics)
.map(|i| format!("topic_{i}"))
.collect::<Vec<_>>();

// Gets all clients in parallel.
let tasks = topics
.iter()
.map(|topic| manager.get_or_insert(topic))
.collect::<Vec<_>>();
futures::future::try_join_all(tasks).await.unwrap();
ensure_topics_exist(&topics, &manager).await;
}
}
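The retry comment in `try_create_client` above relies on rskafka's `UnknownTopicHandling::Retry` mode when the partition client is built. A rough sketch of such a call, assuming a single-partition topic and rskafka's `Client::partition_client(topic, partition, unknown_topic_handling)` signature (the error handling here is illustrative, not the crate's actual code):

use std::sync::Arc;

use rskafka::client::partition::{PartitionClient, UnknownTopicHandling};
use rskafka::client::Client as RsKafkaClient;

// Sketch only: builds a partition client for partition 0 of the topic,
// retrying while the broker still reports the topic as unknown.
async fn partition_client_with_retry(client: &RsKafkaClient, topic: &str) -> Arc<PartitionClient> {
    let raw = client
        .partition_client(topic, 0, UnknownTopicHandling::Retry)
        .await
        .expect("failed to build partition client");
    Arc::new(raw)
}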
55 changes: 36 additions & 19 deletions src/log-store/src/kafka/log_store.rs
@@ -85,7 +85,7 @@ impl LogStore for KafkaLogStore {
/// Appends a batch of entries and returns a response containing a map where the key is a region id
/// and the value is the id of the last successfully written entry of that region.
async fn append_batch(&self, entries: Vec<Self::Entry>) -> Result<AppendBatchResponse> {
debug!("LogStore handles append_batch with entries {:?}", entries);
println!("LogStore handles append_batch with entries {:?}", entries);

if entries.is_empty() {
return Ok(AppendBatchResponse::default());
Expand All @@ -101,24 +101,31 @@ impl LogStore for KafkaLogStore {
}

// Builds records from entries belonging to a region and produces them to the Kafka server.
let region_ids = producers.keys().cloned().collect::<Vec<_>>();

let tasks = producers
.into_values()
.map(|producer| producer.produce(&self.client_manager))
.collect::<Vec<_>>();
let (region_ids, tasks): (Vec<_>, Vec<_>) = producers
.into_iter()
.map(|(id, producer)| (id, producer.produce(&self.client_manager)))
.unzip();
// Each produce operation returns the Kafka offset of the produced record.
// The offsets are then converted to entry ids.
let entry_ids = futures::future::try_join_all(tasks)
.await?
.into_iter()
.map(TryInto::try_into)
.collect::<Result<Vec<_>>>()?;
debug!("The entries are appended at offsets {:?}", entry_ids);
println!("The entries are appended at offsets {:?}", entry_ids);

Ok(AppendBatchResponse {
last_entry_ids: region_ids.into_iter().zip(entry_ids).collect(),
})
let last_entry_ids = region_ids
.into_iter()
.zip(entry_ids)
.collect::<HashMap<_, _>>();
for (region, last_entry_id) in last_entry_ids.iter() {
println!(
"The entries for region {} are appended at offset {}",
region, last_entry_id
);
}

Ok(AppendBatchResponse { last_entry_ids })
}
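The `TryInto` above exists because Kafka offsets are signed 64-bit values while entry ids are unsigned, so the conversion has to be fallible in both directions. A sketch of that shape with a hypothetical `Offset` newtype (not the crate's actual type or error handling):

// Sketch only: a fallible bridge between Kafka's i64 offsets and u64 entry ids.
struct Offset(pub i64);

impl TryFrom<Offset> for u64 {
    type Error = String;

    fn try_from(offset: Offset) -> Result<Self, Self::Error> {
        u64::try_from(offset.0).map_err(|_| format!("negative offset: {}", offset.0))
    }
}

impl TryFrom<u64> for Offset {
    type Error = String;

    fn try_from(entry_id: u64) -> Result<Self, Self::Error> {
        i64::try_from(entry_id)
            .map(Offset)
            .map_err(|_| format!("entry id {entry_id} overflows i64"))
    }
}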

/// Creates a new `EntryStream` to asynchronously generate `Entry`s with entry ids
@@ -148,14 +155,19 @@
.await
.context(GetOffsetSnafu { ns: ns.clone() })?
- 1;
// Reads entries with offsets in the range [start_offset, end_offset).
// Reads entries with offsets in the range [start_offset, end_offset].
let start_offset = Offset::try_from(entry_id)?.0;

println!(
"Start reading entries in range [{}, {}] for ns {}",
start_offset, end_offset, ns
);

// Aborts if there are no new entries.
// FIXME(niebayes): why does this case happen?
if start_offset > end_offset {
warn!(
"No new entries for ns {} in range [{}, {})",
println!(
"No new entries for ns {} in range [{}, {}]",
ns, start_offset, end_offset
);
return Ok(futures_util::stream::empty().boxed());
@@ -166,8 +178,8 @@
.with_max_wait_ms(self.config.produce_record_timeout.as_millis() as i32)
.build();

debug!(
"Built a stream consumer for ns {} to consume entries in range [{}, {})",
println!(
"Built a stream consumer for ns {} to consume entries in range [{}, {}]",
ns, start_offset, end_offset
);

@@ -181,7 +193,7 @@
ns: ns_clone.clone(),
})?;
let record_offset = record.offset;
debug!(
println!(
"Read a record at offset {} for ns {}, high watermark: {}",
record_offset, ns_clone, high_watermark
);
@@ -192,14 +204,19 @@
if let Some(entry) = entries.first()
&& entry.ns.region_id == region_id
{
println!("{} entries are yielded for ns {}", entries.len(), ns_clone);
yield Ok(entries);
} else {
yield Ok(vec![]);
println!(
"{} entries are filtered out for ns {}",
entries.len(),
ns_clone
);
}

// Terminates the stream if the entry with the end offset was read.
if record_offset >= end_offset {
debug!(
println!(
"Stream consumer for ns {} terminates at offset {}",
ns_clone, record_offset
);
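On the read side, the returned stream yields batches of entries per record read from Kafka. A rough sketch of draining it, assuming the module paths and the `LogStore::read(&self, ns, entry_id)` shape used here (names such as `NamespaceImpl` and the exact signatures are assumptions):

use futures_util::StreamExt;
use store_api::logstore::LogStore;

use crate::kafka::log_store::KafkaLogStore;
use crate::kafka::NamespaceImpl;

// Sketch only: reads everything from entry id 0 for one namespace and counts
// the yielded entries, printing each entry's id and payload size.
async fn drain(log_store: &KafkaLogStore, ns: &NamespaceImpl) -> usize {
    let mut stream = log_store.read(ns, 0).await.expect("failed to build entry stream");
    let mut total = 0;
    while let Some(batch) = stream.next().await {
        for entry in batch.expect("failed to read entries") {
            println!("entry {} carries {} bytes", entry.id, entry.data.len());
            total += 1;
        }
    }
    total
}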