Skip to content

Commit

Permalink
Add option for running catalyst-api as a stateless service (#1336)
Browse files Browse the repository at this point in the history
From now on, we'll be able to run catalyst-api in 3 modes:
- all: the same as before
- cluster-only: managed MistUtilLoad and Serf only (intended to use inside Catalyst)
- api-only: stateless API only (intended to get deployed separately from Catalyst)
  • Loading branch information
leszko authored Jul 23, 2024
1 parent ac17b28 commit 6f8ea1f
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 163 deletions.
84 changes: 44 additions & 40 deletions api/http_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,56 +84,60 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato
// Simple endpoint for healthchecks
router.GET("/ok", withLogging(catalystApiHandlers.Ok()))

var metricsHandlers []http.Handler
if cli.ShouldMapic() {
metricsHandlers = append(metricsHandlers, mapic.MetricsHandler())
if cli.IsClusterMode() {
// Handler to get members Catalyst API => Catalyst
router.GET("/api/serf/members", withLogging(adminHandlers.MembersHandler()))
// Public handler to propagate an event to all Catalyst nodes, execute from Studio API => Catalyst
router.POST("/api/events", withLogging(eventsHandler.Events()))
}
if cli.MistPrometheus != "" {
// Enable Mist metrics enrichment
metricsHandlers = append(metricsHandlers, mapic.MistMetricsHandler())
}
metricsHandlers = append(metricsHandlers, promhttp.Handler())
// Hacky combined metrics handler. To be refactored away with mapic.
router.GET("/metrics", concatHandlers(metricsHandlers...))

// Public Catalyst API
router.POST("/api/vod",
withLogging(
withAuth(
cli.APIToken,
withCapacityChecking(
vodEngine,
catalystApiHandlers.UploadVOD(),

if cli.IsApiMode() {
var metricsHandlers []http.Handler
if cli.ShouldMapic() {
metricsHandlers = append(metricsHandlers, mapic.MetricsHandler())
}
if cli.MistPrometheus != "" {
// Enable Mist metrics enrichment
metricsHandlers = append(metricsHandlers, mapic.MistMetricsHandler())
}
metricsHandlers = append(metricsHandlers, promhttp.Handler())
// Hacky combined metrics handler. To be refactored away with mapic.
router.GET("/metrics", concatHandlers(metricsHandlers...))

// Public Catalyst API
router.POST("/api/vod",
withLogging(
withAuth(
cli.APIToken,
withCapacityChecking(
vodEngine,
catalystApiHandlers.UploadVOD(),
),
),
),
),
)

// Handler to get members Catalyst API => Catalyst
router.GET("/api/serf/members", withLogging(adminHandlers.MembersHandler()))
// Public handler to propagate an event to all Catalyst nodes, execute from Studio API => Catalyst
router.POST("/api/events", withLogging(eventsHandler.Events()))
)

// Handler to forward the user event from Catalyst => Catalyst API
router.POST("/api/serf/receiveUserEvent", withLogging(eventsHandler.ReceiveUserEvent()))
// Handler to forward the user event from Catalyst => Catalyst API
router.POST("/api/serf/receiveUserEvent", withLogging(eventsHandler.ReceiveUserEvent()))

// Public GET handler to retrieve the public key for vod encryption
router.GET("/api/pubkey", withLogging(encryptionHandlers.PublicKeyHandler()))
// Public GET handler to retrieve the public key for vod encryption
router.GET("/api/pubkey", withLogging(encryptionHandlers.PublicKeyHandler()))

// Endpoint to receive "Triggers" (callbacks) from Mist
router.POST("/api/mist/trigger", withLogging(mistCallbackHandlers.Trigger()))
// Endpoint to receive "Triggers" (callbacks) from Mist
router.POST("/api/mist/trigger", withLogging(mistCallbackHandlers.Trigger()))

// Handler for STREAM_SOURCE triggers
broker.OnStreamSource(geoHandlers.HandleStreamSource)
// Handler for STREAM_SOURCE triggers
broker.OnStreamSource(geoHandlers.HandleStreamSource)

// Handler for USER_NEW triggers
broker.OnUserNew(accessControlHandlers.HandleUserNew)
// Handler for USER_NEW triggers
broker.OnUserNew(accessControlHandlers.HandleUserNew)

// Handler for USER_END triggers.
broker.OnUserEnd(analyticsHandlers.HandleUserEnd)
// Handler for USER_END triggers.
broker.OnUserEnd(analyticsHandlers.HandleUserEnd)

// Endpoint to receive segments and manifests that ffmpeg produces
router.POST("/api/ffmpeg/:id/:filename", withLogging(ffmpegSegmentingHandlers.NewFile()))
// Endpoint to receive segments and manifests that ffmpeg produces
router.POST("/api/ffmpeg/:id/:filename", withLogging(ffmpegSegmentingHandlers.NewFile()))
}

return router
}
Expand Down
10 changes: 10 additions & 0 deletions config/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Cli struct {
MistUser string
MistPassword string
MistPrometheus string
Mode string
MistPort int
MistConnectTimeout time.Duration
MistStreamSource string
Expand Down Expand Up @@ -72,6 +73,7 @@ type Cli struct {
KafkaPassword string
AnalyticsKafkaTopic string
SerfMembersEndpoint string
CatalystApiURL string

// mapping playbackId to value between 0.0 to 100.0
CdnRedirectPlaybackPct map[string]float64
Expand Down Expand Up @@ -111,6 +113,14 @@ func (cli *Cli) ShouldMapic() bool {
return cli.APIServer != ""
}

func (cli *Cli) IsClusterMode() bool {
return cli.Mode == "cluster-only" || cli.Mode == "all"
}

func (cli *Cli) IsApiMode() bool {
return cli.Mode == "api-only" || cli.Mode == "all"
}

// Should we enable mist-cleanup script to run periodically and delete leaky shm?
func (cli *Cli) ShouldMistCleanup() bool {
return cli.MistCleanup
Expand Down
Loading

0 comments on commit 6f8ea1f

Please sign in to comment.