diff --git a/api/http_internal.go b/api/http_internal.go index 4ed6cb02..30589e5b 100644 --- a/api/http_internal.go +++ b/api/http_internal.go @@ -5,6 +5,7 @@ import ( "bytes" "context" "database/sql" + "fmt" "net/http" "net/http/httptest" "time" @@ -29,8 +30,8 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" ) -func ListenAndServeInternal(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coordinator, mapic mistapiconnector.IMac, bal balancer.Balancer, c cluster.Cluster, broker misttriggers.TriggerBroker, metricsDB *sql.DB, serfMembersEndpoint, eventsEndpoint string) error { - router := NewCatalystAPIRouterInternal(cli, vodEngine, mapic, bal, c, broker, metricsDB, serfMembersEndpoint, eventsEndpoint) +func ListenAndServeInternal(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coordinator, mapic mistapiconnector.IMac, bal balancer.Balancer, c cluster.Cluster, broker misttriggers.TriggerBroker, metricsDB *sql.DB, serfMembersEndpoint, eventsEndpoint string, catalystApiURL string) error { + router := NewCatalystAPIRouterInternal(cli, vodEngine, mapic, bal, c, broker, metricsDB, serfMembersEndpoint, eventsEndpoint, catalystApiURL) server := http.Server{Addr: cli.HTTPInternalAddress, Handler: router} ctx, cancel := context.WithCancel(ctx) @@ -56,7 +57,7 @@ func ListenAndServeInternal(ctx context.Context, cli config.Cli, vodEngine *pipe return server.Shutdown(ctx) } -func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinator, mapic mistapiconnector.IMac, bal balancer.Balancer, c cluster.Cluster, broker misttriggers.TriggerBroker, metricsDB *sql.DB, serfMembersEndpoint, eventsEndpoint string) *httprouter.Router { +func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinator, mapic mistapiconnector.IMac, bal balancer.Balancer, c cluster.Cluster, broker misttriggers.TriggerBroker, metricsDB *sql.DB, serfMembersEndpoint, eventsEndpoint string, catalystApiURL string) *httprouter.Router { router := httprouter.New() withLogging := middleware.LogRequest() withAuth := middleware.IsAuthorized @@ -128,6 +129,10 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato // Handler to forward the user event from Catalyst => Catalyst API router.POST("/api/serf/receiveUserEvent", withLogging(eventsHandler.ReceiveUserEvent())) + } else { + // Endpoint to receive "Triggers" (callbacks) from Mist and redirect them to the standalone Catalyst API + mistTriggerHandlerEndpoint := fmt.Sprintf("%s/api/mist/trigger", catalystApiURL) + router.POST("/api/mist/trigger", withLogging(handlers.ProxyRequest(mistTriggerHandlerEndpoint))) } metricsHandlers = append(metricsHandlers, promhttp.Handler()) @@ -142,7 +147,7 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato // Public handler to propagate an event to all Catalyst nodes, execute from Studio API => Catalyst router.POST("/api/events", withLogging(eventsHandler.Events())) } else { - router.POST("/api/events", withLogging(eventsHandler.ProxyEvents())) + router.POST("/api/events", withLogging(handlers.ProxyRequest(eventsEndpoint))) } return router diff --git a/handlers/events.go b/handlers/events.go index 722483d4..8884fb49 100644 --- a/handlers/events.go +++ b/handlers/events.go @@ -79,31 +79,6 @@ func (d *EventsHandlersCollection) Events() httprouter.Handle { } } -// ProxyEvents is a handler of Catalyst API called by Studio API. -// It proxies the requests to Catalyst. -func (d *EventsHandlersCollection) ProxyEvents() httprouter.Handle { - // Proxy the request to d.eventsEndpoint - return func(w http.ResponseWriter, req *http.Request, _ httprouter.Params) { - // Create a new request to the target endpoint - proxyReq, err := http.NewRequest(req.Method, d.eventsEndpoint, req.Body) - if err != nil { - glog.Errorf("Cannot create proxy request: %s", err) - errors.WriteHTTPInternalServerError(w, "Cannot create proxy request", err) - return - } - - // Send the request to the target endpoint - client := &http.Client{} - resp, err := client.Do(proxyReq) - if err != nil { - glog.Errorf("Cannot send proxy request: %s", err) - errors.WriteHTTPInternalServerError(w, "Cannot send proxy request", err) - return - } - defer resp.Body.Close() - } -} - // ReceiveUserEvent is a handler to receive Serf events from Catalyst. // The idea is that: // 1. Studio API sends an event to Catalyst (received by Events() handler) diff --git a/handlers/misttriggers/trigger_broker.go b/handlers/misttriggers/trigger_broker.go index ff40cc7b..50b460c4 100644 --- a/handlers/misttriggers/trigger_broker.go +++ b/handlers/misttriggers/trigger_broker.go @@ -2,11 +2,9 @@ package misttriggers import ( "context" - "fmt" "sync" "github.com/golang/glog" - "github.com/livepeer/catalyst-api/clients" "golang.org/x/sync/errgroup" ) @@ -29,8 +27,6 @@ import ( // handler for these sorts of triggers. type TriggerBroker interface { - SetupMistTriggers(clients.MistAPIClient, string) error - OnStreamBuffer(func(context.Context, *StreamBufferPayload) error) TriggerStreamBuffer(context.Context, *StreamBufferPayload) @@ -76,27 +72,6 @@ type triggerBroker struct { streamSourceFuncs funcGroup[StreamSourcePayload] } -var triggers = map[string]bool{ - TRIGGER_PUSH_END: false, - TRIGGER_PUSH_OUT_START: true, - TRIGGER_PUSH_REWRITE: true, - TRIGGER_STREAM_BUFFER: false, - TRIGGER_LIVE_TRACK_LIST: false, - TRIGGER_USER_NEW: true, - TRIGGER_USER_END: false, - TRIGGER_STREAM_SOURCE: true, -} - -func (b *triggerBroker) SetupMistTriggers(mist clients.MistAPIClient, triggerCallback string) error { - for name, sync := range triggers { - err := mist.AddTrigger([]string{}, name, triggerCallback, sync) - if err != nil { - return fmt.Errorf("error setting up mist trigger trigger=%s error=%w", name, err) - } - } - return nil -} - func (b *triggerBroker) OnStreamBuffer(cb func(context.Context, *StreamBufferPayload) error) { b.streamBufferFuncs.RegisterNoResponse(cb) } diff --git a/handlers/proxy.go b/handlers/proxy.go new file mode 100644 index 00000000..a910e209 --- /dev/null +++ b/handlers/proxy.go @@ -0,0 +1,48 @@ +package handlers + +import ( + "io" + "net/http" + + "github.com/golang/glog" + "github.com/julienschmidt/httprouter" + "github.com/livepeer/catalyst-api/errors" +) + +// ProxyRequest proxies a request to a target endpoint +func ProxyRequest(targetEndpoint string) httprouter.Handle { + return func(w http.ResponseWriter, req *http.Request, _ httprouter.Params) { + // Create a new request to the target endpoint + proxyReq, err := http.NewRequest(req.Method, targetEndpoint, req.Body) + if err != nil { + glog.Errorf("Cannot create proxy request: %s", err) + errors.WriteHTTPInternalServerError(w, "Cannot create proxy request", err) + return + } + for k, v := range req.Header { + proxyReq.Header.Set(k, v[0]) + } + + // Send the request to the target endpoint + client := &http.Client{} + resp, err := client.Do(proxyReq) + if err != nil { + glog.Errorf("Cannot send proxy request: %s", err) + errors.WriteHTTPInternalServerError(w, "Cannot send proxy request", err) + return + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + glog.Errorf("Cannot read response body: %s", err) + errors.WriteHTTPInternalServerError(w, "Cannot read response body", err) + return + } + for k, v := range resp.Header { + w.Header()[k] = v + } + w.WriteHeader(resp.StatusCode) + w.Write(body) // nolint:errcheck + } +} diff --git a/main.go b/main.go index 05507d59..ad5e1255 100644 --- a/main.go +++ b/main.go @@ -317,17 +317,6 @@ func main() { } if cli.IsClusterMode() { - // Configure Mist Triggers - if cli.MistEnabled && cli.MistTriggerSetup { - mistTriggerHandlerEndpoint := fmt.Sprintf("%s/api/mist/trigger", catalystApiURL) - err := broker.SetupMistTriggers(mist, mistTriggerHandlerEndpoint) - if err != nil { - glog.Error("catalyst-api was unable to communicate with MistServer to set up its triggers.") - glog.Error("hint: are you trying to boot catalyst-api without Mist for development purposes? use the flag -no-mist") - glog.Fatalf("error setting up Mist triggers err=%s", err) - } - } - // Start cron style apps to run periodically if cli.ShouldMistCleanup() { app := "mist-cleanup.sh" @@ -360,14 +349,14 @@ func main() { }) group.Go(func() error { - return api.ListenAndServeInternal(ctx, cli, vodEngine, mapic, bal, c, broker, metricsDB, serfMembersEndpoint, cli.EventsEndpoint) + return api.ListenAndServeInternal(ctx, cli, vodEngine, mapic, bal, c, broker, metricsDB, serfMembersEndpoint, cli.EventsEndpoint, catalystApiURL) }) err = group.Wait() glog.Infof("Shutdown complete. Reason for shutdown: %s", err) } -func resolveCatalystApiURL(cli config.Cli) interface{} { +func resolveCatalystApiURL(cli config.Cli) string { if cli.CatalystApiURL != "" { return cli.CatalystApiURL }