Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mito): Define Version and metadata builders for mito2 #1989

Merged
merged 10 commits into from
Jul 19, 2023
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions src/datatypes/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ pub enum Error {

#[snafu(display("Invalid timestamp precision: {}", precision))]
InvalidTimestampPrecision { precision: u64, location: Location },

#[snafu(display("Column {} already exists", column))]
evenyag marked this conversation as resolved.
Show resolved Hide resolved
DuplicateColumn { column: String, location: Location },
}

impl ErrorExt for Error {
Expand Down
29 changes: 25 additions & 4 deletions src/datatypes/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use datafusion_common::DFSchemaRef;
use snafu::{ensure, ResultExt};

use crate::data_type::DataType;
use crate::error::{self, Error, ProjectArrowSchemaSnafu, Result};
use crate::error::{self, DuplicateColumnSnafu, Error, ProjectArrowSchemaSnafu, Result};
pub use crate::schema::column_schema::{ColumnSchema, Metadata, COMMENT_KEY, TIME_INDEX_KEY};
pub use crate::schema::constraint::ColumnDefaultConstraint;
pub use crate::schema::raw::RawSchema;
Expand Down Expand Up @@ -211,8 +211,7 @@ impl SchemaBuilder {
validate_timestamp_index(&self.column_schemas, timestamp_index)?;
}

let _ = self
.metadata
self.metadata
.insert(VERSION_KEY.to_string(), self.version.to_string());

let arrow_schema = ArrowSchema::new(self.fields).with_metadata(self.metadata);
Expand Down Expand Up @@ -243,7 +242,14 @@ fn collect_fields(column_schemas: &[ColumnSchema]) -> Result<FieldsAndIndices> {
}
let field = Field::try_from(column_schema)?;
fields.push(field);
let _ = name_to_index.insert(column_schema.name.clone(), index);
ensure!(
name_to_index
.insert(column_schema.name.clone(), index)
.is_none(),
DuplicateColumnSnafu {
column: &column_schema.name,
}
);
}

Ok(FieldsAndIndices {
Expand Down Expand Up @@ -382,6 +388,21 @@ mod tests {
assert_eq!(column_schemas, schema.column_schemas());
}

// Two columns sharing the name "col1" must be rejected when building a schema,
// and the reported error must be the `DuplicateColumn` variant.
#[test]
fn test_schema_duplicate_column() {
    let duplicated_columns = vec![
        ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), false),
        ColumnSchema::new("col1", ConcreteDataType::float64_datatype(), true),
    ];

    // Construction is expected to fail; `unwrap_err` panics if it succeeds.
    let result = Schema::try_new(duplicated_columns);
    let err = result.unwrap_err();

    let is_duplicate_column = matches!(err, Error::DuplicateColumn { .. });
    assert!(is_duplicate_column, "expect DuplicateColumn, found {}", err);
}

#[test]
fn test_metadata() {
let column_schemas = vec![ColumnSchema::new(
Expand Down
1 change: 1 addition & 0 deletions src/mito2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ storage = { path = "../storage" }
store-api = { path = "../store-api" }
table = { path = "../table" }
tokio.workspace = true
uuid.workspace = true

[dev-dependencies]
common-test-util = { path = "../common/test-util" }
Expand Down
14 changes: 13 additions & 1 deletion src/mito2/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,15 @@ pub enum Error {
location
))]
InitialMetadata { location: Location },

#[snafu(display("Invalid metadata, {}, location: {}", reason, location))]
InvalidMeta { reason: String, location: Location },

#[snafu(display("Invalid schema, source: {}, location: {}", source, location))]
InvalidSchema {
source: datatypes::error::Error,
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand All @@ -114,7 +123,10 @@ impl ErrorExt for Error {
CompressObject { .. } | DecompressObject { .. } | SerdeJson { .. } | Utf8 { .. } => {
StatusCode::Unexpected
}
InvalidScanIndex { .. } | InitialMetadata { .. } => StatusCode::InvalidArguments,
InvalidScanIndex { .. }
| InitialMetadata { .. }
| InvalidMeta { .. }
| InvalidSchema { .. } => StatusCode::InvalidArguments,
RegionMetadataNotFound { .. } | Join { .. } | WorkerStopped { .. } | Recv { .. } => {
StatusCode::Internal
}
Expand Down
12 changes: 8 additions & 4 deletions src/mito2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,14 @@ pub mod error;
#[allow(unused_variables)]
pub mod manifest;
#[allow(dead_code)]
pub mod memtable;
#[allow(dead_code)]
pub mod metadata;
#[allow(dead_code)]
mod region;
#[allow(dead_code)]
pub(crate) mod sst;
#[allow(dead_code)]
mod worker;

#[cfg_attr(doc, aquamarine::aquamarine)]
Expand Down Expand Up @@ -108,7 +112,7 @@ mod worker;
/// class Version {
/// -RegionMetadataRef metadata
/// -MemtableVersionRef memtables
/// -LevelMetasRef ssts
/// -SstVersionRef ssts
/// -SequenceNumber flushed_sequence
/// -ManifestVersion manifest_version
/// }
Expand All @@ -119,7 +123,7 @@ mod worker;
/// +immutable_memtables() &[MemtableRef]
/// +freeze_mutable(MemtableRef new_mutable) MemtableVersion
/// }
/// class LevelMetas {
/// class SstVersion {
/// -LevelMetaVec levels
/// -AccessLayerRef sst_layer
/// -FilePurgerRef file_purger
Expand All @@ -146,8 +150,8 @@ mod worker;
/// VersionControl o-- Version
/// Version o-- RegionMetadata
/// Version o-- MemtableVersion
/// Version o-- LevelMetas
/// LevelMetas o-- LevelMeta
/// Version o-- SstVersion
/// SstVersion o-- LevelMeta
/// LevelMeta o-- FileHandle
/// FileHandle o-- FileMeta
/// class RegionMetadata
Expand Down
53 changes: 29 additions & 24 deletions src/mito2/src/manifest/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,27 +260,32 @@ mod test {
use crate::test_util::TestEnv;

fn basic_region_metadata() -> RegionMetadata {
let builder = RegionMetadataBuilder::new(RegionId::new(23, 33), 0);
let builder = builder.add_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 45,
});
let builder = builder.add_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("pk", ConcreteDataType::string_datatype(), false),
semantic_type: SemanticType::Tag,
column_id: 36,
});
let builder = builder.add_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("val", ConcreteDataType::float64_datatype(), false),
semantic_type: SemanticType::Field,
column_id: 251,
});
builder.build()
let mut builder = RegionMetadataBuilder::new(RegionId::new(23, 33), 0);
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 45,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("pk", ConcreteDataType::string_datatype(), false),
semantic_type: SemanticType::Tag,
column_id: 36,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"val",
ConcreteDataType::float64_datatype(),
false,
),
semantic_type: SemanticType::Field,
column_id: 251,
});
builder.build().unwrap()
}

