Skip to content

Commit

Permalink
删除集群中的服务,未删除对应服务的参数(t_ddh_cluster_variable,GlobalVariables) (#546)
Browse files Browse the repository at this point in the history
* [Bug] Doris] DDP1.2.2,使用doris官方包安装be失败 #539

* [Bug] [datasophon-api] 集群安装服务失败,删除服务重新安装,因为没有删除失败服务的全局参数(t_ddh_cluster_variable,GlobalVariables), 重新安装任然使用安装失败时配置的参数(Doirs2.0.7) #545
  • Loading branch information
hawk9821 authored May 11, 2024
1 parent 81ee1e6 commit 773c3a3
Show file tree
Hide file tree
Showing 38 changed files with 262 additions and 195 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE t_ddh_cluster_variable ADD service_name VARCHAR(128) DEFAULT '' COMMENT '服务名';
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,6 @@
"from": "script/status_be.sh",
"to": "be/bin/status_be.sh",
"md5": "cb2df48c72383f61613660da7e942ef7"
}, {
"type": "replace",
"source": "be/bin/start_be.sh",
"regex":"\/bin\/limit3",
"replacement": "/usr/bin/ulimit"
}],
"startRunner": {
"timeout": "600",
Expand Down Expand Up @@ -367,4 +362,4 @@
"defaultValue": ""
}
]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@

package com.datasophon.dao.entity;

import java.io.Serializable;

import lombok.Data;

import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

import java.io.Serializable;

