Skip to content

Commit

Permalink
HPCC4J-577 Added Read Retry to HPCCRemoteFileReader
Browse files Browse the repository at this point in the history
- Added Read Retry to HPCCRemoteFileReader
- Minor improvements to FileUtility to improve testing

Signed-off-by: James McMullan [email protected]
  • Loading branch information
jpmcmu committed May 3, 2024
1 parent cd1e704 commit 212831e
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class FileUtility
private static final int DEFAULT_SPLIT_TABLE_SIZE = 128;

private static final int NUM_DEFAULT_THREADS = 4;
static private final int DEFAULT_ACCESS_EXPIRY_SECONDS = 120;

private static class TaskContext
{
Expand Down Expand Up @@ -425,6 +426,7 @@ private static Options getReadTestOptions()
options.addOption("user", true, "Specifies the username used to connect. Defaults to null.");
options.addOption("pass", true, "Specifies the password used to connect. Defaults to null.");
options.addOption("num_threads", true, "Specifies the number of parallel to use to perform operations.");
options.addOption("access_expiry_seconds", true, "Access token expiration seconds.");

options.addOption(Option.builder("file_parts")
.argName("_file_parts")
Expand Down Expand Up @@ -633,11 +635,6 @@ private static String[] filterFilesByFormat(String[] srcFiles, FileFormat format

private static void executeTasks(Runnable[] tasks, int numThreads) throws Exception
{
if (tasks.length > numThreads)
{
numThreads = tasks.length;
}

int numTasksPerThread = tasks.length / numThreads;
int numResidualTasks = tasks.length % numThreads;

Expand Down Expand Up @@ -686,16 +683,15 @@ private static Runnable[] createReadTestTasks(DataPartition[] fileParts, FieldDe
{
final int taskIndex = i;
final DataPartition filePart = fileParts[taskIndex];
final HpccRemoteFileReader<HPCCRecord> filePartReader = new HpccRemoteFileReader<HPCCRecord>(filePart, recordDef, new HPCCRecordBuilder(recordDef));

tasks[taskIndex] = new Runnable()
{
HpccRemoteFileReader<HPCCRecord> fileReader = filePartReader;

public void run()
{
try
{
HpccRemoteFileReader<HPCCRecord> fileReader = new HpccRemoteFileReader<HPCCRecord>(filePart, recordDef, new HPCCRecordBuilder(recordDef));

while (fileReader.hasNext())
{
HPCCRecord record = fileReader.next();
Expand Down Expand Up @@ -1250,6 +1246,18 @@ private static void performReadTest(String[] args, TaskContext context)
+ numThreadsStr + ", must be an integer. Defaulting to: " + NUM_DEFAULT_THREADS + " threads.");
}

int expirySeconds = DEFAULT_ACCESS_EXPIRY_SECONDS;
String expirySecondsStr = cmd.getOptionValue("access_expiry_seconds", "" + expirySeconds);
try
{
expirySeconds = Integer.parseInt(expirySecondsStr);
}
catch(Exception e)
{
System.out.println("Invalid option value for access_expiry_seconds: "
+ numThreadsStr + ", must be an integer. Defaulting to: " + DEFAULT_ACCESS_EXPIRY_SECONDS + "s.");
}

String formatStr = cmd.getOptionValue("format");
if (formatStr == null)
{
Expand Down Expand Up @@ -1277,6 +1285,7 @@ private static void performReadTest(String[] args, TaskContext context)
try
{
file = new HPCCFile(datasetName, connString, user, pass);
file.setFileAccessExpirySecs(expirySeconds);
}
catch (Exception e)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,20 @@ public class HpccRemoteFileReader<T> implements Iterator<T>
private boolean handlePrefetch = true;
private boolean isClosed = false;
private boolean canReadNext = true;
private boolean createPrefetchThread = true;
private int retryCount = 0;
private int connectTimeout = 0;
private int readSizeKB = 0;
private int limit = -1;
private int maxReadRetries = DEFAULT_READ_RETRIES;
private int socketOpTimeoutMs = 0;
private long openTimeMs = 0;
private long recordsRead = 0;

public static final int NO_RECORD_LIMIT = -1;
public static final int DEFAULT_READ_SIZE_OPTION = -1;
public static final int DEFAULT_CONNECT_TIMEOUT_OPTION = -1;
public static final int DEFAULT_READ_RETRIES = 3;

public static class FileReadResumeInfo
{
Expand Down Expand Up @@ -189,18 +197,23 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde
{
this.handlePrefetch = createPrefetchThread;
this.originalRecordDef = originalRD;
if (this.originalRecordDef == null)
{
throw new Exception("HpccRemoteFileReader: Original record definition is null.");
}
this.dataPartition = dp;
this.recordBuilder = recBuilder;
this.readSizeKB = readSizeKB;
this.limit = limit;
this.createPrefetchThread = createPrefetchThread;
this.socketOpTimeoutMs = socketOpTimeoutMs;

if (connectTimeout < 1)
{
connectTimeout = RowServiceInputStream.DEFAULT_CONNECT_TIMEOUT_MILIS;
}
this.connectTimeout = connectTimeout;

this.dataPartition = dp;
this.recordBuilder = recBuilder;
if (this.originalRecordDef == null)
{
throw new Exception("HpccRemoteFileReader: Provided original record definition is null, original record definition is required.");
}

FieldDef projectedRecordDefinition = recBuilder.getRecordDefinition();
if (projectedRecordDefinition == null)
Expand Down Expand Up @@ -246,6 +259,61 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde
openTimeMs = System.currentTimeMillis();
}

private boolean retryRead()
{
if (retryCount < maxReadRetries)
{
log.info("Retrying read for " + this.dataPartition.toString() + " retry count: " + retryCount);
retryCount++;

FileReadResumeInfo resumeInfo = getFileReadResumeInfo();
RowServiceInputStream.RestartInformation restartInfo = new RowServiceInputStream.RestartInformation();
restartInfo.streamPos = resumeInfo.inputStreamPos;
restartInfo.tokenBin = resumeInfo.tokenBin;

try
{
this.inputStream.close();
}
catch (Exception e) {}

try
{
this.inputStream = new RowServiceInputStream(this.dataPartition, this.originalRecordDef,
this.recordBuilder.getRecordDefinition(), this.connectTimeout, this.limit, this.createPrefetchThread,
this.readSizeKB, restartInfo, false, this.socketOpTimeoutMs);
long bytesToSkip = resumeInfo.recordReaderStreamPos - resumeInfo.inputStreamPos;
if (bytesToSkip < 0)
{
throw new Exception("Unable to restart read stream, unexpected stream position in record reader.");
}
this.inputStream.skip(bytesToSkip);

this.binaryRecordReader = new BinaryRecordReader(this.inputStream, resumeInfo.recordReaderStreamPos);
this.binaryRecordReader.initialize(this.recordBuilder);
}
catch (Exception e)
{
log.error("Failed to retry read for " + this.dataPartition.toString() + " " + e.getMessage(), e);
return false;
}

return true;
}

return false;
}

/**
* Sets the maximum number of times to retry a read operation before failing.
*
* @param maxReadRetries maximum number of read retries
*/
public void setMaxReadRetries(int maxReadRetries)
{
this.maxReadRetries = maxReadRetries;
}

/**
* Returns the stream position within the file.
*
Expand Down Expand Up @@ -363,11 +431,16 @@ public boolean hasNext()
}
catch (HpccFileException e)
{
canReadNext = false;
log.error("Read failure for " + this.dataPartition.toString());
java.util.NoSuchElementException exception = new java.util.NoSuchElementException("Fatal read error: " + e.getMessage());
exception.initCause(e);
throw exception;
if (!retryRead())
{
canReadNext = false;
log.error("Read failure for " + this.dataPartition.toString(), e);
java.util.NoSuchElementException exception = new java.util.NoSuchElementException("Fatal read error: " + e.getMessage());
exception.initCause(e);
throw exception;
}

return hasNext();
}

return canReadNext;
Expand All @@ -393,10 +466,15 @@ public T next()
}
catch (HpccFileException e)
{
log.error("Read failure for " + this.dataPartition.toString() + " " + e.getMessage());
java.util.NoSuchElementException exception = new java.util.NoSuchElementException("Fatal read error: " + e.getMessage());
exception.initCause(e);
throw exception;
if (!retryRead())
{
log.error("Read failure for " + this.dataPartition.toString() + " " + e.getMessage(), e);
java.util.NoSuchElementException exception = new java.util.NoSuchElementException("Fatal read error: " + e.getMessage());
exception.initCause(e);
throw exception;
}

return next();
}

recordsRead++;
Expand Down

0 comments on commit 212831e

Please sign in to comment.