Skip to content

Commit

Permalink
Add option for running catalyst-api as a stateless service
Browse files Browse the repository at this point in the history
  • Loading branch information
leszko committed Jul 24, 2024
1 parent 14a9dc3 commit a6205cf
Show file tree
Hide file tree
Showing 4 changed files with 198 additions and 175 deletions.
89 changes: 47 additions & 42 deletions api/http_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,58 +84,63 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato
// Simple endpoint for healthchecks
router.GET("/ok", withLogging(catalystApiHandlers.Ok()))

var metricsHandlers []http.Handler
if cli.ShouldMapic() {
metricsHandlers = append(metricsHandlers, mapic.MetricsHandler())
}
if cli.MistPrometheus != "" {
// Enable Mist metrics enrichment
metricsHandlers = append(metricsHandlers, mapic.MistMetricsHandler())
}
metricsHandlers = append(metricsHandlers, promhttp.Handler())
// Hacky combined metrics handler. To be refactored away with mapic.
router.GET("/metrics", concatHandlers(metricsHandlers...))

// Public Catalyst API
router.POST("/api/vod",
withLogging(
withAuth(
cli.APIToken,
withCapacityChecking(
vodEngine,
catalystApiHandlers.UploadVOD(),
if cli.IsApiMode() {
var metricsHandlers []http.Handler
if cli.ShouldMapic() {
metricsHandlers = append(metricsHandlers, mapic.MetricsHandler())
}
if cli.MistPrometheus != "" {
// Enable Mist metrics enrichment
metricsHandlers = append(metricsHandlers, mapic.MistMetricsHandler())
}
metricsHandlers = append(metricsHandlers, promhttp.Handler())
// Hacky combined metrics handler. To be refactored away with mapic.
router.GET("/metrics", concatHandlers(metricsHandlers...))

// Public Catalyst API
router.POST("/api/vod",
withLogging(
withAuth(
cli.APIToken,
withCapacityChecking(
vodEngine,
catalystApiHandlers.UploadVOD(),
),
),
),
),
)
)

// Public GET handler to retrieve the public key for vod encryption
router.GET("/api/pubkey", withLogging(encryptionHandlers.PublicKeyHandler()))

// Public GET handler to retrieve the public key for vod encryption
router.GET("/api/pubkey", withLogging(encryptionHandlers.PublicKeyHandler()))
// Endpoint to receive "Triggers" (callbacks) from Mist
router.POST("/api/mist/trigger", withLogging(mistCallbackHandlers.Trigger()))

// Endpoint to receive "Triggers" (callbacks) from Mist
router.POST("/api/mist/trigger", withLogging(mistCallbackHandlers.Trigger()))
// Handler for STREAM_SOURCE triggers
broker.OnStreamSource(geoHandlers.HandleStreamSource)

// Handler for STREAM_SOURCE triggers
broker.OnStreamSource(geoHandlers.HandleStreamSource)
// Handler for USER_NEW triggers
broker.OnUserNew(accessControlHandlers.HandleUserNew)

// Handler for USER_NEW triggers
broker.OnUserNew(accessControlHandlers.HandleUserNew)
// Handler for USER_END triggers.
broker.OnUserEnd(analyticsHandlers.HandleUserEnd)

// Handler for USER_END triggers.
broker.OnUserEnd(analyticsHandlers.HandleUserEnd)
// Endpoint to receive segments and manifests that ffmpeg produces
router.POST("/api/ffmpeg/:id/:filename", withLogging(ffmpegSegmentingHandlers.NewFile()))

// Endpoint to receive segments and manifests that ffmpeg produces
router.POST("/api/ffmpeg/:id/:filename", withLogging(ffmpegSegmentingHandlers.NewFile()))
// Handler to forward the user event from Catalyst => Catalyst API
router.POST("/api/serf/receiveUserEvent", withLogging(eventsHandler.ReceiveUserEvent()))
}

// Temporary endpoint for admin queries
router.GET("/admin/members", withLogging(adminHandlers.MembersHandler()))
// Handler to get members Catalyst API => Catalyst
router.GET("/api/serf/members", withLogging(adminHandlers.MembersHandler()))
// Public handler to propagate an event to all Catalyst nodes, execute from Studio API => Catalyst
router.POST("/api/events", withLogging(eventsHandler.Events()))
if cli.IsClusterMode() {
// Temporary endpoint for admin queries
router.GET("/admin/members", withLogging(adminHandlers.MembersHandler()))
// Handler to get members Catalyst API => Catalyst
router.GET("/api/serf/members", withLogging(adminHandlers.MembersHandler()))
// Public handler to propagate an event to all Catalyst nodes, execute from Studio API => Catalyst
router.POST("/api/events", withLogging(eventsHandler.Events()))
}

// Handler to forward the user event from Catalyst => Catalyst API
router.POST("/api/serf/receiveUserEvent", withLogging(eventsHandler.ReceiveUserEvent()))
return router
}

Expand Down
10 changes: 10 additions & 0 deletions config/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Cli struct {
MistUser string
MistPassword string
MistPrometheus string
Mode string
MistPort int
MistConnectTimeout time.Duration
MistStreamSource string
Expand Down Expand Up @@ -72,6 +73,7 @@ type Cli struct {
KafkaPassword string
AnalyticsKafkaTopic string
SerfMembersEndpoint string
CatalystApiURL string

// mapping playbackId to value between 0.0 to 100.0
CdnRedirectPlaybackPct map[string]float64
Expand Down Expand Up @@ -111,6 +113,14 @@ func (cli *Cli) ShouldMapic() bool {
return cli.APIServer != ""
}

func (cli *Cli) IsClusterMode() bool {
return cli.Mode == "cluster-only" || cli.Mode == "all"
}

func (cli *Cli) IsApiMode() bool {
return cli.Mode == "api-only" || cli.Mode == "all"
}

// Should we enable mist-cleanup script to run periodically and delete leaky shm?
func (cli *Cli) ShouldMistCleanup() bool {
return cli.MistCleanup
Expand Down
6 changes: 3 additions & 3 deletions handlers/geolocation/geolocation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,10 @@ func mockHandlers(t *testing.T) *GeolocationHandlersCollection {
testServer := httptest.NewServer(router)

coll := GeolocationHandlersCollection{
Balancer: mb,
Balancer: mb,
serfMembersEndpoint: fmt.Sprintf("%s/api/serf/members", testServer.URL),
Config: config.Cli{
RedirectPrefixes: prefixes[:],
SerfMembersEndpoint: fmt.Sprintf("%s/api/serf/members", testServer.URL),
RedirectPrefixes: prefixes[:],
},
}
return &coll
Expand Down
Loading

0 comments on commit a6205cf

Please sign in to comment.