Skip to content

Commit

Permalink
HPCC-30349 Add open telemetry support to esp
Browse files Browse the repository at this point in the history
Signed-off-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday committed Sep 28, 2023
1 parent 12afc8f commit d1c48cd
Show file tree
Hide file tree
Showing 12 changed files with 110 additions and 121 deletions.
9 changes: 9 additions & 0 deletions common/workunit/workunit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14644,6 +14644,9 @@ static_assert(_elements_in(traceDebugOptions) == _elements_in(traceHeaderNames),

IProperties * extractTraceDebugOptions(IConstWorkUnit * source)
{
if (!source)
return nullptr;

Owned<IProperties> target = createProperties(true);
SCMStringBuffer temp;
for (unsigned i=0; i < _elements_in(traceDebugOptions); i++)
Expand All @@ -14662,6 +14665,9 @@ IProperties * extractTraceDebugOptions(IConstWorkUnit * source)

IProperties * deserializeTraceDebugOptions(const IPropertyTree * debugOptions)
{
if (!debugOptions)
return nullptr;

Owned<IProperties> target = createProperties(true);
if (debugOptions)
{
Expand All @@ -14681,6 +14687,9 @@ IProperties * deserializeTraceDebugOptions(const IPropertyTree * debugOptions)

void recordTraceDebugOptions(IWorkUnit * target, const IProperties * source)
{
if (!source)
return;

for (unsigned i=0; i < _elements_in(traceDebugOptions); i++)
{
const char * headerName = traceHeaderNames[i];
Expand Down
2 changes: 1 addition & 1 deletion esp/bindings/http/platform/httpservice.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ int CEspHttpServer::processRequest()
}
ctx->addTraceSummaryValue(LogMin, "custom_fields.URL", url.str(), TXSUMMARY_GRP_ENTERPRISE);

m_response->setHeader(HTTP_HEADER_HPCC_GLOBAL_ID, ctx->getGlobalId());
m_response->setHeader(kGlobalIdHttpHeaderName, ctx->getGlobalId());

if(strieq(method.str(), OPTIONS_METHOD))
return onOptions();
Expand Down
16 changes: 5 additions & 11 deletions esp/bindings/http/platform/httptransport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1940,17 +1940,11 @@ void CHttpRequest::updateContext()
m_context->setUseragent(useragent.str());
getHeader("Accept-Language", acceptLanguage);
m_context->setAcceptLanguage(acceptLanguage.str());
StringBuffer callerId, globalId;
getHeader(HTTP_HEADER_HPCC_GLOBAL_ID, globalId);
if (globalId.isEmpty())
getHeader("hpcc-global-id", globalId);
if(globalId.length())
m_context->setGlobalId(globalId);
getHeader(HTTP_HEADER_HPCC_CALLER_ID, callerId);
if (callerId.isEmpty())
getHeader("hpcc-caller-id", callerId);
if(callerId.length())
m_context->setCallerId(callerId);

//MORE: The previous code would be better off querying httpHeaders...
Owned<IProperties> httpHeaders = getHeadersAsProperties(m_headers);
Owned<ISpan> requestSpan = queryTraceManager().createServerSpan("request", httpHeaders, SpanFlags::EnsureGlobalId);
m_context->setActiveSpan(requestSpan);
}
}

Expand Down
2 changes: 0 additions & 2 deletions esp/bindings/http/platform/httptransport.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ enum MessageLogFlag
#define HTTP_HEADER_CONTENT_ENCODING "Content-Encoding"
#define HTTP_HEADER_TRANSFER_ENCODING "Transfer-Encoding"
#define HTTP_HEADER_ACCEPT_ENCODING "Accept-Encoding"
#define HTTP_HEADER_HPCC_GLOBAL_ID "Global-Id"
#define HTTP_HEADER_HPCC_CALLER_ID "Caller-Id"

class esp_http_decl CHttpMessage : implements IHttpMessage, public CInterface
{
Expand Down
11 changes: 6 additions & 5 deletions esp/platform/esp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,11 +235,12 @@ interface IEspContext : extends IInterface
virtual void setRequest(IHttpMessage* req) = 0;
virtual IHttpMessage* queryRequest() = 0;

virtual void setGlobalId(const char* id)=0;
virtual const char* getGlobalId()=0;
virtual void setCallerId(const char* id)=0;
virtual const char* getCallerId()=0;
virtual const char* getLocalId()=0;
virtual void setActiveSpan(ISpan * span)=0;
virtual ISpan * queryActiveSpan() const = 0;
virtual IProperties * getClientHeaders() const = 0;
virtual const char* getGlobalId() const = 0;
virtual const char* getCallerId() const = 0;
virtual const char* getLocalId() const = 0;
};


Expand Down
42 changes: 24 additions & 18 deletions esp/platform/espcontext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,7 @@ class CEspContext : public CInterface, implements IEspContext
Owned<IEspSecureContextEx> m_secureContext;

StringAttr m_transactionID;
StringBuffer m_globalId;
StringBuffer m_localId;
StringBuffer m_callerId;
Owned<ISpan> m_activeSpan;
IHttpMessage* m_request;

public:
Expand All @@ -116,9 +114,6 @@ class CEspContext : public CInterface, implements IEspContext
updateTraceSummaryHeader();
m_secureContext.setown(secureContext);
m_SecurityHandler.setSecureContext(secureContext);
appendGloballyUniqueId(m_localId);
// use localId as globalId unless we receive another
m_globalId.set(m_localId);
}

~CEspContext()
Expand Down Expand Up @@ -630,27 +625,38 @@ class CEspContext : public CInterface, implements IEspContext
{
return m_request;
}

virtual void setGlobalId(const char* id)
virtual void setActiveSpan(ISpan * span) override
{
m_activeSpan.set(span);
}
virtual ISpan * queryActiveSpan() const override
{
m_globalId.set(id);
return m_activeSpan;
}
virtual const char* getGlobalId()
//GH Can these be deleted?
virtual const char* getGlobalId() const override
{
return m_globalId.str();
if (!m_activeSpan)
return nullptr;
return m_activeSpan->queryGlobalId();
}
virtual void setCallerId(const char* id)
virtual const char* getCallerId() const override
{
m_callerId.set(id);
if (!m_activeSpan)
return nullptr;
return m_activeSpan->queryCallerId();
}
virtual const char* getCallerId()
virtual const char* getLocalId() const override
{
return m_callerId.str();
if (!m_activeSpan)
return nullptr;
return m_activeSpan->queryLocalId();
}
// No setLocalId() - it should be set once only when constructed
virtual const char* getLocalId()
virtual IProperties * getClientHeaders() const override
{
return m_localId.str();
if (!m_activeSpan)
return nullptr;
return ::getClientHeaders(m_activeSpan);
}
};

Expand Down
9 changes: 6 additions & 3 deletions esp/services/esdl_svc_engine/esdl_binding.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1571,9 +1571,12 @@ void EsdlServiceImpl::sendTargetSOAP(IEspContext & context,
httpclient->setPassword(password.str());
}

Owned<IProperties> headers = createProperties();
headers->setProp(HTTP_HEADER_HPCC_GLOBAL_ID, context.getGlobalId());
headers->setProp(HTTP_HEADER_HPCC_CALLER_ID, context.getLocalId());
Owned<ISpan> clientSpan;
ISpan * activeSpan = context.queryActiveSpan();
if (activeSpan)
clientSpan.setown(activeSpan->createClientSpan("soapcall"));

Owned<IProperties> headers = ::getClientHeaders(clientSpan);
StringBuffer status;
StringBuffer clreq(req);

Expand Down
41 changes: 12 additions & 29 deletions esp/services/ws_ecl/ws_ecl_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1993,20 +1993,18 @@ int CWsEclBinding::submitWsEclWorkunit(IEspContext & context, WsEclWuInfo &wsinf
StringAttr wuid(workunit->queryWuid()); // NB queryWuid() not valid after workunit,clear()

bool noTimeout = false;
if (httpreq)

Owned<ISpan> clientSpan;
ISpan * activeSpan = context.queryActiveSpan();
if (activeSpan)
{
StringBuffer globalId, callerId;
wsecl->getHttpGlobalIdHeader(httpreq, globalId);
wsecl->getHttpCallerIdHeader(httpreq, callerId);
if (globalId.length())
{
workunit->setDebugValue("GlobalId", globalId.str(), true);
clientSpan.setown(activeSpan->createClientSpan("wsecl"));
Owned<IProperties> httpHeaders = ::getClientHeaders(clientSpan);
recordTraceDebugOptions(workunit, httpHeaders);
}

StringBuffer localId;
appendGloballyUniqueId(localId);
workunit->setDebugValue("CallerId", localId.str(), true); //our localId becomes caller id for the next hop
DBGLOG("GlobalId: %s, CallerId: %s, LocalId: %s, Wuid: %s", globalId.str(), callerId.str(), localId.str(), wuid.str());
}
if (httpreq)
{
IProperties *params = httpreq->queryParameters();
if (params)
noTimeout = params->getPropBool(".noTimeout", false);
Expand Down Expand Up @@ -2087,27 +2085,12 @@ void CWsEclBinding::sendRoxieRequest(const char *target, StringBuffer &req, Stri
if (!trim)
url.append("?.trim=0");

Owned<IProperties> headers;
IEspContext * ctx = httpreq->queryContext();
Owned<IProperties> headers = ctx->getClientHeaders();
Owned<IHttpClient> httpclient = httpctx->createHttpClient(NULL, url);
bool noTimeout = false;
if (httpreq)
{
StringBuffer globalId, callerId;
wsecl->getHttpGlobalIdHeader(httpreq, globalId);
wsecl->getHttpCallerIdHeader(httpreq, callerId);

if (globalId.length())
{
headers.setown(createProperties());
headers->setProp(kGlobalIdHttpHeaderName, globalId);

StringBuffer localId;
appendGloballyUniqueId(localId);
if (localId.length())
headers->setProp(kCallerIdHttpHeaderName, localId);
DBGLOG("GlobalId: %s, CallerId: %s, LocalId: %s", globalId.str(), callerId.str(), localId.str());
}

IProperties *params = httpreq->queryParameters();
if (params)
noTimeout = params->getPropBool(".noTimeout", false);
Expand Down
13 changes: 0 additions & 13 deletions esp/services/ws_ecl/ws_ecl_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,19 +120,6 @@ class CWsEclService : public CInterface,
return false;
}

StringBuffer &getHttpGlobalIdHeader(CHttpRequest *request, StringBuffer &value)
{
if (!getHttpIdHeader(request, kGlobalIdHttpHeaderName, value))
getHttpIdHeader(request, kLegacyGlobalIdHttpHeaderName, value);
return value;
}
StringBuffer &getHttpCallerIdHeader(CHttpRequest *request, StringBuffer &value)
{
if (!getHttpIdHeader(request, kCallerIdHttpHeaderName, value))
getHttpIdHeader(request, kLegacyCallerIdHttpHeaderName, value);
return value;
}

bool unsubscribeServiceFromDali() override {return true;}
bool subscribeServiceToDali() override {return false;}
bool detachServiceFromDali() override
Expand Down
34 changes: 34 additions & 0 deletions system/jlib/jprop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,40 @@ IProperties *cloneProperties(const IProperties * source, bool nocase)
return clone.getClear();
}

//This works on arrays of string of the form x=y and x: y
void extractHeaders(IProperties * target, const StringArray & httpHeaders, char separator)
{
StringBuffer key;
ForEachItemIn(currentHeaderIndex, httpHeaders)
{
const char* httpHeader = httpHeaders.item(currentHeaderIndex);
if(isEmptyString(httpHeader))
continue;

const char* delineator = strchr(httpHeader, separator);
if ((delineator == nullptr) || (delineator == httpHeader))
continue;

const char * value = delineator + 1;
while (isspace(*value))
value++;

if (*value)
{
key.clear().append(delineator - httpHeader, httpHeader);
target->setProp(key, value);
}
}
}

IProperties * getHeadersAsProperties(const StringArray & httpHeaders, char separator)
{
Owned<IProperties> properties = createProperties(true);
extractHeaders(properties, httpHeaders, separator);
return properties.getClear();
}


static CProperties *sysProps = NULL;

extern jlib_decl IProperties *querySystemProperties()
Expand Down
2 changes: 2 additions & 0 deletions system/jlib/jprop.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ extern jlib_decl IProperties *createProperties(const char *filename, bool nocase
extern jlib_decl IProperties *cloneProperties(const IProperties * properties, bool nocase = false);
extern jlib_decl IProperties *querySystemProperties();
extern jlib_decl IProperties *getSystemProperties();
extern jlib_decl void extractHeaders(IProperties * target, const StringArray & httpHeaders, char separator = ':');
extern jlib_decl IProperties * getHeadersAsProperties(const StringArray & httpHeaders, char separator = ':');

#endif

50 changes: 11 additions & 39 deletions system/jlib/jtrace.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,19 +235,22 @@ class CSpan : public CInterfaceOf<ISpan>
if (ctxProps == nullptr)
return false;

if (!isEmptyString(hpccGlobalId.get()))
ctxProps->setProp(kGlobalIdHttpHeaderName, hpccGlobalId.get());
const char * globalId = queryGlobalId();
if (!isEmptyString(globalId))
ctxProps->setProp(kGlobalIdHttpHeaderName, globalId);

if (otelFormatted)
{
//The localid is passed as the callerid for the client request....
if (!isEmptyString(hpccLocalId.get()))
ctxProps->setProp(kCallerIdHttpHeaderName, hpccLocalId.get());
const char * localId = queryLocalId();
if (!isEmptyString(localId))
ctxProps->setProp(kCallerIdHttpHeaderName, localId);
}
else
{
if (!isEmptyString(hpccCallerId.get()))
ctxProps->setProp(kCallerIdHttpHeaderName, hpccCallerId.get());
const char * callerId = queryCallerId();
if (!isEmptyString(callerId))
ctxProps->setProp(kCallerIdHttpHeaderName, callerId);
}

if (span == nullptr)
Expand Down Expand Up @@ -547,28 +550,6 @@ class CServerSpan : public CSpan
//Remote parent is declared via http headers from client call
opentelemetry::v1::trace::SpanContext remoteParentSpanCtx = opentelemetry::trace::SpanContext::GetInvalid();

void setSpanContext(StringArray & httpHeaders, const char kvDelineator, SpanFlags flags)
{
Owned<IProperties> contextProps = createProperties(true);
ForEachItemIn(currentHeaderIndex, httpHeaders)
{
const char* httpHeader = httpHeaders.item(currentHeaderIndex);
if(!httpHeader)
continue;

const char* delineator = strchr(httpHeader, kvDelineator);
if(delineator == nullptr)
continue;

StringBuffer key;
key.append(delineator - httpHeader, httpHeader);

contextProps->setProp(key, delineator + 1);
}

setSpanContext(contextProps, flags);
}

void setSpanContext(const IProperties * httpHeaders, SpanFlags flags)
{
if (httpHeaders)
Expand Down Expand Up @@ -608,8 +589,6 @@ class CServerSpan : public CSpan
opts.parent = remoteParentSpanCtx;
}
}

//Generate new HPCCGlobalID if not provided
}

bool getSpanContext(IProperties * ctxProps, bool otelFormatted) const override
Expand All @@ -629,14 +608,6 @@ class CServerSpan : public CSpan
}

public:
CServerSpan(const char * spanName, const char * tracerName_, StringArray & httpHeaders, SpanFlags flags)
: CSpan(spanName, tracerName_)
{
opts.kind = opentelemetry::trace::SpanKind::kServer;
setSpanContext(httpHeaders, ':', flags);
init();
}

CServerSpan(const char * spanName, const char * tracerName_, const IProperties * httpHeaders, SpanFlags flags)
: CSpan(spanName, tracerName_)
{
Expand Down Expand Up @@ -898,7 +869,8 @@ class CTraceManager : implements ITraceManager, public CInterface

ISpan * createServerSpan(const char * name, StringArray & httpHeaders, SpanFlags flags) override
{
return new CServerSpan(name, moduleName.get(), httpHeaders, flags);
Owned<IProperties> headerProperties = getHeadersAsProperties(httpHeaders);
return new CServerSpan(name, moduleName.get(), headerProperties, flags);
}

ISpan * createServerSpan(const char * name, const IProperties * httpHeaders, SpanFlags flags) override
Expand Down

0 comments on commit d1c48cd

Please sign in to comment.