Skip to content

Commit

Permalink
HPCC-32584 Ensure that dfuserver closes files cleanly so errors are r…
Browse files Browse the repository at this point in the history
…eported

Signed-off-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday committed Sep 3, 2024
1 parent 3015116 commit 091f0e2
Show file tree
Hide file tree
Showing 10 changed files with 44 additions and 1 deletion.
2 changes: 2 additions & 0 deletions common/dllserver/thorplugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,8 @@ const StringArray &HelperDll::queryManifestFiles(const char *type, const char *w
OwnedIFileIO o = f->open(IFOcreate);
assertex(o.get() != nullptr);
o->write(0, len, data);
o->close();

list->append(extractName);
if (doTrace(traceJava) && streq(type, "jar"))
DBGLOG("Extracted jar resource %u size %u to %s in %u ms", id, len, extractName.str(), msTick() - start);
Expand Down
2 changes: 2 additions & 0 deletions dali/base/dasds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5289,6 +5289,7 @@ class CStoreHelper : implements IStoreHelper, public CInterface
iFileIO->write(0, sizeof(unsigned), crc);
if (storeInfo)
storeInfo->cache.set(filename.str());
iFileIO->close();
}

void updateStoreInfo(const char *base, const char *location, unsigned edition, unsigned *crc, CStoreInfo *storeInfo=NULL)
Expand Down Expand Up @@ -5400,6 +5401,7 @@ class CStoreHelper : implements IStoreHelper, public CInterface

OwnedIFileIO detachIPIO = detachIPIFile->open(IFOcreate);
detachIPIO->write(0, sizeof(storeHelper.mySessId), &storeHelper.mySessId);
detachIPIO->close();
detachIPIO.clear();
detachIPIFile->rename(activeDetachIPStr.str());
// check often do not wait any longer than necessary
Expand Down
1 change: 1 addition & 0 deletions dali/ft/filecopy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1023,6 +1023,7 @@ void FileSprayer::beforeTransfer()
io->write(lastOutputOffset-sizeof(null), sizeof(null), &null);
}
}
io->close();
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions dali/ft/ftbase.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ public:
virtual offset_t tell() override;
virtual size32_t write(size32_t len, const void * data) override;
virtual unsigned __int64 getStatistic(StatisticKind kind) override { return stream->getStatistic(kind); }
virtual void close() override
{
stream->close();
}

unsigned getCRC() { return crc; }
void setCRC(unsigned long _crc) { crc = _crc; }
Expand Down
9 changes: 9 additions & 0 deletions dali/ft/fttransform.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,8 @@ bool TransferServer::pull()
assertex(curProgress.status != OutputProgress::StatusRenamed);
if (curProgress.status != OutputProgress::StatusCopied)
{
if (out)
out->close();
out.setown(createIOStream(outio));
out->seek(progressOffset, IFSbegin);
wrapOutInCRC(curProgress.outputCRC);
Expand Down Expand Up @@ -885,6 +887,8 @@ bool TransferServer::pull()
}
}

if (out)
out->close();
out.setown(createIOStream(outio));
out->seek(0, IFSbegin);
wrapOutInCRC(0);
Expand All @@ -903,7 +907,10 @@ bool TransferServer::pull()
}

crcOut.clear();
if (out)
out->close();
out.clear();

//Once the transfers have completed, rename the files, and sync file times
//if replicating...
if (!isSafeMode)
Expand Down Expand Up @@ -997,6 +1004,7 @@ bool TransferServer::push()
}
outio.setown(createCompressedFileWriter(outio, false, 0, true, compressor, COMPRESS_METHOD_LZ4));
}

out.setown(createIOStream(outio));
if (!compressOutput)
out->seek(curPartition.outputOffset + curProgress.outputLength, IFSbegin);
Expand All @@ -1011,6 +1019,7 @@ bool TransferServer::push()
sendProgress(curProgress);
}
crcOut.clear();
out->close();
out.clear();
}
}
Expand Down
6 changes: 6 additions & 0 deletions system/jlib/jfcmp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,12 @@ class CFcmpStream : public CSimpleInterfaceOf<IFileIOStream>
{
return baseio->getStatistic(kind);
}

virtual void close() override
{
flush();
baseio->close();
}
};

#endif
17 changes: 16 additions & 1 deletion system/jlib/jfile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2953,7 +2953,11 @@ class CBufferedFileIOStream : public CBufferedFileIOStreamBase
{
return io->getStatistic(kind);
}

virtual void close() override
{
flush();
io->close();
}
protected:
IFileIOAttr io;
};
Expand Down Expand Up @@ -3064,6 +3068,13 @@ class CBufferedAsyncIOStream: public CBufferedFileIOStreamBase
virtual size32_t directWrite(size32_t len, const void * data) { assertex(false); return 0; } // shouldn't get called
virtual offset_t directSize() { waitAsyncWrite(); return io->size(); }
virtual unsigned __int64 getStatistic(StatisticKind kind) { return io->getStatistic(kind); }
virtual void close() override
{
flush();
waitAsyncWrite();
waitAsyncRead();
io->close();
}
};


Expand Down Expand Up @@ -4571,6 +4582,10 @@ IFileIOStream *createProgressIFileIOStream(IFileIOStream *iFileIOStream, offset_
{
return iFileIOStream->getStatistic(kind);
}
virtual void close() override
{
iFileIOStream->close();
}
};
return new CProgressIFileIOStream(iFileIOStream, totalSize, msg, periodSecs);
}
Expand Down
1 change: 1 addition & 0 deletions system/jlib/jfile.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ interface IFileIOStream : extends IIOStream
virtual offset_t size() = 0;
virtual offset_t tell() = 0;
virtual unsigned __int64 getStatistic(StatisticKind kind) = 0;
virtual void close() = 0;
};

interface IDiscretionaryLock: extends IInterface
Expand Down
2 changes: 2 additions & 0 deletions system/jlib/jfile.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ public:
virtual offset_t tell();
virtual size32_t write(size32_t len, const void * data);
virtual unsigned __int64 getStatistic(StatisticKind kind) { return io->getStatistic(kind); }
virtual void close() override { io->close(); }
protected:
Linked<IFileIO> io;
offset_t curOffset;
Expand All @@ -238,6 +239,7 @@ public:
virtual offset_t tell();
virtual size32_t write(size32_t len, const void * data);
virtual unsigned __int64 getStatistic(StatisticKind kind) { return stream->getStatistic(kind); }
virtual void close() override { stream->close(); }
protected:
Linked<IFileIOStream> stream;
};
Expand Down
1 change: 1 addition & 0 deletions tools/dumpkey/dumpkey.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ class DummyFileIOStream : public CInterfaceOf<IFileIOStream>
virtual offset_t size() override { return hwm; }
virtual offset_t tell() override { return offset; }
virtual unsigned __int64 getStatistic(StatisticKind kind) { return stats.getStatistic(kind); }
virtual void close() override { }
private:
offset_t offset = 0;
offset_t hwm = 0;
Expand Down

0 comments on commit 091f0e2

Please sign in to comment.