Skip to content

Commit

Permalink
Support maxColumnsToMerge in supervisor tuningConfig (#17030)
Browse files Browse the repository at this point in the history
* support maxColumnsToMerge in supervisor specs

* remove log line

* fix style

* add docs

* fix unit tests
  • Loading branch information
georgew5656 authored Sep 11, 2024
1 parent 9e1544e commit 428f58c
Show file tree
Hide file tree
Showing 21 changed files with 127 additions and 39 deletions.
1 change: 1 addition & 0 deletions docs/ingestion/supervisor.md
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ For configuration properties specific to Kafka and Kinesis, see [Kafka tuning co
|`logParseExceptions`|Boolean|If `true`, Druid logs an error message when a parsing exception occurs, containing information about the row where the error occurred.|No|`false`|
|`maxParseExceptions`|Integer|The maximum number of parse exceptions that can occur before the task halts ingestion and fails. Setting `reportParseExceptions` overrides this limit.|No|unlimited|
|`maxSavedParseExceptions`|Integer|When a parse exception occurs, Druid keeps track of the most recent parse exceptions. `maxSavedParseExceptions` limits the number of saved exception instances. These saved exceptions are available after the task finishes in the [task completion report](../ingestion/tasks.md#task-reports). Setting `reportParseExceptions` overrides this limit.|No|0|
|`maxColumnsToMerge`|Integer|Limit of the number of segments to merge in a single phase when merging segments for publishing. This limit affects the total number of columns present in a set of segments to merge. If the limit is exceeded, segment merging occurs in multiple phases. Druid merges at least 2 segments per phase, regardless of this setting.|No|-1|

## Start a supervisor

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ public RabbitStreamIndexTaskTuningConfig(
@Nullable Integer numPersistThreads,
@Nullable Integer recordBufferSize,
@Nullable Integer recordBufferOfferTimeout,
@Nullable Integer maxRecordsPerPoll)
@Nullable Integer maxRecordsPerPoll,
@Nullable Integer maxColumnsToMerge
)
{
super(
appendableIndexSpec,
Expand All @@ -97,7 +99,8 @@ public RabbitStreamIndexTaskTuningConfig(
logParseExceptions,
maxParseExceptions,
maxSavedParseExceptions,
numPersistThreads
numPersistThreads,
maxColumnsToMerge
);

this.recordBufferSize = recordBufferSize;
Expand Down Expand Up @@ -130,7 +133,8 @@ private RabbitStreamIndexTaskTuningConfig(
@JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions,
@JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads,
@JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll
@JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll,
@JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge
)
{
this(
Expand All @@ -156,7 +160,9 @@ private RabbitStreamIndexTaskTuningConfig(
numPersistThreads,
recordBufferSize,
recordBufferOfferTimeout,
maxRecordsPerPoll);
maxRecordsPerPoll,
maxColumnsToMerge
);
}

@Nullable
Expand Down Expand Up @@ -226,7 +232,8 @@ public RabbitStreamIndexTaskTuningConfig withBasePersistDirectory(File dir)
getNumPersistThreads(),
getRecordBufferSizeConfigured(),
getRecordBufferOfferTimeout(),
getMaxRecordsPerPollConfigured()
getMaxRecordsPerPollConfigured(),
getMaxColumnsToMerge()
);
}

Expand All @@ -253,6 +260,7 @@ public String toString()
", maxSavedParseExceptions=" + getMaxSavedParseExceptions() +
", numPersistThreads=" + getNumPersistThreads() +
", maxRecordsPerPole=" + getMaxRecordsPerPollConfigured() +
", maxColumnsToMerge=" + getMaxColumnsToMerge() +
'}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public static RabbitStreamSupervisorTuningConfig defaultConfig()
null,
null,
null,
null,
null
);
}
Expand Down Expand Up @@ -99,7 +100,9 @@ public RabbitStreamSupervisorTuningConfig(
@JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
@JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads,
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions,
@JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll)
@JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll,
@JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge
)
{
super(
appendableIndexSpec,
Expand All @@ -124,7 +127,8 @@ public RabbitStreamSupervisorTuningConfig(
numPersistThreads,
recordBufferSize,
recordBufferOfferTimeout,
maxRecordsPerPoll
maxRecordsPerPoll,
maxColumnsToMerge
);
this.workerThreads = workerThreads;
this.chatRetries = (chatRetries != null ? chatRetries : DEFAULT_CHAT_RETRIES);
Expand Down Expand Up @@ -210,6 +214,7 @@ public String toString()
", maxSavedParseExceptions=" + getMaxSavedParseExceptions() +
", numPersistThreads=" + getNumPersistThreads() +
", maxRecordsPerPoll=" + getMaxRecordsPerPollConfigured() +
", maxColumnsToMerge=" + getMaxColumnsToMerge() +
'}';
}

Expand Down Expand Up @@ -239,7 +244,8 @@ public RabbitStreamIndexTaskTuningConfig convertToTaskTuningConfig()
getRecordBufferSizeConfigured(),
getRecordBufferOfferTimeout(),
getMaxRecordsPerPollConfigured(),
getNumPersistThreads()
getNumPersistThreads(),
getMaxColumnsToMerge()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,8 @@ public void testtoString() throws Exception
"maxParseExceptions=0, " +
"maxSavedParseExceptions=0, " +
"numPersistThreads=1, " +
"maxRecordsPerPoll=null}";
"maxRecordsPerPoll=null, " +
"maxColumnsToMerge=-1}";


Assert.assertEquals(resStr, config.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,9 @@ public void setupTest()
null,
null,
null,
100);
100,
null
);
rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
serviceEmitter = new StubServiceEmitter("RabbitStreamSupervisorTest", "localhost");
EmittingLogger.registerEmitter(serviceEmitter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public KafkaIndexTaskTuningConfig(
@Nullable Boolean logParseExceptions,
@Nullable Integer maxParseExceptions,
@Nullable Integer maxSavedParseExceptions,
@Nullable Integer numPersistThreads
@Nullable Integer numPersistThreads,
@Nullable Integer maxColumnsToMerge
)
{
super(
Expand All @@ -76,7 +77,8 @@ public KafkaIndexTaskTuningConfig(
logParseExceptions,
maxParseExceptions,
maxSavedParseExceptions,
numPersistThreads
numPersistThreads,
maxColumnsToMerge
);
}

Expand All @@ -100,7 +102,8 @@ private KafkaIndexTaskTuningConfig(
@JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
@JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions,
@JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads
@JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads,
@JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge
)
{
this(
Expand All @@ -123,7 +126,8 @@ private KafkaIndexTaskTuningConfig(
logParseExceptions,
maxParseExceptions,
maxSavedParseExceptions,
numPersistThreads
numPersistThreads,
maxColumnsToMerge
);
}

Expand All @@ -150,7 +154,8 @@ public KafkaIndexTaskTuningConfig withBasePersistDirectory(File dir)
isLogParseExceptions(),
getMaxParseExceptions(),
getMaxSavedParseExceptions(),
getNumPersistThreads()
getNumPersistThreads(),
getMaxColumnsToMerge()
);
}

Expand All @@ -177,7 +182,8 @@ public String toString()
", maxParseExceptions=" + getMaxParseExceptions() +
", maxSavedParseExceptions=" + getMaxSavedParseExceptions() +
", numPersistThreads=" + getNumPersistThreads() +
'}';
", getMaxColumnsToMerge=" + getMaxColumnsToMerge() +
'}';
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public static KafkaSupervisorTuningConfig defaultConfig()
null,
null,
null,
null,
null
);
}
Expand Down Expand Up @@ -93,7 +94,8 @@ public KafkaSupervisorTuningConfig(
@JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
@JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions,
@JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads
@JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads,
@JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge
)
{
super(
Expand All @@ -116,7 +118,8 @@ public KafkaSupervisorTuningConfig(
logParseExceptions,
maxParseExceptions,
maxSavedParseExceptions,
numPersistThreads
numPersistThreads,
maxColumnsToMerge
);
this.workerThreads = workerThreads;
this.chatRetries = (chatRetries != null ? chatRetries : DEFAULT_CHAT_RETRIES);
Expand Down Expand Up @@ -229,7 +232,8 @@ public KafkaIndexTaskTuningConfig convertToTaskTuningConfig()
isLogParseExceptions(),
getMaxParseExceptions(),
getMaxSavedParseExceptions(),
getNumPersistThreads()
getNumPersistThreads(),
getMaxColumnsToMerge()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2861,6 +2861,7 @@ private KafkaIndexTask createTask(
logParseExceptions,
maxParseExceptions,
maxSavedParseExceptions,
null,
null
);
if (!context.containsKey(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public void testSerdeWithDefaults() throws Exception
Assert.assertEquals(false, config.isReportParseExceptions());
Assert.assertEquals(Duration.ofMinutes(15).toMillis(), config.getHandoffConditionTimeout());
Assert.assertEquals(1, config.getNumPersistThreads());
Assert.assertEquals(-1, config.getMaxColumnsToMerge());
}

@Test
Expand Down Expand Up @@ -123,6 +124,7 @@ public void testSerdeWithNonDefaults() throws Exception
config.getIndexSpecForIntermediatePersists()
);
Assert.assertEquals(2, config.getNumPersistThreads());
Assert.assertEquals(-1, config.getMaxColumnsToMerge());
}

@Test
Expand Down Expand Up @@ -152,7 +154,8 @@ public void testConvert()
null,
null,
null,
2
2,
5
);
KafkaIndexTaskTuningConfig copy = original.convertToTaskTuningConfig();

