From f4a197c5677d7ce75e9d5151682c992f777f8f58 Mon Sep 17 00:00:00 2001 From: Tristan Sloughter Date: Fri, 26 Jun 2020 14:57:14 -0600 Subject: [PATCH] add span monitor process A span can optionally set `monitor` to true at the time it is created. This results in a monitor on the process and if that process exits for any reason all spans started in that process will be ended. --- include/ot_span.hrl | 7 +- src/ot_span_monitor.erl | 117 +++++++++++++++++++++++++++++++++ src/ot_span_sup.erl | 9 ++- src/ot_span_utils.erl | 7 +- test/opentelemetry_SUITE.erl | 30 ++------- test/ot_span_monitor_SUITE.erl | 78 ++++++++++++++++++++++ test/ot_test_utils.erl | 28 ++++++++ 7 files changed, 248 insertions(+), 28 deletions(-) create mode 100644 src/ot_span_monitor.erl create mode 100644 test/ot_span_monitor_SUITE.erl create mode 100644 test/ot_test_utils.erl diff --git a/include/ot_span.hrl b/include/ot_span.hrl index ad2f0635..c599e1d5 100644 --- a/include/ot_span.hrl +++ b/include/ot_span.hrl @@ -74,5 +74,10 @@ %% trace flags lowest bit is 1 but simply not propagated. is_recording :: boolean() | undefined, - instrumentation_library :: #instrumentation_library{} | undefined + instrumentation_library :: #instrumentation_library{} | undefined, + + %% this is the Erlang process the span is active in. It is used only for the optional + %% process monitoring feature where a process can be monitored and have all spans + %% active in that process be ended if the process exits for any reason. + pid :: pid() | undefined }). diff --git a/src/ot_span_monitor.erl b/src/ot_span_monitor.erl new file mode 100644 index 00000000..175a7005 --- /dev/null +++ b/src/ot_span_monitor.erl @@ -0,0 +1,117 @@ +%%%------------------------------------------------------------------------ +%% Copyright 2020, OpenTelemetry Authors +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. 
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% @doc
+%% Process that can optionally monitor the process a span is in and end the
+%% span if the process stops for any reason with the span still unfinished.
+%% @end
+%%%-------------------------------------------------------------------------
+-module(ot_span_monitor).
+
+-behaviour(gen_server).
+
+-export([start_link/0,
+         add_self/0,
+         add/1]).
+
+-export([init/1,
+         handle_call/3,
+         handle_cast/2,
+         handle_info/2]).
+
+-include("ot_span_ets.hrl").
+-include("ot_span.hrl").
+-include("ot_tracer.hrl").
+
+-define(SERVER, ?MODULE).
+
+-record(state, {monitors :: #{reference() => pid()},
+                monitored_pids :: sets:set(pid())}).
+
+start_link() ->
+    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+%% @doc Monitor the calling process; shorthand for `add(self())'.
+-spec add_self() -> ok.
+add_self() ->
+    add(self()).
+
+%% @doc Ask the server, with a synchronous call, to monitor `Pid'.
+-spec add(pid()) -> ok.
+add(Pid) ->
+    gen_server:call(?SERVER, {monitor, Pid}).
+
+init(_Opts) ->
+    {ok, #state{monitors=#{}, monitored_pids=sets:new()}}.
+
+%% a pid is monitored at most once, a repeated request is acknowledged as is
+handle_call({monitor, Pid}, _From, State=#state{monitored_pids=Known}) ->
+    case sets:is_element(Pid, Known) of
+        false ->
+            Ref = erlang:monitor(process, Pid),
+            Tracked = maps:put(Ref, Pid, State#state.monitors),
+            {reply, ok, State#state{monitors=Tracked,
+                                    monitored_pids=sets:add_element(Pid, Known)}};
+        true ->
+            {reply, ok, State}
+    end.
+
+handle_cast(_Msg, State) ->
+    {noreply, State}.
+
+%% A monitored process went down: end every span still open in it and drop
+%% its monitor. Any other message is ignored so that a stray message cannot
+%% crash the server and lose the monitors of every other process.
+handle_info({'DOWN', Ref, process, Pid, _Info}, State=#state{monitors=Monitors,
+                                                             monitored_pids=MonitoredPids}) ->
+    case maps:take(Ref, Monitors) of
+        {Pid, Monitors1} ->
+            end_spans(Pid),
+            {noreply, State#state{monitors=Monitors1,
+                                  monitored_pids=sets:del_element(Pid, MonitoredPids)}};
+        error ->
+            {noreply, State}
+    end;
+handle_info(_Msg, State) ->
+    {noreply, State}.
+
+%% ignore these functions because dialyzer doesn't like match spec use of '_'
+-dialyzer({nowarn_function, end_spans/1}).
+-dialyzer({nowarn_function, match_spec/2}).
+-dialyzer({nowarn_function, end_span/1}).
+-dialyzer({nowarn_function, select/1}).
+
+%% TODO: need a `select_take' or `match_take' in ets
+%% take each span of `Pid' out of the table and end it; a span that is already
+%% gone from the table by the time it is taken is simply skipped
+end_spans(Pid) ->
+    _ = [end_span(Span) || SpanId <- select(Pid),
+                           Span <- ets:take(?SPAN_TAB, SpanId)],
+    ok.
+
+%% span ids of all spans in the table whose `pid' field is `Pid'
+select(Pid) ->
+    ets:select(?SPAN_TAB, match_spec(Pid, '$1')).
+
+match_spec(Pid, Return) ->
+    [{#span{span_id='$1', pid='$2', _='_'},
+      [{'=:=', '$2', Pid}],
+      [Return]}].
+
+%% end the span record exactly as it was stored (there is no span ctx here to
+%% take newer values from) and pass it to the tracer's on_end processors
+end_span(Span=#span{}) ->
+    Span1 = ot_span_utils:end_span(Span),
+    {_, #tracer{on_end_processors=Processors}} = opentelemetry:get_tracer(),
+    Processors(Span1).
diff --git a/src/ot_span_sup.erl b/src/ot_span_sup.erl
index b8babf9b..dfed2936 100644
--- a/src/ot_span_sup.erl
+++ b/src/ot_span_sup.erl
@@ -45,6 +45,13 @@ init([Opts]) ->
                 type => worker,
                 modules => [ot_span_sweeper]},
 
+    Monitor = #{id => ot_span_monitor,
+                start => {ot_span_monitor, start_link, []},
+                restart => permanent,
+                shutdown => 5000,
+                type => worker,
+                modules => [ot_span_monitor]},
+
     SpanHandler = #{id => ot_span_ets,
                     start => {ot_span_ets, start_link, [[]]},
                     restart => permanent,
