Skip to content

Commit

Permalink
Implement search data objects or collections by metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
iychoi committed Mar 22, 2021
1 parent 762d17f commit dad6937
Show file tree
Hide file tree
Showing 3 changed files with 679 additions and 1 deletion.
242 changes: 242 additions & 0 deletions irods/fs/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,3 +649,245 @@ func DeleteCollectionMeta(conn *connection.IRODSConnection, path string, metadat
err = response.CheckError()
return err
}

// SearchCollectionsByMeta searches collections by metadata
func SearchCollectionsByMeta(conn *connection.IRODSConnection, metaName string, metaValue string) ([]*types.IRODSCollection, error) {
if conn == nil || !conn.IsConnected() {
return nil, fmt.Errorf("connection is nil or disconnected")
}

collections := []*types.IRODSCollection{}

continueQuery := true
continueIndex := 0
for continueQuery {
query := message.NewIRODSMessageQuery(common.MaxQueryRows, continueIndex, 0, 0)
query.AddSelect(common.ICAT_COLUMN_COLL_ID, 1)
query.AddSelect(common.ICAT_COLUMN_COLL_NAME, 1)
query.AddSelect(common.ICAT_COLUMN_COLL_OWNER_NAME, 1)
query.AddSelect(common.ICAT_COLUMN_COLL_CREATE_TIME, 1)
query.AddSelect(common.ICAT_COLUMN_COLL_MODIFY_TIME, 1)

metaNameCondVal := fmt.Sprintf("= '%s'", metaName)
query.AddCondition(common.ICAT_COLUMN_META_DATA_ATTR_NAME, metaNameCondVal)
metaValueCondVal := fmt.Sprintf("= '%s'", metaValue)
query.AddCondition(common.ICAT_COLUMN_META_DATA_ATTR_VALUE, metaValueCondVal)

queryMessage, err := query.GetMessage()
if err != nil {
return nil, fmt.Errorf("Could not make a collection query message - %v", err)
}

err = conn.SendMessage(queryMessage)
if err != nil {
return nil, fmt.Errorf("Could not send a collection query message - %v", err)
}

// Server responds with results
queryResultMessage, err := conn.ReadMessage()
if err != nil {
return nil, fmt.Errorf("Could not receive a collection query result message - %v", err)
}

queryResult := message.IRODSMessageQueryResult{}
err = queryResult.FromMessage(queryResultMessage)
if err != nil {
return nil, fmt.Errorf("Could not receive a collection query result message - %v", err)
}

if queryResult.RowCount == 0 {
break
}

if queryResult.AttributeCount > len(queryResult.SQLResult) {
return nil, fmt.Errorf("Could not receive collection attributes - requires %d, but received %d attributes", queryResult.AttributeCount, len(queryResult.SQLResult))
}

pagenatedCollections := make([]*types.IRODSCollection, queryResult.RowCount, queryResult.RowCount)

for attr := 0; attr < queryResult.AttributeCount; attr++ {
sqlResult := queryResult.SQLResult[attr]
if len(sqlResult.Values) != queryResult.RowCount {
return nil, fmt.Errorf("Could not receive collection rows - requires %d, but received %d attributes", queryResult.RowCount, len(sqlResult.Values))
}

for row := 0; row < queryResult.RowCount; row++ {
value := sqlResult.Values[row]

if pagenatedCollections[row] == nil {
// create a new
pagenatedCollections[row] = &types.IRODSCollection{
ID: -1,
Path: "",
Name: "",
Owner: "",
CreateTime: time.Time{},
ModifyTime: time.Time{},
}
}

switch sqlResult.AttributeIndex {
case int(common.ICAT_COLUMN_COLL_ID):
cID, err := strconv.ParseInt(value, 10, 64)
if err != nil {
return nil, fmt.Errorf("Could not parse collection id - %s", value)
}
pagenatedCollections[row].ID = cID
case int(common.ICAT_COLUMN_COLL_NAME):
pagenatedCollections[row].Path = value
pagenatedCollections[row].Name = util.GetIRODSPathFileName(value)
case int(common.ICAT_COLUMN_COLL_OWNER_NAME):
pagenatedCollections[row].Owner = value
case int(common.ICAT_COLUMN_COLL_CREATE_TIME):
cT, err := util.GetIRODSDateTime(value)
if err != nil {
return nil, fmt.Errorf("Could not parse create time - %s", value)
}
pagenatedCollections[row].CreateTime = cT
case int(common.ICAT_COLUMN_COLL_MODIFY_TIME):
mT, err := util.GetIRODSDateTime(value)
if err != nil {
return nil, fmt.Errorf("Could not parse modify time - %s", value)
}
pagenatedCollections[row].ModifyTime = mT
default:
// ignore
}
}
}

collections = append(collections, pagenatedCollections...)

continueIndex = queryResult.ContinueIndex
if continueIndex == 0 {
continueQuery = false
}
}

return collections, nil
}

