Skip to content

Commit

Permalink
retry keystone on failure
Browse files Browse the repository at this point in the history
* keep queue of 10 newest keystones known
* upon receiving a keystone, only queue if it is greater than the oldest keystone
* if queue is full, drop oldest keystone upon inserting another
* when a keystone is mined, mark it as "processed" to not repeat mining same keystone
  • Loading branch information
ClaytonNorthey92 committed Mar 1, 2024
1 parent 7875a89 commit 68f726b
Show file tree
Hide file tree
Showing 3 changed files with 527 additions and 32 deletions.
2 changes: 1 addition & 1 deletion e2e/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestFullNetwork(t *testing.T) {
"-rpcuser=user",
"-rpcpassword=password",
"generatetoaddress",
"5000", // need to generate a lot for greater chance to not spend coinbase
"200", // need to generate a lot for greater chance to not spend coinbase
btcAddress.EncodeAddress(),
})
if err != nil {
Expand Down
132 changes: 124 additions & 8 deletions service/popm/popm.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,108 @@ func init() {
loggo.ConfigureLoggers(logLevel)
}

// L2KeystonePriorityBuffer holds up to "size" L2Keystones. it allows a caller
// to push to it and it will keep the most recent "size" keystones, overriding
// the oldest if full. we define "oldest" as "the smallest l2 block number"
type L2KeystonePriorityBuffer struct {
mtx sync.Mutex
mapping map[string]*L2KeystonePriorityBufferElement
size int
}

// L2KeystonePriorityBufferElement holds an L2Keystone and whether it has been
// "processed" or not
type L2KeystonePriorityBufferElement struct {
l2Keystone hemi.L2Keystone
requiresProcessing bool
}

// NewL2KeystonePriorityBuffer creates a L2KeystonePriorityBuffer with size n
func NewL2KeystonePriorityBuffer(n int) *L2KeystonePriorityBuffer {
return &L2KeystonePriorityBuffer{
mapping: make(map[string]*L2KeystonePriorityBufferElement),
size: n,
}
}

// Push inserts an L2Keystone, dropping the oldest if full
func (r *L2KeystonePriorityBuffer) Push(val hemi.L2Keystone) {
r.mtx.Lock()
defer r.mtx.Unlock()

item := L2KeystonePriorityBufferElement{
l2Keystone: val,
requiresProcessing: true,
}

serialized := hemi.L2KeystoneAbbreviate(val).Serialize()
key := hex.EncodeToString(serialized[:])

// keystone already exists, no-op
if _, ok := r.mapping[key]; ok {
return
}

if len(r.mapping) < r.size {
r.mapping[key] = &item
return
}

var smallestL2BlockNumber uint32
var smallestKey string

for k, v := range r.mapping {
if smallestL2BlockNumber == 0 || v.l2Keystone.L2BlockNumber < smallestL2BlockNumber {
smallestL2BlockNumber = v.l2Keystone.L2BlockNumber
smallestKey = k
}
}

// do not insert an L2Keystone that is older than all of the ones already
// queued
if item.l2Keystone.L2BlockNumber < smallestL2BlockNumber {
return
}

delete(r.mapping, smallestKey)

r.mapping[key] = &item
}

// ForEach is a thread-safe function that calls a callback function to "process"
// each L2Keystone. The callback function is called with a copy of the
// L2Keystone to process. if the callback returns an error, no-op, otherwise
// mark the L2Keystone as processed
func (r *L2KeystonePriorityBuffer) ForEach(cb func(ks hemi.L2Keystone) error) {
r.mtx.Lock()
copies := []L2KeystonePriorityBufferElement{}
for _, v := range r.mapping {
if v.requiresProcessing {
copies = append(copies, *v)
}
}
r.mtx.Unlock()

// mine the newest keystone first
slices.SortFunc(copies, func(a, b L2KeystonePriorityBufferElement) int {
return int(b.l2Keystone.L2BlockNumber) - int(a.l2Keystone.L2BlockNumber)
})

for _, e := range copies {
mined := hemi.L2KeystoneAbbreviate(e.l2Keystone).Serialize()
key := hex.EncodeToString(mined[:])

if err := cb(e.l2Keystone); err == nil {
r.mtx.Lock()
// check to see if still in map before marking
if _, ok := r.mapping[key]; ok {
r.mapping[key].requiresProcessing = false
}
r.mtx.Unlock()
}
}
}

type Config struct {
// BFGWSURL specifies the URL of the BFG private websocket endpoint
BFGWSURL string
Expand Down Expand Up @@ -94,13 +196,15 @@ type Miner struct {
btcAddress *btcutil.AddressPubKeyHash

lastKeystone *hemi.L2Keystone
keystoneCh chan *hemi.L2Keystone
keystoneBuf *L2KeystonePriorityBuffer

// Prometheus
isRunning bool

bfgWg sync.WaitGroup
bfgCmdCh chan bfgCmd // commands to send to bfg

mineNowCh chan struct{}
}

func NewMiner(cfg *Config) (*Miner, error) {
Expand All @@ -110,10 +214,11 @@ func NewMiner(cfg *Config) (*Miner, error) {

m := &Miner{
cfg: cfg,
keystoneCh: make(chan *hemi.L2Keystone, 3),
keystoneBuf: NewL2KeystonePriorityBuffer(10),
bfgCmdCh: make(chan bfgCmd, 10),
holdoffTimeout: 5 * time.Second,
requestTimeout: 5 * time.Second,
mineNowCh: make(chan struct{}),
}

switch strings.ToLower(cfg.BTCChainName) {
Expand Down Expand Up @@ -435,24 +540,35 @@ func (m *Miner) BitcoinUTXOs(ctx context.Context, scriptHash string) (*bfgapi.Bi
return ir, nil
}

func (m *Miner) mineKnownKeystones(ctx context.Context) {
m.keystoneBuf.ForEach(func(ks hemi.L2Keystone) error {
log.Infof("Received keystone for mining with height %v...", ks.L2BlockNumber)
if err := m.mineKeystone(ctx, &ks); err != nil {
log.Errorf("Failed to mine keystone: %v", err)
return err
}
return nil
})
}

func (m *Miner) mine(ctx context.Context) {
defer m.wg.Done()
for {
select {
case <-ctx.Done():
return
case ks := <-m.keystoneCh:
log.Tracef("Received new keystone header for mining with height %v...", ks.L2BlockNumber)
if err := m.mineKeystone(ctx, ks); err != nil {
log.Errorf("Failed to mine keystone: %v", err)
}
case <-m.mineNowCh:
go m.mineKnownKeystones(ctx)
case <-time.After(15 * time.Second):
go m.mineKnownKeystones(ctx)
}
}
}

func (m *Miner) queueKeystoneForMining(keystone *hemi.L2Keystone) {
m.keystoneBuf.Push(*keystone)
select {
case m.keystoneCh <- keystone:
case m.mineNowCh <- struct{}{}:
default:
}
}
Expand Down
Loading

0 comments on commit 68f726b

Please sign in to comment.