Skip to content

Commit

Permalink
feat: Integrate SQL election backend into the existing ORM. (#12976)
Browse files Browse the repository at this point in the history
Signed-off-by: Shanicky Chen <[email protected]>
  • Loading branch information
shanicky authored Oct 23, 2023
1 parent abafae0 commit 8b8c14c
Show file tree
Hide file tree
Showing 9 changed files with 448 additions and 314 deletions.
15 changes: 15 additions & 0 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,21 @@ profile:
exporter-port: 21250
- use: compactor

3meta:
steps:
- use: meta-node
port: 5690
dashboard-port: 5691
exporter-port: 1250
- use: meta-node
port: 15690
dashboard-port: 15691
exporter-port: 11250
- use: meta-node
port: 25690
dashboard-port: 25691
exporter-port: 21250

3etcd-3meta-1cn-1fe:
steps:
- use: minio
Expand Down
59 changes: 43 additions & 16 deletions src/meta/node/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ use risingwave_pb::meta::telemetry_info_service_server::TelemetryInfoServiceServ
use risingwave_pb::meta::SystemParams;
use risingwave_pb::user::user_service_server::UserServiceServer;
use risingwave_rpc_client::ComputeClientPool;
use sea_orm::{ConnectionTrait, DbBackend};
use tokio::sync::oneshot::{channel as OneChannel, Receiver as OneReceiver};
use tokio::sync::watch;
use tokio::sync::watch::{Receiver as WatchReceiver, Sender as WatchSender};
Expand All @@ -79,6 +80,9 @@ use crate::manager::{
};
use crate::rpc::cloud_provider::AwsEc2Client;
use crate::rpc::election::etcd::EtcdElectionClient;
use crate::rpc::election::sql::{
MySqlDriver, PostgresDriver, SqlBackendElectionClient, SqliteDriver,
};
use crate::rpc::metrics::{
start_fragment_info_monitor, start_worker_info_monitor, GLOBAL_META_METRICS,
};
Expand Down Expand Up @@ -119,6 +123,27 @@ pub async fn rpc_serve(
}
None => None,
};

let mut election_client = if let Some(sql_store) = &meta_store_sql {
let id = address_info.advertise_addr.clone();
let conn = sql_store.conn.clone();
let election_client: ElectionClientRef = match conn.get_database_backend() {
DbBackend::Sqlite => {
Arc::new(SqlBackendElectionClient::new(id, SqliteDriver::new(conn)))
}
DbBackend::Postgres => {
Arc::new(SqlBackendElectionClient::new(id, PostgresDriver::new(conn)))
}
DbBackend::MySql => Arc::new(SqlBackendElectionClient::new(id, MySqlDriver::new(conn))),
};

election_client.init().await?;

Some(election_client)
} else {
None
};

