diff --git a/internal/streams/stream_provider_server_side.go b/internal/streams/stream_provider_server_side.go index fa2696de..60d07677 100644 --- a/internal/streams/stream_provider_server_side.go +++ b/internal/streams/stream_provider_server_side.go @@ -2,7 +2,10 @@ package streams import ( "net/http" + "os" + "strconv" "sync" + "time" "github.com/launchdarkly/ld-relay/v8/internal/sdkauth" @@ -109,6 +112,7 @@ func (r *serverSideEnvStreamRepository) Replay(channel, id string) chan eventsou // getReplayEvent will return a ServerSidePutEvent with all the data needed for a Replay. func (r *serverSideEnvStreamRepository) getReplayEvent() (eventsource.Event, error) { data, err, _ := r.flightGroup.Do("getReplayEvent", func() (interface{}, error) { + start := time.Now() flags, err := r.store.GetAll(ldstoreimpl.Features()) if err != nil { @@ -126,7 +130,19 @@ func (r *serverSideEnvStreamRepository) getReplayEvent() (eventsource.Event, err {Kind: ldstoreimpl.Segments(), Items: removeDeleted(segments)}, } + // This call uses a lot of system resources (RAM in particular). event := MakeServerSidePutEvent(allData) + // So we sleep for a bit to allow a bunch of concurrent calls to + // all make use of this same flightGroup. + delayS := os.Getenv("LD_STREAMING_DELAY_SECONDS") + if delay, err := strconv.Atoi(delayS); err == nil { + if delay > 0 && delay <= 60 { + time.Sleep(time.Duration(delay)*time.Second - time.Since(start)) + } else { + r.loggers.Warnf("Ignoring invalid value for LD_STREAMING_DELAY_SECONDS: %s", delayS) + } + } + return event, nil })