diff --git a/src/store-api/src/error.rs b/src/store-api/src/error.rs
deleted file mode 100644
index 6139232dd57c..000000000000
--- a/src/store-api/src/error.rs
+++ /dev/null
@@ -1,56 +0,0 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use common_error::ext::ErrorExt;
-use common_error::status_code::StatusCode;
-use common_macro::stack_trace_debug;
-use snafu::{Location, Snafu};
-
-use crate::storage::ColumnDescriptorBuilderError;
-
-#[derive(Snafu)]
-#[snafu(visibility(pub))]
-#[stack_trace_debug]
-pub enum Error {
-    #[snafu(display("Invalid raw region request: {err}"))]
-    InvalidRawRegionRequest { err: String, location: Location },
-
-    #[snafu(display("Invalid default constraint: {constraint}"))]
-    InvalidDefaultConstraint {
-        constraint: String,
-        source: datatypes::error::Error,
-        location: Location,
-    },
-
-    #[snafu(display("Failed to build column descriptor: "))]
-    BuildColumnDescriptor {
-        #[snafu(source)]
-        error: ColumnDescriptorBuilderError,
-        location: Location,
-    },
-}
-
-impl ErrorExt for Error {
-    fn status_code(&self) -> StatusCode {
-        match self {
-            Error::InvalidRawRegionRequest { .. } => StatusCode::InvalidArguments,
-            Error::InvalidDefaultConstraint { source, .. } => source.status_code(),
-            Error::BuildColumnDescriptor { .. } => StatusCode::Internal,
-        }
-    }
-
-    fn as_any(&self) -> &dyn std::any::Any {
-        self
-    }
-}
diff --git a/src/store-api/src/lib.rs b/src/store-api/src/lib.rs
index 46b165ba2a52..d70379459821 100644
--- a/src/store-api/src/lib.rs
+++ b/src/store-api/src/lib.rs
@@ -16,7 +16,6 @@
 //! Storage related APIs
 
 pub mod data_source;
-mod error;
 pub mod logstore;
 pub mod manifest;
 pub mod metadata;
diff --git a/src/store-api/src/storage.rs b/src/store-api/src/storage.rs
index da59e9aeaa9d..fcbf4ef09cda 100644
--- a/src/store-api/src/storage.rs
+++ b/src/store-api/src/storage.rs
@@ -14,15 +14,9 @@
 
 //! Storage APIs.
 
-mod chunk;
 pub mod consts;
 mod descriptors;
-mod engine;
-mod metadata;
-mod region;
 mod requests;
-mod responses;
-mod snapshot;
 mod types;
 
 pub use datatypes::data_type::ConcreteDataType;
@@ -30,19 +24,6 @@ pub use datatypes::schema::{
     ColumnDefaultConstraint, ColumnSchema, Schema, SchemaBuilder, SchemaRef,
 };
 
-pub use self::chunk::{Chunk, ChunkReader};
 pub use self::descriptors::*;
-pub use self::engine::{
-    CloseOptions, CompactionStrategy, CreateOptions, EngineContext, OpenOptions, StorageEngine,
-    TwcsOptions,
-};
-pub use self::metadata::RegionMeta;
-pub use self::region::{
-    CloseContext, CompactContext, FlushContext, FlushReason, Region, RegionStat, WriteContext,
-};
-pub use self::requests::{
-    AddColumn, AlterOperation, AlterRequest, GetRequest, ScanRequest, WriteRequest,
-};
-pub use self::responses::{GetResponse, ScanResponse, WriteResponse};
-pub use self::snapshot::{ReadContext, Snapshot};
-pub use self::types::{SequenceNumber, MIN_OP_TYPE};
+pub use self::requests::ScanRequest;
+pub use self::types::SequenceNumber;
diff --git a/src/store-api/src/storage/chunk.rs b/src/store-api/src/storage/chunk.rs
deleted file mode 100644
index b1db1bb6538f..000000000000
--- a/src/store-api/src/storage/chunk.rs
+++ /dev/null
@@ -1,53 +0,0 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use async_trait::async_trait;
-use common_error::ext::ErrorExt;
-use common_recordbatch::OrderOption;
-use datatypes::vectors::VectorRef;
-
-use crate::storage::SchemaRef;
-
-/// A bunch of rows in columnar format.
-#[derive(Debug)]
-pub struct Chunk {
-    pub columns: Vec<VectorRef>,
-    // TODO(yingwen): Sequences.
-}
-
-impl Chunk {
-    pub fn new(columns: Vec<VectorRef>) -> Chunk {
-        Chunk { columns }
-    }
-}
-
-/// `ChunkReader` is similar to async iterator of [Chunk].
-#[async_trait]
-pub trait ChunkReader: Send {
-    type Error: ErrorExt + Send + Sync;
-
-    /// Schema of the chunks returned by this reader.
-    /// This schema does not contain internal columns.
-    fn user_schema(&self) -> &SchemaRef;
-
-    /// Fetch next chunk from the reader.
-    async fn next_chunk(&mut self) -> Result<Option<Chunk>, Self::Error>;
-
-    // project the chunk according to required projection.
-    fn project_chunk(&self, chunk: Chunk) -> Chunk;
-
-    fn output_ordering(&self) -> Option<Vec<OrderOption>> {
-        None
-    }
-}
diff --git a/src/store-api/src/storage/consts.rs b/src/store-api/src/storage/consts.rs
index 8236fa3435bd..b4660249e5a8 100644
--- a/src/store-api/src/storage/consts.rs
+++ b/src/store-api/src/storage/consts.rs
@@ -14,19 +14,7 @@
 
 //! Constants.
 
