Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: streaming/polling processors take DataDestination/StatusReporter interfaces #191

Merged
merged 5 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 18 additions & 14 deletions internal/datasourcev2/polling_data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ type Requester interface {
// configuration. All other code outside of this package should interact with it only via the
// DataSource interface.
type PollingProcessor struct {
dataSourceUpdates subsystems.DataSourceUpdateSink
dataDestination subsystems.DataDestination
statusReporter subsystems.DataSourceStatusReporter
requester Requester
pollInterval time.Duration
loggers ldlog.Loggers
Expand All @@ -45,25 +46,28 @@ type PollingProcessor struct {
// NewPollingProcessor creates the internal implementation of the polling data source.
func NewPollingProcessor(
context subsystems.ClientContext,
dataSourceUpdates subsystems.DataSourceUpdateSink,
dataDestination subsystems.DataDestination,
statusReporter subsystems.DataSourceStatusReporter,
cfg datasource.PollingConfig,
) *PollingProcessor {
httpRequester := newPollingRequester(context, context.GetHTTP().CreateHTTPClient(), cfg.BaseURI, cfg.FilterKey)
return newPollingProcessor(context, dataSourceUpdates, httpRequester, cfg.PollInterval)
return newPollingProcessor(context, dataDestination, statusReporter, httpRequester, cfg.PollInterval)
}

func newPollingProcessor(
context subsystems.ClientContext,
dataSourceUpdates subsystems.DataSourceUpdateSink,
dataDestination subsystems.DataDestination,
statusReporter subsystems.DataSourceStatusReporter,
requester Requester,
pollInterval time.Duration,
) *PollingProcessor {
pp := &PollingProcessor{
dataSourceUpdates: dataSourceUpdates,
requester: requester,
pollInterval: pollInterval,
loggers: context.GetLogging().Loggers,
quit: make(chan struct{}),
dataDestination: dataDestination,
statusReporter: statusReporter,
requester: requester,
pollInterval: pollInterval,
loggers: context.GetLogging().Loggers,
quit: make(chan struct{}),
}
return pp
}
Expand Down Expand Up @@ -106,9 +110,9 @@ func (pp *PollingProcessor) Start(closeWhenReady chan<- struct{}) {
pollingWillRetryMessage,
)
if recoverable {
pp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateInterrupted, errorInfo)
pp.statusReporter.UpdateStatus(interfaces.DataSourceStateInterrupted, errorInfo)
} else {
pp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateOff, errorInfo)
pp.statusReporter.UpdateStatus(interfaces.DataSourceStateOff, errorInfo)
notifyReady()
return
}
Expand All @@ -122,11 +126,11 @@ func (pp *PollingProcessor) Start(closeWhenReady chan<- struct{}) {
errorInfo.Kind = interfaces.DataSourceErrorKindInvalidData
}
checkIfErrorIsRecoverableAndLog(pp.loggers, err.Error(), pollingErrorContext, 0, pollingWillRetryMessage)
pp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateInterrupted, errorInfo)
pp.statusReporter.UpdateStatus(interfaces.DataSourceStateInterrupted, errorInfo)
}
continue
}
pp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateValid, interfaces.DataSourceErrorInfo{})
pp.statusReporter.UpdateStatus(interfaces.DataSourceStateValid, interfaces.DataSourceErrorInfo{})
pp.setInitializedOnce.Do(func() {
pp.isInitialized.Set(true)
pp.loggers.Info("First polling request successful")
Expand All @@ -146,7 +150,7 @@ func (pp *PollingProcessor) poll() error {

// We initialize the store only if the request wasn't cached
if !cached {
pp.dataSourceUpdates.Init(allData)
pp.dataDestination.Init(allData, nil)
}
return nil
}
Expand Down
73 changes: 23 additions & 50 deletions internal/datasourcev2/streaming_data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,14 @@ const (
// DataSource interface.
type StreamProcessor struct {
cfg datasource.StreamConfig
dataSourceUpdates subsystems.DataSourceUpdateSink
dataDestination subsystems.DataDestination
statusReporter subsystems.DataSourceStatusReporter
client *http.Client
headers http.Header
diagnosticsManager *ldevents.DiagnosticsManager
loggers ldlog.Loggers
isInitialized internal.AtomicBoolean
halt chan struct{}
storeStatusCh <-chan interfaces.DataStoreStatus
connectionAttemptStartTime ldtime.UnixMillisecondTime
connectionAttemptLock sync.Mutex
readyOnce sync.Once
Expand All @@ -91,15 +91,17 @@ type StreamProcessor struct {
// NewStreamProcessor creates the internal implementation of the streaming data source.
func NewStreamProcessor(
context subsystems.ClientContext,
dataSourceUpdates subsystems.DataSourceUpdateSink,
dataDestination subsystems.DataDestination,
statusReporter subsystems.DataSourceStatusReporter,
cfg datasource.StreamConfig,
) *StreamProcessor {
sp := &StreamProcessor{
dataSourceUpdates: dataSourceUpdates,
headers: context.GetHTTP().DefaultHeaders,
loggers: context.GetLogging().Loggers,
halt: make(chan struct{}),
cfg: cfg,
dataDestination: dataDestination,
statusReporter: statusReporter,
headers: context.GetHTTP().DefaultHeaders,
loggers: context.GetLogging().Loggers,
halt: make(chan struct{}),
cfg: cfg,
}
if cci, ok := context.(*internal.ClientContextImpl); ok {
sp.diagnosticsManager = cci.DiagnosticsManager
Expand All @@ -123,9 +125,6 @@ func (sp *StreamProcessor) IsInitialized() bool {
//nolint:revive // no doc comment for standard method
func (sp *StreamProcessor) Start(closeWhenReady chan<- struct{}) {
sp.loggers.Info("Starting LaunchDarkly streaming connection")
if sp.dataSourceUpdates.GetDataStoreStatusProvider().IsStatusMonitoringEnabled() {
sp.storeStatusCh = sp.dataSourceUpdates.GetDataStoreStatusProvider().AddStatusListener()
}
go sp.subscribe(closeWhenReady)
}

Expand Down Expand Up @@ -183,21 +182,16 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan<
Message: err.Error(),
Time: time.Now(),
}
sp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateInterrupted, errorInfo)
sp.statusReporter.UpdateStatus(interfaces.DataSourceStateInterrupted, errorInfo)

shouldRestart = true // scenario 1 in error handling comments at top of file
processedEvent = false
}

storeUpdateFailed := func(updateDesc string) {
if sp.storeStatusCh != nil {
sp.loggers.Errorf("Failed to store %s in data store; will try again once data store is working", updateDesc)
// scenario 2a in error handling comments at top of file
} else {
sp.loggers.Errorf("Failed to store %s in data store; will restart stream until successful", updateDesc)
shouldRestart = true // scenario 2b
processedEvent = false
}
sp.loggers.Errorf("Failed to store %s in data store; will restart stream until successful", updateDesc)
shouldRestart = true // scenario 2b
processedEvent = false
}

switch event.Event() {
Expand Down Expand Up @@ -266,18 +260,18 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan<
for _, update := range updates {
switch u := update.(type) {
case datasource.PatchData:
if !sp.dataSourceUpdates.Upsert(u.Kind, u.Key, u.Data) {
if !sp.dataDestination.Upsert(u.Kind, u.Key, u.Data) {
storeUpdateFailed("streaming update of " + u.Key)
}
case datasource.PutData:
if sp.dataSourceUpdates.Init(u.Data) {
if sp.dataDestination.Init(u.Data, nil) {
sp.setInitializedAndNotifyClient(true, closeWhenReady)
} else {
storeUpdateFailed("initial streaming data")
}
case datasource.DeleteData:
deletedItem := ldstoretypes.ItemDescriptor{Version: u.Version, Item: nil}
if !sp.dataSourceUpdates.Upsert(u.Kind, u.Key, deletedItem) {
if !sp.dataDestination.Upsert(u.Kind, u.Key, deletedItem) {
storeUpdateFailed("streaming deletion of " + u.Key)
}

Expand All @@ -291,30 +285,12 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan<
}

if processedEvent {
sp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateValid, interfaces.DataSourceErrorInfo{})
sp.statusReporter.UpdateStatus(interfaces.DataSourceStateValid, interfaces.DataSourceErrorInfo{})
}
if shouldRestart {
stream.Restart()
}

case newStoreStatus := <-sp.storeStatusCh:
if sp.loggers.IsDebugEnabled() {
sp.loggers.Debugf("StreamProcessorV2 received store status update: %+v", newStoreStatus)
}
if newStoreStatus.Available {
// The store has just transitioned from unavailable to available (scenario 2a above)
if newStoreStatus.NeedsRefresh {
// The store is telling us that it can't guarantee that all of the latest data was cached.
// So we'll restart the stream to ensure a full refresh.
sp.loggers.Warn("Restarting stream to refresh data after data store outage")
stream.Restart()
}
// All of the updates were cached and have been written to the store, so we don't need to
// restart the stream. We just need to make sure the client knows we're initialized now
// (in case the initial "put" was not stored).
sp.setInitializedAndNotifyClient(true, closeWhenReady)
}

case <-sp.halt:
stream.Close()
return
Expand All @@ -329,7 +305,7 @@ func (sp *StreamProcessor) subscribe(closeWhenReady chan<- struct{}) {
"Unable to create a stream request; this is not a network problem, most likely a bad base URI: %s",
reqErr,
)
sp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateOff, interfaces.DataSourceErrorInfo{
sp.statusReporter.UpdateStatus(interfaces.DataSourceStateOff, interfaces.DataSourceErrorInfo{
Kind: interfaces.DataSourceErrorKindUnknown,
Message: reqErr.Error(),
Time: time.Now(),
Expand Down Expand Up @@ -373,10 +349,10 @@ func (sp *StreamProcessor) subscribe(closeWhenReady chan<- struct{}) {
)
if recoverable {
sp.logConnectionStarted()
sp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateInterrupted, errorInfo)
sp.statusReporter.UpdateStatus(interfaces.DataSourceStateInterrupted, errorInfo)
return es.StreamErrorHandlerResult{CloseNow: false}
}
sp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateOff, errorInfo)
sp.statusReporter.UpdateStatus(interfaces.DataSourceStateOff, errorInfo)
return es.StreamErrorHandlerResult{CloseNow: true}
}

Expand All @@ -392,7 +368,7 @@ func (sp *StreamProcessor) subscribe(closeWhenReady chan<- struct{}) {
Message: err.Error(),
Time: time.Now(),
}
sp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateInterrupted, errorInfo)
sp.statusReporter.UpdateStatus(interfaces.DataSourceStateInterrupted, errorInfo)
sp.logConnectionStarted()
return es.StreamErrorHandlerResult{CloseNow: false}
}
Expand Down Expand Up @@ -453,10 +429,7 @@ func (sp *StreamProcessor) logConnectionResult(success bool) {
func (sp *StreamProcessor) Close() error {
sp.closeOnce.Do(func() {
close(sp.halt)
if sp.storeStatusCh != nil {
sp.dataSourceUpdates.GetDataStoreStatusProvider().RemoveStatusListener(sp.storeStatusCh)
}
sp.dataSourceUpdates.UpdateStatus(interfaces.DataSourceStateOff, interfaces.DataSourceErrorInfo{})
sp.statusReporter.UpdateStatus(interfaces.DataSourceStateOff, interfaces.DataSourceErrorInfo{})
})
return nil
}
Expand Down
104 changes: 104 additions & 0 deletions internal/sharedtest/mocks/mock_data_destination.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package mocks

import (
"sync"
"testing"
"time"

"github.com/launchdarkly/go-server-sdk/v7/interfaces"
"github.com/launchdarkly/go-server-sdk/v7/subsystems"
"github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoretypes"

th "github.com/launchdarkly/go-test-helpers/v3"

"github.com/stretchr/testify/assert"
)

// MockDataDestination is a mock implementation of a data destination used by tests involving FDv2 data sources.
type MockDataDestination struct {
DataStore *CapturingDataStore
Statuses chan interfaces.DataSourceStatus
dataStoreStatusProvider *mockDataStoreStatusProvider
lastStatus interfaces.DataSourceStatus
lock sync.Mutex
}

// NewMockDataDestination creates an instance of MockDataDestination.
//
// The DataStoreStatusProvider can be nil if we are not doing a test that requires manipulation of that
// component.
func NewMockDataDestination(realStore subsystems.DataStore) *MockDataDestination {
dataStore := NewCapturingDataStore(realStore)
dataStoreStatusProvider := &mockDataStoreStatusProvider{
dataStore: dataStore,
status: interfaces.DataStoreStatus{Available: true},
statusCh: make(chan interfaces.DataStoreStatus, 10),
}
return &MockDataDestination{
DataStore: dataStore,
Statuses: make(chan interfaces.DataSourceStatus, 10),
dataStoreStatusProvider: dataStoreStatusProvider,
}
}

// Init in this test implementation, delegates to d.DataStore.CapturedUpdates.
func (d *MockDataDestination) Init(allData []ldstoretypes.Collection, _ *int) bool {
// For now, the payloadVersion is ignored. When the data sources start making use of it, it should be
// stored so that assertions can be made.
for _, coll := range allData {
AssertNotNil(coll.Kind)
}
err := d.DataStore.Init(allData)
return err == nil
}

// Upsert in this test implementation, delegates to d.DataStore.CapturedUpdates.
func (d *MockDataDestination) Upsert(
kind ldstoretypes.DataKind,
key string,
newItem ldstoretypes.ItemDescriptor,
) bool {
AssertNotNil(kind)
_, err := d.DataStore.Upsert(kind, key, newItem)
return err == nil
}

// UpdateStatus in this test implementation, pushes a value onto the Statuses channel.
func (d *MockDataDestination) UpdateStatus(
newState interfaces.DataSourceState,
newError interfaces.DataSourceErrorInfo,
) {
d.lock.Lock()
defer d.lock.Unlock()
if newState != d.lastStatus.State || newError.Kind != "" {
d.lastStatus = interfaces.DataSourceStatus{State: newState, LastError: newError}
d.Statuses <- d.lastStatus
}
}

// GetDataStoreStatusProvider returns a stub implementation that does not have full functionality
// but enough to test a data source with.
func (d *MockDataDestination) GetDataStoreStatusProvider() interfaces.DataStoreStatusProvider {
return d.dataStoreStatusProvider
}

// UpdateStoreStatus simulates a change in the data store status.
func (d *MockDataDestination) UpdateStoreStatus(newStatus interfaces.DataStoreStatus) {
d.dataStoreStatusProvider.statusCh <- newStatus
}

// RequireStatusOf blocks until a new data source status is available, and verifies its state.
func (d *MockDataDestination) RequireStatusOf(
t *testing.T,
newState interfaces.DataSourceState,
) interfaces.DataSourceStatus {
status := d.RequireStatus(t)
assert.Equal(t, string(newState), string(status.State))
// string conversion is due to a bug in assert with type aliases
return status
}

// RequireStatus blocks until a new data source status is available.
func (d *MockDataDestination) RequireStatus(t *testing.T) interfaces.DataSourceStatus {
return th.RequireValue(t, d.Statuses, time.Second, "timed out waiting for new data source status")
}
Loading