Skip to content

Commit

Permalink
refine code
Browse files Browse the repository at this point in the history
Signed-off-by: Shanicky Chen <[email protected]>
  • Loading branch information
shanicky committed Oct 23, 2023
1 parent 8b8c14c commit 67d676a
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 148 deletions.
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 0 additions & 7 deletions src/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,6 @@ sea-orm = { version = "0.12.0", features = [
] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
sqlx = { version = "0.7", features = [
"runtime-tokio",
"postgres",
"mysql",
"sqlite",
"chrono",
] }
sync-point = { path = "../utils/sync-point" }
thiserror = "1"
tokio = { version = "0.2", package = "madsim-tokio", features = [
Expand Down
7 changes: 0 additions & 7 deletions src/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use risingwave_common::error::BoxedError;
use risingwave_connector::sink::SinkError;
use risingwave_pb::PbFieldNotFound;
use risingwave_rpc_client::error::RpcError;
use sqlx::Error;

use crate::hummock::error::Error as HummockError;
use crate::manager::WorkerId;
Expand Down Expand Up @@ -181,12 +180,6 @@ impl From<etcd_client::Error> for MetaError {
}
}

impl From<sqlx::Error> for MetaError {
fn from(value: Error) -> Self {
MetaErrorInner::Election(value.to_string()).into()
}
}

impl From<RpcError> for MetaError {
fn from(e: RpcError) -> Self {
MetaErrorInner::RpcError(e).into()
Expand Down
146 changes: 64 additions & 82 deletions src/meta/src/rpc/election/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl<T: SqlDriver> SqlBackendElectionClient<T> {
}
}

#[derive(sqlx::FromRow, Debug, FromQueryResult)]
#[derive(Debug, FromQueryResult)]
pub struct ElectionRow {
service: String,
id: String,
Expand Down Expand Up @@ -191,16 +191,14 @@ DO
}

async fn leader(&self, service_name: &str) -> MetaResult<Option<ElectionRow>> {
let string = format!(
r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = $1;"#,
table = Self::election_table_name()
);

let query_result = self
.conn
.query_one(Statement::from_sql_and_values(
DatabaseBackend::Sqlite,
string,
format!(
r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = $1;"#,
table = Self::election_table_name()
),
vec![Value::from(service_name)],
))
.await?;
Expand Down Expand Up @@ -285,19 +283,17 @@ impl SqlDriver for MySqlDriver {
}

async fn update_heartbeat(&self, service_name: &str, id: &str) -> MetaResult<()> {
let string = format!(
r#"INSERT INTO {table} (id, service, last_heartbeat)
VALUES(?, ?, NOW())
ON duplicate KEY
UPDATE last_heartbeat = VALUES(last_heartbeat);
"#,
table = Self::member_table_name()
);

self.conn
.execute(Statement::from_sql_and_values(
DatabaseBackend::MySql,
string,
format!(
r#"INSERT INTO {table} (id, service, last_heartbeat)
VALUES(?, ?, NOW())
ON duplicate KEY
UPDATE last_heartbeat = VALUES(last_heartbeat);
"#,
table = Self::member_table_name()
),
vec![Value::from(id), Value::from(service_name)],
))
.await?;
Expand Down Expand Up @@ -353,16 +349,14 @@ ON duplicate KEY
}

async fn leader(&self, service_name: &str) -> MetaResult<Option<ElectionRow>> {
let string = format!(
r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = ?;"#,
table = Self::election_table_name()
);

let query_result = self
.conn
.query_one(Statement::from_sql_and_values(
DatabaseBackend::MySql,
string,
format!(
r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = ?;"#,
table = Self::election_table_name()
),
vec![Value::from(service_name)],
))
.await?;
Expand All @@ -375,16 +369,14 @@ ON duplicate KEY
}

async fn candidates(&self, service_name: &str) -> MetaResult<Vec<ElectionRow>> {
let string = format!(
r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = ?;"#,
table = Self::member_table_name()
);

let all = self
.conn
.query_all(Statement::from_sql_and_values(
DatabaseBackend::MySql,
string,
format!(
r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = ?;"#,
table = Self::member_table_name()
),
vec![Value::from(service_name)],
))
.await?;
Expand Down Expand Up @@ -412,16 +404,14 @@ ON duplicate KEY
))
.await?;

let string = format!(
r#"
DELETE FROM {table} WHERE service = ? AND id = ?;
"#,
table = Self::member_table_name()
);

txn.execute(Statement::from_sql_and_values(
DatabaseBackend::MySql,
string,
format!(
r#"
DELETE FROM {table} WHERE service = ? AND id = ?;
"#,
table = Self::member_table_name()
),
vec![Value::from(service_name), Value::from(id)],
))
.await?;
Expand Down Expand Up @@ -451,20 +441,18 @@ impl SqlDriver for PostgresDriver {
}

async fn update_heartbeat(&self, service_name: &str, id: &str) -> MetaResult<()> {
let string = format!(
r#"INSERT INTO {table} (id, service, last_heartbeat)
VALUES($1, $2, NOW())
ON CONFLICT (id, service)
DO
UPDATE SET last_heartbeat = EXCLUDED.last_heartbeat;
"#,
table = Self::member_table_name()
);

self.conn
.execute(Statement::from_sql_and_values(
DatabaseBackend::Postgres,
string,
format!(
r#"INSERT INTO {table} (id, service, last_heartbeat)
VALUES($1, $2, NOW())
ON CONFLICT (id, service)
DO
UPDATE SET last_heartbeat = EXCLUDED.last_heartbeat;
"#,
table = Self::member_table_name()
),
vec![Value::from(id), Value::from(service_name)],
))
.await?;
Expand All @@ -478,30 +466,28 @@ DO
id: &str,
ttl: i64,
) -> MetaResult<ElectionRow> {
let string = format!(
r#"INSERT INTO {table} (service, id, last_heartbeat)
VALUES ($1, $2, NOW())
ON CONFLICT (service)
DO UPDATE
SET id = CASE
WHEN {table}.last_heartbeat < NOW() - $3::INTERVAL THEN EXCLUDED.id
ELSE {table}.id
END,
last_heartbeat = CASE
WHEN {table}.last_heartbeat < NOW() - $3::INTERVAL THEN EXCLUDED.last_heartbeat
WHEN {table}.id = EXCLUDED.id THEN EXCLUDED.last_heartbeat
ELSE {table}.last_heartbeat
END
RETURNING service, id, last_heartbeat;
"#,
table = Self::election_table_name()
);

let query_result = self
.conn
.query_one(Statement::from_sql_and_values(
DatabaseBackend::Postgres,
string,
format!(
r#"INSERT INTO {table} (service, id, last_heartbeat)
VALUES ($1, $2, NOW())
ON CONFLICT (service)
DO UPDATE
SET id = CASE
WHEN {table}.last_heartbeat < NOW() - $3::INTERVAL THEN EXCLUDED.id
ELSE {table}.id
END,
last_heartbeat = CASE
WHEN {table}.last_heartbeat < NOW() - $3::INTERVAL THEN EXCLUDED.last_heartbeat
WHEN {table}.id = EXCLUDED.id THEN EXCLUDED.last_heartbeat
ELSE {table}.last_heartbeat
END
RETURNING service, id, last_heartbeat;
"#,
table = Self::election_table_name()
),
vec![
Value::from(service_name),
Value::from(id),
Expand Down Expand Up @@ -541,16 +527,14 @@ RETURNING service, id, last_heartbeat;
}

async fn candidates(&self, service_name: &str) -> MetaResult<Vec<ElectionRow>> {
let string = format!(
r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = $1;"#,
table = Self::member_table_name()
);

let all = self
.conn
.query_all(Statement::from_sql_and_values(
DatabaseBackend::Postgres,
string,
format!(
r#"SELECT service, id, last_heartbeat FROM {table} WHERE service = $1;"#,
table = Self::member_table_name()
),
vec![Value::from(service_name)],
))
.await?;
Expand Down Expand Up @@ -578,16 +562,14 @@ RETURNING service, id, last_heartbeat;
))
.await?;

let string = format!(
r#"
DELETE FROM {table} WHERE service = $1 AND id = $2;
"#,
table = Self::member_table_name()
);

txn.execute(Statement::from_sql_and_values(
DatabaseBackend::Postgres,
string,
format!(
r#"
DELETE FROM {table} WHERE service = $1 AND id = $2;
"#,
table = Self::member_table_name()
),
vec![Value::from(service_name), Value::from(id)],
))
.await?;
Expand Down
Loading

0 comments on commit 67d676a

Please sign in to comment.