From 23e857dd2d15b0cc49e35889c6d77c2ce552f84b Mon Sep 17 00:00:00 2001 From: Illyoung Choi Date: Mon, 2 May 2022 12:52:48 -0700 Subject: [PATCH] Update file write API for zero-copy --- fs/file_handle.go | 24 ++++++++++++------------ irods/connection/connection.go | 9 ++++++++- irods/message/message.go | 14 ++++++++++++++ test/testcases/fs_test.go | 12 ++++++------ 4 files changed, 40 insertions(+), 19 deletions(-) diff --git a/fs/file_handle.go b/fs/file_handle.go index c2a62c9..82ac58d 100644 --- a/fs/file_handle.go +++ b/fs/file_handle.go @@ -109,11 +109,11 @@ func (handle *FileHandle) closeWithoutFSHandleManagement() error { } // Seek moves file pointer -func (handle *FileHandle) Seek(offset int64, whence types.Whence) (int64, error) { +func (handle *FileHandle) Seek(offset int64, whence int) (int64, error) { handle.mutex.Lock() defer handle.mutex.Unlock() - newOffset, err := irods_fs.SeekDataObject(handle.connection, handle.irodsfilehandle, offset, whence) + newOffset, err := irods_fs.SeekDataObject(handle.connection, handle.irodsfilehandle, offset, types.Whence(whence)) if err != nil { return newOffset, err } @@ -185,17 +185,17 @@ func (handle *FileHandle) ReadAt(buffer []byte, offset int64) (int, error) { } // Write writes the file -func (handle *FileHandle) Write(data []byte) error { +func (handle *FileHandle) Write(data []byte) (int, error) { handle.mutex.Lock() defer handle.mutex.Unlock() if !handle.IsWriteMode() { - return fmt.Errorf("file is opened with %s mode", handle.openmode) + return 0, fmt.Errorf("file is opened with %s mode", handle.openmode) } err := irods_fs.WriteDataObject(handle.connection, handle.irodsfilehandle, data) if err != nil { - return err + return 0, err } handle.offset += int64(len(data)) @@ -205,34 +205,34 @@ func (handle *FileHandle) Write(data []byte) error { handle.entry.Size = handle.offset + int64(len(data)) } - return nil + return len(data), nil } // WriteAt writes the file to given offset -func (handle *FileHandle) WriteAt(offset int64, data []byte) error { +func (handle *FileHandle) WriteAt(data []byte, offset int64) (int, error) { handle.mutex.Lock() defer handle.mutex.Unlock() if !handle.IsWriteMode() { - return fmt.Errorf("file is opened with %s mode", handle.openmode) + return 0, fmt.Errorf("file is opened with %s mode", handle.openmode) } if handle.offset != offset { newOffset, err := irods_fs.SeekDataObject(handle.connection, handle.irodsfilehandle, offset, types.SeekSet) if err != nil { - return err + return 0, err } handle.offset = newOffset if newOffset != offset { - return fmt.Errorf("failed to seek to %d", offset) + return 0, fmt.Errorf("failed to seek to %d", offset) } } err := irods_fs.WriteDataObject(handle.connection, handle.irodsfilehandle, data) if err != nil { - return err + return 0, err } handle.offset += int64(len(data)) @@ -242,7 +242,7 @@ func (handle *FileHandle) WriteAt(offset int64, data []byte) error { handle.entry.Size = handle.offset + int64(len(data)) } - return nil + return len(data), nil } // preprocessRename should be called before the file is renamed diff --git a/irods/connection/connection.go b/irods/connection/connection.go index 0da812d..fa45b20 100644 --- a/irods/connection/connection.go +++ b/irods/connection/connection.go @@ -553,7 +553,7 @@ func (conn *IRODSConnection) SendMessage(msg *message.IRODSMessage) error { messageBuffer.Write(headerBytes) if msg.Body != nil { - bodyBytes, err := msg.Body.GetBytes() + bodyBytes, err := msg.Body.GetBytesWithoutBS() if err != nil { return err } @@ -565,6 +565,13 @@ func (conn *IRODSConnection) SendMessage(msg *message.IRODSMessage) error { // send bytes := messageBuffer.Bytes() conn.Send(bytes, len(bytes)) + + // send body-bs + if msg.Body != nil { + if msg.Body.Bs != nil { + conn.Send(msg.Body.Bs, len(msg.Body.Bs)) + } + } return nil } diff --git a/irods/message/message.go b/irods/message/message.go index cb5a8ba..afeed13 100644 --- a/irods/message/message.go +++ b/irods/message/message.go @@ -81,6 +81,20 @@ func (body *IRODSMessageBody) GetBytes() ([]byte, error) { return messageBuffer.Bytes(), nil } +// GetBytesWithoutBS returns byte array of body without BS +func (body *IRODSMessageBody) GetBytesWithoutBS() ([]byte, error) { + messageBuffer := new(bytes.Buffer) + if body.Message != nil { + messageBuffer.Write(body.Message) + } + + if body.Error != nil { + messageBuffer.Write(body.Error) + } + + return messageBuffer.Bytes(), nil +} + // FromBytes returns struct from bytes func (body *IRODSMessageBody) FromBytes(header *IRODSMessageHeader, bodyBytes []byte, bsBytes []byte) error { if len(bodyBytes) < (int(header.MessageLen) + int(header.ErrorLen)) { diff --git a/test/testcases/fs_test.go b/test/testcases/fs_test.go index 5266634..a31608d 100644 --- a/test/testcases/fs_test.go +++ b/test/testcases/fs_test.go @@ -128,7 +128,7 @@ func testReadWrite(t *testing.T) { handle, err := filesystem.CreateFile(newDataObjectPath, "", "w") assert.NoError(t, err) - err = handle.Write([]byte(text)) + _, err = handle.Write([]byte(text)) assert.NoError(t, err) err = handle.Close() @@ -185,7 +185,7 @@ func testCreateStat(t *testing.T) { assert.Equal(t, fs.FileEntry, stat.Type) // write - err = handle.Write([]byte(text)) + _, err = handle.Write([]byte(text)) assert.NoError(t, err) // close @@ -239,7 +239,7 @@ func testWriteRename(t *testing.T) { assert.NoError(t, err) // write - err = handle.Write([]byte(text1)) + _, err = handle.Write([]byte(text1)) assert.NoError(t, err) // rename @@ -247,7 +247,7 @@ func testWriteRename(t *testing.T) { assert.NoError(t, err) // write again - err = handle.Write([]byte(text2)) + _, err = handle.Write([]byte(text2)) assert.NoError(t, err) // close @@ -307,7 +307,7 @@ func testWriteRenameDir(t *testing.T) { assert.NoError(t, err) // write - err = handle.Write([]byte(text1)) + _, err = handle.Write([]byte(text1)) assert.NoError(t, err) // rename @@ -315,7 +315,7 @@ func testWriteRenameDir(t *testing.T) { assert.NoError(t, err) // write again - err = handle.Write([]byte(text2)) + _, err = handle.Write([]byte(text2)) assert.NoError(t, err) // close