Non-existent datasource shouldn't affect schema rebuilding for other datasources (apache#15355)

In pull request apache#14985, a bug was introduced where the periodic refresh would stop rebuilding datasource schemas as soon as it encountered a non-existent datasource. This left the remaining datasources with stale schema information.

This change addresses the bug and adds a unit test to validate the refresh mechanism's behaviour when a datasource is removed and other datasources have schema changes.
findingrish authored and ycp2 committed Nov 17, 2023
1 parent b747682 commit d198f5a
Showing 5 changed files with 287 additions and 4 deletions.
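For context, the sketch below is a self-contained toy model of the control-flow change (the class, field, and method names are stand-ins invented for illustration, not Druid's actual API; only the return-versus-continue behaviour mirrors the refresh() loop changed in the diff). With `continue`, a datasource whose schema can no longer be built is dropped from the cache and the loop proceeds to rebuild the remaining datasources, instead of returning and leaving them stale.

// Toy model of the fix: simplified stand-ins for Druid's schema cache, with
// the same return-vs-continue control flow as the refresh() loop in the diff.
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RebuildLoopSketch
{
  // Stand-in for the cached per-datasource schemas ("tables" in the diff).
  private final Map<String, List<String>> tables = new LinkedHashMap<>();

  // Stand-in for buildDataSourceRowSignature(): null means the datasource
  // no longer has any segments.
  private List<String> buildColumns(String dataSource)
  {
    return "foo2".equals(dataSource) ? null : List.of("__time", "dim1", "m1");
  }

  public void rebuild(Iterable<String> dataSourcesToRebuild)
  {
    for (String dataSource : dataSourcesToRebuild) {
      List<String> columns = buildColumns(dataSource);
      if (columns == null) {
        // Datasource vanished: remove its cached metadata and keep going.
        // Before this commit the real method returned here, so every
        // datasource later in the iteration kept its stale schema.
        tables.remove(dataSource);
        continue;
      }
      tables.put(dataSource, columns);
    }
  }

  public static void main(String[] args)
  {
    RebuildLoopSketch cache = new RebuildLoopSketch();
    // The vanished datasource comes first, mimicking the LinkedHashSet
    // ordering used in the new unit test.
    cache.rebuild(List.of("foo2", "foo"));
    // Prints {foo=[__time, dim1, m1]}: "foo" was still rebuilt.
    System.out.println(cache.tables);
  }
}

The unit test added below exercises the real code path the same way: it removes the only segment of one datasource, adds a segment with a new column to another, and then asserts that the surviving datasource's schema picks up the new column.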
@@ -134,10 +134,11 @@ public void refresh(final Set<SegmentId> segmentsToRefresh, final Set<String> da
for (String dataSource : dataSourcesToRebuild) {
final RowSignature rowSignature = buildDataSourceRowSignature(dataSource);
if (rowSignature == null) {
log.info("RowSignature null for dataSource [%s], implying it no longer exists, all metadata removed.", dataSource);
log.info("RowSignature null for dataSource [%s], implying that it no longer exists. All metadata removed.", dataSource);
tables.remove(dataSource);
- return;
+ continue;
}

DataSourceInformation druidTable = new DataSourceInformation(dataSource, rowSignature);
final DataSourceInformation oldTable = tables.put(dataSource, druidTable);
if (oldTable == null || !oldTable.getRowSignature().equals(druidTable.getRowSignature())) {
@@ -20,28 +20,37 @@
package org.apache.druid.segment.metadata;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.InternalQueryConfig;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import org.apache.druid.query.metadata.metadata.AllColumnIncluderator;
import org.apache.druid.query.metadata.metadata.ColumnAnalysis;
import org.apache.druid.query.metadata.metadata.SegmentAnalysis;
import org.apache.druid.query.metadata.metadata.SegmentMetadataQuery;
import org.apache.druid.query.spec.MultipleSpecificSegmentSpec;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.QueryLifecycle;
import org.apache.druid.server.QueryLifecycleFactory;
import org.apache.druid.server.QueryResponse;
@@ -53,16 +62,19 @@
import org.apache.druid.server.security.NoopEscalator;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -361,6 +373,135 @@ public void testNullDatasource() throws IOException, InterruptedException
Assert.assertEquals(5, schema.getSegmentMetadataSnapshot().size());
}

@Test
public void testAllDatasourcesRebuiltOnDatasourceRemoval() throws IOException, InterruptedException
{
CountDownLatch addSegmentLatch = new CountDownLatch(7);
CoordinatorSegmentMetadataCache schema = new CoordinatorSegmentMetadataCache(
getQueryLifecycleFactory(walker),
serverView,
SEGMENT_CACHE_CONFIG_DEFAULT,
new NoopEscalator(),
new InternalQueryConfig(),
new NoopServiceEmitter()
)
{
@Override
public void addSegment(final DruidServerMetadata server, final DataSegment segment)
{
super.addSegment(server, segment);
addSegmentLatch.countDown();
}

@Override
public void removeSegment(final DataSegment segment)
{
super.removeSegment(segment);
}

@Override
public void markDataSourceAsNeedRebuild(String datasource)
{
super.markDataSourceAsNeedRebuild(datasource);
markDataSourceLatch.countDown();
}

@Override
@VisibleForTesting
public void refresh(
final Set<SegmentId> segmentsToRefresh,
final Set<String> dataSourcesToRebuild) throws IOException
{
super.refresh(segmentsToRefresh, dataSourcesToRebuild);
}
};

schema.start();
schema.awaitInitialization();

final Map<SegmentId, AvailableSegmentMetadata> segmentMetadatas = schema.getSegmentMetadataSnapshot();
List<DataSegment> segments = segmentMetadatas.values()
.stream()
.map(AvailableSegmentMetadata::getSegment)
.collect(Collectors.toList());
Assert.assertEquals(6, segments.size());

// verify that the dim3 column isn't present in the schema for datasource foo
DataSourceInformation fooDs = schema.getDatasource("foo");
Assert.assertTrue(fooDs.getRowSignature().getColumnNames().stream().noneMatch("dim3"::equals));

// segments contains two segments with datasource "foo" and one with datasource "foo2"
// let's remove the only segment with datasource "foo2"
final DataSegment segmentToRemove = segments.stream()
.filter(segment -> segment.getDataSource().equals("foo2"))
.findFirst()
.orElse(null);
Assert.assertNotNull(segmentToRemove);
schema.removeSegment(segmentToRemove);

// we will add a segment to another datasource and
// check if the columns in this segment are reflected in the datasource schema
DataSegment newSegment =
DataSegment.builder()
.dataSource(DATASOURCE1)
.interval(Intervals.of("2002/P1Y"))
.version("1")
.shardSpec(new LinearShardSpec(0))
.size(0)
.build();

final File tmpDir = temporaryFolder.newFolder();

List<InputRow> rows = ImmutableList.of(
createRow(ImmutableMap.of("t", "2002-01-01", "m1", "1.0", "dim1", "", "dim3", "c1")),
createRow(ImmutableMap.of("t", "2002-01-02", "m1", "2.0", "dim1", "10.1", "dim3", "c2")),
createRow(ImmutableMap.of("t", "2002-01-03", "m1", "3.0", "dim1", "2", "dim3", "c3"))
);

QueryableIndex index = IndexBuilder.create()
.tmpDir(new File(tmpDir, "1"))
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
.schema(
new IncrementalIndexSchema.Builder()
.withMetrics(
new CountAggregatorFactory("cnt"),
new DoubleSumAggregatorFactory("m1", "m1"),
new HyperUniquesAggregatorFactory("unique_dim1", "dim1")
)
.withRollup(false)
.build()
)
.rows(rows)
.buildMMappedIndex();

walker.add(newSegment, index);
serverView.addSegment(newSegment, ServerType.HISTORICAL);

Assert.assertTrue(addSegmentLatch.await(1, TimeUnit.SECONDS));

Set<String> dataSources = segments.stream().map(DataSegment::getDataSource).collect(Collectors.toSet());
dataSources.remove("foo2");

// LinkedHashSet to ensure we encounter the removed datasource first
Set<String> dataSourcesToRefresh = new LinkedHashSet<>();
dataSourcesToRefresh.add("foo2");
dataSourcesToRefresh.addAll(dataSources);

segments = schema.getSegmentMetadataSnapshot().values()
.stream()
.map(AvailableSegmentMetadata::getSegment)
.collect(Collectors.toList());

schema.refresh(segments.stream().map(DataSegment::getId).collect(Collectors.toSet()), dataSourcesToRefresh);
Assert.assertEquals(6, schema.getSegmentMetadataSnapshot().size());

fooDs = schema.getDatasource("foo");

// check that the new column from the added segment is now present in the datasource schema,
// ensuring that the schema was rebuilt
Assert.assertTrue(fooDs.getRowSignature().getColumnNames().stream().anyMatch("dim3"::equals));
}

@Test
public void testNullAvailableSegmentMetadata() throws IOException, InterruptedException
{
@@ -286,7 +286,7 @@ public void tearDown() throws Exception
resourceCloser.close();
}

- InputRow createRow(final ImmutableMap<String, ?> map)
+ public InputRow createRow(final ImmutableMap<String, ?> map)
{
return MapInputRowParser.parse(FOO_SCHEMA, (Map<String, Object>) map);
}
@@ -201,7 +201,7 @@ public void refresh(final Set<SegmentId> segmentsToRefresh, final Set<String> da
if (rowSignature == null) {
log.info("datasource [%s] no longer exists, all metadata removed.", dataSource);
tables.remove(dataSource);
- return;
+ continue;
}

final PhysicalDatasourceMetadata physicalDatasourceMetadata = dataSourceMetadataFactory.build(dataSource, rowSignature);
