Skip to content

Commit

Permalink
feat(sql-backend): disable etcd meta store initialization when sql backend is enabled (#14902)
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 authored Feb 1, 2024
1 parent 7d0d43f commit 8cb9153
Show file tree
Hide file tree
Showing 23 changed files with 221 additions and 200 deletions.
1 change: 1 addition & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ pub enum MetaBackend {
#[default]
Mem,
Etcd,
Sql,
}

/// The section `[meta]` in `risingwave.toml`.
Expand Down
9 changes: 4 additions & 5 deletions src/meta/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use risingwave_common_heap_profiling::HeapProfiler;
use risingwave_meta::*;
use risingwave_meta_service::*;
pub use rpc::{ElectionClient, ElectionMember, EtcdElectionClient};
use server::{rpc_serve, MetaStoreSqlBackend};
use server::rpc_serve;

use crate::manager::MetaOpts;

Expand Down Expand Up @@ -213,10 +213,10 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
},
},
MetaBackend::Mem => MetaStoreBackend::Mem,
MetaBackend::Sql => MetaStoreBackend::Sql {
endpoint: opts.sql_endpoint.expect("sql endpoint is required"),
},
};
let sql_backend = opts
.sql_endpoint
.map(|endpoint| MetaStoreSqlBackend { endpoint });

validate_config(&config);

