Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH IPC Improvements - Support TCP, ZMQ and Better multi-node handling #32

Merged
merged 20 commits into from
Aug 8, 2024

Conversation

gadorlhiac
Copy link
Collaborator

@gadorlhiac gadorlhiac commented Jul 30, 2024

Description

This PR overhauls socket-based IPC. Support for TCP is added, and SSH tunnels are provided as a backup if Unix sockets are used across multiple nodes. A first iteration of ZMQ is added as an alternative to raw sockets for future use. The use of ZMQ is currently controlled by a global bool but this will be configurable in a more standard manner later.

Note: Message order is not controlled. If running with multiple ranks the execution order of the rank as well as the read schedule (and the network) will determine when messages arrive. Messages arriving via the PipeCommunicator are not synchronized with those arriving via the SocketCommunicator and may also arrive out of order.

Checklist

  • SocketCommunicator supports TCP sockets.
  • SocketCommunicator : Task-side will open SSH tunnel if on different machine and using Unix sockets.
  • SocketCommunicator adds ZMQ support
    • Works for tcp
    • Works for Unix
  • Executor passes along information about what host it is on, or what ports to use.
  • LUTE_USE_TCP environment variable.
  • Launch scripts updated
  • Documentation updates.
  • Test case

PR Type:

  • New feature/Enhancement

Address issues:

  • NA

Testing

Tested using direct Python submission and with SLURM across multiple nodes using all test classes. Example output

Testing TCP communication with raw sockets

  • SLURM using MultiNodeCommunicationTester Task which uses MPI.
# Submit with ntasks=4 -> Becomes 3 ranks split on nodes.
[dorlhiac@sdfiana004 /sdf/scratch/users/d/dorlhiac]$> /sdf/scratch/users/d/dorlhiac/lute/launch_scripts/submit_slurm.sh -t MultiNodeCommunicationTester -c /sdf/scratch/users/d/dorlhiac/lute/config/test.yaml --account=lcls:data --partition=milano --ntasks=4 --nodes=3
Running in standard mode.
Submitting task MultiNodeCommunicationTester
Submitted batch job 53083354

# Receive both text and figure information from all three ranks
[dorlhiac@sdfiana004 /sdf/scratch/users/d/dorlhiac]$> cat MultiNodeCommunicationTester__r0000_2024-08-06_09-50-29.out
INFO:lute.execution.executor:Sourcing file /sdf/group/lcls/ds/tools/ccp4-8.0/bin/ccp4.setup-sh
INFO:lute.execution.executor:Sourcing file /sdf/group/lcls/ds/tools/ccp4-8.0/bin/ccp4.setup-sh
INFO:lute.execution.executor:Rank 0 of 3 sending message.  From sdfmilan114 to sdfmilan114.
INFO:lute.execution.executor:Executor: TestMultiNodeCommunication started
INFO:lute.io.elog:eLog Update Failed! JID_UPDATE_COUNTERS is not defined!
INFO:lute.execution.executor:lute_config=AnalysisHeader(title='LUTE Task Configuration', experiment='EXPL10000', run='', date='2023/10/25', lute_version=0.1, task_timeout=600, work_dir='/sdf/scratch/users/d/dorlhiac') send_obj='plot' arr_size=5
INFO:lute.execution.executor:Rank 1 of 3 sending message.  From sdfmilan114 to sdfmilan114.
INFO:lute.execution.executor:Figure(640x480)
INFO:lute.execution.executor:Figure(640x480)
INFO:lute.execution.executor:Rank 2 of 3 sending message.  From sdfmilan115 to sdfmilan114.
INFO:lute.execution.executor:Figure(640x480)
INFO:lute.execution.executor:Test Finished.
INFO:lute.execution.executor:TaskStatus.COMPLETED
INFO:lute.io.elog:eLog Update Failed! JID_UPDATE_COUNTERS is not defined!
INFO:lute.execution.executor:TaskResult(task_name='TestMultiNodeCommunication', task_status=<TaskStatus.COMPLETED: 2>, summary='Test Finished.', payload='', impl_schemas=None)
  • Python submission of SocketTester a non-MPI task.
