Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mito): Implement skeleton for alteration #2343

Merged
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
70a9541
feat: impl handle_alter wip
evenyag Sep 6, 2023
116e6bb
refactor: move send_result to worker.rs
evenyag Sep 6, 2023
3cbc626
feat: skeleton for handle_alter_request
evenyag Sep 6, 2023
ec0cd0c
feat: write requests should wait for alteration
evenyag Sep 6, 2023
3636bc3
feat: define alter request
evenyag Sep 6, 2023
053d270
chore: no warnings
evenyag Sep 6, 2023
38ccd4f
fix: remove memtables after flush
evenyag Sep 6, 2023
5729c5c
chore: update comments and impl add_write_request_to_pending
evenyag Sep 7, 2023
1ba00f6
feat: add schema version to RegionMetadata
evenyag Sep 8, 2023
73dc472
feat: impl alter_schema/can_alter_directly
evenyag Sep 8, 2023
b1c6ade
chore: use send_result
evenyag Sep 8, 2023
08ed3e1
test: pull next_batch again
evenyag Sep 8, 2023
fa4c6c8
feat: convert pb AlterRequest to RegionAlterRequest
evenyag Sep 8, 2023
8dc8fb6
feat: validate alter request
evenyag Sep 8, 2023
14eae74
feat: validate request and alter metadata
evenyag Sep 8, 2023
aa5ceba
feat: allow none location
evenyag Sep 9, 2023
3f66182
test: test alter
evenyag Sep 9, 2023
78b35b7
fix: recover files and flushed entry id from manifest
evenyag Sep 9, 2023
a305c49
test: test alter
evenyag Sep 9, 2023
9fde091
chore: change comments and variables
evenyag Sep 9, 2023
1e68db8
feat: Merge branch 'migrate-region-server' into feat/mito2-alter
evenyag Sep 11, 2023
be6edec
chore: fix compiler errors
evenyag Sep 11, 2023
306c1de
feat: add is_empty() to MemtableVersion
evenyag Sep 11, 2023
271a370
test: fix metadata alter test
evenyag Sep 11, 2023
8fccdf0
fix: Compaction picker doesn't notify waiters if it returns None
evenyag Sep 11, 2023
6f7302a
chore: address CR comments
evenyag Sep 11, 2023
5f57ef2
test: add tests for alter request
evenyag Sep 11, 2023
78b4058
refactor: use send_result
evenyag Sep 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

//! Mito region engine.

#[cfg(test)]
mod alter_test;
#[cfg(test)]
mod close_test;
#[cfg(test)]
Expand Down
111 changes: 111 additions & 0 deletions src/mito2/src/engine/alter_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use api::v1::{Rows, SemanticType};
use common_recordbatch::RecordBatches;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use store_api::metadata::ColumnMetadata;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{
AddColumn, AddColumnLocation, AlterKind, RegionAlterRequest, RegionOpenRequest, RegionRequest,
};
use store_api::storage::{RegionId, ScanRequest};

use crate::config::MitoConfig;
use crate::engine::MitoEngine;
use crate::test_util::{build_rows, put_rows, rows_schema, CreateRequestBuilder, TestEnv};

/// Scans `region_id` with a default [`ScanRequest`] and checks the result.
///
/// Asserts that the scanner sees no memtables and exactly one file, then
/// asserts that the pretty-printed record batches equal `expected`.
async fn scan_check_after_alter(engine: &MitoEngine, region_id: RegionId, expected: &str) {
    let scanner = engine.scan(region_id, ScanRequest::default()).unwrap();

    // The scanner is expected to read from a single file and no memtable.
    assert_eq!(0, scanner.num_memtables());
    assert_eq!(1, scanner.num_files());

    // Collects the whole stream and compares the rendered table.
    let batches = RecordBatches::try_collect(scanner.scan().await.unwrap())
        .await
        .unwrap();
    let printed = batches.pretty_print().unwrap();
    assert_eq!(expected, printed);
}

