diff --git a/Cargo.lock b/Cargo.lock
index 98a6e213a5ff..b784ba6b9602 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5541,6 +5541,7 @@ dependencies = [
  "store-api",
  "table",
  "tokio",
+ "uuid",
 ]

 [[package]]
diff --git a/src/datatypes/src/error.rs b/src/datatypes/src/error.rs
index ac29197b4fa1..e5b701e13fce 100644
--- a/src/datatypes/src/error.rs
+++ b/src/datatypes/src/error.rs
@@ -112,6 +112,9 @@ pub enum Error {

     #[snafu(display("Invalid timestamp precision: {}", precision))]
     InvalidTimestampPrecision { precision: u64, location: Location },
+
+    #[snafu(display("Column {} already exists", column))]
+    DuplicateColumn { column: String, location: Location },
 }

 impl ErrorExt for Error {
diff --git a/src/datatypes/src/schema.rs b/src/datatypes/src/schema.rs
index b79641b9368a..fd6da64beb29 100644
--- a/src/datatypes/src/schema.rs
+++ b/src/datatypes/src/schema.rs
@@ -24,7 +24,7 @@ use datafusion_common::DFSchemaRef;
 use snafu::{ensure, ResultExt};

 use crate::data_type::DataType;
-use crate::error::{self, Error, ProjectArrowSchemaSnafu, Result};
+use crate::error::{self, DuplicateColumnSnafu, Error, ProjectArrowSchemaSnafu, Result};
 pub use crate::schema::column_schema::{ColumnSchema, Metadata, COMMENT_KEY, TIME_INDEX_KEY};
 pub use crate::schema::constraint::ColumnDefaultConstraint;
 pub use crate::schema::raw::RawSchema;
@@ -211,8 +211,7 @@ impl SchemaBuilder {
             validate_timestamp_index(&self.column_schemas, timestamp_index)?;
         }

-        let _ = self
-            .metadata
+        self.metadata
             .insert(VERSION_KEY.to_string(), self.version.to_string());

         let arrow_schema = ArrowSchema::new(self.fields).with_metadata(self.metadata);
@@ -243,7 +242,14 @@ fn collect_fields(column_schemas: &[ColumnSchema]) -> Result<FieldsAndIndices> {
         }
         let field = Field::try_from(column_schema)?;
         fields.push(field);
-        let _ = name_to_index.insert(column_schema.name.clone(), index);
+        ensure!(
+            name_to_index
+                .insert(column_schema.name.clone(), index)
+                .is_none(),
+            DuplicateColumnSnafu {
+                column: &column_schema.name,
+            }
+        );
     }

     Ok(FieldsAndIndices {
@@ -382,6 +388,21 @@ mod tests {
         assert_eq!(column_schemas, schema.column_schemas());
     }

+    #[test]
+    fn test_schema_duplicate_column() {
+        let column_schemas = vec![
+            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), false),
+            ColumnSchema::new("col1", ConcreteDataType::float64_datatype(), true),
+        ];
+        let err = Schema::try_new(column_schemas).unwrap_err();
+
+        assert!(
+            matches!(err, Error::DuplicateColumn { .. }),
+            "expect DuplicateColumn, found {}",
+            err
+        );
+    }
+
     #[test]
     fn test_metadata() {
         let column_schemas = vec![ColumnSchema::new(
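The duplicate-column check above leans on the standard-library contract that `HashMap::insert` returns the previous value when the key already exists, so a non-`None` return means a duplicate. A minimal, self-contained sketch of that idiom (illustrative names only, not part of the patch):

use std::collections::HashMap;

/// Returns the first duplicated name, if any.
fn first_duplicate(names: &[&str]) -> Option<String> {
    let mut seen = HashMap::new();
    for (index, name) in names.iter().enumerate() {
        // `insert` returns the old value when the key was already present.
        if seen.insert(name.to_string(), index).is_some() {
            return Some(name.to_string());
        }
    }
    None
}

fn main() {
    assert_eq!(
        first_duplicate(&["col1", "col2", "col1"]),
        Some("col1".to_string())
    );
}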
diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml
index 30fe7cf92070..86e181ab1198 100644
--- a/src/mito2/Cargo.toml
+++ b/src/mito2/Cargo.toml
@@ -44,6 +44,7 @@ storage = { path = "../storage" }
 store-api = { path = "../store-api" }
 table = { path = "../table" }
 tokio.workspace = true
+uuid.workspace = true

 [dev-dependencies]
 common-test-util = { path = "../common/test-util" }
diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs
index 51f2eae93a5a..538d9ad2c9cf 100644
--- a/src/mito2/src/error.rs
+++ b/src/mito2/src/error.rs
@@ -101,6 +101,15 @@ pub enum Error {
         location
     ))]
     InitialMetadata { location: Location },
+
+    #[snafu(display("Invalid metadata, {}, location: {}", reason, location))]
+    InvalidMeta { reason: String, location: Location },
+
+    #[snafu(display("Invalid schema, source: {}, location: {}", source, location))]
+    InvalidSchema {
+        source: datatypes::error::Error,
+        location: Location,
+    },
 }

 pub type Result<T> = std::result::Result<T, Error>;
@@ -114,7 +123,10 @@ impl ErrorExt for Error {
             CompressObject { .. } | DecompressObject { .. } | SerdeJson { .. } | Utf8 { .. } => {
                 StatusCode::Unexpected
             }
-            InvalidScanIndex { .. } | InitialMetadata { .. } => StatusCode::InvalidArguments,
+            InvalidScanIndex { .. }
+            | InitialMetadata { .. }
+            | InvalidMeta { .. }
+            | InvalidSchema { .. } => StatusCode::InvalidArguments,
             RegionMetadataNotFound { .. } | Join { .. } | WorkerStopped { .. } | Recv { .. } => {
                 StatusCode::Internal
             }
diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs
index dae66d9416cc..a11077c59c70 100644
--- a/src/mito2/src/lib.rs
+++ b/src/mito2/src/lib.rs
@@ -28,10 +28,14 @@ pub mod error;
 #[allow(unused_variables)]
 pub mod manifest;
 #[allow(dead_code)]
+pub mod memtable;
+#[allow(dead_code)]
 pub mod metadata;
 #[allow(dead_code)]
 mod region;
 #[allow(dead_code)]
+pub(crate) mod sst;
+#[allow(dead_code)]
 mod worker;

 #[cfg_attr(doc, aquamarine::aquamarine)]
@@ -108,7 +112,7 @@ mod worker;
 /// class Version {
 ///     -RegionMetadataRef metadata
 ///     -MemtableVersionRef memtables
-///     -LevelMetasRef ssts
+///     -SstVersionRef ssts
 ///     -SequenceNumber flushed_sequence
 ///     -ManifestVersion manifest_version
 /// }
@@ -119,7 +123,7 @@ mod worker;
 ///     +immutable_memtables() &[MemtableRef]
 ///     +freeze_mutable(MemtableRef new_mutable) MemtableVersion
 /// }
-/// class LevelMetas {
+/// class SstVersion {
 ///     -LevelMetaVec levels
 ///     -AccessLayerRef sst_layer
 ///     -FilePurgerRef file_purger
@@ -146,8 +150,8 @@ mod worker;
 /// VersionControl o-- Version
 /// Version o-- RegionMetadata
 /// Version o-- MemtableVersion
-/// Version o-- LevelMetas
-/// LevelMetas o-- LevelMeta
+/// Version o-- SstVersion
+/// SstVersion o-- LevelMeta
 /// LevelMeta o-- FileHandle
 /// FileHandle o-- FileMeta
 /// class RegionMetadata
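The new `InvalidSchema` variant is filled in through snafu's `ResultExt::context`, the same pattern the rest of this patch uses (`.context(InvalidSchemaSnafu)` in metadata.rs). A standalone sketch of that pattern with a stand-in source error type, assuming only the snafu crate; the variant and function names here are illustrative:

use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Invalid schema, source: {}", source))]
    InvalidSchema { source: std::num::ParseIntError },
}

fn parse_version(input: &str) -> Result<u32, Error> {
    // `context` converts the source error into the `InvalidSchema` variant.
    input.parse::<u32>().context(InvalidSchemaSnafu)
}

fn main() {
    assert!(parse_version("not-a-number").is_err());
    assert_eq!(parse_version("3").unwrap(), 3);
}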
diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs
index 61c0df8664d2..e1de79acb48a 100644
--- a/src/mito2/src/manifest/manager.rs
+++ b/src/mito2/src/manifest/manager.rs
@@ -260,27 +260,32 @@ mod test {
     use crate::test_util::TestEnv;

     fn basic_region_metadata() -> RegionMetadata {
-        let builder = RegionMetadataBuilder::new(RegionId::new(23, 33), 0);
-        let builder = builder.add_column_metadata(ColumnMetadata {
-            column_schema: ColumnSchema::new(
-                "ts",
-                ConcreteDataType::timestamp_millisecond_datatype(),
-                false,
-            ),
-            semantic_type: SemanticType::Timestamp,
-            column_id: 45,
-        });
-        let builder = builder.add_column_metadata(ColumnMetadata {
-            column_schema: ColumnSchema::new("pk", ConcreteDataType::string_datatype(), false),
-            semantic_type: SemanticType::Tag,
-            column_id: 36,
-        });
-        let builder = builder.add_column_metadata(ColumnMetadata {
-            column_schema: ColumnSchema::new("val", ConcreteDataType::float64_datatype(), false),
-            semantic_type: SemanticType::Field,
-            column_id: 251,
-        });
-        builder.build()
+        let mut builder = RegionMetadataBuilder::new(RegionId::new(23, 33), 0);
+        builder
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new(
+                    "ts",
+                    ConcreteDataType::timestamp_millisecond_datatype(),
+                    false,
+                ),
+                semantic_type: SemanticType::Timestamp,
+                column_id: 45,
+            })
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new("pk", ConcreteDataType::string_datatype(), false),
+                semantic_type: SemanticType::Tag,
+                column_id: 36,
+            })
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new(
+                    "val",
+                    ConcreteDataType::float64_datatype(),
+                    false,
+                ),
+                semantic_type: SemanticType::Field,
+                column_id: 251,
+            });
+        builder.build().unwrap()
     }

     #[tokio::test]
@@ -317,13 +322,13 @@ mod test {
             .await
             .unwrap();

-        let new_metadata_builder = RegionMetadataBuilder::from_existing(metadata, 1);
-        let new_metadata_builder = new_metadata_builder.add_column_metadata(ColumnMetadata {
+        let mut new_metadata_builder = RegionMetadataBuilder::from_existing(metadata, 1);
+        new_metadata_builder.push_column_metadata(ColumnMetadata {
             column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
             semantic_type: SemanticType::Field,
             column_id: 252,
         });
-        let new_metadata = new_metadata_builder.build();
+        let new_metadata = new_metadata_builder.build().unwrap();

         let mut action_list =
            RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
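The test updates above track a builder-API change: a consuming `add_column_metadata(self) -> Self` became `push_column_metadata(&mut self) -> &mut Self`, and `build()` now validates and returns a `Result`. A small standalone sketch of that builder shape with illustrative stand-in types (not the real `RegionMetadataBuilder`):

#[derive(Debug)]
struct Metadata {
    columns: Vec<String>,
}

#[derive(Default)]
struct MetadataBuilder {
    columns: Vec<String>,
}

impl MetadataBuilder {
    /// Mutating setter that returns `&mut Self` so calls can be chained.
    fn push_column(&mut self, name: &str) -> &mut Self {
        self.columns.push(name.to_string());
        self
    }

    /// Consumes the builder and validates the result.
    fn build(self) -> Result<Metadata, String> {
        if self.columns.is_empty() {
            return Err("at least one column is required".to_string());
        }
        Ok(Metadata { columns: self.columns })
    }
}

fn main() {
    let mut builder = MetadataBuilder::default();
    builder.push_column("ts").push_column("val");
    let metadata = builder.build().unwrap();
    assert_eq!(metadata.columns.len(), 2);
}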
diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs
new file mode 100644
index 000000000000..afee0bd52080
--- /dev/null
+++ b/src/mito2/src/memtable.rs
@@ -0,0 +1,43 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Memtables are write buffers for regions.
+
+pub(crate) mod version;
+
+use std::fmt;
+use std::sync::Arc;
+
+use crate::metadata::RegionMetadataRef;
+
+/// Id for memtables.
+///
+/// Should be unique under the same region.
+pub type MemtableId = u32;
+
+/// In-memory write buffer.
+pub trait Memtable: Send + Sync + fmt::Debug {
+    /// Returns the id of this memtable.
+    fn id(&self) -> MemtableId;
+}
+
+pub type MemtableRef = Arc<dyn Memtable>;
+
+/// Builder to build a new [Memtable].
+pub trait MemtableBuilder: Send + Sync + fmt::Debug {
+    /// Builds a new memtable instance.
+    fn build(&self, metadata: &RegionMetadataRef) -> MemtableRef;
+}
+
+pub type MemtableBuilderRef = Arc<dyn MemtableBuilder>;
diff --git a/src/mito2/src/memtable/version.rs b/src/mito2/src/memtable/version.rs
new file mode 100644
index 000000000000..0fdc6d07c674
--- /dev/null
+++ b/src/mito2/src/memtable/version.rs
@@ -0,0 +1,40 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Memtable version.
+
+use std::sync::Arc;
+
+use crate::memtable::MemtableRef;
+
+/// A version of current memtables in a region.
+#[derive(Debug)]
+pub(crate) struct MemtableVersion {
+    /// Mutable memtable.
+    mutable: MemtableRef,
+    /// Immutable memtables.
+    immutables: Vec<MemtableRef>,
+}
+
+pub(crate) type MemtableVersionRef = Arc<MemtableVersion>;
+
+impl MemtableVersion {
+    /// Returns a new [MemtableVersion] with a specific mutable memtable.
+    pub(crate) fn new(mutable: MemtableRef) -> MemtableVersion {
+        MemtableVersion {
+            mutable,
+            immutables: vec![],
+        }
+    }
+}
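A sketch of what an implementation of the two traits above might look like. It mirrors the metadata-only memtable this patch adds to test_util.rs further down; a real memtable would also buffer writes. Crate-internal paths (`crate::memtable`, `crate::metadata`) are assumed because the traits live in this crate, and `NoopMemtable`/`NoopMemtableBuilder` are hypothetical names:

use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

use crate::memtable::{Memtable, MemtableBuilder, MemtableId, MemtableRef};
use crate::metadata::RegionMetadataRef;

/// A memtable that carries nothing but its id.
#[derive(Debug)]
struct NoopMemtable {
    id: MemtableId,
}

impl Memtable for NoopMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }
}

/// Builder that hands out increasing ids so memtables stay unique within a region.
#[derive(Debug, Default)]
struct NoopMemtableBuilder {
    next_id: AtomicU32,
}

impl MemtableBuilder for NoopMemtableBuilder {
    fn build(&self, _metadata: &RegionMetadataRef) -> MemtableRef {
        Arc::new(NoopMemtable {
            id: self.next_id.fetch_add(1, Ordering::Relaxed),
        })
    }
}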
diff --git a/src/mito2/src/metadata.rs b/src/mito2/src/metadata.rs
index 627659b79d18..84e196bc105c 100644
--- a/src/mito2/src/metadata.rs
+++ b/src/mito2/src/metadata.rs
@@ -14,12 +14,17 @@

 //! Metadata of mito regions.

+use std::collections::{HashMap, HashSet};
 use std::sync::Arc;

+use datatypes::prelude::DataType;
 use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
+use serde::de::Error;
 use serde::{Deserialize, Deserializer, Serialize};
+use snafu::{ensure, OptionExt, ResultExt};
 use store_api::storage::{ColumnId, RegionId};

+use crate::error::{InvalidMetaSnafu, InvalidSchemaSnafu, Result};
 use crate::region::VersionNumber;

 #[cfg_attr(doc, aquamarine::aquamarine)]
@@ -49,9 +54,16 @@ use crate::region::VersionNumber;
 /// ```
 #[derive(Debug, Clone, PartialEq, Eq, Serialize)]
 pub struct RegionMetadata {
-    /// Latest schema of this region
+    /// Latest schema constructed from [column_metadatas](RegionMetadata::column_metadatas).
     #[serde(skip)]
     pub schema: SchemaRef,
+    /// Id of the time index column.
+    #[serde(skip)]
+    time_index: ColumnId,
+    /// Map column id to column's index in [column_metadatas](RegionMetadata::column_metadatas).
+    #[serde(skip)]
+    id_to_index: HashMap<ColumnId, usize>,
+
     /// Columns in the region. Has the same order as columns
     /// in [schema](RegionMetadata::schema).
     pub column_metadatas: Vec<ColumnMetadata>,
@@ -67,7 +79,7 @@ pub struct RegionMetadata {
 pub type RegionMetadataRef = Arc<RegionMetadata>;

 impl<'de> Deserialize<'de> for RegionMetadata {
-    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
     where
         D: Deserializer<'de>,
     {
@@ -80,49 +92,163 @@ impl<'de> Deserialize<'de> for RegionMetadata {
             region_id: RegionId,
         }

-        let region_metadata_without_schema =
-            RegionMetadataWithoutSchema::deserialize(deserializer)?;
+        let without_schema = RegionMetadataWithoutSchema::deserialize(deserializer)?;
+        let skipped =
+            SkippedFields::new(&without_schema.column_metadatas).map_err(D::Error::custom)?;

-        let column_schemas = region_metadata_without_schema
-            .column_metadatas
+        Ok(Self {
+            schema: skipped.schema,
+            time_index: skipped.time_index,
+            id_to_index: skipped.id_to_index,
+            column_metadatas: without_schema.column_metadatas,
+            version: without_schema.version,
+            primary_key: without_schema.primary_key,
+            region_id: without_schema.region_id,
+        })
+    }
+}
+
+/// Fields skipped in serialization.
+struct SkippedFields {
+    /// Last schema.
+    schema: SchemaRef,
+    /// Id of the time index column.
+    time_index: ColumnId,
+    /// Map column id to column's index in [column_metadatas](RegionMetadata::column_metadatas).
+    id_to_index: HashMap<ColumnId, usize>,
+}
+
+impl SkippedFields {
+    /// Constructs skipped fields from `column_metadatas`.
+    fn new(column_metadatas: &[ColumnMetadata]) -> Result<SkippedFields> {
+        let column_schemas = column_metadatas
             .iter()
             .map(|column_metadata| column_metadata.column_schema.clone())
             .collect();
-        let schema = Arc::new(Schema::new(column_schemas));
+        let schema = Arc::new(Schema::try_new(column_schemas).context(InvalidSchemaSnafu)?);
+        let time_index = column_metadatas
+            .iter()
+            .find_map(|col| {
+                if col.semantic_type == SemanticType::Timestamp {
+                    Some(col.column_id)
+                } else {
+                    None
+                }
+            })
+            .context(InvalidMetaSnafu {
+                reason: "time index not found",
+            })?;
+        let id_to_index = column_metadatas
+            .iter()
+            .enumerate()
+            .map(|(idx, col)| (col.column_id, idx))
+            .collect();

-        Ok(Self {
+        Ok(SkippedFields {
             schema,
-            column_metadatas: region_metadata_without_schema.column_metadatas,
-            version: region_metadata_without_schema.version,
-            primary_key: region_metadata_without_schema.primary_key,
-            region_id: region_metadata_without_schema.region_id,
+            time_index,
+            id_to_index,
         })
     }
 }

+impl RegionMetadata {
+    /// Checks whether the metadata is valid.
+    fn validate(&self) -> Result<()> {
+        // Id to name.
+        let mut id_names = HashMap::with_capacity(self.column_metadatas.len());
+        for col in &self.column_metadatas {
+            // Validate each column.
+            col.validate()?;
+
+            // Check whether column id is duplicated. We already check column name
+            // is unique in `Schema` so we only check column id here.
+            ensure!(
+                !id_names.contains_key(&col.column_id),
+                InvalidMetaSnafu {
+                    reason: format!(
+                        "column {} and {} have the same column id",
+                        id_names[&col.column_id], col.column_schema.name
+                    ),
+                }
+            );
+            id_names.insert(col.column_id, &col.column_schema.name);
+        }
+
+        // Checks there is only one time index.
+        let num_time_index = self
+            .column_metadatas
+            .iter()
+            .filter(|col| col.semantic_type == SemanticType::Timestamp)
+            .count();
+        ensure!(
+            num_time_index == 1,
+            InvalidMetaSnafu {
+                reason: format!("expect only one time index, found {}", num_time_index),
+            }
+        );
+
+        if !self.primary_key.is_empty() {
+            let mut pk_ids = HashSet::with_capacity(self.primary_key.len());
+            // Checks column ids in the primary key are valid.
+            for column_id in &self.primary_key {
+                // Checks whether the column id exists.
+                ensure!(
+                    id_names.contains_key(column_id),
+                    InvalidMetaSnafu {
+                        reason: format!("unknown column id {}", column_id),
+                    }
+                );
+
+                // Checks duplicate.
+                ensure!(
+                    !pk_ids.contains(&column_id),
+                    InvalidMetaSnafu {
+                        reason: format!("duplicate column {} in primary key", id_names[column_id]),
+                    }
+                );
+
+                // Checks this is not a time index column.
+                ensure!(
+                    *column_id != self.time_index,
+                    InvalidMetaSnafu {
+                        reason: format!(
+                            "column {} is already a time index column",
+                            id_names[column_id]
+                        ),
+                    }
+                );
+
+                pk_ids.insert(column_id);
+            }
+        }
+
+        Ok(())
+    }
+}
+
+/// Builder to build [RegionMetadata].
 pub struct RegionMetadataBuilder {
-    schema: SchemaRef,
-    column_metadatas: Vec<ColumnMetadata>,
+    region_id: RegionId,
     version: VersionNumber,
+    column_metadatas: Vec<ColumnMetadata>,
     primary_key: Vec<ColumnId>,
-    region_id: RegionId,
 }

 impl RegionMetadataBuilder {
+    /// Returns a new builder.
     pub fn new(id: RegionId, version: VersionNumber) -> Self {
         Self {
-            schema: Arc::new(Schema::new(vec![])),
-            column_metadatas: vec![],
+            region_id: id,
             version,
+            column_metadatas: vec![],
             primary_key: vec![],
-            region_id: id,
         }
     }

     /// Create a builder from existing [RegionMetadata].
     pub fn from_existing(existing: RegionMetadata, new_version: VersionNumber) -> Self {
         Self {
-            schema: existing.schema,
             column_metadatas: existing.column_metadatas,
             version: new_version,
             primary_key: existing.primary_key,
@@ -130,30 +256,35 @@ impl RegionMetadataBuilder {
         }
     }

-    /// Add a column metadata to this region metadata.
-    /// This method will check the semantic type and add it to primary keys automatically.
-    pub fn add_column_metadata(mut self, column_metadata: ColumnMetadata) -> Self {
-        if column_metadata.semantic_type == SemanticType::Tag {
-            self.primary_key.push(column_metadata.column_id);
-        }
+    /// Push a new column metadata to this region's metadata.
+    pub fn push_column_metadata(&mut self, column_metadata: ColumnMetadata) -> &mut Self {
         self.column_metadatas.push(column_metadata);
         self
     }

-    pub fn build(self) -> RegionMetadata {
-        let schema = Arc::new(Schema::new(
-            self.column_metadatas
-                .iter()
-                .map(|column_metadata| column_metadata.column_schema.clone())
-                .collect(),
-        ));
-        RegionMetadata {
-            schema,
+    /// Set the primary key of the region.
+    pub fn primary_key(&mut self, key: Vec<ColumnId>) -> &mut Self {
+        self.primary_key = key;
+        self
+    }
+
+    /// Consume the builder and build a [RegionMetadata].
+    pub fn build(self) -> Result<RegionMetadata> {
+        let skipped = SkippedFields::new(&self.column_metadatas)?;
+
+        let meta = RegionMetadata {
+            schema: skipped.schema,
+            time_index: skipped.time_index,
+            id_to_index: skipped.id_to_index,
             column_metadatas: self.column_metadatas,
             version: self.version,
             primary_key: self.primary_key,
             region_id: self.region_id,
-        }
+        };
+
+        meta.validate()?;
+
+        Ok(meta)
     }
 }

@@ -168,6 +299,22 @@ pub struct ColumnMetadata {
     pub column_id: ColumnId,
 }

+impl ColumnMetadata {
+    /// Checks whether it is a valid column.
+    pub fn validate(&self) -> Result<()> {
+        if self.semantic_type == SemanticType::Timestamp {
+            ensure!(
+                self.column_schema.data_type.is_timestamp_compatible(),
+                InvalidMetaSnafu {
+                    reason: format!("{} is not timestamp compatible", self.column_schema.name),
+                }
+            );
+        }
+
+        Ok(())
+    }
+}
+
 /// The semantic type of one column
 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
 pub enum SemanticType {
@@ -185,28 +332,34 @@ mod test {

     use super::*;

+    fn create_builder() -> RegionMetadataBuilder {
+        RegionMetadataBuilder::new(RegionId::new(1234, 5678), 9)
+    }
+
     fn build_test_region_metadata() -> RegionMetadata {
-        let builder = RegionMetadataBuilder::new(RegionId::new(1234, 5678), 9);
-        let builder = builder.add_column_metadata(ColumnMetadata {
-            column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false),
-            semantic_type: SemanticType::Tag,
-            column_id: 1,
-        });
-        let builder = builder.add_column_metadata(ColumnMetadata {
-            column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
-            semantic_type: SemanticType::Field,
-            column_id: 2,
-        });
-        let builder = builder.add_column_metadata(ColumnMetadata {
-            column_schema: ColumnSchema::new(
-                "c",
-                ConcreteDataType::timestamp_millisecond_datatype(),
-                false,
-            ),
-            semantic_type: SemanticType::Timestamp,
-            column_id: 3,
-        });
-        builder.build()
+        let mut builder = create_builder();
+        builder
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false),
+                semantic_type: SemanticType::Tag,
+                column_id: 1,
+            })
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
+                semantic_type: SemanticType::Field,
+                column_id: 2,
+            })
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new(
+                    "c",
+                    ConcreteDataType::timestamp_millisecond_datatype(),
+                    false,
+                ),
+                semantic_type: SemanticType::Timestamp,
+                column_id: 3,
+            })
+            .primary_key(vec![1]);
+        builder.build().unwrap()
     }

     #[test]
@@ -216,4 +369,170 @@ mod test {
         let deserialized: RegionMetadata = serde_json::from_str(&serialized).unwrap();
         assert_eq!(region_metadata, deserialized);
     }
+
+    #[test]
+    fn test_column_metadata_validate() {
+        let mut builder = create_builder();
+        let col = ColumnMetadata {
+            column_schema: ColumnSchema::new("ts", ConcreteDataType::string_datatype(), false),
+            semantic_type: SemanticType::Timestamp,
+            column_id: 1,
+        };
+        col.validate().unwrap_err();
+
+        builder.push_column_metadata(col);
+        let err = builder.build().unwrap_err();
+        assert!(
+            err.to_string().contains("ts is not timestamp compatible"),
+            "unexpected err: {}",
+            err
+        );
+    }
+
+    #[test]
+    fn test_empty_region_metadata() {
+        let builder = create_builder();
+        let err = builder.build().unwrap_err();
+        // A region must have a time index.
+        assert!(
+            err.to_string().contains("time index not found"),
+            "unexpected err: {}",
+            err
+        );
+    }
+
+    #[test]
+    fn test_same_column_id() {
+        let mut builder = create_builder();
+        builder
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false),
+                semantic_type: SemanticType::Tag,
+                column_id: 1,
+            })
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new(
+                    "b",
+                    ConcreteDataType::timestamp_millisecond_datatype(),
+                    false,
+                ),
+                semantic_type: SemanticType::Timestamp,
+                column_id: 1,
+            });
+        let err = builder.build().unwrap_err();
+        assert!(
+            err.to_string()
+                .contains("column a and b have the same column id"),
+            "unexpected err: {}",
+            err
+        );
+    }
+
+    #[test]
+    fn test_duplicate_time_index() {
+        let mut builder = create_builder();
+        builder
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new(
+                    "a",
+                    ConcreteDataType::timestamp_millisecond_datatype(),
+                    false,
+                ),
+                semantic_type: SemanticType::Timestamp,
+                column_id: 1,
+            })
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new(
+                    "b",
+                    ConcreteDataType::timestamp_millisecond_datatype(),
+                    false,
+                ),
+                semantic_type: SemanticType::Timestamp,
+                column_id: 2,
+            });
+        let err = builder.build().unwrap_err();
+        assert!(
+            err.to_string().contains("expect only one time index"),
+            "unexpected err: {}",
+            err
+        );
+    }
+
+    #[test]
+    fn test_unknown_primary_key() {
+        let mut builder = create_builder();
+        builder
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
+                semantic_type: SemanticType::Tag,
+                column_id: 1,
+            })
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new(
+                    "b",
+                    ConcreteDataType::timestamp_millisecond_datatype(),
+                    false,
+                ),
+                semantic_type: SemanticType::Timestamp,
+                column_id: 2,
+            })
+            .primary_key(vec![3]);
+        let err = builder.build().unwrap_err();
+        assert!(
+            err.to_string().contains("unknown column id 3"),
+            "unexpected err: {}",
+            err
+        );
+    }
+
+    #[test]
+    fn test_same_primary_key() {
+        let mut builder = create_builder();
+        builder
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
+                semantic_type: SemanticType::Tag,
+                column_id: 1,
+            })
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new(
+                    "b",
+                    ConcreteDataType::timestamp_millisecond_datatype(),
+                    false,
+                ),
+                semantic_type: SemanticType::Timestamp,
+                column_id: 2,
+            })
+            .primary_key(vec![1, 1]);
+        let err = builder.build().unwrap_err();
+        assert!(
+            err.to_string()
+                .contains("duplicate column a in primary key"),
+            "unexpected err: {}",
+            err
+        );
+    }
+
+    #[test]
+    fn test_in_time_index() {
+        let mut builder = create_builder();
+        builder
+            .push_column_metadata(ColumnMetadata {
+                column_schema: ColumnSchema::new(
+                    "ts",
+                    ConcreteDataType::timestamp_millisecond_datatype(),
+                    false,
+                ),
+                semantic_type: SemanticType::Timestamp,
+                column_id: 1,
+            })
+            .primary_key(vec![1]);
+        let err = builder.build().unwrap_err();
+        assert!(
+            err.to_string()
+                .contains("column ts is already a time index column"),
+            "unexpected err: {}",
+            err
+        );
+    }
 }
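Taken together, the builder and `validate` above give a construction path like the following sketch, which is essentially what `build_test_region_metadata` does in the tests; `ConcreteDataType` is assumed to come from `datatypes::prelude` as elsewhere in the codebase, and the column names are illustrative:

use std::sync::Arc;

use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use store_api::storage::RegionId;

use crate::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef, SemanticType};

fn example_metadata() -> RegionMetadataRef {
    let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1), 0);
    builder
        .push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new("host", ConcreteDataType::string_datatype(), false),
            semantic_type: SemanticType::Tag,
            column_id: 1,
        })
        .push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            ),
            semantic_type: SemanticType::Timestamp,
            column_id: 2,
        })
        .primary_key(vec![1]);
    // `build` runs `validate` and fails on inconsistent metadata
    // (missing time index, duplicate ids, bad primary key, ...).
    Arc::new(builder.build().unwrap())
}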
diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs
index d3b034d23a01..c08a6c061b24 100644
--- a/src/mito2/src/region.rs
+++ b/src/mito2/src/region.rs
@@ -21,7 +21,11 @@ use std::sync::{Arc, RwLock};

 use store_api::storage::RegionId;

-use crate::region::version::VersionControlRef;
+use crate::memtable::MemtableBuilderRef;
+use crate::metadata::RegionMetadataRef;
+use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef};
+
+/// Type to store region version.
 pub type VersionNumber = u32;

 /// Metadata and runtime status of a region.
@@ -39,3 +43,32 @@ pub(crate) struct RegionMap {
 }

 pub(crate) type RegionMapRef = Arc<RegionMap>;
+
+/// [MitoRegion] builder.
+pub(crate) struct RegionBuilder {
+    metadata: RegionMetadataRef,
+    memtable_builder: MemtableBuilderRef,
+}
+
+impl RegionBuilder {
+    /// Returns a new builder.
+    pub(crate) fn new(
+        metadata: RegionMetadataRef,
+        memtable_builder: MemtableBuilderRef,
+    ) -> RegionBuilder {
+        RegionBuilder {
+            metadata,
+            memtable_builder,
+        }
+    }
+
+    /// Builds a new region.
+    pub(crate) fn build(self) -> MitoRegion {
+        let mutable = self.memtable_builder.build(&self.metadata);
+
+        let version = VersionBuilder::new(self.metadata, mutable).build();
+        let version_control = Arc::new(VersionControl::new(version));
+
+        MitoRegion { version_control }
+    }
+}
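A sketch of how the new pieces compose: `RegionBuilder` asks the memtable builder for the initial mutable memtable and wraps the resulting `Version` in a `VersionControl`. The memtable builder used here is the test-only `MetaOnlyBuilder` added to test_util.rs at the end of this patch, and the import paths and `MitoRegion` visibility are assumptions (everything involved is crate-private), so this would only live inside the crate:

use std::sync::Arc;

use crate::memtable::MemtableBuilderRef;
use crate::region::{MitoRegion, RegionBuilder};
use crate::test_util::MetaOnlyBuilder;

fn build_region(metadata: crate::metadata::RegionMetadataRef) -> MitoRegion {
    // Any MemtableBuilder works; the test-only builder just hands out ids.
    let memtable_builder: MemtableBuilderRef = Arc::new(MetaOnlyBuilder::default());
    RegionBuilder::new(metadata, memtable_builder).build()
}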
diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs
index d3205432ec4e..aa4960a88cd8 100644
--- a/src/mito2/src/region/version.rs
+++ b/src/mito2/src/region/version.rs
@@ -25,8 +25,74 @@

 use std::sync::Arc;

+use arc_swap::ArcSwap;
+use store_api::manifest::ManifestVersion;
+use store_api::storage::SequenceNumber;
+
+use crate::memtable::version::{MemtableVersion, MemtableVersionRef};
+use crate::memtable::MemtableRef;
+use crate::metadata::RegionMetadataRef;
+use crate::sst::version::{SstVersion, SstVersionRef};
+
 /// Controls version of in memory metadata for a region.
 #[derive(Debug)]
-pub(crate) struct VersionControl {}
+pub(crate) struct VersionControl {
+    /// Latest version.
+    version: ArcSwap<Version>,
+}
+
+impl VersionControl {
+    /// Returns a new [VersionControl] with a specific `version`.
+    pub(crate) fn new(version: Version) -> VersionControl {
+        VersionControl {
+            version: ArcSwap::new(Arc::new(version)),
+        }
+    }
+}

 pub(crate) type VersionControlRef = Arc<VersionControl>;
+
+/// Static metadata of a region.
+#[derive(Clone, Debug)]
+pub(crate) struct Version {
+    /// Metadata of the region.
+    ///
+    /// Altering metadata isn't frequent, storing metadata in Arc to allow sharing
+    /// metadata and reuse metadata when creating a new `Version`.
+    metadata: RegionMetadataRef,
+    /// Mutable and immutable memtables.
+    ///
+    /// Wrapped in Arc to make clone of `Version` much cheaper.
+    memtables: MemtableVersionRef,
+    /// SSTs of the region.
+    ssts: SstVersionRef,
+    /// Inclusive max sequence of flushed data.
+    flushed_sequence: SequenceNumber,
+    /// Current version of region manifest.
+    manifest_version: ManifestVersion,
+}
+
+/// Version builder.
+pub(crate) struct VersionBuilder {
+    metadata: RegionMetadataRef,
+    /// Mutable memtable.
+    mutable: MemtableRef,
+}
+
+impl VersionBuilder {
+    /// Returns a new builder.
+    pub(crate) fn new(metadata: RegionMetadataRef, mutable: MemtableRef) -> VersionBuilder {
+        VersionBuilder { metadata, mutable }
+    }
+
+    /// Builds a new [Version] from the builder.
+    pub(crate) fn build(self) -> Version {
+        Version {
+            metadata: self.metadata,
+            memtables: Arc::new(MemtableVersion::new(self.mutable)),
+            ssts: Arc::new(SstVersion::new()),
+            flushed_sequence: 0,
+            manifest_version: 0,
+        }
+    }
+}
diff --git a/src/mito2/src/sst.rs b/src/mito2/src/sst.rs
new file mode 100644
index 000000000000..e0527b787c71
--- /dev/null
+++ b/src/mito2/src/sst.rs
@@ -0,0 +1,18 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Sorted strings tables.
+
+pub mod file;
+pub(crate) mod version;
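`VersionControl` keeps the latest `Version` behind an `ArcSwap`, which gives readers cheap, consistent snapshots while a writer publishes a whole new `Version`. A standalone sketch of that copy-on-write idiom using the arc-swap crate directly; the `Version` type here is a stand-in, and how updates will eventually be applied is an assumption rather than API from this patch:

use std::sync::Arc;

use arc_swap::ArcSwap;

#[derive(Debug, Clone)]
struct Version {
    manifest_version: u64,
}

fn main() {
    let control = ArcSwap::new(Arc::new(Version { manifest_version: 0 }));

    // Reader: snapshot the current version; later swaps don't affect it.
    let snapshot = control.load_full();

    // Writer: build a new version and publish it atomically.
    control.store(Arc::new(Version { manifest_version: 1 }));

    assert_eq!(snapshot.manifest_version, 0);
    assert_eq!(control.load().manifest_version, 1);
}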
diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs
new file mode 100644
index 000000000000..43655c1a9a49
--- /dev/null
+++ b/src/mito2/src/sst/file.rs
@@ -0,0 +1,185 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Structures to describe metadata of files.
+
+use std::fmt;
+use std::str::FromStr;
+use std::sync::atomic::AtomicBool;
+use std::sync::Arc;
+
+use common_time::Timestamp;
+use serde::{Deserialize, Serialize};
+use snafu::{ResultExt, Snafu};
+use store_api::storage::RegionId;
+use uuid::Uuid;
+
+/// Type to store SST level.
+pub type Level = u8;
+/// Maximum level of SSTs.
+pub const MAX_LEVEL: Level = 2;
+
+#[derive(Debug, Snafu, PartialEq)]
+pub struct ParseIdError {
+    source: uuid::Error,
+}
+
+/// Unique id for [SST File].
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
+pub struct FileId(Uuid);
+
+impl FileId {
+    /// Returns a new unique [FileId] randomly.
+    pub fn random() -> FileId {
+        FileId(Uuid::new_v4())
+    }
+
+    /// Parses id from string.
+    pub fn parse_str(input: &str) -> std::result::Result<FileId, ParseIdError> {
+        Uuid::parse_str(input).map(FileId).context(ParseIdSnafu)
+    }
+
+    /// Append `.parquet` to file id to make a complete file name
+    pub fn as_parquet(&self) -> String {
+        format!("{}{}", self.0.hyphenated(), ".parquet")
+    }
+}
+
+impl fmt::Display for FileId {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{}", self.0)
+    }
+}
+
+impl FromStr for FileId {
+    type Err = ParseIdError;
+
+    fn from_str(s: &str) -> std::result::Result<FileId, ParseIdError> {
+        FileId::parse_str(s)
+    }
+}
+
+/// Time range of a SST file.
+pub type FileTimeRange = (Timestamp, Timestamp);
+
+/// Metadata of a SST file.
+#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
+#[serde(default)]
+pub struct FileMeta {
+    /// Region of file.
+    pub region_id: RegionId,
+    /// Compared to normal file names, FileId ignores the extension
+    pub file_id: FileId,
+    /// Timestamp range of file.
+    pub time_range: FileTimeRange,
+    /// SST level of the file.
+    pub level: Level,
+    /// Size of the file.
+    pub file_size: u64,
+}
+
+/// Handle to a SST file.
+#[derive(Clone)]
+pub struct FileHandle {
+    inner: Arc<FileHandleInner>,
+}
+
+impl fmt::Debug for FileHandle {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("FileHandle")
+            .field("file_id", &self.inner.meta.file_id)
+            .field("region_id", &self.inner.meta.region_id)
+            .field("time_range", &self.inner.meta.time_range)
+            .field("size", &self.inner.meta.file_size)
+            .field("level", &self.inner.meta.level)
+            .field("compacting", &self.inner.compacting)
+            .field("deleted", &self.inner.deleted)
+            .finish()
+    }
+}
+
+/// Inner data of [FileHandle].
+///
+/// Contains meta of the file, and other mutable info like whether the file is compacting.
+struct FileHandleInner {
+    meta: FileMeta,
+    compacting: AtomicBool,
+    deleted: AtomicBool,
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_file_id() {
+        let id = FileId::random();
+        let uuid_str = id.to_string();
+        assert_eq!(id.0.to_string(), uuid_str);
+
+        let parsed = FileId::parse_str(&uuid_str).unwrap();
+        assert_eq!(id, parsed);
+        let parsed = uuid_str.parse().unwrap();
+        assert_eq!(id, parsed);
+    }
+
+    #[test]
+    fn test_file_id_serialization() {
+        let id = FileId::random();
+        let json = serde_json::to_string(&id).unwrap();
+        assert_eq!(format!("\"{id}\""), json);
+
+        let parsed = serde_json::from_str(&json).unwrap();
+        assert_eq!(id, parsed);
+    }
+
+    #[test]
+    fn test_file_id_as_parquet() {
+        let id = FileId::from_str("67e55044-10b1-426f-9247-bb680e5fe0c8").unwrap();
+        assert_eq!(
+            "67e55044-10b1-426f-9247-bb680e5fe0c8.parquet",
+            id.as_parquet()
+        );
+    }
+
+    fn create_file_meta(file_id: FileId, level: Level) -> FileMeta {
+        FileMeta {
+            region_id: 0.into(),
+            file_id,
+            time_range: FileTimeRange::default(),
+            level,
+            file_size: 0,
+        }
+    }
+
+    #[test]
+    fn test_deserialize_file_meta() {
+        let file_meta = create_file_meta(FileId::random(), 0);
+        let serialized_file_meta = serde_json::to_string(&file_meta).unwrap();
+        let deserialized_file_meta = serde_json::from_str(&serialized_file_meta);
+        assert_eq!(file_meta, deserialized_file_meta.unwrap());
+    }
+
+    #[test]
+    fn test_deserialize_from_string() {
+        let json_file_meta = "{\"region_id\":0,\"file_id\":\"bc5896ec-e4d8-4017-a80d-f2de73188d55\",\
+        \"time_range\":[{\"value\":0,\"unit\":\"Millisecond\"},{\"value\":0,\"unit\":\"Millisecond\"}],\"level\":0}";
+        let file_meta = create_file_meta(
+            FileId::from_str("bc5896ec-e4d8-4017-a80d-f2de73188d55").unwrap(),
+            0,
+        );
+        let deserialized_file_meta: FileMeta = serde_json::from_str(json_file_meta).unwrap();
+        assert_eq!(file_meta, deserialized_file_meta);
+    }
+}
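A short sketch of how `FileId` is intended to be used when naming SST files, mirroring the unit tests above; the function is illustrative only:

use crate::sst::file::FileId;

fn example() {
    let file_id = FileId::random();

    // String round trip (Display / FromStr).
    let text = file_id.to_string();
    let parsed: FileId = text.parse().unwrap();
    assert_eq!(file_id, parsed);

    // Object-store key for the SST, e.g. "<uuid>.parquet".
    let file_name = file_id.as_parquet();
    assert!(file_name.ends_with(".parquet"));
}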
diff --git a/src/mito2/src/sst/version.rs b/src/mito2/src/sst/version.rs
new file mode 100644
index 000000000000..9c309a4b36ff
--- /dev/null
+++ b/src/mito2/src/sst/version.rs
@@ -0,0 +1,77 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! SST version.
+use std::collections::HashMap;
+use std::fmt;
+use std::sync::Arc;
+
+use crate::sst::file::{FileHandle, FileId, Level, MAX_LEVEL};
+
+/// A version of all SSTs in a region.
+#[derive(Debug)]
+pub(crate) struct SstVersion {
+    /// SST metadata organized by levels.
+    levels: LevelMetaVec,
+}
+
+pub(crate) type SstVersionRef = Arc<SstVersion>;
+
+impl SstVersion {
+    /// Returns a new [SstVersion].
+    pub(crate) fn new() -> SstVersion {
+        SstVersion {
+            levels: new_level_meta_vec(),
+        }
+    }
+}
+
+// We only have a fixed number of levels, so we use an array to hold the elements. This implementation
+// detail of LevelMetaVec should not be exposed to the user of [LevelMetas].
+type LevelMetaVec = [LevelMeta; MAX_LEVEL as usize];
+
+/// Metadata of files in the same SST level.
+pub struct LevelMeta {
+    /// Level number.
+    level: Level,
+    /// Handles of SSTs in this level.
+    files: HashMap<FileId, FileHandle>,
+}
+
+impl LevelMeta {
+    /// Returns an empty meta of a specific `level`.
+    pub(crate) fn new(level: Level) -> LevelMeta {
+        LevelMeta {
+            level,
+            files: HashMap::new(),
+        }
+    }
+}
+
+impl fmt::Debug for LevelMeta {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("LevelMeta")
+            .field("level", &self.level)
+            .field("files", &self.files.keys())
+            .finish()
+    }
+}
+
+fn new_level_meta_vec() -> LevelMetaVec {
+    (0u8..MAX_LEVEL)
+        .map(LevelMeta::new)
+        .collect::<Vec<_>>()
+        .try_into()
+        .unwrap() // safety: LevelMetaVec is a fixed length array with length MAX_LEVEL
+}
diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs
index 4c7b62efaafe..3fe3e02d990b 100644
--- a/src/mito2/src/test_util.rs
+++ b/src/mito2/src/test_util.rs
@@ -14,6 +14,7 @@

 //! Utilities for testing.

+use std::sync::atomic::{AtomicU32, Ordering};
 use std::sync::Arc;

 use common_datasource::compression::CompressionType;
@@ -29,7 +30,8 @@ use crate::engine::MitoEngine;
 use crate::error::Result;
 use crate::manifest::manager::RegionManifestManager;
 use crate::manifest::options::RegionManifestOptions;
-use crate::metadata::RegionMetadata;
+use crate::memtable::{Memtable, MemtableBuilder, MemtableId, MemtableRef};
+use crate::metadata::{RegionMetadata, RegionMetadataRef};
 use crate::worker::WorkerGroup;

 /// Env to test mito engine.
@@ -97,3 +99,37 @@ impl TestEnv {
         RegionManifestManager::new(manifest_opts).await
     }
 }
+
+/// Memtable that is only used for testing metadata.
+#[derive(Debug, Default)]
+pub struct MetaOnlyMemtable {
+    /// Id of this memtable.
+    id: MemtableId,
+}
+
+impl MetaOnlyMemtable {
+    /// Returns a new memtable with a specific `id`.
+    pub fn new(id: MemtableId) -> MetaOnlyMemtable {
+        MetaOnlyMemtable { id }
+    }
+}
+
+impl Memtable for MetaOnlyMemtable {
+    fn id(&self) -> MemtableId {
+        self.id
+    }
+}
+
+#[derive(Debug, Default)]
+pub struct MetaOnlyBuilder {
+    /// Next memtable id.
+    next_id: AtomicU32,
+}
+
+impl MemtableBuilder for MetaOnlyBuilder {
+    fn build(&self, _metadata: &RegionMetadataRef) -> MemtableRef {
+        Arc::new(MetaOnlyMemtable::new(
+            self.next_id.fetch_add(1, Ordering::Relaxed),
+        ))
+    }
+}
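A sketch of how a test might use the new helper: each `build` call hands out the next memtable id, which is all the metadata-only tests need. The `region_metadata` parameter stands for any `RegionMetadataRef` fixture and the function name is hypothetical:

use crate::memtable::{Memtable, MemtableBuilder};
use crate::metadata::RegionMetadataRef;
use crate::test_util::MetaOnlyBuilder;

fn check_ids(region_metadata: RegionMetadataRef) {
    let builder = MetaOnlyBuilder::default();
    let first = builder.build(&region_metadata);
    let second = builder.build(&region_metadata);
    // Ids increase monotonically, starting from 0.
    assert_eq!(0, first.id());
    assert_eq!(1, second.id());
}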