Skip to content

Commit

Permalink
add more metrics for sync, leaking iterators/snapshots
Browse files Browse the repository at this point in the history
  • Loading branch information
Termina1 committed Dec 24, 2024
1 parent a21f201 commit 9468ee8
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 6 deletions.
3 changes: 3 additions & 0 deletions chotki.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,9 @@ func (cho *Chotki) Metrics() []prometheus.Collector {
NewNetCollector(cho.net),
EventsBatchSize,
NewPebbleCollector(cho.db),
OpenedIterators,
OpenedSnapshots,
SessionsStates,
}
}

Expand Down
64 changes: 58 additions & 6 deletions sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/drpcorg/chotki/rdx"
"github.com/drpcorg/chotki/utils"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
)

const SyncBlockBits = 28
Expand Down Expand Up @@ -83,6 +84,23 @@ func (s SyncState) String() string {
return []string{"SendHandshake", "SendDiff", "SendLive", "SendEOF", "SendNone", "SendPing", "SendPong"}[s]
}

var SessionsStates = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "chotki",
Subsystem: "sync",
Name: "sessions",
}, []string{"id", "kind"})
var OpenedSnapshots = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "chotki",
Subsystem: "sync",
Name: "opened_snapshots",
}, []string{"id"})

var OpenedIterators = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "chotki",
Subsystem: "sync",
Name: "opened_iterators",
}, []string{"id"})

const TraceSize = 10

type Syncer struct {
Expand Down Expand Up @@ -146,24 +164,33 @@ func (sync *Syncer) Close() error {

if sync.snap != nil {
if err := sync.snap.Close(); err != nil {
sync.log.ErrorCtx(sync.logCtx(context.Background()), "failed closing snapshot", "err", err)
sync.log.ErrorCtx(sync.logCtx(context.Background()), "failed closing snapshot", "err", err.Error())
} else {
OpenedSnapshots.WithLabelValues(sync.Name).Set(0)
}
sync.snap = nil
}

closediterators := true

if sync.ffit != nil {
if err := sync.ffit.Close(); err != nil {
closediterators = false
sync.log.ErrorCtx(sync.logCtx(context.Background()), "failed closing ffit", "err", err)
}
sync.ffit = nil
}

if sync.vvit != nil {
if err := sync.vvit.Close(); err != nil {
closediterators = false
sync.log.ErrorCtx(sync.logCtx(context.Background()), "failed closing vvit", "err", err)
}
sync.vvit = nil
}
if closediterators {
OpenedIterators.WithLabelValues(sync.Name).Set(0)
}

sync.log.InfoCtx(sync.logCtx(context.Background()), "sync: connection %s closed: %v\n", sync.Name, sync.reason)

Expand Down Expand Up @@ -194,6 +221,7 @@ func (sync *Syncer) GetDrainState() SyncState {
}

func (sync *Syncer) Feed(ctx context.Context) (recs protocol.Records, err error) {
SessionsStates.WithLabelValues(sync.Name, "feed").Set(float64(sync.GetFeedState()))
// other side closed the connection already
if sync.GetDrainState() == SendNone {
sync.SetFeedState(ctx, SendNone)
Expand All @@ -215,7 +243,7 @@ func (sync *Syncer) Feed(ctx context.Context) (recs protocol.Records, err error)
}
recs, err = sync.FeedBlockDiff(ctx)
if err == io.EOF {
recs2, _ := sync.FeedDiffVV()
recs2, _ := sync.FeedDiffVV(ctx)
recs = append(recs, recs2...)
if (sync.Mode & SyncLive) != 0 {
sync.SetFeedState(ctx, SendLive)
Expand Down Expand Up @@ -263,7 +291,12 @@ func (sync *Syncer) Feed(ctx context.Context) (recs protocol.Records, err error)
reason,
)}
if sync.snap != nil {
_ = sync.snap.Close()
err = sync.snap.Close()
if err != nil {
sync.log.ErrorCtx(sync.logCtx(ctx), "sync: failed closing snapshot", "error", err.Error())
} else {
OpenedSnapshots.WithLabelValues(sync.Name).Set(0)
}
sync.snap = nil
}
sync.SetFeedState(ctx, SendNone)
Expand All @@ -286,6 +319,8 @@ func (sync *Syncer) Feed(ctx context.Context) (recs protocol.Records, err error)

func (sync *Syncer) FeedHandshake() (vv protocol.Records, err error) {
sync.snap = sync.Host.Snapshot()

OpenedSnapshots.WithLabelValues(sync.Name).Set(1)
sync.vvit = sync.snap.NewIter(&pebble.IterOptions{
LowerBound: []byte{'V'},
UpperBound: []byte{'W'},
Expand All @@ -295,6 +330,8 @@ func (sync *Syncer) FeedHandshake() (vv protocol.Records, err error) {
UpperBound: []byte{'P'},
})

OpenedIterators.WithLabelValues(sync.Name).Set(1)

ok := sync.vvit.SeekGE(VKey0)
if !ok || 0 != bytes.Compare(sync.vvit.Key(), VKey0) {
return nil, rdx.ErrBadV0Record
Expand Down Expand Up @@ -433,18 +470,31 @@ func (sync *Syncer) FeedBlockDiff(ctx context.Context) (diff protocol.Records, e
return protocol.Records{parcel}, err
}

func (sync *Syncer) FeedDiffVV() (vv protocol.Records, err error) {
func (sync *Syncer) FeedDiffVV(ctx context.Context) (vv protocol.Records, err error) {
protocol.CloseHeader(sync.vpack, 5)
vv = append(vv, sync.vpack)
sync.vpack = nil
_ = sync.ffit.Close()
closediterators := true
err = sync.ffit.Close()
if err != nil {
closediterators = false
sync.log.ErrorCtx(sync.logCtx(ctx), "failed closing ffit", "err", err)
}
sync.ffit = nil
_ = sync.vvit.Close()
err = sync.vvit.Close()
if err != nil {
closediterators = false
sync.log.ErrorCtx(sync.logCtx(ctx), "failed closing vvit", "err", err)
}
sync.vvit = nil
if closediterators {
OpenedIterators.WithLabelValues(sync.Name).Set(0)
}
return
}

func (sync *Syncer) SetFeedState(ctx context.Context, state SyncState) {
SessionsStates.WithLabelValues(sync.Name, "feed").Set(float64(state))
sync.log.InfoCtx(sync.logCtx(ctx), "sync: feed state", "state", state.String())
sync.lock.Lock()
sync.feedState = state
Expand All @@ -453,6 +503,7 @@ func (sync *Syncer) SetFeedState(ctx context.Context, state SyncState) {

func (sync *Syncer) SetDrainState(ctx context.Context, state SyncState) {
sync.log.InfoCtx(sync.logCtx(ctx), "sync: drain state", "state", state.String())
SessionsStates.WithLabelValues(sync.Name, "drain").Set(float64(state))
sync.lock.Lock()
sync.drainState = state
if sync.cond.L == nil {
Expand Down Expand Up @@ -538,6 +589,7 @@ func (sync *Syncer) Drain(ctx context.Context, recs protocol.Records) (err error
if len(recs) == 0 {
return nil
}
SessionsStates.WithLabelValues(sync.Name, "drain").Set(float64(sync.GetDrainState()))

recs = sync.processPings(recs)

Expand Down

0 comments on commit 9468ee8

Please sign in to comment.