From e5663a075f09b9fc47c10b7987738c5743e2be5e Mon Sep 17 00:00:00 2001 From: Yingwen Date: Fri, 4 Aug 2023 11:53:02 +0900 Subject: [PATCH] feat(mito): preparation to implementing write (#2085) * refactor: move request mod * feat: add mutation * feat: add handle_write mod * feat: one mutation at a time * feat: handle write requests * feat: validate schema * refactor: move schema check to write request * feat: add convert value * feat: fill default values * chore: remove comments * feat: remove code * feat: remove code * feat: buf requests * style: fix clippy * refactor: rename check functions * chore: fix compile error * chore: Revert "feat: remove code" This reverts commit 651659754041643aeeb763a0f4562fb6dd313f1a. * chore: Revert "feat: remove code" This reverts commit 5f2b790a01d6bb0ec625e0c90e7a288e1c385b69. * chore: upgrade greptime-proto * chore: Update comment Co-authored-by: dennis zhuang --------- Co-authored-by: dennis zhuang --- Cargo.lock | 19 ++- src/mito2/Cargo.toml | 2 + src/mito2/src/engine.rs | 18 ++- src/mito2/src/error.rs | 42 +++++- src/mito2/src/lib.rs | 1 + src/mito2/src/metadata.rs | 8 +- src/mito2/src/proto_util.rs | 188 +++++++++++++++++++++++ src/mito2/src/region.rs | 9 +- src/mito2/src/region/version.rs | 20 +-- src/mito2/src/request.rs | 218 +++++++++++++++++++++++++-- src/mito2/src/worker.rs | 31 ++-- src/mito2/src/worker/handle_write.rs | 118 +++++++++++++++ 12 files changed, 620 insertions(+), 54 deletions(-) create mode 100644 src/mito2/src/proto_util.rs create mode 100644 src/mito2/src/worker/handle_write.rs diff --git a/Cargo.lock b/Cargo.lock index 0074c227fe9f..7ff3c5be1f01 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -211,7 +211,7 @@ dependencies = [ "common-error", "common-time", "datatypes", - "greptime-proto", + "greptime-proto 0.1.0 (git+https://github.com/GreptimeTeam/greptime-proto.git?rev=eeae2d0dfa8ee320a7b9e987b4631a6c1c732ebd)", "prost", "snafu", "tonic 0.9.2", @@ -4111,6 +4111,18 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" +[[package]] +name = "greptime-proto" +version = "0.1.0" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=ec4b84931378004db60d168e2604bc3fb9735e9c#ec4b84931378004db60d168e2604bc3fb9735e9c" +dependencies = [ + "prost", + "serde", + "serde_json", + "tonic 0.9.2", + "tonic-build", +] + [[package]] name = "greptime-proto" version = "0.1.0" @@ -5498,6 +5510,7 @@ dependencies = [ "datafusion-common", "datatypes", "futures", + "greptime-proto 0.1.0 (git+https://github.com/GreptimeTeam/greptime-proto.git?rev=ec4b84931378004db60d168e2604bc3fb9735e9c)", "lazy_static", "log-store", "metrics", @@ -6982,7 +6995,7 @@ dependencies = [ "datafusion", "datatypes", "futures", - "greptime-proto", + "greptime-proto 0.1.0 (git+https://github.com/GreptimeTeam/greptime-proto.git?rev=eeae2d0dfa8ee320a7b9e987b4631a6c1c732ebd)", "promql-parser", "prost", "query", @@ -7252,7 +7265,7 @@ dependencies = [ "format_num", "futures", "futures-util", - "greptime-proto", + "greptime-proto 0.1.0 (git+https://github.com/GreptimeTeam/greptime-proto.git?rev=eeae2d0dfa8ee320a7b9e987b4631a6c1c732ebd)", "humantime", "metrics", "num", diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index bdc7ac2da9b7..5e88a9dccb35 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -32,6 +32,8 @@ datafusion.workspace = true datafusion-common.workspace = true datatypes = { path = "../datatypes" } futures.workspace = true +# TODO(yingwen): Update and use api crate once https://github.com/GreptimeTeam/greptime-proto/pull/75 is merged. +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ec4b84931378004db60d168e2604bc3fb9735e9c" } lazy_static = "1.4" log-store = { path = "../log-store" } metrics.workspace = true diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index c1ef1cb1aa69..e4a74527cb7c 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -26,8 +26,9 @@ use store_api::storage::RegionId; use crate::config::MitoConfig; use crate::error::{RecvSnafu, Result}; -pub use crate::request::CreateRequest; -use crate::request::{CloseRequest, OpenRequest, RegionRequest, RequestBody}; +use crate::request::{ + CloseRequest, CreateRequest, OpenRequest, RegionRequest, RequestBody, WriteRequest, +}; use crate::worker::WorkerGroup; /// Region engine implementation for timeseries data. @@ -84,6 +85,19 @@ impl MitoEngine { pub fn is_region_exists(&self, region_id: RegionId) -> bool { self.inner.workers.is_region_exists(region_id) } + + /// Write to a region. + pub async fn write_region(&self, write_request: WriteRequest) -> Result<()> { + write_request.validate()?; + + // TODO(yingwen): Fill default values. + // We need to fill default values before writing it to WAL so we can get + // the same default value after reopening the region. + + self.inner + .handle_request_body(RequestBody::Write(write_request)) + .await + } } /// Inner struct of [MitoEngine]. diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 508796945649..4b92522118d3 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -174,6 +174,37 @@ pub enum Error { reason: String, location: Location, }, + + #[snafu(display("Invalid request to region {}, reason: {}", region_id, reason))] + InvalidRequest { + region_id: RegionId, + reason: String, + location: Location, + }, + + /// An error type to indicate that schema is changed and we need + /// to fill default values again. + #[snafu(display( + "Need to fill default value to column {} of region {}", + column, + region_id + ))] + FillDefault { + region_id: RegionId, + column: String, + // The error is for retry purpose so we don't need a location. + }, + + #[snafu(display( + "Failed to create default value for column {} of region {}", + column, + region_id + ))] + CreateDefault { + region_id: RegionId, + column: String, + source: datatypes::Error, + }, } pub type Result = std::result::Result; @@ -193,10 +224,13 @@ impl ErrorExt for Error { | RegionExists { .. } | NewRecordBatch { .. } | RegionNotFound { .. } - | RegionCorrupted { .. } => StatusCode::Unexpected, - InvalidScanIndex { .. } | InvalidMeta { .. } | InvalidSchema { .. } => { - StatusCode::InvalidArguments - } + | RegionCorrupted { .. } + | CreateDefault { .. } => StatusCode::Unexpected, + InvalidScanIndex { .. } + | InvalidMeta { .. } + | InvalidSchema { .. } + | InvalidRequest { .. } + | FillDefault { .. } => StatusCode::InvalidArguments, RegionMetadataNotFound { .. } | Join { .. } | WorkerStopped { .. } | Recv { .. } => { StatusCode::Internal } diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs index b3953a2a1cd2..1aeacc9270d1 100644 --- a/src/mito2/src/lib.rs +++ b/src/mito2/src/lib.rs @@ -31,6 +31,7 @@ pub mod manifest; pub mod memtable; #[allow(dead_code)] pub mod metadata; +pub(crate) mod proto_util; pub mod read; #[allow(dead_code)] mod region; diff --git a/src/mito2/src/metadata.rs b/src/mito2/src/metadata.rs index f201a88002be..7f1588d1559c 100644 --- a/src/mito2/src/metadata.rs +++ b/src/mito2/src/metadata.rs @@ -310,14 +310,14 @@ impl ColumnMetadata { } /// The semantic type of one column -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] pub enum SemanticType { /// Tag column, also is a part of primary key. - Tag, + Tag = 0, /// A column that isn't a time index or part of primary key. - Field, + Field = 1, /// Time index column. - Timestamp, + Timestamp = 2, } /// Fields skipped in serialization. diff --git a/src/mito2/src/proto_util.rs b/src/mito2/src/proto_util.rs new file mode 100644 index 000000000000..6884dff3604d --- /dev/null +++ b/src/mito2/src/proto_util.rs @@ -0,0 +1,188 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Utilities to process protobuf messages. + +use common_time::timestamp::TimeUnit; +use datatypes::prelude::ConcreteDataType; +use datatypes::types::{TimeType, TimestampType}; +use datatypes::value::Value; +use greptime_proto::v1::{self, ColumnDataType}; +use store_api::storage::OpType; + +use crate::metadata::SemanticType; + +/// Returns true if the pb semantic type is valid. +pub(crate) fn is_semantic_type_eq(type_value: i32, semantic_type: SemanticType) -> bool { + type_value == semantic_type as i32 +} + +/// Returns true if the pb type value is valid. +pub(crate) fn is_column_type_value_eq(type_value: i32, expect_type: &ConcreteDataType) -> bool { + let Some(column_type) = ColumnDataType::from_i32(type_value) else { + return false; + }; + + is_column_type_eq(column_type, expect_type) +} + +/// Convert value into proto's value. +pub(crate) fn to_proto_value(value: Value) -> Option { + let proto_value = match value { + Value::Null => v1::Value { value: None }, + Value::Boolean(v) => v1::Value { + value: Some(v1::value::Value::BoolValue(v)), + }, + Value::UInt8(v) => v1::Value { + value: Some(v1::value::Value::U8Value(v.into())), + }, + Value::UInt16(v) => v1::Value { + value: Some(v1::value::Value::U16Value(v.into())), + }, + Value::UInt32(v) => v1::Value { + value: Some(v1::value::Value::U32Value(v)), + }, + Value::UInt64(v) => v1::Value { + value: Some(v1::value::Value::U64Value(v)), + }, + Value::Int8(v) => v1::Value { + value: Some(v1::value::Value::I8Value(v.into())), + }, + Value::Int16(v) => v1::Value { + value: Some(v1::value::Value::I16Value(v.into())), + }, + Value::Int32(v) => v1::Value { + value: Some(v1::value::Value::I32Value(v)), + }, + Value::Int64(v) => v1::Value { + value: Some(v1::value::Value::I64Value(v)), + }, + Value::Float32(v) => v1::Value { + value: Some(v1::value::Value::F32Value(*v)), + }, + Value::Float64(v) => v1::Value { + value: Some(v1::value::Value::F64Value(*v)), + }, + Value::String(v) => v1::Value { + value: Some(v1::value::Value::StringValue(v.as_utf8().to_string())), + }, + Value::Binary(v) => v1::Value { + value: Some(v1::value::Value::BinaryValue(v.to_vec())), + }, + Value::Date(v) => v1::Value { + value: Some(v1::value::Value::DateValue(v.val())), + }, + Value::DateTime(v) => v1::Value { + value: Some(v1::value::Value::DatetimeValue(v.val())), + }, + Value::Timestamp(v) => match v.unit() { + TimeUnit::Second => v1::Value { + value: Some(v1::value::Value::TsSecondValue(v.value())), + }, + TimeUnit::Millisecond => v1::Value { + value: Some(v1::value::Value::TsMillisecondValue(v.value())), + }, + TimeUnit::Microsecond => v1::Value { + value: Some(v1::value::Value::TsMicrosecondValue(v.value())), + }, + TimeUnit::Nanosecond => v1::Value { + value: Some(v1::value::Value::TsNanosecondValue(v.value())), + }, + }, + Value::Time(v) => match v.unit() { + TimeUnit::Second => v1::Value { + value: Some(v1::value::Value::TimeSecondValue(v.value())), + }, + TimeUnit::Millisecond => v1::Value { + value: Some(v1::value::Value::TimeMillisecondValue(v.value())), + }, + TimeUnit::Microsecond => v1::Value { + value: Some(v1::value::Value::TimeMicrosecondValue(v.value())), + }, + TimeUnit::Nanosecond => v1::Value { + value: Some(v1::value::Value::TimeNanosecondValue(v.value())), + }, + }, + Value::Interval(_) | Value::List(_) => return None, + }; + + Some(proto_value) +} + +/// Convert [ConcreteDataType] to [ColumnDataType]. +pub(crate) fn to_column_data_type(data_type: &ConcreteDataType) -> Option { + let column_data_type = match data_type { + ConcreteDataType::Boolean(_) => ColumnDataType::Boolean, + ConcreteDataType::Int8(_) => ColumnDataType::Int8, + ConcreteDataType::Int16(_) => ColumnDataType::Int16, + ConcreteDataType::Int32(_) => ColumnDataType::Int32, + ConcreteDataType::Int64(_) => ColumnDataType::Int64, + ConcreteDataType::UInt8(_) => ColumnDataType::Uint8, + ConcreteDataType::UInt16(_) => ColumnDataType::Uint16, + ConcreteDataType::UInt32(_) => ColumnDataType::Uint32, + ConcreteDataType::UInt64(_) => ColumnDataType::Uint64, + ConcreteDataType::Float32(_) => ColumnDataType::Float32, + ConcreteDataType::Float64(_) => ColumnDataType::Float64, + ConcreteDataType::Binary(_) => ColumnDataType::Binary, + ConcreteDataType::String(_) => ColumnDataType::String, + ConcreteDataType::Date(_) => ColumnDataType::Date, + ConcreteDataType::DateTime(_) => ColumnDataType::Datetime, + ConcreteDataType::Timestamp(TimestampType::Second(_)) => ColumnDataType::TimestampSecond, + ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => { + ColumnDataType::TimestampMillisecond + } + ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => { + ColumnDataType::TimestampMicrosecond + } + ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => { + ColumnDataType::TimestampNanosecond + } + ConcreteDataType::Time(TimeType::Second(_)) => ColumnDataType::TimeSecond, + ConcreteDataType::Time(TimeType::Millisecond(_)) => ColumnDataType::TimeMillisecond, + ConcreteDataType::Time(TimeType::Microsecond(_)) => ColumnDataType::TimeMicrosecond, + ConcreteDataType::Time(TimeType::Nanosecond(_)) => ColumnDataType::TimeNanosecond, + ConcreteDataType::Null(_) + | ConcreteDataType::Interval(_) + | ConcreteDataType::List(_) + | ConcreteDataType::Dictionary(_) => return None, + }; + + Some(column_data_type) +} + +/// Convert semantic type to proto's semantic type +pub(crate) fn to_proto_semantic_type(semantic_type: SemanticType) -> v1::SemanticType { + match semantic_type { + SemanticType::Tag => v1::SemanticType::Tag, + SemanticType::Field => v1::SemanticType::Field, + SemanticType::Timestamp => v1::SemanticType::Timestamp, + } +} + +/// Convert op type to proto's op type. +pub(crate) fn to_proto_op_type(op_type: OpType) -> v1::mito::OpType { + match op_type { + OpType::Delete => v1::mito::OpType::Delete, + OpType::Put => v1::mito::OpType::Put, + } +} + +/// Returns true if the column type is equal to expected type. +fn is_column_type_eq(column_type: ColumnDataType, expect_type: &ConcreteDataType) -> bool { + if let Some(expect) = to_column_data_type(expect_type) { + column_type == expect + } else { + false + } +} diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index c0266ff0df1d..2be5ddf2478c 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -15,7 +15,7 @@ //! Mito region. pub(crate) mod opener; -mod version; +pub(crate) mod version; use std::collections::HashMap; use std::sync::{Arc, RwLock}; @@ -25,7 +25,7 @@ use store_api::storage::RegionId; use crate::error::Result; use crate::manifest::manager::RegionManifestManager; -use crate::region::version::VersionControlRef; +use crate::region::version::{VersionControlRef, VersionRef}; /// Type to store region version. pub type VersionNumber = u32; @@ -56,6 +56,11 @@ impl MitoRegion { Ok(()) } + + /// Returns current version of the region. + pub(crate) fn version(&self) -> VersionRef { + self.version_control.current() + } } /// Regions indexed by ids. diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs index d9c5e9ecece0..54fe29df3ca8 100644 --- a/src/mito2/src/region/version.rs +++ b/src/mito2/src/region/version.rs @@ -26,7 +26,6 @@ use std::sync::Arc; use arc_swap::ArcSwap; -use store_api::manifest::ManifestVersion; use store_api::storage::SequenceNumber; use crate::memtable::version::{MemtableVersion, MemtableVersionRef}; @@ -48,6 +47,11 @@ impl VersionControl { version: ArcSwap::new(Arc::new(version)), } } + + /// Returns current [Version]. + pub(crate) fn current(&self) -> VersionRef { + self.version.load_full() + } } pub(crate) type VersionControlRef = Arc; @@ -59,21 +63,20 @@ pub(crate) struct Version { /// /// Altering metadata isn't frequent, storing metadata in Arc to allow sharing /// metadata and reuse metadata when creating a new `Version`. - metadata: RegionMetadataRef, + pub(crate) metadata: RegionMetadataRef, /// Mutable and immutable memtables. /// /// Wrapped in Arc to make clone of `Version` much cheaper. - memtables: MemtableVersionRef, + pub(crate) memtables: MemtableVersionRef, /// SSTs of the region. - ssts: SstVersionRef, + pub(crate) ssts: SstVersionRef, /// Inclusive max sequence of flushed data. - flushed_sequence: SequenceNumber, - // TODO(yingwen): Remove this. - /// Current version of region manifest. - manifest_version: ManifestVersion, + pub(crate) flushed_sequence: SequenceNumber, // TODO(yingwen): RegionOptions. } +pub(crate) type VersionRef = Arc; + /// Version builder. pub(crate) struct VersionBuilder { metadata: RegionMetadataRef, @@ -94,7 +97,6 @@ impl VersionBuilder { memtables: Arc::new(MemtableVersion::new(self.mutable)), ssts: Arc::new(SstVersion::new()), flushed_sequence: 0, - manifest_version: 0, } } } diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index a18105a7548e..10348ac70df4 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -14,15 +14,22 @@ //! Worker requests. +use std::collections::HashMap; use std::time::Duration; use common_base::readable_size::ReadableSize; -use store_api::storage::{ColumnId, CompactionStrategy, RegionId}; +use greptime_proto::v1::{ColumnDataType, ColumnSchema, Rows}; +use snafu::{ensure, OptionExt, ResultExt}; +use store_api::storage::{ColumnId, CompactionStrategy, OpType, RegionId}; use tokio::sync::oneshot::{self, Receiver, Sender}; use crate::config::DEFAULT_WRITE_BUFFER_SIZE; -use crate::error::Result; -use crate::metadata::ColumnMetadata; +use crate::error::{CreateDefaultSnafu, FillDefaultSnafu, InvalidRequestSnafu, Result}; +use crate::metadata::{ColumnMetadata, RegionMetadata}; +use crate::proto_util::{ + is_column_type_value_eq, is_semantic_type_eq, to_column_data_type, to_proto_semantic_type, + to_proto_value, +}; /// Options that affect the entire region. /// @@ -84,9 +91,193 @@ pub struct CloseRequest { /// Request to write a region. #[derive(Debug)] -pub(crate) struct WriteRequest { +pub struct WriteRequest { /// Region to write. pub region_id: RegionId, + /// Type of the write request. + pub op_type: OpType, + /// Rows to write. + pub rows: Rows, + /// Map column name to column index in `rows`. + name_to_index: HashMap, +} + +impl WriteRequest { + /// Returns a new request. + pub fn new(region_id: RegionId, op_type: OpType, rows: Rows) -> WriteRequest { + let name_to_index = rows + .schema + .iter() + .enumerate() + .map(|(index, column)| (column.column_name.clone(), index)) + .collect(); + WriteRequest { + region_id, + op_type, + rows, + name_to_index, + } + } + + /// Validate the request. + pub(crate) fn validate(&self) -> Result<()> { + // - checks whether the request is too large. + // - checks whether each row in rows has the same schema. + // - checks whether each column match the schema in Rows. + // - checks rows don't have duplicate columns. + unimplemented!() + } + + /// Checks schema of rows. + /// + /// If column with default value is missing, it returns a special [FillDefault](crate::error::Error::FillDefault) + /// error. + pub(crate) fn check_schema(&self, metadata: &RegionMetadata) -> Result<()> { + let region_id = self.region_id; + // Index all columns in rows. + let mut rows_columns: HashMap<_, _> = self + .rows + .schema + .iter() + .map(|column| (&column.column_name, column)) + .collect(); + + // Checks all columns in this region. + for column in &metadata.column_metadatas { + if let Some(input_col) = rows_columns.remove(&column.column_schema.name) { + // Check data type. + ensure!( + is_column_type_value_eq(input_col.datatype, &column.column_schema.data_type), + InvalidRequestSnafu { + region_id, + reason: format!( + "column {} expect type {:?}, given: {:?}({})", + column.column_schema.name, + column.column_schema.data_type, + ColumnDataType::from_i32(input_col.datatype), + input_col.datatype, + ) + } + ); + + // Check semantic type. + ensure!( + is_semantic_type_eq(input_col.semantic_type, column.semantic_type), + InvalidRequestSnafu { + region_id, + reason: format!( + "column {} has semantic type {:?}, given: {:?}({})", + column.column_schema.name, + column.semantic_type, + greptime_proto::v1::SemanticType::from_i32(input_col.semantic_type), + input_col.semantic_type + ), + } + ); + } else { + // For columns not in rows, checks whether they have default value. + ensure!( + column.column_schema.is_nullable() + || column.column_schema.default_constraint().is_some(), + InvalidRequestSnafu { + region_id, + reason: format!("missing column {}", column.column_schema.name), + } + ); + + return FillDefaultSnafu { + region_id, + column: &column.column_schema.name, + } + .fail(); + } + } + + // Checks all columns in rows exist in the region. + if !rows_columns.is_empty() { + let names: Vec<_> = rows_columns.into_keys().collect(); + return InvalidRequestSnafu { + region_id, + reason: format!("unknown columns: {:?}", names), + } + .fail(); + } + + Ok(()) + } + + /// Try to fill missing columns. + /// + /// Currently, our protobuf format might be inefficient when we need to fill lots of null + /// values. + pub(crate) fn fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> { + for column in &metadata.column_metadatas { + if !self.name_to_index.contains_key(&column.column_schema.name) { + self.fill_column(metadata.region_id, column)?; + } + } + + Ok(()) + } + + /// Fill default value for specific `column`. + fn fill_column(&mut self, region_id: RegionId, column: &ColumnMetadata) -> Result<()> { + // Need to add a default value for this column. + let default_value = column + .column_schema + .create_default() + .context(CreateDefaultSnafu { + region_id, + column: &column.column_schema.name, + })? + // This column doesn't have default value. + .with_context(|| InvalidRequestSnafu { + region_id, + reason: format!( + "column {} does not have default value", + column.column_schema.name + ), + })?; + + // Convert default value into proto's value. + let proto_value = to_proto_value(default_value).with_context(|| InvalidRequestSnafu { + region_id, + reason: format!( + "no protobuf type for default value of column {} ({:?})", + column.column_schema.name, column.column_schema.data_type + ), + })?; + + // Insert default value to each row. + for row in &mut self.rows.rows { + row.values.push(proto_value.clone()); + } + + // Insert column schema. + let datatype = to_column_data_type(&column.column_schema.data_type).with_context(|| { + InvalidRequestSnafu { + region_id, + reason: format!( + "no protobuf type for column {} ({:?})", + column.column_schema.name, column.column_schema.data_type + ), + } + })?; + self.rows.schema.push(ColumnSchema { + column_name: column.column_schema.name.clone(), + datatype: datatype as i32, + semantic_type: to_proto_semantic_type(column.semantic_type) as i32, + }); + + Ok(()) + } +} + +/// Sender and write request. +pub(crate) struct SenderWriteRequest { + /// Result sender. + pub(crate) sender: Option>>, + pub(crate) request: WriteRequest, } /// Request sent to a worker @@ -127,7 +318,6 @@ impl RegionRequest { /// Body to carry actual region request. #[derive(Debug)] pub(crate) enum RequestBody { - // DML: /// Write to a region. Write(WriteRequest), @@ -151,13 +341,19 @@ impl RequestBody { } } - /// Returns whether the request is a DDL (e.g. CREATE/OPEN/ALTER). - pub(crate) fn is_ddl(&self) -> bool { + /// Returns whether the request is a write request. + pub(crate) fn is_write(&self) -> bool { + matches!(self, RequestBody::Write(_)) + } + + /// Converts the request into a [WriteRequest]. + /// + /// # Panics + /// Panics if it isn't a [WriteRequest]. + pub(crate) fn into_write_request(self) -> WriteRequest { match self { - RequestBody::Write(_) => false, - RequestBody::Create(_) => true, - RequestBody::Open(_) => true, - RequestBody::Close(_) => true, + RequestBody::Write(req) => req, + other => panic!("expect write request, found {other:?}"), } } } diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 874a76d52d2d..dd26f9ac3960 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -17,6 +17,7 @@ mod handle_close; mod handle_create; mod handle_open; +mod handle_write; use std::collections::hash_map::DefaultHasher; use std::hash::{Hash, Hasher}; @@ -37,7 +38,7 @@ use crate::config::MitoConfig; use crate::error::{JoinSnafu, Result, WorkerStoppedSnafu}; use crate::memtable::{DefaultMemtableBuilder, MemtableBuilderRef}; use crate::region::{RegionMap, RegionMapRef}; -use crate::request::{RegionRequest, RequestBody, WorkerRequest}; +use crate::request::{RegionRequest, RequestBody, SenderWriteRequest, WorkerRequest}; /// Identifier for a worker. pub(crate) type WorkerId = u32; @@ -322,15 +323,18 @@ impl RegionWorkerLoop { /// /// `buffer` should be empty. async fn handle_requests(&mut self, buffer: &mut RequestBuffer) { - let mut dml_requests = Vec::with_capacity(buffer.len()); + let mut write_requests = Vec::with_capacity(buffer.len()); let mut ddl_requests = Vec::with_capacity(buffer.len()); for worker_req in buffer.drain(..) { match worker_req { WorkerRequest::Region(req) => { - if req.body.is_ddl() { - ddl_requests.push(req); + if req.body.is_write() { + write_requests.push(SenderWriteRequest { + sender: req.sender, + request: req.body.into_write_request(), + }); } else { - dml_requests.push(req); + ddl_requests.push(req); } } // We receive a stop signal, but we still want to process remaining @@ -342,24 +346,13 @@ impl RegionWorkerLoop { } } - // Handles all dml requests first. So we can alter regions without - // considering existing dml requests. - self.handle_dml_requests(dml_requests).await; + // Handles all write requests first. So we can alter regions without + // considering existing write requests. + self.handle_write_requests(write_requests).await; self.handle_ddl_requests(ddl_requests).await; } - /// Takes and handles all dml requests. - async fn handle_dml_requests(&mut self, write_requests: Vec) { - if write_requests.is_empty() { - return; - } - - // Create a write context that holds meta and sequence. - - unimplemented!() - } - /// Takes and handles all ddl requests. async fn handle_ddl_requests(&mut self, ddl_requests: Vec) { if ddl_requests.is_empty() { diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs new file mode 100644 index 000000000000..09da51716e8c --- /dev/null +++ b/src/mito2/src/worker/handle_write.rs @@ -0,0 +1,118 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Handling write requests. + +use std::collections::{hash_map, HashMap}; + +use greptime_proto::v1::mito::Mutation; +use tokio::sync::oneshot::Sender; + +use crate::error::{RegionNotFoundSnafu, Result}; +use crate::proto_util::to_proto_op_type; +use crate::region::version::VersionRef; +use crate::region::MitoRegionRef; +use crate::request::SenderWriteRequest; +use crate::worker::RegionWorkerLoop; + +impl RegionWorkerLoop { + /// Takes and handles all write requests. + pub(crate) async fn handle_write_requests(&mut self, write_requests: Vec) { + if write_requests.is_empty() { + return; + } + + let mut region_ctxs = HashMap::new(); + for sender_req in write_requests { + let region_id = sender_req.request.region_id; + // Checks whether the region exists. + if let hash_map::Entry::Vacant(e) = region_ctxs.entry(region_id) { + let Some(region) = self.regions.get_region(region_id) else { + // No such region. + send_result(sender_req.sender, RegionNotFoundSnafu { + region_id, + }.fail()); + + continue; + }; + + // Initialize the context. + e.insert(RegionWriteCtx::new(region)); + } + + // Safety: Now we ensure the region exists. + let region_ctx = region_ctxs.get_mut(®ion_id).unwrap(); + + // Checks whether request schema is compatible with region schema. + if let Err(e) = sender_req + .request + .check_schema(®ion_ctx.version.metadata) + { + send_result(sender_req.sender, Err(e)); + + continue; + } + + // Collect requests by region. + region_ctx.push_sender_request(sender_req); + } + + todo!() + } +} + +/// Send result to the request. +fn send_result(sender: Option>>, res: Result<()>) { + if let Some(sender) = sender { + // Ignore send result. + let _ = sender.send(res); + } +} + +/// Context to keep region metadata and buffer write requests. +struct RegionWriteCtx { + /// Region to write. + region: MitoRegionRef, + /// Version of the region while creating the context. + version: VersionRef, + /// Valid mutations. + mutations: Vec, + /// Result senders. + /// + /// The sender is 1:1 map to the mutation in `mutations`. + senders: Vec>>>, +} + +impl RegionWriteCtx { + /// Returns an empty context. + fn new(region: MitoRegionRef) -> RegionWriteCtx { + let version = region.version(); + RegionWriteCtx { + region, + version, + mutations: Vec::new(), + senders: Vec::new(), + } + } + + /// Push [SenderWriteRequest] to the context. + fn push_sender_request(&mut self, sender_req: SenderWriteRequest) { + self.mutations.push(Mutation { + op_type: to_proto_op_type(sender_req.request.op_type) as i32, + sequence: 0, // TODO(yingwen): Set sequence. + rows: Some(sender_req.request.rows), + }); + self.senders.push(sender_req.sender); + } +}