Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
awdeorio committed Mar 29, 2024
2 parents f2f9dac + 63a8f6e commit b00a513
Show file tree
Hide file tree
Showing 15 changed files with 259 additions and 39 deletions.
12 changes: 6 additions & 6 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ $ tox -e py3
Update your local `develop` branch. Make sure it's clean.
```console
$ git fetch
$ git checkout develop
$ git switch develop
$ git rebase
$ git status
```
Expand All @@ -56,24 +56,24 @@ $ tox -e py3

Update version
```console
$ $EDITOR setup.py
$ git commit -m "version bump" setup.py
$ $EDITOR pyproject.toml
$ git commit -m "version bump" pyproject.toml
$ git push origin develop
```

Update main branch
```console
$ git fetch
$ git checkout main
$ git switch main
$ git rebase
$ git merge --no-ff origin/develop
```

Tag a release
```console
$ git tag -a X.Y.Z
$ grep version= setup.py
version="X.Y.Z",
$ grep version pyproject.toml
version = "X.Y.Z"
$ git describe
X.Y.Z
$ git push --tags origin main
Expand Down
8 changes: 7 additions & 1 deletion madoop/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ def main():
'-numReduceTasks', dest='num_reducers', default=4,
help="max number of reducers"
)
optional_args.add_argument(
'-partitioner', dest='partitioner', default=None,
help=("executable that computes a partition for each key-value pair "
"of map output: default is hash(key) %% num_reducers"),
)
required_args = parser.add_argument_group('required arguments')
required_args.add_argument('-input', dest='input', required=True)
required_args.add_argument('-output', dest='output', required=True)
Expand All @@ -64,7 +69,8 @@ def main():
output_dir=args.output,
map_exe=args.mapper,
reduce_exe=args.reducer,
num_reducers=int(args.num_reducers)
num_reducers=int(args.num_reducers),
partitioner=args.partitioner,
)
except MadoopError as err:
sys.exit(f"Error: {err}")
Expand Down
124 changes: 99 additions & 25 deletions madoop/mapreduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@


# Large input files are automatically split
MAX_INPUT_SPLIT_SIZE = 2**20 # 1 MB
MAX_INPUT_SPLIT_SIZE = 2**21 # 2 MB

# The number of reducers is dynamically determined by the number of unique keys
# but will not be more than num_reducers
Expand All @@ -25,8 +25,16 @@
LOGGER = logging.getLogger("madoop")


def mapreduce(input_path, output_dir, map_exe, reduce_exe, num_reducers):
def mapreduce(
input_path,
output_dir,
map_exe,
reduce_exe,
num_reducers,
partitioner=None,
):
"""Madoop API."""
# pylint: disable=too-many-arguments
# Do not clobber existing output directory
output_dir = pathlib.Path(output_dir)
if output_dir.exists():
Expand Down Expand Up @@ -73,7 +81,8 @@ def mapreduce(input_path, output_dir, map_exe, reduce_exe, num_reducers):
group_stage(
input_dir=map_output_dir,
output_dir=reduce_input_dir,
num_reducers=num_reducers
num_reducers=num_reducers,
partitioner=partitioner,
)

# Run the reducing stage
Expand Down Expand Up @@ -178,13 +187,13 @@ def is_executable(exe):
try:
subprocess.run(
str(exe),
shell=True,
shell=False,
input="".encode(),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=True,
)
except subprocess.CalledProcessError as err:
except (subprocess.CalledProcessError, OSError) as err:
raise MadoopError(f"Failed executable test: {err}") from err


Expand Down Expand Up @@ -212,7 +221,7 @@ def map_stage(exe, input_dir, output_dir):
try:
subprocess.run(
str(exe),
shell=True,
shell=False,
check=True,
stdin=infile,
stdout=outfile,
Expand Down Expand Up @@ -240,7 +249,7 @@ def keyhash(key):
return int(hexdigest, base=16)


def partition_keys(
def partition_keys_default(
inpath,
outpaths,
input_keys_stats,
Expand All @@ -250,7 +259,6 @@ def partition_keys(
Update the data structures provided by the caller input_keys_stats and
output_keys_stats. Both map a filename to a set of of keys.
"""
assert len(outpaths) == num_reducers
outparent = outpaths[0].parent
Expand All @@ -266,7 +274,80 @@ def partition_keys(
output_keys_stats[outpath].add(key)


def group_stage(input_dir, output_dir, num_reducers):
def partition_keys_custom(
    inpath,
    outpaths,
    input_keys_stats,
    output_keys_stats,
    num_reducers,
    partitioner,
):
    """Allocate lines of inpath among outpaths using a custom partitioner.

    The partitioner executable is started with num_reducers as its single
    command line argument and reads the contents of inpath on stdin.  It must
    write one line of output per line of input, containing an integer
    partition number in the range 0 <= partition < num_reducers.  Each input
    line is appended to the output file at that index in outpaths.

    Update the data structures provided by the caller input_keys_stats and
    output_keys_stats.  Both map a filename to a set of keys.

    Raise MadoopError if the partitioner prints a non-integer or out-of-range
    partition, exits non-zero, or prints fewer lines than it was given.

    """
    # pylint: disable=too-many-arguments,too-many-locals
    assert len(outpaths) == num_reducers
    outparent = outpaths[0].parent
    assert all(i.parent == outparent for i in outpaths)
    with contextlib.ExitStack() as stack:
        outfiles = [stack.enter_context(p.open("a")) for p in outpaths]
        process = stack.enter_context(subprocess.Popen(
            [partitioner, str(num_reducers)],
            stdin=stack.enter_context(inpath.open()),
            stdout=subprocess.PIPE,
            text=True,
        ))
        partitions = stack.enter_context(process.stdout)

        # First input line that the partitioner produced no output for
        unpartitioned = None

        # The input file is opened a second time so that each line can be
        # paired with the partition number computed for it by the subprocess.
        for line in stack.enter_context(inpath.open()):
            partition = partitions.readline()
            if not partition:
                # End of partitioner output before end of input.  Pairing the
                # two streams with zip() would silently drop this line and
                # every line after it.
                unpartitioned = line
                break
            try:
                partition = int(partition)
            except ValueError as err:
                raise MadoopError(
                    "Partition executable returned non-integer value: "
                    f"{partition} for line '{line}'."
                ) from err
            if not 0 <= partition < num_reducers:
                raise MadoopError(
                    "Partition executable returned invalid value: "
                    f"0 <= {partition} < {num_reducers} for line '{line}'."
                )
            key = line.partition('\t')[0]
            input_keys_stats[inpath].add(key)
            outfiles[partition].write(line)
            outpath = outpaths[partition]
            output_keys_stats[outpath].add(key)

        # A crash is the more specific diagnosis, so report it first
        return_code = process.wait()
        if return_code:
            raise MadoopError(
                f"Partition executable returned non-zero: {str(partitioner)}"
            )
        if unpartitioned is not None:
            raise MadoopError(
                "Partition executable produced fewer output lines than "
                f"input lines: no partition for line '{unpartitioned}'."
            )


def log_input_key_stats(input_keys_stats, input_dir):
    """Log the number of unique keys per input file and across all of them.

    input_keys_stats maps each input filename to the set of keys seen in it.
    One debug line is emitted per file, in sorted filename order, followed by
    a summary line labeled with the name of input_dir.

    """
    combined = set()
    for path in sorted(input_keys_stats):
        file_keys = input_keys_stats[path]
        combined.update(file_keys)
        LOGGER.debug("%s unique_keys=%s", last_two(path), len(file_keys))
    total = len(combined)
    LOGGER.debug("%s all_unique_keys=%s", input_dir.name, total)


def log_output_key_stats(output_keys_stats, output_dir):
    """Log the number of unique keys per output file and across all of them.

    output_keys_stats maps each output filename to the set of keys written to
    it.  One debug line is emitted per file, in sorted filename order,
    followed by a summary line labeled with the name of output_dir.

    """
    combined = set()
    for path in sorted(output_keys_stats):
        file_keys = output_keys_stats[path]
        combined.update(file_keys)
        LOGGER.debug("%s unique_keys=%s", last_two(path), len(file_keys))
    total = len(combined)
    LOGGER.debug("%s all_unique_keys=%s", output_dir.name, total)


def group_stage(input_dir, output_dir, num_reducers, partitioner):
"""Run group stage.
Process each mapper output file, allocating lines to grouper output files
Expand All @@ -285,15 +366,14 @@ def group_stage(input_dir, output_dir, num_reducers):

# Partition input, appending to output files
for inpath in sorted(input_dir.iterdir()):
partition_keys(inpath, outpaths, input_keys_stats,
output_keys_stats, num_reducers)
if not partitioner:
partition_keys_default(inpath, outpaths, input_keys_stats,
output_keys_stats, num_reducers)
else:
partition_keys_custom(inpath, outpaths, input_keys_stats,
output_keys_stats, num_reducers, partitioner)

# Log input keyspace stats
all_input_keys = set()
for inpath, keys in sorted(input_keys_stats.items()):
all_input_keys.update(keys)
LOGGER.debug("%s unique_keys=%s", last_two(inpath), len(keys))
LOGGER.debug("%s all_unique_keys=%s", input_dir.name, len(all_input_keys))
log_input_key_stats(input_keys_stats, input_dir)

# Log partition input and output filenames
outnames = [i.name for i in outpaths]
Expand All @@ -315,13 +395,7 @@ def group_stage(input_dir, output_dir, num_reducers):
for path in sorted(output_dir.iterdir()):
sort_file(path)

# Log output keyspace stats
all_output_keys = set()
for outpath, keys in sorted(output_keys_stats.items()):
all_output_keys.update(keys)
LOGGER.debug("%s unique_keys=%s", last_two(outpath), len(keys))
LOGGER.debug("%s all_unique_keys=%s", output_dir.name,
len(all_output_keys))
log_output_key_stats(output_keys_stats, output_dir)


def reduce_stage(exe, input_dir, output_dir):
Expand All @@ -337,7 +411,7 @@ def reduce_stage(exe, input_dir, output_dir):
try:
subprocess.run(
str(exe),
shell=True,
shell=False,
check=True,
stdin=infile,
stdout=outfile,
Expand Down
3 changes: 1 addition & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "madoop"
version = "1.1.0"
version = "1.2.0"
description="A light weight MapReduce framework for education."
license = {file = "LICENSE"}
authors = [
Expand All @@ -25,7 +25,6 @@ madoop = "madoop.__main__:main"

[project.optional-dependencies]
dev = [
"pdbpp",
"build",
"twine",
"tox",
Expand Down
Loading

0 comments on commit b00a513

Please sign in to comment.