diff --git a/csv/result.go b/csv/result.go index 14ee23b173..18cc3b5752 100644 --- a/csv/result.go +++ b/csv/result.go @@ -80,6 +80,9 @@ type MultiResultDecoder struct { // NewMultiResultDecoder creates a new MultiResultDecoder. func NewMultiResultDecoder(c ResultDecoderConfig) *MultiResultDecoder { + if c.MaxBufferCount == 0 { + c.MaxBufferCount = defaultMaxBufferCount + } return &MultiResultDecoder{ c: c, } diff --git a/execute/executetest/result.go b/execute/executetest/result.go index 980de34e42..d8fc536fd6 100644 --- a/execute/executetest/result.go +++ b/execute/executetest/result.go @@ -1,6 +1,8 @@ package executetest -import "github.com/influxdata/platform/query" +import ( + "github.com/influxdata/platform/query" +) type Result struct { Blks []*Block diff --git a/influxql/influxql_encoder_test.go b/influxql/influxql_encoder_test.go deleted file mode 100644 index 4e4ba5809f..0000000000 --- a/influxql/influxql_encoder_test.go +++ /dev/null @@ -1,72 +0,0 @@ -package influxql - -import ( - "testing" - "time" - - "bytes" - "strings" - - "github.com/google/go-cmp/cmp" - "github.com/influxdata/platform/query" - "github.com/influxdata/platform/query/execute" - "github.com/influxdata/platform/query/execute/executetest" -) - -var epoch = time.Unix(0, 0) - -func TestMultiResultEncoder_Encode(t *testing.T) { - testCases := []struct { - name string - query string - blocks map[string][]*executetest.Block - output string - }{ - { - query: "", - name: "one result one row", - blocks: map[string][]*executetest.Block{ - "0": {{ - KeyCols: []string{"_measurement", "_field", "host"}, - ColMeta: []query.ColMeta{ - {Label: "_time", Type: query.TTime}, - {Label: "_measurement", Type: query.TString}, - {Label: "_field", Type: query.TString}, - {Label: "host", Type: query.TString}, - {Label: execute.DefaultValueColLabel, Type: query.TFloat}, - }, - Data: [][]interface{}{ - {execute.Time(5), "cpu", "max", "localhost", 98.9}, - }, - }}, - }, - output: `{"results":[{"statement_id":0,"series":[{"name":"cpu","tags":{"host":"localhost"},"columns":["time","max"],"values":[["1970-01-01T00:00:00Z",98.9]]}]}]}`, - }, - } - - for _, tc := range testCases { - tc := tc - t.Run(tc.name, func(t *testing.T) { - - resultsmap := map[string]query.Result{} - - for k, v := range tc.blocks { - resultsmap[k] = executetest.NewResult(v) - } - - results := query.NewMapResultIterator(resultsmap) - - var resp bytes.Buffer - var influxQLEncoder MultiResultEncoder - err := influxQLEncoder.Encode(&resp, results) - - if err != nil { - t.Error("error writing to buffer: ", err) - } - got := strings.TrimSpace(resp.String()) - if !cmp.Equal(got, tc.output) { - t.Error("unexpected results -want/+got", cmp.Diff(tc.output, got)) - } - }) - } -} diff --git a/influxql/response.go b/influxql/response.go index 297525442b..5c45e582f5 100644 --- a/influxql/response.go +++ b/influxql/response.go @@ -1,10 +1,5 @@ package influxql -import ( - "encoding/json" - "errors" -) - // all of this code is copied more or less verbatim from the influxdb repo. // we copy instead of sharing because we want to prevent inadvertent breaking // changes introduced by the transpiler vs the actual InfluxQL engine. @@ -12,8 +7,13 @@ import ( // results generated by the transpiler diverge from InfluxQL. type Response struct { - Results []Result `json:"results"` - Err string `json:"err,omitempty"` + Results []Result `json:"results,omitempty"` + Err string `json:"error,omitempty"` +} + +func (r *Response) error(err error) { + r.Results = nil + r.Err = err.Error() } // Message represents a user-facing message to be included with the result. @@ -27,56 +27,18 @@ type Message struct { type Result struct { // StatementID is just the statement's position in the query. It's used // to combine statement results if they're being buffered in memory. - StatementID int - Series Rows - Messages []*Message - Partial bool - Err error -} - -// MarshalJSON encodes the result into JSON. -func (r *Result) MarshalJSON() ([]byte, error) { - // Define a struct that outputs "error" as a string. - var o struct { - StatementID int `json:"statement_id"` - Series []*Row `json:"series,omitempty"` - Messages []*Message `json:"messages,omitempty"` - Partial bool `json:"partial,omitempty"` - Err string `json:"error,omitempty"` - } - - // Copy fields to output struct. - o.StatementID = r.StatementID - o.Series = r.Series - o.Messages = r.Messages - o.Partial = r.Partial - if r.Err != nil { - o.Err = r.Err.Error() - } - - return json.Marshal(&o) + StatementID int `json:"statement_id"` + Series []*Row `json:"series,omitempty"` + Messages []*Message `json:"messages,omitempty"` + Partial bool `json:"partial,omitempty"` + Err string `json:"error,omitempty"` } -// UnmarshalJSON decodes the data into the Result struct -func (r *Result) UnmarshalJSON(b []byte) error { - var o struct { - StatementID int `json:"statement_id"` - Series []*Row `json:"series,omitempty"` - Messages []*Message `json:"messages,omitempty"` - Partial bool `json:"partial,omitempty"` - Err string `json:"error,omitempty"` - } - - err := json.Unmarshal(b, &o) - if err != nil { - return err - } - r.StatementID = o.StatementID - r.Series = o.Series - r.Messages = o.Messages - r.Partial = o.Partial - if o.Err != "" { - r.Err = errors.New(o.Err) - } - return nil +// Row represents a single row returned from the execution of a statement. +type Row struct { + Name string `json:"name,omitempty"` + Tags map[string]string `json:"tags,omitempty"` + Columns []string `json:"columns,omitempty"` + Values [][]interface{} `json:"values,omitempty"` + Partial bool `json:"partial,omitempty"` } diff --git a/influxql/result.go b/influxql/result.go index 1c871532df..9910907dc3 100644 --- a/influxql/result.go +++ b/influxql/result.go @@ -9,7 +9,6 @@ import ( "time" "github.com/influxdata/platform/query" - "github.com/influxdata/platform/query/execute" ) // MultiResultEncoder encodes results as InfluxQL JSON format. @@ -17,13 +16,16 @@ type MultiResultEncoder struct{} // Encode writes a collection of results to the influxdb 1.X http response format. // Expectations/Assumptions: -// 1. Each result will be published as a 'statement' in the top-level list of results. The 'staementID' -// will be interpreted as an integer, and will return an error otherwise. -// 2. If the _field name is present in the tags, and a _value column is present, the _value column will -// be renamed to the value of the _field tag -// 3. If the _measurement name is present in the tags, it will be used as the row.Name for all rows. -// Otherwise, we'll use the column value, which _must_ be present in that case. - +// 1. Each result will be published as a 'statement' in the top-level list of results. The result name +// will be interpreted as an integer and used as the statement id. +// 2. If the _measurement name is present in the partition key, it will be used as the result name instead +// of as a normal tag. +// 3. All columns in the partition key must be strings and they will be used as tags. There is no current way +// to have a tag and field be the same name in the results. +// TODO(jsternberg): For full compatibility, the above must be possible. +// 4. All other columns are fields and will be output in the order they are found. +// TODO(jsternberg): This function currently requires the first column to be a time field, but this isn't +// a strict requirement and will be lifted when we begin to work on transpiling meta queries. func (e *MultiResultEncoder) Encode(w io.Writer, results query.ResultIterator) error { resp := Response{} @@ -31,17 +33,17 @@ func (e *MultiResultEncoder) Encode(w io.Writer, results query.ResultIterator) e name, r := results.Next() id, err := strconv.Atoi(name) if err != nil { - return fmt.Errorf("unable to parse statement id from result name: %s", err) + resp.error(fmt.Errorf("unable to parse statement id from result name: %s", err)) + results.Cancel() + break } blocks := r.Blocks() result := Result{StatementID: id} - err = blocks.Do(func(b query.Block) error { - r := NewRow() + if err := blocks.Do(func(b query.Block) error { + var r Row - fieldName := "" - measurementVaries := -1 for j, c := range b.Key().Cols() { if c.Type != query.TString { return fmt.Errorf("partition column %q is not a string type", c.Label) @@ -49,89 +51,97 @@ func (e *MultiResultEncoder) Encode(w io.Writer, results query.ResultIterator) e v := b.Key().Value(j).(string) if c.Label == "_measurement" { r.Name = v - } else if c.Label == "_field" { - fieldName = v } else { + if r.Tags == nil { + r.Tags = make(map[string]string) + } r.Tags[c.Label] = v } } - for i, c := range b.Cols() { - if c.Label == "_time" { + for _, c := range b.Cols() { + if c.Label == "time" { r.Columns = append(r.Columns, "time") - } else if c.Label == "_value" && fieldName != "" { - r.Columns = append(r.Columns, fieldName) } else if !b.Key().HasCol(c.Label) { r.Columns = append(r.Columns, c.Label) - if r.Name == "" && c.Label == "_measurement" { - measurementVaries = i - } } } - if r.Name == "" && measurementVaries == -1 { - return fmt.Errorf("no Measurement name found in result blocks for result: %s", name) - } - timeIdx := execute.ColIdx(execute.DefaultTimeColLabel, b.Cols()) - if timeIdx < 0 { - return errors.New("table must have an _time column") - } - if typ := b.Cols()[timeIdx].Type; typ != query.TTime { - return fmt.Errorf("column _time must be of type Time got %v", typ) - } - err := b.Do(func(cr query.ColReader) error { - ts := cr.Times(timeIdx) - for i := range ts { - var v []interface{} - - for j, c := range cr.Cols() { - if cr.Key().HasCol(c.Label) { - continue - } - - if j == measurementVaries { - if c.Type != query.TString { - return errors.New("unexpected type, _measurement is not a string") - } - r.Name = cr.Strings(j)[i] - continue - } + if err := b.Do(func(cr query.ColReader) error { + var values [][]interface{} + j := 0 + for idx, c := range b.Cols() { + if cr.Key().HasCol(c.Label) { + continue + } + // Use the first column, usually time, to pre-generate all of the value containers. + if j == 0 { switch c.Type { - case query.TFloat: - v = append(v, cr.Floats(j)[i]) - case query.TInt: - v = append(v, cr.Ints(j)[i]) - case query.TString: - v = append(v, cr.Strings(j)[i]) - case query.TUInt: - v = append(v, cr.UInts(j)[i]) - case query.TBool: - v = append(v, cr.Bools(j)[i]) case query.TTime: - v = append(v, cr.Times(j)[i].Time().Format(time.RFC3339)) + values = make([][]interface{}, len(cr.Times(0))) default: - v = append(v, "unknown") + // TODO(jsternberg): Support using other columns. This will + // mostly be necessary for meta queries. + return errors.New("first column must be time") + } + + for j := range values { + values[j] = make([]interface{}, len(r.Columns)) } } - r.Values = append(r.Values, v) + // Fill in the values for each column. + switch c.Type { + case query.TFloat: + for i, v := range cr.Floats(idx) { + values[i][j] = v + } + case query.TInt: + for i, v := range cr.Ints(idx) { + values[i][j] = v + } + case query.TString: + for i, v := range cr.Strings(idx) { + values[i][j] = v + } + case query.TUInt: + for i, v := range cr.UInts(idx) { + values[i][j] = v + } + case query.TBool: + for i, v := range cr.Bools(idx) { + values[i][j] = v + } + case query.TTime: + for i, v := range cr.Times(idx) { + values[i][j] = v.Time().Format(time.RFC3339) + } + default: + return fmt.Errorf("unsupported column type: %s", c.Type) + } + j++ } + r.Values = append(r.Values, values...) return nil - }) - if err != nil { + }); err != nil { return err } - result.Series = append(result.Series, r) + result.Series = append(result.Series, &r) return nil - }) - if err != nil { - return fmt.Errorf("error iterating through results: %s", err) + }); err != nil { + resp.error(err) + results.Cancel() + break } resp.Results = append(resp.Results, result) } + if err := results.Err(); err != nil { + resp.error(err) + } + return json.NewEncoder(w).Encode(resp) } func NewMultiResultEncoder() *MultiResultEncoder { diff --git a/influxql/result_test.go b/influxql/result_test.go new file mode 100644 index 0000000000..569a54a7c5 --- /dev/null +++ b/influxql/result_test.go @@ -0,0 +1,83 @@ +package influxql_test + +import ( + "bytes" + "errors" + "strings" + "testing" + + "time" + + "github.com/influxdata/platform/query" + "github.com/influxdata/platform/query/execute" + "github.com/influxdata/platform/query/execute/executetest" + "github.com/influxdata/platform/query/influxql" +) + +func TestMultiResultEncoder_Encode(t *testing.T) { + for _, tt := range []struct { + name string + in query.ResultIterator + out string + }{ + { + name: "Default", + in: query.NewMapResultIterator( + map[string]query.Result{ + "0": &executetest.Result{ + Blks: []*executetest.Block{{ + KeyCols: []string{"_measurement", "host"}, + ColMeta: []query.ColMeta{ + {Label: "time", Type: query.TTime}, + {Label: "_measurement", Type: query.TString}, + {Label: "host", Type: query.TString}, + {Label: "value", Type: query.TFloat}, + }, + Data: [][]interface{}{ + {mustParseTime("2018-05-24T09:00:00Z"), "m0", "server01", float64(2)}, + }, + }}, + }, + }, + ), + out: `{"results":[{"statement_id":0,"series":[{"name":"m0","tags":{"host":"server01"},"columns":["time","value"],"values":[["2018-05-24T09:00:00Z",2]]}]}]}`, + }, + { + name: "Error", + in: &resultErrorIterator{Error: "expected"}, + out: `{"error":"expected"}`, + }, + } { + t.Run(tt.name, func(t *testing.T) { + var buf bytes.Buffer + enc := influxql.NewMultiResultEncoder() + if err := enc.Encode(&buf, tt.in); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + if got, exp := strings.TrimSpace(buf.String()), tt.out; got != exp { + t.Fatalf("unexpected output:\nexp=%s\ngot=%s", exp, got) + } + }) + } +} + +type resultErrorIterator struct { + Error string +} + +func (*resultErrorIterator) Cancel() {} +func (*resultErrorIterator) More() bool { return false } +func (*resultErrorIterator) Next() (string, query.Result) { return "", nil } + +func (ri *resultErrorIterator) Err() error { + return errors.New(ri.Error) +} + +func mustParseTime(s string) execute.Time { + t, err := time.Parse(time.RFC3339, s) + if err != nil { + panic(err) + } + return execute.Time(t.UnixNano()) +} diff --git a/influxql/row.go b/influxql/row.go deleted file mode 100644 index 0b5ed38b0a..0000000000 --- a/influxql/row.go +++ /dev/null @@ -1,18 +0,0 @@ -package influxql - -// Row represents a single row returned from the execution of a statement. -type Row struct { - Name string `json:"name,omitempty"` - Tags map[string]string `json:"tags,omitempty"` - Columns []string `json:"columns,omitempty"` - Values [][]interface{} `json:"values,omitempty"` - Partial bool `json:"partial,omitempty"` -} - -type Rows []*Row - -func NewRow() *Row { - return &Row{ - Tags: make(map[string]string), - } -}