From 588d442b0114727d87294b7f29418d38e0b0c25f Mon Sep 17 00:00:00 2001 From: Jay Hou Date: Wed, 3 Jan 2024 19:54:45 +0000 Subject: [PATCH] Add conditional writes for the checkpoint. (#61) After this PR, the Integrate function may fail if the generation precondition of the new checkpoint does not match the last read checkpoint. To tighten that, we should add a [retry](https://cloud.google.com/workflows/docs/reference/syntax/retrying) policy for the Cloud Build step (e.g. [here](https://github.com/transparency-dev/armored-witness-applet/blob/a8b91dd9b5bb73b0802b4114ee52549204c077f6/release/cloudbuild_ci.yaml#L120-L136)) to Integrate a new log entry. --- experimental/gcp-log/function.go | 4 ++- .../gcp-log/internal/storage/storage.go | 26 +++++++++++++++++-- 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/experimental/gcp-log/function.go b/experimental/gcp-log/function.go index 86ccd73..cd7dceb 100644 --- a/experimental/gcp-log/function.go +++ b/experimental/gcp-log/function.go @@ -343,7 +343,9 @@ func Integrate(w http.ResponseWriter, r *http.Request) { return } -func signAndWrite(ctx context.Context, cp *fmtlog.Checkpoint, cpNote note.Note, s note.Signer, client *storage.Client, origin string) error { +// signAndWrite signs a checkpoint and writes the new checkpoint to GCS. +func signAndWrite(ctx context.Context, cp *fmtlog.Checkpoint, cpNote note.Note, + s note.Signer, client *storage.Client, origin string) error { cp.Origin = origin cpNote.Text = string(cp.Marshal()) cpNoteSigned, err := note.Sign(&cpNote, s) diff --git a/experimental/gcp-log/internal/storage/storage.go b/experimental/gcp-log/internal/storage/storage.go index 8a93940..7f781aa 100644 --- a/experimental/gcp-log/internal/storage/storage.go +++ b/experimental/gcp-log/internal/storage/storage.go @@ -54,6 +54,9 @@ type Client struct { // Note that nextSeq may be <= than the actual next available number, but // never greater. nextSeq uint64 + // checkpointGen is the GCS object generation number that this client last + // read. This is useful for read-modify-write operation of the checkpoint. + checkpointGen int64 checkpointCacheControl string otherCacheControl string @@ -86,6 +89,7 @@ func NewClient(ctx context.Context, opts ClientOpts) (*Client, error) { gcsClient: c, projectID: opts.ProjectID, bucket: opts.Bucket, + checkpointGen: 0, checkpointCacheControl: opts.CheckpointCacheControl, otherCacheControl: opts.OtherCacheControl, }, nil @@ -136,17 +140,27 @@ func (c *Client) SetNextSeq(num uint64) { c.nextSeq = num } -// WriteCheckpoint stores a raw log checkpoint on GCS. +// WriteCheckpoint stores a raw log checkpoint on GCS if it matches the +// generation that the client thinks the checkpoint is. func (c *Client) WriteCheckpoint(ctx context.Context, newCPRaw []byte) error { bkt := c.gcsClient.Bucket(c.bucket) obj := bkt.Object(layout.CheckpointPath) - w := obj.NewWriter(ctx) + + var cond gcs.Conditions + if c.checkpointGen == 0 { + cond = gcs.Conditions{DoesNotExist: true} + } else { + cond = gcs.Conditions{GenerationMatch: c.checkpointGen} + } + + w := obj.If(cond).NewWriter(ctx) if c.checkpointCacheControl != "" { w.ObjectAttrs.CacheControl = c.checkpointCacheControl } if _, err := w.Write(newCPRaw); err != nil { return err } + c.checkpointGen++ return w.Close() } @@ -155,6 +169,14 @@ func (c *Client) ReadCheckpoint(ctx context.Context) ([]byte, error) { bkt := c.gcsClient.Bucket(c.bucket) obj := bkt.Object(layout.CheckpointPath) + // Get the GCS generation number. + attrs, err := obj.Attrs(ctx) + if err != nil { + return nil, fmt.Errorf("Object(%q).Attrs: %w", obj, err) + } + c.checkpointGen = attrs.Generation + + // Get the content of the checkpoint. r, err := obj.NewReader(ctx) if err != nil { return nil, err