Skip to content

Commit

Permalink
Merge pull request #2 from itzloop/itzloop/configurable-urlnotifier
Browse files Browse the repository at this point in the history
Itzloop/configurable urlnotifier
  • Loading branch information
itzloop authored May 29, 2024
2 parents c721a3e + 48efe47 commit 5bb3c92
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 49 deletions.
28 changes: 19 additions & 9 deletions livekit/livekit_webhook.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion protobufs/livekit_webhook.proto
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ message WebhookEvent {
// timestamp in seconds
int64 created_at = 7;

int64 dequeued_at = 13;

int32 num_dropped = 11;

// NEXT_ID: 12
// NEXT_ID: 13
}
24 changes: 19 additions & 5 deletions webhook/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ package webhook

import (
"context"
"github.com/livekit/protocol/logger"
"sync"

"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
)

type QueuedNotifier interface {
Expand All @@ -34,16 +34,30 @@ func NewDefaultNotifier(apiKey, apiSecret string, urls []string) QueuedNotifier
n := &DefaultNotifier{}
for _, url := range urls {
u := NewURLNotifier(URLNotifierParams{
URL: url,
Logger: logger.GetLogger().WithComponent("webhook"),
APIKey: apiKey,
APISecret: apiSecret,
URL: url,
Logger: logger.GetLogger().WithComponent("webhook"),
APIKey: apiKey,
APISecret: apiSecret,
DropWhenFull: true,
})
n.urlNotifiers = append(n.urlNotifiers, u)
}
return n
}

func NewDefaultNotifierByParams(params []URLNotifierParams) QueuedNotifier {
n := &DefaultNotifier{}
for _, p := range params {
if p.Logger == nil {
p.Logger = logger.GetLogger().WithComponent("webhook")
}

u := NewURLNotifier(p)
n.urlNotifiers = append(n.urlNotifiers, u)
}
return n
}

func (n *DefaultNotifier) Stop(force bool) {
wg := sync.WaitGroup{}
for _, u := range n.urlNotifiers {
Expand Down
13 changes: 7 additions & 6 deletions webhook/url_notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ import (
)

type URLNotifierParams struct {
Logger logger.Logger
QueueSize int
URL string
APIKey string
APISecret string
Logger logger.Logger
QueueSize int
DropWhenFull bool
URL string
APIKey string
APISecret string
}

const defaultQueueSize = 100
Expand Down Expand Up @@ -66,7 +67,7 @@ func NewURLNotifier(params URLNotifierParams) *URLNotifier {
n.client.Logger = &logAdapter{}
n.worker = core.NewQueueWorker(core.QueueWorkerParams{
QueueSize: params.QueueSize,
DropWhenFull: true,
DropWhenFull: params.DropWhenFull,
OnDropped: func() { n.dropped.Inc() },
})
return n
Expand Down
84 changes: 56 additions & 28 deletions webhook/webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,28 +79,55 @@ func TestURLNotifierDropped(t *testing.T) {
require.NoError(t, s.Start())
defer s.Stop()

urlNotifier := newTestNotifier()
defer urlNotifier.Stop(true)
totalDropped := atomic.Int32{}
totalReceived := atomic.Int32{}
s.handler = func(r *http.Request) {
decodedEvent, err := ReceiveWebhookEvent(r, authProvider)
require.NoError(t, err)
totalReceived.Inc()
totalDropped.Add(decodedEvent.NumDropped)
}
// send multiple notifications
for i := 0; i < 10; i++ {
_ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomStarted})
_ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventParticipantJoined})
_ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomFinished})
}
t.Run("DropWhenFull = true", func(t *testing.T) {
urlNotifier := newTestNotifier(true)
defer urlNotifier.Stop(true)
totalDropped := atomic.Int32{}
totalReceived := atomic.Int32{}
s.handler = func(r *http.Request) {
decodedEvent, err := ReceiveWebhookEvent(r, authProvider)
require.NoError(t, err)
totalReceived.Inc()
totalDropped.Add(decodedEvent.NumDropped)
}
// send multiple notifications
for i := 0; i < 10; i++ {
_ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomStarted})
_ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventParticipantJoined})
_ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomFinished})
}

time.Sleep(webhookCheckInterval)

time.Sleep(webhookCheckInterval)
require.Equal(t, int32(30), totalDropped.Load()+totalReceived.Load())
// at least one request dropped
require.Less(t, int32(0), totalDropped.Load())
})

require.Equal(t, int32(30), totalDropped.Load()+totalReceived.Load())
// at least one request dropped
require.Less(t, int32(0), totalDropped.Load())
t.Run("DropWhenFull = false", func(t *testing.T) {
urlNotifier := newTestNotifier(false)
defer urlNotifier.Stop(true)
totalDropped := atomic.Int32{}
totalReceived := atomic.Int32{}
s.handler = func(r *http.Request) {
decodedEvent, err := ReceiveWebhookEvent(r, authProvider)
require.NoError(t, err)
totalReceived.Inc()
totalDropped.Add(decodedEvent.NumDropped)
}
// send multiple notifications
for i := 0; i < 10; i++ {
_ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomStarted})
_ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventParticipantJoined})
_ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomFinished})
}

time.Sleep(webhookCheckInterval)

require.Equal(t, int32(30), totalDropped.Load()+totalReceived.Load())
// at least one request dropped
require.Equal(t, int32(0), totalDropped.Load())
})
}

func TestURLNotifierLifecycle(t *testing.T) {
Expand All @@ -109,12 +136,12 @@ func TestURLNotifierLifecycle(t *testing.T) {
defer s.Stop()

t.Run("start/stop without use", func(t *testing.T) {
urlNotifier := newTestNotifier()
urlNotifier := newTestNotifier(true)
urlNotifier.Stop(false)
})

t.Run("stop allowing to drain", func(t *testing.T) {
urlNotifier := newTestNotifier()
urlNotifier := newTestNotifier(true)
numCalled := atomic.Int32{}
s.handler = func(r *http.Request) {
numCalled.Inc()
Expand All @@ -128,7 +155,7 @@ func TestURLNotifierLifecycle(t *testing.T) {
})

t.Run("force stop", func(t *testing.T) {
urlNotifier := newTestNotifier()
urlNotifier := newTestNotifier(true)
numCalled := atomic.Int32{}
s.handler = func(r *http.Request) {
numCalled.Inc()
Expand All @@ -143,12 +170,13 @@ func TestURLNotifierLifecycle(t *testing.T) {
})
}

func newTestNotifier() *URLNotifier {
func newTestNotifier(dropWhenFull bool) *URLNotifier {
return NewURLNotifier(URLNotifierParams{
QueueSize: 20,
URL: testUrl,
APIKey: apiKey,
APISecret: apiSecret,
QueueSize: 20,
URL: testUrl,
APIKey: apiKey,
APISecret: apiSecret,
DropWhenFull: dropWhenFull,
})
}

Expand Down

0 comments on commit 5bb3c92

Please sign in to comment.