Skip to content

Commit

Permalink
add message bytes to psrpc metrics observer (#723)
Browse files Browse the repository at this point in the history
* add message bytes to psrpc metrics observer

* deps
  • Loading branch information
paulwe authored May 26, 2024
1 parent 2ec622e commit c721a3e
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 37 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ require (
github.com/jxskiss/base62 v1.1.0
github.com/lithammer/shortuuid/v4 v4.0.0
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1
github.com/livekit/psrpc v0.5.3-0.20240228172457-3724cb4adbc4
github.com/livekit/psrpc v0.5.3-0.20240526192918-fbdaf10e6aa5
github.com/mackerelio/go-osstat v0.2.4
github.com/maxbrunsfeld/counterfeiter/v6 v6.8.1
github.com/pion/logging v0.2.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ github.com/lithammer/shortuuid/v4 v4.0.0 h1:QRbbVkfgNippHOS8PXDkti4NaWeyYfcBTHtw
github.com/lithammer/shortuuid/v4 v4.0.0/go.mod h1:Zs8puNcrvf2rV9rTH51ZLLcj7ZXqQI3lv67aw4KiB1Y=
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1 h1:jm09419p0lqTkDaKb5iXdynYrzB84ErPPO4LbRASk58=
github.com/livekit/mageutil v0.0.0-20230125210925-54e8a70427c1/go.mod h1:Rs3MhFwutWhGwmY1VQsygw28z5bWcnEYmS1OG9OxjOQ=
github.com/livekit/psrpc v0.5.3-0.20240228172457-3724cb4adbc4 h1:253WtQ2VGVHzIIzW9MUZj7vUDDILESU3zsEbiRdxYF0=
github.com/livekit/psrpc v0.5.3-0.20240228172457-3724cb4adbc4/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/livekit/psrpc v0.5.3-0.20240526192918-fbdaf10e6aa5 h1:mTZyrjk5WEWMsvaYtJ42pG7DuxysKj21DKPINpGSIto=
github.com/livekit/psrpc v0.5.3-0.20240526192918-fbdaf10e6aa5/go.mod h1:CQUBSPfYYAaevg1TNCc6/aYsa8DJH4jSRFdCeSZk5u0=
github.com/mackerelio/go-osstat v0.2.4 h1:qxGbdPkFo65PXOb/F/nhDKpF2nGmGaCFDLXoZjJTtUs=
github.com/mackerelio/go-osstat v0.2.4/go.mod h1:Zy+qzGdZs3A9cuIqmgbJvwbmLQH9dJvtio5ZjJTbdlQ=
github.com/maxbrunsfeld/counterfeiter/v6 v6.8.1 h1:NicmruxkeqHjDv03SfSxqmaLuisddudfP3h5wdXFbhM=
Expand Down
68 changes: 46 additions & 22 deletions rpc/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type psrpcMetrics struct {
streamReceiveTotal *prometheus.CounterVec
streamCurrent *prometheus.GaugeVec
errorTotal *prometheus.CounterVec
bytesTotal *prometheus.CounterVec
}

var (
Expand Down Expand Up @@ -82,6 +83,7 @@ func InitPSRPCStats(constLabels prometheus.Labels, opts ...PSRPCMetricsOption) {

labels := append(curryLabelNames, "role", "kind", "service", "method")
streamLabels := append(curryLabelNames, "role", "service", "method")
bytesLabels := append(labels, "direction")

metricsBase.requestTime = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: livekitNamespace,
Expand Down Expand Up @@ -115,6 +117,12 @@ func InitPSRPCStats(constLabels prometheus.Labels, opts ...PSRPCMetricsOption) {
Name: "error_total",
ConstLabels: constLabels,
}, labels)
metricsBase.bytesTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: livekitNamespace,
Subsystem: "psrpc",
Name: "bytes_total",
ConstLabels: constLabels,
}, bytesLabels)

metricsBase.mu.Unlock()

Expand All @@ -123,6 +131,7 @@ func InitPSRPCStats(constLabels prometheus.Labels, opts ...PSRPCMetricsOption) {
prometheus.MustRegister(metricsBase.streamReceiveTotal)
prometheus.MustRegister(metricsBase.streamCurrent)
prometheus.MustRegister(metricsBase.errorTotal)
prometheus.MustRegister(metricsBase.bytesTotal)

CurryMetricLabels(o.curryLabels)
}
Expand All @@ -146,66 +155,81 @@ func CurryMetricLabels(labels prometheus.Labels) {
streamReceiveTotal: metricsBase.streamReceiveTotal.MustCurryWith(metricsBase.curryLabels),
streamCurrent: metricsBase.streamCurrent.MustCurryWith(metricsBase.curryLabels),
errorTotal: metricsBase.errorTotal.MustCurryWith(metricsBase.curryLabels),
bytesTotal: metricsBase.bytesTotal.MustCurryWith(metricsBase.curryLabels),
})
}

var _ middleware.MetricsObserver = PSRPCMetricsObserver{}

type PSRPCMetricsObserver struct{}

