Skip to content

Commit

Permalink
ehancement: create a cache for block based subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
darrenvechain committed Oct 10, 2024
1 parent e8d420d commit 25f7515
Show file tree
Hide file tree
Showing 12 changed files with 275 additions and 147 deletions.
95 changes: 54 additions & 41 deletions api/subscriptions/beat2_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package subscriptions

import (
"bytes"
"encoding/json"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/vechain/thor/v2/chain"
Expand All @@ -17,22 +18,35 @@ import (
type beat2Reader struct {
repo *chain.Repository
blockReader chain.BlockReader
cache *messageCache
}

func newBeat2Reader(repo *chain.Repository, position thor.Bytes32) *beat2Reader {
func newBeat2Reader(repo *chain.Repository, position thor.Bytes32, cache *messageCache) *beat2Reader {
return &beat2Reader{
repo: repo,
blockReader: repo.NewBlockReader(position),
cache: cache,
}
}

func (br *beat2Reader) Read() ([]interface{}, bool, error) {
func (br *beat2Reader) Read() ([][]byte, bool, error) {
blocks, err := br.blockReader.Read()
if err != nil {
return nil, false, err
}
var msgs []interface{}
var msgs [][]byte

for _, block := range blocks {
msg, err := br.cache.GetOrAdd(block, br.repo)
if err != nil {
return nil, false, err
}
msgs = append(msgs, msg)
}
return msgs, len(blocks) > 0, nil
}

func generateBeat2Bytes(block *chain.ExtendedBlock, repo *chain.Repository) ([]byte, error) {
bloomGenerator := &bloom.Generator{}

bloomAdd := func(key []byte) {
Expand All @@ -43,48 +57,47 @@ func (br *beat2Reader) Read() ([]interface{}, bool, error) {
}
}

for _, block := range blocks {
header := block.Header()
receipts, err := br.repo.GetBlockReceipts(header.ID())
if err != nil {
return nil, false, err
}
txs := block.Transactions()
for i, receipt := range receipts {
bloomAdd(receipt.GasPayer.Bytes())
for _, output := range receipt.Outputs {
for _, event := range output.Events {
bloomAdd(event.Address.Bytes())
for _, topic := range event.Topics {
bloomAdd(topic.Bytes())
}
}
for _, transfer := range output.Transfers {
bloomAdd(transfer.Sender.Bytes())
bloomAdd(transfer.Recipient.Bytes())
header := block.Header()
receipts, err := repo.GetBlockReceipts(header.ID())
if err != nil {
return nil, err
}
txs := block.Transactions()
for i, receipt := range receipts {
bloomAdd(receipt.GasPayer.Bytes())
for _, output := range receipt.Outputs {
for _, event := range output.Events {
bloomAdd(event.Address.Bytes())
for _, topic := range event.Topics {
bloomAdd(topic.Bytes())
}
}
origin, _ := txs[i].Origin()
bloomAdd(origin.Bytes())
for _, transfer := range output.Transfers {
bloomAdd(transfer.Sender.Bytes())
bloomAdd(transfer.Recipient.Bytes())
}
}
signer, _ := header.Signer()
bloomAdd(signer.Bytes())
bloomAdd(header.Beneficiary().Bytes())
origin, _ := txs[i].Origin()
bloomAdd(origin.Bytes())
}
signer, _ := header.Signer()
bloomAdd(signer.Bytes())
bloomAdd(header.Beneficiary().Bytes())

const bitsPerKey = 20
filter := bloomGenerator.Generate(bitsPerKey, bloom.K(bitsPerKey))
const bitsPerKey = 20
filter := bloomGenerator.Generate(bitsPerKey, bloom.K(bitsPerKey))

msgs = append(msgs, &Beat2Message{
Number: header.Number(),
ID: header.ID(),
ParentID: header.ParentID(),
Timestamp: header.Timestamp(),
TxsFeatures: uint32(header.TxsFeatures()),
GasLimit: header.GasLimit(),
Bloom: hexutil.Encode(filter.Bits),
K: filter.K,
Obsolete: block.Obsolete,
})
beat2 := &Beat2Message{
Number: header.Number(),
ID: header.ID(),
ParentID: header.ParentID(),
Timestamp: header.Timestamp(),
TxsFeatures: uint32(header.TxsFeatures()),
GasLimit: header.GasLimit(),
Bloom: hexutil.Encode(filter.Bits),
K: filter.K,
Obsolete: block.Obsolete,
}
return msgs, len(blocks) > 0, nil

return json.Marshal(beat2)
}
32 changes: 20 additions & 12 deletions api/subscriptions/beat2_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,42 @@
package subscriptions

import (
"encoding/json"
"testing"

"github.com/stretchr/testify/assert"
"github.com/vechain/thor/v2/thor"
)

func beat2Cache() *messageCache {
cache, _ := newMessageCache(generateBeat2Bytes, 10)
return cache
}

func TestBeat2Reader_Read(t *testing.T) {
// Arrange
repo, generatedBlocks, _ := initChain(t)
genesisBlk := generatedBlocks[0]
newBlock := generatedBlocks[1]

// Act
beatReader := newBeat2Reader(repo, genesisBlk.Header().ID())
beatReader := newBeat2Reader(repo, genesisBlk.Header().ID(), beat2Cache())
res, ok, err := beatReader.Read()

// Assert
assert.NoError(t, err)
assert.True(t, ok)
if beatMsg, ok := res[0].(*Beat2Message); !ok {
t.Fatal("unexpected type")
} else {
assert.Equal(t, newBlock.Header().Number(), beatMsg.Number)
assert.Equal(t, newBlock.Header().ID(), beatMsg.ID)
assert.Equal(t, newBlock.Header().ParentID(), beatMsg.ParentID)
assert.Equal(t, newBlock.Header().Timestamp(), beatMsg.Timestamp)
assert.Equal(t, uint32(newBlock.Header().TxsFeatures()), beatMsg.TxsFeatures)
}
beat := &Beat2Message{}
err = json.Unmarshal(res[0], beat)
assert.NoError(t, err)

assert.Equal(t, newBlock.Header().Number(), beat.Number)
assert.Equal(t, newBlock.Header().ID(), beat.ID)
assert.Equal(t, newBlock.Header().ParentID(), beat.ParentID)
assert.Equal(t, newBlock.Header().Timestamp(), beat.Timestamp)
assert.Equal(t, uint32(newBlock.Header().TxsFeatures()), beat.TxsFeatures)
// GasLimit is not part of the deprecated BeatMessage
assert.Equal(t, newBlock.Header().GasLimit(), beat.GasLimit)
}

func TestBeat2Reader_Read_NoNewBlocksToRead(t *testing.T) {
Expand All @@ -42,7 +50,7 @@ func TestBeat2Reader_Read_NoNewBlocksToRead(t *testing.T) {
newBlock := generatedBlocks[1]

// Act
beatReader := newBeat2Reader(repo, newBlock.Header().ID())
beatReader := newBeat2Reader(repo, newBlock.Header().ID(), beat2Cache())
res, ok, err := beatReader.Read()

// Assert
Expand All @@ -56,7 +64,7 @@ func TestBeat2Reader_Read_ErrorWhenReadingBlocks(t *testing.T) {
repo, _, _ := initChain(t)

// Act
beatReader := newBeat2Reader(repo, thor.MustParseBytes32("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"))
beatReader := newBeat2Reader(repo, thor.MustParseBytes32("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"), beat2Cache())
res, ok, err := beatReader.Read()

// Assert
Expand Down
99 changes: 56 additions & 43 deletions api/subscriptions/beat_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package subscriptions

import (
"bytes"
"encoding/json"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/vechain/thor/v2/chain"
Expand All @@ -17,65 +18,29 @@ import (
type beatReader struct {
repo *chain.Repository
blockReader chain.BlockReader
cache *messageCache
}

func newBeatReader(repo *chain.Repository, position thor.Bytes32) *beatReader {
func newBeatReader(repo *chain.Repository, position thor.Bytes32, cache *messageCache) *beatReader {
return &beatReader{
repo: repo,
blockReader: repo.NewBlockReader(position),
cache: cache,
}
}

func (br *beatReader) Read() ([]interface{}, bool, error) {
func (br *beatReader) Read() ([][]byte, bool, error) {
blocks, err := br.blockReader.Read()
if err != nil {
return nil, false, err
}
var msgs []interface{}
var msgs [][]byte
for _, block := range blocks {
header := block.Header()
receipts, err := br.repo.GetBlockReceipts(header.ID())
msg, err := br.cache.GetOrAdd(block, br.repo)
if err != nil {
return nil, false, err
}
txs := block.Transactions()
bloomContent := &bloomContent{}
for i, receipt := range receipts {
bloomContent.add(receipt.GasPayer.Bytes())
for _, output := range receipt.Outputs {
for _, event := range output.Events {
bloomContent.add(event.Address.Bytes())
for _, topic := range event.Topics {
bloomContent.add(topic.Bytes())
}
}
for _, transfer := range output.Transfers {
bloomContent.add(transfer.Sender.Bytes())
bloomContent.add(transfer.Recipient.Bytes())
}
}
origin, _ := txs[i].Origin()
bloomContent.add(origin.Bytes())
}
signer, _ := header.Signer()
bloomContent.add(signer.Bytes())
bloomContent.add(header.Beneficiary().Bytes())

k := bloom.LegacyEstimateBloomK(bloomContent.len())
bloom := bloom.NewLegacyBloom(k)
for _, item := range bloomContent.items {
bloom.Add(item)
}
msgs = append(msgs, &BeatMessage{
Number: header.Number(),
ID: header.ID(),
ParentID: header.ParentID(),
Timestamp: header.Timestamp(),
TxsFeatures: uint32(header.TxsFeatures()),
Bloom: hexutil.Encode(bloom.Bits[:]),
K: uint32(k),
Obsolete: block.Obsolete,
})
msgs = append(msgs, msg)
}
return msgs, len(blocks) > 0, nil
}
Expand All @@ -91,3 +56,51 @@ func (bc *bloomContent) add(item []byte) {
func (bc *bloomContent) len() int {
return len(bc.items)
}

func generateBeatBytes(block *chain.ExtendedBlock, repo *chain.Repository) ([]byte, error) {
header := block.Header()
receipts, err := repo.GetBlockReceipts(header.ID())
if err != nil {
return nil, err
}
txs := block.Transactions()
content := &bloomContent{}
for i, receipt := range receipts {
content.add(receipt.GasPayer.Bytes())
for _, output := range receipt.Outputs {
for _, event := range output.Events {
content.add(event.Address.Bytes())
for _, topic := range event.Topics {
content.add(topic.Bytes())
}
}
for _, transfer := range output.Transfers {
content.add(transfer.Sender.Bytes())
content.add(transfer.Recipient.Bytes())
}
}
origin, _ := txs[i].Origin()
content.add(origin.Bytes())
}
signer, _ := header.Signer()
content.add(signer.Bytes())
content.add(header.Beneficiary().Bytes())

k := bloom.LegacyEstimateBloomK(content.len())
bloom := bloom.NewLegacyBloom(k)
for _, item := range content.items {
bloom.Add(item)
}
beat := &BeatMessage{
Number: header.Number(),
ID: header.ID(),
ParentID: header.ParentID(),
Timestamp: header.Timestamp(),
TxsFeatures: uint32(header.TxsFeatures()),
Bloom: hexutil.Encode(bloom.Bits[:]),
K: uint32(k),
Obsolete: block.Obsolete,
}

return json.Marshal(beat)
}
Loading

0 comments on commit 25f7515

Please sign in to comment.