Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DPE-5089] support external connections with node port #21

Merged
merged 23 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.
options:
expose-external:
description: "String to determine how to expose the mongos router externally from the Kubernetes cluster. Possible values: 'nodeport', 'none'"
type: string
default: "none"
272 changes: 143 additions & 129 deletions poetry.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package-mode = false


[tool.poetry.dependencies]
lightkube = "^0.15.3"
python = "^3.10"
ops = "^2.15.0"
pymongo = "^4.7.3"
Expand Down
87 changes: 79 additions & 8 deletions src/charm.py
MiaAltieri marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
# See LICENSE file for licensing details.
from ops.main import main
import json

from exceptions import MissingSecretError
from ops.pebble import PathError, ProtocolError, Layer
from node_port import NodePortManager

from typing import Set, Optional, Dict
from charms.mongodb.v0.config_server_interface import ClusterRequirer

Expand All @@ -27,7 +28,7 @@

import ops
from ops.model import BlockedStatus, Container, Relation, ActiveStatus, Unit
from ops.charm import StartEvent, RelationDepartedEvent
from ops.charm import StartEvent, RelationDepartedEvent, ConfigChangedEvent

import logging

Expand Down Expand Up @@ -59,19 +60,56 @@ class MongosCharm(ops.CharmBase):

def __init__(self, *args):
super().__init__(*args)
self.role = Config.Role.MONGOS
self.secrets = SecretCache(self)
self.status = MongosStatusHandler(self)
self.node_port_manager = NodePortManager(self)

# lifecycle events
self.framework.observe(self.on.config_changed, self._on_config_changed)
self.framework.observe(
self.on.mongos_pebble_ready, self._on_mongos_pebble_ready
)
self.framework.observe(self.on.start, self._on_start)
self.framework.observe(self.on.update_status, self._on_update_status)
self.tls = MongoDBTLS(self, Config.Relations.PEERS, substrate=Config.SUBSTRATE)

self.role = Config.Role.MONGOS
self.secrets = SecretCache(self)
self.status = MongosStatusHandler(self)
# relations
self.tls = MongoDBTLS(self, Config.Relations.PEERS, substrate=Config.SUBSTRATE)
self.cluster = ClusterRequirer(self, substrate=Config.SUBSTRATE)

# BEGIN: hook functions
def _on_config_changed(self, event: ConfigChangedEvent) -> None:
"""Listen to changes in the application configuration."""
previous_config = self.expose_external
external_config = self.model.config["expose-external"]
if external_config not in Config.ExternalConnections.VALID_EXTERNAL_CONFIG:
logger.error(
"External configuration: %s for expose-external is not valid, should be one of: %s",
external_config,
Config.ExternalConnections.VALID_EXTERNAL_CONFIG,
)
self.status.set_and_share_status(Config.Status.INVALID_EXTERNAL_CONFIG)
return

self.expose_external = external_config
if external_config == Config.ExternalConnections.EXTERNAL_NODEPORT:
# every unit attempts to create a nodeport service - if exists, will silently continue
self.node_port_manager.apply_service(
service=self.node_port_manager.build_node_port_services(
port=Config.MONGOS_PORT
)
)

if (
external_config == Config.ExternalConnections.NONE
and previous_config == Config.ExternalConnections.EXTERNAL_NODEPORT
):
# TODO DPE-5268 - support revoking external access
pass

# TODO DPE-5235 support updating data-integrator clients to have/not have public IP
# depending on the result of the configuration

def _on_mongos_pebble_ready(self, event) -> None:
"""Configure MongoDB pebble layer specification."""
if not self.is_integrated_to_config_server():
Expand Down Expand Up @@ -115,6 +153,17 @@ def _on_start(self, event: StartEvent) -> None:

def _on_update_status(self, _):
"""Handle the update status event"""
if (
MiaAltieri marked this conversation as resolved.
Show resolved Hide resolved
self.model.config["expose-external"]
not in Config.ExternalConnections.VALID_EXTERNAL_CONFIG
):
logger.error(
"External configuration: %s for expose-external is not valid, should be one of: %s",
self.expose_external,
Config.ExternalConnections.VALID_EXTERNAL_CONFIG,
)
self.unit.status = Config.Status.INVALID_EXTERNAL_CONFIG