match meta_store_backend {
MetaStoreBackend::Etcd {
endpoints,
Expand All @@ -136,25 +161,27 @@ pub async fn rpc_serve(
.map_err(|e| anyhow::anyhow!("failed to connect etcd {}", e))?;
let meta_store = EtcdMetaStore::new(client).into_ref();

// `with_keep_alive` option will break the long connection in election client.
let mut election_options = ConnectOptions::default();
if let Some((username, password)) = &credentials {
election_options = election_options.with_user(username, password)
}
if election_client.is_none() {
// `with_keep_alive` option will break the long connection in election client.
let mut election_options = ConnectOptions::default();
if let Some((username, password)) = &credentials {
election_options = election_options.with_user(username, password)
}

let election_client = Arc::new(
EtcdElectionClient::new(
endpoints,
Some(election_options),
auth_enabled,
address_info.advertise_addr.clone(),
)
.await?,
);
election_client = Some(Arc::new(
EtcdElectionClient::new(
endpoints,
Some(election_options),
auth_enabled,
address_info.advertise_addr.clone(),
)
.await?,
));
}

rpc_serve_with_store(
meta_store,
Some(election_client),
election_client,
meta_store_sql,
address_info,
max_cluster_heartbeat_interval,
Expand All @@ -167,7 +194,7 @@ pub async fn rpc_serve(
let meta_store = MemStore::new().into_ref();
rpc_serve_with_store(
meta_store,
None,
election_client,
meta_store_sql,
address_info,
max_cluster_heartbeat_interval,
Expand Down
29 changes: 0 additions & 29 deletions src/meta/src/model_v2/election_leader.rs

This file was deleted.

29 changes: 0 additions & 29 deletions src/meta/src/model_v2/election_member.rs

This file was deleted.

60 changes: 1 addition & 59 deletions src/meta/src/model_v2/migration/src/m20230908_072257_init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ impl MigrationTrait for Migration {
.has_table(SystemParameter::Table.to_string())
.await?
);
assert!(!manager.has_table(ElectionLeader::Table.to_string()).await?);
assert!(!manager.has_table(ElectionMember::Table.to_string()).await?);

// 2. create tables.
manager
Expand Down Expand Up @@ -634,44 +632,6 @@ impl MigrationTrait for Migration {
.to_owned(),
)
.await?;
manager
.create_table(
MigrationTable::create()
.table(ElectionLeader::Table)
.col(
ColumnDef::new(ElectionLeader::Service)
.string()
.primary_key()
.not_null(),
)
.col(ColumnDef::new(ElectionLeader::Id).string().not_null())
.col(
ColumnDef::new(ElectionLeader::LastHeartbeat)
.timestamp()
.not_null(),
)
.to_owned(),
)
.await?;
manager
.create_table(
MigrationTable::create()
.table(ElectionMember::Table)
.col(ColumnDef::new(ElectionMember::Service).string().not_null())
.col(
ColumnDef::new(ElectionMember::Id)
.string()
.primary_key()
.not_null(),
)
.col(
ColumnDef::new(ElectionMember::LastHeartbeat)
.timestamp()
.not_null(),
)
.to_owned(),
)
.await?;

// 3. create indexes.
manager
Expand Down Expand Up @@ -790,9 +750,7 @@ impl MigrationTrait for Migration {
Function,
Object,
ObjectDependency,
SystemParameter,
ElectionLeader,
ElectionMember
SystemParameter
);
Ok(())
}
Expand Down Expand Up @@ -1023,19 +981,3 @@ enum SystemParameter {
IsMutable,
Description,
}

#[derive(DeriveIden)]
enum ElectionLeader {
Table,
Service,
Id,
LastHeartbeat,
}

#[derive(DeriveIden)]
enum ElectionMember {
Table,
Service,
Id,
LastHeartbeat,
}
2 changes: 0 additions & 2 deletions src/meta/src/model_v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ pub mod compaction_status;
pub mod compaction_task;
pub mod connection;
pub mod database;
pub mod election_leader;
pub mod election_member;
pub mod ext;
pub mod fragment;
pub mod function;
Expand Down
2 changes: 0 additions & 2 deletions src/meta/src/model_v2/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ pub use super::compaction_status::Entity as CompactionStatus;
pub use super::compaction_task::Entity as CompactionTask;
pub use super::connection::Entity as Connection;
pub use super::database::Entity as Database;
pub use super::election_leader::Entity as ElectionLeader;
pub use super::election_member::Entity as ElectionMember;
pub use super::fragment::Entity as Fragment;
pub use super::function::Entity as Function;
pub use super::hummock_pinned_snapshot::Entity as HummockPinnedSnapshot;
Expand Down
4 changes: 4 additions & 0 deletions src/meta/src/rpc/election/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ pub struct ElectionMember {

#[async_trait::async_trait]
pub trait ElectionClient: Send + Sync + 'static {
async fn init(&self) -> MetaResult<()> {
Ok(())
}

fn id(&self) -> MetaResult<String>;
async fn run_once(&self, ttl: i64, stop: Receiver<()>) -> MetaResult<()>;
fn subscribe(&self) -> Receiver<bool>;
Expand Down
Loading

0 comments on commit 8b8c14c

Please sign in to comment.