Skip to content

Commit

Permalink
Merge branch 'main' into mh/bg-ffmpeg
Browse files Browse the repository at this point in the history
  • Loading branch information
mjh1 committed Aug 21, 2024
2 parents 096601b + 2632451 commit 0b869ae
Show file tree
Hide file tree
Showing 25 changed files with 258 additions and 104 deletions.
6 changes: 6 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
.git/
/docker/*
.dockerignore
*.mp4
*.ts
*Dockerfile*
11 changes: 6 additions & 5 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
FROM golang:1-bullseye as gobuild
FROM golang:1-bullseye AS gobuild

ARG TARGETARCH
ARG TARGETARCH

WORKDIR /src

ADD go.mod go.sum ./
RUN go mod download

ADD . .
RUN make build

ARG GIT_VERSION
ENV GIT_VERSION="${GIT_VERSION}"

RUN make build GIT_VERSION="${GIT_VERSION}"

FROM ubuntu:22.04 AS catalyst

ENV DEBIAN_FRONTEND=noninteractive
Expand All @@ -30,6 +31,6 @@ RUN apt update && apt install -yqq \
vnstat \
&& rm -rf /var/lib/apt/lists/*

COPY --from=gobuild /src/build/catalyst-api /bin/catalyst-api
COPY --from=gobuild /src/build/catalyst-api /bin/catalyst-api

CMD ["/bin/catalyst-api"]
CMD ["/bin/catalyst-api"]
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
GO_BUILD_DIR?=build/

ldflags := -X 'github.com/livepeer/catalyst-api/config.Version=$(shell git rev-parse HEAD)'
GIT_VERSION?=$(shell git rev-parse HEAD)
ldflags := -X 'github.com/livepeer/catalyst-api/config.Version=$(GIT_VERSION)'

.PHONY: all
all: build fmt test lint integration-test tidy
Expand Down
20 changes: 12 additions & 8 deletions api/http_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
)

func ListenAndServeInternal(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coordinator, mapic mistapiconnector.IMac, bal balancer.Balancer, c cluster.Cluster, broker misttriggers.TriggerBroker, metricsDB *sql.DB, serfMembersEndpoint string) error {
router := NewCatalystAPIRouterInternal(cli, vodEngine, mapic, bal, c, broker, metricsDB, serfMembersEndpoint)
func ListenAndServeInternal(ctx context.Context, cli config.Cli, vodEngine *pipeline.Coordinator, mapic mistapiconnector.IMac, bal balancer.Balancer, c cluster.Cluster, broker misttriggers.TriggerBroker, metricsDB *sql.DB, serfMembersEndpoint, eventsEndpoint string) error {
router := NewCatalystAPIRouterInternal(cli, vodEngine, mapic, bal, c, broker, metricsDB, serfMembersEndpoint, eventsEndpoint)
server := http.Server{Addr: cli.HTTPInternalAddress, Handler: router}
ctx, cancel := context.WithCancel(ctx)

Expand All @@ -56,7 +56,7 @@ func ListenAndServeInternal(ctx context.Context, cli config.Cli, vodEngine *pipe
return server.Shutdown(ctx)
}

func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinator, mapic mistapiconnector.IMac, bal balancer.Balancer, c cluster.Cluster, broker misttriggers.TriggerBroker, metricsDB *sql.DB, serfMembersEndpoint string) *httprouter.Router {
func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinator, mapic mistapiconnector.IMac, bal balancer.Balancer, c cluster.Cluster, broker misttriggers.TriggerBroker, metricsDB *sql.DB, serfMembersEndpoint, eventsEndpoint string) *httprouter.Router {
router := httprouter.New()
withLogging := middleware.LogRequest()
withAuth := middleware.IsAuthorized
Expand All @@ -73,7 +73,7 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato
spkiPublicKey, _ := crypto.ConvertToSpki(cli.VodDecryptPublicKey)

catalystApiHandlers := &handlers.CatalystAPIHandlersCollection{VODEngine: vodEngine}
eventsHandler := handlers.NewEventsHandlersCollection(c, mapic, bal)
eventsHandler := handlers.NewEventsHandlersCollection(c, mapic, bal, eventsEndpoint)
ffmpegSegmentingHandlers := &ffmpeg.HandlersCollection{VODEngine: vodEngine}
accessControlHandlers := accesscontrol.NewAccessControlHandlersCollection(cli, mapic)
analyticsHandlers := analytics.NewAnalyticsHandler(metricsDB)
Expand All @@ -84,18 +84,16 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato
// Simple endpoint for healthchecks
router.GET("/ok", withLogging(catalystApiHandlers.Ok()))

var metricsHandlers []http.Handler

if cli.IsApiMode() {
var metricsHandlers []http.Handler
if cli.ShouldMapic() {
metricsHandlers = append(metricsHandlers, mapic.MetricsHandler())
}
if cli.MistPrometheus != "" {
// Enable Mist metrics enrichment
metricsHandlers = append(metricsHandlers, mapic.MistMetricsHandler())
}
metricsHandlers = append(metricsHandlers, promhttp.Handler())
// Hacky combined metrics handler. To be refactored away with mapic.
router.GET("/metrics", concatHandlers(metricsHandlers...))

// Public Catalyst API
router.POST("/api/vod",
Expand Down Expand Up @@ -132,13 +130,19 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato
router.POST("/api/serf/receiveUserEvent", withLogging(eventsHandler.ReceiveUserEvent()))
}

metricsHandlers = append(metricsHandlers, promhttp.Handler())
// Hacky combined metrics handler. To be refactored away with mapic.
router.GET("/metrics", concatHandlers(metricsHandlers...))

if cli.IsClusterMode() {
// Temporary endpoint for admin queries
router.GET("/admin/members", withLogging(adminHandlers.MembersHandler()))
// Handler to get members Catalyst API => Catalyst
router.GET("/api/serf/members", withLogging(adminHandlers.MembersHandler()))
// Public handler to propagate an event to all Catalyst nodes, execute from Studio API => Catalyst
router.POST("/api/events", withLogging(eventsHandler.Events()))
} else {
router.POST("/api/events", withLogging(eventsHandler.ProxyEvents()))
}

return router
Expand Down
4 changes: 4 additions & 0 deletions balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,8 @@ type Config struct {
MistHost string
OwnRegion string
OwnRegionTagAdjust int

ReplaceHostMatch string
ReplaceHostList []string
ReplaceHostPercent int
}
11 changes: 11 additions & 0 deletions balancer/mist/mist_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import (
"errors"
"fmt"
"io"
"math/rand"
"net/http"
"net/url"
"os"
"os/exec"
"regexp"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -335,6 +337,8 @@ func (b *MistBalancer) queryMistForClosestNode(ctx context.Context, playbackID,
return node, nil
}

var nodeHostRegex = regexp.MustCompile(`^.+?\.`) // matches the first part of the hostname before the first dot

// return the best node available for a given stream. will return any node if nobody has the stream.
func (b *MistBalancer) GetBestNode(ctx context.Context, redirectPrefixes []string, playbackID, lat, lon, fallbackPrefix string, isStudioReq bool) (string, string, error) {
var nodeAddr, fullPlaybackID, fallbackAddr string
Expand Down Expand Up @@ -366,6 +370,13 @@ func (b *MistBalancer) GetBestNode(ctx context.Context, redirectPrefixes []strin

// good path: we found the stream and a good node to play it back, yay!
if nodeAddr != "" {
if b.config.ReplaceHostMatch != "" && len(b.config.ReplaceHostList) > 0 && rand.Intn(100) < b.config.ReplaceHostPercent {
// replace the host for a percentage of requests based on the configured replacement list, choosing a random host from that list
if strings.Contains(nodeHostRegex.FindString(nodeAddr), b.config.ReplaceHostMatch) {
nodeAddr = nodeHostRegex.ReplaceAllString(nodeAddr, b.config.ReplaceHostList[rand.Intn(len(b.config.ReplaceHostList))]+".")
}
}

return nodeAddr, fullPlaybackID, nil
}

Expand Down
27 changes: 27 additions & 0 deletions balancer/mist/mist_balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,33 @@ func TestGetBestNode(t *testing.T) {
require.Contains(t, []string{"one.example.com", "two.example.com"}, node)
}

func TestGetBestNodeWithReplacement(t *testing.T) {
bal, mul := start(t)
defer mul.Close()

mul.BalancedHosts = map[string]string{
"http://one.example.com:4242": "Online",
"http://two.example.com:4242": "Online",
}
mul.StreamsLive = map[string][]string{"http://one.example.com:4242": {"prefix+fakeid"}}

// stream is live on host "one" but replace this with "two"
bal.config.ReplaceHostMatch = "one"
bal.config.ReplaceHostPercent = 100
bal.config.ReplaceHostList = []string{"two"}

node, streamName, err := bal.GetBestNode(context.Background(), []string{"prefix"}, "fakeid", "0", "0", "", false)
require.NoError(t, err)
require.Equal(t, streamName, "prefix+fakeid")
require.Contains(t, node, "two.example.com")

// set percent to zero, should not replace
bal.config.ReplaceHostPercent = 0
node, _, err = bal.GetBestNode(context.Background(), []string{"prefix"}, "fakeid", "0", "0", "", false)
require.NoError(t, err)
require.Contains(t, node, "one.example.com")
}

func TestGetBestNodeForWebRTC(t *testing.T) {
const webrtcStreamKey = "webr-tcst-ream-key1"
bal, mul := start(t)
Expand Down
4 changes: 3 additions & 1 deletion clients/broadcaster_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"io"
"net/url"

"github.com/livepeer/catalyst-api/log"
)

// Currently only implemented by LocalBroadcasterClient
Expand All @@ -20,7 +22,7 @@ type LocalBroadcasterClient struct {
func NewLocalBroadcasterClient(broadcasterURL string) (BroadcasterClient, error) {
u, err := url.Parse(broadcasterURL)
if err != nil {
return &LocalBroadcasterClient{}, fmt.Errorf("error parsing local broadcaster URL %q: %s", broadcasterURL, err)
return &LocalBroadcasterClient{}, fmt.Errorf("error parsing local broadcaster URL %q: %s", log.RedactURL(broadcasterURL), err)
}
return &LocalBroadcasterClient{
broadcasterURL: *u,
Expand Down
4 changes: 2 additions & 2 deletions clients/callback_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,12 +176,12 @@ func (pcc *PeriodicCallbackClient) doWithRetries(r *http.Request) error {

resp, err := metrics.MonitorRequest(metrics.Metrics.TranscodingStatusUpdate, pcc.httpClient, r)
if err != nil {
return fmt.Errorf("failed to send callback to %q. Error: %s", r.URL.String(), err)
return fmt.Errorf("failed to send callback to %q. Error: %s", r.URL.Redacted(), err)
}
defer resp.Body.Close()

if resp.StatusCode >= 400 {
return fmt.Errorf("failed to send callback to %q. HTTP Code: %d", r.URL.String(), resp.StatusCode)
return fmt.Errorf("failed to send callback to %q. HTTP Code: %d", r.URL.Redacted(), resp.StatusCode)
}

return nil
Expand Down
8 changes: 4 additions & 4 deletions clients/input_copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@ func (s *InputCopy) CopyInputToS3(requestID string, inputFile, osTransferURL *ur
}
}

log.Log(requestID, "starting probe", "source", inputFile.String(), "dest", osTransferURL.String())
log.Log(requestID, "starting probe", "source", inputFile.Redacted(), "dest", osTransferURL.Redacted())
inputFileProbe, err := s.Probe.ProbeFile(requestID, signedURL, "-analyzeduration", "15000000")
if err != nil {
log.Log(requestID, "probe failed", "err", err, "source", inputFile.String(), "dest", osTransferURL.String())
log.Log(requestID, "probe failed", "err", err, "source", inputFile.Redacted(), "dest", osTransferURL.Redacted())
return video.InputVideo{}, "", fmt.Errorf("error probing MP4 input file from S3: %w", err)
}

log.Log(requestID, "probe succeeded", "source", inputFile.String(), "dest", osTransferURL.String())
log.Log(requestID, "probe succeeded", "source", inputFile.Redacted(), "dest", osTransferURL.Redacted())
videoTrack, err := inputFileProbe.GetTrack(video.TrackTypeVideo)
hasVideoTrack := err == nil
// verify the duration of the video track and don't process if we can't determine duration
Expand Down Expand Up @@ -229,7 +229,7 @@ func CopyAllInputFiles(requestID string, srcInputUrl, dstOutputUrl *url.URL, dec
}
byteCount += size
}
log.Log(requestID, "Copied", "bytes", byteCount, "source", srcInputUrl.String(), "dest", dstOutputUrl.String())
log.Log(requestID, "Copied", "bytes", byteCount, "source", srcInputUrl.Redacted(), "dest", dstOutputUrl.Redacted())
return nil
}

Expand Down
28 changes: 20 additions & 8 deletions clients/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,13 @@ func RecordingBackupCheck(requestID string, primaryManifestURL, osTransferURL *u
return primaryManifestURL, nil
}

playlist, playlistType, err := downloadManifestWithBackup(requestID, primaryManifestURL.String())
playlistURL, playlist, playlistType, err := downloadManifestWithBackup(requestID, primaryManifestURL.String())
if err != nil {
return nil, fmt.Errorf("error downloading manifest: %w", err)
}
// if we had to use the backup location for the manifest then we need to write a new playlist
newPlaylistRequired := playlistURL != primaryManifestURL.String()

mediaPlaylist, err := convertToMediaPlaylist(playlist, playlistType)
if err != nil {
return nil, err
Expand All @@ -81,10 +84,19 @@ func RecordingBackupCheck(requestID string, primaryManifestURL, osTransferURL *u
if err != nil {
return nil, fmt.Errorf("failed to find segment file %s: %w", segURL.Redacted(), err)
}
if actualSegURL != segURL.String() {
// if we had to use the backup location for any segment then we need a new manifest file with new segment URLs
// pointing to wherever they are found, primary or backup
newPlaylistRequired = true
}
segment.URI = actualSegURL
}

// write the manifest to storage and update the manifestURL variable
if !newPlaylistRequired {
return primaryManifestURL, nil
}

// write the updated manifest to storage and update the manifestURL variable
outputStorageURL := osTransferURL.JoinPath("input.m3u8")
err = backoff.Retry(func() error {
return UploadToOSURL(outputStorageURL.String(), "", strings.NewReader(mediaPlaylist.String()), ManifestUploadTimeout)
Expand Down Expand Up @@ -118,7 +130,7 @@ func convertToMediaPlaylist(playlist m3u8.Playlist, playlistType m3u8.ListType)
return *mediaPlaylist, nil
}

func downloadManifestWithBackup(requestID, sourceManifestOSURL string) (m3u8.Playlist, m3u8.ListType, error) {
func downloadManifestWithBackup(requestID, sourceManifestOSURL string) (string, m3u8.Playlist, m3u8.ListType, error) {
var playlist, playlistBackup m3u8.Playlist
var playlistType, playlistTypeBackup m3u8.ListType
var size, sizeBackup int
Expand Down Expand Up @@ -146,21 +158,21 @@ func downloadManifestWithBackup(requestID, sourceManifestOSURL string) (m3u8.Pla
// (only not found errors passthrough below)
primaryNotFound, backupNotFound := errors.IsObjectNotFound(errPrimary), errors.IsObjectNotFound(errBackup)
if primaryNotFound && backupNotFound {
return nil, 0, errPrimary
return "", nil, 0, errPrimary
}
if errPrimary != nil && !primaryNotFound {
return nil, 0, errPrimary
return "", nil, 0, errPrimary
}
if errBackup != nil && !backupNotFound {
return nil, 0, errBackup
return "", nil, 0, errBackup
}

// Return the largest manifest as the most recent version
hasBackup := backupManifestURL != "" && errBackup == nil
if hasBackup && (errPrimary != nil || sizeBackup > size) {
return playlistBackup, playlistTypeBackup, nil
return backupManifestURL, playlistBackup, playlistTypeBackup, nil
}
return playlist, playlistType, errPrimary
return sourceManifestOSURL, playlist, playlistType, errPrimary
}

func downloadManifest(requestID, sourceManifestOSURL string) (playlist m3u8.Playlist, playlistType m3u8.ListType, size int, err error) {
Expand Down
Loading

0 comments on commit 0b869ae

Please sign in to comment.