Skip to content

Commit

Permalink
Refactor configuration module and support updating the service config…
Browse files Browse the repository at this point in the history
…uration (#174)
  • Loading branch information
biglittlebigben authored Dec 1, 2023
1 parent 83ab7bc commit 006a24c
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 34 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ require (
github.com/pion/interceptor v0.1.25
github.com/pion/rtcp v1.2.12
github.com/pion/rtp v1.8.3
github.com/pion/rtp/v2 v2.0.0
github.com/pion/webrtc/v3 v3.2.23
github.com/prometheus/client_golang v1.17.0
github.com/sirupsen/logrus v1.9.3
Expand Down
1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ github.com/pion/rtcp v1.2.12/go.mod h1:sn6qjxvnwyAkkPzPULIbVqSKI5Dv54Rv7VG0kNxh9
github.com/pion/rtp v1.8.2/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU=
github.com/pion/rtp v1.8.3 h1:VEHxqzSVQxCkKDSHro5/4IUUG1ea+MFdqR2R3xSpNU8=
github.com/pion/rtp v1.8.3/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU=
github.com/pion/rtp/v2 v2.0.0/go.mod h1:Vj+rrFbJCT3yxqE/VSwaOo9DQ2pMKGPxuE7hplGOlOs=
github.com/pion/sctp v1.8.5/go.mod h1:SUFFfDpViyKejTAdwD1d/HQsCu+V/40cCs2nZIvC3s0=
github.com/pion/sctp v1.8.8/go.mod h1:igF9nZBrjh5AtmKc7U30jXltsFHicFCXSmWA2GWRaWs=
github.com/pion/sctp v1.8.9 h1:TP5ZVxV5J7rz7uZmbyvnUvsn7EJ2x/5q9uhsTtXbI3g=
Expand Down
49 changes: 34 additions & 15 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ var (
)

type Config struct {
*ServiceConfig `yaml:",inline"`
*InternalConfig `yaml:",inline"`
}

type ServiceConfig struct {
Redis *redis.RedisConfig `yaml:"redis"` // required
ApiKey string `yaml:"api_key"` // required (env LIVEKIT_API_KEY)
ApiSecret string `yaml:"api_secret"` // required (env LIVEKIT_API_SECRET)
Expand All @@ -59,10 +64,12 @@ type Config struct {

// CPU costs for various ingress types
CPUCost CPUCostConfig `yaml:"cpu_cost"`
}

type InternalConfig struct {
// internal
ServiceName string `yaml:"-"`
NodeID string // Do not provide, will be overwritten
ServiceName string `yaml:"service_name"`
NodeID string `yaml:"node_id"` // Do not provide, will be overwritten
}

type CPUCostConfig struct {
Expand All @@ -74,10 +81,14 @@ type CPUCostConfig struct {

func NewConfig(confString string) (*Config, error) {
conf := &Config{
ApiKey: os.Getenv("LIVEKIT_API_KEY"),
ApiSecret: os.Getenv("LIVEKIT_API_SECRET"),
WsUrl: os.Getenv("LIVEKIT_WS_URL"),
ServiceName: "ingress",
ServiceConfig: &ServiceConfig{
ApiKey: os.Getenv("LIVEKIT_API_KEY"),
ApiSecret: os.Getenv("LIVEKIT_API_SECRET"),
WsUrl: os.Getenv("LIVEKIT_WS_URL"),
},
InternalConfig: &InternalConfig{
ServiceName: "ingress",
},
}
if confString != "" {
if err := yaml.Unmarshal([]byte(confString), conf); err != nil {
Expand All @@ -91,10 +102,7 @@ func NewConfig(confString string) (*Config, error) {

return conf, nil
}

func (conf *Config) Init() error {
conf.NodeID = utils.NewGuid("NE_")

func (conf *ServiceConfig) InitDefaults() error {
if conf.RTMPPort == 0 {
conf.RTMPPort = DefaultRTMPPort
}
Expand All @@ -110,14 +118,10 @@ func (conf *Config) Init() error {
return err
}

if err := conf.InitLogger(); err != nil {
return err
}

return nil
}

func (c *Config) InitWhipConf() error {
func (c *ServiceConfig) InitWhipConf() error {
if c.WHIPPort <= 0 {
return nil
}
Expand All @@ -135,6 +139,21 @@ func (c *Config) InitWhipConf() error {
return nil
}

func (conf *Config) Init() error {
conf.NodeID = utils.NewGuid("NE_")

err := conf.InitDefaults()
if err != nil {
return err
}

if err := conf.InitLogger(); err != nil {
return err
}

return nil
}

func (c *Config) InitLogger(values ...interface{}) error {
zl, err := logger.NewZapLogger(&c.Logging)
if err != nil {
Expand Down
9 changes: 3 additions & 6 deletions pkg/service/process_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"gopkg.in/yaml.v3"

"github.com/frostbyte73/core"
"github.com/livekit/ingress/pkg/config"
"github.com/livekit/ingress/pkg/ipc"
"github.com/livekit/ingress/pkg/params"
"github.com/livekit/ingress/pkg/types"
Expand All @@ -49,17 +48,15 @@ type process struct {
}

type ProcessManager struct {
conf *config.Config
sm *SessionManager
sm *SessionManager

mu sync.RWMutex
activeHandlers map[string]*process
onFatal func(info *livekit.IngressInfo, err error)
}

func NewProcessManager(conf *config.Config, sm *SessionManager) *ProcessManager {
func NewProcessManager(sm *SessionManager) *ProcessManager {
return &ProcessManager{
conf: conf,
sm: sm,
activeHandlers: make(map[string]*process),
}
Expand All @@ -74,7 +71,7 @@ func (s *ProcessManager) launchHandler(ctx context.Context, p *params.Params) er
_, span := tracer.Start(ctx, "Service.launchHandler")
defer span.End()

confString, err := yaml.Marshal(s.conf)
confString, err := yaml.Marshal(p.Config)
if err != nil {
span.RecordError(err)
logger.Errorw("could not marshal config", err)
Expand Down
52 changes: 44 additions & 8 deletions pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"fmt"
"net"
"net/http"
"sync"
"sync/atomic"
"time"

"github.com/frostbyte73/core"
Expand Down Expand Up @@ -59,7 +61,9 @@ type publishResponse struct {
}

type Service struct {
conf *config.Config
confLock sync.Mutex
conf *config.Config

monitor *stats.Monitor
manager *ProcessManager
sm *SessionManager
Expand All @@ -71,7 +75,9 @@ type Service struct {
promServer *http.Server

publishRequests chan publishRequest
shutdown core.Fuse

acceptNewRequests atomic.Bool
shutdown core.Fuse
}

func NewService(conf *config.Config, psrpcClient rpc.IOInfoClient, bus psrpc.MessageBus, whipSrv *whip.WHIPServer) *Service {
Expand All @@ -82,14 +88,16 @@ func NewService(conf *config.Config, psrpcClient rpc.IOInfoClient, bus psrpc.Mes
conf: conf,
monitor: monitor,
sm: sm,
manager: NewProcessManager(conf, sm),
manager: NewProcessManager(sm),
whipSrv: whipSrv,
psrpcClient: psrpcClient,
bus: bus,
publishRequests: make(chan publishRequest, 5),
shutdown: core.NewFuse(),
}

s.acceptNewRequests.Store(true)

s.manager.onFatalError(func(info *livekit.IngressInfo, err error) {
s.sendUpdate(context.Background(), info, err)

Expand Down Expand Up @@ -278,12 +286,16 @@ func (s *Service) handleNewPublisher(ctx context.Context, resourceId string, inp
ResourceId: resourceId,
}

s.confLock.Lock()
conf := s.conf
s.confLock.Unlock()

if wsUrl == "" {
wsUrl = s.conf.WsUrl
wsUrl = conf.WsUrl
}

// This validates the ingress info
p, err := params.GetParams(ctx, s.psrpcClient, s.conf, info, wsUrl, token, nil)
p, err := params.GetParams(ctx, s.psrpcClient, conf, info, wsUrl, token, nil)
if err != nil {
return nil, err
}
Expand All @@ -301,6 +313,18 @@ func (s *Service) handleNewPublisher(ctx context.Context, resourceId string, inp
return p, nil
}

func (s *Service) UpdateConfig(conf *config.Config) {
s.confLock.Lock()
defer s.confLock.Unlock()

s.conf = conf

err := s.monitor.UpdateCostConfig(&conf.CPUCost)
if err != nil {
logger.Errorw("monitor cost config validation failed", err)
}
}

func (s *Service) Run() error {
logger.Debugw("starting service", "version", version.Version)

Expand All @@ -314,7 +338,11 @@ func (s *Service) Run() error {
}()
}

if err := s.monitor.Start(s.conf); err != nil {
s.confLock.Lock()
conf := s.conf
s.confLock.Unlock()

if err := s.monitor.Start(conf); err != nil {
return err
}

Expand Down Expand Up @@ -386,6 +414,14 @@ func (s *Service) Run() error {
}
}

func (s *Service) Pause() {
s.acceptNewRequests.Store(false)
}

func (s *Service) Resume() {
s.acceptNewRequests.Store(true)
}

func (s *Service) sendUpdate(ctx context.Context, info *livekit.IngressInfo, err error) {
var state *livekit.IngressState
if info == nil {
Expand All @@ -411,7 +447,7 @@ func (s *Service) sendUpdate(ctx context.Context, info *livekit.IngressInfo, err
}

func (s *Service) CanAccept() bool {
return s.monitor.CanAccept()
return s.acceptNewRequests.Load() && s.monitor.CanAccept()
}

func (s *Service) Stop(kill bool) {
Expand Down Expand Up @@ -441,7 +477,7 @@ func (s *Service) StartIngress(ctx context.Context, req *rpc.StartIngressRequest
}

func (s *Service) StartIngressAffinity(ctx context.Context, req *rpc.StartIngressRequest) float32 {
if !s.monitor.CanAcceptIngress(req.Info) {
if !s.acceptNewRequests.Load() || !s.monitor.CanAcceptIngress(req.Info) {
return -1
}

Expand Down
27 changes: 25 additions & 2 deletions pkg/stats/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package stats
import (
"fmt"
"sort"
"sync"
"time"

"github.com/frostbyte73/core"
Expand All @@ -35,8 +36,9 @@ const (
)

type Monitor struct {
cpuCostConfig config.CPUCostConfig
maxCost float64
costConfigLock sync.Mutex
cpuCostConfig config.CPUCostConfig
maxCost float64

promCPULoad prometheus.Gauge
requestGauge *prometheus.GaugeVec
Expand Down Expand Up @@ -103,6 +105,21 @@ func (m *Monitor) Start(conf *config.Config) error {
return nil
}

func (m *Monitor) UpdateCostConfig(cpuCostConfig *config.CPUCostConfig) error {
m.costConfigLock.Lock()
defer m.costConfigLock.Unlock()

// No change
if m.cpuCostConfig == *cpuCostConfig {
return nil
}

// Update config, but return an error if validation fails
m.cpuCostConfig = *cpuCostConfig

return m.checkCPUConfig()
}

// Server is shutting down, but may stay up for some time for draining
func (m *Monitor) Shutdown() {
m.shutdown.Break()
Expand Down Expand Up @@ -197,6 +214,9 @@ func (m *Monitor) CanAccept() bool {
return false
}

m.costConfigLock.Lock()
defer m.costConfigLock.Unlock()

return m.getAvailable() > m.maxCost
}

Expand All @@ -209,6 +229,9 @@ func (m *Monitor) canAcceptIngress(info *livekit.IngressInfo) (bool, float64, fl
var accept bool
available := m.getAvailable()

m.costConfigLock.Lock()
defer m.costConfigLock.Unlock()

switch info.InputType {
case livekit.IngressInput_RTMP_INPUT:
accept = available > m.cpuCostConfig.RTMPCpuCost
Expand Down
7 changes: 6 additions & 1 deletion test/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,12 @@ func (s *ioServer) GetSIPTrunkAuthentication(context.Context, *rpc.GetSIPTrunkAu
}

func GetDefaultConfig(t *testing.T) *TestConfig {
tc := &TestConfig{Config: &config.Config{}}
tc := &TestConfig{
Config: &config.Config{
ServiceConfig: &config.ServiceConfig{},
InternalConfig: &config.InternalConfig{},
},
}
// Defaults
tc.RTMPPort = 1935
tc.HTTPRelayPort = 9090
Expand Down

0 comments on commit 006a24c

Please sign in to comment.