Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-31381 Forward trace summary values to Open Telemetry spans #18560

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions esp/platform/esp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,12 @@ typedef enum LogRequest_
#define TXSUMMARY_GRP_CORE 0x00000001
#define TXSUMMARY_GRP_ENTERPRISE 0x00000002

/// Generate original trace summary contnet.
#define TXSUMMARY_OUT_TEXT 0x00000001
/// Generate JSON-formatted trace summary content.
#define TXSUMMARY_OUT_JSON 0x00000002
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why'd we choose that higher (out of order) bit to signify FWD_OTEL? perhaps a comment would be helpful if we're reserving the lower bits for specific TXSUMMARY_OUT types, and want to use the last few bits for another set of flags

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We support serialization of summary content using the two output styles. I'm piggybacking on that mechanism to cause cumulative timer forwarding at the end of a transaction and thought the value separation for the new bit would be an additional clue that it isn't a serialization output. Plus, as the span takes over I can envision adding an otel output style that does request serialization in the future.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perfect. Let's not just leave clues, let's be explicit to avoid any potential future misunderstandings.

/// Forward cumulative timer values to a JTrace span using the serialization interface. No serialization.
#define TXSUMMARY_FWD_TIMERS 0x80000000

#define ESPCTX_NO_NAMESPACES 0x00000001
#define ESPCTX_WSDL 0x00000010
Expand All @@ -104,6 +108,7 @@ class CTxSummary;
interface IEspSecureContext;
interface IEspSecureContextEx;
class CumulativeTimer;
enum class SummaryValueUnits;

interface IEspContext : extends IInterface
{
Expand Down Expand Up @@ -197,9 +202,15 @@ interface IEspContext : extends IInterface

virtual CTxSummary* queryTxSummary()=0;
virtual void addTraceSummaryValue(unsigned logLevel, const char *name, const char *value, const unsigned int group = TXSUMMARY_GRP_CORE)=0;
virtual void addTraceSummaryValue(unsigned logLevel, const char *name, const char *value, const char* spanAttrName, unsigned group = TXSUMMARY_GRP_CORE)=0;
virtual void addTraceSummaryValue(unsigned logLevel, const char *name, __int64 value, const unsigned int group = TXSUMMARY_GRP_CORE)=0;
virtual void addTraceSummaryValue(unsigned logLevel, const char *name, __int64 value, const char* spanAttrName, unsigned group = TXSUMMARY_GRP_CORE)=0;
virtual void addTraceSummaryValue(unsigned logLevel, const char *name, __int64 value, SummaryValueUnits units, unsigned group = TXSUMMARY_GRP_CORE)=0;
virtual void addTraceSummaryValue(unsigned logLevel, const char *name, __int64 value, const char* spanAttrName, SummaryValueUnits units, unsigned group = TXSUMMARY_GRP_CORE)=0;
virtual void addTraceSummaryDoubleValue(unsigned logLevel, const char *name, double value, const unsigned int group = TXSUMMARY_GRP_CORE)=0;
virtual void addTraceSummaryDoubleValue(unsigned logLevel, const char *name, double value, const char* spanAttrName, unsigned group = TXSUMMARY_GRP_CORE)=0;
virtual void addTraceSummaryTimeStamp(unsigned logLevel, const char *name, const unsigned int group = TXSUMMARY_GRP_CORE)=0;
virtual void addTraceSummaryTimeStamp(unsigned logLevel, const char *name, const char* spanAttrName, unsigned group = TXSUMMARY_GRP_CORE)=0;
virtual void addTraceSummaryCumulativeTime(unsigned logLevel, const char* name, unsigned __int64 time, const unsigned int group = TXSUMMARY_GRP_CORE)=0;
virtual CumulativeTimer* queryTraceSummaryCumulativeTimer(unsigned logLevel, const char *name, const unsigned int group = TXSUMMARY_GRP_CORE)=0;
virtual void cancelTxSummary()=0;
Expand Down
41 changes: 36 additions & 5 deletions esp/platform/espcontext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class CEspContext : public CInterface, implements IEspContext
if (m_txSummary)
{
m_txSummary->tailor(this);
m_txSummary->log(getTxSummaryLevel(), getTxSummaryGroup(), getTxSummaryStyle());
m_txSummary->log(getTxSummaryLevel(), getTxSummaryGroup(), getTxSummaryStyle() | TXSUMMARY_FWD_TIMERS);
}
}
virtual void addOptions(unsigned opts){options|=opts;}
Expand Down Expand Up @@ -512,28 +512,59 @@ class CEspContext : public CInterface, implements IEspContext
}

