Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-32281 Grafana/Loki logaccess 2nd phase improvements #18900

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion esp/scm/ws_logaccess.ecm
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ ESPenum LogColumnValueType : string
str("string"),
numeric("numeric"),
datetime("datetime"),
enum("enum")
enum("enum"),
epoch("epoch")
};

ESPStruct [nil_remove] LogColumn
Expand Down
4 changes: 4 additions & 0 deletions esp/services/ws_logaccess/WsLogAccessService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ bool Cws_logaccessEx::onGetLogAccessInfo(IEspContext &context, IEspGetLogAccessI
WARNLOG("Invalid col type found in logaccess logmap config");
}
}
else
{
espLogColumn->setColumnType("string");
}
logColumns.append(*espLogColumn.getClear());
}
else
Expand Down
16 changes: 13 additions & 3 deletions helm/managed/logging/loki-stack/grafana-hpcc-logaccess.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,27 @@ global:
- type: "components"
storeName: "stream"
searchColumn: "component"
columnMode: "MIN"
columnMode: "ALL"
columnType: "string"
- type: "timestamp"
storeName: "values"
searchColumn: "time"
columnMode: "ALL"
searchColumn: "tsNs"
columnMode: "MIN"
columnType: "datetime"
- type: "pod"
storeName: "stream"
searchColumn: "pod"
columnMode: "DEFAULT"
columnType: "string"
- type: "message"
storeName: "values"
searchColumn: "log"
columnMode: "MIN"
columnType: "string"
- type: "node"
storeName: "stream"
columnMode: "ALL"
searchColumn: "node_name"
columnType: "string"
secrets:
esp:
Expand Down
227 changes: 120 additions & 107 deletions system/logaccess/Grafana/CurlClient/GrafanaCurlClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,18 +190,25 @@ void GrafanaLogAccessCurlClient::processDatasourceJsonResp(const std::string & r
}

