Skip to content

Commit

Permalink
Tokio fix
Browse files Browse the repository at this point in the history
  • Loading branch information
leonardocustodio committed Nov 7, 2024
1 parent 3cb89e4 commit b49d169
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 19 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ tokio = { version = "1.41.0", features = [
"rt",
"macros",
"rt-multi-thread",
"signal"
"signal",
] }
graphql_client = "0.14.0"
reqwest = { version = "0.12.9", features = ["json"] }
Expand Down
2 changes: 0 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
version: '3.9'

services:
daemon:
image: enjin/wallet-daemon:latest
Expand Down
22 changes: 6 additions & 16 deletions src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,19 @@ impl SubscriptionParams {

let block_sub = Arc::clone(&subscription);
tokio::spawn(async move {
block_sub.block_subscription().await.unwrap();
block_sub.block_subscription().await;
});

let runtime_sub = Arc::clone(&subscription);
let updater = runtime_sub.rpc.updater();
tokio::spawn(async move {
runtime_sub.runtime_subscription(updater).await.unwrap();
runtime_sub.runtime_subscription(updater).await;
});

subscription
}

async fn runtime_subscription(
self: Arc<Self>,
updater: ClientRuntimeUpdater<PolkadotConfig>,
) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
async fn runtime_subscription(self: Arc<Self>, updater: ClientRuntimeUpdater<PolkadotConfig>) {
let mut update_stream = updater.runtime_updates().await.unwrap();

while let Some(Ok(update)) = update_stream.next().await {
Expand Down Expand Up @@ -70,25 +67,20 @@ impl SubscriptionParams {
},
};
}

Ok(())
}

async fn block_subscription(
self: Arc<Self>,
) -> Result<(), Box<dyn std::error::Error + Sync + Send>> {
let mut blocks_sub = self.rpc.blocks().subscribe_finalized().await?;
async fn block_subscription(self: Arc<Self>) {
let mut blocks_sub = self.rpc.blocks().subscribe_finalized().await.unwrap();

while let Some(block) = blocks_sub.next().await {
let block = match block {
Ok(b) => b,
Err(e) => {
if e.is_disconnected_will_reconnect() {
tracing::warn!("Lost connection with the RPC node, reconnecting...");
continue;
}

return Err(e.into());
continue;
}
};

Expand All @@ -102,8 +94,6 @@ impl SubscriptionParams {

*block_header = Some(block.header().clone());
}

Ok(())
}

pub fn get_block_header(&self) -> substrate::SubstrateHeader<u32, substrate::BlakeTwo256> {
Expand Down

0 comments on commit b49d169

Please sign in to comment.