-use crate::storage::descriptors::{ColumnFamilyId, ColumnId};
-
-// ---------- Reserved column family ids ---------------------------------------
-
-/// Column family Id for row key columns.
-///
-/// This is a virtual column family, actually row key columns are not
-/// stored in any column family.
-pub const KEY_CF_ID: ColumnFamilyId = 0;
-/// Id for default column family.
-pub const DEFAULT_CF_ID: ColumnFamilyId = 1;
-
-// -----------------------------------------------------------------------------
+use crate::storage::descriptors::ColumnId;
 
 // ---------- Reserved column ids ----------------------------------------------
 
@@ -93,9 +81,6 @@ impl ReservedColumnId {
 
 // ---------- Names reserved for internal columns and engine -------------------
 
-/// Names for default column family.
-pub const DEFAULT_CF_NAME: &str = "default";
-
 /// Name for reserved column: sequence
 pub const SEQUENCE_COLUMN_NAME: &str = "__sequence";
 
@@ -118,14 +103,6 @@ pub fn is_internal_column(name: &str) -> bool {
 
 // -----------------------------------------------------------------------------
 
-// ---------- Default options --------------------------------------------------
-
-pub const READ_BATCH_SIZE: usize = 256;
-
-pub const WRITE_ROW_GROUP_SIZE: usize = 4096;
-
-// -----------------------------------------------------------------------------
-
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/src/store-api/src/storage/descriptors.rs b/src/store-api/src/storage/descriptors.rs
index f0f4ef0f971b..9a4291acef97 100644
--- a/src/store-api/src/storage/descriptors.rs
+++ b/src/store-api/src/storage/descriptors.rs
@@ -17,12 +17,10 @@ use std::fmt;
 use derive_builder::Builder;
 use serde::{Deserialize, Serialize};
 
-use crate::storage::{consts, ColumnDefaultConstraint, ColumnSchema, ConcreteDataType};
+use crate::storage::{ColumnDefaultConstraint, ColumnSchema, ConcreteDataType};
 
 /// Id of column. Unique in each region.
 pub type ColumnId = u32;
-/// Id of column family. Unique in each region.
-pub type ColumnFamilyId = u32;
 /// Group number of one region. Unique in each region.
 pub type RegionGroup = u8;
 /// Sequence number of region inside one table. Unique in each table.
@@ -224,68 +222,6 @@ impl ColumnDescriptorBuilder {
     }
 }
 
-/// A [RowKeyDescriptor] contains information about row key.
-#[derive(Debug, Clone, PartialEq, Eq, Builder)]
-#[builder(pattern = "owned")]
-pub struct RowKeyDescriptor {
-    #[builder(default, setter(each(name = "push_column")))]
-    pub columns: Vec<ColumnDescriptor>,
-    /// Timestamp key column.
-    pub timestamp: ColumnDescriptor,
-}
-
-/// A [ColumnFamilyDescriptor] contains information to create a column family.
-#[derive(Debug, Clone, PartialEq, Eq, Builder)]
-#[builder(pattern = "owned")]
-pub struct ColumnFamilyDescriptor {
-    #[builder(default = "consts::DEFAULT_CF_ID")]
-    pub cf_id: ColumnFamilyId,
-    #[builder(default = "consts::DEFAULT_CF_NAME.to_string()", setter(into))]
-    pub name: String,
-    /// Descriptors of columns in this column family.
-    #[builder(default, setter(each(name = "push_column")))]
-    pub columns: Vec<ColumnDescriptor>,
-}
-
-/// A [RegionDescriptor] contains information to create a region.
-#[derive(Debug, Clone, PartialEq, Eq, Builder)]
-#[builder(pattern = "owned")]
-pub struct RegionDescriptor {
-    #[builder(setter(into))]
-    pub id: RegionId,
-    /// Region name.
-    #[builder(setter(into))]
-    pub name: String,
-    /// Row key descriptor of this region.
-    pub row_key: RowKeyDescriptor,
-    /// Default column family.
-    pub default_cf: ColumnFamilyDescriptor,
-    /// Extra column families defined by user.
-    #[builder(default, setter(each(name = "push_extra_column_family")))]
-    pub extra_cfs: Vec<ColumnFamilyDescriptor>,
-}
-
-impl RowKeyDescriptorBuilder {
-    pub fn new(timestamp: ColumnDescriptor) -> Self {
-        Self {
-            timestamp: Some(timestamp),
-            ..Default::default()
-        }
-    }
-
-    pub fn columns_capacity(mut self, capacity: usize) -> Self {
-        self.columns = Some(Vec::with_capacity(capacity));
-        self
-    }
-}
-
-impl ColumnFamilyDescriptorBuilder {
-    pub fn columns_capacity(mut self, capacity: usize) -> Self {
-        self.columns = Some(Vec::with_capacity(capacity));
-        self
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use datatypes::value::Value;
@@ -360,71 +296,6 @@ mod tests {
         assert_eq!(expected, column_schema);
     }
 
-    fn new_timestamp_desc() -> ColumnDescriptor {
-        ColumnDescriptorBuilder::new(5, "timestamp", ConcreteDataType::int64_datatype())
-            .is_time_index(true)
-            .build()
-            .unwrap()
-    }
-
-    #[test]
-    fn test_row_key_descriptor_builder() {
-        let timestamp = new_timestamp_desc();
-
-        let desc = RowKeyDescriptorBuilder::new(timestamp.clone())
-            .build()
-            .unwrap();
-        assert!(desc.columns.is_empty());
-
-        let desc = RowKeyDescriptorBuilder::new(timestamp.clone())
-            .columns_capacity(1)
-            .push_column(
-                ColumnDescriptorBuilder::new(6, "c1", ConcreteDataType::int32_datatype())
-                    .build()
-                    .unwrap(),
-            )
-            .push_column(
-                ColumnDescriptorBuilder::new(7, "c2", ConcreteDataType::int32_datatype())
-                    .build()
-                    .unwrap(),
-            )
-            .build()
-            .unwrap();
-        assert_eq!(2, desc.columns.len());
-
-        let desc = RowKeyDescriptorBuilder::new(timestamp).build().unwrap();
-        assert!(desc.columns.is_empty());
-    }
-
-    #[test]
-    fn test_cf_descriptor_builder() {
-        let desc = ColumnFamilyDescriptorBuilder::default().build().unwrap();
-        assert_eq!(consts::DEFAULT_CF_ID, desc.cf_id);
-        assert_eq!(consts::DEFAULT_CF_NAME, desc.name);
-        assert!(desc.columns.is_empty());
-
-        let desc = ColumnFamilyDescriptorBuilder::default()
-            .cf_id(32)
-            .name("cf1")
-            .build()
-            .unwrap();
-        assert_eq!(32, desc.cf_id);
-        assert_eq!("cf1", desc.name);
-
-        let desc = ColumnFamilyDescriptorBuilder::default()
-            .push_column(
-                ColumnDescriptorBuilder::default()
-                    .id(6)
-                    .name("c1")
-                    .data_type(ConcreteDataType::int32_datatype())
-                    .build()
-                    .unwrap(),
-            )
-            .build()
-            .unwrap();
-        assert_eq!(1, desc.columns.len());
-    }
-
     #[test]
     fn test_region_id() {
         assert_eq!(RegionId::new(0, 1), 1);
diff --git a/src/store-api/src/storage/engine.rs b/src/store-api/src/storage/engine.rs
deleted file mode 100644
index 8f72db19f1a1..000000000000
--- a/src/store-api/src/storage/engine.rs
+++ /dev/null
@@ -1,194 +0,0 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-//! Storage Engine traits.
-//!
-//! [`StorageEngine`] is the abstraction over a multi-regions, schematized data storage system,
-//! a [`StorageEngine`] instance manages a bunch of storage unit called [`Region`], which holds
-//! chunks of rows, support operations like PUT/DELETE/SCAN.
-
-use std::collections::HashMap;
-use std::time::Duration;
-
-use async_trait::async_trait;
-use common_error::ext::ErrorExt;
-
-use crate::storage::descriptors::RegionDescriptor;
-use crate::storage::region::Region;
-
-const COMPACTION_STRATEGY_KEY: &str = "compaction";
-const COMPACTION_STRATEGY_TWCS_VALUE: &str = "TWCS";
-const TWCS_MAX_ACTIVE_WINDOW_FILES_KEY: &str = "compaction.twcs.max_active_window_files";
-const TWCS_TIME_WINDOW_SECONDS_KEY: &str = "compaction.twcs.time_window_seconds";
-const TWCS_MAX_INACTIVE_WINDOW_FILES_KEY: &str = "compaction.twcs.max_inactive_window_files";
-
-/// Storage engine provides primitive operations to store and access data.
-#[async_trait]
-pub trait StorageEngine: Send + Sync + Clone + 'static {
-    type Error: ErrorExt + Send + Sync;
-    type Region: Region;
-
-    /// Opens an existing region. Returns `Ok(None)` if region does not exists.
-    async fn open_region(
-        &self,
-        ctx: &EngineContext,
-        name: &str,
-        opts: &OpenOptions,
-    ) -> Result<Option<Self::Region>, Self::Error>;
-
-    /// Closes given region.
-    async fn close_region(
-        &self,
-        ctx: &EngineContext,
-        name: &str,
-        opts: &CloseOptions,
-    ) -> Result<(), Self::Error>;
-
-    /// Creates and returns the created region.
-    ///
-    /// Returns existing region if region with same name already exists. The region will
-    /// be opened before returning.
-    async fn create_region(
-        &self,
-        ctx: &EngineContext,
-        descriptor: RegionDescriptor,
-        opts: &CreateOptions,
-    ) -> Result<Self::Region, Self::Error>;
-
-    /// Drops given region.
-    ///
-    /// The region will be closed before dropping.
-    async fn drop_region(
-        &self,
-        ctx: &EngineContext,
-        region: Self::Region,
-    ) -> Result<(), Self::Error>;
-
-    /// Returns the opened region with given name.
-    fn get_region(
-        &self,
-        ctx: &EngineContext,
-        name: &str,
-    ) -> Result<Option<Self::Region>, Self::Error>;
-
-    /// Close the engine.
-    async fn close(&self, ctx: &EngineContext) -> Result<(), Self::Error>;
-}
-
-/// Storage engine context.
-#[derive(Debug, Clone, Default)]
-pub struct EngineContext {}
-
-/// Options to create a region.
-#[derive(Debug, Clone, Default)]
-pub struct CreateOptions {
-    /// Region parent directory
-    pub parent_dir: String,
-    /// Region memtable max size in bytes
-    pub write_buffer_size: Option<usize>,
-    /// Region SST files TTL
-    pub ttl: Option<Duration>,
-    /// Compaction strategy
-    pub compaction_strategy: CompactionStrategy,
-}
-
-/// Options to open a region.
-#[derive(Debug, Clone, Default)]
-pub struct OpenOptions {
-    /// Region parent directory
-    pub parent_dir: String,
-    /// Region memtable max size in bytes
-    pub write_buffer_size: Option<usize>,
-    /// Region SST files TTL
-    pub ttl: Option<Duration>,
-    /// Compaction strategy
-    pub compaction_strategy: CompactionStrategy,
-}
-
-/// Options to close a region.
-#[derive(Debug, Clone, Default)]
-pub struct CloseOptions {
-    /// Flush region
-    pub flush: bool,
-}
-
-/// Options for compactions
-#[derive(Debug, Clone)]
-pub enum CompactionStrategy {
-    /// TWCS
-    Twcs(TwcsOptions),
-}
-
-impl Default for CompactionStrategy {
-    fn default() -> Self {
-        Self::Twcs(TwcsOptions::default())
-    }
-}
-
-/// TWCS compaction options.
-#[derive(Debug, Clone)]
-pub struct TwcsOptions {
-    /// Max num of files that can be kept in active writing time window.
-    pub max_active_window_files: usize,
-    /// Max num of files that can be kept in inactive time window.
-    pub max_inactive_window_files: usize,
-    /// Compaction time window defined when creating tables.
-    pub time_window_seconds: Option<i64>,
-}
-
-impl Default for TwcsOptions {
-    fn default() -> Self {
-        Self {
-            max_active_window_files: 4,
-            max_inactive_window_files: 1,
-            time_window_seconds: None,
-        }
-    }
-}
-
-impl From<&HashMap<String, String>> for CompactionStrategy {
-    fn from(opts: &HashMap<String, String>) -> Self {
-        let Some(strategy_name) = opts.get(COMPACTION_STRATEGY_KEY) else {
-            return CompactionStrategy::default();
-        };
-        if strategy_name.eq_ignore_ascii_case(COMPACTION_STRATEGY_TWCS_VALUE) {
-            let mut twcs_opts = TwcsOptions::default();
-            if let Some(max_active_window_files) = opts
-                .get(TWCS_MAX_ACTIVE_WINDOW_FILES_KEY)
-                .and_then(|num| num.parse::<usize>().ok())
-            {
-                twcs_opts.max_active_window_files = max_active_window_files;
-            }
-
-            if let Some(max_inactive_window_files) = opts
-                .get(TWCS_MAX_INACTIVE_WINDOW_FILES_KEY)
-                .and_then(|num| num.parse::<usize>().ok())
-            {
-                twcs_opts.max_inactive_window_files = max_inactive_window_files;
-            }
-
-            if let Some(time_window) = opts
-                .get(TWCS_TIME_WINDOW_SECONDS_KEY)
-                .and_then(|num| num.parse::<i64>().ok()) && time_window > 0
-            {
-                twcs_opts.time_window_seconds = Some(time_window);
-            }
-
-            CompactionStrategy::Twcs(twcs_opts)
-        } else {
-            // unrecognized compaction strategy
-            CompactionStrategy::default()
-        }
-    }
-}
diff --git a/src/store-api/src/storage/metadata.rs b/src/store-api/src/storage/metadata.rs
deleted file mode 100644
index 8221ca3458ae..000000000000
--- a/src/store-api/src/storage/metadata.rs
+++ /dev/null
@@ -1,24 +0,0 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use crate::storage::SchemaRef;
-
-/// Metadata of a region.
-pub trait RegionMeta: Send + Sync {
-    /// Returns the schema of the region.
-    fn schema(&self) -> &SchemaRef;
-
-    /// Returns the version of the region metadata.
-    fn version(&self) -> u32;
-}
diff --git a/src/store-api/src/storage/region.rs b/src/store-api/src/storage/region.rs
deleted file mode 100644
index 66f09be0c83a..000000000000
--- a/src/store-api/src/storage/region.rs
+++ /dev/null
@@ -1,177 +0,0 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-//! Region holds chunks of rows stored in the storage engine, but does not require that
-//! rows must have continuous primary key range, which is implementation specific.
-//!
-//! Regions support operations like PUT/DELETE/SCAN that most key-value stores provide.
-//! However, unlike key-value store, data stored in region has data model like:
-//!
-//! ```text
-//! colk-1, ..., colk-m, timestamp, version -> colv-1, ..., colv-n
-//! ```
-//!
-//! The data model require each row
-//! - has 0 ~ m key column, parts of row key columns;
-//! - **MUST** has a timestamp column, part of row key columns;
-//! - has a version column, part of row key columns;
-//! - has 0 ~ n value column.
-//!
-//! Each row is identified by (value of key columns, timestamp, version), which forms
-//! a row key. Note that the implementation may allow multiple rows have same row
-//! key (like ClickHouse), which is useful in analytic scenario.
-
-use async_trait::async_trait;
-use common_error::ext::ErrorExt;
-
-use crate::storage::engine::OpenOptions;
-use crate::storage::metadata::RegionMeta;
-use crate::storage::requests::{AlterRequest, WriteRequest};
-use crate::storage::responses::WriteResponse;
-use crate::storage::snapshot::{ReadContext, Snapshot};
-use crate::storage::RegionId;
-
-/// Chunks of rows in storage engine.
-#[async_trait]
-pub trait Region: Send + Sync + Clone + std::fmt::Debug + 'static {
-    type Error: ErrorExt + Send + Sync;
-    type Meta: RegionMeta;
-    type WriteRequest: WriteRequest;
-    type Snapshot: Snapshot;
-
-    fn id(&self) -> RegionId;
-
-    /// Returns name of the region.
-    fn name(&self) -> &str;
-
-    /// Returns the in memory metadata of this region.
-    fn in_memory_metadata(&self) -> Self::Meta;
-
-    /// Write updates to region.
-    async fn write(
-        &self,
-        ctx: &WriteContext,
-        request: Self::WriteRequest,
-    ) -> Result<WriteResponse, Self::Error>;
-
-    /// Create a snapshot for read.
-    fn snapshot(&self, ctx: &ReadContext) -> Result<Self::Snapshot, Self::Error>;
-
-    /// Create write request
-    fn write_request(&self) -> Self::WriteRequest;
-
-    async fn alter(&self, request: AlterRequest) -> Result<(), Self::Error>;
-
-    async fn drop_region(&self) -> Result<(), Self::Error>;
-
-    fn disk_usage_bytes(&self) -> u64;
-
-    fn region_stat(&self) -> RegionStat {
-        RegionStat {
-            region_id: self.id().into(),
-            disk_usage_bytes: self.disk_usage_bytes(),
-        }
-    }
-
-    /// Flush memtable of the region to disk.
-    async fn flush(&self, ctx: &FlushContext) -> Result<(), Self::Error>;
-
-    async fn compact(&self, ctx: &CompactContext) -> Result<(), Self::Error>;
-
-    async fn truncate(&self) -> Result<(), Self::Error>;
-}
-
-#[derive(Default, Debug)]
-pub struct RegionStat {
-    pub region_id: u64,
-    pub disk_usage_bytes: u64,
-}
-
-/// Context for write operations.
-#[derive(Debug, Clone, Default)]
-pub struct WriteContext {}
-
-impl From<&OpenOptions> for WriteContext {
-    fn from(_opts: &OpenOptions) -> WriteContext {
-        WriteContext::default()
-    }
-}
-
-#[derive(Debug, Clone, Default)]
-pub struct CloseContext {
-    /// If true, flush the closing region.
-    pub flush: bool,
-}
-
-/// Context for flush operations.
-#[derive(Debug, Clone)]
-pub struct FlushContext {
-    /// If true, the flush will wait until the flush is done.
-    /// Default: true
-    pub wait: bool,
-    /// Flush reason.
-    pub reason: FlushReason,
-    /// If true, allows to flush a closed region
-    pub force: bool,
-}
-
-impl Default for FlushContext {
-    fn default() -> FlushContext {
-        FlushContext {
-            wait: true,
-            reason: FlushReason::Others,
-            force: false,
-        }
-    }
-}
-
-#[derive(Debug, Copy, Clone)]
-pub struct CompactContext {
-    /// Whether to wait the compaction result.
-    pub wait: bool,
-}
-
-impl Default for CompactContext {
-    fn default() -> CompactContext {
-        CompactContext { wait: true }
-    }
-}
-
-/// Reason of flush operation.
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub enum FlushReason {
-    /// Other reasons.
-    Others,
-    /// Memtable is full.
-    MemtableFull,
-    /// Flush manually.
-    Manually,
-    /// Auto flush periodically.
-    Periodically,
-    /// Global write buffer is full.
-    GlobalBufferFull,
-}
-
-impl FlushReason {
-    /// Returns reason as `str`.
-    pub fn as_str(&self) -> &'static str {
-        match self {
-            FlushReason::Others => "others",
-            FlushReason::MemtableFull => "memtable_full",
-            FlushReason::Manually => "manually",
-            FlushReason::Periodically => "periodically",
-            FlushReason::GlobalBufferFull => "global_buffer_full",
-        }
-    }
-}
diff --git a/src/store-api/src/storage/requests.rs b/src/store-api/src/storage/requests.rs
index 07a1741f5a21..ad7edc897b91 100644
--- a/src/store-api/src/storage/requests.rs
+++ b/src/store-api/src/storage/requests.rs
@@ -12,39 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::collections::{HashMap, HashSet};
-
-use api::helper::ColumnDataTypeWrapper;
-use api::v1::region::{alter_request, AddColumn as PbAddColumn};
-use api::v1::SemanticType;
-use common_error::ext::ErrorExt;
 use common_query::logical_plan::Expr;
 use common_recordbatch::OrderOption;
-use datatypes::vectors::VectorRef;
-use snafu::{OptionExt, ResultExt};
-
-use crate::error::{
-    BuildColumnDescriptorSnafu, Error, InvalidDefaultConstraintSnafu, InvalidRawRegionRequestSnafu,
-};
-use crate::storage::{ColumnDescriptor, ColumnDescriptorBuilder, RegionDescriptor};
-
-/// Write request holds a collection of updates to apply to a region.
-///
-/// The implementation of the write request should ensure all operations in
-/// the request follows the same schema restriction.
-pub trait WriteRequest: Send {
-    type Error: ErrorExt + Send + Sync;
-
-    /// Add put operation to the request.
-    ///
-    /// `data` is the columnar format of the data to put.
-    fn put(&mut self, data: HashMap<String, VectorRef>) -> Result<(), Self::Error>;
-
-    /// Delete rows by `keys`.
-    ///
-    /// `keys` are the row keys, in columnar format, of the rows to delete.
-    fn delete(&mut self, keys: HashMap<String, VectorRef>) -> Result<(), Self::Error>;
-}
 
 #[derive(Default, Clone, Debug, PartialEq, Eq)]
 pub struct ScanRequest {
@@ -61,325 +30,3 @@ pub struct ScanRequest {
     /// The data source should return *at least* this number of rows if available.
     pub limit: Option<usize>,
 }
-
-#[derive(Debug)]
-pub struct GetRequest {}
-
-/// Operation to add a column.
-#[derive(Debug, Clone)]
-pub struct AddColumn {
-    /// Descriptor of the column to add.
-    pub desc: ColumnDescriptor,
-    /// Is the column a key column.
-    pub is_key: bool,
-}
-
-/// Operation to alter a region.
-#[derive(Debug, Clone)]
-pub enum AlterOperation {
-    /// Add columns to the region.
-    AddColumns {
-        /// Columns to add.
-        columns: Vec<AddColumn>,
-    },
-    /// Drop columns from the region, only value columns are allowed to drop.
-    DropColumns {
-        /// Name of columns to drop.
-        names: Vec<String>,
-    },
-}
-
-impl AlterOperation {
-    /// Apply the operation to the [RegionDescriptor].
-    pub fn apply(&self, descriptor: &mut RegionDescriptor) {
-        match self {
-            AlterOperation::AddColumns { columns } => {
-                Self::apply_add(columns, descriptor);
-            }
-            AlterOperation::DropColumns { names } => {
-                Self::apply_drop(names, descriptor);
-            }
-        }
-    }
-
-    /// Add `columns` to the [RegionDescriptor].
-    ///
-    /// Value columns would be added to the default column family.
-    fn apply_add(columns: &[AddColumn], descriptor: &mut RegionDescriptor) {
-        for col in columns {
-            if col.is_key {
-                descriptor.row_key.columns.push(col.desc.clone());
-            } else {
-                descriptor.default_cf.columns.push(col.desc.clone());
-            }
-        }
-    }
-
-    /// Drop columns from the [RegionDescriptor] by their `names`.
-    ///
-    /// Only value columns would be removed, non-value columns in `names` would be ignored.
-    fn apply_drop(names: &[String], descriptor: &mut RegionDescriptor) {
-        let name_set: HashSet<_> = names.iter().collect();
-        // Remove columns in the default cf.
-        descriptor
-            .default_cf
-            .columns
-            .retain(|col| !name_set.contains(&col.name));
-        // Remove columns in other cfs.
-        for cf in &mut descriptor.extra_cfs {
-            cf.columns.retain(|col| !name_set.contains(&col.name));
-        }
-    }
-}
-
-impl TryFrom<PbAddColumn> for AddColumn {
-    type Error = Error;
-
-    fn try_from(add_column: PbAddColumn) -> Result<Self, Self::Error> {
-        let column_def = add_column
-            .column_def
-            .context(InvalidRawRegionRequestSnafu {
-                err: "'column_def' is absent",
-            })?;
-        let column_id = column_def.column_id;
-
-        let column_def = column_def
-            .column_def
-            .context(InvalidRawRegionRequestSnafu {
-                err: "'column_def' is absent",
-            })?;
-
-        let data_type = column_def.data_type;
-        let data_type_ext = column_def.datatype_extension.clone();
-        let data_type = ColumnDataTypeWrapper::try_new(data_type, data_type_ext)
-            .map_err(|_| {
-                InvalidRawRegionRequestSnafu {
-                    err: format!("unknown raw column datatype: {data_type}"),
-                }
-                .build()
-            })?
-            .into();
-
-        let constraint = column_def.default_constraint.as_slice();
-        let constraint = if constraint.is_empty() {
-            None
-        } else {
-            Some(
-                constraint
-                    .try_into()
-                    .context(InvalidDefaultConstraintSnafu {
-                        constraint: String::from_utf8_lossy(constraint),
-                    })?,
-            )
-        };
-
-        let desc = ColumnDescriptorBuilder::new(column_id, column_def.name.clone(), data_type)
-            .is_nullable(column_def.is_nullable)
-            .is_time_index(column_def.semantic_type() == SemanticType::Timestamp)
-            .default_constraint(constraint)
-            .build()
-            .context(BuildColumnDescriptorSnafu)?;
-
-        Ok(AddColumn {
-            desc,
-            is_key: column_def.semantic_type() == SemanticType::Tag,
-            // TODO(ruihang & yingwen): support alter column's "location"
-        })
-    }
-}
-
-impl TryFrom<alter_request::Kind> for AlterOperation {
-    type Error = Error;
-
-    fn try_from(kind: alter_request::Kind) -> Result<Self, Self::Error> {
-        let operation = match kind {
-            alter_request::Kind::AddColumns(x) => {
-                let columns = x
-                    .add_columns
-                    .into_iter()
-                    .map(|x| x.try_into())
-                    .collect::<Result<Vec<_>, Self::Error>>()?;
-                AlterOperation::AddColumns { columns }
-            }
-            alter_request::Kind::DropColumns(x) => {
-                let names = x.drop_columns.into_iter().map(|x| x.name).collect();
-                AlterOperation::DropColumns { names }
-            }
-        };
-        Ok(operation)
-    }
-}
-
-/// Alter region request.
-#[derive(Debug)]
-pub struct AlterRequest {
-    /// Operation to do.
-    pub operation: AlterOperation,
-    /// The version of the schema before applying the alteration.
-    pub version: u32,
-}
-
-#[cfg(test)]
-mod tests {
-    use api::v1::region::{
-        AddColumn as PbAddColumn, AddColumns, DropColumn, DropColumns, RegionColumnDef,
-    };
-    use api::v1::{ColumnDataType, ColumnDef};
-    use datatypes::prelude::*;
-    use datatypes::schema::ColumnDefaultConstraint;
-
-    use super::*;
-    use crate::storage::{
-        ColumnDescriptorBuilder, ColumnFamilyDescriptorBuilder, ColumnId, RegionDescriptorBuilder,
-        RowKeyDescriptorBuilder,
-    };
-
-    fn new_column_desc(id: ColumnId) -> ColumnDescriptor {
-        ColumnDescriptorBuilder::new(id, id.to_string(), ConcreteDataType::int64_datatype())
-            .is_nullable(false)
-            .build()
-            .unwrap()
-    }
-
-    fn new_region_descriptor() -> RegionDescriptor {
-        let row_key = RowKeyDescriptorBuilder::default()
-            .timestamp(new_column_desc(1))
-            .build()
-            .unwrap();
-        let default_cf = ColumnFamilyDescriptorBuilder::default()
-            .push_column(new_column_desc(2))
-            .build()
-            .unwrap();
-
-        RegionDescriptorBuilder::default()
-            .id(1)
-            .name("test")
-            .row_key(row_key)
-            .default_cf(default_cf)
-            .build()
-            .unwrap()
-    }
-
-    #[test]
-    fn test_alter_operation() {
-        let mut desc = new_region_descriptor();
-
-        let op = AlterOperation::AddColumns {
-            columns: vec![
-                AddColumn {
-                    desc: new_column_desc(3),
-                    is_key: true,
-                },
-                AddColumn {
-                    desc: new_column_desc(4),
-                    is_key: false,
-                },
-            ],
-        };
-        op.apply(&mut desc);
-
-        assert_eq!(1, desc.row_key.columns.len());
-        assert_eq!("3", desc.row_key.columns[0].name);
-        assert_eq!(2, desc.default_cf.columns.len());
-        assert_eq!("2", desc.default_cf.columns[0].name);
-        assert_eq!("4", desc.default_cf.columns[1].name);
-
-        let op = AlterOperation::DropColumns {
-            names: vec![String::from("2")],
-        };
-        op.apply(&mut desc);
-        assert_eq!(1, desc.row_key.columns.len());
-        assert_eq!(1, desc.default_cf.columns.len());
-        assert_eq!("4", desc.default_cf.columns[0].name);
-
-        // Key columns are ignored.
-        let op = AlterOperation::DropColumns {
-            names: vec![String::from("1"), String::from("3")],
-        };
-        op.apply(&mut desc);
-        assert_eq!(1, desc.row_key.columns.len());
-        assert_eq!(1, desc.default_cf.columns.len());
-    }
-
-    #[test]
-    fn test_try_from_raw_alter_kind() {
-        let kind = alter_request::Kind::AddColumns(AddColumns {
-            add_columns: vec![
-                PbAddColumn {
-                    column_def: Some(RegionColumnDef {
-                        column_def: Some(ColumnDef {
-                            name: "my_tag".to_string(),
-                            data_type: ColumnDataType::Int32 as _,
-                            is_nullable: false,
-                            default_constraint: vec![],
-                            semantic_type: SemanticType::Tag as _,
-                            comment: String::new(),
-                            ..Default::default()
-                        }),
-                        column_id: 1,
-                    }),
-                    location: None,
-                },
-                PbAddColumn {
-                    column_def: Some(RegionColumnDef {
-                        column_def: Some(ColumnDef {
-                            name: "my_field".to_string(),
-                            data_type: ColumnDataType::String as _,
-                            is_nullable: true,
-                            default_constraint: ColumnDefaultConstraint::Value("hello".into())
-                                .try_into()
-                                .unwrap(),
-                            semantic_type: SemanticType::Field as _,
-                            comment: String::new(),
-                            ..Default::default()
-                        }),
-                        column_id: 2,
-                    }),
-                    location: None,
-                },
-            ],
-        });
-
-        let AlterOperation::AddColumns { columns } = AlterOperation::try_from(kind).unwrap() else {
-            unreachable!()
-        };
-        assert_eq!(2, columns.len());
-
-        let desc = &columns[0].desc;
-        assert_eq!(desc.id, 1);
-        assert_eq!(&desc.name, "my_tag");
-        assert_eq!(desc.data_type, ConcreteDataType::int32_datatype());
-        assert!(!desc.is_nullable());
-        assert!(!desc.is_time_index());
-        assert_eq!(desc.default_constraint(), None);
-        assert!(columns[0].is_key);
-
-        let desc = &columns[1].desc;
-        assert_eq!(desc.id, 2);
-        assert_eq!(&desc.name, "my_field");
-        assert_eq!(desc.data_type, ConcreteDataType::string_datatype());
-        assert!(desc.is_nullable());
-        assert!(!desc.is_time_index());
-        assert_eq!(
-            desc.default_constraint(),
-            Some(&ColumnDefaultConstraint::Value("hello".into()))
-        );
-        assert!(!columns[1].is_key);
-
-        let kind = alter_request::Kind::DropColumns(DropColumns {
-            drop_columns: vec![
-                DropColumn {
-                    name: "c1".to_string(),
-                },
-                DropColumn {
-                    name: "c2".to_string(),
-                },
-            ],
-        });
-
-        let AlterOperation::DropColumns { names } = AlterOperation::try_from(kind).unwrap() else {
-            unreachable!()
-        };
-        assert_eq!(names, vec!["c1", "c2"]);
-    }
-}
diff --git a/src/store-api/src/storage/responses.rs b/src/store-api/src/storage/responses.rs
deleted file mode 100644
index 7a226670ccc6..000000000000
--- a/src/store-api/src/storage/responses.rs
+++ /dev/null
@@ -1,25 +0,0 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#[derive(Debug)]
-pub struct WriteResponse {}
-
-#[derive(Debug)]
-pub struct ScanResponse<R> {
-    /// Reader to read result chunks.
-    pub reader: R,
-}
-
-#[derive(Debug)]
-pub struct GetResponse {}
diff --git a/src/store-api/src/storage/snapshot.rs b/src/store-api/src/storage/snapshot.rs
deleted file mode 100644
index 9bf5692b5af9..000000000000
--- a/src/store-api/src/storage/snapshot.rs
+++ /dev/null
@@ -1,55 +0,0 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use async_trait::async_trait;
-use common_error::ext::ErrorExt;
-use datatypes::schema::SchemaRef;
-
-use crate::storage::chunk::ChunkReader;
-use crate::storage::consts;
-use crate::storage::requests::{GetRequest, ScanRequest};
-use crate::storage::responses::{GetResponse, ScanResponse};
-
-/// A consistent read-only view of region.
-#[async_trait]
-pub trait Snapshot: Send + Sync {
-    type Error: ErrorExt + Send + Sync;
-    type Reader: ChunkReader;
-
-    fn schema(&self) -> &SchemaRef;
-
-    async fn scan(
-        &self,
-        ctx: &ReadContext,
-        request: ScanRequest,
-    ) -> Result<ScanResponse<Self::Reader>, Self::Error>;
-
-    async fn get(&self, ctx: &ReadContext, request: GetRequest)
-        -> Result<GetResponse, Self::Error>;
-}
-
-/// Context for read.
-#[derive(Debug, Clone)]
-pub struct ReadContext {
-    /// Suggested batch size of chunk.
-    pub batch_size: usize,
-}
-
-impl Default for ReadContext {
-    fn default() -> ReadContext {
-        ReadContext {
-            batch_size: consts::READ_BATCH_SIZE,
-        }
-    }
-}
diff --git a/src/store-api/src/storage/types.rs b/src/store-api/src/storage/types.rs
index 7ce8dcc55b0e..ff1162d4013f 100644
--- a/src/store-api/src/storage/types.rs
+++ b/src/store-api/src/storage/types.rs
@@ -14,11 +14,6 @@
 
 //! Common types.
 
-use api::v1::OpType;
-
 /// Represents a sequence number of data in storage. The offset of logstore can be used
 /// as a sequence number.
 pub type SequenceNumber = u64;
-
-// TODO(hl): We should implement a `min` method for OpType in greptime-proto crate.
-pub const MIN_OP_TYPE: OpType = OpType::Delete;
diff --git a/src/table/src/lib.rs b/src/table/src/lib.rs
index c64b0f563e1a..2b10f8f20570 100644
--- a/src/table/src/lib.rs
+++ b/src/table/src/lib.rs
@@ -25,8 +25,6 @@ pub mod table;
 pub mod test_util;
 pub mod thin_table;
 
-pub use store_api::storage::RegionStat;
-
 pub use crate::error::{Error, Result};
 pub use crate::stats::{ColumnStatistics, TableStatistics};
 pub use crate::table::{Table, TableRef};