Skip to content

Commit

Permalink
Merge branch 'main' into vg/feat/storage-fallback
Browse files Browse the repository at this point in the history
  • Loading branch information
mjh1 committed Jul 25, 2024
2 parents cac0c5f + 21cbded commit 391709e
Show file tree
Hide file tree
Showing 27 changed files with 3,919 additions and 373 deletions.
76 changes: 76 additions & 0 deletions .github/workflows/docker.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
name: Build and publish catalyst-api docker images

on:
pull_request:
push:
branches:
- main
- dev
tags:
- "v*"

concurrency:
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
cancel-in-progress: true

jobs:
docker:
name: Build and publish docker image
runs-on: ubuntu-latest
permissions:
packages: write
contents: read

steps:
- name: Set up QEMU
uses: docker/setup-qemu-action@v2

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2

- name: Login to DockerHub
uses: docker/login-action@v2
with:
username: ${{ secrets.CI_DOCKERHUB_USERNAME }}
password: ${{ secrets.CI_DOCKERHUB_TOKEN }}

- name: Log in to the Container registry
uses: docker/login-action@v2
with:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ github.token }}

- name: Extract metadata (tags, labels) for Docker
id: meta
uses: docker/metadata-action@v4
with:
images: |
livepeerci/${{ github.event.repository.name }}
ghcr.io/${{ github.repository }}
tags: |
type=sha
type=ref,event=pr
type=ref,event=tag
type=sha,format=long
type=ref,event=branch
type=semver,pattern={{version}},prefix=v
type=semver,pattern={{major}}.{{minor}},prefix=v
type=raw,value=latest,enable={{is_default_branch}}
type=raw,value=${{ github.event.pull_request.head.ref }}
type=raw,value=stable,enable=${{ startsWith(github.event.ref, 'refs/tags/v') }}
- name: Build and push
uses: docker/build-push-action@v4
with:
platforms: linux/amd64, linux/arm64
push: true
build-args: |
GIT_VERSION=${{ github.ref_type == 'tag' && github.ref_name || github.event.pull_request.head.sha || github.sha }}
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=gha
cache-to: type=gha,mode=max

- name: Notify new build upload
run: curl -X POST https://holy-bread-207a.livepeer.workers.dev
35 changes: 35 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
FROM golang:1-bullseye as gobuild

ARG TARGETARCH

WORKDIR /src

ADD go.mod go.sum ./
RUN go mod download

ADD . .
RUN make build

ARG GIT_VERSION
ENV GIT_VERSION="${GIT_VERSION}"

FROM ubuntu:22.04 AS catalyst

ENV DEBIAN_FRONTEND=noninteractive

LABEL maintainer="Amritanshu Varshney <[email protected]>"

ARG BUILD_TARGET

RUN apt update && apt install -yqq wget software-properties-common

