From 08af7217b1a2a17a3e70637c3c444f28132535a6 Mon Sep 17 00:00:00 2001 From: Roy Li Date: Mon, 13 May 2024 17:01:32 -0400 Subject: [PATCH 1/3] Implement locking in CommitMultiStore --- store/cachemulti/locking_test.go | 363 ++++++++++++++++++++++++++++++ store/cachemulti/store.go | 183 ++++++++------- store/lockingkv/lockingkv.go | 252 --------------------- store/lockingkv/lockingkv_test.go | 184 --------------- store/types/store.go | 38 +--- store/types/store_test.go | 4 +- 6 files changed, 474 insertions(+), 550 deletions(-) create mode 100644 store/cachemulti/locking_test.go delete mode 100644 store/lockingkv/lockingkv.go delete mode 100644 store/lockingkv/lockingkv_test.go diff --git a/store/cachemulti/locking_test.go b/store/cachemulti/locking_test.go new file mode 100644 index 00000000000..d590890e42c --- /dev/null +++ b/store/cachemulti/locking_test.go @@ -0,0 +1,363 @@ +package cachemulti_test + +import ( + "sync" + "testing" + "time" + + "cosmossdk.io/log" + "cosmossdk.io/store/metrics" + pruningtypes "cosmossdk.io/store/pruning/types" + "cosmossdk.io/store/rootmulti" + "cosmossdk.io/store/types" + dbm "github.com/cosmos/cosmos-db" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestStore_LinearizeReadsAndWrites(t *testing.T) { + key := []byte("kv_store_key") + storeKey := types.NewKVStoreKey("store1") + lockKey := []byte("a") + + db := dbm.NewMemDB() + store := rootmulti.NewStore(db, log.NewNopLogger(), metrics.NewNoOpMetrics()) + store.SetPruning(pruningtypes.NewPruningOptions(pruningtypes.PruningNothing)) + store.MountStoreWithDB(storeKey, types.StoreTypeIAVL, db) + err := store.LoadLatestVersion() + assert.NoError(t, err) + lockingCms := store.LockingCacheMultiStore() + + wg := sync.WaitGroup{} + wg.Add(100) + for i := 0; i < 100; i++ { + go func() { + defer wg.Done() + + lockingCms.Lock([][]byte{lockKey}) + defer lockingCms.Unlock([][]byte{lockKey}) + kvStore := lockingCms.GetKVStore(storeKey) + v := kvStore.Get(key) + if v == nil { + kvStore.Set(key, []byte{1}) + } else { + v[0]++ + kvStore.Set(key, v) + } + lockingCms.Write() + }() + } + + wg.Wait() + require.Equal(t, []byte{100}, lockingCms.GetKVStore(storeKey).Get(key)) +} + +func TestStore_LockOrderToPreventDeadlock(t *testing.T) { + key := []byte("kv_store_key") + storeKey := types.NewKVStoreKey("store1") + lockKeyA := []byte("a") + lockKeyB := []byte("b") + + db := dbm.NewMemDB() + store := rootmulti.NewStore(db, log.NewNopLogger(), metrics.NewNoOpMetrics()) + store.SetPruning(pruningtypes.NewPruningOptions(pruningtypes.PruningNothing)) + store.MountStoreWithDB(storeKey, types.StoreTypeIAVL, db) + err := store.LoadLatestVersion() + assert.NoError(t, err) + lockingCms := store.LockingCacheMultiStore() + + // Acquire keys in two different orders ensuring that we don't reach deadlock. 
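+	// This is safe because Lock sorts the requested keys into a deterministic
+	// order before acquiring them (see keysToSortedStrings), so both goroutines
+	// contend for the same first lock instead of each holding one lock while
+	// waiting on the other.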
+ wg := sync.WaitGroup{} + wg.Add(200) + for i := 0; i < 100; i++ { + go func() { + defer wg.Done() + + lockingCms.Lock([][]byte{lockKeyA, lockKeyB}) + defer lockingCms.Unlock([][]byte{lockKeyA, lockKeyB}) + kvStore := lockingCms.GetKVStore(storeKey) + v := kvStore.Get(key) + if v == nil { + kvStore.Set(key, []byte{1}) + } else { + v[0]++ + kvStore.Set(key, v) + } + lockingCms.Write() + }() + + go func() { + defer wg.Done() + + lockingCms.Lock([][]byte{lockKeyB, lockKeyA}) + defer lockingCms.Unlock([][]byte{lockKeyB, lockKeyA}) + kvStore := lockingCms.GetKVStore(storeKey) + v := kvStore.Get(key) + if v == nil { + kvStore.Set(key, []byte{1}) + } else { + v[0]++ + kvStore.Set(key, v) + } + lockingCms.Write() + }() + } + + wg.Wait() + require.Equal(t, []byte{200}, lockingCms.GetKVStore(storeKey).Get(key)) +} + +func TestStore_AllowForParallelUpdates(t *testing.T) { + storeKey := types.NewKVStoreKey("store1") + + db := dbm.NewMemDB() + store := rootmulti.NewStore(db, log.NewNopLogger(), metrics.NewNoOpMetrics()) + store.SetPruning(pruningtypes.NewPruningOptions(pruningtypes.PruningNothing)) + store.MountStoreWithDB(storeKey, types.StoreTypeIAVL, db) + err := store.LoadLatestVersion() + assert.NoError(t, err) + lockingCms := store.LockingCacheMultiStore() + + wg := sync.WaitGroup{} + wg.Add(100) + + for i := byte(0); i < 100; i++ { + k := []byte{i} + go func() { + defer wg.Done() + + // We specifically don't unlock the keys during processing so that we can show that we must process all + // of these in parallel before the wait group is done. + lockingCms.Lock([][]byte{k}) + lockingCms.GetKVStore(storeKey).Set(k, k) + lockingCms.Write() + }() + } + + wg.Wait() + for i := byte(0); i < 100; i++ { + lockingCms.Unlock([][]byte{{i}}) + } + for i := byte(0); i < 100; i++ { + require.Equal(t, []byte{i}, lockingCms.GetKVStore(storeKey).Get([]byte{i})) + } +} + +func TestStore_AddLocksDuringTransaction(t *testing.T) { + key := []byte("kv_store_key") + storeKey := types.NewKVStoreKey("store1") + lockKey := []byte("lockkey") + + db := dbm.NewMemDB() + store := rootmulti.NewStore(db, log.NewNopLogger(), metrics.NewNoOpMetrics()) + store.SetPruning(pruningtypes.NewPruningOptions(pruningtypes.PruningNothing)) + store.MountStoreWithDB(storeKey, types.StoreTypeIAVL, db) + err := store.LoadLatestVersion() + assert.NoError(t, err) + lockingCms := store.LockingCacheMultiStore() + + wg := sync.WaitGroup{} + wg.Add(100) + for i := byte(0); i < 100; i++ { + k := []byte{i} + go func() { + defer wg.Done() + + lockingCms.Lock([][]byte{k}) + defer lockingCms.Unlock([][]byte{k}) + lockingCms.GetKVStore(storeKey).Set(k, k) + + lockingCms.Lock([][]byte{lockKey}) + defer lockingCms.Unlock([][]byte{lockKey}) + kvStore := lockingCms.GetKVStore(storeKey) + v := kvStore.Get(key) + if v == nil { + kvStore.Set(key, []byte{1}) + } else { + v[0]++ + kvStore.Set(key, v) + } + lockingCms.Write() + }() + } + + wg.Wait() + for i := byte(0); i < 100; i++ { + require.Equal(t, []byte{i}, lockingCms.GetKVStore(storeKey).Get([]byte{i})) + } + require.Equal(t, []byte{100}, lockingCms.GetKVStore(storeKey).Get(key)) +} + +func TestStore_MaintainLockOverMultipleTransactions(t *testing.T) { + keyA := []byte("kv_store_key_a") + keyB := []byte("kv_store_key_b") + storeKey := types.NewKVStoreKey("store1") + lockKeyA := []byte("lockkeya") + lockKeyB := []byte("lockkeyb") + + db := dbm.NewMemDB() + store := rootmulti.NewStore(db, log.NewNopLogger(), metrics.NewNoOpMetrics()) + 
store.SetPruning(pruningtypes.NewPruningOptions(pruningtypes.PruningNothing)) + store.MountStoreWithDB(storeKey, types.StoreTypeIAVL, db) + err := store.LoadLatestVersion() + assert.NoError(t, err) + lockingCms := store.LockingCacheMultiStore() + + // Key A is set differently in the first and second transaction so we can check it + // to see what transaction was run last. + lockingCms.GetKVStore(storeKey).Set(keyA, []byte{0}) + lockingCms.GetKVStore(storeKey).Set(keyB, []byte{0}) + + wg := sync.WaitGroup{} + wg.Add(100) + for i := byte(0); i < 100; i++ { + k := []byte{i} + go func() { + defer wg.Done() + + lockingCms.Lock([][]byte{k}) + defer lockingCms.Unlock([][]byte{k}) + lockingCms.GetKVStore(storeKey).Set(k, k) + + lockingCms.Lock([][]byte{lockKeyA}) + defer lockingCms.Unlock([][]byte{lockKeyA}) + + func() { + lockingCms.Lock([][]byte{lockKeyB}) + defer lockingCms.Unlock([][]byte{lockKeyB}) + + assert.Equal(t, []byte{0}, lockingCms.GetKVStore(storeKey).Get(keyA)) + lockingCms.GetKVStore(storeKey).Set(keyA, []byte{1}) + v := lockingCms.GetKVStore(storeKey).Get(keyB) + v[0]++ + lockingCms.GetKVStore(storeKey).Set(keyB, v) + lockingCms.Write() + }() + + func() { + lockingCms.Lock([][]byte{lockKeyB}) + defer lockingCms.Unlock([][]byte{lockKeyB}) + + assert.Equal(t, []byte{1}, lockingCms.GetKVStore(storeKey).Get(keyA)) + lockingCms.GetKVStore(storeKey).Set(keyA, []byte{0}) + v := lockingCms.GetKVStore(storeKey).Get(keyB) + v[0]++ + lockingCms.GetKVStore(storeKey).Set(keyB, v) + lockingCms.Write() + }() + }() + } + + wg.Wait() + require.Equal(t, []byte{200}, lockingCms.GetKVStore(storeKey).Get(keyB)) +} + +func TestStore_ReadWriteLock(t *testing.T) { + numReadersKey := []byte("kv_store_key_a") + numWritersKey := []byte("kv_store_key_b") + maxNumReadersKey := []byte("kv_store_key_c") + maxNumWritersKey := []byte("kv_store_key_d") + storeKey := types.NewKVStoreKey("store1") + rwLockKey := []byte("lockkeya") + lockKey := []byte("lockkeyb") + + db := dbm.NewMemDB() + store := rootmulti.NewStore(db, log.NewNopLogger(), metrics.NewNoOpMetrics()) + store.SetPruning(pruningtypes.NewPruningOptions(pruningtypes.PruningNothing)) + store.MountStoreWithDB(storeKey, types.StoreTypeIAVL, db) + err := store.LoadLatestVersion() + assert.NoError(t, err) + lockingCms := store.LockingCacheMultiStore() + + lockingCms.GetKVStore(storeKey).Set(numReadersKey, []byte{0}) + lockingCms.GetKVStore(storeKey).Set(numWritersKey, []byte{0}) + lockingCms.GetKVStore(storeKey).Set(maxNumReadersKey, []byte{0}) + lockingCms.GetKVStore(storeKey).Set(maxNumWritersKey, []byte{0}) + + wg := sync.WaitGroup{} + wg.Add(200) + // Start 100 readers and 100 writers. Record the maximum number of readers and writers seen. 
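+	// Readers may hold the RW lock concurrently, so the recorded maximum number
+	// of readers should exceed one; writers hold it exclusively, so the recorded
+	// maximum number of writers must stay at exactly one.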
+	for i := 0; i < 100; i++ {
+		go func() {
+			defer wg.Done()
+
+			lockingCms.RLockRW([][]byte{rwLockKey})
+			defer lockingCms.RUnlockRW([][]byte{rwLockKey})
+
+			func() {
+				lockingCms.Lock([][]byte{lockKey})
+				defer lockingCms.Unlock([][]byte{lockKey})
+				v := lockingCms.GetKVStore(storeKey).Get(numReadersKey)
+				v[0]++
+				lockingCms.GetKVStore(storeKey).Set(numReadersKey, v)
+				lockingCms.Write()
+			}()
+
+			time.Sleep(100 * time.Millisecond)
+
+			func() {
+				lockingCms.Lock([][]byte{lockKey})
+				defer lockingCms.Unlock([][]byte{lockKey})
+				numReaders := lockingCms.GetKVStore(storeKey).Get(numReadersKey)[0]
+				maxNumReaders := lockingCms.GetKVStore(storeKey).Get(maxNumReadersKey)[0]
+				if numReaders > maxNumReaders {
+					lockingCms.GetKVStore(storeKey).Set(maxNumReadersKey, []byte{numReaders})
+				}
+				lockingCms.Write()
+			}()
+
+			func() {
+				lockingCms.Lock([][]byte{lockKey})
+				defer lockingCms.Unlock([][]byte{lockKey})
+				v := lockingCms.GetKVStore(storeKey).Get(numReadersKey)
+				v[0]--
+				lockingCms.GetKVStore(storeKey).Set(numReadersKey, v)
+				lockingCms.Write()
+			}()
+		}()
+
+		go func() {
+			defer wg.Done()
+
+			lockingCms.LockRW([][]byte{rwLockKey})
+			defer lockingCms.UnlockRW([][]byte{rwLockKey})
+
+			func() {
+				lockingCms.Lock([][]byte{lockKey})
+				defer lockingCms.Unlock([][]byte{lockKey})
+				v := lockingCms.GetKVStore(storeKey).Get(numWritersKey)
+				v[0]++
+				lockingCms.GetKVStore(storeKey).Set(numWritersKey, v)
+				lockingCms.Write()
+			}()
+
+			func() {
+				lockingCms.Lock([][]byte{lockKey})
+				defer lockingCms.Unlock([][]byte{lockKey})
+				numWriters := lockingCms.GetKVStore(storeKey).Get(numWritersKey)[0]
+				maxNumWriters := lockingCms.GetKVStore(storeKey).Get(maxNumWritersKey)[0]
+				if numWriters > maxNumWriters {
+					lockingCms.GetKVStore(storeKey).Set(maxNumWritersKey, []byte{numWriters})
+				}
+				lockingCms.Write()
+			}()
+
+			func() {
+				lockingCms.Lock([][]byte{lockKey})
+				defer lockingCms.Unlock([][]byte{lockKey})
+				v := lockingCms.GetKVStore(storeKey).Get(numWritersKey)
+				v[0]--
+				lockingCms.GetKVStore(storeKey).Set(numWritersKey, v)
+				lockingCms.Write()
+			}()
+		}()
+	}
+
+	wg.Wait()
+	// At some point there should be more than one concurrent reader. If this test
+	// proves flaky, the reader's sleep time can be increased to deflake it.
+	require.Less(t, []byte{1}, lockingCms.GetKVStore(storeKey).Get(maxNumReadersKey))
+	// There must never be more than one writer at a time.
+	require.Equal(t, []byte{1}, lockingCms.GetKVStore(storeKey).Get(maxNumWritersKey))
+}
diff --git a/store/cachemulti/store.go b/store/cachemulti/store.go
index 722af21f153..251d104c27c 100644
--- a/store/cachemulti/store.go
+++ b/store/cachemulti/store.go
@@ -3,14 +3,14 @@ package cachemulti
import (
	"fmt"
	"io"
-
-	dbm "github.com/cosmos/cosmos-db"
+	"sync"

	"cosmossdk.io/store/cachekv"
	"cosmossdk.io/store/dbadapter"
-	"cosmossdk.io/store/lockingkv"
	"cosmossdk.io/store/tracekv"
	"cosmossdk.io/store/types"
+	dbm "github.com/cosmos/cosmos-db"
+	"golang.org/x/exp/slices"
)

// storeNameCtxKey is the TraceContext metadata key that identifies
@@ -31,19 +31,24 @@ type Store struct {
	traceWriter  io.Writer
	traceContext types.TraceContext
+
+	locks *sync.Map // map from string key to *sync.Mutex or *sync.RWMutex
}

var (
	_ types.CacheMultiStore = Store{}
-	_ types.LockingStore    = Store{}
)

// NewFromKVStore creates a new Store object from a mapping of store keys to
// CacheWrapper objects and a KVStore as the database. Each CacheWrapper store
// is a branched store.
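+// The locks map is shared with every store branched from this one (see
+// newCacheMultiStoreFromCMS), so all branches synchronize on the same per-key
+// mutexes. NewStore passes a nil map, in which case the Lock/Unlock family of
+// methods must not be used; NewLockingStore seeds a fresh sync.Map.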
func NewFromKVStore( - store types.KVStore, stores map[types.StoreKey]types.CacheWrapper, - keys map[string]types.StoreKey, traceWriter io.Writer, traceContext types.TraceContext, + store types.KVStore, + stores map[types.StoreKey]types.CacheWrapper, + keys map[string]types.StoreKey, + traceWriter io.Writer, + traceContext types.TraceContext, + locks *sync.Map, ) Store { cms := Store{ db: cachekv.NewStore(store), @@ -51,6 +56,7 @@ func NewFromKVStore( keys: keys, traceWriter: traceWriter, traceContext: traceContext, + locks: locks, } for key, store := range stores { @@ -67,46 +73,13 @@ func NewFromKVStore( return cms } -// NewLockingFromKVStore creates a new Store object from a mapping of store keys to -// CacheWrapper objects and a KVStore as the database. Each CacheWrapper store -// is a branched store. -func NewLockingFromKVStore( - store types.KVStore, stores map[types.StoreKey]types.CacheWrapper, - keys map[string]types.StoreKey, traceWriter io.Writer, traceContext types.TraceContext, -) Store { - cms := Store{ - db: cachekv.NewStore(store), - stores: make(map[types.StoreKey]types.CacheWrap, len(stores)), - keys: keys, - traceWriter: traceWriter, - traceContext: traceContext, - } - - for key, store := range stores { - if cms.TracingEnabled() { - tctx := cms.traceContext.Clone().Merge(types.TraceContext{ - storeNameCtxKey: key.Name(), - }) - - store = tracekv.NewStore(store.(types.KVStore), cms.traceWriter, tctx) - } - if kvStoreKey, ok := key.(*types.KVStoreKey); ok && kvStoreKey.IsLocking() { - cms.stores[key] = lockingkv.NewStore(store.(types.KVStore)) - } else { - cms.stores[key] = cachekv.NewStore(store.(types.KVStore)) - } - } - - return cms -} - // NewStore creates a new Store object from a mapping of store keys to // CacheWrapper objects. Each CacheWrapper store is a branched store. func NewStore( db dbm.DB, stores map[types.StoreKey]types.CacheWrapper, keys map[string]types.StoreKey, traceWriter io.Writer, traceContext types.TraceContext, ) Store { - return NewFromKVStore(dbadapter.Store{DB: db}, stores, keys, traceWriter, traceContext) + return NewFromKVStore(dbadapter.Store{DB: db}, stores, keys, traceWriter, traceContext, nil) } // NewLockingStore creates a new Store object from a mapping of store keys to @@ -115,7 +88,14 @@ func NewLockingStore( db dbm.DB, stores map[types.StoreKey]types.CacheWrapper, keys map[string]types.StoreKey, traceWriter io.Writer, traceContext types.TraceContext, ) Store { - return NewLockingFromKVStore(dbadapter.Store{DB: db}, stores, keys, traceWriter, traceContext) + return NewFromKVStore( + dbadapter.Store{DB: db}, + stores, + keys, + traceWriter, + traceContext, + &sync.Map{}, + ) } func newCacheMultiStoreFromCMS(cms Store) Store { @@ -124,7 +104,7 @@ func newCacheMultiStoreFromCMS(cms Store) Store { stores[k] = v } - return NewFromKVStore(cms.db, stores, nil, cms.traceWriter, cms.traceContext) + return NewFromKVStore(cms.db, stores, nil, cms.traceWriter, cms.traceContext, cms.locks) } // SetTracer sets the tracer for the MultiStore that the underlying @@ -173,13 +153,88 @@ func (cms Store) Write() { } } -// Unlock calls Unlock on each underlying LockingStore. -func (cms Store) Unlock() { - for _, store := range cms.stores { - if s, ok := store.(types.LockingStore); ok { - s.Unlock() +// Lock, Unlock, RLockRW, LockRW, RUnlockRW, UnlockRW constitute a permissive locking interface +// that can be used to synchronize concurrent access to the store. Locking of a key should +// represent locking of some part of the store. 
Note that improper access is not enforced, and it is +// the user's responsibility to ensure proper locking of any access by concurrent goroutines. +// +// Common mistakes may include: +// - Introducing data races by reading or writing state that is claimed by a competing goroutine +// - Introducing deadlocks by locking in different orders through multiple calls to locking methods. +// i.e. if A calls Lock(a) followed by Lock(b), and B calls Lock(b) followed by Lock(a) +// - Using a key as an exclusive lock after it has already been initialized as a read-write lock + +// Lock acquires exclusive locks on a set of keys. +func (cms Store) Lock(keys [][]byte) { + for _, stringKey := range keysToSortedStrings(keys) { + v, _ := cms.locks.LoadOrStore(stringKey, &sync.Mutex{}) + lock := v.(*sync.Mutex) + lock.Lock() + } +} + +// Unlock releases exclusive locks on a set of keys. +func (cms Store) Unlock(keys [][]byte) { + for _, key := range keys { + v, ok := cms.locks.Load(string(key)) + if !ok { + panic("Key not found") + } + lock := v.(*sync.Mutex) + lock.Unlock() + } +} + +// RLockRW acquires read locks on a set of keys. +func (cms Store) RLockRW(keys [][]byte) { + for _, stringKey := range keysToSortedStrings(keys) { + v, _ := cms.locks.LoadOrStore(stringKey, &sync.RWMutex{}) + lock := v.(*sync.RWMutex) + lock.RLock() + } +} + +// LockRW acquires write locks on a set of keys. +func (cms Store) LockRW(keys [][]byte) { + for _, stringKey := range keysToSortedStrings(keys) { + v, _ := cms.locks.LoadOrStore(stringKey, &sync.RWMutex{}) + lock := v.(*sync.RWMutex) + lock.Lock() + } +} + +// RUnlockRW releases read locks on a set of keys. +func (cms Store) RUnlockRW(keys [][]byte) { + for _, key := range keys { + v, ok := cms.locks.Load(string(key)) + if !ok { + panic("Key not found") + } + lock := v.(*sync.RWMutex) + lock.RUnlock() + } +} + +// UnlockRW releases write locks on a set of keys. +func (cms Store) UnlockRW(keys [][]byte) { + for _, key := range keys { + v, ok := cms.locks.Load(string(key)) + if !ok { + panic("Key not found") } + lock := v.(*sync.RWMutex) + lock.Unlock() + } +} + +func keysToSortedStrings(keys [][]byte) []string { + // Ensure that we always operate in a deterministic ordering when acquiring locks to prevent deadlock. + stringLockedKeys := make([]string, len(keys)) + for i, key := range keys { + stringLockedKeys[i] = string(key) } + slices.Sort(stringLockedKeys) + return stringLockedKeys } // Implements CacheWrapper. @@ -197,40 +252,6 @@ func (cms Store) CacheMultiStore() types.CacheMultiStore { return newCacheMultiStoreFromCMS(cms) } -// CacheMultiStoreWithLocking branches each store wrapping each store with a cachekv store if not locked or -// delegating to CacheWrapWithLocks if it is a LockingCacheWrapper. 
-func (cms Store) CacheMultiStoreWithLocking(storeLocks map[types.StoreKey][][]byte) types.CacheMultiStore { - stores := make(map[types.StoreKey]types.CacheWrapper) - for k, v := range cms.stores { - stores[k] = v - } - - cms2 := Store{ - db: cachekv.NewStore(cms.db), - stores: make(map[types.StoreKey]types.CacheWrap, len(stores)), - keys: cms.keys, - traceWriter: cms.traceWriter, - traceContext: cms.traceContext, - } - - for key, store := range stores { - if lockKeys, ok := storeLocks[key]; ok { - cms2.stores[key] = store.(types.LockingCacheWrapper).CacheWrapWithLocks(lockKeys) - } else { - if cms.TracingEnabled() { - tctx := cms.traceContext.Clone().Merge(types.TraceContext{ - storeNameCtxKey: key.Name(), - }) - - store = tracekv.NewStore(store.(types.KVStore), cms.traceWriter, tctx) - } - cms2.stores[key] = cachekv.NewStore(store.(types.KVStore)) - } - } - - return cms2 -} - // CacheMultiStoreWithVersion implements the MultiStore interface. It will panic // as an already cached multi-store cannot load previous versions. // diff --git a/store/lockingkv/lockingkv.go b/store/lockingkv/lockingkv.go deleted file mode 100644 index 7038ba07cb6..00000000000 --- a/store/lockingkv/lockingkv.go +++ /dev/null @@ -1,252 +0,0 @@ -package lockingkv - -import ( - "io" - "sort" - "sync" - - "golang.org/x/exp/slices" - - "cosmossdk.io/store/cachekv" - "cosmossdk.io/store/tracekv" - storetypes "cosmossdk.io/store/types" -) - -var ( - _ storetypes.CacheKVStore = &LockableKV{} - _ storetypes.LockingCacheWrapper = &LockableKV{} - _ storetypes.CacheKVStore = &LockedKV{} - _ storetypes.LockingStore = &LockedKV{} -) - -func NewStore(parent storetypes.KVStore) *LockableKV { - return &LockableKV{ - parent: parent, - locks: sync.Map{}, - } -} - -// LockableKV is a store that is able to provide locks. Each locking key that is used for a lock must represent a -// disjoint partition of store keys that are able to be mutated. For example, locking per account public key would -// provide a lock over all mutations related to that account. -type LockableKV struct { - parent storetypes.KVStore - locks sync.Map // map from string key to *sync.Mutex. - mutations sync.Map // map from string key to []byte. -} - -func (s *LockableKV) Write() { - s.locks.Range(func(key, value any) bool { - lock := value.(*sync.Mutex) - // We should be able to acquire the lock and only would not be able to if for some reason a child - // store was not unlocked. - if !lock.TryLock() { - panic("LockedKV is missing Unlock() invocation.") - } - - // We specifically don't unlock here which prevents users from acquiring the locks again and - // mutating the values allowing the Write() invocation only to happen once effectively. - - return true - }) - - values := make(map[string][]byte) - s.mutations.Range(func(key, value any) bool { - values[key.(string)] = value.([]byte) - return true - }) - - // We need to make the mutations to the parent in a deterministic order to ensure a deterministic hash. - for _, sortedKey := range getSortedKeys[sort.StringSlice](values) { - value := values[sortedKey] - - if value == nil { - s.parent.Delete([]byte(sortedKey)) - } else { - s.parent.Set([]byte(sortedKey), value) - } - } -} - -func (s *LockableKV) GetStoreType() storetypes.StoreType { - return s.parent.GetStoreType() -} - -// CacheWrap allows for branching the store. Care must be taken to ensure that synchronization outside of this -// store is performed to ensure that reads and writes are linearized. 
-func (s *LockableKV) CacheWrap() storetypes.CacheWrap { - return cachekv.NewStore(s) -} - -// CacheWrapWithTrace allows for branching the store with tracing. Care must be taken to ensure that synchronization -// outside of this store is performed to ensure that reads and writes are linearized. -func (s *LockableKV) CacheWrapWithTrace(w io.Writer, tc storetypes.TraceContext) storetypes.CacheWrap { - return cachekv.NewStore(tracekv.NewStore(s, w, tc)) -} - -// CacheWrapWithLocks returns a store that allows mutating a set of store keys that are covered by the -// set of lock keys. Each lock key should represent a disjoint partitioned space of store keys for which -// the caller is acquiring locks for. -func (s *LockableKV) CacheWrapWithLocks(lockKeys [][]byte) storetypes.CacheWrap { - stringLockedKeys := make([]string, len(lockKeys)) - for i, key := range lockKeys { - stringLockedKeys[i] = string(key) - } - // Ensure that we always operate in a deterministic ordering when acquiring locks to prevent deadlock. - slices.Sort(stringLockedKeys) - for _, stringKey := range stringLockedKeys { - v, _ := s.locks.LoadOrStore(stringKey, &sync.Mutex{}) - lock := v.(*sync.Mutex) - lock.Lock() - } - - return &LockedKV{ - parent: s, - lockKeys: stringLockedKeys, - mutations: make(map[string][]byte), - } -} - -func (s *LockableKV) Get(key []byte) []byte { - v, loaded := s.mutations.Load(string(key)) - if loaded { - return v.([]byte) - } - - return s.parent.Get(key) -} - -func (s *LockableKV) Has(key []byte) bool { - v, loaded := s.mutations.Load(string(key)) - if loaded { - return v.([]byte) != nil - } - - return s.parent.Has(key) -} - -func (s *LockableKV) Set(key, value []byte) { - s.mutations.Store(string(key), value) -} - -func (s *LockableKV) Delete(key []byte) { - s.Set(key, nil) -} - -func (s *LockableKV) Iterator(start, end []byte) storetypes.Iterator { - panic("This store does not support iteration.") -} - -func (s *LockableKV) ReverseIterator(start, end []byte) storetypes.Iterator { - panic("This store does not support iteration.") -} - -func (s *LockableKV) writeMutations(mutations map[string][]byte) { - // We don't need to sort here since the sync.Map stores keys and values in an arbitrary order. - // LockableKV.Write is responsible for sorting all the keys to ensure a deterministic write order. - for key, mutation := range mutations { - s.mutations.Store(key, mutation) - } -} - -func (s *LockableKV) unlock(lockKeys []string) { - for _, key := range lockKeys { - v, ok := s.locks.Load(key) - if !ok { - panic("Key not found") - } - lock := v.(*sync.Mutex) - - lock.Unlock() - } -} - -// LockedKV is a store that only allows setting of keys that have been locked via CacheWrapWithLocks. -// All other keys are allowed to be read but the user must ensure that no one else is able to mutate those -// values without the appropriate synchronization occurring outside of this store. -// -// This store does not support iteration. 
-type LockedKV struct { - parent *LockableKV - - lockKeys []string - mutations map[string][]byte -} - -func (s *LockedKV) Write() { - s.parent.writeMutations(s.mutations) -} - -func (s *LockedKV) Unlock() { - s.parent.unlock(s.lockKeys) -} - -func (s *LockedKV) GetStoreType() storetypes.StoreType { - return s.parent.GetStoreType() -} - -func (s *LockedKV) CacheWrap() storetypes.CacheWrap { - return cachekv.NewStore(s) -} - -func (s *LockedKV) CacheWrapWithTrace(w io.Writer, tc storetypes.TraceContext) storetypes.CacheWrap { - return cachekv.NewStore(tracekv.NewStore(s, w, tc)) -} - -func (s *LockedKV) Get(key []byte) []byte { - if key == nil { - panic("nil key") - } - - if value, ok := s.mutations[string(key)]; ok { - return value - } - - return s.parent.Get(key) -} - -func (s *LockedKV) Has(key []byte) bool { - if key == nil { - panic("nil key") - } - - if value, ok := s.mutations[string(key)]; ok { - return value != nil - } - - return s.parent.Has(key) -} - -func (s *LockedKV) Set(key, value []byte) { - if key == nil { - panic("nil key") - } - - s.mutations[string(key)] = value -} - -func (s *LockedKV) Delete(key []byte) { - s.Set(key, nil) -} - -func (s *LockedKV) Iterator(start, end []byte) storetypes.Iterator { - panic("This store does not support iteration.") -} - -func (s *LockedKV) ReverseIterator(start, end []byte) storetypes.Iterator { - panic("This store does not support iteration.") -} - -// getSortedKeys returns the keys of the map in sorted order. -func getSortedKeys[R interface { - ~[]K - sort.Interface -}, K comparable, V any](m map[K]V, -) []K { - keys := make([]K, 0, len(m)) - for k := range m { - keys = append(keys, k) - } - sort.Sort(R(keys)) - return keys -} diff --git a/store/lockingkv/lockingkv_test.go b/store/lockingkv/lockingkv_test.go deleted file mode 100644 index 162b362e679..00000000000 --- a/store/lockingkv/lockingkv_test.go +++ /dev/null @@ -1,184 +0,0 @@ -package lockingkv_test - -import ( - "sync" - "testing" - - "github.com/stretchr/testify/require" - - "cosmossdk.io/store/lockingkv" - "cosmossdk.io/store/transient" - storetypes "cosmossdk.io/store/types" -) - -var ( - a = []byte("a") - b = []byte("b") - key = []byte("key") -) - -func TestLockingKV_LinearizeReadsAndWrites(t *testing.T) { - parent := transient.NewStore() - locking := lockingkv.NewStore(parent) - - wg := sync.WaitGroup{} - wg.Add(100) - for i := 0; i < 100; i++ { - go func() { - defer wg.Done() - - locked := locking.CacheWrapWithLocks([][]byte{a}) - defer locked.(storetypes.LockingStore).Unlock() - v := locked.(storetypes.KVStore).Get(key) - if v == nil { - locked.(storetypes.KVStore).Set(key, []byte{1}) - } else { - v[0]++ - locked.(storetypes.KVStore).Set(key, v) - } - locked.Write() - }() - } - - wg.Wait() - require.Equal(t, []byte{100}, locking.Get(key)) -} - -func TestLockingKV_LockOrderToPreventDeadlock(t *testing.T) { - parent := transient.NewStore() - locking := lockingkv.NewStore(parent) - - // Acquire keys in two different orders ensuring that we don't reach deadlock. 
- wg := sync.WaitGroup{} - wg.Add(200) - for i := 0; i < 100; i++ { - go func() { - defer wg.Done() - - locked := locking.CacheWrapWithLocks([][]byte{a, b}) - defer locked.(storetypes.LockingStore).Unlock() - v := locked.(storetypes.KVStore).Get(key) - if v == nil { - locked.(storetypes.KVStore).Set(key, []byte{1}) - } else { - v[0]++ - locked.(storetypes.KVStore).Set(key, v) - } - locked.Write() - }() - - go func() { - defer wg.Done() - - locked := locking.CacheWrapWithLocks([][]byte{b, a}) - defer locked.(storetypes.LockingStore).Unlock() - v := locked.(storetypes.KVStore).Get(key) - if v == nil { - locked.(storetypes.KVStore).Set(key, []byte{1}) - } else { - v[0]++ - locked.(storetypes.KVStore).Set(key, v) - } - locked.Write() - }() - } - - wg.Wait() - require.Equal(t, []byte{200}, locking.Get(key)) -} - -func TestLockingKV_AllowForParallelUpdates(t *testing.T) { - parent := transient.NewStore() - locking := lockingkv.NewStore(parent) - - wg := sync.WaitGroup{} - wg.Add(100) - - lockeds := make([]storetypes.LockingStore, 100) - for i := byte(0); i < 100; i++ { - k := []byte{i} - // We specifically don't unlock the keys during processing so that we can show that we must process all - // of these in parallel before the wait group is done. - locked := locking.CacheWrapWithLocks([][]byte{k}) - lockeds[i] = locked.(storetypes.LockingStore) - go func() { - // The defer order is from last to first so we mark that we are done and then exit. - defer wg.Done() - - locked.(storetypes.KVStore).Set(k, k) - locked.Write() - }() - } - - wg.Wait() - for _, locked := range lockeds { - locked.Unlock() - } - for i := byte(0); i < 100; i++ { - require.Equal(t, []byte{i}, locking.Get([]byte{i})) - } -} - -func TestLockingKV_SetGetHas(t *testing.T) { - parent := transient.NewStore() - parent.Set(a, b) - locking := lockingkv.NewStore(parent) - - // Check that Get is transitive to the parent. - require.Equal(t, b, locking.Get(a)) - require.Nil(t, locking.Get(b)) - - // Check that Has is transitive to the parent. - require.True(t, locking.Has(a)) - require.False(t, locking.Has(b)) - - // Check that Set isn't transitive to the parent. - locking.Set(key, a) - require.False(t, parent.Has(key)) - - // Check that we can read our writes. - require.True(t, locking.Has(key)) - require.Equal(t, a, locking.Get(key)) - - // Check that committing the writes to the parent. - locking.Write() - require.True(t, parent.Has(key)) - require.Equal(t, a, parent.Get(key)) -} - -func TestLockedKV_SetGetHas(t *testing.T) { - parent := transient.NewStore() - parent.Set(a, b) - locking := lockingkv.NewStore(parent) - locked := locking.CacheWrapWithLocks([][]byte{key}).(storetypes.CacheKVStore) - - // Check that Get is transitive to the parent. - require.Equal(t, b, locked.Get(a)) - require.Nil(t, locked.Get(b)) - - // Check that Has is transitive to the parent. - require.True(t, locked.Has(a)) - require.False(t, locked.Has(b)) - - // Check that Set isn't transitive to the parent. - locked.Set(key, a) - require.False(t, locking.Has(key)) - - // Check that we can read our writes. - require.True(t, locked.Has(key)) - require.Equal(t, a, locked.Get(key)) - - // Check that committing the writes to the parent and not the parent's parent. - locked.Write() - require.True(t, locking.Has(key)) - require.Equal(t, a, locking.Get(key)) - require.False(t, parent.Has(key)) - require.Nil(t, parent.Get(key)) - - // Unlock and get another instance of the store to see that the mutations in the locking store are visible. 
- locked.(storetypes.LockingStore).Unlock() - locked = locking.CacheWrapWithLocks([][]byte{key}).(storetypes.CacheKVStore) - require.True(t, locked.Has(key)) - require.Equal(t, a, locked.Get(key)) - locked.(storetypes.LockingStore).Unlock() -} diff --git a/store/types/store.go b/store/types/store.go index 106bab8c91a..57c98636d16 100644 --- a/store/types/store.go +++ b/store/types/store.go @@ -158,6 +158,12 @@ type MultiStore interface { type CacheMultiStore interface { MultiStore Write() // Writes operations to underlying KVStore + Lock(keys [][]byte) + Unlock(keys [][]byte) + RLockRW(Rkeys [][]byte) + LockRW(Rkeys [][]byte) + RUnlockRW(keys [][]byte) + UnlockRW(keys [][]byte) } // CommitMultiStore is an interface for a MultiStore without cache capabilities. @@ -278,14 +284,6 @@ type CacheKVStore interface { Write() } -// LockingStore allows for unlocking the associated lock keys that were acquired during -// locking with CacheWrapWithLocks on a LockingCacheWrapper. -type LockingStore interface { - Store - - Unlock() -} - // CommitKVStore is an interface for MultiStore. type CommitKVStore interface { Committer @@ -318,13 +316,6 @@ type CacheWrapper interface { CacheWrapWithTrace(w io.Writer, tc TraceContext) CacheWrap } -type LockingCacheWrapper interface { - CacheWrapper - - // CacheWrapWithLocks branches a store with the specific lock keys being acquired. - CacheWrapWithLocks(lockKeys [][]byte) CacheWrap -} - func (cid CommitID) IsZero() bool { return cid.Version == 0 && len(cid.Hash) == 0 } @@ -392,8 +383,7 @@ type CapabilityKey StoreKey // KVStoreKey is used for accessing substores. // Only the pointer value should ever be used - it functions as a capabilities key. type KVStoreKey struct { - name string - locking bool + name string } // NewKVStoreKey returns a new pointer to a KVStoreKey. @@ -425,19 +415,7 @@ func (key *KVStoreKey) Name() string { } func (key *KVStoreKey) String() string { - return fmt.Sprintf("KVStoreKey{%p, %s, locking: %t}", key, key.name, key.locking) -} - -func (key *KVStoreKey) IsLocking() bool { - return key.locking -} - -// Enables locking for the store key. 
-func (key *KVStoreKey) WithLocking() *KVStoreKey { - return &KVStoreKey{ - name: key.name, - locking: true, - } + return fmt.Sprintf("KVStoreKey{%p, %s}", key, key.name) } // TransientStoreKey is used for indexing transient stores in a MultiStore diff --git a/store/types/store_test.go b/store/types/store_test.go index 26337b132cc..b6304d131bc 100644 --- a/store/types/store_test.go +++ b/store/types/store_test.go @@ -81,9 +81,7 @@ func TestKVStoreKey(t *testing.T) { key := NewKVStoreKey("test") require.Equal(t, "test", key.name) require.Equal(t, key.name, key.Name()) - require.Equal(t, fmt.Sprintf("KVStoreKey{%p, test, locking: false}", key), key.String()) - keyWithLocking := key.WithLocking() - require.Equal(t, fmt.Sprintf("KVStoreKey{%p, test, locking: true}", keyWithLocking), keyWithLocking.String()) + require.Equal(t, fmt.Sprintf("KVStoreKey{%p, test}", key), key.String()) } func TestNilKVStoreKey(t *testing.T) { From 649319c8b5d062d648bbe818f9a466d12652a398 Mon Sep 17 00:00:00 2001 From: Roy Li Date: Tue, 14 May 2024 11:29:37 -0400 Subject: [PATCH 2/3] Upgrade store version --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 8dfd6c58750..82cc0151152 100644 --- a/go.mod +++ b/go.mod @@ -200,4 +200,4 @@ retract ( replace github.com/cometbft/cometbft => github.com/dydxprotocol/cometbft v0.38.6-0.20240220185844-e704122c8540 -replace cosmossdk.io/store => github.com/dydxprotocol/cosmos-sdk/store v1.0.3-0.20240326190927-d35618165018 +replace cosmossdk.io/store => github.com/dydxprotocol/cosmos-sdk/store v1.0.3-0.20240515175455-8168b4407fac diff --git a/go.sum b/go.sum index 7e5e9d5ee5b..ce29d3d21be 100644 --- a/go.sum +++ b/go.sum @@ -199,8 +199,8 @@ github.com/dvsekhvalnov/jose2go v1.6.0 h1:Y9gnSnP4qEI0+/uQkHvFXeD2PLPJeXEL+ySMEA github.com/dvsekhvalnov/jose2go v1.6.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU= github.com/dydxprotocol/cometbft v0.38.6-0.20240220185844-e704122c8540 h1:pkYQbAdOAAoZBSId9kLupCgZHj8YvA9LzM31fVYpjlw= github.com/dydxprotocol/cometbft v0.38.6-0.20240220185844-e704122c8540/go.mod h1:REQN+ObgfYxi39TcYR/Hv95C9bPxY3sYJCvghryj7vY= -github.com/dydxprotocol/cosmos-sdk/store v1.0.3-0.20240326190927-d35618165018 h1:Dn08pzQTajFp1GHaZFd0istbjl793PaT50vfj4mVKNs= -github.com/dydxprotocol/cosmos-sdk/store v1.0.3-0.20240326190927-d35618165018/go.mod h1:zMcD3hfNwd0WMTpdRUhS3QxoCoEtBXWeoKsu3iaLBbQ= +github.com/dydxprotocol/cosmos-sdk/store v1.0.3-0.20240515175455-8168b4407fac h1:frUaYZlrs9/Tk8fAHjMhcrpk73UEZ36fD7s+megReKQ= +github.com/dydxprotocol/cosmos-sdk/store v1.0.3-0.20240515175455-8168b4407fac/go.mod h1:zMcD3hfNwd0WMTpdRUhS3QxoCoEtBXWeoKsu3iaLBbQ= github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= From 035f06fda8c2650263415f55fa926f98efabd76f Mon Sep 17 00:00:00 2001 From: Vincent Chau <99756290+vincentwschau@users.noreply.github.com> Date: Fri, 21 Jun 2024 15:56:12 -0400 Subject: [PATCH 3/3] Update prometheus version. 
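
Newer prometheus/common releases deprecate the exported expfmt.Fmt* format
constants in favor of expfmt.NewFormat, so the telemetry gatherer now builds
its encoder explicitly and derives the text content type from
expfmt.TextVersion. A minimal sketch of the new encoding path (assuming a
prometheus gatherer `g`; error handling elided):

    metricsFamilies, _ := g.Gather()
    buf := &bytes.Buffer{}
    enc := expfmt.NewEncoder(buf, expfmt.NewFormat(expfmt.TypeTextPlain))
    for _, mf := range metricsFamilies {
        _ = enc.Encode(mf) // text exposition format, one metric family at a time
    }
    contentType := `text/plain; version=` + expfmt.TextVersion + `; charset=utf-8`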
--- go.mod | 4 ++-- go.sum | 8 ++++---- simapp/go.mod | 4 ++-- simapp/go.sum | 8 ++++---- telemetry/metrics.go | 5 +++-- 5 files changed, 15 insertions(+), 14 deletions(-) diff --git a/go.mod b/go.mod index 82cc0151152..c9a5239674e 100644 --- a/go.mod +++ b/go.mod @@ -45,8 +45,8 @@ require ( github.com/magiconair/properties v1.8.7 github.com/manifoldco/promptui v0.9.0 github.com/mattn/go-isatty v0.0.20 - github.com/prometheus/client_golang v1.18.0 - github.com/prometheus/common v0.47.0 + github.com/prometheus/client_golang v1.19.0 + github.com/prometheus/common v0.48.0 github.com/rs/zerolog v1.32.0 github.com/spf13/cast v1.6.0 github.com/spf13/cobra v1.8.0 diff --git a/go.sum b/go.sum index ce29d3d21be..436e232f6d8 100644 --- a/go.sum +++ b/go.sum @@ -595,8 +595,8 @@ github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5Fsn github.com/prometheus/client_golang v1.3.0/go.mod h1:hJaj2vgQTGQmVCsAACORcieXFeDPbaTKGT+JTgUa3og= github.com/prometheus/client_golang v1.4.0/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU= github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= -github.com/prometheus/client_golang v1.18.0 h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk= -github.com/prometheus/client_golang v1.18.0/go.mod h1:T+GXkCk5wSJyOqMIzVgvvjFDlkOQntgjkJWKrN5txjA= +github.com/prometheus/client_golang v1.19.0 h1:ygXvpU1AoN1MhdzckN+PyD9QJOSD4x7kmXYlnfbA6JU= +github.com/prometheus/client_golang v1.19.0/go.mod h1:ZRM9uEAypZakd+q/x7+gmsvXdURP+DABIEIjnmDdp+k= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= @@ -611,8 +611,8 @@ github.com/prometheus/common v0.7.0/go.mod h1:DjGbpBbp5NYNiECxcL/VnbXCCaQpKd3tt2 github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8bs7vj7HSQ4= github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo= github.com/prometheus/common v0.15.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s= -github.com/prometheus/common v0.47.0 h1:p5Cz0FNHo7SnWOmWmoRozVcjEp0bIVU8cV7OShpjL1k= -github.com/prometheus/common v0.47.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc= +github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE= +github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= diff --git a/simapp/go.mod b/simapp/go.mod index 966960f01be..8d2dd454520 100644 --- a/simapp/go.mod +++ b/simapp/go.mod @@ -149,9 +149,9 @@ require ( github.com/petermattis/goid v0.0.0-20230904192822-1876fd5063bc // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect - github.com/prometheus/client_golang v1.18.0 // indirect + github.com/prometheus/client_golang v1.19.0 // indirect github.com/prometheus/client_model v0.6.0 // indirect - github.com/prometheus/common v0.47.0 // indirect + 
github.com/prometheus/common v0.48.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect diff --git a/simapp/go.sum b/simapp/go.sum index cf5661859f4..578f5397f60 100644 --- a/simapp/go.sum +++ b/simapp/go.sum @@ -901,8 +901,8 @@ github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5Fsn github.com/prometheus/client_golang v1.3.0/go.mod h1:hJaj2vgQTGQmVCsAACORcieXFeDPbaTKGT+JTgUa3og= github.com/prometheus/client_golang v1.4.0/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU= github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= -github.com/prometheus/client_golang v1.18.0 h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk= -github.com/prometheus/client_golang v1.18.0/go.mod h1:T+GXkCk5wSJyOqMIzVgvvjFDlkOQntgjkJWKrN5txjA= +github.com/prometheus/client_golang v1.19.0 h1:ygXvpU1AoN1MhdzckN+PyD9QJOSD4x7kmXYlnfbA6JU= +github.com/prometheus/client_golang v1.19.0/go.mod h1:ZRM9uEAypZakd+q/x7+gmsvXdURP+DABIEIjnmDdp+k= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= @@ -917,8 +917,8 @@ github.com/prometheus/common v0.7.0/go.mod h1:DjGbpBbp5NYNiECxcL/VnbXCCaQpKd3tt2 github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8bs7vj7HSQ4= github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo= github.com/prometheus/common v0.15.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s= -github.com/prometheus/common v0.47.0 h1:p5Cz0FNHo7SnWOmWmoRozVcjEp0bIVU8cV7OShpjL1k= -github.com/prometheus/common v0.47.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc= +github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE= +github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= diff --git a/telemetry/metrics.go b/telemetry/metrics.go index ad73804764c..b8fc1d90afd 100644 --- a/telemetry/metrics.go +++ b/telemetry/metrics.go @@ -23,6 +23,7 @@ const ( FormatDefault = "" FormatPrometheus = "prometheus" FormatText = "text" + ContentTypeText = `text/plain; version=` + expfmt.TextVersion + `; charset=utf-8` MetricSinkInMem = "mem" MetricSinkStatsd = "statsd" @@ -191,7 +192,7 @@ func (m *Metrics) gatherPrometheus() (GatherResponse, error) { buf := &bytes.Buffer{} defer buf.Reset() - e := expfmt.NewEncoder(buf, expfmt.FmtText) + e := expfmt.NewEncoder(buf, expfmt.NewFormat(expfmt.TypeTextPlain)) for _, mf := range metricsFamilies { if err := e.Encode(mf); err != nil { @@ -199,7 +200,7 @@ func (m *Metrics) gatherPrometheus() (GatherResponse, error) { } } - return GatherResponse{ContentType: string(expfmt.FmtText), Metrics: buf.Bytes()}, nil + return GatherResponse{ContentType: ContentTypeText, Metrics: buf.Bytes()}, nil } // gatherGeneric collects 
// generic metrics and returns a GatherResponse.