Skip to content

Commit

Permalink
Adjust the order of alert user
Browse files Browse the repository at this point in the history
  • Loading branch information
jefftlin committed Mar 21, 2023
1 parent 866a8c2 commit 01333fb
Showing 1 changed file with 16 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,9 @@ class TaskMonitorService extends Logging {
streamTasks.filter(shouldMonitor).foreach { streamTask =>
val job = streamJobMapper.getJobById(streamTask.getJobId)
if(!JobConf.SUPPORTED_MANAGEMENT_JOB_TYPES.getValue.contains(job.getJobType)) {
val userList = Sets.newHashSet(job.getSubmitUser, job.getCreateBy)
userList.addAll(getAlertUsers(job))
val userList = getAlertUsers(job)
val alertMsg = s"Spark Streaming应用[${job.getName}]已经超过 ${Utils.msDurationToString(System.currentTimeMillis - streamTask.getLastUpdateTime.getTime)} 没有更新状态, 请及时确认应用是否正常!"
alert(jobService.getAlertLevel(job), alertMsg, new util.ArrayList[String](userList), streamTask)
alert(jobService.getAlertLevel(job), alertMsg, userList, streamTask)
} else {
streamTask.setLastUpdateTime(new Date)
streamTaskMapper.updateTask(streamTask)
Expand All @@ -110,7 +109,6 @@ class TaskMonitorService extends Logging {
} else {
// 连续三次还是出现异常,说明Linkis的Manager已经不能正常提供服务,告警并不再尝试获取状态,等待下次尝试
val users = getAlertUsers(job)
users.add(job.getCreateBy)
alert(jobService.getAlertLevel(job), s"请求LinkisManager失败,Linkis集群出现异常,请关注!影响任务[${job.getName}]", users, streamTask)
}
}
Expand Down Expand Up @@ -138,9 +136,8 @@ class TaskMonitorService extends Logging {
}
case _ =>
}
val userList = Sets.newHashSet(job.getSubmitUser, job.getCreateBy)
userList.addAll(getAlertUsers(job))
alert(jobService.getAlertLevel(job), alertMsg, new util.ArrayList[String](userList), streamTask)
val userList = getAlertUsers(job)
alert(jobService.getAlertLevel(job), alertMsg, userList, streamTask)
}
}
}
Expand All @@ -159,12 +156,19 @@ class TaskMonitorService extends Logging {
}

protected def getAlertUsers(job: StreamJob): util.List[String] = {
var users = jobService.getAlertUsers(job)
if (users == null) {
users = new util.ArrayList[String]()
val allUsers = new util.LinkedHashSet[String]()
val alertUsers = jobService.getAlertUsers(job)
if (alertUsers!= null) {
alertUsers.foreach(user => {
allUsers.add(user)
})
}
users.addAll(util.Arrays.asList(JobConf.STREAMIS_DEVELOPER.getValue.split(","):_*))
users
allUsers.add(job.getSubmitUser)
allUsers.add(job.getCreateBy)
util.Arrays.asList(JobConf.STREAMIS_DEVELOPER.getValue.split(","):_*).foreach(user => {
allUsers.add(user)
})
new util.ArrayList[String](allUsers)
}

protected def alert(alertLevel: AlertLevel, alertMsg: String, users: util.List[String], streamTask:StreamTask): Unit = alerters.foreach{ alerter =>
Expand Down

0 comments on commit 01333fb

Please sign in to comment.