diff --git a/experimental/gcp-log/function.go b/experimental/gcp-log/function.go index 86ccd73..cd7dceb 100644 --- a/experimental/gcp-log/function.go +++ b/experimental/gcp-log/function.go @@ -343,7 +343,9 @@ func Integrate(w http.ResponseWriter, r *http.Request) { return } -func signAndWrite(ctx context.Context, cp *fmtlog.Checkpoint, cpNote note.Note, s note.Signer, client *storage.Client, origin string) error { +// signAndWrite signs a checkpoint and writes the new checkpoint to GCS. +func signAndWrite(ctx context.Context, cp *fmtlog.Checkpoint, cpNote note.Note, + s note.Signer, client *storage.Client, origin string) error { cp.Origin = origin cpNote.Text = string(cp.Marshal()) cpNoteSigned, err := note.Sign(&cpNote, s) diff --git a/experimental/gcp-log/internal/storage/storage.go b/experimental/gcp-log/internal/storage/storage.go index 8a93940..c2024bc 100644 --- a/experimental/gcp-log/internal/storage/storage.go +++ b/experimental/gcp-log/internal/storage/storage.go @@ -54,26 +54,14 @@ type Client struct { // Note that nextSeq may be <= than the actual next available number, but // never greater. nextSeq uint64 + // checkpointGen is the GCS object generation number that this client last + // read. This is useful for read-modify-write operation of the checkpoint. + checkpointGen int64 checkpointCacheControl string otherCacheControl string } -// ClientOpts holds configuration options for the storage client. -type ClientOpts struct { - // ProjectID is the GCP project which hosts the storage bucket for the log. - ProjectID string - // Bucket is the name of the bucket to use for storing log state. - Bucket string - // CheckpointCacheControl, if set, will cause the Cache-Control header associated with the - // checkpoint object to be set to this value. If unset, the current GCP default will be used. - CheckpointCacheControl string - // OtherCacheControl, if set, will cause the Cache-Control header associated with the - // all non-checkpoint objects to be set to this value. If unset, the current GCP default - // will be used. - OtherCacheControl string -} - // NewClient returns a Client which allows interaction with the log stored in // the specified bucket on GCS. func NewClient(ctx context.Context, opts ClientOpts) (*Client, error) { @@ -86,6 +74,7 @@ func NewClient(ctx context.Context, opts ClientOpts) (*Client, error) { gcsClient: c, projectID: opts.ProjectID, bucket: opts.Bucket, + checkpointGen: 0, checkpointCacheControl: opts.CheckpointCacheControl, otherCacheControl: opts.OtherCacheControl, }, nil @@ -136,17 +125,20 @@ func (c *Client) SetNextSeq(num uint64) { c.nextSeq = num } -// WriteCheckpoint stores a raw log checkpoint on GCS. +// WriteCheckpoint stores a raw log checkpoint on GCS if it matches the +// generation that the client thinks the checkpoint is. func (c *Client) WriteCheckpoint(ctx context.Context, newCPRaw []byte) error { bkt := c.gcsClient.Bucket(c.bucket) obj := bkt.Object(layout.CheckpointPath) - w := obj.NewWriter(ctx) + + w := obj.If(gcs.Conditions{GenerationMatch: c.checkpointGen}).NewWriter(ctx) if c.checkpointCacheControl != "" { w.ObjectAttrs.CacheControl = c.checkpointCacheControl } if _, err := w.Write(newCPRaw); err != nil { return err } + c.checkpointGen++ return w.Close() } @@ -155,13 +147,22 @@ func (c *Client) ReadCheckpoint(ctx context.Context) ([]byte, error) { bkt := c.gcsClient.Bucket(c.bucket) obj := bkt.Object(layout.CheckpointPath) + // Get the GCS generation number. + attrs, err := obj.Attrs(ctx) + if err != nil { + return nil, fmt.Errorf("Object(%q).Attrs: %w", obj, err) + } + c.checkpointGen = attrs.Generation + + // Get the content of the checkpoint. r, err := obj.NewReader(ctx) if err != nil { return nil, err } defer r.Close() - return io.ReadAll(r) + content, err := io.ReadAll(r) + return content, err } // GetTile returns the tile at the given tile-level and tile-index.