RUN apt update && apt install -yqq \
curl \
ca-certificates \
procps \
vnstat \
&& rm -rf /var/lib/apt/lists/*

COPY --from=gobuild /src/build/catalyst-api /bin/catalyst-api

CMD ["/bin/catalyst-api"]
9 changes: 4 additions & 5 deletions api/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/golang/glog"
"github.com/julienschmidt/httprouter"
"github.com/livepeer/catalyst-api/balancer"
"github.com/livepeer/catalyst-api/cluster"
"github.com/livepeer/catalyst-api/config"
"github.com/livepeer/catalyst-api/handlers"
"github.com/livepeer/catalyst-api/handlers/analytics"
Expand All @@ -21,8 +20,8 @@ import (
"github.com/livepeer/go-api-client"
)

func ListenAndServe(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coordinator, bal balancer.Balancer, c cluster.Cluster, mapic mistapiconnector.IMac) error {
router := NewCatalystAPIRouter(cli, vodEngine, bal, c, mapic)
func ListenAndServe(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coordinator, bal balancer.Balancer, mapic mistapiconnector.IMac, serfMembersEndpoint string) error {
router := NewCatalystAPIRouter(cli, vodEngine, bal, mapic, serfMembersEndpoint)
server := http.Server{Addr: cli.HTTPAddress, Handler: router}
ctx, cancel := context.WithCancel(ctx)

Expand All @@ -48,7 +47,7 @@ func ListenAndServe(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coo
return server.Shutdown(ctx)
}

func NewCatalystAPIRouter(cli config.Cli, vodEngine *pipeline.Coordinator, bal balancer.Balancer, c cluster.Cluster, mapic mistapiconnector.IMac) *httprouter.Router {
func NewCatalystAPIRouter(cli config.Cli, vodEngine *pipeline.Coordinator, bal balancer.Balancer, mapic mistapiconnector.IMac, serfMembersEndpoint string) *httprouter.Router {
router := httprouter.New()
withLogging := middleware.LogRequest()
withCORS := middleware.AllowCORS()
Expand All @@ -59,7 +58,7 @@ func NewCatalystAPIRouter(cli config.Cli, vodEngine *pipeline.Coordinator, bal b
AccessToken: cli.APIToken,
})
catalystApiHandlers := &handlers.CatalystAPIHandlersCollection{VODEngine: vodEngine}
geoHandlers := geolocation.NewGeolocationHandlersCollection(bal, c, cli, lapi)
geoHandlers := geolocation.NewGeolocationHandlersCollection(bal, cli, lapi, serfMembersEndpoint)

router.GET("/ok", withLogging(catalystApiHandlers.Ok()))
router.GET("/healthcheck", withLogging(catalystApiHandlers.Healthcheck()))
Expand Down
94 changes: 51 additions & 43 deletions api/http_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
)

func ListenAndServeInternal(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coordinator, mapic mistapiconnector.IMac, bal balancer.Balancer, c cluster.Cluster, broker misttriggers.TriggerBroker, metricsDB *sql.DB) error {
router := NewCatalystAPIRouterInternal(cli, vodEngine, mapic, bal, c, broker, metricsDB)
func ListenAndServeInternal(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coordinator, mapic mistapiconnector.IMac, bal balancer.Balancer, c cluster.Cluster, broker misttriggers.TriggerBroker, metricsDB *sql.DB, serfMembersEndpoint string) error {
router := NewCatalystAPIRouterInternal(cli, vodEngine, mapic, bal, c, broker, metricsDB, serfMembersEndpoint)
server := http.Server{Addr: cli.HTTPInternalAddress, Handler: router}
ctx, cancel := context.WithCancel(ctx)

Expand All @@ -56,7 +56,7 @@ func ListenAndServeInternal(ctx context.Context, cli config.Cli, vodEngine *pipe
return server.Shutdown(ctx)
}

func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinator, mapic mistapiconnector.IMac, bal balancer.Balancer, c cluster.Cluster, broker misttriggers.TriggerBroker, metricsDB *sql.DB) *httprouter.Router {
func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinator, mapic mistapiconnector.IMac, bal balancer.Balancer, c cluster.Cluster, broker misttriggers.TriggerBroker, metricsDB *sql.DB, serfMembersEndpoint string) *httprouter.Router {
router := httprouter.New()
withLogging := middleware.LogRequest()
withAuth := middleware.IsAuthorized
Expand All @@ -68,12 +68,12 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato
Server: cli.APIServer,
AccessToken: cli.APIToken,
})
geoHandlers := geolocation.NewGeolocationHandlersCollection(bal, c, cli, lapi)
geoHandlers := geolocation.NewGeolocationHandlersCollection(bal, cli, lapi, serfMembersEndpoint)

spkiPublicKey, _ := crypto.ConvertToSpki(cli.VodDecryptPublicKey)

catalystApiHandlers := &handlers.CatalystAPIHandlersCollection{VODEngine: vodEngine}
eventsHandler := &handlers.EventsHandlersCollection{Cluster: c}
eventsHandler := handlers.NewEventsHandlersCollection(c, mapic, bal)
ffmpegSegmentingHandlers := &ffmpeg.HandlersCollection{VODEngine: vodEngine}
accessControlHandlers := accesscontrol.NewAccessControlHandlersCollection(cli, mapic)
analyticsHandlers := analytics.NewAnalyticsHandler(metricsDB)
Expand All @@ -84,54 +84,62 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato
// Simple endpoint for healthchecks
router.GET("/ok", withLogging(catalystApiHandlers.Ok()))

var metricsHandlers []http.Handler
if cli.ShouldMapic() {
metricsHandlers = append(metricsHandlers, mapic.MetricsHandler())
}
if cli.MistPrometheus != "" {
// Enable Mist metrics enrichment
metricsHandlers = append(metricsHandlers, mapic.MistMetricsHandler())
}
metricsHandlers = append(metricsHandlers, promhttp.Handler())
// Hacky combined metrics handler. To be refactored away with mapic.
router.GET("/metrics", concatHandlers(metricsHandlers...))

// Public Catalyst API
router.POST("/api/vod",
withLogging(
withAuth(
cli.APIToken,
withCapacityChecking(
vodEngine,
catalystApiHandlers.UploadVOD(),
if cli.IsApiMode() {
var metricsHandlers []http.Handler
if cli.ShouldMapic() {
metricsHandlers = append(metricsHandlers, mapic.MetricsHandler())
}
if cli.MistPrometheus != "" {
// Enable Mist metrics enrichment
metricsHandlers = append(metricsHandlers, mapic.MistMetricsHandler())
}
metricsHandlers = append(metricsHandlers, promhttp.Handler())
// Hacky combined metrics handler. To be refactored away with mapic.
router.GET("/metrics", concatHandlers(metricsHandlers...))

// Public Catalyst API
router.POST("/api/vod",
withLogging(
withAuth(
cli.APIToken,
withCapacityChecking(
vodEngine,
catalystApiHandlers.UploadVOD(),
),
),
),
),
)
)

// Public handler to propagate an event to all Catalyst nodes
router.POST("/api/events", withLogging(eventsHandler.Events()))
// Public GET handler to retrieve the public key for vod encryption
router.GET("/api/pubkey", withLogging(encryptionHandlers.PublicKeyHandler()))

// Public GET handler to retrieve the public key for vod encryption
router.GET("/api/pubkey", withLogging(encryptionHandlers.PublicKeyHandler()))
// Endpoint to receive "Triggers" (callbacks) from Mist
router.POST("/api/mist/trigger", withLogging(mistCallbackHandlers.Trigger()))

// Endpoint to receive "Triggers" (callbacks) from Mist
router.POST("/api/mist/trigger", withLogging(mistCallbackHandlers.Trigger()))
// Handler for STREAM_SOURCE triggers
broker.OnStreamSource(geoHandlers.HandleStreamSource)

// Handler for STREAM_SOURCE triggers
broker.OnStreamSource(geoHandlers.HandleStreamSource)
// Handler for USER_NEW triggers
broker.OnUserNew(accessControlHandlers.HandleUserNew)

// Handler for USER_NEW triggers
broker.OnUserNew(accessControlHandlers.HandleUserNew)
// Handler for USER_END triggers.
broker.OnUserEnd(analyticsHandlers.HandleUserEnd)

// Handler for USER_END triggers.
broker.OnUserEnd(analyticsHandlers.HandleUserEnd)
// Endpoint to receive segments and manifests that ffmpeg produces
router.POST("/api/ffmpeg/:id/:filename", withLogging(ffmpegSegmentingHandlers.NewFile()))

// Endpoint to receive segments and manifests that ffmpeg produces
router.POST("/api/ffmpeg/:id/:filename", withLogging(ffmpegSegmentingHandlers.NewFile()))
// Handler to forward the user event from Catalyst => Catalyst API
router.POST("/api/serf/receiveUserEvent", withLogging(eventsHandler.ReceiveUserEvent()))
}

// Temporary endpoint for admin queries
router.GET("/admin/members", withLogging(adminHandlers.MembersHandler()))
if cli.IsClusterMode() {
// Temporary endpoint for admin queries
router.GET("/admin/members", withLogging(adminHandlers.MembersHandler()))
// Handler to get members Catalyst API => Catalyst
router.GET("/api/serf/members", withLogging(adminHandlers.MembersHandler()))
// Public handler to propagate an event to all Catalyst nodes, execute from Studio API => Catalyst
router.POST("/api/events", withLogging(eventsHandler.Events()))
}

return router
}
Expand Down
Loading

0 comments on commit 391709e

Please sign in to comment.