diff --git a/thorlcr/thorutil/thbuf.cpp b/thorlcr/thorutil/thbuf.cpp index d64d06da1a2..5472c3b7470 100644 --- a/thorlcr/thorutil/thbuf.cpp +++ b/thorlcr/thorutil/thbuf.cpp @@ -69,8 +69,8 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl ThorRowQueue *in; size32_t insz; ThorRowQueue *out; - Linked file; - Owned fileio; + CFileOwner tmpFileOwner; + Owned tempFileIO; SpinLock lock; bool waiting; Semaphore waitsem; @@ -139,12 +139,12 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl insz = 0; return; } - if (!fileio) { + if (!tempFileIO) { SpinUnblock unblock(lock); - fileio.setown(file->open(IFOcreaterw)); - if (!fileio) + tempFileIO.setown(tmpFileOwner.queryIFile().open(IFOcreaterw)); + if (!tempFileIO) { - throw MakeStringException(-1,"CSmartRowBuffer::flush cannot write file %s",file->queryFilename()); + throw MakeStringException(-1,"CSmartRowBuffer::flush cannot write file %s",tmpFileOwner.queryIFile().queryFilename()); } } MemoryBuffer mb; @@ -182,7 +182,8 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl size32_t left = nb*blocksize-mb.length(); memset(mb.reserve(left),0,left); } - fileio->write(blk*(offset_t)blocksize,mb.length(),mb.bufferBase()); + tempFileIO->write(blk*(offset_t)blocksize,mb.length(),mb.bufferBase()); + tmpFileOwner.noteSize(numblocks*blocksize); mb.clear(); } if (waiting) { @@ -220,8 +221,8 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl size32_t readBlockSize = nb*blocksize; byte *buf = (byte *)ma.allocate(readBlockSize); CThorStreamDeserializerSource ds(readBlockSize,buf); - assertex(fileio.get()); - size32_t rd = fileio->read(blk*(offset_t)blocksize,readBlockSize,buf); + assertex(tempFileIO.get()); + size32_t rd = tempFileIO->read(blk*(offset_t)blocksize,readBlockSize,buf); assertex(rd==readBlockSize); for (;;) { byte b; @@ -246,8 +247,8 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl public: IMPLEMENT_IINTERFACE_USING(CSimpleInterface); - CSmartRowBuffer(CActivityBase *_activity, IFile *_file,size32_t bufsize,IThorRowInterfaces *rowif) - : activity(_activity), file(_file), allocator(rowif->queryRowAllocator()), serializer(rowif->queryRowSerializer()), deserializer(rowif->queryRowDeserializer()) + CSmartRowBuffer(CActivityBase *_activity,IFile *_file,size32_t bufsize,IThorRowInterfaces *rowif) + : activity(_activity), tmpFileOwner(_file, _activity->queryTempFileSizeTracker()), allocator(rowif->queryRowAllocator()), serializer(rowif->queryRowSerializer()), deserializer(rowif->queryRowDeserializer()) { #ifdef _DEBUG putrecheck = false; @@ -282,11 +283,8 @@ class CSmartRowBuffer: public CSimpleInterface, implements ISmartRowBuffer, impl while (out->ordinality()) ReleaseThorRow(out->dequeue()); delete out; - if (fileio) - { - fileio.clear(); - file->remove(); - } + if (tempFileIO) + tempFileIO.clear(); } void putRow(const void *row) @@ -1197,8 +1195,8 @@ bool CRowSet::Release() const class CSharedWriteAheadDisk : public CSharedWriteAheadBase { - Owned spillFile; - Owned spillFileIO; + Owned tempFileOwner; + Owned tempFileIO; CIArrayOf freeChunks; PointerArrayOf freeChunksSized; QueueOf savedChunks; @@ -1420,7 +1418,7 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase } else { - Owned stream = createFileSerialStream(spillFileIO, chunk.offset); + Owned stream = createFileSerialStream(tempFileIO, chunk.offset); #ifdef TRACE_WRITEAHEAD unsigned diskChunkNum; stream->get(sizeof(diskChunkNum), &diskChunkNum); @@ -1484,7 +1482,8 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase mb.append((byte)0); size32_t len = mb.length(); chunk.setown(getOutOffset(len)); // will find space for 'len', might be bigger if from free list - spillFileIO->write(chunk->offset, len, mb.toByteArray()); + tempFileIO->write(chunk->offset, len, mb.toByteArray()); + tempFileOwner->noteSize(highOffset); #ifdef TRACE_WRITEAHEAD ActPrintLogEx(&activity->queryContainer(), thorlog_all, MCdebugProgress, "Flushed chunk = %d (savedChunks pos=%d), writeOffset = %" I64F "d, writeSize = %d", inMemRows->queryChunk(), savedChunks.ordinality(), chunk->offset, len); #endif @@ -1507,16 +1506,15 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase allocator(rowIf->queryRowAllocator()), deserializer(rowIf->queryRowDeserializer()), serializeMeta(meta->querySerializedDiskMeta()) { assertex(spillName); - spillFile.setown(createIFile(spillName)); - spillFile->setShareMode(IFSHnone); - spillFileIO.setown(spillFile->open(IFOcreaterw)); + Owned iFile = createIFile(spillName); + iFile->setShareMode(IFSHnone); + tempFileIO.setown(iFile->open(IFOcreaterw)); + tempFileOwner.setown(new CFileOwner(iFile.getClear(), activity->queryTempFileSizeTracker())); highOffset = 0; } ~CSharedWriteAheadDisk() { - spillFileIO.clear(); - if (spillFile) - spillFile->remove(); + tempFileIO.clear(); for (;;) { @@ -1536,7 +1534,8 @@ class CSharedWriteAheadDisk : public CSharedWriteAheadBase freeChunks.kill(); freeChunksSized.kill(); highOffset = 0; - spillFileIO->setSize(0); + tempFileIO->setSize(0); + tempFileOwner->noteSize(0); } };