@@ -52,7 +59,7 @@ init([Opts]) ->
                     type => worker,
                     modules => [ot_span_ets]},
 
-    ChildSpecs = [SpanHandler, Sweeper],
+    ChildSpecs = [SpanHandler, Sweeper, Monitor],
     {ok, {SupFlags, ChildSpecs}}.
 
%% internal functions diff --git a/src/ot_span_utils.erl b/src/ot_span_utils.erl index 5fd14e38..1ab26a7a 100644 --- a/src/ot_span_utils.erl +++ b/src/ot_span_utils.erl @@ -37,7 +37,9 @@ start_span(Name, Opts) -> Kind = maps:get(kind, Opts, ?SPAN_KIND_INTERNAL), Sampler = maps:get(sampler, Opts), StartTime = maps:get(start_time, Opts, opentelemetry:timestamp()), - new_span(Name, Parent, Sampler, StartTime, Kind, Attributes, Links). + Result = new_span(Name, Parent, Sampler, StartTime, Kind, Attributes, Links), + case maps:get(monitor, Opts, false) of true -> ot_span_monitor:add_self(); _ -> ok end, + Result. %% if parent is undefined create a new trace id new_span(Name, undefined, Sampler, StartTime, Kind, Attributes, Links) -> @@ -69,7 +71,8 @@ new_span(Name, Parent=#span_ctx{trace_id=TraceId, name=Name, attributes=Attributes++SamplerAttributes, links=Links, - is_recording=IsRecording}, + is_recording=IsRecording, + pid=self()}, {SpanCtx#span_ctx{trace_flags=TraceFlags, is_recording=IsRecording}, Span}. 
diff --git a/test/opentelemetry_SUITE.erl b/test/opentelemetry_SUITE.erl index 2ffdaa35..427823ca 100644 --- a/test/opentelemetry_SUITE.erl +++ b/test/opentelemetry_SUITE.erl @@ -82,7 +82,7 @@ macros(Config) -> ?end_span(), - [Span1] = assert_exported(Tid, SpanCtx1), + [Span1] = ot_test_utils:assert_exported(Tid, SpanCtx1), ?assertEqual([{Attr1, AttrValue1}], Span1#span.attributes), @@ -121,7 +121,7 @@ child_spans(Config) -> ?assertMatch(SpanCtx3, ?current_span_ctx()), ?end_span(), - assert_exported(Tid, SpanCtx3), + ot_test_utils:assert_exported(Tid, SpanCtx3), %% 2nd span should be the current span ctx now ?assertMatch(SpanCtx2, ?current_span_ctx()), @@ -143,9 +143,9 @@ child_spans(Config) -> ?end_span(), ?assertMatch(undefined, ?current_span_ctx()), - assert_all_exported(Tid, [SpanCtx1, SpanCtx2, SpanCtx3]), + ot_test_utils:assert_all_exported(Tid, [SpanCtx1, SpanCtx2, SpanCtx3]), - [Span4] = assert_exported(Tid, SpanCtx4), + [Span4] = ot_test_utils:assert_exported(Tid, SpanCtx4), ?assertEqual(EarlierTimestamp, Span4#span.start_time). @@ -231,7 +231,7 @@ tracer_instrumentation_library(Config) -> ot_tracer:end_span(Tracer), ?assertMatch(undefined, ?current_span_ctx()), - [Span1] = assert_exported(Tid, SpanCtx1), + [Span1] = ot_test_utils:assert_exported(Tid, SpanCtx1), ?assertEqual({instrumentation_library,<<"tracer1">>,<<"1.0.0">>}, Span1#span.instrumentation_library). @@ -263,7 +263,7 @@ tracer_previous_ctx(Config) -> ot_tracer:set_span(Tracer, SpanCtx2), ot_tracer:end_span(Tracer), - assert_all_exported(Tid, [SpanCtx3, SpanCtx1, SpanCtx2]), + ot_test_utils:assert_all_exported(Tid, [SpanCtx3, SpanCtx1, SpanCtx2]), ok. @@ -282,24 +282,6 @@ stop_temporary_app(_Config) -> %% -assert_all_exported(Tid, SpanCtxs) -> - [assert_exported(Tid, SpanCtx) || SpanCtx <- SpanCtxs]. - -assert_exported(Tid, #span_ctx{trace_id=TraceId, - span_id=SpanId}) -> - ?UNTIL_NOT_EQUAL([], ets:match_object(Tid, #span{trace_id=TraceId, - span_id=SpanId, - _='_'})). 
-
-assert_not_exported(Tid, #span_ctx{trace_id=TraceId,
-                                   span_id=SpanId}) ->
-    %% sleep so exporter has run before we check
-    %% since we can't do like when checking it exists with UNTIL
-    timer:sleep(100),
-    ?assertMatch([], ets:match(Tid, #span{trace_id=TraceId,
-                                          span_id=SpanId,
-                                          _='_'})).
-
 trace_context(w3c, EncodedTraceId, EncodedSpanId) ->
     [{<<"traceparent">>,
       [<<"00">>, "-", EncodedTraceId,"-", EncodedSpanId, "-", <<"01">>]}];
