Skip to content

Commit

Permalink
Change Serf communication from local to http calls
Browse files Browse the repository at this point in the history
  • Loading branch information
leszko committed Jul 24, 2024
1 parent 7af70c9 commit 14a9dc3
Show file tree
Hide file tree
Showing 10 changed files with 238 additions and 76 deletions.
9 changes: 4 additions & 5 deletions api/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/golang/glog"
"github.com/julienschmidt/httprouter"
"github.com/livepeer/catalyst-api/balancer"
"github.com/livepeer/catalyst-api/cluster"
"github.com/livepeer/catalyst-api/config"
"github.com/livepeer/catalyst-api/handlers"
"github.com/livepeer/catalyst-api/handlers/analytics"
Expand All @@ -21,8 +20,8 @@ import (
"github.com/livepeer/go-api-client"
)

func ListenAndServe(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coordinator, bal balancer.Balancer, c cluster.Cluster, mapic mistapiconnector.IMac) error {
router := NewCatalystAPIRouter(cli, vodEngine, bal, c, mapic)
func ListenAndServe(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coordinator, bal balancer.Balancer, mapic mistapiconnector.IMac, serfMembersEndpoint string) error {
router := NewCatalystAPIRouter(cli, vodEngine, bal, mapic, serfMembersEndpoint)
server := http.Server{Addr: cli.HTTPAddress, Handler: router}
ctx, cancel := context.WithCancel(ctx)

Expand All @@ -48,7 +47,7 @@ func ListenAndServe(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coo
return server.Shutdown(ctx)
}

func NewCatalystAPIRouter(cli config.Cli, vodEngine *pipeline.Coordinator, bal balancer.Balancer, c cluster.Cluster, mapic mistapiconnector.IMac) *httprouter.Router {
func NewCatalystAPIRouter(cli config.Cli, vodEngine *pipeline.Coordinator, bal balancer.Balancer, mapic mistapiconnector.IMac, serfMembersEndpoint string) *httprouter.Router {
router := httprouter.New()
withLogging := middleware.LogRequest()
withCORS := middleware.AllowCORS()
Expand All @@ -59,7 +58,7 @@ func NewCatalystAPIRouter(cli config.Cli, vodEngine *pipeline.Coordinator, bal b
AccessToken: cli.APIToken,
})
catalystApiHandlers := &handlers.CatalystAPIHandlersCollection{VODEngine: vodEngine}
geoHandlers := geolocation.NewGeolocationHandlersCollection(bal, c, cli, lapi)
geoHandlers := geolocation.NewGeolocationHandlersCollection(bal, cli, lapi, serfMembersEndpoint)

router.GET("/ok", withLogging(catalystApiHandlers.Ok()))
router.GET("/healthcheck", withLogging(catalystApiHandlers.Healthcheck()))
Expand Down
19 changes: 11 additions & 8 deletions api/http_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
)

func ListenAndServeInternal(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coordinator, mapic mistapiconnector.IMac, bal balancer.Balancer, c cluster.Cluster, broker misttriggers.TriggerBroker, metricsDB *sql.DB) error {
router := NewCatalystAPIRouterInternal(cli, vodEngine, mapic, bal, c, broker, metricsDB)
func ListenAndServeInternal(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coordinator, mapic mistapiconnector.IMac, bal balancer.Balancer, c cluster.Cluster, broker misttriggers.TriggerBroker, metricsDB *sql.DB, serfMembersEndpoint string) error {
router := NewCatalystAPIRouterInternal(cli, vodEngine, mapic, bal, c, broker, metricsDB, serfMembersEndpoint)
server := http.Server{Addr: cli.HTTPInternalAddress, Handler: router}
ctx, cancel := context.WithCancel(ctx)

Expand All @@ -56,7 +56,7 @@ func ListenAndServeInternal(ctx context.Context, cli config.Cli, vodEngine *pipe
return server.Shutdown(ctx)
}

func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinator, mapic mistapiconnector.IMac, bal balancer.Balancer, c cluster.Cluster, broker misttriggers.TriggerBroker, metricsDB *sql.DB) *httprouter.Router {
func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinator, mapic mistapiconnector.IMac, bal balancer.Balancer, c cluster.Cluster, broker misttriggers.TriggerBroker, metricsDB *sql.DB, serfMembersEndpoint string) *httprouter.Router {
router := httprouter.New()
withLogging := middleware.LogRequest()
withAuth := middleware.IsAuthorized
Expand All @@ -68,12 +68,12 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato
Server: cli.APIServer,
AccessToken: cli.APIToken,
})
geoHandlers := geolocation.NewGeolocationHandlersCollection(bal, c, cli, lapi)
geoHandlers := geolocation.NewGeolocationHandlersCollection(bal, cli, lapi, serfMembersEndpoint)

spkiPublicKey, _ := crypto.ConvertToSpki(cli.VodDecryptPublicKey)

catalystApiHandlers := &handlers.CatalystAPIHandlersCollection{VODEngine: vodEngine}
eventsHandler := &handlers.EventsHandlersCollection{Cluster: c}
eventsHandler := handlers.NewEventsHandlersCollection(c, mapic, bal)
ffmpegSegmentingHandlers := &ffmpeg.HandlersCollection{VODEngine: vodEngine}
accessControlHandlers := accesscontrol.NewAccessControlHandlersCollection(cli, mapic)
analyticsHandlers := analytics.NewAnalyticsHandler(metricsDB)
Expand Down Expand Up @@ -109,9 +109,6 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato
),
)

