data_store_mgr: refactor workflow connection
* Move error handling up a level into `connect_workflow` from the
  `_start_subscription` and `_entire_workflow_update` methods.
* Simplify tests (all exceptions are now caught in the same way).
* Remove the multi-workflow handling ability of
  `_entire_workflow_update`; it is unused and, now that the threading
  has been removed, the same result can be achieved more easily via
  `asyncio.gather` (see the sketch below).
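
For illustration only (not part of this commit): with the new one-workflow-at-a-time
signature, updating several workflows concurrently is just a matter of gathering one
`_entire_workflow_update` call per workflow ID. The helper name `update_many` and the
`data_store_mgr` / `w_ids` parameters below are hypothetical.

```python
import asyncio


async def update_many(data_store_mgr, w_ids):
    """Hypothetical sketch: fetch the full store for several workflows.

    Replaces the old multi-workflow mode of `_entire_workflow_update`
    by gathering one call per workflow ID.
    """
    results = await asyncio.gather(
        *(data_store_mgr._entire_workflow_update(w_id) for w_id in w_ids),
        # collect exceptions (e.g. WorkflowStopped) rather than cancelling
        # the remaining updates
        return_exceptions=True,
    )
    # report which workflows updated successfully
    return {
        w_id: not isinstance(result, Exception)
        for w_id, result in zip(w_ids, results)
    }
```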
oliver-sanders committed Apr 22, 2024
1 parent 1970ef9 commit df0f1b6
Showing 2 changed files with 123 additions and 191 deletions.
cylc/uiserver/data_store_mgr.py (128 changes: 69 additions & 59 deletions)
@@ -36,7 +36,7 @@
 from functools import wraps
 from pathlib import Path
 import time
-from typing import Dict, Optional, Set, NamedTuple
+from typing import Dict, NamedTuple

 from cylc.flow.exceptions import WorkflowStopped
 from cylc.flow.id import Tokens
@@ -197,20 +197,21 @@ async def connect_workflow(self, w_id, contact_data):

         self.delta_queues[w_id] = {}

-        self._start_subscription(
-            w_id,
-            contact_data['name'],
-            contact_data[CFF.HOST],
-            contact_data[CFF.PUBLISH_PORT],
-        )
-        successful_updates = await self._entire_workflow_update(ids=[w_id])
-
-        if w_id not in successful_updates:
-            # something went wrong, undo any changes to allow for subsequent
-            # connection attempts
-            self.log.info(f'failed to connect to {w_id}')
+        try:
+            # start the subscriber to keep this store updated
+            self._start_subscription(
+                w_id,
+                contact_data['name'],
+                contact_data[CFF.HOST],
+                contact_data[CFF.PUBLISH_PORT],
+            )
+            # make a one-off request to provide the initial data
+            await self._entire_workflow_update(w_id)
+        except WorkflowStopped:
+            self.disconnect_workflow(w_id)
+        except Exception as exc:
+            self.log.error(f'Failed to connect to {w_id}: {exc}')
             self.disconnect_workflow(w_id)
             return False
         else:
             # don't update the contact data until we have successfully updated
             self._update_contact(w_id, contact_data)
@@ -268,7 +269,13 @@ def _purge_workflow(self, w_id):
         if w_id in self.delta_queues:
             del self.delta_queues[w_id]

-    def _start_subscription(self, w_id: str, reg: str, host: str, port: int):
+    def _start_subscription(
+        self,
+        w_id: str,
+        reg: str,
+        host: str,
+        port: int,
+    ) -> None:
         """Instantiate and run subscriber data-store sync.

         Args:
@@ -277,22 +284,30 @@ def _start_subscription(self, w_id: str, reg: str, host: str, port: int):
             host: Hostname of target workflow.
             port: Port of target workflow.

+        Raises:
+            WorkflowStoppedError
+
         """
+        # create the subscription client
         subscriber = WorkflowSubscriber(
             reg,
             host=host,
             port=port,
             context=self.workflows_mgr.context,
             topics=self.topics,
         )

+        # start the subscription task
         subscriber_task = asyncio.create_task(
             subscriber.subscribe(
                 process_delta_msg,
                 func=_call_to_tuple(self.message_queue.put_nowait),
                 w_id=w_id,
             )
         )
-        self.active_subscriptions[w_id] = ActiveSubscription(subscriber, subscriber_task)
+        self.active_subscriptions[w_id] = ActiveSubscription(
+            subscriber, subscriber_task
+        )

     def _stop_subscription(self, w_id: str) -> None:
         """Stop an active subscription.
@@ -424,55 +439,50 @@ def _reconcile_update(self, topic, delta, w_id):
             self.log.exception(exc)

     async def _entire_workflow_update(
-        self, ids: Optional[list] = None
-    ) -> Set[str]:
-        """Update entire local data-store of workflow(s).
+        self,
+        w_id: str,
+        req_method: str = 'pb_entire_workflow',
+    ) -> None:
+        """Call "req_method" on a workflow and put the data in the store.

         Args:
-            ids: List of workflow external IDs.
+            w_id:
+                The workflow ID to fetch data from.
+            req_method:
+                The protobuf data endpoint to call on the workflow.

-        """
-        if ids is None:
-            ids = []
+        Raises:
+            WorkflowStoppedError

-        # Request new data
-        req_method = 'pb_entire_workflow'
-
-        requests = {
-            w_id: workflow_request(
-                client=info['req_client'], command=req_method, log=self.log
-            )
-            for w_id, info in self.workflows_mgr.workflows.items()
-            if info.get('req_client')  # skip stopped workflows
-            and (not ids or w_id in ids)
-        }
-        results = await asyncio.gather(
-            *requests.values(), return_exceptions=True
+        """
+        # get the workflow client
+        client = self.workflows_mgr.workflows.get(w_id, {}).get(
+            'req_client', None
         )
-        successes: Set[str] = set()
-        for w_id, result in zip(requests, results):
-            if isinstance(result, Exception):
-                if not isinstance(result, WorkflowStopped):
-                    self.log.error(
-                        'Failed to update entire local data-store '
-                        f'of a workflow: {result}'
-                    )
+        if not client:
+            raise WorkflowStopped(w_id)

+        # request data
+        result = await workflow_request(
+            client=client,
+            command=req_method,
+            log=self.log,
+        )

+        # update the store
+        pb_data = PB_METHOD_MAP[req_method]()
+        pb_data.ParseFromString(result)
+        new_data = deepcopy(DATA_TEMPLATE)
+        for field, value in pb_data.ListFields():
+            if field.name == WORKFLOW:
+                new_data[field.name].CopyFrom(value)
+                new_data['delta_times'] = {
+                    key: value.last_updated
+                    for key in DATA_TEMPLATE
+                }
+                continue
-            pb_data = PB_METHOD_MAP[req_method]()
-            pb_data.ParseFromString(result)
-            new_data = deepcopy(DATA_TEMPLATE)
-            for field, value in pb_data.ListFields():
-                if field.name == WORKFLOW:
-                    new_data[field.name].CopyFrom(value)
-                    new_data['delta_times'] = {
-                        key: value.last_updated
-                        for key in DATA_TEMPLATE
-                    }
-                    continue
-                new_data[field.name] = {n.id: n for n in value}
-            self.data[w_id] = new_data
-            successes.add(w_id)
-        return successes
+            new_data[field.name] = {n.id: n for n in value}
+        self.data[w_id] = new_data

     def _update_contact(
         self,