Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(mito): Stores and recovers flushed sequence #2355

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 37 additions & 2 deletions src/mito2/src/engine/flush_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use store_api::storage::{RegionId, ScanRequest};
use crate::config::MitoConfig;
use crate::engine::listener::{FlushListener, StallListener};
use crate::test_util::{
build_rows, build_rows_for_key, flush_region, put_rows, rows_schema, CreateRequestBuilder,
MockWriteBufferManager, TestEnv,
build_rows, build_rows_for_key, flush_region, put_rows, reopen_region, rows_schema,
CreateRequestBuilder, MockWriteBufferManager, TestEnv,
};

#[tokio::test]
Expand Down Expand Up @@ -221,3 +221,38 @@ async fn test_flush_empty() {
++";
assert_eq!(expected, batches.pretty_print().unwrap());
}

#[tokio::test]
async fn test_flush_reopen_region() {
evenyag marked this conversation as resolved.
Show resolved Hide resolved
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let region_dir = request.region_dir.clone();

let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();

let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows_for_key("a", 0, 3, 0),
};
put_rows(&engine, region_id, rows).await;

flush_region(&engine, region_id).await;
let check_region = || {
let region = engine.get_region(region_id).unwrap();
let version_data = region.version_control.current();
assert_eq!(1, version_data.last_entry_id);
assert_eq!(3, version_data.committed_sequence);
assert_eq!(1, version_data.version.flushed_entry_id);
};
check_region();

reopen_region(&engine, region_id, region_dir).await;
check_region();
}
23 changes: 3 additions & 20 deletions src/mito2/src/engine/open_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ use std::collections::HashMap;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionCloseRequest, RegionOpenRequest, RegionRequest};
use store_api::region_request::{RegionOpenRequest, RegionRequest};
use store_api::storage::RegionId;

use crate::config::MitoConfig;
use crate::test_util::{CreateRequestBuilder, TestEnv};
use crate::test_util::{reopen_region, CreateRequestBuilder, TestEnv};

#[tokio::test]
async fn test_engine_open_empty() {
Expand Down Expand Up @@ -84,23 +84,6 @@ async fn test_engine_reopen_region() {
.await
.unwrap();

// Close the region.
engine
.handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
.await
.unwrap();

// Open the region again.
engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir,
options: HashMap::default(),
}),
)
.await
.unwrap();
reopen_region(&engine, region_id, region_dir).await;
assert!(engine.is_region_exists(region_id));
}
7 changes: 5 additions & 2 deletions src/mito2/src/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,11 @@ impl RegionFlushTask {
}

