Skip to content

Commit

Permalink
Merge pull request PanDAWMS#382 from PanDAWMS/vulture_test
Browse files Browse the repository at this point in the history
Clean up of obsolete code
  • Loading branch information
fbarreir authored Jul 30, 2024
2 parents b8e3fa3 + eb68219 commit 7d50c47
Show file tree
Hide file tree
Showing 23 changed files with 585 additions and 2,541 deletions.
3 changes: 0 additions & 3 deletions pandaserver/brokerage/ErrorCode.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,3 @@

# release is not found
EC_Release = 100

# voms authentication failure
EC_Voms = 101
10 changes: 1 addition & 9 deletions pandaserver/brokerage/SiteMapper.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import copy
import re
import sys
import traceback

from pandacommon.pandalogger.PandaLogger import PandaLogger

from pandaserver.config import panda_config
from pandaserver.dataservice.DataServiceUtils import select_scope
from pandaserver.taskbuffer.NucleusSpec import NucleusSpec
Expand Down Expand Up @@ -361,11 +361,3 @@ def getNucleus(self, tmpName):
if tmpName in self.satellites:
return self.satellites[tmpName]
return None

# get nucleus with ddm endpoint
def getNucleusWithDdmEndpoint(self, ddmEndpoint):
    """
    Find the first nucleus whose spec reports the given DDM endpoint as associated.

    :param ddmEndpoint: endpoint name handed to each spec's isAssociatedEndpoint()
    :return: the key in self.nuclei of the first matching nucleus, or None when nothing matches
    """
    # lazily walk the nuclei in iteration order so the scan stops at the first hit
    matching_names = (name for name in self.nuclei if self.nuclei[name].isAssociatedEndpoint(ddmEndpoint))
    return next(matching_names, None)
1,022 changes: 494 additions & 528 deletions pandaserver/brokerage/broker.py

Large diffs are not rendered by default.

12 changes: 1 addition & 11 deletions pandaserver/configurator/Configurator.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@ def retrieve_data(self):
self.log_stream.error("The site dump was not retrieved correctly")
return False
self.log_stream.debug("Done")
self.site_endpoint_dict = self.get_site_endpoint_dictionary()

self.log_stream.debug("Getting DDM endpoints dump...")
self.endpoint_dump = aux.get_dump(self.CRIC_URL_DDMENDPOINTS)
if not self.endpoint_dump:
self.log_stream.error("The endpoint dump was not retrieved correctly")
return False
self.log_stream.debug("Done")

self.log_stream.debug("Parsing endpoints...")
self.endpoint_token_dict = self.parse_endpoints()
self.log_stream.debug("Done")
Expand Down Expand Up @@ -158,16 +158,6 @@ def parse_endpoints(self):

return endpoint_token_dict

def get_site_endpoint_dictionary(self):
    """
    Builds a dictionary that maps every site of the CRIC site dump to the list of its DDM endpoints
    """
    # list() over the "ddmendpoints" entry keeps only its keys/items as a fresh list per site
    return {site_name: list(config["ddmendpoints"]) for site_name, config in self.site_dump.items()}

