From 0998ffa1b776434bb7de6e284e00f1787ce8e7c5 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Wed, 13 Nov 2024 21:17:15 -0800 Subject: [PATCH 1/4] fix: finish reading handshake on lazyConn close --- lazyClient.go | 16 +++++++++ multistream_test.go | 81 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 97 insertions(+) diff --git a/lazyClient.go b/lazyClient.go index 6145eaf..7edb71e 100644 --- a/lazyClient.go +++ b/lazyClient.go @@ -147,6 +147,22 @@ func (l *lazyClientConn[T]) Close() error { // closed the stream for reading. I mean, we're the initiator so that's // strange... but it's still allowed _ = l.Flush() + + // Finish reading the handshake before we close the connection/stream. This + // is necessary so that the other side can finish sending its response to our + // multistream header before we tell it we are done reading. + // + // Example: + // We open a QUIC stream, write the protocol `/a`, send 1 byte of application + // data, and immediately close. + // + // This can result in a single packet that contains the stream data along + // with a STOP_SENDING frame. The other side may be unable to negotiate + // multistream select since it can't write to the stream anymore and may + // drop the stream. + // + // Note: We currently handle this case in Go, but rust-libp2p does not. + l.rhandshakeOnce.Do(l.doReadHandshake) return l.con.Close() } diff --git a/multistream_test.go b/multistream_test.go index 199dc60..c3ed30d 100644 --- a/multistream_test.go +++ b/multistream_test.go @@ -9,6 +9,7 @@ import ( "net" "sort" "strings" + "sync/atomic" "testing" "time" ) @@ -801,6 +802,86 @@ func TestNegotiatePeerSendsAndCloses(t *testing.T) { } } +func newPair() (*chanPipe, *chanPipe) { + a := make(chan []byte, 16) + b := make(chan []byte, 16) + aReadClosed := atomic.Bool{} + bReadClosed := atomic.Bool{} + return &chanPipe{r: a, w: b, myReadClosed: &aReadClosed, peerReadClosed: &bReadClosed}, + &chanPipe{r: b, w: a, myReadClosed: &bReadClosed, peerReadClosed: &aReadClosed} +} + +type chanPipe struct { + r, w chan []byte + buf bytes.Buffer + + myReadClosed *atomic.Bool + peerReadClosed *atomic.Bool +} + +func (cp *chanPipe) Read(b []byte) (int, error) { + if cp.buf.Len() > 0 { + return cp.buf.Read(b) + } + + buf, ok := <-cp.r + if !ok { + return 0, io.EOF + } + + cp.buf.Write(buf) + return cp.buf.Read(b) +} + +func (cp *chanPipe) Write(b []byte) (int, error) { + if cp.peerReadClosed.Load() { + panic("peer's read side closed") + } + cp.w <- b + return len(b), nil +} + +func (cp *chanPipe) Close() error { + cp.myReadClosed.Store(true) + close(cp.w) + return nil +} + +func TestReadHandshakeOnClose(t *testing.T) { + rw1, rw2 := newPair() + + clientDone := make(chan struct{}) + go func() { + l1 := NewMSSelect(rw1, "a") + _, _ = l1.Write([]byte("hello")) + _ = l1.Close() + close(clientDone) + }() + + serverDone := make(chan error) + + server := NewMultistreamMuxer[string]() + server.AddHandler("a", func(protocol string, rwc io.ReadWriteCloser) error { + _, err := io.ReadAll(rwc) + rwc.Close() + serverDone <- err + return nil + }) + + p, h, err := server.Negotiate(rw2) + if err != nil { + t.Fatal(err) + } + + go h(p, rw2) + + err = <-serverDone + if err != nil { + t.Fatal(err) + } + <-clientDone +} + type rwc struct { *strings.Reader } From 8d297f1361ed6ec262b48d99c94c0f3ea49adabd Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Thu, 14 Nov 2024 10:58:30 -0800 Subject: [PATCH 2/4] Add link to fix PR Co-authored-by: sukun --- lazyClient.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lazyClient.go b/lazyClient.go index 7edb71e..b72647a 100644 --- a/lazyClient.go +++ b/lazyClient.go @@ -161,7 +161,7 @@ func (l *lazyClientConn[T]) Close() error { // multistream select since it can't write to the stream anymore and may // drop the stream. // - // Note: We currently handle this case in Go, but rust-libp2p does not. + // Note: We currently handle this case in Go(https://github.com/multiformats/go-multistream/pull/87), but rust-libp2p does not. l.rhandshakeOnce.Do(l.doReadHandshake) return l.con.Close() } From 60c874b4baf50e018819a2e9e218bfbae66b5c09 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Thu, 14 Nov 2024 11:05:23 -0800 Subject: [PATCH 3/4] avoid rare (incorrect?) data race error in test --- multistream_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/multistream_test.go b/multistream_test.go index c3ed30d..9828ac4 100644 --- a/multistream_test.go +++ b/multistream_test.go @@ -837,7 +837,9 @@ func (cp *chanPipe) Write(b []byte) (int, error) { if cp.peerReadClosed.Load() { panic("peer's read side closed") } - cp.w <- b + copied := make([]byte, len(b)) + copy(copied, b) + cp.w <- copied return len(b), nil } From 3149f4bf83f57e021de3d330e0244432649811b3 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Thu, 14 Nov 2024 11:44:36 -0800 Subject: [PATCH 4/4] Update comment --- lazyClient.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lazyClient.go b/lazyClient.go index b72647a..44a7d08 100644 --- a/lazyClient.go +++ b/lazyClient.go @@ -134,9 +134,7 @@ func (l *lazyClientConn[T]) Write(b []byte) (int, error) { return l.con.Write(b) } -// Close closes the underlying io.ReadWriteCloser -// -// This does not flush anything. +// Close closes the underlying io.ReadWriteCloser after finishing the handshake. func (l *lazyClientConn[T]) Close() error { // As the client, we flush the handshake on close to cover an // interesting edge-case where the server only speaks a single protocol