Skip to content

Commit

Permalink
feat(mito): Implement skeleton for alteration (GreptimeTeam#2343)
Browse files Browse the repository at this point in the history
* feat: impl handle_alter wip

* refactor: move send_result to worker.rs

* feat: skeleton for handle_alter_request

* feat: write requests should wait for alteration

* feat: define alter request

* chore: no warnings

* fix: remove memtables after flush

* chore: update comments and impl add_write_request_to_pending

* feat: add schema version to RegionMetadata

* feat: impl alter_schema/can_alter_directly

* chore: use send_result

* test: pull next_batch again

* feat: convert pb AlterRequest to RegionAlterRequest

* feat: validate alter request

* feat: validate request and alter metadata

* feat: allow none location

* test: test alter

* fix: recover files and flushed entry id from manifest

* test: test alter

* chore: change comments and variables

* chore: fix compiler errors

* feat: add is_empty() to MemtableVersion

* test: fix metadata alter test

* fix: Compaction picker doesn't notify waiters if it returns None

* chore: address CR comments

* test: add tests for alter request

* refactor: use send_result
  • Loading branch information
evenyag authored and WenyXu committed Sep 12, 2023
1 parent 18914ff commit 64c17d0
Show file tree
Hide file tree
Showing 22 changed files with 1,239 additions and 150 deletions.
20 changes: 7 additions & 13 deletions src/mito2/src/compaction/twcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use crate::request::{BackgroundNotify, CompactionFailed, CompactionFinished, Wor
use crate::sst::file::{FileHandle, FileId, FileMeta};
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::version::LevelMeta;
use crate::worker::send_result;

const MAX_PARALLEL_COMPACTION: usize = 8;

Expand Down Expand Up @@ -122,7 +123,7 @@ impl Picker for TwcsPicker {
ttl,
compaction_time_window,
request_sender,
waiter: waiters,
waiter,
file_purger,
} = req;

Expand Down Expand Up @@ -155,6 +156,8 @@ impl Picker for TwcsPicker {
let outputs = self.build_output(&windows, active_window, time_window_size);

if outputs.is_empty() && expired_ssts.is_empty() {
// Nothing to compact.
send_result(waiter, Ok(Output::AffectedRows(0)));
return None;
}
let task = TwcsCompactionTask {
Expand All @@ -166,7 +169,7 @@ impl Picker for TwcsPicker {
sst_write_buffer_size: ReadableSize::mb(4),
compaction_time_window: None,
request_sender,
sender: waiters,
sender: waiter,
file_purger,
};
Some(Box::new(task))
Expand Down Expand Up @@ -267,7 +270,8 @@ impl TwcsCompactionTask {
compacted_inputs.extend(output.inputs.iter().map(FileHandle::meta));

info!(
"Compaction output [{}]-> {}",
"Compaction region {} output [{}]-> {}",
self.region_id,
output
.inputs
.iter()
Expand Down Expand Up @@ -315,15 +319,6 @@ impl TwcsCompactionTask {
Ok((output, compacted))
}

/// Handles compaction success.
fn early_success(&mut self) {
if let Some(sender) = self.sender.take() {
let _ = sender.send(Ok(Output::AffectedRows(0)).context(CompactRegionSnafu {
region_id: self.region_id,
}));
}
}

/// Handles compaction failure, notifies all waiters.
fn on_failure(&mut self, err: Arc<error::Error>) {
if let Some(sender) = self.sender.take() {
Expand Down Expand Up @@ -353,7 +348,6 @@ impl CompactionTask for TwcsCompactionTask {
"Compacted SST files, input: {:?}, output: {:?}, window: {:?}",
added, deleted, self.compaction_time_window
);
self.early_success();

BackgroundNotify::CompactionFinished(CompactionFinished {
region_id: self.region_id,
Expand Down
2 changes: 2 additions & 0 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

//! Mito region engine.
#[cfg(test)]
mod alter_test;
#[cfg(test)]
mod close_test;
#[cfg(test)]
Expand Down
111 changes: 111 additions & 0 deletions src/mito2/src/engine/alter_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use api::v1::{Rows, SemanticType};
use common_recordbatch::RecordBatches;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use store_api::metadata::ColumnMetadata;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{
AddColumn, AddColumnLocation, AlterKind, RegionAlterRequest, RegionOpenRequest, RegionRequest,
};
use store_api::storage::{RegionId, ScanRequest};

use crate::config::MitoConfig;
use crate::engine::MitoEngine;
use crate::test_util::{build_rows, put_rows, rows_schema, CreateRequestBuilder, TestEnv};

async fn scan_check_after_alter(engine: &MitoEngine, region_id: RegionId, expected: &str) {
let request = ScanRequest::default();
let scanner = engine.scan(region_id, request).unwrap();
assert_eq!(0, scanner.num_memtables());
assert_eq!(1, scanner.num_files());
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected, batches.pretty_print().unwrap());
}

#[tokio::test]
async fn test_alter_region() {
common_telemetry::init_default_ut_logging();

let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;

let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();

let column_schemas = rows_schema(&request);
let region_dir = request.region_dir.clone();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();

let rows = Rows {
schema: column_schemas,
rows: build_rows(0, 3),
};
put_rows(&engine, region_id, rows).await;

let request = RegionAlterRequest {
schema_version: 0,
kind: AlterKind::AddColumns {
columns: vec![AddColumn {
column_metadata: ColumnMetadata {
column_schema: ColumnSchema::new(
"tag_1",
ConcreteDataType::string_datatype(),
true,
),
semantic_type: SemanticType::Tag,
column_id: 3,
},
location: Some(AddColumnLocation::First),
}],
},
};
engine
.handle_request(region_id, RegionRequest::Alter(request))
.await
.unwrap();

let expected = "\
+-------+-------+---------+---------------------+
| tag_1 | tag_0 | field_0 | ts |
+-------+-------+---------+---------------------+
| | 0 | 0.0 | 1970-01-01T00:00:00 |
| | 1 | 1.0 | 1970-01-01T00:00:01 |
| | 2 | 2.0 | 1970-01-01T00:00:02 |
+-------+-------+---------+---------------------+";
scan_check_after_alter(&engine, region_id, expected).await;

// Reopen region.
let engine = env.reopen_engine(engine, MitoConfig::default()).await;
engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
region_dir,
options: HashMap::default(),
}),
)
.await
.unwrap();
scan_check_after_alter(&engine, region_id, expected).await;
}
13 changes: 6 additions & 7 deletions src/mito2/src/engine/compaction_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::ops::Range;

use api::v1::{ColumnSchema, Rows};
Expand All @@ -29,7 +28,7 @@ use store_api::storage::{RegionId, ScanRequest};
use crate::config::MitoConfig;
use crate::engine::MitoEngine;
use crate::test_util::{
build_rows, column_metadata_to_column_schema, put_rows, CreateRequestBuilder, TestEnv,
build_rows_for_key, column_metadata_to_column_schema, put_rows, CreateRequestBuilder, TestEnv,
};

async fn put_and_flush(
Expand All @@ -40,7 +39,7 @@ async fn put_and_flush(
) {
let rows = Rows {
schema: column_schemas.to_vec(),
rows: build_rows(rows.start, rows.end),
rows: build_rows_for_key("a", rows.start, rows.end, 0),
};
put_rows(engine, region_id, rows).await;

Expand All @@ -63,7 +62,7 @@ async fn delete_and_flush(
let row_cnt = rows.len();
let rows = Rows {
schema: column_schemas.to_vec(),
rows: build_rows(rows.start, rows.end),
rows: build_rows_for_key("a", rows.start, rows.end, 0),
};

let deleted = engine
Expand All @@ -89,8 +88,8 @@ async fn delete_and_flush(
assert_eq!(0, rows);
}

async fn collect_stream_ts(stream: SendableRecordBatchStream) -> HashSet<i64> {
let mut res = HashSet::new();
async fn collect_stream_ts(stream: SendableRecordBatchStream) -> Vec<i64> {
let mut res = Vec::new();
let batches = RecordBatches::try_collect(stream).await.unwrap();
for batch in batches {
let ts_col = batch
Expand Down Expand Up @@ -139,5 +138,5 @@ async fn test_compaction_region() {
.unwrap();

let vec = collect_stream_ts(stream).await;
assert_eq!((0..25).map(|v| v * 1000).collect::<HashSet<_>>(), vec);
assert_eq!((0..25).map(|v| v * 1000).collect::<Vec<_>>(), vec);
}
6 changes: 5 additions & 1 deletion src/mito2/src/engine/flush_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ async fn test_manual_flush() {

let request = ScanRequest::default();
let scanner = engine.scan(region_id, request).unwrap();
assert_eq!(0, scanner.num_memtables());
assert_eq!(1, scanner.num_files());
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
Expand Down Expand Up @@ -98,7 +99,7 @@ async fn test_flush_engine() {

write_buffer_manager.set_should_flush(true);

// Writes and triggers flush.
// Writes to the mutable memtable and triggers flush.
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows_for_key("b", 0, 2, 0),
Expand All @@ -110,6 +111,7 @@ async fn test_flush_engine() {

let request = ScanRequest::default();
let scanner = engine.scan(region_id, request).unwrap();
assert_eq!(1, scanner.num_memtables());
assert_eq!(1, scanner.num_files());
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
Expand Down Expand Up @@ -174,6 +176,7 @@ async fn test_write_stall() {

let request = ScanRequest::default();
let scanner = engine.scan(region_id, request).unwrap();
assert_eq!(1, scanner.num_memtables());
assert_eq!(1, scanner.num_files());
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
Expand Down Expand Up @@ -209,6 +212,7 @@ async fn test_flush_empty() {

let request = ScanRequest::default();
let scanner = engine.scan(region_id, request).unwrap();
assert_eq!(0, scanner.num_memtables());
assert_eq!(0, scanner.num_files());
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
Expand Down
7 changes: 7 additions & 0 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,12 @@ pub enum Error {
reason: String,
location: Location,
},

#[snafu(display("{}, location: {}", source, location))]
InvalidRegionRequest {
source: store_api::metadata::MetadataError,
location: Location,
},
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down Expand Up @@ -537,6 +543,7 @@ impl ErrorExt for Error {
RejectWrite { .. } => StatusCode::StorageUnavailable,
CompactRegion { source, .. } => source.status_code(),
CompatReader { .. } => StatusCode::Unexpected,
InvalidRegionRequest { source, .. } => source.status_code(),
}
}

Expand Down
Loading

0 comments on commit 64c17d0

Please sign in to comment.