Skip to content

Commit

Permalink
add backup_storage_used to egress info (#807)
Browse files Browse the repository at this point in the history
* add backup_storage_used to egress info

* missing import
  • Loading branch information
frostbyte73 authored Nov 14, 2024
1 parent 24945da commit 1e493fd
Show file tree
Hide file tree
Showing 17 changed files with 57 additions and 137 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ require (
github.com/gorilla/websocket v1.5.3
github.com/livekit/livekit-server v1.7.3-0.20241017190429-44a74fc06ae7
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/protocol v1.27.2-0.20241107143305-3c72d6581b6f
github.com/livekit/protocol v1.27.2-0.20241112182453-391b78ffe9a7
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9
github.com/livekit/server-sdk-go/v2 v2.2.2-0.20241015094126-b8538ae5d67b
github.com/pion/rtp v1.8.9
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,8 @@ github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkD
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598 h1:yLlkHk2feSLHstD9n4VKg7YEBR4rLODTI4WE8gNBEnQ=
github.com/livekit/mediatransportutil v0.0.0-20240730083616-559fa5ece598/go.mod h1:jwKUCmObuiEDH0iiuJHaGMXwRs3RjrB4G6qqgkr/5oE=
github.com/livekit/protocol v1.27.2-0.20241107143305-3c72d6581b6f h1:JDr/L79siZUP5rFH20QVj1n2ZqwUB9eyRPmFaaeIsQw=
github.com/livekit/protocol v1.27.2-0.20241107143305-3c72d6581b6f/go.mod h1:BrACGxSTlbAe+T9uXLOiiWyYrJ2gNc0mTYmZJPq/4aA=
github.com/livekit/protocol v1.27.2-0.20241112182453-391b78ffe9a7 h1:acdgG8T+8lrgT58KZPq/oRy/Gc8xa8/CKLgGLI0pq/I=
github.com/livekit/protocol v1.27.2-0.20241112182453-391b78ffe9a7/go.mod h1:BrACGxSTlbAe+T9uXLOiiWyYrJ2gNc0mTYmZJPq/4aA=
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9 h1:33oBjGpVD9tYkDXQU42tnHl8eCX9G6PVUToBVuCUyOs=
github.com/livekit/psrpc v0.6.1-0.20240924010758-9f0a4268a3b9/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/livekit/server-sdk-go/v2 v2.2.2-0.20241015094126-b8538ae5d67b h1:R1GpKwVbSYsG08k5sIkNCukvnrkOE18R8IO1YeujR8o=
Expand Down
3 changes: 1 addition & 2 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

"github.com/stretchr/testify/require"

"github.com/livekit/egress/pkg/info"
"github.com/livekit/protocol/livekit"
)

Expand Down Expand Up @@ -79,7 +78,7 @@ func TestSegmentNaming(t *testing.T) {
expectedStorageDir: "conf_test/", expectedPlaylistFilename: "playlist.m3u8", expectedLivePlaylistFilename: "", expectedSegmentPrefix: "conf_test_2/filename",
},
} {
p := &PipelineConfig{Info: &info.EgressInfo{EgressId: "egress_ID"}}
p := &PipelineConfig{Info: &livekit.EgressInfo{EgressId: "egress_ID"}}
o, err := p.getSegmentConfig(&livekit.SegmentedFileOutput{
FilenamePrefix: test.filenamePrefix,
PlaylistName: test.playlistName,
Expand Down
7 changes: 3 additions & 4 deletions pkg/config/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"gopkg.in/yaml.v3"

"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/info"
"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/egress"
"github.com/livekit/protocol/livekit"
Expand All @@ -55,8 +54,8 @@ type PipelineConfig struct {
OutputCount atomic.Int32 `yaml:"-"`
FinalizationRequired bool `yaml:"-"`

Info *info.EgressInfo `yaml:"-"`
Manifest *Manifest `yaml:"-"`
Info *livekit.EgressInfo `yaml:"-"`
Manifest *Manifest `yaml:"-"`
}

type SourceConfig struct {
Expand Down Expand Up @@ -164,7 +163,7 @@ func (p *PipelineConfig) Update(request *rpc.StartEgressRequest) error {

// start with defaults
now := time.Now().UnixNano()
p.Info = &info.EgressInfo{
p.Info = &livekit.EgressInfo{
EgressId: request.EgressId,
RoomId: request.RoomId,
Status: livekit.EgressStatus_EGRESS_STARTING,
Expand Down
6 changes: 3 additions & 3 deletions pkg/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (h *Handler) Run() {
h.initialized.Break()
if err != nil {
h.conf.Info.SetFailed(err)
_, _ = h.ipcServiceClient.HandlerUpdate(context.Background(), (*livekit.EgressInfo)(h.conf.Info))
_, _ = h.ipcServiceClient.HandlerUpdate(context.Background(), h.conf.Info)
return
}

Expand All @@ -109,7 +109,7 @@ func (h *Handler) Run() {
_, _ = h.ipcServiceClient.HandlerFinished(ctx, &ipc.HandlerFinishedRequest{
EgressId: h.conf.Info.EgressId,
Metrics: m,
Info: (*livekit.EgressInfo)(res),
Info: res,
})
}

Expand All @@ -118,5 +118,5 @@ func (h *Handler) Kill() {
if h.controller == nil {
return
}
h.controller.SendEOS(context.Background(), "handler killed")
h.controller.SendEOS(context.Background(), livekit.EndReasonKilled)
}
6 changes: 3 additions & 3 deletions pkg/handler/handler_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (h *Handler) UpdateStream(ctx context.Context, req *livekit.UpdateStreamReq
if err != nil {
return nil, err
}
return (*livekit.EgressInfo)(h.controller.Info), nil
return h.controller.Info, nil
}

func (h *Handler) StopEgress(ctx context.Context, _ *livekit.StopEgressRequest) (*livekit.EgressInfo, error) {
Expand All @@ -47,6 +47,6 @@ func (h *Handler) StopEgress(ctx context.Context, _ *livekit.StopEgressRequest)
return nil, errors.ErrEgressNotFound
}

h.controller.SendEOS(ctx, "StopEgress API")
return (*livekit.EgressInfo)(h.controller.Info), nil
h.controller.SendEOS(ctx, livekit.EndReasonAPI)
return h.controller.Info, nil
}
78 changes: 0 additions & 78 deletions pkg/info/info.go

This file was deleted.

31 changes: 15 additions & 16 deletions pkg/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/livekit/egress/pkg/config"
"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/gstreamer"
"github.com/livekit/egress/pkg/info"
"github.com/livekit/egress/pkg/ipc"
"github.com/livekit/egress/pkg/pipeline/builder"
"github.com/livekit/egress/pkg/pipeline/sink"
Expand Down Expand Up @@ -196,7 +195,7 @@ func (c *Controller) BuildPipeline() error {
return nil
}

func (c *Controller) Run(ctx context.Context) *info.EgressInfo {
func (c *Controller) Run(ctx context.Context) *livekit.EgressInfo {
ctx, span := tracer.Start(ctx, "Pipeline.Run")
defer span.End()

Expand All @@ -208,7 +207,7 @@ func (c *Controller) Run(ctx context.Context) *info.EgressInfo {
// close when room ends
go func() {
<-c.src.EndRecording()
c.SendEOS(ctx, "source closed")
c.SendEOS(ctx, livekit.EndReasonSrcClosed)
}()

// wait until room is ready
Expand All @@ -218,7 +217,7 @@ func (c *Controller) Run(ctx context.Context) *info.EgressInfo {
select {
case <-c.stopped.Watch():
c.src.Close()
c.Info.SetAborted(info.MsgStartNotReceived)
c.Info.SetAborted(livekit.MsgStartNotReceived)
return c.Info
case <-start:
// continue
Expand Down Expand Up @@ -280,7 +279,7 @@ func (c *Controller) UpdateStream(ctx context.Context, req *livekit.UpdateStream
// add stream info to results
c.mu.Lock()
c.Info.StreamResults = append(c.Info.StreamResults, stream.StreamInfo)
if list := (*livekit.EgressInfo)(c.Info).GetStream(); list != nil {
if list := c.Info.GetStream(); list != nil {
list.Info = append(list.Info, stream.StreamInfo)
}
c.mu.Unlock()
Expand Down Expand Up @@ -325,7 +324,7 @@ func (c *Controller) streamFinished(ctx context.Context, stream *config.Stream)

// end egress if no outputs remaining
if c.OutputCount.Load() == 0 {
c.SendEOS(ctx, "all streams removed")
c.SendEOS(ctx, livekit.EndReasonStreamsStopped)
return nil
}

Expand Down Expand Up @@ -368,7 +367,7 @@ func (c *Controller) onEOSSent() {
// made it through the pipeline by the time endRecording is closed
if c.SourceType == types.SourceTypeSDK && !c.AudioEnabled {
// this will not actually send a second EOS, but will make sure everything is in the correct state
c.SendEOS(context.Background(), "source closed")
c.SendEOS(context.Background(), livekit.EndReasonSrcClosed)
}
}

Expand All @@ -381,12 +380,12 @@ func (c *Controller) SendEOS(ctx context.Context, reason string) {
c.limitTimer.Stop()
}

c.Info.Details = fmt.Sprintf("end reason: %s", reason)
c.Info.SetEndReason(reason)
logger.Debugw("stopping pipeline", "reason", reason)

switch c.Info.Status {
case livekit.EgressStatus_EGRESS_STARTING:
c.Info.SetAborted(info.MsgStoppedBeforeStarted)
c.Info.SetAborted(livekit.MsgStoppedBeforeStarted)
c.p.Stop()

case livekit.EgressStatus_EGRESS_ABORTED,
Expand All @@ -395,11 +394,11 @@ func (c *Controller) SendEOS(ctx context.Context, reason string) {

case livekit.EgressStatus_EGRESS_ACTIVE:
c.Info.UpdateStatus(livekit.EgressStatus_EGRESS_ENDING)
_, _ = c.ipcServiceClient.HandlerUpdate(ctx, (*livekit.EgressInfo)(c.Info))
_, _ = c.ipcServiceClient.HandlerUpdate(ctx, c.Info)
c.sendEOS()

case livekit.EgressStatus_EGRESS_ENDING:
_, _ = c.ipcServiceClient.HandlerUpdate(ctx, (*livekit.EgressInfo)(c.Info))
_, _ = c.ipcServiceClient.HandlerUpdate(ctx, c.Info)
c.sendEOS()

case livekit.EgressStatus_EGRESS_LIMIT_REACHED:
Expand Down Expand Up @@ -454,7 +453,7 @@ func (c *Controller) Close() {
// ensure egress ends with a final state
switch c.Info.Status {
case livekit.EgressStatus_EGRESS_STARTING:
c.Info.SetAborted(info.MsgStoppedBeforeStarted)
c.Info.SetAborted(livekit.MsgStoppedBeforeStarted)

case livekit.EgressStatus_EGRESS_ACTIVE,
livekit.EgressStatus_EGRESS_ENDING:
Expand Down Expand Up @@ -492,13 +491,13 @@ func (c *Controller) startSessionLimitTimer(ctx context.Context) {
c.limitTimer = time.AfterFunc(timeout, func() {
switch c.Info.Status {
case livekit.EgressStatus_EGRESS_STARTING:
c.Info.SetAborted(info.MsgLimitReachedWithoutStart)
c.Info.SetAborted(livekit.MsgLimitReachedWithoutStart)

case livekit.EgressStatus_EGRESS_ACTIVE:
c.Info.SetLimitReached()
}
if c.playing.IsBroken() {
c.SendEOS(ctx, "time limit reached")
c.SendEOS(ctx, livekit.EndReasonLimitReached)
} else {
c.p.Stop()
}
Expand Down Expand Up @@ -538,7 +537,7 @@ func (c *Controller) updateStartTime(startedAt int64) {

if c.Info.Status == livekit.EgressStatus_EGRESS_STARTING {
c.Info.UpdateStatus(livekit.EgressStatus_EGRESS_ACTIVE)
_, _ = c.ipcServiceClient.HandlerUpdate(context.Background(), (*livekit.EgressInfo)(c.Info))
_, _ = c.ipcServiceClient.HandlerUpdate(context.Background(), c.Info)
}
}

Expand Down Expand Up @@ -576,7 +575,7 @@ func (c *Controller) streamUpdated(ctx context.Context) {
}
}

_, _ = c.ipcServiceClient.HandlerUpdate(ctx, (*livekit.EgressInfo)(c.Info))
_, _ = c.ipcServiceClient.HandlerUpdate(ctx, c.Info)
}

func (c *Controller) updateEndTime() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (c *Controller) GetGstPipelineDebugDot() string {
}

func (c *Controller) uploadDebugFiles() {
u, err := uploader.New(&c.Debug.StorageConfig, nil, c.monitor)
u, err := uploader.New(&c.Debug.StorageConfig, nil, c.monitor, nil)
if err != nil {
logger.Errorw("failed to create uploader", err)
return
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/sink/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (s *FileSink) Close() error {
}

func (s *FileSink) UploadManifest(filepath string) (string, bool, error) {
if s.DisableManifest && !s.ManifestRequired() {
if s.DisableManifest && !s.conf.Info.BackupStorageUsed {
return "", false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/sink/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (s *ImageSink) Close() error {
}

func (s *ImageSink) UploadManifest(filepath string) (string, bool, error) {
if s.DisableManifest && !s.ManifestRequired() {
if s.DisableManifest && !s.conf.Info.BackupStorageUsed {
return "", false, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/sink/segments.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func (s *SegmentSink) Close() error {
}

func (s *SegmentSink) UploadManifest(filepath string) (string, bool, error) {
if s.DisableManifest && !s.ManifestRequired() {
if s.DisableManifest && !s.conf.Info.BackupStorageUsed {
return "", false, nil
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/pipeline/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks, monit
case types.EgressTypeFile:
o := c[0].(*config.FileConfig)

u, err := uploader.New(o.StorageConfig, p.BackupConfig, monitor)
u, err := uploader.New(o.StorageConfig, p.BackupConfig, monitor, p.Info)
if err != nil {
return nil, err
}
Expand All @@ -51,7 +51,7 @@ func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks, monit
case types.EgressTypeSegments:
o := c[0].(*config.SegmentConfig)

u, err := uploader.New(o.StorageConfig, p.BackupConfig, monitor)
u, err := uploader.New(o.StorageConfig, p.BackupConfig, monitor, p.Info)
if err != nil {
return nil, err
}
Expand All @@ -75,7 +75,7 @@ func CreateSinks(p *config.PipelineConfig, callbacks *gstreamer.Callbacks, monit
for _, ci := range c {
o := ci.(*config.ImageConfig)

u, err := uploader.New(o.StorageConfig, p.BackupConfig, monitor)
u, err := uploader.New(o.StorageConfig, p.BackupConfig, monitor, p.Info)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 1e493fd

Please sign in to comment.