Skip to content

Commit

Permalink
Merge pull request #17813 from afishbeck/cliRemoteDfsSupport3
Browse files Browse the repository at this point in the history
HPCC-27843 CLI support for specifying remote storage instead of DALI

Reviewed-by: Jake Smith <[email protected]>
Reviewed-by: Gavin Halliday <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Oct 4, 2023
2 parents b63ec78 + 8071bf5 commit b73ac78
Show file tree
Hide file tree
Showing 23 changed files with 512 additions and 176 deletions.
2 changes: 2 additions & 0 deletions common/pkgfiles/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ include_directories (
${HPCC_SOURCE_DIR}/rtl/include
${HPCC_SOURCE_DIR}/rtl/eclrtl
${HPCC_SOURCE_DIR}/common/workunit
${HPCC_SOURCE_DIR}/esp/clients/ws_dfsclient
)

if (NOT CONTAINERIZED)
Expand All @@ -58,6 +59,7 @@ if(NOT PLUGIN)
target_link_libraries(
pkgfiles
dfuwu
ws_dfsclient
)
if (NOT CONTAINERIZED)
target_link_libraries(pkgfiles environment)
Expand Down
2 changes: 1 addition & 1 deletion common/pkgfiles/pkgimpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class PKGFILES_API CPackageNode : implements IHpccPackage, public CInterface

inline StringBuffer &makeSuperFileXPath(StringBuffer &xpath, const char *superFileName) const
{
superFileName = skipForeign(superFileName);
superFileName = skipForeignOrRemote(superFileName);
return xpath.append("SuperFile[@id='").appendLower(strlen(superFileName), superFileName).append("']");
}

Expand Down
319 changes: 229 additions & 90 deletions common/pkgfiles/referencedfilelist.cpp

Large diffs are not rendered by default.

26 changes: 14 additions & 12 deletions common/pkgfiles/referencedfilelist.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,18 @@
#define RefFileIndex 0x0001
#define RefFileNotOnCluster 0x0002
#define RefFileNotFound 0x0004
#define RefFileRemote 0x0008
#define RefFileForeign 0x0010
#define RefFileSuper 0x0020
#define RefSubFile 0x0040
#define RefFileCopyInfoFailed 0x0080
#define RefFileCloned 0x0100
#define RefFileInPackage 0x0200
#define RefFileNotOnSource 0x0400
#define RefFileOptional 0x0800 //File referenced in more than one place can be both optional and not optional
#define RefFileNotOptional 0x1000
#define RefFileResolvedForeign 0x0008
#define RefFileResolvedRemote 0x0010
#define RefFileLFNForeign 0x0020 //LFN was Foreign
#define RefFileLFNRemote 0x0040 //LFN was remote
#define RefFileSuper 0x0080
#define RefSubFile 0x0100
#define RefFileCopyInfoFailed 0x0200
#define RefFileCloned 0x0400
#define RefFileInPackage 0x0800
#define RefFileNotOnSource 0x1000
#define RefFileOptional 0x2000 //File referenced in more than one place can be both optional and not optional
#define RefFileNotOptional 0x4000


interface IReferencedFile : extends IInterface
Expand Down Expand Up @@ -70,14 +72,14 @@ interface IReferencedFileList : extends IInterface
virtual void addFiles(StringArray &files)=0;

virtual IReferencedFileIterator *getFiles()=0;
virtual void resolveFiles(const StringArray &locations, const char *remoteIP, const char * remotePrefix, const char *srcCluster, bool checkLocalFirst, bool addSubFiles, bool trackSubFiles, bool resolveForeign=false)=0;
virtual void resolveFiles(const StringArray &locations, const char *remoteIP, const char * remotePrefix, const char *srcCluster, bool checkLocalFirst, bool addSubFiles, bool trackSubFiles, bool resolveLFNForeign, bool useRemoteStorage)=0;
virtual void cloneAllInfo(StringBuffer &publisherWuid, const char *dstCluster, unsigned updateFlags, IDFUhelper *helper, bool cloneSuperInfo, bool cloneForeign, unsigned redundancy, unsigned channelsPerNode, int replicateOffset, const char *defRepFolder)=0;
virtual void cloneFileInfo(StringBuffer &publisherWuid, const char *dstCluster, unsigned updateFlags, IDFUhelper *helper, bool cloneSuperInfo, bool cloneForeign, unsigned redundancy, unsigned channelsPerNode, int replicateOffset, const char *defRepFolder)=0;
virtual void cloneRelationships()=0;
virtual void setDfuQueue(const char *dfu_queue) = 0;
};

