Skip to content

Commit

Permalink
Add conditional writes for the checkpoint.
Browse files Browse the repository at this point in the history
  • Loading branch information
jiggoha committed Dec 14, 2023
1 parent 0d4d1b4 commit d0688e1
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 7 deletions.
4 changes: 3 additions & 1 deletion experimental/gcp-log/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,9 @@ func Integrate(w http.ResponseWriter, r *http.Request) {
return
}

func signAndWrite(ctx context.Context, cp *fmtlog.Checkpoint, cpNote note.Note, s note.Signer, client *storage.Client, origin string) error {
// signAndWrite signs a checkpoint and writes the new checkpoint to GCS.
func signAndWrite(ctx context.Context, cp *fmtlog.Checkpoint, cpNote note.Note,
s note.Signer, client *storage.Client, origin string) error {
cp.Origin = origin
cpNote.Text = string(cp.Marshal())
cpNoteSigned, err := note.Sign(&cpNote, s)
Expand Down
28 changes: 22 additions & 6 deletions experimental/gcp-log/internal/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ type Client struct {
// Note that nextSeq may be <= than the actual next available number, but
// never greater.
nextSeq uint64
// checkpointGen is the GCS object generation number that this client last
// read. This is useful for read-modify-write operation of the checkpoint.
checkpointGen int64
}

// NewClient returns a Client which allows interaction with the log stored in
Expand All @@ -65,9 +68,10 @@ func NewClient(ctx context.Context, projectID, bucket string) (*Client, error) {
}

return &Client{
gcsClient: c,
projectID: projectID,
bucket: bucket,
gcsClient: c,
projectID: projectID,
bucket: bucket,
checkpointGen: 0,
}, nil
}

Expand Down Expand Up @@ -116,14 +120,17 @@ func (c *Client) SetNextSeq(num uint64) {
c.nextSeq = num
}

// WriteCheckpoint stores a raw log checkpoint on GCS.
// WriteCheckpoint stores a raw log checkpoint on GCS if it matches the
// generation that the client thinks the checkpoint is.
func (c *Client) WriteCheckpoint(ctx context.Context, newCPRaw []byte) error {
bkt := c.gcsClient.Bucket(c.bucket)
obj := bkt.Object(layout.CheckpointPath)
w := obj.NewWriter(ctx)

w := obj.If(gcs.Conditions{GenerationMatch: c.checkpointGen}).NewWriter(ctx)
if _, err := w.Write(newCPRaw); err != nil {
return err
}
c.checkpointGen++
return w.Close()
}

Expand All @@ -132,13 +139,22 @@ func (c *Client) ReadCheckpoint(ctx context.Context) ([]byte, error) {
bkt := c.gcsClient.Bucket(c.bucket)
obj := bkt.Object(layout.CheckpointPath)

// Get the GCS generation number.
attrs, err := obj.Attrs(ctx)
if err != nil {
return nil, fmt.Errorf("Object(%q).Attrs: %w", obj, err)
}
c.checkpointGen = attrs.Generation

// Get the content of the checkpoint.
r, err := obj.NewReader(ctx)
if err != nil {
return nil, err
}
defer r.Close()

return io.ReadAll(r)
content, err := io.ReadAll(r)
return content, err
}

// GetTile returns the tile at the given tile-level and tile-index.
Expand Down

0 comments on commit d0688e1

Please sign in to comment.