From 254ba08be47050aefe79822554d6185c2084945e Mon Sep 17 00:00:00 2001 From: Pieterjan Lambein Date: Wed, 11 Jan 2017 16:36:59 +0100 Subject: [PATCH 1/2] Use default application credentials if no JSON config is available. --- async_worker_group.go | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/async_worker_group.go b/async_worker_group.go index 3ed870a..d1a1e15 100644 --- a/async_worker_group.go +++ b/async_worker_group.go @@ -1,13 +1,14 @@ package bqstreamer import ( - "errors" "net/http" "sync" "time" "golang.org/x/oauth2" + "golang.org/x/oauth2/google" "golang.org/x/oauth2/jwt" + "google.golang.org/api/bigquery/v2" ) // AsyncWorkerGroup asynchronously streams rows to BigQuery in bulk. @@ -50,11 +51,21 @@ type AsyncWorkerGroup struct { } // New returns a new AsyncWorkerGroup using given OAuth2/JWT configuration. +// Set jwtConfig to nil if your system corresponds to either of the following conditions: +// - a system that has called "gcloud auth application-default login" +// - a system running in Google Application Engine +// - a system running in Google Compute Engine +// ref: https://developers.google.com/identity/protocols/application-default-credentials func NewAsyncWorkerGroup(jwtConfig *jwt.Config, options ...AsyncOptionFunc) (*AsyncWorkerGroup, error) { if jwtConfig == nil { - return nil, errors.New("jwt.Config is nil") + ctx := oauth2.NoContext + client, err := google.DefaultClient(ctx, bigquery.BigqueryInsertdataScope) + if err != nil { + return nil, err + } + newHTTPClient := func() *http.Client { return client } + return newAsyncWorkerGroup(newHTTPClient, options...) } - // Create a new Streamer, with OAuth2/JWT http.Client constructor function. newHTTPClient := func() *http.Client { return jwtConfig.Client(oauth2.NoContext) } return newAsyncWorkerGroup(newHTTPClient, options...) From 61a0cef291f2f279083e7bbe85b442211df204a2 Mon Sep 17 00:00:00 2001 From: Pieterjan Lambein Date: Thu, 29 Jun 2017 10:56:33 +0200 Subject: [PATCH 2/2] Add a 30 sec timeout to the bigquery insert --- sync_worker.go | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/sync_worker.go b/sync_worker.go index be21a6a..6a1663a 100644 --- a/sync_worker.go +++ b/sync_worker.go @@ -3,6 +3,7 @@ package bqstreamer import ( + "context" "net/http" "time" @@ -146,16 +147,22 @@ func (w *SyncWorker) insertAll(insertFunc func(projectID, datasetID, tableID str // TODO cache bigquery service instead of creating a new one every insertTable() call // TODO add support for SkipInvalidRows, IgnoreUnknownValues func (w *SyncWorker) insertTable(projectID, datasetID, tableID string, tbl table) *TableInsertErrors { - res, err := bigquery.NewTabledataService(w.service). - InsertAll( - projectID, datasetID, tableID, - &bigquery.TableDataInsertAllRequest{ - Kind: "bigquery#tableDataInsertAllRequest", - Rows: tbl, - IgnoreUnknownValues: w.ignoreUnknownValues, - SkipInvalidRows: w.skipInvalidRows, - }). - Do() + tabledataInsertAllCall := bigquery.NewTabledataService(w.service).InsertAll( + projectID, datasetID, tableID, + &bigquery.TableDataInsertAllRequest{ + Kind: "bigquery#tableDataInsertAllRequest", + Rows: tbl, + IgnoreUnknownValues: w.ignoreUnknownValues, + SkipInvalidRows: w.skipInvalidRows, + }) + + // Set a timeout on the bigtable insert + // TODO make this configurable by passing through a context + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + tabledataInsertAllCall.Context(ctx) + res, err := tabledataInsertAllCall.Do() var rows []*bigquery.TableDataInsertAllResponseInsertErrors if res != nil {