Skip to content

Commit

Permalink
feat(mito): preparation to implementing write (GreptimeTeam#2085)
Browse files Browse the repository at this point in the history
* refactor: move request mod

* feat: add mutation

* feat: add handle_write mod

* feat: one mutation at a time

* feat: handle write requests

* feat: validate schema

* refactor: move schema check to write request

* feat: add convert value

* feat: fill default values

* chore: remove comments

* feat: remove code

* feat: remove code

* feat: buf requests

* style: fix clippy

* refactor: rename check functions

* chore: fix compile error

* chore: Revert "feat: remove code"

This reverts commit 6516597.

* chore: Revert "feat: remove code"

This reverts commit 5f2b790.

* chore: upgrade greptime-proto

* chore: Update comment

Co-authored-by: dennis zhuang <[email protected]>

---------

Co-authored-by: dennis zhuang <[email protected]>
  • Loading branch information
2 people authored and paomian committed Oct 19, 2023
1 parent c6f7ef1 commit 40fad99
Show file tree
Hide file tree
Showing 12 changed files with 620 additions and 54 deletions.
19 changes: 16 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/mito2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ datafusion.workspace = true
datafusion-common.workspace = true
datatypes = { path = "../datatypes" }
futures.workspace = true
# TODO(yingwen): Update and use api crate once https://github.com/GreptimeTeam/greptime-proto/pull/75 is merged.
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ec4b84931378004db60d168e2604bc3fb9735e9c" }
lazy_static = "1.4"
log-store = { path = "../log-store" }
metrics.workspace = true
Expand Down
18 changes: 16 additions & 2 deletions src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ use store_api::storage::RegionId;

use crate::config::MitoConfig;
use crate::error::{RecvSnafu, Result};
pub use crate::request::CreateRequest;
use crate::request::{CloseRequest, OpenRequest, RegionRequest, RequestBody};
use crate::request::{
CloseRequest, CreateRequest, OpenRequest, RegionRequest, RequestBody, WriteRequest,
};
use crate::worker::WorkerGroup;

/// Region engine implementation for timeseries data.
Expand Down Expand Up @@ -84,6 +85,19 @@ impl MitoEngine {
pub fn is_region_exists(&self, region_id: RegionId) -> bool {
self.inner.workers.is_region_exists(region_id)
}

/// Write to a region.
pub async fn write_region(&self, write_request: WriteRequest) -> Result<()> {
write_request.validate()?;

// TODO(yingwen): Fill default values.
// We need to fill default values before writing it to WAL so we can get
// the same default value after reopening the region.

self.inner
.handle_request_body(RequestBody::Write(write_request))
.await
}
}

/// Inner struct of [MitoEngine].
Expand Down
42 changes: 38 additions & 4 deletions src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,37 @@ pub enum Error {
reason: String,
location: Location,
},

#[snafu(display("Invalid request to region {}, reason: {}", region_id, reason))]
InvalidRequest {
region_id: RegionId,
reason: String,
location: Location,
},

/// An error type to indicate that schema is changed and we need
/// to fill default values again.
#[snafu(display(
"Need to fill default value to column {} of region {}",
column,
region_id
))]
FillDefault {
region_id: RegionId,
column: String,
// The error is for retry purpose so we don't need a location.
},

