Skip to content

Commit

Permalink
Merge pull request #67 from eecs485staff/input-file
Browse files Browse the repository at this point in the history
Support single input file
  • Loading branch information
awdeorio authored Nov 4, 2023
2 parents 2e813d2 + 42d973e commit a5ded12
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 17 deletions.
2 changes: 1 addition & 1 deletion madoop/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def main():
# Run MapReduce API
try:
mapreduce(
input_dir=args.input,
input_path=args.input,
output_dir=args.output,
map_exe=args.mapper,
reduce_exe=args.reducer,
Expand Down
40 changes: 29 additions & 11 deletions madoop/mapreduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
LOGGER = logging.getLogger("madoop")


def mapreduce(input_dir, output_dir, map_exe, reduce_exe):
def mapreduce(input_path, output_dir, map_exe, reduce_exe):
"""Madoop API."""
# Do not clobber existing output directory
output_dir = pathlib.Path(output_dir)
Expand Down Expand Up @@ -54,8 +54,8 @@ def mapreduce(input_dir, output_dir, map_exe, reduce_exe):
reduce_output_dir.mkdir()

# Copy and rename input files: part-00000, part-00001, etc.
input_dir = pathlib.Path(input_dir)
prepare_input_files(input_dir, map_input_dir)
input_path = pathlib.Path(input_path)
prepare_input_files(input_path, map_input_dir)

# Executables must be absolute paths
map_exe = pathlib.Path(map_exe).resolve()
Expand Down Expand Up @@ -98,25 +98,23 @@ def mapreduce(input_dir, output_dir, map_exe, reduce_exe):
LOGGER.info("Output directory: %s", output_dir)


def prepare_input_files(input_dir, output_dir):
def prepare_input_files(input_path, output_dir):
"""Copy and split input files. Rename to part-00000, part-00001, etc.
If a file in input_dir is smaller than MAX_INPUT_SPLIT_SIZE, then copy it
to output_dir. For larger files, split into blocks of MAX_INPUT_SPLIT_SIZE
bytes and write block to output_dir. Input files will never be combined.
The input_path can be a file or a directory of files. If a file is smaller
than MAX_INPUT_SPLIT_SIZE, then copy it to output_dir. For larger files,
split into blocks of MAX_INPUT_SPLIT_SIZE bytes and write block to
output_dir. Input files will never be combined.
The number of files created will be the number of mappers since we will
assume that the number of tasks per mapper is 1. Apache Hadoop has a
configurable number of tasks per mapper, however for both simplicity and
because our use case has smaller inputs we use 1.
"""
assert input_dir.is_dir(), f"Can't find input_dir '{input_dir}'"

# Split and copy input files
part_num = 0
total_size = 0
for inpath in sorted(input_dir.glob('*')):
for inpath in normalize_input_paths(input_path):
assert inpath.is_file()

# Compute output filenames
Expand Down Expand Up @@ -148,6 +146,26 @@ def prepare_input_files(input_dir, output_dir):
LOGGER.debug("total input size=%sB", total_size)


def normalize_input_paths(input_path):
    """Return a list of filtered input files.

    A single input file is used as-is.  For a directory, collect every
    regular file directly inside it (sorted by name); anything that is not
    a regular file (e.g. a subdirectory) is skipped with a warning.  Fails
    with AssertionError when no input files are found.
    """
    # A plain file is the entire input.
    if input_path.is_file():
        return [input_path]

    collected = []
    if input_path.is_dir():
        for entry in sorted(input_path.glob('*')):
            if not entry.is_file():
                # Subdirectories and other non-files are ignored, not fatal.
                LOGGER.warning("Ignoring non-file: %s", entry)
                continue
            collected.append(entry)
    assert collected, f"No input: {input_path}"
    return collected


def is_executable(exe):
"""Verify exe is executable and raise exception if it is not.
Expand Down
42 changes: 37 additions & 5 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def test_simple(tmpdir):
"""Run a simple MapReduce job and verify the output."""
with tmpdir.as_cwd():
madoop.mapreduce(
input_dir=TESTDATA_DIR/"word_count/input",
input_path=TESTDATA_DIR/"word_count/input",
output_dir="output",
map_exe=TESTDATA_DIR/"word_count/map.py",
reduce_exe=TESTDATA_DIR/"word_count/reduce.py",
Expand All @@ -24,7 +24,7 @@ def test_bash_executable(tmpdir):
"""Run a MapReduce job written in Bash."""
with tmpdir.as_cwd():
madoop.mapreduce(
input_dir=TESTDATA_DIR/"word_count/input",
input_path=TESTDATA_DIR/"word_count/input",
output_dir="output",
map_exe=TESTDATA_DIR/"word_count/map.sh",
reduce_exe=TESTDATA_DIR/"word_count/reduce.sh",
Expand All @@ -39,7 +39,7 @@ def test_bad_map_exe(tmpdir):
"""Map exe returns non-zero should produce an error message."""
with tmpdir.as_cwd(), pytest.raises(madoop.MadoopError):
madoop.mapreduce(
input_dir=TESTDATA_DIR/"word_count/input",
input_path=TESTDATA_DIR/"word_count/input",
output_dir="output",
map_exe=TESTDATA_DIR/"word_count/map_invalid.py",
reduce_exe=TESTDATA_DIR/"word_count/reduce.py",
Expand All @@ -50,7 +50,7 @@ def test_missing_shebang(tmpdir):
"""Reduce exe with a bad shebang should produce an error message."""
with tmpdir.as_cwd(), pytest.raises(madoop.MadoopError):
madoop.mapreduce(
input_dir=TESTDATA_DIR/"word_count/input",
input_path=TESTDATA_DIR/"word_count/input",
output_dir="output",
map_exe=TESTDATA_DIR/"word_count/map.py",
reduce_exe=TESTDATA_DIR/"word_count/reduce_invalid.py",
Expand All @@ -61,7 +61,39 @@ def test_empty_inputs(tmpdir):
"""Empty input files should not raise an error."""
with tmpdir.as_cwd():
madoop.mapreduce(
input_dir=TESTDATA_DIR/"word_count/input_empty",
input_path=TESTDATA_DIR/"word_count/input_empty",
output_dir="output",
map_exe=TESTDATA_DIR/"word_count/map.py",
reduce_exe=TESTDATA_DIR/"word_count/reduce.py",
)
utils.assert_dirs_eq(
TESTDATA_DIR/"word_count/correct/output",
tmpdir/"output",
)


def test_single_input_file(tmpdir):
    """Run a simple MapReduce job with an input file instead of dir."""
    word_count = TESTDATA_DIR/"word_count"
    with tmpdir.as_cwd():
        madoop.mapreduce(
            input_path=word_count/"input-single-file.txt",
            output_dir="output",
            map_exe=word_count/"map.py",
            reduce_exe=word_count/"reduce.py",
        )
    utils.assert_dirs_eq(
        word_count/"correct/output",
        tmpdir/"output",
    )


def test_ignores_subdirs(tmpdir):
"""Run a simple MapReduce job with an input directory containing a
subdirectory. The subdirectory should be gracefully ignored.
"""
with tmpdir.as_cwd():
madoop.mapreduce(
input_path=TESTDATA_DIR/"word_count/input_with_subdir",
output_dir="output",
map_exe=TESTDATA_DIR/"word_count/map.py",
reduce_exe=TESTDATA_DIR/"word_count/reduce.py",
Expand Down
4 changes: 4 additions & 0 deletions tests/testdata/word_count/input-single-file.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Hello World
Bye World
Hello Hadoop
Goodbye Hadoop
2 changes: 2 additions & 0 deletions tests/testdata/word_count/input_with_subdir/input01.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Hello World
Bye World
2 changes: 2 additions & 0 deletions tests/testdata/word_count/input_with_subdir/input02.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Hello Hadoop
Goodbye Hadoop
Empty file.

0 comments on commit a5ded12

Please sign in to comment.