Skip to content

Commit

Permalink
HPCC-31381 Forward trace summary values to Open Telemetry spans
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Klemm <[email protected]>
  • Loading branch information
Tim Klemm committed Apr 23, 2024
1 parent 442adb4 commit 27a2f2a
Show file tree
Hide file tree
Showing 4 changed files with 364 additions and 8 deletions.
8 changes: 8 additions & 0 deletions esp/platform/esp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ typedef enum LogRequest_

#define TXSUMMARY_OUT_TEXT 0x00000001
#define TXSUMMARY_OUT_JSON 0x00000002
#define TXSUMMARY_FWD_OTEL 0x80000000

#define ESPCTX_NO_NAMESPACES 0x00000001
#define ESPCTX_WSDL 0x00000010
Expand All @@ -104,6 +105,7 @@ class CTxSummary;
interface IEspSecureContext;
interface IEspSecureContextEx;
class CumulativeTimer;
enum class TxUnits;

interface IEspContext : extends IInterface
{
Expand Down Expand Up @@ -197,9 +199,15 @@ interface IEspContext : extends IInterface

virtual CTxSummary* queryTxSummary()=0;
virtual void addTraceSummaryValue(unsigned logLevel, const char *name, const char *value, const unsigned int group = TXSUMMARY_GRP_CORE)=0;
virtual void addTraceSummaryValue(unsigned logLevel, const char *name, const char *value, const char* otName, unsigned group = TXSUMMARY_GRP_CORE)=0;
virtual void addTraceSummaryValue(unsigned logLevel, const char *name, __int64 value, const unsigned int group = TXSUMMARY_GRP_CORE)=0;
virtual void addTraceSummaryValue(unsigned logLevel, const char *name, __int64 value, const char* otName, unsigned group = TXSUMMARY_GRP_CORE)=0;
virtual void addTraceSummaryValue(unsigned logLevel, const char *name, __int64 value, TxUnits units, unsigned group = TXSUMMARY_GRP_CORE)=0;
virtual void addTraceSummaryValue(unsigned logLevel, const char *name, __int64 value, const char* otName, TxUnits units, unsigned group = TXSUMMARY_GRP_CORE)=0;
virtual void addTraceSummaryDoubleValue(unsigned logLevel, const char *name, double value, const unsigned int group = TXSUMMARY_GRP_CORE)=0;
virtual void addTraceSummaryDoubleValue(unsigned logLevel, const char *name, double value, const char* otName, unsigned group = TXSUMMARY_GRP_CORE)=0;
virtual void addTraceSummaryTimeStamp(unsigned logLevel, const char *name, const unsigned int group = TXSUMMARY_GRP_CORE)=0;
virtual void addTraceSummaryTimeStamp(unsigned logLevel, const char *name, const char* otName, unsigned group = TXSUMMARY_GRP_CORE)=0;
virtual void addTraceSummaryCumulativeTime(unsigned logLevel, const char* name, unsigned __int64 time, const unsigned int group = TXSUMMARY_GRP_CORE)=0;
virtual CumulativeTimer* queryTraceSummaryCumulativeTimer(unsigned logLevel, const char *name, const unsigned int group = TXSUMMARY_GRP_CORE)=0;
virtual void cancelTxSummary()=0;
Expand Down
41 changes: 36 additions & 5 deletions esp/platform/espcontext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class CEspContext : public CInterface, implements IEspContext
if (m_txSummary)
{
m_txSummary->tailor(this);
m_txSummary->log(getTxSummaryLevel(), getTxSummaryGroup(), getTxSummaryStyle());
m_txSummary->log(getTxSummaryLevel(), getTxSummaryGroup(), getTxSummaryStyle() | TXSUMMARY_FWD_OTEL);
}
}
virtual void addOptions(unsigned opts){options|=opts;}
Expand Down Expand Up @@ -512,28 +512,59 @@ class CEspContext : public CInterface, implements IEspContext
}

