Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option for running catalyst-api as a stateless service #1322

Closed
wants to merge 48 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
748be7b
Add flag "mist-mode" to distinguish between running catalyst-api insi…
leszko Jul 11, 2024
95ed216
Restructure code apart (from cluster and balancer) to be run either i…
leszko Jul 11, 2024
1d861a4
Add Mist authentication
leszko Jul 11, 2024
fbc5858
Add 'own-host' flag to specify under which hostname the catalyst-api …
leszko Jul 11, 2024
1dcfbc2
Add flag "mist-mode" to distinguish between running catalyst-api insi…
leszko Jul 11, 2024
9886afa
Restructure code apart (from cluster and balancer) to be run either i…
leszko Jul 12, 2024
f67f939
Add 'own-host' flag to specify under which hostname the catalyst-api …
leszko Jul 11, 2024
2bddf30
Forward serf user events to statelss catalyst-api
leszko Jul 15, 2024
720cc29
Merge remote-tracking branch 'origin/rafal/stateless-catalyst-api' in…
leszko Jul 15, 2024
f31e437
Add /api/serf/members endpoint
leszko Jul 15, 2024
2b19a2b
Merge remote-tracking branch 'origin/main' into rafal/stateless-catal…
leszko Jul 16, 2024
0959a5e
Use /api/serf/members for geolocation instead of the directly accessi…
leszko Jul 16, 2024
f38a65f
Merge branch 'main' of https://github.com/livepeer/dms-api into rafal…
leszko Jul 17, 2024
7f70038
Resolve merge conflicts
leszko Jul 17, 2024
838bc00
Add own-host flag
leszko Jul 17, 2024
63a3f5b
Add mist-mode flag
leszko Jul 17, 2024
c829e30
Get back previous code
leszko Jul 17, 2024
f56b2e1
Add remove usage of MistUtilLoad
leszko Jul 17, 2024
73b9259
Remove /admin/members endpoint
leszko Jul 17, 2024
94c9823
Update comment
leszko Jul 17, 2024
0044c2d
Rename NewBalancer() => NewLocalBalancer()
leszko Jul 17, 2024
a782363
Fix unit tests
leszko Jul 17, 2024
a476e9e
Rename mode flag
leszko Jul 17, 2024
4e1d671
Add serf-user-event-callback flag
leszko Jul 17, 2024
5e30296
Refactor events.go
leszko Jul 17, 2024
289f822
Minor refactor
leszko Jul 17, 2024
52c8727
Refactor
leszko Jul 17, 2024
6e6a11b
Change setting up triggers
leszko Jul 17, 2024
baa4718
Change setting up triggers
leszko Jul 17, 2024
ddbdf53
Revert "Change setting up triggers"
leszko Jul 17, 2024
77d1f0d
Revert "Change setting up triggers"
leszko Jul 17, 2024
8c4c75e
Use InternalHTTP flag instead of OwnHost
leszko Jul 17, 2024
56eac98
Refactor setting up Mist Triggers
leszko Jul 17, 2024
07f888b
Set up triggers only in the cluster mode
leszko Jul 17, 2024
734f4e1
Relax condition on parsing httpaddr flag
leszko Jul 17, 2024
96933e8
Removed unused vars
leszko Jul 17, 2024
11db23d
Fix cluster-only mode
leszko Jul 17, 2024
964a754
Merge branch 'main' into rafal/stateless-catalyst-api-2
leszko Jul 18, 2024
a971f2c
Add unit tests for events_test.go
leszko Jul 18, 2024
1658eb1
Add flag 'serf-members-endpoint'
leszko Jul 18, 2024
e360829
Merge branch 'main' into rafal/stateless-catalyst-api-2
leszko Jul 18, 2024
8456466
Fix lint
leszko Jul 18, 2024
9e8d325
Add flag to specify Mist Trigger Handler endpoint
leszko Jul 19, 2024
7b5d4cb
Use different APIs depending on the mode
leszko Jul 19, 2024
5d189b8
Update flag
leszko Jul 19, 2024
cc8bba4
Update flag
leszko Jul 19, 2024
271ca50
Update
leszko Jul 19, 2024
0ac472a
Remove most libraries
leszko Jul 18, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions .github/workflows/docker.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
name: Build and publish catalyst-api docker images

