Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change Serf communication from local to http calls #1343

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions api/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/golang/glog"
"github.com/julienschmidt/httprouter"
"github.com/livepeer/catalyst-api/balancer"
"github.com/livepeer/catalyst-api/cluster"
"github.com/livepeer/catalyst-api/config"
"github.com/livepeer/catalyst-api/handlers"
"github.com/livepeer/catalyst-api/handlers/analytics"
Expand All @@ -21,8 +20,8 @@ import (
"github.com/livepeer/go-api-client"
)

func ListenAndServe(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coordinator, bal balancer.Balancer, c cluster.Cluster, mapic mistapiconnector.IMac) error {
router := NewCatalystAPIRouter(cli, vodEngine, bal, c, mapic)
func ListenAndServe(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coordinator, bal balancer.Balancer, mapic mistapiconnector.IMac, serfMembersEndpoint string) error {
router := NewCatalystAPIRouter(cli, vodEngine, bal, mapic, serfMembersEndpoint)
server := http.Server{Addr: cli.HTTPAddress, Handler: router}
ctx, cancel := context.WithCancel(ctx)

Expand All @@ -48,7 +47,7 @@ func ListenAndServe(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coo
return server.Shutdown(ctx)
}

func NewCatalystAPIRouter(cli config.Cli, vodEngine *pipeline.Coordinator, bal balancer.Balancer, c cluster.Cluster, mapic mistapiconnector.IMac) *httprouter.Router {
func NewCatalystAPIRouter(cli config.Cli, vodEngine *pipeline.Coordinator, bal balancer.Balancer, mapic mistapiconnector.IMac, serfMembersEndpoint string) *httprouter.Router {
router := httprouter.New()
withLogging := middleware.LogRequest()
withCORS := middleware.AllowCORS()
Expand All @@ -59,7 +58,7 @@ func NewCatalystAPIRouter(cli config.Cli, vodEngine *pipeline.Coordinator, bal b
AccessToken: cli.APIToken,
})
catalystApiHandlers := &handlers.CatalystAPIHandlersCollection{VODEngine: vodEngine}
geoHandlers := geolocation.NewGeolocationHandlersCollection(bal, c, cli, lapi)
geoHandlers := geolocation.NewGeolocationHandlersCollection(bal, cli, lapi, serfMembersEndpoint)

router.GET("/ok", withLogging(catalystApiHandlers.Ok()))
router.GET("/healthcheck", withLogging(catalystApiHandlers.Healthcheck()))
Expand Down
94 changes: 51 additions & 43 deletions api/http_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
)

func ListenAndServeInternal(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coordinator, mapic mistapiconnector.IMac, bal balancer.Balancer, c cluster.Cluster, broker misttriggers.TriggerBroker, metricsDB *sql.DB) error {
router := NewCatalystAPIRouterInternal(cli, vodEngine, mapic, bal, c, broker, metricsDB)
func ListenAndServeInternal(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coordinator, mapic mistapiconnector.IMac, bal balancer.Balancer, c cluster.Cluster, broker misttriggers.TriggerBroker, metricsDB *sql.DB, serfMembersEndpoint string) error {
router := NewCatalystAPIRouterInternal(cli, vodEngine, mapic, bal, c, broker, metricsDB, serfMembersEndpoint)
server := http.Server{Addr: cli.HTTPInternalAddress, Handler: router}
ctx, cancel := context.WithCancel(ctx)

Expand All @@ -56,7 +56,7 @@ func ListenAndServeInternal(ctx context.Context, cli config.Cli, vodEngine *pipe
return server.Shutdown(ctx)
}

func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinator, mapic mistapiconnector.IMac, bal balancer.Balancer, c cluster.Cluster, broker misttriggers.TriggerBroker, metricsDB *sql.DB) *httprouter.Router {
func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinator, mapic mistapiconnector.IMac, bal balancer.Balancer, c cluster.Cluster, broker misttriggers.TriggerBroker, metricsDB *sql.DB, serfMembersEndpoint string) *httprouter.Router {
router := httprouter.New()
withLogging := middleware.LogRequest()
withAuth := middleware.IsAuthorized
Expand All @@ -68,12 +68,12 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato
Server: cli.APIServer,
AccessToken: cli.APIToken,
})
geoHandlers := geolocation.NewGeolocationHandlersCollection(bal, c, cli, lapi)
geoHandlers := geolocation.NewGeolocationHandlersCollection(bal, cli, lapi, serfMembersEndpoint)

spkiPublicKey, _ := crypto.ConvertToSpki(cli.VodDecryptPublicKey)

catalystApiHandlers := &handlers.CatalystAPIHandlersCollection{VODEngine: vodEngine}
eventsHandler := &handlers.EventsHandlersCollection{Cluster: c}
eventsHandler := handlers.NewEventsHandlersCollection(c, mapic, bal)
ffmpegSegmentingHandlers := &ffmpeg.HandlersCollection{VODEngine: vodEngine}
accessControlHandlers := accesscontrol.NewAccessControlHandlersCollection(cli, mapic)
analyticsHandlers := analytics.NewAnalyticsHandler(metricsDB)
Expand All @@ -84,54 +84,62 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato
// Simple endpoint for healthchecks
router.GET("/ok", withLogging(catalystApiHandlers.Ok()))

var metricsHandlers []http.Handler
if cli.ShouldMapic() {
metricsHandlers = append(metricsHandlers, mapic.MetricsHandler())
}
if cli.MistPrometheus != "" {
// Enable Mist metrics enrichment
metricsHandlers = append(metricsHandlers, mapic.MistMetricsHandler())
}
metricsHandlers = append(metricsHandlers, promhttp.Handler())
// Hacky combined metrics handler. To be refactored away with mapic.
router.GET("/metrics", concatHandlers(metricsHandlers...))

// Public Catalyst API
router.POST("/api/vod",
withLogging(
withAuth(
cli.APIToken,
withCapacityChecking(
vodEngine,
catalystApiHandlers.UploadVOD(),
if cli.IsApiMode() {
var metricsHandlers []http.Handler
if cli.ShouldMapic() {
metricsHandlers = append(metricsHandlers, mapic.MetricsHandler())
}
if cli.MistPrometheus != "" {
// Enable Mist metrics enrichment
metricsHandlers = append(metricsHandlers, mapic.MistMetricsHandler())
}
metricsHandlers = append(metricsHandlers, promhttp.Handler())
// Hacky combined metrics handler. To be refactored away with mapic.
router.GET("/metrics", concatHandlers(metricsHandlers...))

// Public Catalyst API
router.POST("/api/vod",
withLogging(
withAuth(
cli.APIToken,
withCapacityChecking(
vodEngine,
catalystApiHandlers.UploadVOD(),
),
),
),
),
)
)

// Public handler to propagate an event to all Catalyst nodes
router.POST("/api/events", withLogging(eventsHandler.Events()))
// Public GET handler to retrieve the public key for vod encryption
router.GET("/api/pubkey", withLogging(encryptionHandlers.PublicKeyHandler()))

// Public GET handler to retrieve the public key for vod encryption
router.GET("/api/pubkey", withLogging(encryptionHandlers.PublicKeyHandler()))
// Endpoint to receive "Triggers" (callbacks) from Mist
router.POST("/api/mist/trigger", withLogging(mistCallbackHandlers.Trigger()))

// Endpoint to receive "Triggers" (callbacks) from Mist
router.POST("/api/mist/trigger", withLogging(mistCallbackHandlers.Trigger()))
// Handler for STREAM_SOURCE triggers
broker.OnStreamSource(geoHandlers.HandleStreamSource)

// Handler for STREAM_SOURCE triggers
broker.OnStreamSource(geoHandlers.HandleStreamSource)
// Handler for USER_NEW triggers
broker.OnUserNew(accessControlHandlers.HandleUserNew)

// Handler for USER_NEW triggers
broker.OnUserNew(accessControlHandlers.HandleUserNew)
// Handler for USER_END triggers.
broker.OnUserEnd(analyticsHandlers.HandleUserEnd)

// Handler for USER_END triggers.
broker.OnUserEnd(analyticsHandlers.HandleUserEnd)
// Endpoint to receive segments and manifests that ffmpeg produces
router.POST("/api/ffmpeg/:id/:filename", withLogging(ffmpegSegmentingHandlers.NewFile()))

// Endpoint to receive segments and manifests that ffmpeg produces
router.POST("/api/ffmpeg/:id/:filename", withLogging(ffmpegSegmentingHandlers.NewFile()))
// Handler to forward the user event from Catalyst => Catalyst API
router.POST("/api/serf/receiveUserEvent", withLogging(eventsHandler.ReceiveUserEvent()))
}

// Temporary endpoint for admin queries
router.GET("/admin/members", withLogging(adminHandlers.MembersHandler()))
if cli.IsClusterMode() {
// Temporary endpoint for admin queries
router.GET("/admin/members", withLogging(adminHandlers.MembersHandler()))
// Handler to get members Catalyst API => Catalyst
router.GET("/api/serf/members", withLogging(adminHandlers.MembersHandler()))
// Public handler to propagate an event to all Catalyst nodes, execute from Studio API => Catalyst
router.POST("/api/events", withLogging(eventsHandler.Events()))
}

return router
}
Expand Down
27 changes: 19 additions & 8 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,25 @@ func (c *ClusterImpl) retryJoin(ctx context.Context) {
}

func (c *ClusterImpl) MembersFiltered(filter map[string]string, status, name string) ([]Member, error) {
all := c.serf.Members()
nodes := []Member{}
return FilterMembers(toClusterMembers(c.serf.Members()), filter, status, name)
}

func toClusterMembers(members []serf.Member) []Member {
var nodes []Member
for _, member := range members {
nodes = append(nodes, Member{
Name: member.Name,
Tags: member.Tags,
Status: member.Status.String(),
})
}
return nodes
}

func FilterMembers(all []Member, filter map[string]string, status string, name string) ([]Member, error) {
var nodes []Member
for _, member := range all {
if status != "" && status != member.Status.String() {
if status != "" && status != member.Status {
continue
}
if name != "" && name != member.Name {
Expand All @@ -200,11 +215,7 @@ func (c *ClusterImpl) MembersFiltered(filter map[string]string, status, name str
}
}
if matches {
nodes = append(nodes, Member{
Name: member.Name,
Tags: member.Tags,
Status: member.Status.String(),
})
nodes = append(nodes, member)
}
}
return nodes, nil
Expand Down
11 changes: 11 additions & 0 deletions config/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Cli struct {
MistUser string
MistPassword string
MistPrometheus string
Mode string
MistPort int
MistConnectTimeout time.Duration
MistStreamSource string
Expand Down Expand Up @@ -71,6 +72,8 @@ type Cli struct {
KafkaUser string
KafkaPassword string
AnalyticsKafkaTopic string
SerfMembersEndpoint string
CatalystApiURL string

// mapping playbackId to value between 0.0 to 100.0
CdnRedirectPlaybackPct map[string]float64
Expand Down Expand Up @@ -110,6 +113,14 @@ func (cli *Cli) ShouldMapic() bool {
return cli.APIServer != ""
}

func (cli *Cli) IsClusterMode() bool {
return cli.Mode == "cluster-only" || cli.Mode == "all"
}

func (cli *Cli) IsApiMode() bool {
return cli.Mode == "api-only" || cli.Mode == "all"
}

// Should we enable mist-cleanup script to run periodically and delete leaky shm?
func (cli *Cli) ShouldMistCleanup() bool {
return cli.MistCleanup
Expand Down
78 changes: 72 additions & 6 deletions handlers/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,43 @@ package handlers
import (
"encoding/json"
"fmt"
"github.com/golang/glog"
"github.com/hashicorp/serf/serf"
"github.com/julienschmidt/httprouter"
"github.com/livepeer/catalyst-api/balancer"
"github.com/livepeer/catalyst-api/cluster"
"github.com/livepeer/catalyst-api/errors"
"github.com/livepeer/catalyst-api/events"
mistapiconnector "github.com/livepeer/catalyst-api/mapic"
"github.com/xeipuuv/gojsonschema"
"io"
"net/http"
"strings"
)

type EventsHandlersCollection struct {
Cluster cluster.Cluster
cluster cluster.Cluster

mapic mistapiconnector.IMac
bal balancer.Balancer
}

func (d *EventsHandlersCollection) Events() httprouter.Handle {
type Event struct {
Resource string `json:"resource"`
PlaybackID string `json:"playback_id"`
type Event struct {
Resource string `json:"resource"`
PlaybackID string `json:"playback_id"`
}

func NewEventsHandlersCollection(cluster cluster.Cluster, mapic mistapiconnector.IMac, bal balancer.Balancer) *EventsHandlersCollection {
return &EventsHandlersCollection{
cluster: cluster,
mapic: mapic,
bal: bal,
}
}

// Events is a handler called by Studio API to send an event, e.g., to refresh a stream or nuke a stream.
// This event is then propagated to all Serf nodes and then forwarded to catalyst-api and handled by ReceiveUserEvent().
func (d *EventsHandlersCollection) Events() httprouter.Handle {
schema := inputSchemasCompiled["Event"]
return func(w http.ResponseWriter, req *http.Request, _ httprouter.Params) {
payload, err := io.ReadAll(req.Body)
Expand All @@ -44,7 +62,7 @@ func (d *EventsHandlersCollection) Events() httprouter.Handle {
return
}

err = d.Cluster.BroadcastEvent(serf.UserEvent{
err = d.cluster.BroadcastEvent(serf.UserEvent{
Name: fmt.Sprintf("%s-%s", event.Resource, event.PlaybackID),
Payload: payload,
Coalesce: true,
Expand All @@ -56,3 +74,51 @@ func (d *EventsHandlersCollection) Events() httprouter.Handle {
}
}
}

// ReceiveUserEvent is a handler to receive Serf events from Catalyst.
// The idea is that:
// 1. Studio API sends an event to Catalyst (received by Events() handler)
// 2. Events() handler propagates the event to all Serf nodes
// 3. Each Serf node sends tne event to its corresponding catalyst-api instance (to the ReceiveUserEvent() handler)
// 4. ReceiveUserEvent() handler processes the event
func (c *EventsHandlersCollection) ReceiveUserEvent() httprouter.Handle {
return func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
userEventPayload, err := io.ReadAll(r.Body)
if err != nil {
glog.Errorf("cannot read payload: %s", err)
return
}
e, err := events.Unmarshal(userEventPayload)
if err != nil {
glog.Errorf("cannot unmarshal received serf event %v: %s", userEventPayload, err)
return
}
switch event := e.(type) {
case *events.StreamEvent:
glog.V(5).Infof("received serf StreamEvent: %v", event.PlaybackID)
c.mapic.RefreshStreamIfNeeded(event.PlaybackID)
case *events.NukeEvent:
glog.V(5).Infof("received serf NukeEvent: %v", event.PlaybackID)
c.mapic.NukeStream(event.PlaybackID)
return
case *events.StopSessionsEvent:
glog.V(5).Infof("received serf StopSessionsEvent: %v", event.PlaybackID)
c.mapic.StopSessions(event.PlaybackID)
return
case *events.NodeUpdateEvent:
if glog.V(5) {
glog.Infof("received serf NodeUpdateEvent. Node: %s. Length: %d bytes. Ingest Streams: %v. Non-Ingest Streams: %v", event.NodeID, len(userEventPayload), strings.Join(event.GetIngestStreams(), ","), strings.Join(event.GetStreams(), ","))
}

c.bal.UpdateNodes(event.NodeID, event.NodeMetrics)
for _, stream := range event.GetStreams() {
c.bal.UpdateStreams(event.NodeID, stream, false)
}
for _, stream := range event.GetIngestStreams() {
c.bal.UpdateStreams(event.NodeID, stream, true)
}
default:
glog.Errorf("unsupported serf event: %v", e)
}
}
}
Loading
Loading