diff --git a/api/subscriptions/beat2_reader.go b/api/subscriptions/beat2_reader.go index 9d1940538..e3da366b7 100644 --- a/api/subscriptions/beat2_reader.go +++ b/api/subscriptions/beat2_reader.go @@ -7,6 +7,7 @@ package subscriptions import ( "bytes" + "encoding/json" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/vechain/thor/v2/chain" @@ -17,22 +18,35 @@ import ( type beat2Reader struct { repo *chain.Repository blockReader chain.BlockReader + cache *messageCache } -func newBeat2Reader(repo *chain.Repository, position thor.Bytes32) *beat2Reader { +func newBeat2Reader(repo *chain.Repository, position thor.Bytes32, cache *messageCache) *beat2Reader { return &beat2Reader{ repo: repo, blockReader: repo.NewBlockReader(position), + cache: cache, } } -func (br *beat2Reader) Read() ([]interface{}, bool, error) { +func (br *beat2Reader) Read() ([][]byte, bool, error) { blocks, err := br.blockReader.Read() if err != nil { return nil, false, err } - var msgs []interface{} + var msgs [][]byte + for _, block := range blocks { + msg, err := br.cache.GetOrAdd(block, br.repo) + if err != nil { + return nil, false, err + } + msgs = append(msgs, msg) + } + return msgs, len(blocks) > 0, nil +} + +func generateBeat2Bytes(block *chain.ExtendedBlock, repo *chain.Repository) ([]byte, error) { bloomGenerator := &bloom.Generator{} bloomAdd := func(key []byte) { @@ -43,48 +57,47 @@ func (br *beat2Reader) Read() ([]interface{}, bool, error) { } } - for _, block := range blocks { - header := block.Header() - receipts, err := br.repo.GetBlockReceipts(header.ID()) - if err != nil { - return nil, false, err - } - txs := block.Transactions() - for i, receipt := range receipts { - bloomAdd(receipt.GasPayer.Bytes()) - for _, output := range receipt.Outputs { - for _, event := range output.Events { - bloomAdd(event.Address.Bytes()) - for _, topic := range event.Topics { - bloomAdd(topic.Bytes()) - } - } - for _, transfer := range output.Transfers { - bloomAdd(transfer.Sender.Bytes()) - bloomAdd(transfer.Recipient.Bytes()) + header := block.Header() + receipts, err := repo.GetBlockReceipts(header.ID()) + if err != nil { + return nil, err + } + txs := block.Transactions() + for i, receipt := range receipts { + bloomAdd(receipt.GasPayer.Bytes()) + for _, output := range receipt.Outputs { + for _, event := range output.Events { + bloomAdd(event.Address.Bytes()) + for _, topic := range event.Topics { + bloomAdd(topic.Bytes()) } } - origin, _ := txs[i].Origin() - bloomAdd(origin.Bytes()) + for _, transfer := range output.Transfers { + bloomAdd(transfer.Sender.Bytes()) + bloomAdd(transfer.Recipient.Bytes()) + } } - signer, _ := header.Signer() - bloomAdd(signer.Bytes()) - bloomAdd(header.Beneficiary().Bytes()) + origin, _ := txs[i].Origin() + bloomAdd(origin.Bytes()) + } + signer, _ := header.Signer() + bloomAdd(signer.Bytes()) + bloomAdd(header.Beneficiary().Bytes()) - const bitsPerKey = 20 - filter := bloomGenerator.Generate(bitsPerKey, bloom.K(bitsPerKey)) + const bitsPerKey = 20 + filter := bloomGenerator.Generate(bitsPerKey, bloom.K(bitsPerKey)) - msgs = append(msgs, &Beat2Message{ - Number: header.Number(), - ID: header.ID(), - ParentID: header.ParentID(), - Timestamp: header.Timestamp(), - TxsFeatures: uint32(header.TxsFeatures()), - GasLimit: header.GasLimit(), - Bloom: hexutil.Encode(filter.Bits), - K: filter.K, - Obsolete: block.Obsolete, - }) + beat2 := &Beat2Message{ + Number: header.Number(), + ID: header.ID(), + ParentID: header.ParentID(), + Timestamp: header.Timestamp(), + TxsFeatures: uint32(header.TxsFeatures()), + GasLimit: header.GasLimit(), + Bloom: hexutil.Encode(filter.Bits), + K: filter.K, + Obsolete: block.Obsolete, } - return msgs, len(blocks) > 0, nil + + return json.Marshal(beat2) } diff --git a/api/subscriptions/beat2_reader_test.go b/api/subscriptions/beat2_reader_test.go index dbeecbd7f..2cfa48b03 100644 --- a/api/subscriptions/beat2_reader_test.go +++ b/api/subscriptions/beat2_reader_test.go @@ -6,12 +6,18 @@ package subscriptions import ( + "encoding/json" "testing" "github.com/stretchr/testify/assert" "github.com/vechain/thor/v2/thor" ) +func beat2Cache() *messageCache { + cache, _ := newMessageCache(generateBeat2Bytes, 10) + return cache +} + func TestBeat2Reader_Read(t *testing.T) { // Arrange repo, generatedBlocks, _ := initChain(t) @@ -19,21 +25,23 @@ func TestBeat2Reader_Read(t *testing.T) { newBlock := generatedBlocks[1] // Act - beatReader := newBeat2Reader(repo, genesisBlk.Header().ID()) + beatReader := newBeat2Reader(repo, genesisBlk.Header().ID(), beat2Cache()) res, ok, err := beatReader.Read() // Assert assert.NoError(t, err) assert.True(t, ok) - if beatMsg, ok := res[0].(*Beat2Message); !ok { - t.Fatal("unexpected type") - } else { - assert.Equal(t, newBlock.Header().Number(), beatMsg.Number) - assert.Equal(t, newBlock.Header().ID(), beatMsg.ID) - assert.Equal(t, newBlock.Header().ParentID(), beatMsg.ParentID) - assert.Equal(t, newBlock.Header().Timestamp(), beatMsg.Timestamp) - assert.Equal(t, uint32(newBlock.Header().TxsFeatures()), beatMsg.TxsFeatures) - } + beat := &Beat2Message{} + err = json.Unmarshal(res[0], beat) + assert.NoError(t, err) + + assert.Equal(t, newBlock.Header().Number(), beat.Number) + assert.Equal(t, newBlock.Header().ID(), beat.ID) + assert.Equal(t, newBlock.Header().ParentID(), beat.ParentID) + assert.Equal(t, newBlock.Header().Timestamp(), beat.Timestamp) + assert.Equal(t, uint32(newBlock.Header().TxsFeatures()), beat.TxsFeatures) + // GasLimit is not part of the deprecated BeatMessage + assert.Equal(t, newBlock.Header().GasLimit(), beat.GasLimit) } func TestBeat2Reader_Read_NoNewBlocksToRead(t *testing.T) { @@ -42,7 +50,7 @@ func TestBeat2Reader_Read_NoNewBlocksToRead(t *testing.T) { newBlock := generatedBlocks[1] // Act - beatReader := newBeat2Reader(repo, newBlock.Header().ID()) + beatReader := newBeat2Reader(repo, newBlock.Header().ID(), beat2Cache()) res, ok, err := beatReader.Read() // Assert @@ -56,7 +64,7 @@ func TestBeat2Reader_Read_ErrorWhenReadingBlocks(t *testing.T) { repo, _, _ := initChain(t) // Act - beatReader := newBeat2Reader(repo, thor.MustParseBytes32("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")) + beatReader := newBeat2Reader(repo, thor.MustParseBytes32("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"), beat2Cache()) res, ok, err := beatReader.Read() // Assert diff --git a/api/subscriptions/beat_reader.go b/api/subscriptions/beat_reader.go index ed4e77147..de1f0689b 100644 --- a/api/subscriptions/beat_reader.go +++ b/api/subscriptions/beat_reader.go @@ -7,6 +7,7 @@ package subscriptions import ( "bytes" + "encoding/json" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/vechain/thor/v2/chain" @@ -17,65 +18,29 @@ import ( type beatReader struct { repo *chain.Repository blockReader chain.BlockReader + cache *messageCache } -func newBeatReader(repo *chain.Repository, position thor.Bytes32) *beatReader { +func newBeatReader(repo *chain.Repository, position thor.Bytes32, cache *messageCache) *beatReader { return &beatReader{ repo: repo, blockReader: repo.NewBlockReader(position), + cache: cache, } } -func (br *beatReader) Read() ([]interface{}, bool, error) { +func (br *beatReader) Read() ([][]byte, bool, error) { blocks, err := br.blockReader.Read() if err != nil { return nil, false, err } - var msgs []interface{} + var msgs [][]byte for _, block := range blocks { - header := block.Header() - receipts, err := br.repo.GetBlockReceipts(header.ID()) + msg, err := br.cache.GetOrAdd(block, br.repo) if err != nil { return nil, false, err } - txs := block.Transactions() - bloomContent := &bloomContent{} - for i, receipt := range receipts { - bloomContent.add(receipt.GasPayer.Bytes()) - for _, output := range receipt.Outputs { - for _, event := range output.Events { - bloomContent.add(event.Address.Bytes()) - for _, topic := range event.Topics { - bloomContent.add(topic.Bytes()) - } - } - for _, transfer := range output.Transfers { - bloomContent.add(transfer.Sender.Bytes()) - bloomContent.add(transfer.Recipient.Bytes()) - } - } - origin, _ := txs[i].Origin() - bloomContent.add(origin.Bytes()) - } - signer, _ := header.Signer() - bloomContent.add(signer.Bytes()) - bloomContent.add(header.Beneficiary().Bytes()) - - k := bloom.LegacyEstimateBloomK(bloomContent.len()) - bloom := bloom.NewLegacyBloom(k) - for _, item := range bloomContent.items { - bloom.Add(item) - } - msgs = append(msgs, &BeatMessage{ - Number: header.Number(), - ID: header.ID(), - ParentID: header.ParentID(), - Timestamp: header.Timestamp(), - TxsFeatures: uint32(header.TxsFeatures()), - Bloom: hexutil.Encode(bloom.Bits[:]), - K: uint32(k), - Obsolete: block.Obsolete, - }) + msgs = append(msgs, msg) } return msgs, len(blocks) > 0, nil } @@ -91,3 +56,51 @@ func (bc *bloomContent) add(item []byte) { func (bc *bloomContent) len() int { return len(bc.items) } + +func generateBeatBytes(block *chain.ExtendedBlock, repo *chain.Repository) ([]byte, error) { + header := block.Header() + receipts, err := repo.GetBlockReceipts(header.ID()) + if err != nil { + return nil, err + } + txs := block.Transactions() + content := &bloomContent{} + for i, receipt := range receipts { + content.add(receipt.GasPayer.Bytes()) + for _, output := range receipt.Outputs { + for _, event := range output.Events { + content.add(event.Address.Bytes()) + for _, topic := range event.Topics { + content.add(topic.Bytes()) + } + } + for _, transfer := range output.Transfers { + content.add(transfer.Sender.Bytes()) + content.add(transfer.Recipient.Bytes()) + } + } + origin, _ := txs[i].Origin() + content.add(origin.Bytes()) + } + signer, _ := header.Signer() + content.add(signer.Bytes()) + content.add(header.Beneficiary().Bytes()) + + k := bloom.LegacyEstimateBloomK(content.len()) + bloom := bloom.NewLegacyBloom(k) + for _, item := range content.items { + bloom.Add(item) + } + beat := &BeatMessage{ + Number: header.Number(), + ID: header.ID(), + ParentID: header.ParentID(), + Timestamp: header.Timestamp(), + TxsFeatures: uint32(header.TxsFeatures()), + Bloom: hexutil.Encode(bloom.Bits[:]), + K: uint32(k), + Obsolete: block.Obsolete, + } + + return json.Marshal(beat) +} diff --git a/api/subscriptions/beat_reader_test.go b/api/subscriptions/beat_reader_test.go index 6e6974af9..ad5d3c80d 100644 --- a/api/subscriptions/beat_reader_test.go +++ b/api/subscriptions/beat_reader_test.go @@ -6,12 +6,18 @@ package subscriptions import ( + "encoding/json" "testing" "github.com/stretchr/testify/assert" "github.com/vechain/thor/v2/thor" ) +func beatCache() *messageCache { + cache, _ := newMessageCache(generateBeatBytes, 10) + return cache +} + func TestBeatReader_Read(t *testing.T) { // Arrange repo, generatedBlocks, _ := initChain(t) @@ -19,21 +25,21 @@ func TestBeatReader_Read(t *testing.T) { newBlock := generatedBlocks[1] // Act - beatReader := newBeatReader(repo, genesisBlk.Header().ID()) + beatReader := newBeatReader(repo, genesisBlk.Header().ID(), beatCache()) res, ok, err := beatReader.Read() // Assert assert.NoError(t, err) assert.True(t, ok) - if beatMsg, ok := res[0].(*BeatMessage); !ok { - t.Fatal("unexpected type") - } else { - assert.Equal(t, newBlock.Header().Number(), beatMsg.Number) - assert.Equal(t, newBlock.Header().ID(), beatMsg.ID) - assert.Equal(t, newBlock.Header().ParentID(), beatMsg.ParentID) - assert.Equal(t, newBlock.Header().Timestamp(), beatMsg.Timestamp) - assert.Equal(t, uint32(newBlock.Header().TxsFeatures()), beatMsg.TxsFeatures) - } + beat := &BeatMessage{} + err = json.Unmarshal(res[0], beat) + assert.NoError(t, err) + + assert.Equal(t, newBlock.Header().Number(), beat.Number) + assert.Equal(t, newBlock.Header().ID(), beat.ID) + assert.Equal(t, newBlock.Header().ParentID(), beat.ParentID) + assert.Equal(t, newBlock.Header().Timestamp(), beat.Timestamp) + assert.Equal(t, uint32(newBlock.Header().TxsFeatures()), beat.TxsFeatures) } func TestBeatReader_Read_NoNewBlocksToRead(t *testing.T) { @@ -42,7 +48,7 @@ func TestBeatReader_Read_NoNewBlocksToRead(t *testing.T) { newBlock := generatedBlocks[1] // Act - beatReader := newBeatReader(repo, newBlock.Header().ID()) + beatReader := newBeatReader(repo, newBlock.Header().ID(), beatCache()) res, ok, err := beatReader.Read() // Assert @@ -56,7 +62,7 @@ func TestBeatReader_Read_ErrorWhenReadingBlocks(t *testing.T) { repo, _, _ := initChain(t) // Act - beatReader := newBeatReader(repo, thor.MustParseBytes32("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")) + beatReader := newBeatReader(repo, thor.MustParseBytes32("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"), beatCache()) res, ok, err := beatReader.Read() // Assert diff --git a/api/subscriptions/block_reader.go b/api/subscriptions/block_reader.go index 8c1d3ba50..f75473e5d 100644 --- a/api/subscriptions/block_reader.go +++ b/api/subscriptions/block_reader.go @@ -6,6 +6,8 @@ package subscriptions import ( + "encoding/json" + "github.com/vechain/thor/v2/chain" "github.com/vechain/thor/v2/thor" ) @@ -13,23 +15,25 @@ import ( type blockReader struct { repo *chain.Repository blockReader chain.BlockReader + cache *messageCache } -func newBlockReader(repo *chain.Repository, position thor.Bytes32) *blockReader { +func newBlockReader(repo *chain.Repository, position thor.Bytes32, cache *messageCache) *blockReader { return &blockReader{ repo: repo, blockReader: repo.NewBlockReader(position), + cache: cache, } } -func (br *blockReader) Read() ([]interface{}, bool, error) { +func (br *blockReader) Read() ([][]byte, bool, error) { blocks, err := br.blockReader.Read() if err != nil { return nil, false, err } - var msgs []interface{} + var msgs [][]byte for _, block := range blocks { - msg, err := convertBlock(block) + msg, err := br.cache.GetOrAdd(block, br.repo) if err != nil { return nil, false, err } @@ -37,3 +41,11 @@ func (br *blockReader) Read() ([]interface{}, bool, error) { } return msgs, len(blocks) > 0, nil } + +func generateBlockBytes(block *chain.ExtendedBlock, _ *chain.Repository) ([]byte, error) { + blk, err := convertBlock(block) + if err != nil { + return nil, err + } + return json.Marshal(blk) +} diff --git a/api/subscriptions/block_reader_test.go b/api/subscriptions/block_reader_test.go index f6db221e5..fd936dc8f 100644 --- a/api/subscriptions/block_reader_test.go +++ b/api/subscriptions/block_reader_test.go @@ -6,6 +6,7 @@ package subscriptions import ( + "encoding/json" "math/big" "testing" "time" @@ -23,26 +24,29 @@ import ( "github.com/vechain/thor/v2/txpool" ) +func blockCache() *messageCache { + cache, _ := newMessageCache(generateBlockBytes, 10) + return cache +} + func TestBlockReader_Read(t *testing.T) { repo, generatedBlocks, _ := initChain(t) genesisBlk := generatedBlocks[0] newBlock := generatedBlocks[1] // Test case 1: Successful read next blocks - br := newBlockReader(repo, genesisBlk.Header().ID()) + br := newBlockReader(repo, genesisBlk.Header().ID(), blockCache()) res, ok, err := br.Read() assert.NoError(t, err) assert.True(t, ok) - if resBlock, ok := res[0].(*BlockMessage); !ok { - t.Fatal("unexpected type") - } else { - assert.Equal(t, newBlock.Header().Number(), resBlock.Number) - assert.Equal(t, newBlock.Header().ParentID(), resBlock.ParentID) - } + resBlock := &BlockMessage{} + assert.NoError(t, json.Unmarshal(res[0], resBlock)) + assert.Equal(t, newBlock.Header().Number(), resBlock.Number) + assert.Equal(t, newBlock.Header().ParentID(), resBlock.ParentID) // Test case 2: There is no new block - br = newBlockReader(repo, newBlock.Header().ID()) + br = newBlockReader(repo, newBlock.Header().ID(), blockCache()) res, ok, err = br.Read() assert.NoError(t, err) @@ -50,7 +54,7 @@ func TestBlockReader_Read(t *testing.T) { assert.Empty(t, res) // Test case 3: Error when reading blocks - br = newBlockReader(repo, thor.MustParseBytes32("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff")) + br = newBlockReader(repo, thor.MustParseBytes32("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff"), blockCache()) res, ok, err = br.Read() assert.Error(t, err) diff --git a/api/subscriptions/event_reader.go b/api/subscriptions/event_reader.go index 1329e8777..0b9b90295 100644 --- a/api/subscriptions/event_reader.go +++ b/api/subscriptions/event_reader.go @@ -6,6 +6,8 @@ package subscriptions import ( + "encoding/json" + "github.com/vechain/thor/v2/chain" "github.com/vechain/thor/v2/thor" ) @@ -24,12 +26,12 @@ func newEventReader(repo *chain.Repository, position thor.Bytes32, filter *Event } } -func (er *eventReader) Read() ([]interface{}, bool, error) { +func (er *eventReader) Read() ([][]byte, bool, error) { blocks, err := er.blockReader.Read() if err != nil { return nil, false, err } - var msgs []interface{} + var msgs [][]byte for _, block := range blocks { receipts, err := er.repo.GetBlockReceipts(block.Header().ID()) if err != nil { @@ -44,7 +46,11 @@ func (er *eventReader) Read() ([]interface{}, bool, error) { if err != nil { return nil, false, err } - msgs = append(msgs, msg) + bytes, err := json.Marshal(msg) + if err != nil { + return nil, false, err + } + msgs = append(msgs, bytes) } } } diff --git a/api/subscriptions/event_reader_test.go b/api/subscriptions/event_reader_test.go index b1fe280ba..e7c36b7f0 100644 --- a/api/subscriptions/event_reader_test.go +++ b/api/subscriptions/event_reader_test.go @@ -6,6 +6,7 @@ package subscriptions import ( + "encoding/json" "testing" "github.com/stretchr/testify/assert" @@ -38,11 +39,9 @@ func TestEventReader_Read(t *testing.T) { assert.True(t, ok) var eventMessages []*EventMessage for _, event := range events { - if msg, ok := event.(*EventMessage); ok { - eventMessages = append(eventMessages, msg) - } else { - t.Fatal("unexpected type") - } + eventMsg := &EventMessage{} + assert.NoError(t, json.Unmarshal(event, eventMsg)) + eventMessages = append(eventMessages, eventMsg) } assert.Equal(t, 1, len(eventMessages)) eventMsg := eventMessages[0] diff --git a/api/subscriptions/message_cache.go b/api/subscriptions/message_cache.go new file mode 100644 index 000000000..5ffef7621 --- /dev/null +++ b/api/subscriptions/message_cache.go @@ -0,0 +1,50 @@ +package subscriptions + +import ( + "sync" + + lru "github.com/hashicorp/golang-lru" + "github.com/vechain/thor/v2/chain" +) + +type messageHandler = func(*chain.ExtendedBlock, *chain.Repository) ([]byte, error) + +type messageCache struct { + cache *lru.Cache + generator messageHandler + mu sync.RWMutex +} + +func newMessageCache(handler messageHandler, cacheSize int) (*messageCache, error) { + cache, err := lru.New(cacheSize) + return &messageCache{ + cache: cache, + generator: handler, + }, err +} + +// GetOrAdd can be called by thousands of goroutines concurrently. The first goroutine that invokes it for a specific +// block will generate the message and store it in the cache. Subsequent goroutines will read the message from the cache. +func (mc *messageCache) GetOrAdd(block *chain.ExtendedBlock, repo *chain.Repository) ([]byte, error) { + blockID := block.Header().ID().String() + mc.mu.RLock() + msg, ok := mc.cache.Get(blockID) + mc.mu.RUnlock() + if ok { + return msg.([]byte), nil + } + + mc.mu.Lock() + defer mc.mu.Unlock() + msg, ok = mc.cache.Get(blockID) + if ok { + return msg.([]byte), nil + } + + msg, err := mc.generator(block, repo) + if err != nil { + return nil, err + } + mc.cache.Add(blockID, msg) + return msg.([]byte), nil +} diff --git a/api/subscriptions/subscriptions.go b/api/subscriptions/subscriptions.go index 73f895bf3..5b32a5e0a 100644 --- a/api/subscriptions/subscriptions.go +++ b/api/subscriptions/subscriptions.go @@ -31,10 +31,13 @@ type Subscriptions struct { pendingTx *pendingTx done chan struct{} wg sync.WaitGroup + beat2Cache *messageCache + beatCache *messageCache + blockCache *messageCache } type msgReader interface { - Read() (msgs []interface{}, hasMore bool, err error) + Read() (msgs [][]byte, hasMore bool, err error) } var ( @@ -49,6 +52,11 @@ const ( ) func New(repo *chain.Repository, allowedOrigins []string, backtraceLimit uint32, txpool *txpool.TxPool) *Subscriptions { + cacheSize := backtraceLimit * 120 / 100 + beat2Cache, _ := newMessageCache(generateBeat2Bytes, int(cacheSize)) + beatCache, _ := newMessageCache(generateBeatBytes, int(cacheSize)) + blockCache, _ := newMessageCache(generateBlockBytes, int(cacheSize)) + sub := &Subscriptions{ backtraceLimit: backtraceLimit, repo: repo, @@ -67,8 +75,11 @@ func New(repo *chain.Repository, allowedOrigins []string, backtraceLimit uint32, return false }, }, - pendingTx: newPendingTx(txpool), - done: make(chan struct{}), + pendingTx: newPendingTx(txpool), + done: make(chan struct{}), + beat2Cache: beat2Cache, + beatCache: beatCache, + blockCache: blockCache, } sub.wg.Add(1) @@ -85,7 +96,7 @@ func (s *Subscriptions) handleBlockReader(_ http.ResponseWriter, req *http.Reque if err != nil { return nil, err } - return newBlockReader(s.repo, position), nil + return newBlockReader(s.repo, position, s.blockCache), nil } func (s *Subscriptions) handleEventReader(w http.ResponseWriter, req *http.Request) (*eventReader, error) { @@ -158,7 +169,7 @@ func (s *Subscriptions) handleBeatReader(w http.ResponseWriter, req *http.Reques if err != nil { return nil, err } - return newBeatReader(s.repo, position), nil + return newBeatReader(s.repo, position, s.beatCache), nil } func (s *Subscriptions) handleBeat2Reader(w http.ResponseWriter, req *http.Request) (*beat2Reader, error) { @@ -166,7 +177,7 @@ func (s *Subscriptions) handleBeat2Reader(w http.ResponseWriter, req *http.Reque if err != nil { return nil, err } - return newBeat2Reader(s.repo, position), nil + return newBeat2Reader(s.repo, position, s.beat2Cache), nil } func (s *Subscriptions) handleSubject(w http.ResponseWriter, req *http.Request) error { @@ -308,7 +319,7 @@ func (s *Subscriptions) pipe(conn *websocket.Conn, reader msgReader, closed chan return err } for _, msg := range msgs { - if err := conn.WriteJSON(msg); err != nil { + if err := conn.WriteMessage(websocket.BinaryMessage, msg); err != nil { return err } } diff --git a/api/subscriptions/transfer_reader.go b/api/subscriptions/transfer_reader.go index d5a0315f1..d2b5c89b6 100644 --- a/api/subscriptions/transfer_reader.go +++ b/api/subscriptions/transfer_reader.go @@ -6,6 +6,8 @@ package subscriptions import ( + "encoding/json" + "github.com/vechain/thor/v2/chain" "github.com/vechain/thor/v2/thor" ) @@ -24,12 +26,12 @@ func newTransferReader(repo *chain.Repository, position thor.Bytes32, filter *Tr } } -func (tr *transferReader) Read() ([]interface{}, bool, error) { +func (tr *transferReader) Read() ([][]byte, bool, error) { blocks, err := tr.blockReader.Read() if err != nil { return nil, false, err } - var msgs []interface{} + var msgs [][]byte for _, block := range blocks { receipts, err := tr.repo.GetBlockReceipts(block.Header().ID()) if err != nil { @@ -48,7 +50,11 @@ func (tr *transferReader) Read() ([]interface{}, bool, error) { if err != nil { return nil, false, err } - msgs = append(msgs, msg) + bytes, err := json.Marshal(msg) + if err != nil { + return nil, false, err + } + msgs = append(msgs, bytes) } } } diff --git a/api/subscriptions/transfer_reader_test.go b/api/subscriptions/transfer_reader_test.go index 8bef2175c..013659c66 100644 --- a/api/subscriptions/transfer_reader_test.go +++ b/api/subscriptions/transfer_reader_test.go @@ -6,6 +6,7 @@ package subscriptions import ( + "encoding/json" "testing" "github.com/stretchr/testify/assert" @@ -26,14 +27,13 @@ func TestTransferReader_Read(t *testing.T) { // Assert assert.NoError(t, err) assert.True(t, ok) - if transferMsg, ok := res[0].(*TransferMessage); !ok { - t.Fatal("unexpected type") - } else { - assert.Equal(t, newBlock.Header().Number(), transferMsg.Meta.BlockNumber) - assert.Equal(t, newBlock.Header().ID(), transferMsg.Meta.BlockID) - assert.Equal(t, newBlock.Header().Timestamp(), transferMsg.Meta.BlockTimestamp) - assert.Equal(t, newBlock.Transactions()[0].ID(), transferMsg.Meta.TxID) - } + transferMsg := &TransferMessage{} + assert.NoError(t, json.Unmarshal(res[0], transferMsg)) + + assert.Equal(t, newBlock.Header().Number(), transferMsg.Meta.BlockNumber) + assert.Equal(t, newBlock.Header().ID(), transferMsg.Meta.BlockID) + assert.Equal(t, newBlock.Header().Timestamp(), transferMsg.Meta.BlockTimestamp) + assert.Equal(t, newBlock.Transactions()[0].ID(), transferMsg.Meta.TxID) } func TestTransferReader_Read_NoNewBlocksToRead(t *testing.T) {