From 9a61e38aee1298343b73d36a269e6117bdeda567 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20Leszko?= Date: Tue, 23 Jul 2024 13:45:23 +0200 Subject: [PATCH] Revert "Change Serf communication from local to http calls (#1330)" This reverts commit fcff1417142c5e2f80b3ef0fbde3baf711cd3905. --- api/http.go | 9 +-- api/http_internal.go | 14 ++--- cluster/cluster.go | 27 +++----- config/cli.go | 1 - handlers/events.go | 78 ++---------------------- handlers/events_test.go | 65 +------------------- handlers/geolocation/geolocation.go | 25 ++------ handlers/geolocation/geolocation_test.go | 24 +++----- main.go | 69 ++++++++++++++------- mapic/mistapiconnector_app.go | 2 - 10 files changed, 86 insertions(+), 228 deletions(-) diff --git a/api/http.go b/api/http.go index 4631a3e36..ede34709a 100644 --- a/api/http.go +++ b/api/http.go @@ -8,6 +8,7 @@ import ( "github.com/golang/glog" "github.com/julienschmidt/httprouter" "github.com/livepeer/catalyst-api/balancer" + "github.com/livepeer/catalyst-api/cluster" "github.com/livepeer/catalyst-api/config" "github.com/livepeer/catalyst-api/handlers" "github.com/livepeer/catalyst-api/handlers/analytics" @@ -20,8 +21,8 @@ import ( "github.com/livepeer/go-api-client" ) -func ListenAndServe(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coordinator, bal balancer.Balancer, mapic mistapiconnector.IMac) error { - router := NewCatalystAPIRouter(cli, vodEngine, bal, mapic) +func ListenAndServe(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coordinator, bal balancer.Balancer, c cluster.Cluster, mapic mistapiconnector.IMac) error { + router := NewCatalystAPIRouter(cli, vodEngine, bal, c, mapic) server := http.Server{Addr: cli.HTTPAddress, Handler: router} ctx, cancel := context.WithCancel(ctx) @@ -47,7 +48,7 @@ func ListenAndServe(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coo return server.Shutdown(ctx) } -func NewCatalystAPIRouter(cli config.Cli, vodEngine *pipeline.Coordinator, bal balancer.Balancer, mapic mistapiconnector.IMac) *httprouter.Router { +func NewCatalystAPIRouter(cli config.Cli, vodEngine *pipeline.Coordinator, bal balancer.Balancer, c cluster.Cluster, mapic mistapiconnector.IMac) *httprouter.Router { router := httprouter.New() withLogging := middleware.LogRequest() withCORS := middleware.AllowCORS() @@ -58,7 +59,7 @@ func NewCatalystAPIRouter(cli config.Cli, vodEngine *pipeline.Coordinator, bal b AccessToken: cli.APIToken, }) catalystApiHandlers := &handlers.CatalystAPIHandlersCollection{VODEngine: vodEngine} - geoHandlers := geolocation.NewGeolocationHandlersCollection(bal, cli, lapi) + geoHandlers := geolocation.NewGeolocationHandlersCollection(bal, c, cli, lapi) router.GET("/ok", withLogging(catalystApiHandlers.Ok())) router.GET("/healthcheck", withLogging(catalystApiHandlers.Healthcheck())) diff --git a/api/http_internal.go b/api/http_internal.go index db6c62ccc..e975c3474 100644 --- a/api/http_internal.go +++ b/api/http_internal.go @@ -68,12 +68,12 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato Server: cli.APIServer, AccessToken: cli.APIToken, }) - geoHandlers := geolocation.NewGeolocationHandlersCollection(bal, cli, lapi) + geoHandlers := geolocation.NewGeolocationHandlersCollection(bal, c, cli, lapi) spkiPublicKey, _ := crypto.ConvertToSpki(cli.VodDecryptPublicKey) catalystApiHandlers := &handlers.CatalystAPIHandlersCollection{VODEngine: vodEngine} - eventsHandler := handlers.NewEventsHandlersCollection(c, mapic, bal) + eventsHandler := &handlers.EventsHandlersCollection{Cluster: c} ffmpegSegmentingHandlers := &ffmpeg.HandlersCollection{VODEngine: vodEngine} accessControlHandlers := accesscontrol.NewAccessControlHandlersCollection(cli, mapic) analyticsHandlers := analytics.NewAnalyticsHandler(metricsDB) @@ -109,14 +109,9 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato ), ) - // Handler to get members Catalyst API => Catalyst - router.GET("/api/serf/members", withLogging(adminHandlers.MembersHandler())) - // Public handler to propagate an event to all Catalyst nodes, execute from Studio API => Catalyst + // Public handler to propagate an event to all Catalyst nodes router.POST("/api/events", withLogging(eventsHandler.Events())) - // Handler to forward the user event from Catalyst => Catalyst API - router.POST("/api/serf/receiveUserEvent", withLogging(eventsHandler.ReceiveUserEvent())) - // Public GET handler to retrieve the public key for vod encryption router.GET("/api/pubkey", withLogging(encryptionHandlers.PublicKeyHandler())) @@ -135,6 +130,9 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato // Endpoint to receive segments and manifests that ffmpeg produces router.POST("/api/ffmpeg/:id/:filename", withLogging(ffmpegSegmentingHandlers.NewFile())) + // Temporary endpoint for admin queries + router.GET("/admin/members", withLogging(adminHandlers.MembersHandler())) + return router } diff --git a/cluster/cluster.go b/cluster/cluster.go index e83173383..f9d83dc68 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -182,25 +182,10 @@ func (c *ClusterImpl) retryJoin(ctx context.Context) { } func (c *ClusterImpl) MembersFiltered(filter map[string]string, status, name string) ([]Member, error) { - return FilterMembers(toClusterMembers(c.serf.Members()), filter, status, name) -} - -func toClusterMembers(members []serf.Member) []Member { - var nodes []Member - for _, member := range members { - nodes = append(nodes, Member{ - Name: member.Name, - Tags: member.Tags, - Status: member.Status.String(), - }) - } - return nodes -} - -func FilterMembers(all []Member, filter map[string]string, status string, name string) ([]Member, error) { - var nodes []Member + all := c.serf.Members() + nodes := []Member{} for _, member := range all { - if status != "" && status != member.Status { + if status != "" && status != member.Status.String() { continue } if name != "" && name != member.Name { @@ -215,7 +200,11 @@ func FilterMembers(all []Member, filter map[string]string, status string, name s } } if matches { - nodes = append(nodes, member) + nodes = append(nodes, Member{ + Name: member.Name, + Tags: member.Tags, + Status: member.Status.String(), + }) } } return nodes, nil diff --git a/config/cli.go b/config/cli.go index 418812fea..a968ce80f 100644 --- a/config/cli.go +++ b/config/cli.go @@ -71,7 +71,6 @@ type Cli struct { KafkaUser string KafkaPassword string AnalyticsKafkaTopic string - SerfMembersEndpoint string // mapping playbackId to value between 0.0 to 100.0 CdnRedirectPlaybackPct map[string]float64 diff --git a/handlers/events.go b/handlers/events.go index a7d78abd3..7efac8eb2 100644 --- a/handlers/events.go +++ b/handlers/events.go @@ -3,43 +3,25 @@ package handlers import ( "encoding/json" "fmt" - "github.com/golang/glog" "github.com/hashicorp/serf/serf" "github.com/julienschmidt/httprouter" - "github.com/livepeer/catalyst-api/balancer" "github.com/livepeer/catalyst-api/cluster" "github.com/livepeer/catalyst-api/errors" - "github.com/livepeer/catalyst-api/events" - mistapiconnector "github.com/livepeer/catalyst-api/mapic" "github.com/xeipuuv/gojsonschema" "io" "net/http" - "strings" ) type EventsHandlersCollection struct { - cluster cluster.Cluster - - mapic mistapiconnector.IMac - bal balancer.Balancer -} - -type Event struct { - Resource string `json:"resource"` - PlaybackID string `json:"playback_id"` + Cluster cluster.Cluster } -func NewEventsHandlersCollection(cluster cluster.Cluster, mapic mistapiconnector.IMac, bal balancer.Balancer) *EventsHandlersCollection { - return &EventsHandlersCollection{ - cluster: cluster, - mapic: mapic, - bal: bal, +func (d *EventsHandlersCollection) Events() httprouter.Handle { + type Event struct { + Resource string `json:"resource"` + PlaybackID string `json:"playback_id"` } -} -// Events is a handler called by Studio API to send an event, e.g., to refresh a stream or nuke a stream. -// This event is then propagated to all Serf nodes and then forwarded to catalyst-api and handled by ReceiveUserEvent(). -func (d *EventsHandlersCollection) Events() httprouter.Handle { schema := inputSchemasCompiled["Event"] return func(w http.ResponseWriter, req *http.Request, _ httprouter.Params) { payload, err := io.ReadAll(req.Body) @@ -62,7 +44,7 @@ func (d *EventsHandlersCollection) Events() httprouter.Handle { return } - err = d.cluster.BroadcastEvent(serf.UserEvent{ + err = d.Cluster.BroadcastEvent(serf.UserEvent{ Name: fmt.Sprintf("%s-%s", event.Resource, event.PlaybackID), Payload: payload, Coalesce: true, @@ -74,51 +56,3 @@ func (d *EventsHandlersCollection) Events() httprouter.Handle { } } } - -// ReceiveUserEvent is a handler to receive Serf events from Catalyst. -// The idea is that: -// 1. Studio API sends an event to Catalyst (received by Events() handler) -// 2. Events() handler propagates the event to all Serf nodes -// 3. Each Serf node sends tne event to its corresponding catalyst-api instance (to the ReceiveUserEvent() handler) -// 4. ReceiveUserEvent() handler processes the event -func (c *EventsHandlersCollection) ReceiveUserEvent() httprouter.Handle { - return func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { - userEventPayload, err := io.ReadAll(r.Body) - if err != nil { - glog.Errorf("cannot read payload: %s", err) - return - } - e, err := events.Unmarshal(userEventPayload) - if err != nil { - glog.Errorf("cannot unmarshal received serf event %v: %s", userEventPayload, err) - return - } - switch event := e.(type) { - case *events.StreamEvent: - glog.V(5).Infof("received serf StreamEvent: %v", event.PlaybackID) - c.mapic.RefreshStreamIfNeeded(event.PlaybackID) - case *events.NukeEvent: - glog.V(5).Infof("received serf NukeEvent: %v", event.PlaybackID) - c.mapic.NukeStream(event.PlaybackID) - return - case *events.StopSessionsEvent: - glog.V(5).Infof("received serf StopSessionsEvent: %v", event.PlaybackID) - c.mapic.StopSessions(event.PlaybackID) - return - case *events.NodeUpdateEvent: - if glog.V(5) { - glog.Infof("received serf NodeUpdateEvent. Node: %s. Length: %d bytes. Ingest Streams: %v. Non-Ingest Streams: %v", event.NodeID, len(userEventPayload), strings.Join(event.GetIngestStreams(), ","), strings.Join(event.GetStreams(), ",")) - } - - c.bal.UpdateNodes(event.NodeID, event.NodeMetrics) - for _, stream := range event.GetStreams() { - c.bal.UpdateStreams(event.NodeID, stream, false) - } - for _, stream := range event.GetIngestStreams() { - c.bal.UpdateStreams(event.NodeID, stream, true) - } - default: - glog.Errorf("unsupported serf event: %v", e) - } - } -} diff --git a/handlers/events_test.go b/handlers/events_test.go index 57e31f519..797bc0f3a 100644 --- a/handlers/events_test.go +++ b/handlers/events_test.go @@ -5,7 +5,6 @@ import ( "github.com/hashicorp/serf/serf" "github.com/julienschmidt/httprouter" mockcluster "github.com/livepeer/catalyst-api/mocks/cluster" - mock_mistapiconnector "github.com/livepeer/catalyst-api/mocks/mistapiconnector" "github.com/stretchr/testify/require" "net/http" "net/http/httptest" @@ -63,7 +62,7 @@ func TestEventHandler(t *testing.T) { return nil }).AnyTimes() - catalystApiHandlers := NewEventsHandlersCollection(mc, nil, nil) + catalystApiHandlers := EventsHandlersCollection{Cluster: mc} router := httprouter.New() router.POST("/events", catalystApiHandlers.Events()) @@ -75,65 +74,3 @@ func TestEventHandler(t *testing.T) { require.Equal(rr.Result().StatusCode, tt.wantHttpCode) } } - -func TestReceiveUserEventHandler(t *testing.T) { - require := require.New(t) - playbackId := "123456789" - - tests := []struct { - name string - requestBody string - functionCalled string - }{ - { - name: "Refresh Stream", - requestBody: `{ - "resource": "stream", - "playback_id": "123456789" - }`, - functionCalled: "RefreshStreamIfNeeded", - }, - { - name: "Nuke Stream", - requestBody: `{ - "resource": "nuke", - "playback_id": "123456789" - }`, - functionCalled: "NukeStream", - }, - { - name: "Stop Sessions", - requestBody: `{ - "resource": "stopSessions", - "playback_id": "123456789" - }`, - functionCalled: "StopSessions", - }, - } - - ctrl := gomock.NewController(t) - mac := mock_mistapiconnector.NewMockIMac(ctrl) - - catalystApiHandlers := NewEventsHandlersCollection(nil, mac, nil) - router := httprouter.New() - router.POST("/receiveUserEvent", catalystApiHandlers.ReceiveUserEvent()) - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - switch tt.functionCalled { - case "RefreshStreamIfNeeded": - mac.EXPECT().RefreshStreamIfNeeded(playbackId).Times(1) - case "NukeStream": - mac.EXPECT().NukeStream(playbackId).Times(1) - case "StopSessions": - mac.EXPECT().StopSessions(playbackId).Times(1) - } - - req, _ := http.NewRequest("POST", "/receiveUserEvent", strings.NewReader(tt.requestBody)) - rr := httptest.NewRecorder() - router.ServeHTTP(rr, req) - - require.Equal(rr.Result().StatusCode, 200) - }) - } -} diff --git a/handlers/geolocation/geolocation.go b/handlers/geolocation/geolocation.go index fcbba6957..b6ce64f4d 100644 --- a/handlers/geolocation/geolocation.go +++ b/handlers/geolocation/geolocation.go @@ -67,14 +67,16 @@ func (l *streamPullRateLimit) mark(playbackID string) { type GeolocationHandlersCollection struct { Balancer balancer.Balancer + Cluster cluster.Cluster Config config.Cli Lapi *api.Client streamPullRateLimit *streamPullRateLimit } -func NewGeolocationHandlersCollection(balancer balancer.Balancer, config config.Cli, lapi *api.Client) *GeolocationHandlersCollection { +func NewGeolocationHandlersCollection(balancer balancer.Balancer, cluster cluster.Cluster, config config.Cli, lapi *api.Client) *GeolocationHandlersCollection { return &GeolocationHandlersCollection{ Balancer: balancer, + Cluster: cluster, Config: config, Lapi: lapi, streamPullRateLimit: newStreamPullRateLimit(streamSourceRetryInterval), @@ -242,7 +244,7 @@ func (c *GeolocationHandlersCollection) resolveNodeURL(streamURL string) (string } func (c *GeolocationHandlersCollection) clusterMember(filter map[string]string, status, name string) (cluster.Member, error) { - members, err := c.membersFiltered(filter, "", name) + members, err := c.Cluster.MembersFiltered(filter, "", name) if err != nil { return cluster.Member{}, err } @@ -402,7 +404,7 @@ func (c *GeolocationHandlersCollection) getStreamPull(playbackID string, retryCo } func (c *GeolocationHandlersCollection) sendPlaybackRequestAsync(playbackID string, region string) { - members, err := c.membersFiltered(map[string]string{"region": region}, "", "") + members, err := c.Cluster.MembersFiltered(map[string]string{"region": region}, "", "") if err != nil || len(members) == 0 { glog.Errorf("Error fetching member list: %v", err) return @@ -420,23 +422,6 @@ func (c *GeolocationHandlersCollection) sendPlaybackRequestAsync(playbackID stri }() } -func (c *GeolocationHandlersCollection) membersFiltered(filter map[string]string, status, name string) ([]cluster.Member, error) { - resp, err := http.Get(c.Config.SerfMembersEndpoint) - if err != nil { - return []cluster.Member{}, err - } - defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - return []cluster.Member{}, fmt.Errorf("failed to get members: %s", resp.Status) - } - var members []cluster.Member - if err := json.NewDecoder(resp.Body).Decode(&members); err != nil { - return []cluster.Member{}, err - } - - return cluster.FilterMembers(members, filter, status, name) -} - func parsePlus(plusString string) (string, string) { slice := strings.Split(plusString, "+") prefix := "" diff --git a/handlers/geolocation/geolocation_test.go b/handlers/geolocation/geolocation_test.go index 94115bdea..da5578d68 100644 --- a/handlers/geolocation/geolocation_test.go +++ b/handlers/geolocation/geolocation_test.go @@ -2,7 +2,6 @@ package geolocation import ( "context" - "encoding/json" "errors" "fmt" "math/rand" @@ -18,6 +17,7 @@ import ( "github.com/livepeer/catalyst-api/config" "github.com/livepeer/catalyst-api/metrics" mockbalancer "github.com/livepeer/catalyst-api/mocks/balancer" + mockcluster "github.com/livepeer/catalyst-api/mocks/cluster" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/require" ) @@ -30,7 +30,7 @@ const ( ) var fakeSerfMember = cluster.Member{ - Name: "someurl.com", + Name: "fake-serf-member", Tags: map[string]string{ "http": fmt.Sprintf("http://%s", closestNodeAddr), "https": fmt.Sprintf("https://%s", closestNodeAddr), @@ -142,6 +142,7 @@ func getHLSURLsWithSeg(proto, host, seg, query string) []string { func mockHandlers(t *testing.T) *GeolocationHandlersCollection { ctrl := gomock.NewController(t) mb := mockbalancer.NewMockBalancer(ctrl) + mc := mockcluster.NewMockCluster(ctrl) mb.EXPECT(). GetBestNode(context.Background(), prefixes[:], playbackID, "", "", "", gomock.Any()). AnyTimes(). @@ -157,23 +158,16 @@ func mockHandlers(t *testing.T) *GeolocationHandlersCollection { AnyTimes(). Return("", "", errors.New("")) - router := httprouter.New() - router.GET("/api/serf/members", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { - w.Header().Set("Content-Type", "application/json") - res, err := json.Marshal([]cluster.Member{fakeSerfMember}) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - w.Write(res) // nolint:errcheck - }) - testServer := httptest.NewServer(router) + mc.EXPECT(). + MembersFiltered(map[string]string{}, gomock.Any(), closestNodeAddr). + AnyTimes(). + Return([]cluster.Member{fakeSerfMember}, nil) coll := GeolocationHandlersCollection{ Balancer: mb, + Cluster: mc, Config: config.Cli{ - RedirectPrefixes: prefixes[:], - SerfMembersEndpoint: fmt.Sprintf("%s/api/serf/members", testServer.URL), + RedirectPrefixes: prefixes[:], }, } return &coll diff --git a/main.go b/main.go index ca30baef9..7a70b0b1f 100644 --- a/main.go +++ b/main.go @@ -1,16 +1,15 @@ package main import ( - "bytes" "context" "crypto/rsa" "database/sql" "flag" "fmt" "log" - "net/http" "os" "os/signal" + "strings" "syscall" "time" @@ -128,7 +127,6 @@ func main() { fs.StringVar(&cli.KafkaUser, "kafka-user", "", "Kafka Username") fs.StringVar(&cli.KafkaPassword, "kafka-password", "", "Kafka Password") fs.StringVar(&cli.AnalyticsKafkaTopic, "analytics-kafka-topic", "", "Kafka Topic used to send analytics logs") - fs.StringVar(&cli.SerfMembersEndpoint, "serf-members-endpoint", "http://127.0.0.1:7979/api/serf/members", "Endpoint to get the current members in the cluster") pprofPort := fs.Int("pprof-port", 6061, "Pprof listen port") fs.String("send-audio", "", "[DEPRECATED] ignored, will be removed") @@ -308,7 +306,7 @@ func main() { }) group.Go(func() error { - return api.ListenAndServe(ctx, cli, vodEngine, bal, mapic) + return api.ListenAndServe(ctx, cli, vodEngine, bal, c, mapic) }) group.Go(func() error { @@ -335,8 +333,7 @@ func main() { }) group.Go(func() error { - serfUserEventCallbackEndpoint := fmt.Sprintf("%s/api/serf/receiveUserEvent", cli.OwnInternalURL()) - return handleClusterEvents(ctx, serfUserEventCallbackEndpoint, c) + return handleClusterEvents(ctx, mapic, bal, c) }) err = group.Wait() @@ -346,49 +343,75 @@ func main() { // Eventually this will be the main loop of the state machine, but we just have one variable right now. func reconcileBalancer(ctx context.Context, bal balancer.Balancer, c cluster.Cluster) error { memberCh := c.MemberChan() + ticker := time.NewTicker(1 * time.Minute) for { + var members []cluster.Member + var err error select { case <-ctx.Done(): return nil - case list := <-memberCh: - err := bal.UpdateMembers(ctx, list) + case <-ticker.C: + members, err = c.MembersFiltered(cluster.MediaFilter, "alive", "") if err != nil { - return fmt.Errorf("failed to update load balancer from member list: %w", err) + glog.Errorf("Error getting serf members: %v", err) + continue } + case members = <-memberCh: + } + err = bal.UpdateMembers(ctx, members) + if err != nil { + glog.Errorf("Failed to update load balancer from member list: %v", err) + continue } } } -func handleClusterEvents(ctx context.Context, callbackEndpoint string, c cluster.Cluster) error { +func handleClusterEvents(ctx context.Context, mapic mistapiconnector.IMac, bal balancer.Balancer, c cluster.Cluster) error { eventCh := c.EventChan() for { select { case <-ctx.Done(): return nil case e := <-eventCh: - processClusterEvent(callbackEndpoint, e) + processClusterEvent(mapic, bal, e) } } } -func processClusterEvent(callbackEndpoint string, userEvent serf.UserEvent) { - client := &http.Client{} - glog.V(5).Infof("received serf user event, propagating to %s, event=%s", callbackEndpoint, userEvent.String()) - +func processClusterEvent(mapic mistapiconnector.IMac, bal balancer.Balancer, userEvent serf.UserEvent) { go func() { - req, err := http.NewRequest("POST", callbackEndpoint, bytes.NewBuffer(userEvent.Payload)) + e, err := events.Unmarshal(userEvent.Payload) if err != nil { - glog.Errorf("error creating request: %v", err) + glog.Errorf("cannot unmarshal received serf event %v: %s", userEvent, err) return } - resp, err := client.Do(req) - if err != nil { - glog.Errorf("error sending request: %v", err) + switch event := e.(type) { + case *events.StreamEvent: + glog.V(5).Infof("received serf StreamEvent: %v", event.PlaybackID) + mapic.RefreshStreamIfNeeded(event.PlaybackID) + case *events.NukeEvent: + glog.V(5).Infof("received serf NukeEvent: %v", event.PlaybackID) + mapic.NukeStream(event.PlaybackID) return - } - defer resp.Body.Close() + case *events.StopSessionsEvent: + glog.V(5).Infof("received serf StopSessionsEvent: %v", event.PlaybackID) + mapic.StopSessions(event.PlaybackID) + return + case *events.NodeUpdateEvent: + if glog.V(5) { + glog.Infof("received serf NodeUpdateEvent. Node: %s. Length: %d bytes. Ingest Streams: %v. Non-Ingest Streams: %v", event.NodeID, len(userEvent.Payload), strings.Join(event.GetIngestStreams(), ","), strings.Join(event.GetStreams(), ",")) + } - glog.V(5).Infof("propagated serf user event to %s, event=%s", callbackEndpoint, userEvent.String()) + bal.UpdateNodes(event.NodeID, event.NodeMetrics) + for _, stream := range event.GetStreams() { + bal.UpdateStreams(event.NodeID, stream, false) + } + for _, stream := range event.GetIngestStreams() { + bal.UpdateStreams(event.NodeID, stream, true) + } + default: + glog.Errorf("unsupported serf event: %v", e) + } }() } diff --git a/mapic/mistapiconnector_app.go b/mapic/mistapiconnector_app.go index ac7cc1857..feec9887b 100644 --- a/mapic/mistapiconnector_app.go +++ b/mapic/mistapiconnector_app.go @@ -1,8 +1,6 @@ //nolint:all package mistapiconnector -//go:generate mockgen -source=./mistapiconnector_app.go -destination=../mocks/mistapiconnector/mistapiconnector_app.go - import ( "context" "errors"