From d64f46399441371e9d3b70d6324a48b73106a8fc Mon Sep 17 00:00:00 2001 From: Timothy Jennison Date: Thu, 18 Oct 2018 14:57:34 -0400 Subject: [PATCH] Add support for exporting v1 operations --- export/export/export.go | 408 +++++++++++++++++++ export/main.go | 76 ++++ genomics/genomics.go | 116 ++++++ pipelines/internal/commands/export/export.go | 217 +--------- pipelines/main.go | 74 +--- 5 files changed, 618 insertions(+), 273 deletions(-) create mode 100644 export/export/export.go create mode 100644 export/main.go create mode 100644 genomics/genomics.go diff --git a/export/export/export.go b/export/export/export.go new file mode 100644 index 0000000..f058024 --- /dev/null +++ b/export/export/export.go @@ -0,0 +1,408 @@ +// Copyright 2018 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package export provides helpers for exporting pipelines to BigQuery. +package export + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "strings" + "time" + + "cloud.google.com/go/bigquery" + genomicsv1 "google.golang.org/api/genomics/v1alpha2" + genomics "google.golang.org/api/genomics/v2alpha1" + "google.golang.org/api/iterator" +) + +type Config struct { + Project, Filter, Dataset, Table string + Update bool +} + +func (c *Config) ExportV2(ctx context.Context, service *genomics.Service) error { + e, filter, err := c.newExporter(ctx, func(filter string, t time.Time) string { + return combineTerms(fmt.Sprintf("metadata.createTime > %q", t.Format(time.RFC3339Nano)), filter, "%s AND (%s)") + }) + if err != nil { + return fmt.Errorf("creating exporter: %v", err) + } + + path := fmt.Sprintf("projects/%s/operations", c.Project) + call := service.Projects.Operations.List(path).Context(ctx).PageSize(256).Filter(filter) + err = call.Pages(ctx, func(resp *genomics.ListOperationsResponse) error { + e.startPage() + + for _, operation := range resp.Operations { + var metadata genomics.Metadata + if err := json.Unmarshal(operation.Metadata, &metadata); err != nil { + return fmt.Errorf("unmarshalling operation (after %d operations): %v", e.Count, err) + } + + pipeline, err := json.Marshal(metadata.Pipeline) + if err != nil { + return fmt.Errorf("marshalling pipeline (after %d operations): %v", e.Count, err) + } + resources := metadata.Pipeline.Resources + + r := row{ + Name: operation.Name, + Done: operation.Done, + CreateTime: parseTimestamp(metadata.CreateTime).Timestamp, + StartTime: parseTimestamp(metadata.StartTime), + EndTime: parseTimestamp(metadata.EndTime), + + Pipeline: string(pipeline), + Regions: resources.Regions, + Zones: resources.Zones, + MachineType: resources.VirtualMachine.MachineType, + Preemptible: resources.VirtualMachine.Preemptible, + } + + if operation.Error != nil { + r.Error = &status{ + Message: operation.Error.Message, + Code: operation.Error.Code, + } + } + + for k, v := range metadata.Labels { + r.Labels = append(r.Labels, label{Key: k, Value: v}) + } + + for _, e := range metadata.Events { + r.Events = append(r.Events, event{ + Timestamp: parseTimestamp(e.Timestamp).Timestamp, + Description: e.Description, + }) + } + + if err := e.encode(&r); err != nil { + return fmt.Errorf("encoding row (after %d operations): %v", e.Count, err) + } + } + + if err := e.finishPage(ctx); err != nil { + return fmt.Errorf("finishing page (after %d operations): %v", e.Count, err) + } + + return nil + }) + if err != nil { + return fmt.Errorf("exporting operations: %v", err) + } + + e.finish() + return nil +} + +func (c *Config) ExportV1(ctx context.Context, service *genomicsv1.Service) error { + e, filter, err := c.newExporter(ctx, func(filter string, t time.Time) string { + return combineTerms(fmt.Sprintf("createTime >= %d", t.Unix()+1), filter, "%s AND %s") + }) + if err != nil { + return fmt.Errorf("creating exporter: %v", err) + } + + filter = combineTerms("projectId = "+c.Project, filter, "%s AND %s") + + call := service.Operations.List("operations").Context(ctx).PageSize(256).Filter(filter) + err = call.Pages(ctx, func(resp *genomicsv1.ListOperationsResponse) error { + e.startPage() + + for _, operation := range resp.Operations { + var metadata genomicsv1.OperationMetadata + if err := json.Unmarshal(operation.Metadata, &metadata); err != nil { + return fmt.Errorf("unmarshalling operation (after %d operations): %v", e.Count, err) + } + + if metadata.Request == nil { + continue + } + + var t struct { + Type string `json:"@type"` + } + if err := json.Unmarshal(metadata.Request, &t); err != nil { + return fmt.Errorf("unmarshalling request type (after %d operations): %v", e.Count, err) + } + if !strings.HasSuffix(t.Type, ".RunPipelineRequest") { + continue + } + + var request genomicsv1.RunPipelineRequest + if err := json.Unmarshal(metadata.Request, &request); err != nil { + return fmt.Errorf("unmarshalling request (after %d operations): %v", e.Count, err) + } + + var zones []string + var preemptible bool + if request.EphemeralPipeline != nil && request.EphemeralPipeline.Resources != nil { + zones = request.EphemeralPipeline.Resources.Zones + preemptible = request.EphemeralPipeline.Resources.Preemptible + } + if request.PipelineArgs != nil && request.PipelineArgs.Resources != nil { + zones = request.PipelineArgs.Resources.Zones + preemptible = request.PipelineArgs.Resources.Preemptible + } + + r := row{ + Name: operation.Name, + Done: operation.Done, + CreateTime: parseTimestamp(metadata.CreateTime).Timestamp, + StartTime: parseTimestamp(metadata.StartTime), + EndTime: parseTimestamp(metadata.EndTime), + + Pipeline: string(metadata.Request), + Zones: zones, + Preemptible: preemptible, + } + + if metadata.RuntimeMetadata != nil { + var runtime genomicsv1.RuntimeMetadata + if err := json.Unmarshal(metadata.RuntimeMetadata, &runtime); err != nil { + return fmt.Errorf("unmarshalling request (after %d operations): %v", e.Count, err) + } + + if runtime.ComputeEngine != nil { + parts := strings.Split(runtime.ComputeEngine.MachineType, "/") + r.MachineType = parts[1] + } + } + + if operation.Error != nil { + r.Error = &status{ + Message: operation.Error.Message, + Code: operation.Error.Code, + } + } + + if request.PipelineArgs != nil { + for k, v := range request.PipelineArgs.Labels { + r.Labels = append(r.Labels, label{Key: k, Value: v}) + } + } + + for _, e := range metadata.Events { + r.Events = append(r.Events, event{ + Timestamp: parseTimestamp(e.StartTime).Timestamp, + Description: e.Description, + }) + } + + if err := e.encode(&r); err != nil { + return fmt.Errorf("encoding row (after %d operations): %v", e.Count, err) + } + } + + if err := e.finishPage(ctx); err != nil { + return fmt.Errorf("finishing page (after %d operations): %v", e.Count, err) + } + + return nil + }) + if err != nil { + return fmt.Errorf("exporting operations: %v", err) + } + + e.finish() + return nil +} + +type exporter struct { + Count int + + bq *bigquery.Client + table *bigquery.Table + schema bigquery.Schema + + buffer bytes.Buffer + encoder *json.Encoder +} + +func (c *Config) newExporter(ctx context.Context, addTimestamp func(f string, t time.Time) string) (*exporter, string, error) { + bq, err := bigquery.NewClient(ctx, c.Project) + if err != nil { + return nil, "", fmt.Errorf("creating BigQuery client: %v", err) + } + + dataset := bq.Dataset(c.Dataset) + if _, err := dataset.Metadata(ctx); err != nil { + return nil, "", fmt.Errorf("looking up dataset: %v", err) + } + + schema, err := bigquery.InferSchema(row{}) + if err != nil { + return nil, "", fmt.Errorf("inferring schema: %v", err) + } + + filter := c.Filter + table := dataset.Table(c.Table) + if _, err := table.Metadata(ctx); err != nil { + if err := table.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil { + return nil, "", fmt.Errorf("creating table: %v", err) + } + } else if c.Update { + timestamp, err := c.latestTimestamp(ctx, bq, c.Project) + if err != nil { + return nil, "", fmt.Errorf("retrieving latest timestamp: %v", err) + } + if timestamp != nil { + filter = addTimestamp(filter, *timestamp) + } + } + + fmt.Printf("Exporting operations") + + return &exporter{bq: bq, table: table, schema: schema}, filter, nil +} + +func (e *exporter) startPage() { + e.buffer.Reset() + e.encoder = json.NewEncoder(&e.buffer) + e.encoder.SetEscapeHTML(false) +} + +func (e *exporter) encode(r *row) error { + e.Count++ + return e.encoder.Encode(r) +} + +func (e *exporter) finishPage(ctx context.Context) error { + fmt.Printf(".") + + if e.buffer.Len() == 0 { + return nil + } + + source := bigquery.NewReaderSource(&e.buffer) + source.Schema = e.schema + source.SourceFormat = bigquery.JSON + loader := e.table.LoaderFrom(source) + + job, err := loader.Run(ctx) + if err != nil { + return fmt.Errorf("running loader (after %d operations): %v", e.Count, err) + } + + status, err := job.Wait(ctx) + if err != nil { + return fmt.Errorf("waiting for job (after %d operations): %v", e.Count, err) + } + + if err := status.Err(); err != nil { + for _, e := range status.Errors { + fmt.Println(e) + } + return fmt.Errorf("job status (after %d operations): %v", e.Count, err) + } + + return nil +} + +func (e *exporter) finish() { + fmt.Printf("done\n%d operations exported\n", e.Count) +} + +type row struct { + Name string + Done bool + Error *status `bigquery:",nullable"` + + // The raw pipeline JSON. + Pipeline string + + Labels []label + Events []event + + CreateTime time.Time + StartTime, EndTime bigquery.NullTimestamp + + // Additional fields pulled out of the pipeline for convenience. + Regions []string + Zones []string + MachineType string + Preemptible bool +} + +type status struct { + Message string + Code int64 +} + +type event struct { + Timestamp time.Time + Description string +} + +type label struct { + Key, Value string +} + +func parseTimestamp(ts string) bigquery.NullTimestamp { + t, err := time.Parse(time.RFC3339Nano, ts) + if err != nil { + return bigquery.NullTimestamp{} + } + return bigquery.NullTimestamp{ + Timestamp: t, + Valid: true, + } +} + +func (c *Config) latestTimestamp(ctx context.Context, bq *bigquery.Client, project string) (*time.Time, error) { + // Query in micros otherwise BigQuery returns the timestamp as a double. + q := bq.Query(fmt.Sprintf("SELECT UNIX_MICROS(MAX(CreateTime)) FROM `%s.%s.%s`", project, c.Dataset, c.Table)) + job, err := q.Run(ctx) + if err != nil { + return nil, fmt.Errorf("running query: %v", err) + } + status, err := job.Wait(ctx) + if err != nil { + return nil, fmt.Errorf("waiting for query: %v", err) + } + if err := status.Err(); err != nil { + return nil, fmt.Errorf("query status: %v", err) + } + it, err := job.Read(ctx) + if err != nil { + return nil, fmt.Errorf("reading query: %v", err) + } + + var v []bigquery.Value + err = it.Next(&v) + if err == iterator.Done { + return nil, nil + } + if err != nil { + return nil, fmt.Errorf("getting query data: %v", err) + } + + micros, ok := v[0].(int64) + if !ok { + return nil, fmt.Errorf("unexpected timestamp type: %T", v[0]) + } + t := time.Unix(0, micros*int64(time.Microsecond)).UTC() + return &t, nil +} + +func combineTerms(req, opt, format string) string { + if opt == "" { + return req + } + return fmt.Sprintf(format, req, opt) +} diff --git a/export/main.go b/export/main.go new file mode 100644 index 0000000..9ec4363 --- /dev/null +++ b/export/main.go @@ -0,0 +1,76 @@ +// Copyright 2018 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This tool exports pipelines operations to BigQuery. +package main + +import ( + "context" + "flag" + "fmt" + "os" + + "github.com/googlegenomics/pipelines-tools/export/export" + "github.com/googlegenomics/pipelines-tools/genomics" +) + +var ( + project = flag.String("project", defaultProject(), "the cloud project name") + basePath = flag.String("api", "", "the API base to use") + + filter = flag.String("filter", "", "the export filter") + dataset = flag.String("dataset", "", "the dataset to export to which must already exist") + table = flag.String("table", "", "the table to export to") + update = flag.Bool("update", true, "only export operations newer than those already exported") +) + +func main() { + flag.Parse() + + if *project == "" { + exitf("You must specify a project with --project") + } + + if *dataset == "" || *table == "" { + exitf("You must specify a dataset and table with --dataset and --table") + } + + ctx := context.Background() + service, err := genomics.NewServiceV1(ctx, *basePath) + if err != nil { + exitf("Failed to create service: %v", err) + } + + c := export.Config{ + Project: *project, + Dataset: *dataset, + Table: *table, + Filter: *filter, + Update: *update, + } + + if err := c.ExportV1(ctx, service); err != nil { + exitf("Failed to export operations: %v", err) + } +} + +func exitf(format string, arguments ...interface{}) { + fmt.Fprintf(os.Stderr, format, arguments...) + fmt.Fprintln(os.Stderr) + os.Exit(1) +} + +func defaultProject() string { + return os.Getenv("GOOGLE_CLOUD_PROJECT") +} diff --git a/genomics/genomics.go b/genomics/genomics.go new file mode 100644 index 0000000..5a6191a --- /dev/null +++ b/genomics/genomics.go @@ -0,0 +1,116 @@ +// Copyright 2018 Google Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package genomics provides methods for connecting to the Genomics service. +package genomics + +import ( + "context" + "crypto/tls" + "fmt" + "net/http" + "strings" + "time" + + "golang.org/x/oauth2" + "golang.org/x/oauth2/google" + genomicsv1 "google.golang.org/api/genomics/v1alpha2" + genomics "google.golang.org/api/genomics/v2alpha1" +) + +func NewService(ctx context.Context, basePath string) (*genomics.Service, error) { + client, err := newClient(ctx, genomics.GenomicsScope, basePath) + if err != nil { + return nil, fmt.Errorf("creating client: %v", err) + } + + service, err := genomics.New(client) + if err != nil { + return nil, fmt.Errorf("creating service object: %v", err) + } + if basePath != "" { + service.BasePath = basePath + } + return service, nil +} + +func NewServiceV1(ctx context.Context, basePath string) (*genomicsv1.Service, error) { + client, err := newClient(ctx, genomicsv1.GenomicsScope, basePath) + if err != nil { + return nil, fmt.Errorf("creating client: %v", err) + } + + service, err := genomicsv1.New(client) + if err != nil { + return nil, fmt.Errorf("creating service object: %v", err) + } + if basePath != "" { + service.BasePath = basePath + } + return service, nil +} + +func newClient(ctx context.Context, scope, basePath string) (*http.Client, error) { + var transport robustTransport + + // When connecting to a local server (for Google developers only) disable SSL + // verification since the certificates are not easily verifiable. + if strings.HasPrefix(basePath, "https://localhost:") { + transport.Base.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + } + + ctx = context.WithValue(ctx, oauth2.HTTPClient, &http.Client{Transport: &transport}) + + client, err := google.DefaultClient(ctx, scope) + if err != nil { + return nil, fmt.Errorf("creating authenticated client: %v", err) + } + + return client, nil +} + +type robustTransport struct { + Base http.Transport +} + +func (rt *robustTransport) RoundTrip(req *http.Request) (*http.Response, error) { + delay := time.Second + + var errors []string + for { + resp, err := rt.roundTrip(req) + if err == nil { + return resp, nil + } + errors = append(errors, fmt.Sprintf("attempt %d: %v", len(errors)+1, err)) + if len(errors) == 3 { + return resp, fmt.Errorf("%d failed requests: %v", len(errors), strings.Join(errors, ", ")) + } + + delay *= 2 + time.Sleep(delay) + } +} + +func (rt *robustTransport) roundTrip(req *http.Request) (*http.Response, error) { + resp, err := rt.Base.RoundTrip(req) + if err != nil { + return nil, err + } + switch resp.StatusCode { + case http.StatusServiceUnavailable, http.StatusBadGateway, http.StatusGatewayTimeout: + return nil, fmt.Errorf("retryable HTTP error: %q", resp.Status) + } + return resp, err +} diff --git a/pipelines/internal/commands/export/export.go b/pipelines/internal/commands/export/export.go index 396b1ce..f479f5d 100644 --- a/pipelines/internal/commands/export/export.go +++ b/pipelines/internal/commands/export/export.go @@ -17,225 +17,36 @@ package export import ( "context" - "encoding/json" "errors" "flag" - "fmt" - "time" - "cloud.google.com/go/bigquery" - "cloud.google.com/go/civil" + "github.com/googlegenomics/pipelines-tools/export/export" genomics "google.golang.org/api/genomics/v2alpha1" ) var ( flags = flag.NewFlagSet("", flag.ExitOnError) - filter = flags.String("filter", "", "the export filter") - datasetName = flags.String("dataset", "", "the dataset to export to which must already exist") - tableName = flags.String("table", "", "the table to export to") - update = flags.Bool("update", true, "only export operations newer than those already exported") + filter = flags.String("filter", "", "the export filter") + dataset = flags.String("dataset", "", "the dataset to export to which must already exist") + table = flags.String("table", "", "the table to export to") + update = flags.Bool("update", true, "only export operations newer than those already exported") + v1 = flags.Bool("v1", false, "export v1 operations") ) -type row struct { - Name string - Done bool - Error *status `bigquery:",nullable"` - - // The raw pipeline JSON. - Pipeline string - - Labels []label - Events []event - - CreateTime civil.DateTime - StartTime, EndTime bigquery.NullDateTime - - // Additional fields pulled out of the pipeline for convenience. - Regions []string - Zones []string - MachineType string - Preemptible bool -} - -type status struct { - Message string - Code int64 -} - -type event struct { - Timestamp civil.DateTime - Description string -} - -type label struct { - Key, Value string -} - func Invoke(ctx context.Context, service *genomics.Service, project string, arguments []string) error { flags.Parse(arguments) - if *datasetName == "" || *tableName == "" { + if *dataset == "" || *table == "" { return errors.New("dataset and table are required") } - path := fmt.Sprintf("projects/%s/operations", project) - call := service.Projects.Operations.List(path).Context(ctx) - call.PageSize(256) - - bq, err := bigquery.NewClient(ctx, project) - if err != nil { - return fmt.Errorf("creating BigQuery client: %v", err) - } - - dataset := bq.Dataset(*datasetName) - if _, err := dataset.Metadata(ctx); err != nil { - return fmt.Errorf("looking up dataset: %v", err) - } - - schema, err := bigquery.InferSchema(row{}) - if err != nil { - return fmt.Errorf("inferring schema: %v", err) - } - - f := *filter - table := dataset.Table(*tableName) - if _, err := table.Metadata(ctx); err != nil { - if err := table.Create(ctx, &bigquery.TableMetadata{Schema: schema}); err != nil { - return fmt.Errorf("creating table: %v", err) - } - } else if *update { - timestamp, err := latestTimestamp(ctx, bq, project) - if err != nil { - return fmt.Errorf("retrieving latest timestamp: %v", err) - } - if timestamp != "" { - expr := fmt.Sprintf(`metadata.createTime > %q`, timestamp) - if f != "" { - f = fmt.Sprintf("(%s) AND %s", f, expr) - } else { - f = expr - } - } - } - call.Filter(f) - - uploader := table.Uploader() - - fmt.Printf("Exporting operations") - - var count int - var pageToken string - for { - resp, err := call.PageToken(pageToken).Do() - if err != nil { - return fmt.Errorf("calling list (after %d operations): %v", count, err) - } - - fmt.Printf(".") - - var savers []*bigquery.StructSaver - for _, operation := range resp.Operations { - var metadata genomics.Metadata - if err := json.Unmarshal(operation.Metadata, &metadata); err != nil { - return fmt.Errorf("unmarshalling operation (after %d operations): %v", count, err) - return err - } - - pipeline, err := json.Marshal(metadata.Pipeline) - if err != nil { - return fmt.Errorf("marshalling pipeline (after %d operations): %v", count, err) - } - resources := metadata.Pipeline.Resources - - r := row{ - Name: operation.Name, - Done: operation.Done, - CreateTime: parseTimestamp(metadata.CreateTime).DateTime, - StartTime: parseTimestamp(metadata.StartTime), - EndTime: parseTimestamp(metadata.EndTime), - - Pipeline: string(pipeline), - Regions: resources.Regions, - Zones: resources.Zones, - MachineType: resources.VirtualMachine.MachineType, - Preemptible: resources.VirtualMachine.Preemptible, - } - - if operation.Error != nil { - r.Error = &status{ - Message: operation.Error.Message, - Code: operation.Error.Code, - } - } - - for k, v := range metadata.Labels { - r.Labels = append(r.Labels, label{Key: k, Value: v}) - } - - for _, e := range metadata.Events { - r.Events = append(r.Events, event{ - Timestamp: parseTimestamp(e.Timestamp).DateTime, - Description: e.Description, - }) - } - - savers = append(savers, &bigquery.StructSaver{ - Struct: r, - InsertID: operation.Name, - Schema: schema, - }) - count++ - } - - if err := uploader.Put(ctx, savers); err != nil { - return fmt.Errorf("uploading rows (after %d operations): %v", count, err) - } - - if resp.NextPageToken == "" { - fmt.Printf("done\n%d operations exported\n", count) - return nil - } - - pageToken = resp.NextPageToken - } -} - -func parseTimestamp(ts string) bigquery.NullDateTime { - t, err := time.Parse(time.RFC3339, ts) - if err != nil { - return bigquery.NullDateTime{} - } - return bigquery.NullDateTime{ - DateTime: civil.DateTimeOf(t), - Valid: true, - } -} - -func latestTimestamp(ctx context.Context, bq *bigquery.Client, project string) (string, error) { - q := bq.Query(fmt.Sprintf("SELECT MAX(CreateTime) FROM `%s.%s.%s`", project, *datasetName, *tableName)) - job, err := q.Run(ctx) - if err != nil { - return "", fmt.Errorf("running query: %v", err) - } - status, err := job.Wait(ctx) - if err != nil { - return "", fmt.Errorf("waiting for query: %v", err) - } - if err := status.Err(); err != nil { - return "", fmt.Errorf("query status: %v", err) - } - it, err := job.Read(ctx) - if err != nil { - return "", fmt.Errorf("reading query: %v", err) - } - - var v []bigquery.Value - if err := it.Next(&v); err != nil { - return "", fmt.Errorf("getting query data: %v", err) - } - if v[0] == nil { - return "", nil + c := export.Config{ + Project: project, + Dataset: *dataset, + Table: *table, + Filter: *filter, + Update: *update, } - return fmt.Sprintf("%s", v[0]), nil + return c.ExportV2(ctx, service) } diff --git a/pipelines/main.go b/pipelines/main.go index 4685c57..d50b3df 100644 --- a/pipelines/main.go +++ b/pipelines/main.go @@ -17,30 +17,25 @@ package main import ( "context" - "crypto/tls" "flag" "fmt" - "net/http" "os" - "strings" - "time" + "github.com/googlegenomics/pipelines-tools/genomics" "github.com/googlegenomics/pipelines-tools/pipelines/internal/commands/cancel" "github.com/googlegenomics/pipelines-tools/pipelines/internal/commands/export" "github.com/googlegenomics/pipelines-tools/pipelines/internal/commands/query" "github.com/googlegenomics/pipelines-tools/pipelines/internal/commands/run" "github.com/googlegenomics/pipelines-tools/pipelines/internal/commands/watch" - "golang.org/x/oauth2" - "golang.org/x/oauth2/google" - genomics "google.golang.org/api/genomics/v2alpha1" + genomicsv2 "google.golang.org/api/genomics/v2alpha1" ) var ( project = flag.String("project", defaultProject(), "the cloud project name") basePath = flag.String("api", "", "the API base to use") - commands = map[string]func(context.Context, *genomics.Service, string, []string) error{ + commands = map[string]func(context.Context, *genomicsv2.Service, string, []string) error{ "run": run.Invoke, "cancel": cancel.Invoke, "query": query.Invoke, @@ -71,7 +66,7 @@ func main() { } ctx := context.Background() - service, err := newService(ctx, *basePath) + service, err := genomics.NewService(ctx, *basePath) if err != nil { exitf("Failed to create service: %v", err) } @@ -87,67 +82,6 @@ func exitf(format string, arguments ...interface{}) { os.Exit(1) } -func newService(ctx context.Context, basePath string) (*genomics.Service, error) { - var transport robustTransport - - // When connecting to a local server (for Google developers only) disable SSL - // verification since the certificates are not easily verifiable. - if strings.HasPrefix(basePath, "https://localhost:") { - transport.Base.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} - } - - ctx = context.WithValue(ctx, oauth2.HTTPClient, &http.Client{Transport: &transport}) - - client, err := google.DefaultClient(ctx, genomics.GenomicsScope) - if err != nil { - return nil, fmt.Errorf("creating authenticated client: %v", err) - } - - service, err := genomics.New(client) - if err != nil { - return nil, fmt.Errorf("creating service object: %v", err) - } - if basePath != "" { - service.BasePath = basePath - } - return service, nil -} - func defaultProject() string { return os.Getenv("GOOGLE_CLOUD_PROJECT") } - -type robustTransport struct { - Base http.Transport -} - -func (rt *robustTransport) RoundTrip(req *http.Request) (*http.Response, error) { - delay := time.Second - - var errors []string - for { - resp, err := rt.roundTrip(req) - if err == nil { - return resp, nil - } - errors = append(errors, fmt.Sprintf("attempt %d: %v", len(errors)+1, err)) - if len(errors) == 3 { - return resp, fmt.Errorf("%d failed requests: %v", len(errors), strings.Join(errors, ", ")) - } - - delay *= 2 - time.Sleep(delay) - } -} - -func (rt *robustTransport) roundTrip(req *http.Request) (*http.Response, error) { - resp, err := rt.Base.RoundTrip(req) - if err != nil { - return nil, err - } - switch resp.StatusCode { - case http.StatusServiceUnavailable, http.StatusBadGateway, http.StatusGatewayTimeout: - return nil, fmt.Errorf("retryable HTTP error: %q", resp.Status) - } - return resp, err -}