Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into benjamin/egress_image…
Browse files Browse the repository at this point in the history
…_output
  • Loading branch information
biglittlebigben committed Sep 27, 2023
2 parents d7401d2 + b6137b8 commit 4bcfcf7
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 80 deletions.
70 changes: 13 additions & 57 deletions livekit/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,80 +14,36 @@

package livekit

// ----------------------------------------------------------------

type TrackID string

func StringsAsTrackIDs(trackIDs []string) []TrackID {
asTrackID := make([]TrackID, 0, len(trackIDs))
for _, trackID := range trackIDs {
asTrackID = append(asTrackID, TrackID(trackID))
}

return asTrackID
}

// ----------------------------------------------------------------

type ParticipantID string

func ParticipantIDsAsStrings(ids []ParticipantID) []string {
strs := make([]string, 0, len(ids))
for _, id := range ids {
strs = append(strs, string(id))
}
return strs
}

// ----------------------------------------------------------------

type ParticipantIdentity string
type ParticipantName string

type RoomID string

// ----------------------------------------------------------------

type RoomName string

func RoomNamesAsStrings(roomNames []RoomName) []string {
asString := make([]string, 0, len(roomNames))
for _, roomName := range roomNames {
asString = append(asString, string(roomName))
}

return asString
}

func StringsAsRoomNames(roomNames []string) []RoomName {
asRoomName := make([]RoomName, 0, len(roomNames))
for _, roomName := range roomNames {
asRoomName = append(asRoomName, RoomName(roomName))
}

return asRoomName
}

// ----------------------------------------------------------------

type ConnectionID string

// ----------------------------------------------------------------

type NodeID string
type ParticipantKey string

type stringTypes interface {
ParticipantID | RoomID | TrackID | ParticipantIdentity | ParticipantName | RoomName | ConnectionID | NodeID | ParticipantKey
}

func NodeIDsAsStrings(ids []NodeID) []string {
func IDsAsStrings[T stringTypes](ids []T) []string {
strs := make([]string, 0, len(ids))
for _, id := range ids {
strs = append(strs, string(id))
}
return strs
}

// ----------------------------------------------------------------
type ParticipantKey string
func StringsAsIDs[T stringTypes](ids []string) []T {
asID := make([]T, 0, len(ids))
for _, id := range ids {
asID = append(asID, T(id))
}

// ----------------------------------------------------------------
return asID
}

type Guid interface {
TrackID | ParticipantID | RoomID
Expand Down
4 changes: 2 additions & 2 deletions logger/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ package logger
import "sync"

type Config struct {
JSON bool `yaml:"json"`
Level string `yaml:"level"`
JSON bool `yaml:"json,omitempty"`
Level string `yaml:"level,omitempty"`
// true to enable log sampling, where the same log message and level will be throttled.
// we have two layers of sampling
// 1. global sampling - within a second, it will log the first SampleInitial, then every SampleInterval messages.
Expand Down
28 changes: 14 additions & 14 deletions redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,21 @@ import (
var ErrNotConfigured = errors.New("Redis is not configured")

type RedisConfig struct {
Address string `yaml:"address"`
Username string `yaml:"username"`
Password string `yaml:"password"`
DB int `yaml:"db"`
UseTLS bool `yaml:"use_tls"`
MasterName string `yaml:"sentinel_master_name"`
SentinelUsername string `yaml:"sentinel_username"`
SentinelPassword string `yaml:"sentinel_password"`
SentinelAddresses []string `yaml:"sentinel_addresses"`
ClusterAddresses []string `yaml:"cluster_addresses"`
DialTimeout int `yaml:"dial_timeout"`
ReadTimeout int `yaml:"read_timeout"`
WriteTimeout int `yaml:"write_timeout"`
Address string `yaml:"address,omitempty"`
Username string `yaml:"username,omitempty"`
Password string `yaml:"password,omitempty"`
DB int `yaml:"db,omitempty"`
UseTLS bool `yaml:"use_tls,omitempty"`
MasterName string `yaml:"sentinel_master_name,omitempty"`
SentinelUsername string `yaml:"sentinel_username,omitempty"`
SentinelPassword string `yaml:"sentinel_password,omitempty"`
SentinelAddresses []string `yaml:"sentinel_addresses,omitempty"`
ClusterAddresses []string `yaml:"cluster_addresses,omitempty"`
DialTimeout int `yaml:"dial_timeout,omitempty"`
ReadTimeout int `yaml:"read_timeout,omitempty"`
WriteTimeout int `yaml:"write_timeout,omitempty"`
// for clustererd mode only, number of redirects to follow, defaults to 2
MaxRedirects *int `yaml:"max_redirects"`
MaxRedirects *int `yaml:"max_redirects,omitempty"`
}

func (r *RedisConfig) IsConfigured() bool {
Expand Down
4 changes: 3 additions & 1 deletion rpc/egress_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ func NewEgressClient(nodeID livekit.NodeID, bus psrpc.MessageBus) (EgressClient,
if !errors.As(err, &e) {
return true
}
return e.Code() == psrpc.DeadlineExceeded || e.Code() == psrpc.ResourceExhausted
return e.Code() == psrpc.DeadlineExceeded ||
e.Code() == psrpc.ResourceExhausted ||
e.Code() == psrpc.Unavailable
},
}))
if err != nil {
Expand Down
23 changes: 17 additions & 6 deletions utils/lock_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
var lockTrackerEnabled = false
var enableLockTrackerOnce sync.Once
var lowResTime uint32 = uint32(time.Now().Unix())
var enableLockTrackerStackTrace uint32

// EnableLockTracker enable lock tracking background worker. This should be
// called during init
Expand All @@ -36,6 +37,14 @@ func EnableLockTracker() {
})
}

func ToggleLockTrackerStackTraces(enable bool) {
var v uint32
if enable {
v = 1
}
atomic.StoreUint32(&enableLockTrackerStackTrace, v)
}

func updateLowResTime() {
ticker := time.NewTicker(time.Second)
for t := range ticker.C {
Expand Down Expand Up @@ -151,13 +160,15 @@ func (t *lockTracker) trackLock() {
if atomic.AddInt32(&t.held, 1) == 1 {
atomic.StoreUint32(&t.ts, atomic.LoadUint32(&lowResTime))

for {
n := runtime.Stack(t.stack[:cap(t.stack)], false)
if n < cap(t.stack) {
t.stack = t.stack[:n]
break
if atomic.LoadUint32(&enableLockTrackerStackTrace) == 1 {
for {
n := runtime.Stack(t.stack[:cap(t.stack)], false)
if n < cap(t.stack) {
t.stack = t.stack[:n]
break
}
t.stack = make([]byte, len(t.stack)*2)
}
t.stack = make([]byte, len(t.stack)*2)
}
}
}
Expand Down

0 comments on commit 4bcfcf7

Please sign in to comment.