From 4c1e8f374680c5fe8179f795f2a0a14f1fea0cd2 Mon Sep 17 00:00:00 2001 From: Marco Peereboom Date: Tue, 13 Aug 2024 19:59:39 +0100 Subject: [PATCH] go fuck yourself mod and sum --- database/level/level.go | 66 ++++++++-- database/tbcd/level/level.go | 16 +-- go.mod | 10 +- go.sum | 23 ++-- rawdb/rawdb.go | 225 +++++++++++++++++++++++++++++++++++ rawdb/rawdb_test.go | 101 ++++++++++++++++ 6 files changed, 403 insertions(+), 38 deletions(-) create mode 100644 rawdb/rawdb.go create mode 100644 rawdb/rawdb_test.go diff --git a/database/level/level.go b/database/level/level.go index 44f9fdeb4..bdda0369d 100644 --- a/database/level/level.go +++ b/database/level/level.go @@ -20,6 +20,7 @@ import ( "github.com/syndtr/goleveldb/leveldb/opt" "github.com/hemilabs/heminetwork/database" + "github.com/hemilabs/heminetwork/rawdb" ) const ( @@ -27,13 +28,14 @@ const ( BlockHeadersDB = "blockheaders" BlocksMissingDB = "blocksmissing" - BlocksDB = "blocks" MetadataDB = "metadata" HeightHashDB = "heighthash" PeersDB = "peers" OutputsDB = "outputs" TransactionsDB = "transactions" + BlocksDB = "blocks" // raw database + versionKey = "version" databaseVersion = 1 ) @@ -44,13 +46,17 @@ func init() { loggo.ConfigureLoggers(logLevel) } -type Pool map[string]*leveldb.DB +type ( + Pool map[string]*leveldb.DB + RawPool map[string]*rawdb.RawDB +) type Database struct { - mtx sync.RWMutex - pool Pool // database pool + mtx sync.RWMutex + pool Pool // database pool + rawPool RawPool // raw database pool - home string // leveld toplevel database directory + home string // leveldb toplevel database directory } var _ database.Database = (*Database)(nil) @@ -63,6 +69,15 @@ func (l *Database) Close() error { defer l.mtx.Unlock() var errSeen error // XXX return last error for now + + for k, v := range l.rawPool { + if err := v.Close(); err != nil { + // do continue, leveldb does not like unfresh shutdowns + log.Errorf("close %v: %v", k, err) + errSeen = err + } + } + for k, v := range l.pool { if err := v.Close(); err != nil { // do continue, leveldb does not like unfresh shutdowns @@ -81,6 +96,13 @@ func (l *Database) DB() Pool { return l.pool } +func (l *Database) RawDB() RawPool { + log.Tracef("RawDB") + defer log.Tracef("RawDB exit") + + return l.rawPool +} + func (l *Database) RegisterNotification(ctx context.Context, n database.NotificationName, f database.NotificationCallback, payload any) error { log.Tracef("RegisterNotification") defer log.Tracef("RegisterNotification exit") @@ -109,6 +131,24 @@ func (l *Database) openDB(name string, options *opt.Options) error { return nil } +func (l *Database) openRawDB(name string, blockSize int64) error { + l.mtx.Lock() + defer l.mtx.Unlock() + + dir := filepath.Join(l.home, name) + rdb, err := rawdb.New(dir, blockSize) + if err != nil { + return fmt.Errorf("rawdb new %v: %w", name, err) + } + err = rdb.Open() + if err != nil { + return fmt.Errorf("rawdb open %v: %w", name, err) + } + l.rawPool[name] = rdb + + return nil +} + func (l *Database) Version(ctx context.Context) (int, error) { mdDB := l.pool[MetadataDB] value, err := mdDB.Get([]byte(versionKey), nil) @@ -135,8 +175,9 @@ func New(ctx context.Context, home string, version int) (*Database, error) { } l := &Database{ - home: h, - pool: make(Pool), + home: h, + pool: make(Pool), + rawPool: make(RawPool), } unwind := true @@ -146,15 +187,10 @@ func New(ctx context.Context, home string, version int) (*Database, error) { } }() - // Peers table err = l.openDB(BlockHeadersDB, nil) if err != nil { return nil, fmt.Errorf("leveldb %v: %w", BlockHeadersDB, err) } - err = l.openDB(BlocksDB, nil) - if err != nil { - return nil, fmt.Errorf("leveldb %v: %w", BlocksDB, err) - } err = l.openDB(BlocksMissingDB, nil) if err != nil { return nil, fmt.Errorf("leveldb %v: %w", BlocksMissingDB, err) @@ -176,6 +212,12 @@ func New(ctx context.Context, home string, version int) (*Database, error) { return nil, fmt.Errorf("leveldb %v: %w", TransactionsDB, err) } + // Blocks database is special + err = l.openRawDB(BlocksDB, 256*1024*1024) + if err != nil { + return nil, fmt.Errorf("rawdb %v: %w", BlocksDB, err) + } + // Treat metadata special so that we can insert some stuff. err = l.openDB(MetadataDB, &opt.Options{ErrorIfMissing: true}) if errors.Is(err, fs.ErrNotExist) { diff --git a/database/tbcd/level/level.go b/database/tbcd/level/level.go index 10f81e2cc..b1b7452af 100644 --- a/database/tbcd/level/level.go +++ b/database/tbcd/level/level.go @@ -60,7 +60,8 @@ type ldb struct { mtx sync.Mutex *level.Database - pool level.Pool + pool level.Pool + rawPool level.RawPool blockCache *lru.Cache[string, *tbcd.Block] // block cache @@ -99,6 +100,7 @@ func New(ctx context.Context, cfg *Config) (*ldb, error) { l := &ldb{ Database: ld, pool: ld.DB(), + rawPool: ld.RawDB(), cfg: cfg, } @@ -664,16 +666,14 @@ func (l *ldb) BlockInsert(ctx context.Context, b *tbcd.Block) (int64, error) { return -1, fmt.Errorf("block header by hash: %w", err) } - // Insert block without transaction, if it succeeds and the missing - // does not it will be simply redone. - bDB := l.pool[level.BlocksDB] - has, err := bDB.Has(b.Hash, nil) + bDB := l.rawPool[level.BlocksDB] + has, err := bDB.Has(b.Hash) if err != nil { return -1, fmt.Errorf("block insert has: %w", err) } if !has { // Insert block since we do not have it yet - if err = bDB.Put(b.Hash, b.Block, nil); err != nil { + if err = bDB.Insert(b.Hash, b.Block); err != nil { return -1, fmt.Errorf("blocks insert put: %w", err) } if l.cfg.BlockCache > 0 { @@ -705,8 +705,8 @@ func (l *ldb) BlockByHash(ctx context.Context, hash []byte) (*tbcd.Block, error) } } - bDB := l.pool[level.BlocksDB] - eb, err := bDB.Get(hash, nil) + bDB := l.rawPool[level.BlocksDB] + eb, err := bDB.Get(hash) if err != nil { if errors.Is(err, leveldb.ErrNotFound) { ch, _ := chainhash.NewHash(hash) diff --git a/go.mod b/go.mod index 3b41ee3c9..e9c951072 100644 --- a/go.mod +++ b/go.mod @@ -82,16 +82,16 @@ require ( github.com/sirupsen/logrus v1.9.3 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/stretchr/testify v1.9.0 // indirect - github.com/tklauser/go-sysconf v0.3.13 // indirect - github.com/tklauser/numcpus v0.7.0 // indirect - github.com/yusufpapurcu/wmi v1.2.4 // indirect + github.com/tklauser/go-sysconf v0.3.12 // indirect + github.com/tklauser/numcpus v0.6.1 // indirect + github.com/yusufpapurcu/wmi v1.2.3 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect go.opentelemetry.io/otel v1.24.0 // indirect go.opentelemetry.io/otel/metric v1.24.0 // indirect go.opentelemetry.io/otel/trace v1.24.0 // indirect golang.org/x/crypto v0.22.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20240221002015-b0ce06bbee7c // indirect - google.golang.org/grpc v1.62.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b // indirect + google.golang.org/grpc v1.59.0 // indirect google.golang.org/protobuf v1.34.2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 5e60f7f4e..ad8051a25 100644 --- a/go.sum +++ b/go.sum @@ -225,17 +225,14 @@ github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70 github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc= github.com/testcontainers/testcontainers-go v0.32.0 h1:ug1aK08L3gCHdhknlTTwWjPHPS+/alvLJU/DRxTD/ME= github.com/testcontainers/testcontainers-go v0.32.0/go.mod h1:CRHrzHLQhlXUsa5gXjTOfqIEJcrK5+xMDmBr/WMI88E= +github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU= github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= -github.com/tklauser/go-sysconf v0.3.13 h1:GBUpcahXSpR2xN01jhkNAbTLRk2Yzgggk8IM08lq3r4= -github.com/tklauser/go-sysconf v0.3.13/go.mod h1:zwleP4Q4OehZHGn4CYZDipCgg9usW5IJePewFCGVEa0= +github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY= -github.com/tklauser/numcpus v0.7.0 h1:yjuerZP127QG9m5Zh/mSO4wqurYil27tHrqwRoRjpr4= -github.com/tklauser/numcpus v0.7.0/go.mod h1:bb6dMVcj8A42tSE7i32fsIUCbQNllK5iDguyOZRUzAY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw= github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= -github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= -github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u3so/bN+JPT166wjOI6/vQPF6Xe7nMNIltagk= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw= go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo= @@ -312,13 +309,13 @@ golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9 h1:9+tzLLstTlPTRyJTh+ah5wIMsBW5c4tQwGTN3thOW9Y= -google.golang.org/genproto/googleapis/api v0.0.0-20240123012728-ef4313101c80 h1:Lj5rbfG876hIAYFjqiJnPHfhXbv+nzTWfm04Fg/XSVU= -google.golang.org/genproto/googleapis/api v0.0.0-20240123012728-ef4313101c80/go.mod h1:4jWUdICTdgc3Ibxmr8nAJiiLHwQBY0UI0XZcEMaFKaA= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240221002015-b0ce06bbee7c h1:NUsgEN92SQQqzfA+YtqYNqYmB3DMMYLlIwUZAQFVFbo= -google.golang.org/genproto/googleapis/rpc v0.0.0-20240221002015-b0ce06bbee7c/go.mod h1:H4O17MA/PE9BsGx3w+a+W2VOLLD1Qf7oJneAoU6WktY= -google.golang.org/grpc v1.62.0 h1:HQKZ/fa1bXkX1oFOvSjmZEUL8wLSaZTjCcLAlmZRtdk= -google.golang.org/grpc v1.62.0/go.mod h1:IWTG0VlJLCh1SkC58F7np9ka9mx/WNkjl4PGJaiq+QE= +google.golang.org/genproto v0.0.0-20231012201019-e917dd12ba7a h1:fwgW9j3vHirt4ObdHoYNwuO24BEZjSzbh+zPaNWoiY8= +google.golang.org/genproto/googleapis/api v0.0.0-20230913181813-007df8e322eb h1:lK0oleSc7IQsUxO3U5TjL9DWlsxpEBemh+zpB7IqhWI= +google.golang.org/genproto/googleapis/api v0.0.0-20230913181813-007df8e322eb/go.mod h1:KjSP20unUpOx5kyQUFa7k4OJg0qeJ7DEZflGDu2p6Bk= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b h1:ZlWIi1wSK56/8hn4QcBp/j9M7Gt3U/3hZw3mC7vDICo= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b/go.mod h1:swOH3j0KzcDDgGUWr+SNpyTen5YrXjS3eyPzFYKc6lc= +google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk= +google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/rawdb/rawdb.go b/rawdb/rawdb.go new file mode 100644 index 000000000..685767836 --- /dev/null +++ b/rawdb/rawdb.go @@ -0,0 +1,225 @@ +// Copyright (c) 2024 Hemi Labs, Inc. +// Use of this source code is governed by the MIT License, +// which can be found in the LICENSE file. + +package rawdb + +import ( + "encoding/binary" + "errors" + "fmt" + "os" + "path/filepath" + "sync" + + "github.com/juju/loggo" + "github.com/syndtr/goleveldb/leveldb" +) + +const ( + logLevel = "INFO" + + indexDir = "index" + dataDir = "data" +) + +var ( + log = loggo.GetLogger("rawdb") + lastFilenameKey = []byte("lastfilename") +) + +func init() { + loggo.ConfigureLoggers(logLevel) +} + +type RawDB struct { + mtx sync.RWMutex + + home string + + maxSize int64 + + index *leveldb.DB +} + +func New(home string, maxSize int64) (*RawDB, error) { + log.Tracef("New") + defer log.Tracef("New exit") + + if maxSize < 4096 { + return nil, fmt.Errorf("invalid max size: %v", maxSize) + } + + return &RawDB{ + home: home, + maxSize: maxSize, + }, nil +} + +func (r *RawDB) lastFilename() (string, error) { + log.Tracef("lastFilename") + defer log.Tracef("lastFilename exit") + + return "", errors.New("not yet") +} + +func (r *RawDB) Open() error { + log.Tracef("Open") + defer log.Tracef("Open exit") + + err := os.MkdirAll(filepath.Join(r.home, dataDir), 0o0700) + if err != nil { + return fmt.Errorf("mkdir: %w", err) + } + r.index, err = leveldb.OpenFile(filepath.Join(r.home, indexDir), nil) + if err != nil { + return fmt.Errorf("mkdir: %w", err) + } + + return nil +} + +func (r *RawDB) Close() error { + log.Tracef("Close") + defer log.Tracef("Close exit") + + err := r.index.Close() + if err != nil { + return err + } + + return nil +} + +func (r *RawDB) Has(key []byte) (bool, error) { + log.Tracef("Has") + defer log.Tracef("Has exit") + + return r.index.Has(key, nil) +} + +func (r *RawDB) Insert(key, value []byte) error { + log.Tracef("Insert") + defer log.Tracef("Insert exit") + + if int64(len(value)) > r.maxSize { + return fmt.Errorf("length exceeds maximum length: %v > %v", + len(value), r.maxSize) + } + + // Assert we do not have this key stored yet. + if ok, err := r.index.Has(key, nil); ok { + return errors.New("key already exists") + } else if err != nil { + return err + } + + r.mtx.Lock() + defer r.mtx.Unlock() + + tries := 0 + for { + // This should not happen but we must ensure we aren't spinning. + if tries > 1 { + return fmt.Errorf("could not determinet last filename") + } + + lfe, err := r.index.Get(lastFilenameKey, nil) + if err != nil { + if errors.Is(err, leveldb.ErrNotFound) { + lfe = []byte{0, 0, 0, 0} + } else { + return err + } + } + last := binary.BigEndian.Uint32(lfe) + lastFilename := filepath.Join(r.home, dataDir, fmt.Sprintf("%010v", last)) + + // determine if data fits. + fh, err := os.OpenFile(lastFilename, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o600) + if err != nil { + return err + } + defer func() { + // Close all files we opened along the way. + err := fh.Close() + if err != nil { + log.Errorf("close %v: %v", lastFilename, err) + } + }() + if fi, err := fh.Stat(); err != nil { + return err + } else if fi.Size()+int64(len(value)) > r.maxSize { + last++ + lastData := make([]byte, 8) + binary.BigEndian.PutUint32(lastData, last) + err = r.index.Put(lastFilenameKey, lastData, nil) + if err != nil { + return err + } + tries++ + continue + } else { + // Encoded coordinates. + c := make([]byte, 4+4+4) + binary.BigEndian.PutUint32(c[0:4], last) + binary.BigEndian.PutUint32(c[4:8], uint32(fi.Size())) + binary.BigEndian.PutUint32(c[8:12], uint32(len(value))) + + // Append value to latest file. + n, err := fh.Write(value) + if err != nil { + return err + } + if n != len(value) { + return fmt.Errorf("partial write, data corruption: %v != %v", n, len(value)) + } + + // Write coordinates + err = r.index.Put(key, c, nil) + if err != nil { + return err + } + + return nil + } + } +} + +func (r *RawDB) Get(key []byte) ([]byte, error) { + log.Tracef("Get: %x", key) + defer log.Tracef("Get exit: %x", key) + + c, err := r.index.Get(key, nil) + if err != nil { + return nil, err + } + if len(c) != 12 { + // Should not happen. + return nil, errors.New("invalid coordinates") + } + filename := filepath.Join(r.home, dataDir, fmt.Sprintf("%010v", + binary.BigEndian.Uint32(c[0:4]))) + offset := binary.BigEndian.Uint32(c[4:8]) + size := binary.BigEndian.Uint32(c[8:12]) + f, err := os.OpenFile(filename, os.O_RDONLY, 0o600) + if err != nil { + return nil, err + } + defer func() { + err := f.Close() + if err != nil { + log.Errorf("close: %v", err) + } + }() + data := make([]byte, size) + n, err := f.ReadAt(data, int64(offset)) + if err != nil { + return nil, err + } + if n != int(size) { + return nil, errors.New("invalid read size") + } + + return data, nil +} diff --git a/rawdb/rawdb_test.go b/rawdb/rawdb_test.go new file mode 100644 index 000000000..64fb72352 --- /dev/null +++ b/rawdb/rawdb_test.go @@ -0,0 +1,101 @@ +// Copyright (c) 2024 Hemi Labs, Inc. +// Use of this source code is governed by the MIT License, +// which can be found in the LICENSE file. + +package rawdb + +import ( + "bytes" + "os" + "testing" +) + +func TestRawDB(t *testing.T) { + home, err := os.MkdirTemp("", "rawdb") + if err != nil { + t.Fatal(err) + } + remove := true + defer func() { + if remove { + err := os.RemoveAll(home) + if err != nil { + panic(err) + } + } else { + t.Logf("did not remove home: %v", home) + } + }() + blockSize := int64(4096) + rdb, err := New(home, blockSize) + if err != nil { + t.Fatal(err) + } + err = rdb.Open() + if err != nil { + t.Fatal(err) + } + defer func() { + err := rdb.Close() + if err != nil { + panic(err) + } + }() + + // Open again and expect locked failure + rdb2, err := New(home, blockSize) + if err != nil { + t.Fatal(err) + } + err = rdb2.Open() + if err == nil { + t.Fatal("expected locked db") + } + + key := []byte("key") + data := []byte("hello, world!") + err = rdb.Insert(key, data) + if err != nil { + t.Fatal(err) + } + KEY := []byte("KEY") + DATA := []byte("HELLO, WORLD!") + err = rdb.Insert(KEY, DATA) + if err != nil { + t.Fatal(err) + } + + // Get data out again + dataRead, err := rdb.Get(key) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(data, dataRead) { + t.Fatal("data not identical") + } + dataRead, err = rdb.Get(KEY) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(DATA, dataRead) { + t.Fatal("data not identical") + } + + // Overflow to next file + overflowData := make([]byte, int(blockSize)-len(data)-len(DATA)+1) + for k := range overflowData { + overflowData[k] = uint8(k) + } + overflowKey := []byte("overflow") + err = rdb.Insert(overflowKey, overflowData) + if err != nil { + t.Fatal(err) + } + overflowRead, err := rdb.Get(overflowKey) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(overflowData, overflowRead) { + t.Fatal("overflow data not identical") + } +}