Skip to content

Commit

Permalink
feat: Pre implement batch url notifier
Browse files Browse the repository at this point in the history
  • Loading branch information
Mahdi-zarei committed Jun 12, 2024
1 parent 1392069 commit a77130f
Showing 1 changed file with 112 additions and 0 deletions.
112 changes: 112 additions & 0 deletions webhook/batch_url_notifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package webhook

import (
"bytes"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"net/http"
"sync"
"time"
)

type BatchURLNotifierParams struct {
Logger logger.Logger
URL string
Interval time.Duration
MaxSize int
APIKey string
APISecret string
}

type BatchURLNotifier struct {
client *http.Client
mu sync.RWMutex
params BatchURLNotifierParams
batch []*livekit.WebhookEvent
dropped int32 // it's operated in a mutex so no need for atomic type
done chan struct{}
}

func NewBatchURLNotifier(params BatchURLNotifierParams) URLNotifier {
if params.Interval == 0 {
params.Interval = 100 * time.Millisecond
}

if params.MaxSize == 0 {
params.MaxSize = 10000
}

notifier := &BatchURLNotifier{
params: params,
done: make(chan struct{}),
client: http.DefaultClient,
}
notifier.client.Timeout = 10 * time.Second

return notifier
}

func (b *BatchURLNotifier) runner() {
ticker := time.NewTicker(b.params.Interval)
for {
select {
case <-ticker.C:
b.mu.Lock()
b.sendBatch()
b.mu.Unlock()
case <-b.done:
return
}
}
}

func (b *BatchURLNotifier) sendBatch() { // TODO complete when protobuf generated
// need the protobuf... we work with dummies for now

data := []byte{}
// set batch dequeue time

req, err := http.NewRequest("POST", b.params.URL, bytes.NewReader(data))
if err != nil {
b.dropped += 1 // Add the batch size...
return
}

req.Header.Set(authHeader, "smth")
req.Header.Set("content-type", "application/webhook+json")
resp, err := b.client.Do(req)
if err != nil {
b.dropped += 1 // add the batch size...
return
}
_ = resp.Body.Close()
return
}

func (b *BatchURLNotifier) SetKeys(apiKey, apiSecret string) {
b.mu.Lock()
defer b.mu.Unlock()
b.params.APIKey = apiKey
b.params.APISecret = apiSecret
}

func (b *BatchURLNotifier) QueueNotify(event *livekit.WebhookEvent) error {
b.mu.Lock()
defer b.mu.Unlock()
b.batch = append(b.batch, event)

if len(b.batch) >= b.params.MaxSize {
b.sendBatch()
}

return nil
}

func (b *BatchURLNotifier) Stop(force bool) {
close(b.done)
if !force {
b.mu.Lock()
b.sendBatch()
b.mu.Unlock()
}
}

0 comments on commit a77130f

Please sign in to comment.