Skip to content

Commit

Permalink
fix(stdlib/universe): an error when joining could result in two calls…
Browse files Browse the repository at this point in the history
… to finish (#1225)

When an error occurred in one of the two datasets that comprised a
join, `Finish()` would be called right away, even when there were
more tables to process. This meant that a second call to `Finish()`
would follow and cause a panic in some implementations, because
`Finish()` is only supposed to be called at most once.

The join test never called `Finish()`, so that call has been added
to the test to ensure the correct functionality.

In addition, an error that occurred while processing a table was
discarded except when the join was happening on a null group key,
so many possible errors were never even detected.
  • Loading branch information
jsternberg authored Apr 29, 2019
1 parent 1a26320 commit 3e19f89
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 15 deletions.
7 changes: 7 additions & 0 deletions execute/executetest/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ type Table struct {
// Data is a list of rows, i.e. Data[row][col]
// Each row must be a list with length equal to len(ColMeta)
Data [][]interface{}
// Err contains the error that should be returned
// by this table when calling Do.
Err error
}

// Normalize ensures all fields of the table are set correctly.
Expand Down Expand Up @@ -79,6 +82,10 @@ func (t *Table) Key() flux.GroupKey {
}

func (t *Table) Do(f func(flux.ColReader) error) error {
if t.Err != nil {
return t.Err
}

cols := make([]array.Interface, len(t.ColMeta))
for j, col := range t.ColMeta {
switch col.Type {
Expand Down
27 changes: 16 additions & 11 deletions stdlib/universe/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ type mergeJoinTransformation struct {
leftName, rightName string

parentState map[execute.DatasetID]*mergeJoinParentState
err error

keys []string
}
Expand Down Expand Up @@ -327,8 +328,10 @@ func (t *mergeJoinTransformation) UpdateProcessingTime(id execute.DatasetID, pt
func (t *mergeJoinTransformation) Finish(id execute.DatasetID, err error) {
t.mu.Lock()
defer t.mu.Unlock()
if err != nil {
t.d.Finish(err)

// Only report the first error that occurs.
if t.err == nil && err != nil {
t.err = err
}

t.parentState[id].finished = true
Expand All @@ -338,7 +341,7 @@ func (t *mergeJoinTransformation) Finish(id execute.DatasetID, err error) {
}

if finished {
t.d.Finish(nil)
t.d.Finish(t.err)
}
}

Expand Down Expand Up @@ -406,15 +409,19 @@ func (buf *streamBuffer) table(key flux.GroupKey) *execute.ColListTableBuilder {
return buf.data[key]
}

func (buf *streamBuffer) insert(table flux.Table) {
func (buf *streamBuffer) insert(table flux.Table) error {
// Construct a new table builder with same schema as input table
builder := execute.NewColListTableBuilder(table.Key(), buf.alloc)
// this will only error if we try to add a duplicate column to the builder.
// since this is a new table, that won't happen.
_ = execute.AddTableCols(table, builder)
if err := execute.AddTableCols(table, builder); err != nil {
return err
}

// Append the input table to this builder, safe to ignore errors
_ = execute.AppendTable(table, builder)
if err := execute.AppendTable(table, builder); err != nil {
return err
}

// Insert this table into the buffer
buf.data[table.Key()] = builder
Expand All @@ -434,6 +441,7 @@ func (buf *streamBuffer) insert(table flux.Table) {
buf.last = leftKeyValue
}
}
return nil
}

func (buf *streamBuffer) expire(key flux.GroupKey) {
Expand Down Expand Up @@ -712,16 +720,13 @@ func (c *MergeJoinCache) insertIntoBuffer(id execute.DatasetID, tbl flux.Table)
// Discard the table and return. Note: we need to iterate over the
// table at least once:
// https://github.com/influxdata/flux/issues/643
err := tbl.Do(func(flux.ColReader) error {
return tbl.Do(func(flux.ColReader) error {
return nil
})
return err
}
}
}

c.buffers[id].insert(tbl)
return nil
return c.buffers[id].insert(tbl)
}

// registerKey takes a group key from the input stream associated with id and joins
Expand Down
37 changes: 33 additions & 4 deletions stdlib/universe/join_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package universe_test

import (
"errors"
"sort"
"testing"
"time"
Expand Down Expand Up @@ -1503,6 +1504,31 @@ func TestMergeJoin_Process(t *testing.T) {
},
},
},
{
name: "two failures",
spec: &universe.MergeJoinProcedureSpec{
On: []string{"_time"},
TableNames: tableNames,
},
data0: []*executetest.Table{
{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_value", Type: flux.TFloat},
},
Err: errors.New("expected error"),
},
},
data1: []*executetest.Table{
{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_value", Type: flux.TFloat},
},
Err: errors.New("expected error"),
},
},
},
}
for _, tc := range testCases {
tc := tc
Expand Down Expand Up @@ -1533,18 +1559,21 @@ func TestMergeJoin_Process(t *testing.T) {
if len(tc.data1) > l {
l = len(tc.data1)
}
var err error
for i := 0; i < l; i++ {
if i < len(tc.data0) {
if err := jt.Process(parents[0], tc.data0[i]); err != nil {
t.Fatal(err)
if err = jt.Process(parents[0], tc.data0[i]); err != nil {
break
}
}
if i < len(tc.data1) {
if err := jt.Process(parents[1], tc.data1[i]); err != nil {
t.Fatal(err)
if err = jt.Process(parents[1], tc.data1[i]); err != nil {
break
}
}
}
jt.Finish(parents[0], err)
jt.Finish(parents[1], err)

got, err := executetest.TablesFromCache(c)
if err != nil {
Expand Down

0 comments on commit 3e19f89

Please sign in to comment.