Skip to content

Commit

Permalink
Add scanned values & bytes to Statistics.
Browse files Browse the repository at this point in the history
  • Loading branch information
benbjohnson committed Nov 16, 2018
1 parent b3f9a5d commit a3f0297
Show file tree
Hide file tree
Showing 11 changed files with 112 additions and 9 deletions.
27 changes: 26 additions & 1 deletion csv/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,15 @@ type resultIterator struct {
err error

canceled bool
stats flux.Statistics
}

func (r *resultIterator) More() bool {
if r.next == nil || !r.next.eof {
var extraMeta *tableMetadata
if r.next != nil {
extraMeta = r.next.extraMeta
r.stats = r.stats.Add(r.next.Statistics())
}
r.next, r.err = newResultDecoder(r.cr, r.c, extraMeta)
if r.err == nil {
Expand All @@ -135,6 +137,10 @@ func (r *resultIterator) Next() flux.Result {
return r.next
}

func (r *resultIterator) Statistics() flux.Statistics {
return r.stats
}

func (r *resultIterator) Release() {
if r.canceled {
return
Expand All @@ -159,6 +165,8 @@ type resultDecoder struct {
extraMeta *tableMetadata

eof bool

stats flux.Statistics
}

func newResultDecoder(cr *csv.Reader, c ResultDecoderConfig, extraMeta *tableMetadata) (*resultDecoder, error) {
Expand Down Expand Up @@ -198,6 +206,10 @@ func (r *resultDecoder) Tables() flux.TableIterator {
return r
}

func (r *resultDecoder) Statistics() flux.Statistics {
return r.stats
}

func (r *resultDecoder) Abort(error) {
panic("not implemented")
}
Expand Down Expand Up @@ -246,6 +258,7 @@ func (r *resultDecoder) Do(f func(flux.Table) error) error {
if len(extraLine) > 0 {
newMeta = extraLine[annotationIdx] != ""
}
r.stats.Add(b.Statistics())
}
return nil
}
Expand Down Expand Up @@ -415,6 +428,8 @@ type tableDecoder struct {

eof bool
extraLine []string

stats flux.Statistics
}

func newTable(
Expand Down Expand Up @@ -465,15 +480,21 @@ func (d *tableDecoder) Do(f func(flux.ColReader) error) (err error) {
if err != nil {
return
}
err = f(d.builder.RawTable())
rawTable := d.builder.RawTable()
err = f(rawTable)
if err != nil {
return
}
d.stats = d.stats.Add(rawTable.Statistics())
d.builder.ClearData()
}
return
}

func (d *tableDecoder) Statistics() flux.Statistics {
return d.stats
}

// advance reads the csv data until the end of the table or bufSize rows have been read.
// Advance returns whether there is more data and any error.
func (d *tableDecoder) advance(extraLine []string) (bool, error) {
Expand Down Expand Up @@ -611,6 +632,8 @@ func (d *tableDecoder) Cols() []flux.ColMeta {
return d.builder.Cols()
}

// func (d *tableDecoder) Stats() flux.Statistics { return flux.Statistics{} }

type colMeta struct {
flux.ColMeta
fmt string
Expand Down Expand Up @@ -778,7 +801,9 @@ func (e *ResultEncoder) Encode(w io.Writer, result flux.Result) (int64, error) {
}
}

fmt.Printf("dbg/tbl\n")
err := tbl.Do(func(cr flux.ColReader) error {
fmt.Printf("dbg/tbl.col %d\n", recordStartIdx)
record := row[recordStartIdx:]
l := cr.Len()
for i := 0; i < l; i++ {
Expand Down
4 changes: 4 additions & 0 deletions csv/result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -996,3 +996,7 @@ func (r errorResultIterator) Release() {
func (r errorResultIterator) Err() error {
return r.Error
}

func (r errorResultIterator) Statistics() flux.Statistics {
return flux.Statistics{}
}
11 changes: 8 additions & 3 deletions execute/executetest/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ import (
)

type Result struct {
Nm string
Tbls []*Table
Err error
Nm string
Tbls []*Table
Err error
Stats flux.Statistics
}

func NewResult(tables []*Table) *Result {
Expand All @@ -29,6 +30,10 @@ func (r *Result) Normalize() {
NormalizeTables(r.Tbls)
}

func (r *Result) Statistics() flux.Statistics {
return r.Stats
}

type TableIterator struct {
tables []*Table
err error
Expand Down
9 changes: 9 additions & 0 deletions execute/executetest/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package executetest
import (
"context"

"github.com/influxdata/flux"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/plan"
uuid "github.com/satori/go.uuid"
Expand Down Expand Up @@ -60,6 +61,14 @@ func (src *FromProcedureSpec) Run(ctx context.Context) {
}
}

func (src *FromProcedureSpec) Statistics() flux.Statistics {
var stats flux.Statistics
for _, tbl := range src.data {
stats = stats.Add(tbl.Statistics())
}
return stats
}

func CreateFromSource(spec plan.ProcedureSpec, id execute.DatasetID, a execute.Administration) (execute.Source, error) {
return spec.(*FromProcedureSpec), nil
}
2 changes: 2 additions & 0 deletions execute/executetest/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ func (t *Table) Do(f func(flux.ColReader) error) error {
return nil
}

func (t *Table) Statistics() flux.Statistics { return flux.Statistics{} }

type ColReader struct {
key flux.GroupKey
cols []flux.ColMeta
Expand Down
5 changes: 5 additions & 0 deletions execute/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ type result struct {

abortErr chan error
aborted chan struct{}

stats flux.Statistics
}

type resultMessage struct {
Expand Down Expand Up @@ -51,6 +53,8 @@ func (s *result) Process(id DatasetID, tbl flux.Table) error {
return nil
}

func (s *result) Statistics() flux.Statistics { return s.stats }

func (s *result) Tables() flux.TableIterator {
return s
}
Expand All @@ -70,6 +74,7 @@ func (s *result) Do(f func(flux.Table) error) error {
if err := f(msg.table); err != nil {
return err
}
s.stats = s.stats.Add(msg.table.Statistics())
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions execute/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -1007,6 +1007,9 @@ func (t *ColListTable) Empty() bool {
func (t *ColListTable) NRows() int {
return t.nrows
}
func (t *ColListTable) Statistics() flux.Statistics {
return flux.Statistics{}
}

func (t *ColListTable) Len() int {
return t.nrows
Expand Down
4 changes: 4 additions & 0 deletions influxql/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ func (ri *resultIterator) Err() error {
return nil
}

func (ri *resultIterator) Statistics() flux.Statistics { return flux.Statistics{} }

type result struct {
res *Result
a *memory.Allocator
Expand Down Expand Up @@ -194,3 +196,5 @@ func (r *result) Do(f func(tbl flux.Table) error) error {
}
return nil
}

func (ri *result) Statistics() flux.Statistics { return flux.Statistics{} }
21 changes: 21 additions & 0 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,25 @@ type Statistics struct {
Concurrency int `json:"concurrency"`
// MaxAllocated is the maximum number of bytes the query allocated.
MaxAllocated int64 `json:"max_allocated"`

// ScannedValues is the number of values scanned.
ScannedValues int `json:"scanned_values"`
// ScannedBytes number of uncompressed bytes scanned.
ScannedBytes int `json:"scanned_bytes"`
}

// Add returns the sum of s and other.
func (s Statistics) Add(other Statistics) Statistics {
return Statistics{
TotalDuration: s.TotalDuration + other.TotalDuration,
CompileDuration: s.CompileDuration + other.CompileDuration,
QueueDuration: s.QueueDuration + other.QueueDuration,
PlanDuration: s.PlanDuration + other.PlanDuration,
RequeueDuration: s.RequeueDuration + other.RequeueDuration,
ExecuteDuration: s.ExecuteDuration + other.ExecuteDuration,
Concurrency: s.Concurrency + other.Concurrency,
MaxAllocated: s.MaxAllocated + other.MaxAllocated,
ScannedValues: s.ScannedValues + other.ScannedValues,
ScannedBytes: s.ScannedBytes + other.ScannedBytes,
}
}
5 changes: 5 additions & 0 deletions result.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ type Result interface {
Name() string
// Tables returns a TableIterator for iterating through results
Tables() TableIterator
// Statistics returns statistics collected the processing of the result.
Statistics() Statistics
}

type TableIterator interface {
Expand All @@ -34,6 +36,9 @@ type Table interface {

// Empty returns whether the table contains no records.
Empty() bool

// Stats returns collected statistics about this table during processing.
Statistics() Statistics
}

// ColMeta contains the information about the column metadata.
Expand Down
30 changes: 25 additions & 5 deletions result_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ type ResultIterator interface {
// Err will not report anything unless More has returned false,
// or the query has been cancelled.
Err() error

// Statistics returns any statistics computed by the resultset.
Statistics() Statistics
}

// queryResultIterator implements a ResultIterator while consuming a Query
Expand Down Expand Up @@ -71,7 +74,7 @@ func (r *queryResultIterator) Err() error {
}

func (r *queryResultIterator) Statistics() Statistics {
return r.query.Statistics()
return r.query.Statistics().Add(r.results.Statistics())
}

type mapResultIterator struct {
Expand Down Expand Up @@ -110,7 +113,16 @@ func (r *mapResultIterator) Err() error {
return nil
}

func (r *mapResultIterator) Statistics() Statistics {
var stats Statistics
for _, result := range r.results {
stats = stats.Add(result.Statistics())
}
return stats
}

type sliceResultIterator struct {
i int
results []Result
}

Expand All @@ -121,19 +133,27 @@ func NewSliceResultIterator(results []Result) ResultIterator {
}

func (r *sliceResultIterator) More() bool {
return len(r.results) > 0
return r.i < len(r.results)
}

func (r *sliceResultIterator) Next() Result {
next := r.results[0]
r.results = r.results[1:]
next := r.results[r.i]
r.i++
return next
}

func (r *sliceResultIterator) Release() {
r.results = nil
r.results, r.i = nil, 0
}

func (r *sliceResultIterator) Err() error {
return nil
}

func (r *sliceResultIterator) Statistics() Statistics {
var stats Statistics
for _, result := range r.results {
stats = stats.Add(result.Statistics())
}
return stats
}

0 comments on commit a3f0297

Please sign in to comment.