diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java index 5a2b3ceec8f6..e6ad0426e466 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java @@ -139,8 +139,9 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) private void tryUpgradeOverlappingPendingSegments(Task task, TaskActionToolbox toolbox) { final SupervisorManager supervisorManager = toolbox.getSupervisorManager(); - final Optional activeSupervisorId = supervisorManager.getActiveSupervisorIdForDatasource(task.getDataSource()); - if (!activeSupervisorId.isPresent()) { + final Optional activeSupervisorIdWithAppendLock = + supervisorManager.getActiveSupervisorIdForDatasourceWithAppendLock(task.getDataSource()); + if (!activeSupervisorIdWithAppendLock.isPresent()) { return; } @@ -153,7 +154,11 @@ private void tryUpgradeOverlappingPendingSegments(Task task, TaskActionToolbox t upgradedPendingSegments.forEach( (oldId, newId) -> toolbox.getSupervisorManager() - .registerNewVersionOfPendingSegmentOnSupervisor(activeSupervisorId.get(), oldId, newId) + .registerNewVersionOfPendingSegmentOnSupervisor( + activeSupervisorIdWithAppendLock.get(), + oldId, + newId + ) ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index d55f3cc8bd0c..df454c1011a8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -22,14 +22,18 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.inject.Inject; +import org.apache.druid.indexing.common.TaskLockType; +import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.metadata.MetadataSupervisorManager; +import org.apache.druid.query.QueryContexts; import org.apache.druid.segment.incremental.ParseExceptionReport; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; @@ -71,15 +75,34 @@ public Set getSupervisorIds() return supervisors.keySet(); } - public Optional getActiveSupervisorIdForDatasource(String datasource) + /** + * @param datasource Datasource to find active supervisor id with append lock for. + * @return An optional with the active appending supervisor id if it exists. + */ + public Optional getActiveSupervisorIdForDatasourceWithAppendLock(String datasource) { for (Map.Entry> entry : supervisors.entrySet()) { final String supervisorId = entry.getKey(); final Supervisor supervisor = entry.getValue().lhs; final SupervisorSpec supervisorSpec = entry.getValue().rhs; + + TaskLockType taskLockType = null; + if (supervisorSpec instanceof SeekableStreamSupervisorSpec) { + SeekableStreamSupervisorSpec seekableStreamSupervisorSpec = (SeekableStreamSupervisorSpec) supervisorSpec; + Map context = seekableStreamSupervisorSpec.getContext(); + if (context != null) { + taskLockType = QueryContexts.getAsEnum( + Tasks.TASK_LOCK_TYPE, + context.get(Tasks.TASK_LOCK_TYPE), + TaskLockType.class + ); + } + } + if (supervisor instanceof SeekableStreamSupervisor && !supervisorSpec.isSuspended() - && supervisorSpec.getDataSources().contains(datasource)) { + && supervisorSpec.getDataSources().contains(datasource) + && TaskLockType.APPEND.equals(taskLockType)) { return Optional.of(supervisorId); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java index ba4b963e9b31..e8c5d839cf19 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java @@ -23,9 +23,13 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import org.apache.druid.indexing.common.TaskLockType; +import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.metadata.MetadataSupervisorManager; import org.easymock.Capture; @@ -434,6 +438,69 @@ public void testCreateSuspendResumeAndStopSupervisor() Assert.assertTrue(manager.getSupervisorIds().isEmpty()); } + @Test + public void testGetActiveSupervisorIdForDatasourceWithAppendLock() + { + EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(Collections.emptyMap()); + + NoopSupervisorSpec noopSupervisorSpec = new NoopSupervisorSpec("noop", ImmutableList.of("noopDS")); + metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject()); + + SeekableStreamSupervisorSpec suspendedSpec = EasyMock.mock(SeekableStreamSupervisorSpec.class); + Supervisor suspendedSupervisor = EasyMock.mock(SeekableStreamSupervisor.class); + EasyMock.expect(suspendedSpec.getId()).andReturn("suspendedSpec").anyTimes(); + EasyMock.expect(suspendedSpec.isSuspended()).andReturn(true).anyTimes(); + EasyMock.expect(suspendedSpec.getDataSources()).andReturn(ImmutableList.of("suspendedDS")).anyTimes(); + EasyMock.expect(suspendedSpec.createSupervisor()).andReturn(suspendedSupervisor).anyTimes(); + EasyMock.expect(suspendedSpec.createAutoscaler(suspendedSupervisor)).andReturn(null).anyTimes(); + EasyMock.expect(suspendedSpec.getContext()).andReturn(null).anyTimes(); + EasyMock.replay(suspendedSpec); + metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject()); + + SeekableStreamSupervisorSpec activeSpec = EasyMock.mock(SeekableStreamSupervisorSpec.class); + Supervisor activeSupervisor = EasyMock.mock(SeekableStreamSupervisor.class); + EasyMock.expect(activeSpec.getId()).andReturn("activeSpec").anyTimes(); + EasyMock.expect(activeSpec.isSuspended()).andReturn(false).anyTimes(); + EasyMock.expect(activeSpec.getDataSources()).andReturn(ImmutableList.of("activeDS")).anyTimes(); + EasyMock.expect(activeSpec.createSupervisor()).andReturn(activeSupervisor).anyTimes(); + EasyMock.expect(activeSpec.createAutoscaler(activeSupervisor)).andReturn(null).anyTimes(); + EasyMock.expect(activeSpec.getContext()).andReturn(null).anyTimes(); + EasyMock.replay(activeSpec); + metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject()); + + SeekableStreamSupervisorSpec activeAppendSpec = EasyMock.mock(SeekableStreamSupervisorSpec.class); + Supervisor activeAppendSupervisor = EasyMock.mock(SeekableStreamSupervisor.class); + EasyMock.expect(activeAppendSpec.getId()).andReturn("activeAppendSpec").anyTimes(); + EasyMock.expect(activeAppendSpec.isSuspended()).andReturn(false).anyTimes(); + EasyMock.expect(activeAppendSpec.getDataSources()).andReturn(ImmutableList.of("activeAppendDS")).anyTimes(); + EasyMock.expect(activeAppendSpec.createSupervisor()).andReturn(activeAppendSupervisor).anyTimes(); + EasyMock.expect(activeAppendSpec.createAutoscaler(activeAppendSupervisor)).andReturn(null).anyTimes(); + EasyMock.expect(activeAppendSpec.getContext()).andReturn(ImmutableMap.of( + Tasks.TASK_LOCK_TYPE, + TaskLockType.APPEND.name() + )).anyTimes(); + EasyMock.replay(activeAppendSpec); + metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject()); + + replayAll(); + manager.start(); + + Assert.assertFalse(manager.getActiveSupervisorIdForDatasourceWithAppendLock("nonExistent").isPresent()); + + manager.createOrUpdateAndStartSupervisor(noopSupervisorSpec); + Assert.assertFalse(manager.getActiveSupervisorIdForDatasourceWithAppendLock("noopDS").isPresent()); + + manager.createOrUpdateAndStartSupervisor(suspendedSpec); + Assert.assertFalse(manager.getActiveSupervisorIdForDatasourceWithAppendLock("suspendedDS").isPresent()); + + manager.createOrUpdateAndStartSupervisor(activeSpec); + Assert.assertFalse(manager.getActiveSupervisorIdForDatasourceWithAppendLock("activeDS").isPresent()); + + manager.createOrUpdateAndStartSupervisor(activeAppendSpec); + Assert.assertTrue(manager.getActiveSupervisorIdForDatasourceWithAppendLock("activeAppendDS").isPresent()); + + verifyAll(); + } private static class TestSupervisorSpec implements SupervisorSpec {