Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add span monitor process to optionally monitor the spans in a process #84

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion include/ot_span.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,10 @@
%% trace flags lowest bit is 1 but simply not propagated.
is_recording :: boolean() | undefined,

instrumentation_library :: #instrumentation_library{} | undefined
instrumentation_library :: #instrumentation_library{} | undefined,

%% this is the Erlang process the span is active in. It is used only for the optional
%% process monitoring feature where a process can be monitored and have all spans
%% active in that process be ended if the process exits for any reason.
pid :: pid() | undefined
}).
117 changes: 117 additions & 0 deletions src/ot_span_monitor.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
%%%------------------------------------------------------------------------
%% Copyright 2020, OpenTelemetry Authors
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% @doc
%% Process that can optionally monitor the process a span is in and end the
%% span if the process stops for any reason with the span still unfinished.
%% @end
%%%-------------------------------------------------------------------------
-module(ot_span_monitor).

-behaviour(gen_server).

-export([start_link/0,
add_self/0,
add/1]).

-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2]).

-include("ot_span_ets.hrl").
-include("ot_span.hrl").
-include("ot_tracer.hrl").

-define(SERVER, ?MODULE).

-record(state, {monitors :: #{reference() => pid()},
monitored_pids :: sets:set(pid())}).

start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

%% @doc Monitor the current process and end all spans in it if the process stops.
-spec add_self() -> ok.
add_self() ->
gen_server:call(?SERVER, {monitor, self()}).

%% @doc Monitor another process and end all spans in it if the process stops.
-spec add(pid()) -> ok.
add(Pid) ->
gen_server:call(?SERVER, {monitor, Pid}).

init(_Opts) ->
{ok, #state{monitors=#{},
monitored_pids=sets:new()}}.

handle_call({monitor, Pid}, _From, State=#state{monitors=Monitors,
monitored_pids=MonitoredPids}) ->
case sets:is_element(Pid, MonitoredPids) of
true ->
{reply, ok, State};
false ->
Ref = erlang:monitor(process, Pid),
{reply, ok, State#state{monitors=Monitors#{Ref => Pid},
monitored_pids=sets:add_element(Pid, MonitoredPids)}}
end.

handle_cast(_Msg, State) ->
{noreply, State}.

handle_info({'DOWN', Ref, process, Pid, _Info}, State=#state{monitors=Monitors,
monitored_pids=MonitoredPids}) ->
case maps:take(Ref, Monitors) of
{P, Monitors1} when P =:= Pid ->
end_spans(Pid),
{noreply, State#state{monitors=Monitors1,
monitored_pids=sets:del_element(Pid, MonitoredPids)}};
error ->
{noreply, State}
end.

%%

%% ignore these functions because dialyzer doesn't like match spec use of '_'
-dialyzer({nowarn_function, end_spans/1}).
-dialyzer({nowarn_function, match_spec/2}).
-dialyzer({nowarn_function, end_span/1}).
-dialyzer({nowarn_function, select/1}).

%% TODO: need a `select_take' or `match_take' in ets
end_spans(Pid) ->
Spans = select(Pid),
[begin
case ets:take(?SPAN_TAB, SpanId) of
[] ->
ok;
[Span] ->
end_span(Span)
end
end || SpanId <- Spans],
ok.

select(Pid) ->
ets:select(?SPAN_TAB, match_spec(Pid, '$1')).

match_spec(Pid, Return) ->
[{#span{span_id='$1', pid='$2', _='_'},
[{'=:=', '$2', Pid}],
[Return]}].

end_span(Span=#span{tracestate=Tracestate}) ->
%% hack to not lose tracestate when ending without span ctx
Span1 = ot_span_utils:end_span(Span#span{tracestate=Tracestate}),
{_, #tracer{on_end_processors=Processors}} = opentelemetry:get_tracer(),
Processors(Span1).
9 changes: 8 additions & 1 deletion src/ot_span_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,21 @@ init([Opts]) ->
type => worker,
modules => [ot_span_sweeper]},

Monitor = #{id => ot_span_monitor,
start => {ot_span_monitor, start_link, []},
restart => permanent,
shutdown => 5000,
type => worker,
modules => [ot_span_monitor]},

SpanHandler = #{id => ot_span_ets,
start => {ot_span_ets, start_link, [[]]},
restart => permanent,
shutdown => 5000,
type => worker,
modules => [ot_span_ets]},

