From f4b0acb5f8176570fff2bdf263ffb900e3095cb0 Mon Sep 17 00:00:00 2001 From: itzloop Date: Wed, 29 May 2024 11:21:28 +0330 Subject: [PATCH] Add 'DeqeuedAt' for better monitoring at hooked backends --- infra/link.pb.go | 4 +- infra/link_grpc.pb.go | 2 +- livekit/livekit_agent.pb.go | 4 +- livekit/livekit_analytics.pb.go | 4 +- livekit/livekit_analytics_grpc.pb.go | 2 +- livekit/livekit_egress.pb.go | 4 +- livekit/livekit_ingress.pb.go | 4 +- livekit/livekit_internal.pb.go | 4 +- livekit/livekit_models.pb.go | 4 +- livekit/livekit_room.pb.go | 4 +- livekit/livekit_rtc.pb.go | 4 +- livekit/livekit_sip.pb.go | 4 +- livekit/livekit_webhook.pb.go | 32 +++++++---- protobufs/livekit_webhook.proto | 4 +- rpc/agent.pb.go | 4 +- rpc/egress.pb.go | 4 +- rpc/ingress.pb.go | 4 +- rpc/io.pb.go | 4 +- rpc/keepalive.pb.go | 4 +- rpc/participant.pb.go | 4 +- rpc/room.pb.go | 4 +- rpc/signal.pb.go | 4 +- rpc/sip.pb.go | 4 +- webhook/webhook_test.go | 84 ++++++++++++++++++---------- 24 files changed, 120 insertions(+), 80 deletions(-) diff --git a/infra/link.pb.go b/infra/link.pb.go index e4989ff8..26df7f67 100644 --- a/infra/link.pb.go +++ b/infra/link.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v4.24.3 // source: infra/link.proto package infra diff --git a/infra/link_grpc.pb.go b/infra/link_grpc.pb.go index a7d8284e..832b0d43 100644 --- a/infra/link_grpc.pb.go +++ b/infra/link_grpc.pb.go @@ -15,7 +15,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.23.4 +// - protoc v4.24.3 // source: infra/link.proto package infra diff --git a/livekit/livekit_agent.pb.go b/livekit/livekit_agent.pb.go index 2f5559cc..e1f81d7c 100644 --- a/livekit/livekit_agent.pb.go +++ b/livekit/livekit_agent.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v4.24.3 // source: livekit_agent.proto package livekit diff --git a/livekit/livekit_analytics.pb.go b/livekit/livekit_analytics.pb.go index a6f1dad2..f4b5d1f9 100644 --- a/livekit/livekit_analytics.pb.go +++ b/livekit/livekit_analytics.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v4.24.3 // source: livekit_analytics.proto package livekit diff --git a/livekit/livekit_analytics_grpc.pb.go b/livekit/livekit_analytics_grpc.pb.go index d6e880c4..5c4d7720 100644 --- a/livekit/livekit_analytics_grpc.pb.go +++ b/livekit/livekit_analytics_grpc.pb.go @@ -15,7 +15,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.23.4 +// - protoc v4.24.3 // source: livekit_analytics.proto package livekit diff --git a/livekit/livekit_egress.pb.go b/livekit/livekit_egress.pb.go index b2940a80..2a6b262f 100644 --- a/livekit/livekit_egress.pb.go +++ b/livekit/livekit_egress.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v4.24.3 // source: livekit_egress.proto package livekit diff --git a/livekit/livekit_ingress.pb.go b/livekit/livekit_ingress.pb.go index 622e2e44..e75330ab 100644 --- a/livekit/livekit_ingress.pb.go +++ b/livekit/livekit_ingress.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v4.24.3 // source: livekit_ingress.proto package livekit diff --git a/livekit/livekit_internal.pb.go b/livekit/livekit_internal.pb.go index a9c5e02e..076f6fea 100644 --- a/livekit/livekit_internal.pb.go +++ b/livekit/livekit_internal.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v4.24.3 // source: livekit_internal.proto package livekit diff --git a/livekit/livekit_models.pb.go b/livekit/livekit_models.pb.go index ebc11715..6616009b 100644 --- a/livekit/livekit_models.pb.go +++ b/livekit/livekit_models.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v4.24.3 // source: livekit_models.proto package livekit diff --git a/livekit/livekit_room.pb.go b/livekit/livekit_room.pb.go index 2102b265..edaaf1c5 100644 --- a/livekit/livekit_room.pb.go +++ b/livekit/livekit_room.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v4.24.3 // source: livekit_room.proto package livekit diff --git a/livekit/livekit_rtc.pb.go b/livekit/livekit_rtc.pb.go index 0bd008bb..47781e7b 100644 --- a/livekit/livekit_rtc.pb.go +++ b/livekit/livekit_rtc.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v4.24.3 // source: livekit_rtc.proto package livekit diff --git a/livekit/livekit_sip.pb.go b/livekit/livekit_sip.pb.go index 524d09d8..ed92cd74 100644 --- a/livekit/livekit_sip.pb.go +++ b/livekit/livekit_sip.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v4.24.3 // source: livekit_sip.proto package livekit diff --git a/livekit/livekit_webhook.pb.go b/livekit/livekit_webhook.pb.go index fd12a721..d51e1c5b 100644 --- a/livekit/livekit_webhook.pb.go +++ b/livekit/livekit_webhook.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v4.24.3 // source: livekit_webhook.proto package livekit @@ -56,6 +56,7 @@ type WebhookEvent struct { Id string `protobuf:"bytes,6,opt,name=id,proto3" json:"id,omitempty"` // timestamp in seconds CreatedAt int64 `protobuf:"varint,7,opt,name=created_at,json=createdAt,proto3" json:"created_at,omitempty"` + DequeuedAt int64 `protobuf:"varint,13,opt,name=dequeued_at,json=dequeuedAt,proto3" json:"dequeued_at,omitempty"` NumDropped int32 `protobuf:"varint,11,opt,name=num_dropped,json=numDropped,proto3" json:"num_dropped,omitempty"` } @@ -147,6 +148,13 @@ func (x *WebhookEvent) GetCreatedAt() int64 { return 0 } +func (x *WebhookEvent) GetDequeuedAt() int64 { + if x != nil { + return x.DequeuedAt + } + return 0 +} + func (x *WebhookEvent) GetNumDropped() int32 { if x != nil { return x.NumDropped @@ -163,7 +171,7 @@ var file_livekit_webhook_proto_rawDesc = []byte{ 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x14, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x5f, 0x65, 0x67, 0x72, 0x65, 0x73, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x15, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x5f, 0x69, 0x6e, 0x67, 0x72, 0x65, 0x73, 0x73, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x22, 0xec, 0x02, 0x0a, 0x0c, 0x57, 0x65, 0x62, 0x68, 0x6f, 0x6f, 0x6b, 0x45, + 0x6f, 0x74, 0x6f, 0x22, 0x8d, 0x03, 0x0a, 0x0c, 0x57, 0x65, 0x62, 0x68, 0x6f, 0x6f, 0x6b, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x21, 0x0a, 0x04, 0x72, 0x6f, 0x6f, 0x6d, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x6b, @@ -184,14 +192,16 @@ var file_livekit_webhook_proto_rawDesc = []byte{ 0x63, 0x6b, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x07, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x41, - 0x74, 0x12, 0x1f, 0x0a, 0x0b, 0x6e, 0x75, 0x6d, 0x5f, 0x64, 0x72, 0x6f, 0x70, 0x70, 0x65, 0x64, - 0x18, 0x0b, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0a, 0x6e, 0x75, 0x6d, 0x44, 0x72, 0x6f, 0x70, 0x70, - 0x65, 0x64, 0x42, 0x46, 0x5a, 0x23, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, - 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, - 0x6c, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0xaa, 0x02, 0x0d, 0x4c, 0x69, 0x76, 0x65, - 0x4b, 0x69, 0x74, 0x2e, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0xea, 0x02, 0x0e, 0x4c, 0x69, 0x76, 0x65, - 0x4b, 0x69, 0x74, 0x3a, 0x3a, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, + 0x74, 0x12, 0x1f, 0x0a, 0x0b, 0x64, 0x65, 0x71, 0x75, 0x65, 0x75, 0x65, 0x64, 0x5f, 0x61, 0x74, + 0x18, 0x0d, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x64, 0x65, 0x71, 0x75, 0x65, 0x75, 0x65, 0x64, + 0x41, 0x74, 0x12, 0x1f, 0x0a, 0x0b, 0x6e, 0x75, 0x6d, 0x5f, 0x64, 0x72, 0x6f, 0x70, 0x70, 0x65, + 0x64, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0a, 0x6e, 0x75, 0x6d, 0x44, 0x72, 0x6f, 0x70, + 0x70, 0x65, 0x64, 0x42, 0x46, 0x5a, 0x23, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, + 0x6d, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, + 0x6f, 0x6c, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0xaa, 0x02, 0x0d, 0x4c, 0x69, 0x76, + 0x65, 0x4b, 0x69, 0x74, 0x2e, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0xea, 0x02, 0x0e, 0x4c, 0x69, 0x76, + 0x65, 0x4b, 0x69, 0x74, 0x3a, 0x3a, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x33, } var ( diff --git a/protobufs/livekit_webhook.proto b/protobufs/livekit_webhook.proto index e13ef45e..d0460c2a 100644 --- a/protobufs/livekit_webhook.proto +++ b/protobufs/livekit_webhook.proto @@ -49,7 +49,9 @@ message WebhookEvent { // timestamp in seconds int64 created_at = 7; + int64 dequeued_at = 13; + int32 num_dropped = 11; - // NEXT_ID: 12 + // NEXT_ID: 13 } diff --git a/rpc/agent.pb.go b/rpc/agent.pb.go index 9367e256..bdf3ed4b 100644 --- a/rpc/agent.pb.go +++ b/rpc/agent.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v4.24.3 // source: rpc/agent.proto package rpc diff --git a/rpc/egress.pb.go b/rpc/egress.pb.go index 7e14c985..aa442eed 100644 --- a/rpc/egress.pb.go +++ b/rpc/egress.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v4.24.3 // source: rpc/egress.proto package rpc diff --git a/rpc/ingress.pb.go b/rpc/ingress.pb.go index 65b66381..2021514b 100644 --- a/rpc/ingress.pb.go +++ b/rpc/ingress.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v4.24.3 // source: rpc/ingress.proto package rpc diff --git a/rpc/io.pb.go b/rpc/io.pb.go index 568ef68e..b99bdf60 100644 --- a/rpc/io.pb.go +++ b/rpc/io.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v4.24.3 // source: rpc/io.proto package rpc diff --git a/rpc/keepalive.pb.go b/rpc/keepalive.pb.go index 62a08afa..8c2a356d 100644 --- a/rpc/keepalive.pb.go +++ b/rpc/keepalive.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v4.24.3 // source: rpc/keepalive.proto package rpc diff --git a/rpc/participant.pb.go b/rpc/participant.pb.go index efb56ae1..9339f180 100644 --- a/rpc/participant.pb.go +++ b/rpc/participant.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v4.24.3 // source: rpc/participant.proto package rpc diff --git a/rpc/room.pb.go b/rpc/room.pb.go index 6657f781..6e71fae8 100644 --- a/rpc/room.pb.go +++ b/rpc/room.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v4.24.3 // source: rpc/room.proto package rpc diff --git a/rpc/signal.pb.go b/rpc/signal.pb.go index b7117098..35f35e17 100644 --- a/rpc/signal.pb.go +++ b/rpc/signal.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v4.24.3 // source: rpc/signal.proto package rpc diff --git a/rpc/sip.pb.go b/rpc/sip.pb.go index 78585e74..0709bfb6 100644 --- a/rpc/sip.pb.go +++ b/rpc/sip.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v4.24.3 // source: rpc/sip.proto package rpc diff --git a/webhook/webhook_test.go b/webhook/webhook_test.go index 1ec111c2..c12a31ad 100644 --- a/webhook/webhook_test.go +++ b/webhook/webhook_test.go @@ -79,28 +79,55 @@ func TestURLNotifierDropped(t *testing.T) { require.NoError(t, s.Start()) defer s.Stop() - urlNotifier := newTestNotifier() - defer urlNotifier.Stop(true) - totalDropped := atomic.Int32{} - totalReceived := atomic.Int32{} - s.handler = func(r *http.Request) { - decodedEvent, err := ReceiveWebhookEvent(r, authProvider) - require.NoError(t, err) - totalReceived.Inc() - totalDropped.Add(decodedEvent.NumDropped) - } - // send multiple notifications - for i := 0; i < 10; i++ { - _ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomStarted}) - _ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventParticipantJoined}) - _ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomFinished}) - } + t.Run("DropWhenFull = true", func(t *testing.T) { + urlNotifier := newTestNotifier(true) + defer urlNotifier.Stop(true) + totalDropped := atomic.Int32{} + totalReceived := atomic.Int32{} + s.handler = func(r *http.Request) { + decodedEvent, err := ReceiveWebhookEvent(r, authProvider) + require.NoError(t, err) + totalReceived.Inc() + totalDropped.Add(decodedEvent.NumDropped) + } + // send multiple notifications + for i := 0; i < 10; i++ { + _ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomStarted}) + _ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventParticipantJoined}) + _ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomFinished}) + } + + time.Sleep(webhookCheckInterval) - time.Sleep(webhookCheckInterval) + require.Equal(t, int32(30), totalDropped.Load()+totalReceived.Load()) + // at least one request dropped + require.Less(t, int32(0), totalDropped.Load()) + }) - require.Equal(t, int32(30), totalDropped.Load()+totalReceived.Load()) - // at least one request dropped - require.Less(t, int32(0), totalDropped.Load()) + t.Run("DropWhenFull = false", func(t *testing.T) { + urlNotifier := newTestNotifier(false) + defer urlNotifier.Stop(true) + totalDropped := atomic.Int32{} + totalReceived := atomic.Int32{} + s.handler = func(r *http.Request) { + decodedEvent, err := ReceiveWebhookEvent(r, authProvider) + require.NoError(t, err) + totalReceived.Inc() + totalDropped.Add(decodedEvent.NumDropped) + } + // send multiple notifications + for i := 0; i < 10; i++ { + _ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomStarted}) + _ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventParticipantJoined}) + _ = urlNotifier.QueueNotify(&livekit.WebhookEvent{Event: EventRoomFinished}) + } + + time.Sleep(webhookCheckInterval) + + require.Equal(t, int32(30), totalDropped.Load()+totalReceived.Load()) + // at least one request dropped + require.Equal(t, int32(0), totalDropped.Load()) + }) } func TestURLNotifierLifecycle(t *testing.T) { @@ -109,12 +136,12 @@ func TestURLNotifierLifecycle(t *testing.T) { defer s.Stop() t.Run("start/stop without use", func(t *testing.T) { - urlNotifier := newTestNotifier() + urlNotifier := newTestNotifier(true) urlNotifier.Stop(false) }) t.Run("stop allowing to drain", func(t *testing.T) { - urlNotifier := newTestNotifier() + urlNotifier := newTestNotifier(true) numCalled := atomic.Int32{} s.handler = func(r *http.Request) { numCalled.Inc() @@ -128,7 +155,7 @@ func TestURLNotifierLifecycle(t *testing.T) { }) t.Run("force stop", func(t *testing.T) { - urlNotifier := newTestNotifier() + urlNotifier := newTestNotifier(true) numCalled := atomic.Int32{} s.handler = func(r *http.Request) { numCalled.Inc() @@ -143,12 +170,13 @@ func TestURLNotifierLifecycle(t *testing.T) { }) } -func newTestNotifier() *URLNotifier { +func newTestNotifier(dropWhenFull bool) *URLNotifier { return NewURLNotifier(URLNotifierParams{ - QueueSize: 20, - URL: testUrl, - APIKey: apiKey, - APISecret: apiSecret, + QueueSize: 20, + URL: testUrl, + APIKey: apiKey, + APISecret: apiSecret, + DropWhenFull: dropWhenFull, }) }