# Receive 10 arrays and 10 messages that they are being sent
[dorlhiac@sdfiana004 /sdf/scratch/users/d/dorlhiac/lute]$> LUTE_USE_TCP=1 python -B run_task.py -t SocketTester -c config/test.yaml
INFO:lute.execution.executor:Sourcing file /sdf/group/lcls/ds/tools/ccp4-8.0/bin/ccp4.setup-sh
INFO:lute.execution.executor:Sourcing file /sdf/group/lcls/ds/tools/ccp4-8.0/bin/ccp4.setup-sh
DEBUG:lute.execution.executor:Absolute path to subprocess_task.py not found.
INFO:lute.execution.executor:Sending array 0
DEBUG:lute.execution.executor:Cannot set result from TaskParameters. `set_result` not specified!
INFO:lute.execution.executor:Executor: TestSocket started
INFO:lute.io.elog:eLog Update Failed! JID_UPDATE_COUNTERS is not defined!
INFO:lute.execution.executor:lute_config=AnalysisHeader(title='LUTE Task Configuration', experiment='EXPL10000', run='', date='2023/10/25', lute_version=0.1, task_timeout=600, work_dir='/sdf/scratch/users/d/dorlhiac') array_size=8000 num_arrays=10
INFO:lute.execution.executor:Sending array 1
INFO:lute.execution.executor:Sending array 2
INFO:lute.execution.executor:[0.93878673 0.85870327 0.14269453 ... 0.30021307 0.62379757 0.2676032 ]
INFO:lute.execution.executor:[0.21426516 0.95511496 0.11939231 ... 0.68906608 0.11352986 0.34279074]
INFO:lute.execution.executor:Sending array 3
INFO:lute.execution.executor:[0.39335495 0.46792767 0.52884037 ... 0.51449615 0.93212467 0.96204988]
INFO:lute.execution.executor:Sending array 4
INFO:lute.execution.executor:[0.86709589 0.6183857  0.21971463 ... 0.42085349 0.26052249 0.64922644]
INFO:lute.execution.executor:Sending array 5
INFO:lute.execution.executor:[0.6944077  0.81515865 0.09171156 ... 0.46321115 0.485619   0.50942979]
INFO:lute.execution.executor:Sending array 6
INFO:lute.execution.executor:[0.16326848 0.45722451 0.99328962 ... 0.54119834 0.9896704  0.1311367 ]
INFO:lute.execution.executor:Sending array 7
INFO:lute.execution.executor:[0.7469258  0.84533057 0.95389808 ... 0.14454207 0.71833466 0.02560989]
INFO:lute.execution.executor:Sending array 8
INFO:lute.execution.executor:[0.31319232 0.80430753 0.62222915 ... 0.69475618 0.85449827 0.86542318]
INFO:lute.execution.executor:Sending array 9
INFO:lute.execution.executor:[0.28475446 0.80272991 0.13005413 ... 0.46486911 0.14186617 0.57041271]
INFO:lute.execution.executor:[0.02339099 0.4541457  0.10760855 ... 0.4653809  0.96423691 0.71274681]
INFO:lute.execution.executor:Sent 10 arrays
INFO:lute.execution.executor:TaskStatus.COMPLETED
INFO:lute.io.elog:eLog Update Failed! JID_UPDATE_COUNTERS is not defined!
INFO:lute.execution.executor:TaskResult(task_name='TestSocket', task_status=<TaskStatus.COMPLETED: 2>, summary='Sent 10 arrays', payload=array([0.72978264, 0.22678095, 0.09365268, ..., 0.56888893, 0.49116616,
       0.18841854]), impl_schemas=None)
DEBUG:lute.io._sqlite:_make_task_table[CREATE]: CREATE TABLE IF NOT EXISTS TestSocket(id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, gen_cfg_id INTEGER, exec_cfg_id INTEGER, "array_size" INTEGER, "num_arrays" INTEGER, "result.task_status" TEXT, "result.summary" TEXT, "result.payload" BLOB, "result.impl_schemas" TEXT, valid_flag INTEGER)
DEBUG:root:_gen_cfg_table_entry: Rows matching title LIKE 'LUTE Task Configuration' AND experiment LIKE 'EXPL10000' AND run LIKE '' AND date LIKE '2023/10/25' AND lute_version LIKE '0.1' AND task_timeout LIKE '600': [(1,)]
DEBUG:root:_exec_cfg_table_entry: No matching rows - adding new row: 211
DEBUG:lute.io._sqlite:_add_task_entry: ['"gen_cfg_id"', '"exec_cfg_id"', '"array_size"', '"num_arrays"', '"result.task_status"', '"result.summary"', '"result.payload"', '"result.impl_schemas"', '"valid_flag"']
		[1, 211, 8000, 10, 'COMPLETED', 'Sent 10 arrays', array([0.72978264, 0.22678095, 0.09365268, ..., 0.56888893, 0.49116616,
       0.18841854]), None, 1]