virtual void addTraceSummaryValue(LogLevel logLevel, const char *name, const char *value, const unsigned int group = TXSUMMARY_GRP_CORE)
{
addTraceSummaryValue(logLevel, name, value, nullptr, group);
}

virtual void addTraceSummaryValue(LogLevel logLevel, const char *name, const char *value, const char* spanAttrName, const unsigned int group = TXSUMMARY_GRP_CORE)
{
if (m_txSummary && !isEmptyString(name))
m_txSummary->append(name, value, logLevel, group);
m_txSummary->append(name, value, spanAttrName, SummaryValueUnits::NA, logLevel, group);
}

virtual void addTraceSummaryValue(LogLevel logLevel, const char *name, __int64 value, const unsigned int group = TXSUMMARY_GRP_CORE)
{
addTraceSummaryValue(logLevel, name, value, nullptr, SummaryValueUnits::NA, group);
}

virtual void addTraceSummaryValue(LogLevel logLevel, const char *name, __int64 value, const char* spanAttrName, unsigned group = TXSUMMARY_GRP_CORE) override
{
addTraceSummaryValue(logLevel, name, value, spanAttrName, SummaryValueUnits::NA, group);
}

virtual void addTraceSummaryValue(LogLevel logLevel, const char *name, __int64 value, SummaryValueUnits units, unsigned group = TXSUMMARY_GRP_CORE) override
{
addTraceSummaryValue(logLevel, name, value, nullptr, units, group);
}

virtual void addTraceSummaryValue(LogLevel logLevel, const char *name, __int64 value, const char* spanAttrName, SummaryValueUnits units, unsigned group = TXSUMMARY_GRP_CORE) override
{
if (m_txSummary && !isEmptyString(name))
m_txSummary->append(name, value, logLevel, group);
m_txSummary->append(name, value, spanAttrName, units, logLevel, group);
}

virtual void addTraceSummaryDoubleValue(LogLevel logLevel, const char *name, double value, const unsigned int group = TXSUMMARY_GRP_CORE)
{
addTraceSummaryDoubleValue(logLevel, name, value, nullptr, group);
}

virtual void addTraceSummaryDoubleValue(LogLevel logLevel, const char *name, double value, const char* spanAttrName, unsigned group = TXSUMMARY_GRP_CORE)
{
if (m_txSummary && !isEmptyString(name))
m_txSummary->append(name, value, logLevel, group);
m_txSummary->append(name, value, spanAttrName, SummaryValueUnits::NA, logLevel, group);
}

virtual void addTraceSummaryTimeStamp(LogLevel logLevel, const char *name, const unsigned int group = TXSUMMARY_GRP_CORE)
{
addTraceSummaryTimeStamp(logLevel, name, nullptr, group);
}

virtual void addTraceSummaryTimeStamp(LogLevel logLevel, const char *name, const char* spanAttrName, unsigned group = TXSUMMARY_GRP_CORE) override
{
if (m_txSummary && !isEmptyString(name))
m_txSummary->append(name, m_txSummary->getElapsedTime(), logLevel, group, "ms");
m_txSummary->append(name, m_txSummary->getElapsedTime(), spanAttrName, SummaryValueUnits::millis, logLevel, group, "ms");
}

virtual void flushTraceSummary()
{
updateTraceSummaryHeader();
Expand Down
106 changes: 106 additions & 0 deletions esp/platform/txsummary.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,21 @@
static auto pRequestCount = hpccMetrics::registerCounterMetric("esp.requests.received", "Number of requests received", SMeasureCount);
#endif

namespace
{
// Summary entries iterated for serialization lack awareness of the summary that contains them.
// Forwarding cumulative timers, which piggy-backs on the serialization logic, needs to know
// the summary's span forawarder. The forwarder is set immediately beforo, and cleared after,
// the forwarding iteration.
static thread_local CTxSummarySpanForwarder* spanForwarder = nullptr;

struct ForwarderScope
{
ForwarderScope(CTxSummarySpanForwarder* forwarder) { spanForwarder = forwarder; }
~ForwarderScope() { spanForwarder = nullptr; }
};
}

inline bool validate(const char* k)
{
// Empty or null keys are invalid
Expand Down Expand Up @@ -126,6 +141,11 @@ StringBuffer& CTxSummary::TxEntryTimer::serialize(StringBuffer& buf, const LogLe
buf.append(fullname);
buf.appendf("=%" I64F "u;", value->getTotalMillis());
}
else if (requestedStyle & TXSUMMARY_FWD_TIMERS)
{
if (spanForwarder)
spanForwarder->forwardAttribute(name, nullptr, value->getTotalMillis(), SummaryValueUnits::millis, queryLogLevel(), queryGroup());
}

return buf;
}
Expand Down Expand Up @@ -332,6 +352,7 @@ StringBuffer& CTxSummary::TxEntryObject::serialize(StringBuffer& buf, const LogL

CTxSummary::CTxSummary(unsigned creationTime)
: m_creationTime(creationTime ? creationTime : msTick())
, forwarder(new CTxSummarySpanForwarder)
{
#ifdef _SOLVED_DYNAMIC_METRIC_PROBLEM
pRequestCount->inc(1);
Expand Down Expand Up @@ -591,6 +612,14 @@ void CTxSummary::log(const LogLevel logLevel, const unsigned int requestedGroup,
serialize(summary.clear(), logLevel, requestedGroup, TXSUMMARY_OUT_JSON);
DBGLOG("%s", summary.str());
}

if (requestedStyle & TXSUMMARY_FWD_TIMERS)
{
// String values have already been forwarded and should ignore this serialization
// request. Timer values have not been forwarded, and should forward their values.
ForwarderScope scope(forwarder);
serialize(summary.clear(), logLevel, requestedGroup, TXSUMMARY_FWD_TIMERS);
}
}
}

Expand Down Expand Up @@ -663,3 +692,80 @@ CTxSummary::TxEntryBase* CTxSummary::queryEntry(const char* key)

return nullptr;
}

