From c9b92927a21446002618b560ffa0678b706b63e1 Mon Sep 17 00:00:00 2001 From: James McMullan Date: Mon, 16 Sep 2024 11:39:32 -0400 Subject: [PATCH] HPCC4J-644 FileUtilityTest atomic record count false sharing - Fixed false sharing on atomic counter in FileUtility Signed-off-by: James McMullan James.McMullan@lexisnexis.com --- .../hpccsystems/dfs/client/FileUtility.java | 25 ++++++++++++++----- 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java index c41ae0946..8e87a24ce 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java @@ -843,11 +843,13 @@ public void run() readContext.originalRD = recordDef; HpccRemoteFileReader fileReader = new HpccRemoteFileReader(readContext, filePart, new HPCCRecordBuilder(recordDef)); + long recCount = 0; while (fileReader.hasNext()) { HPCCRecord record = fileReader.next(); - context.getCurrentOperation().recordsRead.incrementAndGet(); + recCount++; } + context.getCurrentOperation().recordsRead.addAndGet(recCount); fileReader.close(); context.getCurrentOperation().bytesRead.addAndGet(fileReader.getStreamPosition()); @@ -893,13 +895,15 @@ public void run() { try { + long recCount = 0; while (fileReader.hasNext()) { splitTable.addRecordPosition(fileReader.getStreamPosition()); HPCCRecord record = fileReader.next(); fileWriter.writeRecord(record); - context.getCurrentOperation().recordsRead.incrementAndGet(); + recCount++; } + context.getCurrentOperation().recordsRead.addAndGet(recCount); splitTable.finish(fileReader.getStreamPosition()); @@ -1019,14 +1023,18 @@ public void run() { for (int k = 0; k < fileReaders.length; k++) { + long recordsRead = 0; + long recordsWritten = 0; HpccRemoteFileReader fileReader = fileReaders[k]; while (fileReader.hasNext()) { HPCCRecord record = fileReader.next(); fileWriter.writeRecord(record); - context.getCurrentOperation().recordsWritten.incrementAndGet(); - context.getCurrentOperation().recordsRead.incrementAndGet(); + recordsRead++; + recordsWritten++; } + context.getCurrentOperation().recordsWritten.addAndGet(recordsWritten); + context.getCurrentOperation().recordsRead.addAndGet(recordsRead); fileReader.close(); context.getCurrentOperation().bytesRead.addAndGet(fileReader.getStreamPosition()); @@ -1178,14 +1186,19 @@ public void run() splitEnd = endingSplit.splitEnd; } + long recordsRead = 0; + long recordsWritten = 0; while (fileReader.hasNext() && fileReader.getStreamPosAfterLastRecord() < splitEnd) { HPCCRecord record = (HPCCRecord) fileReader.getNext(); fileWriter.writeRecord(record); - context.getCurrentOperation().recordsWritten.incrementAndGet(); - context.getCurrentOperation().recordsRead.incrementAndGet(); + recordsRead++; + recordsWritten++; } + context.getCurrentOperation().recordsWritten.addAndGet(recordsWritten); + context.getCurrentOperation().recordsRead.addAndGet(recordsRead); + context.getCurrentOperation().bytesRead.addAndGet(fileReader.getStreamPosAfterLastRecord()); inputStreams[j].close(); }