Skip to content

Commit

Permalink
test: run test_flush_reopen_region and test_region_replay with `KafkaLogStore` (#4083)
Browse files Browse the repository at this point in the history

* feat: add `LogStoreFactory` to `TestEnv`

* feat: add `multiple_log_store_factories` template

* test: run `test_flush_reopen_region` and `test_region_replay` with `KafkaLogStore`

* chore: move deps to workspace

* chore: apply suggestions from CR
  • Loading branch information
WenyXu authored Jun 4, 2024
1 parent 51e2b6e commit b3a4362
Show file tree
Hide file tree
Showing 9 changed files with 422 additions and 116 deletions.
31 changes: 22 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ reqwest = { version = "0.12", default-features = false, features = [
"multipart",
] }
rskafka = "0.5"
rstest = "0.21"
rstest_reuse = "0.7"
rust_decimal = "1.33"
schemars = "0.8"
serde = { version = "1.0", features = ["derive"] }
Expand Down
14 changes: 14 additions & 0 deletions src/log-store/src/test_util/log_store_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
// limitations under the License.

use std::path::Path;
use std::time::Duration;

use common_base::readable_size::ReadableSize;
use common_wal::config::kafka::DatanodeKafkaConfig;
use common_wal::config::raft_engine::RaftEngineConfig;

use crate::kafka::log_store::KafkaLogStore;
use crate::raft_engine::log_store::RaftEngineLogStore;

/// Create a write log for the provided path, used for test.
Expand All @@ -28,3 +31,14 @@ pub async fn create_tmp_local_file_log_store<P: AsRef<Path>>(path: P) -> RaftEng
};
RaftEngineLogStore::try_new(path, cfg).await.unwrap()
}

/// Creates a [KafkaLogStore] backed by the given Kafka broker endpoints, for test use.
pub async fn create_kafka_log_store(broker_endpoints: Vec<String>) -> KafkaLogStore {
    // Use a small linger so test writes are flushed to the broker promptly.
    let config = DatanodeKafkaConfig {
        broker_endpoints,
        linger: Duration::from_millis(1),
        ..Default::default()
    };
    KafkaLogStore::try_new(&config).await.unwrap()
}
9 changes: 8 additions & 1 deletion src/mito2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ license.workspace = true

[features]
default = []
test = ["common-test-util", "log-store"]
test = ["common-test-util", "log-store", "rstest", "rstest_reuse", "rskafka"]

[lints]
workspace = true
Expand Down Expand Up @@ -37,6 +37,7 @@ datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
datatypes.workspace = true
dotenv.workspace = true
futures.workspace = true
humantime-serde.workspace = true
index.workspace = true
Expand All @@ -54,6 +55,9 @@ prost.workspace = true
puffin.workspace = true
rand.workspace = true
regex = "1.5"
rskafka = { workspace = true, optional = true }
rstest = { workspace = true, optional = true }
rstest_reuse = { workspace = true, optional = true }
serde.workspace = true
serde_json.workspace = true
serde_with.workspace = true
Expand All @@ -73,6 +77,9 @@ common-test-util.workspace = true
criterion = "0.4"
log-store.workspace = true
object-store = { workspace = true, features = ["services-memory"] }
rskafka.workspace = true
rstest.workspace = true
rstest_reuse.workspace = true
toml.workspace = true

[[bench]]
Expand Down
38 changes: 32 additions & 6 deletions src/mito2/src/engine/basic_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@ use common_base::readable_size::ReadableSize;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_recordbatch::RecordBatches;
use common_wal::options::WAL_OPTIONS_KEY;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use rstest::rstest;
use rstest_reuse::{self, apply};
use store_api::metadata::ColumnMetadata;
use store_api::region_request::{RegionCreateRequest, RegionOpenRequest, RegionPutRequest};
use store_api::storage::RegionId;
Expand All @@ -32,7 +35,9 @@ use super::*;
use crate::region::version::VersionControlData;
use crate::test_util::{
build_delete_rows_for_key, build_rows, build_rows_for_key, delete_rows, delete_rows_schema,
flush_region, put_rows, reopen_region, rows_schema, CreateRequestBuilder, TestEnv,
flush_region, kafka_log_store_factory, multiple_log_store_factories,
prepare_test_for_kafka_log_store, put_rows, raft_engine_log_store_factory, reopen_region,
rows_schema, CreateRequestBuilder, LogStoreFactory, TestEnv,
};