if self.unit.status == Config.Status.UNHEALTHY_UPGRADE:
return

Expand Down Expand Up @@ -415,6 +464,28 @@ def unit_host(self, unit: Unit) -> str:
# END: helper functions

# BEGIN: properties
@property
def expose_external(self) -> Optional[str]:
"""Returns mode of exposure for external connections."""
if (
self.app_peer_data.get("expose-external", Config.ExternalConnections.NONE)
== Config.ExternalConnections.NONE
):
return None

return self.app_peer_data["expose-external"]

@expose_external.setter
def expose_external(self, expose_external: str) -> None:
"""Set the db_initialised flag."""
if not self.unit.is_leader():
return

if expose_external not in Config.ExternalConnections.VALID_EXTERNAL_CONFIG:
return

self.app_peer_data["expose-external"] = expose_external
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to store value to peer data

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should, since the user could incorrectly set the config, I would like to store the most recent correct config


@property
def peers_units(self) -> list[Unit]:
"""Get peers units in a safe way."""
Expand Down Expand Up @@ -470,13 +541,13 @@ def _unit_ip(self) -> str:
return str(self.model.get_binding(Config.Relations.PEERS).network.bind_address)

@property
def is_external_client(self) -> Optional[str]:
def is_external_client(self) -> bool:
"""Returns the connectivity mode which mongos should use.

Note that for K8s routers this should always default to True. However we still include
delgod marked this conversation as resolved.
Show resolved Hide resolved
this function so that we can have parity on properties with the K8s and VM routers.
"""
return True
return self.expose_external == Config.ExternalConnections.EXTERNAL_NODEPORT

@property
def database(self) -> Optional[str]:
Expand Down
10 changes: 10 additions & 0 deletions src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ class Config:
LICENSE_PATH = "/licenses/LICENSE"
DATA_DIR = "/var/lib/mongodb"

class ExternalConnections:
"""External Connections related config for MongoDB Charm."""

NONE = "none"
EXTERNAL_NODEPORT = "nodeport"
VALID_EXTERNAL_CONFIG = [NONE, EXTERNAL_NODEPORT]

class Relations:
"""Relations related config for MongoDB Charm."""

Expand Down Expand Up @@ -68,6 +75,9 @@ class Status:

# TODO Future PR add more status messages here as constants
UNHEALTHY_UPGRADE = BlockedStatus("Unhealthy after upgrade.")
INVALID_EXTERNAL_CONFIG = BlockedStatus(
"Config option for expose-external not valid."
)

class Substrate:
"""Substrate related constants."""
Expand Down
108 changes: 108 additions & 0 deletions src/node_port.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
#!/usr/bin/env python3
MiaAltieri marked this conversation as resolved.
Show resolved Hide resolved
# Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.

"""Manager for handling mongos Kubernetes resources for a single mongos pod."""

import logging
from functools import cached_property
from ops.charm import CharmBase
from lightkube.models.meta_v1 import ObjectMeta, OwnerReference
from lightkube.core.client import Client
from lightkube.core.exceptions import ApiError
from lightkube.resources.core_v1 import Pod, Service
from lightkube.models.core_v1 import ServicePort, ServiceSpec


logger = logging.getLogger(__name__)

# default logging from lightkube httpx requests is very noisy
logging.getLogger("lightkube").disabled = True
logging.getLogger("lightkube.core.client").disabled = True
logging.getLogger("httpx").disabled = True
logging.getLogger("httpcore").disabled = True


class NodePortManager:
"""Manager for handling mongos Kubernetes resources for a single mongos pod."""

def __init__(
self,
charm: CharmBase,
):
self.charm = charm
self.pod_name = self.charm.unit.name.replace("/", "-")
self.app_name = self.charm.app.name
self.namespace = self.charm.model.name

@cached_property
def client(self) -> Client:
"""The Lightkube client."""
return Client( # pyright: ignore[reportArgumentType]
field_manager=self.pod_name,
namespace=self.namespace,
)

# --- GETTERS ---

def get_pod(self, pod_name: str = "") -> Pod:
"""Gets the Pod via the K8s API."""
# Allows us to get pods from other peer units
return self.client.get(
res=Pod,
name=pod_name or self.pod_name,
)

