diff --git a/core/computers.go b/core/computers.go index 48d209dd9..371560460 100644 --- a/core/computers.go +++ b/core/computers.go @@ -5,6 +5,9 @@ import ( "strconv" "strings" "time" + + "github.com/ElrondNetwork/elrond-go-core/core/check" + "github.com/ElrondNetwork/elrond-go-core/data" ) // MaxInt32 returns the maximum of two given numbers @@ -182,3 +185,17 @@ func IsValidESDTRole(role string) bool { return false } } + +// GetHeaderType will return the type of the provided header +func GetHeaderType(header data.HeaderHandler) HeaderType { + switch { + case check.IfNil(header): + return "" + case header.GetShardID() == MetachainShardId: + return MetaHeader + case check.IfNil(header.GetAdditionalData()): + return ShardHeaderV1 + default: + return ShardHeaderV2 + } +} diff --git a/core/computers_test.go b/core/computers_test.go index 15d1d9b63..aae41c251 100644 --- a/core/computers_test.go +++ b/core/computers_test.go @@ -8,6 +8,8 @@ import ( "time" "github.com/ElrondNetwork/elrond-go-core/core" + dataCore "github.com/ElrondNetwork/elrond-go-core/data" + "github.com/ElrondNetwork/elrond-go-core/data/block" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -502,3 +504,16 @@ func TestIsValidESDTRole(t *testing.T) { require.Equal(t, tt.output, core.IsValidESDTRole(tt.input)) } } + +func TestGetHeaderType(t *testing.T) { + t.Parallel() + + require.Equal(t, core.HeaderType(""), core.GetHeaderType(nil)) + + var nilHeader dataCore.HeaderHandler + require.Equal(t, core.HeaderType(""), core.GetHeaderType(nilHeader)) + + require.Equal(t, core.MetaHeader, core.GetHeaderType(&block.MetaBlock{})) + require.Equal(t, core.ShardHeaderV1, core.GetHeaderType(&block.Header{})) + require.Equal(t, core.ShardHeaderV2, core.GetHeaderType(&block.HeaderV2{})) +} diff --git a/core/constants.go b/core/constants.go index 829b8841d..d57bc6e2f 100644 --- a/core/constants.go +++ b/core/constants.go @@ -1,5 +1,17 @@ package core +// HeaderType defines the type to be used for the header that is sent +type HeaderType string + +const ( + // MetaHeader defines the type of *block.MetaBlock + MetaHeader HeaderType = "MetaBlock" + // ShardHeaderV1 defines the type of *block.Header + ShardHeaderV1 HeaderType = "Header" + // ShardHeaderV2 defines the type of *block.HeaderV2 + ShardHeaderV2 HeaderType = "HeaderV2" +) + // NodeType represents the node's role in the network type NodeType string @@ -230,3 +242,15 @@ const SCDeployIdentifier = "SCDeploy" // SCUpgradeIdentifier is the identifier for a smart contract upgrade const SCUpgradeIdentifier = "SCUpgrade" + +// WriteLogIdentifier is the identifier for the information log that is generated by a smart contract call/esdt transfer +const WriteLogIdentifier = "writeLog" + +// SignalErrorOperation is the identifier for the log that is generated when a smart contract is executed but return an error +const SignalErrorOperation = "signalError" + +// CompletedTxEventIdentifier is the identifier for the log that is generated when the execution of a smart contract call is done +const CompletedTxEventIdentifier = "completedTxEvent" + +// GasRefundForRelayerMessage is the return message for to the smart contract result with refund for the relayer +const GasRefundForRelayerMessage = "gas refund for relayer" diff --git a/core/sharding/errors.go b/core/sharding/errors.go new file mode 100644 index 000000000..d72d0d9ea --- /dev/null +++ b/core/sharding/errors.go @@ -0,0 +1,9 @@ +package sharding + +import "errors" + +// ErrInvalidNumberOfShards signals that an invalid number of shards was passed to the sharding registry +var ErrInvalidNumberOfShards = errors.New("the number of shards must be greater than zero") + +// ErrInvalidShardId signals that an invalid shard is was passed +var ErrInvalidShardId = errors.New("shard id must be smaller than the total number of shards") diff --git a/core/sharding/multiShardCoordinator.go b/core/sharding/multiShardCoordinator.go new file mode 100644 index 000000000..ce2298ca5 --- /dev/null +++ b/core/sharding/multiShardCoordinator.go @@ -0,0 +1,134 @@ +package sharding + +import ( + "bytes" + "math" + + "github.com/ElrondNetwork/elrond-go-core/core" +) + +// multiShardCoordinator struct defines the functionality for handling transaction dispatching to +// the corresponding shards. The number of shards is currently passed as a constructor +// parameter and later it should be calculated by this structure +type multiShardCoordinator struct { + maskHigh uint32 + maskLow uint32 + selfId uint32 + numberOfShards uint32 +} + +// NewMultiShardCoordinator returns a new multiShardCoordinator and initializes the masks +func NewMultiShardCoordinator(numberOfShards, selfId uint32) (*multiShardCoordinator, error) { + if numberOfShards < 1 { + return nil, ErrInvalidNumberOfShards + } + if selfId >= numberOfShards && selfId != core.MetachainShardId { + return nil, ErrInvalidShardId + } + + sr := &multiShardCoordinator{} + sr.selfId = selfId + sr.numberOfShards = numberOfShards + sr.maskHigh, sr.maskLow = calculateMasks(numberOfShards) + + return sr, nil +} + +// calculateMasks will create two numbers who's binary form is composed from as many +// ones needed to be taken into consideration for the shard assignment. The result +// of a bitwise AND operation of an address with this mask will result in the +// shard id where a transaction from that address will be dispatched +func calculateMasks(numOfShards uint32) (uint32, uint32) { + n := math.Ceil(math.Log2(float64(numOfShards))) + return (1 << uint(n)) - 1, (1 << uint(n-1)) - 1 +} + +// ComputeId calculates the shard for a given address container +func (msc *multiShardCoordinator) ComputeId(address []byte) uint32 { + return msc.ComputeIdFromBytes(address) +} + +// ComputeShardID will compute shard id of the given address based on the number of shards parameter +func ComputeShardID(address []byte, numberOfShards uint32) uint32 { + maskHigh, maskLow := calculateMasks(numberOfShards) + + return computeIdBasedOfNrOfShardAndMasks(address, numberOfShards, maskHigh, maskLow) +} + +// ComputeIdFromBytes calculates the shard for a given address +func (msc *multiShardCoordinator) ComputeIdFromBytes(address []byte) uint32 { + if core.IsEmptyAddress(address) { + return msc.selfId + } + + return computeIdBasedOfNrOfShardAndMasks(address, msc.numberOfShards, msc.maskHigh, msc.maskLow) +} + +func computeIdBasedOfNrOfShardAndMasks(address []byte, numberOfShards, maskHigh, maskLow uint32) uint32 { + var bytesNeed int + if numberOfShards <= 256 { + bytesNeed = 1 + } else if numberOfShards <= 65536 { + bytesNeed = 2 + } else if numberOfShards <= 16777216 { + bytesNeed = 3 + } else { + bytesNeed = 4 + } + + startingIndex := 0 + if len(address) > bytesNeed { + startingIndex = len(address) - bytesNeed + } + + buffNeeded := address[startingIndex:] + if core.IsSmartContractOnMetachain(buffNeeded, address) { + return core.MetachainShardId + } + + addr := uint32(0) + for i := 0; i < len(buffNeeded); i++ { + addr = addr<<8 + uint32(buffNeeded[i]) + } + + shard := addr & maskHigh + if shard > numberOfShards-1 { + shard = addr & maskLow + } + + return shard +} + +// NumberOfShards returns the number of shards +func (msc *multiShardCoordinator) NumberOfShards() uint32 { + return msc.numberOfShards +} + +// SelfId gets the shard id of the current node +func (msc *multiShardCoordinator) SelfId() uint32 { + return msc.selfId +} + +// SameShard returns weather two addresses belong to the same shard +func (msc *multiShardCoordinator) SameShard(firstAddress, secondAddress []byte) bool { + if core.IsEmptyAddress(firstAddress) || core.IsEmptyAddress(secondAddress) { + return true + } + + if bytes.Equal(firstAddress, secondAddress) { + return true + } + + return msc.ComputeId(firstAddress) == msc.ComputeId(secondAddress) +} + +// CommunicationIdentifier returns the identifier between current shard ID and destination shard ID +// identifier is generated such as the first shard from identifier is always smaller or equal than the last +func (msc *multiShardCoordinator) CommunicationIdentifier(destShardID uint32) string { + return core.CommunicationIdentifierBetweenShards(msc.selfId, destShardID) +} + +// IsInterfaceNil returns true if there is no value under the interface +func (msc *multiShardCoordinator) IsInterfaceNil() bool { + return msc == nil +} diff --git a/core/sharding/multiShardCoordinator_test.go b/core/sharding/multiShardCoordinator_test.go new file mode 100644 index 000000000..4bb44f111 --- /dev/null +++ b/core/sharding/multiShardCoordinator_test.go @@ -0,0 +1,225 @@ +package sharding + +import ( + "bytes" + "encoding/binary" + "encoding/hex" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func getAddressFromUint32(address uint32) []byte { + buff := make([]byte, 4) + binary.BigEndian.PutUint32(buff, address) + + return buff +} + +func TestMultiShardCoordinator_NewMultiShardCoordinator(t *testing.T) { + numOfShards := uint32(10) + sr, _ := NewMultiShardCoordinator(numOfShards, 0) + assert.Equal(t, numOfShards, sr.NumberOfShards()) + expectedMask1, expectedMask2 := calculateMasks(sr.NumberOfShards()) + actualMask1 := sr.maskHigh + actualMask2 := sr.maskLow + assert.Equal(t, expectedMask1, actualMask1) + assert.Equal(t, expectedMask2, actualMask2) +} + +func TestMultiShardCoordinator_NewMultiShardCoordinatorInvalidNumberOfShards(t *testing.T) { + sr, err := NewMultiShardCoordinator(0, 0) + assert.Nil(t, sr) + assert.Equal(t, ErrInvalidNumberOfShards, err) +} + +func TestMultiShardCoordinator_NewMultiShardCoordinatorSelfIdGraterThanNumOfShardsShouldError(t *testing.T) { + _, err := NewMultiShardCoordinator(1, 2) + assert.Equal(t, ErrInvalidShardId, err) +} + +func TestMultiShardCoordinator_NewMultiShardCoordinatorCorrectSelfId(t *testing.T) { + currentShardId := uint32(0) + sr, _ := NewMultiShardCoordinator(1, currentShardId) + assert.Equal(t, currentShardId, sr.SelfId()) +} + +func TestMultiShardCoordinator_ComputeIdDoesNotGenerateInvalidShards(t *testing.T) { + numOfShards := uint32(10) + selfId := uint32(0) + sr, _ := NewMultiShardCoordinator(numOfShards, selfId) + + for i := 0; i < 200; i++ { + addr := getAddressFromUint32(uint32(i)) + shardId := sr.ComputeId(addr) + assert.True(t, shardId < sr.NumberOfShards()) + } +} + +func TestMultiShardCoordinator_ComputeId10ShardsShouldWork(t *testing.T) { + numOfShards := uint32(10) + selfId := uint32(0) + sr, _ := NewMultiShardCoordinator(numOfShards, selfId) + + dataSet := []struct { + address, shardId uint32 + }{ + {0, 0}, + {1, 1}, + {2, 2}, + {3, 3}, + {4, 4}, + {5, 5}, + {6, 6}, + {7, 7}, + {8, 8}, + {9, 9}, + {10, 2}, + {11, 3}, + {12, 4}, + {13, 5}, + {14, 6}, + {15, 7}, + } + for _, data := range dataSet { + addr := getAddressFromUint32(data.address) + shardId := sr.ComputeId(addr) + + assert.Equal(t, data.shardId, shardId) + } +} + +func TestMultiShardCoordinator_ComputeId10ShardsBigNumbersShouldWork(t *testing.T) { + numOfShards := uint32(10) + selfId := uint32(0) + sr, _ := NewMultiShardCoordinator(numOfShards, selfId) + + dataSet := []struct { + address string + shardId uint32 + }{ + {"2ca2ed0a1c77b5ddeabf99e3f17074f1c77b5ddea37dbaacf501ef1752950c50", 0}, + {"5f7e73b922883bf97b5ddeab9e3f103b8ddea37dbaaca24b5ddea37dbaac1061", 1}, + {"65b9926097345bf7b5ddeab99e3f1cc7c71c01bfd3e1efacc1d0df1ba8f96172", 2}, + {"c1c77b5ddea5c71c4160c861c01ba8f9617a65010d2baac7827a501bbf29aad3", 3}, + {"22c2e1facc1d1c77b5d16160c861c01ba8f96170c86783b8deabaac782733b84", 4}, + {"4cc88bdac668dc1878271e79a67b5ddeaddea37dbaacbbf99e3f1f4e5a4d7085", 5}, + {"b533facc1daa3a617466f4160c861c01ba8f9617a65010d160c86783b82a5836", 6}, + {"b1487283ad280316baa160c861c01ba8f9617c78270c8668dc18b5ddea37dba7", 7}, + {"acfba138faed1c7b5d668dc1878b5ddea37dbaac271edeab7160c861c01ba8f8", 8}, + {"cc3757647aebf9d160c86e9f9eb5dde0c86783b89160a37dbaac3f15c8f48999", 9}, + {"1a30b33104a94a65010d2a5e87285eb0ea37dbaac60c86c3facc1d4d1ba8f96a", 2}, + {"fcca8da9ba5160c86783b89160ea37dbaac60c86c86783b8c868be1ba8f9617b", 3}, + {"8f9b094668dc1878271ed1b1b5ddea37dbaac60c86760f2e4c71c01bf6a913cc", 4}, + {"a2d768be59a607d160c86eb5ddea37dbaafacc1dc9fa0e0c86783b8916092cbd", 5}, + {"365865f21b2e0d668dc18ea37dbaac60c8678271e160c86e9160c86783b8fe6e", 6}, + {"16cc745884a65ba160c861c01ba8f9617ac7827d160c86e9f010d2a592b3a52f", 7}, + } + for _, data := range dataSet { + buff, err := hex.DecodeString(data.address) + assert.Nil(t, err) + + shardId := sr.ComputeId(buff) + second := ComputeShardID(buff, sr.numberOfShards) + + assert.Equal(t, data.shardId, shardId) + assert.Equal(t, data.shardId, second) + } +} + +func TestMultiShardCoordinator_ComputeIdSameSuffixHasSameShard(t *testing.T) { + numOfShards := uint32(2) + selfId := uint32(0) + sr, _ := NewMultiShardCoordinator(numOfShards, selfId) + + dataSet := []struct { + address, shardId uint32 + }{ + {0, 0}, + {1, 1}, + {2, 0}, + {3, 1}, + {4, 0}, + {5, 1}, + {6, 0}, + {7, 1}, + {8, 0}, + {9, 1}, + } + for _, data := range dataSet { + addr := getAddressFromUint32(data.address) + shardId := sr.ComputeId(addr) + + assert.Equal(t, data.shardId, shardId) + } +} + +func TestMultiShardCoordinator_SameShardSameAddress(t *testing.T) { + shard, _ := NewMultiShardCoordinator(1, 0) + addr1 := getAddressFromUint32(uint32(1)) + addr2 := getAddressFromUint32(uint32(1)) + + assert.True(t, shard.SameShard(addr1, addr2)) +} + +func TestMultiShardCoordinator_SameShardSameAddressMultipleShards(t *testing.T) { + shard, _ := NewMultiShardCoordinator(11, 0) + addr1 := getAddressFromUint32(uint32(1)) + addr2 := getAddressFromUint32(uint32(1)) + + assert.True(t, shard.SameShard(addr1, addr2)) +} + +func TestMultiShardCoordinator_SameShardDifferentAddress(t *testing.T) { + shard, _ := NewMultiShardCoordinator(1, 0) + addr1 := getAddressFromUint32(uint32(1)) + addr2 := getAddressFromUint32(uint32(2)) + + assert.True(t, shard.SameShard(addr1, addr2)) +} + +func TestMultiShardCoordinator_SameShardDifferentAddressMultipleShards(t *testing.T) { + shard, _ := NewMultiShardCoordinator(2, 0) + + addr1 := getAddressFromUint32(uint32(1)) + addr2 := getAddressFromUint32(uint32(2)) + + assert.False(t, shard.SameShard(addr1, addr2)) +} + +func TestMultiShardCoordinator_ComputeIDContractDeploy(t *testing.T) { + shard, _ := NewMultiShardCoordinator(2, 1) + + addr1 := bytes.Repeat([]byte{0}, 32) + assert.Equal(t, shard.ComputeId(addr1), shard.SelfId()) +} + +func TestMultiShardCoordinator_SameShardContractDeploy(t *testing.T) { + shard, _ := NewMultiShardCoordinator(2, 0) + + addr1 := bytes.Repeat([]byte{0}, 32) + addr2 := bytes.Repeat([]byte{1}, 32) + assert.True(t, shard.SameShard(addr1, addr2)) +} + +func TestMultiShardCoordinator_CommunicationIdentifierSameShard(t *testing.T) { + destId := uint32(1) + selfId := uint32(1) + shard, _ := NewMultiShardCoordinator(2, selfId) + assert.Equal(t, fmt.Sprintf("_%d", selfId), shard.CommunicationIdentifier(destId)) +} + +func TestMultiShardCoordinator_CommunicationIdentifierSmallerDestination(t *testing.T) { + destId := uint32(0) + selfId := uint32(1) + shard, _ := NewMultiShardCoordinator(2, selfId) + assert.Equal(t, fmt.Sprintf("_%d_%d", destId, selfId), shard.CommunicationIdentifier(destId)) +} + +func TestMultiShardCoordinator_CommunicationIdentifier(t *testing.T) { + destId := uint32(1) + selfId := uint32(0) + shard, _ := NewMultiShardCoordinator(2, selfId) + assert.Equal(t, fmt.Sprintf("_%d_%d", selfId, destId), shard.CommunicationIdentifier(destId)) +} diff --git a/data/api/apiBlock.go b/data/api/apiBlock.go index 7fa495227..9f5670933 100644 --- a/data/api/apiBlock.go +++ b/data/api/apiBlock.go @@ -4,6 +4,7 @@ import ( "math/big" "time" + "github.com/ElrondNetwork/elrond-go-core/data/outport" "github.com/ElrondNetwork/elrond-go-core/data/transaction" ) @@ -54,12 +55,13 @@ type EpochStartInfo struct { // NotarizedBlock represents a notarized block type NotarizedBlock struct { - Hash string `json:"hash"` - Nonce uint64 `json:"nonce"` - Round uint64 `json:"round"` - Shard uint32 `json:"shard"` - RootHash string `json:"rootHash"` - MiniBlockHashes []string `json:"miniBlockHashes,omitempty"` + Hash string `json:"hash"` + Nonce uint64 `json:"nonce"` + Round uint64 `json:"round"` + Shard uint32 `json:"shard"` + RootHash string `json:"rootHash"` + MiniBlockHashes []string `json:"miniBlockHashes,omitempty"` + AlteredAccounts []*outport.AlteredAccount `json:"alteredAccounts,omitempty"` } // EpochStartShardData is a structure that holds data about the epoch start shard data @@ -118,3 +120,34 @@ type Delegator struct { Total string `json:"total"` TotalAsBigInt *big.Int `json:"-"` } + +// BlockFetchType is the type that specifies how a block should be queried from API +type BlockFetchType string + +func (aft BlockFetchType) String() string { + return string(aft) +} + +const ( + // BlockFetchTypeByHash is to be used when a block should be fetched from API based on its hash + BlockFetchTypeByHash BlockFetchType = "by-hash" + + // BlockFetchTypeByNonce is to be used when a block should be fetched from API based on its nonce + BlockFetchTypeByNonce BlockFetchType = "by-nonce" +) + +// TODO: GetBlockParameters can be used for other endpoints as well + +// GetBlockParameters holds the parameters for requesting a block on API +type GetBlockParameters struct { + RequestType BlockFetchType + Hash []byte + Nonce uint64 +} + +// GetAlteredAccountsForBlockOptions specifies the options for returning altered accounts for a given block +type GetAlteredAccountsForBlockOptions struct { + GetBlockParameters + TokensFilter string + WithMetadata bool +} diff --git a/data/api/apiBlock_test.go b/data/api/apiBlock_test.go new file mode 100644 index 000000000..740644e5c --- /dev/null +++ b/data/api/apiBlock_test.go @@ -0,0 +1,16 @@ +package api_test + +import ( + "testing" + + "github.com/ElrondNetwork/elrond-go-core/data/api" + "github.com/stretchr/testify/require" +) + +func TestAPIBlockFetchType(t *testing.T) { + byNonceType := api.BlockFetchTypeByNonce + require.Equal(t, "by-nonce", byNonceType.String()) + + byHashType := api.BlockFetchTypeByHash + require.Equal(t, "by-hash", byHashType.String()) +} diff --git a/data/indexer/dtos.go b/data/indexer/dtos.go deleted file mode 100644 index 3699edc5b..000000000 --- a/data/indexer/dtos.go +++ /dev/null @@ -1,71 +0,0 @@ -package indexer - -import ( - "time" - - "github.com/ElrondNetwork/elrond-go-core/data" - "github.com/ElrondNetwork/elrond-go-core/data/esdt" -) - -// AccountTokenData holds the data needed for indexing a token of an altered account -type AccountTokenData struct { - Identifier string `json:"identifier"` - Balance string `json:"balance"` - Nonce uint64 `json:"nonce"` - Properties string `json:"properties"` - MetaData *esdt.MetaData `json:"metadata"` -} - -// AlteredAccount holds the data needed of an altered account in a block -type AlteredAccount struct { - Address string `json:"address"` - Balance string `json:"balance,omitempty"` - Nonce uint64 `json:"nonce"` - Tokens []*AccountTokenData `json:"tokens"` -} - -// ArgsSaveBlockData will contains all information that are needed to save block data -type ArgsSaveBlockData struct { - HeaderHash []byte - Body data.BodyHandler - Header data.HeaderHandler - SignersIndexes []uint64 - NotarizedHeadersHashes []string - HeaderGasConsumption HeaderGasConsumption - TransactionsPool *Pool - AlteredAccounts map[string]*AlteredAccount -} - -// HeaderGasConsumption holds the data needed to save the gas consumption of a header -type HeaderGasConsumption struct { - GasProvided uint64 - GasRefunded uint64 - GasPenalized uint64 - MaxGasPerBlock uint64 -} - -// Pool will holds all types of transaction -type Pool struct { - Txs map[string]data.TransactionHandler - Scrs map[string]data.TransactionHandler - Rewards map[string]data.TransactionHandler - Invalid map[string]data.TransactionHandler - Receipts map[string]data.TransactionHandler - Logs []*data.LogData -} - -// ValidatorRatingInfo is a structure containing validator rating information -type ValidatorRatingInfo struct { - PublicKey string - Rating float32 -} - -// RoundInfo is a structure containing block signers and shard id -type RoundInfo struct { - Index uint64 - SignersIndexes []uint64 - BlockWasProposed bool - ShardId uint32 - Epoch uint32 - Timestamp time.Duration -} diff --git a/data/interface.go b/data/interface.go index 904a59d6e..cd8edde7f 100644 --- a/data/interface.go +++ b/data/interface.go @@ -280,6 +280,19 @@ type TransactionHandler interface { CheckIntegrity() error } +// TransactionHandlerWithGasUsedAndFee extends TransactionHandler by also including used gas and fee +type TransactionHandlerWithGasUsedAndFee interface { + TransactionHandler + + SetInitialPaidFee(fee *big.Int) + SetGasUsed(gasUsed uint64) + SetFee(fee *big.Int) + GetInitialPaidFee() *big.Int + GetGasUsed() uint64 + GetFee() *big.Int + GetTxHandler() TransactionHandler +} + // LogHandler defines the type for a log resulted from executing a transaction or smart contract call type LogHandler interface { // GetAddress returns the address of the sc that was originally called by the user diff --git a/data/mock/shardCoordinatorMock.go b/data/mock/shardCoordinatorMock.go index 610ddc20d..431057c97 100644 --- a/data/mock/shardCoordinatorMock.go +++ b/data/mock/shardCoordinatorMock.go @@ -6,37 +6,37 @@ import ( // ShardCoordinatorMock - type ShardCoordinatorMock struct { - SelfID uint32 - NrOfShards uint32 + SelfID uint32 + NumOfShards uint32 } // NumberOfShards - -func (scm ShardCoordinatorMock) NumberOfShards() uint32 { - return scm.NrOfShards +func (scm *ShardCoordinatorMock) NumberOfShards() uint32 { + return scm.NumOfShards } // ComputeId - -func (scm ShardCoordinatorMock) ComputeId(_ []byte) uint32 { +func (scm *ShardCoordinatorMock) ComputeId(_ []byte) uint32 { panic("implement me") } // SetSelfId - -func (scm ShardCoordinatorMock) SetSelfId(_ uint32) error { +func (scm *ShardCoordinatorMock) SetSelfId(_ uint32) error { panic("implement me") } // SelfId - -func (scm ShardCoordinatorMock) SelfId() uint32 { +func (scm *ShardCoordinatorMock) SelfId() uint32 { return scm.SelfID } // SameShard - -func (scm ShardCoordinatorMock) SameShard(_, _ []byte) bool { +func (scm *ShardCoordinatorMock) SameShard(_, _ []byte) bool { return true } // CommunicationIdentifier - -func (scm ShardCoordinatorMock) CommunicationIdentifier(destShardID uint32) string { +func (scm *ShardCoordinatorMock) CommunicationIdentifier(destShardID uint32) string { if destShardID == core.MetachainShardId { return "_0_META" } diff --git a/data/outport/dtos.go b/data/outport/dtos.go new file mode 100644 index 000000000..05c271f88 --- /dev/null +++ b/data/outport/dtos.go @@ -0,0 +1,86 @@ +package outport + +import ( + "time" + + "github.com/ElrondNetwork/elrond-go-core/data" + "github.com/ElrondNetwork/elrond-go-core/data/esdt" +) + +// AccountTokenData holds the data needed for indexing a token of an altered account +type AccountTokenData struct { + Nonce uint64 `json:"nonce"` + Identifier string `json:"identifier"` + Balance string `json:"balance"` + Properties string `json:"properties"` + MetaData *esdt.MetaData `json:"metadata"` + AdditionalData *AdditionalAccountTokenData `json:"additionalData,omitempty"` +} + +// AlteredAccount holds the data needed of an altered account in a block +type AlteredAccount struct { + Nonce uint64 `json:"nonce"` + Address string `json:"address"` + Balance string `json:"balance,omitempty"` + Tokens []*AccountTokenData `json:"tokens"` + AdditionalData *AdditionalAccountData `json:"additionalData,omitempty"` +} + +// AdditionalAccountData holds the additional data for an altered account +type AdditionalAccountData struct { + IsSender bool `json:"isSender,omitempty"` + BalanceChanged bool `json:"balanceChanged,omitempty"` +} + +// AdditionalAccountTokenData holds the additional data for indexing a token of an altered account +type AdditionalAccountTokenData struct { + IsNFTCreate bool `json:"isNFTCreate,omitempty"` +} + +// ArgsSaveBlockData will contain all information that are needed to save block data +type ArgsSaveBlockData struct { + HeaderHash []byte + Body data.BodyHandler + Header data.HeaderHandler + SignersIndexes []uint64 + NotarizedHeadersHashes []string + HeaderGasConsumption HeaderGasConsumption + TransactionsPool *Pool + AlteredAccounts map[string]*AlteredAccount + NumberOfShards uint32 + IsImportDB bool +} + +// HeaderGasConsumption holds the data needed to save the gas consumption of a header +type HeaderGasConsumption struct { + GasProvided uint64 + GasRefunded uint64 + GasPenalized uint64 + MaxGasPerBlock uint64 +} + +// Pool will hold all types of transaction +type Pool struct { + Txs map[string]data.TransactionHandlerWithGasUsedAndFee + Scrs map[string]data.TransactionHandlerWithGasUsedAndFee + Rewards map[string]data.TransactionHandlerWithGasUsedAndFee + Invalid map[string]data.TransactionHandlerWithGasUsedAndFee + Receipts map[string]data.TransactionHandlerWithGasUsedAndFee + Logs []*data.LogData +} + +// ValidatorRatingInfo is a structure containing validator rating information +type ValidatorRatingInfo struct { + PublicKey string + Rating float32 +} + +// RoundInfo is a structure containing block signers and shard id +type RoundInfo struct { + Index uint64 + SignersIndexes []uint64 + BlockWasProposed bool + ShardId uint32 + Epoch uint32 + Timestamp time.Duration +} diff --git a/data/outport/txWithFee.go b/data/outport/txWithFee.go new file mode 100644 index 000000000..b5bffa89f --- /dev/null +++ b/data/outport/txWithFee.go @@ -0,0 +1,76 @@ +package outport + +import ( + "math/big" + + "github.com/ElrondNetwork/elrond-go-core/data" +) + +// FeeInfo holds information about the fee and gas used +type FeeInfo struct { + GasUsed uint64 + Fee *big.Int + InitialPaidFee *big.Int +} + +// TransactionHandlerWithGasAndFee holds a data.TransactionHandler and information about fee and gas used +type TransactionHandlerWithGasAndFee struct { + data.TransactionHandler + FeeInfo +} + +// NewTransactionHandlerWithGasAndFee returns a new instance of transactionHandlerWithGasAndFee which matches the interface +func NewTransactionHandlerWithGasAndFee(txHandler data.TransactionHandler, gasUsed uint64, fee *big.Int) data.TransactionHandlerWithGasUsedAndFee { + return &TransactionHandlerWithGasAndFee{ + TransactionHandler: txHandler, + FeeInfo: FeeInfo{ + GasUsed: gasUsed, + Fee: fee, + }, + } +} + +// SetInitialPaidFee will set the initial paid fee +func (t *TransactionHandlerWithGasAndFee) SetInitialPaidFee(fee *big.Int) { + t.InitialPaidFee = fee +} + +// GetInitialPaidFee returns the initial paid fee of the transactions +func (t *TransactionHandlerWithGasAndFee) GetInitialPaidFee() *big.Int { + return t.InitialPaidFee +} + +// SetGasUsed sets the used gas internally +func (t *TransactionHandlerWithGasAndFee) SetGasUsed(gasUsed uint64) { + t.GasUsed = gasUsed +} + +// GetGasUsed returns the used gas of the transaction +func (t *TransactionHandlerWithGasAndFee) GetGasUsed() uint64 { + return t.GasUsed +} + +// SetFee sets the fee internally +func (t *TransactionHandlerWithGasAndFee) SetFee(fee *big.Int) { + t.Fee = fee +} + +// GetFee returns the fee of the transaction +func (t *TransactionHandlerWithGasAndFee) GetFee() *big.Int { + return t.Fee +} + +// GetTxHandler will return the TransactionHandler +func (t *TransactionHandlerWithGasAndFee) GetTxHandler() data.TransactionHandler { + return t.TransactionHandler +} + +// WrapTxsMap will wrap the provided transactions map in a map fo transactions with fee and gas used +func WrapTxsMap(txs map[string]data.TransactionHandler) map[string]data.TransactionHandlerWithGasUsedAndFee { + newMap := make(map[string]data.TransactionHandlerWithGasUsedAndFee, len(txs)) + for txHash, tx := range txs { + newMap[txHash] = NewTransactionHandlerWithGasAndFee(tx, 0, big.NewInt(0)) + } + + return newMap +} diff --git a/data/outport/txWithFee_test.go b/data/outport/txWithFee_test.go new file mode 100644 index 000000000..68fb4a5d3 --- /dev/null +++ b/data/outport/txWithFee_test.go @@ -0,0 +1,22 @@ +package outport + +import ( + "math/big" + "testing" + + "github.com/ElrondNetwork/elrond-go-core/data/transaction" + "github.com/stretchr/testify/require" +) + +func TestNewTransactionWithFee(t *testing.T) { + t.Parallel() + + txWithFee := NewTransactionHandlerWithGasAndFee(&transaction.Transaction{ + Nonce: 1, + }, 100, big.NewInt(1000)) + txWithFee.SetInitialPaidFee(big.NewInt(2000)) + + require.Equal(t, uint64(100), txWithFee.GetGasUsed()) + require.Equal(t, big.NewInt(1000), txWithFee.GetFee()) + require.Equal(t, big.NewInt(2000), txWithFee.GetInitialPaidFee()) +} diff --git a/go.mod b/go.mod index a3d50b95f..523e5bb8f 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,8 @@ require ( github.com/denisbrodbeck/machineid v1.0.1 github.com/gogo/protobuf v1.3.2 github.com/golang/protobuf v1.5.2 + github.com/gorilla/mux v1.8.0 + github.com/gorilla/websocket v1.4.2 github.com/hashicorp/golang-lru v0.5.4 github.com/mr-tron/base58 v1.2.0 github.com/pelletier/go-toml v1.9.3 diff --git a/go.sum b/go.sum index 598c86962..22b2c1b2d 100644 --- a/go.sum +++ b/go.sum @@ -23,6 +23,10 @@ github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= +github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= diff --git a/websocketOutportDriver/data/errors.go b/websocketOutportDriver/data/errors.go new file mode 100644 index 000000000..41dee246b --- /dev/null +++ b/websocketOutportDriver/data/errors.go @@ -0,0 +1,36 @@ +package data + +import "errors" + +// ErrNilHttpServer signals that a nil http server has been provided +var ErrNilHttpServer = errors.New("nil http server") + +// ErrNilUint64ByteSliceConverter signals that a nil uint64 byte slice converter has been provided +var ErrNilUint64ByteSliceConverter = errors.New("nil uint64 byte slice converter") + +// ErrNilLogger signals that a nil instance of logger has been provided +var ErrNilLogger = errors.New("nil logger") + +// ErrEmptyDataToSend signals that the data that should be sent via websocket is empty +var ErrEmptyDataToSend = errors.New("empty data to send") + +// ErrNoClientToSendTo signals that the list of clients listening to messages is empty +var ErrNoClientToSendTo = errors.New("no client to send to") + +// ErrServerIsClosed represents the error thrown by the server's ListenAndServe() function when the server is closed +var ErrServerIsClosed = errors.New("http: Server closed") + +// ErrNilMarshaller signals that a nil marshaller has been provided +var ErrNilMarshaller = errors.New("nil marshaller") + +// ErrNilWebSocketSender signals that a nil web socket sender has been provided +var ErrNilWebSocketSender = errors.New("nil sender sender") + +// ErrWebSocketServerIsClosed signals that the web socket server was closed while trying to perform actions +var ErrWebSocketServerIsClosed = errors.New("server is closed") + +// ErrWebSocketClientNotFound signals that the provided websocket client was not found +var ErrWebSocketClientNotFound = errors.New("websocket client not found") + +// ErrNilWebSocketClient signals that a nil websocket client has been provided +var ErrNilWebSocketClient = errors.New("nil websocket client") diff --git a/websocketOutportDriver/data/interface.go b/websocketOutportDriver/data/interface.go new file mode 100644 index 000000000..b53850a66 --- /dev/null +++ b/websocketOutportDriver/data/interface.go @@ -0,0 +1,10 @@ +package data + +import "io" + +// WSConn defines what a sender shall do +type WSConn interface { + io.Closer + ReadMessage() (messageType int, payload []byte, err error) + WriteMessage(messageType int, data []byte) error +} diff --git a/websocketOutportDriver/data/operations.go b/websocketOutportDriver/data/operations.go new file mode 100644 index 000000000..893b392b9 --- /dev/null +++ b/websocketOutportDriver/data/operations.go @@ -0,0 +1,64 @@ +package data + +const ( + // WSRoute is the route which data will be sent over websocket + WSRoute = "/save" +) + +// WebSocketConfig holds the configuration needed for instantiating a new web socket server +type WebSocketConfig struct { + URL string + WithAcknowledge bool +} + +// OperationType defines the type to be used to group web socket operations +type OperationType uint8 + +// OperationTypeFromUint64 returns the operation type based on the provided uint64 value +func OperationTypeFromUint64(value uint64) OperationType { + return OperationType(uint8(value)) +} + +// String will return the string representation of the operation +func (ot OperationType) String() string { + switch ot { + case 0: + return "SaveBlock" + case 1: + return "RevertIndexedBlock" + case 2: + return "SaveRoundsInfo" + case 3: + return "SaveValidatorsPubKeys" + case 4: + return "SaveValidatorsRating" + case 5: + return "SaveAccounts" + case 6: + return "FinalizedBlock" + default: + return "Unknown" + } +} + +// Uint32 will return the uint32 representation of the operation +func (ot OperationType) Uint32() uint32 { + return uint32(ot) +} + +const ( + // OperationSaveBlock is the operation that triggers a block saving + OperationSaveBlock OperationType = 0 + // OperationRevertIndexedBlock is the operation that triggers a reverting of an indexed block + OperationRevertIndexedBlock OperationType = 1 + // OperationSaveRoundsInfo is the operation that triggers the saving of rounds info + OperationSaveRoundsInfo OperationType = 2 + // OperationSaveValidatorsPubKeys is the operation that triggers the saving of validators' public keys + OperationSaveValidatorsPubKeys OperationType = 3 + // OperationSaveValidatorsRating is the operation that triggers the saving of the validators' rating + OperationSaveValidatorsRating OperationType = 4 + // OperationSaveAccounts is the operation that triggers the saving of accounts + OperationSaveAccounts OperationType = 5 + // OperationFinalizedBlock is the operation that triggers the handling of a finalized block + OperationFinalizedBlock OperationType = 6 +) diff --git a/websocketOutportDriver/data/shared.go b/websocketOutportDriver/data/shared.go new file mode 100644 index 000000000..5c4809538 --- /dev/null +++ b/websocketOutportDriver/data/shared.go @@ -0,0 +1,54 @@ +package data + +import ( + "github.com/ElrondNetwork/elrond-go-core/core" + "github.com/ElrondNetwork/elrond-go-core/data" + "github.com/ElrondNetwork/elrond-go-core/data/outport" +) + +// WsSendArgs holds the arguments needed for performing a web socket request +type WsSendArgs struct { + Payload []byte +} + +// ArgsRevertIndexedBlock holds the driver's arguments needed for reverting an indexed block +type ArgsRevertIndexedBlock struct { + HeaderType core.HeaderType + Header data.HeaderHandler + Body data.BodyHandler +} + +// ArgsSaveRoundsInfo holds the driver's arguments needed for indexing rounds info +type ArgsSaveRoundsInfo struct { + RoundsInfos []*outport.RoundInfo +} + +// ArgsSaveValidatorsPubKeys holds the driver's arguments needed for indexing validator public keys +type ArgsSaveValidatorsPubKeys struct { + ValidatorsPubKeys map[uint32][][]byte + Epoch uint32 +} + +// ArgsSaveValidatorsRating holds the driver's arguments needed for indexing validators' rating +type ArgsSaveValidatorsRating struct { + IndexID string + InfoRating []*outport.ValidatorRatingInfo +} + +// ArgsSaveAccounts holds the driver's arguments needed for indexing accounts +type ArgsSaveAccounts struct { + ShardID uint32 + BlockTimestamp uint64 + Acc map[string]*outport.AlteredAccount +} + +// ArgsFinalizedBlock holds the driver's arguments needed for handling a finalized block +type ArgsFinalizedBlock struct { + HeaderHash []byte +} + +// ArgsSaveBlock holds the driver's arguments needed for handling a save block +type ArgsSaveBlock struct { + HeaderType core.HeaderType + outport.ArgsSaveBlockData +} diff --git a/websocketOutportDriver/factory/factory.go b/websocketOutportDriver/factory/factory.go new file mode 100644 index 000000000..2d532fc6e --- /dev/null +++ b/websocketOutportDriver/factory/factory.go @@ -0,0 +1,126 @@ +package factory + +import ( + "net/http" + + "github.com/ElrondNetwork/elrond-go-core/core" + "github.com/ElrondNetwork/elrond-go-core/core/check" + "github.com/ElrondNetwork/elrond-go-core/marshal" + "github.com/ElrondNetwork/elrond-go-core/websocketOutportDriver" + outportData "github.com/ElrondNetwork/elrond-go-core/websocketOutportDriver/data" + "github.com/ElrondNetwork/elrond-go-core/websocketOutportDriver/sender" + "github.com/gorilla/mux" + "github.com/gorilla/websocket" +) + +// OutportDriverWebSocketSenderFactoryArgs holds the arguments needed for creating a outportDriverWebSocketSenderFactory +type OutportDriverWebSocketSenderFactoryArgs struct { + WebSocketConfig outportData.WebSocketConfig + Marshaller marshal.Marshalizer + Uint64ByteSliceConverter websocketOutportDriver.Uint64ByteSliceConverter + Log core.Logger + WithAcknowledge bool +} + +type outportDriverWebSocketSenderFactory struct { + webSocketConfig outportData.WebSocketConfig + marshaller marshal.Marshalizer + uint64ByteSliceConverter websocketOutportDriver.Uint64ByteSliceConverter + log core.Logger + withAcknowledge bool +} + +// NewOutportDriverWebSocketSenderFactory will return a new instance of outportDriverWebSocketSenderFactory +func NewOutportDriverWebSocketSenderFactory(args OutportDriverWebSocketSenderFactoryArgs) (*outportDriverWebSocketSenderFactory, error) { + if check.IfNil(args.Marshaller) { + return nil, outportData.ErrNilMarshaller + } + if check.IfNil(args.Uint64ByteSliceConverter) { + return nil, outportData.ErrNilUint64ByteSliceConverter + } + if check.IfNil(args.Log) { + return nil, outportData.ErrNilLogger + } + return &outportDriverWebSocketSenderFactory{ + webSocketConfig: args.WebSocketConfig, + marshaller: args.Marshaller, + uint64ByteSliceConverter: args.Uint64ByteSliceConverter, + withAcknowledge: args.WithAcknowledge, + log: args.Log, + }, nil +} + +// Create will handle the creation of all the components needed to create an outport driver that sends data over +// web socket and return it afterwards +func (o *outportDriverWebSocketSenderFactory) Create() (websocketOutportDriver.Driver, error) { + webSocketSender, err := o.createWebSocketSender() + if err != nil { + return nil, err + } + + return websocketOutportDriver.NewWebsocketOutportDriverNodePart( + websocketOutportDriver.WebsocketOutportDriverNodePartArgs{ + Enabled: false, + Marshaller: o.marshaller, + WebsocketSender: webSocketSender, + WebSocketConfig: outportData.WebSocketConfig{}, + Uint64ByteSliceConverter: o.uint64ByteSliceConverter, + Log: o.log, + }, + ) +} + +func (o *outportDriverWebSocketSenderFactory) createWebSocketSender() (websocketOutportDriver.WebSocketSenderHandler, error) { + router := mux.NewRouter() + server := &http.Server{ + Addr: o.webSocketConfig.URL, + Handler: router, + } + + webSocketSenderArgs := sender.WebSocketSenderArgs{ + Server: server, + Uint64ByteSliceConverter: o.uint64ByteSliceConverter, + WithAcknowledge: o.withAcknowledge, + Log: o.log, + } + webSocketSender, err := sender.NewWebSocketSender(webSocketSenderArgs) + if err != nil { + return nil, err + } + + err = o.registerRoute(router, webSocketSender, outportData.WSRoute) + if err != nil { + return nil, err + } + + return webSocketSender, nil +} + +func (o *outportDriverWebSocketSenderFactory) registerRoute(router *mux.Router, webSocketHandler websocketOutportDriver.WebSocketSenderHandler, path string) error { + var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + } + + routeSendData := router.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) { + o.log.Info("new connection", "route", path, "remote address", r.RemoteAddr) + + upgrader.CheckOrigin = func(r *http.Request) bool { return true } + + ws, errUpgrade := upgrader.Upgrade(w, r, nil) + if errUpgrade != nil { + o.log.Warn("could not update websocket connection", "remote address", r.RemoteAddr, "error", errUpgrade) + return + } + + webSocketHandler.AddClient(ws, ws.RemoteAddr().String()) + }) + + if routeSendData.GetError() != nil { + o.log.Error("sender router failed to handle send data", + "route", routeSendData.GetName(), + "error", routeSendData.GetError()) + } + + return nil +} diff --git a/websocketOutportDriver/interface.go b/websocketOutportDriver/interface.go new file mode 100644 index 000000000..ab98af7c3 --- /dev/null +++ b/websocketOutportDriver/interface.go @@ -0,0 +1,36 @@ +package websocketOutportDriver + +import ( + "github.com/ElrondNetwork/elrond-go-core/data" + "github.com/ElrondNetwork/elrond-go-core/data/outport" + outportSenderData "github.com/ElrondNetwork/elrond-go-core/websocketOutportDriver/data" +) + +// Driver is an interface for saving node specific data to other storage. +// This could be an elastic search index, a MySql database or any other external services. +type Driver interface { + SaveBlock(args *outport.ArgsSaveBlockData) error + RevertIndexedBlock(header data.HeaderHandler, body data.BodyHandler) error + SaveRoundsInfo(roundsInfos []*outport.RoundInfo) error + SaveValidatorsPubKeys(validatorsPubKeys map[uint32][][]byte, epoch uint32) error + SaveValidatorsRating(indexID string, infoRating []*outport.ValidatorRatingInfo) error + SaveAccounts(blockTimestamp uint64, acc map[string]*outport.AlteredAccount, shardID uint32) error + FinalizedBlock(headerHash []byte) error + Close() error + IsInterfaceNil() bool +} + +// WebSocketSenderHandler defines what the actions that a web socket sender should do +type WebSocketSenderHandler interface { + Send(args outportSenderData.WsSendArgs) error + AddClient(wss outportSenderData.WSConn, remoteAddr string) + Close() error + IsInterfaceNil() bool +} + +// Uint64ByteSliceConverter converts byte slice to/from uint64 +type Uint64ByteSliceConverter interface { + ToByteSlice(uint64) []byte + ToUint64([]byte) (uint64, error) + IsInterfaceNil() bool +} diff --git a/websocketOutportDriver/mock/httpServerHandlerStub.go b/websocketOutportDriver/mock/httpServerHandlerStub.go new file mode 100644 index 000000000..e9786ec0a --- /dev/null +++ b/websocketOutportDriver/mock/httpServerHandlerStub.go @@ -0,0 +1,27 @@ +package mock + +import "context" + +// HttpServerStub - +type HttpServerStub struct { + ListenAndServeCalled func() error + ShutdownCalled func(ctx context.Context) error +} + +// ListenAndServe - +func (h *HttpServerStub) ListenAndServe() error { + if h.ListenAndServeCalled != nil { + return h.ListenAndServeCalled() + } + + return nil +} + +//Shutdown - +func (h *HttpServerStub) Shutdown(ctx context.Context) error { + if h.ShutdownCalled != nil { + return h.ShutdownCalled(ctx) + } + + return nil +} diff --git a/websocketOutportDriver/mock/uint64ByteSliceConverterStub.go b/websocketOutportDriver/mock/uint64ByteSliceConverterStub.go new file mode 100644 index 000000000..43886ecb8 --- /dev/null +++ b/websocketOutportDriver/mock/uint64ByteSliceConverterStub.go @@ -0,0 +1,30 @@ +package mock + +// Uint64ByteSliceConverterStub - +type Uint64ByteSliceConverterStub struct { + ToByteSliceCalled func(u2 uint64) []byte + ToUint64Called func(bytes []byte) (uint64, error) +} + +// ToByteSlice - +func (u *Uint64ByteSliceConverterStub) ToByteSlice(u2 uint64) []byte { + if u.ToByteSliceCalled != nil { + return u.ToByteSliceCalled(u2) + } + + return nil +} + +// ToUint64 - +func (u *Uint64ByteSliceConverterStub) ToUint64(bytes []byte) (uint64, error) { + if u.ToUint64Called != nil { + return u.ToUint64Called(bytes) + } + + return 0, nil +} + +// IsInterfaceNil - +func (u *Uint64ByteSliceConverterStub) IsInterfaceNil() bool { + return u == nil +} diff --git a/websocketOutportDriver/mock/webSocketSenderStub.go b/websocketOutportDriver/mock/webSocketSenderStub.go new file mode 100644 index 000000000..0c8aef277 --- /dev/null +++ b/websocketOutportDriver/mock/webSocketSenderStub.go @@ -0,0 +1,42 @@ +package mock + +import ( + "github.com/ElrondNetwork/elrond-go-core/websocketOutportDriver/data" +) + +// WebSocketSenderStub - +type WebSocketSenderStub struct { + SendOnRouteCalled func(args data.WsSendArgs) error + AddClientCalled func(wss data.WSConn, remoteAddr string) + CloseCalled func() error +} + +// AddClient - +func (w *WebSocketSenderStub) AddClient(wss data.WSConn, remoteAddr string) { + if w.AddClientCalled != nil { + w.AddClientCalled(wss, remoteAddr) + } +} + +// Send - +func (w *WebSocketSenderStub) Send(args data.WsSendArgs) error { + if w.SendOnRouteCalled != nil { + return w.SendOnRouteCalled(args) + } + + return nil +} + +// Close - +func (w *WebSocketSenderStub) Close() error { + if w.CloseCalled != nil { + return w.CloseCalled() + } + + return nil +} + +// IsInterfaceNil - +func (w *WebSocketSenderStub) IsInterfaceNil() bool { + return w == nil +} diff --git a/websocketOutportDriver/mock/websocketConnectionStub.go b/websocketOutportDriver/mock/websocketConnectionStub.go new file mode 100644 index 000000000..1862e63a3 --- /dev/null +++ b/websocketOutportDriver/mock/websocketConnectionStub.go @@ -0,0 +1,35 @@ +package mock + +// WebsocketConnectionStub - +type WebsocketConnectionStub struct { + ReadMessageCalled func() (messageType int, payload []byte, err error) + WriteMessageCalled func(messageType int, data []byte) error + CloseCalled func() error +} + +// ReadMessage - +func (w *WebsocketConnectionStub) ReadMessage() (messageType int, payload []byte, err error) { + if w.ReadMessageCalled != nil { + return w.ReadMessageCalled() + } + + return 0, nil, err +} + +// WriteMessage - +func (w *WebsocketConnectionStub) WriteMessage(messageType int, data []byte) error { + if w.WriteMessageCalled != nil { + return w.WriteMessageCalled(messageType, data) + } + + return nil +} + +// Close - +func (w *WebsocketConnectionStub) Close() error { + if w.CloseCalled != nil { + return w.CloseCalled() + } + + return nil +} diff --git a/websocketOutportDriver/payloadParser.go b/websocketOutportDriver/payloadParser.go new file mode 100644 index 000000000..f2ca7b7d9 --- /dev/null +++ b/websocketOutportDriver/payloadParser.go @@ -0,0 +1,153 @@ +package websocketOutportDriver + +import ( + "bytes" + "encoding/hex" + "fmt" + + "github.com/ElrondNetwork/elrond-go-core/core/check" + dataCore "github.com/ElrondNetwork/elrond-go-core/data" + "github.com/ElrondNetwork/elrond-go-core/data/outport" + "github.com/ElrondNetwork/elrond-go-core/websocketOutportDriver/data" +) + +const ( + withAcknowledgeNumBytes = 1 + uint64NumBytes = 8 + uint32NumBytes = 4 +) + +var ( + minBytesForCorrectPayload = withAcknowledgeNumBytes + uint64NumBytes + uint32NumBytes + uint32NumBytes +) + +// PayloadData holds the arguments that should be parsed from a websocket payload +type PayloadData struct { + WithAcknowledge bool + Counter uint64 + OperationType data.OperationType + Payload []byte +} + +type websocketPayloadParser struct { + uint64ByteSliceConverter Uint64ByteSliceConverter +} + +// NewWebSocketPayloadParser returns a new instance of websocketPayloadParser +func NewWebSocketPayloadParser(uint64ByteSliceConverter Uint64ByteSliceConverter) (*websocketPayloadParser, error) { + if check.IfNil(uint64ByteSliceConverter) { + return nil, data.ErrNilUint64ByteSliceConverter + } + + return &websocketPayloadParser{ + uint64ByteSliceConverter: uint64ByteSliceConverter, + }, nil +} + +// ExtractPayloadData will extract the data from the received payload +// It should have the following form: +// first byte - with acknowledge or not +// next 8 bytes - counter (uint64 big endian) +// next 4 bytes - operation type (uint32 big endian) +// next 4 bytes - message length (uint32 big endian) +// next X bytes - the actual data to parse +func (wpp *websocketPayloadParser) ExtractPayloadData(payload []byte) (*PayloadData, error) { + if len(payload) < minBytesForCorrectPayload { + return nil, fmt.Errorf("invalid payload. minimum required length is %d bytes, but only provided %d", + minBytesForCorrectPayload, + len(payload)) + } + + var err error + payloadData := &PayloadData{ + WithAcknowledge: false, + } + + if payload[0] == byte(1) { + payloadData.WithAcknowledge = true + } + payload = payload[withAcknowledgeNumBytes:] + + counterBytes := payload[:uint64NumBytes] + payloadData.Counter, err = wpp.uint64ByteSliceConverter.ToUint64(counterBytes) + if err != nil { + return nil, fmt.Errorf("%w while extracting the counter from the payload", err) + } + payload = payload[uint64NumBytes:] + + operationTypeBytes := payload[:uint32NumBytes] + var operationTypeUint64 uint64 + operationTypeUint64, err = wpp.uint64ByteSliceConverter.ToUint64(padUint32ByteSlice(operationTypeBytes)) + if err != nil { + return nil, fmt.Errorf("%w while extracting the counter from the payload", err) + } + payloadData.OperationType = data.OperationTypeFromUint64(operationTypeUint64) + payload = payload[uint32NumBytes:] + + var messageLen uint64 + messageLen, err = wpp.uint64ByteSliceConverter.ToUint64(padUint32ByteSlice(payload[:uint32NumBytes])) + if err != nil { + return nil, fmt.Errorf("%w while extracting the message length", err) + } + payload = payload[uint32NumBytes:] + + if messageLen != uint64(len(payload)) { + return nil, fmt.Errorf("message counter is not equal to the actual payload. provided: %d, actual: %d", + messageLen, len(payload)) + } + + payloadData.Payload = payload + + return payloadData, nil +} + +func padUint32ByteSlice(initial []byte) []byte { + padding := bytes.Repeat([]byte{0}, 4) + return append(padding, initial...) +} + +func prepareArgsSaveBlock(args outport.ArgsSaveBlockData) outport.ArgsSaveBlockData { + var pool *outport.Pool + if args.TransactionsPool != nil { + pool = &outport.Pool{ + Txs: prepareTxs(args.TransactionsPool.Txs), + Scrs: prepareTxs(args.TransactionsPool.Scrs), + Rewards: prepareTxs(args.TransactionsPool.Rewards), + Invalid: prepareTxs(args.TransactionsPool.Invalid), + Receipts: prepareTxs(args.TransactionsPool.Receipts), + Logs: prepareLogs(args.TransactionsPool.Logs), + } + } + + return outport.ArgsSaveBlockData{ + HeaderHash: args.HeaderHash, + Body: args.Body, + Header: args.Header, + SignersIndexes: args.SignersIndexes, + NotarizedHeadersHashes: args.NotarizedHeadersHashes, + HeaderGasConsumption: args.HeaderGasConsumption, + TransactionsPool: pool, + AlteredAccounts: args.AlteredAccounts, + NumberOfShards: args.NumberOfShards, + IsImportDB: args.IsImportDB, + } +} + +func prepareLogs(initial []*dataCore.LogData) []*dataCore.LogData { + res := make([]*dataCore.LogData, 0, len(initial)) + for _, logHandler := range initial { + res = append(res, &dataCore.LogData{ + LogHandler: logHandler.LogHandler, + TxHash: hex.EncodeToString([]byte(logHandler.TxHash)), + }) + } + return res +} + +func prepareTxs(initial map[string]dataCore.TransactionHandlerWithGasUsedAndFee) map[string]dataCore.TransactionHandlerWithGasUsedAndFee { + res := make(map[string]dataCore.TransactionHandlerWithGasUsedAndFee) + for txHash, tx := range initial { + res[hex.EncodeToString([]byte(txHash))] = tx + } + return res +} diff --git a/websocketOutportDriver/payloadParser_test.go b/websocketOutportDriver/payloadParser_test.go new file mode 100644 index 000000000..3d099809b --- /dev/null +++ b/websocketOutportDriver/payloadParser_test.go @@ -0,0 +1,148 @@ +package websocketOutportDriver + +import ( + "bytes" + "errors" + "strings" + "testing" + + "github.com/ElrondNetwork/elrond-go-core/data/typeConverters/uint64ByteSlice" + "github.com/ElrondNetwork/elrond-go-core/websocketOutportDriver/data" + "github.com/ElrondNetwork/elrond-go-core/websocketOutportDriver/mock" + "github.com/stretchr/testify/require" +) + +var uint64ByteSliceConv = uint64ByteSlice.NewBigEndianConverter() + +func TestNewWebSocketPayloadParser(t *testing.T) { + t.Parallel() + + t.Run("nil uint64 byte slice converter", func(t *testing.T) { + wpp, err := NewWebSocketPayloadParser(nil) + require.Equal(t, data.ErrNilUint64ByteSliceConverter, err) + require.Nil(t, wpp) + }) + + t.Run("constructor should work", func(t *testing.T) { + wpp, err := NewWebSocketPayloadParser(uint64ByteSliceConv) + require.NoError(t, err) + require.NotNil(t, wpp) + }) +} + +func TestWebsocketPayloadParser_ExtractPayloadData(t *testing.T) { + t.Run("invalid payload length", testExtractPayloadDataInvalidLength) + t.Run("invalid counter byte slice", testExtractPayloadDataInvalidCounterByteSlice) + t.Run("invalid operation type byte slice", testExtractPayloadDataInvalidOperationTypeByteSlice) + t.Run("invalid message counter byte slice", testExtractPayloadDataInvalidMessageCounterByteSlice) + t.Run("invalid payload - message counter vs actual payload size", testExtractPayloadDataMessageCounterDoesNotMatchActualPayloadSize) + t.Run("should work", testExtractPayloadDataShouldWork) +} + +func testExtractPayloadDataInvalidLength(t *testing.T) { + parser, _ := NewWebSocketPayloadParser(uint64ByteSliceConv) + res, err := parser.ExtractPayloadData([]byte("invalid")) + require.Nil(t, res) + require.Error(t, err) + require.True(t, strings.Contains(err.Error(), "invalid payload")) +} + +func testExtractPayloadDataInvalidCounterByteSlice(t *testing.T) { + localErr := errors.New("local error") + uint64ConvStub := &mock.Uint64ByteSliceConverterStub{ + ToUint64Called: func(_ []byte) (uint64, error) { + return 0, localErr + }, + } + parser, _ := NewWebSocketPayloadParser(uint64ConvStub) + res, err := parser.ExtractPayloadData(bytes.Repeat([]byte{0}, minBytesForCorrectPayload)) + require.Nil(t, res) + require.Error(t, err) + require.True(t, errors.Is(err, localErr)) +} + +func testExtractPayloadDataInvalidOperationTypeByteSlice(t *testing.T) { + localErr := errors.New("local error") + numCalled := 0 + uint64ConvStub := &mock.Uint64ByteSliceConverterStub{ + ToUint64Called: func(_ []byte) (uint64, error) { + numCalled++ + if numCalled == 2 { + return 0, localErr + } + + return 0, nil + }, + } + parser, _ := NewWebSocketPayloadParser(uint64ConvStub) + res, err := parser.ExtractPayloadData(bytes.Repeat([]byte{0}, minBytesForCorrectPayload)) + require.Nil(t, res) + require.Error(t, err) + require.True(t, errors.Is(err, localErr)) +} + +func testExtractPayloadDataInvalidMessageCounterByteSlice(t *testing.T) { + localErr := errors.New("local error") + numCalled := 0 + uint64ConvStub := &mock.Uint64ByteSliceConverterStub{ + ToUint64Called: func(_ []byte) (uint64, error) { + numCalled++ + if numCalled == 3 { + return 0, localErr + } + + return 0, nil + }, + } + parser, _ := NewWebSocketPayloadParser(uint64ConvStub) + res, err := parser.ExtractPayloadData(bytes.Repeat([]byte{0}, minBytesForCorrectPayload)) + require.Nil(t, res) + require.Error(t, err) + require.True(t, errors.Is(err, localErr)) +} + +func testExtractPayloadDataMessageCounterDoesNotMatchActualPayloadSize(t *testing.T) { + uint64ConvStub := &mock.Uint64ByteSliceConverterStub{ + ToUint64Called: func(_ []byte) (uint64, error) { + return 0, nil + }, + } + parser, _ := NewWebSocketPayloadParser(uint64ConvStub) + res, err := parser.ExtractPayloadData(bytes.Repeat([]byte{0}, minBytesForCorrectPayload+2)) + require.Nil(t, res) + require.Error(t, err) + require.True(t, strings.Contains(err.Error(), "message counter is not equal")) +} + +func testExtractPayloadDataShouldWork(t *testing.T) { + parser, _ := NewWebSocketPayloadParser(uint64ByteSliceConv) + + expectedCounter := uint64(9) + expectedOperation := data.OperationSaveAccounts + expectedPayload := []byte("actual payload data") + + payload := make([]byte, 1) + payload[0] = byte(1) // with ack + + counterBytes := bytes.Repeat([]byte{0}, uint64NumBytes) + counterBytes[uint64NumBytes-1] = byte(expectedCounter) + payload = append(payload, counterBytes...) + + operationBytes := bytes.Repeat([]byte{0}, uint32NumBytes) + operationBytes[uint32NumBytes-1] = byte(expectedOperation.Uint32()) + payload = append(payload, operationBytes...) + + messageLenBytes := bytes.Repeat([]byte{0}, uint32NumBytes) + messageLenBytes[uint32NumBytes-1] = byte(len(expectedPayload)) + payload = append(payload, messageLenBytes...) + + payload = append(payload, expectedPayload...) + + res, err := parser.ExtractPayloadData(payload) + require.NoError(t, err) + require.NotNil(t, res) + require.True(t, res.WithAcknowledge) + require.Equal(t, expectedCounter, res.Counter) + require.Equal(t, expectedOperation, res.OperationType) + require.Equal(t, expectedPayload, res.Payload) +} diff --git a/websocketOutportDriver/sender/acknowledgesHolder.go b/websocketOutportDriver/sender/acknowledgesHolder.go new file mode 100644 index 000000000..f4b5221c0 --- /dev/null +++ b/websocketOutportDriver/sender/acknowledgesHolder.go @@ -0,0 +1,52 @@ +package sender + +import "sync" + +type acknowledgesHolder struct { + acknowledges map[string]*websocketClientAcknowledgesHolder + mut sync.Mutex +} + +// NewAcknowledgesHolder returns a new instance of acknowledgesHolder +func NewAcknowledgesHolder() *acknowledgesHolder { + return &acknowledgesHolder{ + acknowledges: make(map[string]*websocketClientAcknowledgesHolder), + } +} + +// AddEntry will add the client to the inner map +func (ah *acknowledgesHolder) AddEntry(remoteAddr string) { + ah.mut.Lock() + ah.acknowledges[remoteAddr] = NewWebsocketClientAcknowledgesHolder() + ah.mut.Unlock() +} + +// GetAcknowledgesOfAddress will return the acknowledges for the specified address, if any +func (ah *acknowledgesHolder) GetAcknowledgesOfAddress(remoteAddr string) (*websocketClientAcknowledgesHolder, bool) { + ah.mut.Lock() + defer ah.mut.Unlock() + + acks, found := ah.acknowledges[remoteAddr] + return acks, found +} + +// RemoveEntryForAddress will remove the provided address from the internal map +func (ah *acknowledgesHolder) RemoveEntryForAddress(remoteAddr string) { + ah.mut.Lock() + delete(ah.acknowledges, remoteAddr) + ah.mut.Unlock() +} + +// AddReceivedAcknowledge will add the received acknowledge as a counter for the given address +func (ah *acknowledgesHolder) AddReceivedAcknowledge(remoteAddr string, counter uint64) bool { + ah.mut.Lock() + defer ah.mut.Unlock() + + acks, found := ah.acknowledges[remoteAddr] + if !found { + return false + } + + acks.Add(counter) + return true +} diff --git a/websocketOutportDriver/sender/acknowledgesHolder_test.go b/websocketOutportDriver/sender/acknowledgesHolder_test.go new file mode 100644 index 000000000..929ecfb67 --- /dev/null +++ b/websocketOutportDriver/sender/acknowledgesHolder_test.go @@ -0,0 +1,130 @@ +package sender + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestNewAcknowledgesHolder(t *testing.T) { + t.Parallel() + + ah := NewAcknowledgesHolder() + require.NotNil(t, ah) +} + +func TestAcknowledgesHolder_AddEntry(t *testing.T) { + t.Parallel() + + remAddr := "test address" + ah := NewAcknowledgesHolder() + ah.AddEntry(remAddr) + + ah.mut.Lock() + res, found := ah.acknowledges[remAddr] + ah.mut.Unlock() + + require.True(t, found) + require.NotNil(t, res) +} + +func TestAcknowledgesHolder_AddReceivedAcknowledge(t *testing.T) { + t.Parallel() + + remAddr := "test address" + counter := uint64(37) + ah := NewAcknowledgesHolder() + ah.AddEntry(remAddr) + + ah.AddReceivedAcknowledge(remAddr, counter) + + ah.mut.Lock() + found := ah.acknowledges[remAddr].ProcessAcknowledged(counter) + ah.mut.Unlock() + + require.True(t, found) +} + +func TestAcknowledgesHolder_GetAcknowledgesOfAddress(t *testing.T) { + t.Parallel() + + t.Run("GetAcknowledgesOfAddress: not found", func(t *testing.T) { + t.Parallel() + + ah := NewAcknowledgesHolder() + + res, found := ah.GetAcknowledgesOfAddress("new addr") + require.False(t, found) + require.Nil(t, res) + }) + + t.Run("GetAcknowledgesOfAddress: should work", func(t *testing.T) { + t.Parallel() + + remAddr := "test address" + counter0, counter1 := uint64(37), uint64(38) + ah := NewAcknowledgesHolder() + ah.AddEntry(remAddr) + + ah.AddReceivedAcknowledge(remAddr, counter0) + ah.AddReceivedAcknowledge(remAddr, counter1) + + acks, found := ah.GetAcknowledgesOfAddress(remAddr) + require.True(t, found) + + found0 := acks.ProcessAcknowledged(counter0) + found1 := acks.ProcessAcknowledged(counter1) + + require.True(t, found0) + require.True(t, found1) + }) +} + +func TestAcknowledgesHolder_RemoveEntryForAddress(t *testing.T) { + t.Parallel() + + remAddr := "remote addr" + + ah := NewAcknowledgesHolder() + + ah.AddEntry(remAddr) + ah.RemoveEntryForAddress(remAddr) + + ah.mut.Lock() + _, found := ah.acknowledges[remAddr] + ah.mut.Unlock() + + require.False(t, found) +} + +func TestAcknowledgesHolder_ConcurrentOperations(t *testing.T) { + t.Parallel() + + ah := NewAcknowledgesHolder() + + defer func() { + r := recover() + require.Nil(t, r) + }() + + wg := sync.WaitGroup{} + wg.Add(100) + + for i := uint64(0); i < 100; i++ { + go func(index uint64) { + switch index % 4 { + case 0: + ah.AddReceivedAcknowledge("addr", index) + case 1: + _, _ = ah.GetAcknowledgesOfAddress("addr") + case 2: + ah.RemoveEntryForAddress("addr") + case 3: + ah.AddEntry("addr") + } + wg.Done() + }(i) + } + wg.Wait() +} diff --git a/websocketOutportDriver/sender/interface.go b/websocketOutportDriver/sender/interface.go new file mode 100644 index 000000000..9403b0a0b --- /dev/null +++ b/websocketOutportDriver/sender/interface.go @@ -0,0 +1,16 @@ +package sender + +import "context" + +// Uint64ByteSliceConverter converts byte slice to/from uint64 +type Uint64ByteSliceConverter interface { + ToByteSlice(uint64) []byte + ToUint64([]byte) (uint64, error) + IsInterfaceNil() bool +} + +// HttpServerHandler defines the minimum behaviour of a http server +type HttpServerHandler interface { + ListenAndServe() error + Shutdown(ctx context.Context) error +} diff --git a/websocketOutportDriver/sender/webSocketSender.go b/websocketOutportDriver/sender/webSocketSender.go new file mode 100644 index 000000000..9236a9666 --- /dev/null +++ b/websocketOutportDriver/sender/webSocketSender.go @@ -0,0 +1,249 @@ +package sender + +import ( + "context" + "fmt" + "strings" + "sync/atomic" + "time" + + "github.com/ElrondNetwork/elrond-go-core/core" + "github.com/ElrondNetwork/elrond-go-core/core/check" + outportData "github.com/ElrondNetwork/elrond-go-core/websocketOutportDriver/data" + "github.com/gorilla/websocket" +) + +var ( + prefixWithAck = []byte{1} + prefixWithoutAck = []byte{0} +) + +type webSocketClient struct { + conn outportData.WSConn + remoteAddr string +} + +type webSocketSender struct { + log core.Logger + // TODO: use an interface for http server (or simply provide the URL only) in order to make this component easy testable + server HttpServerHandler + counter uint64 + uint64ByteSliceConverter Uint64ByteSliceConverter + // TODO: use interfaces instead of direct instances + analyze returning pointers vs values on exported functions + clientsHolder *websocketClientsHolder + acknowledges *acknowledgesHolder + withAcknowledge bool +} + +// WebSocketSenderArgs holds the arguments needed for creating a new instance of webSocketSender +type WebSocketSenderArgs struct { + Server HttpServerHandler + Uint64ByteSliceConverter Uint64ByteSliceConverter + WithAcknowledge bool + Log core.Logger +} + +// NewWebSocketSender returns a new instance of webSocketSender +func NewWebSocketSender(args WebSocketSenderArgs) (*webSocketSender, error) { + if args.Server == nil { + return nil, outportData.ErrNilHttpServer + } + if check.IfNil(args.Uint64ByteSliceConverter) { + return nil, outportData.ErrNilUint64ByteSliceConverter + } + if check.IfNil(args.Log) { + return nil, outportData.ErrNilLogger + } + + ws := &webSocketSender{ + log: args.Log, + server: args.Server, + counter: 0, + uint64ByteSliceConverter: args.Uint64ByteSliceConverter, + clientsHolder: NewWebsocketClientsHolder(), + acknowledges: NewAcknowledgesHolder(), + withAcknowledge: args.WithAcknowledge, + } + + go ws.start() + + return ws, nil +} + +// AddClient will add the client to internal maps and will also start +func (w *webSocketSender) AddClient(wss outportData.WSConn, remoteAddr string) { + if wss == nil { + w.log.Warn("nil ws connection provider", "remote addr", remoteAddr) + return + } + + client := &webSocketClient{ + conn: wss, + remoteAddr: remoteAddr, + } + + err := w.clientsHolder.AddClient(client) + if err != nil { + w.log.Warn("cannot AddClient", "error", err) + return + } + + // TODO: maybe multiple clients types could be supported: some require ack, while some don't require ack + if !w.withAcknowledge { + return + } + + w.acknowledges.AddEntry(remoteAddr) + + go w.handleReceiveAck(client) +} + +func (w *webSocketSender) handleReceiveAck(client *webSocketClient) { + for { + mType, message, err := client.conn.ReadMessage() + if err != nil { + w.log.Error("cannot read message", "remote addr", client.remoteAddr, "error", err) + + err = w.clientsHolder.CloseAndRemove(client.remoteAddr) + w.log.LogIfError(err) + + w.acknowledges.RemoveEntryForAddress(client.remoteAddr) + + break + } + + if mType != websocket.BinaryMessage { + w.log.Warn("received message is not binary message", "remote addr", client.remoteAddr, "message type", mType) + continue + } + + w.log.Trace("received ack", "remote addr", client.remoteAddr, "message", message) + counter, err := w.uint64ByteSliceConverter.ToUint64(message) + if err != nil { + w.log.Warn("cannot decode counter: bytes to uint64", + "remote addr", client.remoteAddr, + "counter bytes", message, + "error", err, + ) + continue + } + + w.acknowledges.AddReceivedAcknowledge(client.remoteAddr, counter) + } +} + +func (w *webSocketSender) start() { + err := w.server.ListenAndServe() + if err != nil && !strings.Contains(err.Error(), outportData.ErrServerIsClosed.Error()) { + w.log.Error("could not initialize webserver", "error", err) + } +} + +func (w *webSocketSender) sendDataToClients( + data []byte, + counter uint64, +) error { + numSent := 0 + var err error + + clients := w.clientsHolder.GetAll() + if len(clients) == 0 { + return outportData.ErrNoClientToSendTo + } + + for _, client := range w.clientsHolder.GetAll() { + err = w.sendData(data, *client, counter) + if err != nil { + w.log.Error("couldn't send data to client", "error", err) + continue + } + + numSent++ + } + + if numSent == 0 { + return fmt.Errorf("data wasn't sent to any client. last known error: %w", err) + } + + return nil +} + +func (w *webSocketSender) sendData( + data []byte, + client webSocketClient, + counter uint64, +) error { + if len(data) == 0 { + return outportData.ErrEmptyDataToSend + } + + errSend := client.conn.WriteMessage(websocket.BinaryMessage, data) + if errSend != nil { + // TODO: test if this is a situation when the client connection should be dropped + w.log.Warn("could not send data to client", "remote addr", client.remoteAddr, "error", errSend) + return fmt.Errorf("%w while writing message to client %s", errSend, client.remoteAddr) + } + + if !w.withAcknowledge { + return nil + } + + // TODO: might refactor this (send to each clients, then wait for all VS send to one client, wait for it, move to next) + w.waitForAck(client.remoteAddr, counter) + + return nil +} + +func (w *webSocketSender) waitForAck(remoteAddr string, counter uint64) { + for { + acksForAddress, ok := w.acknowledges.GetAcknowledgesOfAddress(remoteAddr) + if !ok { + w.log.Warn("waiting acknowledge for an address that isn't present anymore in clients map", "remote addr", remoteAddr) + return + } + + ok = acksForAddress.ProcessAcknowledged(counter) + if ok { + return + } + + time.Sleep(time.Millisecond) + } +} + +// Send will make the request accordingly to the received arguments +func (w *webSocketSender) Send(args outportData.WsSendArgs) error { + assignedCounter := atomic.AddUint64(&w.counter, 1) + ackData := prefixWithoutAck + if w.withAcknowledge { + ackData = prefixWithAck + } + + newPayload := append(ackData, w.uint64ByteSliceConverter.ToByteSlice(assignedCounter)...) + newPayload = append(newPayload, args.Payload...) + + return w.sendDataToClients(newPayload, assignedCounter) +} + +// Close will close the server and the connections with the clients +func (w *webSocketSender) Close() error { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + err := w.server.Shutdown(ctx) + if err != nil { + w.log.Error("cannot close the server", "error", err) + } + + for _, client := range w.clientsHolder.GetAll() { + err = client.conn.Close() + w.log.LogIfError(err) + } + + return nil +} + +// IsInterfaceNil returns true if there is no value under the interface +func (w *webSocketSender) IsInterfaceNil() bool { + return w == nil +} diff --git a/websocketOutportDriver/sender/webSocketSender_test.go b/websocketOutportDriver/sender/webSocketSender_test.go new file mode 100644 index 000000000..6be062823 --- /dev/null +++ b/websocketOutportDriver/sender/webSocketSender_test.go @@ -0,0 +1,213 @@ +package sender + +import ( + "errors" + "testing" + "time" + + coreMock "github.com/ElrondNetwork/elrond-go-core/core/mock" + "github.com/ElrondNetwork/elrond-go-core/websocketOutportDriver/data" + "github.com/ElrondNetwork/elrond-go-core/websocketOutportDriver/mock" + "github.com/gorilla/websocket" + "github.com/stretchr/testify/require" +) + +func TestNewWebSocketSender(t *testing.T) { + t.Parallel() + + t.Run("nil server", func(t *testing.T) { + t.Parallel() + + args := getMockWebSocketSender() + args.Server = nil + + wss, err := NewWebSocketSender(args) + require.Nil(t, wss) + require.Equal(t, data.ErrNilHttpServer, err) + }) + + t.Run("nil uint64 byte slice converter", func(t *testing.T) { + t.Parallel() + + args := getMockWebSocketSender() + args.Uint64ByteSliceConverter = nil + + wss, err := NewWebSocketSender(args) + require.Nil(t, wss) + require.Equal(t, data.ErrNilUint64ByteSliceConverter, err) + }) + + t.Run("nil logger", func(t *testing.T) { + t.Parallel() + + args := getMockWebSocketSender() + args.Log = nil + + wss, err := NewWebSocketSender(args) + require.Nil(t, wss) + require.Equal(t, data.ErrNilLogger, err) + require.True(t, wss.IsInterfaceNil()) + }) + + t.Run("should work", func(t *testing.T) { + t.Parallel() + + args := getMockWebSocketSender() + + wss, err := NewWebSocketSender(args) + require.NoError(t, err) + require.NotNil(t, wss) + require.False(t, wss.IsInterfaceNil()) + }) +} + +func TestWebSocketSender_AddClient(t *testing.T) { + t.Parallel() + + t.Run("nil client", func(t *testing.T) { + t.Parallel() + + wss, _ := NewWebSocketSender(getMockWebSocketSender()) + + wss.AddClient(nil, "remote addr") + require.Equal(t, 0, len(wss.clientsHolder.GetAll())) + }) + + t.Run("should work - without acknowledge", func(t *testing.T) { + t.Parallel() + + wss, _ := NewWebSocketSender(getMockWebSocketSender()) + + wss.AddClient(&mock.WebsocketConnectionStub{}, "remote addr") + + clients := wss.clientsHolder.GetAll() + require.NotNil(t, clients["remote addr"]) + + wss.acknowledges.mut.Lock() + acksForAddress := wss.acknowledges.acknowledges["remote addr"] + wss.acknowledges.mut.Unlock() + + require.Nil(t, acksForAddress) + }) + + t.Run("should work - with acknowledge", func(t *testing.T) { + t.Parallel() + + args := getMockWebSocketSender() + args.WithAcknowledge = true + + wss, _ := NewWebSocketSender(args) + + wss.AddClient(&mock.WebsocketConnectionStub{ + ReadMessageCalled: func() (_ int, _ []byte, err error) { + err = errors.New("early exit - close the go routine") + return + }, + }, "remote addr") + + clients := wss.clientsHolder.GetAll() + require.NotNil(t, clients["remote addr"]) + + wss.acknowledges.mut.Lock() + acksForAddress := wss.acknowledges.acknowledges["remote addr"] + wss.acknowledges.mut.Unlock() + + require.NotNil(t, acksForAddress) + }) +} + +func TestWebSocketSender_Send(t *testing.T) { + t.Parallel() + + t.Run("should error because no clients exist", func(t *testing.T) { + t.Parallel() + + wss, _ := NewWebSocketSender(getMockWebSocketSender()) + + err := wss.Send(data.WsSendArgs{ + Payload: []byte("payload"), + }) + require.Equal(t, data.ErrNoClientToSendTo, err) + }) + + t.Run("should work - without acknowledge", func(t *testing.T) { + t.Parallel() + + wss, _ := NewWebSocketSender(getMockWebSocketSender()) + + wss.AddClient(&mock.WebsocketConnectionStub{ + ReadMessageCalled: func() (_ int, _ []byte, err error) { + err = errors.New("early exit - close the go routine") + return + }, + }, "remote addr") + + err := wss.Send(data.WsSendArgs{ + Payload: []byte("payload"), + }) + require.NoError(t, err) + }) + + t.Run("should work - with acknowledge", func(t *testing.T) { + t.Parallel() + + args := getMockWebSocketSender() + args.WithAcknowledge = true + wss, _ := NewWebSocketSender(args) + + var ack []byte + + chClientAck := make(chan bool) + wasMsgProcessed := false + + wss.AddClient(&mock.WebsocketConnectionStub{ + ReadMessageCalled: func() (msgType int, payload []byte, err error) { + if wasMsgProcessed { + time.Sleep(100 * time.Millisecond) + msgType = websocket.BinaryMessage + err = errors.New("end") + return + } + + <-chClientAck + + time.Sleep(100 * time.Millisecond) + + msgType = websocket.BinaryMessage + payload = ack + err = nil + wasMsgProcessed = true + + return + }, + WriteMessageCalled: func(_ int, data []byte) error { + ack = data[1:3] + chClientAck <- true + + return nil + }, + }, "remote addr") + + err := wss.Send(data.WsSendArgs{ + Payload: []byte("payload"), + }) + require.NoError(t, err) + }) +} + +func TestWebSocketSender_Close(t *testing.T) { + t.Parallel() + + wss, _ := NewWebSocketSender(getMockWebSocketSender()) + + err := wss.Close() + require.NoError(t, err) +} + +func getMockWebSocketSender() WebSocketSenderArgs { + return WebSocketSenderArgs{ + Server: &mock.HttpServerStub{}, + Uint64ByteSliceConverter: &mock.Uint64ByteSliceConverterStub{}, + Log: coreMock.LoggerMock{}, + } +} diff --git a/websocketOutportDriver/sender/websocketClientAcknowledgesHolder.go b/websocketOutportDriver/sender/websocketClientAcknowledgesHolder.go new file mode 100644 index 000000000..d4e7a9e3d --- /dev/null +++ b/websocketOutportDriver/sender/websocketClientAcknowledgesHolder.go @@ -0,0 +1,37 @@ +package sender + +import "sync" + +type websocketClientAcknowledgesHolder struct { + acks map[uint64]struct{} + mutAcks sync.Mutex +} + +// NewWebsocketClientAcknowledgesHolder will return a new instance of websocketAcknowledgesHolder +func NewWebsocketClientAcknowledgesHolder() *websocketClientAcknowledgesHolder { + return &websocketClientAcknowledgesHolder{ + acks: make(map[uint64]struct{}), + } +} + +// Add will add an element +func (wah *websocketClientAcknowledgesHolder) Add(counter uint64) { + wah.mutAcks.Lock() + wah.acks[counter] = struct{}{} + wah.mutAcks.Unlock() +} + +// ProcessAcknowledged will process the acknowledgment for the given counter. If found, the element will also be +// removed from the inner map +func (wah *websocketClientAcknowledgesHolder) ProcessAcknowledged(counter uint64) bool { + wah.mutAcks.Lock() + defer wah.mutAcks.Unlock() + + _, exists := wah.acks[counter] + if !exists { + return false + } + + delete(wah.acks, counter) + return true +} diff --git a/websocketOutportDriver/sender/websocketClientAcknowledgesHolder_test.go b/websocketOutportDriver/sender/websocketClientAcknowledgesHolder_test.go new file mode 100644 index 000000000..f78523ed7 --- /dev/null +++ b/websocketOutportDriver/sender/websocketClientAcknowledgesHolder_test.go @@ -0,0 +1,84 @@ +package sender + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestNewWebsocketClientAcknowledgesHolder(t *testing.T) { + t.Parallel() + + wcah := NewWebsocketClientAcknowledgesHolder() + require.NotNil(t, wcah) +} + +func TestWebsocketClientAcknowledgesHolder_Add(t *testing.T) { + t.Parallel() + + counter := uint64(37) + wcah := NewWebsocketClientAcknowledgesHolder() + wcah.Add(counter) + + wcah.mutAcks.Lock() + res, found := wcah.acks[counter] + wcah.mutAcks.Unlock() + + require.True(t, found) + require.NotNil(t, res) +} + +func TestWebsocketClientAcknowledgesHolder_ProcessAcknowledged(t *testing.T) { + t.Parallel() + + t.Run("ProcessAcknowledged: should not find", func(t *testing.T) { + t.Parallel() + + wcah := NewWebsocketClientAcknowledgesHolder() + res := wcah.ProcessAcknowledged(5) + require.False(t, res) + }) + + t.Run("ProcessAcknowledged: should find and remove from inner map", func(t *testing.T) { + t.Parallel() + + counter := uint64(37) + wcah := NewWebsocketClientAcknowledgesHolder() + wcah.Add(counter) + + res := wcah.ProcessAcknowledged(counter) + require.True(t, res) + + wcah.mutAcks.Lock() + require.Equal(t, 0, len(wcah.acks)) + wcah.mutAcks.Unlock() + }) +} + +func TestWebsocketClientAcknowledgesHolder_ConcurrentOperations(t *testing.T) { + t.Parallel() + + wcah := NewWebsocketClientAcknowledgesHolder() + + defer func() { + r := recover() + require.Nil(t, r) + }() + + wg := sync.WaitGroup{} + wg.Add(100) + + for i := uint64(0); i < 100; i++ { + go func(index uint64) { + switch index % 2 { + case 0: + wcah.Add(index) + case 1: + wcah.ProcessAcknowledged(index) + } + wg.Done() + }(i) + } + wg.Wait() +} diff --git a/websocketOutportDriver/sender/websocketClientsHolder.go b/websocketOutportDriver/sender/websocketClientsHolder.go new file mode 100644 index 000000000..baa623f3b --- /dev/null +++ b/websocketOutportDriver/sender/websocketClientsHolder.go @@ -0,0 +1,60 @@ +package sender + +import ( + "sync" + + "github.com/ElrondNetwork/elrond-go-core/websocketOutportDriver/data" +) + +type websocketClientsHolder struct { + clients map[string]*webSocketClient + mut sync.RWMutex +} + +// NewWebsocketClientsHolder will return a new instance of websocketClientsHolder +func NewWebsocketClientsHolder() *websocketClientsHolder { + return &websocketClientsHolder{ + clients: make(map[string]*webSocketClient), + } +} + +// AddClient will add the provided client to the internal members +func (wch *websocketClientsHolder) AddClient(client *webSocketClient) error { + if client == nil { + return data.ErrNilWebSocketClient + } + + wch.mut.Lock() + wch.clients[client.remoteAddr] = client + wch.mut.Unlock() + + return nil +} + +// GetAll will return all the clients +func (wch *websocketClientsHolder) GetAll() map[string]*webSocketClient { + wch.mut.RLock() + defer wch.mut.RUnlock() + + clientsMap := make(map[string]*webSocketClient, len(wch.clients)) + for remoteAddr, client := range wch.clients { + clientsMap[remoteAddr] = client + } + + return clientsMap +} + +// CloseAndRemove will handle the closing of the connection and the deletion from the internal map +func (wch *websocketClientsHolder) CloseAndRemove(remoteAddr string) error { + wch.mut.Lock() + defer wch.mut.Unlock() + + client, ok := wch.clients[remoteAddr] + if !ok { + return data.ErrWebSocketClientNotFound + } + + delete(wch.clients, remoteAddr) + + return client.conn.Close() +} diff --git a/websocketOutportDriver/sender/websocketClientsHolder_test.go b/websocketOutportDriver/sender/websocketClientsHolder_test.go new file mode 100644 index 000000000..18fa5ba27 --- /dev/null +++ b/websocketOutportDriver/sender/websocketClientsHolder_test.go @@ -0,0 +1,84 @@ +package sender + +import ( + "testing" + + "github.com/ElrondNetwork/elrond-go-core/websocketOutportDriver/data" + "github.com/ElrondNetwork/elrond-go-core/websocketOutportDriver/mock" + "github.com/stretchr/testify/require" +) + +func TestNewWebsocketClientsHolder(t *testing.T) { + t.Parallel() + + wch := NewWebsocketClientsHolder() + require.NotNil(t, wch) +} + +func TestWebsocketClientsHolder_AddClient(t *testing.T) { + t.Parallel() + + t.Run("nil web socket client", func(t *testing.T) { + t.Parallel() + + wch := NewWebsocketClientsHolder() + err := wch.AddClient(nil) + require.Equal(t, data.ErrNilWebSocketClient, err) + }) + + t.Run("should work", func(t *testing.T) { + t.Parallel() + + cl := &webSocketClient{} + wch := NewWebsocketClientsHolder() + err := wch.AddClient(cl) + require.NoError(t, err) + }) +} + +func TestWebsocketClientsHolder_GetAll(t *testing.T) { + t.Parallel() + + cl0 := &webSocketClient{remoteAddr: "cl0"} + cl1 := &webSocketClient{remoteAddr: "cl1"} + + wch := NewWebsocketClientsHolder() + + _ = wch.AddClient(cl0) + _ = wch.AddClient(cl1) + + clients := wch.GetAll() + require.Equal(t, cl0, clients["cl0"]) + require.Equal(t, cl1, clients["cl1"]) + require.Equal(t, 2, len(clients)) +} + +func TestWebsocketClientsHolder_CloseAndRemove(t *testing.T) { + t.Parallel() + + t.Run("CloseAndRemove should error because the client is not found", func(t *testing.T) { + t.Parallel() + + wch := NewWebsocketClientsHolder() + + err := wch.CloseAndRemove("new address") + require.Equal(t, data.ErrWebSocketClientNotFound, err) + }) + t.Run("CloseAndRemove should work", func(t *testing.T) { + t.Parallel() + + wch := NewWebsocketClientsHolder() + closeWasCalled := false + _ = wch.AddClient(&webSocketClient{remoteAddr: "cl", conn: &mock.WebsocketConnectionStub{ + CloseCalled: func() error { + closeWasCalled = true + return nil + }, + }}) + + err := wch.CloseAndRemove("cl") + require.NoError(t, err) + require.True(t, closeWasCalled) + }) + +} diff --git a/websocketOutportDriver/tests/realtest/README.md b/websocketOutportDriver/tests/realtest/README.md new file mode 100644 index 000000000..6af98a7c9 --- /dev/null +++ b/websocketOutportDriver/tests/realtest/README.md @@ -0,0 +1,15 @@ +# About +This directory contains test server and client applications. + +The server and client apps use `127.0.0.1:21111` for communicating via websockets. Combined, the server +sends requests continuously, and the clients will process them and send acknowledge for all processed requests. + +# How to use +1. go to `server` directory +2. `go build` +3. `./server` +4. from another terminal go to `client` directory +5. `go build` +6. `./bin` + +Step number 6 might be executed again in other terminal, as to simulate multiple clients diff --git a/websocketOutportDriver/tests/realtest/client/bin/main.go b/websocketOutportDriver/tests/realtest/client/bin/main.go new file mode 100644 index 000000000..d6dc4f487 --- /dev/null +++ b/websocketOutportDriver/tests/realtest/client/bin/main.go @@ -0,0 +1,25 @@ +package main + +import ( + "flag" + "log" + + "github.com/ElrondNetwork/elrond-go-core/marshal" + "github.com/ElrondNetwork/elrond-go-core/websocketOutportDriver/tests/realtest/client" +) + +var ( + addr = flag.String("name", "client 0", "-") + port = flag.Int("port", 21112, "-") +) + +func main() { + tc, err := client.NewTempClient(*addr, &marshal.JsonMarshalizer{}) + if err != nil { + log.Fatal(err.Error()) + } + + defer tc.Stop() + + tc.Run(*port) +} diff --git a/websocketOutportDriver/tests/realtest/client/client.go b/websocketOutportDriver/tests/realtest/client/client.go new file mode 100644 index 000000000..aa62cab12 --- /dev/null +++ b/websocketOutportDriver/tests/realtest/client/client.go @@ -0,0 +1,155 @@ +package client + +import ( + "errors" + "fmt" + "io" + "net/url" + "os" + "os/signal" + "time" + + "github.com/ElrondNetwork/elrond-go-core/core/check" + "github.com/ElrondNetwork/elrond-go-core/core/mock" + "github.com/ElrondNetwork/elrond-go-core/data/outport" + "github.com/ElrondNetwork/elrond-go-core/data/typeConverters/uint64ByteSlice" + "github.com/ElrondNetwork/elrond-go-core/marshal" + "github.com/ElrondNetwork/elrond-go-core/websocketOutportDriver" + "github.com/ElrondNetwork/elrond-go-core/websocketOutportDriver/data" + "github.com/gorilla/websocket" +) + +// WSConn defines what a sender shall do +type WSConn interface { + io.Closer + ReadMessage() (messageType int, p []byte, err error) + WriteMessage(messageType int, data []byte) error +} + +var ( + log = &mock.LoggerMock{} + errNilMarshaller = errors.New("nil marshaller") + uint64ByteSliceConverter = uint64ByteSlice.NewBigEndianConverter() +) + +type tempClient struct { + name string + marshaller marshal.Marshalizer + chanStop chan bool +} + +// NewTempClient will return a new instance of tempClient +func NewTempClient(name string, marshaller marshal.Marshalizer) (*tempClient, error) { + if check.IfNil(marshaller) { + return nil, errNilMarshaller + } + + return &tempClient{ + name: name, + marshaller: marshaller, + chanStop: make(chan bool), + }, nil +} + +// Run will start the client on the provided port +func (tc *tempClient) Run(port int) { + interrupt := make(chan os.Signal, 1) + signal.Notify(interrupt, os.Interrupt) + + urlReceiveData := url.URL{Scheme: "ws", Host: fmt.Sprintf("127.0.0.1:%d", port), Path: "/operations"} + log.Info(tc.name+" -> connecting to", "url", urlReceiveData.String()) + wsConnection, _, err := websocket.DefaultDialer.Dial(urlReceiveData.String(), nil) + if err != nil { + log.Error(tc.name+" -> dial", "error", err) + } + defer func() { + err = wsConnection.Close() + log.LogIfError(err) + }() + + done := make(chan struct{}) + + go func() { + defer close(done) + for { + _, message, err := wsConnection.ReadMessage() + if err != nil { + log.Error(tc.name+" -> error read message", "error", err) + return + } + + tc.verifyPayloadAndSendAckIfNeeded(message, wsConnection) + } + }() + + timer := time.NewTimer(time.Second) + defer timer.Stop() + + for { + select { + case <-done: + return + case <-timer.C: + case <-interrupt: + log.Info(tc.name + " -> interrupt") + + // Cleanly close the connection by sending a close message and then + // waiting (with timeout) for the server to close the connection. + err = wsConnection.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) + if err != nil { + log.Error(tc.name+" -> write close", "error", err) + return + } + select { + case <-done: + case <-time.After(time.Second): + } + return + } + } +} + +func (tc *tempClient) verifyPayloadAndSendAckIfNeeded(payload []byte, ackHandler WSConn) { + if len(payload) == 0 { + log.Error(tc.name + " -> empty payload") + return + } + + payloadParser, _ := websocketOutportDriver.NewWebSocketPayloadParser(uint64ByteSliceConverter) + payloadData, err := payloadParser.ExtractPayloadData(payload) + if err != nil { + log.Error(tc.name + " -> error while extracting payload data: " + err.Error()) + return + } + + log.Info(tc.name+" -> processing payload", + "counter", payloadData.Counter, + "operation type", payloadData.OperationType, + "message length", len(payloadData.Payload), + "data", payloadData.Payload, + ) + + if payloadData.OperationType.Uint32() == data.OperationSaveBlock.Uint32() { + log.Debug(tc.name + " -> save block operation") + var argsBlock outport.ArgsSaveBlockData + err = tc.marshaller.Unmarshal(&argsBlock, payload) + if err != nil { + log.Error(tc.name+" -> cannot unmarshal block", "error", err) + } else { + log.Info(tc.name+" -> successfully unmarshalled block", "hash", argsBlock.HeaderHash) + } + } + + if payloadData.WithAcknowledge { + counterBytes := uint64ByteSliceConverter.ToByteSlice(payloadData.Counter) + err = ackHandler.WriteMessage(websocket.BinaryMessage, counterBytes) + if err != nil { + log.Error(tc.name + " -> " + err.Error()) + } + } +} + +// Stop - +func (tc *tempClient) Stop() { + tc.chanStop <- true +} diff --git a/websocketOutportDriver/tests/realtest/server/interface.go b/websocketOutportDriver/tests/realtest/server/interface.go new file mode 100644 index 000000000..758dad71a --- /dev/null +++ b/websocketOutportDriver/tests/realtest/server/interface.go @@ -0,0 +1,20 @@ +package main + +import ( + "github.com/ElrondNetwork/elrond-go-core/data" + "github.com/ElrondNetwork/elrond-go-core/data/outport" +) + +// Driver is an interface for saving node specific data to other storage. +// This could be an elastic search index, a MySql database or any other external services. +type Driver interface { + SaveBlock(args *outport.ArgsSaveBlockData) error + RevertIndexedBlock(header data.HeaderHandler, body data.BodyHandler) error + SaveRoundsInfo(roundsInfos []*outport.RoundInfo) error + SaveValidatorsPubKeys(validatorsPubKeys map[uint32][][]byte, epoch uint32) error + SaveValidatorsRating(indexID string, infoRating []*outport.ValidatorRatingInfo) error + SaveAccounts(blockTimestamp uint64, acc map[string]*outport.AlteredAccount, shardID uint32) error + FinalizedBlock(headerHash []byte) error + Close() error + IsInterfaceNil() bool +} diff --git a/websocketOutportDriver/tests/realtest/server/server.go b/websocketOutportDriver/tests/realtest/server/server.go new file mode 100644 index 000000000..ae33e7b17 --- /dev/null +++ b/websocketOutportDriver/tests/realtest/server/server.go @@ -0,0 +1,85 @@ +package main + +import ( + "fmt" + "time" + + "github.com/ElrondNetwork/elrond-go-core/core/mock" + "github.com/ElrondNetwork/elrond-go-core/data/outport" + "github.com/ElrondNetwork/elrond-go-core/data/typeConverters/uint64ByteSlice" + "github.com/ElrondNetwork/elrond-go-core/marshal" + "github.com/ElrondNetwork/elrond-go-core/websocketOutportDriver/data" + "github.com/ElrondNetwork/elrond-go-core/websocketOutportDriver/factory" +) + +var jsonMarshaller = &marshal.JsonMarshalizer{} + +func main() { + server, err := createServer() + if err != nil { + fmt.Println("cannot create server: ", err.Error()) + return + } + + timeoutChan := make(chan bool) + go func(tChan chan bool) { + time.Sleep(1 * time.Minute) + tChan <- true + }(timeoutChan) + + funcCloseServer := func() { + err = server.Close() + if err != nil { + fmt.Println(err.Error()) + } + } + + for { + select { + case <-timeoutChan: + funcCloseServer() + default: + time.Sleep(2 * time.Second) + doAction(server) + } + } +} + +func doAction(server Driver) { + err := server.SaveBlock(&outport.ArgsSaveBlockData{HeaderHash: []byte("header hash")}) + if err != nil { + fmt.Println(err.Error()) + } + + err = server.SaveAccounts(1155, nil, 0) + if err != nil { + fmt.Println(err.Error()) + } + + err = server.FinalizedBlock([]byte("reverted header hash")) + if err != nil { + fmt.Println(err.Error()) + } + + err = server.SaveRoundsInfo(nil) + if err != nil { + fmt.Println(err.Error()) + } +} + +func createServer() (Driver, error) { + wsFactory, err := factory.NewOutportDriverWebSocketSenderFactory(factory.OutportDriverWebSocketSenderFactoryArgs{ + Marshaller: jsonMarshaller, + WebSocketConfig: data.WebSocketConfig{ + URL: "127.0.0.1:21112", + }, + Uint64ByteSliceConverter: uint64ByteSlice.NewBigEndianConverter(), + Log: &mock.LoggerMock{}, + WithAcknowledge: true, + }) + if err != nil { + return nil, err + } + + return wsFactory.Create() +} diff --git a/websocketOutportDriver/websocketOutportDriverNodePart.go b/websocketOutportDriver/websocketOutportDriverNodePart.go new file mode 100644 index 000000000..46e5556ec --- /dev/null +++ b/websocketOutportDriver/websocketOutportDriverNodePart.go @@ -0,0 +1,177 @@ +package websocketOutportDriver + +import ( + "fmt" + + "github.com/ElrondNetwork/elrond-go-core/core" + "github.com/ElrondNetwork/elrond-go-core/core/atomic" + "github.com/ElrondNetwork/elrond-go-core/core/check" + "github.com/ElrondNetwork/elrond-go-core/data" + "github.com/ElrondNetwork/elrond-go-core/data/outport" + "github.com/ElrondNetwork/elrond-go-core/marshal" + outportSenderData "github.com/ElrondNetwork/elrond-go-core/websocketOutportDriver/data" +) + +// WebsocketOutportDriverNodePartArgs holds the arguments needed for creating a new websocketOutportDriverNodePart +type WebsocketOutportDriverNodePartArgs struct { + Enabled bool + Marshaller marshal.Marshalizer + WebsocketSender WebSocketSenderHandler + WebSocketConfig outportSenderData.WebSocketConfig + Uint64ByteSliceConverter Uint64ByteSliceConverter + Log core.Logger +} + +type websocketOutportDriverNodePart struct { + marshalizer marshal.Marshalizer + log core.Logger + uint64ByteSliceConverter Uint64ByteSliceConverter + webSocketSender WebSocketSenderHandler + isClosed atomic.Flag +} + +// NewWebsocketOutportDriverNodePart will create a new instance of websocketOutportDriverNodePart +func NewWebsocketOutportDriverNodePart(args WebsocketOutportDriverNodePartArgs) (*websocketOutportDriverNodePart, error) { + if check.IfNil(args.Marshaller) { + return nil, outportSenderData.ErrNilMarshaller + } + if check.IfNil(args.WebsocketSender) { + return nil, outportSenderData.ErrNilWebSocketSender + } + if check.IfNil(args.Uint64ByteSliceConverter) { + return nil, outportSenderData.ErrNilUint64ByteSliceConverter + } + if check.IfNil(args.Log) { + return nil, outportSenderData.ErrNilLogger + } + + isClosedFlag := atomic.Flag{} + isClosedFlag.SetValue(false) + + return &websocketOutportDriverNodePart{ + marshalizer: args.Marshaller, + webSocketSender: args.WebsocketSender, + uint64ByteSliceConverter: args.Uint64ByteSliceConverter, + log: args.Log, + isClosed: isClosedFlag, + }, nil +} + +// SaveBlock will send the provided block saving arguments within the websocket +func (o *websocketOutportDriverNodePart) SaveBlock(args *outport.ArgsSaveBlockData) error { + argsSaveBlock := outportSenderData.ArgsSaveBlock{ + HeaderType: core.GetHeaderType(args.Header), + ArgsSaveBlockData: prepareArgsSaveBlock(*args), + } + + return o.handleAction(argsSaveBlock, outportSenderData.OperationSaveBlock) +} + +// RevertIndexedBlock will handle the action of reverting the indexed block +func (o *websocketOutportDriverNodePart) RevertIndexedBlock(header data.HeaderHandler, body data.BodyHandler) error { + args := outportSenderData.ArgsRevertIndexedBlock{ + Header: header, + Body: body, + HeaderType: core.GetHeaderType(header), + } + + return o.handleAction(args, outportSenderData.OperationRevertIndexedBlock) +} + +// SaveRoundsInfo will handle the saving of rounds +func (o *websocketOutportDriverNodePart) SaveRoundsInfo(roundsInfos []*outport.RoundInfo) error { + args := outportSenderData.ArgsSaveRoundsInfo{ + RoundsInfos: roundsInfos, + } + + return o.handleAction(args, outportSenderData.OperationSaveRoundsInfo) +} + +// SaveValidatorsPubKeys will handle the saving of the validators' public keys +func (o *websocketOutportDriverNodePart) SaveValidatorsPubKeys(validatorsPubKeys map[uint32][][]byte, epoch uint32) error { + args := outportSenderData.ArgsSaveValidatorsPubKeys{ + ValidatorsPubKeys: validatorsPubKeys, + Epoch: epoch, + } + + return o.handleAction(args, outportSenderData.OperationSaveValidatorsPubKeys) +} + +// SaveValidatorsRating will handle the saving of the validators' rating +func (o *websocketOutportDriverNodePart) SaveValidatorsRating(indexID string, infoRating []*outport.ValidatorRatingInfo) error { + args := outportSenderData.ArgsSaveValidatorsRating{ + IndexID: indexID, + InfoRating: infoRating, + } + + return o.handleAction(args, outportSenderData.OperationSaveValidatorsRating) +} + +// SaveAccounts will handle the accounts' saving +func (o *websocketOutportDriverNodePart) SaveAccounts(blockTimestamp uint64, acc map[string]*outport.AlteredAccount, shardID uint32) error { + args := outportSenderData.ArgsSaveAccounts{ + BlockTimestamp: blockTimestamp, + Acc: acc, + ShardID: shardID, + } + + return o.handleAction(args, outportSenderData.OperationSaveAccounts) +} + +// FinalizedBlock will handle the finalized block +func (o *websocketOutportDriverNodePart) FinalizedBlock(headerHash []byte) error { + args := outportSenderData.ArgsFinalizedBlock{ + HeaderHash: headerHash, + } + + return o.handleAction(args, outportSenderData.OperationFinalizedBlock) +} + +// Close will handle the closing of the outport driver web socket sender +func (o *websocketOutportDriverNodePart) Close() error { + o.isClosed.SetValue(true) + return o.webSocketSender.Close() +} + +// IsInterfaceNil returns true if there is no value under the interface +func (o *websocketOutportDriverNodePart) IsInterfaceNil() bool { + return o == nil +} + +func (o *websocketOutportDriverNodePart) handleAction(args interface{}, operation outportSenderData.OperationType) error { + if o.isClosed.IsSet() { + return outportSenderData.ErrWebSocketServerIsClosed + } + + marshaledBlock, err := o.marshalizer.Marshal(args) + if err != nil { + o.log.Error("cannot marshal block", "operation", operation.String(), "error", err) + return fmt.Errorf("%w while marshaling block for operation %s", err, operation.String()) + } + + payload := o.preparePayload(operation, marshaledBlock) + + err = o.webSocketSender.Send(outportSenderData.WsSendArgs{ + Payload: payload, + }) + if err != nil { + o.log.Error("cannot send on route", "operation", operation.String(), "error", err) + return fmt.Errorf("%w while sending data on route for operation %s", err, operation.String()) + } + + return nil +} + +func (o *websocketOutportDriverNodePart) preparePayload(operation outportSenderData.OperationType, data []byte) []byte { + opBytes := o.uint64ByteSliceConverter.ToByteSlice(uint64(operation.Uint32())) + opBytes = opBytes[uint32NumBytes:] + + messageLength := uint64(len(data)) + messageLengthBytes := o.uint64ByteSliceConverter.ToByteSlice(messageLength) + messageLengthBytes = messageLengthBytes[uint32NumBytes:] + + payload := append(opBytes, messageLengthBytes...) + payload = append(payload, data...) + + return payload +} diff --git a/websocketOutportDriver/websocketOutportDriverNodePart_test.go b/websocketOutportDriver/websocketOutportDriverNodePart_test.go new file mode 100644 index 000000000..82f6fcd4c --- /dev/null +++ b/websocketOutportDriver/websocketOutportDriverNodePart_test.go @@ -0,0 +1,369 @@ +package websocketOutportDriver + +import ( + "errors" + "testing" + + "github.com/ElrondNetwork/elrond-go-core/core" + coreMock "github.com/ElrondNetwork/elrond-go-core/core/mock" + "github.com/ElrondNetwork/elrond-go-core/data/block" + "github.com/ElrondNetwork/elrond-go-core/data/outport" + "github.com/ElrondNetwork/elrond-go-core/data/typeConverters/uint64ByteSlice" + "github.com/ElrondNetwork/elrond-go-core/marshal" + "github.com/ElrondNetwork/elrond-go-core/websocketOutportDriver/data" + "github.com/ElrondNetwork/elrond-go-core/websocketOutportDriver/mock" + "github.com/stretchr/testify/require" +) + +var cannotSendOnRouteErr = errors.New("cannot send on route") + +func getMockArgs() WebsocketOutportDriverNodePartArgs { + return WebsocketOutportDriverNodePartArgs{ + Enabled: true, + Marshaller: &marshal.JsonMarshalizer{}, + WebSocketConfig: data.WebSocketConfig{ + URL: "localhost:5555", + }, + WebsocketSender: &mock.WebSocketSenderStub{}, + Log: &coreMock.LoggerMock{}, + Uint64ByteSliceConverter: uint64ByteSlice.NewBigEndianConverter(), + } +} + +func TestNewWebsocketOutportDriverNodePart(t *testing.T) { + t.Parallel() + + t.Run("nil marshaller", func(t *testing.T) { + t.Parallel() + + args := getMockArgs() + args.Marshaller = nil + + o, err := NewWebsocketOutportDriverNodePart(args) + require.Nil(t, o) + require.Equal(t, data.ErrNilMarshaller, err) + }) + + t.Run("nil uint64 byte slice converter", func(t *testing.T) { + t.Parallel() + + args := getMockArgs() + args.Uint64ByteSliceConverter = nil + + o, err := NewWebsocketOutportDriverNodePart(args) + require.Nil(t, o) + require.Equal(t, data.ErrNilUint64ByteSliceConverter, err) + }) + + t.Run("nil uint64 byte slice converter", func(t *testing.T) { + t.Parallel() + + args := getMockArgs() + args.Uint64ByteSliceConverter = nil + + o, err := NewWebsocketOutportDriverNodePart(args) + require.Nil(t, o) + require.Equal(t, data.ErrNilUint64ByteSliceConverter, err) + }) + + t.Run("nil logger", func(t *testing.T) { + t.Parallel() + + args := getMockArgs() + args.Log = nil + + o, err := NewWebsocketOutportDriverNodePart(args) + require.Nil(t, o) + require.Equal(t, data.ErrNilLogger, err) + }) + + t.Run("should work", func(t *testing.T) { + t.Parallel() + + args := getMockArgs() + + o, err := NewWebsocketOutportDriverNodePart(args) + require.NotNil(t, o) + require.NoError(t, err) + require.False(t, o.IsInterfaceNil()) + }) +} + +func TestWebsocketOutportDriverNodePart_SaveBlock(t *testing.T) { + t.Parallel() + + t.Run("SaveBlock - should error", func(t *testing.T) { + t.Parallel() + + args := getMockArgs() + args.WebsocketSender = &mock.WebSocketSenderStub{ + SendOnRouteCalled: func(_ data.WsSendArgs) error { + return cannotSendOnRouteErr + }, + } + o, err := NewWebsocketOutportDriverNodePart(args) + require.NoError(t, err) + + err = o.SaveBlock(&outport.ArgsSaveBlockData{}) + require.True(t, errors.Is(err, cannotSendOnRouteErr)) + }) + + t.Run("SaveBlock - should work", func(t *testing.T) { + t.Parallel() + + defer func() { + r := recover() + require.Nil(t, r) + }() + args := getMockArgs() + o, err := NewWebsocketOutportDriverNodePart(args) + require.NoError(t, err) + + err = o.SaveBlock(&outport.ArgsSaveBlockData{}) + require.NoError(t, err) + }) +} + +func TestWebsocketOutportDriverNodePart_FinalizedBlock(t *testing.T) { + t.Parallel() + + t.Run("Finalized block - should error", func(t *testing.T) { + args := getMockArgs() + args.WebsocketSender = &mock.WebSocketSenderStub{ + SendOnRouteCalled: func(_ data.WsSendArgs) error { + return cannotSendOnRouteErr + }, + } + o, err := NewWebsocketOutportDriverNodePart(args) + require.NoError(t, err) + + err = o.FinalizedBlock([]byte("header hash")) + require.True(t, errors.Is(err, cannotSendOnRouteErr)) + }) + + t.Run("Finalized block - should work", func(t *testing.T) { + args := getMockArgs() + args.WebsocketSender = &mock.WebSocketSenderStub{ + SendOnRouteCalled: func(_ data.WsSendArgs) error { + return nil + }, + } + o, err := NewWebsocketOutportDriverNodePart(args) + require.NoError(t, err) + + err = o.FinalizedBlock([]byte("header hash")) + require.NoError(t, err) + }) +} + +func TestWebsocketOutportDriverNodePart_RevertIndexedBlock(t *testing.T) { + t.Parallel() + + t.Run("RevertIndexedBlock - should error", func(t *testing.T) { + args := getMockArgs() + args.WebsocketSender = &mock.WebSocketSenderStub{ + SendOnRouteCalled: func(_ data.WsSendArgs) error { + return cannotSendOnRouteErr + }, + } + o, err := NewWebsocketOutportDriverNodePart(args) + require.NoError(t, err) + + err = o.RevertIndexedBlock(nil, nil) + require.True(t, errors.Is(err, cannotSendOnRouteErr)) + }) + + t.Run("RevertIndexedBlock block - should work", func(t *testing.T) { + args := getMockArgs() + args.WebsocketSender = &mock.WebSocketSenderStub{ + SendOnRouteCalled: func(_ data.WsSendArgs) error { + return nil + }, + } + o, err := NewWebsocketOutportDriverNodePart(args) + require.NoError(t, err) + + err = o.RevertIndexedBlock(nil, nil) + require.NoError(t, err) + }) +} + +func TestWebsocketOutportDriverNodePart_SaveAccounts(t *testing.T) { + t.Parallel() + + t.Run("SaveAccounts - should error", func(t *testing.T) { + args := getMockArgs() + args.WebsocketSender = &mock.WebSocketSenderStub{ + SendOnRouteCalled: func(_ data.WsSendArgs) error { + return cannotSendOnRouteErr + }, + } + o, err := NewWebsocketOutportDriverNodePart(args) + require.NoError(t, err) + + err = o.SaveAccounts(0, nil, 0) + require.True(t, errors.Is(err, cannotSendOnRouteErr)) + }) + + t.Run("SaveAccounts block - should work", func(t *testing.T) { + args := getMockArgs() + args.WebsocketSender = &mock.WebSocketSenderStub{ + SendOnRouteCalled: func(_ data.WsSendArgs) error { + return nil + }, + } + o, err := NewWebsocketOutportDriverNodePart(args) + require.NoError(t, err) + + err = o.SaveAccounts(0, nil, 0) + require.NoError(t, err) + }) +} + +func TestWebsocketOutportDriverNodePart_SaveRoundsInfo(t *testing.T) { + t.Parallel() + + t.Run("SaveRoundsInfo - should error", func(t *testing.T) { + args := getMockArgs() + args.WebsocketSender = &mock.WebSocketSenderStub{ + SendOnRouteCalled: func(_ data.WsSendArgs) error { + return cannotSendOnRouteErr + }, + } + o, err := NewWebsocketOutportDriverNodePart(args) + require.NoError(t, err) + + err = o.SaveRoundsInfo(nil) + require.True(t, errors.Is(err, cannotSendOnRouteErr)) + }) + + t.Run("SaveRoundsInfo block - should work", func(t *testing.T) { + args := getMockArgs() + args.WebsocketSender = &mock.WebSocketSenderStub{ + SendOnRouteCalled: func(_ data.WsSendArgs) error { + return nil + }, + } + o, err := NewWebsocketOutportDriverNodePart(args) + require.NoError(t, err) + + err = o.SaveRoundsInfo(nil) + require.NoError(t, err) + }) +} + +func TestWebsocketOutportDriverNodePart_SaveValidatorsPubKeys(t *testing.T) { + t.Parallel() + + t.Run("SaveValidatorsPubKeys - should error", func(t *testing.T) { + args := getMockArgs() + args.WebsocketSender = &mock.WebSocketSenderStub{ + SendOnRouteCalled: func(_ data.WsSendArgs) error { + return cannotSendOnRouteErr + }, + } + o, err := NewWebsocketOutportDriverNodePart(args) + require.NoError(t, err) + + err = o.SaveValidatorsPubKeys(nil, 0) + require.True(t, errors.Is(err, cannotSendOnRouteErr)) + }) + + t.Run("SaveValidatorsPubKeys block - should work", func(t *testing.T) { + args := getMockArgs() + args.WebsocketSender = &mock.WebSocketSenderStub{ + SendOnRouteCalled: func(_ data.WsSendArgs) error { + return nil + }, + } + o, err := NewWebsocketOutportDriverNodePart(args) + require.NoError(t, err) + + err = o.SaveValidatorsPubKeys(nil, 0) + require.NoError(t, err) + }) +} + +func TestWebsocketOutportDriverNodePart_SaveValidatorsRating(t *testing.T) { + t.Parallel() + + t.Run("SaveValidatorsRating - should error", func(t *testing.T) { + args := getMockArgs() + args.WebsocketSender = &mock.WebSocketSenderStub{ + SendOnRouteCalled: func(_ data.WsSendArgs) error { + return cannotSendOnRouteErr + }, + } + o, err := NewWebsocketOutportDriverNodePart(args) + require.NoError(t, err) + + err = o.SaveValidatorsRating("", nil) + require.True(t, errors.Is(err, cannotSendOnRouteErr)) + }) + + t.Run("SaveValidatorsRating block - should work", func(t *testing.T) { + args := getMockArgs() + args.WebsocketSender = &mock.WebSocketSenderStub{ + SendOnRouteCalled: func(_ data.WsSendArgs) error { + return nil + }, + } + o, err := NewWebsocketOutportDriverNodePart(args) + require.NoError(t, err) + + err = o.SaveValidatorsRating("", nil) + require.NoError(t, err) + }) +} + +func TestWebsocketOutportDriverNodePart_SaveBlock_PayloadCheck(t *testing.T) { + t.Parallel() + + args := getMockArgs() + + marshaledData, err := args.Marshaller.Marshal(&data.ArgsSaveBlock{ + HeaderType: core.MetaHeader, + ArgsSaveBlockData: outport.ArgsSaveBlockData{ + Header: &block.MetaBlock{}, + }, + }) + require.Nil(t, err) + + args.WebsocketSender = &mock.WebSocketSenderStub{ + SendOnRouteCalled: func(args data.WsSendArgs) error { + expectedOpBytes := []byte{0, 0, 0, 0} + expectedLengthBytes := []byte{0, 0, 1, 156} + expectedPayload := append(expectedOpBytes, expectedLengthBytes...) + expectedPayload = append(expectedPayload, marshaledData...) + + require.Equal(t, expectedPayload, args.Payload) + + return nil + }, + } + o, err := NewWebsocketOutportDriverNodePart(args) + require.NoError(t, err) + + err = o.SaveBlock(&outport.ArgsSaveBlockData{Header: &block.MetaBlock{}}) + require.NoError(t, err) +} + +func TestWebsocketOutportDriverNodePart_Close(t *testing.T) { + t.Parallel() + + closedWasCalled := false + args := getMockArgs() + args.WebsocketSender = &mock.WebSocketSenderStub{ + CloseCalled: func() error { + closedWasCalled = true + return nil + }, + } + + o, err := NewWebsocketOutportDriverNodePart(args) + require.NoError(t, err) + + err = o.Close() + require.NoError(t, err) + require.True(t, closedWasCalled) +}