/// Converts the flush task into a background job.
///
/// We must call this in the region worker.
fn into_flush_job(mut self, region: &MitoRegionRef) -> Job {
// Get a version of this region before creating a job so we
// always have a consistent memtable list.
// Get a version of this region before creating a job to get current
// wal entry id, sequence and immutable memtables.
let version_data = region.version_control.current();

Box::pin(async move {
Expand All @@ -232,6 +234,7 @@ impl RegionFlushTask {
file_metas,
// The last entry has been flushed.
flushed_entry_id: version_data.last_entry_id,
flushed_sequence: version_data.committed_sequence,
memtables_to_remove,
senders: std::mem::take(&mut self.senders),
file_purger: self.file_purger.clone(),
Expand Down
12 changes: 11 additions & 1 deletion src/mito2/src/manifest/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::manifest::ManifestVersion;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
use store_api::storage::{RegionId, SequenceNumber};

use crate::error::{RegionMetadataNotFoundSnafu, Result, SerdeJsonSnafu, Utf8Snafu};
use crate::sst::file::{FileId, FileMeta};
Expand Down Expand Up @@ -49,6 +49,7 @@ pub struct RegionEdit {
pub files_to_remove: Vec<FileMeta>,
pub compaction_time_window: Option<i64>,
pub flushed_entry_id: Option<EntryId>,
pub flushed_sequence: Option<SequenceNumber>,
}

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
Expand All @@ -65,6 +66,8 @@ pub struct RegionManifest {
pub files: HashMap<FileId, FileMeta>,
/// Last WAL entry id of flushed data.
pub flushed_entry_id: EntryId,
/// Last sequence of flushed data.
pub flushed_sequence: SequenceNumber,
/// Current manifest version.
pub manifest_version: ManifestVersion,
}
Expand All @@ -74,6 +77,7 @@ pub struct RegionManifestBuilder {
metadata: Option<RegionMetadataRef>,
files: HashMap<FileId, FileMeta>,
flushed_entry_id: EntryId,
flushed_sequence: SequenceNumber,
manifest_version: ManifestVersion,
}

Expand All @@ -86,6 +90,7 @@ impl RegionManifestBuilder {
files: s.files,
flushed_entry_id: s.flushed_entry_id,
manifest_version: s.manifest_version,
flushed_sequence: s.flushed_sequence,
}
} else {
Default::default()
Expand All @@ -108,6 +113,9 @@ impl RegionManifestBuilder {
if let Some(flushed_entry_id) = edit.flushed_entry_id {
self.flushed_entry_id = self.flushed_entry_id.max(flushed_entry_id);
}
if let Some(flushed_sequence) = edit.flushed_sequence {
self.flushed_sequence = self.flushed_sequence.max(flushed_sequence);
}
}

/// Check if the builder keeps a [RegionMetadata](crate::metadata::RegionMetadata).
Expand All @@ -121,6 +129,7 @@ impl RegionManifestBuilder {
metadata,
files: self.files,
flushed_entry_id: self.flushed_entry_id,
flushed_sequence: self.flushed_sequence,
manifest_version: self.manifest_version,
})
}
Expand Down Expand Up @@ -216,6 +225,7 @@ mod tests {

let region_edit = r#"{
"flushed_entry_id":10,
"flushed_sequence":10,
"compaction_time_window":null,
"files_to_add":[
{"region_id":4402341478400,"file_id":"4b220a70-2b03-4641-9687-b65d94641208","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1,"file_size":100}
Expand Down
4 changes: 3 additions & 1 deletion src/mito2/src/manifest/tests/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ fn nop_action() -> RegionMetaActionList {
files_to_remove: vec![],
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,
})])
}

Expand Down Expand Up @@ -149,7 +150,7 @@ async fn manager_with_checkpoint_distance_1() {
.await
.unwrap();
let raw_json = std::str::from_utf8(&raw_bytes).unwrap();
let expected_json = "{\"size\":769,\"version\":9,\"checksum\":null,\"extend_metadata\":{}}";
let expected_json = "{\"size\":790,\"version\":9,\"checksum\":null,\"extend_metadata\":{}}";
assert_eq!(expected_json, raw_json);

// reopen the manager
Expand All @@ -176,6 +177,7 @@ async fn checkpoint_with_different_compression_types() {
files_to_remove: vec![],
compaction_time_window: None,
flushed_entry_id: None,
flushed_sequence: None,
})]);
actions.push(action);
}
Expand Down
5 changes: 4 additions & 1 deletion src/mito2/src/region/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ impl RegionOpener {
let version = VersionBuilder::new(metadata, mutable)
.add_files(file_purger.clone(), manifest.files.values().cloned())
.flushed_entry_id(manifest.flushed_entry_id)
.flushed_sequence(manifest.flushed_sequence)
.build();
let flushed_entry_id = version.flushed_entry_id;
let version_control = Arc::new(VersionControl::new(version));
Expand All @@ -177,7 +178,9 @@ async fn replay_memtable<S: LogStore>(
version_control: &VersionControlRef,
) -> Result<()> {
let mut rows_replayed = 0;
let mut last_entry_id = EntryId::MIN;
// Last entry id should start from flushed entry id since there might be no
// data in the WAL.
let mut last_entry_id = flushed_entry_id;
let mut region_write_ctx = RegionWriteCtx::new(region_id, version_control);
let mut wal_stream = wal.scan(region_id, flushed_entry_id)?;
while let Some(res) = wal_stream.next().await {
Expand Down
30 changes: 26 additions & 4 deletions src/mito2/src/region/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,14 @@ pub(crate) struct VersionControl {
impl VersionControl {
/// Returns a new [VersionControl] with specific `version`.
pub(crate) fn new(version: Version) -> VersionControl {
// Initialize sequence and entry id from flushed sequence and entry id.
let (flushed_sequence, flushed_entry_id) =
(version.flushed_sequence, version.flushed_entry_id);
VersionControl {
data: RwLock::new(VersionControlData {
version: Arc::new(version),
committed_sequence: 0,
last_entry_id: 0,
committed_sequence: flushed_sequence,
last_entry_id: flushed_entry_id,
is_dropped: false,
}),
}
Expand Down Expand Up @@ -145,8 +148,12 @@ pub(crate) struct VersionControlData {
/// Latest version.
pub(crate) version: VersionRef,
/// Sequence number of last committed data.
///
/// Starts from 1 (zero means no data).
pub(crate) committed_sequence: SequenceNumber,
/// Last WAL entry Id.
///
/// Starts from 1 (zero means no data).
pub(crate) last_entry_id: EntryId,
/// Marker of whether this region is dropped/dropping
pub(crate) is_dropped: bool,
Expand All @@ -168,6 +175,8 @@ pub(crate) struct Version {
pub(crate) ssts: SstVersionRef,
/// Inclusive max WAL entry id of flushed data.
pub(crate) flushed_entry_id: EntryId,
/// Inclusive max sequence of flushed data.
pub(crate) flushed_sequence: SequenceNumber,
// TODO(yingwen): RegionOptions.
}

Expand All @@ -179,6 +188,7 @@ pub(crate) struct VersionBuilder {
memtables: MemtableVersionRef,
ssts: SstVersionRef,
flushed_entry_id: EntryId,
flushed_sequence: SequenceNumber,
}

impl VersionBuilder {
Expand All @@ -189,6 +199,7 @@ impl VersionBuilder {
memtables: Arc::new(MemtableVersion::new(mutable)),
ssts: Arc::new(SstVersion::new()),
flushed_entry_id: 0,
flushed_sequence: 0,
}
}

Expand All @@ -199,6 +210,7 @@ impl VersionBuilder {
memtables: version.memtables.clone(),
ssts: version.ssts.clone(),
flushed_entry_id: version.flushed_entry_id,
flushed_sequence: version.flushed_sequence,
}
}

Expand All @@ -220,10 +232,19 @@ impl VersionBuilder {
self
}

/// Sets flushed sequence.
pub(crate) fn flushed_sequence(mut self, sequence: SequenceNumber) -> Self {
self.flushed_sequence = sequence;
self
}

/// Apply edit to the builder.
pub(crate) fn apply_edit(mut self, edit: RegionEdit, file_purger: FilePurgerRef) -> Self {
if let Some(flushed_entry_id) = edit.flushed_entry_id {
self.flushed_entry_id = self.flushed_entry_id.max(flushed_entry_id);
if let Some(entry_id) = edit.flushed_entry_id {
self.flushed_entry_id = self.flushed_entry_id.max(entry_id);
}
if let Some(sequence) = edit.flushed_sequence {
self.flushed_sequence = self.flushed_sequence.max(sequence);
}
if !edit.files_to_add.is_empty() || !edit.files_to_remove.is_empty() {
let mut ssts = (*self.ssts).clone();
Expand Down Expand Up @@ -265,6 +286,7 @@ impl VersionBuilder {
memtables: self.memtables,
ssts: self.ssts,
flushed_entry_id: self.flushed_entry_id,
flushed_sequence: self.flushed_sequence,
}
}
}
4 changes: 3 additions & 1 deletion src/mito2/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use store_api::region_request::{
RegionAlterRequest, RegionCloseRequest, RegionCompactRequest, RegionCreateRequest,
RegionDropRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest,
};
use store_api::storage::{CompactionStrategy, RegionId};
use store_api::storage::{CompactionStrategy, RegionId, SequenceNumber};
use tokio::sync::oneshot::{self, Receiver, Sender};

use crate::config::DEFAULT_WRITE_BUFFER_SIZE;
Expand Down Expand Up @@ -525,6 +525,8 @@ pub(crate) struct FlushFinished {
pub(crate) file_metas: Vec<FileMeta>,
/// Entry id of flushed data.
pub(crate) flushed_entry_id: EntryId,
/// Sequence of flushed data.
pub(crate) flushed_sequence: SequenceNumber,
/// Id of memtables to remove.
pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>,
/// Flush result senders.
Expand Down
25 changes: 24 additions & 1 deletion src/mito2/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ use object_store::ObjectStore;
use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
use store_api::region_engine::RegionEngine;
use store_api::region_request::{
RegionCreateRequest, RegionDeleteRequest, RegionFlushRequest, RegionPutRequest, RegionRequest,
RegionCloseRequest, RegionCreateRequest, RegionDeleteRequest, RegionFlushRequest,
RegionOpenRequest, RegionPutRequest, RegionRequest,
};
use store_api::storage::RegionId;

Expand Down Expand Up @@ -566,3 +567,25 @@ pub async fn flush_region(engine: &MitoEngine, region_id: RegionId) {
};
assert_eq!(0, rows);
}

/// Closes the region `region_id` and then opens it again from `region_dir`.
///
/// Both requests go through `engine.handle_request`; the function panics if
/// either of them returns an error.
pub async fn reopen_region(engine: &MitoEngine, region_id: RegionId, region_dir: String) {
    // First shut the region down.
    let close = RegionRequest::Close(RegionCloseRequest {});
    engine.handle_request(region_id, close).await.unwrap();

    // Then open it again with an empty engine name and no options.
    let open = RegionRequest::Open(RegionOpenRequest {
        engine: String::new(),
        region_dir,
        options: HashMap::default(),
    });
    engine.handle_request(region_id, open).await.unwrap();
}
1 change: 1 addition & 0 deletions src/mito2/src/worker/handle_compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
files_to_remove: std::mem::take(&mut request.compacted_files),
compaction_time_window: None, // TODO(hl): update window maybe
flushed_entry_id: None,
flushed_sequence: None,
};
let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
if let Err(e) = region.manifest_manager.update(action_list).await {
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/worker/handle_flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
files_to_remove: Vec::new(),
compaction_time_window: None,
flushed_entry_id: Some(request.flushed_entry_id),
flushed_sequence: Some(request.flushed_sequence),
};
let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
if let Err(e) = region.manifest_manager.update(action_list).await {
Expand Down
Loading