/// Creates a region, writes rows, adds a `tag_1` tag column at the first
/// position, and checks the scan output both right after the alteration and
/// after the region is reopened.
#[tokio::test]
async fn test_alter_region() {
    common_telemetry::init_default_ut_logging();

    let mut env = TestEnv::new();
    let engine = env.create_engine(MitoConfig::default()).await;

    let region_id = RegionId::new(1, 1);
    let create_request = CreateRequestBuilder::new().build();

    // Grabs what is needed from the create request before it is moved into the engine.
    let region_dir = create_request.region_dir.clone();
    let column_schemas = rows_schema(&create_request);
    engine
        .handle_request(region_id, RegionRequest::Create(create_request))
        .await
        .unwrap();

    // Writes rows built by `build_rows(0, 3)` using the schema of the create request.
    put_rows(
        &engine,
        region_id,
        Rows {
            schema: column_schemas,
            rows: build_rows(0, 3),
        },
    )
    .await;

    // The string tag column `tag_1` to add in front of the existing columns.
    let new_tag = AddColumn {
        column_metadata: ColumnMetadata {
            column_schema: ColumnSchema::new("tag_1", ConcreteDataType::string_datatype(), true),
            semantic_type: SemanticType::Tag,
            column_id: 3,
        },
        location: Some(AddColumnLocation::First),
    };
    let alter_request = RegionAlterRequest {
        schema_version: 0,
        kind: AlterKind::AddColumns {
            columns: vec![new_tag],
        },
    };
    engine
        .handle_request(region_id, RegionRequest::Alter(alter_request))
        .await
        .unwrap();

    // `tag_1` is the first column and is empty for the rows written before the alteration.
    let expected = "\
+-------+-------+---------+---------------------+
| tag_1 | tag_0 | field_0 | ts |
+-------+-------+---------+---------------------+
| | 0 | 0.0 | 1970-01-01T00:00:00 |
| | 1 | 1.0 | 1970-01-01T00:00:01 |
| | 2 | 2.0 | 1970-01-01T00:00:02 |
+-------+-------+---------+---------------------+";
    scan_check_after_alter(&engine, region_id, expected).await;

    // Reopens the region and expects the same scan result.
    let engine = env.reopen_engine(engine, MitoConfig::default()).await;
    let open_request = RegionOpenRequest {
        engine: String::new(),
        region_dir,
        options: HashMap::default(),
    };
    engine
        .handle_request(region_id, RegionRequest::Open(open_request))
        .await
        .unwrap();
    scan_check_after_alter(&engine, region_id, expected).await;
}
6 changes: 5 additions & 1 deletion src/mito2/src/engine/flush_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ async fn test_manual_flush() {

let request = ScanRequest::default();
let scanner = engine.scan(region_id, request).unwrap();
assert_eq!(0, scanner.num_memtables());
assert_eq!(1, scanner.num_files());
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
Expand Down Expand Up @@ -98,7 +99,7 @@ async fn test_flush_engine() {

write_buffer_manager.set_should_flush(true);

// Writes and triggers flush.
// Writes to the mutable memtable and triggers flush.
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows_for_key("b", 0, 2, 0),
Expand All @@ -110,6 +111,7 @@ async fn test_flush_engine() {

let request = ScanRequest::default();
let scanner = engine.scan(region_id, request).unwrap();
assert_eq!(1, scanner.num_memtables());
assert_eq!(1, scanner.num_files());
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
Expand Down Expand Up @@ -174,6 +176,7 @@ async fn test_write_stall() {

let request = ScanRequest::default();
let scanner = engine.scan(region_id, request).unwrap();
assert_eq!(1, scanner.num_memtables());
assert_eq!(1, scanner.num_files());
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
Expand Down Expand Up @@ -209,6 +212,7 @@ async fn test_flush_empty() {

let request = ScanRequest::default();
let scanner = engine.scan(region_id, request).unwrap();
assert_eq!(0, scanner.num_memtables());
assert_eq!(0, scanner.num_files());
let stream = scanner.scan().await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
Expand Down
7 changes: 7 additions & 0 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,12 @@ pub enum Error {
reason: String,
location: Location,
},

#[snafu(display("{}, location: {}", source, location))]
InvalidRegionRequest {
source: store_api::metadata::MetadataError,
location: Location,
},
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
Expand Down Expand Up @@ -537,6 +543,7 @@ impl ErrorExt for Error {
RejectWrite { .. } => StatusCode::StorageUnavailable,
CompactRegion { source, .. } => source.status_code(),
CompatReader { .. } => StatusCode::Unexpected,
InvalidRegionRequest { source, .. } => source.status_code(),
}
}

Expand Down
76 changes: 53 additions & 23 deletions src/mito2/src/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@ use crate::read::Source;
use crate::region::version::{VersionControlData, VersionRef};
use crate::region::MitoRegionRef;
use crate::request::{
BackgroundNotify, FlushFailed, FlushFinished, SenderDdlRequest, WorkerRequest,
BackgroundNotify, FlushFailed, FlushFinished, SenderDdlRequest, SenderWriteRequest,
WorkerRequest,
};
use crate::schedule::scheduler::{Job, SchedulerRef};
use crate::sst::file::{FileId, FileMeta};
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::parquet::WriteOptions;
use crate::worker::send_result;

/// Global write buffer (memtable) manager.
///
Expand Down Expand Up @@ -165,7 +167,8 @@ pub enum FlushReason {
EngineFull,
/// Manual flush.
Manual,
// TODO(yingwen): Alter.
/// Flush to alter table.
Alter,
}

/// Task to flush a region.
Expand Down Expand Up @@ -357,13 +360,13 @@ impl FlushScheduler {
// Checks whether we can flush the region now.
if flush_status.flushing {
// There is already a flush job running.
flush_status.push_task(task);
flush_status.merge_task(task);
return Ok(());
}

// If there are pending tasks, then we should push it to pending list.
if flush_status.pending_task.is_some() {
flush_status.push_task(task);
flush_status.merge_task(task);
return Ok(());
}

Expand Down Expand Up @@ -393,19 +396,19 @@ impl FlushScheduler {
pub(crate) fn on_flush_success(
&mut self,
region_id: RegionId,
) -> Option<Vec<SenderDdlRequest>> {
) -> Option<(Vec<SenderDdlRequest>, Vec<SenderWriteRequest>)> {
let Some(flush_status) = self.region_status.get_mut(&region_id) else {
return None;
};

// This region doesn't have running flush job.
flush_status.flushing = false;

let pending_ddls = if flush_status.pending_task.is_none() {
let pending_requests = if flush_status.pending_task.is_none() {
// The region doesn't have any pending flush task.
// Safety: The flush status exists.
let flush_status = self.region_status.remove(&region_id).unwrap();
Some(flush_status.pending_ddls)
Some((flush_status.pending_ddls, flush_status.pending_writes))
} else {
None
};
Expand All @@ -415,7 +418,7 @@ impl FlushScheduler {
error!(e; "Flush of region {} is successful, but failed to schedule next flush", region_id);
}

pending_ddls
pending_requests
}

/// Notifies the scheduler that the flush job is finished.
Expand Down Expand Up @@ -460,17 +463,31 @@ impl FlushScheduler {

/// Add ddl request to pending queue.
///
/// Returns error if region doesn't request flush.
pub(crate) fn add_ddl_request_to_pending(
&mut self,
request: SenderDdlRequest,
) -> Result<(), SenderDdlRequest> {
if let Some(status) = self.region_status.get_mut(&request.region_id) {
status.pending_ddls.push(request);
return Ok(());
}
/// # Panics
/// Panics if region didn't request flush.
pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) {
let status = self.region_status.get_mut(&request.region_id).unwrap();
status.pending_ddls.push(request);
}

Err(request)
/// Appends a write request to the `pending_writes` queue of its region.
///
/// The region is looked up in `region_status` by the region id carried in
/// the request.
///
/// # Panics
/// Panics if region didn't request flush.
pub(crate) fn add_write_request_to_pending(&mut self, request: SenderWriteRequest) {
    // The key borrow ends once the lookup returns, so `request` can be moved into the queue.
    self.region_status
        .get_mut(&request.request.region_id)
        .unwrap()
        .pending_writes
        .push(request);
}

/// Returns true if the region has pending DDLs.
///
/// Returns false when the region has no entry in `region_status` or when
/// its `pending_ddls` queue is empty.
pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
    match self.region_status.get(&region_id) {
        Some(status) => !status.pending_ddls.is_empty(),
        None => false,
    }
}

/// Schedules a new flush task when the scheduler can submit next task.
Expand Down Expand Up @@ -508,6 +525,8 @@ struct FlushStatus {
pending_task: Option<RegionFlushTask>,
/// Pending ddl requests.
pending_ddls: Vec<SenderDdlRequest>,
/// Requests waiting to write after altering the region.
pending_writes: Vec<SenderWriteRequest>,
}

impl FlushStatus {
Expand All @@ -517,10 +536,12 @@ impl FlushStatus {
flushing: false,
pending_task: None,
pending_ddls: Vec::new(),
pending_writes: Vec::new(),
}
}

fn push_task(&mut self, task: RegionFlushTask) {
/// Merges the task to pending task.
fn merge_task(&mut self, task: RegionFlushTask) {
if let Some(pending) = &mut self.pending_task {
pending.merge(task);
} else {
Expand All @@ -533,11 +554,20 @@ impl FlushStatus {
task.on_failure(err.clone());
}
for ddl in self.pending_ddls {
if let Some(sender) = ddl.sender {
let _ = sender.send(Err(err.clone()).context(FlushRegionSnafu {
send_result(
ddl.sender,
Err(err.clone()).context(FlushRegionSnafu {
region_id: self.region.region_id,
}));
}
}),
);
}
for write_req in self.pending_writes {
send_result(
write_req.sender,
Err(err.clone()).context(FlushRegionSnafu {
region_id: self.region.region_id,
}),
);
}
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/mito2/src/manifest/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,9 @@ mod tests {
{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}
],
"primary_key":[1],
"region_id":5299989648942}
"region_id":5299989648942,
"schema_version":0
}
}"#;
let _ = serde_json::from_str::<RegionChange>(region_change).unwrap();

Expand Down
2 changes: 1 addition & 1 deletion src/mito2/src/manifest/tests/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ async fn manager_with_checkpoint_distance_1() {
.await
.unwrap();
let raw_json = std::str::from_utf8(&raw_bytes).unwrap();
let expected_json = "{\"size\":750,\"version\":9,\"checksum\":null,\"extend_metadata\":{}}";
let expected_json = "{\"size\":769,\"version\":9,\"checksum\":null,\"extend_metadata\":{}}";
assert_eq!(expected_json, raw_json);

// reopen the manager
Expand Down
14 changes: 12 additions & 2 deletions src/mito2/src/memtable/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ use std::sync::Arc;

use smallvec::SmallVec;

use crate::memtable::MemtableRef;
use crate::memtable::{MemtableId, MemtableRef};

/// A version of current memtables in a region.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub(crate) struct MemtableVersion {
/// Mutable memtable.
pub(crate) mutable: MemtableRef,
Expand Down Expand Up @@ -85,6 +85,16 @@ impl MemtableVersion {
})
}

/// Removes memtables by ids from immutable memtables.
///
/// Immutable memtables whose id is not listed in `ids` are kept in their
/// original order. Ids that match no immutable memtable are ignored.
pub(crate) fn remove_memtables(&mut self, ids: &[MemtableId]) {
    // Filters in place instead of cloning every kept memtable into a new collection.
    self.immutables.retain(|mem| !ids.contains(&mem.id()));
}

/// Returns the memory usage of the mutable memtable.
pub(crate) fn mutable_usage(&self) -> usize {
self.mutable.stats().estimated_bytes
Expand Down
Loading