From 459053816aac5035c6e787b5b6189960ad0d722f Mon Sep 17 00:00:00 2001 From: Rodrigo Pastrana Date: Fri, 19 Jul 2024 22:34:25 -0400 Subject: [PATCH] HPCC-32281 Grafana/Loki logaccess 2nd phase improvements - Ensures proper log output format - Utilizes context log fields available - Updates grafana logaccess sample config - Replaces unnecessary funtion w/ inline logic - Removes invalid wildcard filter restriction - Trims log data at server due to added json newlines - Reenables several stream columns Signed-off-by: Rodrigo Pastrana --- .../loki-stack/grafana-hpcc-logaccess.yaml | 16 +- .../Grafana/CurlClient/GrafanaCurlClient.cpp | 161 ++++++++++++------ .../Grafana/CurlClient/GrafanaCurlClient.hpp | 16 +- 3 files changed, 132 insertions(+), 61 deletions(-) diff --git a/helm/managed/logging/loki-stack/grafana-hpcc-logaccess.yaml b/helm/managed/logging/loki-stack/grafana-hpcc-logaccess.yaml index 70d09058960..1b2375f09c8 100644 --- a/helm/managed/logging/loki-stack/grafana-hpcc-logaccess.yaml +++ b/helm/managed/logging/loki-stack/grafana-hpcc-logaccess.yaml @@ -21,17 +21,27 @@ global: - type: "components" storeName: "stream" searchColumn: "component" - columnMode: "MIN" + columnMode: "ALL" columnType: "string" - type: "timestamp" storeName: "values" - searchColumn: "time" - columnMode: "ALL" + searchColumn: "tsNs" + columnMode: "MIN" columnType: "datetime" - type: "pod" storeName: "stream" searchColumn: "pod" + columnMode: "DEFAULT" + columnType: "string" + - type: "message" + storeName: "values" + searchColumn: "log" + columnMode: "MIN" + columnType: "string" + - type: "node" + storeName: "stream" columnMode: "ALL" + searchColumn: "node_name" columnType: "string" secrets: esp: diff --git a/system/logaccess/Grafana/CurlClient/GrafanaCurlClient.cpp b/system/logaccess/Grafana/CurlClient/GrafanaCurlClient.cpp index 8e58aa8c596..c9db448f9fe 100644 --- a/system/logaccess/Grafana/CurlClient/GrafanaCurlClient.cpp +++ b/system/logaccess/Grafana/CurlClient/GrafanaCurlClient.cpp @@ -190,18 +190,25 @@ void GrafanaLogAccessCurlClient::processDatasourceJsonResp(const std::string & r } /* - * This method consumes a logLine string from a successful Grafana Loki query - * The LogLine is wrapped in the desired output format + * This method consumes a logLine represented as a set of field names and values + * The LogLine is wrapped in the requested output format */ -void formatResultLine(StringBuffer & returnbuf, const char * resultLine, const char * resultLineName, LogAccessLogFormat format, bool & isFirstLine) +void formatResultLine(StringBuffer & returnbuf, const IProperties * resultLine, LogAccessLogFormat format, bool & isFirstLine) { switch (format) { case LOGACCESS_LOGFORMAT_xml: { - returnbuf.appendf("<%s>", resultLineName); - encodeXML(resultLine, returnbuf); - returnbuf.appendf("", resultLineName); + returnbuf.append(""); + Owned fieldsIter = resultLine->getIterator(); + ForEach(*fieldsIter) + { + const char * prop = resultLine->queryProp(fieldsIter->getPropKey()); + returnbuf.appendf("<%s>", fieldsIter->getPropKey()); + encodeXML(prop, returnbuf); + returnbuf.appendf("", fieldsIter->getPropKey()); + } + returnbuf.appendf(""); isFirstLine = false; break; } @@ -210,15 +217,42 @@ void formatResultLine(StringBuffer & returnbuf, const char * resultLine, const c if (!isFirstLine) returnbuf.append(", "); - returnbuf.append("\""); - encodeJSON(returnbuf,resultLine); - returnbuf.append("\""); + returnbuf.appendf("{\"fields\": [ "); + bool firstField = true; + Owned fieldsIter = resultLine->getIterator(); + ForEach(*fieldsIter) + { + if (!firstField) + returnbuf.append(", "); + else + firstField = false; + + const char * prop = resultLine->queryProp(fieldsIter->getPropKey()); + returnbuf.appendf("{\"%s\":\"", fieldsIter->getPropKey()); + encodeJSON(returnbuf,prop); + returnbuf.append("\"}"); + } + returnbuf.append(" ]}"); + isFirstLine = false; break; } case LOGACCESS_LOGFORMAT_csv: { - encodeCSVColumn(returnbuf, resultLine); //Currently treating entire log line as a single CSV column + bool firstField = true; + Owned fieldsIter = resultLine->getIterator(); + ForEach(*fieldsIter) + { + if (!firstField) + returnbuf.append(", "); + else + firstField = false; + + StringBuffer prop; + resultLine->getProp(fieldsIter->getPropKey(), prop); + encodeCSVColumn(returnbuf, prop); + } + returnbuf.newline(); isFirstLine = false; break; @@ -230,18 +264,49 @@ void formatResultLine(StringBuffer & returnbuf, const char * resultLine, const c /* * This method consumes an Iterator of values elements from a successful Grafana Loki query - * It ignores the 1st child (ingest timestamp in ns), and formats the 2nd child (log line) into the desired format + * It extracts the appropriate "stream" values based on return column mode, and the values' 1st and 2nd children + * which represent timestamp in ns, and the log line, and formats into the requested format */ -void processValues(StringBuffer & returnbuf, IPropertyTreeIterator * valuesIter, LogAccessLogFormat format, bool & isFirstLine) +void GrafanaLogAccessCurlClient::processValues(StringBuffer & returnbuf, IPropertyTreeIterator * valuesIter, IPropertyTree * stream, LogAccessLogFormat format, const LogAccessReturnColsMode retcolmode, bool & isFirstLine) { + Owned fieldValues = createProperties(true); + + //extract the requested fields from the stream if it's available + if (stream) + { + switch (retcolmode) + { + case RETURNCOLS_MODE_all: + { + fieldValues->setProp(m_nodeColumn.name, stream->queryProp(m_nodeColumn.name)); + fieldValues->setProp(m_containerColumn.name, stream->queryProp(m_containerColumn.name)); + fieldValues->setProp(m_instanceColumn.name, stream->queryProp(m_instanceColumn.name)); + } + case RETURNCOLS_MODE_default: + { + fieldValues->setProp(m_podColumn.name, stream->queryProp(m_podColumn.name)); + } + case RETURNCOLS_MODE_min: + { + fieldValues->setProp(m_logDateTimstampColumn.name, stream->queryProp(m_logDateTimstampColumn.name)); + fieldValues->setProp(m_messageColumn.name, stream->queryProp(m_messageColumn.name)); + break; + } + case RETURNCOLS_MODE_custom: //not supported yet + default: + break; + } + } + ForEach(*valuesIter) { IPropertyTree & values = valuesIter->query(); int numofvalues = values.getCount("values"); if (values.getCount("values") == 2) { - //const char * insertTimeStamp = values.queryProp("values[1]"); - formatResultLine(returnbuf, values.queryProp("values[2]"), "line", format, isFirstLine); + fieldValues->setProp(m_logDateTimstampColumn.name, values.queryProp("values[1]")); + fieldValues->setProp(m_messageColumn.name, values.queryProp("values[2]")); + formatResultLine(returnbuf, fieldValues, format, isFirstLine); } else { @@ -268,14 +333,6 @@ inline void resultsWrapStart(StringBuffer & returnbuf, LogAccessLogFormat format break; } case LOGACCESS_LOGFORMAT_csv: - { - if (reportHeader) - { - returnbuf.append("line"); // this is the entire header for CSV if we're only reporting the line - returnbuf.newline(); - } - break; - } default: break; } @@ -305,31 +362,12 @@ inline void resultsWrapEnd(StringBuffer & returnbuf, LogAccessLogFormat format) } } -/* - * This method consumes JSON formatted elements from a successful Grafana Loki query - * It extracts all values elements processes them into the desired format - */ -void wrapResult(StringBuffer & returnbuf, IPropertyTree * result, LogAccessLogFormat format, bool & isFirstLine) -{ - Owned logLineIter; - - if (result->hasProp("values")) - { - logLineIter.setown(result->getElements("values")); - } - - processValues(returnbuf, logLineIter, format, isFirstLine); -} - /* * This method consumes the JSON response from a Grafana Loki query * It attempts to unwrap the response and extract the log payload, and reports it in the desired format */ -void GrafanaLogAccessCurlClient::processQueryJsonResp(LogQueryResultDetails & resultDetails, const std::string & retrievedDocument, StringBuffer & returnbuf, LogAccessLogFormat format, bool reportHeader) +void GrafanaLogAccessCurlClient::processQueryJsonResp(LogQueryResultDetails & resultDetails, const std::string & retrievedDocument, StringBuffer & returnbuf, const LogAccessLogFormat format, const LogAccessReturnColsMode retcolmode, bool reportHeader) { - resultDetails.totalReceived = 0; - resultDetails.totalAvailable = 0; - Owned tree = createPTreeFromJSONString(retrievedDocument.c_str()); if (!tree) throw makeStringExceptionV(-1, "%s: Could not parse log query response", COMPONENT_NAME); @@ -370,11 +408,18 @@ void GrafanaLogAccessCurlClient::processQueryJsonResp(LogQueryResultDetails & re bool isFirstLine = true; Owned resultIter = data->getElements("result"); - //many result elements can be returned, each with a unique set of labels + //many result elements can be returned, each with a unique set of labels and a common set of stream values ForEach(*resultIter) { IPropertyTree & result = resultIter->query(); - wrapResult(returnbuf, &result, format, isFirstLine); + Owned logLineIter; + + if (result.hasProp("values")) + { + logLineIter.setown(result.getElements("values")); + } + + processValues(returnbuf, logLineIter, result.queryPropTree("stream"), format, retcolmode, isFirstLine); } //Adds the format postfix to the return buffer @@ -465,9 +510,6 @@ void GrafanaLogAccessCurlClient::populateQueryFilterAndStreamSelector(StringBuff } case LOGACCESS_FILTER_wildcard: { - if (queryValue.isEmpty()) - throw makeStringExceptionV(-1, "%s: Wildcard filter cannot be empty!", COMPONENT_NAME); - DBGLOG("%s: Searching log entries by wildcard filter: '%s %s %s'...", COMPONENT_NAME, queryField.str(), queryOperator, queryValue.str()); break; } @@ -605,14 +647,14 @@ bool GrafanaLogAccessCurlClient::fetchLog(LogQueryResultDetails & resultDetails, //from https://grafana.com/docs/loki/latest/query/log_queries/ //Adding | json to your pipeline will extract all json properties as labels if the log line is a valid json document. Nested properties are flattened into label keys using the _ separator. logLineParser.set(" | json log"); //this parses the log entry and extracts the log field into a label - logLineParser.append(" | line_format \"{{.log}}\""); //Formats output line to only contain log label + logLineParser.append(" | line_format \"{{.log | trim}}\""); //Formats output line to only contain log label //This drops the stream, and various insert timestamps //we're always going to get a stream container, and a the log line... //the stream container contains unnecessary, and redundant lines //there's documentation of a 'drop' command whch doesn't work in practice //online recomendation is to clear those stream entries... - logLineParser.append(" | label_format log=\"\", filename=\"\", namespace=\"\", node_name=\"\", job=\"\"");// app=\"\", component=\"\", container=\"\", instance=\"\"); + logLineParser.append(" | label_format log=\"\", filename=\"\"");//, namespace=\"\", job=\"\""// app=\"\", component=\"\", container=\"\", instance=\"\"); /* we're not going to attempt to parse the log line for now, return the entire log line in raw format @@ -664,8 +706,7 @@ bool GrafanaLogAccessCurlClient::fetchLog(LogQueryResultDetails & resultDetails, std::string readBuffer; submitQuery(readBuffer, fullQuery.str()); - processQueryJsonResp(resultDetails, readBuffer, returnbuf, format, true); - //DBGLOG("Query fetchLog result: %s", readBuffer.c_str()); + processQueryJsonResp(resultDetails, readBuffer, returnbuf, format, options.getReturnColsMode(), true); } catch(IException * e) { @@ -832,6 +873,24 @@ GrafanaLogAccessCurlClient::GrafanaLogAccessCurlClient(IPropertyTree & logAccess if (logMap.hasProp(logMapSearchColAtt)) m_podColumn.name = logMap.queryProp(logMapSearchColAtt); } + else if (streq(logMapType, "message")) + { + if (logMap.hasProp(logMapIndexPatternAtt)) + if (strcmp(logMap.queryProp(logMapIndexPatternAtt), "stream")==0) + m_messageColumn.isStream = true; + + if (logMap.hasProp(logMapSearchColAtt)) + m_messageColumn.name = logMap.queryProp(logMapSearchColAtt); + } + else if (streq(logMapType, "timestamp")) + { + if (logMap.hasProp(logMapIndexPatternAtt)) + if (strcmp(logMap.queryProp(logMapIndexPatternAtt), "stream")==0) + m_logDateTimstampColumn.isStream = true; + + if (logMap.hasProp(logMapSearchColAtt)) + m_logDateTimstampColumn.name = logMap.queryProp(logMapSearchColAtt); + } else { ERRLOG("Encountered invalid LogAccess field map type: '%s'", logMapType); diff --git a/system/logaccess/Grafana/CurlClient/GrafanaCurlClient.hpp b/system/logaccess/Grafana/CurlClient/GrafanaCurlClient.hpp index fb6f71cff98..5953161f5a9 100644 --- a/system/logaccess/Grafana/CurlClient/GrafanaCurlClient.hpp +++ b/system/logaccess/Grafana/CurlClient/GrafanaCurlClient.hpp @@ -73,13 +73,14 @@ class GrafanaLogAccessCurlClient : public CInterfaceOf LogField m_instanceColumn = LogField("instance", true); LogField m_podColumn = LogField("pod", true); LogField m_containerColumn = LogField("container", true); - LogField m_messageColumn = LogField("MSG"); + LogField m_messageColumn = LogField("log"); LogField m_nodeColumn = LogField("node_name", true); - LogField m_logTimestampColumn = LogField("TIME"); - LogField m_logDatestampColumn = LogField("DATE"); - LogField m_logSequesnceColumn = LogField("MID"); - LogField m_logProcIDColumn = LogField("PID"); - LogField m_logThreadIDColumn = LogField("TID"); + LogField m_logDateTimstampColumn = LogField("tsNs"); + //LogField m_logTimestampColumn = LogField("TIME"); + //LogField m_logDatestampColumn = LogField("DATE"); + //LogField m_logSequesnceColumn = LogField("MID"); + //LogField m_logProcIDColumn = LogField("PID"); + //LogField m_logThreadIDColumn = LogField("TID"); //LogField m_logTraceIDColumn = LogField("TRC"); //LogField m_logSpanIDColumn = LogField("SPN"); @@ -87,8 +88,9 @@ class GrafanaLogAccessCurlClient : public CInterfaceOf public: GrafanaLogAccessCurlClient(IPropertyTree & logAccessPluginConfig); - void processQueryJsonResp(LogQueryResultDetails & resultDetails, const std::string & retrievedDocument, StringBuffer & returnbuf, LogAccessLogFormat format, bool reportHeader); + void processQueryJsonResp(LogQueryResultDetails & resultDetails, const std::string & retrievedDocument, StringBuffer & returnbuf, const LogAccessLogFormat format, const LogAccessReturnColsMode retcolmode, bool reportHeader); void processDatasourceJsonResp(const std::string & retrievedDocument); + void processValues(StringBuffer & returnbuf, IPropertyTreeIterator * valuesIter, IPropertyTree * stream, LogAccessLogFormat format, const LogAccessReturnColsMode retcolmode, bool & isFirstLine); void fetchDatasourceByName(const char * targetDataSourceName); void fetchDatasources(std::string & readBuffer); void fetchLabels(std::string & readBuffer);