HPCC-32281 Grafana/Loki logaccess 2nd phase improvements
- Ensures proper log output format
- Utilizes available context log fields
- Updates grafana logaccess sample config
- Replaces unnecessary function with inline logic
- Removes invalid wildcard filter restriction
- Trims log data at the server due to added JSON newlines
- Re-enables several stream columns

Signed-off-by: Rodrigo Pastrana <[email protected]>
rpastrana committed Jul 22, 2024
1 parent 1adf1ca commit 4590538
Showing 3 changed files with 132 additions and 61 deletions.
16 changes: 13 additions & 3 deletions helm/managed/logging/loki-stack/grafana-hpcc-logaccess.yaml
@@ -21,17 +21,27 @@ global:
- type: "components"
storeName: "stream"
searchColumn: "component"
columnMode: "MIN"
columnMode: "ALL"
columnType: "string"
- type: "timestamp"
storeName: "values"
searchColumn: "time"
columnMode: "ALL"
searchColumn: "tsNs"
columnMode: "MIN"
columnType: "datetime"
- type: "pod"
storeName: "stream"
searchColumn: "pod"
columnMode: "DEFAULT"
columnType: "string"
- type: "message"
storeName: "values"
searchColumn: "log"
columnMode: "MIN"
columnType: "string"
- type: "node"
storeName: "stream"
columnMode: "ALL"
searchColumn: "node_name"
columnType: "string"
secrets:
esp:
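For orientation, these logmap entries feed the plugin's column descriptors; a minimal sketch of the resulting defaults, assuming the LogField type declared in GrafanaCurlClient.hpp further down (its second constructor argument appears to mark a stream-resident column):

//Sketch only: approximate column descriptors once the constructor (shown below)
//consumes the logmap above; columnMode (MIN/DEFAULT/ALL) governs which return-column
//modes report each field.
LogField timestampColumn = LogField("tsNs");            //"timestamp" entry, values store, MIN
LogField podColumn       = LogField("pod", true);       //"pod" entry, stream store, DEFAULT
LogField messageColumn   = LogField("log");             //"message" entry, values store, MIN
LogField nodeColumn      = LogField("node_name", true); //"node" entry, stream store, ALL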
161 changes: 110 additions & 51 deletions system/logaccess/Grafana/CurlClient/GrafanaCurlClient.cpp
@@ -190,18 +190,25 @@ void GrafanaLogAccessCurlClient::processDatasourceJsonResp(const std::string & r
}

/*
* This method consumes a logLine string from a successful Grafana Loki query
* The LogLine is wrapped in the desired output format
* This method consumes a logLine represented as a set of field names and values
* The LogLine is wrapped in the requested output format
*/
void formatResultLine(StringBuffer & returnbuf, const char * resultLine, const char * resultLineName, LogAccessLogFormat format, bool & isFirstLine)
void formatResultLine(StringBuffer & returnbuf, const IProperties * resultLine, LogAccessLogFormat format, bool & isFirstLine)
{
switch (format)
{
case LOGACCESS_LOGFORMAT_xml:
{
returnbuf.appendf("<%s>", resultLineName);
encodeXML(resultLine, returnbuf);
returnbuf.appendf("</%s>", resultLineName);
returnbuf.append("<line>");
Owned<IPropertyIterator> fieldsIter = resultLine->getIterator();
ForEach(*fieldsIter)
{
const char * prop = resultLine->queryProp(fieldsIter->getPropKey());
returnbuf.appendf("<%s>", fieldsIter->getPropKey());
encodeXML(prop, returnbuf);
returnbuf.appendf("</%s>", fieldsIter->getPropKey());
}
returnbuf.appendf("</lin>");
isFirstLine = false;
break;
}
@@ -210,15 +217,42 @@ void formatResultLine(StringBuffer & returnbuf, const char * resultLine, const c
if (!isFirstLine)
returnbuf.append(", ");

returnbuf.append("\"");
encodeJSON(returnbuf,resultLine);
returnbuf.append("\"");
returnbuf.appendf("{\"fields\": [ ");
bool firstField = true;
Owned<IPropertyIterator> fieldsIter = resultLine->getIterator();
ForEach(*fieldsIter)
{
if (!firstField)
returnbuf.append(", ");
else
firstField = false;

const char * prop = resultLine->queryProp(fieldsIter->getPropKey());
returnbuf.appendf("{\"%s\":\"", fieldsIter->getPropKey());
encodeJSON(returnbuf,prop);
returnbuf.append("\"}");
}
returnbuf.append(" ]}");

isFirstLine = false;
break;
}
case LOGACCESS_LOGFORMAT_csv:
{
encodeCSVColumn(returnbuf, resultLine); //Currently treating entire log line as a single CSV column
bool firstField = true;
Owned<IPropertyIterator> fieldsIter = resultLine->getIterator();
ForEach(*fieldsIter)
{
if (!firstField)
returnbuf.append(", ");
else
firstField = false;

StringBuffer prop;
resultLine->getProp(fieldsIter->getPropKey(), prop);
encodeCSVColumn(returnbuf, prop);
}

returnbuf.newline();
isFirstLine = false;
break;
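For a concrete sense of the per-line output each branch now produces, a minimal usage sketch, assuming the usual jlib headers and that formatResultLine() above is visible to the caller; the shapes noted in the comments are approximate:

//Minimal sketch, assuming jlib headers and visibility of formatResultLine() above;
//the output shapes in the comments are approximate.
#include "jprop.hpp"    //IProperties, createProperties
#include "jstring.hpp"  //StringBuffer
#include "jlog.hpp"     //LogAccessLogFormat

void formatResultLineExample()
{
    Owned<IProperties> fields = createProperties(true);
    fields->setProp("tsNs", "1721650000000000000");
    fields->setProp("log", "00000001 PRG INF example log line");

    StringBuffer out;
    bool isFirstLine = true;

    //JSON: roughly {"fields": [ {"tsNs":"..."}, {"log":"..."} ]}
    formatResultLine(out, fields, LOGACCESS_LOGFORMAT_json, isFirstLine);

    //XML: roughly <line><tsNs>...</tsNs><log>...</log></line>
    out.clear();
    isFirstLine = true;
    formatResultLine(out, fields, LOGACCESS_LOGFORMAT_xml, isFirstLine);
}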
Expand All @@ -230,18 +264,49 @@ void formatResultLine(StringBuffer & returnbuf, const char * resultLine, const c

/*
* This method consumes an Iterator of values elements from a successful Grafana Loki query
* It ignores the 1st child (ingest timestamp in ns), and formats the 2nd child (log line) into the desired format
 * It extracts the appropriate "stream" values based on the return column mode, plus the values' 1st and 2nd children
 * (the timestamp in ns and the log line), and formats them into the requested format
*/
void processValues(StringBuffer & returnbuf, IPropertyTreeIterator * valuesIter, LogAccessLogFormat format, bool & isFirstLine)
void GrafanaLogAccessCurlClient::processValues(StringBuffer & returnbuf, IPropertyTreeIterator * valuesIter, IPropertyTree * stream, LogAccessLogFormat format, const LogAccessReturnColsMode retcolmode, bool & isFirstLine)
{
Owned<IProperties> fieldValues = createProperties(true);

//extract the requested fields from the stream if it's available
if (stream)
{
switch (retcolmode)
{
case RETURNCOLS_MODE_all:
{
fieldValues->setProp(m_nodeColumn.name, stream->queryProp(m_nodeColumn.name));
fieldValues->setProp(m_containerColumn.name, stream->queryProp(m_containerColumn.name));
fieldValues->setProp(m_instanceColumn.name, stream->queryProp(m_instanceColumn.name));
}
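            //falls through: ALL also reports the DEFAULT and MIN fields below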
case RETURNCOLS_MODE_default:
{
fieldValues->setProp(m_podColumn.name, stream->queryProp(m_podColumn.name));
}
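            //falls through: DEFAULT also reports the MIN fields below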
case RETURNCOLS_MODE_min:
{
fieldValues->setProp(m_logDateTimstampColumn.name, stream->queryProp(m_logDateTimstampColumn.name));
fieldValues->setProp(m_messageColumn.name, stream->queryProp(m_messageColumn.name));
break;
}
case RETURNCOLS_MODE_custom: //not supported yet
default:
break;
}
}

ForEach(*valuesIter)
{
IPropertyTree & values = valuesIter->query();
int numofvalues = values.getCount("values");
if (numofvalues == 2)
{
//const char * insertTimeStamp = values.queryProp("values[1]");
formatResultLine(returnbuf, values.queryProp("values[2]"), "line", format, isFirstLine);
fieldValues->setProp(m_logDateTimstampColumn.name, values.queryProp("values[1]"));
fieldValues->setProp(m_messageColumn.name, values.queryProp("values[2]"));
formatResultLine(returnbuf, fieldValues, format, isFirstLine);
}
else
{
@@ -268,14 +333,6 @@ inline void resultsWrapStart(StringBuffer & returnbuf, LogAccessLogFormat format
break;
}
case LOGACCESS_LOGFORMAT_csv:
{
if (reportHeader)
{
returnbuf.append("line"); // this is the entire header for CSV if we're only reporting the line
returnbuf.newline();
}
break;
}
default:
break;
}
@@ -305,31 +362,12 @@ inline void resultsWrapEnd(StringBuffer & returnbuf, LogAccessLogFormat format)
}
}

/*
* This method consumes JSON formatted elements from a successful Grafana Loki query
* It extracts all values elements processes them into the desired format
*/
void wrapResult(StringBuffer & returnbuf, IPropertyTree * result, LogAccessLogFormat format, bool & isFirstLine)
{
Owned<IPropertyTreeIterator> logLineIter;

if (result->hasProp("values"))
{
logLineIter.setown(result->getElements("values"));
}

processValues(returnbuf, logLineIter, format, isFirstLine);
}

/*
* This method consumes the JSON response from a Grafana Loki query
* It attempts to unwrap the response and extract the log payload, and reports it in the desired format
*/
void GrafanaLogAccessCurlClient::processQueryJsonResp(LogQueryResultDetails & resultDetails, const std::string & retrievedDocument, StringBuffer & returnbuf, LogAccessLogFormat format, bool reportHeader)
void GrafanaLogAccessCurlClient::processQueryJsonResp(LogQueryResultDetails & resultDetails, const std::string & retrievedDocument, StringBuffer & returnbuf, const LogAccessLogFormat format, const LogAccessReturnColsMode retcolmode, bool reportHeader)
{
resultDetails.totalReceived = 0;
resultDetails.totalAvailable = 0;

Owned<IPropertyTree> tree = createPTreeFromJSONString(retrievedDocument.c_str());
if (!tree)
throw makeStringExceptionV(-1, "%s: Could not parse log query response", COMPONENT_NAME);
@@ -370,11 +408,18 @@ void GrafanaLogAccessCurlClient::processQueryJsonResp(LogQueryResultDetails & re

bool isFirstLine = true;
Owned<IPropertyTreeIterator> resultIter = data->getElements("result");
//many result elements can be returned, each with a unique set of labels
//many result elements can be returned, each with a unique set of labels and a common set of stream values
ForEach(*resultIter)
{
IPropertyTree & result = resultIter->query();
wrapResult(returnbuf, &result, format, isFirstLine);
Owned<IPropertyTreeIterator> logLineIter;

if (result.hasProp("values"))
{
logLineIter.setown(result.getElements("values"));
}

processValues(returnbuf, logLineIter, result.queryPropTree("stream"), format, retcolmode, isFirstLine);
}

//Adds the format postfix to the return buffer
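For reference, a minimal sketch (assumed, abridged shape) of the Loki query response this code walks: each data.result[] element carries a "stream" label set plus "values" pairs of [ingest timestamp in ns, log line].

//Abridged illustration only; the field names follow the parsing code above, and real
//responses carry additional labels and statistics.
const char * sampleLokiResponse = R"json({
  "status": "success",
  "data": {
    "resultType": "streams",
    "result": [
      {
        "stream": { "pod": "eclwatch-0", "node_name": "node1", "container": "eclwatch" },
        "values": [
          [ "1721650000000000000", "00000001 PRG INF example log line" ]
        ]
      }
    ]
  }
})json";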
@@ -465,9 +510,6 @@ void GrafanaLogAccessCurlClient::populateQueryFilterAndStreamSelector(StringBuff
}
case LOGACCESS_FILTER_wildcard:
{
if (queryValue.isEmpty())
throw makeStringExceptionV(-1, "%s: Wildcard filter cannot be empty!", COMPONENT_NAME);

DBGLOG("%s: Searching log entries by wildcard filter: '%s %s %s'...", COMPONENT_NAME, queryField.str(), queryOperator, queryValue.str());
break;
}
@@ -605,14 +647,14 @@ bool GrafanaLogAccessCurlClient::fetchLog(LogQueryResultDetails & resultDetails,
//from https://grafana.com/docs/loki/latest/query/log_queries/
//Adding | json to your pipeline will extract all json properties as labels if the log line is a valid json document. Nested properties are flattened into label keys using the _ separator.
logLineParser.set(" | json log"); //this parses the log entry and extracts the log field into a label
logLineParser.append(" | line_format \"{{.log}}\""); //Formats output line to only contain log label
logLineParser.append(" | line_format \"{{.log | trim}}\""); //Formats output line to only contain log label
//This drops the stream, and various insert timestamps

//we're always going to get a stream container and the log line...
//the stream container contains unnecessary and redundant entries
//there's documentation of a 'drop' command which doesn't work in practice
//the online recommendation is to clear those stream entries...
logLineParser.append(" | label_format log=\"\", filename=\"\", namespace=\"\", node_name=\"\", job=\"\"");// app=\"\", component=\"\", container=\"\", instance=\"\");
logLineParser.append(" | label_format log=\"\", filename=\"\"");//, namespace=\"\", job=\"\""// app=\"\", component=\"\", container=\"\", instance=\"\");

/* we're not going to attempt to parse the log line for now,
return the entire log line in raw format
@@ -664,8 +706,7 @@ bool GrafanaLogAccessCurlClient::fetchLog(LogQueryResultDetails & resultDetails,
std::string readBuffer;
submitQuery(readBuffer, fullQuery.str());

processQueryJsonResp(resultDetails, readBuffer, returnbuf, format, true);
//DBGLOG("Query fetchLog result: %s", readBuffer.c_str());
processQueryJsonResp(resultDetails, readBuffer, returnbuf, format, options.getReturnColsMode(), true);
}
catch(IException * e)
{
@@ -832,6 +873,24 @@ GrafanaLogAccessCurlClient::GrafanaLogAccessCurlClient(IPropertyTree & logAccess
if (logMap.hasProp(logMapSearchColAtt))
m_podColumn.name = logMap.queryProp(logMapSearchColAtt);
}
else if (streq(logMapType, "message"))
{
if (logMap.hasProp(logMapIndexPatternAtt))
if (strcmp(logMap.queryProp(logMapIndexPatternAtt), "stream")==0)
m_messageColumn.isStream = true;

if (logMap.hasProp(logMapSearchColAtt))
m_messageColumn.name = logMap.queryProp(logMapSearchColAtt);
}
else if (streq(logMapType, "timestamp"))
{
if (logMap.hasProp(logMapIndexPatternAtt))
if (strcmp(logMap.queryProp(logMapIndexPatternAtt), "stream")==0)
m_logDateTimstampColumn.isStream = true;

if (logMap.hasProp(logMapSearchColAtt))
m_logDateTimstampColumn.name = logMap.queryProp(logMapSearchColAtt);
}
else
{
ERRLOG("Encountered invalid LogAccess field map type: '%s'", logMapType);
16 changes: 9 additions & 7 deletions system/logaccess/Grafana/CurlClient/GrafanaCurlClient.hpp
@@ -73,22 +73,24 @@ class GrafanaLogAccessCurlClient : public CInterfaceOf<IRemoteLogAccess>
LogField m_instanceColumn = LogField("instance", true);
LogField m_podColumn = LogField("pod", true);
LogField m_containerColumn = LogField("container", true);
LogField m_messageColumn = LogField("MSG");
LogField m_messageColumn = LogField("log");
LogField m_nodeColumn = LogField("node_name", true);
LogField m_logTimestampColumn = LogField("TIME");
LogField m_logDatestampColumn = LogField("DATE");
LogField m_logSequesnceColumn = LogField("MID");
LogField m_logProcIDColumn = LogField("PID");
LogField m_logThreadIDColumn = LogField("TID");
LogField m_logDateTimstampColumn = LogField("tsNs");
//LogField m_logTimestampColumn = LogField("TIME");
//LogField m_logDatestampColumn = LogField("DATE");
//LogField m_logSequesnceColumn = LogField("MID");
//LogField m_logProcIDColumn = LogField("PID");
//LogField m_logThreadIDColumn = LogField("TID");
//LogField m_logTraceIDColumn = LogField("TRC");
//LogField m_logSpanIDColumn = LogField("SPN");

StringAttr m_expectedLogFormat; //json|table|xml

public:
GrafanaLogAccessCurlClient(IPropertyTree & logAccessPluginConfig);
void processQueryJsonResp(LogQueryResultDetails & resultDetails, const std::string & retrievedDocument, StringBuffer & returnbuf, LogAccessLogFormat format, bool reportHeader);
void processQueryJsonResp(LogQueryResultDetails & resultDetails, const std::string & retrievedDocument, StringBuffer & returnbuf, const LogAccessLogFormat format, const LogAccessReturnColsMode retcolmode, bool reportHeader);
void processDatasourceJsonResp(const std::string & retrievedDocument);
void processValues(StringBuffer & returnbuf, IPropertyTreeIterator * valuesIter, IPropertyTree * stream, LogAccessLogFormat format, const LogAccessReturnColsMode retcolmode, bool & isFirstLine);
void fetchDatasourceByName(const char * targetDataSourceName);
void fetchDatasources(std::string & readBuffer);
void fetchLabels(std::string & readBuffer);
