diff --git a/storage/gcp/gcp.go b/storage/gcp/gcp.go index 3194f402..bf4e95a7 100644 --- a/storage/gcp/gcp.go +++ b/storage/gcp/gcp.go @@ -59,15 +59,17 @@ import ( ) const ( - entryBundleSize = 256 - logContType = "application/octet-stream" - ckptContType = "text/plain; charset=utf-8" // minCheckpointInterval is the shortest permitted interval between updating published checkpoints. // GCS has a rate limit 1 update per second for individual objects, but we've observed that attempting // to update at exactly that rate still results in the occasional refusal, so bake in a little wiggle // room. minCheckpointInterval = 1200 * time.Millisecond + logContType = "application/octet-stream" + ckptContType = "text/plain; charset=utf-8" + logCacheControl = "max-age=604800,immutable" + ckptCacheControl = "no-cache" + DefaultPushbackMaxOutstanding = 4096 DefaultIntegrationSizeLimit = 5 * 4096 ) @@ -88,7 +90,7 @@ type Storage struct { // objStore describes a type which can store and retrieve objects. type objStore interface { getObject(ctx context.Context, obj string) ([]byte, int64, error) - setObject(ctx context.Context, obj string, data []byte, cond *gcs.Conditions, contType string) error + setObject(ctx context.Context, obj string, data []byte, cond *gcs.Conditions, contType string, cacheCtl string) error lastModified(ctx context.Context, obj string) (time.Time, error) } @@ -270,7 +272,7 @@ func (s *Storage) publishCheckpoint(ctx context.Context, minStaleness time.Durat return fmt.Errorf("newCP: %v", err) } - if err := s.objStore.setObject(ctx, layout.CheckpointPath, cpRaw, nil, ckptContType); err != nil { + if err := s.objStore.setObject(ctx, layout.CheckpointPath, cpRaw, nil, ckptContType, ckptCacheControl); err != nil { return fmt.Errorf("writeCheckpoint: %v", err) } return nil @@ -288,7 +290,7 @@ func (s *Storage) setTile(ctx context.Context, level, index, logSize uint64, til tPath := layout.TilePath(level, index, layout.PartialTileSize(level, index, logSize)) klog.V(2).Infof("StoreTile: %s (%d entries)", tPath, len(tile.Nodes)) - return s.objStore.setObject(ctx, tPath, data, &gcs.Conditions{DoesNotExist: true}, logContType) + return s.objStore.setObject(ctx, tPath, data, &gcs.Conditions{DoesNotExist: true}, logContType, logCacheControl) } // getTiles returns the tiles with the given tile-coords for the specified log size. @@ -351,7 +353,7 @@ func (s *Storage) setEntryBundle(ctx context.Context, bundleIndex uint64, p uint // Note that setObject does an idempotent interpretation of DoesNotExist - it only // returns an error if the named object exists _and_ contains different data to what's // passed in here. - if err := s.objStore.setObject(ctx, objName, bundleRaw, &gcs.Conditions{DoesNotExist: true}, logContType); err != nil { + if err := s.objStore.setObject(ctx, objName, bundleRaw, &gcs.Conditions{DoesNotExist: true}, logContType, logCacheControl); err != nil { return fmt.Errorf("setObject(%q): %v", objName, err) } @@ -748,7 +750,7 @@ func (s *gcsStorage) getObject(ctx context.Context, obj string) ([]byte, int64, // Note that when preconditions are specified and are not met, an error will be returned *unless* // the currently stored data is bit-for-bit identical to the data to-be-written. // This is intended to provide idempotentency for writes. -func (s *gcsStorage) setObject(ctx context.Context, objName string, data []byte, cond *gcs.Conditions, contType string) error { +func (s *gcsStorage) setObject(ctx context.Context, objName string, data []byte, cond *gcs.Conditions, contType string, cacheCtl string) error { bkt := s.gcsClient.Bucket(s.bucket) obj := bkt.Object(objName) @@ -760,6 +762,7 @@ func (s *gcsStorage) setObject(ctx context.Context, objName string, data []byte, w = obj.If(*cond).NewWriter(ctx) } w.ObjectAttrs.ContentType = contType + w.ObjectAttrs.CacheControl = cacheCtl if _, err := w.Write(data); err != nil { return fmt.Errorf("failed to write object %q to bucket %q: %w", objName, s.bucket, err) } diff --git a/storage/gcp/gcp_test.go b/storage/gcp/gcp_test.go index 682cf1e6..ed44df86 100644 --- a/storage/gcp/gcp_test.go +++ b/storage/gcp/gcp_test.go @@ -347,7 +347,7 @@ func TestPublishCheckpoint(t *testing.T) { t.Fatalf("storage.init: %v", err) } cpOld := []byte("bananas") - if err := m.setObject(ctx, layout.CheckpointPath, cpOld, nil, ""); err != nil { + if err := m.setObject(ctx, layout.CheckpointPath, cpOld, nil, "", ""); err != nil { t.Fatalf("setObject(bananas): %v", err) } m.lMod = test.cpModifiedAt @@ -394,7 +394,7 @@ func (m *memObjStore) getObject(_ context.Context, obj string) ([]byte, int64, e } // TODO(phboneff): add content type tests -func (m *memObjStore) setObject(_ context.Context, obj string, data []byte, cond *gcs.Conditions, _ string) error { +func (m *memObjStore) setObject(_ context.Context, obj string, data []byte, cond *gcs.Conditions, _, _ string) error { m.Lock() defer m.Unlock()