Skip to content

Commit

Permalink
faster startup (#769)
Browse files Browse the repository at this point in the history
* faster startup

* fix test

* add retries for updates

* create async then wait

* fix build

* simplify launch logic

* channel blocking
  • Loading branch information
frostbyte73 authored Sep 6, 2024
1 parent 0c95732 commit 79afb24
Show file tree
Hide file tree
Showing 8 changed files with 223 additions and 140 deletions.
4 changes: 2 additions & 2 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (
"github.com/livekit/egress/pkg/config"
"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/handler"
"github.com/livekit/egress/pkg/info"
"github.com/livekit/egress/pkg/server"
"github.com/livekit/egress/pkg/service"
"github.com/livekit/egress/version"
"github.com/livekit/protocol/logger"
lkredis "github.com/livekit/protocol/redis"
Expand Down Expand Up @@ -111,7 +111,7 @@ func runService(c *cli.Context) error {
}

bus := psrpc.NewRedisMessageBus(rc)
ioClient, err := service.NewIOClient(bus)
ioClient, err := info.NewIOClient(bus)
if err != nil {
return err
}
Expand Down
184 changes: 184 additions & 0 deletions pkg/info/io.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package info

import (
"context"
"strings"
"sync"
"time"

"github.com/frostbyte73/core"

"github.com/livekit/egress/pkg/errors"
"github.com/livekit/protocol/egress"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/rpc"
"github.com/livekit/psrpc"
)

const (
ioTimeout = time.Second * 30
)

type IOClient interface {
CreateEgress(ctx context.Context, info *livekit.EgressInfo) chan error
UpdateEgress(ctx context.Context, info *livekit.EgressInfo) error
UpdateMetrics(ctx context.Context, req *rpc.UpdateMetricsRequest) error
Drain()
}

type ioClient struct {
rpc.IOInfoClient

mu sync.Mutex
egresses map[string]*egressIOClient
}

type egressIOClient struct {
created core.Fuse
aborted core.Fuse

mu sync.Mutex
pending chan *livekit.EgressInfo
}

func NewIOClient(bus psrpc.MessageBus) (IOClient, error) {
client, err := rpc.NewIOInfoClient(bus, psrpc.WithClientTimeout(ioTimeout))
if err != nil {
return nil, err
}

return &ioClient{
IOInfoClient: client,
egresses: make(map[string]*egressIOClient),
}, nil
}

func (c *ioClient) CreateEgress(ctx context.Context, info *livekit.EgressInfo) chan error {
e := &egressIOClient{
pending: make(chan *livekit.EgressInfo, 10),
}
c.mu.Lock()
c.egresses[info.EgressId] = e
c.mu.Unlock()

errChan := make(chan error, 1)
go func() {
_, err := c.IOInfoClient.CreateEgress(ctx, info)
if err != nil {
logger.Errorw("failed to create egress", err)
e.aborted.Break()
errChan <- err

c.mu.Lock()
delete(c.egresses, info.EgressId)
c.mu.Unlock()
} else {
e.created.Break()
errChan <- nil
}
}()

return errChan
}

func (c *ioClient) UpdateEgress(ctx context.Context, info *livekit.EgressInfo) error {
c.mu.Lock()
e, ok := c.egresses[info.EgressId]
c.mu.Unlock()
if !ok {
return errors.ErrEgressNotFound
}

// ensure updates are sent sequentially
e.pending <- info

select {
case <-e.created.Watch():
// egress was created, continue
case <-e.aborted.Watch():
// egress was aborted, ignore
return nil
}

// ensure only one thread is sending updates sequentially
e.mu.Lock()
defer e.mu.Unlock()
for {
select {
case update := <-e.pending:
var err error
for i := 0; i < 10; i++ {
_, err = c.IOInfoClient.UpdateEgress(ctx, update)
if err == nil {
break
}
time.Sleep(time.Millisecond * 100 * time.Duration(i))
}
if err != nil {
logger.Warnw("failed to update egress", err, "egressID", update.EgressId)
return err
}

requestType, outputType := egress.GetTypes(update.Request)
logger.Infow(strings.ToLower(update.Status.String()),
"requestType", requestType,
"outputType", outputType,
"error", update.Error,
"code", update.ErrorCode,
"details", update.Details,
)

switch update.Status {
case livekit.EgressStatus_EGRESS_COMPLETE,
livekit.EgressStatus_EGRESS_FAILED,
livekit.EgressStatus_EGRESS_ABORTED,
livekit.EgressStatus_EGRESS_LIMIT_REACHED:
// egress is done, delete ioEgressClient
c.mu.Lock()
delete(c.egresses, info.EgressId)
c.mu.Unlock()
}

default:
return nil
}
}
}

func (c *ioClient) UpdateMetrics(ctx context.Context, req *rpc.UpdateMetricsRequest) error {
_, err := c.IOInfoClient.UpdateMetrics(ctx, req)
if err != nil {
logger.Errorw("failed to update ms", err)
return err
}

return nil
}

func (c *ioClient) Drain() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for range ticker.C {
c.mu.Lock()
if len(c.egresses) == 0 {
c.mu.Unlock()
return
}
c.mu.Unlock()
}
}
12 changes: 7 additions & 5 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"google.golang.org/grpc"

"github.com/livekit/egress/pkg/config"
"github.com/livekit/egress/pkg/info"
"github.com/livekit/egress/pkg/ipc"
"github.com/livekit/egress/pkg/service"
"github.com/livekit/egress/pkg/stats"
Expand All @@ -51,14 +52,14 @@ type Server struct {
psrpcServer rpc.EgressInternalServer
ipcServiceServer *grpc.Server
promServer *http.Server
ioClient rpc.IOInfoClient
ioClient info.IOClient

activeRequests atomic.Int32
terminating core.Fuse
shutdown core.Fuse
}

func NewServer(conf *config.ServiceConfig, bus psrpc.MessageBus, ioClient rpc.IOInfoClient) (*Server, error) {
func NewServer(conf *config.ServiceConfig, bus psrpc.MessageBus, ioClient info.IOClient) (*Server, error) {
pm := service.NewProcessManager()

s := &Server{
Expand Down Expand Up @@ -146,9 +147,9 @@ func (s *Server) Run() error {

logger.Infow("service ready")
<-s.shutdown.Watch()
logger.Infow("shutting down")

logger.Infow("draining")
s.Drain()
logger.Infow("service stopped")
return nil
}

Expand Down Expand Up @@ -191,6 +192,7 @@ func (s *Server) Drain() {
time.Sleep(time.Second)
}

logger.Infow("closing server")
s.psrpcServer.Shutdown()
logger.Infow("draining io client")
s.ioClient.Drain()
}
7 changes: 3 additions & 4 deletions pkg/server/server_ipc.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (s *Server) HandlerReady(_ context.Context, req *ipc.HandlerReadyRequest) (
}

func (s *Server) HandlerUpdate(ctx context.Context, info *livekit.EgressInfo) (*emptypb.Empty, error) {
if _, err := s.ioClient.UpdateEgress(ctx, info); err != nil {
if err := s.ioClient.UpdateEgress(ctx, info); err != nil {
logger.Errorw("failed to update egress", err)
}

Expand All @@ -48,12 +48,11 @@ func (s *Server) HandlerUpdate(ctx context.Context, info *livekit.EgressInfo) (*
}

func (s *Server) HandlerFinished(ctx context.Context, req *ipc.HandlerFinishedRequest) (*emptypb.Empty, error) {
_, err := s.ioClient.UpdateEgress(ctx, req.Info)
if err != nil {
if err := s.ioClient.UpdateEgress(ctx, req.Info); err != nil {
logger.Errorw("failed to update egress", err)
}

if err = s.StoreProcessEndedMetrics(req.EgressId, req.Metrics); err != nil {
if err := s.StoreProcessEndedMetrics(req.EgressId, req.Metrics); err != nil {
logger.Errorw("failed to store ms", err)
}

Expand Down
32 changes: 13 additions & 19 deletions pkg/server/server_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,6 @@ func (s *Server) StartEgress(ctx context.Context, req *rpc.StartEgressRequest) (
return nil, err
}

_, err = s.ioClient.CreateEgress(ctx, (*livekit.EgressInfo)(p.Info))
if err != nil {
s.monitor.EgressAborted(req)
s.activeRequests.Dec()
return nil, err
}

requestType, outputType := egress.GetTypes(p.Info.Request)
logger.Infow("request validated",
"egressID", req.EgressId,
Expand All @@ -80,8 +73,10 @@ func (s *Server) StartEgress(ctx context.Context, req *rpc.StartEgressRequest) (
"request", p.Info.Request,
)

err = s.launchProcess(req, (*livekit.EgressInfo)(p.Info))
if err != nil {
errChan := s.ioClient.CreateEgress(ctx, (*livekit.EgressInfo)(p.Info))
s.launchProcess(req, (*livekit.EgressInfo)(p.Info))
if err = <-errChan; err != nil {
s.AbortProcess(req.EgressId, err)
s.monitor.EgressAborted(req)
s.activeRequests.Dec()
return nil, err
Expand All @@ -90,10 +85,12 @@ func (s *Server) StartEgress(ctx context.Context, req *rpc.StartEgressRequest) (
return (*livekit.EgressInfo)(p.Info), nil
}

func (s *Server) launchProcess(req *rpc.StartEgressRequest, info *livekit.EgressInfo) error {
func (s *Server) launchProcess(req *rpc.StartEgressRequest, info *livekit.EgressInfo) {
_, span := tracer.Start(context.Background(), "Service.launchProcess")
defer span.End()

s.monitor.EgressStarted(req)

handlerID := utils.NewGuid("EGH_")
p := &config.PipelineConfig{
BaseConfig: s.conf.BaseConfig,
Expand All @@ -105,14 +102,16 @@ func (s *Server) launchProcess(req *rpc.StartEgressRequest, info *livekit.Egress
if err != nil {
span.RecordError(err)
logger.Errorw("could not marshal config", err)
return err
s.processEnded(req, info, err)
return
}

reqString, err := protojson.Marshal(req)
if err != nil {
span.RecordError(err)
logger.Errorw("could not marshal request", err)
return err
s.processEnded(req, info, err)
return
}

cmd := exec.Command("egress",
Expand All @@ -125,8 +124,6 @@ func (s *Server) launchProcess(req *rpc.StartEgressRequest, info *livekit.Egress
cmd.Stderr = os.Stderr
cmd.SysProcAttr = &syscall.SysProcAttr{Setsid: true}

s.monitor.EgressStarted(req)

if err = s.Launch(context.Background(), handlerID, req, info, cmd, p.TmpDir); err != nil {
s.processEnded(req, info, err)
} else {
Expand All @@ -136,8 +133,6 @@ func (s *Server) launchProcess(req *rpc.StartEgressRequest, info *livekit.Egress
s.processEnded(req, info, err)
}()
}

return nil
}

func (s *Server) processEnded(req *rpc.StartEgressRequest, info *livekit.EgressInfo, err error) {
Expand All @@ -149,15 +144,14 @@ func (s *Server) processEnded(req *rpc.StartEgressRequest, info *livekit.EgressI
info.Status = livekit.EgressStatus_EGRESS_FAILED
info.Error = "internal error"
info.ErrorCode = int32(http.StatusInternalServerError)
_, _ = s.ioClient.UpdateEgress(context.Background(), info)

_ = s.ioClient.UpdateEgress(context.Background(), info)
logger.Errorw("process failed, shutting down", err)
s.Shutdown(false, false)
}

avgCPU, maxCPU := s.monitor.EgressEnded(req)
if maxCPU > 0 {
_, _ = s.ioClient.UpdateMetrics(context.Background(), &rpc.UpdateMetricsRequest{
_ = s.ioClient.UpdateMetrics(context.Background(), &rpc.UpdateMetricsRequest{
Info: info,
AvgCpuUsage: float32(avgCPU),
MaxCpuUsage: float32(maxCPU),
Expand Down
Loading

0 comments on commit 79afb24

Please sign in to comment.