Skip to content

Commit

Permalink
[feat] benchmark proc-macro (#238)
Browse files Browse the repository at this point in the history
* start benchmark proc-macro

* it works with a simple output

* rename `threads` to `cores`

* Update docs

* fix clippy

* fix clippy issues

* fixing clippy

* fmt and small fixes

---------

Co-authored-by: drewstone <[email protected]>
  • Loading branch information
shekohex and drewstone authored Aug 26, 2024
1 parent 5b1842d commit 191245f
Show file tree
Hide file tree
Showing 12 changed files with 247 additions and 9 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion blueprint-manager/src/executor/event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,9 @@ pub(crate) async fn handle_tangle_event(
{
let mut services_for_this_blueprint = vec![];
if let Gadget::Native(gadget) = &blueprint.gadget {
let gadget_source = &gadget.sources.0[0];
// TODO: fix typo in soruces -> sources
// needs to update the tangle-subxt to fix the typo
let gadget_source = &gadget.soruces.0[0];
if let gadget_common::tangle_runtime::api::runtime_types::tangle_primitives::services::GadgetSourceFetcher::Github(gh) = &gadget_source.fetcher {
let metadata = github_fetcher_to_native_github_metadata(gh, blueprint.blueprint_id);
onchain_services.push(metadata);
Expand Down
3 changes: 2 additions & 1 deletion common/src/channels.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! When delivering messages to an async protocol, we want o make sure we don't mix up voting and public key gossip messages
//! When delivering messages to an async protocol, we want o make sure we don't mix up voting and public key gossip messages;
//!
//! Thus, this file contains a function that takes a channel from the gadget to the async protocol and splits it into two channels
use round_based::{Incoming, MessageDestination, MessageType, MsgId, Outgoing, PartyIndex};
use serde::de::DeserializeOwned;
Expand Down
4 changes: 3 additions & 1 deletion common/src/full_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,9 @@ where
}
}

/// Used for constructing an instance of a node. If there is both a keygen and a signing protocol, then,
/// Used for constructing an instance of a node.
///
/// If there is both a keygen and a signing protocol, then,
/// the length of the vectors are 2. The length of the vector is equal to the numbers of protocols that
/// the constructed node is going to concurrently execute
pub struct NodeInput<Env: GadgetEnvironment, N: Network<Env>, KBE: KeystoreBackend, D> {
Expand Down
7 changes: 4 additions & 3 deletions common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,10 +277,11 @@ async fn get_latest_event_from_client<Env: GadgetEnvironment>(
})
}

#[macro_export]
/// Generates a run function that returns a future that runs all the supplied protocols run concurrently
/// Also generates a setup_node function that sets up the future that runs all the protocols concurrently
/// Generates a run function that returns a future that runs all the supplied protocols run concurrently.
///
/// It also generates a setup_node function that sets up the future that runs all the protocols concurrently.
#[allow(clippy::crate_in_macro_def)]
#[macro_export]
macro_rules! generate_setup_and_run_command {
($( $config:ident ),*) => {
/// Sets up a future that runs all the protocols concurrently
Expand Down
2 changes: 2 additions & 0 deletions macros/blueprint-proc-macro-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub enum FieldType {
}

/// A Service Blueprint is a the main definition of a service.
///
/// it contains the metadata of the service, the job definitions, and other hooks, along with the
/// gadget that will be executed when one of the jobs is calling this service.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
Expand Down Expand Up @@ -222,6 +223,7 @@ pub enum Gadget<'a> {
}

/// A binary that is stored in the Github release.
///
/// this will constuct the URL to the release and download the binary.
/// The URL will be in the following format:
/// https://github.com/<owner>/<repo>/releases/download/v<tag>/<path>
Expand Down
18 changes: 17 additions & 1 deletion macros/blueprint-proc-macro-playground/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![allow(dead_code)]

use gadget_sdk::{job, registration_hook, report, request_hook};
use gadget_sdk::{benchmark, job, registration_hook, report, request_hook};

#[derive(Debug, Clone, Copy)]
pub enum Error {
Expand Down Expand Up @@ -104,6 +104,17 @@ fn report_service_health(uptime: f64, response_time: u64, error_rate: f64) -> Ve
issues.concat()
}

// ==================
// Benchmarks
// ==================
#[benchmark(job_id = 0, cores = 2)]
fn keygen_2_of_3() {
let n = 3;
let t = 2;
let result = keygen(&MyContext, n, t);
assert!(result.is_ok());
}

#[cfg(test)]
mod tests {
#[test]
Expand All @@ -112,4 +123,9 @@ mod tests {
assert_eq!(super::KEYGEN_JOB_ID, 0);
eprintln!("{}", super::REGISTRATION_HOOK);
}

#[test]
fn example_benchmark() {
super::keygen_2_of_3_benchmark();
}
}
78 changes: 78 additions & 0 deletions macros/blueprint-proc-macro/src/benchmark.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use proc_macro::TokenStream;
use quote::{format_ident, quote};
use syn::parse::{Parse, ParseStream};
use syn::{ItemFn, Token};

// Defines custom keywords
mod kw {
syn::custom_keyword!(cores);
syn::custom_keyword!(job_id);
}

/// `BenchmarkArgs` is a struct that holds the arguments for the `benchmark` macro.
pub(crate) struct BenchmarkArgs {
/// The max number of cores this benchmark should run with.
///
/// `#[benchmark(cores = 4)]`
cores: syn::LitInt,
/// The job identifier for the benchmark.
///
/// `#[benchmark(job_id = 1)]`
job_id: syn::LitInt,
}

pub(crate) fn benchmark_impl(args: &BenchmarkArgs, input: &ItemFn) -> syn::Result<TokenStream> {
let cores = &args.cores;
let job_id = &args.job_id;
let original_name = &input.sig.ident;
let name = format_ident!("{}_benchmark", original_name);
let block = &input.block;
let expanded = quote! {
#[doc(hidden)]
pub fn #name() {
let cores: usize = #cores;
let rt = gadget_sdk::benchmark::tokio::runtime::Builder::new_multi_thread()
.worker_threads(cores)
.max_blocking_threads(cores)
.enable_all()
.build()
.unwrap();
let _guard = rt.enter();
let b = gadget_sdk::benchmark::Bencher::new(cores, gadget_sdk::benchmark::TokioRuntime);
b.block_on(async move { #block });
let summary = b.stop(stringify!(#original_name), #job_id);
eprintln!("{}", summary);
return;
}
};
Ok(expanded.into())
}

impl Parse for BenchmarkArgs {
fn parse(input: ParseStream) -> syn::Result<Self> {
let mut cores = None;
let mut job_id = None;
while !input.is_empty() {
let lookahead = input.lookahead1();
if lookahead.peek(kw::cores) {
let _ = input.parse::<kw::cores>()?;
let _ = input.parse::<Token![=]>()?;
cores = Some(input.parse()?);
} else if lookahead.peek(kw::job_id) {
let _ = input.parse::<kw::job_id>()?;
let _ = input.parse::<Token![=]>()?;
job_id = Some(input.parse()?);
} else if lookahead.peek(Token![,]) {
let _ = input.parse::<Token![,]>()?;
} else {
return Err(lookahead.error());
}
}

let cores = cores.ok_or_else(|| input.error("Missing `cores` argument in attribute"))?;

let job_id = job_id.ok_or_else(|| input.error("Missing `job_id` argument in attribute"))?;

Ok(Self { cores, job_id })
}
}
27 changes: 25 additions & 2 deletions macros/blueprint-proc-macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
use proc_macro::TokenStream;
use syn::parse_macro_input;

/// Benchmarking proc-macro
mod benchmark;
/// Blueprint Hooks proc-macro
mod hooks;
/// Blueprint Job proc-macro
Expand Down Expand Up @@ -81,9 +83,9 @@ pub fn job(args: TokenStream, input: TokenStream) -> TokenStream {
}
}

/// The `report` macro is used to annotate a function as a report handler for misbehaviors.
///
/// The `report` macro is used to annotate a function as a report handler for misbehaviors. This macro generates
/// the necessary code to handle events and process reports within the service blueprint. Reports are specifically
/// This macro generates the necessary code to handle events and process reports within the service blueprint. Reports are specifically
/// for submitting incorrect job results, attributable malicious behavior, or otherwise machine failures and reliability degradation.
///
/// # Example
Expand Down Expand Up @@ -179,3 +181,24 @@ pub fn request_hook(args: TokenStream, input: TokenStream) -> TokenStream {
Err(err) => err.to_compile_error().into(),
}
}

/// A procedural macro that annotates a function as a benchmark hook, mainly used
/// during the benchmarking phase.
///
/// # Example
/// ```rust,no_run
/// # use gadget_blueprint_proc_macro::benchmark;
/// #[benchmark(job_id = 1, cores = 4)]
/// pub fn my_job() {
/// // call your job with the necessary parameters
/// }
/// ```
#[proc_macro_attribute]
pub fn benchmark(args: TokenStream, input: TokenStream) -> TokenStream {
let input = parse_macro_input!(input as syn::ItemFn);
let args = parse_macro_input!(args as benchmark::BenchmarkArgs);
match benchmark::benchmark_impl(&args, &input) {
Ok(tokens) => tokens,
Err(err) => err.to_compile_error().into(),
}
}
5 changes: 5 additions & 0 deletions sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ serde_json = { workspace = true }
sp-core = { workspace = true, features = ["full_crypto"] }
sp-io = { workspace = true }


# Event Watchers and Handlers
backoff = { workspace = true }
subxt = { workspace = true }
Expand All @@ -67,6 +68,9 @@ alloy-transport = { workspace = true }
gadget-blueprint-proc-macro = { workspace = true }
derive_more = { workspace = true }

# Benchmarking deps
sysinfo = { workspace = true }

[target.'cfg(not(target_family = "wasm"))'.dependencies.libp2p]
workspace = true
features = [
Expand Down Expand Up @@ -97,6 +101,7 @@ hyper = { workspace = true, features = ["client"] }

[features]
default = ["std"]

std = [
"getrandom",
"hex/std",
Expand Down
105 changes: 105 additions & 0 deletions sdk/src/benchmark/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
pub use tokio;
/// The runtime trait that all runtimes must implement.
pub trait Runtime {
/// Runs the given future to completion on the runtime.
fn block_on<F>(&self, future: F) -> F::Output
where
F: std::future::Future;
}

#[derive(Debug, Clone, Copy)]
pub struct TokioRuntime;

impl Runtime for TokioRuntime {
fn block_on<F>(&self, future: F) -> F::Output
where
F: std::future::Future,
{
let rt = tokio::runtime::Handle::current();
rt.block_on(future)
}
}

/// A benchmarking harness.
#[derive(Debug)]
pub struct Bencher<R> {
/// The runtime to use for running benchmarks.
runtime: R,
/// The time at which the benchmark started.
started_at: std::time::Instant,
/// The max number of cores for this benchmark.
cores: usize,
}

#[derive(Debug, Clone)]
pub struct BenchmarkSummary {
/// The name of the benchmark.
pub name: String,
/// The job identifier.
pub job_id: u8,
/// The duration of the benchmark.
pub elapsed: std::time::Duration,
/// The number of cores the benchmark was run with.
pub cores: usize,
/// The amount of memory used by the benchmark (in bytes).
pub ram_usage: u64,
}

impl<R: Runtime> Bencher<R> {
pub fn new(threads: usize, runtime: R) -> Self {
Self {
runtime,
started_at: std::time::Instant::now(),
cores: threads,
}
}

/// Runs the given future on the runtime.
pub fn block_on<F>(&self, future: F) -> F::Output
where
F: std::future::Future,
{
self.runtime.block_on(future)
}

/// Stops the benchmark and returns a summary of the benchmark.
pub fn stop(&self, name: &str, job_id: u8) -> BenchmarkSummary {
let pid = sysinfo::get_current_pid().expect("Failed to get current process ID");
let s = sysinfo::System::new_all();
let process = s
.process(pid)
.expect("Failed to get current process from the system");
let ram_usage = process.memory();
BenchmarkSummary {
name: name.to_string(),
job_id,
elapsed: self.started_at.elapsed(),
cores: self.cores,
ram_usage,
}
}
}

impl std::fmt::Display for BenchmarkSummary {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
const KB: f32 = 1024.00;
const MB: f32 = 1024.00 * KB;
const GB: f32 = 1024.00 * MB;
let ram_usage = self.ram_usage as f32;
let (ram_usage, unit) = if ram_usage < KB {
(ram_usage, "B")
} else if ram_usage < MB {
(ram_usage / KB, "KB")
} else if ram_usage < GB {
(ram_usage / MB, "MB")
} else {
(ram_usage / GB, "GB")
};

write!(
f,
"Benchmark: {}\nJob ID: {}\nElapsed: {:?}\nvCPU: {}\nRAM Usage: {ram_usage:.2} {unit}\n",
self.name, self.job_id, self.elapsed, self.cores,
)
}
}
2 changes: 2 additions & 0 deletions sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,6 @@ pub mod network;

pub mod slashing;

pub mod benchmark;

pub use gadget_blueprint_proc_macro::*;

0 comments on commit 191245f

Please sign in to comment.