Skip to content

Commit

Permalink
Add a check to enable size based threshold for realtime tables (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
soumitra-st authored Nov 18, 2023
1 parent 244c947 commit 6aecd41
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,13 @@ public static void validate(TableConfig tableConfig, @Nullable Schema schema, @N
throw new IllegalStateException("Could not create StreamConfig using the streamConfig map", e);
}
validateDecoder(streamConfig);
// If a size-based segment flush threshold is configured, the row-count threshold must be explicitly set to 0.
if (streamConfigMap.containsKey(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE)
|| streamConfigMap.containsKey(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_DESIRED_SIZE)) {
Preconditions.checkState(streamConfig.getFlushThresholdRows() == 0,
String.format("Invalid config: %s=%d, it must be set to 0 for size based segment threshold to work.",
StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, streamConfig.getFlushThresholdRows()));
}
}
validateTierConfigList(tableConfig.getTierConfigsList());
validateIndexingConfig(tableConfig.getIndexingConfig(), schema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.mockito.Mockito;
Expand Down Expand Up @@ -694,6 +695,54 @@ public void ingestionStreamConfigsTest() {
} catch (IllegalStateException e) {
// expected
}

Schema schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
.addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();

// When a size-based threshold is specified, leaving the rows threshold unset is rejected; it must be explicitly set to 0.
streamConfigs = getKafkaStreamConfigs();
streamConfigs.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE);
streamConfigs.remove(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_DESIRED_SIZE);
streamConfigs.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE, "100m");
streamConfigs.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS);
ingestionConfig.setStreamIngestionConfig(new StreamIngestionConfig(List.of(streamConfigs)));
tableConfig =
new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
.setIngestionConfig(ingestionConfig).build();

try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail();
} catch (IllegalStateException e) {
Assert.assertTrue(e.getMessage().contains("must be set to 0"));
}

// When a size-based threshold is specified, a non-zero rows threshold is rejected.
streamConfigs.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, "1000");
ingestionConfig.setStreamIngestionConfig(new StreamIngestionConfig(List.of(streamConfigs)));
tableConfig =
new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("timeColumn")
.setIngestionConfig(ingestionConfig).build();

try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail();
} catch (IllegalStateException e) {
Assert.assertTrue(e.getMessage().contains("must be set to 0"));
}

// When a size-based threshold is specified and the rows threshold is explicitly set to 0, validation succeeds.
streamConfigs.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, "0");
ingestionConfig.setStreamIngestionConfig(new StreamIngestionConfig(List.of(streamConfigs)));
tableConfig =
new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("timeColumn")
.setIngestionConfig(ingestionConfig).build();

try {
TableConfigUtils.validate(tableConfig, schema);
} catch (IllegalStateException e) {
Assert.fail(e.getMessage());
}
}

@Test
Expand Down

0 comments on commit 6aecd41

Please sign in to comment.