Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/candidate-9.8.x'
Browse files Browse the repository at this point in the history
Signed-off-by: Gordon Smith <[email protected]>
  • Loading branch information
GordonSmith committed Oct 14, 2024
2 parents 9edf4a8 + 93afec5 commit 0275911
Show file tree
Hide file tree
Showing 36 changed files with 1,845 additions and 160 deletions.
139 changes: 98 additions & 41 deletions common/pkgfiles/referencedfilelist.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,43 @@ void splitDerivedDfsLocation(const char *address, StringBuffer &cluster, StringB
prefix.append(basePrefix);
}

void splitDerivedDfsLocationOrRemote(const char *address, StringBuffer &cluster, StringBuffer &ip, StringBuffer &prefix,
const char *defaultCluster, const char *baseCluster, const char *baseIP, const char *basePrefix,
const char *currentRemoteStorage, StringBuffer& effectiveRemoteStorage, const char *baseRemoteStorage)
{
if (!isEmptyString(address) && !isEmptyString(currentRemoteStorage))
throw makeStringExceptionV(-1, "Cannot specify both a dfs location (%s) and a remote storage location (%s)", address, currentRemoteStorage);
// Choose either a daliip (split from address) or a remote-storage location to
// propagate to the next level of parsing as the effective method of resolving a file
if (!isEmptyString(address))
{
splitDfsLocation(address, cluster, ip, prefix, defaultCluster);
if (!ip.isEmpty())
effectiveRemoteStorage.clear();
return;
}

if (!isEmptyString(currentRemoteStorage))
{
effectiveRemoteStorage.append(currentRemoteStorage);
// Cluster and prefix aren't used if ip is empty, so they don't need to be cleared
ip.clear();
return;
}

ip.append(baseIP);
cluster.append(baseCluster);
prefix.append(basePrefix);
effectiveRemoteStorage.append(baseRemoteStorage);
}

class ReferencedFileList;

