Skip to content

Commit

Permalink
errGroup fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
AlCutter committed Jul 12, 2024
1 parent c09c896 commit 4856d58
Showing 1 changed file with 19 additions and 22 deletions.
41 changes: 19 additions & 22 deletions storage/gcp/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"cloud.google.com/go/spanner"
"cloud.google.com/go/spanner/apiv1/spannerpb"
gcs "cloud.google.com/go/storage"
"github.com/transparency-dev/merkle/compact"
tessera "github.com/transparency-dev/trillian-tessera"
"github.com/transparency-dev/trillian-tessera/api"
"github.com/transparency-dev/trillian-tessera/api/layout"
Expand Down Expand Up @@ -429,16 +430,16 @@ func (s *Storage) integrate(ctx context.Context, fromSeq uint64, entries [][]byt
})

errG.Go(func() error {

newSize, newRoot, tiles, err := tb.Integrate(ctx, fromSeq, entries)
if err != nil {
return fmt.Errorf("Integrate: %v", err)
}
for k, v := range tiles {
k, v := k, v
errG.Go(func() error {
return s.setTile(ctx, uint64(k.Level), k.Index, newSize, v)
})
func(ctx context.Context, k compact.NodeID, v *api.HashTile) {
errG.Go(func() error {
return s.setTile(ctx, uint64(k.Level), k.Index, newSize, v)
})
}(ctx, k, v)
}
errG.Go(func() error {
//TODO: write out checkpoint
Expand Down Expand Up @@ -466,6 +467,17 @@ func (s *Storage) updateEntryBundles(ctx context.Context, fromSeq uint64, entrie
}

seqErr := errgroup.Group{}
goSetEntryBundle := func(ctx context.Context, bundleIndex uint64, fromSeq uint64, bundle api.EntryBundle) {
seqErr.Go(func() error {
if err := s.setEntryBundle(ctx, bundleIndex, fromSeq, &bundle); err != nil {
if !errors.Is(os.ErrExist, err) {
return err
}
}
return nil
})
}

// Add new entries to the bundle
for _, e := range entries {
bundle.Entries = append(bundle.Entries, e)
Expand All @@ -475,15 +487,7 @@ func (s *Storage) updateEntryBundles(ctx context.Context, fromSeq uint64, entrie
if entriesInBundle == entryBundleSize {
// This bundle is full, so we need to write it out...
klog.V(1).Infof("Bundle idx %x is full", bundleIndex)
bundle := bundle
seqErr.Go(func() error {
if err := s.setEntryBundle(ctx, bundleIndex, fromSeq, bundle); err != nil {
if !errors.Is(os.ErrExist, err) {
return err
}
}
return nil
})
goSetEntryBundle(ctx, bundleIndex, fromSeq, *bundle)
// ... and prepare the next entry bundle for any remaining entries in the batch
bundleIndex++
entriesInBundle = 0
Expand All @@ -495,14 +499,7 @@ func (s *Storage) updateEntryBundles(ctx context.Context, fromSeq uint64, entrie
// this needs writing out too.
if entriesInBundle > 0 {
klog.V(1).Infof("Writing partial bundle idx %d.%d", bundleIndex, entriesInBundle)
seqErr.Go(func() error {
if err := s.setEntryBundle(ctx, bundleIndex, fromSeq, bundle); err != nil {
if !errors.Is(os.ErrExist, err) {
return err
}
}
return nil
})
goSetEntryBundle(ctx, bundleIndex, fromSeq, *bundle)
}
return seqErr.Wait()
}
Expand Down

0 comments on commit 4856d58

Please sign in to comment.