diff --git a/test/ot_span_monitor_SUITE.erl b/test/ot_span_monitor_SUITE.erl
new file mode 100644
index 00000000..ce614312
--- /dev/null
+++ b/test/ot_span_monitor_SUITE.erl
@@ -0,0 +1,89 @@
+%%% ---------------------------------------------------------------------------
+%%% @doc
+%%% Checks that the spans started in a process are ended and exported when
+%%% that process exits, once for a `normal' and once for an `abnormal' exit.
+%%% @end
+%%% ---------------------------------------------------------------------------
+-module(ot_span_monitor_SUITE).
+
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+-include("ot_test_utils.hrl").
+-include("ot_span.hrl").
+-include_lib("opentelemetry_api/include/opentelemetry.hrl").
+-include_lib("opentelemetry_api/include/tracer.hrl").
+
+-include("../src/ot_span_ets.hrl").
+
+all() ->
+    [{group, normal}, {group, abnormal}].
+
+groups() ->
+    [{normal, [], [monitor_pid]},
+     {abnormal, [], [monitor_pid]}].
+
+init_per_suite(Config) ->
+    application:load(opentelemetry),
+    Config.
+
+end_per_suite(_Config) ->
+    application:unload(opentelemetry),
+    ok.
+
+%% the group name doubles as the exit reason of the traced process. It has to
+%% be captured here: init_per_testcase/2 is handed the test case name
+%% (`monitor_pid'), not the name of the group the case runs in.
+init_per_group(ExitType, Config) ->
+    [{exit_type, ExitType} | Config].
+
+end_per_group(_ExitType, _Config) ->
+    ok.
+
+init_per_testcase(_TestCase, Config) ->
+    application:set_env(opentelemetry, processors, [{ot_batch_processor, #{scheduled_delay_ms => 1}}]),
+    {ok, _} = application:ensure_all_started(opentelemetry),
+    %% adds an exporter for a new table
+    %% spans will be exported to a separate table for each of the test cases
+    Tid = ets:new(exported_spans, [public, bag]),
+    ot_batch_processor:set_exporter(ot_exporter_tab, Tid),
+    [{tid, Tid} | Config].
+
+end_per_testcase(_, _Config) ->
+    _ = application:stop(opentelemetry),
+    ok.
+
+%% Starts two spans in a linked process (only span-2 asks to be monitored),
+%% lets the process exit and checks that both spans end up in the export table.
+monitor_pid(Config) ->
+    process_flag(trap_exit, true),
+    ExitType = ?config(exit_type, Config),
+    Tid = ?config(tid, Config),
+
+    Attr1 = <<"attr-1">>,
+    AttrValue1 = <<"attr-value-1">>,
+
+    Pid = erlang:spawn_link(fun() ->
+                                    _SpanCtx1 = ?start_span(<<"span-1">>),
+                                    SpanCtx2 = ?start_span(<<"span-2">>, #{monitor => true}),
+                                    ?assertMatch(SpanCtx2, ?current_span_ctx),
+                                    ?set_attribute(Attr1, AttrValue1),
+                                    erlang:exit(ExitType)
+                            end),
+
+    receive
+        {'EXIT', Pid, ExitType} ->
+            %% process is down now check that there are 2 ended spans in the table
+            %% even though only span-2 had monitor set to true span-1 is ended as
+            %% well since it is based on the process of the span
+            ?UNTIL(2 =:= ets:info(Tid, size)),
+
+            Spans = ets:tab2list(Tid),
+            Span2 = lists:keyfind(<<"span-2">>, #span.name, Spans),
+            ?assertEqual([{Attr1, AttrValue1}], Span2#span.attributes),
+            ok
+    after 5000 ->
+            %% fail fast instead of blocking until the suite timetrap fires
+            ct:fail({no_exit_received, Pid, ExitType})
+    end.
diff --git a/test/ot_test_utils.erl b/test/ot_test_utils.erl
new file mode 100644
index 00000000..a7335153
--- /dev/null
+++ b/test/ot_test_utils.erl
@@ -0,0 +1,31 @@
+-module(ot_test_utils).
+
+-export([assert_all_exported/2,
+         assert_exported/2,
+         assert_not_exported/2]).
+
+-include_lib("stdlib/include/assert.hrl").
+-include_lib("opentelemetry_api/include/opentelemetry.hrl").
+-include("ot_test_utils.hrl").
+-include("ot_span.hrl").
+
+%% Runs assert_exported/2 for each span context and collects the results.
+assert_all_exported(Tid, SpanCtxs) ->
+    [assert_exported(Tid, SpanCtx) || SpanCtx <- SpanCtxs].
+
+%% Looks up the spans in `Tid' carrying the trace id and span id of the span
+%% context, wrapped in ?UNTIL_NOT_EQUAL (presumably retried until non-empty).
+assert_exported(Tid, #span_ctx{trace_id=TraceId,
+                               span_id=SpanId}) ->
+    ?UNTIL_NOT_EQUAL([], ets:match_object(Tid, #span{trace_id=TraceId,
+                                                     span_id=SpanId,
+                                                     _='_'})).
+
+%% Asserts that table `Tid' holds no span with the ids of the given span ctx.
+assert_not_exported(Tid, #span_ctx{trace_id=TraceId, span_id=SpanId}) ->
+    %% absence cannot be polled for the way presence is with UNTIL, so give
+    %% the exporter time to run before looking at the table
+    timer:sleep(100),
+    Pattern = #span{trace_id=TraceId, span_id=SpanId, _='_'},
+    Found = ets:match(Tid, Pattern),
+    ?assertMatch([], Found).