diff --git a/src/cmd/src/error.rs b/src/cmd/src/error.rs index ae84282b1c01..91d556ff3861 100644 --- a/src/cmd/src/error.rs +++ b/src/cmd/src/error.rs @@ -14,7 +14,7 @@ use std::any::Any; -use common_error::ext::ErrorExt; +use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use config::ConfigError; @@ -231,6 +231,12 @@ pub enum Error { #[snafu(source)] error: std::io::Error, }, + + #[snafu(display("Other error"))] + Other { + source: BoxedError, + location: Location, + }, } pub type Result = std::result::Result; @@ -276,6 +282,8 @@ impl ErrorExt for Error { Error::StartCatalogManager { source, .. } => source.status_code(), Error::SerdeJson { .. } | Error::FileIo { .. } => StatusCode::Unexpected, + + Error::Other { source, .. } => source.status_code(), } } diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 3a499f6d6df7..55ed1d9817a5 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -19,6 +19,7 @@ use async_trait::async_trait; use clap::Parser; use common_catalog::consts::MIN_USER_TABLE_ID; use common_config::{metadata_store_dir, KvBackendConfig, WalConfig}; +use common_error::ext::BoxedError; use common_meta::cache_invalidator::DummyCacheInvalidator; use common_meta::datanode_manager::DatanodeManagerRef; use common_meta::ddl::{DdlTaskExecutorRef, TableMetadataAllocatorRef}; @@ -27,6 +28,7 @@ use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_meta::kv_backend::KvBackendRef; use common_meta::region_keeper::MemoryRegionKeeper; use common_meta::sequence::SequenceBuilder; +use common_meta::wal::build_wal_options_allocator; use common_procedure::ProcedureManagerRef; use common_telemetry::info; use common_telemetry::logging::LoggingOptions; @@ -35,7 +37,7 @@ use datanode::datanode::{Datanode, DatanodeBuilder}; use file_engine::config::EngineConfig as FileEngineConfig; use frontend::frontend::FrontendOptions; use frontend::instance::builder::FrontendBuilder; -use frontend::instance::standalone::StandaloneTableMetadataCreator; +use frontend::instance::standalone::StandaloneTableMetadataAllocator; use frontend::instance::{FrontendInstance, Instance as FeInstance, StandaloneDatanodeManager}; use frontend::service_config::{ GrpcOptions, InfluxdbOptions, MysqlOptions, OpentsdbOptions, PostgresOptions, PromStoreOptions, @@ -48,7 +50,7 @@ use servers::Mode; use snafu::ResultExt; use crate::error::{ - CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu, InitMetadataSnafu, Result, + CreateDirSnafu, IllegalConfigSnafu, InitDdlManagerSnafu, InitMetadataSnafu, OtherSnafu, Result, ShutdownDatanodeSnafu, ShutdownFrontendSnafu, StartDatanodeSnafu, StartFrontendSnafu, StartProcedureManagerSnafu, StopProcedureManagerSnafu, }; @@ -372,7 +374,16 @@ impl StartCommand { .step(10) .build(), ); - let table_meta_allocator = Arc::new(StandaloneTableMetadataCreator::new(table_id_sequence)); + // TODO(niebayes): add a wal config into the MixOptions and pass it to the allocator builder. + let wal_options_allocator = + build_wal_options_allocator(&common_meta::wal::WalConfig::default(), &kv_backend) + .await + .map_err(BoxedError::new) + .context(OtherSnafu)?; + let table_meta_allocator = Arc::new(StandaloneTableMetadataAllocator::new( + table_id_sequence, + wal_options_allocator, + )); let ddl_task_executor = Self::create_ddl_task_executor( kv_backend.clone(), diff --git a/src/common/meta/src/wal.rs b/src/common/meta/src/wal.rs index 8a95bd38af7b..a4c5dcb62669 100644 --- a/src/common/meta/src/wal.rs +++ b/src/common/meta/src/wal.rs @@ -23,7 +23,7 @@ use serde_with::with_prefix; use crate::error::Result; use crate::wal::kafka::KafkaConfig; pub use crate::wal::kafka::{KafkaOptions as KafkaWalOptions, Topic as KafkaWalTopic}; -pub use crate::wal::options_allocator::WalOptionsAllocator; +pub use crate::wal::options_allocator::{build_wal_options_allocator, WalOptionsAllocator}; /// An encoded wal options will be wrapped into a (WAL_OPTIONS_KEY, encoded wal options) key-value pair /// and inserted into the options of a `RegionCreateRequest`. diff --git a/src/common/meta/src/wal/options_allocator.rs b/src/common/meta/src/wal/options_allocator.rs index 90f9e55739e9..b6518199be52 100644 --- a/src/common/meta/src/wal/options_allocator.rs +++ b/src/common/meta/src/wal/options_allocator.rs @@ -12,9 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::sync::Arc; -use crate::error::Result; +use snafu::ResultExt; +use store_api::storage::RegionNumber; + +use crate::error::{EncodeWalOptionsToJsonSnafu, Result}; use crate::kv_backend::KvBackendRef; use crate::wal::kafka::{KafkaOptions, TopicManager as KafkaTopicManager}; use crate::wal::{WalConfig, WalOptions}; @@ -27,8 +31,6 @@ pub enum WalOptionsAllocator { Kafka(KafkaTopicManager), } -pub type WalOptionsAllocatorRef = Arc; - impl WalOptionsAllocator { /// Creates a WalOptionsAllocator. pub fn new(config: &WalConfig, kv_backend: KvBackendRef) -> Self { @@ -59,3 +61,30 @@ impl WalOptionsAllocator { } } } + +/// Allocates a wal options for each region. The allocated wal options is encoded immediately. +pub fn build_region_wal_options( + regions: Vec, + wal_options_allocator: &WalOptionsAllocator, +) -> Result> { + let wal_options = wal_options_allocator + .alloc_batch(regions.len()) + .into_iter() + .map(|wal_options| { + serde_json::to_string(&wal_options).context(EncodeWalOptionsToJsonSnafu { wal_options }) + }) + .collect::>>()?; + + Ok(regions.into_iter().zip(wal_options).collect()) +} + +/// Builds a wal options allocator. +// TODO(niebayes): implement. +pub async fn build_wal_options_allocator( + config: &WalConfig, + kv_backend: &KvBackendRef, +) -> Result { + let _ = config; + let _ = kv_backend; + Ok(WalOptionsAllocator::default()) +} diff --git a/src/frontend/src/instance/standalone.rs b/src/frontend/src/instance/standalone.rs index 3fb07da6ced0..95229d343694 100644 --- a/src/frontend/src/instance/standalone.rs +++ b/src/frontend/src/instance/standalone.rs @@ -26,9 +26,11 @@ use common_meta::error::{self as meta_error, Result as MetaResult}; use common_meta::peer::Peer; use common_meta::rpc::router::{Region, RegionRoute}; use common_meta::sequence::SequenceRef; +use common_meta::wal::options_allocator::build_region_wal_options; +use common_meta::wal::WalOptionsAllocator; use common_recordbatch::SendableRecordBatchStream; -use common_telemetry::tracing; use common_telemetry::tracing_context::{FutureExt, TracingContext}; +use common_telemetry::{debug, tracing}; use datanode::region_server::RegionServer; use servers::grpc::region_server::RegionServerHandler; use snafu::{OptionExt, ResultExt}; @@ -105,18 +107,22 @@ impl Datanode for RegionInvoker { } } -pub struct StandaloneTableMetadataCreator { +pub struct StandaloneTableMetadataAllocator { table_id_sequence: SequenceRef, + wal_options_allocator: WalOptionsAllocator, } -impl StandaloneTableMetadataCreator { - pub fn new(table_id_sequence: SequenceRef) -> Self { - Self { table_id_sequence } +impl StandaloneTableMetadataAllocator { + pub fn new(table_id_sequence: SequenceRef, wal_options_allocator: WalOptionsAllocator) -> Self { + Self { + table_id_sequence, + wal_options_allocator, + } } } #[async_trait] -impl TableMetadataAllocator for StandaloneTableMetadataCreator { +impl TableMetadataAllocator for StandaloneTableMetadataAllocator { async fn create( &self, _ctx: &TableMetadataAllocatorContext, @@ -145,7 +151,18 @@ impl TableMetadataAllocator for StandaloneTableMetadataCreator { }) .collect::>(); - // There're no wal options involved in standalone mode currently. + let region_numbers = region_routes + .iter() + .map(|route| route.region.id.region_number()) + .collect(); + let region_wal_options = + build_region_wal_options(region_numbers, &self.wal_options_allocator)?; + + debug!( + "Allocated region wal options {:?} for table {}", + region_wal_options, table_id + ); + Ok(TableMetadata { table_id, region_routes, diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 0c24053736af..0e168d522d62 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -19,6 +19,7 @@ use std::time::Duration; use client::client_manager::DatanodeClients; use common_base::Plugins; use common_catalog::consts::MIN_USER_TABLE_ID; +use common_error::ext::BoxedError; use common_grpc::channel_manager::ChannelConfig; use common_meta::datanode_manager::DatanodeManagerRef; use common_meta::ddl::TableMetadataAllocatorRef; @@ -30,14 +31,14 @@ use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; use common_meta::region_keeper::{MemoryRegionKeeper, MemoryRegionKeeperRef}; use common_meta::sequence::SequenceBuilder; use common_meta::state_store::KvStateStore; -use common_meta::wal::{WalConfig, WalOptionsAllocator}; +use common_meta::wal::build_wal_options_allocator; use common_procedure::local::{LocalManager, ManagerConfig}; use common_procedure::ProcedureManagerRef; use snafu::ResultExt; use crate::cache_invalidator::MetasrvCacheInvalidator; use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef}; -use crate::error::{self, Result}; +use crate::error::{self, OtherSnafu, Result}; use crate::greptimedb_telemetry::get_greptimedb_telemetry_task; use crate::handler::check_leader_handler::CheckLeaderHandler; use crate::handler::collect_stats_handler::CollectStatsHandler; @@ -205,7 +206,10 @@ impl MetaSrvBuilder { table_id: None, }; - let wal_options_allocator = build_wal_options_allocator(&options.wal, &kv_backend).await?; + let wal_options_allocator = build_wal_options_allocator(&options.wal, &kv_backend) + .await + .map_err(BoxedError::new) + .context(OtherSnafu)?; let table_metadata_allocator = table_metadata_allocator.unwrap_or_else(|| { let sequence = Arc::new( SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone()) @@ -358,15 +362,6 @@ fn build_procedure_manager( Arc::new(LocalManager::new(manager_config, state_store)) } -async fn build_wal_options_allocator( - config: &WalConfig, - kv_backend: &KvBackendRef, -) -> Result { - let _ = config; - let _ = kv_backend; - Ok(WalOptionsAllocator::default()) -} - fn build_ddl_manager( options: &MetaSrvOptions, datanode_clients: Option, diff --git a/src/meta-srv/src/table_meta_alloc.rs b/src/meta-srv/src/table_meta_alloc.rs index e24c14397af4..7a4183e58a88 100644 --- a/src/meta-srv/src/table_meta_alloc.rs +++ b/src/meta-srv/src/table_meta_alloc.rs @@ -16,9 +16,10 @@ use api::v1::meta::Partition; use common_catalog::format_full_table_name; use common_error::ext::BoxedError; use common_meta::ddl::{TableMetadata, TableMetadataAllocator, TableMetadataAllocatorContext}; -use common_meta::error::{self as meta_error, EncodeWalOptionsToJsonSnafu, Result as MetaResult}; +use common_meta::error::{self as meta_error, Result as MetaResult}; use common_meta::rpc::router::{Region, RegionRoute}; use common_meta::sequence::SequenceRef; +use common_meta::wal::options_allocator::build_region_wal_options; use common_meta::wal::WalOptionsAllocator; use common_telemetry::{debug, warn}; use snafu::{ensure, ResultExt}; @@ -72,23 +73,12 @@ impl TableMetadataAllocator for MetaSrvTableMetadataAllocator { .map_err(BoxedError::new) .context(meta_error::ExternalSnafu)?; - // Allocates a wal options for each region. The allocated wal options is encoded immediately. - let num_regions = region_routes.len(); - let wal_options = self - .wal_options_allocator - .alloc_batch(num_regions) - .into_iter() - .map(|wal_options| { - serde_json::to_string(&wal_options) - .context(EncodeWalOptionsToJsonSnafu { wal_options }) - }) - .collect::>>()?; - - let region_wal_options = region_routes + let region_numbers = region_routes .iter() .map(|route| route.region.id.region_number()) - .zip(wal_options) .collect(); + let region_wal_options = + build_region_wal_options(region_numbers, &self.wal_options_allocator)?; debug!( "Allocated region wal options {:?} for table {}", diff --git a/tests-integration/src/standalone.rs b/tests-integration/src/standalone.rs index 9536eaefe88c..0f561ca1512b 100644 --- a/tests-integration/src/standalone.rs +++ b/tests-integration/src/standalone.rs @@ -23,13 +23,14 @@ use common_meta::ddl_manager::DdlManager; use common_meta::key::TableMetadataManager; use common_meta::region_keeper::MemoryRegionKeeper; use common_meta::sequence::SequenceBuilder; +use common_meta::wal::build_wal_options_allocator; use common_procedure::options::ProcedureConfig; use common_telemetry::logging::LoggingOptions; use datanode::config::DatanodeOptions; use datanode::datanode::DatanodeBuilder; use frontend::frontend::FrontendOptions; use frontend::instance::builder::FrontendBuilder; -use frontend::instance::standalone::StandaloneTableMetadataCreator; +use frontend::instance::standalone::StandaloneTableMetadataAllocator; use frontend::instance::{FrontendInstance, Instance, StandaloneDatanodeManager}; use crate::test_util::{self, create_tmp_dir_and_datanode_opts, StorageType, TestGuard}; @@ -117,7 +118,15 @@ impl GreptimeDbStandaloneBuilder { .step(10) .build(), ); - let table_meta_allocator = Arc::new(StandaloneTableMetadataCreator::new(table_id_sequence)); + // TODO(niebayes): add a wal config into the MixOptions and pass it to the allocator builder. + let wal_options_allocator = + build_wal_options_allocator(&common_meta::wal::WalConfig::default(), &kv_backend) + .await + .unwrap(); + let table_meta_allocator = Arc::new(StandaloneTableMetadataAllocator::new( + table_id_sequence, + wal_options_allocator, + )); let ddl_task_executor = Arc::new( DdlManager::try_new(