Skip to content

Commit

Permalink
Fix possible issue on get/list data objects with master replica
Browse files Browse the repository at this point in the history
It may not have replica 0 if it is deleted. A new replica will have the highest replica number.
  • Loading branch information
iychoi committed Mar 4, 2022
1 parent 40d1ce0 commit bc7fec4
Showing 1 changed file with 138 additions and 32 deletions.
170 changes: 138 additions & 32 deletions irods/fs/data_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func GetDataObject(conn *connection.IRODSConnection, collection *types.IRODSColl
query.AddSelect(common.ICAT_COLUMN_DATA_REPL_NUM, 1)
query.AddSelect(common.ICAT_COLUMN_D_OWNER_NAME, 1)
query.AddSelect(common.ICAT_COLUMN_D_DATA_CHECKSUM, 1)
query.AddSelect(common.ICAT_COLUMN_D_DATA_STATUS, 1)
query.AddSelect(common.ICAT_COLUMN_D_REPL_STATUS, 1)
query.AddSelect(common.ICAT_COLUMN_D_RESC_NAME, 1)
query.AddSelect(common.ICAT_COLUMN_D_DATA_PATH, 1)
query.AddSelect(common.ICAT_COLUMN_D_RESC_HIER, 1)
Expand Down Expand Up @@ -166,7 +166,7 @@ func GetDataObject(conn *connection.IRODSConnection, collection *types.IRODSColl
pagenatedDataObjects[row].Replicas[0].Owner = value
case int(common.ICAT_COLUMN_D_DATA_CHECKSUM):
pagenatedDataObjects[row].Replicas[0].CheckSum = value
case int(common.ICAT_COLUMN_D_DATA_STATUS):
case int(common.ICAT_COLUMN_D_REPL_STATUS):
pagenatedDataObjects[row].Replicas[0].Status = value
case int(common.ICAT_COLUMN_D_RESC_NAME):
pagenatedDataObjects[row].Replicas[0].ResourceName = value
Expand Down Expand Up @@ -200,6 +200,10 @@ func GetDataObject(conn *connection.IRODSConnection, collection *types.IRODSColl
}
}

if len(dataObjects) == 0 {
return nil, types.NewFileNotFoundErrorf("could not find a data object")
}

// merge data objects per file
mergedDataObjectsMap := map[int64]*types.IRODSDataObject{}
for _, object := range dataObjects {
Expand All @@ -213,17 +217,12 @@ func GetDataObject(conn *connection.IRODSConnection, collection *types.IRODSColl
}
}

// convert map to array
mergedDataObjects := []*types.IRODSDataObject{}
for _, object := range mergedDataObjectsMap {
mergedDataObjects = append(mergedDataObjects, object)
// returns only the first object
return object, nil
}

if len(mergedDataObjects) == 0 {
return nil, types.NewFileNotFoundErrorf("could not find a data object")
}

return mergedDataObjects[0], nil
return nil, types.NewFileNotFoundErrorf("could not find a data object")
}

