Skip to content

Commit

Permalink
feat(fetcher-service): respond with http headers
Browse files Browse the repository at this point in the history
  • Loading branch information
matthias-wright committed Apr 25, 2024
1 parent ae4a515 commit f6c542a
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 48 deletions.
2 changes: 1 addition & 1 deletion core/handshake/src/transports/http/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ pub async fn handler<P: ExecutorProviderInterface>(

// If the service is the javascript service. Await the first response as the possible header
// overrides and over ride the response headers
if let Service::Js = service_id {
if matches!(service_id, Service::Js | Service::Fetcher) {
let header_bytes = body_rx
.recv()
.await
Expand Down
43 changes: 1 addition & 42 deletions lib/sdk/src/header.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::HashMap;

use anyhow::{anyhow, Result};
use anyhow::Result;
use fleek_crypto::ClientPublicKey;
use serde::{Deserialize, Serialize};
use thiserror::Error;
Expand Down Expand Up @@ -74,44 +74,3 @@ pub enum WriteHeaderError {
Serialization(#[from] serde_cbor::Error),
Write(#[from] std::io::Error),
}

impl HttpResponse {
pub fn try_from_json(value: &serde_json::Value) -> Result<Self> {
if value["status"].is_null() {
return Err(anyhow!("Failed to parse status"));
}
let Some(status) = value["status"].as_str() else {
return Err(anyhow!("Failed to parse status"));
};
let status = status.parse::<u16>()?;
println!("status: {:?}", status);

if value["body"].is_null() {
return Err(anyhow!("Failed to parse body"));
}
let body = value["body"].to_string();
println!("body: {}", body);

if value["headers"].is_null() {
return Err(anyhow!("Failed to parse body"));
}
println!("array: {}", value["headers"].is_object());

todo!()
}
}

#[cfg(test)]
mod tests {
use crate::header::HttpResponse;

#[test]
fn test_json() {
let json_str = std::fs::read_to_string("/home/matthias/Desktop/ssr.txt").unwrap();
let value: serde_json::Value = serde_json::from_str(&json_str).unwrap();

let res = HttpResponse::try_from_json(&value).unwrap();

println!("{value}");
}
}
17 changes: 17 additions & 0 deletions lib/sdk/src/http_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,20 @@ pub async fn respond_to_client(

Ok(())
}

pub async fn respond_to_client_with_http_headers(
connection: &mut Connection,
is_http: bool,
) -> anyhow::Result<()> {
if is_http {
let header_bytes = serde_json::to_vec(&HttpOverrides::default())
.context("Failed to serializez headers")?;

// response with the headers first
connection
.write_payload(&header_bytes)
.await
.context("failed to send error headers")?;
}
Ok(())
}
16 changes: 11 additions & 5 deletions services/fetcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use cid::Cid;
use fn_sdk::api::Origin as ApiOrigin;
use fn_sdk::connection::Connection;
use fn_sdk::header::TransportDetail;
use fn_sdk::http_util::{respond_to_client_with_http_headers, respond_with_error};
use tracing::{debug, error, info};
use url::Url;

Expand Down Expand Up @@ -105,22 +106,23 @@ fn parse_http_url(url: &Url) -> Option<(Origin, Bytes)> {
async fn handle_request(conn: &mut Connection, origin: Origin, uri: Bytes) -> anyhow::Result<()> {
debug!("got request for cid");

let is_http = conn.is_http_request();
// Fetch the content from the origin
let hash = match origin {
Origin::Unknown => {
let _ = conn.write_payload(b"Unknown origin").await;
respond_with_error(conn, b"Unknown origin", is_http).await?;
bail!("unknown origin");
},
Origin::Blake3 => {
if uri.len() != 32 {
let _ = conn.write_payload(b"Invalid blake3 hash").await;
respond_with_error(conn, b"Invalid blake3 hash", is_http).await?;
bail!("expected a 32 byte hash");
}

// Fetch the content from the network
let hash = *array_ref!(uri, 0, 32);
if !fn_sdk::api::fetch_blake3(hash).await {
let _ = conn.write_payload(b"Failed to fetch blake3 content").await;
respond_with_error(conn, b"Failed to fetch blake3 content", is_http).await?;
bail!("failed to fetch content");
}

Expand All @@ -129,7 +131,7 @@ async fn handle_request(conn: &mut Connection, origin: Origin, uri: Bytes) -> an
origin => {
// Fetch the content from the origin
let Some(hash) = fn_sdk::api::fetch_from_origin(origin.into(), uri).await else {
let _ = conn.write_payload(b"Failed to fetch from origin").await;
respond_with_error(conn, b"Failed to fetch from origin", is_http).await?;
bail!("failed to fetch from origin");
};
hash
Expand All @@ -140,20 +142,24 @@ async fn handle_request(conn: &mut Connection, origin: Origin, uri: Bytes) -> an

// Get the content from the blockstore
let Ok(content_handle) = fn_sdk::blockstore::ContentHandle::load(&hash).await else {
// TODO(matthias): why aren't we sending an error to the handshake here?
bail!("failed to load content handle from the blockstore");
};

debug!("got content handle");

// Only write block count for non-HTTP transports.
if !conn.is_http_request() {
if !is_http {
let bytes = (content_handle.len() as u32).to_be_bytes();
if let Err(e) = conn.write_payload(bytes.as_slice()).await {
bail!("failed to send number of blocks: {e}");
}
debug!("sent block count {}", content_handle.len());
}

// Respond with header before streaming the body
respond_to_client_with_http_headers(conn, is_http).await?;

for block in 0..content_handle.len() {
let Ok(bytes) = content_handle.read(block).await else {
bail!("failed to read content from the blockstore :(");
Expand Down

0 comments on commit f6c542a

Please sign in to comment.