Skip to content

Commit

Permalink
refactor: merge standalone and metasrv table metadata allocators (#3035)
Browse files Browse the repository at this point in the history
* refactor: merge standalone and metasrv table metadata allocators

* Update src/common/meta/src/ddl/table_meta.rs

Co-authored-by: niebayes <[email protected]>

* Update src/common/meta/src/ddl/table_meta.rs

Co-authored-by: Weny Xu <[email protected]>

---------

Co-authored-by: niebayes <[email protected]>
Co-authored-by: Weny Xu <[email protected]>
  • Loading branch information
3 people authored Dec 29, 2023
1 parent e16f093 commit 7551432
Show file tree
Hide file tree
Showing 16 changed files with 279 additions and 322 deletions.
12 changes: 5 additions & 7 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use common_config::wal::StandaloneWalConfig;
use common_config::{metadata_store_dir, KvBackendConfig};
use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::ddl::{DdlTaskExecutorRef, TableMetadataAllocatorRef};
use common_meta::ddl::table_meta::TableMetadataAllocator;
use common_meta::ddl::DdlTaskExecutorRef;
use common_meta::ddl_manager::DdlManager;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
Expand All @@ -38,7 +39,6 @@ use datanode::datanode::{Datanode, DatanodeBuilder};
use file_engine::config::EngineConfig as FileEngineConfig;
use frontend::frontend::FrontendOptions;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::standalone::StandaloneTableMetadataAllocator;
use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager};
use frontend::service_config::{
GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromStoreOptions,
Expand Down Expand Up @@ -406,10 +406,8 @@ impl StartCommand {
opts.wal_meta.clone(),
kv_backend.clone(),
));
let table_meta_allocator = Arc::new(StandaloneTableMetadataAllocator::new(
table_id_sequence,
wal_options_allocator.clone(),
));
let table_meta_allocator =
TableMetadataAllocator::new(table_id_sequence, wal_options_allocator.clone());

