-
Notifications
You must be signed in to change notification settings - Fork 12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Tracer support #20
base: master
Are you sure you want to change the base?
Tracer support #20
Changes from all commits
cedfbea
624eef2
1d02840
df80527
370eebd
bcf5f4e
87207b9
1e8d6f4
f6bd6fc
1f7ed3f
cdda0fb
b20b8c4
62a6b7a
86bae2c
ceefcc2
7573bf1
d2ef962
cc1face
49a48fb
5719bf9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,7 @@ | |
from nameko_grpc.inspection import Inspector | ||
from nameko_grpc.streams import ReceiveStream, SendStream | ||
from nameko_grpc.timeout import unbucket_timeout | ||
from nameko_grpc.utils import Teeable | ||
|
||
|
||
log = getLogger(__name__) | ||
|
@@ -234,6 +235,8 @@ def handle_request(self, request_stream, response_stream): | |
|
||
if self.cardinality in (Cardinality.UNARY_STREAM, Cardinality.UNARY_UNARY): | ||
request = next(request) | ||
else: | ||
request = Teeable(request) | ||
|
||
context = GrpcContext(request_stream, response_stream) | ||
|
||
|
@@ -261,18 +264,28 @@ def handle_request(self, request_stream, response_stream): | |
def handle_result(self, response_stream, worker_ctx, result, exc_info): | ||
|
||
if self.cardinality in (Cardinality.STREAM_UNARY, Cardinality.UNARY_UNARY): | ||
result = (result,) | ||
response = (result,) | ||
else: | ||
result = Teeable(result) | ||
response = result | ||
|
||
if exc_info is None: | ||
try: | ||
response_stream.populate(result) | ||
except Exception as exception: | ||
error = GrpcError( | ||
status=StatusCode.UNKNOWN, | ||
details="Exception iterating responses: {}".format(exception), | ||
debug_error_string="<traceback>", | ||
) | ||
response_stream.close(error) | ||
|
||
def send_response(): | ||
try: | ||
response_stream.populate(response) | ||
except Exception as exception: | ||
error = GrpcError( | ||
status=StatusCode.UNKNOWN, | ||
details="Exception iterating responses: {}".format(exception), | ||
debug_error_string="<traceback>", | ||
) | ||
response_stream.close(error) | ||
|
||
self.container.spawn_managed_thread( | ||
send_response, identifier="send_response" | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Send response had to be deferred to a thread, otherwise the |
||
|
||
else: | ||
error = GrpcError( | ||
status=StatusCode.UNKNOWN, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
# -*- coding: utf-8 -*- | ||
from nameko_grpc.tracer.adapter import GrpcEntrypointAdapter # noqa: F401 | ||
from nameko_grpc.tracer.dependency import GrpcTracer # noqa: F401 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,252 @@ | ||
# -*- coding: utf-8 -*- | ||
from nameko_tracer import constants | ||
from nameko_tracer.adapters import DefaultAdapter | ||
|
||
from nameko_grpc.constants import Cardinality | ||
|
||
|
||
GRPC_STREAM = "GrpcStream" | ||
GRPC_CONTEXT = "GrpcContext" | ||
GRPC_REQUEST = "GrpcRequest" | ||
GRPC_RESPONSE = "GrpcResponse" | ||
|
||
|
||
def is_request_record(extra): | ||
""" Determine whether the record represents a request. | ||
""" | ||
return extra["stage"] == constants.Stage.request | ||
|
||
|
||
def is_response_record(extra): | ||
""" Determine whether the record represents a response. | ||
""" | ||
return extra["stage"] == constants.Stage.response | ||
|
||
|
||
def is_stream_record(extra): | ||
""" Determine whether the record represents part of a streaming request or response. | ||
""" | ||
return "stream_part" in extra | ||
|
||
|
||
def is_error_record(extra): | ||
""" Determine whether the record represents an error response | ||
""" | ||
return is_response_record(extra) and extra["exc_info_"] | ||
|
||
|
||
def is_streaming_request_method(extra): | ||
""" Determine whether the record relates to a method that has a streaming request. | ||
|
||
Note that the record may be the request or response trace, or part of one of these. | ||
""" | ||
cardinality = get_cardinality(extra) | ||
return cardinality in (Cardinality.STREAM_UNARY, Cardinality.STREAM_STREAM) | ||
|
||
|
||
def is_streaming_response_method(extra): | ||
""" Determine whether the record relates to a method that has a streaming response. | ||
|
||
Note that the record may be the request or response trace, or part of one of these. | ||
""" | ||
cardinality = get_cardinality(extra) | ||
return cardinality in (Cardinality.UNARY_STREAM, Cardinality.STREAM_STREAM) | ||
|
||
|
||
def get_cardinality(extra): | ||
""" Extract the cardinality of the method that this record relates to. | ||
""" | ||
return extra["worker_ctx"].entrypoint.cardinality | ||
|
||
|
||
def add_cardinality(extra): | ||
""" Add the cardinality of the method to which this record relates to the trace | ||
data. | ||
""" | ||
trace_data = extra[constants.TRACE_KEY] | ||
trace_data["cardinality"] = get_cardinality(extra) | ||
|
||
|
||
def add_stream_part(extra): | ||
""" If this record represents part of a stream, add the stream part identifier to | ||
the trace data. | ||
""" | ||
if not is_stream_record(extra): | ||
return | ||
trace_data = extra[constants.TRACE_KEY] | ||
trace_data["stream_part"] = extra["stream_part"] | ||
|
||
|
||
def add_stream_age(extra): | ||
""" If this record represents part of a stream, add the commulative stream age to | ||
the trace data. | ||
""" | ||
if not is_stream_record(extra): | ||
return | ||
trace_data = extra[constants.TRACE_KEY] | ||
trace_data["stream_age"] = extra["stream_age"] | ||
|
||
|
||
def add_grpc_request(extra): | ||
""" Add the GRPC request message to the trace data for this record, under the | ||
`grpc_request` key. | ||
|
||
All records receive a value for this key. | ||
|
||
If this record relates to a method that has a streaming request and the record | ||
does not represent part that stream (i.e. it's a "top-level" record, or a response), | ||
the value is the GRPC_STREAM placeholder. | ||
""" | ||
trace_data = extra[constants.TRACE_KEY] | ||
|
||
if is_streaming_request_method(extra): | ||
if is_request_record(extra) and is_stream_record(extra): | ||
trace_data["grpc_request"] = extra["request"] | ||
else: | ||
trace_data["grpc_request"] = GRPC_STREAM | ||
else: | ||
request, context = extra["worker_ctx"].args | ||
trace_data["grpc_request"] = request | ||
|
||
|
||
def add_grpc_response(extra): | ||
""" Add the GRPC response message to the trace data for this record, under the | ||
`grpc_response` key. | ||
|
||
Only response records receive a value for this key. | ||
|
||
If this record relates to a method that has a streaming response and the record | ||
does not represent part of that stream (i.e. it's the "top-level" record), | ||
the value is the GRPC_STREAM placeholder. | ||
""" | ||
if not is_response_record(extra): | ||
return | ||
|
||
trace_data = extra[constants.TRACE_KEY] | ||
|
||
if is_streaming_response_method(extra): | ||
if is_stream_record(extra): | ||
trace_data["grpc_response"] = extra["result"] | ||
else: | ||
trace_data["grpc_response"] = GRPC_STREAM | ||
else: | ||
trace_data["grpc_response"] = extra["result"] | ||
|
||
|
||
def add_grpc_context(extra): | ||
""" Add the GRPC context object to the trace data for this record, under the | ||
`grpc_context` key. | ||
""" | ||
request, context = extra["worker_ctx"].args | ||
|
||
trace_data = extra[constants.TRACE_KEY] | ||
trace_data["grpc_context"] = context | ||
|
||
|
||
def clean_call_args(extra): | ||
""" Replace the `context` and `request` keys of `call_args` in the trace data for | ||
this record. | ||
|
||
These objects are exposed in the `grpc_context` and `grpc_request` fields | ||
respectively and don't need to be in multiple places. See `add_grpc_context` and | ||
`add_grpc_request` respectively. | ||
|
||
The value for `context` is the GRPC_CONTEXT placeholder. The value for `request` | ||
is the GRPC_REQUEST placeholder, or GRPC_STREAM placeholder if this record relates | ||
to a method that has a streaming request. | ||
""" | ||
|
||
trace_data = extra[constants.TRACE_KEY] | ||
trace_data["call_args"]["context"] = GRPC_CONTEXT | ||
|
||
if is_streaming_request_method(extra): | ||
trace_data["call_args"]["request"] = GRPC_STREAM | ||
else: | ||
trace_data["call_args"]["request"] = GRPC_REQUEST | ||
|
||
|
||
def clean_response(extra): | ||
""" Replaces the `response` key in the trace data for this record. | ||
|
||
Only successful response records have a value for this key. | ||
|
||
The GRPC response message is exposed in the `grpc_response` field and doesn't need | ||
to be in multiple places. See `add_grpc_response`. | ||
|
||
The value for `response` is the GRPC_RESPONSE placeholder, or GRPC_STREAM | ||
placeholder if this record relates to a method that has a streaming response. | ||
|
||
""" | ||
|
||
if not is_response_record(extra) or is_error_record(extra): | ||
return | ||
|
||
trace_data = extra[constants.TRACE_KEY] | ||
|
||
if is_streaming_response_method(extra) and not is_stream_record(extra): | ||
trace_data["response"] = GRPC_STREAM | ||
else: | ||
trace_data["response"] = GRPC_RESPONSE | ||
|
||
|
||
def clean_response_status(extra): | ||
""" Replaces `response_status` keys in the trace data for this record. | ||
|
||
Only response records have a value for this key. | ||
|
||
The value for is unchanged unless this record relates to a method that has a | ||
streaming response and the record does not represent part of that stream | ||
(i.e. it's the "top-level" record), and the record does not already indicate an | ||
error (i.e. the method immediately failed). | ||
|
||
The status of the response is therefore not yet known, so the value is set to | ||
`None`. | ||
|
||
""" | ||
if not is_response_record(extra) or is_error_record(extra): | ||
return | ||
|
||
trace_data = extra[constants.TRACE_KEY] | ||
|
||
if is_streaming_response_method(extra) and not is_stream_record(extra): | ||
# response status still unknown | ||
trace_data["response_status"] = None | ||
|
||
|
||
class GrpcEntrypointAdapter(DefaultAdapter): | ||
""" Logging adapter for methods decorated with the Grpc entrypoint. | ||
|
||
Records may represent one of the following: | ||
|
||
* The request to a "unary request" RPC method | ||
* The response from a "unary response" RPC method | ||
* The "top-level" request to a "streaming request" RPC method | ||
* Each "part" of the stream to a "streaming request" RPC method | ||
* The "top-level" response from a "streaming response" RPC method | ||
* Each "part" of the stream from a "streaming response" RPC method | ||
|
||
""" | ||
|
||
def process(self, message, kwargs): | ||
message, kwargs = super().process(message, kwargs) | ||
|
||
extra = kwargs["extra"] | ||
|
||
add_cardinality(extra) | ||
add_stream_part(extra) | ||
add_stream_age(extra) | ||
|
||
add_grpc_request(extra) | ||
add_grpc_response(extra) | ||
add_grpc_context(extra) | ||
|
||
clean_call_args(extra) | ||
clean_response(extra) | ||
clean_response_status(extra) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This whole module is super-nicely organised and documented! .) |
||
|
||
return message, kwargs | ||
|
||
def get_result(self, result): | ||
""" Override get_result to remove serialization. | ||
""" | ||
return result |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/send_response/grpc_send_response
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes that is better. Actually there are lots of places where the
identifier
passed is quite sloppy. Evengrpc_send_response
is a bit useless -- it'd be nicer if it used the call stack so you could differentiate multiple threads for concurrent workers.