def build_node_port_services(self, port: str) -> Service:
MiaAltieri marked this conversation as resolved.
Show resolved Hide resolved
"""Builds a ClusterIP service for initial client connection."""
pod = self.get_pod(pod_name=self.pod_name)
if not pod.metadata:
raise Exception(f"Could not find metadata for {pod}")

unit_id = self.charm.unit.name.split("/")[1]
return Service(
metadata=ObjectMeta(
name=f"{self.app_name}-{unit_id}-external",
namespace=self.namespace,
# When we scale-down K8s will keep the Services for the deleted units around,
# unless the Services' owner is also deleted.
ownerReferences=[
OwnerReference(
apiVersion=pod.apiVersion,
kind=pod.kind,
name=self.pod_name,
uid=pod.metadata.uid,
blockOwnerDeletion=False,
)
],
),
spec=ServiceSpec(
externalTrafficPolicy="Local",
type="NodePort",
selector={
"statefulset.kubernetes.io/pod-name": self.pod_name,
},
ports=[
ServicePort(
protocol="TCP",
port=port,
targetPort=port,
name=f"{self.pod_name}-port",
)
],
),
)

def apply_service(self, service: Service) -> None:
"""Applies a given Service."""
try:
self.client.apply(service)
except ApiError as e:
if e.status.code == 403:
logger.error("Could not apply service, application needs `juju trust`")
return
if e.status.code == 422 and "port is already allocated" in e.status.message:
logger.error(e.status.message)
return
else:
raise
2 changes: 2 additions & 0 deletions tests/integration/client_relations/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.
77 changes: 77 additions & 0 deletions tests/integration/client_relations/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#!/usr/bin/env python3
# Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.

import subprocess
import logging

from ..helpers import (
MONGOS_APP_NAME,
get_mongos_user_password,
MongoClient,
)

from pytest_operator.plugin import OpsTest

logger = logging.getLogger(__name__)

PORT_MAPPING_INDEX = 4


def get_port_from_node_port(ops_test: OpsTest, node_port_name: str) -> str:
node_port_cmd = f"kubectl get svc -n {ops_test.model.name} | grep NodePort | grep {node_port_name}"
result = subprocess.run(node_port_cmd, shell=True, capture_output=True, text=True)
if result.returncode:
logger.info("was not able to find nodeport")
assert False, f"Command: {node_port_cmd} to find node port failed."

assert (
len(result.stdout.splitlines()) > 0
), "No port information available for expected service"

# port information is available at PORT_MAPPING_INDEX
port_mapping = result.stdout.split()[PORT_MAPPING_INDEX]

# port information is of the form 27018:30259/TCP
return port_mapping.split(":")[1].split("/")[0]


def assert_node_port_available(ops_test: OpsTest, node_port_name: str) -> None:
assert get_port_from_node_port(
ops_test, node_port_name
), "No port information for expected service"


async def assert_all_unit_node_ports_available(ops_test: OpsTest):
"""Assert all ports available in mongos deployment."""
for unit_id in range(len(ops_test.model.applications[MONGOS_APP_NAME].units)):
assert_node_port_available(
ops_test, node_port_name=f"{MONGOS_APP_NAME}-{unit_id}-external"
)

exposed_node_port = get_port_from_node_port(
ops_test, node_port_name=f"{MONGOS_APP_NAME}-{unit_id}-external"
)
public_k8s_ip = get_public_k8s_ip()
username, password = await get_mongos_user_password(ops_test, MONGOS_APP_NAME)
external_mongos_client = MongoClient(
f"mongodb://{username}:{password}@{public_k8s_ip}:{exposed_node_port}"
)
external_mongos_client.admin.command("usersInfo")
external_mongos_client.close()


def get_public_k8s_ip() -> str:
result = subprocess.run(
"kubectl get nodes", shell=True, capture_output=True, text=True
)

if result.returncode:
logger.info("failed to retrieve public facing k8s IP")
assert False, "failed to retrieve public facing k8s IP"

# port information is the first item of the last line
port_mapping = result.stdout.splitlines()[-1].split()[0]

# port mapping is of the form ip-172-31-18-133
return port_mapping.split("ip-")[1].replace("-", ".")
Loading
Loading