Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tracer support #20

Open
wants to merge 20 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 23 additions & 10 deletions nameko_grpc/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from nameko_grpc.inspection import Inspector
from nameko_grpc.streams import ReceiveStream, SendStream
from nameko_grpc.timeout import unbucket_timeout
from nameko_grpc.utils import Teeable


log = getLogger(__name__)
Expand Down Expand Up @@ -234,6 +235,8 @@ def handle_request(self, request_stream, response_stream):

if self.cardinality in (Cardinality.UNARY_STREAM, Cardinality.UNARY_UNARY):
request = next(request)
else:
request = Teeable(request)

context = GrpcContext(request_stream, response_stream)

Expand Down Expand Up @@ -261,18 +264,28 @@ def handle_request(self, request_stream, response_stream):
def handle_result(self, response_stream, worker_ctx, result, exc_info):

if self.cardinality in (Cardinality.STREAM_UNARY, Cardinality.UNARY_UNARY):
result = (result,)
response = (result,)
else:
result = Teeable(result)
response = result

if exc_info is None:
try:
response_stream.populate(result)
except Exception as exception:
error = GrpcError(
status=StatusCode.UNKNOWN,
details="Exception iterating responses: {}".format(exception),
debug_error_string="<traceback>",
)
response_stream.close(error)

def send_response():
try:
response_stream.populate(response)
except Exception as exception:
error = GrpcError(
status=StatusCode.UNKNOWN,
details="Exception iterating responses: {}".format(exception),
debug_error_string="<traceback>",
)
response_stream.close(error)

self.container.spawn_managed_thread(
send_response, identifier="send_response"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/send_response/grpc_send_response ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that is better. Actually there are lots of places where the identifier passed is quite sloppy. Even grpc_send_response is a bit useless -- it'd be nicer if it used the call stack so you could differentiate multiple threads for concurrent workers.

)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Send response had to be deferred to a thread, otherwise the response iterator was exhausted before worker_result was called in any DependencyProviders


