Fix endless creation of best possible nodes triggered by unintended modification of cached best possible map. (#2970)

The baseline and bestPossible maps are of type Map<String, ResourceAssignment>. When one of these maps is stored in the in-memory cache by persistBaseline() or persistBestPossibleAssignment(), the caller still holds references to the same ResourceAssignment objects and can modify them after the persist call, silently changing the cached assignment. This change creates a deep copy of the assignment map, with new ResourceAssignment objects, so that the cached assignment can no longer be modified after the call to persist.
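The underlying problem is plain reference aliasing: after persisting, the cache holds the very same mutable objects the caller keeps working with. Below is a minimal, self-contained sketch of that failure mode and of the defensive-copy idiom used in this fix; FakeAssignment is a hypothetical stand-in for ResourceAssignment/ZNRecord, not a real Helix class.

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class DefensiveCopyDemo {
  // Hypothetical mutable value type standing in for ResourceAssignment/ZNRecord.
  static class FakeAssignment {
    final Map<String, String> replicaMap = new HashMap<>();
    FakeAssignment() {
    }
    FakeAssignment(FakeAssignment other) {
      // Copy constructor, analogous to new ResourceAssignment(other.getRecord()).
      replicaMap.putAll(other.replicaMap);
    }
  }

  // Stands in for the in-memory cache behind getBestPossibleAssignment().
  static final Map<String, FakeAssignment> CACHE = new HashMap<>();

  // Buggy shape: the cache ends up aliasing the caller's FakeAssignment instances.
  static void persistAliased(Map<String, FakeAssignment> assignment) {
    CACHE.clear();
    CACHE.putAll(assignment);
  }

  // Fixed shape: deep-copy every value before caching, mirroring the Collectors.toMap(...) change.
  static void persistCopied(Map<String, FakeAssignment> assignment) {
    Map<String, FakeAssignment> copy = assignment.entrySet().stream()
        .collect(Collectors.toMap(Map.Entry::getKey, e -> new FakeAssignment(e.getValue())));
    CACHE.clear();
    CACHE.putAll(copy);
  }

  public static void main(String[] args) {
    Map<String, FakeAssignment> assignment = new HashMap<>();
    assignment.put("TestDB", new FakeAssignment());

    persistAliased(assignment);
    assignment.get("TestDB").replicaMap.put("TestDB_0", "localhost_0"); // caller keeps mutating
    System.out.println(CACHE.get("TestDB").replicaMap); // {TestDB_0=localhost_0} -- cache changed behind our back

    persistCopied(assignment);
    assignment.get("TestDB").replicaMap.put("TestDB_1", "localhost_1");
    System.out.println(CACHE.get("TestDB").replicaMap.containsKey("TestDB_1")); // false -- cache is isolated
  }
}

An aliased cache that silently tracks later mutations is consistent with the endless best-possible writes described in issue 2971, which the commit title and the new test below address.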
GrantPSpencer authored Jan 28, 2025
1 parent 892fc27 commit 7f2d30f
Showing 2 changed files with 109 additions and 4 deletions.
@@ -24,6 +24,7 @@
 import java.util.HashMap;
 import java.util.Map;
 
+import java.util.stream.Collectors;
 import org.apache.helix.BucketDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixProperty;
@@ -133,11 +134,15 @@ private void persistAssignmentToMetadataStore(Map<String, ResourceAssignment> ne
    * @param globalBaseline
    */
   public synchronized void persistBaseline(Map<String, ResourceAssignment> globalBaseline) {
+    // Create defensive copy so that the in-memory assignment is not modified after it is persisted
+    Map<String, ResourceAssignment> baselineCopy = globalBaseline.entrySet().stream().collect(
+        Collectors.toMap(Map.Entry::getKey,
+            entry -> new ResourceAssignment(entry.getValue().getRecord())));
     // write to metadata store
-    persistAssignmentToMetadataStore(globalBaseline, _baselinePath, BASELINE_KEY);
+    persistAssignmentToMetadataStore(baselineCopy, _baselinePath, BASELINE_KEY);
     // write to memory
     getBaseline().clear();
-    getBaseline().putAll(globalBaseline);
+    getBaseline().putAll(baselineCopy);
   }
 
   /**
@@ -146,11 +151,15 @@ public synchronized void persistBaseline(Map<String, ResourceAssignment> globalB
    * @param bestPossibleAssignment
    */
   public synchronized void persistBestPossibleAssignment(Map<String, ResourceAssignment> bestPossibleAssignment) {
+    // Create defensive copy so that the in-memory assignment is not modified after it is persisted
+    Map<String, ResourceAssignment> bestPossibleAssignmentCopy = bestPossibleAssignment.entrySet().stream().collect(
+        Collectors.toMap(Map.Entry::getKey,
+            entry -> new ResourceAssignment(entry.getValue().getRecord())));
     // write to metadata store
-    persistAssignmentToMetadataStore(bestPossibleAssignment, _bestPossiblePath, BEST_POSSIBLE_KEY);
+    persistAssignmentToMetadataStore(bestPossibleAssignmentCopy, _bestPossiblePath, BEST_POSSIBLE_KEY);
     // write to memory
     getBestPossibleAssignment().clear();
-    getBestPossibleAssignment().putAll(bestPossibleAssignment);
+    getBestPossibleAssignment().putAll(bestPossibleAssignmentCopy);
     _bestPossibleVersion++;
     _lastPersistedBestPossibleVersion = _bestPossibleVersion;
   }
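One note on why the fix copies each entry rather than just the map: a container-level copy such as new HashMap<>(bestPossibleAssignment) would still share the ResourceAssignment instances with the caller, so only constructing a new ResourceAssignment per entry actually isolates the cache. A small sketch of that distinction, again using the hypothetical FakeAssignment stand-in rather than the real Helix classes:

import java.util.HashMap;
import java.util.Map;

public class ShallowVsDeepCopy {
  // Hypothetical mutable value type, as in the earlier sketch.
  static class FakeAssignment {
    final Map<String, String> replicaMap = new HashMap<>();
    FakeAssignment() {
    }
    FakeAssignment(FakeAssignment other) {
      replicaMap.putAll(other.replicaMap);
    }
  }

  public static void main(String[] args) {
    Map<String, FakeAssignment> original = new HashMap<>();
    original.put("TestDB", new FakeAssignment());

    // Shallow copy: a new map, but the same FakeAssignment instance inside it.
    Map<String, FakeAssignment> shallow = new HashMap<>(original);
    // Deep copy: a new map holding a new FakeAssignment per entry.
    Map<String, FakeAssignment> deep = new HashMap<>();
    original.forEach((k, v) -> deep.put(k, new FakeAssignment(v)));

    original.get("TestDB").replicaMap.put("TestDB_0", "localhost_0");

    System.out.println(shallow.get("TestDB").replicaMap.size()); // 1 -- still aliased to the caller's object
    System.out.println(deep.get("TestDB").replicaMap.size());    // 0 -- unaffected by the later mutation
  }
}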
@@ -0,0 +1,96 @@
package org.apache.helix.integration;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.TestHelper;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;


public class TestEndlessBestPossibleNodes extends ZkTestBase {

  public static String CLUSTER_NAME = TestHelper.getTestClassName() + "_cluster";
  public static int PARTICIPANT_COUNT = 12;
  public static List<MockParticipantManager> _participants = new ArrayList<>();
  public static ClusterControllerManager _controller;
  public static ConfigAccessor _configAccessor;

  @BeforeClass
  public void beforeClass() {
    System.out.println("Start test " + TestHelper.getTestClassName());
    _gSetupTool.addCluster(CLUSTER_NAME, true);
    for (int i = 0; i < PARTICIPANT_COUNT; i++) {
      addParticipant("localhost_" + i);
    }

    String controllerName = CONTROLLER_PREFIX + "_0";
    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
    _controller.syncStart();

    _configAccessor = new ConfigAccessor(_gZkClient);
    ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
    clusterConfig.setInstanceCapacityKeys(Collections.singletonList("partcount"));
    clusterConfig.setDefaultInstanceCapacityMap(Collections.singletonMap("partcount", 10000));
    clusterConfig.setDefaultPartitionWeightMap(Collections.singletonMap("partcount", 1));
    clusterConfig.setDelayRebalaceEnabled(true);
    clusterConfig.setRebalanceDelayTime(57600000);
    clusterConfig.setPersistBestPossibleAssignment(true);
    _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
  }

  // This test was constructed to capture the bug described in issue 2971
  // https://github.com/apache/helix/issues/2971
  @Test
  public void testEndlessBestPossibleNodes() throws Exception {
    int numPartition = 10;
    BestPossibleExternalViewVerifier verifier =
        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR)
            .setWaitTillVerify(TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build();

    // Create 1 WAGED Resource
    String firstDB = "InPlaceMigrationTestDB3";
    _gSetupTool.addResourceToCluster(CLUSTER_NAME, firstDB, numPartition, "LeaderStandby",
        IdealState.RebalanceMode.FULL_AUTO.name(), null);
    IdealState idealStateOne =
        _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, firstDB);
    idealStateOne.setMinActiveReplicas(2);
    idealStateOne.setRebalancerClassName(WagedRebalancer.class.getName());
    _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, firstDB, idealStateOne);
    _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, firstDB, 3);
    Assert.assertTrue(verifier.verifyByPolling());

    // Disable instances so delay rebalance overwrite is required due to min active
    for (int i = 0; i < PARTICIPANT_COUNT/2; i++) {
      _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, _participants.get(i).getInstanceName(), false);
    }

    // Add new instance to cause partial rebalance to calculate a new best possible
    addParticipant("newInstance_0");

    // Sleep to let pipeline run and ZK writes occur
    Thread.sleep(5000);

    // There should only be 2 best possibles created (children will be 0, 1, LAST_WRITE, and LAST_SUCCESSFUL_WRITE)
    int childCount = _gZkClient.getChildren("/" + CLUSTER_NAME + "/ASSIGNMENT_METADATA/BEST_POSSIBLE").size();
    Assert.assertTrue(childCount > 0);
    Assert.assertTrue(childCount < 5, "Child count was " + childCount);
  }

  public MockParticipantManager addParticipant(String instanceName) {
    _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instanceName);
    MockParticipantManager participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
    participant.syncStart();
    _participants.add(participant);
    return participant;
  }
}
