Skip to content

Commit

Permalink
General code quality improvements for the controller part of this rep…
Browse files Browse the repository at this point in the history
…ository.

Make sure exported symbols are correctly set in __init__.py with __all__.
Renamed pytest fixture for creating a k8s namespace from ns to namespace since ns is a bit overloaded, especially in test_controller.py
Introduce typehints in controller.py and test_controller.py to massively improve developer experience (previously, one just really had to guess what functions are for and which types are expected)
Removed unused function parameters
Made function headers (i.e. parameter [type] defintion) compatible with what `kopf.on.[create/change/...]' expects
Introduce mypy (and missing stubs of external packages if available)
Introduce mypy ignores for (still) untyped parts of the codebase for now
  • Loading branch information
Jonas Dedden committed Apr 19, 2024
1 parent 323e0d1 commit 9855fd4
Show file tree
Hide file tree
Showing 9 changed files with 483 additions and 241 deletions.
1 change: 0 additions & 1 deletion dask_kubernetes/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from importlib import import_module
from warnings import warn

from . import config
from . import _version
Expand Down
6 changes: 3 additions & 3 deletions dask_kubernetes/classic/tests/test_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ async def test_start_with_workers(k8s_cluster, pod_spec):

@pytest.mark.anyio
@pytest.mark.xfail(reason="Flaky in CI and classic is deprecated anyway")
async def test_adapt_delete(cluster, ns):
async def test_adapt_delete(cluster, namespace):
"""
testing whether KubeCluster.adapt will bring
back deleted worker pod (issue #244)
Expand All @@ -782,7 +782,7 @@ async def test_adapt_delete(cluster, ns):

async def get_worker_pods():
pods_list = await core_api.list_namespaced_pod(
namespace=ns,
namespace=namespace,
label_selector=f"dask.org/component=worker,dask.org/cluster-name={cluster.name}",
)
return [x.metadata.name for x in pods_list.items]
Expand All @@ -797,7 +797,7 @@ async def get_worker_pods():
assert len(worker_pods) == 2
# delete one worker pod
to_delete = worker_pods[0]
await core_api.delete_namespaced_pod(name=to_delete, namespace=ns)
await core_api.delete_namespaced_pod(name=to_delete, namespace=namespace)
# wait until it is deleted
start = time()
while True:
Expand Down
29 changes: 16 additions & 13 deletions dask_kubernetes/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,38 @@
import sys
import tempfile
import uuid
from typing import Final, Iterator

import pytest
from kopf.testing import KopfRunner
from pytest_kind.cluster import KindCluster

from dask_kubernetes.common.utils import check_dependency

DIR = pathlib.Path(__file__).parent.absolute()
DIR: Final[pathlib.Path] = pathlib.Path(__file__).parent.absolute()

check_dependency("helm")
check_dependency("kubectl")
check_dependency("docker")

DISABLE_LOGGERS = ["httpcore", "httpx"]
DISABLE_LOGGERS: Final[list[str]] = ["httpcore", "httpx"]


def pytest_configure():
def pytest_configure() -> None:
for logger_name in DISABLE_LOGGERS:
logger = logging.getLogger(logger_name)
logger.disabled = True


@pytest.fixture()
def kopf_runner(k8s_cluster, ns):
def kopf_runner(k8s_cluster: KindCluster, namespace: str) -> KopfRunner:
yield KopfRunner(
["run", "-m", "dask_kubernetes.operator", "--verbose", "--namespace", ns]
["run", "-m", "dask_kubernetes.operator", "--verbose", "--namespace", namespace]
)


@pytest.fixture(scope="session")
def docker_image():
def docker_image() -> str:
image_name = "dask-kubernetes:dev"
python_version = f"{sys.version_info.major}.{sys.version_info.minor}"
subprocess.run(
Expand All @@ -54,7 +55,9 @@ def docker_image():


@pytest.fixture(scope="session")
def k8s_cluster(request, docker_image):
def k8s_cluster(
request: pytest.FixtureRequest, docker_image: str
) -> Iterator[KindCluster]:
image = None
if version := os.environ.get("KUBERNETES_VERSION"):
image = f"kindest/node:v{version}"
Expand All @@ -73,7 +76,7 @@ def k8s_cluster(request, docker_image):


@pytest.fixture(scope="session", autouse=True)
def install_istio(k8s_cluster):
def install_istio(k8s_cluster: KindCluster) -> None:
if bool(os.environ.get("TEST_ISTIO", False)):
check_dependency("istioctl")
subprocess.run(
Expand All @@ -85,15 +88,15 @@ def install_istio(k8s_cluster):


@pytest.fixture(autouse=True)
def ns(k8s_cluster):
def namespace(k8s_cluster: KindCluster) -> Iterator[str]:
ns = "dask-k8s-pytest-" + uuid.uuid4().hex[:10]
k8s_cluster.kubectl("create", "ns", ns)
yield ns
k8s_cluster.kubectl("delete", "ns", ns, "--wait=false", "--ignore-not-found=true")


@pytest.fixture(scope="session", autouse=True)
def install_gateway(k8s_cluster):
def install_gateway(k8s_cluster: KindCluster) -> Iterator[None]:
if bool(os.environ.get("TEST_DASK_GATEWAY", False)):
check_dependency("helm")
# To ensure the operator can coexist with Gateway
Expand Down Expand Up @@ -129,11 +132,11 @@ def install_gateway(k8s_cluster):


@pytest.fixture(scope="session", autouse=True)
def customresources(k8s_cluster):
def customresources(k8s_cluster: KindCluster) -> Iterator[None]:
temp_dir = tempfile.TemporaryDirectory()
crd_path = os.path.join(DIR, "operator", "customresources")

def run_generate(crd_path, patch_path, temp_path):
def run_generate(crd_path: str, patch_path: str, temp_path: str) -> None:
subprocess.run(
["k8s-crd-resolver", "-r", "-j", patch_path, crd_path, temp_path],
check=True,
Expand All @@ -154,5 +157,5 @@ def run_generate(crd_path, patch_path, temp_path):


@pytest.fixture
def anyio_backend():
def anyio_backend() -> str:
return "asyncio"
8 changes: 8 additions & 0 deletions dask_kubernetes/operator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,11 @@
make_worker_spec,
discover,
)

__all__ = [
"KubeCluster",
"make_cluster_spec",
"make_scheduler_spec",
"make_worker_spec",
"discover",
]
Loading

0 comments on commit 9855fd4

Please sign in to comment.