Skip to content

Commit

Permalink
Create a GCS bucket if no GCSPath is set, guess at project and zone (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
adjackura authored Jun 30, 2017
1 parent 15b2572 commit 77920ac
Show file tree
Hide file tree
Showing 12 changed files with 375 additions and 95 deletions.
6 changes: 3 additions & 3 deletions daisy/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,10 @@ workflow and step field names are case-insensitive, but we suggest upper camel c
| Field Name | Type | Description |
|-|-|-|
| Name | string | The name of the workflow. Must be between 1-20 characters and match regex **[a-z]\([-a-z0-9]\*[a-z0-9])?**|
| Project | string | The GCE and GCS API enabled GCP project in which to run the workflow. |
| Zone | string | The GCE zone in which to run the workflow. |
| Project | string | The GCE and GCS API enabled GCP project in which to run the workflow, if no project is given and Daisy is running on a GCE instance, that instances project will be used. |
| Zone | string | The GCE zone in which to run the workflow, if no zone is given and Daisy is running on a GCE instance, that instances zone will be used. |
| OAuthPath | string | A local path to JSON credentials for your Project. These credentials should have full GCE permission and read/write permission to GCSPath. If credentials are not provided here, Daisy will look for locally cached user credentials such as are generated by `gcloud init`. |
| GCSPath | string | Daisy will use this location as scratch space and for logging/output results. **NOTE**: Your workflow VMs need access to this location, use a bucket in the same project that you will launch instances in or grant your Project's default service account read/write permissions.|
| GCSPath | string | Daisy will use this location as scratch space and for logging/output results, if no GCSPath is given and Daisy will create a bucket to use in the project, subsequent runs will reuse this bucket. **NOTE**: Your workflow VMs need access to this location, use a bucket in the same project that you will launch instances in or grant your Project's default service account read/write permissions.|
| Sources | map[string]string | A map of destination paths to local and GCS source paths. These sources will be uploaded to a subdirectory in GCSPath. The sources are referenced by their key name within the workflow config. See [Sources](#sources) below for more information. |
| Vars | map[string]string | A map of key value pairs. Vars are referenced by "${key}" within the workflow config. Caution should be taken to avoid conflicts with [autovars](#autovars). |
| Steps | map[string]Step | A map of step names to Steps. See [Steps](#steps) below for more information. |
Expand Down
1 change: 0 additions & 1 deletion daisy/compute/compute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ func TestCreateInstance(t *testing.T) {
if r.Method == "POST" && r.URL.String() == fmt.Sprintf("/%s/zones/%s/instances?alt=json", testProject, testZone) {
if insertErr != nil {
w.WriteHeader(400)
fmt.Println(w, insertErr)
return
}
buf := new(bytes.Buffer)
Expand Down
80 changes: 46 additions & 34 deletions daisy/daisy.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"strings"
"sync"

"cloud.google.com/go/compute/metadata"
"cloud.google.com/go/storage"
"github.com/GoogleCloudPlatform/compute-image-tools/daisy/compute"
"github.com/GoogleCloudPlatform/compute-image-tools/daisy/workflow"
Expand Down Expand Up @@ -58,46 +59,53 @@ func splitVariables(input string) map[string]string {
return varMap
}

func parseWorkflows(ctx context.Context, paths []string, varMap map[string]string, project, zone, gcsPath, oauth, cEndpoint, sEndpoint string) ([]*workflow.Workflow, error) {
var ws []*workflow.Workflow
for _, path := range paths {
w, err := workflow.NewFromFile(path)
func parseWorkflow(ctx context.Context, path string, varMap map[string]string, project, zone, gcsPath, oauth, cEndpoint, sEndpoint string) (*workflow.Workflow, error) {
w, err := workflow.NewFromFile(path)
if err != nil {
return nil, err
}
for k, v := range varMap {
w.AddVar(k, v)
}

if project != "" {
w.Project = project
} else if w.Project == "" && metadata.OnGCE() {
w.Project, err = metadata.ProjectID()
if err != nil {
return nil, err
}
for k, v := range varMap {
w.AddVar(k, v)
}
if project != "" {
w.Project = project
}
if zone != "" {
w.Zone = zone
}
if gcsPath != "" {
w.GCSPath = gcsPath
}
if oauth != "" {
w.OAuthPath = oauth
}
if zone != "" {
w.Zone = zone
} else if w.Zone == "" && metadata.OnGCE() {
w.Zone, err = metadata.Zone()
if err != nil {
return nil, err
}
}
if gcsPath != "" {
w.GCSPath = gcsPath
}
if oauth != "" {
w.OAuthPath = oauth
}

if cEndpoint != "" {
w.ComputeClient, err = compute.NewClient(ctx, option.WithEndpoint(cEndpoint), option.WithServiceAccountFile(w.OAuthPath))
if err != nil {
return nil, err
}
if cEndpoint != "" {
w.ComputeClient, err = compute.NewClient(ctx, option.WithEndpoint(cEndpoint), option.WithServiceAccountFile(w.OAuthPath))
if err != nil {
return nil, err
}
}

if sEndpoint != "" {
w.StorageClient, err = storage.NewClient(ctx, option.WithEndpoint(sEndpoint), option.WithServiceAccountFile(w.OAuthPath))
if err != nil {
return nil, err
}
if sEndpoint != "" {
w.StorageClient, err = storage.NewClient(ctx, option.WithEndpoint(sEndpoint), option.WithServiceAccountFile(w.OAuthPath))
if err != nil {
return nil, err
}

ws = append(ws, w)
}
return ws, nil

return w, nil
}

func main() {
Expand All @@ -107,10 +115,14 @@ func main() {
}
ctx := context.Background()

var ws []*workflow.Workflow
varMap := splitVariables(*variables)
ws, err := parseWorkflows(ctx, flag.Args(), varMap, *project, *zone, *gcsPath, *oauth, *ce, *se)
if err != nil {
log.Fatal(err)
for _, path := range flag.Args() {
w, err := parseWorkflow(ctx, path, varMap, *project, *zone, *gcsPath, *oauth, *ce, *se)
if err != nil {
log.Fatalf("error parsing workflow %q: %v", path, err)
}
ws = append(ws, w)
}

errors := make(chan error, len(ws))
Expand Down
43 changes: 25 additions & 18 deletions daisy/daisy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,35 +40,42 @@ func TestSplitVariables(t *testing.T) {
}

func TestParseWorkflows(t *testing.T) {
paths := []string{"./workflow/test.wf.json"}
path := "./workflow/test.wf.json"
varMap := map[string]string{"key1": "var1", "key2": "var2"}
project := "project"
zone := "zone"
gcsPath := "gcspath"
oauth := "oauthpath"
ws, err := parseWorkflows(context.Background(), paths, varMap, project, zone, gcsPath, oauth, "", "")
w, err := parseWorkflow(context.Background(), path, varMap, project, zone, gcsPath, oauth, "", "")
if err != nil {
t.Fatal(err)
}

for _, w := range ws {
tests := []struct {
want, got string
}{
{w.Project, project},
{w.Zone, zone},
{w.GCSPath, gcsPath},
{w.OAuthPath, oauth},
}
tests := []struct {
want, got string
}{
{w.Project, project},
{w.Zone, zone},
{w.GCSPath, gcsPath},
{w.OAuthPath, oauth},
}

for _, tt := range tests {
if tt.want != tt.got {
t.Errorf("%s != %v", varMap, w.Vars)
}
for _, tt := range tests {
if tt.want != tt.got {
t.Errorf("%s != %v", varMap, w.Vars)
}
}

if reflect.DeepEqual(w.Vars, varMap) {
t.Errorf("unexpected vars, want: %s, got: %v", varMap, w.Vars)
}
if reflect.DeepEqual(w.Vars, varMap) {
t.Errorf("unexpected vars, want: %s, got: %v", varMap, w.Vars)
}

want := "dialing: cannot read service account file: open oauthpath: no such file or directory"
if _, err := parseWorkflow(context.Background(), path, varMap, project, zone, gcsPath, oauth, "noplace", ""); err.Error() != want {
t.Errorf("did not get expected error, got: %q, want: %q", err.Error(), want)
}

if _, err := parseWorkflow(context.Background(), path, varMap, project, zone, gcsPath, oauth, "", "noplace"); err.Error() != want {
t.Errorf("did not get expected error, got: %q, want: %q", err.Error(), want)
}
}
7 changes: 4 additions & 3 deletions daisy/workflow/step_create_instances.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,13 @@ func (c *CreateInstance) MarshalJSON() ([]byte, error) {
return json.Marshal(*c)
}

func logSerialOutput(ctx context.Context, w *Workflow, name string, port int64) {
func logSerialOutput(ctx context.Context, w *Workflow, name string, port int64, interval time.Duration) {
logsObj := path.Join(w.logsPath, fmt.Sprintf("%s-serial-port%d.log", name, port))
w.logger.Printf("CreateInstances: streaming instance %q serial port %d output to gs://%s/%s", name, port, w.bucket, logsObj)
var start int64
var buf bytes.Buffer
var errs int
tick := time.Tick(1 * time.Second)
tick := time.Tick(interval)
for {
select {
case <-ctx.Done():
Expand All @@ -89,6 +89,7 @@ func logSerialOutput(ctx context.Context, w *Workflow, name string, port int64)
}
// Otherwise retry 3 times on 5xx error.
if apiErr, ok := err.(*googleapi.Error); ok && errs < 3 && (apiErr.Code >= 500 && apiErr.Code <= 599) {
errs++
continue
}
w.logger.Printf("CreateInstances: instance %q: error getting serial port: %v", name, err)
Expand Down Expand Up @@ -353,7 +354,7 @@ func (c *CreateInstances) run(ctx context.Context, s *Step) error {
e <- err
return
}
go logSerialOutput(ctx, w, ci.Name, 1)
go logSerialOutput(ctx, w, ci.Name, 1, 1*time.Second)
instances[w].add(ci.daisyName, &resource{real: ci.Name, link: ci.SelfLink, noCleanup: ci.NoCleanup})
}(ci)
}
Expand Down
108 changes: 108 additions & 0 deletions daisy/workflow/step_create_instances_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,126 @@
package workflow

import (
"bytes"
"context"
"errors"
"fmt"
"log"
"net/http"
"path"
"sort"
"strings"
"testing"
"time"

daisy_compute "github.com/GoogleCloudPlatform/compute-image-tools/daisy/compute"
"github.com/kylelemons/godebug/pretty"
compute "google.golang.org/api/compute/v1"
"reflect"
)

func TestLogSerialOutput(t *testing.T) {
ctx := context.Background()
w := testWorkflow()

var get []string
_, c, err := daisy_compute.NewTestClient(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" && strings.Contains(r.URL.String(), "serialPort?alt=json&port=1") {
if len(get) == 0 {
fmt.Fprintln(w, `{"Contents":"test","Start":"0"}`)
} else {
w.WriteHeader(http.StatusInternalServerError)
}
get = append(get, r.URL.String())
} else if r.Method == "GET" && strings.Contains(r.URL.String(), "serialPort?alt=json&port=2") {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintln(w, "500 error")
} else if r.Method == "GET" && strings.Contains(r.URL.String(), "serialPort?alt=json&port=3") {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprintln(w, "400 error")
} else if r.Method == "GET" && strings.Contains(r.URL.String(), fmt.Sprintf("/%s/zones/%s/instances/i1", testProject, testZone)) {
fmt.Fprintln(w, `{"Status":"TERMINATED","SelfLink":"link"}`)
} else if r.Method == "GET" && strings.Contains(r.URL.String(), fmt.Sprintf("/%s/zones/%s/instances/i2", testProject, testZone)) {
fmt.Fprintln(w, `{"Status":"RUNNING","SelfLink":"link"}`)
} else if r.Method == "GET" && strings.Contains(r.URL.String(), fmt.Sprintf("/%s/zones/%s/instances/i3", testProject, testZone)) {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprintln(w, "test error")
} else {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(w, "bad request: %+v", r)
}
get = append(get, r.URL.String())
}))
if err != nil {
t.Fatal(err)
}
w.ComputeClient = c
w.bucket = "test-bucket"

instances[w].m = map[string]*resource{
"i1": {real: w.genName("i1"), link: "link"},
"i2": {real: w.genName("i2"), link: "link"},
"i3": {real: w.genName("i3"), link: "link"},
}

var buf bytes.Buffer
w.logger = log.New(&buf, "", 0)

tests := []struct {
test, want, name string
port int64
get []string // Test expected api call flow.
}{
{
"400 error but instance stopped",
"CreateInstances: streaming instance \"i1\" serial port 2 output to gs://test-bucket/i1-serial-port2.log\n",
"i1",
2,
[]string{"/test-project/zones/test-zone/instances/i1/serialPort?alt=json&port=2&start=0", "/test-project/zones/test-zone/instances/i1?alt=json"},
},
{
"400 error but instance running",
"CreateInstances: streaming instance \"i2\" serial port 3 output to gs://test-bucket/i2-serial-port3.log\nCreateInstances: instance \"i2\": error getting serial port: googleapi: got HTTP response code 400 with body: 400 error\n",
"i2",
3,
[]string{"/test-project/zones/test-zone/instances/i2/serialPort?alt=json&port=3&start=0", "/test-project/zones/test-zone/instances/i2?alt=json"},
},
{
"500 error but instance running",
"CreateInstances: streaming instance \"i2\" serial port 2 output to gs://test-bucket/i2-serial-port2.log\nCreateInstances: instance \"i2\": error getting serial port: googleapi: got HTTP response code 500 with body: 500 error\n",
"i2",
2,
[]string{"/test-project/zones/test-zone/instances/i2/serialPort?alt=json&port=2&start=0", "/test-project/zones/test-zone/instances/i2?alt=json", "/test-project/zones/test-zone/instances/i2/serialPort?alt=json&port=2&start=0", "/test-project/zones/test-zone/instances/i2?alt=json", "/test-project/zones/test-zone/instances/i2/serialPort?alt=json&port=2&start=0", "/test-project/zones/test-zone/instances/i2?alt=json", "/test-project/zones/test-zone/instances/i2/serialPort?alt=json&port=2&start=0", "/test-project/zones/test-zone/instances/i2?alt=json"},
},
{
"500 error but instance deleted",
"CreateInstances: streaming instance \"i4\" serial port 2 output to gs://test-bucket/i4-serial-port2.log\n",
"i4",
2,
[]string{"/test-project/zones/test-zone/instances/i4/serialPort?alt=json&port=2&start=0"},
},
{
"normal flow",
"CreateInstances: streaming instance \"i1\" serial port 1 output to gs://test-bucket/i1-serial-port1.log\n",
"i1",
1,
[]string{"/test-project/zones/test-zone/instances/i1/serialPort?alt=json&port=1&start=0", "/test-project/zones/test-zone/instances/i1/serialPort?alt=json&port=1&start=0", "/test-project/zones/test-zone/instances/i1/serialPort?alt=json&port=1&start=0", "/test-project/zones/test-zone/instances/i1/serialPort?alt=json&port=1&start=0", "/test-project/zones/test-zone/instances/i1?alt=json"},
},
}

for _, tt := range tests {
get = nil
buf.Reset()
logSerialOutput(ctx, w, tt.name, tt.port, 1*time.Microsecond)
if !reflect.DeepEqual(get, tt.get) {
t.Errorf("%s: got get calls: %q, want get calls: %q", tt.test, get, tt.get)
}
if buf.String() != tt.want {
t.Errorf("%s: got: %q, want: %q", tt.test, buf.String(), tt.want)
}
}
}

func TestCreateInstancePopulate(t *testing.T) {
ctx := context.Background()
w := testWorkflow()
Expand Down
Loading

0 comments on commit 77920ac

Please sign in to comment.