on:
pull_request:
push:
branches:
- main
- dev
tags:
- "v*"

concurrency:
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
cancel-in-progress: true

jobs:
docker:
name: Build and publish docker image
runs-on: ubuntu-latest
permissions:
packages: write
contents: read

steps:
- name: Set up QEMU
uses: docker/setup-qemu-action@v2

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2

- name: Login to DockerHub
uses: docker/login-action@v2
with:
username: ${{ secrets.CI_DOCKERHUB_USERNAME }}
password: ${{ secrets.CI_DOCKERHUB_TOKEN }}

- name: Log in to the Container registry
uses: docker/login-action@v2
with:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ github.token }}

- name: Extract metadata (tags, labels) for Docker
id: meta
uses: docker/metadata-action@v4
with:
images: |
livepeerci/${{ github.event.repository.name }}
ghcr.io/${{ github.repository }}
tags: |
type=sha
type=ref,event=pr
type=ref,event=tag
type=sha,format=long
type=ref,event=branch
type=semver,pattern={{version}},prefix=v
type=semver,pattern={{major}}.{{minor}},prefix=v
type=raw,value=latest,enable={{is_default_branch}}
type=raw,value=${{ github.event.pull_request.head.ref }}
type=raw,value=stable,enable=${{ startsWith(github.event.ref, 'refs/tags/v') }}

- name: Build and push
uses: docker/build-push-action@v4
with:
platforms: linux/amd64, linux/arm64
push: true
build-args: |
GIT_VERSION=${{ github.ref_type == 'tag' && github.ref_name || github.event.pull_request.head.sha || github.sha }}
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=gha
cache-to: type=gha,mode=max

- name: Notify new build upload
run: curl -X POST https://holy-bread-207a.livepeer.workers.dev
44 changes: 44 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
FROM golang:1-bullseye as gobuild

ARG TARGETARCH

# Download c2patool needed to sign our C2PA manifest
# We download it from any of our previous builds, because building c2patool from source is very slow with QEMU
RUN apt update && apt install -yqq \
curl \
ca-certificates \
&& curl https://build.livepeer.live/c2patool/0.6.2/c2patool-linux-${TARGETARCH}.tar.gz -o /c2patool.tar.gz \
&& tar xzf /c2patool.tar.gz

WORKDIR /src

ADD go.mod go.sum ./
RUN go mod download

ADD . .
RUN make build

ARG GIT_VERSION
ENV GIT_VERSION="${GIT_VERSION}"

FROM ubuntu:22.04 AS catalyst

ENV DEBIAN_FRONTEND=noninteractive

LABEL maintainer="Amritanshu Varshney <[email protected]>"

ARG BUILD_TARGET

RUN apt update && apt install -yqq wget software-properties-common

