Skip to content

Commit

Permalink
HPCC-31381 Forward trace summary values to Open Telemetry spans
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Klemm <[email protected]>
  • Loading branch information
Tim Klemm committed Apr 22, 2024
1 parent 2ca3375 commit 794cabc
Show file tree
Hide file tree
Showing 5 changed files with 378 additions and 8 deletions.
9 changes: 9 additions & 0 deletions esp/platform/esp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ typedef enum LogRequest_

#define TXSUMMARY_OUT_TEXT 0x00000001
#define TXSUMMARY_OUT_JSON 0x00000002
#define TXSUMMARY_FWD_OTEL 0x80000000

#define ESPCTX_NO_NAMESPACES 0x00000001
#define ESPCTX_WSDL 0x00000010
Expand All @@ -104,6 +105,7 @@ class CTxSummary;
interface IEspSecureContext;
interface IEspSecureContextEx;
class CumulativeTimer;
enum class TxUnits;

interface IEspContext : extends IInterface
{
Expand Down Expand Up @@ -197,9 +199,15 @@ interface IEspContext : extends IInterface

virtual CTxSummary* queryTxSummary()=0;
virtual void addTraceSummaryValue(unsigned logLevel, const char *name, const char *value, const unsigned int group = TXSUMMARY_GRP_CORE)=0;
virtual void addTraceSummaryValue(unsigned logLevel, const char *name, const char *value, const char* otName, unsigned group = TXSUMMARY_GRP_CORE)=0;
virtual void addTraceSummaryValue(unsigned logLevel, const char *name, __int64 value, const unsigned int group = TXSUMMARY_GRP_CORE)=0;
virtual void addTraceSummaryValue(unsigned logLevel, const char *name, __int64 value, const char* otName, unsigned group = TXSUMMARY_GRP_CORE)=0;
virtual void addTraceSummaryValue(unsigned logLevel, const char *name, __int64 value, TxUnits units, unsigned group = TXSUMMARY_GRP_CORE)=0;
virtual void addTraceSummaryValue(unsigned logLevel, const char *name, __int64 value, const char* otName, TxUnits units, unsigned group = TXSUMMARY_GRP_CORE)=0;
virtual void addTraceSummaryDoubleValue(unsigned logLevel, const char *name, double value, const unsigned int group = TXSUMMARY_GRP_CORE)=0;
virtual void addTraceSummaryDoubleValue(unsigned logLevel, const char *name, double value, const char* otName, unsigned group = TXSUMMARY_GRP_CORE)=0;
virtual void addTraceSummaryTimeStamp(unsigned logLevel, const char *name, const unsigned int group = TXSUMMARY_GRP_CORE)=0;
virtual void addTraceSummaryTimeStamp(unsigned logLevel, const char *name, const char* otName, unsigned group = TXSUMMARY_GRP_CORE)=0;
virtual void addTraceSummaryCumulativeTime(unsigned logLevel, const char* name, unsigned __int64 time, const unsigned int group = TXSUMMARY_GRP_CORE)=0;
virtual CumulativeTimer* queryTraceSummaryCumulativeTimer(unsigned logLevel, const char *name, const unsigned int group = TXSUMMARY_GRP_CORE)=0;
virtual void cancelTxSummary()=0;
Expand Down Expand Up @@ -236,6 +244,7 @@ interface IEspContext : extends IInterface
virtual IHttpMessage* queryRequest() = 0;

virtual void setRequestSpan(ISpan * span)=0; // Call this function to set the server span for the query. The spans's lifetime will match the lifetime of the context object.
inline void setActiveSpan(ISpan * span) { setRequestSpan(span); }
virtual ISpan * queryActiveSpan() const = 0;
virtual IProperties * getClientSpanHeaders() const = 0;
virtual const char* getGlobalId() const = 0;
Expand Down
42 changes: 37 additions & 5 deletions esp/platform/espcontext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ class CEspContext : public CInterface, implements IEspContext
, respSerializationFormat(ESPSerializationANY)
{
m_txSummary.setown(new CTxSummary(m_creationTime));
m_txSummary->setConnector(new CTxOpenTelemetryConnector(*this));
updateTraceSummaryHeader();
m_secureContext.setown(secureContext);
m_SecurityHandler.setSecureContext(secureContext);
Expand All @@ -123,7 +124,7 @@ class CEspContext : public CInterface, implements IEspContext
if (m_txSummary)
{
m_txSummary->tailor(this);
m_txSummary->log(getTxSummaryLevel(), getTxSummaryGroup(), getTxSummaryStyle());
m_txSummary->log(getTxSummaryLevel(), getTxSummaryGroup(), getTxSummaryStyle() | TXSUMMARY_FWD_OTEL);
}
}
virtual void addOptions(unsigned opts){options|=opts;}
Expand Down Expand Up @@ -512,28 +513,59 @@ class CEspContext : public CInterface, implements IEspContext
}

