Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proxy Mist triggers from catalyst to catalyst-api and remove dynamic Mist Trigger Setup #1362

Merged
merged 2 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions api/http_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"bytes"
"context"
"database/sql"
"fmt"
"net/http"
"net/http/httptest"
"time"
Expand All @@ -29,8 +30,8 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
)

func ListenAndServeInternal(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coordinator, mapic mistapiconnector.IMac, bal balancer.Balancer, c cluster.Cluster, broker misttriggers.TriggerBroker, metricsDB *sql.DB, serfMembersEndpoint, eventsEndpoint string) error {
router := NewCatalystAPIRouterInternal(cli, vodEngine, mapic, bal, c, broker, metricsDB, serfMembersEndpoint, eventsEndpoint)
func ListenAndServeInternal(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coordinator, mapic mistapiconnector.IMac, bal balancer.Balancer, c cluster.Cluster, broker misttriggers.TriggerBroker, metricsDB *sql.DB, serfMembersEndpoint, eventsEndpoint string, catalystApiURL string) error {
router := NewCatalystAPIRouterInternal(cli, vodEngine, mapic, bal, c, broker, metricsDB, serfMembersEndpoint, eventsEndpoint, catalystApiURL)
server := http.Server{Addr: cli.HTTPInternalAddress, Handler: router}
ctx, cancel := context.WithCancel(ctx)

Expand All @@ -56,7 +57,7 @@ func ListenAndServeInternal(ctx context.Context, cli config.Cli, vodEngine *pipe
return server.Shutdown(ctx)
}

func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinator, mapic mistapiconnector.IMac, bal balancer.Balancer, c cluster.Cluster, broker misttriggers.TriggerBroker, metricsDB *sql.DB, serfMembersEndpoint, eventsEndpoint string) *httprouter.Router {
func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinator, mapic mistapiconnector.IMac, bal balancer.Balancer, c cluster.Cluster, broker misttriggers.TriggerBroker, metricsDB *sql.DB, serfMembersEndpoint, eventsEndpoint string, catalystApiURL string) *httprouter.Router {
router := httprouter.New()
withLogging := middleware.LogRequest()
withAuth := middleware.IsAuthorized
Expand Down Expand Up @@ -128,6 +129,10 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato

// Handler to forward the user event from Catalyst => Catalyst API
router.POST("/api/serf/receiveUserEvent", withLogging(eventsHandler.ReceiveUserEvent()))
} else {
// Endpoint to receive "Triggers" (callbacks) from Mist and redirect them to the standalone Catalyst API
mistTriggerHandlerEndpoint := fmt.Sprintf("%s/api/mist/trigger", catalystApiURL)
router.POST("/api/mist/trigger", withLogging(handlers.ProxyRequest(mistTriggerHandlerEndpoint)))
}

metricsHandlers = append(metricsHandlers, promhttp.Handler())
Expand All @@ -142,7 +147,7 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato
// Public handler to propagate an event to all Catalyst nodes, execute from Studio API => Catalyst
router.POST("/api/events", withLogging(eventsHandler.Events()))
} else {
router.POST("/api/events", withLogging(eventsHandler.ProxyEvents()))
router.POST("/api/events", withLogging(handlers.ProxyRequest(eventsEndpoint)))
}

return router
Expand Down
25 changes: 0 additions & 25 deletions handlers/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,31 +79,6 @@ func (d *EventsHandlersCollection) Events() httprouter.Handle {
}
}

// ProxyEvents is a handler of Catalyst API called by Studio API.
// It proxies the requests to Catalyst.
func (d *EventsHandlersCollection) ProxyEvents() httprouter.Handle {
// Proxy the request to d.eventsEndpoint
return func(w http.ResponseWriter, req *http.Request, _ httprouter.Params) {
// Create a new request to the target endpoint
proxyReq, err := http.NewRequest(req.Method, d.eventsEndpoint, req.Body)
if err != nil {
glog.Errorf("Cannot create proxy request: %s", err)
errors.WriteHTTPInternalServerError(w, "Cannot create proxy request", err)
return
}

// Send the request to the target endpoint
client := &http.Client{}
resp, err := client.Do(proxyReq)
if err != nil {
glog.Errorf("Cannot send proxy request: %s", err)
errors.WriteHTTPInternalServerError(w, "Cannot send proxy request", err)
return
}
defer resp.Body.Close()
}
}

// ReceiveUserEvent is a handler to receive Serf events from Catalyst.
// The idea is that:
// 1. Studio API sends an event to Catalyst (received by Events() handler)
Expand Down
25 changes: 0 additions & 25 deletions handlers/misttriggers/trigger_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ package misttriggers

