diff --git a/cyaron/query.py b/cyaron/query.py new file mode 100644 index 0000000..5a93958 --- /dev/null +++ b/cyaron/query.py @@ -0,0 +1,89 @@ +""" +This module provides a `RangeQuery` class for generating queries +based on limits of each dimension. + +Classes: + RangeQuery: A class for generating random queries. + +Usage: + n = randint(1, 10) + q = randint(1, 10) + Q = Query.random(q, [(1, n)]) +""" + +import random +from enum import IntEnum +from typing import Optional, Union, Tuple, List + +from .utils import list_like + + +class RangeQueryRandomMode(IntEnum): + less = 0 # disallow l = r + allow_equal = 1 # allow l = r + + +class RangeQuery: + """A class for generating random queries.""" + + @staticmethod + def random( + num: int = 1, + position_range: Optional[List[Union[int, Tuple[int, int]]]] = None, + mode: RangeQueryRandomMode = RangeQueryRandomMode.allow_equal, + ) -> List[Tuple[List[int], List[int]]]: + """ + Generate `num` random queries with dimension limit. + Args: + num: the number of queries + position_range: a list of limits for each dimension + single number x represents range [1, x] + list [x, y] or tuple (x, y) represents range [x, y] + mode: the mode queries generate, see Enum Class RangeQueryRandomMode + """ + if position_range is None: + position_range = [10] + + if not list_like(position_range): + raise TypeError("the 2nd param must be a list or tuple") + + result: List[Tuple[List[int], List[int]]] = [] + for _ in range(num): + result.append(RangeQuery.get_one_query(position_range, mode)) + return result + + @staticmethod + def get_one_query( + position_range: Optional[List[Union[int, Tuple[int, int]]]] = None, + mode: RangeQueryRandomMode = RangeQueryRandomMode.allow_equal, + ) -> Tuple[List[int], List[int]]: + dimension = len(position_range) + query_l: List[int] = [] + query_r: List[int] = [] + for i in range(dimension): + cur_range: Tuple[int, int] + if isinstance(position_range[i], int): + cur_range = (1, position_range[i]) + elif len(position_range[i]) == 1: + cur_range = (1, position_range[i][0]) + else: + cur_range = position_range[i] + + if cur_range[0] > cur_range[1]: + raise ValueError("upper-bound should be larger than lower-bound") + if mode == RangeQueryRandomMode.less and cur_range[0] == cur_range[1]: + raise ValueError( + "mode is set to less but upper-bound is equal to lower-bound" + ) + + l = random.randint(cur_range[0], cur_range[1]) + r = random.randint(cur_range[0], cur_range[1]) + # Expected complexity is O(log(1 / V)) + while mode == RangeQueryRandomMode.less and l == r: + l = random.randint(cur_range[0], cur_range[1]) + r = random.randint(cur_range[0], cur_range[1]) + if l > r: + l, r = r, l + query_l.append(l) + query_r.append(r) + return (query_l, query_r) diff --git a/cyaron/tests/__init__.py b/cyaron/tests/__init__.py index 328a930..0995841 100644 --- a/cyaron/tests/__init__.py +++ b/cyaron/tests/__init__.py @@ -5,3 +5,4 @@ from .compare_test import TestCompare from .graph_test import TestGraph from .vector_test import TestVector +from .range_query_test import TestRangeQuery diff --git a/cyaron/tests/range_query_test.py b/cyaron/tests/range_query_test.py new file mode 100644 index 0000000..8f91f7d --- /dev/null +++ b/cyaron/tests/range_query_test.py @@ -0,0 +1,114 @@ +import unittest +import random +from cyaron.query import * +from cyaron.vector import * + + +def valid_query(l, r, mode: RangeQueryRandomMode, limits) -> bool: + if len(l) != len(r) or len(l) != len(limits): + return False + dimension = len(l) + for i in range(dimension): + cur_limit = limits[i] + if isinstance(cur_limit, int): + cur_limit = (1, cur_limit) + elif len(limits[i]) == 1: + cur_limit = (1, cur_limit[0]) + if l[i] > r[i] or (l[i] == r[i] and mode == RangeQueryRandomMode.less): + print("bound", l[i], r[i]) + return False + if not (cur_limit[0] <= l[i] <= r[i] <= cur_limit[1]): + print("limit", cur_limit[0], cur_limit[1], l[i], r[i]) + return False + return True + + +class TestRangeQuery(unittest.TestCase): + def test_allow_equal_v1(self): + dimension = random.randint(1, 10) + limits = Vector.random(dimension, [1000]) # n1, n2 ... + Q = RangeQuery.random(10**5, limits) + self.assertEqual(len(Q), 10**5) + for i in range(10**5): + self.assertTrue( + valid_query(Q[i][0], Q[i][1], RangeQueryRandomMode.allow_equal, limits) + ) + + def test_allow_equal_v2_throw(self): + dimension = random.randint(1, 10) + limits = Vector.random(dimension, [1000, 1000]) # n1, n2 ... + conflict = False + for i in range(dimension): + conflict = conflict or limits[i][0] > limits[i][1] + throw = False + try: + Q = RangeQuery.random(10**5, limits) + self.assertEqual(len(Q), 10**5) + for i in range(10**5): + self.assertTrue( + valid_query( + Q[i][0], Q[i][1], RangeQueryRandomMode.allow_equal, limits + ) + ) + except: + throw = True + + self.assertEqual(throw, conflict) + + def test_allow_equal_v2_no_throw(self): + dimension = random.randint(1, 10) + limits = Vector.random(dimension, [1000, 1000]) # n1, n2 ... + for i in range(dimension): + if limits[i][0] > limits[i][1]: + limits[i][0], limits[i][1] = limits[i][1], limits[i][0] + Q = RangeQuery.random(10**5, limits) + self.assertEqual(len(Q), 10**5) + for i in range(10**5): + self.assertTrue( + valid_query(Q[i][0], Q[i][1], RangeQueryRandomMode.allow_equal, limits) + ) + + def test_less_v1(self): + dimension = random.randint(1, 10) + limits = Vector.random(dimension, [1000]) # n1, n2 ... + Q = RangeQuery.random(10**5, limits, RangeQueryRandomMode.less) + self.assertEqual(len(Q), 10**5) + for i in range(10**5): + self.assertTrue( + valid_query(Q[i][0], Q[i][1], RangeQueryRandomMode.less, limits) + ) + + def test_less_v2_throw(self): + dimension = random.randint(1, 10) + limits = Vector.random(dimension, [1000, 1000]) # n1, n2 ... + conflict = False + for i in range(dimension): + conflict = conflict or limits[i][0] >= limits[i][1] + throw = False + try: + Q = RangeQuery.random(10**5, limits, RangeQueryRandomMode.less) + self.assertEqual(len(Q), 10**5) + for i in range(10**5): + self.assertTrue( + valid_query(Q[i][0], Q[i][1], RangeQueryRandomMode.less, limits) + ) + except: + throw = True + + self.assertEqual(throw, conflict) + + def test_less_v2_no_throw(self): + dimension = random.randint(1, 10) + limits = Vector.random(dimension, [1000, 1000]) # n1, n2 ... + for i in range(dimension): + while limits[i][0] == limits[i][1]: + limits[i][0] = random.randint(1, 1000) + limits[i][1] = random.randint(1, 1000) + if limits[i][0] > limits[i][1]: + limits[i][0], limits[i][1] = limits[i][1], limits[i][0] + Q = RangeQuery.random(10**5, limits, RangeQueryRandomMode.less) + self.assertEqual(len(Q), 10**5) + for i in range(10**5): + self.assertTrue( + valid_query(Q[i][0], Q[i][1], RangeQueryRandomMode.less, limits) + )