Skip to content

Commit

Permalink
HPCC-32873 Avoid major refactoring
Browse files Browse the repository at this point in the history
mainly by forcing pull if # targets < sources,
else catching the case where sources > 1 && targets > 1 and nosplit is used

Signed-off-by: Jake Smith <[email protected]>
  • Loading branch information
jakesmith committed Nov 4, 2024
1 parent 901d209 commit f0af494
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 112 deletions.
200 changes: 93 additions & 107 deletions dali/ft/filecopy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -886,7 +886,7 @@ void FileSprayer::afterTransfer()
}
}

bool FileSprayer::allowSplit()
bool FileSprayer::allowSplit() const
{
    // Splitting is disallowed when either nosplit option is set, or when a
    // split prefix has been supplied (the prefix implies whole-file handling).
    bool noSplitRequested = options->getPropBool(ANnosplit) || options->getPropBool(ANnosplit2);
    return !noSplitRequested && (nullptr == options->queryProp(ANprefix));
}
Expand Down Expand Up @@ -1595,13 +1595,23 @@ void FileSprayer::commonUpSlaves()
if (pushWhole)
return;

// noCommon is defaulted to on for non-containerized (revisit!)
bool noCommon = options->getPropBool(ANnocommon, !isContainerized());
if (isContainerized() && noCommon)
{
IWARNLOG("Ignoring noCommon option in containerized mode");
noCommon = false;
}
else if (noCommon)
return;

//First work out which are the same slaves, and then map the partition.
//Previously it was n^2 in partition, which is fine until you spray 100K files.
unsigned numSlaves = pull ? targets.ordinality() : sources.ordinality();
bool commonByIp = !isContainerized() && (!options->getPropBool(ANnocommon, true));
offset_t threshold = 0x8000 * numSlaves;
offset_t totalSourceFileSize = 0;
bool commonByIp = !isContainerized();

offset_t totalSourceFileSize = 0;
offset_t threshold = 0x8000 * numSlaves;
ForEachItemIn(i, sources)
{
const FilePartInfo & cur = sources.item(i);
Expand Down Expand Up @@ -2512,7 +2522,6 @@ void FileSprayer::insertHeaders()
}
}


bool FileSprayer::needToCalcOutput()
{
return !usePullOperation() || options->getPropBool(ANverify);
Expand Down Expand Up @@ -3276,31 +3285,6 @@ bool FileSprayer::disallowImplicitReplicate()

}

// Verify that a push operation is viable for the computed partition.
// If the target plane does not support concurrent writes and more than one
// source partition would write to the same target file, force a pull
// operation instead (by setting cachedUsePull, which usePullOperation()
// subsequently returns).
void FileSprayer::checkPushSupported()
{
    if (targetSupportsConcurrentWrite)
        return; // push ok

    // Scan the partition: if any output index appears more than once, multiple
    // source partitions would write concurrently to the same target file.
    bool multipleSourcesPerTarget = false;   // fixed typo: was 'multilpeSourcesPerTarget'
    std::vector<bool> targetUsed;
    targetUsed.resize(targets.ordinality());
    ForEachItemIn(idx, partition)
    {
        PartitionPoint & cur = partition.item(idx);
        if (targetUsed[cur.whichOutput])
        {
            multipleSourcesPerTarget = true;
            break;
        }
        targetUsed[cur.whichOutput] = true;
    }
    if (!multipleSourcesPerTarget)
        return; // push ok

    IWARNLOG("Forcing pull. Multiple source partitions write to same target, and target does not support concurrent write");
    cachedUsePull = true;
}

void FileSprayer::spray()
{
if (!allowSplit() && querySplitPrefix())
Expand Down Expand Up @@ -3329,9 +3313,22 @@ void FileSprayer::spray()
return;
}

targetSupportsConcurrentWrite = isContainerized() ? false : true;
if (!targetPlane.isEmpty())
targetSupportsConcurrentWrite = 0 != getPlaneAttributeValue(targetPlane, ConcurrentWriteSupport, targetSupportsConcurrentWrite ? 1 : 0);

checkFormats();
checkForOverlap();

progressTree->setPropBool(ANpull, usePullOperation());

const char * splitPrefix = querySplitPrefix();
if (!replicate && (sources.ordinality() == targets.ordinality()))
{
if (srcFormat.equals(tgtFormat) && !disallowImplicitReplicate())
copySource = true;
}

