diff --git a/mrjob/emr.py b/mrjob/emr.py index 143edf436..3a9c584ee 100644 --- a/mrjob/emr.py +++ b/mrjob/emr.py @@ -1121,6 +1121,7 @@ def _create_cluster(self, persistent=False): emr_client = self.make_emr_client() kwargs = self._cluster_kwargs(persistent) + kwargs["Tags"] = self._build_tags() log.debug('Calling run_job_flow(%s)' % ( ', '.join('%s=%r' % (k, v) for k, v in sorted(kwargs.items())))) @@ -1128,6 +1129,13 @@ def _create_cluster(self, persistent=False): log.info('Created new cluster %s' % cluster_id) + return cluster_id + + def _build_tags(self): + """ + Build tags to be set on cluster creation + """ + # set EMR tags for the cluster tags = dict(self._opts['tags']) @@ -1143,25 +1151,7 @@ def _create_cluster(self, persistent=False): tags['__mrjob_pool_hash'] = self._pool_hash() tags['__mrjob_pool_name'] = self._opts['pool_name'] - self._add_tags(tags, cluster_id) - - return cluster_id - - def _add_tags(self, tags, cluster_id): - """Add tags in the dict *tags* to cluster *cluster_id*. Do nothing - if *tags* is empty or ``None``""" - if not tags: - return - - tags_items = sorted(tags.items()) - - self.make_emr_client().add_tags( - ResourceId=cluster_id, - Tags=[dict(Key=k, Value=v) for k, v in tags_items]) - - log.info('Added EMR tags to cluster %s: %s' % ( - cluster_id, - ', '.join('%s=%s' % (tag, value) for tag, value in tags_items))) + return [dict(Key=str(k), Value='' if v is None else str(v)) for k, v in tags.items()] # TODO: could break this into sub-methods for clarity def _cluster_kwargs(self, persistent=False):