Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-27843 CLI support for specifying remote storage instead of DALI #17813

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions common/pkgfiles/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ include_directories (
${HPCC_SOURCE_DIR}/rtl/include
${HPCC_SOURCE_DIR}/rtl/eclrtl
${HPCC_SOURCE_DIR}/common/workunit
${HPCC_SOURCE_DIR}/esp/clients/ws_dfsclient
)

if (NOT CONTAINERIZED)
Expand All @@ -58,6 +59,7 @@ if(NOT PLUGIN)
target_link_libraries(
pkgfiles
dfuwu
ws_dfsclient
)
if (NOT CONTAINERIZED)
target_link_libraries(pkgfiles environment)
Expand Down
2 changes: 1 addition & 1 deletion common/pkgfiles/pkgimpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class PKGFILES_API CPackageNode : implements IHpccPackage, public CInterface

inline StringBuffer &makeSuperFileXPath(StringBuffer &xpath, const char *superFileName) const
{
superFileName = skipForeign(superFileName);
superFileName = skipForeignOrRemote(superFileName);
return xpath.append("SuperFile[@id='").appendLower(strlen(superFileName), superFileName).append("']");
}

Expand Down
319 changes: 229 additions & 90 deletions common/pkgfiles/referencedfilelist.cpp

Large diffs are not rendered by default.

26 changes: 14 additions & 12 deletions common/pkgfiles/referencedfilelist.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,18 @@
#define RefFileIndex 0x0001
#define RefFileNotOnCluster 0x0002
#define RefFileNotFound 0x0004
#define RefFileRemote 0x0008
#define RefFileForeign 0x0010
#define RefFileSuper 0x0020
#define RefSubFile 0x0040
#define RefFileCopyInfoFailed 0x0080
#define RefFileCloned 0x0100
#define RefFileInPackage 0x0200
#define RefFileNotOnSource 0x0400
#define RefFileOptional 0x0800 //File referenced in more than one place can be both optional and not optional
#define RefFileNotOptional 0x1000
#define RefFileResolvedForeign 0x0008
#define RefFileResolvedRemote 0x0010
#define RefFileLFNForeign 0x0020 //LFN was Foreign
#define RefFileLFNRemote 0x0040 //LFN was remote
#define RefFileSuper 0x0080
#define RefSubFile 0x0100
#define RefFileCopyInfoFailed 0x0200
#define RefFileCloned 0x0400
#define RefFileInPackage 0x0800
#define RefFileNotOnSource 0x1000
#define RefFileOptional 0x2000 //File referenced in more than one place can be both optional and not optional
#define RefFileNotOptional 0x4000


interface IReferencedFile : extends IInterface
Expand Down Expand Up @@ -70,14 +72,14 @@ interface IReferencedFileList : extends IInterface
virtual void addFiles(StringArray &files)=0;

virtual IReferencedFileIterator *getFiles()=0;
virtual void resolveFiles(const StringArray &locations, const char *remoteIP, const char * remotePrefix, const char *srcCluster, bool checkLocalFirst, bool addSubFiles, bool trackSubFiles, bool resolveForeign=false)=0;
virtual void resolveFiles(const StringArray &locations, const char *remoteIP, const char * remotePrefix, const char *srcCluster, bool checkLocalFirst, bool addSubFiles, bool trackSubFiles, bool resolveLFNForeign, bool useRemoteStorage)=0;
virtual void cloneAllInfo(StringBuffer &publisherWuid, const char *dstCluster, unsigned updateFlags, IDFUhelper *helper, bool cloneSuperInfo, bool cloneForeign, unsigned redundancy, unsigned channelsPerNode, int replicateOffset, const char *defRepFolder)=0;
virtual void cloneFileInfo(StringBuffer &publisherWuid, const char *dstCluster, unsigned updateFlags, IDFUhelper *helper, bool cloneSuperInfo, bool cloneForeign, unsigned redundancy, unsigned channelsPerNode, int replicateOffset, const char *defRepFolder)=0;
virtual void cloneRelationships()=0;
virtual void setDfuQueue(const char *dfu_queue) = 0;
};