Expand Down Expand Up @@ -273,7 +273,6 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
let (mut join_handle, leader_lost_handle, shutdown_send) = rpc_serve(
add_info,
backend,
sql_backend,
max_heartbeat_interval,
config.meta.meta_leader_lease_secs,
MetaOpts {
Expand Down
157 changes: 75 additions & 82 deletions src/meta/node/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use futures::future::join_all;
use itertools::Itertools;
use otlp_embedded::TraceServiceServer;
use regex::Regex;
use risingwave_common::config::MetaBackend;
use risingwave_common::monitor::connection::{RouterExt, TcpConfig};
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::telemetry::manager::TelemetryManager;
Expand All @@ -33,11 +34,13 @@ use risingwave_meta::controller::cluster::ClusterController;
use risingwave_meta::manager::MetadataManager;
use risingwave_meta::rpc::intercept::MetricsMiddlewareLayer;
use risingwave_meta::rpc::ElectionClientRef;
use risingwave_meta::MetaStoreBackend;
use risingwave_meta_model_migration::{Migrator, MigratorTrait};
use risingwave_meta_service::backup_service::BackupServiceImpl;
use risingwave_meta_service::cloud_service::CloudServiceImpl;
use risingwave_meta_service::cluster_service::ClusterServiceImpl;
use risingwave_meta_service::ddl_service::DdlServiceImpl;
use risingwave_meta_service::event_log_service::EventLogServiceImpl;
use risingwave_meta_service::health_service::HealthServiceImpl;
use risingwave_meta_service::heartbeat_service::HeartbeatServiceImpl;
use risingwave_meta_service::hummock_service::HummockServiceImpl;
Expand All @@ -50,13 +53,15 @@ use risingwave_meta_service::stream_service::StreamServiceImpl;
use risingwave_meta_service::system_params_service::SystemParamsServiceImpl;
use risingwave_meta_service::telemetry_service::TelemetryInfoServiceImpl;
use risingwave_meta_service::user_service::UserServiceImpl;
use risingwave_meta_service::AddressInfo;
use risingwave_pb::backup_service::backup_service_server::BackupServiceServer;
use risingwave_pb::cloud_service::cloud_service_server::CloudServiceServer;
use risingwave_pb::connector_service::sink_coordination_service_server::SinkCoordinationServiceServer;
use risingwave_pb::ddl_service::ddl_service_server::DdlServiceServer;
use risingwave_pb::health::health_server::HealthServer;
use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerServiceServer;
use risingwave_pb::meta::cluster_service_server::ClusterServiceServer;
use risingwave_pb::meta::event_log_service_server::EventLogServiceServer;
use risingwave_pb::meta::heartbeat_service_server::HeartbeatServiceServer;
use risingwave_pb::meta::meta_member_service_server::MetaMemberServiceServer;
use risingwave_pb::meta::notification_service_server::NotificationServiceServer;
Expand Down Expand Up @@ -101,58 +106,15 @@ use crate::storage::{
use crate::stream::{GlobalStreamManager, SourceManager};
use crate::telemetry::{MetaReportCreator, MetaTelemetryInfoFetcher};
use crate::{hummock, serving, MetaError, MetaResult};
#[derive(Debug)]
pub struct MetaStoreSqlBackend {
pub(crate) endpoint: String,
}

use risingwave_meta::MetaStoreBackend;
use risingwave_meta_service::event_log_service::EventLogServiceImpl;
use risingwave_meta_service::AddressInfo;
use risingwave_pb::meta::event_log_service_server::EventLogServiceServer;

pub async fn rpc_serve(
address_info: AddressInfo,
meta_store_backend: MetaStoreBackend,
meta_store_sql_backend: Option<MetaStoreSqlBackend>,
max_cluster_heartbeat_interval: Duration,
lease_interval_secs: u64,
opts: MetaOpts,
init_system_params: SystemParams,
) -> MetaResult<(JoinHandle<()>, Option<JoinHandle<()>>, WatchSender<()>)> {
let meta_store_sql = match meta_store_sql_backend {
Some(backend) => {
let mut options = sea_orm::ConnectOptions::new(backend.endpoint);
options
.max_connections(20)
.connect_timeout(Duration::from_secs(10))
.idle_timeout(Duration::from_secs(30));
let conn = sea_orm::Database::connect(options).await?;
Some(SqlMetaStore::new(conn))
}
None => None,
};

let mut election_client = if let Some(sql_store) = &meta_store_sql {
let id = address_info.advertise_addr.clone();
let conn = sql_store.conn.clone();
let election_client: ElectionClientRef = match conn.get_database_backend() {
DbBackend::Sqlite => {
Arc::new(SqlBackendElectionClient::new(id, SqliteDriver::new(conn)))
}
DbBackend::Postgres => {
Arc::new(SqlBackendElectionClient::new(id, PostgresDriver::new(conn)))
}
DbBackend::MySql => Arc::new(SqlBackendElectionClient::new(id, MySqlDriver::new(conn))),
};

election_client.init().await?;

Some(election_client)
} else {
None
};

match meta_store_backend {
MetaStoreBackend::Etcd {
endpoints,
Expand All @@ -170,28 +132,26 @@ pub async fn rpc_serve(
.context("failed to connect etcd")?;
let meta_store = EtcdMetaStore::new(client).into_ref();

if election_client.is_none() {
// `with_keep_alive` option will break the long connection in election client.
let mut election_options = ConnectOptions::default();
if let Some((username, password)) = &credentials {
election_options = election_options.with_user(username, password)
}

election_client = Some(Arc::new(
EtcdElectionClient::new(
endpoints,
Some(election_options),
auth_enabled,
address_info.advertise_addr.clone(),
)
.await?,
));
// `with_keep_alive` option will break the long connection in election client.
let mut election_options = ConnectOptions::default();
if let Some((username, password)) = &credentials {
election_options = election_options.with_user(username, password)
}

let election_client: ElectionClientRef = Arc::new(
EtcdElectionClient::new(
endpoints,
Some(election_options),
auth_enabled,
address_info.advertise_addr.clone(),
)
.await?,
);

rpc_serve_with_store(
meta_store,
election_client,
meta_store_sql,
Some(meta_store),
Some(election_client),
None,
address_info,
max_cluster_heartbeat_interval,
lease_interval_secs,
Expand All @@ -202,9 +162,44 @@ pub async fn rpc_serve(
MetaStoreBackend::Mem => {
let meta_store = MemStore::new().into_ref();
rpc_serve_with_store(
meta_store,
election_client,
meta_store_sql,
Some(meta_store),
None,
None,
address_info,
max_cluster_heartbeat_interval,
lease_interval_secs,
opts,
init_system_params,
)
}
MetaStoreBackend::Sql { endpoint } => {
let mut options = sea_orm::ConnectOptions::new(endpoint);
options
.max_connections(20)
.connect_timeout(Duration::from_secs(10))
.idle_timeout(Duration::from_secs(30));
let conn = sea_orm::Database::connect(options).await?;
let meta_store_sql = SqlMetaStore::new(conn);

// Init election client.
let id = address_info.advertise_addr.clone();
let conn = meta_store_sql.conn.clone();
let election_client: ElectionClientRef = match conn.get_database_backend() {
DbBackend::Sqlite => {
Arc::new(SqlBackendElectionClient::new(id, SqliteDriver::new(conn)))
}
DbBackend::Postgres => {
Arc::new(SqlBackendElectionClient::new(id, PostgresDriver::new(conn)))
}
DbBackend::MySql => {
Arc::new(SqlBackendElectionClient::new(id, MySqlDriver::new(conn)))
}
};

rpc_serve_with_store(
None,
Some(election_client),
Some(meta_store_sql),
address_info,
max_cluster_heartbeat_interval,
lease_interval_secs,
Expand All @@ -217,7 +212,7 @@ pub async fn rpc_serve(

#[expect(clippy::type_complexity)]
pub fn rpc_serve_with_store(
meta_store: MetaStoreRef,
meta_store: Option<MetaStoreRef>,
election_client: Option<ElectionClientRef>,
meta_store_sql: Option<SqlMetaStore>,
address_info: AddressInfo,
Expand Down Expand Up @@ -372,7 +367,7 @@ pub async fn start_service_as_election_follower(
/// ## Returns
/// Returns an error if the service initialization failed
pub async fn start_service_as_election_leader(
meta_store: MetaStoreRef,
meta_store: Option<MetaStoreRef>,
meta_store_sql: Option<SqlMetaStore>,
address_info: AddressInfo,
max_cluster_heartbeat_interval: Duration,
Expand All @@ -397,14 +392,7 @@ pub async fn start_service_as_election_leader(
)
.await?;

let system_params_manager = env.system_params_manager_ref();
let mut system_params_reader = system_params_manager.get_params().await;

// Using new reader instead if the controller is set.
let system_params_controller = env.system_params_controller_ref();
if let Some(ctl) = &system_params_controller {
system_params_reader = ctl.get_params().await;
}
let system_params_reader = env.system_params_reader().await;

let data_directory = system_params_reader.data_directory();
if !is_correct_data_directory(data_directory) {
Expand Down Expand Up @@ -651,8 +639,8 @@ pub async fn start_service_as_election_leader(
let backup_srv = BackupServiceImpl::new(backup_manager);
let telemetry_srv = TelemetryInfoServiceImpl::new(meta_store.clone(), env.sql_meta_store());
let system_params_srv = SystemParamsServiceImpl::new(
system_params_manager.clone(),
system_params_controller.clone(),
env.system_params_manager_ref(),
env.system_params_controller_ref(),
);
let serving_srv =
ServingServiceImpl::new(serving_vnode_mapping.clone(), metadata_manager.clone());
Expand Down Expand Up @@ -681,13 +669,13 @@ pub async fn start_service_as_election_leader(
hummock_manager.clone(),
meta_metrics.clone(),
));
if let Some(system_params_ctl) = system_params_controller {
if let Some(system_params_ctl) = env.system_params_controller_ref() {
sub_tasks.push(SystemParamsController::start_params_notifier(
system_params_ctl,
));
} else {
sub_tasks.push(SystemParamsManager::start_params_notifier(
system_params_manager.clone(),
env.system_params_manager_ref().unwrap(),
));
}
sub_tasks.push(HummockManager::hummock_timer_task(hummock_manager.clone()));
Expand Down Expand Up @@ -742,7 +730,10 @@ pub async fn start_service_as_election_leader(
Arc::new(MetaTelemetryInfoFetcher::new(env.cluster_id().clone())),
Arc::new(MetaReportCreator::new(
metadata_manager.clone(),
meta_store.meta_store_type(),
meta_store
.as_ref()
.map(|m| m.meta_store_type())
.unwrap_or(MetaBackend::Sql),
)),
);

Expand Down Expand Up @@ -787,8 +778,10 @@ pub async fn start_service_as_election_leader(
// Persist params before starting services so that invalid params that cause meta node
// to crash will not be persisted.
if meta_store_sql.is_none() {
system_params_manager.flush_params().await?;
env.cluster_id().put_at_meta_store(&meta_store).await?;
env.system_params_manager().unwrap().flush_params().await?;
env.cluster_id()
.put_at_meta_store(meta_store.as_ref().unwrap())
.await?;
}

tracing::info!("Assigned cluster id {:?}", *env.cluster_id());
Expand Down
12 changes: 9 additions & 3 deletions src/meta/service/src/system_params_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ use crate::controller::system_param::SystemParamsControllerRef;
use crate::manager::SystemParamsManagerRef;

pub struct SystemParamsServiceImpl {
system_params_manager: SystemParamsManagerRef,
system_params_manager: Option<SystemParamsManagerRef>,
system_params_controller: Option<SystemParamsControllerRef>,
}

impl SystemParamsServiceImpl {
pub fn new(
system_params_manager: SystemParamsManagerRef,
system_params_manager: Option<SystemParamsManagerRef>,
system_params_controller: Option<SystemParamsControllerRef>,
) -> Self {
Self {
Expand All @@ -48,7 +48,11 @@ impl SystemParamsService for SystemParamsServiceImpl {
let params = if let Some(ctl) = &self.system_params_controller {
ctl.get_pb_params().await
} else {
self.system_params_manager.get_pb_params().await
self.system_params_manager
.as_ref()
.unwrap()
.get_pb_params()
.await
};

Ok(Response::new(GetSystemParamsResponse {
Expand All @@ -65,6 +69,8 @@ impl SystemParamsService for SystemParamsServiceImpl {
ctl.set_param(&req.param, req.value).await?
} else {
self.system_params_manager
.as_ref()
.unwrap()
.set_param(&req.param, req.value)
.await?
};
Expand Down
14 changes: 8 additions & 6 deletions src/meta/service/src/telemetry_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ use crate::storage::MetaStoreRef;
use crate::MetaResult;

pub struct TelemetryInfoServiceImpl {
meta_store: MetaStoreRef,
meta_store: Option<MetaStoreRef>,
sql_meta_store: Option<SqlMetaStore>,
}

impl TelemetryInfoServiceImpl {
pub fn new(meta_store: MetaStoreRef, sql_meta_store: Option<SqlMetaStore>) -> Self {
pub fn new(meta_store: Option<MetaStoreRef>, sql_meta_store: Option<SqlMetaStore>) -> Self {
Self {
meta_store,
sql_meta_store,
Expand All @@ -42,10 +42,12 @@ impl TelemetryInfoServiceImpl {
return Ok(cluster.map(|c| c.cluster_id.to_string().into()));
}

Ok(ClusterId::from_meta_store(&self.meta_store)
.await
.ok()
.flatten())
Ok(
ClusterId::from_meta_store(self.meta_store.as_ref().unwrap())
.await
.ok()
.flatten(),
)
}
}

Expand Down
Loading

0 comments on commit 8cb9153

Please sign in to comment.