let ddl_task_executor = Self::create_ddl_task_executor(
kv_backend.clone(),
Expand Down Expand Up @@ -446,7 +444,7 @@ impl StartCommand {
kv_backend: KvBackendRef,
procedure_manager: ProcedureManagerRef,
datanode_manager: DatanodeManagerRef,
table_meta_allocator: TableMetadataAllocatorRef,
table_meta_allocator: TableMetadataAllocator,
) -> Result<DdlTaskExecutorRef> {
let table_metadata_manager =
Self::create_table_metadata_manager(kv_backend.clone()).await?;
Expand Down
14 changes: 2 additions & 12 deletions src/common/meta/src/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ use crate::error::Result;
use crate::key::table_route::TableRouteValue;
use crate::key::TableMetadataManagerRef;
use crate::region_keeper::MemoryRegionKeeperRef;
use crate::rpc::ddl::{CreateTableTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};

pub mod alter_table;
pub mod create_table;
pub mod drop_table;
pub mod table_meta;
pub mod truncate_table;
pub mod utils;

Expand Down Expand Up @@ -64,17 +65,6 @@ pub struct TableMetadata {
pub region_wal_options: HashMap<RegionNumber, String>,
}

#[async_trait::async_trait]
pub trait TableMetadataAllocator: Send + Sync {
async fn create(
&self,
ctx: &TableMetadataAllocatorContext,
task: &CreateTableTask,
) -> Result<TableMetadata>;
}

pub type TableMetadataAllocatorRef = Arc<dyn TableMetadataAllocator>;

#[derive(Clone)]
pub struct DdlContext {
pub datanode_manager: DatanodeManagerRef,
Expand Down
190 changes: 190 additions & 0 deletions src/common/meta/src/ddl/table_meta.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;
use common_catalog::consts::METRIC_ENGINE;
use common_telemetry::{debug, info};
use snafu::ensure;
use store_api::storage::{RegionId, RegionNumber, TableId};

use crate::ddl::{TableMetadata, TableMetadataAllocatorContext};
use crate::error::{Result, UnsupportedSnafu};
use crate::key::table_route::{LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue};
use crate::peer::Peer;
use crate::rpc::ddl::CreateTableTask;
use crate::rpc::router::{Region, RegionRoute};
use crate::sequence::SequenceRef;
use crate::wal::{allocate_region_wal_options, WalOptionsAllocatorRef};

pub struct TableMetadataAllocator {
table_id_sequence: SequenceRef,
wal_options_allocator: WalOptionsAllocatorRef,
peer_allocator: PeerAllocatorRef,
}

impl TableMetadataAllocator {
pub fn new(
table_id_sequence: SequenceRef,
wal_options_allocator: WalOptionsAllocatorRef,
) -> Self {
Self::with_peer_allocator(
table_id_sequence,
wal_options_allocator,
Arc::new(NoopPeerAllocator),
)
}

pub fn with_peer_allocator(
table_id_sequence: SequenceRef,
wal_options_allocator: WalOptionsAllocatorRef,
peer_allocator: PeerAllocatorRef,
) -> Self {
Self {
table_id_sequence,
wal_options_allocator,
peer_allocator,
}
}

async fn allocate_table_id(&self, task: &CreateTableTask) -> Result<TableId> {
let table_id = if let Some(table_id) = &task.create_table.table_id {
let table_id = table_id.id;

ensure!(
!self
.table_id_sequence
.min_max()
.await
.contains(&(table_id as u64)),
UnsupportedSnafu {
operation: format!(
"create table by id {} that is reserved in this node",
table_id
)
}
);

info!(
"Received explicitly allocated table id {}, will use it directly.",
table_id
);

table_id
} else {
self.table_id_sequence.next().await? as TableId
};
Ok(table_id)
}

fn create_wal_options(
&self,
table_route: &TableRouteValue,
) -> Result<HashMap<RegionNumber, String>> {
match table_route {
TableRouteValue::Physical(x) => {
let region_numbers = x
.region_routes
.iter()
.map(|route| route.region.id.region_number())
.collect();
allocate_region_wal_options(region_numbers, &self.wal_options_allocator)
}
TableRouteValue::Logical(_) => Ok(HashMap::new()),
}
}

async fn create_table_route(
&self,
ctx: &TableMetadataAllocatorContext,
table_id: TableId,
task: &CreateTableTask,
) -> Result<TableRouteValue> {
let regions = task.partitions.len();

let table_route = if task.create_table.engine == METRIC_ENGINE {
TableRouteValue::Logical(LogicalTableRouteValue {})
} else {
let peers = self.peer_allocator.alloc(ctx, regions).await?;

let region_routes = task
.partitions
.iter()
.enumerate()
.map(|(i, partition)| {
let region = Region {
id: RegionId::new(table_id, i as u32),
partition: Some(partition.clone().into()),
..Default::default()
};

let peer = peers[i % peers.len()].clone();

RegionRoute {
region,
leader_peer: Some(peer),
..Default::default()
}
})
.collect::<Vec<_>>();
TableRouteValue::Physical(PhysicalTableRouteValue::new(region_routes))
};
Ok(table_route)
}
pub async fn create(
&self,
ctx: &TableMetadataAllocatorContext,
task: &CreateTableTask,
) -> Result<TableMetadata> {
let table_id = self.allocate_table_id(task).await?;
let table_route = self.create_table_route(ctx, table_id, task).await?;
let region_wal_options = self.create_wal_options(&table_route)?;

debug!(
"Allocated region wal options {:?} for table {}",
region_wal_options, table_id
);

Ok(TableMetadata {
table_id,
table_route,
region_wal_options,
})
}
}

pub type PeerAllocatorRef = Arc<dyn PeerAllocator>;

/// [PeerAllocator] allocates [Peer]s for creating regions.
#[async_trait]
pub trait PeerAllocator: Send + Sync {
/// Allocates `regions` size [Peer]s.
async fn alloc(&self, ctx: &TableMetadataAllocatorContext, regions: usize)
-> Result<Vec<Peer>>;
}

struct NoopPeerAllocator;

#[async_trait]
impl PeerAllocator for NoopPeerAllocator {
async fn alloc(
&self,
_ctx: &TableMetadataAllocatorContext,
regions: usize,
) -> Result<Vec<Peer>> {
Ok(vec![Peer::default(); regions])
}
}
33 changes: 11 additions & 22 deletions src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ use crate::datanode_manager::DatanodeManagerRef;
use crate::ddl::alter_table::AlterTableProcedure;
use crate::ddl::create_table::CreateTableProcedure;
use crate::ddl::drop_table::DropTableProcedure;
use crate::ddl::table_meta::TableMetadataAllocator;
use crate::ddl::truncate_table::TruncateTableProcedure;
use crate::ddl::{
DdlContext, DdlTaskExecutor, ExecutorContext, TableMetadata, TableMetadataAllocatorContext,
TableMetadataAllocatorRef,
};
use crate::error::{
self, RegisterProcedureLoaderSnafu, Result, SubmitProcedureSnafu, TableNotFoundSnafu,
Expand All @@ -54,7 +54,7 @@ pub struct DdlManager {
datanode_manager: DatanodeManagerRef,
cache_invalidator: CacheInvalidatorRef,
table_metadata_manager: TableMetadataManagerRef,
table_metadata_allocator: TableMetadataAllocatorRef,
table_metadata_allocator: TableMetadataAllocator,
memory_region_keeper: MemoryRegionKeeperRef,
}

Expand All @@ -65,7 +65,7 @@ impl DdlManager {
datanode_clients: DatanodeManagerRef,
cache_invalidator: CacheInvalidatorRef,
table_metadata_manager: TableMetadataManagerRef,
table_metadata_allocator: TableMetadataAllocatorRef,
table_metadata_allocator: TableMetadataAllocator,
memory_region_keeper: MemoryRegionKeeperRef,
) -> Result<Self> {
let manager = Self {
Expand Down Expand Up @@ -461,15 +461,15 @@ mod tests {
use crate::ddl::alter_table::AlterTableProcedure;
use crate::ddl::create_table::CreateTableProcedure;
use crate::ddl::drop_table::DropTableProcedure;
use crate::ddl::table_meta::TableMetadataAllocator;
use crate::ddl::truncate_table::TruncateTableProcedure;
use crate::ddl::{TableMetadata, TableMetadataAllocator, TableMetadataAllocatorContext};
use crate::error::Result;
use crate::key::TableMetadataManager;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::peer::Peer;
use crate::region_keeper::MemoryRegionKeeper;
use crate::rpc::ddl::CreateTableTask;
use crate::sequence::SequenceBuilder;
use crate::state_store::KvStateStore;
use crate::wal::WalOptionsAllocator;

/// A dummy implemented [DatanodeManager].
pub struct DummyDatanodeManager;
Expand All @@ -481,34 +481,23 @@ mod tests {
}
}

/// A dummy implemented [TableMetadataAllocator].
pub struct DummyTableMetadataAllocator;

#[async_trait::async_trait]
impl TableMetadataAllocator for DummyTableMetadataAllocator {
async fn create(
&self,
_ctx: &TableMetadataAllocatorContext,
_task: &CreateTableTask,
) -> Result<TableMetadata> {
unimplemented!()
}
}

#[test]
fn test_try_new() {
let kv_backend = Arc::new(MemoryKvBackend::new());
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));

let state_store = Arc::new(KvStateStore::new(kv_backend));
let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
let procedure_manager = Arc::new(LocalManager::new(Default::default(), state_store));

let _ = DdlManager::try_new(
procedure_manager.clone(),
Arc::new(DummyDatanodeManager),
Arc::new(DummyCacheInvalidator),
table_metadata_manager,
Arc::new(DummyTableMetadataAllocator),
TableMetadataAllocator::new(
Arc::new(SequenceBuilder::new("test", kv_backend).build()),
Arc::new(WalOptionsAllocator::default()),
),
Arc::new(MemoryRegionKeeper::default()),
);

Expand Down
Loading

0 comments on commit 7551432

Please sign in to comment.