Skip to content

Commit

Permalink
feat: Add resign functionality to the etcd election client. (#9909)
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky authored May 22, 2023
1 parent c2a3a1f commit f522f68
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 88 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

209 changes: 125 additions & 84 deletions src/meta/src/rpc/election_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::borrow::BorrowMut;
use std::collections::HashSet;
use std::time::Duration;

use etcd_client::{ConnectOptions, Error, GetOptions};
use etcd_client::{ConnectOptions, Error, GetOptions, LeaderKey, ResignOptions};
use risingwave_common::bail;
use tokio::sync::watch::Receiver;
use tokio::sync::{oneshot, watch};
Expand Down Expand Up @@ -213,25 +213,31 @@ impl ElectionClient for EtcdElectionClient {
if !restored_leader {
self.is_leader_sender.send_replace(false);
tracing::info!("no restored leader, campaigning");
tokio::select! {
biased;
}

_ = stop.changed() => {
tracing::info!("stop signal received when campaigning");
return Ok(());
}
// Even if we are already a restored leader, we still need to campaign to obtain the correct
// `LeaderKey` from campaign response, which is used to provide the parameter for the
// resign call.
let leader_key: Option<LeaderKey> = tokio::select! {
biased;

_ = keep_alive_fail_rx.borrow_mut() => {
tracing::error!("keep alive failed, stopping main loop");
bail!("keep alive failed, stopping main loop");
},
_ = stop.changed() => {
tracing::info!("stop signal received when campaigning");
return Ok(());
}

campaign_resp = self.client.campaign(META_ELECTION_KEY, self.id.as_bytes().to_vec(), lease_id) => {
campaign_resp?;
tracing::info!("client {} wins election {}", self.id, META_ELECTION_KEY);
}
};
}
_ = keep_alive_fail_rx.borrow_mut() => {
tracing::error!("keep alive failed, stopping main loop");
bail!("keep alive failed, stopping main loop");
},


campaign_resp = self.client.campaign(META_ELECTION_KEY, self.id.as_bytes().to_vec(), lease_id) => {
let campaign_resp = campaign_resp?;
tracing::info!("client {} wins election {}", self.id, META_ELECTION_KEY);
campaign_resp.leader().cloned()
}
};

self.is_leader_sender.send_replace(true);

Expand All @@ -242,6 +248,12 @@ impl ElectionClient for EtcdElectionClient {
biased;
_ = stop.changed() => {
tracing::info!("stop signal received when observing");

if let Some(leader_key) = leader_key {
tracing::info!("leader key found with lease {}, resigning", leader_key.lease());
self.client.resign(Some(ResignOptions::new().with_leader(leader_key))).await?;
}

break;
},
_ = keep_alive_fail_rx.borrow_mut() => {
Expand Down Expand Up @@ -350,70 +362,20 @@ mod tests {
use etcd_client::GetOptions;
use itertools::Itertools;
use tokio::sync::watch;
use tokio::sync::watch::Sender;
use tokio::time;

use crate::rpc::election_client::{ElectionClient, EtcdElectionClient, META_ELECTION_KEY};

type ElectionHandle = (Sender<()>, Arc<dyn ElectionClient>);

#[tokio::test]
async fn test_election() {
let handle = tokio::spawn(async move {
let addr = "0.0.0.0:2388".parse().unwrap();
let mut builder = etcd_client::SimServer::builder();
builder.serve(addr).await.unwrap();
});

let mut clients: Vec<(watch::Sender<()>, Arc<dyn ElectionClient>)> = vec![];

for i in 0..3 {
let (stop_sender, stop_receiver) = watch::channel(());
clients.push((
stop_sender,
Arc::new(
EtcdElectionClient::new(
vec!["localhost:2388".to_string()],
None,
false,
format!("client_{}", i).to_string(),
)
.await
.unwrap(),
),
));
}

for client in &clients {
assert!(!client.1.is_leader().await);
}

for (stop_sender, client) in &clients {
let client_ = client.clone();
let stop = stop_sender.subscribe();

tokio::spawn(async move {
let mut ticker = time::interval(Duration::from_secs(1));
loop {
ticker.tick().await;
if let Ok(_) = client_.run_once(5, stop.clone()).await {
break;
}
}
});
}
let clients = prepare_election_client(3, 5).await;

time::sleep(Duration::from_secs(10)).await;

let mut leaders = vec![];
let mut followers = vec![];
for (sender, client) in &clients {
if client.is_leader().await {
leaders.push((sender, client));
} else {
followers.push((sender, client));
}
}

assert_eq!(leaders.len(), 1);
assert_eq!(followers.len(), 2);
let (leaders, followers) = check_role(clients, 1, 2).await;

let leader = leaders.into_iter().next().unwrap();

Expand All @@ -422,18 +384,7 @@ mod tests {

time::sleep(Duration::from_secs(10)).await;

let mut new_leaders = vec![];
let mut new_followers = vec![];
for (sender, client) in followers {
if client.is_leader().await {
new_leaders.push((sender, client));
} else {
new_followers.push((sender, client));
}
}

assert_eq!(new_leaders.len(), 1);
assert_eq!(new_followers.len(), 1);
let (new_leaders, new_followers) = check_role(followers, 1, 1).await;

let leader = new_leaders.into_iter().next().unwrap();
let election_leader = leader.1.leader().await.unwrap().unwrap();
Expand Down Expand Up @@ -467,4 +418,94 @@ mod tests {

assert!(leader.1.is_leader().await);
}

#[tokio::test]
async fn test_resign() {
// with a long ttl
let clients = prepare_election_client(3, 1000).await;

time::sleep(Duration::from_secs(10)).await;

let (leaders, followers) = check_role(clients, 1, 2).await;

let leader = leaders.into_iter().next().unwrap();

// stop leader
leader.0.send(()).unwrap();

time::sleep(Duration::from_secs(10)).await;

check_role(followers, 1, 1).await;
}

async fn check_role(
clients: Vec<ElectionHandle>,
expected_leader_count: usize,
expected_follower_count: usize,
) -> (Vec<ElectionHandle>, Vec<ElectionHandle>) {
let mut leaders = vec![];
let mut followers = vec![];
for (sender, client) in clients {
if client.is_leader().await {
leaders.push((sender, client));
} else {
followers.push((sender, client));
}
}

assert_eq!(leaders.len(), expected_leader_count);
assert_eq!(followers.len(), expected_follower_count);
(leaders, followers)
}

async fn prepare_election_client(
count: i32,
ttl: i64,
) -> Vec<(Sender<()>, Arc<dyn ElectionClient>)> {
let handle = tokio::spawn(async move {
let addr = "0.0.0.0:2388".parse().unwrap();
let mut builder = etcd_client::SimServer::builder();
builder.serve(addr).await.unwrap();
});

let mut clients: Vec<(watch::Sender<()>, Arc<dyn ElectionClient>)> = vec![];

for i in 0..count {
let (stop_sender, stop_receiver) = watch::channel(());
clients.push((
stop_sender,
Arc::new(
EtcdElectionClient::new(
vec!["localhost:2388".to_string()],
None,
false,
format!("client_{}", i).to_string(),
)
.await
.unwrap(),
),
));
}

for client in &clients {
assert!(!client.1.is_leader().await);
}

for (stop_sender, client) in &clients {
let client_ = client.clone();
let stop = stop_sender.subscribe();

tokio::spawn(async move {
let mut ticker = time::interval(Duration::from_secs(1));
loop {
ticker.tick().await;
if let Ok(_) = client_.run_once(ttl, stop.clone()).await {
break;
}
}
});
}

clients
}
}
9 changes: 8 additions & 1 deletion src/meta/src/storage/wrapped_etcd_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::sync::Arc;
use etcd_client::{
CampaignResponse, ConnectOptions, DeleteOptions, DeleteResponse, GetOptions, GetResponse,
LeaderResponse, LeaseGrantOptions, LeaseGrantResponse, LeaseKeepAliveStream, LeaseKeeper,
ObserveStream, PutOptions, PutResponse, Txn, TxnResponse,
ObserveStream, PutOptions, PutResponse, ResignOptions, ResignResponse, Txn, TxnResponse,
};
use tokio::sync::RwLock;

Expand Down Expand Up @@ -165,6 +165,13 @@ impl_etcd_client_command_proxy!(
ObserveStream
);

impl_etcd_client_command_proxy!(
resign,
election_client,
(option: Option<ResignOptions>),
ResignResponse
);

impl WrappedEtcdClient {
pub async fn connect(
endpoints: Vec<String>,
Expand Down
2 changes: 1 addition & 1 deletion src/tests/simulation/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async-trait = "0.1"
aws-sdk-s3 = { version = "0.2.17", package = "madsim-aws-sdk-s3" }
clap = { version = "4", features = ["derive"] }
console = "0.15"
etcd-client = { version = "0.2.20", package = "madsim-etcd-client" }
etcd-client = { version = "0.2.23", package = "madsim-etcd-client" }
futures = { version = "0.3", default-features = false, features = ["alloc"] }
glob = "0.3"
itertools = "0.10"
Expand Down

0 comments on commit f522f68

Please sign in to comment.