Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

notify tasks on endpoint rotation #109

Merged
merged 2 commits into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 28 additions & 11 deletions src/extensions/api/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,11 @@ impl EthApi {
}
_ = interval.tick() => {
tracing::warn!("No new blocks for {stale_timeout:?} seconds, rotating endpoint");
client.rotate_endpoint().await.expect("Failed to rotate endpoint");
client.rotate_endpoint().await;
break;
}
_ = client.on_rotation() => {
// endpoint is rotated, break the loop and restart subscription
break;
}
}
Expand All @@ -140,18 +144,31 @@ impl EthApi {
.subscribe("eth_subscribe", ["newFinalizedHeads".into()].into(), "eth_unsubscribe")
.await?;

while let Some(Ok(val)) = sub.next().await {
let number = super::get_number(&val)?;
let hash = super::get_hash(&val)?;
loop {
tokio::select! {
val = sub.next() => {
if let Some(Ok(val)) = val {
let number = super::get_number(&val)?;
let hash = super::get_hash(&val)?;

if let Err(e) = super::validate_new_head(&finalized_head_tx, number, &hash)
{
tracing::error!("Error in background task: {e}");
client.rotate_endpoint().await;
break;
}

if let Err(e) = super::validate_new_head(&finalized_head_tx, number, &hash) {
tracing::error!("Error in background task: {e}");
client.rotate_endpoint().await.expect("Failed to rotate endpoint");
break;
tracing::debug!("New finalized head: {number} {hash}");
finalized_head_tx.send_replace(Some((hash, number)));
} else {
break;
}
}
_ = client.on_rotation() => {
// endpoint is rotated, break the loop and restart subscription
break;
}
}

tracing::debug!("New finalized head: {number} {hash}");
finalized_head_tx.send_replace(Some((hash, number)));
}

Ok::<(), anyhow::Error>(())
Expand Down
44 changes: 31 additions & 13 deletions src/extensions/api/substrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,11 @@ impl SubstrateApi {
}
_ = interval.tick() => {
tracing::warn!("No new blocks for {stale_timeout:?} seconds, rotating endpoint");
client.rotate_endpoint().await.expect("Failed to rotate endpoint");
client.rotate_endpoint().await;
break;
}
_ = client.on_rotation() => {
// endpoint is rotated, break the loop and restart subscription
break;
}
}
Expand All @@ -122,28 +126,42 @@ impl SubstrateApi {
tokio::spawn(async move {
loop {
let run = async {
let sub = client
let mut sub = client
.subscribe(
"chain_subscribeFinalizedHeads",
[].into(),
"chain_unsubscribeFinalizedHeads",
)
.await?;

let mut sub = sub;
while let Some(Ok(val)) = sub.next().await {
let number = super::get_number(&val)?;
loop {
tokio::select! {
val = sub.next() => {
if let Some(Ok(val)) = val {
let number = super::get_number(&val)?;

let hash = client.request("chain_getBlockHash", vec![number.into()]).await?;
let hash = client
.request("chain_getBlockHash", vec![number.into()])
.await?;

if let Err(e) = super::validate_new_head(&finalized_head_tx, number, &hash) {
tracing::error!("Error in background task: {e}");
client.rotate_endpoint().await.expect("Failed to rotate endpoint");
break;
}
if let Err(e) = super::validate_new_head(&finalized_head_tx, number, &hash)
{
tracing::error!("Error in background task: {e}");
client.rotate_endpoint().await;
break;
}

tracing::debug!("New finalized head: {number} {hash}");
finalized_head_tx.send_replace(Some((hash, number)));
tracing::debug!("New finalized head: {number} {hash}");
finalized_head_tx.send_replace(Some((hash, number)));
} else {
break;
}
}
_ = client.on_rotation() => {
// endpoint is rotated, break the loop and restart subscription
break;
}
}
}

Ok::<(), anyhow::Error>(())
Expand Down
9 changes: 7 additions & 2 deletions src/extensions/api/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,7 @@ async fn rotate_endpoint_on_head_mismatch() {
let client = Client::new([format!("ws://{addr1}"), format!("ws://{addr2}")]).unwrap();

let client = Arc::new(client);
// TODO: investigate why it takes a while to connect to another endpoint
let api = SubstrateApi::new(client.clone(), std::time::Duration::from_millis(5_000));
let api = SubstrateApi::new(client.clone(), std::time::Duration::from_millis(100));

let head = api.get_head();
let finalized_head = api.get_finalized_head();
Expand Down Expand Up @@ -312,6 +311,12 @@ async fn rotate_endpoint_on_head_mismatch() {
tx.send(json!("0xaa")).unwrap();
}

// wait a bit to process tasks
tokio::time::sleep(std::time::Duration::from_millis(1)).await;

assert!(head_sink.is_closed());
assert!(finalized_head_sink.is_closed());

// current finalized head is still 2
assert_eq!(api.get_finalized_head().read().await, (json!("0xbb"), 0x02));

Expand Down
23 changes: 20 additions & 3 deletions src/extensions/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use jsonrpsee::{
use opentelemetry::trace::FutureExt;
use rand::{seq::SliceRandom, thread_rng};
use serde::Deserialize;
use tokio::sync::Notify;

use crate::{
extension::Extension,
Expand All @@ -36,6 +37,7 @@ const TRACER: utils::telemetry::Tracer = utils::telemetry::Tracer::new("client")

pub struct Client {
sender: tokio::sync::mpsc::Sender<Message>,
rotation_notify: Arc<Notify>,
}

#[derive(Deserialize, Debug)]
Expand Down Expand Up @@ -96,6 +98,9 @@ impl Client {

let (disconnect_tx, mut disconnect_rx) = tokio::sync::mpsc::channel::<()>(10);

let rotation_notify = Arc::new(Notify::new());
let rotating = rotation_notify.clone();

tokio::spawn(async move {
let tx = tx2;

Expand Down Expand Up @@ -292,6 +297,7 @@ impl Client {
tracing::trace!("Received message {message:?}");
match message {
Some(Message::RotateEndpoint) => {
rotating.notify_waiters();
tracing::info!("Rotate endpoint");
ws = build_ws().await;
}
Expand All @@ -306,7 +312,10 @@ impl Client {
}
});

Ok(Self { sender: tx })
Ok(Self {
sender: tx,
rotation_notify,
})
}

pub async fn request(&self, method: &str, params: Vec<JsonValue>) -> Result<JsonValue, ErrorObjectOwned> {
Expand Down Expand Up @@ -349,8 +358,16 @@ impl Client {
rx.with_context(cx).await.map_err(errors::failed)?
}

pub async fn rotate_endpoint(&self) -> Result<(), ()> {
self.sender.send(Message::RotateEndpoint).await.map_err(|_| ())
pub async fn rotate_endpoint(&self) {
self.sender
.send(Message::RotateEndpoint)
.await
.expect("Failed to rotate endpoint");
}

/// Returns a future that resolves when the endpoint is rotated.
pub async fn on_rotation(&self) {
self.rotation_notify.notified().await
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/extensions/client/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ async fn multiple_endpoints() {

assert_eq!(result.to_string(), "2");

client.rotate_endpoint().await.unwrap();
client.rotate_endpoint().await;

tokio::time::sleep(Duration::from_millis(100)).await;

Expand Down