func (o PSRPCMetricsObserver) OnUnaryRequest(role middleware.MetricRole, info psrpc.RPCInfo, duration time.Duration, err error) {
func (o PSRPCMetricsObserver) OnUnaryRequest(role middleware.MetricRole, info psrpc.RPCInfo, duration time.Duration, err error, rxBytes, txBytes int) {
m := metrics.Load()
m.bytesTotal.WithLabelValues(role.String(), "rpc", info.Service, info.Method, "rx").Add(float64(rxBytes))
m.bytesTotal.WithLabelValues(role.String(), "rpc", info.Service, info.Method, "tx").Add(float64(txBytes))

if err != nil {
metrics.Load().errorTotal.WithLabelValues(role.String(), "rpc", info.Service, info.Method).Inc()
} else if role == middleware.ClientRole {
metrics.Load().requestTime.WithLabelValues(role.String(), "rpc", info.Service, info.Method).Observe(float64(duration.Milliseconds()))
m.errorTotal.WithLabelValues(role.String(), "rpc", info.Service, info.Method).Inc()
} else {
metrics.Load().requestTime.WithLabelValues(role.String(), "rpc", info.Service, info.Method).Observe(float64(duration.Milliseconds()))
m.requestTime.WithLabelValues(role.String(), "rpc", info.Service, info.Method).Observe(float64(duration.Milliseconds()))
}
}

func (o PSRPCMetricsObserver) OnMultiRequest(role middleware.MetricRole, info psrpc.RPCInfo, duration time.Duration, responseCount int, errorCount int) {
func (o PSRPCMetricsObserver) OnMultiRequest(role middleware.MetricRole, info psrpc.RPCInfo, duration time.Duration, responseCount, errorCount, rxBytes, txBytes int) {
m := metrics.Load()
m.bytesTotal.WithLabelValues(role.String(), "rpc", info.Service, info.Method, "rx").Add(float64(rxBytes))
m.bytesTotal.WithLabelValues(role.String(), "rpc", info.Service, info.Method, "tx").Add(float64(txBytes))

if responseCount == 0 {
metrics.Load().errorTotal.WithLabelValues(role.String(), "multirpc", info.Service, info.Method).Inc()
} else if role == middleware.ClientRole {
metrics.Load().requestTime.WithLabelValues(role.String(), "multirpc", info.Service, info.Method).Observe(float64(duration.Milliseconds()))
m.errorTotal.WithLabelValues(role.String(), "multirpc", info.Service, info.Method).Inc()
} else {
metrics.Load().requestTime.WithLabelValues(role.String(), "multirpc", info.Service, info.Method).Observe(float64(duration.Milliseconds()))
m.requestTime.WithLabelValues(role.String(), "multirpc", info.Service, info.Method).Observe(float64(duration.Milliseconds()))
}
}

func (o PSRPCMetricsObserver) OnStreamSend(role middleware.MetricRole, info psrpc.RPCInfo, duration time.Duration, err error) {
func (o PSRPCMetricsObserver) OnStreamSend(role middleware.MetricRole, info psrpc.RPCInfo, duration time.Duration, err error, bytes int) {
m := metrics.Load()
m.bytesTotal.WithLabelValues(role.String(), "rpc", info.Service, info.Method, "tx").Add(float64(bytes))

if err != nil {
metrics.Load().errorTotal.WithLabelValues(role.String(), "stream", info.Service, info.Method).Inc()
m.errorTotal.WithLabelValues(role.String(), "stream", info.Service, info.Method).Inc()
} else {
metrics.Load().streamSendTime.WithLabelValues(role.String(), info.Service, info.Method).Observe(float64(duration.Milliseconds()))
m.streamSendTime.WithLabelValues(role.String(), info.Service, info.Method).Observe(float64(duration.Milliseconds()))
}
}

func (o PSRPCMetricsObserver) OnStreamRecv(role middleware.MetricRole, info psrpc.RPCInfo, err error) {
func (o PSRPCMetricsObserver) OnStreamRecv(role middleware.MetricRole, info psrpc.RPCInfo, err error, bytes int) {
m := metrics.Load()
m.bytesTotal.WithLabelValues(role.String(), "rpc", info.Service, info.Method, "rx").Add(float64(bytes))

if err != nil {
metrics.Load().errorTotal.WithLabelValues(role.String(), "stream", info.Service, info.Method).Inc()
m.errorTotal.WithLabelValues(role.String(), "stream", info.Service, info.Method).Inc()
} else {
metrics.Load().streamReceiveTotal.WithLabelValues(role.String(), info.Service, info.Method).Inc()
m.streamReceiveTotal.WithLabelValues(role.String(), info.Service, info.Method).Inc()
}
}

func (o PSRPCMetricsObserver) OnStreamOpen(role middleware.MetricRole, info psrpc.RPCInfo) {
metrics.Load().streamCurrent.WithLabelValues(role.String(), info.Service, info.Method).Inc()
m := metrics.Load()
m.streamCurrent.WithLabelValues(role.String(), info.Service, info.Method).Inc()
}

func (o PSRPCMetricsObserver) OnStreamClose(role middleware.MetricRole, info psrpc.RPCInfo) {
metrics.Load().streamCurrent.WithLabelValues(role.String(), info.Service, info.Method).Dec()
m := metrics.Load()
m.streamCurrent.WithLabelValues(role.String(), info.Service, info.Method).Dec()
}

var _ middleware.MetricsObserver = UnimplementedMetricsObserver{}

type UnimplementedMetricsObserver struct{}

func (o UnimplementedMetricsObserver) OnUnaryRequest(role middleware.MetricRole, rpcInfo psrpc.RPCInfo, duration time.Duration, err error) {
func (o UnimplementedMetricsObserver) OnUnaryRequest(role middleware.MetricRole, rpcInfo psrpc.RPCInfo, duration time.Duration, err error, rxBytes, txBytes int) {
}
func (o UnimplementedMetricsObserver) OnMultiRequest(role middleware.MetricRole, rpcInfo psrpc.RPCInfo, duration time.Duration, responseCount int, errorCount int) {
func (o UnimplementedMetricsObserver) OnMultiRequest(role middleware.MetricRole, rpcInfo psrpc.RPCInfo, duration time.Duration, responseCount, errorCount, reqBytes, txBytes int) {
}
func (o UnimplementedMetricsObserver) OnStreamSend(role middleware.MetricRole, rpcInfo psrpc.RPCInfo, duration time.Duration, err error) {
func (o UnimplementedMetricsObserver) OnStreamSend(role middleware.MetricRole, rpcInfo psrpc.RPCInfo, duration time.Duration, err error, bytes int) {
}
func (o UnimplementedMetricsObserver) OnStreamRecv(role middleware.MetricRole, rpcInfo psrpc.RPCInfo, err error) {
func (o UnimplementedMetricsObserver) OnStreamRecv(role middleware.MetricRole, rpcInfo psrpc.RPCInfo, err error, bytes int) {
}
func (o UnimplementedMetricsObserver) OnStreamOpen(role middleware.MetricRole, rpcInfo psrpc.RPCInfo) {
}
Expand Down
14 changes: 14 additions & 0 deletions utils/latencyaggregate.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package utils

import (
Expand Down
22 changes: 11 additions & 11 deletions utils/timeoutqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,12 @@ func (q *TimeoutQueue[T]) Remove(i *TimeoutQueueItem[T]) {
q.remove(i)
}

func (q *TimeoutQueue[T]) popBefore(t time.Time, remove bool) *TimeoutQueueItem[T] {
func (q *TimeoutQueue[T]) popBefore(t int64, remove bool) *TimeoutQueueItem[T] {
q.mu.Lock()
defer q.mu.Unlock()

i := q.head
if i == nil || i.time > t.UnixNano() {
if i == nil || i.time > t {
return nil
}

Expand Down Expand Up @@ -100,34 +100,34 @@ func (q *TimeoutQueue[T]) remove(i *TimeoutQueueItem[T]) {
i.prev = nil
}

func (q *TimeoutQueue[T]) IterateAfter(timeout time.Duration) timeoutQueueIterator[T] {
func (q *TimeoutQueue[T]) IterateAfter(timeout time.Duration) TimeoutQueueIterator[T] {
return newTimeoutQueueIterator(q, timeout, false)
}

func (q *TimeoutQueue[T]) IterateRemoveAfter(timeout time.Duration) timeoutQueueIterator[T] {
func (q *TimeoutQueue[T]) IterateRemoveAfter(timeout time.Duration) TimeoutQueueIterator[T] {
return newTimeoutQueueIterator(q, timeout, true)
}

type timeoutQueueIterator[T any] struct {
type TimeoutQueueIterator[T any] struct {
q *TimeoutQueue[T]
time time.Time
time int64
remove bool
item *TimeoutQueueItem[T]
}

func newTimeoutQueueIterator[T any](q *TimeoutQueue[T], timeout time.Duration, remove bool) timeoutQueueIterator[T] {
return timeoutQueueIterator[T]{
func newTimeoutQueueIterator[T any](q *TimeoutQueue[T], timeout time.Duration, remove bool) TimeoutQueueIterator[T] {
return TimeoutQueueIterator[T]{
q: q,
time: time.Now().Add(-timeout),
time: time.Now().Add(-timeout).UnixNano(),
remove: remove,
}
}

func (i *timeoutQueueIterator[T]) Next() bool {
func (i *TimeoutQueueIterator[T]) Next() bool {
i.item = i.q.popBefore(i.time, i.remove)
return i.item != nil
}

func (i *timeoutQueueIterator[T]) Item() *TimeoutQueueItem[T] {
func (i *TimeoutQueueIterator[T]) Item() *TimeoutQueueItem[T] {
return i.item
}
2 changes: 1 addition & 1 deletion utils/timeoutqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestTimeoutQueue(t *testing.T) {
q.Reset(it)
}

ts := time.Now()
ts := time.Now().UnixNano()
q.popBefore(ts, true)
require.EqualValues(t, []int{1, 2, 3, 4}, debugTimeoutQueueItems(&q))
q.popBefore(ts, true)
Expand Down

0 comments on commit c721a3e

Please sign in to comment.