virtual void addTraceSummaryValue(LogLevel logLevel, const char *name, const char *value, const unsigned int group = TXSUMMARY_GRP_CORE)
{
addTraceSummaryValue(logLevel, name, value, nullptr, group);
}

virtual void addTraceSummaryValue(LogLevel logLevel, const char *name, const char *value, const char* otName, const unsigned int group = TXSUMMARY_GRP_CORE)
{
if (m_txSummary && !isEmptyString(name))
m_txSummary->append(name, value, logLevel, group);
m_txSummary->append(name, value, otName, TxUnits::NA, logLevel, group);
}

virtual void addTraceSummaryValue(LogLevel logLevel, const char *name, __int64 value, const unsigned int group = TXSUMMARY_GRP_CORE)
{
addTraceSummaryValue(logLevel, name, value, nullptr, TxUnits::NA, group);
}

virtual void addTraceSummaryValue(LogLevel logLevel, const char *name, __int64 value, const char* otName, unsigned group = TXSUMMARY_GRP_CORE) override
{
addTraceSummaryValue(logLevel, name, value, otName, TxUnits::NA, group);
}

virtual void addTraceSummaryValue(LogLevel logLevel, const char *name, __int64 value, TxUnits units, unsigned group = TXSUMMARY_GRP_CORE) override
{
addTraceSummaryValue(logLevel, name, value, nullptr, units, group);
}

virtual void addTraceSummaryValue(LogLevel logLevel, const char *name, __int64 value, const char* otName, TxUnits units, unsigned group = TXSUMMARY_GRP_CORE) override
{
if (m_txSummary && !isEmptyString(name))
m_txSummary->append(name, value, logLevel, group);
m_txSummary->append(name, value, otName, units, logLevel, group);
}

virtual void addTraceSummaryDoubleValue(LogLevel logLevel, const char *name, double value, const unsigned int group = TXSUMMARY_GRP_CORE)
{
addTraceSummaryDoubleValue(logLevel, name, value, nullptr, group);
}

virtual void addTraceSummaryDoubleValue(LogLevel logLevel, const char *name, double value, const char* otName, unsigned group = TXSUMMARY_GRP_CORE)
{
if (m_txSummary && !isEmptyString(name))
m_txSummary->append(name, value, logLevel, group);
m_txSummary->append(name, value, otName, TxUnits::NA, logLevel, group);
}

virtual void addTraceSummaryTimeStamp(LogLevel logLevel, const char *name, const unsigned int group = TXSUMMARY_GRP_CORE)
{
addTraceSummaryTimeStamp(logLevel, name, nullptr, group);
}

virtual void addTraceSummaryTimeStamp(LogLevel logLevel, const char *name, const char* otName, unsigned group = TXSUMMARY_GRP_CORE) override
{
if (m_txSummary && !isEmptyString(name))
m_txSummary->append(name, m_txSummary->getElapsedTime(), logLevel, group, "ms");
m_txSummary->append(name, m_txSummary->getElapsedTime(), otName, TxUnits::millis, logLevel, group, "ms");
}

virtual void flushTraceSummary()
{
updateTraceSummaryHeader();
Expand Down
90 changes: 90 additions & 0 deletions esp/platform/txsummary.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@
static auto pRequestCount = hpccMetrics::registerCounterMetric("esp.requests.received", "Number of requests received", SMeasureCount);
#endif

namespace
{
// Iteration of summary entries occurs without knowledge of the ssummary or its connector.
static thread_local CTxOpenTelemetryConnector* otelConnector = nullptr;
}

inline bool validate(const char* k)
{
// Empty or null keys are invalid
Expand Down Expand Up @@ -126,6 +132,11 @@ StringBuffer& CTxSummary::TxEntryTimer::serialize(StringBuffer& buf, const LogLe
buf.append(fullname);
buf.appendf("=%" I64F "u;", value->getTotalMillis());
}
else if (requestedStyle & TXSUMMARY_FWD_OTEL)
{
if (otelConnector)
otelConnector->forwardAttribute(name, nullptr, value->getTotalMillis(), TxUnits::millis, queryLogLevel(), queryGroup());
}

return buf;
}
Expand Down Expand Up @@ -517,6 +528,11 @@ void CTxSummary::setProfile(ITxSummaryProfile* profile)
m_profile.set(profile);
}

