diff --git a/parsl/executors/high_throughput/interchange.py b/parsl/executors/high_throughput/interchange.py index 0dfbc1ba33..177445693d 100644 --- a/parsl/executors/high_throughput/interchange.py +++ b/parsl/executors/high_throughput/interchange.py @@ -9,7 +9,7 @@ import sys import threading import time -from typing import Any, Dict, List, NoReturn, Optional, Sequence, Set, Tuple, cast +from typing import Any, Dict, List, Optional, Sequence, Set, Tuple, cast import zmq @@ -217,79 +217,68 @@ def _send_monitoring_info(self, monitoring_radio: Optional[MonitoringRadioSender monitoring_radio.send((MessageType.NODE_INFO, d)) - @wrap_with_logs(target="interchange") - def _command_server(self) -> NoReturn: + def process_command(self, monitoring_radio: Optional[MonitoringRadioSender]) -> None: """ Command server to run async command to the interchange """ - logger.debug("Command Server Starting") - - if self.hub_address is not None and self.hub_zmq_port is not None: - logger.debug("Creating monitoring radio to %s:%s", self.hub_address, self.hub_zmq_port) - monitoring_radio = ZMQRadioSender(self.hub_address, self.hub_zmq_port) - else: - monitoring_radio = None + logger.debug("entering command_server section") reply: Any # the type of reply depends on the command_req received (aka this needs dependent types...) - while True: - try: - command_req = self.command_channel.recv_pyobj() - logger.debug("Received command request: {}".format(command_req)) - if command_req == "CONNECTED_BLOCKS": - reply = self.connected_block_history - - elif command_req == "WORKERS": - num_workers = 0 - for manager in self._ready_managers.values(): - num_workers += manager['worker_count'] - reply = num_workers - - elif command_req == "MANAGERS": - reply = [] - for manager_id in self._ready_managers: - m = self._ready_managers[manager_id] - idle_since = m['idle_since'] - if idle_since is not None: - idle_duration = time.time() - idle_since - else: - idle_duration = 0.0 - resp = {'manager': manager_id.decode('utf-8'), - 'block_id': m['block_id'], - 'worker_count': m['worker_count'], - 'tasks': len(m['tasks']), - 'idle_duration': idle_duration, - 'active': m['active'], - 'parsl_version': m['parsl_version'], - 'python_version': m['python_version'], - 'draining': m['draining']} - reply.append(resp) - - elif command_req.startswith("HOLD_WORKER"): - cmd, s_manager = command_req.split(';') - manager_id = s_manager.encode('utf-8') - logger.info("Received HOLD_WORKER for {!r}".format(manager_id)) - if manager_id in self._ready_managers: - m = self._ready_managers[manager_id] - m['active'] = False - self._send_monitoring_info(monitoring_radio, m) + if self.command_channel in self.socks and self.socks[self.command_channel] == zmq.POLLIN: + + command_req = self.command_channel.recv_pyobj() + logger.debug("Received command request: {}".format(command_req)) + if command_req == "CONNECTED_BLOCKS": + reply = self.connected_block_history + + elif command_req == "WORKERS": + num_workers = 0 + for manager in self._ready_managers.values(): + num_workers += manager['worker_count'] + reply = num_workers + + elif command_req == "MANAGERS": + reply = [] + for manager_id in self._ready_managers: + m = self._ready_managers[manager_id] + idle_since = m['idle_since'] + if idle_since is not None: + idle_duration = time.time() - idle_since else: - logger.warning("Worker to hold was not in ready managers list") - - reply = None + idle_duration = 0.0 + resp = {'manager': manager_id.decode('utf-8'), + 'block_id': m['block_id'], + 'worker_count': m['worker_count'], + 'tasks': len(m['tasks']), + 'idle_duration': idle_duration, + 'active': m['active'], + 'parsl_version': m['parsl_version'], + 'python_version': m['python_version'], + 'draining': m['draining']} + reply.append(resp) + + elif command_req.startswith("HOLD_WORKER"): + cmd, s_manager = command_req.split(';') + manager_id = s_manager.encode('utf-8') + logger.info("Received HOLD_WORKER for {!r}".format(manager_id)) + if manager_id in self._ready_managers: + m = self._ready_managers[manager_id] + m['active'] = False + self._send_monitoring_info(monitoring_radio, m) + else: + logger.warning("Worker to hold was not in ready managers list") - elif command_req == "WORKER_PORTS": - reply = (self.worker_task_port, self.worker_result_port) + reply = None - else: - logger.error(f"Received unknown command: {command_req}") - reply = None + elif command_req == "WORKER_PORTS": + reply = (self.worker_task_port, self.worker_result_port) - logger.debug("Reply: {}".format(reply)) - self.command_channel.send_pyobj(reply) + else: + logger.error(f"Received unknown command: {command_req}") + reply = None - except zmq.Again: - logger.debug("Command thread is alive") - continue + logger.debug("Reply: {}".format(reply)) + self.command_channel.send_pyobj(reply) @wrap_with_logs def start(self) -> None: @@ -309,17 +298,13 @@ def start(self) -> None: start = time.time() - self._command_thread = threading.Thread(target=self._command_server, - name="Interchange-Command", - daemon=True) - self._command_thread.start() - kill_event = threading.Event() poller = zmq.Poller() poller.register(self.task_outgoing, zmq.POLLIN) poller.register(self.results_incoming, zmq.POLLIN) poller.register(self.task_incoming, zmq.POLLIN) + poller.register(self.command_channel, zmq.POLLIN) # These are managers which we should examine in an iteration # for scheduling a job (or maybe any other attention?). @@ -330,6 +315,7 @@ def start(self) -> None: while not kill_event.is_set(): self.socks = dict(poller.poll(timeout=poll_period)) + self.process_command(monitoring_radio) self.process_task_incoming() self.process_task_outgoing_incoming(interesting_managers, monitoring_radio, kill_event) self.process_results_incoming(interesting_managers, monitoring_radio)