From c721a3ed39936d321f05c65e633d7f83c197fd98 Mon Sep 17 00:00:00 2001 From: Paul Wells Date: Sun, 26 May 2024 12:36:38 -0700 Subject: [PATCH] add message bytes to psrpc metrics observer (#723) * add message bytes to psrpc metrics observer * deps --- go.mod | 2 +- go.sum | 4 +-- rpc/metrics.go | 68 ++++++++++++++++++++++++++------------ utils/latencyaggregate.go | 14 ++++++++ utils/timeoutqueue.go | 22 ++++++------ utils/timeoutqueue_test.go | 2 +- 6 files changed, 75 insertions(+), 37 deletions(-) diff --git a/go.mod b/go.mod index d59a21ef..d198c2b5 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/jxskiss/base62 v1.1.0 github.com/lithammer/shortuuid/v4 v4.0.0 github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 - github.com/livekit/psrpc v0.5.3-0.20240228172457-3724cb4adbc4 + github.com/livekit/psrpc v0.5.3-0.20240526192918-fbdaf10e6aa5 github.com/mackerelio/go-osstat v0.2.4 github.com/maxbrunsfeld/counterfeiter/v6 v6.8.1 github.com/pion/logging v0.2.2 diff --git a/go.sum b/go.sum index cc08d66a..7a3d1130 100644 --- a/go.sum +++ b/go.sum @@ -76,8 +76,8 @@ github.com/lithammer/shortuuid/v4 v4.0.0 h1:QRbbVkfgNippHOS8PXDkti4NaWeyYfcBTHtw github.com/lithammer/shortuuid/v4 v4.0.0/go.mod h1:Zs8puNcrvf2rV9rTH51ZLLcj7ZXqQI3lv67aw4KiB1Y= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkDaKb5iXdynYrzB84ErPPO4LbRASk58= github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ= -github.com/livekit/psrpc v0.5.3-0.20240228172457-3724cb4adbc4 h1:253WtQ2VGVHzIIzW9MUZj7vUDDILESU3zsEbiRdxYF0= -github.com/livekit/psrpc v0.5.3-0.20240228172457-3724cb4adbc4/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= +github.com/livekit/psrpc v0.5.3-0.20240526192918-fbdaf10e6aa5 h1:mTZyrjk5WEWMsvaYtJ42pG7DuxysKj21DKPINpGSIto= +github.com/livekit/psrpc v0.5.3-0.20240526192918-fbdaf10e6aa5/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0= github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs= github.com/mackerelio/go-osstat v0.2.4/go.mod h1:Zy+qzGdZs3A9cuIqmgbJvwbmLQH9dJvtio5ZjJTbdlQ= github.com/maxbrunsfeld/counterfeiter/v6 v6.8.1 h1:NicmruxkeqHjDv03SfSxqmaLuisddudfP3h5wdXFbhM= diff --git a/rpc/metrics.go b/rpc/metrics.go index eb02bef4..dcea8fbc 100644 --- a/rpc/metrics.go +++ b/rpc/metrics.go @@ -37,6 +37,7 @@ type psrpcMetrics struct { streamReceiveTotal *prometheus.CounterVec streamCurrent *prometheus.GaugeVec errorTotal *prometheus.CounterVec + bytesTotal *prometheus.CounterVec } var ( @@ -82,6 +83,7 @@ func InitPSRPCStats(constLabels prometheus.Labels, opts ...PSRPCMetricsOption) { labels := append(curryLabelNames, "role", "kind", "service", "method") streamLabels := append(curryLabelNames, "role", "service", "method") + bytesLabels := append(labels, "direction") metricsBase.requestTime = prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: livekitNamespace, @@ -115,6 +117,12 @@ func InitPSRPCStats(constLabels prometheus.Labels, opts ...PSRPCMetricsOption) { Name: "error_total", ConstLabels: constLabels, }, labels) + metricsBase.bytesTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: livekitNamespace, + Subsystem: "psrpc", + Name: "bytes_total", + ConstLabels: constLabels, + }, bytesLabels) metricsBase.mu.Unlock() @@ -123,6 +131,7 @@ func InitPSRPCStats(constLabels prometheus.Labels, opts ...PSRPCMetricsOption) { prometheus.MustRegister(metricsBase.streamReceiveTotal) prometheus.MustRegister(metricsBase.streamCurrent) prometheus.MustRegister(metricsBase.errorTotal) + prometheus.MustRegister(metricsBase.bytesTotal) CurryMetricLabels(o.curryLabels) } @@ -146,6 +155,7 @@ func CurryMetricLabels(labels prometheus.Labels) { streamReceiveTotal: metricsBase.streamReceiveTotal.MustCurryWith(metricsBase.curryLabels), streamCurrent: metricsBase.streamCurrent.MustCurryWith(metricsBase.curryLabels), errorTotal: metricsBase.errorTotal.MustCurryWith(metricsBase.curryLabels), + bytesTotal: metricsBase.bytesTotal.MustCurryWith(metricsBase.curryLabels), }) } @@ -153,59 +163,73 @@ var _ middleware.MetricsObserver = PSRPCMetricsObserver{} type PSRPCMetricsObserver struct{} -func (o PSRPCMetricsObserver) OnUnaryRequest(role middleware.MetricRole, info psrpc.RPCInfo, duration time.Duration, err error) { +func (o PSRPCMetricsObserver) OnUnaryRequest(role middleware.MetricRole, info psrpc.RPCInfo, duration time.Duration, err error, rxBytes, txBytes int) { + m := metrics.Load() + m.bytesTotal.WithLabelValues(role.String(), "rpc", info.Service, info.Method, "rx").Add(float64(rxBytes)) + m.bytesTotal.WithLabelValues(role.String(), "rpc", info.Service, info.Method, "tx").Add(float64(txBytes)) + if err != nil { - metrics.Load().errorTotal.WithLabelValues(role.String(), "rpc", info.Service, info.Method).Inc() - } else if role == middleware.ClientRole { - metrics.Load().requestTime.WithLabelValues(role.String(), "rpc", info.Service, info.Method).Observe(float64(duration.Milliseconds())) + m.errorTotal.WithLabelValues(role.String(), "rpc", info.Service, info.Method).Inc() } else { - metrics.Load().requestTime.WithLabelValues(role.String(), "rpc", info.Service, info.Method).Observe(float64(duration.Milliseconds())) + m.requestTime.WithLabelValues(role.String(), "rpc", info.Service, info.Method).Observe(float64(duration.Milliseconds())) } } -func (o PSRPCMetricsObserver) OnMultiRequest(role middleware.MetricRole, info psrpc.RPCInfo, duration time.Duration, responseCount int, errorCount int) { +func (o PSRPCMetricsObserver) OnMultiRequest(role middleware.MetricRole, info psrpc.RPCInfo, duration time.Duration, responseCount, errorCount, rxBytes, txBytes int) { + m := metrics.Load() + m.bytesTotal.WithLabelValues(role.String(), "rpc", info.Service, info.Method, "rx").Add(float64(rxBytes)) + m.bytesTotal.WithLabelValues(role.String(), "rpc", info.Service, info.Method, "tx").Add(float64(txBytes)) + if responseCount == 0 { - metrics.Load().errorTotal.WithLabelValues(role.String(), "multirpc", info.Service, info.Method).Inc() - } else if role == middleware.ClientRole { - metrics.Load().requestTime.WithLabelValues(role.String(), "multirpc", info.Service, info.Method).Observe(float64(duration.Milliseconds())) + m.errorTotal.WithLabelValues(role.String(), "multirpc", info.Service, info.Method).Inc() } else { - metrics.Load().requestTime.WithLabelValues(role.String(), "multirpc", info.Service, info.Method).Observe(float64(duration.Milliseconds())) + m.requestTime.WithLabelValues(role.String(), "multirpc", info.Service, info.Method).Observe(float64(duration.Milliseconds())) } } -func (o PSRPCMetricsObserver) OnStreamSend(role middleware.MetricRole, info psrpc.RPCInfo, duration time.Duration, err error) { +func (o PSRPCMetricsObserver) OnStreamSend(role middleware.MetricRole, info psrpc.RPCInfo, duration time.Duration, err error, bytes int) { + m := metrics.Load() + m.bytesTotal.WithLabelValues(role.String(), "rpc", info.Service, info.Method, "tx").Add(float64(bytes)) + if err != nil { - metrics.Load().errorTotal.WithLabelValues(role.String(), "stream", info.Service, info.Method).Inc() + m.errorTotal.WithLabelValues(role.String(), "stream", info.Service, info.Method).Inc() } else { - metrics.Load().streamSendTime.WithLabelValues(role.String(), info.Service, info.Method).Observe(float64(duration.Milliseconds())) + m.streamSendTime.WithLabelValues(role.String(), info.Service, info.Method).Observe(float64(duration.Milliseconds())) } } -func (o PSRPCMetricsObserver) OnStreamRecv(role middleware.MetricRole, info psrpc.RPCInfo, err error) { +func (o PSRPCMetricsObserver) OnStreamRecv(role middleware.MetricRole, info psrpc.RPCInfo, err error, bytes int) { + m := metrics.Load() + m.bytesTotal.WithLabelValues(role.String(), "rpc", info.Service, info.Method, "rx").Add(float64(bytes)) + if err != nil { - metrics.Load().errorTotal.WithLabelValues(role.String(), "stream", info.Service, info.Method).Inc() + m.errorTotal.WithLabelValues(role.String(), "stream", info.Service, info.Method).Inc() } else { - metrics.Load().streamReceiveTotal.WithLabelValues(role.String(), info.Service, info.Method).Inc() + m.streamReceiveTotal.WithLabelValues(role.String(), info.Service, info.Method).Inc() } } func (o PSRPCMetricsObserver) OnStreamOpen(role middleware.MetricRole, info psrpc.RPCInfo) { - metrics.Load().streamCurrent.WithLabelValues(role.String(), info.Service, info.Method).Inc() + m := metrics.Load() + m.streamCurrent.WithLabelValues(role.String(), info.Service, info.Method).Inc() } func (o PSRPCMetricsObserver) OnStreamClose(role middleware.MetricRole, info psrpc.RPCInfo) { - metrics.Load().streamCurrent.WithLabelValues(role.String(), info.Service, info.Method).Dec() + m := metrics.Load() + m.streamCurrent.WithLabelValues(role.String(), info.Service, info.Method).Dec() } +var _ middleware.MetricsObserver = UnimplementedMetricsObserver{} + type UnimplementedMetricsObserver struct{} -func (o UnimplementedMetricsObserver) OnUnaryRequest(role middleware.MetricRole, rpcInfo psrpc.RPCInfo, duration time.Duration, err error) { +func (o UnimplementedMetricsObserver) OnUnaryRequest(role middleware.MetricRole, rpcInfo psrpc.RPCInfo, duration time.Duration, err error, rxBytes, txBytes int) { } -func (o UnimplementedMetricsObserver) OnMultiRequest(role middleware.MetricRole, rpcInfo psrpc.RPCInfo, duration time.Duration, responseCount int, errorCount int) { +func (o UnimplementedMetricsObserver) OnMultiRequest(role middleware.MetricRole, rpcInfo psrpc.RPCInfo, duration time.Duration, responseCount, errorCount, reqBytes, txBytes int) { } -func (o UnimplementedMetricsObserver) OnStreamSend(role middleware.MetricRole, rpcInfo psrpc.RPCInfo, duration time.Duration, err error) { +func (o UnimplementedMetricsObserver) OnStreamSend(role middleware.MetricRole, rpcInfo psrpc.RPCInfo, duration time.Duration, err error, bytes int) { } -func (o UnimplementedMetricsObserver) OnStreamRecv(role middleware.MetricRole, rpcInfo psrpc.RPCInfo, err error) { +func (o UnimplementedMetricsObserver) OnStreamRecv(role middleware.MetricRole, rpcInfo psrpc.RPCInfo, err error, bytes int) { } func (o UnimplementedMetricsObserver) OnStreamOpen(role middleware.MetricRole, rpcInfo psrpc.RPCInfo) { } diff --git a/utils/latencyaggregate.go b/utils/latencyaggregate.go index 7e0db479..8d847ccb 100644 --- a/utils/latencyaggregate.go +++ b/utils/latencyaggregate.go @@ -1,3 +1,17 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package utils import ( diff --git a/utils/timeoutqueue.go b/utils/timeoutqueue.go index f19a3b59..a5f03ec5 100644 --- a/utils/timeoutqueue.go +++ b/utils/timeoutqueue.go @@ -53,12 +53,12 @@ func (q *TimeoutQueue[T]) Remove(i *TimeoutQueueItem[T]) { q.remove(i) } -func (q *TimeoutQueue[T]) popBefore(t time.Time, remove bool) *TimeoutQueueItem[T] { +func (q *TimeoutQueue[T]) popBefore(t int64, remove bool) *TimeoutQueueItem[T] { q.mu.Lock() defer q.mu.Unlock() i := q.head - if i == nil || i.time > t.UnixNano() { + if i == nil || i.time > t { return nil } @@ -100,34 +100,34 @@ func (q *TimeoutQueue[T]) remove(i *TimeoutQueueItem[T]) { i.prev = nil } -func (q *TimeoutQueue[T]) IterateAfter(timeout time.Duration) timeoutQueueIterator[T] { +func (q *TimeoutQueue[T]) IterateAfter(timeout time.Duration) TimeoutQueueIterator[T] { return newTimeoutQueueIterator(q, timeout, false) } -func (q *TimeoutQueue[T]) IterateRemoveAfter(timeout time.Duration) timeoutQueueIterator[T] { +func (q *TimeoutQueue[T]) IterateRemoveAfter(timeout time.Duration) TimeoutQueueIterator[T] { return newTimeoutQueueIterator(q, timeout, true) } -type timeoutQueueIterator[T any] struct { +type TimeoutQueueIterator[T any] struct { q *TimeoutQueue[T] - time time.Time + time int64 remove bool item *TimeoutQueueItem[T] } -func newTimeoutQueueIterator[T any](q *TimeoutQueue[T], timeout time.Duration, remove bool) timeoutQueueIterator[T] { - return timeoutQueueIterator[T]{ +func newTimeoutQueueIterator[T any](q *TimeoutQueue[T], timeout time.Duration, remove bool) TimeoutQueueIterator[T] { + return TimeoutQueueIterator[T]{ q: q, - time: time.Now().Add(-timeout), + time: time.Now().Add(-timeout).UnixNano(), remove: remove, } } -func (i *timeoutQueueIterator[T]) Next() bool { +func (i *TimeoutQueueIterator[T]) Next() bool { i.item = i.q.popBefore(i.time, i.remove) return i.item != nil } -func (i *timeoutQueueIterator[T]) Item() *TimeoutQueueItem[T] { +func (i *TimeoutQueueIterator[T]) Item() *TimeoutQueueItem[T] { return i.item } diff --git a/utils/timeoutqueue_test.go b/utils/timeoutqueue_test.go index eb8368ce..c091895c 100644 --- a/utils/timeoutqueue_test.go +++ b/utils/timeoutqueue_test.go @@ -65,7 +65,7 @@ func TestTimeoutQueue(t *testing.T) { q.Reset(it) } - ts := time.Now() + ts := time.Now().UnixNano() q.popBefore(ts, true) require.EqualValues(t, []int{1, 2, 3, 4}, debugTimeoutQueueItems(&q)) q.popBefore(ts, true)