From 06cf1fb1602f1d7636056fc5dd47de577fd275a9 Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Thu, 24 May 2018 17:14:16 -0500 Subject: [PATCH] feat(http): perform error handling in the transpiler and the query service The query service now handles some errors (not all) from the ifqld service. This way, the transpiler, which uses the query service, can report when it encounters an error from ifqld. The transpiler will now correctly return errors in the JSON response. --- csv/result.go | 3 + execute/executetest/result.go | 4 +- influxql/influxql_encoder_test.go | 72 --------------- influxql/response.go | 76 ++++------------ influxql/result.go | 146 ++++++++++++++++-------------- influxql/result_test.go | 83 +++++++++++++++++ influxql/row.go | 18 ---- 7 files changed, 186 insertions(+), 216 deletions(-) delete mode 100644 influxql/influxql_encoder_test.go create mode 100644 influxql/result_test.go delete mode 100644 influxql/row.go diff --git a/csv/result.go b/csv/result.go index 14ee23b173..18cc3b5752 100644 --- a/csv/result.go +++ b/csv/result.go @@ -80,6 +80,9 @@ type MultiResultDecoder struct { // NewMultiResultDecoder creates a new MultiResultDecoder. func NewMultiResultDecoder(c ResultDecoderConfig) *MultiResultDecoder { + if c.MaxBufferCount == 0 { + c.MaxBufferCount = defaultMaxBufferCount + } return &MultiResultDecoder{ c: c, } diff --git a/execute/executetest/result.go b/execute/executetest/result.go index 980de34e42..d8fc536fd6 100644 --- a/execute/executetest/result.go +++ b/execute/executetest/result.go @@ -1,6 +1,8 @@ package executetest -import "github.com/influxdata/platform/query" +import ( + "github.com/influxdata/platform/query" +) type Result struct { Blks []*Block diff --git a/influxql/influxql_encoder_test.go b/influxql/influxql_encoder_test.go deleted file mode 100644 index 4e4ba5809f..0000000000 --- a/influxql/influxql_encoder_test.go +++ /dev/null @@ -1,72 +0,0 @@ -package influxql - -import ( - "testing" - "time" - - "bytes" - "strings" - - "github.com/google/go-cmp/cmp" - "github.com/influxdata/platform/query" - "github.com/influxdata/platform/query/execute" - "github.com/influxdata/platform/query/execute/executetest" -) - -var epoch = time.Unix(0, 0) - -func TestMultiResultEncoder_Encode(t *testing.T) { - testCases := []struct { - name string - query string - blocks map[string][]*executetest.Block - output string - }{ - { - query: "", - name: "one result one row", - blocks: map[string][]*executetest.Block{ - "0": {{ - KeyCols: []string{"_measurement", "_field", "host"}, - ColMeta: []query.ColMeta{ - {Label: "_time", Type: query.TTime}, - {Label: "_measurement", Type: query.TString}, - {Label: "_field", Type: query.TString}, - {Label: "host", Type: query.TString}, - {Label: execute.DefaultValueColLabel, Type: query.TFloat}, - }, - Data: [][]interface{}{ - {execute.Time(5), "cpu", "max", "localhost", 98.9}, - }, - }}, - }, - output: `{"results":[{"statement_id":0,"series":[{"name":"cpu","tags":{"host":"localhost"},"columns":["time","max"],"values":[["1970-01-01T00:00:00Z",98.9]]}]}]}`, - }, - } - - for _, tc := range testCases { - tc := tc - t.Run(tc.name, func(t *testing.T) { - - resultsmap := map[string]query.Result{} - - for k, v := range tc.blocks { - resultsmap[k] = executetest.NewResult(v) - } - - results := query.NewMapResultIterator(resultsmap) - - var resp bytes.Buffer - var influxQLEncoder MultiResultEncoder - err := influxQLEncoder.Encode(&resp, results) - - if err != nil { - t.Error("error writing to buffer: ", err) - } - got := 
strings.TrimSpace(resp.String()) - if !cmp.Equal(got, tc.output) { - t.Error("unexpected results -want/+got", cmp.Diff(tc.output, got)) - } - }) - } -} diff --git a/influxql/response.go b/influxql/response.go index 297525442b..5c45e582f5 100644 --- a/influxql/response.go +++ b/influxql/response.go @@ -1,10 +1,5 @@ package influxql -import ( - "encoding/json" - "errors" -) - // all of this code is copied more or less verbatim from the influxdb repo. // we copy instead of sharing because we want to prevent inadvertent breaking // changes introduced by the transpiler vs the actual InfluxQL engine. @@ -12,8 +7,13 @@ import ( // results generated by the transpiler diverge from InfluxQL. type Response struct { - Results []Result `json:"results"` - Err string `json:"err,omitempty"` + Results []Result `json:"results,omitempty"` + Err string `json:"error,omitempty"` +} + +func (r *Response) error(err error) { + r.Results = nil + r.Err = err.Error() } // Message represents a user-facing message to be included with the result. @@ -27,56 +27,18 @@ type Message struct { type Result struct { // StatementID is just the statement's position in the query. It's used // to combine statement results if they're being buffered in memory. - StatementID int - Series Rows - Messages []*Message - Partial bool - Err error -} - -// MarshalJSON encodes the result into JSON. -func (r *Result) MarshalJSON() ([]byte, error) { - // Define a struct that outputs "error" as a string. - var o struct { - StatementID int `json:"statement_id"` - Series []*Row `json:"series,omitempty"` - Messages []*Message `json:"messages,omitempty"` - Partial bool `json:"partial,omitempty"` - Err string `json:"error,omitempty"` - } - - // Copy fields to output struct. - o.StatementID = r.StatementID - o.Series = r.Series - o.Messages = r.Messages - o.Partial = r.Partial - if r.Err != nil { - o.Err = r.Err.Error() - } - - return json.Marshal(&o) + StatementID int `json:"statement_id"` + Series []*Row `json:"series,omitempty"` + Messages []*Message `json:"messages,omitempty"` + Partial bool `json:"partial,omitempty"` + Err string `json:"error,omitempty"` } -// UnmarshalJSON decodes the data into the Result struct -func (r *Result) UnmarshalJSON(b []byte) error { - var o struct { - StatementID int `json:"statement_id"` - Series []*Row `json:"series,omitempty"` - Messages []*Message `json:"messages,omitempty"` - Partial bool `json:"partial,omitempty"` - Err string `json:"error,omitempty"` - } - - err := json.Unmarshal(b, &o) - if err != nil { - return err - } - r.StatementID = o.StatementID - r.Series = o.Series - r.Messages = o.Messages - r.Partial = o.Partial - if o.Err != "" { - r.Err = errors.New(o.Err) - } - return nil +// Row represents a single row returned from the execution of a statement. +type Row struct { + Name string `json:"name,omitempty"` + Tags map[string]string `json:"tags,omitempty"` + Columns []string `json:"columns,omitempty"` + Values [][]interface{} `json:"values,omitempty"` + Partial bool `json:"partial,omitempty"` } diff --git a/influxql/result.go b/influxql/result.go index 1c871532df..9910907dc3 100644 --- a/influxql/result.go +++ b/influxql/result.go @@ -9,7 +9,6 @@ import ( "time" "github.com/influxdata/platform/query" - "github.com/influxdata/platform/query/execute" ) // MultiResultEncoder encodes results as InfluxQL JSON format. @@ -17,13 +16,16 @@ type MultiResultEncoder struct{} // Encode writes a collection of results to the influxdb 1.X http response format. // Expectations/Assumptions: -// 1. 
Each result will be published as a 'statement' in the top-level list of results. The 'staementID' -// will be interpreted as an integer, and will return an error otherwise. -// 2. If the _field name is present in the tags, and a _value column is present, the _value column will -// be renamed to the value of the _field tag -// 3. If the _measurement name is present in the tags, it will be used as the row.Name for all rows. -// Otherwise, we'll use the column value, which _must_ be present in that case. - +// 1. Each result will be published as a 'statement' in the top-level list of results. The result name +// will be interpreted as an integer and used as the statement id. +// 2. If the _measurement name is present in the partition key, it will be used as the result name instead +// of as a normal tag. +// 3. All columns in the partition key must be strings and they will be used as tags. There is no current way +// to have a tag and field be the same name in the results. +// TODO(jsternberg): For full compatibility, the above must be possible. +// 4. All other columns are fields and will be output in the order they are found. +// TODO(jsternberg): This function currently requires the first column to be a time field, but this isn't +// a strict requirement and will be lifted when we begin to work on transpiling meta queries. func (e *MultiResultEncoder) Encode(w io.Writer, results query.ResultIterator) error { resp := Response{} @@ -31,17 +33,17 @@ func (e *MultiResultEncoder) Encode(w io.Writer, results query.ResultIterator) e name, r := results.Next() id, err := strconv.Atoi(name) if err != nil { - return fmt.Errorf("unable to parse statement id from result name: %s", err) + resp.error(fmt.Errorf("unable to parse statement id from result name: %s", err)) + results.Cancel() + break } blocks := r.Blocks() result := Result{StatementID: id} - err = blocks.Do(func(b query.Block) error { - r := NewRow() + if err := blocks.Do(func(b query.Block) error { + var r Row - fieldName := "" - measurementVaries := -1 for j, c := range b.Key().Cols() { if c.Type != query.TString { return fmt.Errorf("partition column %q is not a string type", c.Label) @@ -49,89 +51,97 @@ func (e *MultiResultEncoder) Encode(w io.Writer, results query.ResultIterator) e v := b.Key().Value(j).(string) if c.Label == "_measurement" { r.Name = v - } else if c.Label == "_field" { - fieldName = v } else { + if r.Tags == nil { + r.Tags = make(map[string]string) + } r.Tags[c.Label] = v } } - for i, c := range b.Cols() { - if c.Label == "_time" { + for _, c := range b.Cols() { + if c.Label == "time" { r.Columns = append(r.Columns, "time") - } else if c.Label == "_value" && fieldName != "" { - r.Columns = append(r.Columns, fieldName) } else if !b.Key().HasCol(c.Label) { r.Columns = append(r.Columns, c.Label) - if r.Name == "" && c.Label == "_measurement" { - measurementVaries = i - } } } - if r.Name == "" && measurementVaries == -1 { - return fmt.Errorf("no Measurement name found in result blocks for result: %s", name) - } - timeIdx := execute.ColIdx(execute.DefaultTimeColLabel, b.Cols()) - if timeIdx < 0 { - return errors.New("table must have an _time column") - } - if typ := b.Cols()[timeIdx].Type; typ != query.TTime { - return fmt.Errorf("column _time must be of type Time got %v", typ) - } - err := b.Do(func(cr query.ColReader) error { - ts := cr.Times(timeIdx) - for i := range ts { - var v []interface{} - - for j, c := range cr.Cols() { - if cr.Key().HasCol(c.Label) { - continue - } - - if j == measurementVaries { - if c.Type != 
query.TString { - return errors.New("unexpected type, _measurement is not a string") - } - r.Name = cr.Strings(j)[i] - continue - } + if err := b.Do(func(cr query.ColReader) error { + var values [][]interface{} + j := 0 + for idx, c := range b.Cols() { + if cr.Key().HasCol(c.Label) { + continue + } + // Use the first column, usually time, to pre-generate all of the value containers. + if j == 0 { switch c.Type { - case query.TFloat: - v = append(v, cr.Floats(j)[i]) - case query.TInt: - v = append(v, cr.Ints(j)[i]) - case query.TString: - v = append(v, cr.Strings(j)[i]) - case query.TUInt: - v = append(v, cr.UInts(j)[i]) - case query.TBool: - v = append(v, cr.Bools(j)[i]) case query.TTime: - v = append(v, cr.Times(j)[i].Time().Format(time.RFC3339)) + values = make([][]interface{}, len(cr.Times(0))) default: - v = append(v, "unknown") + // TODO(jsternberg): Support using other columns. This will + // mostly be necessary for meta queries. + return errors.New("first column must be time") + } + + for j := range values { + values[j] = make([]interface{}, len(r.Columns)) } } - r.Values = append(r.Values, v) + // Fill in the values for each column. + switch c.Type { + case query.TFloat: + for i, v := range cr.Floats(idx) { + values[i][j] = v + } + case query.TInt: + for i, v := range cr.Ints(idx) { + values[i][j] = v + } + case query.TString: + for i, v := range cr.Strings(idx) { + values[i][j] = v + } + case query.TUInt: + for i, v := range cr.UInts(idx) { + values[i][j] = v + } + case query.TBool: + for i, v := range cr.Bools(idx) { + values[i][j] = v + } + case query.TTime: + for i, v := range cr.Times(idx) { + values[i][j] = v.Time().Format(time.RFC3339) + } + default: + return fmt.Errorf("unsupported column type: %s", c.Type) + } + j++ } + r.Values = append(r.Values, values...) 
return nil - }) - if err != nil { + }); err != nil { return err } - result.Series = append(result.Series, r) + result.Series = append(result.Series, &r) return nil - }) - if err != nil { - return fmt.Errorf("error iterating through results: %s", err) + }); err != nil { + resp.error(err) + results.Cancel() + break } resp.Results = append(resp.Results, result) } + if err := results.Err(); err != nil { + resp.error(err) + } + return json.NewEncoder(w).Encode(resp) } func NewMultiResultEncoder() *MultiResultEncoder { diff --git a/influxql/result_test.go b/influxql/result_test.go new file mode 100644 index 0000000000..569a54a7c5 --- /dev/null +++ b/influxql/result_test.go @@ -0,0 +1,83 @@ +package influxql_test + +import ( + "bytes" + "errors" + "strings" + "testing" + + "time" + + "github.com/influxdata/platform/query" + "github.com/influxdata/platform/query/execute" + "github.com/influxdata/platform/query/execute/executetest" + "github.com/influxdata/platform/query/influxql" +) + +func TestMultiResultEncoder_Encode(t *testing.T) { + for _, tt := range []struct { + name string + in query.ResultIterator + out string + }{ + { + name: "Default", + in: query.NewMapResultIterator( + map[string]query.Result{ + "0": &executetest.Result{ + Blks: []*executetest.Block{{ + KeyCols: []string{"_measurement", "host"}, + ColMeta: []query.ColMeta{ + {Label: "time", Type: query.TTime}, + {Label: "_measurement", Type: query.TString}, + {Label: "host", Type: query.TString}, + {Label: "value", Type: query.TFloat}, + }, + Data: [][]interface{}{ + {mustParseTime("2018-05-24T09:00:00Z"), "m0", "server01", float64(2)}, + }, + }}, + }, + }, + ), + out: `{"results":[{"statement_id":0,"series":[{"name":"m0","tags":{"host":"server01"},"columns":["time","value"],"values":[["2018-05-24T09:00:00Z",2]]}]}]}`, + }, + { + name: "Error", + in: &resultErrorIterator{Error: "expected"}, + out: `{"error":"expected"}`, + }, + } { + t.Run(tt.name, func(t *testing.T) { + var buf bytes.Buffer + enc := influxql.NewMultiResultEncoder() + if err := enc.Encode(&buf, tt.in); err != nil { + t.Fatalf("unexpected error: %s", err) + } + + if got, exp := strings.TrimSpace(buf.String()), tt.out; got != exp { + t.Fatalf("unexpected output:\nexp=%s\ngot=%s", exp, got) + } + }) + } +} + +type resultErrorIterator struct { + Error string +} + +func (*resultErrorIterator) Cancel() {} +func (*resultErrorIterator) More() bool { return false } +func (*resultErrorIterator) Next() (string, query.Result) { return "", nil } + +func (ri *resultErrorIterator) Err() error { + return errors.New(ri.Error) +} + +func mustParseTime(s string) execute.Time { + t, err := time.Parse(time.RFC3339, s) + if err != nil { + panic(err) + } + return execute.Time(t.UnixNano()) +} diff --git a/influxql/row.go b/influxql/row.go deleted file mode 100644 index 0b5ed38b0a..0000000000 --- a/influxql/row.go +++ /dev/null @@ -1,18 +0,0 @@ -package influxql - -// Row represents a single row returned from the execution of a statement. -type Row struct { - Name string `json:"name,omitempty"` - Tags map[string]string `json:"tags,omitempty"` - Columns []string `json:"columns,omitempty"` - Values [][]interface{} `json:"values,omitempty"` - Partial bool `json:"partial,omitempty"` -} - -type Rows []*Row - -func NewRow() *Row { - return &Row{ - Tags: make(map[string]string), - } -}
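
For illustration only, a minimal sketch (not part of the patch) of the two wire shapes the transpiler can now produce, built directly from the exported Response, Result, and Row types defined above in influxql/response.go. The import path is the one used by result_test.go; the sample measurement, tag, and values are assumptions chosen for the example.

	package main

	import (
		"encoding/json"
		"os"

		"github.com/influxdata/platform/query/influxql"
	)

	func main() {
		// Successful statement: mirrors the "Default" case in result_test.go.
		ok := influxql.Response{
			Results: []influxql.Result{{
				StatementID: 0,
				Series: []*influxql.Row{{
					Name:    "m0",
					Tags:    map[string]string{"host": "server01"},
					Columns: []string{"time", "value"},
					Values:  [][]interface{}{{"2018-05-24T09:00:00Z", 2.0}},
				}},
			}},
		}

		// Failed query: Results is dropped and only the error string is kept,
		// matching what Response.error(err) does inside MultiResultEncoder.Encode.
		bad := influxql.Response{Err: "expected"}

		enc := json.NewEncoder(os.Stdout)
		_ = enc.Encode(ok)  // {"results":[{"statement_id":0,"series":[...]}]}
		_ = enc.Encode(bad) // {"error":"expected"}
	}

Because Encode now routes statement-id parse failures, block iteration errors, and results.Err() through resp.error() instead of returning them to the caller, clients receive the second shape in the JSON response body rather than a failed or empty reply.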