diff --git a/livekit/livekit_webhook.pb.go b/livekit/livekit_webhook.pb.go index fd12a721..34027785 100644 --- a/livekit/livekit_webhook.pb.go +++ b/livekit/livekit_webhook.pb.go @@ -56,6 +56,7 @@ type WebhookEvent struct { Id string `protobuf:"bytes,6,opt,name=id,proto3" json:"id,omitempty"` // timestamp in seconds CreatedAt int64 `protobuf:"varint,7,opt,name=created_at,json=createdAt,proto3" json:"created_at,omitempty"` + DequeuedAt int64 `protobuf:"varint,13,opt,name=dequeued_at,json=dequeuedAt,proto3" json:"dequeued_at,omitempty"` NumDropped int32 `protobuf:"varint,11,opt,name=num_dropped,json=numDropped,proto3" json:"num_dropped,omitempty"` } @@ -147,6 +148,13 @@ func (x *WebhookEvent) GetCreatedAt() int64 { return 0 } +func (x *WebhookEvent) GetDequeuedAt() int64 { + if x != nil { + return x.DequeuedAt + } + return 0 +} + func (x *WebhookEvent) GetNumDropped() int32 { if x != nil { return x.NumDropped @@ -163,7 +171,7 @@ var file_livekit_webhook_proto_rawDesc = []byte{ 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x14, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x5f, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x15, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x5f, 0x69, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x22, 0xec, 0x02, 0x0a, 0x0c, 0x57, 0x65, 0x62, 0x68, 0x6f, 0x6f, 0x6b, 0x45, + 0x6f, 0x74, 0x6f, 0x22, 0x8d, 0x03, 0x0a, 0x0c, 0x57, 0x65, 0x62, 0x68, 0x6f, 0x6f, 0x6b, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x21, 0x0a, 0x04, 0x72, 0x6f, 0x6f, 0x6d, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, @@ -184,14 +192,16 @@ var file_livekit_webhook_proto_rawDesc = []byte{ 0x63, 0x6b, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x07, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x41, - 0x74, 0x12, 0x1f, 0x0a, 0x0b, 0x6e, 0x75, 0x6d, 0x5f, 0x64, 0x72, 0x6f, 0x70, 0x70, 0x65, 0x64, - 0x18, 0x0b, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0a, 0x6e, 0x75, 0x6d, 0x44, 0x72, 0x6f, 0x70, 0x70, - 0x65, 0x64, 0x42, 0x46, 0x5a, 0x23, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, - 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, - 0x6c, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0xaa, 0x02, 0x0d, 0x4c, 0x69, 0x76, 0x65, - 0x4b, 0x69, 0x74, 0x2e, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0xea, 0x02, 0x0e, 0x4c, 0x69, 0x76, 0x65, - 0x4b, 0x69, 0x74, 0x3a, 0x3a, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, + 0x74, 0x12, 0x1f, 0x0a, 0x0b, 0x64, 0x65, 0x71, 0x75, 0x65, 0x75, 0x65, 0x64, 0x5f, 0x61, 0x74, + 0x18, 0x0d, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x64, 0x65, 0x71, 0x75, 0x65, 0x75, 0x65, 0x64, + 0x41, 0x74, 0x12, 0x1f, 0x0a, 0x0b, 0x6e, 0x75, 0x6d, 0x5f, 0x64, 0x72, 0x6f, 0x70, 0x70, 0x65, + 0x64, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0a, 0x6e, 0x75, 0x6d, 0x44, 0x72, 0x6f, 0x70, + 0x70, 0x65, 0x64, 0x42, 0x46, 0x5a, 0x23, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, + 0x6d, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, + 0x6f, 0x6c, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0xaa, 0x02, 0x0d, 0x4c, 0x69, 0x76, + 0x65, 0x4b, 0x69, 0x74, 0x2e, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0xea, 0x02, 0x0e, 0x4c, 0x69, 0x76, + 0x65, 0x4b, 0x69, 0x74, 0x3a, 0x3a, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x33, } var ( diff --git a/protobufs/livekit_webhook.proto b/protobufs/livekit_webhook.proto index e13ef45e..d0460c2a 100644 --- a/protobufs/livekit_webhook.proto +++ b/protobufs/livekit_webhook.proto @@ -49,7 +49,9 @@ message WebhookEvent { // timestamp in seconds int64 created_at = 7; + int64 dequeued_at = 13; + int32 num_dropped = 11; - // NEXT_ID: 12 + // NEXT_ID: 13 } diff --git a/webhook/notifier.go b/webhook/notifier.go index da1a91de..719ad7b0 100644 --- a/webhook/notifier.go +++ b/webhook/notifier.go @@ -16,10 +16,10 @@ package webhook import ( "context" + "github.com/livekit/protocol/logger" "sync" "github.com/livekit/protocol/livekit" - "github.com/livekit/protocol/logger" ) type QueuedNotifier interface { @@ -34,16 +34,30 @@ func NewDefaultNotifier(apiKey, apiSecret string, urls []string) QueuedNotifier n := &DefaultNotifier{} for _, url := range urls { u := NewURLNotifier(URLNotifierParams{ - URL: url, - Logger: logger.GetLogger().WithComponent("webhook"), - APIKey: apiKey, - APISecret: apiSecret, + URL: url, + Logger: logger.GetLogger().WithComponent("webhook"), + APIKey: apiKey, + APISecret: apiSecret, + DropWhenFull: true, }) n.urlNotifiers = append(n.urlNotifiers, u) } return n } +func NewDefaultNotifierByParams(params []URLNotifierParams) QueuedNotifier { + n := &DefaultNotifier{} + for _, p := range params { + if p.Logger == nil { + p.Logger = logger.GetLogger().WithComponent("webhook") + } + + u := NewURLNotifier(p) + n.urlNotifiers = append(n.urlNotifiers, u) + } + return n +} + func (n *DefaultNotifier) Stop(force bool) { wg := sync.WaitGroup{} for _, u := range n.urlNotifiers { diff --git a/webhook/url_notifier.go b/webhook/url_notifier.go index ad956f8d..05d8cb00 100644 --- a/webhook/url_notifier.go +++ b/webhook/url_notifier.go @@ -32,11 +32,12 @@ import ( ) type URLNotifierParams struct { - Logger logger.Logger - QueueSize int - URL string - APIKey string - APISecret string + Logger logger.Logger + QueueSize int + DropWhenFull bool + URL string + APIKey string + APISecret string } const defaultQueueSize = 100 @@ -66,7 +67,7 @@ func NewURLNotifier(params URLNotifierParams) *URLNotifier { n.client.Logger = &logAdapter{} n.worker = core.NewQueueWorker(core.QueueWorkerParams{ QueueSize: params.QueueSize, - DropWhenFull: true, + DropWhenFull: params.DropWhenFull, OnDropped: func() { n.dropped.Inc() }, }) return n diff --git a/webhook/webhook_test.go b/webhook/webhook_test.go index 1ec111c2..c12a31ad 100644 --- a/webhook/webhook_test.go +++ b/webhook/webhook_test.go @@ -79,28 +79,55 @@ func TestURLNotifierDropped(t *testing.T) { require.NoError(t, s.Start()) defer s.Stop() - urlNotifier := newTestNotifier() - defer urlNotifier.Stop(true) - totalDropped := atomic.Int32{} - totalReceived := atomic.Int32{} - s.handler = func(r *http.Request) { - decodedEvent, err := ReceiveWebhookEvent(r, authProvider) - require.NoError(t, err) - totalReceived.Inc() - totalDropped.Add(decodedEvent.NumDropped) - } - // send multiple notifications - for i := 0; i < 10; i++ { - _ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomStarted}) - _ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventParticipantJoined}) - _ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomFinished}) - } + t.Run("DropWhenFull = true", func(t *testing.T) { + urlNotifier := newTestNotifier(true) + defer urlNotifier.Stop(true) + totalDropped := atomic.Int32{} + totalReceived := atomic.Int32{} + s.handler = func(r *http.Request) { + decodedEvent, err := ReceiveWebhookEvent(r, authProvider) + require.NoError(t, err) + totalReceived.Inc() + totalDropped.Add(decodedEvent.NumDropped) + } + // send multiple notifications + for i := 0; i < 10; i++ { + _ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomStarted}) + _ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventParticipantJoined}) + _ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomFinished}) + } + + time.Sleep(webhookCheckInterval) - time.Sleep(webhookCheckInterval) + require.Equal(t, int32(30), totalDropped.Load()+totalReceived.Load()) + // at least one request dropped + require.Less(t, int32(0), totalDropped.Load()) + }) - require.Equal(t, int32(30), totalDropped.Load()+totalReceived.Load()) - // at least one request dropped - require.Less(t, int32(0), totalDropped.Load()) + t.Run("DropWhenFull = false", func(t *testing.T) { + urlNotifier := newTestNotifier(false) + defer urlNotifier.Stop(true) + totalDropped := atomic.Int32{} + totalReceived := atomic.Int32{} + s.handler = func(r *http.Request) { + decodedEvent, err := ReceiveWebhookEvent(r, authProvider) + require.NoError(t, err) + totalReceived.Inc() + totalDropped.Add(decodedEvent.NumDropped) + } + // send multiple notifications + for i := 0; i < 10; i++ { + _ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomStarted}) + _ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventParticipantJoined}) + _ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomFinished}) + } + + time.Sleep(webhookCheckInterval) + + require.Equal(t, int32(30), totalDropped.Load()+totalReceived.Load()) + // at least one request dropped + require.Equal(t, int32(0), totalDropped.Load()) + }) } func TestURLNotifierLifecycle(t *testing.T) { @@ -109,12 +136,12 @@ func TestURLNotifierLifecycle(t *testing.T) { defer s.Stop() t.Run("start/stop without use", func(t *testing.T) { - urlNotifier := newTestNotifier() + urlNotifier := newTestNotifier(true) urlNotifier.Stop(false) }) t.Run("stop allowing to drain", func(t *testing.T) { - urlNotifier := newTestNotifier() + urlNotifier := newTestNotifier(true) numCalled := atomic.Int32{} s.handler = func(r *http.Request) { numCalled.Inc() @@ -128,7 +155,7 @@ func TestURLNotifierLifecycle(t *testing.T) { }) t.Run("force stop", func(t *testing.T) { - urlNotifier := newTestNotifier() + urlNotifier := newTestNotifier(true) numCalled := atomic.Int32{} s.handler = func(r *http.Request) { numCalled.Inc() @@ -143,12 +170,13 @@ func TestURLNotifierLifecycle(t *testing.T) { }) } -func newTestNotifier() *URLNotifier { +func newTestNotifier(dropWhenFull bool) *URLNotifier { return NewURLNotifier(URLNotifierParams{ - QueueSize: 20, - URL: testUrl, - APIKey: apiKey, - APISecret: apiSecret, + QueueSize: 20, + URL: testUrl, + APIKey: apiKey, + APISecret: apiSecret, + DropWhenFull: dropWhenFull, }) }