// Public handler to propagate an event to all Catalyst nodes
router.POST("/api/events", withLogging(eventsHandler.Events()))

// Public GET handler to retrieve the public key for vod encryption
router.GET("/api/pubkey", withLogging(encryptionHandlers.PublicKeyHandler()))

Expand All @@ -132,7 +129,13 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato

// Temporary endpoint for admin queries
router.GET("/admin/members", withLogging(adminHandlers.MembersHandler()))
// Handler to get members Catalyst API => Catalyst
router.GET("/api/serf/members", withLogging(adminHandlers.MembersHandler()))
// Public handler to propagate an event to all Catalyst nodes, execute from Studio API => Catalyst
router.POST("/api/events", withLogging(eventsHandler.Events()))

// Handler to forward the user event from Catalyst => Catalyst API
router.POST("/api/serf/receiveUserEvent", withLogging(eventsHandler.ReceiveUserEvent()))
return router
}

Expand Down
27 changes: 19 additions & 8 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,25 @@ func (c *ClusterImpl) retryJoin(ctx context.Context) {
}

func (c *ClusterImpl) MembersFiltered(filter map[string]string, status, name string) ([]Member, error) {
all := c.serf.Members()
nodes := []Member{}
return FilterMembers(toClusterMembers(c.serf.Members()), filter, status, name)
}

func toClusterMembers(members []serf.Member) []Member {
var nodes []Member
for _, member := range members {
nodes = append(nodes, Member{
Name: member.Name,
Tags: member.Tags,
Status: member.Status.String(),
})
}
return nodes
}