// GetDataObjectMasterReplica returns a data object for the path, returns only master replica
Expand All @@ -249,7 +248,7 @@ func GetDataObjectMasterReplica(conn *connection.IRODSConnection, collection *ty
query.AddSelect(common.ICAT_COLUMN_DATA_REPL_NUM, 1)
query.AddSelect(common.ICAT_COLUMN_D_OWNER_NAME, 1)
query.AddSelect(common.ICAT_COLUMN_D_DATA_CHECKSUM, 1)
query.AddSelect(common.ICAT_COLUMN_D_DATA_STATUS, 1)
query.AddSelect(common.ICAT_COLUMN_D_REPL_STATUS, 1)
query.AddSelect(common.ICAT_COLUMN_D_RESC_NAME, 1)
query.AddSelect(common.ICAT_COLUMN_D_DATA_PATH, 1)
query.AddSelect(common.ICAT_COLUMN_D_RESC_HIER, 1)
Expand All @@ -260,7 +259,7 @@ func GetDataObjectMasterReplica(conn *connection.IRODSConnection, collection *ty
query.AddCondition(common.ICAT_COLUMN_COLL_NAME, collCondVal)
pathCondVal := fmt.Sprintf("= '%s'", filename)
query.AddCondition(common.ICAT_COLUMN_DATA_NAME, pathCondVal)
query.AddCondition(common.ICAT_COLUMN_DATA_REPL_NUM, "= '0'")
query.AddCondition(common.ICAT_COLUMN_D_REPL_STATUS, "= '1'")

queryResult := message.IRODSMessageQueryResult{}
err := conn.Request(query, &queryResult)
Expand Down Expand Up @@ -345,7 +344,7 @@ func GetDataObjectMasterReplica(conn *connection.IRODSConnection, collection *ty
pagenatedDataObjects[row].Replicas[0].Owner = value
case int(common.ICAT_COLUMN_D_DATA_CHECKSUM):
pagenatedDataObjects[row].Replicas[0].CheckSum = value
case int(common.ICAT_COLUMN_D_DATA_STATUS):
case int(common.ICAT_COLUMN_D_REPL_STATUS):
pagenatedDataObjects[row].Replicas[0].Status = value
case int(common.ICAT_COLUMN_D_RESC_NAME):
pagenatedDataObjects[row].Replicas[0].ResourceName = value
Expand Down Expand Up @@ -383,7 +382,33 @@ func GetDataObjectMasterReplica(conn *connection.IRODSConnection, collection *ty
return nil, types.NewFileNotFoundErrorf("could not find a data object")
}

return dataObjects[0], nil
// merge data objects per file
mergedDataObjectsMap := map[int64]*types.IRODSDataObject{}
for _, object := range dataObjects {
existingObj, exists := mergedDataObjectsMap[object.ID]
if exists {
// compare and replace
if len(existingObj.Replicas) == 0 {
// replace
mergedDataObjectsMap[object.ID] = object
} else if len(object.Replicas) > 0 {
if existingObj.Replicas[0].CreateTime.After(object.Replicas[0].CreateTime) {
// found old replica (meaning master) - replace
mergedDataObjectsMap[object.ID] = object
}
}
} else {
// add
mergedDataObjectsMap[object.ID] = object
}
}

for _, object := range mergedDataObjectsMap {
// returns only the first object
return object, nil
}

return nil, types.NewFileNotFoundErrorf("could not find a data object")
}

// ListDataObjects lists data objects in the given collection
Expand All @@ -409,7 +434,7 @@ func ListDataObjects(conn *connection.IRODSConnection, collection *types.IRODSCo
query.AddSelect(common.ICAT_COLUMN_DATA_REPL_NUM, 1)
query.AddSelect(common.ICAT_COLUMN_D_OWNER_NAME, 1)
query.AddSelect(common.ICAT_COLUMN_D_DATA_CHECKSUM, 1)
query.AddSelect(common.ICAT_COLUMN_D_DATA_STATUS, 1)
query.AddSelect(common.ICAT_COLUMN_D_REPL_STATUS, 1)
query.AddSelect(common.ICAT_COLUMN_D_RESC_NAME, 1)
query.AddSelect(common.ICAT_COLUMN_D_DATA_PATH, 1)
query.AddSelect(common.ICAT_COLUMN_D_RESC_HIER, 1)
Expand Down Expand Up @@ -504,7 +529,7 @@ func ListDataObjects(conn *connection.IRODSConnection, collection *types.IRODSCo
pagenatedDataObjects[row].Replicas[0].Owner = value
case int(common.ICAT_COLUMN_D_DATA_CHECKSUM):
pagenatedDataObjects[row].Replicas[0].CheckSum = value
case int(common.ICAT_COLUMN_D_DATA_STATUS):
case int(common.ICAT_COLUMN_D_REPL_STATUS):
pagenatedDataObjects[row].Replicas[0].Status = value
case int(common.ICAT_COLUMN_D_RESC_NAME):
pagenatedDataObjects[row].Replicas[0].ResourceName = value
Expand Down Expand Up @@ -583,7 +608,7 @@ func ListDataObjectsMasterReplica(conn *connection.IRODSConnection, collection *
query.AddSelect(common.ICAT_COLUMN_DATA_REPL_NUM, 1)
query.AddSelect(common.ICAT_COLUMN_D_OWNER_NAME, 1)
query.AddSelect(common.ICAT_COLUMN_D_DATA_CHECKSUM, 1)
query.AddSelect(common.ICAT_COLUMN_D_DATA_STATUS, 1)
query.AddSelect(common.ICAT_COLUMN_D_REPL_STATUS, 1)
query.AddSelect(common.ICAT_COLUMN_D_RESC_NAME, 1)
query.AddSelect(common.ICAT_COLUMN_D_DATA_PATH, 1)
query.AddSelect(common.ICAT_COLUMN_D_RESC_HIER, 1)
Expand All @@ -592,7 +617,7 @@ func ListDataObjectsMasterReplica(conn *connection.IRODSConnection, collection *

collCondVal := fmt.Sprintf("= '%s'", collection.Path)
query.AddCondition(common.ICAT_COLUMN_COLL_NAME, collCondVal)
query.AddCondition(common.ICAT_COLUMN_DATA_REPL_NUM, "= '0'")
query.AddCondition(common.ICAT_COLUMN_D_REPL_STATUS, "= '1'")

queryResult := message.IRODSMessageQueryResult{}
err := conn.Request(query, &queryResult)
Expand Down Expand Up @@ -679,7 +704,7 @@ func ListDataObjectsMasterReplica(conn *connection.IRODSConnection, collection *
pagenatedDataObjects[row].Replicas[0].Owner = value
case int(common.ICAT_COLUMN_D_DATA_CHECKSUM):
pagenatedDataObjects[row].Replicas[0].CheckSum = value
case int(common.ICAT_COLUMN_D_DATA_STATUS):
case int(common.ICAT_COLUMN_D_REPL_STATUS):
pagenatedDataObjects[row].Replicas[0].Status = value
case int(common.ICAT_COLUMN_D_RESC_NAME):
pagenatedDataObjects[row].Replicas[0].ResourceName = value
Expand Down Expand Up @@ -713,7 +738,34 @@ func ListDataObjectsMasterReplica(conn *connection.IRODSConnection, collection *
}
}

return dataObjects, nil
// merge data objects per file
mergedDataObjectsMap := map[int64]*types.IRODSDataObject{}
for _, object := range dataObjects {
existingObj, exists := mergedDataObjectsMap[object.ID]
if exists {
// compare and replace
if len(existingObj.Replicas) == 0 {
// replace
mergedDataObjectsMap[object.ID] = object
} else if len(object.Replicas) > 0 {
if existingObj.Replicas[0].CreateTime.After(object.Replicas[0].CreateTime) {
// found old replica (meaning master) - replace
mergedDataObjectsMap[object.ID] = object
}
}
} else {
// add
mergedDataObjectsMap[object.ID] = object
}
}

// convert map to array
mergedDataObjects := []*types.IRODSDataObject{}
for _, object := range mergedDataObjectsMap {
mergedDataObjects = append(mergedDataObjects, object)
}

return mergedDataObjects, nil
}

// ListDataObjectMeta returns a data object metadata for the path
Expand Down Expand Up @@ -1392,7 +1444,7 @@ func SearchDataObjectsByMeta(conn *connection.IRODSConnection, metaName string,
query.AddSelect(common.ICAT_COLUMN_DATA_REPL_NUM, 1)
query.AddSelect(common.ICAT_COLUMN_D_OWNER_NAME, 1)
query.AddSelect(common.ICAT_COLUMN_D_DATA_CHECKSUM, 1)
query.AddSelect(common.ICAT_COLUMN_D_DATA_STATUS, 1)
query.AddSelect(common.ICAT_COLUMN_D_REPL_STATUS, 1)
query.AddSelect(common.ICAT_COLUMN_D_RESC_NAME, 1)
query.AddSelect(common.ICAT_COLUMN_D_DATA_PATH, 1)
query.AddSelect(common.ICAT_COLUMN_D_RESC_HIER, 1)
Expand Down Expand Up @@ -1505,7 +1557,7 @@ func SearchDataObjectsByMeta(conn *connection.IRODSConnection, metaName string,
pagenatedDataObjects[row].Replicas[0].Owner = value
case int(common.ICAT_COLUMN_D_DATA_CHECKSUM):
pagenatedDataObjects[row].Replicas[0].CheckSum = value
case int(common.ICAT_COLUMN_D_DATA_STATUS):
case int(common.ICAT_COLUMN_D_REPL_STATUS):
pagenatedDataObjects[row].Replicas[0].Status = value
case int(common.ICAT_COLUMN_D_RESC_NAME):
pagenatedDataObjects[row].Replicas[0].ResourceName = value
Expand Down Expand Up @@ -1586,7 +1638,7 @@ func SearchDataObjectsMasterReplicaByMeta(conn *connection.IRODSConnection, meta
query.AddSelect(common.ICAT_COLUMN_DATA_REPL_NUM, 1)
query.AddSelect(common.ICAT_COLUMN_D_OWNER_NAME, 1)
query.AddSelect(common.ICAT_COLUMN_D_DATA_CHECKSUM, 1)
query.AddSelect(common.ICAT_COLUMN_D_DATA_STATUS, 1)
query.AddSelect(common.ICAT_COLUMN_D_REPL_STATUS, 1)
query.AddSelect(common.ICAT_COLUMN_D_RESC_NAME, 1)
query.AddSelect(common.ICAT_COLUMN_D_DATA_PATH, 1)
query.AddSelect(common.ICAT_COLUMN_D_RESC_HIER, 1)
Expand All @@ -1597,7 +1649,7 @@ func SearchDataObjectsMasterReplicaByMeta(conn *connection.IRODSConnection, meta
query.AddCondition(common.ICAT_COLUMN_META_DATA_ATTR_NAME, metaNameCondVal)
metaValueCondVal := fmt.Sprintf("= '%s'", metaValue)
query.AddCondition(common.ICAT_COLUMN_META_DATA_ATTR_VALUE, metaValueCondVal)
query.AddCondition(common.ICAT_COLUMN_DATA_REPL_NUM, "= '0'")
query.AddCondition(common.ICAT_COLUMN_D_REPL_STATUS, "= '1'")

queryResult := message.IRODSMessageQueryResult{}
err := conn.Request(query, &queryResult)
Expand Down Expand Up @@ -1700,7 +1752,7 @@ func SearchDataObjectsMasterReplicaByMeta(conn *connection.IRODSConnection, meta
pagenatedDataObjects[row].Replicas[0].Owner = value
case int(common.ICAT_COLUMN_D_DATA_CHECKSUM):
pagenatedDataObjects[row].Replicas[0].CheckSum = value
case int(common.ICAT_COLUMN_D_DATA_STATUS):
case int(common.ICAT_COLUMN_D_REPL_STATUS):
pagenatedDataObjects[row].Replicas[0].Status = value
case int(common.ICAT_COLUMN_D_RESC_NAME):
pagenatedDataObjects[row].Replicas[0].ResourceName = value
Expand Down Expand Up @@ -1734,7 +1786,34 @@ func SearchDataObjectsMasterReplicaByMeta(conn *connection.IRODSConnection, meta
}
}

return dataObjects, nil
// merge data objects per file
mergedDataObjectsMap := map[int64]*types.IRODSDataObject{}
for _, object := range dataObjects {
existingObj, exists := mergedDataObjectsMap[object.ID]
if exists {
// compare and replace
if len(existingObj.Replicas) == 0 {
// replace
mergedDataObjectsMap[object.ID] = object
} else if len(object.Replicas) > 0 {
if existingObj.Replicas[0].CreateTime.After(object.Replicas[0].CreateTime) {
// found old replica (meaning master) - replace
mergedDataObjectsMap[object.ID] = object
}
}
} else {
// add
mergedDataObjectsMap[object.ID] = object
}
}

// convert map to array
mergedDataObjects := []*types.IRODSDataObject{}
for _, object := range mergedDataObjectsMap {
mergedDataObjects = append(mergedDataObjects, object)
}

return mergedDataObjects, nil
}

// SearchDataObjectsByMetaWildcard searches data objects by metadata
Expand Down Expand Up @@ -1763,7 +1842,7 @@ func SearchDataObjectsByMetaWildcard(conn *connection.IRODSConnection, metaName
query.AddSelect(common.ICAT_COLUMN_DATA_REPL_NUM, 1)
query.AddSelect(common.ICAT_COLUMN_D_OWNER_NAME, 1)
query.AddSelect(common.ICAT_COLUMN_D_DATA_CHECKSUM, 1)
query.AddSelect(common.ICAT_COLUMN_D_DATA_STATUS, 1)
query.AddSelect(common.ICAT_COLUMN_D_REPL_STATUS, 1)
query.AddSelect(common.ICAT_COLUMN_D_RESC_NAME, 1)
query.AddSelect(common.ICAT_COLUMN_D_DATA_PATH, 1)
query.AddSelect(common.ICAT_COLUMN_D_RESC_HIER, 1)
Expand Down Expand Up @@ -1876,7 +1955,7 @@ func SearchDataObjectsByMetaWildcard(conn *connection.IRODSConnection, metaName
pagenatedDataObjects[row].Replicas[0].Owner = value
case int(common.ICAT_COLUMN_D_DATA_CHECKSUM):
pagenatedDataObjects[row].Replicas[0].CheckSum = value
case int(common.ICAT_COLUMN_D_DATA_STATUS):
case int(common.ICAT_COLUMN_D_REPL_STATUS):
pagenatedDataObjects[row].Replicas[0].Status = value
case int(common.ICAT_COLUMN_D_RESC_NAME):
pagenatedDataObjects[row].Replicas[0].ResourceName = value
Expand Down Expand Up @@ -1958,7 +2037,7 @@ func SearchDataObjectsMasterReplicaByMetaWildcard(conn *connection.IRODSConnecti
query.AddSelect(common.ICAT_COLUMN_DATA_REPL_NUM, 1)
query.AddSelect(common.ICAT_COLUMN_D_OWNER_NAME, 1)
query.AddSelect(common.ICAT_COLUMN_D_DATA_CHECKSUM, 1)
query.AddSelect(common.ICAT_COLUMN_D_DATA_STATUS, 1)
query.AddSelect(common.ICAT_COLUMN_D_REPL_STATUS, 1)
query.AddSelect(common.ICAT_COLUMN_D_RESC_NAME, 1)
query.AddSelect(common.ICAT_COLUMN_D_DATA_PATH, 1)
query.AddSelect(common.ICAT_COLUMN_D_RESC_HIER, 1)
Expand All @@ -1969,7 +2048,7 @@ func SearchDataObjectsMasterReplicaByMetaWildcard(conn *connection.IRODSConnecti
query.AddCondition(common.ICAT_COLUMN_META_DATA_ATTR_NAME, metaNameCondVal)
metaValueCondVal := fmt.Sprintf("like '%s'", metaValue)
query.AddCondition(common.ICAT_COLUMN_META_DATA_ATTR_VALUE, metaValueCondVal)
query.AddCondition(common.ICAT_COLUMN_DATA_REPL_NUM, "= '0'")
query.AddCondition(common.ICAT_COLUMN_D_REPL_STATUS, "= '1'")

queryResult := message.IRODSMessageQueryResult{}
err := conn.Request(query, &queryResult)
Expand Down Expand Up @@ -2072,7 +2151,7 @@ func SearchDataObjectsMasterReplicaByMetaWildcard(conn *connection.IRODSConnecti
pagenatedDataObjects[row].Replicas[0].Owner = value
case int(common.ICAT_COLUMN_D_DATA_CHECKSUM):
pagenatedDataObjects[row].Replicas[0].CheckSum = value
case int(common.ICAT_COLUMN_D_DATA_STATUS):
case int(common.ICAT_COLUMN_D_REPL_STATUS):
pagenatedDataObjects[row].Replicas[0].Status = value
case int(common.ICAT_COLUMN_D_RESC_NAME):
pagenatedDataObjects[row].Replicas[0].ResourceName = value
Expand Down Expand Up @@ -2106,7 +2185,34 @@ func SearchDataObjectsMasterReplicaByMetaWildcard(conn *connection.IRODSConnecti
}
}

return dataObjects, nil
// merge data objects per file
mergedDataObjectsMap := map[int64]*types.IRODSDataObject{}
for _, object := range dataObjects {
existingObj, exists := mergedDataObjectsMap[object.ID]
if exists {
// compare and replace
if len(existingObj.Replicas) == 0 {
// replace
mergedDataObjectsMap[object.ID] = object
} else if len(object.Replicas) > 0 {
if existingObj.Replicas[0].CreateTime.After(object.Replicas[0].CreateTime) {
// found old replica (meaning master) - replace
mergedDataObjectsMap[object.ID] = object
}
}
} else {
// add
mergedDataObjectsMap[object.ID] = object
}
}

// convert map to array
mergedDataObjects := []*types.IRODSDataObject{}
for _, object := range mergedDataObjectsMap {
mergedDataObjects = append(mergedDataObjects, object)
}

return mergedDataObjects, nil
}

// ChangeAccessControlDataObject changes access control on a data object.
Expand Down

0 comments on commit bc7fec4

Please sign in to comment.