Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backport upstream hashdb locking changes #291

Merged
merged 4 commits into from
Mar 7, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 25 additions & 53 deletions trie/triedb/hashdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,6 @@ var Defaults = &Config{
// Database is an intermediate write layer between the trie data structures and
// the disk database. The aim is to accumulate trie writes in-memory and only
// periodically flush a couple tries to disk, garbage collecting the remainder.
//
// Note, the trie Database is **not** thread safe in its mutations, but it **is**
// thread safe in providing individual, independent node access. The rationale
// behind this split design is to provide read access to RPC handlers and sync
// servers even while the trie is executing expensive garbage collection.
type Database struct {
diskdb ethdb.Database // Persistent storage for matured trie nodes
resolver ChildResolver // The handler to resolve children of nodes
Expand All @@ -113,7 +108,7 @@ type Database struct {
// cachedNode is all the information we know about a single cached trie node
// in the memory database write layer.
type cachedNode struct {
node []byte // Encoded node blob
node []byte // Encoded node blob, immutable
parents uint32 // Number of live nodes referencing this one
external map[common.Hash]struct{} // The set of external children
flushPrev common.Hash // Previous node in the flush-list
Expand Down Expand Up @@ -152,9 +147,9 @@ func New(diskdb ethdb.Database, config *Config, resolver ChildResolver) *Databas
}
}

// insert inserts a simplified trie node into the memory database.
// All nodes inserted by this function will be reference tracked
// and in theory should only used for **trie nodes** insertion.
// insert inserts a trie node into the memory database. All nodes inserted by
// this function will be reference tracked. This function assumes the lock is
// already held.
func (db *Database) insert(hash common.Hash, node []byte) {
// If the node's already cached, skip
if _, ok := db.dirties[hash]; ok {
Expand Down Expand Up @@ -183,9 +178,9 @@ func (db *Database) insert(hash common.Hash, node []byte) {
db.dirtiesSize += common.StorageSize(common.HashLength + len(node))
}

// Node retrieves an encoded cached trie node from memory. If it cannot be found
// node retrieves an encoded cached trie node from memory. If it cannot be found
// cached, the method queries the persistent database for the content.
func (db *Database) Node(hash common.Hash) ([]byte, error) {
func (db *Database) node(hash common.Hash) ([]byte, error) {
// It doesn't make sense to retrieve the metaroot
if hash == (common.Hash{}) {
return nil, errors.New("not found")
Expand All @@ -198,11 +193,14 @@ func (db *Database) Node(hash common.Hash) ([]byte, error) {
return enc, nil
}
}
// Retrieve the node from the dirty cache if available
// Retrieve the node from the dirty cache if available.
db.lock.RLock()
dirty := db.dirties[hash]
db.lock.RUnlock()

// Return the cached node if it's found in the dirty set.
// The dirty.node field is immutable and safe to read it
// even without lock guard.
if dirty != nil {
memcacheDirtyHitMeter.Mark(1)
memcacheDirtyReadMeter.Mark(int64(len(dirty.node)))
Expand All @@ -223,18 +221,9 @@ func (db *Database) Node(hash common.Hash) ([]byte, error) {
return nil, errors.New("not found")
}

// Nodes retrieves the hashes of all the nodes cached within the memory database.
// This method is extremely expensive and should only be used to validate internal
// states in test code.
func (db *Database) Nodes() []common.Hash {
db.lock.RLock()
defer db.lock.RUnlock()

var hashes = make([]common.Hash, 0, len(db.dirties))
for hash := range db.dirties {
hashes = append(hashes, hash)
}
return hashes
// arbitrum: exposing hashdb.Database.Node for triedb.Database.Node currently used by arbitrum.RecordingKV.Get
func (db *Database) Node(hash common.Hash) ([]byte, error) {
return db.node(hash)
}

// Reference adds a new reference from a parent node to a child node.
Expand Down Expand Up @@ -344,33 +333,28 @@ func (db *Database) dereference(hash common.Hash) {

// Cap iteratively flushes old but still referenced trie nodes until the total
// memory usage goes below the given threshold.
//
// Note, this method is a non-synchronized mutator. It is unsafe to call this
// concurrently with other mutators.
func (db *Database) Cap(limit common.StorageSize) error {
db.lock.Lock()
defer db.lock.Unlock()

// Create a database batch to flush persistent data out. It is important that
// outside code doesn't see an inconsistent state (referenced data removed from
// memory cache during commit but not yet in persistent storage). This is ensured
// by only uncaching existing data when the database write finalizes.
start := time.Now()
batch := db.diskdb.NewBatch()
db.lock.RLock()
nodes, storage := len(db.dirties), db.dirtiesSize
nodes, storage, start := len(db.dirties), db.dirtiesSize, time.Now()

// db.dirtiesSize only contains the useful data in the cache, but when reporting
// the total memory consumption, the maintenance metadata is also needed to be
// counted.
size := db.dirtiesSize + common.StorageSize(len(db.dirties)*cachedNodeSize)
size += db.childrenSize
db.lock.RUnlock()

// Keep committing nodes from the flush-list until we're below allowance
oldest := db.oldest
for size > limit && oldest != (common.Hash{}) {
// Fetch the oldest referenced node and push into the batch
db.lock.RLock()
node := db.dirties[oldest]
db.lock.RUnlock()
rawdb.WriteLegacyTrieNode(batch, oldest, node.node)

// If we exceeded the ideal batch size, commit and reset
Expand All @@ -396,9 +380,6 @@ func (db *Database) Cap(limit common.StorageSize) error {
return err
}
// Write successful, clear out the flushed data
db.lock.Lock()
defer db.lock.Unlock()

for db.oldest != oldest {
node := db.dirties[db.oldest]
delete(db.dirties, db.oldest)
Expand Down Expand Up @@ -429,14 +410,13 @@ func (db *Database) Cap(limit common.StorageSize) error {
// Commit iterates over all the children of a particular node, writes them out
// to disk, forcefully tearing down all references in both directions. As a side
// effect, all pre-images accumulated up to this point are also written.
//
// Note, this method is a non-synchronized mutator. It is unsafe to call this
// concurrently with other mutators.
func (db *Database) Commit(node common.Hash, report bool) error {
if node == (common.Hash{}) {
// There's no data to commit in this node
return nil
}
db.lock.Lock()
defer db.lock.Unlock()

// Create a database batch to flush persistent data out. It is important that
// outside code doesn't see an inconsistent state (referenced data removed from
Expand All @@ -446,9 +426,7 @@ func (db *Database) Commit(node common.Hash, report bool) error {
batch := db.diskdb.NewBatch()

// Move the trie itself into the batch, flushing if enough data is accumulated
db.lock.RLock()
nodes, storage := len(db.dirties), db.dirtiesSize
db.lock.RUnlock()

uncacher := &cleaner{db}
if err := db.commit(node, batch, uncacher); err != nil {
Expand All @@ -461,8 +439,6 @@ func (db *Database) Commit(node common.Hash, report bool) error {
return err
}
// Uncache any leftovers in the last batch
db.lock.Lock()
defer db.lock.Unlock()
if err := batch.Replay(uncacher); err != nil {
return err
}
Expand Down Expand Up @@ -490,9 +466,7 @@ func (db *Database) Commit(node common.Hash, report bool) error {
// commit is the private locked version of Commit.
func (db *Database) commit(hash common.Hash, batch ethdb.Batch, uncacher *cleaner) error {
// If the node does not exist, it's a previously committed node
db.lock.RLock()
node, ok := db.dirties[hash]
db.lock.RUnlock()
if !ok {
return nil
}
Expand All @@ -513,13 +487,11 @@ func (db *Database) commit(hash common.Hash, batch ethdb.Batch, uncacher *cleane
if err := batch.Write(); err != nil {
return err
}
db.lock.Lock()
err := batch.Replay(uncacher)
batch.Reset()
db.lock.Unlock()
if err != nil {
return err
}
batch.Reset()
}
return nil
}
Expand Down Expand Up @@ -588,7 +560,7 @@ func (db *Database) Initialized(genesisRoot common.Hash) bool {
func (db *Database) Update(root common.Hash, parent common.Hash, block uint64, nodes *trienode.MergedNodeSet, states *triestate.Set) error {
// Ensure the parent state is present and signal a warning if not.
if parent != types.EmptyRootHash {
if blob, _ := db.Node(parent); len(blob) == 0 {
if blob, _ := db.node(parent); len(blob) == 0 {
log.Error("parent state is not present")
}
}
Expand Down Expand Up @@ -669,7 +641,7 @@ func (db *Database) Scheme() string {
// Reader retrieves a node reader belonging to the given state root.
// An error will be returned if the requested state is not available.
func (db *Database) Reader(root common.Hash) (*reader, error) {
if _, err := db.Node(root); err != nil {
if _, err := db.node(root); err != nil {
return nil, fmt.Errorf("state %#x is not available, %v", root, err)
}
return &reader{db: db}, nil
Expand All @@ -680,9 +652,9 @@ type reader struct {
db *Database
}

// Node retrieves the trie node with the given node hash.
// No error will be returned if the node is not found.
// Node retrieves the trie node with the given node hash. No error will be
// returned if the node is not found.
func (reader *reader) Node(owner common.Hash, path []byte, hash common.Hash) ([]byte, error) {
blob, _ := reader.db.Node(hash)
blob, _ := reader.db.node(hash)
return blob, nil
}
Loading