-
Notifications
You must be signed in to change notification settings - Fork 1
/
group_by.go
127 lines (117 loc) · 2.7 KB
/
group_by.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package tuna
import (
"sort"
)
// GroupBy partitions rows by the value of a single field and keeps an
// independent Agg for every distinct value it has seen. Construct it with
// NewGroupBy, which initialises the internal group map and turns on sorted
// output.
type GroupBy struct {
	// By names the Row field whose value selects the group.
	By string
	// NewAgg builds a fresh Agg the first time a group key is seen.
	NewAgg func() Agg
	// SortResults makes Collect emit groups in ascending key order.
	SortResults bool
	// groups maps each group key to the Agg accumulating that group.
	groups map[string]Agg
}
// Update routes row to the Agg of its group, creating that Agg with NewAgg
// the first time the group key is seen.
//
// The group key is row[gb.By]. If the row has no such field, Update returns
// ErrUnknownField and records nothing. Any error from the group's Agg is
// returned unchanged.
func (gb *GroupBy) Update(row Row) error {
	key, ok := row[gb.By]
	if !ok {
		return ErrUnknownField{gb.By}
	}
	if gb.groups == nil {
		// Writing to a nil map panics. The fields are exported, so a GroupBy
		// may legitimately be built without NewGroupBy.
		gb.groups = make(map[string]Agg)
	}
	agg, ok := gb.groups[key]
	if !ok {
		agg = gb.NewAgg()
		gb.groups[key] = agg
	}
	return agg.Update(row)
}
// Collect streams the output of every group's Agg on the returned channel. It
// sets field gb.By of each emitted row to the group's key and closes the
// channel after the last group has been drained.
//
// When SortResults is true, groups are emitted in ascending byte-wise
// (lexical) order of their keys. Otherwise the order follows Go's unspecified
// map iteration order.
//
// The groups and their Aggs are captured before Collect returns. Groups added
// by a later Update are therefore not emitted, and the background goroutine
// never reads the group map concurrently with such an Update. The goroutine
// blocks on each send, so callers should drain the channel.
func (gb GroupBy) Collect() <-chan Row {
	type group struct {
		key string
		agg Agg
	}
	groups := make([]group, 0, len(gb.groups))
	for k, agg := range gb.groups {
		groups = append(groups, group{key: k, agg: agg})
	}
	if gb.SortResults {
		// Map keys are unique, so an unstable sort still gives a deterministic order.
		sort.Slice(groups, func(i, j int) bool { return groups[i].key < groups[j].key })
	}
	c := make(chan Row)
	go func() {
		defer close(c)
		for _, g := range groups {
			for r := range g.agg.Collect() {
				// Tag each result row with the group it belongs to.
				r[gb.By] = g.key
				c <- r
			}
		}
	}()
	return c
}
// NewGroupBy builds a ready-to-use GroupBy that partitions rows on the field
// named by. Update creates one Agg per distinct value of that field, using
// newAgg. Results from Collect are emitted in sorted key order by default;
// clear SortResults to skip the sort.
func NewGroupBy(by string, newAgg func() Agg) *GroupBy {
	gb := new(GroupBy)
	gb.By = by
	gb.NewAgg = newAgg
	gb.SortResults = true
	gb.groups = map[string]Agg{}
	return gb
}
// SequentialGroupBy aggregates rows grouped by the field By while holding only
// one group in memory at a time. This suits large inputs whose rows are
// already ordered so that rows sharing a key are contiguous.
//
// It keeps a single Agg for the current key. When a row with a different key
// arrives, the current Agg's results are written to Sink and a fresh Agg is
// started. A key that reappears later therefore starts a new group and is
// written again. Call Flush after the last row to write the final group.
type SequentialGroupBy struct {
	// By names the Row field whose value selects the group.
	By string
	// NewAgg builds the Agg used for each group.
	NewAgg func() Agg
	// Sink receives the results of each finished group.
	Sink Sink
	// key is the group key of the rows currently being aggregated.
	key string
	// agg aggregates the current group; it is nil until Flush first runs.
	agg Agg
}
// Flush writes the results of the current Agg, if there is one, to Sink and
// then replaces it with a fresh Agg from NewAgg. When no Agg exists yet,
// nothing is written and a fresh Agg is simply created.
//
// If Sink.Write fails, its error is returned and the current Agg is left in
// place. Call Flush once after the last Update to emit the final group.
func (sgb *SequentialGroupBy) Flush() error {
	if sgb.agg != nil {
		if err := sgb.Sink.Write(sgb.Collect()); err != nil {
			return err
		}
	}
	sgb.agg = sgb.NewAgg()
	return nil
}
// Update adds row to the Agg of the current group. The group key is
// row[sgb.By]. If the row has no such field, ErrUnknownField is returned and
// nothing changes.
//
// Flush is called first when the key differs from the current group's key or
// no Agg has been created yet. This writes the finished group (if any) to Sink
// and starts a fresh Agg. A Flush error is returned without recording the row.
func (sgb *SequentialGroupBy) Update(row Row) error {
	key, ok := row[sgb.By]
	if !ok {
		return ErrUnknownField{sgb.By}
	}
	// sgb.key starts as "", so a first row whose key is "" would otherwise skip
	// Flush and leave agg nil. Checking agg as well prevents that panic.
	if sgb.agg == nil || key != sgb.key {
		if err := sgb.Flush(); err != nil {
			return err
		}
		sgb.key = key
	}
	return sgb.agg.Update(row)
}
// Collect streams the results of the current group's Agg on the returned
// channel. It sets field sgb.By of each row to the current group key and
// closes the channel when done. If no Agg exists yet, the returned channel is
// already closed and yields nothing.
//
// The value receiver matters: the goroutine works on a copy of the struct.
// Flush and Update can therefore replace sgb.agg and sgb.key after handing
// this channel to Sink without affecting the rows being streamed.
func (sgb SequentialGroupBy) Collect() <-chan Row {
	c := make(chan Row)
	if sgb.agg == nil {
		// Ranging over a nil Agg would panic inside the goroutine and crash the
		// whole program, so report "no rows" instead.
		close(c)
		return c
	}
	go func() {
		defer close(c)
		for r := range sgb.agg.Collect() {
			// Tag each result row with the group it belongs to.
			r[sgb.By] = sgb.key
			c <- r
		}
	}()
	return c
}
// NewSequentialGroupBy creates a SequentialGroupBy that:
//   - splits rows on the field named by,
//   - builds each group's Agg with newAgg,
//   - writes every finished group to sink.
//
// No Agg is created up front; Flush creates one, and Update calls Flush when
// the group key changes.
func NewSequentialGroupBy(by string, newAgg func() Agg, sink Sink) *SequentialGroupBy {
	sgb := new(SequentialGroupBy)
	sgb.By, sgb.NewAgg, sgb.Sink = by, newAgg, sink
	return sgb
}