Skip to content
This repository has been archived by the owner on Feb 6, 2021. It is now read-only.

Propigate close event when socket closes #22

Merged
merged 1 commit into from
Jan 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/socket/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fluvio-socket"
version = "0.5.0"
version = "0.5.1"
edition = "2018"
authors = ["Fluvio Contributors <[email protected]>"]
description = "Provide TCP socket wrapper for fluvio protocol"
Expand Down
3 changes: 3 additions & 0 deletions crates/socket/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ pub enum FlvSocketError {
#[from]
source: IoError,
},
#[error("Socket closed")]
SocketClosed,

#[error("Zero-copy IO error")]
SendFileError {
#[from]
Expand Down
57 changes: 38 additions & 19 deletions crates/socket/src/multiplexing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ enum SharedSender {
/// Serial socket
Serial(SharedMsg),
/// Batch Socket
Queue(Sender<BytesMut>),
Queue(Sender<Option<BytesMut>>),
}

type Senders = Arc<Mutex<HashMap<i32, SharedSender>>>;
Expand Down Expand Up @@ -248,7 +248,7 @@ where
#[pin_project(PinnedDrop)]
pub struct AsyncResponse<R> {
#[pin]
receiver: Receiver<BytesMut>,
receiver: Receiver<Option<BytesMut>>,
header: RequestHeader,
correlation_id: i32,
data: PhantomData<R>,
Expand All @@ -258,7 +258,7 @@ pub struct AsyncResponse<R> {
impl<R> PinnedDrop for AsyncResponse<R> {
fn drop(self: Pin<&mut Self>) {
self.receiver.close();
debug!("multiplexor stream: {} closed", self.correlation_id);
debug!("multiplexer stream: {} closed", self.correlation_id);
}
}

Expand All @@ -274,17 +274,24 @@ impl<R: Request> Stream for AsyncResponse<R> {
)]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let next = match this.receiver.poll_next(cx) {
let next: Option<Option<_>> = match this.receiver.poll_next(cx) {
Poll::Pending => {
trace!("Waiting for async response");
return Poll::Pending;
}
Poll::Ready(next) => next,
};

let bytes = match next {
Some(bytes) => bytes,
None => return Poll::Ready(None),
let bytes = if let Some(bytes) = next {
bytes
} else {
return Poll::Ready(None);
};

let bytes = if let Some(bytes) = bytes {
bytes
} else {
return Poll::Ready(Some(Err(FlvSocketError::SocketClosed)));
};

let mut cursor = Cursor::new(&bytes);
Expand Down Expand Up @@ -350,6 +357,16 @@ impl MultiPlexingResponseDispatcher {
}
} else {
debug!("dispatcher: inner stream has terminated ");

let guard = self.senders.lock().await;
for sender in guard.values() {
match sender {
SharedSender::Serial(_) => {},
SharedSender::Queue(stream_sender) => {
let _ = stream_sender.send(None).await;
}
}
}
break;
}
},
Expand All @@ -367,7 +384,7 @@ impl MultiPlexingResponseDispatcher {
}
}

debug!("multiplexor terminated");
debug!("multiplexer terminated");
break;

}
Expand Down Expand Up @@ -400,16 +417,18 @@ impl MultiPlexingResponseDispatcher {
.into()),
}
}
SharedSender::Queue(queue_sender) => queue_sender.send(msg).await.map_err(|err| {
IoError::new(
ErrorKind::BrokenPipe,
format!(
"problem sending to queue socket: {}, err: {}",
correlation_id, err
),
)
.into()
}),
SharedSender::Queue(queue_sender) => {
queue_sender.send(Some(msg)).await.map_err(|err| {
IoError::new(
ErrorKind::BrokenPipe,
format!(
"problem sending to queue socket: {}, err: {}",
correlation_id, err
),
)
.into()
})
}
}
} else {
Err(IoError::new(
Expand Down Expand Up @@ -810,7 +829,7 @@ mod tests {
#[test_async]
async fn test_multiplexing_native_tls() -> Result<(), FlvSocketError> {
debug!("start testing");
let addr = "127.0.0.1:6000";
let addr = "127.0.0.1:6001";

let _r = join(
test_client(addr, TlsConnectorHandler::new()),
Expand Down