Skip to content

Commit

Permalink
Revert recent PRs (fix Catalyst build failure) (#1338)
Browse files Browse the repository at this point in the history
  • Loading branch information
leszko authored Jul 23, 2024
1 parent 6f8ea1f commit 5eab596
Show file tree
Hide file tree
Showing 13 changed files with 367 additions and 411 deletions.
9 changes: 5 additions & 4 deletions api/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/golang/glog"
"github.com/julienschmidt/httprouter"
"github.com/livepeer/catalyst-api/balancer"
"github.com/livepeer/catalyst-api/cluster"
"github.com/livepeer/catalyst-api/config"
"github.com/livepeer/catalyst-api/handlers"
"github.com/livepeer/catalyst-api/handlers/analytics"
Expand All @@ -20,8 +21,8 @@ import (
"github.com/livepeer/go-api-client"
)

func ListenAndServe(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coordinator, bal balancer.Balancer, mapic mistapiconnector.IMac) error {
router := NewCatalystAPIRouter(cli, vodEngine, bal, mapic)
func ListenAndServe(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coordinator, bal balancer.Balancer, c cluster.Cluster, mapic mistapiconnector.IMac) error {
router := NewCatalystAPIRouter(cli, vodEngine, bal, c, mapic)
server := http.Server{Addr: cli.HTTPAddress, Handler: router}
ctx, cancel := context.WithCancel(ctx)

Expand All @@ -47,7 +48,7 @@ func ListenAndServe(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coo
return server.Shutdown(ctx)
}

func NewCatalystAPIRouter(cli config.Cli, vodEngine *pipeline.Coordinator, bal balancer.Balancer, mapic mistapiconnector.IMac) *httprouter.Router {
func NewCatalystAPIRouter(cli config.Cli, vodEngine *pipeline.Coordinator, bal balancer.Balancer, c cluster.Cluster, mapic mistapiconnector.IMac) *httprouter.Router {
router := httprouter.New()
withLogging := middleware.LogRequest()
withCORS := middleware.AllowCORS()
Expand All @@ -58,7 +59,7 @@ func NewCatalystAPIRouter(cli config.Cli, vodEngine *pipeline.Coordinator, bal b
AccessToken: cli.APIToken,
})
catalystApiHandlers := &handlers.CatalystAPIHandlersCollection{VODEngine: vodEngine}
geoHandlers := geolocation.NewGeolocationHandlersCollection(bal, cli, lapi)
geoHandlers := geolocation.NewGeolocationHandlersCollection(bal, c, cli, lapi)

router.GET("/ok", withLogging(catalystApiHandlers.Ok()))
router.GET("/healthcheck", withLogging(catalystApiHandlers.Healthcheck()))
Expand Down
86 changes: 40 additions & 46 deletions api/http_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato
Server: cli.APIServer,
AccessToken: cli.APIToken,
})
geoHandlers := geolocation.NewGeolocationHandlersCollection(bal, cli, lapi)
geoHandlers := geolocation.NewGeolocationHandlersCollection(bal, c, cli, lapi)

spkiPublicKey, _ := crypto.ConvertToSpki(cli.VodDecryptPublicKey)

catalystApiHandlers := &handlers.CatalystAPIHandlersCollection{VODEngine: vodEngine}
eventsHandler := handlers.NewEventsHandlersCollection(c, mapic, bal)
eventsHandler := &handlers.EventsHandlersCollection{Cluster: c}
ffmpegSegmentingHandlers := &ffmpeg.HandlersCollection{VODEngine: vodEngine}
accessControlHandlers := accesscontrol.NewAccessControlHandlersCollection(cli, mapic)
analyticsHandlers := analytics.NewAnalyticsHandler(metricsDB)
Expand All @@ -84,60 +84,54 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato
// Simple endpoint for healthchecks
router.GET("/ok", withLogging(catalystApiHandlers.Ok()))

if cli.IsClusterMode() {
// Handler to get members Catalyst API => Catalyst
router.GET("/api/serf/members", withLogging(adminHandlers.MembersHandler()))
// Public handler to propagate an event to all Catalyst nodes, execute from Studio API => Catalyst
router.POST("/api/events", withLogging(eventsHandler.Events()))
var metricsHandlers []http.Handler
if cli.ShouldMapic() {
metricsHandlers = append(metricsHandlers, mapic.MetricsHandler())
}

if cli.IsApiMode() {
var metricsHandlers []http.Handler
if cli.ShouldMapic() {
metricsHandlers = append(metricsHandlers, mapic.MetricsHandler())
}
if cli.MistPrometheus != "" {
// Enable Mist metrics enrichment
metricsHandlers = append(metricsHandlers, mapic.MistMetricsHandler())
}
metricsHandlers = append(metricsHandlers, promhttp.Handler())
// Hacky combined metrics handler. To be refactored away with mapic.
router.GET("/metrics", concatHandlers(metricsHandlers...))

// Public Catalyst API
router.POST("/api/vod",
withLogging(
withAuth(
cli.APIToken,
withCapacityChecking(
vodEngine,
catalystApiHandlers.UploadVOD(),
),
if cli.MistPrometheus != "" {
// Enable Mist metrics enrichment
metricsHandlers = append(metricsHandlers, mapic.MistMetricsHandler())
}
metricsHandlers = append(metricsHandlers, promhttp.Handler())
// Hacky combined metrics handler. To be refactored away with mapic.
router.GET("/metrics", concatHandlers(metricsHandlers...))

// Public Catalyst API
router.POST("/api/vod",
withLogging(
withAuth(
cli.APIToken,
withCapacityChecking(
vodEngine,
catalystApiHandlers.UploadVOD(),
),
),
)
),
)

// Handler to forward the user event from Catalyst => Catalyst API
router.POST("/api/serf/receiveUserEvent", withLogging(eventsHandler.ReceiveUserEvent()))
// Public handler to propagate an event to all Catalyst nodes
router.POST("/api/events", withLogging(eventsHandler.Events()))

// Public GET handler to retrieve the public key for vod encryption
router.GET("/api/pubkey", withLogging(encryptionHandlers.PublicKeyHandler()))
// Public GET handler to retrieve the public key for vod encryption
router.GET("/api/pubkey", withLogging(encryptionHandlers.PublicKeyHandler()))

// Endpoint to receive "Triggers" (callbacks) from Mist
router.POST("/api/mist/trigger", withLogging(mistCallbackHandlers.Trigger()))
// Endpoint to receive "Triggers" (callbacks) from Mist
router.POST("/api/mist/trigger", withLogging(mistCallbackHandlers.Trigger()))

// Handler for STREAM_SOURCE triggers
broker.OnStreamSource(geoHandlers.HandleStreamSource)
// Handler for STREAM_SOURCE triggers
broker.OnStreamSource(geoHandlers.HandleStreamSource)

// Handler for USER_NEW triggers
broker.OnUserNew(accessControlHandlers.HandleUserNew)
// Handler for USER_NEW triggers
broker.OnUserNew(accessControlHandlers.HandleUserNew)

// Handler for USER_END triggers.
broker.OnUserEnd(analyticsHandlers.HandleUserEnd)
// Handler for USER_END triggers.
broker.OnUserEnd(analyticsHandlers.HandleUserEnd)

// Endpoint to receive segments and manifests that ffmpeg produces
router.POST("/api/ffmpeg/:id/:filename", withLogging(ffmpegSegmentingHandlers.NewFile()))
}
// Endpoint to receive segments and manifests that ffmpeg produces
router.POST("/api/ffmpeg/:id/:filename", withLogging(ffmpegSegmentingHandlers.NewFile()))

// Temporary endpoint for admin queries
router.GET("/admin/members", withLogging(adminHandlers.MembersHandler()))

return router
}
Expand Down
27 changes: 8 additions & 19 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,25 +182,10 @@ func (c *ClusterImpl) retryJoin(ctx context.Context) {
}

func (c *ClusterImpl) MembersFiltered(filter map[string]string, status, name string) ([]Member, error) {
return FilterMembers(toClusterMembers(c.serf.Members()), filter, status, name)
}

func toClusterMembers(members []serf.Member) []Member {
var nodes []Member
for _, member := range members {
nodes = append(nodes, Member{
Name: member.Name,
Tags: member.Tags,
Status: member.Status.String(),
})
}
return nodes
}

func FilterMembers(all []Member, filter map[string]string, status string, name string) ([]Member, error) {
var nodes []Member
all := c.serf.Members()
nodes := []Member{}
for _, member := range all {
if status != "" && status != member.Status {
if status != "" && status != member.Status.String() {
continue
}
if name != "" && name != member.Name {
Expand All @@ -215,7 +200,11 @@ func FilterMembers(all []Member, filter map[string]string, status string, name s
}
}
if matches {
nodes = append(nodes, member)
nodes = append(nodes, Member{
Name: member.Name,
Tags: member.Tags,
Status: member.Status.String(),
})
}
}
return nodes, nil
Expand Down
11 changes: 0 additions & 11 deletions config/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ type Cli struct {
MistUser string
MistPassword string
MistPrometheus string
Mode string
MistPort int
MistConnectTimeout time.Duration
MistStreamSource string
Expand Down Expand Up @@ -72,8 +71,6 @@ type Cli struct {
KafkaUser string
KafkaPassword string
AnalyticsKafkaTopic string
SerfMembersEndpoint string
CatalystApiURL string

// mapping playbackId to value between 0.0 to 100.0
CdnRedirectPlaybackPct map[string]float64
Expand Down Expand Up @@ -113,14 +110,6 @@ func (cli *Cli) ShouldMapic() bool {
return cli.APIServer != ""
}

func (cli *Cli) IsClusterMode() bool {
return cli.Mode == "cluster-only" || cli.Mode == "all"
}

func (cli *Cli) IsApiMode() bool {
return cli.Mode == "api-only" || cli.Mode == "all"
}

// Should we enable mist-cleanup script to run periodically and delete leaky shm?
func (cli *Cli) ShouldMistCleanup() bool {
return cli.MistCleanup
Expand Down
9 changes: 5 additions & 4 deletions handlers/analytics/log_ext_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,16 @@ type ExternalData struct {
type ExternalDataFetcher struct {
streamCache mistapiconnector.IStreamCache
lapi *api.Client

cache map[string]ExternalData
mu sync.RWMutex
lapiCached *mistapiconnector.ApiClientCached
cache map[string]ExternalData
mu sync.RWMutex
}

func NewExternalDataFetcher(streamCache mistapiconnector.IStreamCache, lapi *api.Client) *ExternalDataFetcher {
return &ExternalDataFetcher{
streamCache: streamCache,
lapi: lapi,
lapiCached: mistapiconnector.NewApiClientCached(lapi),
cache: make(map[string]ExternalData),
}
}
Expand Down Expand Up @@ -62,7 +63,7 @@ func (e *ExternalDataFetcher) Fetch(playbackID string) (ExternalData, error) {
}

// Not found in any cache, try querying Studio API to get Stream
stream, streamErr := e.lapi.GetStreamByPlaybackID(playbackID)
stream, streamErr := e.lapiCached.GetStreamByPlaybackID(playbackID)
if streamErr == nil {
return e.extDataFromStream(playbackID, stream)
}
Expand Down
78 changes: 6 additions & 72 deletions handlers/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,43 +3,25 @@ package handlers
import (
"encoding/json"
"fmt"
"github.com/golang/glog"
"github.com/hashicorp/serf/serf"
"github.com/julienschmidt/httprouter"
"github.com/livepeer/catalyst-api/balancer"
"github.com/livepeer/catalyst-api/cluster"
"github.com/livepeer/catalyst-api/errors"
"github.com/livepeer/catalyst-api/events"
mistapiconnector "github.com/livepeer/catalyst-api/mapic"
"github.com/xeipuuv/gojsonschema"
"io"
"net/http"
"strings"
)

type EventsHandlersCollection struct {
cluster cluster.Cluster

mapic mistapiconnector.IMac
bal balancer.Balancer
}

type Event struct {
Resource string `json:"resource"`
PlaybackID string `json:"playback_id"`
Cluster cluster.Cluster
}

func NewEventsHandlersCollection(cluster cluster.Cluster, mapic mistapiconnector.IMac, bal balancer.Balancer) *EventsHandlersCollection {
return &EventsHandlersCollection{
cluster: cluster,
mapic: mapic,
bal: bal,
func (d *EventsHandlersCollection) Events() httprouter.Handle {
type Event struct {
Resource string `json:"resource"`
PlaybackID string `json:"playback_id"`
}
}

// Events is a handler called by Studio API to send an event, e.g., to refresh a stream or nuke a stream.
// This event is then propagated to all Serf nodes and then forwarded to catalyst-api and handled by ReceiveUserEvent().
func (d *EventsHandlersCollection) Events() httprouter.Handle {
schema := inputSchemasCompiled["Event"]
return func(w http.ResponseWriter, req *http.Request, _ httprouter.Params) {
payload, err := io.ReadAll(req.Body)
Expand All @@ -62,7 +44,7 @@ func (d *EventsHandlersCollection) Events() httprouter.Handle {
return
}

err = d.cluster.BroadcastEvent(serf.UserEvent{
err = d.Cluster.BroadcastEvent(serf.UserEvent{
Name: fmt.Sprintf("%s-%s", event.Resource, event.PlaybackID),
Payload: payload,
Coalesce: true,
Expand All @@ -74,51 +56,3 @@ func (d *EventsHandlersCollection) Events() httprouter.Handle {
}
}
}

// ReceiveUserEvent is a handler to receive Serf events from Catalyst.
// The idea is that:
// 1. Studio API sends an event to Catalyst (received by Events() handler)
// 2. Events() handler propagates the event to all Serf nodes
// 3. Each Serf node sends the event to its corresponding catalyst-api instance (to the ReceiveUserEvent() handler)
// 4. ReceiveUserEvent() handler processes the event
func (c *EventsHandlersCollection) ReceiveUserEvent() httprouter.Handle {
return func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
userEventPayload, err := io.ReadAll(r.Body)
if err != nil {
glog.Errorf("cannot read payload: %s", err)
return
}
e, err := events.Unmarshal(userEventPayload)
if err != nil {
glog.Errorf("cannot unmarshal received serf event %v: %s", userEventPayload, err)
return
}
switch event := e.(type) {
case *events.StreamEvent:
glog.V(5).Infof("received serf StreamEvent: %v", event.PlaybackID)
c.mapic.RefreshStreamIfNeeded(event.PlaybackID)
case *events.NukeEvent:
glog.V(5).Infof("received serf NukeEvent: %v", event.PlaybackID)
c.mapic.NukeStream(event.PlaybackID)
return
case *events.StopSessionsEvent:
glog.V(5).Infof("received serf StopSessionsEvent: %v", event.PlaybackID)
c.mapic.StopSessions(event.PlaybackID)
return
case *events.NodeUpdateEvent:
if glog.V(5) {
glog.Infof("received serf NodeUpdateEvent. Node: %s. Length: %d bytes. Ingest Streams: %v. Non-Ingest Streams: %v", event.NodeID, len(userEventPayload), strings.Join(event.GetIngestStreams(), ","), strings.Join(event.GetStreams(), ","))
}

c.bal.UpdateNodes(event.NodeID, event.NodeMetrics)
for _, stream := range event.GetStreams() {
c.bal.UpdateStreams(event.NodeID, stream, false)
}
for _, stream := range event.GetIngestStreams() {
c.bal.UpdateStreams(event.NodeID, stream, true)
}
default:
glog.Errorf("unsupported serf event: %v", e)
}
}
}
Loading

0 comments on commit 5eab596

Please sign in to comment.