Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DPE-5089] support external connections with node port #21

Merged
merged 23 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
411 changes: 224 additions & 187 deletions poetry.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package-mode = false


[tool.poetry.dependencies]
lightkube = "^0.15.3"
python = "^3.10"
ops = "^2.15.0"
pymongo = "^4.7.3"
Expand Down
40 changes: 25 additions & 15 deletions src/charm.py
MiaAltieri marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,11 @@
# Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.
import json

from exceptions import MissingSecretError

from ops.pebble import PathError, ProtocolError


from typing import Set, Optional, Dict


from node_port import NodePortManager
from charms.mongos.v0.set_status import MongosStatusHandler

from charms.mongodb.v0.mongodb_secrets import SecretCache
from charms.mongodb.v0.mongodb_secrets import generate_secret_label
from charms.mongodb.v1.mongos import MongosConfiguration, MongosConnection
Expand Down Expand Up @@ -68,9 +62,16 @@ def __init__(self, *args):
self.secrets = SecretCache(self)
self.status = MongosStatusHandler(self)

self.node_port_manager = NodePortManager(
self, pod_name=self.unit.name.replace("/", "-"), namespace=self.model.name
)

# BEGIN: hook functions
def _on_mongos_pebble_ready(self, event) -> None:
"""Configure MongoDB pebble layer specification."""
# any external services must be created before setting of properties
self.update_external_services()
marcoppenheimer marked this conversation as resolved.
Show resolved Hide resolved

