From 282dfd8833525da2578c7f9dbc85511d0e31538e Mon Sep 17 00:00:00 2001 From: Mahdi <80265960+Mahdi-zarei@users.noreply.github.com> Date: Wed, 12 Jun 2024 13:32:26 +0330 Subject: [PATCH] =?UTF-8?q?feat:=20allow=20for=20custom=20URLNotifier=20an?= =?UTF-8?q?d=20add=20batched=20events=20protobuf=20de=E2=80=A6=20(#3)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: allow for custom URLNotifier and add batched events protobuf definition * feat: Implement batch sender --- infra/link.pb.go | 4 +- infra/link_grpc.pb.go | 2 +- livekit/livekit_agent.pb.go | 4 +- livekit/livekit_analytics.pb.go | 4 +- livekit/livekit_analytics_grpc.pb.go | 2 +- livekit/livekit_egress.pb.go | 4 +- livekit/livekit_ingress.pb.go | 4 +- livekit/livekit_internal.pb.go | 4 +- livekit/livekit_models.pb.go | 4 +- livekit/livekit_room.pb.go | 4 +- livekit/livekit_rtc.pb.go | 4 +- livekit/livekit_sip.pb.go | 4 +- livekit/livekit_webhook.pb.go | 141 +++++++++++++++++++----- protobufs/livekit_webhook.proto | 8 ++ rpc/agent.pb.go | 4 +- rpc/egress.pb.go | 4 +- rpc/ingress.pb.go | 4 +- rpc/io.pb.go | 4 +- rpc/keepalive.pb.go | 4 +- rpc/participant.pb.go | 4 +- rpc/room.pb.go | 4 +- rpc/signal.pb.go | 4 +- rpc/sip.pb.go | 4 +- webhook/batch_url_notifier.go | 155 +++++++++++++++++++++++++++ webhook/notifier.go | 23 +++- webhook/url_notifier.go | 22 ++-- webhook/verifier.go | 18 ++++ webhook/webhook_test.go | 45 +++++++- 28 files changed, 410 insertions(+), 82 deletions(-) create mode 100644 webhook/batch_url_notifier.go diff --git a/infra/link.pb.go b/infra/link.pb.go index e4989ff84..eadf33e55 100644 --- a/infra/link.pb.go +++ b/infra/link.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v5.26.1 // source: infra/link.proto package infra diff --git a/infra/link_grpc.pb.go b/infra/link_grpc.pb.go index a7d8284ed..ade868166 100644 --- a/infra/link_grpc.pb.go +++ b/infra/link_grpc.pb.go @@ -15,7 +15,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.23.4 +// - protoc v5.26.1 // source: infra/link.proto package infra diff --git a/livekit/livekit_agent.pb.go b/livekit/livekit_agent.pb.go index 2f5559cc6..25260173d 100644 --- a/livekit/livekit_agent.pb.go +++ b/livekit/livekit_agent.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v5.26.1 // source: livekit_agent.proto package livekit diff --git a/livekit/livekit_analytics.pb.go b/livekit/livekit_analytics.pb.go index a6f1dad27..444251a9d 100644 --- a/livekit/livekit_analytics.pb.go +++ b/livekit/livekit_analytics.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v5.26.1 // source: livekit_analytics.proto package livekit diff --git a/livekit/livekit_analytics_grpc.pb.go b/livekit/livekit_analytics_grpc.pb.go index d6e880c49..9c4aaeec4 100644 --- a/livekit/livekit_analytics_grpc.pb.go +++ b/livekit/livekit_analytics_grpc.pb.go @@ -15,7 +15,7 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. // versions: // - protoc-gen-go-grpc v1.3.0 -// - protoc v4.23.4 +// - protoc v5.26.1 // source: livekit_analytics.proto package livekit diff --git a/livekit/livekit_egress.pb.go b/livekit/livekit_egress.pb.go index b2940a804..5f0cac47f 100644 --- a/livekit/livekit_egress.pb.go +++ b/livekit/livekit_egress.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v5.26.1 // source: livekit_egress.proto package livekit diff --git a/livekit/livekit_ingress.pb.go b/livekit/livekit_ingress.pb.go index 622e2e442..713bd4946 100644 --- a/livekit/livekit_ingress.pb.go +++ b/livekit/livekit_ingress.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v5.26.1 // source: livekit_ingress.proto package livekit diff --git a/livekit/livekit_internal.pb.go b/livekit/livekit_internal.pb.go index a9c5e02e6..ccd1ea5c7 100644 --- a/livekit/livekit_internal.pb.go +++ b/livekit/livekit_internal.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v5.26.1 // source: livekit_internal.proto package livekit diff --git a/livekit/livekit_models.pb.go b/livekit/livekit_models.pb.go index ebc117153..6dd4634a9 100644 --- a/livekit/livekit_models.pb.go +++ b/livekit/livekit_models.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v5.26.1 // source: livekit_models.proto package livekit diff --git a/livekit/livekit_room.pb.go b/livekit/livekit_room.pb.go index 2102b2659..3cb7260b1 100644 --- a/livekit/livekit_room.pb.go +++ b/livekit/livekit_room.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v5.26.1 // source: livekit_room.proto package livekit diff --git a/livekit/livekit_rtc.pb.go b/livekit/livekit_rtc.pb.go index 0bd008bb6..8afc19db6 100644 --- a/livekit/livekit_rtc.pb.go +++ b/livekit/livekit_rtc.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v5.26.1 // source: livekit_rtc.proto package livekit diff --git a/livekit/livekit_sip.pb.go b/livekit/livekit_sip.pb.go index 524d09d8b..374ec3e7a 100644 --- a/livekit/livekit_sip.pb.go +++ b/livekit/livekit_sip.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v5.26.1 // source: livekit_sip.proto package livekit diff --git a/livekit/livekit_webhook.pb.go b/livekit/livekit_webhook.pb.go index 340277857..96a678fa0 100644 --- a/livekit/livekit_webhook.pb.go +++ b/livekit/livekit_webhook.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v5.26.1 // source: livekit_webhook.proto package livekit @@ -56,7 +56,7 @@ type WebhookEvent struct { Id string `protobuf:"bytes,6,opt,name=id,proto3" json:"id,omitempty"` // timestamp in seconds CreatedAt int64 `protobuf:"varint,7,opt,name=created_at,json=createdAt,proto3" json:"created_at,omitempty"` - DequeuedAt int64 `protobuf:"varint,13,opt,name=dequeued_at,json=dequeuedAt,proto3" json:"dequeued_at,omitempty"` + DequeuedAt int64 `protobuf:"varint,12,opt,name=dequeued_at,json=dequeuedAt,proto3" json:"dequeued_at,omitempty"` NumDropped int32 `protobuf:"varint,11,opt,name=num_dropped,json=numDropped,proto3" json:"num_dropped,omitempty"` } @@ -162,6 +162,69 @@ func (x *WebhookEvent) GetNumDropped() int32 { return 0 } +type BatchedWebhookEvents struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Events []*WebhookEvent `protobuf:"bytes,1,rep,name=events,proto3" json:"events,omitempty"` + NumDropped int32 `protobuf:"varint,2,opt,name=num_dropped,json=numDropped,proto3" json:"num_dropped,omitempty"` + DequeuedAt int64 `protobuf:"varint,3,opt,name=dequeued_at,json=dequeuedAt,proto3" json:"dequeued_at,omitempty"` +} + +func (x *BatchedWebhookEvents) Reset() { + *x = BatchedWebhookEvents{} + if protoimpl.UnsafeEnabled { + mi := &file_livekit_webhook_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *BatchedWebhookEvents) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*BatchedWebhookEvents) ProtoMessage() {} + +func (x *BatchedWebhookEvents) ProtoReflect() protoreflect.Message { + mi := &file_livekit_webhook_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use BatchedWebhookEvents.ProtoReflect.Descriptor instead. +func (*BatchedWebhookEvents) Descriptor() ([]byte, []int) { + return file_livekit_webhook_proto_rawDescGZIP(), []int{1} +} + +func (x *BatchedWebhookEvents) GetEvents() []*WebhookEvent { + if x != nil { + return x.Events + } + return nil +} + +func (x *BatchedWebhookEvents) GetNumDropped() int32 { + if x != nil { + return x.NumDropped + } + return 0 +} + +func (x *BatchedWebhookEvents) GetDequeuedAt() int64 { + if x != nil { + return x.DequeuedAt + } + return 0 +} + var File_livekit_webhook_proto protoreflect.FileDescriptor var file_livekit_webhook_proto_rawDesc = []byte{ @@ -193,15 +256,23 @@ var file_livekit_webhook_proto_rawDesc = []byte{ 0x69, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x07, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, 0x41, 0x74, 0x12, 0x1f, 0x0a, 0x0b, 0x64, 0x65, 0x71, 0x75, 0x65, 0x75, 0x65, 0x64, 0x5f, 0x61, 0x74, - 0x18, 0x0d, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x64, 0x65, 0x71, 0x75, 0x65, 0x75, 0x65, 0x64, + 0x18, 0x0c, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x64, 0x65, 0x71, 0x75, 0x65, 0x75, 0x65, 0x64, 0x41, 0x74, 0x12, 0x1f, 0x0a, 0x0b, 0x6e, 0x75, 0x6d, 0x5f, 0x64, 0x72, 0x6f, 0x70, 0x70, 0x65, 0x64, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0a, 0x6e, 0x75, 0x6d, 0x44, 0x72, 0x6f, 0x70, - 0x70, 0x65, 0x64, 0x42, 0x46, 0x5a, 0x23, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, - 0x6d, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, - 0x6f, 0x6c, 0x2f, 0x6c, 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0xaa, 0x02, 0x0d, 0x4c, 0x69, 0x76, - 0x65, 0x4b, 0x69, 0x74, 0x2e, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0xea, 0x02, 0x0e, 0x4c, 0x69, 0x76, - 0x65, 0x4b, 0x69, 0x74, 0x3a, 0x3a, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x33, + 0x70, 0x65, 0x64, 0x22, 0x87, 0x01, 0x0a, 0x14, 0x42, 0x61, 0x74, 0x63, 0x68, 0x65, 0x64, 0x57, + 0x65, 0x62, 0x68, 0x6f, 0x6f, 0x6b, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x12, 0x2d, 0x0a, 0x06, + 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x15, 0x2e, 0x6c, + 0x69, 0x76, 0x65, 0x6b, 0x69, 0x74, 0x2e, 0x57, 0x65, 0x62, 0x68, 0x6f, 0x6f, 0x6b, 0x45, 0x76, + 0x65, 0x6e, 0x74, 0x52, 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x6e, + 0x75, 0x6d, 0x5f, 0x64, 0x72, 0x6f, 0x70, 0x70, 0x65, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x05, + 0x52, 0x0a, 0x6e, 0x75, 0x6d, 0x44, 0x72, 0x6f, 0x70, 0x70, 0x65, 0x64, 0x12, 0x1f, 0x0a, 0x0b, + 0x64, 0x65, 0x71, 0x75, 0x65, 0x75, 0x65, 0x64, 0x5f, 0x61, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, + 0x03, 0x52, 0x0a, 0x64, 0x65, 0x71, 0x75, 0x65, 0x75, 0x65, 0x64, 0x41, 0x74, 0x42, 0x46, 0x5a, + 0x23, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6c, 0x69, 0x76, 0x65, + 0x6b, 0x69, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x2f, 0x6c, 0x69, 0x76, + 0x65, 0x6b, 0x69, 0x74, 0xaa, 0x02, 0x0d, 0x4c, 0x69, 0x76, 0x65, 0x4b, 0x69, 0x74, 0x2e, 0x50, + 0x72, 0x6f, 0x74, 0x6f, 0xea, 0x02, 0x0e, 0x4c, 0x69, 0x76, 0x65, 0x4b, 0x69, 0x74, 0x3a, 0x3a, + 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -216,26 +287,28 @@ func file_livekit_webhook_proto_rawDescGZIP() []byte { return file_livekit_webhook_proto_rawDescData } -var file_livekit_webhook_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_livekit_webhook_proto_msgTypes = make([]protoimpl.MessageInfo, 2) var file_livekit_webhook_proto_goTypes = []interface{}{ - (*WebhookEvent)(nil), // 0: livekit.WebhookEvent - (*Room)(nil), // 1: livekit.Room - (*ParticipantInfo)(nil), // 2: livekit.ParticipantInfo - (*EgressInfo)(nil), // 3: livekit.EgressInfo - (*IngressInfo)(nil), // 4: livekit.IngressInfo - (*TrackInfo)(nil), // 5: livekit.TrackInfo + (*WebhookEvent)(nil), // 0: livekit.WebhookEvent + (*BatchedWebhookEvents)(nil), // 1: livekit.BatchedWebhookEvents + (*Room)(nil), // 2: livekit.Room + (*ParticipantInfo)(nil), // 3: livekit.ParticipantInfo + (*EgressInfo)(nil), // 4: livekit.EgressInfo + (*IngressInfo)(nil), // 5: livekit.IngressInfo + (*TrackInfo)(nil), // 6: livekit.TrackInfo } var file_livekit_webhook_proto_depIdxs = []int32{ - 1, // 0: livekit.WebhookEvent.room:type_name -> livekit.Room - 2, // 1: livekit.WebhookEvent.participant:type_name -> livekit.ParticipantInfo - 3, // 2: livekit.WebhookEvent.egress_info:type_name -> livekit.EgressInfo - 4, // 3: livekit.WebhookEvent.ingress_info:type_name -> livekit.IngressInfo - 5, // 4: livekit.WebhookEvent.track:type_name -> livekit.TrackInfo - 5, // [5:5] is the sub-list for method output_type - 5, // [5:5] is the sub-list for method input_type - 5, // [5:5] is the sub-list for extension type_name - 5, // [5:5] is the sub-list for extension extendee - 0, // [0:5] is the sub-list for field type_name + 2, // 0: livekit.WebhookEvent.room:type_name -> livekit.Room + 3, // 1: livekit.WebhookEvent.participant:type_name -> livekit.ParticipantInfo + 4, // 2: livekit.WebhookEvent.egress_info:type_name -> livekit.EgressInfo + 5, // 3: livekit.WebhookEvent.ingress_info:type_name -> livekit.IngressInfo + 6, // 4: livekit.WebhookEvent.track:type_name -> livekit.TrackInfo + 0, // 5: livekit.BatchedWebhookEvents.events:type_name -> livekit.WebhookEvent + 6, // [6:6] is the sub-list for method output_type + 6, // [6:6] is the sub-list for method input_type + 6, // [6:6] is the sub-list for extension type_name + 6, // [6:6] is the sub-list for extension extendee + 0, // [0:6] is the sub-list for field type_name } func init() { file_livekit_webhook_proto_init() } @@ -259,6 +332,18 @@ func file_livekit_webhook_proto_init() { return nil } } + file_livekit_webhook_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*BatchedWebhookEvents); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -266,7 +351,7 @@ func file_livekit_webhook_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_livekit_webhook_proto_rawDesc, NumEnums: 0, - NumMessages: 1, + NumMessages: 2, NumExtensions: 0, NumServices: 0, }, diff --git a/protobufs/livekit_webhook.proto b/protobufs/livekit_webhook.proto index 3e47c468a..473d6a9dc 100644 --- a/protobufs/livekit_webhook.proto +++ b/protobufs/livekit_webhook.proto @@ -55,3 +55,11 @@ message WebhookEvent { // NEXT_ID: 13 } + +message BatchedWebhookEvents { + repeated WebhookEvent events = 1; + + int32 num_dropped = 2; + + int64 dequeued_at = 3; +} diff --git a/rpc/agent.pb.go b/rpc/agent.pb.go index 9367e2564..d68c6a032 100644 --- a/rpc/agent.pb.go +++ b/rpc/agent.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v5.26.1 // source: rpc/agent.proto package rpc diff --git a/rpc/egress.pb.go b/rpc/egress.pb.go index 7e14c9859..b41787807 100644 --- a/rpc/egress.pb.go +++ b/rpc/egress.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v5.26.1 // source: rpc/egress.proto package rpc diff --git a/rpc/ingress.pb.go b/rpc/ingress.pb.go index 65b663815..d7a26ec45 100644 --- a/rpc/ingress.pb.go +++ b/rpc/ingress.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v5.26.1 // source: rpc/ingress.proto package rpc diff --git a/rpc/io.pb.go b/rpc/io.pb.go index 568ef68e9..dc2a2d709 100644 --- a/rpc/io.pb.go +++ b/rpc/io.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v5.26.1 // source: rpc/io.proto package rpc diff --git a/rpc/keepalive.pb.go b/rpc/keepalive.pb.go index 62a08afa9..57a10cc07 100644 --- a/rpc/keepalive.pb.go +++ b/rpc/keepalive.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v5.26.1 // source: rpc/keepalive.proto package rpc diff --git a/rpc/participant.pb.go b/rpc/participant.pb.go index efb56ae1a..7548f72c0 100644 --- a/rpc/participant.pb.go +++ b/rpc/participant.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v5.26.1 // source: rpc/participant.proto package rpc diff --git a/rpc/room.pb.go b/rpc/room.pb.go index 6657f7813..1c0bdcc4e 100644 --- a/rpc/room.pb.go +++ b/rpc/room.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v5.26.1 // source: rpc/room.proto package rpc diff --git a/rpc/signal.pb.go b/rpc/signal.pb.go index b71170980..8f4e83c91 100644 --- a/rpc/signal.pb.go +++ b/rpc/signal.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v5.26.1 // source: rpc/signal.proto package rpc diff --git a/rpc/sip.pb.go b/rpc/sip.pb.go index 78585e74a..bf8f9d243 100644 --- a/rpc/sip.pb.go +++ b/rpc/sip.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.1 -// protoc v4.23.4 +// protoc-gen-go v1.31.0 +// protoc v5.26.1 // source: rpc/sip.proto package rpc diff --git a/webhook/batch_url_notifier.go b/webhook/batch_url_notifier.go new file mode 100644 index 000000000..5c1f593c5 --- /dev/null +++ b/webhook/batch_url_notifier.go @@ -0,0 +1,155 @@ +package webhook + +import ( + "bytes" + "context" + "crypto/sha256" + "encoding/base64" + "github.com/hashicorp/go-retryablehttp" + "github.com/livekit/protocol/auth" + "github.com/livekit/protocol/livekit" + "github.com/livekit/protocol/logger" + "google.golang.org/protobuf/encoding/protojson" + "sync" + "time" +) + +const ( + DefaultBatchSendInterval = 100 * time.Millisecond + DefaultMaxBatchSize = 10000 +) + +type BatchURLNotifierParams struct { + Logger logger.Logger + URL string + Interval time.Duration + MaxSize int + APIKey string + APISecret string +} + +type BatchURLNotifier struct { + cancelFunc context.CancelFunc + client *retryablehttp.Client + mu sync.RWMutex + params BatchURLNotifierParams + batch []*livekit.WebhookEvent + dropped int // it's operated inside a mutex scope so no need for atomic type +} + +func NewBatchURLNotifier(ctx context.Context, params BatchURLNotifierParams) URLNotifier { + if params.Interval == 0 { + params.Interval = DefaultBatchSendInterval + } + if params.MaxSize == 0 { + params.MaxSize = DefaultMaxBatchSize + } + + ctx, cancel := context.WithCancel(ctx) + notifier := &BatchURLNotifier{ + cancelFunc: cancel, + params: params, + client: retryablehttp.NewClient(), + } + + go notifier.runner(ctx) + + return notifier +} + +func (b *BatchURLNotifier) runner(ctx context.Context) { + ticker := time.NewTicker(b.params.Interval) + for { + select { + case <-ticker.C: + b.mu.Lock() + b.sendBatch() + b.mu.Unlock() + case <-ctx.Done(): + return + } + } +} + +func (b *BatchURLNotifier) sendBatch() { + if len(b.batch) == 0 { + return + } + raw := &livekit.BatchedWebhookEvents{ + Events: b.batch, + NumDropped: int32(b.dropped), + DequeuedAt: time.Now().Unix(), + } + defer func() { + b.batch = nil + }() + b.dropped = 0 + + encoded, err := protojson.Marshal(raw) + if err != nil { + b.params.Logger.Warnw("Failed to marshal event", err) + b.dropped += len(b.batch) + return + } + + // sign payload + sum := sha256.Sum256(encoded) + b64 := base64.StdEncoding.EncodeToString(sum[:]) + at := auth.NewAccessToken(b.params.APIKey, b.params.APISecret). + SetValidFor(5 * time.Minute). + SetSha256(b64) + token, err := at.ToJWT() + if err != nil { + b.params.Logger.Warnw("Failed to generate jwt token", err) + b.dropped += len(b.batch) + return + } + + req, err := retryablehttp.NewRequest("POST", b.params.URL, bytes.NewReader(encoded)) + if err != nil { + b.params.Logger.Warnw("Failed to create http req", err) + b.dropped += len(b.batch) + return + } + + req.Header.Set(authHeader, token) + req.Header.Set("batched", "true") + req.Header.Set("content-type", "application/webhook+json") + resp, err := b.client.Do(req) + if err != nil { + b.params.Logger.Errorw("Failed to send request", err) + b.dropped += len(b.batch) + return + } + _ = resp.Body.Close() + + return +} + +func (b *BatchURLNotifier) SetKeys(apiKey, apiSecret string) { + b.mu.Lock() + defer b.mu.Unlock() + b.params.APIKey = apiKey + b.params.APISecret = apiSecret +} + +func (b *BatchURLNotifier) QueueNotify(event *livekit.WebhookEvent) error { + b.mu.Lock() + defer b.mu.Unlock() + b.batch = append(b.batch, event) + + if len(b.batch) >= b.params.MaxSize { + b.sendBatch() + } + + return nil +} + +func (b *BatchURLNotifier) Stop(force bool) { + b.cancelFunc() + if !force { + b.mu.Lock() + b.sendBatch() + b.mu.Unlock() + } +} diff --git a/webhook/notifier.go b/webhook/notifier.go index 719ad7b0f..e8fa58949 100644 --- a/webhook/notifier.go +++ b/webhook/notifier.go @@ -24,16 +24,31 @@ import ( type QueuedNotifier interface { QueueNotify(ctx context.Context, event *livekit.WebhookEvent) error + Stop(force bool) } type DefaultNotifier struct { - urlNotifiers []*URLNotifier + urlNotifiers []URLNotifier +} + +func NewBatchedNotifier(ctx context.Context, apiKey, apiSecret string, urls []string) QueuedNotifier { + n := &DefaultNotifier{} + for _, url := range urls { + u := NewBatchURLNotifier(ctx, BatchURLNotifierParams{ + Logger: logger.GetLogger().WithComponent("webhook"), + URL: url, + APIKey: apiKey, + APISecret: apiSecret, + }) + n.urlNotifiers = append(n.urlNotifiers, u) + } + return n } func NewDefaultNotifier(apiKey, apiSecret string, urls []string) QueuedNotifier { n := &DefaultNotifier{} for _, url := range urls { - u := NewURLNotifier(URLNotifierParams{ + u := NewDefaultURLNotifier(URLNotifierParams{ URL: url, Logger: logger.GetLogger().WithComponent("webhook"), APIKey: apiKey, @@ -52,7 +67,7 @@ func NewDefaultNotifierByParams(params []URLNotifierParams) QueuedNotifier { p.Logger = logger.GetLogger().WithComponent("webhook") } - u := NewURLNotifier(p) + u := NewDefaultURLNotifier(p) n.urlNotifiers = append(n.urlNotifiers, u) } return n @@ -62,7 +77,7 @@ func (n *DefaultNotifier) Stop(force bool) { wg := sync.WaitGroup{} for _, u := range n.urlNotifiers { wg.Add(1) - go func(u *URLNotifier) { + go func(u URLNotifier) { defer wg.Done() u.Stop(force) }(u) diff --git a/webhook/url_notifier.go b/webhook/url_notifier.go index 0d9a325c4..b600b706b 100644 --- a/webhook/url_notifier.go +++ b/webhook/url_notifier.go @@ -31,6 +31,12 @@ import ( "github.com/livekit/protocol/logger" ) +type URLNotifier interface { + SetKeys(apiKey, apiSecret string) + QueueNotify(event *livekit.WebhookEvent) error + Stop(force bool) +} + type URLNotifierParams struct { Logger logger.Logger QueueSize int @@ -42,9 +48,9 @@ type URLNotifierParams struct { const defaultQueueSize = 100 -// URLNotifier is a QueuedNotifier that sends a POST request to a Webhook URL. +// DefaultURLNotifier is a QueuedNotifier that sends a POST request to a Webhook URL. // It will retry on failure, and will drop events if notification fall too far behind -type URLNotifier struct { +type DefaultURLNotifier struct { mu sync.RWMutex params URLNotifierParams client *retryablehttp.Client @@ -52,7 +58,7 @@ type URLNotifier struct { worker core.QueueWorker } -func NewURLNotifier(params URLNotifierParams) *URLNotifier { +func NewDefaultURLNotifier(params URLNotifierParams) URLNotifier { if params.QueueSize == 0 { params.QueueSize = defaultQueueSize } @@ -60,7 +66,7 @@ func NewURLNotifier(params URLNotifierParams) *URLNotifier { params.Logger = logger.GetLogger() } - n := &URLNotifier{ + n := &DefaultURLNotifier{ params: params, client: retryablehttp.NewClient(), } @@ -73,14 +79,14 @@ func NewURLNotifier(params URLNotifierParams) *URLNotifier { return n } -func (n *URLNotifier) SetKeys(apiKey, apiSecret string) { +func (n *DefaultURLNotifier) SetKeys(apiKey, apiSecret string) { n.mu.Lock() defer n.mu.Unlock() n.params.APIKey = apiKey n.params.APISecret = apiSecret } -func (n *URLNotifier) QueueNotify(event *livekit.WebhookEvent) error { +func (n *DefaultURLNotifier) QueueNotify(event *livekit.WebhookEvent) error { n.worker.Submit(func() { if err := n.send(event); err != nil { n.params.Logger.Warnw("failed to send webhook", err, "url", n.params.URL, "event", event.Event) @@ -92,7 +98,7 @@ func (n *URLNotifier) QueueNotify(event *livekit.WebhookEvent) error { return nil } -func (n *URLNotifier) Stop(force bool) { +func (n *DefaultURLNotifier) Stop(force bool) { if force { n.worker.Kill() } else { @@ -100,7 +106,7 @@ func (n *URLNotifier) Stop(force bool) { } } -func (n *URLNotifier) send(event *livekit.WebhookEvent) error { +func (n *DefaultURLNotifier) send(event *livekit.WebhookEvent) error { // set dropped count event.NumDropped = n.dropped.Swap(0) event.DequeuedAt = time.Now().Unix() diff --git a/webhook/verifier.go b/webhook/verifier.go index 32964b8ec..f0b193e9c 100644 --- a/webhook/verifier.go +++ b/webhook/verifier.go @@ -82,3 +82,21 @@ func ReceiveWebhookEvent(r *http.Request, provider auth.KeyProvider) (*livekit.W } return &event, nil } + +func ReceiveWebhookEventBatched(r *http.Request, provider auth.KeyProvider) ([]*livekit.WebhookEvent, error) { + data, err := Receive(r, provider) + if err != nil { + return nil, err + } + unmarshalOpts := protojson.UnmarshalOptions{ + DiscardUnknown: true, + AllowPartial: true, + } + + events := livekit.BatchedWebhookEvents{} + if err = unmarshalOpts.Unmarshal(data, &events); err != nil { + return nil, err + } + + return events.Events, nil +} diff --git a/webhook/webhook_test.go b/webhook/webhook_test.go index c12a31adf..02cd87636 100644 --- a/webhook/webhook_test.go +++ b/webhook/webhook_test.go @@ -47,6 +47,7 @@ func TestWebHook(t *testing.T) { defer s.Stop() notifier := NewDefaultNotifier(apiKey, apiSecret, []string{testUrl}) + defer notifier.Stop(true) t.Run("test event payload", func(t *testing.T) { event := &livekit.WebhookEvent{ @@ -74,6 +75,46 @@ func TestWebHook(t *testing.T) { } +func TestBatchWebHook(t *testing.T) { + s := newServer(testAddr) + require.NoError(t, s.Start()) + defer s.Stop() + + notifier := NewBatchedNotifier(context.Background(), apiKey, apiSecret, []string{testUrl}) + defer notifier.Stop(true) + + t.Run("test events payload", func(t *testing.T) { + event := &livekit.WebhookEvent{ + Event: EventTrackPublished, + Participant: &livekit.ParticipantInfo{ + Identity: "test", + }, + Track: &livekit.TrackInfo{ + Sid: "TR_abcde", + }, + } + + wg := sync.WaitGroup{} + wg.Add(1) + s.handler = func(r *http.Request) { + defer wg.Done() + decodedEvent, err := ReceiveWebhookEventBatched(r, authProvider) + require.NoError(t, err) + + require.Equal(t, 3, len(decodedEvent)) + for _, ev := range decodedEvent { + require.EqualValues(t, event, ev) + } + } + // send 3 times + require.NoError(t, notifier.QueueNotify(context.Background(), event)) + require.NoError(t, notifier.QueueNotify(context.Background(), event)) + require.NoError(t, notifier.QueueNotify(context.Background(), event)) + wg.Wait() + }) + +} + func TestURLNotifierDropped(t *testing.T) { s := newServer(testAddr) require.NoError(t, s.Start()) @@ -170,8 +211,8 @@ func TestURLNotifierLifecycle(t *testing.T) { }) } -func newTestNotifier(dropWhenFull bool) *URLNotifier { - return NewURLNotifier(URLNotifierParams{ +func newTestNotifier(dropWhenFull bool) URLNotifier { + return NewDefaultURLNotifier(URLNotifierParams{ QueueSize: 20, URL: testUrl, APIKey: apiKey,