Skip to content

Commit

Permalink
feat(mito): add skip_wal_replay option to OpenRegionRequest (#2955)
Browse files Browse the repository at this point in the history
* feat(mito): add skip_replay_wal option to OpenRegionRequest

* test: add tests for skip replay wal

* chore: rename `skip_replay_wal` to `skip_wal_replay`
  • Loading branch information
WenyXu authored Dec 20, 2023
1 parent 5f8c175 commit 97c3755
Show file tree
Hide file tree
Showing 12 changed files with 124 additions and 4 deletions.
1 change: 1 addition & 0 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ impl DatanodeBuilder {
engine: engine.clone(),
region_dir,
options,
skip_wal_replay: false,
}),
)
.await?;
Expand Down
1 change: 1 addition & 0 deletions src/datanode/src/heartbeat/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ impl RegionHeartbeatResponseHandler {
engine: region_ident.engine,
region_dir: region_dir(&region_storage_path, region_id),
options: region_options,
skip_wal_replay: false,
});
let result = region_server.handle_request(region_id, request).await;

Expand Down
2 changes: 2 additions & 0 deletions src/file-engine/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ mod tests {
engine: "file".to_string(),
region_dir,
options: HashMap::default(),
skip_wal_replay: false,
};

let region = FileRegion::open(region_id, request, &object_store)
Expand Down Expand Up @@ -211,6 +212,7 @@ mod tests {
engine: "file".to_string(),
region_dir,
options: HashMap::default(),
skip_wal_replay: false,
};
let err = FileRegion::open(region_id, request, &object_store)
.await
Expand Down
2 changes: 2 additions & 0 deletions src/mito2/src/engine/alter_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ async fn test_alter_region() {
engine: String::new(),
region_dir,
options: HashMap::default(),
skip_wal_replay: false,
}),
)
.await
Expand Down Expand Up @@ -201,6 +202,7 @@ async fn test_put_after_alter() {
engine: String::new(),
region_dir,
options: HashMap::default(),
skip_wal_replay: false,
}),
)
.await
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/engine/basic_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ async fn test_region_replay() {
engine: String::new(),
region_dir,
options: HashMap::default(),
skip_wal_replay: false,
}),
)
.await
Expand Down
98 changes: 96 additions & 2 deletions src/mito2/src/engine/open_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@ use std::time::Duration;
use api::v1::Rows;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_recordbatch::RecordBatches;
use store_api::region_engine::{RegionEngine, RegionRole};
use store_api::region_request::{
RegionCloseRequest, RegionOpenRequest, RegionPutRequest, RegionRequest,
};
use store_api::storage::RegionId;
use store_api::storage::{RegionId, ScanRequest};

use crate::config::MitoConfig;
use crate::test_util::{
build_rows, put_rows, reopen_region, rows_schema, CreateRequestBuilder, TestEnv,
build_rows, flush_region, put_rows, reopen_region, rows_schema, CreateRequestBuilder, TestEnv,
};

#[tokio::test]
Expand All @@ -42,6 +43,7 @@ async fn test_engine_open_empty() {
engine: String::new(),
region_dir: "empty".to_string(),
options: HashMap::default(),
skip_wal_replay: false,
}),
)
.await
Expand Down Expand Up @@ -73,6 +75,7 @@ async fn test_engine_open_existing() {
engine: String::new(),
region_dir,
options: HashMap::default(),
skip_wal_replay: false,
}),
)
.await
Expand Down Expand Up @@ -161,6 +164,7 @@ async fn test_engine_region_open_with_options() {
engine: String::new(),
region_dir,
options: HashMap::from([("ttl".to_string(), "4d".to_string())]),
skip_wal_replay: false,
}),
)
.await
Expand Down Expand Up @@ -205,6 +209,7 @@ async fn test_engine_region_open_with_custom_store() {
engine: String::new(),
region_dir,
options: HashMap::from([("storage".to_string(), "Gcs".to_string())]),
skip_wal_replay: false,
}),
)
.await
Expand All @@ -225,3 +230,92 @@ async fn test_engine_region_open_with_custom_store() {
.await
.unwrap());
}

#[tokio::test]
async fn test_open_region_skip_wal_replay() {
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let region_dir = request.region_dir.clone();

let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();

let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows(0, 3),
};
put_rows(&engine, region_id, rows).await;

flush_region(&engine, region_id, None).await;

let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows(3, 5),
};
put_rows(&engine, region_id, rows).await;

