diff --git a/.github/workflows/jirabot.yml b/.github/workflows/jirabot.yml
index 277f6e7afb7..2f1db9c80f2 100644
--- a/.github/workflows/jirabot.yml
+++ b/.github/workflows/jirabot.yml
@@ -38,33 +38,61 @@ jobs:
           GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
           GHUB_JIRA_USER_MAP: ${{ vars.GHUB_JIRA_USER_MAP }}
           JIRA_ISSUE_PROPERTY_MAP: ${{ vars.JIRA_ISSUE_PROPERTY_MAP }}
-          JIRA_ISSUE_TRANSITION_MAP: ${{ vars.JIRA_ISSUE_TRANSITION_MAP }}
         run: |
           import os
           import re
           import time
           import sys
           import json
+          import subprocess
+          from email.utils import parseaddr
           from atlassian.jira import Jira

-          def updateIssue(jira, issue, prAuthor : str, transitionMap: dict, propertyMap: dict, pull_url: str) -> str:
+          def sanitizeInput(input: str, inputType: str) -> str:
+              if inputType.lower() == 'email':
+                  # Return the email address only, returns '' if not valid or found
+                  return parseaddr(input)[1]
+              else:
+                  return ''
+
+          def updateIssue(jira, issue, prAuthor : str, propertyMap: dict, pull_url: str) -> str:
              result = ''
              issueName = issue['key']
              issueFields = issue['fields']
-             statusName = str(issueFields['status']['name'])
-             transition = transitionMap.get(statusName, None)
+             # Need to update user first in case we are starting from Unresourced
+             if prAuthor:
+                 assignee = issueFields['assignee']
+                 if assignee is None:
+                     assigneeId = ''
+                     assigneeEmail = ''
+                 else:
+                     assigneeId = assignee['accountId']
+                     assigneeEmail = assignee["emailAddress"]
+
+                 assigneeEmail = sanitizeInput(assigneeEmail, 'email')
+
+                 prAuthorId = prAuthor["accountId"]
+                 prAuthorEmail = prAuthor["emailAddress"]
+                 prAuthorEmail = sanitizeInput(prAuthorEmail, 'email')
+
+                 if assigneeId is None or assigneeId == '':
+                     jira.assign_issue(issueName, prAuthorId)
+                     result += 'Assigning user: ' + prAuthorEmail + '\n'
+                 elif assigneeId != prAuthorId:
+                     result += 'Changing assignee from: ' + assigneeEmail + ' to: ' + prAuthorEmail + '\n'
+                     jira.assign_issue(issueName, prAuthorId)

-             if transition == None:
-                 print('Error: Unable to find transition for status: ' + statusName)
-             elif transition != '':
+             transitionFlow = ['Merge Pending']
+             for desiredStatus in transitionFlow:
                  try:
-                     jira.issue_transition(issueName, transition)
-                     result += 'Workflow Transition: ' + transition + '\n'
+                     transitionId = jira.get_transition_id_to_status_name(issueName, desiredStatus)
+                     jira.set_issue_status_by_transition_id(issueName, transitionId)
+                     result += 'Workflow Transition To: ' + desiredStatus + '\n'
                  except Exception as error:
                      transitions = jira.get_issue_transitions(issueName)
-                     result += 'Error: Transition: "' + transition + '" failed with: "' + str(error) + '" Valid transitions=' + str(transitions) + '\n'
+                     result += 'Error: Transitioning to: "' + desiredStatus + '" failed with: "' + str(error) + '" Valid transitions=' + str(transitions) + '\n'

              prFieldName = propertyMap.get('pullRequestFieldName', 'customfield_10010')
@@ -80,24 +108,6 @@ jobs:
              elif currentPR is not None and currentPR != pull_url:
                  result += 'Additional PR: ' + pull_url + '\n'

-             if prAuthor:
-                 assignee = issueFields['assignee']
-                 if assignee is None:
-                     assigneeId = ''
-                     assigneeEmail = ''
-                 else:
-                     assigneeId = assignee['accountId']
-                     assigneeEmail = assignee["emailAddress"]
-
-                 prAuthorId = prAuthor["accountId"]
-                 prAuthorEmail = prAuthor["emailAddress"]
-                 if assigneeId is None or assigneeId == '':
-                     jira.assign_issue(issueName, prAuthorId)
-                     result += 'Assigning user: ' + prAuthorEmail + '\n'
-                 elif assigneeId != prAuthorId:
-                     result += 'Changing assignee from: ' + assigneeEmail + ' to: ' + prAuthorEmail + '\n'
-                     jira.assign_issue(issueName, prAuthorId)
-
              return result

          jirabot_user = os.environ['JIRABOT_USERNAME']
@@ -110,7 +120,6 @@ jobs:
          github_token = os.environ['GITHUB_TOKEN']
          comments_url = os.environ['COMMENTS_URL']

-         print("%s %s %s" % (title, prAuthor, comments_url))
          result = ''
          issuem = re.search("(HPCC|HH|IDE|EPE|ML|HPCC4J|JAPI)-[0-9]+", title)
          if issuem:
@@ -131,7 +140,7 @@ jobs:
              if userSearchResults and len(userSearchResults) > 0:
                  jiraUser = userSearchResults[0]
              else:
-                 print('Error: Unable to find Jira user: ' + prAuthor + ' continuing without assigning')
+                 print('Error: Unable to map GitHub user to Jira user, continuing without assigning')

              if not jira.issue_exists(issue_name):
                  sys.exit('Error: Unable to find Jira issue: ' + issue_name)
@@ -140,17 +149,12 @@ jobs:

              result = 'Jirabot Action Result:\n'

-             transitionMap = json.loads(os.environ['JIRA_ISSUE_TRANSITION_MAP'])
-             if not isinstance(transitionMap, dict):
-                 print('Error: JIRA_ISSUE_TRANSITION_MAP is not a valid JSON object, ignoring.')
-                 transitionMap = {}
-
              jiraIssuePropertyMap = json.loads(os.environ['JIRA_ISSUE_PROPERTY_MAP'])
              if not isinstance(jiraIssuePropertyMap, dict):
                  print('Error: JIRA_ISSUE_PROPERTY_MAP is not a valid JSON object, ignoring.')
                  jiraIssuePropertyMap = {}

-             result += updateIssue(jira, issue, jiraUser, transitionMap, jiraIssuePropertyMap, pull_url)
+             result += updateIssue(jira, issue, jiraUser, jiraIssuePropertyMap, pull_url)
              jira.issue_add_comment(issue_name, result)

              result = 'Jira Issue: ' + jira_url + '/browse/' + issue_name + '\n\n' + result
@@ -158,9 +162,7 @@ jobs:
              # Escape the result for JSON
              result = json.dumps(result)

-             curlCommand = 'curl -X POST %s -H "Content-Type: application/json" -H "Authorization: token %s" --data \'{ "body": %s }\'' % ( comments_url, github_token, result )
-             print(curlCommand)
-             os.system(curlCommand)
+             subprocess.run(['curl', '-X', 'POST', comments_url, '-H', 'Content-Type: application/json', '-H', f'Authorization: token {github_token}', '--data', f'{{ "body": {result} }}'], check=True)
          else:
              print('Unable to find Jira issue name in title')
diff --git a/common/remote/rmtssh.cpp b/common/remote/rmtssh.cpp
index 86682580c7e..fe08594b4ad 100644
--- a/common/remote/rmtssh.cpp
+++ b/common/remote/rmtssh.cpp
@@ -422,6 +422,8 @@ class CFRunSSH: public CInterface, implements IFRunSSH
             printf("%s\n",cmdline.str());
         else {
             Owned<IPipeProcess> pipe = createPipeProcess();
+            // reset LD_LIBRARY_PATH here so ssh cmd itself doesn't use HPCC libssl/crypto as they may be different
+            pipe->setenv("LD_LIBRARY_PATH", ":");
             if (pipe->run((verbose&&!usepssh)?"FRUNSSH":NULL,cmdline.str(),workdir,
                 useplink, // for some reason plink needs input handle
                 true,true)) {
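
The rmtssh change above scrubs LD_LIBRARY_PATH from the child environment so the spawned ssh binary resolves the system libssl/libcrypto rather than HPCC's bundled copies; together with the jirabot switch from os.system() to subprocess.run() with an argument list, both changes avoid handing attacker-influenced state (environment or shell-interpolated strings) to a child process. A minimal POSIX sketch of the same two ideas, with hypothetical names (not the PR's code):

    #include <stdlib.h>
    #include <unistd.h>
    #include <sys/wait.h>

    // Spawn a command with LD_LIBRARY_PATH neutralised, passing arguments as a
    // vector (no shell) so no quoting/injection issues arise.
    static int runWithCleanLibPath(const char *argv[])
    {
        pid_t pid = fork();
        if (pid == 0)
        {
            setenv("LD_LIBRARY_PATH", ":", 1);  // ":" = empty search path; unsetenv() also works
            execvp(argv[0], const_cast<char * const *>(argv));
            _exit(127);                         // exec failed
        }
        int status = 0;
        if (pid > 0)
            waitpid(pid, &status, 0);
        return status;
    }
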
diff --git a/common/workunit/workunit.cpp b/common/workunit/workunit.cpp
index ba694168327..04602deab91 100644
--- a/common/workunit/workunit.cpp
+++ b/common/workunit/workunit.cpp
@@ -4341,6 +4341,8 @@ class CLockedWorkUnit : implements ILocalWorkUnit, implements IExtendedWUInterfa
             { return c->getStateEx(str); }
     virtual __int64 getAgentSession() const
             { return c->getAgentSession(); }
+    virtual __int64 getEngineSession() const
+            { return c->getEngineSession(); }
     virtual unsigned getAgentPID() const
             { return c->getAgentPID(); }
     virtual const char *queryStateDesc() const
@@ -4499,6 +4501,8 @@ class CLockedWorkUnit : implements ILocalWorkUnit, implements IExtendedWUInterfa
             { c->setStateEx(text); }
     virtual void setAgentSession(__int64 sessionId)
             { c->setAgentSession(sessionId); }
+    virtual void setEngineSession(__int64 sessionId)
+            { c->setEngineSession(sessionId); }
     virtual void setStatistic(StatisticCreatorType creatorType, const char * creator, StatisticScopeType scopeType, const char * scope, StatisticKind kind, const char * optDescription, unsigned __int64 value, unsigned __int64 count, unsigned __int64 maxValue, StatsMergeAction mergeAction)
             { c->setStatistic(creatorType, creator, scopeType, scope, kind, optDescription, value, count, maxValue, mergeAction); }
     virtual void setTracingValue(const char * propname, const char * value)
@@ -5922,9 +5926,11 @@ void CWorkUnitFactory::clearAborting(const char *wuid)
     }
 }

-void CWorkUnitFactory::reportAbnormalTermination(const char *wuid, WUState &state, SessionId agent)
+void CWorkUnitFactory::reportAbnormalTermination(const char *wuid, WUState &state, SessionId sessionId, const char *sessionText)
 {
-    WARNLOG("reportAbnormalTermination: session stopped unexpectedly: %" I64F "d state: %d", (__int64) agent, (int) state);
+    StringBuffer sessionMessage(sessionText);
+    sessionMessage.appendf(" session stopped unexpectedly [sessionId=%" I64F "d]", (__int64) sessionId);
+    WARNLOG("reportAbnormalTermination: %s - state: %d", sessionText, (int) state);
     bool isEcl = false;
     switch (state)
     {
@@ -5941,7 +5947,10 @@ void CWorkUnitFactory::reportAbnormalTermination(const char *wuid, WUState &stat
             wu->setState(state);
             Owned<IWUException> e = wu->createException();
             e->setExceptionCode(isEcl ? 1001 : 1000);
-            e->setExceptionMessage(isEcl ? "EclCC terminated unexpectedly" : "Workunit terminated unexpectedly");
+            StringBuffer exceptionText;
+            exceptionText.append(isEcl ? "EclCC terminated unexpectedly" : "Workunit terminated unexpectedly");
+            exceptionText.append(" (").append(sessionMessage).append(")");
+            e->setExceptionMessage(exceptionText);

 static CriticalSection deleteDllLock;
@@ -6426,8 +6435,11 @@ class CDaliWorkUnitFactory : public CWorkUnitFactory, implements IDaliClientShut
         LocalIAbortHandler abortHandler(*waiter);
         if (conn)
         {
-            SessionId agent = -1;
+            SessionId agentSessionID = -1;
+            SessionId engineSessionID = -1;
             bool agentSessionStopped = false;
+            bool engineSessionStopped = false;
+            bool queryRuntimeSessionStopped = false;
             unsigned start = msTick();
             for (;;)
             {
@@ -6456,22 +6468,37 @@ class CDaliWorkUnitFactory : public CWorkUnitFactory, implements IDaliClientShut
                     case WUStateAborting:
                         if (agentSessionStopped)
                         {
-                            reportAbnormalTermination(wuid, ret, agent);
+                            reportAbnormalTermination(wuid, ret, agentSessionID, "Agent");
+                            return ret;
+                        }
+                        if (engineSessionStopped)
+                        {
+                            reportAbnormalTermination(wuid, ret, engineSessionID, "Engine");
                             return ret;
                         }
                         if (queryDaliServerVersion().compare("2.1")>=0)
                         {
-                            agent = conn->queryRoot()->getPropInt64("@agentSession", -1);
-                            if((agent>0) && querySessionManager().sessionStopped(agent, 0))
+                            agentSessionID = conn->queryRoot()->getPropInt64("@agentSession", -1);
+                            if((agentSessionID>0) && querySessionManager().sessionStopped(agentSessionID, 0))
                             {
                                 agentSessionStopped = true;
                                 conn->reload();
                                 continue;
                             }
+                            engineSessionID = conn->queryRoot()->getPropInt64("@engineSession", -1);
+                            if((engineSessionID>0) && querySessionManager().sessionStopped(engineSessionID, 0))
+                            {
+                                engineSessionStopped = true;
+                                conn->reload();
+                                continue;
+                            }
                         }
                         break;
                 }
-                agentSessionStopped = false; // reset for state changes such as WUStateWait then WUStateRunning again
+                // reset for state changes such as WUStateWait then WUStateRunning again
+                agentSessionStopped = false;
+                engineSessionStopped = false;
+
                 unsigned waited = msTick() - start;
                 if (timeout==-1 || waited + 20000 < timeout)
                 {
@@ -7698,6 +7725,12 @@ void CLocalWorkUnit::setAgentSession(__int64 sessionId)
     p->setPropInt64("@agentSession", sessionId);
 }

+void CLocalWorkUnit::setEngineSession(__int64 sessionId)
+{
+    CriticalBlock block(crit);
+    p->setPropInt64("@engineSession", sessionId);
+}
+
 bool CLocalWorkUnit::getIsQueryService() const
 {
     CriticalBlock block(crit);
@@ -7799,6 +7832,12 @@ __int64 CLocalWorkUnit::getAgentSession() const
     return p->getPropInt64("@agentSession", -1);
 }

+__int64 CLocalWorkUnit::getEngineSession() const
+{
+    CriticalBlock block(crit);
+    return p->getPropInt64("@engineSession", -1);
+}
+
 unsigned CLocalWorkUnit::getAgentPID() const
 {
     CriticalBlock block(crit);
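
The waiter above now tracks two Dali sessions per workunit, the agent's and the engine's, and clears both "stopped" flags whenever the state changes so a workunit that legitimately cycles through WUStateWait is not misreported. A simplified sketch of that polling logic, with a generic sessionStopped predicate standing in for Dali's session manager (hypothetical helper, not the PR's code):

    #include <cstdint>
    #include <functional>

    using SessionId = int64_t;

    struct SessionProbe
    {
        SessionId id = -1;      // -1 => not yet published on the workunit
        bool stopped = false;
    };

    // One pass of the wait loop: true if either owning session has died while
    // the workunit still claims to be running/aborting.
    static bool checkOwningSessions(SessionProbe &agent, SessionProbe &engine,
                                    const std::function<bool(SessionId)> &sessionStopped)
    {
        if (agent.id > 0 && sessionStopped(agent.id))
            agent.stopped = true;
        else if (engine.id > 0 && sessionStopped(engine.id))
            engine.stopped = true;
        return agent.stopped || engine.stopped;
    }

On any state transition (e.g. WUStateWait back to WUStateRunning) the caller resets both probes, mirroring the flag reset in the loop above.
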
diff --git a/common/workunit/workunit.hpp b/common/workunit/workunit.hpp
index 4cd299961e6..ae94e98d447 100644
--- a/common/workunit/workunit.hpp
+++ b/common/workunit/workunit.hpp
@@ -1286,6 +1286,7 @@ interface IConstWorkUnit : extends IConstWorkUnitInfo
     virtual IStringVal & getWorkunitDistributedAccessToken(IStringVal & datoken) const = 0;
     virtual IStringVal & getStateEx(IStringVal & str) const = 0;
     virtual __int64 getAgentSession() const = 0;
+    virtual __int64 getEngineSession() const = 0;
     virtual unsigned getAgentPID() const = 0;
     virtual IConstWUResult * getTemporaryByName(const char * name) const = 0;
     virtual IConstWUResultIterator & getTemporaries() const = 0;
@@ -1376,6 +1377,7 @@ interface IWorkUnit : extends IConstWorkUnit
     virtual void setState(WUState state) = 0;
     virtual void setStateEx(const char * text) = 0;  // Indicates why blocked
     virtual void setAgentSession(__int64 sessionId) = 0;
+    virtual void setEngineSession(__int64 sessionId) = 0;
     virtual void setStatistic(StatisticCreatorType creatorType, const char * creator, StatisticScopeType scopeType, const char * scope, StatisticKind kind, const char * optDescription, unsigned __int64 value, unsigned __int64 count, unsigned __int64 maxValue, StatsMergeAction mergeAction) = 0;
     virtual void setTracingValue(const char * propname, const char * value) = 0;
     virtual void setTracingValueInt(const char * propname, int value) = 0;
diff --git a/common/workunit/workunit.ipp b/common/workunit/workunit.ipp
index 8ac4acbc0aa..e5f3c2edd0d 100644
--- a/common/workunit/workunit.ipp
+++ b/common/workunit/workunit.ipp
@@ -257,6 +257,7 @@ public:
     virtual WUState getState() const;
     virtual IStringVal & getStateEx(IStringVal & str) const;
     virtual __int64 getAgentSession() const;
+    virtual __int64 getEngineSession() const;
     virtual unsigned getAgentPID() const;
     virtual const char *queryStateDesc() const;
     virtual IConstWUResult * getTemporaryByName(const char * name) const;
@@ -336,6 +337,7 @@ public:
     void setState(WUState state);
     void setStateEx(const char * text);
     void setAgentSession(__int64 sessionId);
+    void setEngineSession(__int64 sessionId);
     bool setDistributedAccessToken(const char * user);
     void setStatistic(StatisticCreatorType creatorType, const char * creator, StatisticScopeType scopeType, const char * scope, StatisticKind kind, const char * optDescription, unsigned __int64 value, unsigned __int64 count, unsigned __int64 maxValue, StatsMergeAction mergeAction);
     void setTracingValue(const char * propname, const char * value);
@@ -624,7 +626,7 @@ public:
     }

 protected:
-    void reportAbnormalTermination(const char *wuid, WUState &state, SessionId agent);
+    void reportAbnormalTermination(const char *wuid, WUState &state, SessionId agent, const char *sessionText);

     // These need to be implemented by the derived classes
     virtual CLocalWorkUnit* _createWorkUnit(const char *wuid, ISecManager *secmgr, ISecUser *secuser) = 0;
diff --git a/dali/base/daclient.cpp b/dali/base/daclient.cpp
index f8adc17e90a..7a55e473dc1 100644
--- a/dali/base/daclient.cpp
+++ b/dali/base/daclient.cpp
@@ -270,6 +270,8 @@ void connectLogMsgManagerToDali()

 void disconnectLogMsgManagerFromDali()
 {
+    if (isContainerized())
+        return; // we do not redirect logging between components in containerized environments (this is used for audit->dali in BM)
     disconnectLogMsgManagerFromParentOwn(daliClientLoggingParent);
     daliClientLoggingParent = 0;
 }
diff --git a/dali/base/dacsds.cpp b/dali/base/dacsds.cpp
index 3a23fdf5e4e..8577ad9ac6f 100644
--- a/dali/base/dacsds.cpp
+++ b/dali/base/dacsds.cpp
@@ -38,8 +38,6 @@ static unsigned clientThrottleLimit;
 static unsigned clientThrottleDelay;

-static ISDSManager *SDSManager=NULL;
-
 static CriticalSection SDScrit;

 #define CHECK_CONNECTED(XSTR) \
@@ -1290,6 +1288,12 @@ CClientSDSManager::CClientSDSManager()
 }

 CClientSDSManager::~CClientSDSManager()
+{
+    closedown();
+    ::Release(properties);
+}
+
+void CClientSDSManager::closedown()
 {
     CriticalBlock block(connections.crit);
     SuperHashIteratorOf<CConnectionBase> iter(connections.queryBaseTable());
@@ -1298,7 +1302,6 @@ CClientSDSManager::~CClientSDSManager()
         CRemoteConnection &conn = (CRemoteConnection &) iter.query();
         conn.setConnected(false);
     }
-    ::Release(properties);
 }

 bool CClientSDSManager::sendRequest(CMessageBuffer &mb, bool throttle)
@@ -2236,32 +2239,59 @@ bool CClientSDSManager::updateEnvironment(IPropertyTree *newEnv, bool forceGroup

 //////////////

+static ISDSManager * activeSDSManager=NULL;
+static ISDSManager * savedSDSManager=NULL;
+
+MODULE_INIT(INIT_PRIORITY_STANDARD)
+{
+    return true;
+}
+MODULE_EXIT()
+{
+    delete activeSDSManager;
+    activeSDSManager = nullptr;
+    delete savedSDSManager;
+    savedSDSManager = nullptr;
+}
+
 ISDSManager &querySDS()
 {
     CriticalBlock block(SDScrit);
-    if (SDSManager)
-        return *SDSManager;
+    if (activeSDSManager)
+        return *activeSDSManager;
     else if (!queryCoven().inCoven())
     {
-        if (!SDSManager)
-            SDSManager = new CClientSDSManager();
+        if (!activeSDSManager)
+            activeSDSManager = new CClientSDSManager();

-        return *SDSManager;
+        return *activeSDSManager;
     }
     else
     {
-        SDSManager = &querySDSServer();
-        return *SDSManager;
+        activeSDSManager = &querySDSServer();
+        return *activeSDSManager;
     }
 }

 void closeSDS()
 {
     CriticalBlock block(SDScrit);
-    if (SDSManager) {
+
+    //In roxie this is called when connection to dali is lost, but other threads can still be processing
+    //CRemoteConnections (see HPCC-32410), which uses an ISDSManager member - accessing a stale manager.
+    //There can be similar issues at closedown if threads have not been cleaned up properly.
+    //Do not delete the active SDS manager immediately - save it so that it is deleted on the next call/closedown.
+    ISDSManager * toDelete = savedSDSManager;
+    savedSDSManager = activeSDSManager;
+    activeSDSManager = nullptr;
+    if (savedSDSManager || toDelete)
+    {
         assertex(!queryCoven().inCoven()); // only called by client
-        try {
-            delete SDSManager;
+        try
+        {
+            if (savedSDSManager)
+                savedSDSManager->closedown();
+            delete toDelete;
         }
         catch (IMP_Exception *e)
         {
@@ -2270,11 +2300,11 @@ void closeSDS()
             EXCLOG(e, "closeSDS");
             e->Release();
         }
-        catch (IDaliClient_Exception *e) {
+        catch (IDaliClient_Exception *e)
+        {
             if (e->errorCode()!=DCERR_server_closed)
                 throw;
             e->Release();
         }
-        SDSManager = NULL;
     }
 }
diff --git a/dali/base/dacsds.ipp b/dali/base/dacsds.ipp
index e4b9590b955..3bda3dc649d 100644
--- a/dali/base/dacsds.ipp
+++ b/dali/base/dacsds.ipp
@@ -418,6 +418,7 @@ public:
     virtual void setConfigOpt(const char *opt, const char *value);
     virtual unsigned queryCount(const char *xpath);
     virtual bool updateEnvironment(IPropertyTree *newEnv, bool forceGroupUpdate, StringBuffer &response);
+    virtual void closedown() override;

 private:
     void noteDisconnected(CRemoteConnection &connection);
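
The closeSDS() rework above deliberately does not free the manager it is closing: the instance is parked in savedSDSManager and only deleted on the next close (or at MODULE_EXIT), so threads that still hold a stale pointer (the HPCC-32410 race) dereference a closed-but-live object instead of freed memory. A stripped-down sketch of the pattern with generic names:

    #include <mutex>

    struct Manager { void closedown() {} };

    static std::mutex gate;
    static Manager *active = nullptr;
    static Manager *saved = nullptr;    // closed, but kept alive one extra cycle

    void closeManager()
    {
        std::lock_guard<std::mutex> block(gate);
        Manager *toDelete = saved;      // instance parked by the *previous* close
        saved = active;
        active = nullptr;
        if (saved)
            saved->closedown();         // disconnect, but keep the object alive
        delete toDelete;                // nothing has pointed at it for a full cycle
    }

This trades a bounded leak (at most one instance) for protection against use-after-free by racing readers; the module-exit hook still frees both slots at unload.
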
diff --git a/dali/base/dadfs.cpp b/dali/base/dadfs.cpp
index 48ec2a1bb63..faa42e68a9e 100644
--- a/dali/base/dadfs.cpp
+++ b/dali/base/dadfs.cpp
@@ -3839,6 +3839,9 @@ protected: friend class CDistributedFilePart;
         unsigned nc = fdesc->numClusters();
         if (nc) {
+            unsigned flags = 0;
+            if (FileDescriptorFlags::none != (FileDescriptorFlags::foreign & fdesc->getFlags()))
+                flags = IFDSF_FOREIGN_GROUP;
             for (unsigned i=0;i<nc;i++) {
                 StringBuffer cname;
                 IClusterInfo *cluster = createClusterInfo(
                                     fdesc->getClusterGroupName(i,cname,NULL).str(),
                                     fdesc->queryClusterGroup(i),
                                     fdesc->queryPartDiskMapping(i),
-                                    &queryNamedGroupStore()
+                                    &queryNamedGroupStore(),
+                                    flags
                                     );
                 if (!cluster->queryGroup(&queryNamedGroupStore()))
@@ -7466,8 +7470,20 @@ class CNamedGroupIterator: implements INamedGroupIterator, public CInterface
 public:
     IMPLEMENT_IINTERFACE;

     CNamedGroupIterator(IRemoteConnection *_conn,IGroup *_matchgroup=NULL,bool _exactmatch=false)
-        : conn(_conn), matchgroup(_matchgroup)
+        : conn(_conn)
     {
+        if (_matchgroup)
+        {
+            // the matchgroup may contain ports, but they are never part of published groups and are not to be used for matching
+            SocketEndpointArray epa;
+            for (unsigned i=0; i<_matchgroup->ordinality(); i++)
+            {
+                SocketEndpoint ep = _matchgroup->queryNode(i).endpoint();
+                ep.port = 0;
+                epa.append(ep);
+            }
+            matchgroup.setown(createIGroup(epa));
+        }
         exactmatch = _exactmatch;
         pe.setown(conn->queryRoot()->getElements("Group"));
     }
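
CNamedGroupIterator above canonicalises the match group by zeroing every port before comparison, since published groups never carry ports. The same normalisation over a plain endpoint list, as a self-contained sketch (simplified types, not Dali's SocketEndpoint):

    #include <cstdint>
    #include <vector>

    struct Endpoint
    {
        uint32_t ip = 0;
        uint16_t port = 0;
        bool operator==(const Endpoint &other) const { return ip == other.ip && port == other.port; }
    };

    // Copy the candidate endpoints with ports cleared so that "10.0.0.1:7100"
    // and "10.0.0.1" compare equal when matched against a published group.
    static std::vector<Endpoint> stripPorts(const std::vector<Endpoint> &in)
    {
        std::vector<Endpoint> out;
        out.reserve(in.size());
        for (Endpoint ep : in)
        {
            ep.port = 0;
            out.push_back(ep);
        }
        return out;
    }
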
diff --git a/dali/base/dafdesc.cpp b/dali/base/dafdesc.cpp
index 825833e086a..b03967a19c2 100644
--- a/dali/base/dafdesc.cpp
+++ b/dali/base/dafdesc.cpp
@@ -417,7 +417,7 @@ struct CClusterInfo: implements IClusterInfo, public CInterface
                 name.clear();
         }
         StringBuffer gname;
-        if (resolver->find(group,gname,true)||(group->ordinality()>1))
+        if (resolver->find(group,gname,!foreignGroup)||(group->ordinality()>1))
             name.set(gname);
     }

@@ -451,11 +451,13 @@ struct CClusterInfo: implements IClusterInfo, public CInterface
         checkClusterName(resolver);
     }

-    CClusterInfo(const char *_name,IGroup *_group,const ClusterPartDiskMapSpec &_mspec,INamedGroupStore *resolver)
+    CClusterInfo(const char *_name,IGroup *_group,const ClusterPartDiskMapSpec &_mspec,INamedGroupStore *resolver,unsigned flags)
         : name(_name),group(_group)
     {
         name.toLowerCase();
         mspec =_mspec;
+        if (flags & IFDSF_FOREIGN_GROUP)
+            foreignGroup = true;
         checkClusterName(resolver);
         checkStriped();
     }
@@ -617,9 +619,10 @@ struct CClusterInfo: implements IClusterInfo, public CInterface
 IClusterInfo *createClusterInfo(const char *name,
                                 IGroup *grp,
                                 const ClusterPartDiskMapSpec &mspec,
-                                INamedGroupStore *resolver)
+                                INamedGroupStore *resolver,
+                                unsigned flags)
 {
-    return new CClusterInfo(name,grp,mspec,resolver);
+    return new CClusterInfo(name,grp,mspec,resolver,flags);
 }

 IClusterInfo *deserializeClusterInfo(MemoryBuffer &mb, INamedGroupStore *resolver)
@@ -1281,33 +1284,39 @@ class CFileDescriptor: public CFileDescriptorBase, implements ISuperFileDescrip
             cluster->getReplicateDir(repDir, os);
             setReplicateFilename(fullpath,queryDrive(idx,copy),baseDir.str(),repDir.str());

-        const char *planeName = cluster->queryGroupName();
-        if (!isEmptyString(planeName))
+        // The following code manipulates the directory for striping and aliasing if necessary.
+        // To do so, it needs the plane details.
+        // Normally, the plane name is obtained from IClusterInfo, however, if this file is foreign,
+        // then the IClusterInfo's will have no resolved names (aka groups) because the remote groups
+        // don't exist in the client environment. Instead, if the foreign file came from k8s, it will
+        // have remoteStoragePlane serialized/set.
+        Owned<IStoragePlane> plane;
+        if (remoteStoragePlane)
+            plane.set(remoteStoragePlane);
+        else
         {
-#ifdef _CONTAINERIZED
-            Owned<IStoragePlane> plane = getDataStoragePlane(planeName, false);
-#else
-            Owned<IStoragePlane> plane = remoteStoragePlane.getLink();
-#endif
-            if (plane)
+            const char *planeName = cluster->queryGroupName();
+            if (!isEmptyString(planeName))
+                plane.setown(getDataStoragePlane(planeName, false));
+        }
+        if (plane)
+        {
+            StringBuffer planePrefix(plane->queryPrefix());
+            Owned<IStoragePlaneAlias> alias = plane->getAliasMatch(accessMode);
+            if (alias)
             {
-                StringBuffer planePrefix(plane->queryPrefix());
-                Owned<IStoragePlaneAlias> alias = plane->getAliasMatch(accessMode);
-                if (alias)
+                StringBuffer tmp;
+                StringBuffer newPlanePrefix(alias->queryPrefix());
+                if (setReplicateDir(fullpath, tmp, false, planePrefix, newPlanePrefix))
                 {
-                    StringBuffer tmp;
-                    StringBuffer newPlanePrefix(alias->queryPrefix());
-                    if (setReplicateDir(fullpath, tmp, false, planePrefix, newPlanePrefix))
-                    {
-                        planePrefix.swapWith(newPlanePrefix);
-                        fullpath.swapWith(tmp);
-                    }
+                    planePrefix.swapWith(newPlanePrefix);
+                    fullpath.swapWith(tmp);
                 }
-                StringBuffer stripeDir;
-                addStripeDirectory(stripeDir, fullpath, planePrefix, idx, lfnHash, cluster->queryPartDiskMapping().numStripedDevices);
-                if (!stripeDir.isEmpty())
-                    fullpath.swapWith(stripeDir);
             }
+            StringBuffer stripeDir;
+            addStripeDirectory(stripeDir, fullpath, planePrefix, idx, lfnHash, cluster->queryPartDiskMapping().numStripedDevices);
+            if (!stripeDir.isEmpty())
+                fullpath.swapWith(stripeDir);
         }
     }

@@ -1633,6 +1642,8 @@ class CFileDescriptor: public CFileDescriptorBase, implements ISuperFileDescrip
             attr.setown(createPTreeFromIPT(at));
         else
             attr.setown(createPTree("Attr"));
+        if (flags & IFDSF_FOREIGN_GROUP)
+            setFlags(static_cast<FileDescriptorFlags>(fileFlags | FileDescriptorFlags::foreign));
         if (attr->hasProp("@lfnHash")) // potentially missing for meta coming from a legacy Dali
             lfnHash = attr->getPropInt("@lfnHash");
         else if (tracename.length())
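
The foreign marker above rides on FileDescriptorFlags, an enum class made maskable by jlib's BITMASK_ENUM macro. A self-contained illustration of how such operators behave (the expansion below is a sketch, not the macro's literal definition):

    #include <cstdint>

    enum class FileFlags : uint32_t
    {
        none       = 0x00,
        dirperpart = 0x01,
        foreign    = 0x02
    };

    // Roughly what a BITMASK_ENUM-style macro provides:
    constexpr FileFlags operator|(FileFlags a, FileFlags b)
    { return FileFlags(uint32_t(a) | uint32_t(b)); }
    constexpr FileFlags operator&(FileFlags a, FileFlags b)
    { return FileFlags(uint32_t(a) & uint32_t(b)); }

    constexpr bool hasFlag(FileFlags value, FileFlags flag)
    { return (value & flag) != FileFlags::none; }

    static_assert(hasFlag(FileFlags::dirperpart | FileFlags::foreign, FileFlags::foreign));
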
diff --git a/dali/base/dafdesc.hpp b/dali/base/dafdesc.hpp
index c88a6adb34e..535178828e2 100644
--- a/dali/base/dafdesc.hpp
+++ b/dali/base/dafdesc.hpp
@@ -190,7 +190,8 @@ typedef IIteratorOf<IPartDescriptor> IPartDescriptorIterator;
 enum class FileDescriptorFlags
 {
     none = 0x00,
-    dirperpart = 0x01
+    dirperpart = 0x01,
+    foreign = 0x02
 };
 BITMASK_ENUM(FileDescriptorFlags);

@@ -351,7 +352,8 @@ interface IStoragePlane: extends IInterface
 IClusterInfo *createClusterInfo(const char *grpname, // NULL if roxie label set
                                 IGroup *grp,
                                 const ClusterPartDiskMapSpec &mspec,
-                                INamedGroupStore *resolver=NULL
+                                INamedGroupStore *resolver=NULL,
+                                unsigned flags=0
                                 );
 IClusterInfo *createRoxieClusterInfo(const char *label,
                                 const ClusterPartDiskMapSpec &mspec
diff --git a/dali/base/dasds.cpp b/dali/base/dasds.cpp
index e5daa59a8a3..89caaa9efc5 100644
--- a/dali/base/dasds.cpp
+++ b/dali/base/dasds.cpp
@@ -2031,6 +2031,7 @@ class CCovenSDSManager : public CSDSManagerBase, implements ISDSManagerServer, i
     virtual void setConfigOpt(const char *opt, const char *value);
     virtual unsigned queryCount(const char *xpath);
     virtual bool updateEnvironment(IPropertyTree *newEnv, bool forceGroupUpdate, StringBuffer &response);
+    virtual void closedown() override;

     // ISubscriptionManager impl.
     virtual void add(ISubscription *subs,SubscriptionId id);
@@ -6062,6 +6063,11 @@ CCovenSDSManager::~CCovenSDSManager()
     config.Release();
 }

+void CCovenSDSManager::closedown()
+{
+    //Should never be called - but do not assert since it is harmless and it is better not to report
+}
+
 void CCovenSDSManager::validateDeltaBackup()
 {
     // check consistency of delta
diff --git a/dali/base/dasds.hpp b/dali/base/dasds.hpp
index 91a9e6b5048..1a114271bcf 100644
--- a/dali/base/dasds.hpp
+++ b/dali/base/dasds.hpp
@@ -118,6 +118,7 @@ interface ISDSManager
     virtual void setConfigOpt(const char *opt, const char *value) = 0;
     virtual unsigned queryCount(const char *xpath) = 0;
     virtual bool updateEnvironment(IPropertyTree *newEnv, bool forceGroupUpdate, StringBuffer &response) = 0;
+    virtual void closedown() = 0;
 };

 extern da_decl const char *queryNotifyHandlerName(IPropertyTree *tree);
diff --git a/ecl/agentexec/agentexec.cpp b/ecl/agentexec/agentexec.cpp
index 5582af12177..e0935dba13b 100644
--- a/ecl/agentexec/agentexec.cpp
+++ b/ecl/agentexec/agentexec.cpp
@@ -342,13 +342,8 @@ class WaitThread : public CInterfaceOf<IPooledThread>
                 Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid);
                 if (cw)
                 {
-                    // if either a) NOT a thoragent with useChildProcesses=false (default in k8s config) or b) is still in an executing state
-                    if (!sharedK8sJob || (cw->getState() == WUStateRunning) || (cw->getState() == WUStateBlocked) || (cw->getState() == WUStateWait))
+                    if (!sharedK8sJob && ((cw->getState() == WUStateRunning) || (cw->getState() == WUStateBlocked) || (cw->getState() == WUStateWait)))
                     {
-                        // For a shared k8s job, i.e. where this agent is thoragent launching shared (multiJobLinger) k8s jobs
-                        // the job agent should handle the job state.
-                        // In that scenario, this is a fallback that should only come into effect if the job workflow instance has failed to handle the exception
-                        // e.g. because it abruptly disappeared.
                         Owned<IWorkUnit> workunit = &cw->lock();
                         // recheck now locked
                         if ((workunit->getState() == WUStateRunning) || (workunit->getState() == WUStateBlocked) || (workunit->getState() == WUStateWait))
diff --git a/ecl/hthor/hthor.cpp b/ecl/hthor/hthor.cpp
index ee9dc1bc5ab..166cea2c0a7 100644
--- a/ecl/hthor/hthor.cpp
+++ b/ecl/hthor/hthor.cpp
@@ -8668,7 +8668,8 @@ bool CHThorDiskReadBaseActivity::openNext()
                     StringBuffer tmp;
                     remoteFileIO->addVirtualFieldMapping("logicalFilename", logicalFileName.str());
                     remoteFileIO->addVirtualFieldMapping("baseFpos", tmp.clear().append(offsetOfPart).str());
-                    remoteFileIO->addVirtualFieldMapping("partNum", tmp.clear().append(curPart->getPartIndex()).str());
+                    if (curPart)
+                        remoteFileIO->addVirtualFieldMapping("partNum", tmp.clear().append(curPart->getPartIndex()).str());

                     try
                     {
diff --git a/esp/services/ws_workunits/ws_workunitsHelpers.hpp b/esp/services/ws_workunits/ws_workunitsHelpers.hpp
index ae040b8aae8..d8b68ce724b 100644
--- a/esp/services/ws_workunits/ws_workunitsHelpers.hpp
+++ b/esp/services/ws_workunits/ws_workunitsHelpers.hpp
@@ -219,7 +219,7 @@ struct WUComponentLogOptions
         populateTimeRange(start, end, bufferSecs);

         //int 0 ==MIN, 1==DEFAULT, 2==ALL, 3==CUSTOM
-        int colMode = zapHttpRequest->getParameterInt("LogFilter_ColumnMode", -1);
+        int colMode = zapHttpRequest->getParameterInt("LogFilter_SelectColumnMode", -1);
         if (colMode != -1)
         {
             StringArray customFields; //comma delimited list of available columns, only if ColumnMode==3
diff --git a/esp/src/eslint/index.js b/esp/src/eslint/index.js
index 3a433ad8a60..c3bdb1cd045 100644
--- a/esp/src/eslint/index.js
+++ b/esp/src/eslint/index.js
@@ -2,6 +2,9 @@ module.exports = {

     rules: {
         "no-src-react": {
+            meta: {
+                fixable: "code"
+            },
             create: function (context) {
                 return {
                     ImportDeclaration(node) {
"sha512-mKnC+QJ9pWVzv+C4/U3rRsHapFfHvQFoFB92e52xeyGMcX6/OlIl78je1u8vePzYZSkkogMPJ2yjxxsb89cxyw==" }, - "node_modules/lodash.assignwith": { - "version": "4.2.0", - "resolved": "https://registry.npmjs.org/lodash.assignwith/-/lodash.assignwith-4.2.0.tgz", - "integrity": "sha512-ZznplvbvtjK2gMvnQ1BR/zqPFZmS6jbK4p+6Up4xcRYA7yMIwxHCfbTcrYxXKzzqLsQ05eJPVznEW3tuwV7k1g==", - "dev": true - }, "node_modules/lodash.camelcase": { "version": "4.3.0", "resolved": "https://registry.npmjs.org/lodash.camelcase/-/lodash.camelcase-4.3.0.tgz", @@ -12322,4 +12317,4 @@ } } } -} \ No newline at end of file +} diff --git a/esp/src/package.json b/esp/src/package.json index c1ec1651488..e9d7224d6f4 100644 --- a/esp/src/package.json +++ b/esp/src/package.json @@ -57,7 +57,7 @@ "@hpcc-js/timeline": "2.53.0", "@hpcc-js/tree": "2.41.0", "@hpcc-js/util": "2.52.0", - "@hpcc-js/wasm": "2.18.1", + "@hpcc-js/wasm": "2.18.2", "@kubernetes/client-node": "0.20.0", "clipboard": "2.0.11", "d3-dsv": "3.0.1", diff --git a/esp/src/src-react/components/ECLArchive.tsx b/esp/src/src-react/components/ECLArchive.tsx index 8d63d3ae8f5..a84ceaae3ac 100644 --- a/esp/src/src-react/components/ECLArchive.tsx +++ b/esp/src/src-react/components/ECLArchive.tsx @@ -111,7 +111,7 @@ export const ECLArchive: React.FunctionComponent = ({ return } main={ - + { // Only render after archive is loaded (to ensure it "defaults to open") --- archive?.modAttrs.length && diff --git a/esp/src/src-react/components/Files.tsx b/esp/src/src-react/components/Files.tsx index 08f5f953c34..6cc33214cbb 100644 --- a/esp/src/src-react/components/Files.tsx +++ b/esp/src/src-react/components/Files.tsx @@ -156,6 +156,7 @@ export const Files: React.FunctionComponent = ({ } return ""; }, + field: nlsHPCC.Protected, }, IsCompressed: { headerIcon: "ZipFolder", @@ -168,6 +169,7 @@ export const Files: React.FunctionComponent = ({ } return ""; }, + field: nlsHPCC.Compressed, }, Name: { label: nlsHPCC.LogicalName, @@ -194,18 +196,21 @@ export const Files: React.FunctionComponent = ({ formatter: (value, row) => { return Utility.formatNum(row.IntRecordCount); }, + csvFormatter: (value, row) => row.IntRecordCount, }, FileSize: { label: nlsHPCC.Size, formatter: (value, row) => { return Utility.convertedSize(row.IntSize); }, + csvFormatter: (value, row) => row.IntSize, }, CompressedFileSizeString: { label: nlsHPCC.CompressedSize, formatter: (value, row) => { return Utility.convertedSize(row.CompressedFileSize); - } + }, + csvFormatter: (value, row) => row.CompressedFileSize, }, Parts: { label: nlsHPCC.Parts, width: 40, diff --git a/esp/src/src-react/components/Frame.tsx b/esp/src/src-react/components/Frame.tsx index 9821ffdfd43..76b39bd3a5d 100644 --- a/esp/src/src-react/components/Frame.tsx +++ b/esp/src/src-react/components/Frame.tsx @@ -88,7 +88,7 @@ export const Frame: React.FunctionComponent = () => { router.resolve(hashHistory.location).then(setBody); - userKeyValStore().get("user_cookie_consent") + userKeyValStore().get(USER_COOKIE_CONSENT) .then((resp) => { setShowCookieConsent(resp === "1"); }) diff --git a/esp/src/src-react/components/Metrics.tsx b/esp/src/src-react/components/Metrics.tsx index f0df3a94f6a..40413d012c7 100644 --- a/esp/src/src-react/components/Metrics.tsx +++ b/esp/src/src-react/components/Metrics.tsx @@ -634,7 +634,7 @@ export const Metrics: React.FunctionComponent = ({ } main={ - + diff --git a/esp/src/src-react/components/Queries.tsx b/esp/src/src-react/components/Queries.tsx index e11938a5734..ff45b2a7f6d 100644 --- 
diff --git a/esp/src/src-react/components/ECLArchive.tsx b/esp/src/src-react/components/ECLArchive.tsx
index 8d63d3ae8f5..a84ceaae3ac 100644
--- a/esp/src/src-react/components/ECLArchive.tsx
+++ b/esp/src/src-react/components/ECLArchive.tsx
@@ -111,7 +111,7 @@ export const ECLArchive: React.FunctionComponent<ECLArchiveProps> = ({
     return <HolyGrail
         header={<CommandBar items={buttons} farItems={rightButtons} />}
         main={
-            <DockPanel hideSingleTabs onDockPanelCreate={setDockpanel}>
+            <DockPanel hideSingleTabs onCreate={setDockpanel}>
                 {   //  Only render after archive is loaded (to ensure it "defaults to open") ---
                     archive?.modAttrs.length &&
diff --git a/esp/src/src-react/components/Files.tsx b/esp/src/src-react/components/Files.tsx
index 08f5f953c34..6cc33214cbb 100644
--- a/esp/src/src-react/components/Files.tsx
+++ b/esp/src/src-react/components/Files.tsx
@@ -156,6 +156,7 @@ export const Files: React.FunctionComponent<FilesProps> = ({
                 }
                 return "";
             },
+            field: nlsHPCC.Protected,
         },
         IsCompressed: {
             headerIcon: "ZipFolder",
@@ -168,6 +169,7 @@ export const Files: React.FunctionComponent<FilesProps> = ({
                 }
                 return "";
             },
+            field: nlsHPCC.Compressed,
         },
         Name: {
             label: nlsHPCC.LogicalName,
@@ -194,18 +196,21 @@ export const Files: React.FunctionComponent<FilesProps> = ({
             formatter: (value, row) => {
                 return Utility.formatNum(row.IntRecordCount);
             },
+            csvFormatter: (value, row) => row.IntRecordCount,
         },
         FileSize: {
             label: nlsHPCC.Size,
             formatter: (value, row) => {
                 return Utility.convertedSize(row.IntSize);
             },
+            csvFormatter: (value, row) => row.IntSize,
         },
         CompressedFileSizeString: {
             label: nlsHPCC.CompressedSize,
             formatter: (value, row) => {
                 return Utility.convertedSize(row.CompressedFileSize);
-            }
+            },
+            csvFormatter: (value, row) => row.CompressedFileSize,
         },
         Parts: {
             label: nlsHPCC.Parts,
             width: 40,
diff --git a/esp/src/src-react/components/Frame.tsx b/esp/src/src-react/components/Frame.tsx
index 9821ffdfd43..76b39bd3a5d 100644
--- a/esp/src/src-react/components/Frame.tsx
+++ b/esp/src/src-react/components/Frame.tsx
@@ -88,7 +88,7 @@ export const Frame: React.FunctionComponent = () => {

         router.resolve(hashHistory.location).then(setBody);

-        userKeyValStore().get("user_cookie_consent")
+        userKeyValStore().get(USER_COOKIE_CONSENT)
             .then((resp) => {
                 setShowCookieConsent(resp === "1");
             })
diff --git a/esp/src/src-react/components/Metrics.tsx b/esp/src/src-react/components/Metrics.tsx
index f0df3a94f6a..40413d012c7 100644
--- a/esp/src/src-react/components/Metrics.tsx
+++ b/esp/src/src-react/components/Metrics.tsx
@@ -634,7 +634,7 @@ export const Metrics: React.FunctionComponent<MetricsProps> = ({
         }
         main={
-            <DockPanel hideSingleTabs onDockPanelCreate={setDockpanel}>
+            <DockPanel hideSingleTabs onCreate={setDockpanel}>
diff --git a/esp/src/src-react/components/Queries.tsx b/esp/src/src-react/components/Queries.tsx
index e11938a5734..ff45b2a7f6d 100644
--- a/esp/src/src-react/components/Queries.tsx
+++ b/esp/src/src-react/components/Queries.tsx
@@ -112,7 +112,8 @@ export const Queries: React.FunctionComponent<QueriesProps> = ({
                     return <Icon iconName="Pause" />;
                 }
                 return "";
-            }
+            },
+            field: nlsHPCC.Suspended,
         },
         ErrorCount: {
             headerIcon: "Warning",
@@ -124,7 +125,8 @@ export const Queries: React.FunctionComponent<QueriesProps> = ({
                     return <Icon iconName="Warning" />;
                 }
                 return "";
-            }
+            },
+            field: nlsHPCC.ErrorWarnings,
         },
         MixedNodeStates: {
             headerIcon: "Error",
@@ -136,7 +138,7 @@ export const Queries: React.FunctionComponent<QueriesProps> = ({
                     return <Icon iconName="Error" />;
                 }
                 return "";
-            }
+            },
         },
         Activated: {
             headerIcon: "SkypeCircleCheck",
@@ -147,7 +149,8 @@ export const Queries: React.FunctionComponent<QueriesProps> = ({
                     return <Icon iconName="SkypeCircleCheck" />;
                 }
                 return "";
-            }
+            },
+            field: nlsHPCC.Active,
         },
         Id: {
             label: nlsHPCC.ID,
diff --git a/esp/src/src-react/components/SourceEditor.tsx b/esp/src/src-react/components/SourceEditor.tsx
index 5e070c6f419..5164848df25 100644
--- a/esp/src/src-react/components/SourceEditor.tsx
+++ b/esp/src/src-react/components/SourceEditor.tsx
@@ -147,38 +147,44 @@ export const SourceEditor: React.FunctionComponent<SourceEditorProps> = ({
 interface TextSourceEditorProps {
     text: string;
     readonly?: boolean;
+    toolbar?: boolean;
 }

 export const TextSourceEditor: React.FunctionComponent<TextSourceEditorProps> = ({
     text = "",
-    readonly = false
+    readonly,
+    toolbar
 }) => {

-    return <SourceEditor text={text} readonly={readonly} mode="text"></SourceEditor>;
+    return <SourceEditor text={text} readonly={readonly} toolbar={toolbar} mode="text"></SourceEditor>;
 };

 interface XMLSourceEditorProps {
     text: string;
     readonly?: boolean;
+    toolbar?: boolean;
 }

 export const XMLSourceEditor: React.FunctionComponent<XMLSourceEditorProps> = ({
     text = "",
-    readonly = false
+    readonly,
+    toolbar
 }) => {

-    return <SourceEditor text={text} readonly={readonly} mode="xml"></SourceEditor>;
+    return <SourceEditor text={text} readonly={readonly} toolbar={toolbar} mode="xml"></SourceEditor>;
 };

 interface JSONSourceEditorProps {
     json?: object;
     readonly?: boolean;
+    toolbar?: boolean;
     onChange?: (obj: object) => void;
 }

 export const JSONSourceEditor: React.FunctionComponent<JSONSourceEditorProps> = ({
     json,
-    readonly = false,
+    readonly,
+    toolbar,
     onChange = (obj: object) => { }
 }) => {

@@ -197,7 +203,7 @@ export const JSONSourceEditor: React.FunctionComponent<JSONSourceEditorProps> =
         }
     }, [onChange]);

-    return <SourceEditor text={text} readonly={readonly} mode="json" onChange={changed}></SourceEditor>;
+    return <SourceEditor text={text} readonly={readonly} toolbar={toolbar} mode="json" onChange={changed}></SourceEditor>;
 };

 export interface WUXMLSourceEditorProps {
@@ -215,10 +221,12 @@ export const WUXMLSourceEditor: React.FunctionComponent<WUXMLSourceEditorProps>

 export interface WUResourceEditorProps {
     src: string;
+    toolbar?: boolean;
 }

 export const WUResourceEditor: React.FunctionComponent<WUResourceEditorProps> = ({
-    src
+    src,
+    toolbar
 }) => {

     const [text, setText] = React.useState("");
@@ -231,7 +239,7 @@ export const WUResourceEditor: React.FunctionComponent<WUResourceEditorProps> =
         });
     }, [src]);

-    return <SourceEditor text={text} readonly={true} mode="text"></SourceEditor>;
+    return <SourceEditor text={text} readonly={true} toolbar={toolbar} mode="text"></SourceEditor>;
 };

 interface ECLSourceEditorProps {
@@ -266,6 +274,7 @@ interface FetchEditor {
     url: string;
     wuid?: string;
     readonly?: boolean;
+    toolbar?: boolean;
     mode?: "ecl" | "xml" | "text";
 }

@@ -273,6 +282,7 @@ export const FetchEditor: React.FunctionComponent<FetchEditor> = ({
     url,
     wuid,
     readonly = true,
+    toolbar,
     mode = "text"
 }) => {

@@ -293,11 +303,12 @@ export const FetchEditor: React.FunctionComponent<FetchEditor> = ({
         }
     }, [url, wuid]);

-    return <SourceEditor text={text} readonly={readonly} mode={mode}></SourceEditor>;
+    return <SourceEditor text={text} readonly={readonly} toolbar={toolbar} mode={mode}></SourceEditor>;
 };

 interface SQLSourceEditorProps {
     sql: string;
+    readonly?: boolean;
     toolbar?: boolean;
     onSqlChange?: (sql: string) => void;
     onFetchHints?: (cm: any, option: any) => Promise<any>;
@@ -306,11 +317,12 @@ interface SQLSourceEditorProps {

 export const SQLSourceEditor: React.FunctionComponent<SQLSourceEditorProps> = ({
     sql,
+    readonly,
     toolbar,
     onSqlChange,
     onFetchHints,
     onSubmit
 }) => {

-    return <SourceEditor text={sql} mode={"sql"} toolbar={toolbar} onChange={onSqlChange} onFetchHints={onFetchHints} onSubmit={onSubmit}></SourceEditor>;
+    return <SourceEditor text={sql} readonly={readonly} mode={"sql"} toolbar={toolbar} onChange={onSqlChange} onFetchHints={onFetchHints} onSubmit={onSubmit}></SourceEditor>;
 };
diff --git a/esp/src/src-react/components/Workunits.tsx b/esp/src/src-react/components/Workunits.tsx
index 0fce3d69f16..ab47cf34673 100644
--- a/esp/src/src-react/components/Workunits.tsx
+++ b/esp/src/src-react/components/Workunits.tsx
@@ -116,7 +116,8 @@ export const Workunits: React.FunctionComponent<WorkunitsProps> = ({
                     return <Icon iconName="LockSolid" />;
                 }
                 return "";
-            }
+            },
+            field: nlsHPCC.Protected,
         },
         Wuid: {
             label: nlsHPCC.WUID, width: 120,
@@ -143,23 +144,20 @@ export const Workunits: React.FunctionComponent<WorkunitsProps> = ({
         "Compile Cost": {
             label: nlsHPCC.CompileCost, width: 100, justify: "right",
-            formatter: (cost, row) => {
-                return `${formatCost(row.CompileCost)}`;
-            }
+            formatter: (cost, row) => `${formatCost(row.CompileCost)}`,
+            csvFormatter: (cost, row) => row.CompileCost,
         },
         "Execution Cost": {
             label: nlsHPCC.ExecuteCost, width: 100, justify: "right",
-            formatter: (cost, row) => {
-                return `${formatCost(row.ExecuteCost)}`;
-            }
+            formatter: (cost, row) => `${formatCost(row.ExecuteCost)}`,
+            csvFormatter: (cost, row) => row.ExecuteCost,
         },
         "File Access Cost": {
             label: nlsHPCC.FileAccessCost, width: 100, justify: "right",
-            formatter: (cost, row) => {
-                return `${formatCost(row.FileAccessCost)}`;
-            }
+            formatter: (cost, row) => `${formatCost(row.FileAccessCost)}`,
+            csvFormatter: (cost, row) => row.FileAccessCost,
         }
     };
 }, [filter]);
diff --git a/esp/src/src-react/components/controls/Grid.tsx b/esp/src/src-react/components/controls/Grid.tsx
index 7a445acac93..c781322a0f2 100644
--- a/esp/src/src-react/components/controls/Grid.tsx
+++ b/esp/src/src-react/components/controls/Grid.tsx
@@ -122,6 +122,7 @@ const gridStyles = (height: string): Partial<IDetailsListStyles> => {
         maxHeight: height,
         selectors: {
             ".ms-DetailsHeader-cellName": { fontSize: "13.5px" },
+            ".ms-DetailsRow": { userSelect: "text" },
             ".ms-DetailsRow-cell:has(.bgFilled)": { color: "white", boxShadow: "inset 1px 0 var(--colorNeutralBackground1), inset -1px 1px var(--colorNeutralBackground1)" },
             ".ms-DetailsRow-cell:has(.bgGreen)": { background: "green" },
             ".ms-DetailsRow-cell:has(.bgOrange)": { background: "orange" },
diff --git a/esp/src/src-react/layouts/DockPanel.tsx b/esp/src/src-react/layouts/DockPanel.tsx
index 8ebea0e1c74..d359e0397b9 100644
--- a/esp/src/src-react/layouts/DockPanel.tsx
+++ b/esp/src/src-react/layouts/DockPanel.tsx
@@ -3,8 +3,8 @@ import * as ReactDOM from "react-dom";
 import { Theme, ThemeProvider } from "@fluentui/react";
 import { useConst } from "@fluentui/react-hooks";
 import { FluentProvider, Theme as ThemeV9 } from "@fluentui/react-components";
-import { HTMLWidget, Widget } from "@hpcc-js/common";
-import { DockPanel as HPCCDockPanel, IClosable } from "@hpcc-js/phosphor";
+import { HTMLWidget, Widget, Utility } from "@hpcc-js/common";
+import { DockPanel as HPCCDockPanel, IClosable, WidgetAdapter } from "@hpcc-js/phosphor";
 import { compare2 } from "@hpcc-js/util";
 import { lightTheme, lightThemeV9 } from "../themes";
 import { useUserTheme } from "../hooks/theme";
@@ -96,6 +96,7 @@ export class ResetableDockPanel extends HPCCDockPanel {

     protected _origLayout: DockPanelLayout | undefined;
     protected _lastLayout: DockPanelLayout | undefined;
+    protected _visibility: { [id: string]: boolean };

     resetLayout() {
         if (this._origLayout) {
@@ -118,8 +119,19 @@ export class ResetableDockPanel extends HPCCDockPanel {
         return formatLayout(this.layout()) ?? this._lastLayout ?? this._origLayout;
     }

+    getVisibility() {
+        return this._visibility;
+    }
+
     render(callback?: (w: Widget) => void) {
-        const retVal = super.render();
+        const retVal = this._visibility !== undefined ? super.render() : super.render(() => {
+            if (this._visibility === undefined) {
+                this._visibility = {};
+                this.widgetAdapters().forEach(wa => {
+                    this._visibility[wa.widget.id()] = wa.isVisible;
+                });
+            }
+        });
         if (this._origLayout === undefined) {
             this._origLayout = formatLayout(this.layout());
         }
@@ -130,9 +142,27 @@ export class ResetableDockPanel extends HPCCDockPanel {
     }

     //  Events ---
+    childActivation(w: Widget, wa: WidgetAdapter) {
+    }
+
+    childVisibility(w: Widget, visible: boolean, wa: WidgetAdapter) {
+        if (this._visibility && this._visibility[w.id()] !== visible) {
+            this._visibility[w.id()] = visible;
+            this._lazyVisibilityChanged();
+        }
+    }
+
     layoutChanged() {
         this._lastLayout = this.getLayout();
     }
+
+    //  Exposed Events ---
+    private _lazyVisibilityChanged = Utility.debounce(async () => {
+        this.visibilityChanged(this._visibility);
+    }, 60);
+
+    visibilityChanged(visibility: { [id: string]: boolean }) {
+    }
 }

 interface DockPanelItemProps {
@@ -154,14 +184,16 @@ export const DockPanelItem: React.FunctionComponent<DockPanelItemProps> = ({
 interface DockPanelProps {
     layout?: object;
     hideSingleTabs?: boolean;
-    onDockPanelCreate?: (dockpanel: ResetableDockPanel) => void;
+    onCreate?: (dockpanel: ResetableDockPanel) => void;
+    onVisibilityChanged?: (visibility: { [id: string]: boolean }) => void;
     children?: React.ReactElement | React.ReactElement[];
 }

 export const DockPanel: React.FunctionComponent<DockPanelProps> = ({
     layout,
     hideSingleTabs,
-    onDockPanelCreate,
+    onCreate: onDockPanelCreate,
+    onVisibilityChanged: onDockPanelVisibilityChanged,
     children
 }) => {
     const items = React.useMemo(() => {
@@ -179,6 +211,9 @@ export const DockPanel: React.FunctionComponent<DockPanelProps> = ({
                 onDockPanelCreate(retVal);
             }, 0);
         }
+        if (onDockPanelVisibilityChanged) {
+            retVal.on("visibilityChanged", visibility => onDockPanelVisibilityChanged(visibility), true);
+        }
         return retVal;
     });
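
ResetableDockPanel above coalesces rapid childVisibility callbacks through a 60ms debounce (@hpcc-js's Utility.debounce) before surfacing a single visibilityChanged event. The same coalescing pattern expressed in C++, the language used elsewhere in this PR; a sketch only, the class and its behaviour under teardown are simplified:

    #include <chrono>
    #include <functional>
    #include <mutex>
    #include <thread>

    // Invoke `fn` once, `delay` after the most recent trigger() call.
    // The Debouncer must outlive any detached worker it starts.
    class Debouncer
    {
    public:
        Debouncer(std::function<void()> fn, std::chrono::milliseconds delay)
            : callback(std::move(fn)), wait(delay) {}

        void trigger()
        {
            std::lock_guard<std::mutex> g(m);
            deadline = std::chrono::steady_clock::now() + wait;
            if (!running)
            {
                running = true;
                std::thread([this]{ run(); }).detach();
            }
        }

    private:
        void run()
        {
            std::unique_lock<std::mutex> g(m);
            while (std::chrono::steady_clock::now() < deadline)
            {
                auto d = deadline;      // deadline may move while we sleep
                g.unlock();
                std::this_thread::sleep_until(d);
                g.lock();
            }
            running = false;
            g.unlock();
            callback();                 // fires once per burst of triggers
        }

        std::function<void()> callback;
        std::chrono::milliseconds wait;
        std::chrono::steady_clock::time_point deadline;
        std::mutex m;
        bool running = false;
    };
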
diff --git a/helm/hpcc/templates/issuers.yaml b/helm/hpcc/templates/issuers.yaml
index 9b52fb85800..77dc5e2b3bf 100644
--- a/helm/hpcc/templates/issuers.yaml
+++ b/helm/hpcc/templates/issuers.yaml
@@ -119,8 +119,8 @@ spec:
 ---
 {{- end }}
 {{- end }}
-{{ include "hpcc.addWildIssuerCertificate" (dict "root" .root "issuerKeyName" .issuerKeyName "me" .me ) }}
 {{- end }}
+{{ include "hpcc.addWildIssuerCertificate" (dict "root" .root "issuerKeyName" .issuerKeyName "me" .me ) }}
 {{- end }}
 {{- template "hpcc.ensureNoResourceValidationFlag" ( dict "root" $ ) }}
diff --git a/initfiles/bin/init_thorslave.in b/initfiles/bin/init_thorslave.in
index 1048ce6f9ac..ad845f0a74f 100755
--- a/initfiles/bin/init_thorslave.in
+++ b/initfiles/bin/init_thorslave.in
@@ -102,7 +102,8 @@ start_slaves()
     rsync_att=3
     rsync_stat=1
     while [[ $rsync_stat -ne 0 && $rsync_att -gt 0 ]] ; do
-        rsync -e "ssh -o LogLevel=QUIET -o StrictHostKeyChecking=no" --timeout=60 $master:$instancedir/slaves $slavesfname
+        # reset LD_LIBRARY_PATH here so ssh cmd doesn't use HPCC libssl/crypto as they may be different
+        LD_LIBRARY_PATH=: rsync -e "ssh -o LogLevel=QUIET -o StrictHostKeyChecking=no" --timeout=60 $master:$instancedir/slaves $slavesfname
         rsync_stat=$?
         ((rsync_att--))
         log "rsync returns ${rsync_stat}"
diff --git a/plugins/cassandra/cassandrawu.cpp b/plugins/cassandra/cassandrawu.cpp
index 293b0c21ea1..10cb6239430 100644
--- a/plugins/cassandra/cassandrawu.cpp
+++ b/plugins/cassandra/cassandrawu.cpp
@@ -3873,7 +3873,7 @@ class CCasssandraWorkUnitFactory : public CWorkUnitFactory, implements ICassandr
                     case WUStateAborting:
                         if (agentSessionStopped)
                         {
-                            reportAbnormalTermination(wuid, state, agent);
+                            reportAbnormalTermination(wuid, state, agent, "Agent");
                             return state;
                         }
                         if (queryDaliServerVersion().compare("2.1")>=0)
diff --git a/plugins/parquet/parquet.ecllib b/plugins/parquet/parquet.ecllib
index 66529ac50dc..48f26678058 100644
--- a/plugins/parquet/parquet.ecllib
+++ b/plugins/parquet/parquet.ecllib
@@ -43,7 +43,7 @@ EXPORT ParquetIO := MODULE
     ENDMACRO;

     EXPORT Read(resultLayout, basePath, partitionFieldList) := FUNCTIONMACRO
-        LOCAL STREAMED DATASET(resultLayout) _DoParquetReadPartition() := EMBED(parquet: activity, option('readdirectorypartition'), location(basePath)), partitionFields(partitionFieldList)
+        LOCAL STREAMED DATASET(resultLayout) _DoParquetReadPartition() := EMBED(parquet: activity, option('readdirectorypartition'), location(basePath), partitionFields(partitionFieldList))
         ENDEMBED;
         RETURN _DoParquetReadPartition();
     ENDMACRO;
diff --git a/plugins/parquet/parquetembed.cpp b/plugins/parquet/parquetembed.cpp
index 2808e699dc8..4af183de56e 100644
--- a/plugins/parquet/parquetembed.cpp
+++ b/plugins/parquet/parquetembed.cpp
@@ -463,6 +463,8 @@ __int64 ParquetReader::next(TableColumns *&nextTable)
         if (endsWithIgnoreCase(partOption.c_str(), "partition"))
         {
             PARQUET_ASSIGN_OR_THROW(table, queryRows()); // Sets rowsProcessed to current row in table corresponding to startRow
+            rowsCount = table->num_rows();
+            splitTable(table);
         }
         else
         {
diff --git a/system/jlib/jbuff.cpp b/system/jlib/jbuff.cpp
index e1e535a087c..dc95a5129a4 100644
--- a/system/jlib/jbuff.cpp
+++ b/system/jlib/jbuff.cpp
@@ -62,12 +62,9 @@ constexpr unsigned BUFF_DOUBLE_LIMIT=4096;
 constexpr unsigned BUFF_FIRST_CHUNK_SIZE=8;
 constexpr unsigned BUFF_DETACH_GRANULARITY=16;

-
-#ifdef _DEBUG
+//Always check the length before reading - so ensure that serialization problems are caught
+//The overhead is trivial....
 #define CHECKREADPOS(len) assertex(readPos+(len)<=length())
-#else
-#define CHECKREADPOS(len)
-#endif

 //-----------------------------------------------------------------------
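
The jbuff change above promotes CHECKREADPOS from a debug-only assert to an unconditional check, so a corrupt serialized stream fails fast in release builds too. A tiny sketch of the same guard on a generic read cursor (illustrative type, not jlib's MemoryBuffer):

    #include <cstring>
    #include <stdexcept>

    struct BufferReader
    {
        const char *data;
        size_t length;
        size_t readPos = 0;

        // The length is validated on every read, not just in debug builds,
        // because the comparison is trivial next to the memcpy.
        void read(size_t len, void *dst)
        {
            if (readPos + len > length)                       // stand-in for jlib's assertex
                throw std::out_of_range("read past end of buffer");
            memcpy(dst, data + readPos, len);
            readPos += len;
        }
    };
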
diff --git a/system/jlib/jdebug.cpp b/system/jlib/jdebug.cpp
index d939b8a417d..7a72987329e 100644
--- a/system/jlib/jdebug.cpp
+++ b/system/jlib/jdebug.cpp
@@ -1473,10 +1473,40 @@ static void getMemUsage(unsigned &inuse,unsigned &active,unsigned &total,unsigne
     }
     inuse = total-free-cached;

-    // not sure if a bug in kernel or container or ...
-    // but sometimes we see swapfree > 0 when swaptotal == 0
-    if ((swapfree + swapcached) >= swaptotal)
-        swapinuse = 0;
+    // swapinuse = swaptotal-swapfree-swapcached;
+
+    // sometimes in containers [under mem pressure ?] we see from /proc/meminfo -
+
+    // SwapCached:            0 kB
+    // SwapTotal:             0 kB
+    // SwapFree:  18446744073709551496 kB
+
+    // or -
+
+    // SwapCached:            0 kB
+    // SwapTotal:             0 kB
+    // SwapFree:            120 kB
+
+    // and from free cmd -
+
+    // free -m
+    //                total        used        free      shared  buff/cache   available
+    // Mem:           43008       17616        5375           0       20015       25391
+    // Swap:              0 18014398509481984           0
+
+    // if swapfree > 0 when swaptotal == 0 -
+    // *might* indicate kernel is pushing exe/mmapped pages out of memory to make room
+    // for other things and this can affect performance
+
+    // not sure why SwapFree value is not always valid/accurate
+    // vmstat shows more reasonable swpd value, but walks all /proc/<pid>/stat files
+
+    // SwapCached: Memory that is present within main memory, but also in the swapfile
+
+    if ((swapfree + swapcached) > swaptotal)
+    {
+        swapinuse = swapfree + swapcached;
+        if (swapinuse > total)
+            swapinuse = active; // something more reasonable ...
+    }
     else
         swapinuse = swaptotal-swapfree-swapcached;
 #endif
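
The new logic above distrusts SwapFree when it is inconsistent with SwapTotal (containers sometimes report garbage such as 18446744073709551496 kB) and substitutes a bounded estimate rather than letting the subtraction underflow. The defensive computation in isolation, mirroring the diff's branches:

    #include <cstdint>

    // All figures in kB, as parsed from /proc/meminfo.
    static uint64_t calcSwapInUse(uint64_t swapTotal, uint64_t swapFree, uint64_t swapCached,
                                  uint64_t memTotal, uint64_t memActive)
    {
        if (swapFree + swapCached > swapTotal)
        {
            // Inconsistent readings (common in containers): fall back to
            // something bounded rather than underflowing the subtraction below.
            uint64_t guess = swapFree + swapCached;
            return (guess > memTotal) ? memActive : guess;
        }
        return swapTotal - swapFree - swapCached;
    }
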
diff --git a/system/jlib/jptree.cpp b/system/jlib/jptree.cpp
index 4aed0538292..d7ba2871c78 100644
--- a/system/jlib/jptree.cpp
+++ b/system/jlib/jptree.cpp
@@ -8782,14 +8782,22 @@ class CConfigUpdater : public CInterface
                 absoluteConfigFilename.set(std::get<0>(result).c_str());
             }
         };
-        fileWatcher.setown(createFileEventWatcher(updateFunc));
-
-        // watch the path, not the filename, because the filename might not be seen if directories are moved, softlinks are changed..
-        StringBuffer path, filename;
-        splitFilename(absoluteConfigFilename, nullptr, &path, &filename, &filename);
-        configFilename.set(filename);
-        fileWatcher->add(path, FileWatchEvents::anyChange);
-        fileWatcher->start();
+        try
+        {
+            fileWatcher.setown(createFileEventWatcher(updateFunc));
+
+            // watch the path, not the filename, because the filename might not be seen if directories are moved, softlinks are changed..
+            StringBuffer path, filename;
+            splitFilename(absoluteConfigFilename, nullptr, &path, &filename, &filename);
+            configFilename.set(filename);
+            fileWatcher->add(path, FileWatchEvents::anyChange);
+            fileWatcher->start();
+        }
+        catch (IException * e)
+        {
+            OERRLOG(e, "Failed to start file watcher");
+            e->Release();
+        }
         return true;
     }
     void executeCallbacks(IPropertyTree *oldComponentConfiguration, IPropertyTree *oldGlobalConfiguration)
diff --git a/system/jlib/jstream.cpp b/system/jlib/jstream.cpp
index 8f31718a8e1..c0ea60f0618 100644
--- a/system/jlib/jstream.cpp
+++ b/system/jlib/jstream.cpp
@@ -896,8 +896,15 @@ class CBlockedSerialOutputStream final : public CInterfaceOf<IBufferedSerialOutp
+            //Round the new length up to a multiple of 1/4 of the current buffer size (minimum 32 bytes)
+            //so that repeatedly appending small amounts does not keep copying the buffer, e.g. when growing > 1M
+            size32_t alignment = buffer.length() / 4;
+            if (alignment < 32)
+                alignment = 32;
+            newLength += (alignment - 1);
+            newLength -= newLength % alignment;
             MemoryAttr expandedBuffer(newLength);
             memcpy(expandedBuffer.mem(), data(0), bufferOffset);
diff --git a/system/mp/mplog.cpp b/system/mp/mplog.cpp
index 86b338e85c1..2483ad015b1 100644
--- a/system/mp/mplog.cpp
+++ b/system/mp/mplog.cpp
@@ -532,6 +532,8 @@ aindex_t LogMsgParentReceiverThread::findParent(const INode * node) const

 bool connectLogMsgManagerToParent(INode * parentNode)
 {
+    if (isContainerized())
+        return false;
     assertex(parentReceiver);
     MPLogId pid = parentReceiver->getNextId();
     return parentReceiver->addParentToManager(0, pid, parentNode, false);
@@ -539,6 +541,8 @@ bool connectLogMsgManagerToParent(INode * parentNode)

 bool connectLogMsgManagerToParentOwn(INode * parentNode)
 {
+    if (isContainerized())
+        return false;
     bool ret = connectLogMsgManagerToParent(parentNode);
     parentNode->Release();
     return ret;
@@ -546,11 +550,15 @@ bool connectLogMsgManagerToParentOwn(INode * parentNode)

 bool disconnectLogMsgManagerFromParent(INode * parentNode)
 {
+    if (isContainerized())
+        return false;
     return parentReceiver->removeParentFromManager(parentNode, false);
 }

 bool disconnectLogMsgManagerFromParentOwn(INode * parentNode)
 {
+    if (isContainerized())
+        return false;
     bool ret = disconnectLogMsgManagerFromParent(parentNode);
     parentNode->Release();
     return ret;
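
The jstream.cpp change above rounds each requested buffer growth up to a multiple of a quarter of the current size (minimum 32 bytes), so repeated small appends reallocate in steps that scale with the buffer rather than per append. The rounding in isolation:

    #include <cstddef>

    // Round newLength up so successive expansions grow the buffer by at least ~25%.
    static size_t roundedGrowth(size_t currentLength, size_t newLength)
    {
        size_t alignment = currentLength / 4;
        if (alignment < 32)
            alignment = 32;
        newLength += alignment - 1;
        newLength -= newLength % alignment;
        return newLength;
    }

Because the step scales with the buffer, total bytes copied across all expansions stay proportional to the final size instead of growing quadratically.
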
diff --git a/testing/unittests/jstreamtests.cpp b/testing/unittests/jstreamtests.cpp
index dda4523835d..23ee3dccd8b 100644
--- a/testing/unittests/jstreamtests.cpp
+++ b/testing/unittests/jstreamtests.cpp
@@ -282,6 +282,87 @@ class VariableDataProvider : public CDataProvider
 };

+//A very large row (because of a large embedded dataset)
+//100 bytes of data then 500K rows of 100 bytes
+class LargeRowDataProvider : public CDataProvider
+{
+public:
+    LargeRowDataProvider(bool _useCount, unsigned _numChildren) : useCount(_useCount), numChildren(_numChildren)
+    {
+        name.append("Large_").append(useCount ? 'C' : 'S').append(numChildren);
+    }
+
+    virtual size32_t create(IBufferedSerialOutputStream * target, unsigned row)
+    {
+        //Output (row, (string)row, (row % 7)items of (row, row*2, row*3))
+        byte mainRow[100];
+        unsigned childRow[25];
+
+        for (size32_t i=0; i < sizeof(mainRow); i++)
+            mainRow[i] = (byte)(i * row);
+        target->put(sizeof(mainRow), mainRow);
+
+        size32_t childCount = numChildren + row;
+        size32_t childSize = sizeof(childRow) * childCount;
+        if (useCount)
+            target->put(4, &childCount);
+        else
+            target->suspend(sizeof(size32_t));
+
+        unsigned next = 1234 + row * 31419264U;
+        for (unsigned i=0; i < childCount; i++)
+        {
+            for (size32_t i=0; i < sizeof(mainRow)/sizeof(next); i++)
+            {
+                childRow[i] = next;
+                next *= 0x13894225;
+                next += row;
+            }
+            target->put(sizeof(childRow), &childRow);
+        }
+        if (!useCount)
+            target->resume(sizeof(childSize), &childSize);
+
+        return sizeof(mainRow) + 4 + childSize;
+    }
+
+    virtual size32_t check(IBufferedSerialInputStream * source, unsigned row)
+    {
+        byte mainRow[100];
+        unsigned childRow[25];
+
+        source->read(sizeof(mainRow), &mainRow);
+        for (size32_t i=0; i < sizeof(mainRow); i++)
+            assertex(mainRow[i] == (byte)(i * row));
+
+        size32_t childCount = numChildren + row;
+        size32_t childSize = sizeof(childRow) * childCount;
+        size32_t size;
+        source->read(sizeof(size), &size);
+        if (useCount)
+            assertex(size == childCount);
+        else
+            assertex(size == childSize);
+
+        unsigned next = 1234 + row * 31419264U;
+        for (unsigned i=0; i < childCount; i++)
+        {
+            source->read(sizeof(childRow), &childRow);
+            for (size32_t i=0; i < sizeof(mainRow)/sizeof(next); i++)
+            {
+                assertex(childRow[i] == next);
+                next *= 0x13894225;
+                next += row;
+            }
+        }
+        return sizeof(mainRow) + 4 + childSize;
+    }
+
+protected:
+    bool useCount;
+    unsigned numChildren = 10'000'000;
+};
+
 class NullOuputStream : public CInterfaceOf<IBufferedSerialOutputStream>
 {
     virtual size32_t write(size32_t len, const void * ptr) { return len; }
@@ -305,6 +386,7 @@ class JlibStreamStressTest : public CppUnit::TestFixture
     CPPUNIT_TEST(testEvenSequentialStream);  // write a file and read results after each flush
     CPPUNIT_TEST(testParallelStream);        // write a file and read in parallel from a separate thread
     CPPUNIT_TEST(testThreadedWriteStream);   // write a file using a threaded writer
+    CPPUNIT_TEST(testPathologicalRows);      // 1M child rows, total row size 100MB
     //MORE:
     //Threaded writer
     //Threaded reader
@@ -735,6 +817,20 @@ class JlibStreamStressTest : public CppUnit::TestFixture
         }
     }

+    void testPathologicalRows()
+    {
+        LargeRowDataProvider largeCount50K(true, 50'000);
+        LargeRowDataProvider largeCount10M(true, 10'000'000);
+        LargeRowDataProvider largeSize50K(false, 50'000);
+        LargeRowDataProvider largeSize10M(false, 10'000'000);
+
+        ICompressHandler * lz4 = queryCompressHandler(COMPRESS_METHOD_LZ4);
+
+        runSimpleStream(nullptr, largeCount50K, 0x100000, 0x100000, 2000);
+        runSimpleStream(nullptr, largeCount10M, 0x100000, 0x100000, 10);
+        runSimpleStream(nullptr, largeSize50K, 0x100000, 0x100000, 2000);
+        runSimpleStream(nullptr, largeSize10M, 0x100000, 0x100000, 10);
+    }

     void testIncSequentialStream()
     {
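
LargeRowDataProvider above exercises two framings for an embedded child dataset: a count written up front, or a byte size back-patched afterwards via the stream's suspend()/resume(). A minimal sketch of the back-patching idea over a plain byte vector (the real API is jlib's IBufferedSerialOutputStream; this is only an analogy):

    #include <cstdint>
    #include <cstring>
    #include <vector>

    // Reserve room for a length prefix, write the payload, then patch the prefix -
    // useful when the serialized size is only known after writing.
    static void writeSizePrefixed(std::vector<uint8_t> &out, const std::vector<uint8_t> &payload)
    {
        size_t patchPos = out.size();
        out.resize(out.size() + sizeof(uint32_t));          // like suspend(sizeof(size32_t))
        out.insert(out.end(), payload.begin(), payload.end());
        uint32_t size = (uint32_t)payload.size();
        memcpy(out.data() + patchPos, &size, sizeof(size)); // like resume(sizeof(size), &size)
    }
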
diff --git a/thorlcr/master/thgraphmanager.cpp b/thorlcr/master/thgraphmanager.cpp
index 142724fd868..ea4c932654c 100644
--- a/thorlcr/master/thgraphmanager.cpp
+++ b/thorlcr/master/thgraphmanager.cpp
@@ -542,7 +542,7 @@ bool CJobManager::execute(IConstWorkUnit *workunit, const char *wuid, const char
         if (workunit->getCodeVersion() == 0)
             throw makeStringException(0, "Attempting to execute a workunit that hasn't been compiled");
         if ((workunit->getCodeVersion() > ACTIVITY_INTERFACE_VERSION) || (workunit->getCodeVersion() < MIN_ACTIVITY_INTERFACE_VERSION))
-            throw MakeStringException(0, "Workunit was compiled for eclagent interface version %d, this thor requires version %d..%d", workunit->getCodeVersion(), MIN_ACTIVITY_INTERFACE_VERSION, ACTIVITY_INTERFACE_VERSION);
+            throw MakeStringException(0, "Workunit was compiled for eclagent interface version %d, this thor (%s) requires version %d..%d", workunit->getCodeVersion(), globals->queryProp("@name"), MIN_ACTIVITY_INTERFACE_VERSION, ACTIVITY_INTERFACE_VERSION);
         if (workunit->getCodeVersion() == 652)
         {
             // Any workunit compiled using eclcc 7.12.0-7.12.18 is not compatible
@@ -554,7 +554,7 @@ bool CJobManager::execute(IConstWorkUnit *workunit, const char *wuid, const char
                 const char *point = version + strlen("7.12.");
                 unsigned pointVer = atoi(point);
                 if (pointVer <= 18)
-                    throw MakeStringException(0, "Workunit was compiled by eclcc version %s which is not compatible with this runtime", buildVersion.str());
+                    throw MakeStringException(0, "Workunit was compiled by eclcc version %s which is not compatible with this thor (%s)", buildVersion.str(), globals->queryProp("@name"));
             }
         }

@@ -1114,6 +1114,7 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName,
         wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, graphScope, StWhenStarted, NULL, startTs, 1, 0, StatsMergeAppend);
         //Could use addTimeStamp(wu, SSTgraph, graphName, StWhenStarted, wfid) if start time could be this point
         wu->setState(WUStateRunning);
+        wu->setEngineSession(myProcessSession());
         VStringBuffer version("%d.%d", THOR_VERSION_MAJOR, THOR_VERSION_MINOR);
         wu->setDebugValue("ThorVersion", version.str(), true);

@@ -1140,6 +1141,8 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName,
             wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, graphScope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);
         if (globals->getPropBool("@watchdogProgressEnabled"))
             queryDeMonServer()->updateAggregates(wu);
+        // clear engine session, otherwise agent may consider a failure beyond this point for an unrelated job caused by this instance
+        wu->setEngineSession(-1);

         removeJob(*job);
     }
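
Thor above publishes its Dali session on the workunit for the duration of a graph and clears it (-1) afterwards, so a later crash of this Thor instance is not blamed on an unrelated job. The PR pairs the calls explicitly; the same pairing can be phrased as an RAII guard, shown here as a hedged alternative with a generic interface (not the PR's code):

    #include <cstdint>

    struct IWorkUnitSession
    {
        virtual void setEngineSession(int64_t sessionId) = 0;
        virtual ~IWorkUnitSession() = default;
    };

    // Publish the engine session for the lifetime of a graph execution;
    // the destructor clears it even if the graph throws.
    class EngineSessionScope
    {
    public:
        EngineSessionScope(IWorkUnitSession &_wu, int64_t sessionId) : wu(_wu)
        {
            wu.setEngineSession(sessionId);
        }
        ~EngineSessionScope()
        {
            wu.setEngineSession(-1);
        }
    private:
        IWorkUnitSession &wu;
    };
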
diff --git a/thorlcr/thorutil/thormisc.hpp b/thorlcr/thorutil/thormisc.hpp
index 3159d58ad48..c2d05fccb31 100644
--- a/thorlcr/thorutil/thormisc.hpp
+++ b/thorlcr/thorutil/thormisc.hpp
@@ -37,111 +37,119 @@
 #include "jstats.h"

 #ifdef GRAPH_EXPORTS
-    #define graph_decl DECL_EXPORT
+#define graph_decl DECL_EXPORT
 #else
-    #define graph_decl DECL_IMPORT
+#define graph_decl DECL_IMPORT
 #endif

 /// Thor options, that can be hints, workunit options, or global settings
-#define THOROPT_COMPRESS_SPILLS "v9_4_compressInternalSpills" // Compress internal spills, e.g. spills created by lookahead or sort gathering (default = true)
-#define THOROPT_COMPRESS_SPILL_TYPE "v9_4_spillCompressorType" // Compress spill type, e.g. FLZ, LZ4 (or other to get previous) (default = LZ4)
-#define THOROPT_HDIST_SPILL "hdistSpill" // Allow distribute receiver to spill to disk, rather than blocking (default = true)
-#define THOROPT_HDIST_WRITE_POOL_SIZE "hdistSendPoolSize" // Distribute send thread pool size (default = 16)
-#define THOROPT_HDIST_BUCKET_SIZE "hdOutBufferSize" // Distribute target bucket send size (default = 1MB)
-#define THOROPT_HDIST_BUFFER_SIZE "hdInBufferSize" // Distribute send buffer size (for all targets) (default = 32MB)
-#define THOROPT_HDIST_PULLBUFFER_SIZE "hdPullBufferSize" // Distribute pull buffer size (receiver side limit, before spilling)
-#define THOROPT_HDIST_CANDIDATELIMIT "hdCandidateLimit" // Limits # of buckets to push to the writers when send buffer is full (default = is 50% largest)
-#define THOROPT_HDIST_TARGETWRITELIMIT "hdTargetLimit" // Limit # of writer threads working on a single target (default = unbound, but picks round-robin)
-#define THOROPT_HDIST_COMP "v9_4_hdCompressorType" // Distribute compressor to use (default = "LZ4")
-#define THOROPT_HDIST_COMPOPTIONS "v9_4_hdCompressorOptions" // Distribute compressor options, e.g. AES key (default = "")
-#define THOROPT_SPLITTER_SPILL "splitterSpill" // Force splitters to spill or not, default is to adhere to helper setting (default = -1)
-#define THOROPT_SPLITTER_MAXROWMEMK "splitterRowMemK" // Splitter max memory (K) to use before spilling (default = 2MB)
-#define THOROPT_SPLITTER_READAHEADGRANULARITYK "inMemReadAheadGranularityK" // Splitter in memory read ahead granularity (K) (default = 128K)
+#define THOROPT_COMPRESS_SPILLS "v9_4_compressInternalSpills"    // Compress internal spills, e.g. spills created by lookahead or sort gathering (default = true)
+#define THOROPT_COMPRESS_SPILL_TYPE "v9_4_spillCompressorType"    // Compress spill type, e.g. FLZ, LZ4 (or other to get previous) (default = LZ4)
+#define THOROPT_HDIST_SPILL "hdistSpill"    // Allow distribute receiver to spill to disk, rather than blocking (default = true)
+#define THOROPT_HDIST_WRITE_POOL_SIZE "hdistSendPoolSize"    // Distribute send thread pool size (default = 16)
+#define THOROPT_HDIST_BUCKET_SIZE "hdOutBufferSize"    // Distribute target bucket send size (default = 1MB)
+#define THOROPT_HDIST_BUFFER_SIZE "hdInBufferSize"    // Distribute send buffer size (for all targets) (default = 32MB)
+#define THOROPT_HDIST_PULLBUFFER_SIZE "hdPullBufferSize"    // Distribute pull buffer size (receiver side limit, before spilling)
+#define THOROPT_HDIST_CANDIDATELIMIT "hdCandidateLimit"    // Limits # of buckets to push to the writers when send buffer is full (default = 50% of largest)
+#define THOROPT_HDIST_TARGETWRITELIMIT "hdTargetLimit"    // Limit # of writer threads working on a single target (default = unbound, but picks round-robin)
+#define THOROPT_HDIST_COMP "v9_4_hdCompressorType"    // Distribute compressor to use (default = "LZ4")
+#define THOROPT_HDIST_COMPOPTIONS "v9_4_hdCompressorOptions"    // Distribute compressor options, e.g. AES key (default = "")
+#define THOROPT_SPLITTER_SPILL "v9_4_splitterSpill"    // Force splitters to spill or not, default is to adhere to helper setting (default = -1)
+#define THOROPT_SPLITTER_MAXROWMEMK "splitterRowMemK"    // Splitter max memory (K) to use before spilling (default = 2MB)
+#define THOROPT_SPLITTER_READAHEADGRANULARITYK "inMemReadAheadGranularityK"    // Splitter in memory read ahead granularity (K) (default = 128K)
 #define THOROPT_SPLITTER_READAHEADGRANULARITYROWS "inMemReadAheadGranularityRows" // Splitter in memory read ahead granularity (# rows) (default = 64)
-#define THOROPT_SPLITTER_WRITEAHEADK "splitterWriteAheadK" // Splitter spilling write ahead size (K) (default = 2MB)
-#define THOROPT_SPLITTER_COMPRESSIONTOTALK "splitterCompressionTotalK" // Splitter total compression buffer size (shared between writer and readers) (K) (default = 3MB)
-#define THOROPT_LOOP_MAX_EMPTY "loopMaxEmpty" // Max # of iterations that LOOP can cycle through with 0 results before errors (default = 1000)
-#define THOROPT_SMALLSORT "smallSortThreshold" // Use minisort approach, if estimate size of data to sort is below this setting (default = 0)
-#define THOROPT_PARALLEL_FUNNEL "parallelFunnel" // Use parallel funnel impl. if !ordered (default = true)
-#define THOROPT_SORT_MAX_DEVIANCE "sort_max_deviance" // Max (byte) variance allowed during sort partitioning (default = 10Mb)
-#define THOROPT_OUTPUT_FLUSH_THRESHOLD "output_flush_threshold" // When above limit, workunit result is flushed (committed to Dali) (default = -1 [off])
-#define THOROPT_PARALLEL_MATCH "parallel_match" // Use multi-threaded join helper (retains sort order without unsorted_output) (default = false)
-#define THOROPT_UNSORTED_OUTPUT "unsorted_output" // Allow Join results to be reodered, implies parallel match (default = false)
-#define THOROPT_JOINHELPER_THREADS "joinHelperThreads" // Number of threads to use in threaded variety of join helper
-#define THOROPT_LKJOIN_LOCALFAILOVER "lkjoin_localfailover" // Force SMART to failover to distributed local lookup join (for testing only) (default = false)
-#define THOROPT_LKJOIN_HASHJOINFAILOVER "lkjoin_hashjoinfailover" // Force SMART to failover to hash join (for testing only) (default = false)
-#define THOROPT_MAX_KERNLOG "max_kern_level" // Max kernel logging level, to push to workunit, -1 to disable (default = 3)
-#define THOROPT_COMP_FORCELZW "forceLZW" // Forces file compression to use LZW (default = false)
-#define THOROPT_COMP_FORCEFLZ "forceFLZ" // Forces file compression to use FLZ (default = false)
-#define THOROPT_COMP_FORCELZ4 "forceLZ4" // Forces file compression to use LZ4 (default = false)
-#define THOROPT_COMP_FORCELZ4HC "forceLZ4HC" // Forces file compression to use LZ4HC (default = false)
-#define THOROPT_TRACE_ENABLED "traceEnabled" // Output from TRACE activity enabled (default = false)
-#define THOROPT_TRACE_LIMIT "traceLimit" // Number of rows from TRACE activity (default = 10)
-#define THOROPT_READ_CRC "crcReadEnabled" // Enabled CRC validation on disk reads if file CRC are available (default = true)
-#define THOROPT_WRITE_CRC "crcWriteEnabled" // Calculate CRC's for disk outputs and store in file meta data (default = true)
-#define THOROPT_READCOMPRESSED_CRC "crcReadCompressedEnabled" // Enabled CRC validation on compressed disk reads if file CRC are available (default = false)
-#define THOROPT_WRITECOMPRESSED_CRC "crcWriteCompressedEnabled" // Calculate CRC's for compressed disk outputs and store in file meta data (default = false)
-#define THOROPT_CHILD_GRAPH_INIT_TIMEOUT "childGraphInitTimeout" // Time to wait for child graphs to respond to initialization (default = 5*60 seconds)
-#define THOROPT_SORT_COMPBLKSZ "sortCompBlkSz" // Block size used by compressed spill in a spilling sort (default = 0, uses row writer default)
-#define THOROPT_KEYLOOKUP_QUEUED_BATCHSIZE "keyLookupQueuedBatchSize" // Number of rows candidates to gather before performing lookup against part (default = 1000)
-#define THOROPT_KEYLOOKUP_FETCH_QUEUED_BATCHSIZE "fetchLookupQueuedBatchSize" // Number of rows candidates to gather before performing lookup against part (default = 1000)
-#define THOROPT_KEYLOOKUP_MAX_LOOKUP_BATCHSIZE "keyLookupMaxLookupBatchSize" // Maximum chunk of rows to process per cycle in lookup handler (default = 1000)
-#define THOROPT_KEYLOOKUP_MAX_THREADS "maxKeyLookupThreads" // Maximum number of threads performing keyed lookups (default = 10)
-#define THOROPT_KEYLOOKUP_MAX_FETCH_THREADS "maxFetchThreads" // Maximum number of threads performing keyed lookups (default = 10)
-#define THOROPT_KEYLOOKUP_MAX_PROCESS_THREADS "keyLookupMaxProcessThreads" // Maximum number of threads performing keyed lookups (default = 10)
-#define THOROPT_KEYLOOKUP_MAX_QUEUED "keyLookupMaxQueued" // Total maximum number of rows (across all parts/threads) to queue (default = 10000)
-#define THOROPT_KEYLOOKUP_MIN_MB "keyLookupMinJoinGroupMB" // Min(MB) for groups (across all parts/threads) to queue) (default = 50)
-#define THOROPT_KEYLOOKUP_MAX_DONE "keyLookupMaxDone" // Maximum number of done items pending to be ready by next activity (default = 10000)
-#define THOROPT_KEYLOOKUP_PROCESS_BATCHLIMIT "keyLookupProcessBatchLimit" // Maximum number of key lookups on queue before passing to a processor (default = 1000)
-#define THOROPT_FETCHLOOKUP_PROCESS_BATCHLIMIT "fetchLookupProcessBatchLimit" // Maximum number of fetch lookups on queue before passing to a processor (default = 10000)
-#define THOROPT_REMOTE_KEYED_LOOKUP "remoteKeyedLookup" // Send key request to remote node unless part is local (default = true)
-#define THOROPT_REMOTE_KEYED_FETCH "remoteKeyedFetch" // Send fetch request to remote node unless part is local (default = true)
-#define THOROPT_FORCE_REMOTE_KEYED_LOOKUP "forceRemoteKeyedLookup" // force all keyed lookups, even where part local to be sent as if remote (default = false)
-#define THOROPT_FORCE_REMOTE_KEYED_FETCH "forceRemoteKeyedFetch" // force all keyed fetches, even where part local to be sent as if remote (default = false)
-#define THOROPT_KEYLOOKUP_MAX_LOCAL_HANDLERS "maxLocalHandlers" // maximum number of handlers dealing with local parts (default = 10)
-#define THOROPT_KEYLOOKUP_MAX_REMOTE_HANDLERS "maxRemoteHandlers" // maximum number of handlers per remote slave (default = 2)
-#define THOROPT_KEYLOOKUP_MAX_FETCH_LOCAL_HANDLERS "maxLocalFetchHandlers" // maximum number of fetch handlers dealing with local parts (default = 10)
-#define THOROPT_KEYLOOKUP_MAX_FETCH_REMOTE_HANDLERS "maxRemoteFetchHandlers" // maximum number of fetch handlers per remote slave (default = 2)
-#define THOROPT_KEYLOOKUP_COMPRESS_MESSAGES "keyedJoinCompressMsgs" // compress key and fetch request messages (default = true)
-#define THOROPT_FORCE_REMOTE_DISABLED "forceRemoteDisabled" // disable remote (via dafilesrv) reads (NB: takes precedence over forceRemoteRead) (default = false)
-#define THOROPT_FORCE_REMOTE_READ "forceRemoteRead" // force remote (via dafilesrv) read (NB: takes precedence over environment.conf setting) (default = false)
-#define THOROPT_ACTINIT_WAITTIME_MINS "actInitWaitTimeMins" // max time to wait for slave activity initialization message from master
-#define THOROPT_MAXLFN_BLOCKTIME_MINS "maxLfnBlockTimeMins" // max time permitted to be blocked on a DFS logical file operation.
-#define THOROPT_VALIDATE_FILE_TYPE "validateFileType" // validate file type compatibility, e.g. if on fire error if XML reading CSV (default = true)
-#define THOROPT_MIN_REMOTE_CQ_INDEX_SIZE_MB "minRemoteCQIndexSizeMb" // minimum size of index file to enable server side handling (default = 0, meaning use heuristic to determin)
-#define THOROPT_KJ_ASSUME_PRIMARY "keyedJoinAssumePrimary" // assume primary part exists (don't check when mapping, which can be slow)
-#define THOROPT_COMPRESS_SORTOVERFLOW "compressSortOverflow" // If global sort spills, compress the merged overflow file (default = true)
-#define THOROPT_TIME_ACTIVITIES "timeActivities" // Time activities (default=true)
-#define THOROPT_MAX_ACTIVITY_CORES "maxActivityCores" // controls number of default threads to use for very parallel phases (like sort/parallel join helper). (default = # of h/w cores)
-#define THOROPT_THOR_ROWCRC "THOR_ROWCRC" // Use a CRC checking row allocator (default=false)
-#define THOROPT_THOR_PACKEDALLOCATOR "THOR_PACKEDALLOCATOR" // Use packed roxiemem row allocators by default (default=true)
-#define THOROPT_MEMORY_SPILL_AT "memorySpillAt" // The threshold (%) that roxiemem will request memory to be reduced (default=80)
-#define THOROPT_FAIL_ON_LEAKS "failOnLeaks" // If any leaks are detected at the end of graph, fail the query (default=false)
-#define THOROPT_SOAP_TRACE_LEVEL "soapTraceLevel" // The trace SOAP level (default=1)
-#define THOROPT_SORT_ALGORITHM "sortAlgorithm" // The algorithm used to sort records (quicksort/mergesort)
-#define THOROPT_COMPRESS_ALLFILES "compressAllOutputs" // Compress all output files (default: bare-metal=off, cloud=on)
-#define THOROPT_AVOID_RENAME "avoidRename" // Avoid rename, write directly to target physical filenames (no temp file)
-#define THOROPT_LOOKAHEAD_MAXROWMEMK "readAheadRowMemK" // Splitter max memory (K) to use before spilling (default = 2MB)
-#define THOROPT_LOOKAHEAD_WRITEAHEADK "readAheadWriteAheadK" // Splitter spilling write ahead size (K) (default = 2MB)
-#define THOROPT_LOOKAHEAD_COMPRESSIONTOTALK "readAheadCompressionTotalK" // Splitter total compression buffer size (shared between writer and readers) (K) (default = 3MB)
-#define THOROPT_LOOKAHEAD_TEMPFILE_GRANULARITY "readAheadTempFileGranularity" // Splitter temp file granularity (default = 1GB)
-
-
-
-#define INITIAL_SELFJOIN_MATCH_WARNING_LEVEL 20000 // max of row matches before selfjoin emits warning
+#define THOROPT_SPLITTER_WRITEAHEADK "splitterWriteAheadK"    // Splitter spilling write ahead size (K) (default = 2MB)
+#define THOROPT_SPLITTER_COMPRESSIONTOTALK "splitterCompressionTotalK"    // Splitter total compression buffer size (shared between writer and readers) (K) (default = 3MB)
+#define THOROPT_LOOP_MAX_EMPTY "loopMaxEmpty"    // Max # of iterations that LOOP can cycle through with 0 results before errors (default = 1000)
+#define THOROPT_SMALLSORT "smallSortThreshold"    // Use minisort approach, if estimate size of data to sort is below this setting (default = 0)
+#define THOROPT_PARALLEL_FUNNEL "parallelFunnel"    // Use parallel funnel impl. if !ordered (default = true)
+#define THOROPT_SORT_MAX_DEVIANCE "sort_max_deviance"    // Max (byte) variance allowed during sort partitioning (default = 10Mb)
+#define THOROPT_OUTPUT_FLUSH_THRESHOLD "output_flush_threshold"    // When above limit, workunit result is flushed (committed to Dali) (default = -1 [off])
+#define THOROPT_PARALLEL_MATCH "parallel_match"    // Use multi-threaded join helper (retains sort order without unsorted_output) (default = false)
+#define THOROPT_UNSORTED_OUTPUT "unsorted_output"    // Allow Join results to be reordered, implies parallel match (default = false)
+#define THOROPT_JOINHELPER_THREADS "joinHelperThreads"    // Number of threads to use in threaded variety of join helper
+#define THOROPT_LKJOIN_LOCALFAILOVER "lkjoin_localfailover"    // Force SMART to failover to distributed local lookup join (for testing only) (default = false)
+#define THOROPT_LKJOIN_HASHJOINFAILOVER "lkjoin_hashjoinfailover"    // Force SMART to failover to hash join (for testing only) (default = false)
+#define THOROPT_MAX_KERNLOG "max_kern_level"    // Max kernel logging level, to push to workunit, -1 to disable (default = 3)
+#define THOROPT_COMP_FORCELZW "v9_4_forceLZW"    // Forces file compression to use LZW (default = false)
+#define THOROPT_COMP_FORCEFLZ "v9_4_forceFLZ"    // Forces file compression to use FLZ (default = false)
+#define THOROPT_COMP_FORCELZ4 "v9_4_forceLZ4"    // Forces file compression to use LZ4 (default = false)
+#define THOROPT_COMP_FORCELZ4HC "v9_4_forceLZ4HC"    // Forces file compression to use LZ4HC (default = false)
+#define THOROPT_TRACE_ENABLED "traceEnabled"    // Output from TRACE activity enabled (default = false)
+#define THOROPT_TRACE_LIMIT "traceLimit"    // Number of rows from TRACE activity (default = 10)
+#define THOROPT_READ_CRC "crcReadEnabled"    // Enable CRC validation on disk reads if file CRCs are available (default = true)
+#define THOROPT_WRITE_CRC "crcWriteEnabled"    // Calculate CRCs for disk outputs and store in file meta data (default = true)
+#define THOROPT_READCOMPRESSED_CRC "crcReadCompressedEnabled"    // Enable CRC validation on compressed disk reads if file CRCs are available (default = false)
+#define THOROPT_WRITECOMPRESSED_CRC "crcWriteCompressedEnabled"    // Calculate CRCs for compressed disk outputs and store in file meta data (default = false)
+#define THOROPT_CHILD_GRAPH_INIT_TIMEOUT "childGraphInitTimeout"    // Time to wait for child graphs to respond to initialization (default = 5*60 seconds)
+#define THOROPT_SORT_COMPBLKSZ "sortCompBlkSz"    // Block size used by compressed spill in a spilling sort (default = 0, uses row writer default)
+#define THOROPT_KEYLOOKUP_QUEUED_BATCHSIZE "keyLookupQueuedBatchSize"    // Number of rows candidates to gather before performing lookup against part (default = 1000)
+#define THOROPT_KEYLOOKUP_FETCH_QUEUED_BATCHSIZE "fetchLookupQueuedBatchSize"    // Number of rows candidates to gather before performing lookup against part (default = 1000)
+#define THOROPT_KEYLOOKUP_MAX_LOOKUP_BATCHSIZE "keyLookupMaxLookupBatchSize"    // Maximum chunk of rows to process per cycle in lookup handler (default = 1000)
+#define THOROPT_KEYLOOKUP_MAX_THREADS "maxKeyLookupThreads"    // Maximum number of threads performing keyed lookups (default = 10)
+#define THOROPT_KEYLOOKUP_MAX_FETCH_THREADS "maxFetchThreads"    // Maximum number of threads performing fetches (default = 10)
+#define THOROPT_KEYLOOKUP_MAX_PROCESS_THREADS "keyLookupMaxProcessThreads"    // Maximum number of threads performing keyed lookups (default = 10)
+#define THOROPT_KEYLOOKUP_MAX_QUEUED "keyLookupMaxQueued"    // Total maximum number of rows (across all parts/threads) to queue (default = 10000)
+#define THOROPT_KEYLOOKUP_MIN_MB "keyLookupMinJoinGroupMB"    // Min(MB) for groups (across all parts/threads) to queue (default = 50)
+#define THOROPT_KEYLOOKUP_MAX_DONE "keyLookupMaxDone"    // Maximum number of done items pending to be ready by next activity (default = 10000)
+#define THOROPT_KEYLOOKUP_PROCESS_BATCHLIMIT "keyLookupProcessBatchLimit"    // Maximum number of key lookups on queue before passing to a processor (default = 1000)
+#define THOROPT_FETCHLOOKUP_PROCESS_BATCHLIMIT "fetchLookupProcessBatchLimit"    // Maximum number of fetch lookups on queue before passing to a processor (default = 10000)
+#define THOROPT_REMOTE_KEYED_LOOKUP "remoteKeyedLookup"    // Send key request to remote node unless part is local (default = true)
+#define THOROPT_REMOTE_KEYED_FETCH "remoteKeyedFetch"    // Send fetch request to remote node unless part is local (default = true)
+#define THOROPT_FORCE_REMOTE_KEYED_LOOKUP "forceRemoteKeyedLookup"    // force all keyed lookups, even where part local to be sent as if remote (default = false)
+#define THOROPT_FORCE_REMOTE_KEYED_FETCH "forceRemoteKeyedFetch"    // force all keyed fetches, even where part local to be sent as if remote (default = false)
+#define THOROPT_KEYLOOKUP_MAX_LOCAL_HANDLERS "maxLocalHandlers"    // maximum number of handlers dealing with local parts (default = 10)
+#define THOROPT_KEYLOOKUP_MAX_REMOTE_HANDLERS "maxRemoteHandlers"    // maximum number of handlers per remote slave (default = 2)
+#define THOROPT_KEYLOOKUP_MAX_FETCH_LOCAL_HANDLERS "maxLocalFetchHandlers"    // maximum number of fetch handlers dealing with local parts (default = 10)
+#define THOROPT_KEYLOOKUP_MAX_FETCH_REMOTE_HANDLERS "maxRemoteFetchHandlers"    // maximum number of fetch handlers per remote slave (default = 2)
+#define THOROPT_KEYLOOKUP_COMPRESS_MESSAGES "keyedJoinCompressMsgs"    // compress key and fetch request messages (default = true)
+#define THOROPT_FORCE_REMOTE_DISABLED "forceRemoteDisabled"    // disable remote (via dafilesrv) reads (NB: takes precedence over forceRemoteRead) (default = false)
+#define THOROPT_FORCE_REMOTE_READ "forceRemoteRead"    // force remote (via dafilesrv) read (NB: takes precedence over environment.conf setting) (default = false)
+#define THOROPT_ACTINIT_WAITTIME_MINS "actInitWaitTimeMins"    // max time to wait for slave activity initialization message from master
+#define THOROPT_MAXLFN_BLOCKTIME_MINS "maxLfnBlockTimeMins"    // max time permitted to be blocked on a DFS logical file operation.
+#define THOROPT_VALIDATE_FILE_TYPE "validateFileType"    // validate file type compatibility, e.g. if on, fire an error if reading XML as CSV (default = true)
+#define THOROPT_MIN_REMOTE_CQ_INDEX_SIZE_MB "minRemoteCQIndexSizeMb"    // minimum size of index file to enable server side handling (default = 0, meaning use heuristic to determine)
+#define THOROPT_KJ_ASSUME_PRIMARY "keyedJoinAssumePrimary"    // assume primary part exists (don't check when mapping, which can be slow)
+#define THOROPT_COMPRESS_SORTOVERFLOW "v9_4_compressSortOverflow"    // If global sort spills, compress the merged overflow file (default = true)
+#define THOROPT_TIME_ACTIVITIES "timeActivities"    // Time activities (default=true)
+#define THOROPT_MAX_ACTIVITY_CORES "maxActivityCores"    // controls number of default threads to use for very parallel phases (like sort/parallel join helper) (default = # of h/w cores)
+#define THOROPT_THOR_ROWCRC "THOR_ROWCRC"    // Use a CRC checking row allocator (default=false)
+#define THOROPT_THOR_PACKEDALLOCATOR "THOR_PACKEDALLOCATOR"    // Use packed roxiemem row allocators by default (default=true)
+#define THOROPT_MEMORY_SPILL_AT "memorySpillAt"    // The threshold (%) that roxiemem will request memory to be reduced (default=80)
+#define THOROPT_FAIL_ON_LEAKS "failOnLeaks"    // If any leaks are detected at the end of graph, fail the query (default=false)
+#define THOROPT_SOAP_TRACE_LEVEL "soapTraceLevel"    // The trace SOAP level (default=1)
+#define THOROPT_SORT_ALGORITHM "sortAlgorithm"    // The algorithm used to sort records (quicksort/mergesort)
+#define THOROPT_COMPRESS_ALLFILES "v9_4_compressAllOutputs"    // Compress all output files (default: bare-metal=off, cloud=on)
+#define THOROPT_AVOID_RENAME "avoidRename"    // Avoid rename, write directly to target physical filenames (no temp file)
+#define THOROPT_LOOKAHEAD_MAXROWMEMK "readAheadRowMemK"    // Splitter max memory (K) to use before spilling (default = 2MB)
+#define THOROPT_LOOKAHEAD_WRITEAHEADK "readAheadWriteAheadK"    // Splitter spilling write ahead size (K) (default = 2MB)
+#define THOROPT_LOOKAHEAD_COMPRESSIONTOTALK "readAheadCompressionTotalK"    // Splitter total compression buffer size (shared between writer and readers) (K) (default = 3MB)
+#define THOROPT_LOOKAHEAD_TEMPFILE_GRANULARITY "readAheadTempFileGranularity"    // Splitter temp file granularity (default = 1GB)
+
+#define INITIAL_SELFJOIN_MATCH_WARNING_LEVEL 20000    // max # of row matches before selfjoin emits warning
 #define THOR_SEM_RETRY_TIMEOUT 2

 // Logging
-enum ThorExceptionAction { tea_null, tea_warning, tea_abort, tea_shutdown };
+enum ThorExceptionAction
+{
+    tea_null,
+    tea_warning,
+    tea_abort,
+    tea_shutdown
+};

-enum RegistryCode:unsigned { rc_register, rc_deregister };
+enum RegistryCode : unsigned
+{
+    rc_register,
+    rc_deregister
+};

-#define createThorRow(size)         malloc(size)
-#define destroyThorRow(ptr)         free(ptr)
-#define reallocThorRow(ptr, size)   realloc(ptr, size)
+#define createThorRow(size) malloc(size)
+#define destroyThorRow(ptr) free(ptr)
+#define reallocThorRow(ptr, size) realloc(ptr, size)

-//statistics gathered by the different activities
+// statistics gathered by the different activities
 extern graph_decl const StatisticsMapping spillStatistics;
 extern graph_decl const StatisticsMapping jhtreeCacheStatistics;
 extern graph_decl const StatisticsMapping soapcallStatistics;
@@ -174,6 +182,7 @@ extern graph_decl const std::map<StatisticKind, StatisticKind> diskToTempStatsMa
 class BooleanOnOff
 {
     bool &tf;
+
 public:
     inline BooleanOnOff(bool &_tf) : tf(_tf) { tf = true; }
     inline ~BooleanOnOff() { tf = false; }
@@ -196,6 +205,7 @@ class CReplyCancelHandler
         SpinBlock b(lock);
         clear();
     }
+
 public:
     CReplyCancelHandler()
     {
@@ -224,9 +234,9 @@ class CReplyCancelHandler
         }
         _comm->cancel(rank, _mpTag);
     }
-    bool recv(ICommunicator &_comm, CMessageBuffer &mb, rank_t rank, const mptag_t &_mpTag, rank_t *sender=NULL, unsigned timeout=MP_WAIT_FOREVER)
+    bool recv(ICommunicator &_comm, CMessageBuffer &mb, rank_t rank, const mptag_t &_mpTag, rank_t *sender = NULL, unsigned timeout = MP_WAIT_FOREVER)
     {
-        bool ret=false;
+        bool ret = false;
         {
             SpinBlock b(lock);
             if (cancelled)
@@ -248,7 +258,6 @@ class CReplyCancelHandler
     }
 };

-
 class graph_decl CTimeoutTrigger : public CInterface, implements IThreaded
 {
     std::atomic<bool> running;
@@ -257,6 +266,7 @@ class graph_decl CTimeoutTrigger : public CInterface, implements IThreaded
     unsigned timeout;
     StringAttr description;
     CThreaded threaded;
+
 protected:
     Owned<IException> exception;

@@ -266,7 +276,7 @@ class graph_decl CTimeoutTrigger : public CInterface, implements IThreaded
     }
     void start()
     {
-        running = (timeout!=0);
+        running = (timeout != 0);
         if (running)
             threaded.start(false);
     }
@@ -283,21 +293,27 @@ class graph_decl CTimeoutTrigger : public CInterface, implements IThreaded
             CriticalBlock block(crit);
             if (exception.get())
             {
-                { CriticalUnblock b(crit);
-                    if (todo.wait(timeout*1000))
+                {
+                    CriticalUnblock b(crit);
+                    if (todo.wait(timeout * 1000))
                     { // if signalled during timeout period, wait full timeout
                         if (running)
-                            todo.wait(timeout*1000);
+                            todo.wait(timeout * 1000);
                     }
                 }
-                if (!running) break;
+                if (!running)
+                    break;
                 if (exception.get())
                     if (action())
                         break;
             }
         }
     }
-    void stop() { running = false; todo.signal(); }
+    void stop()
+    {
+        running = false;
+        todo.signal();
+    }
     void inform(IException *e)
     {
         LOG(MCdebugProgress, "INFORM [%s]", description.get());
@@ -323,13 +339,14 @@ class graph_decl CTimeoutTrigger : public CInterface, implements IThreaded
 };

 // Tracks the current and peak storage used for some files
-class CFileSizeTracker: public CInterface
+class CFileSizeTracker : public CInterface
 {
     RelaxedAtomic<offset_t> activeSize{0};
     RelaxedAtomic<offset_t> peakSize{0};
-    CFileSizeTracker * parentFileSizeTracker;
+    CFileSizeTracker *parentFileSizeTracker;
+
 public:
-    CFileSizeTracker(CFileSizeTracker *parent=nullptr): parentFileSizeTracker(parent)
+    CFileSizeTracker(CFileSizeTracker *parent = nullptr) : parentFileSizeTracker(parent)
     {
     }
     void growSize(offset_t size)
@@ -367,9 +384,10 @@ class graph_decl CFileOwner : public CSimpleInterface, implements IInterface
     Linked<IFile> iFile;
     Linked<CFileSizeTracker> fileSizeTracker;
     offset_t fileSize = 0;
+
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
-    CFileOwner(IFile *_iFile, CFileSizeTracker * _fileSizeTracker=nullptr) : iFile(_iFile), fileSizeTracker(_fileSizeTracker)
+    CFileOwner(IFile *_iFile, CFileSizeTracker *_fileSizeTracker = nullptr) : iFile(_iFile), fileSizeTracker(_fileSizeTracker)
     {
     }
     ~CFileOwner()
@@ -380,12 +398,12 @@ class graph_decl CFileOwner : public CSimpleInterface, implements IInterface
     }
     void noteSize(offset_t size)
     {
-        if (fileSizeTracker && fileSize!=size)
+        if (fileSizeTracker && fileSize != size)
         {
             if (size > fileSize)
-                fileSizeTracker->growSize(size-fileSize);
+                fileSizeTracker->growSize(size - fileSize);
             else
-                fileSizeTracker->shrinkSize(fileSize-size);
+                fileSizeTracker->shrinkSize(fileSize - size);
         }
         fileSize = size;
     }
@@ -397,6 +415,7 @@ class graph_decl CStreamFileOwner : public CSimpleInterfaceOf<IExtRowStream>
 {
     Linked<CFileOwner> fileOwner;
     IExtRowStream *stream;
+
 public:
     CStreamFileOwner(CFileOwner *_fileOwner, IExtRowStream *_stream) : fileOwner(_fileOwner)
     {
@@ -406,7 +425,7 @@ class graph_decl CStreamFileOwner : public CSimpleInterfaceOf<IExtRowStream>
     {
         stream->Release();
     }
-// IExtRowStream
+    // IExtRowStream
     virtual const void *nextRow() override { return stream->nextRow(); }
     virtual void stop() override { stream->stop(NULL); }
     virtual offset_t getOffset() const override { return stream->getOffset(); }
@@ -429,7 +448,6 @@ class graph_decl CStreamFileOwner : public CSimpleInterfaceOf<IExtRowStream>
     }
 };

-
 #define DEFAULT_THORMASTERPORT 20000
 #define DEFAULT_THORSLAVEPORT 20100
 #define DEFAULT_SLAVEPORTINC 20
@@ -447,7 +465,7 @@ class graph_decl CFifoFileCache : public CSimpleInterface
     bool isAvailable(const char *filename);
 };

-interface graph_decl IBarrierException : extends IException {};
+interface graph_decl IBarrierException : extends IException{};
 extern graph_decl IBarrierException *createBarrierAbortException();

 interface graph_decl IThorException : extends IException
@@ -459,7 +477,7 @@ interface graph_decl IThorException : extends IException
     virtual graph_id queryGraphId() const = 0;
     virtual const char *queryJobId() const = 0;
     virtual unsigned querySlave() const = 0;
-    virtual void getAssert(StringAttr &file, unsigned &line, unsigned &column) const = 0;
+    virtual void getAssert(StringAttr & file, unsigned &line, unsigned &column) const = 0;
     virtual const char *queryOrigin() const = 0;
     virtual ErrorSeverity querySeverity() const = 0;
     virtual const char *queryMessage() const = 0;
@@ -476,19 +494,24 @@ interface graph_decl IThorException : extends IException
     virtual void setAssert(const char *file, unsigned line, unsigned column) = 0;
     virtual void setOrigin(const char *origin) = 0;
     virtual void setSeverity(ErrorSeverity severity) = 0;
-    virtual void setOriginalException(IException *e) = 0;
+    virtual void setOriginalException(IException * e) = 0;
 };

 class CGraphElementBase;
 class CActivityBase;
 class CGraphBase;
 interface IRemoteConnection;

-enum ActLogEnum { thorlog_null=0,thorlog_ecl=1,thorlog_all=2 };
+enum ActLogEnum
+{
+    thorlog_null = 0,
+    thorlog_ecl = 1,
+    thorlog_all = 2
+};

-extern graph_decl StringBuffer &ActPrintLogArgsPrep(StringBuffer &res, const CGraphElementBase *container, const ActLogEnum flags, const char *format, va_list args) __attribute__((format(printf,4,0)));
+extern graph_decl StringBuffer &ActPrintLogArgsPrep(StringBuffer &res, const CGraphElementBase *container, const ActLogEnum flags, const char *format, va_list args) __attribute__((format(printf, 4, 0)));
 extern graph_decl void ActPrintLogEx(const CGraphElementBase *container, const ActLogEnum flags, const LogMsgCategory &logCat, const char *format, ...) __attribute__((format(printf, 4, 5)));
-extern graph_decl void ActPrintLogArgs(const CGraphElementBase *container, const ActLogEnum flags, const LogMsgCategory &logCat, const char *format, va_list args) __attribute__((format(printf,4,0)));
-extern graph_decl void ActPrintLogArgs(const CGraphElementBase *container, IException *e, const ActLogEnum flags, const LogMsgCategory &logCat, const char *format, va_list args) __attribute__((format(printf,5,0)));
+extern graph_decl void ActPrintLogArgs(const CGraphElementBase *container, const ActLogEnum flags, const LogMsgCategory &logCat, const char *format, va_list args) __attribute__((format(printf, 4, 0)));
+extern graph_decl void ActPrintLogArgs(const CGraphElementBase *container, IException *e, const ActLogEnum flags, const LogMsgCategory &logCat, const char *format, va_list args) __attribute__((format(printf, 5, 0)));
 extern graph_decl void ActPrintLog(const CActivityBase *activity, const char *format, ...) __attribute__((format(printf, 2, 3)));
 extern graph_decl void ActPrintLog(const CActivityBase *activity, unsigned traceLevel, const char *format, ...) __attribute__((format(printf, 3, 4)));
 extern graph_decl void ActPrintLog(const CActivityBase *activity, IException *e, const char *format, ...) __attribute__((format(printf, 3, 4)));
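
A note on the attribute re-spaced in the declarations above: __attribute__((format(printf, N, M))) tells GCC/Clang that parameter N is a printf-style format string and that type-checking of the variadic arguments starts at parameter M; M is 0 when the function receives a va_list, since there are no call-site arguments to verify. Two illustrative declarations (not from this PR) mirroring the pattern:

    #include <cstdarg>

    // "format" is parameter 2 and the variadic list starts at parameter 3,
    // so the compiler can warn on e.g. logFor(activity, "%s", 42).
    void logFor(const CActivityBase *activity, const char *format, ...) __attribute__((format(printf, 2, 3)));

    // va_list variant: the format string is still validated, but the trailing 0
    // says there are no call-site arguments to check against it.
    void logForVa(const CActivityBase *activity, const char *format, va_list args) __attribute__((format(printf, 2, 0)));
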
@@ -530,9 +553,9 @@ inline void ActPrintLog(const CGraphElementBase *container, IException *e)
 {
     ActPrintLogEx(container, e, thorlog_null, MCexception(e, MSGCLS_error), "%s", "");
 }
-extern graph_decl void GraphPrintLogArgsPrep(StringBuffer &res, CGraphBase *graph, const ActLogEnum flags, const LogMsgCategory &logCat, const char *format, va_list args) __attribute__((format(printf,5,0)));
-extern graph_decl void GraphPrintLogArgs(CGraphBase *graph, IException *e, const ActLogEnum flags, const LogMsgCategory &logCat, const char *format, va_list args) __attribute__((format(printf,5,0)));
-extern graph_decl void GraphPrintLogArgs(CGraphBase *graph, const ActLogEnum flags, const LogMsgCategory &logCat, const char *format, va_list args) __attribute__((format(printf,4,0)));
+extern graph_decl void GraphPrintLogArgsPrep(StringBuffer &res, CGraphBase *graph, const ActLogEnum flags, const LogMsgCategory &logCat, const char *format, va_list args) __attribute__((format(printf, 5, 0)));
+extern graph_decl void GraphPrintLogArgs(CGraphBase *graph, IException *e, const ActLogEnum flags, const LogMsgCategory &logCat, const char *format, va_list args) __attribute__((format(printf, 5, 0)));
+extern graph_decl void GraphPrintLogArgs(CGraphBase *graph, const ActLogEnum flags, const LogMsgCategory &logCat, const char *format, va_list args) __attribute__((format(printf, 4, 0)));
 extern graph_decl void GraphPrintLog(CGraphBase *graph, IException *e, const char *format, ...) __attribute__((format(printf, 3, 4)));

 inline void GraphPrintLogEx(CGraphBase *graph, ActLogEnum flags, const LogMsgCategory &logCat, const char *format, ...) __attribute__((format(printf, 4, 5)));
@@ -597,14 +620,23 @@ extern graph_decl const char *queryTempDir();
 extern graph_decl void loadCmdProp(IPropertyTree *tree, const char *cmdProp);
 extern graph_decl void ensureDirectoryForFile(const char *fName);

-extern graph_decl void reportExceptionToWorkunit(IConstWorkUnit &workunit,IException *e, ErrorSeverity severity=SeverityWarning);
-extern graph_decl void reportExceptionToWorkunitCheckIgnore(IConstWorkUnit &workunit, IException *e, ErrorSeverity severity=SeverityWarning);
-
+extern graph_decl void reportExceptionToWorkunit(IConstWorkUnit &workunit, IException *e, ErrorSeverity severity = SeverityWarning);
+extern graph_decl void reportExceptionToWorkunitCheckIgnore(IConstWorkUnit &workunit, IException *e, ErrorSeverity severity = SeverityWarning);
 extern graph_decl Owned<IPropertyTree> globals;
 extern graph_decl mptag_t masterSlaveMpTag;
 extern graph_decl mptag_t kjServiceMpTag;
-enum SlaveMsgTypes:unsigned { smt_errorMsg=1, smt_initGraphReq, smt_initActDataReq, smt_dataReq, smt_getPhysicalName, smt_getFileOffset, smt_actMsg, smt_getresult };
+enum SlaveMsgTypes : unsigned
+{
+    smt_errorMsg = 1,
+    smt_initGraphReq,
+    smt_initActDataReq,
+    smt_dataReq,
+    smt_getPhysicalName,
+    smt_getFileOffset,
+    smt_actMsg,
+    smt_getresult
+};

 extern graph_decl StringBuffer &getCompoundQueryName(StringBuffer &compoundName, const char *queryName, unsigned version);
@@ -625,18 +657,17 @@ extern graph_decl unsigned queryNodeClusterWidth();
 extern graph_decl mptag_t allocateClusterMPTag(); // should probably move into so used by master only
 extern graph_decl void freeClusterMPTag(mptag_t tag); // ""

-extern graph_decl IThorException *deserializeThorException(MemoryBuffer &in);
-void graph_decl serializeThorException(IException *e, MemoryBuffer &out);
+extern graph_decl IThorException *deserializeThorException(MemoryBuffer &in);
+void graph_decl serializeThorException(IException *e, MemoryBuffer &out);

 class CActivityBase;
 interface IPartDescriptor;
-extern graph_decl bool getBestFilePart(CActivityBase *activity, IPartDescriptor &partDesc, OwnedIFile & ifile, unsigned &location, StringBuffer &path, IExceptionHandler *eHandler = NULL);
+extern graph_decl bool getBestFilePart(CActivityBase *activity, IPartDescriptor &partDesc, OwnedIFile &ifile, unsigned &location, StringBuffer &path, IExceptionHandler *eHandler = NULL);
 extern graph_decl StringBuffer &getFilePartLocations(IPartDescriptor &partDesc, StringBuffer &locations);
-extern graph_decl StringBuffer &getPartFilename(IPartDescriptor &partDesc, unsigned copy, StringBuffer &filePath, bool localMount=false);
+extern graph_decl StringBuffer &getPartFilename(IPartDescriptor &partDesc, unsigned copy, StringBuffer &filePath, bool localMount = false);

 extern graph_decl IOutputMetaData *createFixedSizeMetaData(size32_t sz);

-
 interface IRowServer : extends IInterface
 {
     virtual void stop() = 0;
@@ -653,7 +684,7 @@ extern graph_decl void sendInChunks(ICommunicator &comm, rank_t dst, mptag_t mpT
 extern graph_decl void logDiskSpace();

 class CJobBase;
-extern graph_decl IPerfMonHook *createThorMemStatsPerfMonHook(CJobBase &job, int minLevel, IPerfMonHook *chain=NULL); // for passing to jdebug startPerformanceMonitor
+extern graph_decl IPerfMonHook *createThorMemStatsPerfMonHook(CJobBase &job, int minLevel, IPerfMonHook *chain = NULL); // for passing to jdebug startPerformanceMonitor

 extern graph_decl bool isOOMException(IException *e);
 extern graph_decl IThorException *checkAndCreateOOMContextException(CActivityBase *activity, IException *e, const char *msg, rowcount_t numRows, IOutputMetaData *meta, const void *row);
@@ -670,7 +701,7 @@ extern graph_decl void checkFileType(CActivityBase *activity, IDistributedFile *
 template <class T>
 inline void readUnderlyingType(MemoryBuffer &mb, T &v)
 {
-    mb.read(reinterpret_cast<typename std::underlying_type<T>::type &> (v));
+    mb.read(reinterpret_cast<typename std::underlying_type<T>::type &>(v));
 }

 constexpr unsigned thorDetailedLogLevel = 200;
@@ -681,6 +712,7 @@ class graph_decl CThorPerfTracer : protected PerfTracer
     PerfTracer perf;
     StringAttr workunit;
     unsigned subGraphId;
+
 public:
     void start(const char *workunit, unsigned subGraphId, double interval);
     void stop();
@@ -689,4 +721,3 @@ class graph_decl CThorPerfTracer : protected PerfTracer
 extern graph_decl void saveWuidToFile(const char *wuid);

 #endif
-
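
The readUnderlyingType() template retouched near the end of the header is a small deserialization convenience: it reinterprets an enum lvalue as its underlying integer type so that MemoryBuffer::read() resolves to the matching overload. A hypothetical caller (illustrative only; ReplyKind is invented for the example):

    #include "jbuff.hpp"  // MemoryBuffer

    enum class ReplyKind : unsigned { ack = 0, nack = 1 };

    static ReplyKind readReplyKind(MemoryBuffer &mb)
    {
        ReplyKind kind;
        readUnderlyingType(mb, kind);  // reads the raw unsigned directly into 'kind'
        return kind;
    }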