Skip to content

Commit

Permalink
test: add tests for manifest change notifications
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Sep 4, 2024
1 parent e5cd860 commit 09e1b1e
Show file tree
Hide file tree
Showing 9 changed files with 324 additions and 32 deletions.
59 changes: 57 additions & 2 deletions src/mito2/src/engine/alter_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ use common_error::status_code::StatusCode;
use common_recordbatch::RecordBatches;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use futures::TryStreamExt;
use log_store::kafka::log_store::KafkaLogStore;
use rstest::rstest;
use rstest_reuse::{self, apply};
use store_api::logstore::provider::Provider;
use store_api::logstore::LogStore;
use store_api::metadata::ColumnMetadata;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{
Expand All @@ -34,9 +40,12 @@ use crate::config::MitoConfig;
use crate::engine::listener::AlterFlushListener;
use crate::engine::MitoEngine;
use crate::test_util::{
build_rows, build_rows_for_key, flush_region, put_rows, rows_schema, CreateRequestBuilder,
TestEnv,
build_rows, build_rows_for_key, flush_region, kafka_log_store_factory,
prepare_test_for_kafka_log_store, put_rows, rows_schema, single_kafka_log_store_factory,
CreateRequestBuilder, LogStoreFactory, TestEnv,
};
use crate::wal::entry_reader::decode_stream;
use crate::wal::raw_entry_reader::flatten_stream;

async fn scan_check_after_alter(engine: &MitoEngine, region_id: RegionId, expected: &str) {
let request = ScanRequest::default();
Expand Down Expand Up @@ -68,6 +77,52 @@ fn add_tag1() -> RegionAlterRequest {
}
}

#[apply(single_kafka_log_store_factory)]
async fn test_alter_region_notification(factory: Option<LogStoreFactory>) {
common_telemetry::init_default_ut_logging();
let Some(factory) = factory else {
return;
};

let mut env =
TestEnv::with_prefix("alter-notification").with_log_store_factory(factory.clone());
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let topic = prepare_test_for_kafka_log_store(&factory).await;
let request = CreateRequestBuilder::new()
.kafka_topic(topic.clone())
.build();
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows_for_key("a", 0, 3, 0),
};
put_rows(&engine, region_id, rows).await;
let request = add_tag1();
engine
.handle_request(region_id, RegionRequest::Alter(request))
.await
.unwrap();

let topic = topic.unwrap();
let log_store = env.log_store().unwrap().into_kafka_log_store();
let provider = Provider::kafka_provider(topic);
let stream = log_store.read(&provider, 0, None).await.unwrap();
let entries = decode_stream(flatten_stream::<KafkaLogStore>(stream, provider.clone()))
.try_collect::<Vec<_>>()
.await
.unwrap();

// Flush sst notification
assert_eq!(entries[1].1.mutations[0].op_type(), api::v1::OpType::Notify);
// Modify table metadata notification
assert_eq!(entries[2].1.mutations[0].op_type(), api::v1::OpType::Notify);
}

