Skip to content

Commit

Permalink
Merge pull request #43 from influxdata/sgc-metaqueries
Browse files Browse the repository at this point in the history
fix(query): Utilize improvements storage RPC API
  • Loading branch information
stuartcarnie authored May 24, 2018
2 parents fb571cc + 50ab64b commit 86bc413
Show file tree
Hide file tree
Showing 13 changed files with 984 additions and 290 deletions.
2 changes: 1 addition & 1 deletion execute/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (f *Formatter) WriteTo(out io.Writer) (int64, error) {

// Write rows
r := 0
f.b.Do(func(cr query.ColReader) error {
w.err = f.b.Do(func(cr query.ColReader) error {
if r == 0 {
l := cr.Len()
for i := 0; i < l; i++ {
Expand Down
20 changes: 17 additions & 3 deletions functions/distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ func (r DistinctPointLimitRewriteRule) Rewrite(pr *plan.Procedure, planner plan.
}

groupStar := !fromSpec.GroupingSet && distinct.Column != execute.DefaultValueColLabel
groupByColumn := fromSpec.GroupingSet && ((len(fromSpec.GroupKeys) > 0 && execute.ContainsStr(fromSpec.GroupKeys, distinct.Column)) || (len(fromSpec.GroupExcept) > 0 && !execute.ContainsStr(fromSpec.GroupExcept, distinct.Column)))
groupByColumn := fromSpec.GroupingSet && len(fromSpec.GroupKeys) > 0 &&
((fromSpec.GroupMode == GroupModeBy && execute.ContainsStr(fromSpec.GroupKeys, distinct.Column)) ||
(fromSpec.GroupMode == GroupModeExcept && !execute.ContainsStr(fromSpec.GroupKeys, distinct.Column)))
if groupStar || groupByColumn {
fromSpec.LimitSet = true
fromSpec.PointsLimit = -1
Expand Down Expand Up @@ -150,8 +152,20 @@ func (t *distinctTransformation) Process(id execute.DatasetID, b query.Block) er

colIdx := execute.ColIdx(t.column, b.Cols())
if colIdx < 0 {
return fmt.Errorf("no column %q exists", t.column)
// doesn't exist in this block, so add an empty value
execute.AddBlockKeyCols(b.Key(), builder)
colIdx = builder.AddCol(query.ColMeta{
Label: execute.DefaultValueColLabel,
Type: query.TString,
})
builder.AppendString(colIdx, "")
execute.AppendKeyValues(b.Key(), builder)
// TODO: hack required to ensure data flows downstream
return b.Do(func(query.ColReader) error {
return nil
})
}

col := b.Cols()[colIdx]

execute.AddBlockKeyCols(b.Key(), builder)
Expand All @@ -178,7 +192,7 @@ func (t *distinctTransformation) Process(id execute.DatasetID, b query.Block) er
}

execute.AppendKeyValues(b.Key(), builder)
// TODO: this is a hack
// TODO: hack required to ensure data flows downstream
return b.Do(func(query.ColReader) error {
return nil
})
Expand Down
6 changes: 2 additions & 4 deletions functions/from.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,8 @@ type FromProcedureSpec struct {

GroupingSet bool
OrderByTime bool
MergeAll bool
GroupMode GroupMode
GroupKeys []string
GroupExcept []string

AggregateSet bool
AggregateMethod string
Expand Down Expand Up @@ -214,9 +213,8 @@ func createFromSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID, a execu
SeriesOffset: spec.SeriesOffset,
Descending: spec.Descending,
OrderByTime: spec.OrderByTime,
MergeAll: spec.MergeAll,
GroupMode: storage.GroupMode(spec.GroupMode),
GroupKeys: spec.GroupKeys,
GroupExcept: spec.GroupExcept,
AggregateMethod: spec.AggregateMethod,
},
bounds,
Expand Down
124 changes: 94 additions & 30 deletions functions/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,25 @@ import (
"github.com/influxdata/platform/query/interpreter"
"github.com/influxdata/platform/query/plan"
"github.com/influxdata/platform/query/semantic"
"math/bits"
)

const GroupKind = "group"

type GroupOpSpec struct {
By []string `json:"by"`
Except []string `json:"except"`
All bool `json:"all"`
None bool `json:"none"`
}

var groupSignature = query.DefaultFunctionSignature()

func init() {
groupSignature.Params["by"] = semantic.NewArrayType(semantic.String)
groupSignature.Params["except"] = semantic.NewArrayType(semantic.String)
groupSignature.Params["none"] = semantic.Bool
groupSignature.Params["all"] = semantic.Bool

query.RegisterFunction(GroupKind, createGroupOpSpec, groupSignature)
query.RegisterOpSpec(GroupKind, newGroupOp)
Expand All @@ -38,6 +43,18 @@ func createGroupOpSpec(args query.Arguments, a *query.Administration) (query.Ope
}

spec := new(GroupOpSpec)

if val, ok, err := args.GetBool("none"); err != nil {
return nil, err
} else if ok && val {
spec.None = true
}
if val, ok, err := args.GetBool("all"); err != nil {
return nil, err
} else if ok && val {
spec.All = true
}

if array, ok, err := args.GetArray("by", semantic.String); err != nil {
return nil, err
} else if ok {
Expand All @@ -55,9 +72,16 @@ func createGroupOpSpec(args query.Arguments, a *query.Administration) (query.Ope
}
}

if len(spec.By) > 0 && len(spec.Except) > 0 {
return nil, errors.New(`cannot specify both "by" and "except" keyword arguments`)
switch bits.OnesCount(uint(groupModeFromSpec(spec))) {
case 0:
// empty args
spec.All = true
case 1:
// all good
default:
return nil, errors.New(`specify one of "by", "except", "none" or "all" keyword arguments`)
}

return spec, nil
}

Expand All @@ -69,9 +93,45 @@ func (s *GroupOpSpec) Kind() query.OperationKind {
return GroupKind
}

type GroupMode int

const (
// GroupModeDefault will use the default grouping of GroupModeAll.
GroupModeDefault GroupMode = 0

// GroupModeNone merges all series into a single group.
GroupModeNone GroupMode = 1 << iota
// GroupModeAll produces a separate block for each series.
GroupModeAll
// GroupModeBy produces a block for each unique value of the specified GroupKeys.
GroupModeBy
// GroupModeExcept produces a block for the unique values of all keys, except those specified by GroupKeys.
GroupModeExcept
)

func groupModeFromSpec(spec *GroupOpSpec) GroupMode {
var mode GroupMode
if spec.All {
mode |= GroupModeAll
}
if spec.None {
mode |= GroupModeNone
}
if len(spec.By) > 0 {
mode |= GroupModeBy
}
if len(spec.Except) > 0 {
mode |= GroupModeExcept
}
if mode == GroupModeDefault {
mode = GroupModeAll
}
return mode
}

type GroupProcedureSpec struct {
By []string
Except []string
GroupMode GroupMode
GroupKeys []string
}

func newGroupProcedure(qs query.OperationSpec, pa plan.Administration) (plan.ProcedureSpec, error) {
Expand All @@ -80,9 +140,22 @@ func newGroupProcedure(qs query.OperationSpec, pa plan.Administration) (plan.Pro
return nil, fmt.Errorf("invalid spec type %T", qs)
}

mode := groupModeFromSpec(spec)
var keys []string
switch mode {
case GroupModeAll:
case GroupModeNone:
case GroupModeBy:
keys = spec.By
case GroupModeExcept:
keys = spec.Except
default:
return nil, fmt.Errorf("invalid GroupOpSpec; multiple modes detected")
}

p := &GroupProcedureSpec{
By: spec.By,
Except: spec.Except,
GroupMode: mode,
GroupKeys: keys,
}
return p, nil
}
Expand All @@ -93,11 +166,10 @@ func (s *GroupProcedureSpec) Kind() plan.ProcedureKind {
func (s *GroupProcedureSpec) Copy() plan.ProcedureSpec {
ns := new(GroupProcedureSpec)

ns.By = make([]string, len(s.By))
copy(ns.By, s.By)
ns.GroupMode = s.GroupMode

ns.Except = make([]string, len(s.Except))
copy(ns.Except, s.Except)
ns.GroupKeys = make([]string, len(s.GroupKeys))
copy(ns.GroupKeys, s.GroupKeys)

return ns
}
Expand All @@ -120,19 +192,16 @@ func (s *GroupProcedureSpec) PushDown(root *plan.Procedure, dup func() *plan.Pro
selectSpec = root.Spec.(*FromProcedureSpec)
selectSpec.OrderByTime = false
selectSpec.GroupingSet = false
selectSpec.MergeAll = false
selectSpec.GroupMode = GroupModeDefault
selectSpec.GroupKeys = nil
selectSpec.GroupExcept = nil
return
}
selectSpec.GroupingSet = true
// TODO implement OrderByTime
//selectSpec.OrderByTime = true

// Merge all series into a single group if we have no specific grouping dimensions.
selectSpec.MergeAll = len(s.By) == 0 && len(s.Except) == 0
selectSpec.GroupKeys = s.By
selectSpec.GroupExcept = s.Except
selectSpec.GroupMode = s.GroupMode
selectSpec.GroupKeys = s.GroupKeys
}

type AggregateGroupRewriteRule struct {
Expand Down Expand Up @@ -196,23 +265,18 @@ type groupTransformation struct {
d execute.Dataset
cache execute.BlockBuilderCache

keys []string
except []string

// Ignoring is true of len(keys) == 0 && len(except) > 0
ignoring bool
mode GroupMode
keys []string
}

func NewGroupTransformation(d execute.Dataset, cache execute.BlockBuilderCache, spec *GroupProcedureSpec) *groupTransformation {
t := &groupTransformation{
d: d,
cache: cache,
keys: spec.By,
except: spec.Except,
ignoring: len(spec.By) == 0 && len(spec.Except) > 0,
d: d,
cache: cache,
mode: spec.GroupMode,
keys: spec.GroupKeys,
}
sort.Strings(t.keys)
sort.Strings(t.except)
return t
}

Expand All @@ -233,14 +297,14 @@ func (t *groupTransformation) RetractBlock(id execute.DatasetID, key query.Parti
func (t *groupTransformation) Process(id execute.DatasetID, b query.Block) error {
cols := b.Cols()
on := make(map[string]bool, len(cols))
if len(t.keys) > 0 {
if t.mode == GroupModeBy && len(t.keys) > 0 {
for _, k := range t.keys {
on[k] = true
}
} else if len(t.except) > 0 {
} else if t.mode == GroupModeExcept && len(t.keys) > 0 {
COLS:
for _, c := range cols {
for _, label := range t.except {
for _, label := range t.keys {
if c.Label == label {
continue COLS
}
Expand Down
22 changes: 14 additions & 8 deletions functions/group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ func TestGroup_Process(t *testing.T) {
{
name: "fan in",
spec: &functions.GroupProcedureSpec{
By: []string{"t1"},
GroupMode: functions.GroupModeBy,
GroupKeys: []string{"t1"},
},
data: []query.Block{
&executetest.Block{
Expand Down Expand Up @@ -117,7 +118,8 @@ func TestGroup_Process(t *testing.T) {
{
name: "fan in ignoring",
spec: &functions.GroupProcedureSpec{
Except: []string{"_time", "_value", "t2"},
GroupMode: functions.GroupModeExcept,
GroupKeys: []string{"_time", "_value", "t2"},
},
data: []query.Block{
&executetest.Block{
Expand Down Expand Up @@ -207,7 +209,8 @@ func TestGroup_Process(t *testing.T) {
{
name: "fan out",
spec: &functions.GroupProcedureSpec{
By: []string{"t1"},
GroupMode: functions.GroupModeBy,
GroupKeys: []string{"t1"},
},
data: []query.Block{&executetest.Block{
ColMeta: []query.ColMeta{
Expand Down Expand Up @@ -248,7 +251,8 @@ func TestGroup_Process(t *testing.T) {
{
name: "fan out ignoring",
spec: &functions.GroupProcedureSpec{
Except: []string{"_time", "_value", "t2"},
GroupMode: functions.GroupModeExcept,
GroupKeys: []string{"_time", "_value", "t2"},
},
data: []query.Block{&executetest.Block{
ColMeta: []query.ColMeta{
Expand Down Expand Up @@ -310,15 +314,16 @@ func TestGroup_Process(t *testing.T) {

func TestGroup_PushDown(t *testing.T) {
spec := &functions.GroupProcedureSpec{
By: []string{"t1", "t2"},
GroupMode: functions.GroupModeBy,
GroupKeys: []string{"t1", "t2"},
}
root := &plan.Procedure{
Spec: new(functions.FromProcedureSpec),
}
want := &plan.Procedure{
Spec: &functions.FromProcedureSpec{
GroupingSet: true,
MergeAll: false,
GroupMode: functions.GroupModeBy,
GroupKeys: []string{"t1", "t2"},
},
}
Expand All @@ -327,12 +332,13 @@ func TestGroup_PushDown(t *testing.T) {
}
func TestGroup_PushDown_Duplicate(t *testing.T) {
spec := &functions.GroupProcedureSpec{
By: []string{"t1", "t2"},
GroupMode: functions.GroupModeBy,
GroupKeys: []string{"t1", "t2"},
}
root := &plan.Procedure{
Spec: &functions.FromProcedureSpec{
GroupingSet: true,
MergeAll: true,
GroupMode: functions.GroupModeAll,
},
}
want := &plan.Procedure{
Expand Down
Loading

0 comments on commit 86bc413

Please sign in to comment.