diff --git a/Cargo.lock b/Cargo.lock index 6cb0662b32155..1a1309ad7ecf1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -517,6 +517,15 @@ dependencies = [ "syn 2.0.33", ] +[[package]] +name = "atoi" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528" +dependencies = [ + "num-traits", +] + [[package]] name = "atomic" version = "0.5.3" @@ -1186,6 +1195,9 @@ name = "bitflags" version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4682ae6287fcf752ecaabbfcc7b6f9b72aa33933dc23a554d853aea8eea8635" +dependencies = [ + "serde", +] [[package]] name = "bitmaps" @@ -2422,6 +2434,12 @@ dependencies = [ "const-random", ] +[[package]] +name = "dotenvy" +version = "0.15.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" + [[package]] name = "downcast" version = "0.11.0" @@ -2510,6 +2528,9 @@ name = "either" version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" +dependencies = [ + "serde", +] [[package]] name = "encode_unicode" @@ -2635,6 +2656,17 @@ dependencies = [ "tower-service", ] +[[package]] +name = "etcetera" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "136d1b5283a1ab77bd9257427ffd09d8667ced0570b6f938942bc7568ed5b943" +dependencies = [ + "cfg-if", + "home", + "windows-sys 0.48.0", +] + [[package]] name = "ethnum" version = "1.4.0" @@ -2789,6 +2821,18 @@ dependencies = [ "num-traits", ] +[[package]] +name = "flume" +version = "0.10.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1657b4441c3403d9f7b3409e47575237dac27b1b5726df654a6ecbf92f0f7577" +dependencies = [ + "futures-core", + "futures-sink", + "pin-project", + "spin 0.9.8", +] + [[package]] name = "fnv" version = "1.0.7" @@ -3080,6 +3124,17 @@ dependencies = [ "futures-util", ] +[[package]] +name = "futures-intrusive" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d930c203dd0b6ff06e0201a4a2fe9149b43c684fd4420555b26d21b1a02956f" +dependencies = [ + "futures-core", + "lock_api", + "parking_lot 0.12.1", +] + [[package]] name = "futures-io" version = "0.3.28" @@ -3383,6 +3438,15 @@ dependencies = [ "allocator-api2", ] +[[package]] +name = "hashlink" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8094feaf31ff591f651a2664fb9cfd92bba7a60ce3197265e9482ebe753c8f7" +dependencies = [ + "hashbrown 0.14.0", +] + [[package]] name = "hdrhistogram" version = "7.5.2" @@ -3411,6 +3475,9 @@ name = "heck" version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" +dependencies = [ + "unicode-segmentation", +] [[package]] name = "hermit-abi" @@ -3424,6 +3491,15 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "hkdf" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "791a029f6b9fc27657f6f188ec6e5e43f6911f6f878e0dc5501396e09809d437" +dependencies = [ + "hmac", +] + [[package]] name = "hmac" version = "0.12.1" @@ -4032,6 +4108,17 @@ version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f7012b1bbb0719e1097c47611d3898568c546d597c2e74d66f6087edd5233ff4" +[[package]] +name = "libsqlite3-sys" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afc22eff61b133b115c6e8c74e818c628d6d5e7a502afea6f64dee076dd94326" +dependencies = [ + "cc", + "pkg-config", + "vcpkg", +] + [[package]] name = "libtest-mimic" version = "0.6.1" @@ -7245,6 +7332,7 @@ dependencies = [ "scopeguard", "serde", "serde_json", + "sqlx", "static_assertions", "sync-point", "tempfile", @@ -8632,6 +8720,17 @@ dependencies = [ "der 0.7.8", ] +[[package]] +name = "sqlformat" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b7b278788e7be4d0d29c0f39497a0eef3fba6bbc8e70d8bf7fde46edeaa9e85" +dependencies = [ + "itertools 0.11.0", + "nom", + "unicode_categories", +] + [[package]] name = "sqllogictest" version = "0.15.3" @@ -8655,6 +8754,205 @@ dependencies = [ "tracing", ] +[[package]] +name = "sqlx" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e58421b6bc416714d5115a2ca953718f6c621a51b68e4f4922aea5a4391a721" +dependencies = [ + "sqlx-core", + "sqlx-macros", + "sqlx-mysql", + "sqlx-postgres", + "sqlx-sqlite", +] + +[[package]] +name = "sqlx-core" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd4cef4251aabbae751a3710927945901ee1d97ee96d757f6880ebb9a79bfd53" +dependencies = [ + "ahash 0.8.3", + "atoi", + "byteorder", + "bytes", + "chrono", + "crc", + "crossbeam-queue", + "dotenvy", + "either", + "event-listener", + "futures-channel", + "futures-core", + "futures-intrusive", + "futures-io", + "futures-util", + "hashlink", + "hex", + "indexmap 2.0.0", + "log", + "memchr", + "once_cell", + "paste", + "percent-encoding", + "serde", + "serde_json", + "sha2 0.10.7", + "smallvec", + "sqlformat", + "thiserror", + "tokio", + "tokio-stream", + "tracing", + "url", +] + +[[package]] +name = "sqlx-macros" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "208e3165167afd7f3881b16c1ef3f2af69fa75980897aac8874a0696516d12c2" +dependencies = [ + "proc-macro2", + "quote", + "sqlx-core", + "sqlx-macros-core", + "syn 1.0.109", +] + +[[package]] +name = "sqlx-macros-core" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a4a8336d278c62231d87f24e8a7a74898156e34c1c18942857be2acb29c7dfc" +dependencies = [ + "dotenvy", + "either", + "heck 0.4.1", + "hex", + "once_cell", + "proc-macro2", + "quote", + "serde", + "serde_json", + "sha2 0.10.7", + "sqlx-core", + "sqlx-mysql", + "sqlx-postgres", + "sqlx-sqlite", + "syn 1.0.109", + "tempfile", + "tokio", + "url", +] + +[[package]] +name = "sqlx-mysql" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ca69bf415b93b60b80dc8fda3cb4ef52b2336614d8da2de5456cc942a110482" +dependencies = [ + "atoi", + "base64 0.21.3", + "bitflags 2.4.0", + "byteorder", + "bytes", + "chrono", + "crc", + "digest 0.10.7", + "dotenvy", + "either", + "futures-channel", + "futures-core", + "futures-io", + "futures-util", + "generic-array", + "hex", + "hkdf", + "hmac", + "itoa", + "log", + "md-5", + "memchr", + "once_cell", + "percent-encoding", + "rand", + "rsa", + "serde", + "sha1", + "sha2 0.10.7", + "smallvec", + "sqlx-core", + "stringprep", + "thiserror", + "tracing", + "whoami", +] + +[[package]] +name = "sqlx-postgres" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0db2df1b8731c3651e204629dd55e52adbae0462fa1bdcbed56a2302c18181e" +dependencies = [ + "atoi", + "base64 0.21.3", + "bitflags 2.4.0", + "byteorder", + "chrono", + "crc", + "dotenvy", + "etcetera", + "futures-channel", + "futures-core", + "futures-io", + "futures-util", + "hex", + "hkdf", + "hmac", + "home", + "itoa", + "log", + "md-5", + "memchr", + "once_cell", + "rand", + "serde", + "serde_json", + "sha1", + "sha2 0.10.7", + "smallvec", + "sqlx-core", + "stringprep", + "thiserror", + "tracing", + "whoami", +] + +[[package]] +name = "sqlx-sqlite" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4c21bf34c7cae5b283efb3ac1bcc7670df7561124dc2f8bdc0b59be40f79a2" +dependencies = [ + "atoi", + "chrono", + "flume", + "futures-channel", + "futures-core", + "futures-executor", + "futures-intrusive", + "futures-util", + "libsqlite3-sys", + "log", + "percent-encoding", + "serde", + "sqlx-core", + "tracing", + "url", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -9571,6 +9869,12 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c0edd1e5b14653f783770bce4a4dabb4a5108a5370a5f5d8cfe8710c361f6c8b" +[[package]] +name = "unicode_categories" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" + [[package]] name = "unsafe-libyaml" version = "0.2.9" @@ -9837,6 +10141,12 @@ dependencies = [ "rustix 0.38.11", ] +[[package]] +name = "whoami" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22fc3756b8a9133049b26c7f61ab35416c130e8c09b660f5b3958b446f52cc50" + [[package]] name = "winapi" version = "0.3.9" @@ -10044,12 +10354,14 @@ name = "workspace-hack" version = "1.3.0-alpha" dependencies = [ "ahash 0.8.3", + "allocator-api2", "anyhow", "auto_enums", "aws-credential-types", "aws-sdk-s3", "aws-smithy-client", "base64 0.21.3", + "base64ct", "bit-vec", "bitflags 2.4.0", "byteorder", @@ -10060,8 +10372,10 @@ dependencies = [ "clap_builder", "combine", "crossbeam-epoch", + "crossbeam-queue", "crossbeam-utils", "deranged", + "digest 0.10.7", "either", "fail", "fixedbitset", @@ -10076,10 +10390,12 @@ dependencies = [ "futures-util", "hashbrown 0.12.3", "hashbrown 0.14.0", + "heck 0.4.1", "hyper", "indexmap 1.9.3", "itertools 0.10.5", "jni", + "lazy_static", "lexical-core", "lexical-parse-float", "lexical-parse-integer", @@ -10091,11 +10407,13 @@ dependencies = [ "log", "madsim-rdkafka", "madsim-tokio", + "md-5", "mio", "multimap", "nom", "num-bigint", "num-integer", + "num-iter", "num-traits", "opentelemetry_api", "opentelemetry_sdk", @@ -10121,7 +10439,12 @@ dependencies = [ "serde", "serde_json", "serde_with 3.3.0", + "sha1", "smallvec", + "sqlx-core", + "sqlx-mysql", + "sqlx-postgres", + "sqlx-sqlite", "subtle", "syn 1.0.109", "syn 2.0.33", diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml index d8401cc5a7f71..4fd74e8efae0e 100644 --- a/src/meta/Cargo.toml +++ b/src/meta/Cargo.toml @@ -56,6 +56,7 @@ risingwave_sqlparser = { workspace = true } scopeguard = "1.2.0" serde = { version = "1", features = ["derive"] } serde_json = "1" +sqlx = { version = "0.7", features = ["runtime-tokio", "postgres", "mysql", "sqlite", "chrono"] } sync-point = { path = "../utils/sync-point" } thiserror = "1" tokio = { version = "0.2", package = "madsim-tokio", features = [ diff --git a/src/meta/src/error.rs b/src/meta/src/error.rs index b6c86b3f1eec0..b056fea0e35e7 100644 --- a/src/meta/src/error.rs +++ b/src/meta/src/error.rs @@ -20,6 +20,7 @@ use risingwave_common::error::BoxedError; use risingwave_connector::sink::SinkError; use risingwave_pb::PbFieldNotFound; use risingwave_rpc_client::error::RpcError; +use sqlx::Error; use crate::hummock::error::Error as HummockError; use crate::manager::WorkerId; @@ -62,7 +63,7 @@ enum MetaErrorInner { Unavailable(String), #[error("Election failed: {0}")] - Election(etcd_client::Error), + Election(String), #[error("Cancelled: {0}")] Cancelled(String), @@ -73,7 +74,7 @@ enum MetaErrorInner { #[error("Sink error: {0}")] Sink(SinkError), - #[error("AWS SDK error: {}", DisplayErrorContext(&**.0))] + #[error("AWS SDK error: {}", DisplayErrorContext(& * *.0))] Aws(BoxedError), #[error(transparent)] @@ -165,7 +166,13 @@ impl From for MetaError { impl From for MetaError { fn from(e: etcd_client::Error) -> Self { - MetaErrorInner::Election(e).into() + MetaErrorInner::Election(e.to_string()).into() + } +} + +impl From for MetaError { + fn from(value: Error) -> Self { + MetaErrorInner::Election(value.to_string()).into() } } diff --git a/src/meta/src/rpc/election_client.rs b/src/meta/src/rpc/election/etcd.rs similarity index 96% rename from src/meta/src/rpc/election_client.rs rename to src/meta/src/rpc/election/etcd.rs index fdc9f9b3bc2dd..78e066bc49669 100644 --- a/src/meta/src/rpc/election_client.rs +++ b/src/meta/src/rpc/election/etcd.rs @@ -18,33 +18,15 @@ use std::time::Duration; use etcd_client::{ConnectOptions, Error, GetOptions, LeaderKey, ResignOptions}; use risingwave_common::bail; -use serde::Serialize; use tokio::sync::watch::Receiver; use tokio::sync::{oneshot, watch}; use tokio::time; use tokio_stream::StreamExt; +use crate::rpc::election::{ElectionClient, ElectionMember, META_ELECTION_KEY}; use crate::storage::WrappedEtcdClient; use crate::MetaResult; -const META_ELECTION_KEY: &str = "__meta_election_"; - -#[derive(Debug, Serialize)] -pub struct ElectionMember { - pub id: String, - pub is_leader: bool, -} - -#[async_trait::async_trait] -pub trait ElectionClient: Send + Sync + 'static { - fn id(&self) -> MetaResult; - async fn run_once(&self, ttl: i64, stop: Receiver<()>) -> MetaResult<()>; - fn subscribe(&self) -> Receiver; - async fn leader(&self) -> MetaResult>; - async fn get_members(&self) -> MetaResult>; - async fn is_leader(&self) -> bool; -} - pub struct EtcdElectionClient { id: String, is_leader_sender: watch::Sender, diff --git a/src/meta/src/rpc/election/mod.rs b/src/meta/src/rpc/election/mod.rs new file mode 100644 index 0000000000000..9835c554b3fd3 --- /dev/null +++ b/src/meta/src/rpc/election/mod.rs @@ -0,0 +1,38 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +pub mod etcd; +pub mod sql; + +use serde::Serialize; +use tokio::sync::watch::Receiver; + +use crate::MetaResult; + +const META_ELECTION_KEY: &str = "__meta_election_"; + +#[derive(Debug, Serialize)] +pub struct ElectionMember { + pub id: String, + pub is_leader: bool, +} + +#[async_trait::async_trait] +pub trait ElectionClient: Send + Sync + 'static { + fn id(&self) -> MetaResult; + async fn run_once(&self, ttl: i64, stop: Receiver<()>) -> MetaResult<()>; + fn subscribe(&self) -> Receiver; + async fn leader(&self) -> MetaResult>; + async fn get_members(&self) -> MetaResult>; + async fn is_leader(&self) -> bool; +} diff --git a/src/meta/src/rpc/election/sql.rs b/src/meta/src/rpc/election/sql.rs new file mode 100644 index 0000000000000..6567857d857ec --- /dev/null +++ b/src/meta/src/rpc/election/sql.rs @@ -0,0 +1,541 @@ +// Copyright 2023 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; +use std::time::Duration; + +use sqlx::{MySql, MySqlPool, PgPool, Postgres, Sqlite, SqlitePool}; +use tokio::sync::watch; +use tokio::sync::watch::Receiver; +use tokio::time; + +use crate::rpc::election::META_ELECTION_KEY; +use crate::{ElectionClient, ElectionMember, MetaResult}; + +pub struct SqlBackendElectionClient { + id: String, + driver: Arc, + is_leader_sender: watch::Sender, +} + +#[derive(sqlx::FromRow, Debug)] +pub(crate) struct ElectionRow { + service: String, + id: String, +} + +#[async_trait::async_trait] +pub(crate) trait SqlDriver: Send + Sync + 'static { + async fn update_heartbeat(&self, service_name: &str, id: &str) -> MetaResult<()>; + + async fn try_campaign(&self, service_name: &str, id: &str, ttl: i64) + -> MetaResult; + async fn leader(&self, service_name: &str) -> MetaResult>; + + async fn candidates(&self, service_name: &str) -> MetaResult>; + + async fn resign(&self, service_name: &str, id: &str) -> MetaResult<()>; +} + +pub(crate) trait SqlDriverCommon { + const ELECTION_LEADER_TABLE_NAME: &'static str = "election_leader"; + const ELECTION_MEMBER_TABLE_NAME: &'static str = "election_members"; + + fn election_table_name(&self) -> &'static str { + Self::ELECTION_LEADER_TABLE_NAME + } + fn member_table_name(&self) -> &'static str { + Self::ELECTION_MEMBER_TABLE_NAME + } +} + +impl SqlDriverCommon for MySqlDriver {} + +impl SqlDriverCommon for PostgresDriver {} + +impl SqlDriverCommon for SqliteDriver {} + +pub struct MySqlDriver { + pool: MySqlPool, +} + +pub struct PostgresDriver { + pool: PgPool, +} + +pub struct SqliteDriver { + pool: SqlitePool, +} + +#[async_trait::async_trait] +impl SqlDriver for SqliteDriver { + async fn update_heartbeat(&self, service_name: &str, id: &str) -> MetaResult<()> { + sqlx::query(&format!( + r#"INSERT INTO {table} (id, service, last_heartbeat) +VALUES($1, $2, CURRENT_TIMESTAMP) +ON CONFLICT (id, service) +DO + UPDATE SET last_heartbeat = EXCLUDED.last_heartbeat; +"#, + table = self.member_table_name() + )) + .bind(id) + .bind(service_name) + .execute(&self.pool) + .await?; + + Ok(()) + } + + async fn try_campaign( + &self, + service_name: &str, + id: &str, + ttl: i64, + ) -> MetaResult { + let row = sqlx::query_as::(&format!( + r#"INSERT INTO {table} (service, id, last_heartbeat) +VALUES ($1, $2, CURRENT_TIMESTAMP) +ON CONFLICT (service) + DO UPDATE + SET id = CASE + WHEN DATETIME({table}.last_heartbeat, '+' || $3 || ' second') < CURRENT_TIMESTAMP THEN EXCLUDED.id + ELSE {table}.id + END, + last_heartbeat = CASE + WHEN DATETIME({table}.last_heartbeat, '+' || $3 || ' seconds') < CURRENT_TIMESTAMP THEN EXCLUDED.last_heartbeat + WHEN {table}.id = EXCLUDED.id THEN EXCLUDED.last_heartbeat + ELSE {table}.last_heartbeat + END +RETURNING service, id, last_heartbeat; +"#, + table = self.election_table_name() + )) + .bind(service_name) + .bind(id) + .bind(ttl) + .fetch_one(&self.pool) + .await?; + + Ok(row) + } + + async fn leader(&self, service_name: &str) -> MetaResult> { + let row = sqlx::query_as::<_, ElectionRow>(&format!( + r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = $1;"#, + table = self.election_table_name() + )) + .bind(service_name) + .fetch_optional(&self.pool) + .await?; + + Ok(row) + } + + async fn candidates(&self, service_name: &str) -> MetaResult> { + let row = sqlx::query_as::<_, ElectionRow>(&format!( + r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = $1;"#, + table = self.member_table_name() + )) + .bind(service_name) + .fetch_all(&self.pool) + .await?; + + Ok(row) + } + + async fn resign(&self, service_name: &str, id: &str) -> MetaResult<()> { + let mut txn = self.pool.begin().await?; + sqlx::query(&format!( + r#" + DELETE FROM {table} WHERE service = $1 AND id = $2; + "#, + table = self.election_table_name() + )) + .bind(service_name) + .bind(id) + .execute(&mut *txn) + .await?; + + sqlx::query(&format!( + r#" + DELETE FROM {table} WHERE service = $1 AND id = $2; + "#, + table = self.member_table_name() + )) + .bind(service_name) + .bind(id) + .execute(&mut *txn) + .await?; + + txn.commit().await?; + + Ok(()) + } +} + +#[async_trait::async_trait] +impl SqlDriver for MySqlDriver { + async fn update_heartbeat(&self, service_name: &str, id: &str) -> MetaResult<()> { + sqlx::query(&format!( + r#"INSERT INTO {table} (id, service, last_heartbeat) +VALUES(?, ?, NOW()) +ON duplicate KEY + UPDATE last_heartbeat = VALUES(last_heartbeat); +"#, + table = self.member_table_name() + )) + .bind(id) + .bind(service_name) + .execute(&self.pool) + .await?; + + Ok(()) + } + + async fn try_campaign( + &self, + service_name: &str, + id: &str, + ttl: i64, + ) -> MetaResult { + let row = sqlx::query::(&format!( + r#"INSERT + IGNORE +INTO {table} (service, id, last_heartbeat) +VALUES (?, ?, NOW()) +ON duplicate KEY + UPDATE id = if(last_heartbeat < NOW() - INTERVAL ? SECOND, + VALUES(id), id), + last_heartbeat = if(id = + VALUES(id), + VALUES(last_heartbeat), last_heartbeat);"#, + table = self.election_table_name() + )) + .bind(service_name) + .bind(id) + .bind(ttl) + .execute(&self.pool) + .await?; + + println!("row {:?}", row); + + let row = sqlx::query_as::(&format!( + r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = ?;"#, + table = self.election_table_name(), + )) + .bind(service_name) + .fetch_one(&self.pool) + .await?; + + Ok(row) + } + + async fn leader(&self, service_name: &str) -> MetaResult> { + let row = sqlx::query_as::(&format!( + r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = ?;"#, + table = self.election_table_name() + )) + .bind(service_name) + .fetch_optional(&self.pool) + .await?; + + Ok(row) + } + + async fn candidates(&self, service_name: &str) -> MetaResult> { + let row = sqlx::query_as::(&format!( + r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = ?;"#, + table = self.member_table_name() + )) + .bind(service_name) + .fetch_all(&self.pool) + .await?; + + Ok(row) + } + + async fn resign(&self, service_name: &str, id: &str) -> MetaResult<()> { + let mut txn = self.pool.begin().await?; + sqlx::query(&format!( + r#" + DELETE FROM {table} WHERE service = ? AND id = ?; + "#, + table = self.election_table_name() + )) + .bind(service_name) + .bind(id) + .execute(&mut *txn) + .await?; + + sqlx::query(&format!( + r#" + DELETE FROM {table} WHERE service = ? AND id = ?; + "#, + table = self.member_table_name() + )) + .bind(service_name) + .bind(id) + .execute(&mut *txn) + .await?; + + txn.commit().await?; + + Ok(()) + } +} + +#[async_trait::async_trait] +impl SqlDriver for PostgresDriver { + async fn update_heartbeat(&self, service_name: &str, id: &str) -> MetaResult<()> { + sqlx::query(&format!( + r#"INSERT INTO {table} (id, service, last_heartbeat) +VALUES($1, $2, NOW()) +ON CONFLICT (id, service) +DO + UPDATE SET last_heartbeat = EXCLUDED.last_heartbeat; +"#, + table = self.member_table_name() + )) + .bind(id) + .bind(service_name) + .execute(&self.pool) + .await?; + + Ok(()) + } + + async fn try_campaign( + &self, + service_name: &str, + id: &str, + ttl: i64, + ) -> MetaResult { + let row = sqlx::query_as::(&format!( + r#"INSERT INTO {table} (service, id, last_heartbeat) +VALUES ($1, $2, NOW()) +ON CONFLICT (service) + DO UPDATE + SET id = CASE + WHEN {table}.last_heartbeat < NOW() - $3::INTERVAL THEN EXCLUDED.id + ELSE {table}.id + END, + last_heartbeat = CASE + WHEN {table}.last_heartbeat < NOW() - $3::INTERVAL THEN EXCLUDED.last_heartbeat + WHEN {table}.id = EXCLUDED.id THEN EXCLUDED.last_heartbeat + ELSE {table}.last_heartbeat + END +RETURNING service, id, last_heartbeat; +"#, + table = self.election_table_name() + )) + .bind(service_name) + .bind(id) + .bind(Duration::from_secs(ttl as u64)) + .fetch_one(&self.pool) + .await?; + + println!("row {:?}", row); + + Ok(row) + } + + async fn leader(&self, service_name: &str) -> MetaResult> { + let row = sqlx::query_as::(&format!( + r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = $1;"#, + table = self.election_table_name() + )) + .bind(service_name) + .fetch_optional(&self.pool) + .await?; + + Ok(row) + } + + async fn candidates(&self, service_name: &str) -> MetaResult> { + let row = sqlx::query_as::(&format!( + r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = $1;"#, + table = self.member_table_name() + )) + .bind(service_name) + .fetch_all(&self.pool) + .await?; + + Ok(row) + } + + async fn resign(&self, service_name: &str, id: &str) -> MetaResult<()> { + let mut txn = self.pool.begin().await?; + sqlx::query(&format!( + r#" + DELETE FROM {table} WHERE service = $1 AND id = $2; + "#, + table = self.election_table_name() + )) + .bind(service_name) + .bind(id) + .execute(&mut *txn) + .await?; + + sqlx::query(&format!( + r#" + DELETE FROM {table} WHERE service = $1 AND id = $2; + "#, + table = self.member_table_name() + )) + .bind(service_name) + .bind(id) + .execute(&mut *txn) + .await?; + + txn.commit().await?; + + Ok(()) + } +} + +#[async_trait::async_trait] +impl ElectionClient for SqlBackendElectionClient +where + T: SqlDriver + Send + Sync + 'static, +{ + fn id(&self) -> MetaResult { + Ok(self.id.clone()) + } + + async fn run_once(&self, ttl: i64, stop: Receiver<()>) -> MetaResult<()> { + let stop = stop.clone(); + + let mut election_ticker = tokio::time::interval(Duration::from_secs(1)); + + let member_refresh_driver = self.driver.clone(); + + let id = self.id.clone(); + + let mut member_refresh_stop = stop.clone(); + + let handle = tokio::spawn(async move { + let mut ticker = tokio::time::interval(Duration::from_secs(1)); + + loop { + tokio::select! { + _ = ticker.tick() => { + if let Err(e) = member_refresh_driver + .update_heartbeat(META_ELECTION_KEY, id.as_str()) + .await { + + tracing::debug!("keep alive for member {} failed {}", id, e); + continue + } + } + _ = member_refresh_stop.changed() => { + return; + } + } + } + }); + + let _guard = scopeguard::guard(handle, |handle| handle.abort()); + + self.is_leader_sender.send_replace(false); + + let mut timeout_ticker = time::interval(Duration::from_secs_f64(ttl as f64 / 2.0)); + timeout_ticker.reset(); + let mut stop = stop.clone(); + + let mut is_leader = false; + + loop { + tokio::select! { + _ = election_ticker.tick() => { + let election_row = self + .driver + .try_campaign(META_ELECTION_KEY, self.id.as_str(), ttl) + .await?; + + assert_eq!(election_row.service, META_ELECTION_KEY); + + if election_row.id.eq_ignore_ascii_case(self.id.as_str()) { + if !is_leader{ + self.is_leader_sender.send_replace(true); + is_leader = true; + } + } else if is_leader { + tracing::warn!("leader has been changed to {}", election_row.id); + break; + + } + + timeout_ticker.reset(); + } + _ = timeout_ticker.tick() => { + tracing::error!("member {} election timeout", self.id); + break; + } + _ = stop.changed() => { + tracing::info!("stop signal received when observing"); + + if is_leader { + tracing::info!("leader {} resigning", self.id); + if let Err(e) = self.driver.resign(META_ELECTION_KEY, self.id.as_str()).await { + tracing::warn!("resign failed {}", e); + } + } + + return Ok(()); + } + } + } + + self.is_leader_sender.send_replace(false); + + return Ok(()); + } + + fn subscribe(&self) -> Receiver { + self.is_leader_sender.subscribe() + } + + async fn leader(&self) -> MetaResult> { + let row = self.driver.leader(META_ELECTION_KEY).await?; + Ok(row.map(|row| ElectionMember { + id: row.id, + is_leader: true, + })) + } + + async fn get_members(&self) -> MetaResult> { + let leader = self.leader().await?; + let members = self.driver.candidates(META_ELECTION_KEY).await?; + + Ok(members + .into_iter() + .map(|row| { + let is_leader = leader + .as_ref() + .map(|leader| leader.id.eq_ignore_ascii_case(row.id.as_str())) + .unwrap_or(false); + + ElectionMember { + id: row.id, + is_leader, + } + }) + .collect()) + } + + async fn is_leader(&self) -> bool { + *self.is_leader_sender.borrow() + } +} diff --git a/src/meta/src/rpc/mod.rs b/src/meta/src/rpc/mod.rs index 6baa080fba83b..36380c4d2dafb 100644 --- a/src/meta/src/rpc/mod.rs +++ b/src/meta/src/rpc/mod.rs @@ -14,13 +14,14 @@ mod cloud_provider; pub mod ddl_controller; -mod election_client; +pub mod election; mod intercept; pub mod metrics; pub mod server; pub mod service; -pub use election_client::{ElectionClient, ElectionMember, EtcdElectionClient}; +pub use election::etcd::EtcdElectionClient; +pub use election::{ElectionClient, ElectionMember}; pub use service::cluster_service::ClusterServiceImpl; pub use service::ddl_service::DdlServiceImpl; pub use service::heartbeat_service::HeartbeatServiceImpl; diff --git a/src/meta/src/rpc/server.rs b/src/meta/src/rpc/server.rs index cb64adc13dae9..6c541692b224f 100644 --- a/src/meta/src/rpc/server.rs +++ b/src/meta/src/rpc/server.rs @@ -64,7 +64,8 @@ use crate::manager::{ SystemParamsManager, }; use crate::rpc::cloud_provider::AwsEc2Client; -use crate::rpc::election_client::{ElectionClient, EtcdElectionClient}; +use crate::rpc::election::etcd::EtcdElectionClient; +use crate::rpc::election::ElectionClient; use crate::rpc::metrics::{ start_fragment_info_monitor, start_worker_info_monitor, GLOBAL_META_METRICS, }; diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index 603619f3a8b27..83b227688e618 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -19,13 +19,15 @@ publish = false ### BEGIN HAKARI SECTION [dependencies] ahash = { version = "0.8" } +allocator-api2 = { version = "0.2", default-features = false, features = ["alloc", "nightly"] } anyhow = { version = "1", features = ["backtrace"] } aws-credential-types = { version = "0.55", default-features = false, features = ["hardcoded-credentials"] } aws-sdk-s3 = { version = "0.28", features = ["native-tls"] } aws-smithy-client = { version = "0.55", default-features = false, features = ["native-tls", "rustls"] } base64 = { version = "0.21", features = ["alloc"] } +base64ct = { version = "1", default-features = false, features = ["alloc"] } bit-vec = { version = "0.6" } -bitflags = { version = "2", default-features = false, features = ["std"] } +bitflags = { version = "2", default-features = false, features = ["serde", "std"] } byteorder = { version = "1", features = ["i128"] } bytes = { version = "1", features = ["serde"] } chrono = { version = "0.4", features = ["alloc", "serde"] } @@ -33,9 +35,11 @@ clap = { version = "4", features = ["cargo", "derive", "env"] } clap_builder = { version = "4", default-features = false, features = ["cargo", "color", "env", "help", "std", "suggestions", "usage"] } combine = { version = "4" } crossbeam-epoch = { version = "0.9" } +crossbeam-queue = { version = "0.3" } crossbeam-utils = { version = "0.8" } deranged = { version = "0.3", default-features = false, features = ["serde", "std"] } -either = { version = "1" } +digest = { version = "0.10", features = ["mac", "oid", "std"] } +either = { version = "1", features = ["serde"] } fail = { version = "0.5", default-features = false, features = ["failpoints"] } fixedbitset = { version = "0.4" } flate2 = { version = "1", features = ["zlib"] } @@ -49,10 +53,12 @@ futures-task = { version = "0.3" } futures-util = { version = "0.3", features = ["channel", "io", "sink"] } hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", features = ["nightly", "raw"] } hashbrown-5ef9efb8ec2df382 = { package = "hashbrown", version = "0.12", features = ["nightly", "raw"] } +heck = { version = "0.4", features = ["unicode"] } hyper = { version = "0.14", features = ["full"] } indexmap = { version = "1", default-features = false, features = ["std"] } itertools = { version = "0.10" } jni = { version = "0.21", features = ["invocation"] } +lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] } lexical-core = { version = "0.8", features = ["format"] } lexical-parse-float = { version = "0.8", default-features = false, features = ["format", "std"] } lexical-parse-integer = { version = "0.8", default-features = false, features = ["format", "std"] } @@ -64,11 +70,13 @@ lock_api = { version = "0.4", features = ["arc_lock"] } log = { version = "0.4", default-features = false, features = ["std"] } madsim-rdkafka = { git = "https://github.com/madsim-rs/madsim.git", rev = "fedb1e3", features = ["cmake-build", "gssapi", "ssl-vendored", "zstd"] } madsim-tokio = { version = "0.2", default-features = false, features = ["fs", "io-util", "macros", "net", "process", "rt", "rt-multi-thread", "signal", "sync", "time", "tracing"] } +md-5 = { version = "0.10" } mio = { version = "0.8", features = ["net", "os-ext"] } multimap = { version = "0.8" } nom = { version = "7" } num-bigint = { version = "0.4" } num-integer = { version = "0.1", features = ["i128"] } +num-iter = { version = "0.1", default-features = false, features = ["i128", "std"] } num-traits = { version = "0.2", features = ["i128", "libm"] } opentelemetry_api = { version = "0.20", features = ["logs", "metrics"] } opentelemetry_sdk = { version = "0.20", features = ["logs", "metrics"] } @@ -91,9 +99,14 @@ ring = { version = "0.16", features = ["std"] } rust_decimal = { version = "1", features = ["db-postgres", "maths"] } scopeguard = { version = "1" } serde = { version = "1", features = ["alloc", "derive", "rc"] } -serde_json = { version = "1", features = ["alloc"] } +serde_json = { version = "1", features = ["alloc", "raw_value"] } serde_with = { version = "3", features = ["json"] } +sha1 = { version = "0.10" } smallvec = { version = "1", default-features = false, features = ["serde", "union", "write"] } +sqlx-core = { version = "0.7", features = ["_rt-tokio", "any", "chrono", "json", "migrate", "offline"] } +sqlx-mysql = { version = "0.7", default-features = false, features = ["any", "chrono", "json", "migrate", "offline"] } +sqlx-postgres = { version = "0.7", default-features = false, features = ["any", "chrono", "json", "migrate", "offline"] } +sqlx-sqlite = { version = "0.7", default-features = false, features = ["any", "chrono", "json", "migrate", "offline"] } subtle = { version = "2" } time = { version = "0.3", features = ["local-offset", "macros", "serde-well-known"] } tinyvec = { version = "1", features = ["alloc", "grab_spare_slice", "rustc_1_55"] } @@ -116,14 +129,16 @@ zeroize = { version = "1", features = ["zeroize_derive"] } [build-dependencies] ahash = { version = "0.8" } +allocator-api2 = { version = "0.2", default-features = false, features = ["alloc", "nightly"] } anyhow = { version = "1", features = ["backtrace"] } auto_enums = { version = "0.8", features = ["futures03"] } aws-credential-types = { version = "0.55", default-features = false, features = ["hardcoded-credentials"] } aws-sdk-s3 = { version = "0.28", features = ["native-tls"] } aws-smithy-client = { version = "0.55", default-features = false, features = ["native-tls", "rustls"] } base64 = { version = "0.21", features = ["alloc"] } +base64ct = { version = "1", default-features = false, features = ["alloc"] } bit-vec = { version = "0.6" } -bitflags = { version = "2", default-features = false, features = ["std"] } +bitflags = { version = "2", default-features = false, features = ["serde", "std"] } byteorder = { version = "1", features = ["i128"] } bytes = { version = "1", features = ["serde"] } cc = { version = "1", default-features = false, features = ["parallel"] } @@ -132,9 +147,11 @@ clap = { version = "4", features = ["cargo", "derive", "env"] } clap_builder = { version = "4", default-features = false, features = ["cargo", "color", "env", "help", "std", "suggestions", "usage"] } combine = { version = "4" } crossbeam-epoch = { version = "0.9" } +crossbeam-queue = { version = "0.3" } crossbeam-utils = { version = "0.8" } deranged = { version = "0.3", default-features = false, features = ["serde", "std"] } -either = { version = "1" } +digest = { version = "0.10", features = ["mac", "oid", "std"] } +either = { version = "1", features = ["serde"] } fail = { version = "0.5", default-features = false, features = ["failpoints"] } fixedbitset = { version = "0.4" } flate2 = { version = "1", features = ["zlib"] } @@ -148,10 +165,12 @@ futures-task = { version = "0.3" } futures-util = { version = "0.3", features = ["channel", "io", "sink"] } hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", features = ["nightly", "raw"] } hashbrown-5ef9efb8ec2df382 = { package = "hashbrown", version = "0.12", features = ["nightly", "raw"] } +heck = { version = "0.4", features = ["unicode"] } hyper = { version = "0.14", features = ["full"] } indexmap = { version = "1", default-features = false, features = ["std"] } itertools = { version = "0.10" } jni = { version = "0.21", features = ["invocation"] } +lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] } lexical-core = { version = "0.8", features = ["format"] } lexical-parse-float = { version = "0.8", default-features = false, features = ["format", "std"] } lexical-parse-integer = { version = "0.8", default-features = false, features = ["format", "std"] } @@ -163,11 +182,13 @@ lock_api = { version = "0.4", features = ["arc_lock"] } log = { version = "0.4", default-features = false, features = ["std"] } madsim-rdkafka = { git = "https://github.com/madsim-rs/madsim.git", rev = "fedb1e3", features = ["cmake-build", "gssapi", "ssl-vendored", "zstd"] } madsim-tokio = { version = "0.2", default-features = false, features = ["fs", "io-util", "macros", "net", "process", "rt", "rt-multi-thread", "signal", "sync", "time", "tracing"] } +md-5 = { version = "0.10" } mio = { version = "0.8", features = ["net", "os-ext"] } multimap = { version = "0.8" } nom = { version = "7" } num-bigint = { version = "0.4" } num-integer = { version = "0.1", features = ["i128"] } +num-iter = { version = "0.1", default-features = false, features = ["i128", "std"] } num-traits = { version = "0.2", features = ["i128", "libm"] } opentelemetry_api = { version = "0.20", features = ["logs", "metrics"] } opentelemetry_sdk = { version = "0.20", features = ["logs", "metrics"] } @@ -191,9 +212,14 @@ ring = { version = "0.16", features = ["std"] } rust_decimal = { version = "1", features = ["db-postgres", "maths"] } scopeguard = { version = "1" } serde = { version = "1", features = ["alloc", "derive", "rc"] } -serde_json = { version = "1", features = ["alloc"] } +serde_json = { version = "1", features = ["alloc", "raw_value"] } serde_with = { version = "3", features = ["json"] } +sha1 = { version = "0.10" } smallvec = { version = "1", default-features = false, features = ["serde", "union", "write"] } +sqlx-core = { version = "0.7", features = ["_rt-tokio", "any", "chrono", "json", "migrate", "offline"] } +sqlx-mysql = { version = "0.7", default-features = false, features = ["any", "chrono", "json", "migrate", "offline"] } +sqlx-postgres = { version = "0.7", default-features = false, features = ["any", "chrono", "json", "migrate", "offline"] } +sqlx-sqlite = { version = "0.7", default-features = false, features = ["any", "chrono", "json", "migrate", "offline"] } subtle = { version = "2" } syn-dff4ba8e3ae991db = { package = "syn", version = "1", features = ["extra-traits", "full", "visit", "visit-mut"] } syn-f595c2ba2a3f28df = { package = "syn", version = "2", features = ["extra-traits", "fold", "full", "visit", "visit-mut"] }