Skip to content

Commit

Permalink
Add tests for workload tracing (#649)
Browse files Browse the repository at this point in the history
* tests for workload tracing

* fix tests
  • Loading branch information
michaeldmitry authored Nov 27, 2024
1 parent 5a37b31 commit 0953ac0
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 1 deletion.
105 changes: 105 additions & 0 deletions tests/integration/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,21 @@
# See LICENSE file for licensing details.

import grp
import json
import logging
import subprocess
from pathlib import Path
from typing import List

import requests
import yaml
from juju.application import Application
from juju.unit import Unit
from lightkube import Client
from lightkube.resources.core_v1 import Pod
from minio import Minio
from pytest_operator.plugin import OpsTest
from tenacity import retry, stop_after_attempt, wait_exponential
from workload import Prometheus

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -277,3 +283,102 @@ def get_workload_file(
log.error(e.stdout.decode())
raise e
return res.stdout


async def deploy_and_configure_minio(ops_test: OpsTest) -> None:
    """Deploy minio, create the ``tempo`` bucket and wire s3-integrator to it.

    Provides the s3-like storage backend needed by the HA charms. The
    ``s3-integrator`` application is looked up in the model (it is not deployed
    here), configured with the minio endpoint and bucket, and given the
    credentials through its ``sync-s3-credentials`` action.
    """
    credentials = {
        "access-key": "accesskey",
        "secret-key": "secretkey",
    }
    model = ops_test.model

    await model.deploy("minio", channel="edge", trust=True, config=credentials)
    await model.wait_for_idle(apps=["minio"], status="active", timeout=2000)

    minio_ip = await unit_address(ops_test, "minio", 0)
    s3_client = Minio(
        f"{minio_ip}:9000",
        access_key=credentials["access-key"],
        secret_key=credentials["secret-key"],
        secure=False,
    )

    # Create the bucket s3-integrator is pointed at below, unless it already exists.
    if not s3_client.bucket_exists("tempo"):
        s3_client.make_bucket("tempo")

    # Configure s3-integrator with the in-cluster minio endpoint and the bucket.
    integrator: Application = model.applications["s3-integrator"]
    first_unit: Unit = integrator.units[0]

    endpoint = f"minio-0.minio-endpoints.{ops_test.model.name}.svc.cluster.local:9000"
    await integrator.set_config({"endpoint": endpoint, "bucket": "tempo"})

    # The action receives the same access/secret key pair minio was deployed with.
    sync_action = await first_unit.run_action("sync-s3-credentials", **credentials)
    outcome = await sync_action.wait()
    assert outcome.status == "completed"


async def deploy_tempo_cluster(ops_test: OpsTest):
    """Deploy Tempo in its HA form: coordinator, worker, s3-integrator and minio.

    Deploys the worker and coordinator charms and s3-integrator from the edge
    channel, relates them, sets up minio as the storage backend, and finally
    waits until the Tempo applications and s3-integrator are active.
    """
    coordinator_name = "tempo"
    worker_name = "tempo-worker"
    model = ops_test.model

    await model.deploy(
        "tempo-worker-k8s", application_name=worker_name, channel="edge", trust=True
    )
    await model.deploy(
        "tempo-coordinator-k8s",
        application_name=coordinator_name,
        channel="edge",
        trust=True,
    )
    await model.deploy("s3-integrator", channel="edge")

    await model.integrate(f"{coordinator_name}:s3", "s3-integrator:s3-credentials")
    await model.integrate(
        f"{coordinator_name}:tempo-cluster", f"{worker_name}:tempo-cluster"
    )

    # minio is deployed last because its setup configures the s3-integrator app
    # that was deployed above.
    await deploy_and_configure_minio(ops_test)

    async with ops_test.fast_forward():
        await model.wait_for_idle(
            apps=[coordinator_name, worker_name, "s3-integrator"],
            status="active",
            timeout=2000,
            idle_period=30,
        )


def get_traces(tempo_host: str, service_name="tracegen-otlp_http", tls=True, timeout=10):
    """Get traces directly from the Tempo REST API.

    Args:
        tempo_host: host name or IP address of the Tempo server; port 3200 is used.
        service_name: value of the ``service.name`` tag to search for.
        tls: query over ``https`` when true, plain ``http`` otherwise.
        timeout: seconds to wait for Tempo to respond before giving up. Without
            it a stuck endpoint would block the test run indefinitely.

    Returns:
        The ``traces`` entry of the JSON search response.

    Raises:
        AssertionError: if Tempo answers with a status code other than 200.
        requests.exceptions.RequestException: if the request fails or times out.
    """
    scheme = "https" if tls else "http"
    url = f"{scheme}://{tempo_host}:3200/api/search?tags=service.name={service_name}"
    response = requests.get(
        url,
        # Certificate verification is skipped; presumably because the TLS test
        # uses certificates from a self-signed CA -- confirm.
        verify=False,
        timeout=timeout,
    )
    assert (
        response.status_code == 200
    ), f"GET {url} returned {response.status_code}: {response.text}"
    return json.loads(response.text)["traces"]


@retry(stop=stop_after_attempt(15), wait=wait_exponential(multiplier=1, min=4, max=10))
async def get_traces_patiently(tempo_host, service_name="tracegen-otlp_http", tls=True):
    """Query Tempo for traces, retrying until at least one is returned.

    Same lookup as ``get_traces``, but wrapped in a retry of up to 15 attempts
    with an exponential wait bounded between 4 and 10 seconds. Useful when Tempo
    might not return the traces immediately (its API is known for returning
    data in random order).
    """
    found = get_traces(tempo_host, service_name=service_name, tls=tls)
    # An empty result fails the assertion, which triggers the next retry.
    assert len(found) > 0
    return found


async def get_application_ip(ops_test: OpsTest, app_name: str) -> str:
    """Return the public address that the model status reports for ``app_name``."""
    model_status = await ops_test.model.get_status()
    return model_status["applications"][app_name].public_address
73 changes: 73 additions & 0 deletions tests/integration/test_workload_tracing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#!/usr/bin/env python3
# Copyright 2024 Canonical Ltd.
# See LICENSE file for licensing details.
import asyncio
import logging
from pathlib import Path

import pytest
import yaml
from helpers import deploy_tempo_cluster, get_application_ip, get_traces_patiently, oci_image

logger = logging.getLogger(__name__)

# Parsed contents of metadata.yaml, read relative to the current working directory.
METADATA = yaml.safe_load(Path("./metadata.yaml").read_text())
# Application name the Prometheus charm is deployed under; also the service name
# searched for in Tempo.
APP_NAME = "prometheus"
# Application names of the Tempo coordinator and worker; they mirror the names
# hard-coded in helpers.deploy_tempo_cluster.
TEMPO_APP_NAME = "tempo"
TEMPO_WORKER_APP_NAME = "tempo-worker"
# Resources passed when deploying Prometheus; built by the oci_image helper from
# the "prometheus-image" entry of metadata.yaml (helper not shown here).
PROMETHEUS_RESOURCES = {"prometheus-image": oci_image("./metadata.yaml", "prometheus-image")}
# Charm used as the TLS provider, and the application name it is deployed under.
SSC = "self-signed-certificates"
SSC_APP_NAME = "ssc"


@pytest.mark.abort_on_fail
async def test_workload_traces(ops_test, prometheus_charm):
    """Check that Prometheus workload traces reach Tempo over plain HTTP."""
    model = ops_test.model

    # Deploy the Tempo cluster and Prometheus concurrently.
    tempo_deployment = deploy_tempo_cluster(ops_test)
    prometheus_deployment = model.deploy(
        prometheus_charm, resources=PROMETHEUS_RESOURCES, application_name=APP_NAME, trust=True
    )
    await asyncio.gather(tempo_deployment, prometheus_deployment)

    # Relate only workload-tracing so charm traces do not show up in the search results.
    await model.integrate(f"{APP_NAME}:workload-tracing", f"{TEMPO_APP_NAME}:tracing")

    # Remote-write traffic stimulates Prometheus to generate traces.
    await model.integrate(
        f"{APP_NAME}:receive-remote-write", f"{TEMPO_APP_NAME}:send-remote-write"
    )

    await model.wait_for_idle(
        apps=[APP_NAME, TEMPO_APP_NAME, TEMPO_WORKER_APP_NAME], status="active", timeout=300
    )

    # The workload traces must have been ingested into Tempo.
    tempo_ip = await get_application_ip(ops_test, TEMPO_APP_NAME)
    traces = await get_traces_patiently(tempo_ip, service_name=APP_NAME, tls=False)
    assert traces


@pytest.mark.abort_on_fail
async def test_workload_traces_tls(ops_test):
    """Check that workload traces still reach Tempo once TLS is enabled.

    Uses the Prometheus and Tempo applications by name without deploying them,
    so they must already be present in the model.
    """
    model = ops_test.model
    certificates_endpoint = f"{SSC_APP_NAME}:certificates"

    # Bring in a TLS provider and relate both Prometheus and Tempo to it.
    await model.deploy(SSC, application_name=SSC_APP_NAME)
    await model.integrate(certificates_endpoint, f"{APP_NAME}:certificates")
    await model.integrate(certificates_endpoint, f"{TEMPO_APP_NAME}:certificates")

    # Let the workloads settle after the certificates relations are added.
    await model.wait_for_idle(
        apps=[APP_NAME, TEMPO_APP_NAME, TEMPO_WORKER_APP_NAME], status="active", timeout=300
    )

    # The workload traces must have been ingested into Tempo (queried over https).
    tempo_ip = await get_application_ip(ops_test, TEMPO_APP_NAME)
    traces = await get_traces_patiently(tempo_ip, service_name=APP_NAME)
    assert traces
3 changes: 2 additions & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,9 @@ deps =
pytest-operator
prometheus-api-client
tenacity
websockets < 14.0
# https://github.com/juju/python-libjuju/issues/1184
websockets < 14.0
minio
commands =
pytest -vv --tb native --log-cli-level=INFO --color=yes -s {posargs} {toxinidir}/tests/integration
allowlist_externals =
Expand Down

0 comments on commit 0953ac0

Please sign in to comment.