Skip to content

Commit

Permalink
Remove command thread. Throughput seems a bit lower, maybe? 2100 tasks/second or so?
Browse files Browse the repository at this point in the history
  • Loading branch information
benclifford committed Jan 18, 2025
1 parent b7c6462 commit 9beca06
Showing 1 changed file with 55 additions and 69 deletions.
124 changes: 55 additions & 69 deletions parsl/executors/high_throughput/interchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import sys
import threading
import time
from typing import Any, Dict, List, NoReturn, Optional, Sequence, Set, Tuple, cast
from typing import Any, Dict, List, Optional, Sequence, Set, Tuple, cast

import zmq

Expand Down Expand Up @@ -217,79 +217,68 @@ def _send_monitoring_info(self, monitoring_radio: Optional[MonitoringRadioSender

monitoring_radio.send((MessageType.NODE_INFO, d))

@wrap_with_logs(target="interchange")
def _command_server(self) -> NoReturn:
def process_command(self, monitoring_radio: Optional[MonitoringRadioSender]) -> None:
""" Command server to run async command to the interchange
"""
logger.debug("Command Server Starting")

if self.hub_address is not None and self.hub_zmq_port is not None:
logger.debug("Creating monitoring radio to %s:%s", self.hub_address, self.hub_zmq_port)
monitoring_radio = ZMQRadioSender(self.hub_address, self.hub_zmq_port)
else:
monitoring_radio = None
logger.debug("entering command_server section")

reply: Any # the type of reply depends on the command_req received (aka this needs dependent types...)

while True:
try:
command_req = self.command_channel.recv_pyobj()
logger.debug("Received command request: {}".format(command_req))
if command_req == "CONNECTED_BLOCKS":
reply = self.connected_block_history

elif command_req == "WORKERS":
num_workers = 0
for manager in self._ready_managers.values():
num_workers += manager['worker_count']
reply = num_workers

elif command_req == "MANAGERS":
reply = []
for manager_id in self._ready_managers:
m = self._ready_managers[manager_id]
idle_since = m['idle_since']
if idle_since is not None:
idle_duration = time.time() - idle_since
else:
idle_duration = 0.0
resp = {'manager': manager_id.decode('utf-8'),
'block_id': m['block_id'],
'worker_count': m['worker_count'],
'tasks': len(m['tasks']),
'idle_duration': idle_duration,
'active': m['active'],
'parsl_version': m['parsl_version'],
'python_version': m['python_version'],
'draining': m['draining']}
reply.append(resp)

elif command_req.startswith("HOLD_WORKER"):
cmd, s_manager = command_req.split(';')
manager_id = s_manager.encode('utf-8')
logger.info("Received HOLD_WORKER for {!r}".format(manager_id))
if manager_id in self._ready_managers:
m = self._ready_managers[manager_id]
m['active'] = False
self._send_monitoring_info(monitoring_radio, m)
if self.command_channel in self.socks and self.socks[self.command_channel] == zmq.POLLIN:

command_req = self.command_channel.recv_pyobj()
logger.debug("Received command request: {}".format(command_req))
if command_req == "CONNECTED_BLOCKS":
reply = self.connected_block_history

elif command_req == "WORKERS":
num_workers = 0
for manager in self._ready_managers.values():
num_workers += manager['worker_count']
reply = num_workers

elif command_req == "MANAGERS":
reply = []
for manager_id in self._ready_managers:
m = self._ready_managers[manager_id]
idle_since = m['idle_since']
if idle_since is not None:
idle_duration = time.time() - idle_since
else:
logger.warning("Worker to hold was not in ready managers list")

reply = None
idle_duration = 0.0
resp = {'manager': manager_id.decode('utf-8'),
'block_id': m['block_id'],
'worker_count': m['worker_count'],
'tasks': len(m['tasks']),
'idle_duration': idle_duration,
'active': m['active'],
'parsl_version': m['parsl_version'],
'python_version': m['python_version'],
'draining': m['draining']}
reply.append(resp)

elif command_req.startswith("HOLD_WORKER"):
cmd, s_manager = command_req.split(';')
manager_id = s_manager.encode('utf-8')
logger.info("Received HOLD_WORKER for {!r}".format(manager_id))
if manager_id in self._ready_managers:
m = self._ready_managers[manager_id]
m['active'] = False
self._send_monitoring_info(monitoring_radio, m)
else:
logger.warning("Worker to hold was not in ready managers list")

elif command_req == "WORKER_PORTS":
reply = (self.worker_task_port, self.worker_result_port)
reply = None

else:
logger.error(f"Received unknown command: {command_req}")
reply = None
elif command_req == "WORKER_PORTS":
reply = (self.worker_task_port, self.worker_result_port)

logger.debug("Reply: {}".format(reply))
self.command_channel.send_pyobj(reply)
else:
logger.error(f"Received unknown command: {command_req}")
reply = None

except zmq.Again:
logger.debug("Command thread is alive")
continue
logger.debug("Reply: {}".format(reply))
self.command_channel.send_pyobj(reply)

@wrap_with_logs
def start(self) -> None:
Expand All @@ -309,17 +298,13 @@ def start(self) -> None:

start = time.time()

self._command_thread = threading.Thread(target=self._command_server,
name="Interchange-Command",
daemon=True)
self._command_thread.start()

kill_event = threading.Event()

poller = zmq.Poller()
poller.register(self.task_outgoing, zmq.POLLIN)
poller.register(self.results_incoming, zmq.POLLIN)
poller.register(self.task_incoming, zmq.POLLIN)
poller.register(self.command_channel, zmq.POLLIN)

# These are managers which we should examine in an iteration
# for scheduling a job (or maybe any other attention?).
Expand All @@ -330,6 +315,7 @@ def start(self) -> None:
while not kill_event.is_set():
self.socks = dict(poller.poll(timeout=poll_period))

self.process_command(monitoring_radio)
self.process_task_incoming()
self.process_task_outgoing_incoming(interesting_managers, monitoring_radio, kill_event)
self.process_results_incoming(interesting_managers, monitoring_radio)
Expand Down

0 comments on commit 9beca06

Please sign in to comment.