diff --git a/Cargo.lock b/Cargo.lock index b2875296b683a..b7e9b6c45ec06 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7688,7 +7688,6 @@ dependencies = [ "sea-orm", "serde", "serde_json", - "sqlx", "sync-point", "thiserror", "tokio-retry", @@ -10978,7 +10977,6 @@ dependencies = [ "futures-util", "hashbrown 0.12.3", "hashbrown 0.14.0", - "heck 0.4.1", "hyper", "indexmap 1.9.3", "itertools 0.10.5", diff --git a/src/meta/Cargo.toml b/src/meta/Cargo.toml index 67e9a95026cc7..3e96dfcc7be2f 100644 --- a/src/meta/Cargo.toml +++ b/src/meta/Cargo.toml @@ -64,13 +64,6 @@ sea-orm = { version = "0.12.0", features = [ ] } serde = { version = "1", features = ["derive"] } serde_json = "1" -sqlx = { version = "0.7", features = [ - "runtime-tokio", - "postgres", - "mysql", - "sqlite", - "chrono", -] } sync-point = { path = "../utils/sync-point" } thiserror = "1" tokio = { version = "0.2", package = "madsim-tokio", features = [ diff --git a/src/meta/src/error.rs b/src/meta/src/error.rs index f8332819a4610..03323d53fa0af 100644 --- a/src/meta/src/error.rs +++ b/src/meta/src/error.rs @@ -20,7 +20,6 @@ use risingwave_common::error::BoxedError; use risingwave_connector::sink::SinkError; use risingwave_pb::PbFieldNotFound; use risingwave_rpc_client::error::RpcError; -use sqlx::Error; use crate::hummock::error::Error as HummockError; use crate::manager::WorkerId; @@ -181,12 +180,6 @@ impl From for MetaError { } } -impl From for MetaError { - fn from(value: Error) -> Self { - MetaErrorInner::Election(value.to_string()).into() - } -} - impl From for MetaError { fn from(e: RpcError) -> Self { MetaErrorInner::RpcError(e).into() diff --git a/src/meta/src/rpc/election/sql.rs b/src/meta/src/rpc/election/sql.rs index fc985bd9a4521..a027e8bffdfd1 100644 --- a/src/meta/src/rpc/election/sql.rs +++ b/src/meta/src/rpc/election/sql.rs @@ -44,7 +44,7 @@ impl SqlBackendElectionClient { } } -#[derive(sqlx::FromRow, Debug, FromQueryResult)] +#[derive(Debug, FromQueryResult)] pub struct ElectionRow { service: String, id: String, @@ -191,16 +191,14 @@ DO } async fn leader(&self, service_name: &str) -> MetaResult> { - let string = format!( - r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = $1;"#, - table = Self::election_table_name() - ); - let query_result = self .conn .query_one(Statement::from_sql_and_values( DatabaseBackend::Sqlite, - string, + format!( + r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = $1;"#, + table = Self::election_table_name() + ), vec![Value::from(service_name)], )) .await?; @@ -285,19 +283,17 @@ impl SqlDriver for MySqlDriver { } async fn update_heartbeat(&self, service_name: &str, id: &str) -> MetaResult<()> { - let string = format!( - r#"INSERT INTO {table} (id, service, last_heartbeat) -VALUES(?, ?, NOW()) -ON duplicate KEY - UPDATE last_heartbeat = VALUES(last_heartbeat); -"#, - table = Self::member_table_name() - ); - self.conn .execute(Statement::from_sql_and_values( DatabaseBackend::MySql, - string, + format!( + r#"INSERT INTO {table} (id, service, last_heartbeat) + VALUES(?, ?, NOW()) + ON duplicate KEY + UPDATE last_heartbeat = VALUES(last_heartbeat); + "#, + table = Self::member_table_name() + ), vec![Value::from(id), Value::from(service_name)], )) .await?; @@ -353,16 +349,14 @@ ON duplicate KEY } async fn leader(&self, service_name: &str) -> MetaResult> { - let string = format!( - r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = ?;"#, - table = Self::election_table_name() - ); - let query_result = self .conn .query_one(Statement::from_sql_and_values( DatabaseBackend::MySql, - string, + format!( + r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = ?;"#, + table = Self::election_table_name() + ), vec![Value::from(service_name)], )) .await?; @@ -375,16 +369,14 @@ ON duplicate KEY } async fn candidates(&self, service_name: &str) -> MetaResult> { - let string = format!( - r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = ?;"#, - table = Self::member_table_name() - ); - let all = self .conn .query_all(Statement::from_sql_and_values( DatabaseBackend::MySql, - string, + format!( + r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = ?;"#, + table = Self::member_table_name() + ), vec![Value::from(service_name)], )) .await?; @@ -412,16 +404,14 @@ ON duplicate KEY )) .await?; - let string = format!( - r#" - DELETE FROM {table} WHERE service = ? AND id = ?; - "#, - table = Self::member_table_name() - ); - txn.execute(Statement::from_sql_and_values( DatabaseBackend::MySql, - string, + format!( + r#" + DELETE FROM {table} WHERE service = ? AND id = ?; + "#, + table = Self::member_table_name() + ), vec![Value::from(service_name), Value::from(id)], )) .await?; @@ -451,20 +441,18 @@ impl SqlDriver for PostgresDriver { } async fn update_heartbeat(&self, service_name: &str, id: &str) -> MetaResult<()> { - let string = format!( - r#"INSERT INTO {table} (id, service, last_heartbeat) -VALUES($1, $2, NOW()) -ON CONFLICT (id, service) -DO - UPDATE SET last_heartbeat = EXCLUDED.last_heartbeat; -"#, - table = Self::member_table_name() - ); - self.conn .execute(Statement::from_sql_and_values( DatabaseBackend::Postgres, - string, + format!( + r#"INSERT INTO {table} (id, service, last_heartbeat) + VALUES($1, $2, NOW()) + ON CONFLICT (id, service) + DO + UPDATE SET last_heartbeat = EXCLUDED.last_heartbeat; + "#, + table = Self::member_table_name() + ), vec![Value::from(id), Value::from(service_name)], )) .await?; @@ -478,30 +466,28 @@ DO id: &str, ttl: i64, ) -> MetaResult { - let string = format!( - r#"INSERT INTO {table} (service, id, last_heartbeat) -VALUES ($1, $2, NOW()) -ON CONFLICT (service) - DO UPDATE - SET id = CASE - WHEN {table}.last_heartbeat < NOW() - $3::INTERVAL THEN EXCLUDED.id - ELSE {table}.id - END, - last_heartbeat = CASE - WHEN {table}.last_heartbeat < NOW() - $3::INTERVAL THEN EXCLUDED.last_heartbeat - WHEN {table}.id = EXCLUDED.id THEN EXCLUDED.last_heartbeat - ELSE {table}.last_heartbeat - END -RETURNING service, id, last_heartbeat; -"#, - table = Self::election_table_name() - ); - let query_result = self .conn .query_one(Statement::from_sql_and_values( DatabaseBackend::Postgres, - string, + format!( + r#"INSERT INTO {table} (service, id, last_heartbeat) + VALUES ($1, $2, NOW()) + ON CONFLICT (service) + DO UPDATE + SET id = CASE + WHEN {table}.last_heartbeat < NOW() - $3::INTERVAL THEN EXCLUDED.id + ELSE {table}.id + END, + last_heartbeat = CASE + WHEN {table}.last_heartbeat < NOW() - $3::INTERVAL THEN EXCLUDED.last_heartbeat + WHEN {table}.id = EXCLUDED.id THEN EXCLUDED.last_heartbeat + ELSE {table}.last_heartbeat + END + RETURNING service, id, last_heartbeat; + "#, + table = Self::election_table_name() + ), vec![ Value::from(service_name), Value::from(id), @@ -541,16 +527,14 @@ RETURNING service, id, last_heartbeat; } async fn candidates(&self, service_name: &str) -> MetaResult> { - let string = format!( - r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = $1;"#, - table = Self::member_table_name() - ); - let all = self .conn .query_all(Statement::from_sql_and_values( DatabaseBackend::Postgres, - string, + format!( + r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = $1;"#, + table = Self::member_table_name() + ), vec![Value::from(service_name)], )) .await?; @@ -578,16 +562,14 @@ RETURNING service, id, last_heartbeat; )) .await?; - let string = format!( - r#" - DELETE FROM {table} WHERE service = $1 AND id = $2; - "#, - table = Self::member_table_name() - ); - txn.execute(Statement::from_sql_and_values( DatabaseBackend::Postgres, - string, + format!( + r#" + DELETE FROM {table} WHERE service = $1 AND id = $2; + "#, + table = Self::member_table_name() + ), vec![Value::from(service_name), Value::from(id)], )) .await?; diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index 2ae671ca2de93..6c08e08490f7d 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -53,7 +53,6 @@ futures-task = { version = "0.3" } futures-util = { version = "0.3", features = ["channel", "io", "sink"] } hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", features = ["nightly", "raw"] } hashbrown-5ef9efb8ec2df382 = { package = "hashbrown", version = "0.12", features = ["nightly", "raw"] } -heck = { version = "0.4", features = ["unicode"] } hyper = { version = "0.14", features = ["full"] } indexmap = { version = "1", default-features = false, features = ["serde", "std"] } itertools-93f6ce9d446188ac = { package = "itertools", version = "0.10" } @@ -113,11 +112,11 @@ sha1 = { version = "0.10" } sha2 = { version = "0.10" } signature = { version = "2", default-features = false, features = ["digest", "rand_core", "std"] } smallvec = { version = "1", default-features = false, features = ["serde", "union", "write"] } -sqlx = { version = "0.7", features = ["bigdecimal", "chrono", "mysql", "postgres", "runtime-tokio-native-tls", "rust_decimal", "sqlite", "time", "uuid"] } -sqlx-core = { version = "0.7", features = ["_rt-tokio", "_tls-native-tls", "any", "bigdecimal", "chrono", "json", "migrate", "offline", "rust_decimal", "time", "uuid"] } -sqlx-mysql = { version = "0.7", default-features = false, features = ["any", "bigdecimal", "chrono", "json", "migrate", "offline", "rust_decimal", "time", "uuid"] } -sqlx-postgres = { version = "0.7", default-features = false, features = ["any", "bigdecimal", "chrono", "json", "migrate", "offline", "rust_decimal", "time", "uuid"] } -sqlx-sqlite = { version = "0.7", default-features = false, features = ["any", "chrono", "json", "migrate", "offline", "time", "uuid"] } +sqlx = { version = "0.7", default-features = false, features = ["bigdecimal", "chrono", "json", "mysql", "postgres", "runtime-tokio-native-tls", "rust_decimal", "sqlite", "time", "uuid"] } +sqlx-core = { version = "0.7", features = ["_rt-tokio", "_tls-native-tls", "bigdecimal", "chrono", "json", "migrate", "offline", "rust_decimal", "time", "uuid"] } +sqlx-mysql = { version = "0.7", default-features = false, features = ["bigdecimal", "chrono", "json", "rust_decimal", "time", "uuid"] } +sqlx-postgres = { version = "0.7", default-features = false, features = ["bigdecimal", "chrono", "json", "rust_decimal", "time", "uuid"] } +sqlx-sqlite = { version = "0.7", default-features = false, features = ["chrono", "json", "time", "uuid"] } strum = { version = "0.25", features = ["derive"] } subtle = { version = "2" } time = { version = "0.3", features = ["local-offset", "macros", "serde-well-known"] } @@ -145,47 +144,26 @@ ahash = { version = "0.8" } allocator-api2 = { version = "0.2", default-features = false, features = ["alloc", "nightly"] } anyhow = { version = "1", features = ["backtrace"] } auto_enums = { version = "0.8", features = ["futures03"] } -base64 = { version = "0.21", features = ["alloc"] } bitflags = { version = "2", default-features = false, features = ["serde", "std"] } -byteorder = { version = "1" } bytes = { version = "1", features = ["serde"] } cc = { version = "1", default-features = false, features = ["parallel"] } -chrono = { version = "0.4", features = ["serde"] } -crossbeam-queue = { version = "0.3" } -crossbeam-utils = { version = "0.8" } deranged = { version = "0.3", default-features = false, features = ["serde", "std"] } -digest = { version = "0.10", features = ["mac", "oid", "std"] } either = { version = "1", features = ["serde"] } fixedbitset = { version = "0.4" } frunk_core = { version = "0.4", default-features = false, features = ["std"] } -futures-channel = { version = "0.3", features = ["sink"] } -futures-core = { version = "0.3" } -futures-io = { version = "0.3" } -futures-sink = { version = "0.3" } -futures-task = { version = "0.3" } -futures-util = { version = "0.3", features = ["channel", "io", "sink"] } hashbrown-582f2526e08bb6a0 = { package = "hashbrown", version = "0.14", features = ["nightly", "raw"] } -heck = { version = "0.4", features = ["unicode"] } itertools-93f6ce9d446188ac = { package = "itertools", version = "0.10" } itertools-a6292c17cd707f01 = { package = "itertools", version = "0.11" } lazy_static = { version = "1", default-features = false, features = ["spin_no_std"] } libc = { version = "0.2", features = ["extra_traits"] } -lock_api = { version = "0.4", features = ["arc_lock"] } log = { version = "0.4", default-features = false, features = ["kv_unstable", "std"] } -madsim-tokio = { version = "0.2", default-features = false, features = ["fs", "io-util", "macros", "net", "process", "rt", "rt-multi-thread", "signal", "sync", "time", "tracing"] } -md-5 = { version = "0.10" } -mio = { version = "0.8", features = ["net", "os-ext"] } nom = { version = "7" } num-bigint = { version = "0.4" } num-integer = { version = "0.1", features = ["i128"] } -num-iter = { version = "0.1", default-features = false, features = ["i128", "std"] } num-traits = { version = "0.2", features = ["i128", "libm"] } -parking_lot = { version = "0.12", features = ["arc_lock", "deadlock_detection"] } -parking_lot_core = { version = "0.9", default-features = false, features = ["deadlock_detection"] } petgraph = { version = "0.6" } phf = { version = "0.11", features = ["uncased"] } phf_shared = { version = "0.11", features = ["uncased"] } -postgres-types = { version = "0.2", default-features = false, features = ["derive", "with-chrono-0_4", "with-serde_json-1"] } proc-macro2 = { version = "1", features = ["span-locations"] } prost-5ef9efb8ec2df382 = { package = "prost", version = "0.12", features = ["no-recursion-limit"] } prost-a6292c17cd707f01 = { package = "prost", version = "0.11" } @@ -196,36 +174,13 @@ rand_core = { version = "0.6", default-features = false, features = ["std"] } regex = { version = "1" } regex-automata = { version = "0.4", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa-backtrack", "perf-inline", "perf-literal", "unicode"] } regex-syntax = { version = "0.8" } -rust_decimal = { version = "1", features = ["db-postgres", "maths"] } -scopeguard = { version = "1" } serde = { version = "1", features = ["alloc", "derive", "rc"] } serde_json = { version = "1", features = ["alloc", "raw_value"] } -sha1 = { version = "0.10" } -sha2 = { version = "0.10" } -signature = { version = "2", default-features = false, features = ["digest", "rand_core", "std"] } -smallvec = { version = "1", default-features = false, features = ["serde", "union", "write"] } -sqlx-core = { version = "0.7", features = ["_rt-tokio", "_tls-native-tls", "any", "bigdecimal", "chrono", "json", "migrate", "offline", "rust_decimal", "time", "uuid"] } -sqlx-mysql = { version = "0.7", default-features = false, features = ["any", "bigdecimal", "chrono", "json", "migrate", "offline", "rust_decimal", "time", "uuid"] } -sqlx-postgres = { version = "0.7", default-features = false, features = ["any", "bigdecimal", "chrono", "json", "migrate", "offline", "rust_decimal", "time", "uuid"] } -sqlx-sqlite = { version = "0.7", default-features = false, features = ["any", "chrono", "json", "migrate", "offline", "time", "uuid"] } -subtle = { version = "2" } syn-dff4ba8e3ae991db = { package = "syn", version = "1", features = ["extra-traits", "full", "visit", "visit-mut"] } syn-f595c2ba2a3f28df = { package = "syn", version = "2", features = ["extra-traits", "fold", "full", "visit", "visit-mut"] } time = { version = "0.3", features = ["local-offset", "macros", "serde-well-known"] } time-macros = { version = "0.2", default-features = false, features = ["formatting", "parsing", "serde"] } -tinyvec = { version = "1", features = ["alloc", "grab_spare_slice", "rustc_1_55"] } -tokio = { version = "1", features = ["full", "stats", "tracing"] } -tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "ac00d88", features = ["with-chrono-0_4"] } -tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "fe39bb8e", features = ["fs", "net"] } -tokio-util = { version = "0.7", features = ["codec", "io"] } toml_datetime = { version = "0.6", default-features = false, features = ["serde"] } toml_edit = { version = "0.19", features = ["serde"] } -tracing = { version = "0.1", features = ["log"] } -tracing-core = { version = "0.1" } -unicode-bidi = { version = "0.3" } -unicode-normalization = { version = "0.1" } -url = { version = "2", features = ["serde"] } -uuid = { version = "1", features = ["fast-rng", "serde", "v4"] } -whoami = { version = "1" } ### END HAKARI SECTION