if (compressOutput&&!replicate&&!copySource)
{
PROGLOG("Compress output forcing pull");
Expand All @@ -3342,8 +3339,6 @@ void FileSprayer::spray()
// in containerized mode, redirect to dafilesrv service or local (if useFtSlave=true)
if (isContainerized())
{
targetSupportsConcurrentWrite = false;

if (useFtSlave)
{
//In containerized world all ftslave processes are executed locally, so make sure we try and connect to a local instance
Expand All @@ -3356,17 +3351,6 @@ void FileSprayer::spray()
sprayServiceHost.clear().append(sprayServiceConfig->queryProp("@name")).append(':').append(sprayServiceConfig->getPropInt("@port"));
}
}
else
targetSupportsConcurrentWrite = true;
if (!targetPlane.isEmpty())
targetSupportsConcurrentWrite = 0 != getPlaneAttributeValue(targetPlane, ConcurrentWriteSupport, targetSupportsConcurrentWrite ? 1 : 0);

const char * splitPrefix = querySplitPrefix();
if (!replicate && (sources.ordinality() == targets.ordinality()))
{
if (srcFormat.equals(tgtFormat) && !disallowImplicitReplicate())
copySource = true;
}

gatherFileSizes(true);
if (!replicate||copySource) // NB: When copySource=true, analyseFileHeaders mainly just sets srcFormat.type
Expand Down Expand Up @@ -3405,10 +3389,6 @@ void FileSprayer::spray()
}
addEmptyFilesToPartition();

if (!usePullOperation())
checkPushSupported(); // will switch to pull if push not supported
progressTree->setPropBool(ANpull, usePullOperation()); // NB: usePullOperation will cache result

derivePartitionExtra();
if (partition.ordinality() < 1000)
displayPartition();
Expand Down Expand Up @@ -3998,7 +3978,6 @@ bool FileSprayer::usePullOperation() const
{
calcedPullPush = true;
cachedUsePull = calcUsePull();
LOG(MCdebugInfo, "Using %s operation", cachedUsePull ? "pull" : "push");
}
return cachedUsePull;
}
Expand All @@ -4023,6 +4002,7 @@ bool FileSprayer::canLocateSlaveForNode(const IpAddress &ip) const
return false;
}


bool FileSprayer::calcUsePull() const
{
if (allowRecovery && progressTree->hasProp(ANpull))
Expand All @@ -4035,91 +4015,97 @@ bool FileSprayer::calcUsePull() const
if (sources.ordinality() == 0)
return true;

if (!isContainerized()) // using spray-service (dafilsrv) not ftslave in containerized (or if useFtSlave, then created as local process)
if (options->getPropBool(ANpull, false))
{
// In BM the sources or targets may not be HPCC environment machines and/or might not be able to run ftslave
ForEachItemIn(idx2, sources)
LOG(MCdebugInfo, "Use pull since explicitly specified");
return true;
}

bool pushRequested = options->getPropBool(ANpush);
if (!targetSupportsConcurrentWrite) // NB: default for containerized is false
{
if (!pushRequested)
return true;
if (!usePushWholeOperation())
{
if (!sources.item(idx2).canPush())
if (targets.ordinality() <= sources.ordinality())
{
StringBuffer s;
sources.item(idx2).filename.queryIP().getHostText(s);
LOG(MCdebugInfo, "Use pull operation because %s cannot push", s.str());
// NB: this is being calculated before partitioning has occurred
// It can be refactored so that it decides after partitioning, and only has to force pull
// if multiple partitions write to same target file.
LOG(MCdebugInfo, "Use pull operation because target doesn't support concurrent write");
return true;
}
}
// else targets > sources

if (!canLocateSlaveForNode(sources.item(0).filename.queryIP()))
{
StringBuffer s;
sources.item(0).filename.queryIP().getHostText(s);
LOG(MCdebugInfo, "Use pull operation because %s doesn't appear to have an ftslave", s.str());
return true;
}

ForEachItemIn(idx, targets)
{
if (!targets.item(idx).canPull())
// if push requested and N:M and no split, then throw an error unless expert option allows
if (!copySource) // 1:1 partitioning if copySource==true
{
StringBuffer s;
targets.item(idx).queryIP().getHostText(s);
LOG(MCdebugInfo, "Use push operation because %s cannot pull", s.str());
return false;
if ((sources.ordinality() > 1) && (targets.ordinality() > 1) && !allowSplit())
{
if (!getComponentConfigSP()->getPropBool("expert/@allowPushNoSplit"))
throw makeStringExceptionV(0, "Pushing to multiple targets with no split is not supported to this target plane (%s)", targetPlane.str());
}
}
}

if (!canLocateSlaveForNode(targets.item(0).queryIP()))
LOG(MCdebugInfo, "Use push since explicitly specified");
return false;
}
else // ! targetSupportsConcurrentWrite
{
if (pushRequested)
{
StringBuffer s;
targets.item(0).queryIP().getHostText(s);
LOG(MCdebugInfo, "Use push operation because %s doesn't appear to have an ftslave", s.str());
LOG(MCdebugInfo, "Use push since explicitly specified");
return false;
}
}

bool pullRequested = options->hasProp(ANpull) ? options->getPropBool(ANpull) : false;
bool pushRequested = options->hasProp(ANpush) ? options->getPropBool(ANpush) : false;
if (!targetSupportsConcurrentWrite)
ForEachItemIn(idx2, sources)
{
if (!pushRequested)
return true;

if (targets.ordinality() < sources.ordinality())
if (!sources.item(idx2).canPush())
{
IWARNLOG("Ignoring push option. Multiple source partitions would write to same target, and target does not support concurrent write");
StringBuffer s;
sources.item(idx2).filename.queryIP().getHostText(s);
LOG(MCdebugInfo, "Use pull operation because %s cannot push", s.str());
return true;
}
else
{
// may still not be allowed. After partitioning, if multiple sources write to same target, then pull is forced (see checkPushSupported())
return false;
}
}
else
if (!canLocateSlaveForNode(sources.item(0).filename.queryIP()))
{
bool wantPull = false;
bool wantPush = false;
if (targets.ordinality() < sources.ordinality()) // implying multiple writes to same target, force pull, and will common up on matching target filenames
wantPull = true;
else if (targets.ordinality() > sources.ordinality()) // targets > sources. i.e. multiple splits of source files for each target
wantPush = true;
StringBuffer s;
sources.item(0).filename.queryIP().getHostText(s);
LOG(MCdebugInfo, "Use pull operation because %s doesn't appear to have an ftslave", s.str());
return true;
}

if (wantPull && pushRequested)
{
IWARNLOG("Wanted to pull since targets < sources, but push option takes precedence");
return false; // push
}
else if (wantPush && pullRequested)
ForEachItemIn(idx, targets)
{
if (!targets.item(idx).canPull())
{
IWARNLOG("Wanted to push since targets > sources, but pull option takes precedence");
return true; // pull
StringBuffer s;
targets.item(idx).queryIP().getHostText(s);
LOG(MCdebugInfo, "Use push operation because %s cannot pull", s.str());
return false;
}
}

if (wantPush || pushRequested)
return false; // push
if (!canLocateSlaveForNode(targets.item(0).queryIP()))
{
StringBuffer s;
targets.item(0).queryIP().getHostText(s);
LOG(MCdebugInfo, "Use push operation because %s doesn't appear to have an ftslave", s.str());
return false;
}

return true; // default pull
//Use push if going to a single node.
if ((targets.ordinality() == 1) && (sources.ordinality() > 1))
{
LOG(MCdebugInfo, "Use push operation because going to a single node from many");
return false;
}

LOG(MCdebugInfo, "Use pull operation as default");
return true;
}


