Skip to content

Commit

Permalink
HPCC4J-579 prefetch thread hot loop fix
Browse files Browse the repository at this point in the history
- Changed prefetch thread behavior to short sleep after each request
- Added short sleep on main thread if blocked by prefetch thread for more than 10us

Signed-off-by: James McMullan [email protected]
  • Loading branch information
jpmcmu committed Feb 28, 2024
1 parent 4b47a56 commit 3818c6a
Showing 1 changed file with 45 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,9 @@ public class RowServiceInputStream extends InputStream implements IProfilable

// Note: The platform may respond with more data than this if records are larger than this limit.
private static final int DEFAULT_MAX_READ_SIZE_KB = 4096;
private static final int PREFETCH_SLEEP_MS = 1;
private static final int SHORT_SLEEP_MS = 1;
private static final int LONG_WAIT_THRESHOLD_US = 100;
private static final int MAX_HOT_LOOP_NS = 10000;

// This is used to prevent the prefetch thread from hot looping when
// the network connection is slow. The read on the socket will block until
Expand Down Expand Up @@ -406,7 +407,11 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co
this.skip(avail);

// Sleep 1ms to prevent hot loop from eating CPU resources
Thread.sleep(1);
try
{
Thread.sleep(SHORT_SLEEP_MS);
}
catch(InterruptedException e) {/*We don't care about waking early*/}
}
}
catch (IOException e)
Expand Down Expand Up @@ -435,27 +440,24 @@ public void run()
{
while (inputStream.isClosed() == false)
{
// If we don't have room in the buffer to fetch more data sleep
if (inputStream.getRemainingBufferCapacity() <= (inputStream.bufferPrefetchThresholdKB * 1024))
inputStream.prefetchData();

// Sleep after each prefetch to prevent hot loop from eating CPU resources
try
{
try
if (CompileTimeConstants.PROFILE_CODE)
{
long sleepTime = System.nanoTime();
Thread.sleep(SHORT_SLEEP_MS);
sleepTime = System.nanoTime() - sleepTime;
sleepTimeNS += sleepTime;
}
else
{
if (CompileTimeConstants.PROFILE_CODE)
{
long sleepTime = System.nanoTime();
Thread.sleep(PREFETCH_SLEEP_MS);
sleepTime = System.nanoTime() - sleepTime;
sleepTimeNS += sleepTime;
}
else
{
Thread.sleep(PREFETCH_SLEEP_MS);
}
Thread.sleep(SHORT_SLEEP_MS);
}
catch(Exception e){}
}

inputStream.prefetchData();
catch(InterruptedException e) {/*We don't care about waking early*/}
}
}
};
Expand Down Expand Up @@ -1335,14 +1337,34 @@ public int read() throws IOException
}

// We are waiting on a single byte so hot loop
long waitNS = 0;
long waitNS = System.nanoTime();
try
{
// Available will throw an exception when it reaches EOS and available bytes == 0
while (this.available() < 1)
{
long currentWaitNS = System.nanoTime() - waitNS;
if (currentWaitNS >= MAX_HOT_LOOP_NS)
{
try
{
if (CompileTimeConstants.PROFILE_CODE)
{
long sleepTime = System.nanoTime();
Thread.sleep(SHORT_SLEEP_MS);
sleepTime = System.nanoTime() - sleepTime;
sleepTimeNS += sleepTime;
}
else
{
Thread.sleep(SHORT_SLEEP_MS);
}
}
catch(InterruptedException e) {/*We don't care about waking early*/}
}
}

if (CompileTimeConstants.PROFILE_CODE)
{
waitNS = System.nanoTime();
while (this.available() < 1) {}
waitNS = System.nanoTime() - waitNS;
waitTimeNS += waitNS;

Expand All @@ -1352,10 +1374,6 @@ public int read() throws IOException
numLongWaits++;
}
}
else
{
while (this.available() < 1) {}
}
}
catch (IOException e)
{
Expand Down

0 comments on commit 3818c6a

Please sign in to comment.