Simplify hashing of intermediate output dirs
Signed-off-by: Partho Sarthi <[email protected]>
parthosa committed Dec 21, 2024
1 parent 634f033 commit 1ae1a76
Showing 3 changed files with 10 additions and 127 deletions.
@@ -18,9 +18,9 @@
 """
 import hashlib
 import os
-import re
 import socket
 import subprocess
+import time
 from dataclasses import dataclass, field
 from datetime import datetime, timedelta
 from typing import Callable, List, Tuple, ClassVar
@@ -39,9 +39,6 @@ class SparkJarRunner:
     remote_executor_output_path: str = field(init=True)
     jar_cmd_args: JarCmdArgs = field(init=True)
     env_vars: dict = field(init=True)  # Environment variables to be used in the Spark job
-    # Maximum length of the file name to be used as a directory name
-    # Reference: https://en.wikipedia.org/wiki/Comparison_of_file_systems#Limits
-    max_file_name_bytes: ClassVar[int] = 200
     default_file_name_sep: ClassVar[str] = '_'

     def get_env_var(self, key: str) -> str:
@@ -59,9 +56,9 @@ def construct_jar_cmd_map_func(self) -> Callable[[str], Tuple[List[str], AppStat
         Construct a function to be used as a map function for running JAR files.
         """
         def run_jar_map_func(file_path: str):
-            # Sanitize the file name and create the output directory for the map task
-            sanitized_file_name = self.sanitize_for_posix(os.path.basename(file_path))
-            executor_output_path = os.path.join(self.remote_executor_output_path, sanitized_file_name)
+            # Hash the file name and create the output directory for the map task
+            hashed_file_name = self.get_file_name_hash(os.path.basename(file_path))
+            executor_output_path = os.path.join(self.remote_executor_output_path, hashed_file_name)
             # Construct the JAR command and submit it as a map task
             jar_command = self._construct_jar_cmd(file_path, executor_output_path)
             # Execute the JAR command and capture the status, output, and execution time
@@ -131,47 +128,14 @@ def _submit_jar_cmd(jar_command: List[str]) -> Tuple[AppStatusResult, timedelta]
         return app_status, processing_time

     @classmethod
-    def sanitize_for_posix(cls, filename: str) -> str:
+    def get_file_name_hash(cls, filename: str) -> str:
         """
-        Sanitize a filename to make it safe for use as a directory name on POSIX systems.
-        This function removes or replaces characters that are not allowed or could cause issues in a directory name.
-        It converts spaces to underscores, removes invalid characters, and enforces a maximum length of 255 characters.
-        Examples:
-        >>> SparkJarRunner.sanitize_for_posix('My File Name.txt')
-        'My_File_Name.txt'
-        >>> SparkJarRunner.sanitize_for_posix('example/file/name')
-        'example_file_name'
-        >>> SparkJarRunner.sanitize_for_posix('file*name|with<>invalid:chars')
-        'filenamewithinvalidchars'
-        >>> SparkJarRunner.sanitize_for_posix('!@#$%^&*()')
-        'default_directory_name'
-        :param filename: The filename to sanitize.
-        :return: A sanitized version of the filename safe for use as a directory name.
+        Returns a hashed and timestamped version of the filename safe for use as a directory name.
+        Timestamp is used to ensure uniqueness in case there are multiple files with the same name.
         """
-        sanitized = filename.strip()  # Remove leading/trailing whitespace
-        sanitized = sanitized.replace(' ', cls.default_file_name_sep)  # Replace spaces with underscores
-        sanitized = sanitized.replace('.', cls.default_file_name_sep)  # Replace dots with underscores
-        sanitized = re.sub(r'[\/]', cls.default_file_name_sep, sanitized)  # Replace slashes with underscores
-        sanitized = re.sub(r'[^\w\-]', cls.default_file_name_sep, sanitized)  # Replace any non-word or dash characters with underscores
-
-        if len(sanitized) > cls.max_file_name_bytes:
-            # If the sanitized file name is too long, hash it to ensure it fits within the FileSystem limits
-            file_name_hash = str(hashlib.md5(sanitized.encode()).hexdigest())
-            # Calculate how much space is available for truncating the file name
-            remaining_space = cls.max_file_name_bytes - (len(file_name_hash) + 1)
-            if remaining_space > 0:
-                # Truncate the file name and append the hash
-                sanitized = f'{sanitized[:remaining_space]}_{file_name_hash}'
-            else:
-                sanitized = file_name_hash
-
-        return sanitized
+        hex_digest = hashlib.md5(filename.encode()).hexdigest()
+        timestamp_ns = time.time_ns()
+        return f'{hex_digest}_{timestamp_ns}'

     @staticmethod
     def _generate_log_lines(file_path: str, executor_output_path: str, jar_command: List[str],
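For context, a minimal standalone sketch of the new naming scheme (not part of this commit; file_name_hash below is a hypothetical stand-in for SparkJarRunner.get_file_name_hash). The MD5 digest gives each output directory a fixed-width, file-system-safe name regardless of the input, and the nanosecond timestamp keeps multiple files with the same name from landing in the same directory:

import hashlib
import time

def file_name_hash(filename: str) -> str:
    # Same recipe as the new get_file_name_hash: MD5 digest of the raw
    # file name, suffixed with a nanosecond timestamp for uniqueness.
    hex_digest = hashlib.md5(filename.encode()).hexdigest()
    return f'{hex_digest}_{time.time_ns()}'

# The same base name submitted twice yields an identical digest but,
# assuming the clock advances between calls, distinct timestamp suffixes.
print(file_name_hash('app.log'))
print(file_name_hash('app.log'))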
user_tools/tests/spark_rapids_tools_distributed/__init__.py (13 changes: 0 additions & 13 deletions)

This file was deleted.

This file was deleted.
