Skip to content

Commit

Permalink
Merge branch 'dev'
Browse files Browse the repository at this point in the history
* dev:
  add test for invalid recipe
  improve depedency checking
  bug fix for trend_bridge
  bug fix for translate_column
  • Loading branch information
semio committed Mar 16, 2017
2 parents 82b7171 + e26caa0 commit c5e1059
Show file tree
Hide file tree
Showing 21 changed files with 105 additions and 18 deletions.
4 changes: 4 additions & 0 deletions ddf_utils/chef/cook.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,10 @@ def run_recipe(recipe, serve=False, outpath=None):
# create DAG of recipe
dag = build_dag(recipe_)

# check all ingredients availability
for root in dag.roots:
root.detect_missing_dependency()

# now run the recipe
dishes = get_dishes(recipe_)

Expand Down
20 changes: 18 additions & 2 deletions ddf_utils/chef/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,22 @@ def detect_downstream_cycle(self, node=None):
t.detect_downstream_cycle(node=node)
return False

def detect_missing_dependency(self):
"""
check if every upstream is available in the DAG.
raise error if something is missing
"""
not_found = set()
for n in self.upstream_list:
if not self.dag.has_node(n.node_id):
not_found.add(n.node_id)
if isinstance(n, ProcedureNode) and not n.procedure:
not_found.add(n.node_id)
if len(not_found) > 0:
raise ChefRuntimeError(
"dependency not found/not definded for {}: {}".format(self.node_id, not_found))
return False


class IngredientNode(BaseNode):
"""Node for storing dataset ingredients.
Expand Down Expand Up @@ -116,8 +132,8 @@ def evaluate(self):
func = getattr(pc, self.procedure['procedure'])
except AttributeError:
raise ProcedureError("Not supported: " + self.procedure['procedure'])
except TypeError:
raise ProcedureError("Procedure Error: " + str(self.node_id))
# except TypeError:
# raise ProcedureError("Procedure Error: " + str(self.node_id))

# check the base ingredients and convert the string id to actual ingredient
ingredients = []
Expand Down
9 changes: 4 additions & 5 deletions ddf_utils/chef/procedure.py
Original file line number Diff line number Diff line change
Expand Up @@ -944,14 +944,14 @@ def trend_bridge(ingredient: BaseIngredient, bridge_start, bridge_end, bridge_le
# calculate trend bridge on each group
res_grouped = []
for g, df in start_group:
gstart = df
gstart = df.copy()
try:
gend = end_group.get_group(g)
except KeyError: # no new data available for this group
logger.warning("no data for group: " + g)
logger.warning("no data for bridge end: " + g)
bridged = gstart[bridge_start['column']]

bridged = tb(gstart[bridge_start['column']], gend[bridge_end['column']], bridge_length)
else:
bridged = tb(gstart[bridge_start['column']], gend[bridge_end['column']], bridge_length)

res_grouped.append((g, bridged))

Expand All @@ -976,4 +976,3 @@ def trend_bridge(ingredient: BaseIngredient, bridge_start, bridge_end, bridge_le
return ProcedureResult(result, start.key, merged)
else:
return ProcedureResult(result, start.key, {target_col: result_data})

10 changes: 8 additions & 2 deletions ddf_utils/transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,14 @@ def _translate_column_inline(df, column, target_column, dictionary,
df_new[target_column] = df_new[column].map(
lambda x: dictionary[x])
if not_found == 'include':
df_new[target_column] = df_new[column].map(
lambda x: dictionary[x] if x in dictionary.keys() else x)
if target_column not in df_new.columns:
# create a new column: if a key not in the mappings, return NaN
df_new[target_column] = df_new[column].map(
lambda x: dictionary[x] if x in dictionary.keys() else np.nan)
else: # if a key not in the mappings, use the original value
for i, x in df_new[column].iteritems():
if x in dictionary.keys():
df_new.ix[i, target_column] = dictionary[x]

return df_new

Expand Down
39 changes: 39 additions & 0 deletions tests/recipes_fail/test_trend_bridge.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
info:
id: test-trend_bridge

ingredients:
- id: cme1
dataset: ddf--cme
key: country, year
value: '*'
- id: cme2
dataset: ddf--cme
key: country, year
value: '*'

cooking:
datapoints:
- procedure: filter_row
ingredients:
- cme1
options:
dictionary:
new_col:
from: imr_median
year: [2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008,
2009, 2010, 2011, 2012, 2013]
result: cme3
- procedure: trend_bridge
ingredients:
- cme1
options:
bridge_start:
column: imr_lower
bridge_end:
ingredient: cme1231
column: new_col
bridge_length: 10
bridge_on: year
target_col: bridged
result: res

File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
41 changes: 32 additions & 9 deletions tests/test_recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,44 @@
import pytest
import glob

all_test_recipes = glob.glob('recipes/test_*')
test_recipes_pass = glob.glob('recipes_pass/test_*')
test_recipes_fail = glob.glob('recipes_fail/test_*')


@pytest.fixture(scope='session',
params=test_recipes_pass)
def recipe_file_pass(request):
return request.param


@pytest.fixture(scope='session',
params=all_test_recipes)
def recipe_file(request):
params=test_recipes_fail)
def recipe_file_fail(request):
return request.param


def test_run_recipe(recipe_file, to_disk=False):
print('running test: ' + recipe_file)
recipe = chef.build_recipe(recipe_file)
def test_run_recipe_pass(recipe_file_pass, to_disk=False):
print('running test: ' + recipe_file_pass)
recipe = chef.build_recipe(recipe_file_pass)
if to_disk:
outdir = tempfile.mkdtemp()
print('tmpdir: ' + outdir)
else:
outdir = None

chef.run_recipe(recipe, to_disk, outdir)


def test_run_recipe_fail(recipe_file_fail, to_disk=False):
print('running test: ' + recipe_file_fail)
recipe = chef.build_recipe(recipe_file_fail)
if to_disk:
outdir = tempfile.mkdtemp()
print('tmpdir: ' + outdir)
chef.run_recipe(recipe, True, outdir)
else:
_ = chef.run_recipe(recipe)
assert 1
outdir = None
try:
chef.run_recipe(recipe, to_disk, outdir)
except:
return
assert 0

0 comments on commit c5e1059

Please sign in to comment.