Skip to content

Commit

Permalink
Cleanup UpsertCompaction validations during task-generation (apache#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
tibrewalpratik17 authored Nov 12, 2024
1 parent 013e80f commit 0180a11
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,6 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
String taskType = MinionConstants.UpsertCompactionTask.TASK_TYPE;
List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
for (TableConfig tableConfig : tableConfigs) {
if (!validate(tableConfig)) {
LOGGER.warn("Validation failed for table {}. Skipping..", tableConfig.getTableName());
continue;
}

String tableNameWithType = tableConfig.getTableName();
LOGGER.info("Start generating task configs for table: {}", tableNameWithType);
Expand Down Expand Up @@ -150,21 +146,6 @@ public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) {
taskConfigs.getOrDefault(UpsertCompactionTask.NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST,
String.valueOf(DEFAULT_NUM_SEGMENTS_BATCH_PER_SERVER_REQUEST)));

// Validate that the snapshot is enabled if validDocIdsType is validDocIdsSnapshot
if (validDocIdsType == ValidDocIdsType.SNAPSHOT) {
UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
Preconditions.checkNotNull(upsertConfig, "UpsertConfig must be provided for UpsertCompactionTask");
Preconditions.checkState(upsertConfig.isEnableSnapshot(), String.format(
"'enableSnapshot' from UpsertConfig must be enabled for UpsertCompactionTask with validDocIdsType = %s",
validDocIdsType));
} else if (validDocIdsType == ValidDocIdsType.IN_MEMORY_WITH_DELETE) {
UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
Preconditions.checkNotNull(upsertConfig, "UpsertConfig must be provided for UpsertCompactionTask");
Preconditions.checkNotNull(upsertConfig.getDeleteRecordColumn(),
String.format("deleteRecordColumn must be provided for " + "UpsertCompactionTask with validDocIdsType = %s",
validDocIdsType));
}

Map<String, List<ValidDocIdsMetadataInfo>> validDocIdsMetadataList =
serverSegmentMetadataReader.getSegmentToValidDocIdsMetadataFromServer(tableNameWithType, serverToSegments,
serverToEndpoints, null, 60_000, validDocIdsType.toString(), numSegmentsBatchPerServerRequest);
Expand Down Expand Up @@ -288,23 +269,6 @@ public static int getMaxTasks(String taskType, String tableNameWithType, Map<Str
return maxTasks;
}

@VisibleForTesting
static boolean validate(TableConfig tableConfig) {
String taskType = MinionConstants.UpsertCompactionTask.TASK_TYPE;
String tableNameWithType = tableConfig.getTableName();
if (tableConfig.getTableType() == TableType.OFFLINE) {
LOGGER.warn("Skip generation task: {} for table: {}, offline table is not supported", taskType,
tableNameWithType);
return false;
}
if (!tableConfig.isUpsertEnabled()) {
LOGGER.warn("Skip generation task: {} for table: {}, table without upsert enabled is not supported", taskType,
tableNameWithType);
return false;
}
return true;
}

@Override
public void validateTaskConfigs(TableConfig tableConfig, Map<String, String> taskConfigs) {
// check table is realtime
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;


Expand Down Expand Up @@ -98,21 +97,6 @@ public void setUp() {
_completedSegmentsMap.put(_completedSegment2.getSegmentName(), _completedSegment2);
}

@Test
public void testValidate() {
TableConfig tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
.build();
assertFalse(UpsertCompactionTaskGenerator.validate(tableConfig));

TableConfigBuilder tableConfigBuilder =
new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME);
assertFalse(UpsertCompactionTaskGenerator.validate(tableConfigBuilder.build()));

tableConfigBuilder = tableConfigBuilder.setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL));
assertTrue(UpsertCompactionTaskGenerator.validate(tableConfigBuilder.build()));
}

@Test
public void testGenerateTasksValidatesTableConfigs() {
UpsertCompactionTaskGenerator taskGenerator = new UpsertCompactionTaskGenerator();
Expand Down

0 comments on commit 0180a11

Please sign in to comment.