// SearchCollectionsByMetaWildcard searches collections by metadata
// Caution: This is a very slow operation
func SearchCollectionsByMetaWildcard(conn *connection.IRODSConnection, metaName string, metaValue string) ([]*types.IRODSCollection, error) {
if conn == nil || !conn.IsConnected() {
return nil, fmt.Errorf("connection is nil or disconnected")
}

collections := []*types.IRODSCollection{}

continueQuery := true
continueIndex := 0
for continueQuery {
query := message.NewIRODSMessageQuery(common.MaxQueryRows, continueIndex, 0, 0)
query.AddSelect(common.ICAT_COLUMN_COLL_ID, 1)
query.AddSelect(common.ICAT_COLUMN_COLL_NAME, 1)
query.AddSelect(common.ICAT_COLUMN_COLL_OWNER_NAME, 1)
query.AddSelect(common.ICAT_COLUMN_COLL_CREATE_TIME, 1)
query.AddSelect(common.ICAT_COLUMN_COLL_MODIFY_TIME, 1)

metaNameCondVal := fmt.Sprintf("= '%s'", metaName)
query.AddCondition(common.ICAT_COLUMN_META_DATA_ATTR_NAME, metaNameCondVal)
metaValueCondVal := fmt.Sprintf("like '%s'", metaValue)
query.AddCondition(common.ICAT_COLUMN_META_DATA_ATTR_VALUE, metaValueCondVal)

queryMessage, err := query.GetMessage()
if err != nil {
return nil, fmt.Errorf("Could not make a collection query message - %v", err)
}

err = conn.SendMessage(queryMessage)
if err != nil {
return nil, fmt.Errorf("Could not send a collection query message - %v", err)
}

// Server responds with results
queryResultMessage, err := conn.ReadMessage()
if err != nil {
return nil, fmt.Errorf("Could not receive a collection query result message - %v", err)
}

queryResult := message.IRODSMessageQueryResult{}
err = queryResult.FromMessage(queryResultMessage)
if err != nil {
return nil, fmt.Errorf("Could not receive a collection query result message - %v", err)
}

if queryResult.RowCount == 0 {
break
}

if queryResult.AttributeCount > len(queryResult.SQLResult) {
return nil, fmt.Errorf("Could not receive collection attributes - requires %d, but received %d attributes", queryResult.AttributeCount, len(queryResult.SQLResult))
}

pagenatedCollections := make([]*types.IRODSCollection, queryResult.RowCount, queryResult.RowCount)

for attr := 0; attr < queryResult.AttributeCount; attr++ {
sqlResult := queryResult.SQLResult[attr]
if len(sqlResult.Values) != queryResult.RowCount {
return nil, fmt.Errorf("Could not receive data object rows - requires %d, but received %d attributes", queryResult.RowCount, len(sqlResult.Values))
}

for attr := 0; attr < queryResult.AttributeCount; attr++ {
sqlResult := queryResult.SQLResult[attr]
if len(sqlResult.Values) != queryResult.RowCount {
return nil, fmt.Errorf("Could not receive collection rows - requires %d, but received %d attributes", queryResult.RowCount, len(sqlResult.Values))
}

for row := 0; row < queryResult.RowCount; row++ {
value := sqlResult.Values[row]

if pagenatedCollections[row] == nil {
// create a new
pagenatedCollections[row] = &types.IRODSCollection{
ID: -1,
Path: "",
Name: "",
Owner: "",
CreateTime: time.Time{},
ModifyTime: time.Time{},
}
}

switch sqlResult.AttributeIndex {
case int(common.ICAT_COLUMN_COLL_ID):
cID, err := strconv.ParseInt(value, 10, 64)
if err != nil {
return nil, fmt.Errorf("Could not parse collection id - %s", value)
}
pagenatedCollections[row].ID = cID
case int(common.ICAT_COLUMN_COLL_NAME):
pagenatedCollections[row].Path = value
pagenatedCollections[row].Name = util.GetIRODSPathFileName(value)
case int(common.ICAT_COLUMN_COLL_OWNER_NAME):
pagenatedCollections[row].Owner = value
case int(common.ICAT_COLUMN_COLL_CREATE_TIME):
cT, err := util.GetIRODSDateTime(value)
if err != nil {
return nil, fmt.Errorf("Could not parse create time - %s", value)
}
pagenatedCollections[row].CreateTime = cT
case int(common.ICAT_COLUMN_COLL_MODIFY_TIME):
mT, err := util.GetIRODSDateTime(value)
if err != nil {
return nil, fmt.Errorf("Could not parse modify time - %s", value)
}
pagenatedCollections[row].ModifyTime = mT
default:
// ignore
}
}
}
}

collections = append(collections, pagenatedCollections...)

continueIndex = queryResult.ContinueIndex
if continueIndex == 0 {
continueQuery = false
}
}

return collections, nil
}
Loading

0 comments on commit dad6937

Please sign in to comment.