From ed06f88ee990c1eb2c614af918ae253666786fbc Mon Sep 17 00:00:00 2001 From: George Wu Date: Fri, 13 Dec 2024 15:39:57 -0500 Subject: [PATCH] add test --- .../SeekableStreamSupervisorStateTest.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index af66ce3b8b97..175089c2ca2a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -1000,6 +1000,28 @@ public void testStopping() verifyAll(); } + @Test + public void testStopGracefully() throws Exception + { + EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); + EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes(); + + taskRunner.unregisterListener("testSupervisorId"); + indexTaskClient.close(); + recordSupplier.close(); + + replayAll(); + SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(); + + supervisor.start(); + supervisor.runInternal(); + ListenableFuture stopFuture = supervisor.stopAsync(false); + stopFuture.get(); + verifyAll(); + } + @Test public void testStoppingGracefully() {