diff --git a/blueprints/incredible-squaring/src/main.rs b/blueprints/incredible-squaring/src/main.rs index d8c9a13e..3d2aa096 100644 --- a/blueprints/incredible-squaring/src/main.rs +++ b/blueprints/incredible-squaring/src/main.rs @@ -18,12 +18,7 @@ async fn main() { }; info!("~~~ Executing the incredible squaring blueprint ~~~"); - MultiJobRunner::new(&env) - .with_job() - .with_default_price_targets() - .finish(x_square) - .run() - .await?; + MultiJobRunner::new(env).job(x_square).run().await?; info!("Exiting..."); Ok(()) diff --git a/blueprints/periodic-web-poller/src/main.rs b/blueprints/periodic-web-poller/src/main.rs index dcdaf3d3..8a9d8346 100644 --- a/blueprints/periodic-web-poller/src/main.rs +++ b/blueprints/periodic-web-poller/src/main.rs @@ -10,11 +10,7 @@ async fn main() { }; info!("~~~ Executing the periodic web poller ~~~"); - MultiJobRunner::new(None) - .with_job() - .finish(web_poller) - .run() - .await?; + MultiJobRunner::new(None).job(web_poller).run().await?; info!("Exiting..."); Ok(()) diff --git a/sdk/src/error.rs b/sdk/src/error.rs index a0eb3591..d613e73c 100644 --- a/sdk/src/error.rs +++ b/sdk/src/error.rs @@ -20,6 +20,12 @@ pub enum Error { #[error("Keystore error: {0}")] Keystore(#[from] crate::keystore::error::Error), + #[error("Config error: {0}")] + Config(#[from] crate::config::Error), + + #[error("Job runner error: {0}")] + Runner(#[from] crate::job_runner::Error), + #[error("Missing network ID")] MissingNetworkId, diff --git a/sdk/src/job_runner.rs b/sdk/src/job_runner.rs index 4f15738f..4e1e078a 100644 --- a/sdk/src/job_runner.rs +++ b/sdk/src/job_runner.rs @@ -7,38 +7,105 @@ use std::future::Future; use std::pin::Pin; use tangle_subxt::tangle_testnet_runtime::api; use tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services; -use tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::PriceTargets; +use tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::PriceTargets as TangleSubxtPriceTargets; -pub struct MultiJobRunner<'a> { - pub(crate) enqueued_job_runners: EnqueuedJobRunners<'a>, - pub(crate) env: Option>, +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("No jobs registered. Make sure to add a job with `MultiJobRunner::add_job`")] + NoJobs, + #[error("Job already initialized")] + AlreadyInitialized, + + #[error(transparent)] + Recv(#[from] tokio::sync::oneshot::error::RecvError), } -pub type EnqueuedJobRunners<'a> = Vec< - Pin< - Box< - dyn ScopedFuture< - 'a, - Output = Option>>, - >, - >, - >, ->; +/// Wrapper for `tangle_subxt`'s [`PriceTargets`] +/// +/// This provides a [`Default`] impl for a zeroed-out [`PriceTargets`]. +/// +/// [`PriceTargets`]: tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::PriceTargets +pub struct PriceTargets(TangleSubxtPriceTargets); -pub trait ScopedFuture<'a>: Future + 'a {} -impl<'a, T: Future + 'a> ScopedFuture<'a> for T {} +impl From for PriceTargets { + fn from(t: TangleSubxtPriceTargets) -> Self { + PriceTargets(t) + } +} -pub struct JobBuilder<'b, K: 'b> { - pub(crate) register_call: Option>, - runner: MultiJobRunner<'b>, - _pd: std::marker::PhantomData<&'b K>, +impl Default for PriceTargets { + fn default() -> Self { + Self(TangleSubxtPriceTargets { + cpu: 0, + mem: 0, + storage_hdd: 0, + storage_ssd: 0, + storage_nvme: 0, + }) + } } +pub trait ScopedFuture<'a>: Future + 'a {} +impl<'a, T: Future + 'a> ScopedFuture<'a> for T {} + pub(crate) type RegisterCall<'a> = Pin>>>; -impl<'a, K: InitializableEventHandler + Send + 'a> JobBuilder<'a, K> { - pub fn with_registration< +/// A builder for blueprint jobs +/// +/// Unless custom registration functions are needed, this can be avoided. See [`MultiJobRunner::job`]. +pub struct JobBuilder<'a, T> +where + T: InitializableEventHandler + Send + 'a, +{ + event_handler: T, + price_targets: Option, + registration: Option>, +} + +impl<'a, T> From for JobBuilder<'a, T> +where + T: InitializableEventHandler + Send + 'a, +{ + fn from(value: T) -> Self { + Self { + event_handler: value, + price_targets: None, + registration: None, + } + } +} + +impl<'a, T, P> From<(T, P)> for JobBuilder<'a, T> +where + T: InitializableEventHandler + Send + 'a, + T: markers::IsTangle, + P: Into, +{ + fn from(value: (T, P)) -> Self { + Self { + event_handler: value.0, + price_targets: Some(value.1.into()), + registration: None, + } + } +} + +impl<'a, T> JobBuilder<'a, T> +where + T: InitializableEventHandler + Send + 'a, +{ + /// Create a new `JobBuilder` + pub fn new(event_handler: T) -> Self { + Self { + event_handler, + price_targets: None, + registration: None, + } + } + + /// Set the registration function + pub fn registration< Fut: ScopedFuture<'a, Output = Result<(), crate::Error>> + 'a, Input: 'a, >( @@ -47,113 +114,219 @@ impl<'a, K: InitializableEventHandler + Send + 'a> JobBuilder<'a, K> { register_call: fn(Input) -> Fut, ) -> Self { let future = register_call(context); - self.register_call = Some(Box::pin(future)); + self.registration = Some(Box::pin(future)); self } - pub fn with_price_targets(self, price_targets: PriceTargets) -> Self - where - K: markers::IsTangle, - { - let env = self - .runner - .env - .clone() - .expect("Must have an env when using tangle"); - self.with_registration((env, price_targets), tangle_registration) - } - - pub fn with_default_price_targets(self) -> Self - where - K: markers::IsTangle, - { - self.with_price_targets(PriceTargets { - cpu: 0, - mem: 0, - storage_hdd: 0, - storage_ssd: 0, - storage_nvme: 0, - }) + /// Set the price targets + pub fn price_targets(mut self, targets: PriceTargets) -> Self { + self.price_targets = Some(targets); + self } +} - pub fn finish(mut self, job_runner: K) -> MultiJobRunner<'a> { - let registration = self.register_call.take(); - let skip_registration = self - .runner - .env - .as_ref() - .map(|r| r.test_mode) - .unwrap_or(true); - - let task = Box::pin(async move { - if let Some(registration) = registration { - // Skip registration if in test mode - if !skip_registration { - if let Err(err) = registration.await { - crate::error!("Failed to register job: {err:?}"); - return None; - } - } - } - - job_runner.init_event_handler().await - }); +pub type EnqueuedJobRunners<'a> = Vec>; - self.runner.enqueued_job_runners.push(task); +pub type JobRunner<'a> = Pin< + Box< + dyn ScopedFuture< + 'a, + Output = Option>>, + >, + >, +>; - self.runner - } +pub struct MultiJobRunner<'a> { + pub(crate) enqueued_job_runners: EnqueuedJobRunners<'a>, + pub(crate) env: Option>, } impl<'a> MultiJobRunner<'a> { - pub fn new>>>(env: T) -> Self { + /// Create a new `MultiJobRunner` + pub fn new>>>(env: T) -> Self { Self { enqueued_job_runners: Vec::new(), - env: env.into().cloned(), + env: env.into(), } } /// Add a job to the job runner - /// ```no_run - /// #[gadget_sdk::main(env)] - /// async fn main() { - /// let x_square = blueprint::XsquareEventHandler { - /// service_id: env.service_id.unwrap(), - /// client: client.clone(), - /// signer, - /// }; /// - /// let x_square2 = blueprint::XsquareEventHandler { - /// service_id: env.service_id.unwrap(), - /// client: client.clone(), - /// signer, - /// }; + /// # Examples + /// + /// ```rust,no_run + /// use gadget_sdk::job_runner::{JobBuilder, MultiJobRunner, PriceTargets}; + /// use tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services; + /// + /// # #[gadget_sdk::main(env)] + /// # async fn main() { + /// # mod blueprint { + /// # #[gadget_sdk::job( + /// # id = 0, + /// # params(x), + /// # result(_), + /// # event_listener( + /// # listener = TangleEventListener, + /// # event = JobCalled, + /// # ), + /// # )] + /// # pub fn xsquare(x: u64) -> Result { + /// # Ok(x.saturating_pow(2u32)) + /// # } + /// # #[gadget_sdk::job( + /// # id = 1, + /// # params(x), + /// # result(_), + /// # event_listener( + /// # listener = TangleEventListener, + /// # event = JobCalled, + /// # ), + /// # )] + /// # pub fn xsquare2(x: u64) -> Result { + /// # Ok(x.saturating_pow(2u32)) + /// # } + /// # #[gadget_sdk::job( + /// # id = 2, + /// # params(x), + /// # result(_), + /// # event_listener( + /// # listener = TangleEventListener, + /// # event = JobCalled, + /// # ), + /// # )] + /// # pub fn xsquare3(x: u64) -> Result { + /// # Ok(x.saturating_pow(2u32)) + /// # } + /// # } + /// let client = env.client().await?; + /// let signer = env.first_sr25519_signer()?; + /// let x_square = blueprint::XsquareEventHandler { + /// service_id: env.service_id.unwrap(), + /// client: client.clone(), + /// signer: signer.clone(), + /// }; /// - /// MultiJobRunner::new(&env) - /// .with_job() - /// .with_default_price_targets() - /// .finish(x_square) - /// .with_job() - /// .with_default_price_targets() - /// .finish(x_square2) - /// .run() - /// .await?; + /// let x_square2 = blueprint::XsquareEventHandler { + /// service_id: env.service_id.unwrap(), + /// client: client.clone(), + /// signer: signer.clone(), + /// }; + /// + /// let custom_price_targets = services::PriceTargets { + /// cpu: 5, + /// mem: 10, + /// storage_hdd: 15, + /// storage_ssd: 20, + /// storage_nvme: 25, + /// }; + /// + /// let x_square3 = blueprint::XsquareEventHandler { + /// service_id: env.service_id.unwrap(), + /// client: client.clone(), + /// signer: signer.clone(), + /// }; + /// + /// MultiJobRunner::new(env) + /// .job(x_square) + /// // With custom price targets + /// .job((x_square2, custom_price_targets)) + /// // With custom registration + /// .job(JobBuilder::new(x_square3).registration(1, my_registration)) + /// .run() + /// .await?; + /// # Ok(()) } + /// + /// async fn my_registration(foo: u32) -> Result<(), gadget_sdk::Error> { + /// // ... + /// Ok(()) /// } /// ``` - pub fn with_job(self) -> JobBuilder<'a, K> { - JobBuilder { - register_call: None, - runner: self, - _pd: Default::default(), + pub fn job(&mut self, job: J) -> &mut Self + where + J: Into>, + T: InitializableEventHandler + Send + 'a, + { + let JobBuilder { + event_handler, + price_targets, + mut registration, + } = job.into(); + + // Skip registration if in test mode + let skip_registration = self.env.as_ref().map(|r| r.test_mode).unwrap_or(true); + if skip_registration { + registration = None; + } + + if !skip_registration && registration.is_none() { + let env = self + .env + .clone() + .expect("Must have an env when using tangle"); + + let price_targets = price_targets.unwrap_or(PriceTargets::default()); + + let future = tangle_registration((env, price_targets)); + registration = Some(Box::pin(future)); } + + let task = Box::pin(async move { + if let Some(registration) = registration { + if let Err(err) = registration.await { + crate::error!("Failed to register job: {err:?}"); + return None; + } + } + + event_handler.init_event_handler().await + }); + + self.enqueued_job_runners.push(task); + self } + /// Start the job runner + /// + /// # Errors + /// + /// * No jobs are registered + /// * A job exits prematurely + /// + /// # Examples + /// + /// ```rust,no_run + /// use gadget_sdk::job_runner::MultiJobRunner; + /// + /// # #[gadget_sdk::main(env)] + /// # async fn main() { + /// # mod blueprint { + /// # #[gadget_sdk::job( + /// # id = 0, + /// # params(x), + /// # result(_), + /// # event_listener( + /// # listener = TangleEventListener, + /// # event = JobCalled, + /// # ), + /// # )] + /// # pub fn xsquare(x: u64) -> Result { + /// # Ok(x.saturating_pow(2u32)) + /// # } + /// # } + /// let client = env.client().await?; + /// let signer = env.first_sr25519_signer()?; + /// let x_square = blueprint::XsquareEventHandler { + /// service_id: env.service_id.unwrap(), + /// client: client.clone(), + /// signer, + /// }; + /// + /// MultiJobRunner::new(env).job(x_square).run().await?; + /// # Ok(()) } + /// ``` pub async fn run(&mut self) -> Result<(), crate::Error> { if self.enqueued_job_runners.is_empty() { - return Err(crate::Error::Other( - "No jobs registered. Make sure to add a job with `MultiJobRunner::add_job` " - .to_string(), - )); + return Err(Error::NoJobs.into()); } let mut futures = Vec::new(); @@ -168,23 +341,15 @@ impl<'a> MultiJobRunner<'a> { // is None, return an error stating that the job already initialized let mut ordered_futures = Vec::new(); for receiver in receivers { - let receiver = receiver - .ok_or_else(|| crate::Error::Other("Job already initialized".to_string()))?; + let receiver = receiver.ok_or(Error::AlreadyInitialized)?; ordered_futures.push(receiver); } - let res = futures::future::select_all(ordered_futures).await; - let job_n = res.1; - let err = res - .0 - .map_err(|err| crate::Error::Other(err.to_string())) - .map_err(|_err| { - crate::Error::Other(format!("Job {job_n} exited prematurely (channel dropped)")) - })?; - - Err(crate::Error::Other(format!( - "Job {job_n} exited prematurely: {err:?}" - ))) + let (res, job_num, _) = futures::future::select_all(ordered_futures).await; + let err = res.map_err(Error::from)?; + + crate::error!("Job {job_num} exited prematurely"); + err } } @@ -192,22 +357,15 @@ async fn tangle_registration( this: (GadgetConfiguration, PriceTargets), ) -> Result<(), crate::Error> { let (this, price_targets) = this; - let client = this - .client() - .await - .map_err(|err| crate::Error::Other(err.to_string()))?; - let signer = this - .first_sr25519_signer() - .map_err(|err| crate::Error::Other(err.to_string()))?; - let ecdsa_pair = this - .first_ecdsa_signer() - .map_err(|err| crate::Error::Other(err.to_string()))?; + let client = this.client().await?; + let signer = this.first_sr25519_signer()?; + let ecdsa_pair = this.first_ecdsa_signer()?; let xt = api::tx().services().register( this.blueprint_id, services::OperatorPreferences { key: ecdsa_pair.signer().public().0, - price_targets, + price_targets: price_targets.0, }, Default::default(), );