Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: merge standalone and metasrv table metadata allocators #3035

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions src/cmd/src/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use common_config::wal::StandaloneWalConfig;
use common_config::{metadata_store_dir, KvBackendConfig};
use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::ddl::{DdlTaskExecutorRef, TableMetadataAllocatorRef};
use common_meta::ddl::table_meta::TableMetadataAllocator;
use common_meta::ddl::DdlTaskExecutorRef;
use common_meta::ddl_manager::DdlManager;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
Expand All @@ -38,7 +39,6 @@ use datanode::datanode::{Datanode, DatanodeBuilder};
use file_engine::config::EngineConfig as FileEngineConfig;
use frontend::frontend::FrontendOptions;
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::standalone::StandaloneTableMetadataAllocator;
use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager};
use frontend::service_config::{
GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromStoreOptions,
Expand Down Expand Up @@ -406,10 +406,8 @@ impl StartCommand {
opts.wal_meta.clone(),
kv_backend.clone(),
));
let table_meta_allocator = Arc::new(StandaloneTableMetadataAllocator::new(
table_id_sequence,
wal_options_allocator.clone(),
));
let table_meta_allocator =
TableMetadataAllocator::new(table_id_sequence, wal_options_allocator.clone());

let ddl_task_executor = Self::create_ddl_task_executor(
kv_backend.clone(),
Expand Down Expand Up @@ -446,7 +444,7 @@ impl StartCommand {
kv_backend: KvBackendRef,
procedure_manager: ProcedureManagerRef,
datanode_manager: DatanodeManagerRef,
table_meta_allocator: TableMetadataAllocatorRef,
table_meta_allocator: TableMetadataAllocator,
) -> Result<DdlTaskExecutorRef> {
let table_metadata_manager =
Self::create_table_metadata_manager(kv_backend.clone()).await?;
Expand Down
14 changes: 2 additions & 12 deletions src/common/meta/src/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@ use crate::error::Result;
use crate::key::table_route::TableRouteValue;
use crate::key::TableMetadataManagerRef;
use crate::region_keeper::MemoryRegionKeeperRef;
use crate::rpc::ddl::{CreateTableTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};

pub mod alter_table;
pub mod create_table;
pub mod drop_table;
pub mod table_meta;
pub mod truncate_table;
pub mod utils;

Expand Down Expand Up @@ -64,17 +65,6 @@ pub struct TableMetadata {
pub region_wal_options: HashMap<RegionNumber, String>,
}

#[async_trait::async_trait]
pub trait TableMetadataAllocator: Send + Sync {
async fn create(
&self,
ctx: &TableMetadataAllocatorContext,
task: &CreateTableTask,
) -> Result<TableMetadata>;
}

pub type TableMetadataAllocatorRef = Arc<dyn TableMetadataAllocator>;

#[derive(Clone)]
pub struct DdlContext {
pub datanode_manager: DatanodeManagerRef,
Expand Down
190 changes: 190 additions & 0 deletions src/common/meta/src/ddl/table_meta.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;
use common_catalog::consts::METRIC_ENGINE;
use common_telemetry::{debug, info};
use snafu::ensure;
use store_api::storage::{RegionId, RegionNumber, TableId};

use crate::ddl::{TableMetadata, TableMetadataAllocatorContext};
use crate::error::{Result, UnsupportedSnafu};
use crate::key::table_route::{LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue};
use crate::peer::Peer;
use crate::rpc::ddl::CreateTableTask;
use crate::rpc::router::{Region, RegionRoute};
use crate::sequence::SequenceRef;
use crate::wal::{allocate_region_wal_options, WalOptionsAllocatorRef};

pub struct TableMetadataAllocator {
table_id_sequence: SequenceRef,
wal_options_allocator: WalOptionsAllocatorRef,
peer_allocator: PeerAllocatorRef,
}

impl TableMetadataAllocator {
pub fn new(
table_id_sequence: SequenceRef,
wal_options_allocator: WalOptionsAllocatorRef,
) -> Self {
Self::with_peer_allocator(
table_id_sequence,
wal_options_allocator,
Arc::new(NoopPeerAllocator),
)
}

pub fn with_peer_allocator(
table_id_sequence: SequenceRef,
wal_options_allocator: WalOptionsAllocatorRef,
peer_allocator: PeerAllocatorRef,
) -> Self {
Self {
table_id_sequence,
wal_options_allocator,
peer_allocator,
}
}

async fn allocate_table_id(&self, task: &CreateTableTask) -> Result<TableId> {
let table_id = if let Some(table_id) = &task.create_table.table_id {
let table_id = table_id.id;

ensure!(
!self
.table_id_sequence
.min_max()
.await
.contains(&(table_id as u64)),
UnsupportedSnafu {
operation: format!(
"create table by id {} that is reserved in this node",
table_id
)
}
);

info!(
"Received explicitly allocated table id {}, will use it directly.",
table_id
);

table_id
} else {
self.table_id_sequence.next().await? as TableId
};
Ok(table_id)
}

fn create_wal_options(
&self,
table_route: &TableRouteValue,
) -> Result<HashMap<RegionNumber, String>> {
match table_route {
TableRouteValue::Physical(x) => {
let region_numbers = x
.region_routes
.iter()
.map(|route| route.region.id.region_number())
.collect();
allocate_region_wal_options(region_numbers, &self.wal_options_allocator)
}
TableRouteValue::Logical(_) => Ok(HashMap::new()),
}
}

async fn create_table_route(
&self,
ctx: &TableMetadataAllocatorContext,
table_id: TableId,
task: &CreateTableTask,
) -> Result<TableRouteValue> {
let regions = task.partitions.len();

let table_route = if task.create_table.engine == METRIC_ENGINE {
TableRouteValue::Logical(LogicalTableRouteValue {})
} else {
let peers = self.peer_allocator.alloc(ctx, regions).await?;

let region_routes = task
.partitions
.iter()
.enumerate()
.map(|(i, partition)| {
let region = Region {
id: RegionId::new(table_id, i as u32),
partition: Some(partition.clone().into()),
..Default::default()
};

let peer = peers[i % peers.len()].clone();

RegionRoute {
region,
leader_peer: Some(peer),
..Default::default()
}
})
.collect::<Vec<_>>();
TableRouteValue::Physical(PhysicalTableRouteValue::new(region_routes))
};
Ok(table_route)
}
pub async fn create(
&self,
ctx: &TableMetadataAllocatorContext,
task: &CreateTableTask,
) -> Result<TableMetadata> {
let table_id = self.allocate_table_id(task).await?;
let table_route = self.create_table_route(ctx, table_id, task).await?;
let region_wal_options = self.create_wal_options(&table_route)?;

debug!(
"Allocated region wal options {:?} for table {}",
region_wal_options, table_id
);

Ok(TableMetadata {
table_id,
table_route,
region_wal_options,
})
}
}

pub type PeerAllocatorRef = Arc<dyn PeerAllocator>;

/// [PeerAllocator] allocates [Peer]s for creating regions.
#[async_trait]
pub trait PeerAllocator: Send + Sync {
/// Allocates `regions` size [Peer]s.
async fn alloc(&self, ctx: &TableMetadataAllocatorContext, regions: usize)
-> Result<Vec<Peer>>;
}

struct NoopPeerAllocator;

#[async_trait]
impl PeerAllocator for NoopPeerAllocator {
async fn alloc(
&self,
_ctx: &TableMetadataAllocatorContext,
regions: usize,
) -> Result<Vec<Peer>> {
Ok(vec![Peer::default(); regions])
}
}
33 changes: 11 additions & 22 deletions src/common/meta/src/ddl_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ use crate::datanode_manager::DatanodeManagerRef;
use crate::ddl::alter_table::AlterTableProcedure;
use crate::ddl::create_table::CreateTableProcedure;
use crate::ddl::drop_table::DropTableProcedure;
use crate::ddl::table_meta::TableMetadataAllocator;
use crate::ddl::truncate_table::TruncateTableProcedure;
use crate::ddl::{
DdlContext, DdlTaskExecutor, ExecutorContext, TableMetadata, TableMetadataAllocatorContext,
TableMetadataAllocatorRef,
};
use crate::error::{
self, RegisterProcedureLoaderSnafu, Result, SubmitProcedureSnafu, TableNotFoundSnafu,
Expand All @@ -54,7 +54,7 @@ pub struct DdlManager {
datanode_manager: DatanodeManagerRef,
cache_invalidator: CacheInvalidatorRef,
table_metadata_manager: TableMetadataManagerRef,
table_metadata_allocator: TableMetadataAllocatorRef,
table_metadata_allocator: TableMetadataAllocator,
memory_region_keeper: MemoryRegionKeeperRef,
}

Expand All @@ -65,7 +65,7 @@ impl DdlManager {
datanode_clients: DatanodeManagerRef,
cache_invalidator: CacheInvalidatorRef,
table_metadata_manager: TableMetadataManagerRef,
table_metadata_allocator: TableMetadataAllocatorRef,
table_metadata_allocator: TableMetadataAllocator,
memory_region_keeper: MemoryRegionKeeperRef,
) -> Result<Self> {
let manager = Self {
Expand Down Expand Up @@ -461,15 +461,15 @@ mod tests {
use crate::ddl::alter_table::AlterTableProcedure;
use crate::ddl::create_table::CreateTableProcedure;
use crate::ddl::drop_table::DropTableProcedure;
use crate::ddl::table_meta::TableMetadataAllocator;
use crate::ddl::truncate_table::TruncateTableProcedure;
use crate::ddl::{TableMetadata, TableMetadataAllocator, TableMetadataAllocatorContext};
use crate::error::Result;
use crate::key::TableMetadataManager;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::peer::Peer;
use crate::region_keeper::MemoryRegionKeeper;
use crate::rpc::ddl::CreateTableTask;
use crate::sequence::SequenceBuilder;
use crate::state_store::KvStateStore;
use crate::wal::WalOptionsAllocator;

/// A dummy implemented [DatanodeManager].
pub struct DummyDatanodeManager;
Expand All @@ -481,34 +481,23 @@ mod tests {
}
}

/// A dummy implemented [TableMetadataAllocator].
pub struct DummyTableMetadataAllocator;

#[async_trait::async_trait]
impl TableMetadataAllocator for DummyTableMetadataAllocator {
async fn create(
&self,
_ctx: &TableMetadataAllocatorContext,
_task: &CreateTableTask,
) -> Result<TableMetadata> {
unimplemented!()
}
}

#[test]
fn test_try_new() {
let kv_backend = Arc::new(MemoryKvBackend::new());
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));

let state_store = Arc::new(KvStateStore::new(kv_backend));
let state_store = Arc::new(KvStateStore::new(kv_backend.clone()));
let procedure_manager = Arc::new(LocalManager::new(Default::default(), state_store));

let _ = DdlManager::try_new(
procedure_manager.clone(),
Arc::new(DummyDatanodeManager),
Arc::new(DummyCacheInvalidator),
table_metadata_manager,
Arc::new(DummyTableMetadataAllocator),
TableMetadataAllocator::new(
Arc::new(SequenceBuilder::new("test", kv_backend).build()),
Arc::new(WalOptionsAllocator::default()),
),
Arc::new(MemoryRegionKeeper::default()),
);

Expand Down
Loading
Loading