diff --git a/execute/allocator.go b/execute/allocator.go index 45f60b1d90..d176875a55 100644 --- a/execute/allocator.go +++ b/execute/allocator.go @@ -1,6 +1,8 @@ package execute import ( + arrowmem "github.com/apache/arrow/go/v7/arrow/memory" + "github.com/influxdata/flux/memory" ) @@ -156,17 +158,14 @@ func (a *Allocator) GrowFloats(slice []float64, n int) []float64 { return s } -// Strings makes a slice of string values. -// Only the string headers are accounted for. -func (a *Allocator) Strings(l, c int) []string { +// Strings makes a slice of String values. +func (a *Allocator) Strings(l, c int) []String { a.account(c, stringSize) - return make([]string, l, c) + return make([]String, l, c) } -// AppendStrings appends strings to a slice. -// Only the string headers are accounted for. -func (a *Allocator) AppendStrings(slice []string, vs ...string) []string { - // TODO(nathanielc): Account for actual size of strings +// AppendStrings appends Strings to a slice. +func (a *Allocator) AppendStrings(slice []String, vs ...String) []String { if cap(slice)-len(slice) >= len(vs) { return append(slice, vs...) } @@ -176,14 +175,14 @@ func (a *Allocator) AppendStrings(slice []string, vs ...string) []string { return s } -func (a *Allocator) GrowStrings(slice []string, n int) []string { +func (a *Allocator) GrowStrings(slice []String, n int) []String { newCap := len(slice) + n if newCap < cap(slice) { return slice[:newCap] } // grow capacity same way as built-in append newCap = newCap*3/2 + 1 - s := make([]string, len(slice)+n, newCap) + s := make([]String, len(slice)+n, newCap) copy(s, slice) diff := cap(s) - cap(slice) a.account(diff, stringSize) @@ -220,3 +219,13 @@ func (a *Allocator) GrowTimes(slice []Time, n int) []Time { a.account(diff, timeSize) return s } + +// String represents a string stored in some backing byte slice. +type String struct { + offset int + len int +} + +func (s String) Bytes(buf *arrowmem.Buffer) []byte { + return buf.Bytes()[s.offset : s.offset+s.len] +} diff --git a/execute/table.go b/execute/table.go index 82a0886981..9d0c93c1ab 100644 --- a/execute/table.go +++ b/execute/table.go @@ -1,11 +1,14 @@ package execute import ( + "bytes" "fmt" "sort" "sync/atomic" + arrowmem "github.com/apache/arrow/go/v7/arrow/memory" "github.com/google/go-cmp/cmp" + "github.com/influxdata/flux" "github.com/influxdata/flux/array" "github.com/influxdata/flux/arrow" @@ -295,8 +298,9 @@ func TablesEqual(left, right flux.Table, alloc memory.Allocator) (bool, error) { eq = cmp.Equal(leftBuffer.cols[j].(*floatColumnBuilder).data, rightBuffer.cols[j].(*floatColumnBuilder).data) case flux.TString: - eq = cmp.Equal(leftBuffer.cols[j].(*stringColumnBuilder).data, - rightBuffer.cols[j].(*stringColumnBuilder).data) + eq = cmp.Equal(leftBuffer.cols[j].(*stringColumnBuilder), + rightBuffer.cols[j].(*stringColumnBuilder), + cmp.Comparer(stringColumnBuilderEqual)) case flux.TTime: eq = cmp.Equal(leftBuffer.cols[j].(*timeColumnBuilder).data, rightBuffer.cols[j].(*timeColumnBuilder).data) @@ -324,6 +328,27 @@ func colsMatch(left, right []flux.ColMeta) bool { return true } +func stringColumnBuilderEqual(x, y *stringColumnBuilder) bool { + if x.Len() != y.Len() { + return false + } + for i := 0; i < x.Len(); i++ { + if x.IsNil(i) { + if !y.IsNil(i) { + return false + } + continue + } + if y.IsNil(i) { + return false + } + if !bytes.Equal(x.data[i].Bytes(x.buf), y.data[i].Bytes(y.buf)) { + return false + } + } + return true +} + // ColMap writes a mapping of builder index to cols index into colMap. // When colMap does not have enough capacity a new colMap is allocated. // The colMap is always returned @@ -598,6 +623,7 @@ func (b *ColListTableBuilder) AddCol(c flux.ColMeta) (int, error) { case flux.TString: b.cols = append(b.cols, &stringColumnBuilder{ columnBuilderBase: colBase, + buf: arrowmem.NewResizableBuffer(b.alloc.Allocator), }) if b.NRows() > 0 { if err := b.GrowStrings(newIdx, b.NRows()); err != nil { @@ -919,8 +945,9 @@ func (b *ColListTableBuilder) SetString(i int, j int, value string) error { if err := b.checkCol(j, flux.TString); err != nil { return err } - b.cols[j].(*stringColumnBuilder).data[i] = value - b.cols[j].SetNil(i, false) + col := b.cols[j].(*stringColumnBuilder) + col.data[i] = col.makeString(value) + col.SetNil(i, false) return nil } @@ -929,7 +956,7 @@ func (b *ColListTableBuilder) AppendString(j int, value string) error { return err } col := b.cols[j].(*stringColumnBuilder) - col.data = b.alloc.AppendStrings(col.data, value) + col.data = b.alloc.AppendStrings(col.data, col.makeString(value)) b.nrows = len(col.data) return nil } @@ -1152,11 +1179,6 @@ func (b *ColListTableBuilder) Floats(j int) []float64 { CheckColType(b.colMeta[j], flux.TFloat) return b.cols[j].(*floatColumnBuilder).data } -func (b *ColListTableBuilder) Strings(j int) []string { - meta := b.colMeta[j] - CheckColType(meta, flux.TString) - return b.cols[j].(*stringColumnBuilder).data -} func (b *ColListTableBuilder) Times(j int) []values.Time { CheckColType(b.colMeta[j], flux.TTime) return b.cols[j].(*timeColumnBuilder).data @@ -1180,7 +1202,9 @@ func (b *ColListTableBuilder) GetRow(row int) values.Object { case flux.TFloat: val = values.NewFloat(b.cols[j].(*floatColumnBuilder).data[row]) case flux.TString: - val = values.NewString(b.cols[j].(*stringColumnBuilder).data[row]) + // TODO(mhilton): avoid a copy + col := b.cols[j].(*stringColumnBuilder) + val = values.NewString(string(col.data[row].Bytes(col.buf))) case flux.TTime: val = values.NewTime(b.cols[j].(*timeColumnBuilder).data[row]) } @@ -1866,46 +1890,38 @@ func (c *stringColumn) Copy() column { type stringColumnBuilder struct { columnBuilderBase - data []string + data []String + + // buf contains a backing buffer containing the bytes of the + // strings. + buf *arrowmem.Buffer } func (c *stringColumnBuilder) Clear() { - c.data = c.data[0:0] + c.buf.Release() + c.buf = arrowmem.NewResizableBuffer(c.alloc.Allocator) + c.data = c.data[:0] } func (c *stringColumnBuilder) Release() { + c.buf.Release() c.alloc.Free(cap(c.data), stringSize) - c.data = nil } func (c *stringColumnBuilder) Copy() column { - var data *array.String - if len(c.nils) > 0 { - b := arrow.NewStringBuilder(c.alloc.Allocator) - b.Reserve(len(c.data)) - sz := 0 - for i, v := range c.data { - if c.nils[i] { - continue - } - sz += len(v) - } - b.ReserveData(sz) - for i, v := range c.data { - if c.nils[i] { - b.AppendNull() - continue - } - b.Append(v) + builder := arrow.NewStringBuilder(c.alloc.Allocator) + builder.Reserve(len(c.data)) + builder.ReserveData(c.buf.Len()) + for i, v := range c.data { + if c.nils[i] { + builder.AppendNull() + continue } - data = b.NewStringArray() - b.Release() - } else { - data = arrow.NewString(c.data, c.alloc.Allocator) + builder.AppendBytes(v.Bytes(c.buf)) } col := &stringColumn{ ColMeta: c.ColMeta, - data: data, + data: builder.NewStringArray(), } return col } @@ -1916,13 +1932,13 @@ func (c *stringColumnBuilder) Len() int { func (c *stringColumnBuilder) Equal(i, j int) bool { return c.EqualFunc(i, j, func(i, j int) bool { - return c.data[i] == c.data[j] + return bytes.Equal(c.data[i].Bytes(c.buf), c.data[j].Bytes(c.buf)) }) } func (c *stringColumnBuilder) Less(i, j int) bool { return c.LessFunc(i, j, func(i, j int) bool { - return c.data[i] < c.data[j] + return bytes.Compare(c.data[i].Bytes(c.buf), c.data[j].Bytes(c.buf)) < 0 }) } @@ -1931,6 +1947,16 @@ func (c *stringColumnBuilder) Swap(i, j int) { c.data[i], c.data[j] = c.data[j], c.data[i] } +func (c *stringColumnBuilder) makeString(s string) String { + offset := c.buf.Len() + c.buf.Resize(offset + len(s)) + copy(c.buf.Bytes()[offset:], s) + return String{ + offset: offset, + len: len(s), + } +} + type timeColumn struct { flux.ColMeta data *array.Int diff --git a/execute/table_test.go b/execute/table_test.go index 8311d0b05a..9996acdc5e 100644 --- a/execute/table_test.go +++ b/execute/table_test.go @@ -148,6 +148,58 @@ func TestTablesEqual(t *testing.T) { }, want: false, }, + { + name: "string values", + data0: &executetest.Table{ + ColMeta: []flux.ColMeta{ + {Label: "_time", Type: flux.TTime}, + {Label: "_value", Type: flux.TString}, + }, + Data: [][]interface{}{ + {execute.Time(1), "1"}, + {execute.Time(2), "2"}, + {execute.Time(3), "3"}, + }, + }, + data1: &executetest.Table{ + ColMeta: []flux.ColMeta{ + {Label: "_time", Type: flux.TTime}, + {Label: "_value", Type: flux.TString}, + }, + Data: [][]interface{}{ + {execute.Time(1), "1"}, + {execute.Time(2), "2"}, + {execute.Time(3), "3"}, + }, + }, + want: true, + }, + { + name: "string mismatch", + data0: &executetest.Table{ + ColMeta: []flux.ColMeta{ + {Label: "_time", Type: flux.TTime}, + {Label: "_value", Type: flux.TString}, + }, + Data: [][]interface{}{ + {execute.Time(1), "1"}, + {execute.Time(2), "2"}, + {execute.Time(3), "3"}, + }, + }, + data1: &executetest.Table{ + ColMeta: []flux.ColMeta{ + {Label: "_time", Type: flux.TTime}, + {Label: "_value", Type: flux.TString}, + }, + Data: [][]interface{}{ + {execute.Time(1), "1"}, + {execute.Time(2), "2"}, + {execute.Time(3), "4"}, + }, + }, + want: false, + }, } for _, tc := range testCases { tc := tc