diff --git a/livekit/types_test b/livekit/types_test deleted file mode 100644 index e4d82d0d..00000000 --- a/livekit/types_test +++ /dev/null @@ -1,2 +0,0 @@ -func TestUnmarshallRoomEgress(t testing.T) { -} diff --git a/logger/logger.go b/logger/logger.go index 6dbde34b..cc2f4f5f 100644 --- a/logger/logger.go +++ b/logger/logger.go @@ -379,7 +379,7 @@ func (l *zapLogger[T]) WithoutSampler() Logger { func (l *zapLogger[T]) WithDeferredValues() (Logger, DeferredFieldResolver) { dup := *l def, resolve := zaputil.NewDeferrer() - dup.deferred = append(dup.deferred[0:len(dup.deferred):len(dup.deferred)], def) + dup.deferred = append(dup.deferred, def) dup.zap = dup.ToZap() return &dup, resolve } diff --git a/logger/slog.go b/logger/slog.go index ed567343..6190b2c2 100644 --- a/logger/slog.go +++ b/logger/slog.go @@ -27,11 +27,11 @@ func ToSlogHandler(log Logger) slog.Handler { type slogDiscard struct{} -func (_ slogDiscard) Enabled(ctx context.Context, level slog.Level) bool { +func (slogDiscard) Enabled(ctx context.Context, level slog.Level) bool { return false } -func (_ slogDiscard) Handle(ctx context.Context, record slog.Record) error { +func (slogDiscard) Handle(ctx context.Context, record slog.Record) error { return nil } diff --git a/utils/messaging.go b/utils/messaging.go deleted file mode 100644 index c2beb2cb..00000000 --- a/utils/messaging.go +++ /dev/null @@ -1,144 +0,0 @@ -// Copyright 2023 LiveKit, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package utils - -import ( - "context" - "crypto/sha256" - "encoding/base64" - "math/rand" - "time" - - "github.com/eapache/channels" - "github.com/prometheus/client_golang/prometheus" - "github.com/redis/go-redis/v9" - "google.golang.org/protobuf/proto" -) - -const lockExpiration = time.Second * 5 - -var ( - PromMessageBusCounter = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: "livekit", - Subsystem: "messagebus", - Name: "messages", - }, - []string{"type", "status"}, - ) -) - -func init() { - prometheus.MustRegister(PromMessageBusCounter) -} - -type MessageBus interface { - Subscribe(ctx context.Context, channel string) (PubSub, error) - // SubscribeQueue is like subscribe, but ensuring only a single instance gets to process the message - SubscribeQueue(ctx context.Context, channel string) (PubSub, error) - Publish(ctx context.Context, channel string, msg proto.Message) error -} - -type PubSub interface { - Channel() <-chan interface{} - Payload(msg interface{}) []byte - Close() error -} - -type RedisMessageBus struct { - rc redis.UniversalClient -} - -func NewRedisMessageBus(rc redis.UniversalClient) MessageBus { - return &RedisMessageBus{rc: rc} -} - -func (r *RedisMessageBus) Lock(ctx context.Context, key string, expiration time.Duration) (bool, error) { - return r.rc.SetNX(ctx, key, rand.Int(), expiration).Result() -} - -func (r *RedisMessageBus) Subscribe(ctx context.Context, channel string) (PubSub, error) { - ps := r.rc.Subscribe(ctx, channel) - return &RedisPubSub{ - ps: ps, - c: channels.Wrap(ps.Channel()).Out(), - done: make(chan struct{}, 1), - }, nil -} - -func (r *RedisMessageBus) SubscribeQueue(ctx context.Context, channel string) (PubSub, error) { - sub := r.rc.Subscribe(ctx, channel) - c := make(chan *redis.Message, 100) // same chan size as redis pubsub - ps := &RedisPubSub{ - ps: sub, - c: channels.Wrap(c).Out(), - done: make(chan struct{}, 1), - } - - go func() { - for { - select { - case <-ps.done: - return - case msg := <-sub.Channel(): - sha := sha256.Sum256([]byte(msg.Payload)) - hash := base64.StdEncoding.EncodeToString(sha[:]) - acquired, _ := r.Lock(ctx, hash, lockExpiration) - if acquired { - PromMessageBusCounter.WithLabelValues("in", "success").Add(1) - c <- msg - } - } - } - }() - - return ps, nil -} - -func (r *RedisMessageBus) Publish(ctx context.Context, channel string, msg proto.Message) error { - b, err := proto.Marshal(msg) - if err != nil { - PromMessageBusCounter.WithLabelValues("out", "failure").Add(1) - return err - } - - err = r.rc.Publish(ctx, channel, b).Err() - if err == nil { - PromMessageBusCounter.WithLabelValues("out", "success").Add(1) - } else { - PromMessageBusCounter.WithLabelValues("out", "failure").Add(1) - } - - return err -} - -type RedisPubSub struct { - ps *redis.PubSub - c <-chan interface{} - done chan struct{} -} - -func (r *RedisPubSub) Channel() <-chan interface{} { - return r.c -} - -func (r *RedisPubSub) Payload(msg interface{}) []byte { - return []byte(msg.(*redis.Message).Payload) -} - -func (r *RedisPubSub) Close() error { - r.done <- struct{}{} - return r.ps.Close() -} diff --git a/utils/must.go b/utils/must.go deleted file mode 100644 index 3f4eb61f..00000000 --- a/utils/must.go +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright 2023 LiveKit, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package utils - -func Must[T any](v T, err error) T { - if err != nil { - panic(err) - } - return v -} diff --git a/utils/options/options.go b/utils/options/options.go index b2ef975b..d9d8bb73 100644 --- a/utils/options/options.go +++ b/utils/options/options.go @@ -20,3 +20,13 @@ func Apply[T any, F ~func(T)](o T, opts []F) T { } return o } + +func Make[T any, F ~func(*T)](opts []F) T { + var o T + Apply(&o, opts) + return o +} + +func New[T any, F ~func(*T)](opts []F) *T { + return Apply(new(T), opts) +} diff --git a/utils/protoproxy.go b/utils/protoproxy.go index dc1748d2..cb12c839 100644 --- a/utils/protoproxy.go +++ b/utils/protoproxy.go @@ -86,7 +86,7 @@ func (p *ProtoProxy[T]) Updated() <-chan struct{} { func (p *ProtoProxy[T]) Get() T { p.lock.RLock() defer p.lock.RUnlock() - return proto.Clone(p.message).(T) + return CloneProto(p.message) } func (p *ProtoProxy[T]) Stop() {