Skip to content

Commit

Permalink
fix: reuse buffered write duplex transport
Browse files Browse the repository at this point in the history
  • Loading branch information
ii64 committed Dec 25, 2023
1 parent 54b7a1e commit 7466afc
Showing 1 changed file with 56 additions and 42 deletions.
98 changes: 56 additions & 42 deletions volo/src/net/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ impl HttpMeta {
inner: self.is_flushed.clone(),
}
}
pub fn reset_flushed(&self) {
*self.is_flushed.lock().unwrap() = false;
}
}

#[pin_project]
Expand Down Expand Up @@ -196,58 +199,69 @@ impl HttpTransport {
.with_meta(HttpMeta::default()
.with_address(addr));

let meta = cs.meta.clone();
let client = self.build_client();
let headers = self.get_headers();
let meta = cs.meta.clone();

tokio::spawn(async move {
let mut payload = Vec::with_capacity(WINDOW_SIZE);
let url = meta.get_url().unwrap();

meta.wait_flushed().await;
match sc.read_buf(&mut payload).await {
Ok(siz) => {
if siz == 0 {
eprintln!("got transport error EOF");
return;
}
// println!("rpc_payload = {:?}", payload);
// println!("headers = {:?}", headers);
let mut req = client.post(url.to_string());
if let Some(headers) = headers {
let headers = headers
.iter()
.map(|(key, val)| (key.to_string(), val.to_str().unwrap().to_string()))
.collect::<HashMap<String, String>>();

let headers: reqwest::header::HeaderMap = (&headers)
.try_into()
.expect("valid headers");
req = req.headers(headers);
}
let req = req.body(reqwest::Body::from(payload))
.build()
.unwrap()
;
let resp = client.execute(req).await;
match resp {
Ok(mut resp) => {
while let Ok(Some(chunk)) = resp.chunk().await {
if let Err(e) = sc.write_all(&chunk).await {
eprintln!("got transport error response download {:#?}", e);
// let mut i = 0;
loop {
// println!("reqnr {i}"); i += 1;
let mut payload = Vec::with_capacity(WINDOW_SIZE);
meta.wait_flushed().await;
meta.reset_flushed();
match sc.read_buf(&mut payload).await {
Ok(siz) => {
if siz == 0 {
eprintln!("got transport error EOF");
return;
}
let mut req = client.post(url.to_string());
if let Some(headers) = &headers {
let headers = headers
.iter()
.map(|(key, val)| (key.to_string(), val.to_str().unwrap().to_string()))
.collect::<HashMap<String, String>>();

let headers: reqwest::header::HeaderMap = (&headers)
.try_into()
.expect("valid headers");
req = req.headers(headers);
}
// println!("rpc_payload = {:?}", payload);
// println!("headers = {:?}", headers);
let req = req.body(reqwest::Body::from(payload))
.build()
.unwrap()
;
let resp = client.execute(req).await;
match resp {
Ok(mut resp) => {
if resp.status() != reqwest::StatusCode::OK {
eprintln!("got transport response not OK");
return;
}
// check for content-type?

while let Ok(Some(chunk)) = resp.chunk().await {
if let Err(e) = sc.write(&chunk).await {
eprintln!("got transport error response download {:#?}", e);
return;
}
}
sc.flush().await.unwrap();
}
Err(e) => {
eprintln!("got transport error transmit {:#?}", e);
return;
}
}
Err(e) => {
eprintln!("got transport error transmit {:#?}", e);
return;
}
}
}
Err(e) => {
eprintln!("got transport error {:#?}", e);
return;
Err(e) => {
eprintln!("got transport error {:#?}", e);
return;
}
}
}
});
Expand Down

0 comments on commit 7466afc

Please sign in to comment.