Skip to content

Commit

Permalink
Consider only supervisors with append lock for concurrent transaction…
Browse files Browse the repository at this point in the history
…al replace (apache#15220)

A SegmentTransactionReplaceAction must only update the mapping of tasks with append locks that are running concurrently. To ensure this, we return the supervisor id only if it has the taskLockType as APPEND in its context.
  • Loading branch information
AmatyaAvadhanula authored Oct 22, 2023
1 parent fbbb9c7 commit 33fdd77
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,9 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
private void tryUpgradeOverlappingPendingSegments(Task task, TaskActionToolbox toolbox)
{
final SupervisorManager supervisorManager = toolbox.getSupervisorManager();
final Optional<String> activeSupervisorId = supervisorManager.getActiveSupervisorIdForDatasource(task.getDataSource());
if (!activeSupervisorId.isPresent()) {
final Optional<String> activeSupervisorIdWithAppendLock =
supervisorManager.getActiveSupervisorIdForDatasourceWithAppendLock(task.getDataSource());
if (!activeSupervisorIdWithAppendLock.isPresent()) {
return;
}

Expand All @@ -153,7 +154,11 @@ private void tryUpgradeOverlappingPendingSegments(Task task, TaskActionToolbox t

upgradedPendingSegments.forEach(
(oldId, newId) -> toolbox.getSupervisorManager()
.registerNewVersionOfPendingSegmentOnSupervisor(activeSupervisorId.get(), oldId, newId)
.registerNewVersionOfPendingSegmentOnSupervisor(
activeSupervisorIdWithAppendLock.get(),
oldId,
newId
)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,18 @@
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.MetadataSupervisorManager;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;

Expand Down Expand Up @@ -71,15 +75,34 @@ public Set<String> getSupervisorIds()
return supervisors.keySet();
}

public Optional<String> getActiveSupervisorIdForDatasource(String datasource)
/**
* @param datasource Datasource to find active supervisor id with append lock for.
* @return An optional with the active appending supervisor id if it exists.
*/
public Optional<String> getActiveSupervisorIdForDatasourceWithAppendLock(String datasource)
{
for (Map.Entry<String, Pair<Supervisor, SupervisorSpec>> entry : supervisors.entrySet()) {
final String supervisorId = entry.getKey();
final Supervisor supervisor = entry.getValue().lhs;
final SupervisorSpec supervisorSpec = entry.getValue().rhs;

TaskLockType taskLockType = null;
if (supervisorSpec instanceof SeekableStreamSupervisorSpec) {
SeekableStreamSupervisorSpec seekableStreamSupervisorSpec = (SeekableStreamSupervisorSpec) supervisorSpec;
Map<String, Object> context = seekableStreamSupervisorSpec.getContext();
if (context != null) {
taskLockType = QueryContexts.getAsEnum(
Tasks.TASK_LOCK_TYPE,
context.get(Tasks.TASK_LOCK_TYPE),
TaskLockType.class
);
}
}

if (supervisor instanceof SeekableStreamSupervisor
&& !supervisorSpec.isSuspended()
&& supervisorSpec.getDataSources().contains(datasource)) {
&& supervisorSpec.getDataSources().contains(datasource)
&& TaskLockType.APPEND.equals(taskLockType)) {
return Optional.of(supervisorId);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,13 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.metadata.MetadataSupervisorManager;
import org.easymock.Capture;
Expand Down Expand Up @@ -434,6 +438,69 @@ public void testCreateSuspendResumeAndStopSupervisor()
Assert.assertTrue(manager.getSupervisorIds().isEmpty());
}

@Test
public void testGetActiveSupervisorIdForDatasourceWithAppendLock()
{
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(Collections.emptyMap());

NoopSupervisorSpec noopSupervisorSpec = new NoopSupervisorSpec("noop", ImmutableList.of("noopDS"));
metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject());

SeekableStreamSupervisorSpec suspendedSpec = EasyMock.mock(SeekableStreamSupervisorSpec.class);
Supervisor suspendedSupervisor = EasyMock.mock(SeekableStreamSupervisor.class);
EasyMock.expect(suspendedSpec.getId()).andReturn("suspendedSpec").anyTimes();
EasyMock.expect(suspendedSpec.isSuspended()).andReturn(true).anyTimes();
EasyMock.expect(suspendedSpec.getDataSources()).andReturn(ImmutableList.of("suspendedDS")).anyTimes();
EasyMock.expect(suspendedSpec.createSupervisor()).andReturn(suspendedSupervisor).anyTimes();
EasyMock.expect(suspendedSpec.createAutoscaler(suspendedSupervisor)).andReturn(null).anyTimes();
EasyMock.expect(suspendedSpec.getContext()).andReturn(null).anyTimes();
EasyMock.replay(suspendedSpec);
metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject());

SeekableStreamSupervisorSpec activeSpec = EasyMock.mock(SeekableStreamSupervisorSpec.class);
Supervisor activeSupervisor = EasyMock.mock(SeekableStreamSupervisor.class);
EasyMock.expect(activeSpec.getId()).andReturn("activeSpec").anyTimes();
EasyMock.expect(activeSpec.isSuspended()).andReturn(false).anyTimes();
EasyMock.expect(activeSpec.getDataSources()).andReturn(ImmutableList.of("activeDS")).anyTimes();
EasyMock.expect(activeSpec.createSupervisor()).andReturn(activeSupervisor).anyTimes();
EasyMock.expect(activeSpec.createAutoscaler(activeSupervisor)).andReturn(null).anyTimes();
EasyMock.expect(activeSpec.getContext()).andReturn(null).anyTimes();
EasyMock.replay(activeSpec);
metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject());

SeekableStreamSupervisorSpec activeAppendSpec = EasyMock.mock(SeekableStreamSupervisorSpec.class);
Supervisor activeAppendSupervisor = EasyMock.mock(SeekableStreamSupervisor.class);
EasyMock.expect(activeAppendSpec.getId()).andReturn("activeAppendSpec").anyTimes();
EasyMock.expect(activeAppendSpec.isSuspended()).andReturn(false).anyTimes();
EasyMock.expect(activeAppendSpec.getDataSources()).andReturn(ImmutableList.of("activeAppendDS")).anyTimes();
EasyMock.expect(activeAppendSpec.createSupervisor()).andReturn(activeAppendSupervisor).anyTimes();
EasyMock.expect(activeAppendSpec.createAutoscaler(activeAppendSupervisor)).andReturn(null).anyTimes();
EasyMock.expect(activeAppendSpec.getContext()).andReturn(ImmutableMap.of(
Tasks.TASK_LOCK_TYPE,
TaskLockType.APPEND.name()
)).anyTimes();
EasyMock.replay(activeAppendSpec);
metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject());

replayAll();
manager.start();

Assert.assertFalse(manager.getActiveSupervisorIdForDatasourceWithAppendLock("nonExistent").isPresent());

manager.createOrUpdateAndStartSupervisor(noopSupervisorSpec);
Assert.assertFalse(manager.getActiveSupervisorIdForDatasourceWithAppendLock("noopDS").isPresent());

manager.createOrUpdateAndStartSupervisor(suspendedSpec);
Assert.assertFalse(manager.getActiveSupervisorIdForDatasourceWithAppendLock("suspendedDS").isPresent());

manager.createOrUpdateAndStartSupervisor(activeSpec);
Assert.assertFalse(manager.getActiveSupervisorIdForDatasourceWithAppendLock("activeDS").isPresent());

manager.createOrUpdateAndStartSupervisor(activeAppendSpec);
Assert.assertTrue(manager.getActiveSupervisorIdForDatasourceWithAppendLock("activeAppendDS").isPresent());

verifyAll();
}

private static class TestSupervisorSpec implements SupervisorSpec
{
Expand Down

0 comments on commit 33fdd77

Please sign in to comment.