class ReferencedFile : implements IReferencedFile, public CInterface
{
public:
IMPLEMENT_IINTERFACE;
ReferencedFile(const char *lfn, const char *sourceIP, const char *srcCluster, const char *prefix, bool isSubFile, unsigned _flags, const char *_pkgid, bool noDfs, bool calcSize)
ReferencedFile(const char *lfn, const char *sourceIP, const char *srcCluster, const char *prefix, bool isSubFile, unsigned _flags, const char *_pkgid, bool noDfs, bool calcSize, const char *remoteStorageName)
: pkgid(_pkgid), fileSize(0), numParts(0), flags(_flags), noDfsResolution(noDfs), calcFileSize(calcSize), trackSubFiles(false)
{
{
Expand All @@ -142,6 +172,12 @@ class ReferencedFile : implements IReferencedFile, public CInterface
}
if (remoteStorage.length())
flags |= RefFileLFNRemote;
else if (!isEmptyString(remoteStorageName))
{
// can be declared in packagemap at different scopes
remoteStorage.set(remoteStorageName);
flags |= RefFileLFNRemote;
}
else if (daliip.length())
flags |= RefFileLFNForeign;
else
Expand All @@ -152,6 +188,9 @@ class ReferencedFile : implements IReferencedFile, public CInterface
flags |= RefSubFile;
}

ReferencedFile(const char *lfn, const char *sourceIP, const char *srcCluster, const char *prefix, bool isSubFile, unsigned _flags, const char *_pkgid, bool noDfs, bool calcSize)
: ReferencedFile(lfn, sourceIP, srcCluster, prefix, isSubFile, _flags, _pkgid, noDfs, calcSize, nullptr) { }

void reset()
{
flags &= ~(RefFileNotOnCluster | RefFileNotFound | RefFileResolvedForeign | RefFileResolvedRemote | RefFileCopyInfoFailed | RefFileCloned | RefFileNotOnSource); //these flags are calculated during resolve
Expand Down Expand Up @@ -242,18 +281,18 @@ class ReferencedFileList : implements IReferencedFileList, public CInterface
user.set(userDesc);
}

void ensureFile(const char *ln, unsigned flags, const char *pkgid, bool noDfsResolution, const StringArray *subfileNames, const char *daliip=NULL, const char *srcCluster=NULL, const char *remotePrefix=NULL);
void ensureFile(const char *ln, unsigned flags, const char *pkgid, bool noDfsResolution, const StringArray *subfileNames, const char *daliip=nullptr, const char *srcCluster=nullptr, const char *remotePrefix=nullptr, const char* remoteStorage=nullptr);

virtual void addFile(const char *ln, const char *daliip=NULL, const char *srcCluster=NULL, const char *remotePrefix=NULL);
virtual void addFile(const char *ln, const char *daliip=nullptr, const char *srcCluster=nullptr, const char *remotePrefix=nullptr, const char *remoteStorageName=nullptr);
virtual void addFiles(StringArray &files);
virtual void addFilesFromWorkUnit(IConstWorkUnit *cw);
virtual bool addFilesFromQuery(IConstWorkUnit *cw, const IHpccPackageMap *pm, const char *queryid);
virtual bool addFilesFromQuery(IConstWorkUnit *cw, const IHpccPackage *pkg);
virtual void addFilesFromPackageMap(IPropertyTree *pm);

void addFileFromSubFile(IPropertyTree &subFile, const char *_daliip, const char *srcCluster, const char *_remotePrefix);
void addFilesFromSuperFile(IPropertyTree &superFile, const char *_daliip, const char *srcCluster, const char *_remotePrefix);
void addFilesFromPackage(IPropertyTree &package, const char *_daliip, const char *srcCluster, const char *_remotePrefix);
void addFileFromSubFile(IPropertyTree &subFile, const char *_daliip, const char *srcCluster, const char *_remotePrefix, const char *_remoteStorageName);
void addFilesFromSuperFile(IPropertyTree &superFile, const char *_daliip, const char *srcCluster, const char *_remotePrefix, const char *_remoteStorageName);
void addFilesFromPackage(IPropertyTree &package, const char *_daliip, const char *srcCluster, const char *_remotePrefix, const char *_remoteStorageName);

virtual IReferencedFileIterator *getFiles();
virtual void cloneFileInfo(StringBuffer &publisherWuid, const char *dstCluster, unsigned updateFlags, IDFUhelper *helper, bool cloneSuperInfo, bool cloneForeign, unsigned redundancy, unsigned channelsPerNode, int replicateOffset, const char *defReplicateFolder);
Expand Down Expand Up @@ -883,12 +922,12 @@ class ReferencedFileIterator : implements IReferencedFileIterator, public CInter
Owned<HashIterator> iter;
};

void ReferencedFileList::ensureFile(const char *ln, unsigned flags, const char *pkgid, bool noDfsResolution, const StringArray *subfileNames, const char *daliip, const char *srcCluster, const char *prefix)
void ReferencedFileList::ensureFile(const char *ln, unsigned flags, const char *pkgid, bool noDfsResolution, const StringArray *subfileNames, const char *daliip, const char *srcCluster, const char *prefix, const char *remoteStorageName)
{
if (!allowForeign && checkForeign(ln))
throw MakeStringException(-1, "Foreign file not allowed%s: %s", (flags & RefFileInPackage) ? " (declared in package)" : "", ln);

Owned<ReferencedFile> file = new ReferencedFile(ln, daliip, srcCluster, prefix, false, flags, pkgid, noDfsResolution, allowSizeCalc);
Owned<ReferencedFile> file = new ReferencedFile(ln, daliip, srcCluster, prefix, false, flags, pkgid, noDfsResolution, allowSizeCalc, remoteStorageName);
if (!file->logicalName.length())
return;
if (subfileNames)
Expand All @@ -904,9 +943,9 @@ void ReferencedFileList::ensureFile(const char *ln, unsigned flags, const char *
}
}

void ReferencedFileList::addFile(const char *ln, const char *daliip, const char *srcCluster, const char *prefix)
void ReferencedFileList::addFile(const char *ln, const char *daliip, const char *srcCluster, const char *prefix, const char *remoteStorageName)
{
ensureFile(ln, 0, NULL, false, nullptr, daliip, srcCluster, prefix);
ensureFile(ln, 0, NULL, false, nullptr, daliip, srcCluster, prefix, remoteStorageName);
}