DEBUG:lute.execution.ipc:Stopping socket reader thread.
QUEUE SIZE - 0
HAS MESSAGES: False
DEBUG:lute.execution.ipc:Closed reading thread.

Testing Unix with raw sockets

  • SLURM using MultiNodeCommunicationTester Task which uses MPI.
# Set TCP mode off. Submit with ntasks=4 -> Becomes 3 ranks split on nodes.
[dorlhiac@sdfiana004 /sdf/scratch/users/d/dorlhiac]$> LUTE_USE_TCP=0 /sdf/scratch/users/d/dorlhiac/lute/launch_scripts/submit_slurm.sh -t MultiNodeCommunicationTester -c /sdf/scratch/users/d/dorlhiac/lute/config/test.yaml --account=lcls:data --partition=milano --ntasks=4 --nodes=3
# Receive both text and figure information from all three ranks
[dorlhiac@sdfiana004 /sdf/scratch/users/d/dorlhiac]$> cat MultiNodeCommunicationTester__r0000_2024-08-06_11-09-54.out
INFO:lute.execution.executor:Sourcing file /sdf/group/lcls/ds/tools/ccp4-8.0/bin/ccp4.setup-sh
INFO:lute.execution.executor:Sourcing file /sdf/group/lcls/ds/tools/ccp4-8.0/bin/ccp4.setup-sh
INFO:lute.execution.executor:Rank 0 of 3 sending message.  From sdfmilan111 to sdfmilan111.
INFO:lute.execution.executor:Executor: TestMultiNodeCommunication started
INFO:lute.io.elog:eLog Update Failed! JID_UPDATE_COUNTERS is not defined!
INFO:lute.execution.executor:lute_config=AnalysisHeader(title='LUTE Task Configuration', experiment='EXPL10000', run='', date='2023/10/25', lute_version=0.1, task_timeout=600, work_dir='/sdf/scratch/users/d/dorlhiac') send_obj='plot' arr_size=5
INFO:lute.execution.executor:Rank 1 of 3 sending message.  From sdfmilan114 to sdfmilan111.
INFO:lute.execution.executor:Rank 2 of 3 sending message.  From sdfmilan114 to sdfmilan111.
INFO:lute.execution.executor:Figure(640x480)
INFO:lute.execution.executor:Figure(640x480)
INFO:lute.execution.executor:Figure(640x480)
INFO:lute.execution.executor:Test Finished.
INFO:lute.execution.executor:TaskStatus.COMPLETED
INFO:lute.io.elog:eLog Update Failed! JID_UPDATE_COUNTERS is not defined!
INFO:lute.execution.executor:TaskResult(task_name='TestMultiNodeCommunication', task_status=<TaskStatus.COMPLETED: 2>, summary='Test Finished.', payload='', impl_schemas=None)
  • Python submission of SocketTester a non-MPI task. (no SSH)
