Skip to content

Commit

Permalink
fix extra conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
cwaldren-ld committed Nov 21, 2024
2 parents f6c6530 + 67a0a7d commit c824db3
Show file tree
Hide file tree
Showing 16 changed files with 1,069 additions and 28 deletions.
37 changes: 37 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,4 +197,41 @@ type Config struct {
// LaunchDarkly provides integration packages, and most applications will not
// need to implement their own hooks.
Hooks []ldhooks.Hook

// This field is not stable, and not subject to any backwards compatibility guarantees or semantic versioning.
// It is not suitable for production usage. Do not use it. You have been warned.
//
// DataSystem configures how data (e.g. flags, segments) are retrieved by the SDK.
//
// Set this field only if you want to specify non-default values for any of the data system configuration,
// such as defining an alternate data source or setting up a persistent store.
//
// Below, the default configuration is described with the relevant config item in parentheses:
// 1. The SDK will first attempt to fetch all data from LaunchDarkly's global Content Delivery Network (Initializer)
// 2. It will then establish a streaming connection with LaunchDarkly's realtime Flag Delivery Network (Primary
// Synchronizer.)
// 3. If at any point the connection to the realtime network is interrupted for a short period of time,
// the connection will be automatically re-established.
// 4. If the connection cannot be re-established over a sustained period, the SDK will begin to make periodic
// requests to LaunchDarkly's global CDN (Secondary Synchronizer)
// 5. After a period of time, the SDK will swap back to the realtime Flag Delivery Network if it becomes
// available again.
//
// The default streaming mode configuration is preferred for most use-cases (DataSystem().StreamingPreferred()).
// Sometimes streaming connections are blocked by firewalls or proxies. If this is the case, a polling-only mode
// can be configured:
//
// config := ld.Config{
// DataSystem: ldcomponents.DataSystem().PollingOnly(),
// }
//
// If you'd like to load data from a local source to provide redundancy if there is a problem
// connecting to LaunchDarkly, you can add a custom initializer:
//
// config := ld.Config {
// DataSystem: ldcomponents.DataSystem().PrependInitializers(myCustomInitializer),
// }
//
// The initializer(s) will run before LaunchDarkly's default initializer.
DataSystem subsystems.ComponentConfigurer[subsystems.DataSystemConfiguration]
}
25 changes: 21 additions & 4 deletions internal/datasourcev2/polling_data_source.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package datasourcev2

