Skip to content

Commit

Permalink
add version to sync metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
Termina1 committed Dec 24, 2024
1 parent c5bc7ee commit 391fc26
Showing 1 changed file with 15 additions and 13 deletions.
28 changes: 15 additions & 13 deletions sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ const SyncBlockBits = 28
const SyncBlockMask = (rdx.ID(1) << SyncBlockBits) - 1
const MaxParcelSize = 100_000_000

var version string = fmt.Sprintf("%d", time.Now().Unix())

type SyncHost interface {
protocol.Drainer
Snapshot() pebble.Reader
Expand Down Expand Up @@ -89,18 +91,18 @@ var SessionsStates = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "chotki",
Subsystem: "sync",
Name: "sessions",
}, []string{"id", "kind"})
}, []string{"id", "kind", "version"})
var OpenedSnapshots = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "chotki",
Subsystem: "sync",
Name: "opened_snapshots",
}, []string{"id"})
}, []string{"id", "version"})

var OpenedIterators = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "chotki",
Subsystem: "sync",
Name: "opened_iterators",
}, []string{"id"})
}, []string{"id", "version"})

const TraceSize = 10

Expand Down Expand Up @@ -167,7 +169,7 @@ func (sync *Syncer) Close() error {
if err := sync.snap.Close(); err != nil {
sync.log.ErrorCtx(sync.logCtx(context.Background()), "failed closing snapshot", "err", err.Error())
} else {
OpenedSnapshots.WithLabelValues(sync.Name).Set(0)
OpenedSnapshots.WithLabelValues(sync.Name, version).Set(0)
}
sync.snap = nil
}
Expand All @@ -190,7 +192,7 @@ func (sync *Syncer) Close() error {
sync.vvit = nil
}
if closediterators {
OpenedIterators.WithLabelValues(sync.Name).Set(0)
OpenedIterators.WithLabelValues(sync.Name, version).Set(0)
}

sync.log.InfoCtx(sync.logCtx(context.Background()), fmt.Sprintf("sync: connection %s closed: %v\n", sync.Name, sync.reason))
Expand Down Expand Up @@ -222,7 +224,7 @@ func (sync *Syncer) GetDrainState() SyncState {
}

func (sync *Syncer) Feed(ctx context.Context) (recs protocol.Records, err error) {
SessionsStates.WithLabelValues(sync.Name, "feed").Set(float64(sync.GetFeedState()))
SessionsStates.WithLabelValues(sync.Name, "feed", version).Set(float64(sync.GetFeedState()))
// other side closed the connection already
if sync.GetDrainState() == SendNone {
sync.SetFeedState(ctx, SendNone)
Expand Down Expand Up @@ -258,7 +260,7 @@ func (sync *Syncer) Feed(ctx context.Context) (recs protocol.Records, err error)
if err != nil {
sync.log.ErrorCtx(sync.logCtx(ctx), "sync: failed closing snapshot", "err", err)
} else {
OpenedSnapshots.WithLabelValues(sync.Name).Set(0)
OpenedSnapshots.WithLabelValues(sync.Name, version).Set(0)
}
sync.snap = nil
err = nil
Expand Down Expand Up @@ -304,7 +306,7 @@ func (sync *Syncer) Feed(ctx context.Context) (recs protocol.Records, err error)
if err != nil {
sync.log.ErrorCtx(sync.logCtx(ctx), "sync: failed closing snapshot", "error", err.Error())
} else {
OpenedSnapshots.WithLabelValues(sync.Name).Set(0)
OpenedSnapshots.WithLabelValues(sync.Name, version).Set(0)
}
sync.snap = nil
}
Expand All @@ -329,7 +331,7 @@ func (sync *Syncer) Feed(ctx context.Context) (recs protocol.Records, err error)
func (sync *Syncer) FeedHandshake() (vv protocol.Records, err error) {
sync.snap = sync.Host.Snapshot()

OpenedSnapshots.WithLabelValues(sync.Name).Set(1)
OpenedSnapshots.WithLabelValues(sync.Name, version).Set(1)
sync.vvit = sync.snap.NewIter(&pebble.IterOptions{
LowerBound: []byte{'V'},
UpperBound: []byte{'W'},
Expand All @@ -339,7 +341,7 @@ func (sync *Syncer) FeedHandshake() (vv protocol.Records, err error) {
UpperBound: []byte{'P'},
})

OpenedIterators.WithLabelValues(sync.Name).Set(1)
OpenedIterators.WithLabelValues(sync.Name, version).Set(1)

ok := sync.vvit.SeekGE(VKey0)
if !ok || 0 != bytes.Compare(sync.vvit.Key(), VKey0) {
Expand Down Expand Up @@ -501,13 +503,13 @@ func (sync *Syncer) FeedDiffVV(ctx context.Context) (vv protocol.Records, err er
sync.vvit = nil
}
if closediterators {
OpenedIterators.WithLabelValues(sync.Name).Set(0)
OpenedIterators.WithLabelValues(sync.Name, version).Set(0)
}
return
}

func (sync *Syncer) SetFeedState(ctx context.Context, state SyncState) {
SessionsStates.WithLabelValues(sync.Name, "feed").Set(float64(state))
SessionsStates.WithLabelValues(sync.Name, "feed", version).Set(float64(state))
sync.log.InfoCtx(sync.logCtx(ctx), "sync: feed state", "state", state.String())
sync.lock.Lock()
sync.feedState = state
Expand All @@ -516,7 +518,7 @@ func (sync *Syncer) SetFeedState(ctx context.Context, state SyncState) {

func (sync *Syncer) SetDrainState(ctx context.Context, state SyncState) {
sync.log.InfoCtx(sync.logCtx(ctx), "sync: drain state", "state", state.String())
SessionsStates.WithLabelValues(sync.Name, "drain").Set(float64(state))
SessionsStates.WithLabelValues(sync.Name, "drain", version).Set(float64(state))
sync.lock.Lock()
sync.drainState = state
if sync.cond.L == nil {
Expand Down

0 comments on commit 391fc26

Please sign in to comment.