# Receive 10 arrays and 10 messages that they are being sent
[dorlhiac@sdfiana004 /sdf/scratch/users/d/dorlhiac/lute]$> python -B run_task.py -t SocketTester -c config/test.yaml
DEBUG:lute.execution.ipc:UnixSocketCommunicator defines socket_path: /lscratch/dorlhiac/tmp/lute_276e379f3175423aa01a03c11361c7f3.sock
INFO:lute.execution.executor:Sourcing file /sdf/group/lcls/ds/tools/ccp4-8.0/bin/ccp4.setup-sh
INFO:lute.execution.executor:Sourcing file /sdf/group/lcls/ds/tools/ccp4-8.0/bin/ccp4.setup-sh
DEBUG:lute.execution.executor:Absolute path to subprocess_task.py not found.
INFO:lute.execution.executor:Sending array 0
DEBUG:lute.execution.executor:Cannot set result from TaskParameters. `set_result` not specified!
INFO:lute.execution.executor:Executor: TestSocket started
INFO:lute.io.elog:eLog Update Failed! JID_UPDATE_COUNTERS is not defined!
INFO:lute.execution.executor:lute_config=AnalysisHeader(title='LUTE Task Configuration', experiment='EXPL10000', run='', date='2023/10/25', lute_version=0.1, task_timeout=600, work_dir='/sdf/scratch/users/d/dorlhiac') array_size=8000 num_arrays=10
INFO:lute.execution.executor:Sending array 1
INFO:lute.execution.executor:Sending array 2
INFO:lute.execution.executor:[0.52160079 0.96890058 0.65177914 ... 0.50543691 0.22715026 0.99365486]
INFO:lute.execution.executor:Sending array 3
INFO:lute.execution.executor:[0.41676567 0.88092955 0.09547312 ... 0.59151888 0.38080628 0.78248672]
INFO:lute.execution.executor:[0.39964097 0.51591746 0.34305216 ... 0.13039327 0.36177007 0.17490791]
INFO:lute.execution.executor:[0.42283322 0.7667537  0.27851572 ... 0.99605828 0.63202465 0.29843001]
INFO:lute.execution.executor:Sending array 5
INFO:lute.execution.executor:[0.04352282 0.60851292 0.75757486 ... 0.22440194 0.90463436 0.1703041 ]
INFO:lute.execution.executor:Sending array 6
INFO:lute.execution.executor:[0.4470582  0.94456332 0.20254665 ... 0.16694299 0.18221614 0.37255754]
INFO:lute.execution.executor:Sending array 7
INFO:lute.execution.executor:[0.94784087 0.91379064 0.73552507 ... 0.8184123  0.55642381 0.88222426]
INFO:lute.execution.executor:Sending array 8
INFO:lute.execution.executor:[0.35079542 0.95001879 0.30060745 ... 0.89436458 0.3716709  0.7773894 ]
INFO:lute.execution.executor:Sending array 9
INFO:lute.execution.executor:[0.50044605 0.4128454  0.70774964 ... 0.31209946 0.47164879 0.62445722]
INFO:lute.execution.executor:[0.01827509 0.76334622 0.18165282 ... 0.14527713 0.0569127  0.93727003]
INFO:lute.execution.executor:Sent 10 arrays
INFO:lute.execution.executor:TaskStatus.COMPLETED
INFO:lute.io.elog:eLog Update Failed! JID_UPDATE_COUNTERS is not defined!
INFO:lute.execution.executor:TaskResult(task_name='TestSocket', task_status=<TaskStatus.COMPLETED: 2>, summary='Sent 10 arrays', payload=array([0.12877769, 0.27503862, 0.34789427, ..., 0.12360039, 0.39367444,
       0.35728848]), impl_schemas=None)
DEBUG:lute.io._sqlite:_make_task_table[CREATE]: CREATE TABLE IF NOT EXISTS TestSocket(id INTEGER PRIMARY KEY AUTOINCREMENT, timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, gen_cfg_id INTEGER, exec_cfg_id INTEGER, "array_size" INTEGER, "num_arrays" INTEGER, "result.task_status" TEXT, "result.summary" TEXT, "result.payload" BLOB, "result.impl_schemas" TEXT, valid_flag INTEGER)
DEBUG:root:_gen_cfg_table_entry: Rows matching title LIKE 'LUTE Task Configuration' AND experiment LIKE 'EXPL10000' AND run LIKE '' AND date LIKE '2023/10/25' AND lute_version LIKE '0.1' AND task_timeout LIKE '600': [(1,)]
DEBUG:root:_exec_cfg_table_entry: No matching rows - adding new row: 214
DEBUG:lute.io._sqlite:_add_task_entry: ['"gen_cfg_id"', '"exec_cfg_id"', '"array_size"', '"num_arrays"', '"result.task_status"', '"result.summary"', '"result.payload"', '"result.impl_schemas"', '"valid_flag"']
		[1, 214, 8000, 10, 'COMPLETED', 'Sent 10 arrays', array([0.12877769, 0.27503862, 0.34789427, ..., 0.12360039, 0.39367444,
       0.35728848]), None, 1]
DEBUG:lute.execution.ipc:Stopping socket reader thread.
DEBUG:lute.execution.ipc:Closed reading thread.

Testing TCP with ZMQ

  • SLURM using MultiNodeCommunicationTester Task which uses MPI.
