Skip to content

Commit

Permalink
Add druid.indexing.formats.stringMultiValueHandlingMode system conf…
Browse files Browse the repository at this point in the history
…ig (apache#16822)

This patch introduces an optional cluster configuration, druid.indexing.formats.stringMultiValueHandlingMode, allowing operators to override the default mode SORTED_ARRAY for string dimensions. The possible values for the config are SORTED_SET, SORTED_ARRAY, or ARRAY (SORTED_ARRAY is the default). Values are case-insensitive.
While this cluster property lets operators control the multi-value handling mode for string dimensions, we recommend migrating to real array types instead of multi-value dimensions (MVDs).
 
This also fixes a long-standing issue: compaction now honors the configured cluster-wide property instead of always rewriting the mode as the default SORTED_ARRAY, even if the data was originally ingested with ARRAY or SORTED_SET.
  • Loading branch information
abhishekrb19 authored Aug 3, 2024
1 parent 9dc2569 commit 31b4375
Show file tree
Hide file tree
Showing 44 changed files with 232 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.guice.NestedDataModule;
import org.apache.druid.guice.BuiltInTypesModule;
import org.apache.druid.jackson.AggregatorsModule;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Pair;
Expand Down Expand Up @@ -68,7 +68,7 @@ public class GroupByDeserializationBenchmark

static {
NullHandling.initializeForTests();
NestedDataModule.registerHandlersAndSerde();
BuiltInTypesModule.registerHandlersAndSerde();
AggregatorsModule.registerComplexMetricsAndSerde();
}

Expand All @@ -93,7 +93,7 @@ public class GroupByDeserializationBenchmark
public void setup() throws JsonProcessingException
{
final ObjectMapper undecoratedMapper = TestHelper.makeJsonMapper();
undecoratedMapper.registerModules(NestedDataModule.getJacksonModulesList());
undecoratedMapper.registerModules(BuiltInTypesModule.getJacksonModulesList());
undecoratedMapper.registerModule(new AggregatorsModule());
final Pair<GroupByQuery, String> sqlQueryAndResultRow = sqlQueryAndResultRow(
numDimensions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.testutil.FrameSequenceBuilder;
import org.apache.druid.frame.write.FrameWriters;
import org.apache.druid.guice.NestedDataModule;
import org.apache.druid.guice.BuiltInTypesModule;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.NonnullPair;
Expand Down Expand Up @@ -85,7 +85,7 @@ public class FrameChannelMergerBenchmark
{
static {
NullHandling.initializeForTests();
NestedDataModule.registerHandlersAndSerde();
BuiltInTypesModule.registerHandlersAndSerde();
}

private static final String KEY = "key";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.guice.NestedDataModule;
import org.apache.druid.guice.BuiltInTypesModule;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
Expand Down Expand Up @@ -89,7 +89,7 @@ public class SqlGroupByBenchmark
static {
NullHandling.initializeForTests();
ExpressionProcessing.initializeForTests();
NestedDataModule.registerHandlersAndSerde();
BuiltInTypesModule.registerHandlersAndSerde();
}

private static final Logger log = new Logger(SqlGroupByBenchmark.class);
Expand Down Expand Up @@ -331,7 +331,7 @@ public void setup()

// Hacky and pollutes global namespace, but it is fine since benchmarks are run in isolation. Wasn't able
// to work up a cleaner way of doing it by modifying the injector.
CalciteTests.getJsonMapper().registerModules(NestedDataModule.getJacksonModulesList());
CalciteTests.getJsonMapper().registerModules(BuiltInTypesModule.getJacksonModulesList());

final DruidSchemaCatalog rootSchema =
CalciteTests.createMockRootSchema(conglomerate, walker, plannerConfig, AuthTestUtils.TEST_AUTHORIZER_MAPPER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.LocalInputSource;
import org.apache.druid.data.input.impl.systemfield.SystemFields;
import org.apache.druid.guice.NestedDataModule;
import org.apache.druid.guice.BuiltInTypesModule;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.msq.indexing.MSQSpec;
import org.apache.druid.msq.indexing.MSQTuningConfig;
Expand Down Expand Up @@ -64,7 +64,7 @@
public class MSQComplexGroupByTest extends MSQTestBase
{
static {
NestedDataModule.registerHandlersAndSerde();
BuiltInTypesModule.registerHandlersAndSerde();
}

private String dataFileNameJsonString;
Expand Down Expand Up @@ -109,7 +109,7 @@ public void setup() throws IOException
dataFileSignature
);

objectMapper.registerModules(NestedDataModule.getJacksonModulesList());
objectMapper.registerModules(BuiltInTypesModule.getJacksonModulesList());
}

@MethodSource("data")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@
import org.apache.druid.frame.channel.FrameChannelSequence;
import org.apache.druid.frame.processor.Bouncer;
import org.apache.druid.frame.testutil.FrameTestUtil;
import org.apache.druid.guice.BuiltInTypesModule;
import org.apache.druid.guice.DruidInjectorBuilder;
import org.apache.druid.guice.DruidSecondaryModule;
import org.apache.druid.guice.ExpressionModule;
import org.apache.druid.guice.GuiceInjectors;
import org.apache.druid.guice.IndexingServiceTuningConfigModule;
import org.apache.druid.guice.JoinableFactoryModule;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.NestedDataModule;
import org.apache.druid.guice.SegmentWranglerModule;
import org.apache.druid.guice.StartupInjectorBuilder;
import org.apache.druid.guice.annotations.EscalatedGlobal;
Expand Down Expand Up @@ -357,8 +357,8 @@ public void configure(Binder binder)
{
// We want this module to bring InputSourceModule along for the ride.
binder.install(new InputSourceModule());
binder.install(new NestedDataModule());
NestedDataModule.registerHandlersAndSerde();
binder.install(new BuiltInTypesModule());
BuiltInTypesModule.registerHandlersAndSerde();
SqlBindings.addOperatorConversion(binder, ExternalOperatorConversion.class);
SqlBindings.addOperatorConversion(binder, HttpOperatorConversion.class);
SqlBindings.addOperatorConversion(binder, InlineOperatorConversion.class);
Expand Down Expand Up @@ -521,7 +521,7 @@ public String getFormatString()
objectMapper = setupObjectMapper(injector);
objectMapper.registerModules(new StorageConnectorModule().getJacksonModules());
objectMapper.registerModules(sqlModule.getJacksonModules());
objectMapper.registerModules(NestedDataModule.getJacksonModulesList());
objectMapper.registerModules(BuiltInTypesModule.getJacksonModulesList());

doReturn(mock(Request.class)).when(brokerClient).makeRequest(any(), anyString());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Strings;
import org.apache.druid.guice.BuiltInTypesModule;
import org.apache.druid.guice.annotations.PublicApi;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
Expand Down Expand Up @@ -110,10 +111,9 @@ public static MultiValueHandling fromString(String name)
return name == null ? ofDefault() : valueOf(StringUtils.toUpperCase(name));
}

// this can be system configuration
public static MultiValueHandling ofDefault()
{
return SORTED_ARRAY;
return BuiltInTypesModule.getStringMultiValueHandlingMode();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Binder;
import com.google.inject.Provides;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.segment.DefaultColumnFormatConfig;
import org.apache.druid.segment.DimensionHandler;
Expand All @@ -38,11 +39,19 @@
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.virtual.NestedFieldVirtualColumn;

import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;

public class NestedDataModule implements DruidModule
public class BuiltInTypesModule implements DruidModule
{
/**
* Initialized with a default value so tests can just get it via {@link #getStringMultiValueHandlingMode} without any
* explicit initialization. In production, this default may be overridden if a value is configured via
* {@link #initDimensionHandlerAndMvHandlingMode(DefaultColumnFormatConfig)}.
*/
private static DimensionSchema.MultiValueHandling STRING_MV_MODE = DimensionSchema.MultiValueHandling.SORTED_ARRAY;

@Override
public List<? extends Module> getJacksonModules()
{
Expand All @@ -53,14 +62,15 @@ public List<? extends Module> getJacksonModules()
public void configure(Binder binder)
{
registerSerde();
// binding our side effect class to the lifecycle causes registerHandler to be called on service start, allowing
// use of the config to get the system default format version
LifecycleModule.register(binder, SideEffectHandlerRegisterer.class);
// binding our side effect classes to the lifecycle causes the initDimensionHandlerAndMvHandlingMode to be
// called on service start, allowing use of the config to get the system default format version and string multi
// value handling mode.
LifecycleModule.register(binder, SideEffectRegisterer.class);
}

@Provides
@LazySingleton
public SideEffectHandlerRegisterer registerHandler(DefaultColumnFormatConfig formatsConfig)
public SideEffectRegisterer initDimensionHandlerAndMvHandlingMode(DefaultColumnFormatConfig formatsConfig)
{
if (formatsConfig.getNestedColumnFormatVersion() != null && formatsConfig.getNestedColumnFormatVersion() == 4) {
DimensionHandlerUtils.registerDimensionHandlerProvider(
Expand All @@ -73,7 +83,25 @@ public SideEffectHandlerRegisterer registerHandler(DefaultColumnFormatConfig for
new NestedCommonFormatHandlerProvider()
);
}
return new SideEffectHandlerRegisterer();

setStringMultiValueHandlingModeIfConfigured(formatsConfig.getStringMultiValueHandlingMode());
return new SideEffectRegisterer();
}

/**
 * Replaces the process-wide string multi-value handling mode with the configured one, if any.
 * A {@code null} argument means nothing was configured, in which case the current mode is left untouched.
 *
 * @param stringMultiValueHandlingMode name of the configured mode, or {@code null} when not configured; parsed
 *                                     with {@link DimensionSchema.MultiValueHandling#fromString(String)}
 */
private static void setStringMultiValueHandlingModeIfConfigured(@Nullable String stringMultiValueHandlingMode)
{
  if (stringMultiValueHandlingMode == null) {
    // Not configured: keep whatever mode is currently in effect.
    return;
  }
  STRING_MV_MODE = DimensionSchema.MultiValueHandling.fromString(stringMultiValueHandlingMode);
}

/**
* Returns the string multi value handling mode currently in effect for this process.
*
* @return the current value of the static {@code STRING_MV_MODE} field: the mode taken from the system config if
* one was set, otherwise the built-in default the field was initialized with.
*/
public static DimensionSchema.MultiValueHandling getStringMultiValueHandlingMode()
{
return STRING_MV_MODE;
}

public static List<SimpleModule> getJacksonModulesList()
Expand Down Expand Up @@ -126,13 +154,15 @@ public DimensionHandler<StructuredData, StructuredData, StructuredData> get(Stri
return new NestedDataColumnHandlerV4(dimensionName);
}
}

/**
* this is used as a vehicle to register the correct version of the system default nested column handler by side
* effect with the help of binding to {@link org.apache.druid.java.util.common.lifecycle.Lifecycle} so that
* {@link #registerHandler(DefaultColumnFormatConfig)} can be called with the injected
* this is used as a vehicle to register the correct version of the system default nested column handler and multi
* value handling mode by side effect with the help of binding to
* {@link org.apache.druid.java.util.common.lifecycle.Lifecycle} so that
* {@link #initDimensionHandlerAndMvHandlingMode(DefaultColumnFormatConfig)} can be called with the injected
* {@link DefaultColumnFormatConfig}.
*/
public static class SideEffectHandlerRegisterer
public static class SideEffectRegisterer
{
// nothing to see here
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.error.DruidException;

import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Objects;

public class DefaultColumnFormatConfig
Expand All @@ -39,16 +41,44 @@ public static void validateNestedFormatVersion(@Nullable Integer formatVersion)
}
}

/**
 * Checks that the configured string multi-value handling mode, if any, names a known
 * {@link DimensionSchema.MultiValueHandling} value.
 *
 * @param stringMultiValueHandlingMode configured mode name, or {@code null} when not configured (always accepted)
 * @throws DruidException targeted at operators, with category {@code INVALID_INPUT}, when the value cannot be
 *                        parsed by {@link DimensionSchema.MultiValueHandling#fromString(String)}
 */
private static void validateMultiValueHandlingMode(@Nullable String stringMultiValueHandlingMode)
{
  if (stringMultiValueHandlingMode == null) {
    // Nothing configured, so there is nothing to validate.
    return;
  }

  try {
    // Parsing is done only for its validation side effect; the result is discarded.
    DimensionSchema.MultiValueHandling.fromString(stringMultiValueHandlingMode);
  }
  catch (IllegalArgumentException e) {
    final String supportedModes = Arrays.toString(DimensionSchema.MultiValueHandling.values());
    throw DruidException.forPersona(DruidException.Persona.OPERATOR)
                        .ofCategory(DruidException.Category.INVALID_INPUT)
                        .build(
                            "Invalid value[%s] specified for 'druid.indexing.formats.stringMultiValueHandlingMode'."
                            + " Supported values are [%s].",
                            stringMultiValueHandlingMode,
                            supportedModes
                        );
  }
}

@Nullable
@JsonProperty("nestedColumnFormatVersion")
private final Integer nestedColumnFormatVersion;

@Nullable
@JsonProperty("stringMultiValueHandlingMode")
private final String stringMultiValueHandlingMode;

@JsonCreator
public DefaultColumnFormatConfig(
@JsonProperty("nestedColumnFormatVersion") @Nullable Integer nestedColumnFormatVersion
@JsonProperty("nestedColumnFormatVersion") @Nullable Integer nestedColumnFormatVersion,
@JsonProperty("stringMultiValueHandlingMode") @Nullable String stringMultiValueHandlingMode
)
{
validateNestedFormatVersion(nestedColumnFormatVersion);
validateMultiValueHandlingMode(stringMultiValueHandlingMode);

this.nestedColumnFormatVersion = nestedColumnFormatVersion;
validateNestedFormatVersion(this.nestedColumnFormatVersion);
this.stringMultiValueHandlingMode = stringMultiValueHandlingMode;
}

@Nullable
Expand All @@ -58,6 +88,13 @@ public Integer getNestedColumnFormatVersion()
return nestedColumnFormatVersion;
}

/**
* Returns the value of the {@code stringMultiValueHandlingMode} property held by this config.
*
* @return the mode as the string stored in this config's field, or {@code null} if none was provided
*/
@Nullable
@JsonProperty("stringMultiValueHandlingMode")
public String getStringMultiValueHandlingMode()
{
return stringMultiValueHandlingMode;
}

@Override
public boolean equals(Object o)
{
Expand All @@ -68,20 +105,22 @@ public boolean equals(Object o)
return false;
}
DefaultColumnFormatConfig that = (DefaultColumnFormatConfig) o;
return Objects.equals(nestedColumnFormatVersion, that.nestedColumnFormatVersion);
return Objects.equals(nestedColumnFormatVersion, that.nestedColumnFormatVersion)
&& Objects.equals(stringMultiValueHandlingMode, that.stringMultiValueHandlingMode);
}

@Override
public int hashCode()
{
return Objects.hash(nestedColumnFormatVersion);
return Objects.hash(nestedColumnFormatVersion, stringMultiValueHandlingMode);
}

/**
 * Renders this config as {@code DefaultColumnFormatConfig{...}} listing both properties; unset properties are
 * printed as {@code null}.
 */
@Override
public String toString()
{
  final StringBuilder description = new StringBuilder("DefaultColumnFormatConfig{");
  description.append("nestedColumnFormatVersion=").append(nestedColumnFormatVersion);
  description.append(", stringMultiValueHandlingMode=").append(stringMultiValueHandlingMode);
  return description.append('}').toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ private static String emptyToNullIfNeeded(@Nullable Object o)
private volatile boolean hasMultipleValues = false;

public StringDimensionIndexer(
MultiValueHandling multiValueHandling,
@Nullable MultiValueHandling multiValueHandling,
boolean hasBitmapIndexes,
boolean hasSpatialIndexes,
boolean useMaxMemoryEstimates
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import com.fasterxml.jackson.dataformat.smile.SmileGenerator;
import it.unimi.dsi.fastutil.Hash;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.guice.NestedDataModule;
import org.apache.druid.guice.BuiltInTypesModule;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.Comparators;
Expand Down Expand Up @@ -62,7 +62,7 @@ public class NestedDataComplexTypeSerde extends ComplexMetricSerde
smileFactory.delegateToTextual(true);
final ObjectMapper mapper = new DefaultObjectMapper(smileFactory, null);
mapper.getFactory().setCodec(mapper);
mapper.registerModules(NestedDataModule.getJacksonModulesList());
mapper.registerModules(BuiltInTypesModule.getJacksonModulesList());
OBJECT_MAPPER = mapper;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import org.apache.druid.frame.segment.FrameSegment;
import org.apache.druid.frame.segment.FrameStorageAdapter;
import org.apache.druid.frame.testutil.FrameTestUtil;
import org.apache.druid.guice.NestedDataModule;
import org.apache.druid.guice.BuiltInTypesModule;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RE;
Expand Down Expand Up @@ -92,7 +92,7 @@ public class FrameWriterTest extends InitializedNullHandlingTest

static {
ComplexMetrics.registerSerde(HyperUniquesSerde.TYPE_NAME, new HyperUniquesSerde());
NestedDataModule.registerHandlersAndSerde();
BuiltInTypesModule.registerHandlersAndSerde();
}

private static final int DEFAULT_ALLOCATOR_CAPACITY = 1_000_000;
Expand Down
Loading

0 comments on commit 31b4375

Please sign in to comment.