Skip to content

Commit

Permalink
Acquire IdealStateUpdaterLock for PinotLLCRealtimeSegmentManager IdealState update (apache#13947)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangfu0 authored Sep 7, 2024
1 parent edb1301 commit f5327a5
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1980,11 +1980,13 @@ public void setExistingTableConfig(TableConfig tableConfig, int expectedVersion)
IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
String replicationConfigured = Integer.toString(tableConfig.getReplication());
if (!idealState.getReplicas().equals(replicationConfigured)) {
HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, is -> {
assert is != null;
is.setReplicas(replicationConfigured);
return is;
}, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2f));
synchronized (getIdealStateUpdaterLock(tableNameWithType)) {
HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, is -> {
assert is != null;
is.setReplicas(replicationConfigured);
return is;
}, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2f));
}
}

// Assign instances
Expand Down Expand Up @@ -3967,48 +3969,48 @@ private boolean writeLineageEntryWithLock(TableConfig tableConfig, String lineag
LineageEntry lineageEntryToUpdate, LineageEntry lineageEntryToMatch, ZkHelixPropertyStore<ZNRecord> propertyStore,
LineageUpdateType lineageUpdateType, Map<String, String> customMap) {
String tableNameWithType = tableConfig.getTableName();
synchronized (getLineageUpdaterLock(tableNameWithType)) {
// retry attempts are made to account for the distributed update from other controllers
for (int i = 0; i < DEFAULT_SEGMENT_LINEAGE_UPDATE_NUM_RETRY; i++) {
// Fetch the segment lineage
ZNRecord segmentLineageToUpdateZNRecord =
SegmentLineageAccessHelper.getSegmentLineageZNRecord(propertyStore, tableConfig.getTableName());
int expectedVersion = segmentLineageToUpdateZNRecord.getVersion();
SegmentLineage segmentLineageToUpdate = SegmentLineage.fromZNRecord(segmentLineageToUpdateZNRecord);
LineageEntry currentLineageEntry = segmentLineageToUpdate.getLineageEntry(lineageEntryId);

// If the lineage entry doesn't match with the previously fetched lineage, we need to fail the request.
if (!currentLineageEntry.equals(lineageEntryToMatch)) {
String errorMsg = String.format(
"Aborting the to update lineage entry since we find that the entry has been modified for table %s, "
+ "entry id: %s", tableConfig.getTableName(), lineageEntryId);
LOGGER.error(errorMsg);
throw new RuntimeException(errorMsg);
}
synchronized (getLineageUpdaterLock(tableNameWithType)) {
// retry attempts are made to account for the distributed update from other controllers
for (int i = 0; i < DEFAULT_SEGMENT_LINEAGE_UPDATE_NUM_RETRY; i++) {
// Fetch the segment lineage
ZNRecord segmentLineageToUpdateZNRecord =
SegmentLineageAccessHelper.getSegmentLineageZNRecord(propertyStore, tableConfig.getTableName());
int expectedVersion = segmentLineageToUpdateZNRecord.getVersion();
SegmentLineage segmentLineageToUpdate = SegmentLineage.fromZNRecord(segmentLineageToUpdateZNRecord);
LineageEntry currentLineageEntry = segmentLineageToUpdate.getLineageEntry(lineageEntryId);

// If the lineage entry doesn't match with the previously fetched lineage, we need to fail the request.
if (!currentLineageEntry.equals(lineageEntryToMatch)) {
String errorMsg = String.format(
"Aborting the to update lineage entry since we find that the entry has been modified for table %s, "
+ "entry id: %s", tableConfig.getTableName(), lineageEntryId);
LOGGER.error(errorMsg);
throw new RuntimeException(errorMsg);
}

// Update lineage entry
segmentLineageToUpdate.updateLineageEntry(lineageEntryId, lineageEntryToUpdate);
switch (lineageUpdateType) {
case END:
_lineageManager.updateLineageForEndReplaceSegments(tableConfig, lineageEntryId, customMap,
segmentLineageToUpdate);
break;
case REVERT:
_lineageManager.updateLineageForRevertReplaceSegments(tableConfig, lineageEntryId, customMap,
segmentLineageToUpdate);
break;
default:
String errorMsg = String.format("Aborting the lineage entry update with type: %s, as the allowed update"
+ "types in this method are END and REVERT", lineageUpdateType);
throw new IllegalStateException(errorMsg);
}
// Update lineage entry
segmentLineageToUpdate.updateLineageEntry(lineageEntryId, lineageEntryToUpdate);
switch (lineageUpdateType) {
case END:
_lineageManager.updateLineageForEndReplaceSegments(tableConfig, lineageEntryId, customMap,
segmentLineageToUpdate);
break;
case REVERT:
_lineageManager.updateLineageForRevertReplaceSegments(tableConfig, lineageEntryId, customMap,
segmentLineageToUpdate);
break;
default:
String errorMsg = String.format("Aborting the lineage entry update with type: %s, as the allowed update"
+ "types in this method are END and REVERT", lineageUpdateType);
throw new IllegalStateException(errorMsg);
}

// Write back to the lineage entry
if (SegmentLineageAccessHelper.writeSegmentLineage(propertyStore, segmentLineageToUpdate, expectedVersion)) {
return true;
}
// Write back to the lineage entry
if (SegmentLineageAccessHelper.writeSegmentLineage(propertyStore, segmentLineageToUpdate, expectedVersion)) {
return true;
}
}
}
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -816,23 +816,24 @@ public void segmentStoppedConsuming(LLCSegmentName llcSegmentName, String instan
String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(llcSegmentName.getTableName());
String segmentName = llcSegmentName.getSegmentName();
LOGGER.info("Marking CONSUMING segment: {} OFFLINE on instance: {}", segmentName, instanceName);

try {
HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> {
assert idealState != null;
Map<String, String> stateMap = idealState.getInstanceStateMap(segmentName);
String state = stateMap.get(instanceName);
if (SegmentStateModel.CONSUMING.equals(state)) {
stateMap.put(instanceName, SegmentStateModel.OFFLINE);
} else {
LOGGER.info("Segment {} in state {} when trying to register consumption stop from {}", segmentName, state,
instanceName);
}
return idealState;
}, RetryPolicies.exponentialBackoffRetryPolicy(10, 500L, 1.2f), true);
} catch (Exception e) {
_controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_ZOOKEEPER_UPDATE_FAILURES, 1L);
throw e;
synchronized (_helixResourceManager.getIdealStateUpdaterLock(realtimeTableName)) {
try {
HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> {
assert idealState != null;
Map<String, String> stateMap = idealState.getInstanceStateMap(segmentName);
String state = stateMap.get(instanceName);
if (SegmentStateModel.CONSUMING.equals(state)) {
stateMap.put(instanceName, SegmentStateModel.OFFLINE);
} else {
LOGGER.info("Segment {} in state {} when trying to register consumption stop from {}", segmentName, state,
instanceName);
}
return idealState;
}, RetryPolicies.exponentialBackoffRetryPolicy(10, 500L, 1.2f), true);
} catch (Exception e) {
_controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_ZOOKEEPER_UPDATE_FAILURES, 1L);
throw e;
}
}
// We know that we have successfully set the idealstate to be OFFLINE.
// We can now do a best effort to reset the externalview to be OFFLINE if it is in ERROR state.
Expand Down Expand Up @@ -924,30 +925,33 @@ public void ensureAllPartitionsConsuming(TableConfig tableConfig, StreamConfig s
Preconditions.checkState(!_isStopping, "Segment manager is stopping");

String realtimeTableName = tableConfig.getTableName();
HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> {
assert idealState != null;
boolean isTableEnabled = idealState.isEnabled();
boolean isTablePaused = isTablePaused(idealState);
boolean offsetsHaveToChange = offsetCriteria != null;
if (isTableEnabled && !isTablePaused) {
List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList =
offsetsHaveToChange
? Collections.emptyList() // offsets from metadata are not valid anymore; fetch for all partitions
: getPartitionGroupConsumptionStatusList(idealState, streamConfig);
OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
// Read the smallest offset when a new partition is detected
streamConfig.setOffsetCriteria(offsetsHaveToChange ? offsetCriteria : OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
List<PartitionGroupMetadata> newPartitionGroupMetadataList =
getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList);
streamConfig.setOffsetCriteria(originalOffsetCriteria);
return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, newPartitionGroupMetadataList,
recreateDeletedConsumingSegment, offsetCriteria);
} else {
LOGGER.info("Skipping LLC segments validation for table: {}, isTableEnabled: {}, isTablePaused: {}",
realtimeTableName, isTableEnabled, isTablePaused);
return idealState;
}
}, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f), true);
synchronized (_helixResourceManager.getIdealStateUpdaterLock(realtimeTableName)) {
HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> {
assert idealState != null;
boolean isTableEnabled = idealState.isEnabled();
boolean isTablePaused = isTablePaused(idealState);
boolean offsetsHaveToChange = offsetCriteria != null;
if (isTableEnabled && !isTablePaused) {
List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList =
offsetsHaveToChange
? Collections.emptyList() // offsets from metadata are not valid anymore; fetch for all partitions
: getPartitionGroupConsumptionStatusList(idealState, streamConfig);
OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria();
// Read the smallest offset when a new partition is detected
streamConfig.setOffsetCriteria(
offsetsHaveToChange ? offsetCriteria : OffsetCriteria.SMALLEST_OFFSET_CRITERIA);
List<PartitionGroupMetadata> newPartitionGroupMetadataList =
getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList);
streamConfig.setOffsetCriteria(originalOffsetCriteria);
return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, newPartitionGroupMetadataList,
recreateDeletedConsumingSegment, offsetCriteria);
} else {
LOGGER.info("Skipping LLC segments validation for table: {}, isTableEnabled: {}, isTablePaused: {}",
realtimeTableName, isTableEnabled, isTablePaused);
return idealState;
}
}, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f), true);
}
}

