diff --git a/exasol_udf_mock_python/mock_context.py b/exasol_udf_mock_python/mock_context.py
index 7d8b8a2..86635c2 100644
--- a/exasol_udf_mock_python/mock_context.py
+++ b/exasol_udf_mock_python/mock_context.py
@@ -1,4 +1,5 @@
-from typing import List, Tuple, Iterator
+from typing import List, Tuple, Iterator, Iterable, Any, Optional, Union
+from functools import wraps
 
 import pandas as pd
 
@@ -8,50 +9,190 @@ from exasol_udf_mock_python.udf_context import UDFContext
 
 
+def check_context(f):
+    """
+    Decorator checking that a MockContext object has a valid current group context.
+    Raises a RuntimeError if this is not the case.
+    """
+    @wraps(f)
+    def wrapper(self, *args, **kwargs):
+        if self.no_context:
+            raise RuntimeError('Calling the UDFContext interface is disallowed when '
+                               'the current group context is invalid')
+        return f(self, *args, **kwargs)
+
+    return wrapper
+
+
+def validate_emit(row: Tuple, columns: List[Column]):
+    """
+    Validates that a data row to be emitted corresponds to the definition of the output columns.
+    The number of elements in the row must match the number of columns, and the type of each
+    element must match the type of the corresponding column. Raises a ValueError if the first
+    condition is violated, or a TypeError if the second one is.
+
+    :param row:     Data row.
+    :param columns: Column definitions.
+    """
+    if len(row) != len(columns):
+        raise ValueError(f"row {row} does not have the same number of values as there are columns defined")
+    for i, column in enumerate(columns):
+        if row[i] is not None and not isinstance(row[i], column.type):
+            raise TypeError(f"Value {row[i]} ({type(row[i])}) at position {i} is not a {column.type}")
+
+
 class MockContext(UDFContext):
+    """
+    Implementation of the generic UDF Mock Context interface for a SET UDF with groups.
+    This class allows iterating over groups. The functionality of the UDFContext interface
+    applies to the current input group.
+
+    Call `next_group` to iterate over groups. The `output_groups` property provides the emitted
+    output for all groups iterated so far, including the output for the current group.
+
+    Calling any function of the UDFContext interface after the group iterator has passed the end,
+    or before the first call to `next_group`, is illegal and will cause a RuntimeError.
+    """
 
     def __init__(self, input_groups: Iterator[Group], metadata: MockMetaData):
+        """
+        :param input_groups: Input groups. Each group object should contain the input rows for the group.
+
+        :param metadata:     The mock metadata object.
+        """
+
         self._input_groups = input_groups
-        self._output_groups = []
-        self._input_group = None  # type: Group
-        self._output_group_list = None  # type: List
-        self._output_group = None  # type: Group
-        self._iter = None  # type: Iterator[Tuple]
-        self._len = None  # type: int
         self._metadata = metadata
-        self._name_position_map = \
-            {column.name: position
-             for position, column
-             in enumerate(metadata.input_columns)}
+        """ Mock context for the current group """
+        self._current_context: Optional[StandaloneMockContext] = None
+        """ Output for all groups """
+        self._previous_output: List[Group] = []
+
+    @property
+    def no_context(self) -> bool:
+        """Returns True if the current group context is invalid"""
+        return self._current_context is None
 
-    def _next_group(self):
+    def next_group(self) -> bool:
+        """
+        Moves the group iterator to the next group.
+        Returns False if the iterator has moved beyond the last group, True otherwise.
+ """ + + # Save output of the current group + if self._current_context is not None: + self._previous_output.append(Group(self._current_context.output)) + self._current_context = None + + # Try get to the next input group try: - self._input_group = next(self._input_groups) + input_group = next(self._input_groups) except StopIteration as e: - self._data = None - self._output_group_list = None - self._output_group = None - self._input_group = None - self._iter = None - self._len = None return False - self._len = len(self._input_group) - if self._len == 0: - self._data = None - self._output_group_list = None - self._output_group = None - self._input_group = None - self._iter = None - self._len = None - raise RuntimeError("Empty input groups are not allowd") - self._output_group_list = [] - self._output_group = Group(self._output_group_list) - self._output_groups.append(self._output_group) - self._iter = iter(self._input_group) - self.next() + if len(input_group) == 0: + raise RuntimeError("Empty input groups are not allowed") + + # Create Mock Context for the new input group + self._current_context = StandaloneMockContext(input_group, self._metadata) return True - def _is_positive_integer(self, value): + @property + def output_groups(self): + """ + Output of all groups including the current one. + """ + if self._current_context is None: + return self._previous_output + else: + groups = list(self._previous_output) + groups.append(Group(self._current_context.output)) + return groups + + @check_context + def __getattr__(self, name): + return getattr(self._current_context, name) + + @check_context + def get_dataframe(self, num_rows: Union[str, int], start_col: int = 0) -> Optional[pd.DataFrame]: + return self._current_context.get_dataframe(num_rows, start_col) + + @check_context + def next(self, reset: bool = False) -> bool: + return self._current_context.next(reset) + + @check_context + def size(self) -> int: + return self._current_context.size() + + @check_context + def reset(self) -> None: + self._current_context.reset() + + @check_context + def emit(self, *args) -> None: + self._current_context.emit(*args) + + +def get_scalar_input(inp: Any) -> Iterable[Tuple[Any, ...]]: + """ + Figures out if the SCALAR parameters are provided as a scalar value or a tuple + and also if there is a wrapping container around. + Unless the parameters are already in a wrapping container returns parameters as a tuple wrapped + into a one-item list, e.g [(param1[, param2, ...)]. Otherwise, returns the original input. + + :param inp: Input parameters. + """ + + if isinstance(inp, Iterable) and not isinstance(inp, str): + row1 = next(iter(inp)) + if isinstance(row1, Iterable) and not isinstance(row1, str): + return inp + else: + return [inp] + else: + return [(inp,)] + + +class StandaloneMockContext(UDFContext): + """ + Implementation of generic UDF Mock Context interface a SCALAR UDF or a SET UDF with no groups. + + For Emit UDFs the output in the form of the list of tuples can be + accessed by reading the `output` property. + """ + + def __init__(self, inp: Any, metadata: MockMetaData): + """ + :param inp: Input rows for a SET UDF or parameters for a SCALAR one. + In the former case the input object must be an iterable of rows. This, for example, + can be a Group object. It must implement the __len__ method. Each data row must be + an indexable container, e.g. a tuple. + In the SCALAR case the input can be a scalar value, or tuple. This can also be wrapped + in an iterable container, similar to the SET case. 
+
+        :param metadata: The mock metadata object.
+        """
+        if metadata.input_type.upper() == 'SCALAR':
+            self._input = get_scalar_input(inp)
+        else:
+            self._input = inp
+        self._metadata = metadata
+        self._data: Optional[Any] = None
+        self._iter: Optional[Iterator[Tuple[Any, ...]]] = None
+        self._name_position_map = \
+            {column.name: position
+             for position, column
+             in enumerate(metadata.input_columns)}
+        self._output = []
+        self.next(reset=True)
+
+    @property
+    def output(self) -> List[Tuple[Any, ...]]:
+        """Emitted output so far"""
+        return self._output
+
+    @staticmethod
+    def _is_positive_integer(value):
         return value is not None and isinstance(value, int) and value > 0
 
     def get_dataframe(self, num_rows='all', start_col=0):
@@ -80,26 +221,26 @@ def get_dataframe(self, num_rows='all', start_col=0):
         return df
 
     def __getattr__(self, name):
-        return self._data[self._name_position_map[name]]
+        return None if self._data is None else self._data[self._name_position_map[name]]
 
     def next(self, reset: bool = False):
-        if reset:
+        if self._iter is None or reset:
             self.reset()
         else:
             try:
                 new_data = next(self._iter)
                 self._data = new_data
-                self._validate_tuples(self._data, self._metadata.input_columns)
+                validate_emit(self._data, self._metadata.input_columns)
                 return True
             except StopIteration as e:
                 self._data = None
                 return False
 
     def size(self):
-        return self._len
+        return len(self._input)
 
     def reset(self):
-        self._iter = iter(self._input_group)
+        self._iter = iter(self._input)
         self.next()
 
     def emit(self, *args):
@@ -108,13 +249,5 @@
         else:
             tuples = [args]
         for row in tuples:
-            self._validate_tuples(row, self._metadata.output_columns)
-        self._output_group_list.extend(tuples)
-        return
-
-    def _validate_tuples(self, row: Tuple, columns: List[Column]):
-        if len(row) != len(columns):
-            raise Exception(f"row {row} has not the same number of values as columns are defined")
-        for i, column in enumerate(columns):
-            if row[i] is not None and not isinstance(row[i], column.type):
-                raise TypeError(f"Value {row[i]} ({type(row[i])}) at position {i} is not a {column.type}")
+            validate_emit(row, self._metadata.output_columns)
+        self._output.extend(tuples)
diff --git a/exasol_udf_mock_python/udf_mock_executor.py b/exasol_udf_mock_python/udf_mock_executor.py
index 448b00d..a9bdd66 100644
--- a/exasol_udf_mock_python/udf_mock_executor.py
+++ b/exasol_udf_mock_python/udf_mock_executor.py
@@ -7,7 +7,7 @@
 
 
 def _loop_groups(ctx:MockContext, exa:MockExaEnvironment, runfunc:Callable):
-    while ctx._next_group():
+    while ctx.next_group():
         _wrapped_run(ctx, exa, runfunc)
 
 
@@ -77,4 +77,4 @@ def run(self,
         finally:
             if "cleanup" in exec_globals:
                 self._exec_cleanup(exec_globals)
-        return ctx._output_groups
+        return ctx.output_groups
diff --git a/tests/test_mock_context.py b/tests/test_mock_context.py
new file mode 100644
index 0000000..3a50bba
--- /dev/null
+++ b/tests/test_mock_context.py
@@ -0,0 +1,69 @@
+import pytest
+import pandas as pd
+
+from exasol_udf_mock_python.group import Group
+from exasol_udf_mock_python.mock_context import MockContext
+from tests.test_mock_context_standalone import meta_set_emits
+
+
+@pytest.fixture
+def context_set_emits(meta_set_emits):
+    pets = Group([(1, 'cat'), (2, 'dog')])
+    bugs = Group([(3, 'ant'), (4, 'bee'), (5, 'beetle')])
+    groups = [pets, bugs]
+    return MockContext(iter(groups), meta_set_emits)
+
+
+def test_scroll(context_set_emits):
+    groups = []
+    while context_set_emits.next_group():
+        group = [context_set_emits.t2]
+        while context_set_emits.next():
+            group.append(context_set_emits.t2)
+        groups.append(group)
+    assert groups == [['cat', 'dog'], ['ant', 'bee', 'beetle']]
+
+
+def test_output_groups(context_set_emits):
+    context_set_emits.next_group()
+    context_set_emits.emit(1, 'cat')
+    context_set_emits.emit(2, 'dog')
+    context_set_emits.next_group()
+    context_set_emits.emit(3, 'ant')
+    context_set_emits.emit(4, 'bee')
+    context_set_emits.emit(5, 'beetle')
+    context_set_emits.next_group()
+    assert len(context_set_emits.output_groups) == 2
+    assert context_set_emits.output_groups[0] == Group([(1, 'cat'), (2, 'dog')])
+    assert context_set_emits.output_groups[1] == Group([(3, 'ant'), (4, 'bee'), (5, 'beetle')])
+
+
+def test_output_groups_partial(context_set_emits):
+    context_set_emits.next_group()
+    context_set_emits.emit(1, 'cat')
+    context_set_emits.emit(2, 'dog')
+    context_set_emits.next_group()
+    context_set_emits.emit(3, 'ant')
+    context_set_emits.emit(4, 'bee')
+    assert len(context_set_emits.output_groups) == 2
+    assert context_set_emits.output_groups[0] == Group([(1, 'cat'), (2, 'dog')])
+    assert context_set_emits.output_groups[1] == Group([(3, 'ant'), (4, 'bee')])
+
+
+def test_no_context_exception(context_set_emits):
+
+    for _ in range(3):
+        context_set_emits.next_group()
+
+    with pytest.raises(RuntimeError):
+        _ = context_set_emits.t2
+    with pytest.raises(RuntimeError):
+        _ = context_set_emits.get_dataframe()
+    with pytest.raises(RuntimeError):
+        context_set_emits.next()
+    with pytest.raises(RuntimeError):
+        _ = context_set_emits.size()
+    with pytest.raises(RuntimeError):
+        context_set_emits.reset()
+    with pytest.raises(RuntimeError):
+        context_set_emits.emit(1, 'cat')
diff --git a/tests/test_mock_context_standalone.py b/tests/test_mock_context_standalone.py
new file mode 100644
index 0000000..9ce794d
--- /dev/null
+++ b/tests/test_mock_context_standalone.py
@@ -0,0 +1,104 @@
+import pytest
+import pandas as pd
+
+from exasol_udf_mock_python.column import Column
+from exasol_udf_mock_python.mock_meta_data import MockMetaData
+from exasol_udf_mock_python.mock_context import StandaloneMockContext, validate_emit
+
+
+def udf_wrapper():
+    pass
+
+
+@pytest.fixture
+def meta_scalar_return():
+    return MockMetaData(
+        script_code_wrapper_function=udf_wrapper,
+        input_type='SCALAR',
+        input_columns=[Column('t', int, 'INTEGER')],
+        output_type='RETURNS',
+        output_columns=[Column('t', int, 'INTEGER')]
+    )
+
+
+@pytest.fixture
+def meta_set_emits():
+    return MockMetaData(
+        script_code_wrapper_function=udf_wrapper,
+        input_type='SET',
+        input_columns=[Column('t1', int, 'INTEGER'), Column('t2', str, 'VARCHAR(100)')],
+        output_type='EMITS',
+        output_columns=[Column('t1', int, 'INTEGER'), Column('t2', str, 'VARCHAR(100)')]
+    )
+
+
+@pytest.fixture
+def context_scalar_return(meta_scalar_return):
+    return StandaloneMockContext((5,), meta_scalar_return)
+
+
+@pytest.fixture
+def context_set_emits(meta_set_emits):
+    return StandaloneMockContext([(5, 'abc'), (6, 'efgh')], meta_set_emits)
+
+
+def test_get_dataframe(context_set_emits):
+    df = context_set_emits.get_dataframe()
+    expected_df = pd.DataFrame({'t1': [5, 6], 't2': ['abc', 'efgh']})
+    pd.testing.assert_frame_equal(df, expected_df)
+
+
+def test_get_dataframe_limited(context_set_emits):
+    df = context_set_emits.get_dataframe(1, 1)
+    expected_df = pd.DataFrame({'t2': ['abc']})
+    pd.testing.assert_frame_equal(df, expected_df)
+
+
+def test_attr_set(context_set_emits):
+    assert context_set_emits.t1 == 5
+    assert context_set_emits.t2 == 'abc'
+
+
+def test_attr_scalar(context_scalar_return):
+    assert context_scalar_return.t == 5
+
+
+def test_next(context_set_emits):
+    assert context_set_emits.next()
+    assert context_set_emits.t1 == 6
+    assert context_set_emits.t2 == 'efgh'
+
+
+def test_next_end(context_set_emits):
+    context_set_emits.next()
+    assert not context_set_emits.next()
+
+
+def test_reset(context_set_emits):
+    context_set_emits.next()
+    context_set_emits.reset()
+    assert context_set_emits.t1 == 5
+    assert context_set_emits.t2 == 'abc'
+
+
+def test_size(context_set_emits):
+    assert context_set_emits.size() == 2
+
+
+def test_validate_emit_good(meta_set_emits):
+    validate_emit((10, 'fish'), meta_set_emits.output_columns)
+
+
+def test_validate_emit_bad(meta_set_emits):
+    with pytest.raises(Exception):
+        validate_emit((10,), meta_set_emits.output_columns)
+    with pytest.raises(Exception):
+        validate_emit((10, 'fish', 4.5), meta_set_emits.output_columns)
+    with pytest.raises(Exception):
+        validate_emit((10., 'fish'), meta_set_emits.output_columns)
+
+
+def test_emit_df(context_set_emits):
+    df = pd.DataFrame({'t1': [1, 2], 't2': ['cat', 'dog']})
+    context_set_emits.emit(df)
+    assert context_set_emits.output == [(1, 'cat'), (2, 'dog')]
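
For quick orientation, the following is a minimal sketch of driving the new StandaloneMockContext directly, outside the executor. It reuses the same MockMetaData setup as the meta_set_emits fixture in tests/test_mock_context_standalone.py; the my_udf_run function is a hypothetical stand-in for a UDF run implementation and is not part of this patch.

    from exasol_udf_mock_python.column import Column
    from exasol_udf_mock_python.mock_meta_data import MockMetaData
    from exasol_udf_mock_python.mock_context import StandaloneMockContext

    def udf_wrapper():
        pass

    # Same SET/EMITS metadata as the meta_set_emits fixture above.
    meta = MockMetaData(
        script_code_wrapper_function=udf_wrapper,
        input_type='SET',
        input_columns=[Column('t1', int, 'INTEGER'), Column('t2', str, 'VARCHAR(100)')],
        output_type='EMITS',
        output_columns=[Column('t1', int, 'INTEGER'), Column('t2', str, 'VARCHAR(100)')]
    )

    # Input rows are passed directly; no Group wrapper is needed in the standalone case.
    ctx = StandaloneMockContext([(5, 'abc'), (6, 'efgh')], meta)

    def my_udf_run(ctx):  # hypothetical UDF body, not part of this patch
        while True:
            ctx.emit(ctx.t1, ctx.t2.upper())
            if not ctx.next():
                break

    my_udf_run(ctx)
    print(ctx.output)  # [(5, 'ABC'), (6, 'EFGH')]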