Skip to content

Commit

Permalink
Add conditional writes for the checkpoint.
Browse files Browse the repository at this point in the history
  • Loading branch information
jiggoha committed Jan 3, 2024
1 parent a4ff46d commit e861c6c
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 19 deletions.
4 changes: 3 additions & 1 deletion experimental/gcp-log/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,9 @@ func Integrate(w http.ResponseWriter, r *http.Request) {
return
}

func signAndWrite(ctx context.Context, cp *fmtlog.Checkpoint, cpNote note.Note, s note.Signer, client *storage.Client, origin string) error {
// signAndWrite signs a checkpoint and writes the new checkpoint to GCS.
func signAndWrite(ctx context.Context, cp *fmtlog.Checkpoint, cpNote note.Note,
s note.Signer, client *storage.Client, origin string) error {
cp.Origin = origin
cpNote.Text = string(cp.Marshal())
cpNoteSigned, err := note.Sign(&cpNote, s)
Expand Down
37 changes: 19 additions & 18 deletions experimental/gcp-log/internal/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,26 +54,14 @@ type Client struct {
// Note that nextSeq may be <= than the actual next available number, but
// never greater.
nextSeq uint64
// checkpointGen is the GCS object generation number that this client last
// read. This is useful for read-modify-write operation of the checkpoint.
checkpointGen int64

checkpointCacheControl string
otherCacheControl string
}

// ClientOpts holds configuration options for the storage client.
type ClientOpts struct {
// ProjectID is the GCP project which hosts the storage bucket for the log.
ProjectID string
// Bucket is the name of the bucket to use for storing log state.
Bucket string
// CheckpointCacheControl, if set, will cause the Cache-Control header associated with the
// checkpoint object to be set to this value. If unset, the current GCP default will be used.
CheckpointCacheControl string
// OtherCacheControl, if set, will cause the Cache-Control header associated with the
// all non-checkpoint objects to be set to this value. If unset, the current GCP default
// will be used.
OtherCacheControl string
}

// NewClient returns a Client which allows interaction with the log stored in
// the specified bucket on GCS.
func NewClient(ctx context.Context, opts ClientOpts) (*Client, error) {
Expand All @@ -86,6 +74,7 @@ func NewClient(ctx context.Context, opts ClientOpts) (*Client, error) {
gcsClient: c,
projectID: opts.ProjectID,
bucket: opts.Bucket,
checkpointGen: 0,
checkpointCacheControl: opts.CheckpointCacheControl,
otherCacheControl: opts.OtherCacheControl,
}, nil
Expand Down Expand Up @@ -136,17 +125,20 @@ func (c *Client) SetNextSeq(num uint64) {
c.nextSeq = num
}

// WriteCheckpoint stores a raw log checkpoint on GCS.
// WriteCheckpoint stores a raw log checkpoint on GCS if it matches the
// generation that the client thinks the checkpoint is.
func (c *Client) WriteCheckpoint(ctx context.Context, newCPRaw []byte) error {
bkt := c.gcsClient.Bucket(c.bucket)
obj := bkt.Object(layout.CheckpointPath)
w := obj.NewWriter(ctx)

w := obj.If(gcs.Conditions{GenerationMatch: c.checkpointGen}).NewWriter(ctx)
if c.checkpointCacheControl != "" {
w.ObjectAttrs.CacheControl = c.checkpointCacheControl
}
if _, err := w.Write(newCPRaw); err != nil {
return err
}
c.checkpointGen++
return w.Close()
}

Expand All @@ -155,13 +147,22 @@ func (c *Client) ReadCheckpoint(ctx context.Context) ([]byte, error) {
bkt := c.gcsClient.Bucket(c.bucket)
obj := bkt.Object(layout.CheckpointPath)

// Get the GCS generation number.
attrs, err := obj.Attrs(ctx)
if err != nil {
return nil, fmt.Errorf("Object(%q).Attrs: %w", obj, err)
}
c.checkpointGen = attrs.Generation

// Get the content of the checkpoint.
r, err := obj.NewReader(ctx)
if err != nil {
return nil, err
}
defer r.Close()

return io.ReadAll(r)
content, err := io.ReadAll(r)
return content, err
}

// GetTile returns the tile at the given tile-level and tile-index.
Expand Down

0 comments on commit e861c6c

Please sign in to comment.