Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: 优化超级恢复功能 #131

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.dfire.core.util.JobUtils;
import com.dfire.logs.ErrorLog;
import com.dfire.logs.MonitorLog;
import com.dfire.logs.ScheduleLog;
import com.dfire.protocol.JobExecuteKind;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
Expand Down Expand Up @@ -69,7 +70,7 @@ public class ScheduleOperatorController extends BaseHeraController {
private final Set<Long> cancelSet = new HashSet<>();
private final HeraJobMonitorService jobMonitorService;

public ScheduleOperatorController(HeraJobMonitorService jobMonitorService, HeraJobActionService heraJobActionService, @Qualifier("heraJobMemoryService") HeraJobService heraJobService, @Qualifier("heraGroupMemoryService") HeraGroupService heraGroupService, HeraJobHistoryService heraJobHistoryService, HeraUserService heraUserService, HeraPermissionService heraPermissionService, WorkClient workClient, HeraHostGroupService heraHostGroupService, HeraSsoService heraSsoService) {
public ScheduleOperatorController(HeraJobMonitorService jobMonitorService, HeraJobActionService heraJobActionService, @Qualifier("heraJobMemoryService") HeraJobService heraJobService, @Qualifier("heraGroupMemoryService") HeraGroupService heraGroupService, HeraJobHistoryService heraJobHistoryService, HeraUserService heraUserService, HeraPermissionService heraPermissionService, WorkClient workClient, HeraHostGroupService heraHostGroupService, HeraSsoService heraSsoService) {
this.heraJobActionService = heraJobActionService;
this.heraJobService = heraJobService;
this.heraGroupService = heraGroupService;
Expand All @@ -85,9 +86,9 @@ public ScheduleOperatorController(HeraJobMonitorService jobMonitorService, HeraJ
@PostMapping("/moveNodes")
@ResponseBody
@ApiOperation("任务批量移动接口")
public JsonResponse moveNodes(@ApiParam(value = "任务id集合,用,分割",required = true)String ids
,@ApiParam(value = "之前的所在组目录",required = true) String oldParent
,@ApiParam(value = "新的所在组目录",required = true) String newParent) {
public JsonResponse moveNodes(@ApiParam(value = "任务id集合,用,分割", required = true) String ids
, @ApiParam(value = "之前的所在组目录", required = true) String oldParent
, @ApiParam(value = "新的所在组目录", required = true) String newParent) {
if (ids != null) {
for (String id : ids.split(Constants.COMMA)) {
moveNode(id, newParent, oldParent);
Expand Down Expand Up @@ -148,18 +149,18 @@ public JsonResponse updatePermission(@RequestParam("id") @ApiParam(value = "任
uIdS.remove(perm.getUid());
}
}));
//经过第一轮的筛选后,如果还剩下,继续处理
// 经过第一轮的筛选后,如果还剩下,继续处理
if (uIdS.size() > 0) {
List<HeraPermission> perms = heraPermissionService.findByTargetId(newId, typeStr, 0);
//把以前设置为无效的、这次加入管理的重新设置为有效
// 把以前设置为无效的、这次加入管理的重新设置为有效
if (perms != null) {
perms.stream().filter(perm -> uIdS.contains(perm.getUid())).forEach(perm -> {
uIdS.remove(perm.getUid());
heraPermissionService.updateByUid(newId, typeStr, 1, perm.getUid());
});
}
if (uIdS.size() > 0) {
//余下的都是需要新增的
// 余下的都是需要新增的
Long targetId = Long.parseLong(String.valueOf(newId));
uIdS.forEach(uid -> heraPermissionService.insert(HeraPermission
.builder()
Expand Down Expand Up @@ -284,14 +285,14 @@ public JsonResponse updateSwitch(@ApiParam(value = "任务id", required = true)
if (status.equals(heraJob.getAuto())) {
return new JsonResponse(true, "操作成功");
}
//TODO 上下游任务检测时需要优化 任务链路复杂时 导致关闭/开启耗时较久
//关闭动作 上游关闭时需要判断下游是否有开启任务,如果有,则不允许关闭
// TODO 上下游任务检测时需要优化 任务链路复杂时 导致关闭/开启耗时较久
// 关闭动作 上游关闭时需要判断下游是否有开启任务,如果有,则不允许关闭
if (status != 1) {
String errorMsg;
if ((errorMsg = getJobFromAuto(heraJobService.findDownStreamJob(id), 1)) != null) {
return new JsonResponse(false, id + "下游存在开启状态任务:" + errorMsg);
}
} else { //开启动作 如果有上游任务,上游任务不能为关闭状态
} else { // 开启动作 如果有上游任务,上游任务不能为关闭状态
String errorMsg;
if ((errorMsg = getJobFromAuto(heraJobService.findUpStreamJob(id), 0)) != null) {
return new JsonResponse(false, id + "上游存在关闭状态任务:" + errorMsg);
Expand Down Expand Up @@ -400,6 +401,19 @@ public JsonResponse execute(@JsonSerialize(using = ToStringSerializer.class) @Ap
actionHistory.setStatisticEndTime(heraAction.getStatisticEndTime());
actionHistory.setHostGroupId(heraAction.getHostGroupId());
heraJobHistoryService.insert(actionHistory);
if (Objects.equals(triggerTypeEnum.getId(), TriggerTypeEnum.SUPER_RECOVER.getId())) {
    // Super recovery: mark the latest action of every downstream job as SUPER_RECOVER_ING,
    // so that an existing "success" status is not misjudged while the recovery is running.
    // Impact type 1 = downstream jobs.
    List<Integer> jobList = heraJobService.findJobImpact(heraAction.getJobId(), 1);
    ScheduleLog.info(" 任务版本:{} 的下游Job有:{}", heraAction.getId(), jobList);
    // Guard against a missing result instead of failing the whole execute request.
    if (jobList != null) {
        for (Integer downstreamJobId : jobList) {
            if (downstreamJobId == null) {
                continue;
            }
            HeraAction latestAction = heraJobActionService.findLatestByJobId(Long.valueOf(downstreamJobId));
            if (latestAction == null) {
                // The lookup returned nothing for this downstream job; there is no action to mark.
                continue;
            }
            latestAction.setStatus(StatusEnum.SUPER_RECOVER_ING.toString());
            heraJobActionService.update(latestAction);
        }
    }
}
heraAction.setScript(heraJob.getScript());
heraAction.setHistoryId(actionHistory.getId());
heraAction.setConfigs(configs);
Expand Down Expand Up @@ -439,7 +453,7 @@ public JsonResponse updateJobMessage(@ApiParam(value = "任务vo对象", require
return new JsonResponse(false, "至少选择一个任务所在区域");
}

//如果是依赖任务
// 如果是依赖任务
if (heraJobVo.getScheduleType() == 1) {
String dependencies = heraJobVo.getDependencies();
if (StringUtils.isNotBlank(dependencies)) {
Expand Down Expand Up @@ -525,23 +539,23 @@ public JsonResponse updateJobMessage(@ApiParam(value = "任务vo对象", require
return new JsonResponse(false, "更新失败");
}
doAsync(() -> {
//脚本更新
// 脚本更新
if (!newJob.getScript().equals(memJob.getScript())) {
addJobRecord(newJob.getId(), memJob.getScript(), RecordTypeEnum.SCRIPT, ssoName, ownerId);
}
//依赖任务更新
// 依赖任务更新
if (newJob.getDependencies() != null && !newJob.getDependencies().equals(memJob.getDependencies())) {
addJobRecord(newJob.getId(), memJob.getDependencies(), RecordTypeEnum.DEPEND, ssoName, ownerId);
}
//定时表达式更新
// 定时表达式更新
if (newJob.getCronExpression() != null && !newJob.getCronExpression().equals(memJob.getCronExpression())) {
addJobRecord(newJob.getId(), memJob.getCronExpression(), RecordTypeEnum.CRON, ssoName, ownerId);
}
//执行区域更新
// 执行区域更新
if (newJob.getAreaId() != null && !newJob.getAreaId().equals(memJob.getAreaId())) {
addJobRecord(newJob.getId(), memJob.getAreaId(), RecordTypeEnum.AREA, ssoName, ownerId);
}
//脚本配置项变化
// 脚本配置项变化
if (newJob.getConfigs() != null && !newJob.getConfigs().equals(memJob.getConfigs())) {
addJobRecord(newJob.getId(), memJob.getConfigs(), RecordTypeEnum.CONFIG, ssoName, ownerId);
}
Expand All @@ -554,8 +568,6 @@ public JsonResponse updateJobMessage(@ApiParam(value = "任务vo对象", require
}




@GetMapping(value = "previewJob")
@ResponseBody
@ApiOperation("预览任务接口")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,13 @@ public enum StatusEnum {
/**
* 失败状态
*/
FAILED("failed");
FAILED("failed"),

/**
* 超级恢复中
*/
SUPER_RECOVER_ING("s_recover_ing");


private String status;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,8 @@ public String toName() {
public Integer getId() {
    // Plain accessor for the id field.
    return this.id;
}

/**
 * Returns the value of the {@code name} field.
 *
 * @return the name held by this constant
 */
public String getName() {
    return this.name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ private void runDebugJob(MasterWorkHolder selectWork, Long debugId) {
* @param actionId actionId
*/
private void runManualJob(MasterWorkHolder selectWork, Long actionId, TriggerTypeEnum triggerType) {
SocketLog.info("start run manual job, actionId = {}", actionId);
SocketLog.info("start run manual job, actionId = {} , triggerType = {}", actionId, triggerType.getName());
HeraAction heraAction = masterContext.getHeraJobActionService().findById(actionId);
HeraJobHistory history = masterContext.getHeraJobHistoryService().findById(heraAction.getHistoryId());
HeraJobHistoryVo historyVo = BeanConvertUtils.convert(history);
Expand Down Expand Up @@ -199,7 +199,7 @@ private void runScheduleJob(MasterWorkHolder selectWork, Long actionId, TriggerT
* @param triggerType
*/
private void runScheduleJobContext(MasterWorkHolder selectWork, Long actionId, int runCount, int retryCount, int retryWaitTime, TriggerTypeEnum triggerType) {
DebugLog.info("重试次数:{},重试时间:{},actionId:{}", retryCount, retryWaitTime, actionId);
DebugLog.info("重试次数:{},重试时间:{},actionId:{},TriggerTypeEnum:{}", retryCount, retryWaitTime, actionId,triggerType);
runCount++;
boolean isCancelJob = false;
if (runCount > 1) {
Expand Down Expand Up @@ -337,7 +337,7 @@ private void updateCacheAction(Long actionId, String status) {
* @return
*/
public boolean isTaskLimit() {
    // The max parallel number may be changed at runtime by a config center such as apollo,
    // so the core size is refreshed every time the limit is checked.
    setCoreSize(HeraGlobalEnv.getMaxParallelNum());
    final boolean limitReached = executeJobPool.getActiveCount() >= HeraGlobalEnv.getMaxParallelNum();
    return limitReached;
}
Expand Down