Skip to content

Commit

Permalink
Add support to persist all instance information collected by CloudIns…
Browse files Browse the repository at this point in the history
…tanceInformationProcessor in CloudInstanceInformation object. Add ability for CloudInstanceInformationProcessor to produce full DOMAIN field instead of appending _instanceName unless last character in CloudInstanceInformation.CloudInstanceField.FAULT_DOMAIN is '='.
  • Loading branch information
zpinto committed Sep 20, 2023
1 parent 0922e1f commit a205a46
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.apache.helix.api.cloud;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import com.google.common.collect.ImmutableMap;

public interface CloudInstanceInformationV2 extends CloudInstanceInformation {
/**
* Get all the key value pairs of a specific cloud instance
* @return A map of all the key value pairs
*/
ImmutableMap<String, String> getAll();
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.helix.PropertyKey;
import org.apache.helix.api.cloud.CloudInstanceInformation;
import org.apache.helix.api.cloud.CloudInstanceInformationProcessor;
import org.apache.helix.api.cloud.CloudInstanceInformationV2;
import org.apache.helix.messaging.DefaultMessagingService;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.HelixConfigScope;
Expand Down Expand Up @@ -208,25 +209,33 @@ private void joinCluster() {
InstanceConfig instanceConfig;
if (!ZKUtil.isInstanceSetup(_zkclient, _clusterName, _instanceName, _instanceType)) {
if (!autoJoin) {
throw new HelixException("Initial cluster structure is not set up for instance: "
+ _instanceName + ", instanceType: " + _instanceType);
throw new HelixException(
"Initial cluster structure is not set up for instance: " + _instanceName
+ ", instanceType: " + _instanceType);
}

InstanceConfig.Builder instanceConfigBuilder =
_helixManagerProperty.getDefaultInstanceConfigBuilder();
if (!autoRegistration) {
LOG.info(_instanceName + " is auto-joining cluster: " + _clusterName);
instanceConfig =
_helixManagerProperty.getDefaultInstanceConfigBuilder().build(_instanceName);
instanceConfig = instanceConfigBuilder.build(_instanceName);
} else {
LOG.info(_instanceName + " is auto-registering cluster: " + _clusterName);
CloudInstanceInformation cloudInstanceInformation = getCloudInstanceInformation();
String domain = cloudInstanceInformation.get(
CloudInstanceInformation.CloudInstanceField.FAULT_DOMAIN.name()) + _instanceName;
instanceConfig =
_helixManagerProperty.getDefaultInstanceConfigBuilder().build(_instanceName);
instanceConfig.setDomain(domain);
if (cloudInstanceInformation instanceof CloudInstanceInformationV2) {
CloudInstanceInformationV2 cloudInstanceInformationV2 =
(CloudInstanceInformationV2) cloudInstanceInformation;
cloudInstanceInformationV2.getAll().forEach(instanceConfigBuilder::addInstanceInfo);
}

String cloudInstanceInformationFaultDomain = cloudInstanceInformation.get(
CloudInstanceInformation.CloudInstanceField.FAULT_DOMAIN.name());
instanceConfig = instanceConfigBuilder.setDomain(
cloudInstanceInformationFaultDomain.endsWith("=") ? cloudInstanceInformationFaultDomain
+ _instanceName : cloudInstanceInformationFaultDomain).build(_instanceName);
}
instanceConfig
.validateTopologySettingInInstanceConfig(_configAccessor.getClusterConfig(_clusterName),
_instanceName);
instanceConfig.validateTopologySettingInInstanceConfig(
_configAccessor.getClusterConfig(_clusterName), _instanceName);
_helixAdmin.addInstance(_clusterName, instanceConfig);
} else {
_configAccessor.getInstanceConfig(_clusterName, _instanceName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public enum InstanceConfigProperty {
DOMAIN,
DELAY_REBALANCE_ENABLED,
MAX_CONCURRENT_TASK,
INSTANCE_INFO_MAP,
INSTANCE_CAPACITY_MAP,
TARGET_TASK_THREAD_POOL_SIZE,
INSTANCE_OPERATION
Expand Down Expand Up @@ -607,6 +608,29 @@ public void setTargetTaskThreadPoolSize(int targetTaskThreadPoolSize)
targetTaskThreadPoolSize);
}

/**
* Get the instance information map from the map fields.
* @return data map if it exists, or empty map
*/
public Map<String, String> getInstanceInfoMap() {
Map<String, String> instanceInfoMap =
_record.getMapField(InstanceConfigProperty.INSTANCE_INFO_MAP.name());
return instanceInfoMap != null ? instanceInfoMap : Collections.emptyMap();
}

/**
* Set instanceInfoMap to map of information about the instance that can be used
* to construct the DOMAIN field.
* @param instanceInfoMap Map of information about the instance. ie: { 'rack': 'rack-1', 'host': 'host-1' }
*/
private void setInstanceInfoMap(Map<String, String> instanceInfoMap) {
if (instanceInfoMap == null) {
_record.getMapFields().remove(InstanceConfigProperty.INSTANCE_INFO_MAP.name());
} else {
_record.setMapField(InstanceConfigProperty.INSTANCE_INFO_MAP.name(), instanceInfoMap);
}
}

/**
* Get the instance capacity information from the map fields.
* @return data map if it exists, or empty map
Expand Down Expand Up @@ -748,6 +772,7 @@ public static class Builder {
private int _weight = WEIGHT_NOT_SET;
private List<String> _tags = new ArrayList<>();
private boolean _instanceEnabled = HELIX_ENABLED_DEFAULT_VALUE;
private Map<String, String> _instanceInfoMap;
private Map<String, Integer> _instanceCapacityMap;

/**
Expand Down Expand Up @@ -794,6 +819,10 @@ public InstanceConfig build(String instanceId) {
instanceConfig.setInstanceEnabled(_instanceEnabled);
}

if (_instanceInfoMap != null) {
instanceConfig.setInstanceInfoMap(_instanceInfoMap);
}

if (_instanceCapacityMap != null) {
instanceConfig.setInstanceCapacityMap(_instanceCapacityMap);
}
Expand Down Expand Up @@ -861,6 +890,31 @@ public Builder setInstanceEnabled(boolean instanceEnabled) {
return this;
}

/**
* Set the INSTANCE_INFO_MAP for this instance
* @param instanceInfoMap the instance info map
* @return InstanceConfig.Builder
*/
public Builder setInstanceInfoMap(Map<String, String> instanceInfoMap) {
_instanceInfoMap = instanceInfoMap;
return this;
}

/**
* Add instance info to the INSTANCE_INFO_MAP.
* Only adds if the key does not already exist.
* @param key the key for the information
* @param value the value the information
* @return InstanceConfig.Builder
*/
public Builder addInstanceInfo(String key, String value) {
if (_instanceInfoMap == null) {
_instanceInfoMap = new HashMap<>();
}
_instanceInfoMap.putIfAbsent(key, value);
return this;
}

/**
* Set the capacity map for this instance
* @param instanceCapacityMap the capacity map
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,34 @@
* under the License.
*/

import java.util.Map;

import com.google.common.collect.ImmutableMap;
import org.apache.helix.api.cloud.CloudInstanceInformation;
import org.apache.helix.api.cloud.CloudInstanceInformationV2;

/**
* This is a custom implementation of CloudInstanceInformation. It is used to test the functionality
* of Helix node auto-registration.
*/
public class CustomCloudInstanceInformation implements CloudInstanceInformation {
private final String _faultDomain;
public class CustomCloudInstanceInformation implements CloudInstanceInformationV2 {

public static final Map<String, String> _cloudInstanceInfo =
ImmutableMap.of(CloudInstanceInformation.CloudInstanceField.FAULT_DOMAIN.name(),
"mz=0, host=localhost, container=containerId", "MAINTENANCE_ZONE", "0", "INSTANCE_NAME",
"localhost_something");
;

public CustomCloudInstanceInformation(String faultDomain) {
_faultDomain = faultDomain;
public CustomCloudInstanceInformation() {
}

@Override
public String get(String key) {
if (key.equals(CloudInstanceField.FAULT_DOMAIN.name())) {
return _faultDomain;
}
return null;
return _cloudInstanceInfo.get(key);
}

@Override
public Map<String, String> getAll() {
return _cloudInstanceInfo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
* It is used to test the functionality of Helix node auto-registration.
*/
public class CustomCloudInstanceInformationProcessor implements CloudInstanceInformationProcessor<String> {

public CustomCloudInstanceInformationProcessor(HelixCloudProperty helixCloudProperty) {
}

Expand All @@ -41,6 +42,6 @@ public List<String> fetchCloudInstanceInformation() {

@Override
public CloudInstanceInformation parseCloudInstanceInformation(List<String> responses) {
return new CustomCloudInstanceInformation("rack=A:123, host=");
return new CustomCloudInstanceInformation();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.apache.helix.HelixManagerProperty;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.api.cloud.CloudInstanceInformation;
import org.apache.helix.cloud.constants.CloudProvider;
import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
import org.apache.helix.integration.common.ZkStandAloneCMTestBase;
Expand Down Expand Up @@ -211,8 +212,11 @@ public void testAutoRegistrationCustomizedFullyQualifiedInfoProcessorPath() thro
// Check that live instance is added and instance config is populated with correct domain.
return null != manager.getHelixDataAccessor()
.getProperty(accessor.keyBuilder().liveInstance(instance5)) && manager.getConfigAccessor()
.getInstanceConfig(CLUSTER_NAME, instance5).getDomainAsString()
.equals("rack=A:123, host=" + instance5);
.getInstanceConfig(CLUSTER_NAME, instance5).getDomainAsString().equals(
CustomCloudInstanceInformation._cloudInstanceInfo.get(
CloudInstanceInformation.CloudInstanceField.FAULT_DOMAIN.name()))
&& manager.getConfigAccessor().getInstanceConfig(CLUSTER_NAME, instance5)
.getInstanceInfoMap().equals(CustomCloudInstanceInformation._cloudInstanceInfo);
}, 2000));

autoParticipant.syncStop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -173,11 +174,16 @@ public void testSetTargetTaskThreadPoolSizeIllegalArgument() {

@Test
public void testInstanceConfigBuilder() {

Map<String, String> instanceInfoMap = new HashMap<>();
instanceInfoMap.put("CAGE", "H");
Map<String, Integer> capacityDataMap = ImmutableMap.of("weight1", 1);
InstanceConfig instanceConfig =
new InstanceConfig.Builder().setHostName("testHost").setPort("1234").setDomain("foo=bar")
.setWeight(100).setInstanceEnabled(true).addTag("tag1").addTag("tag2")
.setInstanceEnabled(false).setInstanceCapacityMap(capacityDataMap).build("instance1");
.setInstanceEnabled(false).setInstanceInfoMap(instanceInfoMap)
.addInstanceInfo("CAGE", "G").addInstanceInfo("CABINET", "30")
.setInstanceCapacityMap(capacityDataMap).build("instance1");

Assert.assertEquals(instanceConfig.getId(), "instance1");
Assert.assertEquals(instanceConfig.getHostName(), "testHost");
Expand All @@ -187,6 +193,8 @@ public void testInstanceConfigBuilder() {
Assert.assertTrue(instanceConfig.getTags().contains("tag1"));
Assert.assertTrue(instanceConfig.getTags().contains("tag2"));
Assert.assertFalse(instanceConfig.getInstanceEnabled());
Assert.assertEquals(instanceConfig.getInstanceInfoMap().get("CAGE"), "H");
Assert.assertEquals(instanceConfig.getInstanceInfoMap().get("CABINET"), "30");
Assert.assertEquals(instanceConfig.getInstanceCapacityMap().get("weight1"), Integer.valueOf(1));
}
}

0 comments on commit a205a46

Please sign in to comment.