Skip to content

Commit

Permalink
Added range query.
Browse files Browse the repository at this point in the history
  • Loading branch information
AlfredChester committed Dec 14, 2024
1 parent 49a998e commit 6cdca07
Show file tree
Hide file tree
Showing 3 changed files with 204 additions and 0 deletions.
89 changes: 89 additions & 0 deletions cyaron/query.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
"""
This module provides a `RangeQuery` class for generating queries
based on limits of each dimension.
Classes:
RangeQuery: A class for generating random queries.
Usage:
n = randint(1, 10)
q = randint(1, 10)
Q = Query.random(q, [(1, n)])
"""

import random
from enum import IntEnum
from typing import Optional, Union, Tuple, List

from .utils import list_like


class RangeQueryRandomMode(IntEnum):
less = 0 # disallow l = r
allow_equal = 1 # allow l = r


class RangeQuery:
"""A class for generating random queries."""

@staticmethod
def random(
num: int = 1,
position_range: Optional[List[Union[int, Tuple[int, int]]]] = None,
mode: RangeQueryRandomMode = RangeQueryRandomMode.allow_equal,
) -> List[Tuple[List[int], List[int]]]:
"""
Generate `num` random queries with dimension limit.
Args:
num: the number of queries
position_range: a list of limits for each dimension
single number x represents range [1, x]
list [x, y] or tuple (x, y) represents range [x, y]
mode: the mode queries generate, see Enum Class RangeQueryRandomMode
"""
if position_range is None:
position_range = [10]

if not list_like(position_range):
raise TypeError("the 2nd param must be a list or tuple")

result: List[Tuple[List[int], List[int]]] = []
for _ in range(num):
result.append(RangeQuery.get_one_query(position_range, mode))
return result

@staticmethod
def get_one_query(
position_range: Optional[List[Union[int, Tuple[int, int]]]] = None,
mode: RangeQueryRandomMode = RangeQueryRandomMode.allow_equal,
) -> Tuple[List[int], List[int]]:
dimension = len(position_range)
query_l: List[int] = []
query_r: List[int] = []
for i in range(dimension):
cur_range: Tuple[int, int]
if isinstance(position_range[i], int):
cur_range = (1, position_range[i])
elif len(position_range[i]) == 1:
cur_range = (1, position_range[i][0])
else:
cur_range = position_range[i]

if cur_range[0] > cur_range[1]:
raise ValueError("upper-bound should be larger than lower-bound")
if mode == RangeQueryRandomMode.less and cur_range[0] == cur_range[1]:
raise ValueError(
"mode is set to less but upper-bound is equal to lower-bound"
)

l = random.randint(cur_range[0], cur_range[1])
r = random.randint(cur_range[0], cur_range[1])
# Expected complexity is O(log(1 / V))
while mode == RangeQueryRandomMode.less and l == r:
l = random.randint(cur_range[0], cur_range[1])
r = random.randint(cur_range[0], cur_range[1])
if l > r:
l, r = r, l
query_l.append(l)
query_r.append(r)
return (query_l, query_r)
1 change: 1 addition & 0 deletions cyaron/tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
from .compare_test import TestCompare
from .graph_test import TestGraph
from .vector_test import TestVector
from .range_query_test import TestRangeQuery
114 changes: 114 additions & 0 deletions cyaron/tests/range_query_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import unittest
import random
from cyaron.query import *
from cyaron.vector import *


def valid_query(l, r, mode: RangeQueryRandomMode, limits) -> bool:
if len(l) != len(r) or len(l) != len(limits):
return False
dimension = len(l)
for i in range(dimension):
cur_limit = limits[i]
if isinstance(cur_limit, int):
cur_limit = (1, cur_limit)
elif len(limits[i]) == 1:
cur_limit = (1, cur_limit[0])
if l[i] > r[i] or (l[i] == r[i] and mode == RangeQueryRandomMode.less):
print("bound", l[i], r[i])
return False
if not (cur_limit[0] <= l[i] <= r[i] <= cur_limit[1]):
print("limit", cur_limit[0], cur_limit[1], l[i], r[i])
return False
return True


class TestRangeQuery(unittest.TestCase):
def test_allow_equal_v1(self):
dimension = random.randint(1, 10)
limits = Vector.random(dimension, [1000]) # n1, n2 ...
Q = RangeQuery.random(10**5, limits)
self.assertEqual(len(Q), 10**5)
for i in range(10**5):
self.assertTrue(
valid_query(Q[i][0], Q[i][1], RangeQueryRandomMode.allow_equal, limits)
)

def test_allow_equal_v2_throw(self):
dimension = random.randint(1, 10)
limits = Vector.random(dimension, [1000, 1000]) # n1, n2 ...
conflict = False
for i in range(dimension):
conflict = conflict or limits[i][0] > limits[i][1]
throw = False
try:
Q = RangeQuery.random(10**5, limits)
self.assertEqual(len(Q), 10**5)
for i in range(10**5):
self.assertTrue(
valid_query(
Q[i][0], Q[i][1], RangeQueryRandomMode.allow_equal, limits
)
)
except:
throw = True

self.assertEqual(throw, conflict)

def test_allow_equal_v2_no_throw(self):
dimension = random.randint(1, 10)
limits = Vector.random(dimension, [1000, 1000]) # n1, n2 ...
for i in range(dimension):
if limits[i][0] > limits[i][1]:
limits[i][0], limits[i][1] = limits[i][1], limits[i][0]
Q = RangeQuery.random(10**5, limits)
self.assertEqual(len(Q), 10**5)
for i in range(10**5):
self.assertTrue(
valid_query(Q[i][0], Q[i][1], RangeQueryRandomMode.allow_equal, limits)
)

def test_less_v1(self):
dimension = random.randint(1, 10)
limits = Vector.random(dimension, [1000]) # n1, n2 ...
Q = RangeQuery.random(10**5, limits, RangeQueryRandomMode.less)
self.assertEqual(len(Q), 10**5)
for i in range(10**5):
self.assertTrue(
valid_query(Q[i][0], Q[i][1], RangeQueryRandomMode.less, limits)
)

def test_less_v2_throw(self):
dimension = random.randint(1, 10)
limits = Vector.random(dimension, [1000, 1000]) # n1, n2 ...
conflict = False
for i in range(dimension):
conflict = conflict or limits[i][0] >= limits[i][1]
throw = False
try:
Q = RangeQuery.random(10**5, limits, RangeQueryRandomMode.less)
self.assertEqual(len(Q), 10**5)
for i in range(10**5):
self.assertTrue(
valid_query(Q[i][0], Q[i][1], RangeQueryRandomMode.less, limits)
)
except:
throw = True

self.assertEqual(throw, conflict)

def test_less_v2_no_throw(self):
dimension = random.randint(1, 10)
limits = Vector.random(dimension, [1000, 1000]) # n1, n2 ...
for i in range(dimension):
while limits[i][0] == limits[i][1]:
limits[i][0] = random.randint(1, 1000)
limits[i][1] = random.randint(1, 1000)
if limits[i][0] > limits[i][1]:
limits[i][0], limits[i][1] = limits[i][1], limits[i][0]
Q = RangeQuery.random(10**5, limits, RangeQueryRandomMode.less)
self.assertEqual(len(Q), 10**5)
for i in range(10**5):
self.assertTrue(
valid_query(Q[i][0], Q[i][1], RangeQueryRandomMode.less, limits)
)

0 comments on commit 6cdca07

Please sign in to comment.