Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clean up integration/debug uploads #709

Merged
merged 8 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/publish-egress.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ jobs:
~/go/pkg/mod
~/go/bin
~/.cache
key: ${{ runner.os }}-egress-${{ hashFiles('**/go.sum') }}
key: "${{ runner.os }}-egress-${{ hashFiles('**/go.sum') }}"
restore-keys: ${{ runner.os }}-egress

- name: Docker metadata
Expand Down
1 change: 0 additions & 1 deletion pkg/config/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ type BaseConfig struct {
type DebugConfig struct {
EnableProfiling bool `yaml:"enable_profiling"` // create dot file and pprof on internal error
PathPrefix string `yaml:"path_prefix"` // filepath prefix for uploads
LogKeyFrames bool `yaml:"log_keyframes"` // log first 15s of keyframes when streaming
StorageConfig `yaml:",inline"` // upload config (S3, Azure, GCP, or AliOSS)
}

Expand Down
24 changes: 0 additions & 24 deletions pkg/pipeline/builder/video.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,9 +542,6 @@ func (b *VideoBin) addEncoder() error {
x264Enc.SetArg("speed-preset", "veryfast")
if b.conf.KeyFrameInterval != 0 {
keyframeInterval := uint(b.conf.KeyFrameInterval * float64(b.conf.Framerate))
if b.conf.Debug.LogKeyFrames {
logger.Debugw("setting key frame interval", "interval", keyframeInterval)
}
if err = x264Enc.SetProperty("key-int-max", keyframeInterval); err != nil {
return errors.ErrGstPipelineError(err)
}
Expand Down Expand Up @@ -585,27 +582,6 @@ func (b *VideoBin) addEncoder() error {
if err = b.bin.AddElements(x264Enc, caps); err != nil {
return err
}

if b.conf.GetStreamConfig() != nil && b.conf.Debug.LogKeyFrames {
var firstKeyFrame *time.Duration
x264Enc.GetStaticPad("src").AddProbe(gst.PadProbeTypeBuffer, func(pad *gst.Pad, info *gst.PadProbeInfo) gst.PadProbeReturn {
buffer := info.GetBuffer()
if !buffer.HasFlags(gst.BufferFlagDeltaUnit) {
clockTime := buffer.PresentationTimestamp().AsDuration()
if firstKeyFrame == nil {
firstKeyFrame = clockTime
}
pts := *clockTime - *firstKeyFrame
logger.Debugw("keyframe generated", "pts", pts)
if pts > time.Second*15 {
return gst.PadProbeRemove
}
}

return gst.PadProbeOK
})
}

return nil

case types.MimeTypeVP9:
Expand Down
15 changes: 11 additions & 4 deletions pkg/pipeline/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (

"github.com/go-gst/go-gst/gst"

"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/pipeline/sink/uploader"
"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/logger"
Expand All @@ -44,6 +43,7 @@ func (c *Controller) uploadDebugFiles() {
}

done := make(chan struct{})
var dotUploaded, pprofUploaded, trackUploaded bool

var wg sync.WaitGroup
wg.Add(3)
Expand All @@ -67,9 +67,16 @@ func (c *Controller) uploadDebugFiles() {
select {
case <-done:
logger.Infow("debug files uploaded")
return
case <-time.After(time.Second * 3):
logger.Errorw("failed to upload debug files", errors.New("timed out"))
if !dotUploaded {
logger.Warnw("failed to upload dotfile", nil)
}
if !pprofUploaded {
logger.Warnw("failed to upload pprof file", nil)
}
if !trackUploaded {
logger.Warnw("failed to upload track debug files", nil)
}
}
}

Expand Down Expand Up @@ -123,7 +130,7 @@ func (c *Controller) uploadDebugFile(u uploader.Uploader, data []byte, fileExten

filename := fmt.Sprintf("%s%s", c.Info.EgressId, fileExtension)
local := path.Join(dir, filename)
storage := path.Join(c.Debug.PathPrefix, filename)
storage := path.Join(c.Debug.PathPrefix, c.Info.EgressId, filename)

f, err := os.Create(local)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/pipeline/sink/uploader/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"net/url"
"os"
"path"

"github.com/Azure/azure-storage-blob-go/azblob"

Expand Down Expand Up @@ -88,5 +89,5 @@ func (u *AzureUploader) upload(localFilepath, storageFilepath string, outputType
return "", 0, errors.ErrUploadFailed("Azure", err)
}

return fmt.Sprintf("%s/%s", u.container, storageFilepath), stat.Size(), nil
return path.Join(u.container, storageFilepath), stat.Size(), nil
}
4 changes: 2 additions & 2 deletions pkg/pipeline/source/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,9 +477,9 @@ func (s *SDKSource) createWriter(
var logFilename string
if s.Debug.EnableProfiling {
if s.Debug.ToUploadConfig() == nil {
logFilename = path.Join(s.Debug.PathPrefix, fmt.Sprintf("%s.csv", track.ID()))
logFilename = path.Join(s.Debug.PathPrefix, fmt.Sprintf("%s/%s.csv", s.Info.EgressId, track.ID()))
} else {
logFilename = path.Join(s.TmpDir, fmt.Sprintf("%s.csv", track.ID()))
logFilename = path.Join(s.TmpDir, fmt.Sprintf("%s/%s.csv", s.Info.EgressId, track.ID()))
}
}

Expand Down
5 changes: 3 additions & 2 deletions test/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package test

import (
"fmt"
"path"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -77,10 +77,11 @@ func (r *Runner) verifyFile(t *testing.T, p *config.PipelineConfig, res *livekit
localPath := fileRes.Filename
require.NotEmpty(t, storagePath)
require.False(t, strings.Contains(storagePath, "{"))
storageFilename := path.Base(storagePath)

// download from cloud storage
if uploadConfig := p.GetFileConfig().UploadConfig; uploadConfig != nil {
localPath = fmt.Sprintf("%s/%s", r.FilePrefix, storagePath)
localPath = path.Join(r.FilePrefix, storageFilename)
download(t, uploadConfig, localPath, storagePath)
download(t, uploadConfig, localPath+".json", storagePath+".json")
}
Expand Down
14 changes: 7 additions & 7 deletions test/images.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ func (r *Runner) verifyImages(t *testing.T, p *config.PipelineConfig, filenameSu
// r.verifyManifest(t, p, segments.PlaylistName)
}

//func (r *Runner) verifyManifest(t *testing.T, p *config.PipelineConfig, plName string) {
// localPlaylistPath := fmt.Sprintf("%s/%s", r.FilePrefix, plName)
// func (r *Runner) verifyManifest(t *testing.T, p *config.PipelineConfig, plName string) {
// localPlaylistPath := path.Join(r.FilePrefix, plName)
//
// if uploadConfig := p.GetSegmentConfig().UploadConfig; uploadConfig != nil {
// download(t, uploadConfig, localPlaylistPath+".json", plName+".json")
// }
//}
// }

//func (r *Runner) verifySegmentOutput(t *testing.T, p *config.PipelineConfig, filenameSuffix livekit.SegmentedFileSuffix, plName string, plLocation string, segmentCount int, res *livekit.EgressInfo, plType m3u8.PlaylistType) {
// func (r *Runner) verifySegmentOutput(t *testing.T, p *config.PipelineConfig, filenameSuffix livekit.SegmentedFileSuffix, plName string, plLocation string, segmentCount int, res *livekit.EgressInfo, plType m3u8.PlaylistType) {
// require.NotEmpty(t, plName)
// require.NotEmpty(t, plLocation)

Expand All @@ -79,19 +79,19 @@ func (r *Runner) verifyImages(t *testing.T, p *config.PipelineConfig, filenameSu

// // download from cloud storage
// if uploadConfig := p.GetSegmentConfig().UploadConfig; uploadConfig != nil {
// localPlaylistPath = fmt.Sprintf("%s/%s", r.FilePrefix, storedPlaylistPath)
// localPlaylistPath = path.Join(r.FilePrefix, storedPlaylistPath)
// download(t, uploadConfig, localPlaylistPath, storedPlaylistPath)
// if plType == m3u8.PlaylistTypeEvent {
// // Only download segments once
// base := storedPlaylistPath[:len(storedPlaylistPath)-5]
// for i := 0; i < int(segmentCount); i++ {
// cloudPath := fmt.Sprintf("%s_%05d.ts", base, i)
// localPath := fmt.Sprintf("%s/%s", r.FilePrefix, cloudPath)
// localPath := path.Join(r.FilePrefix, cloudPath)
// download(t, uploadConfig, localPath, cloudPath)
// }
// }
// }

// verify
// verify(t, localPlaylistPath, p, res, types.EgressTypeSegments, r.Muting, r.sourceFramerate, plType == m3u8.PlaylistTypeLive)
//}
// }
13 changes: 3 additions & 10 deletions test/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"encoding/json"
"fmt"
"os"
"path"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -61,6 +60,8 @@ var (
types.MimeTypeVP8: time.Microsecond * 41708,
types.MimeTypeVP9: time.Microsecond * 41708,
}

uploadPrefix = fmt.Sprintf("integration/%s", time.Now().Format("2006-01-02"))
)

type testCase struct {
Expand Down Expand Up @@ -225,7 +226,7 @@ func (r *Runner) startEgress(t *testing.T, req *rpc.StartEgressRequest) string {

func (r *Runner) sendRequest(t *testing.T, req *rpc.StartEgressRequest) *livekit.EgressInfo {
// send start request
info, err := r.client.StartEgress(context.Background(), "", req)
info, err := r.StartEgress(context.Background(), req)

// check returned egress info
require.NoError(t, err)
Expand Down Expand Up @@ -327,11 +328,3 @@ func (r *Runner) stopEgress(t *testing.T, egressID string) *livekit.EgressInfo {

return res
}

func (r *Runner) getFilePath(filename string) string {
if r.S3 != nil || r.Azure != nil || r.GCP != nil || r.AliOSS != nil {
return filename
}

return path.Join(r.FilePrefix, filename)
}
3 changes: 2 additions & 1 deletion test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,6 @@ func TestEgress(t *testing.T) {
svc, err := server.NewServer(r.ServiceConfig, bus, ioClient)
require.NoError(t, err)

r.Run(t, svc, bus, rfs)
r.StartServer(t, svc, bus, rfs)
r.RunTests(t)
}
44 changes: 28 additions & 16 deletions test/participant.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package test

import (
"path"
"testing"
"time"

Expand Down Expand Up @@ -95,14 +96,19 @@ func (r *Runner) testParticipantFile(t *testing.T) {
},
} {
r.runParticipantTest(t, test.name, test, func(t *testing.T, identity string) {
fileOutput := &livekit.EncodedFileOutput{
FileType: test.fileType,
Filepath: r.getFilePath(test.filename),
}
var fileOutput *livekit.EncodedFileOutput
if test.filenameSuffix == livekit.SegmentedFileSuffix_INDEX && r.AzureUpload != nil {
fileOutput.Filepath = test.filename
fileOutput.Output = &livekit.EncodedFileOutput_Azure{
Azure: r.AzureUpload,
fileOutput = &livekit.EncodedFileOutput{
FileType: test.fileType,
Filepath: path.Join(uploadPrefix, test.filename),
Output: &livekit.EncodedFileOutput_Azure{
Azure: r.AzureUpload,
},
}
} else {
fileOutput = &livekit.EncodedFileOutput{
FileType: test.fileType,
Filepath: path.Join(r.FilePrefix, test.filename),
}
}

Expand Down Expand Up @@ -193,15 +199,21 @@ func (r *Runner) testParticipantSegments(t *testing.T) {
} {
r.runParticipantTest(t, test.name, test,
func(t *testing.T, identity string) {
segmentOutput := &livekit.SegmentedFileOutput{
FilenamePrefix: r.getFilePath(test.filename),
PlaylistName: test.playlist,
FilenameSuffix: test.filenameSuffix,
}
var segmentOutput *livekit.SegmentedFileOutput
if test.filenameSuffix == livekit.SegmentedFileSuffix_INDEX && r.S3Upload != nil {
segmentOutput.FilenamePrefix = test.filename
segmentOutput.Output = &livekit.SegmentedFileOutput_S3{
S3: r.S3Upload,
segmentOutput = &livekit.SegmentedFileOutput{
FilenamePrefix: path.Join(uploadPrefix, test.filename),
PlaylistName: test.playlist,
FilenameSuffix: test.filenameSuffix,
Output: &livekit.SegmentedFileOutput_S3{
S3: r.S3Upload,
},
}
} else {
segmentOutput = &livekit.SegmentedFileOutput{
FilenamePrefix: path.Join(r.FilePrefix, test.filename),
PlaylistName: test.playlist,
FilenameSuffix: test.filenameSuffix,
}
}

Expand Down Expand Up @@ -256,7 +268,7 @@ func (r *Runner) testParticipantMulti(t *testing.T) {
Identity: identity,
FileOutputs: []*livekit.EncodedFileOutput{{
FileType: livekit.EncodedFileType_MP4,
Filepath: r.getFilePath("participant_multiple_{time}"),
Filepath: path.Join(r.FilePrefix, "participant_multiple_{time}"),
}},
StreamOutputs: []*livekit.StreamOutput{{
Protocol: livekit.StreamProtocol_RTMP,
Expand Down
Loading
Loading