Skip to content

Commit

Permalink
Rename IMSC.announceHistoricalSegments to commitSegments (#15021)
Browse files Browse the repository at this point in the history
This commit pulls out some changes from #14407 to simplify that PR.

Changes:
- Rename `IndexerMetadataStorageCoordinator.announceHistoricalSegments` to `commitSegments`
- Rename the overloaded method to `commitSegmentsAndMetadata`
- Fix some typos
  • Loading branch information
kfaraz authored Sep 21, 2023
1 parent e76962f commit 409bffe
Show file tree
Hide file tree
Showing 24 changed files with 125 additions and 152 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ public void testCheckSegments() throws IOException
1024
)
);
indexerMetadataStorageCoordinator.announceHistoricalSegments(baseSegments);
indexerMetadataStorageCoordinator.announceHistoricalSegments(derivativeSegments);
indexerMetadataStorageCoordinator.commitSegments(baseSegments);
indexerMetadataStorageCoordinator.commitSegments(derivativeSegments);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
Expand Down Expand Up @@ -252,7 +252,7 @@ public void testCheckSegmentsAndSubmitTasks() throws IOException
1024
)
);
indexerMetadataStorageCoordinator.announceHistoricalSegments(baseSegments);
indexerMetadataStorageCoordinator.commitSegments(baseSegments);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public void testOptimize() throws InterruptedException
1024 * 1024
);
try {
metadataStorageCoordinator.announceHistoricalSegments(Sets.newHashSet(segment));
metadataStorageCoordinator.commitSegments(Sets.newHashSet(segment));
announceSegmentForServer(druidServer, segment, zkPathsConfig, jsonMapper);
}
catch (IOException e) {
Expand All @@ -185,7 +185,7 @@ public void testOptimize() throws InterruptedException
1024
);
try {
metadataStorageCoordinator.announceHistoricalSegments(Sets.newHashSet(segment));
metadataStorageCoordinator.commitSegments(Sets.newHashSet(segment));
announceSegmentForServer(druidServer, segment, zkPathsConfig, jsonMapper);
}
catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,7 @@ public void testIsTransientException()
SQLServerConnector connector = new SQLServerConnector(
Suppliers.ofInstance(new MetadataStorageConnectorConfig()),
Suppliers.ofInstance(
new MetadataStorageTablesConfig(
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
)
MetadataStorageTablesConfig.fromBase(null)
)
);

Expand All @@ -70,7 +58,7 @@ public void testLimitClause()
SQLServerConnector connector = new SQLServerConnector(
Suppliers.ofInstance(new MetadataStorageConnectorConfig()),
Suppliers.ofInstance(
new MetadataStorageTablesConfig(null, null, null, null, null, null, null, null, null, null, null)
MetadataStorageTablesConfig.fromBase(null)
)
);
Assert.assertEquals("FETCH NEXT 100 ROWS ONLY", connector.limitClause(100));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public String getDriverClassName()
private static final Supplier<MetadataStorageConnectorConfig> CONNECTOR_CONFIG_SUPPLIER =
MetadataStorageConnectorConfig::new;
private static final Supplier<MetadataStorageTablesConfig> TABLES_CONFIG_SUPPLIER =
() -> new MetadataStorageTablesConfig(null, null, null, null, null, null, null, null, null, null, null);
() -> MetadataStorageTablesConfig.fromBase(null);


