Skip to content

Commit

Permalink
feat: Add support for receiving gzip event payloads
Browse files Browse the repository at this point in the history
  • Loading branch information
keelerm84 committed May 20, 2024
1 parent 02c68be commit 6c0014a
Show file tree
Hide file tree
Showing 9 changed files with 241 additions and 24 deletions.
12 changes: 7 additions & 5 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package config
import (
"time"

"github.com/alecthomas/units"
ct "github.com/launchdarkly/go-configtypes"
"github.com/launchdarkly/ld-relay/v8/internal/logging"
)
Expand Down Expand Up @@ -179,11 +180,12 @@ type OfflineModeConfig struct {
// variables, individual fields are not documented here; instead, see the `README.md` section on
// configuration.
type EventsConfig struct {
EventsURI ct.OptURLAbsolute `conf:"EVENTS_HOST"`
SendEvents bool `conf:"USE_EVENTS"`
FlushInterval ct.OptDuration `conf:"EVENTS_FLUSH_INTERVAL"`
Capacity ct.OptIntGreaterThanZero `conf:"EVENTS_CAPACITY"`
InlineUsers bool `conf:"EVENTS_INLINE_USERS"`
EventsURI ct.OptURLAbsolute `conf:"EVENTS_HOST"`
SendEvents bool `conf:"USE_EVENTS"`
FlushInterval ct.OptDuration `conf:"EVENTS_FLUSH_INTERVAL"`
Capacity ct.OptIntGreaterThanZero `conf:"EVENTS_CAPACITY"`
InlineUsers bool `conf:"EVENTS_INLINE_USERS"`
MaxReceivablePayloadSize units.Base2Bytes `conf:"EVENTS_MAX_RECEIVABLE_PAYLOAD_SIZE"`
}

// RedisConfig configures the optional Redis integration.
Expand Down
11 changes: 6 additions & 5 deletions config/test_data_configs_valid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,12 @@ func makeValidConfigAllBaseProperties() testDataValidConfig {
BigSegmentsStaleThreshold: ct.NewOptDuration(10 * time.Minute),
}
c.Events = EventsConfig{
SendEvents: true,
EventsURI: newOptURLAbsoluteMustBeValid("http://events"),
FlushInterval: ct.NewOptDuration(120 * time.Second),
Capacity: mustOptIntGreaterThanZero(500),
InlineUsers: true,
SendEvents: true,
EventsURI: newOptURLAbsoluteMustBeValid("http://events"),
FlushInterval: ct.NewOptDuration(120 * time.Second),
Capacity: mustOptIntGreaterThanZero(500),
InlineUsers: true,
MaxReceivablePayloadSize: 0,
}
c.Environment = map[string]*EnvConfig{
"earth": {
Expand Down
16 changes: 9 additions & 7 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,17 @@ Note that the last three properties have the same meanings and the same environm

To learn more, read [Forwarding events](./events.md).

| Property in file | Environment var | Type | Default | Description |
|------------------|-------------------------|:--------:|:--------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `sendEvents` | `USE_EVENTS` | Boolean | `false` | When enabled, the Relay Proxy will send analytic events it receives to LaunchDarkly, unless offline mode is enabled. |
| `eventsUri` | `EVENTS_HOST` | URI | _(7)_ | URI for the LaunchDarkly events service |
| `flushInterval` | `EVENTS_FLUSH_INTERVAL` | Duration | `5s` | Controls how long the SDK buffers events before sending them back to our server. If your server generates many events per second, we suggest decreasing the flush interval and/or increasing capacity to meet your needs. |
| `capacity` | `EVENTS_CAPACITY` | Number | `1000` | Maximum number of events to accumulate for each flush interval. |
| `inlineUsers` | `EVENTS_INLINE_USERS` | Boolean | `false` | When enabled, individual events (if full event tracking is enabled for the feature flag) will contain all non-private user attributes. |
| Property in file | Environment var | Type | Default | Description |
|-------------------------------|--------------------------------------|:--------:|:--------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `sendEvents` | `USE_EVENTS` | Boolean | `false` | When enabled, the Relay Proxy will send analytic events it receives to LaunchDarkly, unless offline mode is enabled. |
| `eventsUri` | `EVENTS_HOST` | URI | _(7)_ | URI for the LaunchDarkly events service |
| `flushInterval` | `EVENTS_FLUSH_INTERVAL` | Duration | `5s` | Controls how long the SDK buffers events before sending them back to our server. If your server generates many events per second, we suggest decreasing the flush interval and/or increasing capacity to meet your needs. |
| `capacity` | `EVENTS_CAPACITY` | Number | `1000` | Maximum number of events to accumulate for each flush interval. |
| `inlineUsers` | `EVENTS_INLINE_USERS` | Boolean | `false` | When enabled, individual events (if full event tracking is enabled for the feature flag) will contain all non-private user attributes. |
| `maxReceivablePayloadSize` | `EVENTS_MAX_RECEIVABLE_PAYLOAD_SIZE` | Unit | _(8)_ | Maximum size of an event payload the Relay Proxy will accept from an SDK. |

_(7)_ See note _(1)_ above. The default value for `eventsUri` is `https://events.launchdarkly.com`.
_(8)_ The `maxReceivablePayloadSize` setting is used to limit the size of the payload that the Relay Proxy will accept from an SDK. This is an optional safety feature to prevent the Relay Proxy from being overwhelmed by a very large payload. The default value is `0B` which provides no restriction on the payload size. The value should be a number followed by a unit: `B` for bytes, `KiB` for kibibytes, `MiB` for mebibytes, `GiB` for gibibytes, `TiB` for tebibytes, `PiB` for pebibytes, or `EiB` for exbibytes. For example, `100MiB` is 100 mebibytes.


### File section: `[Environment "NAME"]`
Expand Down
7 changes: 5 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.27.0
github.com/cyphar/filepath-securejoin v0.2.4
github.com/fatih/color v1.15.0 // indirect
github.com/fsnotify/fsnotify v1.7.0
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-redis/redis/v8 v8.11.5
github.com/gomodule/redigo v1.8.9
github.com/google/uuid v1.5.0 // indirect
Expand Down Expand Up @@ -59,7 +59,10 @@ require (
golang.org/x/exp v0.0.0-20231206192017-f3f8817b8deb
)

require github.com/launchdarkly/api-client-go/v13 v13.0.1-0.20230420175109-f5469391a13e
require (
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137
github.com/launchdarkly/api-client-go/v13 v13.0.1-0.20230420175109-f5469391a13e
)

require (
cloud.google.com/go/compute v1.23.3 // indirect
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 h1:s6gZFSlWYmbqAuRjVTiNNhvNRfY2Wxp9nhfyel4rklc=
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
Expand Down
18 changes: 18 additions & 0 deletions internal/middleware/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"encoding/json"
"errors"
"net/http"
"strings"

"github.com/launchdarkly/ld-relay/v8/internal/sdkauth"
"github.com/launchdarkly/ld-relay/v8/internal/util"

"github.com/launchdarkly/ld-relay/v8/config"
"github.com/launchdarkly/ld-relay/v8/internal/basictypes"
Expand All @@ -17,6 +19,7 @@ import (
"github.com/launchdarkly/go-sdk-common/v3/ldcontext"
ld "github.com/launchdarkly/go-server-sdk/v7"

"github.com/alecthomas/units"
"github.com/gorilla/mux"
)

Expand Down Expand Up @@ -213,3 +216,18 @@ func base64urlDecode(base64String string) ([]byte, error) {

return idStr, nil
}

func GzipMiddleware(maxReceivablePayloadSize units.Base2Bytes) mux.MiddlewareFunc {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
isGzipped := strings.Contains(r.Header.Get("Content-Encoding"), "gzip")
reader, err := util.NewReader(r.Body, isGzipped, int64(maxReceivablePayloadSize))
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
r.Body = reader
next.ServeHTTP(w, r)
})
}
}
109 changes: 109 additions & 0 deletions internal/util/reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package util

// Taken from event-recorder reader.go

import (
"compress/gzip"
"errors"
"io"
)

// PayloadReader is an implementation of io.Reader that reads bytes off the request body
// optionally decompresses them, and has a limit attached.
// If the limit is reached, an error will be returned and the underlying stream will be closed.
//
// Note: limit is applied to *both* compressed and uncompressed number of bytes. This
// protects us from potential zipbombs
type PayloadReader struct {
IsGzipped bool
MaxBytes int64

uncompressedBytesRead int64

wrappedBaseStream *byteCountingReader
stream *io.LimitedReader
}

// NewReader creates a new reader
func NewReader(r io.ReadCloser, isGzipped bool, maxBytes int64) (io.ReadCloser, error) {
// If this isn't compressed, and we don't want to limit the size, just
// return the original reader
if !isGzipped && maxBytes == 0 {
return r, nil
}

var baseStream = &byteCountingReader{
bytesRead: 0,
baseStream: r,
}
var s io.Reader = baseStream

if isGzipped {
var err error
gzipReader, err := gzip.NewReader(s)
if err != nil {
return nil, err
}

// If we don't want to limit the size, just return the gzip reader
if maxBytes == 0 {
return gzipReader, nil
}

s = gzipReader
}
stream := io.LimitReader(s, maxBytes).(*io.LimitedReader)

payloadReader := &PayloadReader{
IsGzipped: isGzipped,
MaxBytes: maxBytes,
wrappedBaseStream: baseStream,
stream: stream,
}
return payloadReader, nil
}

// GetBytesRead returns the total number of bytes read off the original stream
func (pr *PayloadReader) GetBytesRead() int64 {
return pr.wrappedBaseStream.bytesRead
}

// GetUncompressedBytesRead Total number of Bytes in the uncompressed stream.
//
// GetBytesRead and GetUncompressedBytesRead will return the same value if the stream is uncompressed.
func (pr *PayloadReader) GetUncompressedBytesRead() int64 {
return pr.uncompressedBytesRead
}

func (pr *PayloadReader) Read(p []byte) (int, error) {
n, err := pr.stream.Read(p)
pr.uncompressedBytesRead += int64(n)
if err != nil {
return n, err
}
if pr.stream.N <= 0 {
pr.Close()

Check failure on line 85 in internal/util/reader.go

View workflow job for this annotation

GitHub Actions / Go 1.21.10 / Unit Tests

Error return value of `pr.Close` is not checked (errcheck)

Check failure on line 85 in internal/util/reader.go

View workflow job for this annotation

GitHub Actions / Go 1.22.3 / Unit Tests

Error return value of `pr.Close` is not checked (errcheck)
return n, errors.New("Max bytes exceeded")

Check failure on line 86 in internal/util/reader.go

View workflow job for this annotation

GitHub Actions / Go 1.21.10 / Unit Tests

ST1005: error strings should not be capitalized (stylecheck)

Check failure on line 86 in internal/util/reader.go

View workflow job for this annotation

GitHub Actions / Go 1.22.3 / Unit Tests

ST1005: error strings should not be capitalized (stylecheck)
}
return n, err
}

func (pr *PayloadReader) Close() error {
c, ok := pr.wrappedBaseStream.baseStream.(io.Closer)
if ok {
return c.Close()
}
return nil
}

// a simple reader decorator that keeps a running total of bytes
type byteCountingReader struct {
baseStream io.Reader
bytesRead int64
}

func (bt *byteCountingReader) Read(p []byte) (int, error) {
n, err := bt.baseStream.Read(p)
bt.bytesRead += int64(n)
return n, err
}
77 changes: 77 additions & 0 deletions internal/util/reader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package util

import (
"bytes"
"compress/gzip"
"io"
"testing"

"github.com/stretchr/testify/assert"
)

func TestUncompressed(t *testing.T) {
// make sure the bytes read are correct for non-gzipped streams
nonZipBytes := []byte("this is a test")

reader, _ := NewReader(io.NopCloser(bytes.NewReader(nonZipBytes)), false, 1000)
payloadReader, _ := reader.(*PayloadReader)
readBytes, _ := io.ReadAll(reader)

assert.Equal(t, int64(len(nonZipBytes)), payloadReader.GetBytesRead())
assert.Equal(t, int64(len(nonZipBytes)), payloadReader.GetUncompressedBytesRead())
assert.Equal(t, nonZipBytes, readBytes)
}

func TestPayloadBytesTracking(t *testing.T) {
// build a string
var ret string
for i := 0; i < 500; i++ {
ret += "00"
}

// make sure the bytes read are correct for gzipped streams
nonZipBytes := []byte(ret)

var b bytes.Buffer
w := gzip.NewWriter(&b)
w.Write(nonZipBytes)
w.Close()
zipBytes := b.Bytes()

reader, _ := NewReader(io.NopCloser(bytes.NewReader(zipBytes)), true, 10000)
payloadReader, _ := reader.(*PayloadReader)
readBytes, _ := io.ReadAll(reader)

assert.Equal(t, int64(len(zipBytes)), payloadReader.GetBytesRead()) // compressed is 32
assert.Equal(t, int64(len(nonZipBytes)), payloadReader.GetUncompressedBytesRead()) // uncompressed is 1000
assert.Equal(t, nonZipBytes, readBytes)
}

func TestZipBombing(t *testing.T) {
// build a string
var ret string
for i := 0; i < 500; i++ {
ret += "00"
}

nonZipBytes := []byte(ret)

var b bytes.Buffer
w := gzip.NewWriter(&b)
w.Write(nonZipBytes)
w.Close()
zipBytes := b.Bytes()

maxBytes := int64(100)
reader, _ := NewReader(io.NopCloser(bytes.NewReader(zipBytes)), true, maxBytes)
payloadReader, _ := reader.(*PayloadReader)
bytesRead, err := io.ReadAll(reader)

// we should have an error because the uncompressed stream is larger than maxBytes
assert.Error(t, err)
assert.Equal(t, int64(len(bytesRead)), maxBytes)

assert.Equal(t, int64(len(zipBytes)), payloadReader.GetBytesRead())
assert.Equal(t, maxBytes, payloadReader.GetUncompressedBytesRead())

}
14 changes: 9 additions & 5 deletions relay/relay_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,18 +124,18 @@ func (r *Relay) makeRouter() *mux.Router {
clientSideStreamEvalRouter.Handle("", middleware.CountBrowserConns(jsPingWithUser)).Methods("REPORT", "OPTIONS")

mobileEventsRouter := router.PathPrefix("/mobile").Subrouter()
mobileEventsRouter.Use(mobileMiddlewareStack)
mobileEventsRouter.Use(mobileMiddlewareStack, middleware.GzipMiddleware(r.config.Events.MaxReceivablePayloadSize))
mobileEventsRouter.Handle("/events/bulk", bulkEventHandler(basictypes.MobileSDK, ldevents.AnalyticsEventDataKind, offlineMode)).Methods("POST")
mobileEventsRouter.Handle("/events", bulkEventHandler(basictypes.MobileSDK, ldevents.AnalyticsEventDataKind, offlineMode)).Methods("POST")
mobileEventsRouter.Handle("", bulkEventHandler(basictypes.MobileSDK, ldevents.AnalyticsEventDataKind, offlineMode)).Methods("POST")
mobileEventsRouter.Handle("/events/diagnostic", bulkEventHandler(basictypes.MobileSDK, ldevents.DiagnosticEventDataKind, offlineMode)).Methods("POST")

clientSideBulkEventsRouter := router.PathPrefix("/events/bulk/{envId}").Subrouter()
clientSideBulkEventsRouter.Use(jsClientSideMiddlewareStack(clientSideBulkEventsRouter))
clientSideBulkEventsRouter.Use(jsClientSideMiddlewareStack(clientSideBulkEventsRouter), middleware.GzipMiddleware(r.config.Events.MaxReceivablePayloadSize))
clientSideBulkEventsRouter.Handle("", bulkEventHandler(basictypes.JSClientSDK, ldevents.AnalyticsEventDataKind, offlineMode)).Methods("POST", "OPTIONS")

clientSideDiagnosticEventsRouter := router.PathPrefix("/events/diagnostic/{envId}").Subrouter()
clientSideDiagnosticEventsRouter.Use(jsClientSideMiddlewareStack(clientSideBulkEventsRouter))
clientSideDiagnosticEventsRouter.Use(jsClientSideMiddlewareStack(clientSideBulkEventsRouter), middleware.GzipMiddleware(r.config.Events.MaxReceivablePayloadSize))
clientSideDiagnosticEventsRouter.Handle("", bulkEventHandler(basictypes.JSClientSDK, ldevents.DiagnosticEventDataKind, offlineMode)).Methods("POST", "OPTIONS")

clientSideImageEventsRouter := router.PathPrefix("/a/{envId}.gif").Subrouter()
Expand All @@ -144,8 +144,12 @@ func (r *Relay) makeRouter() *mux.Router {

serverSideRouter := router.PathPrefix("").Subrouter()
serverSideRouter.Use(serverSideMiddlewareStack)
serverSideRouter.Handle("/bulk", bulkEventHandler(basictypes.ServerSDK, ldevents.AnalyticsEventDataKind, offlineMode)).Methods("POST")
serverSideRouter.Handle("/diagnostic", bulkEventHandler(basictypes.ServerSDK, ldevents.DiagnosticEventDataKind, offlineMode)).Methods("POST")

serverSideBulkEventsRouter := serverSideRouter.NewRoute().Subrouter()
serverSideBulkEventsRouter.Use(middleware.GzipMiddleware(r.config.Events.MaxReceivablePayloadSize))
serverSideBulkEventsRouter.Handle("/bulk", bulkEventHandler(basictypes.ServerSDK, ldevents.AnalyticsEventDataKind, offlineMode)).Methods("POST")
serverSideBulkEventsRouter.Handle("/diagnostic", bulkEventHandler(basictypes.ServerSDK, ldevents.DiagnosticEventDataKind, offlineMode)).Methods("POST")

serverSideRouter.Handle("/all", middleware.CountServerConns(middleware.Streaming(
streamHandler(r.serverSideStreamProvider, serverSideStreamLogMessage),
))).Methods("GET")
Expand Down

0 comments on commit 6c0014a

Please sign in to comment.