ChildSpecs = [SpanHandler, Sweeper],
ChildSpecs = [SpanHandler, Sweeper, Monitor],
{ok, {SupFlags, ChildSpecs}}.

%% internal functions
7 changes: 5 additions & 2 deletions src/ot_span_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ start_span(Name, Opts) ->
Kind = maps:get(kind, Opts, ?SPAN_KIND_INTERNAL),
Sampler = maps:get(sampler, Opts),
StartTime = maps:get(start_time, Opts, opentelemetry:timestamp()),
new_span(Name, Parent, Sampler, StartTime, Kind, Attributes, Links).
Result = new_span(Name, Parent, Sampler, StartTime, Kind, Attributes, Links),
case maps:get(monitor, Opts, false) of true -> ot_span_monitor:add_self(); _ -> ok end,
Result.

%% if parent is undefined create a new trace id
new_span(Name, undefined, Sampler, StartTime, Kind, Attributes, Links) ->
Expand Down Expand Up @@ -69,7 +71,8 @@ new_span(Name, Parent=#span_ctx{trace_id=TraceId,
name=Name,
attributes=Attributes++SamplerAttributes,
links=Links,
is_recording=IsRecording},
is_recording=IsRecording,
pid=self()},

{SpanCtx#span_ctx{trace_flags=TraceFlags,
is_recording=IsRecording}, Span}.
Expand Down
30 changes: 6 additions & 24 deletions test/opentelemetry_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ macros(Config) ->

?end_span(),

[Span1] = assert_exported(Tid, SpanCtx1),
[Span1] = ot_test_utils:assert_exported(Tid, SpanCtx1),

?assertEqual([{Attr1, AttrValue1}], Span1#span.attributes),

Expand Down Expand Up @@ -121,7 +121,7 @@ child_spans(Config) ->
?assertMatch(SpanCtx3, ?current_span_ctx()),
?end_span(),

assert_exported(Tid, SpanCtx3),
ot_test_utils:assert_exported(Tid, SpanCtx3),

%% 2nd span should be the current span ctx now
?assertMatch(SpanCtx2, ?current_span_ctx()),
Expand All @@ -143,9 +143,9 @@ child_spans(Config) ->
?end_span(),
?assertMatch(undefined, ?current_span_ctx()),

assert_all_exported(Tid, [SpanCtx1, SpanCtx2, SpanCtx3]),
ot_test_utils:assert_all_exported(Tid, [SpanCtx1, SpanCtx2, SpanCtx3]),

[Span4] = assert_exported(Tid, SpanCtx4),
[Span4] = ot_test_utils:assert_exported(Tid, SpanCtx4),

?assertEqual(EarlierTimestamp, Span4#span.start_time).

Expand Down Expand Up @@ -231,7 +231,7 @@ tracer_instrumentation_library(Config) ->
ot_tracer:end_span(Tracer),
?assertMatch(undefined, ?current_span_ctx()),

[Span1] = assert_exported(Tid, SpanCtx1),
[Span1] = ot_test_utils:assert_exported(Tid, SpanCtx1),

?assertEqual({instrumentation_library,<<"tracer1">>,<<"1.0.0">>}, Span1#span.instrumentation_library).

Expand Down Expand Up @@ -263,7 +263,7 @@ tracer_previous_ctx(Config) ->
ot_tracer:set_span(Tracer, SpanCtx2),
ot_tracer:end_span(Tracer),

assert_all_exported(Tid, [SpanCtx3, SpanCtx1, SpanCtx2]),
ot_test_utils:assert_all_exported(Tid, [SpanCtx3, SpanCtx1, SpanCtx2]),

ok.

Expand All @@ -282,24 +282,6 @@ stop_temporary_app(_Config) ->

%%

assert_all_exported(Tid, SpanCtxs) ->
[assert_exported(Tid, SpanCtx) || SpanCtx <- SpanCtxs].

assert_exported(Tid, #span_ctx{trace_id=TraceId,
span_id=SpanId}) ->
?UNTIL_NOT_EQUAL([], ets:match_object(Tid, #span{trace_id=TraceId,
span_id=SpanId,
_='_'})).

assert_not_exported(Tid, #span_ctx{trace_id=TraceId,
span_id=SpanId}) ->
%% sleep so exporter has run before we check
%% since we can't do like when checking it exists with UNTIL
timer:sleep(100),
?assertMatch([], ets:match(Tid, #span{trace_id=TraceId,
span_id=SpanId,
_='_'})).

