Skip to content

Commit

Permalink
HPCC4J-644 FileUtilityTest atomic record count false sharing
Browse files Browse the repository at this point in the history
- Fixed false sharing on atomic counter in FileUtility

Signed-off-by: James McMullan [email protected]
  • Loading branch information
jpmcmu committed Sep 16, 2024
1 parent b5f3618 commit 846e7ad
Showing 1 changed file with 19 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -850,11 +850,13 @@ public void run()
HpccRemoteFileReader<HPCCRecord> fileReader = new HpccRemoteFileReader<HPCCRecord>(readContext, filePart, new HPCCRecordBuilder(recordDef));
fileReader.getInputStream().setReadRequestDelay(readRequestDelay);

long recCount = 0;
while (fileReader.hasNext())
{
HPCCRecord record = fileReader.next();
context.getCurrentOperation().recordsRead.incrementAndGet();
recCount++;
}
context.getCurrentOperation().recordsRead.addAndGet(recCount);

fileReader.close();
context.getCurrentOperation().bytesRead.addAndGet(fileReader.getStreamPosition());
Expand Down Expand Up @@ -900,13 +902,15 @@ public void run()
{
try
{
long recCount = 0;
while (fileReader.hasNext())
{
splitTable.addRecordPosition(fileReader.getStreamPosition());
HPCCRecord record = fileReader.next();
fileWriter.writeRecord(record);
context.getCurrentOperation().recordsRead.incrementAndGet();
recCount++;
}
context.getCurrentOperation().recordsRead.addAndGet(recCount);

splitTable.finish(fileReader.getStreamPosition());

Expand Down Expand Up @@ -1026,14 +1030,18 @@ public void run()
{
for (int k = 0; k < fileReaders.length; k++)
{
long recordsRead = 0;
long recordsWritten = 0;
HpccRemoteFileReader<HPCCRecord> fileReader = fileReaders[k];
while (fileReader.hasNext())
{
HPCCRecord record = fileReader.next();
fileWriter.writeRecord(record);
context.getCurrentOperation().recordsWritten.incrementAndGet();
context.getCurrentOperation().recordsRead.incrementAndGet();
recordsRead++;
recordsWritten++;
}
context.getCurrentOperation().recordsWritten.addAndGet(recordsWritten);
context.getCurrentOperation().recordsRead.addAndGet(recordsRead);

fileReader.close();
context.getCurrentOperation().bytesRead.addAndGet(fileReader.getStreamPosition());
Expand Down Expand Up @@ -1185,14 +1193,19 @@ public void run()
splitEnd = endingSplit.splitEnd;
}

long recordsRead = 0;
long recordsWritten = 0;
while (fileReader.hasNext() && fileReader.getStreamPosAfterLastRecord() < splitEnd)
{
HPCCRecord record = (HPCCRecord) fileReader.getNext();
fileWriter.writeRecord(record);
context.getCurrentOperation().recordsWritten.incrementAndGet();
context.getCurrentOperation().recordsRead.incrementAndGet();
recordsRead++;
recordsWritten++;
}

context.getCurrentOperation().recordsWritten.addAndGet(recordsWritten);
context.getCurrentOperation().recordsRead.addAndGet(recordsRead);

context.getCurrentOperation().bytesRead.addAndGet(fileReader.getStreamPosAfterLastRecord());
inputStreams[j].close();
}
Expand Down

0 comments on commit 846e7ad

Please sign in to comment.