diff --git a/esp/scm/ws_logaccess.ecm b/esp/scm/ws_logaccess.ecm
index bd214a668fa..5232f983275 100644
--- a/esp/scm/ws_logaccess.ecm
+++ b/esp/scm/ws_logaccess.ecm
@@ -79,7 +79,8 @@ ESPenum LogColumnValueType : string
str("string"),
numeric("numeric"),
datetime("datetime"),
- enum("enum")
+ enum("enum"),
+ epoch("epoch")
};
ESPStruct [nil_remove] LogColumn
diff --git a/esp/services/ws_logaccess/WsLogAccessService.cpp b/esp/services/ws_logaccess/WsLogAccessService.cpp
index 0e8b12c329d..98e4a98ad90 100644
--- a/esp/services/ws_logaccess/WsLogAccessService.cpp
+++ b/esp/services/ws_logaccess/WsLogAccessService.cpp
@@ -71,6 +71,10 @@ bool Cws_logaccessEx::onGetLogAccessInfo(IEspContext &context, IEspGetLogAccessI
WARNLOG("Invalid col type found in logaccess logmap config");
}
}
+ else
+ {
+ espLogColumn->setColumnType("string");
+ }
logColumns.append(*espLogColumn.getClear());
}
else
diff --git a/helm/managed/logging/loki-stack/grafana-hpcc-logaccess.yaml b/helm/managed/logging/loki-stack/grafana-hpcc-logaccess.yaml
index 70d09058960..1b2375f09c8 100644
--- a/helm/managed/logging/loki-stack/grafana-hpcc-logaccess.yaml
+++ b/helm/managed/logging/loki-stack/grafana-hpcc-logaccess.yaml
@@ -21,17 +21,27 @@ global:
- type: "components"
storeName: "stream"
searchColumn: "component"
- columnMode: "MIN"
+ columnMode: "ALL"
columnType: "string"
- type: "timestamp"
storeName: "values"
- searchColumn: "time"
- columnMode: "ALL"
+ searchColumn: "tsNs"
+ columnMode: "MIN"
columnType: "datetime"
- type: "pod"
storeName: "stream"
searchColumn: "pod"
+ columnMode: "DEFAULT"
+ columnType: "string"
+ - type: "message"
+ storeName: "values"
+ searchColumn: "log"
+ columnMode: "MIN"
+ columnType: "string"
+ - type: "node"
+ storeName: "stream"
columnMode: "ALL"
+ searchColumn: "node_name"
columnType: "string"
secrets:
esp:
diff --git a/system/logaccess/Grafana/CurlClient/GrafanaCurlClient.cpp b/system/logaccess/Grafana/CurlClient/GrafanaCurlClient.cpp
index 8e58aa8c596..505e2a82778 100644
--- a/system/logaccess/Grafana/CurlClient/GrafanaCurlClient.cpp
+++ b/system/logaccess/Grafana/CurlClient/GrafanaCurlClient.cpp
@@ -190,18 +190,25 @@ void GrafanaLogAccessCurlClient::processDatasourceJsonResp(const std::string & r
}
/*
- * This method consumes a logLine string from a successful Grafana Loki query
- * The LogLine is wrapped in the desired output format
+ * This method consumes a logLine represented as a set of field names and values
+ * The LogLine is wrapped in the requested output format
*/
-void formatResultLine(StringBuffer & returnbuf, const char * resultLine, const char * resultLineName, LogAccessLogFormat format, bool & isFirstLine)
+void formatResultLine(StringBuffer & returnbuf, const IProperties * resultLine, LogAccessLogFormat format, bool & isFirstLine)
{
switch (format)
{
case LOGACCESS_LOGFORMAT_xml:
{
- returnbuf.appendf("<%s>", resultLineName);
- encodeXML(resultLine, returnbuf);
- returnbuf.appendf("%s>", resultLineName);
+ returnbuf.append("");
+ Owned fieldsIter = resultLine->getIterator();
+ ForEach(*fieldsIter)
+ {
+ const char * prop = fieldsIter->queryPropValue();
+ returnbuf.appendf("<%s>", fieldsIter->getPropKey());
+ encodeXML(prop, returnbuf);
+ returnbuf.appendf("%s>", fieldsIter->getPropKey());
+ }
+ returnbuf.appendf("");
isFirstLine = false;
break;
}
@@ -210,15 +217,41 @@ void formatResultLine(StringBuffer & returnbuf, const char * resultLine, const c
if (!isFirstLine)
returnbuf.append(", ");
- returnbuf.append("\"");
- encodeJSON(returnbuf,resultLine);
- returnbuf.append("\"");
+ returnbuf.appendf("{\"fields\": [ ");
+ bool firstField = true;
+ Owned fieldsIter = resultLine->getIterator();
+ ForEach(*fieldsIter)
+ {
+ if (!firstField)
+ returnbuf.append(", ");
+ else
+ firstField = false;
+
+ const char * prop = fieldsIter->queryPropValue();
+ returnbuf.appendf("{\"%s\":\"", fieldsIter->getPropKey());
+ encodeJSON(returnbuf,prop);
+ returnbuf.append("\"}");
+ }
+ returnbuf.append(" ]}");
+
isFirstLine = false;
break;
}
case LOGACCESS_LOGFORMAT_csv:
{
- encodeCSVColumn(returnbuf, resultLine); //Currently treating entire log line as a single CSV column
+ bool firstField = true;
+ Owned fieldsIter = resultLine->getIterator();
+ ForEach(*fieldsIter)
+ {
+ if (!firstField)
+ returnbuf.append(", ");
+ else
+ firstField = false;
+
+ const char * fieldValue = resultLine->queryProp(fieldsIter->getPropKey());
+ encodeCSVColumn(returnbuf, fieldValue);
+ }
+
returnbuf.newline();
isFirstLine = false;
break;
@@ -230,18 +263,54 @@ void formatResultLine(StringBuffer & returnbuf, const char * resultLine, const c
/*
* This method consumes an Iterator of values elements from a successful Grafana Loki query
- * It ignores the 1st child (ingest timestamp in ns), and formats the 2nd child (log line) into the desired format
+ * It extracts the appropriate "stream" values based on return column mode, and the values' 1st and 2nd children
+ * which represent timestamp in ns, and the log line, and formats into the requested format
*/
-void processValues(StringBuffer & returnbuf, IPropertyTreeIterator * valuesIter, LogAccessLogFormat format, bool & isFirstLine)
+void GrafanaLogAccessCurlClient::processValues(StringBuffer & returnbuf, IPropertyTreeIterator * valuesIter, IPropertyTree * stream, LogAccessLogFormat format, const LogAccessReturnColsMode retcolmode, bool & isFirstLine)
{
+ if (!valuesIter)
+ return;
+
+ Owned fieldValues = createProperties(true);
+
+ //extract the requested fields from the stream if it's available
+ if (stream)
+ {
+ switch (retcolmode)
+ {
+ case RETURNCOLS_MODE_all:
+ {
+ fieldValues->setProp(m_nodeColumn.name, stream->queryProp(m_nodeColumn.name));
+ fieldValues->setProp(m_containerColumn.name, stream->queryProp(m_containerColumn.name));
+ fieldValues->setProp(m_instanceColumn.name, stream->queryProp(m_instanceColumn.name));
+ [[fallthrough]];
+ }
+ case RETURNCOLS_MODE_default:
+ {
+ fieldValues->setProp(m_podColumn.name, stream->queryProp(m_podColumn.name));
+ [[fallthrough]];
+ }
+ case RETURNCOLS_MODE_min:
+ {
+ fieldValues->setProp(m_logDateTimstampColumn.name, stream->queryProp(m_logDateTimstampColumn.name));
+ fieldValues->setProp(m_messageColumn.name, stream->queryProp(m_messageColumn.name));
+ break;
+ }
+ case RETURNCOLS_MODE_custom: //not supported yet
+ default:
+ break;
+ }
+ }
+
ForEach(*valuesIter)
{
IPropertyTree & values = valuesIter->query();
int numofvalues = values.getCount("values");
if (values.getCount("values") == 2)
{
- //const char * insertTimeStamp = values.queryProp("values[1]");
- formatResultLine(returnbuf, values.queryProp("values[2]"), "line", format, isFirstLine);
+ fieldValues->setProp(m_logDateTimstampColumn.name, values.queryProp("values[1]"));
+ fieldValues->setProp(m_messageColumn.name, values.queryProp("values[2]"));
+ formatResultLine(returnbuf, fieldValues, format, isFirstLine);
}
else
{
@@ -268,14 +337,6 @@ inline void resultsWrapStart(StringBuffer & returnbuf, LogAccessLogFormat format
break;
}
case LOGACCESS_LOGFORMAT_csv:
- {
- if (reportHeader)
- {
- returnbuf.append("line"); // this is the entire header for CSV if we're only reporting the line
- returnbuf.newline();
- }
- break;
- }
default:
break;
}
@@ -305,31 +366,12 @@ inline void resultsWrapEnd(StringBuffer & returnbuf, LogAccessLogFormat format)
}
}
-/*
- * This method consumes JSON formatted elements from a successful Grafana Loki query
- * It extracts all values elements processes them into the desired format
- */
-void wrapResult(StringBuffer & returnbuf, IPropertyTree * result, LogAccessLogFormat format, bool & isFirstLine)
-{
- Owned logLineIter;
-
- if (result->hasProp("values"))
- {
- logLineIter.setown(result->getElements("values"));
- }
-
- processValues(returnbuf, logLineIter, format, isFirstLine);
-}
-
/*
* This method consumes the JSON response from a Grafana Loki query
* It attempts to unwrap the response and extract the log payload, and reports it in the desired format
*/
-void GrafanaLogAccessCurlClient::processQueryJsonResp(LogQueryResultDetails & resultDetails, const std::string & retrievedDocument, StringBuffer & returnbuf, LogAccessLogFormat format, bool reportHeader)
+void GrafanaLogAccessCurlClient::processQueryJsonResp(LogQueryResultDetails & resultDetails, const std::string & retrievedDocument, StringBuffer & returnbuf, const LogAccessLogFormat format, const LogAccessReturnColsMode retcolmode, bool reportHeader)
{
- resultDetails.totalReceived = 0;
- resultDetails.totalAvailable = 0;
-
Owned tree = createPTreeFromJSONString(retrievedDocument.c_str());
if (!tree)
throw makeStringExceptionV(-1, "%s: Could not parse log query response", COMPONENT_NAME);
@@ -370,11 +412,17 @@ void GrafanaLogAccessCurlClient::processQueryJsonResp(LogQueryResultDetails & re
bool isFirstLine = true;
Owned resultIter = data->getElements("result");
- //many result elements can be returned, each with a unique set of labels
+ //many result elements can be returned, each with a unique set of labels and a common set of stream values
ForEach(*resultIter)
{
IPropertyTree & result = resultIter->query();
- wrapResult(returnbuf, &result, format, isFirstLine);
+ Owned logLineIter;
+
+ if (result.hasProp("values"))
+ {
+ logLineIter.setown(result.getElements("values")); // if no values elements found, will get NullPTreeIterator
+ processValues(returnbuf, logLineIter, result.queryPropTree("stream"), format, retcolmode, isFirstLine);
+ }
}
//Adds the format postfix to the return buffer
@@ -465,9 +513,6 @@ void GrafanaLogAccessCurlClient::populateQueryFilterAndStreamSelector(StringBuff
}
case LOGACCESS_FILTER_wildcard:
{
- if (queryValue.isEmpty())
- throw makeStringExceptionV(-1, "%s: Wildcard filter cannot be empty!", COMPONENT_NAME);
-
DBGLOG("%s: Searching log entries by wildcard filter: '%s %s %s'...", COMPONENT_NAME, queryField.str(), queryOperator, queryValue.str());
break;
}
@@ -605,14 +650,14 @@ bool GrafanaLogAccessCurlClient::fetchLog(LogQueryResultDetails & resultDetails,
//from https://grafana.com/docs/loki/latest/query/log_queries/
//Adding | json to your pipeline will extract all json properties as labels if the log line is a valid json document. Nested properties are flattened into label keys using the _ separator.
logLineParser.set(" | json log"); //this parses the log entry and extracts the log field into a label
- logLineParser.append(" | line_format \"{{.log}}\""); //Formats output line to only contain log label
+ logLineParser.append(" | line_format \"{{.log | trim}}\""); //Formats output line to only contain log label
//This drops the stream, and various insert timestamps
//we're always going to get a stream container, and a the log line...
//the stream container contains unnecessary, and redundant lines
//there's documentation of a 'drop' command whch doesn't work in practice
//online recomendation is to clear those stream entries...
- logLineParser.append(" | label_format log=\"\", filename=\"\", namespace=\"\", node_name=\"\", job=\"\"");// app=\"\", component=\"\", container=\"\", instance=\"\");
+ logLineParser.append(" | label_format log=\"\", filename=\"\"");//, namespace=\"\", job=\"\""// app=\"\", component=\"\", container=\"\", instance=\"\");
/* we're not going to attempt to parse the log line for now,
return the entire log line in raw format
@@ -664,8 +709,7 @@ bool GrafanaLogAccessCurlClient::fetchLog(LogQueryResultDetails & resultDetails,
std::string readBuffer;
submitQuery(readBuffer, fullQuery.str());
- processQueryJsonResp(resultDetails, readBuffer, returnbuf, format, true);
- //DBGLOG("Query fetchLog result: %s", readBuffer.c_str());
+ processQueryJsonResp(resultDetails, readBuffer, returnbuf, format, options.getReturnColsMode(), true);
}
catch(IException * e)
{
@@ -676,6 +720,19 @@ bool GrafanaLogAccessCurlClient::fetchLog(LogQueryResultDetails & resultDetails,
return false;
}
+void processLogMapConfig(const IPropertyTree * logMapConfig, LogField * targetField)
+{
+ if (!logMapConfig || !targetField)
+ return;
+
+ if (logMapConfig->hasProp(logMapIndexPatternAtt))
+ if (strcmp(logMapConfig->queryProp(logMapIndexPatternAtt), "stream")==0)
+ targetField->isStream = true;
+
+ if (logMapConfig->hasProp(logMapSearchColAtt))
+ targetField->name = logMapConfig->queryProp(logMapSearchColAtt);
+}
+
GrafanaLogAccessCurlClient::GrafanaLogAccessCurlClient(IPropertyTree & logAccessPluginConfig)
{
m_pluginCfg.set(&logAccessPluginConfig);
@@ -769,73 +826,29 @@ GrafanaLogAccessCurlClient::GrafanaLogAccessCurlClient(IPropertyTree & logAccess
IPropertyTree & logMap = logMapIter->query();
const char * logMapType = logMap.queryProp("@type");
if (streq(logMapType, "global"))
- {
- if (logMap.hasProp(logMapIndexPatternAtt))
- if (strcmp(logMap.queryProp(logMapIndexPatternAtt), "stream")==0)
- m_globalSearchCol.isStream = true;
-
- if (logMap.hasProp(logMapSearchColAtt))
- m_globalSearchCol.name = logMap.queryProp(logMapSearchColAtt);
- }
+ processLogMapConfig(&logMap, &m_globalSearchCol);
else if (streq(logMapType, "workunits"))
- {
- if (logMap.hasProp(logMapSearchColAtt))
- m_workunitsColumn = logMap.queryProp(logMapSearchColAtt);
- }
+ processLogMapConfig(&logMap, &m_workunitsColumn);
else if (streq(logMapType, "components"))
- {
- if (logMap.hasProp(logMapIndexPatternAtt))
- if (strcmp(logMap.queryProp(logMapIndexPatternAtt), "stream")==0)
- m_componentsColumn.isStream = true;
-
- if (logMap.hasProp(logMapSearchColAtt))
- m_componentsColumn.name = logMap.queryProp(logMapSearchColAtt);
- }
+ processLogMapConfig(&logMap, &m_componentsColumn);
else if (streq(logMapType, "class"))
- {
- if (logMap.hasProp(logMapSearchColAtt))
- m_classColumn = logMap.queryProp(logMapSearchColAtt);
- }
+ processLogMapConfig(&logMap, &m_classColumn);
else if (streq(logMapType, "audience"))
- {
- if (logMap.hasProp(logMapSearchColAtt))
- m_audienceColumn = logMap.queryProp(logMapSearchColAtt);
- }
+ processLogMapConfig(&logMap, &m_audienceColumn);
else if (streq(logMapType, "instance"))
- {
- if (logMap.hasProp(logMapIndexPatternAtt))
- if (strcmp(logMap.queryProp(logMapIndexPatternAtt), "stream")==0)
- m_instanceColumn.isStream = true;
-
- if (logMap.hasProp(logMapSearchColAtt))
- m_instanceColumn.name = logMap.queryProp(logMapSearchColAtt);
- }
+ processLogMapConfig(&logMap, &m_instanceColumn);
else if (streq(logMapType, "node"))
- {
- if (logMap.hasProp(logMapIndexPatternAtt))
- if (strcmp(logMap.queryProp(logMapIndexPatternAtt), "stream")==0)
- m_nodeColumn.isStream = true;
-
- if (logMap.hasProp(logMapSearchColAtt))
- m_nodeColumn.name = logMap.queryProp(logMapSearchColAtt);
- }
+ processLogMapConfig(&logMap, &m_nodeColumn);
else if (streq(logMapType, "host"))
- {
OWARNLOG("%s: 'host' LogMap entry is NOT supported!", COMPONENT_NAME);
- }
else if (streq(logMapType, "pod"))
- {
- if (logMap.hasProp(logMapIndexPatternAtt))
- if (strcmp(logMap.queryProp(logMapIndexPatternAtt), "stream")==0)
- m_podColumn.isStream = true;
-
- if (logMap.hasProp(logMapSearchColAtt))
- m_podColumn.name = logMap.queryProp(logMapSearchColAtt);
- }
+ processLogMapConfig(&logMap, &m_podColumn);
+ else if (streq(logMapType, "message"))
+ processLogMapConfig(&logMap, &m_messageColumn);
+ else if (streq(logMapType, "timestamp"))
+ processLogMapConfig(&logMap, &m_logDateTimstampColumn);
else
- {
ERRLOG("Encountered invalid LogAccess field map type: '%s'", logMapType);
- }
}
DBGLOG("%s: targeting: '%s' - datasource: '%s'", COMPONENT_NAME, m_grafanaConnectionStr.str(), m_dataSourcesAPIURI.str());
diff --git a/system/logaccess/Grafana/CurlClient/GrafanaCurlClient.hpp b/system/logaccess/Grafana/CurlClient/GrafanaCurlClient.hpp
index fb6f71cff98..5953161f5a9 100644
--- a/system/logaccess/Grafana/CurlClient/GrafanaCurlClient.hpp
+++ b/system/logaccess/Grafana/CurlClient/GrafanaCurlClient.hpp
@@ -73,13 +73,14 @@ class GrafanaLogAccessCurlClient : public CInterfaceOf
LogField m_instanceColumn = LogField("instance", true);
LogField m_podColumn = LogField("pod", true);
LogField m_containerColumn = LogField("container", true);
- LogField m_messageColumn = LogField("MSG");
+ LogField m_messageColumn = LogField("log");
LogField m_nodeColumn = LogField("node_name", true);
- LogField m_logTimestampColumn = LogField("TIME");
- LogField m_logDatestampColumn = LogField("DATE");
- LogField m_logSequesnceColumn = LogField("MID");
- LogField m_logProcIDColumn = LogField("PID");
- LogField m_logThreadIDColumn = LogField("TID");
+ LogField m_logDateTimstampColumn = LogField("tsNs");
+ //LogField m_logTimestampColumn = LogField("TIME");
+ //LogField m_logDatestampColumn = LogField("DATE");
+ //LogField m_logSequesnceColumn = LogField("MID");
+ //LogField m_logProcIDColumn = LogField("PID");
+ //LogField m_logThreadIDColumn = LogField("TID");
//LogField m_logTraceIDColumn = LogField("TRC");
//LogField m_logSpanIDColumn = LogField("SPN");
@@ -87,8 +88,9 @@ class GrafanaLogAccessCurlClient : public CInterfaceOf
public:
GrafanaLogAccessCurlClient(IPropertyTree & logAccessPluginConfig);
- void processQueryJsonResp(LogQueryResultDetails & resultDetails, const std::string & retrievedDocument, StringBuffer & returnbuf, LogAccessLogFormat format, bool reportHeader);
+ void processQueryJsonResp(LogQueryResultDetails & resultDetails, const std::string & retrievedDocument, StringBuffer & returnbuf, const LogAccessLogFormat format, const LogAccessReturnColsMode retcolmode, bool reportHeader);
void processDatasourceJsonResp(const std::string & retrievedDocument);
+ void processValues(StringBuffer & returnbuf, IPropertyTreeIterator * valuesIter, IPropertyTree * stream, LogAccessLogFormat format, const LogAccessReturnColsMode retcolmode, bool & isFirstLine);
void fetchDatasourceByName(const char * targetDataSourceName);
void fetchDatasources(std::string & readBuffer);
void fetchLabels(std::string & readBuffer);