From f0af4945616169248eb28a97d71b77ff04b96402 Mon Sep 17 00:00:00 2001 From: Jake Smith Date: Fri, 1 Nov 2024 18:43:46 +0000 Subject: [PATCH] HPCC-32873 Avoid major refactoring mainly by forcing pull it # targets1 && targets>1 and nosplit used Signed-off-by: Jake Smith --- dali/ft/filecopy.cpp | 200 ++++++++++++++++++++---------------------- dali/ft/filecopy.ipp | 3 +- system/jlib/jfile.cpp | 16 +++- system/jlib/jfile.hpp | 2 +- 4 files changed, 109 insertions(+), 112 deletions(-) diff --git a/dali/ft/filecopy.cpp b/dali/ft/filecopy.cpp index f68d1db67e4..040337c5339 100644 --- a/dali/ft/filecopy.cpp +++ b/dali/ft/filecopy.cpp @@ -886,7 +886,7 @@ void FileSprayer::afterTransfer() } } -bool FileSprayer::allowSplit() +bool FileSprayer::allowSplit() const { return !(options->getPropBool(ANnosplit) || options->getPropBool(ANnosplit2) || options->queryProp(ANprefix)); } @@ -1595,13 +1595,23 @@ void FileSprayer::commonUpSlaves() if (pushWhole) return; + // noCommon is defaulted to on for non-containerized (revisit!) + bool noCommon = options->getPropBool(ANnocommon, !isContainerized()); + if (isContainerized() && noCommon) + { + IWARNLOG("Ignoring noCommon option in containerized mode"); + noCommon = false; + } + else if (noCommon) + return; + //First work out which are the same slaves, and then map the partition. //Previously it was n^2 in partition, which is fine until you spray 100K files. unsigned numSlaves = pull ? targets.ordinality() : sources.ordinality(); - bool commonByIp = !isContainerized() && (!options->getPropBool(ANnocommon, true)); - offset_t threshold = 0x8000 * numSlaves; - offset_t totalSourceFileSize = 0; + bool commonByIp = !isContainerized(); + offset_t totalSourceFileSize = 0; + offset_t threshold = 0x8000 * numSlaves; ForEachItemIn(i, sources) { const FilePartInfo & cur = sources.item(i); @@ -2512,7 +2522,6 @@ void FileSprayer::insertHeaders() } } - bool FileSprayer::needToCalcOutput() { return !usePullOperation() || options->getPropBool(ANverify); @@ -3276,31 +3285,6 @@ bool FileSprayer::disallowImplicitReplicate() } -void FileSprayer::checkPushSupported() -{ - if (targetSupportsConcurrentWrite) - return; // push ok - - bool multilpeSourcesPerTarget = false; - std::vector targetUsed; - targetUsed.resize(targets.ordinality()); - ForEachItemIn(idx, partition) - { - PartitionPoint & cur = partition.item(idx); - if (targetUsed[cur.whichOutput]) - { - multilpeSourcesPerTarget = true; - break; - } - targetUsed[cur.whichOutput] = true; - } - if (!multilpeSourcesPerTarget) - return; // push ok - - IWARNLOG("Forcing pull. Multiple source partitions write to same target, and target does not support concurrent write"); - cachedUsePull = true; -} - void FileSprayer::spray() { if (!allowSplit() && querySplitPrefix()) @@ -3329,9 +3313,22 @@ void FileSprayer::spray() return; } + targetSupportsConcurrentWrite = isContainerized() ? false : true; + if (!targetPlane.isEmpty()) + targetSupportsConcurrentWrite = 0 != getPlaneAttributeValue(targetPlane, ConcurrentWriteSupport, targetSupportsConcurrentWrite ? 1 : 0); + checkFormats(); checkForOverlap(); + progressTree->setPropBool(ANpull, usePullOperation()); + + const char * splitPrefix = querySplitPrefix(); + if (!replicate && (sources.ordinality() == targets.ordinality())) + { + if (srcFormat.equals(tgtFormat) && !disallowImplicitReplicate()) + copySource = true; + } + if (compressOutput&&!replicate&&!copySource) { PROGLOG("Compress output forcing pull"); @@ -3342,8 +3339,6 @@ void FileSprayer::spray() // in containerized mode, redirect to dafilesrv service or local (if useFtSlave=true) if (isContainerized()) { - targetSupportsConcurrentWrite = false; - if (useFtSlave) { //In containerized world all ftslave processes are executed locally, so make sure we try and connect to a local instance @@ -3356,17 +3351,6 @@ void FileSprayer::spray() sprayServiceHost.clear().append(sprayServiceConfig->queryProp("@name")).append(':').append(sprayServiceConfig->getPropInt("@port")); } } - else - targetSupportsConcurrentWrite = true; - if (!targetPlane.isEmpty()) - targetSupportsConcurrentWrite = 0 != getPlaneAttributeValue(targetPlane, ConcurrentWriteSupport, targetSupportsConcurrentWrite ? 1 : 0); - - const char * splitPrefix = querySplitPrefix(); - if (!replicate && (sources.ordinality() == targets.ordinality())) - { - if (srcFormat.equals(tgtFormat) && !disallowImplicitReplicate()) - copySource = true; - } gatherFileSizes(true); if (!replicate||copySource) // NB: When copySource=true, analyseFileHeaders mainly just sets srcFormat.type @@ -3405,10 +3389,6 @@ void FileSprayer::spray() } addEmptyFilesToPartition(); - if (!usePullOperation()) - checkPushSupported(); // will switch to pull if push not supported - progressTree->setPropBool(ANpull, usePullOperation()); // NB: usePullOperation will cache result - derivePartitionExtra(); if (partition.ordinality() < 1000) displayPartition(); @@ -3998,7 +3978,6 @@ bool FileSprayer::usePullOperation() const { calcedPullPush = true; cachedUsePull = calcUsePull(); - LOG(MCdebugInfo, "Using %s operation", cachedUsePull ? "pull" : "push"); } return cachedUsePull; } @@ -4023,6 +4002,7 @@ bool FileSprayer::canLocateSlaveForNode(const IpAddress &ip) const return false; } + bool FileSprayer::calcUsePull() const { if (allowRecovery && progressTree->hasProp(ANpull)) @@ -4035,91 +4015,97 @@ bool FileSprayer::calcUsePull() const if (sources.ordinality() == 0) return true; - if (!isContainerized()) // using spray-service (dafilsrv) not ftslave in containerized (or if useFtSlave, then created as local process) + if (options->getPropBool(ANpull, false)) { - // In BM the sources or targets may not be HPCC environment machines and/or might not be able to run ftslave - ForEachItemIn(idx2, sources) + LOG(MCdebugInfo, "Use pull since explicitly specified"); + return true; + } + + bool pushRequested = options->getPropBool(ANpush); + if (!targetSupportsConcurrentWrite) // NB: default for containerized is false + { + if (!pushRequested) + return true; + if (!usePushWholeOperation()) { - if (!sources.item(idx2).canPush()) + if (targets.ordinality() <= sources.ordinality()) { - StringBuffer s; - sources.item(idx2).filename.queryIP().getHostText(s); - LOG(MCdebugInfo, "Use pull operation because %s cannot push", s.str()); + // NB: this is being calculated before partitioning has occurred + // It can be refactored so that it decides after partitioning, and only has to force pull + // if multiple partitions write to same target file. + LOG(MCdebugInfo, "Use pull operation because target doesn't support concurrent write"); return true; } - } + // else targets > sources - if (!canLocateSlaveForNode(sources.item(0).filename.queryIP())) - { - StringBuffer s; - sources.item(0).filename.queryIP().getHostText(s); - LOG(MCdebugInfo, "Use pull operation because %s doesn't appear to have an ftslave", s.str()); - return true; - } - - ForEachItemIn(idx, targets) - { - if (!targets.item(idx).canPull()) + // if push requested and N:M and no split, then throw an error unless expert option allows + if (!copySource) // 1:1 partitioning if copySource==true { - StringBuffer s; - targets.item(idx).queryIP().getHostText(s); - LOG(MCdebugInfo, "Use push operation because %s cannot pull", s.str()); - return false; + if ((sources.ordinality() > 1) && (targets.ordinality() > 1) && !allowSplit()) + { + if (!getComponentConfigSP()->getPropBool("expert/@allowPushNoSplit")) + throw makeStringExceptionV(0, "Pushing to multiple targets with no split is not supported to this target plane (%s)", targetPlane.str()); + } } } - - if (!canLocateSlaveForNode(targets.item(0).queryIP())) + LOG(MCdebugInfo, "Use push since explicitly specified"); + return false; + } + else // ! targetSupportsConcurrentWrite + { + if (pushRequested) { - StringBuffer s; - targets.item(0).queryIP().getHostText(s); - LOG(MCdebugInfo, "Use push operation because %s doesn't appear to have an ftslave", s.str()); + LOG(MCdebugInfo, "Use push since explicitly specified"); return false; } } - bool pullRequested = options->hasProp(ANpull) ? options->getPropBool(ANpull) : false; - bool pushRequested = options->hasProp(ANpush) ? options->getPropBool(ANpush) : false; - if (!targetSupportsConcurrentWrite) + ForEachItemIn(idx2, sources) { - if (!pushRequested) - return true; - - if (targets.ordinality() < sources.ordinality()) + if (!sources.item(idx2).canPush()) { - IWARNLOG("Ignoring push option. Multiple source partitions would write to same target, and target does not support concurrent write"); + StringBuffer s; + sources.item(idx2).filename.queryIP().getHostText(s); + LOG(MCdebugInfo, "Use pull operation because %s cannot push", s.str()); return true; } - else - { - // may still not be allowed. After partitioning, if multiple sources write to same target, then pull is forced (see checkPushSupported()) - return false; - } } - else + if (!canLocateSlaveForNode(sources.item(0).filename.queryIP())) { - bool wantPull = false; - bool wantPush = false; - if (targets.ordinality() < sources.ordinality()) // implying multiple writes to same target, force pull, and will common up on matching target filenames - wantPull = true; - else if (targets.ordinality() > sources.ordinality()) // targets > sources. i.e. multiple splits of source files for each target - wantPush = true; + StringBuffer s; + sources.item(0).filename.queryIP().getHostText(s); + LOG(MCdebugInfo, "Use pull operation because %s doesn't appear to have an ftslave", s.str()); + return true; + } - if (wantPull && pushRequested) - { - IWARNLOG("Wanted to pull since targets < sources, but push option takes precedence"); - return false; // push - } - else if (wantPush && pullRequested) + ForEachItemIn(idx, targets) + { + if (!targets.item(idx).canPull()) { - IWARNLOG("Wanted to push since targets > sources, but pull option takes precedence"); - return true; // pull + StringBuffer s; + targets.item(idx).queryIP().getHostText(s); + LOG(MCdebugInfo, "Use push operation because %s cannot pull", s.str()); + return false; } + } - if (wantPush || pushRequested) - return false; // push + if (!canLocateSlaveForNode(targets.item(0).queryIP())) + { + StringBuffer s; + targets.item(0).queryIP().getHostText(s); + LOG(MCdebugInfo, "Use push operation because %s doesn't appear to have an ftslave", s.str()); + return false; + } - return true; // default pull + //Use push if going to a single node. + if ((targets.ordinality() == 1) && (sources.ordinality() > 1)) + { + LOG(MCdebugInfo, "Use push operation because going to a single node from many"); + return false; } + + LOG(MCdebugInfo, "Use pull operation as default"); + return true; } diff --git a/dali/ft/filecopy.ipp b/dali/ft/filecopy.ipp index 94ae3640cfe..2c98d92abc0 100644 --- a/dali/ft/filecopy.ipp +++ b/dali/ft/filecopy.ipp @@ -222,7 +222,7 @@ protected: void addTarget(unsigned idx, INode * node); void afterGatherFileSizes(); void afterTransfer(); - bool allowSplit(); + bool allowSplit() const; void analyseFileHeaders(bool setcurheadersize); void assignPartitionFilenames(); void beforeTransfer(); @@ -239,7 +239,6 @@ protected: void calibrateProgress(); void checkFormats(); void checkForOverlap(); - void checkPushSupported(); void cleanupRecovery(); void cloneHeaderFooter(unsigned idx, bool isHeader); void commonUpSlaves(); diff --git a/system/jlib/jfile.cpp b/system/jlib/jfile.cpp index b57d0d12761..b5a330dd775 100644 --- a/system/jlib/jfile.cpp +++ b/system/jlib/jfile.cpp @@ -7943,7 +7943,6 @@ MODULE_INIT(INIT_PRIORITY_STANDARD) switch (attrInfo.type) { case PlaneAttrType::integer: - case PlaneAttrType::boolean: // handling is same as integer { unsigned __int64 value = plane.getPropInt64(prop.c_str(), unsetPlaneAttrValue); if (unsetPlaneAttrValue != value) @@ -7954,12 +7953,25 @@ MODULE_INIT(INIT_PRIORITY_STANDARD) value *= attrInfo.scale; } } + values[propNum] = value; + break; + } + case PlaneAttrType::boolean: + { + unsigned __int64 value; + if (plane.hasProp(prop.c_str())) + value = plane.getPropBool(prop.c_str()) ? 1 : 0; else if (FileSyncWriteClose == propNum) // temporary (if FileSyncWriteClose and unset, check legacy fileSyncMaxRetrySecs), purely for short term backward compatibility (see HPCC-32757) { unsigned __int64 v = plane.getPropInt64("expert/@fileSyncMaxRetrySecs", unsetPlaneAttrValue); // NB: fileSyncMaxRetrySecs==0 is treated as set/enabled - value = v != unsetPlaneAttrValue ? 1 : 0; + if (unsetPlaneAttrValue != v) + value = 1; + else + value = unsetPlaneAttrValue; } + else + value = unsetPlaneAttrValue; values[propNum] = value; break; } diff --git a/system/jlib/jfile.hpp b/system/jlib/jfile.hpp index 0fc07b7a1f9..543fb355e79 100644 --- a/system/jlib/jfile.hpp +++ b/system/jlib/jfile.hpp @@ -744,7 +744,7 @@ extern jlib_decl IPropertyTreeIterator * getRemoteStoragesIterator(); extern jlib_decl IPropertyTreeIterator * getPlanesIterator(const char * category, const char *name); extern jlib_decl IFileIO *createBlockedIO(IFileIO *base, size32_t blockSize); -enum PlaneAttributeType // remember to update planeAttributeInfo in jfile.cpp +enum PlaneAttributeType // remember to update planeAttributeInfo in jfile.cpp { BlockedSequentialIO, BlockedRandomIO,