Skip to content

Commit

Permalink
Make webhook more configurable
Browse files Browse the repository at this point in the history
    * Add a new constructor to have more control over the URLNotifier
    * Add 'DropWhenFull' as param in 'URLNotifierParams'
  • Loading branch information
itzloop committed May 29, 2024
1 parent c721a3e commit 23ef4a4
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 7 deletions.
15 changes: 14 additions & 1 deletion webhook/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ package webhook

import (
"context"
"github.com/livekit/protocol/logger"
"sync"

"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
)

type QueuedNotifier interface {
Expand All @@ -44,6 +44,19 @@ func NewDefaultNotifier(apiKey, apiSecret string, urls []string) QueuedNotifier
return n
}

func NewDefaultNotifierByParams(params []URLNotifierParams) QueuedNotifier {
n := &DefaultNotifier{}
for _, p := range params {
if p.Logger == nil {
p.Logger = logger.GetLogger().WithComponent("webhook")
}

u := NewURLNotifier(p)
n.urlNotifiers = append(n.urlNotifiers, u)
}
return n
}

func (n *DefaultNotifier) Stop(force bool) {
wg := sync.WaitGroup{}
for _, u := range n.urlNotifiers {
Expand Down
17 changes: 11 additions & 6 deletions webhook/url_notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ import (
)

type URLNotifierParams struct {
Logger logger.Logger
QueueSize int
URL string
APIKey string
APISecret string
Logger logger.Logger
QueueSize int
DropWhenFull bool
URL string
APIKey string
APISecret string
}

const defaultQueueSize = 100
Expand Down Expand Up @@ -66,7 +67,7 @@ func NewURLNotifier(params URLNotifierParams) *URLNotifier {
n.client.Logger = &logAdapter{}
n.worker = core.NewQueueWorker(core.QueueWorkerParams{
QueueSize: params.QueueSize,
DropWhenFull: true,
DropWhenFull: params.DropWhenFull,
OnDropped: func() { n.dropped.Inc() },
})
return n
Expand Down Expand Up @@ -102,6 +103,10 @@ func (n *URLNotifier) Stop(force bool) {
func (n *URLNotifier) send(event *livekit.WebhookEvent) error {
// set dropped count
event.NumDropped = n.dropped.Swap(0)

// set the when event is dequeued and being sent
event.DequeuedAt = time.Now().Unix()

encoded, err := protojson.Marshal(event)
if err != nil {
return err
Expand Down

0 comments on commit 23ef4a4

Please sign in to comment.