Skip to content

Commit

Permalink
Proxy Mist triggers from catalyst to catalyst-api and remove dynamic …
Browse files Browse the repository at this point in the history
…Mist Trigger Setup
  • Loading branch information
leszko committed Aug 21, 2024
1 parent 2632451 commit 861f99c
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 56 deletions.
13 changes: 9 additions & 4 deletions api/http_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"bytes"
"context"
"database/sql"
"fmt"
"net/http"
"net/http/httptest"
"time"
Expand All @@ -29,8 +30,8 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
)

func ListenAndServeInternal(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coordinator, mapic mistapiconnector.IMac, bal balancer.Balancer, c cluster.Cluster, broker misttriggers.TriggerBroker, metricsDB *sql.DB, serfMembersEndpoint, eventsEndpoint string) error {
router := NewCatalystAPIRouterInternal(cli, vodEngine, mapic, bal, c, broker, metricsDB, serfMembersEndpoint, eventsEndpoint)
func ListenAndServeInternal(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coordinator, mapic mistapiconnector.IMac, bal balancer.Balancer, c cluster.Cluster, broker misttriggers.TriggerBroker, metricsDB *sql.DB, serfMembersEndpoint, eventsEndpoint string, catalystApiURL string) error {
router := NewCatalystAPIRouterInternal(cli, vodEngine, mapic, bal, c, broker, metricsDB, serfMembersEndpoint, eventsEndpoint, catalystApiURL)
server := http.Server{Addr: cli.HTTPInternalAddress, Handler: router}
ctx, cancel := context.WithCancel(ctx)

Expand All @@ -56,7 +57,7 @@ func ListenAndServeInternal(ctx context.Context, cli config.Cli, vodEngine *pipe
return server.Shutdown(ctx)
}

func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinator, mapic mistapiconnector.IMac, bal balancer.Balancer, c cluster.Cluster, broker misttriggers.TriggerBroker, metricsDB *sql.DB, serfMembersEndpoint, eventsEndpoint string) *httprouter.Router {
func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinator, mapic mistapiconnector.IMac, bal balancer.Balancer, c cluster.Cluster, broker misttriggers.TriggerBroker, metricsDB *sql.DB, serfMembersEndpoint, eventsEndpoint string, catalystApiURL string) *httprouter.Router {
router := httprouter.New()
withLogging := middleware.LogRequest()
withAuth := middleware.IsAuthorized
Expand Down Expand Up @@ -128,6 +129,10 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato

// Handler to forward the user event from Catalyst => Catalyst API
router.POST("/api/serf/receiveUserEvent", withLogging(eventsHandler.ReceiveUserEvent()))
} else {
// Endpoint to receive "Triggers" (callbacks) from Mist and redirect them to the standalone Catalyst API
mistTriggerHandlerEndpoint := fmt.Sprintf("%s/api/mist/trigger", catalystApiURL)
router.POST("/api/mist/trigger", withLogging(handlers.ProxyRequest(mistTriggerHandlerEndpoint)))
}

metricsHandlers = append(metricsHandlers, promhttp.Handler())
Expand All @@ -142,7 +147,7 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato
// Public handler to propagate an event to all Catalyst nodes, execute from Studio API => Catalyst
router.POST("/api/events", withLogging(eventsHandler.Events()))
} else {
router.POST("/api/events", withLogging(eventsHandler.ProxyEvents()))
router.POST("/api/events", withLogging(handlers.ProxyRequest(eventsEndpoint)))
}

return router
Expand Down
25 changes: 0 additions & 25 deletions handlers/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,31 +79,6 @@ func (d *EventsHandlersCollection) Events() httprouter.Handle {
}
}

// ProxyEvents is a handler of Catalyst API called by Studio API.
// It proxies the requests to Catalyst.
func (d *EventsHandlersCollection) ProxyEvents() httprouter.Handle {
// Proxy the request to d.eventsEndpoint
return func(w http.ResponseWriter, req *http.Request, _ httprouter.Params) {
// Create a new request to the target endpoint
proxyReq, err := http.NewRequest(req.Method, d.eventsEndpoint, req.Body)
if err != nil {
glog.Errorf("Cannot create proxy request: %s", err)
errors.WriteHTTPInternalServerError(w, "Cannot create proxy request", err)
return
}

// Send the request to the target endpoint
client := &http.Client{}
resp, err := client.Do(proxyReq)
if err != nil {
glog.Errorf("Cannot send proxy request: %s", err)
errors.WriteHTTPInternalServerError(w, "Cannot send proxy request", err)
return
}
defer resp.Body.Close()
}
}

