Skip to content

Commit

Permalink
merge
Browse files Browse the repository at this point in the history
  • Loading branch information
boks1971 committed Nov 23, 2024
1 parent ebc0b8f commit 49bebdc
Show file tree
Hide file tree
Showing 2 changed files with 226 additions and 1 deletion.
82 changes: 82 additions & 0 deletions utils/metrics_batch_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,74 @@ func (m *MetricsBatchBuilder) AddEventMetric(em EventMetric) error {
return nil
}

func (m *MetricsBatchBuilder) Merge(other *livekit.MetricsBatch) {
// Timestamp and NormalizedTimestamp are not merged

// add from other's StrData as needed
for _, str := range other.StrData {
m.getStrDataIndex(str)
}

for _, optsm := range other.TimeSeries {
ptsm := &livekit.TimeSeriesMetric{
Samples: optsm.Samples,
}
if optsm.Label < uint32(int(livekit.MetricLabel_METRIC_LABEL_PREDEFINED_MAX_VALUE)) {
ptsm.Label = optsm.Label
} else {
if tidx, ok := m.translateStrDataIndex(other.StrData, optsm.Label); ok {
ptsm.Label = tidx
}
}

if tidx, ok := m.translateStrDataIndex(other.StrData, optsm.ParticipantIdentity); ok {
ptsm.ParticipantIdentity = tidx
}

if tidx, ok := m.translateStrDataIndex(other.StrData, optsm.TrackSid); ok {
ptsm.TrackSid = tidx
}

if tidx, ok := m.translateStrDataIndex(other.StrData, optsm.Rid); ok {
ptsm.Rid = tidx
}

m.MetricsBatch.TimeSeries = append(m.MetricsBatch.TimeSeries, ptsm)
}

for _, opem := range other.Events {
pem := &livekit.EventMetric{}
if opem.Label < uint32(int(livekit.MetricLabel_METRIC_LABEL_PREDEFINED_MAX_VALUE)) {
pem.Label = opem.Label
} else {
if tidx, ok := m.translateStrDataIndex(other.StrData, opem.Label); ok {
pem.Label = tidx
}
}

if tidx, ok := m.translateStrDataIndex(other.StrData, opem.ParticipantIdentity); ok {
pem.ParticipantIdentity = tidx
}

if tidx, ok := m.translateStrDataIndex(other.StrData, opem.TrackSid); ok {
pem.TrackSid = tidx
}

pem.StartTimestampMs = opem.StartTimestampMs
pem.EndTimestampMs = opem.EndTimestampMs
pem.NormalizedStartTimestamp = opem.NormalizedStartTimestamp
pem.NormalizedEndTimestamp = opem.NormalizedEndTimestamp

pem.Metadata = opem.Metadata

if tidx, ok := m.translateStrDataIndex(other.StrData, opem.Rid); ok {
pem.Rid = tidx
}

m.MetricsBatch.Events = append(m.MetricsBatch.Events, pem)
}
}

