From 5322f20799718748bc2dbffe77960295721cece3 Mon Sep 17 00:00:00 2001 From: Yijie Qin Date: Mon, 19 Jun 2023 19:59:19 -0400 Subject: [PATCH] add api callback Signed-off-by: Yijie Qin --- api/api.go | 6 + api/v2/api.go | 23 +++- api/v2/api_test.go | 225 +++++++++++++++++++++++++++++++++++++- api/v2/testing.go | 42 +++++++ util/callback/callback.go | 36 ++++++ 5 files changed, 329 insertions(+), 3 deletions(-) create mode 100644 util/callback/callback.go diff --git a/api/api.go b/api/api.go index 2e1e1ea425..8afb686bc2 100644 --- a/api/api.go +++ b/api/api.go @@ -20,6 +20,8 @@ import ( "runtime" "time" + "github.com/prometheus/alertmanager/util/callback" + "github.com/go-kit/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" @@ -74,6 +76,9 @@ type Options struct { // according to the current active configuration. Alerts returned are // filtered by the arguments provided to the function. GroupFunc func(func(*dispatch.Route) bool, func(*types.Alert, time.Time) bool) (dispatch.AlertGroups, map[model.Fingerprint][]string) + + // APICallback define the callback function that each api call will perform before returned. + APICallback callback.Callback } func (o Options) validate() error { @@ -115,6 +120,7 @@ func New(opts Options) (*API, error) { opts.GroupFunc, opts.StatusFunc, opts.Silences, + opts.APICallback, opts.Peer, log.With(l, "version", "v2"), opts.Registry, diff --git a/api/v2/api.go b/api/v2/api.go index b4f57e75e2..a0bc803726 100644 --- a/api/v2/api.go +++ b/api/v2/api.go @@ -22,6 +22,7 @@ import ( "sync" "time" + "github.com/prometheus/alertmanager/util/callback" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/go-openapi/analysis" @@ -60,6 +61,7 @@ type API struct { alerts provider.Alerts alertGroups groupsFn getAlertStatus getAlertStatusFn + apiCallback callback.Callback uptime time.Time // mtx protects alertmanagerConfig, setAlertStatus and route. @@ -88,16 +90,21 @@ func NewAPI( gf groupsFn, sf getAlertStatusFn, silences *silence.Silences, + apiCallback callback.Callback, peer cluster.ClusterPeer, l log.Logger, r prometheus.Registerer, ) (*API, error) { + if apiCallback == nil { + apiCallback = callback.NoopAPICallback{} + } api := API{ alerts: alerts, getAlertStatus: sf, alertGroups: gf, peer: peer, silences: silences, + apiCallback: apiCallback, logger: l, m: metrics.NewAlerts(r), uptime: time.Now(), @@ -304,7 +311,13 @@ func (api *API) getAlertsHandler(params alert_ops.GetAlertsParams) middleware.Re return *res[i].Fingerprint < *res[j].Fingerprint }) - return alert_ops.NewGetAlertsOK().WithPayload(res) + callbackRes, err := api.apiCallback.V2GetAlertsCallback(res) + if err != nil { + level.Error(logger).Log("msg", "Failed to call api callback", "err", err) + return alert_ops.NewGetAlertsInternalServerError().WithPayload(err.Error()) + } + + return alert_ops.NewGetAlertsOK().WithPayload(callbackRes) } func (api *API) postAlertsHandler(params alert_ops.PostAlertsParams) middleware.Responder { @@ -423,7 +436,13 @@ func (api *API) getAlertGroupsHandler(params alertgroup_ops.GetAlertGroupsParams res = append(res, ag) } - return alertgroup_ops.NewGetAlertGroupsOK().WithPayload(res) + callbackRes, err := api.apiCallback.V2GetAlertGroupsCallback(res) + if err != nil { + level.Error(logger).Log("msg", "Failed to call api callback", "err", err) + return alertgroup_ops.NewGetAlertGroupsInternalServerError().WithPayload(err.Error()) + } + + return alertgroup_ops.NewGetAlertGroupsOK().WithPayload(callbackRes) } func (api *API) alertFilter(matchers []*labels.Matcher, silenced, inhibited, active bool) func(a *types.Alert, now time.Time) bool { diff --git a/api/v2/api_test.go b/api/v2/api_test.go index d520dbde13..b5f62465c8 100644 --- a/api/v2/api_test.go +++ b/api/v2/api_test.go @@ -15,6 +15,7 @@ package v2 import ( "bytes" + "encoding/json" "fmt" "io" "net/http" @@ -22,7 +23,10 @@ import ( "strconv" "testing" "time" - + "github.com/prometheus/alertmanager/dispatch" + alert_ops "github.com/prometheus/alertmanager/api/v2/restapi/operations/alert" + alertgroup_ops "github.com/prometheus/alertmanager/api/v2/restapi/operations/alertgroup" + "github.com/prometheus/alertmanager/util/callback" "github.com/go-openapi/runtime" "github.com/go-openapi/strfmt" "github.com/prometheus/common/model" @@ -510,3 +514,222 @@ receivers: require.Equal(t, tc.body, string(body)) } } + +func TestListAlertsHandler(t *testing.T) { + now := time.Now() + alerts := []*types.Alert{ + { + Alert: model.Alert{ + Labels: model.LabelSet{"alertname": "alert1"}, + StartsAt: now.Add(-time.Minute), + }, + }, + { + Alert: model.Alert{ + Labels: model.LabelSet{"alertname": "alert2"}, + StartsAt: now.Add(-time.Minute), + }, + }, + { + Alert: model.Alert{ + Labels: model.LabelSet{"alertname": "alert3"}, + StartsAt: now.Add(-time.Minute), + }, + }, + { + Alert: model.Alert{ + Labels: model.LabelSet{"alertname": "alert4"}, + StartsAt: now.Add(-time.Minute), + }, + }, + { + Alert: model.Alert{ + Labels: model.LabelSet{"alertname": "alert5"}, + StartsAt: now.Add(-time.Minute), + }, + }, + } + + for _, tc := range []struct { + name string + expectedCode int + anames []string + callback callback.Callback + }{ + { + "no call back", + 200, + []string{"alert3", "alert2", "alert1", "alert5", "alert4"}, + callback.NoopAPICallback{}, + }, + { + "callback: only return 1 alert", + 200, + []string{"alert3"}, + limitNumberOfAlertsReturnedCallback{limit: 1}, + }, + { + "callback: only return 3 alert", + 200, + []string{"alert3", "alert2", "alert1"}, + limitNumberOfAlertsReturnedCallback{limit: 3}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + alertsProvider := newFakeAlerts(alerts) + api := API{ + uptime: time.Now(), + getAlertStatus: getAlertStatus, + logger: log.NewNopLogger(), + apiCallback: tc.callback, + alerts: alertsProvider, + setAlertStatus: func(model.LabelSet) {}, + } + api.route = dispatch.NewRoute(&config.Route{Receiver: "def-receiver"}, nil) + r, err := http.NewRequest("GET", "/api/v2/alerts", nil) + require.NoError(t, err) + + w := httptest.NewRecorder() + p := runtime.TextProducer() + silence := false + inhibited := false + active := true + responder := api.getAlertsHandler(alert_ops.GetAlertsParams{ + HTTPRequest: r, + Silenced: &silence, + Inhibited: &inhibited, + Active: &active, + }) + responder.WriteResponse(w, p) + body, _ := io.ReadAll(w.Result().Body) + + require.Equal(t, tc.expectedCode, w.Code) + retAlerts := open_api_models.GettableAlerts{} + err = json.Unmarshal(body, &retAlerts) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + anames := []string{} + for _, a := range retAlerts { + name, ok := a.Labels["alertname"] + if ok { + anames = append(anames, string(name)) + } + } + require.Equal(t, tc.anames, anames) + }) + } +} + +func TestGetAlertGroupsHandler(t *testing.T) { + var startAt time.Time + alerts := []*types.Alert{ + { + Alert: model.Alert{ + Labels: model.LabelSet{"state": "active", "alertname": "alert1"}, + StartsAt: startAt, + }, + }, + { + Alert: model.Alert{ + Labels: model.LabelSet{"state": "unprocessed", "alertname": "alert2"}, + StartsAt: startAt, + }, + }, + } + aginfos := dispatch.AlertGroups{ + &dispatch.AlertGroup{ + Labels: model.LabelSet{ + "alertname": "TestingAlert", + }, + Receiver: "testing", + Alerts: alerts[:1], + }, + &dispatch.AlertGroup{ + Labels: model.LabelSet{ + "alertname": "HighErrorRate", + }, + Receiver: "prod", + Alerts: alerts[:2], + }, + } + for _, tc := range []struct { + name string + numberOfAG int + expectedCode int + callback callback.Callback + }{ + { + "no call back", + 2, + 200, + callback.NoopAPICallback{}, + }, + { + "callback: only return 1 alert group", + 1, + 200, + limitNumberOfAlertsReturnedCallback{limit: 1}, + }, + { + "callback: only return 2 alert group", + 2, + 200, + limitNumberOfAlertsReturnedCallback{limit: 2}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + api := API{ + uptime: time.Now(), + alertGroups: func(func(*dispatch.Route) bool, func(*types.Alert, time.Time) bool) (dispatch.AlertGroups, map[model.Fingerprint][]string) { + return aginfos, nil + }, + getAlertStatus: getAlertStatus, + logger: log.NewNopLogger(), + apiCallback: tc.callback, + } + r, err := http.NewRequest("GET", "/api/v2/alertgroups", nil) + require.NoError(t, err) + + w := httptest.NewRecorder() + p := runtime.TextProducer() + silence := false + inhibited := false + active := true + responder := api.getAlertGroupsHandler(alertgroup_ops.GetAlertGroupsParams{ + HTTPRequest: r, + Silenced: &silence, + Inhibited: &inhibited, + Active: &active, + }) + responder.WriteResponse(w, p) + body, _ := io.ReadAll(w.Result().Body) + + require.Equal(t, tc.expectedCode, w.Code) + retAlertGroups := open_api_models.AlertGroups{} + err = json.Unmarshal(body, &retAlertGroups) + if err != nil { + t.Fatalf("Unexpected error %v", err) + } + require.Equal(t, tc.numberOfAG, len(retAlertGroups)) + }) + } +} + +type limitNumberOfAlertsReturnedCallback struct { + limit int +} + +func (n limitNumberOfAlertsReturnedCallback) V2GetAlertsCallback(alerts open_api_models.GettableAlerts) (open_api_models.GettableAlerts, error) { + return alerts[:n.limit], nil +} + +func (n limitNumberOfAlertsReturnedCallback) V2GetAlertGroupsCallback(alertgroups open_api_models.AlertGroups) (open_api_models.AlertGroups, error) { + return alertgroups[:n.limit], nil +} + +func getAlertStatus(model.Fingerprint) types.AlertStatus { + status := types.AlertStatus{SilencedBy: []string{}, InhibitedBy: []string{}} + status.State = types.AlertStateActive + return status +} diff --git a/api/v2/testing.go b/api/v2/testing.go index c813d350d3..c7bcbe627f 100644 --- a/api/v2/testing.go +++ b/api/v2/testing.go @@ -18,6 +18,11 @@ import ( "testing" "time" + "github.com/prometheus/common/model" + + "github.com/prometheus/alertmanager/provider" + "github.com/prometheus/alertmanager/types" + "github.com/go-openapi/strfmt" "github.com/stretchr/testify/require" @@ -68,3 +73,40 @@ func createLabelMatcher(t *testing.T, name, value string, matchType labels.Match matcher, _ := labels.NewMatcher(matchType, name, value) return matcher } + +// fakeAlerts is a struct implementing the provider.Alerts interface for tests. +type fakeAlerts struct { + fps map[model.Fingerprint]int + alerts []*types.Alert + err error +} + +func newFakeAlerts(alerts []*types.Alert) *fakeAlerts { + fps := make(map[model.Fingerprint]int) + for i, a := range alerts { + fps[a.Fingerprint()] = i + } + f := &fakeAlerts{ + alerts: alerts, + fps: fps, + } + return f +} + +func (f *fakeAlerts) Subscribe() provider.AlertIterator { return nil } +func (f *fakeAlerts) Get(model.Fingerprint) (*types.Alert, error) { return nil, nil } +func (f *fakeAlerts) Put(alerts ...*types.Alert) error { + return f.err +} + +func (f *fakeAlerts) GetPending() provider.AlertIterator { + ch := make(chan *types.Alert) + done := make(chan struct{}) + go func() { + defer close(ch) + for _, a := range f.alerts { + ch <- a + } + }() + return provider.NewAlertIterator(ch, done, f.err) +} diff --git a/util/callback/callback.go b/util/callback/callback.go new file mode 100644 index 0000000000..a7f2c3a71e --- /dev/null +++ b/util/callback/callback.go @@ -0,0 +1,36 @@ +// Copyright 2019 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package callback + +import ( + open_api_models "github.com/prometheus/alertmanager/api/v2/models" +) + +type Callback interface { + // V2GetAlertsCallback is called before v2 getAlerts api returned. + V2GetAlertsCallback(alerts open_api_models.GettableAlerts) (open_api_models.GettableAlerts, error) + + // V2GetAlertGroupsCallback is called before v2 GetAlertGroups api returned. + V2GetAlertGroupsCallback(alertgroups open_api_models.AlertGroups) (open_api_models.AlertGroups, error) +} + +type NoopAPICallback struct{} + +func (n NoopAPICallback) V2GetAlertsCallback(alerts open_api_models.GettableAlerts) (open_api_models.GettableAlerts, error) { + return alerts, nil +} + +func (n NoopAPICallback) V2GetAlertGroupsCallback(alertgroups open_api_models.AlertGroups) (open_api_models.AlertGroups, error) { + return alertgroups, nil +}