#[tokio::test]
async fn test_alter_region() {
common_telemetry::init_default_ut_logging();
Expand Down
72 changes: 71 additions & 1 deletion src/mito2/src/engine/compaction_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,12 @@ use api::v1::{ColumnSchema, Rows};
use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
use datatypes::prelude::ScalarVector;
use datatypes::vectors::TimestampMillisecondVector;
use futures::TryStreamExt;
use log_store::kafka::log_store::KafkaLogStore;
use rstest::rstest;
use rstest_reuse::{self, apply};
use store_api::logstore::provider::Provider;
use store_api::logstore::LogStore;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{
RegionCompactRequest, RegionDeleteRequest, RegionFlushRequest, RegionRequest,
Expand All @@ -30,8 +36,12 @@ use crate::config::MitoConfig;
use crate::engine::listener::CompactionListener;
use crate::engine::MitoEngine;
use crate::test_util::{
build_rows_for_key, column_metadata_to_column_schema, put_rows, CreateRequestBuilder, TestEnv,
build_rows_for_key, column_metadata_to_column_schema, kafka_log_store_factory,
prepare_test_for_kafka_log_store, put_rows, single_kafka_log_store_factory,
CreateRequestBuilder, LogStoreFactory, TestEnv,
};
use crate::wal::entry_reader::decode_stream;
use crate::wal::raw_entry_reader::flatten_stream;

async fn put_and_flush(
engine: &MitoEngine,
Expand Down Expand Up @@ -105,6 +115,66 @@ async fn collect_stream_ts(stream: SendableRecordBatchStream) -> Vec<i64> {
res
}

#[apply(single_kafka_log_store_factory)]
async fn test_compaction_region_notification(factory: Option<LogStoreFactory>) {
common_telemetry::init_default_ut_logging();
let Some(factory) = factory else {
return;
};

let mut env =
TestEnv::with_prefix("compaction_notification").with_log_store_factory(factory.clone());
let engine = env.create_engine(MitoConfig::default()).await;
let topic = prepare_test_for_kafka_log_store(&factory).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new()
.kafka_topic(topic.clone())
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.max_active_window_runs", "1")
.insert_option("compaction.twcs.max_inactive_window_runs", "1")
.build();

let column_schemas = request
.column_metadatas
.iter()
.map(column_metadata_to_column_schema)
.collect::<Vec<_>>();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Flush 5 SSTs for compaction.
put_and_flush(&engine, region_id, &column_schemas, 0..10).await;
put_and_flush(&engine, region_id, &column_schemas, 10..20).await;
put_and_flush(&engine, region_id, &column_schemas, 20..30).await;
delete_and_flush(&engine, region_id, &column_schemas, 15..30).await;
put_and_flush(&engine, region_id, &column_schemas, 15..25).await;

let result = engine
.handle_request(
region_id,
RegionRequest::Compact(RegionCompactRequest::default()),
)
.await
.unwrap();
assert_eq!(result.affected_rows, 0);

let topic = topic.unwrap();
let log_store = env.log_store().unwrap().into_kafka_log_store();
let provider = Provider::kafka_provider(topic);
let stream = log_store.read(&provider, 0, None).await.unwrap();
let entries = decode_stream(flatten_stream::<KafkaLogStore>(stream, provider.clone()))
.try_collect::<Vec<_>>()
.await
.unwrap();

let notifications = entries
.into_iter()
.filter(|(_, entry)| matches!(entry.mutations[0].op_type(), api::v1::OpType::Notify))
.count();
assert_eq!(notifications, 6);
}

#[tokio::test]
async fn test_compaction_region() {
common_telemetry::init_default_ut_logging();
Expand Down
70 changes: 69 additions & 1 deletion src/mito2/src/engine/edit_region_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@ use std::sync::{Arc, Mutex};
use std::time::Duration;

use common_time::util::current_time_millis;
use futures::TryStreamExt;
use log_store::kafka::log_store::KafkaLogStore;
use object_store::ObjectStore;
use rstest::rstest;
use rstest_reuse::{self, apply};
use store_api::logstore::provider::Provider;
use store_api::logstore::LogStore;
use store_api::region_engine::RegionEngine;
use store_api::region_request::RegionRequest;
use store_api::storage::RegionId;
Expand All @@ -29,7 +35,69 @@ use crate::engine::MitoEngine;
use crate::manifest::action::RegionEdit;
use crate::region::MitoRegionRef;
use crate::sst::file::{FileId, FileMeta};
use crate::test_util::{CreateRequestBuilder, TestEnv};
use crate::test_util::{
kafka_log_store_factory, prepare_test_for_kafka_log_store, single_kafka_log_store_factory,
CreateRequestBuilder, LogStoreFactory, TestEnv,
};
use crate::wal::entry_reader::decode_stream;
use crate::wal::raw_entry_reader::flatten_stream;

#[apply(single_kafka_log_store_factory)]
async fn test_edit_region_notification(factory: Option<LogStoreFactory>) {
common_telemetry::init_default_ut_logging();
let Some(factory) = factory else {
return;
};

let mut env = TestEnv::with_prefix("edit-notification").with_log_store_factory(factory.clone());
let engine = env.create_engine(MitoConfig::default()).await;
let topic = prepare_test_for_kafka_log_store(&factory).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new()
.kafka_topic(topic.clone())
.build();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
let region = engine.get_region(region_id).unwrap();
let file_id = FileId::random();
// Simulating the ingestion of an SST file.
env.get_object_store()
.unwrap()
.write(
&format!("{}/{}.parquet", region.region_dir(), file_id),
b"x".as_slice(),
)
.await
.unwrap();
let edit = RegionEdit {
files_to_add: vec![FileMeta {
region_id: region.region_id,
file_id,
level: 0,
..Default::default()
}],
files_to_remove: vec![],
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,
};
engine.edit_region(region.region_id, edit).await.unwrap();
let topic = topic.unwrap();
let log_store = env.log_store().unwrap().into_kafka_log_store();
let provider = Provider::kafka_provider(topic);
let stream = log_store.read(&provider, 0, None).await.unwrap();
let entries = decode_stream(flatten_stream::<KafkaLogStore>(stream, provider.clone()))
.try_collect::<Vec<_>>()
.await
.unwrap();
let notifications = entries
.into_iter()
.filter(|(_, entry)| matches!(entry.mutations[0].op_type(), api::v1::OpType::Notify))
.count();
assert_eq!(notifications, 1);
}

#[tokio::test]
async fn test_edit_region_schedule_compaction() {
Expand Down
56 changes: 53 additions & 3 deletions src/mito2/src/engine/flush_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use common_time::util::current_time_millis;
use common_wal::options::WAL_OPTIONS_KEY;
use rstest::rstest;
use rstest_reuse::{self, apply};
use store_api::logstore::LogStore;
use store_api::region_engine::RegionEngine;
use store_api::region_request::RegionRequest;
use store_api::storage::{RegionId, ScanRequest};
Expand All @@ -33,8 +34,8 @@ use crate::engine::listener::{FlushListener, StallListener};
use crate::test_util::{
build_rows, build_rows_for_key, flush_region, kafka_log_store_factory,
multiple_log_store_factories, prepare_test_for_kafka_log_store, put_rows,
raft_engine_log_store_factory, reopen_region, rows_schema, CreateRequestBuilder,
LogStoreFactory, MockWriteBufferManager, TestEnv,
raft_engine_log_store_factory, reopen_region, rows_schema, single_kafka_log_store_factory,
CreateRequestBuilder, LogStoreFactory, MockWriteBufferManager, TestEnv,
};
use crate::time_provider::TimeProvider;
use crate::worker::MAX_INITIAL_CHECK_DELAY_SECS;
Expand Down Expand Up @@ -247,7 +248,7 @@ async fn test_flush_reopen_region(factory: Option<LogStoreFactory>) {
return;
};

let mut env = TestEnv::new().with_log_store_factory(factory.clone());
let mut env = TestEnv::with_prefix("flush-reopen").with_log_store_factory(factory.clone());
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
Expand Down Expand Up @@ -305,6 +306,55 @@ async fn test_flush_reopen_region(factory: Option<LogStoreFactory>) {
assert_eq!(5, version_data.committed_sequence);
}

#[apply(single_kafka_log_store_factory)]
async fn test_flush_notification(factory: Option<LogStoreFactory>) {
use futures::TryStreamExt;
use log_store::kafka::log_store::KafkaLogStore;
use store_api::logstore::provider::Provider;

use crate::wal::entry_reader::decode_stream;
use crate::wal::raw_entry_reader::flatten_stream;

common_telemetry::init_default_ut_logging();
let Some(factory) = factory else {
return;
};

let mut env =
TestEnv::with_prefix("flush-notification").with_log_store_factory(factory.clone());
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let topic = prepare_test_for_kafka_log_store(&factory).await;
let request = CreateRequestBuilder::new()
.kafka_topic(topic.clone())
.build();
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows_for_key("a", 0, 3, 0),
};
put_rows(&engine, region_id, rows).await;
flush_region(&engine, region_id, None).await;

let topic = topic.unwrap();
let log_store = env.log_store().unwrap().into_kafka_log_store();
let provider = Provider::kafka_provider(topic);
let stream = log_store.read(&provider, 0, None).await.unwrap();
let entries = decode_stream(flatten_stream::<KafkaLogStore>(stream, provider.clone()))
.try_collect::<Vec<_>>()
.await
.unwrap();
let notifications = entries
.into_iter()
.filter(|(_, entry)| matches!(entry.mutations[0].op_type(), api::v1::OpType::Notify))
.count();
assert_eq!(notifications, 1);
}

#[derive(Debug)]
pub(crate) struct MockTimeProvider {
now: AtomicI64,
Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/manifest_notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl ManifestNotifier {
pub(crate) fn push_notification(&mut self, version: ManifestVersion) {
self.wal_entry.mutations.push(Mutation {
op_type: OpType::Notify.into(),
sequence: self.next_entry_id,
sequence: self.next_sequence,
rows: None,
manifest_notification: Some(api::v1::ManifestNotification { version }),
});
Expand Down Expand Up @@ -99,6 +99,6 @@ impl ManifestNotifier {

pub(crate) fn finish(&mut self) {
self.version_control
.set_sequence_and_entry_id(self.next_sequence - 1, self.next_sequence - 1);
.set_sequence_and_entry_id(self.next_sequence - 1, self.next_entry_id - 1);
}
}
Loading

0 comments on commit 09e1b1e

Please sign in to comment.