From d198f5a1870afc88a964c02689aaa5f6da72bdb7 Mon Sep 17 00:00:00 2001
From: Rishabh Singh <6513075+findingrish@users.noreply.github.com>
Date: Tue, 14 Nov 2023 12:52:33 +0530
Subject: [PATCH] Non-existent datasource shouldn't affect schema rebuilding
 for other datasources (#15355)

In pull request #14985, a bug was introduced where periodic refresh would
skip rebuilding a datasource's schema after encountering a non-existent
datasource. This left the remaining datasources with stale schema
information.

This change addresses the bug and adds a unit test to validate the refresh
mechanism's behaviour when a datasource is removed while other datasources
have schema changes.
---
 .../CoordinatorSegmentMetadataCache.java      |   5 +-
 .../CoordinatorSegmentMetadataCacheTest.java  | 141 ++++++++++++++++++
 .../metadata/SegmentMetadataCacheCommon.java  |   2 +-
 .../schema/BrokerSegmentMetadataCache.java    |   2 +-
 .../BrokerSegmentMetadataCacheTest.java       | 141 ++++++++++++++++++
 5 files changed, 287 insertions(+), 4 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
index 960921c9e93b0..1badb2383d442 100644
--- a/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
+++ b/server/src/main/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCache.java
@@ -134,10 +134,11 @@ public void refresh(final Set<SegmentId> segmentsToRefresh, final Set<String> da
     for (String dataSource : dataSourcesToRebuild) {
       final RowSignature rowSignature = buildDataSourceRowSignature(dataSource);
       if (rowSignature == null) {
-        log.info("RowSignature null for dataSource [%s], implying it no longer exists, all metadata removed.", dataSource);
+        log.info("RowSignature null for dataSource [%s], implying that it no longer exists. All metadata removed.", dataSource);
         tables.remove(dataSource);
-        return;
+        continue;
       }
+
       DataSourceInformation druidTable = new DataSourceInformation(dataSource, rowSignature);
       final DataSourceInformation oldTable = tables.put(dataSource, druidTable);
       if (oldTable == null || !oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
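Why the one-token change matters: with `return`, the first vanished datasource aborted the entire rebuild loop, so every datasource queued after it silently kept its stale schema. Below is a minimal, self-contained sketch of that control flow; the class, `tables` map, and `rebuildSignature` method are illustrative stand-ins, not Druid's actual implementation:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-in for the refresh loop; all names here are illustrative only.
class RefreshLoopSketch
{
  private final Map<String, String> tables = new ConcurrentHashMap<>();

  // Returns null when the datasource no longer exists, mirroring
  // buildDataSourceRowSignature returning a null RowSignature.
  private String rebuildSignature(String dataSource)
  {
    return dataSource.equals("gone") ? null : dataSource + "-signature";
  }

  void refresh(List<String> dataSourcesToRebuild)
  {
    for (String dataSource : dataSourcesToRebuild) {
      String signature = rebuildSignature(dataSource);
      if (signature == null) {
        tables.remove(dataSource);
        // With `return` here (the pre-fix behavior), every datasource after
        // "gone" was skipped and kept a stale schema; `continue` skips only
        // the missing datasource itself.
        continue;
      }
      tables.put(dataSource, signature);
    }
  }

  public static void main(String[] args)
  {
    RefreshLoopSketch sketch = new RefreshLoopSketch();
    sketch.refresh(List.of("gone", "foo", "foo2"));
    // With the fix, "foo" and "foo2" are rebuilt even though "gone" comes first.
    System.out.println(sketch.tables); // {foo=foo-signature, foo2=foo2-signature}
  }
}
```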
diff --git a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
index 22a5d7a67c419..31176a17f1920 100644
--- a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
+++ b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentMetadataCacheTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.segment.metadata;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -27,6 +28,7 @@
 import com.google.common.collect.Sets;
 import org.apache.druid.client.DruidServer;
 import org.apache.druid.client.InternalQueryConfig;
+import org.apache.druid.data.input.InputRow;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.guava.Sequences;
@@ -34,14 +36,21 @@
 import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
+import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
 import org.apache.druid.query.metadata.metadata.AllColumnIncluderator;
 import org.apache.druid.query.metadata.metadata.ColumnAnalysis;
 import org.apache.druid.query.metadata.metadata.SegmentAnalysis;
 import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
 import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
+import org.apache.druid.segment.IndexBuilder;
+import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.server.QueryLifecycle;
 import org.apache.druid.server.QueryLifecycleFactory;
 import org.apache.druid.server.QueryResponse;
@@ -53,16 +62,19 @@
 import org.apache.druid.server.security.NoopEscalator;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.LinearShardSpec;
 import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -361,6 +373,135 @@ public void testNullDatasource() throws IOException, InterruptedException
     Assert.assertEquals(5, schema.getSegmentMetadataSnapshot().size());
   }
 
+  @Test
+  public void testAllDatasourcesRebuiltOnDatasourceRemoval() throws IOException, InterruptedException
+  {
+    CountDownLatch addSegmentLatch = new CountDownLatch(7);
+    CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache(
+        getQueryLifecycleFactory(walker),
+        serverView,
+        SEGMENT_CACHE_CONFIG_DEFAULT,
+        new NoopEscalator(),
+        new InternalQueryConfig(),
+        new NoopServiceEmitter()
+    )
+    {
+      @Override
+      public void addSegment(final DruidServerMetadata server, final DataSegment segment)
+      {
+        super.addSegment(server, segment);
+        addSegmentLatch.countDown();
+      }
+
+      @Override
+      public void removeSegment(final DataSegment segment)
+      {
+        super.removeSegment(segment);
+      }
+
+      @Override
+      public void markDataSourceAsNeedRebuild(String datasource)
+      {
+        super.markDataSourceAsNeedRebuild(datasource);
+        markDataSourceLatch.countDown();
+      }
+
+      @Override
+      @VisibleForTesting
+      public void refresh(
+          final Set<SegmentId> segmentsToRefresh,
+          final Set<String> dataSourcesToRebuild
+      ) throws IOException
+      {
+        super.refresh(segmentsToRefresh, dataSourcesToRebuild);
+      }
+    };
+
+    schema.start();
+    schema.awaitInitialization();
+
+    final Map<SegmentId, AvailableSegmentMetadata> segmentMetadatas = schema.getSegmentMetadataSnapshot();
+    List<DataSegment> segments = segmentMetadatas.values()
+                                                 .stream()
+                                                 .map(AvailableSegmentMetadata::getSegment)
+                                                 .collect(Collectors.toList());
+    Assert.assertEquals(6, segments.size());
+
+    // verify that the dim3 column isn't present in the schema for datasource "foo"
+    DataSourceInformation fooDs = schema.getDatasource("foo");
+    Assert.assertTrue(fooDs.getRowSignature().getColumnNames().stream().noneMatch("dim3"::equals));
+
+    // segments contains two segments with datasource "foo" and one with datasource "foo2"
+    // let's remove the only segment with datasource "foo2"
+    final DataSegment segmentToRemove = segments.stream()
+                                                .filter(segment -> segment.getDataSource().equals("foo2"))
+                                                .findFirst()
+                                                .orElse(null);
+    Assert.assertNotNull(segmentToRemove);
+    schema.removeSegment(segmentToRemove);
+
+    // add a segment to another datasource and check
+    // that the columns in this segment are reflected in the datasource schema
+    DataSegment newSegment =
+        DataSegment.builder()
+                   .dataSource(DATASOURCE1)
+                   .interval(Intervals.of("2002/P1Y"))
+                   .version("1")
+                   .shardSpec(new LinearShardSpec(0))
+                   .size(0)
+                   .build();
+
+    final File tmpDir = temporaryFolder.newFolder();
+
+    List<InputRow> rows = ImmutableList.of(
+        createRow(ImmutableMap.of("t", "2002-01-01", "m1", "1.0", "dim1", "", "dim3", "c1")),
+        createRow(ImmutableMap.of("t", "2002-01-02", "m1", "2.0", "dim1", "10.1", "dim3", "c2")),
+        createRow(ImmutableMap.of("t", "2002-01-03", "m1", "3.0", "dim1", "2", "dim3", "c3"))
+    );
+
+    QueryableIndex index = IndexBuilder.create()
+                                       .tmpDir(new File(tmpDir, "1"))
+                                       .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+                                       .schema(
+                                           new IncrementalIndexSchema.Builder()
+                                               .withMetrics(
+                                                   new CountAggregatorFactory("cnt"),
+                                                   new DoubleSumAggregatorFactory("m1", "m1"),
+                                                   new HyperUniquesAggregatorFactory("unique_dim1", "dim1")
+                                               )
+                                               .withRollup(false)
+                                               .build()
+                                       )
+                                       .rows(rows)
+                                       .buildMMappedIndex();
+
+    walker.add(newSegment, index);
+    serverView.addSegment(newSegment, ServerType.HISTORICAL);
+
+    Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS));
+
+    Set<String> dataSources = segments.stream().map(DataSegment::getDataSource).collect(Collectors.toSet());
+    dataSources.remove("foo2");
+
+    // LinkedHashSet to ensure that the removed datasource is encountered first
+    Set<String> dataSourcesToRefresh = new LinkedHashSet<>();
+    dataSourcesToRefresh.add("foo2");
+    dataSourcesToRefresh.addAll(dataSources);
+
+    segments = schema.getSegmentMetadataSnapshot().values()
+                     .stream()
+                     .map(AvailableSegmentMetadata::getSegment)
+                     .collect(Collectors.toList());
+
+    schema.refresh(segments.stream().map(DataSegment::getId).collect(Collectors.toSet()), dataSourcesToRefresh);
+    Assert.assertEquals(6, schema.getSegmentMetadataSnapshot().size());
+
+    fooDs = schema.getDatasource("foo");
+
+    // the new column from the added segment must now be present in the datasource schema,
+    // proving that the schema was rebuilt
+    Assert.assertTrue(fooDs.getRowSignature().getColumnNames().stream().anyMatch("dim3"::equals));
+  }
+
   @Test
   public void testNullAvailableSegmentMetadata() throws IOException, InterruptedException
   {
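The test above waits on `addSegmentLatch` before asserting, because segment-added callbacks reach the cache asynchronously. A minimal sketch of that latch pattern in isolation; the `Cache` base class here is a hypothetical stand-in, not Druid's metadata cache:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical base class with a callback the test wants to observe.
class Cache
{
  public void addSegment(String segment)
  {
    // ... real implementation would update internal state, possibly from another thread ...
  }
}

class LatchExample
{
  public static void main(String[] args) throws InterruptedException
  {
    // One count per segment the test expects to see added (7 in the patch:
    // 6 initial segments plus the one added mid-test).
    CountDownLatch addSegmentLatch = new CountDownLatch(2);

    Cache cache = new Cache()
    {
      @Override
      public void addSegment(String segment)
      {
        super.addSegment(segment);
        addSegmentLatch.countDown();
      }
    };

    cache.addSegment("segment-1");
    cache.addSegment("segment-2");

    // Proceed only after all expected callbacks have fired,
    // failing fast if they do not arrive within the timeout.
    if (!addSegmentLatch.await(1, TimeUnit.SECONDS)) {
      throw new AssertionError("segments were not added in time");
    }
  }
}
```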
diff --git a/server/src/test/java/org/apache/druid/segment/metadata/SegmentMetadataCacheCommon.java b/server/src/test/java/org/apache/druid/segment/metadata/SegmentMetadataCacheCommon.java
index d421a15e35fcb..fb7b87580e1fb 100644
--- a/server/src/test/java/org/apache/druid/segment/metadata/SegmentMetadataCacheCommon.java
+++ b/server/src/test/java/org/apache/druid/segment/metadata/SegmentMetadataCacheCommon.java
@@ -286,7 +286,7 @@ public void tearDown() throws Exception
     resourceCloser.close();
   }
 
-  InputRow createRow(final ImmutableMap<String, ?> map)
+  public InputRow createRow(final ImmutableMap<String, ?> map)
   {
     return MapInputRowParser.parse(FOO_SCHEMA, (Map<String, Object>) map);
   }
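`createRow` is widened to `public` so the broker-side test in the `sql` module can build `InputRow`s the same way. For reference, a hedged sketch of what such a helper does with Druid's `MapInputRowParser`; the schema below is an assumed stand-in for `FOO_SCHEMA`, whose actual definition lies outside this patch:

```java
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;

import java.util.Map;

class CreateRowSketch
{
  // Guessed shape of FOO_SCHEMA: "t" as an ISO timestamp column,
  // dim1/dim3 as string dimensions, all other columns kept.
  private static final InputRowSchema SCHEMA = new InputRowSchema(
      new TimestampSpec("t", "iso", null),
      new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim3"))),
      ColumnsFilter.all()
  );

  // Equivalent in spirit to the helper made public in this patch.
  static InputRow createRow(final Map<String, Object> map)
  {
    return MapInputRowParser.parse(SCHEMA, map);
  }

  public static void main(String[] args)
  {
    InputRow row = createRow(ImmutableMap.of("t", "2002-01-01", "m1", "1.0", "dim1", "", "dim3", "c1"));
    System.out.println(row.getTimestamp() + " " + row.getDimension("dim3"));
  }
}
```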
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java
index a52c6d8925959..21c4a50996b7b 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java
@@ -201,7 +201,7 @@ public void refresh(final Set<SegmentId> segmentsToRefresh, final Set<String> da
       if (rowSignature == null) {
         log.info("datasource [%s] no longer exists, all metadata removed.", dataSource);
         tables.remove(dataSource);
-        return;
+        continue;
       }
 
       final PhysicalDatasourceMetadata physicalDatasourceMetadata = dataSourceMetadataFactory.build(dataSource, rowSignature);
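The broker-side fix is the same `return` to `continue` change. The regression tests then depend on iteration order: the vanished datasource must be visited first to reproduce the old bug, which is why both tests feed the datasources through a `LinkedHashSet`. A small self-contained illustration of the ordering guarantee being relied on:

```java
import java.util.LinkedHashSet;
import java.util.Set;

class IterationOrderSketch
{
  public static void main(String[] args)
  {
    // HashSet iteration order is unspecified; LinkedHashSet preserves
    // insertion order, so "foo2" is guaranteed to be visited first —
    // exactly the order that triggered the premature return before the fix.
    Set<String> dataSourcesToRefresh = new LinkedHashSet<>();
    dataSourcesToRefresh.add("foo2"); // the removed datasource
    dataSourcesToRefresh.add("foo");
    dataSourcesToRefresh.add("bar");

    for (String dataSource : dataSourcesToRefresh) {
      System.out.println(dataSource); // foo2, foo, bar — always in this order
    }
  }
}
```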
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java
index 3cc566cfc1b0c..dd29082b85873 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheTest.java
@@ -36,6 +36,7 @@
 import org.apache.druid.client.InternalQueryConfig;
 import org.apache.druid.client.coordinator.CoordinatorClient;
 import org.apache.druid.client.coordinator.NoopCoordinatorClient;
+import org.apache.druid.data.input.InputRow;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.guava.Sequences;
@@ -44,15 +45,22 @@
 import org.apache.druid.query.GlobalTableDataSource;
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
+import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
 import org.apache.druid.query.metadata.metadata.AllColumnIncluderator;
 import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
 import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
+import org.apache.druid.segment.IndexBuilder;
+import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.QueryableIndexStorageAdapter;
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.metadata.AbstractSegmentMetadataCache;
 import org.apache.druid.segment.metadata.AvailableSegmentMetadata;
 import org.apache.druid.segment.metadata.DataSourceInformation;
+import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.server.QueryLifecycle;
 import org.apache.druid.server.QueryLifecycleFactory;
 import org.apache.druid.server.QueryResponse;
@@ -67,6 +75,7 @@
 import org.apache.druid.sql.calcite.util.CalciteTests;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.LinearShardSpec;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.easymock.EasyMock;
 import org.junit.After;
@@ -76,12 +85,14 @@
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -575,6 +586,136 @@ public void testNullDatasource() throws IOException, InterruptedException
     Assert.assertEquals(5, schema.getSegmentMetadataSnapshot().size());
   }
 
+  @Test
+  public void testAllDatasourcesRebuiltOnDatasourceRemoval() throws IOException, InterruptedException
+  {
+    CountDownLatch addSegmentLatch = new CountDownLatch(7);
+    BrokerSegmentMetadataCache schema = new BrokerSegmentMetadataCache(
+        CalciteTests.createMockQueryLifecycleFactory(walker, conglomerate),
+        serverView,
+        SEGMENT_CACHE_CONFIG_DEFAULT,
+        new NoopEscalator(),
+        new InternalQueryConfig(),
+        new NoopServiceEmitter(),
+        new PhysicalDatasourceMetadataFactory(globalTableJoinable, segmentManager),
+        new NoopCoordinatorClient()
+    )
+    {
+      @Override
+      public void addSegment(final DruidServerMetadata server, final DataSegment segment)
+      {
+        super.addSegment(server, segment);
+        addSegmentLatch.countDown();
+      }
+
+      @Override
+      public void removeSegment(final DataSegment segment)
+      {
+        super.removeSegment(segment);
+      }
+
+      @Override
+      public void markDataSourceAsNeedRebuild(String datasource)
+      {
+        super.markDataSourceAsNeedRebuild(datasource);
+      }
+
+      @Override
+      @VisibleForTesting
+      public void refresh(
+          final Set<SegmentId> segmentsToRefresh,
+          final Set<String> dataSourcesToRebuild
+      ) throws IOException
+      {
+        super.refresh(segmentsToRefresh, dataSourcesToRebuild);
+      }
+    };
+
+    schema.start();
+    schema.awaitInitialization();
+
+    final Map<SegmentId, AvailableSegmentMetadata> segmentMetadatas = schema.getSegmentMetadataSnapshot();
+    List<DataSegment> segments = segmentMetadatas.values()
+                                                 .stream()
+                                                 .map(AvailableSegmentMetadata::getSegment)
+                                                 .collect(Collectors.toList());
+    Assert.assertEquals(6, segments.size());
+
+    // verify that the dim3 column isn't present in the schema for "foo"
+    DatasourceTable.PhysicalDatasourceMetadata fooDs = schema.getDatasource("foo");
+    Assert.assertTrue(fooDs.getRowSignature().getColumnNames().stream().noneMatch("dim3"::equals));
+
+    // segments contains two segments with datasource "foo" and one with datasource "foo2"
+    // let's remove the only segment with datasource "foo2"
datasource "foo2" + final DataSegment segmentToRemove = segments.stream() + .filter(segment -> segment.getDataSource().equals("foo2")) + .findFirst() + .orElse(null); + Assert.assertNotNull(segmentToRemove); + schema.removeSegment(segmentToRemove); + + // we will add a segment to another datasource and + // check if columns in this segment is reflected in the datasource schema + DataSegment newSegment = + DataSegment.builder() + .dataSource("foo") + .interval(Intervals.of("2002/P1Y")) + .version("1") + .shardSpec(new LinearShardSpec(0)) + .size(0) + .build(); + + final File tmpDir = temporaryFolder.newFolder(); + + List rows = ImmutableList.of( + createRow(ImmutableMap.of("t", "2002-01-01", "m1", "1.0", "dim1", "", "dim3", "c1")), + createRow(ImmutableMap.of("t", "2002-01-02", "m1", "2.0", "dim1", "10.1", "dim3", "c2")), + createRow(ImmutableMap.of("t", "2002-01-03", "m1", "3.0", "dim1", "2", "dim3", "c3")) + ); + + QueryableIndex index = IndexBuilder.create() + .tmpDir(new File(tmpDir, "1")) + .segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()) + .schema( + new IncrementalIndexSchema.Builder() + .withMetrics( + new CountAggregatorFactory("cnt"), + new DoubleSumAggregatorFactory("m1", "m1"), + new HyperUniquesAggregatorFactory("unique_dim1", "dim1") + ) + .withRollup(false) + .build() + ) + .rows(rows) + .buildMMappedIndex(); + + walker.add(newSegment, index); + serverView.addSegment(newSegment, ServerType.HISTORICAL); + + Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS)); + + Set dataSources = segments.stream().map(DataSegment::getDataSource).collect(Collectors.toSet()); + dataSources.remove("foo2"); + + // LinkedHashSet to ensure that the datasource with no segments is encountered first + Set dataSourcesToRefresh = new LinkedHashSet<>(); + dataSourcesToRefresh.add("foo2"); + dataSourcesToRefresh.addAll(dataSources); + + segments = schema.getSegmentMetadataSnapshot().values() + .stream() + .map(AvailableSegmentMetadata::getSegment) + .collect(Collectors.toList()); + + schema.refresh(segments.stream().map(DataSegment::getId).collect(Collectors.toSet()), dataSourcesToRefresh); + Assert.assertEquals(6, schema.getSegmentMetadataSnapshot().size()); + + fooDs = schema.getDatasource("foo"); + + // check if the new column present in the added segment is present in the datasource schema + // ensuring that the schema is rebuilt + Assert.assertTrue(fooDs.getRowSignature().getColumnNames().stream().anyMatch("dim3"::equals)); + } + @Test public void testNullAvailableSegmentMetadata() throws IOException, InterruptedException {