diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java index 883cec3f8..c6488a248 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java @@ -850,11 +850,13 @@ public void run() HpccRemoteFileReader fileReader = new HpccRemoteFileReader(readContext, filePart, new HPCCRecordBuilder(recordDef)); fileReader.getInputStream().setReadRequestDelay(readRequestDelay); + long recCount = 0; while (fileReader.hasNext()) { HPCCRecord record = fileReader.next(); - context.getCurrentOperation().recordsRead.incrementAndGet(); + recCount++; } + context.getCurrentOperation().recordsRead.addAndGet(recCount); fileReader.close(); context.getCurrentOperation().bytesRead.addAndGet(fileReader.getStreamPosition()); @@ -900,13 +902,15 @@ public void run() { try { + long recCount = 0; while (fileReader.hasNext()) { splitTable.addRecordPosition(fileReader.getStreamPosition()); HPCCRecord record = fileReader.next(); fileWriter.writeRecord(record); - context.getCurrentOperation().recordsRead.incrementAndGet(); + recCount++; } + context.getCurrentOperation().recordsRead.addAndGet(recCount); splitTable.finish(fileReader.getStreamPosition()); @@ -1026,14 +1030,18 @@ public void run() { for (int k = 0; k < fileReaders.length; k++) { + long recordsRead = 0; + long recordsWritten = 0; HpccRemoteFileReader fileReader = fileReaders[k]; while (fileReader.hasNext()) { HPCCRecord record = fileReader.next(); fileWriter.writeRecord(record); - context.getCurrentOperation().recordsWritten.incrementAndGet(); - context.getCurrentOperation().recordsRead.incrementAndGet(); + recordsRead++; + recordsWritten++; } + context.getCurrentOperation().recordsWritten.addAndGet(recordsWritten); + context.getCurrentOperation().recordsRead.addAndGet(recordsRead); fileReader.close(); context.getCurrentOperation().bytesRead.addAndGet(fileReader.getStreamPosition()); @@ -1185,14 +1193,19 @@ public void run() splitEnd = endingSplit.splitEnd; } + long recordsRead = 0; + long recordsWritten = 0; while (fileReader.hasNext() && fileReader.getStreamPosAfterLastRecord() < splitEnd) { HPCCRecord record = (HPCCRecord) fileReader.getNext(); fileWriter.writeRecord(record); - context.getCurrentOperation().recordsWritten.incrementAndGet(); - context.getCurrentOperation().recordsRead.incrementAndGet(); + recordsRead++; + recordsWritten++; } + context.getCurrentOperation().recordsWritten.addAndGet(recordsWritten); + context.getCurrentOperation().recordsRead.addAndGet(recordsRead); + context.getCurrentOperation().bytesRead.addAndGet(fileReader.getStreamPosAfterLastRecord()); inputStreams[j].close(); }