Skip to content

Commit

Permalink
HPCC-33043 Ensure blob storage files are completely written before be…
Browse files Browse the repository at this point in the history
…ing read

Signed-off-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday committed Nov 28, 2024
1 parent ab280fe commit 8e7d451
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 1 deletion.
42 changes: 42 additions & 0 deletions dali/base/dadfs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3779,6 +3779,47 @@ protected: friend class CDistributedFilePart;
clusters.kill();
}

//Ensure that enough time has passed from when the file was last modified for reads to be consistent
//Important for blob storage or remote, geographically synchronized storage
void checkWriteSync()
{
time_t modifiedTime = 0;
time_t now = 0;

Owned<IPropertyTreeIterator> iter = root->getElements("Cluster");
ForEach(*iter)
{
const char * name = iter->query().queryProp("@name");
unsigned marginMs = getWriteSyncMarginMs(name);
if (marginMs)
{
if (0 == modifiedTime)
{
CDateTime modified;
if (!getModificationTime(modified))
return;
modifiedTime = modified.getSimple();
}

if (0 == now)
now = time(&now);

//Round the elapsed time down - so that a change on the last ms of one time period does not count as a whole second of elapsed time
//This could be avoided if the modified time was more granular
unsigned __int64 elapsedMs = (now - modifiedTime) * 1000;
if (elapsedMs >= 1000)
elapsedMs -= 999;

if (unlikely(elapsedMs < marginMs))
{
LOG(MCuserProgress, "Delaying access to %s on %s for %ums to ensure write sync", queryLogicalName(), name, (unsigned)(marginMs - elapsedMs));
MilliSleep(marginMs - elapsedMs);
now = 0; // re-evaluate now - unlikely to actually happen
}
}
}
}

bool hasDirPerPart() const
{
return FileDescriptorFlags::none != (fileFlags & FileDescriptorFlags::dirperpart);
Expand Down Expand Up @@ -8299,6 +8340,7 @@ IDistributedFile *CDistributedFileDirectory::dolookup(CDfsLogicalFileName &_logi
}
CDistributedFile *ret = new CDistributedFile(this,fcl.detach(),*logicalname,accessMode,user); // found
ret->setSuperOwnerLock(superOwnerLock.detach());
ret->checkWriteSync();
return ret;
}
// now super file
Expand Down
5 changes: 5 additions & 0 deletions helm/hpcc/values.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,11 @@
"eclwatchVisible": {
"type": "boolean"
},
"writeSyncMarginMs": {
"description": "Time that is required to elapse between writing a file and all read copies to be consistently updated",
"type": "integer",
"default": 0
},
"components": {},
"prefix": {},
"subPath": {},
Expand Down
9 changes: 8 additions & 1 deletion system/jlib/jfile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7911,7 +7911,8 @@ static const std::array<PlaneAttributeInfo, PlaneAttributeCount> planeAttributeI
{ PlaneAttrType::integer, 1024, false, "blockedFileIOKB" }, // enum PlaneAttributeType::BlockedSequentialIO {0}
{ PlaneAttrType::integer, 1024, false, "blockedRandomIOKB" }, // enum PlaneAttributeType::blockedRandomIOKB {1}
{ PlaneAttrType::boolean, 0, true, "fileSyncWriteClose" }, // enum PlaneAttributeType::fileSyncWriteClose {2}
{ PlaneAttrType::boolean, 0, true, "concurrentWriteSupport" } // enum PlaneAttributeType::concurrentWriteSupport {3}
{ PlaneAttrType::boolean, 0, true, "concurrentWriteSupport" },// enum PlaneAttributeType::concurrentWriteSupport {3}
{ PlaneAttrType::integer, 1, false, "writeSyncMarginMs" }, // enum PlaneAttributeType::WriteSyncMarginMs {4}
}};

// {prefix, {key1: value1, key2: value2, ...}}
Expand Down Expand Up @@ -8089,6 +8090,12 @@ bool getFileSyncWriteCloseEnabled(const char *planeName)
return 0 != getPlaneAttributeValue(planeName, FileSyncWriteClose, defaultFileSyncWriteCloseEnabled ? 1 : 0);
}

unsigned getWriteSyncMarginMs(const char * planeName)
{
constexpr unsigned dft = 0;
return (unsigned)getPlaneAttributeValue(planeName, WriteSyncMarginMs, dft);
}

static constexpr bool defaultConcurrentWriteSupport = isContainerized() ? false : true;
bool getConcurrentWriteSupported(const char *planeName)
{
Expand Down
3 changes: 3 additions & 0 deletions system/jlib/jfile.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -744,12 +744,14 @@ extern jlib_decl IPropertyTreeIterator * getRemoteStoragesIterator();
extern jlib_decl IPropertyTreeIterator * getPlanesIterator(const char * category, const char *name);

extern jlib_decl IFileIO *createBlockedIO(IFileIO *base, size32_t blockSize);
//MORE: Should use enum class to avoid potential symbol clashes
enum PlaneAttributeType // remember to update planeAttributeInfo in jfile.cpp
{
BlockedSequentialIO,
BlockedRandomIO,
FileSyncWriteClose,
ConcurrentWriteSupport,
WriteSyncMarginMs,
PlaneAttributeCount
};
extern jlib_decl const char *getPlaneAttributeString(PlaneAttributeType attr);
Expand All @@ -761,6 +763,7 @@ extern jlib_decl size32_t getBlockedFileIOSize(const char *planeName, size32_t d
extern jlib_decl size32_t getBlockedRandomIOSize(const char *planeName, size32_t defaultSize=0);
extern jlib_decl bool getFileSyncWriteCloseEnabled(const char *planeName);
extern jlib_decl bool getConcurrentWriteSupported(const char *planeName);
extern jlib_decl unsigned getWriteSyncMarginMs(const char * planeName);

//---- Pluggable file type related functions ----------------------------------------------

Expand Down

0 comments on commit 8e7d451

Please sign in to comment.