diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/EventLogPathProcessor.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/EventLogPathProcessor.scala
index 02124450b..7eeffa5d9 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/EventLogPathProcessor.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/EventLogPathProcessor.scala
@@ -36,6 +36,8 @@ sealed trait EventLogInfo {
   def eventLog: Path
 }
 
+case class EventLogFileSystemInfo(timestamp: Long, size: Long)
+
 case class ApacheSparkEventLog(override val eventLog: Path) extends EventLogInfo
 
 case class DatabricksEventLog(override val eventLog: Path) extends EventLogInfo
@@ -137,7 +139,7 @@ object EventLogPathProcessor extends Logging {
   }
 
   def getEventLogInfo(pathString: String,
-      hadoopConf: Configuration): Map[EventLogInfo, Option[Long]] = {
+      hadoopConf: Configuration): Map[EventLogInfo, Option[EventLogFileSystemInfo]] = {
     val inputPath = new Path(pathString)
     try {
       // Note that some cloud storage APIs may throw FileNotFoundException when the pathPrefix
@@ -161,15 +163,19 @@ object EventLogPathProcessor extends Logging {
         logWarning(msg)
         // Return an empty map as this is a skip due to unsupported file type, not an exception.
         // Returning FailedEventLog would clutter the status report with unnecessary entries.
-        Map.empty[EventLogInfo, Option[Long]]
+        Map.empty[EventLogInfo, Option[EventLogFileSystemInfo]]
       } else if (fileStatus.isDirectory && isEventLogDir(fileStatus)) {
         // either event logDir v2 directory or regular event log
         val info = ApacheSparkEventLog(fileStatus.getPath).asInstanceOf[EventLogInfo]
-        Map(info -> Some(fileStatus.getModificationTime))
+        // TODO - need to handle size of files in directory, for now document it's not supported
+        Map(info ->
+          Some(EventLogFileSystemInfo(fileStatus.getModificationTime, fileStatus.getLen)))
       } else if (fileStatus.isDirectory && isDatabricksEventLogDir(fileStatus, fs)) {
         val dbinfo = DatabricksEventLog(fileStatus.getPath).asInstanceOf[EventLogInfo]
-        Map(dbinfo -> Some(fileStatus.getModificationTime))
+        // TODO - need to handle size of files in directory, for now document it's not supported
+        Map(dbinfo ->
+          Some(EventLogFileSystemInfo(fileStatus.getModificationTime, fileStatus.getLen)))
       } else {
         // assume either single event log or directory with event logs in it, we don't
         // support nested dirs, so if event log dir within another one we skip it
@@ -194,10 +200,10 @@ object EventLogPathProcessor extends Logging {
         logsSupported.map { s =>
           if (s.isFile || (s.isDirectory && isEventLogDir(s.getPath().getName()))) {
             (ApacheSparkEventLog(s.getPath).asInstanceOf[EventLogInfo]
-              -> Some(s.getModificationTime))
+              -> Some(EventLogFileSystemInfo(s.getModificationTime, s.getLen)))
           } else {
             (DatabricksEventLog(s.getPath).asInstanceOf[EventLogInfo]
-              -> Some(s.getModificationTime))
+              -> Some(EventLogFileSystemInfo(s.getModificationTime, s.getLen)))
           }
         }.toMap
       }
@@ -227,7 +233,8 @@ object EventLogPathProcessor extends Logging {
       filterNLogs: Option[String],
       matchlogs: Option[String],
       eventLogsPaths: List[String],
-      hadoopConf: Configuration): (Seq[EventLogInfo], Seq[EventLogInfo]) = {
+      hadoopConf: Configuration,
+      maxEventLogSize: Option[String] = None): (Seq[EventLogInfo], Seq[EventLogInfo]) = {
     val logsPathNoWildCards = processWildcardsLogs(eventLogsPaths, hadoopConf)
     val logsWithTimestamp = logsPathNoWildCards.flatMap {
       case (rawPath, processedPaths) if processedPaths.isEmpty =>
@@ -244,24 +251,44 @@ object EventLogPathProcessor extends Logging {
       logsWithTimestamp.filterKeys(_.eventLog.getName.contains(strMatch))
     }.getOrElse(logsWithTimestamp)
 
-    val filteredLogs = if (filterNLogs.nonEmpty && !filterByAppCriteria(filterNLogs)) {
-      val filteredInfo = filterNLogs.get.split("-")
-      val numberofEventLogs = filteredInfo(0).toInt
-      val criteria = filteredInfo(1)
-      // Before filtering based on user criteria, remove the failed event logs
-      // (i.e. logs without timestamp) from the list.
+    val filteredLogs = if ((filterNLogs.nonEmpty && !filterByAppCriteria(filterNLogs)) ||
+        maxEventLogSize.isDefined) {
       val validMatchedLogs = matchedLogs.collect {
         case (info, Some(ts)) => info -> ts
       }
-      val matched = if (criteria.equals("newest")) {
-        LinkedHashMap(validMatchedLogs.toSeq.sortWith(_._2 > _._2): _*)
-      } else if (criteria.equals("oldest")) {
-        LinkedHashMap(validMatchedLogs.toSeq.sortWith(_._2 < _._2): _*)
+      val filteredBySize = if (maxEventLogSize.isDefined) {
+        val maxSizeInBytes = if (StringUtils.isMemorySize(maxEventLogSize.get)) {
+          // if it is memory return the bytes unit
+          StringUtils.convertMemorySizeToBytes(maxEventLogSize.get)
+        } else {
+          // size is assumed to be mb
+          StringUtils.convertMemorySizeToBytes(maxEventLogSize.get + "m")
+        }
+        val (matched, filtered) = validMatchedLogs.partition(info => info._2.size <= maxSizeInBytes)
+        logInfo(s"Filtering eventlogs by size, max size is ${maxSizeInBytes}b. The logs filtered " +
+          s"out include: ${filtered.keys.map(_.eventLog.toString).mkString(",")}")
+        matched
+      } else {
+        validMatchedLogs
+      }
+      if (filterNLogs.nonEmpty && !filterByAppCriteria(filterNLogs)) {
+        val filteredInfo = filterNLogs.get.split("-")
+        val numberofEventLogs = filteredInfo(0).toInt
+        val criteria = filteredInfo(1)
+        // Before filtering based on user criteria, remove the failed event logs
+        // (i.e. logs without timestamp) from the list.
+        val matched = if (criteria.equals("newest")) {
+          LinkedHashMap(filteredBySize.toSeq.sortWith(_._2.timestamp > _._2.timestamp): _*)
+        } else if (criteria.equals("oldest")) {
+          LinkedHashMap(filteredBySize.toSeq.sortWith(_._2.timestamp < _._2.timestamp): _*)
+        } else {
+          logError("Criteria should be either newest-filesystem or oldest-filesystem")
+          Map.empty[EventLogInfo, Long]
+        }
+        matched.take(numberofEventLogs)
       } else {
-        logError("Criteria should be either newest-filesystem or oldest-filesystem")
-        Map.empty[EventLogInfo, Long]
+        filteredBySize
       }
-      matched.take(numberofEventLogs)
     } else {
       matchedLogs
     }
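The hunks above thread a file size through the existing timestamp plumbing: EventLogFileSystemInfo replaces the bare modification-time Long, and processAllPaths now drops any log whose size exceeds the parsed maxEventLogSize threshold before the newest/oldest criteria run. Below is a minimal, self-contained sketch of the intended unit handling; parseMaxSize is a hypothetical stand-in for StringUtils.isMemorySize/convertMemorySizeToBytes (whose internals are not part of this diff), and the bare-number fallback to megabytes mirrors the "+ m" branch above.

    object MaxEventLogSizeSketch {
      // Binary multipliers for the units the --max-event-log-size help text lists.
      private val units =
        Map('b' -> 1L, 'k' -> 1024L, 'm' -> 1024L * 1024, 'g' -> 1024L * 1024 * 1024)

      // Hypothetical stand-in for the StringUtils conversion: "500m" -> 524288000,
      // and a bare number such as "500" is assumed to be megabytes.
      def parseMaxSize(size: String): Long = {
        val s = size.trim.toLowerCase
        units.get(s.last) match {
          case Some(multiplier) => s.init.toLong * multiplier
          case None => s.toLong * units('m')
        }
      }

      def main(args: Array[String]): Unit = {
        // Fake (path -> size in bytes) entries standing in for the real event log map.
        val logs = Map("app-small" -> 200L * 1024 * 1024, "app-big" -> 3L * 1024 * 1024 * 1024)
        val maxBytes = parseMaxSize("500m")
        val (kept, dropped) = logs.partition { case (_, sizeBytes) => sizeBytes <= maxBytes }
        println(s"kept=${kept.keySet}, dropped=${dropped.keySet}")
        // prints: kept=Set(app-small), dropped=Set(app-big)
      }
    }

One caveat: FileStatus.getLen on a directory generally does not report the combined size of the files inside it, which is why the TODO comments above leave rolled event log directories unsupported for now.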
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala
index 34c028bde..c5c4d88ce 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala
@@ -88,6 +88,13 @@ Usage: java -cp rapids-4-spark-tools_2.12-.jar:$SPARK_HOME/jars/*
       descr = "Filter event logs whose application start occurred within the past specified " +
         "time period. Valid time periods are min(minute),h(hours),d(days),w(weeks)," +
         "m(months). If a period is not specified it defaults to days.")
+  val maxEventLogSize: ScallopOption[String] =
+    opt[String](required = false,
+      descr = "Process only application event logs whose size is less than or equal to the size " +
+        "specified. Valid units of size are b(bytes),k(kilobytes),m(megabytes),g(gigabytes). " +
+        "If no units are specified, the size is assumed to be m. Note: this does not support " +
+        "event log rolling, which puts multiple event logs for the same application into a " +
+        "single directory.")
   val matchEventLogs: ScallopOption[String] =
     opt[String](required = false,
       descr = "Filter event logs whose filenames contain the input string. Filesystem " +
diff --git a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationMain.scala b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationMain.scala
index ed4c304fc..24d9bac7f 100644
--- a/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationMain.scala
+++ b/core/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationMain.scala
@@ -50,6 +50,7 @@ object QualificationMain extends Logging {
 
     val eventlogPaths = appArgs.eventlog()
     val filterN = appArgs.filterCriteria
+    val maxEventLogSize = appArgs.maxEventLogSize.toOption
     val matchEventLogs = appArgs.matchEventLogs
     val outputDirectory = appArgs.outputDirectory().stripSuffix("/")
     val numOutputRows = appArgs.numOutputRows.getOrElse(1000)
@@ -86,7 +87,7 @@ object QualificationMain extends Logging {
     }
 
     val (eventLogFsFiltered, allEventLogs) = EventLogPathProcessor.processAllPaths(
-      filterN.toOption, matchEventLogs.toOption, eventlogPaths, hadoopConf)
+      filterN.toOption, matchEventLogs.toOption, eventlogPaths, hadoopConf, maxEventLogSize)
 
     val filteredLogs = if (argsContainsAppFilters(appArgs)) {
       val appFilter = new AppFilterImpl(numOutputRows, hadoopConf, timeout, nThreads)
@@ -120,8 +121,10 @@ object QualificationMain extends Logging {
 
   def argsContainsFSFilters(appArgs: QualificationArgs): Boolean = {
     val filterCriteria = appArgs.filterCriteria.toOption
+    val maxEventLogSize = appArgs.maxEventLogSize.toOption
     appArgs.matchEventLogs.isSupplied ||
-      (filterCriteria.isDefined && filterCriteria.get.endsWith("-filesystem"))
+      (filterCriteria.isDefined && filterCriteria.get.endsWith("-filesystem")) ||
+      maxEventLogSize.isDefined
   }
 
   def argsContainsAppFilters(appArgs: QualificationArgs): Boolean = {
diff --git a/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml b/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml
index 2048f92ef..dc71d36ef 100644
--- a/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml
+++ b/user_tools/src/spark_rapids_pytools/resources/qualification-conf.yaml
@@ -87,6 +87,7 @@ sparkRapids:
       - no-html-report
       - m
       - match-event-logs
+      - max-event-log-size
       - max-sql-desc-length
       - ml-functions
       - n
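For reference, a qualification run using the new filter might look like the following; the jar version is elided in the usage header above, and the <version> placeholder and event log path here are illustrative:

    java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/* \
      com.nvidia.spark.rapids.tool.qualification.QualificationMain \
      --max-event-log-size 500m /path/to/eventlogs

Because argsContainsFSFilters now treats a supplied --max-event-log-size as a filesystem-level filter, the size check is applied while the event log paths are enumerated, before any application-level filters run.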