From 409bffe7f2892ac192c99ff329f6db3dc9f4b1ae Mon Sep 17 00:00:00 2001 From: Kashif Faraz Date: Thu, 21 Sep 2023 16:19:03 +0530 Subject: [PATCH] Rename IMSC.announceHistoricalSegments to commitSegments (#15021) This commit pulls out some changes from #14407 to simplify that PR. Changes: - Rename `IndexerMetadataStorageCoordinator.announceHistoricalSegments` to `commitSegments` - Rename the overloaded method to `commitSegmentsAndMetadata` - Fix some typos --- .../MaterializedViewSupervisorTest.java | 6 +- .../DatasourceOptimizerTest.java | 4 +- .../sqlserver/SQLServerConnectorTest.java | 16 +-- .../storage/mysql/MySQLConnectorTest.java | 2 +- .../postgresql/PostgreSQLConnectorTest.java | 20 +--- .../common/actions/SegmentInsertAction.java | 2 +- .../actions/SegmentMetadataUpdateAction.java | 2 +- .../common/actions/SegmentNukeAction.java | 2 +- .../SegmentTransactionalInsertAction.java | 4 +- .../overlord/LockRequestForNewSegment.java | 16 +-- .../druid/indexing/overlord/TaskLockbox.java | 6 +- .../actions/RetrieveSegmentsActionsTest.java | 4 +- .../actions/SegmentAllocateActionTest.java | 12 +- .../actions/SegmentInsertActionTest.java | 4 +- ...penderatorDriverRealtimeIndexTaskTest.java | 8 +- .../task/KillUnusedSegmentsTaskTest.java | 8 +- .../indexing/overlord/TaskLifecycleTest.java | 4 +- .../overlord/TaskLockBoxConcurrencyTest.java | 9 +- .../indexing/overlord/TaskLockboxTest.java | 10 +- ...TestIndexerMetadataStorageCoordinator.java | 10 +- .../indexing/overlord/DataSourceMetadata.java | 4 +- .../IndexerMetadataStorageCoordinator.java | 8 +- .../IndexerSQLMetadataStorageCoordinator.java | 8 +- ...exerSQLMetadataStorageCoordinatorTest.java | 108 +++++++++--------- 24 files changed, 125 insertions(+), 152 deletions(-) diff --git a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java index e49b160c1e99..1e74180ae69f 100644 --- a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java +++ b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java @@ -192,8 +192,8 @@ public void testCheckSegments() throws IOException 1024 ) ); - indexerMetadataStorageCoordinator.announceHistoricalSegments(baseSegments); - indexerMetadataStorageCoordinator.announceHistoricalSegments(derivativeSegments); + indexerMetadataStorageCoordinator.commitSegments(baseSegments); + indexerMetadataStorageCoordinator.commitSegments(derivativeSegments); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); @@ -252,7 +252,7 @@ public void testCheckSegmentsAndSubmitTasks() throws IOException 1024 ) ); - indexerMetadataStorageCoordinator.announceHistoricalSegments(baseSegments); + indexerMetadataStorageCoordinator.commitSegments(baseSegments); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); diff --git a/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java b/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java index 520aa3193b63..673c2531999c 100644 --- a/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java +++ b/extensions-contrib/materialized-view-selection/src/test/java/org/apache/druid/query/materializedview/DatasourceOptimizerTest.java @@ -160,7 +160,7 @@ public void testOptimize() throws InterruptedException 1024 * 1024 ); try { - metadataStorageCoordinator.announceHistoricalSegments(Sets.newHashSet(segment)); + metadataStorageCoordinator.commitSegments(Sets.newHashSet(segment)); announceSegmentForServer(druidServer, segment, zkPathsConfig, jsonMapper); } catch (IOException e) { @@ -185,7 +185,7 @@ public void testOptimize() throws InterruptedException 1024 ); try { - metadataStorageCoordinator.announceHistoricalSegments(Sets.newHashSet(segment)); + metadataStorageCoordinator.commitSegments(Sets.newHashSet(segment)); announceSegmentForServer(druidServer, segment, zkPathsConfig, jsonMapper); } catch (IOException e) { diff --git a/extensions-contrib/sqlserver-metadata-storage/src/test/java/org/apache/druid/metadata/storage/sqlserver/SQLServerConnectorTest.java b/extensions-contrib/sqlserver-metadata-storage/src/test/java/org/apache/druid/metadata/storage/sqlserver/SQLServerConnectorTest.java index a31159e88134..ab3d9c37fe23 100644 --- a/extensions-contrib/sqlserver-metadata-storage/src/test/java/org/apache/druid/metadata/storage/sqlserver/SQLServerConnectorTest.java +++ b/extensions-contrib/sqlserver-metadata-storage/src/test/java/org/apache/druid/metadata/storage/sqlserver/SQLServerConnectorTest.java @@ -37,19 +37,7 @@ public void testIsTransientException() SQLServerConnector connector = new SQLServerConnector( Suppliers.ofInstance(new MetadataStorageConnectorConfig()), Suppliers.ofInstance( - new MetadataStorageTablesConfig( - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null - ) + MetadataStorageTablesConfig.fromBase(null) ) ); @@ -70,7 +58,7 @@ public void testLimitClause() SQLServerConnector connector = new SQLServerConnector( Suppliers.ofInstance(new MetadataStorageConnectorConfig()), Suppliers.ofInstance( - new MetadataStorageTablesConfig(null, null, null, null, null, null, null, null, null, null, null) + MetadataStorageTablesConfig.fromBase(null) ) ); Assert.assertEquals("FETCH NEXT 100 ROWS ONLY", connector.limitClause(100)); diff --git a/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/mysql/MySQLConnectorTest.java b/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/mysql/MySQLConnectorTest.java index 514d01b6a59a..b60168d5f429 100644 --- a/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/mysql/MySQLConnectorTest.java +++ b/extensions-core/mysql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/mysql/MySQLConnectorTest.java @@ -45,7 +45,7 @@ public String getDriverClassName() private static final Supplier CONNECTOR_CONFIG_SUPPLIER = MetadataStorageConnectorConfig::new; private static final Supplier TABLES_CONFIG_SUPPLIER = - () -> new MetadataStorageTablesConfig(null, null, null, null, null, null, null, null, null, null, null); + () -> MetadataStorageTablesConfig.fromBase(null); @Test diff --git a/extensions-core/postgresql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLConnectorTest.java b/extensions-core/postgresql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLConnectorTest.java index 08f3c333a1fb..3b6c9aace521 100644 --- a/extensions-core/postgresql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLConnectorTest.java +++ b/extensions-core/postgresql-metadata-storage/src/test/java/org/apache/druid/metadata/storage/postgresql/PostgreSQLConnectorTest.java @@ -35,21 +35,7 @@ public void testIsTransientException() { PostgreSQLConnector connector = new PostgreSQLConnector( Suppliers.ofInstance(new MetadataStorageConnectorConfig()), - Suppliers.ofInstance( - new MetadataStorageTablesConfig( - null, - null, - null, - null, - null, - null, - null, - null, - null, - null, - null - ) - ), + Suppliers.ofInstance(MetadataStorageTablesConfig.fromBase(null)), new PostgreSQLConnectorConfig(), new PostgreSQLTablesConfig() ); @@ -68,9 +54,7 @@ public void testLimitClause() { PostgreSQLConnector connector = new PostgreSQLConnector( Suppliers.ofInstance(new MetadataStorageConnectorConfig()), - Suppliers.ofInstance( - new MetadataStorageTablesConfig(null, null, null, null, null, null, null, null, null, null, null) - ), + Suppliers.ofInstance(MetadataStorageTablesConfig.fromBase(null)), new PostgreSQLConnectorConfig(), new PostgreSQLTablesConfig() ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentInsertAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentInsertAction.java index b8f27389aaca..be318f0b70dd 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentInsertAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentInsertAction.java @@ -65,7 +65,7 @@ public TypeReference> getReturnTypeReference() /** * Behaves similarly to - * {@link org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator#announceHistoricalSegments}, + * {@link org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator#commitSegments}, * with startMetadata and endMetadata both null. */ @Override diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentMetadataUpdateAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentMetadataUpdateAction.java index 0ca2bbae4809..9c90cf0efaa2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentMetadataUpdateAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentMetadataUpdateAction.java @@ -69,7 +69,7 @@ public Void perform(Task task, TaskActionToolbox toolbox) try { toolbox.getTaskLockbox().doInCriticalSection( task, - segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()), + segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()), CriticalAction.builder() .onValidLocks( () -> { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentNukeAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentNukeAction.java index 53651cdb523f..ec7218eae59e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentNukeAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentNukeAction.java @@ -71,7 +71,7 @@ public Void perform(Task task, TaskActionToolbox toolbox) try { toolbox.getTaskLockbox().doInCriticalSection( task, - segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()), + segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()), CriticalAction.builder() .onValidLocks( () -> { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java index 99dc94a9828c..9b23db71d464 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java @@ -200,10 +200,10 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) try { retVal = toolbox.getTaskLockbox().doInCriticalSection( task, - allSegments.stream().map(DataSegment::getInterval).collect(Collectors.toList()), + allSegments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()), CriticalAction.builder() .onValidLocks( - () -> toolbox.getIndexerMetadataStorageCoordinator().announceHistoricalSegments( + () -> toolbox.getIndexerMetadataStorageCoordinator().commitSegmentsAndMetadata( segments, startMetadata, endMetadata diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/LockRequestForNewSegment.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/LockRequestForNewSegment.java index 2171d6b38680..a8aef9cb1f79 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/LockRequestForNewSegment.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/LockRequestForNewSegment.java @@ -41,7 +41,7 @@ public class LockRequestForNewSegment implements LockRequest private final int priority; private final String sequenceName; @Nullable - private final String previsousSegmentId; + private final String previousSegmentId; private final boolean skipSegmentLineageCheck; private String version; @@ -55,7 +55,7 @@ public LockRequestForNewSegment( PartialShardSpec partialShardSpec, int priority, String sequenceName, - @Nullable String previsousSegmentId, + @Nullable String previousSegmentId, boolean skipSegmentLineageCheck ) { @@ -67,7 +67,7 @@ public LockRequestForNewSegment( this.partialShardSpec = partialShardSpec; this.priority = priority; this.sequenceName = sequenceName; - this.previsousSegmentId = previsousSegmentId; + this.previousSegmentId = previousSegmentId; this.skipSegmentLineageCheck = skipSegmentLineageCheck; } @@ -79,7 +79,7 @@ public LockRequestForNewSegment( Interval interval, PartialShardSpec partialShardSpec, String sequenceName, - @Nullable String previsousSegmentId, + @Nullable String previousSegmentId, boolean skipSegmentLineageCheck ) { @@ -92,7 +92,7 @@ public LockRequestForNewSegment( partialShardSpec, task.getPriority(), sequenceName, - previsousSegmentId, + previousSegmentId, skipSegmentLineageCheck ); } @@ -168,9 +168,9 @@ public String getSequenceName() } @Nullable - public String getPrevisousSegmentId() + public String getPreviousSegmentId() { - return previsousSegmentId; + return previousSegmentId; } public boolean isSkipSegmentLineageCheck() @@ -190,7 +190,7 @@ public String toString() ", partialShardSpec=" + partialShardSpec + ", priority=" + priority + ", sequenceName='" + sequenceName + '\'' + - ", previsousSegmentId='" + previsousSegmentId + '\'' + + ", previousSegmentId='" + previousSegmentId + '\'' + ", skipSegmentLineageCheck=" + skipSegmentLineageCheck + '}'; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java index a89816888887..2251c8a063dd 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java @@ -754,7 +754,7 @@ private SegmentIdWithShardSpec allocateSegmentId(LockRequestForNewSegment reques return metadataStorageCoordinator.allocatePendingSegment( request.getDataSource(), request.getSequenceName(), - request.getPrevisousSegmentId(), + request.getPreviousSegmentId(), request.getInterval(), request.getPartialShardSpec(), version, @@ -773,7 +773,7 @@ private SegmentIdWithShardSpec allocateSegmentId(LockRequestForNewSegment reques * @param intervals intervals * @param action action to be performed inside of the critical section */ - public T doInCriticalSection(Task task, List intervals, CriticalAction action) throws Exception + public T doInCriticalSection(Task task, Set intervals, CriticalAction action) throws Exception { giant.lock(); @@ -790,7 +790,7 @@ public T doInCriticalSection(Task task, List intervals, CriticalAc * It doesn't check other semantics like acquired locks are enough to overwrite existing segments. * This kind of semantic should be checked in each caller of {@link #doInCriticalSection}. */ - private boolean isTaskLocksValid(Task task, List intervals) + private boolean isTaskLocksValid(Task task, Set intervals) { giant.lock(); try { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java index 2d360dfeb521..1b224b798567 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsActionsTest.java @@ -60,7 +60,7 @@ public void setup() throws IOException expectedUnusedSegments.add(createSegment(Intervals.of("2017-10-07/2017-10-08"), "1")); actionTestKit.getMetadataStorageCoordinator() - .announceHistoricalSegments(expectedUnusedSegments); + .commitSegments(expectedUnusedSegments); expectedUnusedSegments.forEach(s -> actionTestKit.getTaskLockbox().unlock(task, s.getInterval())); @@ -70,7 +70,7 @@ public void setup() throws IOException expectedUsedSegments.add(createSegment(Intervals.of("2017-10-07/2017-10-08"), "2")); actionTestKit.getMetadataStorageCoordinator() - .announceHistoricalSegments(expectedUsedSegments); + .commitSegments(expectedUsedSegments); expectedUsedSegments.forEach(s -> actionTestKit.getTaskLockbox().unlock(task, s.getInterval())); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java index b498834fbaba..13c499e47e2a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java @@ -574,7 +574,7 @@ public void testAddToExistingLinearShardSpecsSameGranularity() throws Exception { final Task task = NoopTask.create(); - taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments( + taskActionTestKit.getMetadataStorageCoordinator().commitSegments( ImmutableSet.of( DataSegment.builder() .dataSource(DATA_SOURCE) @@ -639,7 +639,7 @@ public void testAddToExistingNumberedShardSpecsSameGranularity() throws Exceptio { final Task task = NoopTask.create(); - taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments( + taskActionTestKit.getMetadataStorageCoordinator().commitSegments( ImmutableSet.of( DataSegment.builder() .dataSource(DATA_SOURCE) @@ -702,7 +702,7 @@ public void testAddToExistingNumberedShardSpecsCoarserPreferredGranularity() thr { final Task task = NoopTask.create(); - taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments( + taskActionTestKit.getMetadataStorageCoordinator().commitSegments( ImmutableSet.of( DataSegment.builder() .dataSource(DATA_SOURCE) @@ -741,7 +741,7 @@ public void testAddToExistingNumberedShardSpecsFinerPreferredGranularity() throw { final Task task = NoopTask.create(); - taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments( + taskActionTestKit.getMetadataStorageCoordinator().commitSegments( ImmutableSet.of( DataSegment.builder() .dataSource(DATA_SOURCE) @@ -780,7 +780,7 @@ public void testCannotAddToExistingNumberedShardSpecsWithCoarserQueryGranularity { final Task task = NoopTask.create(); - taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments( + taskActionTestKit.getMetadataStorageCoordinator().commitSegments( ImmutableSet.of( DataSegment.builder() .dataSource(DATA_SOURCE) @@ -825,7 +825,7 @@ public void testWithPartialShardSpecAndOvershadowingSegments() throws IOExceptio final ObjectMapper objectMapper = new DefaultObjectMapper(); - taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments( + taskActionTestKit.getMetadataStorageCoordinator().commitSegments( ImmutableSet.of( DataSegment.builder() .dataSource(DATA_SOURCE) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentInsertActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentInsertActionTest.java index c10df89ad986..8ac5c6b517f9 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentInsertActionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentInsertActionTest.java @@ -107,7 +107,7 @@ public void testSimple() throws Exception acquireTimeChunkLock(TaskLockType.EXCLUSIVE, task, INTERVAL, 5000); actionTestKit.getTaskLockbox().doInCriticalSection( task, - Collections.singletonList(INTERVAL), + Collections.singleton(INTERVAL), CriticalAction.builder() .onValidLocks(() -> action.perform(task, actionTestKit.getTaskActionToolbox())) .onInvalidLocks( @@ -137,7 +137,7 @@ public void testFailBadVersion() throws Exception thrown.expectMessage(CoreMatchers.containsString("are not covered by locks")); final Set segments = actionTestKit.getTaskLockbox().doInCriticalSection( task, - Collections.singletonList(INTERVAL), + Collections.singleton(INTERVAL), CriticalAction.>builder() .onValidLocks(() -> action.perform(task, actionTestKit.getTaskActionToolbox())) .onInvalidLocks( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java index f403eeee415a..5e088724f899 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java @@ -1507,9 +1507,9 @@ private void makeToolboxFactory(final File directory) ) { @Override - public Set announceHistoricalSegments(Set segments) throws IOException + public Set commitSegments(Set segments) throws IOException { - Set result = super.announceHistoricalSegments(segments); + Set result = super.commitSegments(segments); Assert.assertFalse( "Segment latch not initialized, did you forget to call expectPublishSegments?", @@ -1523,13 +1523,13 @@ public Set announceHistoricalSegments(Set segments) th } @Override - public SegmentPublishResult announceHistoricalSegments( + public SegmentPublishResult commitSegmentsAndMetadata( Set segments, DataSourceMetadata startMetadata, DataSourceMetadata endMetadata ) throws IOException { - SegmentPublishResult result = super.announceHistoricalSegments(segments, startMetadata, endMetadata); + SegmentPublishResult result = super.commitSegmentsAndMetadata(segments, startMetadata, endMetadata); Assert.assertNotNull( "Segment latch not initialized, did you forget to call expectPublishSegments?", diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java index 3a89c19a785a..f320df824a1e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/KillUnusedSegmentsTaskTest.java @@ -59,7 +59,7 @@ public void testKill() throws Exception newSegment(Intervals.of("2019-03-01/2019-04-01"), version), newSegment(Intervals.of("2019-04-01/2019-05-01"), version) ); - final Set announced = getMetadataStorageCoordinator().announceHistoricalSegments(segments); + final Set announced = getMetadataStorageCoordinator().commitSegments(segments); Assert.assertEquals(segments, announced); @@ -114,7 +114,7 @@ public void testKillWithMarkUnused() throws Exception newSegment(Intervals.of("2019-03-01/2019-04-01"), version), newSegment(Intervals.of("2019-04-01/2019-05-01"), version) ); - final Set announced = getMetadataStorageCoordinator().announceHistoricalSegments(segments); + final Set announced = getMetadataStorageCoordinator().commitSegments(segments); Assert.assertEquals(segments, announced); @@ -178,7 +178,7 @@ public void testKillBatchSizeOneAndLimit4() throws Exception newSegment(Intervals.of("2019-03-01/2019-04-01"), version), newSegment(Intervals.of("2019-04-01/2019-05-01"), version) ); - final Set announced = getMetadataStorageCoordinator().announceHistoricalSegments(segments); + final Set announced = getMetadataStorageCoordinator().commitSegments(segments); Assert.assertEquals(segments, announced); @@ -223,7 +223,7 @@ public void testKillBatchSizeThree() throws Exception newSegment(Intervals.of("2019-03-01/2019-04-01"), version), newSegment(Intervals.of("2019-04-01/2019-05-01"), version) ); - final Set announced = getMetadataStorageCoordinator().announceHistoricalSegments(segments); + final Set announced = getMetadataStorageCoordinator().commitSegments(segments); Assert.assertEquals(segments, announced); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 185e1468d399..69f0039f615a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -562,9 +562,9 @@ private TestIndexerMetadataStorageCoordinator setUpMetadataStorageCoordinator() return new TestIndexerMetadataStorageCoordinator() { @Override - public Set announceHistoricalSegments(Set segments) + public Set commitSegments(Set segments) { - Set retVal = super.announceHistoricalSegments(segments); + Set retVal = super.commitSegments(segments); if (publishCountDown != null) { publishCountDown.countDown(); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java index 5a75e205f9d1..8e809169b30d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockBoxConcurrencyTest.java @@ -42,6 +42,7 @@ import org.junit.Test; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -120,7 +121,7 @@ public void testDoInCriticalSectionWithDifferentTasks() return lockbox.doInCriticalSection( lowPriorityTask, - Collections.singletonList(interval), + Collections.singleton(interval), CriticalAction.builder() .onValidLocks( () -> { @@ -150,7 +151,7 @@ public void testDoInCriticalSectionWithDifferentTasks() return lockbox.doInCriticalSection( highPriorityTask, - Collections.singletonList(interval), + Collections.singleton(interval), CriticalAction.builder() .onValidLocks( () -> { @@ -200,7 +201,7 @@ public void testDoInCriticalSectionWithOverlappedIntervals() throws Exception final Future future1 = service.submit(() -> lockbox.doInCriticalSection( task, - intervals.subList(0, 2), + new HashSet<>(intervals.subList(0, 2)), CriticalAction.builder() .onValidLocks( () -> { @@ -223,7 +224,7 @@ public void testDoInCriticalSectionWithOverlappedIntervals() throws Exception latch.await(); return lockbox.doInCriticalSection( task, - intervals.subList(1, 3), + new HashSet<>(intervals.subList(1, 3)), CriticalAction.builder() .onValidLocks( () -> { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java index ceb1657f68eb..4c761a6f71b6 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java @@ -528,7 +528,7 @@ public void testDoInCriticalSectionWithSharedLock() throws Exception Assert.assertTrue( lockbox.doInCriticalSection( task, - Collections.singletonList(interval), + Collections.singleton(interval), CriticalAction.builder().onValidLocks(() -> true).onInvalidLocks(() -> false).build() ) ); @@ -546,7 +546,7 @@ public void testDoInCriticalSectionWithExclusiveLock() throws Exception Assert.assertTrue( lockbox.doInCriticalSection( task, - Collections.singletonList(interval), + Collections.singleton(interval), CriticalAction.builder().onValidLocks(() -> true).onInvalidLocks(() -> false).build() ) ); @@ -565,7 +565,7 @@ public void testDoInCriticalSectionWithSmallerInterval() throws Exception Assert.assertTrue( lockbox.doInCriticalSection( task, - Collections.singletonList(interval), + Collections.singleton(interval), CriticalAction.builder().onValidLocks(() -> true).onInvalidLocks(() -> false).build() ) ); @@ -591,7 +591,7 @@ public void testPreemptionAndDoInCriticalSection() throws Exception Assert.assertTrue( lockbox.doInCriticalSection( highPriorityTask, - Collections.singletonList(interval), + Collections.singleton(interval), CriticalAction.builder().onValidLocks(() -> true).onInvalidLocks(() -> false).build() ) ); @@ -616,7 +616,7 @@ public void testDoInCriticalSectionWithRevokedLock() throws Exception Assert.assertFalse( lockbox.doInCriticalSection( lowPriorityTask, - Collections.singletonList(interval), + Collections.singleton(interval), CriticalAction.builder().onValidLocks(() -> true).onInvalidLocks(() -> false).build() ) ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index 1db186e5a485..0c14d52980d7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -134,7 +134,7 @@ public int markSegmentsAsUnusedWithinInterval(String dataSource, Interval interv } @Override - public Set announceHistoricalSegments(Set segments) + public Set commitSegments(Set segments) { Set added = new HashSet<>(); for (final DataSegment segment : segments) { @@ -157,14 +157,14 @@ public Map allocatePendingSegments } @Override - public SegmentPublishResult announceHistoricalSegments( + public SegmentPublishResult commitSegmentsAndMetadata( Set segments, - DataSourceMetadata oldCommitMetadata, - DataSourceMetadata newCommitMetadata + @Nullable DataSourceMetadata startMetadata, + @Nullable DataSourceMetadata endMetadata ) { // Don't actually compare metadata, just do it! - return SegmentPublishResult.ok(announceHistoricalSegments(segments)); + return SegmentPublishResult.ok(commitSegments(segments)); } @Override diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/DataSourceMetadata.java b/server/src/main/java/org/apache/druid/indexing/overlord/DataSourceMetadata.java index 23579a0c8a4c..1e5b3d1d5db8 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/DataSourceMetadata.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/DataSourceMetadata.java @@ -26,7 +26,7 @@ /** * Commit metadata for a dataSource. Used by - * {@link IndexerMetadataStorageCoordinator#announceHistoricalSegments(Set, DataSourceMetadata, DataSourceMetadata)} + * {@link IndexerMetadataStorageCoordinator#commitSegmentsAndMetadata(Set, DataSourceMetadata, DataSourceMetadata)} * to provide metadata transactions for segment inserts. * * Two metadata instances can be added together, and any conflicts are resolved in favor of the right-hand side. @@ -45,7 +45,7 @@ public interface DataSourceMetadata boolean isValidStart(); /** - * As in {@link IndexerMetadataStorageCoordinator#announceHistoricalSegments}, this class can represent start and + * As in {@link IndexerMetadataStorageCoordinator#commitSegments}, this class can represent start and * end of a sequence. * * This method converts itself into the one for start of a sequence. Most implementations can simply return diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index fb65b8106723..b8715eb53caa 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -166,7 +166,7 @@ List retrieveUnusedSegmentsForInterval( * * @return set of segments actually added */ - Set announceHistoricalSegments(Set segments) throws IOException; + Set commitSegments(Set segments) throws IOException; /** * Allocates pending segments for the given requests in the pending segments table. @@ -271,7 +271,7 @@ SegmentIdWithShardSpec allocatePendingSegment( * @throws IllegalArgumentException if startMetadata and endMetadata are not either both null or both non-null * @throws RuntimeException if the state of metadata storage after this call is unknown */ - SegmentPublishResult announceHistoricalSegments( + SegmentPublishResult commitSegmentsAndMetadata( Set segments, @Nullable DataSourceMetadata startMetadata, @Nullable DataSourceMetadata endMetadata @@ -321,11 +321,11 @@ SegmentPublishResult announceHistoricalSegments( int removeDataSourceMetadataOlderThan(long timestamp, @NotNull Set excludeDatasources); /** - * Similar to {@link #announceHistoricalSegments(Set)}, but meant for streaming ingestion tasks for handling + * Similar to {@link #commitSegments}, but meant for streaming ingestion tasks for handling * the case where the task ingested no records and created no segments, but still needs to update the metadata * with the progress that the task made. * - * The metadata should undergo the same validation checks as performed by {@link #announceHistoricalSegments}. + * The metadata should undergo the same validation checks as performed by {@link #commitSegments}. * * * @param dataSource the datasource diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index ee1cff6476ef..9ea612a69c61 100644 --- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -301,9 +301,9 @@ private Collection retrieveAllUsedSegmentsForIntervalsWithHandle( } @Override - public Set announceHistoricalSegments(final Set segments) throws IOException + public Set commitSegments(final Set segments) throws IOException { - final SegmentPublishResult result = announceHistoricalSegments(segments, null, null); + final SegmentPublishResult result = commitSegmentsAndMetadata(segments, null, null); // Metadata transaction cannot fail because we are not trying to do one. if (!result.isSuccess()) { @@ -314,7 +314,7 @@ public Set announceHistoricalSegments(final Set segmen } @Override - public SegmentPublishResult announceHistoricalSegments( + public SegmentPublishResult commitSegmentsAndMetadata( final Set segments, @Nullable final DataSourceMetadata startMetadata, @Nullable final DataSourceMetadata endMetadata @@ -1512,7 +1512,7 @@ private Set segmentExistsBatch(final Handle handle, final Set dataSegments) @Test public void testSimpleAnnounce() throws IOException { - coordinator.announceHistoricalSegments(SEGMENTS); + coordinator.commitSegments(SEGMENTS); for (DataSegment segment : SEGMENTS) { Assert.assertArrayEquals( mapper.writeValueAsString(segment).getBytes(StandardCharsets.UTF_8), @@ -496,7 +496,7 @@ public void testAnnounceHistoricalSegments() throws IOException ); } - coordinator.announceHistoricalSegments(segments); + coordinator.commitSegments(segments); for (DataSegment segment : segments) { Assert.assertArrayEquals( mapper.writeValueAsString(segment).getBytes(StandardCharsets.UTF_8), @@ -525,7 +525,7 @@ public void testOvershadowingAnnounce() throws IOException { final ImmutableSet segments = ImmutableSet.of(defaultSegment, defaultSegment2, defaultSegment4); - coordinator.announceHistoricalSegments(segments); + coordinator.commitSegments(segments); for (DataSegment segment : segments) { Assert.assertArrayEquals( @@ -546,7 +546,7 @@ public void testOvershadowingAnnounce() throws IOException public void testTransactionalAnnounceSuccess() throws IOException { // Insert first segment. - final SegmentPublishResult result1 = coordinator.announceHistoricalSegments( + final SegmentPublishResult result1 = coordinator.commitSegmentsAndMetadata( ImmutableSet.of(defaultSegment), new ObjectMetadata(null), new ObjectMetadata(ImmutableMap.of("foo", "bar")) @@ -564,7 +564,7 @@ public void testTransactionalAnnounceSuccess() throws IOException ); // Insert second segment. - final SegmentPublishResult result2 = coordinator.announceHistoricalSegments( + final SegmentPublishResult result2 = coordinator.commitSegmentsAndMetadata( ImmutableSet.of(defaultSegment2), new ObjectMetadata(ImmutableMap.of("foo", "bar")), new ObjectMetadata(ImmutableMap.of("foo", "baz")) @@ -620,7 +620,7 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( }; // Insert first segment. - final SegmentPublishResult result1 = failOnceCoordinator.announceHistoricalSegments( + final SegmentPublishResult result1 = failOnceCoordinator.commitSegmentsAndMetadata( ImmutableSet.of(defaultSegment), new ObjectMetadata(null), new ObjectMetadata(ImmutableMap.of("foo", "bar")) @@ -641,7 +641,7 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( attemptCounter.set(0); // Insert second segment. - final SegmentPublishResult result2 = failOnceCoordinator.announceHistoricalSegments( + final SegmentPublishResult result2 = failOnceCoordinator.commitSegmentsAndMetadata( ImmutableSet.of(defaultSegment2), new ObjectMetadata(ImmutableMap.of("foo", "bar")), new ObjectMetadata(ImmutableMap.of("foo", "baz")) @@ -671,7 +671,7 @@ protected DataStoreMetadataUpdateResult updateDataSourceMetadataWithHandle( @Test public void testTransactionalAnnounceFailDbNullWantNotNull() throws IOException { - final SegmentPublishResult result1 = coordinator.announceHistoricalSegments( + final SegmentPublishResult result1 = coordinator.commitSegmentsAndMetadata( ImmutableSet.of(defaultSegment), new ObjectMetadata(ImmutableMap.of("foo", "bar")), new ObjectMetadata(ImmutableMap.of("foo", "baz")) @@ -688,14 +688,14 @@ public void testTransactionalAnnounceFailDbNullWantNotNull() throws IOException @Test public void testTransactionalAnnounceFailDbNotNullWantNull() throws IOException { - final SegmentPublishResult result1 = coordinator.announceHistoricalSegments( + final SegmentPublishResult result1 = coordinator.commitSegmentsAndMetadata( ImmutableSet.of(defaultSegment), new ObjectMetadata(null), new ObjectMetadata(ImmutableMap.of("foo", "baz")) ); Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(defaultSegment)), result1); - final SegmentPublishResult result2 = coordinator.announceHistoricalSegments( + final SegmentPublishResult result2 = coordinator.commitSegmentsAndMetadata( ImmutableSet.of(defaultSegment2), new ObjectMetadata(null), new ObjectMetadata(ImmutableMap.of("foo", "baz")) @@ -727,14 +727,14 @@ public void testRetrieveSegmentForId() @Test public void testTransactionalAnnounceFailDbNotNullWantDifferent() throws IOException { - final SegmentPublishResult result1 = coordinator.announceHistoricalSegments( + final SegmentPublishResult result1 = coordinator.commitSegmentsAndMetadata( ImmutableSet.of(defaultSegment), new ObjectMetadata(null), new ObjectMetadata(ImmutableMap.of("foo", "baz")) ); Assert.assertEquals(SegmentPublishResult.ok(ImmutableSet.of(defaultSegment)), result1); - final SegmentPublishResult result2 = coordinator.announceHistoricalSegments( + final SegmentPublishResult result2 = coordinator.commitSegmentsAndMetadata( ImmutableSet.of(defaultSegment2), new ObjectMetadata(ImmutableMap.of("foo", "qux")), new ObjectMetadata(ImmutableMap.of("foo", "baz")) @@ -751,7 +751,7 @@ public void testTransactionalAnnounceFailDbNotNullWantDifferent() throws IOExcep @Test public void testSimpleUsedList() throws IOException { - coordinator.announceHistoricalSegments(SEGMENTS); + coordinator.commitSegments(SEGMENTS); Assert.assertEquals( SEGMENTS, ImmutableSet.copyOf( @@ -767,8 +767,8 @@ public void testSimpleUsedList() throws IOException @Test public void testMultiIntervalUsedList() throws IOException { - coordinator.announceHistoricalSegments(SEGMENTS); - coordinator.announceHistoricalSegments(ImmutableSet.of(defaultSegment3)); + coordinator.commitSegments(SEGMENTS); + coordinator.commitSegments(ImmutableSet.of(defaultSegment3)); Assertions.assertThat( coordinator.retrieveUsedSegmentsForIntervals( @@ -810,7 +810,7 @@ public void testMultiIntervalUsedList() throws IOException @Test public void testSimpleUnusedList() throws IOException { - coordinator.announceHistoricalSegments(SEGMENTS); + coordinator.commitSegments(SEGMENTS); markAllSegmentsUnused(); Assert.assertEquals( SEGMENTS, @@ -826,7 +826,7 @@ public void testSimpleUnusedList() throws IOException @Test public void testSimpleUnusedListWithLimit() throws IOException { - coordinator.announceHistoricalSegments(SEGMENTS); + coordinator.commitSegments(SEGMENTS); markAllSegmentsUnused(); int limit = SEGMENTS.size() - 1; Set retreivedUnusedSegments = ImmutableSet.copyOf( @@ -843,7 +843,7 @@ public void testSimpleUnusedListWithLimit() throws IOException @Test public void testUsedOverlapLow() throws IOException { - coordinator.announceHistoricalSegments(SEGMENTS); + coordinator.commitSegments(SEGMENTS); Set actualSegments = ImmutableSet.copyOf( coordinator.retrieveUsedSegmentsForInterval( defaultSegment.getDataSource(), @@ -861,7 +861,7 @@ public void testUsedOverlapLow() throws IOException @Test public void testUsedOverlapHigh() throws IOException { - coordinator.announceHistoricalSegments(SEGMENTS); + coordinator.commitSegments(SEGMENTS); Assert.assertEquals( SEGMENTS, ImmutableSet.copyOf( @@ -877,7 +877,7 @@ public void testUsedOverlapHigh() throws IOException @Test public void testUsedOutOfBoundsLow() throws IOException { - coordinator.announceHistoricalSegments(SEGMENTS); + coordinator.commitSegments(SEGMENTS); Assert.assertTrue( coordinator.retrieveUsedSegmentsForInterval( defaultSegment.getDataSource(), @@ -891,7 +891,7 @@ public void testUsedOutOfBoundsLow() throws IOException @Test public void testUsedOutOfBoundsHigh() throws IOException { - coordinator.announceHistoricalSegments(SEGMENTS); + coordinator.commitSegments(SEGMENTS); Assert.assertTrue( coordinator.retrieveUsedSegmentsForInterval( defaultSegment.getDataSource(), @@ -904,7 +904,7 @@ public void testUsedOutOfBoundsHigh() throws IOException @Test public void testUsedWithinBoundsEnd() throws IOException { - coordinator.announceHistoricalSegments(SEGMENTS); + coordinator.commitSegments(SEGMENTS); Assert.assertEquals( SEGMENTS, ImmutableSet.copyOf( @@ -920,7 +920,7 @@ public void testUsedWithinBoundsEnd() throws IOException @Test public void testUsedOverlapEnd() throws IOException { - coordinator.announceHistoricalSegments(SEGMENTS); + coordinator.commitSegments(SEGMENTS); Assert.assertEquals( SEGMENTS, ImmutableSet.copyOf( @@ -937,7 +937,7 @@ public void testUsedOverlapEnd() throws IOException @Test public void testUnusedOverlapLow() throws IOException { - coordinator.announceHistoricalSegments(SEGMENTS); + coordinator.commitSegments(SEGMENTS); markAllSegmentsUnused(); Assert.assertTrue( coordinator.retrieveUnusedSegmentsForInterval( @@ -953,7 +953,7 @@ public void testUnusedOverlapLow() throws IOException @Test public void testUnusedUnderlapLow() throws IOException { - coordinator.announceHistoricalSegments(SEGMENTS); + coordinator.commitSegments(SEGMENTS); markAllSegmentsUnused(); Assert.assertTrue( coordinator.retrieveUnusedSegmentsForInterval( @@ -967,7 +967,7 @@ public void testUnusedUnderlapLow() throws IOException @Test public void testUnusedUnderlapHigh() throws IOException { - coordinator.announceHistoricalSegments(SEGMENTS); + coordinator.commitSegments(SEGMENTS); markAllSegmentsUnused(); Assert.assertTrue( coordinator.retrieveUnusedSegmentsForInterval( @@ -980,7 +980,7 @@ public void testUnusedUnderlapHigh() throws IOException @Test public void testUnusedOverlapHigh() throws IOException { - coordinator.announceHistoricalSegments(SEGMENTS); + coordinator.commitSegments(SEGMENTS); markAllSegmentsUnused(); Assert.assertTrue( coordinator.retrieveUnusedSegmentsForInterval( @@ -993,7 +993,7 @@ public void testUnusedOverlapHigh() throws IOException @Test public void testUnusedBigOverlap() throws IOException { - coordinator.announceHistoricalSegments(SEGMENTS); + coordinator.commitSegments(SEGMENTS); markAllSegmentsUnused(); Assert.assertEquals( SEGMENTS, @@ -1009,7 +1009,7 @@ public void testUnusedBigOverlap() throws IOException @Test public void testUnusedLowRange() throws IOException { - coordinator.announceHistoricalSegments(SEGMENTS); + coordinator.commitSegments(SEGMENTS); markAllSegmentsUnused(); Assert.assertEquals( SEGMENTS, @@ -1034,7 +1034,7 @@ public void testUnusedLowRange() throws IOException @Test public void testUnusedHighRange() throws IOException { - coordinator.announceHistoricalSegments(SEGMENTS); + coordinator.commitSegments(SEGMENTS); markAllSegmentsUnused(); Assert.assertEquals( SEGMENTS, @@ -1059,7 +1059,7 @@ public void testUnusedHighRange() throws IOException @Test public void testUsedHugeTimeRangeEternityFilter() throws IOException { - coordinator.announceHistoricalSegments( + coordinator.commitSegments( ImmutableSet.of( hugeTimeRangeSegment1, hugeTimeRangeSegment2, @@ -1082,7 +1082,7 @@ public void testUsedHugeTimeRangeEternityFilter() throws IOException @Test public void testUsedHugeTimeRangeTrickyFilter1() throws IOException { - coordinator.announceHistoricalSegments( + coordinator.commitSegments( ImmutableSet.of( hugeTimeRangeSegment1, hugeTimeRangeSegment2, @@ -1105,7 +1105,7 @@ public void testUsedHugeTimeRangeTrickyFilter1() throws IOException @Test public void testUsedHugeTimeRangeTrickyFilter2() throws IOException { - coordinator.announceHistoricalSegments( + coordinator.commitSegments( ImmutableSet.of( hugeTimeRangeSegment1, hugeTimeRangeSegment2, @@ -1129,7 +1129,7 @@ public void testUsedHugeTimeRangeTrickyFilter2() throws IOException @Test public void testEternitySegmentWithStringComparison() throws IOException { - coordinator.announceHistoricalSegments( + coordinator.commitSegments( ImmutableSet.of( eternitySegment ) @@ -1150,7 +1150,7 @@ public void testEternitySegmentWithStringComparison() throws IOException @Test public void testEternityMultipleSegmentWithStringComparison() throws IOException { - coordinator.announceHistoricalSegments( + coordinator.commitSegments( ImmutableSet.of( numberedSegment0of0, eternitySegment @@ -1172,7 +1172,7 @@ public void testEternityMultipleSegmentWithStringComparison() throws IOException @Test public void testFirstHalfEternitySegmentWithStringComparison() throws IOException { - coordinator.announceHistoricalSegments( + coordinator.commitSegments( ImmutableSet.of( firstHalfEternityRangeSegment ) @@ -1193,7 +1193,7 @@ public void testFirstHalfEternitySegmentWithStringComparison() throws IOExceptio @Test public void testFirstHalfEternityMultipleSegmentWithStringComparison() throws IOException { - coordinator.announceHistoricalSegments( + coordinator.commitSegments( ImmutableSet.of( numberedSegment0of0, firstHalfEternityRangeSegment @@ -1215,7 +1215,7 @@ public void testFirstHalfEternityMultipleSegmentWithStringComparison() throws IO @Test public void testSecondHalfEternitySegmentWithStringComparison() throws IOException { - coordinator.announceHistoricalSegments( + coordinator.commitSegments( ImmutableSet.of( secondHalfEternityRangeSegment ) @@ -1238,7 +1238,7 @@ public void testSecondHalfEternitySegmentWithStringComparison() throws IOExcepti @Test public void testLargeIntervalWithStringComparison() throws IOException { - coordinator.announceHistoricalSegments( + coordinator.commitSegments( ImmutableSet.of( hugeTimeRangeSegment4 ) @@ -1259,7 +1259,7 @@ public void testLargeIntervalWithStringComparison() throws IOException @Test public void testSecondHalfEternityMultipleSegmentWithStringComparison() throws IOException { - coordinator.announceHistoricalSegments( + coordinator.commitSegments( ImmutableSet.of( numberedSegment0of0, secondHalfEternityRangeSegment @@ -1281,7 +1281,7 @@ public void testSecondHalfEternityMultipleSegmentWithStringComparison() throws I @Test public void testDeleteDataSourceMetadata() throws IOException { - coordinator.announceHistoricalSegments( + coordinator.commitSegmentsAndMetadata( ImmutableSet.of(defaultSegment), new ObjectMetadata(null), new ObjectMetadata(ImmutableMap.of("foo", "bar")) @@ -1302,7 +1302,7 @@ public void testDeleteDataSourceMetadata() throws IOException public void testDeleteSegmentsInMetaDataStorage() throws IOException { // Published segments to MetaDataStorage - coordinator.announceHistoricalSegments(SEGMENTS); + coordinator.commitSegments(SEGMENTS); // check segments Published Assert.assertEquals( @@ -1335,7 +1335,7 @@ public void testDeleteSegmentsInMetaDataStorage() throws IOException public void testUpdateSegmentsInMetaDataStorage() throws IOException { // Published segments to MetaDataStorage - coordinator.announceHistoricalSegments(SEGMENTS); + coordinator.commitSegments(SEGMENTS); // check segments Published Assert.assertEquals( @@ -1397,7 +1397,7 @@ public void testMultipleAdditionalNumberedShardsWithOneCorePartition() throws IO private void additionalNumberedShardTest(Set segments) throws IOException { - coordinator.announceHistoricalSegments(segments); + coordinator.commitSegments(segments); for (DataSegment segment : segments) { Assert.assertArrayEquals( @@ -1989,7 +1989,7 @@ public void testAllocatePendingSegmentsWithOvershadowingSegments() throws IOExce 10L ) ); - final Set announced = coordinator.announceHistoricalSegments(toBeAnnounced); + final Set announced = coordinator.commitSegments(toBeAnnounced); Assert.assertEquals(toBeAnnounced, announced); } @@ -2042,7 +2042,7 @@ public void testAllocatePendingSegmentsForHashBasedNumberedShardSpec() throws IO Assert.assertEquals(0, shardSpec.getNumCorePartitions()); Assert.assertEquals(5, shardSpec.getNumBuckets()); - coordinator.announceHistoricalSegments( + coordinator.commitSegments( Collections.singleton( new DataSegment( id.getDataSource(), @@ -2073,7 +2073,7 @@ public void testAllocatePendingSegmentsForHashBasedNumberedShardSpec() throws IO Assert.assertEquals(0, shardSpec.getNumCorePartitions()); Assert.assertEquals(5, shardSpec.getNumBuckets()); - coordinator.announceHistoricalSegments( + coordinator.commitSegments( Collections.singleton( new DataSegment( id.getDataSource(), @@ -2135,7 +2135,7 @@ public void testAddNumberedShardSpecAfterMultiDimensionsShardSpecWithUnknownCore ) ); } - coordinator.announceHistoricalSegments(originalSegments); + coordinator.commitSegments(originalSegments); final SegmentIdWithShardSpec id = coordinator.allocatePendingSegment( datasource, "seq", @@ -2180,7 +2180,7 @@ public void testAddNumberedShardSpecAfterSingleDimensionsShardSpecWithUnknownCor ) ); } - coordinator.announceHistoricalSegments(originalSegments); + coordinator.commitSegments(originalSegments); final SegmentIdWithShardSpec id = coordinator.allocatePendingSegment( datasource, "seq", @@ -2196,7 +2196,7 @@ public void testAddNumberedShardSpecAfterSingleDimensionsShardSpecWithUnknownCor @Test public void testRemoveDataSourceMetadataOlderThanDatasourceActiveShouldNotBeDeleted() throws Exception { - coordinator.announceHistoricalSegments( + coordinator.commitSegmentsAndMetadata( ImmutableSet.of(defaultSegment), new ObjectMetadata(null), new ObjectMetadata(ImmutableMap.of("foo", "bar")) @@ -2224,7 +2224,7 @@ public void testRemoveDataSourceMetadataOlderThanDatasourceActiveShouldNotBeDele @Test public void testRemoveDataSourceMetadataOlderThanDatasourceNotActiveAndOlderThanTimeShouldBeDeleted() throws Exception { - coordinator.announceHistoricalSegments( + coordinator.commitSegmentsAndMetadata( ImmutableSet.of(defaultSegment), new ObjectMetadata(null), new ObjectMetadata(ImmutableMap.of("foo", "bar")) @@ -2249,7 +2249,7 @@ public void testRemoveDataSourceMetadataOlderThanDatasourceNotActiveAndOlderThan public void testRemoveDataSourceMetadataOlderThanDatasourceNotActiveButNotOlderThanTimeShouldNotBeDeleted() throws Exception { - coordinator.announceHistoricalSegments( + coordinator.commitSegmentsAndMetadata( ImmutableSet.of(defaultSegment), new ObjectMetadata(null), new ObjectMetadata(ImmutableMap.of("foo", "bar")) @@ -2278,7 +2278,7 @@ public void testRemoveDataSourceMetadataOlderThanDatasourceNotActiveButNotOlderT @Test public void testMarkSegmentsAsUnusedWithinIntervalOneYear() throws IOException { - coordinator.announceHistoricalSegments(ImmutableSet.of(existingSegment1, existingSegment2)); + coordinator.commitSegments(ImmutableSet.of(existingSegment1, existingSegment2)); // interval covers existingSegment1 and partially overlaps existingSegment2, // only existingSegment1 will be dropped @@ -2310,7 +2310,7 @@ public void testMarkSegmentsAsUnusedWithinIntervalOneYear() throws IOException @Test public void testMarkSegmentsAsUnusedWithinIntervalTwoYears() throws IOException { - coordinator.announceHistoricalSegments(ImmutableSet.of(existingSegment1, existingSegment2)); + coordinator.commitSegments(ImmutableSet.of(existingSegment1, existingSegment2)); // interval covers existingSegment1 and partially overlaps existingSegment2, // only existingSegment1 will be dropped