Skip to content

Commit

Permalink
faster startup
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 committed Sep 6, 2024
1 parent 0c95732 commit 15677e6
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 46 deletions.
4 changes: 2 additions & 2 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (
"github.com/livekit/egress/pkg/config"
"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/handler"
"github.com/livekit/egress/pkg/info"
"github.com/livekit/egress/pkg/server"
"github.com/livekit/egress/pkg/service"
"github.com/livekit/egress/version"
"github.com/livekit/protocol/logger"
lkredis "github.com/livekit/protocol/redis"
Expand Down Expand Up @@ -111,7 +111,7 @@ func runService(c *cli.Context) error {
}

bus := psrpc.NewRedisMessageBus(rc)
ioClient, err := service.NewIOClient(bus)
ioClient, err := info.NewIOClient(bus)
if err != nil {
return err
}
Expand Down
121 changes: 89 additions & 32 deletions pkg/service/io.go → pkg/info/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package service
package info

import (
"context"
"strings"
"sync"
"time"

"github.com/frostbyte73/core"
"google.golang.org/protobuf/types/known/emptypb"

"github.com/livekit/egress/pkg/errors"
Expand All @@ -28,81 +31,135 @@ import (
"github.com/livekit/psrpc"
)

type IOClient struct {
type IOClient interface {
rpc.IOInfoClient
Drain()
}

type ioClient struct {
rpc.IOInfoClient

mu sync.Mutex
egresses map[string]*egressIOClient
}

type egressIOClient struct {
created core.Fuse
aborted core.Fuse

mu sync.Mutex
updates chan *livekit.EgressInfo
pending chan *livekit.EgressInfo
}

func NewIOClient(bus psrpc.MessageBus) (rpc.IOInfoClient, error) {
func NewIOClient(bus psrpc.MessageBus) (IOClient, error) {
client, err := rpc.NewIOInfoClient(bus)
if err != nil {
return nil, err
}
return &IOClient{
return &ioClient{
IOInfoClient: client,
updates: make(chan *livekit.EgressInfo, 10),
egresses: make(map[string]*egressIOClient),
}, nil
}

func (c *IOClient) CreateEgress(ctx context.Context, info *livekit.EgressInfo, opts ...psrpc.RequestOption) (*emptypb.Empty, error) {
func (c *ioClient) CreateEgress(ctx context.Context, info *livekit.EgressInfo, opts ...psrpc.RequestOption) (*emptypb.Empty, error) {
e := &egressIOClient{
pending: make(chan *livekit.EgressInfo),
}
c.mu.Lock()
c.egresses[info.EgressId] = e
c.mu.Unlock()

_, err := c.IOInfoClient.CreateEgress(ctx, info, opts...)
if err != nil {
logger.Errorw("failed to create egress", err)
e.aborted.Break()

c.mu.Lock()
delete(c.egresses, info.EgressId)
c.mu.Unlock()

return nil, err
}

return &emptypb.Empty{}, nil
}

func (c *IOClient) UpdateEgress(ctx context.Context, info *livekit.EgressInfo, opts ...psrpc.RequestOption) (*emptypb.Empty, error) {
c.updates <- info
func (c *ioClient) UpdateEgress(ctx context.Context, info *livekit.EgressInfo, opts ...psrpc.RequestOption) (*emptypb.Empty, error) {
c.mu.Lock()
e, ok := c.egresses[info.EgressId]
c.mu.Unlock()
if !ok {
return nil, errors.ErrEgressNotFound
}

// ensure updates are sent sequentially
c.mu.Lock()
defer c.mu.Unlock()
e.pending <- info

select {
case <-e.created.Watch():
// egress was created, continue
case <-e.aborted.Watch():
// egress was aborted, ignore
return &emptypb.Empty{}, nil
}

// ensure only one thread is sending updates sequentially
e.mu.Lock()
defer e.mu.Unlock()
for {
select {
case update := <-c.updates:
case update := <-e.pending:
_, err := c.IOInfoClient.UpdateEgress(ctx, update, opts...)
if err != nil {
logger.Errorw("failed to update egress", err)
return nil, err
}

requestType, outputType := egress.GetTypes(update.Request)
logger.Infow(strings.ToLower(update.Status.String()),
"requestType", requestType,
"outputType", outputType,
"error", update.Error,
"code", update.ErrorCode,
"details", update.Details,
)

switch update.Status {
case livekit.EgressStatus_EGRESS_FAILED:
logger.Warnw("egress failed", errors.New(update.Error),
"egressID", update.EgressId,
"requestType", requestType,
"outputType", outputType,
)
case livekit.EgressStatus_EGRESS_COMPLETE:
logger.Infow("egress completed",
"egressID", update.EgressId,
"requestType", requestType,
"outputType", outputType,
)
default:
logger.Infow("egress updated",
"egressID", update.EgressId,
"requestType", requestType,
"outputType", outputType,
"status", update.Status,
)
case livekit.EgressStatus_EGRESS_COMPLETE,
livekit.EgressStatus_EGRESS_FAILED,
livekit.EgressStatus_EGRESS_ABORTED,
livekit.EgressStatus_EGRESS_LIMIT_REACHED:
// egress is done, delete ioEgressClient
c.mu.Lock()
delete(c.egresses, info.EgressId)
c.mu.Unlock()
}

default:
return &emptypb.Empty{}, nil
}
}
}

func (c *IOClient) UpdateMetrics(ctx context.Context, req *rpc.UpdateMetricsRequest, opts ...psrpc.RequestOption) (*emptypb.Empty, error) {
func (c *ioClient) UpdateMetrics(ctx context.Context, req *rpc.UpdateMetricsRequest, opts ...psrpc.RequestOption) (*emptypb.Empty, error) {
_, err := c.IOInfoClient.UpdateMetrics(ctx, req, opts...)
if err != nil {
logger.Errorw("failed to update ms", err)
return nil, err
}
return &emptypb.Empty{}, nil
}

func (c *ioClient) Drain() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for range ticker.C {
c.mu.Lock()
if len(c.egresses) == 0 {
c.mu.Unlock()
return
}
c.mu.Unlock()
}
}
12 changes: 7 additions & 5 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"google.golang.org/grpc"

"github.com/livekit/egress/pkg/config"
"github.com/livekit/egress/pkg/info"
"github.com/livekit/egress/pkg/ipc"
"github.com/livekit/egress/pkg/service"
"github.com/livekit/egress/pkg/stats"
Expand All @@ -51,14 +52,14 @@ type Server struct {
psrpcServer rpc.EgressInternalServer
ipcServiceServer *grpc.Server
promServer *http.Server
ioClient rpc.IOInfoClient
ioClient info.IOClient

activeRequests atomic.Int32
terminating core.Fuse
shutdown core.Fuse
}

func NewServer(conf *config.ServiceConfig, bus psrpc.MessageBus, ioClient rpc.IOInfoClient) (*Server, error) {
func NewServer(conf *config.ServiceConfig, bus psrpc.MessageBus, ioClient info.IOClient) (*Server, error) {
pm := service.NewProcessManager()

s := &Server{
Expand Down Expand Up @@ -146,9 +147,9 @@ func (s *Server) Run() error {

logger.Infow("service ready")
<-s.shutdown.Watch()
logger.Infow("shutting down")

logger.Infow("draining")
s.Drain()
logger.Infow("service stopped")
return nil
}

Expand Down Expand Up @@ -191,6 +192,7 @@ func (s *Server) Drain() {
time.Sleep(time.Second)
}

logger.Infow("closing server")
s.psrpcServer.Shutdown()
logger.Infow("draining io client")
s.ioClient.Drain()
}
15 changes: 8 additions & 7 deletions pkg/server/server_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,6 @@ func (s *Server) StartEgress(ctx context.Context, req *rpc.StartEgressRequest) (
return nil, err
}

_, err = s.ioClient.CreateEgress(ctx, (*livekit.EgressInfo)(p.Info))
if err != nil {
s.monitor.EgressAborted(req)
s.activeRequests.Dec()
return nil, err
}

requestType, outputType := egress.GetTypes(p.Info.Request)
logger.Infow("request validated",
"egressID", req.EgressId,
Expand All @@ -87,6 +80,14 @@ func (s *Server) StartEgress(ctx context.Context, req *rpc.StartEgressRequest) (
return nil, err
}

_, err = s.ioClient.CreateEgress(ctx, (*livekit.EgressInfo)(p.Info))
if err != nil {
s.AbortProcess(req.EgressId, err)
s.monitor.EgressAborted(req)
s.activeRequests.Dec()
return nil, err
}

return (*livekit.EgressInfo)(p.Info), nil
}

Expand Down
10 changes: 10 additions & 0 deletions pkg/service/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,16 @@ func (pm *ProcessManager) KillAll() {
}
}

func (pm *ProcessManager) AbortProcess(egressID string, err error) {
pm.mu.RLock()
defer pm.mu.RUnlock()

if h, ok := pm.activeHandlers[egressID]; ok {
logger.Warnw("aborting egress", err, "egressID", egressID)
h.kill()
}
}

func (pm *ProcessManager) KillProcess(egressID string, maxUsage float64) {
pm.mu.RLock()
defer pm.mu.RUnlock()
Expand Down

0 comments on commit 15677e6

Please sign in to comment.