void ReferencedFileList::addFiles(StringArray &files)
Expand All @@ -915,58 +954,64 @@ void ReferencedFileList::addFiles(StringArray &files)
addFile(files.item(i));
}

void ReferencedFileList::addFileFromSubFile(IPropertyTree &subFile, const char *ip, const char *cluster, const char *prefix)
void ReferencedFileList::addFileFromSubFile(IPropertyTree &subFile, const char *ip, const char *cluster, const char *prefix, const char *remoteStorageName)
{
addFile(subFile.queryProp("@value"), ip, cluster, prefix);
addFile(subFile.queryProp("@value"), ip, cluster, prefix, remoteStorageName);
}

void ReferencedFileList::addFilesFromSuperFile(IPropertyTree &superFile, const char *_ip, const char *_cluster, const char *_prefix)
void ReferencedFileList::addFilesFromSuperFile(IPropertyTree &superFile, const char *_ip, const char *_cluster, const char *_prefix, const char *ancestorRemoteStorage)
{
StringBuffer ip;
StringBuffer cluster;
StringBuffer prefix;
splitDerivedDfsLocation(superFile.queryProp("@daliip"), cluster, ip, prefix, NULL, _cluster, _ip, _prefix);
StringBuffer effectiveRemoteStorage;
splitDerivedDfsLocationOrRemote(superFile.queryProp("@daliip"), cluster, ip, prefix, nullptr, _cluster, _ip, _prefix,
superFile.queryProp("@remoteStorage"), effectiveRemoteStorage, ancestorRemoteStorage);
if (superFile.hasProp("@sourceCluster"))
cluster.set(superFile.queryProp("@sourceCluster"));

Owned<IPropertyTreeIterator> subFiles = superFile.getElements("SubFile[@value]");
ForEach(*subFiles)
addFileFromSubFile(subFiles->query(), ip, cluster, prefix);
addFileFromSubFile(subFiles->query(), ip, cluster, prefix, effectiveRemoteStorage);
}

void ReferencedFileList::addFilesFromPackage(IPropertyTree &package, const char *_ip, const char *_cluster, const char *_prefix)
void ReferencedFileList::addFilesFromPackage(IPropertyTree &package, const char *_ip, const char *_cluster, const char *_prefix, const char *ancestorRemoteStorage)
{
StringBuffer ip;
StringBuffer cluster;
StringBuffer prefix;
splitDerivedDfsLocation(package.queryProp("@daliip"), cluster, ip, prefix, NULL, _cluster, _ip, _prefix);
StringBuffer effectiveRemoteStorage;
splitDerivedDfsLocationOrRemote(package.queryProp("@daliip"), cluster, ip, prefix, nullptr, _cluster, _ip, _prefix,
package.queryProp("@remoteStorage"), effectiveRemoteStorage, ancestorRemoteStorage);
if (package.hasProp("@sourceCluster"))
cluster.set(package.queryProp("@sourceCluster"));

Owned<IPropertyTreeIterator> supers = package.getElements("SuperFile");
ForEach(*supers)
addFilesFromSuperFile(supers->query(), ip, cluster, prefix);
addFilesFromSuperFile(supers->query(), ip, cluster, prefix, effectiveRemoteStorage);
}

void ReferencedFileList::addFilesFromPackageMap(IPropertyTree *pm)
{
StringBuffer ip;
StringBuffer cluster;
StringBuffer prefix;
StringBuffer remoteStorageName;
const char *srcCluster = pm->queryProp("@sourceCluster");
splitDerivedDfsLocation(pm->queryProp("@daliip"), cluster, ip, prefix, srcCluster, srcCluster, NULL, NULL);
splitDerivedDfsLocationOrRemote(pm->queryProp("@daliip"), cluster, ip, prefix, srcCluster, srcCluster, nullptr, nullptr,
pm->queryProp("@remoteStorage"), remoteStorageName, nullptr);
Owned<IPropertyTreeIterator> packages = pm->getElements("Package");
ForEach(*packages)
addFilesFromPackage(packages->query(), ip, cluster, prefix);
addFilesFromPackage(packages->query(), ip, cluster, prefix, remoteStorageName);
packages.setown(pm->getElements("Part/Package"));
ForEach(*packages)
addFilesFromPackage(packages->query(), ip, cluster, prefix);
addFilesFromPackage(packages->query(), ip, cluster, prefix, remoteStorageName);
}