import (
"context"
"sync"
"time"

Expand Down Expand Up @@ -72,8 +73,24 @@ func newPollingProcessor(
return pp
}

//nolint:revive // no doc comment for standard method
func (pp *PollingProcessor) Start(closeWhenReady chan<- struct{}) {
//nolint:revive // DataInitializer method.
func (pp *PollingProcessor) Name() string {
return "PollingDataSourceV2"
}

//nolint:revive // DataInitializer method.
func (pp *PollingProcessor) Fetch(_ context.Context) (*subsystems.Basis, error) {
//nolint:godox
// TODO(SDK-752): Plumb the context into the request method.
basis, err := pp.requester.Request()
if err != nil {
return nil, err
}
return &subsystems.Basis{Events: basis.Changes(), Selector: basis.Selector(), Persist: true}, nil
}

//nolint:revive // DataSynchronizer method.
func (pp *PollingProcessor) Sync(closeWhenReady chan<- struct{}, _ fdv2proto.Selector) {
pp.loggers.Infof("Starting LaunchDarkly polling with interval: %+v", pp.pollInterval)

ticker := newTickerWithInitialTick(pp.pollInterval)
Expand Down Expand Up @@ -148,8 +165,8 @@ func (pp *PollingProcessor) poll() error {
return err
}

payload := changeSet.IntentCode().Payload
switch payload.Code {
code := changeSet.IntentCode()
switch code {
case fdv2proto.IntentTransferFull:
pp.dataDestination.SetBasis(changeSet.Changes(), changeSet.Selector(), true)
case fdv2proto.IntentTransferChanges:
Expand Down
22 changes: 16 additions & 6 deletions internal/datasourcev2/streaming_data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ package datasourcev2

import (
"encoding/json"
"errors"
"net/http"
"net/url"
"sync"
"time"

"github.com/launchdarkly/go-server-sdk/v7/internal/fdv2proto"

"context"
es "github.com/launchdarkly/eventsource"

"github.com/launchdarkly/go-sdk-common/v3/ldlog"
"github.com/launchdarkly/go-sdk-common/v3/ldtime"
ldevents "github.com/launchdarkly/go-sdk-events/v3"
Expand Down Expand Up @@ -80,7 +83,6 @@ type StreamProcessor struct {
connectionAttemptLock sync.Mutex
readyOnce sync.Once
closeOnce sync.Once
persist bool
}

// NewStreamProcessor creates the internal implementation of the streaming data source.
Expand All @@ -97,7 +99,6 @@ func NewStreamProcessor(
loggers: context.GetLogging().Loggers,
halt: make(chan struct{}),
cfg: cfg,
persist: true,
}
if cci, ok := context.(*internal.ClientContextImpl); ok {
sp.diagnosticsManager = cci.DiagnosticsManager
Expand All @@ -113,13 +114,22 @@ func NewStreamProcessor(
return sp
}

//nolint:revive // DataInitializer method.
func (sp *StreamProcessor) Name() string {
return "StreamingDataSourceV2"
}

func (sp *StreamProcessor) Fetch(_ context.Context) (*subsystems.Basis, error) {
return nil, errors.New("StreamProcessor does not implement Fetch capability")
}

//nolint:revive // no doc comment for standard method
func (sp *StreamProcessor) IsInitialized() bool {
return sp.isInitialized.Get()
}

//nolint:revive // no doc comment for standard method
func (sp *StreamProcessor) Start(closeWhenReady chan<- struct{}) {
//nolint:revive // DataSynchronizer method.
func (sp *StreamProcessor) Sync(closeWhenReady chan<- struct{}, _ fdv2proto.Selector) {
sp.loggers.Info("Starting LaunchDarkly streaming connection")
go sp.subscribe(closeWhenReady)
}
Expand Down Expand Up @@ -258,8 +268,8 @@ func (sp *StreamProcessor) consumeStream(stream *es.Stream, closeWhenReady chan<
break
}

payload := changeSet.IntentCode().Payload
switch payload.Code {
code := changeSet.IntentCode()
switch code {
case fdv2proto.IntentTransferFull:
sp.dataDestination.SetBasis(changeSet.Changes(), changeSet.Selector(), true)
sp.setInitializedAndNotifyClient(true, closeWhenReady)
Expand Down
20 changes: 17 additions & 3 deletions internal/datastore/data_store_status_provider_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,33 @@ package datastore

import (
"github.com/launchdarkly/go-server-sdk/v7/interfaces"
"github.com/launchdarkly/go-server-sdk/v7/subsystems"
)

type StatusMonitorable interface {
// IsStatusMonitoringEnabled returns true if this data store implementation supports status
// monitoring.
//
// This is normally only true for persistent data stores created with ldcomponents.PersistentDataStore(),
// but it could also be true for any custom DataStore implementation that makes use of the
// statusUpdater parameter provided to the DataStoreFactory. Returning true means that the store
// guarantees that if it ever enters an invalid state (that is, an operation has failed or it knows
// that operations cannot succeed at the moment), it will publish a status update, and will then
// publish another status update once it has returned to a valid state.
//
// The same value will be returned from DataStoreStatusProvider.IsStatusMonitoringEnabled().
IsStatusMonitoringEnabled() bool
}

// dataStoreStatusProviderImpl is the internal implementation of DataStoreStatusProvider. It's not
// exported because the rest of the SDK code only interacts with the public interface.
type dataStoreStatusProviderImpl struct {
store subsystems.DataStore
store StatusMonitorable
dataStoreUpdates *DataStoreUpdateSinkImpl
}

// NewDataStoreStatusProviderImpl creates the internal implementation of DataStoreStatusProvider.
func NewDataStoreStatusProviderImpl(
store subsystems.DataStore,
store StatusMonitorable,
dataStoreUpdates *DataStoreUpdateSinkImpl,
) interfaces.DataStoreStatusProvider {
return &dataStoreStatusProviderImpl{
Expand Down
Loading

0 comments on commit c824db3

Please sign in to comment.