Skip to content

Commit

Permalink
Merge pull request prometheus#13861 from simonpasquier/fix-sendall-de…
Browse files Browse the repository at this point in the history
…adlock

Notifier: fix deadlock when zero alerts
  • Loading branch information
bboreham authored Mar 29, 2024
2 parents 855b5ac + 8bd6ae1 commit c97a066
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 28 deletions.
5 changes: 5 additions & 0 deletions notifier/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,10 @@ func (n *Manager) sendAll(alerts ...*Alert) bool {
numSuccess atomic.Uint64
)
for _, ams := range amSets {
if len(ams.ams) == 0 {
continue
}

var (
payload []byte
err error
Expand All @@ -482,6 +486,7 @@ func (n *Manager) sendAll(alerts ...*Alert) bool {
if len(ams.cfg.AlertRelabelConfigs) > 0 {
amAlerts = relabelAlerts(ams.cfg.AlertRelabelConfigs, labels.Labels{}, alerts)
if len(amAlerts) == 0 {
ams.mtx.RUnlock()
continue
}
// We can't use the cached values from previous iteration.
Expand Down
95 changes: 67 additions & 28 deletions notifier/notifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,17 +219,19 @@ func TestHandlerSendAllRemapPerAm(t *testing.T) {
errc = make(chan error, 1)
expected1 = make([]*Alert, 0, maxBatchSize)
expected2 = make([]*Alert, 0, maxBatchSize)
expected3 = make([]*Alert, 0)

status1, status2 atomic.Int32
statusOK atomic.Int32
)
status1.Store(int32(http.StatusOK))
status2.Store(int32(http.StatusOK))
statusOK.Store(int32(http.StatusOK))

server1 := newTestHTTPServerBuilder(&expected1, errc, "", "", &status1)
server2 := newTestHTTPServerBuilder(&expected2, errc, "", "", &status2)
server1 := newTestHTTPServerBuilder(&expected1, errc, "", "", &statusOK)
server2 := newTestHTTPServerBuilder(&expected2, errc, "", "", &statusOK)
server3 := newTestHTTPServerBuilder(&expected3, errc, "", "", &statusOK)

defer server1.Close()
defer server2.Close()
defer server3.Close()

h := NewManager(&Options{}, nil)
h.alertmanagers = make(map[string]*alertmanagerSet)
Expand All @@ -247,38 +249,68 @@ func TestHandlerSendAllRemapPerAm(t *testing.T) {
},
}

h.alertmanagers["1"] = &alertmanagerSet{
ams: []alertmanager{
alertmanagerMock{
urlf: func() string { return server1.URL },
},
am3Cfg := config.DefaultAlertmanagerConfig
am3Cfg.Timeout = model.Duration(time.Second)
am3Cfg.AlertRelabelConfigs = []*relabel.Config{
{
SourceLabels: model.LabelNames{"alertname"},
Action: "drop",
Regex: relabel.MustNewRegexp(".+"),
},
cfg: &am1Cfg,
}

h.alertmanagers["2"] = &alertmanagerSet{
ams: []alertmanager{
alertmanagerMock{
urlf: func() string { return server2.URL },
h.alertmanagers = map[string]*alertmanagerSet{
// Drop no alerts.
"1": {
ams: []alertmanager{
alertmanagerMock{
urlf: func() string { return server1.URL },
},
},
cfg: &am1Cfg,
},
// Drop only alerts with the "alertnamedrop" label.
"2": {
ams: []alertmanager{
alertmanagerMock{
urlf: func() string { return server2.URL },
},
},
cfg: &am2Cfg,
},
// Drop all alerts.
"3": {
ams: []alertmanager{
alertmanagerMock{
urlf: func() string { return server3.URL },
},
},
cfg: &am3Cfg,
},
// Empty list of Alertmanager endpoints.
"4": {
ams: []alertmanager{},
cfg: &config.DefaultAlertmanagerConfig,
},
cfg: &am2Cfg,
}

for i := range make([]struct{}, maxBatchSize/2) {
h.queue = append(h.queue, &Alert{
Labels: labels.FromStrings("alertname", fmt.Sprintf("%d", i)),
})
h.queue = append(h.queue, &Alert{
Labels: labels.FromStrings("alertnamedrop", fmt.Sprintf("%d", i)),
})
h.queue = append(h.queue,
&Alert{
Labels: labels.FromStrings("alertname", fmt.Sprintf("%d", i)),
},
&Alert{
Labels: labels.FromStrings("alertname", "test", "alertnamedrop", fmt.Sprintf("%d", i)),
},
)

expected1 = append(expected1, &Alert{
Labels: labels.FromStrings("alertname", fmt.Sprintf("%d", i)),
})
expected1 = append(expected1, &Alert{
Labels: labels.FromStrings("alertnamedrop", fmt.Sprintf("%d", i)),
})
expected1 = append(expected1,
&Alert{
Labels: labels.FromStrings("alertname", fmt.Sprintf("%d", i)),
}, &Alert{
Labels: labels.FromStrings("alertname", "test", "alertnamedrop", fmt.Sprintf("%d", i)),
},
)

expected2 = append(expected2, &Alert{
Labels: labels.FromStrings("alertname", fmt.Sprintf("%d", i)),
Expand All @@ -296,6 +328,13 @@ func TestHandlerSendAllRemapPerAm(t *testing.T) {

require.True(t, h.sendAll(h.queue...), "all sends failed unexpectedly")
checkNoErr()

// Verify that individual locks are released.
for k := range h.alertmanagers {
h.alertmanagers[k].mtx.Lock()
h.alertmanagers[k].ams = nil
h.alertmanagers[k].mtx.Unlock()
}
}

func TestCustomDo(t *testing.T) {
Expand Down

0 comments on commit c97a066

Please sign in to comment.