From f5327a512ab90b6137c88c2fbaf0c517c8ac50b6 Mon Sep 17 00:00:00 2001 From: Xiang Fu Date: Sat, 7 Sep 2024 08:48:32 -0700 Subject: [PATCH] Acquire IdealStateUpdaterLock for PinotLLCRealtimeSegmentManager IdealState update (#13947) --- .../helix/core/PinotHelixResourceManager.java | 88 +++++++------- .../PinotLLCRealtimeSegmentManager.java | 111 ++++++++++-------- 2 files changed, 104 insertions(+), 95 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index f3a4819c6f70..f727ea5dbcf9 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -1980,11 +1980,13 @@ public void setExistingTableConfig(TableConfig tableConfig, int expectedVersion) IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType); String replicationConfigured = Integer.toString(tableConfig.getReplication()); if (!idealState.getReplicas().equals(replicationConfigured)) { - HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, is -> { - assert is != null; - is.setReplicas(replicationConfigured); - return is; - }, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2f)); + synchronized (getIdealStateUpdaterLock(tableNameWithType)) { + HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, is -> { + assert is != null; + is.setReplicas(replicationConfigured); + return is; + }, RetryPolicies.exponentialBackoffRetryPolicy(5, 1000L, 1.2f)); + } } // Assign instances @@ -3967,48 +3969,48 @@ private boolean writeLineageEntryWithLock(TableConfig tableConfig, String lineag LineageEntry lineageEntryToUpdate, LineageEntry lineageEntryToMatch, ZkHelixPropertyStore propertyStore, LineageUpdateType lineageUpdateType, Map customMap) { String tableNameWithType = tableConfig.getTableName(); - synchronized (getLineageUpdaterLock(tableNameWithType)) { - // retry attempts are made to account for the distributed update from other controllers - for (int i = 0; i < DEFAULT_SEGMENT_LINEAGE_UPDATE_NUM_RETRY; i++) { - // Fetch the segment lineage - ZNRecord segmentLineageToUpdateZNRecord = - SegmentLineageAccessHelper.getSegmentLineageZNRecord(propertyStore, tableConfig.getTableName()); - int expectedVersion = segmentLineageToUpdateZNRecord.getVersion(); - SegmentLineage segmentLineageToUpdate = SegmentLineage.fromZNRecord(segmentLineageToUpdateZNRecord); - LineageEntry currentLineageEntry = segmentLineageToUpdate.getLineageEntry(lineageEntryId); - - // If the lineage entry doesn't match with the previously fetched lineage, we need to fail the request. - if (!currentLineageEntry.equals(lineageEntryToMatch)) { - String errorMsg = String.format( - "Aborting the to update lineage entry since we find that the entry has been modified for table %s, " - + "entry id: %s", tableConfig.getTableName(), lineageEntryId); - LOGGER.error(errorMsg); - throw new RuntimeException(errorMsg); - } + synchronized (getLineageUpdaterLock(tableNameWithType)) { + // retry attempts are made to account for the distributed update from other controllers + for (int i = 0; i < DEFAULT_SEGMENT_LINEAGE_UPDATE_NUM_RETRY; i++) { + // Fetch the segment lineage + ZNRecord segmentLineageToUpdateZNRecord = + SegmentLineageAccessHelper.getSegmentLineageZNRecord(propertyStore, tableConfig.getTableName()); + int expectedVersion = segmentLineageToUpdateZNRecord.getVersion(); + SegmentLineage segmentLineageToUpdate = SegmentLineage.fromZNRecord(segmentLineageToUpdateZNRecord); + LineageEntry currentLineageEntry = segmentLineageToUpdate.getLineageEntry(lineageEntryId); + + // If the lineage entry doesn't match with the previously fetched lineage, we need to fail the request. + if (!currentLineageEntry.equals(lineageEntryToMatch)) { + String errorMsg = String.format( + "Aborting the to update lineage entry since we find that the entry has been modified for table %s, " + + "entry id: %s", tableConfig.getTableName(), lineageEntryId); + LOGGER.error(errorMsg); + throw new RuntimeException(errorMsg); + } - // Update lineage entry - segmentLineageToUpdate.updateLineageEntry(lineageEntryId, lineageEntryToUpdate); - switch (lineageUpdateType) { - case END: - _lineageManager.updateLineageForEndReplaceSegments(tableConfig, lineageEntryId, customMap, - segmentLineageToUpdate); - break; - case REVERT: - _lineageManager.updateLineageForRevertReplaceSegments(tableConfig, lineageEntryId, customMap, - segmentLineageToUpdate); - break; - default: - String errorMsg = String.format("Aborting the lineage entry update with type: %s, as the allowed update" - + "types in this method are END and REVERT", lineageUpdateType); - throw new IllegalStateException(errorMsg); - } + // Update lineage entry + segmentLineageToUpdate.updateLineageEntry(lineageEntryId, lineageEntryToUpdate); + switch (lineageUpdateType) { + case END: + _lineageManager.updateLineageForEndReplaceSegments(tableConfig, lineageEntryId, customMap, + segmentLineageToUpdate); + break; + case REVERT: + _lineageManager.updateLineageForRevertReplaceSegments(tableConfig, lineageEntryId, customMap, + segmentLineageToUpdate); + break; + default: + String errorMsg = String.format("Aborting the lineage entry update with type: %s, as the allowed update" + + "types in this method are END and REVERT", lineageUpdateType); + throw new IllegalStateException(errorMsg); + } - // Write back to the lineage entry - if (SegmentLineageAccessHelper.writeSegmentLineage(propertyStore, segmentLineageToUpdate, expectedVersion)) { - return true; - } + // Write back to the lineage entry + if (SegmentLineageAccessHelper.writeSegmentLineage(propertyStore, segmentLineageToUpdate, expectedVersion)) { + return true; } } + } return false; } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 85c5cad0de1a..910291ff0062 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -816,23 +816,24 @@ public void segmentStoppedConsuming(LLCSegmentName llcSegmentName, String instan String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(llcSegmentName.getTableName()); String segmentName = llcSegmentName.getSegmentName(); LOGGER.info("Marking CONSUMING segment: {} OFFLINE on instance: {}", segmentName, instanceName); - - try { - HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> { - assert idealState != null; - Map stateMap = idealState.getInstanceStateMap(segmentName); - String state = stateMap.get(instanceName); - if (SegmentStateModel.CONSUMING.equals(state)) { - stateMap.put(instanceName, SegmentStateModel.OFFLINE); - } else { - LOGGER.info("Segment {} in state {} when trying to register consumption stop from {}", segmentName, state, - instanceName); - } - return idealState; - }, RetryPolicies.exponentialBackoffRetryPolicy(10, 500L, 1.2f), true); - } catch (Exception e) { - _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_ZOOKEEPER_UPDATE_FAILURES, 1L); - throw e; + synchronized (_helixResourceManager.getIdealStateUpdaterLock(realtimeTableName)) { + try { + HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> { + assert idealState != null; + Map stateMap = idealState.getInstanceStateMap(segmentName); + String state = stateMap.get(instanceName); + if (SegmentStateModel.CONSUMING.equals(state)) { + stateMap.put(instanceName, SegmentStateModel.OFFLINE); + } else { + LOGGER.info("Segment {} in state {} when trying to register consumption stop from {}", segmentName, state, + instanceName); + } + return idealState; + }, RetryPolicies.exponentialBackoffRetryPolicy(10, 500L, 1.2f), true); + } catch (Exception e) { + _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_ZOOKEEPER_UPDATE_FAILURES, 1L); + throw e; + } } // We know that we have successfully set the idealstate to be OFFLINE. // We can now do a best effort to reset the externalview to be OFFLINE if it is in ERROR state. @@ -924,30 +925,33 @@ public void ensureAllPartitionsConsuming(TableConfig tableConfig, StreamConfig s Preconditions.checkState(!_isStopping, "Segment manager is stopping"); String realtimeTableName = tableConfig.getTableName(); - HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> { - assert idealState != null; - boolean isTableEnabled = idealState.isEnabled(); - boolean isTablePaused = isTablePaused(idealState); - boolean offsetsHaveToChange = offsetCriteria != null; - if (isTableEnabled && !isTablePaused) { - List currentPartitionGroupConsumptionStatusList = - offsetsHaveToChange - ? Collections.emptyList() // offsets from metadata are not valid anymore; fetch for all partitions - : getPartitionGroupConsumptionStatusList(idealState, streamConfig); - OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria(); - // Read the smallest offset when a new partition is detected - streamConfig.setOffsetCriteria(offsetsHaveToChange ? offsetCriteria : OffsetCriteria.SMALLEST_OFFSET_CRITERIA); - List newPartitionGroupMetadataList = - getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList); - streamConfig.setOffsetCriteria(originalOffsetCriteria); - return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, newPartitionGroupMetadataList, - recreateDeletedConsumingSegment, offsetCriteria); - } else { - LOGGER.info("Skipping LLC segments validation for table: {}, isTableEnabled: {}, isTablePaused: {}", - realtimeTableName, isTableEnabled, isTablePaused); - return idealState; - } - }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f), true); + synchronized (_helixResourceManager.getIdealStateUpdaterLock(realtimeTableName)) { + HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> { + assert idealState != null; + boolean isTableEnabled = idealState.isEnabled(); + boolean isTablePaused = isTablePaused(idealState); + boolean offsetsHaveToChange = offsetCriteria != null; + if (isTableEnabled && !isTablePaused) { + List currentPartitionGroupConsumptionStatusList = + offsetsHaveToChange + ? Collections.emptyList() // offsets from metadata are not valid anymore; fetch for all partitions + : getPartitionGroupConsumptionStatusList(idealState, streamConfig); + OffsetCriteria originalOffsetCriteria = streamConfig.getOffsetCriteria(); + // Read the smallest offset when a new partition is detected + streamConfig.setOffsetCriteria( + offsetsHaveToChange ? offsetCriteria : OffsetCriteria.SMALLEST_OFFSET_CRITERIA); + List newPartitionGroupMetadataList = + getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList); + streamConfig.setOffsetCriteria(originalOffsetCriteria); + return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, newPartitionGroupMetadataList, + recreateDeletedConsumingSegment, offsetCriteria); + } else { + LOGGER.info("Skipping LLC segments validation for table: {}, isTableEnabled: {}, isTablePaused: {}", + realtimeTableName, isTableEnabled, isTablePaused); + return idealState; + } + }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f), true); + } } /** @@ -995,7 +999,7 @@ private static PauseState extractTablePauseState(IdealState idealState) { String pauseStateStr = idealState.getRecord().getSimpleField(PinotLLCRealtimeSegmentManager.PAUSE_STATE); try { if (pauseStateStr != null) { - return JsonUtils.stringToObject(pauseStateStr, PauseState.class); + return JsonUtils.stringToObject(pauseStateStr, PauseState.class); } } catch (JsonProcessingException e) { LOGGER.warn("Unable to parse the pause state from ideal state : {}", pauseStateStr); @@ -1503,9 +1507,9 @@ public void uploadToDeepStoreIfMissing(TableConfig tableConfig, List { - ZNRecord znRecord = idealState.getRecord(); - znRecord.setSimpleField(PAUSE_STATE, pauseState.toJsonString()); - // maintain for backward compatibility - znRecord.setSimpleField(IS_TABLE_PAUSED, Boolean.valueOf(pause).toString()); - return new IdealState(znRecord); - }, RetryPolicies.noDelayRetryPolicy(3)); + IdealState updatedIdealState; + synchronized (_helixResourceManager.getIdealStateUpdaterLock(tableNameWithType)) { + updatedIdealState = HelixHelper.updateIdealState(_helixManager, tableNameWithType, idealState -> { + ZNRecord znRecord = idealState.getRecord(); + znRecord.setSimpleField(PAUSE_STATE, pauseState.toJsonString()); + // maintain for backward compatibility + znRecord.setSimpleField(IS_TABLE_PAUSED, Boolean.valueOf(pause).toString()); + return new IdealState(znRecord); + }, RetryPolicies.noDelayRetryPolicy(3)); + } LOGGER.info("Set 'pauseState' to {} in the Ideal State for table {}. " + "Also set 'isTablePaused' to {} for backward compatibility.", pauseState, tableNameWithType, pause); return updatedIdealState;