Skip to content

Commit

Permalink
fix: boxed stream body size hint
Browse files Browse the repository at this point in the history
  • Loading branch information
fundon committed Dec 18, 2023
1 parent 7465bff commit e0084af
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 16 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
.cargo
target
Cargo.lock

rustc-ice-*.txt
8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,13 @@ hyper = { version = "1", features = ["server"] }
hyper-util = { version = "0.1", features = ["server-auto", "tokio"] }

futures-util = "0.3"
rustls-pemfile = "1.0"
tokio = { version = "1.35", features = ["net"] }
tokio-tungstenite = "0.21"
tokio-native-tls = "0.3"
tokio-rustls = "0.24"
tokio-stream = "0.1"
tokio-tungstenite = "0.21"
tokio-util = "0.7"
rustls-pemfile = "1.0"
tokio-rustls = "0.24"
tokio-native-tls = "0.3"

anyhow = "1.0"
mime = "0.3"
Expand Down
5 changes: 3 additions & 2 deletions viz-core/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl Body for OutgoingBody {
match self.get_mut() {
Self::Empty => Poll::Ready(None),
Self::Full(full) => Pin::new(full).poll_frame(cx).map_err(Error::from),
Self::Boxed(body) => Pin::new(body.get_mut()).poll_frame(cx),
Self::Boxed(body) => Pin::new(body).get_pin_mut().poll_frame(cx),
}
}

Expand Down Expand Up @@ -198,7 +198,8 @@ impl Stream for OutgoingBody {
Self::Full(full) => Pin::new(full)
.poll_frame(cx)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?,
Self::Boxed(wrapper) => Pin::new(wrapper.get_mut())
Self::Boxed(wrapper) => Pin::new(wrapper)
.get_pin_mut()
.poll_frame(cx)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?,
} {
Expand Down
22 changes: 12 additions & 10 deletions viz-test/tests/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ async fn outgoing_body() -> Result<()> {
assert!(empty.frame().await.is_none());
assert!(empty.frame().await.is_none());

let mut full_none = OutgoingBody::Full(Full::new(Bytes::new()));
let mut full_none = OutgoingBody::from(Full::new(Bytes::new()));
assert!(full_none.is_end_stream());
let size_hint = full_none.size_hint();
assert_eq!(size_hint.lower(), 0);
Expand All @@ -154,7 +154,7 @@ async fn outgoing_body() -> Result<()> {
assert!(full_none.frame().await.is_none());
assert!(full_none.frame().await.is_none());

let mut full_some = OutgoingBody::<Bytes>::Full(Full::new(Bytes::from(vec![1, 0, 2, 4])));
let mut full_some = OutgoingBody::from(Full::new(Bytes::from(vec![1, 0, 2, 4])));
assert!(!full_some.is_end_stream());
let size_hint = full_some.size_hint();
assert_eq!(size_hint.lower(), 4);
Expand All @@ -178,19 +178,21 @@ async fn outgoing_body() -> Result<()> {

let mut boxed: OutgoingBody =
UnsyncBoxBody::new(Full::new(Bytes::new()).map_err(Into::into)).into();
assert!(boxed.is_end_stream());
assert_eq!(boxed.is_end_stream(), false);
// boxed stream uses default size
let size_hint = boxed.size_hint();
assert_eq!(size_hint.lower(), 0);
assert_eq!(size_hint.upper(), Some(0));
assert_eq!(size_hint.upper(), None);
assert_eq!(&format!("{boxed:?}"), r"Boxed(SyncWrapper)");
assert!(boxed.frame().await.is_none());

let mut boxed: OutgoingBody =
UnsyncBoxBody::new(Full::new(Bytes::from(vec![2, 0, 4, 8])).map_err(Into::into)).into();
assert!(!boxed.is_end_stream());
// boxed stream uses default size
let size_hint = boxed.size_hint();
assert_eq!(size_hint.lower(), 4);
assert_eq!(size_hint.upper(), Some(4));
assert_eq!(size_hint.lower(), 0);
assert_eq!(size_hint.upper(), None);
assert_eq!(&format!("{boxed:?}"), r"Boxed(SyncWrapper)");
assert_eq!(
boxed
Expand Down Expand Up @@ -219,7 +221,7 @@ async fn outgoing_stream() -> Result<()> {
reader.read_to_end(&mut buf).await?;
assert!(buf.is_empty());

let full_none = OutgoingBody::Full(Full::new(Bytes::new()));
let full_none = OutgoingBody::from(Full::new(Bytes::new()));
assert_eq!(full_none.size_hint(), (0, Some(0)));
let mut reader = full_none.into_async_read();
let mut buf = Vec::new();
Expand All @@ -234,17 +236,17 @@ async fn outgoing_stream() -> Result<()> {

let boxed: OutgoingBody =
UnsyncBoxBody::new(Full::new(Bytes::new()).map_err(Into::into)).into();
assert_eq!(boxed.size_hint(), (0, Some(0)));
assert_eq!(boxed.size_hint(), (0, None));
let mut reader = boxed.into_async_read();
let mut buf = Vec::new();
reader.read_to_end(&mut buf).await?;
assert!(buf.is_empty());

let mut boxed: OutgoingBody =
UnsyncBoxBody::new(Full::new(Bytes::from(vec![2, 0, 4, 8])).map_err(Into::into)).into();
assert_eq!(boxed.size_hint(), (4, Some(4)));
assert_eq!(boxed.size_hint(), (0, None));
assert_eq!(boxed.next().await.unwrap().unwrap(), vec![2, 0, 4, 8]);
assert_eq!(boxed.size_hint(), (0, Some(0)));
assert_eq!(boxed.size_hint(), (0, None));
assert!(boxed.next().await.is_none());

Ok(())
Expand Down

0 comments on commit e0084af

Please sign in to comment.