else:
error = GrpcError(
status=StatusCode.UNKNOWN,
Expand Down
3 changes: 3 additions & 0 deletions nameko_grpc/tracer/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# -*- coding: utf-8 -*-
from nameko_grpc.tracer.adapter import GrpcEntrypointAdapter # noqa: F401
from nameko_grpc.tracer.dependency import GrpcTracer # noqa: F401
252 changes: 252 additions & 0 deletions nameko_grpc/tracer/adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
# -*- coding: utf-8 -*-
from nameko_tracer import constants
from nameko_tracer.adapters import DefaultAdapter

from nameko_grpc.constants import Cardinality


GRPC_STREAM = "GrpcStream"
GRPC_CONTEXT = "GrpcContext"
GRPC_REQUEST = "GrpcRequest"
GRPC_RESPONSE = "GrpcResponse"


def is_request_record(extra):
""" Determine whether the record represents a request.
"""
return extra["stage"] == constants.Stage.request


def is_response_record(extra):
""" Determine whether the record represents a response.
"""
return extra["stage"] == constants.Stage.response


def is_stream_record(extra):
""" Determine whether the record represents part of a streaming request or response.
"""
return "stream_part" in extra


def is_error_record(extra):
""" Determine whether the record represents an error response
"""
return is_response_record(extra) and extra["exc_info_"]


def is_streaming_request_method(extra):
""" Determine whether the record relates to a method that has a streaming request.

Note that the record may be the request or response trace, or part of one of these.
"""
cardinality = get_cardinality(extra)
return cardinality in (Cardinality.STREAM_UNARY, Cardinality.STREAM_STREAM)


def is_streaming_response_method(extra):
""" Determine whether the record relates to a method that has a streaming response.

Note that the record may be the request or response trace, or part of one of these.
"""
cardinality = get_cardinality(extra)
return cardinality in (Cardinality.UNARY_STREAM, Cardinality.STREAM_STREAM)


def get_cardinality(extra):
""" Extract the cardinality of the method that this record relates to.
"""
return extra["worker_ctx"].entrypoint.cardinality


def add_cardinality(extra):
""" Add the cardinality of the method to which this record relates to the trace
data.
"""
trace_data = extra[constants.TRACE_KEY]
trace_data["cardinality"] = get_cardinality(extra)


def add_stream_part(extra):
""" If this record represents part of a stream, add the stream part identifier to
the trace data.
"""
if not is_stream_record(extra):
return
trace_data = extra[constants.TRACE_KEY]
trace_data["stream_part"] = extra["stream_part"]


def add_stream_age(extra):
""" If this record represents part of a stream, add the commulative stream age to
the trace data.
"""
if not is_stream_record(extra):
return
trace_data = extra[constants.TRACE_KEY]
trace_data["stream_age"] = extra["stream_age"]


def add_grpc_request(extra):
""" Add the GRPC request message to the trace data for this record, under the
`grpc_request` key.

All records receive a value for this key.

If this record relates to a method that has a streaming request and the record
does not represent part that stream (i.e. it's a "top-level" record, or a response),
the value is the GRPC_STREAM placeholder.
"""
trace_data = extra[constants.TRACE_KEY]

if is_streaming_request_method(extra):
if is_request_record(extra) and is_stream_record(extra):
trace_data["grpc_request"] = extra["request"]
else:
trace_data["grpc_request"] = GRPC_STREAM
else:
request, context = extra["worker_ctx"].args
trace_data["grpc_request"] = request


def add_grpc_response(extra):
""" Add the GRPC response message to the trace data for this record, under the
`grpc_response` key.

Only response records receive a value for this key.

If this record relates to a method that has a streaming response and the record
does not represent part of that stream (i.e. it's the "top-level" record),
the value is the GRPC_STREAM placeholder.
"""
if not is_response_record(extra):
return

trace_data = extra[constants.TRACE_KEY]

if is_streaming_response_method(extra):
if is_stream_record(extra):
trace_data["grpc_response"] = extra["result"]
else:
trace_data["grpc_response"] = GRPC_STREAM
else:
trace_data["grpc_response"] = extra["result"]


def add_grpc_context(extra):
""" Add the GRPC context object to the trace data for this record, under the
`grpc_context` key.
"""
request, context = extra["worker_ctx"].args

trace_data = extra[constants.TRACE_KEY]
trace_data["grpc_context"] = context


def clean_call_args(extra):
""" Replace the `context` and `request` keys of `call_args` in the trace data for
this record.

These objects are exposed in the `grpc_context` and `grpc_request` fields
respectively and don't need to be in multiple places. See `add_grpc_context` and
`add_grpc_request` respectively.

The value for `context` is the GRPC_CONTEXT placeholder. The value for `request`
is the GRPC_REQUEST placeholder, or GRPC_STREAM placeholder if this record relates
to a method that has a streaming request.
"""

trace_data = extra[constants.TRACE_KEY]
trace_data["call_args"]["context"] = GRPC_CONTEXT

if is_streaming_request_method(extra):
trace_data["call_args"]["request"] = GRPC_STREAM
else:
trace_data["call_args"]["request"] = GRPC_REQUEST


def clean_response(extra):
""" Replaces the `response` key in the trace data for this record.

Only successful response records have a value for this key.

The GRPC response message is exposed in the `grpc_response` field and doesn't need
to be in multiple places. See `add_grpc_response`.

The value for `response` is the GRPC_RESPONSE placeholder, or GRPC_STREAM
placeholder if this record relates to a method that has a streaming response.

"""

if not is_response_record(extra) or is_error_record(extra):
return

trace_data = extra[constants.TRACE_KEY]

if is_streaming_response_method(extra) and not is_stream_record(extra):
trace_data["response"] = GRPC_STREAM
else:
trace_data["response"] = GRPC_RESPONSE


def clean_response_status(extra):
""" Replaces `response_status` keys in the trace data for this record.

Only response records have a value for this key.

The value for is unchanged unless this record relates to a method that has a
streaming response and the record does not represent part of that stream
(i.e. it's the "top-level" record), and the record does not already indicate an
error (i.e. the method immediately failed).

The status of the response is therefore not yet known, so the value is set to
`None`.

"""
if not is_response_record(extra) or is_error_record(extra):
return

trace_data = extra[constants.TRACE_KEY]

if is_streaming_response_method(extra) and not is_stream_record(extra):
# response status still unknown
trace_data["response_status"] = None


class GrpcEntrypointAdapter(DefaultAdapter):
""" Logging adapter for methods decorated with the Grpc entrypoint.

Records may represent one of the following:

* The request to a "unary request" RPC method
* The response from a "unary response" RPC method
* The "top-level" request to a "streaming request" RPC method
* Each "part" of the stream to a "streaming request" RPC method
* The "top-level" response from a "streaming response" RPC method
* Each "part" of the stream from a "streaming response" RPC method

"""

def process(self, message, kwargs):
message, kwargs = super().process(message, kwargs)

extra = kwargs["extra"]

add_cardinality(extra)
add_stream_part(extra)
add_stream_age(extra)

add_grpc_request(extra)
add_grpc_response(extra)
add_grpc_context(extra)

clean_call_args(extra)
clean_response(extra)
clean_response_status(extra)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This whole module is super-nicely organised and documented! .)


return message, kwargs

def get_result(self, result):
""" Override get_result to remove serialization.
"""
return result
Loading