Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gcs immu #10

Closed
wants to merge 13 commits into from
3 changes: 3 additions & 0 deletions cli/command_repository_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,13 @@ func (c *commandRepositoryCreate) populateRepository(ctx context.Context, passwo

c.out.printStderr("\nTo find more information about default policy run 'kopia policy get'.\nTo change the policy use 'kopia policy set' command.\n")

c.out.printStdout("before setDefaultMaintenanceParameters...")
if err := setDefaultMaintenanceParameters(ctx, w); err != nil {
c.out.printStdout("setDefaultMaintenanceParameters ERR!" + err.Error())
return errors.Wrap(err, "unable to set maintenance parameters")
}

c.out.printStdout("done with populateRepository...")
return nil
})
}
23 changes: 23 additions & 0 deletions cli/storage_gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"os"
"time"

"github.com/alecthomas/kingpin/v2"
"github.com/pkg/errors"
Expand All @@ -26,11 +27,33 @@ func (c *storageGCSFlags) Setup(_ StorageProviderServices, cmd *kingpin.CmdClaus
cmd.Flag("embed-credentials", "Embed GCS credentials JSON in Kopia configuration").BoolVar(&c.embedCredentials)

commonThrottlingFlags(cmd, &c.options.Limits)

var pointInTimeStr string

pitPreAction := func(pc *kingpin.ParseContext) error {
if pointInTimeStr != "" {
t, err := time.Parse(time.RFC3339, pointInTimeStr)
if err != nil {
return errors.Wrap(err, "invalid point-in-time argument")
}

c.options.PointInTime = &t
}

return nil
}

cmd.Flag("point-in-time", "Use a point-in-time view of the storage repository when supported").PlaceHolder(time.RFC3339).PreAction(pitPreAction).StringVar(&pointInTimeStr)

}

func (c *storageGCSFlags) Connect(ctx context.Context, isCreate bool, formatVersion int) (blob.Storage, error) {
_ = formatVersion

if isCreate && c.options.PointInTime != nil && !c.options.PointInTime.IsZero() {
return nil, errors.New("Cannot specify a 'point-in-time' option when creating a repository")
}

if c.embedCredentials {
data, err := os.ReadFile(c.options.ServiceAccountCredentialsFile)
if err != nil {
Expand Down
80 changes: 0 additions & 80 deletions repo/blob/gcs/gcs_internal_test.go

This file was deleted.

4 changes: 4 additions & 0 deletions repo/blob/gcs/gcs_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gcs

import (
"encoding/json"
"time"

"github.com/kopia/kopia/repo/blob/throttling"
)
Expand All @@ -24,4 +25,7 @@ type Options struct {
ReadOnly bool `json:"readOnly,omitempty"`

throttling.Limits

// PointInTime specifies a view of the (versioned) store at that time
PointInTime *time.Time `json:"pointInTime,omitempty"`
}
138 changes: 138 additions & 0 deletions repo/blob/gcs/gcs_pit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package gcs

import (
"context"
"time"

"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/blob/readonly"
"github.com/pkg/errors"
)

type gcsPointInTimeStorage struct {
gcsStorage

pointInTime time.Time
}

func (gcs *gcsPointInTimeStorage) ListBlobs(ctx context.Context, blobIDPrefix blob.ID, cb func(bm blob.Metadata) error) error {
var (
previousID blob.ID
vs []versionMetadata
)
err := gcs.listBlobVersions(ctx, blobIDPrefix, func(vm versionMetadata) error {
if vm.BlobID != previousID {
// different blob, process previous one
if v, found := newestAtUnlessDeleted(vs, gcs.pointInTime); found {
if err := cb(v.Metadata); err != nil {
return err
}
}

previousID = vm.BlobID
vs = vs[:0] // reset for next blob
}

vs = append(vs, vm)

return nil
})
if err != nil {
return errors.Wrapf(err, "could not list blob versions at time %s", gcs.pointInTime)
}

// process last blob
if v, found := newestAtUnlessDeleted(vs, gcs.pointInTime); found {
if err := cb(v.Metadata); err != nil {
return err
}
}

return nil
}

func (gcs *gcsPointInTimeStorage) GetBlob(ctx context.Context, b blob.ID, offset, length int64, output blob.OutputBuffer) error {
// getMetadata returns the specific blob version at time t
m, err := gcs.getMetadata(ctx, b)
if err != nil {
return errors.Wrap(err, "getting metadata")
}

return gcs.getBlobWithVersion(ctx, b, m.Version, offset, length, output)
}

func (gcs *gcsPointInTimeStorage) GetMetadata(ctx context.Context, b blob.ID) (blob.Metadata, error) {
bm, err := gcs.getMetadata(ctx, b)

return bm.Metadata, err
}

func (gcs *gcsPointInTimeStorage) getMetadata(ctx context.Context, b blob.ID) (versionMetadata, error) {
var vml []versionMetadata

if err := gcs.getBlobVersions(ctx, b, func(m versionMetadata) error {
// only include versions older than s.pointInTime
if !m.Timestamp.After(gcs.pointInTime) {
vml = append(vml, m)
}

return nil
}); err != nil {
return versionMetadata{}, errors.Wrapf(err, "could not get version metadata for blob %s", b)
}

if v, found := newestAtUnlessDeleted(vml, gcs.pointInTime); found {
return v, nil
}

return versionMetadata{}, blob.ErrBlobNotFound
}

// newestAtUnlessDeleted returns the last version in the list older than the PIT.
func newestAtUnlessDeleted(vx []versionMetadata, t time.Time) (v versionMetadata, found bool) {
vs := getOlderThan(vx, t)

if len(vs) == 0 {
return versionMetadata{}, false
}

v = vs[len(vs)-1]

return v, !v.IsDeleteMarker
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

// Removes versions that are newer than t. The filtering is done in place
// and uses the same slice storage as vs. Assumes entries in vs are in descending
// timestamp order.
func getOlderThan(vs []versionMetadata, t time.Time) []versionMetadata {
for i := range vs {
if vs[i].Timestamp.After(t) {
return vs[:i]
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
}

return vs
}

// maybePointInTimeStore wraps s with a point-in-time store when s is versioned
// and a point-in-time value is specified. Otherwise s is returned.
func maybePointInTimeStore(ctx context.Context, gcs *gcsStorage, pointInTime *time.Time) (blob.Storage, error) {
if pit := gcs.Options.PointInTime; pit == nil || pit.IsZero() {
return gcs, nil
}

// Does the bucket supports versioning?
attrs, err := gcs.bucket.Attrs(ctx)
if err != nil {
return nil, errors.Wrapf(err, "could not get determine if bucket '%s' supports versioning", gcs.BucketName)
}

if !attrs.VersioningEnabled {
return nil, errors.Errorf("cannot create point-in-time view for non-versioned bucket '%s'", gcs.BucketName)
}

return readonly.NewWrapper(&gcsPointInTimeStorage{
gcsStorage: *gcs,
pointInTime: *pointInTime,
}), nil
}
Loading
Loading