diff --git a/common/thorhelper/thorsoapcall.cpp b/common/thorhelper/thorsoapcall.cpp index b87a2bd4ec1..1970b44e6a0 100644 --- a/common/thorhelper/thorsoapcall.cpp +++ b/common/thorhelper/thorsoapcall.cpp @@ -50,6 +50,40 @@ using roxiemem::OwnedRoxieString; #define CONNECTION "Connection" unsigned soapTraceLevel = 1; +static StringBuffer soapSepString; + +void setSoapSepString(const char *_soapSepString) +{ + soapSepString.set(_soapSepString); +} + +static void multiLineAppendReplace(StringBuffer &origStr, StringBuffer &newStr) +{ + if (origStr.isEmpty()) + return; + + newStr.ensureCapacity(origStr.length()); + + const char *cursor = origStr; + while (*cursor) + { + switch (*cursor) + { + case '\r': + newStr.append(soapSepString); + if ('\n' == *(cursor+1)) + cursor++; + break; + case '\n': + newStr.append(soapSepString); + break; + default: + newStr.append(*cursor); + break; + } + ++cursor; + } +} #define WSCBUFFERSIZE 0x10000 #define MAXWSCTHREADS 50 //Max Web Service Call Threads @@ -1940,10 +1974,18 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo { if (soapTraceLevel > 6 || master->logXML) { - if (!contentEncoded) - master->logctx.mCTXLOG("%s: request(%s)", master->wscCallTypeText(), request.str()); + StringBuffer contentStr; + if (contentEncoded) + contentStr.append(", content encoded."); + // Only do translation if soapcall LOG option set and soapSepString defined + if ( (master->logXML) && (soapSepString.length() > 0) ) + { + StringBuffer request2; + multiLineAppendReplace(request, request2); + master->logctx.CTXLOG("%s: request(%s)%s", master->wscCallTypeText(), request2.str(), contentStr.str()); + } else - master->logctx.mCTXLOG("%s: request(%s), content encoded.", master->wscCallTypeText(), request.str()); + master->logctx.mCTXLOG("%s: request(%s)%s", master->wscCallTypeText(), request.str(), contentStr.str()); } } @@ -2247,9 +2289,18 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo if (checkContentDecoding(dbgheader, response, contentEncoding)) decodeContent(contentEncoding.str(), response); if (soapTraceLevel > 6 || master->logXML) - master->logctx.mCTXLOG("%s: LEN=%d %sresponse(%s%s)", getWsCallTypeName(master->wscType),response.length(),chunked?"CHUNKED ":"", dbgheader.str(), response.str()); - else if (soapTraceLevel > 8) - master->logctx.mCTXLOG("%s: LEN=%d %sresponse(%s)", getWsCallTypeName(master->wscType),response.length(),chunked?"CHUNKED ":"", response.str()); // not sure this is that useful but... + { + // Only do translation if soapcall LOG option set and soapSepString defined + if ( (master->logXML) && (soapSepString.length() > 0) ) + { + StringBuffer response2; + multiLineAppendReplace(dbgheader, response2); + multiLineAppendReplace(response, response2); + master->logctx.CTXLOG("%s: LEN=%d %sresponse(%s)", getWsCallTypeName(master->wscType),response.length(),chunked?"CHUNKED ":"", response2.str()); + } + else + master->logctx.mCTXLOG("%s: LEN=%d %sresponse(%s%s)", getWsCallTypeName(master->wscType),response.length(),chunked?"CHUNKED ":"", dbgheader.str(), response.str()); + } return rval; } diff --git a/common/thorhelper/thorsoapcall.hpp b/common/thorhelper/thorsoapcall.hpp index aa1c97a6c22..4ea42feef69 100644 --- a/common/thorhelper/thorsoapcall.hpp +++ b/common/thorhelper/thorsoapcall.hpp @@ -86,5 +86,6 @@ interface IRoxieAbortMonitor extern THORHELPER_API unsigned soapTraceLevel; extern THORHELPER_API IWSCHelper * createSoapCallHelper(IWSCRowProvider *, IEngineRowAllocator * outputAllocator, const char *authToken, SoapCallMode scMode, ClientCertificate *clientCert, const IContextLogger &logctx, IRoxieAbortMonitor * roxieAbortMonitor); extern THORHELPER_API IWSCHelper * createHttpCallHelper(IWSCRowProvider *, IEngineRowAllocator * outputAllocator, const char *authToken, SoapCallMode scMode, ClientCertificate *clientCert, const IContextLogger &logctx, IRoxieAbortMonitor * roxieAbortMonitor); +extern THORHELPER_API void setSoapSepString(const char *_soapSepString); #endif /* __THORSOAPCALL_HPP_ */ diff --git a/ecl/hthor/hthor.cpp b/ecl/hthor/hthor.cpp index e00fb39bcb7..610c15507cc 100644 --- a/ecl/hthor/hthor.cpp +++ b/ecl/hthor/hthor.cpp @@ -7738,6 +7738,10 @@ void CHThorWSCBaseActivity::init() JBASE64_Encode(uidpair.str(), uidpair.length(), authToken, false); } soapTraceLevel = agent.queryWorkUnit()->getDebugValueInt("soapTraceLevel", 1); + StringBuffer soapSepStr; + StringBufferAdaptor soapSepAdaptor(soapSepStr); + agent.queryWorkUnit()->getDebugValue("soapLogSepString", soapSepAdaptor); + setSoapSepString(soapSepStr.str()); } //--------------------------------------------------------------------------- diff --git a/esp/src/eclwatch/WUDetailsWidget.js b/esp/src/eclwatch/WUDetailsWidget.js index c7d2c331af0..411bba32c52 100644 --- a/esp/src/eclwatch/WUDetailsWidget.js +++ b/esp/src/eclwatch/WUDetailsWidget.js @@ -117,6 +117,16 @@ define([ this.emailFrom = registry.byId(this.id + "EmailFrom"); this.emailSubject = registry.byId(this.id + "EmailSubject"); this.emailBody = registry.byId(this.id + "EmailBody"); + + //Zap LogFilters + this.logFilterStartDateTime = dom.byId(this.id + "StartDateTime"); + this.logFilterStartDate = registry.byId(this.id + "StartDate"); + this.logFilterStartTime = registry.byId(this.id + "StartTime"); + this.logFilterEndDateTime = dom.byId(this.id + "EndDateTime"); + this.logFilterEndDate = registry.byId(this.id + "EndDate"); + this.logFilterEndTime = registry.byId(this.id + "EndTime"); + this.logFilterRelativeTimeRangeBuffer = registry.byId(this.id + "RelativeTimeRangeBuffer"); + this.protected = registry.byId(this.id + "Protected"); this.infoGridWidget = registry.byId(this.id + "InfoContainer"); this.zapDialog = registry.byId(this.id + "ZapDialog"); @@ -146,14 +156,33 @@ define([ this.checkThorLogStatus(); }, + formatLogFilterDateTime: function (dateField, timeField, dateTimeField) { + if (dateField.value.toString() !== "Invalid Date") { + const d = new Date(dateField.value); + const date = `${d.getFullYear()}-${(d.getMonth() < 9 ? "0" : "") + parseInt(d.getMonth() + 1, 10)}-${d.getDate()}`; + const time = timeField.value.toString().replace(/.*1970\s(\S+).*/, "$1"); + dateTimeField.value = `${date}T${time}.000Z`; + } + }, + _onSubmitDialog: function () { var context = this; var includeSlaveLogsCheckbox = this.includeSlaveLogsCheckbox.get("checked"); + if (this.logFilterRelativeTimeRangeBuffer.value !== "") { + this.logFilterEndDate.required = ""; + this.logFilterStartDate.required = ""; + } if (this.zapForm.validate()) { //WUCreateAndDownloadZAPInfo is not a webservice so relying on form to submit. //Server treats "on" and '' as the same thing. this.includeSlaveLogsCheckbox.set("value", includeSlaveLogsCheckbox ? "on" : "off"); + + // Log Filters + this.formatLogFilterDateTime(this.logFilterStartDate, this.logFilterStartTime, this.logFilterStartDateTime); + this.formatLogFilterDateTime(this.logFilterEndDate, this.logFilterEndTime, this.logFilterEndDateTime); + this.zapForm.set("action", "/WsWorkunits/WUCreateAndDownloadZAPInfo"); + this.zapDialog.hide(); this.checkThorLogStatus(); if (this.logAccessorMessage !== "") { diff --git a/esp/src/eclwatch/css/hpcc.css b/esp/src/eclwatch/css/hpcc.css index 26559935861..e6d0e6f6f0b 100644 --- a/esp/src/eclwatch/css/hpcc.css +++ b/esp/src/eclwatch/css/hpcc.css @@ -74,6 +74,10 @@ form li label { padding-top: 4px; } +.dijitDialogPaneContent { + overflow-x: hidden !important; +} + .dijitDialogPaneContent form li label { float: left; width: 25%; diff --git a/esp/src/eclwatch/templates/WUDetailsWidget.html b/esp/src/eclwatch/templates/WUDetailsWidget.html index a56d36ec15c..ef58752e1a6 100644 --- a/esp/src/eclwatch/templates/WUDetailsWidget.html +++ b/esp/src/eclwatch/templates/WUDetailsWidget.html @@ -183,7 +183,7 @@

