diff --git a/lib/charms/tempo_coordinator_k8s/v0/charm_tracing.py b/lib/charms/tempo_coordinator_k8s/v0/charm_tracing.py index 3aea50f..cf8def1 100644 --- a/lib/charms/tempo_coordinator_k8s/v0/charm_tracing.py +++ b/lib/charms/tempo_coordinator_k8s/v0/charm_tracing.py @@ -117,6 +117,57 @@ def get_tracer(self) -> opentelemetry.trace.Tracer: See the official opentelemetry Python SDK documentation for usage: https://opentelemetry-python.readthedocs.io/en/latest/ + +## Caching traces +The `trace_charm` machinery will buffer any traces collected during charm execution and store them +to a file on the charm container until a tracing backend becomes available. At that point, it will +flush them to the tracing receiver. + +By default, the buffer is configured to start dropping old traces if any of these conditions apply: + +- the storage size exceeds 10 MiB +- the number of buffered events exceeds 100 + +You can configure this by, for example: + +```python +@trace_charm( + tracing_endpoint="my_tracing_endpoint", + server_cert="_server_cert", + # only cache up to 42 events + buffer_max_events=42, + # only cache up to 42 MiB + buffer_max_size_mib=42, # minimum 10! +) +class MyCharm(CharmBase): + ... +``` + +Note that setting `buffer_max_events` to 0 will effectively disable the buffer. + +The path of the buffer file is by default in the charm's execution root, which for k8s charms means +that in case of pod churn, the cache will be lost. The recommended solution is to use an existing storage +(or add a new one) such as: + +```yaml +storage: + data: + type: filesystem + location: /charm-traces +``` + +and then configure the `@trace_charm` decorator to use it as path for storing the buffer: +```python +@trace_charm( + tracing_endpoint="my_tracing_endpoint", + server_cert="_server_cert", + # store traces to a PVC so they're not lost on pod restart. + buffer_path="/charm-traces/buffer.file", +) +class MyCharm(CharmBase): + ... +``` + ## Upgrading from `v0` If you are upgrading from `charm_tracing` v0, you need to take the following steps (assuming you already @@ -174,6 +225,12 @@ def my_tracing_endpoint(self) -> Optional[str]: 3) If you were passing a certificate (str) using `server_cert`, you need to change it to provide an *absolute* path to the certificate file instead. """ +import typing + +from opentelemetry.exporter.otlp.proto.common._internal.trace_encoder import ( + encode_spans, +) +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter def _remove_stale_otel_sdk_packages(): @@ -225,6 +282,9 @@ def _remove_stale_otel_sdk_packages(): otel_logger.debug("Successfully applied _remove_stale_otel_sdk_packages patch. ") +# apply hacky patch to remove stale opentelemetry sdk packages on upgrade-charm. +# it could be trouble if someone ever decides to implement their own tracer parallel to +# ours and before the charm has inited. We assume they won't. 
_remove_stale_otel_sdk_packages() import functools @@ -238,6 +298,7 @@ def _remove_stale_otel_sdk_packages(): Any, Callable, Generator, + List, Optional, Sequence, Type, @@ -250,8 +311,12 @@ def _remove_stale_otel_sdk_packages(): import ops from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from opentelemetry.sdk.resources import Resource -from opentelemetry.sdk.trace import Span, TracerProvider -from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.sdk.trace import ReadableSpan, Span, TracerProvider +from opentelemetry.sdk.trace.export import ( + BatchSpanProcessor, + SpanExporter, + SpanExportResult, +) from opentelemetry.trace import INVALID_SPAN, Tracer from opentelemetry.trace import get_current_span as otlp_get_current_span from opentelemetry.trace import ( @@ -272,7 +337,7 @@ def _remove_stale_otel_sdk_packages(): # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 3 +LIBPATCH = 4 PYDEPS = ["opentelemetry-exporter-otlp-proto-http==1.21.0"] @@ -280,7 +345,7 @@ def _remove_stale_otel_sdk_packages(): dev_logger = logging.getLogger("tracing-dev") # set this to 0 if you are debugging/developing this library source -dev_logger.setLevel(logging.CRITICAL) +dev_logger.setLevel(logging.ERROR) _CharmType = Type[CharmBase] # the type CharmBase and any subclass thereof _C = TypeVar("_C", bound=_CharmType) @@ -290,6 +355,186 @@ def _remove_stale_otel_sdk_packages(): _GetterType = Union[Callable[[_CharmType], Optional[str]], property] CHARM_TRACING_ENABLED = "CHARM_TRACING_ENABLED" +BUFFER_DEFAULT_CACHE_FILE_NAME = ".charm_tracing_buffer.raw" +# we store the buffer as raw otlp-native protobuf (bytes) since it's hard to serialize/deserialize it in +# any portable format. Json dumping is supported, but loading isn't. +# cfr: https://github.com/open-telemetry/opentelemetry-python/issues/1003 + +BUFFER_DEFAULT_CACHE_FILE_SIZE_LIMIT_MiB = 10 +_BUFFER_CACHE_FILE_SIZE_LIMIT_MiB_MIN = 10 +BUFFER_DEFAULT_MAX_EVENT_HISTORY_LENGTH = 100 +_MiB_TO_B = 2**20 # megabyte to byte conversion rate +_OTLP_SPAN_EXPORTER_TIMEOUT = 1 +"""Timeout in seconds that the OTLP span exporter has to push traces to the backend.""" + + +class _Buffer: + """Handles buffering for spans emitted while no tracing backend is configured or available. + + Use the max_event_history_length_buffering param of @trace_charm to tune + the amount of memory that this will hog on your units. + + The buffer is formatted as a bespoke byte dump (protobuf limitation). + We cannot store them as json because that is not well-supported by the sdk + (see https://github.com/open-telemetry/opentelemetry-python/issues/3364). + """ + + _SPANSEP = b"__CHARM_TRACING_BUFFER_SPAN_SEP__" + + def __init__(self, db_file: Path, max_event_history_length: int, max_buffer_size_mib: int): + self._db_file = db_file + self._max_event_history_length = max_event_history_length + self._max_buffer_size_mib = max(max_buffer_size_mib, _BUFFER_CACHE_FILE_SIZE_LIMIT_MiB_MIN) + + # set by caller + self.exporter: Optional[OTLPSpanExporter] = None + + def save(self, spans: typing.Sequence[ReadableSpan]): + """Save the spans collected by this exporter to the cache file. + + This method should be as fail-safe as possible. 
+ """ + if self._max_event_history_length < 1: + dev_logger.debug("buffer disabled: max history length < 1") + return + + current_history_length = len(self.load()) + new_history_length = current_history_length + len(spans) + if (diff := self._max_event_history_length - new_history_length) < 0: + self.drop(diff) + self._save(spans) + + def _serialize(self, spans: Sequence[ReadableSpan]) -> bytes: + # encode because otherwise we can't json-dump them + return encode_spans(spans).SerializeToString() + + def _save(self, spans: Sequence[ReadableSpan], replace: bool = False): + dev_logger.debug(f"saving {len(spans)} new spans to buffer") + old = [] if replace else self.load() + new = self._serialize(spans) + + try: + # if the buffer exceeds the size limit, we start dropping old spans until it does + + while len((new + self._SPANSEP.join(old))) > (self._max_buffer_size_mib * _MiB_TO_B): + if not old: + # if we've already dropped all spans and still we can't get under the + # size limit, we can't save this span + logger.error( + f"span exceeds total buffer size limit ({self._max_buffer_size_mib}MiB); " + f"buffering FAILED" + ) + return + + old = old[1:] + logger.warning( + f"buffer size exceeds {self._max_buffer_size_mib}MiB; dropping older spans... " + f"Please increase the buffer size, disable buffering, or ensure the spans can be flushed." + ) + + self._db_file.write_bytes(new + self._SPANSEP.join(old)) + except Exception: + logger.exception("error buffering spans") + + def load(self) -> List[bytes]: + """Load currently buffered spans from the cache file. + + This method should be as fail-safe as possible. + """ + if not self._db_file.exists(): + dev_logger.debug("buffer file not found. buffer empty.") + return [] + try: + spans = self._db_file.read_bytes().split(self._SPANSEP) + except Exception: + logger.exception(f"error parsing {self._db_file}") + return [] + return spans + + def drop(self, n_spans: Optional[int] = None): + """Drop some currently buffered spans from the cache file.""" + current = self.load() + if n_spans: + dev_logger.debug(f"dropping {n_spans} spans from buffer") + new = current[n_spans:] + else: + dev_logger.debug("emptying buffer") + new = [] + + self._db_file.write_bytes(self._SPANSEP.join(new)) + + def flush(self) -> Optional[bool]: + """Export all buffered spans to the given exporter, then clear the buffer. + + Returns whether the flush was successful, and None if there was nothing to flush. + """ + if not self.exporter: + dev_logger.debug("no exporter set; skipping buffer flush") + return False + + buffered_spans = self.load() + if not buffered_spans: + dev_logger.debug("nothing to flush; buffer empty") + return None + + errors = False + for span in buffered_spans: + try: + out = self.exporter._export(span) # type: ignore + if not (200 <= out.status_code < 300): + # take any 2xx status code as a success + errors = True + except ConnectionError: + dev_logger.debug( + "failed exporting buffered span; backend might be down or still starting" + ) + errors = True + except Exception: + logger.exception("unexpected error while flushing span batch from buffer") + errors = True + + if not errors: + self.drop() + else: + logger.error("failed flushing spans; buffer preserved") + return not errors + + @property + def is_empty(self): + """Utility to check whether the buffer has any stored spans. + + This is more efficient than attempting a load() given how large the buffer might be. 
+ """ + return (not self._db_file.exists()) or (self._db_file.stat().st_size == 0) + + +class _OTLPSpanExporter(OTLPSpanExporter): + """Subclass of OTLPSpanExporter to configure the max retry timeout, so that it fails a bit faster.""" + + # The issue we're trying to solve is that the model takes AGES to settle if e.g. tls is misconfigured, + # as every hook of a charm_tracing-instrumented charm takes about a minute to exit, as the charm can't + # flush the traces and keeps retrying for 'too long' + + _MAX_RETRY_TIMEOUT = 4 + # we give the exporter 4 seconds in total to succeed pushing the traces to tempo + # if it fails, we'll be caching the data in the buffer and flush it the next time, so there's no data loss risk. + # this means 2/3 retries (hard to guess from the implementation) and up to ~7 seconds total wait + + +class _BufferedExporter(InMemorySpanExporter): + def __init__(self, buffer: _Buffer) -> None: + super().__init__() + self._buffer = buffer + + def export(self, spans: typing.Sequence[ReadableSpan]) -> SpanExportResult: + self._buffer.save(spans) + return super().export(spans) + + def force_flush(self, timeout_millis: int = 0) -> bool: + # parent implementation is fake, so the timeout_millis arg is not doing anything. + result = super().force_flush(timeout_millis) + self._buffer.save(self.get_finished_spans()) + return result def is_enabled() -> bool: @@ -426,7 +671,10 @@ def _setup_root_span_initializer( charm_type: _CharmType, tracing_endpoint_attr: str, server_cert_attr: Optional[str], - service_name: Optional[str] = None, + service_name: Optional[str], + buffer_path: Optional[Path], + buffer_max_events: int, + buffer_max_size_mib: int, ): """Patch the charm's initializer.""" original_init = charm_type.__init__ @@ -445,18 +693,11 @@ def wrap_init(self: CharmBase, framework: Framework, *args, **kwargs): logger.info("Tracing DISABLED: skipping root span initialization") return - # already init some attrs that will be reinited later by calling original_init: - # self.framework = framework - # self.handle = Handle(None, self.handle_kind, None) - original_event_context = framework._event_context # default service name isn't just app name because it could conflict with the workload service name _service_name = service_name or f"{self.app.name}-charm" unit_name = self.unit.name - # apply hacky patch to remove stale opentelemetry sdk packages on upgrade-charm. - # it could be trouble if someone ever decides to implement their own tracer parallel to - # ours and before the charm has inited. We assume they won't. resource = Resource.create( attributes={ "service.name": _service_name, @@ -474,33 +715,60 @@ def wrap_init(self: CharmBase, framework: Framework, *args, **kwargs): # if anything goes wrong with retrieving the endpoint, we let the exception bubble up. tracing_endpoint = _get_tracing_endpoint(tracing_endpoint_attr, self, charm_type) + buffer_only = False + # whether we're only exporting to buffer, or also to the otlp exporter. + if not tracing_endpoint: # tracing is off if tracing_endpoint is None - return + # however we can buffer things until tracing comes online + buffer_only = True server_cert: Optional[Union[str, Path]] = ( _get_server_cert(server_cert_attr, self, charm_type) if server_cert_attr else None ) - if tracing_endpoint.startswith("https://") and not server_cert: + if (tracing_endpoint and tracing_endpoint.startswith("https://")) and not server_cert: logger.error( "Tracing endpoint is https, but no server_cert has been passed." 
"Please point @trace_charm to a `server_cert` attr. " "This might also mean that the tracing provider is related to a " "certificates provider, but this application is not (yet). " "In that case, you might just have to wait a bit for the certificates " - "integration to settle. " + "integration to settle. This span will be buffered." ) - return + buffer_only = True - exporter = OTLPSpanExporter( - endpoint=tracing_endpoint, - certificate_file=str(Path(server_cert).absolute()) if server_cert else None, - timeout=2, + buffer = _Buffer( + db_file=buffer_path or Path() / BUFFER_DEFAULT_CACHE_FILE_NAME, + max_event_history_length=buffer_max_events, + max_buffer_size_mib=buffer_max_size_mib, ) + previous_spans_buffered = not buffer.is_empty + + exporters: List[SpanExporter] = [] + if buffer_only: + # we have to buffer because we're missing necessary backend configuration + dev_logger.debug("buffering mode: ON") + exporters.append(_BufferedExporter(buffer)) + + else: + dev_logger.debug("buffering mode: FALLBACK") + # in principle, we have the right configuration to be pushing traces, + # but if we fail for whatever reason, we will put everything in the buffer + # and retry the next time + otlp_exporter = _OTLPSpanExporter( + endpoint=tracing_endpoint, + certificate_file=str(Path(server_cert).absolute()) if server_cert else None, + timeout=_OTLP_SPAN_EXPORTER_TIMEOUT, # give individual requests 1 second to succeed + ) + exporters.append(otlp_exporter) + exporters.append(_BufferedExporter(buffer)) + buffer.exporter = otlp_exporter + + for exporter in exporters: + processor = BatchSpanProcessor(exporter) + provider.add_span_processor(processor) - processor = BatchSpanProcessor(exporter) - provider.add_span_processor(processor) set_tracer_provider(provider) _tracer = get_tracer(_service_name) # type: ignore _tracer_token = tracer.set(_tracer) @@ -524,7 +792,7 @@ def wrap_init(self: CharmBase, framework: Framework, *args, **kwargs): @contextmanager def wrap_event_context(event_name: str): - dev_logger.info(f"entering event context: {event_name}") + dev_logger.debug(f"entering event context: {event_name}") # when the framework enters an event context, we create a span. with _span("event: " + event_name) as event_context_span: if event_context_span: @@ -538,12 +806,50 @@ def wrap_event_context(event_name: str): @functools.wraps(original_close) def wrap_close(): - dev_logger.info("tearing down tracer and flushing traces") + dev_logger.debug("tearing down tracer and flushing traces") span.end() opentelemetry.context.detach(span_token) # type: ignore tracer.reset(_tracer_token) tp = cast(TracerProvider, get_tracer_provider()) - tp.force_flush(timeout_millis=1000) # don't block for too long + flush_successful = tp.force_flush(timeout_millis=1000) # don't block for too long + + if buffer_only: + # if we're in buffer_only mode, it means we couldn't even set up the exporter for + # tempo as we're missing some data. + # so attempting to flush the buffer doesn't make sense + dev_logger.debug("tracing backend unavailable: all spans pushed to buffer") + + else: + dev_logger.debug("tracing backend found: attempting to flush buffer...") + + # if we do have an exporter for tempo, and we could send traces to it, + # we can attempt to flush the buffer as well. 
+ if not flush_successful: + logger.error("flushing FAILED: unable to push traces to backend.") + else: + dev_logger.debug("flush succeeded.") + + # the backend has accepted the spans generated during this event, + if not previous_spans_buffered: + # if the buffer was empty to begin with, any spans we collected now can be discarded + buffer.drop() + dev_logger.debug("buffer dropped: this trace has been sent already") + else: + # if the buffer was nonempty, we can attempt to flush it + dev_logger.debug("attempting buffer flush...") + buffer_flush_successful = buffer.flush() + if buffer_flush_successful: + dev_logger.debug("buffer flush OK") + elif buffer_flush_successful is None: + # TODO is this even possible? + dev_logger.debug("buffer flush OK; empty: nothing to flush") + else: + # this situation is pretty weird, I'm not even sure it can happen, + # because it would mean that we did manage + # to push traces directly to the tempo exporter (flush_successful), + # but the buffer flush failed to push to the same exporter! + logger.error("buffer flush FAILED") + tp.shutdown() original_close() @@ -558,6 +864,9 @@ def trace_charm( server_cert: Optional[str] = None, service_name: Optional[str] = None, extra_types: Sequence[type] = (), + buffer_max_events: int = BUFFER_DEFAULT_MAX_EVENT_HISTORY_LENGTH, + buffer_max_size_mib: int = BUFFER_DEFAULT_CACHE_FILE_SIZE_LIMIT_MiB, + buffer_path: Optional[Union[str, Path]] = None, ) -> Callable[[_T], _T]: """Autoinstrument the decorated charm with tracing telemetry. @@ -599,6 +908,10 @@ def trace_charm( Defaults to the juju application name this charm is deployed under. :param extra_types: pass any number of types that you also wish to autoinstrument. For example, charm libs, relation endpoint wrappers, workload abstractions, ... + :param buffer_max_events: max number of events to save in the buffer. Set to 0 to disable buffering. + :param buffer_max_size_mib: max size of the buffer file. When exceeded, spans will be dropped. + Minimum 10MiB. + :param buffer_path: path to buffer file to use for saving buffered spans. """ def _decorator(charm_type: _T) -> _T: @@ -609,6 +922,9 @@ def _decorator(charm_type: _T) -> _T: server_cert_attr=server_cert, service_name=service_name, extra_types=extra_types, + buffer_path=Path(buffer_path) if buffer_path else None, + buffer_max_size_mib=buffer_max_size_mib, + buffer_max_events=buffer_max_events, ) return charm_type @@ -621,6 +937,9 @@ def _autoinstrument( server_cert_attr: Optional[str] = None, service_name: Optional[str] = None, extra_types: Sequence[type] = (), + buffer_max_events: int = BUFFER_DEFAULT_MAX_EVENT_HISTORY_LENGTH, + buffer_max_size_mib: int = BUFFER_DEFAULT_CACHE_FILE_SIZE_LIMIT_MiB, + buffer_path: Optional[Path] = None, ) -> _T: """Set up tracing on this charm class. @@ -653,13 +972,20 @@ def _autoinstrument( Defaults to the juju application name this charm is deployed under. :param extra_types: pass any number of types that you also wish to autoinstrument. For example, charm libs, relation endpoint wrappers, workload abstractions, ... + :param buffer_max_events: max number of events to save in the buffer. Set to 0 to disable buffering. + :param buffer_max_size_mib: max size of the buffer file. When exceeded, spans will be dropped. + Minimum 10MiB. + :param buffer_path: path to buffer file to use for saving buffered spans. 
""" - dev_logger.info(f"instrumenting {charm_type}") + dev_logger.debug(f"instrumenting {charm_type}") _setup_root_span_initializer( charm_type, tracing_endpoint_attr, server_cert_attr=server_cert_attr, service_name=service_name, + buffer_path=buffer_path, + buffer_max_events=buffer_max_events, + buffer_max_size_mib=buffer_max_size_mib, ) trace_type(charm_type) for type_ in extra_types: @@ -675,12 +1001,12 @@ def trace_type(cls: _T) -> _T: It assumes that this class is only instantiated after a charm type decorated with `@trace_charm` has been instantiated. """ - dev_logger.info(f"instrumenting {cls}") + dev_logger.debug(f"instrumenting {cls}") for name, method in inspect.getmembers(cls, predicate=inspect.isfunction): - dev_logger.info(f"discovered {method}") + dev_logger.debug(f"discovered {method}") if method.__name__.startswith("__"): - dev_logger.info(f"skipping {method} (dunder)") + dev_logger.debug(f"skipping {method} (dunder)") continue # the span title in the general case should be: @@ -726,7 +1052,7 @@ def trace_function(function: _F, name: Optional[str] = None) -> _F: def _trace_callable(callable: _F, qualifier: str, name: Optional[str] = None) -> _F: - dev_logger.info(f"instrumenting {callable}") + dev_logger.debug(f"instrumenting {callable}") # sig = inspect.signature(callable) @functools.wraps(callable) diff --git a/lib/charms/tls_certificates_interface/v3/tls_certificates.py b/lib/charms/tls_certificates_interface/v3/tls_certificates.py index da7fa95..54f3fed 100644 --- a/lib/charms/tls_certificates_interface/v3/tls_certificates.py +++ b/lib/charms/tls_certificates_interface/v3/tls_certificates.py @@ -318,7 +318,7 @@ def _on_all_certificates_invalidated(self, event: AllCertificatesInvalidatedEven # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 20 +LIBPATCH = 22 PYDEPS = ["cryptography", "jsonschema"] @@ -1902,10 +1902,20 @@ def _on_relation_changed(self, event: RelationChangedEvent) -> None: ) else: try: + secret = self.model.get_secret(label=f"{LIBID}-{csr_in_sha256_hex}") logger.debug( "Setting secret with label %s", f"{LIBID}-{csr_in_sha256_hex}" ) - secret = self.model.get_secret(label=f"{LIBID}-{csr_in_sha256_hex}") + # Juju < 3.6 will create a new revision even if the content is the same + if ( + secret.get_content(refresh=True).get("certificate", "") + == certificate.certificate + ): + logger.debug( + "Secret %s with correct certificate already exists", + f"{LIBID}-{csr_in_sha256_hex}", + ) + return secret.set_content( {"certificate": certificate.certificate, "csr": certificate.csr} ) @@ -1986,11 +1996,19 @@ def _on_secret_expired(self, event: SecretExpiredEvent) -> None: provider_certificate = self._find_certificate_in_relation_data(csr) if not provider_certificate: # A secret expired but we did not find matching certificate. Cleaning up + logger.warning( + "Failed to find matching certificate for csr, cleaning up secret %s", + event.secret.label, + ) event.secret.remove_all_revisions() return if not provider_certificate.expiry_time: # A secret expired but matching certificate is invalid. 
Cleaning up + logger.warning( + "Certificate matching csr is invalid, cleaning up secret %s", + event.secret.label, + ) event.secret.remove_all_revisions() return @@ -2023,14 +2041,18 @@ def _find_certificate_in_relation_data(self, csr: str) -> Optional[ProviderCerti return provider_certificate return None - def _get_csr_from_secret(self, secret: Secret) -> str: + def _get_csr_from_secret(self, secret: Secret) -> Union[str, None]: """Extract the CSR from the secret label or content. This function is a workaround to maintain backwards compatibility and fix the issue reported in https://github.com/canonical/tls-certificates-interface/issues/228 """ - if not (csr := secret.get_content().get("csr", "")): + try: + content = secret.get_content(refresh=True) + except SecretNotFoundError: + return None + if not (csr := content.get("csr", None)): # In versions <14 of the Lib we were storing the CSR in the label of the secret # The CSR now is stored int the content of the secret, which was a breaking change # Here we get the CSR if the secret was created by an app using libpatch 14 or lower diff --git a/requirements.txt b/requirements.txt index 37eaa6c..53d0720 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ # pin importlib-metadata version else charmcraft pack will fail to resolve the dependencies for the pydeps-installed otlp library importlib-metadata~=6.0.0 -ops +ops>=2.17 crossplane jsonschema==4.17.0 lightkube>=0.15.4 diff --git a/src/charm.py b/src/charm.py index 7404d2b..d5dcc21 100755 --- a/src/charm.py +++ b/src/charm.py @@ -48,6 +48,8 @@ class PeerData(DatabagModel): tracing_endpoint="tempo_otlp_http_endpoint", server_cert="server_ca_cert", extra_types=(Tempo, TracingEndpointProvider, Coordinator, ClusterRolesConfig), + # use PVC path for buffer data, so we don't lose it on pod churn + buffer_path=Path("/tempo-data/.charm_tracing_buffer.raw"), ) class TempoCoordinatorCharm(CharmBase): """Charmed Operator for Tempo; a distributed tracing backend.""" diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index cc52850..5010225 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -4,10 +4,12 @@ import logging import os import random +import shlex import shutil import subprocess import tempfile from pathlib import Path +from subprocess import check_output from pytest import fixture from pytest_operator.plugin import OpsTest @@ -67,8 +69,7 @@ def copy_charm_libs_into_tester_charm(ops_test): yield # cleanup: remove all libs - for path in copies: - Path(path).unlink() + check_output(shlex.split("rm -rf ./tests/integration/tester/lib")) @fixture(scope="module", autouse=True) @@ -89,8 +90,7 @@ def copy_charm_libs_into_tester_grpc_charm(ops_test): yield # cleanup: remove all libs - for path in copies: - Path(path).unlink() + check_output(shlex.split("rm -rf ./tests/integration/tester-grpc/lib")) @fixture(scope="function") diff --git a/tests/integration/test_integration.py b/tests/integration/test_integration.py index aef43d3..c50b3ef 100644 --- a/tests/integration/test_integration.py +++ b/tests/integration/test_integration.py @@ -8,6 +8,8 @@ from helpers import WORKER_NAME, deploy_cluster from pytest_operator.plugin import OpsTest +from tests.integration.helpers import get_traces_patiently + METADATA = yaml.safe_load(Path("./charmcraft.yaml").read_text()) APP_NAME = "tempo" TESTER_METADATA = yaml.safe_load(Path("./tests/integration/tester/metadata.yaml").read_text()) @@ -22,7 +24,7 @@ @pytest.mark.setup 
@pytest.mark.abort_on_fail -async def test_build_and_deploy(ops_test: OpsTest, tempo_charm: Path): +async def test_build_deploy_testers(ops_test: OpsTest, tempo_charm: Path): # Given a fresh build of the charm # When deploying it together with testers # Then applications should eventually be created @@ -91,29 +93,40 @@ async def test_relate(ops_test: OpsTest): async def test_verify_traces_http(ops_test: OpsTest): # given a relation between charms # when traces endpoint is queried - # then it should contain traces from tester charm + # then it should contain traces from the tester charm status = await ops_test.model.get_status() app = status["applications"][APP_NAME] - endpoint = app.public_address + ":3200/api/search" - cmd = [ - "curl", - endpoint, - ] - rc, stdout, stderr = await ops_test.run(*cmd) - logger.info("%s: %s", endpoint, (rc, stdout, stderr)) - assert rc == 0, ( - f"curl exited with rc={rc} for {endpoint}; " - f"non-zero return code means curl encountered a >= 400 HTTP code; " - f"cmd={cmd}" + traces = await get_traces_patiently( + tempo_host=app.public_address, service_name="TempoTesterCharm", tls=False ) - traces = json.loads(stdout)["traces"] + assert ( + traces + ), f"There's no trace of charm exec traces in tempo. {json.dumps(traces, indent=2)}" - found = False - for trace in traces: - if trace["rootServiceName"] == "TempoTesterCharm": - found = True - assert found, f"There's no trace of charm exec traces in tempo. {json.dumps(traces, indent=2)}" +@pytest.mark.skip(reason="fails because search query results are not stable") +# keep an eye onhttps://github.com/grafana/tempo/issues/3777 and see if they fix it +async def test_verify_buffered_charm_traces_http(ops_test: OpsTest): + # given a relation between charms + # when traces endpoint is queried + # then it should contain all traces from the tester charm since the setup phase, thanks to the buffer + status = await ops_test.model.get_status() + app = status["applications"][APP_NAME] + traces = await get_traces_patiently( + tempo_host=app.public_address, service_name="TempoTesterCharm", tls=False + ) + + # charm-tracing trace names are in the format: + # "mycharm/0: event" + captured_events = {trace["rootTraceName"].split(" ")[1] for trace in traces} + expected_setup_events = { + "start", + "install", + "leader-elected", + "tracing-relation-created", + "replicas-relation-created", + } + assert expected_setup_events.issubset(captured_events) async def test_verify_traces_grpc(ops_test: OpsTest): @@ -122,27 +135,11 @@ async def test_verify_traces_grpc(ops_test: OpsTest): status = await ops_test.model.get_status() app = status["applications"][APP_NAME] logger.info(app.public_address) - endpoint = app.public_address + ":3200/api/search" - cmd = [ - "curl", - endpoint, - ] - rc, stdout, stderr = await ops_test.run(*cmd) - logger.info("%s: %s", endpoint, (rc, stdout, stderr)) - assert rc == 0, ( - f"curl exited with rc={rc} for {endpoint}; " - f"non-zero return code means curl encountered a >= 400 HTTP code; " - f"cmd={cmd}" + traces = await get_traces_patiently( + tempo_host=app.public_address, service_name="TempoTesterGrpcCharm", tls=False ) - traces = json.loads(stdout)["traces"] - - found = False - for trace in traces: - if trace["rootServiceName"] == "TempoTesterGrpcCharm": - found = True - assert ( - found + traces ), f"There's no trace of generated grpc traces in tempo. 
{json.dumps(traces, indent=2)}" diff --git a/tests/integration/tester-grpc/lib/.gitignore b/tests/integration/tester-grpc/lib/.gitignore deleted file mode 100644 index 8e5bbf0..0000000 --- a/tests/integration/tester-grpc/lib/.gitignore +++ /dev/null @@ -1 +0,0 @@ -*.py \ No newline at end of file diff --git a/tests/integration/tester-grpc/lib/charms/observability_libs/v0/.gitkeep b/tests/integration/tester-grpc/lib/charms/observability_libs/v0/.gitkeep deleted file mode 100644 index e69de29..0000000 diff --git a/tests/integration/tester-grpc/lib/charms/prometheus_k8s/v0/.gitkeep b/tests/integration/tester-grpc/lib/charms/prometheus_k8s/v0/.gitkeep deleted file mode 100644 index e69de29..0000000 diff --git a/tests/integration/tester-grpc/lib/charms/tempo_coordinator_k8s/v0/.gitkeep b/tests/integration/tester-grpc/lib/charms/tempo_coordinator_k8s/v0/.gitkeep deleted file mode 100644 index e69de29..0000000 diff --git a/tests/integration/tester/lib/.gitignore b/tests/integration/tester/lib/.gitignore deleted file mode 100644 index 8e5bbf0..0000000 --- a/tests/integration/tester/lib/.gitignore +++ /dev/null @@ -1 +0,0 @@ -*.py \ No newline at end of file diff --git a/tests/integration/tester/lib/charms/observability_libs/v0/.gitkeep b/tests/integration/tester/lib/charms/observability_libs/v0/.gitkeep deleted file mode 100644 index e69de29..0000000 diff --git a/tests/integration/tester/lib/charms/prometheus_k8s/v0/.gitkeep b/tests/integration/tester/lib/charms/prometheus_k8s/v0/.gitkeep deleted file mode 100644 index e69de29..0000000 diff --git a/tests/integration/tester/lib/charms/tempo_coordinator_k8s/v0/.gitkeep b/tests/integration/tester/lib/charms/tempo_coordinator_k8s/v0/.gitkeep deleted file mode 100644 index e69de29..0000000 diff --git a/tests/integration/tester/src/charm.py b/tests/integration/tester/src/charm.py index 78ff61a..8db9868 100755 --- a/tests/integration/tester/src/charm.py +++ b/tests/integration/tester/src/charm.py @@ -26,7 +26,11 @@ TRACING_APP_NAME = "TempoTesterCharm" -@trace_charm(tracing_endpoint="tempo_otlp_http_endpoint", service_name=TRACING_APP_NAME) +@trace_charm( + tracing_endpoint="tempo_otlp_http_endpoint", + service_name=TRACING_APP_NAME, + buffer_max_events=100, +) class TempoTesterCharm(CharmBase): """Charm the service.""" diff --git a/tests/scenario/conftest.py b/tests/scenario/conftest.py index 1785fe6..0894fe6 100644 --- a/tests/scenario/conftest.py +++ b/tests/scenario/conftest.py @@ -2,29 +2,46 @@ from unittest.mock import MagicMock, patch import pytest +from charms.tempo_coordinator_k8s.v0.charm_tracing import charm_tracing_disabled from ops import ActiveStatus from scenario import Container, Context, PeerRelation, Relation from charm import PEERS_RELATION_ENDPOINT_NAME, TempoCoordinatorCharm +@pytest.fixture(autouse=True) +def patch_buffer_file_for_charm_tracing(tmp_path): + with patch( + "charms.tempo_coordinator_k8s.v0.charm_tracing.BUFFER_DEFAULT_CACHE_FILE_NAME", + str(tmp_path / "foo.json"), + ): + yield + + +@pytest.fixture(autouse=True, scope="session") +def disable_charm_tracing(): + with charm_tracing_disabled(): + yield + + @pytest.fixture() def coordinator(): return MagicMock() @pytest.fixture -def tempo_charm(): +def tempo_charm(tmp_path): with patch("lightkube.core.client.GenericSyncClient"): with patch("charm.TempoCoordinatorCharm.are_certificates_on_disk", False): - with patch.multiple( - "cosl.coordinated_workers.coordinator.KubernetesComputeResourcesPatch", - _namespace="test-namespace", - _patch=lambda _: None, - get_status=lambda 
_: ActiveStatus(""), - is_ready=lambda _: True, - ): - yield TempoCoordinatorCharm + with patch("tempo.Tempo.tls_ca_path", str(tmp_path / "cert.tmp")): + with patch.multiple( + "cosl.coordinated_workers.coordinator.KubernetesComputeResourcesPatch", + _namespace="test-namespace", + _patch=lambda _: None, + get_status=lambda _: ActiveStatus(""), + is_ready=lambda _: True, + ): + yield TempoCoordinatorCharm @pytest.fixture(scope="function") diff --git a/tests/scenario/test_charm_tracing.py b/tests/scenario/test_charm_tracing.py new file mode 100644 index 0000000..f95ca00 --- /dev/null +++ b/tests/scenario/test_charm_tracing.py @@ -0,0 +1,820 @@ +import functools +import logging +import os +import socket +from pathlib import Path +from typing import Optional +from unittest.mock import MagicMock, patch + +import pytest +import scenario +from charms.tempo_coordinator_k8s.v0.charm_tracing import CHARM_TRACING_ENABLED +from charms.tempo_coordinator_k8s.v0.charm_tracing import ( + _autoinstrument as autoinstrument, +) +from charms.tempo_coordinator_k8s.v0.charm_tracing import ( + _Buffer, + get_current_span, + trace, + trace_charm, +) +from charms.tempo_coordinator_k8s.v0.tracing import ( + ProtocolType, + Receiver, + TracingEndpointRequirer, + TracingProviderAppData, + TracingRequirerAppData, + charm_tracing_config, +) +from ops import EventBase, EventSource, Framework +from ops.charm import CharmBase, CharmEvents +from scenario import Context, Relation, State +from scenario.runtime import UncaughtCharmError + +logger = logging.getLogger(__name__) + + +@pytest.fixture(autouse=True) +def cleanup(): + # if any other test module disabled it... + os.environ[CHARM_TRACING_ENABLED] = "1" + + def patched_set_tracer_provider(tracer_provider, log): + import opentelemetry + + opentelemetry.trace._TRACER_PROVIDER = tracer_provider + + with patch("opentelemetry.trace._set_tracer_provider", new=patched_set_tracer_provider): + yield + + +class MyCharmSimple(CharmBase): + META = {"name": "frank"} + + @property + def tempo(self): + return "foo.bar:80" + + +autoinstrument(MyCharmSimple, "tempo", buffer_max_events=0) + + +def test_base_tracer_endpoint(caplog): + import opentelemetry + + with patch( + "opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter.export" + ) as f: + f.return_value = opentelemetry.sdk.trace.export.SpanExportResult.SUCCESS + ctx = Context(MyCharmSimple, meta=MyCharmSimple.META) + ctx.run(ctx.on.start(), State()) + # assert "Setting up span exporter to endpoint: foo.bar:80" in caplog.text + assert "Starting root trace with id=" in caplog.text + span = f.call_args_list[0].args[0][0] + assert span.resource.attributes["service.name"] == "frank-charm" + assert span.resource.attributes["compose_service"] == "frank-charm" + assert span.resource.attributes["charm_type"] == "MyCharmSimple" + + +class SubObject: + def foo(self): + return "bar" + + +class MyCharmSubObject(CharmBase): + META = {"name": "frank"} + + def __init__(self, framework: Framework): + super().__init__(framework) + self.subobj = SubObject() + framework.observe(self.on.start, self._on_start) + + def _on_start(self, _): + self.subobj.foo() + + @property + def tempo(self): + return "foo.bar:80" + + +autoinstrument(MyCharmSubObject, "tempo", extra_types=[SubObject], buffer_max_events=0) + + +def test_subobj_tracer_endpoint(caplog): + import opentelemetry + + with patch( + "opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter.export" + ) as f: + f.return_value = 
opentelemetry.sdk.trace.export.SpanExportResult.SUCCESS + ctx = Context(MyCharmSubObject, meta=MyCharmSubObject.META) + ctx.run(ctx.on.start(), State()) + spans = f.call_args_list[0].args[0] + assert f.call_count == 1 + assert spans[0].name == "method call: SubObject.foo" + + +class MyCharmInitAttr(CharmBase): + META = {"name": "frank"} + + def __init__(self, framework: Framework): + super().__init__(framework) + self._tempo = "foo.bar:80" + + @property + def tempo(self): + return self._tempo + + +autoinstrument(MyCharmInitAttr, "tempo", buffer_max_events=0) + + +def test_init_attr(caplog): + import opentelemetry + + with patch( + "opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter.export" + ) as f: + f.return_value = opentelemetry.sdk.trace.export.SpanExportResult.SUCCESS + ctx = Context(MyCharmInitAttr, meta=MyCharmInitAttr.META) + ctx.run(ctx.on.start(), State()) + # assert "Setting up span exporter to endpoint: foo.bar:80" in caplog.text + span = f.call_args_list[0].args[0][0] + assert span.resource.attributes["service.name"] == "frank-charm" + assert span.resource.attributes["compose_service"] == "frank-charm" + assert span.resource.attributes["charm_type"] == "MyCharmInitAttr" + + +class MyCharmSimpleDisabled(CharmBase): + META = {"name": "frank"} + + @property + def tempo(self): + return None + + +autoinstrument(MyCharmSimpleDisabled, "tempo", buffer_max_events=0) + + +def test_base_tracer_endpoint_disabled(caplog): + import opentelemetry + + with patch( + "opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter.export" + ) as f: + f.return_value = opentelemetry.sdk.trace.export.SpanExportResult.SUCCESS + ctx = Context(MyCharmSimpleDisabled, meta=MyCharmSimpleDisabled.META) + ctx.run(ctx.on.start(), State()) + + assert not f.called + + +@trace +def _my_fn(foo): + return foo + 1 + + +class MyCharmSimpleEvent(CharmBase): + META = {"name": "frank"} + + def __init__(self, fw): + super().__init__(fw) + span = get_current_span() + assert span is None # can't do that in init. 
+ fw.observe(self.on.start, self._on_start) + + def _on_start(self, _): + span = get_current_span() + span.add_event( + "log", + { + "foo": "bar", + "baz": "qux", + }, + ) + _my_fn(2) + + @property + def tempo(self): + return "foo.bar:80" + + +autoinstrument(MyCharmSimpleEvent, "tempo", buffer_max_events=0) + + +def test_base_tracer_endpoint_event(caplog): + import opentelemetry + + with patch( + "opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter.export" + ) as f: + f.return_value = opentelemetry.sdk.trace.export.SpanExportResult.SUCCESS + ctx = Context(MyCharmSimpleEvent, meta=MyCharmSimpleEvent.META) + ctx.run(ctx.on.start(), State()) + + spans = f.call_args_list[0].args[0] + span0, span1, span2, span3 = spans + assert span0.name == "function call: _my_fn" + + assert span1.name == "method call: MyCharmSimpleEvent._on_start" + + assert span2.name == "event: start" + evt = span2.events[0] + assert evt.name == "start" + + assert span3.name == "frank/0: start event" + + for span in spans: + assert span.resource.attributes["service.name"] == "frank-charm" + + +def test_juju_topology_injection(caplog): + import opentelemetry + + with patch( + "opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter.export" + ) as f: + f.return_value = opentelemetry.sdk.trace.export.SpanExportResult.SUCCESS + ctx = Context(MyCharmSimpleEvent, meta=MyCharmSimpleEvent.META) + state = ctx.run(ctx.on.start(), State()) + + spans = f.call_args_list[0].args[0] + + for span in spans: + # topology + assert span.resource.attributes["juju_unit"] == "frank/0" + assert span.resource.attributes["juju_application"] == "frank" + assert span.resource.attributes["juju_model"] == state.model.name + assert span.resource.attributes["juju_model_uuid"] == state.model.uuid + + +class MyCharmWithMethods(CharmBase): + META = {"name": "frank"} + + def __init__(self, fw): + super().__init__(fw) + fw.observe(self.on.start, self._on_start) + + def _on_start(self, _): + self.a() + self.b() + self.c() + + def a(self): + pass + + def b(self): + pass + + def c(self): + pass + + @property + def tempo(self): + return "foo.bar:80" + + +autoinstrument(MyCharmWithMethods, "tempo", buffer_max_events=0) + + +def test_base_tracer_endpoint_methods(caplog): + import opentelemetry + + with patch( + "opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter.export" + ) as f: + f.return_value = opentelemetry.sdk.trace.export.SpanExportResult.SUCCESS + ctx = Context(MyCharmWithMethods, meta=MyCharmWithMethods.META) + ctx.run(ctx.on.start(), State()) + + spans = f.call_args_list[0].args[0] + span_names = [span.name for span in spans] + assert span_names == [ + "method call: MyCharmWithMethods.a", + "method call: MyCharmWithMethods.b", + "method call: MyCharmWithMethods.c", + "method call: MyCharmWithMethods._on_start", + "event: start", + "frank/0: start event", + ] + + +class Foo(EventBase): + pass + + +class MyEvents(CharmEvents): + foo = EventSource(Foo) + + +class MyCharmWithCustomEvents(CharmBase): + on = MyEvents() + + META = {"name": "frank"} + + def __init__(self, fw): + super().__init__(fw) + fw.observe(self.on.start, self._on_start) + fw.observe(self.on.foo, self._on_foo) + + def _on_start(self, _): + self.on.foo.emit() + + def _on_foo(self, _): + pass + + @property + def tempo(self): + return "foo.bar:80" + + +autoinstrument(MyCharmWithCustomEvents, "tempo", buffer_max_events=0) + + +def test_base_tracer_endpoint_custom_event(caplog): + import opentelemetry + + with patch( + 
"opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter.export" + ) as f: + f.return_value = opentelemetry.sdk.trace.export.SpanExportResult.SUCCESS + ctx = Context(MyCharmWithCustomEvents, meta=MyCharmWithCustomEvents.META) + ctx.run(ctx.on.start(), State()) + + spans = f.call_args_list[0].args[0] + span_names = [span.name for span in spans] + assert span_names == [ + "method call: MyCharmWithCustomEvents._on_foo", + "event: foo", + "method call: MyCharmWithCustomEvents._on_start", + "event: start", + "frank/0: start event", + ] + # only the charm exec span is a root + assert not spans[-1].parent + for span in spans[:-1]: + assert span.parent + assert span.parent.trace_id + assert len({(span.parent.trace_id if span.parent else 0) for span in spans}) == 2 + + +class MyRemoteCharm(CharmBase): + META = {"name": "charlie", "requires": {"tracing": {"interface": "tracing", "limit": 1}}} + _request = True + + def __init__(self, framework: Framework): + super().__init__(framework) + self.tracing = TracingEndpointRequirer( + self, "tracing", protocols=(["otlp_http"] if self._request else []) + ) + + def tempo(self): + return self.tracing.get_endpoint("otlp_http") + + +autoinstrument(MyRemoteCharm, "tempo", buffer_max_events=0) + + +@pytest.mark.parametrize("leader", (True, False)) +def test_tracing_requirer_remote_charm_request_response(leader): + # IF the leader unit (whoever it is) did request the endpoint to be activated + MyRemoteCharm._request = True + ctx = Context(MyRemoteCharm, meta=MyRemoteCharm.META) + # WHEN you get any event AND the remote unit has already replied + tracing = scenario.Relation( + "tracing", + # if we're not leader, assume the leader did its part already + local_app_data=( + TracingRequirerAppData(receivers=["otlp_http"]).dump() if not leader else {} + ), + remote_app_data=TracingProviderAppData( + host="foo.com", + receivers=[ + Receiver( + url="http://foo.com:80", protocol=ProtocolType(name="otlp_http", type="http") + ) + ], + ).dump(), + ) + with ctx(ctx.on.start(), State(leader=leader, relations=[tracing])) as mgr: + # THEN you're good + assert mgr.charm.tempo() == "http://foo.com:80" + + +@pytest.mark.parametrize("leader", (True, False)) +def test_tracing_requirer_remote_charm_no_request_but_response(leader): + # IF the leader did NOT request the endpoint to be activated + MyRemoteCharm._request = False + ctx = Context(MyRemoteCharm, meta=MyRemoteCharm.META) + # WHEN you get any event AND the remote unit has already replied + tracing = scenario.Relation( + "tracing", + # empty local app data + remote_app_data=TracingProviderAppData( + # but the remote end has sent the data you need + receivers=[ + Receiver( + url="http://foo.com:80", protocol=ProtocolType(name="otlp_http", type="http") + ) + ], + ).dump(), + ) + with ctx(ctx.on.start(), State(leader=leader, relations=[tracing])) as mgr: + # THEN you're lucky, but you're good + assert mgr.charm.tempo() == "http://foo.com:80" + + +@pytest.mark.parametrize("relation", (True, False)) +@pytest.mark.parametrize("leader", (True, False)) +def test_tracing_requirer_remote_charm_no_request_no_response(leader, relation): + """Verify that the charm errors out (even with charm_tracing disabled) if the tempo() call raises.""" + # IF the leader did NOT request the endpoint to be activated + MyRemoteCharm._request = False + ctx = Context(MyRemoteCharm, meta=MyRemoteCharm.META) + # WHEN you get any event + if relation: + # AND you have an empty relation + tracing = scenario.Relation( + "tracing", + # empty local and 
remote app data + ) + relations = [tracing] + else: + # OR no relation at all + relations = [] + + # THEN self.tempo() will raise on init + # FIXME: non-leader units should get a permission denied exception, + # but it won't fire due to https://github.com/canonical/operator/issues/1378 + with pytest.raises(UncaughtCharmError, match=r"ProtocolNotRequestedError"): + ctx.run(ctx.on.start(), State(relations=relations, leader=leader)) + + +class MyRemoteBorkyCharm(CharmBase): + META = {"name": "charlie", "requires": {"tracing": {"interface": "tracing", "limit": 1}}} + _borky_return_value = None + + def tempo(self): + return self._borky_return_value + + +autoinstrument(MyRemoteBorkyCharm, "tempo", buffer_max_events=0) + + +@pytest.mark.parametrize("borky_return_value", (True, 42, object(), 0.2, [], (), {})) +def test_borky_tempo_return_value(borky_return_value, caplog): + """Verify that the charm exits 1 (even with charm_tracing disabled) if the tempo() call returns bad values.""" + # IF the charm's tempo endpoint getter returns anything but None or str + MyRemoteBorkyCharm._borky_return_value = borky_return_value + ctx = Context(MyRemoteBorkyCharm, meta=MyRemoteBorkyCharm.META) + # WHEN you get any event + # THEN the self.tempo getter will raise and charm exec will exit 1 + + # traceback from the TypeError raised by _get_tracing_endpoint + with pytest.raises( + UncaughtCharmError, + match=r"MyRemoteBorkyCharm\.tempo should resolve to a tempo " + r"endpoint \(string\); got (.*) instead\.", + ): + ctx.run(ctx.on.start(), State()) + + +class MyCharmStaticMethods(CharmBase): + META = {"name": "jolene"} + + def __init__(self, fw): + super().__init__(fw) + fw.observe(self.on.start, self._on_start) + fw.observe(self.on.update_status, self._on_update_status) + + def _on_start(self, _): + for o in (OtherObj(), OtherObj): + for meth in ("_staticmeth", "_staticmeth1", "_staticmeth2"): + assert getattr(o, meth)(1) == 2 + + def _on_update_status(self, _): + # super-ugly edge cases + OtherObj()._staticmeth3(OtherObj()) + OtherObj()._staticmeth4(OtherObj()) + OtherObj._staticmeth3(OtherObj()) + OtherObj._staticmeth4(OtherObj(), foo=2) + + @property + def tempo(self): + return "foo.bar:80" + + +class OtherObj: + @staticmethod + def _staticmeth(i: int, *args, **kwargs): + return 1 + i + + @staticmethod + def _staticmeth1(i: int): + return 1 + i + + @staticmethod + def _staticmeth2(i: int, foo="bar"): + return 1 + i + + @staticmethod + def _staticmeth3(abc: "OtherObj", foo="bar"): + return 1 + 1 + + @staticmethod + def _staticmeth4(abc: int, foo="bar"): + return 1 + 1 + + +autoinstrument(MyCharmStaticMethods, "tempo", extra_types=[OtherObj], buffer_max_events=0) + + +def test_trace_staticmethods(caplog): + import opentelemetry + + with patch( + "opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter.export" + ) as f: + f.return_value = opentelemetry.sdk.trace.export.SpanExportResult.SUCCESS + ctx = Context(MyCharmStaticMethods, meta=MyCharmStaticMethods.META) + ctx.run(ctx.on.start(), State()) + + spans = f.call_args_list[0].args[0] + + span_names = [span.name for span in spans] + assert span_names == [ + "method call: OtherObj._staticmeth", + "method call: OtherObj._staticmeth1", + "method call: OtherObj._staticmeth2", + "method call: OtherObj._staticmeth", + "method call: OtherObj._staticmeth1", + "method call: OtherObj._staticmeth2", + "method call: MyCharmStaticMethods._on_start", + "event: start", + "jolene/0: start event", + ] + + for span in spans: + assert 
span.resource.attributes["service.name"] == "jolene-charm" + + +def test_trace_staticmethods_bork(caplog): + import opentelemetry + + with patch( + "opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter.export" + ) as f: + f.return_value = opentelemetry.sdk.trace.export.SpanExportResult.SUCCESS + ctx = Context(MyCharmStaticMethods, meta=MyCharmStaticMethods.META) + ctx.run(ctx.on.update_status(), State()) + + +class SuperCharm(CharmBase): + def foo(self): + return "bar" + + +class MyInheritedCharm(SuperCharm): + META = {"name": "godcat"} + + def __init__(self, framework: Framework): + super().__init__(framework) + framework.observe(self.on.start, self._on_start) + + def _on_start(self, _): + self.foo() + + @property + def tempo(self): + return "foo.bar:80" + + +autoinstrument(MyInheritedCharm, "tempo", buffer_max_events=0) + + +def test_inheritance_tracing(caplog): + import opentelemetry + + with patch( + "opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter.export" + ) as f: + f.return_value = opentelemetry.sdk.trace.export.SpanExportResult.SUCCESS + ctx = Context(MyInheritedCharm, meta=MyInheritedCharm.META) + ctx.run(ctx.on.start(), State()) + spans = f.call_args_list[0].args[0] + assert spans[0].name == "method call: SuperCharm.foo" + + +def bad_wrapper(func): + def wrapper(*args, **kwargs): + return func(*args, **kwargs) + + return wrapper + + +def good_wrapper(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + return func(*args, **kwargs) + + return wrapper + + +class MyCharmWrappedMethods(CharmBase): + META = {"name": "catgod"} + + def __init__(self, fw): + super().__init__(fw) + fw.observe(self.on.start, self._on_start) + + @good_wrapper + def a(self): + pass + + @bad_wrapper + def b(self): + pass + + def _on_start(self, _): + self.a() + self.b() + + @property + def tempo(self): + return "foo.bar:80" + + +autoinstrument(MyCharmWrappedMethods, "tempo", buffer_max_events=0) + + +def test_wrapped_method_wrapping(caplog): + import opentelemetry + + with patch( + "opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter.export" + ) as f: + f.return_value = opentelemetry.sdk.trace.export.SpanExportResult.SUCCESS + ctx = Context(MyCharmWrappedMethods, meta=MyCharmWrappedMethods.META) + ctx.run(ctx.on.start(), State()) + spans = f.call_args_list[0].args[0] + assert spans[0].name == "method call: MyCharmWrappedMethods.a" + assert spans[1].name == "method call: @bad_wrapper(MyCharmWrappedMethods.b)" + + +def make_buffering_charm( + crt_path: Optional[Path], + buffer_path: Path = None, + buffer_max_events: int = 100, + buffer_max_size: int = 100, +): + @trace_charm( + tracing_endpoint="tracing_endpoint", + server_cert="server_cert", + **({"buffer_path": buffer_path} if buffer_path else {}), + buffer_max_events=buffer_max_events, + buffer_max_size_mib=buffer_max_size, + ) + class MyBufferingCharm(CharmBase): + META = {"name": "josianne", "requires": {"tracing": {"interface": "tracing", "limit": 1}}} + + def __init__(self, framework: Framework): + super().__init__(framework) + self.tracing = TracingEndpointRequirer(self, "tracing") + self.tracing_endpoint, self.server_cert = charm_tracing_config(self.tracing, crt_path) + framework.observe(self.on.start, self._on_start) + + def _on_start(self, _): + pass + + return MyBufferingCharm + + +@pytest.mark.parametrize("tls", (True, False)) +def test_buffering_save(tmp_path, tls): + if tls: + cert = tmp_path / "mycert" + cert.write_text("foo") + else: + cert = None + + buffer_path = 
tmp_path / "mycert" + charm = make_buffering_charm(cert, buffer_path) + + # given a charm without a tracing relation + ctx = Context(charm, meta=charm.META) + # when we receive an event + ctx.run(ctx.on.start(), State()) + + # then the trace gets buffered + buffer = _Buffer(buffer_path, 100, 100) + assert buffer.load() + + +@pytest.mark.parametrize("tls", (True, False)) +def test_buffering_flush(tmp_path, tls): + with patch( + "opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter._export" + ) as f: + mockresp = MagicMock() + mockresp.status_code = 200 + f.return_value = mockresp + + if tls: + cert = tmp_path / "mycert" + cert.write_text("foo") + else: + cert = None + + buffer_path = tmp_path / "mycert" + buffer_path.write_bytes(b"mockspan") + + charm = make_buffering_charm(cert, buffer_path) + + # given a charm with a tracing relation and a nonempty buffer + ctx = Context(charm, meta=charm.META) + # when we receive an event + + host = socket.getfqdn() + tracing = Relation( + "tracing", + remote_app_data={ + "receivers": f'[{{"protocol": {{"name": "otlp_grpc", "type": "grpc"}}, "url": "{host}:4317"}}, ' + f'{{"protocol": {{"name": "otlp_http", "type": "http"}}, "url": "http://{host}:4318"}}, ' + f'{{"protocol": {{"name": "zipkin", "type": "http"}}, "url": "http://{host}:9411" }}]', + }, + ) + + ctx.run(ctx.on.start(), State(relations={tracing})) + # then the buffered traces get flushed + assert f.call_count == 2 + + # and the buffer is empty + assert buffer_path.read_bytes() == b"" + + +@pytest.mark.parametrize("tls", (True, False)) +def test_buffering_size_limit(tmp_path, tls): + with patch( + "opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter._export" + ) as f: + mockresp = MagicMock() + mockresp.status_code = 200 + f.return_value = mockresp + + if tls: + cert = tmp_path / "mycert" + cert.write_text("foo") + else: + cert = None + + buffer_path = tmp_path / "mycert" + + # current buffer contains a span that's ~80mb large + buffer_path.write_bytes(b"mockspan" * 10**7) + # set max buffer size to 1mb + charm = make_buffering_charm(cert, buffer_path, buffer_max_size=1) + + # given a charm with a tracing relation and a nonempty buffer + ctx = Context(charm, meta=charm.META) + # when we receive an event + + ctx.run(ctx.on.start(), State()) + # then the buffer only contains one span, and not the large one it had before + buffer = _Buffer(buffer_path, 100, 100) + buf = buffer.load() + assert len(buf) == 1 + assert b"mockspan" not in buf[0] + + +@pytest.mark.parametrize("tls", (True, False)) +@pytest.mark.parametrize("n_events", (5, 10)) +def test_buffering_event_n_limit(tmp_path, tls, n_events): + with patch( + "opentelemetry.exporter.otlp.proto.http.trace_exporter.OTLPSpanExporter._export" + ) as f: + mockresp = MagicMock() + mockresp.status_code = 200 + f.return_value = mockresp + + if tls: + cert = tmp_path / "mycert" + cert.write_text("foo") + else: + cert = None + + buffer_path = tmp_path / "mycert" + + # set max buffer size to 2 events + charm = make_buffering_charm(cert, buffer_path, buffer_max_events=2) + + # given a charm with a tracing relation and a nonempty buffer + ctx = Context(charm, meta=charm.META) + + # when we receive many events + for n in range(n_events): + ctx.run(ctx.on.start(), State()) + + # then the buffer only contains at most 2 spans + buffer = _Buffer(buffer_path, 100, 100) + buf = buffer.load() + assert len(buf) <= 2 diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py deleted file mode 100644 index 1800ebb..0000000 
--- a/tests/unit/test_charm.py +++ /dev/null @@ -1,47 +0,0 @@ -# Copyright 2024 Canonical Ltd. -# See LICENSE file for licensing details. - -import unittest -from unittest.mock import patch - -from ops.testing import Harness - -from charm import TempoCoordinatorCharm - -CONTAINER_NAME = "nginx" - -k8s_resource_multipatch = patch.multiple( - "cosl.coordinated_workers.coordinator.KubernetesComputeResourcesPatch", - _namespace="test-namespace", - _patch=lambda _: None, -) -lightkube_client_patch = patch("lightkube.core.client.GenericSyncClient") - - -class TestTempoCoordinatorCharm(unittest.TestCase): - - @k8s_resource_multipatch - @lightkube_client_patch - def setUp(self, *_): - self.harness = Harness(TempoCoordinatorCharm) - self.harness.set_model_name("testmodel") - self.addCleanup(self.harness.cleanup) - self.harness.set_leader(True) - self.harness.begin_with_initial_hooks() - self.harness.add_relation("s3", "s3-integrator") - self.harness.add_relation("tempo-cluster", "tempo-worker-k8s") - self.maxDiff = None # we're comparing big traefik configs in tests - - def test_entrypoints_are_generated_with_sanitized_names(self): - expected_entrypoints = { - "entryPoints": { - "tempo-http": {"address": ":3200"}, - "tempo-grpc": {"address": ":9096"}, - "zipkin": {"address": ":9411"}, - "otlp-grpc": {"address": ":4317"}, - "otlp-http": {"address": ":4318"}, - "jaeger-thrift-http": {"address": ":14268"}, - "jaeger-grpc": {"address": ":14250"}, - } - } - self.assertEqual(self.harness.charm._static_ingress_config, expected_entrypoints) diff --git a/tox.ini b/tox.ini index 556291a..ddec196 100644 --- a/tox.ini +++ b/tox.ini @@ -61,7 +61,7 @@ description = Run scenario tests deps = pytest<8.2.0 # https://github.com/pytest-dev/pytest/issues/12263 coverage[toml] - ops-scenario>=7.0.1 + ops[testing]>=2.17 -r{toxinidir}/requirements.txt commands = coverage run --source={[vars]src_path} \
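
Illustrative note (not part of the patch): a minimal sketch of how the new `_Buffer` added by this diff behaves in isolation, assuming the library and its PYDEPS are importable outside a charm, as in the scenario tests above. The temporary path and the span name are made up for the example; everything else (`_Buffer`'s constructor, `save`, `load`, `is_empty`, `flush`) is taken from the code in this patch.

```python
from pathlib import Path
from tempfile import TemporaryDirectory

from charms.tempo_coordinator_k8s.v0.charm_tracing import _Buffer
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter

with TemporaryDirectory() as tmp:
    # generate a finished span with the plain OTel SDK
    exporter = InMemorySpanExporter()
    provider = TracerProvider()
    provider.add_span_processor(SimpleSpanProcessor(exporter))
    tracer = provider.get_tracer("demo")
    with tracer.start_as_current_span("demo-span"):
        pass

    # persist it to disk the same way _BufferedExporter.export() would
    buffer = _Buffer(
        db_file=Path(tmp) / "buffer.raw",
        max_event_history_length=100,
        max_buffer_size_mib=10,
    )
    buffer.save(exporter.get_finished_spans())

    assert not buffer.is_empty
    assert len(buffer.load()) == 1  # one serialized span batch on disk

    # with no OTLP exporter wired in (buffer.exporter is None), flush() is a no-op
    assert buffer.flush() is False
```

As the in-code comments note, the buffer is stored as raw OTLP protobuf batches joined by a sentinel separator rather than as JSON, because the OpenTelemetry SDK can dump spans to JSON but cannot load them back (see the upstream issue linked in the library).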