Skip to content

Commit

Permalink
chore: apply suggestions from CR
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Jun 3, 2024
1 parent 39777fc commit 89f8c5b
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 35 deletions.
21 changes: 11 additions & 10 deletions src/mito2/src/engine/basic_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ async fn test_write_to_region() {
#[apply(multiple_log_store_factories)]

async fn test_region_replay(factory: Option<LogStoreFactory>) {
use common_wal::options::{KafkaWalOptions, WalOptions};

common_telemetry::init_default_ut_logging();
let Some(factory) = factory else {
return;
Expand All @@ -99,14 +101,10 @@ async fn test_region_replay(factory: Option<LogStoreFactory>) {
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
let mut request = CreateRequestBuilder::new().build();
let topic_and_options = prepare_test_for_kafka_log_store(&factory).await;
if let Some((_, wal_options)) = &topic_and_options {
request.options.insert(
WAL_OPTIONS_KEY.to_string(),
serde_json::to_string(&wal_options).unwrap(),
);
};
let topic = prepare_test_for_kafka_log_store(&factory).await;
let request = CreateRequestBuilder::new()
.kafka_topic(topic.clone())
.build();

let region_dir = request.region_dir.clone();

Expand All @@ -131,10 +129,13 @@ async fn test_region_replay(factory: Option<LogStoreFactory>) {
let engine = env.reopen_engine(engine, MitoConfig::default()).await;

let mut options = HashMap::new();
if let Some((_, wal_options)) = &topic_and_options {
if let Some(topic) = &topic {
options.insert(
WAL_OPTIONS_KEY.to_string(),
serde_json::to_string(&wal_options).unwrap(),
serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
topic: topic.to_string(),
}))
.unwrap(),
);
};

Expand Down
21 changes: 11 additions & 10 deletions src/mito2/src/engine/flush_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ async fn test_flush_empty() {
async fn test_flush_reopen_region(factory: Option<LogStoreFactory>) {
use std::collections::HashMap;

use common_wal::options::{KafkaWalOptions, WalOptions};

common_telemetry::init_default_ut_logging();
let Some(factory) = factory else {
return;
Expand All @@ -249,14 +251,10 @@ async fn test_flush_reopen_region(factory: Option<LogStoreFactory>) {
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
let mut request = CreateRequestBuilder::new().build();
let topic_and_options = prepare_test_for_kafka_log_store(&factory).await;
if let Some((_, wal_options)) = &topic_and_options {
request.options.insert(
WAL_OPTIONS_KEY.to_string(),
serde_json::to_string(&wal_options).unwrap(),
);
};
let topic = prepare_test_for_kafka_log_store(&factory).await;
let request = CreateRequestBuilder::new()
.kafka_topic(topic.clone())
.build();
let region_dir = request.region_dir.clone();

let column_schemas = rows_schema(&request);
Expand All @@ -283,10 +281,13 @@ async fn test_flush_reopen_region(factory: Option<LogStoreFactory>) {
check_region();

let mut options = HashMap::new();
if let Some((_, wal_options)) = &topic_and_options {
if let Some(topic) = &topic {
options.insert(
WAL_OPTIONS_KEY.to_string(),
serde_json::to_string(&wal_options).unwrap(),
serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
topic: topic.to_string(),
}))
.unwrap(),
);
};
reopen_region(&engine, region_id, region_dir, true, options).await;
Expand Down
33 changes: 23 additions & 10 deletions src/mito2/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use common_base::readable_size::ReadableSize;
use common_datasource::compression::CompressionType;
use common_telemetry::warn;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use common_wal::options::{KafkaWalOptions, WalOptions};
use common_wal::options::{KafkaWalOptions, WalOptions, WAL_OPTIONS_KEY};
use datatypes::arrow::array::{TimestampMillisecondArray, UInt64Array, UInt8Array};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
Expand Down Expand Up @@ -119,18 +119,13 @@ impl RaftEngineLogStoreFactory {
}
}

pub(crate) async fn prepare_test_for_kafka_log_store(
factory: &LogStoreFactory,
) -> Option<(String, WalOptions)> {
pub(crate) async fn prepare_test_for_kafka_log_store(factory: &LogStoreFactory) -> Option<String> {
if let LogStoreFactory::Kafka(factory) = factory {
let topic = uuid::Uuid::new_v4().to_string();
let wal_options = WalOptions::Kafka(KafkaWalOptions {
topic: topic.to_string(),
});
let client = factory.client().await;
append_noop_record(&client, &topic).await;

Some((topic, wal_options))
Some(topic)
} else {
None
}
Expand Down Expand Up @@ -608,6 +603,8 @@ pub struct CreateRequestBuilder {
all_not_null: bool,
engine: String,
ts_type: ConcreteDataType,
/// kafka topic name
kafka_topic: Option<String>,
}

impl Default for CreateRequestBuilder {
Expand All @@ -621,6 +618,7 @@ impl Default for CreateRequestBuilder {
all_not_null: false,
engine: MITO_ENGINE_NAME.to_string(),
ts_type: ConcreteDataType::timestamp_millisecond_datatype(),
kafka_topic: None,
}
}
}
Expand Down Expand Up @@ -673,6 +671,12 @@ impl CreateRequestBuilder {
self
}

#[must_use]
pub fn kafka_topic(mut self, topic: Option<String>) -> Self {
self.kafka_topic = topic;
self
}

pub fn build(&self) -> RegionCreateRequest {
let mut column_id = 0;
let mut column_metadatas = Vec::with_capacity(self.tag_num + self.field_num + 1);
Expand Down Expand Up @@ -713,12 +717,21 @@ impl CreateRequestBuilder {
semantic_type: SemanticType::Timestamp,
column_id,
});

let mut options = self.options.clone();
if let Some(topic) = &self.kafka_topic {
let wal_options = WalOptions::Kafka(KafkaWalOptions {
topic: topic.to_string(),
});
options.insert(
WAL_OPTIONS_KEY.to_string(),
serde_json::to_string(&wal_options).unwrap(),
);
}
RegionCreateRequest {
engine: self.engine.to_string(),
column_metadatas,
primary_key: self.primary_key.clone().unwrap_or(primary_key),
options: self.options.clone(),
options,
region_dir: self.region_dir.clone(),
}
}
Expand Down
5 changes: 0 additions & 5 deletions tests-integration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,3 @@ pub mod test_util;
pub mod standalone;
#[cfg(test)]
mod tests;

#[cfg(test)]
// allowed because https://docs.rs/rstest_reuse/0.5.0/rstest_reuse/#use-rstest_reuse-at-the-top-of-your-crate
#[allow(clippy::single_component_path_imports)]
use rstest_reuse;

0 comments on commit 89f8c5b

Please sign in to comment.