Skip to content

Commit

Permalink
Merge pull request #1162 from cisagov/za/1054-connection-pooling
Browse files Browse the repository at this point in the history
Ticket #1054: Connection pooling
  • Loading branch information
zandercymatics authored Oct 26, 2023
2 parents c7a1954 + 745f2bb commit 95e557b
Show file tree
Hide file tree
Showing 15 changed files with 1,786 additions and 762 deletions.
2 changes: 1 addition & 1 deletion docs/architecture/decisions/0023-use-geventconnpool.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Date: 2023-13-10

## Status

In Review
Accepted

## Context

Expand Down
3 changes: 3 additions & 0 deletions src/Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ django-phonenumber-field = {extras = ["phonenumberslite"], version = "*"}
boto3 = "*"
typing-extensions ='*'
django-login-required-middleware = "*"
greenlet = "*"
gevent = "*"
fred-epplib = {git = "https://github.com/cisagov/epplib.git", ref = "master"}
geventconnpool = {git = "https://github.com/rasky/geventconnpool.git", ref = "1bbb93a714a331a069adf27265fe582d9ba7ecd4"}

[dev-packages]
django-debug-toolbar = "*"
Expand Down
1,707 changes: 1,016 additions & 691 deletions src/Pipfile.lock

Large diffs are not rendered by default.

157 changes: 140 additions & 17 deletions src/epplibwrapper/client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
"""Provide a wrapper around epplib to handle authentication and errors."""

import logging

from time import sleep
from gevent import Timeout
from epplibwrapper.utility.pool_status import PoolStatus

try:
from epplib.client import Client
Expand All @@ -16,6 +19,7 @@
from .cert import Cert, Key
from .errors import LoginError, RegistryError
from .socket import Socket
from .utility.pool import EPPConnectionPool

logger = logging.getLogger(__name__)

