-
Notifications
You must be signed in to change notification settings - Fork 1
/
sink.go
80 lines (70 loc) · 1.62 KB
/
sink.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package tuna
import (
"encoding/csv"
"io"
"os"
"sort"
)
// A Sink can persist the output of an Agg's Collect method.
type Sink interface {
	// Write receives the rows to persist over a channel and reports any
	// failure through its error result.
	Write(rows <-chan Row) error
}
// CSVSink persists the output of an Agg's Collect method in CSV format.
// The columns are ordered in lexical order.
type CSVSink struct {
	// w is the CSV writer every header and row is written through.
	w *csv.Writer
	// cols holds the sorted column names. It stays nil until Write has
	// extracted them from the first row it receives.
	cols []string
	// tmp is a scratch record, one slot per column, that writeRow refills
	// for each row.
	tmp []string
}
// writeRow writes a single Row as one CSV record.
//
// The values are looked up by column name and emitted in the order of
// cw.cols. The scratch record cw.tmp is reused between calls; it is
// (re)allocated here whenever its length does not match the number of
// columns, so the method cannot index past the end of the buffer even if
// the columns were set without the buffer being prepared.
//
// It returns the error reported by the underlying csv.Writer, if any.
func (cw *CSVSink) writeRow(row Row) error {
	// Pointer receiver: keeps the (re)allocated buffer for later calls and
	// avoids copying the struct for every row.
	if len(cw.tmp) != len(cw.cols) {
		cw.tmp = make([]string, len(cw.cols))
	}
	for i, c := range cw.cols {
		cw.tmp[i] = row[c]
	}
	return cw.w.Write(cw.tmp)
}
// Write drains rows and writes each Row as a CSV record.
//
// If no columns are known yet, the keys of the first received Row are
// sorted lexically, written as the header line and remembered; every row,
// including rows passed to later calls, is written in that column order.
// When the channel is closed before delivering any row, nothing is written
// and the columns stay unset, so a later call can still emit the header.
//
// The writer is flushed before returning, on success and on failure alike.
// Write returns the first error met while writing a record, or otherwise
// the error reported by the csv.Writer after flushing. On error the
// remaining rows are left unread in the channel.
func (cw *CSVSink) Write(rows <-chan Row) (err error) {
	// csv.Writer buffers its output, so a failure of the underlying writer
	// may only show up once the buffer is flushed. Flush itself returns
	// nothing; the outcome has to be fetched with Error.
	defer func() {
		cw.w.Flush()
		if err == nil {
			err = cw.w.Error()
		}
	}()

	if cw.cols == nil {
		first, ok := <-rows
		if !ok {
			// No row to derive a header from.
			return nil
		}

		// Extract the column names and sort them for a stable layout.
		cols := make([]string, 0, len(first))
		for k := range first {
			cols = append(cols, k)
		}
		sort.Strings(cols)
		if err := cw.w.Write(cols); err != nil {
			return err
		}

		// Commit the columns only once the header has been accepted, and
		// always together with a scratch record of matching size.
		cw.cols = cols
		cw.tmp = make([]string, len(cols))
		if err := cw.writeRow(first); err != nil {
			return err
		}
	}

	// Write each remaining Row down.
	for r := range rows {
		if err := cw.writeRow(r); err != nil {
			return err
		}
	}
	return nil
}
// NewCSVSink returns a CSVSink which persists results to the given writer.
// The writer is wrapped in a csv.Writer; the column names are left unset
// and are determined later from the rows passed to Write.
func NewCSVSink(writer io.Writer) *CSVSink {
	sink := new(CSVSink)
	sink.w = csv.NewWriter(writer)
	return sink
}
// NewCSVSinkFromPath returns a CSVSink which persists results to the file
// at the given path. The file is opened with os.Create, so it is created
// if missing and truncated if it already exists. The error from os.Create
// is returned unchanged.
//
// NOTE(review): the opened file is only handed to NewCSVSink and is never
// closed here; no handle is returned to the caller to close it either.
func NewCSVSinkFromPath(path string) (*CSVSink, error) {
	f, err := os.Create(path)
	if err != nil {
		return nil, err
	}
	sink := NewCSVSink(f)
	return sink, nil
}