Skip to content

Commit

Permalink
Merge pull request lightninglabs#693 from lightninglabs/caretaker_sto…
Browse files Browse the repository at this point in the history
…p_fixes

tapgarden: fix races and deadlocks in caretaker
  • Loading branch information
jharveyb authored Dec 8, 2023
2 parents 6b39ff8 + 05b82da commit 2335031
Show file tree
Hide file tree
Showing 7 changed files with 476 additions and 114 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ COMMIT := $(shell git describe --tags --dirty)

GOBUILD := GOEXPERIMENT=loopvar GO111MODULE=on go build -v
GOINSTALL := GOEXPERIMENT=loopvar GO111MODULE=on go install -v
GOTEST := GOEXPERIMENT=loopvar GO111MODULE=on go test
GOTEST := GOEXPERIMENT=loopvar GO111MODULE=on go test
GOMOD := GO111MODULE=on go mod

GOLIST := go list -deps $(PKG)/... | grep '$(PKG)'
Expand Down Expand Up @@ -168,7 +168,7 @@ unit-cover: $(GOACC_BIN)

unit-race:
@$(call print, "Running unit race tests.")
env CGO_ENABLED=1 GORACE="history_size=7 halt_on_errors=1" $(GOLIST) | $(XARGS) env $(GOTEST) -race -test.timeout=20m
env CGO_ENABLED=1 GORACE="history_size=7 halt_on_errors=1" $(UNIT_RACE)

itest: build-itest itest-only

Expand Down
17 changes: 17 additions & 0 deletions fn/func.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,20 @@ func First[T any](xs []*T, pred func(*T) bool) (*T, error) {

return nil, fmt.Errorf("no item found")
}

// Last returns the last item in the slice that matches the predicate, or an
// error if none matches.
func Last[T any](xs []*T, pred func(*T) bool) (*T, error) {
var matches []*T
for i := range xs {
if pred(xs[i]) {
matches = append(matches, xs[i])
}
}

if len(matches) == 0 {
return nil, fmt.Errorf("no item found")
}

return matches[len(matches)-1], nil
}
4 changes: 3 additions & 1 deletion make/testing_flags.mk
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ LOG_TAGS =
TEST_FLAGS =
ITEST_FLAGS = -logoutput
COVER_PKG = $$(go list -deps -tags="$(DEV_TAGS)" ./... | grep '$(PKG)' | grep -v lnrpc)
RACE_PKG = go list -deps -tags="$(DEV_TAGS)" ./... | grep '$(PKG)'
COVER_HTML = go tool cover -html=coverage.txt -o coverage.html
POSTGRES_START_DELAY = 5

Expand All @@ -19,6 +20,7 @@ ifneq ($(pkg),)
UNITPKG := $(PKG)/$(pkg)
UNIT_TARGETED = yes
COVER_PKG = $(PKG)/$(pkg)
RACE_PKG = $(PKG)/$(pkg)
endif

# If a specific unit test case is being target, construct test.run filter.
Expand Down Expand Up @@ -78,7 +80,7 @@ TEST_FLAGS += -test.timeout=$(timeout)
else ifneq ($(optional),)
TEST_FLAGS += -test.timeout=240m
else
TEST_FLAGS += -test.timeout=60m
TEST_FLAGS += -test.timeout=20m
endif

