Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sql-backend): disable etcd meta store initialization when sql backend is enabled #14902

Merged
merged 1 commit into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ pub enum MetaBackend {
#[default]
Mem,
Etcd,
Sql,
}

/// The section `[meta]` in `risingwave.toml`.
Expand Down
9 changes: 4 additions & 5 deletions src/meta/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use risingwave_common_heap_profiling::HeapProfiler;
use risingwave_meta::*;
use risingwave_meta_service::*;
pub use rpc::{ElectionClient, ElectionMember, EtcdElectionClient};
use server::{rpc_serve, MetaStoreSqlBackend};
use server::rpc_serve;

use crate::manager::MetaOpts;

Expand Down Expand Up @@ -213,10 +213,10 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
},
},
MetaBackend::Mem => MetaStoreBackend::Mem,
MetaBackend::Sql => MetaStoreBackend::Sql {
endpoint: opts.sql_endpoint.expect("sql endpoint is required"),
},
};
let sql_backend = opts
.sql_endpoint
.map(|endpoint| MetaStoreSqlBackend { endpoint });

validate_config(&config);

Expand Down Expand Up @@ -273,7 +273,6 @@ pub fn start(opts: MetaNodeOpts) -> Pin<Box<dyn Future<Output = ()> + Send>> {
let (mut join_handle, leader_lost_handle, shutdown_send) = rpc_serve(
add_info,
backend,
sql_backend,
max_heartbeat_interval,
config.meta.meta_leader_lease_secs,
MetaOpts {
Expand Down
157 changes: 75 additions & 82 deletions src/meta/node/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use futures::future::join_all;
use itertools::Itertools;
use otlp_embedded::TraceServiceServer;
use regex::Regex;
use risingwave_common::config::MetaBackend;
use risingwave_common::monitor::connection::{RouterExt, TcpConfig};
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::telemetry::manager::TelemetryManager;
Expand All @@ -33,11 +34,13 @@ use risingwave_meta::controller::cluster::ClusterController;
use risingwave_meta::manager::MetadataManager;
use risingwave_meta::rpc::intercept::MetricsMiddlewareLayer;
use risingwave_meta::rpc::ElectionClientRef;
use risingwave_meta::MetaStoreBackend;
use risingwave_meta_model_migration::{Migrator, MigratorTrait};
use risingwave_meta_service::backup_service::BackupServiceImpl;
use risingwave_meta_service::cloud_service::CloudServiceImpl;
use risingwave_meta_service::cluster_service::ClusterServiceImpl;
use risingwave_meta_service::ddl_service::DdlServiceImpl;
use risingwave_meta_service::event_log_service::EventLogServiceImpl;
use risingwave_meta_service::health_service::HealthServiceImpl;
use risingwave_meta_service::heartbeat_service::HeartbeatServiceImpl;
use risingwave_meta_service::hummock_service::HummockServiceImpl;
Expand All @@ -50,13 +53,15 @@ use risingwave_meta_service::stream_service::StreamServiceImpl;
use risingwave_meta_service::system_params_service::SystemParamsServiceImpl;
use risingwave_meta_service::telemetry_service::TelemetryInfoServiceImpl;
use risingwave_meta_service::user_service::UserServiceImpl;
use risingwave_meta_service::AddressInfo;
use risingwave_pb::backup_service::backup_service_server::BackupServiceServer;
use risingwave_pb::cloud_service::cloud_service_server::CloudServiceServer;
use risingwave_pb::connector_service::sink_coordination_service_server::SinkCoordinationServiceServer;
use risingwave_pb::ddl_service::ddl_service_server::DdlServiceServer;
use risingwave_pb::health::health_server::HealthServer;
use risingwave_pb::hummock::hummock_manager_service_server::HummockManagerServiceServer;
use risingwave_pb::meta::cluster_service_server::ClusterServiceServer;
use risingwave_pb::meta::event_log_service_server::EventLogServiceServer;
use risingwave_pb::meta::heartbeat_service_server::HeartbeatServiceServer;
use risingwave_pb::meta::meta_member_service_server::MetaMemberServiceServer;
use risingwave_pb::meta::notification_service_server::NotificationServiceServer;
Expand Down Expand Up @@ -101,58 +106,15 @@ use crate::storage::{
use crate::stream::{GlobalStreamManager, SourceManager};
use crate::telemetry::{MetaReportCreator, MetaTelemetryInfoFetcher};
use crate::{hummock, serving, MetaError, MetaResult};
#[derive(Debug)]
pub struct MetaStoreSqlBackend {
pub(crate) endpoint: String,
}

use risingwave_meta::MetaStoreBackend;
use risingwave_meta_service::event_log_service::EventLogServiceImpl;
use risingwave_meta_service::AddressInfo;
use risingwave_pb::meta::event_log_service_server::EventLogServiceServer;

pub async fn rpc_serve(
address_info: AddressInfo,
meta_store_backend: MetaStoreBackend,
meta_store_sql_backend: Option<MetaStoreSqlBackend>,
max_cluster_heartbeat_interval: Duration,
lease_interval_secs: u64,
opts: MetaOpts,
init_system_params: SystemParams,
) -> MetaResult<(JoinHandle<()>, Option<JoinHandle<()>>, WatchSender<()>)> {
let meta_store_sql = match meta_store_sql_backend {
Some(backend) => {
let mut options = sea_orm::ConnectOptions::new(backend.endpoint);
options
.max_connections(20)
.connect_timeout(Duration::from_secs(10))
.idle_timeout(Duration::from_secs(30));
let conn = sea_orm::Database::connect(options).await?;
Some(SqlMetaStore::new(conn))
}
None => None,
};

let mut election_client = if let Some(sql_store) = &meta_store_sql {
let id = address_info.advertise_addr.clone();
let conn = sql_store.conn.clone();
let election_client: ElectionClientRef = match conn.get_database_backend() {
DbBackend::Sqlite => {
Arc::new(SqlBackendElectionClient::new(id, SqliteDriver::new(conn)))
}
DbBackend::Postgres => {
Arc::new(SqlBackendElectionClient::new(id, PostgresDriver::new(conn)))
}
DbBackend::MySql => Arc::new(SqlBackendElectionClient::new(id, MySqlDriver::new(conn))),
};

election_client.init().await?;

Some(election_client)
} else {
None
};

match meta_store_backend {
MetaStoreBackend::Etcd {
endpoints,
Expand All @@ -170,28 +132,26 @@ pub async fn rpc_serve(
.context("failed to connect etcd")?;
let meta_store = EtcdMetaStore::new(client).into_ref();

if election_client.is_none() {
// `with_keep_alive` option will break the long connection in election client.
let mut election_options = ConnectOptions::default();
if let Some((username, password)) = &credentials {
election_options = election_options.with_user(username, password)
}

election_client = Some(Arc::new(
EtcdElectionClient::new(
endpoints,
Some(election_options),
auth_enabled,
address_info.advertise_addr.clone(),
)
.await?,
));
// `with_keep_alive` option will break the long connection in election client.
let mut election_options = ConnectOptions::default();
if let Some((username, password)) = &credentials {
election_options = election_options.with_user(username, password)
}

let election_client: ElectionClientRef = Arc::new(
EtcdElectionClient::new(
endpoints,
Some(election_options),
auth_enabled,
address_info.advertise_addr.clone(),
)
.await?,
);

rpc_serve_with_store(
meta_store,
election_client,
meta_store_sql,
Some(meta_store),
Some(election_client),
None,
address_info,
max_cluster_heartbeat_interval,
lease_interval_secs,
Expand All @@ -202,9 +162,44 @@ pub async fn rpc_serve(
MetaStoreBackend::Mem => {
let meta_store = MemStore::new().into_ref();
rpc_serve_with_store(
meta_store,
election_client,
meta_store_sql,
Some(meta_store),
None,
None,
address_info,
max_cluster_heartbeat_interval,
lease_interval_secs,
opts,
init_system_params,
)
}
MetaStoreBackend::Sql { endpoint } => {
let mut options = sea_orm::ConnectOptions::new(endpoint);
options
.max_connections(20)
.connect_timeout(Duration::from_secs(10))
.idle_timeout(Duration::from_secs(30));
let conn = sea_orm::Database::connect(options).await?;
let meta_store_sql = SqlMetaStore::new(conn);

// Init election client.
let id = address_info.advertise_addr.clone();
let conn = meta_store_sql.conn.clone();
let election_client: ElectionClientRef = match conn.get_database_backend() {
DbBackend::Sqlite => {
Arc::new(SqlBackendElectionClient::new(id, SqliteDriver::new(conn)))
}
DbBackend::Postgres => {
Arc::new(SqlBackendElectionClient::new(id, PostgresDriver::new(conn)))
}
DbBackend::MySql => {
Arc::new(SqlBackendElectionClient::new(id, MySqlDriver::new(conn)))
}
};

rpc_serve_with_store(
None,
Some(election_client),
Some(meta_store_sql),
address_info,
max_cluster_heartbeat_interval,
lease_interval_secs,
Expand All @@ -217,7 +212,7 @@ pub async fn rpc_serve(

#[expect(clippy::type_complexity)]
pub fn rpc_serve_with_store(
meta_store: MetaStoreRef,
meta_store: Option<MetaStoreRef>,
election_client: Option<ElectionClientRef>,
meta_store_sql: Option<SqlMetaStore>,
address_info: AddressInfo,
Expand Down Expand Up @@ -372,7 +367,7 @@ pub async fn start_service_as_election_follower(
/// ## Returns
/// Returns an error if the service initialization failed
pub async fn start_service_as_election_leader(
meta_store: MetaStoreRef,
meta_store: Option<MetaStoreRef>,
meta_store_sql: Option<SqlMetaStore>,
address_info: AddressInfo,
max_cluster_heartbeat_interval: Duration,
Expand All @@ -397,14 +392,7 @@ pub async fn start_service_as_election_leader(
)
.await?;

let system_params_manager = env.system_params_manager_ref();
let mut system_params_reader = system_params_manager.get_params().await;

// Using new reader instead if the controller is set.
let system_params_controller = env.system_params_controller_ref();
if let Some(ctl) = &system_params_controller {
system_params_reader = ctl.get_params().await;
}
let system_params_reader = env.system_params_reader().await;

let data_directory = system_params_reader.data_directory();
if !is_correct_data_directory(data_directory) {
Expand Down Expand Up @@ -651,8 +639,8 @@ pub async fn start_service_as_election_leader(
let backup_srv = BackupServiceImpl::new(backup_manager);
let telemetry_srv = TelemetryInfoServiceImpl::new(meta_store.clone(), env.sql_meta_store());
let system_params_srv = SystemParamsServiceImpl::new(
system_params_manager.clone(),
system_params_controller.clone(),
env.system_params_manager_ref(),
env.system_params_controller_ref(),
);
let serving_srv =
ServingServiceImpl::new(serving_vnode_mapping.clone(), metadata_manager.clone());
Expand Down Expand Up @@ -681,13 +669,13 @@ pub async fn start_service_as_election_leader(
hummock_manager.clone(),
meta_metrics.clone(),
));
if let Some(system_params_ctl) = system_params_controller {
if let Some(system_params_ctl) = env.system_params_controller_ref() {
sub_tasks.push(SystemParamsController::start_params_notifier(
system_params_ctl,
));
} else {
sub_tasks.push(SystemParamsManager::start_params_notifier(
system_params_manager.clone(),
env.system_params_manager_ref().unwrap(),
));
}
sub_tasks.push(HummockManager::hummock_timer_task(hummock_manager.clone()));
Expand Down Expand Up @@ -742,7 +730,10 @@ pub async fn start_service_as_election_leader(
Arc::new(MetaTelemetryInfoFetcher::new(env.cluster_id().clone())),
Arc::new(MetaReportCreator::new(
metadata_manager.clone(),
meta_store.meta_store_type(),
meta_store
.as_ref()
.map(|m| m.meta_store_type())
.unwrap_or(MetaBackend::Sql),
)),
);

Expand Down Expand Up @@ -787,8 +778,10 @@ pub async fn start_service_as_election_leader(
// Persist params before starting services so that invalid params that cause meta node
// to crash will not be persisted.
if meta_store_sql.is_none() {
system_params_manager.flush_params().await?;
env.cluster_id().put_at_meta_store(&meta_store).await?;
env.system_params_manager().unwrap().flush_params().await?;
env.cluster_id()
.put_at_meta_store(meta_store.as_ref().unwrap())
.await?;
}

tracing::info!("Assigned cluster id {:?}", *env.cluster_id());
Expand Down
12 changes: 9 additions & 3 deletions src/meta/service/src/system_params_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ use crate::controller::system_param::SystemParamsControllerRef;
use crate::manager::SystemParamsManagerRef;

pub struct SystemParamsServiceImpl {
system_params_manager: SystemParamsManagerRef,
system_params_manager: Option<SystemParamsManagerRef>,
system_params_controller: Option<SystemParamsControllerRef>,
}

impl SystemParamsServiceImpl {
pub fn new(
system_params_manager: SystemParamsManagerRef,
system_params_manager: Option<SystemParamsManagerRef>,
system_params_controller: Option<SystemParamsControllerRef>,
) -> Self {
Self {
Expand All @@ -48,7 +48,11 @@ impl SystemParamsService for SystemParamsServiceImpl {
let params = if let Some(ctl) = &self.system_params_controller {
ctl.get_pb_params().await
} else {
self.system_params_manager.get_pb_params().await
self.system_params_manager
.as_ref()
.unwrap()
.get_pb_params()
.await
};

Ok(Response::new(GetSystemParamsResponse {
Expand All @@ -65,6 +69,8 @@ impl SystemParamsService for SystemParamsServiceImpl {
ctl.set_param(&req.param, req.value).await?
} else {
self.system_params_manager
.as_ref()
.unwrap()
.set_param(&req.param, req.value)
.await?
};
Expand Down
14 changes: 8 additions & 6 deletions src/meta/service/src/telemetry_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ use crate::storage::MetaStoreRef;
use crate::MetaResult;

pub struct TelemetryInfoServiceImpl {
meta_store: MetaStoreRef,
meta_store: Option<MetaStoreRef>,
sql_meta_store: Option<SqlMetaStore>,
}

impl TelemetryInfoServiceImpl {
pub fn new(meta_store: MetaStoreRef, sql_meta_store: Option<SqlMetaStore>) -> Self {
pub fn new(meta_store: Option<MetaStoreRef>, sql_meta_store: Option<SqlMetaStore>) -> Self {
Self {
meta_store,
sql_meta_store,
Expand All @@ -42,10 +42,12 @@ impl TelemetryInfoServiceImpl {
return Ok(cluster.map(|c| c.cluster_id.to_string().into()));
}

Ok(ClusterId::from_meta_store(&self.meta_store)
.await
.ok()
.flatten())
Ok(
ClusterId::from_meta_store(self.meta_store.as_ref().unwrap())
.await
.ok()
.flatten(),
)
}
}

Expand Down
Loading
Loading