Skip to content

Commit

Permalink
improve solo mutexes
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgemmsilva committed Sep 7, 2023
1 parent 120ee05 commit bbdf33a
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 33 deletions.
13 changes: 12 additions & 1 deletion packages/solo/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package solo

import (
"fmt"
"sync"
"time"

"github.com/iotaledger/wasp/packages/isc"
Expand All @@ -30,17 +31,23 @@ type mempoolImpl struct {
requests map[isc.RequestID]isc.Request
info MempoolInfo
currentTime func() time.Time
chainID isc.ChainID
mu sync.Mutex
}

func newMempool(currentTime func() time.Time) Mempool {
func newMempool(currentTime func() time.Time, chainID isc.ChainID) Mempool {
return &mempoolImpl{
requests: map[isc.RequestID]isc.Request{},
info: MempoolInfo{},
currentTime: currentTime,
chainID: chainID,
mu: sync.Mutex{},
}
}

func (mi *mempoolImpl) ReceiveRequests(reqs ...isc.Request) {
mi.mu.Lock()
defer mi.mu.Unlock()
for _, req := range reqs {
if _, ok := mi.requests[req.ID()]; !ok {
mi.info.TotalPool++
Expand All @@ -51,6 +58,8 @@ func (mi *mempoolImpl) ReceiveRequests(reqs ...isc.Request) {
}

func (mi *mempoolImpl) RequestBatchProposal() []isc.Request {
mi.mu.Lock()
defer mi.mu.Unlock()
now := mi.currentTime()
batch := []isc.Request{}
for rid, request := range mi.requests {
Expand Down Expand Up @@ -78,6 +87,8 @@ func (mi *mempoolImpl) RequestBatchProposal() []isc.Request {
}

func (mi *mempoolImpl) RemoveRequest(rID isc.RequestID) {
mi.mu.Lock()
defer mi.mu.Unlock()
if _, ok := mi.requests[rID]; ok {
mi.info.OutPoolCounter++
mi.info.TotalPool--
Expand Down
6 changes: 3 additions & 3 deletions packages/solo/req.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,16 +454,16 @@ func (ch *Chain) CallView(scName, funName string, params ...interface{}) (dict.D

func (ch *Chain) CallViewAtState(chainState state.State, scName, funName string, params ...interface{}) (dict.Dict, error) {
ch.Log().Debugf("callView: %s::%s", scName, funName)
return ch.CallViewByHnameAtState(chainState, isc.Hn(scName), isc.Hn(funName), params...)
return ch.callViewByHnameAtState(chainState, isc.Hn(scName), isc.Hn(funName), params...)
}

func (ch *Chain) CallViewByHname(hContract, hFunction isc.Hname, params ...interface{}) (dict.Dict, error) {
latestState, err := ch.store.LatestState()
require.NoError(ch.Env.T, err)
return ch.CallViewByHnameAtState(latestState, hContract, hFunction, params...)
return ch.callViewByHnameAtState(latestState, hContract, hFunction, params...)
}

func (ch *Chain) CallViewByHnameAtState(chainState state.State, hContract, hFunction isc.Hname, params ...interface{}) (dict.Dict, error) {
func (ch *Chain) callViewByHnameAtState(chainState state.State, hContract, hFunction isc.Hname, params ...interface{}) (dict.Dict, error) {
ch.Log().Debugf("callView: %s::%s", hContract.String(), hFunction.String())

p := parseParams(params)
Expand Down
4 changes: 2 additions & 2 deletions packages/solo/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ func (ch *Chain) runRequestsNolock(reqs []isc.Request, trace string) (results []
l1C := ch.GetL1Commitment()
require.Equal(ch.Env.T, rootC, l1C.TrieRoot())

ch.Env.EnqueueRequests(tx)

return res.RequestResults
}

Expand Down Expand Up @@ -152,8 +154,6 @@ func (ch *Chain) settleStateTransition(stateTx *iotago.Transaction, stateDraft s
}
ch.Log().Infof("state transition --> #%d. Requests in the block: %d. Outputs: %d",
stateDraft.BlockIndex(), len(blockReceipts), len(stateTx.Essence.Outputs))

go ch.Env.EnqueueRequests(stateTx)
}

func (ch *Chain) logRequestLastBlock() {
Expand Down
8 changes: 4 additions & 4 deletions packages/solo/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ type soloChainSnapshot struct {

// SaveSnapshot generates a snapshot of the Solo environment
func (env *Solo) SaveSnapshot(fname string) {
env.glbMutex.Lock()
defer env.glbMutex.Unlock()
env.chainsMutex.Lock()
defer env.chainsMutex.Unlock()

snapshot := soloSnapshot{
UtxoDB: env.utxoDB.State(),
Expand Down Expand Up @@ -63,8 +63,8 @@ func (env *Solo) SaveSnapshot(fname string) {

// LoadSnapshot restores the Solo environment from the given snapshot
func (env *Solo) LoadSnapshot(fname string) {
env.glbMutex.Lock()
defer env.glbMutex.Unlock()
env.chainsMutex.Lock()
defer env.chainsMutex.Unlock()

b, err := os.ReadFile(fname)
require.NoError(env.T, err)
Expand Down
29 changes: 10 additions & 19 deletions packages/solo/solo.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type Solo struct {
logger *logger.Logger
chainStateDatabaseManager *database.ChainStateDatabaseManager
utxoDB *utxodb.UtxoDB
glbMutex sync.RWMutex
chainsMutex sync.RWMutex
ledgerMutex sync.RWMutex
chains map[isc.ChainID]*Chain
processorConfig *processors.Config
Expand Down Expand Up @@ -213,8 +213,8 @@ func (env *Solo) Publisher() *publisher.Publisher {
}

func (env *Solo) GetChainByName(name string) *Chain {
env.glbMutex.Lock()
defer env.glbMutex.Unlock()
env.chainsMutex.Lock()
defer env.chainsMutex.Unlock()
for _, ch := range env.chains {
if ch.Name == name {
return ch
Expand Down Expand Up @@ -344,8 +344,8 @@ func (env *Solo) NewChainExt(
) (*Chain, *iotago.Transaction) {
chData, originTx := env.deployChain(chainOriginator, initBaseTokens, name, originParams...)

env.glbMutex.Lock()
defer env.glbMutex.Unlock()
env.chainsMutex.Lock()
defer env.chainsMutex.Unlock()
ch := env.addChain(chData)

ch.log.Infof("chain '%s' deployed. Chain ID: %s", ch.Name, ch.ChainID.String())
Expand All @@ -363,7 +363,7 @@ func (env *Solo) addChain(chData chainData) *Chain {
proc: processors.MustNew(env.processorConfig),
log: env.logger.Named(chData.Name),
metrics: metrics.NewChainMetricsProvider().GetChainMetrics(chData.ChainID),
mempool: newMempool(env.utxoDB.GlobalTime),
mempool: newMempool(env.utxoDB.GlobalTime, chData.ChainID),
migrationScheme: allmigrations.DefaultScheme,
}
env.chains[chData.ChainID] = ch
Expand All @@ -379,8 +379,8 @@ func (env *Solo) AddToLedger(tx *iotago.Transaction) error {

// RequestsForChain parses the transaction and returns all requests contained in it which have chainID as the target
func (env *Solo) RequestsForChain(tx *iotago.Transaction, chainID isc.ChainID) ([]isc.Request, error) {
env.glbMutex.RLock()
defer env.glbMutex.RUnlock()
env.chainsMutex.RLock()
defer env.chainsMutex.RUnlock()

m := env.requestsByChain(tx)
ret, ok := m[chainID]
Expand All @@ -399,18 +399,13 @@ func (env *Solo) requestsByChain(tx *iotago.Transaction) map[isc.ChainID][]isc.R

// AddRequestsToMempool adds all the requests to the chain mempool,
func (env *Solo) AddRequestsToMempool(ch *Chain, reqs []isc.Request) {
env.glbMutex.RLock()
defer env.glbMutex.RUnlock()
ch.runVMMutex.Lock()
defer ch.runVMMutex.Unlock()

ch.mempool.ReceiveRequests(reqs...)
}

// EnqueueRequests adds requests contained in the transaction to mempools of respective target chains
func (env *Solo) EnqueueRequests(tx *iotago.Transaction) {
env.glbMutex.RLock()
defer env.glbMutex.RUnlock()
env.chainsMutex.RLock()
defer env.chainsMutex.RUnlock()

requests := env.requestsByChain(tx)

Expand All @@ -420,11 +415,7 @@ func (env *Solo) EnqueueRequests(tx *iotago.Transaction) {
env.logger.Infof("dispatching requests. Unknown chain: %s", chainID.String())
continue
}
ch.runVMMutex.Lock()

ch.mempool.ReceiveRequests(reqs...)

ch.runVMMutex.Unlock()
}
}

Expand Down
8 changes: 4 additions & 4 deletions packages/vm/core/testcore/accounts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1439,7 +1439,7 @@ func TestNFTMint(t *testing.T) {
wallet, _ := env.NewKeyPairWithFunds()
anotherUserAgentID := isc.NewAgentID(tpkg.RandEd25519Address())

// mint NFT to self and keep it on chain
// mint NFT to another user and keep it on chain
req := solo.NewCallParams(
accounts.Contract.Name, accounts.FuncMintNFT.Name,
accounts.ParamNFTImmutableData, []byte("foobar"),
Expand All @@ -1449,9 +1449,9 @@ func TestNFTMint(t *testing.T) {
WithAllowance(isc.NewAssetsBaseTokens(1 * isc.Million)).
WithMaxAffordableGasBudget()

require.Len(t, ch.L2NFTs(anotherUserAgentID), 0)
_, err := ch.PostRequestSync(req, wallet)
require.NoError(t, err)
require.Len(t, ch.L2NFTs(anotherUserAgentID), 0)

// post a dummy request to make the chain progress to the next block
ch.PostRequestOffLedger(solo.NewCallParams("foo", "bar"), wallet)
Expand All @@ -1464,7 +1464,7 @@ func TestNFTMint(t *testing.T) {
anotherUserAddr := tpkg.RandEd25519Address()
anotherUserAgentID := isc.NewAgentID(anotherUserAddr)

// mint NFT to self and keep it on chain
// mint NFT to another user and withdraw it
req := solo.NewCallParams(
accounts.Contract.Name, accounts.FuncMintNFT.Name,
accounts.ParamNFTImmutableData, []byte("foobar"),
Expand Down Expand Up @@ -1509,9 +1509,9 @@ func TestNFTMint(t *testing.T) {
WithAllowance(isc.NewAssetsBaseTokens(1 * isc.Million)).
WithMaxAffordableGasBudget()

require.Len(t, ch.L2NFTs(agentID), 0)
_, err := ch.PostRequestSync(req, wallet)
require.NoError(t, err)
require.Len(t, ch.L2NFTs(agentID), 0)

// post a dummy request to make the chain progress to the next block
ch.PostRequestOffLedger(solo.NewCallParams("foo", "bar"), wallet)
Expand Down
4 changes: 4 additions & 0 deletions packages/vm/vmtxbuilder/txbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,3 +427,7 @@ func retryOutputFromOnLedgerRequest(req isc.OnLedgerRequest, chainAliasID iotago
}
return out
}

func (txb *AnchorTransactionBuilder) chainAddress() iotago.Address {
return txb.anchorOutput.AliasID.ToAddress()
}

0 comments on commit bbdf33a

Please sign in to comment.