From 839e653e0d0aaf09f6dab6163c3d527e18465730 Mon Sep 17 00:00:00 2001 From: niebayes Date: Tue, 19 Dec 2023 20:43:47 +0800 Subject: [PATCH] feat(remote_wal): add skeleton for remote wal related to meta srv (#2933) * feat: introduce wal config and kafka config * feat: introduce kafka topic manager and selector * feat: introduce region wal options * chore: build region wal options upon starting meta srv * feat: integrate region wal options allocator into table meta allocator * chore: add wal config to metasrv.example.toml * chore: add region wal options map to create table procedure * feat: augment region create request with wal options * feat: augment DatanodeTableValue with region wal options map * chore: encode region wal options upon constructing table creator * feat: persist region wal options when creating table meta * fix: sqlness test * chore: set default wal provider to raft-engine * refactor: refactor wal options * chore: update wal options allocator * refactor: rename region wal options to wal options * chore: update usages of region wal options * chore: add some comments to kafka * chore: fill in kafka config * test: add tests for serde wal config * test: add tests for wal options * refactor: refactor wal options allocator to enum * refactor: store wal options into the request options instead * fix: typo * fix: typo * refactor: move wal options map to region info * refactor: refacto serialization and deserialization of wal options * refactor: use serde_json to encode wal options * chore: rename wal_options_map to region_wal_options * chore: resolve some review comments * fix: typo * refactor: replace kecab-case with snake_case * fix: sqlness and converage tests * fix: typo * fix: coverage test * fix: coverage test * chore: resolve some review conversations * fix: resolve some review conversations * chore: format comments in metasrv.example.toml * chore: update import style * feat: integrate wal options allocator to standalone mode * test: add compatible test for OpenRegion * test: add compatible test for UpdateRegionMetadata * chore: remove based suffix from topic selector type --- Cargo.lock | 3 + config/metasrv.example.toml | 24 ++++ src/cmd/Cargo.toml | 1 + src/cmd/src/cli/bench.rs | 19 ++- src/cmd/src/cli/bench/metadata.rs | 11 +- src/cmd/src/cli/upgrade.rs | 5 + src/cmd/src/error.rs | 10 +- src/cmd/src/standalone.rs | 17 ++- src/common/meta/Cargo.toml | 2 + src/common/meta/src/ddl.rs | 16 ++- src/common/meta/src/ddl/create_table.rs | 36 ++++- src/common/meta/src/ddl_manager.rs | 44 ++++-- src/common/meta/src/error.rs | 15 ++- src/common/meta/src/instruction.rs | 60 ++++++++- src/common/meta/src/key.rs | 60 +++++++-- src/common/meta/src/key/datanode_table.rs | 64 ++++++++- src/common/meta/src/lib.rs | 2 + src/common/meta/src/wal.rs | 127 ++++++++++++++++++ src/common/meta/src/wal/kafka.rs | 60 +++++++++ src/common/meta/src/wal/kafka/topic.rs | 18 +++ .../meta/src/wal/kafka/topic_manager.rs | 61 +++++++++ .../meta/src/wal/kafka/topic_selector.rs | 51 +++++++ src/common/meta/src/wal/options_allocator.rs | 90 +++++++++++++ src/datanode/src/datanode.rs | 1 + src/datanode/src/heartbeat/handler.rs | 8 +- src/frontend/src/instance/standalone.rs | 43 ++++-- .../src/handler/region_lease_handler.rs | 4 +- src/meta-srv/src/metasrv.rs | 3 + src/meta-srv/src/metasrv/builder.rs | 11 +- src/meta-srv/src/procedure/region_failover.rs | 1 + .../region_failover/activate_region.rs | 17 ++- .../region_failover/deactivate_region.rs | 4 +- .../region_failover/update_metadata.rs | 61 ++++++++- .../region_migration/migration_start.rs | 9 +- .../region_migration/open_candidate_region.rs | 8 +- .../procedure/region_migration/test_util.rs | 3 +- .../downgrade_leader_region.rs | 7 +- .../rollback_downgraded_region.rs | 5 +- .../upgrade_candidate_region.rs | 22 +-- src/meta-srv/src/procedure/tests.rs | 8 +- src/meta-srv/src/region/lease_keeper.rs | 4 +- src/meta-srv/src/table_meta_alloc.rs | 33 ++++- src/meta-srv/src/test_util.rs | 3 +- src/operator/src/tests/partition_manager.rs | 17 ++- tests-integration/src/standalone.rs | 13 +- 45 files changed, 972 insertions(+), 109 deletions(-) create mode 100644 src/common/meta/src/wal.rs create mode 100644 src/common/meta/src/wal/kafka.rs create mode 100644 src/common/meta/src/wal/kafka/topic.rs create mode 100644 src/common/meta/src/wal/kafka/topic_manager.rs create mode 100644 src/common/meta/src/wal/kafka/topic_selector.rs create mode 100644 src/common/meta/src/wal/options_allocator.rs diff --git a/Cargo.lock b/Cargo.lock index ee7bde92e7b6..91a040543474 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1560,6 +1560,7 @@ dependencies = [ "servers", "session", "snafu", + "store-api", "substrait 0.4.4", "table", "temp-env", @@ -1831,11 +1832,13 @@ dependencies = [ "regex", "serde", "serde_json", + "serde_with", "snafu", "store-api", "strum 0.25.0", "table", "tokio", + "toml 0.7.8", "tonic 0.10.2", ] diff --git a/config/metasrv.example.toml b/config/metasrv.example.toml index 0be99c1dd048..aad33ce1afcf 100644 --- a/config/metasrv.example.toml +++ b/config/metasrv.example.toml @@ -42,3 +42,27 @@ first_heartbeat_estimate = "1000ms" # timeout = "10s" # connect_timeout = "10s" # tcp_nodelay = true + +[wal] +# Available wal providers: +# - "raft_engine" (default) +# - "kafka" +provider = "raft_engine" + +# There're none raft-engine wal config since meta srv only involves in remote wal currently. + +# Kafka wal config. +# The broker endpoints of the Kafka cluster. ["127.0.0.1:9090"] by default. +# broker_endpoints = ["127.0.0.1:9090"] +# Number of topics to be created upon start. +# num_topics = 64 +# Topic selector type. +# Available selector types: +# - "round_robin" (default) +# selector_type = "round_robin" +# A Kafka topic is constructed by concatenating `topic_name_prefix` and `topic_id`. +# topic_name_prefix = "greptimedb_kafka_wal" +# Number of partitions per topic. +# num_partitions = 1 +# Expected number of replicas of each partition. +# replication_factor = 3 diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml index 1ccd8fe21200..c28b2982877b 100644 --- a/src/cmd/Cargo.toml +++ b/src/cmd/Cargo.toml @@ -58,6 +58,7 @@ serde_json.workspace = true servers.workspace = true session.workspace = true snafu.workspace = true +store-api.workspace = true substrait.workspace = true table.workspace = true tokio.workspace = true diff --git a/src/cmd/src/cli/bench.rs b/src/cmd/src/cli/bench.rs index 481d22d0b536..6cd7d86a1642 100644 --- a/src/cmd/src/cli/bench.rs +++ b/src/cmd/src/cli/bench.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use std::future::Future; use std::sync::Arc; use std::time::Duration; @@ -28,6 +28,7 @@ use common_telemetry::info; use datatypes::data_type::ConcreteDataType; use datatypes::schema::{ColumnSchema, RawSchema}; use rand::Rng; +use store_api::storage::RegionNumber; use table::metadata::{RawTableInfo, RawTableMeta, TableId, TableIdent, TableType}; use self::metadata::TableMetadataBencher; @@ -137,12 +138,12 @@ fn create_table_info(table_id: TableId, table_name: TableName) -> RawTableInfo { } } -fn create_region_routes() -> Vec { - let mut regions = Vec::with_capacity(100); +fn create_region_routes(regions: Vec) -> Vec { + let mut region_routes = Vec::with_capacity(100); let mut rng = rand::thread_rng(); - for region_id in 0..64u64 { - regions.push(RegionRoute { + for region_id in regions.into_iter().map(u64::from) { + region_routes.push(RegionRoute { region: Region { id: region_id.into(), name: String::new(), @@ -158,5 +159,11 @@ fn create_region_routes() -> Vec { }); } - regions + region_routes +} + +fn create_region_wal_options(regions: Vec) -> HashMap { + // TODO(niebayes): construct region wal options for benchmark. + let _ = regions; + HashMap::default() } diff --git a/src/cmd/src/cli/bench/metadata.rs b/src/cmd/src/cli/bench/metadata.rs index 11492c7c1f0b..7b77fed49dbd 100644 --- a/src/cmd/src/cli/bench/metadata.rs +++ b/src/cmd/src/cli/bench/metadata.rs @@ -17,7 +17,9 @@ use std::time::Instant; use common_meta::key::TableMetadataManagerRef; use common_meta::table_name::TableName; -use super::{bench_self_recorded, create_region_routes, create_table_info}; +use crate::cli::bench::{ + bench_self_recorded, create_region_routes, create_region_wal_options, create_table_info, +}; pub struct TableMetadataBencher { table_metadata_manager: TableMetadataManagerRef, @@ -43,12 +45,15 @@ impl TableMetadataBencher { let table_name = format!("bench_table_name_{}", i); let table_name = TableName::new("bench_catalog", "bench_schema", table_name); let table_info = create_table_info(i, table_name); - let region_routes = create_region_routes(); + + let regions: Vec<_> = (0..64).collect(); + let region_routes = create_region_routes(regions.clone()); + let region_wal_options = create_region_wal_options(regions); let start = Instant::now(); self.table_metadata_manager - .create_table_metadata(table_info, region_routes) + .create_table_metadata(table_info, region_routes, region_wal_options) .await .unwrap(); diff --git a/src/cmd/src/cli/upgrade.rs b/src/cmd/src/cli/upgrade.rs index ac52e94b2b5e..e5615f4d8219 100644 --- a/src/cmd/src/cli/upgrade.rs +++ b/src/cmd/src/cli/upgrade.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::sync::Arc; use async_trait::async_trait; @@ -395,6 +396,9 @@ impl MigrateTableMetadata { let region_distribution: RegionDistribution = value.regions_id_map.clone().into_iter().collect(); + // TODO(niebayes): properly fetch or construct wal options. + let region_wal_options = HashMap::default(); + let datanode_table_kvs = region_distribution .into_iter() .map(|(datanode_id, regions)| { @@ -409,6 +413,7 @@ impl MigrateTableMetadata { engine: engine.to_string(), region_storage_path: region_storage_path.clone(), region_options: (&value.table_info.meta.options).into(), + region_wal_options: region_wal_options.clone(), }, ), ) diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs index ae84282b1c01..91d556ff3861 100644 --- a/src/cmd/src/error.rs +++ b/src/cmd/src/error.rs @@ -14,7 +14,7 @@ use std::any::Any; -use common_error::ext::ErrorExt; +use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use config::ConfigError; @@ -231,6 +231,12 @@ pub enum Error { #[snafu(source)] error: std::io::Error, }, + + #[snafu(display("Other error"))] + Other { + source: BoxedError, + location: Location, + }, } pub type Result = std::result::Result; @@ -276,6 +282,8 @@ impl ErrorExt for Error { Error::StartCatalogManager { source, .. } => source.status_code(), Error::SerdeJson { .. } | Error::FileIo { .. } => StatusCode::Unexpected, + + Error::Other { source, .. } => source.status_code(), } } diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 3a499f6d6df7..55ed1d9817a5 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -19,6 +19,7 @@ use async_trait::async_trait; use clap::Parser; use common_catalog::consts::MIN_USER_TABLE_ID; use common_config::{metadata_store_dir, KvBackendConfig, WalConfig}; +use common_error::ext::BoxedError; use common_meta::cache_invalidator::DummyCacheInvalidator; use common_meta::datanode_manager::DatanodeManagerRef; use common_meta::ddl::{DdlTaskExecutorRef, TableMetadataAllocatorRef}; @@ -27,6 +28,7 @@ use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; use common_meta::region_keeper::MemoryRegionKeeper; use common_meta::sequence::SequenceBuilder; +use common_meta::wal::build_wal_options_allocator; use common_procedure::ProcedureManagerRef; use common_telemetry::info; use common_telemetry::logging::LoggingOptions; @@ -35,7 +37,7 @@ use datanode::datanode::{Datanode, DatanodeBuilder}; use file_engine::config::EngineConfig as FileEngineConfig; use frontend::frontend::FrontendOptions; use frontend::instance::builder::FrontendBuilder; -use frontend::instance::standalone::StandaloneTableMetadataCreator; +use frontend::instance::standalone::StandaloneTableMetadataAllocator; use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager}; use frontend::service_config::{ GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromStoreOptions, @@ -48,7 +50,7 @@ use servers::Mode; use snafu::ResultExt; use crate::error::{ - CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu, InitMetadataSnafu, Result, + CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu, InitMetadataSnafu, OtherSnafu, Result, ShutdownDatanodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu, StopProcedureManagerSnafu, }; @@ -372,7 +374,16 @@ impl StartCommand { .step(10) .build(), ); - let table_meta_allocator = Arc::new(StandaloneTableMetadataCreator::new(table_id_sequence)); + // TODO(niebayes): add a wal config into the MixOptions and pass it to the allocator builder. + let wal_options_allocator = + build_wal_options_allocator(&common_meta::wal::WalConfig::default(), &kv_backend) + .await + .map_err(BoxedError::new) + .context(OtherSnafu)?; + let table_meta_allocator = Arc::new(StandaloneTableMetadataAllocator::new( + table_id_sequence, + wal_options_allocator, + )); let ddl_task_executor = Self::create_ddl_task_executor( kv_backend.clone(), diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index 3415c841aef4..59c0c4b54d5e 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -34,11 +34,13 @@ prost.workspace = true regex.workspace = true serde.workspace = true serde_json.workspace = true +serde_with = "3" snafu.workspace = true store-api.workspace = true strum.workspace = true table.workspace = true tokio.workspace = true +toml.workspace = true tonic.workspace = true [dev-dependencies] diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index be3d153c8c9f..5e03354fcca9 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -12,11 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::sync::Arc; use api::v1::meta::Partition; use common_telemetry::tracing_context::W3cTrace; -use store_api::storage::TableId; +use store_api::storage::{RegionNumber, TableId}; use table::metadata::RawTableInfo; use crate::cache_invalidator::CacheInvalidatorRef; @@ -54,6 +55,17 @@ pub struct TableMetadataAllocatorContext { pub cluster_id: u64, } +/// Metadata allocated to a table. +pub struct TableMetadata { + /// Table id. + pub table_id: TableId, + /// Route information for each region of the table. + pub region_routes: Vec, + /// The encoded wal options for regions of the table. + // If a region does not have an associated wal options, no key for the region would be found in the map. + pub region_wal_options: HashMap, +} + #[async_trait::async_trait] pub trait TableMetadataAllocator: Send + Sync { async fn create( @@ -61,7 +73,7 @@ pub trait TableMetadataAllocator: Send + Sync { ctx: &TableMetadataAllocatorContext, table_info: &mut RawTableInfo, partitions: &[Partition], - ) -> Result<(TableId, Vec)>; + ) -> Result; } pub type TableMetadataAllocatorRef = Arc; diff --git a/src/common/meta/src/ddl/create_table.rs b/src/common/meta/src/ddl/create_table.rs index 7bcf0fad7f4f..1fefd4db62ca 100644 --- a/src/common/meta/src/ddl/create_table.rs +++ b/src/common/meta/src/ddl/create_table.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; + use api::v1::region::region_request::Body as PbRegionRequest; use api::v1::region::{ CreateRequest as PbCreateRegionRequest, RegionColumnDef, RegionRequest, RegionRequestHeader, @@ -30,7 +32,7 @@ use futures::future::join_all; use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY; -use store_api::storage::RegionId; +use store_api::storage::{RegionId, RegionNumber}; use strum::AsRefStr; use table::engine::TableReference; use table::metadata::{RawTableInfo, TableId}; @@ -45,6 +47,7 @@ use crate::rpc::ddl::CreateTableTask; use crate::rpc::router::{ find_leader_regions, find_leaders, operating_leader_regions, RegionRoute, }; +use crate::wal::WAL_OPTIONS_KEY; pub struct CreateTableProcedure { pub context: DdlContext, @@ -58,11 +61,12 @@ impl CreateTableProcedure { cluster_id: u64, task: CreateTableTask, region_routes: Vec, + region_wal_options: HashMap, context: DdlContext, ) -> Self { Self { context, - creator: TableCreator::new(cluster_id, task, region_routes), + creator: TableCreator::new(cluster_id, task, region_routes, region_wal_options), } } @@ -94,6 +98,10 @@ impl CreateTableProcedure { &self.creator.data.region_routes } + pub fn region_wal_options(&self) -> &HashMap { + &self.creator.data.region_wal_options + } + /// Checks whether the table exists. async fn on_prepare(&mut self) -> Result { let expr = &self.creator.data.task.create_table; @@ -193,6 +201,7 @@ impl CreateTableProcedure { let create_table_data = &self.creator.data; let region_routes = &create_table_data.region_routes; + let region_wal_options = &create_table_data.region_wal_options; let create_table_expr = &create_table_data.task.create_table; let catalog = &create_table_expr.catalog_name; @@ -211,12 +220,12 @@ impl CreateTableProcedure { let mut requests = Vec::with_capacity(regions.len()); for region_number in regions { let region_id = RegionId::new(self.table_id(), region_number); - let create_region_request = request_builder .build_one( &self.creator.data.task.create_table, region_id, storage_path.clone(), + region_wal_options, ) .await?; @@ -262,8 +271,9 @@ impl CreateTableProcedure { let raw_table_info = self.table_info().clone(); let region_routes = self.region_routes().clone(); + let region_wal_options = self.region_wal_options().clone(); manager - .create_table_metadata(raw_table_info, region_routes) + .create_table_metadata(raw_table_info, region_routes, region_wal_options) .await?; info!("Created table metadata for table {table_id}"); @@ -316,13 +326,19 @@ pub struct TableCreator { } impl TableCreator { - pub fn new(cluster_id: u64, task: CreateTableTask, region_routes: Vec) -> Self { + pub fn new( + cluster_id: u64, + task: CreateTableTask, + region_routes: Vec, + region_wal_options: HashMap, + ) -> Self { Self { data: CreateTableData { state: CreateTableState::Prepare, cluster_id, task, region_routes, + region_wal_options, }, opening_regions: vec![], } @@ -371,6 +387,7 @@ pub struct CreateTableData { pub state: CreateTableState, pub task: CreateTableTask, pub region_routes: Vec, + pub region_wal_options: HashMap, pub cluster_id: u64, } @@ -406,11 +423,20 @@ impl CreateRequestBuilder { create_expr: &CreateTableExpr, region_id: RegionId, storage_path: String, + region_wal_options: &HashMap, ) -> Result { let mut request = self.template.clone(); request.region_id = region_id.as_u64(); request.path = storage_path; + // Stores the encoded wal options into the request options. + region_wal_options + .get(®ion_id.region_number()) + .and_then(|wal_options| { + request + .options + .insert(WAL_OPTIONS_KEY.to_string(), wal_options.clone()) + }); if self.template.engine == METRIC_ENGINE { self.metric_engine_hook(create_expr, region_id, &mut request) diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index cb57c03e400e..873ea04a7c17 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -12,12 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::sync::Arc; use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId}; use common_telemetry::tracing_context::{FutureExt, TracingContext}; use common_telemetry::{info, tracing}; use snafu::{OptionExt, ResultExt}; +use store_api::storage::RegionNumber; use crate::cache_invalidator::CacheInvalidatorRef; use crate::datanode_manager::DatanodeManagerRef; @@ -26,7 +28,7 @@ use crate::ddl::create_table::CreateTableProcedure; use crate::ddl::drop_table::DropTableProcedure; use crate::ddl::truncate_table::TruncateTableProcedure; use crate::ddl::{ - DdlContext, DdlTaskExecutor, ExecutorContext, TableMetadataAllocatorContext, + DdlContext, DdlTaskExecutor, ExecutorContext, TableMetadata, TableMetadataAllocatorContext, TableMetadataAllocatorRef, }; use crate::error::{ @@ -52,7 +54,7 @@ pub struct DdlManager { datanode_manager: DatanodeManagerRef, cache_invalidator: CacheInvalidatorRef, table_metadata_manager: TableMetadataManagerRef, - table_meta_allocator: TableMetadataAllocatorRef, + table_metadata_allocator: TableMetadataAllocatorRef, memory_region_keeper: MemoryRegionKeeperRef, } @@ -63,7 +65,7 @@ impl DdlManager { datanode_clients: DatanodeManagerRef, cache_invalidator: CacheInvalidatorRef, table_metadata_manager: TableMetadataManagerRef, - table_meta_allocator: TableMetadataAllocatorRef, + table_metadata_allocator: TableMetadataAllocatorRef, memory_region_keeper: MemoryRegionKeeperRef, ) -> Result { let manager = Self { @@ -71,7 +73,7 @@ impl DdlManager { datanode_manager: datanode_clients, cache_invalidator, table_metadata_manager, - table_meta_allocator, + table_metadata_allocator, memory_region_keeper, }; manager.register_loaders()?; @@ -176,11 +178,17 @@ impl DdlManager { cluster_id: u64, create_table_task: CreateTableTask, region_routes: Vec, + region_wal_options: HashMap, ) -> Result { let context = self.create_context(); - let procedure = - CreateTableProcedure::new(cluster_id, create_table_task, region_routes, context); + let procedure = CreateTableProcedure::new( + cluster_id, + create_table_task, + region_routes, + region_wal_options, + context, + ); let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); @@ -374,8 +382,8 @@ async fn handle_create_table_task( cluster_id: u64, mut create_table_task: CreateTableTask, ) -> Result { - let (table_id, region_routes) = ddl_manager - .table_meta_allocator + let table_meta = ddl_manager + .table_metadata_allocator .create( &TableMetadataAllocatorContext { cluster_id }, &mut create_table_task.table_info, @@ -383,8 +391,19 @@ async fn handle_create_table_task( ) .await?; + let TableMetadata { + table_id, + region_routes, + region_wal_options, + } = table_meta; + let id = ddl_manager - .submit_create_table_task(cluster_id, create_table_task, region_routes) + .submit_create_table_task( + cluster_id, + create_table_task, + region_routes, + region_wal_options, + ) .await?; info!("Table: {table_id:?} is created via procedure_id {id:?}"); @@ -437,7 +456,7 @@ mod tests { use api::v1::meta::Partition; use common_procedure::local::LocalManager; - use table::metadata::{RawTableInfo, TableId}; + use table::metadata::RawTableInfo; use super::DdlManager; use crate::cache_invalidator::DummyCacheInvalidator; @@ -446,13 +465,12 @@ mod tests { use crate::ddl::create_table::CreateTableProcedure; use crate::ddl::drop_table::DropTableProcedure; use crate::ddl::truncate_table::TruncateTableProcedure; - use crate::ddl::{TableMetadataAllocator, TableMetadataAllocatorContext}; + use crate::ddl::{TableMetadata, TableMetadataAllocator, TableMetadataAllocatorContext}; use crate::error::Result; use crate::key::TableMetadataManager; use crate::kv_backend::memory::MemoryKvBackend; use crate::peer::Peer; use crate::region_keeper::MemoryRegionKeeper; - use crate::rpc::router::RegionRoute; use crate::state_store::KvStateStore; /// A dummy implemented [DatanodeManager]. @@ -475,7 +493,7 @@ mod tests { _ctx: &TableMetadataAllocatorContext, _table_info: &mut RawTableInfo, _partitions: &[Partition], - ) -> Result<(TableId, Vec)> { + ) -> Result { unimplemented!() } } diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 74faf064f9b7..4f3a9aca9742 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -23,6 +23,7 @@ use store_api::storage::{RegionId, RegionNumber}; use table::metadata::TableId; use crate::peer::Peer; +use crate::wal::WalOptions; use crate::DatanodeId; #[derive(Snafu)] @@ -284,6 +285,17 @@ pub enum Error { #[snafu(display("Retry later"))] RetryLater { source: BoxedError }, + + #[snafu(display( + "Failed to encode a wal options to json string, wal_options: {:?}", + wal_options + ))] + EncodeWalOptionsToJson { + wal_options: WalOptions, + #[snafu(source)] + error: serde_json::Error, + location: Location, + }, } pub type Result = std::result::Result; @@ -318,7 +330,8 @@ impl ErrorExt for Error { | BuildTableMeta { .. } | TableRouteNotFound { .. } | ConvertRawTableInfo { .. } - | RegionOperatingRace { .. } => StatusCode::Unexpected, + | RegionOperatingRace { .. } + | EncodeWalOptionsToJson { .. } => StatusCode::Unexpected, SendMessage { .. } | GetKvCache { .. } diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs index 8ad58552f267..80e2c53045a0 100644 --- a/src/common/meta/src/instruction.rs +++ b/src/common/meta/src/instruction.rs @@ -91,19 +91,27 @@ impl Display for OpenRegion { } } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct OpenRegion { pub region_ident: RegionIdent, pub region_storage_path: String, - pub options: HashMap, + pub region_options: HashMap, + #[serde(default)] + pub region_wal_options: HashMap, } impl OpenRegion { - pub fn new(region_ident: RegionIdent, path: &str, options: HashMap) -> Self { + pub fn new( + region_ident: RegionIdent, + path: &str, + region_options: HashMap, + region_wal_options: HashMap, + ) -> Self { Self { region_ident, region_storage_path: path.to_string(), - options, + region_options, + region_wal_options, } } } @@ -218,12 +226,13 @@ mod tests { }, "test/foo", HashMap::new(), + HashMap::new(), )); let serialized = serde_json::to_string(&open_region).unwrap(); assert_eq!( - r#"{"OpenRegion":{"region_ident":{"cluster_id":1,"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","options":{}}}"#, + r#"{"OpenRegion":{"region_ident":{"cluster_id":1,"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{}}}"#, serialized ); @@ -242,4 +251,45 @@ mod tests { serialized ); } + + #[derive(Debug, Clone, Serialize, Deserialize)] + struct LegacyOpenRegion { + region_ident: RegionIdent, + region_storage_path: String, + region_options: HashMap, + } + + #[test] + fn test_compatible_serialize_open_region() { + let region_ident = RegionIdent { + cluster_id: 1, + datanode_id: 2, + table_id: 1024, + region_number: 1, + engine: "mito2".to_string(), + }; + let region_storage_path = "test/foo".to_string(); + let region_options = HashMap::from([ + ("a".to_string(), "aa".to_string()), + ("b".to_string(), "bb".to_string()), + ]); + + // Serialize a legacy OpenRegion. + let legacy_open_region = LegacyOpenRegion { + region_ident: region_ident.clone(), + region_storage_path: region_storage_path.clone(), + region_options: region_options.clone(), + }; + let serialized = serde_json::to_string(&legacy_open_region).unwrap(); + + // Deserialize to OpenRegion. + let deserialized = serde_json::from_str(&serialized).unwrap(); + let expected = OpenRegion { + region_ident, + region_storage_path, + region_options, + region_wal_options: HashMap::new(), + }; + assert_eq!(expected, deserialized); + } } diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 850051fa0b93..d6ed8e04c5bf 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -364,6 +364,7 @@ impl TableMetadataManager { &self, mut table_info: RawTableInfo, region_routes: Vec, + region_wal_options: HashMap, ) -> Result<()> { let region_numbers = region_routes .iter() @@ -399,6 +400,7 @@ impl TableMetadataManager { &engine, ®ion_storage_path, region_options, + region_wal_options, distribution, )?; @@ -587,6 +589,7 @@ impl TableMetadataManager { current_table_route_value: &DeserializedValueWithBytes, new_region_routes: Vec, new_region_options: &HashMap, + new_region_wal_options: &HashMap, ) -> Result<()> { // Updates the datanode table key value pairs. let current_region_distribution = @@ -599,6 +602,7 @@ impl TableMetadataManager { current_region_distribution, new_region_distribution, new_region_options, + new_region_wal_options, )?; // Updates the table_route. @@ -827,19 +831,31 @@ mod tests { new_test_table_info(region_routes.iter().map(|r| r.region.id.region_number())).into(); // creates metadata. table_metadata_manager - .create_table_metadata(table_info.clone(), region_routes.clone()) + .create_table_metadata( + table_info.clone(), + region_routes.clone(), + HashMap::default(), + ) .await .unwrap(); // if metadata was already created, it should be ok. table_metadata_manager - .create_table_metadata(table_info.clone(), region_routes.clone()) + .create_table_metadata( + table_info.clone(), + region_routes.clone(), + HashMap::default(), + ) .await .unwrap(); let mut modified_region_routes = region_routes.clone(); modified_region_routes.push(region_route.clone()); // if remote metadata was exists, it should return an error. assert!(table_metadata_manager - .create_table_metadata(table_info.clone(), modified_region_routes) + .create_table_metadata( + table_info.clone(), + modified_region_routes, + HashMap::default() + ) .await .is_err()); @@ -873,7 +889,11 @@ mod tests { // creates metadata. table_metadata_manager - .create_table_metadata(table_info.clone(), region_routes.clone()) + .create_table_metadata( + table_info.clone(), + region_routes.clone(), + HashMap::default(), + ) .await .unwrap(); @@ -944,7 +964,11 @@ mod tests { let table_id = table_info.ident.table_id; // creates metadata. table_metadata_manager - .create_table_metadata(table_info.clone(), region_routes.clone()) + .create_table_metadata( + table_info.clone(), + region_routes.clone(), + HashMap::default(), + ) .await .unwrap(); let new_table_name = "another_name".to_string(); @@ -1012,7 +1036,11 @@ mod tests { let table_id = table_info.ident.table_id; // creates metadata. table_metadata_manager - .create_table_metadata(table_info.clone(), region_routes.clone()) + .create_table_metadata( + table_info.clone(), + region_routes.clone(), + HashMap::default(), + ) .await .unwrap(); let mut new_table_info = table_info.clone(); @@ -1089,7 +1117,11 @@ mod tests { DeserializedValueWithBytes::from_inner(TableRouteValue::new(region_routes.clone())); // creates metadata. table_metadata_manager - .create_table_metadata(table_info.clone(), region_routes.clone()) + .create_table_metadata( + table_info.clone(), + region_routes.clone(), + HashMap::default(), + ) .await .unwrap(); @@ -1155,7 +1187,11 @@ mod tests { DeserializedValueWithBytes::from_inner(TableRouteValue::new(region_routes.clone())); // creates metadata. table_metadata_manager - .create_table_metadata(table_info.clone(), region_routes.clone()) + .create_table_metadata( + table_info.clone(), + region_routes.clone(), + HashMap::default(), + ) .await .unwrap(); assert_datanode_table(&table_metadata_manager, table_id, ®ion_routes).await; @@ -1172,10 +1208,12 @@ mod tests { engine: engine.to_string(), region_storage_path: region_storage_path.to_string(), region_options: HashMap::new(), + region_wal_options: HashMap::new(), }, ¤t_table_route_value, new_region_routes.clone(), &HashMap::new(), + &HashMap::new(), ) .await .unwrap(); @@ -1189,10 +1227,12 @@ mod tests { engine: engine.to_string(), region_storage_path: region_storage_path.to_string(), region_options: HashMap::new(), + region_wal_options: HashMap::new(), }, ¤t_table_route_value, new_region_routes.clone(), &HashMap::new(), + &HashMap::new(), ) .await .unwrap(); @@ -1211,10 +1251,12 @@ mod tests { engine: engine.to_string(), region_storage_path: region_storage_path.to_string(), region_options: HashMap::new(), + region_wal_options: HashMap::new(), }, ¤t_table_route_value, new_region_routes.clone(), &HashMap::new(), + &HashMap::new(), ) .await .unwrap(); @@ -1236,10 +1278,12 @@ mod tests { engine: engine.to_string(), region_storage_path: region_storage_path.to_string(), region_options: HashMap::new(), + region_wal_options: HashMap::new(), }, &wrong_table_route_value, new_region_routes, &HashMap::new(), + &HashMap::new(), ) .await .is_err()); diff --git a/src/common/meta/src/key/datanode_table.rs b/src/common/meta/src/key/datanode_table.rs index 1872f36b4fa7..3ddb00a19ac2 100644 --- a/src/common/meta/src/key/datanode_table.rs +++ b/src/common/meta/src/key/datanode_table.rs @@ -38,14 +38,18 @@ use crate::DatanodeId; /// For compatible reason, DON'T modify the field name. pub struct RegionInfo { #[serde(default)] - // The table engine, it SHOULD be immutable after created. + /// The table engine, it SHOULD be immutable after created. pub engine: String, - // The region storage path, it SHOULD be immutable after created. + /// The region storage path, it SHOULD be immutable after created. #[serde(default)] pub region_storage_path: String, - // The region options. + /// The region options. #[serde(default)] pub region_options: HashMap, + /// The per-region wal options. + /// Key: region number (in string representation). Value: the encoded wal options of the region. + #[serde(default)] + pub region_wal_options: HashMap, } pub struct DatanodeTableKey { @@ -165,11 +169,21 @@ impl DatanodeTableManager { engine: &str, region_storage_path: &str, region_options: HashMap, + region_wal_options: HashMap, distribution: RegionDistribution, ) -> Result { let txns = distribution .into_iter() .map(|(datanode_id, regions)| { + let filtered_region_wal_options = regions + .iter() + .filter_map(|region_number| { + region_wal_options + .get(region_number) + .map(|wal_options| (region_number.to_string(), wal_options.clone())) + }) + .collect(); + let key = DatanodeTableKey::new(datanode_id, table_id); let val = DatanodeTableValue::new( table_id, @@ -178,6 +192,7 @@ impl DatanodeTableManager { engine: engine.to_string(), region_storage_path: region_storage_path.to_string(), region_options: region_options.clone(), + region_wal_options: filtered_region_wal_options, }, ); @@ -198,6 +213,7 @@ impl DatanodeTableManager { current_region_distribution: RegionDistribution, new_region_distribution: RegionDistribution, new_region_options: &HashMap, + new_region_wal_options: &HashMap, ) -> Result { let mut opts = Vec::new(); @@ -209,12 +225,15 @@ impl DatanodeTableManager { opts.push(TxnOp::Delete(raw_key)) } } + let need_update_options = region_info.region_options != *new_region_options; + let need_update_wal_options = region_info.region_wal_options != *new_region_wal_options; + for (datanode, regions) in new_region_distribution.into_iter() { let need_update = if let Some(current_region) = current_region_distribution.get(&datanode) { // Updates if need. - *current_region != regions || need_update_options + *current_region != regions || need_update_options || need_update_wal_options } else { true }; @@ -272,7 +291,7 @@ mod tests { region_info: RegionInfo::default(), version: 1, }; - let literal = br#"{"table_id":42,"regions":[1,2,3],"engine":"","region_storage_path":"","region_options":{},"version":1}"#; + let literal = br#"{"table_id":42,"regions":[1,2,3],"engine":"","region_storage_path":"","region_options":{},"region_wal_options":{},"version":1}"#; let raw_value = value.try_as_raw_value().unwrap(); assert_eq!(raw_value, literal); @@ -286,6 +305,41 @@ mod tests { assert!(parsed.is_ok()); } + // This test intends to ensure both the `serde_json::to_string` + `serde_json::from_str` + // and `serde_json::to_vec` + `serde_json::from_slice` work for `DatanodeTableValue`. + // Warning: if the key of `region_wal_options` is of type non-String, this test would fail. + #[test] + fn test_serde_with_region_info() { + let region_info = RegionInfo { + engine: "test_engine".to_string(), + region_storage_path: "test_storage_path".to_string(), + region_options: HashMap::from([ + ("a".to_string(), "aa".to_string()), + ("b".to_string(), "bb".to_string()), + ("c".to_string(), "cc".to_string()), + ]), + region_wal_options: HashMap::from([ + ("1".to_string(), "aaa".to_string()), + ("2".to_string(), "bbb".to_string()), + ("3".to_string(), "ccc".to_string()), + ]), + }; + let table_value = DatanodeTableValue { + table_id: 1, + regions: vec![], + region_info, + version: 1, + }; + + let encoded = serde_json::to_string(&table_value).unwrap(); + let decoded = serde_json::from_str(&encoded).unwrap(); + assert_eq!(table_value, decoded); + + let encoded = serde_json::to_vec(&table_value).unwrap(); + let decoded = serde_json::from_slice(&encoded).unwrap(); + assert_eq!(table_value, decoded); + } + #[test] fn test_strip_table_id() { fn test_err(raw_key: &[u8]) { diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index 2345bf7d670a..d343e9aa69f6 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -35,6 +35,8 @@ pub mod sequence; pub mod state_store; pub mod table_name; pub mod util; +#[allow(unused)] +pub mod wal; pub type ClusterId = u64; pub type DatanodeId = u64; diff --git a/src/common/meta/src/wal.rs b/src/common/meta/src/wal.rs new file mode 100644 index 000000000000..8b3acb82520b --- /dev/null +++ b/src/common/meta/src/wal.rs @@ -0,0 +1,127 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod kafka; +pub mod options_allocator; + +use std::collections::HashMap; + +use serde::{Deserialize, Serialize}; +use serde_with::with_prefix; + +use crate::error::Result; +use crate::wal::kafka::KafkaConfig; +pub use crate::wal::kafka::{KafkaOptions as KafkaWalOptions, Topic as KafkaWalTopic}; +pub use crate::wal::options_allocator::{build_wal_options_allocator, WalOptionsAllocator}; + +/// An encoded wal options will be wrapped into a (WAL_OPTIONS_KEY, encoded wal options) key-value pair +/// and inserted into the options of a `RegionCreateRequest`. +pub const WAL_OPTIONS_KEY: &str = "wal_options"; + +/// Wal configurations for bootstrapping metasrv. +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, Default)] +#[serde(tag = "provider")] +pub enum WalConfig { + #[default] + #[serde(rename = "raft_engine")] + RaftEngine, + #[serde(rename = "kafka")] + Kafka(KafkaConfig), +} + +/// Wal options allocated to a region. +/// A wal options is encoded by metasrv into a `String` with `serde_json::to_string`. +/// It's then decoded by datanode to a `HashMap` with `serde_json::from_str`. +/// Such a encoding/decoding scheme is inspired by the encoding/decoding of `RegionOptions`. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)] +#[serde(tag = "wal.provider")] +pub enum WalOptions { + #[default] + #[serde(rename = "raft_engine")] + RaftEngine, + #[serde(rename = "kafka")] + #[serde(with = "prefix_wal_kafka")] + Kafka(KafkaWalOptions), +} + +with_prefix!(prefix_wal_kafka "wal.kafka."); + +#[cfg(test)] +mod tests { + use super::*; + use crate::wal::kafka::topic_selector::SelectorType as KafkaTopicSelectorType; + + #[test] + fn test_serde_wal_config() { + // Test serde raft-engine wal config with none other wal config. + let toml_str = r#" + provider = "raft_engine" + "#; + let wal_config: WalConfig = toml::from_str(toml_str).unwrap(); + assert_eq!(wal_config, WalConfig::RaftEngine); + + // Test serde raft-engine wal config with extra other wal config. + let toml_str = r#" + provider = "raft_engine" + broker_endpoints = ["127.0.0.1:9090"] + num_topics = 32 + "#; + let wal_config: WalConfig = toml::from_str(toml_str).unwrap(); + assert_eq!(wal_config, WalConfig::RaftEngine); + + // Test serde kafka wal config. + let toml_str = r#" + provider = "kafka" + broker_endpoints = ["127.0.0.1:9090"] + num_topics = 32 + selector_type = "round_robin" + topic_name_prefix = "greptimedb_kafka_wal" + num_partitions = 1 + replication_factor = 3 + "#; + let wal_config: WalConfig = toml::from_str(toml_str).unwrap(); + let expected_kafka_config = KafkaConfig { + broker_endpoints: vec!["127.0.0.1:9090".to_string()], + num_topics: 32, + selector_type: KafkaTopicSelectorType::RoundRobin, + topic_name_prefix: "greptimedb_kafka_wal".to_string(), + num_partitions: 1, + replication_factor: 3, + }; + assert_eq!(wal_config, WalConfig::Kafka(expected_kafka_config)); + } + + #[test] + fn test_serde_wal_options() { + // Test serde raft-engine wal options. + let wal_options = WalOptions::RaftEngine; + let encoded = serde_json::to_string(&wal_options).unwrap(); + let expected = r#"{"wal.provider":"raft_engine"}"#; + assert_eq!(&encoded, expected); + + let decoded: WalOptions = serde_json::from_str(&encoded).unwrap(); + assert_eq!(decoded, wal_options); + + // Test serde kafka wal options. + let wal_options = WalOptions::Kafka(KafkaWalOptions { + topic: "test_topic".to_string(), + }); + let encoded = serde_json::to_string(&wal_options).unwrap(); + let expected = r#"{"wal.provider":"kafka","wal.kafka.topic":"test_topic"}"#; + assert_eq!(&encoded, expected); + + let decoded: WalOptions = serde_json::from_str(&encoded).unwrap(); + assert_eq!(decoded, wal_options); + } +} diff --git a/src/common/meta/src/wal/kafka.rs b/src/common/meta/src/wal/kafka.rs new file mode 100644 index 000000000000..b24d14453186 --- /dev/null +++ b/src/common/meta/src/wal/kafka.rs @@ -0,0 +1,60 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod topic; +pub mod topic_manager; +pub mod topic_selector; + +use serde::{Deserialize, Serialize}; + +pub use crate::wal::kafka::topic::Topic; +pub use crate::wal::kafka::topic_manager::TopicManager; +use crate::wal::kafka::topic_selector::SelectorType as TopicSelectorType; + +/// Configurations for bootstrapping a kafka wal. +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct KafkaConfig { + /// The broker endpoints of the Kafka cluster. + pub broker_endpoints: Vec, + /// Number of topics to be created upon start. + pub num_topics: usize, + /// The type of the topic selector with which to select a topic for a region. + pub selector_type: TopicSelectorType, + /// Topic name prefix. + pub topic_name_prefix: String, + /// Number of partitions per topic. + pub num_partitions: i32, + /// The replication factor of each topic. + pub replication_factor: i16, +} + +impl Default for KafkaConfig { + fn default() -> Self { + Self { + broker_endpoints: vec!["127.0.0.1:9090".to_string()], + num_topics: 64, + selector_type: TopicSelectorType::RoundRobin, + topic_name_prefix: "greptimedb_kafka_wal".to_string(), + num_partitions: 1, + replication_factor: 3, + } + } +} + +/// Kafka wal options allocated to a region. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct KafkaOptions { + /// Kafka wal topic. + pub topic: Topic, +} diff --git a/src/common/meta/src/wal/kafka/topic.rs b/src/common/meta/src/wal/kafka/topic.rs new file mode 100644 index 000000000000..67223637f1ae --- /dev/null +++ b/src/common/meta/src/wal/kafka/topic.rs @@ -0,0 +1,18 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// Kafka wal topic. +/// Publishers publish log entries to the topic while subscribers pull log entries from the topic. +/// A topic is simply a string right now. But it may be more complex in the future. +pub type Topic = String; diff --git a/src/common/meta/src/wal/kafka/topic_manager.rs b/src/common/meta/src/wal/kafka/topic_manager.rs new file mode 100644 index 000000000000..43d829bdbcab --- /dev/null +++ b/src/common/meta/src/wal/kafka/topic_manager.rs @@ -0,0 +1,61 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use crate::error::Result; +use crate::kv_backend::KvBackendRef; +use crate::wal::kafka::topic::Topic; +use crate::wal::kafka::topic_selector::{RoundRobinTopicSelector, SelectorType, TopicSelectorRef}; +use crate::wal::kafka::KafkaConfig; + +/// Manages topic initialization and selection. +pub struct TopicManager { + topic_pool: Vec, + topic_selector: TopicSelectorRef, + kv_backend: KvBackendRef, +} + +impl TopicManager { + /// Creates a new topic manager. + pub fn new(config: &KafkaConfig, kv_backend: KvBackendRef) -> Self { + let selector = match config.selector_type { + SelectorType::RoundRobin => RoundRobinTopicSelector::new(), + }; + + Self { + topic_pool: Vec::new(), + topic_selector: Arc::new(selector), + kv_backend, + } + } + + /// Tries to initialize the topic pool. + /// The initializer first tries to restore persisted topics from the kv backend. + /// If not enough topics retrieved, the initializer will try to contact the Kafka cluster and request more topics. + pub async fn try_init(&mut self) -> Result<()> { + todo!() + } + + /// Selects one topic from the topic pool through the topic selector. + pub fn select(&self) -> &Topic { + self.topic_selector.select(&self.topic_pool) + } + + /// Selects a batch of topics from the topic pool through the topic selector. + pub fn select_batch(&self, num_topics: usize) -> Vec { + // TODO(niebayes): calls `select` to select a collection of topics in a batching manner. + vec!["tmp_topic".to_string(); num_topics] + } +} diff --git a/src/common/meta/src/wal/kafka/topic_selector.rs b/src/common/meta/src/wal/kafka/topic_selector.rs new file mode 100644 index 000000000000..f29ac55b6ba8 --- /dev/null +++ b/src/common/meta/src/wal/kafka/topic_selector.rs @@ -0,0 +1,51 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use serde::{Deserialize, Serialize}; + +use crate::wal::kafka::topic::Topic; + +/// The type of the topic selector, i.e. with which strategy to select a topic. +#[derive(Default, Debug, Clone, Serialize, Deserialize, PartialEq)] +pub enum SelectorType { + #[default] + #[serde(rename = "round_robin")] + RoundRobin, +} + +/// Controls topic selection. +pub(super) trait TopicSelector: Send + Sync { + /// Selects a topic from the topic pool. + fn select(&self, topic_pool: &[Topic]) -> &Topic; +} + +pub(super) type TopicSelectorRef = Arc; + +/// A topic selector with the round-robin strategy, i.e. selects topics in a round-robin manner. +pub(super) struct RoundRobinTopicSelector; + +impl RoundRobinTopicSelector { + /// Creates a new round-robin topic selector. + pub(super) fn new() -> Self { + todo!() + } +} + +impl TopicSelector for RoundRobinTopicSelector { + fn select(&self, topic_pool: &[Topic]) -> &Topic { + todo!() + } +} diff --git a/src/common/meta/src/wal/options_allocator.rs b/src/common/meta/src/wal/options_allocator.rs new file mode 100644 index 000000000000..b6518199be52 --- /dev/null +++ b/src/common/meta/src/wal/options_allocator.rs @@ -0,0 +1,90 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::Arc; + +use snafu::ResultExt; +use store_api::storage::RegionNumber; + +use crate::error::{EncodeWalOptionsToJsonSnafu, Result}; +use crate::kv_backend::KvBackendRef; +use crate::wal::kafka::{KafkaOptions, TopicManager as KafkaTopicManager}; +use crate::wal::{WalConfig, WalOptions}; + +/// Allocates wal options in region granularity. +#[derive(Default)] +pub enum WalOptionsAllocator { + #[default] + RaftEngine, + Kafka(KafkaTopicManager), +} + +impl WalOptionsAllocator { + /// Creates a WalOptionsAllocator. + pub fn new(config: &WalConfig, kv_backend: KvBackendRef) -> Self { + todo!() + } + + /// Tries to initialize the allocator. + pub fn try_init(&self) -> Result<()> { + todo!() + } + + /// Allocates a wal options for a region. + pub fn alloc(&self) -> WalOptions { + todo!() + } + + /// Allocates a batch of wal options where each wal options goes to a region. + pub fn alloc_batch(&self, num_regions: usize) -> Vec { + match self { + WalOptionsAllocator::RaftEngine => vec![WalOptions::RaftEngine; num_regions], + WalOptionsAllocator::Kafka(topic_manager) => { + let topics = topic_manager.select_batch(num_regions); + topics + .into_iter() + .map(|topic| WalOptions::Kafka(KafkaOptions { topic })) + .collect() + } + } + } +} + +/// Allocates a wal options for each region. The allocated wal options is encoded immediately. +pub fn build_region_wal_options( + regions: Vec, + wal_options_allocator: &WalOptionsAllocator, +) -> Result> { + let wal_options = wal_options_allocator + .alloc_batch(regions.len()) + .into_iter() + .map(|wal_options| { + serde_json::to_string(&wal_options).context(EncodeWalOptionsToJsonSnafu { wal_options }) + }) + .collect::>>()?; + + Ok(regions.into_iter().zip(wal_options).collect()) +} + +/// Builds a wal options allocator. +// TODO(niebayes): implement. +pub async fn build_wal_options_allocator( + config: &WalConfig, + kv_backend: &KvBackendRef, +) -> Result { + let _ = config; + let _ = kv_backend; + Ok(WalOptionsAllocator::default()) +} diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 0646771459ec..6ea9288debb5 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -534,6 +534,7 @@ mod tests { "mock", "foo/bar/weny", HashMap::from([("foo".to_string(), "bar".to_string())]), + HashMap::default(), BTreeMap::from([(0, vec![0, 1, 2])]), ) .unwrap(); diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index cd1d692a4dd7..6f0609f689fa 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -53,14 +53,17 @@ impl RegionHeartbeatResponseHandler { Instruction::OpenRegion(OpenRegion { region_ident, region_storage_path, - options, + region_options, + region_wal_options, }) => Ok(Box::new(|region_server| { Box::pin(async move { let region_id = Self::region_ident_to_region_id(®ion_ident); + // TODO(niebayes): extends region options with region_wal_options. + let _ = region_wal_options; let request = RegionRequest::Open(RegionOpenRequest { engine: region_ident.engine, region_dir: region_dir(®ion_storage_path, region_id), - options, + options: region_options, }); let result = region_server.handle_request(region_id, request).await; @@ -239,6 +242,7 @@ mod tests { }, path, HashMap::new(), + HashMap::new(), )) } diff --git a/src/frontend/src/instance/standalone.rs b/src/frontend/src/instance/standalone.rs index 00a510f27d8d..95229d343694 100644 --- a/src/frontend/src/instance/standalone.rs +++ b/src/frontend/src/instance/standalone.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::sync::Arc; use api::v1::meta::Partition; @@ -20,18 +21,20 @@ use async_trait::async_trait; use client::region::check_response_header; use common_error::ext::BoxedError; use common_meta::datanode_manager::{AffectedRows, Datanode, DatanodeManager, DatanodeRef}; -use common_meta::ddl::{TableMetadataAllocator, TableMetadataAllocatorContext}; +use common_meta::ddl::{TableMetadata, TableMetadataAllocator, TableMetadataAllocatorContext}; use common_meta::error::{self as meta_error, Result as MetaResult}; use common_meta::peer::Peer; use common_meta::rpc::router::{Region, RegionRoute}; use common_meta::sequence::SequenceRef; +use common_meta::wal::options_allocator::build_region_wal_options; +use common_meta::wal::WalOptionsAllocator; use common_recordbatch::SendableRecordBatchStream; -use common_telemetry::tracing; use common_telemetry::tracing_context::{FutureExt, TracingContext}; +use common_telemetry::{debug, tracing}; use datanode::region_server::RegionServer; use servers::grpc::region_server::RegionServerHandler; use snafu::{OptionExt, ResultExt}; -use store_api::storage::{RegionId, TableId}; +use store_api::storage::RegionId; use table::metadata::RawTableInfo; use crate::error::{InvalidRegionRequestSnafu, InvokeRegionServerSnafu, Result}; @@ -104,24 +107,28 @@ impl Datanode for RegionInvoker { } } -pub struct StandaloneTableMetadataCreator { +pub struct StandaloneTableMetadataAllocator { table_id_sequence: SequenceRef, + wal_options_allocator: WalOptionsAllocator, } -impl StandaloneTableMetadataCreator { - pub fn new(table_id_sequence: SequenceRef) -> Self { - Self { table_id_sequence } +impl StandaloneTableMetadataAllocator { + pub fn new(table_id_sequence: SequenceRef, wal_options_allocator: WalOptionsAllocator) -> Self { + Self { + table_id_sequence, + wal_options_allocator, + } } } #[async_trait] -impl TableMetadataAllocator for StandaloneTableMetadataCreator { +impl TableMetadataAllocator for StandaloneTableMetadataAllocator { async fn create( &self, _ctx: &TableMetadataAllocatorContext, raw_table_info: &mut RawTableInfo, partitions: &[Partition], - ) -> MetaResult<(TableId, Vec)> { + ) -> MetaResult { let table_id = self.table_id_sequence.next().await? as u32; raw_table_info.ident.table_id = table_id; let region_routes = partitions @@ -144,6 +151,22 @@ impl TableMetadataAllocator for StandaloneTableMetadataCreator { }) .collect::>(); - Ok((table_id, region_routes)) + let region_numbers = region_routes + .iter() + .map(|route| route.region.id.region_number()) + .collect(); + let region_wal_options = + build_region_wal_options(region_numbers, &self.wal_options_allocator)?; + + debug!( + "Allocated region wal options {:?} for table {}", + region_wal_options, table_id + ); + + Ok(TableMetadata { + table_id, + region_routes, + region_wal_options: HashMap::default(), + }) } } diff --git a/src/meta-srv/src/handler/region_lease_handler.rs b/src/meta-srv/src/handler/region_lease_handler.rs index 2c4ec0a1a86c..d3433179fea0 100644 --- a/src/meta-srv/src/handler/region_lease_handler.rs +++ b/src/meta-srv/src/handler/region_lease_handler.rs @@ -161,7 +161,7 @@ mod test { let table_metadata_manager = keeper.table_metadata_manager(); table_metadata_manager - .create_table_metadata(table_info, region_routes) + .create_table_metadata(table_info, region_routes, HashMap::default()) .await .unwrap(); @@ -303,7 +303,7 @@ mod test { let table_metadata_manager = keeper.table_metadata_manager(); table_metadata_manager - .create_table_metadata(table_info, region_routes) + .create_table_metadata(table_info, region_routes, HashMap::default()) .await .unwrap(); diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index d1b91bf03cb8..906d9847c617 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -26,6 +26,7 @@ use common_meta::ddl::DdlTaskExecutorRef; use common_meta::key::TableMetadataManagerRef; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef}; use common_meta::region_keeper::MemoryRegionKeeperRef; +use common_meta::wal::WalConfig; use common_procedure::options::ProcedureConfig; use common_procedure::ProcedureManagerRef; use common_telemetry::logging::LoggingOptions; @@ -70,6 +71,7 @@ pub struct MetaSrvOptions { pub datanode: DatanodeOptions, pub enable_telemetry: bool, pub data_home: String, + pub wal: WalConfig, } impl Default for MetaSrvOptions { @@ -94,6 +96,7 @@ impl Default for MetaSrvOptions { datanode: DatanodeOptions::default(), enable_telemetry: true, data_home: METASRV_HOME.to_string(), + wal: WalConfig::default(), } } } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index ee312b28f09a..0e168d522d62 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -19,6 +19,7 @@ use std::time::Duration; use client::client_manager::DatanodeClients; use common_base::Plugins; use common_catalog::consts::MIN_USER_TABLE_ID; +use common_error::ext::BoxedError; use common_grpc::channel_manager::ChannelConfig; use common_meta::datanode_manager::DatanodeManagerRef; use common_meta::ddl::TableMetadataAllocatorRef; @@ -30,13 +31,14 @@ use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; use common_meta::region_keeper::{MemoryRegionKeeper, MemoryRegionKeeperRef}; use common_meta::sequence::SequenceBuilder; use common_meta::state_store::KvStateStore; +use common_meta::wal::build_wal_options_allocator; use common_procedure::local::{LocalManager, ManagerConfig}; use common_procedure::ProcedureManagerRef; use snafu::ResultExt; use crate::cache_invalidator::MetasrvCacheInvalidator; use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef}; -use crate::error::{self, Result}; +use crate::error::{self, OtherSnafu, Result}; use crate::greptimedb_telemetry::get_greptimedb_telemetry_task; use crate::handler::check_leader_handler::CheckLeaderHandler; use crate::handler::collect_stats_handler::CollectStatsHandler; @@ -204,6 +206,10 @@ impl MetaSrvBuilder { table_id: None, }; + let wal_options_allocator = build_wal_options_allocator(&options.wal, &kv_backend) + .await + .map_err(BoxedError::new) + .context(OtherSnafu)?; let table_metadata_allocator = table_metadata_allocator.unwrap_or_else(|| { let sequence = Arc::new( SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone()) @@ -214,7 +220,8 @@ impl MetaSrvBuilder { Arc::new(MetaSrvTableMetadataAllocator::new( selector_ctx.clone(), selector.clone(), - sequence, + sequence.clone(), + wal_options_allocator, )) }); diff --git a/src/meta-srv/src/procedure/region_failover.rs b/src/meta-srv/src/procedure/region_failover.rs index f96226f5e0a7..d47fc8384cdf 100644 --- a/src/meta-srv/src/procedure/region_failover.rs +++ b/src/meta-srv/src/procedure/region_failover.rs @@ -621,6 +621,7 @@ mod tests { opening_region, &path, HashMap::new(), + HashMap::new(), ))) .unwrap(), )) diff --git a/src/meta-srv/src/procedure/region_failover/activate_region.rs b/src/meta-srv/src/procedure/region_failover/activate_region.rs index b758524018b9..d586e12ecad0 100644 --- a/src/meta-srv/src/procedure/region_failover/activate_region.rs +++ b/src/meta-srv/src/procedure/region_failover/activate_region.rs @@ -41,8 +41,10 @@ pub(super) struct ActivateRegion { // the new leader node needs to remark the failed region as "inactive" // to prevent it from renewing the lease. remark_inactive_region: bool, + // An `None` option stands for uninitialized. region_storage_path: Option, region_options: Option>, + region_wal_options: Option>, } impl ActivateRegion { @@ -52,6 +54,7 @@ impl ActivateRegion { remark_inactive_region: false, region_storage_path: None, region_options: None, + region_wal_options: None, } } @@ -81,14 +84,19 @@ impl ActivateRegion { }; info!("Activating region: {candidate_ident:?}"); let region_options: HashMap = (&table_info.meta.options).into(); + // TODO(niebayes): properly fetch or construct region wal options. + let region_wal_options = HashMap::new(); let instruction = Instruction::OpenRegion(OpenRegion::new( candidate_ident.clone(), ®ion_storage_path, region_options.clone(), + region_wal_options.clone(), )); self.region_storage_path = Some(region_storage_path); self.region_options = Some(region_options); + self.region_wal_options = Some(region_wal_options); + let msg = MailboxMessage::json_message( "Activate Region", &format!("Metasrv@{}", ctx.selector_ctx.server_addr), @@ -137,6 +145,11 @@ impl ActivateRegion { .context(error::UnexpectedSnafu { violated: "expected region_options", })?, + self.region_wal_options + .clone() + .context(error::UnexpectedSnafu { + violated: "expected region_wal_options", + })?, ))) } else { // The region could be just indeed cannot be opened by the candidate, retry @@ -222,6 +235,7 @@ mod tests { }, &env.path, HashMap::new(), + HashMap::new(), ))) .unwrap(), )) @@ -256,7 +270,7 @@ mod tests { .unwrap(); assert_eq!( format!("{next_state:?}"), - r#"UpdateRegionMetadata { candidate: Peer { id: 2, addr: "" }, region_storage_path: "greptime/public", region_options: {} }"# + r#"UpdateRegionMetadata { candidate: Peer { id: 2, addr: "" }, region_storage_path: "greptime/public", region_options: {}, region_wal_options: {} }"# ); } @@ -292,6 +306,7 @@ mod tests { }, &env.path, HashMap::new(), + HashMap::new(), ))) .unwrap(), )) diff --git a/src/meta-srv/src/procedure/region_failover/deactivate_region.rs b/src/meta-srv/src/procedure/region_failover/deactivate_region.rs index c120b30a26da..84466eb19928 100644 --- a/src/meta-srv/src/procedure/region_failover/deactivate_region.rs +++ b/src/meta-srv/src/procedure/region_failover/deactivate_region.rs @@ -277,7 +277,7 @@ mod tests { .unwrap(); assert_eq!( format!("{next_state:?}"), - r#"ActivateRegion { candidate: Peer { id: 2, addr: "" }, remark_inactive_region: false, region_storage_path: None, region_options: None }"# + r#"ActivateRegion { candidate: Peer { id: 2, addr: "" }, remark_inactive_region: false, region_storage_path: None, region_options: None, region_wal_options: None }"# ); } @@ -319,7 +319,7 @@ mod tests { // Timeout or not, proceed to `ActivateRegion`. assert_eq!( format!("{next_state:?}"), - r#"ActivateRegion { candidate: Peer { id: 2, addr: "" }, remark_inactive_region: false, region_storage_path: None, region_options: None }"# + r#"ActivateRegion { candidate: Peer { id: 2, addr: "" }, remark_inactive_region: false, region_storage_path: None, region_options: None, region_wal_options: None }"# ); } } diff --git a/src/meta-srv/src/procedure/region_failover/update_metadata.rs b/src/meta-srv/src/procedure/region_failover/update_metadata.rs index 225588ac0942..505f1cb55a51 100644 --- a/src/meta-srv/src/procedure/region_failover/update_metadata.rs +++ b/src/meta-srv/src/procedure/region_failover/update_metadata.rs @@ -30,11 +30,13 @@ use crate::error::{self, Result, RetryLaterSnafu, TableRouteNotFoundSnafu}; use crate::lock::keys::table_metadata_lock_key; use crate::lock::Opts; -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, PartialEq)] pub(super) struct UpdateRegionMetadata { candidate: Peer, region_storage_path: String, region_options: HashMap, + #[serde(default)] + region_wal_options: HashMap, } impl UpdateRegionMetadata { @@ -42,11 +44,13 @@ impl UpdateRegionMetadata { candidate: Peer, region_storage_path: String, region_options: HashMap, + region_wal_options: HashMap, ) -> Self { Self { candidate, region_storage_path, region_options, + region_wal_options, } } @@ -104,10 +108,12 @@ impl UpdateRegionMetadata { engine: engine.to_string(), region_storage_path: self.region_storage_path.to_string(), region_options: self.region_options.clone(), + region_wal_options: self.region_wal_options.clone(), }, &table_route_value, new_region_routes, &self.region_options, + &self.region_wal_options, ) .await .context(error::UpdateTableRouteSnafu)?; @@ -188,8 +194,12 @@ mod tests { let env = TestingEnvBuilder::new().build().await; let failed_region = env.failed_region(1).await; - let mut state = - UpdateRegionMetadata::new(Peer::new(2, ""), env.path.clone(), HashMap::new()); + let mut state = UpdateRegionMetadata::new( + Peer::new(2, ""), + env.path.clone(), + HashMap::new(), + HashMap::new(), + ); let next_state = state.next(&env.context, &failed_region).await.unwrap(); assert_eq!(format!("{next_state:?}"), "InvalidateCache"); @@ -206,6 +216,7 @@ mod tests { Peer::new(candidate, ""), env.path.clone(), HashMap::new(), + HashMap::new(), ); state .update_table_route(&env.context, &failed_region) @@ -348,7 +359,12 @@ mod tests { let path = env.path.clone(); let _ = futures::future::join_all(vec![ tokio::spawn(async move { - let state = UpdateRegionMetadata::new(Peer::new(2, ""), path, HashMap::new()); + let state = UpdateRegionMetadata::new( + Peer::new(2, ""), + path, + HashMap::new(), + HashMap::new(), + ); state .update_metadata(&ctx_1, &failed_region_1) .await @@ -359,6 +375,7 @@ mod tests { Peer::new(3, ""), env.path.clone(), HashMap::new(), + HashMap::new(), ); state .update_metadata(&ctx_2, &failed_region_2) @@ -431,4 +448,40 @@ mod tests { assert_eq!(tables[0].regions, vec![2, 4]); } } + + #[derive(Debug, Clone, Serialize, Deserialize)] + struct LegacyUpdateRegionMetadata { + candidate: Peer, + region_storage_path: String, + region_options: HashMap, + } + + #[test] + fn test_compatible_serialize_update_region_metadata() { + let candidate = Peer::new(1, "test_addr"); + let region_storage_path = "test_path".to_string(); + let region_options = HashMap::from([ + ("a".to_string(), "aa".to_string()), + ("b".to_string(), "bb".to_string()), + ]); + + let legacy_update_region_metadata = LegacyUpdateRegionMetadata { + candidate: candidate.clone(), + region_storage_path: region_storage_path.clone(), + region_options: region_options.clone(), + }; + + // Serialize a LegacyUpdateRegionMetadata. + let serialized = serde_json::to_string(&legacy_update_region_metadata).unwrap(); + + // Deserialize to UpdateRegionMetadata. + let deserialized = serde_json::from_str(&serialized).unwrap(); + let expected = UpdateRegionMetadata { + candidate, + region_storage_path, + region_options, + region_wal_options: HashMap::new(), + }; + assert_eq!(expected, deserialized); + } } diff --git a/src/meta-srv/src/procedure/region_migration/migration_start.rs b/src/meta-srv/src/procedure/region_migration/migration_start.rs index 98006139295d..3ef5d46c6595 100644 --- a/src/meta-srv/src/procedure/region_migration/migration_start.rs +++ b/src/meta-srv/src/procedure/region_migration/migration_start.rs @@ -137,6 +137,7 @@ impl RegionMigrationStart { #[cfg(test)] mod tests { use std::assert_matches::assert_matches; + use std::collections::HashMap; use common_meta::key::test_utils::new_test_table_info; use common_meta::peer::Peer; @@ -187,7 +188,7 @@ mod tests { }; env.table_metadata_manager() - .create_table_metadata(table_info, vec![region_route]) + .create_table_metadata(table_info, vec![region_route], HashMap::default()) .await .unwrap(); @@ -221,7 +222,7 @@ mod tests { }]; env.table_metadata_manager() - .create_table_metadata(table_info, region_routes) + .create_table_metadata(table_info, region_routes, HashMap::default()) .await .unwrap(); @@ -254,7 +255,7 @@ mod tests { }]; env.table_metadata_manager() - .create_table_metadata(table_info, region_routes) + .create_table_metadata(table_info, region_routes, HashMap::default()) .await .unwrap(); @@ -281,7 +282,7 @@ mod tests { }]; env.table_metadata_manager() - .create_table_metadata(table_info, region_routes) + .create_table_metadata(table_info, region_routes, HashMap::default()) .await .unwrap(); diff --git a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs index 830c4e22b861..129f7486fcc8 100644 --- a/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/open_candidate_region.rs @@ -76,6 +76,8 @@ impl OpenCandidateRegion { let engine = table_info.meta.engine.clone(); let region_options: HashMap = (&table_info.meta.options).into(); + // TODO(niebayes): properly fetch or construct region wal options. + let region_wal_options = HashMap::new(); let open_instruction = Instruction::OpenRegion(OpenRegion::new( RegionIdent { @@ -87,6 +89,7 @@ impl OpenCandidateRegion { }, ®ion_storage_path, region_options, + region_wal_options, )); Ok(open_instruction) @@ -210,7 +213,8 @@ mod tests { engine: MITO2_ENGINE.to_string(), }, region_storage_path: "/bar/foo/region/".to_string(), - options: Default::default(), + region_options: Default::default(), + region_wal_options: Default::default(), }) } @@ -403,7 +407,7 @@ mod tests { }]; env.table_metadata_manager() - .create_table_metadata(table_info, region_routes) + .create_table_metadata(table_info, region_routes, HashMap::default()) .await .unwrap(); diff --git a/src/meta-srv/src/procedure/region_migration/test_util.rs b/src/meta-srv/src/procedure/region_migration/test_util.rs index a4ff64403da2..1c95a2d393a7 100644 --- a/src/meta-srv/src/procedure/region_migration/test_util.rs +++ b/src/meta-srv/src/procedure/region_migration/test_util.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::assert_matches::assert_matches; +use std::collections::HashMap; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; @@ -368,7 +369,7 @@ impl ProcedureMigrationTestSuite { ) { self.env .table_metadata_manager() - .create_table_metadata(table_info, region_routes) + .create_table_metadata(table_info, region_routes, HashMap::default()) .await .unwrap(); } diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs index 3bc665652ab9..7deaddb5c27b 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/downgrade_leader_region.rs @@ -74,6 +74,7 @@ impl UpdateMetadata { #[cfg(test)] mod tests { use std::assert_matches::assert_matches; + use std::collections::HashMap; use common_meta::key::test_utils::new_test_table_info; use common_meta::peer::Peer; @@ -137,7 +138,7 @@ mod tests { let table_metadata_manager = env.table_metadata_manager(); table_metadata_manager - .create_table_metadata(table_info, region_routes) + .create_table_metadata(table_info, region_routes, HashMap::default()) .await .unwrap(); @@ -191,7 +192,7 @@ mod tests { let table_metadata_manager = env.table_metadata_manager(); table_metadata_manager - .create_table_metadata(table_info, region_routes) + .create_table_metadata(table_info, region_routes, HashMap::default()) .await .unwrap(); @@ -234,7 +235,7 @@ mod tests { let table_metadata_manager = env.table_metadata_manager(); table_metadata_manager - .create_table_metadata(table_info, region_routes) + .create_table_metadata(table_info, region_routes, HashMap::default()) .await .unwrap(); diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs index 603850b60d58..6c1a2648535a 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/rollback_downgraded_region.rs @@ -59,6 +59,7 @@ impl UpdateMetadata { #[cfg(test)] mod tests { use std::assert_matches::assert_matches; + use std::collections::HashMap; use common_meta::key::test_utils::new_test_table_info; use common_meta::peer::Peer; @@ -129,7 +130,7 @@ mod tests { let table_metadata_manager = env.table_metadata_manager(); table_metadata_manager - .create_table_metadata(table_info, region_routes) + .create_table_metadata(table_info, region_routes, HashMap::default()) .await .unwrap(); @@ -215,7 +216,7 @@ mod tests { let table_metadata_manager = env.table_metadata_manager(); table_metadata_manager - .create_table_metadata(table_info, region_routes) + .create_table_metadata(table_info, region_routes, HashMap::default()) .await .unwrap(); diff --git a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs index 097a282273cc..4886df0e5af4 100644 --- a/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs +++ b/src/meta-srv/src/procedure/region_migration/update_metadata/upgrade_candidate_region.rs @@ -136,6 +136,9 @@ impl UpdateMetadata { let engine = table_info.meta.engine.clone(); let region_options: HashMap = (&table_info.meta.options).into(); + // TODO(niebayes): properly fetch or construct region wal options. + let region_wal_options = HashMap::new(); + // No remote fetch. let table_route_value = ctx.get_table_route_value().await?; @@ -146,10 +149,12 @@ impl UpdateMetadata { engine: engine.to_string(), region_storage_path: region_storage_path.to_string(), region_options: region_options.clone(), + region_wal_options: region_wal_options.clone(), }, table_route_value, region_routes, ®ion_options, + ®ion_wal_options, ) .await .context(error::TableMetadataManagerSnafu) @@ -171,6 +176,7 @@ impl UpdateMetadata { #[cfg(test)] mod tests { use std::assert_matches::assert_matches; + use std::collections::HashMap; use common_meta::key::test_utils::new_test_table_info; use common_meta::peer::Peer; @@ -221,7 +227,7 @@ mod tests { let table_metadata_manager = env.table_metadata_manager(); table_metadata_manager - .create_table_metadata(table_info, region_routes) + .create_table_metadata(table_info, region_routes, HashMap::default()) .await .unwrap(); @@ -250,7 +256,7 @@ mod tests { let table_metadata_manager = env.table_metadata_manager(); table_metadata_manager - .create_table_metadata(table_info, region_routes) + .create_table_metadata(table_info, region_routes, HashMap::default()) .await .unwrap(); @@ -281,7 +287,7 @@ mod tests { let table_metadata_manager = env.table_metadata_manager(); table_metadata_manager - .create_table_metadata(table_info, region_routes) + .create_table_metadata(table_info, region_routes, HashMap::default()) .await .unwrap(); @@ -322,7 +328,7 @@ mod tests { let table_metadata_manager = env.table_metadata_manager(); table_metadata_manager - .create_table_metadata(table_info, region_routes) + .create_table_metadata(table_info, region_routes, HashMap::default()) .await .unwrap(); @@ -381,7 +387,7 @@ mod tests { let table_metadata_manager = env.table_metadata_manager(); table_metadata_manager - .create_table_metadata(table_info, region_routes) + .create_table_metadata(table_info, region_routes, HashMap::default()) .await .unwrap(); @@ -407,7 +413,7 @@ mod tests { let table_metadata_manager = env.table_metadata_manager(); table_metadata_manager - .create_table_metadata(table_info, region_routes) + .create_table_metadata(table_info, region_routes, HashMap::default()) .await .unwrap(); @@ -433,7 +439,7 @@ mod tests { let table_metadata_manager = env.table_metadata_manager(); table_metadata_manager - .create_table_metadata(table_info, region_routes) + .create_table_metadata(table_info, region_routes, HashMap::default()) .await .unwrap(); @@ -466,7 +472,7 @@ mod tests { let table_metadata_manager = env.table_metadata_manager(); table_metadata_manager - .create_table_metadata(table_info, region_routes) + .create_table_metadata(table_info, region_routes, HashMap::default()) .await .unwrap(); diff --git a/src/meta-srv/src/procedure/tests.rs b/src/meta-srv/src/procedure/tests.rs index 9f513e3662d2..e7b8a681138c 100644 --- a/src/meta-srv/src/procedure/tests.rs +++ b/src/meta-srv/src/procedure/tests.rs @@ -101,6 +101,7 @@ fn test_region_request_builder() { 1, create_table_task(), test_data::new_region_routes(), + HashMap::default(), test_data::new_ddl_context(Arc::new(DatanodeClients::default())), ); @@ -191,6 +192,7 @@ async fn test_on_datanode_create_regions() { 1, create_table_task(), region_routes, + HashMap::default(), test_data::new_ddl_context(datanode_manager), ); @@ -369,7 +371,11 @@ async fn test_submit_alter_region_requests() { let table_info = test_data::new_table_info(); context .table_metadata_manager - .create_table_metadata(table_info.clone(), region_routes.clone()) + .create_table_metadata( + table_info.clone(), + region_routes.clone(), + HashMap::default(), + ) .await .unwrap(); diff --git a/src/meta-srv/src/region/lease_keeper.rs b/src/meta-srv/src/region/lease_keeper.rs index fcc4a8363d85..b555d2e780dd 100644 --- a/src/meta-srv/src/region/lease_keeper.rs +++ b/src/meta-srv/src/region/lease_keeper.rs @@ -291,7 +291,7 @@ mod tests { let keeper = new_test_keeper(); let table_metadata_manager = keeper.table_metadata_manager(); table_metadata_manager - .create_table_metadata(table_info, vec![region_route.clone()]) + .create_table_metadata(table_info, vec![region_route.clone()], HashMap::default()) .await .unwrap(); @@ -378,7 +378,7 @@ mod tests { let keeper = new_test_keeper(); let table_metadata_manager = keeper.table_metadata_manager(); table_metadata_manager - .create_table_metadata(table_info, vec![region_route.clone()]) + .create_table_metadata(table_info, vec![region_route.clone()], HashMap::default()) .await .unwrap(); diff --git a/src/meta-srv/src/table_meta_alloc.rs b/src/meta-srv/src/table_meta_alloc.rs index 395104a9fc78..7a4183e58a88 100644 --- a/src/meta-srv/src/table_meta_alloc.rs +++ b/src/meta-srv/src/table_meta_alloc.rs @@ -15,11 +15,13 @@ use api::v1::meta::Partition; use common_catalog::format_full_table_name; use common_error::ext::BoxedError; -use common_meta::ddl::{TableMetadataAllocator, TableMetadataAllocatorContext}; +use common_meta::ddl::{TableMetadata, TableMetadataAllocator, TableMetadataAllocatorContext}; use common_meta::error::{self as meta_error, Result as MetaResult}; use common_meta::rpc::router::{Region, RegionRoute}; use common_meta::sequence::SequenceRef; -use common_telemetry::warn; +use common_meta::wal::options_allocator::build_region_wal_options; +use common_meta::wal::WalOptionsAllocator; +use common_telemetry::{debug, warn}; use snafu::{ensure, ResultExt}; use store_api::storage::{RegionId, TableId, MAX_REGION_SEQ}; use table::metadata::RawTableInfo; @@ -32,6 +34,7 @@ pub struct MetaSrvTableMetadataAllocator { ctx: SelectorContext, selector: SelectorRef, table_id_sequence: SequenceRef, + wal_options_allocator: WalOptionsAllocator, } impl MetaSrvTableMetadataAllocator { @@ -39,11 +42,13 @@ impl MetaSrvTableMetadataAllocator { ctx: SelectorContext, selector: SelectorRef, table_id_sequence: SequenceRef, + wal_options_allocator: WalOptionsAllocator, ) -> Self { Self { ctx, selector, table_id_sequence, + wal_options_allocator, } } } @@ -55,8 +60,8 @@ impl TableMetadataAllocator for MetaSrvTableMetadataAllocator { ctx: &TableMetadataAllocatorContext, raw_table_info: &mut RawTableInfo, partitions: &[Partition], - ) -> MetaResult<(TableId, Vec)> { - handle_create_region_routes( + ) -> MetaResult { + let (table_id, region_routes) = handle_create_region_routes( ctx.cluster_id, raw_table_info, partitions, @@ -66,7 +71,25 @@ impl TableMetadataAllocator for MetaSrvTableMetadataAllocator { ) .await .map_err(BoxedError::new) - .context(meta_error::ExternalSnafu) + .context(meta_error::ExternalSnafu)?; + + let region_numbers = region_routes + .iter() + .map(|route| route.region.id.region_number()) + .collect(); + let region_wal_options = + build_region_wal_options(region_numbers, &self.wal_options_allocator)?; + + debug!( + "Allocated region wal options {:?} for table {}", + region_wal_options, table_id + ); + + Ok(TableMetadata { + table_id, + region_routes, + region_wal_options, + }) } } diff --git a/src/meta-srv/src/test_util.rs b/src/meta-srv/src/test_util.rs index b6993f7a3778..801b63ab3222 100644 --- a/src/meta-srv/src/test_util.rs +++ b/src/meta-srv/src/test_util.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::sync::Arc; use chrono::DateTime; @@ -144,7 +145,7 @@ pub(crate) async fn prepare_table_region_and_info_value( region_route_factory(4, 3), ]; table_metadata_manager - .create_table_metadata(table_info, region_routes) + .create_table_metadata(table_info, region_routes, HashMap::default()) .await .unwrap(); } diff --git a/src/operator/src/tests/partition_manager.rs b/src/operator/src/tests/partition_manager.rs index e5e24b9b9596..c0d2a9f74f6b 100644 --- a/src/operator/src/tests/partition_manager.rs +++ b/src/operator/src/tests/partition_manager.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; @@ -80,6 +80,12 @@ pub fn new_test_table_info( .unwrap() } +fn new_test_region_wal_options(regions: Vec) -> HashMap { + // TODO(niebayes): construct region wal options for test. + let _ = regions; + HashMap::default() +} + /// Create a partition rule manager with two tables, one is partitioned by single column, and /// the other one is two. The tables are under default catalog and schema. /// @@ -102,9 +108,12 @@ pub(crate) async fn create_partition_rule_manager( let table_metadata_manager = TableMetadataManager::new(kv_backend.clone()); let partition_manager = Arc::new(PartitionRuleManager::new(kv_backend)); + let regions = vec![1u32, 2, 3]; + let region_wal_options = new_test_region_wal_options(regions.clone()); + table_metadata_manager .create_table_metadata( - new_test_table_info(1, "table_1", vec![0u32, 1, 2].into_iter()).into(), + new_test_table_info(1, "table_1", regions.clone().into_iter()).into(), vec![ RegionRoute { region: Region { @@ -161,13 +170,14 @@ pub(crate) async fn create_partition_rule_manager( leader_status: None, }, ], + region_wal_options.clone(), ) .await .unwrap(); table_metadata_manager .create_table_metadata( - new_test_table_info(2, "table_2", vec![0u32, 1, 2].into_iter()).into(), + new_test_table_info(2, "table_2", regions.clone().into_iter()).into(), vec![ RegionRoute { region: Region { @@ -230,6 +240,7 @@ pub(crate) async fn create_partition_rule_manager( leader_status: None, }, ], + region_wal_options, ) .await .unwrap(); diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 9536eaefe88c..0f561ca1512b 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -23,13 +23,14 @@ use common_meta::ddl_manager::DdlManager; use common_meta::key::TableMetadataManager; use common_meta::region_keeper::MemoryRegionKeeper; use common_meta::sequence::SequenceBuilder; +use common_meta::wal::build_wal_options_allocator; use common_procedure::options::ProcedureConfig; use common_telemetry::logging::LoggingOptions; use datanode::config::DatanodeOptions; use datanode::datanode::DatanodeBuilder; use frontend::frontend::FrontendOptions; use frontend::instance::builder::FrontendBuilder; -use frontend::instance::standalone::StandaloneTableMetadataCreator; +use frontend::instance::standalone::StandaloneTableMetadataAllocator; use frontend::instance::{FrontendInstance, Instance, StandaloneDatanodeManager}; use crate::test_util::{self, create_tmp_dir_and_datanode_opts, StorageType, TestGuard}; @@ -117,7 +118,15 @@ impl GreptimeDbStandaloneBuilder { .step(10) .build(), ); - let table_meta_allocator = Arc::new(StandaloneTableMetadataCreator::new(table_id_sequence)); + // TODO(niebayes): add a wal config into the MixOptions and pass it to the allocator builder. + let wal_options_allocator = + build_wal_options_allocator(&common_meta::wal::WalConfig::default(), &kv_backend) + .await + .unwrap(); + let table_meta_allocator = Arc::new(StandaloneTableMetadataAllocator::new( + table_id_sequence, + wal_options_allocator, + )); let ddl_task_executor = Arc::new( DdlManager::try_new(