From f6ab57de9ba4304e5844096d13f6d3d23bfcf960 Mon Sep 17 00:00:00 2001 From: Zachary Mulgrew Date: Tue, 12 Mar 2024 13:25:01 -0700 Subject: [PATCH] Fix broken implementation of ParTaskImpl::tasksFromIterable (#347) The `tasksFromIterable` method in `ParTaskImpl` needs to read the contents of an iterator and then create an array of the read tasks. This is an unfortunate performance/memory tradeoff of using these two things: iterators don't have a length and arrays need one. The implementation reads the iterator's contents into a collection and then turns that collection into an array. The problem is, it then tries to cast that array of `Object` into a more narrow type, which is not allowed. This _always_ results in a ClassCastException if the value is non-null. The existing tests did not cover this case. The code has been fixed to take a less performant path and delegate the array creation to `tasksFromCollection`. A test has been written for the `Iterable` code path and verified through coverage analysis. 
--- .../java/com/linkedin/parseq/ParTaskImpl.java | 2 +- .../java/com/linkedin/parseq/TestParTask.java | 50 ++++++++++++++++++- 2 files changed, 49 insertions(+), 3 deletions(-) diff --git a/subprojects/parseq/src/main/java/com/linkedin/parseq/ParTaskImpl.java b/subprojects/parseq/src/main/java/com/linkedin/parseq/ParTaskImpl.java index 4b1c468e..24ce340e 100644 --- a/subprojects/parseq/src/main/java/com/linkedin/parseq/ParTaskImpl.java +++ b/subprojects/parseq/src/main/java/com/linkedin/parseq/ParTaskImpl.java @@ -73,7 +73,7 @@ private Task>[] tasksFromIterable(Iterable coercedTask = (Task) task; taskList.add(coercedTask); } - return (Task>[]) taskList.toArray(); + return tasksFromCollection(taskList); } @SuppressWarnings("unchecked") diff --git a/subprojects/parseq/src/test/java/com/linkedin/parseq/TestParTask.java b/subprojects/parseq/src/test/java/com/linkedin/parseq/TestParTask.java index 725d5599..88c37e5b 100644 --- a/subprojects/parseq/src/test/java/com/linkedin/parseq/TestParTask.java +++ b/subprojects/parseq/src/test/java/com/linkedin/parseq/TestParTask.java @@ -8,7 +8,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Iterator; import java.util.List; +import java.util.NoSuchElementException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -25,6 +27,25 @@ * @author Chris Pettitt */ public class TestParTask extends BaseEngineTest { + /** + * A helper to create an {@link Iterable} that does not also implement java.util.Collection. 
+ * + * @param tasks an array of tasks to return from the iterable + * @return an iterable over provided array of tasks + * @param the type of task + */ + private static Iterable> asIterable(final Task[] tasks) { + return () -> new Iterator>() { + int current = 0; + public boolean hasNext() { return current < tasks.length; } + public Task next() { + if (!hasNext()) { throw new NoSuchElementException(); } + return tasks[current++]; + } + public void remove() { throw new IllegalStateException("Not implemented for tests."); } + }; + } + @Test public void testIterableParWithEmptyList() { try { @@ -48,6 +69,31 @@ public void testIterableParWithSingletonList() throws InterruptedException { assertEquals(valueStr, task.get()); } + @Test + public void testCollectionSeqWithMultipleElements() throws InterruptedException { + final int iters = 500; + + final Task[] tasks = new BaseTask[iters]; + final AtomicInteger counter = new AtomicInteger(0); + for (int i = 0; i < iters; i++) { + tasks[i] = Task.action("task-" + i, () -> { + // Note: We intentionally do not use CAS. We guarantee that + // the run method of Tasks are never executed in parallel. + final int currentCount = counter.get(); + counter.set(currentCount + 1); + } ); + } + + final ParTask par = par(Arrays.asList(tasks)); // The returned object implements Collection. 
+ + runAndWait("TestParTask.testIterableSeqWithMultipleElements", par); + + assertEquals(500, par.getSuccessful().size()); + assertEquals(500, par.getTasks().size()); + assertEquals(500, par.get().size()); + assertEquals(500, counter.get()); + } + @Test public void testIterableSeqWithMultipleElements() throws InterruptedException { final int iters = 500; @@ -63,7 +109,7 @@ public void testIterableSeqWithMultipleElements() throws InterruptedException { } ); } - final ParTask par = par(Arrays.asList(tasks)); + final ParTask par = par(asIterable(tasks)); runAndWait("TestParTask.testIterableSeqWithMultipleElements", par); @@ -77,7 +123,7 @@ public void testIterableSeqWithMultipleElements() throws InterruptedException { @Test public void testAsyncTasksInPar() throws InterruptedException { // Tasks cannot have their run methods invoked at the same time, however - // asynchronous tasks are allowed to execute concurrently outside of their + // asynchronous tasks are allowed to execute concurrently outside their // run methods. This test verifies that two asynchronous tasks are not // serialized such that one must complete before the other.