Skip to content

Commit

Permalink
Fix crash when the server drops and does not return (Graylog2#19)
Browse files Browse the repository at this point in the history
* Fix crash when the server drops and does not return

This adds a new test that closes the server and continues to
write to it. It uncovered a bug with an unhandled nil pointer. It
also uncovered some issues in the reconnection logic that were fixed
at the same time.

* Change snake_case to camelCase

* Refactor listenUntilCloseSignal to remove duplicated code

* Fix timing issues in connection drop test
  • Loading branch information
ghislainbourgeois authored and Marius Sturm committed Dec 11, 2017
1 parent ba920ad commit 4143646
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 37 deletions.
97 changes: 80 additions & 17 deletions gelf/tcpreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"net"
"time"
)

type TCPReader struct {
Expand All @@ -13,43 +14,83 @@ type TCPReader struct {
messages chan []byte
}

func newTCPReader(addr string) (*TCPReader, chan string, error) {
// connChannels bundles the pair of signalling channels kept for one handled
// connection.
type connChannels struct {
	// drop is used to send a "drop" request to the connection handler;
	// confirm is read afterwards to wait for the handler's acknowledgement.
	drop, confirm chan string
}

func newTCPReader(addr string) (*TCPReader, chan string, chan string, error) {
var err error
tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
return nil, nil, fmt.Errorf("ResolveTCPAddr('%s'): %s", addr, err)
return nil, nil, nil, fmt.Errorf("ResolveTCPAddr('%s'): %s", addr, err)
}

listener, err := net.ListenTCP("tcp", tcpAddr)
if err != nil {
return nil, nil, fmt.Errorf("ListenTCP: %s", err)
return nil, nil, nil, fmt.Errorf("ListenTCP: %s", err)
}

r := &TCPReader{
listener: listener,
messages: make(chan []byte, 100), // Make a buffered channel with at most 100 messages
}

signal := make(chan string, 1)
closeSignal := make(chan string, 1)
doneSignal := make(chan string, 1)

go r.listenUntilCloseSignal(signal)
go r.listenUntilCloseSignal(closeSignal, doneSignal)

return r, signal, nil
return r, closeSignal, doneSignal, nil
}

func (r *TCPReader) listenUntilCloseSignal(signal chan string) {
defer func() { signal <- "done" }()
defer r.listener.Close()
func (r *TCPReader) accepter(connections chan net.Conn) {
for {
conn, err := r.listener.Accept()
if err != nil {
break
}
go handleConnection(conn, r.messages)
connections <- conn
}
}

func (r *TCPReader) listenUntilCloseSignal(closeSignal chan string, doneSignal chan string) {
defer func() { doneSignal <- "done" }()
defer r.listener.Close()
var conns []connChannels
connectionsChannel := make(chan net.Conn, 1)
go r.accepter(connectionsChannel)
for {
select {
case sig := <-signal:
if sig == "stop" {
break
case conn := <-connectionsChannel:
dropSignal := make(chan string, 1)
dropConfirm := make(chan string, 1)
channels := connChannels{drop: dropSignal, confirm: dropConfirm}
go handleConnection(conn, r.messages, dropSignal, dropConfirm)
conns = append(conns, channels)
default:
}

select {
case sig := <-closeSignal:
if sig == "stop" || sig == "drop" {
if len(conns) >= 1 {
for _, s := range conns {
if s.drop != nil {
s.drop <- "drop"
<-s.confirm
conns = append(conns[:0], conns[1:]...)
}
}
if sig == "stop" {
return
}
} else if sig == "stop" {
closeSignal <- "stop"
}
if sig == "drop" {
doneSignal <- "done"
}
}
default:
}
Expand All @@ -60,19 +101,41 @@ func (r *TCPReader) addr() string {
return r.listener.Addr().String()
}

func handleConnection(conn net.Conn, messages chan<- []byte) {
func handleConnection(conn net.Conn, messages chan<- []byte, dropSignal chan string, dropConfirm chan string) {
defer func() { dropConfirm <- "done" }()
defer conn.Close()
reader := bufio.NewReader(conn)

var b []byte
var err error
drop := false
canDrop := false

for {
conn.SetDeadline(time.Now().Add(2 * time.Second))
if b, err = reader.ReadBytes(0); err != nil {
continue
}
if len(b) > 0 {
if drop {
return
}
} else if len(b) > 0 {
messages <- b
canDrop = true
if drop {
return
}
} else if drop {
return
}
select {
case sig := <-dropSignal:
if sig == "drop" {
drop = true
time.Sleep(1 * time.Second)
if canDrop {
return
}
}
default:
}
}
}
Expand Down
12 changes: 10 additions & 2 deletions gelf/tcpwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,17 @@ func (w *TCPWriter) Write(p []byte) (n int, err error) {

func (w *TCPWriter) writeToSocketWithReconnectAttempts(zBytes []byte) (n int, err error) {
var errConn error
var i int

w.mu.Lock()
for i := 0; n <= w.MaxReconnect; i++ {
for i = 0; i <= w.MaxReconnect; i++ {
errConn = nil

n, err = w.conn.Write(zBytes)
if w.conn != nil {
n, err = w.conn.Write(zBytes)
} else {
err = fmt.Errorf("Connection was nil, will attempt reconnect")
}
if err != nil {
time.Sleep(w.ReconnectDelay * time.Second)
w.conn, errConn = net.Dial("tcp", w.addr)
Expand All @@ -90,6 +95,9 @@ func (w *TCPWriter) writeToSocketWithReconnectAttempts(zBytes []byte) (n int, er
}
w.mu.Unlock()

if i > w.MaxReconnect {
return 0, fmt.Errorf("Maximum reconnection attempts was reached; giving up")
}
if errConn != nil {
return 0, fmt.Errorf("Write Failed: %s\nReconnection failed: %s", err, errConn)
}
Expand Down
93 changes: 75 additions & 18 deletions gelf/tcpwriter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestNewTCPWriter(t *testing.T) {
}

func TestNewTCPWriterConfig(t *testing.T) {
r, _, err := newTCPReader("127.0.0.1:0")
r, _, _, err := newTCPReader("127.0.0.1:0")
if err != nil {
t.Error("Could not open TCPReader")
return
Expand Down Expand Up @@ -202,23 +202,36 @@ func TestWrite2MessagesWithConnectionDropTCP(t *testing.T) {
assertMessages(msg2, "Second message", msgData2, t)
}

func setupConnections() (*TCPReader, chan string, *TCPWriter, error) {
r, signal, err := newTCPReader("127.0.0.1:0")
func TestWrite2MessagesWithServerDropTCP(t *testing.T) {
msgData1 := "First message\nThis happens before the server drops"
msgData2 := "Second message\nThis happens after the server drops"

msg1, err := sendAndRecv2MessagesWithServerDropTCP(msgData1, msgData2)
if err != nil {
return nil, nil, nil, fmt.Errorf("newTCPReader: %s", err)
t.Errorf("sendAndRecv2MessagesWithDropTCP: %s", err)
return
}

assertMessages(msg1, "First message", msgData1, t)
}

func setupConnections() (*TCPReader, chan string, chan string, *TCPWriter, error) {
r, closeSignal, doneSignal, err := newTCPReader("127.0.0.1:0")

if err != nil {
return nil, nil, nil, nil, fmt.Errorf("newTCPReader: %s", err)
}

w, err := NewTCPWriter(r.addr())
if err != nil {
return nil, nil, nil, fmt.Errorf("NewTCPWriter: %s", err)
return nil, nil, nil, nil, fmt.Errorf("NewTCPWriter: %s", err)
}

return r, signal, w, nil
return r, closeSignal, doneSignal, w, nil
}

func sendAndRecvTCP(msgData string) (*Message, error) {
r, signal, w, err := setupConnections()
r, closeSignal, doneSignal, w, err := setupConnections()
if err != nil {
return nil, err
}
Expand All @@ -227,9 +240,9 @@ func sendAndRecvTCP(msgData string) (*Message, error) {
return nil, fmt.Errorf("w.Write: %s", err)
}

signal <- "stop"
done := <-signal
if done == "done" {
closeSignal <- "stop"
done := <-doneSignal
if done != "done" {
return nil, errors.New("Wrong signal received")
}

Expand All @@ -242,7 +255,7 @@ func sendAndRecvTCP(msgData string) (*Message, error) {
}

func sendAndRecvMsgTCP(msg *Message) (*Message, error) {
r, signal, w, err := setupConnections()
r, closeSignal, doneSignal, w, err := setupConnections()
if err != nil {
return nil, err
}
Expand All @@ -251,9 +264,9 @@ func sendAndRecvMsgTCP(msg *Message) (*Message, error) {
return nil, fmt.Errorf("w.Write: %s", err)
}

signal <- "stop"
done := <-signal
if done == "done" {
closeSignal <- "stop"
done := <-doneSignal
if done != "done" {
return nil, errors.New("Wrong signal received")
}

Expand All @@ -267,7 +280,7 @@ func sendAndRecvMsgTCP(msg *Message) (*Message, error) {
}

func sendAndRecv2MessagesWithDropTCP(msgData1 string, msgData2 string) (*Message, *Message, error) {
r, signal, w, err := setupConnections()
r, closeSignal, doneSignal, w, err := setupConnections()
if err != nil {
return nil, nil, err
}
Expand All @@ -276,6 +289,14 @@ func sendAndRecv2MessagesWithDropTCP(msgData1 string, msgData2 string) (*Message
return nil, nil, fmt.Errorf("w.Write: %s", err)
}

time.Sleep(200 * time.Millisecond)

closeSignal <- "drop"
done := <-doneSignal
if done != "done" {
return nil, nil, fmt.Errorf("Wrong signal received: %s", done)
}

message1, err := r.readMessage()
if err != nil {
return nil, nil, fmt.Errorf("readmessage: %s", err)
Expand All @@ -285,13 +306,15 @@ func sendAndRecv2MessagesWithDropTCP(msgData1 string, msgData2 string) (*Message
if _, err = w.Write([]byte(msgData2)); err != nil {
return nil, nil, fmt.Errorf("write 1 w.Write: %s", err)
}
time.Sleep(200 * time.Millisecond)
if _, err = w.Write([]byte(msgData2)); err != nil {
return nil, nil, fmt.Errorf("write 2 w.Write: %s", err)
}
time.Sleep(200 * time.Millisecond)

signal <- "stop"
done := <-signal
if done == "done" {
closeSignal <- "stop"
done = <-doneSignal
if done != "done" {
return nil, nil, errors.New("Wrong signal received")
}

Expand All @@ -303,3 +326,37 @@ func sendAndRecv2MessagesWithDropTCP(msgData1 string, msgData2 string) (*Message
w.Close()
return message1, message2, nil
}

// sendAndRecv2MessagesWithServerDropTCP writes msgData1 to a freshly set up
// reader/writer pair, stops the reader, and then keeps writing msgData2 to the
// now-stopped server until the writer reports an error.
//
// It returns the first message as read back from the reader, or an error if
// setup, the first write, the stop handshake, or reading the message fails.
// The writer is closed on every path once setup has succeeded.
func sendAndRecv2MessagesWithServerDropTCP(msgData1 string, msgData2 string) (*Message, error) {
	r, closeSignal, doneSignal, w, err := setupConnections()
	if err != nil {
		return nil, err
	}
	// Close the writer on every exit path, including the early error returns
	// below, so a failing run does not leak the connection.
	defer w.Close()

	if _, err = w.Write([]byte(msgData1)); err != nil {
		return nil, fmt.Errorf("w.Write: %s", err)
	}

	// Ask the reader to stop and wait for its acknowledgement.
	closeSignal <- "stop"
	done := <-doneSignal
	if done != "done" {
		return nil, fmt.Errorf("Wrong signal received: %s", done)
	}

	message1, err := r.readMessage()
	if err != nil {
		return nil, fmt.Errorf("readmessage: %s", err)
	}

	// Keep writing until the dropped connection is detected: the first write
	// after the server goes away may not cause an error, but subsequent ones
	// will. err is nil here, so the loop always performs at least one write.
	for err == nil {
		_, err = w.Write([]byte(msgData2))
	}

	return message1, nil
}
3 changes: 3 additions & 0 deletions gelf/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,8 @@ type GelfWriter struct {

// Close shuts down the underlying connection, interrupting any blocked Read
// or Write operations. When no connection is set it does nothing and
// returns nil.
func (w *GelfWriter) Close() error {
	if conn := w.conn; conn != nil {
		return conn.Close()
	}
	return nil
}

0 comments on commit 4143646

Please sign in to comment.