Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add conditional writes for the checkpoint. #61

Merged
merged 2 commits into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion experimental/gcp-log/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,9 @@ func Integrate(w http.ResponseWriter, r *http.Request) {
return
}

func signAndWrite(ctx context.Context, cp *fmtlog.Checkpoint, cpNote note.Note, s note.Signer, client *storage.Client, origin string) error {
// signAndWrite signs a checkpoint and writes the new checkpoint to GCS.
func signAndWrite(ctx context.Context, cp *fmtlog.Checkpoint, cpNote note.Note,
s note.Signer, client *storage.Client, origin string) error {
cp.Origin = origin
cpNote.Text = string(cp.Marshal())
cpNoteSigned, err := note.Sign(&cpNote, s)
Expand Down
26 changes: 24 additions & 2 deletions experimental/gcp-log/internal/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ type Client struct {
// Note that nextSeq may be <= than the actual next available number, but
// never greater.
nextSeq uint64
// checkpointGen is the GCS object generation number that this client last
// read. This is useful for read-modify-write operation of the checkpoint.
checkpointGen int64

checkpointCacheControl string
otherCacheControl string
Expand Down Expand Up @@ -86,6 +89,7 @@ func NewClient(ctx context.Context, opts ClientOpts) (*Client, error) {
gcsClient: c,
projectID: opts.ProjectID,
bucket: opts.Bucket,
checkpointGen: 0,
checkpointCacheControl: opts.CheckpointCacheControl,
otherCacheControl: opts.OtherCacheControl,
}, nil
Expand Down Expand Up @@ -136,17 +140,27 @@ func (c *Client) SetNextSeq(num uint64) {
c.nextSeq = num
}

// WriteCheckpoint stores a raw log checkpoint on GCS.
// WriteCheckpoint stores a raw log checkpoint on GCS if it matches the
// generation that the client thinks the checkpoint is.
func (c *Client) WriteCheckpoint(ctx context.Context, newCPRaw []byte) error {
bkt := c.gcsClient.Bucket(c.bucket)
obj := bkt.Object(layout.CheckpointPath)
w := obj.NewWriter(ctx)

var cond gcs.Conditions
if c.checkpointGen == 0 {
cond = gcs.Conditions{DoesNotExist: true}
} else {
cond = gcs.Conditions{GenerationMatch: c.checkpointGen}
}

w := obj.If(cond).NewWriter(ctx)
if c.checkpointCacheControl != "" {
w.ObjectAttrs.CacheControl = c.checkpointCacheControl
}
if _, err := w.Write(newCPRaw); err != nil {
return err
}
c.checkpointGen++
return w.Close()
}

Expand All @@ -155,6 +169,14 @@ func (c *Client) ReadCheckpoint(ctx context.Context) ([]byte, error) {
bkt := c.gcsClient.Bucket(c.bucket)
obj := bkt.Object(layout.CheckpointPath)

// Get the GCS generation number.
attrs, err := obj.Attrs(ctx)
if err != nil {
return nil, fmt.Errorf("Object(%q).Attrs: %w", obj, err)
}
c.checkpointGen = attrs.Generation

// Get the content of the checkpoint.
r, err := obj.NewReader(ctx)
if err != nil {
return nil, err
Expand Down
Loading