func (m *MetricsBatchBuilder) getStrDataIndex(s string) uint32 {
idx, ok := m.stringData[s]
if !ok {
Expand All @@ -184,3 +252,17 @@ func (m *MetricsBatchBuilder) getStrDataIndex(s string) uint32 {
}
return idx
}

func (m *MetricsBatchBuilder) translateStrDataIndex(strData []string, index uint32) (uint32, bool) {
if index < uint32(livekit.MetricLabel_METRIC_LABEL_PREDEFINED_MAX_VALUE) {
return 0, false
}

baseIdx := index - uint32(livekit.MetricLabel_METRIC_LABEL_PREDEFINED_MAX_VALUE)
if len(strData) <= int(baseIdx) {
return 0, false
}

translatedIdx, ok := m.stringData[strData[baseIdx]]
return translatedIdx, ok
}
145 changes: 144 additions & 1 deletion utils/metrics_batch_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package utils

import (
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -279,7 +280,7 @@ func TestMetricsBatchBuilder(t *testing.T) {
})
require.ErrorIs(t, err, ErrInvalidMetricLabel)

// add an eventmetric
// add an event metric
err = mbb.AddEventMetric(EventMetric{
MetricLabel: livekit.MetricLabel_PUBLISHER_RTT,
ParticipantIdentity: "PA_1",
Expand Down Expand Up @@ -322,4 +323,146 @@ func TestMetricsBatchBuilder(t *testing.T) {
mb := mbb.ToProto()
require.True(t, proto.Equal(expected, mb))
})

t.Run("merge", func(t *testing.T) {
at := mono.Now()
atMilli := at.UnixMilli()
normalizedAt := mono.Now().Add(10 * time.Millisecond)

expected := &livekit.MetricsBatch{
TimestampMs: at.UnixMilli(),
NormalizedTimestamp: timestamppb.New(normalizedAt),
StrData: []string{
"PA_1",
"TR_VC1",
"f",
"CustomMetric",
"TR_VC2",
"q",
"PA_2",
},
Events: []*livekit.EventMetric{
{
Label: uint32(livekit.MetricLabel_PUBLISHER_RTT),
ParticipantIdentity: uint32(livekit.MetricLabel_METRIC_LABEL_PREDEFINED_MAX_VALUE),
TrackSid: uint32(livekit.MetricLabel_METRIC_LABEL_PREDEFINED_MAX_VALUE) + 1,
StartTimestampMs: at.UnixMilli(),
EndTimestampMs: &atMilli,
NormalizedStartTimestamp: timestamppb.New(normalizedAt),
NormalizedEndTimestamp: timestamppb.New(normalizedAt),
Metadata: "md1",
Rid: uint32(livekit.MetricLabel_METRIC_LABEL_PREDEFINED_MAX_VALUE) + 2,
},
{
Label: uint32(livekit.MetricLabel_METRIC_LABEL_PREDEFINED_MAX_VALUE) + 3,
ParticipantIdentity: uint32(livekit.MetricLabel_METRIC_LABEL_PREDEFINED_MAX_VALUE),
TrackSid: uint32(livekit.MetricLabel_METRIC_LABEL_PREDEFINED_MAX_VALUE) + 4,
StartTimestampMs: at.UnixMilli(),
NormalizedStartTimestamp: timestamppb.New(normalizedAt),
Metadata: "md2",
Rid: uint32(livekit.MetricLabel_METRIC_LABEL_PREDEFINED_MAX_VALUE) + 5,
},
{
Label: uint32(livekit.MetricLabel_METRIC_LABEL_PREDEFINED_MAX_VALUE) + 3,
ParticipantIdentity: uint32(livekit.MetricLabel_METRIC_LABEL_PREDEFINED_MAX_VALUE),
TrackSid: uint32(livekit.MetricLabel_METRIC_LABEL_PREDEFINED_MAX_VALUE) + 4,
StartTimestampMs: at.UnixMilli(),
NormalizedStartTimestamp: timestamppb.New(normalizedAt),
Metadata: "md2",
Rid: uint32(livekit.MetricLabel_METRIC_LABEL_PREDEFINED_MAX_VALUE) + 5,
},
},
TimeSeries: []*livekit.TimeSeriesMetric{
{
Label: uint32(livekit.MetricLabel_SUBSCRIBER_RTT),
ParticipantIdentity: uint32(livekit.MetricLabel_METRIC_LABEL_PREDEFINED_MAX_VALUE) + 6,
Samples: []*livekit.MetricSample{
{
TimestampMs: at.UnixMilli(),
NormalizedTimestamp: timestamppb.New(normalizedAt),
Value: 102.4,
},
},
},
},
}

mbb := NewMetricsBatchBuilder()
mbb.SetTime(at, normalizedAt)

// should not be able to add invalid metric label index
err := mbb.AddEventMetric(EventMetric{
MetricLabel: livekit.MetricLabel_METRIC_LABEL_PREDEFINED_MAX_VALUE,
})
require.ErrorIs(t, err, ErrInvalidMetricLabel)

// add an event metric
err = mbb.AddEventMetric(EventMetric{
MetricLabel: livekit.MetricLabel_PUBLISHER_RTT,
ParticipantIdentity: "PA_1",
TrackID: "TR_VC1",
StartedAt: at,
EndedAt: at,
NormalizedStartedAt: normalizedAt,
NormalizedEndedAt: normalizedAt,
Metadata: "md1",
Rid: "f",
})
require.NoError(t, err)

// add a second one with some optional fields not included
// including this here and in the one to merge to test index translation
err = mbb.AddEventMetric(EventMetric{
CustomMetricLabel: "CustomMetric",
ParticipantIdentity: "PA_1",
TrackID: "TR_VC2",
StartedAt: at,
NormalizedStartedAt: normalizedAt,
Metadata: "md2",
Rid: "q",
})
require.NoError(t, err)

toMerge := &livekit.MetricsBatch{
TimestampMs: at.UnixMilli(),
NormalizedTimestamp: timestamppb.New(normalizedAt),
StrData: []string{
"CustomMetric",
"PA_1",
"TR_VC2",
"q",
"PA_2",
},
Events: []*livekit.EventMetric{
{
Label: uint32(livekit.MetricLabel_METRIC_LABEL_PREDEFINED_MAX_VALUE),
ParticipantIdentity: uint32(livekit.MetricLabel_METRIC_LABEL_PREDEFINED_MAX_VALUE) + 1,
TrackSid: uint32(livekit.MetricLabel_METRIC_LABEL_PREDEFINED_MAX_VALUE) + 2,
StartTimestampMs: at.UnixMilli(),
NormalizedStartTimestamp: timestamppb.New(normalizedAt),
Metadata: "md2",
Rid: uint32(livekit.MetricLabel_METRIC_LABEL_PREDEFINED_MAX_VALUE) + 3,
},
},
TimeSeries: []*livekit.TimeSeriesMetric{
{
Label: uint32(livekit.MetricLabel_SUBSCRIBER_RTT),
ParticipantIdentity: uint32(livekit.MetricLabel_METRIC_LABEL_PREDEFINED_MAX_VALUE) + 4,
Samples: []*livekit.MetricSample{
{
TimestampMs: at.UnixMilli(),
NormalizedTimestamp: timestamppb.New(normalizedAt),
Value: 102.4,
},
},
},
},
}
mbb.Merge(toMerge)

mb := mbb.ToProto()
fmt.Printf("expected: %s\n", expected) // REMOVE
fmt.Printf("actual: %s\n", mb) // REMOVE
require.True(t, proto.Equal(expected, mb))
})
}

0 comments on commit 49bebdc

Please sign in to comment.