Skip to content

Commit

Permalink
Tests: Long lived connections module
Browse files Browse the repository at this point in the history
Module implements support of openning huge amount of long
lived connections during test. Those are just opened, no
data are transfered.

When using, be sure of appropriate system-wide ulimit
for `NO_FILES` is set, soft limit is set by module itself.
  • Loading branch information
enhaut committed Jun 18, 2024
1 parent 1610072 commit 9486fbc
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 0 deletions.
168 changes: 168 additions & 0 deletions lnst/Tests/LongLivedConnections.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
import time
import select
import socket
import logging
import resource
import threading
from typing import Union

from .BaseTestModule import BaseTestModule
from lnst.Common.IpAddress import Ip4Address, Ip6Address, ipaddress
from lnst.Common.Parameters import IntParam, StrParam


def get_af(ip: Union[Ip4Address, Ip6Address]):
ip = ipaddress(
ip
) # self.params are transferred as values, so we need to convert it back

if isinstance(ip, Ip4Address):
return socket.AF_INET
elif isinstance(ip, Ip6Address):
return socket.AF_INET6
else:
raise ValueError(f"Unknown IP address type: {ip}")


class BaseLongLivedTestModule(BaseTestModule):
server_ip = StrParam(mandatory=True)
server_port = IntParam(mandatory=True)
duration = IntParam(default=0) # 0 means run indefinitely, until SIGINT is received
connections_count = IntParam(default=1)

def run(self):
self._running = True
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)

self._set_rlimit(hard, hard)
self._start()
if self.params.duration:
time.sleep(self.params.duration)
else:
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
logging.info("Interrupted, dying...")

self._stop()
self._set_rlimit(hard, soft)

return (
self._result
) # is set as part of _stop (if # of conns matches expected # of conns)

def _set_rlimit(self, hard: int, soft: int):
logging.info(f"Setting RLIMIT_NOFILE to {(soft, hard)}")
resource.setrlimit(resource.RLIMIT_NOFILE, (soft, hard))

def runtime_estimate(self):
return self.params.duration + 5


class LongLivedServer(BaseLongLivedTestModule):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self._running = False
self._connections = []

self._polling_thread = None
self._listening_thread = None

def _start(self):
self._running = True
self._listening_thread = threading.Thread(target=self._listen)
self._listening_thread.start()

self._polling_thread = threading.Thread(target=self._poll_connections)
self._polling_thread.start()

def _stop(self):
self._result = (
True if len(self._connections) == self.params.connections_count else False
)

logging.info("Stopping LongLivedServer server")
self._running = False

self._listening_thread.join()
self._polling_thread.join()

for conn in self._connections:
conn.close()

def _listen(self):
with socket.socket(
get_af(self.params.server_ip), socket.SOCK_STREAM
) as server_socket:
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind((str(self.params.server_ip), self.params.server_port))
server_socket.listen(65536)

logging.info(
f"TCP server started on {self.params.server_ip}:{self.params.server_port}"
)
while self._running:
try:
ready, _, _ = select.select([server_socket], [], [], 1)
except ValueError:
continue

if not ready:
time.sleep(1) # prevent active polling
continue

for conn in ready:
client_socket, client_address = server_socket.accept()
client_socket.setblocking(False)

self._connections.append(client_socket)

def _poll_connections(self):
while self._running: # no need to lock due to GIL
if not self._connections:
time.sleep(1)
continue

try:
ready, _, _ = select.select(self._connections, [], [], 1)
except ValueError:
continue


class LongLivedClient(BaseLongLivedTestModule):
client_ip = StrParam(mandatory=True)

def __init__(self, **kwargs):
super().__init__(**kwargs)
self._connections = []

def _start(self):
for _ in range(self.params.connections_count):
conn = self._start_connection()
self._connections.append(conn)

logging.info(f"All connections established by {self}")

def _stop(self):
self._result = (
True if len(self._connections) == self.params.connections_count else False
)

for conn in self._connections:
conn.close()

self._connections = []

def _start_connection(self):
sck = socket.socket(get_af(self.params.server_ip), socket.SOCK_STREAM)
sck.bind(
(self.params.client_ip, 0)
) # needs to be binded to specific IP to respect flow IPs
sck.connect((str(self.params.server_ip), self.params.server_port))

sck.setblocking(False)
# ^^^ sets socket into a "something may be written to there" state,
# so it won't close connection when jumping out of this function context

return sck
1 change: 1 addition & 0 deletions lnst/Tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@
from lnst.Tests.RDMABandwidth import RDMABandwidthClient, RDMABandwidthServer
from lnst.Tests.PktGen import PktGen
from lnst.Tests.XDPBench import XDPBench
from .LongLivedConnections import LongLivedServer, LongLivedClient
#TODO add support for test classes from lnst-ctl.conf

0 comments on commit 9486fbc

Please sign in to comment.