Fix Sitelists changes propagation to local workqueue #12245
base: master
Conversation
With our latest commit, @mapellidario and I confirmed that it all works now. Here [1] is a diff between the local WorkQueueElements for the workflow.
Todor, even though the proposed changes look good to me, I struggle to understand where the updates should go. Originally I followed Alan's guidelines about the pklFile, and now you're changing them to load from the local WorkQueue CouchDB database. I would prefer Alan to provide proper guidelines on how the load/upload mechanism should work.
Please find a few concerns and requests along the code.
@@ -287,7 +287,8 @@ def updateElementsByWorkflow(self, workload, updateParams, status=None):
         # Update all workload parameters based on the full reqArgs dictionary
         workload.updateWorkloadArgs(updateParams)
         # Commit the changes of the current workload object to the database:
-        workload.saveCouchUrl(workload.specUrl())
+        metadata = {'name': wfName}
+        workload.saveCouch(self.hostWithAuth, self.db.name, metadata=metadata)
Isn't this method used by global workqueue as well? It might be that making this change for the agent will actually break global workqueue.
In addition, I think this is the first time I see this update being done with metadata (and isn't it redundant with workload.updateWorkloadArgs?)
Yes (it is used by global), no (it won't break anything), and no (it is not redundant).
@@ -325,6 +326,41 @@ def deleteWQElementsByWorkflow(self, workflowNames):
             deleted += len(ids)
         return deleted

+    def getWQElementsByWorkflow(self, workflowNames, inboxFlag=False):
Other than the workflow name, can't we have a method that is very similar to what is already implemented for getWorkflowNames()?

def getWorkflowNames(self, inboxFlag=False):

This implementation seems overloaded, also with the additional data structures created and grouped by database.
I have changed the data structure returned, so it is now just a list of WQEs.
And no, we cannot do just what is done in getWorkflowNames, because it would return only the WQE ids, while we need the full content of the WQE - hence the rest of the function, beyond the call to the CouchDB view elementsByWorkflow.
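To illustrate the distinction with a minimal, self-contained sketch (the stub database, the document shapes, and the helper below are illustrative assumptions, not the real WMCore API): a getWorkflowNames-style view query yields only document ids, so returning full WorkQueueElement content requires resolving each id into its document.

```python
# Hypothetical sketch: why ids from a view are not enough when the caller
# needs the full element content. All names and shapes are illustrative.

# Stub CouchDB database: document id -> full document
fake_db = {
    "doc1": {"_id": "doc1", "RequestName": "wf_A", "Status": "Available"},
    "doc2": {"_id": "doc2", "RequestName": "wf_A", "Status": "Running"},
}

# Stub result of a view query (what a getWorkflowNames-style call returns):
# only the ids, no element content.
view_rows = [{"id": "doc1"}, {"id": "doc2"}]

def get_wq_elements(rows, db):
    """Resolve view rows into full element documents."""
    return [db[row["id"]] for row in rows]

elements = get_wq_elements(view_rows, fake_db)
print([e["Status"] for e in elements])  # ['Available', 'Running']
```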
I don't understand why you need the full element content.
Can you execute an in-place update as performed by this method:
def updateElements(self, *elementIds, **updatedParams):
Additionally, if we have unit tests for this module, please create a new one for this new method.
We are not using this method for updating any WQE, but rather for listing its content, like this: [1]
In [1]: sitelistpoller.localWQ.getWQElementsByWorkflow('dmapelli_ReReco_RunBlockWhite_Nvidia_test_v1_250124_095017_1803')
Out[1]:
[{'Inputs': {},
'ProcessedInputs': [],
'RejectedInputs': [],
'PileupData': {},
'ParentData': {},
'ParentFlag': False,
'Jobs': 0,
'WMSpec': None,
'SiteWhitelist': [],
'SiteBlacklist': [],
'Dbs': None,
'Task': None,
'ParentQueueId': None,
'Priority': 0,
'SubscriptionId': None,
'Status': None,
'EventsWritten': 0,
'FilesProcessed': 0,
'PercentComplete': 0,
'PercentSuccess': 0,
'RequestName': None,
'TaskName': None,
'TeamName': None,
'StartPolicy': {},
'EndPolicy': {},
'ACDC': {},
'ChildQueueUrl': None,
'ParentQueueUrl': None,
'WMBSUrl': None,
'NumberOfLumis': 0,
'NumberOfEvents': 0,
'NumberOfFiles': 0,
'NumOfFilesAdded': 0,
'Mask': None,
'OpenForNewData': False,
'TimestampFoundNewData': 1738241180,
'NoInputUpdate': False,
'NoPileupUpdate': False,
'CreationTime': 1738241180,
'_id': '3c36d2561fce25529b81e2045327b944',
'_rev': '7-90374a9e21a25a0843c0000ce6b29642',
'thunker_encoded_json': True,
'type': 'WMCore.WorkQueue.DataStructs.WorkQueueElement.WorkQueueElement',
'WMCore.WorkQueue.DataStructs.WorkQueueElement.WorkQueueElement': {'Inputs': {'/HLTPhysicsIsolatedBunch/Run2016H-v1/RAW#92049f6e-92f9-11e6-b150-001e67abf228': ['T2_CH_CERN_P5',
'T2_CH_CERN_HLT',
'T2_CH_CERN']},
'ParentFlag': False,
'ParentData': {},
'NumberOfLumis': 3,
'NumberOfFiles': 3,
'NumberOfEvents': 3008,
'Jobs': 8,
'OpenForNewData': False,
'NoInputUpdate': False,
'NoPileupUpdate': False,
'Status': 'Running',
'RequestName': 'dmapelli_ReReco_RunBlockWhite_Nvidia_test_v1_250124_095017_1803',
'TaskName': 'DataProcessing',
'Dbs': 'https://cmsweb-testbed.cern.ch/dbs/int/global/DBSReader',
'SiteWhitelist': ['T1_DE_KIT'],
'SiteBlacklist': ['T2_FR_IPHC',
'T2_GR_Ioannina',
'T2_HU_Budapest',
'T2_IN_TIFR',
'T2_IT_Bari',
'T2_IT_Legnaro',
'T2_IT_Pisa',
'T2_IT_Rome',
'T2_KR_KISTI'],
'StartPolicy': 'Block',
'EndPolicy': {'policyName': 'SingleShot'},
'Priority': 190003,
'PileupData': {},
'ProcessedInputs': [],
'RejectedInputs': [],
'ParentQueueId': '3c36d2561fce25529b81e2045327b944',
'SubscriptionId': 844,
'EventsWritten': 0,
'FilesProcessed': 0,
'PercentComplete': 0,
'PercentSuccess': 0,
'TeamName': 'testbed-vocms0192',
'ACDC': {},
'ChildQueueUrl': None,
'ParentQueueUrl': None,
'WMBSUrl': None,
'NumOfFilesAdded': 0,
'Mask': None,
'TimestampFoundNewData': 1737716401,
'CreationTime': 1737716401},
'updatetime': 1737716402.0188694,
'timestamp': 1737716402.0188694}]
        ids.append(entry["id"])

    for id in ids:
        wqeList[dbName].append(couchdb.get(uri=f"/workqueue/_design/WorkQueue/_rewrite/element/{id}"))
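As an aside, the per-id GET loop above issues one HTTP round trip per element. A possible alternative (sketched here with a stubbed client; the real `couchdb` wrapper in this PR may expose a different interface) is CouchDB's standard bulk read, POST /{db}/_all_docs?include_docs=true with a {"keys": [...]} body, which returns all documents in a single request.

```python
# Hedged sketch: bulk-fetching documents by id in one request instead of
# one GET per id. StubCouch stands in for the PR's couchdb client object.
import json

class StubCouch:
    """Stand-in client; models only the bulk _all_docs response shape."""
    def __init__(self, docs):
        self._docs = docs  # id -> document

    def post(self, uri, data):
        keys = json.loads(data)["keys"]
        return {"rows": [{"id": k, "doc": self._docs[k]} for k in keys]}

couchdb = StubCouch({"a": {"_id": "a"}, "b": {"_id": "b"}})
ids = ["a", "b"]
rows = couchdb.post("/workqueue/_all_docs?include_docs=true",
                    data=json.dumps({"keys": ids}))["rows"]
wqeList = [row["doc"] for row in rows]
print(len(wqeList))  # 2
```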
If I understand this line correctly, here you are actually fetching the workqueue elements given a doc id. Is that correct? If so, why are we doing this? Can't we call the updateElements() method when we are actually updating the elements' properties (of course, outside of this method)?
Yes, you understood the line properly. And again, we are not using this method for updates; this is a get* method... neither set* nor update*.
     wHelper = WMWorkloadHelper()
-    wHelper.load(pklFileName)
+    wHelper.load(specUrl)
Now we seem to have a mix of workload objects from different sources:
- pklFileName, which I believe lives in the filesystem (probably under WorkQueueManager/cache)
- specUrl, which comes from the CouchDB document.
We should pick one of those and use it consistently. This will avoid confusion, and we can also make sure that we are comparing the current site lists against the correct workload spec file, hence avoiding unnecessary updates.
Well, to me it was never explained why we should switch to loading a .pkl file in the first place, since the initial idea and also, if I am not wrong, Valentin's previous implementation of this loaded the spec from couch. Only after the PR swapping the functions to use my updateWorkloadArgs was it changed to loading the spec from pkl, without any explanation why. And as for consistency: the same mechanism happens at central services, and we do it quite safely from the database. I do not see a reason to change this approach here at the agent.
Something more: the actual reason indeed is that we are not agnostic to the way the workload object is created.
- If we load from CouchDB, then the method workload.specUrl() returns:
  http://127.0.0.1:5984/workqueue/dmapelli_ReReco_RunBlockWhite_Nvidia_test_v1_250124_095017_1803/spec
- If we load from .pkl, then the method workload.specUrl() returns:
  /data/srv/wmagent/2.3.8/install/WorkQueueManager/cache/dmapelli_ReReco_RunBlockWhite_Nvidia_test_v1_250124_095017_1803/WMSandbox/WMWorkload.pkl
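The divergence described above can be sketched with a stub (the class below is not the real WMWorkloadHelper, and the shortened pkl path is a made-up placeholder; the stub only models the observation that specUrl() reflects whichever source load() was given):

```python
# Illustrative stub only: models the observed behaviour that specUrl()
# returns the source the workload was loaded from.
class FakeWorkloadHelper:
    def __init__(self):
        self._url = None

    def load(self, source):
        # the real helper records where the spec came from
        self._url = source

    def specUrl(self):
        return self._url

# hypothetical sources, one per load path
couchSpec = "http://127.0.0.1:5984/workqueue/some_workflow/spec"
pklSpec = "/data/srv/wmagent/install/WorkQueueManager/cache/some_workflow/WMSandbox/WMWorkload.pkl"

fromCouch = FakeWorkloadHelper()
fromCouch.load(couchSpec)
fromPkl = FakeWorkloadHelper()
fromPkl.load(pklSpec)

print(fromCouch.specUrl().startswith("http://"))     # True
print(fromPkl.specUrl().endswith("WMWorkload.pkl"))  # True
```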
BTW, we may need to investigate that, because on the other hand I could not reproduce the behaviour just a minute ago, which was strange indeed. But in any case we'd better stay consistent across central services and WMAgent and stick to equivalent methods for obtaining the workflow spec, which would be guaranteed to always produce the spec URI related to CouchDB rather than the filesystem.
     try:
         # update local WorkQueue first
         params = {'SiteWhitelist': siteWhiteList, 'SiteBlacklist': siteBlackList}
-        self.localWQ.updateElementsByWorkflow(wHelper, params, status=['Available'])
+        self.localWQ.updateElementsByWorkflow(wHelper, params, status=['Available', 'Negotiating', 'Acquired', 'Running'])
If the element is acquired or running, there is no reason to update the site lists, as data has been already materialized in the WMBS tables and it will no longer change the behavior of jobs.
The Negotiating state might be better to keep, though, just in case global workqueue did not manage to update those (I think we do not update Negotiating elements in global).
Which just brings up the question: should we not update WMBS as well? This is something I have been expecting to come up for a while now. Looking at it from another angle: there is no way for a WQE to stay in status Available at the agent for very long (it would be a fairly short period of time, if any at all). So if we change nothing in terms of future jobs to be materialized in condor by updating the WQEs in status Acquired and Running (because being blocked by the already created WMBS records is exactly what that means), then we accomplish next to nothing here by changing any of the already fetched WQEs, unless we continue down to condor and propagate the site list changes at least as far as the idle jobs. Having only the short window for updating WQEs in status Available (meaning between the time of fetching a WQE from the global WQ and creating all the records in WMBS) would completely destroy the efficiency of this functionality.
So if we need to dig deeper and propagate the site list change to the lower layers of the system, I am all up for it. Count me in. The next dev issue is just around the corner... Just say a word ... ;)
Yes, this feature is not going to be useful for short workflows in general, as they usually get fully acquired in a very short time window.
When we discussed this feature, we had 2 main alternatives, and they required different levels of effort. We decided to take the first one, which is not very intrusive (at the job level), and see how it is used and build up experience with it. If there is a need, we can expand this feature to also cover the second alternative.
These options are described in the meta issue: #8323
So for now, let us propagate the site list changes only to WQEs that have not yet been materialized in WMBS/jobs - as we have intended for this feature since the beginning.
Even in this case, I'd remove only Running from this list of statuses, and keep Negotiating and Acquired.
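The status gate being debated can be sketched as follows (a hedged illustration: the element dictionaries and the filter helper are made up for this example, while the real update goes through updateElementsByWorkflow with a status argument):

```python
# Hypothetical sketch: only elements whose Status is in the allowed list
# receive the new site lists; Running elements are skipped because their
# work is already materialized in WMBS.
def filter_updatable(elements, allowed=("Available", "Negotiating", "Acquired")):
    return [e for e in elements if e.get("Status") in allowed]

wqes = [
    {"_id": "a", "Status": "Available"},
    {"_id": "b", "Status": "Running"},   # already materialized, skipped
    {"_id": "c", "Status": "Acquired"},
]
print([e["_id"] for e in filter_updatable(wqes)])  # ['a', 'c']
```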
I think we need to zoom out a bit.
Can you please remind me what are the WQE statuses that we update in global workqueue?
According to this documentation: https://cms-wmcore.docs.cern.ch/training/data_flow/
in local queue, we have:
- workqueue_inbox, reflecting documents being replicated from global (hence they go through Negotiating, Acquired, Running, etc. statuses)
- workqueue, which gets created when an agent acquires work from GQ to LQ.
In LQ, are we updating these elements in the workqueue or the workqueue_inbox database?
Can you please remind me what are the WQE statuses that we update in global workqueue?
If you are asking about the WQE statuses in global workqueue concerning this feature, here it is:
self.gq_service.updateElementsByWorkflow(workload, reqArgs, status=['Available', 'Negotiating', 'Acquired'])
In LQ, are we updating these elements in workqueue or workqueue_inbox database?
In LQ we are updating the elements in the workqueue database. It happens here:

self.localWQ.updateElementsByWorkflow(wHelper, params, status=['Available'])
And the self.localWQ object is created here:
WMCore/src/python/WMComponent/WorkflowUpdater/SiteListPoller.py, lines 44 to 46 in d48389c:

self.localCouchUrl = config.WorkQueueManager.couchurl
self.localWQ = WorkQueue(self.localCouchUrl,
                         config.WorkQueueManager.dbname)
While the actual database name comes from the agent configuration and it is, as I said above, workqueue:
(WMAgent.venv3) cmst1@vocms0192:WMCore $ grep WorkQueueManager.dbname $WMA_CONFIG_FILE
config.WorkQueueManager.dbname = 'workqueue'
Fixes #12244
Status
Ready
Description
With the current PR we:
Is it backward compatible (if not, which system it affects?)
YES
Related PRs
#12123
External dependencies / deployment changes
None