From d024e4c820349a6fb70f3e424703b1614202e127 Mon Sep 17 00:00:00 2001 From: Nikolay Eskov Date: Tue, 13 Aug 2024 12:00:34 +0300 Subject: [PATCH 1/3] Improve `importer` utility with buffered reading (#1460) * Improve 'blocksReader' with buffered reading. * Move buf size constant to common constants. * Improve 'snapshotsReader' with buffered reading. --- pkg/importer/blocks_importer.go | 4 ++- pkg/importer/blocks_reader.go | 45 ++++++++++++++++----------- pkg/importer/importer.go | 2 ++ pkg/importer/snapshots_importer.go | 12 +++++--- pkg/importer/snapshots_reader.go | 49 ++++++++++++++++++------------ 5 files changed, 70 insertions(+), 42 deletions(-) diff --git a/pkg/importer/blocks_importer.go b/pkg/importer/blocks_importer.go index 74c987e02..c5aa26176 100644 --- a/pkg/importer/blocks_importer.go +++ b/pkg/importer/blocks_importer.go @@ -43,7 +43,9 @@ func (imp *BlocksImporter) SkipToHeight(ctx context.Context, height proto.Height return fmt.Errorf("failed to skip to height %d: %w", height, err) } imp.reg.updateTotalSize(size) - imp.br.skip(size) + if skipErr := imp.br.skip(size); skipErr != nil { + return fmt.Errorf("failed to skip to height %d: %w", height, skipErr) + } imp.h++ } } diff --git a/pkg/importer/blocks_reader.go b/pkg/importer/blocks_reader.go index b2e4c608f..ddc63530c 100644 --- a/pkg/importer/blocks_reader.go +++ b/pkg/importer/blocks_reader.go @@ -1,16 +1,18 @@ package importer import ( + "bufio" "encoding/binary" - "errors" "fmt" + "io" "os" "path/filepath" ) type blocksReader struct { - f *os.File - pos int64 + r *bufio.Reader + pos int + closeFun func() error } func newBlocksReader(blockchainPath string) (*blocksReader, error) { @@ -18,37 +20,44 @@ func newBlocksReader(blockchainPath string) (*blocksReader, error) { if err != nil { return nil, fmt.Errorf("failed to open blocks file: %w", err) } - return &blocksReader{f: f, pos: 0}, nil + r := bufio.NewReaderSize(f, bufioReaderBuffSize) + return &blocksReader{r: r, pos: 0, closeFun: f.Close}, nil } func (br *blocksReader) readSize() (uint32, error) { - buf := make([]byte, uint32Size) - n, err := br.f.ReadAt(buf, br.pos) + var buf [uint32Size]byte + pos := br.pos + n, err := io.ReadFull(br.r, buf[:]) if err != nil { - return 0, fmt.Errorf("failed to read block size: %w", err) + return 0, fmt.Errorf("failed to read block size at pos %d: %w", pos, err) } - br.pos += int64(n) - size := binary.BigEndian.Uint32(buf) + br.pos += n + size := binary.BigEndian.Uint32(buf[:]) if size > MaxBlockSize || size == 0 { - return 0, errors.New("corrupted blockchain file: invalid block size") + return 0, fmt.Errorf("corrupted blockchain file: invalid block size %d at pos %d", size, pos) } return size, nil } -func (br *blocksReader) skip(size uint32) { - br.pos += int64(size) -} - func (br *blocksReader) readBlock(size uint32) ([]byte, error) { buf := make([]byte, size) - n, err := br.f.ReadAt(buf, br.pos) + n, err := io.ReadFull(br.r, buf) if err != nil { - return nil, fmt.Errorf("failed to read block: %w", err) + return nil, fmt.Errorf("failed to read block at pos %d: %w", br.pos, err) } - br.pos += int64(n) + br.pos += n return buf, nil } +func (br *blocksReader) skip(size uint32) error { + n, err := br.r.Discard(int(size)) + if err != nil { + return fmt.Errorf("failed to skip at pos %d: %w", br.pos, err) + } + br.pos += n + return nil +} + func (br *blocksReader) close() error { - return br.f.Close() + return br.closeFun() } diff --git a/pkg/importer/importer.go b/pkg/importer/importer.go index 5ff57427b..f8af30270 100644 --- a/pkg/importer/importer.go +++ b/pkg/importer/importer.go @@ -23,6 +23,8 @@ const ( MaxTotalBatchSize = 20 * MiB MaxBlocksBatchSize = 50000 MaxBlockSize = 2 * MiB + + bufioReaderBuffSize = 64 * KiB // 64 KiB buffer for bufio.Reader ) type State interface { diff --git a/pkg/importer/snapshots_importer.go b/pkg/importer/snapshots_importer.go index 527cd6448..42a03f1b6 100644 --- a/pkg/importer/snapshots_importer.go +++ b/pkg/importer/snapshots_importer.go @@ -45,15 +45,19 @@ func (imp *SnapshotsImporter) SkipToHeight(ctx context.Context, height proto.Hei } size, err := imp.br.readSize() if err != nil { - return fmt.Errorf("failed to skip to height %d: %w", height, err) + return fmt.Errorf("failed to skip blocks to height %d: %w", height, err) } imp.reg.updateTotalSize(size) - imp.br.skip(size) + if skipErr := imp.br.skip(size); skipErr != nil { + return fmt.Errorf("failed to skip blocks to height %d: %w", height, skipErr) + } size, err = imp.sr.readSize() if err != nil { - return fmt.Errorf("failed to skip to height %d: %w", height, err) + return fmt.Errorf("failed to skip snapshots to height %d: %w", height, err) + } + if skipErr := imp.sr.skip(size); skipErr != nil { + return fmt.Errorf("failed to snapshots skip to height %d: %w", height, skipErr) } - imp.sr.skip(size) imp.h++ } } diff --git a/pkg/importer/snapshots_reader.go b/pkg/importer/snapshots_reader.go index 3879adeba..8976f294a 100644 --- a/pkg/importer/snapshots_reader.go +++ b/pkg/importer/snapshots_reader.go @@ -1,8 +1,10 @@ package importer import ( + "bufio" "encoding/binary" "fmt" + "io" "os" "path/filepath" @@ -10,9 +12,10 @@ import ( ) type snapshotsReader struct { - scheme proto.Scheme - f *os.File - pos int64 + scheme proto.Scheme + r *bufio.Reader + pos int + closeFun func() error } func newSnapshotsReader(scheme proto.Scheme, snapshotsPath string) (*snapshotsReader, error) { @@ -20,46 +23,54 @@ func newSnapshotsReader(scheme proto.Scheme, snapshotsPath string) (*snapshotsRe if err != nil { return nil, fmt.Errorf("failed to open snapshots file: %w", err) } - return &snapshotsReader{scheme: scheme, f: f, pos: 0}, nil + r := bufio.NewReaderSize(f, bufioReaderBuffSize) + return &snapshotsReader{scheme: scheme, r: r, pos: 0, closeFun: f.Close}, nil } func (sr *snapshotsReader) readSize() (uint32, error) { const sanityMaxBlockSnapshotSize = 100 * MiB - buf := make([]byte, uint32Size) - n, err := sr.f.ReadAt(buf, sr.pos) + var buf [uint32Size]byte + pos := sr.pos + n, err := io.ReadFull(sr.r, buf[:]) if err != nil { - return 0, fmt.Errorf("failed to read block snapshot size: %w", err) + return 0, fmt.Errorf("failed to read block snapshot size at pos %d: %w", pos, err) } - sr.pos += int64(n) - size := binary.BigEndian.Uint32(buf) - if size > sanityMaxBlockSnapshotSize { // dont check for 0 size because it is valid - return 0, fmt.Errorf("block snapshot size %d is too big", size) + sr.pos += n + size := binary.BigEndian.Uint32(buf[:]) + if size > sanityMaxBlockSnapshotSize { // don't check for 0 size because it is valid + return 0, fmt.Errorf("block snapshot size %d is too big at pos %d", size, pos) } return size, nil } -func (sr *snapshotsReader) skip(size uint32) { - sr.pos += int64(size) +func (sr *snapshotsReader) skip(size uint32) error { + n, err := sr.r.Discard(int(size)) + if err != nil { + return fmt.Errorf("failed to skip at pos %d: %w", sr.pos, err) + } + sr.pos += n + return nil } func (sr *snapshotsReader) readSnapshot() (*proto.BlockSnapshot, error) { size, sErr := sr.readSize() if sErr != nil { - return nil, sErr + return nil, fmt.Errorf("failed to read snapshot size: %w", sErr) } + pos := sr.pos buf := make([]byte, size) - n, rErr := sr.f.ReadAt(buf, sr.pos) + n, rErr := io.ReadFull(sr.r, buf) if rErr != nil { - return nil, fmt.Errorf("failed to read snapshot: %w", rErr) + return nil, fmt.Errorf("failed to read snapshot at pos %d: %w", pos, rErr) } - sr.pos += int64(n) + sr.pos += n snapshot := &proto.BlockSnapshot{} if err := snapshot.UnmarshalBinaryImport(buf, sr.scheme); err != nil { - return nil, fmt.Errorf("failed to unmarshal snapshot: %w", err) + return nil, fmt.Errorf("failed to unmarshal snapshot at pos %d: %w", pos, err) } return snapshot, nil } func (sr *snapshotsReader) close() error { - return sr.f.Close() + return sr.closeFun() } From db46fb749e023116cc855854474f603e3dd4be23 Mon Sep 17 00:00:00 2001 From: Nikolay Eskov Date: Tue, 13 Aug 2024 12:54:45 +0300 Subject: [PATCH 2/3] Add `disable-bloom` flag for utilities (#1461) * Add 'disable-bloom' bool flag for importer utility. * Add 'disable-bloom' bool flag for statehash utility. * Add 'disable-bloom' bool flag for rollback utility. --------- Co-authored-by: Alexey Kiselev --- cmd/importer/importer.go | 4 ++++ cmd/rollback/main.go | 4 +++- cmd/statehash/statehash.go | 23 +++++++++++++---------- 3 files changed, 20 insertions(+), 11 deletions(-) diff --git a/cmd/importer/importer.go b/cmd/importer/importer.go index 9255bc53e..5f15c4723 100644 --- a/cmd/importer/importer.go +++ b/cmd/importer/importer.go @@ -55,6 +55,7 @@ type cfg struct { snapshotsPath string cpuProfilePath string memProfilePath string + disableBloomFilter bool } func parseFlags() (cfg, error) { @@ -88,6 +89,8 @@ func parseFlags() (cfg, error) { // Debug. flag.StringVar(&c.cpuProfilePath, "cpuprofile", "", "Write cpu profile to this file.") flag.StringVar(&c.memProfilePath, "memprofile", "", "Write memory profile to this file.") + flag.BoolVar(&c.disableBloomFilter, "disable-bloom", false, + "Disable bloom filter. Less memory usage, but decrease performance.") flag.Parse() if c.blockchainPath == "" { @@ -109,6 +112,7 @@ func (c *cfg) params(maxFDs int) state.StateParams { params.StorageParams.DbParams.OpenFilesCacheCapacity = maxFDs - clearance params.VerificationGoroutinesNum = c.verificationGoroutinesNum params.DbParams.WriteBuffer = c.writeBufferSize * MiB + params.DbParams.BloomFilterParams.Disable = c.disableBloomFilter params.StoreExtendedApiData = c.buildDataForExtendedAPI params.BuildStateHashes = c.buildStateHashes params.ProvideExtendedApi = false // We do not need to provide any APIs during import. diff --git a/cmd/rollback/main.go b/cmd/rollback/main.go index 4116e6335..dbb75ec53 100644 --- a/cmd/rollback/main.go +++ b/cmd/rollback/main.go @@ -28,7 +28,8 @@ func main() { "Note that state must be re-imported in case it wasn't imported with similar flag set") buildStateHashes = flag.Bool("build-state-hashes", false, "Calculate and store state hashes for each block height.") - cfgPath = flag.String("cfg-path", "", "Path to configuration JSON file, only for custom blockchain.") + cfgPath = flag.String("cfg-path", "", "Path to configuration JSON file, only for custom blockchain.") + disableBloomFilter = flag.Bool("disable-bloom", false, "Disable bloom filter for state.") ) flag.Parse() @@ -72,6 +73,7 @@ func main() { params := state.DefaultStateParams() params.StorageParams.DbParams.OpenFilesCacheCapacity = int(maxFDs - 10) + params.DbParams.BloomFilterParams.Disable = *disableBloomFilter params.BuildStateHashes = *buildStateHashes params.StoreExtendedApiData = *buildExtendedAPI diff --git a/cmd/statehash/statehash.go b/cmd/statehash/statehash.go index 3c98d1188..78c84bacd 100644 --- a/cmd/statehash/statehash.go +++ b/cmd/statehash/statehash.go @@ -40,16 +40,17 @@ func main() { func run() error { var ( - node string - statePath string - blockchainType string - height uint64 - extendedAPI bool - compare bool - search bool - showHelp bool - showVersion bool - onlyLegacy bool + node string + statePath string + blockchainType string + height uint64 + extendedAPI bool + compare bool + search bool + showHelp bool + showVersion bool + onlyLegacy bool + disableBloomFilter bool ) logging.SetupLogger(zapcore.InfoLevel) @@ -64,6 +65,7 @@ func run() error { flag.BoolVar(&showHelp, "help", false, "Show usage information and exit") flag.BoolVar(&showVersion, "version", false, "Print version information and quit") flag.BoolVar(&onlyLegacy, "legacy", false, "Compare only legacy state hashes") + flag.BoolVar(&disableBloomFilter, "disable-bloom", false, "Disable bloom filter") flag.Parse() if showHelp { @@ -102,6 +104,7 @@ func run() error { params := state.DefaultStateParams() params.VerificationGoroutinesNum = 2 * runtime.NumCPU() params.DbParams.WriteBuffer = 16 * MB + params.DbParams.BloomFilterParams.Disable = disableBloomFilter params.StoreExtendedApiData = extendedAPI params.BuildStateHashes = true params.ProvideExtendedApi = false From b234e37c4070b02d8ece1b47e2378af0c020940a Mon Sep 17 00:00:00 2001 From: Nikolay Eskov Date: Tue, 13 Aug 2024 15:10:13 +0300 Subject: [PATCH 3/3] Moved to Go 1.22 (#1464) Co-authored-by: Alexey Kiselev --- .github/workflows/codeql-analysis.yml | 4 ++-- .github/workflows/go.yml | 12 ++++++------ .github/workflows/itests.yml | 4 ++-- .github/workflows/itests_race.yml | 4 ++-- .github/workflows/security.yml | 2 +- README.md | 2 +- go.mod | 2 +- 7 files changed, 15 insertions(+), 15 deletions(-) diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index f1d9c7288..4558bcf7e 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -41,10 +41,10 @@ jobs: - name: Checkout repository uses: actions/checkout@v4 - - name: Set up Go 1.21 + - name: Set up Go 1.22 uses: actions/setup-go@v5.0.2 with: - go-version: 1.21.x + go-version: 1.22.x check-latest: true cache: true diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 6cfc6316c..dafd452ab 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -15,10 +15,10 @@ jobs: uses: actions/checkout@v4 with: fetch-depth: 0 - - name: Set up Go 1.21 + - name: Set up Go 1.22 uses: actions/setup-go@v5.0.2 with: - go-version: 1.21.x + go-version: 1.22.x check-latest: true cache: false # don't save & restore build caches because golangci-lint action does it internally - name: Get dependencies @@ -38,10 +38,10 @@ jobs: if: github.event_name == 'push' || github.event.pull_request.head.repo.full_name != 'wavesplatform/gowaves' steps: - - name: Set up Go 1.21 + - name: Set up Go 1.22 uses: actions/setup-go@v5.0.2 with: - go-version: 1.21.x + go-version: 1.22.x check-latest: true cache: true @@ -74,10 +74,10 @@ jobs: if: github.event_name == 'pull_request' && github.repository == 'wavesplatform/gowaves' steps: - - name: Set up Go 1.21 + - name: Set up Go 1.22 uses: actions/setup-go@v5.0.2 with: - go-version: 1.21.x + go-version: 1.22.x check-latest: true cache: true id: go diff --git a/.github/workflows/itests.yml b/.github/workflows/itests.yml index 410fdddda..3c35f9b1e 100644 --- a/.github/workflows/itests.yml +++ b/.github/workflows/itests.yml @@ -14,10 +14,10 @@ jobs: if: (github.event_name == 'workflow_dispatch' || github.event.review.state == 'APPROVED') && github.repository == 'wavesplatform/gowaves' steps: - - name: Set up Go 1.21 + - name: Set up Go 1.22 uses: actions/setup-go@v5.0.2 with: - go-version: 1.21.x + go-version: 1.22.x check-latest: true cache: true id: go diff --git a/.github/workflows/itests_race.yml b/.github/workflows/itests_race.yml index ced474383..6e61098e0 100644 --- a/.github/workflows/itests_race.yml +++ b/.github/workflows/itests_race.yml @@ -11,10 +11,10 @@ jobs: if: github.repository == 'wavesplatform/gowaves' steps: - - name: Set up Go 1.21 + - name: Set up Go 1.22 uses: actions/setup-go@v5.0.2 with: - go-version: 1.21.x + go-version: 1.22.x check-latest: true cache: true id: go diff --git a/.github/workflows/security.yml b/.github/workflows/security.yml index 934fa9947..93592c8c3 100644 --- a/.github/workflows/security.yml +++ b/.github/workflows/security.yml @@ -67,7 +67,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v5.0.2 with: - go-version: 1.21.12 + go-version: 1.22.x check-latest: true cache: true - name: Run go list diff --git a/README.md b/README.md index 61e365687..6ae8cdc7d 100644 --- a/README.md +++ b/README.md @@ -192,7 +192,7 @@ Send the transaction to the network: ### Building from sources -Go version 1.21 or later is required to build the `node`, `importer`, `wallet` and other tools. +Go version 1.22 or later is required to build the `node`, `importer`, `wallet` and other tools. To build a node, importer or other tools run a `make` command: diff --git a/go.mod b/go.mod index 056687661..2ad4c4339 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/wavesplatform/gowaves -go 1.21 +go 1.22 require ( filippo.io/edwards25519 v1.1.0