[dorlhiac@sdfiana004 /sdf/scratch/users/d/dorlhiac]$> cat multi_tcp_zmq.out
INFO:lute.execution.ipc:Will use TCP (ZMQ).
INFO:lute.execution.executor:Rank 0 of 3 sending message.  From sdfmilan101 to sdfmilan101. 
INFO:lute.execution.executor:Executor: TestMultiNodeCommunication started
INFO:lute.io.elog:eLog Update Failed! JID_UPDATE_COUNTERS is not defined!
INFO:lute.execution.executor:lute_config=AnalysisHeader(title='LUTE Task Configuration', experiment='EXPL10000', run='', date='2023/10/25', lute_version=0.1, task_timeout=600, work_dir='/sdf/scratch/users/d/dorlhiac') send_obj='plot' arr_size=5
INFO:lute.execution.executor:Rank 1 of 3 sending message.  From sdfmilan101 to sdfmilan101.
INFO:lute.execution.executor:Figure(640x480)
INFO:lute.execution.executor:Figure(640x480)
INFO:lute.execution.executor:Rank 2 of 3 sending message.  From sdfmilan102 to sdfmilan101. 
INFO:lute.execution.executor:Figure(640x480)
INFO:lute.execution.executor:Test Finished.
INFO:lute.execution.executor:TaskStatus.COMPLETED
INFO:lute.io.elog:eLog Update Failed! JID_UPDATE_COUNTERS is not defined!
INFO:lute.execution.executor:TaskResult(task_name='TestMultiNodeCommunication', task_status=<TaskStatus.COMPLETED: 2>, summary='Test Finished.', payload='', impl_schemas=None)
[dorlhiac@sdfiana004 /sdf/scratch/users/d/dorlhiac]$> cat multi_unix_zmq.out
INFO:lute.execution.ipc:Will use Unix sockets (ZMQ).
INFO:lute.execution.executor:Executor: TestMultiNodeCommunication started
INFO:lute.io.elog:eLog Update Failed! JID_UPDATE_COUNTERS is not defined!
INFO:lute.execution.executor:lute_config=AnalysisHeader(title='LUTE Task Configuration', experiment='EXPL10000', run='', date='2023/10/25', lute_version=0.1, task_timeout=600, work_dir='/sdf/scratch/users/d/dorlhiac') send_obj='plot' arr_size=5
INFO:lute.execution.executor:Rank 0 of 3 sending message.  From sdfmilan113 to sdfmilan113.
INFO:lute.execution.executor:Rank 1 of 3 sending message.  From sdfmilan113 to sdfmilan113.
INFO:lute.execution.executor:Rank 2 of 3 sending message.  From sdfmilan114 to sdfmilan113.
INFO:lute.execution.executor:Figure(640x480)
INFO:lute.execution.executor:Figure(640x480)
INFO:lute.execution.executor:Figure(640x480)
INFO:lute.execution.executor:Test Finished.
INFO:lute.execution.executor:TaskStatus.COMPLETED
INFO:lute.io.elog:eLog Update Failed! JID_UPDATE_COUNTERS is not defined!
INFO:lute.execution.executor:TaskResult(task_name='TestMultiNodeCommunication', task_status=<TaskStatus.COMPLETED: 2>, summary='Test Finished.', payload='', impl_schemas=None)

Testing Unix with ZMQ

Screenshots

@gadorlhiac gadorlhiac changed the title ENH IPC over SSH ENH IPC Improvements - Support TCP, ZMQ and Better multi-node handling Aug 2, 2024
@gadorlhiac gadorlhiac marked this pull request as ready for review August 8, 2024 18:04
@gadorlhiac gadorlhiac requested a review from valmar August 8, 2024 18:04
Copy link
Contributor

@valmar valmar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • In new_task.md, line 662, there is an ultra minor typo: " tricky-to-track-dow" -> "tricky-to-track-down"

  • The ZMQ and SSH code could be removed, up to you. I would remove it just to keep the code cleaner, but whatever you prefer

  • If I understand correctly the communicator will scan the ports to find an empty one. Is this because the ports could be used by other communicators? Could in the future this be flagged as an attack, if we run some secuity software on our machines?

  • HEAD[::-1] Love this!! Sequence in reverse! Very cool! HELLO -> OLLEH :-)

@gadorlhiac
Copy link
Collaborator Author

I'll address the typo (and something else I just thought of). For the other comments:

  • I'd leave the ZMQ code for now in case we want to use it for other types of communication. I agree it's a bit messier but if not we'll need to rewrite this at some point rather than have an already working ZMQ template.
    • The SSH I think will still be necessary unless we automatically choose TCP or Unix sockets based on hostname (but then we have to manage two sockets). This is to control for the case where we submit with Unix sockets but accidentally get multiple nodes.
  • Interesting point on the port scanning I didn't consider. Its there because we don't know if another process will be using a port (more than because other communicators will be using it). I'd hope that this is relatively infrequent - so since we exit after finding a port we should hopefully only connecting once or twice. What would you suggest instead?

@valmar
Copy link
Contributor

valmar commented Aug 8, 2024

The port scanning was just a thought. Let's merge it for now

@valmar valmar merged commit 2d37011 into slac-lcls:dev Aug 8, 2024
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants