Skip to content

Commit

Permalink
Add NativeBuffer to avoid copying of bytes to Golang array (#217)
Browse files Browse the repository at this point in the history
* Use NativeBuffer when writing to a byte array
This saves us from copying the bytes and removes the limit on array size
* changelog
  • Loading branch information
DanEngelbrecht authored Jul 6, 2022
1 parent 9e7464c commit 6686d9d
Show file tree
Hide file tree
Showing 13 changed files with 98 additions and 53 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
##
- **FIXED** Full support for windows extended length paths (UNC paths)
- **CHANGED** Add NativeBuffer to avoid copying bytes into a Go array and to remove the signed 32-bit limit on array length (`WriteStoredBlockToBuffer`, `WriteBlockIndexToBuffer`, `WriteVersionIndexToBuffer`, `WriteStoreIndexToBuffer`)
- **FIXED** Full support for windows extended length paths (fixes: UNC path may not contain forward slashes (#214))

## v0.3.5
- **UPDATED** Updated longtail to 0.3.4
Expand Down
11 changes: 7 additions & 4 deletions commands/cmd_clonestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ func cloneVersionIndex(v longtaillib.Longtail_VersionIndex) longtaillib.Longtail
log.WithError(err).Info(fname)
return longtaillib.Longtail_VersionIndex{}
}
copy, err := longtaillib.ReadVersionIndexFromBuffer(vbuffer)
defer vbuffer.Dispose()
copy, err := longtaillib.ReadVersionIndexFromBuffer(vbuffer.ToBuffer())
if err != nil {
err := errors.Wrap(err, "longtaillib.ReadVersionIndexFromBuffer() failed")
log.WithError(err).Info(fname)
Expand Down Expand Up @@ -454,8 +455,9 @@ func cloneOneVersion(
if err != nil {
return targetVersionIndex, errors.Wrap(err, fname)
}
defer vbuffer.Dispose()

err = longtailutils.WriteToURI(targetFilePath, vbuffer, longtailutils.WithS3EndpointResolverURI(targetEndpointResolverURI))
err = longtailutils.WriteToURI(targetFilePath, vbuffer.ToBuffer(), longtailutils.WithS3EndpointResolverURI(targetEndpointResolverURI))
if err != nil {
return targetVersionIndex, errors.Wrap(err, fname)
}
Expand All @@ -467,13 +469,14 @@ func cloneOneVersion(
err = errors.Wrap(err, fmt.Sprintf("failed merging store index for `%s`", versionLocalStoreIndexPath))
return targetVersionIndex, errors.Wrap(err, fname)
}
defer versionLocalStoreIndex.Dispose()
versionLocalStoreIndexBuffer, err := longtaillib.WriteStoreIndexToBuffer(versionLocalStoreIndex)
versionLocalStoreIndex.Dispose()
if err != nil {
err = errors.Wrap(err, fmt.Sprintf("failed serializing store index for `%s`", versionLocalStoreIndexPath))
return targetVersionIndex, errors.Wrap(err, fname)
}
err = longtailutils.WriteToURI(versionLocalStoreIndexPath, versionLocalStoreIndexBuffer, longtailutils.WithS3EndpointResolverURI(targetEndpointResolverURI))
defer versionLocalStoreIndexBuffer.Dispose()
err = longtailutils.WriteToURI(versionLocalStoreIndexPath, versionLocalStoreIndexBuffer.ToBuffer(), longtailutils.WithS3EndpointResolverURI(targetEndpointResolverURI))
if err != nil {
return targetVersionIndex, errors.Wrap(err, fname)
}
Expand Down
1 change: 1 addition & 0 deletions commands/cmd_clonestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/DanEngelbrecht/golongtail/longtailstorelib"
"github.com/DanEngelbrecht/golongtail/longtailutils"

"github.com/pkg/errors"
)

Expand Down
3 changes: 2 additions & 1 deletion commands/cmd_createversionstoreindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ func createVersionStoreIndex(
err = errors.Wrapf(err, "Cant serialize store index for `%s`", sourceFilePath)
return storeStats, timeStats, errors.Wrap(err, fname)
}
err = longtailutils.WriteToURI(versionLocalStoreIndexPath, versionLocalStoreIndexBuffer, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
defer versionLocalStoreIndexBuffer.Dispose()
err = longtailutils.WriteToURI(versionLocalStoreIndexPath, versionLocalStoreIndexBuffer.ToBuffer(), longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
if err != nil {
return storeStats, timeStats, errors.Wrap(err, fname)
}
Expand Down
4 changes: 3 additions & 1 deletion commands/cmd_initremotestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,10 @@ func TestInitRemoteStore(t *testing.T) {

storeIndexObject.Delete()
emptyStoreIndex, _ := longtaillib.CreateStoreIndexFromBlocks([]longtaillib.Longtail_BlockIndex{})
defer emptyStoreIndex.Dispose()
emptyStoreIndexBytes, _ := longtaillib.WriteStoreIndexToBuffer(emptyStoreIndex)
storeIndexObject.Write(emptyStoreIndexBytes)
defer emptyStoreIndexBytes.Dispose()
storeIndexObject.Write(emptyStoreIndexBytes.ToBuffer())

// Force rebuilding the index even though it exists
cmd, err = executeCommandLine("init-remote-store", "--storage-uri", fsBlobPathPrefix+"/storage")
Expand Down
3 changes: 2 additions & 1 deletion commands/cmd_prunestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,13 @@ func pruneOne(

if versionLocalStoreIndexFilePath != "" && writeVersionLocalStoreIndex && !dryRun {
sbuffer, err := longtaillib.WriteStoreIndexToBuffer(existingStoreIndex)
defer sbuffer.Dispose()
if err != nil {
existingStoreIndex.Dispose()
result.err = errors.Wrap(err, fname)
return
}
err = longtailutils.WriteToURI(versionLocalStoreIndexFilePath, sbuffer, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
err = longtailutils.WriteToURI(versionLocalStoreIndexFilePath, sbuffer.ToBuffer(), longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
if err != nil {
existingStoreIndex.Dispose()
result.err = errors.Wrap(err, fname)
Expand Down
6 changes: 4 additions & 2 deletions commands/cmd_prunestore_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ func pruneOneUsingStoreIndex(
result.err = errors.Wrap(err, fname)
return
}
err = longtailutils.WriteToURI(versionLocalStoreIndexFilePath, sbuffer, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
defer sbuffer.Dispose()
err = longtailutils.WriteToURI(versionLocalStoreIndexFilePath, sbuffer.ToBuffer(), longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
if err != nil {
existingStoreIndex.Dispose()
result.err = errors.Wrap(err, fname)
Expand Down Expand Up @@ -329,11 +330,12 @@ func pruneStoreIndex(

writeStoreIndexStartTime := time.Now()
prunedStoreIndexBuffer, err := longtaillib.WriteStoreIndexToBuffer(prunedStoreIndex)
defer prunedStoreIndexBuffer.Dispose()
prunedStoreIndex.Dispose()
if err != nil {
return storeStats, timeStats, errors.Wrap(err, fname)
}
err = longtailutils.WriteToURI(storeIndexPath, prunedStoreIndexBuffer, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
err = longtailutils.WriteToURI(storeIndexPath, prunedStoreIndexBuffer.ToBuffer(), longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
timeStats = append(timeStats, longtailutils.TimeStat{"Write store index", time.Since(writeStoreIndexStartTime)})
if err != nil {
return storeStats, timeStats, errors.Wrap(err, fname)
Expand Down
6 changes: 4 additions & 2 deletions commands/cmd_upsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,9 @@ func upsync(
err = errors.Wrapf(err, "Failed serializing version index for `%s`", targetFilePath)
return storeStats, timeStats, errors.Wrapf(err, fname)
}
defer vbuffer.Dispose()

err = longtailutils.WriteToURI(targetFilePath, vbuffer, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
err = longtailutils.WriteToURI(targetFilePath, vbuffer.ToBuffer(), longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
if err != nil {
return storeStats, timeStats, errors.Wrapf(err, fname)
}
Expand All @@ -207,11 +208,12 @@ func upsync(
}
defer versionLocalStoreIndex.Dispose()
versionLocalStoreIndexBuffer, err := longtaillib.WriteStoreIndexToBuffer(versionLocalStoreIndex)
defer versionLocalStoreIndexBuffer.Dispose()
if err != nil {
err = errors.Wrapf(err, "Failed serializing store index for `%s`", versionLocalStoreIndexPath)
return storeStats, timeStats, errors.Wrapf(err, fname)
}
err = longtailutils.WriteToURI(versionLocalStoreIndexPath, versionLocalStoreIndexBuffer, longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
err = longtailutils.WriteToURI(versionLocalStoreIndexPath, versionLocalStoreIndexBuffer.ToBuffer(), longtailutils.WithS3EndpointResolverURI(s3EndpointResolverURI))
if err != nil {
return storeStats, timeStats, errors.Wrapf(err, fname)
}
Expand Down
56 changes: 36 additions & 20 deletions longtaillib/longtaillib.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,11 @@ type Longtail_ChunkerAPI struct {
cChunkerAPI *C.struct_Longtail_ChunkerAPI
}

// NativeBuffer pairs a pointer to a block of memory with its size in bytes.
// Dispose releases the block with Longtail_Free and ToBuffer exposes it as a
// Go byte slice without copying.
type NativeBuffer struct {
// buffer points at the start of the memory block; Dispose sets it to nil.
buffer unsafe.Pointer
// size is the length of the block in bytes; Dispose resets it to 0.
size C.size_t
}

var pointerIndex uint32
var pointerStore [1024]interface{}
var pointerIndexer = (*[1 << 30]C.uint32_t)(C.malloc(4 * 1024))
Expand Down Expand Up @@ -405,6 +410,25 @@ func UnrefPointer(ptr unsafe.Pointer) {
pointerStore[index] = nil
}

// ToBuffer returns a Go byte slice that aliases the memory held by b; no
// bytes are copied. It returns nil when b holds no memory or its size is zero.
//
// The returned slice refers to the same memory that Dispose frees, so it must
// not be used after Dispose has been called on b.
//
// The capacity of the slice is capped to its length so that an append on the
// result allocates a fresh Go array instead of writing past the end of the
// underlying allocation.
//
// NOTE: the conversion goes through a [1 << 30]byte array type, so a buffer
// larger than 1 GiB makes the slice expression panic.
func (b *NativeBuffer) ToBuffer() []byte {
	if b.buffer == nil || b.size == 0 {
		return nil
	}
	// Full slice expression: len == cap == b.size.
	return (*[1 << 30]byte)(b.buffer)[:b.size:b.size]
}

// Dispose frees the memory held by b with Longtail_Free and resets b to an
// empty state. Calling it on a buffer that holds no memory does nothing, so
// repeated calls are harmless.
func (b *NativeBuffer) Dispose() {
	if b.buffer == nil {
		return
	}
	C.Longtail_Free(b.buffer)
	b.buffer, b.size = nil, 0
}

// Size reports the number of bytes held by b, converted to int.
func (b *NativeBuffer) Size() int {
	length := int(b.size)
	return length
}

// ReadFromStorage ...
func (storageAPI *Longtail_StorageAPI) ReadFromStorage(rootPath string, path string) ([]byte, error) {
const fname = "ReadFromStorage"
Expand Down Expand Up @@ -1267,18 +1291,16 @@ func (blockIndex *Longtail_BlockIndex) Dispose() {
}
}

func WriteStoredBlockToBuffer(storedBlock Longtail_StoredBlock) ([]byte, error) {
func WriteStoredBlockToBuffer(storedBlock Longtail_StoredBlock) (NativeBuffer, error) {
const fname = "WriteStoredBlockToBuffer"

var buffer unsafe.Pointer
var size C.size_t
errno := C.Longtail_WriteStoredBlockToBuffer(storedBlock.cStoredBlock, &buffer, &size)
if errno != 0 {
return nil, errors.Wrap(errnoToError(errno), fname)
return NativeBuffer{}, errors.Wrap(errnoToError(errno), fname)
}
defer C.Longtail_Free(buffer)
bytes := C.GoBytes(buffer, C.int(size))
return bytes, nil
return NativeBuffer{buffer, size}, nil
}

func ReadStoredBlockFromBuffer(buffer []byte) (Longtail_StoredBlock, error) {
Expand Down Expand Up @@ -1541,18 +1563,16 @@ func (fileInfos Longtail_FileInfos) GetPath(index uint32) string {
}

// WriteBlockIndexToBuffer ...
func WriteBlockIndexToBuffer(index Longtail_BlockIndex) ([]byte, error) {
func WriteBlockIndexToBuffer(index Longtail_BlockIndex) (NativeBuffer, error) {
const fname = "WriteBlockIndexToBuffer"

var buffer unsafe.Pointer
size := C.size_t(0)
errno := C.Longtail_WriteBlockIndexToBuffer(index.cBlockIndex, &buffer, &size)
if errno != 0 {
return nil, errors.Wrap(errnoToError(errno), fname)
return NativeBuffer{}, errors.Wrap(errnoToError(errno), fname)
}
defer C.Longtail_Free(buffer)
bytes := C.GoBytes(buffer, C.int(size))
return bytes, nil
return NativeBuffer{buffer, size}, nil
}

// ReadBlockIndexFromBuffer ...
Expand Down Expand Up @@ -1628,18 +1648,16 @@ func CreateVersionIndex(
}

// WriteVersionIndexToBuffer ...
func WriteVersionIndexToBuffer(index Longtail_VersionIndex) ([]byte, error) {
func WriteVersionIndexToBuffer(index Longtail_VersionIndex) (NativeBuffer, error) {
const fname = "WriteVersionIndexToBuffer"

var buffer unsafe.Pointer
size := C.size_t(0)
errno := C.Longtail_WriteVersionIndexToBuffer(index.cVersionIndex, &buffer, &size)
if errno != 0 {
return nil, errors.Wrap(errnoToError(errno), fname)
return NativeBuffer{}, errors.Wrap(errnoToError(errno), fname)
}
defer C.Longtail_Free(buffer)
bytes := C.GoBytes(buffer, C.int(size))
return bytes, nil
return NativeBuffer{buffer, size}, nil
}

// WriteVersionIndex ...
Expand Down Expand Up @@ -1838,18 +1856,16 @@ func MergeStoreIndex(local_store_index Longtail_StoreIndex, remote_store_index L
}

// WriteStoreIndexToBuffer ...
func WriteStoreIndexToBuffer(index Longtail_StoreIndex) ([]byte, error) {
func WriteStoreIndexToBuffer(index Longtail_StoreIndex) (NativeBuffer, error) {
const fname = "WriteStoreIndexToBuffer"

var buffer unsafe.Pointer
size := C.size_t(0)
errno := C.Longtail_WriteStoreIndexToBuffer(index.cStoreIndex, &buffer, &size)
if errno != 0 {
return nil, errors.Wrap(errnoToError(errno), fname)
return NativeBuffer{}, errors.Wrap(errnoToError(errno), fname)
}
defer C.Longtail_Free(buffer)
bytes := C.GoBytes(buffer, C.int(size))
return bytes, nil
return NativeBuffer{buffer, size}, nil
}

// ReadStoreIndexFromBuffer ...
Expand Down
12 changes: 8 additions & 4 deletions longtaillib/longtaillib_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,8 @@ func validateStoredBlock(t *testing.T, storedBlock Longtail_StoredBlock, hashIde
blockIndex := storedBlock.GetBlockIndex()

b, _ := WriteBlockIndexToBuffer(blockIndex)
bi2, _ := ReadBlockIndexFromBuffer(b)
defer b.Dispose()
bi2, _ := ReadBlockIndexFromBuffer(b.ToBuffer())
bi2.Dispose()

if blockIndex.GetHashIdentifier() != hashIdentifier {
Expand Down Expand Up @@ -301,9 +302,10 @@ func Test_ReadWriteStoredBlockBuffer(t *testing.T) {
if err != nil {
t.Errorf("WriteStoredBlockToBuffer() %s", err)
}
defer storedBlockData.Dispose()
originalBlock.Dispose()

copyBlock, err := ReadStoredBlockFromBuffer(storedBlockData)
copyBlock, err := ReadStoredBlockFromBuffer(storedBlockData.ToBuffer())

if err != nil {
t.Errorf("InitStoredBlockFromData() %s", err)
Expand Down Expand Up @@ -965,7 +967,8 @@ func TestWriteContent(t *testing.T) {
}

b, _ := WriteVersionIndexToBuffer(versionIndex)
v2, _ := ReadVersionIndexFromBuffer(b)
defer b.Dispose()
v2, _ := ReadVersionIndexFromBuffer(b.ToBuffer())
v2.Dispose()

getExistingContentComplete := &testGetExistingContentCompletionAPI{}
Expand Down Expand Up @@ -1224,10 +1227,11 @@ func TestChangeVersion(t *testing.T) {
defer existingStoreIndex.Dispose()

b, err := WriteStoreIndexToBuffer(existingStoreIndex)
defer b.Dispose()
if err != nil {
t.Errorf("TestChangeVersion() WriteStoreIndexToBuffer() %s", err)
}
si2, err := ReadStoreIndexFromBuffer(b)
si2, err := ReadStoreIndexFromBuffer(b.ToBuffer())
if err != nil {
t.Errorf("TestChangeVersion() ReadStoreIndexFromBuffer() %s", err)
}
Expand Down
7 changes: 5 additions & 2 deletions longtailstorelib/memblobstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,16 @@ func (blobObject *memBlobObject) Write(data []byte) (bool, error) {
}
}

dataCopy := make([]byte, len(data))
copy(dataCopy, data)

if !exists {
blob = &memBlob{generation: 0, path: blobObject.path, data: data}
blob = &memBlob{generation: 0, path: blobObject.path, data: dataCopy}
blobObject.client.store.blobs[blobObject.path] = blob
return true, nil
}

blob.data = data
blob.data = dataCopy
blob.generation++
return true, nil
}
Expand Down
Loading

0 comments on commit 6686d9d

Please sign in to comment.