Skip to content

Commit

Permalink
add drain time statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
Termina1 committed Jan 9, 2025
1 parent dac0e89 commit 4a82d06
Showing 1 changed file with 16 additions and 0 deletions.
16 changes: 16 additions & 0 deletions chotki.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ var EventsBatchSize = prometheus.NewHistogram(prometheus.HistogramOpts{
Buckets: []float64{0, 1, 10, 50, 100, 500, 1000, 10000, 100000, 1000000},
})

var DrainTime = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "chotki",
Name: "drain_time",
Buckets: prometheus.DefBuckets,
}, []string{"type"})

type Options struct {
pebble.Options

Expand Down Expand Up @@ -501,6 +507,10 @@ func (cho *Chotki) Broadcast(ctx context.Context, records protocol.Records, exce

// Here new packets are timestamped and queued for save
func (cho *Chotki) CommitPacket(ctx context.Context, lit byte, ref rdx.ID, body protocol.Records) (id rdx.ID, err error) {
now := time.Now()
defer func() {
DrainTime.WithLabelValues("commit+broadcast").Observe(float64(time.Since(now)) / float64(time.Millisecond))
}()
cho.commitMutex.Lock()
defer cho.commitMutex.Unlock()

Expand All @@ -513,6 +523,7 @@ func (cho *Chotki) CommitPacket(ctx context.Context, lit byte, ref rdx.ID, body
packet := protocol.Record(lit, i, r, protocol.Join(body...))
recs := protocol.Records{packet}
err = cho.drain(ctx, recs)
DrainTime.WithLabelValues("commit").Observe(float64(time.Since(now)) / float64(time.Millisecond))
cho.Broadcast(ctx, recs, "")
return
}
Expand Down Expand Up @@ -557,6 +568,7 @@ func (cho *Chotki) Metrics() []prometheus.Collector {
OpenedIterators,
OpenedSnapshots,
SessionsStates,
DrainTime,
}
}

Expand Down Expand Up @@ -662,6 +674,10 @@ func (cho *Chotki) drain(ctx context.Context, recs protocol.Records) (err error)
}

func (cho *Chotki) Drain(ctx context.Context, recs protocol.Records) (err error) {
now := time.Now()
defer func() {
DrainTime.WithLabelValues("drain").Observe(float64(time.Since(now)) / float64(time.Millisecond))
}()
cho.lock.RLock()
defer cho.lock.RUnlock()
if cho.db == nil {
Expand Down

0 comments on commit 4a82d06

Please sign in to comment.