diff --git a/cmd/hemictl/hemictl.go b/cmd/hemictl/hemictl.go index b597c35e8..a9ce65313 100644 --- a/cmd/hemictl/hemictl.go +++ b/cmd/hemictl/hemictl.go @@ -206,7 +206,7 @@ func tbcdb() error { return fmt.Errorf("new server: %w", err) } // Open db. - err = s.DBOpen(ctx) + err = s.DBOpen(ctx) // XXX kill this and verify all reversed hashes as parameters if err != nil { return fmt.Errorf("db open: %w", err) } @@ -398,7 +398,8 @@ func tbcdb() error { fmt.Println("\tdumpoutputs ") fmt.Println("\thelp") fmt.Println("\tscripthashbyoutpoint [txid] [index]") - fmt.Println("\tspendoutputsbytxid [txid] [index]") + fmt.Println("\tspentoutputsbytxid ") + fmt.Println("\ttxbyid ") fmt.Println("\ttxindex ") fmt.Println("\tutxoindex ") fmt.Println("\tutxosbyscripthash [hash]") @@ -460,7 +461,7 @@ func tbcdb() error { var revTxId [32]byte copy(revTxId[:], chtxid[:]) - bh, err := s.DB().BlocksByTxId(ctx, revTxId) + bh, err := s.DB().BlocksByTxId(ctx, revTxId[:]) if err != nil { return fmt.Errorf("block by txid: %w", err) } @@ -468,7 +469,23 @@ func tbcdb() error { fmt.Printf("%v\n", bh[k]) } - case "spendoutputsbytxid": + case "txbyid": + txid := args["txid"] + if txid == "" { + return errors.New("txid: must be set") + } + chtxid, err := chainhash.NewHashFromStr(txid) + if err != nil { + return fmt.Errorf("chainhash: %w", err) + } + + tx, err := s.TxById(ctx, chtxid) + if err != nil { + return fmt.Errorf("block by txid: %w", err) + } + fmt.Printf("%v\n", spew.Sdump(tx)) + + case "spentoutputsbytxid": txid := args["txid"] if txid == "" { return errors.New("txid: must be set") @@ -477,10 +494,8 @@ func tbcdb() error { if err != nil { return fmt.Errorf("chainhash: %w", err) } - var revTxId [32]byte - copy(revTxId[:], chtxid[:]) - si, err := s.DB().SpendOutputsByTxId(ctx, revTxId) + si, err := s.SpentOutputsByTxId(ctx, chtxid) if err != nil { return fmt.Errorf("spend outputs by txid: %w", err) } diff --git a/database/level/level.go b/database/level/level.go index 38207b67e..44f9fdeb4 
100644 --- a/database/level/level.go +++ b/database/level/level.go @@ -44,18 +44,14 @@ func init() { loggo.ConfigureLoggers(logLevel) } -type ( - Pool map[string]*leveldb.DB - Database struct { - mtx sync.RWMutex - wg sync.WaitGroup // Wait group for notification handler exit +type Pool map[string]*leveldb.DB - pool Pool // database pool +type Database struct { + mtx sync.RWMutex + pool Pool // database pool - ntfn map[database.NotificationName]int // Notification handlers - home string // leveld toplevel database directory - } -) + home string // leveld toplevel database directory +} var _ database.Database = (*Database)(nil) @@ -68,8 +64,7 @@ func (l *Database) Close() error { var errSeen error // XXX return last error for now for k, v := range l.pool { - err := v.Close() - if err != nil { + if err := v.Close(); err != nil { // do continue, leveldb does not like unfresh shutdowns log.Errorf("close %v: %v", k, err) errSeen = err diff --git a/database/tbcd/database.go b/database/tbcd/database.go index e4b07c2b8..8f8dbbb61 100644 --- a/database/tbcd/database.go +++ b/database/tbcd/database.go @@ -65,10 +65,10 @@ type Database interface { BlockByHash(ctx context.Context, hash []byte) (*Block, error) // Transactions - BlockUtxoUpdate(ctx context.Context, utxos map[Outpoint]CacheOutput) error - BlockTxUpdate(ctx context.Context, txs map[TxKey]*TxValue) error - BlocksByTxId(ctx context.Context, txId TxId) ([]BlockHash, error) - SpendOutputsByTxId(ctx context.Context, txId TxId) ([]SpendInfo, error) + BlockUtxoUpdate(ctx context.Context, direction int, utxos map[Outpoint]CacheOutput) error + BlockTxUpdate(ctx context.Context, direction int, txs map[TxKey]*TxValue) error + BlocksByTxId(ctx context.Context, txId []byte) ([]BlockHash, error) + SpentOutputsByTxId(ctx context.Context, txId []byte) ([]SpentInfo, error) // Peer manager PeersStats(ctx context.Context) (int, int) // good, bad count @@ -82,6 +82,12 @@ type Database interface { UtxosByScriptHash(ctx 
context.Context, sh ScriptHash, start uint64, count uint64) ([]Utxo, error) } +// XXX there exist various types in this file that need to be reevaluated. +// Such as BlockHash, ScriptHash etc. They exist for convenience reasons but +// it may be worth to switch to chainhash and btcd.OutPoint etc. This does need +// thought because we have composites that are needed for the code to function +// properly. + // BlockHeader contains the first 80 raw bytes of a bitcoin block plus its // location information (hash+height) and the cumulative difficulty. type BlockHeader struct { @@ -142,7 +148,7 @@ type BlockIdentifier struct { Hash database.ByteArray } -type SpendInfo struct { +type SpentInfo struct { BlockHash BlockHash TxId TxId InputIndex uint32 @@ -199,7 +205,7 @@ type CacheOutput [32 + 8 + 4]byte // script_hash + value + out_idx // String reutrns pretty printable CacheOutput. Hash is not reversed since it is an // opaque pointer. It prints satoshis@script_hash:output_index func (c CacheOutput) String() string { - return fmt.Sprintf("%d @ %v:%d", binary.BigEndian.Uint64(c[32:40]), + return fmt.Sprintf("%d @ %x:%d", binary.BigEndian.Uint64(c[32:40]), c[0:32], binary.BigEndian.Uint32(c[40:])) } @@ -314,6 +320,11 @@ func (t TxId) String() string { return hex.EncodeToString(rev[:]) } +func (t TxId) Hash() *chainhash.Hash { + h, _ := chainhash.NewHash(t[:]) + return h +} + func NewTxId(x [32]byte) (txId TxId) { copy(txId[:], x[:]) return @@ -340,6 +351,11 @@ func (bh BlockHash) String() string { return hex.EncodeToString(rev[:]) } +func (bh BlockHash) Hash() *chainhash.Hash { + h, _ := chainhash.NewHash(bh[:]) + return h +} + func NewBlockHash(x [32]byte) (blockHash BlockHash) { copy(blockHash[:], x[:]) return @@ -378,7 +394,7 @@ func NewScriptHashFromBytes(x []byte) (scriptHash ScriptHash, err error) { // Spent Transaction: // -// s + txin.PrevOutPoint.Hash + txin.PrevOutPoint.Index + blockhash = txid + txin_index + blockhash | [1 + 32 + 4 + 32] = [32 + 4] +// s + 
txin.PrevOutPoint.Hash + txin.PrevOutPoint.Index + blockhash = txid + txin_index | [1 + 32 + 4 + 32] = [32 + 4] // // Transaction ID to Block mapping: // @@ -414,6 +430,21 @@ func NewTxMapping(txId, blockHash *chainhash.Hash) (txKey TxKey) { return txKey } +func TxIdBlockHashFromTxKey(txKey TxKey) (*TxId, *BlockHash, error) { + if txKey[0] != 't' { + return nil, nil, fmt.Errorf("invalid magic 0x%02x", txKey[0]) + } + txId, err := NewTxIdFromBytes(txKey[1:33]) + if err != nil { + return nil, nil, fmt.Errorf("invalid tx id: %w", err) + } + blockHash, err := NewBlockHashFromBytes(txKey[33:65]) + if err != nil { + return nil, nil, fmt.Errorf("invalid block hash: %w", err) + } + return &txId, &blockHash, nil +} + // Helper functions // B2H converts a raw block header to a wire block header structure. diff --git a/database/tbcd/level/level.go b/database/tbcd/level/level.go index c652c8e16..1916796a5 100644 --- a/database/tbcd/level/level.go +++ b/database/tbcd/level/level.go @@ -312,7 +312,9 @@ func (l *ldb) BlockHeaderGenesisInsert(ctx context.Context, bh [80]byte) error { hhKey := heightHashToKey(0, bhash[:]) hhBatch.Put(hhKey, []byte{}) - ebh := encodeBlockHeader(0, bh, new(big.Int)) + cdiff := big.NewInt(0) + cdiff = new(big.Int).Add(cdiff, blockchain.CalcWork(wbh.Bits)) + ebh := encodeBlockHeader(0, bh, cdiff) bhBatch.Put(bhash[:], ebh[:]) bhBatch.Put([]byte(bhsCanonicalTipKey), ebh[:]) @@ -724,7 +726,7 @@ func (l *ldb) BlockByHash(ctx context.Context, hash []byte) (*tbcd.Block, error) }, nil } -func (l *ldb) BlocksByTxId(ctx context.Context, txId tbcd.TxId) ([]tbcd.BlockHash, error) { +func (l *ldb) BlocksByTxId(ctx context.Context, txId []byte) ([]tbcd.BlockHash, error) { log.Tracef("BlocksByTxId") defer log.Tracef("BlocksByTxId exit") @@ -746,18 +748,18 @@ func (l *ldb) BlocksByTxId(ctx context.Context, txId tbcd.TxId) ([]tbcd.BlockHas return nil, fmt.Errorf("blocks by id iterator: %w", err) } if len(blocks) == 0 { - ch, _ := chainhash.NewHash(txId[:]) - return 
nil, database.NotFoundError(fmt.Sprintf("tx not found: %v", ch)) + ctxid, _ := chainhash.NewHash(txId) + return nil, database.NotFoundError(fmt.Sprintf("tx not found: %v", ctxid)) } return blocks, nil } -func (l *ldb) SpendOutputsByTxId(ctx context.Context, txId tbcd.TxId) ([]tbcd.SpendInfo, error) { - log.Tracef("SpendOutputByOutpoint") - defer log.Tracef("SpendOutputByOutpoint exit") +func (l *ldb) SpentOutputsByTxId(ctx context.Context, txId []byte) ([]tbcd.SpentInfo, error) { + log.Tracef("SpentOutputByOutpoint") + defer log.Tracef("SpentOutputByOutpoint exit") - si := make([]tbcd.SpendInfo, 0, 2) + si := make([]tbcd.SpentInfo, 0, 2) txDB := l.pool[level.TransactionsDB] var key [1 + 32]byte key[0] = 's' @@ -765,10 +767,10 @@ func (l *ldb) SpendOutputsByTxId(ctx context.Context, txId tbcd.TxId) ([]tbcd.Sp it := txDB.NewIterator(&util.Range{Start: key[:]}, nil) defer it.Release() for it.Next() { - if !bytes.Equal(it.Key()[1:33], key[1:33]) { - break + if !bytes.Equal(it.Key()[:33], key[:]) { + continue } - var s tbcd.SpendInfo + var s tbcd.SpentInfo copy(s.TxId[:], it.Value()[0:32]) copy(s.BlockHash[:], it.Key()[37:]) s.InputIndex = binary.BigEndian.Uint32(it.Value()[32:36]) @@ -778,8 +780,7 @@ func (l *ldb) SpendOutputsByTxId(ctx context.Context, txId tbcd.TxId) ([]tbcd.Sp return nil, fmt.Errorf("blocks by id iterator: %w", err) } if len(si) == 0 { - ch, _ := chainhash.NewHash(txId[:]) - return nil, database.NotFoundError(fmt.Sprintf("not found %v", ch)) + return nil, database.NotFoundError(fmt.Sprintf("not found %v", txId)) } return si, nil @@ -856,10 +857,14 @@ func (l *ldb) UtxosByScriptHash(ctx context.Context, sh tbcd.ScriptHash, start u return utxos, nil } -func (l *ldb) BlockUtxoUpdate(ctx context.Context, utxos map[tbcd.Outpoint]tbcd.CacheOutput) error { +func (l *ldb) BlockUtxoUpdate(ctx context.Context, direction int, utxos map[tbcd.Outpoint]tbcd.CacheOutput) error { log.Tracef("BlockUtxoUpdate") defer log.Tracef("BlockUtxoUpdate exit") + if 
!(direction == 1 || direction == -1) { + return fmt.Errorf("invalid direction: %v", direction) + } + // outputs outsTx, outsCommit, outsDiscard, err := l.startTransaction(level.OutputsDB) if err != nil { @@ -877,6 +882,8 @@ func (l *ldb) BlockUtxoUpdate(ctx context.Context, utxos map[tbcd.Outpoint]tbcd. copy(hop[33:65], op.TxId()) copy(hop[65:], utxo.OutputIndexBytes()) + // The cache is updated in a way that makes the direction + // irrelevant. if utxo.IsDelete() { // Delete balance and utxos outsBatch.Delete(op[:][:]) @@ -886,6 +893,7 @@ func (l *ldb) BlockUtxoUpdate(ctx context.Context, utxos map[tbcd.Outpoint]tbcd. outsBatch.Put(op[:], utxo.ScriptHashSlice()) outsBatch.Put(hop[:], utxo.ValueBytes()) } + // XXX this probably should be done by the caller but we do it // here to lower memory pressure as large gobs of data are // written to disk. @@ -905,10 +913,14 @@ func (l *ldb) BlockUtxoUpdate(ctx context.Context, utxos map[tbcd.Outpoint]tbcd. return nil } -func (l *ldb) BlockTxUpdate(ctx context.Context, txs map[tbcd.TxKey]*tbcd.TxValue) error { +func (l *ldb) BlockTxUpdate(ctx context.Context, direction int, txs map[tbcd.TxKey]*tbcd.TxValue) error { log.Tracef("BlockTxUpdate") defer log.Tracef("BlockTxUpdate exit") + if !(direction == 1 || direction == -1) { + return fmt.Errorf("invalid direction: %v", direction) + } + // transactions txsTx, txsCommit, txsDiscard, err := l.startTransaction(level.TransactionsDB) if err != nil { @@ -931,12 +943,16 @@ func (l *ldb) BlockTxUpdate(ctx context.Context, txs map[tbcd.TxKey]*tbcd.TxValu default: return fmt.Errorf("invalid cache entry: %v", spew.Sdump(k)) } + switch direction { + case -1: + txsBatch.Delete(key) + case 1: + txsBatch.Put(key, value) + } - txsBatch.Put(key, value) - // log.Infof("%v:%v", spew.Sdump(key), spew.Sdump(value)) - // // XXX this probably should be done by the caller but we do it - // // here to lower memory pressure as large gobs of data are - // // written to disk. 
+ // XXX this probably should be done by the caller but we do it + // here to lower memory pressure as large gobs of data are + // written to disk. delete(txs, k) } diff --git a/database/tbcd/level/level_test.go b/database/tbcd/level/level_test.go deleted file mode 100644 index a71ab72a5..000000000 --- a/database/tbcd/level/level_test.go +++ /dev/null @@ -1,420 +0,0 @@ -// Copyright (c) 2024 Hemi Labs, Inc. -// Use of this source code is governed by the MIT License, -// which can be found in the LICENSE file. - -package level - -import ( - "bytes" - "crypto/rand" - "encoding/binary" - "fmt" - "io" - "math/big" - "sort" - "testing" - - "github.com/btcsuite/btcd/chaincfg" - "github.com/btcsuite/btcd/chaincfg/chainhash" - "github.com/btcsuite/btcd/wire" - "github.com/davecgh/go-spew/spew" - "github.com/go-test/deep" - - "github.com/hemilabs/heminetwork/database/tbcd" -) - -func bytes2Block(block []byte) (*wire.MsgBlock, error) { - var b wire.MsgBlock - err := b.Deserialize(bytes.NewReader(block)) - if err != nil { - return nil, fmt.Errorf("deserialize msg block: %w", err) - } - return &b, nil -} - -func bytes2Header(header []byte) (*wire.BlockHeader, error) { - var bh wire.BlockHeader - err := bh.Deserialize(bytes.NewReader(header)) - if err != nil { - return nil, fmt.Errorf("deserialize block header: %w", err) - } - return &bh, nil -} - -func h2b(wbh *wire.BlockHeader) []byte { - hb, err := header2Bytes(wbh) - if err != nil { - panic(err) - } - return hb -} - -func header2Bytes(wbh *wire.BlockHeader) ([]byte, error) { - var b bytes.Buffer - err := wbh.Serialize(&b) - if err != nil { - return nil, err - } - return b.Bytes(), nil -} - -// random returns a variable number of random bytes. 
-func random(n int) []byte { - buffer := make([]byte, n) - _, err := io.ReadFull(rand.Reader, buffer) - if err != nil { - panic(err) - } - return buffer -} - -func TestEncodeDecodeBlockHeader(t *testing.T) { - cp := chaincfg.TestNet3Params - genesisBH := cp.GenesisBlock.Header - genesisHash := cp.GenesisHash - - randDiff := random(32) - difficulty := new(big.Int).SetBytes(randDiff) - - bh := tbcd.BlockHeader{ - Hash: genesisHash[:], - Height: 0x1122334455667788, // we need not zero to test decoding of height - Header: h2b(&genesisBH), - Difficulty: *difficulty, - } - er := encodeBlockHeader(bh.Height, [80]byte(h2b(&genesisBH)), &bh.Difficulty) - dr := decodeBlockHeader(er[:]) - if diff := deep.Equal(bh, *dr); len(diff) > 0 { - t.Fatalf("unexpected diff: %v%v", spew.Sdump(bh), spew.Sdump(dr)) - t.Errorf("unexpected diff: %s", diff) - } -} - -func TestKey(t *testing.T) { - height := uint64(0xffeeddcc11223344) - hv := []byte{1, 3, 3, 7} - hash := chainhash.DoubleHashH(hv) - key := heightHashToKey(height, hash[:]) - - heightO, hashO := keyToHeightHash(key) - if height != heightO { - t.Fatalf("invalid height wanted %v got %v", height, heightO) - } - if !bytes.Equal(hash[:], hashO) { - t.Fatalf("invalid hash wanted %v got %v", - spew.Sdump(hash), spew.Sdump(hashO)) - } - - t.Logf("height %x", height) - t.Logf("key %v", spew.Sdump(key)) - t.Logf("%v%v", spew.Sdump(hash[:]), spew.Sdump(hashO)) -} - -type ByteSlice [][]byte - -func (x ByteSlice) Len() int { return len(x) } -func (x ByteSlice) Less(i, j int) bool { return bytes.Compare(x[i], x[j]) == -1 } -func (x ByteSlice) Swap(i, j int) { x[i], x[j] = x[j], x[i] } - -func TestKeyOrder(t *testing.T) { - // Create slice in reverse order - count := uint64(10) - keys := make(ByteSlice, count) - for i := range count { - b := make([]byte, 8) - binary.BigEndian.PutUint64(b, i) - hash := chainhash.DoubleHashH(b) - keys[count-1-i] = heightHashToKey(i, hash[:]) - } - log.Debugf("%v", spew.Sdump(keys)) - - // Now sort - 
sort.Sort(keys) - log.Debugf("%v", spew.Sdump(keys)) - - for i := range count { - height, hash := keyToHeightHash(keys[i]) - if i != height { - t.Fatalf("invalid height wanted %v got %v", i, height) - } - - b := make([]byte, 8) - binary.BigEndian.PutUint64(b, i) - expectedHash := chainhash.DoubleHashH(b) - if !bytes.Equal(expectedHash[:], hash) { - t.Fatalf("invalid hash wanted %x got %x", expectedHash, hash) - } - } -} - -//func TestLevelDB(t *testing.T) { -// // Missing blocks -// // 1 000 000 000 -// -// loggo.ConfigureLoggers("INFO") -// -// dir, err := os.MkdirTemp("", "leveldbtest") -// if err != nil { -// t.Fatal(err) -// } -// defer os.RemoveAll(dir) -// -// ctx := context.Background() -// ldb, err := New(ctx, dir) -// if err != nil { -// t.Fatal(err) -// } -// defer func() { -// err := ldb.Close() -// if err != nil { -// t.Fatalf("close: %v", err) -// } -// }() -// -// // Create fake blockchain somewhat resembling tbc calls -// -// // Insert genesis -// cp := &chaincfg.TestNet3Params -// gbh, err := header2Bytes(&cp.GenesisBlock.Header) -// if err != nil { -// t.Fatal(err) -// } -// -// // Insert genesis -// _, err = ldb.BlockHeadersInsert(ctx, [][80]byte{ -// h2b80(&cp.GenesisBlock.Header), -// }, -// ) -// if err != nil { -// t.Fatalf("block headers insert: %v", err) -// } -// -// missing, err := ldb.BlocksMissing(ctx, 16) -// if err != nil { -// t.Fatalf("block headers missing: %v", err) -// } -// -// if len(missing) != 0 { -// t.Fatal("genesis should not be returned") -// } -// -// // Insert fake block headers -// count := uint64(64) -// bhs := make([][80]byte, 0, count+1) -// bhs = append(bhs, h2b80(&cp.GenesisBlock.Header)) // need genesis for prevhash -// for i := uint64(1); i < count; i++ { -// bits := uint32(i + 4567) -// nonce := uint32(i + 1337) -// // XXX decode and get hash from previous block -// prevHash, err := chainhash.NewHash(bhs[i-1].Hash[:]) -// if err != nil { -// t.Fatalf("prevhash %v", err) -// } -// merkleHash := 
chainhash.DoubleHashH(prevHash[:]) -// wbh := wire.NewBlockHeader(1, prevHash, &merkleHash, bits, nonce) -// blockHash := wbh.BlockHash() -// t.Logf("height %v prev %v", i, prevHash) -// bhs = append(bhs, h2b80(wbh)) -// //bhs = append(bhs, tbcd.BlockHeader{ -// // Height: i, -// // Hash: database.ByteArray(blockHash[:]), -// // Header: h2b(wbh), -// // // XXX set cumulative difficulty to verify -// //}) -// } -// t.Logf("%v", spew.Sdump(bhs)) -// // Insert missing blocks -// _, err = ldb.BlockHeadersInsert(ctx, bhs[1:]) // skip genesis insert -// if err != nil { -// t.Fatalf("block headers insert: %v", err) -// } -// -// expectedMissingBH := 16 -// missing, err = ldb.BlocksMissing(ctx, expectedMissingBH) -// if err != nil { -// t.Fatalf("block headers missing: %v", err) -// } -// t.Logf("%v", spew.Sdump(missing)) -// -// if len(missing) != min(expectedMissingBH, int(count-1)) { -// t.Fatalf("%v %v %v", len(missing), expectedMissingBH, count) -// } -// -// // Start at height 1 -// height := uint64(1) -// for k := range missing { -// if height != bhs[height].Height { -// t.Fatalf("unexpected internal height wanted %v got %v", -// height, bhs[height].Height) -// } -// if bhs[height].Height != missing[k].Height { -// t.Fatalf("unexpected missing height wanted %v got %v", -// bhs[height].Height, missing[k].Height) -// } -// if !bytes.Equal(bhs[height].Hash, missing[k].Hash) { -// t.Fatalf("unexpected missing hash wanted %v got %v", -// bhs[height].Hash, missing[k].Hash) -// } -// -// height++ -// } -// -// // Insert missing blocks -// for i := uint64(1); i < count; i++ { -// b := tbcd.Block{ -// Hash: bhs[i].Hash, -// Block: []byte{'i', 'a', 'm', 'b', 'l', 'o', 'c', 'k'}, -// } -// insertedHeight, err := ldb.BlockInsert(ctx, &b) -// if err != nil { -// t.Fatal(err) -// } -// log.Infof("inserted height: %v", insertedHeight) -// } -// -// // Ensure blocks missing table is updated -// missing, err = ldb.BlocksMissing(ctx, expectedMissingBH) -// if err != nil { -// 
t.Fatalf("block headers missing: %v", err) -// } -// if len(missing) != 0 { -// t.Fatalf("expected missing table to be empty: %v", spew.Sdump(missing)) -// } -// if len(ldb.blocksMissingCache) != 0 { -// t.Fatalf("expected missing blocks cache to be empty: %v", -// spew.Sdump(ldb.blocksMissingCache)) -// } -//} - -// func TestBitcoinBits(t *testing.T) { -// // Decode block -// block381 := `01000000c5b9489065fa7e1ac4facc51a5a0ccc2111911609f43386ebe7ca1d200000000a0db3bbb22a2a8441d84dbe335c24959ea3d3d6e91bf67e66bbcb0d7e0a9c4836a834a4dffff001d041813660201000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0e046a834a4d017e062f503253482fffffffff0100f2052a01000000232103dac3fb8de40965f42fb4afb3baa07d3304bc2aa28cfc25f12b52f1523681451dac00000000010000001518b50db063333a3261b9b41e887b4aa5b69becdc9967550507c120e22a764967000000004a493046022100e49de3c89180769db346145cdda48323ddecc2af0041293432528767b18407650221009f7878deb054e4f9c0e6aecbe6de15f5d829041c11f7952d33e96c76ada1258b01ffffffff322948a4806acfeca2b32248d0e183c8eb09d5e5ef48adf33777307635414cc0000000004a493046022100ba88d34e4d4fd85ab5e4d77cb74f71c87a24235bcbe39cf4334633f70ff27233022100b5aa1b96bab59457d3d837473de1e4f9f89ba3ee39964463952271c5b4140fa001ffffffffcf330295467623ec1378dc6fa312103ad8a210b3e1351f2f4b6a57ac43fcd472000000004a493046022100b21560dfda52352c4416c1e48496659ea3d29e4e25706a991986864210bc759e0221009c1e45af6e2eba0883a862442d85a2b48c3395e35a4276f535cd70d45a971c7401ffffffffeeed0f4d975db8f66788f809ecf8c351d19ff5805276ef31983bc5682548342d0000000049483045022100e02cc0b4bf8a126807b1577819944c1bb13e8f4028cf7df0a0729013d511b071022010a1bcdefca334588939f9fe40e0d8607588191684fce0f46180a139305b8b4001ffffffffc8ac0a2fb1c01e0e0a5339d296eb072b2b9f9cb1d410a1fdd69a2c797094dda50000000049483045022016ba8f50d7f30be7e4a68c3d50368d577e2ef6c8b60842725ae636b2985776fc022100bb39d47d1955ffca47920d743bcd6f05b31ea2bf3dc7ede225eb4c901126b48901fffffffff1b03cf0680b9ef33fd311f6bbc6db3f1c164f9341f48a02df1905cec4ce241b0000000
04948304502201dbbfabc47f6da84ceedbc92b792d4a8ef632f0bddf7ebfad5ca21f3731f450502210098751ccf37fd97ff82446486d4c1d62860c2080a1128ea5ddb0d30bfde3cd7a801ffffffff1fe6898ac074a680fe7458ff87a03956db73a880d2ace6539efcc43002bd97ed000000004a493046022100f8a83fadb06af9c0cc730f17ae47fe7a09cada9eae623b8dd86bf365ef0e20480221009a10b073b2a8b313d975f801213efdf12b94141d7b6a8e98de3b0c67ee1cef4c01ffffffff6fd85c0213cfe9863573596a4d5f1509ac41a91b572e6c1bdafe46d9249a5fa4000000004a493046022100f3e98f3e76cc0f533b0e1cccd82650b704e31e3e7e62bf81bb474cf2add58ebf022100f77003eec814a3336cc305b8461cf3ccb19b1f18f06f66208ed31c3e468466ed01ffffffff9e93a056a6515e7916fc049578708d188c2146d3c12638acac92e0b72e076edd000000004a4930460221008ee8d7348aed82a8d074753ab4c8dbdd28a668da821269c4cd0c5c253738cab7022100b06a0208d60af1be6303dd883fd05f964a42f7de317761641ec1158944f52b6b01ffffffff0ecc7b73d8cd5d377d74d836bb6e3473478554a923154286ddaf6985948fd9d300000000494830450221008159ed783bc717ff5a6502cd87a8d8144fae74c6fc6943a5a38da7170203cb3802207e31577a576bc01510cb2280f918a371f63eee44cd2b4490c0994d261787916e01ffffffff78966e9f0a2d4452ab2418249fa6fb1a325a04f039d015899141a82aa5a6c05c000000004847304402206655b13198e413ac8f1aa8926d4617560758cf8b5045afdfc9116da0873ed89802205db55cf3f398467bfc6997f68c881e5f2a7225293ebbd2af40d15df6de4ef87701ffffffff69f2096bbede7015fee2fb307f7d7dd084641b7f4af5c3074dc7b2b6df03277c000000004a493046022100c9199296673a1beae598a6d2348ef13ad1b9f15eebaa825d2282adf017cbb5f0022100b54934e40ff0194a53dcaa9d017c36a93dbb53aa45fe21ab93b07fbb58570d5501ffffffff3c11b146d43fd62ec36b733942a52ba0c352c95a3f078808a38d080898cb83300000000048473044022004c64773b9e6a17cfca7ff583be650104c0538940289b2da8f8bebbd32e486b302200174d8f0938a0f9eeab4c4b137581e032f06d4740e3b0ad9d0423a0a8de65af101ffffffff59ac3c37adfa89b9a907ef9d485c57262e9283e1eb96069c2de04369ef1b3c7600000000494830450220306f3ac72de9dbeb1ec139e4e89cc3b3b9bcb63747bf0e165fcfc773f3669832022100c00a16800f16bf1c71ac6c2989b42d974b0ec2f3e3671325fb2cae52a1c569d801ffffffffb4bbecee818dd98
6e5ab82f36dbd5ccc29ab134614e304c0a397e14082fe7bb7000000004a493046022100ed68e0303052b41ffd80c1e905cee5547e92422d43b73e473a615e4a47146bb5022100ecab3f92c62477350753b4efea19d608fcce15b1b2c38fbe905e9d1f9ad7631f01ffffffff7546bbac9ae1c8980da6e8c154b368eb4df305b6f3f27ff38f195a13c9ee0484000000004948304502202288566af2b68b6982d1244e293ea3d7c156a425329b7f61b272e4deec317bea022100d9739976b442d35c32830cb2c105e0d7275f7efaa99eaeea4b24a553267a31fc01ffffffffd15854d1e5ba349daf72089f470b24557a2be25105b7831a3f18a62fb8bab677000000004948304502206e3a23075e0248ea8cabc7c875b4cfd9f036c1c4f358a00ec152fc96d1cb6cf8022100d34c018815f63c65f5364061369382b31d579cd6d8a4afe9ec1f03ba66d7717801ffffffffdf686a7f31c2c1de6a608553b26d6336434719fa45428eb3df59bbef75ce9e7e000000004948304502200a22a24a8f817a2f24d3f8c2670f3cb25cd389ce25e0d45eeb0aea08563c5c9802210081ff14edb230a44e5b52e35f573676096a937fc27cc830b153b229b92cac75c101ffffffffd226fea91b99c5a31a034d340f647b722e50950c96a876eb96569efaeaf3b227000000004a4930460221009684e60a7fd61362d0dad79858044aa4a7b878b3f0bd432e384fe4c7e6c90bde0221009883e4f739cffe574bac5bed0a4e69708433973a2490d9415d303614fc31be4701fffffffff640c60ea438dc020048599869836f5323ef47477ee17caddf076ed428898f7100000000494830450220028eb7617dc161a282512c81975d41a1594c05f34cb26fb759682bf784da7071022100a0913abea7229b3c465a4fa32dc861f72ef684e8dd3f19aac5f0f74ea39c03cf01ffffffffd59d2a49b1883c6f7ac68a9d2649dc0dde3f0205e19d8fdaf8065381f9ba61cc000000004a4930460221009f5b27dfd397423a04cab52ee6e8215e290e9666309f0f59f5bc5f6c207d3639022100f5a79133db2cc786140aeee0bf7c8a81adca6071928e8210f1c9f0c653e2f04201ffffffff0240195e29010000001976a914944a7d4b3a8d3a5ecf19dfdfd8dcc18c6f1487dd88acc0c01e49170000001976a91432040178c5cf81cb200ab99af1131f187745b51588ac00000000` -// -// bb, err := hex.DecodeString(block381) -// if err != nil { -// t.Fatal(err) -// } -// // decode -// b, err := btcutil.NewBlockFromBytes(bb) -// if err != nil { -// t.Fatal(err) -// } -// txs := b.Transactions() -// chainParams := &chaincfg.TestNet3Params 
-// for k := range txs { -// tx := txs[k] -// t.Logf("tx %v %v", tx.Index(), tx.Hash()) -// if blockchain.IsCoinBase(tx) { -// t.Logf("coinbase! %v", spew.Sdump(tx.MsgTx())) -// } -// for kk := range tx.MsgTx().TxOut { -// scriptClass, _, _, err := txscript.ExtractPkScriptAddrs( -// tx.MsgTx().TxOut[kk].PkScript, &chaincfg.TestNet3Params, -// ) -// t.Logf("---- %v", spew.Sdump(scriptClass)) -// -// p, err := txscript.ParsePkScript(tx.MsgTx().TxOut[kk].PkScript) -// if err != nil { -// t.Logf("ERROR: %v %v", kk, err) -// continue -// } else { -// t.Logf("tx %v", spew.Sdump(p)) -// } -// a, err := p.Address(chainParams) -// if err != nil { -// t.Logf("ERROR address: %v %v", kk, err) -// } else { -// t.Logf("tx address %v", spew.Sdump(a)) -// } -// } -// } -// } -// -// func TestDumpIndex(t *testing.T) { -// levelDBHome := "~/.tbcd" -// network := "testnet3" -// -// ctx, cancel := context.WithCancel(context.Background()) -// defer cancel() -// -// // Open existing DB -// db, err := New(ctx, filepath.Join(levelDBHome, network)) -// if err != nil { -// t.Fatal(err) -// } -// defer func() { -// err := db.Close() -// if err != nil { -// t.Fatalf("close: %v", err) -// } -// }() -// -// outsDB := db.pool[level.OutputsDB] -// it := outsDB.NewIterator(nil, nil) -// defer it.Release() -// for it.Next() { -// t.Logf("outputs key %vvalue %v", spew.Sdump(it.Key()), spew.Sdump(it.Value())) -// } -// -// bsDB := db.pool[level.BalancesDB] -// bsIt := bsDB.NewIterator(&util.Range{Start: nil, Limit: nil}, nil) -// defer bsIt.Release() -// for bsIt.Next() { -// t.Logf("balances key %vvalue %v", spew.Sdump(bsIt.Key()), spew.Sdump(bsIt.Value())) -// } -// } -// -// func TestIndex(t *testing.T) { -// // t.Skip() -// -// // start block -// levelDBHome := "~/.tbcd" -// network := "testnet3" -// -// ctx, cancel := context.WithCancel(context.Background()) -// defer cancel() -// -// // Open existing DB -// db, err := New(ctx, filepath.Join(levelDBHome, network)) -// if err != nil { -// 
t.Fatal(err) -// } -// defer func() { -// err := db.Close() -// if err != nil { -// t.Fatalf("close: %v", err) -// } -// }() -// -// startHeight := uint64(0) -// count := uint64(10) // block 381 is the first to spend transactions -// start := time.Now() -// log.Infof("Starting to index to height %v at %v", startHeight, start) -// elapsed := time.Now() -// for height := startHeight; height < startHeight+count; height++ { -// bhs, err := db.BlockHeadersByHeight(ctx, height) -// if err != nil { -// t.Fatalf("block headers by height %v: %v", height, err) -// } -// t.Logf("%v", bhs) -// _ = elapsed -// //b, err := db.BlockByHash(ctx, bhs[0].Hash) -// //if err != nil { -// // t.Fatalf("block by hash %v: %v", height, err) -// //} -// //bh, btxs, err := tbcd.BlockTxs(&chaincfg.TestNet3Params, b.Block) -// //if err != nil { -// // t.Fatalf("block transactions %v: %v", height, err) -// //} -// //err = db.BlockTxUpdate(ctx, bh[:], btxs) -// //if err != nil { -// // // t.Fatalf("%v", spew.Sdump(btxs)) -// // t.Fatalf("block utxos %v: %v", height, err) -// //} -// //if height%1000 == 0 { -// // log.Infof("height %v %v", height, time.Now().Sub(elapsed)) -// // elapsed = time.Now() -// //} -// } -// log.Infof("Ending index height %v took %v", count, time.Now().Sub(start)) -// } diff --git a/service/tbc/crawler.go b/service/tbc/crawler.go index 544c1b859..f9caaa684 100644 --- a/service/tbc/crawler.go +++ b/service/tbc/crawler.go @@ -13,8 +13,8 @@ import ( "sync" "time" + "github.com/btcsuite/btcd/blockchain" "github.com/btcsuite/btcd/btcutil" - "github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/txscript" "github.com/davecgh/go-spew/spew" @@ -27,6 +27,8 @@ import ( var ( UtxoIndexHashKey = []byte("utxoindexhash") // last indexed utxo hash TxIndexHashKey = []byte("txindexhash") // last indexed tx hash + + ErrNotLinear = errors.New("not linear") // not a valid chain ) type HashHeight struct { @@ -66,6 +68,43 @@ func (s *Server) 
TxIndexHash(ctx context.Context) (*HashHeight, error) { return s.mdHashHeight(ctx, TxIndexHashKey) } +// findCanonicalHash determines which hash is on the canonical chain by walking +// back the chain from the provided end point. It returns the index in bhs of +// the correct hash. On failure it returns -1 DELIBERATELY to crash the caller +// if error is not checked. +func (s *Server) findCanonicalHash(ctx context.Context, endHash *chainhash.Hash, bhs []tbcd.BlockHeader) (int, error) { + switch len(bhs) { + case 1: + return 0, nil // most common fast path + case 0: + return -1, errors.New("no blockheaders provided") + } + + // XXX make sure endHash has higher cumulative difficulty + + // When this happens we have to walk back from endHash to find the + // connecting block. There is no shortcut possible without hitting edge + // conditions. + for k, v := range bhs { + h := endHash + for { + bh, err := s.db.BlockHeaderByHash(ctx, h[:]) + if err != nil { + return -1, fmt.Errorf("block header by hash: %w", err) + } + if h.IsEqual(v.BlockHash()) { + return k, nil + } + if h.IsEqual(s.chainParams.GenesisHash) { + break + } + h = bh.ParentHash() + } + } + + return -1, errors.New("path not found") +} + func logMemStats() { var mem runtime.MemStats runtime.ReadMemStats(&mem) @@ -82,17 +121,16 @@ func logMemStats() { mem.NumGC) } -func processUtxos(cp *chaincfg.Params, txs []*btcutil.Tx, utxos map[tbcd.Outpoint]tbcd.CacheOutput) error { - for idx, tx := range txs { +func processUtxos(txs []*btcutil.Tx, utxos map[tbcd.Outpoint]tbcd.CacheOutput) error { + for _, tx := range txs { for _, txIn := range tx.MsgTx().TxIn { - if idx == 0 { + if blockchain.IsCoinBase(tx) { // Skip coinbase inputs - continue + break } op := tbcd.NewOutpoint(txIn.PreviousOutPoint.Hash, txIn.PreviousOutPoint.Index) if utxo, ok := utxos[op]; ok && !utxo.IsDelete() { - // log.Infof("deleting utxo %s value %d", hex.EncodeToString(utxo.ScriptHashSlice()), utxo.Value()) delete(utxos, op) continue } @@ 
-101,17 +139,103 @@ func processUtxos(cp *chaincfg.Params, txs []*btcutil.Tx, utxos map[tbcd.Outpoin if txscript.IsUnspendable(txOut.PkScript) { continue } - - // scriptHash := sha256.Sum256(txOut.PkScript) - // log.Infof("adding utxo to script hash %s value %d", hex.EncodeToString(scriptHash[:]), uint64(txOut.Value)) - utxos[tbcd.NewOutpoint(*tx.Hash(), uint32(outIndex))] = tbcd.NewCacheOutput( sha256.Sum256(txOut.PkScript), uint64(txOut.Value), uint32(outIndex)) } } - // log.Infof("%v", spew.Sdump(utxos)) + return nil +} + +func (s *Server) scriptValue(ctx context.Context, op tbcd.Outpoint) ([]byte, int64, error) { + txId := op.TxId() + txIndex := op.TxIndex() + opHash, err := chainhash.NewHash(txId) + if err != nil { + return nil, 0, fmt.Errorf("new hash: %w", err) + } + + // Find block hashes + blockHashes, err := s.db.BlocksByTxId(ctx, txId[:]) + if err != nil { + return nil, 0, fmt.Errorf("blocks by txid: %w", err) + } + // Note that we may have more than one block hash however since the + // TxID is generated from the actual Tx the script hash and value + // should be identical and thus we can return the values from the first + // block found. 
+ if len(blockHashes) == 0 { + return nil, 0, errors.New("script value: no block hashes") + } + blk, err := s.db.BlockByHash(ctx, blockHashes[0][:]) + if err != nil { + return nil, 0, fmt.Errorf("block by hash: %w", err) + } + b, err := btcutil.NewBlockFromBytes(blk.Block) + if err != nil { + return nil, 0, fmt.Errorf("new block: %w", err) + } + for _, tx := range b.Transactions() { + if !tx.Hash().IsEqual(opHash) { + continue + } + txOuts := tx.MsgTx().TxOut + if len(txOuts) < int(txIndex) { + return nil, 0, fmt.Errorf("tx index invalid: %v", op) + } + tx := txOuts[txIndex] + return tx.PkScript, tx.Value, nil + } + + return nil, 0, fmt.Errorf("tx id not found: %v", op) +} + +func (s *Server) unprocessUtxos(ctx context.Context, txs []*btcutil.Tx, utxos map[tbcd.Outpoint]tbcd.CacheOutput) error { + // Walk backwards through the txs + for idx := len(txs) - 1; idx >= 0; idx-- { + tx := txs[idx] + // TxIn get data from disk and insert into the cache as insert + for _, txIn := range tx.MsgTx().TxIn { + if blockchain.IsCoinBase(tx) { + // Skip coinbase inputs + break + } + + op := tbcd.NewOutpoint(txIn.PreviousOutPoint.Hash, + txIn.PreviousOutPoint.Index) + pkScript, value, err := s.scriptValue(ctx, op) + if err != nil { + return fmt.Errorf("script value: %v", err) + } + // XXX this should not happen. We are keeping it for + // now to ensure it indeed does not happen. Remove in a + // couple of years. + if _, ok := utxos[op]; ok { + return fmt.Errorf("impossible collision: %v", op) + } + utxos[op] = tbcd.NewCacheOutput(sha256.Sum256(pkScript), + uint64(value), txIn.PreviousOutPoint.Index) + } + + // TxOut if those are in the cache delete from cache; if they + // are not in the cache insert "delete from disk command" into + // cache. 
+ for outIndex, txOut := range tx.MsgTx().TxOut { + if txscript.IsUnspendable(txOut.PkScript) { + continue + } + + op := tbcd.NewOutpoint(*tx.Hash(), uint32(outIndex)) + if _, ok := utxos[op]; ok { + delete(utxos, op) + } else { + utxos[op] = tbcd.NewDeleteCacheOutput(sha256.Sum256(txOut.PkScript), + op.TxIndex()) + } + } + } + return nil } @@ -134,13 +258,13 @@ func (s *Server) fetchOP(ctx context.Context, w *sync.WaitGroup, op tbcd.Outpoin func (s *Server) fixupCache(ctx context.Context, b *btcutil.Block, utxos map[tbcd.Outpoint]tbcd.CacheOutput) error { w := new(sync.WaitGroup) - txs := b.Transactions() - for idx, tx := range txs { + for _, tx := range b.Transactions() { for _, txIn := range tx.MsgTx().TxIn { - if idx == 0 { + if blockchain.IsCoinBase(tx) { // Skip coinbase inputs - continue + break } + op := tbcd.NewOutpoint(txIn.PreviousOutPoint.Hash, txIn.PreviousOutPoint.Index) s.mtx.Lock() @@ -214,7 +338,7 @@ func (s *Server) indexUtxosInBlocks(ctx context.Context, endHash *chainhash.Hash // At this point we can lockless since it is all single // threaded again. 
// log.Infof("processing utxo at height %d", height) - err = processUtxos(s.chainParams, b.Transactions(), utxos) + err = processUtxos(b.Transactions(), utxos) if err != nil { return 0, last, fmt.Errorf("process utxos %v: %w", hh, err) } @@ -251,56 +375,178 @@ func (s *Server) indexUtxosInBlocks(ctx context.Context, endHash *chainhash.Hash return 0, last, fmt.Errorf("block headers by height %v: %w", height, err) } - if len(bhs) > 1 { - panic("FIXME handle multiple block headers") // XXX: Handle correctly + index, err := s.findCanonicalHash(ctx, endHash, bhs) + if err != nil { + return 0, last, fmt.Errorf("could not determine canonical path %v: %w", + height, err) } // Verify it connects to parent - if !hash.IsEqual(bhs[0].ParentHash()) { + if !hash.IsEqual(bhs[index].ParentHash()) { return 0, last, fmt.Errorf("%v does not connect to: %v", - bhs[0], hash) + bhs[index], hash) } - hh.Hash = *bhs[0].BlockHash() - hh.Height = bhs[0].Height + hh.Hash = *bhs[index].BlockHash() + hh.Height = bhs[index].Height } return blocksProcessed, last, nil } -func (s *Server) UtxoIndexer(ctx context.Context, endHash *chainhash.Hash) error { - log.Tracef("UtxoIndexer") - defer log.Tracef("UtxoIndexer exit") +// unindexUtxosInBlocks unindexes utxos from the last processed block until the +// provided end hash, exclusive. It returns the number of blocks processed and +// the last hash it has processedd. +// Note that by walking backwards the terminal condition MUST BE EXCLUSIVE!! 
+func (s *Server) unindexUtxosInBlocks(ctx context.Context, endHash *chainhash.Hash, utxos map[tbcd.Outpoint]tbcd.CacheOutput) (int, *HashHeight, error) { + log.Tracef("unindexUtxoBlocks") + defer log.Tracef("unindexUtxoBlocks exit") - // Verify exit condition hash - if endHash == nil { - return errors.New("must provide an end hash") - } - _, endHeight, err := s.BlockHeaderByHash(ctx, endHash) - if err != nil { - return fmt.Errorf("blockheader hash: %w", err) - } + // indicates if we have processed endHash and thus have hit the exit + // condition. + var last *HashHeight - // Verify start point is not after the end point + // Find start hash utxoHH, err := s.UtxoIndexHash(ctx) if err != nil { if !errors.Is(err, database.ErrNotFound) { - return fmt.Errorf("utxo indexer : %w", err) + return 0, last, fmt.Errorf("utxo index hash: %w", err) } utxoHH = &HashHeight{ Hash: *s.chainParams.GenesisHash, Height: 0, } } - // XXX we need training wheels here. We can't blind accept the end - // without asserting if it is either ihigher in the chain or is a - // forced for. - // XXX check cumulative? check fork? 
+ + utxosPercentage := 95 // flush cache at >95% capacity + blocksProcessed := 0 + hh := utxoHH + for { + log.Debugf("unindexing utxos: %v", hh) + + hash := hh.Hash + bh, err := s.db.BlockHeaderByHash(ctx, hash[:]) + if err != nil { + return 0, last, fmt.Errorf("block header %v: %w", hash, err) + } + + // Exit if we processed the provided end hash + if endHash.IsEqual(&hash) { + last = hh + break + } + + // Index block + eb, err := s.db.BlockByHash(ctx, bh.Hash) + if err != nil { + return 0, last, fmt.Errorf("block by hash %v: %w", bh, err) + } + b, err := btcutil.NewBlockFromBytes(eb.Block) + if err != nil { + return 0, last, fmt.Errorf("could not decode block %v: %w", hh, err) + } + + err = s.unprocessUtxos(ctx, b.Transactions(), utxos) + if err != nil { + return 0, last, fmt.Errorf("process utxos %v: %w", hh, err) + } + + blocksProcessed++ + + // Try not to overshoot the cache to prevent costly allocations + cp := len(utxos) * 100 / s.cfg.MaxCachedTxs + if bh.Height%10000 == 0 || cp > utxosPercentage || blocksProcessed == 1 { + log.Infof("UTxo unindexer: %v utxo cache %v%%", hh, cp) + } + if cp > utxosPercentage { + // Set txsMax to the largest tx capacity seen + s.cfg.MaxCachedTxs = max(len(utxos), s.cfg.MaxCachedTxs) + last = hh + // Flush + break + } + + // Move to previous block + height := bh.Height - 1 + pbh, err := s.db.BlockHeaderByHash(ctx, bh.ParentHash()[:]) + if err != nil { + if errors.Is(err, database.ErrNotFound) { + log.Infof("No more blocks at: %v", height) + break + } + return 0, last, fmt.Errorf("block headers by height %v: %w", + height, err) + } + hh.Hash = *pbh.BlockHash() + hh.Height = pbh.Height + } + + return blocksProcessed, last, nil +} + +func (s *Server) UtxoIndexerUnwind(ctx context.Context, startBH, endBH *tbcd.BlockHeader) error { + log.Tracef("UtxoIndexerUnwind") + defer log.Tracef("UtxoIndexerUnwind exit") + + // XXX dedup with TxIndexedWind; it's basically the same code but with the direction, start anf endhas flipped // 
Allocate here so that we don't waste space when not indexing. utxos := make(map[tbcd.Outpoint]tbcd.CacheOutput, s.cfg.MaxCachedTxs) defer clear(utxos) - log.Infof("Start indexing UTxos at hash %v height %v", utxoHH.Hash, utxoHH.Height) - log.Infof("End indexing UTxos at hash %v height %v", endHash, endHeight) + log.Infof("Start unwinding UTxos at hash %v height %v", startBH, startBH.Height) + log.Infof("End unwinding UTxos at hash %v height %v", endBH, endBH.Height) + endHash := endBH.BlockHash() + for { + start := time.Now() + blocksProcessed, last, err := s.unindexUtxosInBlocks(ctx, endHash, utxos) + if err != nil { + return fmt.Errorf("unindex utxos in blocks: %w", err) + } + if blocksProcessed == 0 { + return nil + } + utxosCached := len(utxos) + log.Infof("UTxo unwinder blocks processed %v in %v transactions cached %v cache unused %v avg tx/blk %v", + blocksProcessed, time.Since(start), utxosCached, + s.cfg.MaxCachedTxs-utxosCached, utxosCached/blocksProcessed) + + // Flush to disk + start = time.Now() + if err = s.db.BlockUtxoUpdate(ctx, -1, utxos); err != nil { + return fmt.Errorf("block utxo update: %w", err) + } + // leveldb does all kinds of allocations, force GC to lower + // memory preassure. + logMemStats() + runtime.GC() + + log.Infof("Flushing unwind utxos complete %v took %v", + utxosCached, time.Since(start)) + + // Record height in metadata + err = s.db.MetadataPut(ctx, UtxoIndexHashKey, last.Hash[:]) + if err != nil { + return fmt.Errorf("metadata utxo hash: %w", err) + } + + if endHash.IsEqual(&last.Hash) { + break + } + } + + return nil +} + +func (s *Server) UtxoIndexerWind(ctx context.Context, startBH, endBH *tbcd.BlockHeader) error { + log.Tracef("UtxoIndexerWind") + defer log.Tracef("UtxoIndexerWind exit") + + // Allocate here so that we don't waste space when not indexing. 
+ utxos := make(map[tbcd.Outpoint]tbcd.CacheOutput, s.cfg.MaxCachedTxs) + defer clear(utxos) + + log.Infof("Start indexing UTxos at hash %v height %v", startBH, startBH.Height) + log.Infof("End indexing UTxos at hash %v height %v", endBH, endBH.Height) + endHash := endBH.BlockHash() for { start := time.Now() blocksProcessed, last, err := s.indexUtxosInBlocks(ctx, endHash, utxos) @@ -317,7 +563,7 @@ func (s *Server) UtxoIndexer(ctx context.Context, endHash *chainhash.Hash) error // Flush to disk start = time.Now() - if err = s.db.BlockUtxoUpdate(ctx, utxos); err != nil { + if err = s.db.BlockUtxoUpdate(ctx, 1, utxos); err != nil { return fmt.Errorf("block tx update: %w", err) } // leveldb does all kinds of allocations, force GC to lower @@ -342,12 +588,65 @@ func (s *Server) UtxoIndexer(ctx context.Context, endHash *chainhash.Hash) error return nil } -func processTxs(cp *chaincfg.Params, blockHash *chainhash.Hash, txs []*btcutil.Tx, txsCache map[tbcd.TxKey]*tbcd.TxValue) error { +func (s *Server) UtxoIndexer(ctx context.Context, endHash *chainhash.Hash) error { + log.Tracef("UtxoIndexer") + defer log.Tracef("UtxoIndexer exit") + + // XXX this is basically duplicate from UtxoIndexIsLinear + + // Verify exit condition hash + if endHash == nil { + return errors.New("must provide an end hash") + } + endBH, err := s.db.BlockHeaderByHash(ctx, endHash[:]) + if err != nil { + return fmt.Errorf("blockheader hash: %w", err) + } + + // Verify start point is not after the end point + utxoHH, err := s.UtxoIndexHash(ctx) + if err != nil { + if !errors.Is(err, database.ErrNotFound) { + return fmt.Errorf("utxo indexer : %w", err) + } + utxoHH = &HashHeight{ + Hash: *s.chainParams.GenesisHash, + Height: 0, + } + } + + // XXX make sure there is no gap between start and end or vice versa. 
+ startBH, err := s.db.BlockHeaderByHash(ctx, utxoHH.Hash[:]) + if err != nil { + return fmt.Errorf("blockheader hash: %w", err) + } + direction, err := s.UtxoIndexIsLinear(ctx, endHash) + if err != nil { + return fmt.Errorf("TxIndexIsLinear: %w", err) + } + switch direction { + case 1: + return s.UtxoIndexerWind(ctx, startBH, endBH) + case -1: + return s.UtxoIndexerUnwind(ctx, startBH, endBH) + case 0: + // Because we call TxIndexIsLinear we know it's the same block. + return nil + } + return fmt.Errorf("invalid direction: %v", direction) +} + +func processTxs(blockHash *chainhash.Hash, txs []*btcutil.Tx, txsCache map[tbcd.TxKey]*tbcd.TxValue) error { for _, tx := range txs { // cache txid <-> block txsCache[tbcd.NewTxMapping(tx.Hash(), blockHash)] = nil - // cache spent transactions + // Don't keep track of spent coinbase inputs + if blockchain.IsCoinBase(tx) { + // Skip coinbase inputs + continue + } + for txInIdx, txIn := range tx.MsgTx().TxIn { txk, txv := tbcd.NewTxSpent( blockHash, @@ -406,7 +705,7 @@ func (s *Server) indexTxsInBlocks(ctx context.Context, endHash *chainhash.Hash, return 0, last, fmt.Errorf("could not decode block %v: %w", hh, err) } - err = processTxs(s.chainParams, b.Hash(), b.Transactions(), txs) + err = processTxs(b.Hash(), b.Transactions(), txs) if err != nil { return 0, last, fmt.Errorf("process txs %v: %w", hh, err) } @@ -443,39 +742,39 @@ func (s *Server) indexTxsInBlocks(ctx context.Context, endHash *chainhash.Hash, return 0, last, fmt.Errorf("block headers by height %v: %w", height, err) } - if len(bhs) > 1 { - panic("FIXME handle multiple block headers") // XXX: Handle correctly + index, err := s.findCanonicalHash(ctx, endHash, bhs) + if err != nil { + return 0, last, fmt.Errorf("could not determine canonical path %v: %w", + height, err) } // Verify it connects to parent - if !hash.IsEqual(bhs[0].ParentHash()) { + if !hash.IsEqual(bhs[index].ParentHash()) { return 0, last, fmt.Errorf("%v does not connect to: %v", - bhs[0], hash) + 
bhs[index], hash) } - hh.Hash = *bhs[0].BlockHash() - hh.Height = bhs[0].Height + hh.Hash = *bhs[index].BlockHash() + hh.Height = bhs[index].Height } return blocksProcessed, last, nil } -func (s *Server) TxIndexer(ctx context.Context, endHash *chainhash.Hash) error { - log.Tracef("TxIndexer") - defer log.Tracef("TxIndexer exit") +// unindexTxsInBlocks indexes txs from the last processed block until the +// provided end hash, inclusive. It returns the number of blocks processed and +// the last hash it has processedd. +func (s *Server) unindexTxsInBlocks(ctx context.Context, endHash *chainhash.Hash, txs map[tbcd.TxKey]*tbcd.TxValue) (int, *HashHeight, error) { + log.Tracef("unindexTxsInBlocks") + defer log.Tracef("unindexTxsInBlocks exit") - // Verify exit condition hash - if endHash == nil { - return errors.New("must provide an end hash") - } - _, endHeight, err := s.BlockHeaderByHash(ctx, endHash) - if err != nil { - return fmt.Errorf("blockheader hash: %w", err) - } + // indicates if we have processed endHash and thus have hit the exit + // condition. + var last *HashHeight - // Verify start point is not after the end point + // Find start hash txHH, err := s.TxIndexHash(ctx) if err != nil { if !errors.Is(err, database.ErrNotFound) { - return fmt.Errorf("tx indexer : %w", err) + return 0, last, fmt.Errorf("tx index hash: %w", err) } txHH = &HashHeight{ Hash: *s.chainParams.GenesisHash, @@ -483,17 +782,138 @@ func (s *Server) TxIndexer(ctx context.Context, endHash *chainhash.Hash) error { } } - // XXX we need training wheels here. We can't blind accept the end - // without asserting if it is either ihigher in the chain or is a - // forced for. - // XXX check cumulative? check fork? 
+ txsPercentage := 95 // flush cache at >95% capacity + blocksProcessed := 0 + hh := txHH + for { + log.Debugf("unindexing txs: %v", hh) + + hash := hh.Hash + + // Exit if we processed the provided end hash + if endHash.IsEqual(&hash) { + last = hh + break + } + + bh, err := s.db.BlockHeaderByHash(ctx, hash[:]) + if err != nil { + return 0, last, fmt.Errorf("block header %v: %w", hash, err) + } + + // Index block + eb, err := s.db.BlockByHash(ctx, bh.Hash) + if err != nil { + return 0, last, fmt.Errorf("block by hash %v: %w", bh, err) + } + b, err := btcutil.NewBlockFromBytes(eb.Block) + if err != nil { + return 0, last, fmt.Errorf("could not decode block %v: %w", hh, err) + } + + err = processTxs(b.Hash(), b.Transactions(), txs) + if err != nil { + return 0, last, fmt.Errorf("process txs %v: %w", hh, err) + } + + blocksProcessed++ + + // Try not to overshoot the cache to prevent costly allocations + cp := len(txs) * 100 / s.cfg.MaxCachedTxs + if bh.Height%10000 == 0 || cp > txsPercentage || blocksProcessed == 1 { + log.Infof("Tx unindexer: %v tx cache %v%%", hh, cp) + } + if cp > txsPercentage { + // Set txsMax to the largest tx capacity seen + s.cfg.MaxCachedTxs = max(len(txs), s.cfg.MaxCachedTxs) + last = hh + // Flush + break + } + + // Move to previous block + height := bh.Height - 1 + pbh, err := s.db.BlockHeaderByHash(ctx, bh.ParentHash()[:]) + if err != nil { + if errors.Is(err, database.ErrNotFound) { + log.Infof("No more blocks at: %v", height) + break + } + return 0, last, fmt.Errorf("block headers by height %v: %w", + height, err) + } + hh.Hash = *pbh.BlockHash() + hh.Height = pbh.Height + } + + return blocksProcessed, last, nil +} + +func (s *Server) TxIndexerUnwind(ctx context.Context, startBH, endBH *tbcd.BlockHeader) error { + log.Tracef("TxIndexerUnwind") + defer log.Tracef("TxIndexerUnwind exit") + + // XXX dedup with TxIndexedWind; it's basically the same code but with the direction, start anf endhas flipped // Allocate here so that we don't 
waste space when not indexing. txs := make(map[tbcd.TxKey]*tbcd.TxValue, s.cfg.MaxCachedTxs) defer clear(txs) - log.Infof("Start indexing Txs at hash %v height %v", txHH.Hash, txHH.Height) - log.Infof("End indexing Txs at hash %v height %v", endHash, endHeight) + log.Infof("Start unwinding Txs at hash %v height %v", startBH, startBH.Height) + log.Infof("End unwinding Txs at hash %v height %v", endBH, endBH.Height) + endHash := endBH.BlockHash() + for { + start := time.Now() + blocksProcessed, last, err := s.unindexTxsInBlocks(ctx, endHash, txs) + if err != nil { + return fmt.Errorf("unindex txs in blocks: %w", err) + } + if blocksProcessed == 0 { + return nil + } + txsCached := len(txs) + log.Infof("Tx unwinder blocks processed %v in %v transactions cached %v cache unused %v avg tx/blk %v", + blocksProcessed, time.Since(start), txsCached, + s.cfg.MaxCachedTxs-txsCached, txsCached/blocksProcessed) + + // Flush to disk + start = time.Now() + if err = s.db.BlockTxUpdate(ctx, -1, txs); err != nil { + return fmt.Errorf("block tx update: %w", err) + } + // leveldb does all kinds of allocations, force GC to lower + // memory preassure. + logMemStats() + runtime.GC() + + log.Infof("Flushing unwind txs complete %v took %v", + txsCached, time.Since(start)) + + // Record height in metadata + err = s.db.MetadataPut(ctx, TxIndexHashKey, last.Hash[:]) + if err != nil { + return fmt.Errorf("metadata tx hash: %w", err) + } + + if endHash.IsEqual(&last.Hash) { + break + } + + } + return nil +} + +func (s *Server) TxIndexerWind(ctx context.Context, startBH, endBH *tbcd.BlockHeader) error { + log.Tracef("TxIndexerWind") + defer log.Tracef("TxIndexerWind exit") + + // Allocate here so that we don't waste space when not indexing. 
+ txs := make(map[tbcd.TxKey]*tbcd.TxValue, s.cfg.MaxCachedTxs) + defer clear(txs) + + log.Infof("Start indexing Txs at hash %v height %v", startBH, startBH.Height) + log.Infof("End indexing Txs at hash %v height %v", endBH, endBH.Height) + endHash := endBH.BlockHash() for { start := time.Now() blocksProcessed, last, err := s.indexTxsInBlocks(ctx, endHash, txs) @@ -510,7 +930,7 @@ func (s *Server) TxIndexer(ctx context.Context, endHash *chainhash.Hash) error { // Flush to disk start = time.Now() - if err = s.db.BlockTxUpdate(ctx, txs); err != nil { + if err = s.db.BlockTxUpdate(ctx, 1, txs); err != nil { return fmt.Errorf("block tx update: %w", err) } // leveldb does all kinds of allocations, force GC to lower @@ -536,8 +956,158 @@ func (s *Server) TxIndexer(ctx context.Context, endHash *chainhash.Hash) error { return nil } +func (s *Server) TxIndexer(ctx context.Context, endHash *chainhash.Hash) error { + log.Tracef("TxIndexer") + defer log.Tracef("TxIndexer exit") + + // XXX this is basically duplicate from TxIndexIsLinear + + // Verify exit condition hash + if endHash == nil { + return errors.New("must provide an end hash") + } + endBH, err := s.db.BlockHeaderByHash(ctx, endHash[:]) + if err != nil { + return fmt.Errorf("blockheader hash: %w", err) + } + + // Verify start point is not after the end point + txHH, err := s.TxIndexHash(ctx) + if err != nil { + if !errors.Is(err, database.ErrNotFound) { + return fmt.Errorf("tx indexer : %w", err) + } + txHH = &HashHeight{ + Hash: *s.chainParams.GenesisHash, + Height: 0, + } + } + + // Make sure there is no gap between start and end or vice versa. 
+ startBH, err := s.db.BlockHeaderByHash(ctx, txHH.Hash[:]) + if err != nil { + return fmt.Errorf("blockheader hash: %w", err) + } + direction, err := s.TxIndexIsLinear(ctx, endHash) + if err != nil { + return fmt.Errorf("TxIndexIsLinear: %w", err) + } + switch direction { + case 1: + return s.TxIndexerWind(ctx, startBH, endBH) + case -1: + return s.TxIndexerUnwind(ctx, startBH, endBH) + case 0: + // Because we call TxIndexIsLinear we know it's the same block. + return nil + } + + return fmt.Errorf("invalid direction: %v", direction) +} + +func (s *Server) UtxoIndexIsLinear(ctx context.Context, endHash *chainhash.Hash) (int, error) { + log.Tracef("UtxoIndexIsLinear") + defer log.Tracef("UtxoIndexIsLinear exit") + + // Verify start point is not after the end point + utxoHH, err := s.UtxoIndexHash(ctx) + if err != nil { + if !errors.Is(err, database.ErrNotFound) { + return 0, fmt.Errorf("tx indexer : %w", err) + } + utxoHH = &HashHeight{ + Hash: *s.chainParams.GenesisHash, + Height: 0, + } + } + + return s.IndexIsLinear(ctx, &utxoHH.Hash, endHash) +} + +func (s *Server) TxIndexIsLinear(ctx context.Context, endHash *chainhash.Hash) (int, error) { + log.Tracef("TxIndexIsLinear") + defer log.Tracef("TxIndexIsLinear exit") + + // Verify start point is not after the end point + txHH, err := s.TxIndexHash(ctx) + if err != nil { + if !errors.Is(err, database.ErrNotFound) { + return 0, fmt.Errorf("tx indexer : %w", err) + } + txHH = &HashHeight{ + Hash: *s.chainParams.GenesisHash, + Height: 0, + } + } + + return s.IndexIsLinear(ctx, &txHH.Hash, endHash) +} + +func (s *Server) IndexIsLinear(ctx context.Context, startHash, endHash *chainhash.Hash) (int, error) { + log.Tracef("IndexIsLinear") + defer log.Tracef("IndexIsLinear exit") + + // Verify exit condition hash + if endHash == nil { + return 0, errors.New("must provide an end hash") + } + endBH, err := s.db.BlockHeaderByHash(ctx, endHash[:]) + if err != nil { + return 0, fmt.Errorf("blockheader hash: %w", err) + } + + // 
Make sure there is no gap between start and end or vice versa. + startBH, err := s.db.BlockHeaderByHash(ctx, startHash[:]) + if err != nil { + return 0, fmt.Errorf("blockheader hash: %w", err) + } + // Short circuit if the block hash is the same. + if startBH.BlockHash().IsEqual(endBH.BlockHash()) { + return 0, nil + } + + direction := endBH.Difficulty.Cmp(&startBH.Difficulty) + log.Debugf("startBH %v %v", startBH, startBH.Difficulty) + log.Debugf("endBH %v %v", endBH, endBH.Difficulty) + log.Debugf("direction %v", direction) + + // Expensive linear test, this needs some performance love. We can + // memoize it keep snapshot heights whereto we know the chain is + // synced. For now just do the entire thing. + + // Always walk backwards because it's only a single lookup. + var h, e *chainhash.Hash + switch direction { + case 1: + h = endBH.BlockHash() + e = startBH.BlockHash() + case -1: + h = startBH.BlockHash() + e = endBH.BlockHash() + default: + // This is a fork and thus not linear. + return 0, ErrNotLinear + } + for { + bh, err := s.db.BlockHeaderByHash(ctx, h[:]) + if err != nil { + return -1, fmt.Errorf("block header by hash: %w", err) + } + h = bh.ParentHash() + if h.IsEqual(e) { + return direction, nil + } + if h.IsEqual(s.chainParams.GenesisHash) { + return direction, ErrNotLinear + } + } +} + // SyncIndexersToHash tries to move the various indexers to the supplied // height (inclusive). +// Note: on unwind it means that it WILL unwind the the various indexers +// including the hash that was passed in. E.g. if this unwinds from 1001 to +// 1000 the indexes for block 1000 WILL be updated as well. 
func (s *Server) SyncIndexersToHash(ctx context.Context, hash *chainhash.Hash) error { log.Tracef("SyncIndexersToHash") defer log.Tracef("SyncIndexersToHash exit") @@ -555,7 +1125,6 @@ func (s *Server) SyncIndexersToHash(ctx context.Context, hash *chainhash.Hash) e s.mtx.Lock() s.quiesced = false s.indexing = false - // s.clipped = false actualHeight, bhb, err := s.RawBlockHeaderBest(ctx) if err != nil { log.Errorf("sync indexers best: %v", err) @@ -574,6 +1143,7 @@ func (s *Server) SyncIndexersToHash(ctx context.Context, hash *chainhash.Hash) e // XXX explain why we need to get more headers here // continue getting headers, XXX this does not belong here either // XXX if bh download fails we will get jammed. We need a queued "must execute this command" added to peer/service. + // XXX we may not want to do this when in special "driver mode" log.Infof("resuming block header download at: %v", actualHeight) if err = s.getHeaders(ctx, p, bhb); err != nil { log.Errorf("sync indexers: %v", err) @@ -582,6 +1152,8 @@ func (s *Server) SyncIndexersToHash(ctx context.Context, hash *chainhash.Hash) e }() log.Debugf("Syncing indexes to: %v", hash) + + // Utxos if err := s.UtxoIndexer(ctx, hash); err != nil { return fmt.Errorf("utxo indexer: %w", err) } diff --git a/service/tbc/rpc.go b/service/tbc/rpc.go index 469073d08..d4e62d94a 100644 --- a/service/tbc/rpc.go +++ b/service/tbc/rpc.go @@ -16,6 +16,7 @@ import ( "sync" "time" + "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/wire" "github.com/davecgh/go-spew/spew" "nhooyr.io/websocket" @@ -357,14 +358,15 @@ func (s *Server) handleTxByIdRawRequest(ctx context.Context, req *tbcapi.TxByIdR log.Tracef("handleTxByIdRawRequest") defer log.Tracef("handleTxByIdRawRequest exit") - if len(req.TxId) != 32 { + txId, err := chainhash.NewHash(req.TxId) + if err != nil { responseErr := protocol.RequestErrorf("invalid tx id") return &tbcapi.TxByIdRawResponse{ Error: responseErr, }, nil } - tx, err := s.TxById(ctx, 
[32]byte(req.TxId)) + tx, err := s.TxById(ctx, txId) if err != nil { if errors.Is(err, database.ErrNotFound) { responseErr := protocol.RequestErrorf("tx not found: %s", req.TxId) @@ -396,14 +398,15 @@ func (s *Server) handleTxByIdRequest(ctx context.Context, req *tbcapi.TxByIdRequ log.Tracef("handleTxByIdRequest") defer log.Tracef("handleTxByIdRequest exit") - if len(req.TxId) != 32 { + txId, err := chainhash.NewHash(req.TxId) + if err != nil { responseErr := protocol.RequestErrorf("invalid tx id") return &tbcapi.TxByIdResponse{ Error: responseErr, }, nil } - tx, err := s.TxById(ctx, [32]byte(reverseBytes(req.TxId))) + tx, err := s.TxById(ctx, txId) if err != nil { if errors.Is(err, database.ErrNotFound) { responseErr := protocol.RequestErrorf("tx not found: %s", req.TxId) @@ -563,6 +566,7 @@ func wireTxToTBC(w *wire.MsgTx) *tbcapi.Tx { return tx } +// XXX this probably should not exist, it means the code is busted instead func reverseBytes(b []byte) []byte { slices.Reverse(b) return b diff --git a/service/tbc/rpc_test.go b/service/tbc/rpc_test.go index 0ef163d9d..b0c094fc3 100644 --- a/service/tbc/rpc_test.go +++ b/service/tbc/rpc_test.go @@ -16,6 +16,7 @@ import ( "github.com/btcsuite/btcd/btcutil" "github.com/btcsuite/btcd/chaincfg" + "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/davecgh/go-spew/spew" "github.com/docker/go-connections/nat" "github.com/go-test/deep" @@ -27,7 +28,6 @@ import ( "github.com/hemilabs/heminetwork/api/protocol" "github.com/hemilabs/heminetwork/api/tbcapi" "github.com/hemilabs/heminetwork/bitcoin" - "github.com/hemilabs/heminetwork/database/tbcd" ) func TestBlockHeadersByHeightRaw(t *testing.T) { @@ -1417,14 +1417,13 @@ func TestTxById(t *testing.T) { indexAll(ctx, t, tbcServer) lastErr = nil - txId := getRandomTxId(ctx, t, bitcoindContainer) - txIdBytes, err := hex.DecodeString(txId) + txId, err := chainhash.NewHashFromStr(getRandomTxId(ctx, t, bitcoindContainer)) if err != nil { t.Fatal(err) } err = tbcapi.Write(ctx, 
tws.conn, "someid", tbcapi.TxByIdRequest{ - TxId: txIdBytes, + TxId: txId[:], }) if err != nil { lastErr = err @@ -1447,7 +1446,7 @@ func TestTxById(t *testing.T) { t.Fatal(response.Error.Message) } - tx, err := tbcServer.TxById(ctx, tbcd.TxId(reverseBytes(txIdBytes))) + tx, err := tbcServer.TxById(ctx, txId) if err != nil { t.Fatal(err) } diff --git a/service/tbc/tbc.go b/service/tbc/tbc.go index da8f92221..1479548b0 100644 --- a/service/tbc/tbc.go +++ b/service/tbc/tbc.go @@ -269,6 +269,7 @@ func NewServer(cfg *Config) (*Server, error) { } // DB exports the underlying database. This should only be used in tests. +// XXX remove this and deal with the fallout. func (s *Server) DB() tbcd.Database { return s.db } @@ -335,9 +336,7 @@ func (s *Server) seed(pctx context.Context, peersWanted int) ([]tbcd.Peer, error } // insert into peers table - for _, ms := range moreSeeds { - peers = append(peers, ms) - } + peers = append(peers, moreSeeds...) // return fake peers but don't save them to the database return peers, nil @@ -371,11 +370,15 @@ func (s *Server) seedForever(ctx context.Context, peersWanted int) ([]tbcd.Peer, } } -func (s *Server) peerAdd(p *peer) { +func (s *Server) peerAdd(p *peer) error { log.Tracef("peerAdd: %v", p.address) s.mtx.Lock() + defer s.mtx.Unlock() + if _, ok := s.peers[p.address]; ok { + return fmt.Errorf("peer exists: %v", p) + } s.peers[p.address] = p - s.mtx.Unlock() + return nil } func (s *Server) peerDelete(address string) { @@ -432,7 +435,10 @@ func (s *Server) peerManager(ctx context.Context) error { log.Errorf("new peer: %v", err) continue } - s.peerAdd(peer) + if err := s.peerAdd(peer); err != nil { + log.Tracef("add peer: %v", err) + continue + } go s.peerConnect(ctx, peerC, peer) @@ -497,7 +503,9 @@ func (s *Server) localPeerManager(ctx context.Context) error { log.Infof("Local peer manager connecting to %v peers", peersWanted) for { - s.peerAdd(peer) + if err := s.peerAdd(peer); err != nil { + return err + } go s.peerConnect(ctx, 
peerC, peer) select { @@ -623,35 +631,41 @@ func (s *Server) peerConnect(ctx context.Context, peerC chan string, p *peer) { } }() - _ = p.write(defaultCmdTimeout, wire.NewMsgSendHeaders()) // Ask peer to send headers - _ = p.write(defaultCmdTimeout, wire.NewMsgGetAddr()) // Try to get network information - - log.Debugf("Peer connected: %v", p) + // Ask peer to send headers + err = p.write(defaultCmdTimeout, wire.NewMsgSendHeaders()) + if err != nil { + log.Errorf("peer write send headers: %v %v", p, err) + return + } + // Try to get network information + err = p.write(defaultCmdTimeout, wire.NewMsgGetAddr()) + if err != nil { + log.Errorf("peer write get addr: %v %v", p, err) + return + } - // Pretend we are always in IBD. - // - // This obviously will put a pressure on the internet connection and - // database because each and every peer is racing at start of day. As - // multiple answers come in the insert of the headers fails or - // succeeds. If it fails no more headers will be requested from that - // peer. + // Ask peer for block headers and special handle the first message. 
+ // XXX explain bhb, err := s.db.BlockHeaderBest(ctx) if err != nil { - log.Errorf("block headers best: %v", err) + log.Errorf("block headers best: %v %v", p, err) // database is closed, nothing we can do, return here to avoid below // panic if errors.Is(err, leveldb.ErrClosed) { return } } - log.Debugf("block header best hash: %s", bhb.Hash) - + log.Debugf("block header best hash: %v %s", p, bhb) if err = s.getHeaders(ctx, p, bhb.Header); err != nil { // This should not happen - log.Errorf("get headers: %v", err) + log.Errorf("get headers: %v %v", p, err) return } + // Only now can we consider the peer connected + log.Debugf("Peer connected: %v", p) + + headersSeen := false verbose := false for { // See if we were interrupted, for the love of pete add ctx to wire @@ -670,8 +684,40 @@ func (s *Server) peerConnect(ctx context.Context, peerC chan string, p *peer) { return } + // We must check the initial get headers response. If we asked + // for an unknown tip we'll get genesis back. This indicates + // that our tip is forked, + // XXX this needs to be cleaned up; maybe moved into handshake + if !headersSeen { + switch m := msg.(type) { + case *wire.MsgHeaders: + if len(m.Headers) != 0 { + h0 := m.Headers[0].PrevBlock + if !bhb.BlockHash().IsEqual(&h0) && + s.chainParams.GenesisHash.IsEqual(&h0) { + log.Debugf("%v", bhb.BlockHash()) + log.Debugf("%v", h0) + + nbh, err := s.db.BlockHeaderByHash(ctx, bhb.ParentHash()[:]) + if err != nil { + panic(err) // XXX + } + bhb = nbh + log.Infof("Fork detected, walking chain back to: %v", bhb) + if err = s.getHeaders(ctx, p, bhb.Header); err != nil { + panic(err) // XXX this needs to be a log and exit + // return + } + continue + } + _ = m + headersSeen = true + } + } + } + if verbose { - spew.Sdump(msg) + log.Infof("%v: %v", p, spew.Sdump(msg)) } // Commands that are always accepted. 
@@ -755,15 +801,6 @@ func (s *Server) blksMissing(ctx context.Context) bool { return len(bm) > 0 } -// blocksMissing checks the block cache and the database and returns true if all -// blocks have not been downloaded. -func (s *Server) blocksMissing(ctx context.Context) bool { - s.mtx.Lock() - defer s.mtx.Unlock() - - return s.blksMissing(ctx) -} - func (s *Server) handleAddr(ctx context.Context, p *peer, msg *wire.MsgAddr) { log.Tracef("handleAddr (%v): %v", p, len(msg.AddrList)) defer log.Tracef("handleAddr exit (%v)", p) @@ -1339,6 +1376,9 @@ func (s *Server) BlockHeaderBest(ctx context.Context) (uint64, *wire.BlockHeader } func (s *Server) BalanceByAddress(ctx context.Context, encodedAddress string) (uint64, error) { + log.Tracef("BalanceByAddress") + defer log.Tracef("BalanceByAddress exit") + addr, err := btcutil.DecodeAddress(encodedAddress, s.chainParams) if err != nil { return 0, err @@ -1358,6 +1398,9 @@ func (s *Server) BalanceByAddress(ctx context.Context, encodedAddress string) (u } func (s *Server) UtxosByAddress(ctx context.Context, encodedAddress string, start uint64, count uint64) ([]tbcd.Utxo, error) { + log.Tracef("UtxosByAddress") + defer log.Tracef("UtxosByAddress exit") + addr, err := btcutil.DecodeAddress(encodedAddress, s.chainParams) if err != nil { return nil, err @@ -1377,12 +1420,39 @@ func (s *Server) UtxosByAddress(ctx context.Context, encodedAddress string, star return utxos, nil } -func (s *Server) TxById(ctx context.Context, txId tbcd.TxId) (*wire.MsgTx, error) { - blockHashes, err := s.db.BlocksByTxId(ctx, txId) +func (s *Server) SpentOutputsByTxId(ctx context.Context, txId *chainhash.Hash) ([]tbcd.SpentInfo, error) { + log.Tracef("SpentOutputsByTxId") + defer log.Tracef("SpentOutputsByTxId exit") + + // XXX investigate if this is indeed correct. As it is written now it + // returns all spent outputs. The db should always be canonical but + // assert that. 
+ + si, err := s.db.SpentOutputsByTxId(ctx, txId[:]) + if err != nil { + return nil, err + } + + return si, nil +} + +func (s *Server) TxById(ctx context.Context, txId *chainhash.Hash) (*wire.MsgTx, error) { + log.Tracef("TxById") + defer log.Tracef("TxById exit") + + blockHashes, err := s.db.BlocksByTxId(ctx, txId[:]) if err != nil { return nil, err } + if len(blockHashes) > 1 { + panic("fix me blockhashes len") + } + + // XXX investigate if this is indeed correct. As it is written now it + // returns the first block the tx exists in. This however must be the + // canonical block. This function must also return the blockhash. + // chain hash stores the bytes in reverse order revTxId := bytes.Clone(txId[:]) slices.Reverse(revTxId) @@ -1535,6 +1605,7 @@ func (s *Server) Synced(ctx context.Context) SyncInfo { // DBOpen opens the underlying server database. It has been put in its own // function to make it available during tests and hemictl. // It would be good if it can be deleted. +// XXX remove and find a different way to do this. func (s *Server) DBOpen(ctx context.Context) error { log.Tracef("DBOpen") defer log.Tracef("DBOpen exit") @@ -1558,6 +1629,7 @@ func (s *Server) DBOpen(ctx context.Context) error { return nil } +// XXX remove and find a different way to do this. 
func (s *Server) DBClose() error { log.Tracef("DBClose") defer log.Tracef("DBClose") @@ -1616,8 +1688,8 @@ func (s *Server) Run(pctx context.Context) error { return err } } - log.Infof("Starting block headers sync at height: %v time %v", - bhb.Height, bhb.Timestamp()) + log.Infof("Starting block headers sync at %v height: %v time %v", + bhb, bhb.Height, bhb.Timestamp()) // HTTP server mux := http.NewServeMux() diff --git a/service/tbc/tbc_test.go b/service/tbc/tbc_test.go index d4f143c8c..804c97c58 100644 --- a/service/tbc/tbc_test.go +++ b/service/tbc/tbc_test.go @@ -80,6 +80,9 @@ func TestBlockHeaderEncodeDecode(t *testing.T) { t.Error(err) } awbh, err := bytes2Header(ash[:]) + if err != nil { + t.Errorf("bytes2Header failed: %v", err) + } if diff := deep.Equal(&gwbh, awbh); len(diff) > 0 { t.Errorf("unexpected diff: %s", diff) } diff --git a/service/tbc/tbcfork_test.go b/service/tbc/tbcfork_test.go index 9b3c5da50..81d439c41 100644 --- a/service/tbc/tbcfork_test.go +++ b/service/tbc/tbcfork_test.go @@ -5,6 +5,7 @@ package tbc import ( + "bytes" "context" "crypto/rand" "encoding/binary" @@ -27,40 +28,291 @@ import ( "github.com/juju/loggo" "github.com/hemilabs/heminetwork/api/tbcapi" + "github.com/hemilabs/heminetwork/database/tbcd" ) +type block struct { + name string + b *btcutil.Block + + txs map[tbcd.TxKey]*tbcd.TxValue // Parsed Txs in cache format +} + +func newBlock(params *chaincfg.Params, name string, b *btcutil.Block) *block { + blk := &block{ + name: name, + b: b, + txs: make(map[tbcd.TxKey]*tbcd.TxValue, 10), + } + err := processTxs(b.Hash(), b.Transactions(), blk.txs) + if err != nil { + panic(fmt.Errorf("processTxs: %v", err)) + } + + return blk +} + +func (b block) Hash() *chainhash.Hash { + return b.b.Hash() +} + +func (b block) Height() int32 { + return b.b.Height() +} + +func (b block) MsgBlock() *wire.MsgBlock { + return b.b.MsgBlock() +} + +func (b block) TxByIndex(index int) *btcutil.Tx { + tx, err := b.b.Tx(index) + if err != nil { + 
panic(err) + } + return tx +} + +func (b block) String() string { + return fmt.Sprintf("%v: %v %v", b.name, b.Height(), b.Hash()) +} + +type namedKey struct { + name string + key *btcec.PrivateKey +} + type btcNode struct { - t *testing.T + t *testing.T // for logging + le bool // log enable port string p *peer mtx sync.RWMutex - chain map[string]*btcutil.Block - blocksAtHeight map[int32][]*btcutil.Block + chain map[string]*block + blocksAtHeight map[int32][]*block height int32 params *chaincfg.Params + genesis *block + gtx *btcutil.Tx // for printing and diagnostics + private *btcec.PrivateKey + public *btcec.PublicKey + address *btcutil.AddressPubKeyHash + + keys map[string]*namedKey // keys used to sign various tx' + + listener net.Listener } func newFakeNode(t *testing.T, port string) (*btcNode, error) { + genesis := btcutil.NewBlock(chaincfg.RegressionNetParams.GenesisBlock) + genesis.SetHeight(0) node := &btcNode{ t: t, + le: false, port: port, - chain: make(map[string]*btcutil.Block, 10), - blocksAtHeight: make(map[int32][]*btcutil.Block, 10), + chain: make(map[string]*block, 10), + blocksAtHeight: make(map[int32][]*block, 10), height: 0, params: &chaincfg.RegressionNetParams, + keys: make(map[string]*namedKey, 10), } - genesis := btcutil.NewBlock(chaincfg.RegressionNetParams.GenesisBlock) - genesis.SetHeight(0) - // node.chain[chaincfg.RegressionNetParams.GenesisHash.String()] = genesis - _, err := node.insertBlock(genesis) + // Add miner key to key pool + var err error + node.private, node.public, node.address, err = node.newKey("miner") + if err != nil { + return nil, err + } + node.genesis = newBlock(node.params, "genesis", genesis) + _, err = node.insertBlock(node.genesis) + if err != nil { + return nil, err + } + node.gtx, err = node.genesis.b.Tx(0) if err != nil { return nil, err } + t.Logf("genesis") + t.Logf(" block: %v", node.genesis.Hash()) + t.Logf(" tx : %v", node.gtx.Hash()) + t.Logf("") + t.Logf("miner keys") + t.Logf(" private: %x", 
node.private.Serialize())
+	t.Logf(" public : %x", node.public.SerializeCompressed())
+	t.Logf(" address: %v", node.address)
+
+	return node, nil
+}
+
+// lookupKey is used by the sign function.
+// Must be called locked.
+func (b *btcNode) lookupKey(a btcutil.Address) (*btcec.PrivateKey, bool, error) {
+	nk, ok := b.keys[a.String()]
+	if !ok {
+		return nil, false, fmt.Errorf("key not found: %v", a.String())
+	}
+	return nk.key, true, nil
+}
+
+// newKey creates and inserts a new key into the lookup table.
+// Must be called locked
+func (b *btcNode) newKey(name string) (*btcec.PrivateKey, *btcec.PublicKey, *btcutil.AddressPubKeyHash, error) {
+	privateKey, err := btcec.NewPrivateKey()
+	if err != nil {
+		return nil, nil, nil, err
+	}
+	publicKey := privateKey.PubKey()
+	address, err := btcutil.NewAddressPubKeyHash(btcutil.Hash160(publicKey.SerializeCompressed()), b.params)
+	if err != nil {
+		return nil, nil, nil, err
+	}
+
+	// Add to lookup
+	b.keys[address.String()] = &namedKey{name: name, key: privateKey}
+
+	return privateKey, publicKey, address, nil
+}
+
+// func (b *btcNode) verifyAllKeyBalances(ctx context.Context, s *Server) error {
+// 	// Verify the balances
+// 	for address, key := range b.keys {
+// 		balance, err := s.BalanceByAddress(ctx, address)
+// 		if err != nil {
+// 			return fmt.Errorf("balance by address: %w", err)
+// 		}
+// 		// addressBalance := b.keyBalance(address)
+// 		addressBalance := btcutil.Amount(0)
+// 		panic("fixme")
+// 		if addressBalance != btcutil.Amount(balance) {
+// 			return fmt.Errorf("%v (%v): balance invalid wanted %v, got %v",
+// 				address, key.name, addressBalance, btcutil.Amount(balance))
+// 		}
+// 		// Verify utxos add up to balance
+// 		utxos, err := s.UtxosByAddress(ctx, address, 0, 100)
+// 		if err != nil {
+// 			return fmt.Errorf("utxos by address: %w", err)
+// 		}
+// 		total := uint64(0)
+// 		for _, utxo := range utxos {
+// 			total += utxo.Value()
+// 		}
+// 		if addressBalance != btcutil.Amount(total) {
+// 			return fmt.Errorf("%v: utxo balance 
invalid wanted %v, got %v", +// address, addressBalance, btcutil.Amount(total)) +// } +// } +// return nil +// } + +func (b *btcNode) newSignedTxFromTx(name string, inTx *btcutil.Tx, amount btcutil.Amount) (*btcutil.Tx, error) { + utxos := inTx.MsgTx().TxOut + redeemTx := wire.NewMsgTx(wire.TxVersion) + inHash := inTx.Hash() + + total, err := btcutil.NewAmount(0) + if err != nil { + return nil, err + } + for _, txOut := range utxos { + total += btcutil.Amount(txOut.Value) + } + if amount > total { + return nil, fmt.Errorf("can't fund %v, got %v", amount, total) + } + + // create new key to redeem + redeemPrivate, redeemPublic, redeemAddress, err := b.newKey(name) + if err != nil { + return nil, err + } + pkScript, err := txscript.PayToAddrScript(redeemAddress) + if err != nil { + return nil, err + } + b.t.Logf("rdeeem pkScript: %x", pkScript) + b.t.Logf("redeem keys:") + b.t.Logf(" private : %x", redeemPrivate.Serialize()) + b.t.Logf(" public : %x", redeemPublic.SerializeCompressed()) + b.t.Logf(" address : %v", redeemAddress) + + // find enough utxos to cover amount + left := amount + prevOuts := make(map[string][]byte, len(utxos)) + for i, txOut := range utxos { + prevOut := wire.NewOutPoint(inHash, uint32(i)) + txIn := wire.NewTxIn(prevOut, nil, nil) + redeemTx.AddTxIn(txIn) + prevOuts[prevOut.String()] = txOut.PkScript + value := btcutil.Amount(txOut.Value) // amount to send + + // extract txout script address to subtract value + sc, as, sigs, err := txscript.ExtractPkScriptAddrs(txOut.PkScript, b.params) + if err != nil { + return nil, err + } + _ = sc + _ = sigs + + // only support one address for now + if len(as) != 1 { + return nil, fmt.Errorf("only 1 address suported in pkSctipt got %v", len(as)) + } + + // b.t.Logf("left %v value %v", left, value) + if left > value { + redeemTx.AddTxOut(wire.NewTxOut(int64(value), pkScript)) + left -= value + continue + } + // Remaining bits + redeemTx.AddTxOut(wire.NewTxOut(int64(left), pkScript)) + + change := value - 
left + if change != 0 { + payToAddress := as[0] + changeScript, err := txscript.PayToAddrScript(payToAddress) + if err != nil { + return nil, err + } + txOutChange := wire.NewTxOut(int64(change), changeScript) + redeemTx.AddTxOut(txOutChange) + } + break + } + for i, txIn := range redeemTx.TxIn { + prevPkScript, ok := prevOuts[txIn.PreviousOutPoint.String()] + if !ok { + panic("xx") + } + sigScript, err := txscript.SignTxOutput(b.params, redeemTx, i, + prevPkScript, txscript.SigHashAll, + txscript.KeyClosure(b.lookupKey), nil, nil) + if err != nil { + return nil, err + } + redeemTx.TxIn[i].SignatureScript = sigScript + } + + flags := txscript.ScriptBip16 | txscript.ScriptVerifyDERSignatures | + txscript.ScriptStrictMultiSig | txscript.ScriptDiscourageUpgradableNops + vm, err := txscript.NewEngine(utxos[0].PkScript, redeemTx, 0, flags, nil, nil, -1, nil) + if err != nil { + return nil, err + } + if err := vm.Execute(); err != nil { + return nil, err + } + + return btcutil.NewTx(redeemTx), nil +} + +func (b *btcNode) logf(format string, args ...any) { + if !b.le { + return + } + b.t.Logf(format, args...) 
+} + func (b *btcNode) handleGetHeaders(m *wire.MsgGetHeaders) (*wire.MsgHeaders, error) { b.mtx.Lock() defer b.mtx.Unlock() @@ -77,22 +329,30 @@ func (b *btcNode) handleGetHeaders(m *wire.MsgGetHeaders) (*wire.MsgHeaders, err nmh := wire.NewMsgHeaders() height := from.Height() + 1 - b.t.Logf("start from %v", height) + b.logf("start from %v", height) for range 2000 { bs, ok := b.blocksAtHeight[height] if !ok { - b.t.Logf("no more blocks at: %v", height) + b.logf("no more blocks at: %v", height) return nmh, nil } - if len(bs) != 1 { - return nil, fmt.Errorf("fork at height: %v", height) + var parentBlock *block + for _, v := range bs { + if from.Hash().IsEqual(v.Hash()) { + continue + } + parentBlock = v + break + } + if parentBlock == nil { + return nil, fmt.Errorf("no parent at: %v", height) } - err := nmh.AddBlockHeader(&bs[0].MsgBlock().Header) + err := nmh.AddBlockHeader(&parentBlock.MsgBlock().Header) if err != nil { return nil, fmt.Errorf("add header: %w", err) } - b.t.Logf("%v: %v", height, bs[0].MsgBlock().Header.BlockHash()) + b.logf("%v: %v", height, parentBlock.MsgBlock().Header.BlockHash()) height++ } @@ -103,7 +363,7 @@ func (b *btcNode) handleGetData(m *wire.MsgGetData) (*wire.MsgBlock, error) { b.mtx.Lock() defer b.mtx.Unlock() - // b.t.Logf("get data: %v", spew.Sdump(m)) + // b.logf("get data: %v", spew.Sdump(m)) if len(m.InvList) != 1 { return nil, errors.New("not supported multi invlist requests") } @@ -113,18 +373,21 @@ func (b *btcNode) handleGetData(m *wire.MsgGetData) (*wire.MsgBlock, error) { return nil, fmt.Errorf("unsuported data type: %v", v.Type) } - block, ok := b.chain[v.Hash.String()] + blk, ok := b.chain[v.Hash.String()] if !ok { return nil, fmt.Errorf("block not found: %v", v.Hash) } - return block.MsgBlock(), nil + return blk.b.MsgBlock(), nil } -func (b *btcNode) handleRPC(ctx context.Context, conn net.Conn) { +func (b *btcNode) handleRPC(ctx context.Context, conn net.Conn) error { b.t.Logf("handleRPC %v", conn.RemoteAddr()) defer 
b.t.Logf("handleRPC exit %v", conn.RemoteAddr()) + b.logf("handleRPC %v", conn.RemoteAddr()) + defer b.logf("handleRPC exit %v", conn.RemoteAddr()) + p := &peer{ conn: conn, connected: time.Now(), @@ -138,8 +401,8 @@ func (b *btcNode) handleRPC(ctx context.Context, conn net.Conn) { ProtocolVersion: int32(wire.AddrV2Version), } if err := p.write(time.Second, mv); err != nil { - b.t.Logf("write version %v: %v", p, err) - return + b.logf("write version %v: %v", p, err) + return err } b.mtx.Lock() @@ -149,7 +412,7 @@ func (b *btcNode) handleRPC(ctx context.Context, conn net.Conn) { for { select { case <-ctx.Done(): - return + return ctx.Err() default: } @@ -160,13 +423,11 @@ func (b *btcNode) handleRPC(ctx context.Context, conn net.Conn) { b.t.Log("wire: unknown message") continue } - b.t.Logf("peer read %v: %v", p, err) - return + return fmt.Errorf("peer read %v: %w", p, err) } if err = b.handleMsg(ctx, p, msg); err != nil { - b.t.Logf("handle message %v: %v", p, err) - return + return fmt.Errorf("handle message %v: %w", p, err) } } } @@ -180,29 +441,29 @@ func (b *btcNode) handleMsg(ctx context.Context, p *peer, msg wire.Message) erro } case *wire.MsgGetHeaders: - // b.t.Logf("get headers %v", spew.Sdump(m)) + // b.logf("get headers %v", spew.Sdump(m)) headers, err := b.handleGetHeaders(m) if err != nil { return fmt.Errorf("handle get headers: %w", err) } - // b.t.Logf("%v", spew.Sdump(headers)) + // b.logf("%v", spew.Sdump(headers)) if err = p.write(time.Second, headers); err != nil { return fmt.Errorf("write headers: %w", err) } case *wire.MsgGetData: - // b.t.Logf("get data %v", spew.Sdump(m)) + // b.logf("get data %v", spew.Sdump(m)) data, err := b.handleGetData(m) if err != nil { return fmt.Errorf("handle get data: %w", err) } - // b.t.Logf("%v", spew.Sdump(data)) + // b.logf("%v", spew.Sdump(data)) if err = p.write(time.Second, data); err != nil { return fmt.Errorf("write data: %w", err) } default: - b.t.Logf("unhandled command: %v", spew.Sdump(msg)) + 
b.logf("unhandled command: %v", spew.Sdump(msg)) } return nil @@ -219,21 +480,21 @@ func (b *btcNode) dumpChain(parent *chainhash.Hash) error { defer b.mtx.Unlock() for { - block, ok := b.chain[parent.String()] + blk, ok := b.chain[parent.String()] if !ok { return fmt.Errorf("parent not found: %v", parent) } - b.t.Logf("%v: %v", block.Height(), block.Hash()) + b.t.Logf("%v", blk) - bh := block.MsgBlock().Header + bh := blk.MsgBlock().Header parent = &bh.PrevBlock - if block.Height() == 0 { + if blk.Height() == 0 { return nil } } } -func newBlockTemplate(params *chaincfg.Params, payToAddress btcutil.Address, nextBlockHeight int32, parent *chainhash.Hash, extraNonce uint64) (*btcutil.Block, error) { +func newBlockTemplate(params *chaincfg.Params, payToAddress btcutil.Address, nextBlockHeight int32, parent *chainhash.Hash, extraNonce uint64, mempool []*btcutil.Tx) (*btcutil.Block, error) { coinbaseScript, err := standardCoinbaseScript(nextBlockHeight, extraNonce) if err != nil { return nil, err @@ -243,12 +504,15 @@ func newBlockTemplate(params *chaincfg.Params, payToAddress btcutil.Address, nex if err != nil { return nil, err } + log.Infof("coinbase tx %v: %v", nextBlockHeight, coinbaseTx.Hash()) reqDifficulty := uint32(0x1d00ffff) // XXX var blockTxs []*btcutil.Tx blockTxs = append(blockTxs, coinbaseTx) - + if mempool != nil { + blockTxs = append(blockTxs, mempool...) 
+ } msgBlock := &wire.MsgBlock{ Header: wire.BlockHeader{ Version: int32(vbTopBits), @@ -269,11 +533,11 @@ func newBlockTemplate(params *chaincfg.Params, payToAddress btcutil.Address, nex return b, nil } -func (b *btcNode) insertBlock(block *btcutil.Block) (int, error) { - b.chain[block.Hash().String()] = block - bAtHeight := b.blocksAtHeight[block.Height()] - b.blocksAtHeight[block.Height()] = append(bAtHeight, block) - return len(b.blocksAtHeight[block.Height()]), nil +func (b *btcNode) insertBlock(blk *block) (int, error) { + b.chain[blk.Hash().String()] = blk + bAtHeight := b.blocksAtHeight[blk.Height()] + b.blocksAtHeight[blk.Height()] = append(bAtHeight, blk) + return len(b.blocksAtHeight[blk.Height()]), nil } func (b *btcNode) blockHeadersAtHeight(height int32) ([]*wire.BlockHeader, error) { @@ -320,45 +584,153 @@ func random(count int) []byte { return b } -func (b *btcNode) Mine(count int, from *chainhash.Hash, payToAddress btcutil.Address) ([]*btcutil.Block, error) { - b.mtx.Lock() - defer b.mtx.Unlock() +type addressToKey struct { + key *btcec.PrivateKey + compressed bool +} + +func mkGetKey(keys map[string]addressToKey) txscript.KeyDB { + if keys == nil { + return txscript.KeyClosure(func(addr btcutil.Address) (*btcec.PrivateKey, + bool, error, + ) { + return nil, false, errors.New("nope") + }) + } + return txscript.KeyClosure(func(addr btcutil.Address) (*btcec.PrivateKey, + bool, error, + ) { + a2k, ok := keys[addr.EncodeAddress()] + if !ok { + return nil, false, errors.New("nope") + } + return a2k.key, a2k.compressed, nil + }) +} + +func mkGetScript(scripts map[string][]byte) txscript.ScriptDB { + if scripts == nil { + return txscript.ScriptClosure(func(addr btcutil.Address) ([]byte, error) { + return nil, errors.New("nope") + }) + } + return txscript.ScriptClosure(func(addr btcutil.Address) ([]byte, error) { + script, ok := scripts[addr.EncodeAddress()] + if !ok { + return nil, errors.New("nope") + } + return script, nil + }) +} +func (b *btcNode) 
mine(name string, from *chainhash.Hash, payToAddress btcutil.Address) (*block, error) { parent, ok := b.chain[from.String()] if !ok { return nil, errors.New("parent hash not found") } - - blocks := make([]*btcutil.Block, 0, count) - for range count { - // extra nonce is needed to prevent block collisions - en := random(8) - extraNonce := binary.BigEndian.Uint64(en) - - nextBlockHeight := parent.Height() + 1 - block, err := newBlockTemplate(b.params, payToAddress, nextBlockHeight, - parent.Hash(), extraNonce) + // extra nonce is needed to prevent block collisions + en := random(8) + extraNonce := binary.BigEndian.Uint64(en) + var mempool []*btcutil.Tx + + nextBlockHeight := parent.Height() + 1 + switch nextBlockHeight { + case 2: + // spend block 1 coinbase + tx, err := b.newSignedTxFromTx(name, parent.TxByIndex(0), 3000000000) + if err != nil { + return nil, fmt.Errorf("new tx from tx: %w", err) + } + b.t.Logf("tx %v: %v spent from %v", nextBlockHeight, tx.Hash(), + tx.MsgTx().TxIn[0].PreviousOutPoint) + mempool = []*btcutil.Tx{tx} + case 3: + // spend block 2 transaction 1 + tx, err := b.newSignedTxFromTx(name+":0", parent.TxByIndex(1), 1100000000) if err != nil { - return nil, fmt.Errorf("height %v: %w", nextBlockHeight, err) + return nil, fmt.Errorf("new tx from tx: %w", err) } - blocks = append(blocks, block) - b.t.Logf("mined %v: %v", nextBlockHeight, block.Hash()) + b.t.Logf("tx %v: %v spent from %v", nextBlockHeight, tx.Hash(), + tx.MsgTx().TxIn[0].PreviousOutPoint) + mempool = []*btcutil.Tx{tx} - n, err := b.insertBlock(block) + // spend above tx in same block + tx2, err := b.newSignedTxFromTx(name+":1", tx, 3000000000) if err != nil { - return nil, fmt.Errorf("insert block at height %v: %v", - nextBlockHeight, err) + return nil, fmt.Errorf("new tx from tx: %w", err) } - if n != 1 { - b.t.Logf("fork at: %v blocks %v", nextBlockHeight, n) + b.t.Logf("tx %v: %v spent from %v", nextBlockHeight, tx2.Hash(), + tx2.MsgTx().TxIn[0].PreviousOutPoint) + mempool = 
[]*btcutil.Tx{tx, tx2} + } + + bt, err := newBlockTemplate(b.params, payToAddress, nextBlockHeight, + parent.Hash(), extraNonce, mempool) + if err != nil { + return nil, fmt.Errorf("height %v: %w", nextBlockHeight, err) + } + blk := newBlock(b.params, name, bt) + _, err = b.insertBlock(blk) + if err != nil { + return nil, fmt.Errorf("insert block at height %v: %v", + nextBlockHeight, err) + } + // XXX this really sucks, we should get rid of height as a best indicator + if blk.Height() > b.height { + b.height = blk.Height() + } + + return blk, nil +} + +func (b *btcNode) mineN(count int, from *chainhash.Hash, payToAddress btcutil.Address) ([]*block, error) { + parent, ok := b.chain[from.String()] + if !ok { + return nil, errors.New("parent hash not found") + } + + blocks := make([]*block, 0, count) + for range count { + nextBlockHeight := parent.Height() + 1 + blk, err := b.mine(fmt.Sprintf("b%v", nextBlockHeight), parent.Hash(), payToAddress) + if err != nil { + return nil, err } - parent = block - b.height = nextBlockHeight + blocks = append(blocks, blk) + parent = blk } return blocks, nil } +func (b *btcNode) Mine(name string, parent *chainhash.Hash, payToAddress btcutil.Address) (*block, error) { + b.mtx.Lock() + defer b.mtx.Unlock() + return b.mine(name, parent, payToAddress) +} + +func (b *btcNode) MineN(count int, from *chainhash.Hash, payToAddress btcutil.Address) ([]*block, error) { + b.mtx.Lock() + defer b.mtx.Unlock() + return b.mineN(count, from, payToAddress) +} + +func (b *btcNode) MineAndSend(ctx context.Context, name string, parent *chainhash.Hash, payToAddress btcutil.Address) (*block, error) { + blk, err := b.Mine(name, parent, payToAddress) + if err != nil { + return nil, err + } + + err = b.SendBlockheader(ctx, blk.MsgBlock().Header) + if err != nil { + return nil, err + } + + time.Sleep(1000 * time.Millisecond) + + return blk, nil +} + func (b *btcNode) Run(ctx context.Context) error { lc := &net.ListenConfig{} l, err := lc.Listen(ctx, "tcp", 
"localhost:"+b.port) @@ -366,145 +738,130 @@ func (b *btcNode) Run(ctx context.Context) error { return err } - for { - b.t.Logf("waiting for connection") - conn, err := l.Accept() - if err != nil { - return err - } - go b.handleRPC(ctx, conn) + b.logf("waiting for connection") + conn, err := l.Accept() + if err != nil { + return err + } + + b.listener = l + + return b.handleRPC(ctx, conn) +} + +func (b *btcNode) Stop() error { + b.mtx.Lock() + p := b.p + b.p = nil + b.mtx.Unlock() + if p == nil { + return nil } + + if err := p.conn.Close(); err != nil { + return err + } + + return b.listener.Close() } -func newPKAddress(params *chaincfg.Params) (*btcec.PrivateKey, *btcutil.AddressPubKey, error) { +func newPKAddress(params *chaincfg.Params) (*btcec.PrivateKey, *btcec.PublicKey, *btcutil.AddressPubKeyHash, error) { key, err := btcec.NewPrivateKey() if err != nil { - return nil, nil, err + return nil, nil, nil, err } pk := key.PubKey().SerializeUncompressed() - address, err := btcutil.NewAddressPubKey(pk, params) + // address, err := btcutil.NewAddressPubKey(pk, params) + address, err := btcutil.NewAddressPubKeyHash(btcutil.Hash160(pk), params) if err != nil { - return nil, nil, err + return nil, nil, nil, err } - return key, address, nil + return key, key.PubKey(), address, nil } -// XXX: Fix and re-enable test. 
-// func TestBasic(t *testing.T) { -// t.Skip() -// -// ctx, cancel := context.WithCancel(context.Background()) -// defer cancel() -// -// key, address, err := newPKAddress(&chaincfg.RegressionNetParams) -// if err != nil { -// t.Fatal(err) -// } -// t.Logf("key : %v", key) -// t.Logf("address: %v", address) -// -// n, err := newFakeNode(t, "18444") // TODO: should use random free port -// if err != nil { -// t.Fatal(err) -// } -// -// go func() { -// if err := n.Run(ctx); err != nil { -// panic(fmt.Errorf("node exited with error: %w", err)) -// } -// }() -// -// startHash := n.Best() -// count := 9 -// expectedHeight := uint64(count) -// -// if _, err = n.Mine(count, startHash[0], address); err != nil { -// t.Fatal(fmt.Errorf("mine: %w", err)) -// } -// -// if err = n.dumpChain(n.Best()[0]); err != nil { -// t.Fatal(fmt.Errorf("dump chain: %w", err)) -// } -// // t.Logf("%v", spew.Sdump(n.chain[n.Best()[0].String()])) -// time.Sleep(1 * time.Second) // XXX -// -// // Connect tbc service -// cfg := &Config{ -// AutoIndex: true, // XXX for now -// BlockSanity: false, -// LevelDBHome: t.TempDir(), -// ListenAddress: tbcapi.DefaultListen, // TODO: should use random free port -// // LogLevel: "tbcd=TRACE:tbc=TRACE:level=DEBUG", -// MaxCachedTxs: 1000, // XXX -// Network: networkLocalnet, -// PrometheusListenAddress: "", -// } -// _ = loggo.ConfigureLoggers(cfg.LogLevel) -// s, err := NewServer(cfg) -// if err != nil { -// t.Fatal(err) -// } -// s.ignoreUlimit = true -// go func() { -// err := s.Run(ctx) -// if err != nil && !errors.Is(err, context.Canceled) { -// panic(err) -// } -// }() -// -// for { -// select { -// case <-ctx.Done(): -// return -// case <-time.After(time.Second): -// } -// -// // See if we are synced -// si := s.Synced(ctx) -// if !(si.Synced && si.BlockHeaderHeight == expectedHeight) { -// log.Infof("not synced") -// continue -// } -// -// // Execute tests -// balance, err := s.BalanceByAddress(ctx, address.String()) -// if err != nil { -// 
t.Fatal(err) -// } -// // TODO: magic numbers should be extract into constants -// if balance != uint64(count*5000000000) { -// t.Fatalf("balance got %v wanted %v", balance, count*5000000000) -// } -// t.Logf("balance %v", spew.Sdump(balance)) -// -// utxos, err := s.UtxosByAddress(ctx, address.String(), 0, 100) -// if err != nil { -// t.Fatal(err) -// } -// t.Logf("%v", spew.Sdump(utxos)) -// return -// } -// } +func mustHave(ctx context.Context, s *Server, blocks ...*block) error { + for _, b := range blocks { + _, height, err := s.BlockHeaderByHash(ctx, b.Hash()) + if err != nil { + return err + } + if height != uint64(b.Height()) { + return fmt.Errorf("%v != %v", height, uint64(b.Height())) + } + + log.Infof("mustHave: %v", b.Hash()) + // Verify Txs cache + for ktx, vtx := range b.txs { + switch ktx[0] { + case 's': + // grab previous outpoint from the key + tx, err := chainhash.NewHash(ktx[1:33]) + if err != nil { + return fmt.Errorf("invalid tx hash: %w", err) + } + sis, err := s.SpentOutputsByTxId(ctx, tx) + if err != nil { + return fmt.Errorf("invalid spend infos: %w", err) + } + found := false + for _, si := range sis { + if !bytes.Equal(b.Hash()[:], si.BlockHash[:]) { + continue + } + found = true + break + } + if !found { + log.Infof("tx hash: %v", tx) + log.Infof("ktx: %v", spew.Sdump(ktx)) + log.Infof("vtx: %v", spew.Sdump(vtx)) + log.Infof(spew.Sdump(sis)) + return errors.New("block mismatch") + } + + case 't': + txId, blockHash, err := tbcd.TxIdBlockHashFromTxKey(ktx) + if err != nil { + return fmt.Errorf("invalid tx key: %w", err) + } + _, err = s.TxById(ctx, txId.Hash()) + if err != nil { + return fmt.Errorf("tx by id: %w", err) + } + // db block retrieval tested by TxById + if !b.Hash().IsEqual(blockHash.Hash()) { + return errors.New("t cache block hash invalid") + } + default: + return fmt.Errorf("invalid tx type %v", ktx[0]) + } + } + } + + return nil +} func TestFork(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - 
defer cancel() - - key, address, err := newPKAddress(&chaincfg.RegressionNetParams) - if err != nil { - t.Fatal(err) - } - t.Logf("key : %v", key) - t.Logf("address: %v", address) + defer func() { + cancel() + }() n, err := newFakeNode(t, "18444") // TODO: should use random free port if err != nil { t.Fatal(err) } + // n.le = true + defer func() { + err := n.Stop() + if err != nil { + t.Logf("node stop: %v", err) + } + }() go func() { - if err := n.Run(ctx); err != nil { + if err := n.Run(ctx); err != nil && !errors.Is(err, net.ErrClosed) && !errors.Is(err, context.Canceled) { panic(err) } }() @@ -512,7 +869,8 @@ func TestFork(t *testing.T) { startHash := n.Best() count := 9 expectedHeight := uint64(count) - _, err = n.Mine(count, startHash[0], address) + address := n.address + _, err = n.MineN(count, startHash[0], address) if err != nil { t.Fatal(err) } @@ -542,6 +900,8 @@ func TestFork(t *testing.T) { } s.ignoreUlimit = true go func() { + log.Infof("s run") + defer log.Infof("s run done") err := s.Run(ctx) if err != nil && !errors.Is(err, context.Canceled) { panic(err) @@ -592,66 +952,44 @@ func TestFork(t *testing.T) { // Advance both heads b9 := n.Best()[0] - b10a, err := n.Mine(1, b9, address) + b10a, err := n.MineAndSend(ctx, "b10a", b9, address) if err != nil { t.Fatal(err) } - b10b, err := n.Mine(1, b9, address) + b10b, err := n.MineAndSend(ctx, "b10b", b9, address) if err != nil { t.Fatal(err) } - t.Logf("b10a: %v", b10a[0].Hash()) - t.Logf("b10b: %v", b10b[0].Hash()) + // XXX check hashes + time.Sleep(50 * time.Millisecond) + t.Logf("b10a: %v", b10a.Hash()) + t.Logf("b10b: %v", b10b.Hash()) b10s := n.Best() if len(b10s) != 2 { t.Fatalf("expected 2 best blocks, got %v", len(b10s)) } - // Tell tbcd - err = n.SendBlockheader(ctx, b10a[0].MsgBlock().Header) - if err != nil { - t.Fatal(err) - } - err = n.SendBlockheader(ctx, b10b[0].MsgBlock().Header) - if err != nil { - t.Fatal(err) - } - // XXX check hashes - time.Sleep(500 * time.Millisecond) - // Advance 
both heads again - b10aHash := b10a[0].MsgBlock().Header.BlockHash() - b11a, err := n.Mine(1, &b10aHash, address) + b11a, err := n.MineAndSend(ctx, "b11a", b10a.Hash(), address) if err != nil { t.Fatal(err) } - b10bHash := b10b[0].MsgBlock().Header.BlockHash() - b11b, err := n.Mine(1, &b10bHash, address) + b11b, err := n.MineAndSend(ctx, "b11b", b10b.Hash(), address) if err != nil { t.Fatal(err) } - t.Logf("b11a: %v", b11a[0].Hash()) - t.Logf("b11b: %v", b11b[0].Hash()) + t.Logf("b11a: %v", b11a.Hash()) + t.Logf("b11b: %v", b11b.Hash()) b11s := n.Best() if len(b11s) != 2 { t.Fatalf("expected 2 best blocks, got %v", len(b11s)) } - // Tell tbcd - err = n.SendBlockheader(ctx, b11a[0].MsgBlock().Header) - if err != nil { - t.Fatal(err) - } - time.Sleep(500 * time.Millisecond) - err = n.SendBlockheader(ctx, b11b[0].MsgBlock().Header) - if err != nil { - t.Fatal(err) - } - time.Sleep(500 * time.Millisecond) - - // Let's see if tbcd agrees - si := s.Synced(ctx) - // t.Logf("--- %v", si) - bhsAt11, err := s.BlockHeadersByHeight(ctx, 11) + time.Sleep(50 * time.Millisecond) + + // Let's see if tbcd agrees + si := s.Synced(ctx) + // t.Logf("--- %v", si) + bhsAt11, err := s.BlockHeadersByHeight(ctx, 11) if err != nil { t.Fatal(err) } @@ -680,28 +1018,20 @@ func TestFork(t *testing.T) { // 9 -> 10a -> 11a -> // \-> 10b -> 11c -> 12 t.Logf("mine 11c") - b11c, err := n.Mine(1, &b10bHash, address) + b11c, err := n.MineAndSend(ctx, "b11c", b10b.Hash(), address) if err != nil { t.Fatal(err) } - b11cHash := b11c[0].MsgBlock().Header.BlockHash() - err = n.SendBlockheader(ctx, b11c[0].MsgBlock().Header) - if err != nil { - t.Fatal(err) - } - time.Sleep(500 * time.Millisecond) + time.Sleep(50 * time.Millisecond) // 12 t.Logf("mine 12") - b12, err := n.Mine(1, &b11cHash, address) + b12, err := n.MineAndSend(ctx, "b12", b11c.Hash(), address) if err != nil { t.Fatal(err) } - err = n.SendBlockheader(ctx, b12[0].MsgBlock().Header) - if err != nil { - t.Fatal(err) - } - time.Sleep(500 * 
time.Millisecond)
+	_ = b12
+	time.Sleep(50 * time.Millisecond)
 
 	t.Logf("did we fork?")
 
@@ -761,6 +1091,620 @@ func TestWork(t *testing.T) {
 	t.Logf("compact to big: 0x%x", blockchain.CompactToBig(0x170331db))
 }
 
+func TestIndexNoFork(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer func() {
+		cancel()
+	}()
+
+	n, err := newFakeNode(t, "18444")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	defer func() {
+		err := n.Stop()
+		if err != nil {
+			t.Logf("node stop: %v", err)
+		}
+	}()
+
+	go func() {
+		if err := n.Run(ctx); err != nil && !errors.Is(err, net.ErrClosed) && !errors.Is(err, context.Canceled) {
+			panic(err)
+		}
+	}()
+	time.Sleep(time.Second)
+
+	// Connect tbc service
+	cfg := &Config{
+		AutoIndex:     false,
+		BlockSanity:   false,
+		LevelDBHome:   t.TempDir(),
+		ListenAddress: tbcapi.DefaultListen,
+		// LogLevel: "tbcd=TRACE:tbc=TRACE:level=DEBUG",
+		MaxCachedTxs:            1000, // XXX
+		Network:                 networkLocalnet,
+		PeersWanted:             1,
+		PrometheusListenAddress: "",
+	}
+	_ = loggo.ConfigureLoggers(cfg.LogLevel)
+	s, err := NewServer(cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+	s.ignoreUlimit = true
+
+	go func() {
+		err := s.Run(ctx)
+		if err != nil && !errors.Is(err, context.Canceled) {
+			panic(err)
+		}
+	}()
+
+	time.Sleep(2 * time.Second)
+
+	// create a linear chain with some tx's
+	// g -> b1 -> b2 -> b3
+
+	// best chain
+	parent := chaincfg.RegressionNetParams.GenesisHash
+	address := n.address
+	b1, err := n.MineAndSend(ctx, "b1", parent, address)
+	if err != nil {
+		t.Fatal(err)
+	}
+	b2, err := n.MineAndSend(ctx, "b2", b1.Hash(), address)
+	if err != nil {
+		t.Fatal(err)
+	}
+	b3, err := n.MineAndSend(ctx, "b3", b2.Hash(), address)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// genesis -> b3 should work with negative direction (cdiff is less than target)
+	direction, err := s.TxIndexIsLinear(ctx, b3.Hash())
+	if err != nil {
+		t.Fatalf("expected success g -> b3, got %v", err)
+	}
+	if direction <= 0 {
+		t.Fatalf("expected 1 going from genesis to 
b3, got %v", direction)
+	}
+
+	// Index to b3
+	err = s.SyncIndexersToHash(ctx, b3.Hash())
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = mustHave(ctx, s, n.genesis, b1, b2, b3)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// XXX verify all balances
+	for address, key := range n.keys {
+		balance, err := s.BalanceByAddress(ctx, address)
+		if err != nil {
+			t.Fatal(err)
+		}
+		t.Logf("%v (%v): %v", address, key.name, balance)
+		utxos, err := s.UtxosByAddress(ctx, address, 0, 100)
+		if err != nil {
+			t.Fatal(err)
+		}
+		t.Logf("%v: %v", address, utxos)
+	}
+
+	// make sure genesis tx is in db
+	_, err = s.TxById(ctx, n.gtx.Hash())
+	if err != nil {
+		t.Fatalf("genesis not found: %v", err)
+	}
+	// make sure genesis was not spent
+	_, err = s.SpentOutputsByTxId(ctx, n.gtx.Hash())
+	if err == nil {
+		t.Fatal("genesis coinbase tx should not be spent")
+	}
+
+	// Spot check tx 1 from b2
+	tx := b2.b.Transactions()[1]
+	txb2, err := s.TxById(ctx, tx.Hash())
+	if err != nil {
+		t.Fatal(err)
+	}
+	if !btcutil.NewTx(txb2).Hash().IsEqual(tx.Hash()) {
+		t.Fatal("hash not equal")
+	}
+	si, err := s.SpentOutputsByTxId(ctx, b1.b.Transactions()[0].Hash())
+	if err != nil {
+		t.Fatal(err)
+	}
+	// t.Logf("%v: %v", b1.b.Transactions()[0].Hash(), spew.Sdump(si))
+	si, err = s.SpentOutputsByTxId(ctx, b2.b.Transactions()[1].Hash())
+	if err != nil {
+		t.Fatal(err)
+	}
+	// t.Logf("%v: %v", b2.b.Transactions()[1].Hash(), spew.Sdump(si))
+	_ = si
+
+	// unwind back to b3 (removes b3 and b2)
+	err = s.SyncIndexersToHash(ctx, b2.Hash())
+	if err != nil {
+		t.Fatalf("unwinding to genesis should have returned nil, got %v", err)
+	}
+	err = mustHave(ctx, s, n.genesis, b1)
+	if err != nil {
+		t.Fatalf("expected an error from mustHave: %v", err)
+	}
+
+	err = s.SyncIndexersToHash(ctx, s.chainParams.GenesisHash)
+	if err != nil {
+		t.Fatal(err)
+	}
+	_, err = s.TxById(ctx, n.gtx.Hash())
+	if err != nil {
+		t.Fatal("expected genesis")
+	}
+
+	// Expect 0 balances everywhere
+	for address, key := range 
n.keys { + balance, err := s.BalanceByAddress(ctx, address) + if err != nil { + t.Fatal(err) + } + log.Infof("balance address %v %v", address, btcutil.Amount(balance)) + if balance != 0 { + t.Fatalf("%v (%v) invalid balance expected 0, got %v", + key.name, address, btcutil.Amount(balance)) + } + } +} + +func TestIndexFork(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer func() { + cancel() + }() + + n, err := newFakeNode(t, "18444") + if err != nil { + t.Fatal(err) + } + defer func() { + err := n.Stop() + if err != nil { + t.Logf("node stop: %v", err) + } + }() + go func() { + if err := n.Run(ctx); err != nil && !errors.Is(err, net.ErrClosed) && !errors.Is(err, context.Canceled) { + panic(err) + } + }() + time.Sleep(time.Second) + + // Connect tbc service + cfg := &Config{ + AutoIndex: false, + BlockSanity: false, + LevelDBHome: t.TempDir(), + ListenAddress: tbcapi.DefaultListen, + // LogLevel: "tbcd=TRACE:tbc=TRACE:level=DEBUG", + MaxCachedTxs: 1000, // XXX + Network: networkLocalnet, + PeersWanted: 1, + PrometheusListenAddress: "", + } + _ = loggo.ConfigureLoggers(cfg.LogLevel) + s, err := NewServer(cfg) + if err != nil { + t.Fatal(err) + } + s.ignoreUlimit = true + go func() { + err := s.Run(ctx) + if err != nil && !errors.Is(err, context.Canceled) { + panic(err) + } + }() + time.Sleep(2 * time.Second) + + // Create a bunch of weird geometries to catch all corner cases in the indexer. 
+ + // /-> b1a -> b2a + // g -> b1 -> b2 -> b3 + // \-> b1b -> b2b + + // best is b3 + + // best chain + parent := chaincfg.RegressionNetParams.GenesisHash + address := n.address + b1, err := n.MineAndSend(ctx, "b1", parent, address) + if err != nil { + t.Fatal(err) + } + b2, err := n.MineAndSend(ctx, "b2", b1.Hash(), address) + if err != nil { + t.Fatal(err) + } + b3, err := n.MineAndSend(ctx, "b3", b2.Hash(), address) + if err != nil { + t.Fatal(err) + } + + // a chain + b1a, err := n.MineAndSend(ctx, "b1a", parent, address) + if err != nil { + t.Fatal(err) + } + b2a, err := n.MineAndSend(ctx, "b2a", b1a.Hash(), address) + if err != nil { + t.Fatal(err) + } + + // b chain + b1b, err := n.MineAndSend(ctx, "b1b", parent, address) + if err != nil { + t.Fatal(err) + } + b2b, err := n.MineAndSend(ctx, "b2b", b1b.Hash(), address) + if err != nil { + t.Fatal(err) + } + + // Verify linear indexing. Current TxIndex is sitting at genesis + + // genesis -> b3 should work with negative direction (cdiff is less than target) + direction, err := s.TxIndexIsLinear(ctx, b3.Hash()) + if err != nil { + t.Fatalf("expected success g -> b3, got %v", err) + } + if direction <= 0 { + t.Fatalf("expected 1 going from genesis to b3, got %v", direction) + } + + // Index to b3 + err = s.SyncIndexersToHash(ctx, b3.Hash()) + if err != nil { + t.Fatal(err) + } + // XXX verify indexes + err = mustHave(ctx, s, n.genesis, b1, b2, b3) + if err != nil { + t.Fatal(err) + } + // XXX add mustNotHave + // verify tx + for address := range n.keys { + balance, err := s.BalanceByAddress(ctx, address) + if err != nil { + t.Fatal(err) + } + t.Logf("%v: %v", address, balance) + utxos, err := s.UtxosByAddress(ctx, address, 0, 100) + if err != nil { + t.Fatal(err) + } + t.Logf("%v: %v", address, utxos) + } + + // Verify linear indexing. 
Current TxIndex is sitting at b3 + t.Logf("b3: %v", b3) + + // b3 -> genesis should work with postive direction (cdiff is greater than target) + direction, err = s.TxIndexIsLinear(ctx, s.chainParams.GenesisHash) + if err != nil { + t.Fatalf("expected success b3 -> genesis, got %v", err) + } + if direction != -1 { + t.Fatalf("expected -1 going from b3 to genesis, got %v", direction) + } + + // b3 -> b1 should work with positive direction + direction, err = s.TxIndexIsLinear(ctx, b1.Hash()) + if err != nil { + t.Fatalf("expected success b3 -> b1, got %v", err) + } + if direction != -1 { + t.Fatalf("expected -1 going from b3 to genesis, got %v", direction) + } + + // b3 -> b2a should fail + _, err = s.TxIndexIsLinear(ctx, b2a.Hash()) + if !errors.Is(err, ErrNotLinear) { + t.Fatalf("b2a is not linear to b3: %v", err) + } + + // b3 -> b2b should fail + _, err = s.TxIndexIsLinear(ctx, b2b.Hash()) + if !errors.Is(err, ErrNotLinear) { + t.Fatalf("b2b is not linear to b3: %v", err) + } + + // make sure syncing to iself is non linear + err = s.SyncIndexersToHash(ctx, b3.Hash()) + if err != nil { + t.Fatalf("at b3, should have returned nil, got %v", err) + } + + // unwind back to genesis + err = s.SyncIndexersToHash(ctx, s.chainParams.GenesisHash) + if err != nil { + t.Fatalf("unwinding to genesis should have returned nil, got %v", err) + } + err = mustHave(ctx, s, n.genesis, b1, b2, b3) + if err == nil { + t.Fatalf("expected an error from mustHave") + } + + for address := range n.keys { + balance, err := s.BalanceByAddress(ctx, address) + if err != nil { + t.Fatal(err) + } + t.Logf("%v: %v", address, balance) + utxos, err := s.UtxosByAddress(ctx, address, 0, 100) + if err != nil { + t.Fatal(err) + } + t.Logf("%v: %v", address, utxos) + } + + // XXX verify indexes + txHH, err := s.TxIndexHash(ctx) + if err != nil { + t.Fatalf("expected success getting tx index hash, got: %v", err) + } + if !txHH.Hash.IsEqual(s.chainParams.GenesisHash) { + t.Fatalf("expected tx index hash to 
be equal to genesis, got: %v", txHH) + } + if txHH.Height != 0 { + t.Fatalf("expected tx index height to be 0, got: %v", txHH.Height) + } + + // see if we can move to b2z + direction, err = s.TxIndexIsLinear(ctx, b2a.Hash()) + if err != nil { + t.Fatalf("expected success genesis -> b2a, got %v", err) + } + if direction != 1 { + t.Fatalf("expected 1 going from genesis to b2a, got %v", direction) + } + + t.Logf("---------------------------------------- going to b2a") + err = s.SyncIndexersToHash(ctx, b2a.Hash()) + if err != nil { + t.Fatalf("wind to b2a: %v", err) + } + + for address := range n.keys { + balance, err := s.BalanceByAddress(ctx, address) + if err != nil { + t.Fatal(err) + } + t.Logf("%v: %v", address, balance) + utxos, err := s.UtxosByAddress(ctx, address, 0, 100) + if err != nil { + t.Fatal(err) + } + t.Logf("%v: %v", address, utxos) + } + + // unwind back to genesis + err = s.SyncIndexersToHash(ctx, s.chainParams.GenesisHash) + if err != nil { + t.Fatalf("unwinding to genesis should have returned nil, got %v", err) + } + err = mustHave(ctx, s, n.genesis, b1, b2, b3) + if err == nil { + t.Fatalf("expected an error from mustHave") + } + txHH, err = s.TxIndexHash(ctx) + if err != nil { + t.Fatalf("expected success getting tx index hash, got: %v", err) + } + if !txHH.Hash.IsEqual(s.chainParams.GenesisHash) { + t.Fatalf("expected tx index hash to be equal to genesis, got: %v", txHH) + } + if txHH.Height != 0 { + t.Fatalf("expected tx index height to be 0, got: %v", txHH.Height) + } + for address := range n.keys { + balance, err := s.BalanceByAddress(ctx, address) + if err != nil { + t.Fatal(err) + } + t.Logf("%v: %v", address, balance) + utxos, err := s.UtxosByAddress(ctx, address, 0, 100) + if err != nil { + t.Fatal(err) + } + t.Logf("%v: %v", address, utxos) + } + + t.Logf("---------------------------------------- going to b2b") + err = s.SyncIndexersToHash(ctx, b2b.Hash()) + if err != nil { + t.Fatalf("wind to b2b: %v", err) + } + + for address := range 
n.keys { + balance, err := s.BalanceByAddress(ctx, address) + if err != nil { + t.Fatal(err) + } + t.Logf("%v: %v", address, balance) + utxos, err := s.UtxosByAddress(ctx, address, 0, 100) + if err != nil { + t.Fatal(err) + } + t.Logf("%v: %v", address, utxos) + } + + // t.Logf("---------------------------------------- going to b3") + // unwind back to genesis + err = s.SyncIndexersToHash(ctx, s.chainParams.GenesisHash) + if err != nil { + t.Fatalf("xxxx %v", err) + } + for address := range n.keys { + balance, err := s.BalanceByAddress(ctx, address) + if err != nil { + t.Fatal(err) + } + t.Logf("%v: %v", address, balance) + utxos, err := s.UtxosByAddress(ctx, address, 0, 100) + if err != nil { + t.Fatal(err) + } + t.Logf("%v: %v", address, utxos) + } + // err = mustHave(ctx, s, n.genesis, b1, b2, b3) + // if err == nil { + // t.Fatalf("expected an error from mustHave") + // } + // txHH, err = s.TxIndexHash(ctx) + // if err != nil { + // t.Fatalf("expected success getting tx index hash, got: %v", err) + // } + // if !txHH.Hash.IsEqual(s.chainParams.GenesisHash) { + // t.Fatalf("expected tx index hash to be equal to genesis, got: %v", txHH) + // } + // if txHH.Height != 0 { + // t.Fatalf("expected tx index height to be 0, got: %v", txHH.Height) + // } + + // // Index to b3 + // err = s.SyncIndexersToHash(ctx, b3.Hash()) + // if err != nil { + // t.Fatal(err) + // } + // // XXX verify indexes + // err = mustHave(ctx, s, n.genesis, b1, b2, b3) + // if err != nil { + // t.Fatal(err) + // } + + // // Should fail + // t.Logf("=== index b2a ===") + // err = s.SyncIndexersToHash(ctx, b2a.Hash()) + // if err != nil { + // t.Fatal(err) + // } + + // t.Logf("=== index b2b ===") + // err = s.SyncIndexersToHash(ctx, b2b.Hash()) + // if err != nil { + // t.Fatal(err) + // } + + // time.Sleep(time.Second) +} + +func TestTransactions(t *testing.T) { + params := &chaincfg.RegressionNetParams + nextBlockHeight := int32(2) + extraNonce := uint64(nextBlockHeight) + coinbaseScript, err 
:= standardCoinbaseScript(nextBlockHeight, extraNonce) + if err != nil { + t.Fatal(err) + } + payToKey, err := btcec.NewPrivateKey() + if err != nil { + t.Fatal(err) + } + payToKeyPublic := payToKey.PubKey() + payToAddress, err := btcutil.NewAddressPubKeyHash( + btcutil.Hash160(payToKeyPublic.SerializeCompressed()), params) + if err != nil { + t.Fatal(err) + } + + coinbaseTx, err := createCoinbaseTx(params, coinbaseScript, + nextBlockHeight, payToAddress) + if err != nil { + t.Fatal(err) + } + t.Logf("coinbase: %v", spew.Sdump(coinbaseTx)) + wireCoinbaseTx := coinbaseTx.MsgTx() + disasm, err := txscript.DisasmString(wireCoinbaseTx.TxOut[0].PkScript) + if err != nil { + t.Fatal(err) + } + t.Logf("coinbase: %v", disasm) + + // now create a tx that spends the above TxIn + redeemKey, err := btcec.NewPrivateKey() + if err != nil { + t.Fatal(err) + } + redeemKeyPublic := redeemKey.PubKey() + redeemAddress, err := btcutil.NewAddressPubKeyHash(btcutil.Hash160(redeemKeyPublic.SerializeCompressed()), params) + if err != nil { + t.Fatal(err) + } + + redeemTx := wire.NewMsgTx(wire.TxVersion) + prevOut := wire.NewOutPoint(coinbaseTx.Hash(), 0) + txIn := wire.NewTxIn(prevOut, nil, nil) + redeemTx.AddTxIn(txIn) + pkScript, err := txscript.PayToAddrScript(redeemAddress) + if err != nil { + t.Fatal(err) + } + txOut := wire.NewTxOut(3000000000, pkScript) + redeemTx.AddTxOut(txOut) + sc, as, sigs, err := txscript.ExtractPkScriptAddrs(pkScript, params) + if err != nil { + t.Fatal(err) + } + t.Logf("%v %v: %v", sc, sigs, spew.Sdump(as)) + + // add cgange + changeScript, err := txscript.PayToAddrScript(payToAddress) + if err != nil { + t.Fatal(err) + } + txOutChange := wire.NewTxOut(2000000000, changeScript) + redeemTx.AddTxOut(txOutChange) + // sign + lookupKey := func(a btcutil.Address) (*btcec.PrivateKey, bool, error) { + return payToKey, true, nil + } + sigScript, err := txscript.SignTxOutput(&chaincfg.MainNetParams, + redeemTx, 0, wireCoinbaseTx.TxOut[0].PkScript, 
txscript.SigHashAll, + txscript.KeyClosure(lookupKey), nil, nil) + if err != nil { + t.Fatal(err) + } + redeemTx.TxIn[0].SignatureScript = sigScript + log.Infof("redeem tx: %v", spew.Sdump(redeemTx)) + + flags := txscript.ScriptBip16 | txscript.ScriptVerifyDERSignatures | + txscript.ScriptStrictMultiSig | + txscript.ScriptDiscourageUpgradableNops + vm, err := txscript.NewEngine(wireCoinbaseTx.TxOut[0].PkScript, redeemTx, 0, + flags, nil, nil, -1, nil) + if err != nil { + t.Fatal(err) + } + if err := vm.Execute(); err != nil { + t.Fatal(err) + } + disasm, err = vm.DisasmScript(0) + if err != nil { + t.Fatal(err) + } + t.Logf("coinbase signed tx in 0: %v", disasm) + disasm, err = vm.DisasmScript(1) + if err != nil { + t.Fatal(err) + } + t.Logf("coinbase signed tx out 0: %v", disasm) +} + // borrowed from btcd // // Copyright (c) 2014-2016 The btcsuite developers