From 5d68ffdde9fbe3e431b02e6b9e0a861cb8f34bfa Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Wed, 13 Nov 2024 21:17:15 -0800 Subject: [PATCH] fix: finish reading handshake on lazyConn close --- lazyClient.go | 20 +++++++++-- multistream_test.go | 83 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 100 insertions(+), 3 deletions(-) diff --git a/lazyClient.go b/lazyClient.go index 6145eaf..44a7d08 100644 --- a/lazyClient.go +++ b/lazyClient.go @@ -134,9 +134,7 @@ func (l *lazyClientConn[T]) Write(b []byte) (int, error) { return l.con.Write(b) } -// Close closes the underlying io.ReadWriteCloser -// -// This does not flush anything. +// Close closes the underlying io.ReadWriteCloser after finishing the handshake. func (l *lazyClientConn[T]) Close() error { // As the client, we flush the handshake on close to cover an // interesting edge-case where the server only speaks a single protocol @@ -147,6 +145,22 @@ func (l *lazyClientConn[T]) Close() error { // closed the stream for reading. I mean, we're the initiator so that's // strange... but it's still allowed _ = l.Flush() + + // Finish reading the handshake before we close the connection/stream. This + // is necessary so that the other side can finish sending its response to our + // multistream header before we tell it we are done reading. + // + // Example: + // We open a QUIC stream, write the protocol `/a`, send 1 byte of application + // data, and immediately close. + // + // This can result in a single packet that contains the stream data along + // with a STOP_SENDING frame. The other side may be unable to negotiate + // multistream select since it can't write to the stream anymore and may + // drop the stream. + // + // Note: We currently handle this case in Go(https://github.com/multiformats/go-multistream/pull/87), but rust-libp2p does not. + l.rhandshakeOnce.Do(l.doReadHandshake) return l.con.Close() } diff --git a/multistream_test.go b/multistream_test.go index 199dc60..9828ac4 100644 --- a/multistream_test.go +++ b/multistream_test.go @@ -9,6 +9,7 @@ import ( "net" "sort" "strings" + "sync/atomic" "testing" "time" ) @@ -801,6 +802,88 @@ func TestNegotiatePeerSendsAndCloses(t *testing.T) { } } +func newPair() (*chanPipe, *chanPipe) { + a := make(chan []byte, 16) + b := make(chan []byte, 16) + aReadClosed := atomic.Bool{} + bReadClosed := atomic.Bool{} + return &chanPipe{r: a, w: b, myReadClosed: &aReadClosed, peerReadClosed: &bReadClosed}, + &chanPipe{r: b, w: a, myReadClosed: &bReadClosed, peerReadClosed: &aReadClosed} +} + +type chanPipe struct { + r, w chan []byte + buf bytes.Buffer + + myReadClosed *atomic.Bool + peerReadClosed *atomic.Bool +} + +func (cp *chanPipe) Read(b []byte) (int, error) { + if cp.buf.Len() > 0 { + return cp.buf.Read(b) + } + + buf, ok := <-cp.r + if !ok { + return 0, io.EOF + } + + cp.buf.Write(buf) + return cp.buf.Read(b) +} + +func (cp *chanPipe) Write(b []byte) (int, error) { + if cp.peerReadClosed.Load() { + panic("peer's read side closed") + } + copied := make([]byte, len(b)) + copy(copied, b) + cp.w <- copied + return len(b), nil +} + +func (cp *chanPipe) Close() error { + cp.myReadClosed.Store(true) + close(cp.w) + return nil +} + +func TestReadHandshakeOnClose(t *testing.T) { + rw1, rw2 := newPair() + + clientDone := make(chan struct{}) + go func() { + l1 := NewMSSelect(rw1, "a") + _, _ = l1.Write([]byte("hello")) + _ = l1.Close() + close(clientDone) + }() + + serverDone := make(chan error) + + server := NewMultistreamMuxer[string]() + server.AddHandler("a", func(protocol string, rwc io.ReadWriteCloser) error { + _, err := io.ReadAll(rwc) + rwc.Close() + serverDone <- err + return nil + }) + + p, h, err := server.Negotiate(rw2) + if err != nil { + t.Fatal(err) + } + + go h(p, rw2) + + err = <-serverDone + if err != nil { + t.Fatal(err) + } + <-clientDone +} + type rwc struct { *strings.Reader }