feat: support HPA-like stabilization window
Support stabilization window for scale up and scale down
capuche2412 committed Jun 28, 2024
1 parent 2dbdce9 commit 6fe2ea8
Showing 3 changed files with 234 additions and 72 deletions.
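For context, a minimal way to launch the scaler with the new flags, mirroring the subprocess-based helper used in the e2e tests below. The flag names come from this commit; the 60/300 second values are arbitrary examples, not defaults.

# Illustrative launch of the scaler with both stabilization windows set.
import subprocess

scaler = subprocess.Popen(
    [
        "python",
        "main.py",
        "--scale-up-stabilization-window", "60",
        "--scale-down-stabilization-window", "300",
    ]
)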
169 changes: 113 additions & 56 deletions main.py
@@ -4,7 +4,7 @@
import os
import threading
from dataclasses import dataclass
from time import sleep
from time import sleep, time

import kubernetes
from kubernetes import watch
@@ -37,9 +37,16 @@ class HPA:
metric_value_path: str
target_kind: str
target_name: str
scale_up_stabilization_window: int
scale_down_stabilization_window: int


SYNC_INTERVAL = 30
class MetricNotFound(Exception):
pass


SYNC_INTERVAL = 10
STABILIZATION_CHECK_INTERVAL = 1
HPAs: dict[str, HPA] = {}


@@ -64,6 +71,8 @@ def _watch():

def watch_hpa(args) -> None:
LOGGER.info(f"Will watch HPA with {args.hpa_label_selector=} in {args.hpa_namespace=}.")
LOGGER.info(f"The scale_up_stabilization_window is set to {args.scale_up_stabilization_window}s.")
LOGGER.info(f"The scale_down_stabilization_window is set to {args.scale_down_stabilization_window}s.")
while True:
try:
w = watch.Watch()
@@ -72,13 +81,17 @@ def watch_hpa(args) -> None:
args.hpa_namespace,
label_selector=args.hpa_label_selector,
):
update_hpa(event["object"].metadata)
update_hpa(
event["object"].metadata,
scale_up_stabilization_window=args.scale_up_stabilization_window,
scale_down_stabilization_window=args.scale_down_stabilization_window,
)
except kubernetes.client.exceptions.ApiException as exc:
if exc.status != 410:
raise exc


def update_hpa(metadata) -> None:
def update_hpa(metadata, scale_up_stabilization_window, scale_down_stabilization_window) -> None:
"""
inserts/updates/deletes the HPA to/in/from HPAs.
"""
@@ -92,6 +105,8 @@ def update_hpa(metadata) -> None:
metric_value_path=build_metric_value_path(hpa),
target_kind=hpa.spec.scale_target_ref.kind,
target_name=hpa.spec.scale_target_ref.name,
scale_up_stabilization_window=scale_up_stabilization_window,
scale_down_stabilization_window=scale_down_stabilization_window,
)
except kubernetes.client.exceptions.ApiException as exc:
if exc.status != 404:
@@ -124,7 +139,7 @@ def build_metric_value_path(hpa) -> str:
def get_needed_replicas(metric_value_path) -> int | None:
"""
returns 0 if the metric value is 0, and 1 otherwise (HPA will take care of scaling up if needed)
returns None, if the needed replicas cannot be determined.
raises MetricNotFound if the needed replicas cannot be determined.
"""
try:
# We suppose the MetricValueList does contain one item
@@ -133,75 +148,101 @@ def get_needed_replicas(metric_value_path) -> int | None:
match exc.status:
case 404 | 503 | 403:
LOGGER.exception(f"Could not get Custom metric at {metric_value_path}: {exc}")
raise MetricNotFound()
case _:
raise exc


def update_target(hpa: HPA) -> None:
def get_replicas(*, hpa, read_scale) -> tuple[int, int]:
try:
scale = read_scale(namespace=hpa.namespace, name=hpa.name)
except kubernetes.client.exceptions.ApiException as exc:
if exc.status != 404:
raise exc
LOGGER.warning(f"{hpa.namespace}/{hpa.name} was not found.")

current_replicas = scale.status.replicas
needed_replicas = get_needed_replicas(hpa.metric_value_path)
if needed_replicas is None:
LOGGER.error(f"Will not update {hpa.target_kind} {hpa.namespace}/{hpa.target_name}.")
return
# Maybe be more precise (using target_api_version, for example)?

return current_replicas, needed_replicas


def update_target(hpa: HPA) -> None:

match hpa.target_kind:
case "Deployment":
scale_deployment(
namespace=hpa.namespace,
name=hpa.target_name,
needed_replicas=needed_replicas,
)
read_scale = APP_V1.read_namespaced_deployment_scale
patch_scale = APP_V1.patch_namespaced_deployment_scale
case "StatefulSet":
scale_statefulset(
namespace=hpa.namespace,
name=hpa.target_name,
needed_replicas=needed_replicas,
)
read_scale = APP_V1.read_namespaced_stateful_set_scale
patch_scale = APP_V1.patch_namespaced_stateful_set_scale
case _:
raise ValueError(f"Target kind {hpa.target_kind} not supported.")

try:
if scaling_is_needed(hpa=hpa, read_scale=read_scale):
scale = read_scale(namespace=hpa.namespace, name=hpa.name)
current_replicas, needed_replicas = get_replicas(hpa=hpa, read_scale=read_scale)
scale.spec.replicas = needed_replicas
patch_scale(namespace=hpa.namespace, name=hpa.name, body=scale)
LOGGER.info(
f"{hpa.target_kind} {hpa.namespace}/{hpa.name} was scaled {current_replicas=}->{needed_replicas=}."
)
else:
current_replicas, needed_replicas = get_replicas(hpa=hpa, read_scale=read_scale)
LOGGER.info(
f"No need to scale {hpa.target_kind} {hpa.namespace}/{hpa.name} {current_replicas=} {needed_replicas=}."
)
except kubernetes.client.exceptions.ApiException as exc:
if exc.status != 404:
raise exc
LOGGER.warning(f"{hpa.target_kind} {hpa.namespace}/{hpa.name} was not found.")
except MetricNotFound:
LOGGER.error(f"Will not update {hpa.target_kind} {hpa.namespace}/{hpa.target_name}.")


def scaling_is_needed(*, current_replicas, needed_replicas) -> bool:
def scaling_up_is_needed(current_replicas, needed_replicas):
return current_replicas < needed_replicas and needed_replicas == 1


def scaling_down_is_needed(current_replicas, needed_replicas):
return current_replicas > needed_replicas and needed_replicas == 0


def scaling_is_needed(*, hpa, read_scale) -> bool:
"""
checks if the scale up/down is relevant.
checks if the scale up/down stays relevant for the duration of the stabilization window
"""
# Maybe do not scale down if the HPA is unable to retrieve metrics? Let the only remaining pod do some work.
return bool(current_replicas) != bool(needed_replicas)

current_replicas, needed_replicas = get_replicas(hpa=hpa, read_scale=read_scale)

def scale_deployment(*, namespace, name, needed_replicas) -> None:
try:
scale = APP_V1.read_namespaced_deployment_scale(namespace=namespace, name=name)
current_replicas = scale.status.replicas
if not scaling_is_needed(current_replicas=current_replicas, needed_replicas=needed_replicas):
LOGGER.info(f"No need to scale Deployment {namespace}/{name} {current_replicas=} {needed_replicas=}.")
return

scale.spec.replicas = needed_replicas
# Maybe do not scale immediately? but don't want to reimplement an HPA.
APP_V1.patch_namespaced_deployment_scale(namespace=namespace, name=name, body=scale)
LOGGER.info(f"Deployment {namespace}/{name} was scaled {current_replicas=}->{needed_replicas=}.")
except kubernetes.client.exceptions.ApiException as exc:
if exc.status != 404:
raise exc
LOGGER.warning(f"Deployment {namespace}/{name} was not found.")
if scaling_up_is_needed(current_replicas, needed_replicas):
stabilization_window = hpa.scale_up_stabilization_window
elif scaling_down_is_needed(current_replicas, needed_replicas):
stabilization_window = hpa.scale_down_stabilization_window
else:
return False

if stabilization_window != 0:

def scale_statefulset(*, namespace, name, needed_replicas) -> None:
try:
scale = APP_V1.read_namespaced_stateful_set_scale(namespace=namespace, name=name)
current_replicas = scale.status.replicas
if not scaling_is_needed(current_replicas=current_replicas, needed_replicas=needed_replicas):
LOGGER.info(f"No need to scale statefulset {namespace}/{name} {current_replicas=} {needed_replicas=}.")
return

scale.spec.replicas = needed_replicas
# Maybe do not scale immediately? but don't want to reimplement an HPA.
APP_V1.patch_namespaced_stateful_set_scale(namespace=namespace, name=name, body=scale)
LOGGER.info(f"StatefulSet {namespace}/{name} was scaled {current_replicas=}->{needed_replicas=}.")
except kubernetes.client.exceptions.ApiException as exc:
if exc.status != 404:
raise exc
LOGGER.warning(f"StatefulSet {namespace}/{name} was not found.")
stabilization_end_time = time() + stabilization_window

LOGGER.info(
f"{hpa.target_kind} {hpa.namespace}/{hpa.name} will be scaled ({current_replicas=}->{needed_replicas=}). "
"Waiting for stabilization..."
)

while time() < stabilization_end_time:

current_replicas, needed_replicas = get_replicas(hpa=hpa, read_scale=read_scale)

if bool(current_replicas) == bool(needed_replicas):
LOGGER.info(f"{hpa.target_kind} {hpa.namespace}/{hpa.name} scale is canceled due to stabilization.")
return False

sleep(STABILIZATION_CHECK_INTERVAL)

return True


def parse_cli_args():
@@ -220,6 +261,22 @@ def parse_cli_args():
default="",
help="label_selector to get HPA to watch, 'foo=bar,bar=foo' e.g. (default: empty string to select all)",
)
# TODO: Remove when spec.behavior.scaleUp.stabilizationWindowSeconds can be retrieved with the K8s Python API
parser.add_argument(
"--scale-up-stabilization-window",
dest="scale_up_stabilization_window",
default="0",
help="scale_up_stabilization_window restricts the flapping of replica count while scaling up (default: 0)",
type=int,
)
# TODO: Remove when spec.behavior.scaleDown.stabilizationWindowSeconds can be retrieved with the K8s Python API
parser.add_argument(
"--scale-down-stabilization-window",
dest="scale_down_stabilization_window",
default="0",
help="scale_down_stabilization_window restricts the flapping of replica count while scaling up (default: 0)",
type=int,
)

return parser.parse_args()

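The core of the change is the wait-and-recheck loop in scaling_is_needed above. Below is a self-contained sketch of that pattern; the callback name signal_still_requires_scaling is illustrative, not taken from main.py.

from time import sleep, time

STABILIZATION_CHECK_INTERVAL = 1  # seconds between re-checks, as in main.py

def hold_for_stabilization(window_seconds: int, signal_still_requires_scaling) -> bool:
    """Return True if the scaling signal holds for the whole window, False if it flips back."""
    end_time = time() + window_seconds
    while time() < end_time:
        if not signal_still_requires_scaling():
            # The metric flipped back during the window: cancel the scale operation.
            return False
        sleep(STABILIZATION_CHECK_INTERVAL)
    return True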
97 changes: 94 additions & 3 deletions tests/e2e_test.py
@@ -2,6 +2,7 @@
import logging
import subprocess
from pathlib import Path
from time import sleep

import pytest

@@ -10,7 +11,7 @@
TESTS_PATH = Path(__file__).parent
MANIFESTS_PATH = TESTS_PATH / "manifests"

TIMEOUT = SYNC_INTERVAL * 3
TIMEOUT = SYNC_INTERVAL * 6

logger = logging.getLogger(__name__)

@@ -74,8 +75,27 @@ def delete_target(manifest: str):
run(command=["kubectl", "delete", "-f", f"{MANIFESTS_PATH}/{manifest}", "--wait=true"])


def run_scaler():
return subprocess.Popen(["python", f"{TESTS_PATH.parent}/main.py"])
def run_scaler(scale_up_stabilization_window: int = 0, scale_down_stabilization_window: int = 0):
if scale_up_stabilization_window != 0:
return subprocess.Popen(
[
"python",
f"{TESTS_PATH.parent}/main.py",
"--scale-up-stabilization-window",
str(scale_up_stabilization_window),
]
)
elif scale_down_stabilization_window != 0:
return subprocess.Popen(
[
"python",
f"{TESTS_PATH.parent}/main.py",
"--scale-down-stabilization-window",
str(scale_down_stabilization_window),
]
)
else:
return subprocess.Popen(["python", f"{TESTS_PATH.parent}/main.py"])


def set_foo_metric_value(value: int):
@@ -128,3 +148,74 @@ def test_target(setup, target_name: str, kind: str):
finally:
khstz.kill()
delete_target(f"{target_name}.yaml")


@pytest.mark.parametrize(
"target_name, kind, scale_stabilization_window, wait_duration",
[("target-1", "deployment", 15, 5), ("target-2", "statefulset", 15, 30)],
)
def test_target_with_scale_up_stabilization(
setup, target_name: str, kind: str, scale_stabilization_window: int, wait_duration: int
):
set_foo_metric_value(0)

deploy_target(f"{target_name}.yaml")

# The initial replica count is 1
wait_scale(kind=kind, name=target_name, replicas=1)

khstz = run_scaler(scale_up_stabilization_window=scale_stabilization_window)

try:

# The initial metric value is 0, it should scale the target to 0
wait_scale(kind=kind, name=target_name, replicas=0)

# Increase the metric value
set_foo_metric_value(10)

sleep(wait_duration)

if wait_duration < scale_stabilization_window:
# The deployment was revived but the HPA was not able to scale it up yet.
wait_scale(kind=kind, name=target_name, replicas=0)
else:
# The deployment was revived and the HPA was able to scale it up.
wait_scale(kind=kind, name=target_name, replicas=3)
finally:
khstz.kill()
delete_target(f"{target_name}.yaml")


@pytest.mark.parametrize(
"target_name, kind, scale_stabilization_window, wait_duration",
[("target-1", "deployment", 15, 5), ("target-2", "statefulset", 15, 30)],
)
def test_target_with_scale_down_stabilization(
setup, target_name: str, kind: str, scale_stabilization_window: int, wait_duration: int
):
set_foo_metric_value(10)

deploy_target(f"{target_name}.yaml")

# The initial replica count is 3
wait_scale(kind=kind, name=target_name, replicas=3)

khstz = run_scaler(scale_down_stabilization_window=scale_stabilization_window)

try:

# Decrease the metric value
set_foo_metric_value(0)

sleep(wait_duration)

if wait_duration < scale_stabilization_window:
# The deployment or the statefulset must still be up before the end of the stabilization_window.
wait_scale(kind=kind, name=target_name, replicas=3)
else:
# The deployment or the statefulset must go down to zero after the end of the stabilization_window.
wait_scale(kind=kind, name=target_name, replicas=0)
finally:
khstz.kill()
delete_target(f"{target_name}.yaml")
