Skip to content

Commit

Permalink
feat: daemon testing (#303)
Browse files Browse the repository at this point in the history
* first commit

* first commit

* first commit

* resolve image fetch from docker compose
  • Loading branch information
jorgeantonio21 authored Dec 26, 2024
1 parent 05544e4 commit ec36862
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 50 deletions.
4 changes: 2 additions & 2 deletions atoma-daemon/docs/openapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -876,7 +876,7 @@ components:
- task_small_id
- selected_node_id
- num_compute_units
- price
- price_per_one_million_compute_units
- already_computed_units
- in_settle_period
- total_hash
Expand All @@ -900,7 +900,7 @@ components:
owner_address:
type: string
description: Address of the owner of the stack
price:
price_per_one_million_compute_units:
type: integer
format: int64
description: Price of the stack (likely in smallest currency unit)
Expand Down
89 changes: 44 additions & 45 deletions atoma-service/src/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ pub async fn signature_verification_middleware(
next: Next,
) -> Result<Response, StatusCode> {
let (mut req_parts, req_body) = req.into_parts();
tracing::info!("Request parts: {:?}", req_parts);

let base64_signature = req_parts
.headers
.get(atoma_utils::constants::SIGNATURE)
Expand All @@ -243,22 +243,20 @@ pub async fn signature_verification_middleware(
})?;
let body_bytes = axum::body::to_bytes(req_body, MAX_BODY_SIZE)
.await
.map_err(|_| {
error!("Failed to convert body to bytes");
.map_err(|e| {
error!("Failed to convert body to bytes, with error: {e}");
StatusCode::BAD_REQUEST
})?;
let body_json: Value = serde_json::from_slice(&body_bytes).map_err(|_| {
error!("Failed to parse body as JSON");
let body_json: Value = serde_json::from_slice(&body_bytes).map_err(|e| {
error!("Failed to parse body as JSON, with error: {e}");
StatusCode::BAD_REQUEST
})?;
let body_blake2b_hash = blake2b_hash(body_json.to_string().as_bytes());
let body_blake2b_hash_bytes: [u8; 32] = body_blake2b_hash
.as_slice()
.try_into()
.expect("Invalid Blake2b hash length");

verify_signature(base64_signature, &body_blake2b_hash_bytes)?;

let request_metadata = req_parts
.extensions
.get::<RequestMetadata>()
Expand Down Expand Up @@ -343,12 +341,12 @@ pub async fn verify_stack_permissions(
StatusCode::BAD_REQUEST
})?
.to_str()
.map_err(|_| {
error!("Failed to convert signature to string");
.map_err(|e| {
error!("Failed to convert signature to string, with error: {e}");
StatusCode::BAD_REQUEST
})?;
let signature = Signature::from_str(base64_signature).map_err(|_| {
error!("Failed to parse signature");
let signature = Signature::from_str(base64_signature).map_err(|e| {
error!("Failed to parse signature, with error: {e}");
StatusCode::BAD_REQUEST
})?;
let public_key_bytes = signature.public_key_bytes();
Expand All @@ -367,23 +365,23 @@ pub async fn verify_stack_permissions(
})?;
let stack_small_id = stack_small_id
.to_str()
.map_err(|_| {
error!("Stack small ID cannot be converted to a string");
.map_err(|e| {
error!("Stack small ID cannot be converted to a string, with error: {e}");
StatusCode::BAD_REQUEST
})?
.parse::<i64>()
.map_err(|_| {
error!("Stack small ID is not a valid integer");
.map_err(|e| {
error!("Stack small ID is not a valid integer, with error: {e}");
StatusCode::BAD_REQUEST
})?;
let body_bytes = axum::body::to_bytes(req_body, MAX_BODY_SIZE)
.await
.map_err(|_| {
error!("Failed to convert body to bytes");
.map_err(|e| {
error!("Failed to convert body to bytes, with error: {e}");
StatusCode::BAD_REQUEST
})?;
let body_json: Value = serde_json::from_slice(&body_bytes).map_err(|_| {
error!("Failed to parse body as JSON");
let body_json: Value = serde_json::from_slice(&body_bytes).map_err(|e| {
error!("Failed to parse body as JSON, with error: {e}");
StatusCode::BAD_REQUEST
})?;
let model = body_json
Expand Down Expand Up @@ -422,15 +420,12 @@ pub async fn verify_stack_permissions(
})?;
let available_stack = result_receiver
.await
.map_err(|_| {
error!("Failed to get available stack with enough compute units");
.map_err(|e| {
error!("Failed to get available stack with enough compute units, with error: {e}");
StatusCode::UNAUTHORIZED
})?
.map_err(|err| {
error!(
"Failed to get available stack with enough compute units: {}",
err
);
error!("Failed to get available stack with enough compute units, with error: {err}");
StatusCode::UNAUTHORIZED
})?;
if available_stack.is_none() {
Expand All @@ -442,8 +437,8 @@ pub async fn verify_stack_permissions(
StatusCode::BAD_REQUEST
})?
.to_str()
.map_err(|_| {
error!("Tx digest cannot be converted to a string");
.map_err(|e| {
error!("Tx digest cannot be converted to a string, with error: {e}");
StatusCode::BAD_REQUEST
})?;
let tx_digest = TransactionDigest::from_str(tx_digest_str).unwrap();
Expand Down Expand Up @@ -532,12 +527,12 @@ pub async fn confidential_compute_middleware(
error!("Salt header not found");
StatusCode::BAD_REQUEST
})?;
let salt_str = salt.to_str().map_err(|_| {
error!("Salt cannot be converted to a string");
let salt_str = salt.to_str().map_err(|e| {
error!("Salt cannot be converted to a string, with error: {e}");
StatusCode::BAD_REQUEST
})?;
let salt_bytes = STANDARD.decode(salt_str).map_err(|_| {
error!("Failed to decode salt from base64 encoding");
let salt_bytes = STANDARD.decode(salt_str).map_err(|e| {
error!("Failed to decode salt from base64 encoding, with error: {e}");
StatusCode::BAD_REQUEST
})?;
let salt_bytes: [u8; SALT_SIZE] = salt_bytes.try_into().map_err(|e| {
Expand All @@ -554,12 +549,12 @@ pub async fn confidential_compute_middleware(
error!("Nonce header not found");
StatusCode::BAD_REQUEST
})?;
let nonce_str = nonce.to_str().map_err(|_| {
error!("Nonce cannot be converted to a string");
let nonce_str = nonce.to_str().map_err(|e| {
error!("Nonce cannot be converted to a string, with error: {e}");
StatusCode::BAD_REQUEST
})?;
let nonce_bytes = STANDARD.decode(nonce_str).map_err(|_| {
error!("Failed to decode nonce from base64 encoding");
let nonce_bytes = STANDARD.decode(nonce_str).map_err(|e| {
error!("Failed to decode nonce from base64 encoding, with error: {e}");
StatusCode::BAD_REQUEST
})?;
let nonce_bytes: [u8; NONCE_SIZE] = nonce_bytes.try_into().map_err(|e| {
Expand All @@ -576,8 +571,10 @@ pub async fn confidential_compute_middleware(
error!("Diffie-Hellman public key header not found");
StatusCode::BAD_REQUEST
})?;
let proxy_x25519_public_key_bytes: [u8; DH_PUBLIC_KEY_SIZE] = STANDARD.decode(proxy_x25519_public_key).map_err(|_| {
error!("Failed to decode Proxy X25519 public key from base64 encoding");
let proxy_x25519_public_key_bytes: [u8; DH_PUBLIC_KEY_SIZE] = STANDARD
.decode(proxy_x25519_public_key)
.map_err(|e| {
error!("Failed to decode Proxy X25519 public key from base64 encoding, with error: {e}");
StatusCode::BAD_REQUEST
})?
.try_into()
Expand All @@ -592,8 +589,10 @@ pub async fn confidential_compute_middleware(
error!("Node X25519 public key header not found");
StatusCode::BAD_REQUEST
})?;
let node_x25519_public_key_bytes: [u8; DH_PUBLIC_KEY_SIZE] = STANDARD.decode(node_x25519_public_key).map_err(|_| {
error!("Failed to decode Node X25519 public key from base64 encoding");
let node_x25519_public_key_bytes: [u8; DH_PUBLIC_KEY_SIZE] = STANDARD
.decode(node_x25519_public_key)
.map_err(|e| {
error!("Failed to decode Node X25519 public key from base64 encoding, with error: {e}");
StatusCode::BAD_REQUEST
})?
.try_into()
Expand All @@ -603,8 +602,8 @@ pub async fn confidential_compute_middleware(
})?;
let body_bytes = axum::body::to_bytes(req_body, 1024 * MAX_BODY_SIZE)
.await
.map_err(|_| {
error!("Failed to convert body to bytes");
.map_err(|e| {
error!("Failed to convert body to bytes, with error: {e}");
StatusCode::BAD_REQUEST
})?;
// Convert body bytes to string since it was created with .to_string() on the client
Expand All @@ -613,8 +612,8 @@ pub async fn confidential_compute_middleware(
StatusCode::BAD_REQUEST
})?;

let body_json: Value = serde_json::from_str(&body_str).map_err(|_| {
error!("Failed to parse body as JSON");
let body_json: Value = serde_json::from_str(&body_str).map_err(|e| {
error!("Failed to parse body as JSON, with error: {e}");
StatusCode::BAD_REQUEST
})?;
let ciphertext_bytes = parse_json_byte_array(&body_json, atoma_utils::constants::CIPHERTEXT)
Expand All @@ -634,8 +633,8 @@ pub async fn confidential_compute_middleware(
error!("Failed to send confidential compute request");
StatusCode::INTERNAL_SERVER_ERROR
})?;
let result = result_receiver.await.map_err(|_| {
error!("Failed to receive confidential compute response");
let result = result_receiver.await.map_err(|e| {
error!("Failed to receive confidential compute response, with error: {e}");
StatusCode::INTERNAL_SERVER_ERROR
})?;
match result {
Expand Down
23 changes: 22 additions & 1 deletion atoma-service/src/proxy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ pub async fn register_on_proxy(
) -> anyhow::Result<()> {
let client = Client::new();
let url = format!("{}/node/registration", config.proxy_address);
tracing::info!(
target = "atoma-service",
event = "register_on_proxy",
url = url,
"Registering on proxy server"
);

let body = json!({
"node_small_id": node_small_id,
Expand All @@ -38,9 +44,24 @@ pub async fn register_on_proxy(
.header(SIGNATURE, signature)
.json(&body)
.send()
.await?;
.await
.map_err(|e| {
tracing::error!(
target = "atoma-service",
event = "register_on_proxy_error",
error = ?e,
"Failed to register on proxy server"
);
anyhow::anyhow!("Failed to register on proxy server: {}", e)
})?;

if !res.status().is_success() {
tracing::error!(
target = "atoma-service",
event = "register_on_proxy_error",
error = ?res.status(),
"Failed to register on proxy server"
);
anyhow::bail!("Failed to register on proxy server: {}", res.status());
}
Ok(())
Expand Down
4 changes: 2 additions & 2 deletions atoma-service/src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,8 @@ impl Stream for Streamer {
return Poll::Ready(None);
}
let mut chunk = serde_json::from_str::<Value>(chunk_str).map_err(|e| {
error!("Error parsing chunk: {}", e);
Error::new(format!("Error parsing chunk: {}", e))
error!("Error parsing chunk {chunk_str}: {}", e);
Error::new(format!("Error parsing chunk {chunk_str}: {}", e))
})?;

// Observe the first token generation timer
Expand Down

0 comments on commit ec36862

Please sign in to comment.