Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

retry mine keystone on failure #18

Merged
merged 18 commits into from
Mar 14, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 19 additions & 22 deletions e2e/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestFullNetwork(t *testing.T) {
"-rpcuser=user",
"-rpcpassword=password",
"generatetoaddress",
"5000", // need to generate a lot for greater chance to not spend coinbase
"200", // need to generate a lot for greater chance to not spend coinbase
btcAddress.EncodeAddress(),
})
if err != nil {
Expand Down Expand Up @@ -213,29 +213,26 @@ func TestFullNetwork(t *testing.T) {
}
}()

go func() {
for {
l2Keystone.L2BlockNumber++
l2Keystone.L1BlockNumber++

l2KeystoneRequest := bssapi.L2KeystoneRequest{
L2Keystone: l2Keystone,
}
l2KeystoneRequest := bssapi.L2KeystoneRequest{
L2Keystone: l2Keystone,
}

err = bssapi.Write(ctx, bws.conn, "someid", l2KeystoneRequest)
if err != nil {
t.Logf("error: %s", err)
return
}
err = bssapi.Write(ctx, bws.conn, "someid", l2KeystoneRequest)
if err != nil {
t.Logf("error: %s", err)
return
}

// give time for the L2 Keystone to propogate to bitcoin tx mempool
select {
case <-time.After(10 * time.Second):
case <-ctx.Done():
t.Log(ctx.Err())
return
}
// give time for the L2 Keystone to propogate to bitcoin tx mempool
select {
case <-time.After(10 * time.Second):
case <-ctx.Done():
t.Logf(ctx.Err().Error())
return
}

go func() {
for {
// generate a new btc block, this should include the l2 keystone
err = runBitcoinCommand(ctx,
t,
Expand Down Expand Up @@ -385,7 +382,7 @@ func createBfg(ctx context.Context, t *testing.T, pgUri string, electrumxAddr st
req := testcontainers.ContainerRequest{
Env: map[string]string{
"BFG_POSTGRES_URI": pgUri,
"BFG_BTC_START_HEIGHT": "5000",
"BFG_BTC_START_HEIGHT": "1",
"BFG_EXBTC_ADDRESS": electrumxAddr,
"BFG_LOG_LEVEL": "TRACE",
"BFG_PUBLIC_ADDRESS": ":8383",
Expand Down
134 changes: 125 additions & 9 deletions service/popm/popm.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,15 @@ const (
logLevel = "INFO"

promSubsystem = "popm_service" // Prometheus

l2KeystonesMaxSize = 10
)

var log = loggo.GetLogger("popm")
var (
log = loggo.GetLogger("popm")

l2KeystoneRetryTimeout = 15 * time.Second
joshuasing marked this conversation as resolved.
Show resolved Hide resolved
)

func init() {
loggo.ConfigureLoggers(logLevel)
Expand Down Expand Up @@ -79,6 +85,11 @@ type bfgCmd struct {
ch chan any
}

type L2KeystoneProcessingContainer struct {
l2Keystone hemi.L2Keystone
requiresProcessing bool
}

type Miner struct {
mtx sync.RWMutex
wg sync.WaitGroup
Expand All @@ -94,13 +105,16 @@ type Miner struct {
btcAddress *btcutil.AddressPubKeyHash

lastKeystone *hemi.L2Keystone
keystoneCh chan *hemi.L2Keystone

// Prometheus
isRunning bool

bfgWg sync.WaitGroup
bfgCmdCh chan bfgCmd // commands to send to bfg

mineNowCh chan struct{}

l2Keystones map[string]L2KeystoneProcessingContainer
}

func NewMiner(cfg *Config) (*Miner, error) {
Expand All @@ -110,10 +124,11 @@ func NewMiner(cfg *Config) (*Miner, error) {

m := &Miner{
cfg: cfg,
keystoneCh: make(chan *hemi.L2Keystone, 3),
bfgCmdCh: make(chan bfgCmd, 10),
holdoffTimeout: 5 * time.Second,
requestTimeout: 5 * time.Second,
mineNowCh: make(chan struct{}, 1),
l2Keystones: make(map[string]L2KeystoneProcessingContainer, l2KeystonesMaxSize),
}

switch strings.ToLower(cfg.BTCChainName) {
Expand Down Expand Up @@ -467,24 +482,52 @@ func (m *Miner) BitcoinUTXOs(ctx context.Context, scriptHash string) (*bfgapi.Bi
return ir, nil
}

func (m *Miner) mineKnownKeystones(ctx context.Context) {
copies := m.l2KeystonesForProcessing()

for _, e := range copies {
serialized := hemi.L2KeystoneAbbreviate(e).Serialize()
key := hex.EncodeToString(serialized[:])

log.Infof("Received keystone for mining with height %v...", e.L2BlockNumber)

err := m.mineKeystone(ctx, &e)
if err != nil {
log.Errorf("Failed to mine keystone: %v", err)
}

m.mtx.Lock()

if v, ok := m.l2Keystones[key]; ok {
// if there is an error, mark keystone as "requires processing" so
// potentially gets retried, otherwise set this to false to
// nothing tries to process it
v.requiresProcessing = err != nil
m.l2Keystones[key] = v
}

m.mtx.Unlock()
}
}

func (m *Miner) mine(ctx context.Context) {
defer m.wg.Done()
for {
select {
case <-ctx.Done():
return
case ks := <-m.keystoneCh:
log.Tracef("Received new keystone header for mining with height %v...", ks.L2BlockNumber)
if err := m.mineKeystone(ctx, ks); err != nil {
log.Errorf("Failed to mine keystone: %v", err)
}
case <-m.mineNowCh:
go m.mineKnownKeystones(ctx)
case <-time.After(l2KeystoneRetryTimeout):
go m.mineKnownKeystones(ctx)
}
}
}

func (m *Miner) queueKeystoneForMining(keystone *hemi.L2Keystone) {
m.AddL2Keystone(*keystone)
select {
case m.keystoneCh <- keystone:
case m.mineNowCh <- struct{}{}:
default:
}
}
Expand Down Expand Up @@ -821,3 +864,76 @@ func (m *Miner) Run(pctx context.Context) error {

return err
}

func (m *Miner) AddL2Keystone(val hemi.L2Keystone) {
serialized := hemi.L2KeystoneAbbreviate(val).Serialize()
key := hex.EncodeToString(serialized[:])

toInsert := L2KeystoneProcessingContainer{
l2Keystone: val,
requiresProcessing: true,
}

m.mtx.Lock()
defer m.mtx.Unlock()

// keystone already exists, no-op
if _, ok := m.l2Keystones[key]; ok {
return
}

if len(m.l2Keystones) < l2KeystonesMaxSize {
m.l2Keystones[key] = toInsert
return
}

var smallestL2BlockNumber uint32
var smallestKey string

for k, v := range m.l2Keystones {
if smallestL2BlockNumber == 0 || v.l2Keystone.L2BlockNumber < smallestL2BlockNumber {
smallestL2BlockNumber = v.l2Keystone.L2BlockNumber
smallestKey = k
}
}

// do not insert an L2Keystone that is older than all of the ones already
// added
if val.L2BlockNumber < smallestL2BlockNumber {
return
}

delete(m.l2Keystones, smallestKey)

m.l2Keystones[key] = toInsert
}

// l2KeystonesForProcessing creates copies of the l2 keystones, set them to
// "processing", then returns the copies with the newest first
func (m *Miner) l2KeystonesForProcessing() []hemi.L2Keystone {
copies := make([]hemi.L2Keystone, 0)

m.mtx.Lock()

for i, v := range m.l2Keystones {

joshuasing marked this conversation as resolved.
Show resolved Hide resolved
// if we're currently processing, or we've already processed the keystone
// then don't process
if !v.requiresProcessing {
continue
}

// since we're about to process, mark this as false so others don't
// process the same
v.requiresProcessing = false
m.l2Keystones[i] = v
copies = append(copies, v.l2Keystone)
}
m.mtx.Unlock()

slices.SortFunc(copies, func(a, b hemi.L2Keystone) int {
return int(b.L2BlockNumber) - int(a.L2BlockNumber)
})

return copies
}
Loading