Skip to content

Commit

Permalink
Apply GCS ACLs on a recursive copy, add populate and validate for Cop…
Browse files Browse the repository at this point in the history
…yGCSObjects (#146)

Populate function ensures Role is uppercase
Validate function checks:
- read access to source 
- write access to destination
- ACLRule.Entity is not empty
- ACLRule.Entity can actually be used
- ACLRule.Role matches a known value

Reorganize GCS specific code.
  • Loading branch information
adjackura authored Sep 6, 2017
1 parent 70bc67a commit f5b5f78
Show file tree
Hide file tree
Showing 7 changed files with 330 additions and 118 deletions.
51 changes: 0 additions & 51 deletions daisy/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,44 +15,15 @@
package daisy

import (
"fmt"
"math/rand"
"os"
"os/user"
"path"
"reflect"
"regexp"
"strings"
"time"
)

var (
bucket = `([a-z0-9][-_.a-z0-9]*)`
object = `(.+)`
// Many of the Google Storage URLs are supported below.
// It is preferred that customers specify their object using
// its gs://<bucket>/<object> URL.
bucketRegex = regexp.MustCompile(fmt.Sprintf(`^gs://%s/?$`, bucket))
gsRegex = regexp.MustCompile(fmt.Sprintf(`^gs://%s/%s$`, bucket, object))
// Check for the Google Storage URLs:
// http://<bucket>.storage.googleapis.com/<object>
// https://<bucket>.storage.googleapis.com/<object>
gsHTTPRegex1 = regexp.MustCompile(fmt.Sprintf(`^http[s]?://%s\.storage\.googleapis\.com/%s$`, bucket, object))
// http://storage.cloud.google.com/<bucket>/<object>
// https://storage.cloud.google.com/<bucket>/<object>
gsHTTPRegex2 = regexp.MustCompile(fmt.Sprintf(`^http[s]?://storage\.cloud\.google\.com/%s/%s$`, bucket, object))
// Check for the other possible Google Storage URLs:
// http://storage.googleapis.com/<bucket>/<object>
// https://storage.googleapis.com/<bucket>/<object>
//
// The following are deprecated but checked:
// http://commondatastorage.googleapis.com/<bucket>/<object>
// https://commondatastorage.googleapis.com/<bucket>/<object>
gsHTTPRegex3 = regexp.MustCompile(fmt.Sprintf(`^http[s]?://(?:commondata)?storage\.googleapis\.com/%s/%s$`, bucket, object))

gcsAPIBase = "https://storage.cloud.google.com"
)

func getUser() string {
if cu, err := user.Current(); err == nil {
return cu.Username
Expand Down Expand Up @@ -93,14 +64,6 @@ func filter(ss []string, s string) []string {
return result
}

func getGCSAPIPath(p string) (string, error) {
b, o, e := splitGCSPath(p)
if e != nil {
return "", e
}
return fmt.Sprintf("%s/%s", gcsAPIBase, path.Join(b, o)), nil
}

func minInt(x int, ys ...int) int {
for _, y := range ys {
if y < x {
Expand All @@ -120,20 +83,6 @@ func randString(n int) string {
return string(b)
}

func splitGCSPath(p string) (string, string, error) {
for _, rgx := range []*regexp.Regexp{gsRegex, gsHTTPRegex1, gsHTTPRegex2, gsHTTPRegex3} {
matches := rgx.FindStringSubmatch(p)
if matches != nil {
return matches[1], matches[2], nil
}
}
matches := bucketRegex.FindStringSubmatch(p)
if matches != nil {
return matches[1], "", nil
}
return "", "", fmt.Errorf("%q is not a valid GCS path", p)
}

func strIn(s string, ss []string) bool {
for _, x := range ss {
if s == x {
Expand Down
44 changes: 0 additions & 44 deletions daisy/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,6 @@ func TestFilter(t *testing.T) {
}
}

func TestGetGCSAPIPath(t *testing.T) {
got, err := getGCSAPIPath("gs://foo/bar")
want := "https://storage.cloud.google.com/foo/bar"
if err != nil {
t.Errorf("unexpected error: %s", err)
}
if got != want {
t.Errorf("unexpected result: got: %q, want: %q", got, want)
}
}

func TestMinInt(t *testing.T) {
tests := []struct {
desc string
Expand Down Expand Up @@ -322,36 +311,3 @@ func TestSubstitute(t *testing.T) {
}
}
}

func TestSplitGCSPath(t *testing.T) {
tests := []struct {
input string
bucket string
object string
shouldErr bool
}{
{"gs://foo", "foo", "", false},
{"gs://foo/bar", "foo", "bar", false},
{"http://foo.storage.googleapis.com/bar", "foo", "bar", false},
{"https://foo.storage.googleapis.com/bar", "foo", "bar", false},
{"http://storage.cloud.google.com/foo/bar", "foo", "bar", false},
{"https://storage.cloud.google.com/foo/bar/bar", "foo", "bar/bar", false},
{"http://storage.googleapis.com/foo/bar", "foo", "bar", false},
{"https://storage.googleapis.com/foo/bar", "foo", "bar", false},
{"http://commondatastorage.googleapis.com/foo/bar", "foo", "bar", false},
{"https://commondatastorage.googleapis.com/foo/bar", "foo", "bar", false},
{"/local/path", "", "", true},
}

for _, tt := range tests {
b, o, err := splitGCSPath(tt.input)
if tt.shouldErr && err == nil {
t.Errorf("splitGCSPath(%q) should have thrown an error", tt.input)
} else if !tt.shouldErr && err != nil {
t.Errorf("splitGCSPath(%q) should not have thrown an error", tt.input)
}
if b != tt.bucket || o != tt.object {
t.Errorf("splitGCSPath(%q) returned incorrect values -- want bucket=%q, object=%q; got bucket=%q, object=%q", tt.input, tt.bucket, tt.object, b, o)
}
}
}
90 changes: 85 additions & 5 deletions daisy/step_copy_gcs_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,88 @@ type CopyGCSObjects []CopyGCSObject
// CopyGCSFile copies a GCS file from Source to Destination.
type CopyGCSObject struct {
Source, Destination string
ACLRules []storage.ACLRule
ACLRules []*storage.ACLRule
}

func (c *CopyGCSObjects) populate(ctx context.Context, s *Step) error { return nil }
func (c *CopyGCSObjects) populate(ctx context.Context, s *Step) error {
for _, co := range *c {
for _, acl := range co.ACLRules {
acl.Role = storage.ACLRole(strings.ToUpper(string(acl.Role)))
}
}
return nil
}

func (c *CopyGCSObjects) validate(ctx context.Context, s *Step) error { return nil }
func (c *CopyGCSObjects) validate(ctx context.Context, s *Step) error {
for _, co := range *c {
sBkt, _, err := splitGCSPath(co.Source)
if err != nil {
return err
}
dBkt, _, err := splitGCSPath(co.Destination)
if err != nil {
return err
}

// Check if source bucket exists and is readable.
if !readableBkts.contains(sBkt) {
if _, err := s.w.StorageClient.Bucket(sBkt).Attrs(ctx); err != nil {
return fmt.Errorf("error reading bucket %q: %v", sBkt, err)
}
readableBkts.add(sBkt)
}

func recursiveGCS(ctx context.Context, w *Workflow, sBkt, sPrefix, dBkt, dPrefix string) error {
// Check if destination bucket exists and is readable.
if !writableBkts.contains(dBkt) {
if _, err := s.w.StorageClient.Bucket(dBkt).Attrs(ctx); err != nil {
return fmt.Errorf("error reading bucket %q: %v", dBkt, err)
}

// Check if destination bucket is writable.
tObj := s.w.StorageClient.Bucket(dBkt).Object(fmt.Sprintf("daisy-validate-%s-%s", s.name, s.w.id))
w := tObj.NewWriter(ctx)
if _, err := w.Write(nil); err != nil {
return err
}
if err := w.Close(); err != nil {
return fmt.Errorf("error writing to bucket %q: %v", dBkt, err)
}
if err := tObj.Delete(ctx); err != nil {
return err
}
writableBkts.add(dBkt)
}
// Check each ACLRule
for _, acl := range co.ACLRules {
if acl.Entity == "" {
return fmt.Errorf("ACLRule.Entity must not be empty: %+v", acl)
}
roles := []string{string(storage.RoleOwner), string(storage.RoleReader), string(storage.RoleWriter)}
if !strIn(string(acl.Role), roles) {
return fmt.Errorf("ACLRule.Role invalid: %q not one of %q", acl.Role, roles)
}

// Test ACLRule.Entity.
tObj := s.w.StorageClient.Bucket(dBkt).Object(fmt.Sprintf("daisy-validate-%s-%s", s.name, s.w.id))
w := tObj.NewWriter(ctx)
if _, err := w.Write(nil); err != nil {
return err
}
if err := w.Close(); err != nil {
return err
}
defer tObj.Delete(ctx)

if err := tObj.ACL().Set(ctx, acl.Entity, acl.Role); err != nil {
return fmt.Errorf("error validating ACLRule %+v: %v", acl, err)
}
}
}

return nil
}

func recursiveGCS(ctx context.Context, w *Workflow, sBkt, sPrefix, dBkt, dPrefix string, acls []*storage.ACLRule) error {
it := w.StorageClient.Bucket(sBkt).Objects(ctx, &storage.Query{Prefix: sPrefix})
for objAttr, err := it.Next(); err != iterator.Done; objAttr, err = it.Next() {
if err != nil {
Expand All @@ -53,6 +127,12 @@ func recursiveGCS(ctx context.Context, w *Workflow, sBkt, sPrefix, dBkt, dPrefix
if _, err := dstPath.CopierFrom(srcPath).Run(ctx); err != nil {
return err
}

for _, acl := range acls {
if err := dstPath.ACL().Set(ctx, acl.Entity, acl.Role); err != nil {
return err
}
}
}
return nil
}
Expand All @@ -77,7 +157,7 @@ func (c *CopyGCSObjects) run(ctx context.Context, s *Step) error {
}

if sObj == "" || strings.HasSuffix(sObj, "/") {
if err := recursiveGCS(ctx, s.w, sBkt, sObj, dBkt, dObj); err != nil {
if err := recursiveGCS(ctx, s.w, sBkt, sObj, dBkt, dObj, co.ACLRules); err != nil {
e <- fmt.Errorf("error copying from %s to %s: %v", co.Source, co.Destination, err)
return
}
Expand Down
105 changes: 87 additions & 18 deletions daisy/step_copy_gcs_objects_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,45 +16,114 @@ package daisy

import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"testing"

"cloud.google.com/go/storage"
"github.com/kylelemons/godebug/pretty"
"google.golang.org/api/option"
)

func TestCopyGCSObjectsPopulate(t *testing.T) {
if err := (&CopyGCSObjects{}).populate(context.Background(), &Step{}); err != nil {
t.Error("not implemented, err should be nil")
ctx := context.Background()
w := testWorkflow()
s := &Step{w: w}

ws := &CopyGCSObjects{
{Source: "gs://bucket/object", Destination: "gs://bucket/object", ACLRules: []*storage.ACLRule{{Entity: "allUsers", Role: "OWNER"}}},
{Source: "gs://bucket/object", Destination: "gs://bucket/object", ACLRules: []*storage.ACLRule{{Entity: "allAuthenticatedUsers", Role: "writer"}}},
}
if err := ws.populate(ctx, s); err != nil {
t.Errorf("error running CopyGCSObjects.populate(): %v", err)
}
want := &CopyGCSObjects{
{Source: "gs://bucket/object", Destination: "gs://bucket/object", ACLRules: []*storage.ACLRule{{Entity: "allUsers", Role: "OWNER"}}},
{Source: "gs://bucket/object", Destination: "gs://bucket/object", ACLRules: []*storage.ACLRule{{Entity: "allAuthenticatedUsers", Role: "WRITER"}}},
}
if diff := pretty.Compare(ws, want); diff != "" {
t.Errorf("populated CopyGCSObjects does not match expectation: (-got +want)\n%s", diff)
}
}

func TestCopyGCSObjectsRun(t *testing.T) {
func TestCopyGCSObjectsValidate(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
u := r.URL.String()
m := r.Method

if m == "GET" && u == "/b/bucket1?alt=json&projection=full" {
fmt.Fprint(w, `{}`)
} else if m == "GET" && u == "/b/bucket3?alt=json&projection=full" {
fmt.Fprint(w, `{}`)
} else if m == "POST" && u == "/b/bucket1/o?alt=json&projection=full&uploadType=multipart" {
fmt.Fprint(w, `{}`)
} else if m == "DELETE" && u == "/b/bucket1/o/abcdef?alt=json" {
fmt.Fprint(w, `{}`)
} else if m == "DELETE" && u == "/b/bucket1/o/daisy-validate--abcdef?alt=json" {
fmt.Fprint(w, `{}`)
} else if m == "PUT" && u == "/b/bucket1/o/daisy-validate--abcdef/acl/allUsers?alt=json" {
fmt.Fprint(w, `{}`)
} else {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(w, "testGCSClient unknown request: %+v\n", r)
}
}))
sc, err := storage.NewClient(context.Background(), option.WithEndpoint(ts.URL), option.WithHTTPClient(http.DefaultClient))
if err != nil {
t.Fatal(err)
}
ctx := context.Background()
w := testWorkflow()
w.StorageClient = sc
s := &Step{w: w}
w.Steps = map[string]*Step{
"copy": {CopyGCSObjects: &CopyGCSObjects{{Source: "", Destination: ""}}},

ws := &CopyGCSObjects{
{Source: "gs://bucket1", Destination: "gs://bucket1"},
{Source: "gs://bucket1", Destination: "gs://bucket1", ACLRules: []*storage.ACLRule{{Entity: "allUsers", Role: "OWNER"}}},
}
if err := ws.validate(ctx, s); err != nil {
t.Errorf("error running CopyGCSObjects.validate(): %v", err)
}

for _, ws := range []*CopyGCSObjects{
{{Source: "gs://bucket1", Destination: ""}},
{{Source: "", Destination: "gs://bucket1"}},
{{Source: "gs://bucket2", Destination: "gs://bucket1"}},
{{Source: "gs://bucket1", Destination: "gs://bucket2"}},
{{Source: "gs://bucket1", Destination: "gs://bucket3"}},
{{Source: "gs://bucket1", Destination: "gs://bucket1", ACLRules: []*storage.ACLRule{{Role: "owner"}}}},
{{Source: "gs://bucket1", Destination: "gs://bucket1", ACLRules: []*storage.ACLRule{{Entity: "allUsers", Role: "owner"}}}},
{{Source: "gs://bucket1", Destination: "gs://bucket1", ACLRules: []*storage.ACLRule{{Entity: "someUser", Role: "OWNER"}}}},
} {
if err := ws.validate(ctx, s); err == nil {
t.Error("expected error")
}
}
}

func TestCopyGCSObjectsRun(t *testing.T) {
ctx := context.Background()
w := testWorkflow()
s := &Step{w: w}

ws := &CopyGCSObjects{
{Source: "gs://bucket", Destination: "gs://bucket"},
{Source: "gs://bucket/object", Destination: "gs://bucket/object"},
{Source: "gs://bucket/object", Destination: "gs://bucket/object", ACLRules: []storage.ACLRule{{Entity: "allUsers", Role: "OWNER"}}},
{Source: "gs://bucket/object", Destination: "gs://bucket/object", ACLRules: []*storage.ACLRule{{Entity: "allUsers", Role: "OWNER"}}},
{Source: "gs://bucket/object/", Destination: "gs://bucket/object/", ACLRules: []*storage.ACLRule{{Entity: "allUsers", Role: "OWNER"}}},
}
if err := ws.run(ctx, s); err != nil {
t.Errorf("error running CopyGCSObjects.run(): %v", err)
}

ws = &CopyGCSObjects{
{Source: "gs://bucket", Destination: ""},
{Source: "", Destination: "gs://bucket"},
}
if err := ws.run(ctx, s); err == nil {
t.Error("expected error")
}
}

func TestCopyGCSObjectsValidate(t *testing.T) {
if err := (&CopyGCSObjects{}).validate(context.Background(), &Step{}); err != nil {
t.Error("not implemented, err should be nil")
for _, ws := range []*CopyGCSObjects{
{{Source: "gs://bucket", Destination: ""}},
{{Source: "", Destination: "gs://bucket"}},
{{Source: "gs://bucket/object/", Destination: "gs://bucket/object/", ACLRules: []*storage.ACLRule{{Entity: "someUser", Role: "OWNER"}}}},
} {
if err := ws.run(ctx, s); err == nil {
t.Error("expected error")
}
}
}
Loading

0 comments on commit f5b5f78

Please sign in to comment.