Skip to content

Commit

Permalink
Change file read API prototype for zero-copy
Browse files Browse the repository at this point in the history
  • Loading branch information
iychoi committed May 2, 2022
1 parent d5527d9 commit db43f07
Show file tree
Hide file tree
Showing 15 changed files with 345 additions and 241 deletions.
34 changes: 17 additions & 17 deletions fs/file_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,53 +135,53 @@ func (handle *FileHandle) Truncate(size int64) error {
return nil
}

// Read reads the file
func (handle *FileHandle) Read(length int) ([]byte, error) {
// Read reads the file, implements io.Reader.Read
func (handle *FileHandle) Read(buffer []byte) (int, error) {
handle.mutex.Lock()
defer handle.mutex.Unlock()

if !handle.IsReadMode() {
return nil, fmt.Errorf("file is opened with %s mode", handle.openmode)
return 0, fmt.Errorf("file is opened with %s mode", handle.openmode)
}

bytes, err := irods_fs.ReadDataObject(handle.connection, handle.irodsfilehandle, length)
if err != nil {
return nil, err
readLen, err := irods_fs.ReadDataObject(handle.connection, handle.irodsfilehandle, buffer)
if readLen > 0 {
handle.offset += int64(readLen)
}

handle.offset += int64(len(bytes))
return bytes, nil
// it is possible to return readLen + EOF
return readLen, err
}

// ReadAt reads data from given offset
func (handle *FileHandle) ReadAt(offset int64, length int) ([]byte, error) {
func (handle *FileHandle) ReadAt(buffer []byte, offset int64) (int, error) {
handle.mutex.Lock()
defer handle.mutex.Unlock()

if !handle.IsReadMode() {
return nil, fmt.Errorf("file is opened with %s mode", handle.openmode)
return 0, fmt.Errorf("file is opened with %s mode", handle.openmode)
}

if handle.offset != offset {
newOffset, err := irods_fs.SeekDataObject(handle.connection, handle.irodsfilehandle, offset, types.SeekSet)
if err != nil {
return nil, err
return 0, err
}

handle.offset = newOffset

if newOffset != offset {
return nil, fmt.Errorf("failed to seek to %d", offset)
return 0, fmt.Errorf("failed to seek to %d", offset)
}
}

bytes, err := irods_fs.ReadDataObject(handle.connection, handle.irodsfilehandle, length)
if err != nil {
return nil, err
readLen, err := irods_fs.ReadDataObject(handle.connection, handle.irodsfilehandle, buffer)
if readLen > 0 {
handle.offset += int64(readLen)
}

handle.offset += int64(len(bytes))
return bytes, nil
// it is possible to return readLen + EOF
return readLen, err
}

// Write writes the file
Expand Down
123 changes: 39 additions & 84 deletions irods/connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,18 +158,13 @@ func (conn *IRODSConnection) connectWithCSNegotiation() (*types.IRODSVersion, er
logger.Debug("Start up a connection with CS Negotiation")

startup := message.NewIRODSMessageStartupPack(conn.account, conn.applicationName, true)
startupMessage, err := startup.GetMessage()
err := conn.RequestWithoutResponse(startup)
if err != nil {
return nil, fmt.Errorf("could not make a startup message - %v", err)
}

err = conn.SendMessage(startupMessage)
if err != nil {
return nil, fmt.Errorf("could not send a startup message - %v", err)
return nil, fmt.Errorf("could not send a startup - %v", err)
}

// Server responds with negotiation response
negotiationMessage, err := conn.ReadMessage()
negotiationMessage, err := conn.ReadMessage(nil)
if err != nil {
return nil, fmt.Errorf("could not receive a negotiation message - %v", err)
}
Expand Down Expand Up @@ -216,7 +211,7 @@ func (conn *IRODSConnection) connectWithCSNegotiation() (*types.IRODSVersion, er
// Send negotiation result to server
negotiationResult := message.NewIRODSMessageCSNegotiation(policyResult)
version := message.IRODSMessageVersion{}
err = conn.Request(negotiationResult, &version)
err = conn.Request(negotiationResult, &version, nil)
if err != nil {
return nil, fmt.Errorf("could not receive a version message - %v", err)
}
Expand Down Expand Up @@ -247,7 +242,7 @@ func (conn *IRODSConnection) connectWithoutCSNegotiation() (*types.IRODSVersion,

startup := message.NewIRODSMessageStartupPack(conn.account, conn.applicationName, false)
version := message.IRODSMessageVersion{}
err := conn.Request(startup, &version)
err := conn.Request(startup, &version, nil)
if err != nil {
return nil, fmt.Errorf("could not receive a version message - %v", err)
}
Expand Down Expand Up @@ -300,24 +295,14 @@ func (conn *IRODSConnection) sslStartup() error {

// Send a ssl setting
sslSetting := message.NewIRODSMessageSSLSettings(irodsSSLConfig.EncryptionAlgorithm, irodsSSLConfig.EncryptionKeySize, irodsSSLConfig.SaltSize, irodsSSLConfig.HashRounds)
sslSettingMessage, err := sslSetting.GetMessage()
if err != nil {
return fmt.Errorf("could not make a ssl setting message - %v", err)
}

err = conn.SendMessage(sslSettingMessage)
err = conn.RequestWithoutResponse(sslSetting)
if err != nil {
return fmt.Errorf("could not send a ssl setting message - %v", err)
}

// Send a shared secret
sslSharedSecret := message.NewIRODSMessageSSLSharedSecret(encryptionKey)
sslSharedSecretMessage, err := sslSharedSecret.GetMessage()
if err != nil {
return fmt.Errorf("could not make a ssl shared secret message - %v", err)
}

err = conn.SendMessage(sslSharedSecretMessage)
err = conn.RequestWithoutResponse(sslSharedSecret)
if err != nil {
return fmt.Errorf("could not send a ssl shared secret message - %v", err)
}
Expand All @@ -329,7 +314,7 @@ func (conn *IRODSConnection) login(password string) error {
// authenticate
authRequest := message.NewIRODSMessageAuthRequest()
authChallenge := message.IRODSMessageAuthChallenge{}
err := conn.Request(authRequest, &authChallenge)
err := conn.Request(authRequest, &authChallenge, nil)
if err != nil {
return fmt.Errorf("could not receive an authentication challenge message body")
}
Expand All @@ -341,7 +326,7 @@ func (conn *IRODSConnection) login(password string) error {

authResponse := message.NewIRODSMessageAuthResponse(encodedPassword, conn.account.ProxyUser)
authResult := message.IRODSMessageAuthResult{}
return conn.RequestAndCheck(authResponse, &authResult)
return conn.RequestAndCheck(authResponse, &authResult, nil)
}

func (conn *IRODSConnection) loginNative(password string) error {
Expand Down Expand Up @@ -381,7 +366,7 @@ func (conn *IRODSConnection) loginPAM() error {
// authenticate
pamAuthRequest := message.NewIRODSMessagePamAuthRequest(conn.account.ClientUser, conn.account.Password, ttl)
pamAuthResponse := message.IRODSMessagePamAuthResponse{}
err := conn.Request(pamAuthRequest, &pamAuthResponse)
err := conn.Request(pamAuthRequest, &pamAuthResponse, nil)
if err != nil {
return fmt.Errorf("could not receive an authentication challenge message")
}
Expand All @@ -406,7 +391,7 @@ func (conn *IRODSConnection) showTicket() error {
// show the ticket
ticketRequest := message.NewIRODSMessageTicketAdminRequest("session", conn.account.Ticket)
ticketResult := message.IRODSMessageTicketAdminResponse{}
return conn.RequestAndCheck(ticketRequest, &ticketResult)
return conn.RequestAndCheck(ticketRequest, &ticketResult, nil)
}
return nil
}
Expand All @@ -433,14 +418,9 @@ func (conn *IRODSConnection) Disconnect() error {
logger.Debug("Disconnecting the connection")

disconnect := message.NewIRODSMessageDisconnect()
disconnectMessage, err := disconnect.GetMessage()
err := conn.RequestWithoutResponse(disconnect)
if err != nil {
return fmt.Errorf("could not make a disconnect request message - %v", err)
}

err = conn.SendMessage(disconnectMessage)
if err != nil {
return fmt.Errorf("could not send a disconnect request message - %v", err)
return fmt.Errorf("could not send a disconnect request - %v", err)
}

conn.lastSuccessfulAccess = time.Now()
Expand Down Expand Up @@ -589,7 +569,7 @@ func (conn *IRODSConnection) SendMessage(msg *message.IRODSMessage) error {
}

// readMessageHeader reads data from the given connection and returns iRODSMessageHeader
func (conn *IRODSConnection) ReadMessageHeader() (*message.IRODSMessageHeader, error) {
func (conn *IRODSConnection) readMessageHeader() (*message.IRODSMessageHeader, error) {
// read header size
headerLenBuffer := make([]byte, 4)
readLen, err := conn.Recv(headerLenBuffer, 4)
Expand Down Expand Up @@ -625,26 +605,41 @@ func (conn *IRODSConnection) ReadMessageHeader() (*message.IRODSMessageHeader, e
}

// ReadMessage reads data from the given socket and returns IRODSMessage
func (conn *IRODSConnection) ReadMessage() (*message.IRODSMessage, error) {
header, err := conn.ReadMessageHeader()
// if bsBuffer is given, bs data will be written directly to the bsBuffer
// if not given, a new buffer will be allocated.
func (conn *IRODSConnection) ReadMessage(bsBuffer []byte) (*message.IRODSMessage, error) {
header, err := conn.readMessageHeader()
if err != nil {
return nil, err
}

// read body
bodyLen := header.MessageLen + header.ErrorLen + header.BsLen
bodyLen := header.MessageLen + header.ErrorLen
bodyBuffer := make([]byte, bodyLen)
if bsBuffer == nil {
bsBuffer = make([]byte, int(header.BsLen))
} else if len(bsBuffer) < int(header.BsLen) {
return nil, fmt.Errorf("provided bs buffer is too short, %d size is given, but %d size is required", len(bsBuffer), int(header.BsLen))
}

readLen, err := conn.Recv(bodyBuffer, int(bodyLen))
bodyReadLen, err := conn.Recv(bodyBuffer, int(bodyLen))
if err != nil {
return nil, fmt.Errorf("could not read body - %v", err)
}
if readLen != int(bodyLen) {
return nil, fmt.Errorf("could not read body fully - %d requested but %d read", bodyLen, readLen)
if bodyReadLen != int(bodyLen) {
return nil, fmt.Errorf("could not read body fully - %d requested but %d read", bodyLen, bodyReadLen)
}

bsReadLen, err := conn.Recv(bsBuffer, int(header.BsLen))
if err != nil {
return nil, fmt.Errorf("could not read body (BS) - %v", err)
}
if bsReadLen != int(header.BsLen) {
return nil, fmt.Errorf("could not read body (BS) fully - %d requested but %d read", int(header.BsLen), bsReadLen)
}

body := message.IRODSMessageBody{}
err = body.FromBytes(header, bodyBuffer)
err = body.FromBytes(header, bodyBuffer, bsBuffer[:int(header.BsLen)])
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -684,59 +679,19 @@ func (conn *IRODSConnection) PoorMansRollback() error {

func (conn *IRODSConnection) endTransaction(commit bool) error {
request := message.NewIRODSMessageEndTransactionRequest(commit)
requestMessage, err := request.GetMessage()
if err != nil {
return fmt.Errorf("could not make a end transaction request message - %v", err)
}

err = conn.SendMessage(requestMessage)
if err != nil {
return fmt.Errorf("could not send a end transaction request message - %v", err)
}

// Server responds with results
responseMessage, err := conn.ReadMessage()
if err != nil {
return fmt.Errorf("could not receive a end transaction response message - %v", err)
}

response := message.IRODSMessageEndTransactionResponse{}
err = response.FromMessage(responseMessage)
if err != nil {
return fmt.Errorf("could not receive a end transaction response message - %v", err)
}

err = response.CheckError()
return err
return conn.RequestAndCheck(request, &response, nil)
}

func (conn *IRODSConnection) poorMansEndTransaction(dummyCol string, commit bool) error {
request := message.NewIRODSMessageModColRequest(dummyCol)

if commit {
request.AddKeyVal(common.COLLECTION_TYPE_KW, "NULL_SPECIAL_VALUE")
}

requestMessage, err := request.GetMessage()
if err != nil {
return fmt.Errorf("could not make a poor mans end transaction request message - %v", err)
}

err = conn.SendMessage(requestMessage)
if err != nil {
return fmt.Errorf("could not send a poor mans end transaction request message - %v", err)
}

// Server responds with results
responseMessage, err := conn.ReadMessage()
if err != nil {
return fmt.Errorf("could not receive a poor mans end transaction response message - %v", err)
}

response := message.IRODSMessageModColResponse{}
err = response.FromMessage(responseMessage)
err := conn.Request(request, &response, nil)
if err != nil {
return fmt.Errorf("could not receive a poor mans end transaction response message - %v", err)
return fmt.Errorf("could not make a poor mans end transaction - %v", err)
}

if !commit {
Expand Down
31 changes: 27 additions & 4 deletions irods/connection/request_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ type CheckErrorResponse interface {
}

// Request sends a request and expects a response.
func (conn *IRODSConnection) Request(request Request, response Response) error {
// bsBuffer is optional
func (conn *IRODSConnection) Request(request Request, response Response, bsBuffer []byte) error {
requestMessage, err := request.GetMessage()
if err != nil {
return fmt.Errorf("could not make a request message - %v", err)
Expand All @@ -41,7 +42,8 @@ func (conn *IRODSConnection) Request(request Request, response Response) error {
}

// Server responds with results
responseMessage, err := conn.ReadMessage()
// external bs buffer
responseMessage, err := conn.ReadMessage(bsBuffer)
if err != nil {
return fmt.Errorf("could not receive a response message - %v", err)
}
Expand All @@ -60,9 +62,30 @@ func (conn *IRODSConnection) Request(request Request, response Response) error {
return nil
}

// RequestWithoutResponse sends a request but does not wait for a response.
func (conn *IRODSConnection) RequestWithoutResponse(request Request) error {
requestMessage, err := request.GetMessage()
if err != nil {
return fmt.Errorf("could not make a request message - %v", err)
}

// translate xml.Marshal XML into irods-understandable XML (among others, replace &#34; by &quot;)
err = conn.PreprocessMessage(requestMessage)
if err != nil {
return fmt.Errorf("could not send preprocess message - %v", err)
}

err = conn.SendMessage(requestMessage)
if err != nil {
return fmt.Errorf("could not send a request message - %v", err)
}

return nil
}

// RequestAndCheck sends a request and expects a CheckErrorResponse, on which the error is already checked.
func (conn *IRODSConnection) RequestAndCheck(request Request, response CheckErrorResponse) error {
if err := conn.Request(request, response); err != nil {
func (conn *IRODSConnection) RequestAndCheck(request Request, response CheckErrorResponse, bsBuffer []byte) error {
if err := conn.Request(request, response, bsBuffer); err != nil {
return err
}

Expand Down
Loading

0 comments on commit db43f07

Please sign in to comment.