extern REFFILES_API const char *skipForeign(const char *name, StringBuffer *ip=NULL);
extern REFFILES_API const char *skipForeignOrRemote(const char *name, StringBuffer *ip=nullptr, StringBuffer *remote=nullptr);

extern REFFILES_API IReferencedFileList *createReferencedFileList(const char *user, const char *pw, bool allowForeignFiles, bool allowFileSizeCalc, const char *jobname = nullptr);
extern REFFILES_API IReferencedFileList *createReferencedFileList(IUserDescriptor *userDesc, bool allowForeignFiles, bool allowFileSizeCalc, const char *jobname = nullptr);
Expand Down
1 change: 1 addition & 0 deletions dali/datest/datest.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ include_directories (
${HPCC_SOURCE_DIR}/system/jlib
${HPCC_SOURCE_DIR}/system/security/shared
${HPCC_SOURCE_DIR}/esp/clients/wsdfuaccess
${HPCC_SOURCE_DIR}/esp/clients/ws_dfsclient
${HPCC_SOURCE_DIR}/fs/dafsstream
${HPCC_SOURCE_DIR}/rtl/include
${HPCC_SOURCE_DIR}/rtl/eclrtl
Expand Down
2 changes: 2 additions & 0 deletions dali/datest/dfuwutest.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ include_directories (
./../../system/jlib
./../../common/workunit
../../common/environment
${HPCC_SOURCE_DIR}/esp/clients/ws_dfsclient
)

HPCC_ADD_EXECUTABLE ( dfuwutest ${SRCS} )
Expand All @@ -50,6 +51,7 @@ target_link_libraries ( dfuwutest
dafsclient
dalibase
dfuwu
ws_dfsclient
)

if (NOT CONTAINERIZED)
Expand Down
4 changes: 3 additions & 1 deletion dali/dfu/dfurun.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1297,6 +1297,7 @@ class CDFUengine: public CInterface, implements IDFUengine
Owned<INode> foreigndalinode;
StringAttr oldRoxiePrefix;
bool foreigncopy = false;
bool remotecopy = false;
// first check for 'specials' (e.g. multi-cluster keydiff etc)
switch (cmd) {
case DFUcmd_copy:
Expand All @@ -1311,6 +1312,7 @@ class CDFUengine: public CInterface, implements IDFUengine
CDfsLogicalFileName srclfn;
if (tmp.length())
srclfn.set(tmp.str());
remotecopy = srclfn.isRemote();
destination->getLogicalName(tmp.clear());
CDfsLogicalFileName dstlfn;
if (tmp.length())
Expand Down Expand Up @@ -1666,7 +1668,7 @@ class CDFUengine: public CInterface, implements IDFUengine
Audit("COPYDIFF",userdesc,srcName.get(),dstName.get());
}
}
else if (foreigncopy||auxfdesc)
else if (remotecopy||foreigncopy||auxfdesc)
{
IFileDescriptor * srcDesc = (auxfdesc.get() ? auxfdesc.get() : srcFdesc.get());
fsys.import(srcDesc, dstFile, recovery, recoveryconn, filter, opttree, &feedback, &abortnotify, dfuwuid);
Expand Down
91 changes: 63 additions & 28 deletions dali/dfu/dfuutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
#include "rmtfile.hpp"
#include "dfuutil.hpp"

#include "ws_dfsclient.hpp"

// savemap
// superkey functions
// (logical) directory functions
Expand Down Expand Up @@ -129,6 +131,7 @@ class CFileCloner
{
public:
StringAttr nameprefix;
StringAttr remoteStorage;
Owned<INode> foreigndalinode;
Linked<IUserDescriptor> userdesc;
Linked<IUserDescriptor> foreignuserdesc;
Expand Down Expand Up @@ -409,33 +412,41 @@ class CFileCloner
void updateCloneFrom(const char *lfn, IPropertyTree &attrs, IFileDescriptor *srcfdesc, INode *srcdali, const char *srcCluster)
{
DBGLOG("updateCloneFrom %s", lfn);
if (!srcdali || srcdali->endpoint().isNull())
if (remoteStorage.isEmpty() && (!srcdali || srcdali->endpoint().isNull()))
attrs.setProp("@cloneFromPeerCluster", srcCluster);
else
{
while(attrs.removeProp("cloneFromGroup"));

StringBuffer s;
attrs.setProp("@cloneFrom", srcdali->endpoint().getEndpointHostText(s).str());
if (!remoteStorage.isEmpty())
{
attrs.setProp("@cloneRemote", remoteStorage.str());
if (!isEmptyString(srcCluster))
attrs.setProp("@cloneRemoteCluster", srcCluster);
}
else
{
attrs.setProp("@cloneFrom", srcdali->endpoint().getEndpointHostText(s).str());
unsigned numClusters = srcfdesc->numClusters();
for (unsigned clusterNum = 0; clusterNum < numClusters; clusterNum++)
{
StringBuffer sourceGroup;
srcfdesc->getClusterGroupName(clusterNum, sourceGroup, NULL);
if (srcCluster && *srcCluster && !streq(sourceGroup, srcCluster))
continue;
Owned<IPropertyTree> groupInfo = createPTree("cloneFromGroup");
groupInfo->setProp("@groupName", sourceGroup);
ClusterPartDiskMapSpec &spec = srcfdesc->queryPartDiskMapping(clusterNum);
spec.toProp(groupInfo);
attrs.addPropTree("cloneFromGroup", groupInfo.getClear());
}
}
attrs.setProp("@cloneFromDir", srcfdesc->queryDefaultDir());
if (srcCluster && *srcCluster) //where to copy from has been explicity set to a remote location, don't copy from local sources
attrs.setProp("@cloneFromPeerCluster", "-");
if (prefix.length())
attrs.setProp("@cloneFromPrefix", prefix.get());

while(attrs.removeProp("cloneFromGroup"));

unsigned numClusters = srcfdesc->numClusters();
for (unsigned clusterNum = 0; clusterNum < numClusters; clusterNum++)
{
StringBuffer sourceGroup;
srcfdesc->getClusterGroupName(clusterNum, sourceGroup, NULL);
if (srcCluster && *srcCluster && !streq(sourceGroup, srcCluster))
continue;
Owned<IPropertyTree> groupInfo = createPTree("cloneFromGroup");
groupInfo->setProp("@groupName", sourceGroup);
ClusterPartDiskMapSpec &spec = srcfdesc->queryPartDiskMapping(clusterNum);
spec.toProp(groupInfo);
attrs.addPropTree("cloneFromGroup", groupInfo.getClear());
}
}
}
void updateCloneFrom(IDistributedFile *dfile, IFileDescriptor *srcfdesc, INode *srcdali, const char *srcCluster)
Expand Down Expand Up @@ -570,6 +581,7 @@ class CFileCloner
const char *_cluster2,
IUserDescriptor *_userdesc,
const char *_foreigndali,
const char *_remoteStorage,
IUserDescriptor *_foreignuserdesc,
const char *_nameprefix,
bool _overwrite,
Expand All @@ -590,6 +602,8 @@ class CFileCloner
level = 0;
if (_foreigndali&&*_foreigndali)
foreigndalinode.setown(createINode(_foreigndali,DALI_SERVER_PORT));
if (_remoteStorage && *_remoteStorage)
remoteStorage.set(_remoteStorage);
fdir = &queryDistributedFileDirectory();
switch(_clustmap) {
case DFUcpdm_c_replicated_by_d:
Expand Down Expand Up @@ -833,6 +847,8 @@ class CFileCloner
else
{
StringBuffer s;
if (checkValueChanged(dfile->queryAttributes().queryProp("@cloneRemote"), remoteStorage.str()))
return true;
if (checkValueChanged(dfile->queryAttributes().queryProp("@cloneFrom"), srcdali->endpoint().getEndpointHostText(s).str()))
return true;
if (checkValueChanged(dfile->queryAttributes().queryProp("@cloneFromDir"), srcfdesc->queryDefaultDir()))
Expand Down Expand Up @@ -876,20 +892,38 @@ class CFileCloner
srcLFN.clearForeign();
srcdali.setown(createINode(ep));
}

StringBuffer s;
Owned<IPropertyTree> ftree = fdir->getFileTree(srcLFN.get(), foreignuserdesc, srcdali, FOREIGN_DALI_TIMEOUT, GetFileTreeOpts::appendForeign);
if (!ftree.get())
throw MakeStringException(-1,"Source file %s could not be found in Dali %s",srcLFN.get(), getDaliEndPointStr(srcdali, s));
IPropertyTree *attsrc = ftree->queryPropTree("Attr");
if (!attsrc)
throw MakeStringException(-1,"Attributes for source file %s could not be found in Dali %s",srcLFN.get(), getDaliEndPointStr(srcdali, s));
Owned<IPropertyTree> ftree;
IPropertyTree *attsrc = nullptr;
if (remoteStorage || srcLFN.isRemote())
{
StringBuffer remoteLFN;
if (!srcLFN.isRemote())
remoteLFN.append("remote::").append(remoteStorage).append("::");
srcLFN.get(remoteLFN);

Owned<wsdfs::IDFSFile> dfsFile = wsdfs::lookupDFSFile(remoteLFN.str(), AccessMode::readSequential, INFINITE, wsdfs::keepAliveExpiryFrequency, foreignuserdesc);
if (!dfsFile)
throw makeStringExceptionV(-1,"Source file %s could not be found in Remote Storage", remoteLFN.str()); //remote scope already included in remoteLFN
ftree.setown(dfsFile->queryFileMeta()->getPropTree("File"));
}
else
{
ftree.setown(fdir->getFileTree(srcLFN.get(), foreignuserdesc, srcdali, FOREIGN_DALI_TIMEOUT, GetFileTreeOpts::appendForeign));
if (!ftree.get())
throw MakeStringException(-1,"Source file %s could not be found in Dali %s",srcLFN.get(), getDaliEndPointStr(srcdali, s));
attsrc = ftree->queryPropTree("Attr");
if (!attsrc)
throw MakeStringException(-1,"Attributes for source file %s could not be found in Dali %s",srcLFN.get(), getDaliEndPointStr(srcdali, s));
}

CDfsLogicalFileName dlfn;
dlfn.set(destfilename);
if (!streq(ftree->queryName(),queryDfsXmlBranchName(DXB_File)))
throw MakeStringException(-1,"Source file %s in Dali %s is not a simple file",filename, getDaliEndPointStr(srcdali, s));

if (!srcdali.get()||queryCoven().inCoven(srcdali))
if (!remoteStorage.length() && (!srcdali.get() || queryCoven().inCoven(srcdali)))
{
// if dali is local and filenames same
if (streq(srcLFN.get(), dlfn.get()))
Expand Down Expand Up @@ -1241,7 +1275,7 @@ class CDFUhelper: implements IDFUhelper, public CInterface
)
{
CFileCloner cloner;
cloner.init(cluster1,clustmap,repeattlk,cluster2,userdesc,foreigndali,foreignuserdesc,nameprefix,overwrite,dophysicalcopy);
cloner.init(cluster1,clustmap,repeattlk,cluster2,userdesc,foreigndali,nullptr,foreignuserdesc,nameprefix,overwrite,dophysicalcopy);
CDfsLogicalFileName dlfn;
cloner.cloneSuperFile(srcname,dlfn);
}
Expand All @@ -1263,7 +1297,7 @@ class CDFUhelper: implements IDFUhelper, public CInterface
{
DBGLOG("createSingleFileClone src=%s@%s, dst=%s@%s, prefix=%s, ow=%d, docopy=%d", srcname, srcCluster, dstname, cluster1, prefix, overwrite, dophysicalcopy);
CFileCloner cloner;
cloner.init(cluster1,clustmap,repeattlk,cluster2,userdesc,foreigndali,foreignuserdesc,NULL,overwrite,dophysicalcopy);
cloner.init(cluster1,clustmap,repeattlk,cluster2,userdesc,foreigndali,nullptr,foreignuserdesc,NULL,overwrite,dophysicalcopy);
cloner.srcCluster.set(srcCluster);
cloner.prefix.set(prefix);
cloner.cloneFile(srcname,dstname);
Expand All @@ -1280,6 +1314,7 @@ class CDFUhelper: implements IDFUhelper, public CInterface
const char *defReplicateFolder,
IUserDescriptor *userdesc, // user desc for local dali
const char *foreigndali, // can be omitted if srcname foreign or local
const char *remoteStorage, // can be omitted if srcname remote or local
unsigned overwriteFlags, // overwrite destination if exists
bool dophysicalcopy
)
Expand All @@ -1288,7 +1323,7 @@ class CDFUhelper: implements IDFUhelper, public CInterface
CFileCloner cloner;
// MORE: Would the following be better to ensure files are copied when queries are deployed?
// bool copyPhysical = isContainerized() && (foreigndali != nullptr);
cloner.init(dstCluster, DFUcpdm_c_replicated_by_d, true, NULL, userdesc, foreigndali, NULL, NULL, false, dophysicalcopy);
cloner.init(dstCluster, DFUcpdm_c_replicated_by_d, true, NULL, userdesc, foreigndali, remoteStorage, NULL, NULL, false, dophysicalcopy);
cloner.overwriteFlags = overwriteFlags;
#ifndef _CONTAINERIZED
//In containerized mode there is no need to replicate files to the local disks of the roxie cluster - so don't set the special flag
Expand Down
2 changes: 2 additions & 0 deletions dali/dfu/dfuutil.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ interface IDfuFileCopier: extends IInterface
#define DALI_UPDATEF_PACKAGEMAP 0x0100
#define DFU_UPDATEF_COPY 0x1000
#define DFU_UPDATEF_OVERWRITE 0x2000
#define DFU_UPDATEF_REMOTESTORAGE 0x4000


#define DALI_UPDATEF_SUBFILE_MASK (DALI_UPDATEF_REPLACE_FILE | DALI_UPDATEF_CLONE_FROM | DALI_UPDATEF_APPEND_CLUSTER)
Expand Down Expand Up @@ -93,6 +94,7 @@ interface IDFUhelper: extends IInterface
const char *defReplicateFolder,
IUserDescriptor *userdesc, // user desc for local dali
const char *foreigndali, // can be omitted if srcname foreign or local
const char *remoteStorage, // can be omitted if srcname foreign or local
unsigned overwriteFlags, // overwrite destination options
bool dophysicalcopy
) = 0;
Expand Down
2 changes: 2 additions & 0 deletions dali/dfu/dfuwu.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ include_directories (
./../../system/include
./../../system/jlib
./../../common/workunit
./../../esp/clients/ws_dfsclient
${HPCC_SOURCE_DIR}/system/security/shared #seclib.hpp
)

Expand All @@ -53,5 +54,6 @@ target_link_libraries ( dfuwu
hrpc
remote
dalibase
ws_dfsclient
)

4 changes: 3 additions & 1 deletion dali/dfu/dfuwu.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
#include "wujobq.hpp"
#include "dfuutil.hpp"

#include "ws_dfsclient.hpp"

#include "dfuwu.hpp"

#define COPY_WAIT_SECONDS 30
Expand Down Expand Up @@ -981,7 +983,7 @@ class CDFUfileSpec: public CLinkedDFUWUchild, implements IDFUfileSpec
parent->getPassword(password);
}
userdesc->set(username.str(),password.str());
Owned<IDistributedFile> file = queryDistributedFileDirectory().lookup(lfn,userdesc,AccessMode::tbdRead,false,false,nullptr,defaultPrivilegedUser);
Owned<IDistributedFile> file = wsdfs::lookup(lfn,userdesc,AccessMode::tbdRead,false,false,nullptr,defaultPrivilegedUser,INFINITE);
if (file)
return file->getFileDescriptor();
}
Expand Down
3 changes: 3 additions & 0 deletions dali/ft/filecopy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1488,6 +1488,9 @@ void FileSprayer::calibrateProgress()

void FileSprayer::checkForOverlap()
{
if (distributedSource && distributedSource->isExternal())
return;

unsigned num = std::min(sources.ordinality(), targets.ordinality());

for (unsigned idx = 0; idx < num; idx++)
Expand Down
Loading
Loading