From 3f31bbf7f965f95d5ec9c1ed068d9faa91fb65c9 Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Fri, 15 Nov 2024 18:36:32 +0100 Subject: [PATCH 1/5] Migrate config .github/renovate.json5 (#673) Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> --- .github/renovate.json5 | 58 ++++++++++++++++++++++++++---------------- 1 file changed, 36 insertions(+), 22 deletions(-) diff --git a/.github/renovate.json5 b/.github/renovate.json5 index 3a11766c18..34085c9225 100644 --- a/.github/renovate.json5 +++ b/.github/renovate.json5 @@ -1,30 +1,44 @@ { - "$schema": "https://docs.renovatebot.com/renovate-schema.json", - "extends": ["github>canonical/data-platform//renovate_presets/charm.json5"], - "reviewers": ["team:data-platform-postgresql"], - "packageRules": [ - // Later rules override earlier rules + $schema: 'https://docs.renovatebot.com/renovate-schema.json', + extends: [ + 'github>canonical/data-platform//renovate_presets/charm.json5', + ], + reviewers: [ + 'team:data-platform-postgresql', + ], + packageRules: [ { - "matchPackageNames": ["pydantic"], - "allowedVersions": "<2.0.0" + matchPackageNames: [ + 'pydantic', + ], + allowedVersions: '<2.0.0', }, { - "matchManagers": ["regex"], - "matchDepNames": ["juju"], - "matchDatasources": ["pypi"], - "allowedVersions": "<3", - "groupName": "Juju agents" - } + matchManagers: [ + 'custom.regex', + ], + matchDepNames: [ + 'juju', + ], + matchDatasources: [ + 'pypi', + ], + allowedVersions: '<3', + groupName: 'Juju agents', + }, ], - "regexManagers": [ + customManagers: [ { - "fileMatch": ["^\\.github/workflows/[^/]+\\.ya?ml$"], - "matchStrings": [ - "(libjuju: )==(?.*?) +# renovate: latest libjuju 2" + customType: 'regex', + fileMatch: [ + '^\\.github/workflows/[^/]+\\.ya?ml$', + ], + matchStrings: [ + '(libjuju: )==(?.*?) +# renovate: latest libjuju 2', ], - "depNameTemplate": "juju", - "datasourceTemplate": "pypi", - "versioningTemplate": "loose" - } - ] + depNameTemplate: 'juju', + datasourceTemplate: 'pypi', + versioningTemplate: 'loose', + }, + ], } From 3951e95e64d7b7598f0ccb850c06f766985b5223 Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Mon, 18 Nov 2024 10:03:24 -0300 Subject: [PATCH 2/5] Update data-platform-workflows to v23.0.5 (#676) Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> --- .github/workflows/ci.yaml | 6 +++--- .github/workflows/release.yaml | 4 ++-- .github/workflows/sync_docs.yaml | 2 +- poetry.lock | 18 +++++++++--------- pyproject.toml | 8 ++++---- 5 files changed, 19 insertions(+), 19 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 2907e3eb59..45bcaa710e 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -23,7 +23,7 @@ on: jobs: lint: name: Lint - uses: canonical/data-platform-workflows/.github/workflows/lint.yaml@v23.0.4 + uses: canonical/data-platform-workflows/.github/workflows/lint.yaml@v23.0.5 unit-test: name: Unit test charm @@ -45,7 +45,7 @@ jobs: build: name: Build charm - uses: canonical/data-platform-workflows/.github/workflows/build_charm.yaml@v23.0.4 + uses: canonical/data-platform-workflows/.github/workflows/build_charm.yaml@v23.0.5 with: cache: true @@ -77,7 +77,7 @@ jobs: - lint - unit-test - build - uses: canonical/data-platform-workflows/.github/workflows/integration_test_charm.yaml@v23.0.4 + uses: canonical/data-platform-workflows/.github/workflows/integration_test_charm.yaml@v23.0.5 with: artifact-prefix: ${{ needs.build.outputs.artifact-prefix }} architecture: ${{ matrix.architecture }} diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index b356a84476..f709bf43b1 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -25,14 +25,14 @@ jobs: build: name: Build charm - uses: canonical/data-platform-workflows/.github/workflows/build_charm.yaml@v23.0.4 + uses: canonical/data-platform-workflows/.github/workflows/build_charm.yaml@v23.0.5 release: name: Release charm needs: - ci-tests - build - uses: canonical/data-platform-workflows/.github/workflows/release_charm.yaml@v23.0.4 + uses: canonical/data-platform-workflows/.github/workflows/release_charm.yaml@v23.0.5 with: channel: 14/edge artifact-prefix: ${{ needs.build.outputs.artifact-prefix }} diff --git a/.github/workflows/sync_docs.yaml b/.github/workflows/sync_docs.yaml index cc6cfbc480..3a41cc31cc 100644 --- a/.github/workflows/sync_docs.yaml +++ b/.github/workflows/sync_docs.yaml @@ -10,7 +10,7 @@ on: jobs: sync-docs: name: Sync docs from Discourse - uses: canonical/data-platform-workflows/.github/workflows/sync_docs.yaml@v23.0.4 + uses: canonical/data-platform-workflows/.github/workflows/sync_docs.yaml@v23.0.5 with: reviewers: a-velasco,izmalk permissions: diff --git a/poetry.lock b/poetry.lock index a298423bd0..7b4d2ac24f 100644 --- a/poetry.lock +++ b/poetry.lock @@ -31,8 +31,8 @@ pytest = "*" [package.source] type = "git" url = "https://github.com/canonical/data-platform-workflows" -reference = "v23.0.4" -resolved_reference = "60f088b7f0f967a8e35d45339f5123a6e74786f7" +reference = "v23.0.5" +resolved_reference = "e3f522c648375decee87fc0982c012e46ffb0b98" subdirectory = "python/pytest_plugins/allure_pytest_collection_report" [[package]] @@ -1802,8 +1802,8 @@ develop = false [package.source] type = "git" url = "https://github.com/canonical/data-platform-workflows" -reference = "v23.0.4" -resolved_reference = "60f088b7f0f967a8e35d45339f5123a6e74786f7" +reference = "v23.0.5" +resolved_reference = "e3f522c648375decee87fc0982c012e46ffb0b98" subdirectory = "python/pytest_plugins/github_secrets" [[package]] @@ -1840,8 +1840,8 @@ pyyaml = "*" [package.source] type = "git" url = "https://github.com/canonical/data-platform-workflows" -reference = "v23.0.4" -resolved_reference = "60f088b7f0f967a8e35d45339f5123a6e74786f7" +reference = "v23.0.5" +resolved_reference = "e3f522c648375decee87fc0982c012e46ffb0b98" subdirectory = "python/pytest_plugins/pytest_operator_cache" [[package]] @@ -1859,8 +1859,8 @@ pytest = "*" [package.source] type = "git" url = "https://github.com/canonical/data-platform-workflows" -reference = "v23.0.4" -resolved_reference = "60f088b7f0f967a8e35d45339f5123a6e74786f7" +reference = "v23.0.5" +resolved_reference = "e3f522c648375decee87fc0982c012e46ffb0b98" subdirectory = "python/pytest_plugins/pytest_operator_groups" [[package]] @@ -2533,4 +2533,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.0" python-versions = "^3.10" -content-hash = "be84825d8bc3d6716d62a2c7f283a49f386445927c12dea73df65e317df7b3d9" +content-hash = "a24006bb8af98b161cd722b73b93b3ce7fbc5f44e46ee2d4faa24e438c09e0de" diff --git a/pyproject.toml b/pyproject.toml index 500cffe99d..ea24e76a47 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -61,10 +61,10 @@ optional = true [tool.poetry.group.integration.dependencies] pytest = "^8.3.3" -pytest-github-secrets = {git = "https://github.com/canonical/data-platform-workflows", tag = "v23.0.4", subdirectory = "python/pytest_plugins/github_secrets"} +pytest-github-secrets = {git = "https://github.com/canonical/data-platform-workflows", tag = "v23.0.5", subdirectory = "python/pytest_plugins/github_secrets"} pytest-operator = "^0.38.0" -pytest-operator-cache = {git = "https://github.com/canonical/data-platform-workflows", tag = "v23.0.4", subdirectory = "python/pytest_plugins/pytest_operator_cache"} -pytest-operator-groups = {git = "https://github.com/canonical/data-platform-workflows", tag = "v23.0.4", subdirectory = "python/pytest_plugins/pytest_operator_groups"} +pytest-operator-cache = {git = "https://github.com/canonical/data-platform-workflows", tag = "v23.0.5", subdirectory = "python/pytest_plugins/pytest_operator_cache"} +pytest-operator-groups = {git = "https://github.com/canonical/data-platform-workflows", tag = "v23.0.5", subdirectory = "python/pytest_plugins/pytest_operator_groups"} # renovate caret doesn't work: https://github.com/renovatebot/renovate/issues/26940 juju = "<=3.5.0.0" boto3 = "*" @@ -73,7 +73,7 @@ landscape-api-py3 = "^0.9.0" mailmanclient = "^3.3.5" psycopg2-binary = "^2.9.10" allure-pytest = "^2.13.5" -allure-pytest-collection-report = {git = "https://github.com/canonical/data-platform-workflows", tag = "v23.0.4", subdirectory = "python/pytest_plugins/allure_pytest_collection_report"} +allure-pytest-collection-report = {git = "https://github.com/canonical/data-platform-workflows", tag = "v23.0.5", subdirectory = "python/pytest_plugins/allure_pytest_collection_report"} # Testing tools configuration [tool.coverage.run] From 84f381ea636121d4fd6e81a22a3db30db43d4e97 Mon Sep 17 00:00:00 2001 From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com> Date: Mon, 18 Nov 2024 10:59:00 -0300 Subject: [PATCH 3/5] Update codecov/codecov-action action to v5 (#674) Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com> --- .github/workflows/ci.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 45bcaa710e..8d160e58dc 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -39,7 +39,7 @@ jobs: - name: Run tests run: tox run -e unit - name: Upload Coverage to Codecov - uses: codecov/codecov-action@v4 + uses: codecov/codecov-action@v5 env: CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} From 1b6a748c71109f2ccf98f5cc44fc6102afe66b60 Mon Sep 17 00:00:00 2001 From: shayancanonical <99665202+shayancanonical@users.noreply.github.com> Date: Mon, 18 Nov 2024 15:46:45 -0500 Subject: [PATCH 4/5] Test against juju 3.6/candidate + upgrade dpw to v23.0.5 (#675) * Test against juju 3.6/candidate + upgrade dpw to v23.0.5 * Update 3.6 another set of nightly tests to run against 3.6/candidate instead of 3.6/beta --- .github/workflows/ci.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 8d160e58dc..b76a889f3c 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -59,7 +59,7 @@ jobs: allure_on_amd64: false - agent: 3.4.6 # renovate: juju-agent-pin-minor allure_on_amd64: true - - snap_channel: 3.6/beta + - snap_channel: 3.6/candidate allure_on_amd64: false architecture: - amd64 @@ -69,7 +69,7 @@ jobs: allure_on_amd64: true architecture: arm64 - juju: - snap_channel: 3.6/beta + snap_channel: 3.6/candidate allure_on_amd64: false architecture: arm64 name: Integration | ${{ matrix.juju.agent || matrix.juju.snap_channel }} | ${{ matrix.architecture }} From f50d3732b9a755f5891dca993aa5866c49623e09 Mon Sep 17 00:00:00 2001 From: Dragomir Penev <6687393+dragomirp@users.noreply.github.com> Date: Tue, 19 Nov 2024 12:08:58 +0200 Subject: [PATCH 5/5] Bump libs (#677) --- lib/charms/postgresql_k8s/v0/postgresql.py | 16 +- .../tempo_coordinator_k8s/v0/charm_tracing.py | 389 ++++++++++++++++-- 2 files changed, 370 insertions(+), 35 deletions(-) diff --git a/lib/charms/postgresql_k8s/v0/postgresql.py b/lib/charms/postgresql_k8s/v0/postgresql.py index 2f2b2f9990..4d8d6dc30c 100644 --- a/lib/charms/postgresql_k8s/v0/postgresql.py +++ b/lib/charms/postgresql_k8s/v0/postgresql.py @@ -36,7 +36,7 @@ # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 37 +LIBPATCH = 39 INVALID_EXTRA_USER_ROLE_BLOCKING_MESSAGE = "invalid role(s) for extra user roles" @@ -244,7 +244,7 @@ def create_user( privilege for privilege in privileges if privilege not in valid_privileges ] if len(invalid_privileges) > 0: - logger.error(f'Invalid extra user roles: {", ".join(privileges)}') + logger.error(f"Invalid extra user roles: {', '.join(privileges)}") raise PostgreSQLCreateUserError(INVALID_EXTRA_USER_ROLE_BLOCKING_MESSAGE) with self._connect_to_database() as connection, connection.cursor() as cursor: @@ -256,7 +256,7 @@ def create_user( user_definition = "CREATE ROLE {}" user_definition += f"WITH {'NOLOGIN' if user == 'admin' else 'LOGIN'}{' SUPERUSER' if admin else ''} ENCRYPTED PASSWORD '{password}'{'IN ROLE admin CREATEDB' if admin_role else ''}" if privileges: - user_definition += f' {" ".join(privileges)}' + user_definition += f" {' '.join(privileges)}" cursor.execute(sql.SQL("BEGIN;")) cursor.execute(sql.SQL("SET LOCAL log_statement = 'none';")) cursor.execute(sql.SQL(f"{user_definition};").format(sql.Identifier(user))) @@ -375,8 +375,12 @@ def _generate_database_privileges_statements( UNION SELECT 2 AS index,'ALTER SEQUENCE '|| sequence_schema || '."' || sequence_name ||'" OWNER TO {};' AS statement FROM information_schema.sequences WHERE NOT sequence_schema IN ('pg_catalog', 'information_schema') UNION SELECT 3 AS index,'ALTER FUNCTION '|| nsp.nspname || '."' || p.proname ||'"('||pg_get_function_identity_arguments(p.oid)||') OWNER TO {};' AS statement -FROM pg_proc p JOIN pg_namespace nsp ON p.pronamespace = nsp.oid WHERE NOT nsp.nspname IN ('pg_catalog', 'information_schema') -UNION SELECT 4 AS index,'ALTER VIEW '|| schemaname || '."' || viewname ||'" OWNER TO {};' AS statement +FROM pg_proc p JOIN pg_namespace nsp ON p.pronamespace = nsp.oid WHERE NOT nsp.nspname IN ('pg_catalog', 'information_schema') AND p.prokind = 'f' +UNION SELECT 4 AS index,'ALTER PROCEDURE '|| nsp.nspname || '."' || p.proname ||'"('||pg_get_function_identity_arguments(p.oid)||') OWNER TO {};' AS statement +FROM pg_proc p JOIN pg_namespace nsp ON p.pronamespace = nsp.oid WHERE NOT nsp.nspname IN ('pg_catalog', 'information_schema') AND p.prokind = 'p' +UNION SELECT 5 AS index,'ALTER AGGREGATE '|| nsp.nspname || '."' || p.proname ||'"('||pg_get_function_identity_arguments(p.oid)||') OWNER TO {};' AS statement +FROM pg_proc p JOIN pg_namespace nsp ON p.pronamespace = nsp.oid WHERE NOT nsp.nspname IN ('pg_catalog', 'information_schema') AND p.prokind = 'a' +UNION SELECT 6 AS index,'ALTER VIEW '|| schemaname || '."' || viewname ||'" OWNER TO {};' AS statement FROM pg_catalog.pg_views WHERE NOT schemaname IN ('pg_catalog', 'information_schema')) AS statements ORDER BY index) LOOP EXECUTE format(r.statement); END LOOP; @@ -386,6 +390,8 @@ def _generate_database_privileges_statements( sql.Identifier(user), sql.Identifier(user), sql.Identifier(user), + sql.Identifier(user), + sql.Identifier(user), ) ) statements.append( diff --git a/lib/charms/tempo_coordinator_k8s/v0/charm_tracing.py b/lib/charms/tempo_coordinator_k8s/v0/charm_tracing.py index 1e7ff8405a..cf8def11ac 100644 --- a/lib/charms/tempo_coordinator_k8s/v0/charm_tracing.py +++ b/lib/charms/tempo_coordinator_k8s/v0/charm_tracing.py @@ -69,6 +69,9 @@ def my_tracing_endpoint(self) -> Optional[str]: - every event as a span (including custom events) - every charm method call (except dunders) as a span +We recommend that you scale up your tracing provider and relate it to an ingress so that your tracing requests +go through the ingress and get load balanced across all units. Otherwise, if the provider's leader goes down, your tracing goes down. + ## TLS support If your charm integrates with a TLS provider which is also trusted by the tracing provider (the Tempo charm), @@ -114,6 +117,57 @@ def get_tracer(self) -> opentelemetry.trace.Tracer: See the official opentelemetry Python SDK documentation for usage: https://opentelemetry-python.readthedocs.io/en/latest/ + +## Caching traces +The `trace_charm` machinery will buffer any traces collected during charm execution and store them +to a file on the charm container until a tracing backend becomes available. At that point, it will +flush them to the tracing receiver. + +By default, the buffer is configured to start dropping old traces if any of these conditions apply: + +- the storage size exceeds 10 MiB +- the number of buffered events exceeds 100 + +You can configure this by, for example: + +```python +@trace_charm( + tracing_endpoint="my_tracing_endpoint", + server_cert="_server_cert", + # only cache up to 42 events + buffer_max_events=42, + # only cache up to 42 MiB + buffer_max_size_mib=42, # minimum 10! +) +class MyCharm(CharmBase): + ... +``` + +Note that setting `buffer_max_events` to 0 will effectively disable the buffer. + +The path of the buffer file is by default in the charm's execution root, which for k8s charms means +that in case of pod churn, the cache will be lost. The recommended solution is to use an existing storage +(or add a new one) such as: + +```yaml +storage: + data: + type: filesystem + location: /charm-traces +``` + +and then configure the `@trace_charm` decorator to use it as path for storing the buffer: +```python +@trace_charm( + tracing_endpoint="my_tracing_endpoint", + server_cert="_server_cert", + # store traces to a PVC so they're not lost on pod restart. + buffer_path="/charm-traces/buffer.file", +) +class MyCharm(CharmBase): + ... +``` + ## Upgrading from `v0` If you are upgrading from `charm_tracing` v0, you need to take the following steps (assuming you already @@ -171,6 +225,12 @@ def my_tracing_endpoint(self) -> Optional[str]: 3) If you were passing a certificate (str) using `server_cert`, you need to change it to provide an *absolute* path to the certificate file instead. """ +import typing + +from opentelemetry.exporter.otlp.proto.common._internal.trace_encoder import ( + encode_spans, +) +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter def _remove_stale_otel_sdk_packages(): @@ -222,6 +282,9 @@ def _remove_stale_otel_sdk_packages(): otel_logger.debug("Successfully applied _remove_stale_otel_sdk_packages patch. ") +# apply hacky patch to remove stale opentelemetry sdk packages on upgrade-charm. +# it could be trouble if someone ever decides to implement their own tracer parallel to +# ours and before the charm has inited. We assume they won't. _remove_stale_otel_sdk_packages() import functools @@ -235,6 +298,7 @@ def _remove_stale_otel_sdk_packages(): Any, Callable, Generator, + List, Optional, Sequence, Type, @@ -247,8 +311,12 @@ def _remove_stale_otel_sdk_packages(): import ops from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from opentelemetry.sdk.resources import Resource -from opentelemetry.sdk.trace import Span, TracerProvider -from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.sdk.trace import ReadableSpan, Span, TracerProvider +from opentelemetry.sdk.trace.export import ( + BatchSpanProcessor, + SpanExporter, + SpanExportResult, +) from opentelemetry.trace import INVALID_SPAN, Tracer from opentelemetry.trace import get_current_span as otlp_get_current_span from opentelemetry.trace import ( @@ -269,7 +337,7 @@ def _remove_stale_otel_sdk_packages(): # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 2 +LIBPATCH = 4 PYDEPS = ["opentelemetry-exporter-otlp-proto-http==1.21.0"] @@ -277,7 +345,7 @@ def _remove_stale_otel_sdk_packages(): dev_logger = logging.getLogger("tracing-dev") # set this to 0 if you are debugging/developing this library source -dev_logger.setLevel(logging.CRITICAL) +dev_logger.setLevel(logging.ERROR) _CharmType = Type[CharmBase] # the type CharmBase and any subclass thereof _C = TypeVar("_C", bound=_CharmType) @@ -287,6 +355,186 @@ def _remove_stale_otel_sdk_packages(): _GetterType = Union[Callable[[_CharmType], Optional[str]], property] CHARM_TRACING_ENABLED = "CHARM_TRACING_ENABLED" +BUFFER_DEFAULT_CACHE_FILE_NAME = ".charm_tracing_buffer.raw" +# we store the buffer as raw otlp-native protobuf (bytes) since it's hard to serialize/deserialize it in +# any portable format. Json dumping is supported, but loading isn't. +# cfr: https://github.com/open-telemetry/opentelemetry-python/issues/1003 + +BUFFER_DEFAULT_CACHE_FILE_SIZE_LIMIT_MiB = 10 +_BUFFER_CACHE_FILE_SIZE_LIMIT_MiB_MIN = 10 +BUFFER_DEFAULT_MAX_EVENT_HISTORY_LENGTH = 100 +_MiB_TO_B = 2**20 # megabyte to byte conversion rate +_OTLP_SPAN_EXPORTER_TIMEOUT = 1 +"""Timeout in seconds that the OTLP span exporter has to push traces to the backend.""" + + +class _Buffer: + """Handles buffering for spans emitted while no tracing backend is configured or available. + + Use the max_event_history_length_buffering param of @trace_charm to tune + the amount of memory that this will hog on your units. + + The buffer is formatted as a bespoke byte dump (protobuf limitation). + We cannot store them as json because that is not well-supported by the sdk + (see https://github.com/open-telemetry/opentelemetry-python/issues/3364). + """ + + _SPANSEP = b"__CHARM_TRACING_BUFFER_SPAN_SEP__" + + def __init__(self, db_file: Path, max_event_history_length: int, max_buffer_size_mib: int): + self._db_file = db_file + self._max_event_history_length = max_event_history_length + self._max_buffer_size_mib = max(max_buffer_size_mib, _BUFFER_CACHE_FILE_SIZE_LIMIT_MiB_MIN) + + # set by caller + self.exporter: Optional[OTLPSpanExporter] = None + + def save(self, spans: typing.Sequence[ReadableSpan]): + """Save the spans collected by this exporter to the cache file. + + This method should be as fail-safe as possible. + """ + if self._max_event_history_length < 1: + dev_logger.debug("buffer disabled: max history length < 1") + return + + current_history_length = len(self.load()) + new_history_length = current_history_length + len(spans) + if (diff := self._max_event_history_length - new_history_length) < 0: + self.drop(diff) + self._save(spans) + + def _serialize(self, spans: Sequence[ReadableSpan]) -> bytes: + # encode because otherwise we can't json-dump them + return encode_spans(spans).SerializeToString() + + def _save(self, spans: Sequence[ReadableSpan], replace: bool = False): + dev_logger.debug(f"saving {len(spans)} new spans to buffer") + old = [] if replace else self.load() + new = self._serialize(spans) + + try: + # if the buffer exceeds the size limit, we start dropping old spans until it does + + while len((new + self._SPANSEP.join(old))) > (self._max_buffer_size_mib * _MiB_TO_B): + if not old: + # if we've already dropped all spans and still we can't get under the + # size limit, we can't save this span + logger.error( + f"span exceeds total buffer size limit ({self._max_buffer_size_mib}MiB); " + f"buffering FAILED" + ) + return + + old = old[1:] + logger.warning( + f"buffer size exceeds {self._max_buffer_size_mib}MiB; dropping older spans... " + f"Please increase the buffer size, disable buffering, or ensure the spans can be flushed." + ) + + self._db_file.write_bytes(new + self._SPANSEP.join(old)) + except Exception: + logger.exception("error buffering spans") + + def load(self) -> List[bytes]: + """Load currently buffered spans from the cache file. + + This method should be as fail-safe as possible. + """ + if not self._db_file.exists(): + dev_logger.debug("buffer file not found. buffer empty.") + return [] + try: + spans = self._db_file.read_bytes().split(self._SPANSEP) + except Exception: + logger.exception(f"error parsing {self._db_file}") + return [] + return spans + + def drop(self, n_spans: Optional[int] = None): + """Drop some currently buffered spans from the cache file.""" + current = self.load() + if n_spans: + dev_logger.debug(f"dropping {n_spans} spans from buffer") + new = current[n_spans:] + else: + dev_logger.debug("emptying buffer") + new = [] + + self._db_file.write_bytes(self._SPANSEP.join(new)) + + def flush(self) -> Optional[bool]: + """Export all buffered spans to the given exporter, then clear the buffer. + + Returns whether the flush was successful, and None if there was nothing to flush. + """ + if not self.exporter: + dev_logger.debug("no exporter set; skipping buffer flush") + return False + + buffered_spans = self.load() + if not buffered_spans: + dev_logger.debug("nothing to flush; buffer empty") + return None + + errors = False + for span in buffered_spans: + try: + out = self.exporter._export(span) # type: ignore + if not (200 <= out.status_code < 300): + # take any 2xx status code as a success + errors = True + except ConnectionError: + dev_logger.debug( + "failed exporting buffered span; backend might be down or still starting" + ) + errors = True + except Exception: + logger.exception("unexpected error while flushing span batch from buffer") + errors = True + + if not errors: + self.drop() + else: + logger.error("failed flushing spans; buffer preserved") + return not errors + + @property + def is_empty(self): + """Utility to check whether the buffer has any stored spans. + + This is more efficient than attempting a load() given how large the buffer might be. + """ + return (not self._db_file.exists()) or (self._db_file.stat().st_size == 0) + + +class _OTLPSpanExporter(OTLPSpanExporter): + """Subclass of OTLPSpanExporter to configure the max retry timeout, so that it fails a bit faster.""" + + # The issue we're trying to solve is that the model takes AGES to settle if e.g. tls is misconfigured, + # as every hook of a charm_tracing-instrumented charm takes about a minute to exit, as the charm can't + # flush the traces and keeps retrying for 'too long' + + _MAX_RETRY_TIMEOUT = 4 + # we give the exporter 4 seconds in total to succeed pushing the traces to tempo + # if it fails, we'll be caching the data in the buffer and flush it the next time, so there's no data loss risk. + # this means 2/3 retries (hard to guess from the implementation) and up to ~7 seconds total wait + + +class _BufferedExporter(InMemorySpanExporter): + def __init__(self, buffer: _Buffer) -> None: + super().__init__() + self._buffer = buffer + + def export(self, spans: typing.Sequence[ReadableSpan]) -> SpanExportResult: + self._buffer.save(spans) + return super().export(spans) + + def force_flush(self, timeout_millis: int = 0) -> bool: + # parent implementation is fake, so the timeout_millis arg is not doing anything. + result = super().force_flush(timeout_millis) + self._buffer.save(self.get_finished_spans()) + return result def is_enabled() -> bool: @@ -423,7 +671,10 @@ def _setup_root_span_initializer( charm_type: _CharmType, tracing_endpoint_attr: str, server_cert_attr: Optional[str], - service_name: Optional[str] = None, + service_name: Optional[str], + buffer_path: Optional[Path], + buffer_max_events: int, + buffer_max_size_mib: int, ): """Patch the charm's initializer.""" original_init = charm_type.__init__ @@ -442,18 +693,11 @@ def wrap_init(self: CharmBase, framework: Framework, *args, **kwargs): logger.info("Tracing DISABLED: skipping root span initialization") return - # already init some attrs that will be reinited later by calling original_init: - # self.framework = framework - # self.handle = Handle(None, self.handle_kind, None) - original_event_context = framework._event_context # default service name isn't just app name because it could conflict with the workload service name _service_name = service_name or f"{self.app.name}-charm" unit_name = self.unit.name - # apply hacky patch to remove stale opentelemetry sdk packages on upgrade-charm. - # it could be trouble if someone ever decides to implement their own tracer parallel to - # ours and before the charm has inited. We assume they won't. resource = Resource.create( attributes={ "service.name": _service_name, @@ -471,33 +715,60 @@ def wrap_init(self: CharmBase, framework: Framework, *args, **kwargs): # if anything goes wrong with retrieving the endpoint, we let the exception bubble up. tracing_endpoint = _get_tracing_endpoint(tracing_endpoint_attr, self, charm_type) + buffer_only = False + # whether we're only exporting to buffer, or also to the otlp exporter. + if not tracing_endpoint: # tracing is off if tracing_endpoint is None - return + # however we can buffer things until tracing comes online + buffer_only = True server_cert: Optional[Union[str, Path]] = ( _get_server_cert(server_cert_attr, self, charm_type) if server_cert_attr else None ) - if tracing_endpoint.startswith("https://") and not server_cert: + if (tracing_endpoint and tracing_endpoint.startswith("https://")) and not server_cert: logger.error( "Tracing endpoint is https, but no server_cert has been passed." "Please point @trace_charm to a `server_cert` attr. " "This might also mean that the tracing provider is related to a " "certificates provider, but this application is not (yet). " "In that case, you might just have to wait a bit for the certificates " - "integration to settle. " + "integration to settle. This span will be buffered." ) - return + buffer_only = True - exporter = OTLPSpanExporter( - endpoint=tracing_endpoint, - certificate_file=str(Path(server_cert).absolute()) if server_cert else None, - timeout=2, + buffer = _Buffer( + db_file=buffer_path or Path() / BUFFER_DEFAULT_CACHE_FILE_NAME, + max_event_history_length=buffer_max_events, + max_buffer_size_mib=buffer_max_size_mib, ) + previous_spans_buffered = not buffer.is_empty + + exporters: List[SpanExporter] = [] + if buffer_only: + # we have to buffer because we're missing necessary backend configuration + dev_logger.debug("buffering mode: ON") + exporters.append(_BufferedExporter(buffer)) + + else: + dev_logger.debug("buffering mode: FALLBACK") + # in principle, we have the right configuration to be pushing traces, + # but if we fail for whatever reason, we will put everything in the buffer + # and retry the next time + otlp_exporter = _OTLPSpanExporter( + endpoint=tracing_endpoint, + certificate_file=str(Path(server_cert).absolute()) if server_cert else None, + timeout=_OTLP_SPAN_EXPORTER_TIMEOUT, # give individual requests 1 second to succeed + ) + exporters.append(otlp_exporter) + exporters.append(_BufferedExporter(buffer)) + buffer.exporter = otlp_exporter + + for exporter in exporters: + processor = BatchSpanProcessor(exporter) + provider.add_span_processor(processor) - processor = BatchSpanProcessor(exporter) - provider.add_span_processor(processor) set_tracer_provider(provider) _tracer = get_tracer(_service_name) # type: ignore _tracer_token = tracer.set(_tracer) @@ -521,7 +792,7 @@ def wrap_init(self: CharmBase, framework: Framework, *args, **kwargs): @contextmanager def wrap_event_context(event_name: str): - dev_logger.info(f"entering event context: {event_name}") + dev_logger.debug(f"entering event context: {event_name}") # when the framework enters an event context, we create a span. with _span("event: " + event_name) as event_context_span: if event_context_span: @@ -535,12 +806,50 @@ def wrap_event_context(event_name: str): @functools.wraps(original_close) def wrap_close(): - dev_logger.info("tearing down tracer and flushing traces") + dev_logger.debug("tearing down tracer and flushing traces") span.end() opentelemetry.context.detach(span_token) # type: ignore tracer.reset(_tracer_token) tp = cast(TracerProvider, get_tracer_provider()) - tp.force_flush(timeout_millis=1000) # don't block for too long + flush_successful = tp.force_flush(timeout_millis=1000) # don't block for too long + + if buffer_only: + # if we're in buffer_only mode, it means we couldn't even set up the exporter for + # tempo as we're missing some data. + # so attempting to flush the buffer doesn't make sense + dev_logger.debug("tracing backend unavailable: all spans pushed to buffer") + + else: + dev_logger.debug("tracing backend found: attempting to flush buffer...") + + # if we do have an exporter for tempo, and we could send traces to it, + # we can attempt to flush the buffer as well. + if not flush_successful: + logger.error("flushing FAILED: unable to push traces to backend.") + else: + dev_logger.debug("flush succeeded.") + + # the backend has accepted the spans generated during this event, + if not previous_spans_buffered: + # if the buffer was empty to begin with, any spans we collected now can be discarded + buffer.drop() + dev_logger.debug("buffer dropped: this trace has been sent already") + else: + # if the buffer was nonempty, we can attempt to flush it + dev_logger.debug("attempting buffer flush...") + buffer_flush_successful = buffer.flush() + if buffer_flush_successful: + dev_logger.debug("buffer flush OK") + elif buffer_flush_successful is None: + # TODO is this even possible? + dev_logger.debug("buffer flush OK; empty: nothing to flush") + else: + # this situation is pretty weird, I'm not even sure it can happen, + # because it would mean that we did manage + # to push traces directly to the tempo exporter (flush_successful), + # but the buffer flush failed to push to the same exporter! + logger.error("buffer flush FAILED") + tp.shutdown() original_close() @@ -555,6 +864,9 @@ def trace_charm( server_cert: Optional[str] = None, service_name: Optional[str] = None, extra_types: Sequence[type] = (), + buffer_max_events: int = BUFFER_DEFAULT_MAX_EVENT_HISTORY_LENGTH, + buffer_max_size_mib: int = BUFFER_DEFAULT_CACHE_FILE_SIZE_LIMIT_MiB, + buffer_path: Optional[Union[str, Path]] = None, ) -> Callable[[_T], _T]: """Autoinstrument the decorated charm with tracing telemetry. @@ -596,6 +908,10 @@ def trace_charm( Defaults to the juju application name this charm is deployed under. :param extra_types: pass any number of types that you also wish to autoinstrument. For example, charm libs, relation endpoint wrappers, workload abstractions, ... + :param buffer_max_events: max number of events to save in the buffer. Set to 0 to disable buffering. + :param buffer_max_size_mib: max size of the buffer file. When exceeded, spans will be dropped. + Minimum 10MiB. + :param buffer_path: path to buffer file to use for saving buffered spans. """ def _decorator(charm_type: _T) -> _T: @@ -606,6 +922,9 @@ def _decorator(charm_type: _T) -> _T: server_cert_attr=server_cert, service_name=service_name, extra_types=extra_types, + buffer_path=Path(buffer_path) if buffer_path else None, + buffer_max_size_mib=buffer_max_size_mib, + buffer_max_events=buffer_max_events, ) return charm_type @@ -618,6 +937,9 @@ def _autoinstrument( server_cert_attr: Optional[str] = None, service_name: Optional[str] = None, extra_types: Sequence[type] = (), + buffer_max_events: int = BUFFER_DEFAULT_MAX_EVENT_HISTORY_LENGTH, + buffer_max_size_mib: int = BUFFER_DEFAULT_CACHE_FILE_SIZE_LIMIT_MiB, + buffer_path: Optional[Path] = None, ) -> _T: """Set up tracing on this charm class. @@ -650,13 +972,20 @@ def _autoinstrument( Defaults to the juju application name this charm is deployed under. :param extra_types: pass any number of types that you also wish to autoinstrument. For example, charm libs, relation endpoint wrappers, workload abstractions, ... + :param buffer_max_events: max number of events to save in the buffer. Set to 0 to disable buffering. + :param buffer_max_size_mib: max size of the buffer file. When exceeded, spans will be dropped. + Minimum 10MiB. + :param buffer_path: path to buffer file to use for saving buffered spans. """ - dev_logger.info(f"instrumenting {charm_type}") + dev_logger.debug(f"instrumenting {charm_type}") _setup_root_span_initializer( charm_type, tracing_endpoint_attr, server_cert_attr=server_cert_attr, service_name=service_name, + buffer_path=buffer_path, + buffer_max_events=buffer_max_events, + buffer_max_size_mib=buffer_max_size_mib, ) trace_type(charm_type) for type_ in extra_types: @@ -672,12 +1001,12 @@ def trace_type(cls: _T) -> _T: It assumes that this class is only instantiated after a charm type decorated with `@trace_charm` has been instantiated. """ - dev_logger.info(f"instrumenting {cls}") + dev_logger.debug(f"instrumenting {cls}") for name, method in inspect.getmembers(cls, predicate=inspect.isfunction): - dev_logger.info(f"discovered {method}") + dev_logger.debug(f"discovered {method}") if method.__name__.startswith("__"): - dev_logger.info(f"skipping {method} (dunder)") + dev_logger.debug(f"skipping {method} (dunder)") continue # the span title in the general case should be: @@ -723,7 +1052,7 @@ def trace_function(function: _F, name: Optional[str] = None) -> _F: def _trace_callable(callable: _F, qualifier: str, name: Optional[str] = None) -> _F: - dev_logger.info(f"instrumenting {callable}") + dev_logger.debug(f"instrumenting {callable}") # sig = inspect.signature(callable) @functools.wraps(callable)