Skip to content

Commit

Permalink
Consume previously sequenced entries
Browse files Browse the repository at this point in the history
  • Loading branch information
AlCutter committed Jul 12, 2024
1 parent 417b4cb commit 50a8a8f
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 5 deletions.
130 changes: 126 additions & 4 deletions storage/gcp/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,17 @@ type objStore interface {

// coord describes a type which knows how to sequence entries.
type sequencer interface {
// assignEntries should durably allocate contiguous index numbers to the provided entries,
// and return the lowest assigned index.
assignEntries(ctx context.Context, entries [][]byte) (uint64, error)
// consumeEntries should call the provided function with up to limit previously sequenced entries.
// If the call to consumeFunc returns no error, the entries should be considered to have been consumed.
consumeEntries(ctx context.Context, limit uint64, f consumeFunc) (bool, error)
}

// consumeFunc is the signature of a function which can consume entries from the sequencer.
type consumeFunc func(ctx context.Context, from uint64, entries [][]byte) error

// Config holds GCP project and resource configuration for a storage instance.
type Config struct {
// ProjectID is the GCP project which hosts the storage bucket and Spanner database for the log.
Expand Down Expand Up @@ -110,6 +118,29 @@ func New(ctx context.Context, cfg Config) (*Storage, error) {
// TODO(al): make queue options configurable:
r.queue = storage.NewQueue(time.Second, 256, r.sequencer.assignEntries)

go func() {
t := time.NewTicker(1 * time.Second)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
}
for {
cctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if more, err := r.sequencer.consumeEntries(cctx, 2048 /*limit*/, r.integrate); err != nil {
klog.Errorf("integrate: %v", err)
break
} else if !more {
break
}
klog.V(1).Info("Quickloop")
}
}
}()

return r, nil
}

Expand Down Expand Up @@ -172,6 +203,11 @@ func (s *Storage) setEntryBundle(ctx context.Context, bundleIndex uint64, logSiz
return s.objStore.setObject(ctx, objName, data, &gcs.Conditions{DoesNotExist: true})
}

// integrate incorporates the provided entries into the log starting at fromSeq.
func (s *Storage) integrate(ctx context.Context, fromSeq uint64, entries [][]byte) error {
return nil
}

