From 6c0014aa734085687a7666fbf62125ee2f819434 Mon Sep 17 00:00:00 2001 From: Matthew Keeler Date: Mon, 20 May 2024 16:17:16 -0400 Subject: [PATCH] feat: Add support for receiving gzip event payloads --- config/config.go | 12 +-- config/test_data_configs_valid_test.go | 11 +-- docs/configuration.md | 16 ++-- go.mod | 7 +- go.sum | 1 + internal/middleware/middleware.go | 18 ++++ internal/util/reader.go | 109 +++++++++++++++++++++++++ internal/util/reader_test.go | 77 +++++++++++++++++ relay/relay_routes.go | 14 ++-- 9 files changed, 241 insertions(+), 24 deletions(-) create mode 100644 internal/util/reader.go create mode 100644 internal/util/reader_test.go diff --git a/config/config.go b/config/config.go index bc02d915..6d1bba90 100644 --- a/config/config.go +++ b/config/config.go @@ -3,6 +3,7 @@ package config import ( "time" + "github.com/alecthomas/units" ct "github.com/launchdarkly/go-configtypes" "github.com/launchdarkly/ld-relay/v8/internal/logging" ) @@ -179,11 +180,12 @@ type OfflineModeConfig struct { // variables, individual fields are not documented here; instead, see the `README.md` section on // configuration. type EventsConfig struct { - EventsURI ct.OptURLAbsolute `conf:"EVENTS_HOST"` - SendEvents bool `conf:"USE_EVENTS"` - FlushInterval ct.OptDuration `conf:"EVENTS_FLUSH_INTERVAL"` - Capacity ct.OptIntGreaterThanZero `conf:"EVENTS_CAPACITY"` - InlineUsers bool `conf:"EVENTS_INLINE_USERS"` + EventsURI ct.OptURLAbsolute `conf:"EVENTS_HOST"` + SendEvents bool `conf:"USE_EVENTS"` + FlushInterval ct.OptDuration `conf:"EVENTS_FLUSH_INTERVAL"` + Capacity ct.OptIntGreaterThanZero `conf:"EVENTS_CAPACITY"` + InlineUsers bool `conf:"EVENTS_INLINE_USERS"` + MaxReceivablePayloadSize units.Base2Bytes `conf:"EVENTS_MAX_RECEIVABLE_PAYLOAD_SIZE"` } // RedisConfig configures the optional Redis integration. diff --git a/config/test_data_configs_valid_test.go b/config/test_data_configs_valid_test.go index a06bdcd0..e7f7797e 100644 --- a/config/test_data_configs_valid_test.go +++ b/config/test_data_configs_valid_test.go @@ -117,11 +117,12 @@ func makeValidConfigAllBaseProperties() testDataValidConfig { BigSegmentsStaleThreshold: ct.NewOptDuration(10 * time.Minute), } c.Events = EventsConfig{ - SendEvents: true, - EventsURI: newOptURLAbsoluteMustBeValid("http://events"), - FlushInterval: ct.NewOptDuration(120 * time.Second), - Capacity: mustOptIntGreaterThanZero(500), - InlineUsers: true, + SendEvents: true, + EventsURI: newOptURLAbsoluteMustBeValid("http://events"), + FlushInterval: ct.NewOptDuration(120 * time.Second), + Capacity: mustOptIntGreaterThanZero(500), + InlineUsers: true, + MaxReceivablePayloadSize: 0, } c.Environment = map[string]*EnvConfig{ "earth": { diff --git a/docs/configuration.md b/docs/configuration.md index f937df4b..7b97af88 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -109,15 +109,17 @@ Note that the last three properties have the same meanings and the same environm To learn more, read [Forwarding events](./events.md). -| Property in file | Environment var | Type | Default | Description | -|------------------|-------------------------|:--------:|:--------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `sendEvents` | `USE_EVENTS` | Boolean | `false` | When enabled, the Relay Proxy will send analytic events it receives to LaunchDarkly, unless offline mode is enabled. | -| `eventsUri` | `EVENTS_HOST` | URI | _(7)_ | URI for the LaunchDarkly events service | -| `flushInterval` | `EVENTS_FLUSH_INTERVAL` | Duration | `5s` | Controls how long the SDK buffers events before sending them back to our server. If your server generates many events per second, we suggest decreasing the flush interval and/or increasing capacity to meet your needs. | -| `capacity` | `EVENTS_CAPACITY` | Number | `1000` | Maximum number of events to accumulate for each flush interval. | -| `inlineUsers` | `EVENTS_INLINE_USERS` | Boolean | `false` | When enabled, individual events (if full event tracking is enabled for the feature flag) will contain all non-private user attributes. | +| Property in file | Environment var | Type | Default | Description | +|-------------------------------|--------------------------------------|:--------:|:--------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `sendEvents` | `USE_EVENTS` | Boolean | `false` | When enabled, the Relay Proxy will send analytic events it receives to LaunchDarkly, unless offline mode is enabled. | +| `eventsUri` | `EVENTS_HOST` | URI | _(7)_ | URI for the LaunchDarkly events service | +| `flushInterval` | `EVENTS_FLUSH_INTERVAL` | Duration | `5s` | Controls how long the SDK buffers events before sending them back to our server. If your server generates many events per second, we suggest decreasing the flush interval and/or increasing capacity to meet your needs. | +| `capacity` | `EVENTS_CAPACITY` | Number | `1000` | Maximum number of events to accumulate for each flush interval. | +| `inlineUsers` | `EVENTS_INLINE_USERS` | Boolean | `false` | When enabled, individual events (if full event tracking is enabled for the feature flag) will contain all non-private user attributes. | +| `maxReceivablePayloadSize` | `EVENTS_MAX_RECEIVABLE_PAYLOAD_SIZE` | Unit | _(8)_ | Maximum size of an event payload the Relay Proxy will accept from an SDK. | _(7)_ See note _(1)_ above. The default value for `eventsUri` is `https://events.launchdarkly.com`. +_(8)_ The `maxReceivablePayloadSize` setting is used to limit the size of the payload that the Relay Proxy will accept from an SDK. This is an optional safety feature to prevent the Relay Proxy from being overwhelmed by a very large payload. The default value is `0B` which provides no restriction on the payload size. The value should be a number followed by a unit: `B` for bytes, `KiB` for kibibytes, `MiB` for mebibytes, `GiB` for gibibytes, `TiB` for tebibytes, `PiB` for pebibytes, or `EiB` for exbibytes. For example, `100MiB` is 100 mebibytes. ### File section: `[Environment "NAME"]` diff --git a/go.mod b/go.mod index b2e6fe93..2fde15c2 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/dynamodb v1.27.0 github.com/cyphar/filepath-securejoin v0.2.4 github.com/fatih/color v1.15.0 // indirect - github.com/fsnotify/fsnotify v1.7.0 + github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/go-redis/redis/v8 v8.11.5 github.com/gomodule/redigo v1.8.9 github.com/google/uuid v1.5.0 // indirect @@ -59,7 +59,10 @@ require ( golang.org/x/exp v0.0.0-20231206192017-f3f8817b8deb ) -require github.com/launchdarkly/api-client-go/v13 v13.0.1-0.20230420175109-f5469391a13e +require ( + github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 + github.com/launchdarkly/api-client-go/v13 v13.0.1-0.20230420175109-f5469391a13e +) require ( cloud.google.com/go/compute v1.23.3 // indirect diff --git a/go.sum b/go.sum index 16e0c12c..b17da28c 100644 --- a/go.sum +++ b/go.sum @@ -62,6 +62,7 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= +github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 h1:s6gZFSlWYmbqAuRjVTiNNhvNRfY2Wxp9nhfyel4rklc= github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= diff --git a/internal/middleware/middleware.go b/internal/middleware/middleware.go index 7543ba40..94ed33c7 100644 --- a/internal/middleware/middleware.go +++ b/internal/middleware/middleware.go @@ -5,8 +5,10 @@ import ( "encoding/json" "errors" "net/http" + "strings" "github.com/launchdarkly/ld-relay/v8/internal/sdkauth" + "github.com/launchdarkly/ld-relay/v8/internal/util" "github.com/launchdarkly/ld-relay/v8/config" "github.com/launchdarkly/ld-relay/v8/internal/basictypes" @@ -17,6 +19,7 @@ import ( "github.com/launchdarkly/go-sdk-common/v3/ldcontext" ld "github.com/launchdarkly/go-server-sdk/v7" + "github.com/alecthomas/units" "github.com/gorilla/mux" ) @@ -213,3 +216,18 @@ func base64urlDecode(base64String string) ([]byte, error) { return idStr, nil } + +func GzipMiddleware(maxReceivablePayloadSize units.Base2Bytes) mux.MiddlewareFunc { + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + isGzipped := strings.Contains(r.Header.Get("Content-Encoding"), "gzip") + reader, err := util.NewReader(r.Body, isGzipped, int64(maxReceivablePayloadSize)) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + r.Body = reader + next.ServeHTTP(w, r) + }) + } +} diff --git a/internal/util/reader.go b/internal/util/reader.go new file mode 100644 index 00000000..43de5298 --- /dev/null +++ b/internal/util/reader.go @@ -0,0 +1,109 @@ +package util + +// Taken from event-recorder reader.go + +import ( + "compress/gzip" + "errors" + "io" +) + +// PayloadReader is an implementation of io.Reader that reads bytes off the request body +// optionally decompresses them, and has a limit attached. +// If the limit is reached, an error will be returned and the underlying stream will be closed. +// +// Note: limit is applied to *both* compressed and uncompressed number of bytes. This +// protects us from potential zipbombs +type PayloadReader struct { + IsGzipped bool + MaxBytes int64 + + uncompressedBytesRead int64 + + wrappedBaseStream *byteCountingReader + stream *io.LimitedReader +} + +// NewReader creates a new reader +func NewReader(r io.ReadCloser, isGzipped bool, maxBytes int64) (io.ReadCloser, error) { + // If this isn't compressed, and we don't want to limit the size, just + // return the original reader + if !isGzipped && maxBytes == 0 { + return r, nil + } + + var baseStream = &byteCountingReader{ + bytesRead: 0, + baseStream: r, + } + var s io.Reader = baseStream + + if isGzipped { + var err error + gzipReader, err := gzip.NewReader(s) + if err != nil { + return nil, err + } + + // If we don't want to limit the size, just return the gzip reader + if maxBytes == 0 { + return gzipReader, nil + } + + s = gzipReader + } + stream := io.LimitReader(s, maxBytes).(*io.LimitedReader) + + payloadReader := &PayloadReader{ + IsGzipped: isGzipped, + MaxBytes: maxBytes, + wrappedBaseStream: baseStream, + stream: stream, + } + return payloadReader, nil +} + +// GetBytesRead returns the total number of bytes read off the original stream +func (pr *PayloadReader) GetBytesRead() int64 { + return pr.wrappedBaseStream.bytesRead +} + +// GetUncompressedBytesRead Total number of Bytes in the uncompressed stream. +// +// GetBytesRead and GetUncompressedBytesRead will return the same value if the stream is uncompressed. +func (pr *PayloadReader) GetUncompressedBytesRead() int64 { + return pr.uncompressedBytesRead +} + +func (pr *PayloadReader) Read(p []byte) (int, error) { + n, err := pr.stream.Read(p) + pr.uncompressedBytesRead += int64(n) + if err != nil { + return n, err + } + if pr.stream.N <= 0 { + pr.Close() + return n, errors.New("Max bytes exceeded") + } + return n, err +} + +func (pr *PayloadReader) Close() error { + c, ok := pr.wrappedBaseStream.baseStream.(io.Closer) + if ok { + return c.Close() + } + return nil +} + +// a simple reader decorator that keeps a running total of bytes +type byteCountingReader struct { + baseStream io.Reader + bytesRead int64 +} + +func (bt *byteCountingReader) Read(p []byte) (int, error) { + n, err := bt.baseStream.Read(p) + bt.bytesRead += int64(n) + return n, err +} diff --git a/internal/util/reader_test.go b/internal/util/reader_test.go new file mode 100644 index 00000000..c15568e6 --- /dev/null +++ b/internal/util/reader_test.go @@ -0,0 +1,77 @@ +package util + +import ( + "bytes" + "compress/gzip" + "io" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestUncompressed(t *testing.T) { + // make sure the bytes read are correct for non-gzipped streams + nonZipBytes := []byte("this is a test") + + reader, _ := NewReader(io.NopCloser(bytes.NewReader(nonZipBytes)), false, 1000) + payloadReader, _ := reader.(*PayloadReader) + readBytes, _ := io.ReadAll(reader) + + assert.Equal(t, int64(len(nonZipBytes)), payloadReader.GetBytesRead()) + assert.Equal(t, int64(len(nonZipBytes)), payloadReader.GetUncompressedBytesRead()) + assert.Equal(t, nonZipBytes, readBytes) +} + +func TestPayloadBytesTracking(t *testing.T) { + // build a string + var ret string + for i := 0; i < 500; i++ { + ret += "00" + } + + // make sure the bytes read are correct for gzipped streams + nonZipBytes := []byte(ret) + + var b bytes.Buffer + w := gzip.NewWriter(&b) + w.Write(nonZipBytes) + w.Close() + zipBytes := b.Bytes() + + reader, _ := NewReader(io.NopCloser(bytes.NewReader(zipBytes)), true, 10000) + payloadReader, _ := reader.(*PayloadReader) + readBytes, _ := io.ReadAll(reader) + + assert.Equal(t, int64(len(zipBytes)), payloadReader.GetBytesRead()) // compressed is 32 + assert.Equal(t, int64(len(nonZipBytes)), payloadReader.GetUncompressedBytesRead()) // uncompressed is 1000 + assert.Equal(t, nonZipBytes, readBytes) +} + +func TestZipBombing(t *testing.T) { + // build a string + var ret string + for i := 0; i < 500; i++ { + ret += "00" + } + + nonZipBytes := []byte(ret) + + var b bytes.Buffer + w := gzip.NewWriter(&b) + w.Write(nonZipBytes) + w.Close() + zipBytes := b.Bytes() + + maxBytes := int64(100) + reader, _ := NewReader(io.NopCloser(bytes.NewReader(zipBytes)), true, maxBytes) + payloadReader, _ := reader.(*PayloadReader) + bytesRead, err := io.ReadAll(reader) + + // we should have an error because the uncompressed stream is larger than maxBytes + assert.Error(t, err) + assert.Equal(t, int64(len(bytesRead)), maxBytes) + + assert.Equal(t, int64(len(zipBytes)), payloadReader.GetBytesRead()) + assert.Equal(t, maxBytes, payloadReader.GetUncompressedBytesRead()) + +} diff --git a/relay/relay_routes.go b/relay/relay_routes.go index 5484e0c7..e5527fc2 100644 --- a/relay/relay_routes.go +++ b/relay/relay_routes.go @@ -124,18 +124,18 @@ func (r *Relay) makeRouter() *mux.Router { clientSideStreamEvalRouter.Handle("", middleware.CountBrowserConns(jsPingWithUser)).Methods("REPORT", "OPTIONS") mobileEventsRouter := router.PathPrefix("/mobile").Subrouter() - mobileEventsRouter.Use(mobileMiddlewareStack) + mobileEventsRouter.Use(mobileMiddlewareStack, middleware.GzipMiddleware(r.config.Events.MaxReceivablePayloadSize)) mobileEventsRouter.Handle("/events/bulk", bulkEventHandler(basictypes.MobileSDK, ldevents.AnalyticsEventDataKind, offlineMode)).Methods("POST") mobileEventsRouter.Handle("/events", bulkEventHandler(basictypes.MobileSDK, ldevents.AnalyticsEventDataKind, offlineMode)).Methods("POST") mobileEventsRouter.Handle("", bulkEventHandler(basictypes.MobileSDK, ldevents.AnalyticsEventDataKind, offlineMode)).Methods("POST") mobileEventsRouter.Handle("/events/diagnostic", bulkEventHandler(basictypes.MobileSDK, ldevents.DiagnosticEventDataKind, offlineMode)).Methods("POST") clientSideBulkEventsRouter := router.PathPrefix("/events/bulk/{envId}").Subrouter() - clientSideBulkEventsRouter.Use(jsClientSideMiddlewareStack(clientSideBulkEventsRouter)) + clientSideBulkEventsRouter.Use(jsClientSideMiddlewareStack(clientSideBulkEventsRouter), middleware.GzipMiddleware(r.config.Events.MaxReceivablePayloadSize)) clientSideBulkEventsRouter.Handle("", bulkEventHandler(basictypes.JSClientSDK, ldevents.AnalyticsEventDataKind, offlineMode)).Methods("POST", "OPTIONS") clientSideDiagnosticEventsRouter := router.PathPrefix("/events/diagnostic/{envId}").Subrouter() - clientSideDiagnosticEventsRouter.Use(jsClientSideMiddlewareStack(clientSideBulkEventsRouter)) + clientSideDiagnosticEventsRouter.Use(jsClientSideMiddlewareStack(clientSideBulkEventsRouter), middleware.GzipMiddleware(r.config.Events.MaxReceivablePayloadSize)) clientSideDiagnosticEventsRouter.Handle("", bulkEventHandler(basictypes.JSClientSDK, ldevents.DiagnosticEventDataKind, offlineMode)).Methods("POST", "OPTIONS") clientSideImageEventsRouter := router.PathPrefix("/a/{envId}.gif").Subrouter() @@ -144,8 +144,12 @@ func (r *Relay) makeRouter() *mux.Router { serverSideRouter := router.PathPrefix("").Subrouter() serverSideRouter.Use(serverSideMiddlewareStack) - serverSideRouter.Handle("/bulk", bulkEventHandler(basictypes.ServerSDK, ldevents.AnalyticsEventDataKind, offlineMode)).Methods("POST") - serverSideRouter.Handle("/diagnostic", bulkEventHandler(basictypes.ServerSDK, ldevents.DiagnosticEventDataKind, offlineMode)).Methods("POST") + + serverSideBulkEventsRouter := serverSideRouter.NewRoute().Subrouter() + serverSideBulkEventsRouter.Use(middleware.GzipMiddleware(r.config.Events.MaxReceivablePayloadSize)) + serverSideBulkEventsRouter.Handle("/bulk", bulkEventHandler(basictypes.ServerSDK, ldevents.AnalyticsEventDataKind, offlineMode)).Methods("POST") + serverSideBulkEventsRouter.Handle("/diagnostic", bulkEventHandler(basictypes.ServerSDK, ldevents.DiagnosticEventDataKind, offlineMode)).Methods("POST") + serverSideRouter.Handle("/all", middleware.CountServerConns(middleware.Streaming( streamHandler(r.serverSideStreamProvider, serverSideStreamLogMessage), ))).Methods("GET")