#[snafu(display(
"Failed to create default value for column {} of region {}",
column,
region_id
))]
CreateDefault {
region_id: RegionId,
column: String,
source: datatypes::Error,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand All @@ -193,10 +224,13 @@ impl ErrorExt for Error {
| RegionExists { .. }
| NewRecordBatch { .. }
| RegionNotFound { .. }
| RegionCorrupted { .. } => StatusCode::Unexpected,
InvalidScanIndex { .. } | InvalidMeta { .. } | InvalidSchema { .. } => {
StatusCode::InvalidArguments
}
| RegionCorrupted { .. }
| CreateDefault { .. } => StatusCode::Unexpected,
InvalidScanIndex { .. }
| InvalidMeta { .. }
| InvalidSchema { .. }
| InvalidRequest { .. }
| FillDefault { .. } => StatusCode::InvalidArguments,
RegionMetadataNotFound { .. } | Join { .. } | WorkerStopped { .. } | Recv { .. } => {
StatusCode::Internal
}
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub mod manifest;
pub mod memtable;
#[allow(dead_code)]
pub mod metadata;
pub(crate) mod proto_util;
pub mod read;
#[allow(dead_code)]
mod region;
Expand Down
8 changes: 4 additions & 4 deletions src/mito2/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,14 +310,14 @@ impl ColumnMetadata {
}

/// The semantic type of one column
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum SemanticType {
/// Tag column, also is a part of primary key.
Tag,
Tag = 0,
/// A column that isn't a time index or part of primary key.
Field,
Field = 1,
/// Time index column.
Timestamp,
Timestamp = 2,
}

/// Fields skipped in serialization.
Expand Down
188 changes: 188 additions & 0 deletions src/mito2/src/proto_util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Utilities to process protobuf messages.
use common_time::timestamp::TimeUnit;
use datatypes::prelude::ConcreteDataType;
use datatypes::types::{TimeType, TimestampType};
use datatypes::value::Value;
use greptime_proto::v1::{self, ColumnDataType};
use store_api::storage::OpType;

use crate::metadata::SemanticType;

/// Returns true if the pb semantic type is valid.
pub(crate) fn is_semantic_type_eq(type_value: i32, semantic_type: SemanticType) -> bool {
type_value == semantic_type as i32
}

/// Returns true if the pb type value is valid.
pub(crate) fn is_column_type_value_eq(type_value: i32, expect_type: &ConcreteDataType) -> bool {
let Some(column_type) = ColumnDataType::from_i32(type_value) else {
return false;
};

is_column_type_eq(column_type, expect_type)
}

/// Convert value into proto's value.
pub(crate) fn to_proto_value(value: Value) -> Option<v1::Value> {
let proto_value = match value {
Value::Null => v1::Value { value: None },
Value::Boolean(v) => v1::Value {
value: Some(v1::value::Value::BoolValue(v)),
},
Value::UInt8(v) => v1::Value {
value: Some(v1::value::Value::U8Value(v.into())),
},
Value::UInt16(v) => v1::Value {
value: Some(v1::value::Value::U16Value(v.into())),
},
Value::UInt32(v) => v1::Value {
value: Some(v1::value::Value::U32Value(v)),
},
Value::UInt64(v) => v1::Value {
value: Some(v1::value::Value::U64Value(v)),
},
Value::Int8(v) => v1::Value {
value: Some(v1::value::Value::I8Value(v.into())),
},
Value::Int16(v) => v1::Value {
value: Some(v1::value::Value::I16Value(v.into())),
},
Value::Int32(v) => v1::Value {
value: Some(v1::value::Value::I32Value(v)),
},
Value::Int64(v) => v1::Value {
value: Some(v1::value::Value::I64Value(v)),
},
Value::Float32(v) => v1::Value {
value: Some(v1::value::Value::F32Value(*v)),
},
Value::Float64(v) => v1::Value {
value: Some(v1::value::Value::F64Value(*v)),
},
Value::String(v) => v1::Value {
value: Some(v1::value::Value::StringValue(v.as_utf8().to_string())),
},
Value::Binary(v) => v1::Value {
value: Some(v1::value::Value::BinaryValue(v.to_vec())),
},
Value::Date(v) => v1::Value {
value: Some(v1::value::Value::DateValue(v.val())),
},
Value::DateTime(v) => v1::Value {
value: Some(v1::value::Value::DatetimeValue(v.val())),
},
Value::Timestamp(v) => match v.unit() {
TimeUnit::Second => v1::Value {
value: Some(v1::value::Value::TsSecondValue(v.value())),
},
TimeUnit::Millisecond => v1::Value {
value: Some(v1::value::Value::TsMillisecondValue(v.value())),
},
TimeUnit::Microsecond => v1::Value {
value: Some(v1::value::Value::TsMicrosecondValue(v.value())),
},
TimeUnit::Nanosecond => v1::Value {
value: Some(v1::value::Value::TsNanosecondValue(v.value())),
},
},
Value::Time(v) => match v.unit() {
TimeUnit::Second => v1::Value {
value: Some(v1::value::Value::TimeSecondValue(v.value())),
},
TimeUnit::Millisecond => v1::Value {
value: Some(v1::value::Value::TimeMillisecondValue(v.value())),
},
TimeUnit::Microsecond => v1::Value {
value: Some(v1::value::Value::TimeMicrosecondValue(v.value())),
},
TimeUnit::Nanosecond => v1::Value {
value: Some(v1::value::Value::TimeNanosecondValue(v.value())),
},
},
Value::Interval(_) | Value::List(_) => return None,
};

Some(proto_value)
}

/// Convert [ConcreteDataType] to [ColumnDataType].
pub(crate) fn to_column_data_type(data_type: &ConcreteDataType) -> Option<ColumnDataType> {
let column_data_type = match data_type {
ConcreteDataType::Boolean(_) => ColumnDataType::Boolean,
ConcreteDataType::Int8(_) => ColumnDataType::Int8,
ConcreteDataType::Int16(_) => ColumnDataType::Int16,
ConcreteDataType::Int32(_) => ColumnDataType::Int32,
ConcreteDataType::Int64(_) => ColumnDataType::Int64,
ConcreteDataType::UInt8(_) => ColumnDataType::Uint8,
ConcreteDataType::UInt16(_) => ColumnDataType::Uint16,
ConcreteDataType::UInt32(_) => ColumnDataType::Uint32,
ConcreteDataType::UInt64(_) => ColumnDataType::Uint64,
ConcreteDataType::Float32(_) => ColumnDataType::Float32,
ConcreteDataType::Float64(_) => ColumnDataType::Float64,
ConcreteDataType::Binary(_) => ColumnDataType::Binary,
ConcreteDataType::String(_) => ColumnDataType::String,
ConcreteDataType::Date(_) => ColumnDataType::Date,
ConcreteDataType::DateTime(_) => ColumnDataType::Datetime,
ConcreteDataType::Timestamp(TimestampType::Second(_)) => ColumnDataType::TimestampSecond,
ConcreteDataType::Timestamp(TimestampType::Millisecond(_)) => {
ColumnDataType::TimestampMillisecond
}
ConcreteDataType::Timestamp(TimestampType::Microsecond(_)) => {
ColumnDataType::TimestampMicrosecond
}
ConcreteDataType::Timestamp(TimestampType::Nanosecond(_)) => {
ColumnDataType::TimestampNanosecond
}
ConcreteDataType::Time(TimeType::Second(_)) => ColumnDataType::TimeSecond,
ConcreteDataType::Time(TimeType::Millisecond(_)) => ColumnDataType::TimeMillisecond,
ConcreteDataType::Time(TimeType::Microsecond(_)) => ColumnDataType::TimeMicrosecond,
ConcreteDataType::Time(TimeType::Nanosecond(_)) => ColumnDataType::TimeNanosecond,
ConcreteDataType::Null(_)
| ConcreteDataType::Interval(_)
| ConcreteDataType::List(_)
| ConcreteDataType::Dictionary(_) => return None,
};

Some(column_data_type)
}

/// Convert semantic type to proto's semantic type
pub(crate) fn to_proto_semantic_type(semantic_type: SemanticType) -> v1::SemanticType {
match semantic_type {
SemanticType::Tag => v1::SemanticType::Tag,
SemanticType::Field => v1::SemanticType::Field,
SemanticType::Timestamp => v1::SemanticType::Timestamp,
}
}

/// Convert op type to proto's op type.
pub(crate) fn to_proto_op_type(op_type: OpType) -> v1::mito::OpType {
match op_type {
OpType::Delete => v1::mito::OpType::Delete,
OpType::Put => v1::mito::OpType::Put,
}
}

/// Returns true if the column type is equal to expected type.
fn is_column_type_eq(column_type: ColumnDataType, expect_type: &ConcreteDataType) -> bool {
if let Some(expect) = to_column_data_type(expect_type) {
column_type == expect
} else {
false
}
}
9 changes: 7 additions & 2 deletions src/mito2/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
//! Mito region.
pub(crate) mod opener;
mod version;
pub(crate) mod version;

use std::collections::HashMap;
use std::sync::{Arc, RwLock};
Expand All @@ -25,7 +25,7 @@ use store_api::storage::RegionId;

use crate::error::Result;
use crate::manifest::manager::RegionManifestManager;
use crate::region::version::VersionControlRef;
use crate::region::version::{VersionControlRef, VersionRef};

/// Type to store region version.
pub type VersionNumber = u32;
Expand Down Expand Up @@ -56,6 +56,11 @@ impl MitoRegion {

Ok(())
}

/// Returns current version of the region.
pub(crate) fn version(&self) -> VersionRef {
self.version_control.current()
}
}

/// Regions indexed by ids.
Expand Down
Loading

0 comments on commit 40fad99

Please sign in to comment.