Skip to content

Commit

Permalink
add another test
Browse files Browse the repository at this point in the history
  • Loading branch information
georgew5656 committed Dec 16, 2024
1 parent ed06f88 commit 93024c7
Showing 1 changed file with 71 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.junit.Test;

import javax.annotation.Nullable;
import java.util.concurrent.Future;

public class StreamSupervisorTest
{
Expand Down Expand Up @@ -100,4 +101,74 @@ public int getActiveTaskGroupsCount()
ex.getMessage()
);
}

@Test
public void testDefaultStopAsync()
{
// Create an instance of stream supervisor without overriding stopAsync().
final StreamSupervisor streamSupervisor = new StreamSupervisor()
{
private SupervisorStateManager.State state = SupervisorStateManager.BasicState.RUNNING;

@Override
public void start()
{

}

@Override
public void stop(boolean stopGracefully)
{
state = SupervisorStateManager.BasicState.STOPPING;
}

@Override
public SupervisorReport getStatus()
{
return null;
}

@Override
public SupervisorStateManager.State getState()
{
return state;
}

@Override
public void reset(@Nullable DataSourceMetadata dataSourceMetadata)
{

}

@Override
public void resetOffsets(DataSourceMetadata resetDataSourceMetadata)
{

}

@Override
public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata)
{

}

@Override
public LagStats computeLagStats()
{
return null;
}

@Override
public int getActiveTaskGroupsCount()
{
return 0;
}
};

Future<Void> stopAsyncFuture = streamSupervisor.stopAsync(true);
Assert.assertTrue(stopAsyncFuture.isDone());

// stop should be called by stopAsync
Assert.assertEquals(SupervisorStateManager.BasicState.STOPPING, streamSupervisor.getState());
}
}

0 comments on commit 93024c7

Please sign in to comment.