Skip to content

Commit

Permalink
Remove race condition when accessing remote bulker map (#4171) (#4186)
Browse files Browse the repository at this point in the history
Use the remoteOutputMutex whenever accessing the bulkerMap.
Change GetBulkerMap to return a copy of the map so that remote output health will not conflict with adding/removing a bulker from the map.

(cherry picked from commit 924ea07)

Co-authored-by: Michel Laterman <[email protected]>
  • Loading branch information
mergify[bot] and michel-laterman authored Dec 9, 2024
1 parent f2315df commit 67e6bff
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: bug-fix

# Change summary; a 80ish characters long description of the change.
summary: Remove race in remote bulker access

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: fleet-server

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
#pr: https://github.com/owner/repo/1234

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/fleet-server/issues/4170
59 changes: 58 additions & 1 deletion internal/pkg/bulk/bulk_remote_output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (

"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"

testlog "github.com/elastic/fleet-server/v7/internal/pkg/testing/log"
)

func Test_hasChangedAndUpdateRemoteOutputConfig(t *testing.T) {
Expand Down Expand Up @@ -78,9 +80,10 @@ func Test_hasChangedAndUpdateRemoteOutputConfig(t *testing.T) {

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
log := testlog.SetLogger(t)
bulker := NewBulker(nil, nil)
bulker.remoteOutputConfigMap["remote1"] = tc.cfg
hasChanged := bulker.hasChangedAndUpdateRemoteOutputConfig(zerolog.Nop(), "remote1", tc.newCfg)
hasChanged := bulker.hasChangedAndUpdateRemoteOutputConfig(log, "remote1", tc.newCfg)
assert.Equal(t, tc.changed, hasChanged)
assert.Equal(t, tc.newCfg, bulker.remoteOutputConfigMap["remote1"])
})
Expand Down Expand Up @@ -148,3 +151,57 @@ func Test_CreateAndGetBulkerChanged(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, true, cancelFnCalled)
}

func Benchmark_CreateAndGetBulker(b *testing.B) {
b.Skip("Crashes on remote runner")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
log := zerolog.Nop()
outputMap := map[string]map[string]any{
"remote1": map[string]any{
"type": "remote_elasticsearch",
"hosts": []interface{}{"https://remote-es:443"},
"service_token": "token1",
},
}
b.Run("new remote bulker", func(b *testing.B) {
bulker := NewBulker(nil, nil)
b.ReportAllocs()
for range b.N {
b.StopTimer()
bulker.bulkerMap = make(map[string]Bulk)
bulker.remoteOutputConfigMap = make(map[string]map[string]any)
b.StartTimer()

bulker.CreateAndGetBulker(ctx, log, "remote1", outputMap)
}
})
b.Run("existing remote bulker", func(b *testing.B) {
bulker := NewBulker(nil, nil)
outputBulker := NewBulker(nil, nil)
bulker.bulkerMap["remote1"] = outputBulker
bulker.remoteOutputConfigMap["remote1"] = outputMap["remote1"]
b.ResetTimer()
b.ReportAllocs()
for range b.N {
bulker.CreateAndGetBulker(ctx, log, "remote1", outputMap)
}
})
b.Run("changed remote bulker", func(b *testing.B) {
b.ReportAllocs()
for range b.N {
b.StopTimer()
bulker := NewBulker(nil, nil)
outputBulker := NewBulker(nil, nil)
bulker.bulkerMap["remote1"] = outputBulker
bulker.remoteOutputConfigMap["remote1"] = map[string]any{
"type": "remote_elasticsearch",
"hosts": []interface{}{"https://remote-es:443"},
"service_token": "wrong token",
}
b.StartTimer()

bulker.CreateAndGetBulker(ctx, log, "remote1", outputMap)
}
})
}
15 changes: 14 additions & 1 deletion internal/pkg/bulk/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,20 @@ func NewBulker(es esapi.Transport, tracer *apm.Tracer, opts ...BulkOpt) *Bulker
}

func (b *Bulker) GetBulker(outputName string) Bulk {
b.remoteOutputMutex.RLock()
defer b.remoteOutputMutex.RUnlock()
return b.bulkerMap[outputName]
}

// GetBulkerMap returns a copy of the remote output bulkers
func (b *Bulker) GetBulkerMap() map[string]Bulk {
return b.bulkerMap
mp := make(map[string]Bulk)
b.remoteOutputMutex.RLock()
for k, v := range b.bulkerMap {
mp[k] = v
}
b.remoteOutputMutex.RUnlock()
return mp
}

func (b *Bulker) CancelFn() context.CancelFunc {
Expand All @@ -153,7 +162,9 @@ func (b *Bulker) updateBulkerMap(outputName string, newBulker *Bulker) {
// if changed, stop the existing bulker and create a new one
func (b *Bulker) CreateAndGetBulker(ctx context.Context, zlog zerolog.Logger, outputName string, outputMap map[string]map[string]interface{}) (Bulk, bool, error) {
hasConfigChanged := b.hasChangedAndUpdateRemoteOutputConfig(zlog, outputName, outputMap[outputName])
b.remoteOutputMutex.RLock()
bulker := b.bulkerMap[outputName]
b.remoteOutputMutex.RUnlock()
if bulker != nil && !hasConfigChanged {
return bulker, false, nil
}
Expand Down Expand Up @@ -426,6 +437,8 @@ func (b *Bulker) Run(ctx context.Context) error {

// cancelling context of each remote bulker when Run exits
defer func() {
b.remoteOutputMutex.RLock()
defer b.remoteOutputMutex.RUnlock()
for _, bulker := range b.bulkerMap {
bulker.CancelFn()()
}
Expand Down

0 comments on commit 67e6bff

Please sign in to comment.