Skip to content

Commit

Permalink
Improve coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
AmatyaAvadhanula committed Oct 18, 2023
1 parent 368312e commit 548c4c0
Showing 1 changed file with 42 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1313,6 +1313,48 @@ public void testSupervisorResetOneTaskSpecificOffsetsWithCheckpoints() throws In
validateSupervisorStateAfterResetOffsets(supervisor, resetOffsets, 0);
}

@Test
public void testGetActiveRealtimeSequencePrefixes() throws IOException
{
EasyMock.expect(spec.isSuspended()).andReturn(false);

replayAll();

final TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();

// Spin off two active tasks with each task serving one partition.
supervisor.getIoConfig().setTaskCount(3);
supervisor.start();
supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("0"),
ImmutableMap.of("0", "5"),
Optional.absent(),
Optional.absent(),
ImmutableSet.of("task1"),
ImmutableSet.of()
);

supervisor.addTaskGroupToActivelyReadingTaskGroup(
supervisor.getTaskGroupIdForPartition("1"),
ImmutableMap.of("1", "6"),
Optional.absent(),
Optional.absent(),
ImmutableSet.of("task2"),
ImmutableSet.of()
);

supervisor.addTaskGroupToPendingCompletionTaskGroup(
supervisor.getTaskGroupIdForPartition("2"),
ImmutableMap.of("2", "100"),
Optional.absent(),
Optional.absent(),
ImmutableSet.of("task3"),
ImmutableSet.of()
);

Assert.assertEquals(3, supervisor.getActiveRealtimeSequencePrefixes().size());
}

@Test
public void testSupervisorResetSpecificOffsetsTasksWithCheckpoints() throws InterruptedException, IOException
{
Expand Down

0 comments on commit 548c4c0

Please sign in to comment.