Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RealmAware client change for rest cache - support recursive persist listener #2660

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
Expand Down Expand Up @@ -395,7 +396,8 @@ private Map<String, ResourceAssignment> handleDelayedRebalanceMinActiveReplica(
Map<String, ResourceAssignment> currentResourceAssignment,
RebalanceAlgorithm algorithm) throws HelixRebalanceException {
// the "real" live nodes at the time
final Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
// TODO: this is a hacky way to filter our on operation instance. We should consider redesign `getEnabledLiveInstances()`.
final Set<String> enabledLiveInstances = filterOutOnOperationInstances(clusterData.getInstanceConfigMap(), clusterData.getEnabledLiveInstances());
if (activeNodes.equals(enabledLiveInstances) || !requireRebalanceOverwrite(clusterData, currentResourceAssignment)) {
// no need for additional process, return the current resource assignment
return currentResourceAssignment;
Expand Down Expand Up @@ -424,6 +426,14 @@ private Map<String, ResourceAssignment> handleDelayedRebalanceMinActiveReplica(
}
}

private static Set<String> filterOutOnOperationInstances(Map<String, InstanceConfig> instanceConfigMap,
Set<String> nodes) {
return nodes.stream()
.filter(
instance -> !DelayedAutoRebalancer.INSTANCE_OPERATION_TO_EXCLUDE_FROM_ASSIGNMENT.contains(instanceConfigMap.get(instance).getInstanceOperation()))
.collect(Collectors.toSet());
}

/**
* Emergency rebalance is scheduled to quickly handle urgent cases like reassigning partitions from inactive nodes
* and addressing for partitions failing to meet minActiveReplicas.
Expand Down Expand Up @@ -608,7 +618,8 @@ protected boolean requireRebalanceOverwrite(ResourceControllerDataProvider clust
bestPossibleAssignment.values().parallelStream().forEach((resourceAssignment -> {
String resourceName = resourceAssignment.getResourceName();
IdealState currentIdealState = clusterData.getIdealState(resourceName);
Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
Set<String> enabledLiveInstances =
filterOutOnOperationInstances(clusterData.getInstanceConfigMap(), clusterData.getEnabledLiveInstances());
int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size());
int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig
.mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixRollbackException;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.TestHelper;
import org.apache.helix.common.ZkTestBase;
import org.apache.helix.constants.InstanceConstants;
Expand Down Expand Up @@ -89,9 +90,11 @@ public void beforeClass() throws Exception {

ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
clusterConfig.stateTransitionCancelEnabled(true);
clusterConfig.setDelayRebalaceEnabled(true);
clusterConfig.setRebalanceDelayTime(1800000L);
_configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);

createTestDBs(200);
createTestDBs(1800000L);

setUpWagedBaseline();

Expand Down Expand Up @@ -199,7 +202,7 @@ public void testAddingNodeWithEvacuationTag() throws Exception {
public void testEvacuateAndCancelBeforeBootstrapFinish() throws Exception {
// add a resource where downward state transition is slow
createResourceWithDelayedRebalance(CLUSTER_NAME, "TEST_DB3_DELAYED_CRUSHED", "MasterSlave", PARTITIONS, REPLICA,
REPLICA - 1, 200, CrushEdRebalanceStrategy.class.getName());
REPLICA - 1, 200000, CrushEdRebalanceStrategy.class.getName());
_allDBs.add("TEST_DB3_DELAYED_CRUSHED");
// add a resource where downward state transition is slow
createResourceWithWagedRebalance(CLUSTER_NAME, "TEST_DB4_DELAYED_WAGED", "MasterSlave",
Expand Down Expand Up @@ -338,21 +341,38 @@ public void testMarkEvacuationAfterEMM() throws Exception {

@Test(dependsOnMethods = "testMarkEvacuationAfterEMM")
public void testEvacuationWithOfflineInstancesInCluster() throws Exception {
_participants.get(1).syncStop();
_participants.get(2).syncStop();
_participants.get(3).syncStop();
// wait for converge, and set evacuate on instance 0
Assert.assertTrue(_clusterVerifier.verifyByPolling());

String evacuateInstanceName = _participants.get(0).getInstanceName();
String evacuateInstanceName = _participants.get(_participants.size()-2).getInstanceName();
_gSetupTool.getClusterManagementTool()
.setInstanceOperation(CLUSTER_NAME, evacuateInstanceName, InstanceConstants.InstanceOperation.EVACUATE);

Map<String, IdealState> assignment;
List<String> currentActiveInstances =
_participantNames.stream().filter(n -> (!n.equals(evacuateInstanceName) && !n.equals(_participants.get(3).getInstanceName()))).collect(Collectors.toList());
TestHelper.verify( ()-> {return verifyIS(evacuateInstanceName);}, TestHelper.WAIT_DURATION);
Map<String, ExternalView> assignment;
// EV should contain all participants, check resources one by one
assignment = getEV();
for (String resource : _allDBs) {
ExternalView ev = assignment.get(resource);
for (String partition : ev.getPartitionSet()) {
AtomicInteger activeReplicaCount = new AtomicInteger();
ev.getStateMap(partition)
.values()
.stream()
.filter(
v -> v.equals("MASTER") || v.equals("LEADER") || v.equals("SLAVE") || v.equals("FOLLOWER") || v.equals(
"STANDBY"))
.forEach(v -> activeReplicaCount.getAndIncrement());
Assert.assertTrue(activeReplicaCount.get() >= REPLICA-1);
Assert.assertFalse(ev.getStateMap(partition).containsKey(evacuateInstanceName) && ev.getStateMap(partition)
.get(evacuateInstanceName)
.equals("MASTER") && ev.getStateMap(partition)
.get(evacuateInstanceName)
.equals("LEADER"));

}
}

_participants.get(3).syncStart();
_participants.get(1).syncStart();
_participants.get(2).syncStart();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public class ServerContext implements IZkDataListener, IZkChildListener, IZkStat
// Create ZkBucketDataAccessor for ReadOnlyWagedRebalancer.
private volatile ZkBucketDataAccessor _zkBucketDataAccessor;


/**
* Multi-ZK support
*/
Expand Down Expand Up @@ -292,6 +293,7 @@ public ZkBucketDataAccessor getZkBucketDataAccessor() {
return _zkBucketDataAccessor;
}


public void close() {
if (_zkClient != null) {
_zkClient.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
import org.apache.helix.zookeeper.zkclient.IZkStateListener;
import org.apache.helix.zookeeper.zkclient.RecursivePersistListener;
import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
import org.apache.helix.zookeeper.zkclient.exception.ZkTimeoutException;
import org.apache.helix.zookeeper.zkclient.serialize.BasicZkSerializer;
Expand Down Expand Up @@ -164,6 +165,18 @@ void subscribeStateChanges(
void unsubscribeStateChanges(
org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener listener);

default void subscribePersistRecursiveListener(String path,
RecursivePersistListener recursivePersistListener) {
throw new UnsupportedOperationException(
"subscribePersistRecursiveListener() is not supported!");
}

default void unsubscribePersistRecursiveListener(String path,
RecursivePersistListener recursivePersistListener) {
throw new UnsupportedOperationException(
"subscribePersistRecursiveListener() is not supported!");
}

void unsubscribeAll();

// data access
Expand Down Expand Up @@ -564,6 +577,8 @@ class RealmAwareZkClientConfig {
protected String _monitorInstanceName = null;
protected boolean _monitorRootPathOnly = true;

protected boolean _usePersistWatcher = false;

public RealmAwareZkClientConfig setZkSerializer(PathBasedZkSerializer zkSerializer) {
this._zkSerializer = zkSerializer;
return this;
Expand Down Expand Up @@ -619,6 +634,11 @@ public RealmAwareZkClientConfig setConnectInitTimeout(long _connectInitTimeout)
return this;
}

public RealmAwareZkClientConfig setUsePersistWatcher(boolean usePersistWatcher) {
this._usePersistWatcher = usePersistWatcher;
return this;
}

public PathBasedZkSerializer getZkSerializer() {
if (_zkSerializer == null) {
_zkSerializer = new BasicZkSerializer(new SerializableSerializer());
Expand Down Expand Up @@ -650,6 +670,10 @@ public long getConnectInitTimeout() {
return _connectInitTimeout;
}

public boolean isUsePersistWatcher() {
return _usePersistWatcher;
}

/**
* Create HelixZkClient.ZkClientConfig based on RealmAwareZkClientConfig.
* @return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.helix.zookeeper.zkclient.IZkConnection;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
import org.apache.helix.zookeeper.zkclient.RecursivePersistListener;
import org.apache.helix.zookeeper.zkclient.ZkConnection;
import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
import org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener;
Expand Down Expand Up @@ -104,10 +105,11 @@ public DedicatedZkClient(RealmAwareZkClient.RealmAwareZkConnectionConfig connect
new ZkConnection(zkRealmAddress, connectionConfig.getSessionTimeout());

// Create a ZkClient
_rawZkClient = new ZkClient(zkConnection, (int) clientConfig.getConnectInitTimeout(),
clientConfig.getOperationRetryTimeout(), clientConfig.getZkSerializer(),
clientConfig.getMonitorType(), clientConfig.getMonitorKey(),
clientConfig.getMonitorInstanceName(), clientConfig.isMonitorRootPathOnly());
_rawZkClient =
new ZkClient(zkConnection, (int) clientConfig.getConnectInitTimeout(), clientConfig.getOperationRetryTimeout(),
clientConfig.getZkSerializer(), clientConfig.getMonitorType(), clientConfig.getMonitorKey(),
clientConfig.getMonitorInstanceName(), clientConfig.isMonitorRootPathOnly(), true,
clientConfig.isUsePersistWatcher());
}

@Override
Expand Down Expand Up @@ -156,6 +158,16 @@ public void unsubscribeStateChanges(IZkStateListener listener) {
_rawZkClient.unsubscribeStateChanges(listener);
}

@Override
public void subscribePersistRecursiveListener(String path, RecursivePersistListener recursivePersistListener) {
_rawZkClient.subscribePersistRecursiveListener(path, recursivePersistListener);
}

@Override
public void unsubscribePersistRecursiveListener(String path, RecursivePersistListener recursivePersistListener) {
_rawZkClient.unsubscribePersistRecursiveListener(path, recursivePersistListener);
}

@Override
public void unsubscribeAll() {
_rawZkClient.unsubscribeAll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
import org.apache.helix.zookeeper.zkclient.IZkStateListener;
import org.apache.helix.zookeeper.zkclient.RecursivePersistListener;
import org.apache.helix.zookeeper.zkclient.ZkConnection;
import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
import org.apache.helix.zookeeper.zkclient.serialize.BasicZkSerializer;
Expand Down Expand Up @@ -158,6 +159,16 @@ public void unsubscribeStateChanges(
throwUnsupportedOperationException();
}

@Override
public void subscribePersistRecursiveListener(String path, RecursivePersistListener recursivePersistListener) {
getZkClient(path).subscribePersistRecursiveListener(path, recursivePersistListener);
}

@Override
public void unsubscribePersistRecursiveListener(String path, RecursivePersistListener recursivePersistListener) {
getZkClient(path).unsubscribePersistRecursiveListener(path, recursivePersistListener);
}

@Override
public void unsubscribeAll() {
_zkRealmToZkClientMap.values().forEach(ZkClient::unsubscribeAll);
Expand Down Expand Up @@ -700,9 +711,9 @@ private String updateRoutingDataOnCacheMiss(String path) throws InvalidRoutingDa
private ZkClient createZkClient(String zkAddress) {
LOG.debug("Creating ZkClient for realm: {}.", zkAddress);
return new ZkClient(new ZkConnection(zkAddress), (int) _clientConfig.getConnectInitTimeout(),
_clientConfig.getOperationRetryTimeout(), _pathBasedZkSerializer,
_clientConfig.getMonitorType(), _clientConfig.getMonitorKey(),
_clientConfig.getMonitorInstanceName(), _clientConfig.isMonitorRootPathOnly());
_clientConfig.getOperationRetryTimeout(), _pathBasedZkSerializer, _clientConfig.getMonitorType(),
_clientConfig.getMonitorKey(), _clientConfig.getMonitorInstanceName(), _clientConfig.isMonitorRootPathOnly(),
true, _clientConfig.isUsePersistWatcher());
}

private void checkClosedState() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.exception.ZkClientException;
import org.apache.helix.zookeeper.zkclient.IZkConnection;
import org.apache.helix.zookeeper.zkclient.RecursivePersistListener;
import org.apache.helix.zookeeper.zkclient.ZkConnection;
import org.apache.helix.zookeeper.zkclient.serialize.BasicZkSerializer;
import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import java.util.concurrent.atomic.AtomicInteger;
import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
import org.apache.helix.msdcommon.datamodel.MetadataStoreRoutingData;
import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
Expand All @@ -39,7 +41,9 @@
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
import org.apache.helix.zookeeper.routing.RoutingDataManager;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.helix.zookeeper.zkclient.IZkStateListener;
import org.apache.helix.zookeeper.zkclient.RecursivePersistListener;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.testng.Assert;
Expand Down Expand Up @@ -648,6 +652,68 @@ public void testClose() {
}
}

@Test(dependsOnMethods = "testClose")
public void testFederatedZkClientWithPersistListener() throws InvalidRoutingDataException, InterruptedException {
RealmAwareZkClient realmAwareZkClient =
new FederatedZkClient(new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(),
new RealmAwareZkClient.RealmAwareZkClientConfig().setUsePersistWatcher(true));
int count = 100;
final AtomicInteger[] event_count = {new AtomicInteger(0)};
final AtomicInteger[] event_count2 = {new AtomicInteger(0)};
// for each iteration, we will edit a node, create a child, create a grand child, and
// delete child. Expect 4 event per iteration. -> total event should be count*4
CountDownLatch countDownLatch1 = new CountDownLatch(count * 4);
CountDownLatch countDownLatch2 = new CountDownLatch(count);
realmAwareZkClient.createPersistent(TEST_VALID_PATH, true);
String path = TEST_VALID_PATH + "/testFederatedZkClientWithPersistListener";
RecursivePersistListener rcListener = new RecursivePersistListener() {
@Override
public void handleZNodeChange(String dataPath, Watcher.Event.EventType eventType) throws Exception {
countDownLatch1.countDown();
event_count[0].incrementAndGet();
}
};
realmAwareZkClient.create(path, "datat", CreateMode.PERSISTENT);
realmAwareZkClient.subscribePersistRecursiveListener(path, rcListener);
for (int i = 0; i < count; ++i) {
realmAwareZkClient.writeData(path, "data7" + i, -1);
realmAwareZkClient.create(path + "/c1_" + i, "datat", CreateMode.PERSISTENT);
realmAwareZkClient.create(path + "/c1_" + i + "/c2", "datat", CreateMode.PERSISTENT);
realmAwareZkClient.delete(path + "/c1_" + i + "/c2");
}
Assert.assertTrue(countDownLatch1.await(50000000, TimeUnit.MILLISECONDS));

// subscribe a persist child watch, it should throw exception
IZkChildListener childListener2 = new IZkChildListener() {
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
countDownLatch2.countDown();
event_count2[0].incrementAndGet();
}
};
try {
realmAwareZkClient.subscribeChildChanges(path, childListener2, false);
} catch (Exception ex) {
Assert.assertEquals(ex.getClass().getName(), "java.lang.UnsupportedOperationException");
}

// unsubscribe recursive persist watcher, and subscribe persist watcher should success.
realmAwareZkClient.unsubscribePersistRecursiveListener(path, rcListener);
realmAwareZkClient.subscribeChildChanges(path, childListener2, false);
// we should only get 100 event since only 100 direct child change.
for (int i = 0; i < count; ++i) {
realmAwareZkClient.writeData(path, "data7" + i, -1);
realmAwareZkClient.create(path + "/c2_" + i, "datat", CreateMode.PERSISTENT);
realmAwareZkClient.create(path + "/c2_" + i + "/c3", "datat", CreateMode.PERSISTENT);
realmAwareZkClient.delete(path + "/c2_" + i + "/c3");
}
Assert.assertTrue(countDownLatch2.await(50000000, TimeUnit.MILLISECONDS));

realmAwareZkClient.deleteRecursively(TEST_VALID_PATH);
realmAwareZkClient.close();
}


@Override
public void testMultiSetup() throws InvalidRoutingDataException {
super.testMultiSetup();
Expand Down