Skip to content

Commit

Permalink
HPCC-32873 Prevent concurrent write to same file when spraying/despra…
Browse files Browse the repository at this point in the history
…ying

Signed-off-by: Jake Smith <[email protected]>
  • Loading branch information
jakesmith committed Nov 5, 2024
1 parent ed05bcf commit dc4b77e
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 64 deletions.
83 changes: 75 additions & 8 deletions dali/ft/filecopy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -886,7 +886,7 @@ void FileSprayer::afterTransfer()
}
}

bool FileSprayer::allowSplit()
bool FileSprayer::allowSplit() const
{
return !(options->getPropBool(ANnosplit) || options->getPropBool(ANnosplit2) || options->queryProp(ANprefix));
}
Expand Down Expand Up @@ -1592,12 +1592,31 @@ void FileSprayer::commonUpSlaves()
cur.whichSlave = 0;
}

if (options->getPropBool(ANnocommon, true) || pushWhole)
if (pushWhole)
return;

// noCommon is defaulted to on for non-containerized (revisit!)
bool noCommon = options->getPropBool(ANnocommon, !isContainerized());
if (noCommon)
{
if (!isContainerized())
return;
IWARNLOG("Ignoring noCommon option in containerized mode");
}

//First work out which are the same slaves, and then map the partition.
//Previously it was n^2 in partition, which is fine until you spray 100K files.
unsigned numSlaves = pull ? targets.ordinality() : sources.ordinality();
bool commonByIp = !isContainerized();

offset_t totalSourceFileSize = 0;
offset_t threshold = 0x8000 * numSlaves;
ForEachItemIn(i, sources)
{
const FilePartInfo & cur = sources.item(i);
totalSourceFileSize += copyCompressed ? cur.psize : cur.size;
}

unsigned * slaveMapping = new unsigned [numSlaves];
for (unsigned i = 0; i < numSlaves; i++)
slaveMapping[i] = i;
Expand All @@ -1609,22 +1628,32 @@ void FileSprayer::commonUpSlaves()
TargetLocation & cur = targets.item(i1);
for (unsigned i2 = 0; i2 < i1; i2++)
{
if (targets.item(i2).filename.queryIP().ipequals(cur.filename.queryIP()))
bool match = false;
if (commonByIp)
match = targets.item(i2).filename.queryIP().ipequals(cur.filename.queryIP());
else if (!targetSupportsConcurrentWrite || totalSourceFileSize < threshold)
match = targets.item(i2).filename.equals(cur.filename);
if (match)
{
slaveMapping[i1] = i2;
break;
}
}
}
}
else
else // push
{
for (unsigned i1 = 1; i1 < numSlaves; i1++)
{
FilePartInfo & cur = sources.item(i1);
for (unsigned i2 = 0; i2 < i1; i2++)
{
if (sources.item(i2).filename.queryIP().ipequals(cur.filename.queryIP()))
bool match = false;
if (commonByIp) // match by IP
match = sources.item(i2).filename.queryIP().ipequals(cur.filename.queryIP());
else if (totalSourceFileSize < threshold)
match = sources.item(i2).filename.equals(cur.filename);
if (match)
{
slaveMapping[i1] = i2;
break;
Expand All @@ -1633,7 +1662,6 @@ void FileSprayer::commonUpSlaves()
}
}


for (unsigned i3 = 0; i3 < max; i3++)
{
PartitionPoint & cur = partition.item(i3);
Expand Down Expand Up @@ -2493,7 +2521,6 @@ void FileSprayer::insertHeaders()
}
}


bool FileSprayer::needToCalcOutput()
{
return !usePullOperation() || options->getPropBool(ANverify);
Expand Down Expand Up @@ -2607,6 +2634,7 @@ void FileSprayer::pullParts()
transferSlaves.append(next);
}

// NB: not all transferServers will be used, depending on mapping of whichSlave
ForEachItemIn(idx3, partition)
{
PartitionPoint & cur = partition.item(idx3);
Expand Down Expand Up @@ -2662,6 +2690,7 @@ void FileSprayer::pushParts()
transferSlaves.append(next);
}

// NB: not all transferServers will be used, depending on mapping of whichSlave
ForEachItemIn(idx3, partition)
{
PartitionPoint & cur = partition.item(idx3);
Expand Down Expand Up @@ -3048,6 +3077,7 @@ void FileSprayer::setTarget(IDistributedFile * target)
TargetLocation & next = * new TargetLocation(curPart->getFilename(rfn,copy), idx);
targets.append(next);
}
target->getClusterGroupName(0, targetPlane.clear());

checkSprayOptions();
}
Expand All @@ -3069,6 +3099,7 @@ void FileSprayer::setTarget(IFileDescriptor * target, unsigned copy)
target->getFilename(idx, copy, filename);
targets.append(*new TargetLocation(filename, idx));
}
target->getClusterGroupName(0, targetPlane.clear());

