diff --git a/experimental/gcp-log/internal/storage/storage.go b/experimental/gcp-log/internal/storage/storage.go index 8cb8317..cbc7821 100644 --- a/experimental/gcp-log/internal/storage/storage.go +++ b/experimental/gcp-log/internal/storage/storage.go @@ -16,6 +16,7 @@ package storage import ( + "bytes" "context" "errors" "fmt" @@ -380,7 +381,7 @@ func (c *Client) Sequence(ctx context.Context, leafhash []byte, leaf []byte) (ui return 0, fmt.Errorf("couldn't create leafhash object: %w", err) } if err := wLeaf.Close(); err != nil { - return 0, fmt.Errorf("couldn't close writer for object %q", leafPath) + return 0, fmt.Errorf("couldn't close writer for object %q, %w", leafPath, err) } // All done! @@ -388,6 +389,30 @@ func (c *Client) Sequence(ctx context.Context, leafhash []byte, leaf []byte) (ui } } +// assertContent checks that the content at `gcsPath` matches the passed in `data`. +func (c *Client) assertContent(ctx context.Context, gcsPath string, data []byte) (equal bool, err error) { + bkt := c.gcsClient.Bucket(c.bucket) + + obj := bkt.Object(gcsPath) + r, err := obj.NewReader(ctx) + if err != nil { + klog.V(2).Infof("assertContent: failed to create reader for object %q in bucket %q: %v", + gcsPath, c.bucket, err) + return false, err + } + defer r.Close() + + gcsData, err := io.ReadAll(r) + if err != nil { + return false, err + } + + if bytes.Equal(gcsData, data) { + return true, nil + } + return false, nil +} + // StoreTile writes a tile out to GCS. // Fully populated tiles are stored at the path corresponding to the level & // index parameters, partially populated (i.e. right-hand edge) tiles are @@ -409,12 +434,34 @@ func (c *Client) StoreTile(ctx context.Context, level, index uint64, tile *api.T tPath := filepath.Join(layout.TilePath("", level, index, tileSize%256)) obj := bkt.Object(tPath) - w := obj.NewWriter(ctx) + // Tiles, partial or full, should only be written once. + w := obj.If(gcs.Conditions{DoesNotExist: true}).NewWriter(ctx) if c.otherCacheControl != "" { w.ObjectAttrs.CacheControl = c.otherCacheControl } if _, err := w.Write(t); err != nil { return fmt.Errorf("failed to write tile object %q to bucket %q: %w", tPath, c.bucket, err) } - return w.Close() + + if err := w.Close(); err != nil { + switch ee := err.(type) { + case *googleapi.Error: + // If we run into a precondition failure error, check that the object + // which exists contains the same content that we want to write. + if ee.Code == http.StatusPreconditionFailed { + if equal, err := c.assertContent(ctx, tPath, t); err != nil { + return fmt.Errorf("failed to read content of %q: %w", tPath, err) + } else if !equal { + return fmt.Errorf("assertion that tile content for %q has not changed failed", tPath) + } + + klog.V(2).Infof("StoreTile: identical tile already exists for level %d index %x ts: %x", level, index, tileSize) + return nil + } + default: + return err + } + } + + return nil }