Skip to content
This repository has been archived by the owner on Sep 12, 2024. It is now read-only.

[Py2Py3] Migrate Unified/checkor.py to Python3 #979

Open
wants to merge 20 commits into
base: python3-migration
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 68 additions & 1 deletion src/python/Services/DBS/DBSReader.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,48 @@ def _getBlockFileLumis(self, blocks: List[str], validFileOnly: bool = True) -> L
This function runs by default with multithreading on the param blocks, which is a
list of block names.
"""
return self.dbs.listFileLumis(block_name=blocks, validFileOnly=int(validFileOnly))
response = self.dbs.listFileLumis(block_name=blocks, validFileOnly=int(validFileOnly))

if isinstance(response[0]["lumi_section_num"], list):
self.logger.debug("Handling dbsapi.listFileLumis response from NEW DBS server")
return self._aggregateListFileLumis(response)

self.logger.debug("Handling dbsapi.listFileLumis response from OLD DBS server")
return response

def _aggregateListFileLumis(self, response: dict) -> List[dict]:
"""
The function to aggregate the list of file lumis
:param response: raw response of listFileLumis
:return: lumi section files
"""
aggregatedResponse = []
tmpDict = {}

for entry in response:
runNum = entry["run_num"]
lumiSectionNum = entry["lumi_section_num"]
logicalFileName = entry["logical_file_name"]
eventCount = entry["event_count"]

key = (runNum, logicalFileName)

if key in tmpDict:
tmpDict[key]["event_count"].append(eventCount)
else:
tmpDict[key] = {"event_count": [eventCount], "lumi_section_num": [lumiSectionNum]}

for key, value in tmpDict.iteritems():
aggregatedResponse.append(
{
"run_num": key[0],
"lumi_section_num": value["lumi_section_num"],
"logical_file_name": key[1],
"event_count": value["event_count"],
}
)

return aggregatedResponse

def getDBSStatus(self, dataset: str) -> str:
"""
Expand Down Expand Up @@ -477,3 +518,29 @@ def getBlocksLumisAndFilesForCaching(self, blocks: List[dict], validFileOnly: bo
except Exception as error:
self.logger.error("Failed to get lumi sections and files from DBS for given blocks")
self.logger.error(str(error))

def countDatasetFiles(self, dataset: str, skipInvalid: bool = False, onlyInvalid: bool = False) -> int:
    """
    The function to count the number of files in a given dataset
    :param dataset: dataset name
    :param skipInvalid: if True skip invalid files, o/w include them
    :param onlyInvalid: if True count only invalid files, o/w include all
    :return: number of files, or None if the DBS query fails
    """
    try:
        # detail (per-file metadata incl. is_file_valid) is only needed
        # when we have to filter on validity
        files = self.dbs.listFiles(dataset=dataset, detail=(skipInvalid or onlyInvalid))

        # Common top-level LFN prefix (first three path components) of the
        # dataset's files, used to drop files living under a different area
        mainLFN = "/".join(files[0]["logical_file_name"].split("/")[:3]) if files else ""

        if skipInvalid:
            files = [fileInfo for fileInfo in files if fileInfo["is_file_valid"] == 1]
        elif onlyInvalid:
            files = [fileInfo for fileInfo in files if fileInfo["is_file_valid"] == 0]

        if (skipInvalid or onlyInvalid) and mainLFN:
            files = [fileInfo for fileInfo in files if fileInfo["logical_file_name"].startswith(mainLFN)]

        # The function is named/annotated as a count, but the original
        # returned the file list itself, contradicting the -> int signature.
        return len(files)

    except Exception as error:
        self.logger.error("Failed to count files in dataset")
        self.logger.error(str(error))
36 changes: 31 additions & 5 deletions src/python/Services/ReqMgr/ReqMgrWriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ def setWorkflowParam(self, wf: str, param: dict) -> bool:
:return: True if succeeded, False o/w
"""
try:
result = sendResponse(method="PUT", url=self.reqmgrUrl, endpoint=self.reqmgrEndpoint["request"] + wf, param=param)
result = sendResponse(
method="PUT", url=self.reqmgrUrl, endpoint=self.reqmgrEndpoint["request"] + wf, param=param
)
return any(item.get(wf) == "OK" for item in result["result"])

except Exception as error:
Expand All @@ -87,7 +89,7 @@ def setAgentConfig(self, agent: str, config: dict) -> bool:
"""
try:
result = sendResponse(
method= "PUT", url=self.reqmgrUrl, endpoint=self.reqmgrEndpoint["agentConfig"] + agent, param=config
method="PUT", url=self.reqmgrUrl, endpoint=self.reqmgrEndpoint["agentConfig"] + agent, param=config
)
return result["result"][0]["ok"]

Expand All @@ -102,8 +104,10 @@ def submitWorkflow(self, wfSchema: dict) -> bool:
:return: True if succeeded, False o/w
"""
try:
result = sendResponse(method= "POST", url=self.reqmgrUrl, endpoint=self.reqmgrEndpoint["request"], param=wfSchema)
return result['result'][0]['request']
result = sendResponse(
method="POST", url=self.reqmgrUrl, endpoint=self.reqmgrEndpoint["request"], param=wfSchema
)
return result["result"][0]["request"]

except Exception as error:
self.logger.error("Failed to submit workflow in reqmgr")
Expand All @@ -118,10 +122,32 @@ def approveWorkflow(self, wf: str) -> bool:
"""
try:
result = sendResponse(
method="PUT", url=self.reqmgrUrl, endpoint=f"{self.reqmgrEndpoint['request']}/{wf}", param={"RequestStatus": "assignment-approved"}
method="PUT",
url=self.reqmgrUrl,
endpoint=f"{self.reqmgrEndpoint['request']}/{wf}",
param={"RequestStatus": "assignment-approved"},
)
return result

except Exception as error:
self.logger.error("Failed to approve workflow in reqmgr")
self.logger.error(str(error))

def closeoutWorkflow(self, wf: str, cascade: bool = False) -> bool:
    """
    The function to close out a given workflow
    :param wf: workflow name
    :param cascade: if True, ask ReqMgr to cascade the closeout
    :return: True if succeeded, False o/w
    """
    try:
        # Status changes are PUTs — every sibling writer method here
        # (setWorkflowParam, setAgentConfig, approveWorkflow) passes
        # method explicitly; the original omitted it and relied on
        # sendResponse's default.
        result = sendResponse(
            method="PUT",
            url=self.reqmgrUrl,
            endpoint=f"{self.reqmgrEndpoint['request']}/{wf}",
            param={"RequestStatus": "closed-out", "cascade": cascade},
        )
        return result["result"][0][wf] == "OK"

    except Exception as error:
        self.logger.error("Failed to close out workflow in reqmgr")
        self.logger.error(str(error))
Loading