
Merge pull request #70 from eecs485staff/performance-optimizations
Optimizations
awdeorio authored Nov 6, 2024
2 parents 5563c74 + 6da2dd6 commit 5bea37d
Showing 8 changed files with 216 additions and 99 deletions.
3 changes: 3 additions & 0 deletions .coveragerc
@@ -0,0 +1,3 @@
[run]
concurrency = thread,multiprocessing
parallel = true
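
The new .coveragerc tells Coverage.py to trace code running in threads and in multiprocessing worker processes, and parallel = true makes each process write its own .coverage.* data file so concurrent runs don't clobber each other. Those per-process files must be merged before reporting; pytest-cov does this automatically, and the sketch below shows the equivalent steps through Coverage.py's Python API (an assumed workflow, not part of this commit).

# Minimal sketch: merge per-process coverage data files and report on them.
# Assumes a test run has already produced .coverage.* files in the cwd.
import coverage

cov = coverage.Coverage()  # picks up .coveragerc from the working directory
cov.combine()              # merge the per-process .coverage.* data files
cov.report()               # print the combined coverage report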
2 changes: 2 additions & 0 deletions .gitignore
@@ -24,6 +24,8 @@ build/
/.tox/
/.coverage*
*,cover
# Make an exception for .coveragerc, which should be checked into version control.
!.coveragerc

# Text editors and IDEs
*~
1 change: 1 addition & 0 deletions MANIFEST.in
@@ -9,6 +9,7 @@ graft madoop/example

# Avoid dev and binary files
exclude tox.ini
exclude .coveragerc
exclude .editorconfig
global-exclude *.pyc
global-exclude __pycache__
218 changes: 122 additions & 96 deletions madoop/mapreduce.py
@@ -7,19 +7,17 @@
import collections
import hashlib
import logging
import math
import pathlib
import shutil
import subprocess
import tempfile
import multiprocessing
import concurrent.futures
from .exceptions import MadoopError


# Large input files are automatically split
MAX_INPUT_SPLIT_SIZE = 2**21 # 2 MB

# The number of reducers is dynamically determined by the number of unique keys
# but will not be more than num_reducers
MAX_INPUT_SPLIT_SIZE = 10 * 1024 * 1024 # 10 MB

# Madoop logger
LOGGER = logging.getLogger("madoop")
@@ -52,18 +50,15 @@ def mapreduce(
LOGGER.debug("tmpdir=%s", tmpdir)

# Create stage input and output directory
map_input_dir = tmpdir/'input'
map_output_dir = tmpdir/'mapper-output'
reduce_input_dir = tmpdir/'reducer-input'
reduce_output_dir = tmpdir/'output'
map_input_dir.mkdir()
map_output_dir.mkdir()
reduce_input_dir.mkdir()
reduce_output_dir.mkdir()

# Copy and rename input files: part-00000, part-00001, etc.
input_path = pathlib.Path(input_path)
prepare_input_files(input_path, map_input_dir)

# Executables must be absolute paths
map_exe = pathlib.Path(map_exe).resolve()
@@ -73,7 +68,7 @@
LOGGER.info("Starting map stage")
map_stage(
exe=map_exe,
input_dir=map_input_dir,
input_dir=input_path,
output_dir=map_output_dir,
)

@@ -99,7 +94,7 @@
for filename in sorted(reduce_output_dir.glob("*")):
st_size = filename.stat().st_size
total_size += st_size
shutil.copy(filename, output_dir)
shutil.move(filename, output_dir)
output_path = output_dir.parent/last_two(filename)
LOGGER.debug("%s size=%sB", output_path, st_size)

@@ -108,52 +103,36 @@
LOGGER.info("Output directory: %s", output_dir)


def prepare_input_files(input_path, output_dir):
"""Copy and split input files. Rename to part-00000, part-00001, etc.
def split_file(input_filename, max_chunksize):
"""Iterate over the data in a file one chunk at a time."""
with open(input_filename, "rb") as input_file:
buffer = b""

The input_path can be a file or a directory of files. If a file is smaller
than MAX_INPUT_SPLIT_SIZE, then copy it to output_dir. For larger files,
split into blocks of MAX_INPUT_SPLIT_SIZE bytes and write block to
output_dir. Input files will never be combined.
while True:
chunk = input_file.read(max_chunksize)
# Break if no more data remains.
if not chunk:
break

The number of files created will be the number of mappers since we will
assume that the number of tasks per mapper is 1. Apache Hadoop has a
configurable number of tasks per mapper, however for both simplicity and
because our use case has smaller inputs we use 1.
# Add the chunk to the buffer.
buffer += chunk

"""
part_num = 0
total_size = 0
for inpath in normalize_input_paths(input_path):
assert inpath.is_file()

# Compute output filenames
st_size = inpath.stat().st_size
total_size += st_size
n_splits = math.ceil(st_size / MAX_INPUT_SPLIT_SIZE)
n_splits = 1 if not n_splits else n_splits # Handle empty input file
LOGGER.debug(
"input %s size=%sB partitions=%s", inpath, st_size, n_splits
)
outpaths = [
output_dir/part_filename(part_num + i) for i in range(n_splits)
]
part_num += n_splits

# Copy to new output files
with contextlib.ExitStack() as stack:
outfiles = [stack.enter_context(i.open('w')) for i in outpaths]
infile = stack.enter_context(inpath.open(encoding="utf-8"))
outparent = outpaths[0].parent
assert all(i.parent == outparent for i in outpaths)
outnames = [i.name for i in outpaths]
logging.debug(
"partition %s >> %s/{%s}",
last_two(inpath), outparent.name, ",".join(outnames),
)
for i, line in enumerate(infile):
outfiles[i % n_splits].write(line)
LOGGER.debug("total input size=%sB", total_size)
# Find the last newline character in the buffer. We don't want to
# yield a chunk that ends in the middle of a line; we have to
# respect line boundaries or we'll corrupt the input.
last_newline = buffer.rfind(b"\n")
if last_newline != -1:
# Yield the content up to the last newline, saving the rest
# for the next chunk.
yield buffer[:last_newline + 1]

# Remove processed data from the buffer. The next chunk will
# start with whatever data came after the last newline.
buffer = buffer[last_newline + 1:]

# Yield any remaining data.
if buffer:
yield buffer
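
The new split_file() generator replaces the old copy-and-split step: it reads the input in fixed-size pieces but only ever yields data that ends on a newline, so a record is never cut in half between two mappers (only the final piece may lack a trailing newline). A minimal usage sketch follows; the input filename is a made-up example, not part of this commit.

# Iterate over a file in roughly MAX_INPUT_SPLIT_SIZE-byte chunks, each
# ending on a line boundary except possibly the last one.
from madoop.mapreduce import split_file, MAX_INPUT_SPLIT_SIZE

for num, chunk in enumerate(split_file("big_input.txt", MAX_INPUT_SPLIT_SIZE)):
    ends_clean = chunk.endswith(b"\n")
    print(f"chunk {num}: {len(chunk)} bytes, ends on newline: {ends_clean}")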


def normalize_input_paths(input_path):
@@ -209,30 +188,51 @@ def part_filename(num):
return f"part-{num:05d}"


def map_single_chunk(exe, input_path, output_path, chunk):
"""Execute mapper on a single chunk."""
with output_path.open("w") as outfile:
try:
subprocess.run(
str(exe),
shell=False,
check=True,
input=chunk,
stdout=outfile,
)
except subprocess.CalledProcessError as err:
raise MadoopError(
f"Command returned non-zero: "
f"{exe} < {input_path} > {output_path}"
) from err


def map_stage(exe, input_dir, output_dir):
"""Execute mappers."""
i = 0
for i, input_path in enumerate(sorted(input_dir.iterdir()), 1):
output_path = output_dir/part_filename(i)
LOGGER.debug(
"%s < %s > %s",
exe.name, last_two(input_path), last_two(output_path),
)
with input_path.open() as infile, output_path.open('w') as outfile:
try:
subprocess.run(
str(exe),
shell=False,
check=True,
stdin=infile,
stdout=outfile,
part_num = 0
futures = []
with concurrent.futures.ThreadPoolExecutor(
max_workers=multiprocessing.cpu_count()
) as pool:
for input_path in normalize_input_paths(input_dir):
for chunk in split_file(input_path, MAX_INPUT_SPLIT_SIZE):
output_path = output_dir/part_filename(part_num)
LOGGER.debug(
"%s < %s > %s",
exe.name, last_two(input_path), last_two(output_path),
)
except subprocess.CalledProcessError as err:
raise MadoopError(
f"Command returned non-zero: "
f"{exe} < {input_path} > {output_path}"
) from err
LOGGER.info("Finished map executions: %s", i)
futures.append(pool.submit(
map_single_chunk,
exe,
input_path,
output_path,
chunk,
))
part_num += 1
for future in concurrent.futures.as_completed(futures):
exception = future.exception()
if exception:
raise exception
LOGGER.info("Finished map executions: %s", part_num)


def sort_file(path):
@@ -395,35 +395,61 @@ def group_stage(input_dir, output_dir, num_reducers, partitioner):
path.unlink()

# Sort output files
for path in sorted(output_dir.iterdir()):
sort_file(path)
try:
# Don't use a with statement here, because Coverage won't be able to
# detect code running in a subprocess if we do.
# https://pytest-cov.readthedocs.io/en/latest/subprocess-support.html
# pylint: disable=consider-using-with
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
pool.map(sort_file, sorted(output_dir.iterdir()))
finally:
pool.close()
pool.join()

log_output_key_stats(output_keys_stats, output_dir)


def reduce_single_file(exe, input_path, output_path):
"""Execute reducer on a single file."""
with input_path.open() as infile, output_path.open("w") as outfile:
try:
subprocess.run(
str(exe),
shell=False,
check=True,
stdin=infile,
stdout=outfile,
)
except subprocess.CalledProcessError as err:
raise MadoopError(
f"Command returned non-zero: "
f"{exe} < {input_path} > {output_path}"
) from err


def reduce_stage(exe, input_dir, output_dir):
"""Execute reducers."""
i = 0
for i, input_path in enumerate(sorted(input_dir.iterdir())):
output_path = output_dir/part_filename(i)
LOGGER.debug(
"%s < %s > %s",
exe.name, last_two(input_path), last_two(output_path),
)
with input_path.open() as infile, output_path.open('w') as outfile:
try:
subprocess.run(
str(exe),
shell=False,
check=True,
stdin=infile,
stdout=outfile,
)
except subprocess.CalledProcessError as err:
raise MadoopError(
f"Command returned non-zero: "
f"{exe} < {input_path} > {output_path}"
) from err
futures = []
with concurrent.futures.ThreadPoolExecutor(
max_workers=multiprocessing.cpu_count()
) as pool:
for i, input_path in enumerate(sorted(input_dir.iterdir())):
output_path = output_dir/part_filename(i)
LOGGER.debug(
"%s < %s > %s",
exe.name, last_two(input_path), last_two(output_path),
)
futures.append(pool.submit(
reduce_single_file,
exe,
input_path,
output_path,
))
for future in concurrent.futures.as_completed(futures):
exception = future.exception()
if exception:
raise exception
LOGGER.info("Finished reduce executions: %s", i+1)


31 changes: 31 additions & 0 deletions tests/test_api.py
@@ -1,6 +1,8 @@
"""System tests for the API interface."""
from pathlib import Path
import pytest
import madoop
from madoop.mapreduce import map_stage, reduce_stage
from . import utils
from .utils import TESTDATA_DIR

@@ -56,6 +58,18 @@ def test_bash_executable(tmpdir):
)


def test_output_already_exists(tmpdir):
"""Output already existing should raise an error."""
with tmpdir.as_cwd(), pytest.raises(madoop.MadoopError):
madoop.mapreduce(
input_path=TESTDATA_DIR/"word_count/input",
output_dir=tmpdir,
map_exe=TESTDATA_DIR/"word_count/map.py",
reduce_exe=TESTDATA_DIR/"word_count/reduce.py",
num_reducers=2,
)


def test_bad_map_exe(tmpdir):
"""Map exe returns non-zero should produce an error message."""
with tmpdir.as_cwd(), pytest.raises(madoop.MadoopError):
@@ -94,6 +108,23 @@ def test_noninteger_partition_exe(tmpdir):
partitioner=TESTDATA_DIR/"word_count/partition_noninteger.py",
)

with tmpdir.as_cwd(), pytest.raises(madoop.MadoopError):
map_stage(
exe=TESTDATA_DIR/"word_count/map_invalid.py",
input_dir=TESTDATA_DIR/"word_count/input",
output_dir=Path(tmpdir),
)


def test_bad_reduce_exe(tmpdir):
"""Reduce exe returns non-zero should produce an error message."""
with tmpdir.as_cwd(), pytest.raises(madoop.MadoopError):
reduce_stage(
exe=TESTDATA_DIR/"word_count/reduce_exit_1.py",
input_dir=TESTDATA_DIR/"word_count/input",
output_dir=Path(tmpdir),
)


def test_missing_shebang(tmpdir):
"""Reduce exe with a bad shebag should produce an error message."""
4 changes: 2 additions & 2 deletions tests/test_cli.py
@@ -1,6 +1,6 @@
"""System tests for the command line interface."""
import subprocess
import pkg_resources
import importlib.metadata
import pytest
from . import utils
from .utils import TESTDATA_DIR
@@ -15,7 +15,7 @@ def test_version():
)
output = result.stdout.decode("utf-8")
assert "Madoop" in output
assert pkg_resources.get_distribution("madoop").version in output
assert importlib.metadata.version("madoop") in output


def test_help():