virtual void addTraceSummaryValue(LogLevel logLevel, const char *name, const char *value, const unsigned int group = TXSUMMARY_GRP_CORE)
{
addTraceSummaryValue(logLevel, name, value, nullptr, group);
}

virtual void addTraceSummaryValue(LogLevel logLevel, const char *name, const char *value, const char* otName, const unsigned int group = TXSUMMARY_GRP_CORE)
{
if (m_txSummary && !isEmptyString(name))
m_txSummary->append(name, value, logLevel, group);
m_txSummary->append(name, value, otName, TxUnits::NA, logLevel, group);
}

virtual void addTraceSummaryValue(LogLevel logLevel, const char *name, __int64 value, const unsigned int group = TXSUMMARY_GRP_CORE)
{
addTraceSummaryValue(logLevel, name, value, nullptr, TxUnits::NA, group);
}

virtual void addTraceSummaryValue(LogLevel logLevel, const char *name, __int64 value, const char* otName, unsigned group = TXSUMMARY_GRP_CORE) override
{
addTraceSummaryValue(logLevel, name, value, otName, TxUnits::NA, group);
}

virtual void addTraceSummaryValue(LogLevel logLevel, const char *name, __int64 value, TxUnits units, unsigned group = TXSUMMARY_GRP_CORE) override
{
addTraceSummaryValue(logLevel, name, value, nullptr, units, group);
}

virtual void addTraceSummaryValue(LogLevel logLevel, const char *name, __int64 value, const char* otName, TxUnits units, unsigned group = TXSUMMARY_GRP_CORE) override
{
if (m_txSummary && !isEmptyString(name))
m_txSummary->append(name, value, logLevel, group);
m_txSummary->append(name, value, otName, units, logLevel, group);
}

virtual void addTraceSummaryDoubleValue(LogLevel logLevel, const char *name, double value, const unsigned int group = TXSUMMARY_GRP_CORE)
{
addTraceSummaryDoubleValue(logLevel, name, value, nullptr, group);
}

virtual void addTraceSummaryDoubleValue(LogLevel logLevel, const char *name, double value, const char* otName, unsigned group = TXSUMMARY_GRP_CORE)
{
if (m_txSummary && !isEmptyString(name))
m_txSummary->append(name, value, logLevel, group);
m_txSummary->append(name, value, otName, TxUnits::NA, logLevel, group);
}

virtual void addTraceSummaryTimeStamp(LogLevel logLevel, const char *name, const unsigned int group = TXSUMMARY_GRP_CORE)
{
addTraceSummaryTimeStamp(logLevel, name, nullptr, group);
}

virtual void addTraceSummaryTimeStamp(LogLevel logLevel, const char *name, const char* otName, unsigned group = TXSUMMARY_GRP_CORE) override
{
if (m_txSummary && !isEmptyString(name))
m_txSummary->append(name, m_txSummary->getElapsedTime(), logLevel, group, "ms");
m_txSummary->append(name, m_txSummary->getElapsedTime(), otName, TxUnits::millis, logLevel, group, "ms");
}

