Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(remote_wal): introduce region meta and integrate it into creating table metadata #2830

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/cmd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ serde_json.workspace = true
servers.workspace = true
session.workspace = true
snafu.workspace = true
store-api.workspace = true
substrait.workspace = true
table.workspace = true
tokio.workspace = true
Expand Down
28 changes: 27 additions & 1 deletion src/cmd/src/cli/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -22,12 +22,15 @@ use clap::Parser;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::etcd::EtcdStore;
use common_meta::peer::Peer;
use common_meta::region_meta::wal_meta::{KeyName, RegionWalMeta, RegionWalMetaKey};
use common_meta::region_meta::RegionMeta;
use common_meta::rpc::router::{Region, RegionRoute};
use common_meta::table_name::TableName;
use common_telemetry::info;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, RawSchema};
use rand::Rng;
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::{RawTableInfo, RawTableMeta, TableId, TableIdent, TableType};

use self::metadata::TableMetadataBencher;
Expand Down Expand Up @@ -160,3 +163,26 @@ fn create_region_routes() -> Vec<RegionRoute> {

regions
}

fn create_region_meta_map(table_id: TableId) -> HashMap<RegionNumber, RegionMeta> {
// The region ids shall be coincident with the those used in `create_region_routes`.
(0..64u64)
.map(|region_id| {
let key = RegionId::from_u64(region_id).region_number();
let value = new_bench_region_meta(table_id, region_id);
(key, value)
})
.collect()
}

fn new_bench_region_meta(table_id: TableId, region_id: u64) -> RegionMeta {
let topic_key = RegionWalMetaKey::new(
table_id,
RegionId::from_u64(region_id).region_number(),
KeyName::KafkaTopic,
)
.to_string();
let topic_value = "test_topic".to_string();
let wal_meta = RegionWalMeta::with_metas([(topic_key, topic_value)]);
RegionMeta { wal_meta }
}
5 changes: 3 additions & 2 deletions src/cmd/src/cli/bench/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::time::Instant;
use common_meta::key::TableMetadataManagerRef;
use common_meta::table_name::TableName;

use super::{bench_self_recorded, create_region_routes, create_table_info};
use super::{bench_self_recorded, create_region_meta_map, create_region_routes, create_table_info};

pub struct TableMetadataBencher {
table_metadata_manager: TableMetadataManagerRef,
Expand All @@ -44,11 +44,12 @@ impl TableMetadataBencher {
let table_name = TableName::new("bench_catalog", "bench_schema", table_name);
let table_info = create_table_info(i, table_name);
let region_routes = create_region_routes();
let region_meta_map = create_region_meta_map(i);

let start = Instant::now();

self.table_metadata_manager
.create_table_metadata(table_info, region_routes)
.create_table_metadata(table_info, region_routes, region_meta_map)
.await
.unwrap();

Expand Down
3 changes: 3 additions & 0 deletions src/cmd/src/cli/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,8 @@ impl MigrateTableMetadata {
);
let region_distribution: RegionDistribution =
value.regions_id_map.clone().into_iter().collect();
// TODO(niebayes): Requires properly construct a region_meta_map.
let region_metas = Vec::new();

let datanode_table_kvs = region_distribution
.into_iter()
Expand All @@ -405,6 +407,7 @@ impl MigrateTableMetadata {
DatanodeTableValue::new(
table_id,
regions,
region_metas.clone(),
RegionInfo {
engine: engine.to_string(),
region_storage_path: region_storage_path.clone(),
Expand Down
16 changes: 14 additions & 2 deletions src/common/meta/src/ddl/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use api::v1::region::region_request::Body as PbRegionRequest;
use api::v1::region::{
CreateRequest as PbCreateRegionRequest, RegionColumnDef, RegionRequest, RegionRequestHeader,
Expand All @@ -25,7 +27,7 @@ use common_telemetry::tracing_context::TracingContext;
use futures::future::join_all;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionId;
use store_api::storage::{RegionId, RegionNumber};
use strum::AsRefStr;
use table::engine::TableReference;
use table::metadata::{RawTableInfo, TableId};
Expand All @@ -35,6 +37,7 @@ use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::key::table_name::TableNameKey;
use crate::metrics;
use crate::region_meta::RegionMeta;
use crate::rpc::ddl::CreateTableTask;
use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute};

Expand Down Expand Up @@ -78,6 +81,10 @@ impl CreateTableProcedure {
&self.creator.data.region_routes
}

pub fn region_meta_map(&self) -> &HashMap<RegionNumber, RegionMeta> {
&self.creator.data.region_meta_map
}

/// Checks whether the table exists.
async fn on_prepare(&mut self) -> Result<Status> {
let expr = &self.creator.data.task.create_table;
Expand Down Expand Up @@ -235,8 +242,10 @@ impl CreateTableProcedure {

let raw_table_info = self.table_info().clone();
let region_routes = self.region_routes().clone();
let region_meta_map = self.region_meta_map().clone();

manager
.create_table_metadata(raw_table_info, region_routes)
.create_table_metadata(raw_table_info, region_routes, region_meta_map)
.await?;
info!("Created table metadata for table {table_id}");

Expand Down Expand Up @@ -293,6 +302,8 @@ impl TableCreator {
cluster_id,
task,
region_routes,
// TODO(niebayes): Allocate region metadata in `TableMetadataAllocator`.
region_meta_map: HashMap::new(),
},
}
}
Expand All @@ -313,6 +324,7 @@ pub struct CreateTableData {
pub state: CreateTableState,
pub task: CreateTableTask,
pub region_routes: Vec<RegionRoute>,
pub region_meta_map: HashMap<RegionNumber, RegionMeta>,
pub cluster_id: u64,
}

Expand Down
Loading
Loading