diff --git a/compiler.go b/compiler.go index bfb5afed19..4a1e03de83 100644 --- a/compiler.go +++ b/compiler.go @@ -2,48 +2,25 @@ package flux import ( "context" - "time" + "fmt" ) -const ( - FluxCompilerType = "flux" - SpecCompilerType = "spec" -) - -// AddCompilerMappings adds the Flux specific compiler mappings. -func AddCompilerMappings(mappings CompilerMappings) error { - if err := mappings.Add(FluxCompilerType, func() Compiler { - return new(FluxCompiler) - - }); err != nil { - return err - } - return mappings.Add(SpecCompilerType, func() Compiler { - return new(SpecCompiler) - }) +// Compiler produces a specification for the query. +type Compiler interface { + // Compile produces a specification for the query. + Compile(ctx context.Context) (*Spec, error) + CompilerType() CompilerType } -// FluxCompiler compiles a Flux script into a spec. -type FluxCompiler struct { - Query string `json:"query"` -} - -func (c FluxCompiler) Compile(ctx context.Context) (*Spec, error) { - return Compile(ctx, c.Query, time.Now()) -} +// CompilerType is the name of a query compiler. +type CompilerType string +type CreateCompiler func() Compiler +type CompilerMappings map[CompilerType]CreateCompiler -func (c FluxCompiler) CompilerType() CompilerType { - return FluxCompilerType -} - -// SpecCompiler implements Compiler by returning a known spec. -type SpecCompiler struct { - Spec *Spec `json:"spec"` -} - -func (c SpecCompiler) Compile(ctx context.Context) (*Spec, error) { - return c.Spec, nil -} -func (c SpecCompiler) CompilerType() CompilerType { - return SpecCompilerType +func (m CompilerMappings) Add(t CompilerType, c CreateCompiler) error { + if _, ok := m[t]; ok { + return fmt.Errorf("duplicate compiler mapping for %q", t) + } + m[t] = c + return nil } diff --git a/control/controller.go b/control/controller.go index c16a9b26e4..46a11f0ea1 100644 --- a/control/controller.go +++ b/control/controller.go @@ -31,7 +31,6 @@ import ( "github.com/influxdata/flux" "github.com/influxdata/flux/execute" "github.com/influxdata/flux/plan" - "github.com/influxdata/platform" opentracing "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" @@ -48,7 +47,8 @@ type Controller struct { queryDone chan *Query cancelRequest chan QueryID - metrics *controllerMetrics + metrics *controllerMetrics + labelKeys []string verbose bool @@ -69,6 +69,10 @@ type Config struct { PlannerOptions []plan.Option Logger *zap.Logger Verbose bool + // MetricLabelKeys is a list of labels to add to the metrics produced by the controller. + // The value for a given key will be read off the context. + // The context value must be a string or an implementation of the Stringer interface. + MetricLabelKeys []string } type QueryID uint64 @@ -90,8 +94,9 @@ func New(c Config) *Controller { pplanner: plan.NewPlanner(c.PlannerOptions...), executor: execute.NewExecutor(c.ExecutorDependencies, logger), logger: logger, - metrics: newControllerMetrics(), verbose: c.Verbose, + metrics: newControllerMetrics(c.MetricLabelKeys), + labelKeys: c.MetricLabelKeys, } go ctrl.run() return ctrl @@ -99,9 +104,9 @@ func New(c Config) *Controller { // Query submits a query for execution returning immediately. // Done must be called on any returned Query objects. -func (c *Controller) Query(ctx context.Context, req *flux.Request) (flux.Query, error) { - q := c.createQuery(ctx, req.OrganizationID) - if err := c.compileQuery(q, req.Compiler); err != nil { +func (c *Controller) Query(ctx context.Context, compiler flux.Compiler) (flux.Query, error) { + q := c.createQuery(ctx, compiler.CompilerType()) + if err := c.compileQuery(q, compiler); err != nil { q.parentSpan.Finish() return nil, err } @@ -112,11 +117,28 @@ func (c *Controller) Query(ctx context.Context, req *flux.Request) (flux.Query, return q, nil } -func (c *Controller) createQuery(ctx context.Context, orgID platform.ID) *Query { +type Stringer interface { + String() string +} + +func (c *Controller) createQuery(ctx context.Context, ct flux.CompilerType) *Query { id := c.nextID() - labelValues := []string{ - orgID.String(), + labelValues := make([]string, len(c.labelKeys)) + compileLabelValues := make([]string, len(c.labelKeys)+1) + for i, k := range c.labelKeys { + value := ctx.Value(k) + var str string + switch v := value.(type) { + case string: + str = v + case Stringer: + str = v.String() + } + labelValues[i] = str + compileLabelValues[i] = str } + compileLabelValues[len(compileLabelValues)-1] = string(ct) + cctx, cancel := context.WithCancel(ctx) parentSpan, parentCtx := StartSpanFromContext( cctx, @@ -126,16 +148,16 @@ func (c *Controller) createQuery(ctx context.Context, orgID platform.ID) *Query ) ready := make(chan map[string]flux.Result, 1) return &Query{ - id: id, - orgID: orgID, - labelValues: labelValues, - state: Created, - c: c, - now: time.Now().UTC(), - ready: ready, - parentCtx: parentCtx, - parentSpan: parentSpan, - cancel: cancel, + id: id, + labelValues: labelValues, + compileLabelValues: compileLabelValues, + state: Created, + c: c, + now: time.Now().UTC(), + ready: ready, + parentCtx: parentCtx, + parentSpan: parentSpan, + cancel: cancel, } } @@ -296,7 +318,7 @@ func (c *Controller) processQuery(q *Query) (pop bool, err error) { return true, errors.New("failed to transition query into executing state") } q.alloc = new(execute.Allocator) - r, err := c.executor.Execute(q.executeCtx, q.orgID, q.plan, q.alloc) + r, err := c.executor.Execute(q.executeCtx, q.plan, q.alloc) if err != nil { return true, errors.Wrap(err, "failed to execute query") } @@ -338,9 +360,8 @@ func (c *Controller) PrometheusCollectors() []prometheus.Collector { type Query struct { id QueryID - orgID platform.ID - - labelValues []string + labelValues []string + compileLabelValues []string c *Controller @@ -383,10 +404,6 @@ func (q *Query) ID() QueryID { return q.id } -func (q *Query) OrganizationID() platform.ID { - return q.orgID -} - func (q *Query) Spec() *flux.Spec { return &q.spec } @@ -549,8 +566,8 @@ func (q *Query) tryCompile() bool { q.compileSpan, q.compilingCtx = StartSpanFromContext( q.parentCtx, "compiling", - q.c.metrics.compilingDur.WithLabelValues(q.labelValues...), - q.c.metrics.compiling.WithLabelValues(q.labelValues...), + q.c.metrics.compilingDur.WithLabelValues(q.compileLabelValues...), + q.c.metrics.compiling.WithLabelValues(q.compileLabelValues...), ) q.state = Compiling diff --git a/control/controller_test.go b/control/controller_test.go index 4dcbfa7481..c33988861e 100644 --- a/control/controller_test.go +++ b/control/controller_test.go @@ -10,7 +10,6 @@ import ( "github.com/influxdata/flux/execute" "github.com/influxdata/flux/mock" "github.com/influxdata/flux/plan" - "github.com/influxdata/platform" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" @@ -33,18 +32,14 @@ func TestController_CompileQuery_Failure(t *testing.T) { } ctrl := New(Config{}) - req := &flux.Request{ - OrganizationID: platform.ID("a"), - Compiler: compiler, - } // Run the query. It should return an error. - if _, err := ctrl.Query(context.Background(), req); err == nil { + if _, err := ctrl.Query(context.Background(), compiler); err == nil { t.Fatal("expected error") } // Verify the metrics say there are no queries. - gauge, err := ctrl.metrics.all.GetMetricWithLabelValues(req.OrganizationID.String()) + gauge, err := ctrl.metrics.all.GetMetricWithLabelValues() if err != nil { t.Fatalf("unexpected error: %s", err) } @@ -71,13 +66,9 @@ func TestController_EnqueueQuery_Failure(t *testing.T) { } ctrl := New(Config{}) - req := &flux.Request{ - OrganizationID: platform.ID("a"), - Compiler: compiler, - } // Run the query. It should return an error. - if _, err := ctrl.Query(context.Background(), req); err == nil { + if _, err := ctrl.Query(context.Background(), compiler); err == nil { t.Fatal("expected error") } @@ -86,7 +77,7 @@ func TestController_EnqueueQuery_Failure(t *testing.T) { "all": ctrl.metrics.all, "queueing": ctrl.metrics.queueing, } { - gauge, err := gaugeVec.GetMetricWithLabelValues(req.OrganizationID.String()) + gauge, err := gaugeVec.GetMetricWithLabelValues() if err != nil { t.Fatalf("unexpected error: %s", err) } @@ -104,19 +95,15 @@ func TestController_EnqueueQuery_Failure(t *testing.T) { func TestController_ExecuteQuery_Failure(t *testing.T) { executor := mock.NewExecutor() - executor.ExecuteFn = func(context.Context, platform.ID, *plan.PlanSpec, *execute.Allocator) (map[string]flux.Result, error) { + executor.ExecuteFn = func(context.Context, *plan.PlanSpec, *execute.Allocator) (map[string]flux.Result, error) { return nil, errors.New("expected") } ctrl := New(Config{}) ctrl.executor = executor - req := &flux.Request{ - OrganizationID: platform.ID("a"), - Compiler: mockCompiler, - } // Run a query and then wait for it to be ready. - q, err := ctrl.Query(context.Background(), req) + q, err := ctrl.Query(context.Background(), mockCompiler) if err != nil { t.Fatalf("unexpected error: %s", err) } @@ -134,7 +121,7 @@ func TestController_ExecuteQuery_Failure(t *testing.T) { q.Done() // Verify the metrics say there are no queries. - gauge, err := ctrl.metrics.all.GetMetricWithLabelValues(req.OrganizationID.String()) + gauge, err := ctrl.metrics.all.GetMetricWithLabelValues() if err != nil { t.Fatalf("unexpected error: %s", err) } @@ -151,20 +138,16 @@ func TestController_ExecuteQuery_Failure(t *testing.T) { func TestController_CancelQuery(t *testing.T) { executor := mock.NewExecutor() - executor.ExecuteFn = func(context.Context, platform.ID, *plan.PlanSpec, *execute.Allocator) (map[string]flux.Result, error) { + executor.ExecuteFn = func(context.Context, *plan.PlanSpec, *execute.Allocator) (map[string]flux.Result, error) { // Return an empty result. return map[string]flux.Result{}, nil } ctrl := New(Config{}) ctrl.executor = executor - req := &flux.Request{ - OrganizationID: platform.ID("a"), - Compiler: mockCompiler, - } // Run a query and then wait for it to be ready. - q, err := ctrl.Query(context.Background(), req) + q, err := ctrl.Query(context.Background(), mockCompiler) if err != nil { t.Fatalf("unexpected error: %s", err) } @@ -179,7 +162,7 @@ func TestController_CancelQuery(t *testing.T) { q.Done() // Verify the metrics say there are no queries. - gauge, err := ctrl.metrics.all.GetMetricWithLabelValues(req.OrganizationID.String()) + gauge, err := ctrl.metrics.all.GetMetricWithLabelValues() if err != nil { t.Fatalf("unexpected error: %s", err) } @@ -198,20 +181,16 @@ func TestController_BlockedExecutor(t *testing.T) { done := make(chan struct{}) executor := mock.NewExecutor() - executor.ExecuteFn = func(context.Context, platform.ID, *plan.PlanSpec, *execute.Allocator) (map[string]flux.Result, error) { + executor.ExecuteFn = func(context.Context, *plan.PlanSpec, *execute.Allocator) (map[string]flux.Result, error) { <-done return nil, nil } ctrl := New(Config{}) ctrl.executor = executor - req := &flux.Request{ - OrganizationID: platform.ID("a"), - Compiler: mockCompiler, - } // Run a query that will cause the controller to stall. - q, err := ctrl.Query(context.Background(), req) + q, err := ctrl.Query(context.Background(), mockCompiler) if err != nil { t.Fatalf("unexpected error: %s", err) } @@ -234,7 +213,7 @@ func TestController_BlockedExecutor(t *testing.T) { } }() - if _, err := ctrl.Query(ctx, req); err == nil { + if _, err := ctrl.Query(ctx, mockCompiler); err == nil { t.Fatal("expected error") } else if got, want := err, context.Canceled; got != want { t.Fatalf("unexpected error: got=%q want=%q", got, want) diff --git a/control/metrics.go b/control/metrics.go index 32f5124163..560f46ccfb 100644 --- a/control/metrics.go +++ b/control/metrics.go @@ -19,14 +19,12 @@ type controllerMetrics struct { executingDur *prometheus.HistogramVec } -func newControllerMetrics() *controllerMetrics { +func newControllerMetrics(labels []string) *controllerMetrics { const ( namespace = "query" subsystem = "control" ) - labels := []string{"org"} - return &controllerMetrics{ all: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: namespace, @@ -40,7 +38,7 @@ func newControllerMetrics() *controllerMetrics { Subsystem: subsystem, Name: "compiling_active", Help: "Number of queries actively compiling", - }, labels), + }, append(labels, "compiler_type")), queueing: prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: namespace, @@ -84,7 +82,7 @@ func newControllerMetrics() *controllerMetrics { Name: "compiling_duration_seconds", Help: "Histogram of times spent compiling queries", Buckets: prometheus.ExponentialBuckets(1e-3, 5, 7), - }, labels), + }, append(labels, "compiler_type")), queueingDur: prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: namespace, diff --git a/dependency.go b/dependency.go deleted file mode 100644 index 77773a5b76..0000000000 --- a/dependency.go +++ /dev/null @@ -1,56 +0,0 @@ -package flux - -import ( - "context" - - "github.com/influxdata/platform" -) - -// FromBucketService wraps an platform.BucketService in the BucketLookup interface. -func FromBucketService(srv platform.BucketService) *BucketLookup { - return &BucketLookup{ - BucketService: srv, - } -} - -// BucketLookup converts Flux bucket lookups into platform.BucketService calls. -type BucketLookup struct { - BucketService platform.BucketService -} - -// Lookup returns the bucket id and its existence given an org id and bucket name. -func (b *BucketLookup) Lookup(orgID platform.ID, name string) (platform.ID, bool) { - oid := platform.ID(orgID) - filter := platform.BucketFilter{ - OrganizationID: &oid, - Name: &name, - } - bucket, err := b.BucketService.FindBucket(context.Background(), filter) - if err != nil { - return nil, false - } - return bucket.ID, true -} - -// FromOrganizationService wraps a platform.OrganizationService in the OrganizationLookup interface. -func FromOrganizationService(srv platform.OrganizationService) *OrganizationLookup { - return &OrganizationLookup{OrganizationService: srv} -} - -// OrganizationLookup converts organization name lookups into platform.OrganizationService calls. -type OrganizationLookup struct { - OrganizationService platform.OrganizationService -} - -// Lookup returns the organization ID and its existence given an organization name. -func (o *OrganizationLookup) Lookup(ctx context.Context, name string) (platform.ID, bool) { - org, err := o.OrganizationService.FindOrganization( - ctx, - platform.OrganizationFilter{Name: &name}, - ) - - if err != nil { - return nil, false - } - return org.ID, true -} diff --git a/dialect.go b/dialect.go new file mode 100644 index 0000000000..79f24696d3 --- /dev/null +++ b/dialect.go @@ -0,0 +1,25 @@ +package flux + +import "fmt" + +// Dialect describes how to encode results. +type Dialect interface { + // Encoder creates an encoder for the results + Encoder() MultiResultEncoder + // DialectType report the type of the dialect + DialectType() DialectType +} + +// DialectType is the name of a query result dialect. +type DialectType string +type CreateDialect func() Dialect + +type DialectMappings map[DialectType]CreateDialect + +func (m DialectMappings) Add(t DialectType, c CreateDialect) error { + if _, ok := m[t]; ok { + return fmt.Errorf("duplicate dialect mapping for %q", t) + } + m[t] = c + return nil +} diff --git a/docs/Language.md b/docs/Language.md index 8fc5d084c7..7be61a5c0d 100644 --- a/docs/Language.md +++ b/docs/Language.md @@ -9,7 +9,7 @@ The Flux langauage is used to construct query specifications. # Syntax -The langauage syntax is defined by the platform/query/paser/flux.peg grammar. +The langauage syntax is defined by the [SPEC](./SPEC.md). ## Keyword Arguments diff --git a/execute/executor.go b/execute/executor.go index ac688df1ff..80a32fa9f2 100644 --- a/execute/executor.go +++ b/execute/executor.go @@ -9,14 +9,13 @@ import ( "github.com/influxdata/flux" "github.com/influxdata/flux/plan" - "github.com/influxdata/platform" "github.com/pkg/errors" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) type Executor interface { - Execute(ctx context.Context, orgID platform.ID, p *plan.PlanSpec, a *Allocator) (map[string]flux.Result, error) + Execute(ctx context.Context, p *plan.PlanSpec, a *Allocator) (map[string]flux.Result, error) } type executor struct { @@ -53,8 +52,6 @@ type executionState struct { p *plan.PlanSpec deps Dependencies - orgID platform.ID - alloc *Allocator resources flux.ResourceManagement @@ -68,8 +65,8 @@ type executionState struct { logger *zap.Logger } -func (e *executor) Execute(ctx context.Context, orgID platform.ID, p *plan.PlanSpec, a *Allocator) (map[string]flux.Result, error) { - es, err := e.createExecutionState(ctx, orgID, p, a) +func (e *executor) Execute(ctx context.Context, p *plan.PlanSpec, a *Allocator) (map[string]flux.Result, error) { + es, err := e.createExecutionState(ctx, p, a) if err != nil { return nil, errors.Wrap(err, "failed to initialize execute state") } @@ -85,14 +82,13 @@ func validatePlan(p *plan.PlanSpec) error { return nil } -func (e *executor) createExecutionState(ctx context.Context, orgID platform.ID, p *plan.PlanSpec, a *Allocator) (*executionState, error) { +func (e *executor) createExecutionState(ctx context.Context, p *plan.PlanSpec, a *Allocator) (*executionState, error) { if err := validatePlan(p); err != nil { return nil, errors.Wrap(err, "invalid plan") } // Set allocation limit a.Limit = p.Resources.MemoryBytesQuota es := &executionState{ - orgID: orgID, p: p, deps: e.deps, alloc: a, @@ -139,6 +135,7 @@ func (es *executionState) createNode(ctx context.Context, pr *plan.Procedure, no // Build execution context ec := executionContext{ + ctx: ctx, es: es, streamContext: streamContext, } @@ -248,20 +245,20 @@ func (es *executionState) do(ctx context.Context) { // Need a unique stream context per execution context type executionContext struct { + ctx context.Context es *executionState parents []DatasetID streamContext streamContext } -// Satisfy the ExecutionContext interface -func (ec executionContext) OrganizationID() platform.ID { - return ec.es.orgID -} - func resolveTime(qt flux.Time, now time.Time) Time { return Time(qt.Time(now).UnixNano()) } +func (ec executionContext) Context() context.Context { + return ec.ctx +} + func (ec executionContext) ResolveTime(qt flux.Time) Time { return resolveTime(qt, ec.es.p.Now) } diff --git a/execute/executor_test.go b/execute/executor_test.go index 7517f74a5b..32acdeab53 100644 --- a/execute/executor_test.go +++ b/execute/executor_test.go @@ -15,17 +15,11 @@ import ( "github.com/influxdata/flux/execute/executetest" "github.com/influxdata/flux/functions" "github.com/influxdata/flux/plan" - "github.com/influxdata/platform" uuid "github.com/satori/go.uuid" "go.uber.org/zap/zaptest" ) var epoch = time.Unix(0, 0) -var orgID platform.ID - -func init() { - orgID.DecodeFromString("aaaa") -} func TestExecutor_Execute(t *testing.T) { testCases := []struct { @@ -532,7 +526,7 @@ func TestExecutor_Execute(t *testing.T) { tc := tc t.Run(tc.name, func(t *testing.T) { exe := execute.NewExecutor(nil, zaptest.NewLogger(t)) - results, err := exe.Execute(context.Background(), orgID, tc.plan, executetest.UnlimitedAllocator) + results, err := exe.Execute(context.Background(), tc.plan, executetest.UnlimitedAllocator) if err != nil { t.Fatal(err) } diff --git a/execute/transformation.go b/execute/transformation.go index 057711f45f..7b4dcd0b2c 100644 --- a/execute/transformation.go +++ b/execute/transformation.go @@ -1,11 +1,11 @@ package execute import ( + "context" "fmt" "github.com/influxdata/flux" "github.com/influxdata/flux/plan" - "github.com/influxdata/platform" ) type Transformation interface { @@ -23,7 +23,7 @@ type StreamContext interface { } type Administration interface { - OrganizationID() platform.ID + Context() context.Context ResolveTime(qt flux.Time) Time StreamContext() StreamContext diff --git a/functions/from.go b/functions/from.go index 5613186add..12c8f4f5e5 100644 --- a/functions/from.go +++ b/functions/from.go @@ -9,6 +9,7 @@ import ( "github.com/influxdata/flux/plan" "github.com/influxdata/flux/semantic" "github.com/influxdata/platform" + "github.com/influxdata/platform/query" "github.com/pkg/errors" ) @@ -191,7 +192,11 @@ func createFromSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID, a execu currentTime := w.Start + execute.Time(w.Period) deps := a.Dependencies()[FromKind].(storage.Dependencies) - orgID := a.OrganizationID() + req := query.RequestFromContext(a.Context()) + if req == nil { + return nil, errors.New("missing request on context") + } + orgID := req.OrganizationID var bucketID platform.ID // Determine bucketID diff --git a/functions/from_test.go b/functions/from_test.go index 4046f00660..51ea5b4552 100644 --- a/functions/from_test.go +++ b/functions/from_test.go @@ -9,6 +9,7 @@ import ( "github.com/influxdata/flux/functions" "github.com/influxdata/flux/querytest" "github.com/influxdata/platform" + pquerytest "github.com/influxdata/platform/query/querytest" ) func TestFrom_NewQuery(t *testing.T) { @@ -116,7 +117,7 @@ func TestFromOperation_Marshaling(t *testing.T) { func TestFromOpSpec_BucketsAccessed(t *testing.T) { bucketName := "my_bucket" bucketID, _ := platform.IDFromString("deadbeef") - tests := []querytest.NewQueryTestCase{ + tests := []pquerytest.BucketAwareQueryTestCase{ { Name: "From with bucket", Raw: `from(bucket:"my_bucket")`, @@ -134,7 +135,7 @@ func TestFromOpSpec_BucketsAccessed(t *testing.T) { tc := tc t.Run(tc.Name, func(t *testing.T) { t.Parallel() - querytest.NewQueryTestHelper(t, tc) + pquerytest.BucketAwareQueryTestHelper(t, tc) }) } } diff --git a/functions/prepcsvtests/prepcsvtests.go b/functions/prepcsvtests/prepcsvtests.go index ed10e535f9..87f7f88c5d 100644 --- a/functions/prepcsvtests/prepcsvtests.go +++ b/functions/prepcsvtests/prepcsvtests.go @@ -11,9 +11,9 @@ import ( "regexp" "strings" - "github.com/influxdata/flux" _ "github.com/influxdata/flux/builtin" "github.com/influxdata/flux/csv" + "github.com/influxdata/flux/lang" "github.com/influxdata/flux/querytest" "golang.org/x/text/unicode/norm" ) @@ -68,20 +68,17 @@ func main() { return } - pqs := querytest.GetProxyQueryServiceBridge() - req := &flux.ProxyRequest{ - Request: flux.Request{ - Compiler: querytest.FromCSVCompiler{ - Compiler: flux.FluxCompiler{ - Query: string(querytext), - }, - InputFile: incsv, - }, + pqs := querytest.NewQuerier() + c := querytest.FromCSVCompiler{ + Compiler: lang.FluxCompiler{ + Query: string(querytext), }, - Dialect: csv.DefaultDialect(), + InputFile: incsv, } + d := csv.DefaultDialect() + var buf bytes.Buffer - _, err = pqs.Query(context.Background(), &buf, req) + _, err = pqs.Query(context.Background(), &buf, c, d) if err != nil { fmt.Printf("error: %s", err) return diff --git a/functions/query_test.go b/functions/query_test.go index f52953e0ea..cb496d728a 100644 --- a/functions/query_test.go +++ b/functions/query_test.go @@ -13,6 +13,7 @@ import ( "github.com/influxdata/flux" _ "github.com/influxdata/flux/builtin" "github.com/influxdata/flux/csv" + "github.com/influxdata/flux/lang" "github.com/influxdata/flux/querytest" ) @@ -26,7 +27,7 @@ var skipTests = map[string]string{ "string_interp": "string interpolation not working as expected in flux (https://github.com/influxdata/platform/issues/404)", } -var pqs = querytest.GetProxyQueryServiceBridge() +var querier = querytest.NewQuerier() func withEachFluxFile(t testing.TB, fn func(prefix, caseName string)) { dir, err := os.Getwd() @@ -57,7 +58,7 @@ func Test_QueryEndToEnd(t *testing.T) { if skip { t.Skip(reason) } - testFlux(t, pqs, prefix, ".flux") + testFlux(t, querier, prefix, ".flux") }) }) } @@ -77,13 +78,13 @@ func Benchmark_QueryEndToEnd(b *testing.B) { b.ResetTimer() b.ReportAllocs() for i := 0; i < b.N; i++ { - testFlux(b, pqs, prefix, ".flux") + testFlux(b, querier, prefix, ".flux") } }) }) } -func testFlux(t testing.TB, pqs flux.ProxyQueryService, prefix, queryExt string) { +func testFlux(t testing.TB, querier *querytest.Querier, prefix, queryExt string) { q, err := ioutil.ReadFile(prefix + queryExt) if err != nil { t.Fatal(err) @@ -95,27 +96,22 @@ func testFlux(t testing.TB, pqs flux.ProxyQueryService, prefix, queryExt string) t.Fatal(err) } - compiler := flux.FluxCompiler{ - Query: string(q), - } - req := &flux.ProxyRequest{ - Request: flux.Request{ - Compiler: querytest.FromCSVCompiler{ - Compiler: compiler, - InputFile: csvInFilename, - }, + c := querytest.FromCSVCompiler{ + Compiler: lang.FluxCompiler{ + Query: string(q), }, - Dialect: csv.DefaultDialect(), + InputFile: csvInFilename, } + d := csv.DefaultDialect() - QueryTestCheckSpec(t, pqs, req, string(csvOut)) + QueryTestCheckSpec(t, querier, c, d, string(csvOut)) } -func QueryTestCheckSpec(t testing.TB, pqs flux.ProxyQueryService, req *flux.ProxyRequest, want string) { +func QueryTestCheckSpec(t testing.TB, querier *querytest.Querier, c flux.Compiler, d flux.Dialect, want string) { t.Helper() var buf bytes.Buffer - _, err := pqs.Query(context.Background(), &buf, req) + _, err := querier.Query(context.Background(), &buf, c, d) if err != nil { t.Errorf("failed to run query: %v", err) return diff --git a/go.mod b/go.mod index a15b003e6e..95bd828d70 100644 --- a/go.mod +++ b/go.mod @@ -1,11 +1,32 @@ module github.com/influxdata/flux require ( + github.com/NYTimes/gziphandler v1.0.1 // indirect + github.com/RoaringBitmap/roaring v0.4.16 // indirect + github.com/Shopify/sarama v1.18.0 // indirect github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 + github.com/apache/thrift v0.0.0-20180902110319-2566ecd5d999 // indirect + github.com/armon/go-metrics v0.0.0-20180713145231-3c58d8115a78 // indirect + github.com/aws/aws-sdk-go v1.15.33 // indirect github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 // indirect + github.com/boltdb/bolt v1.3.1 // indirect + github.com/bouk/httprouter v0.0.0-20160817010721-ee8b3818a7f5 // indirect github.com/c-bata/go-prompt v0.2.2 - github.com/davecgh/go-spew v1.1.1 // indirect - github.com/golang/protobuf v1.2.0 // indirect + github.com/cespare/xxhash v1.1.0 // indirect + github.com/codahale/hrw v0.0.0-20140702143812-d4f22a6ff3fc // indirect + github.com/coreos/bbolt v1.3.0 // indirect + github.com/coreos/etcd v3.3.9+incompatible // indirect + github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect + github.com/dgryski/go-bitstream v0.0.0-20180413035011-3522498ce2c8 // indirect + github.com/eapache/go-resiliency v1.1.0 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect + github.com/eapache/queue v1.1.0 // indirect + github.com/elazarl/go-bindata-assetfs v1.0.0 // indirect + github.com/ghodss/yaml v1.0.0 // indirect + github.com/glycerine/go-unsnap-stream v0.0.0-20180323001048-9f0cb55181dd // indirect + github.com/go-logfmt/logfmt v0.3.0 // indirect + github.com/gogo/protobuf v1.1.1 // indirect + github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect github.com/gonum/blas v0.0.0-20180125090452-e7c5890b24cf // indirect github.com/gonum/diff v0.0.0-20180125090814-f0137a19aa16 // indirect github.com/gonum/floats v0.0.0-20180125090339-7de1f4ea7ab5 // indirect @@ -15,9 +36,34 @@ require ( github.com/gonum/mathext v0.0.0-20180126232648-3ffefb3e36fc // indirect github.com/gonum/matrix v0.0.0-20180124231301-a41cc49d4c29 // indirect github.com/gonum/stat v0.0.0-20180125090729-ec9c8a1062f4 + github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c // indirect github.com/google/go-cmp v0.2.0 - github.com/influxdata/platform v0.0.0-20180905213845-bbcbaee9fd79 + github.com/google/go-github v17.0.0+incompatible // indirect + github.com/google/go-querystring v0.0.0-20170111101155-53e6ce116135 // indirect + github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf // indirect + github.com/googleapis/gnostic v0.2.0 // indirect + github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 // indirect + github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect + github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645 // indirect + github.com/hashicorp/go-immutable-radix v1.0.0 // indirect + github.com/hashicorp/go-msgpack v0.0.0-20150518234257-fa3f63826f7c // indirect + github.com/hashicorp/raft v1.0.0 // indirect + github.com/influxdata/gobreaker v0.0.0-20180523174524-eeace644aa0e // indirect + github.com/influxdata/idpe v0.0.0-20180911213036-9af06c765f1b // indirect + github.com/influxdata/inch v0.0.0-20180710214615-df945ff74609 // indirect + github.com/influxdata/influxdb v1.6.2 // indirect + github.com/influxdata/influxql v0.0.0-20180823200743-a7267bff5327 // indirect + github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e // indirect + github.com/influxdata/platform v0.0.0-20180911232614-de9ab156a890 github.com/influxdata/tdigest v0.0.0-20180711151920-a7d76c6f093a + github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368 // indirect + github.com/influxdata/yamux v0.0.0-20171107173414-1f58ded512de // indirect + github.com/influxdata/yarpc v0.0.1 // indirect + github.com/jessevdk/go-flags v1.4.0 // indirect + github.com/json-iterator/go v1.1.5 // indirect + github.com/jsternberg/zap-logfmt v1.2.0 // indirect + github.com/julienschmidt/httprouter v0.0.0-20180715161854-348b672cd90d // indirect + github.com/jwilder/encoding v0.0.0-20170811194829-b4e1701a28ef // indirect github.com/kr/pretty v0.1.0 // indirect github.com/mattn/go-colorable v0.0.9 // indirect github.com/mattn/go-isatty v0.0.4 // indirect @@ -25,24 +71,45 @@ require ( github.com/mattn/go-tty v0.0.0-20180219170247-931426f7535a // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect github.com/mna/pigeon v1.0.1-0.20180808201053-bb0192cfc2ae + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.1 // indirect + github.com/nats-io/gnatsd v1.3.0 // indirect + github.com/nats-io/go-nats v1.6.0 // indirect + github.com/nats-io/go-nats-streaming v0.4.0 // indirect + github.com/nats-io/nats-streaming-server v0.11.0 // indirect + github.com/nats-io/nuid v1.0.0 // indirect + github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492 // indirect github.com/opentracing/opentracing-go v1.0.2 + github.com/openzipkin/zipkin-go-opentracing v0.3.4 // indirect + github.com/peterbourgon/diskv v2.0.1+incompatible // indirect + github.com/philhofer/fwd v1.0.0 // indirect + github.com/pierrec/lz4 v2.0.5+incompatible // indirect github.com/pkg/errors v0.8.0 github.com/pkg/term v0.0.0-20180730021639-bffc007b7fd5 // indirect - github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_golang v0.0.0-20171201122222-661e31bf844d github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e // indirect github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273 // indirect + github.com/rcrowley/go-metrics v0.0.0-20180503174638-e2704e165165 // indirect github.com/satori/go.uuid v1.2.0 + github.com/segmentio/kafka-go v0.1.0 // indirect github.com/sergi/go-diff v1.0.0 // indirect - github.com/stretchr/testify v1.2.2 // indirect - go.uber.org/atomic v1.3.2 // indirect - go.uber.org/multierr v1.1.0 // indirect + github.com/sirupsen/logrus v1.0.6 // indirect + github.com/spf13/cobra v0.0.3 // indirect + github.com/spf13/viper v1.2.0 // indirect + github.com/tinylib/msgp v1.0.2 // indirect + github.com/tylerb/graceful v1.2.15 // indirect + github.com/xlab/treeprint v0.0.0-20180616005107-d6fb6747feb6 // indirect go.uber.org/zap v1.9.1 - golang.org/x/net v0.0.0-20180826012351-8a410e7b638d // indirect - golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f // indirect - golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 // indirect + golang.org/x/crypto v0.0.0-20180910181607-0e37d006457b // indirect golang.org/x/text v0.3.0 + golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 // indirect golang.org/x/tools v0.0.0-20180904205237-0aa4b8830f48 // indirect + google.golang.org/grpc v1.15.0 // indirect gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect + gopkg.in/inf.v0 v0.9.1 // indirect + gopkg.in/robfig/cron.v2 v2.0.0-20150107220207-be2e0b0deed5 // indirect + k8s.io/api v0.0.0-20180911000052-49f236fe119e // indirect + k8s.io/apimachinery v0.0.0-20180908133737-0dbe21f815eb // indirect + k8s.io/client-go v8.0.0+incompatible // indirect ) diff --git a/go.sum b/go.sum index eaa4a1d078..9290c09281 100644 --- a/go.sum +++ b/go.sum @@ -1,13 +1,71 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/NYTimes/gziphandler v1.0.1 h1:iLrQrdwjDd52kHDA5op2UBJFjmOb9g+7scBan4RN8F0= +github.com/NYTimes/gziphandler v1.0.1/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ= +github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/RoaringBitmap/roaring v0.4.16 h1:NholfewybRLOwACgfqfzn/N5xa6keKNs4fP00t0cwLo= +github.com/RoaringBitmap/roaring v0.4.16/go.mod h1:8khRDP4HmeXns4xIj9oGrKSz7XTQiJx2zgh7AcNke4w= +github.com/Shopify/sarama v1.18.0 h1:Ha2FAOngREft7C44ouUXDxSZ/Y/77IDCMV1YS4AnUkI= +github.com/Shopify/sarama v1.18.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 h1:bvNMNQO63//z+xNgfBlViaCIJKLlCJ6/fmUseuG0wVQ= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= +github.com/apache/thrift v0.0.0-20180902110319-2566ecd5d999 h1:FLhNHUJvAhlTHUyOU9ikJbilbuikErpvjr9KVJ8UfsA= +github.com/apache/thrift v0.0.0-20180902110319-2566ecd5d999/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= +github.com/armon/go-metrics v0.0.0-20180713145231-3c58d8115a78 h1:mdRSArcFLfW0VoL34LZAKSz6LkkK4jFxVx2xYavACMg= +github.com/armon/go-metrics v0.0.0-20180713145231-3c58d8115a78/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= +github.com/aws/aws-sdk-go v1.15.33 h1:Xch3KIkPonPmCiA/BlaRm+RljrWr3M5Jb4EeUpdgmTU= +github.com/aws/aws-sdk-go v1.15.33/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZoCYDt7FT0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4= +github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= +github.com/bouk/httprouter v0.0.0-20160817010721-ee8b3818a7f5 h1:kS0dw4K730x7cxT+bVyTyYJZHuSoH7ofSr/Ijit56Qw= +github.com/bouk/httprouter v0.0.0-20160817010721-ee8b3818a7f5/go.mod h1:CDReaxg1cmLrtcasZy43l4EYPAknXLiQSrb7tLw5zXM= github.com/c-bata/go-prompt v0.2.2 h1:uyKRz6Z6DUyj49QVijyM339UJV9yhbr70gESwbNU3e0= github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34= +github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= +github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/codahale/hrw v0.0.0-20140702143812-d4f22a6ff3fc h1:ItikUNK+mfvstQUBc0kVKA41rqSmCNK2xVntAQgxKuo= +github.com/codahale/hrw v0.0.0-20140702143812-d4f22a6ff3fc/go.mod h1:Yt33RS6d6QFrXabhxeBIlfan1/Dsr+lankNgJ/lyOk8= +github.com/coreos/bbolt v1.3.0 h1:HIgH5xUWXT914HCI671AxuTTqjj64UOFr7pHn48LUTI= +github.com/coreos/bbolt v1.3.0/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= +github.com/coreos/etcd v3.3.9+incompatible h1:/pWnp1yEff0z+vBEOBFLZZ22Ux5xoVozEe7X0VFyRNo= +github.com/coreos/etcd v3.3.9+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= +github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= +github.com/dgryski/go-bitstream v0.0.0-20180413035011-3522498ce2c8 h1:akOQj8IVgoeFfBTzGOEQakCYshWD6RNo1M5pivFXt70= +github.com/dgryski/go-bitstream v0.0.0-20180413035011-3522498ce2c8/go.mod h1:VMaSuZ+SZcx/wljOQKvp5srsbCiKDEb6K2wC4+PiBmQ= +github.com/eapache/go-resiliency v1.1.0 h1:1NtRmCAqadE2FN4ZcN6g90TP3uk8cg9rn9eNK2197aU= +github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/elazarl/go-bindata-assetfs v1.0.0 h1:G/bYguwHIzWq9ZoyUQqrjTmJbbYn3j3CKKpKinvZLFk= +github.com/elazarl/go-bindata-assetfs v1.0.0/go.mod h1:v+YaWX3bdea5J/mo8dSETolEo7R71Vk1u8bnjau5yw4= +github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= +github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/glycerine/go-unsnap-stream v0.0.0-20180323001048-9f0cb55181dd h1:r04MMPyLHj/QwZuMJ5+7tJcBr1AQjpiAK/rZWRrQT7o= +github.com/glycerine/go-unsnap-stream v0.0.0-20180323001048-9f0cb55181dd/go.mod h1:/20jfyN9Y5QPEAprSgKAUr+glWDY39ZiUEAYOEv5dsE= +github.com/go-ini/ini v1.25.4 h1:Mujh4R/dH6YL8bxuISne3xX2+qcQ9p0IxKAP6ExWoUo= +github.com/go-ini/ini v1.25.4/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= +github.com/go-logfmt/logfmt v0.3.0 h1:8HUsc87TaSWLKwrnumgC8/YconD2fJQsRJAsWaPg2ic= +github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= +github.com/gogo/protobuf v1.1.1 h1:72R+M5VuhED/KujmZVcIquuo8mBgX4oVda//DQb3PXo= +github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:tluoj9z5200jBnyusfRPU2LqT6J+DAorxEvtC7LHB+E= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w= +github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/gonum/blas v0.0.0-20180125090452-e7c5890b24cf h1:ukIp7SJ4RNEkyqdn8EZDzUTOsqWUbHnwPGU3d8pc7ok= github.com/gonum/blas v0.0.0-20180125090452-e7c5890b24cf/go.mod h1:P32wAyui1PQ58Oce/KYkOqQv8cVw1zAapXOl+dRFGbc= github.com/gonum/diff v0.0.0-20180125090814-f0137a19aa16 h1:W+m3s4RfTnJqXW1lmuPTG8hhB/MDzmkYuUpU0tLCQAM= @@ -26,17 +84,92 @@ github.com/gonum/matrix v0.0.0-20180124231301-a41cc49d4c29 h1:Aj+poYy0aVF2abLrHV github.com/gonum/matrix v0.0.0-20180124231301-a41cc49d4c29/go.mod h1:0EXg4mc1CNP0HCqCz+K4ts155PXIlUywf0wqN+GfPZw= github.com/gonum/stat v0.0.0-20180125090729-ec9c8a1062f4 h1:ljlDrxv0Wij8s9+WEYGswFmz/SEg75X832pYRsYA56Y= github.com/gonum/stat v0.0.0-20180125090729-ec9c8a1062f4/go.mod h1:Z4GIJBJO3Wa4gD4vbwQxXXZ+WHmW6E9ixmNrwvs0iZs= +github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c h1:964Od4U6p2jUkFxvCydnIczKteheJEzHRToSGK3Bnlw= +github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-github v17.0.0+incompatible h1:N0LgJ1j65A7kfXrZnUDaYCs/Sf4rEjNlfyDHW9dolSY= +github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ= +github.com/google/go-querystring v0.0.0-20170111101155-53e6ce116135 h1:zLTLjkaOFEFIOxY5BWLFLwh+cL8vOBW4XJ2aqLE/Tf0= +github.com/google/go-querystring v0.0.0-20170111101155-53e6ce116135/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= +github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf h1:+RRA9JqSOZFfKrOeqr2z77+8R2RKyh8PG66dcu1V0ck= +github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI= +github.com/googleapis/gnostic v0.2.0 h1:l6N3VoaVzTncYYW+9yOz2LJJammFZGBO13sqgEhpy9g= +github.com/googleapis/gnostic v0.2.0/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY= +github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 h1:pdN6V1QBWetyv/0+wjACpqVH+eVULgEjkurDLq3goeM= +github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= +github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho= +github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= +github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645 h1:MJG/KsmcqMwFAkh8mTnAwhyKoB+sTAnY4CACC110tbU= +github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645/go.mod h1:6iZfnjpejD4L/4DwD7NryNaJyCQdzwWwH2MWhCA90Kw= +github.com/hashicorp/go-immutable-radix v1.0.0 h1:AKDB1HM5PWEA7i4nhcpwOrO2byshxBjXVn/J/3+z5/0= +github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= +github.com/hashicorp/go-msgpack v0.0.0-20150518234257-fa3f63826f7c h1:BTAbnbegUIMB6xmQCwWE8yRzbA4XSpnZY5hvRJC188I= +github.com/hashicorp/go-msgpack v0.0.0-20150518234257-fa3f63826f7c/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= +github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo= +github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= +github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= +github.com/hashicorp/raft v1.0.0 h1:htBVktAOtGs4Le5Z7K8SF5H2+oWsQFYVmOgH5loro7Y= +github.com/hashicorp/raft v1.0.0/go.mod h1:DVSAWItjLjTOkVbSpWQ0j0kUADIvDaCtBxIcbNAQLkI= +github.com/influxdata/gobreaker v0.0.0-20180523174524-eeace644aa0e h1:zGyOggr8ljOkuUg1uYRuEGLHHFtB3mU3afGvdCJqLPc= +github.com/influxdata/gobreaker v0.0.0-20180523174524-eeace644aa0e/go.mod h1:LSm3xXXz2kAkFUGfk4eDtFYu06HDIi7Q3xWEkTFc/fY= +github.com/influxdata/idpe v0.0.0-20180911213036-9af06c765f1b h1:lxR1MMcpWFpdV1qBEvKGEoWVUTHp5rrjFM4j+Qu442w= +github.com/influxdata/idpe v0.0.0-20180911213036-9af06c765f1b/go.mod h1:9Nrz42q+gdoXz8A5u8xdleIgVeWtsfJJ+VJmE5f7dk8= +github.com/influxdata/inch v0.0.0-20180710214615-df945ff74609 h1:UOUbCE8RoE1t82Ju7tK3e18CzOUNLWhbIQRgcOL/zjY= +github.com/influxdata/inch v0.0.0-20180710214615-df945ff74609/go.mod h1:ZU18/cvxU0VgkTvluTnZCXu3/gBijfA1Vizd7l0kmdU= +github.com/influxdata/influxdb v1.6.2 h1:Cvl0/3n7/T6RkCefitJtEHWKJznmOA+9tT8gVx3vVS0= +github.com/influxdata/influxdb v1.6.2/go.mod h1:qZna6X/4elxqT3yI9iZYdZrWWdeFOOprn86kgg4+IzY= +github.com/influxdata/influxql v0.0.0-20180823200743-a7267bff5327 h1:2BI2JbxV11hx8W8gtFleWN7nLmU0WBpuj298yaDVYws= +github.com/influxdata/influxql v0.0.0-20180823200743-a7267bff5327/go.mod h1:KpVI7okXjK6PRi3Z5B+mtKZli+R1DnZgb3N+tzevNgo= +github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e h1:/o3vQtpWJhvnIbXley4/jwzzqNeigJK9z+LZcJZ9zfM= +github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e/go.mod h1:4kt73NQhadE3daL3WhR5EJ/J2ocX0PZzwxQ0gXJ7oFE= github.com/influxdata/platform v0.0.0-20180905213845-bbcbaee9fd79 h1:0kPsUxHdjBZaq9940tS/E5jcG/F6QAyBXqC7rf6tQcg= github.com/influxdata/platform v0.0.0-20180905213845-bbcbaee9fd79/go.mod h1:o8AOzOaMzxS6kSO2oCsbj61/kxrwECx+kFNgE8fqFug= +github.com/influxdata/platform v0.0.0-20180911182637-ee5cd72eae03 h1:hXts91n8Z4qBySl+1ZwQaOpaCQ6cvkm3Sb20Xv4mXRo= +github.com/influxdata/platform v0.0.0-20180911225651-06d3e46d1c1d h1:6kzvUMcN5rSxGWp3eziwWmJlqD7fwVKJNkZHu3xDUfs= +github.com/influxdata/platform v0.0.0-20180911225651-06d3e46d1c1d/go.mod h1:o8AOzOaMzxS6kSO2oCsbj61/kxrwECx+kFNgE8fqFug= +github.com/influxdata/platform v0.0.0-20180911230632-ab75b285da8b h1:zM7yiqSHATe39xD6UiGrdQrqneWGXyQXhfu1RM9dACo= +github.com/influxdata/platform v0.0.0-20180911230632-ab75b285da8b/go.mod h1:o8AOzOaMzxS6kSO2oCsbj61/kxrwECx+kFNgE8fqFug= +github.com/influxdata/platform v0.0.0-20180911230725-7c3bb66a91f5 h1:HkPgzA4nVAPsj7b28YH1krlCrblgD0S13ir5+M+FTzc= +github.com/influxdata/platform v0.0.0-20180911230725-7c3bb66a91f5/go.mod h1:o8AOzOaMzxS6kSO2oCsbj61/kxrwECx+kFNgE8fqFug= +github.com/influxdata/platform v0.0.0-20180911231539-562753cc5396 h1:gxaqdSjGj1PfnFsqZ5Zr4L4caH48GEoF05+H1Nb6skI= +github.com/influxdata/platform v0.0.0-20180911231539-562753cc5396/go.mod h1:o8AOzOaMzxS6kSO2oCsbj61/kxrwECx+kFNgE8fqFug= +github.com/influxdata/platform v0.0.0-20180911231805-c9d6a4292a20 h1:voQD9NoZXFwe4IN9DWUFPdKipl9uKqb40MVvkLWwW4w= +github.com/influxdata/platform v0.0.0-20180911231805-c9d6a4292a20/go.mod h1:o8AOzOaMzxS6kSO2oCsbj61/kxrwECx+kFNgE8fqFug= +github.com/influxdata/platform v0.0.0-20180911232327-b661e656df69 h1:7dpp6sFJn5SkrEUBpRPsnE9C831iRcvs5dcXnr81KYM= +github.com/influxdata/platform v0.0.0-20180911232327-b661e656df69/go.mod h1:o8AOzOaMzxS6kSO2oCsbj61/kxrwECx+kFNgE8fqFug= +github.com/influxdata/platform v0.0.0-20180911232614-de9ab156a890 h1:1mr5d3+FxCx1olze3eqDQVeOx3SYyduLB0JrYhT8i8I= +github.com/influxdata/platform v0.0.0-20180911232614-de9ab156a890/go.mod h1:o8AOzOaMzxS6kSO2oCsbj61/kxrwECx+kFNgE8fqFug= github.com/influxdata/tdigest v0.0.0-20180711151920-a7d76c6f093a h1:vMqgISSVkIqWxCIZs8m1L4096temR7IbYyNdMiBxSPA= github.com/influxdata/tdigest v0.0.0-20180711151920-a7d76c6f093a/go.mod h1:9GkyshztGufsdPQWjH+ifgnIr3xNUL5syI70g2dzU1o= +github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368 h1:+TUUmaFa4YD1Q+7bH9o5NCHQGPMqZCYJiNW6lIIS9z4= +github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368/go.mod h1:Wbbw6tYNvwa5dlB6304Sd+82Z3f7PmVZHVKU637d4po= +github.com/influxdata/yamux v0.0.0-20171107173414-1f58ded512de h1:GqhjCUCDobXxeZrFNdaLTKpyTm25qM9Z8y7ihBTeuJA= +github.com/influxdata/yamux v0.0.0-20171107173414-1f58ded512de/go.mod h1:9fPSNWQM1MYwdjNIBSTzkIHxr/yJvac86kKtvsDs+2Y= +github.com/influxdata/yarpc v0.0.1 h1:qvQgl5KSfG0Jv2fUaZKjeLJnjsaoqMNtFP5i0zafsCE= +github.com/influxdata/yarpc v0.0.1/go.mod h1:y3O0SndVHp7xavEbUpKN/WUj5Ajr+wEWXBRvR7sS3Cw= +github.com/jessevdk/go-flags v1.4.0 h1:4IU2WS7AumrZ/40jfhf4QVDMsQwqA7VEHozFRrGARJA= +github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= +github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8 h1:12VvqtR6Aowv3l/EQUlocDHW2Cp4G9WJVH7uyH8QFJE= +github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= +github.com/json-iterator/go v1.1.5 h1:gL2yXlmiIo4+t+y32d4WGwOjKGYcGOuyrg46vadswDE= +github.com/json-iterator/go v1.1.5/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= +github.com/jsternberg/zap-logfmt v1.2.0 h1:1v+PK4/B48cy8cfQbxL4FmmNZrjnIMr2BsnyEmXqv2o= +github.com/jsternberg/zap-logfmt v1.2.0/go.mod h1:kz+1CUmCutPWABnNkOu9hOHKdT2q3TDYCcsFy9hpqb0= +github.com/julienschmidt/httprouter v0.0.0-20180715161854-348b672cd90d h1:of6+TpypLAaiv4JxgH5aplBZnt0b65B4v4c8q5oy+Sk= +github.com/julienschmidt/httprouter v0.0.0-20180715161854-348b672cd90d/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= +github.com/jwilder/encoding v0.0.0-20170811194829-b4e1701a28ef h1:2jNeR4YUziVtswNP9sEFAI913cVrzH85T+8Q6LpYbT0= +github.com/jwilder/encoding v0.0.0-20170811194829-b4e1701a28ef/go.mod h1:Ct9fl0F6iIOGgxJ5npU/IUOhOhqlVrGjyIZc8/MagT0= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY= +github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mattn/go-colorable v0.0.9 h1:UVL0vNpWh04HeJXV0KLcaT7r06gOH2l4OW6ddYRUIY4= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-isatty v0.0.4 h1:bnP0vzxcAdeI1zdubAl5PjU6zsERjGZb7raWodagDYs= @@ -47,12 +180,40 @@ github.com/mattn/go-tty v0.0.0-20180219170247-931426f7535a h1:8TGB3DFRNl06DB1Q6z github.com/mattn/go-tty v0.0.0-20180219170247-931426f7535a/go.mod h1:XPvLUNfbS4fJH25nqRHfWLMa1ONC8Amw+mIA639KxkE= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/mitchellh/mapstructure v1.0.0 h1:vVpGvMXJPqSDh2VYHF7gsfQj8Ncx+Xw5Y1KHeTRY+7I= +github.com/mitchellh/mapstructure v1.0.0/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mna/pigeon v1.0.0 h1:n46IoStjdzjaXuyBH53j9HZ8CVqGWpC7P5/v8dP4qEY= github.com/mna/pigeon v1.0.0/go.mod h1:Iym28+kJVnC1hfQvv5MUtI6AiFFzvQjHcvI4RFTG/04= github.com/mna/pigeon v1.0.1-0.20180808201053-bb0192cfc2ae h1:mQO+oxi0kpii/TX+ltfTCFuYkOjEn53JhaOObiMuvnk= github.com/mna/pigeon v1.0.1-0.20180808201053-bb0192cfc2ae/go.mod h1:Iym28+kJVnC1hfQvv5MUtI6AiFFzvQjHcvI4RFTG/04= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI= +github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/nats-io/gnatsd v1.3.0 h1:+5d80klu3QaJgNbdavVBjWJP7cHd11U2CLnRTFM9ICI= +github.com/nats-io/gnatsd v1.3.0/go.mod h1:nqco77VO78hLCJpIcVfygDP2rPGfsEHkGTUk94uh5DQ= +github.com/nats-io/go-nats v1.6.0 h1:FznPwMfrVwGnSCh7JTXyJDRW0TIkD4Tr+M1LPJt9T70= +github.com/nats-io/go-nats v1.6.0/go.mod h1:+t7RHT5ApZebkrQdnn6AhQJmhJJiKAvJUio1PiiCtj0= +github.com/nats-io/go-nats-streaming v0.4.0 h1:00wOBnTKzZGvQOFRSxj18kUm4X2TvXzv8LS0skZegPc= +github.com/nats-io/go-nats-streaming v0.4.0/go.mod h1:gfq4R3c9sKAINOpelo0gn/b9QDMBZnmrttcsNF+lqyo= +github.com/nats-io/nats-streaming-server v0.11.0 h1:6d32ASBeZJQOoams2GJuviQyf5GVF5R4StHVyEhSQK8= +github.com/nats-io/nats-streaming-server v0.11.0/go.mod h1:RyqtDJZvMZO66YmyjIYdIvS69zu/wDAkyNWa8PIUa5c= +github.com/nats-io/nuid v1.0.0 h1:44QGdhbiANq8ZCbUkdn6W5bqtg+mHuDE4wOUuxxndFs= +github.com/nats-io/nuid v1.0.0/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492 h1:lM6RxxfUMrYL/f8bWEUqdXrANWtrL7Nndbm9iFN0DlU= +github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go.mod h1:Ngi6UdF0k5OKD5t5wlmGhe/EDKPoUM3BXZSSfIuJbis= github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg= github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/openzipkin/zipkin-go-opentracing v0.3.4 h1:x/pBv/5VJNWkcHF1G9xqhug8Iw7X1y1zOMzDmyuvP2g= +github.com/openzipkin/zipkin-go-opentracing v0.3.4/go.mod h1:js2AbwmHW0YD9DwIw2JhQWmbfFi/UnWyYwdVhqbCDOE= +github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc= +github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= +github.com/peterbourgon/diskv v2.0.1+incompatible h1:UBdAOUP5p4RWqPBg048CAvpKN+vxiaj6gdUUzhl4XmI= +github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= +github.com/philhofer/fwd v1.0.0 h1:UbZqGr5Y38ApvM/V/jEljVxwocdweyH+vmYvRPBnbqQ= +github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= +github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I= +github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/term v0.0.0-20180730021639-bffc007b7fd5 h1:tFwafIEMf0B7NlcxV/zJ6leBIa81D3hgGSgsE5hCkOQ= @@ -67,27 +228,83 @@ github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e h1:n/3MEhJQjQxrO github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273 h1:agujYaXJSxSo18YNX3jzl+4G6Bstwt+kqv47GS12uL0= github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/rcrowley/go-metrics v0.0.0-20180503174638-e2704e165165 h1:nkcn14uNmFEuGCb2mBZbBb24RdNRL08b/wb+xBOYpuk= +github.com/rcrowley/go-metrics v0.0.0-20180503174638-e2704e165165/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= +github.com/segmentio/kafka-go v0.1.0 h1:IXCHG+sXPNiIR5pC/vTEItZduPKu4cnpr85YgxpxlW0= +github.com/segmentio/kafka-go v0.1.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo= github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= +github.com/sirupsen/logrus v1.0.6 h1:hcP1GmhGigz/O7h1WVUM5KklBp1JoNS9FggWKdj/j3s= +github.com/sirupsen/logrus v1.0.6/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc= +github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI= +github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= +github.com/spf13/cast v1.2.0 h1:HHl1DSRbEQN2i8tJmtS6ViPyHx35+p51amrdsiTCrkg= +github.com/spf13/cast v1.2.0/go.mod h1:r2rcYCSwa1IExKTDiTfzaxqT2FNHs8hODu4LnUfgKEg= +github.com/spf13/cobra v0.0.3 h1:ZlrZ4XsMRm04Fr5pSFxBgfND2EBVa1nLpiy1stUsX/8= +github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= +github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk= +github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= +github.com/spf13/pflag v1.0.2 h1:Fy0orTDgHdbnzHcsOgfCN4LtHf0ec3wwtiwJqwvf3Gc= +github.com/spf13/pflag v1.0.2/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/spf13/viper v1.2.0 h1:M4Rzxlu+RgU4pyBRKhKaVN1VeYOm8h2jgyXnAseDgCc= +github.com/spf13/viper v1.2.0/go.mod h1:P4AexN0a+C9tGAnUFNwDMYYZv3pjFuvmeiMyKRaNVlI= github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/tinylib/msgp v1.0.2 h1:DfdQrzQa7Yh2es9SuLkixqxuXS2SxsdYn0KbdrOGWD8= +github.com/tinylib/msgp v1.0.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= +github.com/tylerb/graceful v1.2.15 h1:B0x01Y8fsJpogzZTkDg6BDi6eMf03s01lEKGdrv83oA= +github.com/tylerb/graceful v1.2.15/go.mod h1:LPYTbOYmUTdabwRt0TGhLllQ0MUNbs0Y5q1WXJOI9II= +github.com/xlab/treeprint v0.0.0-20180616005107-d6fb6747feb6 h1:YdYsPAZ2pC6Tow/nPZOPQ96O3hm/ToAkGsPLzedXERk= +github.com/xlab/treeprint v0.0.0-20180616005107-d6fb6747feb6/go.mod h1:ce1O1j6UtZfjr22oyGxGLbauSBp2YVXpARAosm7dHBg= go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/zap v1.9.1 h1:XCJQEf3W6eZaVwhRBof6ImoYGJSITeKWsyeh3HFu/5o= go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +golang.org/x/crypto v0.0.0-20180910181607-0e37d006457b h1:2b9XGzhjiYsYPnKXoEfL7klWZQIt8IfyRCz62gCqqlQ= +golang.org/x/crypto v0.0.0-20180910181607-0e37d006457b/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d h1:g9qWBGx4puODJTMVyoPrpoxPFgVGd+z1DZwjfRu4d0I= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be h1:vEDujvNQGv4jgYKudGeI/+DAX4Jffq6hpD55MmoEvKs= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20180906133057-8cf3aee42992 h1:BH3eQWeGbwRU2+wxxuuPOdFBmaiBH81O8BugSjHeTFg= +golang.org/x/sys v0.0.0-20180906133057-8cf3aee42992/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 h1:+DCIGbF/swA92ohVg0//6X2IVY3KZs6p9mix0ziNYJM= +golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180904205237-0aa4b8830f48 h1:PIz+xUHW4G/jqfFWeKhQ96ZV/t2HDsXfWj923rV0bZY= golang.org/x/tools v0.0.0-20180904205237-0aa4b8830f48/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +google.golang.org/api v0.0.0-20180910000450-7ca32eb868bf h1:rjxqQmxjyqerRKEj+tZW+MCm4LgpFXu18bsEoCMgDsk= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0NQvRW8DG4Yk3Q6T9cu9RcFQDu1tc= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/grpc v1.15.0 h1:Az/KuahOM4NAidTEuJCv/RonAA7rYsTPkqXVjr+8OOw= +google.golang.org/grpc v1.15.0/go.mod h1:0JHn/cJsOMiMfNA9+DeHDlAU7KAAB5GDlYFpa9MZMio= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= +gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= +gopkg.in/robfig/cron.v2 v2.0.0-20150107220207-be2e0b0deed5 h1:E846t8CnR+lv5nE+VuiKTDG/v1U2stad0QzddfJC7kY= +gopkg.in/robfig/cron.v2 v2.0.0-20150107220207-be2e0b0deed5/go.mod h1:hiOFpYm0ZJbusNj2ywpbrXowU3G8U6GIQzqn2mw1UIE= +gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +k8s.io/api v0.0.0-20180911000052-49f236fe119e h1:K6ZWCvyoK2cMC12nMOjJ4dRqjVrnwFs+Efo81mWKkg0= +k8s.io/api v0.0.0-20180911000052-49f236fe119e/go.mod h1:iuAfoD4hCxJ8Onx9kaTIt30j7jUFS00AXQi6QMi99vA= +k8s.io/apimachinery v0.0.0-20180908133737-0dbe21f815eb h1:Q+q1qi+vhGTvH9ryVhp9Ks3qrLqC3B3YfjWtRgIilM0= +k8s.io/apimachinery v0.0.0-20180908133737-0dbe21f815eb/go.mod h1:ccL7Eh7zubPUSh9A3USN90/OzHNSVN6zxzde07TDCL0= +k8s.io/client-go v8.0.0+incompatible h1:2pUaSg2x6iEHr8cia6zmWhoCXG1EDG9TCx9s//Aq7HY= +k8s.io/client-go v8.0.0+incompatible/go.mod h1:7vJpHMYJwNQCWgzmNV+VYUl1zCObLyodBc8nIyt8L5s= diff --git a/lang/compiler.go b/lang/compiler.go new file mode 100644 index 0000000000..e276cf9d1d --- /dev/null +++ b/lang/compiler.go @@ -0,0 +1,51 @@ +package lang + +import ( + "context" + "time" + + "github.com/influxdata/flux" +) + +const ( + FluxCompilerType = "flux" + SpecCompilerType = "spec" +) + +// AddCompilerMappings adds the Flux specific compiler mappings. +func AddCompilerMappings(mappings flux.CompilerMappings) error { + if err := mappings.Add(FluxCompilerType, func() flux.Compiler { + return new(FluxCompiler) + + }); err != nil { + return err + } + return mappings.Add(SpecCompilerType, func() flux.Compiler { + return new(SpecCompiler) + }) +} + +// FluxCompiler compiles a Flux script into a spec. +type FluxCompiler struct { + Query string `json:"query"` +} + +func (c FluxCompiler) Compile(ctx context.Context) (*flux.Spec, error) { + return flux.Compile(ctx, c.Query, time.Now()) +} + +func (c FluxCompiler) CompilerType() flux.CompilerType { + return FluxCompilerType +} + +// SpecCompiler implements Compiler by returning a known spec. +type SpecCompiler struct { + Spec *flux.Spec `json:"spec"` +} + +func (c SpecCompiler) Compile(ctx context.Context) (*flux.Spec, error) { + return c.Spec, nil +} +func (c SpecCompiler) CompilerType() flux.CompilerType { + return SpecCompilerType +} diff --git a/logger.go b/logger.go deleted file mode 100644 index a41518e94d..0000000000 --- a/logger.go +++ /dev/null @@ -1,50 +0,0 @@ -package flux - -import ( - "time" - - "github.com/influxdata/platform" -) - -// Logger persists metadata about executed queries. -type Logger interface { - Log(Log) error -} - -// Log captures a query and any relevant metadata for the query execution. -type Log struct { - // Time is the time the query was completed - Time time.Time - // OrganizationID is the ID of the organization that requested the query - OrganizationID platform.ID - // Error is any error encountered by the query - Error error - - // ProxyRequest is the query request - ProxyRequest *ProxyRequest - // ResponseSize is the size in bytes of the query response - ResponseSize int64 - // Statistics is a set of statistics about the query execution - Statistics Statistics -} - -// Redact removes any sensitive information before logging -func (q *Log) Redact() { - if q.ProxyRequest != nil && q.ProxyRequest.Request.Authorization != nil { - // Make shallow copy of request - request := new(ProxyRequest) - *request = *q.ProxyRequest - - // Make shallow copy of authorization - auth := new(platform.Authorization) - *auth = *q.ProxyRequest.Request.Authorization - // Redact authorization token - auth.Token = "" - - // Apply redacted authorization - request.Request.Authorization = auth - - // Apply redacted request - q.ProxyRequest = request - } -} diff --git a/logging.go b/logging.go deleted file mode 100644 index 7305f1c165..0000000000 --- a/logging.go +++ /dev/null @@ -1,57 +0,0 @@ -package flux - -import ( - "context" - "fmt" - "io" - "time" -) - -// LoggingServiceBridge implements ProxyQueryService and logs the queries while consuming a QueryService interface. -type LoggingServiceBridge struct { - QueryService QueryService - QueryLogger Logger -} - -// Query executes and logs the query. -func (s *LoggingServiceBridge) Query(ctx context.Context, w io.Writer, req *ProxyRequest) (n int64, err error) { - var stats Statistics - defer func() { - r := recover() - if r != nil { - err = fmt.Errorf("panic: %v", r) - } - log := Log{ - OrganizationID: req.Request.OrganizationID, - ProxyRequest: req, - ResponseSize: n, - Time: time.Now(), - Statistics: stats, - } - if err != nil { - log.Error = err - } - s.QueryLogger.Log(log) - }() - - results, err := s.QueryService.Query(ctx, &req.Request) - if err != nil { - return 0, err - } - // Check if this result iterator reports stats. We call this defer before cancel because - // the query needs to be finished before it will have valid statistics. - if s, ok := results.(Statisticser); ok { - defer func() { - stats = s.Statistics() - }() - } - defer results.Cancel() - - encoder := req.Dialect.Encoder() - n, err = encoder.Encode(w, results) - if err != nil { - return n, err - } - // The results iterator may have had an error independent of encoding errors. - return n, results.Err() -} diff --git a/mock/executor.go b/mock/executor.go index 89cc594dda..a8f80f592b 100644 --- a/mock/executor.go +++ b/mock/executor.go @@ -6,25 +6,24 @@ import ( "github.com/influxdata/flux" "github.com/influxdata/flux/execute" "github.com/influxdata/flux/plan" - "github.com/influxdata/platform" ) var _ execute.Executor = (*Executor)(nil) // Executor is a mock implementation of an execute.Executor. type Executor struct { - ExecuteFn func(ctx context.Context, orgID platform.ID, p *plan.PlanSpec, a *execute.Allocator) (map[string]flux.Result, error) + ExecuteFn func(ctx context.Context, p *plan.PlanSpec, a *execute.Allocator) (map[string]flux.Result, error) } // NewExecutor returns a mock Executor where its methods will return zero values. func NewExecutor() *Executor { return &Executor{ - ExecuteFn: func(context.Context, platform.ID, *plan.PlanSpec, *execute.Allocator) (map[string]flux.Result, error) { + ExecuteFn: func(context.Context, *plan.PlanSpec, *execute.Allocator) (map[string]flux.Result, error) { return nil, nil }, } } -func (e *Executor) Execute(ctx context.Context, orgID platform.ID, p *plan.PlanSpec, a *execute.Allocator) (map[string]flux.Result, error) { - return e.ExecuteFn(ctx, orgID, p, a) +func (e *Executor) Execute(ctx context.Context, p *plan.PlanSpec, a *execute.Allocator) (map[string]flux.Result, error) { + return e.ExecuteFn(ctx, p, a) } diff --git a/mock/service.go b/mock/service.go deleted file mode 100644 index 830c2bbe80..0000000000 --- a/mock/service.go +++ /dev/null @@ -1,38 +0,0 @@ -package mock - -import ( - "context" - "io" - - "github.com/influxdata/flux" -) - -// ProxyQueryService mocks the idep QueryService for testing. -type ProxyQueryService struct { - QueryF func(ctx context.Context, w io.Writer, req *flux.ProxyRequest) (int64, error) -} - -// Query writes the results of the query request. -func (s *ProxyQueryService) Query(ctx context.Context, w io.Writer, req *flux.ProxyRequest) (int64, error) { - return s.QueryF(ctx, w, req) -} - -// QueryService mocks the idep QueryService for testing. -type QueryService struct { - QueryF func(ctx context.Context, req *flux.Request) (flux.ResultIterator, error) -} - -// Query writes the results of the query request. -func (s *QueryService) Query(ctx context.Context, req *flux.Request) (flux.ResultIterator, error) { - return s.QueryF(ctx, req) -} - -// AsyncQueryService mocks the idep QueryService for testing. -type AsyncQueryService struct { - QueryF func(ctx context.Context, req *flux.Request) (flux.Query, error) -} - -// Query writes the results of the query request. -func (s *AsyncQueryService) Query(ctx context.Context, req *flux.Request) (flux.Query, error) { - return s.QueryF(ctx, req) -} diff --git a/operation.go b/operation.go index 2de071ed29..e1a664958f 100644 --- a/operation.go +++ b/operation.go @@ -4,7 +4,6 @@ import ( "encoding/json" "fmt" - "github.com/influxdata/platform" "github.com/pkg/errors" ) @@ -72,12 +71,6 @@ type OperationSpec interface { Kind() OperationKind } -// BucketAwareOperationSpec specifies an operation that reads or writes buckets -type BucketAwareOperationSpec interface { - OperationSpec - BucketsAccessed() (readBuckets, writeBuckets []platform.BucketFilter) -} - // OperationID is a unique ID within a query for the operation. type OperationID string diff --git a/query.go b/query.go index add10b6b5b..876ddc91ad 100644 --- a/query.go +++ b/query.go @@ -1,59 +1,6 @@ -// Package query contains the InfluxDB 2.0 query engine. package flux -import ( - "context" - "encoding/json" - "fmt" - "io" - "time" - - "github.com/influxdata/platform" -) - -// QueryService represents a type capable of performing queries. -type QueryService interface { - // Query submits a query for execution returning a results iterator. - // Cancel must be called on any returned results to free resources. - Query(ctx context.Context, req *Request) (ResultIterator, error) -} - -// AsyncQueryService represents a service for performing queries where the results are delivered asynchronously. -type AsyncQueryService interface { - // Query submits a query for execution returning immediately. - // Done must be called on any returned Query objects. - Query(ctx context.Context, req *Request) (Query, error) -} - -// ProxyQueryService performs queries and encodes the result into a writer. -// The results are opaque to a ProxyQueryService. -type ProxyQueryService interface { - // Query performs the requested query and encodes the results into w. - // The number of bytes written to w is returned __independent__ of any error. - Query(ctx context.Context, w io.Writer, req *ProxyRequest) (int64, error) -} - -// ResultIterator allows iterating through all results -// Cancel must be called to free resources. -// ResultIterators may implement Statisticser. -type ResultIterator interface { - // More indicates if there are more results. - More() bool - - // Next returns the next result. - // If More is false, Next panics. - Next() Result - - // Cancel discards the remaining results. - // Cancel must always be called to free resources. - // It is safe to call Cancel multiple times. - Cancel() - - // Err reports the first error encountered. - // Err will not report anything unless More has returned false, - // or the query has been cancelled. - Err() error -} +import "time" // Query represents an active query. type Query interface { @@ -80,172 +27,6 @@ type Query interface { Statisticser } -// Request respresents the query to run. -type Request struct { - // Scope - Authorization *platform.Authorization `json:"authorization,omitempty"` - OrganizationID platform.ID `json:"organization_id"` - - // Command - - // Compiler converts the query to a specification to run against the data. - Compiler Compiler `json:"compiler"` - - // compilerMappings maps compiler types to creation methods - compilerMappings CompilerMappings -} - -// WithCompilerMappings sets the query type mappings on the request. -func (r *Request) WithCompilerMappings(mappings CompilerMappings) { - r.compilerMappings = mappings -} - -// UnmarshalJSON populates the request from the JSON data. -// WithCompilerMappings must have been called or an error will occur. -func (r *Request) UnmarshalJSON(data []byte) error { - type Alias Request - raw := struct { - *Alias - CompilerType CompilerType `json:"compiler_type"` - Compiler json.RawMessage `json:"compiler"` - }{ - Alias: (*Alias)(r), - } - if err := json.Unmarshal(data, &raw); err != nil { - return err - } - - createCompiler, ok := r.compilerMappings[raw.CompilerType] - if !ok { - return fmt.Errorf("unsupported compiler type %q", raw.CompilerType) - } - - c := createCompiler() - if err := json.Unmarshal(raw.Compiler, c); err != nil { - return err - } - r.Compiler = c - - return nil -} - -func (r Request) MarshalJSON() ([]byte, error) { - type Alias Request - raw := struct { - Alias - CompilerType CompilerType `json:"compiler_type"` - }{ - Alias: (Alias)(r), - CompilerType: r.Compiler.CompilerType(), - } - return json.Marshal(raw) -} - -// Compiler produces a specification for the query. -type Compiler interface { - // Compile produces a specification for the query. - Compile(ctx context.Context) (*Spec, error) - CompilerType() CompilerType -} - -// CompilerType is the name of a query compiler. -type CompilerType string -type CreateCompiler func() Compiler -type CompilerMappings map[CompilerType]CreateCompiler - -func (m CompilerMappings) Add(t CompilerType, c CreateCompiler) error { - if _, ok := m[t]; ok { - return fmt.Errorf("duplicate compiler mapping for %q", t) - } - m[t] = c - return nil -} - -// ProxyRequest specifies a query request and the dialect for the results. -type ProxyRequest struct { - // Request is the basic query request - Request Request `json:"request"` - - // Dialect is the result encoder - Dialect Dialect `json:"dialect"` - - // dialectMappings maps dialect types to creation methods - dialectMappings DialectMappings -} - -// WithCompilerMappings sets the compiler type mappings on the request. -func (r *ProxyRequest) WithCompilerMappings(mappings CompilerMappings) { - r.Request.WithCompilerMappings(mappings) -} - -// WithDialectMappings sets the dialect type mappings on the request. -func (r *ProxyRequest) WithDialectMappings(mappings DialectMappings) { - r.dialectMappings = mappings -} - -// UnmarshalJSON populates the request from the JSON data. -// WithCompilerMappings and WithDialectMappings must have been called or an error will occur. -func (r *ProxyRequest) UnmarshalJSON(data []byte) error { - type Alias ProxyRequest - raw := struct { - *Alias - DialectType DialectType `json:"dialect_type"` - Dialect json.RawMessage `json:"dialect"` - }{ - Alias: (*Alias)(r), - } - if err := json.Unmarshal(data, &raw); err != nil { - return err - } - - createDialect, ok := r.dialectMappings[raw.DialectType] - if !ok { - return fmt.Errorf("unsupported dialect type %q", raw.DialectType) - } - - d := createDialect() - if err := json.Unmarshal(raw.Dialect, d); err != nil { - return err - } - r.Dialect = d - - return nil -} - -func (r ProxyRequest) MarshalJSON() ([]byte, error) { - type Alias ProxyRequest - raw := struct { - Alias - DialectType DialectType `json:"dialect_type"` - }{ - Alias: (Alias)(r), - DialectType: r.Dialect.DialectType(), - } - return json.Marshal(raw) -} - -// Dialect describes how to encode results. -type Dialect interface { - // Encoder creates an encoder for the results - Encoder() MultiResultEncoder - // DialectType report the type of the dialect - DialectType() DialectType -} - -// DialectType is the name of a query result dialect. -type DialectType string -type CreateDialect func() Dialect - -type DialectMappings map[DialectType]CreateDialect - -func (m DialectMappings) Add(t DialectType, c CreateDialect) error { - if _, ok := m[t]; ok { - return fmt.Errorf("duplicate dialect mapping for %q", t) - } - m[t] = c - return nil -} - // Statisticser reports statisitcs about query processing. type Statisticser interface { // Statistics reports the statisitcs for the query. diff --git a/query_test.go b/query_test.go deleted file mode 100644 index c628933f25..0000000000 --- a/query_test.go +++ /dev/null @@ -1,129 +0,0 @@ -package flux_test - -import ( - "context" - "encoding/json" - "testing" - - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" - "github.com/influxdata/flux" - "github.com/influxdata/platform" -) - -var CmpOpts = []cmp.Option{ - cmpopts.IgnoreUnexported(flux.ProxyRequest{}), - cmpopts.IgnoreUnexported(flux.Request{}), -} - -type compilerA struct { - A string `json:"a"` -} - -func (c compilerA) Compile(ctx context.Context) (*flux.Spec, error) { - panic("not implemented") -} - -func (c compilerA) CompilerType() flux.CompilerType { - return "compilerA" -} - -var compilerMappings = flux.CompilerMappings{ - "compilerA": func() flux.Compiler { return new(compilerA) }, -} - -type dialectB struct { - B int `json:"b"` -} - -func (d dialectB) Encoder() flux.MultiResultEncoder { - panic("not implemented") -} - -func (d dialectB) DialectType() flux.DialectType { - return "dialectB" -} - -var dialectMappings = flux.DialectMappings{ - "dialectB": func() flux.Dialect { return new(dialectB) }, -} - -func TestRequest_JSON(t *testing.T) { - testCases := []struct { - name string - data string - want flux.Request - }{ - { - name: "simple", - data: `{"organization_id":"aaaaaaaaaaaaaaaa","compiler":{"a":"my custom compiler"},"compiler_type":"compilerA"}`, - want: flux.Request{ - OrganizationID: platform.ID{0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA}, - Compiler: &compilerA{ - A: "my custom compiler", - }, - }, - }, - } - for _, tc := range testCases { - var r flux.Request - r.WithCompilerMappings(compilerMappings) - - if err := json.Unmarshal([]byte(tc.data), &r); err != nil { - t.Fatal(err) - } - if !cmp.Equal(tc.want, r, CmpOpts...) { - t.Fatalf("unexpected request: -want/+got:\n%s", cmp.Diff(tc.want, r, CmpOpts...)) - } - marshalled, err := json.Marshal(r) - if err != nil { - t.Fatal(err) - } - if got, want := string(marshalled), tc.data; got != want { - t.Fatalf("unexpected marshalled request: -want/+got:\n%s", cmp.Diff(want, got)) - } - } -} - -func TestProxyRequest_JSON(t *testing.T) { - testCases := []struct { - name string - data string - want flux.ProxyRequest - }{ - { - name: "simple", - data: `{"request":{"organization_id":"aaaaaaaaaaaaaaaa","compiler":{"a":"my custom compiler"},"compiler_type":"compilerA"},"dialect":{"b":42},"dialect_type":"dialectB"}`, - want: flux.ProxyRequest{ - Request: flux.Request{ - OrganizationID: platform.ID{0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA}, - Compiler: &compilerA{ - A: "my custom compiler", - }, - }, - Dialect: &dialectB{ - B: 42, - }, - }, - }, - } - for _, tc := range testCases { - var pr flux.ProxyRequest - pr.WithCompilerMappings(compilerMappings) - pr.WithDialectMappings(dialectMappings) - - if err := json.Unmarshal([]byte(tc.data), &pr); err != nil { - t.Fatal(err) - } - if !cmp.Equal(tc.want, pr, CmpOpts...) { - t.Fatalf("unexpected proxy request: -want/+got:\n%s", cmp.Diff(tc.want, pr, CmpOpts...)) - } - marshalled, err := json.Marshal(pr) - if err != nil { - t.Fatal(err) - } - if got, want := string(marshalled), tc.data; got != want { - t.Fatalf("unexpected marshalled proxy request: -want/+got:\n%s", cmp.Diff(want, got)) - } - } -} diff --git a/querytest/compile.go b/querytest/compile.go index 87896eed81..43647c2034 100644 --- a/querytest/compile.go +++ b/querytest/compile.go @@ -5,23 +5,18 @@ import ( "testing" "time" - "fmt" - "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/influxdata/flux" "github.com/influxdata/flux/functions" "github.com/influxdata/flux/semantic/semantictest" - "github.com/influxdata/platform" ) type NewQueryTestCase struct { - Name string - Raw string - Want *flux.Spec - WantErr bool - WantReadBuckets *[]platform.BucketFilter - WantWriteBuckets *[]platform.BucketFilter + Name string + Raw string + Want *flux.Spec + WantErr bool } var opts = append( @@ -38,7 +33,7 @@ func NewQueryTestHelper(t *testing.T, tc NewQueryTestCase) { now := time.Now().UTC() got, err := flux.Compile(context.Background(), tc.Raw, now) if (err != nil) != tc.WantErr { - t.Errorf("query.NewQuery() error = %v, wantErr %v", err, tc.WantErr) + t.Errorf("error compiling spec error = %v, wantErr %v", err, tc.WantErr) return } if tc.WantErr { @@ -47,38 +42,7 @@ func NewQueryTestHelper(t *testing.T, tc NewQueryTestCase) { if tc.Want != nil { tc.Want.Now = now if !cmp.Equal(tc.Want, got, opts...) { - t.Errorf("query.NewQuery() = -want/+got %s", cmp.Diff(tc.Want, got, opts...)) - } - } - - var gotReadBuckets, gotWriteBuckets []platform.BucketFilter - if tc.WantReadBuckets != nil || tc.WantWriteBuckets != nil { - gotReadBuckets, gotWriteBuckets, err = got.BucketsAccessed() - } - - if tc.WantReadBuckets != nil { - if diagnostic := verifyBuckets(*tc.WantReadBuckets, gotReadBuckets); diagnostic != "" { - t.Errorf("Could not verify read buckets: %v", diagnostic) - } - } - - if tc.WantWriteBuckets != nil { - if diagnostic := verifyBuckets(*tc.WantWriteBuckets, gotWriteBuckets); diagnostic != "" { - t.Errorf("Could not verify write buckets: %v", diagnostic) - } - } -} - -func verifyBuckets(wantBuckets, gotBuckets []platform.BucketFilter) string { - if len(wantBuckets) != len(gotBuckets) { - return fmt.Sprintf("Expected %v buckets but got %v", len(wantBuckets), len(gotBuckets)) - } - - for i, wantBucket := range wantBuckets { - if diagnostic := cmp.Diff(wantBucket, gotBuckets[i]); diagnostic != "" { - return fmt.Sprintf("Bucket mismatch: -want/+got:\n%v", diagnostic) + t.Errorf("unexpected specs -want/+got %s", cmp.Diff(tc.Want, got, opts...)) } } - - return "" } diff --git a/querytest/execute.go b/querytest/execute.go index cedcb680ab..4cab129ce9 100644 --- a/querytest/execute.go +++ b/querytest/execute.go @@ -1,23 +1,32 @@ package querytest import ( + "context" + "io" "math" "github.com/influxdata/flux" "github.com/influxdata/flux/control" "github.com/influxdata/flux/functions" - "github.com/influxdata/platform" ) -var ( - staticResultID platform.ID -) +type Querier struct { + c *control.Controller +} + +func (q *Querier) Query(ctx context.Context, w io.Writer, c flux.Compiler, d flux.Dialect) (int64, error) { + query, err := q.c.Query(ctx, c) + if err != nil { + return 0, err + } + results := flux.NewResultIteratorFromQuery(query) + defer results.Cancel() -func init() { - staticResultID.DecodeFromString("1") + encoder := d.Encoder() + return encoder.Encode(w, results) } -func GetProxyQueryServiceBridge() flux.ProxyQueryServiceBridge { +func NewQuerier() *Querier { config := control.Config{ ConcurrencyQuota: 1, MemoryBytesQuota: math.MaxInt64, @@ -25,10 +34,8 @@ func GetProxyQueryServiceBridge() flux.ProxyQueryServiceBridge { c := control.New(config) - return flux.ProxyQueryServiceBridge{ - QueryService: flux.QueryServiceBridge{ - AsyncQueryService: c, - }, + return &Querier{ + c: c, } } diff --git a/repl/repl.go b/repl/repl.go index 334a8aa4b0..26df15ae5b 100644 --- a/repl/repl.go +++ b/repl/repl.go @@ -15,28 +15,29 @@ import ( prompt "github.com/c-bata/go-prompt" "github.com/influxdata/flux" - "github.com/influxdata/flux/control" "github.com/influxdata/flux/execute" "github.com/influxdata/flux/functions" "github.com/influxdata/flux/interpreter" + "github.com/influxdata/flux/lang" "github.com/influxdata/flux/parser" "github.com/influxdata/flux/semantic" "github.com/influxdata/flux/values" - "github.com/influxdata/platform" "github.com/pkg/errors" ) type REPL struct { - orgID platform.ID - interpreter *interpreter.Interpreter declarations semantic.DeclarationScope - c *control.Controller + querier Querier cancelMu sync.Mutex cancelFunc context.CancelFunc } +type Querier interface { + Query(ctx context.Context, compiler flux.Compiler) (flux.Query, error) +} + func addBuiltIn(script string, itrp *interpreter.Interpreter, declarations semantic.DeclarationScope) error { astProg, err := parser.NewAST(script) if err != nil { @@ -53,16 +54,15 @@ func addBuiltIn(script string, itrp *interpreter.Interpreter, declarations seman return nil } -func New(c *control.Controller, orgID platform.ID) *REPL { +func New(q Querier) *REPL { itrp := flux.NewInterpreter() _, decls := flux.BuiltIns() addBuiltIn("run = () => yield(table:_)", itrp, decls) return &REPL{ - orgID: orgID, interpreter: itrp, declarations: decls, - c: c, + querier: q, } } @@ -201,14 +201,11 @@ func (r *REPL) doQuery(spec *flux.Spec) error { defer cancelFunc() defer r.clearCancel() - req := &flux.Request{ - OrganizationID: r.orgID, - Compiler: flux.SpecCompiler{ - Spec: spec, - }, + compiler := lang.SpecCompiler{ + Spec: spec, } - q, err := r.c.Query(ctx, req) + q, err := r.querier.Query(ctx, compiler) if err != nil { return err } diff --git a/bridges.go b/result_iterator.go similarity index 56% rename from bridges.go rename to result_iterator.go index 6ed59c6385..14fd5b2351 100644 --- a/bridges.go +++ b/result_iterator.go @@ -1,40 +1,45 @@ package flux -import ( - "context" - "io" - "sort" -) +import "sort" -// QueryServiceBridge implements the QueryService interface while consuming the AsyncQueryService interface. -type QueryServiceBridge struct { - AsyncQueryService AsyncQueryService -} +// ResultIterator allows iterating through all results +// Cancel must be called to free resources. +// ResultIterators may implement Statisticser. +type ResultIterator interface { + // More indicates if there are more results. + More() bool -func (b QueryServiceBridge) Query(ctx context.Context, req *Request) (ResultIterator, error) { - query, err := b.AsyncQueryService.Query(ctx, req) - if err != nil { - return nil, err - } - return newResultIterator(query), nil + // Next returns the next result. + // If More is false, Next panics. + Next() Result + + // Cancel discards the remaining results. + // Cancel must always be called to free resources. + // It is safe to call Cancel multiple times. + Cancel() + + // Err reports the first error encountered. + // Err will not report anything unless More has returned false, + // or the query has been cancelled. + Err() error } -// resultIterator implements a ResultIterator while consuming a Query -type resultIterator struct { +// QueryResultIterator implements a ResultIterator while consuming a Query +type QueryResultIterator struct { query Query cancel chan struct{} ready bool results *MapResultIterator } -func newResultIterator(q Query) *resultIterator { - return &resultIterator{ +func NewResultIteratorFromQuery(q Query) *QueryResultIterator { + return &QueryResultIterator{ query: q, cancel: make(chan struct{}), } } -func (r *resultIterator) More() bool { +func (r *QueryResultIterator) More() bool { if !r.ready { select { case <-r.cancel: @@ -56,11 +61,11 @@ DONE: return false } -func (r *resultIterator) Next() Result { +func (r *QueryResultIterator) Next() Result { return r.results.Next() } -func (r *resultIterator) Cancel() { +func (r *QueryResultIterator) Cancel() { select { case <-r.cancel: default: @@ -69,11 +74,11 @@ func (r *resultIterator) Cancel() { r.query.Cancel() } -func (r *resultIterator) Err() error { +func (r *QueryResultIterator) Err() error { return r.query.Err() } -func (r *resultIterator) Statistics() Statistics { +func (r *QueryResultIterator) Statistics() Statistics { return r.query.Statistics() } @@ -139,23 +144,3 @@ func (r *SliceResultIterator) Cancel() { func (r *SliceResultIterator) Err() error { return nil } - -// ProxyQueryServiceBridge implements ProxyQueryService while consuming a QueryService interface. -type ProxyQueryServiceBridge struct { - QueryService QueryService -} - -func (b ProxyQueryServiceBridge) Query(ctx context.Context, w io.Writer, req *ProxyRequest) (n int64, err error) { - results, err := b.QueryService.Query(ctx, &req.Request) - if err != nil { - return 0, err - } - defer results.Cancel() - encoder := req.Dialect.Encoder() - n, err = encoder.Encode(w, results) - if err != nil { - return n, err - } - - return n, nil -} diff --git a/spec.go b/spec.go index 3b326db30d..5386468d32 100644 --- a/spec.go +++ b/spec.go @@ -4,7 +4,6 @@ import ( "fmt" "time" - "github.com/influxdata/platform" "github.com/pkg/errors" ) @@ -180,22 +179,3 @@ func (q *Spec) Functions() ([]string, error) { }) return funcs, err } - -// BucketsAccessed returns the set of buckets read and written by a query spec -func (q *Spec) BucketsAccessed() (readBuckets, writeBuckets []platform.BucketFilter, err error) { - err = q.Walk(func(o *Operation) error { - bucketAwareOpSpec, ok := o.Spec.(BucketAwareOperationSpec) - if ok { - opBucketsRead, opBucketsWritten := bucketAwareOpSpec.BucketsAccessed() - readBuckets = append(readBuckets, opBucketsRead...) - writeBuckets = append(writeBuckets, opBucketsWritten...) - } - return nil - }) - - if err != nil { - return nil, nil, err - } - - return readBuckets, writeBuckets, nil -}