Skip to content

Commit

Permalink
Qualification tool: Add option to filter event logs for a maximum file system size (#1275)
Browse files Browse the repository at this point in the history

* Add option to filter event logs for a maximum file system size

---------

Signed-off-by: Thomas Graves <[email protected]>
  • Loading branch information
tgravescs authored Aug 12, 2024
1 parent 32239bf commit 46f7cab
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ sealed trait EventLogInfo {
def eventLog: Path
}

case class EventLogFileSystemInfo(timestamp: Long, size: Long)

case class ApacheSparkEventLog(override val eventLog: Path) extends EventLogInfo
case class DatabricksEventLog(override val eventLog: Path) extends EventLogInfo

Expand Down Expand Up @@ -137,7 +139,7 @@ object EventLogPathProcessor extends Logging {
}

def getEventLogInfo(pathString: String,
hadoopConf: Configuration): Map[EventLogInfo, Option[Long]] = {
hadoopConf: Configuration): Map[EventLogInfo, Option[EventLogFileSystemInfo]] = {
val inputPath = new Path(pathString)
try {
// Note that some cloud storage APIs may throw FileNotFoundException when the pathPrefix
Expand All @@ -161,15 +163,19 @@ object EventLogPathProcessor extends Logging {
logWarning(msg)
// Return an empty map as this is a skip due to unsupported file type, not an exception.
// Returning FailedEventLog would clutter the status report with unnecessary entries.
Map.empty[EventLogInfo, Option[Long]]
Map.empty[EventLogInfo, Option[EventLogFileSystemInfo]]
} else if (fileStatus.isDirectory && isEventLogDir(fileStatus)) {
// either event logDir v2 directory or regular event log
val info = ApacheSparkEventLog(fileStatus.getPath).asInstanceOf[EventLogInfo]
Map(info -> Some(fileStatus.getModificationTime))
// TODO - need to handle size of files in directory, for now document its not supported
Map(info ->
Some(EventLogFileSystemInfo(fileStatus.getModificationTime, fileStatus.getLen)))
} else if (fileStatus.isDirectory &&
isDatabricksEventLogDir(fileStatus, fs)) {
val dbinfo = DatabricksEventLog(fileStatus.getPath).asInstanceOf[EventLogInfo]
Map(dbinfo -> Some(fileStatus.getModificationTime))
// TODO - need to handle size of files in directory, for now document its not supported
Map(dbinfo ->
Some(EventLogFileSystemInfo(fileStatus.getModificationTime, fileStatus.getLen)))
} else {
// assume either single event log or directory with event logs in it, we don't
// support nested dirs, so if event log dir within another one we skip it
Expand All @@ -194,10 +200,10 @@ object EventLogPathProcessor extends Logging {
logsSupported.map { s =>
if (s.isFile || (s.isDirectory && isEventLogDir(s.getPath().getName()))) {
(ApacheSparkEventLog(s.getPath).asInstanceOf[EventLogInfo]
-> Some(s.getModificationTime))
-> Some(EventLogFileSystemInfo(s.getModificationTime, s.getLen)))
} else {
(DatabricksEventLog(s.getPath).asInstanceOf[EventLogInfo]
-> Some(s.getModificationTime))
-> Some(EventLogFileSystemInfo(s.getModificationTime, s.getLen)))
}
}.toMap
}
Expand Down Expand Up @@ -227,7 +233,8 @@ object EventLogPathProcessor extends Logging {
filterNLogs: Option[String],
matchlogs: Option[String],
eventLogsPaths: List[String],
hadoopConf: Configuration): (Seq[EventLogInfo], Seq[EventLogInfo]) = {
hadoopConf: Configuration,
maxEventLogSize: Option[String] = None): (Seq[EventLogInfo], Seq[EventLogInfo]) = {
val logsPathNoWildCards = processWildcardsLogs(eventLogsPaths, hadoopConf)
val logsWithTimestamp = logsPathNoWildCards.flatMap {
case (rawPath, processedPaths) if processedPaths.isEmpty =>
Expand All @@ -244,24 +251,44 @@ object EventLogPathProcessor extends Logging {
logsWithTimestamp.filterKeys(_.eventLog.getName.contains(strMatch))
}.getOrElse(logsWithTimestamp)

val filteredLogs = if (filterNLogs.nonEmpty && !filterByAppCriteria(filterNLogs)) {
val filteredInfo = filterNLogs.get.split("-")
val numberofEventLogs = filteredInfo(0).toInt
val criteria = filteredInfo(1)
// Before filtering based on user criteria, remove the failed event logs
// (i.e. logs without timestamp) from the list.
val filteredLogs = if ((filterNLogs.nonEmpty && !filterByAppCriteria(filterNLogs)) ||
maxEventLogSize.isDefined) {
val validMatchedLogs = matchedLogs.collect {
case (info, Some(ts)) => info -> ts
}
val matched = if (criteria.equals("newest")) {
LinkedHashMap(validMatchedLogs.toSeq.sortWith(_._2 > _._2): _*)
} else if (criteria.equals("oldest")) {
LinkedHashMap(validMatchedLogs.toSeq.sortWith(_._2 < _._2): _*)
val filteredBySize = if (maxEventLogSize.isDefined) {
val maxSizeInBytes = if (StringUtils.isMemorySize(maxEventLogSize.get)) {
// if it is memory return the bytes unit
StringUtils.convertMemorySizeToBytes(maxEventLogSize.get)
} else {
// size is assumed to be mb
StringUtils.convertMemorySizeToBytes(maxEventLogSize.get + "m")
}
val (matched, filtered) = validMatchedLogs.partition(info => info._2.size <= maxSizeInBytes)
logInfo(s"Filtering eventlogs by size, max size is ${maxSizeInBytes}b. The logs filtered " +
s"out include: ${filtered.keys.map(_.eventLog.toString).mkString(",")}")
matched
} else {
validMatchedLogs
}
if (filterNLogs.nonEmpty && !filterByAppCriteria(filterNLogs)) {
val filteredInfo = filterNLogs.get.split("-")
val numberofEventLogs = filteredInfo(0).toInt
val criteria = filteredInfo(1)
// Before filtering based on user criteria, remove the failed event logs
// (i.e. logs without timestamp) from the list.
val matched = if (criteria.equals("newest")) {
LinkedHashMap(filteredBySize.toSeq.sortWith(_._2.timestamp > _._2.timestamp): _*)
} else if (criteria.equals("oldest")) {
LinkedHashMap(filteredBySize.toSeq.sortWith(_._2.timestamp < _._2.timestamp): _*)
} else {
logError("Criteria should be either newest-filesystem or oldest-filesystem")
Map.empty[EventLogInfo, Long]
}
matched.take(numberofEventLogs)
} else {
logError("Criteria should be either newest-filesystem or oldest-filesystem")
Map.empty[EventLogInfo, Long]
filteredBySize
}
matched.take(numberofEventLogs)
} else {
matchedLogs
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
descr = "Filter event logs whose application start occurred within the past specified " +
"time period. Valid time periods are min(minute),h(hours),d(days),w(weeks)," +
"m(months). If a period is not specified it defaults to days.")
val maxEventLogSize: ScallopOption[String] =
opt[String](required = false,
descr = "Process only application event logs whose size is less than or equal to the size " +
"specified. Valid units of size are b(bytes),k(kilobytes),m(megabytes),g(gigabytes). " +
"If no units are specified, the size is assumed to be m. Note, this does not support " +
"event log rolling which puts multiple event logs for the same application into a " +
"single directory.")
val matchEventLogs: ScallopOption[String] =
opt[String](required = false,
descr = "Filter event logs whose filenames contain the input string. Filesystem " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ object QualificationMain extends Logging {

val eventlogPaths = appArgs.eventlog()
val filterN = appArgs.filterCriteria
val maxEventLogSize = appArgs.maxEventLogSize.toOption
val matchEventLogs = appArgs.matchEventLogs
val outputDirectory = appArgs.outputDirectory().stripSuffix("/")
val numOutputRows = appArgs.numOutputRows.getOrElse(1000)
Expand Down Expand Up @@ -86,7 +87,7 @@ object QualificationMain extends Logging {
}

val (eventLogFsFiltered, allEventLogs) = EventLogPathProcessor.processAllPaths(
filterN.toOption, matchEventLogs.toOption, eventlogPaths, hadoopConf)
filterN.toOption, matchEventLogs.toOption, eventlogPaths, hadoopConf, maxEventLogSize)

val filteredLogs = if (argsContainsAppFilters(appArgs)) {
val appFilter = new AppFilterImpl(numOutputRows, hadoopConf, timeout, nThreads)
Expand Down Expand Up @@ -120,8 +121,10 @@ object QualificationMain extends Logging {

def argsContainsFSFilters(appArgs: QualificationArgs): Boolean = {
val filterCriteria = appArgs.filterCriteria.toOption
val maxEventLogSize = appArgs.maxEventLogSize.toOption
appArgs.matchEventLogs.isSupplied ||
(filterCriteria.isDefined && filterCriteria.get.endsWith("-filesystem"))
(filterCriteria.isDefined && filterCriteria.get.endsWith("-filesystem")) ||
maxEventLogSize.isDefined
}

def argsContainsAppFilters(appArgs: QualificationArgs): Boolean = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ sparkRapids:
- no-html-report
- m
- match-event-logs
- max-event-log-size
- max-sql-desc-length
- ml-functions
- n
Expand Down

0 comments on commit 46f7cab

Please sign in to comment.