From 29896674691589893957d1f4506ce06c4884d842 Mon Sep 17 00:00:00 2001 From: Eli Mallon Date: Wed, 27 Sep 2023 16:35:56 -0700 Subject: [PATCH 1/6] cmd/catalyst: implement YAML configuration stack parsing --- cmd/catalyst/catalyst.go | 9 + cmd/catalyst/config/config.go | 149 ++++++++++++++++ cmd/catalyst/config/config_test.go | 89 ++++++++++ config/full-stack.yaml | 268 +++++++++++++++++++++++++++++ go.mod | 1 + go.sum | 2 + 6 files changed, 518 insertions(+) create mode 100644 cmd/catalyst/config/config.go create mode 100644 cmd/catalyst/config/config_test.go create mode 100644 config/full-stack.yaml diff --git a/cmd/catalyst/catalyst.go b/cmd/catalyst/catalyst.go index 0f388cfe3..fa6f1ef74 100644 --- a/cmd/catalyst/catalyst.go +++ b/cmd/catalyst/catalyst.go @@ -1,13 +1,17 @@ package main import ( + "encoding/json" + "fmt" "os" "syscall" + "github.com/icza/dyno" "github.com/livepeer/catalyst/cmd/downloader/cli" "github.com/livepeer/catalyst/cmd/downloader/downloader" "github.com/livepeer/catalyst/cmd/downloader/types" glog "github.com/magicsong/color-glog" + "gopkg.in/yaml.v3" ) var Version = "undefined" @@ -34,6 +38,11 @@ func execNext(cliFlags types.CliFlags) { // Nothing to do. return } + configStr, err := handleConfigFile("/home/iameli/code/catalyst/config/full-stack.yaml") + if err != nil { + panic(err) + } + panic(configStr) glog.Infof("downloader complete, now we will exec %v", cliFlags.ExecCommand) execErr := syscall.Exec(cliFlags.ExecCommand[0], cliFlags.ExecCommand, os.Environ()) if execErr != nil { diff --git a/cmd/catalyst/config/config.go b/cmd/catalyst/config/config.go new file mode 100644 index 000000000..2d6d0ea6d --- /dev/null +++ b/cmd/catalyst/config/config.go @@ -0,0 +1,149 @@ +package config + +import ( + "encoding/json" + "fmt" + "os" + "strings" + + "github.com/icza/dyno" + "gopkg.in/yaml.v2" +) + +// takes /path1:/path2:/path3 and returns JSON bytes +func HandleConfigStack(configPaths string) ([]byte, error) { + var err error + merged := map[string]any{} + filePaths := strings.Split(configPaths, ":") + for _, filePath := range filePaths { + contents, err := readYAMLFile(filePath) + // todo: handle missing file case (allowed as long as we have some) + if err != nil { + return []byte{}, fmt.Errorf("error handling config file %s: %w", filePath, err) + } + merged = mergeMaps(merged, contents) + } + config, err := optionalMap(merged, "config") + if err != nil { + return nil, err + } + protocols, err := optionalMap(config, "protocols") + if err != nil { + return nil, err + } + protocolArray := []map[string]any{} + for k, v := range protocols { + if v == nil { + continue + } + vMap, ok := v.(map[string]any) + if !ok { + return nil, fmt.Errorf("unable to convert protocol '%s' to a string map", k) + } + protocolArray = append(protocolArray, vMap) + } + config["protocols"] = protocolArray + jsonBytes, err := json.MarshalIndent(merged, "", " ") + if err != nil { + return nil, err + } + return jsonBytes, nil +} + +// Returns a new map merging source into dest +// Merges any map[string]any maps that are present +// Overwrites everything else +func mergeMaps(dest, source map[string]any) map[string]any { + merged := map[string]any{} + // Start with a shallow copy of `dest` + for k, v := range dest { + merged[k] = v + } + for newKey, newValue := range source { + oldValue, has := merged[newKey] + if !has { + merged[newKey] = newValue + continue + } + newMap, newOk := newValue.(map[string]any) + oldMap, oldOk := oldValue.(map[string]any) + if newOk && oldOk { + // Both maps. Merge em! + merged[newKey] = mergeMaps(oldMap, newMap) + continue + } + // One or both is not a map, just copy over the new value + merged[newKey] = newValue + } + return merged +} + +func readYAMLFile(filePath string) (map[string]any, error) { + var conf map[any]any + dat, err := os.ReadFile(filePath) + if err != nil { + return nil, err + } + err = yaml.Unmarshal(dat, &conf) + if err != nil { + return nil, err + } + jsonConf := dyno.ConvertMapI2MapS(conf) + jsonMap, ok := jsonConf.(map[string]any) + if !ok { + return nil, fmt.Errorf("unable to convert config to a string map") + } + return jsonMap, nil +} + +func optionalMap(parent map[string]any, key string) (map[string]any, error) { + child, ok := parent[key] + if !ok { + child = map[string]any{} + parent[key] = child + } + childMap, ok := child.(map[string]any) + if !ok { + return nil, fmt.Errorf("unable to convert '%s' to a string map", key) + } + return childMap, nil +} + +func handleConfigFile(configPath string) (string, error) { + var conf map[any]any + dat, err := os.ReadFile(configPath) + if err != nil { + return "", err + } + err = yaml.Unmarshal(dat, &conf) + if err != nil { + return "", err + } + jsonConf := dyno.ConvertMapI2MapS(conf) + jsonMap, ok := jsonConf.(map[string]any) + if !ok { + return "", fmt.Errorf("unable to convert config to a string map") + } + config, err := optionalMap(jsonMap, "config") + if err != nil { + return "", err + } + protocols, err := optionalMap(config, "protocols") + if err != nil { + return "", err + } + protocolArray := []map[string]any{} + for k, v := range protocols { + vMap, ok := v.(map[string]any) + if !ok { + return "", fmt.Errorf("unable to convert protocol '%s' to a string map", k) + } + protocolArray = append(protocolArray, vMap) + } + config["protocols"] = protocolArray + str, err := json.MarshalIndent(jsonConf, "", " ") + if err != nil { + return "", err + } + return string(str), nil +} diff --git a/cmd/catalyst/config/config_test.go b/cmd/catalyst/config/config_test.go new file mode 100644 index 000000000..06189d518 --- /dev/null +++ b/cmd/catalyst/config/config_test.go @@ -0,0 +1,89 @@ +package config + +import ( + "crypto/rand" + "encoding/hex" + "encoding/json" + "os" + "path/filepath" + "strings" + "testing" + + "github.com/icza/dyno" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v2" +) + +func randPath(t *testing.T) string { + randBytes := make([]byte, 16) + rand.Read(randBytes) + return filepath.Join(t.TempDir(), hex.EncodeToString(randBytes)+".yaml") +} + +func toFiles(t *testing.T, strs ...string) string { + paths := []string{} + for _, content := range strs { + filepath := randPath(t) + os.WriteFile(filepath, []byte(content), 0644) + paths = append(paths, filepath) + } + return strings.Join(paths, ":") +} + +func yamlToJson(t *testing.T, yamlStr string) string { + var yamlStruct map[any]any + err := yaml.Unmarshal([]byte(yamlStr), &yamlStruct) + require.NoError(t, err) + jsonStruct := dyno.ConvertMapI2MapS(yamlStruct) + jsonBytes, err := json.Marshal(jsonStruct) + require.NoError(t, err) + return string(jsonBytes) +} + +func TestMerge(t *testing.T) { + confStack := toFiles(t, conf1, conf2, conf3) + jsonBytes, err := HandleConfigStack(confStack) + require.NoError(t, err) + require.JSONEq(t, yamlToJson(t, mergedConf), string(jsonBytes)) +} + +var conf1 = ` +foo: conf1 +some-map: + opt1: cool +config: + protocols: + example-protocol: + protocol-number: 15 + protocol-boolean: true + protocol-string: foobar + removed-protocol: + connector: asdf +` + +var conf2 = ` +foo: conf2 +` + +var conf3 = ` +foo: conf3 +some-map: + opt2: lmao +config: + protocols: + example-protocol: + protocol-string: override + removed-protocol: null +` + +var mergedConf = ` +foo: conf3 +some-map: + opt1: cool + opt2: lmao +config: + protocols: + - protocol-number: 15 + protocol-boolean: true + protocol-string: override +` diff --git a/config/full-stack.yaml b/config/full-stack.yaml new file mode 100644 index 000000000..59baa038a --- /dev/null +++ b/config/full-stack.yaml @@ -0,0 +1,268 @@ +account: + test: + password: 098f6bcd4621d373cade4e832627b4f6 +autopushes: + - - videorec+ + - s3+http://admin:password@localhost:9000/os-recordings/$wildcard/$uuid/source/$segmentCounter.ts?m3u8=../output.m3u8&split=5&video=source&audio=AAC&append=1&waittrackcount=2&recstart=-1 +config: + accesslog: LOG + controller: + interface: null + port: null + username: null + defaultStream: null + limits: null + prometheus: koekjes + protocols: + livepeer-cockroach: + connector: livepeer-cockroach + livepeer-rabbitmq: + connector: livepeer-rabbitmq + livepeer-nginx: + connector: livepeer-nginx + livepeer-minio: + connector: livepeer-minio + livepeer-core-dump-monitor: + connector: livepeer-core-dump-monitor + livepeer-coturn: + connector: livepeer-coturn + livepeer-api: + connector: livepeer-api + postgres-url: postgresql://root@localhost:5432/defaultdb?sslmode=disable + cors-jwt-allowlist: >- + ["http://localhost:8080", "http://localhost:3000", + "http://localhost:8888","http://127.0.0.1:8080", + "http://127.0.0.1:3000", "http://127.0.0.1:8888"] + jwt-secret: stupidlysecret + jwt-audience: my-node + ingest: >- + [{"ingest":"rtmp://localhost/live","ingests":{"rtmp":"rtmp://localhost/live","srt":"srt://localhost:8889"},"playback":"http://localhost:8888/hls","base":"http://localhost:8888","origin":"http://localhost:8888"}] + broadcasters: '[{"address":"http://127.0.0.1:8935"}]' + base-stream-name: video + amqp-url: amqp://localhost:5672/livepeer + vodObjectStoreId: 917a2f18-f7a8-4ae3-a849-6efd4aac8e59 + vodCatalystObjectStoreId: 517873a4-487c-40ad-872f-027f4bc6bd98 + vodCatalystPrivateAssetsObjectStore: cab9266f-5583-4532-9630-7be10d92affe + recordCatalystObjectStoreId: 0926e4ba-b726-4386-92ee-5c4583f62f0a + own-region: box + livepeer-stream-info-service: + connector: livepeer-api + postgres-url: postgresql://root@localhost:5432/defaultdb?sslmode=disable + own-region: box + stream-info-service: true + port: "3040" + livepeer-catalyst-api: + connector: livepeer-catalyst-api + api-server: http://127.0.0.1:3004 + api-token: f61b3cdb-d173-4a7a-a0d3-547b871a56f9 + tags: node=media,http=http://localhost:8888/mist + redirect-prefixes: video,videorec + node: localhost + http-internal-addr: 0.0.0.0:7979 + balancer-args: "-P koekjes" + amqp-url: amqp://localhost:5672/livepeer + own-region: box + v: "8" + broadcaster-url: http://127.0.0.1:8937 + livepeer-broadcaster: + connector: livepeer + broadcaster: true + metricsClientIP: true + metricsPerStream: true + monitor: true + cliAddr: 127.0.0.1:7935 + httpAddr: 127.0.0.1:8935 + orchAddr: 127.0.0.1:8936 + rtmpAddr: 127.0.0.1:1936 + authWebhookUrl: >- + http://9c2936b5-143f-4b10-b302-6a21b5f29c3d:f61b3cdb-d173-4a7a-a0d3-547b871a56f9@127.0.0.1:3004/api/stream/hook + metadataQueueUri: amqp://localhost:5672/livepeer + v: "2" + livepeer-broadcaster-vod: + connector: livepeer + broadcaster: true + metricsClientIP: true + metricsPerStream: true + monitor: true + httpAddr: 127.0.0.1:8937 + cliAddr: 127.0.0.1:7937 + orchAddr: 127.0.0.1:8936 + rtmpAddr: 127.0.0.1:1937 + v: "2" + livepeer-orchestrator: + connector: livepeer + orchestrator: true + transcoder: true + cliAddr: 127.0.0.1:7936 + metricsClientIP: true + metricsPerStream: true + monitor: true + serviceAddr: 127.0.0.1:8936 + v: "2" + livepeer-analyzer: + connector: livepeer-analyzer + livepeer-access-token: f61b3cdb-d173-4a7a-a0d3-547b871a56f9 + port: "3080" + rabbitmq-uri: amqp://localhost:5672/livepeer + disable-bigquery: true + v: "8" + livepeer-task-runner: + connector: livepeer-task-runner + amqp-uri: amqp://localhost:5672/livepeer + catalyst-secret: f61b3cdb-d173-4a7a-a0d3-547b871a56f9 + catalyst-url: http://127.0.0.1:7979 + livepeer-access-token: f61b3cdb-d173-4a7a-a0d3-547b871a56f9 + own-base-url: http://127.0.0.1:3060/task-runner + port: "3060" + AAC: + connector: AAC + CMAF: + connector: CMAF + DTSC: + connector: DTSC + EBML: + connector: EBML + FLV: + connector: FLV + H264: + connector: H264 + HDS: + connector: HDS + HLS: + connector: HLS + HTTP: + connector: HTTP + HTTPTS: + connector: HTTPTS + JSON: + connector: JSON + MP3: + connector: MP3 + MP4: + connector: MP4 + OGG: + connector: OGG + RTMP: + connector: RTMP + RTSP: + connector: RTSP + SRT: + connector: SRT + TSSRT: + connector: TSSRT + WAV: + connector: WAV + WebRTC: + connector: WebRTC + bindhost: 127.0.0.1 + iceservers: + - urls: stun:localhost + - credential: livepeer + urls: turn:localhost + username: livepeer + pubhost: 127.0.0.1 + sessionInputMode: 15 + sessionOutputMode: 15 + sessionStreamInfoMode: 1 + sessionUnspecifiedMode: 0 + sessionViewerMode: 14 + tknMode: 15 + triggers: {} + trustedproxy: + - 0.0.0.0/1 + - 128.0.0.0/1 +extwriters: + - [ + "s3", + "livepeer-catalyst-uploader -t 2592000s", + ["s3", "s3+http", "s3+https", "ipfs"], + ] +push_settings: + maxspeed: null + wait: null +streams: + video: + DVR: 25000 + maxkeepaway: 7500 + name: video + processes: + - access_token: f61b3cdb-d173-4a7a-a0d3-547b871a56f9 + custom_url: http://127.0.0.1:3004/api/stream/video + debug: 5 + exit_unmask: false + leastlive: "1" + process: Livepeer + target_profiles: + - x-LSP-name: 144p + bitrate: 400000 + fps: 30 + height: 144 + name: P144p30fps16x9 + width: 256 + profile: H264ConstrainedHigh + - exec: >- + gst-launch-1.0 -q fdsrc fd=0 ! matroskademux ! faad ! audioresample ! + opusenc inband-fec=true perfect-timestamp=true ! matroskamux ! fdsink + fd=1 + exit_unmask: false + process: MKVExec + track_inhibit: audio=opus&video=source,|bframes + track_select: video=none&audio=aac,|source,|maxbps + x-LSP-name: AAC to Opus + leastlive: 1 + - exec: >- + gst-launch-1.0 -q fdsrc fd=0 ! matroskademux ! opusdec + use-inband-fec=true ! audioresample ! voaacenc perfect-timestamp=true + ! matroskamux ! fdsink fd=1 + exit_unmask: false + process: MKVExec + track_inhibit: audio=aac + track_select: video=none&audio=opus,|source,|maxbps + x-LSP-name: Opus to AAC + leastlive: 1 + segmentsize: 1 + source: push:// + stop_sessions: false + videorec: + DVR: 25000 + maxkeepaway: 7500 + name: video + processes: + - access_token: f61b3cdb-d173-4a7a-a0d3-547b871a56f9 + custom_url: http://127.0.0.1:3004/api/stream/video + debug: 5 + exit_unmask: false + leastlive: "1" + process: Livepeer + target_profiles: + - x-LSP-name: 144p + bitrate: 400000 + fps: 30 + height: 144 + name: P144p30fps16x9 + width: 256 + profile: H264ConstrainedHigh + - exec: >- + gst-launch-1.0 -q fdsrc fd=0 ! matroskademux ! faad ! audioresample ! + opusenc inband-fec=true perfect-timestamp=true ! matroskamux ! fdsink + fd=1 + exit_unmask: false + process: MKVExec + track_inhibit: audio=opus&video=source,|bframes + track_select: video=none&audio=aac,|source,|maxbps + x-LSP-name: AAC to Opus + leastlive: 1 + - exec: >- + gst-launch-1.0 -q fdsrc fd=0 ! matroskademux ! opusdec + use-inband-fec=true ! audioresample ! voaacenc perfect-timestamp=true + ! matroskamux ! fdsink fd=1 + exit_unmask: false + process: MKVExec + track_inhibit: audio=aac + track_select: video=none&audio=opus,|source,|maxbps + x-LSP-name: Opus to AAC + leastlive: 1 + segmentsize: 1 + source: push:// + stop_sessions: false +variables: null diff --git a/go.mod b/go.mod index 009be8263..10017213a 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.19 require ( github.com/ProtonMail/gopenpgp/v2 v2.4.10 github.com/golang/glog v1.1.1 + github.com/icza/dyno v0.0.0-20230330125955-09f820a8d9c0 github.com/livepeer/stream-tester v0.12.30-0.20230823234013-5cfb4bbcf27d github.com/magicsong/color-glog v0.0.1 github.com/minio/minio-go/v7 v7.0.46 diff --git a/go.sum b/go.sum index 2fa836fea..a3353ae98 100644 --- a/go.sum +++ b/go.sum @@ -302,6 +302,8 @@ github.com/hashicorp/serf v0.10.1 h1:Z1H2J60yRKvfDYAOZLd2MU0ND4AH/WDz7xYHDWQsIPY github.com/hashicorp/serf v0.10.1/go.mod h1:yL2t6BqATOLGc5HF7qbFkTfXoPIY0WZdWHfEvMqbG+4= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= +github.com/icza/dyno v0.0.0-20230330125955-09f820a8d9c0 h1:nHoRIX8iXob3Y2kdt9KsjyIb7iApSvb3vgsd93xb5Ow= +github.com/icza/dyno v0.0.0-20230330125955-09f820a8d9c0/go.mod h1:c1tRKs5Tx7E2+uHGSyyncziFjvGpgv4H2HrqXeUQ/Uk= github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4= github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= From 1e66f4bfca19856149fc5c857aa2d38abb60783a Mon Sep 17 00:00:00 2001 From: Eli Mallon Date: Thu, 28 Sep 2023 07:21:54 -0700 Subject: [PATCH 2/6] cmd/catalyst: boot from a temporary mist config file --- Makefile | 2 ++ cmd/catalyst/catalyst.go | 46 +++++++++++++++---------- cmd/catalyst/cli/cli.go | 48 ++++++++++++++++++++++++++ cmd/downloader/cli/cli.go | 63 ++++++++++++++++------------------- cmd/downloader/types/types.go | 5 +-- 5 files changed, 110 insertions(+), 54 deletions(-) create mode 100644 cmd/catalyst/cli/cli.go diff --git a/Makefile b/Makefile index 8e05892d3..d26b861c1 100644 --- a/Makefile +++ b/Makefile @@ -214,6 +214,8 @@ box-dev: scripts -v $$(realpath config):/etc/livepeer:ro \ -v $$(realpath ./coredumps):$$(realpath ./coredumps) \ -e CORE_DUMP_DIR=$$(realpath ./coredumps) \ + -e CATALYST_CONFIG=/etc/livepeer/full-stack.yaml \ + -e CATALYST_DOWNLOAD=false \ $(shell for line in $$(cat .env 2>/dev/null || echo ''); do printf -- "-e $$line "; done) \ --rm \ -it \ diff --git a/cmd/catalyst/catalyst.go b/cmd/catalyst/catalyst.go index fa6f1ef74..4f6df6476 100644 --- a/cmd/catalyst/catalyst.go +++ b/cmd/catalyst/catalyst.go @@ -1,17 +1,15 @@ package main import ( - "encoding/json" - "fmt" "os" + "os/exec" "syscall" - "github.com/icza/dyno" - "github.com/livepeer/catalyst/cmd/downloader/cli" + "github.com/livepeer/catalyst/cmd/catalyst/cli" + "github.com/livepeer/catalyst/cmd/catalyst/config" "github.com/livepeer/catalyst/cmd/downloader/downloader" "github.com/livepeer/catalyst/cmd/downloader/types" glog "github.com/magicsong/color-glog" - "gopkg.in/yaml.v3" ) var Version = "undefined" @@ -25,27 +23,41 @@ func main() { glog.Fatalf("error parsing cli flags: %s", err) return } - err = downloader.Run(cliFlags) + if cliFlags.Download { + err = downloader.Run(cliFlags) + if err != nil { + glog.Fatalf("error running downloader: %s", err) + } + } + err = execNext(cliFlags) if err != nil { - glog.Fatalf("error running downloader: %s", err) + glog.Fatalf("error executing MistController: %s", err) } - execNext(cliFlags) } // Done! Move on to the provided next application, if it exists. -func execNext(cliFlags types.CliFlags) { - if len(cliFlags.ExecCommand) == 0 { - // Nothing to do. - return +func execNext(cliFlags types.CliFlags) error { + jsonBytes, err := config.HandleConfigStack(cliFlags.ConfigStack) + if err != nil { + return err + } + f, err := os.CreateTemp("", "catalyst-generated-*.json") + if err != nil { + return err + } + _, err = f.Write(jsonBytes) + if err != nil { + return err } - configStr, err := handleConfigFile("/home/iameli/code/catalyst/config/full-stack.yaml") + glog.Infof("downloader complete, now we will exec %v", cliFlags.MistController) + binary, err := exec.LookPath(cliFlags.MistController) if err != nil { - panic(err) + return err } - panic(configStr) - glog.Infof("downloader complete, now we will exec %v", cliFlags.ExecCommand) - execErr := syscall.Exec(cliFlags.ExecCommand[0], cliFlags.ExecCommand, os.Environ()) + args := []string{binary, "-c", f.Name()} + execErr := syscall.Exec(binary, args, os.Environ()) if execErr != nil { glog.Fatalf("error running next command: %s", execErr) } + return nil } diff --git a/cmd/catalyst/cli/cli.go b/cmd/catalyst/cli/cli.go new file mode 100644 index 000000000..191137c57 --- /dev/null +++ b/cmd/catalyst/cli/cli.go @@ -0,0 +1,48 @@ +package cli + +import ( + "flag" + "fmt" + "os" + + downloaderCli "github.com/livepeer/catalyst/cmd/downloader/cli" + "github.com/livepeer/catalyst/cmd/downloader/constants" + "github.com/livepeer/catalyst/cmd/downloader/types" + "github.com/peterbourgon/ff/v3" +) + +// GetCliFlags reads command-line arguments and generates a struct +// with useful values set after parsing the same. +func GetCliFlags(buildFlags types.BuildFlags) (types.CliFlags, error) { + cliFlags := types.CliFlags{} + flag.Set("logtostderr", "true") + vFlag := flag.Lookup("v") + fs := flag.NewFlagSet(constants.AppName, flag.ExitOnError) + + downloaderCli.AddDownloaderFlags(fs, &cliFlags) + + fs.StringVar(&cliFlags.MistController, "mist-controller", "MistController", "Path to MistController binary to exec when done") + fs.StringVar(&cliFlags.ConfigStack, "config", "/etc/livepeer/catalyst.yaml", "Path to multiple Catalyst config files to use. Can contain multiple entries e.g. /conf1:/conf2") + + version := fs.Bool("version", false, "Get version information") + + if *version { + fmt.Printf("catalyst version: %s\n", buildFlags.Version) + os.Exit(0) + } + + ff.Parse( + fs, os.Args[1:], + ff.WithConfigFileParser(ff.PlainParser), + ff.WithEnvVarPrefix("CATALYST"), + ff.WithEnvVarSplit(","), + ) + flag.CommandLine.Parse(nil) + vFlag.Value.Set(cliFlags.Verbosity) + + err := downloaderCli.ValidateFlags(&cliFlags) + if err != nil { + return cliFlags, err + } + return cliFlags, err +} diff --git a/cmd/downloader/cli/cli.go b/cmd/downloader/cli/cli.go index 7c890210d..90a9b97ea 100644 --- a/cmd/downloader/cli/cli.go +++ b/cmd/downloader/cli/cli.go @@ -1,7 +1,6 @@ package cli import ( - "errors" "flag" "fmt" "net/url" @@ -15,7 +14,7 @@ import ( "github.com/peterbourgon/ff/v3" ) -func validateFlags(flags *types.CliFlags) error { +func ValidateFlags(flags *types.CliFlags) error { if !utils.IsSupportedPlatformArch(flags.Platform, flags.Architecture) { return fmt.Errorf( "invalid combination of platform+architecture detected: %s+%s", @@ -30,8 +29,6 @@ func validateFlags(flags *types.CliFlags) error { } if manifestURL.Scheme == "https" { flags.ManifestURL = true - } else if len(flags.ExecCommand) == 0 { - return errors.New("invalid path/url to manifest file") } } if info, err := os.Stat(flags.DownloadPath); !(err == nil && info.IsDir()) { @@ -47,38 +44,12 @@ func validateFlags(flags *types.CliFlags) error { // with useful values set after parsing the same. func GetCliFlags(buildFlags types.BuildFlags) (types.CliFlags, error) { cliFlags := types.CliFlags{} - args := []string{} - // Handle post-exec string - for i, arg := range os.Args[1:] { - if arg == "--" { - cliFlags.ExecCommand = os.Args[i+2:] - break - } - args = append(args, arg) - } + flag.Set("logtostderr", "true") vFlag := flag.Lookup("v") fs := flag.NewFlagSet(constants.AppName, flag.ExitOnError) - goos := runtime.GOOS - if os.Getenv("GOOS") != "" { - goos = os.Getenv("GOOS") - } - - goarch := runtime.GOARCH - if os.Getenv("GOARCH") != "" { - goarch = os.Getenv("GOARCH") - } - - fs.StringVar(&cliFlags.Verbosity, "v", "3", "Log verbosity. Integer value from 0 to 9") - fs.StringVar(&cliFlags.Platform, "platform", goos, "One of linux/windows/darwin") - fs.StringVar(&cliFlags.Architecture, "architecture", goarch, "System architecture (amd64/arm64)") - fs.StringVar(&cliFlags.DownloadPath, "path", fmt.Sprintf(".%sbin", string(os.PathSeparator)), "Path to store binaries") - fs.StringVar(&cliFlags.ManifestFile, "manifest", "manifest.yaml", "Path (or URL) to manifest yaml file") - fs.BoolVar(&cliFlags.SkipDownloaded, "skip-downloaded", false, "Skip already downloaded archive (if found)") - fs.BoolVar(&cliFlags.Cleanup, "cleanup", true, "Cleanup downloaded archives after extraction") - fs.BoolVar(&cliFlags.UpdateManifest, "update-manifest", false, "Update the manifest file commit shas from releases prior to downloading") - fs.BoolVar(&cliFlags.Download, "download", true, "Actually do a download. Only useful for -update-manifest=true -download=false") + AddDownloaderFlags(fs, &cliFlags) version := fs.Bool("version", false, "Get version information") @@ -88,8 +59,7 @@ func GetCliFlags(buildFlags types.BuildFlags) (types.CliFlags, error) { } ff.Parse( - fs, args, - ff.WithConfigFileFlag("config"), + fs, os.Args[1:], ff.WithConfigFileParser(ff.PlainParser), ff.WithEnvVarPrefix("CATALYST_DOWNLOADER"), ff.WithEnvVarSplit(","), @@ -97,9 +67,32 @@ func GetCliFlags(buildFlags types.BuildFlags) (types.CliFlags, error) { flag.CommandLine.Parse(nil) vFlag.Value.Set(cliFlags.Verbosity) - err := validateFlags(&cliFlags) + err := ValidateFlags(&cliFlags) if err != nil { glog.Fatal(err) } return cliFlags, err } + +// Populate a provided flagset with downloader flags +func AddDownloaderFlags(fs *flag.FlagSet, cliFlags *types.CliFlags) { + goos := runtime.GOOS + if os.Getenv("GOOS") != "" { + goos = os.Getenv("GOOS") + } + + goarch := runtime.GOARCH + if os.Getenv("GOARCH") != "" { + goarch = os.Getenv("GOARCH") + } + + fs.StringVar(&cliFlags.Verbosity, "v", "3", "Log verbosity. Integer value from 0 to 9") + fs.StringVar(&cliFlags.Platform, "platform", goos, "One of linux/windows/darwin") + fs.StringVar(&cliFlags.Architecture, "architecture", goarch, "System architecture (amd64/arm64)") + fs.StringVar(&cliFlags.DownloadPath, "path", fmt.Sprintf(".%sbin", string(os.PathSeparator)), "Path to store binaries") + fs.StringVar(&cliFlags.ManifestFile, "manifest", "manifest.yaml", "Path (or URL) to manifest yaml file") + fs.BoolVar(&cliFlags.SkipDownloaded, "skip-downloaded", false, "Skip already downloaded archive (if found)") + fs.BoolVar(&cliFlags.Cleanup, "cleanup", true, "Cleanup downloaded archives after extraction") + fs.BoolVar(&cliFlags.UpdateManifest, "update-manifest", false, "Update the manifest file commit shas from releases prior to downloading") + fs.BoolVar(&cliFlags.Download, "download", true, "Actually do a download. Only useful for -update-manifest=true -download=false") +} diff --git a/cmd/downloader/types/types.go b/cmd/downloader/types/types.go index eaea40084..fe42f4746 100644 --- a/cmd/downloader/types/types.go +++ b/cmd/downloader/types/types.go @@ -43,9 +43,10 @@ type CliFlags struct { Architecture string ManifestFile string Verbosity string - ExecCommand []string - ManifestURL bool + ManifestURL bool + MistController string + ConfigStack string } type DownloadStrategy struct { From 371999a1a94961c3fd6b054f6784adce7444e332 Mon Sep 17 00:00:00 2001 From: Eli Mallon Date: Thu, 28 Sep 2023 15:59:33 -0700 Subject: [PATCH 3/6] cmd/catalyst: cleanup --- cmd/catalyst/config/config.go | 40 +---------------------------------- config/full-stack.yaml | 34 +++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 39 deletions(-) diff --git a/cmd/catalyst/config/config.go b/cmd/catalyst/config/config.go index 2d6d0ea6d..654275dd2 100644 --- a/cmd/catalyst/config/config.go +++ b/cmd/catalyst/config/config.go @@ -96,6 +96,7 @@ func readYAMLFile(filePath string) (map[string]any, error) { return jsonMap, nil } +// Return a (mutable) reference to the map at the given key, returning an empty one if none present func optionalMap(parent map[string]any, key string) (map[string]any, error) { child, ok := parent[key] if !ok { @@ -108,42 +109,3 @@ func optionalMap(parent map[string]any, key string) (map[string]any, error) { } return childMap, nil } - -func handleConfigFile(configPath string) (string, error) { - var conf map[any]any - dat, err := os.ReadFile(configPath) - if err != nil { - return "", err - } - err = yaml.Unmarshal(dat, &conf) - if err != nil { - return "", err - } - jsonConf := dyno.ConvertMapI2MapS(conf) - jsonMap, ok := jsonConf.(map[string]any) - if !ok { - return "", fmt.Errorf("unable to convert config to a string map") - } - config, err := optionalMap(jsonMap, "config") - if err != nil { - return "", err - } - protocols, err := optionalMap(config, "protocols") - if err != nil { - return "", err - } - protocolArray := []map[string]any{} - for k, v := range protocols { - vMap, ok := v.(map[string]any) - if !ok { - return "", fmt.Errorf("unable to convert protocol '%s' to a string map", k) - } - protocolArray = append(protocolArray, vMap) - } - config["protocols"] = protocolArray - str, err := json.MarshalIndent(jsonConf, "", " ") - if err != nil { - return "", err - } - return string(str), nil -} diff --git a/config/full-stack.yaml b/config/full-stack.yaml index 59baa038a..3d51d984a 100644 --- a/config/full-stack.yaml +++ b/config/full-stack.yaml @@ -16,16 +16,22 @@ config: protocols: livepeer-cockroach: connector: livepeer-cockroach + livepeer-rabbitmq: connector: livepeer-rabbitmq + livepeer-nginx: connector: livepeer-nginx + livepeer-minio: connector: livepeer-minio + livepeer-core-dump-monitor: connector: livepeer-core-dump-monitor + livepeer-coturn: connector: livepeer-coturn + livepeer-api: connector: livepeer-api postgres-url: postgresql://root@localhost:5432/defaultdb?sslmode=disable @@ -45,12 +51,14 @@ config: vodCatalystPrivateAssetsObjectStore: cab9266f-5583-4532-9630-7be10d92affe recordCatalystObjectStoreId: 0926e4ba-b726-4386-92ee-5c4583f62f0a own-region: box + livepeer-stream-info-service: connector: livepeer-api postgres-url: postgresql://root@localhost:5432/defaultdb?sslmode=disable own-region: box stream-info-service: true port: "3040" + livepeer-catalyst-api: connector: livepeer-catalyst-api api-server: http://127.0.0.1:3004 @@ -64,6 +72,7 @@ config: own-region: box v: "8" broadcaster-url: http://127.0.0.1:8937 + livepeer-broadcaster: connector: livepeer broadcaster: true @@ -78,6 +87,7 @@ config: http://9c2936b5-143f-4b10-b302-6a21b5f29c3d:f61b3cdb-d173-4a7a-a0d3-547b871a56f9@127.0.0.1:3004/api/stream/hook metadataQueueUri: amqp://localhost:5672/livepeer v: "2" + livepeer-broadcaster-vod: connector: livepeer broadcaster: true @@ -89,6 +99,7 @@ config: orchAddr: 127.0.0.1:8936 rtmpAddr: 127.0.0.1:1937 v: "2" + livepeer-orchestrator: connector: livepeer orchestrator: true @@ -99,6 +110,7 @@ config: monitor: true serviceAddr: 127.0.0.1:8936 v: "2" + livepeer-analyzer: connector: livepeer-analyzer livepeer-access-token: f61b3cdb-d173-4a7a-a0d3-547b871a56f9 @@ -106,6 +118,7 @@ config: rabbitmq-uri: amqp://localhost:5672/livepeer disable-bigquery: true v: "8" + livepeer-task-runner: connector: livepeer-task-runner amqp-uri: amqp://localhost:5672/livepeer @@ -114,44 +127,64 @@ config: livepeer-access-token: f61b3cdb-d173-4a7a-a0d3-547b871a56f9 own-base-url: http://127.0.0.1:3060/task-runner port: "3060" + AAC: connector: AAC + CMAF: connector: CMAF + DTSC: connector: DTSC + EBML: connector: EBML + FLV: connector: FLV + H264: connector: H264 + HDS: connector: HDS + HLS: connector: HLS + HTTP: connector: HTTP + HTTPTS: connector: HTTPTS + JSON: connector: JSON + MP3: connector: MP3 + MP4: connector: MP4 + OGG: connector: OGG + RTMP: connector: RTMP + RTSP: connector: RTSP + SRT: connector: SRT + TSSRT: connector: TSSRT + WAV: connector: WAV + WebRTC: connector: WebRTC bindhost: 127.0.0.1 @@ -161,6 +194,7 @@ config: urls: turn:localhost username: livepeer pubhost: 127.0.0.1 + sessionInputMode: 15 sessionOutputMode: 15 sessionStreamInfoMode: 1 From 10ca3ed660fce8972f9788eedee97da73325d08f Mon Sep 17 00:00:00 2001 From: Eli Mallon Date: Wed, 18 Oct 2023 14:34:36 -0700 Subject: [PATCH 4/6] rename catalyst to livepeer-catalyst --- Dockerfile | 2 +- Makefile | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/Dockerfile b/Dockerfile index 71ab7705b..cdcb5cdd3 100644 --- a/Dockerfile +++ b/Dockerfile @@ -118,7 +118,7 @@ ENV CATALYST_DOWNLOADER_PATH=/usr/local/bin \ RUN mkdir /data -CMD ["/usr/local/bin/catalyst", "--", "/usr/local/bin/MistController", "-c", "/etc/livepeer/full-stack.json"] +CMD ["/usr/local/bin/livepeer-catalyst", "--", "/usr/local/bin/MistController", "-c", "/etc/livepeer/full-stack.json"] FROM ${FROM_LOCAL_PARENT} AS box-local diff --git a/Makefile b/Makefile index d26b861c1..911b47a78 100644 --- a/Makefile +++ b/Makefile @@ -113,9 +113,11 @@ livepeer-api-pkg: box-kill: [[ "$$KILL" == "true" ]] && docker exec catalyst pkill -f /usr/local/bin/$(BIN) || echo "Not restarting $(BIN), use KILL=true if you want that" -.PHONY: catalyst -catalyst: - go build -o ./bin/catalyst ./cmd/catalyst/catalyst.go +catalyst: livepeer-catalyst + +.PHONY: livepeer-catalyst +livepeer-catalyst: + go build -o ./bin/livepeer-catalyst ./cmd/catalyst/catalyst.go .PHONY: download download: From ab0de1d1053e361ce1c5bfc5b1a9787b4d113625 Mon Sep 17 00:00:00 2001 From: Eli Mallon Date: Wed, 18 Oct 2023 14:39:51 -0700 Subject: [PATCH 5/6] catalyst: implement --exec --- cmd/catalyst/catalyst.go | 3 +++ cmd/catalyst/cli/cli.go | 1 + cmd/downloader/types/types.go | 1 + 3 files changed, 5 insertions(+) diff --git a/cmd/catalyst/catalyst.go b/cmd/catalyst/catalyst.go index 4f6df6476..1424a057d 100644 --- a/cmd/catalyst/catalyst.go +++ b/cmd/catalyst/catalyst.go @@ -29,6 +29,9 @@ func main() { glog.Fatalf("error running downloader: %s", err) } } + if !cliFlags.Exec { + return + } err = execNext(cliFlags) if err != nil { glog.Fatalf("error executing MistController: %s", err) diff --git a/cmd/catalyst/cli/cli.go b/cmd/catalyst/cli/cli.go index 191137c57..227fb9b57 100644 --- a/cmd/catalyst/cli/cli.go +++ b/cmd/catalyst/cli/cli.go @@ -22,6 +22,7 @@ func GetCliFlags(buildFlags types.BuildFlags) (types.CliFlags, error) { downloaderCli.AddDownloaderFlags(fs, &cliFlags) fs.StringVar(&cliFlags.MistController, "mist-controller", "MistController", "Path to MistController binary to exec when done") + fs.BoolVar(&cliFlags.Exec, "exec", true, "Exec MistController when (optional) update is complete") fs.StringVar(&cliFlags.ConfigStack, "config", "/etc/livepeer/catalyst.yaml", "Path to multiple Catalyst config files to use. Can contain multiple entries e.g. /conf1:/conf2") version := fs.Bool("version", false, "Get version information") diff --git a/cmd/downloader/types/types.go b/cmd/downloader/types/types.go index fe42f4746..59247701d 100644 --- a/cmd/downloader/types/types.go +++ b/cmd/downloader/types/types.go @@ -47,6 +47,7 @@ type CliFlags struct { ManifestURL bool MistController string ConfigStack string + Exec bool } type DownloadStrategy struct { From 6b4a0fee0e70850a148b62d42cb7e098cafcb97b Mon Sep 17 00:00:00 2001 From: Eli Mallon Date: Wed, 18 Oct 2023 14:42:20 -0700 Subject: [PATCH 6/6] downloader: do atomic renaming of new binaries --- cmd/downloader/downloader/downloader.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cmd/downloader/downloader/downloader.go b/cmd/downloader/downloader/downloader.go index b9be98e7d..717d75cca 100644 --- a/cmd/downloader/downloader/downloader.go +++ b/cmd/downloader/downloader/downloader.go @@ -163,6 +163,7 @@ func ExtractTarGzipArchive(archiveFile, extractPath string, service *types.Servi if output == "" { output = filepath.Join(extractPath, path.Base(header.Name)) } + output = output + "-new" glog.V(9).Infof("extracting to %q", output) outfile, err := os.Create(output) if err != nil { @@ -176,6 +177,7 @@ func ExtractTarGzipArchive(archiveFile, extractPath string, service *types.Servi } outfile.Chmod(fs.FileMode(header.Mode)) outfile.Close() + os.Rename(output, filepath.Join(extractPath, path.Base(header.Name))) } } return nil