Skip to content

Commit

Permalink
Update file write API for zero-copy
Browse files Browse the repository at this point in the history
  • Loading branch information
iychoi committed May 2, 2022
1 parent db43f07 commit 23e857d
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 19 deletions.
24 changes: 12 additions & 12 deletions fs/file_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,11 @@ func (handle *FileHandle) closeWithoutFSHandleManagement() error {
}

// Seek moves file pointer
func (handle *FileHandle) Seek(offset int64, whence types.Whence) (int64, error) {
func (handle *FileHandle) Seek(offset int64, whence int) (int64, error) {
handle.mutex.Lock()
defer handle.mutex.Unlock()

newOffset, err := irods_fs.SeekDataObject(handle.connection, handle.irodsfilehandle, offset, whence)
newOffset, err := irods_fs.SeekDataObject(handle.connection, handle.irodsfilehandle, offset, types.Whence(whence))
if err != nil {
return newOffset, err
}
Expand Down Expand Up @@ -185,17 +185,17 @@ func (handle *FileHandle) ReadAt(buffer []byte, offset int64) (int, error) {
}

// Write writes the file
func (handle *FileHandle) Write(data []byte) error {
func (handle *FileHandle) Write(data []byte) (int, error) {
handle.mutex.Lock()
defer handle.mutex.Unlock()

if !handle.IsWriteMode() {
return fmt.Errorf("file is opened with %s mode", handle.openmode)
return 0, fmt.Errorf("file is opened with %s mode", handle.openmode)
}

err := irods_fs.WriteDataObject(handle.connection, handle.irodsfilehandle, data)
if err != nil {
return err
return 0, err
}

handle.offset += int64(len(data))
Expand All @@ -205,34 +205,34 @@ func (handle *FileHandle) Write(data []byte) error {
handle.entry.Size = handle.offset + int64(len(data))
}

return nil
return len(data), nil
}

// WriteAt writes the file to given offset
func (handle *FileHandle) WriteAt(offset int64, data []byte) error {
func (handle *FileHandle) WriteAt(data []byte, offset int64) (int, error) {
handle.mutex.Lock()
defer handle.mutex.Unlock()

if !handle.IsWriteMode() {
return fmt.Errorf("file is opened with %s mode", handle.openmode)
return 0, fmt.Errorf("file is opened with %s mode", handle.openmode)
}

if handle.offset != offset {
newOffset, err := irods_fs.SeekDataObject(handle.connection, handle.irodsfilehandle, offset, types.SeekSet)
if err != nil {
return err
return 0, err
}

handle.offset = newOffset

if newOffset != offset {
return fmt.Errorf("failed to seek to %d", offset)
return 0, fmt.Errorf("failed to seek to %d", offset)
}
}

err := irods_fs.WriteDataObject(handle.connection, handle.irodsfilehandle, data)
if err != nil {
return err
return 0, err
}

handle.offset += int64(len(data))
Expand All @@ -242,7 +242,7 @@ func (handle *FileHandle) WriteAt(offset int64, data []byte) error {
handle.entry.Size = handle.offset + int64(len(data))
}

return nil
return len(data), nil
}

// preprocessRename should be called before the file is renamed
Expand Down
9 changes: 8 additions & 1 deletion irods/connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ func (conn *IRODSConnection) SendMessage(msg *message.IRODSMessage) error {
messageBuffer.Write(headerBytes)

if msg.Body != nil {
bodyBytes, err := msg.Body.GetBytes()
bodyBytes, err := msg.Body.GetBytesWithoutBS()
if err != nil {
return err
}
Expand All @@ -565,6 +565,13 @@ func (conn *IRODSConnection) SendMessage(msg *message.IRODSMessage) error {
// send
bytes := messageBuffer.Bytes()
conn.Send(bytes, len(bytes))

// send body-bs
if msg.Body != nil {
if msg.Body.Bs != nil {
conn.Send(msg.Body.Bs, len(msg.Body.Bs))
}
}
return nil
}

Expand Down
14 changes: 14 additions & 0 deletions irods/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,20 @@ func (body *IRODSMessageBody) GetBytes() ([]byte, error) {
return messageBuffer.Bytes(), nil
}

// GetBytesWithoutBS returns byte array of body without BS
func (body *IRODSMessageBody) GetBytesWithoutBS() ([]byte, error) {
messageBuffer := new(bytes.Buffer)
if body.Message != nil {
messageBuffer.Write(body.Message)
}

if body.Error != nil {
messageBuffer.Write(body.Error)
}

return messageBuffer.Bytes(), nil
}

// FromBytes returns struct from bytes
func (body *IRODSMessageBody) FromBytes(header *IRODSMessageHeader, bodyBytes []byte, bsBytes []byte) error {
if len(bodyBytes) < (int(header.MessageLen) + int(header.ErrorLen)) {
Expand Down
12 changes: 6 additions & 6 deletions test/testcases/fs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func testReadWrite(t *testing.T) {
handle, err := filesystem.CreateFile(newDataObjectPath, "", "w")
assert.NoError(t, err)

err = handle.Write([]byte(text))
_, err = handle.Write([]byte(text))
assert.NoError(t, err)

err = handle.Close()
Expand Down Expand Up @@ -185,7 +185,7 @@ func testCreateStat(t *testing.T) {
assert.Equal(t, fs.FileEntry, stat.Type)

// write
err = handle.Write([]byte(text))
_, err = handle.Write([]byte(text))
assert.NoError(t, err)

// close
Expand Down Expand Up @@ -239,15 +239,15 @@ func testWriteRename(t *testing.T) {
assert.NoError(t, err)

// write
err = handle.Write([]byte(text1))
_, err = handle.Write([]byte(text1))
assert.NoError(t, err)

// rename
err = filesystem.RenameFile(newDataObjectPath, newDataObjectPathRenameTarget)
assert.NoError(t, err)

// write again
err = handle.Write([]byte(text2))
_, err = handle.Write([]byte(text2))
assert.NoError(t, err)

// close
Expand Down Expand Up @@ -307,15 +307,15 @@ func testWriteRenameDir(t *testing.T) {
assert.NoError(t, err)

// write
err = handle.Write([]byte(text1))
_, err = handle.Write([]byte(text1))
assert.NoError(t, err)

// rename
err = filesystem.RenameDir(newdir, newdirRenameTarget)
assert.NoError(t, err)

// write again
err = handle.Write([]byte(text2))
_, err = handle.Write([]byte(text2))
assert.NoError(t, err)

// close
Expand Down

0 comments on commit 23e857d

Please sign in to comment.