func FilterMembers(all []Member, filter map[string]string, status string, name string) ([]Member, error) {
var nodes []Member
for _, member := range all {
if status != "" && status != member.Status.String() {
if status != "" && status != member.Status {
continue
}
if name != "" && name != member.Name {
Expand All @@ -200,11 +215,7 @@ func (c *ClusterImpl) MembersFiltered(filter map[string]string, status, name str
}
}
if matches {
nodes = append(nodes, Member{
Name: member.Name,
Tags: member.Tags,
Status: member.Status.String(),
})
nodes = append(nodes, member)
}
}
return nodes, nil
Expand Down
1 change: 1 addition & 0 deletions config/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type Cli struct {
KafkaUser string
KafkaPassword string
AnalyticsKafkaTopic string
SerfMembersEndpoint string

// mapping playbackId to value between 0.0 to 100.0
CdnRedirectPlaybackPct map[string]float64
Expand Down
78 changes: 72 additions & 6 deletions handlers/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,43 @@ package handlers
import (
"encoding/json"
"fmt"
"github.com/golang/glog"
"github.com/hashicorp/serf/serf"
"github.com/julienschmidt/httprouter"
"github.com/livepeer/catalyst-api/balancer"
"github.com/livepeer/catalyst-api/cluster"
"github.com/livepeer/catalyst-api/errors"
"github.com/livepeer/catalyst-api/events"
mistapiconnector "github.com/livepeer/catalyst-api/mapic"
"github.com/xeipuuv/gojsonschema"
"io"
"net/http"
"strings"
)

type EventsHandlersCollection struct {
Cluster cluster.Cluster
cluster cluster.Cluster

mapic mistapiconnector.IMac
bal balancer.Balancer
}

func (d *EventsHandlersCollection) Events() httprouter.Handle {
type Event struct {
Resource string `json:"resource"`
PlaybackID string `json:"playback_id"`
type Event struct {
Resource string `json:"resource"`
PlaybackID string `json:"playback_id"`
}

func NewEventsHandlersCollection(cluster cluster.Cluster, mapic mistapiconnector.IMac, bal balancer.Balancer) *EventsHandlersCollection {
return &EventsHandlersCollection{
cluster: cluster,
mapic: mapic,
bal: bal,
}
}

// Events is a handler called by Studio API to send an event, e.g., to refresh a stream or nuke a stream.
// This event is then propagated to all Serf nodes and then forwarded to catalyst-api and handled by ReceiveUserEvent().
func (d *EventsHandlersCollection) Events() httprouter.Handle {
schema := inputSchemasCompiled["Event"]
return func(w http.ResponseWriter, req *http.Request, _ httprouter.Params) {
payload, err := io.ReadAll(req.Body)
Expand All @@ -44,7 +62,7 @@ func (d *EventsHandlersCollection) Events() httprouter.Handle {
return
}

err = d.Cluster.BroadcastEvent(serf.UserEvent{
err = d.cluster.BroadcastEvent(serf.UserEvent{
Name: fmt.Sprintf("%s-%s", event.Resource, event.PlaybackID),
Payload: payload,
Coalesce: true,
Expand All @@ -56,3 +74,51 @@ func (d *EventsHandlersCollection) Events() httprouter.Handle {
}
}
}

// ReceiveUserEvent is a handler to receive Serf events from Catalyst.
// The idea is that:
// 1. Studio API sends an event to Catalyst (received by Events() handler)
// 2. Events() handler propagates the event to all Serf nodes
// 3. Each Serf node sends tne event to its corresponding catalyst-api instance (to the ReceiveUserEvent() handler)
// 4. ReceiveUserEvent() handler processes the event
func (c *EventsHandlersCollection) ReceiveUserEvent() httprouter.Handle {
return func(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
userEventPayload, err := io.ReadAll(r.Body)
if err != nil {
glog.Errorf("cannot read payload: %s", err)
return
}
e, err := events.Unmarshal(userEventPayload)
if err != nil {
glog.Errorf("cannot unmarshal received serf event %v: %s", userEventPayload, err)
return
}
switch event := e.(type) {
case *events.StreamEvent:
glog.V(5).Infof("received serf StreamEvent: %v", event.PlaybackID)
c.mapic.RefreshStreamIfNeeded(event.PlaybackID)
case *events.NukeEvent:
glog.V(5).Infof("received serf NukeEvent: %v", event.PlaybackID)
c.mapic.NukeStream(event.PlaybackID)
return
case *events.StopSessionsEvent:
glog.V(5).Infof("received serf StopSessionsEvent: %v", event.PlaybackID)
c.mapic.StopSessions(event.PlaybackID)
return
case *events.NodeUpdateEvent:
if glog.V(5) {
glog.Infof("received serf NodeUpdateEvent. Node: %s. Length: %d bytes. Ingest Streams: %v. Non-Ingest Streams: %v", event.NodeID, len(userEventPayload), strings.Join(event.GetIngestStreams(), ","), strings.Join(event.GetStreams(), ","))
}

c.bal.UpdateNodes(event.NodeID, event.NodeMetrics)
for _, stream := range event.GetStreams() {
c.bal.UpdateStreams(event.NodeID, stream, false)
}
for _, stream := range event.GetIngestStreams() {
c.bal.UpdateStreams(event.NodeID, stream, true)
}
default:
glog.Errorf("unsupported serf event: %v", e)
}
}
}
65 changes: 64 additions & 1 deletion handlers/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/hashicorp/serf/serf"
"github.com/julienschmidt/httprouter"
mockcluster "github.com/livepeer/catalyst-api/mocks/cluster"
mock_mistapiconnector "github.com/livepeer/catalyst-api/mocks/mistapiconnector"
"github.com/stretchr/testify/require"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -62,7 +63,7 @@ func TestEventHandler(t *testing.T) {
return nil
}).AnyTimes()

catalystApiHandlers := EventsHandlersCollection{Cluster: mc}
catalystApiHandlers := NewEventsHandlersCollection(mc, nil, nil)
router := httprouter.New()
router.POST("/events", catalystApiHandlers.Events())

Expand All @@ -74,3 +75,65 @@ func TestEventHandler(t *testing.T) {
require.Equal(rr.Result().StatusCode, tt.wantHttpCode)
}
}

func TestReceiveUserEventHandler(t *testing.T) {
require := require.New(t)
playbackId := "123456789"

tests := []struct {
name string
requestBody string
functionCalled string
}{
{
name: "Refresh Stream",
requestBody: `{
"resource": "stream",
"playback_id": "123456789"
}`,
functionCalled: "RefreshStreamIfNeeded",
},
{
name: "Nuke Stream",
requestBody: `{
"resource": "nuke",
"playback_id": "123456789"
}`,
functionCalled: "NukeStream",
},
{
name: "Stop Sessions",
requestBody: `{
"resource": "stopSessions",
"playback_id": "123456789"
}`,
functionCalled: "StopSessions",
},
}

ctrl := gomock.NewController(t)
mac := mock_mistapiconnector.NewMockIMac(ctrl)

catalystApiHandlers := NewEventsHandlersCollection(nil, mac, nil)
router := httprouter.New()
router.POST("/receiveUserEvent", catalystApiHandlers.ReceiveUserEvent())

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
switch tt.functionCalled {
case "RefreshStreamIfNeeded":
mac.EXPECT().RefreshStreamIfNeeded(playbackId).Times(1)
case "NukeStream":
mac.EXPECT().NukeStream(playbackId).Times(1)
case "StopSessions":
mac.EXPECT().StopSessions(playbackId).Times(1)
}

req, _ := http.NewRequest("POST", "/receiveUserEvent", strings.NewReader(tt.requestBody))
rr := httptest.NewRecorder()
router.ServeHTTP(rr, req)

require.Equal(rr.Result().StatusCode, 200)
})
}
}
Loading

0 comments on commit 14a9dc3

Please sign in to comment.