Skip to content

Commit

Permalink
Filter out tombstone segments from metadata cache (apache#16890)
Browse files Browse the repository at this point in the history
* Fix build

* Support segment metadata queries for tombstones

* Filter out tombstone segments from metadata cache

* Revert some changes

* checkstyle

* Update docs
  • Loading branch information
findingrish authored Aug 20, 2024
1 parent 518f642 commit bc4b3a2
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,13 @@ public int getTotalSegments()
@VisibleForTesting
public void addSegment(final DruidServerMetadata server, final DataSegment segment)
{
// Skip adding tombstone segment to the cache. These segments lack data or column information.
// Additionally, segment metadata queries, which are not yet implemented for tombstone segments
// (see: https://github.com/apache/druid/pull/12137) do not provide metadata for tombstones,
// leading to indefinite refresh attempts for these segments.
if (segment.isTombstone()) {
return;
}
// Get lock first so that we won't wait in ConcurrentMap.compute().
synchronized (lock) {
// someday we could hypothetically remove broker special casing, whenever BrokerServerView supports tracking
Expand Down Expand Up @@ -530,6 +537,10 @@ public void addSegment(final DruidServerMetadata server, final DataSegment segme
@VisibleForTesting
public void removeSegment(final DataSegment segment)
{
// tombstone segments are not present in the cache
if (segment.isTombstone()) {
return;
}
// Get lock first so that we won't wait in ConcurrentMap.compute().
synchronized (lock) {
log.debug("Segment [%s] is gone.", segment.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.apache.druid.timeline.partition.TombstoneShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Period;
import org.junit.After;
Expand Down Expand Up @@ -2214,6 +2215,77 @@ protected void coldDatasourceSchemaExec()
Assert.assertEquals(0, latch.getCount());
}

@Test
public void testTombstoneSegmentIsNotAdded() throws InterruptedException
{
String datasource = "newSegmentAddTest";
CountDownLatch addSegmentLatch = new CountDownLatch(1);

CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache(
getQueryLifecycleFactory(walker),
serverView,
SEGMENT_CACHE_CONFIG_DEFAULT,
new NoopEscalator(),
new InternalQueryConfig(),
new NoopServiceEmitter(),
segmentSchemaCache,
backFillQueue,
sqlSegmentsMetadataManager,
segmentsMetadataManagerConfigSupplier
)
{
@Override
public void addSegment(final DruidServerMetadata server, final DataSegment segment)
{
super.addSegment(server, segment);
if (datasource.equals(segment.getDataSource())) {
addSegmentLatch.countDown();
}
}
};

schema.onLeaderStart();
schema.awaitInitialization();

DataSegment segment = new DataSegment(
datasource,
Intervals.of("2001/2002"),
"1",
Collections.emptyMap(),
Collections.emptyList(),
Collections.emptyList(),
TombstoneShardSpec.INSTANCE,
null,
null,
0
);

Assert.assertEquals(6, schema.getTotalSegments());

serverView.addSegment(segment, ServerType.HISTORICAL);
Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS));
Assert.assertEquals(0, addSegmentLatch.getCount());

Assert.assertEquals(6, schema.getTotalSegments());
List<AvailableSegmentMetadata> metadatas = schema
.getSegmentMetadataSnapshot()
.values()
.stream()
.filter(metadata -> datasource.equals(metadata.getSegment().getDataSource()))
.collect(Collectors.toList());
Assert.assertEquals(0, metadatas.size());

serverView.removeSegment(segment, ServerType.HISTORICAL);
Assert.assertEquals(6, schema.getTotalSegments());
metadatas = schema
.getSegmentMetadataSnapshot()
.values()
.stream()
.filter(metadata -> datasource.equals(metadata.getSegment().getDataSource()))
.collect(Collectors.toList());
Assert.assertEquals(0, metadatas.size());
}

private void verifyFooDSSchema(CoordinatorSegmentMetadataCache schema, int columns)
{
final DataSourceInformation fooDs = schema.getDatasource("foo");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.TombstoneShardSpec;
import org.easymock.EasyMock;
import org.joda.time.Period;
import org.junit.After;
Expand Down Expand Up @@ -1136,4 +1137,73 @@ public void testNoDatasourceSchemaWhenNoSegmentMetadata() throws InterruptedExce

Assert.assertNull(schema.getDatasource("foo"));
}

@Test
public void testTombstoneSegmentIsNotAdded() throws InterruptedException
{
String datasource = "newSegmentAddTest";
CountDownLatch addSegmentLatch = new CountDownLatch(1);
BrokerSegmentMetadataCache schema = new BrokerSegmentMetadataCache(
CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
serverView,
BrokerSegmentMetadataCacheConfig.create(),
new NoopEscalator(),
new InternalQueryConfig(),
new NoopServiceEmitter(),
new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager),
new NoopCoordinatorClient(),
CentralizedDatasourceSchemaConfig.create()
)
{
@Override
public void addSegment(final DruidServerMetadata server, final DataSegment segment)
{
super.addSegment(server, segment);
if (datasource.equals(segment.getDataSource())) {
addSegmentLatch.countDown();
}
}
};

schema.start();
schema.awaitInitialization();

DataSegment segment = new DataSegment(
datasource,
Intervals.of("2001/2002"),
"1",
Collections.emptyMap(),
Collections.emptyList(),
Collections.emptyList(),
TombstoneShardSpec.INSTANCE,
null,
null,
0
);

Assert.assertEquals(6, schema.getTotalSegments());

serverView.addSegment(segment, ServerType.HISTORICAL);
Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS));
Assert.assertEquals(0, addSegmentLatch.getCount());

Assert.assertEquals(6, schema.getTotalSegments());
List<AvailableSegmentMetadata> metadatas = schema
.getSegmentMetadataSnapshot()
.values()
.stream()
.filter(metadata -> datasource.equals(metadata.getSegment().getDataSource()))
.collect(Collectors.toList());
Assert.assertEquals(0, metadatas.size());

serverView.removeSegment(segment, ServerType.HISTORICAL);
Assert.assertEquals(6, schema.getTotalSegments());
metadatas = schema
.getSegmentMetadataSnapshot()
.values()
.stream()
.filter(metadata -> datasource.equals(metadata.getSegment().getDataSource()))
.collect(Collectors.toList());
Assert.assertEquals(0, metadatas.size());
}
}

0 comments on commit bc4b3a2

Please sign in to comment.