#[tokio::test]
Expand Down Expand Up @@ -317,13 +322,13 @@ mod test {
.await
.unwrap();

let new_metadata_builder = RegionMetadataBuilder::from_existing(metadata, 1);
let new_metadata_builder = new_metadata_builder.add_column_metadata(ColumnMetadata {
let mut new_metadata_builder = RegionMetadataBuilder::from_existing(metadata, 1);
new_metadata_builder.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
semantic_type: SemanticType::Field,
column_id: 252,
});
let new_metadata = new_metadata_builder.build();
let new_metadata = new_metadata_builder.build().unwrap();

let mut action_list =
RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
Expand Down
43 changes: 43 additions & 0 deletions src/mito2/src/memtable.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Memtables are write buffers for regions.

pub(crate) mod version;

use std::fmt;
use std::sync::Arc;

use crate::metadata::RegionMetadataRef;

/// Id for memtables.
///
/// Should be unique under the same region.
pub type MemtableId = u32;

/// In-memory write buffer.
///
/// Implementors must be `Send + Sync` and implement [fmt::Debug].
pub trait Memtable: Send + Sync + fmt::Debug {
/// Returns the id of this memtable.
fn id(&self) -> MemtableId;
}

/// Shared, reference-counted handle to a [Memtable] trait object.
pub type MemtableRef = Arc<dyn Memtable>;

/// Builder to build a new [Memtable].
///
/// Implementors must be `Send + Sync` and implement [fmt::Debug].
pub trait MemtableBuilder: Send + Sync + fmt::Debug {
/// Builds a new memtable instance and returns it as a [MemtableRef].
///
/// NOTE(review): `metadata` is presumably the metadata of the region the
/// new memtable belongs to -- confirm against implementors.
fn build(&self, metadata: &RegionMetadataRef) -> MemtableRef;
}

/// Shared, reference-counted handle to a [MemtableBuilder] trait object.
pub type MemtableBuilderRef = Arc<dyn MemtableBuilder>;
40 changes: 40 additions & 0 deletions src/mito2/src/memtable/version.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Memtable version.

use std::sync::Arc;

use crate::memtable::MemtableRef;

/// A version of current memtables in a region.
///
/// Holds exactly one mutable memtable and a list of immutable memtables.
#[derive(Debug)]
pub(crate) struct MemtableVersion {
/// Mutable memtable.
mutable: MemtableRef,
/// Immutable memtables.
immutables: Vec<MemtableRef>,
}

/// Shared, reference-counted handle to a [MemtableVersion].
pub(crate) type MemtableVersionRef = Arc<MemtableVersion>;

impl MemtableVersion {
/// Returns a new [MemtableVersion] with specific mutable memtable.
pub(crate) fn new(mutable: MemtableRef) -> MemtableVersion {
MemtableVersion {
mutable,
immutables: vec![],
}
}
}
Loading