-
Notifications
You must be signed in to change notification settings - Fork 7
/
reduce.go
61 lines (51 loc) · 1.16 KB
/
reduce.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package ron
// EmptyReducer carries no state. Its Reduce method produces a frame holding
// only a state header, and OmniReducer falls back to it for types that have
// no reducer of their own.
type EmptyReducer struct{}
// Reduce creates a frame with MakeFrame, appends one state header to it and
// returns the closed frame.
//
// The header spec starts from the first frame's spec. Its event is replaced by
// the event of the last frame in the batch, and its ref by the first frame's
// ref when that frame is a header, or by the first frame's event otherwise.
//
// An empty batch yields the zero Frame instead of panicking on inputs[0].
func (r EmptyReducer) Reduce(inputs Batch) (frame Frame) {
	if len(inputs) == 0 {
		// Nothing to derive a header from; frame is still the zero value here.
		return frame
	}
	first := &inputs[0]
	loc := first.Ref()
	if !first.IsHeader() {
		loc = first.Event()
	}
	// NOTE(review): 128 is presumably an initial size hint - confirm against MakeFrame.
	cur := MakeFrame(128)
	spec := first.Spec()
	spec.SetEvent(inputs[len(inputs)-1].Event())
	spec.SetRef(loc)
	cur.AppendStateHeader(spec)
	return cur.Close()
}
// OmniReducer dispatches a batch to a per-type Reducer looked up in Types,
// using the empty reducer for types that have no entry.
type OmniReducer struct {
// empty is the fallback that pickReducer returns when Types has no reducer
// for the requested type.
empty EmptyReducer
// Types maps a type UUID to the Reducer that handles batches of that type.
Types map[UUID]Reducer
}
// REDUCER is the package-level OmniReducer. Its Types map is allocated up
// front so that AddType can be called on it directly; assigning into a nil
// map would panic. No types are registered here, so until some are added
// every lookup falls back to the empty reducer.
var REDUCER = OmniReducer{Types: make(map[UUID]Reducer)}
// NewOmniReducer returns an OmniReducer whose Types map holds one reducer per
// entry of RDTYPES, each obtained by calling the factory stored under that key.
func NewOmniReducer() (ret OmniReducer) {
	registry := make(map[UUID]Reducer)
	for id, factory := range RDTYPES {
		registry[id] = factory()
	}
	ret.Types = registry
	return ret
}
// Features always reports 0 for OmniReducer.
// Open question carried over from the original author: should it be a reducer?
func (OmniReducer) Features() int {
	return 0
}
// Features always reports 0 for EmptyReducer.
func (r EmptyReducer) Features() int {
	return 0
}
// AddType registers r as the reducer for type id, replacing any previous entry.
//
// The value receiver is sufficient because the write goes through the map,
// which every copy of the OmniReducer shares. Types must already be allocated
// (as NewOmniReducer does); assigning into a nil map panics.
func (omni OmniReducer) AddType(id UUID, r Reducer) {
	types := omni.Types
	types[id] = r
}
// pickReducer returns the reducer stored in Types for t. When the lookup
// yields nil (no entry, or a nil entry) it returns the empty reducer instead.
func (omni OmniReducer) pickReducer(t UUID) Reducer {
	if found := omni.Types[t]; found != nil {
		return found
	}
	return omni.empty
}
// Reduce dispatches a batch to the reducer chosen by pickReducer for the type
// of the batch's first frame, and returns that reducer's result unchanged.
// pickReducer falls back to the empty reducer when no reducer is registered.
//
// An empty batch has no first frame to dispatch on, so it yields the zero
// Frame instead of panicking on ins[0].
func (omni OmniReducer) Reduce(ins Batch) Frame {
	if len(ins) == 0 {
		var none Frame
		return none
	}
	// TODO sanity checks?
	return omni.pickReducer(ins[0].Type()).Reduce(ins)
}