Expand All @@ -168,6 +171,7 @@ public void testConvert()
Assert.assertEquals(true, copy.isReportParseExceptions());
Assert.assertEquals(5L, copy.getHandoffConditionTimeout());
Assert.assertEquals(2, copy.getNumPersistThreads());
Assert.assertEquals(5, copy.getMaxColumnsToMerge());
}

@Test
Expand All @@ -193,7 +197,8 @@ public void testSerdeWithModifiedTuningConfigAddedField() throws IOException
true,
42,
42,
2
2,
-1
);

String serialized = mapper.writeValueAsString(base);
Expand All @@ -219,6 +224,7 @@ public void testSerdeWithModifiedTuningConfigAddedField() throws IOException
Assert.assertEquals(base.getMaxParseExceptions(), deserialized.getMaxParseExceptions());
Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions());
Assert.assertEquals(base.getNumPersistThreads(), deserialized.getNumPersistThreads());
Assert.assertEquals(base.getMaxColumnsToMerge(), deserialized.getMaxColumnsToMerge());
}

@Test
Expand All @@ -244,6 +250,7 @@ public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException
42,
42,
2,
-1,
"extra string"
);

Expand All @@ -269,6 +276,7 @@ public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException
Assert.assertEquals(base.getMaxParseExceptions(), deserialized.getMaxParseExceptions());
Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions());
Assert.assertEquals(base.getNumPersistThreads(), deserialized.getNumPersistThreads());
Assert.assertEquals(base.getMaxColumnsToMerge(), deserialized.getMaxColumnsToMerge());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,8 @@ public SeekableStreamIndexTaskClient<KafkaTopicPartition, Long> build(
null,
null,
null,
null
null,
null
);

EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(kafkaSupervisorIOConfig).anyTimes();
Expand Down Expand Up @@ -497,6 +498,7 @@ public void testCreateBaseTaskContexts() throws JsonProcessingException
null,
null,
null,
null,
null
),
null
Expand Down Expand Up @@ -4221,6 +4223,7 @@ public void testIsTaskCurrent()
null,
null,
null,
null,
null
)
);
Expand Down Expand Up @@ -4260,6 +4263,7 @@ public void testIsTaskCurrent()
null,
null,
null,
null,
null
);

Expand Down Expand Up @@ -4413,6 +4417,7 @@ public void testSequenceNameDoesNotChangeWithTaskId()
null,
null,
null,
null,
null
)
);
Expand Down Expand Up @@ -4888,6 +4893,7 @@ public SeekableStreamIndexTaskClient<KafkaTopicPartition, Long> build(
null,
null,
10,
null,
null
);

Expand Down Expand Up @@ -5002,6 +5008,7 @@ public SeekableStreamIndexTaskClient<KafkaTopicPartition, Long> build(
null,
null,
null,
null,
null
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public TestModifiedKafkaIndexTaskTuningConfig(
@JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions,
@JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads,
@JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge,
@JsonProperty("extra") String extra
)
{
Expand All @@ -79,7 +80,8 @@ public TestModifiedKafkaIndexTaskTuningConfig(
logParseExceptions,
maxParseExceptions,
maxSavedParseExceptions,
numPersistThreads
numPersistThreads,
maxColumnsToMerge
);
this.extra = extra;
}
Expand Down
Loading

0 comments on commit 428f58c

Please sign in to comment.