From 68f726bfdbd9a659d1da64c8a176e8d5caa4d585 Mon Sep 17 00:00:00 2001
From: ClaytonNorthey92
Date: Wed, 28 Feb 2024 11:58:53 -0500
Subject: [PATCH] retry keystone on failure

* keep queue of 10 newest keystones known
* upon receiving a keystone, only queue if it is greater than the oldest keystone
* if queue is full, drop oldest keystone upon inserting another
* when a keystone is mined, mark it as "processed" to not repeat mining same keystone
---
 e2e/network_test.go       |   2 +-
 service/popm/popm.go      | 132 +++++++++++-
 service/popm/popm_test.go | 425 +++++++++++++++++++++++++++++++++++---
 3 files changed, 527 insertions(+), 32 deletions(-)

diff --git a/e2e/network_test.go b/e2e/network_test.go
index 675823c96..1292ea015 100644
--- a/e2e/network_test.go
+++ b/e2e/network_test.go
@@ -88,7 +88,7 @@ func TestFullNetwork(t *testing.T) {
 		"-rpcuser=user",
 		"-rpcpassword=password",
 		"generatetoaddress",
-		"5000", // need to generate a lot for greater chance to not spend coinbase
+		"200", // need to generate a lot for greater chance to not spend coinbase
 		btcAddress.EncodeAddress(),
 	})
 	if err != nil {
diff --git a/service/popm/popm.go b/service/popm/popm.go
index 3b2e37b87..455fb08ff 100644
--- a/service/popm/popm.go
+++ b/service/popm/popm.go
@@ -50,6 +50,108 @@ func init() {
 	loggo.ConfigureLoggers(logLevel)
 }
 
+// L2KeystonePriorityBuffer holds up to "size" L2Keystones. it allows a caller
+// to push to it and it will keep the most recent "size" keystones, overriding
+// the oldest if full. we define "oldest" as "the smallest l2 block number"
+type L2KeystonePriorityBuffer struct {
+	mtx sync.Mutex
+
+	mapping map[string]*L2KeystonePriorityBufferElement
+	size    int
+}
+
+// L2KeystonePriorityBufferElement holds an L2Keystone and whether it has been
+// "processed" or not
+type L2KeystonePriorityBufferElement struct {
+	l2Keystone         hemi.L2Keystone
+	requiresProcessing bool
+}
+
+// NewL2KeystonePriorityBuffer creates a L2KeystonePriorityBuffer with size n
+func NewL2KeystonePriorityBuffer(n int) *L2KeystonePriorityBuffer {
+	return &L2KeystonePriorityBuffer{
+		mapping: make(map[string]*L2KeystonePriorityBufferElement),
+		size:    n,
+	}
+}
+
+// Push inserts an L2Keystone, dropping the oldest if full
+func (r *L2KeystonePriorityBuffer) Push(val hemi.L2Keystone) {
+	r.mtx.Lock()
+	defer r.mtx.Unlock()
+
+	item := L2KeystonePriorityBufferElement{
+		l2Keystone:         val,
+		requiresProcessing: true,
+	}
+
+	serialized := hemi.L2KeystoneAbbreviate(val).Serialize()
+	key := hex.EncodeToString(serialized[:])
+
+	// keystone already exists, no-op
+	if _, ok := r.mapping[key]; ok {
+		return
+	}
+
+	if len(r.mapping) < r.size {
+		r.mapping[key] = &item
+		return
+	}
+
+	var smallestL2BlockNumber uint32
+	var smallestKey string
+
+	for k, v := range r.mapping {
+		if smallestL2BlockNumber == 0 || v.l2Keystone.L2BlockNumber < smallestL2BlockNumber {
+			smallestL2BlockNumber = v.l2Keystone.L2BlockNumber
+			smallestKey = k
+		}
+	}
+
+	// do not insert an L2Keystone that is older than all of the ones already
+	// queued
+	if item.l2Keystone.L2BlockNumber < smallestL2BlockNumber {
+		return
+	}
+
+	delete(r.mapping, smallestKey)
+
+	r.mapping[key] = &item
+}
+
+// ForEach is a thread-safe function that calls a callback function to "process"
+// each L2Keystone. The callback function is called with a copy of the
+// L2Keystone to process. if the callback returns an error, no-op, otherwise
+// mark the L2Keystone as processed
+func (r *L2KeystonePriorityBuffer) ForEach(cb func(ks hemi.L2Keystone) error) {
+	r.mtx.Lock()
+	copies := []L2KeystonePriorityBufferElement{}
+	for _, v := range r.mapping {
+		if v.requiresProcessing {
+			copies = append(copies, *v)
+		}
+	}
+	r.mtx.Unlock()
+
+	// mine the newest keystone first
+	slices.SortFunc(copies, func(a, b L2KeystonePriorityBufferElement) int {
+		return int(b.l2Keystone.L2BlockNumber) - int(a.l2Keystone.L2BlockNumber)
+	})
+
+	for _, e := range copies {
+		mined := hemi.L2KeystoneAbbreviate(e.l2Keystone).Serialize()
+		key := hex.EncodeToString(mined[:])
+
+		if err := cb(e.l2Keystone); err == nil {
+			r.mtx.Lock()
+			// check to see if still in map before marking
+			if _, ok := r.mapping[key]; ok {
+				r.mapping[key].requiresProcessing = false
+			}
+			r.mtx.Unlock()
+		}
+	}
+}
+
 type Config struct {
 	// BFGWSURL specifies the URL of the BFG private websocket endpoint
 	BFGWSURL string
@@ -94,13 +196,15 @@ type Miner struct {
 	btcAddress *btcutil.AddressPubKeyHash
 
 	lastKeystone *hemi.L2Keystone
-	keystoneCh   chan *hemi.L2Keystone
+	keystoneBuf  *L2KeystonePriorityBuffer
 
 	// Prometheus
 	isRunning bool
 
 	bfgWg    sync.WaitGroup
 	bfgCmdCh chan bfgCmd // commands to send to bfg
+
+	mineNowCh chan struct{}
 }
 
 func NewMiner(cfg *Config) (*Miner, error) {
@@ -110,10 +214,11 @@ func NewMiner(cfg *Config) (*Miner, error) {
 
 	m := &Miner{
 		cfg:            cfg,
-		keystoneCh:     make(chan *hemi.L2Keystone, 3),
+		keystoneBuf:    NewL2KeystonePriorityBuffer(10),
 		bfgCmdCh:       make(chan bfgCmd, 10),
 		holdoffTimeout: 5 * time.Second,
 		requestTimeout: 5 * time.Second,
+		mineNowCh:      make(chan struct{}),
 	}
 
 	switch strings.ToLower(cfg.BTCChainName) {
@@ -435,24 +540,35 @@ func (m *Miner) BitcoinUTXOs(ctx context.Context, scriptHash string) (*bfgapi.Bi
 	return ir, nil
 }
 
+func (m *Miner) mineKnownKeystones(ctx context.Context) {
+	m.keystoneBuf.ForEach(func(ks hemi.L2Keystone) error {
+		log.Infof("Received keystone for mining with height %v...", ks.L2BlockNumber)
+		if err := m.mineKeystone(ctx, &ks); err != nil {
+			log.Errorf("Failed to mine keystone: %v", err)
+			return err
+		}
+		return nil
+	})
+}
+
 func (m *Miner) mine(ctx context.Context) {
 	defer m.wg.Done()
 	for {
 		select {
 		case <-ctx.Done():
 			return
-		case ks := <-m.keystoneCh:
-			log.Tracef("Received new keystone header for mining with height %v...", ks.L2BlockNumber)
-			if err := m.mineKeystone(ctx, ks); err != nil {
-				log.Errorf("Failed to mine keystone: %v", err)
-			}
+		case <-m.mineNowCh:
+			go m.mineKnownKeystones(ctx)
+		case <-time.After(15 * time.Second):
+			go m.mineKnownKeystones(ctx)
 		}
 	}
 }
 
 func (m *Miner) queueKeystoneForMining(keystone *hemi.L2Keystone) {
+	m.keystoneBuf.Push(*keystone)
 	select {
-	case m.keystoneCh <- keystone:
+	case m.mineNowCh <- struct{}{}:
 	default:
 	}
 }
diff --git a/service/popm/popm_test.go b/service/popm/popm_test.go
index 3f3129185..c87782f73 100644
--- a/service/popm/popm_test.go
+++ b/service/popm/popm_test.go
@@ -12,6 +12,7 @@ import (
 	"fmt"
 	"net/http"
 	"net/http/httptest"
+	"slices"
 	"testing"
 	"time"
 
@@ -138,7 +139,9 @@ func TestProcessReceivedKeystones(t *testing.T) {
 		},
 	}
 
-	miner := Miner{}
+	miner := Miner{
+		keystoneBuf: NewL2KeystonePriorityBuffer(10),
+	}
 
 	miner.processReceivedKeystones(context.Background(), firstBatchOfL2Keystones)
 	diff := deep.Equal(*miner.lastKeystone, hemi.L2Keystone{
@@ -496,23 +499,335 @@ func TestProcessReceivedInAscOrder(t *testing.T) {
 
 	receivedKeystones := []hemi.L2Keystone{}
 
-	for {
-		select {
-		case l2Keystone := <-miner.keystoneCh:
-			receivedKeystones = append(receivedKeystones, *l2Keystone)
-			continue
-		default:
-			break
-		}
-		break
-	}
+	miner.keystoneBuf.ForEach(func(ks hemi.L2Keystone) error {
+		receivedKeystones = append(receivedKeystones, ks)
+		return nil
+	})
+	slices.Reverse(receivedKeystones)
 
 	diff := deep.Equal(firstBatchOfL2Keystones, receivedKeystones)
 	if len(diff) != 0 {
 		t.Fatalf("received unexpected diff: %s", diff)
 	}
 }
 
+// TestProcessReceivedOnlyOnce ensures that we only process keystones once if
+// no error
+func TestProcessReceivedOnlyOnce(t *testing.T) {
+	keystones := []hemi.L2Keystone{
+		{
+			L2BlockNumber: 3,
+			EPHash:        []byte{3},
+		},
+		{
+			L2BlockNumber: 2,
+			EPHash:        []byte{2},
+		},
+		{
+			L2BlockNumber: 1,
+			EPHash:        []byte{1},
+		},
+	}
+
+	miner, err := NewMiner(&Config{
+		BTCPrivateKey: "ebaaedce6af48a03bbfd25e8cd0364140ebaaedce6af48a03bbfd25e8cd03641",
+		BTCChainName:  "testnet3",
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	miner.processReceivedKeystones(context.Background(), keystones)
+
+	processedKeystonesFirstTime := 0
+	miner.keystoneBuf.ForEach(func(ks hemi.L2Keystone) error {
+		processedKeystonesFirstTime++
+		return nil
+	})
+	if processedKeystonesFirstTime != 3 {
+		t.Fatalf("should have processed 3 keystones, processed %d", processedKeystonesFirstTime)
+	}
+
+	processedKeystonesSecondTime := 0
+	miner.keystoneBuf.ForEach(func(ks hemi.L2Keystone) error {
+		processedKeystonesSecondTime++
+		return nil
+	})
+
+	if processedKeystonesSecondTime != 0 {
+		t.Fatal("should have only processed the keystones once")
+	}
+}
+
+// TestProcessReceivedOnlyOnceWithError ensures that we retry until no error
+func TestProcessReceivedOnlyOnceWithError(t *testing.T) {
+	keystones := []hemi.L2Keystone{
+		{
+			L2BlockNumber: 3,
+			EPHash:        []byte{3},
+		},
+		{
+			L2BlockNumber: 2,
+			EPHash:        []byte{2},
+		},
+		{
+			L2BlockNumber: 1,
+			EPHash:        []byte{1},
+		},
+	}
+
+	miner, err := NewMiner(&Config{
+		BTCPrivateKey: "ebaaedce6af48a03bbfd25e8cd0364140ebaaedce6af48a03bbfd25e8cd03641",
+		BTCChainName:  "testnet3",
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	miner.processReceivedKeystones(context.Background(), keystones)
+
+	processedKeystonesFirstTime := 0
+	miner.keystoneBuf.ForEach(func(ks hemi.L2Keystone) error {
+		processedKeystonesFirstTime++
+		return errors.New("something bad happened")
+	})
+	if processedKeystonesFirstTime != 3 {
+		t.Fatalf("should have processed 3 keystones, processed %d", processedKeystonesFirstTime)
+	}
+
+	processedKeystonesSecondTime := 0
+	miner.keystoneBuf.ForEach(func(ks hemi.L2Keystone) error {
+		processedKeystonesSecondTime++
+		return nil
+	})
+
+	if processedKeystonesSecondTime != 3 {
+		t.Fatalf("should have processed 3 keystones, processed %d", processedKeystonesSecondTime)
+	}
+
+	processedKeystonesThirdTime := 0
+	miner.keystoneBuf.ForEach(func(ks hemi.L2Keystone) error {
+		processedKeystonesThirdTime++
+		return nil
+	})
+
+	if processedKeystonesThirdTime != 0 {
+		t.Fatal("keystones should have already been processed")
+	}
+}
+
+// TestProcessReceivedNoDuplicates ensures that we don't queue a duplicate
+func TestProcessReceivedNoDuplicates(t *testing.T) {
+	keystones := []hemi.L2Keystone{
+		{
+			L2BlockNumber: 3,
+			EPHash:        []byte{3},
+		},
+		{
+			L2BlockNumber: 2,
+			EPHash:        []byte{2},
+		},
+		{
+			L2BlockNumber: 3,
+			EPHash:        []byte{3},
+		},
+	}
+
+	miner, err := NewMiner(&Config{
+		BTCPrivateKey: "ebaaedce6af48a03bbfd25e8cd0364140ebaaedce6af48a03bbfd25e8cd03641",
+		BTCChainName:  "testnet3",
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	receivedKeystones := []hemi.L2Keystone{}
+
+	miner.processReceivedKeystones(context.Background(), keystones)
+
+	miner.keystoneBuf.ForEach(func(ks hemi.L2Keystone) error {
+		receivedKeystones = append(receivedKeystones, ks)
+		return nil
+	})
+
+	slices.Reverse(keystones)
+
+	diff := deep.Equal([]hemi.L2Keystone{
+		{
+			L2BlockNumber: 3,
+			EPHash:        []byte{3},
+		},
+		{
+			L2BlockNumber: 2,
+			EPHash:        []byte{2},
+		},
+	}, receivedKeystones)
+	if len(diff) != 0 {
+		t.Fatalf("received unexpected diff: %s", diff)
+	}
+}
+
+// TestProcessReceivedInAscOrderOverride ensures that if we queue more than
+// 10 keystones for mining, that we override the oldest
+func TestProcessReceivedInAscOrderOverride(t *testing.T) {
+	keystones := []hemi.L2Keystone{
+		{
+			L2BlockNumber: 1,
+			EPHash:        []byte{1},
+		},
+		{
+			L2BlockNumber: 2,
+			EPHash:        []byte{2},
+		},
+		{
+			L2BlockNumber: 3,
+			EPHash:        []byte{3},
+		},
+		{
+			L2BlockNumber: 4,
+			EPHash:        []byte{4},
+		},
+		{
+			L2BlockNumber: 5,
+			EPHash:        []byte{5},
+		},
+		{
+			L2BlockNumber: 6,
+			EPHash:        []byte{6},
+		},
+		{
+			L2BlockNumber: 7,
+			EPHash:        []byte{7},
+		},
+		{
+			L2BlockNumber: 8,
+			EPHash:        []byte{8},
+		},
+		{
+			L2BlockNumber: 9,
+			EPHash:        []byte{9},
+		},
+		{
+			L2BlockNumber: 10,
+			EPHash:        []byte{10},
+		},
+		{
+			L2BlockNumber: 11,
+			EPHash:        []byte{11},
+		},
+	}
+
+	miner, err := NewMiner(&Config{
+		BTCPrivateKey: "ebaaedce6af48a03bbfd25e8cd0364140ebaaedce6af48a03bbfd25e8cd03641",
+		BTCChainName:  "testnet3",
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for _, keystone := range keystones {
+		miner.processReceivedKeystones(context.Background(), []hemi.L2Keystone{keystone})
+	}
+
+	receivedKeystones := []hemi.L2Keystone{}
+
+	miner.keystoneBuf.ForEach(func(ks hemi.L2Keystone) error {
+		receivedKeystones = append(receivedKeystones, ks)
+		return nil
+	})
+
+	slices.Reverse(keystones)
+
+	diff := deep.Equal(keystones[:10], receivedKeystones)
+	if len(diff) != 0 {
+		t.Fatalf("received unexpected diff: %s", diff)
+	}
+}
+
+// TestProcessReceivedInAscOrderNoInsertIfTooOld ensures that if the queue
+// is full, and we try to insert a keystone that is older than every other
+// keystone, we don't insert it
+func TestProcessReceivedInAscOrderNoInsertIfTooOld(t *testing.T) {
+	keystones := []hemi.L2Keystone{
+		{
+			L2BlockNumber: 1,
+			EPHash:        []byte{1},
+		},
+		{
+			L2BlockNumber: 2,
+			EPHash:        []byte{2},
+		},
+		{
+			L2BlockNumber: 3,
+			EPHash:        []byte{3},
+		},
+		{
+			L2BlockNumber: 4,
+			EPHash:        []byte{4},
+		},
+		{
+			L2BlockNumber: 5,
+			EPHash:        []byte{5},
+		},
+		{
+			L2BlockNumber: 6,
+			EPHash:        []byte{6},
+		},
+		{
+			L2BlockNumber: 7,
+			EPHash:        []byte{7},
+		},
+		{
+			L2BlockNumber: 8,
+			EPHash:        []byte{8},
+		},
+		{
+			L2BlockNumber: 9,
+			EPHash:        []byte{9},
+		},
+		{
+			L2BlockNumber: 10,
+			EPHash:        []byte{10},
+		},
+		{
+			L2BlockNumber: 11,
+			EPHash:        []byte{11},
+		},
+	}
+
+	miner, err := NewMiner(&Config{
+		BTCPrivateKey: "ebaaedce6af48a03bbfd25e8cd0364140ebaaedce6af48a03bbfd25e8cd03641",
+		BTCChainName:  "testnet3",
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for _, keystone := range keystones {
+		miner.processReceivedKeystones(context.Background(), []hemi.L2Keystone{keystone})
+	}
+
+	// this one should be dropped
+	miner.processReceivedKeystones(context.Background(), []hemi.L2Keystone{
+		{
+			L2BlockNumber: 1,
+			EPHash:        []byte{1},
+		},
+	})
+
+	receivedKeystones := []hemi.L2Keystone{}
+
+	miner.keystoneBuf.ForEach(func(ks hemi.L2Keystone) error {
+		receivedKeystones = append(receivedKeystones, ks)
+		return nil
+	})
+
+	slices.Reverse(keystones)
+
+	diff := deep.Equal(keystones[:10], receivedKeystones)
+	if len(diff) != 0 {
+		t.Fatalf("received unexpected diff: %s", diff)
+	}
+}
+
 func TestConnectToBFGAndPerformMineWithAuth(t *testing.T) {
 	privateKey, err := dcrsecp256k1.GeneratePrivateKey()
 	if err != nil {
@@ -523,7 +838,7 @@ func TestConnectToBFGAndPerformMineWithAuth(t *testing.T) {
 
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	defer cancel()
-	server, msgCh, cleanup := createMockBFG(ctx, t, []string{publicKey})
+	server, msgCh, cleanup := createMockBFG(ctx, t, []string{publicKey}, false, 1)
 	defer cleanup()
 
 	go func() {
@@ -587,7 +902,7 @@ func TestConnectToBFGAndPerformMine(t *testing.T) {
 
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	defer cancel()
-	server, msgCh, cleanup := createMockBFG(ctx, t, []string{})
+	server, msgCh, cleanup := createMockBFG(ctx, t, []string{}, false, 1)
 	defer cleanup()
 
 	go func() {
@@ -643,6 +958,71 @@ func TestConnectToBFGAndPerformMine(t *testing.T) {
 	}
 }
 
+func TestConnectToBFGAndPerformMineMultiple(t *testing.T) {
+	privateKey, err := dcrsecp256k1.GeneratePrivateKey()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	defer cancel()
+	server, msgCh, cleanup := createMockBFG(ctx, t, []string{}, false, 2)
+	defer cleanup()
+
+	go func() {
+		miner, err := NewMiner(&Config{
+			BFGWSURL:      server.URL + bfgapi.RouteWebsocketPublic,
+			BTCChainName:  "testnet3",
+			BTCPrivateKey: hex.EncodeToString(privateKey.Serialize()),
+		})
+		if err != nil {
+			panic(err)
+		}
+
+		err = miner.Run(ctx)
+		if err != nil && err != context.Canceled {
+			panic(err)
+		}
+	}()
+
+	// we can't guarantee order here, so test that we get all expected messages
+	// from popm within the timeout
+
+	messagesReceived := make(map[string]int)
+
+	messagesExpected := map[protocol.Command]int{
+		EventConnected:                    1,
+		bfgapi.CmdL2KeystonesRequest:      1,
+		bfgapi.CmdBitcoinInfoRequest:      2,
+		bfgapi.CmdBitcoinBalanceRequest:   2,
+		bfgapi.CmdBitcoinUTXOsRequest:     2,
+		bfgapi.CmdBitcoinBroadcastRequest: 2,
+	}
+
+	for {
+		select {
+		case msg := <-msgCh:
+			t.Logf("received message %v", msg)
+			messagesReceived[msg]++
+		case <-ctx.Done():
+			if ctx.Err() != nil {
+				t.Fatal(ctx.Err())
+			}
+		}
+		missing := false
+		for m := range messagesExpected {
+			message := fmt.Sprintf("%s", m)
+			if messagesReceived[message] != messagesExpected[m] {
+				t.Logf("still missing message %v, found %d want %d", m, messagesReceived[message], messagesExpected[m])
+				missing = true
+			}
+		}
+		if missing == false {
+			break
+		}
+	}
+}
+
 func TestConnectToBFGAndPerformMineWithAuthError(t *testing.T) {
 	privateKey, err := dcrsecp256k1.GeneratePrivateKey()
 	if err != nil {
@@ -651,7 +1031,7 @@ func TestConnectToBFGAndPerformMineWithAuthError(t *testing.T) {
 
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	defer cancel()
-	server, msgCh, cleanup := createMockBFG(ctx, t, []string{"incorrect"})
+	server, msgCh, cleanup := createMockBFG(ctx, t, []string{"incorrect"}, false, 1)
 	defer cleanup()
 
 	miner, err := NewMiner(&Config{
@@ -683,7 +1063,7 @@ func TestConnectToBFGAndPerformMineWithAuthError(t *testing.T) {
 	}
 }
 
-func createMockBFG(ctx context.Context, t *testing.T, publicKeys []string) (*httptest.Server, chan string, func()) {
+func createMockBFG(ctx context.Context, t *testing.T, publicKeys []string, keystoneMined bool, keystoneCount int) (*httptest.Server, chan string, func()) {
 	msgCh := make(chan string)
 
 	handler := func(w http.ResponseWriter, r *http.Request) {
@@ -770,13 +1150,13 @@ func createMockBFG(ctx context.Context, t *testing.T, publicKeys []string) (*htt
 		}()
 
 		if command == bfgapi.CmdL2KeystonesRequest {
-			if err := bfgapi.Write(ctx, conn, id, bfgapi.L2KeystonesResponse{
-				L2Keystones: []hemi.L2Keystone{
-					{
-						L2BlockNumber: 100,
-					},
-				},
-			}); err != nil {
+			response := bfgapi.L2KeystonesResponse{}
+			for i := 0; i < keystoneCount; i++ {
+				response.L2Keystones = append(response.L2Keystones, hemi.L2Keystone{
+					L2BlockNumber: uint32(100 + i),
+				})
+			}
+			if err := bfgapi.Write(ctx, conn, id, response); err != nil {
 				if !errors.Is(ctx.Err(), context.Canceled) {
 					panic(err)
 				}
@@ -834,7 +1214,6 @@ func createMockBFG(ctx context.Context, t *testing.T, publicKeys []string) (*htt
 			}
 		}
 	}
-
 }