Skip to content

Commit

Permalink
Merge pull request #488 from Cerebellum-Network/feature/verify-aggreg…
Browse files Browse the repository at this point in the history
…ator-resp-sig

Verify aggregator's response signature
  • Loading branch information
khssnv authored Dec 3, 2024
2 parents 7ecebe2 + d9e6d91 commit 47e61d9
Show file tree
Hide file tree
Showing 10 changed files with 307 additions and 19 deletions.
77 changes: 77 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pallets/ddc-verification/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ rand = { workspace = true, features = ["small_rng", "alloc"], default-features =
scale-info = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_with = { version = "3", default-features = false, features = ["base64", "macros"] }
sp-application-crypto = { workspace = true }
sp-core = { workspace = true }
sp-io = { workspace = true }
Expand Down
132 changes: 116 additions & 16 deletions pallets/ddc-verification/src/aggregator_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,23 @@

use ddc_primitives::{AggregatorInfo, BucketId, DdcEra};
use prost::Message;
use serde_with::{base64::Base64, serde_as};
use sp_io::offchain::timestamp;
use sp_runtime::offchain::{http, Duration};

use super::*;
use crate::signature::Verify;

pub struct AggregatorClient<'a> {
pub base_url: &'a str,
timeout: Duration,
retries: u32,
verify_sig: bool,
}

