Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): implement custom tracer_provider injection for opentelemetry traces #1229

Merged
merged 19 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
da102c4
all: implement custom tracer_provider injection
odeke-em Nov 9, 2024
2181ca9
Address review feedback by attaching observability_options to Client …
odeke-em Nov 12, 2024
540d0b8
Attach observability_options directly before trace_call
odeke-em Nov 12, 2024
0b9ca49
More reverts for formatting
odeke-em Nov 12, 2024
854a3ff
Plumb observability_options into _restart_on_unavailable
odeke-em Nov 13, 2024
3e9a8ca
completely decouple observability_options from session
odeke-em Nov 13, 2024
8af9e75
apply SPANNER_ENABLE_EXTENDED_TRACING but in inverse due to compatibi…
odeke-em Nov 13, 2024
7adab38
Document SPANNER_ENABLE_EXTENDED_TRACING in environment
odeke-em Nov 13, 2024
afd01e1
Revert a vestige of mock
odeke-em Nov 13, 2024
ca8d598
tests: add unit test for propagating TracerProvider
odeke-em Nov 13, 2024
6fb046c
Add preliminary end-to-end test to check for injection of observabili…
odeke-em Nov 13, 2024
fb3e8ea
Document default enable_extended_tracing value
odeke-em Nov 13, 2024
762390c
Carve out observability_options test
odeke-em Nov 13, 2024
21bfbcd
Ensure that observability_options test sets up and deletes database
odeke-em Nov 13, 2024
797241f
Ensure instance.create() is invoked in system tests
odeke-em Nov 14, 2024
25edeaa
Use getattr for mock _Client
odeke-em Nov 14, 2024
42b9a4c
Update with code review suggestions
odeke-em Nov 14, 2024
6c8bb88
Deal with mock.Mock false positives failing tests
odeke-em Nov 14, 2024
196e9a0
Address review feedback
odeke-em Nov 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions docs/opentelemetry-tracing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,21 @@ We also need to tell OpenTelemetry which exporter to use. To export Spanner trac

