Skip to content

Commit

Permalink
feat(request-response): don't close connection on stream errors
Browse files Browse the repository at this point in the history
Related: #3591.

Pull-Request: #3913.
  • Loading branch information
thomaseizinger authored May 15, 2023
1 parent 8c00dc9 commit 1742bff
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 20 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion examples/file-sharing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ either = "1.8"
env_logger = "0.10"
futures = "0.3.28"
libp2p = { path = "../../libp2p", features = ["async-std", "dns", "kad", "noise", "macros", "request-response", "tcp", "websocket", "yamux"] }
multiaddr = { version = "0.17.1" }
multiaddr = { version = "0.17.1" }
void = "1.0.2"
4 changes: 2 additions & 2 deletions examples/file-sharing/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use libp2p::{
multiaddr::Protocol,
noise,
request_response::{self, ProtocolSupport, RequestId, ResponseChannel},
swarm::{NetworkBehaviour, StreamUpgradeError, Swarm, SwarmBuilder, SwarmEvent},
swarm::{NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent},
tcp, yamux, PeerId, Transport,
};

Expand Down Expand Up @@ -216,7 +216,7 @@ impl EventLoop {

async fn handle_event(
&mut self,
event: SwarmEvent<ComposedEvent, Either<StreamUpgradeError<io::Error>, io::Error>>,
event: SwarmEvent<ComposedEvent, Either<void::Void, io::Error>>,
) {
match event {
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
Expand Down
5 changes: 5 additions & 0 deletions protocols/request-response/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,14 @@
These variants are no longer constructed.
See [PR 3605].

- Don't close connections if individual streams fail.
Log the error instead.
See [PR 3913].

[PR 3605]: https://github.com/libp2p/rust-libp2p/pull/3605
[PR 3715]: https://github.com/libp2p/rust-libp2p/pull/3715
[PR 3702]: https://github.com/libp2p/rust-libp2p/pull/3702
[PR 3913]: https://github.com/libp2p/rust-libp2p/pull/3913

## 0.24.1

Expand Down
2 changes: 2 additions & 0 deletions protocols/request-response/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ libp2p-swarm = { workspace = true }
libp2p-identity = { workspace = true }
rand = "0.8"
smallvec = "1.6.1"
void = "1.0.2"
log = "0.4.17"

[dev-dependencies]
async-std = { version = "1.6.2", features = ["attributes"] }
Expand Down
26 changes: 9 additions & 17 deletions protocols/request-response/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use libp2p_swarm::{
use smallvec::SmallVec;
use std::{
collections::VecDeque,
fmt, io,
fmt,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
Expand All @@ -65,8 +65,6 @@ where
substream_timeout: Duration,
/// The current connection keep-alive.
keep_alive: KeepAlive,
/// A pending fatal error that results in the connection being closed.
pending_error: Option<StreamUpgradeError<io::Error>>,
/// Queue of events to emit in `poll()`.
pending_events: VecDeque<Event<TCodec>>,
/// Outbound upgrades waiting to be emitted as an `OutboundSubstreamRequest`.
Expand Down Expand Up @@ -107,7 +105,6 @@ where
outbound: VecDeque::new(),
inbound: FuturesUnordered::new(),
pending_events: VecDeque::new(),
pending_error: None,
inbound_request_id,
}
}
Expand Down Expand Up @@ -151,21 +148,22 @@ where
self.pending_events
.push_back(Event::OutboundUnsupportedProtocols(info));
}
_ => {
// Anything else is considered a fatal error or misbehaviour of
// the remote peer and results in closing the connection.
self.pending_error = Some(error);
StreamUpgradeError::Apply(e) => {
log::debug!("outbound stream {info} failed: {e}");
}
StreamUpgradeError::Io(e) => {
log::debug!("outbound stream {info} failed: {e}");
}
}
}
fn on_listen_upgrade_error(
&mut self,
ListenUpgradeError { error, .. }: ListenUpgradeError<
ListenUpgradeError { error, info }: ListenUpgradeError<
<Self as ConnectionHandler>::InboundOpenInfo,
<Self as ConnectionHandler>::InboundProtocol,
>,
) {
self.pending_error = Some(StreamUpgradeError::Apply(error));
log::debug!("inbound stream {info} failed: {error}");
}
}

Expand Down Expand Up @@ -241,7 +239,7 @@ where
{
type FromBehaviour = RequestProtocol<TCodec>;
type ToBehaviour = Event<TCodec>;
type Error = StreamUpgradeError<io::Error>;
type Error = void::Void;
type InboundProtocol = ResponseProtocol<TCodec>;
type OutboundProtocol = RequestProtocol<TCodec>;
type OutboundOpenInfo = RequestId;
Expand Down Expand Up @@ -296,12 +294,6 @@ where
) -> Poll<
ConnectionHandlerEvent<RequestProtocol<TCodec>, RequestId, Self::ToBehaviour, Self::Error>,
> {
// Check for a pending (fatal) error.
if let Some(err) = self.pending_error.take() {
// The handler will not be polled again by the `Swarm`.
return Poll::Ready(ConnectionHandlerEvent::Close(err));
}

// Drain pending events.
if let Some(event) = self.pending_events.pop_front() {
return Poll::Ready(ConnectionHandlerEvent::Custom(event));
Expand Down

0 comments on commit 1742bff

Please sign in to comment.