diff --git a/eth-repository/src/main/kotlin/net/consensys/cava/eth/repository/BlockchainRepository.kt b/eth-repository/src/main/kotlin/net/consensys/cava/eth/repository/BlockchainRepository.kt index 2cc731c1..29821d74 100644 --- a/eth-repository/src/main/kotlin/net/consensys/cava/eth/repository/BlockchainRepository.kt +++ b/eth-repository/src/main/kotlin/net/consensys/cava/eth/repository/BlockchainRepository.kt @@ -15,6 +15,7 @@ package net.consensys.cava.eth.repository import net.consensys.cava.bytes.Bytes import net.consensys.cava.bytes.Bytes32 import net.consensys.cava.eth.Block +import net.consensys.cava.eth.BlockBody import net.consensys.cava.eth.BlockHeader import net.consensys.cava.eth.Hash import net.consensys.cava.eth.TransactionReceipt @@ -31,13 +32,13 @@ class BlockchainRepository * Default constructor. * * @param chainMetadata the key-value store to store chain metadata - * @param blockStore the key-value store to store blocks + * @param blockBodyStore the key-value store to store block bodies * @param blockHeaderStore the key-value store to store block headers * @param blockchainIndex the blockchain index to index values */ ( private val chainMetadata: KeyValueStore, - private val blockStore: KeyValueStore, + private val blockBodyStore: KeyValueStore, private val blockHeaderStore: KeyValueStore, private val transactionReceiptsStore: KeyValueStore, private val blockchainIndex: BlockchainIndex @@ -53,7 +54,7 @@ class BlockchainRepository * @return a new blockchain repository made from the metadata passed in parameter. */ suspend fun init( - blockStore: KeyValueStore, + blockBodyStore: KeyValueStore, blockHeaderStore: KeyValueStore, chainMetadata: KeyValueStore, transactionReceiptsStore: KeyValueStore, @@ -61,7 +62,7 @@ class BlockchainRepository genesisBlock: Block ): BlockchainRepository { val repo = BlockchainRepository(chainMetadata, - blockStore, + blockBodyStore, blockHeaderStore, transactionReceiptsStore, blockchainIndex) @@ -72,13 +73,23 @@ class BlockchainRepository } /** - * Stores a block in the repository. + * Stores a block body into the repository. + * + * @param blockBody the block body to store + * @return a handle to the storage operation completion + */ + suspend fun storeBlockBody(blockHash: Hash, blockBody: BlockBody) { + blockBodyStore.put(blockHash.toBytes(), blockBody.toBytes()) + } + + /** + * Stores a block into the repository. * * @param block the block to store * @return a handle to the storage operation completion */ suspend fun storeBlock(block: Block) { - blockStore.put(block.header().hash().toBytes(), block.toBytes()) + storeBlockBody(block.header().hash(), block.body()) blockHeaderStore.put(block.header().hash().toBytes(), block.header().toBytes()) indexBlockHeader(block.header()) } @@ -153,18 +164,38 @@ class BlockchainRepository * @param blockHash the hash of the block stored * @return a future with the bytes if found */ - suspend fun retrieveBlockBytes(blockHash: Hash): Bytes? { - return retrieveBlockBytes(blockHash.toBytes()) + suspend fun retrieveBlockBodyBytes(blockHash: Hash): Bytes? { + return retrieveBlockBodyBytes(blockHash.toBytes()) } /** - * Retrieves a block into the repository as its serialized RLP bytes representation. + * Retrieves a block body into the repository as its serialized RLP bytes representation. * * @param blockHash the hash of the block stored * @return a future with the bytes if found */ - suspend fun retrieveBlockBytes(blockHash: Bytes): Bytes? { - return blockStore.get(blockHash) + suspend fun retrieveBlockBodyBytes(blockHash: Bytes): Bytes? { + return blockBodyStore.get(blockHash) + } + + /** + * Retrieves a block body into the repository. + * + * @param blockHash the hash of the block stored + * @return a future with the block if found + */ + suspend fun retrieveBlockBody(blockHash: Hash): BlockBody? { + return retrieveBlockBody(blockHash.toBytes()) + } + + /** + * Retrieves a block body into the repository. + * + * @param blockHash the hash of the block stored + * @return a future with the block if found + */ + suspend fun retrieveBlockBody(blockHash: Bytes): BlockBody? { + return retrieveBlockBodyBytes(blockHash)?.let { BlockBody.fromBytes(it) } } /** @@ -184,7 +215,9 @@ class BlockchainRepository * @return a future with the block if found */ suspend fun retrieveBlock(blockHash: Bytes): Block? { - return retrieveBlockBytes(blockHash)?.let { Block.fromBytes(it) } ?: return null + return retrieveBlockBody(blockHash)?.let { + body -> this.retrieveBlockHeader(blockHash)?.let { Block(it, body) } + } ?: return null } /** @@ -194,7 +227,7 @@ class BlockchainRepository * @return a future with the block header bytes if found */ suspend fun retrieveBlockHeaderBytes(blockHash: Hash): Bytes? { - return retrieveBlockBytes(blockHash.toBytes()) + return retrieveBlockBodyBytes(blockHash.toBytes()) } /** diff --git a/les/build.gradle b/les/build.gradle new file mode 100644 index 00000000..1a7070d0 --- /dev/null +++ b/les/build.gradle @@ -0,0 +1,25 @@ +dependencies { + compile project(':bytes') + compile project(':concurrent') + compile project(':concurrent-coroutines') + compile project(':crypto') + compile project(':eth') + compile project(':eth-repository') + compile project(':kv') + compile project(':rlpx') + compile 'com.google.guava:guava' + compile 'org.logl:logl-api' + + compileOnly 'io.vertx:vertx-core' + compile 'org.xerial.snappy:snappy-java' + compile 'org.bouncycastle:bcprov-jdk15on' + + testCompile project(':junit') + testCompile 'io.vertx:vertx-core' + testCompile 'org.bouncycastle:bcprov-jdk15on' + testCompile 'org.junit.jupiter:junit-jupiter-api' + testCompile 'org.junit.jupiter:junit-jupiter-params' + testCompile 'org.logl:logl-logl' + + testRuntime 'org.junit.jupiter:junit-jupiter-engine' +} diff --git a/les/src/main/kotlin/net/consensys/cava/les/BlockBodiesMessage.kt b/les/src/main/kotlin/net/consensys/cava/les/BlockBodiesMessage.kt new file mode 100644 index 00000000..b4da5083 --- /dev/null +++ b/les/src/main/kotlin/net/consensys/cava/les/BlockBodiesMessage.kt @@ -0,0 +1,47 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package net.consensys.cava.les + +import net.consensys.cava.bytes.Bytes +import net.consensys.cava.eth.BlockBody +import net.consensys.cava.rlp.RLP + +internal data class BlockBodiesMessage( + val reqID: Long, + val bufferValue: Long, + val blockBodies: List +) { + + fun toBytes(): Bytes { + return RLP.encodeList { writer -> + writer.writeLong(reqID) + writer.writeLong(bufferValue) + writer.writeList(blockBodies) { eltWriter, blockBody -> blockBody.writeTo(eltWriter) } + } + } + + companion object { + + fun read(bytes: Bytes): BlockBodiesMessage { + return RLP.decodeList( + bytes + ) { reader -> + BlockBodiesMessage( + reader.readLong(), + reader.readLong(), + reader.readListContents { BlockBody.readFrom(it) } + ) + } + } + } +} diff --git a/les/src/main/kotlin/net/consensys/cava/les/BlockHeadersMessage.kt b/les/src/main/kotlin/net/consensys/cava/les/BlockHeadersMessage.kt new file mode 100644 index 00000000..6e1a02d8 --- /dev/null +++ b/les/src/main/kotlin/net/consensys/cava/les/BlockHeadersMessage.kt @@ -0,0 +1,52 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package net.consensys.cava.les + +import net.consensys.cava.bytes.Bytes +import net.consensys.cava.eth.BlockHeader +import net.consensys.cava.rlp.RLP + +internal data class BlockHeadersMessage( + val reqID: Long, + val bufferValue: Long, + val blockHeaders: List +) { + + fun toBytes(): Bytes { + return RLP.encodeList { writer -> + writer.writeLong(reqID) + writer.writeLong(bufferValue) + writer.writeList { headersWriter -> + for (bh in blockHeaders) { + headersWriter.writeRLP(bh.toBytes()) + } + } + } + } + + companion object { + + fun read(bytes: Bytes): BlockHeadersMessage { + return RLP.decodeList(bytes) { reader -> + val reqID = reader.readLong() + val bufferValue = reader.readLong() + val headers = reader.readListContents { headersReader -> + headersReader.readList { + BlockHeader.readFrom(it) + } + } + BlockHeadersMessage(reqID, bufferValue, headers) + } + } + } +} diff --git a/les/src/main/kotlin/net/consensys/cava/les/GetBlockBodiesMessage.kt b/les/src/main/kotlin/net/consensys/cava/les/GetBlockBodiesMessage.kt new file mode 100644 index 00000000..6e7b16ef --- /dev/null +++ b/les/src/main/kotlin/net/consensys/cava/les/GetBlockBodiesMessage.kt @@ -0,0 +1,40 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package net.consensys.cava.les + +import net.consensys.cava.bytes.Bytes +import net.consensys.cava.eth.Hash +import net.consensys.cava.rlp.RLP + +internal data class GetBlockBodiesMessage(val reqID: Long, val blockHashes: List) { + + fun toBytes(): Bytes { + return RLP.encodeList { writer -> + writer.writeLong(reqID) + writer.writeList(blockHashes) { eltWriter, hash -> eltWriter.writeValue(hash.toBytes()) } + } + } + + companion object { + + fun read(bytes: Bytes): GetBlockBodiesMessage { + return RLP.decodeList( + bytes + ) { reader -> + GetBlockBodiesMessage( + reader.readLong(), + reader.readListContents { elementReader -> Hash.fromBytes(elementReader.readValue()) }) + } + } + } +} diff --git a/les/src/main/kotlin/net/consensys/cava/les/GetBlockHeadersMessage.kt b/les/src/main/kotlin/net/consensys/cava/les/GetBlockHeadersMessage.kt new file mode 100644 index 00000000..f5669704 --- /dev/null +++ b/les/src/main/kotlin/net/consensys/cava/les/GetBlockHeadersMessage.kt @@ -0,0 +1,72 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package net.consensys.cava.les + +import net.consensys.cava.bytes.Bytes +import net.consensys.cava.bytes.Bytes32 +import net.consensys.cava.rlp.RLP +import net.consensys.cava.units.bigints.UInt256 + +internal data class GetBlockHeadersMessage(val reqID: Long, val queries: List) { + + internal data class BlockHeaderQuery( + val blockNumberOrBlockHash: Bytes32, + val maxHeaders: UInt256, + val skip: UInt256, + val direction: Direction + ) { + + internal enum class Direction { + BACKWARDS, FORWARD + } + } + + fun toBytes(): Bytes { + return RLP.encodeList { writer -> + writer.writeLong(reqID) + for (query in queries) { + writer.writeList { queryWriter -> + queryWriter.writeValue(query.blockNumberOrBlockHash) + queryWriter.writeUInt256(query.maxHeaders) + queryWriter.writeUInt256(query.skip) + queryWriter.writeInt(if (query.direction == BlockHeaderQuery.Direction.BACKWARDS) 1 else 0) + } + } + } + } + + companion object { + + fun read(bytes: Bytes): GetBlockHeadersMessage { + return RLP.decodeList(bytes) { reader -> + val reqId = reader.readLong() + val queries = ArrayList() + while (!reader.isComplete) { + queries.add( + reader.readList { queryReader -> + BlockHeaderQuery( + Bytes32.wrap(queryReader.readValue()), + queryReader.readUInt256(), + queryReader.readUInt256(), + if (queryReader.readInt() == 1) + BlockHeaderQuery.Direction.BACKWARDS + else + BlockHeaderQuery.Direction.FORWARD + ) + }) + } + GetBlockHeadersMessage(reqId, queries) + } + } + } +} diff --git a/les/src/main/kotlin/net/consensys/cava/les/GetReceiptsMessage.kt b/les/src/main/kotlin/net/consensys/cava/les/GetReceiptsMessage.kt new file mode 100644 index 00000000..24b09700 --- /dev/null +++ b/les/src/main/kotlin/net/consensys/cava/les/GetReceiptsMessage.kt @@ -0,0 +1,40 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package net.consensys.cava.les + +import net.consensys.cava.bytes.Bytes +import net.consensys.cava.eth.Hash +import net.consensys.cava.rlp.RLP + +internal data class GetReceiptsMessage(val reqID: Long, val blockHashes: List) { + + fun toBytes(): Bytes { + return RLP.encodeList { writer -> + writer.writeLong(reqID) + writer.writeList(blockHashes) { eltWriter, hash -> eltWriter.writeValue(hash.toBytes()) } + } + } + + companion object { + + fun read(bytes: Bytes): GetReceiptsMessage { + return RLP.decodeList( + bytes + ) { reader -> + GetReceiptsMessage( + reader.readLong(), + reader.readListContents { elementReader -> Hash.fromBytes(elementReader.readValue()) }) + } + } + } +} diff --git a/les/src/main/kotlin/net/consensys/cava/les/LESPeerState.kt b/les/src/main/kotlin/net/consensys/cava/les/LESPeerState.kt new file mode 100644 index 00000000..0bba3b97 --- /dev/null +++ b/les/src/main/kotlin/net/consensys/cava/les/LESPeerState.kt @@ -0,0 +1,32 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package net.consensys.cava.les + +import net.consensys.cava.eth.Hash +import java.util.concurrent.ConcurrentHashMap + +internal class LESPeerState { + + var ourStatusMessage: StatusMessage? = null + var peerStatusMessage: StatusMessage? = null + val requestsCache = ConcurrentHashMap>() + + fun handshakeComplete(): Boolean { + ourStatusMessage?.let { + peerStatusMessage?.let { + return true + } + } + return false + } +} diff --git a/les/src/main/kotlin/net/consensys/cava/les/LESSubProtocolHandler.kt b/les/src/main/kotlin/net/consensys/cava/les/LESSubProtocolHandler.kt new file mode 100644 index 00000000..e660c178 --- /dev/null +++ b/les/src/main/kotlin/net/consensys/cava/les/LESSubProtocolHandler.kt @@ -0,0 +1,192 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package net.consensys.cava.les + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import net.consensys.cava.bytes.Bytes +import net.consensys.cava.concurrent.AsyncCompletion +import net.consensys.cava.concurrent.coroutines.asyncCompletion +import net.consensys.cava.eth.BlockBody +import net.consensys.cava.eth.BlockHeader +import net.consensys.cava.eth.TransactionReceipt +import net.consensys.cava.eth.repository.BlockchainRepository +import net.consensys.cava.rlpx.RLPxService +import net.consensys.cava.rlpx.wire.DisconnectReason +import net.consensys.cava.rlpx.wire.SubProtocolHandler +import net.consensys.cava.rlpx.wire.SubProtocolIdentifier +import net.consensys.cava.units.bigints.UInt256 +import java.util.TreeSet +import java.util.concurrent.ConcurrentHashMap +import kotlin.collections.ArrayList +import kotlin.coroutines.CoroutineContext + +internal class LESSubProtocolHandler( + private val service: RLPxService, + private val subProtocolIdentifier: SubProtocolIdentifier, + private val networkId: Int, + private val serveHeaders: Boolean, + private val serveChainSince: UInt256, + private val serveStateSince: UInt256, + private val flowControlBufferLimit: UInt256, + private val flowControlMaximumRequestCostTable: UInt256, + private val flowControlMinimumRateOfRecharge: UInt256, + private val repo: BlockchainRepository, + override val coroutineContext: CoroutineContext = Dispatchers.Default +) : SubProtocolHandler, CoroutineScope { + + private val peerStateMap = ConcurrentHashMap() + + override fun handle(connectionId: String, messageType: Int, message: Bytes): AsyncCompletion { + return asyncCompletion { + val state = peerStateMap.computeIfAbsent(connectionId) { LESPeerState() } + if (messageType == 0) { + if (state.handshakeComplete()) { + service.disconnect(connectionId, DisconnectReason.PROTOCOL_BREACH) + throw IllegalStateException("Handshake message sent after handshake completed") + } + state.peerStatusMessage = StatusMessage.read(message) + } else { + if (!state.handshakeComplete()) { + service.disconnect(connectionId, DisconnectReason.PROTOCOL_BREACH) + throw IllegalStateException("Message sent before handshake completed") + } + if (messageType == 1) { + throw UnsupportedOperationException() + } else if (messageType == 2) { + val getBlockHeadersMessage = GetBlockHeadersMessage.read(message) + handleGetBlockHeaders(connectionId, getBlockHeadersMessage) + } else if (messageType == 3) { + val blockHeadersMessage = BlockHeadersMessage.read(message) + handleBlockHeadersMessage(blockHeadersMessage) + } else if (messageType == 4) { + val blockBodiesMessage = GetBlockBodiesMessage.read(message) + handleGetBlockBodiesMessage(connectionId, blockBodiesMessage) + } else if (messageType == 5) { + val blockBodiesMessage = BlockBodiesMessage.read(message) + handleBlockBodiesMessage(state, blockBodiesMessage) + } else if (messageType == 6) { + val getReceiptsMessage = GetReceiptsMessage.read(message) + handleGetReceiptsMessage(connectionId, getReceiptsMessage) + } else { + throw UnsupportedOperationException() + } + } + } + } + + private suspend fun handleGetReceiptsMessage( + connectionId: String, + receiptsMessage: GetReceiptsMessage + ) { + val receipts = ArrayList>() + for (blockHash in receiptsMessage.blockHashes) { + repo.retrieveTransactionReceipts(blockHash).let { transactionReceipts -> + receipts.add(transactionReceipts.filterNotNull()) + } + } + return service.send( + subProtocolIdentifier, + 5, + connectionId, + ReceiptsMessage(receiptsMessage.reqID, 0, receipts).toBytes() + ) + } + + private suspend fun handleGetBlockBodiesMessage( + connectionId: String, + blockBodiesMessage: GetBlockBodiesMessage + ) { + val bodies = ArrayList() + for (blockHash in blockBodiesMessage.blockHashes) { + repo.retrieveBlock(blockHash)?.let { block -> + bodies.add(block.body()) + } + } + return service.send( + subProtocolIdentifier, + 5, + connectionId, + BlockBodiesMessage(blockBodiesMessage.reqID, 0, bodies).toBytes() + ) + } + + private suspend fun handleBlockBodiesMessage( + state: LESPeerState, + blockBodiesMessage: BlockBodiesMessage + ) { + for (index in 0..blockBodiesMessage.blockBodies.size) { + state.requestsCache[blockBodiesMessage.reqID]?.get(index)?.let { + repo.storeBlockBody(it, blockBodiesMessage.blockBodies[index]) + } + } + } + + private suspend fun handleBlockHeadersMessage(blockHeadersMessage: BlockHeadersMessage) { + for (header in blockHeadersMessage.blockHeaders) { + repo.storeBlockHeader(header) + } + } + + private suspend fun handleGetBlockHeaders( + connectionId: String, + getBlockHeadersMessage: GetBlockHeadersMessage + ) { + val headersFound = TreeSet() + for (query in getBlockHeadersMessage.queries) { + val hashes = repo.findBlockByHashOrNumber(query.blockNumberOrBlockHash) + for (h in hashes) { + repo.retrieveBlockHeader(h)?.let { header -> + headersFound.add(header) + } + } + } + service.send( + subProtocolIdentifier, + 3, + connectionId, + BlockHeadersMessage(getBlockHeadersMessage.reqID, 0L, ArrayList(headersFound)).toBytes() + ) + } + + override fun handleNewPeerConnection(connectionId: String): AsyncCompletion { + return asyncCompletion { + val head = repo.retrieveChainHead()!! + val genesis = repo.retrieveGenesisBlock()!! + val headTd = head.header().difficulty() + val headHash = head.header().hash() + val state = peerStateMap.computeIfAbsent(connectionId) { LESPeerState() } + state.ourStatusMessage = StatusMessage( + subProtocolIdentifier.version(), + networkId, + headTd, + headHash.toBytes(), + head.header().number(), + genesis.header().hash().toBytes(), + serveHeaders, + serveChainSince, + serveStateSince, + false, + flowControlBufferLimit, + flowControlMaximumRequestCostTable, + flowControlMinimumRateOfRecharge, + 0 + ) + service.send(subProtocolIdentifier, 0, connectionId, state.ourStatusMessage!!.toBytes()) + } + } + + override fun stop(): AsyncCompletion { + return AsyncCompletion.completed() + } +} diff --git a/les/src/main/kotlin/net/consensys/cava/les/LESSubprotocol.kt b/les/src/main/kotlin/net/consensys/cava/les/LESSubprotocol.kt new file mode 100644 index 00000000..91db81de --- /dev/null +++ b/les/src/main/kotlin/net/consensys/cava/les/LESSubprotocol.kt @@ -0,0 +1,83 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package net.consensys.cava.les + +import kotlinx.coroutines.Dispatchers +import net.consensys.cava.eth.repository.BlockchainRepository +import net.consensys.cava.rlpx.RLPxService +import net.consensys.cava.rlpx.wire.SubProtocol +import net.consensys.cava.rlpx.wire.SubProtocolHandler +import net.consensys.cava.rlpx.wire.SubProtocolIdentifier +import net.consensys.cava.units.bigints.UInt256 + +/** + * The LES subprotocol entry point class, to be used in conjunction with RLPxService + * + * + * This subprotocol is implemented after the specification presented on the * + * [Ethereum wiki.](https://github.com/ethereum/wiki/wiki/Light-client-protocol) + * + * @see net.consensys.cava.rlpx.RLPxService + */ +class LESSubprotocol +/** + * Default constructor. + * + * @param networkId the identifier, as an integer of the chain to connect to. 0 for testnet, 1 for mainnet. + * @param blockStore the key-value store for blocks + * @param blockHeaderStore the key-value store for block headers + */ + ( + private val networkId: Int, + private val serveHeaders: Boolean, + private val serveChainSince: UInt256, + private val serveStateSince: UInt256, + private val flowControlBufferLimit: UInt256, + private val flowControlMaximumRequestCostTable: UInt256, + private val flowControlMinimumRateOfRecharge: UInt256, + private val repo: BlockchainRepository + ) : SubProtocol { + + override fun id(): SubProtocolIdentifier { + return LES_ID + } + + override fun supports(subProtocolIdentifier: SubProtocolIdentifier): Boolean { + return "les" == subProtocolIdentifier.name() && subProtocolIdentifier.version() == 2 + } + + override fun versionRange(version: Int): Int { + return 21 + } + + override fun createHandler(service: RLPxService): SubProtocolHandler { + return LESSubProtocolHandler( + service, + LES_ID, + networkId, + serveHeaders, + serveChainSince, + serveStateSince, + flowControlBufferLimit, + flowControlMaximumRequestCostTable, + flowControlMinimumRateOfRecharge, + repo, + Dispatchers.Default + ) + } + + companion object { + + internal val LES_ID = SubProtocolIdentifier.of("les", 2) + } +} diff --git a/les/src/main/kotlin/net/consensys/cava/les/LightClient.kt b/les/src/main/kotlin/net/consensys/cava/les/LightClient.kt new file mode 100644 index 00000000..f8af6811 --- /dev/null +++ b/les/src/main/kotlin/net/consensys/cava/les/LightClient.kt @@ -0,0 +1,58 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package net.consensys.cava.les + +import net.consensys.cava.bytes.Bytes32 +import net.consensys.cava.eth.BlockBody +import net.consensys.cava.eth.BlockHeader +import net.consensys.cava.eth.Hash +import net.consensys.cava.eth.TransactionReceipt + +/** + * Calls to LES functions from the point of view of the consumer of the subprotocol. + * + * When executing those calls, the client will store all data transferred in the blockchain repository. + * + * + */ +interface LightClient { + + /** + * Get block headers from remote peers. + * + * @param blockNumberOrHash the block number or the hash to start to look for headers from + * @param maxHeaders maximum number of headers to return + * @param skip the number of block apart to skip when returning headers + * @param reverse if true, walk the chain in descending order + */ + fun getBlockHeaders( + blockNumberOrHash: Bytes32, + maxHeaders: Int = 10, + skip: Int = 0, + reverse: Boolean = false + ): List + + /** + * Get block bodies from remote peers. + * + * @param blockHashes hashes identifying block bodies + */ + fun getBlockBodies(vararg blockHashes: Hash): List + + /** + * Get transaction receipts from remote peers for blocks. + * + * @param blockHashes hashes identifying blocks + */ + fun getReceipts(vararg blockHashes: Hash): List> +} diff --git a/les/src/main/kotlin/net/consensys/cava/les/ReceiptsMessage.kt b/les/src/main/kotlin/net/consensys/cava/les/ReceiptsMessage.kt new file mode 100644 index 00000000..eaab50e2 --- /dev/null +++ b/les/src/main/kotlin/net/consensys/cava/les/ReceiptsMessage.kt @@ -0,0 +1,51 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package net.consensys.cava.les + +import net.consensys.cava.bytes.Bytes +import net.consensys.cava.eth.TransactionReceipt +import net.consensys.cava.rlp.RLP + +internal data class ReceiptsMessage( + val reqID: Long, + val bufferValue: Long, + val receipts: List> +) { + + fun toBytes(): Bytes { + return RLP.encodeList { writer -> + writer.writeLong(reqID) + writer.writeLong(bufferValue) + writer.writeList(receipts) { + eltWriter, listOfReceipts -> eltWriter.writeList(listOfReceipts) { + txWriter, txReceipt -> txReceipt.writeTo(txWriter) + } + } + } + } + + companion object { + + fun read(bytes: Bytes): ReceiptsMessage { + return RLP.decodeList( + bytes + ) { reader -> + ReceiptsMessage( + reader.readLong(), + reader.readLong(), + reader.readListContents { listTx -> listTx.readListContents { TransactionReceipt.readFrom(it) } } + ) + } + } + } +} diff --git a/les/src/main/kotlin/net/consensys/cava/les/StatusMessage.kt b/les/src/main/kotlin/net/consensys/cava/les/StatusMessage.kt new file mode 100644 index 00000000..01c4ef8b --- /dev/null +++ b/les/src/main/kotlin/net/consensys/cava/les/StatusMessage.kt @@ -0,0 +1,162 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package net.consensys.cava.les + +import net.consensys.cava.bytes.Bytes +import net.consensys.cava.bytes.Bytes32 +import net.consensys.cava.rlp.RLP +import net.consensys.cava.units.bigints.UInt256 + +/** + * + * Inform a peer of the sender's current LES state. This message should be sent after the initial handshake and prior to + * any LES related messages. The following keys should be present (except the optional ones) in order to be accepted by + * a LES/1 node: (value types are noted after the key string) + * + * @link https://github.com/ethereum/wiki/wiki/Light-client-protocol + */ +internal data class StatusMessage( + val protocolVersion: Int, + val networkId: Int, + val headTd: UInt256, + val headHash: Bytes32, + val headNum: UInt256, + val genesisHash: Bytes32, + val serveHeaders: Boolean?, + val serveChainSince: UInt256?, + val serveStateSince: UInt256?, + val txRelay: Boolean?, + val flowControlBufferLimit: UInt256, + val flowControlMaximumRequestCostTable: UInt256, + val flowControlMinimumRateOfRecharge: UInt256, + val announceType: Int +) { + + fun toBytes(): Bytes { + return RLP.encode { writer -> + writer.writeList { listWriter -> + listWriter.writeString("protocolVersion") + listWriter.writeInt(protocolVersion) + } + writer.writeList { listWriter -> + listWriter.writeString("networkId") + listWriter.writeInt(networkId) + } + writer.writeList { listWriter -> + listWriter.writeString("headTd") + listWriter.writeUInt256(headTd) + } + writer.writeList { listWriter -> + listWriter.writeString("headHash") + listWriter.writeValue(headHash) + } + writer.writeList { listWriter -> + listWriter.writeString("headNum") + listWriter.writeUInt256(headNum) + } + writer.writeList { listWriter -> + listWriter.writeString("genesisHash") + listWriter.writeValue(genesisHash) + } + if (serveHeaders != null && serveHeaders) { + writer.writeList { listWriter -> listWriter.writeString("serveHeaders") } + } + serveChainSince?.let { + writer.writeList { listWriter -> + listWriter.writeString("serveChainSince") + listWriter.writeUInt256(serveChainSince) + } + } + serveStateSince?.let { + writer.writeList { listWriter -> + listWriter.writeString("serveStateSince") + listWriter.writeUInt256(serveStateSince) + } + } + if (txRelay != null && txRelay) { + writer.writeList { listWriter -> listWriter.writeString("txRelay") } + } + writer.writeList { listWriter -> + listWriter.writeString("flowControl/BL") + listWriter.writeUInt256(flowControlBufferLimit) + } + writer.writeList { listWriter -> + listWriter.writeString("flowControl/MRC") + listWriter.writeUInt256(flowControlMaximumRequestCostTable) + } + writer.writeList { listWriter -> + listWriter.writeString("flowControl/MRR") + listWriter.writeUInt256(flowControlMinimumRateOfRecharge) + } + writer.writeList { listWriter -> + listWriter.writeString("announceType") + listWriter.writeInt(announceType) + } + } + } + + companion object { + + /** + * Reads a status message from bytes, and associates it with a connection ID. + * + * @param bytes the bytes of the message + * @return a new StatusMessage built from the bytes + */ + fun read(bytes: Bytes): StatusMessage { + return RLP.decode(bytes) { reader -> + val parameters = HashMap() + while (!reader.isComplete) { + reader.readList { eltReader -> + val key = eltReader.readString() + + if ("protocolVersion" == key || "networkId" == key || "announceType" == key) { + parameters[key] = eltReader.readInt() + } else if ("headHash" == key || "genesisHash" == key) { + parameters[key] = Bytes32.wrap(eltReader.readValue()) + } else if ("headTd" == key || + "headNum" == key || + "serveChainSince" == key || + "serveStateSince" == key || + "flowControl/BL" == key || + "flowControl/MRC" == key || + "flowControl/MRR" == key + ) { + parameters[key] = eltReader.readUInt256() + } else if ("serveHeaders" == key || "txRelay" == key) { + parameters[key] = true + } + null + } + } + + StatusMessage( + parameters["protocolVersion"] as Int, + parameters["networkId"] as Int, + parameters["headTd"] as UInt256, + parameters["headHash"] as Bytes32, + parameters["headNum"] as UInt256, + parameters["genesisHash"] as Bytes32, + parameters["serveHeaders"] as Boolean?, + parameters["serveChainSince"] as UInt256, + parameters["serveStateSince"] as UInt256, + parameters["txRelay"] as Boolean?, + parameters["flowControl/BL"] as UInt256, + parameters["flowControl/MRC"] as UInt256, + parameters["flowControl/MRR"] as UInt256, + parameters["announceType"] as Int + ) + } + } + } +} diff --git a/les/src/test/kotlin/net/consensys/cava/les/LESSubProtocolHandlerTest.kt b/les/src/test/kotlin/net/consensys/cava/les/LESSubProtocolHandlerTest.kt new file mode 100644 index 00000000..9e2a5e5f --- /dev/null +++ b/les/src/test/kotlin/net/consensys/cava/les/LESSubProtocolHandlerTest.kt @@ -0,0 +1,478 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package net.consensys.cava.les + +import kotlinx.coroutines.runBlocking +import net.consensys.cava.bytes.Bytes +import net.consensys.cava.bytes.Bytes32 +import net.consensys.cava.concurrent.AsyncCompletion +import net.consensys.cava.concurrent.coroutines.await +import net.consensys.cava.crypto.SECP256K1 +import net.consensys.cava.eth.Address +import net.consensys.cava.eth.Block +import net.consensys.cava.eth.BlockBody +import net.consensys.cava.eth.BlockHeader +import net.consensys.cava.eth.Hash +import net.consensys.cava.eth.Transaction +import net.consensys.cava.eth.repository.BlockchainIndex +import net.consensys.cava.eth.repository.BlockchainRepository +import net.consensys.cava.junit.BouncyCastleExtension +import net.consensys.cava.junit.LuceneIndexWriter +import net.consensys.cava.junit.LuceneIndexWriterExtension +import net.consensys.cava.junit.VertxExtension +import net.consensys.cava.kv.MapKeyValueStore +import net.consensys.cava.les.LESSubprotocol.Companion.LES_ID +import net.consensys.cava.rlpx.RLPxService +import net.consensys.cava.rlpx.WireConnectionRepository +import net.consensys.cava.rlpx.wire.DisconnectReason +import net.consensys.cava.rlpx.wire.SubProtocolIdentifier +import net.consensys.cava.units.bigints.UInt256 +import net.consensys.cava.units.ethereum.Gas +import net.consensys.cava.units.ethereum.Wei +import org.apache.lucene.index.IndexWriter +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNotNull +import org.junit.jupiter.api.Assertions.assertThrows +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import java.io.IOException +import java.net.InetSocketAddress +import java.time.Instant +import java.time.temporal.ChronoUnit + +@ExtendWith(BouncyCastleExtension::class, VertxExtension::class, LuceneIndexWriterExtension::class) +internal class LESSubProtocolHandlerTest @Throws(IOException::class) +constructor() { + + private val header = BlockHeader( + Hash.fromBytes(Bytes32.random()), + Hash.fromBytes(Bytes32.random()), + Address.fromBytes(Bytes.random(20)), + Hash.fromBytes(Bytes32.random()), + Hash.fromBytes(Bytes32.random()), + Hash.fromBytes(Bytes32.random()), + Bytes32.random(), + UInt256.fromBytes(Bytes32.random()), + UInt256.fromBytes(Bytes32.random()), + Gas.valueOf(3), + Gas.valueOf(2), + Instant.now().truncatedTo(ChronoUnit.SECONDS), + Bytes.of(2, 3, 4), + Hash.fromBytes(Bytes32.random()), + Bytes32.random() + ) + private val body = BlockBody( + listOf( + Transaction( + UInt256.valueOf(1), + Wei.valueOf(2), + Gas.valueOf(2), + Address.fromBytes(Bytes.random(20)), + Wei.valueOf(2), + Bytes.random(12), + SECP256K1.KeyPair.random() + ) + ), + emptyList() + ) + private val block = Block(header, body) + + private class MyRLPxService : RLPxService { + + var message: Bytes? = null + var disconnectReason: DisconnectReason? = null + + override fun connectTo(peerPublicKey: SECP256K1.PublicKey, peerAddress: InetSocketAddress): AsyncCompletion? { + return null + } + + override fun start(): AsyncCompletion? { + return null + } + + override fun stop(): AsyncCompletion? { + return null + } + + override fun send( + subProtocolIdentifier: SubProtocolIdentifier, + messageType: Int, + connectionId: String, + message: Bytes + ) { + this.message = message + } + + override fun broadcast(subProtocolIdentifier: SubProtocolIdentifier, messageType: Int, message: Bytes) { + } + + override fun disconnect(connectionId: String, reason: DisconnectReason) { + this.disconnectReason = reason + } + + override fun repository(): WireConnectionRepository? { + return null + } + } + + @Test + @Throws(Exception::class) + fun sendStatusOnNewConnection(@LuceneIndexWriter writer: IndexWriter) = + runBlocking { + val service = MyRLPxService() + val block = Block(header, body) + val repo = BlockchainRepository + .init( + MapKeyValueStore(), + MapKeyValueStore(), + MapKeyValueStore(), + MapKeyValueStore(), + BlockchainIndex(writer), + block + ) + + val handler = LESSubProtocolHandler( + service, + LES_ID, + 1, + false, + UInt256.ZERO, + UInt256.ZERO, + UInt256.ZERO, + UInt256.ZERO, + UInt256.ZERO, + repo + ) + handler.handleNewPeerConnection("abc").await() + val message = StatusMessage.read(service.message!!) + assertNotNull(message) + assertEquals(2, message.protocolVersion) + assertEquals(UInt256.ZERO, message.flowControlBufferLimit) + assertEquals(block.header().hash().toBytes(), message.genesisHash) + } + + @Test + @Throws(Exception::class) + fun receiveStatusTwice(@LuceneIndexWriter writer: IndexWriter) = + runBlocking { + val status = StatusMessage( + 2, + 1, + UInt256.valueOf(23), + Bytes32.random(), + UInt256.valueOf(3443), + Bytes32.random(), null, + UInt256.valueOf(333), + UInt256.valueOf(453), + true, + UInt256.valueOf(3), + UInt256.valueOf(4), + UInt256.valueOf(5), + 0 + ).toBytes() + val service = MyRLPxService() + + val repo = BlockchainRepository + .init( + MapKeyValueStore(), + MapKeyValueStore(), + MapKeyValueStore(), + MapKeyValueStore(), + BlockchainIndex(writer), + block + ) + + val handler = LESSubProtocolHandler( + service, + LES_ID, + 1, + false, + UInt256.ZERO, + UInt256.ZERO, + UInt256.ZERO, + UInt256.ZERO, + UInt256.ZERO, + repo + ) + handler.handleNewPeerConnection("abc").await() + handler.handle("abc", 0, status).await() + assertThrows(IllegalStateException::class.java) { runBlocking { + handler.handle("abc", 0, status).await() + } } + + assertEquals(DisconnectReason.PROTOCOL_BREACH, service.disconnectReason) + } + + @Test + @Throws(Exception::class) + fun receiveOtherMessageBeforeStatus(@LuceneIndexWriter writer: IndexWriter) = runBlocking { + val service = MyRLPxService() + val repo = BlockchainRepository( + MapKeyValueStore(), + MapKeyValueStore(), + MapKeyValueStore(), + MapKeyValueStore(), + BlockchainIndex(writer) + ) + val handler = LESSubProtocolHandler( + service, + LES_ID, + 1, + false, + UInt256.ZERO, + UInt256.ZERO, + UInt256.ZERO, + UInt256.ZERO, + UInt256.ZERO, + repo + ) + assertThrows(IllegalStateException::class.java) { runBlocking { + handler.handle("abc", 2, Bytes.random(2)).await() + } } + + assertEquals(DisconnectReason.PROTOCOL_BREACH, service.disconnectReason) + } + + @Test + @Throws(Exception::class) + fun receivedGetBlockHeadersMessage(@LuceneIndexWriter writer: IndexWriter) = + runBlocking { + val service = MyRLPxService() + val repo = BlockchainRepository + .init( + MapKeyValueStore(), + MapKeyValueStore(), + MapKeyValueStore(), + MapKeyValueStore(), + BlockchainIndex(writer), + block + ) + val handler = LESSubProtocolHandler( + service, + LES_ID, + 1, + false, + UInt256.ZERO, + UInt256.ZERO, + UInt256.ZERO, + UInt256.ZERO, + UInt256.ZERO, + repo + ) + val status = StatusMessage( + 2, + 1, + UInt256.valueOf(23), + Bytes32.random(), + UInt256.valueOf(3443), + Bytes32.random(), null, + UInt256.valueOf(333), + UInt256.valueOf(453), + true, + UInt256.valueOf(3), + UInt256.valueOf(4), + UInt256.valueOf(5), + 0 + ).toBytes() + handler.handleNewPeerConnection("abc").await() + handler.handle("abc", 0, status).await() + + handler.handle( + "abc", + 2, + GetBlockHeadersMessage( + 1, + listOf( + GetBlockHeadersMessage.BlockHeaderQuery( + Bytes32.random(), + UInt256.valueOf(3), + UInt256.valueOf(0), + GetBlockHeadersMessage.BlockHeaderQuery.Direction.BACKWARDS + ) + ) + ).toBytes() + ).await() + val blockHeaders = BlockHeadersMessage.read(service.message!!) + assertTrue(blockHeaders.blockHeaders.isEmpty()) + } + + @Test + @Throws(Exception::class) + fun receivedBlockHeadersMessage(@LuceneIndexWriter writer: IndexWriter) = + runBlocking { + val service = MyRLPxService() + val repo = BlockchainRepository + .init( + MapKeyValueStore(), + MapKeyValueStore(), + MapKeyValueStore(), + MapKeyValueStore(), + BlockchainIndex(writer), + block + ) + val handler = LESSubProtocolHandler( + service, + LES_ID, + 1, + false, + UInt256.ZERO, + UInt256.ZERO, + UInt256.ZERO, + UInt256.ZERO, + UInt256.ZERO, + repo + ) + val status = StatusMessage( + 2, + 1, + UInt256.valueOf(23), + Bytes32.random(), + UInt256.valueOf(3443), + Bytes32.random(), null, + UInt256.valueOf(333), + UInt256.valueOf(453), + true, + UInt256.valueOf(3), + UInt256.valueOf(4), + UInt256.valueOf(5), + 0 + ).toBytes() + + val header = BlockHeader( + Hash.fromBytes(Bytes32.random()), + Hash.fromBytes(Bytes32.random()), + Address.fromBytes(Bytes.random(20)), + Hash.fromBytes(Bytes32.random()), + Hash.fromBytes(Bytes32.random()), + Hash.fromBytes(Bytes32.random()), + Bytes32.random(), + UInt256.fromBytes(Bytes32.random()), + UInt256.fromBytes(Bytes32.random()), + Gas.valueOf(3), + Gas.valueOf(2), + Instant.now().truncatedTo(ChronoUnit.SECONDS), + Bytes.of(2, 3, 4), + Hash.fromBytes(Bytes32.random()), + Bytes32.random() + ) + + handler.handleNewPeerConnection("abc").await() + handler.handle("abc", 0, status).await() + handler.handle("abc", 3, BlockHeadersMessage(1, 2, listOf(header)).toBytes()).await() + val retrieved = repo.retrieveBlockHeader(header.hash()) + assertEquals(header, retrieved) + } + + @Test + @Throws(Exception::class) + fun receivedGetBlockBodiesMessage(@LuceneIndexWriter writer: IndexWriter) = + runBlocking { + val service = MyRLPxService() + val repo = BlockchainRepository + .init( + MapKeyValueStore(), + MapKeyValueStore(), + MapKeyValueStore(), + MapKeyValueStore(), + BlockchainIndex(writer), + block + ) + val handler = LESSubProtocolHandler( + service, + LES_ID, + 1, + false, + UInt256.ZERO, + UInt256.ZERO, + UInt256.ZERO, + UInt256.ZERO, + UInt256.ZERO, + repo + ) + val status = StatusMessage( + 2, + 1, + UInt256.valueOf(23), + Bytes32.random(), + UInt256.valueOf(3443), + Bytes32.random(), null, + UInt256.valueOf(333), + UInt256.valueOf(453), + true, + UInt256.valueOf(3), + UInt256.valueOf(4), + UInt256.valueOf(5), + 0 + ).toBytes() + handler.handleNewPeerConnection("abc").await() + handler.handle("abc", 0, status).await() + + handler + .handle("abc", 4, GetBlockBodiesMessage(1, listOf(Hash.fromBytes(Bytes32.random()))).toBytes()).await() + val received = service.message + val blockBodies = BlockBodiesMessage.read(received!!) + assertTrue(blockBodies.blockBodies.isEmpty()) + } + + @Test + @Throws(Exception::class) + fun receivedGetReceiptsMessage(@LuceneIndexWriter writer: IndexWriter) = + runBlocking { + val service = MyRLPxService() + val repo = BlockchainRepository + .init( + MapKeyValueStore(), + MapKeyValueStore(), + MapKeyValueStore(), + MapKeyValueStore(), + BlockchainIndex(writer), + block + ) + val handler = LESSubProtocolHandler( + service, + LES_ID, + 1, + false, + UInt256.ZERO, + UInt256.ZERO, + UInt256.ZERO, + UInt256.ZERO, + UInt256.ZERO, + repo + ) + val status = StatusMessage( + 2, + 1, + UInt256.valueOf(23), + Bytes32.random(), + UInt256.valueOf(3443), + Bytes32.random(), null, + UInt256.valueOf(333), + UInt256.valueOf(453), + true, + UInt256.valueOf(3), + UInt256.valueOf(4), + UInt256.valueOf(5), + 0 + ).toBytes() + handler.handleNewPeerConnection("abc").await() + handler.handle("abc", 0, status).await() + + handler + .handle("abc", 4, GetReceiptsMessage(1, listOf(Hash.fromBytes(Bytes32.random()))).toBytes()).await() + val received = service.message + val receipts = ReceiptsMessage.read(received!!) + assertTrue(receipts.receipts.isEmpty()) + } +} diff --git a/les/src/test/kotlin/net/consensys/cava/les/LESSubprotocolTest.kt b/les/src/test/kotlin/net/consensys/cava/les/LESSubprotocolTest.kt new file mode 100644 index 00000000..c80a8621 --- /dev/null +++ b/les/src/test/kotlin/net/consensys/cava/les/LESSubprotocolTest.kt @@ -0,0 +1,101 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package net.consensys.cava.les + +import net.consensys.cava.eth.repository.BlockchainIndex +import net.consensys.cava.eth.repository.BlockchainRepository +import net.consensys.cava.junit.LuceneIndexWriter +import net.consensys.cava.junit.LuceneIndexWriterExtension +import net.consensys.cava.junit.TempDirectoryExtension +import net.consensys.cava.kv.MapKeyValueStore +import net.consensys.cava.rlpx.wire.SubProtocolIdentifier +import net.consensys.cava.units.bigints.UInt256 +import org.apache.lucene.index.IndexWriter +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.TestInstance +import org.junit.jupiter.api.extension.ExtendWith + +@ExtendWith(TempDirectoryExtension::class, LuceneIndexWriterExtension::class) +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +internal class LESSubprotocolTest { + + @Test + @Throws(Exception::class) + fun supportsLESv2(@LuceneIndexWriter writer: IndexWriter) { + + val sp = LESSubprotocol( + 1, + false, + UInt256.ZERO, + UInt256.ZERO, + UInt256.ZERO, + UInt256.ZERO, + UInt256.ZERO, + BlockchainRepository( + MapKeyValueStore(), + MapKeyValueStore(), + MapKeyValueStore(), + MapKeyValueStore(), + BlockchainIndex(writer) + ) + ) + assertTrue(sp.supports(SubProtocolIdentifier.of("les", 2))) + } + + @Test + @Throws(Exception::class) + fun noSupportForv3(@LuceneIndexWriter writer: IndexWriter) { + + val sp = LESSubprotocol( + 1, + false, + UInt256.ZERO, + UInt256.ZERO, + UInt256.ZERO, + UInt256.ZERO, + UInt256.ZERO, + BlockchainRepository( + MapKeyValueStore(), + MapKeyValueStore(), + MapKeyValueStore(), + MapKeyValueStore(), + BlockchainIndex(writer) + ) + ) + assertFalse(sp.supports(SubProtocolIdentifier.of("les", 3))) + } + + @Test + @Throws(Exception::class) + fun noSupportForETH(@LuceneIndexWriter writer: IndexWriter) { + val sp = LESSubprotocol( + 1, + false, + UInt256.ZERO, + UInt256.ZERO, + UInt256.ZERO, + UInt256.ZERO, + UInt256.ZERO, + BlockchainRepository( + MapKeyValueStore(), + MapKeyValueStore(), + MapKeyValueStore(), + MapKeyValueStore(), + BlockchainIndex(writer) + ) + ) + assertFalse(sp.supports(SubProtocolIdentifier.of("eth", 2))) + } +} diff --git a/les/src/test/kotlin/net/consensys/cava/les/MessagesTest.kt b/les/src/test/kotlin/net/consensys/cava/les/MessagesTest.kt new file mode 100644 index 00000000..c221c8cb --- /dev/null +++ b/les/src/test/kotlin/net/consensys/cava/les/MessagesTest.kt @@ -0,0 +1,226 @@ +/* + * Copyright 2019 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package net.consensys.cava.les + +import net.consensys.cava.bytes.Bytes +import net.consensys.cava.bytes.Bytes32 +import net.consensys.cava.crypto.SECP256K1 +import net.consensys.cava.eth.Address +import net.consensys.cava.eth.BlockBody +import net.consensys.cava.eth.BlockHeader +import net.consensys.cava.eth.Hash +import net.consensys.cava.eth.Log +import net.consensys.cava.eth.LogsBloomFilter +import net.consensys.cava.eth.Transaction +import net.consensys.cava.eth.TransactionReceipt +import net.consensys.cava.junit.BouncyCastleExtension +import net.consensys.cava.units.bigints.UInt256 +import net.consensys.cava.units.ethereum.Gas +import net.consensys.cava.units.ethereum.Wei +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.extension.ExtendWith +import java.time.Instant +import java.time.temporal.ChronoUnit + +@ExtendWith(BouncyCastleExtension::class) +internal class BlockBodiesMessageTest { + + @Test + fun roundtripRLP() { + val message = BlockBodiesMessage( + 3, + 2, + listOf( + BlockBody( + listOf( + Transaction( + UInt256.valueOf(1), + Wei.valueOf(2), + Gas.valueOf(2), + Address.fromBytes(Bytes.random(20)), + Wei.valueOf(2), + Bytes.random(12), + SECP256K1.KeyPair.random() + ) + ), + emptyList() + ) + ) + ) + val rlp = message.toBytes() + val read = BlockBodiesMessage.read(rlp) + assertEquals(message, read) + } +} + +@ExtendWith(BouncyCastleExtension::class) +internal class ReceiptsMessageTest { + + @Test + fun roundtripRLP() { + val logsList = listOf( + Log(Address.fromBytes(Bytes.random(20)), + Bytes.of(1, 2, 3), + listOf(Bytes32.random(), Bytes32.random(), Bytes32.random()) + ), + Log(Address.fromBytes(Bytes.random(20)), + Bytes.of(1, 2, 3), + listOf(Bytes32.random(), Bytes32.random(), Bytes32.random()) + ), + Log(Address.fromBytes(Bytes.random(20)), + Bytes.of(1, 2, 3), + listOf(Bytes32.random(), Bytes32.random(), Bytes32.random()) + ) + ) + val message = ReceiptsMessage( + 3, + 2, + listOf(listOf( + TransactionReceipt( + Bytes32.random(), 3, + LogsBloomFilter.compute(logsList + ), + logsList + ) + ) + ) + ) + val rlp = message.toBytes() + val read = ReceiptsMessage.read(rlp) + assertEquals(message, read) + } +} + +internal class BlockHeadersMessageTest { + + @Test + fun roundtripRLP() { + val header = BlockHeader( + Hash.fromBytes(Bytes32.random()), + Hash.fromBytes(Bytes32.random()), + Address.fromBytes(Bytes.random(20)), + Hash.fromBytes(Bytes32.random()), + Hash.fromBytes(Bytes32.random()), + Hash.fromBytes(Bytes32.random()), + Bytes32.random(), + UInt256.fromBytes(Bytes32.random()), + UInt256.fromBytes(Bytes32.random()), + Gas.valueOf(3), + Gas.valueOf(2), + Instant.now().truncatedTo(ChronoUnit.SECONDS), + Bytes.of(2, 3, 4), + Hash.fromBytes(Bytes32.random()), + Bytes32.random() + ) + val message = BlockHeadersMessage(3L, 2L, listOf(header)) + val bytes = message.toBytes() + assertEquals(message, BlockHeadersMessage.read(bytes)) + } +} + +internal class GetBlockBodiesMessageTest { + + @Test + fun roundtripRLP() { + val message = GetBlockBodiesMessage( + 3, + listOf( + Hash.fromBytes(Bytes32.random()), + Hash.fromBytes(Bytes32.random()), + Hash.fromBytes(Bytes32.random()), + Hash.fromBytes(Bytes32.random()) + ) + ) + val rlp = message.toBytes() + val read = GetBlockBodiesMessage.read(rlp) + assertEquals(message, read) + } +} + +internal class GetReceiptsMessageTest { + + @Test + fun roundtripRLP() { + val message = GetReceiptsMessage( + 3, + listOf( + Hash.fromBytes(Bytes32.random()), + Hash.fromBytes(Bytes32.random()), + Hash.fromBytes(Bytes32.random()), + Hash.fromBytes(Bytes32.random()) + ) + ) + val rlp = message.toBytes() + val read = GetReceiptsMessage.read(rlp) + assertEquals(message, read) + } +} + +internal class GetBlockHeadersMessageTest { + + @Test + fun roundtripBytes() { + val message = GetBlockHeadersMessage( + 344, + listOf( + GetBlockHeadersMessage.BlockHeaderQuery( + Bytes32.random(), + UInt256.valueOf(32), + UInt256.valueOf(64), + GetBlockHeadersMessage.BlockHeaderQuery.Direction.BACKWARDS + ), + GetBlockHeadersMessage.BlockHeaderQuery( + Bytes32.random(), + UInt256.valueOf(32), + UInt256.valueOf(64), + GetBlockHeadersMessage.BlockHeaderQuery.Direction.FORWARD + ), + GetBlockHeadersMessage.BlockHeaderQuery( + Bytes32.random(), + UInt256.valueOf(32), + UInt256.valueOf(64), + GetBlockHeadersMessage.BlockHeaderQuery.Direction.BACKWARDS + ) + ) + ) + + val bytes = message.toBytes() + assertEquals(message, GetBlockHeadersMessage.read(bytes)) + } +} + +internal class StatusMessageTest { + + @Test + fun testStatusMessageRoundtrip() { + val message = StatusMessage( + 2, + 1, + UInt256.valueOf(23), + Bytes32.random(), + UInt256.valueOf(3443), + Bytes32.random(), + null, + UInt256.valueOf(333), + UInt256.valueOf(453), + true, + UInt256.valueOf(3), + UInt256.valueOf(4), + UInt256.valueOf(5), + 1 + ) + val read = StatusMessage.read(message.toBytes()) + assertEquals(message, read) + } +} diff --git a/settings.gradle b/settings.gradle index 562a3709..95394509 100644 --- a/settings.gradle +++ b/settings.gradle @@ -12,6 +12,7 @@ include 'io' include 'junit' include 'kademlia' include 'kv' +include 'les' include 'merkle-trie' include 'net' include 'net-coroutines'