import (
"context"
"fmt"
"sync"

"github.com/golang/glog"
"github.com/livepeer/catalyst-api/clients"
"golang.org/x/sync/errgroup"
)

Expand All @@ -29,8 +27,6 @@ import (
// handler for these sorts of triggers.

type TriggerBroker interface {
SetupMistTriggers(clients.MistAPIClient, string) error

OnStreamBuffer(func(context.Context, *StreamBufferPayload) error)
TriggerStreamBuffer(context.Context, *StreamBufferPayload)

Expand Down Expand Up @@ -76,27 +72,6 @@ type triggerBroker struct {
streamSourceFuncs funcGroup[StreamSourcePayload]
}

var triggers = map[string]bool{
TRIGGER_PUSH_END: false,
TRIGGER_PUSH_OUT_START: true,
TRIGGER_PUSH_REWRITE: true,
TRIGGER_STREAM_BUFFER: false,
TRIGGER_LIVE_TRACK_LIST: false,
TRIGGER_USER_NEW: true,
TRIGGER_USER_END: false,
TRIGGER_STREAM_SOURCE: true,
}

func (b *triggerBroker) SetupMistTriggers(mist clients.MistAPIClient, triggerCallback string) error {
for name, sync := range triggers {
err := mist.AddTrigger([]string{}, name, triggerCallback, sync)
if err != nil {
return fmt.Errorf("error setting up mist trigger trigger=%s error=%w", name, err)
}
}
return nil
}

func (b *triggerBroker) OnStreamBuffer(cb func(context.Context, *StreamBufferPayload) error) {
b.streamBufferFuncs.RegisterNoResponse(cb)
}
Expand Down
48 changes: 48 additions & 0 deletions handlers/proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package handlers

import (
"io"
"net/http"

"github.com/golang/glog"
"github.com/julienschmidt/httprouter"
"github.com/livepeer/catalyst-api/errors"
)

// ProxyRequest proxies a request to a target endpoint
func ProxyRequest(targetEndpoint string) httprouter.Handle {
return func(w http.ResponseWriter, req *http.Request, _ httprouter.Params) {
// Create a new request to the target endpoint
proxyReq, err := http.NewRequest(req.Method, targetEndpoint, req.Body)
if err != nil {
glog.Errorf("Cannot create proxy request: %s", err)
errors.WriteHTTPInternalServerError(w, "Cannot create proxy request", err)
return
}
for k, v := range req.Header {
proxyReq.Header.Set(k, v[0])
}

// Send the request to the target endpoint
client := &http.Client{}
resp, err := client.Do(proxyReq)
if err != nil {
glog.Errorf("Cannot send proxy request: %s", err)
errors.WriteHTTPInternalServerError(w, "Cannot send proxy request", err)
return
}
defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
if err != nil {
glog.Errorf("Cannot read response body: %s", err)
errors.WriteHTTPInternalServerError(w, "Cannot read response body", err)
return
}
for k, v := range resp.Header {
w.Header()[k] = v
}
w.WriteHeader(resp.StatusCode)
w.Write(body) // nolint:errcheck
}
}
15 changes: 2 additions & 13 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,17 +317,6 @@ func main() {
}

if cli.IsClusterMode() {
// Configure Mist Triggers
if cli.MistEnabled && cli.MistTriggerSetup {
mistTriggerHandlerEndpoint := fmt.Sprintf("%s/api/mist/trigger", catalystApiURL)
err := broker.SetupMistTriggers(mist, mistTriggerHandlerEndpoint)
if err != nil {
glog.Error("catalyst-api was unable to communicate with MistServer to set up its triggers.")
glog.Error("hint: are you trying to boot catalyst-api without Mist for development purposes? use the flag -no-mist")
glog.Fatalf("error setting up Mist triggers err=%s", err)
}
}

// Start cron style apps to run periodically
if cli.ShouldMistCleanup() {
app := "mist-cleanup.sh"
Expand Down Expand Up @@ -360,14 +349,14 @@ func main() {
})

group.Go(func() error {
return api.ListenAndServeInternal(ctx, cli, vodEngine, mapic, bal, c, broker, metricsDB, serfMembersEndpoint, cli.EventsEndpoint)
return api.ListenAndServeInternal(ctx, cli, vodEngine, mapic, bal, c, broker, metricsDB, serfMembersEndpoint, cli.EventsEndpoint, catalystApiURL)
})

err = group.Wait()
glog.Infof("Shutdown complete. Reason for shutdown: %s", err)
}

func resolveCatalystApiURL(cli config.Cli) interface{} {
func resolveCatalystApiURL(cli config.Cli) string {
if cli.CatalystApiURL != "" {
return cli.CatalystApiURL
}
Expand Down
Loading