Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(array): less leaky string array #5483

Merged
merged 2 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 74 additions & 111 deletions array/array.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package array

import (
"strconv"
"sync/atomic"

"github.com/apache/arrow/go/v7/arrow"
"github.com/apache/arrow/go/v7/arrow/array"
arrowmem "github.com/apache/arrow/go/v7/arrow/memory"

"github.com/influxdata/flux/codes"
"github.com/influxdata/flux/internal/errors"
"github.com/influxdata/flux/memory"
Expand Down Expand Up @@ -103,10 +103,23 @@ type Builder interface {
NewArray() Array
}

// binaryArray is the set of methods String needs from the array that
// stores its values. *array.Binary satisfies it, and String embeds it so
// that these methods are promoted onto String.
//
// An implementation may additionally provide Slice, ValueRef or
// IsConstant; String detects those through type assertions and prefers
// them when they are present.
type binaryArray interface {
	// Lifetime management.
	Retain()
	Release()

	// Size and underlying array data.
	Len() int
	Data() arrow.ArrayData

	// Null tracking.
	NullN() int
	NullBitmapBytes() []byte
	IsNull(i int) bool
	IsValid(i int) bool

	// Value access.
	ValueBytes() []byte
	ValueOffset(i int) int
	ValueLen(i int) int
	ValueString(i int) string
}

type String struct {
length int
data *array.Binary
value *stringValue
binaryArray
}

// NewStringFromBinaryArray creates an instance of String from
Expand All @@ -118,140 +131,90 @@ type String struct {
func NewStringFromBinaryArray(data *array.Binary) *String {
data.Retain()
return &String{
data: data,
binaryArray: data,
}
}

// DataType reports the data type of the array, which for String is
// always StringType.
func (a *String) DataType() DataType {
return StringType
}
func (a *String) NullN() int {
if a.data != nil {
return a.data.NullN()
}
return 0
}
func (a *String) NullBitmapBytes() []byte {
if a.data != nil {
return a.data.NullBitmapBytes()
}
return nil
}
func (a *String) IsNull(i int) bool {
if a.data != nil {
return a.data.IsNull(i)
}
return false
}
func (a *String) IsValid(i int) bool {
if a.data != nil {
return a.data.IsValid(i)
}
return true
}
func (a *String) Data() arrow.ArrayData {
if a.data != nil {
return a.data.Data()
}
return nil
}
func (a *String) Len() int {
if a.data != nil {
return a.data.Len()
}
return a.length
}
func (a *String) Retain() {
if a.data != nil {
a.data.Retain()
return
}
a.value.Retain()
}
func (a *String) Release() {
if a.data != nil {
a.data.Release()
return
}
a.value.Release()
}

func (a *String) Slice(i, j int) Array {
if a.data != nil {
data := array.NewSliceData(a.data.Data(), int64(i), int64(j))
defer data.Release()
return &String{
data: array.NewBinaryData(data),
}
slice, ok := a.binaryArray.(interface{ Slice(i, j int) binaryArray })
if ok {
return &String{binaryArray: slice.Slice(i, j)}
}
a.value.Retain()
data := array.NewSliceData(a.binaryArray.Data(), int64(i), int64(j))
defer data.Release()
return &String{
value: a.value,
length: j - i,
binaryArray: array.NewBinaryData(data),
}
}

// ValueBytes returns a byte slice containing the value of this string
// at index i. This slice points to the contents of the data buffer and
// is only valid for the lifetime of the array.
func (a *String) ValueBytes(i int) []byte {
if a.data != nil {
return a.data.Value(i)
}
return a.value.Bytes()
}

// Value returns a string copy of the value stored at index i. The
// returned value will outlive the array and is safe to use like any
// other go string. The memory backing the string will be allocated by
// the runtime, rather than any provided allocator.
// Value returns a string view of the bytes in the array. The string
// is only valid for the lifetime of the array. Care should be taken not
// to store this string without also retaining the array.
func (a *String) Value(i int) string {
return string(a.ValueBytes(i))
return a.ValueString(i)
}
func (a *String) ValueLen(i int) int {
if a.data != nil {
return a.data.ValueLen(i)

// ValueRef returns a reference to the memory buffer and location that
// stores the value at i. The reference is only valid for as long as the
// array is, the buffer needs to be retained if further access is
// required.
func (a *String) ValueRef(i int) StringRef {
if vr, ok := a.binaryArray.(interface{ ValueRef(int) StringRef }); ok {
return vr.ValueRef(i)
}
return StringRef{
buf: a.Data().Buffers()[2],
off: a.ValueOffset(i),
len: a.ValueLen(i),
}
return a.value.Len()
}
func (a *String) IsConstant() bool {
return a.data == nil

// ValueCopy returns the value at the requested position copied into a
// new memory location. This value will remain valid after the array is
// released, but is not tracked by the memory allocator.
//
// This function is intended to be temporary while changes are being
// made to reduce the amount of unaccounted data memory.
func (a *String) ValueCopy(i int) string {
return string(a.ValueRef(i).Bytes())
Comment on lines +176 to +183
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the comments. It is helpful! 👍

}

type stringValue struct {
rc int64
data []byte
// IsConstant reports whether the backing array declares itself to be
// constant. A backing array that has no IsConstant method is never
// treated as constant.
func (a *String) IsConstant() bool {
	if c, ok := a.binaryArray.(interface{ IsConstant() bool }); ok {
		return c.IsConstant()
	}
	return false
}

mem arrowmem.Allocator
// StringRef contains a reference to the storage for a value: the memory
// buffer holding the bytes and the byte range within it that the value
// occupies.
type StringRef struct {
	buf      *arrowmem.Buffer // buffer that holds the value's bytes
	off, len int              // start index and byte count within buf
}

func (v *stringValue) Retain() {
if v == nil {
return
}
atomic.AddInt64(&v.rc, 1)
// Buffer returns the memory buffer that contains the value.
func (r StringRef) Buffer() *arrowmem.Buffer {
return r.buf
Comment on lines +198 to +200
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't find anywhere that uses the Buffer() function. Is it still needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be used by the follow-up PRs in this series.

}

func (v *stringValue) Release() {
if v == nil {
return
}
if atomic.AddInt64(&v.rc, -1) == 0 {
v.mem.Free(v.data)
}
// Offset returns the offset into the memory buffer at which the value
// starts. It is an index into the buffer's bytes, the same index that
// Bytes uses as the lower bound of the slice it returns.
func (r StringRef) Offset() int {
return r.off
}

func (v *stringValue) Bytes() []byte {
if v == nil {
return nil
}
return v.data
// Len returns the length of the value, counted in bytes of the memory
// buffer starting at Offset.
func (r StringRef) Len() int {
return r.len
}

func (v *stringValue) Len() int {
if v == nil {
return 0
}
return len(v.data)
// Bytes returns the bytes from the memory buffer that contain the
// value. The result is a view into the buffer rather than a copy, so it
// is only valid while the buffer is.
//
// A reference without a buffer yields nil. ValueRef stores whatever
// Data().Buffers()[2] holds without checking it, so a nil buffer has to
// be tolerated here instead of being dereferenced.
func (r StringRef) Bytes() []byte {
	if r.buf == nil {
		return nil
	}
	// Cap the slice at the end of the value so that an append by the
	// caller reallocates instead of overwriting the bytes of whatever
	// value follows in the shared buffer.
	end := r.off + r.len
	return r.buf.Bytes()[r.off:end:end]
}

type sliceable interface {
Expand Down
28 changes: 20 additions & 8 deletions array/array_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,30 @@ func TestString(t *testing.T) {
for _, tc := range []struct {
name string
build func(b *array.StringBuilder)
bsz int
sz int
want []interface{}
}{
{
name: "Constant",
build: func(b *array.StringBuilder) {
for i := 0; i < 10; i++ {
b.Append("a")
b.Append("abcdefghij")
}
},
sz: 1,
bsz: 256, // 64 bytes nulls + 192 bytes data.
sz: 64, // The minimum size of a buffer is 64 bytes
want: []interface{}{
"a", "a", "a", "a", "a",
"a", "a", "a", "a", "a",
"abcdefghij",
"abcdefghij",
"abcdefghij",
"abcdefghij",
"abcdefghij",
"abcdefghij",
"abcdefghij",
"abcdefghij",
"abcdefghij",
"abcdefghij",
},
},
{
Expand All @@ -41,7 +51,8 @@ func TestString(t *testing.T) {
b.Append("b")
}
},
sz: 192,
bsz: 192,
sz: 192,
want: []interface{}{
"a", "a", "a", "a", "a",
"b", "b", "b", "b", "b",
Expand All @@ -58,7 +69,8 @@ func TestString(t *testing.T) {
b.Append(v)
}
},
sz: 192,
bsz: 192,
sz: 192,
want: []interface{}{
"a", "b", "c", "d", "e",
nil, "g", "h", "i", "j",
Expand Down Expand Up @@ -90,7 +102,7 @@ func TestString(t *testing.T) {
if want, got := len(tc.want)+2, b.Cap(); want != got {
t.Errorf("unexpected builder cap -want/+got:\n\t- %d\n\t+ %d", want, got)
}
assert.Equal(t, tc.sz, mem.CurrentAlloc(), "unexpected memory allocation.")
assert.Equal(t, tc.bsz, mem.CurrentAlloc(), "unexpected memory allocation.")

arr := b.NewStringArray()
defer arr.Release()
Expand Down Expand Up @@ -165,7 +177,7 @@ func TestStringBuilder_NewArray(t *testing.T) {
}

arr := b.NewArray()
assert.Equal(t, 1, mem.CurrentAlloc(), "unexpected memory allocation.")
assert.Equal(t, 64, mem.CurrentAlloc(), "unexpected memory allocation.")
arr.Release()
mem.AssertSize(t, 0)

Expand Down
24 changes: 12 additions & 12 deletions array/binary.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,11 @@ func StringAdd(l, r *String, mem memory.Allocator) (*String, error) {
b.Resize(n)
for i := 0; i < n; i++ {
if l.IsValid(i) && r.IsValid(i) {
lb := l.ValueBytes(i)
rb := r.ValueBytes(i)
buf := make([]byte, len(lb)+len(rb))
copy(buf, lb)
copy(buf[len(lb):], rb)
ls := l.Value(i)
rs := r.Value(i)
buf := make([]byte, len(ls)+len(rs))
copy(buf, ls)
copy(buf[len(ls):], rs)
b.AppendBytes(buf)

} else {
Expand All @@ -177,10 +177,10 @@ func StringAddLConst(l string, r *String, mem memory.Allocator) (*String, error)
b.Resize(n)
for i := 0; i < n; i++ {
if r.IsValid(i) {
rb := r.ValueBytes(i)
buf := make([]byte, len(l)+len(rb))
rs := r.Value(i)
buf := make([]byte, len(l)+len(rs))
copy(buf, l)
copy(buf[len(l):], rb)
copy(buf[len(l):], rs)
b.AppendBytes(buf)

} else {
Expand All @@ -198,10 +198,10 @@ func StringAddRConst(l *String, r string, mem memory.Allocator) (*String, error)
b.Resize(n)
for i := 0; i < n; i++ {
if l.IsValid(i) {
lb := l.ValueBytes(i)
buf := make([]byte, len(lb)+len(r))
copy(buf, lb)
copy(buf[len(lb):], r)
ls := l.Value(i)
buf := make([]byte, len(ls)+len(r))
copy(buf, ls)
copy(buf[len(ls):], r)
b.AppendBytes(buf)

} else {
Expand Down
Loading