Skip to content

Commit

Permalink
feat: create table procedure for metric engine, part 1 (#2943)
Browse files Browse the repository at this point in the history
* implementation

Signed-off-by: Ruihang Xia <[email protected]>

* initialize

Signed-off-by: Ruihang Xia <[email protected]>

* remove empty file

Signed-off-by: Ruihang Xia <[email protected]>

* apply review sugg

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia authored Dec 18, 2023
1 parent 4383a69 commit 9af9c02
Show file tree
Hide file tree
Showing 19 changed files with 158 additions and 48 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ frontend = { path = "src/frontend" }
log-store = { path = "src/log-store" }
meta-client = { path = "src/meta-client" }
meta-srv = { path = "src/meta-srv" }
metric-engine = { path = "src/metric-engine" }
mito2 = { path = "src/mito2" }
object-store = { path = "src/object-store" }
operator = { path = "src/operator" }
Expand Down
1 change: 1 addition & 0 deletions src/common/catalog/src/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub const INFORMATION_SCHEMA_COLUMN_STATISTICS_TABLE_ID: u32 = 7;
pub const MITO_ENGINE: &str = "mito";
pub const MITO2_ENGINE: &str = "mito2";
pub const METRIC_ENGINE: &str = "metric";

pub fn default_engine() -> &'static str {
MITO_ENGINE
Expand Down
121 changes: 104 additions & 17 deletions src/common/meta/src/ddl/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ use api::v1::region::region_request::Body as PbRegionRequest;
use api::v1::region::{
CreateRequest as PbCreateRegionRequest, RegionColumnDef, RegionRequest, RegionRequestHeader,
};
use api::v1::{ColumnDef, SemanticType};
use api::v1::{ColumnDef, CreateTableExpr, SemanticType};
use async_trait::async_trait;
use common_catalog::consts::METRIC_ENGINE;
use common_error::ext::BoxedError;
use common_procedure::error::{
ExternalSnafu, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
Expand All @@ -28,14 +29,15 @@ use common_telemetry::tracing_context::TracingContext;
use futures::future::join_all;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
use store_api::storage::RegionId;
use strum::AsRefStr;
use table::engine::TableReference;
use table::metadata::{RawTableInfo, TableId};

use crate::ddl::utils::{handle_operate_region_error, handle_retry_error, region_storage_path};
use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::error::{self, Result, TableInfoNotFoundSnafu};
use crate::key::table_name::TableNameKey;
use crate::metrics;
use crate::region_keeper::OperatingRegionGuard;
Expand Down Expand Up @@ -122,7 +124,7 @@ impl CreateTableProcedure {
Ok(Status::executing(true))
}

pub fn create_region_request_template(&self) -> Result<PbCreateRegionRequest> {
pub fn new_region_request_builder(&self) -> Result<CreateRequestBuilder> {
let create_table_expr = &self.creator.data.task.create_table;

let column_defs = create_table_expr
Expand Down Expand Up @@ -172,14 +174,17 @@ impl CreateTableProcedure {
})
.collect::<Result<_>>()?;

Ok(PbCreateRegionRequest {
let template = PbCreateRegionRequest {
region_id: 0,
engine: create_table_expr.engine.to_string(),
column_defs,
primary_key,
path: String::new(),
options: create_table_expr.table_options.clone(),
})
};

let builder = CreateRequestBuilder::new_template(self.context.clone(), template);
Ok(builder)
}

pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
Expand All @@ -194,7 +199,7 @@ impl CreateTableProcedure {
let schema = &create_table_expr.schema_name;
let storage_path = region_storage_path(catalog, schema);

let request_template = self.create_region_request_template()?;
let mut request_builder = self.new_region_request_builder()?;

let leaders = find_leaders(region_routes);
let mut create_region_tasks = Vec::with_capacity(leaders.len());
Expand All @@ -203,17 +208,20 @@ impl CreateTableProcedure {
let requester = self.context.datanode_manager.datanode(&datanode).await;

let regions = find_leader_regions(region_routes, &datanode);
let requests = regions
.iter()
.map(|region_number| {
let region_id = RegionId::new(self.table_id(), *region_number);

let mut create_region_request = request_template.clone();
create_region_request.region_id = region_id.as_u64();
create_region_request.path = storage_path.clone();
PbRegionRequest::Create(create_region_request)
})
.collect::<Vec<_>>();
let mut requests = Vec::with_capacity(regions.len());
for region_number in regions {
let region_id = RegionId::new(self.table_id(), region_number);

let create_region_request = request_builder
.build_one(
&self.creator.data.task.create_table,
region_id,
storage_path.clone(),
)
.await?;

requests.push(PbRegionRequest::Create(create_region_request));
}

for request in requests {
let request = RegionRequest {
Expand Down Expand Up @@ -371,3 +379,82 @@ impl CreateTableData {
self.task.table_ref()
}
}

/// Builder for [PbCreateRegionRequest].
pub struct CreateRequestBuilder {
context: DdlContext,
template: PbCreateRegionRequest,
/// Optional. Only for metric engine.
physical_table_id: Option<TableId>,
}

impl CreateRequestBuilder {
fn new_template(context: DdlContext, template: PbCreateRegionRequest) -> Self {
Self {
context,
template,
physical_table_id: None,
}
}

pub fn template(&self) -> &PbCreateRegionRequest {
&self.template
}

async fn build_one(
&mut self,
create_expr: &CreateTableExpr,
region_id: RegionId,
storage_path: String,
) -> Result<PbCreateRegionRequest> {
let mut request = self.template.clone();

request.region_id = region_id.as_u64();
request.path = storage_path;

if self.template.engine == METRIC_ENGINE {
self.metric_engine_hook(create_expr, region_id, &mut request)
.await?;
}

Ok(request)
}

async fn metric_engine_hook(
&mut self,
create_expr: &CreateTableExpr,
region_id: RegionId,
request: &mut PbCreateRegionRequest,
) -> Result<()> {
if let Some(physical_table_name) = request.options.get(LOGICAL_TABLE_METADATA_KEY) {
let table_id = if let Some(table_id) = self.physical_table_id {
table_id
} else {
let table_name_manager = self.context.table_metadata_manager.table_name_manager();
let table_name_key = TableNameKey::new(
&create_expr.catalog_name,
&create_expr.schema_name,
physical_table_name,
);
let table_id = table_name_manager
.get(table_name_key)
.await?
.context(TableInfoNotFoundSnafu {
table_name: physical_table_name,
})?
.table_id();
self.physical_table_id = Some(table_id);
table_id
};
// Concat physical table's table id and corresponding region number to get
// the physical region id.
let physical_region_id = RegionId::new(table_id, region_id.region_number());
request.options.insert(
LOGICAL_TABLE_METADATA_KEY.to_string(),
physical_region_id.as_u64().to_string(),
);
}

Ok(())
}
}
10 changes: 9 additions & 1 deletion src/common/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ pub enum Error {
#[snafu(display("Invalid result with a txn response: {}", err_msg))]
InvalidTxnResult { err_msg: String, location: Location },

#[snafu(display("Invalid engine type: {}", engine_type))]
InvalidEngineType {
engine_type: String,
location: Location,
},

#[snafu(display("Failed to connect to Etcd"))]
ConnectEtcd {
#[snafu(source)]
Expand Down Expand Up @@ -322,7 +328,9 @@ impl ErrorExt for Error {
| RenameTable { .. }
| Unsupported { .. } => StatusCode::Internal,

PrimaryKeyNotFound { .. } | &EmptyKey { .. } => StatusCode::InvalidArguments,
PrimaryKeyNotFound { .. } | EmptyKey { .. } | InvalidEngineType { .. } => {
StatusCode::InvalidArguments
}

TableNotFound { .. } => StatusCode::TableNotFound,
TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
Expand Down
1 change: 1 addition & 0 deletions src/datanode/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ hyper = { version = "0.14", features = ["full"] }
lazy_static.workspace = true
log-store.workspace = true
meta-client.workspace = true
metric-engine.workspace = true
mito2.workspace = true
object-store.workspace = true
pin-project = "1.0"
Expand Down
7 changes: 5 additions & 2 deletions src/datanode/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use futures_util::future::try_join_all;
use futures_util::StreamExt;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use meta_client::client::MetaClient;
use metric_engine::engine::MetricEngine;
use mito2::engine::MitoEngine;
use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
use object_store::util::normalize_dir;
Expand Down Expand Up @@ -471,12 +472,14 @@ impl DatanodeBuilder {
for engine in &opts.region_engine {
match engine {
RegionEngineConfig::Mito(config) => {
let engine: MitoEngine = MitoEngine::new(
let mito_engine: MitoEngine = MitoEngine::new(
config.clone(),
log_store.clone(),
object_store_manager.clone(),
);
engines.push(Arc::new(engine) as _);
let metric_engine = MetricEngine::new(mito_engine.clone());
engines.push(Arc::new(mito_engine) as _);
engines.push(Arc::new(metric_engine) as _);
}
RegionEngineConfig::File(config) => {
let engine = FileRegionEngine::new(
Expand Down
6 changes: 3 additions & 3 deletions src/meta-srv/src/procedure/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,15 @@ fn create_table_task() -> CreateTableTask {
}

#[test]
fn test_create_region_request_template() {
fn test_region_request_builder() {
let procedure = CreateTableProcedure::new(
1,
create_table_task(),
test_data::new_region_routes(),
test_data::new_ddl_context(Arc::new(DatanodeClients::default())),
);

let template = procedure.create_region_request_template().unwrap();
let template = procedure.new_region_request_builder().unwrap();

let expected = PbCreateRegionRequest {
region_id: 0,
Expand Down Expand Up @@ -163,7 +163,7 @@ fn test_create_region_request_template() {
path: String::new(),
options: HashMap::new(),
};
assert_eq!(template, expected);
assert_eq!(template.template(), &expected);
}

async fn new_datanode_manager(
Expand Down
5 changes: 4 additions & 1 deletion src/metric-engine/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,19 @@ use common_query::Output;
use common_recordbatch::SendableRecordBatchStream;
use mito2::engine::MitoEngine;
use store_api::metadata::RegionMetadataRef;
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
use store_api::region_engine::{RegionEngine, RegionRole, SetReadonlyResponse};
use store_api::region_request::{AffectedRows, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use tokio::sync::RwLock;

use self::state::MetricEngineState;
use crate::consts::METRIC_ENGINE_NAME;
use crate::data_region::DataRegion;
use crate::metadata_region::MetadataRegion;

/// Fixed random state for generating tsid
pub(crate) const RANDOM_STATE: ahash::RandomState = ahash::RandomState::with_seeds(1, 2, 3, 4);

#[cfg_attr(doc, aquamarine::aquamarine)]
/// # Metric Engine
///
Expand Down
15 changes: 8 additions & 7 deletions src/metric-engine/src/engine/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,18 @@ use mito2::engine::MITO_ENGINE_NAME;
use object_store::util::join_dir;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::ColumnMetadata;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{AffectedRows, RegionCreateRequest, RegionRequest};
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::RegionId;

use crate::consts::{
use store_api::metric_engine_consts::{
DATA_REGION_SUBDIR, DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
LOGICAL_TABLE_METADATA_KEY, METADATA_REGION_SUBDIR, METADATA_SCHEMA_KEY_COLUMN_INDEX,
METADATA_SCHEMA_KEY_COLUMN_NAME, METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX,
METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME, METADATA_SCHEMA_VALUE_COLUMN_INDEX,
METADATA_SCHEMA_VALUE_COLUMN_NAME, PHYSICAL_TABLE_METADATA_KEY,
};
use store_api::region_engine::RegionEngine;
use store_api::region_request::{AffectedRows, RegionCreateRequest, RegionRequest};
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::RegionId;

use crate::engine::MetricEngineInner;
use crate::error::{
ConflictRegionOptionSnafu, CreateMitoRegionSnafu, InternalColumnOccupiedSnafu,
Expand Down Expand Up @@ -412,8 +412,9 @@ impl MetricEngineInner {

#[cfg(test)]
mod test {
use store_api::metric_engine_consts::METRIC_ENGINE_NAME;

use super::*;
use crate::consts::METRIC_ENGINE_NAME;
use crate::engine::MetricEngine;
use crate::test_util::TestEnv;

Expand Down
6 changes: 4 additions & 2 deletions src/metric-engine/src/engine/put.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType};
use common_telemetry::{error, info};
use snafu::OptionExt;
use store_api::metric_engine_consts::{
DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
};
use store_api::region_request::{AffectedRows, RegionPutRequest};
use store_api::storage::{RegionId, TableId};

use crate::consts::{DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, RANDOM_STATE};
use crate::engine::MetricEngineInner;
use crate::engine::{MetricEngineInner, RANDOM_STATE};
use crate::error::{
ColumnNotFoundSnafu, ForbiddenPhysicalAlterSnafu, LogicalRegionNotFoundSnafu, Result,
};
Expand Down
2 changes: 1 addition & 1 deletion src/metric-engine/src/engine/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{error, info, tracing};
use datafusion::logical_expr;
use snafu::{OptionExt, ResultExt};
use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
use store_api::region_engine::RegionEngine;
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::{RegionId, ScanRequest};

use crate::consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
use crate::engine::MetricEngineInner;
use crate::error::{LogicalRegionNotFoundSnafu, MitoReadOperationSnafu, Result};
use crate::utils;
Expand Down
1 change: 0 additions & 1 deletion src/metric-engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
//! └─────────────────────┘
//! ```
pub mod consts;
mod data_region;
#[allow(unused)]
pub mod engine;
Expand Down
Loading

0 comments on commit 9af9c02

Please sign in to comment.