Skip to content

Commit

Permalink
add api callback
Browse files Browse the repository at this point in the history
Signed-off-by: Yijie Qin <[email protected]>
  • Loading branch information
qinxx108 authored and yeya24 committed Sep 4, 2024
1 parent c55f013 commit 5322f20
Show file tree
Hide file tree
Showing 5 changed files with 329 additions and 3 deletions.
6 changes: 6 additions & 0 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"runtime"
"time"

"github.com/prometheus/alertmanager/util/callback"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
Expand Down Expand Up @@ -74,6 +76,9 @@ type Options struct {
// according to the current active configuration. Alerts returned are
// filtered by the arguments provided to the function.
GroupFunc func(func(*dispatch.Route) bool, func(*types.Alert, time.Time) bool) (dispatch.AlertGroups, map[model.Fingerprint][]string)

// APICallback define the callback function that each api call will perform before returned.
APICallback callback.Callback
}

func (o Options) validate() error {
Expand Down Expand Up @@ -115,6 +120,7 @@ func New(opts Options) (*API, error) {
opts.GroupFunc,
opts.StatusFunc,
opts.Silences,
opts.APICallback,
opts.Peer,
log.With(l, "version", "v2"),
opts.Registry,
Expand Down
23 changes: 21 additions & 2 deletions api/v2/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sync"
"time"

"github.com/prometheus/alertmanager/util/callback"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/go-openapi/analysis"
Expand Down Expand Up @@ -60,6 +61,7 @@ type API struct {
alerts provider.Alerts
alertGroups groupsFn
getAlertStatus getAlertStatusFn
apiCallback callback.Callback
uptime time.Time

// mtx protects alertmanagerConfig, setAlertStatus and route.
Expand Down Expand Up @@ -88,16 +90,21 @@ func NewAPI(
gf groupsFn,
sf getAlertStatusFn,
silences *silence.Silences,
apiCallback callback.Callback,
peer cluster.ClusterPeer,
l log.Logger,
r prometheus.Registerer,
) (*API, error) {
if apiCallback == nil {
apiCallback = callback.NoopAPICallback{}
}
api := API{
alerts: alerts,
getAlertStatus: sf,
alertGroups: gf,
peer: peer,
silences: silences,
apiCallback: apiCallback,
logger: l,
m: metrics.NewAlerts(r),
uptime: time.Now(),
Expand Down Expand Up @@ -304,7 +311,13 @@ func (api *API) getAlertsHandler(params alert_ops.GetAlertsParams) middleware.Re
return *res[i].Fingerprint < *res[j].Fingerprint
})

return alert_ops.NewGetAlertsOK().WithPayload(res)
callbackRes, err := api.apiCallback.V2GetAlertsCallback(res)
if err != nil {
level.Error(logger).Log("msg", "Failed to call api callback", "err", err)
return alert_ops.NewGetAlertsInternalServerError().WithPayload(err.Error())
}

return alert_ops.NewGetAlertsOK().WithPayload(callbackRes)
}

func (api *API) postAlertsHandler(params alert_ops.PostAlertsParams) middleware.Responder {
Expand Down Expand Up @@ -423,7 +436,13 @@ func (api *API) getAlertGroupsHandler(params alertgroup_ops.GetAlertGroupsParams
res = append(res, ag)
}

return alertgroup_ops.NewGetAlertGroupsOK().WithPayload(res)
callbackRes, err := api.apiCallback.V2GetAlertGroupsCallback(res)
if err != nil {
level.Error(logger).Log("msg", "Failed to call api callback", "err", err)
return alertgroup_ops.NewGetAlertGroupsInternalServerError().WithPayload(err.Error())
}

return alertgroup_ops.NewGetAlertGroupsOK().WithPayload(callbackRes)
}

func (api *API) alertFilter(matchers []*labels.Matcher, silenced, inhibited, active bool) func(a *types.Alert, now time.Time) bool {
Expand Down
225 changes: 224 additions & 1 deletion api/v2/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,18 @@ package v2

import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/httptest"
"strconv"
"testing"
"time"

"github.com/prometheus/alertmanager/dispatch"
alert_ops "github.com/prometheus/alertmanager/api/v2/restapi/operations/alert"
alertgroup_ops "github.com/prometheus/alertmanager/api/v2/restapi/operations/alertgroup"
"github.com/prometheus/alertmanager/util/callback"
"github.com/go-openapi/runtime"
"github.com/go-openapi/strfmt"
"github.com/prometheus/common/model"
Expand Down Expand Up @@ -510,3 +514,222 @@ receivers:
require.Equal(t, tc.body, string(body))
}
}

func TestListAlertsHandler(t *testing.T) {
now := time.Now()
alerts := []*types.Alert{
{
Alert: model.Alert{
Labels: model.LabelSet{"alertname": "alert1"},
StartsAt: now.Add(-time.Minute),
},
},
{
Alert: model.Alert{
Labels: model.LabelSet{"alertname": "alert2"},
StartsAt: now.Add(-time.Minute),
},
},
{
Alert: model.Alert{
Labels: model.LabelSet{"alertname": "alert3"},
StartsAt: now.Add(-time.Minute),
},
},
{
Alert: model.Alert{
Labels: model.LabelSet{"alertname": "alert4"},
StartsAt: now.Add(-time.Minute),
},
},
{
Alert: model.Alert{
Labels: model.LabelSet{"alertname": "alert5"},
StartsAt: now.Add(-time.Minute),
},
},
}

for _, tc := range []struct {
name string
expectedCode int
anames []string
callback callback.Callback
}{
{
"no call back",
200,
[]string{"alert3", "alert2", "alert1", "alert5", "alert4"},
callback.NoopAPICallback{},
},
{
"callback: only return 1 alert",
200,
[]string{"alert3"},
limitNumberOfAlertsReturnedCallback{limit: 1},
},
{
"callback: only return 3 alert",
200,
[]string{"alert3", "alert2", "alert1"},
limitNumberOfAlertsReturnedCallback{limit: 3},
},
} {
t.Run(tc.name, func(t *testing.T) {
alertsProvider := newFakeAlerts(alerts)
api := API{
uptime: time.Now(),
getAlertStatus: getAlertStatus,
logger: log.NewNopLogger(),
apiCallback: tc.callback,
alerts: alertsProvider,
setAlertStatus: func(model.LabelSet) {},
}
api.route = dispatch.NewRoute(&config.Route{Receiver: "def-receiver"}, nil)
r, err := http.NewRequest("GET", "/api/v2/alerts", nil)
require.NoError(t, err)

w := httptest.NewRecorder()
p := runtime.TextProducer()
silence := false
inhibited := false
active := true
responder := api.getAlertsHandler(alert_ops.GetAlertsParams{
HTTPRequest: r,
Silenced: &silence,
Inhibited: &inhibited,
Active: &active,
})
responder.WriteResponse(w, p)
body, _ := io.ReadAll(w.Result().Body)

require.Equal(t, tc.expectedCode, w.Code)
retAlerts := open_api_models.GettableAlerts{}
err = json.Unmarshal(body, &retAlerts)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
anames := []string{}
for _, a := range retAlerts {
name, ok := a.Labels["alertname"]
if ok {
anames = append(anames, string(name))
}
}
require.Equal(t, tc.anames, anames)
})
}
}

func TestGetAlertGroupsHandler(t *testing.T) {
var startAt time.Time
alerts := []*types.Alert{
{
Alert: model.Alert{
Labels: model.LabelSet{"state": "active", "alertname": "alert1"},
StartsAt: startAt,
},
},
{
Alert: model.Alert{
Labels: model.LabelSet{"state": "unprocessed", "alertname": "alert2"},
StartsAt: startAt,
},
},
}
aginfos := dispatch.AlertGroups{
&dispatch.AlertGroup{
Labels: model.LabelSet{
"alertname": "TestingAlert",
},
Receiver: "testing",
Alerts: alerts[:1],
},
&dispatch.AlertGroup{
Labels: model.LabelSet{
"alertname": "HighErrorRate",
},
Receiver: "prod",
Alerts: alerts[:2],
},
}
for _, tc := range []struct {
name string
numberOfAG int
expectedCode int
callback callback.Callback
}{
{
"no call back",
2,
200,
callback.NoopAPICallback{},
},
{
"callback: only return 1 alert group",
1,
200,
limitNumberOfAlertsReturnedCallback{limit: 1},
},
{
"callback: only return 2 alert group",
2,
200,
limitNumberOfAlertsReturnedCallback{limit: 2},
},
} {
t.Run(tc.name, func(t *testing.T) {
api := API{
uptime: time.Now(),
alertGroups: func(func(*dispatch.Route) bool, func(*types.Alert, time.Time) bool) (dispatch.AlertGroups, map[model.Fingerprint][]string) {
return aginfos, nil
},
getAlertStatus: getAlertStatus,
logger: log.NewNopLogger(),
apiCallback: tc.callback,
}
r, err := http.NewRequest("GET", "/api/v2/alertgroups", nil)
require.NoError(t, err)

w := httptest.NewRecorder()
p := runtime.TextProducer()
silence := false
inhibited := false
active := true
responder := api.getAlertGroupsHandler(alertgroup_ops.GetAlertGroupsParams{
HTTPRequest: r,
Silenced: &silence,
Inhibited: &inhibited,
Active: &active,
})
responder.WriteResponse(w, p)
body, _ := io.ReadAll(w.Result().Body)

require.Equal(t, tc.expectedCode, w.Code)
retAlertGroups := open_api_models.AlertGroups{}
err = json.Unmarshal(body, &retAlertGroups)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
require.Equal(t, tc.numberOfAG, len(retAlertGroups))
})
}
}

type limitNumberOfAlertsReturnedCallback struct {
limit int
}

func (n limitNumberOfAlertsReturnedCallback) V2GetAlertsCallback(alerts open_api_models.GettableAlerts) (open_api_models.GettableAlerts, error) {
return alerts[:n.limit], nil
}

func (n limitNumberOfAlertsReturnedCallback) V2GetAlertGroupsCallback(alertgroups open_api_models.AlertGroups) (open_api_models.AlertGroups, error) {
return alertgroups[:n.limit], nil
}

func getAlertStatus(model.Fingerprint) types.AlertStatus {
status := types.AlertStatus{SilencedBy: []string{}, InhibitedBy: []string{}}
status.State = types.AlertStateActive
return status
}
42 changes: 42 additions & 0 deletions api/v2/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ import (
"testing"
"time"

"github.com/prometheus/common/model"

"github.com/prometheus/alertmanager/provider"
"github.com/prometheus/alertmanager/types"

"github.com/go-openapi/strfmt"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -68,3 +73,40 @@ func createLabelMatcher(t *testing.T, name, value string, matchType labels.Match
matcher, _ := labels.NewMatcher(matchType, name, value)
return matcher
}

// fakeAlerts is a struct implementing the provider.Alerts interface for tests.
type fakeAlerts struct {
fps map[model.Fingerprint]int
alerts []*types.Alert
err error
}

func newFakeAlerts(alerts []*types.Alert) *fakeAlerts {
fps := make(map[model.Fingerprint]int)
for i, a := range alerts {
fps[a.Fingerprint()] = i
}
f := &fakeAlerts{
alerts: alerts,
fps: fps,
}
return f
}

func (f *fakeAlerts) Subscribe() provider.AlertIterator { return nil }
func (f *fakeAlerts) Get(model.Fingerprint) (*types.Alert, error) { return nil, nil }
func (f *fakeAlerts) Put(alerts ...*types.Alert) error {
return f.err
}

func (f *fakeAlerts) GetPending() provider.AlertIterator {
ch := make(chan *types.Alert)
done := make(chan struct{})
go func() {
defer close(ch)
for _, a := range f.alerts {
ch <- a
}
}()
return provider.NewAlertIterator(ch, done, f.err)
}
Loading

0 comments on commit 5322f20

Please sign in to comment.