From 05886e54ff2fdde22e1ef1332e4a967b4add0ba4 Mon Sep 17 00:00:00 2001 From: Victor Elias Date: Thu, 20 Jun 2024 14:00:59 +0100 Subject: [PATCH] config: Create storage-fallback-urls flag --- config/cli.go | 1 + main.go | 3 ++- pipeline/coordinator.go | 9 ++++++++- pipeline/ffmpeg.go | 1 + 4 files changed, 12 insertions(+), 2 deletions(-) diff --git a/config/cli.go b/config/cli.go index 2f2478837..7ca6908e4 100644 --- a/config/cli.go +++ b/config/cli.go @@ -58,6 +58,7 @@ type Cli struct { EncryptKey string VodDecryptPublicKey string VodDecryptPrivateKey string + StorageFallbackURLs map[string]string GateURL string StreamHealthHookURL string BroadcasterURL string diff --git a/main.go b/main.go index e190b8b57..93b466b82 100644 --- a/main.go +++ b/main.go @@ -117,6 +117,7 @@ func main() { fs.StringVar(&cli.EncryptKey, "encrypt", "", "Key for encrypting network traffic within Serf. Must be a base64-encoded 32-byte key.") fs.StringVar(&cli.VodDecryptPublicKey, "catalyst-public-key", "", "Public key of the catalyst node for encryption") fs.StringVar(&cli.VodDecryptPrivateKey, "catalyst-private-key", "", "Private key of the catalyst node for encryption") + config.CommaMapFlag(fs, &cli.StorageFallbackURLs, "storage-fallback-urls", map[string]string{}, `Comma-separated map of primary to backup storage URLs. If a file fails downloading from one of the primary storages (detected by prefix), it will fallback to the corresponding backup URL after having the prefix replaced`) fs.StringVar(&cli.GateURL, "gate-url", "http://localhost:3004/api/access-control/gate", "Address to contact playback gating API for access control verification") config.InvertedBoolFlag(fs, &cli.MistTriggerSetup, "mist-trigger-setup", true, "Overwrite Mist triggers with the ones built into catalyst-api") fs.IntVar(&cli.SerfQueueSize, "serf-queue-size", 100000, "Size of internal serf queue before messages are dropped") @@ -226,7 +227,7 @@ func main() { } // Start the "co-ordinator" that determines whether to send jobs to the Catalyst transcoding pipeline // or an external one - vodEngine, err := pipeline.NewCoordinator(pipeline.Strategy(cli.VodPipelineStrategy), cli.SourceOutput, cli.ExternalTranscoder, statusClient, metricsDB, vodDecryptPrivateKey, cli.BroadcasterURL, cli.SourcePlaybackHosts, c2) + vodEngine, err := pipeline.NewCoordinator(pipeline.Strategy(cli.VodPipelineStrategy), cli.SourceOutput, cli.ExternalTranscoder, statusClient, metricsDB, vodDecryptPrivateKey, cli.StorageFallbackURLs, cli.BroadcasterURL, cli.SourcePlaybackHosts, c2) if err != nil { glog.Fatalf("Error creating VOD pipeline coordinator: %v", err) } diff --git a/pipeline/coordinator.go b/pipeline/coordinator.go index 2e280ef95..eaad9178d 100644 --- a/pipeline/coordinator.go +++ b/pipeline/coordinator.go @@ -168,11 +168,16 @@ type Coordinator struct { MetricsDB *sql.DB InputCopy clients.InputCopier VodDecryptPrivateKey *rsa.PrivateKey + StorageFallbackURLs map[string]string SourceOutputURL *url.URL C2PA *c2pa.C2PA } -func NewCoordinator(strategy Strategy, sourceOutputURL, extTranscoderURL string, statusClient clients.TranscodeStatusClient, metricsDB *sql.DB, VodDecryptPrivateKey *rsa.PrivateKey, broadcasterURL string, sourcePlaybackHosts map[string]string, c2pa *c2pa.C2PA) (*Coordinator, error) { +func NewCoordinator( + strategy Strategy, sourceOutputURL, extTranscoderURL string, statusClient clients.TranscodeStatusClient, + metricsDB *sql.DB, VodDecryptPrivateKey *rsa.PrivateKey, storageFallbackURLs map[string]string, broadcasterURL string, + sourcePlaybackHosts map[string]string, c2pa *c2pa.C2PA) (*Coordinator, error) { + if !strategy.IsValid() { return nil, fmt.Errorf("invalid strategy: %s", strategy) } @@ -205,12 +210,14 @@ func NewCoordinator(strategy Strategy, sourceOutputURL, extTranscoderURL string, Broadcaster: broadcaster, probe: video.Probe{}, sourcePlaybackHosts: sourcePlaybackHosts, + storageFallbackURLs: storageFallbackURLs, }, pipeExternal: &external{extTranscoder}, Jobs: cache.New[*JobInfo](), MetricsDB: metricsDB, InputCopy: clients.NewInputCopy(), VodDecryptPrivateKey: VodDecryptPrivateKey, + StorageFallbackURLs: storageFallbackURLs, SourceOutputURL: sourceOutput, C2PA: c2pa, }, nil diff --git a/pipeline/ffmpeg.go b/pipeline/ffmpeg.go index 6a37220fd..2011f5343 100644 --- a/pipeline/ffmpeg.go +++ b/pipeline/ffmpeg.go @@ -30,6 +30,7 @@ type ffmpeg struct { Broadcaster clients.BroadcasterClient probe video.Prober sourcePlaybackHosts map[string]string + storageFallbackURLs map[string]string } func init() {