// ReceiveUserEvent is a handler to receive Serf events from Catalyst.
// The idea is that:
// 1. Studio API sends an event to Catalyst (received by Events() handler)
Expand Down
14 changes: 0 additions & 14 deletions handlers/misttriggers/trigger_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ package misttriggers

import (
"context"
"fmt"
"sync"

"github.com/golang/glog"
"github.com/livepeer/catalyst-api/clients"
"golang.org/x/sync/errgroup"
)

Expand All @@ -29,8 +27,6 @@ import (
// handler for these sorts of triggers.

type TriggerBroker interface {
SetupMistTriggers(clients.MistAPIClient, string) error

OnStreamBuffer(func(context.Context, *StreamBufferPayload) error)
TriggerStreamBuffer(context.Context, *StreamBufferPayload)

Expand Down Expand Up @@ -87,16 +83,6 @@ var triggers = map[string]bool{
TRIGGER_STREAM_SOURCE: true,
}

func (b *triggerBroker) SetupMistTriggers(mist clients.MistAPIClient, triggerCallback string) error {
for name, sync := range triggers {
err := mist.AddTrigger([]string{}, name, triggerCallback, sync)
if err != nil {
return fmt.Errorf("error setting up mist trigger trigger=%s error=%w", name, err)
}
}
return nil
}

func (b *triggerBroker) OnStreamBuffer(cb func(context.Context, *StreamBufferPayload) error) {
b.streamBufferFuncs.RegisterNoResponse(cb)
}
Expand Down
48 changes: 48 additions & 0 deletions handlers/proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package handlers

import (
"io"
"net/http"

"github.com/golang/glog"
"github.com/julienschmidt/httprouter"
"github.com/livepeer/catalyst-api/errors"
)

// ProxyRequest proxies a request to a target endpoint
func ProxyRequest(targetEndpoint string) httprouter.Handle {
return func(w http.ResponseWriter, req *http.Request, _ httprouter.Params) {
// Create a new request to the target endpoint
proxyReq, err := http.NewRequest(req.Method, targetEndpoint, req.Body)
if err != nil {
glog.Errorf("Cannot create proxy request: %s", err)
errors.WriteHTTPInternalServerError(w, "Cannot create proxy request", err)
return
}
for k, v := range req.Header {
proxyReq.Header.Set(k, v[0])
}

// Send the request to the target endpoint
client := &http.Client{}
resp, err := client.Do(proxyReq)
if err != nil {
glog.Errorf("Cannot send proxy request: %s", err)
errors.WriteHTTPInternalServerError(w, "Cannot send proxy request", err)
return
}
defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
if err != nil {
glog.Errorf("Cannot read response body: %s", err)
errors.WriteHTTPInternalServerError(w, "Cannot read response body", err)
return
}
for k, v := range resp.Header {
w.Header()[k] = v
}
w.WriteHeader(resp.StatusCode)
w.Write(body) // nolint:errcheck
}
}
15 changes: 2 additions & 13 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,17 +317,6 @@ func main() {
}

if cli.IsClusterMode() {
// Configure Mist Triggers
if cli.MistEnabled && cli.MistTriggerSetup {
mistTriggerHandlerEndpoint := fmt.Sprintf("%s/api/mist/trigger", catalystApiURL)
err := broker.SetupMistTriggers(mist, mistTriggerHandlerEndpoint)
if err != nil {
glog.Error("catalyst-api was unable to communicate with MistServer to set up its triggers.")
glog.Error("hint: are you trying to boot catalyst-api without Mist for development purposes? use the flag -no-mist")
glog.Fatalf("error setting up Mist triggers err=%s", err)
}
}

// Start cron style apps to run periodically
if cli.ShouldMistCleanup() {
app := "mist-cleanup.sh"
Expand Down Expand Up @@ -360,14 +349,14 @@ func main() {
})

group.Go(func() error {
return api.ListenAndServeInternal(ctx, cli, vodEngine, mapic, bal, c, broker, metricsDB, serfMembersEndpoint, cli.EventsEndpoint)
return api.ListenAndServeInternal(ctx, cli, vodEngine, mapic, bal, c, broker, metricsDB, serfMembersEndpoint, cli.EventsEndpoint, catalystApiURL)
})

err = group.Wait()
glog.Infof("Shutdown complete. Reason for shutdown: %s", err)
}

func resolveCatalystApiURL(cli config.Cli) interface{} {
func resolveCatalystApiURL(cli config.Cli) string {
if cli.CatalystApiURL != "" {
return cli.CatalystApiURL
}
Expand Down

0 comments on commit 861f99c

Please sign in to comment.