impl<'a> AggregatorClient<'a> {
pub fn new(base_url: &'a str, timeout: Duration, retries: u32) -> Self {
Self { base_url, timeout, retries }
pub fn new(base_url: &'a str, timeout: Duration, retries: u32, verify_sig: bool) -> Self {
Self { base_url, timeout, retries, verify_sig }
}

pub fn buckets_aggregates(
Expand All @@ -31,11 +34,28 @@ impl<'a> AggregatorClient<'a> {
if let Some(prev_token) = prev_token {
url = format!("{}&prevToken={}", url, prev_token);
}
if self.verify_sig {
url = format!("{}&sign=true", url);
}
let response = self.get(&url, Accept::Any)?;

let body = response.body().collect::<Vec<u8>>();
let json_response = serde_json::from_slice(&body).map_err(|_| http::Error::Unknown)?;
if self.verify_sig {
let json_response: json::SignedJsonResponse<Vec<json::BucketAggregateResponse>> =
serde_json::from_slice(&body).map_err(|_| http::Error::Unknown)?;

if !json_response.verify() {
log::debug!("bad signature, req: {:?}, resp: {:?}", url, json_response);
return Err(http::Error::Unknown); // TODO (khssnv): more specific error.
}

Ok(json_response.payload)
} else {
let json_response: Vec<json::BucketAggregateResponse> =
serde_json::from_slice(&body).map_err(|_| http::Error::Unknown)?;

Ok(json_response)
Ok(json_response)
}
}

pub fn nodes_aggregates(
Expand All @@ -51,11 +71,28 @@ impl<'a> AggregatorClient<'a> {
if let Some(prev_token) = prev_token {
url = format!("{}&prevToken={}", url, prev_token);
}
if self.verify_sig {
url = format!("{}&sign=true", url);
}
let response = self.get(&url, Accept::Any)?;

let body = response.body().collect::<Vec<u8>>();
let json_response = serde_json::from_slice(&body).map_err(|_| http::Error::Unknown)?;
if self.verify_sig {
let json_response: json::SignedJsonResponse<Vec<json::NodeAggregateResponse>> =
serde_json::from_slice(&body).map_err(|_| http::Error::Unknown)?;

if !json_response.verify() {
log::debug!("bad signature, req: {:?}, resp: {:?}", url, json_response);
return Err(http::Error::Unknown); // TODO (khssnv): more specific error.
}

Ok(json_response)
Ok(json_response.payload)
} else {
let json_response: Vec<json::NodeAggregateResponse> =
serde_json::from_slice(&body).map_err(|_| http::Error::Unknown)?;

Ok(json_response)
}
}

pub fn challenge_bucket_sub_aggregate(
Expand Down Expand Up @@ -103,12 +140,29 @@ impl<'a> AggregatorClient<'a> {
}

pub fn eras(&self) -> Result<Vec<json::AggregationEraResponse>, http::Error> {
let url = format!("{}/activity/eras", self.base_url);
let mut url = format!("{}/activity/eras", self.base_url);
if self.verify_sig {
url = format!("{}&sign=true", url);
}
let response = self.get(&url, Accept::Any)?;

let body = response.body().collect::<Vec<u8>>();
let json_response = serde_json::from_slice(&body).map_err(|_| http::Error::Unknown)?;
if self.verify_sig {
let json_response: json::SignedJsonResponse<Vec<json::AggregationEraResponse>> =
serde_json::from_slice(&body).map_err(|_| http::Error::Unknown)?;

Ok(json_response)
if !json_response.verify() {
log::debug!("bad signature, req: {:?}, resp: {:?}", url, json_response);
return Err(http::Error::Unknown); // TODO (khssnv): more specific error.
}

Ok(json_response.payload)
} else {
let json_response: Vec<json::AggregationEraResponse> =
serde_json::from_slice(&body).map_err(|_| http::Error::Unknown)?;

Ok(json_response)
}
}

pub fn traverse_bucket_sub_aggregate(
Expand All @@ -119,16 +173,33 @@ impl<'a> AggregatorClient<'a> {
merkle_tree_node_id: u32,
levels: u16,
) -> Result<json::MerkleTreeNodeResponse, http::Error> {
let url = format!(
let mut url = format!(
"{}/activity/buckets/{}/traverse?eraId={}&nodeId={}&merkleTreeNodeId={}&levels={}",
self.base_url, bucket_id, era_id, node_id, merkle_tree_node_id, levels,
);
if self.verify_sig {
url = format!("{}&sign=true", url);
}

let response = self.get(&url, Accept::Any)?;

let body = response.body().collect::<Vec<u8>>();
let json_response = serde_json::from_slice(&body).map_err(|_| http::Error::Unknown)?;
if self.verify_sig {
let json_response: json::SignedJsonResponse<json::MerkleTreeNodeResponse> =
serde_json::from_slice(&body).map_err(|_| http::Error::Unknown)?;

if !json_response.verify() {
log::debug!("bad signature, req: {:?}, resp: {:?}", url, json_response);
return Err(http::Error::Unknown); // TODO (khssnv): more specific error.
}

Ok(json_response.payload)
} else {
let json_response: json::MerkleTreeNodeResponse =
serde_json::from_slice(&body).map_err(|_| http::Error::Unknown)?;

Ok(json_response)
Ok(json_response)
}
}

pub fn traverse_node_aggregate(
Expand All @@ -138,16 +209,32 @@ impl<'a> AggregatorClient<'a> {
merkle_tree_node_id: u32,
levels: u16,
) -> Result<json::MerkleTreeNodeResponse, http::Error> {
let url = format!(
let mut url = format!(
"{}/activity/nodes/{}/traverse?eraId={}&merkleTreeNodeId={}&levels={}",
self.base_url, node_id, era_id, merkle_tree_node_id, levels,
);

if self.verify_sig {
url = format!("{}&sign=true", url);
}
let response = self.get(&url, Accept::Any)?;

let body = response.body().collect::<Vec<u8>>();
let json_response = serde_json::from_slice(&body).map_err(|_| http::Error::Unknown)?;
if self.verify_sig {
let json_response: json::SignedJsonResponse<json::MerkleTreeNodeResponse> =
serde_json::from_slice(&body).map_err(|_| http::Error::Unknown)?;

Ok(json_response)
if !json_response.verify() {
log::debug!("bad signature, req: {:?}, resp: {:?}", url, json_response);
return Err(http::Error::Unknown); // TODO (khssnv): more specific error.
}

Ok(json_response.payload)
} else {
let json_response: json::MerkleTreeNodeResponse =
serde_json::from_slice(&body).map_err(|_| http::Error::Unknown)?;

Ok(json_response)
}
}

fn merkle_tree_node_id_param(merkle_tree_node_id: &[u32]) -> String {
Expand Down Expand Up @@ -430,4 +517,17 @@ pub(crate) mod json {
pub number_of_puts: u64,
pub number_of_gets: u64,
}

/// Json response wrapped with a signature.
#[serde_as]
#[derive(
Debug, Serialize, Deserialize, Clone, Hash, Ord, PartialOrd, PartialEq, Eq, Encode, Decode,
)]
pub struct SignedJsonResponse<T> {
pub payload: T,
#[serde_as(as = "Base64")]
pub signer: Vec<u8>,
#[serde_as(as = "Base64")]
pub signature: Vec<u8>,
}
}
6 changes: 6 additions & 0 deletions pallets/ddc-verification/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ pub mod pallet {
type AccountIdConverter: From<Self::AccountId> + Into<AccountId32>;
type CustomerVisitor: CustomerVisitor<Self>;
type Currency: Currency<Self::AccountId>;
const VERIFY_AGGREGATOR_RESPONSE_SIGNATURE: bool;
#[cfg(feature = "runtime-benchmarks")]
type CustomerDepositor: CustomerDepositor<Self>;
#[cfg(feature = "runtime-benchmarks")]
Expand Down Expand Up @@ -2982,6 +2983,7 @@ pub mod pallet {
&base_url,
Duration::from_millis(RESPONSE_TIMEOUT),
3,
T::VERIFY_AGGREGATOR_RESPONSE_SIGNATURE,
);

match aggregate_key {
Expand Down Expand Up @@ -3018,6 +3020,7 @@ pub mod pallet {
&base_url,
Duration::from_millis(RESPONSE_TIMEOUT),
3,
T::VERIFY_AGGREGATOR_RESPONSE_SIGNATURE,
);

let response = match aggregate_key {
Expand Down Expand Up @@ -3050,6 +3053,7 @@ pub mod pallet {
&base_url,
Duration::from_millis(RESPONSE_TIMEOUT),
3,
T::VERIFY_AGGREGATOR_RESPONSE_SIGNATURE,
);

let response = client.eras()?;
Expand All @@ -3073,6 +3077,7 @@ pub mod pallet {
&base_url,
Duration::from_millis(RESPONSE_TIMEOUT),
3,
T::VERIFY_AGGREGATOR_RESPONSE_SIGNATURE,
);

let mut buckets_aggregates = Vec::new();
Expand Down Expand Up @@ -3115,6 +3120,7 @@ pub mod pallet {
&base_url,
Duration::from_millis(RESPONSE_TIMEOUT),
3,
T::VERIFY_AGGREGATOR_RESPONSE_SIGNATURE,
);

let mut nodes_aggregates = Vec::new();
Expand Down
Loading

0 comments on commit 47e61d9

Please sign in to comment.