Skip to content

Commit

Permalink
Parallel download of merge block
Browse files Browse the repository at this point in the history
  • Loading branch information
billettc committed Oct 2, 2023
1 parent bcf6f54 commit 7d6cc7f
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 81 deletions.
182 changes: 103 additions & 79 deletions accountresolver/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,27 @@ func NewProcessor(readerName string, cursor *Cursor, accountsResolver AccountsRe
}

func (p *Processor) ProcessMergeBlocks(ctx context.Context, sourceStore dstore.Store, destinationStore dstore.Store, encoder firecore.BlockEncoder) error {
startBlockNum := p.cursor.slotNum - p.cursor.slotNum%100
startBlockNum := p.cursor.slotNum - p.cursor.slotNum%100 //This is the first block slot of the last merge block file
startBlockNum += 100 //This is the first block slot of the next merge block file
paddedBlockNum := fmt.Sprintf("%010d", startBlockNum)

p.logger.Info("Processing merge blocks", zap.Uint64("cursor_block_num", p.cursor.slotNum), zap.String("first_merge_filename", paddedBlockNum))

mergeBlocksFileChan := make(chan *mergeBlocksFile, 10)

go func() {
err := p.processMergeBlocksFiles(ctx, mergeBlocksFileChan, destinationStore, encoder)
panic(fmt.Errorf("processing merge blocks files: %w", err))
}()

err := sourceStore.WalkFrom(ctx, "", paddedBlockNum, func(filename string) error {
p.logger.Debug("processing merge block file", zap.String("filename", filename))
return p.processMergeBlocksFiles(ctx, filename, sourceStore, destinationStore, encoder)
mbf := newMergeBlocksFile(filename, p.logger)
err := mbf.process(ctx, sourceStore, p.cursor)
if err != nil {
return fmt.Errorf("processing merge block file %s: %w", mbf.filename, err)
}
mergeBlocksFileChan <- mbf
return err
})

if err != nil {
Expand All @@ -122,113 +135,124 @@ func (p *Processor) ProcessMergeBlocks(ctx context.Context, sourceStore dstore.S
return nil
}

func (p *Processor) processMergeBlocksFiles(ctx context.Context, filename string, sourceStore dstore.Store, destinationStore dstore.Store, encoder firecore.BlockEncoder) error {
p.logger.Info("Processing merge block file", zap.String("filename", filename))
p.stats = &stats{
startProcessing: time.Now(),
// mergeBlocksFile represents one merge block file that is read from the
// source store, together with the channel its blocks are streamed on.
type mergeBlocksFile struct {
// filename is the object name used to open the file in the source store;
// process also parses it (leading zeros trimmed) as the first block number
// of the file.
filename string
// blockChan carries the blocks read by process that were not skipped;
// process closes it once the file has been read up to io.EOF.
blockChan chan *pbsol.Block
// logger receives the informational messages emitted by process.
logger *zap.Logger
}

// newMergeBlocksFile returns a mergeBlocksFile for the merge block file named
// fileName, logging through logger.
//
// The name is stored in the filename field: process parses it as the first
// block number of the file and uses it as the object name to open, so leaving
// it empty makes process fail while converting the filename to a block number.
//
// blockChan is created unbuffered, so each send made by process blocks until
// a receiver takes the block.
func newMergeBlocksFile(fileName string, logger *zap.Logger) *mergeBlocksFile {
	return &mergeBlocksFile{
		filename:  fileName,
		blockChan: make(chan *pbsol.Block),
		logger:    logger,
	}
}

firstBlockOfFile, err := strconv.Atoi(strings.TrimLeft(filename, "0"))
func (f *mergeBlocksFile) process(ctx context.Context, sourceStore dstore.Store, cursor *Cursor) error {
f.logger.Info("Processing merge block file", zap.String("filename", f.filename))
firstBlockOfFile, err := strconv.Atoi(strings.TrimLeft(f.filename, "0"))
if err != nil {
return fmt.Errorf("converting filename to block number: %w", err)
}

reader, err := sourceStore.OpenObject(ctx, filename)
reader, err := sourceStore.OpenObject(ctx, f.filename)
if err != nil {
return fmt.Errorf("opening merge block file %s: %w", filename, err)
return fmt.Errorf("opening merge block file %s: %w", f.filename, err)
}
defer reader.Close()

blockReader, err := bstream.GetBlockReaderFactory.New(reader)
if err != nil {
return fmt.Errorf("creating block reader for file %s: %w", filename, err)
return fmt.Errorf("creating block reader for file %s: %w", f.filename, err)
}

bundleReader := NewBundleReader(ctx, p.logger)
blockChan := make(chan *pbsol.Block, 100)

go func() {
start := time.Now()
for {
block, err := blockReader.Read()
if err != nil {
if err == io.EOF {
close(blockChan)
return
}
bundleReader.PushError(fmt.Errorf("reading block: %w", err))
return
}

blk := block.ToProtocol().(*pbsol.Block)
if blk.Slot < uint64(firstBlockOfFile) || blk.Slot <= p.cursor.slotNum {
p.logger.Debug("skip block", zap.Uint64("slot", blk.Slot))
continue
for {
block, err := blockReader.Read()
if err != nil {
if err == io.EOF {
close(f.blockChan)
return nil
}

blockChan <- blk
return fmt.Errorf("reading block: %w", err)
}
p.stats.totalBlockReadingDuration += time.Since(start)
}()

nailer := dhammer.NewNailer(50, func(ctx context.Context, blk *pbsol.Block) (*bstream.Block, error) {
b, err := encoder.Encode(blk)
if err != nil {
return nil, fmt.Errorf("encoding block: %w", err)
blk := block.ToProtocol().(*pbsol.Block)
if blk.Slot < uint64(firstBlockOfFile) || blk.Slot <= cursor.slotNum {
f.logger.Info("skip block", zap.Uint64("slot", blk.Slot))
continue
}
f.blockChan <- blk
}
}

return b, nil
})
nailer.Start(ctx)
func (p *Processor) processMergeBlocksFiles(ctx context.Context, mergeBlocksFileChan chan *mergeBlocksFile, destinationStore dstore.Store, encoder firecore.BlockEncoder) error {

go func() {
for {
select {
case <-ctx.Done():
return
case blk, ok := <-blockChan:
if !ok {
nailer.Close()
p.stats = &stats{
startProcessing: time.Now(),
}

for mbf := range mergeBlocksFileChan {
bundleReader := NewBundleReader(ctx, p.logger)

nailer := dhammer.NewNailer(50, func(ctx context.Context, blk *pbsol.Block) (*bstream.Block, error) {
b, err := encoder.Encode(blk)
if err != nil {
return nil, fmt.Errorf("encoding block: %w", err)
}

return b, nil
})
nailer.Start(ctx)

mbf := mbf
go func() {
for {
select {
case <-ctx.Done():
return
case blk, ok := <-mbf.blockChan:
if !ok {
nailer.Close()
return
}

start := time.Now()
err := p.ProcessBlock(context.Background(), blk)
if err != nil {
bundleReader.PushError(fmt.Errorf("processing block: %w", err))
return
}
p.stats.totalBlockProcessingDuration += time.Since(start)

nailer.Push(ctx, blk)
p.stats.totalBlockCount += 1
p.stats.totalBlockHandlingDuration += time.Since(start)
}

start := time.Now()
err := p.ProcessBlock(context.Background(), blk)
}
}()
go func() {
for bb := range nailer.Out {
err := bundleReader.PushBlock(bb)
if err != nil {
bundleReader.PushError(fmt.Errorf("processing block: %w", err))
bundleReader.PushError(fmt.Errorf("pushing block to bundle reader: %w", err))
return
}
p.stats.totalBlockProcessingDuration += time.Since(start)

nailer.Push(ctx, blk)
p.stats.totalBlockCount += 1
p.stats.totalBlockHandlingDuration += time.Since(start)
}
bundleReader.Close()
}()
err := destinationStore.WriteObject(ctx, mbf.filename, bundleReader)
if err != nil {
return fmt.Errorf("writing bundle file: %w", err)
}
}()

go func() {
for bb := range nailer.Out {
err = bundleReader.PushBlock(bb)
if err != nil {
bundleReader.PushError(fmt.Errorf("pushing block to bundle reader: %w", err))
return
}
//p.logger.Info("new merge blocks file written:", zap.String("filename", filename), zap.Duration("duration", time.Since(start)))
err = p.accountsResolver.StoreCursor(ctx, p.readerName, p.cursor)
if err != nil {
return fmt.Errorf("storing cursor at block %d: %w", p.cursor.slotNum, err)
}
bundleReader.Close()
}()

err = destinationStore.WriteObject(ctx, filename, bundleReader)
if err != nil {
return fmt.Errorf("writing bundle file: %w", err)
}
//p.logger.Info("new merge blocks file written:", zap.String("filename", filename), zap.Duration("duration", time.Since(start)))
err = p.accountsResolver.StoreCursor(ctx, p.readerName, p.cursor)
if err != nil {
return fmt.Errorf("storing cursor at block %d: %w", p.cursor.slotNum, err)
p.stats.log(p.logger)
}

p.stats.log(p.logger)
return nil
}

Expand Down
3 changes: 1 addition & 2 deletions bt/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ import (
"fmt"
"time"

"github.com/streamingfast/logging"

"cloud.google.com/go/bigtable"
pbsolv1 "github.com/streamingfast/firehose-solana/pb/sf/solana/type/v1"
"github.com/streamingfast/logging"
"go.uber.org/zap"
)

Expand Down

0 comments on commit 7d6cc7f

Please sign in to comment.