From 71999761c3bbc8f89e0daf588f0541747e143588 Mon Sep 17 00:00:00 2001 From: August Date: Wed, 31 Jan 2024 19:20:21 +0800 Subject: [PATCH] feat: disable etcd initialization when sql backend enabled --- src/common/src/config.rs | 1 + src/meta/node/src/lib.rs | 9 +- src/meta/node/src/server.rs | 157 +++++++++--------- src/meta/service/src/system_params_service.rs | 12 +- src/meta/service/src/telemetry_service.rs | 14 +- src/meta/src/backup_restore/utils.rs | 2 + src/meta/src/barrier/mod.rs | 1 + src/meta/src/barrier/recovery.rs | 6 +- src/meta/src/hummock/compactor_manager.rs | 2 +- .../manager/compaction_group_manager.rs | 14 +- src/meta/src/hummock/manager/mod.rs | 16 +- src/meta/src/hummock/manager/tests.rs | 8 +- src/meta/src/hummock/manager/utils.rs | 2 +- src/meta/src/lib.rs | 3 + src/meta/src/manager/catalog/database.rs | 18 +- src/meta/src/manager/catalog/fragment.rs | 4 +- src/meta/src/manager/catalog/mod.rs | 4 +- src/meta/src/manager/catalog/user.rs | 2 +- src/meta/src/manager/cluster.rs | 12 +- src/meta/src/manager/env.rs | 122 +++++++------- src/meta/src/manager/notification.rs | 7 +- src/meta/src/manager/notification_version.rs | 3 +- src/meta/src/model/catalog.rs | 2 +- 23 files changed, 221 insertions(+), 200 deletions(-) diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 7f7cb500d1f47..f3cd926b6802b 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -167,6 +167,7 @@ pub enum MetaBackend { #[default] Mem, Etcd, + Sql, } /// The section `[meta]` in `risingwave.toml`. diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs index fb5365edf3e84..af8190e1ca8d6 100644 --- a/src/meta/node/src/lib.rs +++ b/src/meta/node/src/lib.rs @@ -31,7 +31,7 @@ use risingwave_common_heap_profiling::HeapProfiler; use risingwave_meta::*; use risingwave_meta_service::*; pub use rpc::{ElectionClient, ElectionMember, EtcdElectionClient}; -use server::{rpc_serve, MetaStoreSqlBackend}; +use server::rpc_serve; use crate::manager::MetaOpts; @@ -213,10 +213,10 @@ pub fn start(opts: MetaNodeOpts) -> Pin + Send>> { }, }, MetaBackend::Mem => MetaStoreBackend::Mem, + MetaBackend::Sql => MetaStoreBackend::Sql { + endpoint: opts.sql_endpoint.expect("sql endpoint is required"), + }, }; - let sql_backend = opts - .sql_endpoint - .map(|endpoint| MetaStoreSqlBackend { endpoint }); validate_config(&config); @@ -273,7 +273,6 @@ pub fn start(opts: MetaNodeOpts) -> Pin + Send>> { let (mut join_handle, leader_lost_handle, shutdown_send) = rpc_serve( add_info, backend, - sql_backend, max_heartbeat_interval, config.meta.meta_leader_lease_secs, MetaOpts { diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index 71486dbece5cb..83a47a98effdf 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -22,6 +22,7 @@ use futures::future::join_all; use itertools::Itertools; use otlp_embedded::TraceServiceServer; use regex::Regex; +use risingwave_common::config::MetaBackend; use risingwave_common::monitor::connection::{RouterExt, TcpConfig}; use risingwave_common::system_param::reader::SystemParamsRead; use risingwave_common::telemetry::manager::TelemetryManager; @@ -33,11 +34,13 @@ use risingwave_meta::controller::cluster::ClusterController; use risingwave_meta::manager::MetadataManager; use risingwave_meta::rpc::intercept::MetricsMiddlewareLayer; use risingwave_meta::rpc::ElectionClientRef; +use risingwave_meta::MetaStoreBackend; use risingwave_meta_model_migration::{Migrator, MigratorTrait}; use risingwave_meta_service::backup_service::BackupServiceImpl; use risingwave_meta_service::cloud_service::CloudServiceImpl; use risingwave_meta_service::cluster_service::ClusterServiceImpl; use risingwave_meta_service::ddl_service::DdlServiceImpl; +use risingwave_meta_service::event_log_service::EventLogServiceImpl; use risingwave_meta_service::health_service::HealthServiceImpl; use risingwave_meta_service::heartbeat_service::HeartbeatServiceImpl; use risingwave_meta_service::hummock_service::HummockServiceImpl; @@ -50,6 +53,7 @@ use risingwave_meta_service::stream_service::StreamServiceImpl; use risingwave_meta_service::system_params_service::SystemParamsServiceImpl; use risingwave_meta_service::telemetry_service::TelemetryInfoServiceImpl; use risingwave_meta_service::user_service::UserServiceImpl; +use risingwave_meta_service::AddressInfo; use risingwave_pb::backup_service::backup_service_server::BackupServiceServer; use risingwave_pb::cloud_service::cloud_service_server::CloudServiceServer; use risingwave_pb::connector_service::sink_coordination_service_server::SinkCoordinationServiceServer; @@ -57,6 +61,7 @@ use risingwave_pb::ddl_service::ddl_service_server::DdlServiceServer; use risingwave_pb::health::health_server::HealthServer; use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerServiceServer; use risingwave_pb::meta::cluster_service_server::ClusterServiceServer; +use risingwave_pb::meta::event_log_service_server::EventLogServiceServer; use risingwave_pb::meta::heartbeat_service_server::HeartbeatServiceServer; use risingwave_pb::meta::meta_member_service_server::MetaMemberServiceServer; use risingwave_pb::meta::notification_service_server::NotificationServiceServer; @@ -101,58 +106,15 @@ use crate::storage::{ use crate::stream::{GlobalStreamManager, SourceManager}; use crate::telemetry::{MetaReportCreator, MetaTelemetryInfoFetcher}; use crate::{hummock, serving, MetaError, MetaResult}; -#[derive(Debug)] -pub struct MetaStoreSqlBackend { - pub(crate) endpoint: String, -} - -use risingwave_meta::MetaStoreBackend; -use risingwave_meta_service::event_log_service::EventLogServiceImpl; -use risingwave_meta_service::AddressInfo; -use risingwave_pb::meta::event_log_service_server::EventLogServiceServer; pub async fn rpc_serve( address_info: AddressInfo, meta_store_backend: MetaStoreBackend, - meta_store_sql_backend: Option, max_cluster_heartbeat_interval: Duration, lease_interval_secs: u64, opts: MetaOpts, init_system_params: SystemParams, ) -> MetaResult<(JoinHandle<()>, Option>, WatchSender<()>)> { - let meta_store_sql = match meta_store_sql_backend { - Some(backend) => { - let mut options = sea_orm::ConnectOptions::new(backend.endpoint); - options - .max_connections(20) - .connect_timeout(Duration::from_secs(10)) - .idle_timeout(Duration::from_secs(30)); - let conn = sea_orm::Database::connect(options).await?; - Some(SqlMetaStore::new(conn)) - } - None => None, - }; - - let mut election_client = if let Some(sql_store) = &meta_store_sql { - let id = address_info.advertise_addr.clone(); - let conn = sql_store.conn.clone(); - let election_client: ElectionClientRef = match conn.get_database_backend() { - DbBackend::Sqlite => { - Arc::new(SqlBackendElectionClient::new(id, SqliteDriver::new(conn))) - } - DbBackend::Postgres => { - Arc::new(SqlBackendElectionClient::new(id, PostgresDriver::new(conn))) - } - DbBackend::MySql => Arc::new(SqlBackendElectionClient::new(id, MySqlDriver::new(conn))), - }; - - election_client.init().await?; - - Some(election_client) - } else { - None - }; - match meta_store_backend { MetaStoreBackend::Etcd { endpoints, @@ -170,28 +132,26 @@ pub async fn rpc_serve( .context("failed to connect etcd")?; let meta_store = EtcdMetaStore::new(client).into_ref(); - if election_client.is_none() { - // `with_keep_alive` option will break the long connection in election client. - let mut election_options = ConnectOptions::default(); - if let Some((username, password)) = &credentials { - election_options = election_options.with_user(username, password) - } - - election_client = Some(Arc::new( - EtcdElectionClient::new( - endpoints, - Some(election_options), - auth_enabled, - address_info.advertise_addr.clone(), - ) - .await?, - )); + // `with_keep_alive` option will break the long connection in election client. + let mut election_options = ConnectOptions::default(); + if let Some((username, password)) = &credentials { + election_options = election_options.with_user(username, password) } + let election_client: ElectionClientRef = Arc::new( + EtcdElectionClient::new( + endpoints, + Some(election_options), + auth_enabled, + address_info.advertise_addr.clone(), + ) + .await?, + ); + rpc_serve_with_store( - meta_store, - election_client, - meta_store_sql, + Some(meta_store), + Some(election_client), + None, address_info, max_cluster_heartbeat_interval, lease_interval_secs, @@ -202,9 +162,44 @@ pub async fn rpc_serve( MetaStoreBackend::Mem => { let meta_store = MemStore::new().into_ref(); rpc_serve_with_store( - meta_store, - election_client, - meta_store_sql, + Some(meta_store), + None, + None, + address_info, + max_cluster_heartbeat_interval, + lease_interval_secs, + opts, + init_system_params, + ) + } + MetaStoreBackend::Sql { endpoint } => { + let mut options = sea_orm::ConnectOptions::new(endpoint); + options + .max_connections(20) + .connect_timeout(Duration::from_secs(10)) + .idle_timeout(Duration::from_secs(30)); + let conn = sea_orm::Database::connect(options).await?; + let meta_store_sql = SqlMetaStore::new(conn); + + // Init election client. + let id = address_info.advertise_addr.clone(); + let conn = meta_store_sql.conn.clone(); + let election_client: ElectionClientRef = match conn.get_database_backend() { + DbBackend::Sqlite => { + Arc::new(SqlBackendElectionClient::new(id, SqliteDriver::new(conn))) + } + DbBackend::Postgres => { + Arc::new(SqlBackendElectionClient::new(id, PostgresDriver::new(conn))) + } + DbBackend::MySql => { + Arc::new(SqlBackendElectionClient::new(id, MySqlDriver::new(conn))) + } + }; + + rpc_serve_with_store( + None, + Some(election_client), + Some(meta_store_sql), address_info, max_cluster_heartbeat_interval, lease_interval_secs, @@ -217,7 +212,7 @@ pub async fn rpc_serve( #[expect(clippy::type_complexity)] pub fn rpc_serve_with_store( - meta_store: MetaStoreRef, + meta_store: Option, election_client: Option, meta_store_sql: Option, address_info: AddressInfo, @@ -372,7 +367,7 @@ pub async fn start_service_as_election_follower( /// ## Returns /// Returns an error if the service initialization failed pub async fn start_service_as_election_leader( - meta_store: MetaStoreRef, + meta_store: Option, meta_store_sql: Option, address_info: AddressInfo, max_cluster_heartbeat_interval: Duration, @@ -397,14 +392,7 @@ pub async fn start_service_as_election_leader( ) .await?; - let system_params_manager = env.system_params_manager_ref(); - let mut system_params_reader = system_params_manager.get_params().await; - - // Using new reader instead if the controller is set. - let system_params_controller = env.system_params_controller_ref(); - if let Some(ctl) = &system_params_controller { - system_params_reader = ctl.get_params().await; - } + let system_params_reader = env.system_params_reader().await; let data_directory = system_params_reader.data_directory(); if !is_correct_data_directory(data_directory) { @@ -651,8 +639,8 @@ pub async fn start_service_as_election_leader( let backup_srv = BackupServiceImpl::new(backup_manager); let telemetry_srv = TelemetryInfoServiceImpl::new(meta_store.clone(), env.sql_meta_store()); let system_params_srv = SystemParamsServiceImpl::new( - system_params_manager.clone(), - system_params_controller.clone(), + env.system_params_manager_ref(), + env.system_params_controller_ref(), ); let serving_srv = ServingServiceImpl::new(serving_vnode_mapping.clone(), metadata_manager.clone()); @@ -681,13 +669,13 @@ pub async fn start_service_as_election_leader( hummock_manager.clone(), meta_metrics.clone(), )); - if let Some(system_params_ctl) = system_params_controller { + if let Some(system_params_ctl) = env.system_params_controller_ref() { sub_tasks.push(SystemParamsController::start_params_notifier( system_params_ctl, )); } else { sub_tasks.push(SystemParamsManager::start_params_notifier( - system_params_manager.clone(), + env.system_params_manager_ref().unwrap(), )); } sub_tasks.push(HummockManager::hummock_timer_task(hummock_manager.clone())); @@ -742,7 +730,10 @@ pub async fn start_service_as_election_leader( Arc::new(MetaTelemetryInfoFetcher::new(env.cluster_id().clone())), Arc::new(MetaReportCreator::new( metadata_manager.clone(), - meta_store.meta_store_type(), + meta_store + .as_ref() + .map(|m| m.meta_store_type()) + .unwrap_or(MetaBackend::Sql), )), ); @@ -787,8 +778,10 @@ pub async fn start_service_as_election_leader( // Persist params before starting services so that invalid params that cause meta node // to crash will not be persisted. if meta_store_sql.is_none() { - system_params_manager.flush_params().await?; - env.cluster_id().put_at_meta_store(&meta_store).await?; + env.system_params_manager().unwrap().flush_params().await?; + env.cluster_id() + .put_at_meta_store(meta_store.as_ref().unwrap()) + .await?; } tracing::info!("Assigned cluster id {:?}", *env.cluster_id()); diff --git a/src/meta/service/src/system_params_service.rs b/src/meta/service/src/system_params_service.rs index fb4a59c0c2879..9d1e3ad3969f8 100644 --- a/src/meta/service/src/system_params_service.rs +++ b/src/meta/service/src/system_params_service.rs @@ -23,13 +23,13 @@ use crate::controller::system_param::SystemParamsControllerRef; use crate::manager::SystemParamsManagerRef; pub struct SystemParamsServiceImpl { - system_params_manager: SystemParamsManagerRef, + system_params_manager: Option, system_params_controller: Option, } impl SystemParamsServiceImpl { pub fn new( - system_params_manager: SystemParamsManagerRef, + system_params_manager: Option, system_params_controller: Option, ) -> Self { Self { @@ -48,7 +48,11 @@ impl SystemParamsService for SystemParamsServiceImpl { let params = if let Some(ctl) = &self.system_params_controller { ctl.get_pb_params().await } else { - self.system_params_manager.get_pb_params().await + self.system_params_manager + .as_ref() + .unwrap() + .get_pb_params() + .await }; Ok(Response::new(GetSystemParamsResponse { @@ -65,6 +69,8 @@ impl SystemParamsService for SystemParamsServiceImpl { ctl.set_param(&req.param, req.value).await? } else { self.system_params_manager + .as_ref() + .unwrap() .set_param(&req.param, req.value) .await? }; diff --git a/src/meta/service/src/telemetry_service.rs b/src/meta/service/src/telemetry_service.rs index 963f0884a30ed..6670fe1481389 100644 --- a/src/meta/service/src/telemetry_service.rs +++ b/src/meta/service/src/telemetry_service.rs @@ -24,12 +24,12 @@ use crate::storage::MetaStoreRef; use crate::MetaResult; pub struct TelemetryInfoServiceImpl { - meta_store: MetaStoreRef, + meta_store: Option, sql_meta_store: Option, } impl TelemetryInfoServiceImpl { - pub fn new(meta_store: MetaStoreRef, sql_meta_store: Option) -> Self { + pub fn new(meta_store: Option, sql_meta_store: Option) -> Self { Self { meta_store, sql_meta_store, @@ -42,10 +42,12 @@ impl TelemetryInfoServiceImpl { return Ok(cluster.map(|c| c.cluster_id.to_string().into())); } - Ok(ClusterId::from_meta_store(&self.meta_store) - .await - .ok() - .flatten()) + Ok( + ClusterId::from_meta_store(self.meta_store.as_ref().unwrap()) + .await + .ok() + .flatten(), + ) } } diff --git a/src/meta/src/backup_restore/utils.rs b/src/meta/src/backup_restore/utils.rs index 0e40085a2b97c..48eec28f4ffea 100644 --- a/src/meta/src/backup_restore/utils.rs +++ b/src/meta/src/backup_restore/utils.rs @@ -62,6 +62,7 @@ pub async fn get_meta_store(opts: RestoreOpts) -> BackupResult MetaStoreBackend::Mem, + MetaBackend::Sql => panic!("not supported"), }; match meta_store_backend { MetaStoreBackend::Etcd { @@ -79,6 +80,7 @@ pub async fn get_meta_store(opts: RestoreOpts) -> BackupResult Ok(MetaStoreBackendImpl::Mem(MemStore::new())), + MetaStoreBackend::Sql { .. } => panic!("not supported"), } } diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 0884560045398..96e8743b83a94 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -462,6 +462,7 @@ impl GlobalBarrierManager { } else { self.env .system_params_manager() + .unwrap() .set_param(PAUSE_ON_NEXT_BOOTSTRAP_KEY, Some("false".to_owned())) .await?; } diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 1958ab03a166d..2a760528dfa33 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -595,7 +595,7 @@ impl GlobalBarrierManagerContext { .migrate_fragment_actors(&migration_plan) .await?; // 3. remove the migration plan. - migration_plan.delete(self.env.meta_store()).await?; + migration_plan.delete(self.env.meta_store_checked()).await?; debug!("migrate actors succeed."); let info = self.resolve_actor_info().await; @@ -750,7 +750,7 @@ impl GlobalBarrierManagerContext { ) -> MetaResult { let mgr = self.metadata_manager.as_v1_ref(); - let mut cached_plan = MigrationPlan::get(self.env.meta_store()).await?; + let mut cached_plan = MigrationPlan::get(self.env.meta_store_checked()).await?; let all_worker_parallel_units = mgr.fragment_manager.all_worker_parallel_units().await; @@ -847,7 +847,7 @@ impl GlobalBarrierManagerContext { new_plan.parallel_unit_plan ); - new_plan.insert(self.env.meta_store()).await?; + new_plan.insert(self.env.meta_store_checked()).await?; Ok(new_plan) } diff --git a/src/meta/src/hummock/compactor_manager.rs b/src/meta/src/hummock/compactor_manager.rs index d472fa20037a3..8a548d2522e89 100644 --- a/src/meta/src/hummock/compactor_manager.rs +++ b/src/meta/src/hummock/compactor_manager.rs @@ -132,7 +132,7 @@ impl CompactorManagerInner { use sea_orm::EntityTrait; // Retrieve the existing task assignments from metastore. let task_assignment: Vec = match env.sql_meta_store() { - None => CompactTaskAssignment::list(env.meta_store()).await?, + None => CompactTaskAssignment::list(env.meta_store_checked()).await?, Some(sql_meta_store) => compaction_task::Entity::find() .all(&sql_meta_store.conn) .await diff --git a/src/meta/src/hummock/manager/compaction_group_manager.rs b/src/meta/src/hummock/manager/compaction_group_manager.rs index 51f8e63dd6eb0..8db45a2286478 100644 --- a/src/meta/src/hummock/manager/compaction_group_manager.rs +++ b/src/meta/src/hummock/manager/compaction_group_manager.rs @@ -782,10 +782,10 @@ pub(super) struct CompactionGroupManager { } impl CompactionGroupManager { - async fn init(&mut self, meta_store: &S) -> Result<()> { + async fn init(&mut self, meta_store: Option<&S>) -> Result<()> { let loaded_compaction_groups: BTreeMap = match &self.sql_meta_store { - None => CompactionGroup::list(meta_store) + None => CompactionGroup::list(meta_store.unwrap()) .await? .into_iter() .map(|cg| (cg.group_id(), cg)) @@ -811,7 +811,7 @@ impl CompactionGroupManager { pub(super) async fn get_or_insert_compaction_group_config( &mut self, compaction_group_id: CompactionGroupId, - meta_store: &S, + meta_store: Option<&S>, ) -> Result { let r = self .get_or_insert_compaction_group_configs(&[compaction_group_id], meta_store) @@ -823,7 +823,7 @@ impl CompactionGroupManager { pub(super) async fn get_or_insert_compaction_group_configs( &mut self, compaction_group_ids: &[CompactionGroupId], - meta_store: &S, + meta_store: Option<&S>, ) -> Result> { let mut compaction_groups = create_trx_wrapper!( self.sql_meta_store, @@ -862,7 +862,7 @@ impl CompactionGroupManager { &mut self, compaction_group_ids: &[CompactionGroupId], config_to_update: &[MutableConfig], - meta_store: &S, + meta_store: Option<&S>, ) -> Result> { let mut compaction_groups = create_trx_wrapper!( self.sql_meta_store, @@ -894,7 +894,7 @@ impl CompactionGroupManager { &mut self, group_id: CompactionGroupId, config: CompactionConfig, - meta_store: &S, + meta_store: Option<&S>, ) -> Result<()> { let insert = create_trx_wrapper!( self.sql_meta_store, @@ -916,7 +916,7 @@ impl CompactionGroupManager { async fn purge( &mut self, existing_groups: HashSet, - meta_store: &S, + meta_store: Option<&S>, ) -> Result<()> { let mut compaction_groups = create_trx_wrapper!( self.sql_meta_store, diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index bab6ef381d95e..f43408df23d44 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -364,7 +364,7 @@ impl HummockManager { // Make sure data dir is not used by another cluster. // Skip this check in e2e compaction test, which needs to start a secondary cluster with // same bucket - if env.cluster_first_launch() && !deterministic_mode { + if !deterministic_mode { write_exclusive_cluster_id( state_store_dir, env.cluster_id().clone(), @@ -453,7 +453,7 @@ impl HummockManager { let sql_meta_store = self.sql_meta_store(); let compaction_statuses: BTreeMap = match &sql_meta_store { - None => CompactStatus::list(self.env.meta_store()) + None => CompactStatus::list(self.env.meta_store_checked()) .await? .into_iter() .map(|cg| (cg.compaction_group_id(), cg)) @@ -471,7 +471,7 @@ impl HummockManager { } compaction_guard.compact_task_assignment = match &sql_meta_store { - None => CompactTaskAssignment::list(self.env.meta_store()) + None => CompactTaskAssignment::list(self.env.meta_store_checked()) .await? .into_iter() .map(|assigned| (assigned.key().unwrap(), assigned)) @@ -487,7 +487,7 @@ impl HummockManager { let hummock_version_deltas: BTreeMap = match &sql_meta_store { - None => HummockVersionDelta::list(self.env.meta_store()) + None => HummockVersionDelta::list(self.env.meta_store_checked()) .await? .into_iter() .map(|version_delta| (version_delta.id, version_delta)) @@ -536,7 +536,7 @@ impl HummockManager { } } versioning_guard.version_stats = match &sql_meta_store { - None => HummockVersionStats::list(self.env.meta_store()) + None => HummockVersionStats::list(self.env.meta_store_checked()) .await? .into_iter() .next(), @@ -564,7 +564,7 @@ impl HummockManager { versioning_guard.hummock_version_deltas = hummock_version_deltas; versioning_guard.pinned_versions = match &sql_meta_store { - None => HummockPinnedVersion::list(self.env.meta_store()) + None => HummockPinnedVersion::list(self.env.meta_store_checked()) .await? .into_iter() .map(|p| (p.context_id, p)) @@ -579,7 +579,7 @@ impl HummockManager { }; versioning_guard.pinned_snapshots = match &sql_meta_store { - None => HummockPinnedSnapshot::list(self.env.meta_store()) + None => HummockPinnedSnapshot::list(self.env.meta_store_checked()) .await? .into_iter() .map(|p| (p.context_id, p)) @@ -1927,7 +1927,7 @@ impl HummockManager { compaction_groups: Vec, ) -> Result<()> { for table in &table_catalogs { - table.insert(self.env.meta_store()).await?; + table.insert(self.env.meta_store_checked()).await?; } for group in &compaction_groups { assert!( diff --git a/src/meta/src/hummock/manager/tests.rs b/src/meta/src/hummock/manager/tests.rs index 0921e4d2351d8..877be8919262e 100644 --- a/src/meta/src/hummock/manager/tests.rs +++ b/src/meta/src/hummock/manager/tests.rs @@ -101,7 +101,9 @@ fn get_compaction_group_object_ids( async fn list_pinned_snapshot_from_meta_store(env: &MetaSrvEnv) -> Vec { match env.sql_meta_store() { - None => HummockPinnedSnapshot::list(env.meta_store()).await.unwrap(), + None => HummockPinnedSnapshot::list(env.meta_store_checked()) + .await + .unwrap(), Some(sql_meta_store) => { use risingwave_meta_model_v2::hummock_pinned_snapshot; use sea_orm::EntityTrait; @@ -118,7 +120,9 @@ async fn list_pinned_snapshot_from_meta_store(env: &MetaSrvEnv) -> Vec Vec { match env.sql_meta_store() { - None => HummockPinnedVersion::list(env.meta_store()).await.unwrap(), + None => HummockPinnedVersion::list(env.meta_store_checked()) + .await + .unwrap(), Some(sql_meta_store) => { use risingwave_meta_model_v2::hummock_pinned_version; use sea_orm::EntityTrait; diff --git a/src/meta/src/hummock/manager/utils.rs b/src/meta/src/hummock/manager/utils.rs index 9ddc9699cb5e3..07c26d7a24243 100644 --- a/src/meta/src/hummock/manager/utils.rs +++ b/src/meta/src/hummock/manager/utils.rs @@ -26,7 +26,7 @@ macro_rules! commit_multi_var { $( $val_txn.as_v1_ref().apply_to_txn(&mut trx).await?; )* - $meta_store.txn(trx).await?; + $meta_store.unwrap().txn(trx).await?; $( $val_txn.into_v1().commit(); )* diff --git a/src/meta/src/lib.rs b/src/meta/src/lib.rs index 69d75fb40d79b..bce4f082db4c1 100644 --- a/src/meta/src/lib.rs +++ b/src/meta/src/lib.rs @@ -58,4 +58,7 @@ pub enum MetaStoreBackend { credentials: Option<(String, String)>, }, Mem, + Sql { + endpoint: String, + }, } diff --git a/src/meta/src/manager/catalog/database.rs b/src/meta/src/manager/catalog/database.rs index 1117544e0f218..d060a610b6fd5 100644 --- a/src/meta/src/manager/catalog/database.rs +++ b/src/meta/src/manager/catalog/database.rs @@ -83,15 +83,15 @@ pub struct DatabaseManager { impl DatabaseManager { pub async fn new(env: MetaSrvEnv) -> MetaResult { - let databases = Database::list(env.meta_store()).await?; - let schemas = Schema::list(env.meta_store()).await?; - let sources = Source::list(env.meta_store()).await?; - let sinks = Sink::list(env.meta_store()).await?; - let tables = Table::list(env.meta_store()).await?; - let indexes = Index::list(env.meta_store()).await?; - let views = View::list(env.meta_store()).await?; - let functions = Function::list(env.meta_store()).await?; - let connections = Connection::list(env.meta_store()).await?; + let databases = Database::list(env.meta_store_checked()).await?; + let schemas = Schema::list(env.meta_store_checked()).await?; + let sources = Source::list(env.meta_store_checked()).await?; + let sinks = Sink::list(env.meta_store_checked()).await?; + let tables = Table::list(env.meta_store_checked()).await?; + let indexes = Index::list(env.meta_store_checked()).await?; + let views = View::list(env.meta_store_checked()).await?; + let functions = Function::list(env.meta_store_checked()).await?; + let connections = Connection::list(env.meta_store_checked()).await?; let mut relation_ref_count = HashMap::new(); diff --git a/src/meta/src/manager/catalog/fragment.rs b/src/meta/src/manager/catalog/fragment.rs index 559cef397c250..4883951c5f99d 100644 --- a/src/meta/src/manager/catalog/fragment.rs +++ b/src/meta/src/manager/catalog/fragment.rs @@ -124,7 +124,7 @@ pub type FragmentManagerRef = Arc; impl FragmentManager { pub async fn new(env: MetaSrvEnv) -> MetaResult { - let table_fragments = TableFragments::list(env.meta_store()).await?; + let table_fragments = TableFragments::list(env.meta_store_checked()).await?; // `expr_context` of `StreamActor` is introduced in 1.6.0. // To ensure compatibility, we fill it for table fragments that were created with older versions. @@ -133,7 +133,7 @@ impl FragmentManager { .map(|tf| (tf.table_id(), tf.fill_expr_context())) .collect(); - let table_revision = TableRevision::get(env.meta_store()).await?; + let table_revision = TableRevision::get(env.meta_store_checked()).await?; Ok(Self { env, diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 4d0476f5d4887..33990548f01b8 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -87,7 +87,7 @@ macro_rules! commit_meta_with_trx { $val_txn.apply_to_txn(&mut $trx).await?; )* // Commit to meta store - $manager.env.meta_store().txn($trx).await?; + $manager.env.meta_store_checked().txn($trx).await?; // Upon successful commit, commit the change to in-mem meta $( $val_txn.commit(); @@ -3399,7 +3399,7 @@ impl CatalogManager { ..Default::default() }; - default_user.insert(self.env.meta_store()).await?; + default_user.insert(self.env.meta_store_checked()).await?; core.user_info.insert(default_user.id, default_user); } } diff --git a/src/meta/src/manager/catalog/user.rs b/src/meta/src/manager/catalog/user.rs index 851016ec0e603..456dd116b9daf 100644 --- a/src/meta/src/manager/catalog/user.rs +++ b/src/meta/src/manager/catalog/user.rs @@ -35,7 +35,7 @@ pub struct UserManager { impl UserManager { pub async fn new(env: MetaSrvEnv, database: &DatabaseManager) -> MetaResult { - let users = UserInfo::list(env.meta_store()).await?; + let users = UserInfo::list(env.meta_store_checked()).await?; let user_info = BTreeMap::from_iter(users.into_iter().map(|user| (user.id, user))); let mut user_manager = Self { diff --git a/src/meta/src/manager/cluster.rs b/src/meta/src/manager/cluster.rs index 27a97167d3e4b..dbffa250b4cf1 100644 --- a/src/meta/src/manager/cluster.rs +++ b/src/meta/src/manager/cluster.rs @@ -186,7 +186,7 @@ impl ClusterManager { } new_worker.update_expire_at(self.max_heartbeat_interval); - new_worker.insert(self.env.meta_store()).await?; + new_worker.insert(self.env.meta_store_checked()).await?; *worker = new_worker; return Ok(worker.to_protobuf()); } @@ -230,7 +230,7 @@ impl ClusterManager { worker.update_started_at(timestamp_now_sec()); worker.update_resource(Some(resource)); // Persist worker node. - worker.insert(self.env.meta_store()).await?; + worker.insert(self.env.meta_store_checked()).await?; // Update core. core.add_worker_node(worker); Ok(worker_node) @@ -241,7 +241,7 @@ impl ClusterManager { let mut worker = core.get_worker_by_host_checked(host_address.clone())?; if worker.worker_node.state != State::Running as i32 { worker.worker_node.state = State::Running as i32; - worker.insert(self.env.meta_store()).await?; + worker.insert(self.env.meta_store_checked()).await?; core.update_worker_node(worker.clone()); } @@ -292,7 +292,7 @@ impl ClusterManager { } } - self.env.meta_store().txn(txn).await?; + self.env.meta_store_checked().txn(txn).await?; for var_txn in var_txns { var_txn.commit(); @@ -308,7 +308,7 @@ impl ClusterManager { let worker_node = worker.to_protobuf(); // Persist deletion. - Worker::delete(self.env.meta_store(), &host_address).await?; + Worker::delete(self.env.meta_store_checked(), &host_address).await?; // Update core. core.delete_worker_node(worker); @@ -536,7 +536,7 @@ impl ClusterManagerCore { pub const MAX_WORKER_REUSABLE_ID_COUNT: usize = 1 << Self::MAX_WORKER_REUSABLE_ID_BITS; async fn new(env: MetaSrvEnv) -> MetaResult { - let meta_store = env.meta_store(); + let meta_store = env.meta_store_checked(); let mut workers = Worker::list(meta_store).await?; let used_transactional_ids: HashSet<_> = workers diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index ee4287b1aec3b..be287ac864860 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -45,13 +45,13 @@ use crate::MetaResult; #[derive(Clone)] pub struct MetaSrvEnv { /// id generator manager. - id_gen_manager: IdGeneratorManagerRef, + id_gen_manager: Option, /// sql id generator manager. sql_id_gen_manager: Option, /// meta store. - meta_store: MetaStoreRef, + meta_store: Option, /// sql meta store. meta_store_sql: Option, @@ -68,7 +68,7 @@ pub struct MetaSrvEnv { event_log_manager: EventLogMangerRef, /// system param manager. - system_params_manager: SystemParamsManagerRef, + system_params_manager: Option, /// system param controller. system_params_controller: Option, @@ -76,9 +76,6 @@ pub struct MetaSrvEnv { /// Unique identifier of the cluster. cluster_id: ClusterId, - /// Whether the cluster is launched for the first time. - cluster_first_launch: bool, - /// Client to connector node. `None` if endpoint unspecified or unable to connect. connector_client: Option, @@ -280,39 +277,50 @@ impl MetaSrvEnv { pub async fn new( opts: MetaOpts, init_system_params: SystemParams, - meta_store: MetaStoreRef, + meta_store: Option, meta_store_sql: Option, ) -> MetaResult { - // change to sync after refactor `IdGeneratorManager::new` sync. - let id_gen_manager = Arc::new(IdGeneratorManager::new(meta_store.clone()).await); - let stream_client_pool = Arc::new(StreamClientPool::default()); let notification_manager = Arc::new(NotificationManager::new(meta_store.clone(), meta_store_sql.clone()).await); let idle_manager = Arc::new(IdleManager::new(opts.max_idle_ms)); - let (mut cluster_id, cluster_first_launch) = - if let Some(id) = ClusterId::from_meta_store(&meta_store).await? { - (id, false) - } else { - (ClusterId::new(), true) - }; - let system_params_manager = Arc::new( - SystemParamsManager::new( - meta_store.clone(), - notification_manager.clone(), - init_system_params.clone(), - cluster_first_launch, - ) - .await?, - ); - // TODO: remove `cluster_first_launch` and check equality of cluster id stored in hummock to - // make sure the data dir of hummock is not used by another cluster. + let stream_client_pool = Arc::new(StreamClientPool::default()); + + let (id_gen_manager, mut cluster_id, system_params_manager) = match meta_store.clone() { + Some(meta_store) => { + // change to sync after refactor `IdGeneratorManager::new` sync. + let id_gen_manager = Arc::new(IdGeneratorManager::new(meta_store.clone()).await); + let (cluster_id, cluster_first_launch) = + if let Some(id) = ClusterId::from_meta_store(&meta_store).await? { + (id, false) + } else { + (ClusterId::new(), true) + }; + let system_params_manager = Arc::new( + SystemParamsManager::new( + meta_store.clone(), + notification_manager.clone(), + init_system_params.clone(), + cluster_first_launch, + ) + .await?, + ); + ( + Some(id_gen_manager), + Some(cluster_id), + Some(system_params_manager), + ) + } + None => (None, None, None), + }; let system_params_controller = match &meta_store_sql { Some(store) => { - cluster_id = Cluster::find() - .one(&store.conn) - .await? - .map(|c| c.cluster_id.to_string().into()) - .unwrap(); + cluster_id = Some( + Cluster::find() + .one(&store.conn) + .await? + .map(|c| c.cluster_id.to_string().into()) + .unwrap(), + ); Some(Arc::new( SystemParamsController::new( store.clone(), @@ -350,8 +358,7 @@ impl MetaSrvEnv { event_log_manager, system_params_manager, system_params_controller, - cluster_id, - cluster_first_launch, + cluster_id: cluster_id.unwrap(), connector_client, opts: opts.into(), hummock_seq, @@ -359,23 +366,23 @@ impl MetaSrvEnv { } pub fn meta_store_ref(&self) -> MetaStoreRef { - self.meta_store.clone() + self.meta_store.clone().unwrap() } - pub fn meta_store(&self) -> &MetaStoreRef { - &self.meta_store + pub fn meta_store_checked(&self) -> &MetaStoreRef { + self.meta_store.as_ref().unwrap() } - pub fn sql_meta_store(&self) -> Option { - self.meta_store_sql.clone() + pub fn meta_store(&self) -> Option<&MetaStoreRef> { + self.meta_store.as_ref() } - pub fn id_gen_manager_ref(&self) -> IdGeneratorManagerRef { - self.id_gen_manager.clone() + pub fn sql_meta_store(&self) -> Option { + self.meta_store_sql.clone() } pub fn id_gen_manager(&self) -> &IdGeneratorManager { - self.id_gen_manager.deref() + self.id_gen_manager.as_ref().unwrap() } pub fn sql_id_gen_manager_ref(&self) -> Option { @@ -402,15 +409,19 @@ impl MetaSrvEnv { if let Some(system_ctl) = &self.system_params_controller { return system_ctl.get_params().await; } - self.system_params_manager.get_params().await + self.system_params_manager + .as_ref() + .unwrap() + .get_params() + .await } - pub fn system_params_manager_ref(&self) -> SystemParamsManagerRef { + pub fn system_params_manager_ref(&self) -> Option { self.system_params_manager.clone() } - pub fn system_params_manager(&self) -> &SystemParamsManager { - self.system_params_manager.deref() + pub fn system_params_manager(&self) -> Option<&SystemParamsManagerRef> { + self.system_params_manager.as_ref() } pub fn system_params_controller_ref(&self) -> Option { @@ -433,10 +444,6 @@ impl MetaSrvEnv { &self.cluster_id } - pub fn cluster_first_launch(&self) -> bool { - self.cluster_first_launch - } - pub fn connector_client(&self) -> Option { self.connector_client.clone() } @@ -456,19 +463,19 @@ impl MetaSrvEnv { pub async fn for_test_opts(opts: Arc) -> Self { use crate::manager::event_log::EventLogManger; - // change to sync after refactor `IdGeneratorManager::new` sync. let meta_store = MemStore::default().into_ref(); #[cfg(madsim)] let meta_store_sql: Option = None; #[cfg(not(madsim))] let meta_store_sql = Some(SqlMetaStore::for_test().await); - let id_gen_manager = Arc::new(IdGeneratorManager::new(meta_store.clone()).await); - let notification_manager = - Arc::new(NotificationManager::new(meta_store.clone(), meta_store_sql.clone()).await); + let id_gen_manager = Some(Arc::new(IdGeneratorManager::new(meta_store.clone()).await)); + let notification_manager = Arc::new( + NotificationManager::new(Some(meta_store.clone()), meta_store_sql.clone()).await, + ); let stream_client_pool = Arc::new(StreamClientPool::default()); let idle_manager = Arc::new(IdleManager::disabled()); - let (cluster_id, cluster_first_launch) = (ClusterId::new(), true); + let cluster_id = ClusterId::new(); let system_params_manager = Arc::new( SystemParamsManager::new( meta_store.clone(), @@ -508,16 +515,15 @@ impl MetaSrvEnv { Self { id_gen_manager, sql_id_gen_manager, - meta_store, + meta_store: Some(meta_store), meta_store_sql, notification_manager, stream_client_pool, idle_manager, event_log_manager, - system_params_manager, + system_params_manager: Some(system_params_manager), system_params_controller, cluster_id, - cluster_first_launch, connector_client: None, opts, hummock_seq, diff --git a/src/meta/src/manager/notification.rs b/src/meta/src/manager/notification.rs index 49f85a993bdd9..166f35655aa3c 100644 --- a/src/meta/src/manager/notification.rs +++ b/src/meta/src/manager/notification.rs @@ -84,7 +84,10 @@ pub struct NotificationManager { } impl NotificationManager { - pub async fn new(meta_store: MetaStoreRef, meta_store_sql: Option) -> Self { + pub async fn new( + meta_store: Option, + meta_store_sql: Option, + ) -> Self { // notification waiting queue. let (task_tx, mut task_rx) = mpsc::unbounded_channel::(); let core = Arc::new(Mutex::new(NotificationManagerCore::new())); @@ -418,7 +421,7 @@ mod tests { #[tokio::test] async fn test_multiple_subscribers_one_worker() { - let mgr = NotificationManager::new(MemStore::new().into_ref(), None).await; + let mgr = NotificationManager::new(Some(MemStore::new().into_ref()), None).await; let worker_key1 = WorkerKey(HostAddress { host: "a".to_string(), port: 1, diff --git a/src/meta/src/manager/notification_version.rs b/src/meta/src/manager/notification_version.rs index ea461da976ac5..87ccd3ff7f548 100644 --- a/src/meta/src/manager/notification_version.rs +++ b/src/meta/src/manager/notification_version.rs @@ -31,7 +31,7 @@ pub enum NotificationVersionGenerator { // TODO: add pre-allocation if necessary impl NotificationVersionGenerator { pub async fn new( - meta_store: MetaStoreRef, + meta_store: Option, meta_store_sql: Option, ) -> MetaResult { if let Some(sql) = meta_store_sql { @@ -52,6 +52,7 @@ impl NotificationVersionGenerator { Ok(Self::SqlGenerator(current_version, sql.conn)) } else { + let meta_store = meta_store.unwrap(); let current_version = NotificationModelV1::new(&meta_store).await; Ok(Self::KvGenerator(current_version, meta_store)) } diff --git a/src/meta/src/model/catalog.rs b/src/meta/src/model/catalog.rs index 5d0d00a037ec2..146bbc2690afc 100644 --- a/src/meta/src/model/catalog.rs +++ b/src/meta/src/model/catalog.rs @@ -90,7 +90,7 @@ mod tests { #[tokio::test] async fn test_database() -> MetadataModelResult<()> { let env = MetaSrvEnv::for_test().await; - let store = env.meta_store(); + let store = env.meta_store_checked(); let databases = Database::list(store).await?; assert!(databases.is_empty()); assert!(Database::select(store, &0).await.unwrap().is_none());