diff --git a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java index 0ed81f6c1e8c..321c33fa1dbf 100644 --- a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java +++ b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java @@ -692,9 +692,16 @@ public RowSignature buildDataSourceRowSignature(final String dataSource) RowSignature rowSignature = optionalSchema.get().getSchemaPayload().getRowSignature(); mergeRowSignature(columnTypes, rowSignature); } else { - // mark it for refresh, however, this case shouldn't arise by design - markSegmentAsNeedRefresh(segmentId); log.debug("SchemaMetadata for segmentId [%s] is absent.", segmentId); + + ImmutableDruidDataSource druidDataSource = + sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(segmentId.getDataSource()); + + if (druidDataSource != null && druidDataSource.getSegment(segmentId) != null) { + // mark it for refresh only if it is used + // however, this case shouldn't arise by design + markSegmentAsNeedRefresh(segmentId); + } } } } else { diff --git a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java index 283c1687a972..0c099cb551cb 100644 --- a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java +++ b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java @@ -2290,6 +2290,102 @@ public void addSegment(final DruidServerMetadata server, final DataSegment segme Assert.assertEquals(0, metadatas.size()); } + @Test + public void testUnusedSegmentIsNotRefreshed() throws InterruptedException, IOException + { + String dataSource = "xyz"; + CountDownLatch latch = new CountDownLatch(1); + CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache( + getQueryLifecycleFactory(walker), + serverView, + SEGMENT_CACHE_CONFIG_DEFAULT, + new NoopEscalator(), + new InternalQueryConfig(), + new NoopServiceEmitter(), + segmentSchemaCache, + backFillQueue, + sqlSegmentsMetadataManager, + segmentsMetadataManagerConfigSupplier + ) { + @Override + public void refresh(Set segmentsToRefresh, Set dataSourcesToRebuild) + throws IOException + { + super.refresh(segmentsToRefresh, dataSourcesToRebuild); + latch.countDown(); + } + }; + + List segments = ImmutableList.of( + newSegment(dataSource, 1), + newSegment(dataSource, 2), + newSegment(dataSource, 3) + ); + + final DruidServer historicalServer = druidServers.stream() + .filter(s -> s.getType().equals(ServerType.HISTORICAL)) + .findAny() + .orElse(null); + + Assert.assertNotNull(historicalServer); + final DruidServerMetadata historicalServerMetadata = historicalServer.getMetadata(); + + ImmutableMap.Builder segmentStatsMap = new ImmutableMap.Builder<>(); + segmentStatsMap.put(segments.get(0).getId(), new SegmentMetadata(20L, "fp")); + segmentStatsMap.put(segments.get(1).getId(), new SegmentMetadata(20L, "fp")); + segmentStatsMap.put(segments.get(2).getId(), new SegmentMetadata(20L, "fp")); + + ImmutableMap.Builder schemaPayloadMap = new ImmutableMap.Builder<>(); + schemaPayloadMap.put("fp", new SchemaPayload(RowSignature.builder().add("c1", ColumnType.DOUBLE).build())); + segmentSchemaCache.updateFinalizedSegmentSchema( + new SegmentSchemaCache.FinalizedSegmentSchemaInfo(segmentStatsMap.build(), schemaPayloadMap.build()) + ); + + schema.addSegment(historicalServerMetadata, segments.get(0)); + schema.addSegment(historicalServerMetadata, segments.get(1)); + schema.addSegment(historicalServerMetadata, segments.get(2)); + + serverView.addSegment(segments.get(0), ServerType.HISTORICAL); + serverView.addSegment(segments.get(1), ServerType.HISTORICAL); + serverView.addSegment(segments.get(2), ServerType.HISTORICAL); + + schema.onLeaderStart(); + schema.awaitInitialization(); + + Assert.assertTrue(latch.await(2, TimeUnit.SECONDS)); + + // make segment3 unused + segmentStatsMap = new ImmutableMap.Builder<>(); + segmentStatsMap.put(segments.get(0).getId(), new SegmentMetadata(20L, "fp")); + + segmentSchemaCache.updateFinalizedSegmentSchema( + new SegmentSchemaCache.FinalizedSegmentSchemaInfo(segmentStatsMap.build(), schemaPayloadMap.build()) + ); + + Map segmentMap = new HashMap<>(); + segmentMap.put(segments.get(0).getId(), segments.get(0)); + segmentMap.put(segments.get(1).getId(), segments.get(1)); + + ImmutableDruidDataSource druidDataSource = + new ImmutableDruidDataSource( + "xyz", + Collections.emptyMap(), + segmentMap + ); + + Mockito.when(sqlSegmentsMetadataManager.getImmutableDataSourceWithUsedSegments(ArgumentMatchers.anyString())) + .thenReturn(druidDataSource); + + Set segmentsToRefresh = segments.stream().map(DataSegment::getId).collect(Collectors.toSet()); + segmentsToRefresh.remove(segments.get(1).getId()); + segmentsToRefresh.remove(segments.get(2).getId()); + + schema.refresh(segmentsToRefresh, Sets.newHashSet(dataSource)); + + Assert.assertTrue(schema.getSegmentsNeedingRefresh().contains(segments.get(1).getId())); + Assert.assertFalse(schema.getSegmentsNeedingRefresh().contains(segments.get(2).getId())); + } + private void verifyFooDSSchema(CoordinatorSegmentMetadataCache schema, int columns) { final DataSourceInformation fooDs = schema.getDatasource("foo");