Skip to content

Commit

Permalink
Add new packages, dependencies to Cargo.lock, and modify Cargo.toml.
Browse files Browse the repository at this point in the history
Code changes: import `EtcdElectionClient`, update import path, define `ElectionHandle` as tuple.

Added new code block for #[cfg(not(madsim))] and #[cfg(test)] module.
  • Loading branch information
shanicky committed Sep 19, 2023
1 parent ed96064 commit 58ecb0d
Show file tree
Hide file tree
Showing 9 changed files with 1,228 additions and 155 deletions.
605 changes: 482 additions & 123 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions src/meta/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ risingwave_sqlparser = { workspace = true }
scopeguard = "1.2.0"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
sqlx = { version = "0.7", features = ["runtime-tokio", "postgres", "mysql", "sqlite", "chrono"] }
sync-point = { path = "../utils/sync-point" }
thiserror = "1"
tokio = { version = "0.2", package = "madsim-tokio", features = [
Expand Down
13 changes: 10 additions & 3 deletions src/meta/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use risingwave_common::error::BoxedError;
use risingwave_connector::sink::SinkError;
use risingwave_pb::PbFieldNotFound;
use risingwave_rpc_client::error::RpcError;
use sqlx::Error;

use crate::hummock::error::Error as HummockError;
use crate::manager::WorkerId;
Expand Down Expand Up @@ -62,7 +63,7 @@ enum MetaErrorInner {
Unavailable(String),

#[error("Election failed: {0}")]
Election(etcd_client::Error),
Election(String),

#[error("Cancelled: {0}")]
Cancelled(String),
Expand All @@ -73,7 +74,7 @@ enum MetaErrorInner {
#[error("Sink error: {0}")]
Sink(SinkError),

#[error("AWS SDK error: {}", DisplayErrorContext(&**.0))]
#[error("AWS SDK error: {}", DisplayErrorContext(& * *.0))]
Aws(BoxedError),

#[error(transparent)]
Expand Down Expand Up @@ -165,7 +166,13 @@ impl From<HummockError> for MetaError {

impl From<etcd_client::Error> for MetaError {
fn from(e: etcd_client::Error) -> Self {
MetaErrorInner::Election(e).into()
MetaErrorInner::Election(e.to_string()).into()
}
}

impl From<sqlx::Error> for MetaError {
fn from(value: Error) -> Self {
MetaErrorInner::Election(value.to_string()).into()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,15 @@ use std::time::Duration;

use etcd_client::{ConnectOptions, Error, GetOptions, LeaderKey, ResignOptions};
use risingwave_common::bail;
use serde::Serialize;
use tokio::sync::watch::Receiver;
use tokio::sync::{oneshot, watch};
use tokio::time;
use tokio_stream::StreamExt;

use crate::rpc::election::{ElectionClient, ElectionMember, META_ELECTION_KEY};
use crate::storage::WrappedEtcdClient;
use crate::MetaResult;

const META_ELECTION_KEY: &str = "__meta_election_";

#[derive(Debug, Serialize)]
pub struct ElectionMember {
pub id: String,
pub is_leader: bool,
}

#[async_trait::async_trait]
pub trait ElectionClient: Send + Sync + 'static {
fn id(&self) -> MetaResult<String>;
async fn run_once(&self, ttl: i64, stop: Receiver<()>) -> MetaResult<()>;
fn subscribe(&self) -> Receiver<bool>;
async fn leader(&self) -> MetaResult<Option<ElectionMember>>;
async fn get_members(&self) -> MetaResult<Vec<ElectionMember>>;
async fn is_leader(&self) -> bool;
}

pub struct EtcdElectionClient {
id: String,
is_leader_sender: watch::Sender<bool>,
Expand Down Expand Up @@ -367,7 +349,8 @@ mod tests {
use tokio::sync::watch::Sender;
use tokio::time;

use crate::rpc::election_client::{ElectionClient, EtcdElectionClient, META_ELECTION_KEY};
use crate::rpc::election::etcd::EtcdElectionClient;
use crate::rpc::election::{ElectionClient, META_ELECTION_KEY};

type ElectionHandle = (Sender<()>, Arc<dyn ElectionClient>);

Expand Down
38 changes: 38 additions & 0 deletions src/meta/src/rpc/election/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod etcd;
pub mod sql;

use serde::Serialize;
use tokio::sync::watch::Receiver;

use crate::MetaResult;

const META_ELECTION_KEY: &str = "__meta_election_";

#[derive(Debug, Serialize)]
pub struct ElectionMember {
pub id: String,
pub is_leader: bool,
}

#[async_trait::async_trait]
pub trait ElectionClient: Send + Sync + 'static {
fn id(&self) -> MetaResult<String>;
async fn run_once(&self, ttl: i64, stop: Receiver<()>) -> MetaResult<()>;
fn subscribe(&self) -> Receiver<bool>;
async fn leader(&self) -> MetaResult<Option<ElectionMember>>;
async fn get_members(&self) -> MetaResult<Vec<ElectionMember>>;
async fn is_leader(&self) -> bool;
}
Loading

0 comments on commit 58ecb0d

Please sign in to comment.