/**
Expand Down Expand Up @@ -995,7 +999,7 @@ private static PauseState extractTablePauseState(IdealState idealState) {
String pauseStateStr = idealState.getRecord().getSimpleField(PinotLLCRealtimeSegmentManager.PAUSE_STATE);
try {
if (pauseStateStr != null) {
return JsonUtils.stringToObject(pauseStateStr, PauseState.class);
return JsonUtils.stringToObject(pauseStateStr, PauseState.class);
}
} catch (JsonProcessingException e) {
LOGGER.warn("Unable to parse the pause state from ideal state : {}", pauseStateStr);
Expand Down Expand Up @@ -1503,9 +1507,9 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List<SegmentZKMe

// Use this retention value to avoid the data racing between segment upload and retention management.
long retentionMs = TimeUnit.valueOf(validationConfig.getRetentionTimeUnit().toUpperCase())
.toMillis(Long.parseLong(validationConfig.getRetentionTimeValue()));
.toMillis(Long.parseLong(validationConfig.getRetentionTimeValue()));
RetentionStrategy retentionStrategy = new TimeRetentionStrategy(TimeUnit.MILLISECONDS,
retentionMs - MIN_TIME_BEFORE_SEGMENT_EXPIRATION_FOR_FIXING_DEEP_STORE_COPY_MILLIS);
retentionMs - MIN_TIME_BEFORE_SEGMENT_EXPIRATION_FOR_FIXING_DEEP_STORE_COPY_MILLIS);

PinotFS pinotFS = PinotFSFactory.create(URIUtils.getUri(_controllerConf.getDataDir()).getScheme());

Expand Down Expand Up @@ -1725,7 +1729,7 @@ public PauseStatusDetails pauseConsumption(String tableNameWithType, PauseState.
sendForceCommitMessageToServers(tableNameWithType, consumingSegments);
return new PauseStatusDetails(true, consumingSegments, reasonCode, comment != null ? comment
: "Pause flag is set. Consuming segments are being committed."
+ " Use /pauseStatus endpoint in a few moments to check if all consuming segments have been committed.",
+ " Use /pauseStatus endpoint in a few moments to check if all consuming segments have been committed.",
new Timestamp(System.currentTimeMillis()).toString());
}

Expand Down Expand Up @@ -1756,13 +1760,16 @@ private IdealState updatePauseStateInIdealState(String tableNameWithType, boolea
PauseState.ReasonCode reasonCode, @Nullable String comment) {
PauseState pauseState = new PauseState(pause, reasonCode, comment,
new Timestamp(System.currentTimeMillis()).toString());
IdealState updatedIdealState = HelixHelper.updateIdealState(_helixManager, tableNameWithType, idealState -> {
ZNRecord znRecord = idealState.getRecord();
znRecord.setSimpleField(PAUSE_STATE, pauseState.toJsonString());
// maintain for backward compatibility
znRecord.setSimpleField(IS_TABLE_PAUSED, Boolean.valueOf(pause).toString());
return new IdealState(znRecord);
}, RetryPolicies.noDelayRetryPolicy(3));
IdealState updatedIdealState;
synchronized (_helixResourceManager.getIdealStateUpdaterLock(tableNameWithType)) {
updatedIdealState = HelixHelper.updateIdealState(_helixManager, tableNameWithType, idealState -> {
ZNRecord znRecord = idealState.getRecord();
znRecord.setSimpleField(PAUSE_STATE, pauseState.toJsonString());
// maintain for backward compatibility
znRecord.setSimpleField(IS_TABLE_PAUSED, Boolean.valueOf(pause).toString());
return new IdealState(znRecord);
}, RetryPolicies.noDelayRetryPolicy(3));
}
LOGGER.info("Set 'pauseState' to {} in the Ideal State for table {}. "
+ "Also set 'isTablePaused' to {} for backward compatibility.", pauseState, tableNameWithType, pause);
return updatedIdealState;
Expand Down

0 comments on commit f5327a5

Please sign in to comment.