Skip to content

Commit

Permalink
Better error handling in backfill script (#2148)
Browse files Browse the repository at this point in the history
* Fix error handling for backfill

- defer the channel close so that the channel is closed no matter what
- always report parse and insert errors
- handle a cancelled context in the sql query
- clean up inappropriate log.Fatal calls leftover from an earlier
  refactor
- don't print success if insertion failed

Signed-off-by: Colleen Murphy <[email protected]>

* Abort backfill after limited errors

If there is a persistent problem with one of the network connections,
continuing to attempt the insertion is a waste of time, and if the list
of errors gets too long before the script finishes or is interrupted, it
becomes impossible to tell when it started failing and therefore where
to restart the script from. This change sets a limit on the maximum
number of failures to tolerate before exiting the script. Parsing errors
are related to the format of the data and don't have implications on the
overall success of the script on other data, so no change is implemented
for parsing errors.

Signed-off-by: Colleen Murphy <[email protected]>

---------

Signed-off-by: Colleen Murphy <[email protected]>
  • Loading branch information
cmurphy authored Jun 20, 2024
1 parent ef431ff commit 3323267
Showing 1 changed file with 42 additions and 22 deletions.
64 changes: 42 additions & 22 deletions cmd/backfill-index/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ const (
PRIMARY KEY(PK),
UNIQUE(EntryKey, EntryUUID)
)`
maxInsertErrors = 5
)

type provider int
Expand Down Expand Up @@ -239,7 +240,7 @@ func getIndexClient(backend provider) (indexClient, error) {
}

// populate does the heavy lifting of populating the index storage for whichever client is passed in.
func populate(indexClient indexClient, rekorClient *rekorclient.Rekor) error {
func populate(indexClient indexClient, rekorClient *rekorclient.Rekor) (err error) {
ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
group, ctx := errgroup.WithContext(ctx)
group.SetLimit(*concurrency)
Expand All @@ -252,6 +253,7 @@ func populate(indexClient indexClient, rekorClient *rekorclient.Rekor) error {
var resultChan = make(chan result)
parseErrs := make([]int, 0)
insertErrs := make([]int, 0)
runningInsertErrs := 0

go func() {
for r := range resultChan {
Expand All @@ -264,6 +266,16 @@ func populate(indexClient indexClient, rekorClient *rekorclient.Rekor) error {
}
}()

defer func() {
close(resultChan)
if len(parseErrs) > 0 {
err = fmt.Errorf("failed to parse %d entries: %v", len(parseErrs), parseErrs)
}
if len(insertErrs) > 0 {
err = fmt.Errorf("failed to insert/remove %d entries: %v", len(insertErrs), insertErrs)
}
}()

for i := *startIndex; i <= *endIndex; i++ {
index := i // capture loop variable for closure
group.Go(func() error {
Expand All @@ -275,10 +287,23 @@ func populate(indexClient indexClient, rekorClient *rekorclient.Rekor) error {
if errors.Is(err, context.Canceled) {
return nil
}
log.Fatalf("retrieving log uuid by index: %v", err)
return fmt.Errorf("retrieving log uuid by index: %v", err)
}
var parseErrs []error
var insertErrs []error
defer func() {
if len(insertErrs) != 0 || len(parseErrs) != 0 {
fmt.Printf("Errors with log index %d:\n", index)
for _, e := range insertErrs {
fmt.Println(e)
}
for _, e := range parseErrs {
fmt.Println(e)
}
} else {
fmt.Printf("Completed log index %d\n", index)
}
}()
for uuid, entry := range resp.Payload {
// uuid is the global UUID - tree ID and entry UUID
e, _, _, err := unmarshalEntryImpl(entry.Body.(string))
Expand All @@ -293,22 +318,24 @@ func populate(indexClient indexClient, rekorClient *rekorclient.Rekor) error {
}
for _, key := range keys {
if err := indexClient.idempotentAddToIndex(ctx, key, uuid); err != nil {
if errors.Is(err, context.Canceled) {
return nil
}
insertErrs = append(insertErrs, fmt.Errorf("error inserting UUID %s with key %s: %w", uuid, key, err))
runningInsertErrs++
if runningInsertErrs > maxInsertErrors {
resultChan <- result{
index: index,
parseErrs: parseErrs,
insertErrs: insertErrs,
}
return fmt.Errorf("too many insertion errors")
}
continue
}
fmt.Printf("Uploaded entry %s, index %d, key %s\n", uuid, index, key)
}
}
if len(insertErrs) != 0 || len(parseErrs) != 0 {
fmt.Printf("Errors with log index %d:\n", index)
for _, e := range insertErrs {
fmt.Println(e)
}
for _, e := range parseErrs {
fmt.Println(e)
}
} else {
fmt.Printf("Completed log index %d\n", index)
}
resultChan <- result{
index: index,
parseErrs: parseErrs,
Expand All @@ -318,18 +345,11 @@ func populate(indexClient indexClient, rekorClient *rekorclient.Rekor) error {
return nil
})
}
err := group.Wait()
err = group.Wait()
if err != nil {
log.Fatalf("error running backfill: %v", err)
return fmt.Errorf("error running backfill: %v", err)
}
close(resultChan)
fmt.Println("Backfill complete")
if len(parseErrs) > 0 {
return fmt.Errorf("failed to parse %d entries: %v", len(parseErrs), parseErrs)
}
if len(insertErrs) > 0 {
return fmt.Errorf("failed to insert/remove %d entries: %v", len(insertErrs), insertErrs)
}
return nil
}

Expand Down

0 comments on commit 3323267

Please sign in to comment.