if not self.is_integrated_to_config_server():
logger.info(
"mongos service not starting. Cannot start until application is integrated to a config-server."
Expand Down Expand Up @@ -129,9 +130,22 @@ def _on_update_status(self, _):
# END: hook functions

# BEGIN: helper functions
def update_external_services(self) -> None:
MiaAltieri marked this conversation as resolved.
Show resolved Hide resolved
"""Attempts to update any external Kubernetes services."""
if not self.is_external_client:
return

# every unit attempts to create a nodeport service
# if exists, will silently continue
self.node_port_manager.apply_service(
service=self.node_port_manager.build_node_port_services(
port=Config.MONGOS_PORT
)
)
MiaAltieri marked this conversation as resolved.
Show resolved Hide resolved

def is_integrated_to_config_server(self) -> bool:
"""Returns True if the mongos application is integrated to a config-server."""
return self.model.relations[Config.Relations.CLUSTER_RELATIONS_NAME] is not None
return len(self.model.relations[Config.Relations.CLUSTER_RELATIONS_NAME])
MiaAltieri marked this conversation as resolved.
Show resolved Hide resolved

def _get_mongos_config_for_user(
self, user: MongoDBUser, hosts: Set[str]
Expand Down Expand Up @@ -404,14 +418,10 @@ def _unit_ip(self) -> str:
def is_external_client(self) -> Optional[str]:
MiaAltieri marked this conversation as resolved.
Show resolved Hide resolved
"""Returns the connectivity mode which mongos should use.

This is determined by checking the modes requested by the client(s).

TODO: Future PR. This should be modified to work for many clients.
Note that for K8s routers this should always default to True. However we still include
delgod marked this conversation as resolved.
Show resolved Hide resolved
this function so that we can have parity on properties with the K8s and VM routers.
"""
if EXTERNAL_CONNECTIVITY_TAG not in self.app_peer_data:
return False

return json.loads(self.app_peer_data.get(EXTERNAL_CONNECTIVITY_TAG))
return True
MiaAltieri marked this conversation as resolved.
Show resolved Hide resolved

@property
def database(self) -> Optional[str]:
Expand Down
101 changes: 101 additions & 0 deletions src/node_port.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
#!/usr/bin/env python3
MiaAltieri marked this conversation as resolved.
Show resolved Hide resolved
# Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.

"""Manager for handling mongos Kubernetes resources for a single mongos pod."""

import logging
from functools import cached_property
from ops.charm import CharmBase
from lightkube.models.meta_v1 import ObjectMeta
from lightkube.core.client import Client
from lightkube.core.exceptions import ApiError
from lightkube.resources.core_v1 import Pod, Service
from lightkube.models.core_v1 import ServicePort, ServiceSpec


logger = logging.getLogger(__name__)

# default logging from lightkube httpx requests is very noisy
logging.getLogger("lightkube").disabled = True
logging.getLogger("lightkube.core.client").disabled = True
logging.getLogger("httpx").disabled = True
logging.getLogger("httpcore").disabled = True


class NodePortManager:
"""Manager for handling mongos Kubernetes resources for a single mongos pod."""

def __init__(
self,
charm: CharmBase,
pod_name: str,
namespace: str,
):
self.charm = charm
self.pod_name = pod_name
self.app_name = "-".join(pod_name.split("-")[:-1])
self.namespace = namespace
MiaAltieri marked this conversation as resolved.
Show resolved Hide resolved
self.nodeport_service_name = f"{self.app_name}-nodeport"
MiaAltieri marked this conversation as resolved.
Show resolved Hide resolved

@cached_property
def client(self) -> Client:
"""The Lightkube client."""
return Client( # pyright: ignore[reportArgumentType]
field_manager=self.pod_name,
namespace=self.namespace,
)

# --- GETTERS ---

def get_pod(self, pod_name: str = "") -> Pod:
"""Gets the Pod via the K8s API."""
# Allows us to get pods from other peer units
pod_name = pod_name or self.pod_name

return self.client.get(
res=Pod,
name=self.pod_name,
MiaAltieri marked this conversation as resolved.
Show resolved Hide resolved
)

def build_node_port_services(self, port: str) -> Service:
MiaAltieri marked this conversation as resolved.
Show resolved Hide resolved
"""Builds a ClusterIP service for initial client connection."""
pod = self.get_pod(pod_name=self.pod_name)
if not pod.metadata:
raise Exception(f"Could not find metadata for {pod}")

unit_id = self.charm.unit.name.split("/")[1]
return Service(
metadata=ObjectMeta(
name=f"{self.charm.app.name}-{unit_id}-external",
namespace=self.namespace,
# owned by the StatefulSet
ownerReferences=pod.metadata.ownerReferences,
MiaAltieri marked this conversation as resolved.
Show resolved Hide resolved
),
spec=ServiceSpec(
type="NodePort",
selector={"app.kubernetes.io/name": self.pod_name},
ports=[
ServicePort(
protocol="TCP",
port=port,
targetPort=port,
name=f"{self.charm.app.name}",
)
],
),
)

def apply_service(self, service: Service) -> None:
"""Applies a given Service."""
try:
self.client.apply(service)
except ApiError as e:
if e.status.code == 403:
logger.error("Could not apply service, application needs `juju trust`")
return
if e.status.code == 422 and "port is already allocated" in e.status.message:
logger.error(e.status.message)
return
else:
raise
45 changes: 43 additions & 2 deletions tests/integration/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,17 @@
# See LICENSE file for licensing details.

import json
from typing import Any, Dict, List, Optional
import subprocess
import logging

from typing import Any, Dict, List, Optional
from dateutil.parser import parse
from pytest_operator.plugin import OpsTest
from tenacity import Retrying, stop_after_delay, wait_fixed

logger = logging.getLogger(__name__)


PORT_MAPPING_INDEX = 4
MONGOS_APP_NAME = "mongos"


Expand Down Expand Up @@ -145,3 +146,43 @@ async def wait_for_mongos_units_blocked(
await check_all_units_blocked_with_status(ops_test, db_app_name, status)
finally:
await ops_test.model.set_config({hook_interval_key: old_interval})


def get_port_from_node_port(ops_test: OpsTest, node_port_name: str) -> None:
MiaAltieri marked this conversation as resolved.
Show resolved Hide resolved
node_port_cmd = f"kubectl get svc -n {ops_test.model.name} | grep NodePort | grep {node_port_name}"
result = subprocess.run(node_port_cmd, shell=True, capture_output=True, text=True)
if result.returncode:
logger.info("was not able to find nodeport")
assert False, f"Command: {node_port_cmd} to find node port failed."

assert (
MiaAltieri marked this conversation as resolved.
Show resolved Hide resolved
len(result.stdout.splitlines()) > 0
), "No port information available for expected service"

# port information is available at PORT_MAPPING_INDEX
port_mapping = result.stdout.split()[PORT_MAPPING_INDEX]

# port information is of the form 27018:30259/TCP
return port_mapping.split(":")[1].split("/")[0]


def assert_node_port_available(ops_test: OpsTest, node_port_name: str) -> None:
assert get_port_from_node_port(
ops_test, node_port_name
), "No port information for expected service"


def get_public_k8s_ip() -> str:
result = subprocess.run(
"kubectl get nodes", shell=True, capture_output=True, text=True
)

if result.returncode:
logger.info("failed to retrieve public facing k8s IP")
assert False, "failed to retrieve public facing k8s IP"

# port information is the first item of the last line
port_mapping = result.stdout.splitlines()[-1].split()[0]

# port mapping is of the form ip-172-31-18-133
return port_mapping.split("ip-")[1].replace("-", ".")
26 changes: 26 additions & 0 deletions tests/integration/test_charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from pytest_operator.plugin import OpsTest

from .helpers import (
assert_node_port_available,
wait_for_mongos_units_blocked,
MONGOS_APP_NAME,
)
Expand All @@ -32,6 +33,11 @@ async def test_build_and_deploy(ops_test: OpsTest):
resources=resources,
application_name=MONGOS_APP_NAME,
series="jammy",
num_units=2,
)

await ops_test.model.wait_for_idle(
apps=[MONGOS_APP_NAME], timeout=1000, idle_period=30
)


Expand All @@ -47,3 +53,23 @@ async def test_waits_for_config_server(ops_test: OpsTest) -> None:
status="Missing relation to config-server.",
timeout=300,
)


@pytest.mark.group(1)
@pytest.mark.abort_on_fail
async def test_mongos_external_connections(ops_test: OpsTest) -> None:
"""Tests that mongos is accessible externally."""
for unit_id in range(len(ops_test.model.applications[MONGOS_APP_NAME].units)):
assert_node_port_available(
ops_test, node_port_name=f"{MONGOS_APP_NAME}-{unit_id}-external"
)

# TODO add this in once DPE-5040 / PR #20 merges
# exposed_node_port = get_port_from_node_port(ops_test, node_port_name="mongos-k8s-nodeport")
# public_k8s_ip = get_public_k8s_ip()
# username, password = await get_mongos_user_password(ops_test, MONGOS_APP_NAME)
# external_mongos_client = MongoClient(
# f"mongodb://{username}:{password}@{public_k8s_ip}:{exposed_node_port}"
# )
# external_mongos_client.admin.command("usersInfo")
# external_mongos_client.close()
MiaAltieri marked this conversation as resolved.
Show resolved Hide resolved
Loading