Skip to content

Commit

Permalink
Adjust twitch ingest urls (#726)
Browse files Browse the repository at this point in the history
* route twitch streams to nearest ingest endpoint

* twitch endpoint may not always be consistent

* fix tests

* tidy
frostbyte73 authored Jul 19, 2024
1 parent 9a94b76 commit c644ec9
Showing 12 changed files with 242 additions and 153 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@
.DS_Store

.github/workflows/config.yaml
build/gstcefsrc/*.tar.bz2
build/plugins/
test/output/*
test/*.yaml
!test/config-sample.yaml
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@ require (
github.com/frostbyte73/core v0.0.10
github.com/go-gst/go-glib v1.0.1
github.com/go-gst/go-gst v1.0.0
github.com/go-jose/go-jose/v3 v3.0.3
github.com/go-logr/logr v1.4.2
github.com/googleapis/gax-go/v2 v2.12.4
github.com/gorilla/websocket v1.5.2
@@ -62,7 +63,6 @@ require (
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/gammazero/deque v0.2.1 // indirect
github.com/go-jose/go-jose/v3 v3.0.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gobwas/httphead v0.1.0 // indirect
github.com/gobwas/pool v0.2.1 // indirect
106 changes: 0 additions & 106 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
@@ -22,114 +22,8 @@ import (

"github.com/livekit/egress/pkg/info"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/rpc"
)

func TestRedactUpload(t *testing.T) {
t.Cleanup(func() {
_ = os.Remove("test_upload/")
})

conf := &ServiceConfig{
BaseConfig: BaseConfig{
NodeID: "server",
},
}

fileReq := &rpc.StartEgressRequest{
EgressId: "test_upload",
Request: &rpc.StartEgressRequest_RoomComposite{
RoomComposite: &livekit.RoomCompositeEgressRequest{
RoomName: "room",
Layout: "layout",
Output: &livekit.RoomCompositeEgressRequest_File{
File: &livekit.EncodedFileOutput{
Filepath: "filepath",
Output: &livekit.EncodedFileOutput_S3{
S3: &livekit.S3Upload{
AccessKey: "access",
Secret: "secret",
SessionToken: "",
Bucket: "bucket",
},
},
},
},
},
},
Token: "token",
WsUrl: "wss://egress.com",
}

p, err := GetValidatedPipelineConfig(conf, fileReq)
require.NoError(t, err)

require.Equal(t, "******", (*livekit.EgressInfo)(p.Info).GetRoomComposite().GetFile().GetS3().AccessKey)

require.Len(t, p.Outputs, 1)
output := p.GetFileConfig()
require.NotNil(t, output.UploadConfig)
require.Equal(t, "access", output.UploadConfig.(*livekit.S3Upload).AccessKey)
}

func TestRedactStreamKeys(t *testing.T) {
t.Cleanup(func() {
_ = os.Remove("test_stream/")
})

var (
streamUrl1 = "rtmp://sfo.contribute.live-video.net/app/stream_key"
redactedUrl1 = "rtmp://sfo.contribute.live-video.net/app/**********"
streamUrl2 = "rtmps://live-api-s.facebook.com:443/rtmp/stream_key"
redactedUrl2 = "rtmps://live-api-s.facebook.com:443/rtmp/**********"
)

conf := &ServiceConfig{
BaseConfig: BaseConfig{
NodeID: "server",
},
}

streamReq := &rpc.StartEgressRequest{
EgressId: "test_stream",
Request: &rpc.StartEgressRequest_RoomComposite{
RoomComposite: &livekit.RoomCompositeEgressRequest{
RoomName: "room",
Layout: "layout",
Output: &livekit.RoomCompositeEgressRequest_Stream{
Stream: &livekit.StreamOutput{
Urls: []string{
streamUrl1,
streamUrl2,
},
},
},
},
},
Token: "token",
WsUrl: "wss://egress.com",
}

p, err := GetValidatedPipelineConfig(conf, streamReq)
require.NoError(t, err)

urls := (*livekit.EgressInfo)(p.Info).GetRoomComposite().GetStream().GetUrls()
require.Len(t, urls, 2)
require.Equal(t, redactedUrl1, urls[0])
require.Equal(t, redactedUrl2, urls[1])

streamInfo := (*livekit.EgressInfo)(p.Info).GetStream()
require.Len(t, streamInfo.Info, 2)
require.Equal(t, redactedUrl1, streamInfo.Info[0].Url)
require.Equal(t, redactedUrl2, streamInfo.Info[1].Url)

require.Len(t, p.Outputs, 1)
output := p.GetStreamConfig()
require.Len(t, output.Urls, 2)
require.Equal(t, streamUrl1, output.Urls[0])
require.Equal(t, streamUrl2, output.Urls[1])
}

func TestSegmentNaming(t *testing.T) {
t.Cleanup(func() {
_ = os.RemoveAll("conf_test/")
2 changes: 1 addition & 1 deletion pkg/config/output_stream.go
Original file line number Diff line number Diff line change
@@ -50,7 +50,7 @@ func (p *PipelineConfig) getStreamConfig(outputType types.OutputType, urls []str
conf.StreamInfo = make(map[string]*livekit.StreamInfo)
var streamInfoList []*livekit.StreamInfo
for _, rawUrl := range urls {
url, redacted, err := p.ValidateUrl(rawUrl, outputType)
url, redacted, err := ValidateUrl(rawUrl, outputType)
if err != nil {
return nil, err
}
31 changes: 0 additions & 31 deletions pkg/config/pipeline.go
Original file line number Diff line number Diff line change
@@ -16,7 +16,6 @@ package config

import (
"context"
"fmt"
"net/url"
"strings"
"time"
@@ -34,7 +33,6 @@ import (
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
"github.com/livekit/protocol/tracer"
"github.com/livekit/protocol/utils"
lksdk "github.com/livekit/server-sdk-go/v2"
)

@@ -598,35 +596,6 @@ func (p *PipelineConfig) UpdateInfoFromSDK(identifier string, replacements map[s
return nil
}

func (p *PipelineConfig) ValidateUrl(rawUrl string, outputType types.OutputType) (string, string, error) {
parsed, err := url.Parse(rawUrl)
if err != nil {
return "", "", errors.ErrInvalidUrl(rawUrl, err.Error())
}

switch outputType {
case types.OutputTypeRTMP:
if parsed.Scheme == "mux" {
rawUrl = fmt.Sprintf("rtmps://global-live.mux.com:443/app/%s", parsed.Host)
}

redacted, ok := utils.RedactStreamKey(rawUrl)
if !ok {
return "", "", errors.ErrInvalidUrl(rawUrl, "rtmp urls must be of format rtmp(s)://{host}(/{path})/{app}/{stream_key}( live=1)")
}
return rawUrl, redacted, nil

case types.OutputTypeRaw:
if parsed.Scheme != "ws" && parsed.Scheme != "wss" {
return "", "", errors.ErrInvalidUrl(rawUrl, "invalid scheme")
}
return rawUrl, rawUrl, nil

default:
return "", "", errors.ErrInvalidInput("stream output type")
}
}

func (p *PipelineConfig) GetEncodedOutputs() []OutputConfig {
ret := make([]OutputConfig, 0)

124 changes: 124 additions & 0 deletions pkg/config/urls.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package config

import (
"fmt"
"net/http"
"net/url"
"regexp"
"strings"

"github.com/go-jose/go-jose/v3/json"

"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/utils"
)

var twitchEndpoint = regexp.MustCompile("^rtmps?://.*\\.contribute\\.live-video\\.net/app/(.*)( live=1)?$")

func ValidateUrl(rawUrl string, outputType types.OutputType) (string, string, error) {
parsed, err := url.Parse(rawUrl)
if err != nil {
return "", "", errors.ErrInvalidUrl(rawUrl, err.Error())
}

switch outputType {
case types.OutputTypeRTMP:
if parsed.Scheme == "mux" {
rawUrl = fmt.Sprintf("rtmps://global-live.mux.com:443/app/%s", parsed.Host)
} else if parsed.Scheme == "twitch" {
rawUrl, err = updateTwitchURL(parsed.Host)
if err != nil {
return "", "", errors.ErrInvalidUrl(rawUrl, err.Error())
}
} else if match := twitchEndpoint.FindStringSubmatch(rawUrl); len(match) > 0 {
updated, err := updateTwitchURL(match[1])
if err == nil {
rawUrl = updated
}
}

redacted, ok := utils.RedactStreamKey(rawUrl)
if !ok {
return "", "", errors.ErrInvalidUrl(rawUrl, "rtmp urls must be of format rtmp(s)://{host}(/{path})/{app}/{stream_key}( live=1)")
}
return rawUrl, redacted, nil

case types.OutputTypeRaw:
if parsed.Scheme != "ws" && parsed.Scheme != "wss" {
return "", "", errors.ErrInvalidUrl(rawUrl, "invalid scheme")
}
return rawUrl, rawUrl, nil

default:
return "", "", errors.ErrInvalidInput("stream output type")
}
}

func (o *StreamConfig) GetStreamUrl(rawUrl string) (string, error) {
parsed, err := url.Parse(rawUrl)
if err != nil {
return "", errors.ErrInvalidUrl(rawUrl, err.Error())
}

var twitchKey string
if parsed.Scheme == "mux" {
return fmt.Sprintf("rtmps://global-live.mux.com:443/app/%s", parsed.Host), nil
} else if parsed.Scheme == "twitch" {
twitchKey = parsed.Host
} else if match := twitchEndpoint.FindStringSubmatch(rawUrl); len(match) > 0 {
twitchKey = match[1]
} else {
return rawUrl, nil
}

// find twitch url by stream key because we can't rely on the ingest endpoint returning consistent results
for u := range o.StreamInfo {
if match := twitchEndpoint.FindStringSubmatch(u); len(match) > 0 && match[1] == twitchKey {
return u, nil
}
}

return "", errors.ErrStreamNotFound(rawUrl)
}

func updateTwitchURL(key string) (string, error) {
resp, err := http.Get("https://ingest.twitch.tv/ingests")
if err != nil {
return "", err
}
defer resp.Body.Close()
var body struct {
Ingests []struct {
Name string `json:"name"`
URLTemplate string `json:"url_template"`
URLTemplateSecure string `json:"url_template_secure"`
Priority int `json:"priority"`
} `json:"ingests"`
}
if err = json.NewDecoder(resp.Body).Decode(&body); err != nil {
return "", err
}
for _, ingest := range body.Ingests {
if ingest.URLTemplateSecure != "" {
return strings.ReplaceAll(ingest.URLTemplateSecure, "{stream_key}", key), nil
} else if ingest.URLTemplate != "" {
return strings.ReplaceAll(ingest.URLTemplate, "{stream_key}", key), nil
}
}
return "", errors.New("no ingest found")
}
Loading

0 comments on commit c644ec9

Please sign in to comment.