RUN apt update && apt install -yqq \
curl \
ca-certificates \
procps \
vnstat \
&& rm -rf /var/lib/apt/lists/*

COPY --from=gobuild /src/build/catalyst-api /bin/catalyst-api
COPY --from=gobuild /go/c2patool /bin/

CMD ["/bin/catalyst-api"]
9 changes: 4 additions & 5 deletions api/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/golang/glog"
"github.com/julienschmidt/httprouter"
"github.com/livepeer/catalyst-api/balancer"
"github.com/livepeer/catalyst-api/cluster"
"github.com/livepeer/catalyst-api/config"
"github.com/livepeer/catalyst-api/handlers"
"github.com/livepeer/catalyst-api/handlers/analytics"
Expand All @@ -21,8 +20,8 @@ import (
"github.com/livepeer/go-api-client"
)

func ListenAndServe(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coordinator, bal balancer.Balancer, c cluster.Cluster, mapic mistapiconnector.IMac) error {
router := NewCatalystAPIRouter(cli, vodEngine, bal, c, mapic)
func ListenAndServe(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coordinator, bal balancer.Balancer, mapic mistapiconnector.IMac) error {
router := NewCatalystAPIRouter(cli, vodEngine, bal, mapic)
server := http.Server{Addr: cli.HTTPAddress, Handler: router}
ctx, cancel := context.WithCancel(ctx)

Expand All @@ -48,7 +47,7 @@ func ListenAndServe(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coo
return server.Shutdown(ctx)
}

func NewCatalystAPIRouter(cli config.Cli, vodEngine *pipeline.Coordinator, bal balancer.Balancer, c cluster.Cluster, mapic mistapiconnector.IMac) *httprouter.Router {
func NewCatalystAPIRouter(cli config.Cli, vodEngine *pipeline.Coordinator, bal balancer.Balancer, mapic mistapiconnector.IMac) *httprouter.Router {
router := httprouter.New()
withLogging := middleware.LogRequest()
withCORS := middleware.AllowCORS()
Expand All @@ -59,7 +58,7 @@ func NewCatalystAPIRouter(cli config.Cli, vodEngine *pipeline.Coordinator, bal b
AccessToken: cli.APIToken,
})
catalystApiHandlers := &handlers.CatalystAPIHandlersCollection{VODEngine: vodEngine}
geoHandlers := geolocation.NewGeolocationHandlersCollection(bal, c, cli, lapi)
geoHandlers := geolocation.NewGeolocationHandlersCollection(bal, cli, lapi)

router.GET("/ok", withLogging(catalystApiHandlers.Ok()))
router.GET("/healthcheck", withLogging(catalystApiHandlers.Healthcheck()))
Expand Down
85 changes: 45 additions & 40 deletions api/http_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato
Server: cli.APIServer,
AccessToken: cli.APIToken,
})
geoHandlers := geolocation.NewGeolocationHandlersCollection(bal, c, cli, lapi)
geoHandlers := geolocation.NewGeolocationHandlersCollection(bal, cli, lapi)

spkiPublicKey, _ := crypto.ConvertToSpki(cli.VodDecryptPublicKey)

catalystApiHandlers := &handlers.CatalystAPIHandlersCollection{VODEngine: vodEngine}
eventsHandler := &handlers.EventsHandlersCollection{Cluster: c}
eventsHandler := handlers.NewEventsHandlersCollection(c, mapic, bal)
ffmpegSegmentingHandlers := &ffmpeg.HandlersCollection{VODEngine: vodEngine}
accessControlHandlers := accesscontrol.NewAccessControlHandlersCollection(cli, mapic)
analyticsHandlers := analytics.NewAnalyticsHandler(metricsDB)
Expand All @@ -84,54 +84,59 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato
// Simple endpoint for healthchecks
router.GET("/ok", withLogging(catalystApiHandlers.Ok()))

var metricsHandlers []http.Handler
if cli.ShouldMapic() {
metricsHandlers = append(metricsHandlers, mapic.MetricsHandler())
if cli.IsClusterMode() {
router.GET("/api/serf/members", withLogging(adminHandlers.MembersHandler()))
// Public handler to propagate an event to all Catalyst nodes
router.POST("/api/events", withLogging(eventsHandler.Events()))
}
if cli.MistPrometheus != "" {
// Enable Mist metrics enrichment
metricsHandlers = append(metricsHandlers, mapic.MistMetricsHandler())
}
metricsHandlers = append(metricsHandlers, promhttp.Handler())
// Hacky combined metrics handler. To be refactored away with mapic.
router.GET("/metrics", concatHandlers(metricsHandlers...))

// Public Catalyst API
router.POST("/api/vod",
withLogging(
withAuth(
cli.APIToken,
withCapacityChecking(
vodEngine,
catalystApiHandlers.UploadVOD(),

if cli.IsApiMode() {
var metricsHandlers []http.Handler
if cli.ShouldMapic() {
metricsHandlers = append(metricsHandlers, mapic.MetricsHandler())
}
if cli.MistPrometheus != "" {
// Enable Mist metrics enrichment
metricsHandlers = append(metricsHandlers, mapic.MistMetricsHandler())
}
metricsHandlers = append(metricsHandlers, promhttp.Handler())
// Hacky combined metrics handler. To be refactored away with mapic.
router.GET("/metrics", concatHandlers(metricsHandlers...))

// Public Catalyst API
router.POST("/api/vod",
withLogging(
withAuth(
cli.APIToken,
withCapacityChecking(
vodEngine,
catalystApiHandlers.UploadVOD(),
),
),
),
),
)

// Public handler to propagate an event to all Catalyst nodes
router.POST("/api/events", withLogging(eventsHandler.Events()))
)

// Public GET handler to retrieve the public key for vod encryption
router.GET("/api/pubkey", withLogging(encryptionHandlers.PublicKeyHandler()))
// Handlers for remote serf interaction
router.POST("/api/serf/receiveUserEvent", withLogging(eventsHandler.ReceiveUserEvent()))

// Endpoint to receive "Triggers" (callbacks) from Mist
router.POST("/api/mist/trigger", withLogging(mistCallbackHandlers.Trigger()))
// Public GET handler to retrieve the public key for vod encryption
router.GET("/api/pubkey", withLogging(encryptionHandlers.PublicKeyHandler()))

// Handler for STREAM_SOURCE triggers
broker.OnStreamSource(geoHandlers.HandleStreamSource)
// Endpoint to receive "Triggers" (callbacks) from Mist
router.POST("/api/mist/trigger", withLogging(mistCallbackHandlers.Trigger()))

// Handler for USER_NEW triggers
broker.OnUserNew(accessControlHandlers.HandleUserNew)
// Handler for STREAM_SOURCE triggers
broker.OnStreamSource(geoHandlers.HandleStreamSource)

// Handler for USER_END triggers.
broker.OnUserEnd(analyticsHandlers.HandleUserEnd)
// Handler for USER_NEW triggers
broker.OnUserNew(accessControlHandlers.HandleUserNew)

// Endpoint to receive segments and manifests that ffmpeg produces
router.POST("/api/ffmpeg/:id/:filename", withLogging(ffmpegSegmentingHandlers.NewFile()))
// Handler for USER_END triggers.
broker.OnUserEnd(analyticsHandlers.HandleUserEnd)

// Temporary endpoint for admin queries
router.GET("/admin/members", withLogging(adminHandlers.MembersHandler()))
// Endpoint to receive segments and manifests that ffmpeg produces
router.POST("/api/ffmpeg/:id/:filename", withLogging(ffmpegSegmentingHandlers.NewFile()))
}

return router
}
Expand Down
21 changes: 13 additions & 8 deletions balancer/mist/mist_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,32 @@ var mistUtilLoadLoopTimeout = 2 * time.Minute

type MistBalancer struct {
config *balancer.Config
cmd *exec.Cmd
endpoint string
// Blocks until initial startup
startupOnce sync.Once
startupError error
}

// create a new load balancer instance
func NewBalancer(config *balancer.Config) balancer.Balancer {
func NewLocalBalancer(config *balancer.Config) balancer.Balancer {
_, err := exec.LookPath("MistUtilLoad")
if err != nil {
glog.Warning("MistUtilLoad not found, not doing meaningful balancing")
return &balancer.BalancerStub{}
}
return &MistBalancer{
config: config,
cmd: nil,
endpoint: fmt.Sprintf("http://127.0.0.1:%d", config.MistUtilLoadPort),
}
}

func NewRemoteBalancer(config *balancer.Config) balancer.Balancer {
return &MistBalancer{
config: config,
endpoint: fmt.Sprintf("http://%s:%d", config.MistHost, config.MistUtilLoadPort),
}
}

// start this load balancer instance, execing MistUtilLoad if necessary
func (b *MistBalancer) Start(ctx context.Context) error {
b.killPreviousBalancer(ctx)
Expand Down Expand Up @@ -288,17 +293,17 @@ func (b *MistBalancer) isBalancerRunning(ctx context.Context) bool {
func (b *MistBalancer) execBalancer(ctx context.Context, balancerArgs []string) error {
args := append(balancerArgs, "-p", fmt.Sprintf("%d", b.config.MistUtilLoadPort), "-g", "4")
glog.Infof("Running MistUtilLoad with %v", args)
b.cmd = exec.CommandContext(ctx, "MistUtilLoad", args...)
cmd := exec.CommandContext(ctx, "MistUtilLoad", args...)

b.cmd.Stdout = os.Stdout
b.cmd.Stderr = os.Stderr
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr

err := b.cmd.Start()
err := cmd.Start()
if err != nil {
return err
}

err = b.cmd.Wait()
err = cmd.Wait()
if err != nil {
return err
}
Expand Down
1 change: 0 additions & 1 deletion balancer/mist/mist_balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ func start(t *testing.T) (*MistBalancer, *mockMistUtilLoad) {
OwnRegion: "fra",
OwnRegionTagAdjust: 1000,
},
cmd: nil,
endpoint: mul.Server.URL,
}
// Mock startup loop
Expand Down
30 changes: 14 additions & 16 deletions clients/mist_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,29 +37,27 @@ type MistAPIClient interface {
DeleteStream(streamName string) error
NukeStream(streamName string) error
StopSessions(streamName string) error
AddTrigger(streamName []string, triggerName string, sync bool) error
AddTrigger(streamName []string, triggerName, triggerCallback string, sync bool) error
DeleteTrigger(streamName []string, triggerName string) error
GetStreamInfo(streamName string) (MistStreamInfo, error)
GetState() (MistState, error)
}

type MistClient struct {
ApiUrl string
Username string
Password string
HttpReqUrl string
TriggerCallback string
configMu sync.Mutex
cache *cache.Cache
ApiUrl string
Username string
Password string
HttpReqUrl string
configMu sync.Mutex
cache *cache.Cache
}

func NewMistAPIClient(user, password, host string, port int, ownURL string) MistAPIClient {
func NewMistAPIClient(user, password, host string, port int) MistAPIClient {
mist := &MistClient{
ApiUrl: fmt.Sprintf("http://%s:%d", host, port),
Username: user,
Password: password,
TriggerCallback: ownURL,
cache: cache.New(defaultCacheExpiration, cacheCleanupInterval),
ApiUrl: fmt.Sprintf("http://%s:%d", host, port),
Username: user,
Password: password,
cache: cache.New(defaultCacheExpiration, cacheCleanupInterval),
}
return mist
}
Expand Down Expand Up @@ -310,15 +308,15 @@ func (mc *MistClient) StopSessions(streamName string) error {
// 3. Add a new trigger (or update the existing one)
// 4. Override the triggers
// 5. Release the lock
func (mc *MistClient) AddTrigger(streamNames []string, triggerName string, sync bool) error {
func (mc *MistClient) AddTrigger(streamNames []string, triggerName, triggerCallback string, sync bool) error {
mc.configMu.Lock()
defer mc.configMu.Unlock()

triggers, err := mc.getCurrentTriggers()
if err != nil {
return err
}
c := commandAddTrigger(streamNames, triggerName, mc.TriggerCallback, triggers, sync)
c := commandAddTrigger(streamNames, triggerName, triggerCallback, triggers, sync)
resp, err := mc.sendCommand(c)
return validateAddTrigger(streamNames, triggerName, resp, err, sync)
}
Expand Down
Loading
Loading