From 031431a7e8e1cde094160fedca25365832a828da Mon Sep 17 00:00:00 2001 From: Patrick Dawkins Date: Mon, 6 Feb 2023 23:46:59 +0000 Subject: [PATCH 1/3] client: remove EOF special case --- client.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/client.go b/client.go index 61772b6..78c936e 100644 --- a/client.go +++ b/client.go @@ -10,7 +10,6 @@ import ( "encoding/base64" "errors" "fmt" - "io" "net/http" "sync" "sync/atomic" @@ -213,10 +212,6 @@ func (c *Client) readLoop(reader *EventStreamReader, outCh chan *Event, erChan c // Read each new line and process the type of event event, err := reader.ReadEvent() if err != nil { - if err == io.EOF { - erChan <- nil - return - } // run user specified disconnect function if c.disconnectcb != nil { c.Connected = false From b01f4fc36e9b921b49f8653718b04a21654e1b60 Mon Sep 17 00:00:00 2001 From: Patrick Dawkins Date: Tue, 7 Feb 2023 22:21:58 +0000 Subject: [PATCH 2/3] Add test for reconnecting after EOF --- client_test.go | 40 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/client_test.go b/client_test.go index 3468df0..1617f20 100644 --- a/client_test.go +++ b/client_test.go @@ -8,6 +8,7 @@ import ( "context" "crypto/rand" "encoding/hex" + "io" "net/http" "net/http/httptest" "runtime" @@ -33,11 +34,12 @@ var mldata = `{ ] }` -func setup(empty bool) { +func setup(empty bool) *Server { // New Server srv = newServer() // Send almost-continuous string of events to the client go publishMsgs(srv, empty, 100000000) + return srv } func setupMultiline() { @@ -258,6 +260,42 @@ func TestClientChanReconnect(t *testing.T) { c.Unsubscribe(events) } +func TestClientChanReconnectOnEOF(t *testing.T) { + srv := setup(false) + defer cleanup() + streamID := "test" + + c := NewClient(urlPath) + + var ( + reconnectErr error + reconnectDuration time.Duration + ) + c.ReconnectStrategy = backoff.NewConstantBackOff(time.Millisecond * 10) + c.ReconnectNotify = func(err error, duration time.Duration) { + reconnectErr = err + reconnectDuration = duration + } + + events := make(chan *Event) + err := c.SubscribeChan(streamID, events) + require.Nil(t, err) + + msg, err := wait(events, time.Millisecond*100) + assert.NoError(t, err) + assert.Equal(t, []byte(`ping`), msg) + + srv.RemoveStream(streamID) + srv.CreateStream(streamID) + msg, err = wait(events, time.Millisecond*100) + assert.NoError(t, err) + assert.Equal(t, io.EOF, reconnectErr) + assert.Equal(t, time.Millisecond*10, reconnectDuration) + assert.Equal(t, []byte(`ping`), msg) + + c.Unsubscribe(events) +} + func TestClientUnsubscribe(t *testing.T) { setup(false) defer cleanup() From a3b4b979472c32c281c238325738808bd5e807ca Mon Sep 17 00:00:00 2001 From: Patrick Dawkins Date: Wed, 8 Feb 2023 23:51:54 +0000 Subject: [PATCH 3/3] Simplify test change to use global srv --- client_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/client_test.go b/client_test.go index 1617f20..0e7cffa 100644 --- a/client_test.go +++ b/client_test.go @@ -34,12 +34,11 @@ var mldata = `{ ] }` -func setup(empty bool) *Server { +func setup(empty bool) { // New Server srv = newServer() // Send almost-continuous string of events to the client go publishMsgs(srv, empty, 100000000) - return srv } func setupMultiline() { @@ -261,7 +260,7 @@ func TestClientChanReconnect(t *testing.T) { } func TestClientChanReconnectOnEOF(t *testing.T) { - srv := setup(false) + setup(false) defer cleanup() streamID := "test"