diff --git a/esp/scm/ws_logaccess.ecm b/esp/scm/ws_logaccess.ecm index bd214a668fa..5232f983275 100644 --- a/esp/scm/ws_logaccess.ecm +++ b/esp/scm/ws_logaccess.ecm @@ -79,7 +79,8 @@ ESPenum LogColumnValueType : string str("string"), numeric("numeric"), datetime("datetime"), - enum("enum") + enum("enum"), + epoch("epoch") }; ESPStruct [nil_remove] LogColumn diff --git a/esp/services/ws_logaccess/WsLogAccessService.cpp b/esp/services/ws_logaccess/WsLogAccessService.cpp index 0e8b12c329d..98e4a98ad90 100644 --- a/esp/services/ws_logaccess/WsLogAccessService.cpp +++ b/esp/services/ws_logaccess/WsLogAccessService.cpp @@ -71,6 +71,10 @@ bool Cws_logaccessEx::onGetLogAccessInfo(IEspContext &context, IEspGetLogAccessI WARNLOG("Invalid col type found in logaccess logmap config"); } } + else + { + espLogColumn->setColumnType("string"); + } logColumns.append(*espLogColumn.getClear()); } else diff --git a/helm/managed/logging/loki-stack/grafana-hpcc-logaccess.yaml b/helm/managed/logging/loki-stack/grafana-hpcc-logaccess.yaml index 70d09058960..1b2375f09c8 100644 --- a/helm/managed/logging/loki-stack/grafana-hpcc-logaccess.yaml +++ b/helm/managed/logging/loki-stack/grafana-hpcc-logaccess.yaml @@ -21,17 +21,27 @@ global: - type: "components" storeName: "stream" searchColumn: "component" - columnMode: "MIN" + columnMode: "ALL" columnType: "string" - type: "timestamp" storeName: "values" - searchColumn: "time" - columnMode: "ALL" + searchColumn: "tsNs" + columnMode: "MIN" columnType: "datetime" - type: "pod" storeName: "stream" searchColumn: "pod" + columnMode: "DEFAULT" + columnType: "string" + - type: "message" + storeName: "values" + searchColumn: "log" + columnMode: "MIN" + columnType: "string" + - type: "node" + storeName: "stream" columnMode: "ALL" + searchColumn: "node_name" columnType: "string" secrets: esp: diff --git a/system/logaccess/Grafana/CurlClient/GrafanaCurlClient.cpp b/system/logaccess/Grafana/CurlClient/GrafanaCurlClient.cpp index 8e58aa8c596..505e2a82778 100644 --- a/system/logaccess/Grafana/CurlClient/GrafanaCurlClient.cpp +++ b/system/logaccess/Grafana/CurlClient/GrafanaCurlClient.cpp @@ -190,18 +190,25 @@ void GrafanaLogAccessCurlClient::processDatasourceJsonResp(const std::string & r } /* - * This method consumes a logLine string from a successful Grafana Loki query - * The LogLine is wrapped in the desired output format + * This method consumes a logLine represented as a set of field names and values + * The LogLine is wrapped in the requested output format */ -void formatResultLine(StringBuffer & returnbuf, const char * resultLine, const char * resultLineName, LogAccessLogFormat format, bool & isFirstLine) +void formatResultLine(StringBuffer & returnbuf, const IProperties * resultLine, LogAccessLogFormat format, bool & isFirstLine) { switch (format) { case LOGACCESS_LOGFORMAT_xml: { - returnbuf.appendf("<%s>", resultLineName); - encodeXML(resultLine, returnbuf); - returnbuf.appendf("", resultLineName); + returnbuf.append(""); + Owned fieldsIter = resultLine->getIterator(); + ForEach(*fieldsIter) + { + const char * prop = fieldsIter->queryPropValue(); + returnbuf.appendf("<%s>", fieldsIter->getPropKey()); + encodeXML(prop, returnbuf); + returnbuf.appendf("", fieldsIter->getPropKey()); + } + returnbuf.appendf(""); isFirstLine = false; break; } @@ -210,15 +217,41 @@ void formatResultLine(StringBuffer & returnbuf, const char * resultLine, const c if (!isFirstLine) returnbuf.append(", "); - returnbuf.append("\""); - encodeJSON(returnbuf,resultLine); - returnbuf.append("\""); + returnbuf.appendf("{\"fields\": [ "); + bool firstField = true; + Owned fieldsIter = resultLine->getIterator(); + ForEach(*fieldsIter) + { + if (!firstField) + returnbuf.append(", "); + else + firstField = false; + + const char * prop = fieldsIter->queryPropValue(); + returnbuf.appendf("{\"%s\":\"", fieldsIter->getPropKey()); + encodeJSON(returnbuf,prop); + returnbuf.append("\"}"); + } + returnbuf.append(" ]}"); + isFirstLine = false; break; } case LOGACCESS_LOGFORMAT_csv: { - encodeCSVColumn(returnbuf, resultLine); //Currently treating entire log line as a single CSV column + bool firstField = true; + Owned fieldsIter = resultLine->getIterator(); + ForEach(*fieldsIter) + { + if (!firstField) + returnbuf.append(", "); + else + firstField = false; + + const char * fieldValue = resultLine->queryProp(fieldsIter->getPropKey()); + encodeCSVColumn(returnbuf, fieldValue); + } + returnbuf.newline(); isFirstLine = false; break; @@ -230,18 +263,54 @@ void formatResultLine(StringBuffer & returnbuf, const char * resultLine, const c /* * This method consumes an Iterator of values elements from a successful Grafana Loki query - * It ignores the 1st child (ingest timestamp in ns), and formats the 2nd child (log line) into the desired format + * It extracts the appropriate "stream" values based on return column mode, and the values' 1st and 2nd children + * which represent timestamp in ns, and the log line, and formats into the requested format */ -void processValues(StringBuffer & returnbuf, IPropertyTreeIterator * valuesIter, LogAccessLogFormat format, bool & isFirstLine) +void GrafanaLogAccessCurlClient::processValues(StringBuffer & returnbuf, IPropertyTreeIterator * valuesIter, IPropertyTree * stream, LogAccessLogFormat format, const LogAccessReturnColsMode retcolmode, bool & isFirstLine) { + if (!valuesIter) + return; + + Owned fieldValues = createProperties(true); + + //extract the requested fields from the stream if it's available + if (stream) + { + switch (retcolmode) + { + case RETURNCOLS_MODE_all: + { + fieldValues->setProp(m_nodeColumn.name, stream->queryProp(m_nodeColumn.name)); + fieldValues->setProp(m_containerColumn.name, stream->queryProp(m_containerColumn.name)); + fieldValues->setProp(m_instanceColumn.name, stream->queryProp(m_instanceColumn.name)); + [[fallthrough]]; + } + case RETURNCOLS_MODE_default: + { + fieldValues->setProp(m_podColumn.name, stream->queryProp(m_podColumn.name)); + [[fallthrough]]; + } + case RETURNCOLS_MODE_min: + { + fieldValues->setProp(m_logDateTimstampColumn.name, stream->queryProp(m_logDateTimstampColumn.name)); + fieldValues->setProp(m_messageColumn.name, stream->queryProp(m_messageColumn.name)); + break; + } + case RETURNCOLS_MODE_custom: //not supported yet + default: + break; + } + } + ForEach(*valuesIter) { IPropertyTree & values = valuesIter->query(); int numofvalues = values.getCount("values"); if (values.getCount("values") == 2) { - //const char * insertTimeStamp = values.queryProp("values[1]"); - formatResultLine(returnbuf, values.queryProp("values[2]"), "line", format, isFirstLine); + fieldValues->setProp(m_logDateTimstampColumn.name, values.queryProp("values[1]")); + fieldValues->setProp(m_messageColumn.name, values.queryProp("values[2]")); + formatResultLine(returnbuf, fieldValues, format, isFirstLine); } else { @@ -268,14 +337,6 @@ inline void resultsWrapStart(StringBuffer & returnbuf, LogAccessLogFormat format break; } case LOGACCESS_LOGFORMAT_csv: - { - if (reportHeader) - { - returnbuf.append("line"); // this is the entire header for CSV if we're only reporting the line - returnbuf.newline(); - } - break; - } default: break; } @@ -305,31 +366,12 @@ inline void resultsWrapEnd(StringBuffer & returnbuf, LogAccessLogFormat format) } } -/* - * This method consumes JSON formatted elements from a successful Grafana Loki query - * It extracts all values elements processes them into the desired format - */ -void wrapResult(StringBuffer & returnbuf, IPropertyTree * result, LogAccessLogFormat format, bool & isFirstLine) -{ - Owned logLineIter; - - if (result->hasProp("values")) - { - logLineIter.setown(result->getElements("values")); - } - - processValues(returnbuf, logLineIter, format, isFirstLine); -} - /* * This method consumes the JSON response from a Grafana Loki query * It attempts to unwrap the response and extract the log payload, and reports it in the desired format */ -void GrafanaLogAccessCurlClient::processQueryJsonResp(LogQueryResultDetails & resultDetails, const std::string & retrievedDocument, StringBuffer & returnbuf, LogAccessLogFormat format, bool reportHeader) +void GrafanaLogAccessCurlClient::processQueryJsonResp(LogQueryResultDetails & resultDetails, const std::string & retrievedDocument, StringBuffer & returnbuf, const LogAccessLogFormat format, const LogAccessReturnColsMode retcolmode, bool reportHeader) { - resultDetails.totalReceived = 0; - resultDetails.totalAvailable = 0; - Owned tree = createPTreeFromJSONString(retrievedDocument.c_str()); if (!tree) throw makeStringExceptionV(-1, "%s: Could not parse log query response", COMPONENT_NAME); @@ -370,11 +412,17 @@ void GrafanaLogAccessCurlClient::processQueryJsonResp(LogQueryResultDetails & re bool isFirstLine = true; Owned resultIter = data->getElements("result"); - //many result elements can be returned, each with a unique set of labels + //many result elements can be returned, each with a unique set of labels and a common set of stream values ForEach(*resultIter) { IPropertyTree & result = resultIter->query(); - wrapResult(returnbuf, &result, format, isFirstLine); + Owned logLineIter; + + if (result.hasProp("values")) + { + logLineIter.setown(result.getElements("values")); // if no values elements found, will get NullPTreeIterator + processValues(returnbuf, logLineIter, result.queryPropTree("stream"), format, retcolmode, isFirstLine); + } } //Adds the format postfix to the return buffer @@ -465,9 +513,6 @@ void GrafanaLogAccessCurlClient::populateQueryFilterAndStreamSelector(StringBuff } case LOGACCESS_FILTER_wildcard: { - if (queryValue.isEmpty()) - throw makeStringExceptionV(-1, "%s: Wildcard filter cannot be empty!", COMPONENT_NAME); - DBGLOG("%s: Searching log entries by wildcard filter: '%s %s %s'...", COMPONENT_NAME, queryField.str(), queryOperator, queryValue.str()); break; } @@ -605,14 +650,14 @@ bool GrafanaLogAccessCurlClient::fetchLog(LogQueryResultDetails & resultDetails, //from https://grafana.com/docs/loki/latest/query/log_queries/ //Adding | json to your pipeline will extract all json properties as labels if the log line is a valid json document. Nested properties are flattened into label keys using the _ separator. logLineParser.set(" | json log"); //this parses the log entry and extracts the log field into a label - logLineParser.append(" | line_format \"{{.log}}\""); //Formats output line to only contain log label + logLineParser.append(" | line_format \"{{.log | trim}}\""); //Formats output line to only contain log label //This drops the stream, and various insert timestamps //we're always going to get a stream container, and a the log line... //the stream container contains unnecessary, and redundant lines //there's documentation of a 'drop' command whch doesn't work in practice //online recomendation is to clear those stream entries... - logLineParser.append(" | label_format log=\"\", filename=\"\", namespace=\"\", node_name=\"\", job=\"\"");// app=\"\", component=\"\", container=\"\", instance=\"\"); + logLineParser.append(" | label_format log=\"\", filename=\"\"");//, namespace=\"\", job=\"\""// app=\"\", component=\"\", container=\"\", instance=\"\"); /* we're not going to attempt to parse the log line for now, return the entire log line in raw format @@ -664,8 +709,7 @@ bool GrafanaLogAccessCurlClient::fetchLog(LogQueryResultDetails & resultDetails, std::string readBuffer; submitQuery(readBuffer, fullQuery.str()); - processQueryJsonResp(resultDetails, readBuffer, returnbuf, format, true); - //DBGLOG("Query fetchLog result: %s", readBuffer.c_str()); + processQueryJsonResp(resultDetails, readBuffer, returnbuf, format, options.getReturnColsMode(), true); } catch(IException * e) { @@ -676,6 +720,19 @@ bool GrafanaLogAccessCurlClient::fetchLog(LogQueryResultDetails & resultDetails, return false; } +void processLogMapConfig(const IPropertyTree * logMapConfig, LogField * targetField) +{ + if (!logMapConfig || !targetField) + return; + + if (logMapConfig->hasProp(logMapIndexPatternAtt)) + if (strcmp(logMapConfig->queryProp(logMapIndexPatternAtt), "stream")==0) + targetField->isStream = true; + + if (logMapConfig->hasProp(logMapSearchColAtt)) + targetField->name = logMapConfig->queryProp(logMapSearchColAtt); +} + GrafanaLogAccessCurlClient::GrafanaLogAccessCurlClient(IPropertyTree & logAccessPluginConfig) { m_pluginCfg.set(&logAccessPluginConfig); @@ -769,73 +826,29 @@ GrafanaLogAccessCurlClient::GrafanaLogAccessCurlClient(IPropertyTree & logAccess IPropertyTree & logMap = logMapIter->query(); const char * logMapType = logMap.queryProp("@type"); if (streq(logMapType, "global")) - { - if (logMap.hasProp(logMapIndexPatternAtt)) - if (strcmp(logMap.queryProp(logMapIndexPatternAtt), "stream")==0) - m_globalSearchCol.isStream = true; - - if (logMap.hasProp(logMapSearchColAtt)) - m_globalSearchCol.name = logMap.queryProp(logMapSearchColAtt); - } + processLogMapConfig(&logMap, &m_globalSearchCol); else if (streq(logMapType, "workunits")) - { - if (logMap.hasProp(logMapSearchColAtt)) - m_workunitsColumn = logMap.queryProp(logMapSearchColAtt); - } + processLogMapConfig(&logMap, &m_workunitsColumn); else if (streq(logMapType, "components")) - { - if (logMap.hasProp(logMapIndexPatternAtt)) - if (strcmp(logMap.queryProp(logMapIndexPatternAtt), "stream")==0) - m_componentsColumn.isStream = true; - - if (logMap.hasProp(logMapSearchColAtt)) - m_componentsColumn.name = logMap.queryProp(logMapSearchColAtt); - } + processLogMapConfig(&logMap, &m_componentsColumn); else if (streq(logMapType, "class")) - { - if (logMap.hasProp(logMapSearchColAtt)) - m_classColumn = logMap.queryProp(logMapSearchColAtt); - } + processLogMapConfig(&logMap, &m_classColumn); else if (streq(logMapType, "audience")) - { - if (logMap.hasProp(logMapSearchColAtt)) - m_audienceColumn = logMap.queryProp(logMapSearchColAtt); - } + processLogMapConfig(&logMap, &m_audienceColumn); else if (streq(logMapType, "instance")) - { - if (logMap.hasProp(logMapIndexPatternAtt)) - if (strcmp(logMap.queryProp(logMapIndexPatternAtt), "stream")==0) - m_instanceColumn.isStream = true; - - if (logMap.hasProp(logMapSearchColAtt)) - m_instanceColumn.name = logMap.queryProp(logMapSearchColAtt); - } + processLogMapConfig(&logMap, &m_instanceColumn); else if (streq(logMapType, "node")) - { - if (logMap.hasProp(logMapIndexPatternAtt)) - if (strcmp(logMap.queryProp(logMapIndexPatternAtt), "stream")==0) - m_nodeColumn.isStream = true; - - if (logMap.hasProp(logMapSearchColAtt)) - m_nodeColumn.name = logMap.queryProp(logMapSearchColAtt); - } + processLogMapConfig(&logMap, &m_nodeColumn); else if (streq(logMapType, "host")) - { OWARNLOG("%s: 'host' LogMap entry is NOT supported!", COMPONENT_NAME); - } else if (streq(logMapType, "pod")) - { - if (logMap.hasProp(logMapIndexPatternAtt)) - if (strcmp(logMap.queryProp(logMapIndexPatternAtt), "stream")==0) - m_podColumn.isStream = true; - - if (logMap.hasProp(logMapSearchColAtt)) - m_podColumn.name = logMap.queryProp(logMapSearchColAtt); - } + processLogMapConfig(&logMap, &m_podColumn); + else if (streq(logMapType, "message")) + processLogMapConfig(&logMap, &m_messageColumn); + else if (streq(logMapType, "timestamp")) + processLogMapConfig(&logMap, &m_logDateTimstampColumn); else - { ERRLOG("Encountered invalid LogAccess field map type: '%s'", logMapType); - } } DBGLOG("%s: targeting: '%s' - datasource: '%s'", COMPONENT_NAME, m_grafanaConnectionStr.str(), m_dataSourcesAPIURI.str()); diff --git a/system/logaccess/Grafana/CurlClient/GrafanaCurlClient.hpp b/system/logaccess/Grafana/CurlClient/GrafanaCurlClient.hpp index fb6f71cff98..5953161f5a9 100644 --- a/system/logaccess/Grafana/CurlClient/GrafanaCurlClient.hpp +++ b/system/logaccess/Grafana/CurlClient/GrafanaCurlClient.hpp @@ -73,13 +73,14 @@ class GrafanaLogAccessCurlClient : public CInterfaceOf LogField m_instanceColumn = LogField("instance", true); LogField m_podColumn = LogField("pod", true); LogField m_containerColumn = LogField("container", true); - LogField m_messageColumn = LogField("MSG"); + LogField m_messageColumn = LogField("log"); LogField m_nodeColumn = LogField("node_name", true); - LogField m_logTimestampColumn = LogField("TIME"); - LogField m_logDatestampColumn = LogField("DATE"); - LogField m_logSequesnceColumn = LogField("MID"); - LogField m_logProcIDColumn = LogField("PID"); - LogField m_logThreadIDColumn = LogField("TID"); + LogField m_logDateTimstampColumn = LogField("tsNs"); + //LogField m_logTimestampColumn = LogField("TIME"); + //LogField m_logDatestampColumn = LogField("DATE"); + //LogField m_logSequesnceColumn = LogField("MID"); + //LogField m_logProcIDColumn = LogField("PID"); + //LogField m_logThreadIDColumn = LogField("TID"); //LogField m_logTraceIDColumn = LogField("TRC"); //LogField m_logSpanIDColumn = LogField("SPN"); @@ -87,8 +88,9 @@ class GrafanaLogAccessCurlClient : public CInterfaceOf public: GrafanaLogAccessCurlClient(IPropertyTree & logAccessPluginConfig); - void processQueryJsonResp(LogQueryResultDetails & resultDetails, const std::string & retrievedDocument, StringBuffer & returnbuf, LogAccessLogFormat format, bool reportHeader); + void processQueryJsonResp(LogQueryResultDetails & resultDetails, const std::string & retrievedDocument, StringBuffer & returnbuf, const LogAccessLogFormat format, const LogAccessReturnColsMode retcolmode, bool reportHeader); void processDatasourceJsonResp(const std::string & retrievedDocument); + void processValues(StringBuffer & returnbuf, IPropertyTreeIterator * valuesIter, IPropertyTree * stream, LogAccessLogFormat format, const LogAccessReturnColsMode retcolmode, bool & isFirstLine); void fetchDatasourceByName(const char * targetDataSourceName); void fetchDatasources(std::string & readBuffer); void fetchLabels(std::string & readBuffer);