checkSprayOptions();
}
Expand Down Expand Up @@ -3281,6 +3312,8 @@ void FileSprayer::spray()
return;
}

targetSupportsConcurrentWrite = getConcurrentWriteSupported(targetPlane);

checkFormats();
checkForOverlap();

Expand Down Expand Up @@ -3984,11 +4017,45 @@ bool FileSprayer::calcUsePull() const
LOG(MCdebugInfo, "Use pull since explicitly specified");
return true;
}
if (options->getPropBool(ANpush, false))

bool pushRequested = options->getPropBool(ANpush);
if (!targetSupportsConcurrentWrite) // NB: default for containerized is false
{
if (!pushRequested)
return true;
if (!usePushWholeOperation())
{
if (targets.ordinality() <= sources.ordinality())
{
// NB: this is being calculated before partitioning has occurred
// It can be refactored so that it decides after partitioning, and only has to force pull
// if multiple partitions write to same target file.
LOG(MCdebugInfo, "Use pull operation because target doesn't support concurrent write");
return true;
}
// else targets > sources

// if push requested and N:M and no split, then throw an error unless expert option allows
if (!copySource) // 1:1 partitioning if copySource==true
{
if ((sources.ordinality() > 1) && (targets.ordinality() > 1) && !allowSplit())
{
if (!getComponentConfigSP()->getPropBool("expert/@allowPushNoSplit"))
throw makeStringExceptionV(0, "Pushing to multiple targets with no split is not supported to this target plane (%s)", targetPlane.str());
}
}
}
LOG(MCdebugInfo, "Use push since explicitly specified");
return false;
}
else // ! targetSupportsConcurrentWrite
{
if (pushRequested)
{
LOG(MCdebugInfo, "Use push since explicitly specified");
return false;
}
}

