diff --git a/beacon-chain/rpc/eth/events/events.go b/beacon-chain/rpc/eth/events/events.go index 28e31fc9840f..7698d2fb9017 100644 --- a/beacon-chain/rpc/eth/events/events.go +++ b/beacon-chain/rpc/eth/events/events.go @@ -71,6 +71,7 @@ var ( errSlowReader = errors.New("client failed to read fast enough to keep outgoing buffer below threshold") errNotRequested = errors.New("event not requested by client") errUnhandledEventData = errors.New("unable to represent event data in the event stream") + errWriterUnusable = errors.New("http response writer is unusable") ) // StreamingResponseWriter defines a type that can be used by the eventStreamer. @@ -309,10 +310,21 @@ func (es *eventStreamer) outboxWriteLoop(ctx context.Context, cancel context.Can } } +func writeLazyReaderWithRecover(w StreamingResponseWriter, lr lazyReader) (err error) { + defer func() { + if r := recover(); r != nil { + log.WithField("panic", r).Error("Recovered from panic while writing event to client.") + err = errWriterUnusable + } + }() + _, err = io.Copy(w, lr()) + return err +} + func (es *eventStreamer) writeOutbox(ctx context.Context, w StreamingResponseWriter, first lazyReader) error { needKeepAlive := true if first != nil { - if _, err := io.Copy(w, first()); err != nil { + if err := writeLazyReaderWithRecover(w, first); err != nil { return err } needKeepAlive = false @@ -325,13 +337,13 @@ func (es *eventStreamer) writeOutbox(ctx context.Context, w StreamingResponseWri case <-ctx.Done(): return ctx.Err() case rf := <-es.outbox: - if _, err := io.Copy(w, rf()); err != nil { + if err := writeLazyReaderWithRecover(w, rf); err != nil { return err } needKeepAlive = false default: if needKeepAlive { - if _, err := io.Copy(w, newlineReader()); err != nil { + if err := writeLazyReaderWithRecover(w, newlineReader); err != nil { return err } }