Skip to content

Commit

Permalink
Catch all handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
leonardocustodio committed Nov 26, 2024
1 parent 24ba4dd commit e077991
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 36 deletions.
2 changes: 1 addition & 1 deletion .env
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
PLATFORM_KEY=example
KEY_PASS=example
CONFIG_FILE=/opt/app/config.json
RUST_LOG=wallet_lib=warn
RUST_LOG=
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ name = "wallet-daemon"
version = "2.0.2"
edition = "2021"

[profile.release]
debug = true
debug-assertions = true
overflow-checks = true

[dependencies]
tracing = "0.1.40"
rpassword = "7.3.1"
Expand Down Expand Up @@ -31,3 +36,4 @@ sp-core = { version = "34.0.0", default-features = false, features = [
bip39 = { version = "2.0.0", features = ["rand"] }
autoincrement = "1.0.1"
lru = "0.12.5"
anyhow = "1.0.93"
19 changes: 10 additions & 9 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::sync::Arc;
use std::time::Duration;
use subxt::backend::rpc::reconnecting_rpc_client::{ExponentialBackoff, RpcClient};
use subxt::{OnlineClient, PolkadotConfig};
use tokio::signal;
use wallet_daemon::config_loader::{load_config, load_wallet};
use wallet_daemon::{
set_multitenant, write_seed, DeriveWalletJob, SubscriptionJob, TransactionJob,
Expand Down Expand Up @@ -63,17 +62,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
platform_token.clone(),
);

subscription_job.start();
transaction_poller.start();
transaction_processor.start();

let (wallet_poller, wallet_processor) =
DeriveWalletJob::create_job(keypair, platform_url, platform_token);

wallet_poller.start();
wallet_processor.start();

signal::ctrl_c().await.expect("Failed to listen for ctrl c");
tokio::select! {
_ = transaction_poller.start() => {}
_ = transaction_processor.start() => {}
_ = wallet_poller.start() => {}
_ = wallet_processor.start() => {}
r = subscription_job.start() => {
let err = r.unwrap_err();
tracing::error!("Subscription job failed: {:?}", err);
}
}

Ok(())
}
34 changes: 12 additions & 22 deletions src/subscription.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::sync::{Arc, Mutex};
use std::{panic, process};
use std::{panic};
use subxt::client::ClientRuntimeUpdater;
use subxt::ext::subxt_core;
use subxt::{OnlineClient, PolkadotConfig};
use subxt_core::config::substrate;
use tokio::task::JoinHandle;

#[derive(Debug)]
pub struct SubscriptionJob {
Expand All @@ -27,32 +28,17 @@ impl SubscriptionJob {
SubscriptionJob::new(subscription)
}

pub fn start(&self) {
let orig_hook = panic::take_hook();
panic::set_hook(Box::new(move |panic_info| {
orig_hook(panic_info);
process::exit(1);
}));

self.start_block();
self.start_runtime();
}

pub fn start_block(&self) {
pub fn start(&self) -> JoinHandle<()> {
let block_sub = Arc::clone(&self.params);

tokio::spawn(async move {
block_sub.block_subscription().await;
});
}

pub fn start_runtime(&self) {
let runtime_sub = Arc::clone(&self.params);
let updater = runtime_sub.rpc.updater();

tokio::spawn(async move {
runtime_sub.runtime_subscription(updater).await;
});
tokio::select! {
_ = block_sub.block_subscription() => {}
_ = runtime_sub.runtime_subscription(updater) => {}
}
})
}

pub fn get_params(&self) -> Arc<SubscriptionParams> {
Expand Down Expand Up @@ -99,6 +85,8 @@ impl SubscriptionParams {
},
};
}

tracing::error!("Runtime update stream ended unexpectedly");
}

async fn block_subscription(self: Arc<Self>) {
Expand Down Expand Up @@ -126,6 +114,8 @@ impl SubscriptionParams {

*block_header = Some(block.header().clone());
}

tracing::error!("Block subscription stream ended unexpectedly");
}

pub fn get_block_header(&self) -> substrate::SubstrateHeader<u32, substrate::BlakeTwo256> {
Expand Down
4 changes: 2 additions & 2 deletions src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,10 @@ impl TransactionJob {
)
}

pub fn start(self) {
pub fn start(self) -> JoinHandle<()> {
tokio::spawn(async move {
self.start_polling().await;
});
})
}

async fn start_polling(&self) {
Expand Down
4 changes: 2 additions & 2 deletions src/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,10 @@ impl DeriveWalletJob {
)
}

pub fn start(self) {
pub fn start(self) -> JoinHandle<()> {
tokio::spawn(async move {
self.start_polling().await;
});
})
}

async fn start_polling(&self) {
Expand Down

0 comments on commit e077991

Please sign in to comment.