trace_context(w3c, EncodedTraceId, EncodedSpanId) ->
[{<<"traceparent">>,
[<<"00">>, "-", EncodedTraceId,"-", EncodedSpanId, "-", <<"01">>]}];
Expand Down
78 changes: 78 additions & 0 deletions test/ot_span_monitor_SUITE.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
%%% ---------------------------------------------------------------------------
%%% @doc
%%% @end
%%% ---------------------------------------------------------------------------
-module(ot_span_monitor_SUITE).

-compile(export_all).

-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").

-include("ot_test_utils.hrl").
-include("ot_span.hrl").
-include_lib("opentelemetry_api/include/opentelemetry.hrl").
-include_lib("opentelemetry_api/include/tracer.hrl").

-include("../src/ot_span_ets.hrl").

all() ->
[{group, normal}, {group, abnormal}].

groups() ->
[{normal, [], [monitor_pid]},
{abnormal, [], [monitor_pid]}].

init_per_suite(Config) ->
application:load(opentelemetry),
Config.

end_per_suite(_Config) ->
application:unload(opentelemetry),
ok.

init_per_testcase(ExitType, Config) ->
application:set_env(opentelemetry, processors, [{ot_batch_processor, #{scheduled_delay_ms => 1}}]),
{ok, _} = application:ensure_all_started(opentelemetry),
%% adds an exporter for a new table
%% spans will be exported to a separate table for each of the test cases
Tid = ets:new(exported_spans, [public, bag]),
ot_batch_processor:set_exporter(ot_exporter_tab, Tid),
[{exit_type, ExitType}, {tid, Tid} | Config].

end_per_testcase(_, _Config) ->
_ = application:stop(opentelemetry),
ok.

monitor_pid(Config) ->
process_flag(trap_exit, true),
ExitType = ?config(exit_type, Config),
Tid = ?config(tid, Config),

Attr1 = <<"attr-1">>,
AttrValue1 = <<"attr-value-1">>,

Pid = erlang:spawn_link(fun() ->
_SpanCtx1 = ?start_span(<<"span-1">>),
SpanCtx2 = ?start_span(<<"span-2">>, #{monitor => true}),

?assertMatch(SpanCtx2, ?current_span_ctx),

?set_attribute(Attr1, AttrValue1),

erlang:exit(ExitType)
end),

receive
{'EXIT', Pid, Reason} when Reason =:= ExitType ->
%% process is down now check that there are 2 ended spans in the table
%% even though only span-2 had monitor set to true span-1 is ended as
%% well since it is based on the process of the span
?UNTIL(2 =:= ets:info(Tid, size)),

Spans = ets:tab2list(Tid),

Span2 = lists:keyfind(<<"span-2">>, #span.name, Spans),
?assertEqual([{Attr1, AttrValue1}], Span2#span.attributes),
ok
end.
28 changes: 28 additions & 0 deletions test/ot_test_utils.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
-module(ot_test_utils).

-export([assert_all_exported/2,
assert_exported/2,
assert_not_exported/2]).

-include_lib("stdlib/include/assert.hrl").
-include_lib("opentelemetry_api/include/opentelemetry.hrl").
-include("ot_test_utils.hrl").
-include("ot_span.hrl").

assert_all_exported(Tid, SpanCtxs) ->
[assert_exported(Tid, SpanCtx) || SpanCtx <- SpanCtxs].

assert_exported(Tid, #span_ctx{trace_id=TraceId,
span_id=SpanId}) ->
?UNTIL_NOT_EQUAL([], ets:match_object(Tid, #span{trace_id=TraceId,
span_id=SpanId,
_='_'})).

assert_not_exported(Tid, #span_ctx{trace_id=TraceId,
span_id=SpanId}) ->
%% sleep so exporter has run before we check
%% since we can't do like when checking it exists with UNTIL
timer:sleep(100),
?assertMatch([], ets:match(Tid, #span{trace_id=TraceId,
span_id=SpanId,
_='_'})).