-
+
@@ -199,8 +199,41 @@

+ + + + + + + + + + + + + +

-
+
diff --git a/esp/src/package-lock.json b/esp/src/package-lock.json index 262f9708232..aa0994e6469 100644 --- a/esp/src/package-lock.json +++ b/esp/src/package-lock.json @@ -18,7 +18,7 @@ "@hpcc-js/chart": "2.84.1", "@hpcc-js/codemirror": "2.63.0", "@hpcc-js/common": "2.72.0", - "@hpcc-js/comms": "2.96.1", + "@hpcc-js/comms": "2.97.0", "@hpcc-js/dataflow": "8.1.7", "@hpcc-js/eclwatch": "2.75.3", "@hpcc-js/graph": "2.86.0", @@ -2144,9 +2144,9 @@ } }, "node_modules/@hpcc-js/comms": { - "version": "2.96.1", - "resolved": "https://registry.npmjs.org/@hpcc-js/comms/-/comms-2.96.1.tgz", - "integrity": "sha512-38vIe8foZa5fYtrj65oeWyYWUDZmQTbKetHG5HXWZWMu0Lfmln8uG5/J7mO0ilw3ls2oZj7xOk5T/4xvg7v43w==", + "version": "2.97.0", + "resolved": "https://registry.npmjs.org/@hpcc-js/comms/-/comms-2.97.0.tgz", + "integrity": "sha512-1AqZoYNDhb/zRLc/ZSj8eIrpOab1fUzMXFZYVYJvbDgO6dcwDn5cFMG+Y3g5m9Z/sc48E50S9lTZFFqQFZQOGg==", "dependencies": { "@hpcc-js/ddl-shim": "^2.21.0", "@hpcc-js/util": "^2.52.0", diff --git a/esp/src/package.json b/esp/src/package.json index f3f5270f0e7..2676b215461 100644 --- a/esp/src/package.json +++ b/esp/src/package.json @@ -44,7 +44,7 @@ "@hpcc-js/chart": "2.84.1", "@hpcc-js/codemirror": "2.63.0", "@hpcc-js/common": "2.72.0", - "@hpcc-js/comms": "2.96.1", + "@hpcc-js/comms": "2.97.0", "@hpcc-js/dataflow": "8.1.7", "@hpcc-js/eclwatch": "2.75.3", "@hpcc-js/graph": "2.86.0", diff --git a/esp/src/src-react/components/Logs.tsx b/esp/src/src-react/components/Logs.tsx index 0ec5a9b4699..a0ef17ee26b 100644 --- a/esp/src/src-react/components/Logs.tsx +++ b/esp/src/src-react/components/Logs.tsx @@ -3,7 +3,8 @@ import { CommandBar, ContextualMenuItemType, ICommandBarItemProps } from "@fluen import { GetLogsExRequest, LogaccessService, TargetAudience, LogType } from "@hpcc-js/comms"; import { Level, scopedLogger } from "@hpcc-js/util"; import nlsHPCC from "src/nlsHPCC"; -import { logColor, wuidToDate, wuidToTime } from "src/Utility"; +import { logColor, timestampToDate, wuidToDate, wuidToTime } from "src/Utility"; +import { useLoggingEngine } from "../hooks/platform"; import { HolyGrail } from "../layouts/HolyGrail"; import { pushParams } from "../util/history"; import { FluentGrid, useCopyButtons, useFluentStoreState, FluentColumns } from "./controls/Grid"; @@ -110,28 +111,49 @@ export const Logs: React.FunctionComponent = ({ const now = React.useMemo(() => new Date(), []); + const loggingEngine = useLoggingEngine(); + // Grid --- const columns = React.useMemo((): FluentColumns => { - return { - timestamp: { label: nlsHPCC.TimeStamp, width: 140, sortable: false, }, - message: { label: nlsHPCC.Message, width: 600, sortable: false, }, - components: { label: nlsHPCC.ContainerName, width: 150, sortable: false }, - instance: { label: nlsHPCC.PodName, width: 150, sortable: false }, - audience: { label: nlsHPCC.Audience, width: 60, sortable: false, }, - class: { - label: nlsHPCC.Class, width: 40, sortable: false, - formatter: level => { - const colors = logColor(levelMap(level)); - const styles = { backgroundColor: colors.background, padding: "2px 6px", color: colors.foreground }; - return {level}; - } + let retVal = { + timestamp: { + label: nlsHPCC.TimeStamp, width: 140, sortable: false, + formatter: ts => { + if (ts) { + if (ts.indexOf(":") < 0) { + return timestampToDate(ts).toISOString(); + } + return new Date(ts).toISOString(); + } + }, }, - workunits: { label: nlsHPCC.JobID, width: 50, sortable: false, hidden: wuid !== undefined, }, - processid: { label: nlsHPCC.ProcessID, width: 75, sortable: false, }, - logid: { label: nlsHPCC.Sequence, width: 70, sortable: false, }, - threadid: { label: nlsHPCC.ThreadID, width: 60, sortable: false, }, + message: { label: nlsHPCC.Message, width: 600, sortable: false, }, }; - }, [wuid]); + if (loggingEngine === "grafanacurl") { + retVal = Object.assign(retVal, { + pod: { label: nlsHPCC.PodName, width: 150, sortable: false }, + }); + } else { + retVal = Object.assign(retVal, { + instance: { label: nlsHPCC.PodName, width: 150, sortable: false }, + components: { label: nlsHPCC.ContainerName, width: 150, sortable: false }, + audience: { label: nlsHPCC.Audience, width: 60, sortable: false, }, + class: { + label: nlsHPCC.Class, width: 40, sortable: false, + formatter: level => { + const colors = logColor(levelMap(level)); + const styles = { backgroundColor: colors.background, padding: "2px 6px", color: colors.foreground }; + return {level}; + } + }, + workunits: { label: nlsHPCC.JobID, width: 50, sortable: false, hidden: wuid !== undefined, }, + processid: { label: nlsHPCC.ProcessID, width: 75, sortable: false, }, + logid: { label: nlsHPCC.Sequence, width: 70, sortable: false, }, + threadid: { label: nlsHPCC.ThreadID, width: 60, sortable: false, }, + }); + } + return retVal; + }, [loggingEngine, wuid]); const copyButtons = useCopyButtons(columns, selection, "logaccess"); diff --git a/esp/src/src-react/components/Queries.tsx b/esp/src/src-react/components/Queries.tsx index ff45b2a7f6d..a64847aff70 100644 --- a/esp/src/src-react/components/Queries.tsx +++ b/esp/src/src-react/components/Queries.tsx @@ -120,8 +120,8 @@ export const Queries: React.FunctionComponent = ({ headerTooltip: nlsHPCC.ErrorWarnings, width: 16, sortable: false, - formatter: (error) => { - if (error > 0) { + formatter: (error, row) => { + if (row.ErrorCount > 0) { return ; } return ""; @@ -133,8 +133,9 @@ export const Queries: React.FunctionComponent = ({ headerTooltip: nlsHPCC.MixedNodeStates, width: 16, sortable: false, - formatter: (mixed) => { - if (mixed === true) { + formatter: (mixed, row) => { + const mixedStates = row.Clusters.ClusterQueryState[0]?.MixedNodeStates ?? false; + if (mixedStates === true) { return ; } return ""; @@ -144,8 +145,8 @@ export const Queries: React.FunctionComponent = ({ headerIcon: "SkypeCircleCheck", headerTooltip: nlsHPCC.Active, width: 16, - formatter: (activated) => { - if (activated === true) { + formatter: (activated, row) => { + if (row.Activated === true) { return ; } return ""; diff --git a/esp/src/src-react/hooks/platform.ts b/esp/src/src-react/hooks/platform.ts index 66dac5d9487..4bc86caa2c4 100644 --- a/esp/src/src-react/hooks/platform.ts +++ b/esp/src/src-react/hooks/platform.ts @@ -2,7 +2,7 @@ import * as React from "react"; import { Octokit } from "octokit"; import { useConst } from "@fluentui/react-hooks"; import { scopedLogger } from "@hpcc-js/util"; -import { Topology, WsTopology, WorkunitsServiceEx } from "@hpcc-js/comms"; +import { LogaccessService, Topology, WsTopology, WorkunitsServiceEx } from "@hpcc-js/comms"; import { getBuildInfo, BuildInfo, fetchModernMode } from "src/Session"; import { cmake_build_type, containerized, ModernMode } from "src/BuildInfo"; import { sessionKeyValStore, userKeyValStore } from "src/KeyValStore"; @@ -10,6 +10,8 @@ import { Palette } from "@hpcc-js/common"; const logger = scopedLogger("src-react/hooks/platform.ts"); +export const service = new LogaccessService({ baseUrl: "" }); + declare const dojoConfig; export function useBuildInfo(): [BuildInfo, { isContainer: boolean, currencyCode: string, opsCategory: string }] { @@ -205,4 +207,14 @@ export function useModernMode(): { }, [modernMode, sessionStore, userStore]); return { modernMode, setModernMode }; -} \ No newline at end of file +} + +export function useLoggingEngine(): string { + const [loggingEngine, setLoggingEngine] = React.useState(""); + + React.useEffect(() => { + service.GetLogAccessInfo({}).then(response => setLoggingEngine(response.RemoteLogManagerType ?? "")); + }, []); + + return loggingEngine; +} \ No newline at end of file diff --git a/esp/src/src/Utility.ts b/esp/src/src/Utility.ts index cc78350ed6d..4b70d873129 100644 --- a/esp/src/src/Utility.ts +++ b/esp/src/src/Utility.ts @@ -1242,6 +1242,21 @@ export function format(labelTpl, obj) { .join("\n") ; } + +const TEN_TRILLION = 10000000000000; +export function nanosToMillis(timestamp: number): number { + if (timestamp > TEN_TRILLION) { + return Math.round(timestamp / 1000000); + } else { + return timestamp; + } +} + +export function timestampToDate(timestamp: number): Date { + const millis = nanosToMillis(timestamp); + return new Date(millis); +} + const theme = getTheme(); const { semanticColors } = theme; diff --git a/roxie/ccd/ccdmain.cpp b/roxie/ccd/ccdmain.cpp index 9d2680aa8ee..192e58a5670 100644 --- a/roxie/ccd/ccdmain.cpp +++ b/roxie/ccd/ccdmain.cpp @@ -888,6 +888,12 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml) udpTraceLevel = topology->getPropInt("@udpTraceLevel", runOnce ? 0 : 1); roxiemem::setMemTraceLevel(topology->getPropInt("@memTraceLevel", runOnce ? 0 : 1)); soapTraceLevel = topology->getPropInt("@soapTraceLevel", runOnce ? 0 : 1); + if (topology->hasProp("@soapLogSepString")) + { + StringBuffer tmpSepString; + topology->getProp("@soapLogSepString", tmpSepString); + setSoapSepString(tmpSepString.str()); + } miscDebugTraceLevel = topology->getPropInt("@miscDebugTraceLevel", 0); traceRemoteFiles = topology->getPropBool("@traceRemoteFiles", false); testAgentFailure = topology->getPropInt("expert/@testAgentFailure", testAgentFailure); diff --git a/system/jlib/jdebug.cpp b/system/jlib/jdebug.cpp index c2ba7d12313..76a4069b61d 100644 --- a/system/jlib/jdebug.cpp +++ b/system/jlib/jdebug.cpp @@ -835,6 +835,54 @@ static struct CNtKernelInformation } NtKernelFunctions; #endif +//=========================================================================== + +#ifndef _WIN32 + +static std::atomic gatheredGroup{false}; +static StringAttr cgroup; +static CriticalSection csgroupCs; +static const char * queryCGroup() +{ + if (!gatheredGroup) + { + CriticalBlock block(csgroupCs); + if (!gatheredGroup) + { + StringBuffer contents; + if (loadBinaryFile(contents, "/proc/self/cgroup", false)) + { + auto processLine = [](size_t len, const char * ln) + { + //Note ln points at the start of the line, but the line is not null terminated + switch (*ln) + { + case '0': + if (strncmp(ln, "0::/", 4) == 0) + { + //Format is 0::/ + //If not running in a container the "cgroup" may be something like user.slice/user-1000.slice/user@1000.service/.... + //If so ignore because it is not a real cgroup + if (!memchr(ln+4, '/', len-4)) + cgroup.set(ln+4, len-4); + } + break; + } + //Some systems with version 1 cgroups have :cpu,cpuacct:/ + const char * match = (const char *)jmemmem(len, ln, 14, ":cpu,cpuacct:/"); + if (match) + cgroup.set(match+14, (ln + len) - (match + 14)); + }; + + processLines(contents, processLine); + } + gatheredGroup = true; + } + } + return cgroup.get(); +} + +#endif //=========================================================================== @@ -912,6 +960,9 @@ SystemProcessInfo SystemProcessInfo::operator - (const SystemProcessInfo & rhs) result.activeDataMemory = activeDataMemory - rhs.activeDataMemory; result.majorFaults = majorFaults - rhs.majorFaults; result.numThreads = numThreads - rhs.numThreads; + result.numPeriods = numPeriods - rhs.numPeriods; + result.numThrottledPeriods = numThrottledPeriods - rhs.numThrottledPeriods; + result.timeThrottledNs = timeThrottledNs - rhs.timeThrottledNs; return result; } @@ -1014,7 +1065,7 @@ bool ProcessInfo::update(unsigned flags) if (loadBinaryFile(contents, "/proc/self/status", false)) { contextSwitches = 0; - auto processLine = [this](const char * cur) + auto processLine = [this](size_t len, const char * cur) { __uint64 value; switch (*cur) @@ -1089,7 +1140,7 @@ bool SystemInfo::update(unsigned flags) StringBuffer contents; if (loadBinaryFile(contents, "/proc/stat", false)) { - auto processLine = [this](const char * ln) + auto processLine = [this](size_t len, const char * ln) { switch (*ln) { @@ -1124,6 +1175,52 @@ bool SystemInfo::update(unsigned flags) processLines(contents, processLine); } + auto processLine = [this](size_t len, const char * ln) + { + switch (*ln) + { + case 'n': + if (strncmp(ln, "nr_periods ", 11) == 0) + numPeriods = strtod(ln+11, nullptr); + else if (strncmp(ln, "nr_throttled ", 13) == 0) + numThrottledPeriods = strtod(ln+13, nullptr); + break; + case 't': + if (strncmp(ln, "throttled_usec ", 15) == 0) + timeThrottledNs = strtod(ln+15, nullptr) * 1000; + else if (strncmp(ln, "throttled_time ", 15) == 0) + timeThrottledNs = strtod(ln+15, nullptr); + break; + } + }; + + bool done = false; + const char * cgroup = queryCGroup(); + if (cgroup) + { + //Version 2 of cgroups has the information in cgroup/ + VStringBuffer filename("/sys/fs/cgroup/%s/cpu.stat", cgroup); + if (loadBinaryFile(contents.clear(), filename.str(), false)) + { + processLines(contents, processLine); + done = true; + } + else + { + //Some systems with version 1 cgroups have the information in /sys/fs/cgroup/cpu//cpu.stat + filename.clear().appendf("/sys/fs/cgroup/cpu/%s/cpu.stat", cgroup); + if (loadBinaryFile(contents.clear(), filename.str(), false)) + { + processLines(contents, processLine); + done = true; + } + } + } + + //If the version 2 file was not found look for ther version 1 information in cgroup/cpu + if (!done && loadBinaryFile(contents.clear(), "/sys/fs/cgroup/cpu/cpu.stat", false)) + processLines(contents, processLine); + return true; #endif } @@ -2521,9 +2618,16 @@ class CExtendedStats // Disk network and cpu stats } if (totalcpu) { - if (out.length()&&(out.charAt(out.length()-1)!=' ')) - out.append(' '); + ensureSeparator(out, ' '); out.appendf("CPU: usr=%d sys=%d iow=%d idle=%d", deltacpu.getUserPercent(), deltacpu.getSystemPercent(), deltacpu.getIoWaitPercent(), deltacpu.getIdlePercent()); + + __uint64 periods = deltacpu.getNumPeriods(); + if (periods) + { + unsigned throttling = deltacpu.getNumThrottledPeriods() * 100 / periods; + __uint64 timeThrottledNs = deltacpu.getTimeThrottledNs(); + out.appendf(" thr=%u%% thrns=%llu", throttling, timeThrottledNs); + } } return true; } diff --git a/system/jlib/jdebug.hpp b/system/jlib/jdebug.hpp index 0a8a0dacfcc..a4d5d41c705 100644 --- a/system/jlib/jdebug.hpp +++ b/system/jlib/jdebug.hpp @@ -446,6 +446,10 @@ class jlib_decl SystemProcessInfo unsigned getSystemPercent() const; unsigned getUserPercent() const; + __uint64 getNumPeriods() const { return numPeriods; } + __uint64 getNumThrottledPeriods() const { return numThrottledPeriods; } + __uint64 getTimeThrottledNs() const { return timeThrottledNs; } + __uint64 getTotal() const { return user + system + idle + iowait; } protected: __uint64 user = 0; // user time in jiffies (~`1/100s) @@ -461,6 +465,9 @@ class jlib_decl SystemProcessInfo __uint64 activeDataMemory = 0; __uint64 majorFaults = 0; __uint64 numThreads = 0; + __uint64 numPeriods = 0; + __uint64 numThrottledPeriods = 0; + __uint64 timeThrottledNs = 0; }; class jlib_decl ProcessInfo : public SystemProcessInfo diff --git a/system/jlib/jfcmp.hpp b/system/jlib/jfcmp.hpp index 24e6029df94..f9749a067f4 100644 --- a/system/jlib/jfcmp.hpp +++ b/system/jlib/jfcmp.hpp @@ -230,6 +230,7 @@ class jlib_decl CFcmpExpander : public CExpanderBase size32_t outlen; size32_t bufalloc; const size32_t *in = nullptr; + const void * original = nullptr; public: CFcmpExpander() @@ -247,6 +248,7 @@ class jlib_decl CFcmpExpander : public CExpanderBase virtual size32_t init(const void *blk) { + original = blk; const size32_t *expsz = (const size32_t *)blk; outlen = *expsz; in = (expsz+1); diff --git a/system/jlib/jlz4.cpp b/system/jlib/jlz4.cpp index 6a854c39fbf..930b38fa02d 100644 --- a/system/jlib/jlz4.cpp +++ b/system/jlib/jlz4.cpp @@ -264,6 +264,13 @@ class jlib_decl CLZ4Expander : public CFcmpExpander size32_t written; if (szchunk+totalExpanded outlen) - throwUnexpected(); + { + VStringBuffer msg("Decompression expected max %u bytes, but now %u at block offset %u", outlen, maxOut, (size32_t)((const byte *)in - (const byte *)original)); + throwUnexpectedX(msg.str()); + } maxOut += szchunk; // Likely to quickly approach the actual expanded size target.clear(); diff --git a/system/jlib/jlzw.cpp b/system/jlib/jlzw.cpp index 6ff771eade6..6401657a7a9 100644 --- a/system/jlib/jlzw.cpp +++ b/system/jlib/jlzw.cpp @@ -2118,7 +2118,7 @@ class CCompressedFile : implements ICompressedFileIO, public CInterface { unsigned code = e->errorCode(); StringBuffer msg; - e->errorMessage(msg).appendf(" at position %llu of %llu", nextExpansionPos, trailer.indexPos); + e->errorMessage(msg).appendf(" at uncompressed position %llu block %u of %llu", nextExpansionPos, curblocknum, trailer.indexPos); e->Release(); throw makeStringException(code, msg.str()); } @@ -2157,37 +2157,48 @@ class CCompressedFile : implements ICompressedFileIO, public CInterface void expand(const void *compbuf,MemoryBuffer &expbuf,size32_t expsize, offset_t compressedPos) { - size32_t rs = trailer.recordSize; - if (rs) { // diff expand - const byte *src = (const byte *)compbuf; - byte *dst = (byte *)expbuf.reserve(expsize); - if (expsize) { - assertex(expsize>=rs); - memcpy(dst,src,rs); - dst += rs; - src += rs; - expsize -= rs; - while (expsize) { + try + { + size32_t rs = trailer.recordSize; + if (rs) { // diff expand + const byte *src = (const byte *)compbuf; + byte *dst = (byte *)expbuf.reserve(expsize); + if (expsize) { assertex(expsize>=rs); - src += DiffExpand(src, dst, dst-rs, rs); - expsize -= rs; + memcpy(dst,src,rs); dst += rs; + src += rs; + expsize -= rs; + while (expsize) { + assertex(expsize>=rs); + src += DiffExpand(src, dst, dst-rs, rs); + expsize -= rs; + dst += rs; + } } } - } - else { // lzw or fastlz or lz4 - assertex(expander.get()); - size32_t exp = expander->expandFirst(expbuf, compbuf); - if (exp == 0) - { - unsigned numZeros = countZeros(trailer.blockSize, (const byte *)compbuf); - if (numZeros >= 16) - throw makeStringExceptionV(-1, "Unexpected zero fill in compressed file at position %llu length %u", compressedPos, numZeros); - } + else { // lzw or fastlz or lz4 + assertex(expander.get()); + size32_t exp = expander->expandFirst(expbuf, compbuf); + if (exp == 0) + { + unsigned numZeros = countZeros(trailer.blockSize, (const byte *)compbuf); + if (numZeros >= 16) + throw makeStringExceptionV(-1, "Unexpected zero fill in compressed file at position %llu length %u", compressedPos, numZeros); + } - startBlockPos = curblockpos; - nextExpansionPos = startBlockPos + exp; - fullBlockSize = expsize; + startBlockPos = curblockpos; + nextExpansionPos = startBlockPos + exp; + fullBlockSize = expsize; + } + } + catch (IException * e) + { + unsigned code = e->errorCode(); + StringBuffer msg; + e->errorMessage(msg).appendf(" at compressed position %llu of %llu", compressedPos, trailer.indexPos); + e->Release(); + throw makeStringException(code, msg.str()); } } diff --git a/system/jlib/jlzw.hpp b/system/jlib/jlzw.hpp index 1c3402d3318..da160d112e4 100644 --- a/system/jlib/jlzw.hpp +++ b/system/jlib/jlzw.hpp @@ -145,8 +145,8 @@ extern jlib_decl bool isCompressedFile(const char *filename); extern jlib_decl bool isCompressedFile(IFile *file); extern jlib_decl ICompressedFileIO *createCompressedFileReader(IFile *file,IExpander *expander=NULL, bool memorymapped=false, IFEflags extraFlags=IFEnone); extern jlib_decl ICompressedFileIO *createCompressedFileReader(IFileIO *fileio,IExpander *expander=NULL); -extern jlib_decl ICompressedFileIO *createCompressedFileWriter(IFile *file,size32_t recordsize,bool append=false,bool setcrc=true,ICompressor *compressor=NULL, unsigned compMethod=COMPRESS_METHOD_LZ4, IFEflags extraFlags=IFEnone); extern jlib_decl ICompressedFileIO *createCompressedFileWriter(IFileIO *fileio, bool append, size32_t recordsize,bool setcrc=true,ICompressor *compressor=NULL, unsigned compMethod=COMPRESS_METHOD_LZ4); +extern jlib_decl ICompressedFileIO *createCompressedFileWriter(IFile *file,size32_t recordsize,bool append=false,bool setcrc=true,ICompressor *compressor=NULL, unsigned compMethod=COMPRESS_METHOD_LZ4, IFEflags extraFlags=IFEnone); #define COMPRESSEDFILECRC (~0U) diff --git a/system/jlib/jstring.cpp b/system/jlib/jstring.cpp index 50951938ae5..7e581f2503a 100644 --- a/system/jlib/jstring.cpp +++ b/system/jlib/jstring.cpp @@ -2902,6 +2902,12 @@ void getSnakeCase(StringBuffer & out, const char * camelValue) } } +void ensureSeparator(StringBuffer & out, char separator) +{ + if (out.length() && (out.charAt(out.length()-1) != separator)) + out.append(separator); +} + /** * stristr - Case insensitive strstr() * @haystack: Where we will search for our @needle @@ -2949,3 +2955,31 @@ const char * stristr (const char *haystack, const char *needle) } return nullptr; } + + +const void * jmemmem(size_t lenHaystack, const void * haystack, size_t lenNeedle, const void *needle) +{ + if (lenNeedle == 0) + return haystack; + + if (lenHaystack < lenNeedle) + return nullptr; + + const char * search = (const char *)needle; + char first = *search; + if (lenNeedle == 1) + return memchr(haystack, first, lenHaystack); + + const char * buffer = (const char *)haystack; + for (size_t i = 0; i <= lenHaystack - lenNeedle; i++) + { + //Special case the first character to avoid a function call each iteration. + if (buffer[i] == first) + { + if (memcmp(buffer + i + 1, search + 1, lenNeedle-1) == 0) + return buffer + i; + } + } + + return nullptr; +} diff --git a/system/jlib/jstring.hpp b/system/jlib/jstring.hpp index b3fe7651daf..d61b7fb7a1f 100644 --- a/system/jlib/jstring.hpp +++ b/system/jlib/jstring.hpp @@ -632,11 +632,18 @@ void processLines(const StringBuffer & content, LineProcessor process) const char * cur = content; while (*cur) { - process(cur); const char * next = strchr(cur, '\n'); - if (!next) + if (next) + { + if (next != cur) + process(next-cur, cur); + cur = next+1; + } + else + { + process(strlen(cur), cur); break; - cur = next+1; + } } } @@ -646,5 +653,10 @@ extern jlib_decl void processOptionString(const char * options, optionCallback c extern jlib_decl const char * stristr(const char *haystack, const char *needle); extern jlib_decl void getSnakeCase(StringBuffer & out, const char * camelValue); +//If the string has any characters, ensure the last character matches the separator +extern jlib_decl void ensureSeparator(StringBuffer & out, char separator); + +//Search for one block of bytes within another block of bytes - memmem is not standard, so we provide our own +extern jlib_decl const void * jmemmem(size_t lenHaystack, const void * haystack, size_t lenNeedle, const void *needle); #endif diff --git a/testing/unittests/CMakeLists.txt b/testing/unittests/CMakeLists.txt index 73571bee22d..59498fb0d2f 100644 --- a/testing/unittests/CMakeLists.txt +++ b/testing/unittests/CMakeLists.txt @@ -46,6 +46,7 @@ set ( SRCS accessmaptests.cpp fxpptests.cpp espapicmdtests.cpp + filetests.cpp ${HPCC_SOURCE_DIR}/esp/bindings/SOAP/xpp/fxpp/FragmentedXmlPullParser.cpp ${HPCC_SOURCE_DIR}/esp/bindings/SOAP/xpp/fxpp/FragmentedXmlAssistant.cpp ${CMAKE_BINARY_DIR}/generated/ws_loggingservice_esp.cpp diff --git a/testing/unittests/filetests.cpp b/testing/unittests/filetests.cpp new file mode 100644 index 00000000000..c2c29ea718e --- /dev/null +++ b/testing/unittests/filetests.cpp @@ -0,0 +1,185 @@ +/*############################################################################## + + HPCC SYSTEMS software Copyright (C) 2024 HPCC Systems®. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +############################################################################## */ + +/* + * File regression tests + * + */ + +#ifdef _USE_CPPUNIT +#include +#include +#include +#include "jsem.hpp" +#include "jfile.hpp" +#include "jdebug.hpp" +#include "jset.hpp" +#include "rmtfile.hpp" +#include "jlzw.hpp" +#include "jqueue.hpp" +#include "jregexp.hpp" +#include "jsecrets.hpp" +#include "jutil.hpp" +#include "junicode.hpp" + +#include "opentelemetry/sdk/common/attribute_utils.h" +#include "opentelemetry/sdk/resource/resource.h" + +#include "unittests.hpp" + +#define CPPUNIT_ASSERT_EQUAL_STR(x, y) CPPUNIT_ASSERT_EQUAL(std::string(x ? x : ""),std::string(y ? y : "")) + +class JlibFileTest : public CppUnit::TestFixture +{ +public: + CPPUNIT_TEST_SUITE(JlibFileTest); + CPPUNIT_TEST(testCompressed); + CPPUNIT_TEST(cleanup); + CPPUNIT_TEST_SUITE_END(); + + static constexpr const char * testFilename = "unittests_compressfile"; + void createCompressed() + { + Owned file(createIFile(testFilename)); + Owned io(createCompressedFileWriter(file, 0, false, false, nullptr)); + + constexpr size_t cnt = 10000; + constexpr size_t size = 1000; + offset_t pos = 0; + for (unsigned i = 0; i < cnt; i++) + { + byte temp[size]; + + for (unsigned j = 0; j < size; j += 4) + { + temp[j] = (byte)j; + temp[j+1] = (byte)j+1; + temp[j+2] = (byte)j+2; + temp[j+3] = (byte)random(); + } + + io->write(pos, size, temp); + pos += size; + } + } + void readCompressed(bool errorExpected) + { + bool success = false; + try + { + Owned file(createIFile(testFilename)); + Owned io(createCompressedFileReader(file)); + + constexpr size_t cnt = 10000; + constexpr size_t size = 1000; + offset_t pos = 0; + for (unsigned i = 0; i < cnt; i++) + { + byte temp[size]; + + io->read(pos, size, temp); + + for (unsigned j = 0; j < size; j += 4) + { + CPPUNIT_ASSERT_EQUAL(temp[j], (byte)j); + CPPUNIT_ASSERT_EQUAL(temp[j+1], (byte)(j+1)); + } + + pos += size; + } + + success = true; + } + catch (IException *e) + { + if (errorExpected) + { + DBGLOG(e, "Expected error reading compressed file:"); + } + else + { + StringBuffer msg("Unexpected error reading compressed file:"); + e->errorMessage(msg); + CPPUNIT_FAIL(msg.str()); + } + e->Release(); + } + if (success && errorExpected) + CPPUNIT_FAIL("Expected error reading compressed file"); + } + void read(offset_t offset, size32_t size, void * data) + { + Owned file(createIFile(testFilename)); + Owned io(file->open(IFOread)); + io->read(offset, size, data); + } + void write(offset_t offset, size32_t size, void * data) + { + Owned file(createIFile(testFilename)); + Owned io(file->open(IFOwrite)); + io->write(offset, size, data); + } + void testCompressed() + { + //patch the first block with zeros + constexpr byte zeros[0x100000] = { 0 }; + + createCompressed(); + readCompressed(false); + + write(0, sizeof(zeros), (void *)zeros); + readCompressed(true); + + createCompressed(); + write(0x10000, sizeof(zeros), (void *)zeros); + readCompressed(true); + + createCompressed(); + write(0x9000, sizeof(zeros), (void *)zeros); + readCompressed(true); + + //Test the second block being corrupted with zeros + size32_t firstBlockSize = 0; + createCompressed(); + read(4, sizeof(firstBlockSize), &firstBlockSize); + write(8+firstBlockSize, sizeof(zeros), (void *)zeros); + readCompressed(true); + + //Test the data after the second block being corrupted with zeros + createCompressed(); + read(4, sizeof(firstBlockSize), &firstBlockSize); + write(8+4+firstBlockSize, sizeof(zeros), (void *)zeros); + readCompressed(true); + + //Test the second block being corrupted to an invalid size + size32_t newSize = 1; + createCompressed(); + read(4, sizeof(firstBlockSize), &firstBlockSize); + write(8+firstBlockSize, sizeof(newSize), &newSize); + readCompressed(true); + } + void cleanup() + { + Owned file(createIFile(testFilename)); + file->remove(); + } +}; + +CPPUNIT_TEST_SUITE_REGISTRATION( JlibFileTest ); +CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( JlibFileTest, "JlibFileTest" ); + +#endif diff --git a/testing/unittests/jlibtests.cpp b/testing/unittests/jlibtests.cpp index 85840b34d34..195a34c3705 100644 --- a/testing/unittests/jlibtests.cpp +++ b/testing/unittests/jlibtests.cpp @@ -3413,6 +3413,7 @@ class MachineInfoTimingTest : public CppUnit::TestFixture DBGLOG(" Process: User(%u) System(%u) Total(%u) %u%% Ctx(%" I64F "u)", (unsigned)(deltaProcess.getUserNs() / 1000000), (unsigned)(deltaProcess.getSystemNs() / 1000000), (unsigned)(deltaProcess.getTotalNs() / 1000000), (unsigned)((deltaProcess.getUserNs() * 100) / deltaSystem.getTotalNs()), deltaProcess.getNumContextSwitches()); + DBGLOG(" Throttled: Periods(%llu/%llu) TimeNs(%llu)", deltaSystem.getNumThrottledPeriods(), deltaSystem.getNumPeriods(), deltaSystem.getTimeThrottledNs()); } for (unsigned j=0; j < i*100000000; j++) @@ -4653,6 +4654,7 @@ class JLibStringTest : public CppUnit::TestFixture public: CPPUNIT_TEST_SUITE(JLibStringTest); CPPUNIT_TEST(testStristr); + CPPUNIT_TEST(testMemMem); CPPUNIT_TEST_SUITE_END(); void testStristr() @@ -4670,6 +4672,21 @@ class JLibStringTest : public CppUnit::TestFixture CPPUNIT_ASSERT_EQUAL_STR(stristr("", "ABC"), ""); CPPUNIT_ASSERT_EQUAL_STR(stristr("ABC", ""), ""); } + + void testMemMem() + { + constexpr const char * haystack = "abcdefghijklmnopqrstuvwxyz"; + CPPUNIT_ASSERT_EQUAL((const void*)(haystack), jmemmem(10, haystack, 0, nullptr)); + CPPUNIT_ASSERT_EQUAL((const void*)(haystack), jmemmem(10, haystack, 3, "abc")); + CPPUNIT_ASSERT_EQUAL((const void*)(haystack), jmemmem(3, haystack, 3, "abc")); + CPPUNIT_ASSERT_EQUAL((const void*)nullptr, jmemmem(2, haystack, 3, "abc")); + CPPUNIT_ASSERT_EQUAL((const void*)(haystack+7), jmemmem(10, haystack, 3, "hij")); + CPPUNIT_ASSERT_EQUAL((const void*)nullptr, jmemmem(10, haystack, 3, "ijk")); + CPPUNIT_ASSERT_EQUAL((const void*)(haystack+8), jmemmem(10, haystack, 1, "i")); + CPPUNIT_ASSERT_EQUAL((const void*)(nullptr), jmemmem(8, haystack, 1, "i")); + CPPUNIT_ASSERT_EQUAL((const void*)(nullptr), jmemmem(9, haystack, 2, "ij")); + CPPUNIT_ASSERT_EQUAL((const void*)(haystack+8), jmemmem(10, haystack, 2, "ij")); + } }; CPPUNIT_TEST_SUITE_REGISTRATION( JLibStringTest ); diff --git a/thorlcr/graph/thgraph.cpp b/thorlcr/graph/thgraph.cpp index 414b2ec22e2..0b994cd1af3 100644 --- a/thorlcr/graph/thgraph.cpp +++ b/thorlcr/graph/thgraph.cpp @@ -2750,6 +2750,9 @@ void CJobBase::init() failOnLeaks = getOptBool(THOROPT_FAIL_ON_LEAKS); maxLfnBlockTimeMins = getOptInt(THOROPT_MAXLFN_BLOCKTIME_MINS, DEFAULT_MAXLFN_BLOCKTIME_MINS); soapTraceLevel = getOptInt(THOROPT_SOAP_TRACE_LEVEL, 1); + StringBuffer tmpSepString; + getOpt(THOROPT_SOAP_LOG_SEP_STRING, tmpSepString); + setSoapSepString(tmpSepString.str()); StringBuffer tracing("maxActivityCores = "); if (maxActivityCores) diff --git a/thorlcr/thorutil/thormisc.hpp b/thorlcr/thorutil/thormisc.hpp index 260b58a5b55..174dc5d6f1f 100644 --- a/thorlcr/thorutil/thormisc.hpp +++ b/thorlcr/thorutil/thormisc.hpp @@ -118,6 +118,7 @@ #define THOROPT_MEMORY_SPILL_AT "memorySpillAt" // The threshold (%) that roxiemem will request memory to be reduced (default=80) #define THOROPT_FAIL_ON_LEAKS "failOnLeaks" // If any leaks are detected at the end of graph, fail the query (default=false) #define THOROPT_SOAP_TRACE_LEVEL "soapTraceLevel" // The trace SOAP level (default=1) +#define THOROPT_SOAP_LOG_SEP_STRING "soapLogSepString" // The SOAP request/response separator string for logging (default="") #define THOROPT_SORT_ALGORITHM "sortAlgorithm" // The algorithm used to sort records (quicksort/mergesort) #define THOROPT_COMPRESS_ALLFILES "v9_4_compressAllOutputs" // Compress all output files (default: bare-metal=off, cloud=on) #define THOROPT_AVOID_RENAME "avoidRename" // Avoid rename, write directly to target physical filenames (no temp file)