Skip to content

Commit

Permalink
#290 Extraction and recovery of executor's traffic
Browse files Browse the repository at this point in the history
  • Loading branch information
heziai committed Nov 16, 2017
1 parent 8947e32 commit c6636e2
Show file tree
Hide file tree
Showing 14 changed files with 795 additions and 275 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ public final class ServerBriefInfo implements Serializable {

private String version;

private boolean noTraffic;

public ServerBriefInfo(String executorName) {
this.executorName = executorName;
}

public String getServerIp() {
return this.serverIp;
}
Expand Down Expand Up @@ -100,62 +106,11 @@ public void setVersion(String version) {
this.version = version;
}

public boolean equals(Object o) {
if (o == this)
return true;
if (!(o instanceof ServerBriefInfo))
return false;
ServerBriefInfo other = (ServerBriefInfo) o;
Object this$executorName = getExecutorName();
Object other$executorName = other.getExecutorName();
if (this$executorName == null ? other$executorName != null : !this$executorName.equals(other$executorName))
return false;
Object this$totalLoadLevel = getTotalLoadLevel();
Object other$totalLoadLevel = other.getTotalLoadLevel();
if (this$totalLoadLevel == null ? other$totalLoadLevel != null
: !this$totalLoadLevel.equals(other$totalLoadLevel))
return false;
Object this$sharding = getSharding();
Object other$sharding = other.getSharding();
if (this$sharding == null ? other$sharding != null : !this$sharding.equals(other$sharding))
return false;
Object this$hasSharding = getHasSharding();
Object other$hasSharding = other.getHasSharding();
if (this$hasSharding == null ? other$hasSharding != null : !this$hasSharding.equals(other$hasSharding))
return false;
Object this$lastBeginTime = getLastBeginTime();
Object other$lastBeginTime = other.getLastBeginTime();
if (this$lastBeginTime == null ? other$lastBeginTime != null : !this$lastBeginTime.equals(other$lastBeginTime))
return false;
Object this$status = getStatus();
Object other$status = other.getStatus();
if (this$status == null ? other$status != null : !this$status.equals(other$status))
return false;
Object this$version = getVersion();
Object other$version = other.getVersion();
return this$version == null ? other$version == null : this$version.equals(other$version);
}

public int hashCode() {
int PRIME = 59;
int result = 1;
Object $executorName = getExecutorName();
result = result * 59 + ($executorName == null ? 43 : $executorName.hashCode());
Object $totalLoadLevel = getTotalLoadLevel();
result = result * 59 + ($totalLoadLevel == null ? 43 : $totalLoadLevel.hashCode());
Object $sharding = getSharding();
result = result * 59 + ($sharding == null ? 43 : $sharding.hashCode());
Object $hasSharding = getHasSharding();
result = result * 59 + ($hasSharding == null ? 43 : $hasSharding.hashCode());
Object $lastBeginTime = getLastBeginTime();
result = result * 59 + ($lastBeginTime == null ? 43 : $lastBeginTime.hashCode());
Object $status = getStatus();
result = result * 59 + ($status == null ? 43 : $status.hashCode());
Object $version = getVersion();
return result * 59 + ($version == null ? 43 : $version.hashCode());
public boolean isNoTraffic() {
return noTraffic;
}

public ServerBriefInfo(String executorName) {
this.executorName = executorName;
public void setNoTraffic(boolean noTraffic) {
this.noTraffic = noTraffic;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.Map;

import com.vip.saturn.job.console.domain.ServerStatus;
import com.vip.saturn.job.console.exception.SaturnJobConsoleException;

public interface ServerDimensionService {

Expand All @@ -30,4 +31,6 @@ public interface ServerDimensionService {

boolean isReady(String jobName, String executor);

void traffic(String executorName, boolean extract) throws SaturnJobConsoleException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ public static String getExecutorTaskNodePath(final String executorName) {
return getExecutorNodePath(executorName, "task");
}

public static String getExecutorNoTrafficNodePath(final String executorName) {
return getExecutorNodePath(executorName, "noTraffic");
}

public static String getExecutorIpNodePath(final String executorName) {
return getExecutorNodePath(executorName, "ip");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@
import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;

import com.vip.saturn.job.console.domain.RequestResult;
import com.vip.saturn.job.console.exception.SaturnJobConsoleException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import com.vip.saturn.job.console.service.JobOperationService;
import com.vip.saturn.job.console.service.ServerDimensionService;

@RestController
Expand All @@ -37,12 +38,36 @@ public class ServerController extends AbstractController {
@Resource
private ServerDimensionService serverDimensionService;

@Resource
private JobOperationService jobOperationService;

@RequestMapping(value = "servers", method = RequestMethod.GET)
public Map<String, Object> getAllServersBriefInfo(final HttpServletRequest request) {
return serverDimensionService.getAllServersBriefInfo();
}

@RequestMapping(value = "traffic", method = RequestMethod.POST)
public RequestResult traffic(final HttpServletRequest request, String executorName, String operation) {
RequestResult requestResult = new RequestResult();
try {
if(executorName == null || executorName.trim().isEmpty()) {
throw new SaturnJobConsoleException("The parameter executorName cannot be null or empty");
}
boolean extract;
if("extract".equals(operation)) {
extract = true;
} else if("recover".equals(operation)) {
extract = false;
} else {
throw new SaturnJobConsoleException("The operation " + operation + " is not supported");
}
serverDimensionService.traffic(executorName, extract);
requestResult.setSuccess(true);
} catch (SaturnJobConsoleException e) {
requestResult.setSuccess(false);
requestResult.setMessage(e.getMessage());
} catch (Exception e) {
requestResult.setSuccess(false);
requestResult.setMessage(e.toString());
}
return requestResult;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public Map<String, Object> getAllServersBriefInfo() {
ServerBriefInfo sbf = new ServerBriefInfo(executor);
String ip = curatorFrameworkOp.getData(ExecutorNodePath.getExecutorNodePath(executor, "ip"));
sbf.setServerIp(ip);
sbf.setNoTraffic(curatorFrameworkOp.checkExists(ExecutorNodePath.getExecutorNodePath(executor, "noTraffic")));
String lastBeginTime = curatorFrameworkOp
.getData(ExecutorNodePath.getExecutorNodePath(sbf.getExecutorName(), "lastBeginTime"));
sbf.setLastBeginTime(
Expand Down Expand Up @@ -192,4 +193,18 @@ public boolean isReady(String jobName, String executor) {
return false;
}

@Override
public void traffic(String executorName, boolean extract) throws SaturnJobConsoleException {
CuratorRepository.CuratorFrameworkOp curatorFrameworkOp = curatorRepository.inSessionClient();
if (!curatorFrameworkOp.checkExists(ExecutorNodePath.getExecutorNodePath(executorName))) {
throw new SaturnJobConsoleException("The executorName(" + executorName + ") is not existing");
}
String executorNoTrafficNodePath = ExecutorNodePath.getExecutorNoTrafficNodePath(executorName);
if (extract) {
curatorFrameworkOp.create(executorNoTrafficNodePath);
} else {
curatorFrameworkOp.deleteRecursive(executorNoTrafficNodePath);
}
}

}
53 changes: 46 additions & 7 deletions saturn-console/src/main/resources/static/js/overview.js
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,25 @@ $(function() {
return false;
});
});

$("#traffic-executor-confirm-dialog").on("shown.bs.modal", function (event) {
var button = $(event.relatedTarget);
var executor = button.data('executor');
var operation = button.data('operation');
$("#traffic-executor-confirm-dialog-confirm-btn").unbind('click').click(function() {
var $btn = $(this).button('loading');
$.post("server/traffic", {executorName: executor, operation: operation, nns: regName}, function (data) {
$("#traffic-executor-confirm-dialog").modal("hide");
if(data.success) {
showSuccessDialogWithCallback(function(){location.reload(true);});
} else {
$("#executor-failure-dialog .fail-reason").text(data.message);
showFailureDialog("executor-failure-dialog");
}
}).always(function() { $btn.button('reset'); });
return false;
});
});

$("#remove-executor-confirm-dialog").on("shown.bs.modal", function (event) {
// 批量删除和单击删除公用一个confirm-dialog,如果取到relatedTarget为字符串,则为批量删除
Expand Down Expand Up @@ -1408,6 +1427,19 @@ $(function() {
$("#change-jobStatus-confirm-dialog").modal("show", obj);
}

function showTrafficConfirmDialog(obj) {
var executor = $(obj).data('executor');
var operation = $(obj).data('operation');
var confirmReason = null;
if(operation == "extract") {
confirmReason = "确认要 摘取 Executor(" + executor + ") 的流量吗?";
} else {
confirmReason = "确认要 恢复 Executor(" + executor + ") 的流量吗?";
}
$("#traffic-executor-confirm-dialog .confirm-reason").text(confirmReason);
$("#traffic-executor-confirm-dialog").modal("show", obj);
}

function showRemoveExecutorConfirmDialog(obj) {
var confirmReason = "确认要删除Executor:(" + $(obj).data('executor') + ")吗?";
$("#remove-executor-confirm-dialog .confirm-reason").text(confirmReason);
Expand Down Expand Up @@ -1928,30 +1960,37 @@ $(function() {
var loadLevels= [], exeNames = [], serverInfos = data["serverInfos"], lv = data["jobShardLoadLevels"];
if(serverInfos){
for (var i = 0;i < serverInfos.length;i++) {
var status = serverInfos[i].status,jobStatus = serverInfos[i].jobStatus,sharding = serverInfos[i].sharding, lastBeginTime = serverInfos[i].lastBeginTime,executorName = serverInfos[i].executorName,trClass = "",removeBtnClass = "",removeBtnTitle="";
loadLevels.push(serverInfos[i].totalLoadLevel),hasSharding = serverInfos[i].hasSharding;
var serverInfo = serverInfos[i];
var status = serverInfo.status,jobStatus = serverInfo.jobStatus,sharding = serverInfo.sharding, lastBeginTime = serverInfo.lastBeginTime,executorName = serverInfo.executorName,trClass = "",removeBtnClass = "",removeBtnTitle="";
loadLevels.push(serverInfo.totalLoadLevel),hasSharding = serverInfo.hasSharding, noTraffic = serverInfo.noTraffic;
if ("ONLINE" === status) {
trClass = "success";
onlines ++;
removeBtnClass = "disabled";
removeBtnTitle="无法删除ONLINE的Executor";
exeNames.push(serverInfos[i].executorName);
exeNames.push(serverInfo.executorName);
} else {
trClass = "warning";
offlines ++;
lastBeginTime = "";
removeBtnTitle="点击进行删除该Executor";
}
var trafficButton = "";
if(noTraffic == false) { // 可以摘取流量
trafficButton = "<button class='btn btn-warning' data-executor='" + executorName + "' data-operation='extract' onclick='showTrafficConfirmDialog(this);'" + ">流量摘取</button>";
} else {
trafficButton = "<button class='btn btn-info' data-executor='" + executorName + "' data-operation='recover' onclick='showTrafficConfirmDialog(this);'" + ">流量恢复</button>";
}
var removeButton = "<button operation='removeExecutor' title='"+removeBtnTitle+"' class='btn btn-danger "+removeBtnClass+"' data-executor='" + executorName + "' onclick='showRemoveExecutorConfirmDialog(this);' "+removeBtnClass+">删除</button>";
var baseTd = "<td><input class='batchDelExecutorInput' executorName='"+executorName+"' removeBtnClass='"+removeBtnClass+"' type='checkbox' onclick='clickBatchDelExecutorInputCheckBox(this);'/></td>"
+ "<td>" + executorName + "</td>"
+ "<td>" + serverInfos[i].serverIp + "</td>"
+ "<td>" + serverInfos[i].totalLoadLevel + "</td>"
+ "<td>" + serverInfo.serverIp + "</td>"
+ "<td>" + serverInfo.totalLoadLevel + "</td>"
+ "<td>" + sharding + "</td>"
+ "<td>" + status + "</td>"
+ "<td>" + lastBeginTime + "</td>"
+ "<td>" + serverInfos[i].version + "</td>"
+ "<td>" + removeButton + "</td>";
+ "<td>" + serverInfo.version + "</td>"
+ "<td>" + trafficButton + removeButton + "</td>";
$("#servers-overview-tbl tbody").append("<tr class='" + trClass + "'>" + baseTd + "</tr>");
}
}
Expand Down
1 change: 1 addition & 0 deletions saturn-console/src/main/resources/templates/overview.html
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,7 @@ <h4 class="modal-title" id="addJobTitle">添加作业</h4>
<div th:replace="fragments :: confirm-dialog(id='remove-job-confirm-dialog', text='确认删除作业吗?')"></div>
<div th:replace="fragments :: confirm-dialog(id='remove-job-batch-confirm-dialog', text='')"></div>
<div th:replace="fragments :: confirm-dialog(id='remove-executor-confirm-dialog', text='')"></div>
<div th:replace="fragments :: confirm-dialog(id='traffic-executor-confirm-dialog', text='')"></div>
<div th:replace="fragments :: confirm-dialog(id='shard-all-at-once-confirm-dialog', text='确认要进行一键重排吗?(即把当前域下所有分片按照作业负荷以及优先Executor等作业配置重新进行排序。注:重排可能导致分片分布剧烈动荡,请谨慎操作!另外在操作本功能时,请尽量避免同时操作Executor上下线或启用禁用作业,以免影响到重排的均衡效果!)')"></div>
<div th:replace="fragments :: prompt-dialog(id='batch-add-job-prompt-dialog', text='')"></div>
<div th:replace="fragments :: failure-dialog(id='executor-failure-dialog', reason='')"></div>
Expand Down
23 changes: 23 additions & 0 deletions saturn-it/src/test/java/com/vip/saturn/it/SaturnAutoBasic.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.vip.saturn.job.console.SaturnEnvProperties;
import com.vip.saturn.job.console.domain.RequestResult;
import com.vip.saturn.job.console.springboot.SaturnConsoleApp;
import com.vip.saturn.job.console.utils.ExecutorNodePath;
import com.vip.saturn.job.executor.Main;
import com.vip.saturn.job.executor.SaturnExecutor;
import com.vip.saturn.job.internal.config.ConfigurationNode;
Expand Down Expand Up @@ -349,6 +350,28 @@ public static void forceStopJob(String jobName) {
}
}

public static void runAtOnceAndWaitShardingCompleted(final JobConfiguration jobConfiguration) throws Exception {
runAtOnce(jobConfiguration.getJobName());
Thread.sleep(1000L);

waitForFinish(new FinishCheck() {

@Override
public boolean docheck() {
return !isNeedSharding(jobConfiguration);
}

}, 10);
}

public static void extractTraffic(String executorName) {
regCenter.persist(ExecutorNodePath.getExecutorNoTrafficNodePath(executorName), "");
}

public static void recoverTraffic(String executorName) {
regCenter.remove(ExecutorNodePath.getExecutorNoTrafficNodePath(executorName));
}

protected static void configJob(String jobName, String configPath, Object value) {
JobConfiguration jobConfiguration = new JobConfiguration(jobName);
JobNodeStorage jobNodeStorage = new JobNodeStorage(regCenter, jobConfiguration);
Expand Down
Loading

0 comments on commit c6636e2

Please sign in to comment.