Commit fec0a45

Merge remote-tracking branch 'origin/develop'

awdeorio committed Nov 6, 2024
2 parents 566dd79 + 5294aa6
Showing 12 changed files with 228 additions and 107 deletions.
3 changes: 3 additions & 0 deletions .coveragerc
@@ -0,0 +1,3 @@
[run]
concurrency = thread,multiprocessing
parallel = true
4 changes: 3 additions & 1 deletion .github/workflows/continuous_integration.yml
@@ -57,6 +57,8 @@ jobs:
# Upload coverage report
# https://github.com/codecov/codecov-action
- name: Upload coverage report
uses: codecov/codecov-action@v1
uses: codecov/codecov-action@v4
with:
token: ${{ secrets.CODECOV_TOKEN }}
slug: eecs485staff/madoop
fail_ci_if_error: true
2 changes: 2 additions & 0 deletions .gitignore
@@ -24,6 +24,8 @@ build/
/.tox/
/.coverage*
*,cover
# Make an exception for .coveragerc, which should be checked into version control.
!.coveragerc

# Text editors and IDEs
*~
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
@@ -6,7 +6,7 @@ Set up a development virtual environment.
```console
$ python3 -m venv .venv
$ source .venv/bin/activate
$ pip install --editable .[dev,test]
$ pip install --editable .[dev]
```

A `madoop` entry point script is installed in your virtual environment.
1 change: 1 addition & 0 deletions MANIFEST.in
@@ -9,6 +9,7 @@ graft madoop/example

# Avoid dev and binary files
exclude tox.ini
exclude .coveragerc
exclude .editorconfig
global-exclude *.pyc
global-exclude __pycache__
2 changes: 1 addition & 1 deletion README_Hadoop_Streaming.md
@@ -1,7 +1,7 @@
Hadoop Streaming in Python
===========================

This tutorial shows how to write MapReduce programs in Python that are compatible with [Hadoop Streaming](https://hadoop.apache.org/docs/r1.2.1/streaming.html). We'll use Python's `itertools.groupby()` function to simplify our code.
This tutorial shows how to write MapReduce programs in Python that are compatible with [Hadoop Streaming](https://hadoop.apache.org/docs/current/hadoop-streaming/HadoopStreaming.html). We'll use Python's `itertools.groupby()` function to simplify our code.
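
A minimal reducer sketch in that style, assuming sorted, tab-separated `key<TAB>value` lines on stdin; this is an illustration of the `itertools.groupby()` pattern, not a verbatim excerpt from the tutorial:

```python
"""Sum values per key with itertools.groupby().

Illustrative sketch only: assumes sorted input lines of the form
"key<TAB>value", as produced by the group stage of Hadoop Streaming.
"""
import itertools
import sys


def keyfunc(line):
    """Return the key portion of a tab-separated key/value line."""
    return line.partition("\t")[0]


def main():
    """Group consecutive lines by key and print one total per key."""
    for key, group in itertools.groupby(sys.stdin, keyfunc):
        total = sum(int(line.partition("\t")[2]) for line in group)
        print(f"{key}\t{total}")


if __name__ == "__main__":
    main()
```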

Install Madoop, a lightweight MapReduce framework for education. Madoop implements the Hadoop Streaming interface.
```console
223 changes: 126 additions & 97 deletions madoop/mapreduce.py
@@ -7,25 +7,24 @@
import collections
import hashlib
import logging
import math
import pathlib
import shutil
import subprocess
import tempfile
import multiprocessing
import concurrent.futures
from .exceptions import MadoopError


# Large input files are automatically split
MAX_INPUT_SPLIT_SIZE = 2**21 # 2 MB

# The number of reducers is dynamically determined by the number of unique keys
# but will not be more than num_reducers
MAX_INPUT_SPLIT_SIZE = 10 * 1024 * 1024 # 10 MB

# Madoop logger
LOGGER = logging.getLogger("madoop")


def mapreduce(
*,
input_path,
output_dir,
map_exe,
@@ -51,18 +50,15 @@ def mapreduce(
LOGGER.debug("tmpdir=%s", tmpdir)

# Create stage input and output directory
map_input_dir = tmpdir/'input'
map_output_dir = tmpdir/'mapper-output'
reduce_input_dir = tmpdir/'reducer-input'
reduce_output_dir = tmpdir/'output'
map_input_dir.mkdir()
map_output_dir.mkdir()
reduce_input_dir.mkdir()
reduce_output_dir.mkdir()

# Copy and rename input files: part-00000, part-00001, etc.
input_path = pathlib.Path(input_path)
prepare_input_files(input_path, map_input_dir)

# Executables must be absolute paths
map_exe = pathlib.Path(map_exe).resolve()
@@ -72,7 +68,7 @@
LOGGER.info("Starting map stage")
map_stage(
exe=map_exe,
input_dir=map_input_dir,
input_dir=input_path,
output_dir=map_output_dir,
)

@@ -98,7 +94,7 @@
for filename in sorted(reduce_output_dir.glob("*")):
st_size = filename.stat().st_size
total_size += st_size
shutil.copy(filename, output_dir)
shutil.move(filename, output_dir)
output_path = output_dir.parent/last_two(filename)
LOGGER.debug("%s size=%sB", output_path, st_size)

@@ -107,52 +103,36 @@
LOGGER.info("Output directory: %s", output_dir)


def prepare_input_files(input_path, output_dir):
"""Copy and split input files. Rename to part-00000, part-00001, etc.
def split_file(input_filename, max_chunksize):
"""Iterate over the data in a file one chunk at a time."""
with open(input_filename, "rb") as input_file:
buffer = b""

The input_path can be a file or a directory of files. If a file is smaller
than MAX_INPUT_SPLIT_SIZE, then copy it to output_dir. For larger files,
split into blocks of MAX_INPUT_SPLIT_SIZE bytes and write block to
output_dir. Input files will never be combined.
while True:
chunk = input_file.read(max_chunksize)
# Break if no more data remains.
if not chunk:
break

The number of files created will be the number of mappers since we will
assume that the number of tasks per mapper is 1. Apache Hadoop has a
configurable number of tasks per mapper, however for both simplicity and
because our use case has smaller inputs we use 1.
# Add the chunk to the buffer.
buffer += chunk

"""
part_num = 0
total_size = 0
for inpath in normalize_input_paths(input_path):
assert inpath.is_file()

# Compute output filenames
st_size = inpath.stat().st_size
total_size += st_size
n_splits = math.ceil(st_size / MAX_INPUT_SPLIT_SIZE)
n_splits = 1 if not n_splits else n_splits # Handle empty input file
LOGGER.debug(
"input %s size=%sB partitions=%s", inpath, st_size, n_splits
)
outpaths = [
output_dir/part_filename(part_num + i) for i in range(n_splits)
]
part_num += n_splits

# Copy to new output files
with contextlib.ExitStack() as stack:
outfiles = [stack.enter_context(i.open('w')) for i in outpaths]
infile = stack.enter_context(inpath.open(encoding="utf-8"))
outparent = outpaths[0].parent
assert all(i.parent == outparent for i in outpaths)
outnames = [i.name for i in outpaths]
logging.debug(
"partition %s >> %s/{%s}",
last_two(inpath), outparent.name, ",".join(outnames),
)
for i, line in enumerate(infile):
outfiles[i % n_splits].write(line)
LOGGER.debug("total input size=%sB", total_size)
# Find the last newline character in the buffer. We don't want to
# yield a chunk that ends in the middle of a line; we have to
# respect line boundaries or we'll corrupt the input.
last_newline = buffer.rfind(b"\n")
if last_newline != -1:
# Yield the content up to the last newline, saving the rest
# for the next chunk.
yield buffer[:last_newline + 1]

# Remove processed data from the buffer. The next chunk will
# start with whatever data came after the last newline.
buffer = buffer[last_newline + 1:]

# Yield any remaining data.
if buffer:
yield buffer
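
A quick illustration of the generator above: every yielded chunk is bytes and ends at a line boundary, so no input line is ever split across chunks. The tiny chunk size below is only for demonstration.

```python
# Illustrative use of split_file(); the 8-byte chunk size is only for demo.
import tempfile

from madoop.mapreduce import split_file  # internal helper shown above

with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as tmp:
    tmp.write("alpha\nbravo\ncharlie\ndelta\n")
    path = tmp.name

for chunk in split_file(path, max_chunksize=8):
    assert chunk.endswith(b"\n")  # each chunk ends at a line boundary
    print(chunk)
```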


def normalize_input_paths(input_path):
@@ -208,30 +188,51 @@ def part_filename(num):
return f"part-{num:05d}"


def map_single_chunk(exe, input_path, output_path, chunk):
"""Execute mapper on a single chunk."""
with output_path.open("w") as outfile:
try:
subprocess.run(
str(exe),
shell=False,
check=True,
input=chunk,
stdout=outfile,
)
except subprocess.CalledProcessError as err:
raise MadoopError(
f"Command returned non-zero: "
f"{exe} < {input_path} > {output_path}"
) from err


def map_stage(exe, input_dir, output_dir):
"""Execute mappers."""
i = 0
for i, input_path in enumerate(sorted(input_dir.iterdir()), 1):
output_path = output_dir/part_filename(i)
LOGGER.debug(
"%s < %s > %s",
exe.name, last_two(input_path), last_two(output_path),
)
with input_path.open() as infile, output_path.open('w') as outfile:
try:
subprocess.run(
str(exe),
shell=False,
check=True,
stdin=infile,
stdout=outfile,
part_num = 0
futures = []
with concurrent.futures.ThreadPoolExecutor(
max_workers=multiprocessing.cpu_count()
) as pool:
for input_path in normalize_input_paths(input_dir):
for chunk in split_file(input_path, MAX_INPUT_SPLIT_SIZE):
output_path = output_dir/part_filename(part_num)
LOGGER.debug(
"%s < %s > %s",
exe.name, last_two(input_path), last_two(output_path),
)
except subprocess.CalledProcessError as err:
raise MadoopError(
f"Command returned non-zero: "
f"{exe} < {input_path} > {output_path}"
) from err
LOGGER.info("Finished map executions: %s", i)
futures.append(pool.submit(
map_single_chunk,
exe,
input_path,
output_path,
chunk,
))
part_num += 1
for future in concurrent.futures.as_completed(futures):
exception = future.exception()
if exception:
raise exception
LOGGER.info("Finished map executions: %s", part_num)


def sort_file(path):
Expand Down Expand Up @@ -287,7 +288,9 @@ def partition_keys_custom(
Update the data structures provided by the caller input_keys_stats and
output_keys_stats. Both map a filename to a set of keys.
"""
# pylint: disable=too-many-arguments,too-many-locals
# pylint: disable=too-many-arguments
# pylint: disable=too-many-positional-arguments
# pylint: disable=too-many-locals
assert len(outpaths) == num_reducers
outparent = outpaths[0].parent
assert all(i.parent == outparent for i in outpaths)
@@ -392,35 +395,61 @@ def group_stage(input_dir, output_dir, num_reducers, partitioner):
path.unlink()

# Sort output files
for path in sorted(output_dir.iterdir()):
sort_file(path)
try:
# Don't use a with statement here, because Coverage won't be able to
# detect code running in a subprocess if we do.
# https://pytest-cov.readthedocs.io/en/latest/subprocess-support.html
# pylint: disable=consider-using-with
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
pool.map(sort_file, sorted(output_dir.iterdir()))
finally:
pool.close()
pool.join()

log_output_key_stats(output_keys_stats, output_dir)
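
The group stage routes each intermediate key to a reducer through a partitioner. As background, a hypothetical hash partitioner in the spirit of Hadoop's default might look like the sketch below; it is illustrative only and not necessarily Madoop's implementation.

```python
# Hypothetical hash partitioner, illustrative only; Madoop's real
# partitioning logic and custom-partitioner protocol may differ.
import hashlib


def keyhash_partition(key, num_reducers):
    """Return a deterministic reducer index in range(num_reducers)."""
    digest = hashlib.md5(key.encode("utf-8")).hexdigest()
    return int(digest, base=16) % num_reducers


assert keyhash_partition("apple", num_reducers=4) in range(4)
```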


def reduce_single_file(exe, input_path, output_path):
"""Execute reducer on a single file."""
with input_path.open() as infile, output_path.open("w") as outfile:
try:
subprocess.run(
str(exe),
shell=False,
check=True,
stdin=infile,
stdout=outfile,
)
except subprocess.CalledProcessError as err:
raise MadoopError(
f"Command returned non-zero: "
f"{exe} < {input_path} > {output_path}"
) from err


def reduce_stage(exe, input_dir, output_dir):
"""Execute reducers."""
i = 0
for i, input_path in enumerate(sorted(input_dir.iterdir())):
output_path = output_dir/part_filename(i)
LOGGER.debug(
"%s < %s > %s",
exe.name, last_two(input_path), last_two(output_path),
)
with input_path.open() as infile, output_path.open('w') as outfile:
try:
subprocess.run(
str(exe),
shell=False,
check=True,
stdin=infile,
stdout=outfile,
)
except subprocess.CalledProcessError as err:
raise MadoopError(
f"Command returned non-zero: "
f"{exe} < {input_path} > {output_path}"
) from err
futures = []
with concurrent.futures.ThreadPoolExecutor(
max_workers=multiprocessing.cpu_count()
) as pool:
for i, input_path in enumerate(sorted(input_dir.iterdir())):
output_path = output_dir/part_filename(i)
LOGGER.debug(
"%s < %s > %s",
exe.name, last_two(input_path), last_two(output_path),
)
futures.append(pool.submit(
reduce_single_file,
exe,
input_path,
output_path,
))
for future in concurrent.futures.as_completed(futures):
exception = future.exception()
if exception:
raise exception
LOGGER.info("Finished reduce executions: %s", i+1)


7 changes: 3 additions & 4 deletions pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "madoop"
version = "1.2.2"
version = "1.3.0"
description="A light weight MapReduce framework for education."
license = {file = "LICENSE"}
authors = [
@@ -26,15 +26,14 @@ madoop = "madoop.__main__:main"
[project.optional-dependencies]
dev = [
"build",
"twine",
"tox",
"check-manifest",
"freezegun",
"pycodestyle",
"pydocstyle",
"pylint",
"pytest",
"pytest-cov",
"tox",
"twine",
]

[tool.setuptools.packages.find]