Expand All @@ -39,9 +43,8 @@ class EPPLibWrapper:
ATTN: This should not be used directly. Use `Domain` from domain.py.
"""

def __init__(self) -> None:
def __init__(self, start_connection_pool=True) -> None:
"""Initialize settings which will be used for all connections."""

# prepare (but do not send) a Login command
self._login = commands.Login(
cl_id=settings.SECRET_REGISTRY_CL_ID,
Expand All @@ -51,6 +54,7 @@ def __init__(self) -> None:
"urn:ietf:params:xml:ns:contact-1.0",
],
)

# establish a client object with a TCP socket transport
self._client = Client(
SocketTransport(
Expand All @@ -60,44 +64,101 @@ def __init__(self) -> None:
password=settings.SECRET_REGISTRY_KEY_PASSPHRASE,
)
)
# prepare a context manager which will connect and login when invoked
# (it will also logout and disconnect when the context manager exits)
self._connect = Socket(self._client, self._login)

self.pool_options = {
# Pool size
"size": settings.EPP_CONNECTION_POOL_SIZE,
# Which errors the pool should look out for.
# Avoid changing this unless necessary,
# it can and will break things.
"exc_classes": (TransportError,),
# Occasionally pings the registry to keep the connection alive.
# Value in seconds => (keepalive / size)
"keepalive": settings.POOL_KEEP_ALIVE,
}

self._pool = None

# Tracks the status of the pool
self.pool_status = PoolStatus()

if start_connection_pool:
self.start_connection_pool()

def _send(self, command):
    """Helper function used by `send`.

    Borrows a connection from the pool, sends `command` over it and
    returns the registry's response.

    Raises:
        RegistryError: if the registry connection is flagged as failed,
            the pool times out, the command cannot be parsed or
            transported, any other error occurs, or the response code
            is >= 2000.
    """
    cmd_type = command.__class__.__name__

    # Start a timeout to check if the pool is hanging
    timeout = Timeout(settings.POOL_TIMEOUT)
    timeout.start()

    try:
        if not self.pool_status.connection_success:
            raise LoginError("Couldn't connect to the registry after three attempts")
        with self._pool.get() as connection:
            response = connection.send(command)
    except Timeout as t:
        # If more than one pool exists,
        # multiple timeouts can be floating around.
        # We need to be specific as to which we are targeting;
        # a timeout that is not ours must keep propagating to its owner.
        if t is not timeout:
            raise
        # Flag that the pool is frozen,
        # then restart the pool.
        self.pool_status.pool_hanging = True
        self.start_connection_pool()
        # Surface the failure instead of silently returning None,
        # which callers would otherwise mistake for a response.
        message = f"{cmd_type} failed to execute because the connection pool timed out."
        logger.error(message, exc_info=True)
        raise RegistryError(message) from t
    except (ValueError, ParsingError) as err:
        message = f"{cmd_type} failed to execute due to some syntax error."
        logger.error(f"{message} Error: {err}", exc_info=True)
        raise RegistryError(message) from err
    except TransportError as err:
        message = f"{cmd_type} failed to execute due to a connection error."
        logger.error(f"{message} Error: {err}", exc_info=True)
        raise RegistryError(message) from err
    except LoginError as err:
        # For linter due to it not liking this line length
        text = "failed to execute due to a registry login error."
        message = f"{cmd_type} {text}"
        logger.error(f"{message} Error: {err}", exc_info=True)
        raise RegistryError(message) from err
    except Exception as err:
        message = f"{cmd_type} failed to execute due to an unknown error."
        logger.error(f"{message} Error: {err}", exc_info=True)
        raise RegistryError(message) from err
    else:
        # Codes of 2000 and above are reported as registry errors.
        if response.code >= 2000:
            raise RegistryError(response.msg, code=response.code)
        return response
    finally:
        # Close the timeout no matter what happens
        timeout.close()

def send(self, command, *, cleaned=False):
"""Login, send the command, then close the connection. Tries 3 times."""
# try to prevent use of this method without appropriate safeguards
if not cleaned:
raise ValueError("Please sanitize user input before sending it.")

# Reopen the pool if it's closed
# Only occurs when a login error is raised, after connection is successful
if not self.pool_status.pool_running:
# We want to reopen the connection pool,
# but we don't want the end user to wait while it opens.
# Raise syntax doesn't allow this, so we use a try/catch
# block.
try:
logger.error("Can't contact the Registry. Pool was not running.")
raise RegistryError("Can't contact the Registry. Pool was not running.")
except RegistryError as err:
raise err
finally:
# Code execution will halt after here.
# The end user will need to recall .send.
self.start_connection_pool()

counter = 0 # we'll try 3 times
while True:
try:
Expand All @@ -109,11 +170,73 @@ def send(self, command, *, cleaned=False):
else: # don't try again
raise err

def get_pool(self):
    """Return the pool instance currently held by this wrapper.

    The value is whatever `self._pool` holds at the time of the call,
    which may be None.
    """
    current_pool = self._pool
    return current_pool

def _create_pool(self, client, login, options):
    """Build a fresh EPPConnectionPool from the given client, login and options.

    The three arguments are forwarded positionally, unchanged, and the
    newly constructed pool is returned to the caller.
    """
    new_pool = EPPConnectionPool(client, login, options)
    return new_pool

def start_connection_pool(self, restart_pool_if_exists=True):
"""Starts a connection pool for the registry.

Runs a single test login first and records the outcome in
`self.pool_status.connection_success`.

