Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/candidate-9.2.x' into candidate-…
Browse files Browse the repository at this point in the history
…9.4.x

Signed-off-by: Jake Smith <[email protected]>

# Conflicts:
#	commons-hpcc/pom.xml
#	dfsclient/pom.xml
#	pom.xml
#	wsclient/pom.xml
  • Loading branch information
jakesmith committed Feb 29, 2024
2 parents 5634319 + c3b26b1 commit 71c82a3
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 28 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/publish-snapshots-on-merge.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
- name: Set up Maven Central Repository
uses: actions/setup-java@v3
with:
java-version: '8'
java-version: '11'
distribution: 'adopt'
server-id: ossrh
server-username: MAVEN_USERNAME
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,9 @@ public class RowServiceInputStream extends InputStream implements IProfilable

// Note: The platform may respond with more data than this if records are larger than this limit.
private static final int DEFAULT_MAX_READ_SIZE_KB = 4096;
private static final int PREFETCH_SLEEP_MS = 1;
private static final int SHORT_SLEEP_MS = 1;
private static final int LONG_WAIT_THRESHOLD_US = 100;
private static final int MAX_HOT_LOOP_NS = 10000;

// This is used to prevent the prefetch thread from hot looping when
// the network connection is slow. The read on the socket will block until
Expand Down Expand Up @@ -406,7 +407,11 @@ public RowServiceInputStream(DataPartition dp, FieldDef rd, FieldDef pRd, int co
this.skip(avail);

// Sleep 1ms to prevent hot loop from eating CPU resources
Thread.sleep(1);
try
{
Thread.sleep(SHORT_SLEEP_MS);
}
catch(InterruptedException e) {/*We don't care about waking early*/}
}
}
catch (IOException e)
Expand Down Expand Up @@ -435,27 +440,24 @@ public void run()
{
while (inputStream.isClosed() == false)
{
// If we don't have room in the buffer to fetch more data sleep
if (inputStream.getRemainingBufferCapacity() <= (inputStream.bufferPrefetchThresholdKB * 1024))
inputStream.prefetchData();

// Sleep after each prefetch to prevent hot loop from eating CPU resources
try
{
try
if (CompileTimeConstants.PROFILE_CODE)
{
long sleepTime = System.nanoTime();
Thread.sleep(SHORT_SLEEP_MS);
sleepTime = System.nanoTime() - sleepTime;
sleepTimeNS += sleepTime;
}
else
{
if (CompileTimeConstants.PROFILE_CODE)
{
long sleepTime = System.nanoTime();
Thread.sleep(PREFETCH_SLEEP_MS);
sleepTime = System.nanoTime() - sleepTime;
sleepTimeNS += sleepTime;
}
else
{
Thread.sleep(PREFETCH_SLEEP_MS);
}
Thread.sleep(SHORT_SLEEP_MS);
}
catch(Exception e){}
}

inputStream.prefetchData();
catch(InterruptedException e) {/*We don't care about waking early*/}
}
}
};
Expand Down Expand Up @@ -1340,14 +1342,34 @@ public int read() throws IOException
}

// We are waiting on a single byte so hot loop
long waitNS = 0;
long waitNS = System.nanoTime();
try
{
// Available will throw an exception when it reaches EOS and available bytes == 0
while (this.available() < 1)
{
long currentWaitNS = System.nanoTime() - waitNS;
if (currentWaitNS >= MAX_HOT_LOOP_NS)
{
try
{
if (CompileTimeConstants.PROFILE_CODE)
{
long sleepTime = System.nanoTime();
Thread.sleep(SHORT_SLEEP_MS);
sleepTime = System.nanoTime() - sleepTime;
sleepTimeNS += sleepTime;
}
else
{
Thread.sleep(SHORT_SLEEP_MS);
}
}
catch(InterruptedException e) {/*We don't care about waking early*/}
}
}

if (CompileTimeConstants.PROFILE_CODE)
{
waitNS = System.nanoTime();
while (this.available() < 1) {}
waitNS = System.nanoTime() - waitNS;
waitTimeNS += waitNS;

Expand All @@ -1357,10 +1379,6 @@ public int read() throws IOException
numLongWaits++;
}
}
else
{
while (this.available() < 1) {}
}
}
catch (IOException e)
{
Expand Down

0 comments on commit 71c82a3

Please sign in to comment.