Skip to content

Commit

Permalink
refactor: unify kv and sql based fields as enum in meta env (#16184)
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 authored Apr 9, 2024
1 parent 99d1996 commit 0380704
Show file tree
Hide file tree
Showing 35 changed files with 436 additions and 621 deletions.
96 changes: 32 additions & 64 deletions src/meta/node/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use etcd_client::ConnectOptions;
use futures::future::join_all;
use otlp_embedded::TraceServiceServer;
use regex::Regex;
use risingwave_common::config::MetaBackend;
use risingwave_common::monitor::connection::{RouterExt, TcpConfig};
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::telemetry::manager::TelemetryManager;
Expand All @@ -31,7 +30,7 @@ use risingwave_common_service::tracing::TracingExtractLayer;
use risingwave_meta::barrier::StreamRpcManager;
use risingwave_meta::controller::catalog::CatalogController;
use risingwave_meta::controller::cluster::ClusterController;
use risingwave_meta::manager::MetadataManager;
use risingwave_meta::manager::{MetaStoreImpl, MetadataManager, SystemParamsManagerImpl};
use risingwave_meta::rpc::intercept::MetricsMiddlewareLayer;
use risingwave_meta::rpc::ElectionClientRef;
use risingwave_meta::stream::ScaleController;
Expand Down Expand Up @@ -100,10 +99,7 @@ use crate::rpc::metrics::{
start_fragment_info_monitor, start_worker_info_monitor, GLOBAL_META_METRICS,
};
use crate::serving::ServingVnodeMapping;
use crate::storage::{
EtcdMetaStore, MemStore, MetaStore, MetaStoreBoxExt, MetaStoreRef,
WrappedEtcdClient as EtcdClient,
};
use crate::storage::{EtcdMetaStore, MemStore, MetaStoreBoxExt, WrappedEtcdClient as EtcdClient};
use crate::stream::{GlobalStreamManager, SourceManager};
use crate::telemetry::{MetaReportCreator, MetaTelemetryInfoFetcher};
use crate::{hummock, serving, MetaError, MetaResult};
Expand Down Expand Up @@ -150,9 +146,8 @@ pub async fn rpc_serve(
);

rpc_serve_with_store(
Some(meta_store),
MetaStoreImpl::Kv(meta_store),
Some(election_client),
None,
address_info,
max_cluster_heartbeat_interval,
lease_interval_secs,
Expand All @@ -163,8 +158,7 @@ pub async fn rpc_serve(
MetaStoreBackend::Mem => {
let meta_store = MemStore::new().into_ref();
rpc_serve_with_store(
Some(meta_store),
None,
MetaStoreImpl::Kv(meta_store),
None,
address_info,
max_cluster_heartbeat_interval,
Expand Down Expand Up @@ -207,9 +201,8 @@ pub async fn rpc_serve(
election_client.init().await?;

rpc_serve_with_store(
None,
MetaStoreImpl::Sql(meta_store_sql),
Some(election_client),
Some(meta_store_sql),
address_info,
max_cluster_heartbeat_interval,
lease_interval_secs,
Expand All @@ -222,9 +215,8 @@ pub async fn rpc_serve(

#[expect(clippy::type_complexity)]
pub fn rpc_serve_with_store(
meta_store: Option<MetaStoreRef>,
meta_store_impl: MetaStoreImpl,
election_client: Option<ElectionClientRef>,
meta_store_sql: Option<SqlMetaStore>,
address_info: AddressInfo,
max_cluster_heartbeat_interval: Duration,
lease_interval_secs: u64,
Expand Down Expand Up @@ -305,8 +297,7 @@ pub fn rpc_serve_with_store(
};

start_service_as_election_leader(
meta_store,
meta_store_sql,
meta_store_impl,
address_info,
max_cluster_heartbeat_interval,
opts,
Expand Down Expand Up @@ -377,8 +368,7 @@ pub async fn start_service_as_election_follower(
/// ## Returns
/// Returns an error if the service initialization failed
pub async fn start_service_as_election_leader(
meta_store: Option<MetaStoreRef>,
meta_store_sql: Option<SqlMetaStore>,
meta_store_impl: MetaStoreImpl,
address_info: AddressInfo,
max_cluster_heartbeat_interval: Duration,
opts: MetaOpts,
Expand All @@ -387,21 +377,14 @@ pub async fn start_service_as_election_leader(
mut svc_shutdown_rx: WatchReceiver<()>,
) -> MetaResult<()> {
tracing::info!("Defining leader services");
if let Some(sql_store) = &meta_store_sql {
if let MetaStoreImpl::Sql(sql_store) = &meta_store_impl {
// Try to upgrade if any new model changes are added.
Migrator::up(&sql_store.conn, None)
.await
.expect("Failed to upgrade models in meta store");
}

let env = MetaSrvEnv::new(
opts.clone(),
init_system_params,
meta_store.clone(),
meta_store_sql.clone(),
)
.await?;

let env = MetaSrvEnv::new(opts.clone(), init_system_params, meta_store_impl).await?;
let system_params_reader = env.system_params_reader().await;

let data_directory = system_params_reader.data_directory();
Expand All @@ -415,24 +398,25 @@ pub async fn start_service_as_election_leader(
)));
}

let metadata_manager = if meta_store_sql.is_some() {
let cluster_controller = Arc::new(
ClusterController::new(env.clone(), max_cluster_heartbeat_interval)
.await
.unwrap(),
);
let catalog_controller = Arc::new(CatalogController::new(env.clone()).unwrap());
MetadataManager::new_v2(cluster_controller, catalog_controller)
} else {
MetadataManager::new_v1(
let metadata_manager = match env.meta_store() {
MetaStoreImpl::Kv(_) => MetadataManager::new_v1(
Arc::new(
ClusterManager::new(env.clone(), max_cluster_heartbeat_interval)
.await
.unwrap(),
),
Arc::new(CatalogManager::new(env.clone()).await.unwrap()),
Arc::new(FragmentManager::new(env.clone()).await.unwrap()),
)
),
MetaStoreImpl::Sql(_) => {
let cluster_controller = Arc::new(
ClusterController::new(env.clone(), max_cluster_heartbeat_interval)
.await
.unwrap(),
);
let catalog_controller = Arc::new(CatalogController::new(env.clone()));
MetadataManager::new_v2(cluster_controller, catalog_controller)
}
};

let serving_vnode_mapping = Arc::new(ServingVnodeMapping::default());
Expand Down Expand Up @@ -633,11 +617,8 @@ pub async fn start_service_as_election_leader(
);
let health_srv = HealthServiceImpl::new();
let backup_srv = BackupServiceImpl::new(backup_manager);
let telemetry_srv = TelemetryInfoServiceImpl::new(meta_store.clone(), env.sql_meta_store());
let system_params_srv = SystemParamsServiceImpl::new(
env.system_params_manager_ref(),
env.system_params_controller_ref(),
);
let telemetry_srv = TelemetryInfoServiceImpl::new(env.meta_store_ref());
let system_params_srv = SystemParamsServiceImpl::new(env.system_params_manager_impl_ref());
let serving_srv =
ServingServiceImpl::new(serving_vnode_mapping.clone(), metadata_manager.clone());
let cloud_srv = CloudServiceImpl::new(metadata_manager.clone(), aws_cli);
Expand Down Expand Up @@ -665,14 +646,13 @@ pub async fn start_service_as_election_leader(
hummock_manager.clone(),
meta_metrics.clone(),
));
if let Some(system_params_ctl) = env.system_params_controller_ref() {
sub_tasks.push(SystemParamsController::start_params_notifier(
system_params_ctl,
));
} else {
sub_tasks.push(SystemParamsManager::start_params_notifier(
env.system_params_manager_ref().unwrap(),
));
match env.system_params_manager_impl_ref() {
SystemParamsManagerImpl::Kv(mgr) => {
sub_tasks.push(SystemParamsManager::start_params_notifier(mgr));
}
SystemParamsManagerImpl::Sql(mgr) => {
sub_tasks.push(SystemParamsController::start_params_notifier(mgr));
}
}
sub_tasks.push(HummockManager::hummock_timer_task(hummock_manager.clone()));
sub_tasks.extend(HummockManager::compaction_event_loop(
Expand Down Expand Up @@ -726,10 +706,7 @@ pub async fn start_service_as_election_leader(
Arc::new(MetaTelemetryInfoFetcher::new(env.cluster_id().clone())),
Arc::new(MetaReportCreator::new(
metadata_manager.clone(),
meta_store
.as_ref()
.map(|m| m.meta_store_type())
.unwrap_or(MetaBackend::Sql),
env.meta_store().backend(),
)),
);

Expand Down Expand Up @@ -771,15 +748,6 @@ pub async fn start_service_as_election_leader(
}
};

// Persist params before starting services so that invalid params that cause meta node
// to crash will not be persisted.
if meta_store_sql.is_none() {
env.system_params_manager().unwrap().flush_params().await?;
env.cluster_id()
.put_at_meta_store(meta_store.as_ref().unwrap())
.await?;
}

tracing::info!("Assigned cluster id {:?}", *env.cluster_id());
tracing::info!("Starting meta services");

Expand Down
35 changes: 9 additions & 26 deletions src/meta/service/src/system_params_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,21 @@
// limitations under the License.

use async_trait::async_trait;
use risingwave_meta::manager::SystemParamsManagerImpl;
use risingwave_pb::meta::system_params_service_server::SystemParamsService;
use risingwave_pb::meta::{
GetSystemParamsRequest, GetSystemParamsResponse, SetSystemParamRequest, SetSystemParamResponse,
};
use tonic::{Request, Response, Status};

use crate::controller::system_param::SystemParamsControllerRef;
use crate::manager::SystemParamsManagerRef;

pub struct SystemParamsServiceImpl {
system_params_manager: Option<SystemParamsManagerRef>,
system_params_controller: Option<SystemParamsControllerRef>,
system_params_manager: SystemParamsManagerImpl,
}

impl SystemParamsServiceImpl {
pub fn new(
system_params_manager: Option<SystemParamsManagerRef>,
system_params_controller: Option<SystemParamsControllerRef>,
) -> Self {
pub fn new(system_params_manager: SystemParamsManagerImpl) -> Self {
Self {
system_params_manager,
system_params_controller,
}
}
}
Expand All @@ -45,14 +38,9 @@ impl SystemParamsService for SystemParamsServiceImpl {
&self,
_request: Request<GetSystemParamsRequest>,
) -> Result<Response<GetSystemParamsResponse>, Status> {
let params = if let Some(ctl) = &self.system_params_controller {
ctl.get_pb_params().await
} else {
self.system_params_manager
.as_ref()
.unwrap()
.get_pb_params()
.await
let params = match &self.system_params_manager {
SystemParamsManagerImpl::Kv(mgr) => mgr.get_pb_params().await,
SystemParamsManagerImpl::Sql(mgr) => mgr.get_pb_params().await,
};

Ok(Response::new(GetSystemParamsResponse {
Expand All @@ -65,14 +53,9 @@ impl SystemParamsService for SystemParamsServiceImpl {
request: Request<SetSystemParamRequest>,
) -> Result<Response<SetSystemParamResponse>, Status> {
let req = request.into_inner();
let params = if let Some(ctl) = &self.system_params_controller {
ctl.set_param(&req.param, req.value).await?
} else {
self.system_params_manager
.as_ref()
.unwrap()
.set_param(&req.param, req.value)
.await?
let params = match &self.system_params_manager {
SystemParamsManagerImpl::Kv(mgr) => mgr.set_param(&req.param, req.value).await?,
SystemParamsManagerImpl::Sql(mgr) => mgr.set_param(&req.param, req.value).await?,
};

Ok(Response::new(SetSystemParamResponse {
Expand Down
34 changes: 14 additions & 20 deletions src/meta/service/src/telemetry_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,42 +12,36 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_meta::manager::MetaStoreImpl;
use risingwave_meta_model_v2::prelude::Cluster;
use risingwave_pb::meta::telemetry_info_service_server::TelemetryInfoService;
use risingwave_pb::meta::{GetTelemetryInfoRequest, TelemetryInfoResponse};
use sea_orm::EntityTrait;
use tonic::{Request, Response, Status};

use crate::controller::SqlMetaStore;
use crate::model::ClusterId;
use crate::storage::MetaStoreRef;
use crate::MetaResult;

pub struct TelemetryInfoServiceImpl {
meta_store: Option<MetaStoreRef>,
sql_meta_store: Option<SqlMetaStore>,
meta_store_impl: MetaStoreImpl,
}

impl TelemetryInfoServiceImpl {
pub fn new(meta_store: Option<MetaStoreRef>, sql_meta_store: Option<SqlMetaStore>) -> Self {
Self {
meta_store,
sql_meta_store,
}
pub fn new(meta_store_impl: MetaStoreImpl) -> Self {
Self { meta_store_impl }
}

async fn get_tracking_id(&self) -> MetaResult<Option<ClusterId>> {
if let Some(store) = &self.sql_meta_store {
let cluster = Cluster::find().one(&store.conn).await?;
return Ok(cluster.map(|c| c.cluster_id.to_string().into()));
}

Ok(
ClusterId::from_meta_store(self.meta_store.as_ref().unwrap())
.await
.ok()
.flatten(),
)
let cluster_id = match &self.meta_store_impl {
MetaStoreImpl::Kv(meta_store) => {
ClusterId::from_meta_store(meta_store).await.ok().flatten()
}
MetaStoreImpl::Sql(sql_meta_store) => {
let cluster = Cluster::find().one(&sql_meta_store.conn).await?;
cluster.map(|c| c.cluster_id.to_string().into())
}
};
Ok(cluster_id)
}
}

Expand Down
1 change: 1 addition & 0 deletions src/meta/service/src/user_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ impl UserService for UserServiceImpl {
let id = self
.env
.id_gen_manager()
.as_kv()
.generate::<{ IdCategory::User }>()
.await? as u32;
let mut user = req.get_user()?.clone();
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/backup_restore/backup_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ impl BackupWorker {
let backup_manager_clone = self.backup_manager.clone();
let job = async move {
let mut snapshot_builder = meta_snapshot_builder::MetaSnapshotV1Builder::new(
backup_manager_clone.env.meta_store_ref(),
backup_manager_clone.env.meta_store().as_kv().clone(),
);
// Reuse job id as snapshot id.
let hummock_manager = backup_manager_clone.hummock_manager.clone();
Expand Down
23 changes: 11 additions & 12 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ use crate::hummock::{CommitEpochInfo, HummockManagerRef, NewTableFragmentInfo};
use crate::manager::sink_coordination::SinkCoordinatorManager;
use crate::manager::{
ActiveStreamingWorkerChange, ActiveStreamingWorkerNodes, LocalNotification, MetaSrvEnv,
MetadataManager, WorkerId,
MetadataManager, SystemParamsManagerImpl, WorkerId,
};
use crate::model::{ActorId, TableFragments};
use crate::rpc::metrics::MetaMetrics;
Expand Down Expand Up @@ -508,17 +508,16 @@ impl GlobalBarrierManager {
To resume the data sources, either restart the cluster again or use `risectl meta resume`.",
PAUSE_ON_NEXT_BOOTSTRAP_KEY
);
if let Some(system_ctl) = self.env.system_params_controller() {
system_ctl
.set_param(PAUSE_ON_NEXT_BOOTSTRAP_KEY, Some("false".to_owned()))
.await?;
} else {
self.env
.system_params_manager()
.unwrap()
.set_param(PAUSE_ON_NEXT_BOOTSTRAP_KEY, Some("false".to_owned()))
.await?;
}
match self.env.system_params_manager_impl_ref() {
SystemParamsManagerImpl::Kv(mgr) => {
mgr.set_param(PAUSE_ON_NEXT_BOOTSTRAP_KEY, Some("false".to_owned()))
.await?;
}
SystemParamsManagerImpl::Sql(mgr) => {
mgr.set_param(PAUSE_ON_NEXT_BOOTSTRAP_KEY, Some("false".to_owned()))
.await?;
}
};
}
Ok(paused)
}
Expand Down
Loading

0 comments on commit 0380704

Please sign in to comment.