diff --git a/api/http.go b/api/http.go index ede34709a..15992a0ca 100644 --- a/api/http.go +++ b/api/http.go @@ -8,7 +8,6 @@ import ( "github.com/golang/glog" "github.com/julienschmidt/httprouter" "github.com/livepeer/catalyst-api/balancer" - "github.com/livepeer/catalyst-api/cluster" "github.com/livepeer/catalyst-api/config" "github.com/livepeer/catalyst-api/handlers" "github.com/livepeer/catalyst-api/handlers/analytics" @@ -21,8 +20,8 @@ import ( "github.com/livepeer/go-api-client" ) -func ListenAndServe(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coordinator, bal balancer.Balancer, c cluster.Cluster, mapic mistapiconnector.IMac) error { - router := NewCatalystAPIRouter(cli, vodEngine, bal, c, mapic) +func ListenAndServe(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coordinator, bal balancer.Balancer, mapic mistapiconnector.IMac, serfMembersEndpoint string) error { + router := NewCatalystAPIRouter(cli, vodEngine, bal, mapic, serfMembersEndpoint) server := http.Server{Addr: cli.HTTPAddress, Handler: router} ctx, cancel := context.WithCancel(ctx) @@ -48,7 +47,7 @@ func ListenAndServe(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coo return server.Shutdown(ctx) } -func NewCatalystAPIRouter(cli config.Cli, vodEngine *pipeline.Coordinator, bal balancer.Balancer, c cluster.Cluster, mapic mistapiconnector.IMac) *httprouter.Router { +func NewCatalystAPIRouter(cli config.Cli, vodEngine *pipeline.Coordinator, bal balancer.Balancer, mapic mistapiconnector.IMac, serfMembersEndpoint string) *httprouter.Router { router := httprouter.New() withLogging := middleware.LogRequest() withCORS := middleware.AllowCORS() @@ -59,7 +58,7 @@ func NewCatalystAPIRouter(cli config.Cli, vodEngine *pipeline.Coordinator, bal b AccessToken: cli.APIToken, }) catalystApiHandlers := &handlers.CatalystAPIHandlersCollection{VODEngine: vodEngine} - geoHandlers := geolocation.NewGeolocationHandlersCollection(bal, c, cli, lapi) + geoHandlers := geolocation.NewGeolocationHandlersCollection(bal, cli, lapi, serfMembersEndpoint) router.GET("/ok", withLogging(catalystApiHandlers.Ok())) router.GET("/healthcheck", withLogging(catalystApiHandlers.Healthcheck())) diff --git a/api/http_internal.go b/api/http_internal.go index e975c3474..8e5efb468 100644 --- a/api/http_internal.go +++ b/api/http_internal.go @@ -29,8 +29,8 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" ) -func ListenAndServeInternal(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coordinator, mapic mistapiconnector.IMac, bal balancer.Balancer, c cluster.Cluster, broker misttriggers.TriggerBroker, metricsDB *sql.DB) error { - router := NewCatalystAPIRouterInternal(cli, vodEngine, mapic, bal, c, broker, metricsDB) +func ListenAndServeInternal(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coordinator, mapic mistapiconnector.IMac, bal balancer.Balancer, c cluster.Cluster, broker misttriggers.TriggerBroker, metricsDB *sql.DB, serfMembersEndpoint string) error { + router := NewCatalystAPIRouterInternal(cli, vodEngine, mapic, bal, c, broker, metricsDB, serfMembersEndpoint) server := http.Server{Addr: cli.HTTPInternalAddress, Handler: router} ctx, cancel := context.WithCancel(ctx) @@ -56,7 +56,7 @@ func ListenAndServeInternal(ctx context.Context, cli config.Cli, vodEngine *pipe return server.Shutdown(ctx) } -func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinator, mapic mistapiconnector.IMac, bal balancer.Balancer, c cluster.Cluster, broker misttriggers.TriggerBroker, metricsDB 
*sql.DB) *httprouter.Router { +func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinator, mapic mistapiconnector.IMac, bal balancer.Balancer, c cluster.Cluster, broker misttriggers.TriggerBroker, metricsDB *sql.DB, serfMembersEndpoint string) *httprouter.Router { router := httprouter.New() withLogging := middleware.LogRequest() withAuth := middleware.IsAuthorized @@ -68,12 +68,12 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato Server: cli.APIServer, AccessToken: cli.APIToken, }) - geoHandlers := geolocation.NewGeolocationHandlersCollection(bal, c, cli, lapi) + geoHandlers := geolocation.NewGeolocationHandlersCollection(bal, cli, lapi, serfMembersEndpoint) spkiPublicKey, _ := crypto.ConvertToSpki(cli.VodDecryptPublicKey) catalystApiHandlers := &handlers.CatalystAPIHandlersCollection{VODEngine: vodEngine} - eventsHandler := &handlers.EventsHandlersCollection{Cluster: c} + eventsHandler := handlers.NewEventsHandlersCollection(c, mapic, bal) ffmpegSegmentingHandlers := &ffmpeg.HandlersCollection{VODEngine: vodEngine} accessControlHandlers := accesscontrol.NewAccessControlHandlersCollection(cli, mapic) analyticsHandlers := analytics.NewAnalyticsHandler(metricsDB) @@ -84,54 +84,62 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato // Simple endpoint for healthchecks router.GET("/ok", withLogging(catalystApiHandlers.Ok())) - var metricsHandlers []http.Handler - if cli.ShouldMapic() { - metricsHandlers = append(metricsHandlers, mapic.MetricsHandler()) - } - if cli.MistPrometheus != "" { - // Enable Mist metrics enrichment - metricsHandlers = append(metricsHandlers, mapic.MistMetricsHandler()) - } - metricsHandlers = append(metricsHandlers, promhttp.Handler()) - // Hacky combined metrics handler. To be refactored away with mapic. - router.GET("/metrics", concatHandlers(metricsHandlers...)) - - // Public Catalyst API - router.POST("/api/vod", - withLogging( - withAuth( - cli.APIToken, - withCapacityChecking( - vodEngine, - catalystApiHandlers.UploadVOD(), + if cli.IsApiMode() { + var metricsHandlers []http.Handler + if cli.ShouldMapic() { + metricsHandlers = append(metricsHandlers, mapic.MetricsHandler()) + } + if cli.MistPrometheus != "" { + // Enable Mist metrics enrichment + metricsHandlers = append(metricsHandlers, mapic.MistMetricsHandler()) + } + metricsHandlers = append(metricsHandlers, promhttp.Handler()) + // Hacky combined metrics handler. To be refactored away with mapic. 
+ router.GET("/metrics", concatHandlers(metricsHandlers...)) + + // Public Catalyst API + router.POST("/api/vod", + withLogging( + withAuth( + cli.APIToken, + withCapacityChecking( + vodEngine, + catalystApiHandlers.UploadVOD(), + ), ), ), - ), - ) + ) - // Public handler to propagate an event to all Catalyst nodes - router.POST("/api/events", withLogging(eventsHandler.Events())) + // Public GET handler to retrieve the public key for vod encryption + router.GET("/api/pubkey", withLogging(encryptionHandlers.PublicKeyHandler())) - // Public GET handler to retrieve the public key for vod encryption - router.GET("/api/pubkey", withLogging(encryptionHandlers.PublicKeyHandler())) + // Endpoint to receive "Triggers" (callbacks) from Mist + router.POST("/api/mist/trigger", withLogging(mistCallbackHandlers.Trigger())) - // Endpoint to receive "Triggers" (callbacks) from Mist - router.POST("/api/mist/trigger", withLogging(mistCallbackHandlers.Trigger())) + // Handler for STREAM_SOURCE triggers + broker.OnStreamSource(geoHandlers.HandleStreamSource) - // Handler for STREAM_SOURCE triggers - broker.OnStreamSource(geoHandlers.HandleStreamSource) + // Handler for USER_NEW triggers + broker.OnUserNew(accessControlHandlers.HandleUserNew) - // Handler for USER_NEW triggers - broker.OnUserNew(accessControlHandlers.HandleUserNew) + // Handler for USER_END triggers. + broker.OnUserEnd(analyticsHandlers.HandleUserEnd) - // Handler for USER_END triggers. - broker.OnUserEnd(analyticsHandlers.HandleUserEnd) + // Endpoint to receive segments and manifests that ffmpeg produces + router.POST("/api/ffmpeg/:id/:filename", withLogging(ffmpegSegmentingHandlers.NewFile())) - // Endpoint to receive segments and manifests that ffmpeg produces - router.POST("/api/ffmpeg/:id/:filename", withLogging(ffmpegSegmentingHandlers.NewFile())) + // Handler to forward the user event from Catalyst => Catalyst API + router.POST("/api/serf/receiveUserEvent", withLogging(eventsHandler.ReceiveUserEvent())) + } - // Temporary endpoint for admin queries - router.GET("/admin/members", withLogging(adminHandlers.MembersHandler())) + if cli.IsClusterMode() { + // Temporary endpoint for admin queries + router.GET("/admin/members", withLogging(adminHandlers.MembersHandler())) + // Handler to get members Catalyst API => Catalyst + router.GET("/api/serf/members", withLogging(adminHandlers.MembersHandler())) + // Public handler to propagate an event to all Catalyst nodes, execute from Studio API => Catalyst + router.POST("/api/events", withLogging(eventsHandler.Events())) + } return router } diff --git a/cluster/cluster.go b/cluster/cluster.go index f9d83dc68..e83173383 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -182,10 +182,25 @@ func (c *ClusterImpl) retryJoin(ctx context.Context) { } func (c *ClusterImpl) MembersFiltered(filter map[string]string, status, name string) ([]Member, error) { - all := c.serf.Members() - nodes := []Member{} + return FilterMembers(toClusterMembers(c.serf.Members()), filter, status, name) +} + +func toClusterMembers(members []serf.Member) []Member { + var nodes []Member + for _, member := range members { + nodes = append(nodes, Member{ + Name: member.Name, + Tags: member.Tags, + Status: member.Status.String(), + }) + } + return nodes +} + +func FilterMembers(all []Member, filter map[string]string, status string, name string) ([]Member, error) { + var nodes []Member for _, member := range all { - if status != "" && status != member.Status.String() { + if status != "" && status != member.Status { continue } 
if name != "" && name != member.Name { @@ -200,11 +215,7 @@ func (c *ClusterImpl) MembersFiltered(filter map[string]string, status, name str } } if matches { - nodes = append(nodes, Member{ - Name: member.Name, - Tags: member.Tags, - Status: member.Status.String(), - }) + nodes = append(nodes, member) } } return nodes, nil diff --git a/config/cli.go b/config/cli.go index a968ce80f..4243282e9 100644 --- a/config/cli.go +++ b/config/cli.go @@ -25,6 +25,7 @@ type Cli struct { MistUser string MistPassword string MistPrometheus string + Mode string MistPort int MistConnectTimeout time.Duration MistStreamSource string @@ -71,6 +72,8 @@ type Cli struct { KafkaUser string KafkaPassword string AnalyticsKafkaTopic string + SerfMembersEndpoint string + CatalystApiURL string // mapping playbackId to value between 0.0 to 100.0 CdnRedirectPlaybackPct map[string]float64 @@ -110,6 +113,14 @@ func (cli *Cli) ShouldMapic() bool { return cli.APIServer != "" } +func (cli *Cli) IsClusterMode() bool { + return cli.Mode == "cluster-only" || cli.Mode == "all" +} + +func (cli *Cli) IsApiMode() bool { + return cli.Mode == "api-only" || cli.Mode == "all" +} + // Should we enable mist-cleanup script to run periodically and delete leaky shm? func (cli *Cli) ShouldMistCleanup() bool { return cli.MistCleanup diff --git a/handlers/events.go b/handlers/events.go index 7efac8eb2..a7d78abd3 100644 --- a/handlers/events.go +++ b/handlers/events.go @@ -3,25 +3,43 @@ package handlers import ( "encoding/json" "fmt" + "github.com/golang/glog" "github.com/hashicorp/serf/serf" "github.com/julienschmidt/httprouter" + "github.com/livepeer/catalyst-api/balancer" "github.com/livepeer/catalyst-api/cluster" "github.com/livepeer/catalyst-api/errors" + "github.com/livepeer/catalyst-api/events" + mistapiconnector "github.com/livepeer/catalyst-api/mapic" "github.com/xeipuuv/gojsonschema" "io" "net/http" + "strings" ) type EventsHandlersCollection struct { - Cluster cluster.Cluster + cluster cluster.Cluster + + mapic mistapiconnector.IMac + bal balancer.Balancer } -func (d *EventsHandlersCollection) Events() httprouter.Handle { - type Event struct { - Resource string `json:"resource"` - PlaybackID string `json:"playback_id"` +type Event struct { + Resource string `json:"resource"` + PlaybackID string `json:"playback_id"` +} + +func NewEventsHandlersCollection(cluster cluster.Cluster, mapic mistapiconnector.IMac, bal balancer.Balancer) *EventsHandlersCollection { + return &EventsHandlersCollection{ + cluster: cluster, + mapic: mapic, + bal: bal, } +} +// Events is a handler called by Studio API to send an event, e.g., to refresh a stream or nuke a stream. +// This event is then propagated to all Serf nodes and then forwarded to catalyst-api and handled by ReceiveUserEvent(). +func (d *EventsHandlersCollection) Events() httprouter.Handle { schema := inputSchemasCompiled["Event"] return func(w http.ResponseWriter, req *http.Request, _ httprouter.Params) { payload, err := io.ReadAll(req.Body) @@ -44,7 +62,7 @@ func (d *EventsHandlersCollection) Events() httprouter.Handle { return } - err = d.Cluster.BroadcastEvent(serf.UserEvent{ + err = d.cluster.BroadcastEvent(serf.UserEvent{ Name: fmt.Sprintf("%s-%s", event.Resource, event.PlaybackID), Payload: payload, Coalesce: true, @@ -56,3 +74,51 @@ func (d *EventsHandlersCollection) Events() httprouter.Handle { } } } + +// ReceiveUserEvent is a handler to receive Serf events from Catalyst. +// The idea is that: +// 1. Studio API sends an event to Catalyst (received by Events() handler) +// 2. 
Events() handler propagates the event to all Serf nodes +// 3. Each Serf node sends tne event to its corresponding catalyst-api instance (to the ReceiveUserEvent() handler) +// 4. ReceiveUserEvent() handler processes the event +func (c *EventsHandlersCollection) ReceiveUserEvent() httprouter.Handle { + return func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { + userEventPayload, err := io.ReadAll(r.Body) + if err != nil { + glog.Errorf("cannot read payload: %s", err) + return + } + e, err := events.Unmarshal(userEventPayload) + if err != nil { + glog.Errorf("cannot unmarshal received serf event %v: %s", userEventPayload, err) + return + } + switch event := e.(type) { + case *events.StreamEvent: + glog.V(5).Infof("received serf StreamEvent: %v", event.PlaybackID) + c.mapic.RefreshStreamIfNeeded(event.PlaybackID) + case *events.NukeEvent: + glog.V(5).Infof("received serf NukeEvent: %v", event.PlaybackID) + c.mapic.NukeStream(event.PlaybackID) + return + case *events.StopSessionsEvent: + glog.V(5).Infof("received serf StopSessionsEvent: %v", event.PlaybackID) + c.mapic.StopSessions(event.PlaybackID) + return + case *events.NodeUpdateEvent: + if glog.V(5) { + glog.Infof("received serf NodeUpdateEvent. Node: %s. Length: %d bytes. Ingest Streams: %v. Non-Ingest Streams: %v", event.NodeID, len(userEventPayload), strings.Join(event.GetIngestStreams(), ","), strings.Join(event.GetStreams(), ",")) + } + + c.bal.UpdateNodes(event.NodeID, event.NodeMetrics) + for _, stream := range event.GetStreams() { + c.bal.UpdateStreams(event.NodeID, stream, false) + } + for _, stream := range event.GetIngestStreams() { + c.bal.UpdateStreams(event.NodeID, stream, true) + } + default: + glog.Errorf("unsupported serf event: %v", e) + } + } +} diff --git a/handlers/events_test.go b/handlers/events_test.go index 797bc0f3a..57e31f519 100644 --- a/handlers/events_test.go +++ b/handlers/events_test.go @@ -5,6 +5,7 @@ import ( "github.com/hashicorp/serf/serf" "github.com/julienschmidt/httprouter" mockcluster "github.com/livepeer/catalyst-api/mocks/cluster" + mock_mistapiconnector "github.com/livepeer/catalyst-api/mocks/mistapiconnector" "github.com/stretchr/testify/require" "net/http" "net/http/httptest" @@ -62,7 +63,7 @@ func TestEventHandler(t *testing.T) { return nil }).AnyTimes() - catalystApiHandlers := EventsHandlersCollection{Cluster: mc} + catalystApiHandlers := NewEventsHandlersCollection(mc, nil, nil) router := httprouter.New() router.POST("/events", catalystApiHandlers.Events()) @@ -74,3 +75,65 @@ func TestEventHandler(t *testing.T) { require.Equal(rr.Result().StatusCode, tt.wantHttpCode) } } + +func TestReceiveUserEventHandler(t *testing.T) { + require := require.New(t) + playbackId := "123456789" + + tests := []struct { + name string + requestBody string + functionCalled string + }{ + { + name: "Refresh Stream", + requestBody: `{ + "resource": "stream", + "playback_id": "123456789" + }`, + functionCalled: "RefreshStreamIfNeeded", + }, + { + name: "Nuke Stream", + requestBody: `{ + "resource": "nuke", + "playback_id": "123456789" + }`, + functionCalled: "NukeStream", + }, + { + name: "Stop Sessions", + requestBody: `{ + "resource": "stopSessions", + "playback_id": "123456789" + }`, + functionCalled: "StopSessions", + }, + } + + ctrl := gomock.NewController(t) + mac := mock_mistapiconnector.NewMockIMac(ctrl) + + catalystApiHandlers := NewEventsHandlersCollection(nil, mac, nil) + router := httprouter.New() + router.POST("/receiveUserEvent", catalystApiHandlers.ReceiveUserEvent()) + + for _, tt 
:= range tests { + t.Run(tt.name, func(t *testing.T) { + switch tt.functionCalled { + case "RefreshStreamIfNeeded": + mac.EXPECT().RefreshStreamIfNeeded(playbackId).Times(1) + case "NukeStream": + mac.EXPECT().NukeStream(playbackId).Times(1) + case "StopSessions": + mac.EXPECT().StopSessions(playbackId).Times(1) + } + + req, _ := http.NewRequest("POST", "/receiveUserEvent", strings.NewReader(tt.requestBody)) + rr := httptest.NewRecorder() + router.ServeHTTP(rr, req) + + require.Equal(rr.Result().StatusCode, 200) + }) + } +} diff --git a/handlers/geolocation/geolocation.go b/handlers/geolocation/geolocation.go index b6ce64f4d..0e5484b79 100644 --- a/handlers/geolocation/geolocation.go +++ b/handlers/geolocation/geolocation.go @@ -67,19 +67,19 @@ func (l *streamPullRateLimit) mark(playbackID string) { type GeolocationHandlersCollection struct { Balancer balancer.Balancer - Cluster cluster.Cluster Config config.Cli Lapi *api.Client streamPullRateLimit *streamPullRateLimit + serfMembersEndpoint string } -func NewGeolocationHandlersCollection(balancer balancer.Balancer, cluster cluster.Cluster, config config.Cli, lapi *api.Client) *GeolocationHandlersCollection { +func NewGeolocationHandlersCollection(balancer balancer.Balancer, config config.Cli, lapi *api.Client, serfMembersEndpoint string) *GeolocationHandlersCollection { return &GeolocationHandlersCollection{ Balancer: balancer, - Cluster: cluster, Config: config, Lapi: lapi, streamPullRateLimit: newStreamPullRateLimit(streamSourceRetryInterval), + serfMembersEndpoint: serfMembersEndpoint, } } @@ -244,7 +244,7 @@ func (c *GeolocationHandlersCollection) resolveNodeURL(streamURL string) (string } func (c *GeolocationHandlersCollection) clusterMember(filter map[string]string, status, name string) (cluster.Member, error) { - members, err := c.Cluster.MembersFiltered(filter, "", name) + members, err := c.membersFiltered(filter, "", name) if err != nil { return cluster.Member{}, err } @@ -404,7 +404,7 @@ func (c *GeolocationHandlersCollection) getStreamPull(playbackID string, retryCo } func (c *GeolocationHandlersCollection) sendPlaybackRequestAsync(playbackID string, region string) { - members, err := c.Cluster.MembersFiltered(map[string]string{"region": region}, "", "") + members, err := c.membersFiltered(map[string]string{"region": region}, "", "") if err != nil || len(members) == 0 { glog.Errorf("Error fetching member list: %v", err) return @@ -422,6 +422,25 @@ func (c *GeolocationHandlersCollection) sendPlaybackRequestAsync(playbackID stri }() } +func (c *GeolocationHandlersCollection) membersFiltered(filter map[string]string, status, name string) ([]cluster.Member, error) { + resp, err := http.Get(c.serfMembersEndpoint) + if err != nil { + return []cluster.Member{}, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return []cluster.Member{}, fmt.Errorf("failed to get members: %s", resp.Status) + } + var members []cluster.Member + if err := json.NewDecoder(resp.Body).Decode(&members); err != nil { + return []cluster.Member{}, err + } + + res, err := cluster.FilterMembers(members, filter, status, name) + return res, err +} + func parsePlus(plusString string) (string, string) { slice := strings.Split(plusString, "+") prefix := "" diff --git a/handlers/geolocation/geolocation_test.go b/handlers/geolocation/geolocation_test.go index da5578d68..52d789699 100644 --- a/handlers/geolocation/geolocation_test.go +++ b/handlers/geolocation/geolocation_test.go @@ -2,6 +2,7 @@ package geolocation import ( "context" + 
"encoding/json" "errors" "fmt" "math/rand" @@ -17,7 +18,6 @@ import ( "github.com/livepeer/catalyst-api/config" "github.com/livepeer/catalyst-api/metrics" mockbalancer "github.com/livepeer/catalyst-api/mocks/balancer" - mockcluster "github.com/livepeer/catalyst-api/mocks/cluster" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/require" ) @@ -30,7 +30,7 @@ const ( ) var fakeSerfMember = cluster.Member{ - Name: "fake-serf-member", + Name: "someurl.com", Tags: map[string]string{ "http": fmt.Sprintf("http://%s", closestNodeAddr), "https": fmt.Sprintf("https://%s", closestNodeAddr), @@ -142,7 +142,6 @@ func getHLSURLsWithSeg(proto, host, seg, query string) []string { func mockHandlers(t *testing.T) *GeolocationHandlersCollection { ctrl := gomock.NewController(t) mb := mockbalancer.NewMockBalancer(ctrl) - mc := mockcluster.NewMockCluster(ctrl) mb.EXPECT(). GetBestNode(context.Background(), prefixes[:], playbackID, "", "", "", gomock.Any()). AnyTimes(). @@ -158,14 +157,21 @@ func mockHandlers(t *testing.T) *GeolocationHandlersCollection { AnyTimes(). Return("", "", errors.New("")) - mc.EXPECT(). - MembersFiltered(map[string]string{}, gomock.Any(), closestNodeAddr). - AnyTimes(). - Return([]cluster.Member{fakeSerfMember}, nil) + router := httprouter.New() + router.GET("/api/serf/members", func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { + w.Header().Set("Content-Type", "application/json") + res, err := json.Marshal([]cluster.Member{fakeSerfMember}) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + w.Write(res) // nolint:errcheck + }) + testServer := httptest.NewServer(router) coll := GeolocationHandlersCollection{ - Balancer: mb, - Cluster: mc, + Balancer: mb, + serfMembersEndpoint: fmt.Sprintf("%s/api/serf/members", testServer.URL), Config: config.Cli{ RedirectPrefixes: prefixes[:], }, diff --git a/main.go b/main.go index 7a70b0b1f..be55d6cfa 100644 --- a/main.go +++ b/main.go @@ -1,15 +1,16 @@ package main import ( + "bytes" "context" "crypto/rsa" "database/sql" "flag" "fmt" "log" + "net/http" "os" "os/signal" - "strings" "syscall" "time" @@ -49,6 +50,8 @@ func main() { version := fs.Bool("version", false, "print application version") + fs.StringVar(&cli.Mode, "mode", "all", "Mode to run the application in. 
Options: all, cluster-only, api-only") + // listen addresses config.AddrFlag(fs, &cli.HTTPAddress, "http-addr", "0.0.0.0:8989", "Address to bind for external-facing Catalyst HTTP handling") config.AddrFlag(fs, &cli.HTTPInternalAddress, "http-internal-addr", "127.0.0.1:7979", "Address to bind for internal privileged HTTP commands") @@ -127,6 +130,8 @@ func main() { fs.StringVar(&cli.KafkaUser, "kafka-user", "", "Kafka Username") fs.StringVar(&cli.KafkaPassword, "kafka-password", "", "Kafka Password") fs.StringVar(&cli.AnalyticsKafkaTopic, "analytics-kafka-topic", "", "Kafka Topic used to send analytics logs") + fs.StringVar(&cli.SerfMembersEndpoint, "serf-members-endpoint", "", "Endpoint to get the current members in the cluster") + fs.StringVar(&cli.CatalystApiURL, "catalyst-api-url", "", "Endpoint for externally deployed catalyst-api; if not set, use local catalyst-api") pprofPort := fs.Int("pprof-port", 6061, "Pprof listen port") fs.String("send-audio", "", "[DEPRECATED] ignored, will be removed") @@ -174,166 +179,177 @@ func main() { return } - // TODO: I don't love the global variables for these - config.ImportIPFSGatewayURLs = cli.ImportIPFSGatewayURLs - config.ImportArweaveGatewayURLs = cli.ImportArweaveGatewayURLs - config.HTTPInternalAddress = cli.HTTPInternalAddress - var ( metricsDB *sql.DB + vodEngine *pipeline.Coordinator + mapic mistapiconnector.IMac + bal balancer.Balancer + broker misttriggers.TriggerBroker + mist clients.MistAPIClient + c cluster.Cluster ) - // Kick off the callback client, to send job update messages on a regular interval - headers := map[string]string{"Authorization": fmt.Sprintf("Bearer %s", cli.APIToken)} - statusClient := clients.NewPeriodicCallbackClient(15*time.Second, headers).Start() + // Initialize root context; cancelling this prompts all components to shut down cleanly + group, ctx := errgroup.WithContext(context.Background()) + mistBalancerConfig := &balancer.Config{ + Args: cli.BalancerArgs, + MistUtilLoadPort: uint32(cli.MistLoadBalancerPort), + MistLoadBalancerTemplate: cli.MistLoadBalancerTemplate, + MistHost: cli.MistHost, + MistPort: cli.MistPort, + NodeName: cli.NodeName, + OwnRegion: cli.OwnRegion, + OwnRegionTagAdjust: cli.OwnRegionTagAdjust, + } + broker = misttriggers.NewTriggerBroker() - // Emit high-cardinality metrics to a Postrgres database if configured - if cli.MetricsDBConnectionString != "" { - metricsDB, err = sql.Open("postgres", cli.MetricsDBConnectionString) - if err != nil { - glog.Fatalf("Error creating postgres metrics connection: %v", err) - } + catalystApiURL := cli.CatalystApiURL + if catalystApiURL == "" { + catalystApiURL = cli.OwnInternalURL() + } + serfMembersEndpoint := cli.SerfMembersEndpoint + if serfMembersEndpoint == "" { + serfMembersEndpoint = cli.OwnInternalURL() + "/api/serf/members" + } - // Without this, we've run into issues with exceeding our open connection limit - metricsDB.SetMaxOpenConns(2) - metricsDB.SetMaxIdleConns(2) - metricsDB.SetConnMaxLifetime(time.Hour) - } else { - glog.Info("Postgres metrics connection string was not set, postgres metrics are disabled.") + if cli.MistEnabled { + mist = clients.NewMistAPIClient(cli.MistUser, cli.MistPassword, cli.MistHost, cli.MistPort) } - var vodDecryptPrivateKey *rsa.PrivateKey + if cli.IsClusterMode() { + c = cluster.NewCluster(&cli) + group.Go(func() error { + return c.Start(ctx) + }) - if cli.VodDecryptPrivateKey != "" && cli.VodDecryptPublicKey != "" { - vodDecryptPrivateKey, err = crypto.LoadPrivateKey(cli.VodDecryptPrivateKey) - if err != nil 
{ - glog.Fatalf("Error loading vod decrypt private key: %v", err) - } - isValidKeyPair, err := crypto.ValidateKeyPair(cli.VodDecryptPublicKey, *vodDecryptPrivateKey) - if !isValidKeyPair || err != nil { - glog.Fatalf("Invalid vod decrypt key pair") + group.Go(func() error { + serfUserEventCallbackEndpoint := fmt.Sprintf("%s/api/serf/receiveUserEvent", catalystApiURL) + return handleClusterEvents(ctx, serfUserEventCallbackEndpoint, c) + }) + + bal = mist_balancer.NewLocalBalancer(mistBalancerConfig) + group.Go(func() error { + return bal.Start(ctx) + }) + group.Go(func() error { + return reconcileBalancer(ctx, bal, c) + }) + } else { + bal = mist_balancer.NewRemoteBalancer(mistBalancerConfig) + if balancer.CombinedBalancerEnabled(cli.CataBalancer) { + cataBalancer := catabalancer.NewBalancer(cli.NodeName, cli.CataBalancerMetricTimeout, cli.CataBalancerIngestStreamTimeout) + // Temporary combined balancer to test cataBalancer logic alongside existing mist balancer + bal = balancer.NewCombinedBalancer(cataBalancer, bal, cli.CataBalancer) + + if cli.Tags["node"] == "media" { // don't announce load balancing availability for testing nodes + events.StartMetricSending(cli.NodeName, cli.NodeLatitude, cli.NodeLongitude, c, mist) + } } } - c2, err := createC2PA(&cli) - if err != nil { - // Log warning, but still start without C2PA signing - glog.Warning(err) - } - // Start the "co-ordinator" that determines whether to send jobs to the Catalyst transcoding pipeline - // or an external one - vodEngine, err := pipeline.NewCoordinator(pipeline.Strategy(cli.VodPipelineStrategy), cli.SourceOutput, cli.ExternalTranscoder, statusClient, metricsDB, vodDecryptPrivateKey, cli.BroadcasterURL, cli.SourcePlaybackHosts, c2) - if err != nil { - glog.Fatalf("Error creating VOD pipeline coordinator: %v", err) - } + if cli.IsApiMode() { + // TODO: I don't love the global variables for these + config.ImportIPFSGatewayURLs = cli.ImportIPFSGatewayURLs + config.ImportArweaveGatewayURLs = cli.ImportArweaveGatewayURLs + config.HTTPInternalAddress = cli.HTTPInternalAddress + + // Kick off the callback client, to send job update messages on a regular interval + headers := map[string]string{"Authorization": fmt.Sprintf("Bearer %s", cli.APIToken)} + statusClient := clients.NewPeriodicCallbackClient(15*time.Second, headers).Start() + + // Emit high-cardinality metrics to a Postrgres database if configured + if cli.MetricsDBConnectionString != "" { + metricsDB, err = sql.Open("postgres", cli.MetricsDBConnectionString) + if err != nil { + glog.Fatalf("Error creating postgres metrics connection: %v", err) + } + + // Without this, we've run into issues with exceeding our open connection limit + metricsDB.SetMaxOpenConns(2) + metricsDB.SetMaxIdleConns(2) + metricsDB.SetConnMaxLifetime(time.Hour) + } else { + glog.Info("Postgres metrics connection string was not set, postgres metrics are disabled.") + } - // Start cron style apps to run periodically - if cli.ShouldMistCleanup() { - app := "mist-cleanup.sh" - // schedule mist-cleanup every 2hrs with a timeout of 15min - mistCleanup, err := middleware.NewShell(2*60*60*time.Second, 15*60*time.Second, app) + var vodDecryptPrivateKey *rsa.PrivateKey + + if cli.VodDecryptPrivateKey != "" && cli.VodDecryptPublicKey != "" { + vodDecryptPrivateKey, err = crypto.LoadPrivateKey(cli.VodDecryptPrivateKey) + if err != nil { + glog.Fatalf("Error loading vod decrypt private key: %v", err) + } + isValidKeyPair, err := crypto.ValidateKeyPair(cli.VodDecryptPublicKey, *vodDecryptPrivateKey) + if 
!isValidKeyPair || err != nil { + glog.Fatalf("Invalid vod decrypt key pair") + } + } + + c2, err := createC2PA(&cli) if err != nil { - glog.Info("Failed to shell out:", app, err) + // Log warning, but still start without C2PA signing + glog.Warning(err) } - mistCleanupTick := mistCleanup.RunBg() - defer mistCleanupTick.Stop() - } - if cli.ShouldLogSysUsage() { - app := "pod-mon.sh" - // schedule pod-mon every 5min with timeout of 5s - podMon, err := middleware.NewShell(300*time.Second, 5*time.Second, app) + // Start the "co-ordinator" that determines whether to send jobs to the Catalyst transcoding pipeline + // or an external one + vodEngine, err = pipeline.NewCoordinator(pipeline.Strategy(cli.VodPipelineStrategy), cli.SourceOutput, cli.ExternalTranscoder, statusClient, metricsDB, vodDecryptPrivateKey, cli.BroadcasterURL, cli.SourcePlaybackHosts, c2) if err != nil { - glog.Info("Failed to shell out:", app, err) + glog.Fatalf("Error creating VOD pipeline coordinator: %v", err) } - podMonTick := podMon.RunBg() - defer podMonTick.Stop() - } - broker := misttriggers.NewTriggerBroker() + if cli.ShouldMapic() { + mapic = mistapiconnector.NewMapic(&cli, broker, mist) + group.Go(func() error { + return mapic.Start(ctx) + }) + } + } - var mist clients.MistAPIClient - if cli.MistEnabled { - mist = clients.NewMistAPIClient(cli.MistUser, cli.MistPassword, cli.MistHost, cli.MistPort) - if cli.MistTriggerSetup { - ownURL := fmt.Sprintf("%s/api/mist/trigger", cli.OwnInternalURL()) - err := broker.SetupMistTriggers(mist, ownURL) + if cli.IsClusterMode() { + // Configure Mist Triggers + if cli.MistEnabled && cli.MistTriggerSetup { + mistTriggerHandlerEndpoint := fmt.Sprintf("%s/api/mist/trigger", catalystApiURL) + err := broker.SetupMistTriggers(mist, mistTriggerHandlerEndpoint) if err != nil { glog.Error("catalyst-api was unable to communicate with MistServer to set up its triggers.") glog.Error("hint: are you trying to boot catalyst-api without Mist for development purposes? 
use the flag -no-mist") glog.Fatalf("error setting up Mist triggers err=%s", err) } } - } else { - glog.Info("-no-mist flag detected, not initializing Mist stream triggers") - } - - var mapic mistapiconnector.IMac - if cli.ShouldMapic() { - mapic = mistapiconnector.NewMapic(&cli, broker, mist) - } - - c := cluster.NewCluster(&cli) - - // Start balancer - mistBalancerConfig := &balancer.Config{ - Args: cli.BalancerArgs, - MistUtilLoadPort: uint32(cli.MistLoadBalancerPort), - MistLoadBalancerTemplate: cli.MistLoadBalancerTemplate, - MistHost: cli.MistHost, - MistPort: cli.MistPort, - NodeName: cli.NodeName, - OwnRegion: cli.OwnRegion, - OwnRegionTagAdjust: cli.OwnRegionTagAdjust, - } - mistBalancer := mist_balancer.NewLocalBalancer(mistBalancerConfig) - - bal := mistBalancer - if balancer.CombinedBalancerEnabled(cli.CataBalancer) { - cataBalancer := catabalancer.NewBalancer(cli.NodeName, cli.CataBalancerMetricTimeout, cli.CataBalancerIngestStreamTimeout) - // Temporary combined balancer to test cataBalancer logic alongside existing mist balancer - bal = balancer.NewCombinedBalancer(cataBalancer, mistBalancer, cli.CataBalancer) - if cli.Tags["node"] == "media" { // don't announce load balancing availability for testing nodes - events.StartMetricSending(cli.NodeName, cli.NodeLatitude, cli.NodeLongitude, c, mist) + // Start cron style apps to run periodically + if cli.ShouldMistCleanup() { + app := "mist-cleanup.sh" + // schedule mist-cleanup every 2hrs with a timeout of 15min + mistCleanup, err := middleware.NewShell(2*60*60*time.Second, 15*60*time.Second, app) + if err != nil { + glog.Info("Failed to shell out:", app, err) + } + mistCleanupTick := mistCleanup.RunBg() + defer mistCleanupTick.Stop() + } + if cli.ShouldLogSysUsage() { + app := "pod-mon.sh" + // schedule pod-mon every 5min with timeout of 5s + podMon, err := middleware.NewShell(300*time.Second, 5*time.Second, app) + if err != nil { + glog.Info("Failed to shell out:", app, err) + } + podMonTick := podMon.RunBg() + defer podMonTick.Stop() } - } - - // Initialize root context; cancelling this prompts all components to shut down cleanly - group, ctx := errgroup.WithContext(context.Background()) - - group.Go(func() error { - return handleSignals(ctx) - }) - - group.Go(func() error { - return api.ListenAndServe(ctx, cli, vodEngine, bal, c, mapic) - }) - - group.Go(func() error { - return api.ListenAndServeInternal(ctx, cli, vodEngine, mapic, bal, c, broker, metricsDB) - }) - if cli.ShouldMapic() { group.Go(func() error { - return mapic.Start(ctx) + return handleSignals(ctx) }) } group.Go(func() error { - return bal.Start(ctx) - }) - - group.Go(func() error { - return c.Start(ctx) + return api.ListenAndServe(ctx, cli, vodEngine, bal, mapic, serfMembersEndpoint) }) group.Go(func() error { - // TODO these errors cause the app to shut down? 
- return reconcileBalancer(ctx, bal, c) - }) - - group.Go(func() error { - return handleClusterEvents(ctx, mapic, bal, c) + return api.ListenAndServeInternal(ctx, cli, vodEngine, mapic, bal, c, broker, metricsDB, serfMembersEndpoint) }) err = group.Wait() @@ -366,52 +382,36 @@ func reconcileBalancer(ctx context.Context, bal balancer.Balancer, c cluster.Clu } } -func handleClusterEvents(ctx context.Context, mapic mistapiconnector.IMac, bal balancer.Balancer, c cluster.Cluster) error { +func handleClusterEvents(ctx context.Context, callbackEndpoint string, c cluster.Cluster) error { eventCh := c.EventChan() for { select { case <-ctx.Done(): return nil case e := <-eventCh: - processClusterEvent(mapic, bal, e) + processClusterEvent(callbackEndpoint, e) } } } -func processClusterEvent(mapic mistapiconnector.IMac, bal balancer.Balancer, userEvent serf.UserEvent) { +func processClusterEvent(callbackEndpoint string, userEvent serf.UserEvent) { + client := &http.Client{} + glog.V(5).Infof("received serf user event, propagating to %s, event=%s", callbackEndpoint, userEvent.String()) + go func() { - e, err := events.Unmarshal(userEvent.Payload) + req, err := http.NewRequest("POST", callbackEndpoint, bytes.NewBuffer(userEvent.Payload)) if err != nil { - glog.Errorf("cannot unmarshal received serf event %v: %s", userEvent, err) + glog.Errorf("error creating request: %v", err) return } - switch event := e.(type) { - case *events.StreamEvent: - glog.V(5).Infof("received serf StreamEvent: %v", event.PlaybackID) - mapic.RefreshStreamIfNeeded(event.PlaybackID) - case *events.NukeEvent: - glog.V(5).Infof("received serf NukeEvent: %v", event.PlaybackID) - mapic.NukeStream(event.PlaybackID) - return - case *events.StopSessionsEvent: - glog.V(5).Infof("received serf StopSessionsEvent: %v", event.PlaybackID) - mapic.StopSessions(event.PlaybackID) + resp, err := client.Do(req) + if err != nil { + glog.Errorf("error sending request: %v", err) return - case *events.NodeUpdateEvent: - if glog.V(5) { - glog.Infof("received serf NodeUpdateEvent. Node: %s. Length: %d bytes. Ingest Streams: %v. Non-Ingest Streams: %v", event.NodeID, len(userEvent.Payload), strings.Join(event.GetIngestStreams(), ","), strings.Join(event.GetStreams(), ",")) - } - - bal.UpdateNodes(event.NodeID, event.NodeMetrics) - for _, stream := range event.GetStreams() { - bal.UpdateStreams(event.NodeID, stream, false) - } - for _, stream := range event.GetIngestStreams() { - bal.UpdateStreams(event.NodeID, stream, true) - } - default: - glog.Errorf("unsupported serf event: %v", e) } + defer resp.Body.Close() + + glog.V(5).Infof("propagated serf user event to %s, event=%s", callbackEndpoint, userEvent.String()) }() } diff --git a/mapic/mistapiconnector_app.go b/mapic/mistapiconnector_app.go index feec9887b..ac7cc1857 100644 --- a/mapic/mistapiconnector_app.go +++ b/mapic/mistapiconnector_app.go @@ -1,6 +1,8 @@ //nolint:all package mistapiconnector +//go:generate mockgen -source=./mistapiconnector_app.go -destination=../mocks/mistapiconnector/mistapiconnector_app.go + import ( "context" "errors"
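
For reference, a minimal sketch of the event contract this patch keeps on the cluster-mode node: `POST /api/events` validates the body against the "Event" input schema (`resource` plus `playback_id`) and broadcasts it over serf. The host below is the default `-http-internal-addr` from main.go and the playback ID is a placeholder; neither is prescribed by the patch.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	// JSON shape matches the Event struct in handlers/events.go;
	// the playback ID is a made-up example value.
	body := []byte(`{"resource": "nuke", "playback_id": "abc123"}`)

	// In cluster mode /api/events is registered on the internal router,
	// which binds to -http-internal-addr (default 127.0.0.1:7979).
	resp, err := http.Post("http://127.0.0.1:7979/api/events", "application/json", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}
```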
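
In split deployments, the cluster-mode node's `processClusterEvent` forwards the raw serf user-event payload to `<catalyst-api-url>/api/serf/receiveUserEvent` on the api-mode node, where `ReceiveUserEvent` dispatches it to mapic or the balancer. A rough sketch of exercising that handler by hand, assuming the api-mode node's internal address; the body mirrors the fixtures in `TestReceiveUserEventHandler`, and the host and playback ID are placeholders.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	// Same body shape that Events() broadcasts and processClusterEvent
	// forwards verbatim as the serf user-event payload.
	payload := []byte(`{"resource": "stream", "playback_id": "abc123"}`)

	resp, err := http.Post("http://127.0.0.1:7979/api/serf/receiveUserEvent", "application/json", bytes.NewReader(payload))
	if err != nil {
		fmt.Println("forward failed:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}
```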
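
The api-mode geolocation handler now discovers members over HTTP: `membersFiltered` GETs `-serf-members-endpoint` and decodes a JSON array of `cluster.Member`, the same shape the geolocation test's stub serves. A standalone stub of that endpoint for local testing of an api-only node; the member name, tags, and listen address are illustrative assumptions, not values from the patch.

```go
package main

import (
	"encoding/json"
	"log"
	"net/http"

	"github.com/livepeer/catalyst-api/cluster"
)

func main() {
	// Serve a fixed member list in the shape membersFiltered decodes.
	http.HandleFunc("/api/serf/members", func(w http.ResponseWriter, r *http.Request) {
		members := []cluster.Member{
			{
				Name:   "catalyst-0",
				Status: "alive",
				Tags: map[string]string{
					"http":   "http://catalyst-0.example.com",
					"https":  "https://catalyst-0.example.com",
					"region": "fra",
				},
			},
		}
		w.Header().Set("Content-Type", "application/json")
		if err := json.NewEncoder(w).Encode(members); err != nil {
			log.Printf("failed to encode members: %v", err)
		}
	})
	log.Fatal(http.ListenAndServe("127.0.0.1:8081", nil))
}
```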
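
The filtering logic extracted into `cluster.FilterMembers` matches on every key/value pair in the filter map, plus optional exact status and name; empty strings mean no filtering. A small usage sketch with made-up member names and region tags, using the same call shape as `sendPlaybackRequestAsync`.

```go
package main

import (
	"fmt"

	"github.com/livepeer/catalyst-api/cluster"
)

func main() {
	// Hypothetical member list; in production it comes from serf via
	// MembersFiltered or from the /api/serf/members endpoint.
	members := []cluster.Member{
		{Name: "catalyst-0", Status: "alive", Tags: map[string]string{"region": "fra", "node": "media"}},
		{Name: "catalyst-1", Status: "alive", Tags: map[string]string{"region": "mdw", "node": "media"}},
	}

	// Keep only members tagged region=fra; empty status and name skip
	// those checks, as the geolocation callers do.
	fra, err := cluster.FilterMembers(members, map[string]string{"region": "fra"}, "", "")
	if err != nil {
		panic(err)
	}
	fmt.Println(fra)
}
```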