let engine = env.reopen_engine(engine, MitoConfig::default()).await;
// Skip the WAL replay .
engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir: region_dir.to_string(),
options: Default::default(),
skip_wal_replay: true,
}),
)
.await
.unwrap();

let request = ScanRequest::default();
let stream = engine.handle_query(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());

// Replay the WAL.
let engine = env.reopen_engine(engine, MitoConfig::default()).await;
// Open the region again with options.
engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir,
options: Default::default(),
skip_wal_replay: false,
}),
)
.await
.unwrap();

let request = ScanRequest::default();
let stream = engine.handle_query(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
| 3 | 3.0 | 1970-01-01T00:00:03 |
| 4 | 4.0 | 1970-01-01T00:00:04 |
+-------+---------+---------------------+";
assert_eq!(expected, batches.pretty_print().unwrap());
}
1 change: 1 addition & 0 deletions src/mito2/src/engine/parallel_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ async fn scan_in_parallel(
engine: String::new(),
region_dir: region_dir.to_string(),
options: HashMap::default(),
skip_wal_replay: false,
}),
)
.await
Expand Down
2 changes: 2 additions & 0 deletions src/mito2/src/engine/truncate_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ async fn test_engine_truncate_reopen() {
engine: String::new(),
region_dir,
options: HashMap::default(),
skip_wal_replay: false,
}),
)
.await
Expand Down Expand Up @@ -353,6 +354,7 @@ async fn test_engine_truncate_during_flush() {
engine: String::new(),
region_dir,
options: HashMap::default(),
skip_wal_replay: false,
}),
)
.await
Expand Down
14 changes: 13 additions & 1 deletion src/mito2/src/region/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ pub(crate) struct RegionOpener {
scheduler: SchedulerRef,
options: HashMap<String, String>,
cache_manager: Option<CacheManagerRef>,
skip_wal_replay: bool,
}

impl RegionOpener {
Expand All @@ -74,6 +75,7 @@ impl RegionOpener {
scheduler,
options: HashMap::new(),
cache_manager: None,
skip_wal_replay: false,
}
}

Expand All @@ -95,6 +97,12 @@ impl RegionOpener {
self
}

/// Sets the `skip_wal_replay`.
pub(crate) fn skip_wal_replay(mut self, skip: bool) -> Self {
self.skip_wal_replay = skip;
self
}

/// Writes region manifest and creates a new region if it does not exist.
/// Opens the region if it already exists.
///
Expand Down Expand Up @@ -235,7 +243,11 @@ impl RegionOpener {
.build();
let flushed_entry_id = version.flushed_entry_id;
let version_control = Arc::new(VersionControl::new(version));
replay_memtable(wal, region_id, flushed_entry_id, &version_control).await?;
if !self.skip_wal_replay {
replay_memtable(wal, region_id, flushed_entry_id, &version_control).await?;
} else {
info!("Skip the WAL replay for region: {}", region_id);
}

let region = MitoRegion {
region_id: self.region_id,
Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use api::helper::ColumnDataTypeWrapper;
use api::v1::value::ValueData;
use api::v1::{OpType, Row, Rows, SemanticType};
use common_datasource::compression::CompressionType;
use common_query::Output;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use datatypes::arrow::array::{TimestampMillisecondArray, UInt64Array, UInt8Array};
use datatypes::prelude::ConcreteDataType;
Expand Down Expand Up @@ -697,6 +696,7 @@ pub async fn reopen_region(
engine: String::new(),
region_dir,
options: HashMap::default(),
skip_wal_replay: false,
}),
)
.await
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/worker/handle_open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
self.scheduler.clone(),
)
.options(request.options)
.skip_wal_replay(request.skip_wal_replay)
.cache(Some(self.cache_manager.clone()))
.open(&self.config, &self.wal)
.await?;
Expand Down
3 changes: 3 additions & 0 deletions src/store-api/src/region_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ impl RegionRequest {
engine: open.engine,
region_dir,
options: open.options,
skip_wal_replay: false,
}),
)])
}
Expand Down Expand Up @@ -197,6 +198,8 @@ pub struct RegionOpenRequest {
pub region_dir: String,
/// Options of the opened region.
pub options: HashMap<String, String>,
/// To skip replaying the WAL.
pub skip_wal_replay: bool,
}

/// Close region request.
Expand Down

0 comments on commit 97c3755

Please sign in to comment.