diff --git a/execute/executetest/table.go b/execute/executetest/table.go index 56795b519d..f7f99e5b25 100644 --- a/execute/executetest/table.go +++ b/execute/executetest/table.go @@ -29,6 +29,9 @@ type Table struct { // Data is a list of rows, i.e. Data[row][col] // Each row must be a list with length equal to len(ColMeta) Data [][]interface{} + // Err contains the error that should be returned + // by this table when calling Do. + Err error } // Normalize ensures all fields of the table are set correctly. @@ -79,6 +82,10 @@ func (t *Table) Key() flux.GroupKey { } func (t *Table) Do(f func(flux.ColReader) error) error { + if t.Err != nil { + return t.Err + } + cols := make([]array.Interface, len(t.ColMeta)) for j, col := range t.ColMeta { switch col.Type { diff --git a/stdlib/universe/join.go b/stdlib/universe/join.go index 819410aff3..d97a8c5596 100644 --- a/stdlib/universe/join.go +++ b/stdlib/universe/join.go @@ -242,6 +242,7 @@ type mergeJoinTransformation struct { leftName, rightName string parentState map[execute.DatasetID]*mergeJoinParentState + err error keys []string } @@ -327,8 +328,10 @@ func (t *mergeJoinTransformation) UpdateProcessingTime(id execute.DatasetID, pt func (t *mergeJoinTransformation) Finish(id execute.DatasetID, err error) { t.mu.Lock() defer t.mu.Unlock() - if err != nil { - t.d.Finish(err) + + // Only report the first error that occurs. + if t.err == nil && err != nil { + t.err = err } t.parentState[id].finished = true @@ -338,7 +341,7 @@ func (t *mergeJoinTransformation) Finish(id execute.DatasetID, err error) { } if finished { - t.d.Finish(nil) + t.d.Finish(t.err) } } @@ -406,15 +409,19 @@ func (buf *streamBuffer) table(key flux.GroupKey) *execute.ColListTableBuilder { return buf.data[key] } -func (buf *streamBuffer) insert(table flux.Table) { +func (buf *streamBuffer) insert(table flux.Table) error { // Construct a new table builder with same schema as input table builder := execute.NewColListTableBuilder(table.Key(), buf.alloc) // this will only error if we try to add a duplicate column to the builder. // since this is a new table, that won't happen. - _ = execute.AddTableCols(table, builder) + if err := execute.AddTableCols(table, builder); err != nil { + return err + } // Append the input table to this builder, safe to ignore errors - _ = execute.AppendTable(table, builder) + if err := execute.AppendTable(table, builder); err != nil { + return err + } // Insert this table into the buffer buf.data[table.Key()] = builder @@ -434,6 +441,7 @@ func (buf *streamBuffer) insert(table flux.Table) { buf.last = leftKeyValue } } + return nil } func (buf *streamBuffer) expire(key flux.GroupKey) { @@ -712,16 +720,13 @@ func (c *MergeJoinCache) insertIntoBuffer(id execute.DatasetID, tbl flux.Table) // Discard the table and return. Note: we need to iterate over the // table at least once: // https://github.com/influxdata/flux/issues/643 - err := tbl.Do(func(flux.ColReader) error { + return tbl.Do(func(flux.ColReader) error { return nil }) - return err } } } - - c.buffers[id].insert(tbl) - return nil + return c.buffers[id].insert(tbl) } // registerKey takes a group key from the input stream associated with id and joins diff --git a/stdlib/universe/join_test.go b/stdlib/universe/join_test.go index 332afd5614..03a98305ef 100644 --- a/stdlib/universe/join_test.go +++ b/stdlib/universe/join_test.go @@ -1,6 +1,7 @@ package universe_test import ( + "errors" "sort" "testing" "time" @@ -1503,6 +1504,31 @@ func TestMergeJoin_Process(t *testing.T) { }, }, }, + { + name: "two failures", + spec: &universe.MergeJoinProcedureSpec{ + On: []string{"_time"}, + TableNames: tableNames, + }, + data0: []*executetest.Table{ + { + ColMeta: []flux.ColMeta{ + {Label: "_time", Type: flux.TTime}, + {Label: "_value", Type: flux.TFloat}, + }, + Err: errors.New("expected error"), + }, + }, + data1: []*executetest.Table{ + { + ColMeta: []flux.ColMeta{ + {Label: "_time", Type: flux.TTime}, + {Label: "_value", Type: flux.TFloat}, + }, + Err: errors.New("expected error"), + }, + }, + }, } for _, tc := range testCases { tc := tc @@ -1533,18 +1559,21 @@ func TestMergeJoin_Process(t *testing.T) { if len(tc.data1) > l { l = len(tc.data1) } + var err error for i := 0; i < l; i++ { if i < len(tc.data0) { - if err := jt.Process(parents[0], tc.data0[i]); err != nil { - t.Fatal(err) + if err = jt.Process(parents[0], tc.data0[i]); err != nil { + break } } if i < len(tc.data1) { - if err := jt.Process(parents[1], tc.data1[i]); err != nil { - t.Fatal(err) + if err = jt.Process(parents[1], tc.data1[i]); err != nil { + break } } } + jt.Finish(parents[0], err) + jt.Finish(parents[1], err) got, err := executetest.TablesFromCache(c) if err != nil {