ForEachItemIn(idx2, sources)
{
Expand Down
4 changes: 3 additions & 1 deletion dali/ft/filecopy.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ protected:
void addTarget(unsigned idx, INode * node);
void afterGatherFileSizes();
void afterTransfer();
bool allowSplit();
bool allowSplit() const;
void analyseFileHeaders(bool setcurheadersize);
void assignPartitionFilenames();
void beforeTransfer();
Expand Down Expand Up @@ -307,6 +307,8 @@ protected:
Linked<IDistributedFile> distributedTarget;
Linked<IDistributedFile> distributedSource;
TargetLocationArray targets;
StringBuffer targetPlane;
bool targetSupportsConcurrentWrite = true; // if false, will prevent multiple writers to same target file (e.g. not supported by Azure Blob storage)
FileFormat srcFormat;
FileFormat tgtFormat;
Owned<IDFPartFilter> filter;
Expand Down
137 changes: 83 additions & 54 deletions system/jlib/jfile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1868,38 +1868,7 @@ IFileIO * CFile::openShared(IFOmode mode,IFSHmode share,IFEflags extraFlags)

//---------------------------------------------------------------------------

static const std::array<const char*, PlaneAttributeCount> planeAttributeTypeStrings =
{
"blockedFileIOKB",
"blockedRandomIOKB",
"fileSyncWriteClose"
};

static constexpr bool defaultGlobalFileSyncWriteCloseEnabled = false;
static bool globalFileSyncWriteCloseEnabled = defaultGlobalFileSyncWriteCloseEnabled;
static std::atomic<bool> globalFileSyncWriteCloseConfigured{false};
static CriticalSection globalFileSyncCS;
static bool getGlobalFileSyncWriteCloseEnabled()
{
if (!globalFileSyncWriteCloseConfigured)
{
CriticalBlock b(globalFileSyncCS);
if (!globalFileSyncWriteCloseConfigured)
{
Owned<IPropertyTree> global = getGlobalConfig();
Owned<IPropertyTree> config = getComponentConfig();
std::string propName = "expert/@" + std::string(planeAttributeTypeStrings[FileSyncWriteClose]);
if (config->hasProp(propName.c_str()))
globalFileSyncWriteCloseEnabled = config->getPropBool(propName.c_str());
else if (global->hasProp(propName.c_str()))
globalFileSyncWriteCloseEnabled = global->getPropBool(propName.c_str());
// else leave at default
globalFileSyncWriteCloseConfigured = true;
}
}
return globalFileSyncWriteCloseEnabled;
}

static std::atomic<bool> defaultFileSyncWriteCloseEnabled = false; // NB: set/updated by config updateFunc

extern jlib_decl IFileIO *createIFileIO(IFile * creator,HANDLE handle,IFOmode openmode,IFEflags extraFlags)
{
Expand Down Expand Up @@ -2128,7 +2097,7 @@ CFileIO::CFileIO(IFile * _creator, HANDLE handle, IFOmode _openmode, IFSHmode _s
if ('/' == filePath[0]) // only for absolute paths
{
unsigned __int64 value;
if (findPlaneAttrFromPath(filePath, FileSyncWriteClose, getGlobalFileSyncWriteCloseEnabled() ? 1 : 0, value)) // NB: returns only if plane found
if (findPlaneAttrFromPath(filePath, FileSyncWriteClose, defaultFileSyncWriteCloseEnabled ? 1 : 0, value)) // NB: returns only if plane found
{
if (value) // true or false
extraFlags = static_cast<IFEflags>(extraFlags | IFEsyncAtClose);
Expand Down Expand Up @@ -7928,6 +7897,23 @@ bool hasGenericFiletypeName(const char * name)
// Cache/update plane attributes settings
static unsigned jFileHookId = 0;


// Declare the array with an anonymous struct
enum PlaneAttrType { boolean, integer };
struct PlaneAttributeInfo
{
PlaneAttrType type;
size32_t scale;
bool isExpert;
const char *name;
};
static const std::array<PlaneAttributeInfo, PlaneAttributeCount> planeAttributeInfo = {{
{ PlaneAttrType::integer, 1024, false, "blockedFileIOKB" }, // enum PlaneAttributeType::BlockedSequentialIO {0}
{ PlaneAttrType::integer, 1024, false, "blockedRandomIOKB" }, // enum PlaneAttributeType::blockedRandomIOKB {1}
{ PlaneAttrType::boolean, 0, true, "fileSyncWriteClose" }, // enum PlaneAttributeType::fileSyncWriteClose {2}
{ PlaneAttrType::boolean, 0, true, "concurrentWriteSupport" } // enum PlaneAttributeType::concurrentWriteSupport {3}
}};

// {prefix, {key1: value1, key2: value2, ...}}
typedef std::pair<std::string, std::array<unsigned __int64, PlaneAttributeCount>> PlaneAttributesMapElement;

Expand All @@ -7947,38 +7933,75 @@ MODULE_INIT(INIT_PRIORITY_STANDARD)
PlaneAttributesMapElement &element = planeAttributesMap[plane.queryProp("@name")];
element.first = plane.queryProp("@prefix");
auto &values = element.second;
unsigned __int64 value;
value = plane.getPropInt64(("@" + std::string(planeAttributeTypeStrings[BlockedSequentialIO])).c_str(), unsetPlaneAttrValue);
values[BlockedSequentialIO] = (unsetPlaneAttrValue != value) ? value * 1024 : value;
value = plane.getPropInt64(("@" + std::string(planeAttributeTypeStrings[BlockedRandomIO])).c_str(), unsetPlaneAttrValue);
values[BlockedRandomIO] = (unsetPlaneAttrValue != value) ? value * 1024 : value;

// plane expert settings
std::string propName = "expert/@" + std::string(planeAttributeTypeStrings[FileSyncWriteClose]);
if (plane.hasProp(propName.c_str()))
values[FileSyncWriteClose] = plane.getPropBool(propName.c_str()) ? 1 : 0;
else
for (unsigned propNum=0; propNum<PlaneAttributeType::PlaneAttributeCount; ++propNum)
{
// temporary (check legacy fileSyncMaxRetrySecs too), purely for short term backward compatibility (see HPCC-xxxx)
unsigned __int64 v = plane.getPropInt64("expert/@fileSyncMaxRetrySecs", unsetPlaneAttrValue);
// NB: fileSyncMaxRetrySecs==0 is treated as set/enabled
values[FileSyncWriteClose] = v != unsetPlaneAttrValue ? 1 : 0;
const PlaneAttributeInfo &attrInfo = planeAttributeInfo[propNum];
std::string prop;
if (attrInfo.isExpert)
prop += "expert/";
prop += "@" + std::string(attrInfo.name);
switch (attrInfo.type)
{
case PlaneAttrType::integer:
{
unsigned __int64 value = plane.getPropInt64(prop.c_str(), unsetPlaneAttrValue);
if (unsetPlaneAttrValue != value)
{
if (attrInfo.scale)
{
dbgassertex(PlaneAttrType::integer == attrInfo.type);
value *= attrInfo.scale;
}
}
values[propNum] = value;
break;
}
case PlaneAttrType::boolean:
{
unsigned __int64 value;
if (plane.hasProp(prop.c_str()))
value = plane.getPropBool(prop.c_str()) ? 1 : 0;
else if (FileSyncWriteClose == propNum) // temporary (if FileSyncWriteClose and unset, check legacy fileSyncMaxRetrySecs), purely for short term backward compatibility (see HPCC-32757)
{
unsigned __int64 v = plane.getPropInt64("expert/@fileSyncMaxRetrySecs", unsetPlaneAttrValue);
// NB: fileSyncMaxRetrySecs==0 is treated as set/enabled
if (unsetPlaneAttrValue != v)
value = 1;
else
value = unsetPlaneAttrValue;
}
else
value = unsetPlaneAttrValue;
values[propNum] = value;
break;
}
default:
throwUnexpected();
}
}
}

// reset defaults
expertEnableIFileFlagsMask = IFEnone;
expertDisableIFileFlagsMask = IFEnone;

Owned<IPropertyTree> componentConfig = getComponentConfig();
Owned<IPropertyTree> globalConfig = getGlobalConfig();
StringBuffer fileFlagsStr;
if (getComponentConfigSP()->getProp("expert/@enableIFileMask", fileFlagsStr) || getGlobalConfigSP()->getProp("expert/@enableIFileMask", fileFlagsStr))
if (componentConfig->getProp("expert/@enableIFileMask", fileFlagsStr) || globalConfig->getProp("expert/@enableIFileMask", fileFlagsStr))
expertEnableIFileFlagsMask = (IFEflags)strtoul(fileFlagsStr, NULL, 0);

if (getComponentConfigSP()->getProp("expert/@disableIFileMask", fileFlagsStr.clear()) || getGlobalConfigSP()->getProp("expert/@disableIFileMask", fileFlagsStr))
if (componentConfig->getProp("expert/@disableIFileMask", fileFlagsStr.clear()) || globalConfig->getProp("expert/@disableIFileMask", fileFlagsStr))
expertDisableIFileFlagsMask = (IFEflags)strtoul(fileFlagsStr, NULL, 0);

// clear for getGlobalFileSyncWriteCloseEnabled() to re-evaluate
globalFileSyncWriteCloseConfigured = false;
std::string propName = "expert/@" + std::string(planeAttributeInfo[FileSyncWriteClose].name);
if (componentConfig->hasProp(propName.c_str()))
defaultFileSyncWriteCloseEnabled = componentConfig->getPropBool(propName.c_str());
else
{
if (globalConfig->hasProp(propName.c_str()))
defaultFileSyncWriteCloseEnabled = globalConfig->getPropBool(propName.c_str());
}
};
jFileHookId = installConfigUpdateHook(updateFunc, true);

Expand All @@ -7993,7 +8016,7 @@ MODULE_EXIT()
const char *getPlaneAttributeString(PlaneAttributeType attr)
{
assertex(attr < PlaneAttributeCount);
return planeAttributeTypeStrings[attr];
return planeAttributeInfo[attr].name;
}

unsigned __int64 getPlaneAttributeValue(const char *planeName, PlaneAttributeType planeAttrType, unsigned __int64 defaultValue)
Expand Down Expand Up @@ -8063,5 +8086,11 @@ size32_t getBlockedRandomIOSize(const char *planeName, size32_t defaultSize)

bool getFileSyncWriteCloseEnabled(const char *planeName)
{
return 0 != getPlaneAttributeValue(planeName, FileSyncWriteClose, defaultGlobalFileSyncWriteCloseEnabled ? 1 : 0);
return 0 != getPlaneAttributeValue(planeName, FileSyncWriteClose, defaultFileSyncWriteCloseEnabled ? 1 : 0);
}

static constexpr bool defaultConcurrentWriteSupport = isContainerized() ? false : true;
bool getConcurrentWriteSupported(const char *planeName)
{
return 0 != getPlaneAttributeValue(planeName, ConcurrentWriteSupport, defaultConcurrentWriteSupport ? 1 : 0);
}
Loading

0 comments on commit dc4b77e

Please sign in to comment.