# Create and export one trace every 1000 requests
sampler = TraceIdRatioBased(1/1000)
# Use the default tracer provider
trace.set_tracer_provider(TracerProvider(sampler=sampler))
trace.get_tracer_provider().add_span_processor(
tracer_provider = TracerProvider(sampler=sampler)
tracer_provider.add_span_processor(
# Initialize the cloud tracing exporter
BatchSpanProcessor(CloudTraceSpanExporter())
)
observability_options = dict(
tracer_provider=tracer_provider,

# By default extended_tracing is set to True due
# to legacy reasons to avoid breaking changes, you
# can modify it though using the environment variable
# SPANNER_ENABLE_EXTENDED_TRACING=false.
enable_extended_tracing=False,
)
spanner = spanner.NewClient(project_id, observability_options=observability_options)


To get more fine-grained traces from gRPC, you can enable the gRPC instrumentation by the following
Expand All @@ -52,3 +61,13 @@ Generated spanner traces should now be available on `Cloud Trace <https://consol

Tracing is most effective when many libraries are instrumented to provide insight over the entire lifespan of a request.
For a list of libraries that can be instrumented, see the `OpenTelemetry Integrations` section of the `OpenTelemetry Python docs <https://opentelemetry-python.readthedocs.io/en/stable/>`_

Annotating spans with SQL
~~~~~~~~~~~~~~~~~~~~~~~~~

By default your spans will be annotated with SQL statements where appropriate, but that can be a PII (Personally Identifiable Information)
leak. Sadly due to legacy behavior, we cannot simply turn off this behavior by default. However you can control this behavior by setting

SPANNER_ENABLE_EXTENDED_TRACING=false

to turn it off globally or when creating each SpannerClient, please set `observability_options.enable_extended_tracing=false`
11 changes: 7 additions & 4 deletions examples/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,18 @@ def main():
tracer_provider = TracerProvider(sampler=ALWAYS_ON)
trace_exporter = CloudTraceSpanExporter(project_id=project_id)
tracer_provider.add_span_processor(BatchSpanProcessor(trace_exporter))
trace.set_tracer_provider(tracer_provider)
# Retrieve a tracer from the global tracer provider.
tracer = tracer_provider.get_tracer('MyApp')

# Setup the Cloud Spanner Client.
spanner_client = spanner.Client(project_id)
spanner_client = spanner.Client(
project_id,
observability_options=dict(tracer_provider=tracer_provider, enable_extended_tracing=True),
)
instance = spanner_client.instance('test-instance')
database = instance.database('test-db')

# Retrieve a tracer from our custom tracer provider.
tracer = tracer_provider.get_tracer('MyApp')

# Now run our queries
with tracer.start_as_current_span('QueryInformationSchema'):
with database.snapshot() as snapshot:
Expand Down
27 changes: 25 additions & 2 deletions google/cloud/spanner_v1/_opentelemetry_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"""Manages OpenTelemetry trace creation and handling"""

from contextlib import contextmanager
import os

from google.cloud.spanner_v1 import SpannerClient
from google.cloud.spanner_v1 import gapic_version
Expand All @@ -33,6 +34,9 @@

TRACER_NAME = "cloud.google.com/python/spanner"
TRACER_VERSION = gapic_version.__version__
extended_tracing_globally_disabled = (
os.getenv("SPANNER_ENABLE_EXTENDED_TRACING", "").lower() == "false"
)


def get_tracer(tracer_provider=None):
Expand All @@ -51,13 +55,26 @@ def get_tracer(tracer_provider=None):


@contextmanager
def trace_call(name, session, extra_attributes=None):
def trace_call(name, session, extra_attributes=None, observability_options=None):
harshachinta marked this conversation as resolved.
Show resolved Hide resolved
if not HAS_OPENTELEMETRY_INSTALLED or not session:
# Empty context manager. Users will have to check if the generated value is None or a span
yield None
return

tracer = get_tracer()
tracer_provider = None

# By default enable_extended_tracing=True because in a bid to minimize
# breaking changes and preserve legacy behavior, we are keeping it turned
# on by default.
enable_extended_tracing = True

if isinstance(observability_options, dict): # Avoid false positives with mock.Mock
tracer_provider = observability_options.get("tracer_provider", None)
enable_extended_tracing = observability_options.get(
"enable_extended_tracing", enable_extended_tracing
)

tracer = get_tracer(tracer_provider)

# Set base attributes that we know for every trace created
attributes = {
Expand All @@ -72,6 +89,12 @@ def trace_call(name, session, extra_attributes=None):
if extra_attributes:
attributes.update(extra_attributes)

if extended_tracing_globally_disabled:
enable_extended_tracing = False

if not enable_extended_tracing:
attributes.pop("db.statement", False)

with tracer.start_as_current_span(
name, kind=trace.SpanKind.CLIENT, attributes=attributes
) as span:
Expand Down
16 changes: 14 additions & 2 deletions google/cloud/spanner_v1/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,13 @@ def commit(
max_commit_delay=max_commit_delay,
request_options=request_options,
)
with trace_call("CloudSpanner.Commit", self._session, trace_attributes):
observability_options = getattr(database, "observability_options", None)
with trace_call(
"CloudSpanner.Commit",
self._session,
trace_attributes,
observability_options=observability_options,
):
method = functools.partial(
api.commit,
request=request,
Expand Down Expand Up @@ -318,7 +324,13 @@ def batch_write(self, request_options=None, exclude_txn_from_change_streams=Fals
request_options=request_options,
exclude_txn_from_change_streams=exclude_txn_from_change_streams,
)
with trace_call("CloudSpanner.BatchWrite", self._session, trace_attributes):
observability_options = getattr(database, "observability_options", None)
with trace_call(
"CloudSpanner.BatchWrite",
self._session,
trace_attributes,
observability_options=observability_options,
):
method = functools.partial(
api.batch_write,
request=request,
Expand Down
21 changes: 21 additions & 0 deletions google/cloud/spanner_v1/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,16 @@ class Client(ClientWithProject):
for all ReadRequests and ExecuteSqlRequests that indicates which replicas
or regions should be used for non-transactional reads or queries.

:type observability_options: dict (str -> any) or None
:param observability_options: (Optional) the configuration to control
odeke-em marked this conversation as resolved.
Show resolved Hide resolved
the tracer's behavior.
tracer_provider is the injected tracer provider
enable_extended_tracing: :type:boolean when set to true will allow for
odeke-em marked this conversation as resolved.
Show resolved Hide resolved
spans that issue SQL statements to be annotated with SQL.
Default `True`, please set it to `False` to turn it off
or you can use the environment variable `SPANNER_ENABLE_EXTENDED_TRACING=<boolean>`
to control it.

:raises: :class:`ValueError <exceptions.ValueError>` if both ``read_only``
and ``admin`` are :data:`True`
"""
Expand All @@ -146,6 +156,7 @@ def __init__(
query_options=None,
route_to_leader_enabled=True,
directed_read_options=None,
observability_options=None,
):
self._emulator_host = _get_spanner_emulator_host()

Expand Down Expand Up @@ -187,6 +198,7 @@ def __init__(

self._route_to_leader_enabled = route_to_leader_enabled
self._directed_read_options = directed_read_options
self._observability_options = observability_options
odeke-em marked this conversation as resolved.
Show resolved Hide resolved

@property
def credentials(self):
Expand Down Expand Up @@ -268,6 +280,15 @@ def route_to_leader_enabled(self):
"""
return self._route_to_leader_enabled

@property
def observability_options(self):
"""Getter for observability_options.

:rtype: dict
:returns: The configured observability_options if set.
"""
return self._observability_options

@property
def directed_read_options(self):
"""Getter for directed_read_options.
Expand Down
12 changes: 12 additions & 0 deletions google/cloud/spanner_v1/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,7 @@ def execute_pdml():
method=method,
request=request,
transaction_selector=txn_selector,
observability_options=self.observability_options,
)

result_set = StreamedResultSet(iterator)
Expand Down Expand Up @@ -1106,6 +1107,17 @@ def set_iam_policy(self, policy):
response = api.set_iam_policy(request=request, metadata=metadata)
return response

@property
def observability_options(self):
"""
Returns the observability options that you set when creating
the SpannerClient.
"""
if not (self._instance and self._instance._client):
return None

return getattr(self._instance._client, "observability_options", None)


class BatchCheckout(object):
"""Context manager for using a batch from a database.
Expand Down
20 changes: 17 additions & 3 deletions google/cloud/spanner_v1/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,13 @@ def create(self):
if self._labels:
request.session.labels = self._labels

with trace_call("CloudSpanner.CreateSession", self, self._labels):
observability_options = getattr(self._database, "observability_options", None)
with trace_call(
"CloudSpanner.CreateSession",
self,
self._labels,
observability_options=observability_options,
):
session_pb = api.create_session(
request=request,
metadata=metadata,
Expand All @@ -169,7 +175,10 @@ def exists(self):
)
)

with trace_call("CloudSpanner.GetSession", self) as span:
observability_options = getattr(self._database, "observability_options", None)
with trace_call(
"CloudSpanner.GetSession", self, observability_options=observability_options
) as span:
try:
api.get_session(name=self.name, metadata=metadata)
if span:
Expand All @@ -194,7 +203,12 @@ def delete(self):
raise ValueError("Session ID not set by back-end")
api = self._database.spanner_api
metadata = _metadata_with_prefix(self._database.name)
with trace_call("CloudSpanner.DeleteSession", self):
observability_options = getattr(self._database, "observability_options", None)
with trace_call(
"CloudSpanner.DeleteSession",
self,
observability_options=observability_options,
):
api.delete_session(name=self.name, metadata=metadata)

def ping(self):
Expand Down
Loading