diff --git a/api/http_internal.go b/api/http_internal.go index 9f3df30b2..8e5efb468 100644 --- a/api/http_internal.go +++ b/api/http_internal.go @@ -84,58 +84,63 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato // Simple endpoint for healthchecks router.GET("/ok", withLogging(catalystApiHandlers.Ok())) - var metricsHandlers []http.Handler - if cli.ShouldMapic() { - metricsHandlers = append(metricsHandlers, mapic.MetricsHandler()) - } - if cli.MistPrometheus != "" { - // Enable Mist metrics enrichment - metricsHandlers = append(metricsHandlers, mapic.MistMetricsHandler()) - } - metricsHandlers = append(metricsHandlers, promhttp.Handler()) - // Hacky combined metrics handler. To be refactored away with mapic. - router.GET("/metrics", concatHandlers(metricsHandlers...)) - - // Public Catalyst API - router.POST("/api/vod", - withLogging( - withAuth( - cli.APIToken, - withCapacityChecking( - vodEngine, - catalystApiHandlers.UploadVOD(), + if cli.IsApiMode() { + var metricsHandlers []http.Handler + if cli.ShouldMapic() { + metricsHandlers = append(metricsHandlers, mapic.MetricsHandler()) + } + if cli.MistPrometheus != "" { + // Enable Mist metrics enrichment + metricsHandlers = append(metricsHandlers, mapic.MistMetricsHandler()) + } + metricsHandlers = append(metricsHandlers, promhttp.Handler()) + // Hacky combined metrics handler. To be refactored away with mapic. + router.GET("/metrics", concatHandlers(metricsHandlers...)) + + // Public Catalyst API + router.POST("/api/vod", + withLogging( + withAuth( + cli.APIToken, + withCapacityChecking( + vodEngine, + catalystApiHandlers.UploadVOD(), + ), ), ), - ), - ) + ) + + // Public GET handler to retrieve the public key for vod encryption + router.GET("/api/pubkey", withLogging(encryptionHandlers.PublicKeyHandler())) - // Public GET handler to retrieve the public key for vod encryption - router.GET("/api/pubkey", withLogging(encryptionHandlers.PublicKeyHandler())) + // Endpoint to receive "Triggers" (callbacks) from Mist + router.POST("/api/mist/trigger", withLogging(mistCallbackHandlers.Trigger())) - // Endpoint to receive "Triggers" (callbacks) from Mist - router.POST("/api/mist/trigger", withLogging(mistCallbackHandlers.Trigger())) + // Handler for STREAM_SOURCE triggers + broker.OnStreamSource(geoHandlers.HandleStreamSource) - // Handler for STREAM_SOURCE triggers - broker.OnStreamSource(geoHandlers.HandleStreamSource) + // Handler for USER_NEW triggers + broker.OnUserNew(accessControlHandlers.HandleUserNew) - // Handler for USER_NEW triggers - broker.OnUserNew(accessControlHandlers.HandleUserNew) + // Handler for USER_END triggers. + broker.OnUserEnd(analyticsHandlers.HandleUserEnd) - // Handler for USER_END triggers. - broker.OnUserEnd(analyticsHandlers.HandleUserEnd) + // Endpoint to receive segments and manifests that ffmpeg produces + router.POST("/api/ffmpeg/:id/:filename", withLogging(ffmpegSegmentingHandlers.NewFile())) - // Endpoint to receive segments and manifests that ffmpeg produces - router.POST("/api/ffmpeg/:id/:filename", withLogging(ffmpegSegmentingHandlers.NewFile())) + // Handler to forward the user event from Catalyst => Catalyst API + router.POST("/api/serf/receiveUserEvent", withLogging(eventsHandler.ReceiveUserEvent())) + } - // Temporary endpoint for admin queries - router.GET("/admin/members", withLogging(adminHandlers.MembersHandler())) - // Handler to get members Catalyst API => Catalyst - router.GET("/api/serf/members", withLogging(adminHandlers.MembersHandler())) - // Public handler to propagate an event to all Catalyst nodes, execute from Studio API => Catalyst - router.POST("/api/events", withLogging(eventsHandler.Events())) + if cli.IsClusterMode() { + // Temporary endpoint for admin queries + router.GET("/admin/members", withLogging(adminHandlers.MembersHandler())) + // Handler to get members Catalyst API => Catalyst + router.GET("/api/serf/members", withLogging(adminHandlers.MembersHandler())) + // Public handler to propagate an event to all Catalyst nodes, execute from Studio API => Catalyst + router.POST("/api/events", withLogging(eventsHandler.Events())) + } - // Handler to forward the user event from Catalyst => Catalyst API - router.POST("/api/serf/receiveUserEvent", withLogging(eventsHandler.ReceiveUserEvent())) return router } diff --git a/config/cli.go b/config/cli.go index 418812fea..4243282e9 100644 --- a/config/cli.go +++ b/config/cli.go @@ -25,6 +25,7 @@ type Cli struct { MistUser string MistPassword string MistPrometheus string + Mode string MistPort int MistConnectTimeout time.Duration MistStreamSource string @@ -72,6 +73,7 @@ type Cli struct { KafkaPassword string AnalyticsKafkaTopic string SerfMembersEndpoint string + CatalystApiURL string // mapping playbackId to value between 0.0 to 100.0 CdnRedirectPlaybackPct map[string]float64 @@ -111,6 +113,14 @@ func (cli *Cli) ShouldMapic() bool { return cli.APIServer != "" } +func (cli *Cli) IsClusterMode() bool { + return cli.Mode == "cluster-only" || cli.Mode == "all" +} + +func (cli *Cli) IsApiMode() bool { + return cli.Mode == "api-only" || cli.Mode == "all" +} + // Should we enable mist-cleanup script to run periodically and delete leaky shm? func (cli *Cli) ShouldMistCleanup() bool { return cli.MistCleanup diff --git a/handlers/geolocation/geolocation_test.go b/handlers/geolocation/geolocation_test.go index 94115bdea..52d789699 100644 --- a/handlers/geolocation/geolocation_test.go +++ b/handlers/geolocation/geolocation_test.go @@ -170,10 +170,10 @@ func mockHandlers(t *testing.T) *GeolocationHandlersCollection { testServer := httptest.NewServer(router) coll := GeolocationHandlersCollection{ - Balancer: mb, + Balancer: mb, + serfMembersEndpoint: fmt.Sprintf("%s/api/serf/members", testServer.URL), Config: config.Cli{ - RedirectPrefixes: prefixes[:], - SerfMembersEndpoint: fmt.Sprintf("%s/api/serf/members", testServer.URL), + RedirectPrefixes: prefixes[:], }, } return &coll diff --git a/main.go b/main.go index 25b47b10f..be55d6cfa 100644 --- a/main.go +++ b/main.go @@ -50,6 +50,8 @@ func main() { version := fs.Bool("version", false, "print application version") + fs.StringVar(&cli.Mode, "mode", "all", "Mode to run the application in. Options: all, cluster-only, api-only") + // listen addresses config.AddrFlag(fs, &cli.HTTPAddress, "http-addr", "0.0.0.0:8989", "Address to bind for external-facing Catalyst HTTP handling") config.AddrFlag(fs, &cli.HTTPInternalAddress, "http-internal-addr", "127.0.0.1:7979", "Address to bind for internal privileged HTTP commands") @@ -129,6 +131,7 @@ func main() { fs.StringVar(&cli.KafkaPassword, "kafka-password", "", "Kafka Password") fs.StringVar(&cli.AnalyticsKafkaTopic, "analytics-kafka-topic", "", "Kafka Topic used to send analytics logs") fs.StringVar(&cli.SerfMembersEndpoint, "serf-members-endpoint", "", "Endpoint to get the current members in the cluster") + fs.StringVar(&cli.CatalystApiURL, "catalyst-api-url", "", "Endpoint for externally deployed catalyst-api; if not set, use local catalyst-api") pprofPort := fs.Int("pprof-port", 6061, "Pprof listen port") fs.String("send-audio", "", "[DEPRECATED] ignored, will be removed") @@ -176,172 +179,177 @@ func main() { return } - // TODO: I don't love the global variables for these - config.ImportIPFSGatewayURLs = cli.ImportIPFSGatewayURLs - config.ImportArweaveGatewayURLs = cli.ImportArweaveGatewayURLs - config.HTTPInternalAddress = cli.HTTPInternalAddress - var ( metricsDB *sql.DB + vodEngine *pipeline.Coordinator + mapic mistapiconnector.IMac + bal balancer.Balancer + broker misttriggers.TriggerBroker + mist clients.MistAPIClient + c cluster.Cluster ) - // Kick off the callback client, to send job update messages on a regular interval - headers := map[string]string{"Authorization": fmt.Sprintf("Bearer %s", cli.APIToken)} - statusClient := clients.NewPeriodicCallbackClient(15*time.Second, headers).Start() + // Initialize root context; cancelling this prompts all components to shut down cleanly + group, ctx := errgroup.WithContext(context.Background()) + mistBalancerConfig := &balancer.Config{ + Args: cli.BalancerArgs, + MistUtilLoadPort: uint32(cli.MistLoadBalancerPort), + MistLoadBalancerTemplate: cli.MistLoadBalancerTemplate, + MistHost: cli.MistHost, + MistPort: cli.MistPort, + NodeName: cli.NodeName, + OwnRegion: cli.OwnRegion, + OwnRegionTagAdjust: cli.OwnRegionTagAdjust, + } + broker = misttriggers.NewTriggerBroker() - // Emit high-cardinality metrics to a Postrgres database if configured - if cli.MetricsDBConnectionString != "" { - metricsDB, err = sql.Open("postgres", cli.MetricsDBConnectionString) - if err != nil { - glog.Fatalf("Error creating postgres metrics connection: %v", err) - } + catalystApiURL := cli.CatalystApiURL + if catalystApiURL == "" { + catalystApiURL = cli.OwnInternalURL() + } + serfMembersEndpoint := cli.SerfMembersEndpoint + if serfMembersEndpoint == "" { + serfMembersEndpoint = cli.OwnInternalURL() + "/api/serf/members" + } - // Without this, we've run into issues with exceeding our open connection limit - metricsDB.SetMaxOpenConns(2) - metricsDB.SetMaxIdleConns(2) - metricsDB.SetConnMaxLifetime(time.Hour) - } else { - glog.Info("Postgres metrics connection string was not set, postgres metrics are disabled.") + if cli.MistEnabled { + mist = clients.NewMistAPIClient(cli.MistUser, cli.MistPassword, cli.MistHost, cli.MistPort) } - var vodDecryptPrivateKey *rsa.PrivateKey + if cli.IsClusterMode() { + c = cluster.NewCluster(&cli) + group.Go(func() error { + return c.Start(ctx) + }) - if cli.VodDecryptPrivateKey != "" && cli.VodDecryptPublicKey != "" { - vodDecryptPrivateKey, err = crypto.LoadPrivateKey(cli.VodDecryptPrivateKey) - if err != nil { - glog.Fatalf("Error loading vod decrypt private key: %v", err) - } - isValidKeyPair, err := crypto.ValidateKeyPair(cli.VodDecryptPublicKey, *vodDecryptPrivateKey) - if !isValidKeyPair || err != nil { - glog.Fatalf("Invalid vod decrypt key pair") + group.Go(func() error { + serfUserEventCallbackEndpoint := fmt.Sprintf("%s/api/serf/receiveUserEvent", catalystApiURL) + return handleClusterEvents(ctx, serfUserEventCallbackEndpoint, c) + }) + + bal = mist_balancer.NewLocalBalancer(mistBalancerConfig) + group.Go(func() error { + return bal.Start(ctx) + }) + group.Go(func() error { + return reconcileBalancer(ctx, bal, c) + }) + } else { + bal = mist_balancer.NewRemoteBalancer(mistBalancerConfig) + if balancer.CombinedBalancerEnabled(cli.CataBalancer) { + cataBalancer := catabalancer.NewBalancer(cli.NodeName, cli.CataBalancerMetricTimeout, cli.CataBalancerIngestStreamTimeout) + // Temporary combined balancer to test cataBalancer logic alongside existing mist balancer + bal = balancer.NewCombinedBalancer(cataBalancer, bal, cli.CataBalancer) + + if cli.Tags["node"] == "media" { // don't announce load balancing availability for testing nodes + events.StartMetricSending(cli.NodeName, cli.NodeLatitude, cli.NodeLongitude, c, mist) + } } } - c2, err := createC2PA(&cli) - if err != nil { - // Log warning, but still start without C2PA signing - glog.Warning(err) - } - // Start the "co-ordinator" that determines whether to send jobs to the Catalyst transcoding pipeline - // or an external one - vodEngine, err := pipeline.NewCoordinator(pipeline.Strategy(cli.VodPipelineStrategy), cli.SourceOutput, cli.ExternalTranscoder, statusClient, metricsDB, vodDecryptPrivateKey, cli.BroadcasterURL, cli.SourcePlaybackHosts, c2) - if err != nil { - glog.Fatalf("Error creating VOD pipeline coordinator: %v", err) - } + if cli.IsApiMode() { + // TODO: I don't love the global variables for these + config.ImportIPFSGatewayURLs = cli.ImportIPFSGatewayURLs + config.ImportArweaveGatewayURLs = cli.ImportArweaveGatewayURLs + config.HTTPInternalAddress = cli.HTTPInternalAddress + + // Kick off the callback client, to send job update messages on a regular interval + headers := map[string]string{"Authorization": fmt.Sprintf("Bearer %s", cli.APIToken)} + statusClient := clients.NewPeriodicCallbackClient(15*time.Second, headers).Start() - // Start cron style apps to run periodically - if cli.ShouldMistCleanup() { - app := "mist-cleanup.sh" - // schedule mist-cleanup every 2hrs with a timeout of 15min - mistCleanup, err := middleware.NewShell(2*60*60*time.Second, 15*60*time.Second, app) + // Emit high-cardinality metrics to a Postrgres database if configured + if cli.MetricsDBConnectionString != "" { + metricsDB, err = sql.Open("postgres", cli.MetricsDBConnectionString) + if err != nil { + glog.Fatalf("Error creating postgres metrics connection: %v", err) + } + + // Without this, we've run into issues with exceeding our open connection limit + metricsDB.SetMaxOpenConns(2) + metricsDB.SetMaxIdleConns(2) + metricsDB.SetConnMaxLifetime(time.Hour) + } else { + glog.Info("Postgres metrics connection string was not set, postgres metrics are disabled.") + } + + var vodDecryptPrivateKey *rsa.PrivateKey + + if cli.VodDecryptPrivateKey != "" && cli.VodDecryptPublicKey != "" { + vodDecryptPrivateKey, err = crypto.LoadPrivateKey(cli.VodDecryptPrivateKey) + if err != nil { + glog.Fatalf("Error loading vod decrypt private key: %v", err) + } + isValidKeyPair, err := crypto.ValidateKeyPair(cli.VodDecryptPublicKey, *vodDecryptPrivateKey) + if !isValidKeyPair || err != nil { + glog.Fatalf("Invalid vod decrypt key pair") + } + } + + c2, err := createC2PA(&cli) if err != nil { - glog.Info("Failed to shell out:", app, err) + // Log warning, but still start without C2PA signing + glog.Warning(err) } - mistCleanupTick := mistCleanup.RunBg() - defer mistCleanupTick.Stop() - } - if cli.ShouldLogSysUsage() { - app := "pod-mon.sh" - // schedule pod-mon every 5min with timeout of 5s - podMon, err := middleware.NewShell(300*time.Second, 5*time.Second, app) + // Start the "co-ordinator" that determines whether to send jobs to the Catalyst transcoding pipeline + // or an external one + vodEngine, err = pipeline.NewCoordinator(pipeline.Strategy(cli.VodPipelineStrategy), cli.SourceOutput, cli.ExternalTranscoder, statusClient, metricsDB, vodDecryptPrivateKey, cli.BroadcasterURL, cli.SourcePlaybackHosts, c2) if err != nil { - glog.Info("Failed to shell out:", app, err) + glog.Fatalf("Error creating VOD pipeline coordinator: %v", err) } - podMonTick := podMon.RunBg() - defer podMonTick.Stop() - } - broker := misttriggers.NewTriggerBroker() + if cli.ShouldMapic() { + mapic = mistapiconnector.NewMapic(&cli, broker, mist) + group.Go(func() error { + return mapic.Start(ctx) + }) + } + } - var mist clients.MistAPIClient - if cli.MistEnabled { - mist = clients.NewMistAPIClient(cli.MistUser, cli.MistPassword, cli.MistHost, cli.MistPort) - if cli.MistTriggerSetup { - ownURL := fmt.Sprintf("%s/api/mist/trigger", cli.OwnInternalURL()) - err := broker.SetupMistTriggers(mist, ownURL) + if cli.IsClusterMode() { + // Configure Mist Triggers + if cli.MistEnabled && cli.MistTriggerSetup { + mistTriggerHandlerEndpoint := fmt.Sprintf("%s/api/mist/trigger", catalystApiURL) + err := broker.SetupMistTriggers(mist, mistTriggerHandlerEndpoint) if err != nil { glog.Error("catalyst-api was unable to communicate with MistServer to set up its triggers.") glog.Error("hint: are you trying to boot catalyst-api without Mist for development purposes? use the flag -no-mist") glog.Fatalf("error setting up Mist triggers err=%s", err) } } - } else { - glog.Info("-no-mist flag detected, not initializing Mist stream triggers") - } - - var mapic mistapiconnector.IMac - if cli.ShouldMapic() { - mapic = mistapiconnector.NewMapic(&cli, broker, mist) - } - - c := cluster.NewCluster(&cli) - // Start balancer - mistBalancerConfig := &balancer.Config{ - Args: cli.BalancerArgs, - MistUtilLoadPort: uint32(cli.MistLoadBalancerPort), - MistLoadBalancerTemplate: cli.MistLoadBalancerTemplate, - MistHost: cli.MistHost, - MistPort: cli.MistPort, - NodeName: cli.NodeName, - OwnRegion: cli.OwnRegion, - OwnRegionTagAdjust: cli.OwnRegionTagAdjust, - } - mistBalancer := mist_balancer.NewLocalBalancer(mistBalancerConfig) - - bal := mistBalancer - if balancer.CombinedBalancerEnabled(cli.CataBalancer) { - cataBalancer := catabalancer.NewBalancer(cli.NodeName, cli.CataBalancerMetricTimeout, cli.CataBalancerIngestStreamTimeout) - // Temporary combined balancer to test cataBalancer logic alongside existing mist balancer - bal = balancer.NewCombinedBalancer(cataBalancer, mistBalancer, cli.CataBalancer) - - if cli.Tags["node"] == "media" { // don't announce load balancing availability for testing nodes - events.StartMetricSending(cli.NodeName, cli.NodeLatitude, cli.NodeLongitude, c, mist) + // Start cron style apps to run periodically + if cli.ShouldMistCleanup() { + app := "mist-cleanup.sh" + // schedule mist-cleanup every 2hrs with a timeout of 15min + mistCleanup, err := middleware.NewShell(2*60*60*time.Second, 15*60*time.Second, app) + if err != nil { + glog.Info("Failed to shell out:", app, err) + } + mistCleanupTick := mistCleanup.RunBg() + defer mistCleanupTick.Stop() + } + if cli.ShouldLogSysUsage() { + app := "pod-mon.sh" + // schedule pod-mon every 5min with timeout of 5s + podMon, err := middleware.NewShell(300*time.Second, 5*time.Second, app) + if err != nil { + glog.Info("Failed to shell out:", app, err) + } + podMonTick := podMon.RunBg() + defer podMonTick.Stop() } - } - - // Initialize root context; cancelling this prompts all components to shut down cleanly - group, ctx := errgroup.WithContext(context.Background()) - - group.Go(func() error { - return handleSignals(ctx) - }) - - serfMembersEndpoint := cli.SerfMembersEndpoint - if serfMembersEndpoint == "" { - serfMembersEndpoint = cli.OwnInternalURL() + "/api/serf/members" - } - - group.Go(func() error { - return api.ListenAndServe(ctx, cli, vodEngine, bal, mapic, serfMembersEndpoint) - }) - - group.Go(func() error { - return api.ListenAndServeInternal(ctx, cli, vodEngine, mapic, bal, c, broker, metricsDB, serfMembersEndpoint) - }) - if cli.ShouldMapic() { group.Go(func() error { - return mapic.Start(ctx) + return handleSignals(ctx) }) } group.Go(func() error { - return bal.Start(ctx) - }) - - group.Go(func() error { - return c.Start(ctx) - }) - - group.Go(func() error { - // TODO these errors cause the app to shut down? - return reconcileBalancer(ctx, bal, c) + return api.ListenAndServe(ctx, cli, vodEngine, bal, mapic, serfMembersEndpoint) }) group.Go(func() error { - serfUserEventCallbackEndpoint := fmt.Sprintf("%s/api/serf/receiveUserEvent", cli.OwnInternalURL()) - return handleClusterEvents(ctx, serfUserEventCallbackEndpoint, c) + return api.ListenAndServeInternal(ctx, cli, vodEngine, mapic, bal, c, broker, metricsDB, serfMembersEndpoint) }) err = group.Wait()