def process_site_dumps(self):
"""
Parses the CRIC site and endpoint dumps and prepares a format loadable to the DB
Expand Down
6 changes: 3 additions & 3 deletions pandaserver/configurator/aux.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from urllib.parse import urlparse

import requests

from pandaserver.config import panda_config

GB = 1024**3
Expand All @@ -16,7 +17,6 @@
PACKETLOSS = "packetloss"
DONE = "done"
QUEUED = "queued"
TOTAL = "total"
LATEST = "latest"
H1 = "1h"
H6 = "6h"
Expand Down Expand Up @@ -47,7 +47,7 @@ def get_dump(url):
else:
cert = None
ca_certs = False
for i in range(1, 4): # 3 retries
for _ in range(1, 4): # 3 retries
try:
r = requests.get(url, cert=cert, verify=ca_certs, timeout=REQUESTS_TIMEOUT)
if r.status_code == requests.codes.ok:
Expand All @@ -63,7 +63,7 @@ def query_grafana_proxy(query, bearer_token):
"Authorization": f"Bearer {bearer_token}",
}
grafana_proxy_url = "https://monit-grafana.cern.ch/api/datasources/proxy/10349/_msearch"
for i in range(1, 4): # 3 retries
for _ in range(1, 4): # 3 retries
try:
r = requests.post(grafana_proxy_url, data=query, headers=headers)
if r.status_code == requests.codes.ok:
Expand Down
6 changes: 3 additions & 3 deletions pandaserver/daemons/scripts/add_sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
import time
import traceback

import pandaserver.taskbuffer.ErrorCode
import pandaserver.userinterface.Client as Client
from pandacommon.pandalogger.LogWrapper import LogWrapper
from pandacommon.pandalogger.PandaLogger import PandaLogger
from pandacommon.pandautils.thread_utils import GenericThread

import pandaserver.taskbuffer.ErrorCode
import pandaserver.userinterface.Client as Client
from pandaserver.brokerage.SiteMapper import SiteMapper
from pandaserver.config import panda_config
from pandaserver.srvcore.CoreUtils import commands_get_status_output
Expand Down Expand Up @@ -132,7 +133,6 @@ def main(argv=tuple(), tbuf=None, **kwargs):
tmpMethod = match.group(2)
tmpSite = match.group(3)
tmpNode = match.group(4)
tmpType = match.group(5)

# protection against corrupted entries from pilot,
# e.g. pilot reading site json from cvmfs while it was being updated
Expand Down
5 changes: 1 addition & 4 deletions pandaserver/daemons/scripts/cache_schedconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from pandacommon.pandalogger.PandaLogger import PandaLogger
from pandacommon.pandautils.thread_utils import GenericThread

from pandaserver.config import panda_config

# logger
Expand Down Expand Up @@ -199,10 +200,6 @@ def dumpSingleQueue(self, queueDict, dest="/tmp", outputSet="all", format="txt")
except Exception:
raise

def dumpQueues(self, queueArray, dest="/tmp", outputSet="all", format="txt"):
    """
    Dump every queue of a collection by delegating to dumpSingleQueue.

    :param queueArray: iterable of queue dictionaries, each dumped individually
    :param dest: passed through unchanged to dumpSingleQueue
    :param outputSet: passed through unchanged to dumpSingleQueue
    :param format: passed through unchanged to dumpSingleQueue
    """
    for queueDict in queueArray:
        # hand over the individual queue; passing the whole array here was a bug
        self.dumpSingleQueue(queueDict, dest, outputSet, format)

def queueDictPythonise(self, queueDict, deepCopy=True):
"""Turn queue dictionary with SQL text fields into a more stuctured python representation"""
if deepCopy:
Expand Down
2 changes: 1 addition & 1 deletion pandaserver/daemons/scripts/copyArchive.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ def _memoryCheck(str):
sql = "SELECT distinct jediTaskID FROM ATLAS_PANDA.jobsActive4 "
sql += "WHERE prodSourceLabel=:prodSourceLabel AND jobStatus=:jobStatus and modificationTime<:timeLimit "
tmp, res = taskBuffer.querySQLS(sql, varMap)
checkedDS = set()

for (jediTaskID,) in res:
varMap = {}
varMap[":jediTaskID"] = jediTaskID
Expand Down
19 changes: 10 additions & 9 deletions pandaserver/dataservice/AdderAtlasPlugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,6 @@
import time
import traceback

from pandaserver.config import panda_config
from pandaserver.dataservice import DataServiceUtils, ErrorCode
from pandaserver.dataservice.AdderPluginBase import AdderPluginBase
from pandaserver.dataservice.DataServiceUtils import select_scope
from pandaserver.dataservice.ddm import rucioAPI
from pandaserver.srvcore.MailUtils import MailUtils
from pandaserver.taskbuffer import EventServiceUtils, JobUtils
from rucio.common.exception import (
DataIdentifierNotFound,
FileConsistencyMismatch,
Expand All @@ -30,6 +23,14 @@
UnsupportedOperation,
)

from pandaserver.config import panda_config
from pandaserver.dataservice import DataServiceUtils, ErrorCode
from pandaserver.dataservice.AdderPluginBase import AdderPluginBase
from pandaserver.dataservice.DataServiceUtils import select_scope
from pandaserver.dataservice.ddm import rucioAPI
from pandaserver.srvcore.MailUtils import MailUtils
from pandaserver.taskbuffer import EventServiceUtils, JobUtils


class AdderAtlasPlugin(AdderPluginBase):
# constructor
Expand Down Expand Up @@ -706,7 +707,7 @@ def _updateOutputs(self):
},
)
)
for iDDMTry in range(3):
for _ in range(3):
isFailed = False
try:
status = rucioAPI.register_dataset_subscription(
Expand Down Expand Up @@ -759,7 +760,7 @@ def _updateOutputs(self):
{"lifetime": "14 days"},
)
)
for iDDMTry in range(3):
for _ in range(3):
out = "OK"
isFailed = False
try:
Expand Down
38 changes: 17 additions & 21 deletions pandaserver/dataservice/AdderGen.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@
import uuid
import xml.dom.minidom

import pandaserver.dataservice.ErrorCode
import pandaserver.taskbuffer.ErrorCode
from pandacommon.pandalogger.LogWrapper import LogWrapper
from pandacommon.pandalogger.PandaLogger import PandaLogger

import pandaserver.dataservice.ErrorCode
import pandaserver.taskbuffer.ErrorCode
from pandaserver.config import panda_config
from pandaserver.dataservice import closer
from pandaserver.taskbuffer import EventServiceUtils, JobUtils, retryModule
Expand Down Expand Up @@ -664,25 +665,20 @@ def parseXML(self):
if fullLFN is not None:
fullLfnMap[lfn] = fullLFN
except Exception:
# check if file exists
# if os.path.exists(self.xmlFile):
if True:
type, value, traceBack = sys.exc_info()
self.logger.error(f": {type} {value}")
# set failed anyway
self.job.jobStatus = "failed"
# XML error happens when pilot got killed due to wall-time limit or failures in wrapper
if (
(self.job.pilotErrorCode in [0, "0", "NULL"])
and (self.job.taskBufferErrorCode not in [pandaserver.taskbuffer.ErrorCode.EC_WorkerDone])
and (self.job.transExitCode in [0, "0", "NULL"])
):
self.job.ddmErrorCode = pandaserver.dataservice.ErrorCode.EC_Adder
self.job.ddmErrorDiag = "Could not get GUID/LFN/MD5/FSIZE/SURL from pilot XML"
return 2
else:
# XML was deleted
return 1
type, value, traceBack = sys.exc_info()
self.logger.error(f": {type} {value}")
# set failed anyway
self.job.jobStatus = "failed"
# XML error happens when pilot got killed due to wall-time limit or failures in wrapper
if (
(self.job.pilotErrorCode in [0, "0", "NULL"])
and (self.job.taskBufferErrorCode not in [pandaserver.taskbuffer.ErrorCode.EC_WorkerDone])
and (self.job.transExitCode in [0, "0", "NULL"])
):
self.job.ddmErrorCode = pandaserver.dataservice.ErrorCode.EC_Adder
self.job.ddmErrorDiag = "Could not get GUID/LFN/MD5/FSIZE/SURL from pilot XML"
return 2

# parse metadata to get nEvents
nEventsFrom = None
try:
Expand Down
34 changes: 8 additions & 26 deletions pandaserver/dataservice/DataServiceUtils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import sys

from OpenSSL import crypto

from pandaserver.taskbuffer import JobUtils


Expand Down Expand Up @@ -168,19 +169,7 @@ def get_endpoints_at_nucleus(tmpRepMap, siteMapper, cloudName):
return retList


# check DDM response
def isDQ2ok(out):
    """
    Tell whether a DDM response string looks healthy.

    :param out: response text to inspect
    :return: False if the text contains one of the known server-side error messages
             or is exactly "()", True otherwise
    """
    # substrings that indicate a failure on the server side
    failure_markers = (
        "DQ2 internal server exception",
        "An error occurred on the central catalogs",
        "MySQL server has gone away",
    )
    if any(out.find(marker) != -1 for marker in failure_markers):
        return False
    # an empty tuple representation is treated as a failed response as well
    return out != "()"


# check if DBR
# check if the dataset is a DB release
def isDBR(datasetName):
if datasetName.startswith("ddo"):
return True
Expand All @@ -204,19 +193,6 @@ def getDatasetType(dataset):
return datasetType


# check certificate
def checkCertificate(certName):
    """
    Check whether the PEM certificate stored in a file has expired.

    :param certName: path of the certificate file to load
    :return: (True, None) when the certificate could be loaded and has not expired,
             (False, message) when it has expired or when reading/parsing it failed
    """
    try:
        # the context manager closes the file handle even if reading fails
        with open(certName) as cert_file:
            pem_data = cert_file.read()
        cert = crypto.load_certificate(crypto.FILETYPE_PEM, pem_data)
        if cert.has_expired():
            return False, f"{certName} expired"
        return True, None
    except Exception as exc:
        # never raise to the caller: report the failure as "<ExceptionClass>:<message>"
        return False, f"{type(exc).__name__}:{exc}"


# get sites which share DDM endpoint
def getSitesShareDDM(siteMapper, siteName, prodSourceLabel, job_label, output_share=False):
# output_share: False to get sites which use the output RSE as input, True to get sites which use
Expand Down Expand Up @@ -334,6 +310,7 @@ def select_scope(site_spec, prodsourcelabel, job_label):

return scope_input, scope_output


def is_top_level_dataset(dataset_name: str) -> bool:
"""
Check if top dataset
Expand All @@ -346,6 +323,7 @@ def is_top_level_dataset(dataset_name: str) -> bool:
"""
return re.sub("_sub\d+$", "", dataset_name) == dataset_name


def is_sub_dataset(dataset_name: str) -> bool:
"""
Check if the dataset name ends with '_sub' followed by one or more digits.
Expand All @@ -358,6 +336,7 @@ def is_sub_dataset(dataset_name: str) -> bool:
"""
return re.search("_sub\d+$", dataset_name) is not None


def is_tid_dataset(destination_data_block: str) -> bool:
"""
Check if the destination data block ends with '_tid' followed by one or more digits.
Expand All @@ -370,6 +349,7 @@ def is_tid_dataset(destination_data_block: str) -> bool:
"""
return re.search("_tid[\d_]+$", destination_data_block) is not None


def is_hammercloud_dataset(destination_data_block: str) -> bool:
"""
Check if the destination data block starts with 'hc_test.'.
Expand All @@ -382,6 +362,7 @@ def is_hammercloud_dataset(destination_data_block: str) -> bool:
"""
return re.search("^hc_test\.", destination_data_block) is not None


def is_user_gangarbt_dataset(destination_data_block: str) -> bool:
"""
Check if the destination data block starts with 'user.gangarbt.'.
Expand All @@ -394,6 +375,7 @@ def is_user_gangarbt_dataset(destination_data_block: str) -> bool:
"""
return re.search("^user\.gangarbt\.", destination_data_block) is not None


def is_lib_dataset(destination_data_block: str) -> bool:
"""
Check if the destination data block ends with '.lib'.
Expand Down
3 changes: 0 additions & 3 deletions pandaserver/dataservice/ErrorCode.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@
# location registration failures
EC_Location = 203

# lost file (taskbuffer.ErrorCode.EC_LostFile)
EC_LostFile = 110

# missing nEvents
EC_MissingNumEvents = 204

Expand Down
Loading

0 comments on commit 7d50c47

Please sign in to comment.