Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/candidate-9.6.x'
Browse files Browse the repository at this point in the history
Signed-off-by: Jake Smith <[email protected]>
  • Loading branch information
jakesmith committed Mar 21, 2024
2 parents be5f169 + 0c4aa45 commit 167031f
Show file tree
Hide file tree
Showing 31 changed files with 563 additions and 381 deletions.
386 changes: 363 additions & 23 deletions common/thorhelper/thorread.cpp

Large diffs are not rendered by default.

29 changes: 14 additions & 15 deletions dali/base/dadfs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11215,23 +11215,22 @@ class CDaliDFSServer: public Thread, public CTransactionLogTracker, implements I
Owned<IPropertyTree> tree = getNamedPropTree(sroot,queryDfsXmlBranchName(DXB_File),"@name",tail.str(),false);
if (tree)
{
if (isContainerized())
// This is for bare-metal clients using ~foreign pointing at a containerized/k8s setup,
// asking for the returned meta data to be remapped to point to the dafilesrv service.
if (isContainerized() && hasMask(opts, GetFileTreeOpts::remapToService))
{
// This is for bare-metal clients using ~foreign pointing at a containerized/k8s setup,
// asking for the returned meta data to be remapped to point to the dafilesrv service.
if (hasMask(opts, GetFileTreeOpts::remapToService))
{
tree.setown(createPTreeFromIPT(tree)); // copy live Dali tree, because it is about to be altered by remapGroupsToDafilesrv
remapGroupsToDafilesrv(tree, true, secureService);
groupResolver = nullptr; // do not attempt to resolve remapped group (it will not exist and cause addUnique to create a new anon one)

const char *remotePlaneName = tree->queryProp("@group");
Owned<IPropertyTree> filePlane = getStoragePlane(remotePlaneName);
assertex(filePlane);
// Used by DFS clients to determine if stripe and/or alias translation needed
tree->setPropTree("Attr/_remoteStoragePlane", createPTreeFromIPT(filePlane));
}
tree.setown(createPTreeFromIPT(tree)); // copy live Dali tree, because it is about to be altered by remapGroupsToDafilesrv
remapGroupsToDafilesrv(tree, true, secureService);
groupResolver = nullptr; // do not attempt to resolve remapped group (it will not exist and cause addUnique to create a new anon one)

const char *remotePlaneName = tree->queryProp("@group");
Owned<IPropertyTree> filePlane = getStoragePlane(remotePlaneName);
assertex(filePlane);
// Used by DFS clients to determine if stripe and/or alias translation needed
tree->setPropTree("Attr/_remoteStoragePlane", createPTreeFromIPT(filePlane));
}
else
tree->removeProp("Attr/_remoteStoragePlane");

Owned<IFileDescriptor> fdesc = deserializeFileDescriptorTree(tree,groupResolver,IFDSF_EXCLUDE_CLUSTERNAMES);
mb.append((int)1); // 1 == standard file
Expand Down
23 changes: 13 additions & 10 deletions dali/dfu/dfuutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -419,17 +419,20 @@ class CFileCloner
// for now, only use source file descriptor as cloned source if it's from
// wsdfs file backed by remote storage using dafilesrv (NB: if it is '_remoteStoragePlane' will be set)
// JCSMORE: it may be this can replace the need for the other 'clone*' attributes altogether.
if (srcfdesc->queryProperties().hasProp("_remoteStoragePlane"))
if (srcfdesc->queryProperties().hasProp("_remoteStoragePlane") && srcdali && !srcdali->endpoint().isNull())
{
if (srcdali && !srcdali->endpoint().isNull())
{
attrs.setPropTree("cloneFromFDesc", createPTreeFromIPT(srcTree));
StringBuffer host;
attrs.setProp("@cloneFrom", srcdali->endpoint().getEndpointHostText(host).str());
if (prefix.length())
attrs.setProp("@cloneFromPrefix", prefix.get());
return;
}
attrs.setPropTree("cloneFromFDesc", createPTreeFromIPT(srcTree));
StringBuffer host;
attrs.setProp("@cloneFrom", srcdali->endpoint().getEndpointHostText(host).str());
if (prefix.length())
attrs.setProp("@cloneFromPrefix", prefix.get());
return;
}
else
{
attrs.removeProp("cloneFromFDesc");
attrs.removeProp("@cloneFrom");
attrs.removeProp("@cloneFromPrefix");
}

while(attrs.removeProp("cloneFromGroup"));
Expand Down
3 changes: 3 additions & 0 deletions ecl/eclagent/eclagentmain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ int main(int argc, const char *argv[])
try
{
ret = eclagent_main(argc, argv);
//Do not return a non-zero error code in containerized mode - otherwise the system will think it failed to run
if (isContainerized())
ret = 0;
}
catch (IException *E)
{
Expand Down
7 changes: 6 additions & 1 deletion esp/scm/ws_dali.ecm
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ ESPrequest [nil_remove] DisconnectClientConnectionRequest
string Endpoint;
};

ESPrequest [nil_remove] ListSDSLocksRequest
{
};

ESPrequest [nil_remove] UnlockSDSLockRequest
{
string ConnectionID;
Expand All @@ -180,7 +184,7 @@ ESPrequest [nil_remove] ClearTraceTransactionsRequest
};

ESPservice [auth_feature("NONE"), //This declares that the method logic handles feature level authorization
version("1.06"), default_client_version("1.06"), exceptions_inline("./smc_xslt/exceptions.xslt")] WSDali
version("1.07"), default_client_version("1.07"), exceptions_inline("./smc_xslt/exceptions.xslt")] WSDali
{
ESPmethod [min_ver("1.01")] SetValue(SetValueRequest, ResultResponse);
ESPmethod [min_ver("1.01")] GetValue(GetValueRequest, ResultResponse);
Expand All @@ -205,6 +209,7 @@ ESPservice [auth_feature("NONE"), //This declares that the method logic handles
ESPmethod [min_ver("1.05")] GetSDSStats(GetSDSStatsRequest, ResultResponse);
ESPmethod [min_ver("1.05")] GetSDSSubscribers(GetSDSSubscribersRequest, ResultResponse);
ESPmethod [min_ver("1.06")] DisconnectClientConnection(DisconnectClientConnectionRequest, ResultResponse);
ESPmethod [min_ver("1.07")] ListSDSLocks(ListSDSLocksRequest, ResultResponse);
ESPmethod [min_ver("1.06")] UnlockSDSLock(UnlockSDSLockRequest, ResultResponse);
ESPmethod [min_ver("1.06")] SaveSDSStore(SaveSDSStoreRequest, ResultResponse);
ESPmethod [min_ver("1.06")] SetTraceTransactions(SetTraceTransactionsRequest, ResultResponse);
Expand Down
18 changes: 18 additions & 0 deletions esp/services/ws_dali/ws_daliservice.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,24 @@ bool CWSDaliEx::onDisconnectClientConnection(IEspContext& context, IEspDisconnec
return true;
}

bool CWSDaliEx::onListSDSLocks(IEspContext& context, IEspListSDSLocksRequest& req, IEspResultResponse& resp)
{
try
{
checkAccess(context);

Owned<ILockInfoCollection> lockInfoCollection = querySDS().getLocks();
StringBuffer result;
lockInfoCollection->toString(result);
resp.setResult(result);
}
catch(IException* e)
{
FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
}
return true;
}

bool CWSDaliEx::onUnlockSDSLock(IEspContext& context, IEspUnlockSDSLockRequest& req, IEspResultResponse& resp)
{
try
Expand Down
1 change: 1 addition & 0 deletions esp/services/ws_dali/ws_daliservice.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class CWSDaliEx : public CWSDali
virtual bool onGetSDSStats(IEspContext& context, IEspGetSDSStatsRequest& req, IEspResultResponse& resp) override;
virtual bool onGetSDSSubscribers(IEspContext& context, IEspGetSDSSubscribersRequest& req, IEspResultResponse& resp) override;
virtual bool onDisconnectClientConnection(IEspContext& context, IEspDisconnectClientConnectionRequest& req, IEspResultResponse& resp) override;
virtual bool onListSDSLocks(IEspContext& context, IEspListSDSLocksRequest& req, IEspResultResponse& resp) override;
virtual bool onUnlockSDSLock(IEspContext& context, IEspUnlockSDSLockRequest& req, IEspResultResponse& resp) override;
virtual bool onSaveSDSStore(IEspContext& context, IEspSaveSDSStoreRequest& req, IEspResultResponse& resp) override;
virtual bool onSetTraceTransactions(IEspContext& context, IEspSetTraceTransactionsRequest& req, IEspResultResponse& resp) override;
Expand Down
14 changes: 12 additions & 2 deletions esp/src/src-react/components/ECLArchive.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import * as React from "react";
import { CommandBar, ContextualMenuItemType, ICommandBarItemProps } from "@fluentui/react";
import { WUDetails, IScope } from "@hpcc-js/comms";
import { Workunit, WUDetails, IScope } from "@hpcc-js/comms";
import { scopedLogger } from "@hpcc-js/util";
import nlsHPCC from "src/nlsHPCC";
import { useWorkunitArchive } from "../hooks/workunit";
import { useWorkunitMetrics } from "../hooks/metrics";
Expand All @@ -12,6 +13,8 @@ import { ECLArchiveTree } from "./ECLArchiveTree";
import { ECLArchiveEditor } from "./ECLArchiveEditor";
import { MetricsPropertiesTables } from "./MetricsPropertiesTables";

const logger = scopedLogger("src-react/components/ECLArchive.tsx");

const scopeFilterDefault: WUDetails.RequestNS.ScopeFilter = {
MaxDepth: 999999,
ScopeTypes: ["graph"]
Expand Down Expand Up @@ -54,8 +57,15 @@ export const ECLArchive: React.FunctionComponent<ECLArchiveProps> = ({
setSelectionText(archive?.content(selection) ?? "");
setMarkers(archive?.markers(selection) ?? []);
setSelectedMetrics(archive?.metrics(selection) ?? []);
} else {
if (archive && !archive.build) {
const wu = Workunit.attach({ baseUrl: "" }, wuid);
wu.fetchQuery().then(function (query) {
setSelectionText(query?.Text ?? "");
}).catch(err => logger.error(err));
}
}
}, [archive, metrics.length, selection]);
}, [archive, metrics.length, selection, wuid]);

const setSelectedItem = React.useCallback((selId: string) => {
pushUrl(`${parentUrl}/${selId}`);
Expand Down
4 changes: 2 additions & 2 deletions esp/src/src-react/components/forms/Fields.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ const AsyncDropdown: React.FunctionComponent<AsyncDropdownProps> = ({
}, [onChange, selectedItem, selectedIdx, selectedKey]);

return options === undefined ?
<DropdownBase label={label} options={[]} placeholder={nlsHPCC.loadingMessage} disabled={true} /> :
<DropdownBase label={label} options={selOptions} selectedKey={selectedItem?.key} onChange={(_, item: IDropdownOption) => setSelectedItem(item)} placeholder={placeholder} disabled={disabled} required={required} errorMessage={errorMessage} className={className} />;
<DropdownBase label={label} dropdownWidth="auto" options={[]} placeholder={nlsHPCC.loadingMessage} disabled={true} /> :
<DropdownBase label={label} dropdownWidth="auto" options={selOptions} selectedKey={selectedItem?.key} onChange={(_, item: IDropdownOption) => setSelectedItem(item)} placeholder={placeholder} disabled={disabled} required={required} errorMessage={errorMessage} className={className} />;
};

interface DropdownMultiProps {
Expand Down
17 changes: 12 additions & 5 deletions fs/dafsserver/dafsserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1235,11 +1235,14 @@ class CRemoteDiskBaseActivity : public CSimpleInterfaceOf<IRemoteReadActivity>,
}
virtual void serializeCursor(MemoryBuffer &tgt) const override
{
throwUnexpected();
// we need to serialize something, because the lack of a cursor is used to signify end of stream
// NB: the cursor is opaque and only to be consumed by dafilesrv. When used it is simply passed back.
tgt.append("UNSUPPORTED");
}
virtual void restoreCursor(MemoryBuffer &src) override
{
throwUnexpected();
throw makeStringExceptionV(0, "restoreCursor not supported in: %s", typeid(*this).name());
throwUnimplemented();
}
virtual void flushStatistics(CClientStats &stats) override
{
Expand Down Expand Up @@ -2019,11 +2022,13 @@ class CRemoteJsonReadActivity : public CRemoteMarkupReadActivity
public:
CRemoteJsonReadActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc, TAKjsonread)
{
xpath.set("/");
if (customRowTag.isEmpty()) // no override
fileDesc->queryProperties().getProp("@rowTag", xpath);
else
{
xpath.set("/");
xpath.append(customRowTag);
}
}
};

Expand Down Expand Up @@ -2392,11 +2397,13 @@ class CRemoteWriteBaseActivity : public CSimpleInterfaceOf<IRemoteWriteActivity>
}
virtual void serializeCursor(MemoryBuffer &tgt) const override
{
throwUnexpected();
// we need to serialize something, because the lack of a cursor is used to signify end of stream
// NB: the cursor is opaque and only to be consumed by dafilesrv. When used it is simply passed back.
tgt.append("UNSUPPORTED");
}
virtual void restoreCursor(MemoryBuffer &src) override
{
throwUnexpected();
throw makeStringExceptionV(0, "restoreCursor not supported in: %s", typeid(*this).name());
}
virtual StringBuffer &getInfoStr(StringBuffer &out) const override
{
Expand Down
6 changes: 5 additions & 1 deletion roxie/ccd/ccd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,11 @@ inline unsigned getBondedChannel(unsigned partNo)
extern void FatalError(const char *format, ...) __attribute__((format(printf, 1, 2)));
extern unsigned getNextInstanceId();
extern void closedown();
extern void saveTopology();
extern void saveTopology(bool lockDali);
extern unsigned __int64 getTopologyHash();

extern unsigned __int64 currentTopologyHash;
extern unsigned __int64 originalTopologyHash;

#define LOGGING_INTERCEPTED 0x01
#define LOGGING_TIMEACTIVITIES 0x02
Expand Down
21 changes: 17 additions & 4 deletions roxie/ccd/ccdmain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -419,16 +419,18 @@ int myhook(int alloctype, void *, size_t nSize, int p1, long allocSeq, const uns
}
#endif

void saveTopology()
void saveTopology(bool lockDali)
{
// Write back changes that have been made via certain control:xxx changes, so that they survive a roxie restart
// Write back changes that have been made via control:(un)lockDali changes, so that they survive a roxie restart
// Note that they are overwritten when Roxie is manually stopped/started via hpcc-init service - these changes
// are only intended to be temporary for the current session
if (!useOldTopology)
return;
try
{
saveXML(topologyFile.str(), topology);
Owned<IPTree> tempTopology = createPTreeFromXMLFile(topologyFile.str(), ipt_caseInsensitive);
tempTopology->setPropBool("@lockDali", lockDali);
saveXML(topologyFile.str(), tempTopology);
}
catch (IException *E)
{
Expand Down Expand Up @@ -461,6 +463,16 @@ static std::vector<std::pair<unsigned, unsigned>> agentChannels;

void *leakChecker = nullptr; // Used to deliberately leak an allocation to ensure leak checking is working

unsigned __int64 currentTopologyHash = 0;
unsigned __int64 originalTopologyHash = 0;

hash64_t getTopologyHash()
{
StringBuffer xml;
toXML(topology, xml, 0, XML_SortTags);
return rtlHash64Data(xml.length(), xml.str(), 707018);
}

#ifndef _CONTAINERIZED
void readStaticTopology()
{
Expand Down Expand Up @@ -695,7 +707,8 @@ int CCD_API roxie_main(int argc, const char *argv[], const char * defaultYaml)
topologyFile.append(codeDirectory).append(PATHSEPCHAR).append("RoxieTopology.xml");
useOldTopology = checkFileExists(topologyFile.str());
topology = loadConfiguration(useOldTopology ? nullptr : defaultYaml, argv, "roxie", "ROXIE", topologyFile, nullptr, "@netAddress");
saveTopology();
saveTopology(topology->getPropBool("@lockDali", false));
originalTopologyHash = currentTopologyHash = getTopologyHash();

// Any settings we read from topology that must NOT be overridden in workunit debug fields should be read at this point, before the following section
getAllowedPipePrograms(allowedPipePrograms, true);
Expand Down
14 changes: 14 additions & 0 deletions roxie/ccd/ccdqueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1041,6 +1041,8 @@ struct PingRecord
{
unsigned tick;
IpAddress senderIP;
unsigned __int64 currentTopoHash;
unsigned __int64 originalTopoHash;
};

void doPing(IRoxieQueryPacket *packet, const IRoxieContextLogger &logctx)
Expand All @@ -1059,6 +1061,16 @@ void doPing(IRoxieQueryPacket *packet, const IRoxieContextLogger &logctx)
StringBuffer s;
throw MakeStringException(ROXIE_UNKNOWN_SERVER, "Message received from unknown Roxie server %s", header.toString(s).str());
}
if (originalTopologyHash != data->originalTopoHash)
{
StringBuffer s;
EXCLOG(MCoperatorError,"ERROR: Configuration file mismatch detected with Roxie server %s", header.toString(s).str());
}
if (currentTopologyHash != data->currentTopoHash)
{
StringBuffer s;
DBGLOG("WARNING: Temporary configuration mismatch detected with Roxie server %s", header.toString(s).str());
}
RoxiePacketHeader newHeader(header, ROXIE_PING, 0); // subchannel not relevant
Owned<IMessagePacker> output = ROQ->createOutputStream(newHeader, true, logctx);
void *ret = output->getBuffer(contextLength, false);
Expand Down Expand Up @@ -3887,6 +3899,8 @@ class PingTimer : public Thread
PingRecord data;
data.senderIP.ipset(myNode.getIpAddress());
data.tick = usTick();
data.originalTopoHash = originalTopologyHash;
data.currentTopoHash = currentTopologyHash;
mb.append(sizeof(PingRecord), &data);
if (doTrace(traceRoxiePings))
DBGLOG("PING sent");
Expand Down
Loading

0 comments on commit 167031f

Please sign in to comment.