-
Notifications
You must be signed in to change notification settings - Fork 155
/
query.go
275 lines (230 loc) · 8.06 KB
/
query.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
// Package query contains the InfluxDB 2.0 query engine.
package flux
import (
"context"
"encoding/json"
"fmt"
"io"
"time"
"github.com/influxdata/platform"
)
// QueryService represents a type capable of performing queries.
type QueryService interface {
// Query submits a query for execution returning a results iterator.
// Cancel must be called on any returned results to free resources.
Query(ctx context.Context, req *Request) (ResultIterator, error)
}
// AsyncQueryService represents a service for performing queries where the results are delivered asynchronously.
type AsyncQueryService interface {
// Query submits a query for execution returning immediately.
// Done must be called on any returned Query objects.
Query(ctx context.Context, req *Request) (Query, error)
}
// ProxyQueryService performs queries and encodes the result into a writer.
// The results are opaque to a ProxyQueryService.
type ProxyQueryService interface {
// Query performs the requested query and encodes the results into w.
// The number of bytes written to w is returned __independent__ of any error.
Query(ctx context.Context, w io.Writer, req *ProxyRequest) (int64, error)
}
// ResultIterator allows iterating through all results
// Cancel must be called to free resources.
// ResultIterators may implement Statisticser.
type ResultIterator interface {
// More indicates if there are more results.
More() bool
// Next returns the next result.
// If More is false, Next panics.
Next() Result
// Cancel discards the remaining results.
// Cancel must always be called to free resources.
// It is safe to call Cancel multiple times.
Cancel()
// Err reports the first error encountered.
// Err will not report anything unless More has returned false,
// or the query has been cancelled.
Err() error
}
// Query represents an active query.
type Query interface {
// Spec returns the spec used to execute this query.
// Spec must not be modified.
Spec() *Spec
// Ready returns a channel that will deliver the query results.
// Its possible that the channel is closed before any results arrive,
// in which case the query should be inspected for an error using Err().
Ready() <-chan map[string]Result
// Done must always be called to free resources.
Done()
// Cancel will stop the query execution.
// Done must still be called to free resources.
// It is safe to call Cancel multiple times.
Cancel()
// Err reports any error the query may have encountered.
Err() error
Statisticser
}
// Request respresents the query to run.
type Request struct {
// Scope
Authorization *platform.Authorization `json:"authorization,omitempty"`
OrganizationID platform.ID `json:"organization_id"`
// Command
// Compiler converts the query to a specification to run against the data.
Compiler Compiler `json:"compiler"`
// compilerMappings maps compiler types to creation methods
compilerMappings CompilerMappings
}
// WithCompilerMappings sets the query type mappings on the request.
func (r *Request) WithCompilerMappings(mappings CompilerMappings) {
r.compilerMappings = mappings
}
// UnmarshalJSON populates the request from the JSON data.
// WithCompilerMappings must have been called or an error will occur.
func (r *Request) UnmarshalJSON(data []byte) error {
type Alias Request
raw := struct {
*Alias
CompilerType CompilerType `json:"compiler_type"`
Compiler json.RawMessage `json:"compiler"`
}{
Alias: (*Alias)(r),
}
if err := json.Unmarshal(data, &raw); err != nil {
return err
}
createCompiler, ok := r.compilerMappings[raw.CompilerType]
if !ok {
return fmt.Errorf("unsupported compiler type %q", raw.CompilerType)
}
c := createCompiler()
if err := json.Unmarshal(raw.Compiler, c); err != nil {
return err
}
r.Compiler = c
return nil
}
func (r Request) MarshalJSON() ([]byte, error) {
type Alias Request
raw := struct {
Alias
CompilerType CompilerType `json:"compiler_type"`
}{
Alias: (Alias)(r),
CompilerType: r.Compiler.CompilerType(),
}
return json.Marshal(raw)
}
// Compiler produces a specification for the query.
type Compiler interface {
// Compile produces a specification for the query.
Compile(ctx context.Context) (*Spec, error)
CompilerType() CompilerType
}
// CompilerType is the name of a query compiler.
type CompilerType string
type CreateCompiler func() Compiler
type CompilerMappings map[CompilerType]CreateCompiler
func (m CompilerMappings) Add(t CompilerType, c CreateCompiler) error {
if _, ok := m[t]; ok {
return fmt.Errorf("duplicate compiler mapping for %q", t)
}
m[t] = c
return nil
}
// ProxyRequest specifies a query request and the dialect for the results.
type ProxyRequest struct {
// Request is the basic query request
Request Request `json:"request"`
// Dialect is the result encoder
Dialect Dialect `json:"dialect"`
// dialectMappings maps dialect types to creation methods
dialectMappings DialectMappings
}
// WithCompilerMappings sets the compiler type mappings on the request.
func (r *ProxyRequest) WithCompilerMappings(mappings CompilerMappings) {
r.Request.WithCompilerMappings(mappings)
}
// WithDialectMappings sets the dialect type mappings on the request.
func (r *ProxyRequest) WithDialectMappings(mappings DialectMappings) {
r.dialectMappings = mappings
}
// UnmarshalJSON populates the request from the JSON data.
// WithCompilerMappings and WithDialectMappings must have been called or an error will occur.
func (r *ProxyRequest) UnmarshalJSON(data []byte) error {
type Alias ProxyRequest
raw := struct {
*Alias
DialectType DialectType `json:"dialect_type"`
Dialect json.RawMessage `json:"dialect"`
}{
Alias: (*Alias)(r),
}
if err := json.Unmarshal(data, &raw); err != nil {
return err
}
createDialect, ok := r.dialectMappings[raw.DialectType]
if !ok {
return fmt.Errorf("unsupported dialect type %q", raw.DialectType)
}
d := createDialect()
if err := json.Unmarshal(raw.Dialect, d); err != nil {
return err
}
r.Dialect = d
return nil
}
func (r ProxyRequest) MarshalJSON() ([]byte, error) {
type Alias ProxyRequest
raw := struct {
Alias
DialectType DialectType `json:"dialect_type"`
}{
Alias: (Alias)(r),
DialectType: r.Dialect.DialectType(),
}
return json.Marshal(raw)
}
// Dialect describes how to encode results.
type Dialect interface {
// Encoder creates an encoder for the results
Encoder() MultiResultEncoder
// DialectType report the type of the dialect
DialectType() DialectType
}
// DialectType is the name of a query result dialect.
type DialectType string
type CreateDialect func() Dialect
type DialectMappings map[DialectType]CreateDialect
func (m DialectMappings) Add(t DialectType, c CreateDialect) error {
if _, ok := m[t]; ok {
return fmt.Errorf("duplicate dialect mapping for %q", t)
}
m[t] = c
return nil
}
// Statisticser reports statisitcs about query processing.
type Statisticser interface {
// Statistics reports the statisitcs for the query.
// The statisitcs are not complete until the query is finished.
Statistics() Statistics
}
// Statistics is a collection of statisitcs about the processing of a query.
type Statistics struct {
// TotalDuration is the total amount of time in nanoseconds spent.
TotalDuration time.Duration `json:"total_duration"`
// CompileDuration is the amount of time in nanoseconds spent compiling the query.
CompileDuration time.Duration `json:"compile_duration"`
// QueueDuration is the amount of time in nanoseconds spent queueing.
QueueDuration time.Duration `json:"queue_duration"`
// PlanDuration is the amount of time in nanoseconds spent in plannig the query.
PlanDuration time.Duration `json:"plan_duration"`
// RequeueDuration is the amount of time in nanoseconds spent requeueing.
RequeueDuration time.Duration `json:"requeue_duration"`
// ExecuteDuration is the amount of time in nanoseconds spent in executing the query.
ExecuteDuration time.Duration `json:"execute_duration"`
// Concurrency is the number of goroutines allocated to process the query
Concurrency int `json:"concurrency"`
// MaxAllocated is the maximum number of bytes the query allocated.
MaxAllocated int64 `json:"max_allocated"`
}