bool ReferencedFileList::addFilesFromQuery(IConstWorkUnit *cw, const IHpccPackage *pkg)
{
SummaryMap files;
if (cw->getSummary(SummaryType::ReadFile, files) &&
if (cw->getSummary(SummaryType::ReadFile, files) &&
cw->getSummary(SummaryType::ReadIndex, files))
{
for (const auto& [lName, summaryFlags] : files)
Expand Down Expand Up @@ -1104,31 +1149,43 @@ void ReferencedFileList::resolveFiles(const StringArray &locations, const char *
srcCluster.set(_srcCluster);
remotePrefix.set(_remotePrefix);

ReferencedFileIterator files(this);

// For use when expandSuperFiles=true
if (useRemoteStorage)
{
DBGLOG("ReferencedFileList resolving remote storage files at %s", nullText(remoteLocation));
if (!user)
user.setown(createUserDescriptor());
remoteStorage.set(remoteLocation);
ReferencedFileIterator files(this);
ForEach(files)
files.queryObject().resolveLocalOrRemote(locations, srcCluster, user, remoteStorage, remotePrefix, checkLocalFirst, expandSuperFiles ? &subfiles : NULL, trackSubFiles, resolveLFNForeign);
}
else

ForEach(files)
{
if (!isEmptyString(remoteLocation))
DBGLOG("ReferencedFileList resolving remote dali files at %s", remoteLocation);
ReferencedFile &file = files.queryObject();
if (file.daliip.isEmpty() && (!file.remoteStorage.isEmpty() || useRemoteStorage))
{
if (!user)
user.setown(createUserDescriptor());

if (file.remoteStorage.isEmpty()) // Can be set at multiple levels in a packagemap
file.remoteStorage.set(remoteLocation); // Top-level remoteLocation has lowest precedence, used if nothing set in packagemap

DBGLOG("ReferencedFileList resolving remote storage file at %s", nullText(file.remoteStorage));
file.resolveLocalOrRemote(locations, srcCluster, user, file.remoteStorage, remotePrefix, checkLocalFirst, expandSuperFiles ? &subfiles : NULL, trackSubFiles, resolveLFNForeign);
}
else
DBGLOG("ReferencedFileList resolving local files (no daliip)");
remote.setown(!isEmptyString(remoteLocation) ? createINode(remoteLocation, 7070) : nullptr);
{
// The remoteLocation is a daliip when useRemoteStorage is false
const char *passedDaliip = !useRemoteStorage ? remoteLocation : nullptr;
if (!isEmptyString(passedDaliip) || !file.daliip.isEmpty())
DBGLOG("ReferencedFileList resolving remote dali file at %s", isEmptyString(passedDaliip) ? nullText(file.daliip) : passedDaliip);
else
DBGLOG("ReferencedFileList resolving local file (no daliip or remote storage)");
// Otherwise, passing nullptr for remote allows resolveLocalOrForeign to use ReferencedFile.daliip with
// the matching ReferencedFile.remotePrefix instead of the ReferencedFileList.remotePrefix passed in here.
remote.setown(!isEmptyString(passedDaliip) ? createINode(passedDaliip, 7070) : nullptr);
file.resolveLocalOrForeign(locations, srcCluster, user, remote, remotePrefix, checkLocalFirst, expandSuperFiles ? &subfiles : NULL, trackSubFiles, resolveLFNForeign);
}

ReferencedFileIterator files(this);
ForEach(files)
files.queryObject().resolveLocalOrForeign(locations, srcCluster, user, remote, remotePrefix, checkLocalFirst, expandSuperFiles ? &subfiles : NULL, trackSubFiles, resolveLFNForeign);
if (expandSuperFiles)
resolveSubFiles(subfiles, locations, checkLocalFirst, trackSubFiles, resolveLFNForeign);
}

if (expandSuperFiles)
resolveSubFiles(subfiles, locations, checkLocalFirst, trackSubFiles, resolveLFNForeign);
}

bool ReferencedFileList::filesNeedCopying(bool cloneForeign)
Expand Down
2 changes: 1 addition & 1 deletion common/pkgfiles/referencedfilelist.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ interface IReferencedFileList : extends IInterface
virtual bool addFilesFromQuery(IConstWorkUnit *cw, const IHpccPackage *pkg)=0;
virtual void addFilesFromPackageMap(IPropertyTree *pm)=0;

virtual void addFile(const char *ln, const char *daliip=NULL, const char *sourceProcessCluster=NULL, const char *remotePrefix=NULL)=0;
virtual void addFile(const char *ln, const char *daliip=nullptr, const char *sourceProcessCluster=nullptr, const char *remotePrefix=nullptr, const char *remoteStorageName=nullptr)=0;
virtual void addFiles(StringArray &files)=0;

virtual IReferencedFileIterator *getFiles()=0;
Expand Down
63 changes: 57 additions & 6 deletions common/thorhelper/thorsoapcall.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,40 @@ using roxiemem::OwnedRoxieString;
#define CONNECTION "Connection"

unsigned soapTraceLevel = 1;
static StringBuffer soapSepString;

void setSoapSepString(const char *_soapSepString)
{
soapSepString.set(_soapSepString);
}

static void multiLineAppendReplace(StringBuffer &origStr, StringBuffer &newStr)
{
if (origStr.isEmpty())
return;

newStr.ensureCapacity(origStr.length());

const char *cursor = origStr;
while (*cursor)
{
switch (*cursor)
{
case '\r':
newStr.append(soapSepString);
if ('\n' == *(cursor+1))
cursor++;
break;
case '\n':
newStr.append(soapSepString);
break;
default:
newStr.append(*cursor);
break;
}
++cursor;
}
}

#define WSCBUFFERSIZE 0x10000
#define MAXWSCTHREADS 50 //Max Web Service Call Threads
Expand Down Expand Up @@ -1940,10 +1974,18 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
{
if (soapTraceLevel > 6 || master->logXML)
{
if (!contentEncoded)
master->logctx.mCTXLOG("%s: request(%s)", master->wscCallTypeText(), request.str());
StringBuffer contentStr;
if (contentEncoded)
contentStr.append(", content encoded.");
// Only do translation if soapcall LOG option set and soapSepString defined
if ( (master->logXML) && (soapSepString.length() > 0) )
{
StringBuffer request2;
multiLineAppendReplace(request, request2);
master->logctx.CTXLOG("%s: request(%s)%s", master->wscCallTypeText(), request2.str(), contentStr.str());
}
else
master->logctx.mCTXLOG("%s: request(%s), content encoded.", master->wscCallTypeText(), request.str());
master->logctx.mCTXLOG("%s: request(%s)%s", master->wscCallTypeText(), request.str(), contentStr.str());
}
}

Expand Down Expand Up @@ -2247,9 +2289,18 @@ class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFo
if (checkContentDecoding(dbgheader, response, contentEncoding))
decodeContent(contentEncoding.str(), response);
if (soapTraceLevel > 6 || master->logXML)
master->logctx.mCTXLOG("%s: LEN=%d %sresponse(%s%s)", getWsCallTypeName(master->wscType),response.length(),chunked?"CHUNKED ":"", dbgheader.str(), response.str());
else if (soapTraceLevel > 8)
master->logctx.mCTXLOG("%s: LEN=%d %sresponse(%s)", getWsCallTypeName(master->wscType),response.length(),chunked?"CHUNKED ":"", response.str()); // not sure this is that useful but...
{
// Only do translation if soapcall LOG option set and soapSepString defined
if ( (master->logXML) && (soapSepString.length() > 0) )
{
StringBuffer response2;
multiLineAppendReplace(dbgheader, response2);
multiLineAppendReplace(response, response2);
master->logctx.CTXLOG("%s: LEN=%d %sresponse(%s)", getWsCallTypeName(master->wscType),response.length(),chunked?"CHUNKED ":"", response2.str());
}
else
master->logctx.mCTXLOG("%s: LEN=%d %sresponse(%s%s)", getWsCallTypeName(master->wscType),response.length(),chunked?"CHUNKED ":"", dbgheader.str(), response.str());
}
return rval;
}

Expand Down
Loading

0 comments on commit 0275911

Please sign in to comment.