#[tokio::test]
Expand Down Expand Up @@ -83,14 +88,24 @@ async fn test_write_to_region() {
put_rows(&engine, region_id, rows).await;
}

#[tokio::test]
async fn test_region_replay() {
#[apply(multiple_log_store_factories)]

async fn test_region_replay(factory: Option<LogStoreFactory>) {
use common_wal::options::{KafkaWalOptions, WalOptions};

common_telemetry::init_default_ut_logging();
let mut env = TestEnv::with_prefix("region-replay");
let Some(factory) = factory else {
return;
};
let mut env = TestEnv::with_prefix("region-replay").with_log_store_factory(factory.clone());
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let topic = prepare_test_for_kafka_log_store(&factory).await;
let request = CreateRequestBuilder::new()
.kafka_topic(topic.clone())
.build();

let region_dir = request.region_dir.clone();

let column_schemas = rows_schema(&request);
Expand All @@ -113,13 +128,24 @@ async fn test_region_replay() {

let engine = env.reopen_engine(engine, MitoConfig::default()).await;

let mut options = HashMap::new();
if let Some(topic) = &topic {
options.insert(
WAL_OPTIONS_KEY.to_string(),
serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
topic: topic.to_string(),
}))
.unwrap(),
);
};

let result = engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir,
options: HashMap::default(),
options,
skip_wal_replay: false,
}),
)
Expand Down
41 changes: 34 additions & 7 deletions src/mito2/src/engine/flush_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,20 @@ use std::time::Duration;
use api::v1::Rows;
use common_recordbatch::RecordBatches;
use common_time::util::current_time_millis;
use common_wal::options::WAL_OPTIONS_KEY;
use rstest::rstest;
use rstest_reuse::{self, apply};
use store_api::region_engine::RegionEngine;
use store_api::region_request::RegionRequest;
use store_api::storage::{RegionId, ScanRequest};

use crate::config::MitoConfig;
use crate::engine::listener::{FlushListener, StallListener};
use crate::test_util::{
build_rows, build_rows_for_key, flush_region, put_rows, reopen_region, rows_schema,
CreateRequestBuilder, MockWriteBufferManager, TestEnv,
build_rows, build_rows_for_key, flush_region, kafka_log_store_factory,
multiple_log_store_factories, prepare_test_for_kafka_log_store, put_rows,
raft_engine_log_store_factory, reopen_region, rows_schema, CreateRequestBuilder,
LogStoreFactory, MockWriteBufferManager, TestEnv,
};
use crate::time_provider::TimeProvider;
use crate::worker::MAX_INITIAL_CHECK_DELAY_SECS;
Expand Down Expand Up @@ -231,13 +236,25 @@ async fn test_flush_empty() {
assert_eq!(expected, batches.pretty_print().unwrap());
}

#[tokio::test]
async fn test_flush_reopen_region() {
let mut env = TestEnv::new();
#[apply(multiple_log_store_factories)]
async fn test_flush_reopen_region(factory: Option<LogStoreFactory>) {
use std::collections::HashMap;

use common_wal::options::{KafkaWalOptions, WalOptions};

common_telemetry::init_default_ut_logging();
let Some(factory) = factory else {
return;
};

let mut env = TestEnv::new().with_log_store_factory(factory.clone());
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let topic = prepare_test_for_kafka_log_store(&factory).await;
let request = CreateRequestBuilder::new()
.kafka_topic(topic.clone())
.build();
let region_dir = request.region_dir.clone();

let column_schemas = rows_schema(&request);
Expand All @@ -263,7 +280,17 @@ async fn test_flush_reopen_region() {
};
check_region();

reopen_region(&engine, region_id, region_dir, true, Default::default()).await;
let mut options = HashMap::new();
if let Some(topic) = &topic {
options.insert(
WAL_OPTIONS_KEY.to_string(),
serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
topic: topic.to_string(),
}))
.unwrap(),
);
};
reopen_region(&engine, region_id, region_dir, true, options).await;
check_region();

// Puts again.
Expand Down
Loading

0 comments on commit b3a4362

Please sign in to comment.