diff --git a/muxdb/metrics.go b/muxdb/metrics.go index 65ead18fb..298b75bed 100644 --- a/muxdb/metrics.go +++ b/muxdb/metrics.go @@ -1,4 +1,4 @@ -// Copyright (c) 2024 The VeChainThor developers +// Copyright (c) 2025 The VeChainThor developers // Distributed under the GNU Lesser General Public License v3.0 software license, see the accompanying // file LICENSE or @@ -8,7 +8,140 @@ package muxdb import ( + "fmt" + "math" + "strconv" + "strings" + "text/tabwriter" + "github.com/vechain/thor/v2/metrics" ) -var metricCacheHitMiss = metrics.LazyLoadGaugeVec("cache_hit_miss_count", []string{"type", "event"}) +var ( + metricCacheHitMiss = metrics.LazyLoadGaugeVec("cache_hit_miss_count", []string{"type", "event"}) + metricCompaction = metrics.LazyLoadGaugeVec("compaction_stats_gauge", []string{"level", "type"}) +) + +// CompactionValues holds the values for a specific level. +type CompactionValues struct { + Level string + Tables int64 + SizeMB int64 + TimeSec int64 + ReadMB int64 + WriteMB int64 +} + +// Collects the compaction values from the stats table. +// The format of the stats table is: +/* +Compactions + Level | Tables | Size(MB) | Time(sec) | Read(MB) | Write(MB) +-------+------------+---------------+---------------+---------------+--------------- + 0 | 2 | 224.46577 | 3.25844 | 0.00000 | 1908.26756 + 1 | 29 | 110.98547 | 6.76062 | 2070.73768 | 2054.52797 + 2 | 295 | 1109.32673 | 3.16157 | 883.22560 | 799.85596 + 3 | 2777 | 10206.97173 | 0.33533 | 103.17983 | 91.55081 + 4 | 4100 | 15773.54834 | 6.75241 | 2032.57337 | 1851.48528 +-------+------------+---------------+---------------+---------------+--------------- + Total | 7203 | 27425.29804 | 20.26837 | 5089.71648 | 6705.68758 + +*/ +func collectCompactionValues(stats string) { + // Create a new tabwriter + var sb strings.Builder + w := tabwriter.NewWriter(&sb, 0, 0, 1, ' ', tabwriter.Debug) + + // Print the stats string using the tabwriter + fmt.Fprintln(w, stats) + w.Flush() + + // Extract and log the value from the specified level + formattedStats := sb.String() + logger.Debug(formattedStats) + values, err := extractCompactionValues(formattedStats) + if err != nil { + logger.Error("Failed to extract values for stats %s: %s", stats, err.Error()) + } else { + for _, value := range values { + metricCompaction().SetWithLabel(value.Tables, map[string]string{"level": value.Level, "type": "tables"}) + metricCompaction().SetWithLabel(value.SizeMB, map[string]string{"level": value.Level, "type": "size-mb"}) + metricCompaction().SetWithLabel(value.TimeSec, map[string]string{"level": value.Level, "type": "time-sec"}) + metricCompaction().SetWithLabel(value.ReadMB, map[string]string{"level": value.Level, "type": "read-mb"}) + metricCompaction().SetWithLabel(value.WriteMB, map[string]string{"level": value.Level, "type": "write-mb"}) + } + } +} + +func parseAndRoundFloatToInt64(str string) (int64, error) { + // Parse the string to a float64 + floatValue, err := strconv.ParseFloat(str, 64) + if err != nil { + return 0, err + } + + // Round the float64 value + roundedValue := math.Round(floatValue) + + // Convert the rounded float64 to int64 + intValue := int64(roundedValue) + + return intValue, nil +} + +func extractCompactionValues(stats string) ([]CompactionValues, error) { + lines := strings.Split(stats, "\n") + var values []CompactionValues + + if len(lines) < 6 { + return nil, fmt.Errorf("not enough lines in stats %s", stats) + } + + for _, line := range lines[2 : len(lines)-3] { + columns := strings.Fields(line) + if len(columns) >= 6 { + value, err := parseCompactionColumns(columns) + if err != nil { + return nil, err + } + values = append(values, *value) + } + } + + if len(values) == 0 { + return nil, fmt.Errorf("no valid compaction values found in stats %s", stats) + } + + return values, nil +} + +func parseCompactionColumns(columns []string) (*CompactionValues, error) { + tables, err := strconv.ParseInt(columns[2], 10, 64) + if err != nil { + return nil, fmt.Errorf("error when parsing tables: %v", err) + } + sizeMb, err := parseAndRoundFloatToInt64(columns[4]) + if err != nil { + return nil, fmt.Errorf("error when parsing sizeMb: %v", err) + } + timeSec, err := parseAndRoundFloatToInt64(columns[6]) + if err != nil { + return nil, fmt.Errorf("error when parsing timeSec: %v", err) + } + readMb, err := parseAndRoundFloatToInt64(columns[8]) + if err != nil { + return nil, fmt.Errorf("error when parsing readMb: %v", err) + } + writeMb, err := parseAndRoundFloatToInt64(columns[10]) + if err != nil { + return nil, fmt.Errorf("error when parsing writeMb: %v", err) + } + return &CompactionValues{ + Level: columns[0], + Tables: tables, + SizeMB: sizeMb, + TimeSec: timeSec, + ReadMB: readMb, + WriteMB: writeMb, + }, nil +} diff --git a/muxdb/metrics_test.go b/muxdb/metrics_test.go new file mode 100644 index 000000000..b4fdba2f0 --- /dev/null +++ b/muxdb/metrics_test.go @@ -0,0 +1,36 @@ +// Copyright (c) 2025 The VeChainThor developers + +// Distributed under the GNU Lesser General Public License v3.0 software license, see the accompanying +// file LICENSE or + +// Package muxdb implements the storage layer for block-chain. +// It manages instance of merkle-patricia-trie, and general purpose named kv-store. +package muxdb + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestMetricsCollectCompactionValues(t *testing.T) { + require.NotPanics(t, func() { collectCompactionValues("") }) + require.NotPanics(t, func() { collectCompactionValues("wrong stats") }) + + stats := ` Level | Tables | Size(MB) | Time(sec) | Read(MB) | Write(MB) +-------+------------+---------------+---------------+---------------+--------------- + 0 | 0 | 0.00000 | 0.16840 | 0.00000 | 61.67909 + 1 | 27 | 96.34199 | 1.03280 | 139.39040 | 138.68919 + 2 | 271 | 989.34527 | 0.15046 | 45.49008 | 39.92714 + 3 | 2732 | 10002.10112 | 1.11660 | 128.58780 | 119.32566 + 4 | 3544 | 13591.24199 | 3.38804 | 2059.54114 | 223.60823 +-------+------------+---------------+---------------+---------------+--------------- + Total | 6574 | 24679.03037 | 5.85630 | 2373.00942 | 583.22930 +` + values, err := extractCompactionValues(stats) + + require.Equal(t, nil, err) + require.Equal(t, "0", values[0].Level) + require.Equal(t, int64(27), values[1].Tables) + require.Equal(t, int64(989), values[2].SizeMB) +} diff --git a/muxdb/muxdb.go b/muxdb/muxdb.go index a0f00ae2c..eca46f414 100644 --- a/muxdb/muxdb.go +++ b/muxdb/muxdb.go @@ -10,6 +10,8 @@ package muxdb import ( "context" "encoding/json" + "sync" + "time" "github.com/syndtr/goleveldb/leveldb" dberrors "github.com/syndtr/goleveldb/leveldb/errors" @@ -60,6 +62,29 @@ type Options struct { type MuxDB struct { engine engine.Engine trieBackend *backend + cancelFunc context.CancelFunc + wg sync.WaitGroup +} + +// collectCompactionMetrics collects compaction metrics periodically. +func collectCompactionMetrics(ctx context.Context, ldb *leveldb.DB) { + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + // stats is a table in a single string, so we need to parse it + stats, err := ldb.GetProperty("leveldb.stats") + if err != nil { + logger.Error("Failed to get LevelDB stats: %v", err) + } else { + collectCompactionValues(stats) + } + case <-ctx.Done(): + return + } + } } // Open opens or creates DB at the given path. @@ -103,7 +128,9 @@ func Open(path string, options *Options) (*MuxDB, error) { return nil, err } - return &MuxDB{ + ctx, cancelFunc := context.WithCancel(context.Background()) + + muxdb := &MuxDB{ engine: engine, trieBackend: &backend{ Store: engine, @@ -114,7 +141,16 @@ func Open(path string, options *Options) (*MuxDB, error) { DedupedPtnFactor: cfg.DedupedPtnFactor, CachedNodeTTL: options.TrieCachedNodeTTL, }, - }, nil + cancelFunc: cancelFunc, + } + + muxdb.wg.Add(1) + go func() { + defer muxdb.wg.Done() + collectCompactionMetrics(ctx, ldb) + }() + + return muxdb, nil } // NewMem creates a memory-backed DB. @@ -137,6 +173,13 @@ func NewMem() *MuxDB { // Close closes the DB. func (db *MuxDB) Close() error { + if db.cancelFunc != nil { + db.cancelFunc() + } + + // Wait for all goroutines to finish + db.wg.Wait() + return db.engine.Close() }