From db43f07f767a4c49b5e7724dbfcc7440956ea195 Mon Sep 17 00:00:00 2001 From: Illyoung Choi Date: Mon, 2 May 2022 12:05:43 -0700 Subject: [PATCH] Change file read API prototype for zero-copy --- fs/file_handle.go | 34 ++++---- irods/connection/connection.go | 123 +++++++++------------------ irods/connection/request_response.go | 31 ++++++- irods/fs/admin.go | 18 ++-- irods/fs/collection.go | 28 +++--- irods/fs/data_object.go | 83 +++++++++--------- irods/fs/data_object_bulk.go | 76 +++++++++-------- irods/fs/resource.go | 6 +- irods/fs/ticket.go | 4 +- irods/fs/usergroup.go | 20 ++--- irods/message/message.go | 18 ++-- test/testcases/fs_api_test.go | 15 ++-- test/testcases/fs_io_test.go | 68 +++++++++++++++ test/testcases/fs_test.go | 29 ++++--- test/testcases/main.go | 33 +++++++ 15 files changed, 345 insertions(+), 241 deletions(-) create mode 100644 test/testcases/fs_io_test.go diff --git a/fs/file_handle.go b/fs/file_handle.go index 4c4690a..c2a62c9 100644 --- a/fs/file_handle.go +++ b/fs/file_handle.go @@ -135,53 +135,53 @@ func (handle *FileHandle) Truncate(size int64) error { return nil } -// Read reads the file -func (handle *FileHandle) Read(length int) ([]byte, error) { +// Read reads the file, implements io.Reader.Read +func (handle *FileHandle) Read(buffer []byte) (int, error) { handle.mutex.Lock() defer handle.mutex.Unlock() if !handle.IsReadMode() { - return nil, fmt.Errorf("file is opened with %s mode", handle.openmode) + return 0, fmt.Errorf("file is opened with %s mode", handle.openmode) } - bytes, err := irods_fs.ReadDataObject(handle.connection, handle.irodsfilehandle, length) - if err != nil { - return nil, err + readLen, err := irods_fs.ReadDataObject(handle.connection, handle.irodsfilehandle, buffer) + if readLen > 0 { + handle.offset += int64(readLen) } - handle.offset += int64(len(bytes)) - return bytes, nil + // it is possible to return readLen + EOF + return readLen, err } // ReadAt reads data from given offset -func (handle *FileHandle) ReadAt(offset int64, length int) ([]byte, error) { +func (handle *FileHandle) ReadAt(buffer []byte, offset int64) (int, error) { handle.mutex.Lock() defer handle.mutex.Unlock() if !handle.IsReadMode() { - return nil, fmt.Errorf("file is opened with %s mode", handle.openmode) + return 0, fmt.Errorf("file is opened with %s mode", handle.openmode) } if handle.offset != offset { newOffset, err := irods_fs.SeekDataObject(handle.connection, handle.irodsfilehandle, offset, types.SeekSet) if err != nil { - return nil, err + return 0, err } handle.offset = newOffset if newOffset != offset { - return nil, fmt.Errorf("failed to seek to %d", offset) + return 0, fmt.Errorf("failed to seek to %d", offset) } } - bytes, err := irods_fs.ReadDataObject(handle.connection, handle.irodsfilehandle, length) - if err != nil { - return nil, err + readLen, err := irods_fs.ReadDataObject(handle.connection, handle.irodsfilehandle, buffer) + if readLen > 0 { + handle.offset += int64(readLen) } - handle.offset += int64(len(bytes)) - return bytes, nil + // it is possible to return readLen + EOF + return readLen, err } // Write writes the file diff --git a/irods/connection/connection.go b/irods/connection/connection.go index d27cde5..0da812d 100644 --- a/irods/connection/connection.go +++ b/irods/connection/connection.go @@ -158,18 +158,13 @@ func (conn *IRODSConnection) connectWithCSNegotiation() (*types.IRODSVersion, er logger.Debug("Start up a connection with CS Negotiation") startup := message.NewIRODSMessageStartupPack(conn.account, conn.applicationName, true) - startupMessage, err := startup.GetMessage() + err := conn.RequestWithoutResponse(startup) if err != nil { - return nil, fmt.Errorf("could not make a startup message - %v", err) - } - - err = conn.SendMessage(startupMessage) - if err != nil { - return nil, fmt.Errorf("could not send a startup message - %v", err) + return nil, fmt.Errorf("could not send a startup - %v", err) } // Server responds with negotiation response - negotiationMessage, err := conn.ReadMessage() + negotiationMessage, err := conn.ReadMessage(nil) if err != nil { return nil, fmt.Errorf("could not receive a negotiation message - %v", err) } @@ -216,7 +211,7 @@ func (conn *IRODSConnection) connectWithCSNegotiation() (*types.IRODSVersion, er // Send negotiation result to server negotiationResult := message.NewIRODSMessageCSNegotiation(policyResult) version := message.IRODSMessageVersion{} - err = conn.Request(negotiationResult, &version) + err = conn.Request(negotiationResult, &version, nil) if err != nil { return nil, fmt.Errorf("could not receive a version message - %v", err) } @@ -247,7 +242,7 @@ func (conn *IRODSConnection) connectWithoutCSNegotiation() (*types.IRODSVersion, startup := message.NewIRODSMessageStartupPack(conn.account, conn.applicationName, false) version := message.IRODSMessageVersion{} - err := conn.Request(startup, &version) + err := conn.Request(startup, &version, nil) if err != nil { return nil, fmt.Errorf("could not receive a version message - %v", err) } @@ -300,24 +295,14 @@ func (conn *IRODSConnection) sslStartup() error { // Send a ssl setting sslSetting := message.NewIRODSMessageSSLSettings(irodsSSLConfig.EncryptionAlgorithm, irodsSSLConfig.EncryptionKeySize, irodsSSLConfig.SaltSize, irodsSSLConfig.HashRounds) - sslSettingMessage, err := sslSetting.GetMessage() - if err != nil { - return fmt.Errorf("could not make a ssl setting message - %v", err) - } - - err = conn.SendMessage(sslSettingMessage) + err = conn.RequestWithoutResponse(sslSetting) if err != nil { return fmt.Errorf("could not send a ssl setting message - %v", err) } // Send a shared secret sslSharedSecret := message.NewIRODSMessageSSLSharedSecret(encryptionKey) - sslSharedSecretMessage, err := sslSharedSecret.GetMessage() - if err != nil { - return fmt.Errorf("could not make a ssl shared secret message - %v", err) - } - - err = conn.SendMessage(sslSharedSecretMessage) + err = conn.RequestWithoutResponse(sslSharedSecret) if err != nil { return fmt.Errorf("could not send a ssl shared secret message - %v", err) } @@ -329,7 +314,7 @@ func (conn *IRODSConnection) login(password string) error { // authenticate authRequest := message.NewIRODSMessageAuthRequest() authChallenge := message.IRODSMessageAuthChallenge{} - err := conn.Request(authRequest, &authChallenge) + err := conn.Request(authRequest, &authChallenge, nil) if err != nil { return fmt.Errorf("could not receive an authentication challenge message body") } @@ -341,7 +326,7 @@ func (conn *IRODSConnection) login(password string) error { authResponse := message.NewIRODSMessageAuthResponse(encodedPassword, conn.account.ProxyUser) authResult := message.IRODSMessageAuthResult{} - return conn.RequestAndCheck(authResponse, &authResult) + return conn.RequestAndCheck(authResponse, &authResult, nil) } func (conn *IRODSConnection) loginNative(password string) error { @@ -381,7 +366,7 @@ func (conn *IRODSConnection) loginPAM() error { // authenticate pamAuthRequest := message.NewIRODSMessagePamAuthRequest(conn.account.ClientUser, conn.account.Password, ttl) pamAuthResponse := message.IRODSMessagePamAuthResponse{} - err := conn.Request(pamAuthRequest, &pamAuthResponse) + err := conn.Request(pamAuthRequest, &pamAuthResponse, nil) if err != nil { return fmt.Errorf("could not receive an authentication challenge message") } @@ -406,7 +391,7 @@ func (conn *IRODSConnection) showTicket() error { // show the ticket ticketRequest := message.NewIRODSMessageTicketAdminRequest("session", conn.account.Ticket) ticketResult := message.IRODSMessageTicketAdminResponse{} - return conn.RequestAndCheck(ticketRequest, &ticketResult) + return conn.RequestAndCheck(ticketRequest, &ticketResult, nil) } return nil } @@ -433,14 +418,9 @@ func (conn *IRODSConnection) Disconnect() error { logger.Debug("Disconnecting the connection") disconnect := message.NewIRODSMessageDisconnect() - disconnectMessage, err := disconnect.GetMessage() + err := conn.RequestWithoutResponse(disconnect) if err != nil { - return fmt.Errorf("could not make a disconnect request message - %v", err) - } - - err = conn.SendMessage(disconnectMessage) - if err != nil { - return fmt.Errorf("could not send a disconnect request message - %v", err) + return fmt.Errorf("could not send a disconnect request - %v", err) } conn.lastSuccessfulAccess = time.Now() @@ -589,7 +569,7 @@ func (conn *IRODSConnection) SendMessage(msg *message.IRODSMessage) error { } // readMessageHeader reads data from the given connection and returns iRODSMessageHeader -func (conn *IRODSConnection) ReadMessageHeader() (*message.IRODSMessageHeader, error) { +func (conn *IRODSConnection) readMessageHeader() (*message.IRODSMessageHeader, error) { // read header size headerLenBuffer := make([]byte, 4) readLen, err := conn.Recv(headerLenBuffer, 4) @@ -625,26 +605,41 @@ func (conn *IRODSConnection) ReadMessageHeader() (*message.IRODSMessageHeader, e } // ReadMessage reads data from the given socket and returns IRODSMessage -func (conn *IRODSConnection) ReadMessage() (*message.IRODSMessage, error) { - header, err := conn.ReadMessageHeader() +// if bsBuffer is given, bs data will be written directly to the bsBuffer +// if not given, a new buffer will be allocated. +func (conn *IRODSConnection) ReadMessage(bsBuffer []byte) (*message.IRODSMessage, error) { + header, err := conn.readMessageHeader() if err != nil { return nil, err } // read body - bodyLen := header.MessageLen + header.ErrorLen + header.BsLen + bodyLen := header.MessageLen + header.ErrorLen bodyBuffer := make([]byte, bodyLen) + if bsBuffer == nil { + bsBuffer = make([]byte, int(header.BsLen)) + } else if len(bsBuffer) < int(header.BsLen) { + return nil, fmt.Errorf("provided bs buffer is too short, %d size is given, but %d size is required", len(bsBuffer), int(header.BsLen)) + } - readLen, err := conn.Recv(bodyBuffer, int(bodyLen)) + bodyReadLen, err := conn.Recv(bodyBuffer, int(bodyLen)) if err != nil { return nil, fmt.Errorf("could not read body - %v", err) } - if readLen != int(bodyLen) { - return nil, fmt.Errorf("could not read body fully - %d requested but %d read", bodyLen, readLen) + if bodyReadLen != int(bodyLen) { + return nil, fmt.Errorf("could not read body fully - %d requested but %d read", bodyLen, bodyReadLen) + } + + bsReadLen, err := conn.Recv(bsBuffer, int(header.BsLen)) + if err != nil { + return nil, fmt.Errorf("could not read body (BS) - %v", err) + } + if bsReadLen != int(header.BsLen) { + return nil, fmt.Errorf("could not read body (BS) fully - %d requested but %d read", int(header.BsLen), bsReadLen) } body := message.IRODSMessageBody{} - err = body.FromBytes(header, bodyBuffer) + err = body.FromBytes(header, bodyBuffer, bsBuffer[:int(header.BsLen)]) if err != nil { return nil, err } @@ -684,59 +679,19 @@ func (conn *IRODSConnection) PoorMansRollback() error { func (conn *IRODSConnection) endTransaction(commit bool) error { request := message.NewIRODSMessageEndTransactionRequest(commit) - requestMessage, err := request.GetMessage() - if err != nil { - return fmt.Errorf("could not make a end transaction request message - %v", err) - } - - err = conn.SendMessage(requestMessage) - if err != nil { - return fmt.Errorf("could not send a end transaction request message - %v", err) - } - - // Server responds with results - responseMessage, err := conn.ReadMessage() - if err != nil { - return fmt.Errorf("could not receive a end transaction response message - %v", err) - } - response := message.IRODSMessageEndTransactionResponse{} - err = response.FromMessage(responseMessage) - if err != nil { - return fmt.Errorf("could not receive a end transaction response message - %v", err) - } - - err = response.CheckError() - return err + return conn.RequestAndCheck(request, &response, nil) } func (conn *IRODSConnection) poorMansEndTransaction(dummyCol string, commit bool) error { request := message.NewIRODSMessageModColRequest(dummyCol) - if commit { request.AddKeyVal(common.COLLECTION_TYPE_KW, "NULL_SPECIAL_VALUE") } - - requestMessage, err := request.GetMessage() - if err != nil { - return fmt.Errorf("could not make a poor mans end transaction request message - %v", err) - } - - err = conn.SendMessage(requestMessage) - if err != nil { - return fmt.Errorf("could not send a poor mans end transaction request message - %v", err) - } - - // Server responds with results - responseMessage, err := conn.ReadMessage() - if err != nil { - return fmt.Errorf("could not receive a poor mans end transaction response message - %v", err) - } - response := message.IRODSMessageModColResponse{} - err = response.FromMessage(responseMessage) + err := conn.Request(request, &response, nil) if err != nil { - return fmt.Errorf("could not receive a poor mans end transaction response message - %v", err) + return fmt.Errorf("could not make a poor mans end transaction - %v", err) } if !commit { diff --git a/irods/connection/request_response.go b/irods/connection/request_response.go index c3fd2f5..bc0cda5 100644 --- a/irods/connection/request_response.go +++ b/irods/connection/request_response.go @@ -23,7 +23,8 @@ type CheckErrorResponse interface { } // Request sends a request and expects a response. -func (conn *IRODSConnection) Request(request Request, response Response) error { +// bsBuffer is optional +func (conn *IRODSConnection) Request(request Request, response Response, bsBuffer []byte) error { requestMessage, err := request.GetMessage() if err != nil { return fmt.Errorf("could not make a request message - %v", err) @@ -41,7 +42,8 @@ func (conn *IRODSConnection) Request(request Request, response Response) error { } // Server responds with results - responseMessage, err := conn.ReadMessage() + // external bs buffer + responseMessage, err := conn.ReadMessage(bsBuffer) if err != nil { return fmt.Errorf("could not receive a response message - %v", err) } @@ -60,9 +62,30 @@ func (conn *IRODSConnection) Request(request Request, response Response) error { return nil } +// RequestWithoutResponse sends a request but does not wait for a response. +func (conn *IRODSConnection) RequestWithoutResponse(request Request) error { + requestMessage, err := request.GetMessage() + if err != nil { + return fmt.Errorf("could not make a request message - %v", err) + } + + // translate xml.Marshal XML into irods-understandable XML (among others, replace " by ") + err = conn.PreprocessMessage(requestMessage) + if err != nil { + return fmt.Errorf("could not send preprocess message - %v", err) + } + + err = conn.SendMessage(requestMessage) + if err != nil { + return fmt.Errorf("could not send a request message - %v", err) + } + + return nil +} + // RequestAndCheck sends a request and expects a CheckErrorResponse, on which the error is already checked. -func (conn *IRODSConnection) RequestAndCheck(request Request, response CheckErrorResponse) error { - if err := conn.Request(request, response); err != nil { +func (conn *IRODSConnection) RequestAndCheck(request Request, response CheckErrorResponse, bsBuffer []byte) error { + if err := conn.Request(request, response, bsBuffer); err != nil { return err } diff --git a/irods/fs/admin.go b/irods/fs/admin.go index 3dc4c02..4f3d9d0 100644 --- a/irods/fs/admin.go +++ b/irods/fs/admin.go @@ -36,61 +36,61 @@ func AddUser(conn *connection.IRODSConnection, username string, password string) req := message.NewIRODSMessageUserAdminRequest("mkuser", username, scrambledPassword) - return conn.RequestAndCheck(req, &message.IRODSMessageUserAdminResponse{}) + return conn.RequestAndCheck(req, &message.IRODSMessageUserAdminResponse{}, nil) } // AddGroup adds a group. func AddGroup(conn *connection.IRODSConnection, group string) error { req := message.NewIRODSMessageUserAdminRequest("mkgroup", group, string(types.IRODSUserRodsGroup)) - return conn.RequestAndCheck(req, &message.IRODSMessageUserAdminResponse{}) + return conn.RequestAndCheck(req, &message.IRODSMessageUserAdminResponse{}, nil) } // AddChildToResc adds a child to a parent resource func AddChildToResc(conn *connection.IRODSConnection, parent string, child string, options string) error { req := message.NewIRODSMessageAdminRequest("add", "childtoresc", parent, child, options) - return conn.RequestAndCheck(req, &message.IRODSMessageAdminResponse{}) + return conn.RequestAndCheck(req, &message.IRODSMessageAdminResponse{}, nil) } // AddToGroup adds a user to a group. func AddToGroup(conn *connection.IRODSConnection, group string, user string) error { req := message.NewIRODSMessageUserAdminRequest("modify", "group", group, "add", user) - return conn.RequestAndCheck(req, &message.IRODSMessageUserAdminResponse{}) + return conn.RequestAndCheck(req, &message.IRODSMessageUserAdminResponse{}, nil) } // RmFromGroup removes a user from a group. func RmFromGroup(conn *connection.IRODSConnection, group string, user string) error { req := message.NewIRODSMessageUserAdminRequest("modify", "group", group, "remove", user) - return conn.RequestAndCheck(req, &message.IRODSMessageUserAdminResponse{}) + return conn.RequestAndCheck(req, &message.IRODSMessageUserAdminResponse{}, nil) } // ChangeUserType changes the type / role of a user object func ChangeUserType(conn *connection.IRODSConnection, user string, newType string) error { req := message.NewIRODSMessageAdminRequest("modify", "user", user, "type", newType) - return conn.RequestAndCheck(req, &message.IRODSMessageAdminResponse{}) + return conn.RequestAndCheck(req, &message.IRODSMessageAdminResponse{}, nil) } // RmUser removes a user or a group. func RmUser(conn *connection.IRODSConnection, user string) error { req := message.NewIRODSMessageAdminRequest("rm", "user", user) - return conn.RequestAndCheck(req, &message.IRODSMessageAdminResponse{}) + return conn.RequestAndCheck(req, &message.IRODSMessageAdminResponse{}, nil) } // SetUserQuota sets quota for a given user and resource ('total' for global) func SetUserQuota(conn *connection.IRODSConnection, user string, resource string, value string) error { req := message.NewIRODSMessageAdminRequest("set-quota", "user", user, resource, value) - return conn.RequestAndCheck(req, &message.IRODSMessageAdminResponse{}) + return conn.RequestAndCheck(req, &message.IRODSMessageAdminResponse{}, nil) } // SetGroupQuota sets quota for a given user and resource ('total' for global) func SetGroupQuota(conn *connection.IRODSConnection, group string, resource string, value string) error { req := message.NewIRODSMessageAdminRequest("set-quota", "group", group, resource, value) - return conn.RequestAndCheck(req, &message.IRODSMessageAdminResponse{}) + return conn.RequestAndCheck(req, &message.IRODSMessageAdminResponse{}, nil) } diff --git a/irods/fs/collection.go b/irods/fs/collection.go index bd0d754..d4b9cc4 100644 --- a/irods/fs/collection.go +++ b/irods/fs/collection.go @@ -57,7 +57,7 @@ func GetCollection(conn *connection.IRODSConnection, path string) (*types.IRODSC query.AddCondition(common.ICAT_COLUMN_COLL_NAME, condVal) queryResult := message.IRODSMessageQueryResult{} - err := conn.Request(query, &queryResult) + err := conn.Request(query, &queryResult, nil) if err != nil { return nil, fmt.Errorf("could not receive a collection query result message - %v", err) } @@ -158,7 +158,7 @@ func ListCollectionMeta(conn *connection.IRODSConnection, path string) ([]*types query.AddCondition(common.ICAT_COLUMN_COLL_NAME, condVal) queryResult := message.IRODSMessageQueryResult{} - err := conn.Request(query, &queryResult) + err := conn.Request(query, &queryResult, nil) if err != nil { return nil, fmt.Errorf("could not receive a collection metadata query result message - %v", err) } @@ -255,7 +255,7 @@ func ListCollectionAccess(conn *connection.IRODSConnection, path string) ([]*typ query.AddCondition(common.ICAT_COLUMN_COLL_NAME, condVal) queryResult := message.IRODSMessageQueryResult{} - err := conn.Request(query, &queryResult) + err := conn.Request(query, &queryResult, nil) if err != nil { return nil, fmt.Errorf("could not receive a collection access query result message - %v", err) } @@ -350,7 +350,7 @@ func ListSubCollections(conn *connection.IRODSConnection, path string) ([]*types query.AddCondition(common.ICAT_COLUMN_COLL_PARENT_NAME, condVal) queryResult := message.IRODSMessageQueryResult{} - err := conn.Request(query, &queryResult) + err := conn.Request(query, &queryResult, nil) if err != nil { return nil, fmt.Errorf("could not receive a collection query result message - %v", err) } @@ -447,7 +447,7 @@ func CreateCollection(conn *connection.IRODSConnection, path string, recurse boo request := message.NewIRODSMessageMkcolRequest(path, recurse) response := message.IRODSMessageMkcolResponse{} - return conn.RequestAndCheck(request, &response) + return conn.RequestAndCheck(request, &response, nil) } // DeleteCollection deletes a collection for the path @@ -460,7 +460,7 @@ func DeleteCollection(conn *connection.IRODSConnection, path string, recurse boo request := message.NewIRODSMessageRmcolRequest(path, recurse, force) response := message.IRODSMessageRmcolResponse{} - err := conn.RequestAndCheck(request, &response) + err := conn.RequestAndCheck(request, &response, nil) if err != nil { if types.GetIRODSErrorCode(err) == common.CAT_NO_ROWS_FOUND { return types.NewFileNotFoundErrorf("could not find a collection") @@ -481,7 +481,7 @@ func DeleteCollection(conn *connection.IRODSConnection, path string, recurse boo return fmt.Errorf("could not reply to a collection deletion response message - %v", err) } - responseMessageReply, err := conn.ReadMessage() + responseMessageReply, err := conn.ReadMessage(nil) if err != nil { return fmt.Errorf("could not receive a collection deletion response message - %v", err) } @@ -502,7 +502,7 @@ func MoveCollection(conn *connection.IRODSConnection, srcPath string, destPath s request := message.NewIRODSMessageMvcolRequest(srcPath, destPath) response := message.IRODSMessageMvcolResponse{} - err := conn.RequestAndCheck(request, &response) + err := conn.RequestAndCheck(request, &response, nil) if types.GetIRODSErrorCode(err) == common.CAT_NO_ROWS_FOUND { return types.NewFileNotFoundErrorf("could not find a collection") } @@ -520,7 +520,7 @@ func AddCollectionMeta(conn *connection.IRODSConnection, path string, metadata * request := message.NewIRODSMessageAddMetadataRequest(types.IRODSCollectionMetaItemType, path, metadata) response := message.IRODSMessageModMetaResponse{} - return conn.RequestAndCheck(request, &response) + return conn.RequestAndCheck(request, &response, nil) } // DeleteCollectionMeta sets metadata of a data object for the path to the given key values. @@ -543,7 +543,7 @@ func DeleteCollectionMeta(conn *connection.IRODSConnection, path string, metadat } response := message.IRODSMessageModMetaResponse{} - err := conn.RequestAndCheck(request, &response) + err := conn.RequestAndCheck(request, &response, nil) if types.GetIRODSErrorCode(err) == common.CAT_NO_ROWS_FOUND { return types.NewFileNotFoundErrorf("could not find a collection") } @@ -576,7 +576,7 @@ func SearchCollectionsByMeta(conn *connection.IRODSConnection, metaName string, query.AddCondition(common.ICAT_COLUMN_META_COLL_ATTR_VALUE, metaValueCondVal) queryResult := message.IRODSMessageQueryResult{} - err := conn.Request(query, &queryResult) + err := conn.Request(query, &queryResult, nil) if err != nil { return nil, fmt.Errorf("could not receive a collection query result message - %v", err) } @@ -690,7 +690,7 @@ func SearchCollectionsByMetaWildcard(conn *connection.IRODSConnection, metaName query.AddCondition(common.ICAT_COLUMN_META_COLL_ATTR_VALUE, metaValueCondVal) queryResult := message.IRODSMessageQueryResult{} - err := conn.Request(query, &queryResult) + err := conn.Request(query, &queryResult, nil) if err != nil { return nil, fmt.Errorf("could not receive a collection query result message - %v", err) } @@ -794,7 +794,7 @@ func ChangeAccessControlCollection(conn *connection.IRODSConnection, path string request := message.NewIRODSMessageModAccessRequest(access.ChmodString(), userName, zoneName, path, recursive, adminFlag) response := message.IRODSMessageModAccessResponse{} - err := conn.RequestAndCheck(request, &response) + err := conn.RequestAndCheck(request, &response, nil) if types.GetIRODSErrorCode(err) == common.CAT_NO_ROWS_FOUND { return types.NewFileNotFoundErrorf("could not find a collection") } @@ -817,7 +817,7 @@ func SetInheritAccessControl(conn *connection.IRODSConnection, path string, inhe request := message.NewIRODSMessageModAccessRequest(inheritStr, "", "", path, recursive, adminFlag) response := message.IRODSMessageModAccessResponse{} - err := conn.RequestAndCheck(request, &response) + err := conn.RequestAndCheck(request, &response, nil) if types.GetIRODSErrorCode(err) == common.CAT_NO_ROWS_FOUND { return types.NewFileNotFoundErrorf("could not find a collection") } diff --git a/irods/fs/data_object.go b/irods/fs/data_object.go index 9f3e253..3c02f51 100644 --- a/irods/fs/data_object.go +++ b/irods/fs/data_object.go @@ -2,6 +2,7 @@ package fs import ( "fmt" + "io" "strconv" "time" @@ -84,7 +85,7 @@ func GetDataObject(conn *connection.IRODSConnection, collection *types.IRODSColl query.AddCondition(common.ICAT_COLUMN_DATA_NAME, pathCondVal) queryResult := message.IRODSMessageQueryResult{} - err := conn.Request(query, &queryResult) + err := conn.Request(query, &queryResult, nil) if err != nil { return nil, fmt.Errorf("could not receive a data object query result message - %v", err) } @@ -262,7 +263,7 @@ func GetDataObjectMasterReplica(conn *connection.IRODSConnection, collection *ty query.AddCondition(common.ICAT_COLUMN_D_REPL_STATUS, "= '1'") queryResult := message.IRODSMessageQueryResult{} - err := conn.Request(query, &queryResult) + err := conn.Request(query, &queryResult, nil) if err != nil { return nil, fmt.Errorf("could not receive a data object query result message - %v", err) } @@ -445,7 +446,7 @@ func ListDataObjects(conn *connection.IRODSConnection, collection *types.IRODSCo query.AddCondition(common.ICAT_COLUMN_COLL_NAME, collCondVal) queryResult := message.IRODSMessageQueryResult{} - err := conn.Request(query, &queryResult) + err := conn.Request(query, &queryResult, nil) if err != nil { return nil, fmt.Errorf("could not receive a data object query result message - %v", err) } @@ -620,7 +621,7 @@ func ListDataObjectsMasterReplica(conn *connection.IRODSConnection, collection * query.AddCondition(common.ICAT_COLUMN_D_REPL_STATUS, "= '1'") queryResult := message.IRODSMessageQueryResult{} - err := conn.Request(query, &queryResult) + err := conn.Request(query, &queryResult, nil) if err != nil { return nil, fmt.Errorf("could not receive a data object query result message - %v", err) } @@ -793,7 +794,7 @@ func ListDataObjectMeta(conn *connection.IRODSConnection, collection *types.IROD query.AddCondition(common.ICAT_COLUMN_DATA_NAME, nameCondVal) queryResult := message.IRODSMessageQueryResult{} - err := conn.Request(query, &queryResult) + err := conn.Request(query, &queryResult, nil) if err != nil { return nil, fmt.Errorf("could not receive a data object metadata query result message - %v", err) } @@ -892,7 +893,7 @@ func ListDataObjectAccess(conn *connection.IRODSConnection, collection *types.IR query.AddCondition(common.ICAT_COLUMN_DATA_NAME, nameCondVal) queryResult := message.IRODSMessageQueryResult{} - err := conn.Request(query, &queryResult) + err := conn.Request(query, &queryResult, nil) if err != nil { return nil, fmt.Errorf("could not receive a data object access query result message - %v", err) } @@ -973,7 +974,7 @@ func DeleteDataObject(conn *connection.IRODSConnection, path string, force bool) request := message.NewIRODSMessageRmobjRequest(path, force) response := message.IRODSMessageRmobjResponse{} - err := conn.RequestAndCheck(request, &response) + err := conn.RequestAndCheck(request, &response, nil) if types.GetIRODSErrorCode(err) == common.CAT_NO_ROWS_FOUND { return types.NewFileNotFoundErrorf("could not find a data object") } @@ -990,7 +991,7 @@ func MoveDataObject(conn *connection.IRODSConnection, srcPath string, destPath s request := message.NewIRODSMessageMvobjRequest(srcPath, destPath) response := message.IRODSMessageMvobjResponse{} - err := conn.RequestAndCheck(request, &response) + err := conn.RequestAndCheck(request, &response, nil) if types.GetIRODSErrorCode(err) == common.CAT_NO_ROWS_FOUND { return types.NewFileNotFoundErrorf("could not find a data object") } @@ -1005,7 +1006,7 @@ func CopyDataObject(conn *connection.IRODSConnection, srcPath string, destPath s request := message.NewIRODSMessageCpobjRequest(srcPath, destPath) response := message.IRODSMessageCpobjResponse{} - err := conn.RequestAndCheck(request, &response) + err := conn.RequestAndCheck(request, &response, nil) if types.GetIRODSErrorCode(err) == common.CAT_NO_ROWS_FOUND { return types.NewFileNotFoundErrorf("could not find a data object") } @@ -1020,7 +1021,7 @@ func TruncateDataObject(conn *connection.IRODSConnection, path string, size int6 request := message.NewIRODSMessageTruncobjRequest(path, size) response := message.IRODSMessageTruncobjResponse{} - err := conn.RequestAndCheck(request, &response) + err := conn.RequestAndCheck(request, &response, nil) if types.GetIRODSErrorCode(err) == common.CAT_NO_ROWS_FOUND { return types.NewFileNotFoundErrorf("could not find a data object") } @@ -1050,7 +1051,7 @@ func ReplicateDataObject(conn *connection.IRODSConnection, path string, resource } response := message.IRODSMessageReplobjResponse{} - err := conn.RequestAndCheck(request, &response) + err := conn.RequestAndCheck(request, &response, nil) if types.GetIRODSErrorCode(err) == common.CAT_NO_ROWS_FOUND { return types.NewFileNotFoundErrorf("could not find a data object") } @@ -1076,7 +1077,7 @@ func TrimDataObject(conn *connection.IRODSConnection, path string, resource stri } response := message.IRODSMessageTrimobjResponse{} - err := conn.RequestAndCheck(request, &response) + err := conn.RequestAndCheck(request, &response, nil) if types.GetIRODSErrorCode(err) == common.CAT_NO_ROWS_FOUND { return types.NewFileNotFoundErrorf("could not find a data object") } @@ -1101,7 +1102,7 @@ func CreateDataObject(conn *connection.IRODSConnection, path string, resource st request := message.NewIRODSMessageCreateobjRequest(path, resource, fileOpenMode, force) response := message.IRODSMessageCreateobjResponse{} - err := conn.RequestAndCheck(request, &response) + err := conn.RequestAndCheck(request, &response, nil) if err != nil { return nil, err } @@ -1133,7 +1134,7 @@ func OpenDataObject(conn *connection.IRODSConnection, path string, resource stri request := message.NewIRODSMessageOpenobjRequest(path, resource, fileOpenMode) response := message.IRODSMessageOpenobjResponse{} - err := conn.RequestAndCheck(request, &response) + err := conn.RequestAndCheck(request, &response, nil) if err != nil { if types.GetIRODSErrorCode(err) == common.CAT_NO_ROWS_FOUND { return nil, -1, types.NewFileNotFoundErrorf("could not find a data object") @@ -1178,7 +1179,7 @@ func OpenDataObjectWithReplicaToken(conn *connection.IRODSConnection, path strin request := message.NewIRODSMessageOpenobjRequestWithReplicaToken(path, fileOpenMode, resourceHierarchy, replicaToken) response := message.IRODSMessageOpenobjResponse{} - err := conn.RequestAndCheck(request, &response) + err := conn.RequestAndCheck(request, &response, nil) if err != nil { if types.GetIRODSErrorCode(err) == common.CAT_NO_ROWS_FOUND { return nil, -1, types.NewFileNotFoundErrorf("could not find a data object") @@ -1223,7 +1224,7 @@ func OpenDataObjectWithOperation(conn *connection.IRODSConnection, path string, request := message.NewIRODSMessageOpenobjRequestWithOperation(path, resource, fileOpenMode, oper) response := message.IRODSMessageOpenobjResponse{} - err := conn.RequestAndCheck(request, &response) + err := conn.RequestAndCheck(request, &response, nil) if err != nil { if types.GetIRODSErrorCode(err) == common.CAT_NO_ROWS_FOUND { return nil, types.NewFileNotFoundErrorf("could not find a data object") @@ -1259,7 +1260,7 @@ func GetReplicaAccessInfo(conn *connection.IRODSConnection, handle *types.IRODSF request := message.NewIRODSMessageDescriptorInfoRequest(handle.FileDescriptor) response := message.IRODSMessageDescriptorInfoResponse{} - err := conn.RequestAndCheck(request, &response) + err := conn.RequestAndCheck(request, &response, nil) if err != nil { if types.GetIRODSErrorCode(err) == common.CAT_NO_ROWS_FOUND { return "", "", types.NewFileNotFoundErrorf("could not find a data object") @@ -1279,7 +1280,7 @@ func SeekDataObject(conn *connection.IRODSConnection, handle *types.IRODSFileHan request := message.NewIRODSMessageSeekobjRequest(handle.FileDescriptor, offset, whence) response := message.IRODSMessageSeekobjResponse{} - err := conn.RequestAndCheck(request, &response) + err := conn.RequestAndCheck(request, &response, nil) if err != nil { if types.GetIRODSErrorCode(err) == common.CAT_NO_ROWS_FOUND { return -1, types.NewFileNotFoundErrorf("could not find a data object") @@ -1292,25 +1293,31 @@ func SeekDataObject(conn *connection.IRODSConnection, handle *types.IRODSFileHan } // ReadDataObject reads data from a data object -func ReadDataObject(conn *connection.IRODSConnection, handle *types.IRODSFileHandle, length int) ([]byte, error) { +func ReadDataObject(conn *connection.IRODSConnection, handle *types.IRODSFileHandle, buffer []byte) (int, error) { if conn == nil || !conn.IsConnected() { - return nil, fmt.Errorf("connection is nil or disconnected") + return 0, fmt.Errorf("connection is nil or disconnected") } conn.IncreaseDataObjectMetricsRead(1) - request := message.NewIRODSMessageReadobjRequest(handle.FileDescriptor, length) + request := message.NewIRODSMessageReadobjRequest(handle.FileDescriptor, len(buffer)) response := message.IRODSMessageReadobjResponse{} - err := conn.RequestAndCheck(request, &response) + err := conn.RequestAndCheck(request, &response, buffer) if err != nil { if types.GetIRODSErrorCode(err) == common.CAT_NO_ROWS_FOUND { - return nil, types.NewFileNotFoundErrorf("could not find a data object") + return 0, types.NewFileNotFoundErrorf("could not find a data object") } - return nil, err + return 0, err + } + + readLen := len(response.Data) + if readLen < len(buffer) { + // EOF + return readLen, io.EOF } - return response.Data, nil + return readLen, nil } // WriteDataObject writes data to a data object @@ -1323,7 +1330,7 @@ func WriteDataObject(conn *connection.IRODSConnection, handle *types.IRODSFileHa request := message.NewIRODSMessageWriteobjRequest(handle.FileDescriptor, data) response := message.IRODSMessageWriteobjResponse{} - err := conn.RequestAndCheck(request, &response) + err := conn.RequestAndCheck(request, &response, nil) if types.GetIRODSErrorCode(err) == common.CAT_NO_ROWS_FOUND { return types.NewFileNotFoundErrorf("could not find a data object") } @@ -1350,7 +1357,7 @@ func TruncateDataObjectHandle(conn *connection.IRODSConnection, handle *types.IR // close request1 := message.NewIRODSMessageCloseobjRequest(handle.FileDescriptor) response1 := message.IRODSMessageCloseobjResponse{} - err = conn.RequestAndCheck(request1, &response1) + err = conn.RequestAndCheck(request1, &response1, nil) if types.GetIRODSErrorCode(err) == common.CAT_NO_ROWS_FOUND { return types.NewFileNotFoundErrorf("could not find a data object") } @@ -1358,7 +1365,7 @@ func TruncateDataObjectHandle(conn *connection.IRODSConnection, handle *types.IR // truncate request2 := message.NewIRODSMessageTruncobjRequest(handle.Path, size) response2 := message.IRODSMessageTruncobjResponse{} - err = conn.RequestAndCheck(request2, &response2) + err = conn.RequestAndCheck(request2, &response2, nil) if types.GetIRODSErrorCode(err) == common.CAT_NO_ROWS_FOUND { return types.NewFileNotFoundErrorf("could not find a data object") } @@ -1366,7 +1373,7 @@ func TruncateDataObjectHandle(conn *connection.IRODSConnection, handle *types.IR // reopen request3 := message.NewIRODSMessageOpenobjRequestWithOperation(handle.Path, handle.Resource, handle.OpenMode, handle.Oper) response3 := message.IRODSMessageOpenobjResponse{} - err = conn.RequestAndCheck(request3, &response3) + err = conn.RequestAndCheck(request3, &response3, nil) if err != nil { if types.GetIRODSErrorCode(err) == common.CAT_NO_ROWS_FOUND { return types.NewFileNotFoundErrorf("could not find a data object") @@ -1380,7 +1387,7 @@ func TruncateDataObjectHandle(conn *connection.IRODSConnection, handle *types.IR // seek request4 := message.NewIRODSMessageSeekobjRequest(handle.FileDescriptor, offset, types.SeekSet) response4 := message.IRODSMessageSeekobjResponse{} - err = conn.RequestAndCheck(request4, &response4) + err = conn.RequestAndCheck(request4, &response4, nil) if err != nil { if types.GetIRODSErrorCode(err) == common.CAT_NO_ROWS_FOUND { return types.NewFileNotFoundErrorf("could not find a data object") @@ -1400,7 +1407,7 @@ func CloseDataObject(conn *connection.IRODSConnection, handle *types.IRODSFileHa request := message.NewIRODSMessageCloseobjRequest(handle.FileDescriptor) response := message.IRODSMessageCloseobjResponse{} - err := conn.RequestAndCheck(request, &response) + err := conn.RequestAndCheck(request, &response, nil) if types.GetIRODSErrorCode(err) == common.CAT_NO_ROWS_FOUND { return types.NewFileNotFoundErrorf("could not find a data object") } @@ -1418,7 +1425,7 @@ func AddDataObjectMeta(conn *connection.IRODSConnection, path string, metadata * request := message.NewIRODSMessageAddMetadataRequest(types.IRODSDataObjectMetaItemType, path, metadata) response := message.IRODSMessageModMetaResponse{} - err := conn.RequestAndCheck(request, &response) + err := conn.RequestAndCheck(request, &response, nil) if types.GetIRODSErrorCode(err) == common.CAT_NO_ROWS_FOUND { return types.NewFileNotFoundErrorf("could not find a data object") } @@ -1445,7 +1452,7 @@ func DeleteDataObjectMeta(conn *connection.IRODSConnection, path string, metadat } response := message.IRODSMessageModMetaResponse{} - err := conn.RequestAndCheck(request, &response) + err := conn.RequestAndCheck(request, &response, nil) if types.GetIRODSErrorCode(err) == common.CAT_NO_ROWS_FOUND { return types.NewFileNotFoundErrorf("could not find a data object") } @@ -1490,7 +1497,7 @@ func SearchDataObjectsByMeta(conn *connection.IRODSConnection, metaName string, query.AddCondition(common.ICAT_COLUMN_META_DATA_ATTR_VALUE, metaValueCondVal) queryResult := message.IRODSMessageQueryResult{} - err := conn.Request(query, &queryResult) + err := conn.Request(query, &queryResult, nil) if err != nil { return nil, fmt.Errorf("could not receive a data object query result message - %v", err) } @@ -1685,7 +1692,7 @@ func SearchDataObjectsMasterReplicaByMeta(conn *connection.IRODSConnection, meta query.AddCondition(common.ICAT_COLUMN_D_REPL_STATUS, "= '1'") queryResult := message.IRODSMessageQueryResult{} - err := conn.Request(query, &queryResult) + err := conn.Request(query, &queryResult, nil) if err != nil { return nil, fmt.Errorf("could not receive a data object query result message - %v", err) } @@ -1888,7 +1895,7 @@ func SearchDataObjectsByMetaWildcard(conn *connection.IRODSConnection, metaName query.AddCondition(common.ICAT_COLUMN_META_DATA_ATTR_VALUE, metaValueCondVal) queryResult := message.IRODSMessageQueryResult{} - err := conn.Request(query, &queryResult) + err := conn.Request(query, &queryResult, nil) if err != nil { return nil, fmt.Errorf("could not receive a data object query result message - %v", err) } @@ -2084,7 +2091,7 @@ func SearchDataObjectsMasterReplicaByMetaWildcard(conn *connection.IRODSConnecti query.AddCondition(common.ICAT_COLUMN_D_REPL_STATUS, "= '1'") queryResult := message.IRODSMessageQueryResult{} - err := conn.Request(query, &queryResult) + err := conn.Request(query, &queryResult, nil) if err != nil { return nil, fmt.Errorf("could not receive a data object query result message - %v", err) } @@ -2258,7 +2265,7 @@ func ChangeAccessControlDataObject(conn *connection.IRODSConnection, path string request := message.NewIRODSMessageModAccessRequest(access.ChmodString(), userName, zoneName, path, false, adminFlag) response := message.IRODSMessageModAccessResponse{} - err := conn.RequestAndCheck(request, &response) + err := conn.RequestAndCheck(request, &response, nil) if types.GetIRODSErrorCode(err) == common.CAT_NO_ROWS_FOUND { return types.NewFileNotFoundErrorf("could not find a data object") } diff --git a/irods/fs/data_object_bulk.go b/irods/fs/data_object_bulk.go index 11b40c1..eb7b67e 100644 --- a/irods/fs/data_object_bulk.go +++ b/irods/fs/data_object_bulk.go @@ -29,7 +29,7 @@ func CloseDataObjectReplica(conn *connection.IRODSConnection, handle *types.IROD request := message.NewIRODSMessageClosereplicaRequest(handle.FileDescriptor, false, false, false, false) response := message.IRODSMessageClosereplicaResponse{} - err := conn.RequestAndCheck(request, &response) + err := conn.RequestAndCheck(request, &response, nil) if types.GetIRODSErrorCode(err) == common.CAT_NO_ROWS_FOUND { return types.NewFileNotFoundErrorf("could not find a data object") } @@ -552,21 +552,21 @@ func DownloadDataObject(session *session.IRODSSession, irodsPath string, resourc } defer f.Close() + buffer := make([]byte, common.ReadWriteBufferSize) // copy for { - buffer, err := ReadDataObject(conn, handle, common.ReadWriteBufferSize) - if err != nil { + readLen, err := ReadDataObject(conn, handle, buffer) + if err != nil && err != io.EOF { return err } - if len(buffer) == 0 { - // EOF - return nil + _, err2 := f.Write(buffer[:readLen]) + if err2 != nil { + return err2 } - _, err = f.Write(buffer) - if err != nil { - return err + if err == io.EOF { + return nil } } } @@ -644,30 +644,32 @@ func DownloadDataObjectParallel(session *session.IRODSSession, irodsPath string, taskRemain := taskLength // copy + buffer := make([]byte, common.ReadWriteBufferSize) for taskRemain > 0 { toCopy := taskRemain if toCopy >= int64(common.ReadWriteBufferSize) { toCopy = int64(common.ReadWriteBufferSize) } - buffer, taskErr := ReadDataObject(taskConn, taskHandle, int(toCopy)) - if taskErr != nil { - errChan <- taskErr - return + readLen, taskErr := ReadDataObject(taskConn, taskHandle, buffer[:toCopy]) + if readLen > 0 { + _, taskErr2 := f.WriteAt(buffer[:readLen], taskOffset+(taskLength-taskRemain)) + if taskErr2 != nil { + errChan <- taskErr2 + return + } + taskRemain -= int64(readLen) } - if len(buffer) == 0 { - // EOF + if taskErr != nil && taskErr != io.EOF { + errChan <- taskErr return } - _, taskErr = f.WriteAt(buffer, taskOffset+(taskLength-taskRemain)) - if taskErr != nil { - errChan <- taskErr + if taskErr == io.EOF { + // EOF return } - - taskRemain -= int64(len(buffer)) } } @@ -776,15 +778,16 @@ func DownloadDataObjectParallelInBlocksAsync(session *session.IRODSSession, irod } defer f.Close() + buffer := make([]byte, common.ReadWriteBufferSize) for { taskOffset, ok := <-inputChan if !ok { break } - taskNewOffset, err := SeekDataObject(taskConn, taskHandle, taskOffset, types.SeekSet) - if err != nil { - errChan <- fmt.Errorf("could not seek a data object - %v", err) + taskNewOffset, readErr := SeekDataObject(taskConn, taskHandle, taskOffset, types.SeekSet) + if readErr != nil { + errChan <- fmt.Errorf("could not seek a data object - %v", readErr) return } @@ -802,23 +805,24 @@ func DownloadDataObjectParallelInBlocksAsync(session *session.IRODSSession, irod toCopy = int64(common.ReadWriteBufferSize) } - buffer, err := ReadDataObject(taskConn, taskHandle, int(toCopy)) - if err != nil { - errChan <- err + readLen, readErr := ReadDataObject(taskConn, taskHandle, buffer[:toCopy]) + if readLen > 0 { + _, readErr2 := f.WriteAt(buffer[:readLen], taskOffset+(blockSize-taskRemain)) + if readErr2 != nil { + errChan <- readErr2 + return + } + taskRemain -= int64(readLen) + } + + if readErr != nil && readErr != io.EOF { + errChan <- readErr return } - if len(buffer) == 0 { + if readErr == io.EOF { // EOF - break - } else { - _, err = f.WriteAt(buffer, taskOffset+(blockSize-taskRemain)) - if err != nil { - errChan <- err - return - } - - taskRemain -= int64(len(buffer)) + return } } diff --git a/irods/fs/resource.go b/irods/fs/resource.go index 295a4a9..2ae6c1f 100644 --- a/irods/fs/resource.go +++ b/irods/fs/resource.go @@ -42,7 +42,7 @@ func GetResource(conn *connection.IRODSConnection, name string) (*types.IRODSRes query.AddCondition(common.ICAT_COLUMN_R_RESC_NAME, rescCondVal) queryResult := message.IRODSMessageQueryResult{} - err := conn.Request(query, &queryResult) + err := conn.Request(query, &queryResult, nil) if err != nil { return nil, fmt.Errorf("could not receive a resource query result message - %v", err) } @@ -124,7 +124,7 @@ func AddResourceMeta(conn *connection.IRODSConnection, name string, metadata *ty request := message.NewIRODSMessageAddMetadataRequest(types.IRODSResourceMetaItemType, name, metadata) response := message.IRODSMessageModMetaResponse{} - return conn.RequestAndCheck(request, &response) + return conn.RequestAndCheck(request, &response, nil) } // DeleteResourceMeta sets metadata of a resource to the given key values. @@ -145,5 +145,5 @@ func DeleteResourceMeta(conn *connection.IRODSConnection, name string, metadata } response := message.IRODSMessageModMetaResponse{} - return conn.RequestAndCheck(request, &response) + return conn.RequestAndCheck(request, &response, nil) } diff --git a/irods/fs/ticket.go b/irods/fs/ticket.go index c84dad7..4e5e8a2 100644 --- a/irods/fs/ticket.go +++ b/irods/fs/ticket.go @@ -31,7 +31,7 @@ func GetTicketForAnonymousAccess(conn *connection.IRODSConnection, ticket string query.AddCondition(common.ICAT_COLUMN_TICKET_STRING, condVal) queryResult := message.IRODSMessageQueryResult{} - err := conn.Request(query, &queryResult) + err := conn.Request(query, &queryResult, nil) if err != nil { return nil, fmt.Errorf("could not receive a ticket query result message - %v", err) } @@ -131,7 +131,7 @@ func GetTicket(conn *connection.IRODSConnection, ticket string) (*types.IRODSTic query.AddCondition(common.ICAT_COLUMN_TICKET_STRING, condVal) queryResult := message.IRODSMessageQueryResult{} - err := conn.Request(query, &queryResult) + err := conn.Request(query, &queryResult, nil) if err != nil { return nil, fmt.Errorf("could not receive a ticket query result message - %v", err) } diff --git a/irods/fs/usergroup.go b/irods/fs/usergroup.go index d5e5984..85bde29 100644 --- a/irods/fs/usergroup.go +++ b/irods/fs/usergroup.go @@ -33,7 +33,7 @@ func GetGroup(conn *connection.IRODSConnection, group string) (*types.IRODSUser, query.AddCondition(common.ICAT_COLUMN_USER_TYPE, condTypeVal) queryResult := message.IRODSMessageQueryResult{} - err := conn.Request(query, &queryResult) + err := conn.Request(query, &queryResult, nil) if err != nil { return nil, fmt.Errorf("could not receive a group query result message - %v", err) } @@ -127,7 +127,7 @@ func ListGroupUsers(conn *connection.IRODSConnection, group string) ([]*types.IR query.AddCondition(common.ICAT_COLUMN_COLL_USER_GROUP_NAME, condNameVal) queryResult := message.IRODSMessageQueryResult{} - err := conn.Request(query, &queryResult) + err := conn.Request(query, &queryResult, nil) if err != nil { return nil, fmt.Errorf("could not receive a group user query result message - %v", err) } @@ -222,7 +222,7 @@ func ListGroups(conn *connection.IRODSConnection) ([]*types.IRODSUser, error) { query.AddCondition(common.ICAT_COLUMN_USER_TYPE, condTypeVal) queryResult := message.IRODSMessageQueryResult{} - err := conn.Request(query, &queryResult) + err := conn.Request(query, &queryResult, nil) if err != nil { return nil, fmt.Errorf("could not receive a group query result message - %v", err) } @@ -317,7 +317,7 @@ func ListUsers(conn *connection.IRODSConnection) ([]*types.IRODSUser, error) { query.AddCondition(common.ICAT_COLUMN_USER_TYPE, condTypeVal) queryResult := message.IRODSMessageQueryResult{} - err := conn.Request(query, &queryResult) + err := conn.Request(query, &queryResult, nil) if err != nil { return nil, fmt.Errorf("could not receive a user query result message - %v", err) } @@ -409,7 +409,7 @@ func ListUserGroupNames(conn *connection.IRODSConnection, user string) ([]string query.AddCondition(common.ICAT_COLUMN_USER_NAME, condTypeVal) queryResult := message.IRODSMessageQueryResult{} - err := conn.Request(query, &queryResult) + err := conn.Request(query, &queryResult, nil) if err != nil { return nil, fmt.Errorf("could not receive a group query result message - %v", err) } @@ -480,7 +480,7 @@ func ListUserResourceQuota(conn *connection.IRODSConnection, user string) ([]*ty query.AddCondition(common.ICAT_COLUMN_QUOTA_USER_NAME, condTypeVal) queryResult := message.IRODSMessageQueryResult{} - err := conn.Request(query, &queryResult) + err := conn.Request(query, &queryResult, nil) if err != nil { return nil, fmt.Errorf("could not receive a quota query result message - %v", err) } @@ -568,7 +568,7 @@ func GetUserGlobalQuota(conn *connection.IRODSConnection, user string) (*types.I query.AddCondition(common.ICAT_COLUMN_QUOTA_RESC_ID, condTypeVal) queryResult := message.IRODSMessageQueryResult{} - err := conn.Request(query, &queryResult) + err := conn.Request(query, &queryResult, nil) if err != nil { return nil, fmt.Errorf("could not receive a quota query result message - %v", err) } @@ -637,7 +637,7 @@ func AddUserMeta(conn *connection.IRODSConnection, user string, metadata *types. request := message.NewIRODSMessageAddMetadataRequest(types.IRODSUserMetaItemType, user, metadata) response := message.IRODSMessageModMetaResponse{} - return conn.RequestAndCheck(request, &response) + return conn.RequestAndCheck(request, &response, nil) } // DeleteUserMeta removes the metadata of a user object. @@ -658,7 +658,7 @@ func DeleteUserMeta(conn *connection.IRODSConnection, user string, metadata *typ } response := message.IRODSMessageModMetaResponse{} - return conn.RequestAndCheck(request, &response) + return conn.RequestAndCheck(request, &response, nil) } // ListUserMeta returns a user metadata for the path @@ -682,7 +682,7 @@ func ListUserMeta(conn *connection.IRODSConnection, user string) ([]*types.IRODS query.AddCondition(common.ICAT_COLUMN_USER_NAME, nameCondVal) queryResult := message.IRODSMessageQueryResult{} - err := conn.Request(query, &queryResult) + err := conn.Request(query, &queryResult, nil) if err != nil { return nil, fmt.Errorf("could not receive a user metadata query result message - %v", err) } diff --git a/irods/message/message.go b/irods/message/message.go index c537358..cb5a8ba 100644 --- a/irods/message/message.go +++ b/irods/message/message.go @@ -37,7 +37,7 @@ type IRODSMessage struct { // IRODSMessageSerializationInterface is an interface for serializaing/deserializing of message type IRODSMessageSerializationInterface interface { GetBytes() ([]byte, error) - FromBytes([]byte) error + FromBytes(bodyBytes []byte, bsBytes []byte) error } // MakeIRODSMessageHeader makes a message header @@ -82,19 +82,23 @@ func (body *IRODSMessageBody) GetBytes() ([]byte, error) { } // FromBytes returns struct from bytes -func (body *IRODSMessageBody) FromBytes(header *IRODSMessageHeader, bytes []byte) error { - if len(bytes) < (int(header.MessageLen) + int(header.ErrorLen) + int(header.BsLen)) { - return fmt.Errorf("bytes given is too short to be parsed") +func (body *IRODSMessageBody) FromBytes(header *IRODSMessageHeader, bodyBytes []byte, bsBytes []byte) error { + if len(bodyBytes) < (int(header.MessageLen) + int(header.ErrorLen)) { + return fmt.Errorf("bodyBytes given is too short to be parsed") + } + + if len(bsBytes) < int(header.BsLen) { + return fmt.Errorf("bsBytes given is too short to be parsed") } offset := 0 - body.Message = bytes[offset : offset+int(header.MessageLen)] + body.Message = bodyBytes[offset : offset+int(header.MessageLen)] offset += int(header.MessageLen) - body.Error = bytes[offset : offset+int(header.ErrorLen)] + body.Error = bodyBytes[offset : offset+int(header.ErrorLen)] offset += int(header.ErrorLen) - body.Bs = bytes[offset : offset+int(header.BsLen)] + body.Bs = bsBytes[:int(header.BsLen)] return nil } diff --git a/test/testcases/fs_api_test.go b/test/testcases/fs_api_test.go index 00efbea..cf721ea 100644 --- a/test/testcases/fs_api_test.go +++ b/test/testcases/fs_api_test.go @@ -4,6 +4,7 @@ import ( "crypto/sha1" "encoding/hex" "fmt" + "io" "os" "path" "testing" @@ -500,10 +501,12 @@ func testReadWriteIRODSDataObject(t *testing.T) { handle, _, err = fs.OpenDataObject(conn, newDataObjectPath, "", "r") assert.NoError(t, err) - datarecv, err := fs.ReadDataObject(conn, handle, len(data)) + buf := make([]byte, len(data)) + recvLen, err := fs.ReadDataObject(conn, handle, buf) assert.NoError(t, err) - assert.Equal(t, data, string(datarecv)) + assert.Equal(t, len(data), recvLen) + assert.Equal(t, data, string(buf)) err = fs.CloseDataObject(conn, handle) assert.NoError(t, err) @@ -553,10 +556,12 @@ func testTruncateIRODSDataObject(t *testing.T) { handle, _, err = fs.OpenDataObject(conn, newDataObjectPath, "", "r") assert.NoError(t, err) - datarecv, err := fs.ReadDataObject(conn, handle, len(data)) - assert.NoError(t, err) + buf := make([]byte, len(data)) + recvLen, err := fs.ReadDataObject(conn, handle, buf) + assert.Equal(t, io.EOF, err) - assert.Equal(t, "Hello World", string(datarecv)) + assert.Equal(t, 11, recvLen) + assert.Equal(t, "Hello World", string(buf[:recvLen])) err = fs.CloseDataObject(conn, handle) assert.NoError(t, err) diff --git a/test/testcases/fs_io_test.go b/test/testcases/fs_io_test.go new file mode 100644 index 0000000..1533576 --- /dev/null +++ b/test/testcases/fs_io_test.go @@ -0,0 +1,68 @@ +package testcases + +import ( + "fmt" + "os" + "path" + "path/filepath" + "testing" + "time" + + "github.com/cyverse/go-irodsclient/fs" + "github.com/stretchr/testify/assert" +) + +func TestFSIO(t *testing.T) { + setup() + defer shutdown() + + t.Run("test UpDownMBFiles", testUpDownMBFiles) +} + +func testUpDownMBFiles(t *testing.T) { + account := GetTestAccount() + + account.ClientServerNegotiation = false + + fsConfig := fs.NewFileSystemConfigWithDefault("go-irodsclient-test") + + fs, err := fs.NewFileSystem(account, fsConfig) + assert.NoError(t, err) + defer fs.Release() + + homedir := fmt.Sprintf("/%s/home/%s", account.ClientZone, account.ClientUser) + + fileSize := int64(100 * 1024 * 1024) // 100MB + localPath, err := createLocalTestFile("test_file_", fileSize) + assert.NoError(t, err) + + iRODSPath := fmt.Sprintf("%s/%s", homedir, path.Base(localPath)) + localDownloadPath, err := filepath.Abs(fmt.Sprintf("./%s", path.Base(localPath))) + assert.NoError(t, err) + + for i := 0; i < 3; i++ { + start := time.Now() + err = fs.UploadFile(localPath, iRODSPath, "", false) + duration := time.Since(start) + + t.Logf("upload a file in size %d took time - %v", fileSize, duration) + assert.NoError(t, err) + + start = time.Now() + err = fs.DownloadFile(iRODSPath, "", localDownloadPath) + duration = time.Since(start) + + t.Logf("download a file in size %d took time - %v", fileSize, duration) + assert.NoError(t, err) + + // remove + err = fs.RemoveFile(iRODSPath, true) + assert.NoError(t, err) + + err = os.Remove(localDownloadPath) + assert.NoError(t, err) + } + + err = os.Remove(localPath) + assert.NoError(t, err) +} diff --git a/test/testcases/fs_test.go b/test/testcases/fs_test.go index e402ddc..5266634 100644 --- a/test/testcases/fs_test.go +++ b/test/testcases/fs_test.go @@ -4,6 +4,7 @@ import ( "crypto/sha1" "encoding/hex" "fmt" + "io" "testing" "github.com/cyverse/go-irodsclient/fs" @@ -139,13 +140,14 @@ func testReadWrite(t *testing.T) { newHandle, err := filesystem.OpenFile(newDataObjectPath, "", "r") assert.NoError(t, err) - readData, err := newHandle.Read(1024) - assert.NoError(t, err) + buffer := make([]byte, 1024) + readLen, err := newHandle.Read(buffer) + assert.Equal(t, io.EOF, err) err = newHandle.Close() assert.NoError(t, err) - assert.Equal(t, text, string(readData)) + assert.Equal(t, text, string(buffer[:readLen])) // delete err = filesystem.RemoveFile(newDataObjectPath, true) @@ -196,13 +198,14 @@ func testCreateStat(t *testing.T) { newHandle, err := filesystem.OpenFile(newDataObjectPath, "", "r") assert.NoError(t, err) - readData, err := newHandle.Read(1024) - assert.NoError(t, err) + buffer := make([]byte, 1024) + readLen, err := newHandle.Read(buffer) + assert.Equal(t, io.EOF, err) err = newHandle.Close() assert.NoError(t, err) - assert.Equal(t, text, string(readData)) + assert.Equal(t, text, string(buffer[:readLen])) // delete err = filesystem.RemoveFile(newDataObjectPath, true) @@ -257,13 +260,14 @@ func testWriteRename(t *testing.T) { newHandle, err := filesystem.OpenFile(newDataObjectPathRenameTarget, "", "r") assert.NoError(t, err) - readData, err := newHandle.Read(1024) - assert.NoError(t, err) + buffer := make([]byte, 1024) + readLen, err := newHandle.Read(buffer) + assert.Equal(t, io.EOF, err) err = newHandle.Close() assert.NoError(t, err) - assert.Equal(t, text1+text2, string(readData)) + assert.Equal(t, text1+text2, string(buffer[:readLen])) // delete err = filesystem.RemoveFile(newDataObjectPathRenameTarget, true) @@ -324,13 +328,14 @@ func testWriteRenameDir(t *testing.T) { newHandle, err := filesystem.OpenFile(newDataObjectPathRenameTarget, "", "r") assert.NoError(t, err) - readData, err := newHandle.Read(1024) - assert.NoError(t, err) + buffer := make([]byte, 1024) + readLen, err := newHandle.Read(buffer) + assert.Equal(t, io.EOF, err) err = newHandle.Close() assert.NoError(t, err) - assert.Equal(t, text1+text2, string(readData)) + assert.Equal(t, text1+text2, string(buffer[:readLen])) // delete err = filesystem.RemoveFile(newDataObjectPathRenameTarget, true) diff --git a/test/testcases/main.go b/test/testcases/main.go index 4baa6aa..17a1dbc 100644 --- a/test/testcases/main.go +++ b/test/testcases/main.go @@ -87,6 +87,39 @@ func GetTestDirs() []string { return testDirs } +func createLocalTestFile(name string, size int64) (string, error) { + testval := "abcdefghijklmnop" // 16 + // fill + dataBuf := make([]byte, 1024) + i := 0 + for i < len(dataBuf) { + copy(dataBuf[i:], testval) + i += len(testval) + } + + f, err := ioutil.TempFile("", name) + if err != nil { + return "", err + } + + tempPath := f.Name() + + defer f.Close() + + totalWriteLen := int64(0) + for totalWriteLen < size { + writeLen, err := f.Write(dataBuf) + if err != nil { + os.Remove(tempPath) + return "", err + } + + totalWriteLen += int64(writeLen) + } + + return tempPath, nil +} + func testPrepareSamples(t *testing.T) { account := GetTestAccount()