Skip to content

Commit

Permalink
Merge branch 'develop' into release-8.3.2
Browse files Browse the repository at this point in the history
  • Loading branch information
mesmith75 authored Jun 30, 2020
2 parents 4c03c70 + 7b20f6d commit 37f8a4e
Showing 6 changed files with 55 additions and 7 deletions.
16 changes: 13 additions & 3 deletions ganga/GangaCore/GPIDev/Lib/File/OutputFileManager.py
Original file line number Diff line number Diff line change
@@ -230,18 +230,28 @@ def doIHaveInputFiles(job):

if job.inputdata:
if job.inputdata and isType(job.inputdata, GangaDataset):
inputfiles_list += job.inputdata.files
for _f in job.inputdata:
try:
if not _f.accessURL():
inputfiles_list.append(_f)
except NotImplementedError:
inputfiles_list.append(_f)

elif job.master is not None:
if job.master.inputdata and isType(job.master.inputdata, GangaDataset):
inputfiles_list += job.master.inputdata.files
for _f in job.master.inputdata:
try:
if not _f.accessURL():
inputfiles_list.append(_f)
except NotImplementedError:
inputfiles_list.append(_f)

if job.virtualization and isinstance(job.virtualization.image, IGangaFile):
inputfiles_list.append(job.virtualization.image)

for inputFile in inputfiles_list:

inputfileClassName = getName(inputFile)

if outputFilePostProcessingOnWN(job, inputfileClassName):
inputFile.processWildcardMatches()
if inputFile.subfiles:
Original file line number Diff line number Diff line change
@@ -98,7 +98,7 @@

global_random = random

LFN_parallel_limit = 250.
LFN_parallel_limit = 2500.
def wrapped_execute(command, expected_type, new_subprocess = False):
"""
A wrapper around execute to protect us from commands which had errors
2 changes: 2 additions & 0 deletions ganga/GangaLHCb/LHCb.ini
Original file line number Diff line number Diff line change
@@ -26,6 +26,8 @@ DiracCommandFiles = ["@{GANGA_PYTHONPATH}/GangaLHCb/Lib/Server/DiracLHCbDefiniti
"@{GANGA_PYTHONPATH}/GangaDirac/Lib/Server/DiracCommands.py",
"@{GANGA_PYTHONPATH}/GangaLHCb/Lib/Server/DiracLHCbCommands.py"]

DiracEnvSource = nonsense

[Output]
ForbidLegacyInput = True
ForbidLegacyOutput = True
33 changes: 32 additions & 1 deletion ganga/GangaLHCb/Lib/Backends/Dirac.py
Original file line number Diff line number Diff line change
@@ -6,10 +6,13 @@
from GangaLHCb.Lib.LHCbDataset.LHCbDataset import LHCbDataset
from GangaCore.GPIDev.Base.Proxy import GPIProxyObjectFactory
from GangaDirac.Lib.Utilities.DiracUtilities import execute
from GangaDirac.Lib.Splitters.OfflineGangaDiracSplitter import getLFNReplicas, LFN_parallel_limit
import GangaCore.Utility.logging
logger = GangaCore.Utility.logging.getLogger()
from GangaCore.GPIDev.Credentials import require_credential

from GangaCore.Core.GangaThread.WorkerThreads import getQueues
import math
import time

class Dirac(DiracBase):
_schema = DiracBase._schema.inherit_copy()
@@ -89,5 +92,33 @@ def getLFNMetadata(lfns, credential_requirements=None):

return returnDict

def filterLFNsBySE(lfns, site):
'''Filter the given list of LFNs to those with a replica at the given SE'''
#First get all the replicas
logger.info('Selecting LFNs with replicas at %s. Note missing LFNs are ignored!' % site)
reps = {}
# Request the replicas for all LFN 'LFN_parallel_limit' at a time to not overload the
# server and give some feedback as this is going on
for i in range(int(math.ceil(float(len(lfns)) / LFN_parallel_limit))):

getQueues()._monitoring_threadpool.add_function(getLFNReplicas, (lfns, i, reps))

while len(reps) != int(math.ceil(float(len(lfns)) / LFN_parallel_limit)):
time.sleep(1.)
# This can take a while so lets protect any repo locks
import GangaCore.Runtime.Repository_runtime
GangaCore.Runtime.Repository_runtime.updateLocksNow()
outLFNs = []
#reps is a dict of dicts of dicts with keys the index from the thread, 'Successful', LFN, then the SEs, then the values are the PFNs. Pick out the LFNs we want
for _index in reps.keys():
for _lfn, _replicas in reps[_index]['Successful'].items():
if site in _replicas.keys():
outLFNs.append(_lfn)

return outLFNs



from GangaCore.Runtime.GPIexport import exportToGPI
exportToGPI('getLFNMetadata', getLFNMetadata, 'Functions')
exportToGPI('filterLFNsBySE', filterLFNsBySE, 'Functions')
7 changes: 6 additions & 1 deletion ganga/GangaLHCb/Lib/LHCbDataset/BKQuery.py
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@
from GangaDirac.Lib.Files.DiracFile import DiracFile
from GangaCore.Utility.logging import getLogger
from GangaLHCb.Lib.LHCbDataset import LHCbDataset, LHCbCompressedDataset
from GangaLHCb.Lib.Backends.Dirac import filterLFNsBySE
logger = getLogger()
#\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\/\#

@@ -144,7 +145,7 @@ def getDatasetMetadata(self):
return {'OK': False, 'Value': metadata}

@require_credential
def getDataset(self, compressed = True):
def getDataset(self, compressed = True, SE = None):
'''Gets the dataset from the bookkeeping for current path, etc.'''
if not self.path:
return None
@@ -175,6 +176,10 @@ def getDataset(self, compressed = True):
if not type(files) is list:
files = list(files.keys())

if SE:
tempFiles = filterLFNsBySE(files, SE)
files = tempFiles

logger.debug("Creating dataset")

if compressed:
2 changes: 1 addition & 1 deletion ganga/GangaLHCb/Lib/Splitters/GaussSplitter.py
Original file line number Diff line number Diff line change
@@ -83,7 +83,7 @@ def split(self, job):
opts += 'from Configurables import LHCbApp \n'
opts += 'LHCbApp().EvtMax = %d\n' % self.eventsPerJob
opts += 'GenInit("GaussGen").FirstEventNumber = %d\n' % first
spillOver = ["GaussGenPrev", "GaussGenPrevPrev", "GaussGenNext"]
spillOver = ["GaussGenPrev", "GaussGenPrevPrev", "GaussGenNext", "GaussGenNextNext"]
for s in spillOver:
opts += 'GenInit("%s").FirstEventNumber = %d\n' % (s, first)
#j.application.extra.input_buffers['data.py'] += opts

0 comments on commit 37f8a4e

Please sign in to comment.