-
Notifications
You must be signed in to change notification settings - Fork 304
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HPCC-32281 Grafana/Loki logaccess 2nd phase improvements #18900
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -190,18 +190,25 @@ void GrafanaLogAccessCurlClient::processDatasourceJsonResp(const std::string & r | |
} | ||
|
||
/* | ||
* This method consumes a logLine string from a successful Grafana Loki query | ||
* The LogLine is wrapped in the desired output format | ||
* This method consumes a logLine represented as a set of field names and values | ||
* The LogLine is wrapped in the requested output format | ||
*/ | ||
void formatResultLine(StringBuffer & returnbuf, const char * resultLine, const char * resultLineName, LogAccessLogFormat format, bool & isFirstLine) | ||
void formatResultLine(StringBuffer & returnbuf, const IProperties * resultLine, LogAccessLogFormat format, bool & isFirstLine) | ||
{ | ||
switch (format) | ||
{ | ||
case LOGACCESS_LOGFORMAT_xml: | ||
{ | ||
returnbuf.appendf("<%s>", resultLineName); | ||
encodeXML(resultLine, returnbuf); | ||
returnbuf.appendf("</%s>", resultLineName); | ||
returnbuf.append("<line>"); | ||
Owned<IPropertyIterator> fieldsIter = resultLine->getIterator(); | ||
ForEach(*fieldsIter) | ||
{ | ||
const char * prop = fieldsIter->queryPropValue(); | ||
returnbuf.appendf("<%s>", fieldsIter->getPropKey()); | ||
encodeXML(prop, returnbuf); | ||
returnbuf.appendf("</%s>", fieldsIter->getPropKey()); | ||
} | ||
returnbuf.appendf("</line>"); | ||
isFirstLine = false; | ||
break; | ||
} | ||
|
@@ -210,15 +217,41 @@ void formatResultLine(StringBuffer & returnbuf, const char * resultLine, const c | |
if (!isFirstLine) | ||
returnbuf.append(", "); | ||
|
||
returnbuf.append("\""); | ||
encodeJSON(returnbuf,resultLine); | ||
returnbuf.append("\""); | ||
returnbuf.appendf("{\"fields\": [ "); | ||
bool firstField = true; | ||
Owned<IPropertyIterator> fieldsIter = resultLine->getIterator(); | ||
ForEach(*fieldsIter) | ||
{ | ||
if (!firstField) | ||
returnbuf.append(", "); | ||
else | ||
firstField = false; | ||
|
||
const char * prop = fieldsIter->queryPropValue(); | ||
returnbuf.appendf("{\"%s\":\"", fieldsIter->getPropKey()); | ||
encodeJSON(returnbuf,prop); | ||
returnbuf.append("\"}"); | ||
} | ||
returnbuf.append(" ]}"); | ||
|
||
isFirstLine = false; | ||
break; | ||
} | ||
case LOGACCESS_LOGFORMAT_csv: | ||
{ | ||
encodeCSVColumn(returnbuf, resultLine); //Currently treating entire log line as a single CSV column | ||
bool firstField = true; | ||
Owned<IPropertyIterator> fieldsIter = resultLine->getIterator(); | ||
ForEach(*fieldsIter) | ||
{ | ||
if (!firstField) | ||
returnbuf.append(", "); | ||
else | ||
firstField = false; | ||
|
||
const char * fieldValue = resultLine->queryProp(fieldsIter->getPropKey()); | ||
encodeCSVColumn(returnbuf, fieldValue); | ||
} | ||
|
||
returnbuf.newline(); | ||
isFirstLine = false; | ||
break; | ||
|
@@ -230,18 +263,54 @@ void formatResultLine(StringBuffer & returnbuf, const char * resultLine, const c | |
|
||
/* | ||
* This method consumes an Iterator of values elements from a successful Grafana Loki query | ||
* It ignores the 1st child (ingest timestamp in ns), and formats the 2nd child (log line) into the desired format | ||
* It extracts the appropriate "stream" values based on return column mode, and the values' 1st and 2nd children | ||
* which represent timestamp in ns, and the log line, and formats into the requested format | ||
*/ | ||
void processValues(StringBuffer & returnbuf, IPropertyTreeIterator * valuesIter, LogAccessLogFormat format, bool & isFirstLine) | ||
void GrafanaLogAccessCurlClient::processValues(StringBuffer & returnbuf, IPropertyTreeIterator * valuesIter, IPropertyTree * stream, LogAccessLogFormat format, const LogAccessReturnColsMode retcolmode, bool & isFirstLine) | ||
{ | ||
if (!valuesIter) | ||
return; | ||
|
||
Owned<IProperties> fieldValues = createProperties(true); | ||
|
||
//extract the requested fields from the stream if it's available | ||
if (stream) | ||
{ | ||
switch (retcolmode) | ||
{ | ||
case RETURNCOLS_MODE_all: | ||
{ | ||
fieldValues->setProp(m_nodeColumn.name, stream->queryProp(m_nodeColumn.name)); | ||
fieldValues->setProp(m_containerColumn.name, stream->queryProp(m_containerColumn.name)); | ||
fieldValues->setProp(m_instanceColumn.name, stream->queryProp(m_instanceColumn.name)); | ||
[[fallthrough]]; | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Needs a comment //fallthrough whenever a case falls through to the next case. I initially read the code incorrectly because it wasn't there, and coverity will also complain. |
||
case RETURNCOLS_MODE_default: | ||
{ | ||
fieldValues->setProp(m_podColumn.name, stream->queryProp(m_podColumn.name)); | ||
[[fallthrough]]; | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. //fallthrough |
||
case RETURNCOLS_MODE_min: | ||
{ | ||
fieldValues->setProp(m_logDateTimstampColumn.name, stream->queryProp(m_logDateTimstampColumn.name)); | ||
fieldValues->setProp(m_messageColumn.name, stream->queryProp(m_messageColumn.name)); | ||
break; | ||
} | ||
case RETURNCOLS_MODE_custom: //not supported yet | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need to fail and let user know |
||
default: | ||
break; | ||
} | ||
} | ||
|
||
ForEach(*valuesIter) | ||
{ | ||
IPropertyTree & values = valuesIter->query(); | ||
int numofvalues = values.getCount("values"); | ||
if (values.getCount("values") == 2) | ||
{ | ||
//const char * insertTimeStamp = values.queryProp("values[1]"); | ||
formatResultLine(returnbuf, values.queryProp("values[2]"), "line", format, isFirstLine); | ||
fieldValues->setProp(m_logDateTimstampColumn.name, values.queryProp("values[1]")); | ||
fieldValues->setProp(m_messageColumn.name, values.queryProp("values[2]")); | ||
formatResultLine(returnbuf, fieldValues, format, isFirstLine); | ||
} | ||
else | ||
{ | ||
|
@@ -268,14 +337,6 @@ inline void resultsWrapStart(StringBuffer & returnbuf, LogAccessLogFormat format | |
break; | ||
} | ||
case LOGACCESS_LOGFORMAT_csv: | ||
{ | ||
if (reportHeader) | ||
{ | ||
returnbuf.append("line"); // this is the entire header for CSV if we're only reporting the line | ||
returnbuf.newline(); | ||
} | ||
break; | ||
} | ||
default: | ||
break; | ||
} | ||
|
@@ -305,31 +366,12 @@ inline void resultsWrapEnd(StringBuffer & returnbuf, LogAccessLogFormat format) | |
} | ||
} | ||
|
||
/* | ||
* This method consumes JSON formatted elements from a successful Grafana Loki query | ||
* It extracts all values elements processes them into the desired format | ||
*/ | ||
void wrapResult(StringBuffer & returnbuf, IPropertyTree * result, LogAccessLogFormat format, bool & isFirstLine) | ||
{ | ||
Owned<IPropertyTreeIterator> logLineIter; | ||
|
||
if (result->hasProp("values")) | ||
{ | ||
logLineIter.setown(result->getElements("values")); | ||
} | ||
|
||
processValues(returnbuf, logLineIter, format, isFirstLine); | ||
} | ||
|
||
/* | ||
* This method consumes the JSON response from a Grafana Loki query | ||
* It attempts to unwrap the response and extract the log payload, and reports it in the desired format | ||
*/ | ||
void GrafanaLogAccessCurlClient::processQueryJsonResp(LogQueryResultDetails & resultDetails, const std::string & retrievedDocument, StringBuffer & returnbuf, LogAccessLogFormat format, bool reportHeader) | ||
void GrafanaLogAccessCurlClient::processQueryJsonResp(LogQueryResultDetails & resultDetails, const std::string & retrievedDocument, StringBuffer & returnbuf, const LogAccessLogFormat format, const LogAccessReturnColsMode retcolmode, bool reportHeader) | ||
{ | ||
resultDetails.totalReceived = 0; | ||
resultDetails.totalAvailable = 0; | ||
|
||
Owned<IPropertyTree> tree = createPTreeFromJSONString(retrievedDocument.c_str()); | ||
if (!tree) | ||
throw makeStringExceptionV(-1, "%s: Could not parse log query response", COMPONENT_NAME); | ||
|
@@ -370,11 +412,17 @@ void GrafanaLogAccessCurlClient::processQueryJsonResp(LogQueryResultDetails & re | |
|
||
bool isFirstLine = true; | ||
Owned<IPropertyTreeIterator> resultIter = data->getElements("result"); | ||
//many result elements can be returned, each with a unique set of labels | ||
//many result elements can be returned, each with a unique set of labels and a common set of stream values | ||
ForEach(*resultIter) | ||
{ | ||
IPropertyTree & result = resultIter->query(); | ||
wrapResult(returnbuf, &result, format, isFirstLine); | ||
Owned<IPropertyTreeIterator> logLineIter; | ||
|
||
if (result.hasProp("values")) | ||
{ | ||
logLineIter.setown(result.getElements("values")); // if no values elements found, will get NullPTreeIterator | ||
processValues(returnbuf, logLineIter, result.queryPropTree("stream"), format, retcolmode, isFirstLine); | ||
} | ||
} | ||
|
||
//Adds the format postfix to the return buffer | ||
|
@@ -465,9 +513,6 @@ void GrafanaLogAccessCurlClient::populateQueryFilterAndStreamSelector(StringBuff | |
} | ||
case LOGACCESS_FILTER_wildcard: | ||
{ | ||
if (queryValue.isEmpty()) | ||
throw makeStringExceptionV(-1, "%s: Wildcard filter cannot be empty!", COMPONENT_NAME); | ||
|
||
DBGLOG("%s: Searching log entries by wildcard filter: '%s %s %s'...", COMPONENT_NAME, queryField.str(), queryOperator, queryValue.str()); | ||
break; | ||
} | ||
|
@@ -605,14 +650,14 @@ bool GrafanaLogAccessCurlClient::fetchLog(LogQueryResultDetails & resultDetails, | |
//from https://grafana.com/docs/loki/latest/query/log_queries/ | ||
//Adding | json to your pipeline will extract all json properties as labels if the log line is a valid json document. Nested properties are flattened into label keys using the _ separator. | ||
logLineParser.set(" | json log"); //this parses the log entry and extracts the log field into a label | ||
logLineParser.append(" | line_format \"{{.log}}\""); //Formats output line to only contain log label | ||
logLineParser.append(" | line_format \"{{.log | trim}}\""); //Formats output line to only contain log label | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the line_format output doesn't clean up all json new lines |
||
//This drops the stream, and various insert timestamps | ||
|
||
//we're always going to get a stream container, and a the log line... | ||
//the stream container contains unnecessary, and redundant lines | ||
//there's documentation of a 'drop' command whch doesn't work in practice | ||
//online recomendation is to clear those stream entries... | ||
logLineParser.append(" | label_format log=\"\", filename=\"\", namespace=\"\", node_name=\"\", job=\"\"");// app=\"\", component=\"\", container=\"\", instance=\"\"); | ||
logLineParser.append(" | label_format log=\"\", filename=\"\"");//, namespace=\"\", job=\"\""// app=\"\", component=\"\", container=\"\", instance=\"\"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. some of the "stream" values are useful, don't clear them |
||
|
||
/* we're not going to attempt to parse the log line for now, | ||
return the entire log line in raw format | ||
|
@@ -664,8 +709,7 @@ bool GrafanaLogAccessCurlClient::fetchLog(LogQueryResultDetails & resultDetails, | |
std::string readBuffer; | ||
submitQuery(readBuffer, fullQuery.str()); | ||
|
||
processQueryJsonResp(resultDetails, readBuffer, returnbuf, format, true); | ||
//DBGLOG("Query fetchLog result: %s", readBuffer.c_str()); | ||
processQueryJsonResp(resultDetails, readBuffer, returnbuf, format, options.getReturnColsMode(), true); | ||
} | ||
catch(IException * e) | ||
{ | ||
|
@@ -676,6 +720,19 @@ bool GrafanaLogAccessCurlClient::fetchLog(LogQueryResultDetails & resultDetails, | |
return false; | ||
} | ||
|
||
void processLogMapConfig(const IPropertyTree * logMapConfig, LogField * targetField) | ||
{ | ||
if (!logMapConfig || !targetField) | ||
return; | ||
|
||
if (logMapConfig->hasProp(logMapIndexPatternAtt)) | ||
if (strcmp(logMapConfig->queryProp(logMapIndexPatternAtt), "stream")==0) | ||
targetField->isStream = true; | ||
|
||
if (logMapConfig->hasProp(logMapSearchColAtt)) | ||
targetField->name = logMapConfig->queryProp(logMapSearchColAtt); | ||
} | ||
|
||
GrafanaLogAccessCurlClient::GrafanaLogAccessCurlClient(IPropertyTree & logAccessPluginConfig) | ||
{ | ||
m_pluginCfg.set(&logAccessPluginConfig); | ||
|
@@ -769,73 +826,29 @@ GrafanaLogAccessCurlClient::GrafanaLogAccessCurlClient(IPropertyTree & logAccess | |
IPropertyTree & logMap = logMapIter->query(); | ||
const char * logMapType = logMap.queryProp("@type"); | ||
if (streq(logMapType, "global")) | ||
{ | ||
if (logMap.hasProp(logMapIndexPatternAtt)) | ||
if (strcmp(logMap.queryProp(logMapIndexPatternAtt), "stream")==0) | ||
m_globalSearchCol.isStream = true; | ||
|
||
if (logMap.hasProp(logMapSearchColAtt)) | ||
m_globalSearchCol.name = logMap.queryProp(logMapSearchColAtt); | ||
} | ||
processLogMapConfig(&logMap, &m_globalSearchCol); | ||
else if (streq(logMapType, "workunits")) | ||
{ | ||
if (logMap.hasProp(logMapSearchColAtt)) | ||
m_workunitsColumn = logMap.queryProp(logMapSearchColAtt); | ||
} | ||
processLogMapConfig(&logMap, &m_workunitsColumn); | ||
else if (streq(logMapType, "components")) | ||
{ | ||
if (logMap.hasProp(logMapIndexPatternAtt)) | ||
if (strcmp(logMap.queryProp(logMapIndexPatternAtt), "stream")==0) | ||
m_componentsColumn.isStream = true; | ||
|
||
if (logMap.hasProp(logMapSearchColAtt)) | ||
m_componentsColumn.name = logMap.queryProp(logMapSearchColAtt); | ||
} | ||
processLogMapConfig(&logMap, &m_componentsColumn); | ||
else if (streq(logMapType, "class")) | ||
{ | ||
if (logMap.hasProp(logMapSearchColAtt)) | ||
m_classColumn = logMap.queryProp(logMapSearchColAtt); | ||
} | ||
processLogMapConfig(&logMap, &m_classColumn); | ||
else if (streq(logMapType, "audience")) | ||
{ | ||
if (logMap.hasProp(logMapSearchColAtt)) | ||
m_audienceColumn = logMap.queryProp(logMapSearchColAtt); | ||
} | ||
processLogMapConfig(&logMap, &m_audienceColumn); | ||
else if (streq(logMapType, "instance")) | ||
{ | ||
if (logMap.hasProp(logMapIndexPatternAtt)) | ||
if (strcmp(logMap.queryProp(logMapIndexPatternAtt), "stream")==0) | ||
m_instanceColumn.isStream = true; | ||
|
||
if (logMap.hasProp(logMapSearchColAtt)) | ||
m_instanceColumn.name = logMap.queryProp(logMapSearchColAtt); | ||
} | ||
processLogMapConfig(&logMap, &m_instanceColumn); | ||
else if (streq(logMapType, "node")) | ||
{ | ||
if (logMap.hasProp(logMapIndexPatternAtt)) | ||
if (strcmp(logMap.queryProp(logMapIndexPatternAtt), "stream")==0) | ||
m_nodeColumn.isStream = true; | ||
|
||
if (logMap.hasProp(logMapSearchColAtt)) | ||
m_nodeColumn.name = logMap.queryProp(logMapSearchColAtt); | ||
} | ||
processLogMapConfig(&logMap, &m_nodeColumn); | ||
else if (streq(logMapType, "host")) | ||
{ | ||
OWARNLOG("%s: 'host' LogMap entry is NOT supported!", COMPONENT_NAME); | ||
} | ||
else if (streq(logMapType, "pod")) | ||
{ | ||
if (logMap.hasProp(logMapIndexPatternAtt)) | ||
if (strcmp(logMap.queryProp(logMapIndexPatternAtt), "stream")==0) | ||
m_podColumn.isStream = true; | ||
|
||
if (logMap.hasProp(logMapSearchColAtt)) | ||
m_podColumn.name = logMap.queryProp(logMapSearchColAtt); | ||
} | ||
processLogMapConfig(&logMap, &m_podColumn); | ||
else if (streq(logMapType, "message")) | ||
processLogMapConfig(&logMap, &m_messageColumn); | ||
else if (streq(logMapType, "timestamp")) | ||
processLogMapConfig(&logMap, &m_logDateTimstampColumn); | ||
else | ||
{ | ||
ERRLOG("Encountered invalid LogAccess field map type: '%s'", logMapType); | ||
} | ||
} | ||
|
||
DBGLOG("%s: targeting: '%s' - datasource: '%s'", COMPONENT_NAME, m_grafanaConnectionStr.str(), m_dataSourcesAPIURI.str()); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
C17 feature, not sure if there's any concern using c17 specific constructs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice, I hadn't picked up on that feature. We require c++17, so no problem using the feature - it has the advantage it will be checked by the compiler.