// spannerSequencer uses Cloud Spanner to provide
// a durable and thread/multi-process safe sequencer.
type spannerSequencer struct {
Expand Down Expand Up @@ -230,11 +266,11 @@ func (s *spannerSequencer) initDB(ctx context.Context) error {
// sequencing and integration to occur.
// Note that this will only succeed if no row exists, so there's no danger
// of "resetting" an existing log.
if _, err := s.dbPool.Apply(ctx, []*spanner.Mutation{spanner.Insert("SeqCoord", []string{"id", "next"}, []interface{}{0, 0})}); spanner.ErrCode(err) != codes.AlreadyExists {
return err
if _, err := s.dbPool.Apply(ctx, []*spanner.Mutation{spanner.Insert("SeqCoord", []string{"id", "next"}, []interface{}{0, 0})}); err != nil && spanner.ErrCode(err) != codes.AlreadyExists {

Check failure on line 269 in storage/gcp/gcp.go

View workflow job for this annotation

GitHub Actions / lint

SA9003: empty branch (staticcheck)
//return err
}
if _, err := s.dbPool.Apply(ctx, []*spanner.Mutation{spanner.Insert("IntCoord", []string{"id", "seq"}, []interface{}{0, 0})}); spanner.ErrCode(err) != codes.AlreadyExists {
return err
if _, err := s.dbPool.Apply(ctx, []*spanner.Mutation{spanner.Insert("IntCoord", []string{"id", "seq"}, []interface{}{0, 0})}); err != nil && spanner.ErrCode(err) != codes.AlreadyExists {

Check failure on line 272 in storage/gcp/gcp.go

View workflow job for this annotation

GitHub Actions / lint

SA9003: empty branch (staticcheck)
//return err
}
return nil
}
Expand Down Expand Up @@ -289,6 +325,92 @@ func (s *spannerSequencer) assignEntries(ctx context.Context, entries [][]byte)
return uint64(next), nil
}

// consumeEntries calls f with previously sequenced entries.
//
// Once f returns without error, the entries it was called with are considered to have been consumed and are
// removed from the Seq table.
//
// Returns true if some entries were consumed as a weak signal that there may be further entries waiting to be consumed.
func (s *spannerSequencer) consumeEntries(ctx context.Context, limit uint64, f consumeFunc) (bool, error) {
didWork := false
_, err := s.dbPool.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
// Figure out which is the starting index of sequenced entries to start consuming from.
row, err := txn.ReadRowWithOptions(ctx, "IntCoord", spanner.Key{0}, []string{"seq"}, &spanner.ReadOptions{LockHint: spannerpb.ReadRequest_LOCK_HINT_EXCLUSIVE})
if err != nil {
return err
}
var fromSeq int64 // Spanner doesn't support uint64
if err := row.Column(0, &fromSeq); err != nil {
return fmt.Errorf("failed to read integration coordination info: %v", err)
}
klog.V(1).Infof("Consuming from %d", fromSeq)

// Now read the sequenced starting at the index we got above.
rows := txn.ReadWithOptions(ctx, "Seq",
spanner.KeyRange{Start: spanner.Key{0, fromSeq}, End: spanner.Key{0, fromSeq + int64(limit)}},
[]string{"seq", "v"},
&spanner.ReadOptions{LockHint: spannerpb.ReadRequest_LOCK_HINT_EXCLUSIVE})
defer rows.Stop()

seqsConsumed := []int64{}
entries := make([][]byte, 0, limit)
orderCheck := fromSeq
for {
row, err := rows.Next()
if row == nil || err == iterator.Done {
break
}

var vGob []byte
var seq int64 // spanner doesn't have uint64
if err := row.Columns(&seq, &vGob); err != nil {
return fmt.Errorf("failed to scan seq row: %v", err)
}

if orderCheck != seq {
return fmt.Errorf("integrity fail - expected seq %d, but found %d", orderCheck, seq)
}

g := gob.NewDecoder(bytes.NewReader(vGob))
b := [][]byte{}
if err := g.Decode(&b); err != nil {
return fmt.Errorf("failed to deserialise v: %v", err)
}
entries = append(entries, b...)
seqsConsumed = append(seqsConsumed, seq)
orderCheck += int64(len(b))
}
if len(seqsConsumed) == 0 {
klog.V(1).Info("Found no rows to sequence")
return nil
}

// Call consumeFunc with the entries we've found
if err := f(ctx, uint64(fromSeq), entries); err != nil {
return err
}

// consumeFunc was successful, so we can update our coordination row, and delete the row(s) for
// the then consumed entries.
m := make([]*spanner.Mutation, 0)
m = append(m, spanner.Update("IntCoord", []string{"id", "seq"}, []interface{}{0, int64(orderCheck)}))
for _, c := range seqsConsumed {
m = append(m, spanner.Delete("Seq", spanner.Key{0, c}))
}
if err := txn.BufferWrite(m); err != nil {
return err
}

didWork = true
return nil
})
if err != nil {
return false, err
}

return didWork, nil
}

// gcsStorage knows how to store and retrieve objects from GCS.
type gcsStorage struct {
bucket string
Expand Down
47 changes: 46 additions & 1 deletion storage/gcp/gcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func newSpannerDB(t *testing.T) func() {

}

func TestSpannerSequencer(t *testing.T) {
func TestSpannerSequencerAssignEntries(t *testing.T) {
ctx := context.Background()
close := newSpannerDB(t)
defer close()
Expand All @@ -82,6 +82,51 @@ func TestSpannerSequencer(t *testing.T) {
}
}

func TestSpannerSequencerRoundTrip(t *testing.T) {
ctx := context.Background()
close := newSpannerDB(t)
defer close()

s, err := newSpannerSequencer(ctx, "projects/p/instances/i/databases/d")
if err != nil {
t.Fatalf("newSpannerSequencer: %v", err)
}

seq := 0
for chunks := 0; chunks < 10; chunks++ {
entries := [][]byte{}
for i := 0; i < 10+chunks; i++ {
entries = append(entries, []byte(fmt.Sprintf("item %d", seq)))
seq++
}
if _, err := s.assignEntries(ctx, entries); err != nil {
t.Fatalf("assignEntries: %v", err)
}
}

seenIdx := uint64(0)
f := func(_ context.Context, fromSeq uint64, entries [][]byte) error {
if fromSeq != seenIdx {
return fmt.Errorf("f called with fromSeq %d, want %d", fromSeq, seenIdx)
}
for i, e := range entries {
if !bytes.Equal(e, []byte(fmt.Sprintf("item %d", i))) {
return fmt.Errorf("entry %d+%d != %d", fromSeq, i, seenIdx)
}
seenIdx++
}
return nil
}

more, err := s.consumeEntries(ctx, 7, f)
if err != nil {
t.Errorf("consumeEntries: %v", err)
}
if !more {
t.Errorf("more: false, expected true")
}
}

func makeTile(t *testing.T, size uint64) *api.HashTile {
t.Helper()
r := &api.HashTile{Nodes: make([][]byte, size)}
Expand Down

0 comments on commit 50a8a8f

Please sign in to comment.