GOLIST := go list -tags="$(DEV_TAGS)" -deps $(PKG)/... | grep '$(PKG)'| grep -v '/vendor/'
Expand Down
184 changes: 122 additions & 62 deletions tapgarden/caretaker.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,62 +166,82 @@ func (b *BatchCaretaker) Start() error {
func (b *BatchCaretaker) Stop() error {
var stopErr error
b.stopOnce.Do(func() {
log.Infof("BatchCaretaker(%x): Stopping", b.batchKey[:])

close(b.Quit)
b.Wg.Wait()
})

return stopErr
}

// Cancel signals for a batch caretaker to stop advancing a batch if possible.
// A batch can only be cancelled if it has not reached BatchStateBroadcast yet.
func (b *BatchCaretaker) Cancel() CancelResp {
// Cancel signals for a batch caretaker to stop advancing a batch. A batch can
// only be cancelled if it has not reached BatchStateBroadcast yet. If
// cancellation succeeds, we forward the batch state after cancellation. If the
// batch could not be cancelled, the planter will handle caretaker shutdown and
// batch state.
func (b *BatchCaretaker) Cancel() error {
ctx, cancel := b.WithCtxQuit()
defer cancel()

batchKey := b.cfg.Batch.BatchKey.PubKey.SerializeCompressed()
batchKey := b.batchKey[:]
batchState := b.cfg.Batch.State()
var cancelResp CancelResp

// This function can only be called before the caretaker state stepping
// function, so the batch state read is the next state that has not yet
// been executed. Seedlings are converted to asset sprouts in the Frozen
// state, and broadcast in the Broadast state.
log.Debugf("BatchCaretaker(%x): Trying to cancel", batchKey)
switch batchState {
// In the pending state, the batch seedlings have not sprouted yet.
case BatchStatePending, BatchStateFrozen:
finalBatchState := BatchStateSeedlingCancelled
err := b.cfg.Log.UpdateBatchState(
ctx, b.cfg.Batch.BatchKey.PubKey,
finalBatchState,
BatchStateSeedlingCancelled,
)
if err != nil {
err = fmt.Errorf("BatchCaretaker(%x), batch state(%v), "+
"cancel failed: %w", batchKey, batchState, err)
}

b.cfg.BroadcastErrChan <- fmt.Errorf("caretaker canceled")

return CancelResp{&finalBatchState, err}
cancelResp = CancelResp{true, err}

case BatchStateCommitted:
finalBatchState := BatchStateSproutCancelled
err := b.cfg.Log.UpdateBatchState(
ctx, b.cfg.Batch.BatchKey.PubKey,
finalBatchState,
BatchStateSproutCancelled,
)
if err != nil {
err = fmt.Errorf("BatchCaretaker(%x), batch state(%v), "+
"cancel failed: %w", batchKey, batchState, err)
}

b.cfg.BroadcastErrChan <- fmt.Errorf("caretaker canceled")

return CancelResp{&finalBatchState, err}
cancelResp = CancelResp{true, err}

default:
err := fmt.Errorf("BatchCaretaker(%x), batch not cancellable",
b.cfg.Batch.BatchKey.PubKey.SerializeCompressed())
return CancelResp{nil, err}
cancelResp = CancelResp{false, err}
}

b.cfg.CancelRespChan <- cancelResp

// If the batch was cancellable, the final write of the cancelled batch
// may still have failed. That error will be handled by the planter. At
// this point, the caretaker should shut down gracefully if cancellation
// was attempted.
if cancelResp.cancelAttempted {
log.Infof("BatchCaretaker(%x), attempted batch cancellation, "+
"shutting down", b.batchKey[:])

return nil
}

// If the cancellation failed, that error will be handled by the
// planter.
return fmt.Errorf("BatchCaretaker(%x) cancellation failed",
b.batchKey[:])
}

// advanceStateUntil attempts to advance the internal state machine until the
Expand All @@ -241,22 +261,20 @@ func (b *BatchCaretaker) advanceStateUntil(currentState,
return 0, fmt.Errorf("BatchCaretaker(%x), shutting "+
"down", b.batchKey[:])

// If the batch was cancellable, the finalState of the cancel
// response will be non-nil. If the cancellation failed, that
// error will be handled by the planter. At this point, the
// caretaker should always shut down gracefully.
case <-b.cfg.CancelReqChan:
cancelResp := b.Cancel()
b.cfg.CancelRespChan <- cancelResp

// TODO(jhb): Use concrete error types for caretaker
// shutdown cases
// If the batch was cancellable, the finalState of the
// cancel response will be non-nil. If the cancellation
// failed, that error will be handled by the planter.
// At this point, the caretaker should always shut down
// gracefully.
if cancelResp.finalState != nil {
cancelErr := b.Cancel()
if cancelErr == nil {
return 0, fmt.Errorf("BatchCaretaker(%x), "+
"attempted batch cancellation, "+
"shutting down", b.batchKey[:])
}

log.Info(cancelErr)

default:
}

Expand Down Expand Up @@ -313,7 +331,7 @@ func (b *BatchCaretaker) assetCultivator() {
currentBatchState, BatchStateBroadcast,
)
if err != nil {
log.Errorf("unable to advance state machine: %v", err)
log.Errorf("Unable to advance state machine: %v", err)
b.cfg.BroadcastErrChan <- err
return
}
Expand Down Expand Up @@ -360,7 +378,12 @@ func (b *BatchCaretaker) assetCultivator() {
return

case <-b.cfg.CancelReqChan:
b.cfg.CancelRespChan <- b.Cancel()
cancelErr := b.Cancel()
if cancelErr == nil {
return
}

log.Error(cancelErr)

case <-b.Quit:
return
Expand Down Expand Up @@ -740,7 +763,7 @@ func (b *BatchCaretaker) stateStep(currentState BatchState) (BatchState, error)
b.cfg.Batch.GenesisPacket.ChainFees = chainFees

log.Infof("BatchCaretaker(%x): GenesisPacket absolute fee: "+
"%d sats", chainFees)
"%d sats", b.batchKey[:], chainFees)
log.Infof("BatchCaretaker(%x): GenesisPacket finalized",
b.batchKey[:])
log.Tracef("GenesisPacket: %v", spew.Sdump(signedPkt))
Expand Down Expand Up @@ -848,48 +871,85 @@ func (b *BatchCaretaker) stateStep(currentState BatchState) (BatchState, error)
defer confCancel()
defer b.Wg.Done()

var confEvent *chainntnfs.TxConfirmation
select {
case confEvent = <-confNtfn.Confirmed:
log.Debugf("Got chain confirmation: %v",
confEvent.Tx.TxHash())

case err := <-errChan:
b.cfg.ErrChan <- fmt.Errorf("error getting "+
"confirmation: %w", err)
return

case <-confCtx.Done():
log.Debugf("Skipping TX confirmation, context " +
"done")

case <-b.cfg.CancelReqChan:
b.cfg.CancelRespChan <- b.Cancel()
var (
confEvent *chainntnfs.TxConfirmation
confRecv bool
)

case <-b.Quit:
log.Debugf("Skipping TX confirmation, exiting")
return
for !confRecv {
select {
case confEvent = <-confNtfn.Confirmed:
confRecv = true

case err := <-errChan:
confErr := fmt.Errorf("error getting "+
"confirmation: %w", err)
log.Info(confErr)
b.cfg.ErrChan <- confErr

return

case <-confCtx.Done():
log.Debugf("Skipping TX confirmation, " +
"context done")
confRecv = true

case <-b.cfg.CancelReqChan:
cancelErr := b.Cancel()
if cancelErr == nil {
return
}

// Cancellation failed, continue to wait
// for transaction confirmation.
log.Info(cancelErr)

case <-b.Quit:
log.Debugf("Skipping TX confirmation, " +
"exiting")
return
}
}

if confEvent == nil {
b.cfg.ErrChan <- fmt.Errorf("got empty " +
confErr := fmt.Errorf("got empty " +
"confirmation event in batch")
log.Info(confErr)
b.cfg.ErrChan <- confErr

return
}

select {
case b.confEvent <- confEvent:

case <-confCtx.Done():
log.Debugf("Skipping TX confirmation, context " +
"done")

case <-b.cfg.CancelReqChan:
b.cfg.CancelRespChan <- b.Cancel()
if confEvent.Tx != nil {
log.Debugf("Got chain confirmation: %v",
confEvent.Tx.TxHash())
}

case <-b.Quit:
log.Debugf("Skipping TX confirmation, exiting")
return
for {
select {
case b.confEvent <- confEvent:
return

case <-confCtx.Done():
log.Debugf("Skipping TX confirmation, " +
"context done")
return

case <-b.cfg.CancelReqChan:
cancelErr := b.Cancel()
if cancelErr == nil {
return
}

// Cancellation failed, continue to try
// and send the confirmation event.
log.Info(cancelErr)

case <-b.Quit:
log.Debugf("Skipping TX confirmation, " +
"exiting")
return
}
}
}()

Expand Down
Loading

0 comments on commit 2335031

Please sign in to comment.