From 4a69a8faf7f412a020979dca097caa1fb840bc29 Mon Sep 17 00:00:00 2001 From: seanjensengrey Date: Wed, 16 Feb 2011 19:29:13 -0800 Subject: [PATCH] path finding and quoting made a little more fault tolerant. Default parameters to launch modified to be more noob friendly. * single quote for shell safety, double quote for strings with possible $ expansion == streaming.jar search == * Finding streaming.jar is more resilient to paths with symlinks. * switched to running find in a subshell * warns user if HADOOP_HOME is not set == in hadoopy.launch() following changes == * use_typedbytes=False, use_seqoutput=False, * use_autoinput=True * add_python=False * python_cmd=None, if you pass in python bin path, be explicit If you specify add_python=True, it will use sys.executable, if you need to override sys.executable, use python_cmd='path/to/python' The changes to _runner.py.launch() should make it a little more friendly out of the box. If you need more advanced features you can turn those on with the above named parameters. The changes to how the python interpreter is located make it possible to easily integrate non-system python installs (like Python 2.6 running on Centos 5.5). Tested on: * Python 2.6.6 x86_64 * Hadoop 0.20.2+737 --- hadoopy/_runner.py | 39 ++++++++++++++++++++++++++------------- 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/hadoopy/_runner.py b/hadoopy/_runner.py index 8902727..7c7704b 100644 --- a/hadoopy/_runner.py +++ b/hadoopy/_runner.py @@ -25,6 +25,7 @@ import tempfile import hadoopy._freeze +import pdb def _find_hstreaming(): """Finds the whole path to the hadoop streaming jar. 
@@ -39,19 +40,29 @@ def _find_hstreaming(): try: search_root = os.environ['HADOOP_HOME'] except KeyError: - search_root = '/' - cmd = 'find %s -name hadoop*streaming*.jar' % (search_root) - p = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, + # cloudera default install + if os.path.isdir('/usr/lib/hadoop'): + sys.stderr.write('HADOOP_HOME not set, found /usr/lib/hadoop, searching for streaming jar\n') + search_root = '/usr/lib/hadoop' + else: + sys.stderr.write('HADOOP_HOME not set, falling back to /, please set HADOOP_HOME\n') + search_root = '/' + cmd = "find -L %s -name 'hadoop*streaming*.jar'" % (search_root) + p = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE, stderr=subprocess.PIPE) - return p.communicate()[0].split('\n')[0] + streamingjar = p.communicate()[0].split('\n')[0] + if not os.path.isfile(streamingjar): + raise Exception('cannot find streaming jar') + else: + return streamingjar def launch(in_name, out_name, script_path, mapper=True, reducer=True, combiner=False, partitioner=False, files=(), jobconfs=(), cmdenvs=(), copy_script=True, hstreaming=None, name=None, - use_typedbytes=True, use_seqoutput=True, use_autoinput=True, - pretend=False, add_python=True, config=None, - python_cmd="python", num_mappers=None, num_reducers=None, + use_typedbytes=False, use_seqoutput=False, use_autoinput=True, + pretend=False, add_python=False, config=None, + python_cmd=None, num_mappers=None, num_reducers=None, script_dir='',**kw): """Run Hadoop given the parameters @@ -98,12 +109,14 @@ def launch(in_name, out_name, script_path, mapper=True, reducer=True, subprocess.CalledProcessError: Hadoop error. OSError: Hadoop streaming not found. 
""" - try: + if hstreaming: hadoop_cmd = 'hadoop jar ' + hstreaming - except TypeError: + else: hadoop_cmd = 'hadoop jar ' + _find_hstreaming() job_name = os.path.basename(script_path) if add_python: + if not python_cmd: + python_cmd = sys.executable script_name = '%s %s' % (python_cmd, os.path.basename(script_path)) else: script_name = os.path.basename(script_path) @@ -120,7 +133,7 @@ def launch(in_name, out_name, script_path, mapper=True, reducer=True, if isinstance(in_name, str): in_name = [in_name] for f in in_name: - cmd += ['-input', f] + cmd += ['-input', "'%s'" % (f,)] # Add mapper/reducer cmd += ['-mapper', '"%s"' % (mapper)] @@ -137,9 +150,9 @@ def launch(in_name, out_name, script_path, mapper=True, reducer=True, cmd += ['-partitioner', '"%s"' % (partitioner)] if num_mappers: - cmd += ['-numMapTasks', "'%i'"%(int(num_mappers))] + cmd += ['-numMapTasks', '%i' % (int(num_mappers),)] if num_reducers: - cmd += ['-numReduceTasks', "'%i'"%(int(num_reducers))] + cmd += ['-numReduceTasks', '%i' %(int(num_reducers),)] # Add files if isinstance(files, str): files = [files] @@ -157,7 +170,7 @@ def launch(in_name, out_name, script_path, mapper=True, reducer=True, del new_files # END BUG for f in files: - cmd += ['-file', f] + cmd += ['-file', "'%s'" % (f,)] # Add jobconfs if isinstance(jobconfs, str): jobconfs = [jobconfs]