Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: table route for metric engine #3053

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,11 +406,18 @@ impl StartCommand {
opts.wal_meta.clone(),
kv_backend.clone(),
));
let table_meta_allocator =
TableMetadataAllocator::new(table_id_sequence, wal_options_allocator.clone());

let table_metadata_manager =
Self::create_table_metadata_manager(kv_backend.clone()).await?;

let table_meta_allocator = TableMetadataAllocator::new(
table_id_sequence,
wal_options_allocator.clone(),
table_metadata_manager.clone(),
);

let ddl_task_executor = Self::create_ddl_task_executor(
kv_backend.clone(),
table_metadata_manager,
procedure_manager.clone(),
datanode_manager.clone(),
table_meta_allocator,
Expand Down Expand Up @@ -441,14 +448,11 @@ impl StartCommand {
}

pub async fn create_ddl_task_executor(
kv_backend: KvBackendRef,
table_metadata_manager: TableMetadataManagerRef,
procedure_manager: ProcedureManagerRef,
datanode_manager: DatanodeManagerRef,
table_meta_allocator: TableMetadataAllocator,
) -> Result<DdlTaskExecutorRef> {
let table_metadata_manager =
Self::create_table_metadata_manager(kv_backend.clone()).await?;

let ddl_task_executor: DdlTaskExecutorRef = Arc::new(
DdlManager::try_new(
procedure_manager,
Expand Down
39 changes: 35 additions & 4 deletions src/common/meta/src/ddl/table_meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@ use std::sync::Arc;
use async_trait::async_trait;
use common_catalog::consts::METRIC_ENGINE;
use common_telemetry::{debug, info};
use snafu::ensure;
use snafu::{ensure, OptionExt};
use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
use store_api::storage::{RegionId, RegionNumber, TableId};

use crate::ddl::{TableMetadata, TableMetadataAllocatorContext};
use crate::error::{Result, UnsupportedSnafu};
use crate::error::{Result, TableNotFoundSnafu, UnsupportedSnafu};
use crate::key::table_name::TableNameKey;
use crate::key::table_route::{LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue};
use crate::key::TableMetadataManagerRef;
use crate::peer::Peer;
use crate::rpc::ddl::CreateTableTask;
use crate::rpc::router::{Region, RegionRoute};
Expand All @@ -33,29 +36,34 @@ use crate::wal::{allocate_region_wal_options, WalOptionsAllocatorRef};
pub struct TableMetadataAllocator {
table_id_sequence: SequenceRef,
wal_options_allocator: WalOptionsAllocatorRef,
table_metadata_manager: TableMetadataManagerRef,
peer_allocator: PeerAllocatorRef,
}

impl TableMetadataAllocator {
pub fn new(
table_id_sequence: SequenceRef,
wal_options_allocator: WalOptionsAllocatorRef,
table_metadata_manager: TableMetadataManagerRef,
) -> Self {
Self::with_peer_allocator(
table_id_sequence,
wal_options_allocator,
table_metadata_manager,
Arc::new(NoopPeerAllocator),
)
}

pub fn with_peer_allocator(
table_id_sequence: SequenceRef,
wal_options_allocator: WalOptionsAllocatorRef,
table_metadata_manager: TableMetadataManagerRef,
peer_allocator: PeerAllocatorRef,
) -> Self {
Self {
table_id_sequence,
wal_options_allocator,
table_metadata_manager,
peer_allocator,
}
}
Expand Down Expand Up @@ -115,8 +123,31 @@ impl TableMetadataAllocator {
) -> Result<TableRouteValue> {
let regions = task.partitions.len();

let table_route = if task.create_table.engine == METRIC_ENGINE {
TableRouteValue::Logical(LogicalTableRouteValue {})
let table_route = if task.create_table.engine == METRIC_ENGINE
&& let Some(physical_table_name) = task
.create_table
.table_options
.get(LOGICAL_TABLE_METADATA_KEY)
WenyXu marked this conversation as resolved.
Show resolved Hide resolved
{
let physical_table_id = self
.table_metadata_manager
.table_name_manager()
.get(TableNameKey::new(
&task.create_table.catalog_name,
&task.create_table.schema_name,
physical_table_name,
))
.await?
.context(TableNotFoundSnafu {
table_name: physical_table_name,
})?
.table_id();

let region_ids = (0..regions)
.map(|i| RegionId::new(table_id, i as RegionNumber))
.collect();

TableRouteValue::Logical(LogicalTableRouteValue::new(physical_table_id, region_ids))
} else {
let peers = self.peer_allocator.alloc(ctx, regions).await?;

Expand Down
3 changes: 2 additions & 1 deletion src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,8 +495,9 @@ mod tests {
Arc::new(DummyCacheInvalidator),
table_metadata_manager,
TableMetadataAllocator::new(
Arc::new(SequenceBuilder::new("test", kv_backend).build()),
Arc::new(SequenceBuilder::new("test", kv_backend.clone()).build()),
Arc::new(WalOptionsAllocator::default()),
Arc::new(TableMetadataManager::new(kv_backend)),
),
Arc::new(MemoryRegionKeeper::default()),
);
Expand Down
16 changes: 12 additions & 4 deletions src/common/meta/src/key/table_route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ pub struct PhysicalTableRouteValue {

#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)]
pub struct LogicalTableRouteValue {
// TODO(LFC): Add table route for MetricsEngine table.
physical_table_id: TableId,
region_ids: Vec<RegionId>,
}

impl TableRouteValue {
Expand Down Expand Up @@ -174,12 +175,19 @@ impl PhysicalTableRouteValue {
}

impl LogicalTableRouteValue {
pub fn new(physical_table_id: TableId, region_ids: Vec<RegionId>) -> Self {
Self {
physical_table_id,
region_ids,
}
}

pub fn physical_table_id(&self) -> TableId {
todo!()
self.physical_table_id
}

pub fn region_ids(&self) -> Vec<RegionId> {
todo!()
pub fn region_ids(&self) -> &Vec<RegionId> {
&self.region_ids
}
}

Expand Down
1 change: 1 addition & 0 deletions src/common/meta/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#![feature(assert_matches)]
#![feature(btree_extract_if)]
#![feature(async_closure)]
#![feature(let_chains)]

pub mod cache_invalidator;
pub mod datanode_manager;
Expand Down
3 changes: 2 additions & 1 deletion src/common/meta/src/rpc/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,12 @@ pub fn convert_to_region_leader_status_map(
pub fn find_region_leader(
region_routes: &[RegionRoute],
region_number: RegionNumber,
) -> Option<&Peer> {
) -> Option<Peer> {
region_routes
.iter()
.find(|x| x.region.id.region_number() == region_number)
.and_then(|r| r.leader_peer.as_ref())
.cloned()
}

pub fn find_leader_regions(region_routes: &[RegionRoute], datanode: &Peer) -> Vec<RegionNumber> {
Expand Down
12 changes: 12 additions & 0 deletions src/datanode/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,16 @@ pub enum Error {
location: Location,
source: BoxedError,
},

#[snafu(display(
"Failed to find logical regions in physical region {}",
physical_region_id
))]
FindLogicalRegions {
physical_region_id: RegionId,
source: metric_engine::error::Error,
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -340,6 +350,8 @@ impl ErrorExt for Error {
}
HandleRegionRequest { source, .. } => source.status_code(),
StopRegionEngine { source, .. } => source.status_code(),

FindLogicalRegions { source, .. } => source.status_code(),
}
}

Expand Down
Loading
Loading