@Data
@TableName("t_ddh_cluster_variable")
Expand All @@ -31,21 +30,26 @@ public class ClusterVariable implements Serializable {
private static final long serialVersionUID = 1L;

/**
*
*
*/
@TableId
private Integer id;
/**
*
*
*/
private Integer clusterId;
/**
*
*
*/
private String variableName;
/**
*
*
*/
private String variableValue;

/**
*
*/
private String serviceName;

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import com.baomidou.mybatisplus.extension.service.IService;
import com.datasophon.dao.entity.ClusterVariable;

import java.util.List;

/**
*
*
*
* @author gaodayu
* @email [email protected]
Expand All @@ -30,4 +32,6 @@
public interface ClusterVariableService extends IService<ClusterVariable> {

ClusterVariable getVariableByVariableName(String variableName, Integer clusterId);

List<ClusterVariable> getVariables(Integer clusterId, String serviceName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.datasophon.api.service.*;
import com.datasophon.common.Constants;
import com.datasophon.common.model.SimpleServiceConfig;
import com.datasophon.common.utils.CollectionUtils;
import com.datasophon.common.utils.PlaceholderUtils;
import com.datasophon.common.utils.Result;
import com.datasophon.dao.entity.*;
Expand All @@ -46,9 +47,9 @@
@Transactional
public class ClusterServiceInstanceServiceImpl
extends
ServiceImpl<ClusterServiceInstanceMapper, ClusterServiceInstanceEntity>
ServiceImpl<ClusterServiceInstanceMapper, ClusterServiceInstanceEntity>
implements
ClusterServiceInstanceService {
ClusterServiceInstanceService {

@Autowired
private ClusterServiceInstanceMapper serviceInstanceMapper;
Expand Down Expand Up @@ -77,6 +78,9 @@ public class ClusterServiceInstanceServiceImpl
@Autowired
private ClusterServiceRoleInstanceWebuisService webuisService;

@Autowired
private ClusterVariableService variableService;

@Override
public ClusterServiceInstanceEntity getServiceInstanceByClusterIdAndServiceName(Integer clusterId,
String serviceName) {
Expand Down Expand Up @@ -243,6 +247,15 @@ public Result delServiceInstance(Integer serviceInstanceId) {

// del service instance
this.removeById(serviceInstanceId);
// del variable
roleGroups.forEach(roleGroup -> {
List<ClusterVariable> variables = variableService.getVariables(roleGroup.getClusterId(), roleGroup.getServiceName());
if (CollectionUtils.isNotEmpty(variables)) {
Map<String, String> variablesMap = GlobalVariables.get(roleGroup.getClusterId());
variables.forEach(var -> variablesMap.remove(var.getVariableName()));
variableService.removeByIds(variables.stream().map(ClusterVariable::getId).collect(Collectors.toList()));
}
});
return Result.success();
}

Expand All @@ -256,6 +269,6 @@ public List<ClusterServiceInstanceEntity> listRunningServiceInstance(Integer clu
public boolean hasRunningRoleInstance(Integer serviceInstanceId) {
List<ClusterServiceRoleInstanceEntity> list =
roleInstanceService.getRunningServiceRoleInstanceListByServiceId(serviceInstanceId);
return !list.isEmpty();
return !list.isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.datasophon.api.service.impl;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.datasophon.api.service.ClusterVariableService;
Expand All @@ -31,7 +32,7 @@
@Service("clusterVariableService")
public class ClusterVariableServiceImpl extends ServiceImpl<ClusterVariableMapper, ClusterVariable>
implements
ClusterVariableService {
ClusterVariableService {

@Override
public ClusterVariable getVariableByVariableName(String variableName, Integer clusterId) {
Expand All @@ -42,4 +43,12 @@ public ClusterVariable getVariableByVariableName(String variableName, Integer cl
}
return null;
}

@Override
public List<ClusterVariable> getVariables(Integer clusterId, String serviceName) {
return this.list(new LambdaQueryWrapper<ClusterVariable>()
.eq(ClusterVariable::getClusterId, clusterId)
.eq(ClusterVariable::getServiceName, serviceName)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,14 @@ public Result getInstallStep(Integer type) {
*/
@Override
public Result analysisHostList(
Integer clusterId,
String hosts,
String sshUser,
Integer sshPort,
Integer page,
Integer pageSize) {
Integer clusterId,
String hosts,
String sshUser,
Integer sshPort,
Integer page,
Integer pageSize) {
Map<String, String> globalVariables = GlobalVariables.get(clusterId);
ProcessUtils.generateClusterVariable(globalVariables, clusterId, SSHUSER, sshUser);
ProcessUtils.generateClusterVariable(globalVariables, clusterId, null, SSHUSER, sshUser);

List<HostInfo> list = new ArrayList<>();
hosts = hosts.replace(" ", "");
Expand Down Expand Up @@ -199,7 +199,7 @@ private void tellHostCheck(String clusterCode, HostInfo hostInfo) {
}

public HostInfo createHostInfo(
String host, Integer sshPort, String sshUser, String clusterCode) {
String host, Integer sshPort, String sshUser, String clusterCode) {
HostInfo hostInfo = new HostInfo();

hostInfo.setHostname(HostUtils.getHostName(host));
Expand Down Expand Up @@ -248,7 +248,7 @@ public Result getHostCheckStatus(Integer clusterId, String sshUser, Integer sshP

@Override
public Result rehostCheck(
Integer clusterId, String hostnames, String sshUser, Integer sshPort) {
Integer clusterId, String hostnames, String sshUser, Integer sshPort) {
// 开启主机校验
ClusterInfoEntity clusterInfo = clusterInfoService.getById(clusterId);
String clusterCode = clusterInfo.getClusterCode();
Expand All @@ -271,7 +271,7 @@ public Result rehostCheck(

@Override
public Result dispatcherHostAgentList(
Integer clusterId, Integer installStateCode, Integer page, Integer pageSize) {
Integer clusterId, Integer installStateCode, Integer page, Integer pageSize) {

ClusterInfoEntity clusterInfo = clusterInfoService.getById(clusterId);
String clusterCode = clusterInfo.getClusterCode();
Expand Down Expand Up @@ -380,7 +380,7 @@ public Result hostCheckCompleted(Integer clusterId) {
HostInfo value = hostInfoEntry.getValue();
if (Objects.isNull(value.getCheckResult())
|| (Objects.nonNull(value.getCheckResult())
&& value.getCheckResult().getCode() != 10001)) {
&& value.getCheckResult().getCode() != 10001)) {
return Result.success().put("hostCheckCompleted", false);
}
}
Expand All @@ -389,7 +389,7 @@ public Result hostCheckCompleted(Integer clusterId) {

@Override
public Result cancelDispatcherHostAgent(
Integer clusterId, String hostname, Integer installStateCode) {
Integer clusterId, String hostname, Integer installStateCode) {

return null;
}
Expand Down Expand Up @@ -430,7 +430,7 @@ public Result generateHostAgentCommand(String clusterHostIds, String commandType
MinaUtils.execCmdWithResult(session, "service datasophon-worker " + commandType);
logger.info("hostAgent command:{}", "service datasophon-worker " + commandType);
if (ObjectUtil.isNotEmpty(session)) {
session.close();
session.close();
}
}
return Result.success();
Expand All @@ -456,7 +456,7 @@ public Result generateHostServiceCommand(String clusterHostIds, String commandTy
CommandType serviceCommandType = "start".equalsIgnoreCase(commandType) ? CommandType.START_SERVICE : CommandType.STOP_SERVICE;
for (ClusterHostDO clusterHostDO : clusterHostList) {
WorkerServiceMessage serviceMessage = new WorkerServiceMessage(
clusterHostDO.getHostname(), clusterHostDO.getClusterId(), serviceCommandType);
clusterHostDO.getHostname(), clusterHostDO.getClusterId(), serviceCommandType);
try {
ActorRef actor =
ActorUtils.getLocalActor(WorkerStartActor.class, "workerStartActor");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public Result saveServiceConfig(
ServiceRoleStrategy serviceRoleHandler =
ServiceRoleStrategyContext.getServiceRoleHandler(serviceName);
if (Objects.nonNull(serviceRoleHandler)) {
serviceRoleHandler.handlerConfig(clusterId, list);
serviceRoleHandler.handlerConfig(clusterId, list, ServiceRoleStrategyContext.getServiceName(serviceName));
}
// add variable
FrameServiceEntity frameServiceEntity =
Expand All @@ -170,7 +170,7 @@ public Result saveServiceConfig(
String variableValue = String.valueOf(serviceConfig.getValue());
// add to global variable
if (Constants.INPUT.equals(serviceConfig.getType())) {
addToGlobalVariable(clusterId, variableName, variableValue);
addToGlobalVariable(clusterId, serviceName, variableName, variableValue);
}
globalVariables.put(variableName, variableValue);
map.put(serviceConfig.getName(), serviceConfig);
Expand Down Expand Up @@ -264,8 +264,10 @@ public Result saveServiceRoleHostMapping(Integer clusterId, List<ServiceRoleHost
ServiceRoleStrategy serviceRoleHandler =
ServiceRoleStrategyContext.getServiceRoleHandler(
serviceRoleHostMapping.getServiceRole());
String serviceName = ServiceRoleStrategyContext.getServiceName(
serviceRoleHostMapping.getServiceRole());
if (Objects.nonNull(serviceRoleHandler)) {
serviceRoleHandler.handler(clusterId, serviceRoleHostMapping.getHosts());
serviceRoleHandler.handler(clusterId, serviceRoleHostMapping.getHosts(), serviceName);
}
}

Expand Down Expand Up @@ -627,17 +629,19 @@ private void buildConfigFileMap(
}
}

private void addToGlobalVariable(Integer clusterId, String variableName, String value) {
private void addToGlobalVariable(Integer clusterId, String serviceName, String variableName, String value) {
ClusterVariable clusterVariable =
variableService.getVariableByVariableName(variableName, clusterId);
if (Objects.nonNull(clusterVariable)) {
if (!value.equals(clusterVariable.getVariableValue())) {
clusterVariable.setServiceName(serviceName);
clusterVariable.setVariableValue(value);
variableService.updateById(clusterVariable);
}
} else {
clusterVariable = new ClusterVariable();
clusterVariable.setClusterId(clusterId);
clusterVariable.setServiceName(serviceName);
clusterVariable.setVariableName(variableName);
clusterVariable.setVariableValue(value);
variableService.save(clusterVariable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.datasophon.api.strategy;

import cn.hutool.http.HttpUtil;
import com.datasophon.api.utils.ProcessUtils;
import com.datasophon.common.model.ServiceConfig;
import com.datasophon.common.model.ServiceRoleInfo;
Expand All @@ -26,17 +27,15 @@
import java.util.List;
import java.util.Map;

import cn.hutool.http.HttpUtil;

public class AlertManagerHandlerStrategy implements ServiceRoleStrategy {

@Override
public void handler(Integer clusterId, List<String> hosts) {
public void handler(Integer clusterId, List<String> hosts, String serviceName) {

}

@Override
public void handlerConfig(Integer clusterId, List<ServiceConfig> list) {
public void handlerConfig(Integer clusterId, List<ServiceConfig> list, String serviceName) {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ public class BEHandlerStartegy implements ServiceRoleStrategy {
private static final Logger logger = LoggerFactory.getLogger(BEHandlerStartegy.class);

@Override
public void handler(Integer clusterId, List<String> hosts) {
public void handler(Integer clusterId, List<String> hosts, String serviceName) {

}

@Override
public void handlerConfig(Integer clusterId, List<ServiceConfig> list) {
public void handlerConfig(Integer clusterId, List<ServiceConfig> list, String serviceName) {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,18 @@
public class ElasticSearchHandlerStrategy implements ServiceRoleStrategy {

@Override
public void handler(Integer clusterId, List<String> hosts) {
public void handler(Integer clusterId, List<String> hosts, String serviceName) {
Map<String, String> globalVariables = GlobalVariables.get(clusterId);

ProcessUtils.generateClusterVariable(globalVariables, clusterId, "${initMasterNodes}", String.join(",", hosts));
ProcessUtils.generateClusterVariable(globalVariables, clusterId, serviceName,"${initMasterNodes}", String.join(",", hosts));
String join = String.join(":9300,", hosts);
String seedHosts = join + ":9300";
ProcessUtils.generateClusterVariable(globalVariables, clusterId, "${seedHosts}", seedHosts);
ProcessUtils.generateClusterVariable(globalVariables, clusterId, serviceName,"${seedHosts}", seedHosts);

}

@Override
public void handlerConfig(Integer clusterId, List<ServiceConfig> list) {
public void handlerConfig(Integer clusterId, List<ServiceConfig> list, String serviceName) {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,19 @@ public class FEHandlerStartegy implements ServiceRoleStrategy {
private static final Logger logger = LoggerFactory.getLogger(FEHandlerStartegy.class);

@Override
public void handler(Integer clusterId, List<String> hosts) {
public void handler(Integer clusterId, List<String> hosts, String serviceName) {
Map<String, String> globalVariables = GlobalVariables.get(clusterId);
// if feMaster is null, set the first host as feMaster
//Prevent FE Observer nodes from starting and FE Master nodes from changing
if (!globalVariables.containsKey("${feMaster}") || ObjUtil.isNull(globalVariables.get("${feMaster}"))) {
if (!hosts.isEmpty()) {
ProcessUtils.generateClusterVariable(globalVariables, clusterId, "${feMaster}", hosts.get(0));
ProcessUtils.generateClusterVariable(globalVariables, clusterId, serviceName,"${feMaster}", hosts.get(0));
}
}
}

@Override
public void handlerConfig(Integer clusterId, List<ServiceConfig> list) {
public void handlerConfig(Integer clusterId, List<ServiceConfig> list, String serviceName) {

}

Expand Down
Loading

0 comments on commit 773c3a3

Please sign in to comment.