diff --git a/alertobserver/alertobserver.go b/alertobserver/alertobserver.go new file mode 100644 index 0000000000..cf05d9210c --- /dev/null +++ b/alertobserver/alertobserver.go @@ -0,0 +1,36 @@ +// Copyright 2023 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package alertobserver + +import ( + "github.com/prometheus/alertmanager/types" +) + +const ( + EventAlertReceived string = "received" + EventAlertRejected string = "rejected" + EventAlertAddedToAggrGroup string = "addedAggrGroup" + EventAlertFailedAddToAggrGroup string = "failedAddAggrGroup" + EventAlertPipelineStart string = "pipelineStart" + EventAlertPipelinePassStage string = "pipelinePassStage" + EventAlertMuted string = "muted" + EventAlertSent string = "sent" + EventAlertSendFailed string = "sendFailed" +) + +type AlertEventMeta map[string]interface{} + +type LifeCycleObserver interface { + Observe(event string, alerts []*types.Alert, meta AlertEventMeta) +} diff --git a/alertobserver/testing.go b/alertobserver/testing.go new file mode 100644 index 0000000000..66f774fbb7 --- /dev/null +++ b/alertobserver/testing.go @@ -0,0 +1,46 @@ +// Copyright 2023 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package alertobserver + +import ( + "sync" + + "github.com/prometheus/alertmanager/types" +) + +type FakeLifeCycleObserver struct { + AlertsPerEvent map[string][]*types.Alert + PipelineStageAlerts map[string][]*types.Alert + MetaPerEvent map[string][]AlertEventMeta + Mtx sync.RWMutex +} + +func (o *FakeLifeCycleObserver) Observe(event string, alerts []*types.Alert, meta AlertEventMeta) { + o.Mtx.Lock() + defer o.Mtx.Unlock() + if event == EventAlertPipelinePassStage { + o.PipelineStageAlerts[meta["stageName"].(string)] = append(o.PipelineStageAlerts[meta["stageName"].(string)], alerts...) + } else { + o.AlertsPerEvent[event] = append(o.AlertsPerEvent[event], alerts...) 
+ } + o.MetaPerEvent[event] = append(o.MetaPerEvent[event], meta) +} + +func NewFakeLifeCycleObserver() *FakeLifeCycleObserver { + return &FakeLifeCycleObserver{ + PipelineStageAlerts: map[string][]*types.Alert{}, + AlertsPerEvent: map[string][]*types.Alert{}, + MetaPerEvent: map[string][]AlertEventMeta{}, + } +} diff --git a/api/api.go b/api/api.go index b48a38c324..e7f4e6e95f 100644 --- a/api/api.go +++ b/api/api.go @@ -27,6 +27,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/common/route" + "github.com/prometheus/alertmanager/alertobserver" apiv2 "github.com/prometheus/alertmanager/api/v2" "github.com/prometheus/alertmanager/cluster" "github.com/prometheus/alertmanager/config" @@ -81,6 +82,9 @@ type Options struct { GroupInfoFunc func(func(*dispatch.Route) bool) dispatch.AlertGroupInfos // APICallback define the callback function that each api call will perform before returned. APICallback callback.Callback + // AlertLCObserver is used to add hooks to the different alert life cycle events. + // If nil then no observer methods will be invoked in the life cycle events. + AlertLCObserver alertobserver.LifeCycleObserver } func (o Options) validate() error { @@ -127,6 +131,7 @@ func New(opts Options) (*API, error) { opts.Peer, log.With(l, "version", "v2"), opts.Registry, + opts.AlertLCObserver, ) if err != nil { return nil, err diff --git a/api/v1/api.go b/api/v1/api.go new file mode 100644 index 0000000000..d8b4ce0a19 --- /dev/null +++ b/api/v1/api.go @@ -0,0 +1,823 @@ +// Copyright 2015 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package v1 + +import ( + "encoding/json" + "errors" + "fmt" + "net/http" + "regexp" + "sort" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/common/route" + "github.com/prometheus/common/version" + + "github.com/prometheus/alertmanager/alertobserver" + "github.com/prometheus/alertmanager/api/metrics" + "github.com/prometheus/alertmanager/cluster" + "github.com/prometheus/alertmanager/config" + "github.com/prometheus/alertmanager/dispatch" + "github.com/prometheus/alertmanager/pkg/labels" + "github.com/prometheus/alertmanager/provider" + "github.com/prometheus/alertmanager/silence" + "github.com/prometheus/alertmanager/silence/silencepb" + "github.com/prometheus/alertmanager/types" +) + +var corsHeaders = map[string]string{ + "Access-Control-Allow-Headers": "Accept, Authorization, Content-Type, Origin", + "Access-Control-Allow-Methods": "GET, POST, DELETE, OPTIONS", + "Access-Control-Allow-Origin": "*", + "Access-Control-Expose-Headers": "Date", + "Cache-Control": "no-cache, no-store, must-revalidate", +} + +// Alert is the API representation of an alert, which is a regular alert +// annotated with silencing and inhibition info. 
+type Alert struct { + *model.Alert + Status types.AlertStatus `json:"status"` + Receivers []string `json:"receivers"` + Fingerprint string `json:"fingerprint"` +} + +// Enables cross-site script calls. +func setCORS(w http.ResponseWriter) { + for h, v := range corsHeaders { + w.Header().Set(h, v) + } +} + +// API provides registration of handlers for API routes. +type API struct { + alerts provider.Alerts + silences *silence.Silences + config *config.Config + route *dispatch.Route + uptime time.Time + peer cluster.ClusterPeer + logger log.Logger + m *metrics.Alerts + alertLCObserver alertobserver.LifeCycleObserver + + getAlertStatus getAlertStatusFn + + mtx sync.RWMutex +} + +type getAlertStatusFn func(model.Fingerprint) types.AlertStatus + +// New returns a new API. +func New( + alerts provider.Alerts, + silences *silence.Silences, + sf getAlertStatusFn, + peer cluster.ClusterPeer, + l log.Logger, + r prometheus.Registerer, + o alertobserver.LifeCycleObserver, +) *API { + if l == nil { + l = log.NewNopLogger() + } + + return &API{ + alerts: alerts, + silences: silences, + getAlertStatus: sf, + uptime: time.Now(), + peer: peer, + logger: l, + m: metrics.NewAlerts("v1", r), + alertLCObserver: o, + } +} + +// Register registers the API handlers under their correct routes +// in the given router. +func (api *API) Register(r *route.Router) { + wrap := func(f http.HandlerFunc) http.HandlerFunc { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + setCORS(w) + f(w, r) + }) + } + + r.Options("/*path", wrap(func(w http.ResponseWriter, r *http.Request) {})) + + r.Get("/status", wrap(api.status)) + r.Get("/receivers", wrap(api.receivers)) + + r.Get("/alerts", wrap(api.listAlerts)) + r.Post("/alerts", wrap(api.addAlerts)) + + r.Get("/silences", wrap(api.listSilences)) + r.Post("/silences", wrap(api.setSilence)) + r.Get("/silence/:sid", wrap(api.getSilence)) + r.Del("/silence/:sid", wrap(api.delSilence)) +} + +// Update sets the configuration string to a new value. 
+func (api *API) Update(cfg *config.Config) { + api.mtx.Lock() + defer api.mtx.Unlock() + + api.config = cfg + api.route = dispatch.NewRoute(cfg.Route, nil) +} + +type errorType string + +const ( + errorInternal errorType = "server_error" + errorBadData errorType = "bad_data" +) + +type apiError struct { + typ errorType + err error +} + +func (e *apiError) Error() string { + return fmt.Sprintf("%s: %s", e.typ, e.err) +} + +func (api *API) receivers(w http.ResponseWriter, req *http.Request) { + api.mtx.RLock() + defer api.mtx.RUnlock() + + receivers := make([]string, 0, len(api.config.Receivers)) + for _, r := range api.config.Receivers { + receivers = append(receivers, r.Name) + } + + api.respond(w, receivers) +} + +func (api *API) status(w http.ResponseWriter, req *http.Request) { + api.mtx.RLock() + + status := struct { + ConfigYAML string `json:"configYAML"` + ConfigJSON *config.Config `json:"configJSON"` + VersionInfo map[string]string `json:"versionInfo"` + Uptime time.Time `json:"uptime"` + ClusterStatus *clusterStatus `json:"clusterStatus"` + }{ + ConfigYAML: api.config.String(), + ConfigJSON: api.config, + VersionInfo: map[string]string{ + "version": version.Version, + "revision": version.Revision, + "branch": version.Branch, + "buildUser": version.BuildUser, + "buildDate": version.BuildDate, + "goVersion": version.GoVersion, + }, + Uptime: api.uptime, + ClusterStatus: getClusterStatus(api.peer), + } + + api.mtx.RUnlock() + + api.respond(w, status) +} + +type peerStatus struct { + Name string `json:"name"` + Address string `json:"address"` +} + +type clusterStatus struct { + Name string `json:"name"` + Status string `json:"status"` + Peers []peerStatus `json:"peers"` +} + +func getClusterStatus(p cluster.ClusterPeer) *clusterStatus { + if p == nil { + return nil + } + s := &clusterStatus{Name: p.Name(), Status: p.Status()} + + for _, n := range p.Peers() { + s.Peers = append(s.Peers, peerStatus{ + Name: n.Name(), + Address: n.Address(), + }) + } + return s +} + +func (api *API) listAlerts(w http.ResponseWriter, r *http.Request) { + var ( + err error + receiverFilter *regexp.Regexp + // Initialize result slice to prevent api returning `null` when there + // are no alerts present + res = []*Alert{} + matchers = []*labels.Matcher{} + ctx = r.Context() + + showActive, showInhibited bool + showSilenced, showUnprocessed bool + ) + + getBoolParam := func(name string) (bool, error) { + v := r.FormValue(name) + if v == "" { + return true, nil + } + if v == "false" { + return false, nil + } + if v != "true" { + err := fmt.Errorf("parameter %q can either be 'true' or 'false', not %q", name, v) + api.respondError(w, apiError{ + typ: errorBadData, + err: err, + }, nil) + return false, err + } + return true, nil + } + + if filter := r.FormValue("filter"); filter != "" { + matchers, err = labels.ParseMatchers(filter) + if err != nil { + api.respondError(w, apiError{ + typ: errorBadData, + err: err, + }, nil) + return + } + } + + showActive, err = getBoolParam("active") + if err != nil { + return + } + + showSilenced, err = getBoolParam("silenced") + if err != nil { + return + } + + showInhibited, err = getBoolParam("inhibited") + if err != nil { + return + } + + showUnprocessed, err = getBoolParam("unprocessed") + if err != nil { + return + } + + if receiverParam := r.FormValue("receiver"); receiverParam != "" { + receiverFilter, err = regexp.Compile("^(?:" + receiverParam + ")$") + if err != nil { + api.respondError(w, apiError{ + typ: errorBadData, + err: fmt.Errorf( + "failed to parse receiver 
param: %s", + receiverParam, + ), + }, nil) + return + } + } + + alerts := api.alerts.GetPending() + defer alerts.Close() + + api.mtx.RLock() + for a := range alerts.Next() { + if err = alerts.Err(); err != nil { + break + } + if err = ctx.Err(); err != nil { + break + } + + routes := api.route.Match(a.Labels) + receivers := make([]string, 0, len(routes)) + for _, r := range routes { + receivers = append(receivers, r.RouteOpts.Receiver) + } + + if receiverFilter != nil && !receiversMatchFilter(receivers, receiverFilter) { + continue + } + + if !alertMatchesFilterLabels(&a.Alert, matchers) { + continue + } + + // Continue if the alert is resolved. + if !a.Alert.EndsAt.IsZero() && a.Alert.EndsAt.Before(time.Now()) { + continue + } + + status := api.getAlertStatus(a.Fingerprint()) + + if !showActive && status.State == types.AlertStateActive { + continue + } + + if !showUnprocessed && status.State == types.AlertStateUnprocessed { + continue + } + + if !showSilenced && len(status.SilencedBy) != 0 { + continue + } + + if !showInhibited && len(status.InhibitedBy) != 0 { + continue + } + + alert := &Alert{ + Alert: &a.Alert, + Status: status, + Receivers: receivers, + Fingerprint: a.Fingerprint().String(), + } + + res = append(res, alert) + } + api.mtx.RUnlock() + + if err != nil { + api.respondError(w, apiError{ + typ: errorInternal, + err: err, + }, nil) + return + } + sort.Slice(res, func(i, j int) bool { + return res[i].Fingerprint < res[j].Fingerprint + }) + api.respond(w, res) +} + +func receiversMatchFilter(receivers []string, filter *regexp.Regexp) bool { + for _, r := range receivers { + if filter.MatchString(r) { + return true + } + } + + return false +} + +func alertMatchesFilterLabels(a *model.Alert, matchers []*labels.Matcher) bool { + sms := make(map[string]string) + for name, value := range a.Labels { + sms[string(name)] = string(value) + } + return matchFilterLabels(matchers, sms) +} + +func (api *API) addAlerts(w http.ResponseWriter, r *http.Request) { + var alerts []*types.Alert + if err := api.receive(r, &alerts); err != nil { + api.respondError(w, apiError{ + typ: errorBadData, + err: err, + }, nil) + return + } + + api.insertAlerts(w, r, alerts...) +} + +func (api *API) insertAlerts(w http.ResponseWriter, r *http.Request, alerts ...*types.Alert) { + now := time.Now() + + api.mtx.RLock() + resolveTimeout := time.Duration(api.config.Global.ResolveTimeout) + api.mtx.RUnlock() + + for _, alert := range alerts { + alert.UpdatedAt = now + + // Ensure StartsAt is set. + if alert.StartsAt.IsZero() { + if alert.EndsAt.IsZero() { + alert.StartsAt = now + } else { + alert.StartsAt = alert.EndsAt + } + } + // If no end time is defined, set a timeout after which an alert + // is marked resolved if it is not updated. + if alert.EndsAt.IsZero() { + alert.Timeout = true + alert.EndsAt = now.Add(resolveTimeout) + } + if alert.EndsAt.After(time.Now()) { + api.m.Firing().Inc() + } else { + api.m.Resolved().Inc() + } + } + + // Make a best effort to insert all alerts that are valid. 
+ var ( + validAlerts = make([]*types.Alert, 0, len(alerts)) + validationErrs = &types.MultiError{} + ) + for _, a := range alerts { + removeEmptyLabels(a.Labels) + + if err := a.Validate(); err != nil { + validationErrs.Add(err) + api.m.Invalid().Inc() + if api.alertLCObserver != nil { + m := alertobserver.AlertEventMeta{"msg": err.Error()} + api.alertLCObserver.Observe(alertobserver.EventAlertRejected, []*types.Alert{a}, m) + } + continue + } + validAlerts = append(validAlerts, a) + } + if err := api.alerts.Put(validAlerts...); err != nil { + api.respondError(w, apiError{ + typ: errorInternal, + err: err, + }, nil) + if api.alertLCObserver != nil { + m := alertobserver.AlertEventMeta{"msg": err.Error()} + api.alertLCObserver.Observe(alertobserver.EventAlertRejected, validAlerts, m) + } + return + } + if api.alertLCObserver != nil { + api.alertLCObserver.Observe(alertobserver.EventAlertReceived, validAlerts, alertobserver.AlertEventMeta{}) + } + + if validationErrs.Len() > 0 { + api.respondError(w, apiError{ + typ: errorBadData, + err: validationErrs, + }, nil) + return + } + + api.respond(w, nil) +} + +func removeEmptyLabels(ls model.LabelSet) { + for k, v := range ls { + if string(v) == "" { + delete(ls, k) + } + } +} + +func (api *API) setSilence(w http.ResponseWriter, r *http.Request) { + var sil types.Silence + if err := api.receive(r, &sil); err != nil { + api.respondError(w, apiError{ + typ: errorBadData, + err: err, + }, nil) + return + } + + // This is an API only validation, it cannot be done internally + // because the expired silence is semantically important. + // But one should not be able to create expired silences, that + // won't have any use. + if sil.Expired() { + api.respondError(w, apiError{ + typ: errorBadData, + err: errors.New("start time must not be equal to end time"), + }, nil) + return + } + + if sil.EndsAt.Before(time.Now()) { + api.respondError(w, apiError{ + typ: errorBadData, + err: errors.New("end time can't be in the past"), + }, nil) + return + } + + psil, err := silenceToProto(&sil) + if err != nil { + api.respondError(w, apiError{ + typ: errorBadData, + err: err, + }, nil) + return + } + + sid, err := api.silences.Set(psil) + if err != nil { + api.respondError(w, apiError{ + typ: errorBadData, + err: err, + }, nil) + return + } + + api.respond(w, struct { + SilenceID string `json:"silenceId"` + }{ + SilenceID: sid, + }) +} + +func (api *API) getSilence(w http.ResponseWriter, r *http.Request) { + sid := route.Param(r.Context(), "sid") + + sils, _, err := api.silences.Query(silence.QIDs(sid)) + if err != nil || len(sils) == 0 { + http.Error(w, fmt.Sprint("Error getting silence: ", err), http.StatusNotFound) + return + } + sil, err := silenceFromProto(sils[0]) + if err != nil { + api.respondError(w, apiError{ + typ: errorInternal, + err: err, + }, nil) + return + } + + api.respond(w, sil) +} + +func (api *API) delSilence(w http.ResponseWriter, r *http.Request) { + sid := route.Param(r.Context(), "sid") + + if err := api.silences.Expire(sid); err != nil { + api.respondError(w, apiError{ + typ: errorBadData, + err: err, + }, nil) + return + } + api.respond(w, nil) +} + +func (api *API) listSilences(w http.ResponseWriter, r *http.Request) { + psils, _, err := api.silences.Query() + if err != nil { + api.respondError(w, apiError{ + typ: errorInternal, + err: err, + }, nil) + return + } + + matchers := []*labels.Matcher{} + if filter := r.FormValue("filter"); filter != "" { + matchers, err = labels.ParseMatchers(filter) + if err != nil { + api.respondError(w, 
apiError{ + typ: errorBadData, + err: err, + }, nil) + return + } + } + + sils := []*types.Silence{} + for _, ps := range psils { + s, err := silenceFromProto(ps) + if err != nil { + api.respondError(w, apiError{ + typ: errorInternal, + err: err, + }, nil) + return + } + + if !silenceMatchesFilterLabels(s, matchers) { + continue + } + sils = append(sils, s) + } + + var active, pending, expired []*types.Silence + + for _, s := range sils { + switch s.Status.State { + case types.SilenceStateActive: + active = append(active, s) + case types.SilenceStatePending: + pending = append(pending, s) + case types.SilenceStateExpired: + expired = append(expired, s) + } + } + + sort.Slice(active, func(i, j int) bool { + return active[i].EndsAt.Before(active[j].EndsAt) + }) + sort.Slice(pending, func(i, j int) bool { + return pending[i].StartsAt.Before(pending[j].EndsAt) + }) + sort.Slice(expired, func(i, j int) bool { + return expired[i].EndsAt.After(expired[j].EndsAt) + }) + + // Initialize silences explicitly to an empty list (instead of nil) + // So that it does not get converted to "null" in JSON. + silences := []*types.Silence{} + silences = append(silences, active...) + silences = append(silences, pending...) + silences = append(silences, expired...) + + api.respond(w, silences) +} + +func silenceMatchesFilterLabels(s *types.Silence, matchers []*labels.Matcher) bool { + sms := make(map[string]string) + for _, m := range s.Matchers { + sms[m.Name] = m.Value + } + + return matchFilterLabels(matchers, sms) +} + +func matchFilterLabels(matchers []*labels.Matcher, sms map[string]string) bool { + for _, m := range matchers { + v, prs := sms[m.Name] + switch m.Type { + case labels.MatchNotRegexp, labels.MatchNotEqual: + if string(m.Value) == "" && prs { + continue + } + if !m.Matches(string(v)) { + return false + } + default: + if string(m.Value) == "" && !prs { + continue + } + if !m.Matches(string(v)) { + return false + } + } + } + + return true +} + +func silenceToProto(s *types.Silence) (*silencepb.Silence, error) { + sil := &silencepb.Silence{ + Id: s.ID, + StartsAt: s.StartsAt, + EndsAt: s.EndsAt, + UpdatedAt: s.UpdatedAt, + Comment: s.Comment, + CreatedBy: s.CreatedBy, + } + for _, m := range s.Matchers { + matcher := &silencepb.Matcher{ + Name: m.Name, + Pattern: m.Value, + } + switch m.Type { + case labels.MatchEqual: + matcher.Type = silencepb.Matcher_EQUAL + case labels.MatchNotEqual: + matcher.Type = silencepb.Matcher_NOT_EQUAL + case labels.MatchRegexp: + matcher.Type = silencepb.Matcher_REGEXP + case labels.MatchNotRegexp: + matcher.Type = silencepb.Matcher_NOT_REGEXP + } + sil.Matchers = append(sil.Matchers, matcher) + } + return sil, nil +} + +func silenceFromProto(s *silencepb.Silence) (*types.Silence, error) { + sil := &types.Silence{ + ID: s.Id, + StartsAt: s.StartsAt, + EndsAt: s.EndsAt, + UpdatedAt: s.UpdatedAt, + Status: types.SilenceStatus{ + State: types.CalcSilenceState(s.StartsAt, s.EndsAt), + }, + Comment: s.Comment, + CreatedBy: s.CreatedBy, + } + for _, m := range s.Matchers { + var t labels.MatchType + switch m.Type { + case silencepb.Matcher_EQUAL: + t = labels.MatchEqual + case silencepb.Matcher_NOT_EQUAL: + t = labels.MatchNotEqual + case silencepb.Matcher_REGEXP: + t = labels.MatchRegexp + case silencepb.Matcher_NOT_REGEXP: + t = labels.MatchNotRegexp + } + matcher, err := labels.NewMatcher(t, m.Name, m.Pattern) + if err != nil { + return nil, err + } + + sil.Matchers = append(sil.Matchers, matcher) + } + + return sil, nil +} + +type status string + +const ( + 
statusSuccess status = "success" + statusError status = "error" +) + +type response struct { + Status status `json:"status"` + Data interface{} `json:"data,omitempty"` + ErrorType errorType `json:"errorType,omitempty"` + Error string `json:"error,omitempty"` +} + +func (api *API) respond(w http.ResponseWriter, data interface{}) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(200) + + b, err := json.Marshal(&response{ + Status: statusSuccess, + Data: data, + }) + if err != nil { + level.Error(api.logger).Log("msg", "Error marshaling JSON", "err", err) + return + } + + if _, err := w.Write(b); err != nil { + level.Error(api.logger).Log("msg", "failed to write data to connection", "err", err) + } +} + +func (api *API) respondError(w http.ResponseWriter, apiErr apiError, data interface{}) { + w.Header().Set("Content-Type", "application/json") + + switch apiErr.typ { + case errorBadData: + w.WriteHeader(http.StatusBadRequest) + case errorInternal: + w.WriteHeader(http.StatusInternalServerError) + default: + panic(fmt.Sprintf("unknown error type %q", apiErr.Error())) + } + + b, err := json.Marshal(&response{ + Status: statusError, + ErrorType: apiErr.typ, + Error: apiErr.err.Error(), + Data: data, + }) + if err != nil { + return + } + level.Error(api.logger).Log("msg", "API error", "err", apiErr.Error()) + + if _, err := w.Write(b); err != nil { + level.Error(api.logger).Log("msg", "failed to write data to connection", "err", err) + } +} + +func (api *API) receive(r *http.Request, v interface{}) error { + dec := json.NewDecoder(r.Body) + defer r.Body.Close() + + err := dec.Decode(v) + if err != nil { + level.Debug(api.logger).Log("msg", "Decoding request failed", "err", err) + return err + } + return nil +} diff --git a/api/v1/api_test.go b/api/v1/api_test.go new file mode 100644 index 0000000000..8310c78164 --- /dev/null +++ b/api/v1/api_test.go @@ -0,0 +1,655 @@ +// Copyright 2018 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package v1 + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "net/http/httptest" + "regexp" + "testing" + "time" + + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + + "github.com/prometheus/alertmanager/alertobserver" + "github.com/prometheus/alertmanager/config" + "github.com/prometheus/alertmanager/dispatch" + "github.com/prometheus/alertmanager/pkg/labels" + "github.com/prometheus/alertmanager/provider" + "github.com/prometheus/alertmanager/types" +) + +// fakeAlerts is a struct implementing the provider.Alerts interface for tests. 
+type fakeAlerts struct { + fps map[model.Fingerprint]int + alerts []*types.Alert + err error +} + +func newFakeAlerts(alerts []*types.Alert, withErr bool) *fakeAlerts { + fps := make(map[model.Fingerprint]int) + for i, a := range alerts { + fps[a.Fingerprint()] = i + } + f := &fakeAlerts{ + alerts: alerts, + fps: fps, + } + if withErr { + f.err = errors.New("error occurred") + } + return f +} + +func (f *fakeAlerts) Subscribe() provider.AlertIterator { return nil } +func (f *fakeAlerts) Get(model.Fingerprint) (*types.Alert, error) { return nil, nil } +func (f *fakeAlerts) Put(alerts ...*types.Alert) error { + return f.err +} + +func (f *fakeAlerts) GetPending() provider.AlertIterator { + ch := make(chan *types.Alert) + done := make(chan struct{}) + go func() { + defer close(ch) + for _, a := range f.alerts { + ch <- a + } + }() + return provider.NewAlertIterator(ch, done, f.err) +} + +func newGetAlertStatus(f *fakeAlerts) func(model.Fingerprint) types.AlertStatus { + return func(fp model.Fingerprint) types.AlertStatus { + status := types.AlertStatus{SilencedBy: []string{}, InhibitedBy: []string{}} + + i, ok := f.fps[fp] + if !ok { + return status + } + alert := f.alerts[i] + switch alert.Labels["state"] { + case "active": + status.State = types.AlertStateActive + case "unprocessed": + status.State = types.AlertStateUnprocessed + case "suppressed": + status.State = types.AlertStateSuppressed + } + if alert.Labels["silenced_by"] != "" { + status.SilencedBy = append(status.SilencedBy, string(alert.Labels["silenced_by"])) + } + if alert.Labels["inhibited_by"] != "" { + status.InhibitedBy = append(status.InhibitedBy, string(alert.Labels["inhibited_by"])) + } + return status + } +} + +func TestAddAlerts(t *testing.T) { + now := func(offset int) time.Time { + return time.Now().Add(time.Duration(offset) * time.Second) + } + + for i, tc := range []struct { + start, end time.Time + err bool + code int + }{ + {time.Time{}, time.Time{}, false, 200}, + {now(0), time.Time{}, false, 200}, + {time.Time{}, now(-1), false, 200}, + {time.Time{}, now(0), false, 200}, + {time.Time{}, now(1), false, 200}, + {now(-2), now(-1), false, 200}, + {now(1), now(2), false, 200}, + {now(1), now(0), false, 400}, + {now(0), time.Time{}, true, 500}, + } { + alerts := []model.Alert{{ + StartsAt: tc.start, + EndsAt: tc.end, + Labels: model.LabelSet{"label1": "test1"}, + Annotations: model.LabelSet{"annotation1": "some text"}, + }} + b, err := json.Marshal(&alerts) + if err != nil { + t.Errorf("Unexpected error %v", err) + } + + alertsProvider := newFakeAlerts([]*types.Alert{}, tc.err) + api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil, nil) + defaultGlobalConfig := config.DefaultGlobalConfig() + route := config.Route{} + api.Update(&config.Config{ + Global: &defaultGlobalConfig, + Route: &route, + }) + + r, err := http.NewRequest("POST", "/api/v1/alerts", bytes.NewReader(b)) + w := httptest.NewRecorder() + if err != nil { + t.Errorf("Unexpected error %v", err) + } + + api.addAlerts(w, r) + res := w.Result() + body, _ := io.ReadAll(res.Body) + + require.Equal(t, tc.code, w.Code, fmt.Sprintf("test case: %d, StartsAt %v, EndsAt %v, Response: %s", i, tc.start, tc.end, string(body))) + + observer := alertobserver.NewFakeLifeCycleObserver() + api.alertLCObserver = observer + r, err = http.NewRequest("POST", "/api/v1/alerts", bytes.NewReader(b)) + w = httptest.NewRecorder() + if err != nil { + t.Errorf("Unexpected error %v", err) + } + api.addAlerts(w, r) + if tc.code == 200 { + require.Equal(t, 
observer.AlertsPerEvent[alertobserver.EventAlertReceived][0].Fingerprint(), alerts[0].Fingerprint()) + } else { + require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertRejected][0].Fingerprint(), alerts[0].Fingerprint()) + } + } +} + +func TestAddAlertsWithAlertLCObserver(t *testing.T) { + now := func(offset int) time.Time { + return time.Now().Add(time.Duration(offset) * time.Second) + } + + for i, tc := range []struct { + start, end time.Time + err bool + code int + }{ + {time.Time{}, time.Time{}, false, 200}, + {now(1), now(0), false, 400}, + {now(0), time.Time{}, true, 500}, + } { + alerts := []model.Alert{{ + StartsAt: tc.start, + EndsAt: tc.end, + Labels: model.LabelSet{"label1": "test1"}, + Annotations: model.LabelSet{"annotation1": "some text"}, + }} + b, err := json.Marshal(&alerts) + if err != nil { + t.Errorf("Unexpected error %v", err) + } + + alertsProvider := newFakeAlerts([]*types.Alert{}, tc.err) + observer := alertobserver.NewFakeLifeCycleObserver() + api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil, observer) + defaultGlobalConfig := config.DefaultGlobalConfig() + route := config.Route{} + api.Update(&config.Config{ + Global: &defaultGlobalConfig, + Route: &route, + }) + + r, err := http.NewRequest("POST", "/api/v1/alerts", bytes.NewReader(b)) + w := httptest.NewRecorder() + if err != nil { + t.Errorf("Unexpected error %v", err) + } + + api.addAlerts(w, r) + res := w.Result() + body, _ := io.ReadAll(res.Body) + + require.Equal(t, tc.code, w.Code, fmt.Sprintf("test case: %d, StartsAt %v, EndsAt %v, Response: %s", i, tc.start, tc.end, string(body))) + if tc.code == 200 { + require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertReceived][0].Fingerprint(), alerts[0].Fingerprint()) + } else { + require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertRejected][0].Fingerprint(), alerts[0].Fingerprint()) + } + } +} + +func TestListAlerts(t *testing.T) { + now := time.Now() + alerts := []*types.Alert{ + { + Alert: model.Alert{ + Labels: model.LabelSet{"state": "active", "alertname": "alert1"}, + StartsAt: now.Add(-time.Minute), + }, + }, + { + Alert: model.Alert{ + Labels: model.LabelSet{"state": "unprocessed", "alertname": "alert2"}, + StartsAt: now.Add(-time.Minute), + }, + }, + { + Alert: model.Alert{ + Labels: model.LabelSet{"state": "suppressed", "silenced_by": "abc", "alertname": "alert3"}, + StartsAt: now.Add(-time.Minute), + }, + }, + { + Alert: model.Alert{ + Labels: model.LabelSet{"state": "suppressed", "inhibited_by": "abc", "alertname": "alert4"}, + StartsAt: now.Add(-time.Minute), + }, + }, + { + Alert: model.Alert{ + Labels: model.LabelSet{"alertname": "alert5"}, + StartsAt: now.Add(-2 * time.Minute), + EndsAt: now.Add(-time.Minute), + }, + }, + } + + for i, tc := range []struct { + err bool + params map[string]string + + code int + anames []string + }{ + { + false, + map[string]string{}, + 200, + []string{"alert1", "alert2", "alert3", "alert4"}, + }, + { + false, + map[string]string{"active": "true", "unprocessed": "true", "silenced": "true", "inhibited": "true"}, + 200, + []string{"alert1", "alert2", "alert3", "alert4"}, + }, + { + false, + map[string]string{"active": "false", "unprocessed": "true", "silenced": "true", "inhibited": "true"}, + 200, + []string{"alert2", "alert3", "alert4"}, + }, + { + false, + map[string]string{"active": "true", "unprocessed": "false", "silenced": "true", "inhibited": "true"}, + 200, + []string{"alert1", "alert3", "alert4"}, + }, + { + false, + map[string]string{"active": 
"true", "unprocessed": "true", "silenced": "false", "inhibited": "true"}, + 200, + []string{"alert1", "alert2", "alert4"}, + }, + { + false, + map[string]string{"active": "true", "unprocessed": "true", "silenced": "true", "inhibited": "false"}, + 200, + []string{"alert1", "alert2", "alert3"}, + }, + { + false, + map[string]string{"filter": "{alertname=\"alert3\""}, + 200, + []string{"alert3"}, + }, + { + false, + map[string]string{"filter": "{alertname"}, + 400, + []string{}, + }, + { + false, + map[string]string{"receiver": "other"}, + 200, + []string{}, + }, + { + false, + map[string]string{"active": "invalid"}, + 400, + []string{}, + }, + { + true, + map[string]string{}, + 500, + []string{}, + }, + } { + alertsProvider := newFakeAlerts(alerts, tc.err) + api := New(alertsProvider, nil, newGetAlertStatus(alertsProvider), nil, nil, nil, nil) + api.route = dispatch.NewRoute(&config.Route{Receiver: "def-receiver"}, nil) + + r, err := http.NewRequest("GET", "/api/v1/alerts", nil) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + q := r.URL.Query() + for k, v := range tc.params { + q.Add(k, v) + } + r.URL.RawQuery = q.Encode() + w := httptest.NewRecorder() + + api.listAlerts(w, r) + body, _ := io.ReadAll(w.Result().Body) + + var res response + err = json.Unmarshal(body, &res) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + + require.Equal(t, tc.code, w.Code, fmt.Sprintf("test case: %d, response: %s", i, string(body))) + if w.Code != 200 { + continue + } + + // Data needs to be serialized/deserialized to be converted to the real type. + b, err := json.Marshal(res.Data) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + retAlerts := []*Alert{} + err = json.Unmarshal(b, &retAlerts) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + + anames := []string{} + for _, a := range retAlerts { + name, ok := a.Labels["alertname"] + if ok { + anames = append(anames, string(name)) + } + } + require.Equal(t, tc.anames, anames, fmt.Sprintf("test case: %d, alert names are not equal", i)) + } +} + +func TestAlertFiltering(t *testing.T) { + type test struct { + alert *model.Alert + msg string + expected bool + } + + // Equal + equal, err := labels.NewMatcher(labels.MatchEqual, "label1", "test1") + if err != nil { + t.Errorf("Unexpected error %v", err) + } + + tests := []test{ + {&model.Alert{Labels: model.LabelSet{"label1": "test1"}}, "label1=test1", true}, + {&model.Alert{Labels: model.LabelSet{"label1": "test2"}}, "label1=test2", false}, + {&model.Alert{Labels: model.LabelSet{"label2": "test2"}}, "label2=test2", false}, + } + + for _, test := range tests { + actual := alertMatchesFilterLabels(test.alert, []*labels.Matcher{equal}) + msg := fmt.Sprintf("Expected %t for %s", test.expected, test.msg) + require.Equal(t, test.expected, actual, msg) + } + + // Not Equal + notEqual, err := labels.NewMatcher(labels.MatchNotEqual, "label1", "test1") + if err != nil { + t.Errorf("Unexpected error %v", err) + } + + tests = []test{ + {&model.Alert{Labels: model.LabelSet{"label1": "test1"}}, "label1!=test1", false}, + {&model.Alert{Labels: model.LabelSet{"label1": "test2"}}, "label1!=test2", true}, + {&model.Alert{Labels: model.LabelSet{"label2": "test2"}}, "label2!=test2", true}, + } + + for _, test := range tests { + actual := alertMatchesFilterLabels(test.alert, []*labels.Matcher{notEqual}) + msg := fmt.Sprintf("Expected %t for %s", test.expected, test.msg) + require.Equal(t, test.expected, actual, msg) + } + + // Regexp Equal + regexpEqual, err := 
labels.NewMatcher(labels.MatchRegexp, "label1", "tes.*") + if err != nil { + t.Errorf("Unexpected error %v", err) + } + + tests = []test{ + {&model.Alert{Labels: model.LabelSet{"label1": "test1"}}, "label1=~test1", true}, + {&model.Alert{Labels: model.LabelSet{"label1": "test2"}}, "label1=~test2", true}, + {&model.Alert{Labels: model.LabelSet{"label2": "test2"}}, "label2=~test2", false}, + } + + for _, test := range tests { + actual := alertMatchesFilterLabels(test.alert, []*labels.Matcher{regexpEqual}) + msg := fmt.Sprintf("Expected %t for %s", test.expected, test.msg) + require.Equal(t, test.expected, actual, msg) + } + + // Regexp Not Equal + regexpNotEqual, err := labels.NewMatcher(labels.MatchNotRegexp, "label1", "tes.*") + if err != nil { + t.Errorf("Unexpected error %v", err) + } + + tests = []test{ + {&model.Alert{Labels: model.LabelSet{"label1": "test1"}}, "label1!~test1", false}, + {&model.Alert{Labels: model.LabelSet{"label1": "test2"}}, "label1!~test2", false}, + {&model.Alert{Labels: model.LabelSet{"label2": "test2"}}, "label2!~test2", true}, + } + + for _, test := range tests { + actual := alertMatchesFilterLabels(test.alert, []*labels.Matcher{regexpNotEqual}) + msg := fmt.Sprintf("Expected %t for %s", test.expected, test.msg) + require.Equal(t, test.expected, actual, msg) + } +} + +func TestSilenceFiltering(t *testing.T) { + type test struct { + silence *types.Silence + msg string + expected bool + } + + // Equal + equal, err := labels.NewMatcher(labels.MatchEqual, "label1", "test1") + if err != nil { + t.Errorf("Unexpected error %v", err) + } + + tests := []test{ + { + &types.Silence{Matchers: newMatcher(model.LabelSet{"label1": "test1"})}, + "label1=test1", + true, + }, + { + &types.Silence{Matchers: newMatcher(model.LabelSet{"label1": "test2"})}, + "label1=test2", + false, + }, + { + &types.Silence{Matchers: newMatcher(model.LabelSet{"label2": "test2"})}, + "label2=test2", + false, + }, + } + + for _, test := range tests { + actual := silenceMatchesFilterLabels(test.silence, []*labels.Matcher{equal}) + msg := fmt.Sprintf("Expected %t for %s", test.expected, test.msg) + require.Equal(t, test.expected, actual, msg) + } + + // Not Equal + notEqual, err := labels.NewMatcher(labels.MatchNotEqual, "label1", "test1") + if err != nil { + t.Errorf("Unexpected error %v", err) + } + + tests = []test{ + { + &types.Silence{Matchers: newMatcher(model.LabelSet{"label1": "test1"})}, + "label1!=test1", + false, + }, + { + &types.Silence{Matchers: newMatcher(model.LabelSet{"label1": "test2"})}, + "label1!=test2", + true, + }, + { + &types.Silence{Matchers: newMatcher(model.LabelSet{"label2": "test2"})}, + "label2!=test2", + true, + }, + } + + for _, test := range tests { + actual := silenceMatchesFilterLabels(test.silence, []*labels.Matcher{notEqual}) + msg := fmt.Sprintf("Expected %t for %s", test.expected, test.msg) + require.Equal(t, test.expected, actual, msg) + } + + // Regexp Equal + regexpEqual, err := labels.NewMatcher(labels.MatchRegexp, "label1", "tes.*") + if err != nil { + t.Errorf("Unexpected error %v", err) + } + + tests = []test{ + { + &types.Silence{Matchers: newMatcher(model.LabelSet{"label1": "test1"})}, + "label1=~test1", + true, + }, + { + &types.Silence{Matchers: newMatcher(model.LabelSet{"label1": "test2"})}, + "label1=~test2", + true, + }, + { + &types.Silence{Matchers: newMatcher(model.LabelSet{"label2": "test2"})}, + "label2=~test2", + false, + }, + } + + for _, test := range tests { + actual := silenceMatchesFilterLabels(test.silence, 
[]*labels.Matcher{regexpEqual}) + msg := fmt.Sprintf("Expected %t for %s", test.expected, test.msg) + require.Equal(t, test.expected, actual, msg) + } + + // Regexp Not Equal + regexpNotEqual, err := labels.NewMatcher(labels.MatchNotRegexp, "label1", "tes.*") + if err != nil { + t.Errorf("Unexpected error %v", err) + } + + tests = []test{ + { + &types.Silence{Matchers: newMatcher(model.LabelSet{"label1": "test1"})}, + "label1!~test1", + false, + }, + { + &types.Silence{Matchers: newMatcher(model.LabelSet{"label1": "test2"})}, + "label1!~test2", + false, + }, + { + &types.Silence{Matchers: newMatcher(model.LabelSet{"label2": "test2"})}, + "label2!~test2", + true, + }, + } + + for _, test := range tests { + actual := silenceMatchesFilterLabels(test.silence, []*labels.Matcher{regexpNotEqual}) + msg := fmt.Sprintf("Expected %t for %s", test.expected, test.msg) + require.Equal(t, test.expected, actual, msg) + } +} + +func TestReceiversMatchFilter(t *testing.T) { + receivers := []string{"pagerduty", "slack", "pushover"} + + filter, err := regexp.Compile(fmt.Sprintf("^(?:%s)$", "push.*")) + if err != nil { + t.Errorf("Unexpected error %v", err) + } + require.True(t, receiversMatchFilter(receivers, filter)) + + filter, err = regexp.Compile(fmt.Sprintf("^(?:%s)$", "push")) + if err != nil { + t.Errorf("Unexpected error %v", err) + } + require.False(t, receiversMatchFilter(receivers, filter)) +} + +func TestMatchFilterLabels(t *testing.T) { + testCases := []struct { + matcher labels.MatchType + expected bool + }{ + {labels.MatchEqual, true}, + {labels.MatchRegexp, true}, + {labels.MatchNotEqual, false}, + {labels.MatchNotRegexp, false}, + } + + for _, tc := range testCases { + l, err := labels.NewMatcher(tc.matcher, "foo", "") + require.NoError(t, err) + sms := map[string]string{ + "baz": "bar", + } + ls := []*labels.Matcher{l} + + require.Equal(t, tc.expected, matchFilterLabels(ls, sms)) + + l, err = labels.NewMatcher(tc.matcher, "foo", "") + require.NoError(t, err) + sms = map[string]string{ + "baz": "bar", + "foo": "quux", + } + ls = []*labels.Matcher{l} + require.NotEqual(t, tc.expected, matchFilterLabels(ls, sms)) + } +} + +func newMatcher(labelSet model.LabelSet) labels.Matchers { + matchers := make([]*labels.Matcher, 0, len(labelSet)) + for key, val := range labelSet { + matchers = append(matchers, &labels.Matcher{ + Type: labels.MatchEqual, + Name: string(key), + Value: string(val), + }) + } + return matchers +} diff --git a/api/v2/api.go b/api/v2/api.go index 3deda8acec..811402c34d 100644 --- a/api/v2/api.go +++ b/api/v2/api.go @@ -37,6 +37,7 @@ import ( alertgroupinfolist_ops "github.com/prometheus/alertmanager/api/v2/restapi/operations/alertgroupinfolist" "github.com/prometheus/alertmanager/util/callback" + "github.com/prometheus/alertmanager/alertobserver" "github.com/prometheus/alertmanager/api/metrics" open_api_models "github.com/prometheus/alertmanager/api/v2/models" "github.com/prometheus/alertmanager/api/v2/restapi" @@ -77,8 +78,9 @@ type API struct { route *dispatch.Route setAlertStatus setAlertStatusFn - logger log.Logger - m *metrics.Alerts + logger log.Logger + m *metrics.Alerts + alertLCObserver alertobserver.LifeCycleObserver Handler http.Handler } @@ -101,6 +103,7 @@ func NewAPI( peer cluster.ClusterPeer, l log.Logger, r prometheus.Registerer, + o alertobserver.LifeCycleObserver, ) (*API, error) { if apiCallback == nil { apiCallback = callback.NoopAPICallback{} @@ -116,6 +119,7 @@ func NewAPI( logger: l, m: metrics.NewAlerts(r), uptime: time.Now(), + alertLCObserver: o, } // 
Load embedded swagger file. @@ -403,12 +407,20 @@ func (api *API) postAlertsHandler(params alert_ops.PostAlertsParams) middleware. if err := a.Validate(); err != nil { validationErrs.Add(err) api.m.Invalid().Inc() + if api.alertLCObserver != nil { + m := alertobserver.AlertEventMeta{"msg": err.Error()} + api.alertLCObserver.Observe(alertobserver.EventAlertRejected, []*types.Alert{a}, m) + } continue } validAlerts = append(validAlerts, a) } if err := api.alerts.Put(validAlerts...); err != nil { level.Error(logger).Log("msg", "Failed to create alerts", "err", err) + if api.alertLCObserver != nil { + m := alertobserver.AlertEventMeta{"msg": err.Error()} + api.alertLCObserver.Observe(alertobserver.EventAlertRejected, validAlerts, m) + } return alert_ops.NewPostAlertsInternalServerError().WithPayload(err.Error()) } @@ -416,6 +428,9 @@ func (api *API) postAlertsHandler(params alert_ops.PostAlertsParams) middleware. level.Error(logger).Log("msg", "Failed to validate alerts", "err", validationErrs.Error()) return alert_ops.NewPostAlertsBadRequest().WithPayload(validationErrs.Error()) } + if api.alertLCObserver != nil { + api.alertLCObserver.Observe(alertobserver.EventAlertReceived, validAlerts, alertobserver.AlertEventMeta{}) + } return alert_ops.NewPostAlertsOK() } diff --git a/api/v2/api_test.go b/api/v2/api_test.go index 794a1219c1..c95da7360b 100644 --- a/api/v2/api_test.go +++ b/api/v2/api_test.go @@ -17,6 +17,7 @@ import ( "bytes" "encoding/json" "fmt" + "io" "net/http" "net/http/httptest" @@ -29,6 +30,8 @@ import ( "github.com/prometheus/common/model" "github.com/stretchr/testify/require" + "github.com/prometheus/alertmanager/alertobserver" + "github.com/prometheus/alertmanager/api/metrics" alert_ops "github.com/prometheus/alertmanager/api/v2/restapi/operations/alert" alertgroup_ops "github.com/prometheus/alertmanager/api/v2/restapi/operations/alertgroup" alertgroupinfolist_ops "github.com/prometheus/alertmanager/api/v2/restapi/operations/alertgroupinfolist" @@ -1123,6 +1126,67 @@ func TestListAlertInfosHandler(t *testing.T) { } } +func TestPostAlertHandler(t *testing.T) { + now := time.Now() + for i, tc := range []struct { + start, end time.Time + err bool + code int + }{ + {time.Time{}, time.Time{}, false, 200}, + {now, time.Time{}, false, 200}, + {time.Time{}, now.Add(time.Duration(-1) * time.Second), false, 200}, + {time.Time{}, now, false, 200}, + {time.Time{}, now.Add(time.Duration(1) * time.Second), false, 200}, + {now.Add(time.Duration(-2) * time.Second), now.Add(time.Duration(-1) * time.Second), false, 200}, + {now.Add(time.Duration(1) * time.Second), now.Add(time.Duration(2) * time.Second), false, 200}, + {now.Add(time.Duration(1) * time.Second), now, false, 400}, + } { + alerts, alertsBytes := createAlert(t, tc.start, tc.end) + api := API{ + uptime: time.Now(), + alerts: newFakeAlerts([]*types.Alert{}), + logger: log.NewNopLogger(), + m: metrics.NewAlerts("v2", nil), + } + api.Update(&config.Config{ + Global: &config.GlobalConfig{ + ResolveTimeout: model.Duration(5), + }, + Route: &config.Route{}, + }, nil) + + r, err := http.NewRequest("POST", "/api/v2/alerts", bytes.NewReader(alertsBytes)) + require.NoError(t, err) + + w := httptest.NewRecorder() + p := runtime.TextProducer() + responder := api.postAlertsHandler(alert_ops.PostAlertsParams{ + HTTPRequest: r, + Alerts: alerts, + }) + responder.WriteResponse(w, p) + body, _ := io.ReadAll(w.Result().Body) + + require.Equal(t, tc.code, w.Code, fmt.Sprintf("test case: %d, response: %s", i, string(body))) + + observer := 
alertobserver.NewFakeLifeCycleObserver() + api.alertLCObserver = observer + r, err = http.NewRequest("POST", "/api/v2/alerts", bytes.NewReader(alertsBytes)) + require.NoError(t, err) + api.postAlertsHandler(alert_ops.PostAlertsParams{ + HTTPRequest: r, + Alerts: alerts, + }) + amAlert := OpenAPIAlertsToAlerts(alerts) + if tc.code == 200 { + require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertReceived][0].Fingerprint(), amAlert[0].Fingerprint()) + } else { + require.Equal(t, observer.AlertsPerEvent[alertobserver.EventAlertRejected][0].Fingerprint(), amAlert[0].Fingerprint()) + } + } +} + type limitNumberOfAlertsReturnedCallback struct { limit int } diff --git a/api/v2/testing.go b/api/v2/testing.go index fbb38a9d53..0f3f74a554 100644 --- a/api/v2/testing.go +++ b/api/v2/testing.go @@ -137,3 +137,23 @@ func newGetAlertStatus(f *fakeAlerts) func(model.Fingerprint) types.AlertStatus return status } } + +func createAlert(t *testing.T, start, ends time.Time) (open_api_models.PostableAlerts, []byte) { + startsAt := strfmt.DateTime(start) + endsAt := strfmt.DateTime(ends) + + alert := open_api_models.PostableAlert{ + StartsAt: startsAt, + EndsAt: endsAt, + Annotations: open_api_models.LabelSet{"annotation1": "some text"}, + Alert: open_api_models.Alert{ + Labels: open_api_models.LabelSet{"label1": "test1"}, + GeneratorURL: "http://localhost:3000", + }, + } + alerts := open_api_models.PostableAlerts{} + alerts = append(alerts, &alert) + b, err := json.Marshal(alerts) + require.NoError(t, err) + return alerts, b +} diff --git a/cmd/alertmanager/main.go b/cmd/alertmanager/main.go index 531708acdf..139b8aa4c2 100644 --- a/cmd/alertmanager/main.go +++ b/cmd/alertmanager/main.go @@ -437,6 +437,7 @@ func run() int { intervener, notificationLog, pipelinePeer, + nil, ) configuredReceivers.Set(float64(len(activeReceivers))) @@ -448,7 +449,7 @@ func run() int { silencer.Mutes(labels) }) - disp = dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, nil, logger, dispMetrics) + disp = dispatch.NewDispatcher(alerts, routes, pipeline, marker, timeoutFunc, nil, logger, dispMetrics, nil) routes.Walk(func(r *dispatch.Route) { if r.RouteOpts.RepeatInterval > *retention { level.Warn(configLogger).Log( diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go index e457dfeb9a..c353e21cf2 100644 --- a/dispatch/dispatch.go +++ b/dispatch/dispatch.go @@ -27,6 +27,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" + "github.com/prometheus/alertmanager/alertobserver" "github.com/prometheus/alertmanager/notify" "github.com/prometheus/alertmanager/provider" "github.com/prometheus/alertmanager/store" @@ -92,7 +93,8 @@ type Dispatcher struct { ctx context.Context cancel func() - logger log.Logger + logger log.Logger + alertLCObserver alertobserver.LifeCycleObserver } // Limits describes limits used by Dispatcher. 
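The dispatch.go changes below thread the observer into the Dispatcher and emit EventAlertAddedToAggrGroup and EventAlertFailedAddToAggrGroup together with group metadata ("groupKey", "groupId", "routeId"). A minimal, hypothetical consumer of those events might look like the sketch below; the logObserver type, its package name, and its logger field are illustrative assumptions, while the interface and the meta keys come from this diff.

package example

import (
	"github.com/go-kit/log"

	"github.com/prometheus/alertmanager/alertobserver"
	"github.com/prometheus/alertmanager/types"
)

// logObserver is a hypothetical LifeCycleObserver that logs every alert the
// dispatcher assigns to an aggregation group, using the meta keys populated
// in processAlert.
type logObserver struct {
	logger log.Logger
}

func (o *logObserver) Observe(event string, alerts []*types.Alert, meta alertobserver.AlertEventMeta) {
	if event != alertobserver.EventAlertAddedToAggrGroup {
		return
	}
	for _, a := range alerts {
		o.logger.Log(
			"msg", "alert added to aggregation group",
			"alert", a.Name(),
			"groupKey", meta["groupKey"],
			"groupId", meta["groupId"],
			"routeId", meta["routeId"],
		)
	}
}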
@@ -113,19 +115,21 @@ func NewDispatcher( lim Limits, l log.Logger, m *DispatcherMetrics, + o alertobserver.LifeCycleObserver, ) *Dispatcher { if lim == nil { lim = nilLimits{} } disp := &Dispatcher{ - alerts: ap, - stage: s, - route: r, - timeout: to, - logger: log.With(l, "component", "dispatcher"), - metrics: m, - limits: lim, + alerts: ap, + stage: s, + route: r, + timeout: to, + logger: log.With(l, "component", "dispatcher"), + metrics: m, + limits: lim, + alertLCObserver: o, } return disp } @@ -366,13 +370,25 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) { ag, ok := routeGroups[fp] if ok { ag.insert(alert) + if d.alertLCObserver != nil { + m := alertobserver.AlertEventMeta{ + "groupKey": ag.GroupKey(), + "routeId": ag.routeID, + "groupId": ag.GroupID(), + } + d.alertLCObserver.Observe(alertobserver.EventAlertAddedToAggrGroup, []*types.Alert{alert}, m) + } return } // If the group does not exist, create it. But check the limit first. if limit := d.limits.MaxNumberOfAggregationGroups(); limit > 0 && d.aggrGroupsNum >= limit { d.metrics.aggrGroupLimitReached.Inc() - level.Error(d.logger).Log("msg", "Too many aggregation groups, cannot create new group for alert", "groups", d.aggrGroupsNum, "limit", limit, "alert", alert.Name()) + errMsg := "Too many aggregation groups, cannot create new group for alert" + level.Error(d.logger).Log("msg", errMsg, "groups", d.aggrGroupsNum, "limit", limit, "alert", alert.Name()) + if d.alertLCObserver != nil { + d.alertLCObserver.Observe(alertobserver.EventAlertFailedAddToAggrGroup, []*types.Alert{alert}, alertobserver.AlertEventMeta{"msg": errMsg}) + } return } @@ -380,6 +396,14 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) { routeGroups[fp] = ag d.aggrGroupsNum++ d.metrics.aggrGroups.Inc() + if d.alertLCObserver != nil { + m := alertobserver.AlertEventMeta{ + "groupKey": ag.GroupKey(), + "routeId": ag.routeID, + "groupId": ag.GroupID(), + } + d.alertLCObserver.Observe(alertobserver.EventAlertAddedToAggrGroup, []*types.Alert{alert}, m) + } // Insert the 1st alert in the group before starting the group's run() // function, to make sure that when the run() will be executed the 1st @@ -496,6 +520,7 @@ func (ag *aggrGroup) run(nf notifyFunc) { // Populate context with information needed along the pipeline. 
ctx = notify.WithGroupKey(ctx, ag.GroupKey()) + ctx = notify.WithGroupId(ctx, ag.GroupID()) ctx = notify.WithGroupLabels(ctx, ag.labels) ctx = notify.WithReceiverName(ctx, ag.opts.Receiver) ctx = notify.WithRepeatInterval(ctx, ag.opts.RepeatInterval) diff --git a/dispatch/dispatch_test.go b/dispatch/dispatch_test.go index 546405f797..8e5c641f3e 100644 --- a/dispatch/dispatch_test.go +++ b/dispatch/dispatch_test.go @@ -28,6 +28,7 @@ import ( "github.com/prometheus/common/model" "github.com/stretchr/testify/require" + "github.com/prometheus/alertmanager/alertobserver" "github.com/prometheus/alertmanager/config" "github.com/prometheus/alertmanager/notify" "github.com/prometheus/alertmanager/provider/mem" @@ -107,6 +108,9 @@ func TestAggrGroup(t *testing.T) { if _, ok := notify.GroupKey(ctx); !ok { t.Errorf("group key missing") } + if _, ok := notify.GroupId(ctx); !ok { + t.Errorf("group id missing") + } if lbls, ok := notify.GroupLabels(ctx); !ok || !reflect.DeepEqual(lbls, lset) { t.Errorf("wrong group labels: %q", lbls) } @@ -374,7 +378,7 @@ route: timeout := func(d time.Duration) time.Duration { return time.Duration(0) } recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)} - dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry())) + dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry()), nil) go dispatcher.Run() defer dispatcher.Stop() @@ -516,7 +520,7 @@ route: recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)} lim := limits{groups: 6} m := NewDispatcherMetrics(true, prometheus.NewRegistry()) - dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, lim, logger, m) + dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, lim, logger, m, nil) go dispatcher.Run() defer dispatcher.Stop() @@ -612,7 +616,7 @@ route: timeout := func(d time.Duration) time.Duration { return time.Duration(0) } recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)} - dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry())) + dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry()), nil) go dispatcher.Run() defer dispatcher.Stop() @@ -726,6 +730,74 @@ route: }, alertGroupInfos) } +func TestGroupsAlertLCObserver(t *testing.T) { + confData := `receivers: +- name: 'testing' + +route: + group_by: ['alertname'] + group_wait: 10ms + group_interval: 10ms + receiver: 'testing'` + conf, err := config.Load(confData) + if err != nil { + t.Fatal(err) + } + + logger := log.NewNopLogger() + route := NewRoute(conf.Route, nil) + marker := types.NewMarker(prometheus.NewRegistry()) + alerts, err := mem.NewAlerts(context.Background(), marker, time.Hour, nil, logger, nil) + if err != nil { + t.Fatal(err) + } + defer alerts.Close() + + timeout := func(d time.Duration) time.Duration { return time.Duration(0) } + recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)} + m := NewDispatcherMetrics(true, prometheus.NewRegistry()) + observer := alertobserver.NewFakeLifeCycleObserver() + lim := limits{groups: 1} + dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, lim, logger, m, observer) + go dispatcher.Run() + defer dispatcher.Stop() + + // Create 
alerts. the dispatcher will automatically create the groups. + alert1 := newAlert(model.LabelSet{"alertname": "OtherAlert", "cluster": "cc", "service": "dd"}) + alert2 := newAlert(model.LabelSet{"alertname": "YetAnotherAlert", "cluster": "cc", "service": "db"}) + err = alerts.Put(alert1) + if err != nil { + t.Fatal(err) + } + // Let alerts get processed. + for i := 0; len(recorder.Alerts()) != 1 && i < 10; i++ { + time.Sleep(200 * time.Millisecond) + } + err = alerts.Put(alert2) + if err != nil { + t.Fatal(err) + } + // Let alert get processed. + for i := 0; testutil.ToFloat64(m.aggrGroupLimitReached) == 0 && i < 10; i++ { + time.Sleep(200 * time.Millisecond) + } + observer.Mtx.RLock() + defer observer.Mtx.RUnlock() + require.Equal(t, 1, len(recorder.Alerts())) + require.Equal(t, alert1.Fingerprint(), observer.AlertsPerEvent[alertobserver.EventAlertAddedToAggrGroup][0].Fingerprint()) + groupFp := getGroupLabels(alert1, route).Fingerprint() + group := dispatcher.aggrGroupsPerRoute[route][groupFp] + groupKey := group.GroupKey() + groupId := group.GroupID() + routeId := group.routeID + require.Equal(t, groupKey, observer.MetaPerEvent[alertobserver.EventAlertAddedToAggrGroup][0]["groupKey"].(string)) + require.Equal(t, groupId, observer.MetaPerEvent[alertobserver.EventAlertAddedToAggrGroup][0]["groupId"].(string)) + require.Equal(t, routeId, observer.MetaPerEvent[alertobserver.EventAlertAddedToAggrGroup][0]["routeId"].(string)) + + require.Equal(t, 1, len(observer.AlertsPerEvent[alertobserver.EventAlertFailedAddToAggrGroup])) + require.Equal(t, alert2.Fingerprint(), observer.AlertsPerEvent[alertobserver.EventAlertFailedAddToAggrGroup][0].Fingerprint()) +} + type recordStage struct { mtx sync.RWMutex alerts map[string]map[model.Fingerprint]*types.Alert @@ -790,7 +862,7 @@ func TestDispatcherRace(t *testing.T) { defer alerts.Close() timeout := func(d time.Duration) time.Duration { return time.Duration(0) } - dispatcher := NewDispatcher(alerts, nil, nil, marker, timeout, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry())) + dispatcher := NewDispatcher(alerts, nil, nil, marker, timeout, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry()), nil) go dispatcher.Run() dispatcher.Stop() } @@ -818,7 +890,7 @@ func TestDispatcherRaceOnFirstAlertNotDeliveredWhenGroupWaitIsZero(t *testing.T) timeout := func(d time.Duration) time.Duration { return d } recorder := &recordStage{alerts: make(map[string]map[model.Fingerprint]*types.Alert)} - dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry())) + dispatcher := NewDispatcher(alerts, route, recorder, marker, timeout, nil, logger, NewDispatcherMetrics(false, prometheus.NewRegistry()), nil) go dispatcher.Run() defer dispatcher.Stop() diff --git a/notify/notify.go b/notify/notify.go index 30861a3027..3bd36c0a74 100644 --- a/notify/notify.go +++ b/notify/notify.go @@ -18,6 +18,7 @@ import ( "errors" "fmt" "sort" + "strings" "sync" "time" @@ -28,6 +29,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" + "github.com/prometheus/alertmanager/alertobserver" "github.com/prometheus/alertmanager/featurecontrol" "github.com/prometheus/alertmanager/inhibit" "github.com/prometheus/alertmanager/nflog" @@ -119,6 +121,7 @@ const ( keyNow keyMuteTimeIntervals keyActiveTimeIntervals + keyGroupId ) // WithReceiverName populates a context with a receiver name. 
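The notify.go hunks that follow add WithGroupId and GroupId context helpers next to the existing GroupKey pair, and aggrGroup.run (above) now stores the group id in the pipeline context. A hypothetical custom Stage reading that value might look like this sketch; the groupIDLoggingStage type is an illustrative assumption, while the Stage interface and the GroupId helper come from this diff.

package example

import (
	"context"

	"github.com/go-kit/log"

	"github.com/prometheus/alertmanager/notify"
	"github.com/prometheus/alertmanager/types"
)

// groupIDLoggingStage logs the group id carried in the pipeline context and
// passes the alerts through unchanged.
type groupIDLoggingStage struct{}

var _ notify.Stage = groupIDLoggingStage{}

func (groupIDLoggingStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) {
	if id, ok := notify.GroupId(ctx); ok {
		l.Log("msg", "processing group", "groupId", id, "alerts", len(alerts))
	}
	return ctx, alerts, nil
}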
@@ -131,6 +134,11 @@ func WithGroupKey(ctx context.Context, s string) context.Context { return context.WithValue(ctx, keyGroupKey, s) } +// WithGroupId populates a context with a group id. +func WithGroupId(ctx context.Context, s string) context.Context { + return context.WithValue(ctx, keyGroupId, s) +} + // WithFiringAlerts populates a context with a slice of firing alerts. func WithFiringAlerts(ctx context.Context, alerts []uint64) context.Context { return context.WithValue(ctx, keyFiringAlerts, alerts) @@ -186,6 +194,13 @@ func GroupKey(ctx context.Context) (string, bool) { return v, ok } +// GroupId extracts a group id from the context. Iff none exists, the +// second argument is false. +func GroupId(ctx context.Context) (string, bool) { + v, ok := ctx.Value(keyGroupId).(string) + return v, ok +} + // GroupLabels extracts grouping label set from the context. Iff none exists, the // second argument is false. func GroupLabels(ctx context.Context) (model.LabelSet, bool) { @@ -383,18 +398,25 @@ func (pb *PipelineBuilder) New( intervener *timeinterval.Intervener, notificationLog NotificationLog, peer Peer, + o alertobserver.LifeCycleObserver, ) RoutingStage { - rs := make(RoutingStage, len(receivers)) + rs := RoutingStage{ + stages: make(map[string]Stage, len(receivers)), + alertLCObserver: o, + } ms := NewGossipSettleStage(peer) - is := NewMuteStage(inhibitor, pb.metrics) + is := NewMuteStage(inhibitor, pb.metrics, o) tas := NewTimeActiveStage(intervener, pb.metrics) tms := NewTimeMuteStage(intervener, pb.metrics) - ss := NewMuteStage(silencer, pb.metrics) + ss := NewMuteStage(silencer, pb.metrics, o) for name := range receivers { - st := createReceiverStage(name, receivers[name], wait, notificationLog, pb.metrics) - rs[name] = MultiStage{ms, is, tas, tms, ss, st} + st := createReceiverStage(name, receivers[name], wait, notificationLog, pb.metrics, o) + rs.stages[name] = MultiStage{ + alertLCObserver: o, + stages: []Stage{ms, is, tas, tms, ss, st}, + } } pb.metrics.InitializeFor(receivers) @@ -409,6 +431,7 @@ func createReceiverStage( wait func() time.Duration, notificationLog NotificationLog, metrics *Metrics, + o alertobserver.LifeCycleObserver, ) Stage { var fs FanoutStage for i := range integrations { @@ -417,20 +440,23 @@ func createReceiverStage( Integration: integrations[i].Name(), Idx: uint32(integrations[i].Index()), } - var s MultiStage + var s []Stage s = append(s, NewWaitStage(wait)) s = append(s, NewDedupStage(&integrations[i], notificationLog, recv)) - s = append(s, NewRetryStage(integrations[i], name, metrics)) + s = append(s, NewRetryStage(integrations[i], name, metrics, o)) s = append(s, NewSetNotifiesStage(notificationLog, recv)) - fs = append(fs, s) + fs = append(fs, MultiStage{stages: s, alertLCObserver: o}) } return fs } // RoutingStage executes the inner stages based on the receiver specified in // the context. -type RoutingStage map[string]Stage +type RoutingStage struct { + stages map[string]Stage + alertLCObserver alertobserver.LifeCycleObserver +} // Exec implements the Stage interface. func (rs RoutingStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { @@ -439,21 +465,28 @@ func (rs RoutingStage) Exec(ctx context.Context, l log.Logger, alerts ...*types. 
return ctx, nil, errors.New("receiver missing") } - s, ok := rs[receiver] + s, ok := rs.stages[receiver] if !ok { return ctx, nil, errors.New("stage for receiver missing") } + if rs.alertLCObserver != nil { + rs.alertLCObserver.Observe(alertobserver.EventAlertPipelineStart, alerts, alertobserver.AlertEventMeta{"ctx": ctx}) + } + return s.Exec(ctx, l, alerts...) } // A MultiStage executes a series of stages sequentially. -type MultiStage []Stage +type MultiStage struct { + stages []Stage + alertLCObserver alertobserver.LifeCycleObserver +} // Exec implements the Stage interface. func (ms MultiStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { var err error - for _, s := range ms { + for _, s := range ms.stages { if len(alerts) == 0 { return ctx, nil, nil } @@ -462,6 +495,10 @@ func (ms MultiStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Al if err != nil { return ctx, nil, err } + if ms.alertLCObserver != nil { + p := strings.Split(fmt.Sprintf("%T", s), ".") + ms.alertLCObserver.Observe(alertobserver.EventAlertPipelinePassStage, alerts, alertobserver.AlertEventMeta{"ctx": ctx, "stageName": p[len(p)-1]}) + } } return ctx, alerts, nil } @@ -522,13 +559,14 @@ const ( // MuteStage filters alerts through a Muter. type MuteStage struct { - muter types.Muter - metrics *Metrics + muter types.Muter + metrics *Metrics + alertLCObserver alertobserver.LifeCycleObserver } // NewMuteStage return a new MuteStage. -func NewMuteStage(m types.Muter, metrics *Metrics) *MuteStage { - return &MuteStage{muter: m, metrics: metrics} +func NewMuteStage(m types.Muter, metrics *Metrics, o alertobserver.LifeCycleObserver) *MuteStage { + return &MuteStage{muter: m, metrics: metrics, alertLCObserver: o} } // Exec implements the Stage interface. @@ -547,6 +585,7 @@ func (n *MuteStage) Exec(ctx context.Context, logger log.Logger, alerts ...*type } // TODO(fabxc): increment muted alerts counter if muted. } + if len(muted) > 0 { level.Debug(logger).Log("msg", "Notifications will not be sent for muted alerts", "alerts", fmt.Sprintf("%v", muted)) @@ -561,6 +600,9 @@ func (n *MuteStage) Exec(ctx context.Context, logger log.Logger, alerts ...*type n.metrics.numNotificationSuppressedTotal.WithLabelValues(reason).Add(float64(len(muted))) } + if n.alertLCObserver != nil { + n.alertLCObserver.Observe(alertobserver.EventAlertMuted, muted, alertobserver.AlertEventMeta{"ctx": ctx}) + } return ctx, filtered, nil } @@ -733,10 +775,11 @@ func (n *DedupStage) Exec(ctx context.Context, _ log.Logger, alerts ...*types.Al // RetryStage notifies via passed integration with exponential backoff until it // succeeds. It aborts if the context is canceled or timed out. type RetryStage struct { - integration Integration - groupName string - metrics *Metrics - labelValues []string + integration Integration + groupName string + metrics *Metrics + labelValues []string + alertLCObserver alertobserver.LifeCycleObserver } // NewRetryStage returns a new instance of a RetryStage. 
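A note on the stageName metadata recorded by MultiStage.Exec above: the value is derived from the stage's dynamic type via fmt.Sprintf("%T", s), so the package qualifier and any pointer prefix are split away and only the bare type name is reported. A standalone sketch of that derivation, using toy types rather than the real notify stages:

package main

import (
	"fmt"
	"strings"
)

// stageName mirrors the %T-based derivation used in MultiStage.Exec.
func stageName(s interface{}) string {
	p := strings.Split(fmt.Sprintf("%T", s), ".")
	return p[len(p)-1]
}

type MuteStage struct{}

type StageFunc func()

func main() {
	fmt.Println(stageName(&MuteStage{}))   // "MuteStage": "*main.MuteStage" loses "*main"
	fmt.Println(stageName(StageFunc(nil))) // "StageFunc", the key asserted in TestMultiStage below
}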
@@ -748,16 +791,17 @@ func NewRetryStage(i Integration, groupName string, metrics *Metrics) *RetryStag } return &RetryStage{ - integration: i, - groupName: groupName, - metrics: metrics, - labelValues: labelValues, + integration: i, + groupName: groupName, + metrics: metrics, + labelValues: labelValues, + alertLCObserver: o, } } func (r RetryStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { r.metrics.numNotifications.WithLabelValues(r.labelValues...).Inc() - ctx, alerts, err := r.exec(ctx, l, alerts...) + ctx, alerts, sent, err := r.exec(ctx, l, alerts...) failureReason := DefaultReason.String() if err != nil { @@ -766,11 +810,26 @@ func (r RetryStage) Exec(ctx context.Context, l log.Logger, alerts ...*types.Ale failureReason = e.Reason.String() } r.metrics.numTotalFailedNotifications.WithLabelValues(append(r.labelValues, failureReason)...).Inc() + if r.alertLCObserver != nil { + m := alertobserver.AlertEventMeta{ + "ctx": ctx, + "integration": r.integration.Name(), + "stageName": "RetryStage", + } + r.alertLCObserver.Observe(alertobserver.EventAlertSendFailed, sent, m) + } + } else if r.alertLCObserver != nil { + m := alertobserver.AlertEventMeta{ + "ctx": ctx, + "integration": r.integration.Name(), + "stageName": "RetryStage", + } + r.alertLCObserver.Observe(alertobserver.EventAlertSent, sent, m) } return ctx, alerts, err } -func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { +func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, []*types.Alert, error) { var sent []*types.Alert // If we shouldn't send notifications for resolved alerts, but there are only @@ -779,10 +838,10 @@ func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Ale if !r.integration.SendResolved() { firing, ok := FiringAlerts(ctx) if !ok { - return ctx, nil, errors.New("firing alerts missing") + return ctx, nil, nil, errors.New("firing alerts missing") } if len(firing) == 0 { - return ctx, alerts, nil + return ctx, alerts, sent, nil } for _, a := range alerts { if a.Status() != model.AlertResolved { @@ -824,9 +883,9 @@ func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Ale } if iErr != nil { - return ctx, nil, fmt.Errorf("%s/%s: notify retry canceled after %d attempts: %w", r.groupName, r.integration.String(), i, iErr) + return ctx, nil, sent, fmt.Errorf("%s/%s: notify retry canceled after %d attempts: %w", r.groupName, r.integration.String(), i, iErr) } - return ctx, nil, nil + return ctx, nil, sent, nil default: } @@ -840,7 +899,7 @@ func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Ale if err != nil { r.metrics.numNotificationRequestsFailedTotal.WithLabelValues(r.labelValues...).Inc() if !retry { - return ctx, alerts, fmt.Errorf("%s/%s: notify retry canceled due to unrecoverable error after %d attempts: %w", r.groupName, r.integration.String(), i, err) + return ctx, alerts, sent, fmt.Errorf("%s/%s: notify retry canceled due to unrecoverable error after %d attempts: %w", r.groupName, r.integration.String(), i, err) } if ctx.Err() == nil { if iErr == nil || err.Error() != iErr.Error() { @@ -858,7 +917,7 @@ func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Ale } lvl.Log("msg", "Notify success", "attempts", i, "duration", dur) - return ctx, alerts, nil + return ctx, alerts, sent, nil } case <-ctx.Done(): if iErr == nil { 
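The RetryStage changes above thread the slice of alerts actually selected for sending (sent) back out of exec, so EventAlertSent and EventAlertSendFailed report what was attempted against the integration rather than the full input. A sketch of a consumer of those events follows; the LoggingObserver type and its field are hypothetical, while the meta keys ("ctx", "integration", "stageName") follow the map built in RetryStage.Exec.

package exampleobserver

import (
	"context"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"

	"github.com/prometheus/alertmanager/alertobserver"
	"github.com/prometheus/alertmanager/notify"
	"github.com/prometheus/alertmanager/types"
)

// LoggingObserver logs notification attempts; hypothetical example only.
type LoggingObserver struct {
	Logger log.Logger
}

func (o *LoggingObserver) Observe(event string, alerts []*types.Alert, meta alertobserver.AlertEventMeta) {
	switch event {
	case alertobserver.EventAlertSent, alertobserver.EventAlertSendFailed:
		var groupKey string
		if ctx, ok := meta["ctx"].(context.Context); ok {
			groupKey, _ = notify.GroupKey(ctx)
		}
		integration, _ := meta["integration"].(string)
		level.Info(o.Logger).Log("msg", "notification attempt", "event", event,
			"integration", integration, "group_key", groupKey, "num_alerts", len(alerts))
	}
}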
@@ -870,9 +929,9 @@ func (r RetryStage) exec(ctx context.Context, l log.Logger, alerts ...*types.Ale } } if iErr != nil { - return ctx, nil, fmt.Errorf("%s/%s: notify retry canceled after %d attempts: %w", r.groupName, r.integration.String(), i, iErr) + return ctx, nil, sent, fmt.Errorf("%s/%s: notify retry canceled after %d attempts: %w", r.groupName, r.integration.String(), i, iErr) } - return ctx, nil, nil + return ctx, nil, sent, nil } } } diff --git a/notify/notify_test.go b/notify/notify_test.go index d5ce9612e3..2dae1fd0ec 100644 --- a/notify/notify_test.go +++ b/notify/notify_test.go @@ -17,6 +17,7 @@ import ( "context" "errors" "fmt" + "github.com/prometheus/alertmanager/alertobserver" "io" "reflect" "testing" @@ -306,7 +307,7 @@ func TestMultiStage(t *testing.T) { alerts3 = []*types.Alert{{}, {}, {}} ) - stage := MultiStage{ + stages := []Stage{ StageFunc(func(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { if !reflect.DeepEqual(alerts, alerts1) { t.Fatal("Input not equal to input of MultiStage") @@ -326,7 +327,9 @@ func TestMultiStage(t *testing.T) { return ctx, alerts3, nil }), } - + stage := MultiStage{ + stages: stages, + } _, alerts, err := stage.Exec(context.Background(), log.NewNopLogger(), alerts1...) if err != nil { t.Fatalf("Exec failed: %s", err) @@ -335,13 +338,28 @@ func TestMultiStage(t *testing.T) { if !reflect.DeepEqual(alerts, alerts3) { t.Fatal("Output of MultiStage is not equal to the output of the last stage") } + + // Rerun multistage but with alert life cycle observer + observer := alertobserver.NewFakeLifeCycleObserver() + ctx := WithGroupKey(context.Background(), "test") + stage.alertLCObserver = observer + _, _, err = stage.Exec(ctx, log.NewNopLogger(), alerts1...) + if err != nil { + t.Fatalf("Exec failed: %s", err) + } + + require.Equal(t, 1, len(observer.PipelineStageAlerts)) + require.Equal(t, 5, len(observer.PipelineStageAlerts["StageFunc"])) + metaCtx := observer.MetaPerEvent[alertobserver.EventAlertPipelinePassStage][0]["ctx"].(context.Context) + _, ok := GroupKey(metaCtx) + require.True(t, ok) } func TestMultiStageFailure(t *testing.T) { var ( ctx = context.Background() s1 = failStage{} - stage = MultiStage{s1} + stage = MultiStage{stages: []Stage{s1}} ) _, _, err := stage.Exec(ctx, log.NewNopLogger(), nil) @@ -356,7 +374,7 @@ func TestRoutingStage(t *testing.T) { alerts2 = []*types.Alert{{}, {}} ) - stage := RoutingStage{ + s := map[string]Stage{ "name": StageFunc(func(ctx context.Context, l log.Logger, alerts ...*types.Alert) (context.Context, []*types.Alert, error) { if !reflect.DeepEqual(alerts, alerts1) { t.Fatal("Input not equal to input of RoutingStage") @@ -365,6 +383,9 @@ func TestRoutingStage(t *testing.T) { }), "not": failStage{}, } + stage := RoutingStage{ + stages: s, + } ctx := WithReceiverName(context.Background(), "name") @@ -376,6 +397,20 @@ func TestRoutingStage(t *testing.T) { if !reflect.DeepEqual(alerts, alerts2) { t.Fatal("Output of RoutingStage is not equal to the output of the inner stage") } + + // Rerun RoutingStage but with alert life cycle observer + observer := alertobserver.NewFakeLifeCycleObserver() + stage.alertLCObserver = observer + _, _, err = stage.Exec(ctx, log.NewNopLogger(), alerts1...) 
+ if err != nil { + t.Fatalf("Exec failed: %s", err) + } + require.Equal(t, len(alerts1), len(observer.AlertsPerEvent[alertobserver.EventAlertPipelineStart])) + metaCtx := observer.MetaPerEvent[alertobserver.EventAlertPipelineStart][0]["ctx"].(context.Context) + + _, ok := ReceiverName(metaCtx) + require.True(t, ok) + } func TestRetryStageWithError(t *testing.T) { @@ -392,7 +427,7 @@ func TestRetryStageWithError(t *testing.T) { }), rs: sendResolved(false), } - r := NewRetryStage(i, "", NewMetrics(prometheus.NewRegistry(), featurecontrol.NoopFlags{})) + r := NewRetryStage(i, "", NewMetrics(prometheus.NewRegistry(), featurecontrol.NoopFlags{}), nil) alerts := []*types.Alert{ { @@ -404,6 +439,7 @@ func TestRetryStageWithError(t *testing.T) { ctx := context.Background() ctx = WithFiringAlerts(ctx, []uint64{0}) + ctx = WithGroupKey(ctx, "test") // Notify with a recoverable error should retry and succeed. resctx, res, err := r.Exec(ctx, log.NewNopLogger(), alerts...) @@ -412,13 +448,40 @@ func TestRetryStageWithError(t *testing.T) { require.Equal(t, alerts, sent) require.NotNil(t, resctx) + // Rerun recoverable error but with alert life cycle observer + observer := alertobserver.NewFakeLifeCycleObserver() + r.alertLCObserver = observer + _, _, err = r.Exec(ctx, log.NewNopLogger(), alerts...) + require.Nil(t, err) + require.Equal(t, len(alerts), len(observer.AlertsPerEvent[alertobserver.EventAlertSent])) + meta := observer.MetaPerEvent[alertobserver.EventAlertSent][0] + require.Equal(t, "RetryStage", meta["stageName"].(string)) + require.Equal(t, i.Name(), meta["integration"].(string)) + metaCtx := meta["ctx"].(context.Context) + _, ok := GroupKey(metaCtx) + require.True(t, ok) + // Notify with an unrecoverable error should fail. sent = sent[:0] fail = true retry = false + r.alertLCObserver = nil resctx, _, err = r.Exec(ctx, log.NewNopLogger(), alerts...) require.Error(t, err) require.NotNil(t, resctx) + + // Rerun the unrecoverable error but with alert life cycle observer + fail = true + r.alertLCObserver = observer + _, _, err = r.Exec(ctx, log.NewNopLogger(), alerts...) 
+ require.NotNil(t, err) + require.Equal(t, len(alerts), len(observer.AlertsPerEvent[alertobserver.EventAlertSendFailed])) + meta = observer.MetaPerEvent[alertobserver.EventAlertSendFailed][0] + require.Equal(t, "RetryStage", meta["stageName"].(string)) + require.Equal(t, i.Name(), meta["integration"].(string)) + metaCtx = meta["ctx"].(context.Context) + _, ok = GroupKey(metaCtx) + require.True(t, ok) } func TestRetryStageWithErrorCode(t *testing.T) { @@ -445,7 +508,7 @@ func TestRetryStageWithErrorCode(t *testing.T) { }), rs: sendResolved(false), } - r := NewRetryStage(i, "", NewMetrics(prometheus.NewRegistry(), featurecontrol.NoopFlags{})) + r := NewRetryStage(i, "", NewMetrics(prometheus.NewRegistry(), featurecontrol.NoopFlags{}), nil) alerts := []*types.Alert{ { @@ -562,7 +625,7 @@ func TestRetryStageSendResolved(t *testing.T) { }), rs: sendResolved(true), } - r := NewRetryStage(i, "", NewMetrics(prometheus.NewRegistry(), featurecontrol.NoopFlags{})) + r := NewRetryStage(i, "", NewMetrics(prometheus.NewRegistry(), featurecontrol.NoopFlags{}), nil) alerts := []*types.Alert{ { @@ -667,7 +730,7 @@ func TestMuteStage(t *testing.T) { }) metrics := NewMetrics(prometheus.NewRegistry(), featurecontrol.NoopFlags{}) - stage := NewMuteStage(muter, metrics) + stage := NewMuteStage(muter, metrics, nil) in := []model.LabelSet{ {}, @@ -728,7 +791,7 @@ func TestMuteStageWithSilences(t *testing.T) { reg := prometheus.NewRegistry() marker := types.NewMarker(reg) silencer := silence.NewSilencer(silences, marker, log.NewNopLogger()) metrics := NewMetrics(reg, featurecontrol.NoopFlags{}) - stage := NewMuteStage(silencer, metrics) + stage := NewMuteStage(silencer, metrics, nil) in := []model.LabelSet{ @@ -820,6 +883,45 @@ } } +func TestMuteStageWithAlertObserver(t *testing.T) { + silences, err := silence.New(silence.Options{Retention: time.Hour}) + if err != nil { + t.Fatal(err) + } + _, err = silences.Set(&silencepb.Silence{ + EndsAt: utcNow().Add(time.Hour), + Matchers: []*silencepb.Matcher{{Name: "mute", Pattern: "me"}}, + }) + if err != nil { + t.Fatal(err) + } + + marker := types.NewMarker(prometheus.NewRegistry()) + silencer := silence.NewSilencer(silences, marker, log.NewNopLogger()) + observer := alertobserver.NewFakeLifeCycleObserver() + stage := NewMuteStage(silencer, NewMetrics(prometheus.NewRegistry(), featurecontrol.NoopFlags{}), observer) + + in := []model.LabelSet{ + {"test": "set"}, + {"mute": "me"}, + {"foo": "bar", "test": "set"}, + } + + var inAlerts []*types.Alert + for _, lset := range in { + inAlerts = append(inAlerts, &types.Alert{ + Alert: model.Alert{Labels: lset}, + }) + } + + _, _, err = stage.Exec(context.Background(), log.NewNopLogger(), inAlerts...) + if err != nil { + t.Fatalf("Exec failed: %s", err) + } + require.Equal(t, 1, len(observer.AlertsPerEvent[alertobserver.EventAlertMuted])) + require.Equal(t, inAlerts[1], observer.AlertsPerEvent[alertobserver.EventAlertMuted][0]) +} + func TestTimeMuteStage(t *testing.T) { // Route mutes alerts outside business hours in November, using the +1100 timezone. muteIn := `