Skip to content

Commit

Permalink
HPCC4J-556: File Reader Early Close Behavior
Browse files Browse the repository at this point in the history
- Modified HPCCRemoteFileReader to enforce consistent early close behavior
- Added unit test to ensure consistent behavior on early close
- Minor changes to exception handling

Signed-off-by: James McMullan [email protected]
  • Loading branch information
jpmcmu committed Nov 16, 2023
1 parent 30c2e6a commit 0d2459b
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class HpccRemoteFileReader<T> implements Iterator<T>
private BinaryRecordReader binaryRecordReader;
private IRecordBuilder recordBuilder = null;
private boolean handlePrefetch = true;
private boolean isClosed = false;
private boolean canReadNext = true;
private long openTimeMs = 0;
private long recordsRead = 0;

Expand Down Expand Up @@ -234,7 +236,6 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde
this.binaryRecordReader.initialize(this.recordBuilder);
}


log.info("HPCCRemoteFileReader: Opening file part: " + dataPartition.getThisPart()
+ (resumeInfo != null ? " resume position: " + resumeInfo.inputStreamPos : "" ));
log.trace("Original record definition:\n"
Expand Down Expand Up @@ -315,12 +316,18 @@ public String getRemoteReadMessages()
*/
public void prefetch()
{
if (this.handlePrefetch)
if (handlePrefetch)
{
log.warn("Prefetch called on an HpccRemoteFileReader that has an internal prefetch thread.");
return;
}

if (isClosed)
{
log.warn("Prefetch called on an HpccRemoteFileReader that has been closed.");
return;
}

this.inputStream.prefetchData();
}

Expand All @@ -332,10 +339,19 @@ public void prefetch()
@Override
public boolean hasNext()
{
boolean rslt = false;
if (isClosed)
{
log.warn("hasNext() called on an HpccRemoteFileReader that has been closed.");
return false;
}

// Keep track of whether we have said there is another record.
// This allows us to handle edge cases around close() being called between hasNext() and next()
canReadNext = false;

try
{
rslt = this.binaryRecordReader.hasNext();
canReadNext = this.binaryRecordReader.hasNext();

// Has next may not catch the prefetch exception if it occurs at the beginning of a read
// This is due to InputStream.hasNext() being allowed to throw an IOException when closed.
Expand All @@ -346,12 +362,14 @@ public boolean hasNext()
}
catch (HpccFileException e)
{
rslt = false;
canReadNext = false;
log.error("Read failure for " + this.dataPartition.toString());
throw new java.util.NoSuchElementException("Fatal read error: " + e.getMessage());
java.util.NoSuchElementException exception = new java.util.NoSuchElementException("Fatal read error: " + e.getMessage());
exception.initCause(e);
throw exception;
}

return rslt;
return canReadNext;
}

/**
Expand All @@ -362,6 +380,11 @@ public boolean hasNext()
@Override
public T next()
{
if (isClosed && !canReadNext)
{
throw new java.util.NoSuchElementException("Fatal read error: Attempting to read next() from a closed file reader.");
}

Object rslt = null;
try
{
Expand All @@ -370,10 +393,16 @@ public T next()
catch (HpccFileException e)
{
log.error("Read failure for " + this.dataPartition.toString() + " " + e.getMessage());
throw new java.util.NoSuchElementException("Fatal read error: " + e.getMessage());
java.util.NoSuchElementException exception = new java.util.NoSuchElementException("Fatal read error: " + e.getMessage());
exception.initCause(e);
throw exception;
}

recordsRead++;

// Reset this after each read so we can handle edge cases where close() was called between hasNext() / next()
canReadNext = false;

return (T) rslt;
}

Expand All @@ -385,8 +414,14 @@ public T next()
*/
public void close() throws Exception
{
if (isClosed)
{
return;
}

report();
this.inputStream.close();
isClosed = true;

long closeTimeMs = System.currentTimeMillis();
double readTimeS = (closeTimeMs - openTimeMs) / 1000.0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1179,6 +1179,77 @@ public void invalidSignatureTest()
}
}

@Test
public void earlyCloseTest() throws Exception
{
HPCCFile file = new HPCCFile(datasets[0], connString , hpccUser, hpccPass);

DataPartition[] fileParts = file.getFileParts();
if (fileParts == null || fileParts.length == 0)
{
Assert.fail("No file parts found");
}

FieldDef originalRD = file.getRecordDefinition();
if (originalRD == null || originalRD.getNumDefs() == 0)
{
Assert.fail("Invalid or null record definition");
}

{
HPCCRecordBuilder recordBuilder = new HPCCRecordBuilder(file.getProjectedRecordDefinition());
HpccRemoteFileReader<HPCCRecord> fileReader = new HpccRemoteFileReader<HPCCRecord>(fileParts[0], originalRD, recordBuilder);

int expectedRecordCounts = 10;
int numRecords = 0;
while (fileReader.hasNext())
{
try
{
fileReader.next();
numRecords++;
}
catch (Exception e)
{
System.out.println("Error: " + e.getMessage());
}

if (numRecords == expectedRecordCounts)
{
fileReader.close();
}
}
assertTrue("Expected record count: " + expectedRecordCounts + " Actual count: " + numRecords, numRecords == expectedRecordCounts);
}

// Check that calling close() inbetween hasNext() & next() allows the current record to be read
{
HPCCRecordBuilder recordBuilder = new HPCCRecordBuilder(file.getProjectedRecordDefinition());
HpccRemoteFileReader<HPCCRecord> fileReader = new HpccRemoteFileReader<HPCCRecord>(fileParts[0], originalRD, recordBuilder);

int expectedRecordCounts = 11;
int numRecords = 0;
while (fileReader.hasNext())
{
if (numRecords == expectedRecordCounts-1)
{
fileReader.close();
}

try
{
fileReader.next();
numRecords++;
}
catch (Exception e)
{
System.out.println("Error: " + e.getMessage());
}
}
assertTrue("Expected record count: " + expectedRecordCounts + " Actual count: " + numRecords, numRecords == expectedRecordCounts);
}
}

public List<HPCCRecord> readFile(HPCCFile file, Integer connectTimeoutMillis, boolean shouldForceTimeout) throws Exception
{
return readFile(file, connectTimeoutMillis, shouldForceTimeout, false, BinaryRecordReader.NO_STRING_PROCESSING);
Expand Down

0 comments on commit 0d2459b

Please sign in to comment.