restart_pool_if_exists -> bool:
If an instance of the pool already exists,
then that instance will be killed first.
It is generally recommended to keep this enabled.
"""
# NOTE(review): indentation is not visible in this view, so it is unclear
# whether the pool (re)creation further down is nested under the `else`
# branch or runs unconditionally -- confirm against the real file.

# Since we reuse the same creds for each pool, we can test on
# one socket, and if successful, then we know we can connect.
if not self._test_registry_connection_success():
logger.warning("Cannot contact the Registry")
self.pool_status.connection_success = False
else:
self.pool_status.connection_success = True

# If this function is reinvoked, then ensure
# that we don't have duplicate data sitting around.
if self._pool is not None and restart_pool_if_exists:
logger.info("Connection pool restarting...")
self.kill_pool()

# Build the replacement pool from the stored client, login and options.
self._pool = self._create_pool(self._client, self._login, self.pool_options)

# Mark the pool as running and clear any earlier "hanging" flag.
self.pool_status.pool_running = True
self.pool_status.pool_hanging = False

logger.info("Connection pool started")

def kill_pool(self):
    """Tear down the existing pool, if there is one.

    Prefer this over assigning `self._pool = None` directly, since that
    alone doesn't clear the gevent instances. When no pool exists, the
    call is only logged. Always returns None.
    """
    if self._pool is None:
        logger.info("kill_pool() was invoked but there was no pool to delete")
        return None

    # Kill the live connections before dropping the reference.
    self._pool.kill_all_connections()
    self._pool = None
    self.pool_status.pool_running = False
    return None

def _test_registry_connection_success(self):
    """Report whether a test login against the registry succeeds.

    Builds a throwaway Socket from the stored client and login and
    delegates to its `test_connection_success` method. Returns False
    when the socket does not expose that method.
    """
    probe = Socket(self._client, self._login)

    # Something went wrong if this doesn't exist
    if not hasattr(probe, "test_connection_success"):
        return False

    return probe.test_connection_success()


try:
# Initialize epplib
CLIENT = EPPLibWrapper()
logger.debug("registry client initialized")
logger.info("registry client initialized")
except Exception:
CLIENT = None # type: ignore
logger.warning(
Expand Down
68 changes: 65 additions & 3 deletions src/epplibwrapper/socket.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import logging
from time import sleep

try:
from epplib import commands
from epplib.client import Client
except ImportError:
pass

Expand All @@ -14,24 +16,84 @@
class Socket:
    """Context manager which establishes a TCP connection with registry."""

    # Annotations are strings so the class can still be defined when the
    # guarded epplib import above failed and those names are unbound.
    def __init__(self, client: "Client", login: "commands.Login") -> None:
        """Save the epplib client and login details."""
        self.client = client
        self.login = login

    def __enter__(self):
        """Runs connect(), which opens a connection with EPPLib.

        Returns the connected client so that `with Socket(...) as wire`
        binds something usable rather than None.
        """
        return self.connect()

    def __exit__(self, *args, **kwargs):
        """Runs disconnect(), which closes a connection with EPPLib."""
        self.disconnect()

    def connect(self):
        """Use epplib to connect and log in.

        Returns the connected client. If the login response code is
        >= 2000, the client is closed and a LoginError is raised.
        """
        self.client.connect()
        response = self.client.send(self.login)
        if self.is_login_error(response.code):
            self.client.close()
            raise LoginError(response.msg)
        return self.client

    def disconnect(self):
        """Log out and close the connection (best effort).

        Failures are logged as a warning and not raised.
        """
        try:
            try:
                self.client.send(commands.Logout())
            finally:
                # Always attempt to close, even when the logout failed,
                # so the underlying connection is not left open.
                self.client.close()
        except Exception:
            logger.warning("Connection to registry was not cleanly closed.")

    def send(self, command):
        """Sends a command to the registry and returns the response.

        If the response code is >= 2000, the client is closed and
        a LoginError is raised.
        The calling function should handle this."""
        response = self.client.send(command)
        if self.is_login_error(response.code):
            self.client.close()
            raise LoginError(response.msg)

        return response

    def is_login_error(self, code):
        """Returns the result of code >= 2000.

        This indicates that something weird happened on the Registry,
        and that we should raise a LoginError."""
        return code >= 2000

    def test_connection_success(self):
        """Tests if a successful connection can be made with the registry.

        Tries again up to 3 times when a retryable LoginError is raised.
        Returns True only when the login response is not an error code."""
        # Something went wrong if this doesn't exist
        if not hasattr(self.client, "connect"):
            logger.warning("self.client does not have a connect attribute")
            return False

        counter = 0  # we'll try 3 times
        while True:
            try:
                self.client.connect()
                response = self.client.send(self.login)
            except LoginError as err:
                if err.should_retry() and counter < 3:
                    counter += 1
                    sleep((counter * 50) / 1000)  # sleep 50 ms to 150 ms
                    continue
                # don't try again
                return False
            # Occurs when invalid creds are passed in - such as on localhost
            except OSError as err:
                logger.error(err)
                return False

            # The login round trip completed; release the test connection.
            self.disconnect()

            # If we encounter a login error, fail
            if self.is_login_error(response.code):
                logger.warning("A login error was found in test_connection_success")
                return False

            # Otherwise, just return true
            return True
Empty file.
Loading

0 comments on commit 95e557b

Please sign in to comment.