@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,7 @@ public void testIsTransientException()
{
PostgreSQLConnector connector = new PostgreSQLConnector(
Suppliers.ofInstance(new MetadataStorageConnectorConfig()),
Suppliers.ofInstance(
new MetadataStorageTablesConfig(
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
)
),
Suppliers.ofInstance(MetadataStorageTablesConfig.fromBase(null)),
new PostgreSQLConnectorConfig(),
new PostgreSQLTablesConfig()
);
Expand All @@ -68,9 +54,7 @@ public void testLimitClause()
{
PostgreSQLConnector connector = new PostgreSQLConnector(
Suppliers.ofInstance(new MetadataStorageConnectorConfig()),
Suppliers.ofInstance(
new MetadataStorageTablesConfig(null, null, null, null, null, null, null, null, null, null, null)
),
Suppliers.ofInstance(MetadataStorageTablesConfig.fromBase(null)),
new PostgreSQLConnectorConfig(),
new PostgreSQLTablesConfig()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public TypeReference<Set<DataSegment>> getReturnTypeReference()

/**
* Behaves similarly to
* {@link org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator#announceHistoricalSegments},
* {@link org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator#commitSegments},
* with startMetadata and endMetadata both null.
*/
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public Void perform(Task task, TaskActionToolbox toolbox)
try {
toolbox.getTaskLockbox().doInCriticalSection(
task,
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()),
CriticalAction.builder()
.onValidLocks(
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public Void perform(Task task, TaskActionToolbox toolbox)
try {
toolbox.getTaskLockbox().doInCriticalSection(
task,
segments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
segments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()),
CriticalAction.builder()
.onValidLocks(
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,10 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
try {
retVal = toolbox.getTaskLockbox().doInCriticalSection(
task,
allSegments.stream().map(DataSegment::getInterval).collect(Collectors.toList()),
allSegments.stream().map(DataSegment::getInterval).collect(Collectors.toSet()),
CriticalAction.<SegmentPublishResult>builder()
.onValidLocks(
() -> toolbox.getIndexerMetadataStorageCoordinator().announceHistoricalSegments(
() -> toolbox.getIndexerMetadataStorageCoordinator().commitSegmentsAndMetadata(
segments,
startMetadata,
endMetadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class LockRequestForNewSegment implements LockRequest
private final int priority;
private final String sequenceName;
@Nullable
private final String previsousSegmentId;
private final String previousSegmentId;
private final boolean skipSegmentLineageCheck;

private String version;
Expand All @@ -55,7 +55,7 @@ public LockRequestForNewSegment(
PartialShardSpec partialShardSpec,
int priority,
String sequenceName,
@Nullable String previsousSegmentId,
@Nullable String previousSegmentId,
boolean skipSegmentLineageCheck
)
{
Expand All @@ -67,7 +67,7 @@ public LockRequestForNewSegment(
this.partialShardSpec = partialShardSpec;
this.priority = priority;
this.sequenceName = sequenceName;
this.previsousSegmentId = previsousSegmentId;
this.previousSegmentId = previousSegmentId;
this.skipSegmentLineageCheck = skipSegmentLineageCheck;
}

Expand All @@ -79,7 +79,7 @@ public LockRequestForNewSegment(
Interval interval,
PartialShardSpec partialShardSpec,
String sequenceName,
@Nullable String previsousSegmentId,
@Nullable String previousSegmentId,
boolean skipSegmentLineageCheck
)
{
Expand All @@ -92,7 +92,7 @@ public LockRequestForNewSegment(
partialShardSpec,
task.getPriority(),
sequenceName,
previsousSegmentId,
previousSegmentId,
skipSegmentLineageCheck
);
}
Expand Down Expand Up @@ -168,9 +168,9 @@ public String getSequenceName()
}

@Nullable
public String getPrevisousSegmentId()
public String getPreviousSegmentId()
{
return previsousSegmentId;
return previousSegmentId;
}

public boolean isSkipSegmentLineageCheck()
Expand All @@ -190,7 +190,7 @@ public String toString()
", partialShardSpec=" + partialShardSpec +
", priority=" + priority +
", sequenceName='" + sequenceName + '\'' +
", previsousSegmentId='" + previsousSegmentId + '\'' +
", previousSegmentId='" + previousSegmentId + '\'' +
", skipSegmentLineageCheck=" + skipSegmentLineageCheck +
'}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,7 @@ private SegmentIdWithShardSpec allocateSegmentId(LockRequestForNewSegment reques
return metadataStorageCoordinator.allocatePendingSegment(
request.getDataSource(),
request.getSequenceName(),
request.getPrevisousSegmentId(),
request.getPreviousSegmentId(),
request.getInterval(),
request.getPartialShardSpec(),
version,
Expand All @@ -773,7 +773,7 @@ private SegmentIdWithShardSpec allocateSegmentId(LockRequestForNewSegment reques
* @param intervals intervals
* @param action action to be performed inside of the critical section
*/
public <T> T doInCriticalSection(Task task, List<Interval> intervals, CriticalAction<T> action) throws Exception
public <T> T doInCriticalSection(Task task, Set<Interval> intervals, CriticalAction<T> action) throws Exception
{
giant.lock();

Expand All @@ -790,7 +790,7 @@ public <T> T doInCriticalSection(Task task, List<Interval> intervals, CriticalAc
* It doesn't check other semantics like acquired locks are enough to overwrite existing segments.
* This kind of semantic should be checked in each caller of {@link #doInCriticalSection}.
*/
private boolean isTaskLocksValid(Task task, List<Interval> intervals)
private boolean isTaskLocksValid(Task task, Set<Interval> intervals)
{
giant.lock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void setup() throws IOException
expectedUnusedSegments.add(createSegment(Intervals.of("2017-10-07/2017-10-08"), "1"));

actionTestKit.getMetadataStorageCoordinator()
.announceHistoricalSegments(expectedUnusedSegments);
.commitSegments(expectedUnusedSegments);

expectedUnusedSegments.forEach(s -> actionTestKit.getTaskLockbox().unlock(task, s.getInterval()));

Expand All @@ -70,7 +70,7 @@ public void setup() throws IOException
expectedUsedSegments.add(createSegment(Intervals.of("2017-10-07/2017-10-08"), "2"));

actionTestKit.getMetadataStorageCoordinator()
.announceHistoricalSegments(expectedUsedSegments);
.commitSegments(expectedUsedSegments);

expectedUsedSegments.forEach(s -> actionTestKit.getTaskLockbox().unlock(task, s.getInterval()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ public void testAddToExistingLinearShardSpecsSameGranularity() throws Exception
{
final Task task = NoopTask.create();

taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments(
taskActionTestKit.getMetadataStorageCoordinator().commitSegments(
ImmutableSet.of(
DataSegment.builder()
.dataSource(DATA_SOURCE)
Expand Down Expand Up @@ -639,7 +639,7 @@ public void testAddToExistingNumberedShardSpecsSameGranularity() throws Exceptio
{
final Task task = NoopTask.create();

taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments(
taskActionTestKit.getMetadataStorageCoordinator().commitSegments(
ImmutableSet.of(
DataSegment.builder()
.dataSource(DATA_SOURCE)
Expand Down Expand Up @@ -702,7 +702,7 @@ public void testAddToExistingNumberedShardSpecsCoarserPreferredGranularity() thr
{
final Task task = NoopTask.create();

taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments(
taskActionTestKit.getMetadataStorageCoordinator().commitSegments(
ImmutableSet.of(
DataSegment.builder()
.dataSource(DATA_SOURCE)
Expand Down Expand Up @@ -741,7 +741,7 @@ public void testAddToExistingNumberedShardSpecsFinerPreferredGranularity() throw
{
final Task task = NoopTask.create();

taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments(
taskActionTestKit.getMetadataStorageCoordinator().commitSegments(
ImmutableSet.of(
DataSegment.builder()
.dataSource(DATA_SOURCE)
Expand Down Expand Up @@ -780,7 +780,7 @@ public void testCannotAddToExistingNumberedShardSpecsWithCoarserQueryGranularity
{
final Task task = NoopTask.create();

taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments(
taskActionTestKit.getMetadataStorageCoordinator().commitSegments(
ImmutableSet.of(
DataSegment.builder()
.dataSource(DATA_SOURCE)
Expand Down Expand Up @@ -825,7 +825,7 @@ public void testWithPartialShardSpecAndOvershadowingSegments() throws IOExceptio

final ObjectMapper objectMapper = new DefaultObjectMapper();

taskActionTestKit.getMetadataStorageCoordinator().announceHistoricalSegments(
taskActionTestKit.getMetadataStorageCoordinator().commitSegments(
ImmutableSet.of(
DataSegment.builder()
.dataSource(DATA_SOURCE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void testSimple() throws Exception
acquireTimeChunkLock(TaskLockType.EXCLUSIVE, task, INTERVAL, 5000);
actionTestKit.getTaskLockbox().doInCriticalSection(
task,
Collections.singletonList(INTERVAL),
Collections.singleton(INTERVAL),
CriticalAction.builder()
.onValidLocks(() -> action.perform(task, actionTestKit.getTaskActionToolbox()))
.onInvalidLocks(
Expand Down Expand Up @@ -137,7 +137,7 @@ public void testFailBadVersion() throws Exception
thrown.expectMessage(CoreMatchers.containsString("are not covered by locks"));
final Set<DataSegment> segments = actionTestKit.getTaskLockbox().doInCriticalSection(
task,
Collections.singletonList(INTERVAL),
Collections.singleton(INTERVAL),
CriticalAction.<Set<DataSegment>>builder()
.onValidLocks(() -> action.perform(task, actionTestKit.getTaskActionToolbox()))
.onInvalidLocks(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1507,9 +1507,9 @@ private void makeToolboxFactory(final File directory)
)
{
@Override
public Set<DataSegment> announceHistoricalSegments(Set<DataSegment> segments) throws IOException
public Set<DataSegment> commitSegments(Set<DataSegment> segments) throws IOException
{
Set<DataSegment> result = super.announceHistoricalSegments(segments);
Set<DataSegment> result = super.commitSegments(segments);

Assert.assertFalse(
"Segment latch not initialized, did you forget to call expectPublishSegments?",
Expand All @@ -1523,13 +1523,13 @@ public Set<DataSegment> announceHistoricalSegments(Set<DataSegment> segments) th
}

@Override
public SegmentPublishResult announceHistoricalSegments(
public SegmentPublishResult commitSegmentsAndMetadata(
Set<DataSegment> segments,
DataSourceMetadata startMetadata,
DataSourceMetadata endMetadata
) throws IOException
{
SegmentPublishResult result = super.announceHistoricalSegments(segments, startMetadata, endMetadata);
SegmentPublishResult result = super.commitSegmentsAndMetadata(segments, startMetadata, endMetadata);

Assert.assertNotNull(
"Segment latch not initialized, did you forget to call expectPublishSegments?",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void testKill() throws Exception
newSegment(Intervals.of("2019-03-01/2019-04-01"), version),
newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
);
final Set<DataSegment> announced = getMetadataStorageCoordinator().announceHistoricalSegments(segments);
final Set<DataSegment> announced = getMetadataStorageCoordinator().commitSegments(segments);

Assert.assertEquals(segments, announced);

Expand Down Expand Up @@ -114,7 +114,7 @@ public void testKillWithMarkUnused() throws Exception
newSegment(Intervals.of("2019-03-01/2019-04-01"), version),
newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
);
final Set<DataSegment> announced = getMetadataStorageCoordinator().announceHistoricalSegments(segments);
final Set<DataSegment> announced = getMetadataStorageCoordinator().commitSegments(segments);

Assert.assertEquals(segments, announced);

Expand Down Expand Up @@ -178,7 +178,7 @@ public void testKillBatchSizeOneAndLimit4() throws Exception
newSegment(Intervals.of("2019-03-01/2019-04-01"), version),
newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
);
final Set<DataSegment> announced = getMetadataStorageCoordinator().announceHistoricalSegments(segments);
final Set<DataSegment> announced = getMetadataStorageCoordinator().commitSegments(segments);

Assert.assertEquals(segments, announced);

Expand Down Expand Up @@ -223,7 +223,7 @@ public void testKillBatchSizeThree() throws Exception
newSegment(Intervals.of("2019-03-01/2019-04-01"), version),
newSegment(Intervals.of("2019-04-01/2019-05-01"), version)
);
final Set<DataSegment> announced = getMetadataStorageCoordinator().announceHistoricalSegments(segments);
final Set<DataSegment> announced = getMetadataStorageCoordinator().commitSegments(segments);

Assert.assertEquals(segments, announced);

Expand Down
Loading

0 comments on commit 409bffe

Please sign in to comment.