From 7d6cc7f2bf396eed2f179cf6356ddff4cb31da03 Mon Sep 17 00:00:00 2001 From: billettc Date: Mon, 2 Oct 2023 07:45:44 -0400 Subject: [PATCH] Parallel download of merge block --- accountresolver/processor.go | 182 ++++++++++++++++++++--------------- bt/core.go | 3 +- 2 files changed, 104 insertions(+), 81 deletions(-) diff --git a/accountresolver/processor.go b/accountresolver/processor.go index 654461d6..9ef2244f 100644 --- a/accountresolver/processor.go +++ b/accountresolver/processor.go @@ -103,14 +103,27 @@ func NewProcessor(readerName string, cursor *Cursor, accountsResolver AccountsRe } func (p *Processor) ProcessMergeBlocks(ctx context.Context, sourceStore dstore.Store, destinationStore dstore.Store, encoder firecore.BlockEncoder) error { - startBlockNum := p.cursor.slotNum - p.cursor.slotNum%100 + startBlockNum := p.cursor.slotNum - p.cursor.slotNum%100 //This is the first block slot of the last merge block file + startBlockNum += 100 //This is the first block slot of the next merge block file paddedBlockNum := fmt.Sprintf("%010d", startBlockNum) p.logger.Info("Processing merge blocks", zap.Uint64("cursor_block_num", p.cursor.slotNum), zap.String("first_merge_filename", paddedBlockNum)) + mergeBlocksFileChan := make(chan *mergeBlocksFile, 10) + + go func() { + err := p.processMergeBlocksFiles(ctx, mergeBlocksFileChan, destinationStore, encoder) + panic(fmt.Errorf("processing merge blocks files: %w", err)) + }() + err := sourceStore.WalkFrom(ctx, "", paddedBlockNum, func(filename string) error { - p.logger.Debug("processing merge block file", zap.String("filename", filename)) - return p.processMergeBlocksFiles(ctx, filename, sourceStore, destinationStore, encoder) + mbf := newMergeBlocksFile(filename, p.logger) + err := mbf.process(ctx, sourceStore, p.cursor) + if err != nil { + return fmt.Errorf("processing merge block file %s: %w", mbf.filename, err) + } + mergeBlocksFileChan <- mbf + return err }) if err != nil { @@ -122,113 +135,124 @@ func (p *Processor) ProcessMergeBlocks(ctx context.Context, sourceStore dstore.S return nil } -func (p *Processor) processMergeBlocksFiles(ctx context.Context, filename string, sourceStore dstore.Store, destinationStore dstore.Store, encoder firecore.BlockEncoder) error { - p.logger.Info("Processing merge block file", zap.String("filename", filename)) - p.stats = &stats{ - startProcessing: time.Now(), +type mergeBlocksFile struct { + filename string + blockChan chan *pbsol.Block + logger *zap.Logger +} + +func newMergeBlocksFile(fileName string, logger *zap.Logger) *mergeBlocksFile { + return &mergeBlocksFile{ + blockChan: make(chan *pbsol.Block), + logger: logger, } +} - firstBlockOfFile, err := strconv.Atoi(strings.TrimLeft(filename, "0")) +func (f *mergeBlocksFile) process(ctx context.Context, sourceStore dstore.Store, cursor *Cursor) error { + f.logger.Info("Processing merge block file", zap.String("filename", f.filename)) + firstBlockOfFile, err := strconv.Atoi(strings.TrimLeft(f.filename, "0")) if err != nil { return fmt.Errorf("converting filename to block number: %w", err) } - reader, err := sourceStore.OpenObject(ctx, filename) + reader, err := sourceStore.OpenObject(ctx, f.filename) if err != nil { - return fmt.Errorf("opening merge block file %s: %w", filename, err) + return fmt.Errorf("opening merge block file %s: %w", f.filename, err) } defer reader.Close() blockReader, err := bstream.GetBlockReaderFactory.New(reader) if err != nil { - return fmt.Errorf("creating block reader for file %s: %w", filename, err) + return fmt.Errorf("creating block reader for file %s: %w", f.filename, err) } - bundleReader := NewBundleReader(ctx, p.logger) - blockChan := make(chan *pbsol.Block, 100) - - go func() { - start := time.Now() - for { - block, err := blockReader.Read() - if err != nil { - if err == io.EOF { - close(blockChan) - return - } - bundleReader.PushError(fmt.Errorf("reading block: %w", err)) - return - } - - blk := block.ToProtocol().(*pbsol.Block) - if blk.Slot < uint64(firstBlockOfFile) || blk.Slot <= p.cursor.slotNum { - p.logger.Debug("skip block", zap.Uint64("slot", blk.Slot)) - continue + for { + block, err := blockReader.Read() + if err != nil { + if err == io.EOF { + close(f.blockChan) + return nil } - - blockChan <- blk + return fmt.Errorf("reading block: %w", err) } - p.stats.totalBlockReadingDuration += time.Since(start) - }() - nailer := dhammer.NewNailer(50, func(ctx context.Context, blk *pbsol.Block) (*bstream.Block, error) { - b, err := encoder.Encode(blk) - if err != nil { - return nil, fmt.Errorf("encoding block: %w", err) + blk := block.ToProtocol().(*pbsol.Block) + if blk.Slot < uint64(firstBlockOfFile) || blk.Slot <= cursor.slotNum { + f.logger.Info("skip block", zap.Uint64("slot", blk.Slot)) + continue } + f.blockChan <- blk + } +} - return b, nil - }) - nailer.Start(ctx) +func (p *Processor) processMergeBlocksFiles(ctx context.Context, mergeBlocksFileChan chan *mergeBlocksFile, destinationStore dstore.Store, encoder firecore.BlockEncoder) error { - go func() { - for { - select { - case <-ctx.Done(): - return - case blk, ok := <-blockChan: - if !ok { - nailer.Close() + p.stats = &stats{ + startProcessing: time.Now(), + } + + for mbf := range mergeBlocksFileChan { + bundleReader := NewBundleReader(ctx, p.logger) + + nailer := dhammer.NewNailer(50, func(ctx context.Context, blk *pbsol.Block) (*bstream.Block, error) { + b, err := encoder.Encode(blk) + if err != nil { + return nil, fmt.Errorf("encoding block: %w", err) + } + + return b, nil + }) + nailer.Start(ctx) + + mbf := mbf + go func() { + for { + select { + case <-ctx.Done(): return + case blk, ok := <-mbf.blockChan: + if !ok { + nailer.Close() + return + } + + start := time.Now() + err := p.ProcessBlock(context.Background(), blk) + if err != nil { + bundleReader.PushError(fmt.Errorf("processing block: %w", err)) + return + } + p.stats.totalBlockProcessingDuration += time.Since(start) + + nailer.Push(ctx, blk) + p.stats.totalBlockCount += 1 + p.stats.totalBlockHandlingDuration += time.Since(start) } - - start := time.Now() - err := p.ProcessBlock(context.Background(), blk) + } + }() + go func() { + for bb := range nailer.Out { + err := bundleReader.PushBlock(bb) if err != nil { - bundleReader.PushError(fmt.Errorf("processing block: %w", err)) + bundleReader.PushError(fmt.Errorf("pushing block to bundle reader: %w", err)) return } - p.stats.totalBlockProcessingDuration += time.Since(start) - - nailer.Push(ctx, blk) - p.stats.totalBlockCount += 1 - p.stats.totalBlockHandlingDuration += time.Since(start) } + bundleReader.Close() + }() + err := destinationStore.WriteObject(ctx, mbf.filename, bundleReader) + if err != nil { + return fmt.Errorf("writing bundle file: %w", err) } - }() - - go func() { - for bb := range nailer.Out { - err = bundleReader.PushBlock(bb) - if err != nil { - bundleReader.PushError(fmt.Errorf("pushing block to bundle reader: %w", err)) - return - } + //p.logger.Info("new merge blocks file written:", zap.String("filename", filename), zap.Duration("duration", time.Since(start))) + err = p.accountsResolver.StoreCursor(ctx, p.readerName, p.cursor) + if err != nil { + return fmt.Errorf("storing cursor at block %d: %w", p.cursor.slotNum, err) } - bundleReader.Close() - }() - err = destinationStore.WriteObject(ctx, filename, bundleReader) - if err != nil { - return fmt.Errorf("writing bundle file: %w", err) - } - //p.logger.Info("new merge blocks file written:", zap.String("filename", filename), zap.Duration("duration", time.Since(start))) - err = p.accountsResolver.StoreCursor(ctx, p.readerName, p.cursor) - if err != nil { - return fmt.Errorf("storing cursor at block %d: %w", p.cursor.slotNum, err) + p.stats.log(p.logger) } - p.stats.log(p.logger) return nil } diff --git a/bt/core.go b/bt/core.go index 08c278c8..7ce9af52 100644 --- a/bt/core.go +++ b/bt/core.go @@ -5,10 +5,9 @@ import ( "fmt" "time" - "github.com/streamingfast/logging" - "cloud.google.com/go/bigtable" pbsolv1 "github.com/streamingfast/firehose-solana/pb/sf/solana/type/v1" + "github.com/streamingfast/logging" "go.uber.org/zap" )