Skip to content

Commit

Permalink
feat(execute): allocate memory for string content. (#5482)
Browse files Browse the repository at this point in the history
Update the string column builder to account for the memory for the
string contents.
  • Loading branch information
mhilton authored May 21, 2024
1 parent 918c26c commit bea9586
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 49 deletions.
29 changes: 19 additions & 10 deletions execute/allocator.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package execute

import (
arrowmem "github.com/apache/arrow/go/v7/arrow/memory"

"github.com/influxdata/flux/memory"
)

Expand Down Expand Up @@ -156,17 +158,14 @@ func (a *Allocator) GrowFloats(slice []float64, n int) []float64 {
return s
}

// Strings makes a slice of string values.
// Only the string headers are accounted for.
func (a *Allocator) Strings(l, c int) []string {
// Strings makes a slice of String values.
func (a *Allocator) Strings(l, c int) []String {
a.account(c, stringSize)
return make([]string, l, c)
return make([]String, l, c)
}

// AppendStrings appends strings to a slice.
// Only the string headers are accounted for.
func (a *Allocator) AppendStrings(slice []string, vs ...string) []string {
// TODO(nathanielc): Account for actual size of strings
// AppendStrings appends Strings to a slice.
func (a *Allocator) AppendStrings(slice []String, vs ...String) []String {
if cap(slice)-len(slice) >= len(vs) {
return append(slice, vs...)
}
Expand All @@ -176,14 +175,14 @@ func (a *Allocator) AppendStrings(slice []string, vs ...string) []string {
return s
}

func (a *Allocator) GrowStrings(slice []string, n int) []string {
func (a *Allocator) GrowStrings(slice []String, n int) []String {
newCap := len(slice) + n
if newCap < cap(slice) {
return slice[:newCap]
}
// grow capacity same way as built-in append
newCap = newCap*3/2 + 1
s := make([]string, len(slice)+n, newCap)
s := make([]String, len(slice)+n, newCap)
copy(s, slice)
diff := cap(s) - cap(slice)
a.account(diff, stringSize)
Expand Down Expand Up @@ -220,3 +219,13 @@ func (a *Allocator) GrowTimes(slice []Time, n int) []Time {
a.account(diff, timeSize)
return s
}

// String represents a string stored in some backing byte slice.
type String struct {
offset int
len int
}

func (s String) Bytes(buf *arrowmem.Buffer) []byte {
return buf.Bytes()[s.offset : s.offset+s.len]
}
104 changes: 65 additions & 39 deletions execute/table.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package execute

import (
"bytes"
"fmt"
"sort"
"sync/atomic"

arrowmem "github.com/apache/arrow/go/v7/arrow/memory"
"github.com/google/go-cmp/cmp"

"github.com/influxdata/flux"
"github.com/influxdata/flux/array"
"github.com/influxdata/flux/arrow"
Expand Down Expand Up @@ -295,8 +298,9 @@ func TablesEqual(left, right flux.Table, alloc memory.Allocator) (bool, error) {
eq = cmp.Equal(leftBuffer.cols[j].(*floatColumnBuilder).data,
rightBuffer.cols[j].(*floatColumnBuilder).data)
case flux.TString:
eq = cmp.Equal(leftBuffer.cols[j].(*stringColumnBuilder).data,
rightBuffer.cols[j].(*stringColumnBuilder).data)
eq = cmp.Equal(leftBuffer.cols[j].(*stringColumnBuilder),
rightBuffer.cols[j].(*stringColumnBuilder),
cmp.Comparer(stringColumnBuilderEqual))
case flux.TTime:
eq = cmp.Equal(leftBuffer.cols[j].(*timeColumnBuilder).data,
rightBuffer.cols[j].(*timeColumnBuilder).data)
Expand Down Expand Up @@ -324,6 +328,27 @@ func colsMatch(left, right []flux.ColMeta) bool {
return true
}

func stringColumnBuilderEqual(x, y *stringColumnBuilder) bool {
if x.Len() != y.Len() {
return false
}
for i := 0; i < x.Len(); i++ {
if x.IsNil(i) {
if !y.IsNil(i) {
return false
}
continue
}
if y.IsNil(i) {
return false
}
if !bytes.Equal(x.data[i].Bytes(x.buf), y.data[i].Bytes(y.buf)) {
return false
}
}
return true
}

// ColMap writes a mapping of builder index to cols index into colMap.
// When colMap does not have enough capacity a new colMap is allocated.
// The colMap is always returned
Expand Down Expand Up @@ -598,6 +623,7 @@ func (b *ColListTableBuilder) AddCol(c flux.ColMeta) (int, error) {
case flux.TString:
b.cols = append(b.cols, &stringColumnBuilder{
columnBuilderBase: colBase,
buf: arrowmem.NewResizableBuffer(b.alloc.Allocator),
})
if b.NRows() > 0 {
if err := b.GrowStrings(newIdx, b.NRows()); err != nil {
Expand Down Expand Up @@ -919,8 +945,9 @@ func (b *ColListTableBuilder) SetString(i int, j int, value string) error {
if err := b.checkCol(j, flux.TString); err != nil {
return err
}
b.cols[j].(*stringColumnBuilder).data[i] = value
b.cols[j].SetNil(i, false)
col := b.cols[j].(*stringColumnBuilder)
col.data[i] = col.makeString(value)
col.SetNil(i, false)
return nil
}

Expand All @@ -929,7 +956,7 @@ func (b *ColListTableBuilder) AppendString(j int, value string) error {
return err
}
col := b.cols[j].(*stringColumnBuilder)
col.data = b.alloc.AppendStrings(col.data, value)
col.data = b.alloc.AppendStrings(col.data, col.makeString(value))
b.nrows = len(col.data)
return nil
}
Expand Down Expand Up @@ -1152,11 +1179,6 @@ func (b *ColListTableBuilder) Floats(j int) []float64 {
CheckColType(b.colMeta[j], flux.TFloat)
return b.cols[j].(*floatColumnBuilder).data
}
func (b *ColListTableBuilder) Strings(j int) []string {
meta := b.colMeta[j]
CheckColType(meta, flux.TString)
return b.cols[j].(*stringColumnBuilder).data
}
func (b *ColListTableBuilder) Times(j int) []values.Time {
CheckColType(b.colMeta[j], flux.TTime)
return b.cols[j].(*timeColumnBuilder).data
Expand All @@ -1180,7 +1202,9 @@ func (b *ColListTableBuilder) GetRow(row int) values.Object {
case flux.TFloat:
val = values.NewFloat(b.cols[j].(*floatColumnBuilder).data[row])
case flux.TString:
val = values.NewString(b.cols[j].(*stringColumnBuilder).data[row])
// TODO(mhilton): avoid a copy
col := b.cols[j].(*stringColumnBuilder)
val = values.NewString(string(col.data[row].Bytes(col.buf)))
case flux.TTime:
val = values.NewTime(b.cols[j].(*timeColumnBuilder).data[row])
}
Expand Down Expand Up @@ -1866,46 +1890,38 @@ func (c *stringColumn) Copy() column {

type stringColumnBuilder struct {
columnBuilderBase
data []string
data []String

// buf contains a backing buffer containing the bytes of the
// strings.
buf *arrowmem.Buffer
}

func (c *stringColumnBuilder) Clear() {
c.data = c.data[0:0]
c.buf.Release()
c.buf = arrowmem.NewResizableBuffer(c.alloc.Allocator)
c.data = c.data[:0]
}

func (c *stringColumnBuilder) Release() {
c.buf.Release()
c.alloc.Free(cap(c.data), stringSize)
c.data = nil
}

func (c *stringColumnBuilder) Copy() column {
var data *array.String
if len(c.nils) > 0 {
b := arrow.NewStringBuilder(c.alloc.Allocator)
b.Reserve(len(c.data))
sz := 0
for i, v := range c.data {
if c.nils[i] {
continue
}
sz += len(v)
}
b.ReserveData(sz)
for i, v := range c.data {
if c.nils[i] {
b.AppendNull()
continue
}
b.Append(v)
builder := arrow.NewStringBuilder(c.alloc.Allocator)
builder.Reserve(len(c.data))
builder.ReserveData(c.buf.Len())
for i, v := range c.data {
if c.nils[i] {
builder.AppendNull()
continue
}
data = b.NewStringArray()
b.Release()
} else {
data = arrow.NewString(c.data, c.alloc.Allocator)
builder.AppendBytes(v.Bytes(c.buf))
}
col := &stringColumn{
ColMeta: c.ColMeta,
data: data,
data: builder.NewStringArray(),
}
return col
}
Expand All @@ -1916,13 +1932,13 @@ func (c *stringColumnBuilder) Len() int {

func (c *stringColumnBuilder) Equal(i, j int) bool {
return c.EqualFunc(i, j, func(i, j int) bool {
return c.data[i] == c.data[j]
return bytes.Equal(c.data[i].Bytes(c.buf), c.data[j].Bytes(c.buf))
})
}

func (c *stringColumnBuilder) Less(i, j int) bool {
return c.LessFunc(i, j, func(i, j int) bool {
return c.data[i] < c.data[j]
return bytes.Compare(c.data[i].Bytes(c.buf), c.data[j].Bytes(c.buf)) < 0
})
}

Expand All @@ -1931,6 +1947,16 @@ func (c *stringColumnBuilder) Swap(i, j int) {
c.data[i], c.data[j] = c.data[j], c.data[i]
}

func (c *stringColumnBuilder) makeString(s string) String {
offset := c.buf.Len()
c.buf.Resize(offset + len(s))
copy(c.buf.Bytes()[offset:], s)
return String{
offset: offset,
len: len(s),
}
}

type timeColumn struct {
flux.ColMeta
data *array.Int
Expand Down
52 changes: 52 additions & 0 deletions execute/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,58 @@ func TestTablesEqual(t *testing.T) {
},
want: false,
},
{
name: "string values",
data0: &executetest.Table{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_value", Type: flux.TString},
},
Data: [][]interface{}{
{execute.Time(1), "1"},
{execute.Time(2), "2"},
{execute.Time(3), "3"},
},
},
data1: &executetest.Table{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_value", Type: flux.TString},
},
Data: [][]interface{}{
{execute.Time(1), "1"},
{execute.Time(2), "2"},
{execute.Time(3), "3"},
},
},
want: true,
},
{
name: "string mismatch",
data0: &executetest.Table{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_value", Type: flux.TString},
},
Data: [][]interface{}{
{execute.Time(1), "1"},
{execute.Time(2), "2"},
{execute.Time(3), "3"},
},
},
data1: &executetest.Table{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_value", Type: flux.TString},
},
Data: [][]interface{}{
{execute.Time(1), "1"},
{execute.Time(2), "2"},
{execute.Time(3), "4"},
},
},
want: false,
},
}
for _, tc := range testCases {
tc := tc
Expand Down

0 comments on commit bea9586

Please sign in to comment.