/*
* This method consumes a logLine string from a successful Grafana Loki query
* The LogLine is wrapped in the desired output format
* This method consumes a logLine represented as a set of field names and values
* The LogLine is wrapped in the requested output format
*/
void formatResultLine(StringBuffer & returnbuf, const char * resultLine, const char * resultLineName, LogAccessLogFormat format, bool & isFirstLine)
void formatResultLine(StringBuffer & returnbuf, const IProperties * resultLine, LogAccessLogFormat format, bool & isFirstLine)
{
switch (format)
{
case LOGACCESS_LOGFORMAT_xml:
{
returnbuf.appendf("<%s>", resultLineName);
encodeXML(resultLine, returnbuf);
returnbuf.appendf("</%s>", resultLineName);
returnbuf.append("<line>");
Owned<IPropertyIterator> fieldsIter = resultLine->getIterator();
ForEach(*fieldsIter)
{
const char * prop = fieldsIter->queryPropValue();
returnbuf.appendf("<%s>", fieldsIter->getPropKey());
encodeXML(prop, returnbuf);
returnbuf.appendf("</%s>", fieldsIter->getPropKey());
}
returnbuf.appendf("</line>");
isFirstLine = false;
break;
}
Expand All @@ -210,15 +217,41 @@ void formatResultLine(StringBuffer & returnbuf, const char * resultLine, const c
if (!isFirstLine)
returnbuf.append(", ");

returnbuf.append("\"");
encodeJSON(returnbuf,resultLine);
returnbuf.append("\"");
returnbuf.appendf("{\"fields\": [ ");
bool firstField = true;
Owned<IPropertyIterator> fieldsIter = resultLine->getIterator();
ForEach(*fieldsIter)
{
if (!firstField)
returnbuf.append(", ");
else
firstField = false;

const char * prop = fieldsIter->queryPropValue();
returnbuf.appendf("{\"%s\":\"", fieldsIter->getPropKey());
encodeJSON(returnbuf,prop);
returnbuf.append("\"}");
}
returnbuf.append(" ]}");

isFirstLine = false;
break;
}
case LOGACCESS_LOGFORMAT_csv:
{
encodeCSVColumn(returnbuf, resultLine); //Currently treating entire log line as a single CSV column
bool firstField = true;
Owned<IPropertyIterator> fieldsIter = resultLine->getIterator();
ForEach(*fieldsIter)
{
if (!firstField)
returnbuf.append(", ");
else
firstField = false;

const char * fieldValue = resultLine->queryProp(fieldsIter->getPropKey());
encodeCSVColumn(returnbuf, fieldValue);
}

returnbuf.newline();
isFirstLine = false;
break;
Expand All @@ -230,18 +263,54 @@ void formatResultLine(StringBuffer & returnbuf, const char * resultLine, const c

/*
* This method consumes an Iterator of values elements from a successful Grafana Loki query
* It ignores the 1st child (ingest timestamp in ns), and formats the 2nd child (log line) into the desired format
* It extracts the appropriate "stream" values based on return column mode, and the values' 1st and 2nd children
* which represent timestamp in ns, and the log line, and formats into the requested format
*/
void processValues(StringBuffer & returnbuf, IPropertyTreeIterator * valuesIter, LogAccessLogFormat format, bool & isFirstLine)
void GrafanaLogAccessCurlClient::processValues(StringBuffer & returnbuf, IPropertyTreeIterator * valuesIter, IPropertyTree * stream, LogAccessLogFormat format, const LogAccessReturnColsMode retcolmode, bool & isFirstLine)
{
if (!valuesIter)
return;

Owned<IProperties> fieldValues = createProperties(true);

//extract the requested fields from the stream if it's available
if (stream)
{
switch (retcolmode)
{
case RETURNCOLS_MODE_all:
{
fieldValues->setProp(m_nodeColumn.name, stream->queryProp(m_nodeColumn.name));
fieldValues->setProp(m_containerColumn.name, stream->queryProp(m_containerColumn.name));
fieldValues->setProp(m_instanceColumn.name, stream->queryProp(m_instanceColumn.name));
[[fallthrough]];
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

C++17 feature, not sure if there's any concern using C++17-specific constructs

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, I hadn't picked up on that feature. We require c++17, so no problem using the feature - it has the advantage it will be checked by the compiler.

}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs a comment //fallthrough whenever a case falls through to the next case.

I initially read the code incorrectly because it wasn't there, and coverity will also complain.

case RETURNCOLS_MODE_default:
{
fieldValues->setProp(m_podColumn.name, stream->queryProp(m_podColumn.name));
[[fallthrough]];
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

//fallthrough

case RETURNCOLS_MODE_min:
{
fieldValues->setProp(m_logDateTimstampColumn.name, stream->queryProp(m_logDateTimstampColumn.name));
fieldValues->setProp(m_messageColumn.name, stream->queryProp(m_messageColumn.name));
break;
}
case RETURNCOLS_MODE_custom: //not supported yet
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to fail and let user know

default:
break;
}
}

ForEach(*valuesIter)
{
IPropertyTree & values = valuesIter->query();
int numofvalues = values.getCount("values");
if (values.getCount("values") == 2)
{
//const char * insertTimeStamp = values.queryProp("values[1]");
formatResultLine(returnbuf, values.queryProp("values[2]"), "line", format, isFirstLine);
fieldValues->setProp(m_logDateTimstampColumn.name, values.queryProp("values[1]"));
fieldValues->setProp(m_messageColumn.name, values.queryProp("values[2]"));
formatResultLine(returnbuf, fieldValues, format, isFirstLine);
}
else
{
Expand All @@ -268,14 +337,6 @@ inline void resultsWrapStart(StringBuffer & returnbuf, LogAccessLogFormat format
break;
}
case LOGACCESS_LOGFORMAT_csv:
{
if (reportHeader)
{
returnbuf.append("line"); // this is the entire header for CSV if we're only reporting the line
returnbuf.newline();
}
break;
}
default:
break;
}
Expand Down Expand Up @@ -305,31 +366,12 @@ inline void resultsWrapEnd(StringBuffer & returnbuf, LogAccessLogFormat format)
}
}

/*
* This method consumes JSON formatted elements from a successful Grafana Loki query
* It extracts all values elements processes them into the desired format
*/
/*
 * Unwraps one "result" element from a Grafana Loki query response:
 * collects its "values" children (if any) and forwards them to
 * processValues for formatting into the caller's return buffer.
 */
void wrapResult(StringBuffer & returnbuf, IPropertyTree * result, LogAccessLogFormat format, bool & isFirstLine)
{
    Owned<IPropertyTreeIterator> valuesElements;
    if (result->hasProp("values"))
        valuesElements.setown(result->getElements("values"));

    //A null iterator is tolerated by processValues (it returns immediately)
    processValues(returnbuf, valuesElements, format, isFirstLine);
}

/*
* This method consumes the JSON response from a Grafana Loki query
* It attempts to unwrap the response and extract the log payload, and reports it in the desired format
*/
void GrafanaLogAccessCurlClient::processQueryJsonResp(LogQueryResultDetails & resultDetails, const std::string & retrievedDocument, StringBuffer & returnbuf, LogAccessLogFormat format, bool reportHeader)
void GrafanaLogAccessCurlClient::processQueryJsonResp(LogQueryResultDetails & resultDetails, const std::string & retrievedDocument, StringBuffer & returnbuf, const LogAccessLogFormat format, const LogAccessReturnColsMode retcolmode, bool reportHeader)
{
resultDetails.totalReceived = 0;
resultDetails.totalAvailable = 0;

Owned<IPropertyTree> tree = createPTreeFromJSONString(retrievedDocument.c_str());
if (!tree)
throw makeStringExceptionV(-1, "%s: Could not parse log query response", COMPONENT_NAME);
Expand Down Expand Up @@ -370,11 +412,17 @@ void GrafanaLogAccessCurlClient::processQueryJsonResp(LogQueryResultDetails & re

bool isFirstLine = true;
Owned<IPropertyTreeIterator> resultIter = data->getElements("result");
//many result elements can be returned, each with a unique set of labels
//many result elements can be returned, each with a unique set of labels and a common set of stream values
ForEach(*resultIter)
{
IPropertyTree & result = resultIter->query();
wrapResult(returnbuf, &result, format, isFirstLine);
Owned<IPropertyTreeIterator> logLineIter;

if (result.hasProp("values"))
{
logLineIter.setown(result.getElements("values")); // if no values elements found, will get NullPTreeIterator
processValues(returnbuf, logLineIter, result.queryPropTree("stream"), format, retcolmode, isFirstLine);
}
}

//Adds the format postfix to the return buffer
Expand Down Expand Up @@ -465,9 +513,6 @@ void GrafanaLogAccessCurlClient::populateQueryFilterAndStreamSelector(StringBuff
}
case LOGACCESS_FILTER_wildcard:
{
if (queryValue.isEmpty())
throw makeStringExceptionV(-1, "%s: Wildcard filter cannot be empty!", COMPONENT_NAME);

DBGLOG("%s: Searching log entries by wildcard filter: '%s %s %s'...", COMPONENT_NAME, queryField.str(), queryOperator, queryValue.str());
break;
}
Expand Down Expand Up @@ -605,14 +650,14 @@ bool GrafanaLogAccessCurlClient::fetchLog(LogQueryResultDetails & resultDetails,
//from https://grafana.com/docs/loki/latest/query/log_queries/
//Adding | json to your pipeline will extract all json properties as labels if the log line is a valid json document. Nested properties are flattened into label keys using the _ separator.
logLineParser.set(" | json log"); //this parses the log entry and extracts the log field into a label
logLineParser.append(" | line_format \"{{.log}}\""); //Formats output line to only contain log label
logLineParser.append(" | line_format \"{{.log | trim}}\""); //Formats output line to only contain log label
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the line_format output doesn't clean up all json new lines

//This drops the stream, and various insert timestamps

//we're always going to get a stream container, and the log line...
//the stream container contains unnecessary and redundant lines
//there's documentation of a 'drop' command which doesn't work in practice
//online recommendation is to clear those stream entries...
logLineParser.append(" | label_format log=\"\", filename=\"\", namespace=\"\", node_name=\"\", job=\"\"");// app=\"\", component=\"\", container=\"\", instance=\"\");
logLineParser.append(" | label_format log=\"\", filename=\"\"");//, namespace=\"\", job=\"\""// app=\"\", component=\"\", container=\"\", instance=\"\");
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some of the "stream" values are useful, don't clear them


/* we're not going to attempt to parse the log line for now,
return the entire log line in raw format
Expand Down Expand Up @@ -664,8 +709,7 @@ bool GrafanaLogAccessCurlClient::fetchLog(LogQueryResultDetails & resultDetails,
std::string readBuffer;
submitQuery(readBuffer, fullQuery.str());

processQueryJsonResp(resultDetails, readBuffer, returnbuf, format, true);
//DBGLOG("Query fetchLog result: %s", readBuffer.c_str());
processQueryJsonResp(resultDetails, readBuffer, returnbuf, format, options.getReturnColsMode(), true);
}
catch(IException * e)
{
Expand All @@ -676,6 +720,19 @@ bool GrafanaLogAccessCurlClient::fetchLog(LogQueryResultDetails & resultDetails,
return false;
}

/*
 * Applies a single LogMap configuration entry to the given target log field.
 *
 * @param logMapConfig  LogMap entry from the logaccess plugin configuration;
 *                      tolerated null (no-op).
 * @param targetField   Field descriptor to populate; tolerated null (no-op).
 *
 * Marks the field as a Loki "stream" label when the store/index pattern
 * attribute equals "stream", and overrides the field's search column name
 * when one is configured.
 */
void processLogMapConfig(const IPropertyTree * logMapConfig, LogField * targetField)
{
    if (!logMapConfig || !targetField)
        return;

    //Single lookup instead of hasProp+queryProp; a "stream" store name means
    //the column lives in the Loki stream label set rather than the log values
    const char * storeName = logMapConfig->queryProp(logMapIndexPatternAtt);
    if (storeName && strcmp(storeName, "stream")==0)
        targetField->isStream = true;

    const char * searchColumn = logMapConfig->queryProp(logMapSearchColAtt);
    if (searchColumn)
        targetField->name = searchColumn;
}

GrafanaLogAccessCurlClient::GrafanaLogAccessCurlClient(IPropertyTree & logAccessPluginConfig)
{
m_pluginCfg.set(&logAccessPluginConfig);
Expand Down Expand Up @@ -769,73 +826,29 @@ GrafanaLogAccessCurlClient::GrafanaLogAccessCurlClient(IPropertyTree & logAccess
IPropertyTree & logMap = logMapIter->query();
const char * logMapType = logMap.queryProp("@type");
if (streq(logMapType, "global"))
{
if (logMap.hasProp(logMapIndexPatternAtt))
if (strcmp(logMap.queryProp(logMapIndexPatternAtt), "stream")==0)
m_globalSearchCol.isStream = true;

if (logMap.hasProp(logMapSearchColAtt))
m_globalSearchCol.name = logMap.queryProp(logMapSearchColAtt);
}
processLogMapConfig(&logMap, &m_globalSearchCol);
else if (streq(logMapType, "workunits"))
{
if (logMap.hasProp(logMapSearchColAtt))
m_workunitsColumn = logMap.queryProp(logMapSearchColAtt);
}
processLogMapConfig(&logMap, &m_workunitsColumn);
else if (streq(logMapType, "components"))
{
if (logMap.hasProp(logMapIndexPatternAtt))
if (strcmp(logMap.queryProp(logMapIndexPatternAtt), "stream")==0)
m_componentsColumn.isStream = true;

if (logMap.hasProp(logMapSearchColAtt))
m_componentsColumn.name = logMap.queryProp(logMapSearchColAtt);
}
processLogMapConfig(&logMap, &m_componentsColumn);
else if (streq(logMapType, "class"))
{
if (logMap.hasProp(logMapSearchColAtt))
m_classColumn = logMap.queryProp(logMapSearchColAtt);
}
processLogMapConfig(&logMap, &m_classColumn);
else if (streq(logMapType, "audience"))
{
if (logMap.hasProp(logMapSearchColAtt))
m_audienceColumn = logMap.queryProp(logMapSearchColAtt);
}
processLogMapConfig(&logMap, &m_audienceColumn);
else if (streq(logMapType, "instance"))
{
if (logMap.hasProp(logMapIndexPatternAtt))
if (strcmp(logMap.queryProp(logMapIndexPatternAtt), "stream")==0)
m_instanceColumn.isStream = true;

if (logMap.hasProp(logMapSearchColAtt))
m_instanceColumn.name = logMap.queryProp(logMapSearchColAtt);
}
processLogMapConfig(&logMap, &m_instanceColumn);
else if (streq(logMapType, "node"))
{
if (logMap.hasProp(logMapIndexPatternAtt))
if (strcmp(logMap.queryProp(logMapIndexPatternAtt), "stream")==0)
m_nodeColumn.isStream = true;

if (logMap.hasProp(logMapSearchColAtt))
m_nodeColumn.name = logMap.queryProp(logMapSearchColAtt);
}
processLogMapConfig(&logMap, &m_nodeColumn);
else if (streq(logMapType, "host"))
{
OWARNLOG("%s: 'host' LogMap entry is NOT supported!", COMPONENT_NAME);
}
else if (streq(logMapType, "pod"))
{
if (logMap.hasProp(logMapIndexPatternAtt))
if (strcmp(logMap.queryProp(logMapIndexPatternAtt), "stream")==0)
m_podColumn.isStream = true;

if (logMap.hasProp(logMapSearchColAtt))
m_podColumn.name = logMap.queryProp(logMapSearchColAtt);
}
processLogMapConfig(&logMap, &m_podColumn);
else if (streq(logMapType, "message"))
processLogMapConfig(&logMap, &m_messageColumn);
else if (streq(logMapType, "timestamp"))
processLogMapConfig(&logMap, &m_logDateTimstampColumn);
else
{
ERRLOG("Encountered invalid LogAccess field map type: '%s'", logMapType);
}
}

DBGLOG("%s: targeting: '%s' - datasource: '%s'", COMPONENT_NAME, m_grafanaConnectionStr.str(), m_dataSourcesAPIURI.str());
Expand Down
Loading
Loading