diff --git a/measure.go b/measure.go index 8955a55..e53cd7e 100644 --- a/measure.go +++ b/measure.go @@ -77,6 +77,23 @@ func New(prefix string, ds datastore.Datastore) *measure { duErr: metrics.New(prefix+".du.errors_total", "Number of errored Datastore.DiskUsage calls").Counter(), duLatency: metrics.New(prefix+".du.latency_seconds", "Latency distribution of Datastore.DiskUsage calls").Histogram(datastoreLatencyBuckets), + + batchPutNum: metrics.New(prefix+".batchput_total", "Total number of Batch.Put calls").Counter(), + batchPutErr: metrics.New(prefix+".batchput.errors_total", "Number of errored Batch.Put calls").Counter(), + batchPutLatency: metrics.New(prefix+".batchput.latency_seconds", + "Latency distribution of Batch.Put calls").Histogram(datastoreLatencyBuckets), + batchPutSize: metrics.New(prefix+".batchput.size_bytes", + "Size distribution of byte slices put into batches").Histogram(datastoreSizeBuckets), + + batchDeleteNum: metrics.New(prefix+".batchdelete_total", "Total number of Batch.Delete calls").Counter(), + batchDeleteErr: metrics.New(prefix+".batchdelete.errors_total", "Number of errored Batch.Delete calls").Counter(), + batchDeleteLatency: metrics.New(prefix+".batchdelete.latency_seconds", + "Latency distribution of Batch.Delete calls").Histogram(datastoreLatencyBuckets), + + batchCommitNum: metrics.New(prefix+".batchcommit_total", "Total number of Batch.Commit calls").Counter(), + batchCommitErr: metrics.New(prefix+".batchcommit.errors_total", "Number of errored Batch.Commit calls").Counter(), + batchCommitLatency: metrics.New(prefix+".batchcommit.latency_seconds", + "Latency distribution of Batch.Commit calls").Histogram(datastoreLatencyBuckets), } return m } @@ -125,6 +142,19 @@ type measure struct { duNum metrics.Counter duErr metrics.Counter duLatency metrics.Histogram + + batchPutNum metrics.Counter + batchPutErr metrics.Counter + batchPutLatency metrics.Histogram + batchPutSize metrics.Histogram + + batchDeleteNum metrics.Counter + batchDeleteErr metrics.Counter + batchDeleteLatency metrics.Histogram + + batchCommitNum metrics.Counter + batchCommitErr metrics.Counter + batchCommitLatency metrics.Histogram } func recordLatency(h metrics.Histogram, start time.Time) { @@ -251,12 +281,7 @@ func (m *measure) DiskUsage() (uint64, error) { } type measuredBatch struct { - puts int - deletes int - - putts datastore.Batch - delts datastore.Batch - + b datastore.Batch m *measure } @@ -265,64 +290,46 @@ func (m *measure) Batch() (datastore.Batch, error) { if !ok { return nil, datastore.ErrBatchUnsupported } - pb, err := bds.Batch() - if err != nil { - return nil, err - } - - db, err := bds.Batch() + batch, err := bds.Batch() if err != nil { return nil, err } return &measuredBatch{ - putts: pb, - delts: db, - + b: batch, m: m, }, nil } func (mt *measuredBatch) Put(key datastore.Key, val []byte) error { - mt.puts++ - mt.m.putSize.Observe(float64(len(val))) - return mt.putts.Put(key, val) -} - -func (mt *measuredBatch) Delete(key datastore.Key) error { - mt.deletes++ - return mt.delts.Delete(key) -} - -func (mt *measuredBatch) Commit() error { - err := logBatchCommit(mt.delts, mt.deletes, mt.m.deleteNum, mt.m.deleteErr, mt.m.deleteLatency) + defer recordLatency(mt.m.batchPutLatency, time.Now()) + mt.m.batchPutNum.Inc() + mt.m.batchPutSize.Observe(float64(len(val))) + err := mt.b.Put(key, val) if err != nil { - return err + mt.m.batchPutErr.Inc() } + return err +} - err = logBatchCommit(mt.putts, mt.puts, mt.m.putNum, mt.m.putErr, mt.m.putLatency) +func (mt *measuredBatch) Delete(key datastore.Key) error { + defer recordLatency(mt.m.batchDeleteLatency, time.Now()) + mt.m.batchDeleteNum.Inc() + err := mt.b.Delete(key) if err != nil { - return err + mt.m.batchDeleteErr.Inc() } - - return nil + return err } -func logBatchCommit(b datastore.Batch, n int, num, errs metrics.Counter, lat metrics.Histogram) error { - if n > 0 { - before := time.Now() - err := b.Commit() - took := time.Since(before) / time.Duration(n) - num.Add(float64(n)) - for i := 0; i < n; i++ { - lat.Observe(took.Seconds()) - } - if err != nil { - errs.Inc() - return err - } +func (mt *measuredBatch) Commit() error { + defer recordLatency(mt.m.batchCommitLatency, time.Now()) + mt.m.batchCommitNum.Inc() + err := mt.b.Commit() + if err != nil { + mt.m.batchCommitErr.Inc() } - return nil + return err } func (m *measure) Close() error {