Expand Down
3 changes: 1 addition & 2 deletions dali/ft/filecopy.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ protected:
void addTarget(unsigned idx, INode * node);
void afterGatherFileSizes();
void afterTransfer();
bool allowSplit();
bool allowSplit() const;
void analyseFileHeaders(bool setcurheadersize);
void assignPartitionFilenames();
void beforeTransfer();
Expand All @@ -239,7 +239,6 @@ protected:
void calibrateProgress();
void checkFormats();
void checkForOverlap();
void checkPushSupported();
void cleanupRecovery();
void cloneHeaderFooter(unsigned idx, bool isHeader);
void commonUpSlaves();
Expand Down
16 changes: 14 additions & 2 deletions system/jlib/jfile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7943,7 +7943,6 @@ MODULE_INIT(INIT_PRIORITY_STANDARD)
switch (attrInfo.type)
{
case PlaneAttrType::integer:
case PlaneAttrType::boolean: // handling is same as integer
{
unsigned __int64 value = plane.getPropInt64(prop.c_str(), unsetPlaneAttrValue);
if (unsetPlaneAttrValue != value)
Expand All @@ -7954,12 +7953,25 @@ MODULE_INIT(INIT_PRIORITY_STANDARD)
value *= attrInfo.scale;
}
}
values[propNum] = value;
break;
}
case PlaneAttrType::boolean:
{
unsigned __int64 value;
if (plane.hasProp(prop.c_str()))
value = plane.getPropBool(prop.c_str()) ? 1 : 0;
else if (FileSyncWriteClose == propNum) // temporary (if FileSyncWriteClose and unset, check legacy fileSyncMaxRetrySecs), purely for short term backward compatibility (see HPCC-32757)
{
unsigned __int64 v = plane.getPropInt64("expert/@fileSyncMaxRetrySecs", unsetPlaneAttrValue);
// NB: fileSyncMaxRetrySecs==0 is treated as set/enabled
value = v != unsetPlaneAttrValue ? 1 : 0;
if (unsetPlaneAttrValue != v)
value = 1;
else
value = unsetPlaneAttrValue;
}
else
value = unsetPlaneAttrValue;
values[propNum] = value;
break;
}
Expand Down
2 changes: 1 addition & 1 deletion system/jlib/jfile.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ extern jlib_decl IPropertyTreeIterator * getRemoteStoragesIterator();
extern jlib_decl IPropertyTreeIterator * getPlanesIterator(const char * category, const char *name);

extern jlib_decl IFileIO *createBlockedIO(IFileIO *base, size32_t blockSize);
enum PlaneAttributeType // remember to update planeAttributeInfo in jfile.cpp
enum PlaneAttributeType // remember to update planeAttributeInfo in jfile.cpp
{
BlockedSequentialIO,
BlockedRandomIO,
Expand Down

0 comments on commit f0af494

Please sign in to comment.