Skip to content

Commit

Permalink
Fix stuck test.
Browse files Browse the repository at this point in the history
  • Loading branch information
gianm committed Jul 30, 2024
1 parent ac49b8e commit ff34ede
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -128,6 +129,11 @@ public class WorkerImpl implements Worker
*/
private final ConcurrentHashMap<IntObjectPair<StageId>, CounterTracker> stageCounters = new ConcurrentHashMap<>();

/**
* Atomic that is set to true when {@link #run()} starts (or when {@link #stop()} is called before {@link #run()}).
*/
private final AtomicBoolean didRun = new AtomicBoolean();

/**
* Future that resolves when {@link #run()} completes.
*/
Expand Down Expand Up @@ -165,6 +171,10 @@ public String id()
@Override
public void run()
{
if (!didRun.compareAndSet(false, true)) {
throw new ISE("already run");
}

try (final Closer closer = Closer.create()) {
final KernelHolders kernelHolders = KernelHolders.create(context, closer);
controllerClient = kernelHolders.getControllerClient();
Expand Down Expand Up @@ -526,7 +536,13 @@ public void stop()
{
// stopGracefully() is called when the containing process is terminated, or when the task is canceled.
log.info("Worker id[%s] canceled.", context.workerId());
doCancel();

if (didRun.compareAndSet(false, true)) {
// run() hasn't been called yet. Set runFuture so awaitStop() still works.
runFuture.set(null);
} else {
doCancel();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,6 @@ public ListenableFuture<Void> cancelTask(String workerId)
final Worker worker = inMemoryWorkers.remove(workerId);
if (worker != null) {
worker.stop();
worker.awaitStop();
}
return Futures.immediateFuture(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,6 @@ public ListenableFuture<Boolean> fetchChannelData(
@Override
public void close()
{
inMemoryWorkers.forEach((k, v) -> {
v.stop();
v.awaitStop();
});
inMemoryWorkers.forEach((k, v) -> v.stop());
}
}

0 comments on commit ff34ede

Please sign in to comment.