diff --git a/src/lib.rs b/src/lib.rs index 0aeaf08..e11d889 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,7 +2,7 @@ pub use importer::write_seed; pub use multitenant::set_multitenant; pub use platform_client::{set_wallet_account, update_transaction}; -pub use subscription::SubscriptionParams; +pub use subscription::{SubscriptionJob, SubscriptionParams}; pub use transaction::TransactionJob; pub use wallet::DeriveWalletJob; diff --git a/src/main.rs b/src/main.rs index 51f850a..712d04f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,7 +8,7 @@ use subxt::{OnlineClient, PolkadotConfig}; use tokio::signal; use wallet_daemon::config_loader::{load_config, load_wallet}; use wallet_daemon::{ - set_multitenant, write_seed, DeriveWalletJob, SubscriptionParams, TransactionJob, + set_multitenant, write_seed, DeriveWalletJob, SubscriptionJob, TransactionJob, }; #[tokio::main(flavor = "multi_thread")] @@ -52,16 +52,18 @@ async fn main() -> Result<(), Box> { }); let chain_client = Arc::new(chain_client); - let subscription = SubscriptionParams::new(Arc::clone(&chain_client)); + let subscription_job = SubscriptionJob::create_job(Arc::clone(&chain_client)); + let params = subscription_job.get_params(); let (transaction_poller, transaction_processor) = TransactionJob::create_job( Arc::clone(&chain_client), - Arc::clone(&subscription), + Arc::clone(¶ms), keypair.clone(), platform_url.clone(), platform_token.clone(), ); + subscription_job.start(); transaction_poller.start(); transaction_processor.start(); diff --git a/src/subscription.rs b/src/subscription.rs index c86951c..fa65495 100644 --- a/src/subscription.rs +++ b/src/subscription.rs @@ -1,41 +1,73 @@ use std::sync::{Arc, Mutex}; +use std::{panic, process}; use subxt::client::ClientRuntimeUpdater; use subxt::ext::subxt_core; use subxt::{OnlineClient, PolkadotConfig}; use subxt_core::config::substrate; #[derive(Debug)] -pub struct SubscriptionParams { - rpc: Arc>, - block_header: Arc>>>, - spec_version: Arc>>, +pub struct SubscriptionJob { + params: Arc, } -impl SubscriptionParams { - pub fn new(rpc: Arc>) -> Arc { +impl SubscriptionJob { + pub fn new(params: Arc) -> Self { + Self { params } + } + + pub fn create_job(rpc: Arc>) -> SubscriptionJob { let block_header = Arc::new(Mutex::new(None)); let spec_version = Arc::new(Mutex::new(None)); - - let subscription = Arc::new(Self { - rpc: Arc::clone(&rpc), + let subscription = Arc::new(SubscriptionParams { + rpc, block_header: Arc::clone(&block_header), spec_version: Arc::clone(&spec_version), }); - let block_sub = Arc::clone(&subscription); + SubscriptionJob::new(subscription) + } + + pub fn start(&self) { + let orig_hook = panic::take_hook(); + panic::set_hook(Box::new(move |panic_info| { + orig_hook(panic_info); + process::exit(1); + })); + + self.start_block(); + self.start_runtime(); + } + + pub fn start_block(&self) { + let block_sub = Arc::clone(&self.params); + tokio::spawn(async move { block_sub.block_subscription().await; }); + } - let runtime_sub = Arc::clone(&subscription); + pub fn start_runtime(&self) { + let runtime_sub = Arc::clone(&self.params); let updater = runtime_sub.rpc.updater(); + tokio::spawn(async move { runtime_sub.runtime_subscription(updater).await; }); + } - subscription + pub fn get_params(&self) -> Arc { + Arc::clone(&self.params) } +} +#[derive(Debug)] +pub struct SubscriptionParams { + rpc: Arc>, + block_header: Arc>>>, + spec_version: Arc>>, +} + +impl SubscriptionParams { async fn runtime_subscription(self: Arc, updater: ClientRuntimeUpdater) { let mut update_stream = updater.runtime_updates().await.unwrap();