void CTxSummary::setConnector(CTxOpenTelemetryConnector* _connector)
{
connector.setown(_connector);
}

void CTxSummary::serialize(StringBuffer& buffer, const LogLevel logLevel, const unsigned int group, const unsigned int requestedStyle) const
{
CriticalBlock block(m_sync);
Expand Down Expand Up @@ -591,6 +607,15 @@ void CTxSummary::log(const LogLevel logLevel, const unsigned int requestedGroup,
serialize(summary.clear(), logLevel, requestedGroup, TXSUMMARY_OUT_JSON);
DBGLOG("%s", summary.str());
}

if (requestedStyle & TXSUMMARY_FWD_OTEL)
{
// String values have already been forwarded and should ignore this serialization
// request. Timer values have not been forwarded, and should forward their values.
otelConnector = connector.get();
serialize(summary.clear(), logLevel, requestedGroup, TXSUMMARY_FWD_OTEL);
otelConnector = nullptr;
}
}
}

Expand Down Expand Up @@ -663,3 +688,68 @@ CTxSummary::TxEntryBase* CTxSummary::queryEntry(const char* key)

return nullptr;
}

StringBuffer& CTxOpenTelemetryConnector::normalizeKey(const char* txKey, const char* otKey, StringBuffer& normalized) const
{
getSnakeCase(normalized, isEmptyString(otKey) ? txKey : otKey);
// Keys for custom attributes may be annotated to reflect the source here.
return normalized;
}

bool CTxOpenTelemetryConnector::isExcluded(const char* key, LogLevel logLevel, unsigned groupMask) const
{
if ((logLevel > maxLogLevel) || ((groupMask & groupSelector) != groupSelector) || isEmptyString(key))
return true;
// Redundant attributes may be excluded by key here. For example, if spans include values like
// the global ID, it may be preferred to not forward a trace summary-sourced copy.
return false;
}

#define ENSURE_TARGET_AND_NAME \
StringBuffer name; \
if (isExcluded(normalizeKey(txKey, otKey, name), logLevel, groupMask)) \
return; \
ISpan* target = context.queryActiveSpan(); \
if (!target) \
return

void CTxOpenTelemetryConnector::forwardUnsigned(const char* txKey, const char* otKey, uint64_t value, TxUnits units, LogLevel logLevel, unsigned groupMask) const
{
ENSURE_TARGET_AND_NAME;
target->setSpanAttribute(name, scale(value, units));
}

void CTxOpenTelemetryConnector::forwardSigned(const char* txKey, const char* otKey, int64_t value, TxUnits units, LogLevel logLevel, unsigned groupMask) const
{
ENSURE_TARGET_AND_NAME;
target->setSpanAttribute(name, uint64_t(scale(value, units)));
}

void CTxOpenTelemetryConnector::forwardDouble(const char* txKey, const char* otKey, double value, LogLevel logLevel, unsigned groupMask) const
{
ENSURE_TARGET_AND_NAME;
StringBuffer tmp;
tmp.append(value);
target->setSpanAttribute(name, tmp);
}

void CTxOpenTelemetryConnector::forwardBool(const char* txKey, const char* otKey, bool value, LogLevel logLevel, unsigned groupMask) const
{
ENSURE_TARGET_AND_NAME;
target->setSpanAttribute(name, uint64_t(value));
}

void CTxOpenTelemetryConnector::forwardString(const char* txKey, const char* otKey, const char* value, LogLevel logLevel, unsigned groupMask) const
{
ENSURE_TARGET_AND_NAME;
target->setSpanAttribute(name, value);
}

#undef ENSURE_TARGET_AND_NAME

CTxOpenTelemetryConnector::CTxOpenTelemetryConnector(IEspContext& _context)
: context(_context)
{
maxLogLevel = getTxSummaryLevel();
groupSelector = getTxSummaryGroup();
}
Loading

0 comments on commit 794cabc

Please sign in to comment.