virtual void flushTraceSummary()
{
updateTraceSummaryHeader();
Expand Down
85 changes: 85 additions & 0 deletions esp/platform/txsummary.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@
static auto pRequestCount = hpccMetrics::registerCounterMetric("esp.requests.received", "Number of requests received", SMeasureCount);
#endif

namespace
{
// Iteration of summary entries occurs without knowledge of the summary or its connector.
static thread_local CTxOpenTelemetryConnector* otelConnector = nullptr;
}

inline bool validate(const char* k)
{
// Empty or null keys are invalid
Expand Down Expand Up @@ -126,6 +132,11 @@ StringBuffer& CTxSummary::TxEntryTimer::serialize(StringBuffer& buf, const LogLe
buf.append(fullname);
buf.appendf("=%" I64F "u;", value->getTotalMillis());
}
else if (requestedStyle & TXSUMMARY_FWD_OTEL)
{
if (otelConnector)
otelConnector->forwardAttribute(name, nullptr, value->getTotalMillis(), TxUnits::millis, queryLogLevel(), queryGroup());
}

return buf;
}
Expand Down Expand Up @@ -332,6 +343,7 @@ StringBuffer& CTxSummary::TxEntryObject::serialize(StringBuffer& buf, const LogL

CTxSummary::CTxSummary(unsigned creationTime)
: m_creationTime(creationTime ? creationTime : msTick())
, connector(new CTxOpenTelemetryConnector)
{
#ifdef _SOLVED_DYNAMIC_METRIC_PROBLEM
pRequestCount->inc(1);
Expand Down Expand Up @@ -591,6 +603,15 @@ void CTxSummary::log(const LogLevel logLevel, const unsigned int requestedGroup,
serialize(summary.clear(), logLevel, requestedGroup, TXSUMMARY_OUT_JSON);
DBGLOG("%s", summary.str());
}

if (requestedStyle & TXSUMMARY_FWD_OTEL)
{
// String values have already been forwarded and should ignore this serialization
// request. Timer values have not been forwarded, and should forward their values.
otelConnector = connector.get();
serialize(summary.clear(), logLevel, requestedGroup, TXSUMMARY_FWD_OTEL);
otelConnector = nullptr;
}
}
}

Expand Down Expand Up @@ -663,3 +684,67 @@ CTxSummary::TxEntryBase* CTxSummary::queryEntry(const char* key)

return nullptr;
}

StringBuffer& CTxOpenTelemetryConnector::normalizeKey(const char* txKey, const char* otKey, StringBuffer& normalized) const
{
getSnakeCase(normalized, isEmptyString(otKey) ? txKey : otKey);
// Keys for custom attributes may be annotated to reflect the source here.
return normalized;
}

bool CTxOpenTelemetryConnector::isExcluded(const char* key, LogLevel logLevel, unsigned groupMask) const
{
if ((logLevel > maxLogLevel) || ((groupMask & groupSelector) != groupSelector) || isEmptyString(key))
return true;
// Redundant attributes may be excluded by key here. For example, if spans include values like
// the global ID, it may be preferred to not forward a trace summary-sourced copy.
return false;
}

#define ENSURE_TARGET_AND_NAME \
StringBuffer name; \
if (isExcluded(normalizeKey(txKey, otKey, name), logLevel, groupMask)) \
return; \
ISpan* target = queryThreadedActiveSpan(); \
if (!target) \
return

void CTxOpenTelemetryConnector::forwardUnsigned(const char* txKey, const char* otKey, uint64_t value, TxUnits units, LogLevel logLevel, unsigned groupMask) const
{
ENSURE_TARGET_AND_NAME;
target->setSpanAttribute(name, scale(value, units));
}

void CTxOpenTelemetryConnector::forwardSigned(const char* txKey, const char* otKey, int64_t value, TxUnits units, LogLevel logLevel, unsigned groupMask) const
{
ENSURE_TARGET_AND_NAME;
target->setSpanAttribute(name, uint64_t(scale(value, units)));
}

void CTxOpenTelemetryConnector::forwardDouble(const char* txKey, const char* otKey, double value, LogLevel logLevel, unsigned groupMask) const
{
ENSURE_TARGET_AND_NAME;
StringBuffer tmp;
tmp.append(value);
target->setSpanAttribute(name, tmp);
}

void CTxOpenTelemetryConnector::forwardBool(const char* txKey, const char* otKey, bool value, LogLevel logLevel, unsigned groupMask) const
{
ENSURE_TARGET_AND_NAME;
target->setSpanAttribute(name, uint64_t(value));
}

void CTxOpenTelemetryConnector::forwardString(const char* txKey, const char* otKey, const char* value, LogLevel logLevel, unsigned groupMask) const
{
ENSURE_TARGET_AND_NAME;
target->setSpanAttribute(name, value);
}

#undef ENSURE_TARGET_AND_NAME

CTxOpenTelemetryConnector::CTxOpenTelemetryConnector()
: maxLogLevel(getTxSummaryLevel())
, groupSelector(getTxSummaryGroup())
{
}
Loading

0 comments on commit 27a2f2a

Please sign in to comment.