Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add supporting APIs for evacuation #2618

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions helix-core/src/main/java/org/apache/helix/HelixAdmin.java
Original file line number Diff line number Diff line change
Expand Up @@ -738,4 +738,15 @@ Map<String, Boolean> validateResourcesForWagedRebalance(String clusterName,
*/
Map<String, Boolean> validateInstancesForWagedRebalance(String clusterName,
List<String> instancesNames);

/**
* Return if instance operation 'Evacuate' is finished.
* Only return true if there is no current state on the instance and that instance is still alive.
* @param clusterName
* @param instancesNames
* @return
*/
boolean isEvacuateFinished(String clusterName, String instancesNames);

boolean isPrepopulateReady(String clusterName, String instancesNames);
junkaixue marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixException;
import org.apache.helix.HelixProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyPathBuilder;
Expand Down Expand Up @@ -406,7 +407,41 @@ public ZNRecord update(ZNRecord currentData) {
}
}


@Override
public boolean isEvacuateFinished(String clusterName, String instanceName) {
HelixDataAccessor accessor =
new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient));
PropertyKey.Builder keyBuilder = accessor.keyBuilder();

// check the instance is alive
LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceName));
if (liveInstance == null) {
logger.warn("instance {} in cluster {} is not alive.", instanceName, clusterName);
return false;
junkaixue marked this conversation as resolved.
Show resolved Hide resolved
}

BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient);
String sessionId = liveInstance.getEphemeralOwner();

String path = PropertyPathBuilder.instanceCurrentState(clusterName, instanceName, sessionId);
List<String> currentStates = baseAccessor.getChildNames(path, 0);
if (currentStates == null) {
logger.warn("instance {} in cluster {} does not have live session.", instanceName,
clusterName);
return false;
}
return currentStates.isEmpty();
junkaixue marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public boolean isPrepopulateReady(String clusterName, String instanceName) {
junkaixue marked this conversation as resolved.
Show resolved Hide resolved
return this.getInstanceConfig(clusterName, instanceName)
.getInstanceOperation()
.equals(InstanceConstants.InstanceOperation.EVACUATE.name());
junkaixue marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public void enableResource(final String clusterName, final String resourceName,
final boolean enabled) {
logger.info("{} resource {} in cluster {}.", enabled ? "Enable" : "Disable", resourceName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixRollbackException;
import org.apache.helix.NotificationContext;
Expand All @@ -22,6 +23,7 @@
import org.apache.helix.controller.rebalancer.waged.AssignmentMetadataStore;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBucketDataAccessor;
import org.apache.helix.model.BuiltInStateModelDefinitions;
Expand Down Expand Up @@ -57,6 +59,8 @@ public class TestInstanceOperation extends ZkTestBase {
private ZkHelixClusterVerifier _clusterVerifier;
private ConfigAccessor _configAccessor;
private long _stateModelDelay = 3L;

private HelixAdmin _admin;
protected AssignmentMetadataStore _assignmentMetadataStore;
HelixDataAccessor _dataAccessor;

Expand Down Expand Up @@ -91,6 +95,8 @@ public void beforeClass() throws Exception {
createTestDBs(200);

setUpWagedBaseline();

_admin = new ZKHelixAdmin(_gZkClient);
}

@Test
Expand Down Expand Up @@ -119,6 +125,9 @@ public void testEvacuate() throws Exception {
Assert.assertFalse(newPAssignedParticipants.contains(instanceToEvacuate));
Assert.assertTrue(newPAssignedParticipants.containsAll(currentActiveInstances));
}

Assert.assertTrue(_admin.isEvacuateFinished(CLUSTER_NAME, instanceToEvacuate));
Assert.assertTrue(_admin.isPrepopulateReady(CLUSTER_NAME, instanceToEvacuate));
}

@Test(dependsOnMethods = "testEvacuate")
Expand Down Expand Up @@ -215,6 +224,7 @@ public void testEvacuateAndCancelBeforeBootstrapFinish() throws Exception {
TestHelper.verify(
() -> ((_dataAccessor.getChildNames(_dataAccessor.keyBuilder().messages(participant))).isEmpty()), 30000);
}
Assert.assertFalse(_admin.isEvacuateFinished(CLUSTER_NAME, instanceToEvacuate));

// sleep a bit so ST messages can start executing
Thread.sleep(Math.abs(_stateModelDelay / 100));
Expand Down Expand Up @@ -261,6 +271,7 @@ public void testEvacuateAndCancelBeforeDropFinish() throws Exception {
// message should be pending at the to evacuate participant
TestHelper.verify(
() -> ((_dataAccessor.getChildNames(_dataAccessor.keyBuilder().messages(instanceToEvacuate))).isEmpty()), 30000);
Assert.assertFalse(_admin.isEvacuateFinished(CLUSTER_NAME, instanceToEvacuate));

// cancel evacuation
_gSetupTool.getClusterManagementTool()
Expand Down Expand Up @@ -323,7 +334,7 @@ public void testMarkEvacuationAfterEMM() throws Exception {
Assert.assertFalse(newPAssignedParticipants.contains(instanceToEvacuate));
Assert.assertTrue(newPAssignedParticipants.containsAll(currentActiveInstances));
}

Assert.assertTrue(_admin.isPrepopulateReady(CLUSTER_NAME, instanceToEvacuate));
}

@Test(dependsOnMethods = "testMarkEvacuationAfterEMM")
Expand Down
10 changes: 10 additions & 0 deletions helix-core/src/test/java/org/apache/helix/mock/MockHelixAdmin.java
Original file line number Diff line number Diff line change
Expand Up @@ -550,4 +550,14 @@ public Map<String, Boolean> validateInstancesForWagedRebalance(String clusterNam
List<String> instancesNames) {
return null;
}

@Override
public boolean isEvacuateFinished(String clusterName, String instancesNames) {
return false;
}

@Override
public boolean isPrepopulateReady(String clusterName, String instancesNames) {
return false;
}
}