Skip to content

Commit

Permalink
Merge branch 'master' into nickeskov/go_modules/github.com/ory/docker…
Browse files Browse the repository at this point in the history
…test/v3-3.11.0
  • Loading branch information
alexeykiselev authored Aug 13, 2024
2 parents bb14c8e + b234e37 commit a81f105
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 53 deletions.
4 changes: 4 additions & 0 deletions cmd/importer/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type cfg struct {
snapshotsPath string
cpuProfilePath string
memProfilePath string
disableBloomFilter bool
}

func parseFlags() (cfg, error) {
Expand Down Expand Up @@ -88,6 +89,8 @@ func parseFlags() (cfg, error) {
// Debug.
flag.StringVar(&c.cpuProfilePath, "cpuprofile", "", "Write cpu profile to this file.")
flag.StringVar(&c.memProfilePath, "memprofile", "", "Write memory profile to this file.")
flag.BoolVar(&c.disableBloomFilter, "disable-bloom", false,
"Disable bloom filter. Less memory usage, but decrease performance.")
flag.Parse()

if c.blockchainPath == "" {
Expand All @@ -109,6 +112,7 @@ func (c *cfg) params(maxFDs int) state.StateParams {
params.StorageParams.DbParams.OpenFilesCacheCapacity = maxFDs - clearance
params.VerificationGoroutinesNum = c.verificationGoroutinesNum
params.DbParams.WriteBuffer = c.writeBufferSize * MiB
params.DbParams.BloomFilterParams.Disable = c.disableBloomFilter
params.StoreExtendedApiData = c.buildDataForExtendedAPI
params.BuildStateHashes = c.buildStateHashes
params.ProvideExtendedApi = false // We do not need to provide any APIs during import.
Expand Down
4 changes: 3 additions & 1 deletion cmd/rollback/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ func main() {
"Note that state must be re-imported in case it wasn't imported with similar flag set")
buildStateHashes = flag.Bool("build-state-hashes", false,
"Calculate and store state hashes for each block height.")
cfgPath = flag.String("cfg-path", "", "Path to configuration JSON file, only for custom blockchain.")
cfgPath = flag.String("cfg-path", "", "Path to configuration JSON file, only for custom blockchain.")
disableBloomFilter = flag.Bool("disable-bloom", false, "Disable bloom filter for state.")
)

flag.Parse()
Expand Down Expand Up @@ -72,6 +73,7 @@ func main() {

params := state.DefaultStateParams()
params.StorageParams.DbParams.OpenFilesCacheCapacity = int(maxFDs - 10)
params.DbParams.BloomFilterParams.Disable = *disableBloomFilter
params.BuildStateHashes = *buildStateHashes
params.StoreExtendedApiData = *buildExtendedAPI

Expand Down
23 changes: 13 additions & 10 deletions cmd/statehash/statehash.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,17 @@ func main() {

func run() error {
var (
node string
statePath string
blockchainType string
height uint64
extendedAPI bool
compare bool
search bool
showHelp bool
showVersion bool
onlyLegacy bool
node string
statePath string
blockchainType string
height uint64
extendedAPI bool
compare bool
search bool
showHelp bool
showVersion bool
onlyLegacy bool
disableBloomFilter bool
)

logging.SetupLogger(zapcore.InfoLevel)
Expand All @@ -64,6 +65,7 @@ func run() error {
flag.BoolVar(&showHelp, "help", false, "Show usage information and exit")
flag.BoolVar(&showVersion, "version", false, "Print version information and quit")
flag.BoolVar(&onlyLegacy, "legacy", false, "Compare only legacy state hashes")
flag.BoolVar(&disableBloomFilter, "disable-bloom", false, "Disable bloom filter")
flag.Parse()

if showHelp {
Expand Down Expand Up @@ -102,6 +104,7 @@ func run() error {
params := state.DefaultStateParams()
params.VerificationGoroutinesNum = 2 * runtime.NumCPU()
params.DbParams.WriteBuffer = 16 * MB
params.DbParams.BloomFilterParams.Disable = disableBloomFilter
params.StoreExtendedApiData = extendedAPI
params.BuildStateHashes = true
params.ProvideExtendedApi = false
Expand Down
4 changes: 3 additions & 1 deletion pkg/importer/blocks_importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ func (imp *BlocksImporter) SkipToHeight(ctx context.Context, height proto.Height
return fmt.Errorf("failed to skip to height %d: %w", height, err)
}
imp.reg.updateTotalSize(size)
imp.br.skip(size)
if skipErr := imp.br.skip(size); skipErr != nil {
return fmt.Errorf("failed to skip to height %d: %w", height, skipErr)
}
imp.h++
}
}
Expand Down
45 changes: 27 additions & 18 deletions pkg/importer/blocks_reader.go
Original file line number Diff line number Diff line change
@@ -1,54 +1,63 @@
package importer

import (
"bufio"
"encoding/binary"
"errors"
"fmt"
"io"
"os"
"path/filepath"
)

type blocksReader struct {
f *os.File
pos int64
r *bufio.Reader
pos int
closeFun func() error
}

func newBlocksReader(blockchainPath string) (*blocksReader, error) {
f, err := os.Open(filepath.Clean(blockchainPath))
if err != nil {
return nil, fmt.Errorf("failed to open blocks file: %w", err)
}
return &blocksReader{f: f, pos: 0}, nil
r := bufio.NewReaderSize(f, bufioReaderBuffSize)
return &blocksReader{r: r, pos: 0, closeFun: f.Close}, nil
}

func (br *blocksReader) readSize() (uint32, error) {
buf := make([]byte, uint32Size)
n, err := br.f.ReadAt(buf, br.pos)
var buf [uint32Size]byte
pos := br.pos
n, err := io.ReadFull(br.r, buf[:])
if err != nil {
return 0, fmt.Errorf("failed to read block size: %w", err)
return 0, fmt.Errorf("failed to read block size at pos %d: %w", pos, err)
}
br.pos += int64(n)
size := binary.BigEndian.Uint32(buf)
br.pos += n
size := binary.BigEndian.Uint32(buf[:])
if size > MaxBlockSize || size == 0 {
return 0, errors.New("corrupted blockchain file: invalid block size")
return 0, fmt.Errorf("corrupted blockchain file: invalid block size %d at pos %d", size, pos)
}
return size, nil
}

func (br *blocksReader) skip(size uint32) {
br.pos += int64(size)
}

func (br *blocksReader) readBlock(size uint32) ([]byte, error) {
buf := make([]byte, size)
n, err := br.f.ReadAt(buf, br.pos)
n, err := io.ReadFull(br.r, buf)
if err != nil {
return nil, fmt.Errorf("failed to read block: %w", err)
return nil, fmt.Errorf("failed to read block at pos %d: %w", br.pos, err)
}
br.pos += int64(n)
br.pos += n
return buf, nil
}

func (br *blocksReader) skip(size uint32) error {
n, err := br.r.Discard(int(size))
if err != nil {
return fmt.Errorf("failed to skip at pos %d: %w", br.pos, err)
}
br.pos += n
return nil
}

func (br *blocksReader) close() error {
return br.f.Close()
return br.closeFun()
}
2 changes: 2 additions & 0 deletions pkg/importer/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ const (
MaxTotalBatchSize = 20 * MiB
MaxBlocksBatchSize = 50000
MaxBlockSize = 2 * MiB

bufioReaderBuffSize = 64 * KiB // 64 KiB buffer for bufio.Reader
)

type State interface {
Expand Down
12 changes: 8 additions & 4 deletions pkg/importer/snapshots_importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,19 @@ func (imp *SnapshotsImporter) SkipToHeight(ctx context.Context, height proto.Hei
}
size, err := imp.br.readSize()
if err != nil {
return fmt.Errorf("failed to skip to height %d: %w", height, err)
return fmt.Errorf("failed to skip blocks to height %d: %w", height, err)
}
imp.reg.updateTotalSize(size)
imp.br.skip(size)
if skipErr := imp.br.skip(size); skipErr != nil {
return fmt.Errorf("failed to skip blocks to height %d: %w", height, skipErr)
}
size, err = imp.sr.readSize()
if err != nil {
return fmt.Errorf("failed to skip to height %d: %w", height, err)
return fmt.Errorf("failed to skip snapshots to height %d: %w", height, err)
}
if skipErr := imp.sr.skip(size); skipErr != nil {
return fmt.Errorf("failed to snapshots skip to height %d: %w", height, skipErr)
}
imp.sr.skip(size)
imp.h++
}
}
Expand Down
49 changes: 30 additions & 19 deletions pkg/importer/snapshots_reader.go
Original file line number Diff line number Diff line change
@@ -1,65 +1,76 @@
package importer

import (
"bufio"
"encoding/binary"
"fmt"
"io"
"os"
"path/filepath"

"github.com/wavesplatform/gowaves/pkg/proto"
)

type snapshotsReader struct {
scheme proto.Scheme
f *os.File
pos int64
scheme proto.Scheme
r *bufio.Reader
pos int
closeFun func() error
}

func newSnapshotsReader(scheme proto.Scheme, snapshotsPath string) (*snapshotsReader, error) {
f, err := os.Open(filepath.Clean(snapshotsPath))
if err != nil {
return nil, fmt.Errorf("failed to open snapshots file: %w", err)
}
return &snapshotsReader{scheme: scheme, f: f, pos: 0}, nil
r := bufio.NewReaderSize(f, bufioReaderBuffSize)
return &snapshotsReader{scheme: scheme, r: r, pos: 0, closeFun: f.Close}, nil
}

func (sr *snapshotsReader) readSize() (uint32, error) {
const sanityMaxBlockSnapshotSize = 100 * MiB
buf := make([]byte, uint32Size)
n, err := sr.f.ReadAt(buf, sr.pos)
var buf [uint32Size]byte
pos := sr.pos
n, err := io.ReadFull(sr.r, buf[:])
if err != nil {
return 0, fmt.Errorf("failed to read block snapshot size: %w", err)
return 0, fmt.Errorf("failed to read block snapshot size at pos %d: %w", pos, err)
}
sr.pos += int64(n)
size := binary.BigEndian.Uint32(buf)
if size > sanityMaxBlockSnapshotSize { // dont check for 0 size because it is valid
return 0, fmt.Errorf("block snapshot size %d is too big", size)
sr.pos += n
size := binary.BigEndian.Uint32(buf[:])
if size > sanityMaxBlockSnapshotSize { // don't check for 0 size because it is valid
return 0, fmt.Errorf("block snapshot size %d is too big at pos %d", size, pos)
}
return size, nil
}

func (sr *snapshotsReader) skip(size uint32) {
sr.pos += int64(size)
func (sr *snapshotsReader) skip(size uint32) error {
n, err := sr.r.Discard(int(size))
if err != nil {
return fmt.Errorf("failed to skip at pos %d: %w", sr.pos, err)
}
sr.pos += n
return nil
}

func (sr *snapshotsReader) readSnapshot() (*proto.BlockSnapshot, error) {
size, sErr := sr.readSize()
if sErr != nil {
return nil, sErr
return nil, fmt.Errorf("failed to read snapshot size: %w", sErr)
}
pos := sr.pos
buf := make([]byte, size)
n, rErr := sr.f.ReadAt(buf, sr.pos)
n, rErr := io.ReadFull(sr.r, buf)
if rErr != nil {
return nil, fmt.Errorf("failed to read snapshot: %w", rErr)
return nil, fmt.Errorf("failed to read snapshot at pos %d: %w", pos, rErr)
}
sr.pos += int64(n)
sr.pos += n
snapshot := &proto.BlockSnapshot{}
if err := snapshot.UnmarshalBinaryImport(buf, sr.scheme); err != nil {
return nil, fmt.Errorf("failed to unmarshal snapshot: %w", err)
return nil, fmt.Errorf("failed to unmarshal snapshot at pos %d: %w", pos, err)
}
return snapshot, nil
}

func (sr *snapshotsReader) close() error {
return sr.f.Close()
return sr.closeFun()
}

0 comments on commit a81f105

Please sign in to comment.