diff --git a/streamis-jobmanager/streamis-job-manager/streamis-job-manager-service/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/manager/service/TaskMonitorService.scala b/streamis-jobmanager/streamis-job-manager/streamis-job-manager-service/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/manager/service/TaskMonitorService.scala index f6f250e6a..6a22728e4 100644 --- a/streamis-jobmanager/streamis-job-manager/streamis-job-manager-service/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/manager/service/TaskMonitorService.scala +++ b/streamis-jobmanager/streamis-job-manager/streamis-job-manager-service/src/main/scala/com/webank/wedatasphere/streamis/jobmanager/manager/service/TaskMonitorService.scala @@ -86,10 +86,9 @@ class TaskMonitorService extends Logging { streamTasks.filter(shouldMonitor).foreach { streamTask => val job = streamJobMapper.getJobById(streamTask.getJobId) if(!JobConf.SUPPORTED_MANAGEMENT_JOB_TYPES.getValue.contains(job.getJobType)) { - val userList = Sets.newHashSet(job.getSubmitUser, job.getCreateBy) - userList.addAll(getAlertUsers(job)) + val userList = getAlertUsers(job) val alertMsg = s"Spark Streaming应用[${job.getName}]已经超过 ${Utils.msDurationToString(System.currentTimeMillis - streamTask.getLastUpdateTime.getTime)} 没有更新状态, 请及时确认应用是否正常!" - alert(jobService.getAlertLevel(job), alertMsg, new util.ArrayList[String](userList), streamTask) + alert(jobService.getAlertLevel(job), alertMsg, userList, streamTask) } else { streamTask.setLastUpdateTime(new Date) streamTaskMapper.updateTask(streamTask) @@ -110,7 +109,6 @@ class TaskMonitorService extends Logging { } else { // 连续三次还是出现异常,说明Linkis的Manager已经不能正常提供服务,告警并不再尝试获取状态,等待下次尝试 val users = getAlertUsers(job) - users.add(job.getCreateBy) alert(jobService.getAlertLevel(job), s"请求LinkisManager失败,Linkis集群出现异常,请关注!影响任务[${job.getName}]", users, streamTask) } } @@ -138,9 +136,8 @@ class TaskMonitorService extends Logging { } case _ => } - val userList = Sets.newHashSet(job.getSubmitUser, job.getCreateBy) - userList.addAll(getAlertUsers(job)) - alert(jobService.getAlertLevel(job), alertMsg, new util.ArrayList[String](userList), streamTask) + val userList = getAlertUsers(job) + alert(jobService.getAlertLevel(job), alertMsg, userList, streamTask) } } } @@ -159,12 +156,19 @@ class TaskMonitorService extends Logging { } protected def getAlertUsers(job: StreamJob): util.List[String] = { - var users = jobService.getAlertUsers(job) - if (users == null) { - users = new util.ArrayList[String]() + val allUsers = new util.LinkedHashSet[String]() + val alertUsers = jobService.getAlertUsers(job) + if (alertUsers!= null) { + alertUsers.foreach(user => { + allUsers.add(user) + }) } - users.addAll(util.Arrays.asList(JobConf.STREAMIS_DEVELOPER.getValue.split(","):_*)) - users + allUsers.add(job.getSubmitUser) + allUsers.add(job.getCreateBy) + util.Arrays.asList(JobConf.STREAMIS_DEVELOPER.getValue.split(","):_*).foreach(user => { + allUsers.add(user) + }) + new util.ArrayList[String](allUsers) } protected def alert(alertLevel: AlertLevel, alertMsg: String, users: util.List[String], streamTask:StreamTask): Unit = alerters.foreach{ alerter =>