ISpan* CTxSummarySpanForwarder::prepareToForward(const char* summaryKey, const char* spanKey, StringBuffer& actualKey, LogLevel logLevel, unsigned groupMask) const
{
if (!isExcluded(normalizeKey(summaryKey, spanKey, actualKey), logLevel, groupMask))
{
ISpan* target = queryThreadedActiveSpan();
if (target && target->isRecording())
return target;
}
return nullptr;
}

StringBuffer& CTxSummarySpanForwarder::normalizeKey(const char* summaryKey, const char* spanKey, StringBuffer& actualKey) const
{
getSnakeCase(actualKey, isEmptyString(spanKey) ? summaryKey : spanKey);
// Keys for custom attributes may be annotated to reflect the source here.
return actualKey;
}

bool CTxSummarySpanForwarder::isExcluded(const char* key, LogLevel logLevel, unsigned groupMask) const
{
if ((logLevel > maxLogLevel) || ((groupMask & groupSelector) != groupSelector) || isEmptyString(key))
return true;
// Redundant attributes may be excluded by key here. For example, if spans include values like
// the global ID, it may be preferred to not forward a trace summary-sourced copy.
return false;
}

void CTxSummarySpanForwarder::forwardUnsigned(const char* summaryKey, const char* spanKey, uint64_t value, SummaryValueUnits units, LogLevel logLevel, unsigned groupMask) const
{
StringBuffer attrName;
ISpan* target = prepareToForward(summaryKey, spanKey, attrName, logLevel, groupMask);
if (target)
target->setSpanAttribute(attrName, scale(value, units));
}

void CTxSummarySpanForwarder::forwardSigned(const char* summaryKey, const char* spanKey, int64_t value, SummaryValueUnits units, LogLevel logLevel, unsigned groupMask) const
{
StringBuffer attrName;
ISpan* target = prepareToForward(summaryKey, spanKey, attrName, logLevel, groupMask);
if (target)
target->setSpanAttribute(attrName, uint64_t(scale(value, units)));
}

void CTxSummarySpanForwarder::forwardDouble(const char* summaryKey, const char* spanKey, double value, LogLevel logLevel, unsigned groupMask) const
{
StringBuffer attrName;
ISpan* target = prepareToForward(summaryKey, spanKey, attrName, logLevel, groupMask);
if (target)
{
StringBuffer tmp;
tmp.append(value);
target->setSpanAttribute(attrName, tmp);
}
}

void CTxSummarySpanForwarder::forwardBool(const char* summaryKey, const char* spanKey, bool value, LogLevel logLevel, unsigned groupMask) const
{
StringBuffer attrName;
ISpan* target = prepareToForward(summaryKey, spanKey, attrName, logLevel, groupMask);
if (target)
target->setSpanAttribute(attrName, uint64_t(value));
}

void CTxSummarySpanForwarder::forwardString(const char* summaryKey, const char* spanKey, const char* value, LogLevel logLevel, unsigned groupMask) const
{
StringBuffer attrName;
ISpan* target = prepareToForward(summaryKey, spanKey, attrName, logLevel, groupMask);
if (target)
target->setSpanAttribute(attrName, value);
}

CTxSummarySpanForwarder::CTxSummarySpanForwarder()
: maxLogLevel(getTxSummaryLevel())
, groupSelector(getTxSummaryGroup())
{
}
Loading
Loading