From 8b8c14c8b5304fbef1802fa3b57f059f5418ba96 Mon Sep 17 00:00:00 2001 From: Shanicky Chen Date: Mon, 23 Oct 2023 15:22:12 +0800 Subject: [PATCH] feat: Integrate SQL election backend into the existing ORM. (#12976) Signed-off-by: Shanicky Chen --- risedev.yml | 15 + src/meta/node/src/server.rs | 59 +- src/meta/src/model_v2/election_leader.rs | 29 - src/meta/src/model_v2/election_member.rs | 29 - .../migration/src/m20230908_072257_init.rs | 60 +- src/meta/src/model_v2/mod.rs | 2 - src/meta/src/model_v2/prelude.rs | 2 - src/meta/src/rpc/election/mod.rs | 4 + src/meta/src/rpc/election/sql.rs | 562 ++++++++++++------ 9 files changed, 448 insertions(+), 314 deletions(-) delete mode 100644 src/meta/src/model_v2/election_leader.rs delete mode 100644 src/meta/src/model_v2/election_member.rs diff --git a/risedev.yml b/risedev.yml index 8367c9fe99d9b..a5ba8a7b43f97 100644 --- a/risedev.yml +++ b/risedev.yml @@ -270,6 +270,21 @@ profile: exporter-port: 21250 - use: compactor + 3meta: + steps: + - use: meta-node + port: 5690 + dashboard-port: 5691 + exporter-port: 1250 + - use: meta-node + port: 15690 + dashboard-port: 15691 + exporter-port: 11250 + - use: meta-node + port: 25690 + dashboard-port: 25691 + exporter-port: 21250 + 3etcd-3meta-1cn-1fe: steps: - use: minio diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index d5cbfa3e3b26a..d922f1c37e033 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -62,6 +62,7 @@ use risingwave_pb::meta::telemetry_info_service_server::TelemetryInfoServiceServ use risingwave_pb::meta::SystemParams; use risingwave_pb::user::user_service_server::UserServiceServer; use risingwave_rpc_client::ComputeClientPool; +use sea_orm::{ConnectionTrait, DbBackend}; use tokio::sync::oneshot::{channel as OneChannel, Receiver as OneReceiver}; use tokio::sync::watch; use tokio::sync::watch::{Receiver as WatchReceiver, Sender as WatchSender}; @@ -79,6 +80,9 @@ use crate::manager::{ }; use crate::rpc::cloud_provider::AwsEc2Client; use crate::rpc::election::etcd::EtcdElectionClient; +use crate::rpc::election::sql::{ + MySqlDriver, PostgresDriver, SqlBackendElectionClient, SqliteDriver, +}; use crate::rpc::metrics::{ start_fragment_info_monitor, start_worker_info_monitor, GLOBAL_META_METRICS, }; @@ -119,6 +123,27 @@ pub async fn rpc_serve( } None => None, }; + + let mut election_client = if let Some(sql_store) = &meta_store_sql { + let id = address_info.advertise_addr.clone(); + let conn = sql_store.conn.clone(); + let election_client: ElectionClientRef = match conn.get_database_backend() { + DbBackend::Sqlite => { + Arc::new(SqlBackendElectionClient::new(id, SqliteDriver::new(conn))) + } + DbBackend::Postgres => { + Arc::new(SqlBackendElectionClient::new(id, PostgresDriver::new(conn))) + } + DbBackend::MySql => Arc::new(SqlBackendElectionClient::new(id, MySqlDriver::new(conn))), + }; + + election_client.init().await?; + + Some(election_client) + } else { + None + }; + match meta_store_backend { MetaStoreBackend::Etcd { endpoints, @@ -136,25 +161,27 @@ pub async fn rpc_serve( .map_err(|e| anyhow::anyhow!("failed to connect etcd {}", e))?; let meta_store = EtcdMetaStore::new(client).into_ref(); - // `with_keep_alive` option will break the long connection in election client. - let mut election_options = ConnectOptions::default(); - if let Some((username, password)) = &credentials { - election_options = election_options.with_user(username, password) - } + if election_client.is_none() { + // `with_keep_alive` option will break the long connection in election client. + let mut election_options = ConnectOptions::default(); + if let Some((username, password)) = &credentials { + election_options = election_options.with_user(username, password) + } - let election_client = Arc::new( - EtcdElectionClient::new( - endpoints, - Some(election_options), - auth_enabled, - address_info.advertise_addr.clone(), - ) - .await?, - ); + election_client = Some(Arc::new( + EtcdElectionClient::new( + endpoints, + Some(election_options), + auth_enabled, + address_info.advertise_addr.clone(), + ) + .await?, + )); + } rpc_serve_with_store( meta_store, - Some(election_client), + election_client, meta_store_sql, address_info, max_cluster_heartbeat_interval, @@ -167,7 +194,7 @@ pub async fn rpc_serve( let meta_store = MemStore::new().into_ref(); rpc_serve_with_store( meta_store, - None, + election_client, meta_store_sql, address_info, max_cluster_heartbeat_interval, diff --git a/src/meta/src/model_v2/election_leader.rs b/src/meta/src/model_v2/election_leader.rs deleted file mode 100644 index b56991ffd26f9..0000000000000 --- a/src/meta/src/model_v2/election_leader.rs +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright 2023 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use sea_orm::entity::prelude::*; - -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] -#[sea_orm(table_name = "election_leader")] -pub struct Model { - #[sea_orm(primary_key, auto_increment = false)] - pub service: String, - pub id: String, - pub last_heartbeat: DateTime, -} - -#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] -pub enum Relation {} - -impl ActiveModelBehavior for ActiveModel {} diff --git a/src/meta/src/model_v2/election_member.rs b/src/meta/src/model_v2/election_member.rs deleted file mode 100644 index 4c441d5aa0b73..0000000000000 --- a/src/meta/src/model_v2/election_member.rs +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright 2023 RisingWave Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use sea_orm::entity::prelude::*; - -#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] -#[sea_orm(table_name = "election_member")] -pub struct Model { - pub service: String, - #[sea_orm(primary_key, auto_increment = false)] - pub id: String, - pub last_heartbeat: DateTime, -} - -#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] -pub enum Relation {} - -impl ActiveModelBehavior for ActiveModel {} diff --git a/src/meta/src/model_v2/migration/src/m20230908_072257_init.rs b/src/meta/src/model_v2/migration/src/m20230908_072257_init.rs index a50e3bcba6e69..43a8e5d24d22f 100644 --- a/src/meta/src/model_v2/migration/src/m20230908_072257_init.rs +++ b/src/meta/src/model_v2/migration/src/m20230908_072257_init.rs @@ -34,8 +34,6 @@ impl MigrationTrait for Migration { .has_table(SystemParameter::Table.to_string()) .await? ); - assert!(!manager.has_table(ElectionLeader::Table.to_string()).await?); - assert!(!manager.has_table(ElectionMember::Table.to_string()).await?); // 2. create tables. manager @@ -634,44 +632,6 @@ impl MigrationTrait for Migration { .to_owned(), ) .await?; - manager - .create_table( - MigrationTable::create() - .table(ElectionLeader::Table) - .col( - ColumnDef::new(ElectionLeader::Service) - .string() - .primary_key() - .not_null(), - ) - .col(ColumnDef::new(ElectionLeader::Id).string().not_null()) - .col( - ColumnDef::new(ElectionLeader::LastHeartbeat) - .timestamp() - .not_null(), - ) - .to_owned(), - ) - .await?; - manager - .create_table( - MigrationTable::create() - .table(ElectionMember::Table) - .col(ColumnDef::new(ElectionMember::Service).string().not_null()) - .col( - ColumnDef::new(ElectionMember::Id) - .string() - .primary_key() - .not_null(), - ) - .col( - ColumnDef::new(ElectionMember::LastHeartbeat) - .timestamp() - .not_null(), - ) - .to_owned(), - ) - .await?; // 3. create indexes. manager @@ -790,9 +750,7 @@ impl MigrationTrait for Migration { Function, Object, ObjectDependency, - SystemParameter, - ElectionLeader, - ElectionMember + SystemParameter ); Ok(()) } @@ -1023,19 +981,3 @@ enum SystemParameter { IsMutable, Description, } - -#[derive(DeriveIden)] -enum ElectionLeader { - Table, - Service, - Id, - LastHeartbeat, -} - -#[derive(DeriveIden)] -enum ElectionMember { - Table, - Service, - Id, - LastHeartbeat, -} diff --git a/src/meta/src/model_v2/mod.rs b/src/meta/src/model_v2/mod.rs index d053a513a9671..d799a608933ac 100644 --- a/src/meta/src/model_v2/mod.rs +++ b/src/meta/src/model_v2/mod.rs @@ -26,8 +26,6 @@ pub mod compaction_status; pub mod compaction_task; pub mod connection; pub mod database; -pub mod election_leader; -pub mod election_member; pub mod ext; pub mod fragment; pub mod function; diff --git a/src/meta/src/model_v2/prelude.rs b/src/meta/src/model_v2/prelude.rs index 37496e486afcb..ab9670f712f04 100644 --- a/src/meta/src/model_v2/prelude.rs +++ b/src/meta/src/model_v2/prelude.rs @@ -19,8 +19,6 @@ pub use super::compaction_status::Entity as CompactionStatus; pub use super::compaction_task::Entity as CompactionTask; pub use super::connection::Entity as Connection; pub use super::database::Entity as Database; -pub use super::election_leader::Entity as ElectionLeader; -pub use super::election_member::Entity as ElectionMember; pub use super::fragment::Entity as Fragment; pub use super::function::Entity as Function; pub use super::hummock_pinned_snapshot::Entity as HummockPinnedSnapshot; diff --git a/src/meta/src/rpc/election/mod.rs b/src/meta/src/rpc/election/mod.rs index 9835c554b3fd3..7916ddba6eea4 100644 --- a/src/meta/src/rpc/election/mod.rs +++ b/src/meta/src/rpc/election/mod.rs @@ -29,6 +29,10 @@ pub struct ElectionMember { #[async_trait::async_trait] pub trait ElectionClient: Send + Sync + 'static { + async fn init(&self) -> MetaResult<()> { + Ok(()) + } + fn id(&self) -> MetaResult; async fn run_once(&self, ttl: i64, stop: Receiver<()>) -> MetaResult<()>; fn subscribe(&self) -> Receiver; diff --git a/src/meta/src/rpc/election/sql.rs b/src/meta/src/rpc/election/sql.rs index af8081f170089..fc985bd9a4521 100644 --- a/src/meta/src/rpc/election/sql.rs +++ b/src/meta/src/rpc/election/sql.rs @@ -15,7 +15,11 @@ use std::sync::Arc; use std::time::Duration; -use sqlx::{MySql, MySqlPool, PgPool, Postgres, Sqlite, SqlitePool}; +use anyhow::anyhow; +use sea_orm::{ + ConnectionTrait, DatabaseBackend, DatabaseConnection, FromQueryResult, Statement, + TransactionTrait, Value, +}; use tokio::sync::watch; use tokio::sync::watch::Receiver; use tokio::time; @@ -29,7 +33,18 @@ pub struct SqlBackendElectionClient { is_leader_sender: watch::Sender, } -#[derive(sqlx::FromRow, Debug)] +impl SqlBackendElectionClient { + pub fn new(id: String, driver: Arc) -> Self { + let (sender, _) = watch::channel(false); + Self { + id, + driver, + is_leader_sender: sender, + } + } +} + +#[derive(sqlx::FromRow, Debug, FromQueryResult)] pub struct ElectionRow { service: String, id: String, @@ -37,6 +52,8 @@ pub struct ElectionRow { #[async_trait::async_trait] pub trait SqlDriver: Send + Sync + 'static { + async fn init_database(&self) -> MetaResult<()>; + async fn update_heartbeat(&self, service_name: &str, id: &str) -> MetaResult<()>; async fn try_campaign(&self, service_name: &str, id: &str, ttl: i64) @@ -50,7 +67,7 @@ pub trait SqlDriver: Send + Sync + 'static { pub trait SqlDriverCommon { const ELECTION_LEADER_TABLE_NAME: &'static str = "election_leader"; - const ELECTION_MEMBER_TABLE_NAME: &'static str = "election_members"; + const ELECTION_MEMBER_TABLE_NAME: &'static str = "election_member"; fn election_table_name() -> &'static str { Self::ELECTION_LEADER_TABLE_NAME @@ -67,34 +84,69 @@ impl SqlDriverCommon for PostgresDriver {} impl SqlDriverCommon for SqliteDriver {} pub struct MySqlDriver { - pool: MySqlPool, + pub(crate) conn: DatabaseConnection, +} + +impl MySqlDriver { + pub fn new(conn: DatabaseConnection) -> Arc { + Arc::new(Self { conn }) + } } pub struct PostgresDriver { - pool: PgPool, + pub(crate) conn: DatabaseConnection, +} + +impl PostgresDriver { + pub fn new(conn: DatabaseConnection) -> Arc { + Arc::new(Self { conn }) + } } pub struct SqliteDriver { - pool: SqlitePool, + pub(crate) conn: DatabaseConnection, +} + +impl SqliteDriver { + pub fn new(conn: DatabaseConnection) -> Arc { + Arc::new(Self { conn }) + } } #[async_trait::async_trait] impl SqlDriver for SqliteDriver { + async fn init_database(&self) -> MetaResult<()> { + self.conn.execute( + Statement::from_string(DatabaseBackend::Sqlite, format!( + r#"CREATE TABLE IF NOT EXISTS {table} (service VARCHAR(256), id VARCHAR(256), last_heartbeat DATETIME, PRIMARY KEY (service, id));"#, + table = Self::member_table_name() + ))).await?; + + self.conn.execute( + Statement::from_string(DatabaseBackend::Sqlite, format!( + r#"CREATE TABLE IF NOT EXISTS {table} (service VARCHAR(256), id VARCHAR(256), last_heartbeat DATETIME, PRIMARY KEY (service));"#, + table = Self::election_table_name() + ))).await?; + + Ok(()) + } + async fn update_heartbeat(&self, service_name: &str, id: &str) -> MetaResult<()> { - sqlx::query(&format!( - r#"INSERT INTO {table} (id, service, last_heartbeat) + self.conn + .execute(Statement::from_sql_and_values( + DatabaseBackend::Sqlite, + format!( + r#"INSERT INTO {table} (id, service, last_heartbeat) VALUES($1, $2, CURRENT_TIMESTAMP) ON CONFLICT (id, service) DO UPDATE SET last_heartbeat = EXCLUDED.last_heartbeat; "#, - table = Self::member_table_name() - )) - .bind(id) - .bind(service_name) - .execute(&self.pool) - .await?; - + table = Self::member_table_name() + ), + vec![Value::from(id), Value::from(service_name)], + )) + .await?; Ok(()) } @@ -104,79 +156,108 @@ DO id: &str, ttl: i64, ) -> MetaResult { - let row = sqlx::query_as::(&format!( - r#"INSERT INTO {table} (service, id, last_heartbeat) -VALUES ($1, $2, CURRENT_TIMESTAMP) -ON CONFLICT (service) - DO UPDATE - SET id = CASE - WHEN DATETIME({table}.last_heartbeat, '+' || $3 || ' second') < CURRENT_TIMESTAMP THEN EXCLUDED.id - ELSE {table}.id - END, - last_heartbeat = CASE - WHEN DATETIME({table}.last_heartbeat, '+' || $3 || ' seconds') < CURRENT_TIMESTAMP THEN EXCLUDED.last_heartbeat - WHEN {table}.id = EXCLUDED.id THEN EXCLUDED.last_heartbeat - ELSE {table}.last_heartbeat - END -RETURNING service, id, last_heartbeat; -"#, - table = Self::election_table_name() - )) - .bind(service_name) - .bind(id) - .bind(ttl) - .fetch_one(&self.pool) + let query_result = self.conn + .query_one(Statement::from_sql_and_values( + DatabaseBackend::Sqlite, + format!( + r#"INSERT INTO {table} (service, id, last_heartbeat) + VALUES ($1, $2, CURRENT_TIMESTAMP) + ON CONFLICT (service) + DO UPDATE + SET id = CASE + WHEN DATETIME({table}.last_heartbeat, '+' || $3 || ' second') < CURRENT_TIMESTAMP THEN EXCLUDED.id + ELSE {table}.id + END, + last_heartbeat = CASE + WHEN DATETIME({table}.last_heartbeat, '+' || $3 || ' seconds') < CURRENT_TIMESTAMP THEN EXCLUDED.last_heartbeat + WHEN {table}.id = EXCLUDED.id THEN EXCLUDED.last_heartbeat + ELSE {table}.last_heartbeat + END + RETURNING service, id, last_heartbeat; + "#, + table = Self::election_table_name() + ), + vec![Value::from(service_name), Value::from(id), Value::from(ttl)], + )) .await?; + let row = query_result + .map(|query_result| ElectionRow::from_query_result(&query_result, "")) + .transpose()?; + + let row = row.ok_or_else(|| anyhow!("bad result from sqlite"))?; + Ok(row) } async fn leader(&self, service_name: &str) -> MetaResult> { - let row = sqlx::query_as::<_, ElectionRow>(&format!( + let string = format!( r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = $1;"#, table = Self::election_table_name() - )) - .bind(service_name) - .fetch_optional(&self.pool) - .await?; + ); + + let query_result = self + .conn + .query_one(Statement::from_sql_and_values( + DatabaseBackend::Sqlite, + string, + vec![Value::from(service_name)], + )) + .await?; + + let row = query_result + .map(|query_result| ElectionRow::from_query_result(&query_result, "")) + .transpose()?; Ok(row) } async fn candidates(&self, service_name: &str) -> MetaResult> { - let row = sqlx::query_as::<_, ElectionRow>(&format!( - r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = $1;"#, - table = Self::member_table_name() - )) - .bind(service_name) - .fetch_all(&self.pool) - .await?; + let all = self + .conn + .query_all(Statement::from_sql_and_values( + DatabaseBackend::Sqlite, + format!( + r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = $1;"#, + table = Self::member_table_name() + ), + vec![Value::from(service_name)], + )) + .await?; - Ok(row) + let rows = all + .into_iter() + .map(|query_result| ElectionRow::from_query_result(&query_result, "")) + .collect::>()?; + + Ok(rows) } async fn resign(&self, service_name: &str, id: &str) -> MetaResult<()> { - let mut txn = self.pool.begin().await?; - sqlx::query(&format!( - r#" - DELETE FROM {table} WHERE service = $1 AND id = $2; - "#, - table = Self::election_table_name() + let txn = self.conn.begin().await?; + + txn.execute(Statement::from_sql_and_values( + DatabaseBackend::Sqlite, + format!( + r#" + DELETE FROM {table} WHERE service = $1 AND id = $2; + "#, + table = Self::election_table_name() + ), + vec![Value::from(service_name), Value::from(id)], )) - .bind(service_name) - .bind(id) - .execute(&mut *txn) .await?; - sqlx::query(&format!( - r#" - DELETE FROM {table} WHERE service = $1 AND id = $2; - "#, - table = Self::member_table_name() + txn.execute(Statement::from_sql_and_values( + DatabaseBackend::Sqlite, + format!( + r#" + DELETE FROM {table} WHERE service = $1 AND id = $2; + "#, + table = Self::member_table_name() + ), + vec![Value::from(service_name), Value::from(id)], )) - .bind(service_name) - .bind(id) - .execute(&mut *txn) .await?; txn.commit().await?; @@ -187,19 +268,39 @@ RETURNING service, id, last_heartbeat; #[async_trait::async_trait] impl SqlDriver for MySqlDriver { + async fn init_database(&self) -> MetaResult<()> { + self.conn.execute( + Statement::from_string(DatabaseBackend::MySql, format!( + r#"CREATE TABLE IF NOT EXISTS {table} (service VARCHAR(256), id VARCHAR(256), last_heartbeat DATETIME, PRIMARY KEY (service, id));"#, + table = Self::member_table_name() + ))).await?; + + self.conn.execute( + Statement::from_string(DatabaseBackend::MySql, format!( + r#"CREATE TABLE IF NOT EXISTS {table} (service VARCHAR(256), id VARCHAR(256), last_heartbeat DATETIME, PRIMARY KEY (service));"#, + table = Self::election_table_name() + ))).await?; + + Ok(()) + } + async fn update_heartbeat(&self, service_name: &str, id: &str) -> MetaResult<()> { - sqlx::query(&format!( + let string = format!( r#"INSERT INTO {table} (id, service, last_heartbeat) VALUES(?, ?, NOW()) ON duplicate KEY UPDATE last_heartbeat = VALUES(last_heartbeat); "#, table = Self::member_table_name() - )) - .bind(id) - .bind(service_name) - .execute(&self.pool) - .await?; + ); + + self.conn + .execute(Statement::from_sql_and_values( + DatabaseBackend::MySql, + string, + vec![Value::from(id), Value::from(service_name)], + )) + .await?; Ok(()) } @@ -210,82 +311,119 @@ ON duplicate KEY id: &str, ttl: i64, ) -> MetaResult { - let _ = sqlx::query::(&format!( - r#"INSERT - IGNORE -INTO {table} (service, id, last_heartbeat) -VALUES (?, ?, NOW()) -ON duplicate KEY - UPDATE id = if(last_heartbeat < NOW() - INTERVAL ? SECOND, - VALUES(id), id), - last_heartbeat = if(id = - VALUES(id), - VALUES(last_heartbeat), last_heartbeat);"#, - table = Self::election_table_name() - )) - .bind(service_name) - .bind(id) - .bind(ttl) - .execute(&self.pool) - .await?; + self.conn + .execute(Statement::from_sql_and_values( + DatabaseBackend::MySql, + format!( + r#"INSERT + IGNORE + INTO {table} (service, id, last_heartbeat) + VALUES (?, ?, NOW()) + ON duplicate KEY + UPDATE id = if(last_heartbeat < NOW() - INTERVAL ? SECOND, + VALUES(id), id), + last_heartbeat = if(id = + VALUES(id), + VALUES(last_heartbeat), last_heartbeat);"#, + table = Self::election_table_name() + ), + vec![Value::from(service_name), Value::from(id), Value::from(ttl)], + )) + .await?; - let row = sqlx::query_as::(&format!( - r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = ?;"#, - table = Self::election_table_name(), - )) - .bind(service_name) - .fetch_one(&self.pool) - .await?; + let query_result = self + .conn + .query_one(Statement::from_sql_and_values( + DatabaseBackend::MySql, + format!( + r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = ?;"#, + table = Self::election_table_name(), + ), + vec![Value::from(service_name)], + )) + .await?; + + let row = query_result + .map(|query_result| ElectionRow::from_query_result(&query_result, "")) + .transpose()?; + + let row = row.ok_or_else(|| anyhow!("bad result from mysql"))?; Ok(row) } async fn leader(&self, service_name: &str) -> MetaResult> { - let row = sqlx::query_as::(&format!( + let string = format!( r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = ?;"#, table = Self::election_table_name() - )) - .bind(service_name) - .fetch_optional(&self.pool) - .await?; + ); + + let query_result = self + .conn + .query_one(Statement::from_sql_and_values( + DatabaseBackend::MySql, + string, + vec![Value::from(service_name)], + )) + .await?; + + let row = query_result + .map(|query_result| ElectionRow::from_query_result(&query_result, "")) + .transpose()?; Ok(row) } async fn candidates(&self, service_name: &str) -> MetaResult> { - let row = sqlx::query_as::(&format!( + let string = format!( r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = ?;"#, table = Self::member_table_name() - )) - .bind(service_name) - .fetch_all(&self.pool) - .await?; + ); + + let all = self + .conn + .query_all(Statement::from_sql_and_values( + DatabaseBackend::MySql, + string, + vec![Value::from(service_name)], + )) + .await?; - Ok(row) + let rows = all + .into_iter() + .map(|query_result| ElectionRow::from_query_result(&query_result, "")) + .collect::>()?; + + Ok(rows) } async fn resign(&self, service_name: &str, id: &str) -> MetaResult<()> { - let mut txn = self.pool.begin().await?; - sqlx::query(&format!( - r#" - DELETE FROM {table} WHERE service = ? AND id = ?; - "#, - table = Self::election_table_name() + let txn = self.conn.begin().await?; + + txn.execute(Statement::from_sql_and_values( + DatabaseBackend::MySql, + format!( + r#" + DELETE FROM {table} WHERE service = ? AND id = ?; + "#, + table = Self::election_table_name() + ), + vec![Value::from(service_name), Value::from(id)], )) - .bind(service_name) - .bind(id) - .execute(&mut *txn) .await?; - sqlx::query(&format!( + let string = format!( r#" DELETE FROM {table} WHERE service = ? AND id = ?; "#, table = Self::member_table_name() + ); + + txn.execute(Statement::from_sql_and_values( + DatabaseBackend::MySql, + string, + vec![Value::from(service_name), Value::from(id)], )) - .bind(service_name) - .bind(id) - .execute(&mut *txn) .await?; txn.commit().await?; @@ -296,8 +434,24 @@ ON duplicate KEY #[async_trait::async_trait] impl SqlDriver for PostgresDriver { + async fn init_database(&self) -> MetaResult<()> { + self.conn.execute( + Statement::from_string(DatabaseBackend::Postgres, format!( + r#"CREATE TABLE IF NOT EXISTS {table} (service VARCHAR, id VARCHAR, last_heartbeat TIMESTAMPTZ, PRIMARY KEY (service, id));"#, + table = Self::member_table_name() + ))).await?; + + self.conn.execute( + Statement::from_string(DatabaseBackend::Postgres, format!( + r#"CREATE TABLE IF NOT EXISTS {table} (service VARCHAR, id VARCHAR, last_heartbeat TIMESTAMPTZ, PRIMARY KEY (service));"#, + table = Self::election_table_name() + ))).await?; + + Ok(()) + } + async fn update_heartbeat(&self, service_name: &str, id: &str) -> MetaResult<()> { - sqlx::query(&format!( + let string = format!( r#"INSERT INTO {table} (id, service, last_heartbeat) VALUES($1, $2, NOW()) ON CONFLICT (id, service) @@ -305,11 +459,15 @@ DO UPDATE SET last_heartbeat = EXCLUDED.last_heartbeat; "#, table = Self::member_table_name() - )) - .bind(id) - .bind(service_name) - .execute(&self.pool) - .await?; + ); + + self.conn + .execute(Statement::from_sql_and_values( + DatabaseBackend::Postgres, + string, + vec![Value::from(id), Value::from(service_name)], + )) + .await?; Ok(()) } @@ -320,7 +478,7 @@ DO id: &str, ttl: i64, ) -> MetaResult { - let row = sqlx::query_as::(&format!( + let string = format!( r#"INSERT INTO {table} (service, id, last_heartbeat) VALUES ($1, $2, NOW()) ON CONFLICT (service) @@ -337,62 +495,101 @@ ON CONFLICT (service) RETURNING service, id, last_heartbeat; "#, table = Self::election_table_name() - )) - .bind(service_name) - .bind(id) - .bind(Duration::from_secs(ttl as u64)) - .fetch_one(&self.pool) + ); + + let query_result = self + .conn + .query_one(Statement::from_sql_and_values( + DatabaseBackend::Postgres, + string, + vec![ + Value::from(service_name), + Value::from(id), + // special handling for interval + Value::from(ttl.to_string()), + ], + )) .await?; + let row = query_result + .map(|query_result| ElectionRow::from_query_result(&query_result, "")) + .transpose()?; + + let row = row.ok_or_else(|| anyhow!("bad result from postgres"))?; + Ok(row) } async fn leader(&self, service_name: &str) -> MetaResult> { - let row = sqlx::query_as::(&format!( - r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = $1;"#, - table = Self::election_table_name() - )) - .bind(service_name) - .fetch_optional(&self.pool) - .await?; + let query_result = self + .conn + .query_one(Statement::from_sql_and_values( + DatabaseBackend::Postgres, + format!( + r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = $1;"#, + table = Self::election_table_name() + ), + vec![Value::from(service_name)], + )) + .await?; + + let row = query_result + .map(|query_result| ElectionRow::from_query_result(&query_result, "")) + .transpose()?; Ok(row) } async fn candidates(&self, service_name: &str) -> MetaResult> { - let row = sqlx::query_as::(&format!( + let string = format!( r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = $1;"#, table = Self::member_table_name() - )) - .bind(service_name) - .fetch_all(&self.pool) - .await?; + ); + + let all = self + .conn + .query_all(Statement::from_sql_and_values( + DatabaseBackend::Postgres, + string, + vec![Value::from(service_name)], + )) + .await?; - Ok(row) + let rows = all + .into_iter() + .map(|query_result| ElectionRow::from_query_result(&query_result, "")) + .collect::>()?; + + Ok(rows) } async fn resign(&self, service_name: &str, id: &str) -> MetaResult<()> { - let mut txn = self.pool.begin().await?; - sqlx::query(&format!( - r#" - DELETE FROM {table} WHERE service = $1 AND id = $2; - "#, - table = Self::election_table_name() + let txn = self.conn.begin().await?; + + txn.execute(Statement::from_sql_and_values( + DatabaseBackend::Postgres, + format!( + r#" + DELETE FROM {table} WHERE service = $1 AND id = $2; + "#, + table = Self::election_table_name() + ), + vec![Value::from(service_name), Value::from(id)], )) - .bind(service_name) - .bind(id) - .execute(&mut *txn) .await?; - sqlx::query(&format!( + let string = format!( r#" DELETE FROM {table} WHERE service = $1 AND id = $2; "#, table = Self::member_table_name() + ); + + txn.execute(Statement::from_sql_and_values( + DatabaseBackend::Postgres, + string, + vec![Value::from(service_name), Value::from(id)], )) - .bind(service_name) - .bind(id) - .execute(&mut *txn) .await?; txn.commit().await?; @@ -406,6 +603,11 @@ impl ElectionClient for SqlBackendElectionClient where T: SqlDriver + Send + Sync + 'static, { + async fn init(&self) -> MetaResult<()> { + tracing::info!("initializing database for Sql backend election client"); + self.driver.init_database().await + } + fn id(&self) -> MetaResult { Ok(self.id.clone()) } @@ -540,34 +742,40 @@ where mod tests { use std::sync::Arc; - use sqlx::sqlite::SqlitePoolOptions; - use sqlx::SqlitePool; + use sea_orm::{ConnectionTrait, Database, DatabaseConnection, DbBackend, Statement}; use tokio::sync::watch; use crate::rpc::election::sql::{SqlBackendElectionClient, SqlDriverCommon, SqliteDriver}; use crate::{ElectionClient, MetaResult}; - async fn prepare_sqlite_env() -> MetaResult { - let pool = SqlitePoolOptions::new().connect("sqlite::memory:").await?; - let _ = sqlx::query( - &format!("CREATE TABLE {table} (service VARCHAR(256) PRIMARY KEY, id VARCHAR(256), last_heartbeat DATETIME)", - table = SqliteDriver::election_table_name())) - .execute(&pool).await?; + async fn prepare_sqlite_env() -> MetaResult { + let db: DatabaseConnection = Database::connect("sqlite::memory:").await?; + + db.execute(Statement::from_sql_and_values( + DbBackend::Sqlite, + format!("CREATE TABLE {table} (service VARCHAR(256) PRIMARY KEY, id VARCHAR(256), last_heartbeat DATETIME)", + table = SqliteDriver::election_table_name()), + vec![], + )) + .await?; - let _ = sqlx::query( - &format!("CREATE TABLE {table} (service VARCHAR(256), id VARCHAR(256), last_heartbeat DATETIME, PRIMARY KEY (service, id))", - table = SqliteDriver::member_table_name())) - .execute(&pool).await?; + db.execute(Statement::from_sql_and_values( + DbBackend::Sqlite, + format!("CREATE TABLE {table} (service VARCHAR(256), id VARCHAR(256), last_heartbeat DATETIME, PRIMARY KEY (service, id))", + table = SqliteDriver::member_table_name()), + vec![], + )) + .await?; - Ok(pool) + Ok(db) } #[tokio::test] async fn test_sql_election() { let id = "test_id".to_string(); - let pool = prepare_sqlite_env().await.unwrap(); + let conn = prepare_sqlite_env().await.unwrap(); - let provider = SqliteDriver { pool }; + let provider = SqliteDriver { conn }; let (sender, _) = watch::channel(false); let sql_election_client: Arc = Arc::new(SqlBackendElectionClient { id, @@ -597,10 +805,10 @@ mod tests { let mut clients = vec![]; - let pool = prepare_sqlite_env().await.unwrap(); + let conn = prepare_sqlite_env().await.unwrap(); for i in 1..3 { let id = format!("test_id_{}", i); - let provider = SqliteDriver { pool: pool.clone() }; + let provider = SqliteDriver { conn: conn.clone() }; let (sender, _) = watch::channel(false); let sql_election_client: Arc = Arc::new(SqlBackendElectionClient { id,