extern REFFILES_API const char *skipForeign(const char *name, StringBuffer *ip=NULL);
extern REFFILES_API const char *skipForeignOrRemote(const char *name, StringBuffer *ip=nullptr, StringBuffer *remote=nullptr);

extern REFFILES_API IReferencedFileList *createReferencedFileList(const char *user, const char *pw, bool allowForeignFiles, bool allowFileSizeCalc, const char *jobname = nullptr);
extern REFFILES_API IReferencedFileList *createReferencedFileList(IUserDescriptor *userDesc, bool allowForeignFiles, bool allowFileSizeCalc, const char *jobname = nullptr);
Expand Down
1 change: 1 addition & 0 deletions dali/datest/datest.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ include_directories (
${HPCC_SOURCE_DIR}/system/jlib
${HPCC_SOURCE_DIR}/system/security/shared
${HPCC_SOURCE_DIR}/esp/clients/wsdfuaccess
${HPCC_SOURCE_DIR}/esp/clients/ws_dfsclient
${HPCC_SOURCE_DIR}/fs/dafsstream
${HPCC_SOURCE_DIR}/rtl/include
${HPCC_SOURCE_DIR}/rtl/eclrtl
Expand Down
2 changes: 2 additions & 0 deletions dali/datest/dfuwutest.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ include_directories (
./../../system/jlib
./../../common/workunit
../../common/environment
${HPCC_SOURCE_DIR}/esp/clients/ws_dfsclient
)

HPCC_ADD_EXECUTABLE ( dfuwutest ${SRCS} )
Expand All @@ -50,6 +51,7 @@ target_link_libraries ( dfuwutest
dafsclient
dalibase
dfuwu
ws_dfsclient
)

if (NOT CONTAINERIZED)
Expand Down
4 changes: 3 additions & 1 deletion dali/dfu/dfurun.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1297,6 +1297,7 @@ class CDFUengine: public CInterface, implements IDFUengine
Owned<INode> foreigndalinode;
StringAttr oldRoxiePrefix;
bool foreigncopy = false;
bool remotecopy = false;
// first check for 'specials' (e.g. multi-cluster keydiff etc)
switch (cmd) {
case DFUcmd_copy:
Expand All @@ -1311,6 +1312,7 @@ class CDFUengine: public CInterface, implements IDFUengine
CDfsLogicalFileName srclfn;
if (tmp.length())
srclfn.set(tmp.str());
remotecopy = srclfn.isRemote();
destination->getLogicalName(tmp.clear());
CDfsLogicalFileName dstlfn;
if (tmp.length())
Expand Down Expand Up @@ -1666,7 +1668,7 @@ class CDFUengine: public CInterface, implements IDFUengine
Audit("COPYDIFF",userdesc,srcName.get(),dstName.get());
}
}
else if (foreigncopy||auxfdesc)
else if (remotecopy||foreigncopy||auxfdesc)
{
IFileDescriptor * srcDesc = (auxfdesc.get() ? auxfdesc.get() : srcFdesc.get());
fsys.import(srcDesc, dstFile, recovery, recoveryconn, filter, opttree, &feedback, &abortnotify, dfuwuid);
Expand Down
91 changes: 63 additions & 28 deletions dali/dfu/dfuutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
#include "rmtfile.hpp"
#include "dfuutil.hpp"

#include "ws_dfsclient.hpp"

// savemap
// superkey functions
// (logical) directory functions
Expand Down Expand Up @@ -129,6 +131,7 @@ class CFileCloner
{
public:
StringAttr nameprefix;
StringAttr remoteStorage;
Owned<INode> foreigndalinode;
Linked<IUserDescriptor> userdesc;
Linked<IUserDescriptor> foreignuserdesc;
Expand Down Expand Up @@ -409,33 +412,41 @@ class CFileCloner
void updateCloneFrom(const char *lfn, IPropertyTree &attrs, IFileDescriptor *srcfdesc, INode *srcdali, const char *srcCluster)
{
DBGLOG("updateCloneFrom %s", lfn);
if (!srcdali || srcdali->endpoint().isNull())
if (remoteStorage.isEmpty() && (!srcdali || srcdali->endpoint().isNull()))
attrs.setProp("@cloneFromPeerCluster", srcCluster);
else
{
while(attrs.removeProp("cloneFromGroup"));

StringBuffer s;
attrs.setProp("@cloneFrom", srcdali->endpoint().getEndpointHostText(s).str());
if (!remoteStorage.isEmpty())
{
attrs.setProp("@cloneRemote", remoteStorage.str());
if (!isEmptyString(srcCluster))
attrs.setProp("@cloneRemoteCluster", srcCluster);
}
else
{
attrs.setProp("@cloneFrom", srcdali->endpoint().getEndpointHostText(s).str());
unsigned numClusters = srcfdesc->numClusters();
for (unsigned clusterNum = 0; clusterNum < numClusters; clusterNum++)
{
StringBuffer sourceGroup;
srcfdesc->getClusterGroupName(clusterNum, sourceGroup, NULL);
if (srcCluster && *srcCluster && !streq(sourceGroup, srcCluster))
continue;
Owned<IPropertyTree> groupInfo = createPTree("cloneFromGroup");
groupInfo->setProp("@groupName", sourceGroup);
ClusterPartDiskMapSpec &spec = srcfdesc->queryPartDiskMapping(clusterNum);
spec.toProp(groupInfo);
attrs.addPropTree("cloneFromGroup", groupInfo.getClear());
}
}
attrs.setProp("@cloneFromDir", srcfdesc->queryDefaultDir());
if (srcCluster && *srcCluster) //where to copy from has been explicitly set to a remote location, don't copy from local sources
attrs.setProp("@cloneFromPeerCluster", "-");
if (prefix.length())
attrs.setProp("@cloneFromPrefix", prefix.get());

while(attrs.removeProp("cloneFromGroup"));

unsigned numClusters = srcfdesc->numClusters();
for (unsigned clusterNum = 0; clusterNum < numClusters; clusterNum++)
{
StringBuffer sourceGroup;
srcfdesc->getClusterGroupName(clusterNum, sourceGroup, NULL);
if (srcCluster && *srcCluster && !streq(sourceGroup, srcCluster))
continue;
Owned<IPropertyTree> groupInfo = createPTree("cloneFromGroup");
groupInfo->setProp("@groupName", sourceGroup);
ClusterPartDiskMapSpec &spec = srcfdesc->queryPartDiskMapping(clusterNum);
spec.toProp(groupInfo);
attrs.addPropTree("cloneFromGroup", groupInfo.getClear());
}
}
}
void updateCloneFrom(IDistributedFile *dfile, IFileDescriptor *srcfdesc, INode *srcdali, const char *srcCluster)
Expand Down Expand Up @@ -570,6 +581,7 @@ class CFileCloner
const char *_cluster2,
IUserDescriptor *_userdesc,
const char *_foreigndali,
const char *_remoteStorage,
IUserDescriptor *_foreignuserdesc,
const char *_nameprefix,
bool _overwrite,
Expand All @@ -590,6 +602,8 @@ class CFileCloner
level = 0;
if (_foreigndali&&*_foreigndali)
foreigndalinode.setown(createINode(_foreigndali,DALI_SERVER_PORT));
if (_remoteStorage && *_remoteStorage)
remoteStorage.set(_remoteStorage);
fdir = &queryDistributedFileDirectory();
switch(_clustmap) {
case DFUcpdm_c_replicated_by_d:
Expand Down Expand Up @@ -833,6 +847,8 @@ class CFileCloner
else
{
StringBuffer s;
if (checkValueChanged(dfile->queryAttributes().queryProp("@cloneRemote"), remoteStorage.str()))
return true;
if (checkValueChanged(dfile->queryAttributes().queryProp("@cloneFrom"), srcdali->endpoint().getEndpointHostText(s).str()))
return true;
if (checkValueChanged(dfile->queryAttributes().queryProp("@cloneFromDir"), srcfdesc->queryDefaultDir()))
Expand Down Expand Up @@ -876,20 +892,38 @@ class CFileCloner
srcLFN.clearForeign();
srcdali.setown(createINode(ep));
}

StringBuffer s;
Owned<IPropertyTree> ftree = fdir->getFileTree(srcLFN.get(), foreignuserdesc, srcdali, FOREIGN_DALI_TIMEOUT, GetFileTreeOpts::appendForeign);
if (!ftree.get())
throw MakeStringException(-1,"Source file %s could not be found in Dali %s",srcLFN.get(), getDaliEndPointStr(srcdali, s));
IPropertyTree *attsrc = ftree->queryPropTree("Attr");
if (!attsrc)
throw MakeStringException(-1,"Attributes for source file %s could not be found in Dali %s",srcLFN.get(), getDaliEndPointStr(srcdali, s));
Owned<IPropertyTree> ftree;
IPropertyTree *attsrc = nullptr;
if (remoteStorage || srcLFN.isRemote())
{
StringBuffer remoteLFN;
if (!srcLFN.isRemote())
remoteLFN.append("remote::").append(remoteStorage).append("::");
srcLFN.get(remoteLFN);

Owned<wsdfs::IDFSFile> dfsFile = wsdfs::lookupDFSFile(remoteLFN.str(), AccessMode::readSequential, INFINITE, wsdfs::keepAliveExpiryFrequency, foreignuserdesc);
if (!dfsFile)
throw makeStringExceptionV(-1,"Source file %s could not be found in Remote Storage", remoteLFN.str()); //remote scope already included in remoteLFN
ftree.setown(dfsFile->queryFileMeta()->getPropTree("File"));
}
else
{
ftree.setown(fdir->getFileTree(srcLFN.get(), foreignuserdesc, srcdali, FOREIGN_DALI_TIMEOUT, GetFileTreeOpts::appendForeign));
if (!ftree.get())
throw MakeStringException(-1,"Source file %s could not be found in Dali %s",srcLFN.get(), getDaliEndPointStr(srcdali, s));
attsrc = ftree->queryPropTree("Attr");
if (!attsrc)
throw MakeStringException(-1,"Attributes for source file %s could not be found in Dali %s",srcLFN.get(), getDaliEndPointStr(srcdali, s));
}

CDfsLogicalFileName dlfn;
dlfn.set(destfilename);
if (!streq(ftree->queryName(),queryDfsXmlBranchName(DXB_File)))
throw MakeStringException(-1,"Source file %s in Dali %s is not a simple file",filename, getDaliEndPointStr(srcdali, s));

if (!srcdali.get()||queryCoven().inCoven(srcdali))
if (!remoteStorage.length() && (!srcdali.get() || queryCoven().inCoven(srcdali)))
{
// if dali is local and filenames same
if (streq(srcLFN.get(), dlfn.get()))
Expand Down Expand Up @@ -1241,7 +1275,7 @@ class CDFUhelper: implements IDFUhelper, public CInterface
)
{
CFileCloner cloner;
cloner.init(cluster1,clustmap,repeattlk,cluster2,userdesc,foreigndali,foreignuserdesc,nameprefix,overwrite,dophysicalcopy);
cloner.init(cluster1,clustmap,repeattlk,cluster2,userdesc,foreigndali,nullptr,foreignuserdesc,nameprefix,overwrite,dophysicalcopy);
CDfsLogicalFileName dlfn;
cloner.cloneSuperFile(srcname,dlfn);
}
Expand All @@ -1263,7 +1297,7 @@ class CDFUhelper: implements IDFUhelper, public CInterface
{
DBGLOG("createSingleFileClone src=%s@%s, dst=%s@%s, prefix=%s, ow=%d, docopy=%d", srcname, srcCluster, dstname, cluster1, prefix, overwrite, dophysicalcopy);
CFileCloner cloner;
cloner.init(cluster1,clustmap,repeattlk,cluster2,userdesc,foreigndali,foreignuserdesc,NULL,overwrite,dophysicalcopy);
cloner.init(cluster1,clustmap,repeattlk,cluster2,userdesc,foreigndali,nullptr,foreignuserdesc,NULL,overwrite,dophysicalcopy);
cloner.srcCluster.set(srcCluster);
cloner.prefix.set(prefix);
cloner.cloneFile(srcname,dstname);
Expand All @@ -1280,6 +1314,7 @@ class CDFUhelper: implements IDFUhelper, public CInterface
const char *defReplicateFolder,
IUserDescriptor *userdesc, // user desc for local dali
const char *foreigndali, // can be omitted if srcname foreign or local
const char *remoteStorage, // can be omitted if srcname remote or local
unsigned overwriteFlags, // overwrite destination if exists
bool dophysicalcopy
)
Expand All @@ -1288,7 +1323,7 @@ class CDFUhelper: implements IDFUhelper, public CInterface
CFileCloner cloner;
// MORE: Would the following be better to ensure files are copied when queries are deployed?
// bool copyPhysical = isContainerized() && (foreigndali != nullptr);
cloner.init(dstCluster, DFUcpdm_c_replicated_by_d, true, NULL, userdesc, foreigndali, NULL, NULL, false, dophysicalcopy);
cloner.init(dstCluster, DFUcpdm_c_replicated_by_d, true, NULL, userdesc, foreigndali, remoteStorage, NULL, NULL, false, dophysicalcopy);
cloner.overwriteFlags = overwriteFlags;
#ifndef _CONTAINERIZED
//In containerized mode there is no need to replicate files to the local disks of the roxie cluster - so don't set the special flag
Expand Down
2 changes: 2 additions & 0 deletions dali/dfu/dfuutil.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ interface IDfuFileCopier: extends IInterface
#define DALI_UPDATEF_PACKAGEMAP 0x0100
#define DFU_UPDATEF_COPY 0x1000
#define DFU_UPDATEF_OVERWRITE 0x2000
#define DFU_UPDATEF_REMOTESTORAGE 0x4000


#define DALI_UPDATEF_SUBFILE_MASK (DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM | DALI_UPDATEF_APPEND_CLUSTER)
Expand Down Expand Up @@ -93,6 +94,7 @@ interface IDFUhelper: extends IInterface
const char *defReplicateFolder,
IUserDescriptor *userdesc, // user desc for local dali
const char *foreigndali, // can be omitted if srcname foreign or local
const char *remoteStorage, // can be omitted if srcname remote or local
unsigned overwriteFlags, // overwrite destination options
bool dophysicalcopy
) = 0;
Expand Down
2 changes: 2 additions & 0 deletions dali/dfu/dfuwu.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ include_directories (
./../../system/include
./../../system/jlib
./../../common/workunit
./../../esp/clients/ws_dfsclient
${HPCC_SOURCE_DIR}/system/security/shared #seclib.hpp
)

Expand All @@ -53,5 +54,6 @@ target_link_libraries ( dfuwu
hrpc
remote
dalibase
ws_dfsclient
)

4 changes: 3 additions & 1 deletion dali/dfu/dfuwu.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
#include "wujobq.hpp"
#include "dfuutil.hpp"

#include "ws_dfsclient.hpp"

#include "dfuwu.hpp"

#define COPY_WAIT_SECONDS 30
Expand Down Expand Up @@ -981,7 +983,7 @@ class CDFUfileSpec: public CLinkedDFUWUchild, implements IDFUfileSpec
parent->getPassword(password);
}
userdesc->set(username.str(),password.str());
Owned<IDistributedFile> file = queryDistributedFileDirectory().lookup(lfn,userdesc,AccessMode::tbdRead,false,false,nullptr,defaultPrivilegedUser);
Owned<IDistributedFile> file = wsdfs::lookup(lfn,userdesc,AccessMode::tbdRead,false,false,nullptr,defaultPrivilegedUser,INFINITE);
if (file)
return file->getFileDescriptor();
}
Expand Down
3 changes: 3 additions & 0 deletions dali/ft/filecopy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1488,6 +1488,9 @@ void FileSprayer::calibrateProgress()

void FileSprayer::checkForOverlap()
{
if (distributedSource && distributedSource->isExternal())
return;

unsigned num = std::min(sources.ordinality(), targets.ordinality());

for (unsigned idx = 0; idx < num; idx++)
Expand Down
Loading

0 comments on commit b73ac78

Please sign in to comment.