-
Notifications
You must be signed in to change notification settings - Fork 81
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Remove race condition when accessing remote bulker map #4171
Changes from 3 commits
3fb37e8
4ec2546
3ca6a00
47d2615
478e59e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
# Kind can be one of: | ||
# - breaking-change: a change to previously-documented behavior | ||
# - deprecation: functionality that is being removed in a later release | ||
# - bug-fix: fixes a problem in a previous version | ||
# - enhancement: extends functionality but does not break or fix existing behavior | ||
# - feature: new functionality | ||
# - known-issue: problems that we are aware of in a given version | ||
# - security: impacts on the security of a product or a user’s deployment. | ||
# - upgrade: important information for someone upgrading from a prior version | ||
# - other: does not fit into any of the other categories | ||
kind: bug-fix | ||
|
||
# Change summary; an 80ish-character-long description of the change. | ||
summary: Remove race in remote bulker access | ||
|
||
# Long description; in case the summary is not enough to describe the change | ||
# this field accommodates a description without length limits. | ||
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. | ||
#description: | ||
|
||
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. | ||
component: fleet-server | ||
|
||
# PR URL; optional; the PR number that added the changeset. | ||
# If not present, it is automatically filled in by the tooling, which finds the PR where this changelog fragment was added. | ||
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. | ||
# Please provide it if you are adding a fragment for a different PR. | ||
#pr: https://github.com/owner/repo/1234 | ||
|
||
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). | ||
# If not present, it is automatically filled in by the tooling with the issue linked to the PR number. | ||
issue: https://github.com/elastic/fleet-server/issues/4170 |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,11 +21,12 @@ import ( | |
"github.com/elastic/fleet-server/v7/internal/pkg/logger" | ||
"github.com/elastic/go-ucfg" | ||
|
||
"github.com/elastic/go-elasticsearch/v8" | ||
"github.com/elastic/go-elasticsearch/v8/esapi" | ||
"github.com/rs/zerolog" | ||
"go.elastic.co/apm/v2" | ||
"golang.org/x/sync/semaphore" | ||
|
||
"github.com/elastic/go-elasticsearch/v8" | ||
"github.com/elastic/go-elasticsearch/v8/esapi" | ||
) | ||
|
||
type APIKey = apikey.APIKey | ||
|
@@ -90,10 +91,9 @@ type Bulker struct { | |
blkPool sync.Pool | ||
apikeyLimit *semaphore.Weighted | ||
tracer *apm.Tracer | ||
remoteOutputConfigMap map[string]map[string]interface{} | ||
bulkerMap map[string]Bulk | ||
remoteOutputConfigMap sync.Map | ||
bulkerMap sync.Map | ||
cancelFn context.CancelFunc | ||
remoteOutputMutex sync.RWMutex | ||
} | ||
|
||
const ( | ||
|
@@ -115,36 +115,46 @@ func NewBulker(es esapi.Transport, tracer *apm.Tracer, opts ...BulkOpt) *Bulker | |
} | ||
|
||
return &Bulker{ | ||
opts: bopts, | ||
es: es, | ||
ch: make(chan *bulkT, bopts.blockQueueSz), | ||
blkPool: sync.Pool{New: poolFunc}, | ||
apikeyLimit: semaphore.NewWeighted(int64(bopts.apikeyMaxParallel)), | ||
tracer: tracer, | ||
remoteOutputConfigMap: make(map[string]map[string]interface{}), | ||
// remote ES bulkers | ||
bulkerMap: make(map[string]Bulk), | ||
opts: bopts, | ||
es: es, | ||
ch: make(chan *bulkT, bopts.blockQueueSz), | ||
blkPool: sync.Pool{New: poolFunc}, | ||
apikeyLimit: semaphore.NewWeighted(int64(bopts.apikeyMaxParallel)), | ||
tracer: tracer, | ||
} | ||
} | ||
|
||
func (b *Bulker) GetBulker(outputName string) Bulk { | ||
return b.bulkerMap[outputName] | ||
o, ok := b.bulkerMap.Load(outputName) | ||
if !ok { | ||
return nil | ||
} | ||
return o.(Bulk) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can this panic? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It shouldn't; we are only adding Bulkers. |
||
} | ||
|
||
// GetBulkerMap returns a copy of the BulkerMap | ||
func (b *Bulker) GetBulkerMap() map[string]Bulk { | ||
return b.bulkerMap | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. From what I can see, we had a flaky test because the policy self-monitor ( |
||
mp := make(map[string]Bulk) | ||
for k, v := range b.bulkerMap.Range { | ||
str, ok := k.(string) | ||
if !ok { | ||
continue | ||
} | ||
blk, ok := v.(Bulk) | ||
if !ok { | ||
continue | ||
} | ||
mp[str] = blk | ||
} | ||
return mp | ||
} | ||
|
||
func (b *Bulker) CancelFn() context.CancelFunc { | ||
return b.cancelFn | ||
} | ||
|
||
func (b *Bulker) updateBulkerMap(outputName string, newBulker *Bulker) { | ||
// concurrency control of updating map | ||
b.remoteOutputMutex.Lock() | ||
defer b.remoteOutputMutex.Unlock() | ||
|
||
b.bulkerMap[outputName] = newBulker | ||
b.bulkerMap.Store(outputName, newBulker) | ||
} | ||
|
||
// for remote ES output, create a new bulker in bulkerMap if it does not exist | ||
|
@@ -153,12 +163,12 @@ func (b *Bulker) updateBulkerMap(outputName string, newBulker *Bulker) { | |
// if changed, stop the existing bulker and create a new one | ||
func (b *Bulker) CreateAndGetBulker(ctx context.Context, zlog zerolog.Logger, outputName string, outputMap map[string]map[string]interface{}) (Bulk, bool, error) { | ||
hasConfigChanged := b.hasChangedAndUpdateRemoteOutputConfig(zlog, outputName, outputMap[outputName]) | ||
bulker := b.bulkerMap[outputName] | ||
if bulker != nil && !hasConfigChanged { | ||
return bulker, false, nil | ||
bulker, ok := b.bulkerMap.Load(outputName) | ||
if ok && !hasConfigChanged { | ||
return bulker.(Bulk), false, nil | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'm assuming it is guaranteed that the sync.Map will only hold the bulker type; is that the case? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes. |
||
} | ||
if bulker != nil && hasConfigChanged { | ||
cancelFn := bulker.CancelFn() | ||
if ok && hasConfigChanged { | ||
cancelFn := bulker.(Bulk).CancelFn() | ||
if cancelFn != nil { | ||
cancelFn() | ||
} | ||
|
@@ -246,15 +256,12 @@ func (b *Bulker) Client() *elasticsearch.Client { | |
} | ||
|
||
func (b *Bulker) RemoteOutputConfigChanged(zlog zerolog.Logger, name string, newCfg map[string]interface{}) bool { | ||
curCfg := b.remoteOutputConfigMap[name] | ||
|
||
hasChanged := false | ||
|
||
// when output config first added, not reporting change | ||
if curCfg != nil && !reflect.DeepEqual(curCfg, newCfg) { | ||
hasChanged = true | ||
curCfg, ok := b.remoteOutputConfigMap.Load(name) | ||
if !ok || curCfg.(map[string]any) == nil { | ||
return false | ||
} | ||
return hasChanged | ||
|
||
return !reflect.DeepEqual(curCfg, newCfg) | ||
} | ||
|
||
// check if remote output cfg changed | ||
|
@@ -268,7 +275,7 @@ func (b *Bulker) hasChangedAndUpdateRemoteOutputConfig(zlog zerolog.Logger, name | |
for k, v := range newCfg { | ||
newCfgCopy[k] = v | ||
} | ||
b.remoteOutputConfigMap[name] = newCfgCopy | ||
b.remoteOutputConfigMap.Store(name, newCfgCopy) | ||
return hasChanged | ||
} | ||
|
||
|
@@ -426,8 +433,8 @@ func (b *Bulker) Run(ctx context.Context) error { | |
|
||
// cancelling context of each remote bulker when Run exits | ||
defer func() { | ||
for _, bulker := range b.bulkerMap { | ||
bulker.CancelFn()() | ||
for _, v := range b.bulkerMap.Range { | ||
v.(Bulk).CancelFn()() | ||
} | ||
}() | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure why, but this causes issues when
make benchmark
is run, but these tests can be run individually without this