Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: hide RegionRoute behind TableRouteValue #2989

Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
750 changes: 405 additions & 345 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion src/cmd/src/cli/bench/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::time::Instant;

use common_meta::key::table_route::TableRouteValue;
use common_meta::key::TableMetadataManagerRef;
use common_meta::table_name::TableName;

Expand Down Expand Up @@ -53,7 +54,11 @@ impl TableMetadataBencher {
let start = Instant::now();

self.table_metadata_manager
.create_table_metadata(table_info, region_routes, region_wal_options)
.create_table_metadata(
table_info,
TableRouteValue::physical(region_routes),
region_wal_options,
)
.await
.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion src/cmd/src/cli/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ impl MigrateTableMetadata {
)
.unwrap();

let new_table_value = NextTableRouteValue::new(table_route.region_routes);
let new_table_value = NextTableRouteValue::physical(table_route.region_routes);

let table_id = table_route.table.id as u32;
let new_key = TableRouteKey::new(table_id);
Expand Down
4 changes: 2 additions & 2 deletions src/common/meta/src/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ use store_api::storage::{RegionNumber, TableId};
use crate::cache_invalidator::CacheInvalidatorRef;
use crate::datanode_manager::DatanodeManagerRef;
use crate::error::Result;
use crate::key::table_route::TableRouteValue;
use crate::key::TableMetadataManagerRef;
use crate::region_keeper::MemoryRegionKeeperRef;
use crate::rpc::ddl::{CreateTableTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use crate::rpc::router::RegionRoute;

pub mod alter_table;
pub mod create_table;
Expand Down Expand Up @@ -58,7 +58,7 @@ pub struct TableMetadata {
/// Table id.
pub table_id: TableId,
/// Route information for each region of the table.
pub region_routes: Vec<RegionRoute>,
pub table_route: TableRouteValue,
/// The encoded wal options for regions of the table.
// If a region does not have an associated wal options, no key for the region would be found in the map.
pub region_wal_options: HashMap<RegionNumber, String>,
Expand Down
5 changes: 1 addition & 4 deletions src/common/meta/src/ddl/alter_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,17 +182,14 @@ impl AlterTableProcedure {

pub async fn submit_alter_region_requests(&mut self) -> Result<Status> {
let table_id = self.data.table_id();
let table_ref = self.data.table_ref();

let table_route = self
.context
.table_metadata_manager
.table_route_manager()
.get(table_id)
.await?
.with_context(|| TableRouteNotFoundSnafu {
table_name: table_ref.to_string(),
})?
.context(TableRouteNotFoundSnafu { table_id })?
.into_inner();
let region_routes = table_route.region_routes();

Expand Down
168 changes: 81 additions & 87 deletions src/common/meta/src/ddl/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ use api::v1::region::region_request::Body as PbRegionRequest;
use api::v1::region::{
CreateRequest as PbCreateRegionRequest, RegionColumnDef, RegionRequest, RegionRequestHeader,
};
use api::v1::{ColumnDef, CreateTableExpr, SemanticType};
use api::v1::{ColumnDef, SemanticType};
use async_trait::async_trait;
use common_catalog::consts::METRIC_ENGINE;
use common_config::WAL_OPTIONS_KEY;
use common_error::ext::BoxedError;
use common_procedure::error::{
Expand All @@ -40,8 +39,9 @@ use table::metadata::{RawTableInfo, TableId};

use crate::ddl::utils::{handle_operate_region_error, handle_retry_error, region_storage_path};
use crate::ddl::DdlContext;
use crate::error::{self, Result, TableInfoNotFoundSnafu};
use crate::error::{self, Result, TableRouteNotFoundSnafu};
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::metrics;
use crate::region_keeper::OperatingRegionGuard;
use crate::rpc::ddl::CreateTableTask;
Expand All @@ -60,13 +60,13 @@ impl CreateTableProcedure {
pub fn new(
cluster_id: u64,
task: CreateTableTask,
region_routes: Vec<RegionRoute>,
table_route: TableRouteValue,
region_wal_options: HashMap<RegionNumber, String>,
context: DdlContext,
) -> Self {
Self {
context,
creator: TableCreator::new(cluster_id, task, region_routes, region_wal_options),
creator: TableCreator::new(cluster_id, task, table_route, region_wal_options),
}
}

Expand All @@ -78,10 +78,12 @@ impl CreateTableProcedure {
opening_regions: vec![],
};

creator
.register_opening_regions(&context)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
if let TableRouteValue::Physical(x) = &creator.data.table_route {
creator.opening_regions = creator
.register_opening_regions(&context, &x.region_routes)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
}

Ok(CreateTableProcedure { context, creator })
}
Expand All @@ -94,10 +96,6 @@ impl CreateTableProcedure {
self.table_info().ident.table_id
}

pub fn region_routes(&self) -> &Vec<RegionRoute> {
&self.creator.data.region_routes
}

pub fn region_wal_options(&self) -> &HashMap<RegionNumber, String> {
&self.creator.data.region_wal_options
}
Expand Down Expand Up @@ -132,7 +130,10 @@ impl CreateTableProcedure {
Ok(Status::executing(true))
}

pub fn new_region_request_builder(&self) -> Result<CreateRequestBuilder> {
pub fn new_region_request_builder(
&self,
physical_table_id: Option<TableId>,
) -> Result<CreateRequestBuilder> {
let create_table_expr = &self.creator.data.task.create_table;

let column_defs = create_table_expr
Expand Down Expand Up @@ -191,25 +192,61 @@ impl CreateTableProcedure {
options: create_table_expr.table_options.clone(),
};

let builder = CreateRequestBuilder::new_template(self.context.clone(), template);
Ok(builder)
Ok(CreateRequestBuilder {
template,
physical_table_id,
})
}

pub async fn on_datanode_create_regions(&mut self) -> Result<Status> {
match &self.creator.data.table_route {
TableRouteValue::Physical(x) => {
let region_routes = x.region_routes.clone();
let request_builder = self.new_region_request_builder(None)?;
self.create_regions(&region_routes, request_builder).await
}
TableRouteValue::Logical(x) => {
let physical_table_id = x.physical_table_id();

let physical_table_route = self
.context
.table_metadata_manager
.table_route_manager()
.get(physical_table_id)
.await?
.context(TableRouteNotFoundSnafu {
table_id: physical_table_id,
})?;
let region_routes = physical_table_route.region_routes();

let request_builder = self.new_region_request_builder(Some(physical_table_id))?;

self.create_regions(region_routes, request_builder).await
}
}
}

async fn create_regions(
&mut self,
region_routes: &[RegionRoute],
request_builder: CreateRequestBuilder,
) -> Result<Status> {
// Registers opening regions
self.creator.register_opening_regions(&self.context)?;
let guards = self
.creator
.register_opening_regions(&self.context, region_routes)?;
if !guards.is_empty() {
self.creator.opening_regions = guards;
}

let create_table_data = &self.creator.data;
let region_routes = &create_table_data.region_routes;
let region_wal_options = &create_table_data.region_wal_options;

let create_table_expr = &create_table_data.task.create_table;
let catalog = &create_table_expr.catalog_name;
let schema = &create_table_expr.schema_name;
let storage_path = region_storage_path(catalog, schema);

let mut request_builder = self.new_region_request_builder()?;

let leaders = find_leaders(region_routes);
let mut create_region_tasks = Vec::with_capacity(leaders.len());

Expand All @@ -221,12 +258,7 @@ impl CreateTableProcedure {
for region_number in regions {
let region_id = RegionId::new(self.table_id(), region_number);
let create_region_request = request_builder
.build_one(
&self.creator.data.task.create_table,
region_id,
storage_path.clone(),
region_wal_options,
)
.build_one(region_id, storage_path.clone(), region_wal_options)
.await?;

requests.push(PbRegionRequest::Create(create_region_request));
Expand Down Expand Up @@ -270,10 +302,13 @@ impl CreateTableProcedure {
let manager = &self.context.table_metadata_manager;

let raw_table_info = self.table_info().clone();
let region_routes = self.region_routes().clone();
let region_wal_options = self.region_wal_options().clone();
manager
.create_table_metadata(raw_table_info, region_routes, region_wal_options)
.create_table_metadata(
raw_table_info,
self.creator.data.table_route.clone(),
region_wal_options,
)
.await?;
info!("Created table metadata for table {table_id}");

Expand Down Expand Up @@ -329,29 +364,31 @@ impl TableCreator {
pub fn new(
cluster_id: u64,
task: CreateTableTask,
region_routes: Vec<RegionRoute>,
table_route: TableRouteValue,
region_wal_options: HashMap<RegionNumber, String>,
) -> Self {
Self {
data: CreateTableData {
state: CreateTableState::Prepare,
cluster_id,
task,
region_routes,
table_route,
region_wal_options,
},
opening_regions: vec![],
}
}

/// Register opening regions if doesn't exist.
pub fn register_opening_regions(&mut self, context: &DdlContext) -> Result<()> {
let region_routes = &self.data.region_routes;

/// Registers and returns the guards of the opening region if they don't exist.
fn register_opening_regions(
&self,
context: &DdlContext,
region_routes: &[RegionRoute],
) -> Result<Vec<OperatingRegionGuard>> {
let opening_regions = operating_leader_regions(region_routes);

if self.opening_regions.len() == opening_regions.len() {
return Ok(());
return Ok(vec![]);
}

let mut opening_region_guards = Vec::with_capacity(opening_regions.len());
Expand All @@ -366,9 +403,7 @@ impl TableCreator {
})?;
opening_region_guards.push(guard);
}

self.opening_regions = opening_region_guards;
Ok(())
Ok(opening_region_guards)
}
}

Expand All @@ -386,7 +421,7 @@ pub enum CreateTableState {
pub struct CreateTableData {
pub state: CreateTableState,
pub task: CreateTableTask,
pub region_routes: Vec<RegionRoute>,
table_route: TableRouteValue,
pub region_wal_options: HashMap<RegionNumber, String>,
pub cluster_id: u64,
}
Expand All @@ -399,28 +434,18 @@ impl CreateTableData {

/// Builder for [PbCreateRegionRequest].
pub struct CreateRequestBuilder {
context: DdlContext,
template: PbCreateRegionRequest,
/// Optional. Only for metric engine.
physical_table_id: Option<TableId>,
}

impl CreateRequestBuilder {
fn new_template(context: DdlContext, template: PbCreateRegionRequest) -> Self {
Self {
context,
template,
physical_table_id: None,
}
}

pub fn template(&self) -> &PbCreateRegionRequest {
&self.template
}

async fn build_one(
&mut self,
create_expr: &CreateTableExpr,
&self,
region_id: RegionId,
storage_path: String,
region_wal_options: &HashMap<RegionNumber, String>,
Expand All @@ -438,49 +463,18 @@ impl CreateRequestBuilder {
.insert(WAL_OPTIONS_KEY.to_string(), wal_options.clone())
});

if self.template.engine == METRIC_ENGINE {
self.metric_engine_hook(create_expr, region_id, &mut request)
.await?;
}

Ok(request)
}
if let Some(physical_table_id) = self.physical_table_id {
// Logical table has the same region numbers with physical table, and they have a one-to-one mapping.
// For example, region 0 of logical table must resides with region 0 of physical table. So here we can
// simply concat the physical table id and the logical region number to get the physical region id.
let physical_region_id = RegionId::new(physical_table_id, region_id.region_number());

async fn metric_engine_hook(
&mut self,
create_expr: &CreateTableExpr,
region_id: RegionId,
request: &mut PbCreateRegionRequest,
) -> Result<()> {
if let Some(physical_table_name) = request.options.get(LOGICAL_TABLE_METADATA_KEY) {
let table_id = if let Some(table_id) = self.physical_table_id {
table_id
} else {
let table_name_manager = self.context.table_metadata_manager.table_name_manager();
let table_name_key = TableNameKey::new(
&create_expr.catalog_name,
&create_expr.schema_name,
physical_table_name,
);
let table_id = table_name_manager
.get(table_name_key)
.await?
.context(TableInfoNotFoundSnafu {
table_name: physical_table_name,
})?
.table_id();
self.physical_table_id = Some(table_id);
table_id
};
// Concat physical table's table id and corresponding region number to get
// the physical region id.
let physical_region_id = RegionId::new(table_id, region_id.region_number());
request.options.insert(
LOGICAL_TABLE_METADATA_KEY.to_string(),
physical_region_id.as_u64().to_string(),
);
}

Ok(())
Ok(request)
}
}
Loading
Loading