diff --git a/src/extensions/api/eth.rs b/src/extensions/api/eth.rs index a097717..b3abc35 100644 --- a/src/extensions/api/eth.rs +++ b/src/extensions/api/eth.rs @@ -159,6 +159,16 @@ impl EthApi { let number = super::get_number(&val)?; let hash = super::get_hash(&val)?; + if let Err(e) = super::validate_new_head(&finalized_head_tx, number, &hash) + { + tracing::error!("Error in background task: {e}"); + client + .rotate_endpoint() + .await + .expect("Failed to rotate endpoint"); + break; + } + tracing::debug!("New finalized head: {number} {hash}"); finalized_head_tx.send_replace(Some((hash, number))); } diff --git a/src/extensions/api/mod.rs b/src/extensions/api/mod.rs index 4a0fada..2c02eca 100644 --- a/src/extensions/api/mod.rs +++ b/src/extensions/api/mod.rs @@ -53,3 +53,23 @@ pub(crate) fn get_hash(val: &JsonValue) -> anyhow::Result { } Err(anyhow::Error::msg("Hash not found")) } + +pub(crate) fn validate_new_head( + tx: &watch::Sender>, + number: u64, + hash: &JsonValue, +) -> anyhow::Result<()> { + if let Some((current_hash, current_number)) = tx.borrow().as_ref() { + if *current_number > number { + return Err(anyhow::Error::msg("Head number is not increasing, current_number: {current_number} new_number: {number}")); + } + + if *current_number == number && current_hash != hash { + return Err(anyhow::Error::msg( + "Head number is the same but hash is not matching, current_hash: {current_hash} new_hash: {hash}" + )); + } + } + + Ok(()) +} diff --git a/src/extensions/api/substrate.rs b/src/extensions/api/substrate.rs index 02e05c4..159bf31 100644 --- a/src/extensions/api/substrate.rs +++ b/src/extensions/api/substrate.rs @@ -100,12 +100,12 @@ impl SubstrateApi { let number = super::get_number(&val)?; - let res = client + let hash = client .request("chain_getBlockHash", vec![number.into()]) .await?; - tracing::debug!("New head: {number} {res}"); - head_tx.send_replace(Some((res, number))); + tracing::debug!("New head: {number} {hash}"); + head_tx.send_replace(Some((hash, number))); } else { break; } @@ -145,12 +145,22 @@ impl SubstrateApi { while let Some(Ok(val)) = sub.next().await { let number = super::get_number(&val)?; - let res = client + let hash = client .request("chain_getBlockHash", vec![number.into()]) .await?; - tracing::debug!("New finalized head: {number} {res}"); - finalized_head_tx.send_replace(Some((res, number))); + if let Err(e) = super::validate_new_head(&finalized_head_tx, number, &hash) + { + tracing::error!("Error in background task: {e}"); + client + .rotate_endpoint() + .await + .expect("Failed to rotate endpoint"); + break; + } + + tracing::debug!("New finalized head: {number} {hash}"); + finalized_head_tx.send_replace(Some((hash, number))); } Ok::<(), anyhow::Error>(()) diff --git a/src/extensions/api/tests.rs b/src/extensions/api/tests.rs index 21ee5da..3de1cdc 100644 --- a/src/extensions/api/tests.rs +++ b/src/extensions/api/tests.rs @@ -75,89 +75,89 @@ async fn get_head_finalized_head() { // access value before subscription is established let h1 = tokio::spawn(async move { - assert_eq!(head.read().await, (json!("0xabcd"), 0x1234)); + assert_eq!(head.read().await, (json!("0xaa"), 0x01)); // should be able to read it multiple times - assert_eq!(head.read().await, (json!("0xabcd"), 0x1234)); + assert_eq!(head.read().await, (json!("0xaa"), 0x01)); }); let (_, head_sink) = head_rx.recv().await.unwrap(); head_sink - .send(SubscriptionMessage::from_json(&json!({ "number": "0x1234" })).unwrap()) + .send(SubscriptionMessage::from_json(&json!({ "number": "0x01" })).unwrap()) .await .unwrap(); { let (params, tx) = block_rx.recv().await.unwrap(); - assert_eq!(params, json!([0x1234])); - tx.send(json!("0xabcd")).unwrap(); + assert_eq!(params, json!([0x01])); + tx.send(json!("0xaa")).unwrap(); } let (_, finalized_head_sink) = finalized_head_rx.recv().await.unwrap(); finalized_head_sink - .send(SubscriptionMessage::from_json(&json!({ "number": "0x4321" })).unwrap()) + .send(SubscriptionMessage::from_json(&json!({ "number": "0x01" })).unwrap()) .await .unwrap(); { let (params, tx) = block_rx.recv().await.unwrap(); - assert_eq!(params, json!([0x4321])); - tx.send(json!("0xdcba")).unwrap(); + assert_eq!(params, json!([0x01])); + tx.send(json!("0xaa")).unwrap(); } // read after subscription is established let h2 = tokio::spawn(async move { let val = finalized_head.read().await; - assert_eq!(val, (json!("0xdcba"), 0x4321)); + assert_eq!(val, (json!("0xaa"), 0x01)); }); // new head head_sink - .send(SubscriptionMessage::from_json(&json!({ "number": "0x1122" })).unwrap()) + .send(SubscriptionMessage::from_json(&json!({ "number": "0x02" })).unwrap()) .await .unwrap(); { let (params, tx) = block_rx.recv().await.unwrap(); - assert_eq!(params, json!([0x1122])); - tx.send(json!("0xaabb")).unwrap(); + assert_eq!(params, json!([0x02])); + tx.send(json!("0xbb")).unwrap(); } let finalized_head = api.get_finalized_head(); // still old value - assert_eq!(finalized_head.read().await, (json!("0xdcba"), 0x4321)); + assert_eq!(finalized_head.read().await, (json!("0xaa"), 0x01)); // wait a bit for the value to be updated tokio::time::sleep(std::time::Duration::from_millis(1)).await; let head = api.get_head(); - assert_eq!(head.read().await, (json!("0xaabb"), 0x1122)); + assert_eq!(head.read().await, (json!("0xbb"), 0x02)); // new finalized head finalized_head_sink - .send(SubscriptionMessage::from_json(&json!({ "number": "0x2233" })).unwrap()) + .send(SubscriptionMessage::from_json(&json!({ "number": "0x03" })).unwrap()) .await .unwrap(); finalized_head_sink - .send(SubscriptionMessage::from_json(&json!({ "number": "0x3344" })).unwrap()) + .send(SubscriptionMessage::from_json(&json!({ "number": "0x04" })).unwrap()) .await .unwrap(); { let (params, tx) = block_rx.recv().await.unwrap(); - assert_eq!(params, json!([0x2233])); - tx.send(json!("0xbbcc")).unwrap(); + assert_eq!(params, json!([0x03])); + tx.send(json!("0xcc")).unwrap(); let (params, tx) = block_rx.recv().await.unwrap(); - assert_eq!(params, json!([0x3344])); - tx.send(json!("0xccdd")).unwrap(); + assert_eq!(params, json!([0x04])); + tx.send(json!("0xdd")).unwrap(); } // wait a bit for the value to be updated tokio::time::sleep(std::time::Duration::from_millis(1)).await; - assert_eq!(finalized_head.read().await, (json!("0xccdd"), 0x3344)); + assert_eq!(finalized_head.read().await, (json!("0xdd"), 0x04)); h1.await.unwrap(); h2.await.unwrap(); @@ -170,7 +170,7 @@ async fn rotate_endpoint_on_stale() { let (addr2, server2, mut head_rx2, _, mut block_rx2) = create_server().await; let client = Client::new([format!("ws://{addr}"), format!("ws://{addr2}")]).unwrap(); - let api = SubstrateApi::new(Arc::new(client), std::time::Duration::from_millis(10)); + let api = SubstrateApi::new(Arc::new(client), std::time::Duration::from_millis(100)); let head = api.get_head(); let h1 = tokio::spawn(async move { @@ -209,7 +209,7 @@ async fn rotate_endpoint_on_stale() { assert_eq!(api.get_head().read().await, (json!("0xbcde"), 0x2345)); // wait for timeout - tokio::time::sleep(std::time::Duration::from_millis(15)).await; + tokio::time::sleep(std::time::Duration::from_millis(150)).await; // stale assert!(head_sink.is_closed()); @@ -235,3 +235,127 @@ async fn rotate_endpoint_on_stale() { server.stop().unwrap(); server2.stop().unwrap(); } + +#[tokio::test] +async fn rotate_endpoint_on_head_mismatch() { + let (addr1, server1, mut head_rx1, mut finalized_head_rx1, mut block_rx1) = + create_server().await; + let (addr2, server2, mut head_rx2, mut finalized_head_rx2, mut block_rx2) = + create_server().await; + + let client = Client::new([format!("ws://{addr1}"), format!("ws://{addr2}")]).unwrap(); + + let client = Arc::new(client); + // TODO: investigate why it takes a while to connect to another endpoint + let api = SubstrateApi::new(client.clone(), std::time::Duration::from_millis(5_000)); + + let head = api.get_head(); + let finalized_head = api.get_finalized_head(); + let h1 = tokio::spawn(async move { + assert_eq!(head.read().await, (json!("0xaa"), 1)); + assert_eq!(finalized_head.read().await, (json!("0xaa"), 1)); + }); + + // initial connection + let (_, head_sink) = head_rx1.recv().await.unwrap(); + head_sink + .send(SubscriptionMessage::from_json(&json!({ "number": "0x01" })).unwrap()) + .await + .unwrap(); + { + let (params, tx) = block_rx1.recv().await.unwrap(); + assert_eq!(params, json!([0x01])); + tx.send(json!("0xaa")).unwrap(); + } + + let (_, finalized_head_sink) = finalized_head_rx1.recv().await.unwrap(); + finalized_head_sink + .send(SubscriptionMessage::from_json(&json!({ "number": "0x01" })).unwrap()) + .await + .unwrap(); + { + let (params, tx) = block_rx1.recv().await.unwrap(); + assert_eq!(params, json!([0x01])); + tx.send(json!("0xaa")).unwrap(); + } + + h1.await.unwrap(); + + // not stale + head_sink + .send(SubscriptionMessage::from_json(&json!({ "number": "0x02" })).unwrap()) + .await + .unwrap(); + { + let (params, tx) = block_rx1.recv().await.unwrap(); + assert_eq!(params, json!([0x02])); + tx.send(json!("0xbb")).unwrap(); + } + finalized_head_sink + .send(SubscriptionMessage::from_json(&json!({ "number": "0x02" })).unwrap()) + .await + .unwrap(); + { + let (params, tx) = block_rx1.recv().await.unwrap(); + assert_eq!(params, json!([0x02])); + tx.send(json!("0xbb")).unwrap(); + } + + // wait a bit to process tasks + tokio::time::sleep(std::time::Duration::from_millis(1)).await; + + assert_eq!(api.get_finalized_head().read().await, (json!("0xbb"), 0x02)); + + // stale server finalized head 1, trigger rotate endpoint + finalized_head_sink + .send(SubscriptionMessage::from_json(&json!({ "number": "0x01" })).unwrap()) + .await + .unwrap(); + { + let (params, tx) = block_rx1.recv().await.unwrap(); + assert_eq!(params, json!([0x01])); + tx.send(json!("0xaa")).unwrap(); + } + + // current finalized head is still 2 + assert_eq!(api.get_finalized_head().read().await, (json!("0xbb"), 0x02)); + + let (_, head_sink2) = head_rx2.recv().await.unwrap(); + head_sink2 + .send(SubscriptionMessage::from_json(&json!({ "number": "0x03" })).unwrap()) + .await + .unwrap(); + + let (params, tx) = block_rx2.recv().await.unwrap(); + assert_eq!(params, json!([0x03])); + tx.send(json!("0xcc")).unwrap(); + + let (_, finalized_head_sink2) = finalized_head_rx2.recv().await.unwrap(); + finalized_head_sink2 + .send(SubscriptionMessage::from_json(&json!({ "number": "0x03" })).unwrap()) + .await + .unwrap(); + + let (params, tx) = block_rx2.recv().await.unwrap(); + assert_eq!(params, json!([0x03])); + tx.send(json!("0xcc")).unwrap(); + + head_sink2 + .send(SubscriptionMessage::from_json(&json!({ "number": "0x04" })).unwrap()) + .await + .unwrap(); + + let (params, tx) = block_rx2.recv().await.unwrap(); + assert_eq!(params, json!([0x04])); + tx.send(json!("0xdd")).unwrap(); + + // wait a bit to process tasks + tokio::time::sleep(std::time::Duration::from_millis(1)).await; + + // current head=4 and finalized_head=3 + assert_eq!(api.get_head().read().await, (json!("0xdd"), 0x04)); + assert_eq!(api.get_finalized_head().read().await, (json!("0xcc"), 0x03)); + + server1.stop().unwrap(); + server2.stop().unwrap(); +}