diff --git a/changelog/fragments/1733181546-Remove-race-in-remote-bulker-access.yaml b/changelog/fragments/1733181546-Remove-race-in-remote-bulker-access.yaml new file mode 100644 index 000000000..d8cf85bea --- /dev/null +++ b/changelog/fragments/1733181546-Remove-race-in-remote-bulker-access.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: bug-fix + +# Change summary; a 80ish characters long description of the change. +summary: Remove race in remote bulker access + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +#description: + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: fleet-server + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +#pr: https://github.com/owner/repo/1234 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +issue: https://github.com/elastic/fleet-server/issues/4170 diff --git a/internal/pkg/bulk/bulk_remote_output_test.go b/internal/pkg/bulk/bulk_remote_output_test.go index b61f54d9e..cc59505c4 100644 --- a/internal/pkg/bulk/bulk_remote_output_test.go +++ b/internal/pkg/bulk/bulk_remote_output_test.go @@ -12,6 +12,8 @@ import ( "github.com/rs/zerolog" "github.com/stretchr/testify/assert" + + testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log" ) func Test_hasChangedAndUpdateRemoteOutputConfig(t *testing.T) { @@ -78,11 +80,14 @@ func Test_hasChangedAndUpdateRemoteOutputConfig(t *testing.T) { for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { + log := testlog.SetLogger(t) bulker := NewBulker(nil, nil) - bulker.remoteOutputConfigMap["remote1"] = tc.cfg - hasChanged := bulker.hasChangedAndUpdateRemoteOutputConfig(zerolog.Nop(), "remote1", tc.newCfg) + bulker.remoteOutputConfigMap.Store("remote1", tc.cfg) + hasChanged := bulker.hasChangedAndUpdateRemoteOutputConfig(log, "remote1", tc.newCfg) assert.Equal(t, tc.changed, hasChanged) - assert.Equal(t, tc.newCfg, bulker.remoteOutputConfigMap["remote1"]) + result, ok := bulker.remoteOutputConfigMap.Load("remote1") + assert.True(t, ok) + assert.Equal(t, tc.newCfg, result) }) } } @@ -108,14 +113,14 @@ func Test_CreateAndGetBulkerExisting(t *testing.T) { defer cn() bulker := NewBulker(nil, nil) outputBulker := NewBulker(nil, nil) - bulker.bulkerMap["remote1"] = outputBulker + bulker.bulkerMap.Store("remote1", outputBulker) outputMap := make(map[string]map[string]interface{}) cfg := map[string]interface{}{ "type": "remote_elasticsearch", "hosts": []interface{}{"https://remote-es:443"}, "service_token": "token1", } - bulker.remoteOutputConfigMap["remote1"] = cfg + bulker.remoteOutputConfigMap.Store("remote1", cfg) outputMap["remote1"] = cfg newBulker, hasChanged, err := bulker.CreateAndGetBulker(ctx, zerolog.Nop(), "remote1", outputMap) assert.Equal(t, outputBulker, newBulker) @@ -128,13 +133,13 @@ func Test_CreateAndGetBulkerChanged(t *testing.T) { defer cn() bulker := NewBulker(nil, nil) outputBulker := NewBulker(nil, nil) - bulker.bulkerMap["remote1"] = outputBulker + bulker.bulkerMap.Store("remote1", outputBulker) outputMap := make(map[string]map[string]interface{}) - bulker.remoteOutputConfigMap["remote1"] = map[string]interface{}{ + bulker.remoteOutputConfigMap.Store("remote1", map[string]interface{}{ "type": "remote_elasticsearch", "hosts": []interface{}{"https://remote-es:443"}, "service_token": "token1", - } + }) outputMap["remote1"] = map[string]interface{}{ "type": "remote_elasticsearch", "hosts": []interface{}{"https://remote-es:443"}, diff --git a/internal/pkg/bulk/engine.go b/internal/pkg/bulk/engine.go index 7f28d4451..171ca9f9f 100644 --- a/internal/pkg/bulk/engine.go +++ b/internal/pkg/bulk/engine.go @@ -21,11 +21,12 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/logger" "github.com/elastic/go-ucfg" - "github.com/elastic/go-elasticsearch/v8" - "github.com/elastic/go-elasticsearch/v8/esapi" "github.com/rs/zerolog" "go.elastic.co/apm/v2" "golang.org/x/sync/semaphore" + + "github.com/elastic/go-elasticsearch/v8" + "github.com/elastic/go-elasticsearch/v8/esapi" ) type APIKey = apikey.APIKey @@ -90,10 +91,9 @@ type Bulker struct { blkPool sync.Pool apikeyLimit *semaphore.Weighted tracer *apm.Tracer - remoteOutputConfigMap map[string]map[string]interface{} - bulkerMap map[string]Bulk + remoteOutputConfigMap sync.Map + bulkerMap sync.Map cancelFn context.CancelFunc - remoteOutputMutex sync.RWMutex } const ( @@ -115,24 +115,38 @@ func NewBulker(es esapi.Transport, tracer *apm.Tracer, opts ...BulkOpt) *Bulker } return &Bulker{ - opts: bopts, - es: es, - ch: make(chan *bulkT, bopts.blockQueueSz), - blkPool: sync.Pool{New: poolFunc}, - apikeyLimit: semaphore.NewWeighted(int64(bopts.apikeyMaxParallel)), - tracer: tracer, - remoteOutputConfigMap: make(map[string]map[string]interface{}), - // remote ES bulkers - bulkerMap: make(map[string]Bulk), + opts: bopts, + es: es, + ch: make(chan *bulkT, bopts.blockQueueSz), + blkPool: sync.Pool{New: poolFunc}, + apikeyLimit: semaphore.NewWeighted(int64(bopts.apikeyMaxParallel)), + tracer: tracer, } } func (b *Bulker) GetBulker(outputName string) Bulk { - return b.bulkerMap[outputName] + o, ok := b.bulkerMap.Load(outputName) + if !ok { + return nil + } + return o.(Bulk) } +// GetBulkerMap returns a copy of the BulkerMap func (b *Bulker) GetBulkerMap() map[string]Bulk { - return b.bulkerMap + mp := make(map[string]Bulk) + for k, v := range b.bulkerMap.Range { + str, ok := k.(string) + if !ok { + continue + } + blk, ok := v.(Bulk) + if !ok { + continue + } + mp[str] = blk + } + return mp } func (b *Bulker) CancelFn() context.CancelFunc { @@ -140,11 +154,7 @@ func (b *Bulker) CancelFn() context.CancelFunc { } func (b *Bulker) updateBulkerMap(outputName string, newBulker *Bulker) { - // concurrency control of updating map - b.remoteOutputMutex.Lock() - defer b.remoteOutputMutex.Unlock() - - b.bulkerMap[outputName] = newBulker + b.bulkerMap.Store(outputName, newBulker) } // for remote ES output, create a new bulker in bulkerMap if does not exist @@ -153,12 +163,12 @@ func (b *Bulker) updateBulkerMap(outputName string, newBulker *Bulker) { // if changed, stop the existing bulker and create a new one func (b *Bulker) CreateAndGetBulker(ctx context.Context, zlog zerolog.Logger, outputName string, outputMap map[string]map[string]interface{}) (Bulk, bool, error) { hasConfigChanged := b.hasChangedAndUpdateRemoteOutputConfig(zlog, outputName, outputMap[outputName]) - bulker := b.bulkerMap[outputName] - if bulker != nil && !hasConfigChanged { - return bulker, false, nil + bulker, ok := b.bulkerMap.Load(outputName) + if ok && !hasConfigChanged { + return bulker.(Bulk), false, nil } - if bulker != nil && hasConfigChanged { - cancelFn := bulker.CancelFn() + if ok && hasConfigChanged { + cancelFn := bulker.(Bulk).CancelFn() if cancelFn != nil { cancelFn() } @@ -246,15 +256,12 @@ func (b *Bulker) Client() *elasticsearch.Client { } func (b *Bulker) RemoteOutputConfigChanged(zlog zerolog.Logger, name string, newCfg map[string]interface{}) bool { - curCfg := b.remoteOutputConfigMap[name] - - hasChanged := false - - // when output config first added, not reporting change - if curCfg != nil && !reflect.DeepEqual(curCfg, newCfg) { - hasChanged = true + curCfg, ok := b.remoteOutputConfigMap.Load(name) + if !ok || curCfg.(map[string]any) == nil { + return false } - return hasChanged + + return !reflect.DeepEqual(curCfg, newCfg) } // check if remote output cfg changed @@ -268,7 +275,7 @@ func (b *Bulker) hasChangedAndUpdateRemoteOutputConfig(zlog zerolog.Logger, name for k, v := range newCfg { newCfgCopy[k] = v } - b.remoteOutputConfigMap[name] = newCfgCopy + b.remoteOutputConfigMap.Store(name, newCfgCopy) return hasChanged } @@ -426,8 +433,8 @@ func (b *Bulker) Run(ctx context.Context) error { // cancelling context of each remote bulker when Run exits defer func() { - for _, bulker := range b.bulkerMap { - bulker.CancelFn()() + for _, v := range b.bulkerMap.Range { + v.(Bulk).CancelFn()() } }()