Skip to content

Commit

Permalink
fix(query/csv): Encode result iterator errors
Browse files Browse the repository at this point in the history
This change also adds Name() to the Result interface to allow for
correct MultiResultEncoding.
  • Loading branch information
nathanielc committed May 24, 2018
1 parent 9812808 commit 8f65e74
Show file tree
Hide file tree
Showing 12 changed files with 646 additions and 354 deletions.
89 changes: 65 additions & 24 deletions csv/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type ResultDecoderConfig struct {
}

func (d *ResultDecoder) Decode(r io.Reader) (query.Result, error) {
return newResultDecoder(r, d.c, nil), nil
return newResultDecoder(r, d.c, nil)
}

// MultiResultDecoder reads multiple results from a single csv file.
Expand Down Expand Up @@ -109,14 +109,14 @@ func (r *resultIterator) More() bool {
if r.next != nil {
extraMeta = r.next.extraMeta
}
r.next = newResultDecoder(r.r, r.c, extraMeta)
r.next, r.err = newResultDecoder(r.r, r.c, extraMeta)
return true
}
return false
}

func (r *resultIterator) Next() (string, query.Result) {
return r.next.id, r.next
func (r *resultIterator) Next() query.Result {
return r.next
}

func (r *resultIterator) Cancel() {
Expand All @@ -131,17 +131,33 @@ type resultDecoder struct {
r io.Reader
c ResultDecoderConfig

cr *csv.Reader

extraMeta *tableMetadata

eof bool
}

func newResultDecoder(r io.Reader, c ResultDecoderConfig, extraMeta *tableMetadata) *resultDecoder {
return &resultDecoder{
func newResultDecoder(r io.Reader, c ResultDecoderConfig, extraMeta *tableMetadata) (*resultDecoder, error) {
d := &resultDecoder{
r: r,
c: c,
cr: newCSVReader(r),
extraMeta: extraMeta,
}
// We need to know the result ID before we return
if extraMeta == nil {
tm, err := readMetadata(d.cr, c, nil)
if err != nil {
if err == io.EOF {
d.eof = true
}
return nil, err
}
d.extraMeta = &tm
}
d.id = d.extraMeta.ResultID
return d, nil
}

func newCSVReader(r io.Reader) *csv.Reader {
Expand All @@ -152,6 +168,10 @@ func newCSVReader(r io.Reader) *csv.Reader {
return csvr
}

func (r *resultDecoder) Name() string {
return r.id
}

func (r *resultDecoder) Blocks() query.BlockIterator {
return r
}
Expand All @@ -161,19 +181,16 @@ func (r *resultDecoder) Abort(error) {
}

func (r *resultDecoder) Do(f func(query.Block) error) error {
cr := newCSVReader(r.r)

var extraLine []string
var meta tableMetadata

newMeta := true
for !r.eof {
if newMeta {
if r.extraMeta != nil {
meta = *r.extraMeta
r.extraMeta = nil
} else {
tm, err := readMetadata(cr, r.c, extraLine)
tm, err := readMetadata(r.cr, r.c, extraLine)
if err != nil {
if err == io.EOF {
r.eof = true
Expand All @@ -184,9 +201,6 @@ func (r *resultDecoder) Do(f func(query.Block) error) error {
meta = tm
extraLine = nil
}
if r.id == "" {
r.id = meta.ResultID
}

if meta.ResultID != r.id {
r.extraMeta = &meta
Expand All @@ -195,7 +209,7 @@ func (r *resultDecoder) Do(f func(query.Block) error) error {
}

// create new block
b, err := newBlock(cr, r.c, meta, extraLine)
b, err := newBlock(r.cr, r.c, meta, extraLine)
if err != nil {
return err
}
Expand Down Expand Up @@ -576,7 +590,8 @@ func newUnlimitedAllocator() *execute.Allocator {
}

type ResultEncoder struct {
c ResultEncoderConfig
c ResultEncoderConfig
written bool
}

// ResultEncoderConfig are options that can be specified on the ResultEncoder.
Expand Down Expand Up @@ -605,6 +620,15 @@ func NewResultEncoder(c ResultEncoderConfig) *ResultEncoder {
}
}

func (e *ResultEncoder) csvWriter(w io.Writer) *csv.Writer {
writer := csv.NewWriter(w)
if e.c.Delimiter != 0 {
writer.Comma = e.c.Delimiter
}
writer.UseCRLF = true
return writer
}

func (e *ResultEncoder) Encode(w io.Writer, result query.Result) error {
tableID := 0
tableIDStr := "0"
Expand All @@ -613,16 +637,13 @@ func (e *ResultEncoder) Encode(w io.Writer, result query.Result) error {
{ColMeta: query.ColMeta{Label: resultLabel, Type: query.TString}},
{ColMeta: query.ColMeta{Label: tableLabel, Type: query.TInt}},
}
writer := csv.NewWriter(w)
if e.c.Delimiter != 0 {
writer.Comma = e.c.Delimiter
}
writer.UseCRLF = true
writer := e.csvWriter(w)

var lastCols []colMeta
var lastEmpty bool

return result.Blocks().Do(func(b query.Block) error {
e.written = true
// Update cols with block cols
cols := metaCols
for _, c := range b.Cols() {
Expand All @@ -643,7 +664,7 @@ func (e *ResultEncoder) Encode(w io.Writer, result query.Result) error {
writer.Write(nil)
}

if err := writeSchema(writer, &e.c, row, cols, b.Empty(), b.Key(), tableIDStr); err != nil {
if err := writeSchema(writer, &e.c, row, cols, b.Empty(), b.Key(), result.Name(), tableIDStr); err != nil {
return err
}
}
Expand Down Expand Up @@ -692,14 +713,27 @@ func (e *ResultEncoder) Encode(w io.Writer, result query.Result) error {
})
}

func writeSchema(writer *csv.Writer, c *ResultEncoderConfig, row []string, cols []colMeta, useKeyDefaults bool, key query.PartitionKey, tableID string) error {
func (e *ResultEncoder) EncodeError(w io.Writer, err error) error {
writer := e.csvWriter(w)
if e.written {
// Write out empty line
writer.Write(nil)
}

writer.Write([]string{"error", "reference"})
// TODO: Add referenced code
writer.Write([]string{err.Error(), ""})
writer.Flush()
return writer.Error()
}

func writeSchema(writer *csv.Writer, c *ResultEncoderConfig, row []string, cols []colMeta, useKeyDefaults bool, key query.PartitionKey, resultName, tableID string) error {
defaults := make([]string, len(row))
for j, c := range cols {
switch j {
case annotationIdx:
case resultIdx:
// TODO use real result name
defaults[j] = "_result"
defaults[j] = resultName
case tableIdx:
if useKeyDefaults {
defaults[j] = tableID
Expand Down Expand Up @@ -962,3 +996,10 @@ func equalCols(a, b []colMeta) bool {
}
return true
}

func NewMultiResultEncoder(c ResultEncoderConfig) query.MultiResultEncoder {
return &query.DelimitedMultiResultEncoder{
Delimiter: []byte("\r\n"),
Encoder: NewResultEncoder(c),
}
}
Loading

0 comments on commit 8f65e74

Please sign in to comment.