From 364e5241547e80bb43d0b9bcdd8497632786d5ad Mon Sep 17 00:00:00 2001 From: AmatyaAvadhanula Date: Sun, 22 Oct 2023 14:12:36 +0530 Subject: [PATCH] Consider only supervisors with append lock for concurrent transactional replace (#15220) A SegmentTransactionalReplaceAction must only update the mapping of tasks with append locks that are running concurrently. To ensure this, we return the supervisor id only if it has the taskLockType as APPEND in its context. --- .../SegmentTransactionalReplaceAction.java | 11 ++- .../supervisor/SupervisorManager.java | 27 +++++++- .../supervisor/SupervisorManagerTest.java | 67 +++++++++++++++++++ 3 files changed, 100 insertions(+), 5 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java index 5a2b3ceec8f6..e6ad0426e466 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java @@ -139,8 +139,9 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) private void tryUpgradeOverlappingPendingSegments(Task task, TaskActionToolbox toolbox) { final SupervisorManager supervisorManager = toolbox.getSupervisorManager(); - final Optional activeSupervisorId = supervisorManager.getActiveSupervisorIdForDatasource(task.getDataSource()); - if (!activeSupervisorId.isPresent()) { + final Optional activeSupervisorIdWithAppendLock = + supervisorManager.getActiveSupervisorIdForDatasourceWithAppendLock(task.getDataSource()); + if (!activeSupervisorIdWithAppendLock.isPresent()) { return; } @@ -153,7 +154,11 @@ private void tryUpgradeOverlappingPendingSegments(Task task, TaskActionToolbox t upgradedPendingSegments.forEach( (oldId, newId) -> 
toolbox.getSupervisorManager() - .registerNewVersionOfPendingSegmentOnSupervisor(activeSupervisorId.get(), oldId, newId) + .registerNewVersionOfPendingSegmentOnSupervisor( + activeSupervisorIdWithAppendLock.get(), + oldId, + newId + ) ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index d55f3cc8bd0c..df454c1011a8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -22,14 +22,18 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.inject.Inject; +import org.apache.druid.indexing.common.TaskLockType; +import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.metadata.MetadataSupervisorManager; +import org.apache.druid.query.QueryContexts; import org.apache.druid.segment.incremental.ParseExceptionReport; import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec; @@ -71,15 +75,34 @@ public Set getSupervisorIds() return supervisors.keySet(); } - public Optional getActiveSupervisorIdForDatasource(String datasource) + /** + * @param datasource Datasource to find active supervisor id with 
append lock for. + * @return An optional with the active appending supervisor id if it exists. + */ + public Optional getActiveSupervisorIdForDatasourceWithAppendLock(String datasource) { for (Map.Entry> entry : supervisors.entrySet()) { final String supervisorId = entry.getKey(); final Supervisor supervisor = entry.getValue().lhs; final SupervisorSpec supervisorSpec = entry.getValue().rhs; + + TaskLockType taskLockType = null; + if (supervisorSpec instanceof SeekableStreamSupervisorSpec) { + SeekableStreamSupervisorSpec seekableStreamSupervisorSpec = (SeekableStreamSupervisorSpec) supervisorSpec; + Map context = seekableStreamSupervisorSpec.getContext(); + if (context != null) { + taskLockType = QueryContexts.getAsEnum( + Tasks.TASK_LOCK_TYPE, + context.get(Tasks.TASK_LOCK_TYPE), + TaskLockType.class + ); + } + } + if (supervisor instanceof SeekableStreamSupervisor && !supervisorSpec.isSuspended() - && supervisorSpec.getDataSources().contains(datasource)) { + && supervisorSpec.getDataSources().contains(datasource) + && TaskLockType.APPEND.equals(taskLockType)) { return Optional.of(supervisorId); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java index ba4b963e9b31..e8c5d839cf19 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java @@ -23,9 +23,13 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import org.apache.druid.indexing.common.TaskLockType; +import org.apache.druid.indexing.common.task.Tasks; import org.apache.druid.indexing.overlord.DataSourceMetadata; import 
org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.metadata.MetadataSupervisorManager; import org.easymock.Capture; @@ -434,6 +438,69 @@ public void testCreateSuspendResumeAndStopSupervisor() Assert.assertTrue(manager.getSupervisorIds().isEmpty()); } + @Test + public void testGetActiveSupervisorIdForDatasourceWithAppendLock() + { + EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(Collections.emptyMap()); + + NoopSupervisorSpec noopSupervisorSpec = new NoopSupervisorSpec("noop", ImmutableList.of("noopDS")); + metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject()); + + SeekableStreamSupervisorSpec suspendedSpec = EasyMock.mock(SeekableStreamSupervisorSpec.class); + Supervisor suspendedSupervisor = EasyMock.mock(SeekableStreamSupervisor.class); + EasyMock.expect(suspendedSpec.getId()).andReturn("suspendedSpec").anyTimes(); + EasyMock.expect(suspendedSpec.isSuspended()).andReturn(true).anyTimes(); + EasyMock.expect(suspendedSpec.getDataSources()).andReturn(ImmutableList.of("suspendedDS")).anyTimes(); + EasyMock.expect(suspendedSpec.createSupervisor()).andReturn(suspendedSupervisor).anyTimes(); + EasyMock.expect(suspendedSpec.createAutoscaler(suspendedSupervisor)).andReturn(null).anyTimes(); + EasyMock.expect(suspendedSpec.getContext()).andReturn(null).anyTimes(); + EasyMock.replay(suspendedSpec); + metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject()); + + SeekableStreamSupervisorSpec activeSpec = EasyMock.mock(SeekableStreamSupervisorSpec.class); + Supervisor activeSupervisor = EasyMock.mock(SeekableStreamSupervisor.class); + 
EasyMock.expect(activeSpec.getId()).andReturn("activeSpec").anyTimes(); + EasyMock.expect(activeSpec.isSuspended()).andReturn(false).anyTimes(); + EasyMock.expect(activeSpec.getDataSources()).andReturn(ImmutableList.of("activeDS")).anyTimes(); + EasyMock.expect(activeSpec.createSupervisor()).andReturn(activeSupervisor).anyTimes(); + EasyMock.expect(activeSpec.createAutoscaler(activeSupervisor)).andReturn(null).anyTimes(); + EasyMock.expect(activeSpec.getContext()).andReturn(null).anyTimes(); + EasyMock.replay(activeSpec); + metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject()); + + SeekableStreamSupervisorSpec activeAppendSpec = EasyMock.mock(SeekableStreamSupervisorSpec.class); + Supervisor activeAppendSupervisor = EasyMock.mock(SeekableStreamSupervisor.class); + EasyMock.expect(activeAppendSpec.getId()).andReturn("activeAppendSpec").anyTimes(); + EasyMock.expect(activeAppendSpec.isSuspended()).andReturn(false).anyTimes(); + EasyMock.expect(activeAppendSpec.getDataSources()).andReturn(ImmutableList.of("activeAppendDS")).anyTimes(); + EasyMock.expect(activeAppendSpec.createSupervisor()).andReturn(activeAppendSupervisor).anyTimes(); + EasyMock.expect(activeAppendSpec.createAutoscaler(activeAppendSupervisor)).andReturn(null).anyTimes(); + EasyMock.expect(activeAppendSpec.getContext()).andReturn(ImmutableMap.of( + Tasks.TASK_LOCK_TYPE, + TaskLockType.APPEND.name() + )).anyTimes(); + EasyMock.replay(activeAppendSpec); + metadataSupervisorManager.insert(EasyMock.anyString(), EasyMock.anyObject()); + + replayAll(); + manager.start(); + + Assert.assertFalse(manager.getActiveSupervisorIdForDatasourceWithAppendLock("nonExistent").isPresent()); + + manager.createOrUpdateAndStartSupervisor(noopSupervisorSpec); + Assert.assertFalse(manager.getActiveSupervisorIdForDatasourceWithAppendLock("noopDS").isPresent()); + + manager.createOrUpdateAndStartSupervisor(suspendedSpec); + 
Assert.assertFalse(manager.getActiveSupervisorIdForDatasourceWithAppendLock("suspendedDS").isPresent()); + + manager.createOrUpdateAndStartSupervisor(activeSpec); + Assert.assertFalse(manager.getActiveSupervisorIdForDatasourceWithAppendLock("activeDS").isPresent()); + + manager.createOrUpdateAndStartSupervisor(activeAppendSpec); + Assert.assertTrue(manager.getActiveSupervisorIdForDatasourceWithAppendLock("activeAppendDS").isPresent()); + + verifyAll(); + } private static class TestSupervisorSpec implements SupervisorSpec {