diff --git a/.gitignore b/.gitignore index 18805a8..ea6daf8 100644 --- a/.gitignore +++ b/.gitignore @@ -73,3 +73,4 @@ MANIFEST Untitled1.ipynb Untitled.ipynb +tests/pysubgroup.code-workspace diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 501d87a..81b185a 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -53,10 +53,10 @@ repos: # - id: blacken-docs # additional_dependencies: [black] -- repo: https://github.com/PyCQA/flake8 - rev: 6.1.0 - hooks: - - id: flake8 +#- repo: https://github.com/PyCQA/flake8 +# rev: 6.1.0 +# hooks: +# - id: flake8 ## You can add flake8 plugins via `additional_dependencies`: # additional_dependencies: [flake8-bugbear] diff --git a/src/pysubgroup/algorithms.py b/src/pysubgroup/algorithms.py index ac81791..d503261 100644 --- a/src/pysubgroup/algorithms.py +++ b/src/pysubgroup/algorithms.py @@ -3,6 +3,7 @@ @author: lemmerfn """ + import copy import warnings from collections import Counter, defaultdict, namedtuple @@ -17,7 +18,7 @@ class SubgroupDiscoveryTask: """ - Capsulates all parameters required to perform standard subgroup discovery + Encapsulates all parameters required to perform standard subgroup discovery. """ def __init__( @@ -31,6 +32,19 @@ def __init__( min_quality=float("-inf"), constraints=None, ): + """ + Initializes a new SubgroupDiscoveryTask. + + Parameters: + data: The dataset to be analyzed. + target: The target concept for subgroup discovery. + search_space: The search space of possible selectors. + qf: The quality function to evaluate subgroups. + result_set_size: The maximum number of subgroups to return. + depth: The maximum depth (length) of the subgroups. + min_quality: The minimal quality threshold for subgroups. + constraints: A list of constraints to be satisfied by subgroups. + """ self.data = data self.target = target self.search_space = search_space @@ -50,6 +64,18 @@ def __init__( def constraints_satisfied(constraints, subgroup, statistics=None, data=None): + """ + Checks if all constraints are satisfied for a given subgroup. + + Parameters: + constraints: A list of constraints to check. + subgroup: The subgroup to be evaluated. + statistics: Precomputed statistics for the subgroup (optional). + data: The dataset to be analyzed (optional). + + Returns: + True if all constraints are satisfied, False otherwise. + """ return all( constr.is_satisfied(subgroup, statistics, data) for constr in constraints ) @@ -64,6 +90,16 @@ def constraints_satisfied(constraints, subgroup, statistics=None, data=None): @njit([(int32[:, :], int64[:])], cache=True) def getNewCandidates(candidates, hashes): # pragma: no cover + """ + Generates new candidate pairs for the next level using Numba for acceleration. + + Parameters: + candidates: A 2D numpy array of candidate selector IDs. + hashes: A 1D numpy array of hash values for the candidates. + + Returns: + A list of tuples, each containing indices of candidate pairs to be combined. + """ result = [] for i in range(len(candidates) - 1): for j in range(i + 1, len(candidates)): @@ -77,9 +113,23 @@ def getNewCandidates(candidates, hashes): # pragma: no cover class Apriori: + """ + Implementation of the Apriori algorithm for subgroup discovery. + + This class provides methods to perform level-wise search for subgroups using the Apriori algorithm. + """ + def __init__( self, representation_type=None, combination_name="Conjunction", use_numba=True ): + """ + Initializes the Apriori algorithm. 
+ + Parameters: + representation_type: The representation type to use for subgroups (default is BitSetRepresentation). + combination_name: The name of the combination method (e.g., "Conjunction" or "Disjunction"). + use_numba: Whether to use Numba for performance optimization. + """ self.combination_name = combination_name if representation_type is None: @@ -99,6 +149,17 @@ def __init__( pass def get_next_level_candidates(self, task, result, next_level_candidates): + """ + Evaluates candidates at the current level and filters promising ones for the next level. + + Parameters: + task: The subgroup discovery task. + result: The current list of discovered subgroups. + next_level_candidates: List of subgroups to be evaluated at the current level. + + Returns: + A list of promising candidates (selectors) for the next level. + """ promising_candidates = [] optimistic_estimate_function = getattr(task.qf, self.optimistic_estimate_name) for sg in next_level_candidates: @@ -129,6 +190,17 @@ def get_next_level_candidates(self, task, result, next_level_candidates): return promising_candidates def get_next_level_candidates_vectorized(self, task, result, next_level_candidates): + """ + Vectorized evaluation of candidates at the current level to filter promising ones for the next level. + + Parameters: + task: The subgroup discovery task. + result: The current list of discovered subgroups. + next_level_candidates: List of subgroups to be evaluated at the current level. + + Returns: + A list of promising candidates (selectors) for the next level. + """ promising_candidates = [] statistics = [] optimistic_estimate_function = getattr(task.qf, self.optimistic_estimate_name) @@ -156,9 +228,19 @@ def get_next_level_candidates_vectorized(self, task, result, next_level_candidat return promising_candidates def get_next_level_numba(self, promising_candidates): # pragma: no cover + """ + Generates the next level of candidates using Numba for acceleration. + + Parameters: + promising_candidates: A list of promising candidate selectors. + + Returns: + A list of new candidate selectors for the next level. + """ if not hasattr(self, "compiled_func") or self.compiled_func is None: self.compiled_func = getNewCandidates + # Map selectors to unique IDs all_selectors = Counter(chain.from_iterable(promising_candidates)) all_selectors_ids = {selector: i for i, selector in enumerate(all_selectors)} promising_candidates_selector_ids = [ @@ -186,6 +268,15 @@ def get_next_level_numba(self, promising_candidates): # pragma: no cover ] def get_next_level(self, promising_candidates): + """ + Generates the next level of candidates based on the current promising candidates. + + Parameters: + promising_candidates: A list of promising candidate selectors. + + Returns: + A list of new candidate selectors for the next level. + """ by_prefix_dict = defaultdict(list) for sg in promising_candidates: by_prefix_dict[tuple(sg[:-1])].append(sg[-1]) @@ -196,6 +287,15 @@ def get_next_level(self, promising_candidates): ] def execute(self, task): + """ + Executes the Apriori algorithm on the given task. + + Parameters: + task: The subgroup discovery task to be executed. + + Returns: + A SubgroupDiscoveryResult containing the discovered subgroups. 
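+
+        Example (illustrative sketch, mirroring the package README; uses the
+        Titanic data bundled with pysubgroup and a binary target):
+            >>> import pysubgroup as ps
+            >>> data = ps.get_titanic_data()
+            >>> target = ps.BinaryTarget('Survived', True)
+            >>> search_space = ps.create_selectors(data, ignore=['Survived'])
+            >>> task = ps.SubgroupDiscoveryTask(
+            ...     data, target, search_space,
+            ...     result_set_size=5, depth=2, qf=ps.WRAccQF())
+            >>> result = ps.Apriori().execute(task)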
+ """ if not isinstance( task.qf, ps.BoundedInterestingnessMeasure ): # pragma: no cover @@ -208,7 +308,7 @@ def execute(self, task): with self.representation_type(task.data, task.search_space) as representation: combine_selectors = getattr(representation.__class__, self.combination_name) result = [] - # init the first level + # Initialize the first level candidates next_level_candidates = [] for sel in task.search_space: sg = combine_selectors([sel]) @@ -217,10 +317,10 @@ def execute(self, task): ): next_level_candidates.append(sg) - # level-wise search + # Level-wise search depth = 1 while next_level_candidates: - # check sgs from the last level + # Evaluate subgroups from the last level if self.use_vectorization: promising_candidates = self.get_next_level_candidates_vectorized( task, result, next_level_candidates @@ -237,10 +337,8 @@ def execute(self, task): next_level_candidates_no_pruning = self.next_level(promising_candidates) - # select those selectors and build a subgroup from them - # for which all subsets of length depth (=candidate length -1) - # are in the set of promising candidates - curr_depth = depth # WARNING: need copy of depth for lazy eval + # Select selectors and build subgroups for which all subsets are in the set of promising candidates + curr_depth = depth # Need copy of depth for lazy evaluation set_promising_candidates = set(tuple(p) for p in promising_candidates) next_level_candidates = ( combine_selectors(selectors) @@ -258,7 +356,20 @@ def execute(self, task): class BestFirstSearch: + """ + Implements the Best-First Search algorithm for subgroup discovery. + """ + def execute(self, task): + """ + Executes the Best-First Search algorithm on the given task. + + Parameters: + task: The subgroup discovery task to be executed. + + Returns: + A SubgroupDiscoveryResult containing the discovered subgroups. + """ result = [] queue = [(float("-inf"), ps.Conjunction([]))] operator = ps.StaticSpecializationOperator(task.search_space) @@ -286,7 +397,7 @@ def execute(self, task): else: optimistic_estimate = np.inf - # compute refinements and fill the queue + # Compute refinements and fill the queue if optimistic_estimate >= ps.minimum_required_quality(result, task): if ps.constraints_satisfied( task.constraints_monotone, @@ -303,16 +414,29 @@ def execute(self, task): class GeneralisingBFS: # pragma: no cover + """ + Implements a Generalizing Best-First Search algorithm for subgroup discovery. + """ + def __init__(self): self.alpha = 1.10 self.discarded = [0, 0, 0, 0, 0, 0, 0] self.refined = [0, 0, 0, 0, 0, 0, 0] def execute(self, task): + """ + Executes the Generalizing Best-First Search algorithm on the given task. + + Parameters: + task: The subgroup discovery task to be executed. + + Returns: + A SubgroupDiscoveryResult containing the discovered subgroups. 
+ """ result = [] queue = [] operator = ps.StaticGeneralizationOperator(task.search_space) - # init the first level + # Initialize the first level for sel in task.search_space: queue.append((float("-inf"), ps.Disjunction([sel]))) task.qf.calculate_constant_statistics(task.data, task.target) @@ -339,20 +463,11 @@ def execute(self, task): optimistic_estimate = task.qf.optimistic_estimate( sg, task.target, task.data, statistics ) - # else: - # ps.add_if_required( - # result, sg, task.qf.evaluate_from_dataset(task.data, sg), task) - # optimistic_estimate = task.qf.optimistic_generalisation_from_dataset( - # task.data, sg) if qf_is_bounded else float("inf") - - # compute refinements and fill the queue + # Compute refinements and fill the queue if len(candidate_description) < task.depth and ( optimistic_estimate / self.alpha ** (len(candidate_description) + 1) ) >= ps.minimum_required_quality(result, task): - # print(qual) - # print(optimistic_estimate) self.refined[len(candidate_description)] += 1 - # print(str(candidate_description)) for new_description in operator.refinements(candidate_description): heappush(queue, (-optimistic_estimate, new_description)) else: @@ -365,20 +480,36 @@ def execute(self, task): class BeamSearch: """ - Implements the BeamSearch algorithm. Its a basic implementation + Implements the Beam Search algorithm for subgroup discovery. """ def __init__(self, beam_width=20, beam_width_adaptive=False): + """ + Initializes the Beam Search algorithm. + + Parameters: + beam_width: The width of the beam (number of candidates to keep at each level). + beam_width_adaptive: Whether to adapt the beam width to the result set size. + """ self.beam_width = beam_width self.beam_width_adaptive = beam_width_adaptive def execute(self, task): - # adapt beam width to the result set size if desired + """ + Executes the Beam Search algorithm on the given task. + + Parameters: + task: The subgroup discovery task to be executed. + + Returns: + A SubgroupDiscoveryResult containing the discovered subgroups. + """ + # Adapt beam width to the result set size if desired beam_width = self.beam_width if self.beam_width_adaptive: beam_width = task.result_set_size - # check if beam size is to small for result set + # Check if beam size is too small for result set if beam_width < task.result_set_size: raise RuntimeError( "Beam width in the beam search algorithm " @@ -387,7 +518,7 @@ def execute(self, task): task.qf.calculate_constant_statistics(task.data, task.target) - # init + # Initialize beam = [ ( 0, @@ -405,7 +536,7 @@ def execute(self, task): continue setattr(last_sg, "visited", True) for sel in task.search_space: - # create a clone + # Create a clone if sel in last_sg.selectors: continue sg = ps.Conjunction(last_sg.selectors + (sel,)) @@ -424,7 +555,7 @@ def execute(self, task): ) depth += 1 - # result = beam[-task.result_set_size:] + # Trim the beam to the result set size while len(beam) > task.result_set_size: heappop(beam) @@ -434,10 +565,29 @@ def execute(self, task): class SimpleSearch: + """ + Implements a simple exhaustive search algorithm for subgroup discovery. + """ + def __init__(self, show_progress=True): + """ + Initializes the Simple Search algorithm. + + Parameters: + show_progress: Whether to display a progress bar during the search. + """ self.show_progress = show_progress def execute(self, task): + """ + Executes the Simple Search algorithm on the given task. + + Parameters: + task: The subgroup discovery task to be executed. 
+ + Returns: + A SubgroupDiscoveryResult containing the discovered subgroups. + """ task.qf.calculate_constant_statistics(task.data, task.target) result = [] all_selectors = chain.from_iterable( @@ -473,7 +623,22 @@ def binomial(x, y): class SimpleDFS: + """ + Implements a simple Depth-First Search algorithm for subgroup discovery. + It is the most elementary (and thus probably slow) algorithm implementation. + """ + def execute(self, task, use_optimistic_estimates=True): + """ + Executes the Simple DFS algorithm on the given task. + + Parameters: + task: The subgroup discovery task to be executed. + use_optimistic_estimates: Whether to use optimistic estimates for pruning. + + Returns: + A SubgroupDiscoveryResult containing the discovered subgroups. + """ task.qf.calculate_constant_statistics(task.data, task.target) result = self.search_internal( task, [], task.search_space, [], use_optimistic_estimates @@ -484,6 +649,19 @@ def execute(self, task, use_optimistic_estimates=True): def search_internal( self, task, prefix, modification_set, result, use_optimistic_estimates ): + """ + Recursively searches for subgroups in a depth-first manner. + + Parameters: + task: The subgroup discovery task. + prefix: The current list of selectors in the subgroup description. + modification_set: The remaining selectors to consider. + result: The current list of discovered subgroups. + use_optimistic_estimates: Whether to use optimistic estimates for pruning. + + Returns: + The updated list of discovered subgroups. + """ sg = ps.Conjunction(copy.copy(prefix)) statistics = task.qf.calculate_statistics(sg, task.target, task.data) @@ -512,18 +690,23 @@ def search_internal( self.search_internal( task, prefix, new_modification_set, result, use_optimistic_estimates ) - # remove the sel again + # Remove the selector again prefix.pop(-1) return result class DFS: """ - Implementation of a depth-first-search - with look-ahead using a provided datastructure. + Implementation of a depth-first search with look-ahead using a provided data structure. """ def __init__(self, apply_representation=None): + """ + Initializes the DFS algorithm. + + Parameters: + apply_representation: The representation type to use for subgroups. + """ self.target_bitset = None if apply_representation is None: apply_representation = ps.BitSetRepresentation @@ -534,6 +717,15 @@ def __init__(self, apply_representation=None): ) def execute(self, task): + """ + Executes the DFS algorithm on the given task. + + Parameters: + task: The subgroup discovery task to be executed. + + Returns: + A SubgroupDiscoveryResult containing the discovered subgroups. + """ self.operator = ps.StaticSpecializationOperator(task.search_space) task.qf.calculate_constant_statistics(task.data, task.target) result = [] @@ -543,6 +735,14 @@ def execute(self, task): return ps.SubgroupDiscoveryResult(result, task) def search_internal(self, task, result, sg): + """ + Recursively searches for subgroups in a depth-first manner. + + Parameters: + task: The subgroup discovery task. + result: The current list of discovered subgroups. + sg: The current subgroup being evaluated. + """ statistics = task.qf.calculate_statistics(sg, task.target, task.data) if not constraints_satisfied( task.constraints_monotone, sg, statistics, task.data @@ -562,6 +762,10 @@ def search_internal(self, task, result, sg): class DFSNumeric: + """ + Implements a specialized DFS algorithm for numeric quality functions. 
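+
+    Example (illustrative sketch; assumes a numeric 'Age' column as in the
+    bundled Titanic data -- the algorithm requires StandardQFNumeric):
+        >>> import pysubgroup as ps
+        >>> data = ps.get_titanic_data()
+        >>> target = ps.NumericTarget('Age')
+        >>> search_space = ps.create_selectors(data, ignore=['Age'])
+        >>> task = ps.SubgroupDiscoveryTask(
+        ...     data, target, search_space,
+        ...     result_set_size=5, depth=2, qf=ps.StandardQFNumeric(1.0))
+        >>> result = ps.DFSNumeric().execute(task)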
+ """ + tpl = namedtuple("size_mean_parameters", ("size_sg", "mean")) def __init__(self): @@ -573,6 +777,15 @@ def __init__(self): self.evaluate = None def execute(self, task): + """ + Executes the DFSNumeric algorithm on the given task. + + Parameters: + task: The subgroup discovery task to be executed. + + Returns: + A SubgroupDiscoveryResult containing the discovered subgroups. + """ if not isinstance(task.qf, ps.StandardQFNumeric): raise RuntimeError( "BSD_numeric so far is only implemented for StandardQFNumeric" @@ -582,15 +795,15 @@ def execute(self, task): task.target.get_attributes()[0], ascending=False ) - # generate target bitset + # Generate target values self.target_values = sorted_data[task.target.get_attributes()[0]].to_numpy() task.qf.calculate_constant_statistics(task.data, task.target) - # generate selector bitsets + # Generate selector bitsets self.bitsets = {} for sel in task.search_space: - # generate bitset + # Generate bitset self.bitsets[sel] = sel.covers(sorted_data) result = self.search_internal( task, [], task.search_space, [], np.ones(len(sorted_data), dtype=bool) @@ -599,6 +812,19 @@ def execute(self, task): return ps.SubgroupDiscoveryResult(result, task) def search_internal(self, task, prefix, modification_set, result, bitset): + """ + Recursively searches for subgroups in a depth-first manner using numeric quality functions. + + Parameters: + task: The subgroup discovery task. + prefix: The current list of selectors in the subgroup description. + modification_set: The remaining selectors to consider. + result: The current list of discovered subgroups. + bitset: The current bitset representing the subgroup. + + Returns: + The updated list of discovered subgroups. + """ self.num_calls += 1 sg_size = bitset.sum() if sg_size == 0: @@ -630,6 +856,6 @@ def search_internal(self, task, prefix, modification_set, result, bitset): self.search_internal( task, prefix, new_modification_set, result, new_bitset ) - # remove the sel again + # Remove the selector again prefix.pop(-1) return result diff --git a/src/pysubgroup/binary_target.py b/src/pysubgroup/binary_target.py index a19e56c..b1c6f4e 100644 --- a/src/pysubgroup/binary_target.py +++ b/src/pysubgroup/binary_target.py @@ -20,6 +20,12 @@ @total_ordering class BinaryTarget(BaseTarget): + """Binary target for classic subgroup discovery with boolean targets. + + Stores the target attribute and value, and computes various statistics related to + the target within a subgroup. + """ + statistic_types = ( "size_sg", "size_dataset", @@ -38,14 +44,25 @@ class BinaryTarget(BaseTarget): def __init__(self, target_attribute=None, target_value=None, target_selector=None): """ + Initialize a BinaryTarget instance. + Creates a new target for the boolean model class (classic subgroup discovery). If target_attribute and target_value are given, the target_selector is computed - using attribute and value + using the attribute and value. + + Parameters: + target_attribute (str, optional): The name of the target attribute. + target_value (any, optional): The value of the target attribute. + target_selector (Selector, optional): A predefined target selector. + + Raises: + ValueError: If both target_selector and target_attribute/target_value are provided, + or if none are provided. 
""" if target_attribute is not None and target_value is not None: if target_selector is not None: raise ValueError( - "BinaryTarget is to be constructed" + "BinaryTarget is to be constructed " "EITHER by a selector OR by attribute/value pair" ) target_selector = EqualitySelector(target_attribute, target_value) @@ -54,21 +71,47 @@ def __init__(self, target_attribute=None, target_value=None, target_selector=Non self.target_selector = target_selector def __repr__(self): + """String representation of the BinaryTarget.""" return "T: " + str(self.target_selector) def __eq__(self, other): + """Check equality based on the instance dictionary.""" return self.__dict__ == other.__dict__ def __lt__(self, other): + """Define less-than comparison for sorting purposes.""" return str(self) < str(other) def covers(self, instance): + """Determine whether the target selector covers the given instance. + + Parameters: + instance (pandas DataFrame): The data instance to check. + + Returns: + numpy.ndarray: Boolean array indicating coverage. + """ return self.target_selector.covers(instance) def get_attributes(self): + """Get the attribute names used in the target. + + Returns: + tuple: A tuple containing the attribute name. + """ return (self.target_selector.attribute_name,) def get_base_statistics(self, subgroup, data): + """Compute basic statistics for the target within the subgroup and dataset. + + Parameters: + subgroup: The subgroup for which to compute statistics. + data (pandas DataFrame): The dataset. + + Returns: + tuple: Contains instances_dataset, positives_dataset, + instances_subgroup, positives_subgroup. + """ cover_arr, size_sg = get_cover_array_and_size(subgroup, len(data), data) positives = self.covers(data) instances_subgroup = size_sg @@ -83,6 +126,16 @@ def get_base_statistics(self, subgroup, data): ) def calculate_statistics(self, subgroup, data, cached_statistics=None): + """Calculate various statistics for the subgroup. + + Parameters: + subgroup: The subgroup for which to calculate statistics. + data (pandas DataFrame): The dataset. + cached_statistics (dict, optional): Previously computed statistics. + + Returns: + dict: A dictionary containing various statistical measures. + """ if self.all_statistics_present(cached_statistics): return cached_statistics @@ -123,15 +176,27 @@ def calculate_statistics(self, subgroup, data, cached_statistics=None): class SimplePositivesQF( AbstractInterestingnessMeasure ): # pylint: disable=abstract-method + """Quality function for binary targets based on positive instances.""" + tpl = namedtuple("PositivesQF_parameters", ("size_sg", "positives_count")) def __init__(self): + """Initialize the SimplePositivesQF.""" self.dataset_statistics = None self.positives = None self.has_constant_statistics = False self.required_stat_attrs = ("size_sg", "positives_count") def calculate_constant_statistics(self, data, target): + """Calculate statistics that remain constant for the dataset. + + Parameters: + data (pandas DataFrame): The dataset. + target (BinaryTarget): The target definition. + + Raises: + AssertionError: If the target is not an instance of BinaryTarget. + """ assert isinstance(target, BinaryTarget) self.positives = target.covers(data) self.dataset_statistics = SimplePositivesQF.tpl( @@ -142,6 +207,17 @@ def calculate_constant_statistics(self, data, target): def calculate_statistics( self, subgroup, target, data, statistics=None ): # pylint: disable=unused-argument + """Calculate statistics specific to the subgroup. 
+ + Parameters: + subgroup: The subgroup for which to calculate statistics. + target (BinaryTarget): The target definition. + data (pandas DataFrame): The dataset. + statistics (any, optional): Unused in this implementation. + + Returns: + namedtuple: Contains size_sg and positives_count for the subgroup. + """ cover_arr, size_sg = get_cover_array_and_size( subgroup, len(self.positives), data ) @@ -151,38 +227,87 @@ def calculate_statistics( # <<< GpGrowth >>> def gp_get_stats(self, row_index): + """Get statistics for a single row (used in GP-Growth algorithms). + + Parameters: + row_index (int): The index of the row. + + Returns: + numpy.ndarray: Array containing [1, positives[row_index]]. + """ return np.array([1, self.positives[row_index]], dtype=int) def gp_get_null_vector(self): + """Get a null vector for initialization in GP-Growth algorithms. + + Returns: + numpy.ndarray: Zero-initialized array of size 2. + """ return np.zeros(2) def gp_merge(self, left, right): + """Merge two statistics vectors by summing them. + + Parameters: + left (numpy.ndarray): Left statistics vector. + right (numpy.ndarray): Right statistics vector. + """ left += right def gp_get_params(self, _cover_arr, v): + """Extract parameters from the statistics vector. + + Parameters: + _cover_arr: Unused parameter. + v (numpy.ndarray): Statistics vector. + + Returns: + namedtuple: Contains size_sg and positives_count. + """ return SimplePositivesQF.tpl(v[0], v[1]) def gp_to_str(self, stats): + """Convert statistics to a string representation. + + Parameters: + stats (numpy.ndarray): Statistics vector. + + Returns: + str: String representation of the statistics. + """ return " ".join(map(str, stats)) def gp_size_sg(self, stats): + """Get the size of the subgroup from the statistics. + + Parameters: + stats (numpy.ndarray): Statistics vector. + + Returns: + int: Size of the subgroup. + """ return stats[0] @property def gp_requires_cover_arr(self): + """Indicate whether the GP-Growth algorithm requires a cover array. + + Returns: + bool: False, since cover array is not required. + """ return False # TODO Make ChiSquared useful for real nominal data not just binary # Introduce Enum for direction -# Maybe it is possible to give a optimistic estimate for ChiSquared +# Maybe it is possible to give an optimistic estimate for ChiSquared class ChiSquaredQF(SimplePositivesQF): # pragma: no cover """ - ChiSquaredQF which test for statistical independence - of a subgroup against it's complement - - ... + ChiSquaredQF tests for statistical independence + of a subgroup against its complement. + Calculates the chi-squared statistic or p-value to measure the + significance of the difference between the subgroup and the dataset. """ @staticmethod @@ -197,29 +322,23 @@ def chi_squared_qf( index=0, ): """ - Performs chi2 test of statistical independence - - Test whether a subgroup is statistically independent - from it's complement (see scipy.stats.chi2_contingency). 
- - - Parameters - ---------- - instances_dataset, - positives_dataset, - instances_subgroup, - positives_subgroup : int - counts of subgroup and dataset - min_instances : int, optional - number of required instances, if less -inf is returned for that subgroup - bidirect : bool, optional - If true both directions are considered interesting - else direction_positive decides which direction is interesting - direction_positive: bool, optional - Only used if bidirect=False; specifies whether you are interested - in positive (True) or negative deviations - index : {0, 1}, optional - decides whether the test statistic (0) or the p-value (1) should be used + Perform chi-squared test of statistical independence. + + Tests whether a subgroup is statistically independent + from its complement (see scipy.stats.chi2_contingency). + + Parameters: + instances_dataset (int): Total number of instances in the dataset. + positives_dataset (int): Total number of positive instances in the dataset. + instances_subgroup (int): Number of instances in the subgroup. + positives_subgroup (int): Number of positive instances in the subgroup. + min_instances (int, optional): Minimum required instances; return -inf if less. + bidirect (bool, optional): If True, both directions are considered interesting. + direction_positive (bool, optional): If bidirect is False, specifies the direction. + index (int, optional): Index to decide whether to return statistic (0) or p-value (1). + + Returns: + float: The chi-squared statistic or p-value, depending on the index parameter. """ import scipy.stats # pylint:disable=import-outside-toplevel @@ -258,6 +377,18 @@ def chi_squared_qf_weighted( effective_sample_size=0, min_instances=5, ): + """Perform chi-squared test for weighted data. + + Parameters: + subgroup: The subgroup for which to calculate the statistic. + data (pandas DataFrame): The dataset. + weighting_attribute (str): The attribute used for weighting. + effective_sample_size (int, optional): Effective sample size. + min_instances (int, optional): Minimum required instances. + + Returns: + float: The p-value from the chi-squared test. + """ import scipy.stats # pylint:disable=import-outside-toplevel ( @@ -274,8 +405,6 @@ def chi_squared_qf_weighted( effective_sample_size = derive_effective_sample_size( data[weighting_attribute] ) - # p_subgroup = positivesSubgroup / instancesSubgroup - # p_dataset = positivesDataset / instancesDataset negatives_subgroup = instancesSubgroup - positivesSubgroup negatives_dataset = instancesDataset - positivesDataset @@ -292,30 +421,42 @@ def chi_squared_qf_weighted( def __init__(self, direction="both", min_instances=5, stat="chi2"): """ - Parameters - ---------- - direction : {'both', 'positive', 'negative'} - direction of deviation that is of interest - min_instances : int, optional - number of required instances, if less -inf is returned for that subgroup - stat : {'chi2', 'p'} - whether to report the test statistic - or the p-value (see scipy.stats.chi2_contingency) + Initialize the ChiSquaredQF. + + Parameters: + direction (str, optional): Direction of deviation of interest ('both', 'positive', 'negative'). + min_instances (int, optional): Minimum required instances; return -inf if less. + stat (str, optional): Whether to report the test statistic ('chi2') or the p-value ('p'). 
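+
+        Example (illustrative; 'p' reports the p-value instead of the test
+        statistic):
+            >>> qf = ChiSquaredQF(direction='positive', stat='p')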
""" if direction == "both": self.bidirect = True self.direction_positive = True - if direction == "positive": + elif direction == "positive": self.bidirect = False self.direction_positive = True - if direction == "negative": + elif direction == "negative": self.bidirect = False self.direction_positive = False + else: + raise ValueError( + "Invalid direction; must be 'both', 'positive', or 'negative'" + ) self.min_instances = min_instances self.index = {"chi2": 0, "p": 1}[stat] super().__init__() def evaluate(self, subgroup, target, data, statistics=None): + """Evaluate the quality of the subgroup using the chi-squared test. + + Parameters: + subgroup: The subgroup to evaluate. + target (BinaryTarget): The target definition. + data (pandas DataFrame): The dataset. + statistics (any, optional): Unused in this implementation. + + Returns: + float: The chi-squared statistic or p-value. + """ statistics = self.ensure_statistics(subgroup, target, data, statistics) dataset = self.dataset_statistics return ChiSquaredQF.chi_squared_qf( @@ -332,45 +473,59 @@ def evaluate(self, subgroup, target, data, statistics=None): class StandardQF(SimplePositivesQF, BoundedInterestingnessMeasure): """ - StandardQF which weights the relative size against the difference in averages + StandardQF which weights the relative size against the difference in averages. The StandardQF is a general form of quality function - which for different values of a is order equivalen to + which for different values of 'a' is order equivalent to many popular quality measures. - - Attributes - ---------- - a : float - used as an exponent to scale the relative size to the difference in averages - """ @staticmethod def standard_qf( a, instances_dataset, positives_dataset, instances_subgroup, positives_subgroup ): + """Compute the standard quality function. + + Parameters: + a (float): Exponent to trade-off the relative size with the difference in means. + instances_dataset (int): Total number of instances in the dataset. + positives_dataset (int): Total number of positive instances in the dataset. + instances_subgroup (int): Number of instances in the subgroup. + positives_subgroup (int): Number of positive instances in the subgroup. + + Returns: + float: The computed quality value. + """ if not hasattr(instances_subgroup, "__array_interface__") and ( instances_subgroup == 0 ): return np.nan p_subgroup = np.divide(positives_subgroup, instances_subgroup) - # if instances_subgroup == 0: - # return 0 - # p_subgroup = positives_subgroup / instances_subgroup p_dataset = positives_dataset / instances_dataset return (instances_subgroup / instances_dataset) ** a * (p_subgroup - p_dataset) def __init__(self, a): """ - Parameters - ---------- - a : float - exponent to trade-off the relative size with the difference in means + Initialize the StandardQF. + + Parameters: + a (float): Exponent to trade-off the relative size with the difference in means. """ self.a = a super().__init__() def evaluate(self, subgroup, target, data, statistics=None): + """Evaluate the quality of the subgroup using the standard quality function. + + Parameters: + subgroup: The subgroup to evaluate. + target (BinaryTarget): The target definition. + data (pandas DataFrame): The dataset. + statistics (any, optional): Unused in this implementation. + + Returns: + float: The computed quality value. 
+ """ statistics = self.ensure_statistics(subgroup, target, data, statistics) dataset = self.dataset_statistics return StandardQF.standard_qf( @@ -382,6 +537,17 @@ def evaluate(self, subgroup, target, data, statistics=None): ) def optimistic_estimate(self, subgroup, target, data, statistics=None): + """Compute the optimistic estimate of the quality function. + + Parameters: + subgroup: The subgroup for which to compute the optimistic estimate. + target (BinaryTarget): The target definition. + data (pandas DataFrame): The dataset. + statistics (any, optional): Unused in this implementation. + + Returns: + float: The optimistic estimate of the quality value. + """ statistics = self.ensure_statistics(subgroup, target, data, statistics) dataset = self.dataset_statistics return StandardQF.standard_qf( @@ -393,6 +559,17 @@ def optimistic_estimate(self, subgroup, target, data, statistics=None): ) def optimistic_generalisation(self, subgroup, target, data, statistics=None): + """Compute the optimistic generalization of the quality function. + + Parameters: + subgroup: The subgroup for which to compute the optimistic generalization. + target (BinaryTarget): The target definition. + data (pandas DataFrame): The dataset. + statistics (any, optional): Unused in this implementation. + + Returns: + float: The optimistic generalization of the quality value. + """ statistics = self.ensure_statistics(subgroup, target, data, statistics) dataset = self.dataset_statistics pos_remaining = dataset.positives_count - statistics.positives_count @@ -407,7 +584,7 @@ def optimistic_generalisation(self, subgroup, target, data, statistics=None): class LiftQF(StandardQF): """ - Lift Quality Function + Lift Quality Function. LiftQF is a StandardQF with a=0. Thus it treats the difference in ratios as the quality @@ -415,54 +592,64 @@ class LiftQF(StandardQF): """ def __init__(self): - """ """ - + """Initialize the LiftQF.""" super().__init__(0.0) # TODO add true binomial quality function as in -# https://opus.bibliothek.uni-wuerzburg.de/opus4-wuerzburg/frontdoor/index/index/docId/1786 # noqa: E501 +# https://opus.bibliothek.uni-wuerzburg.de/opus4-wuerzburg/frontdoor/index/index/docId/1786 class SimpleBinomialQF(StandardQF): """ - Simple Binomial Quality Function + Simple Binomial Quality Function. SimpleBinomialQF is a StandardQF with a=0.5. - It is an order equivalent approximation of the full binomial test + It is an order-equivalent approximation of the full binomial test if the subgroup size is much smaller than the size of the entire dataset. """ def __init__(self): - """ """ - + """Initialize the SimpleBinomialQF.""" super().__init__(0.5) class WRAccQF(StandardQF): """ - Weighted Relative Accuracy Quality Function + Weighted Relative Accuracy Quality Function. WRAccQF is a StandardQF with a=1. - It is order equivalent to the difference in the observed + It is order-equivalent to the difference in the observed and expected number of positive instances. """ def __init__(self): - """ """ - + """Initialize the WRAccQF.""" super().__init__(1.0) ##### -# GeneralizationAware Interestingness Measures +# Generalization-Aware Interestingness Measures ##### class GeneralizationAware_StandardQF( GeneralizationAwareQF_stats, BoundedInterestingnessMeasure ): + """Generalization-Aware Standard Quality Function. + + Extends the StandardQF to consider generalizations during subgroup discovery, + providing methods for optimistic estimates and aggregate statistics. 
+ """ + ga_sQF_agg_tuple = namedtuple( "ga_sQF_agg_tuple", ["max_p", "min_delta_negatives", "min_negatives"] ) def __init__(self, a, optimistic_estimate_strategy="default"): + """ + Initialize the GeneralizationAware_StandardQF. + + Parameters: + a (float): Exponent to trade-off the relative size with the difference in means. + optimistic_estimate_strategy (str, optional): Strategy for optimistic estimates. + """ super().__init__(StandardQF(a)) if optimistic_estimate_strategy in ("default", "difference"): self.optimistic_estimate = self.difference_based_optimistic_estimate @@ -480,6 +667,17 @@ def __init__(self, a, optimistic_estimate_strategy="default"): self.a = a def evaluate(self, subgroup, target, data, statistics=None): + """Evaluate the quality of the subgroup considering generalizations. + + Parameters: + subgroup: The subgroup to evaluate. + target (BinaryTarget): The target definition. + data (pandas DataFrame): The dataset. + statistics (any, optional): Unused in this implementation. + + Returns: + float: The computed quality value. + """ statistics = self.ensure_statistics(subgroup, target, data, statistics) sg_stats = statistics.subgroup_stats if sg_stats.size_sg == 0: @@ -492,6 +690,15 @@ def evaluate(self, subgroup, target, data, statistics=None): ) def max_based_aggregate_statistics(self, stats_subgroup, list_of_pairs): + """Aggregate statistics using the maximum-based strategy. + + Parameters: + stats_subgroup: Statistics of the current subgroup. + list_of_pairs: List of (stats, agg_tuple) for all generalizations. + + Returns: + The aggregated statistics. + """ if len(list_of_pairs) == 0: return stats_subgroup max_ratio = -100 @@ -509,8 +716,16 @@ def max_based_aggregate_statistics(self, stats_subgroup, list_of_pairs): return max_stats def max_based_optimistic_estimate(self, subgroup, target, data, statistics=None): - """ - Computes the oe as the hypothetical subgroup containing only positive instances + """Compute the optimistic estimate using the maximum-based strategy. + + Parameters: + subgroup: The subgroup for which to compute the estimate. + target (BinaryTarget): The target definition. + data (pandas DataFrame): The dataset. + statistics (any, optional): Unused in this implementation. + + Returns: + float: The optimistic estimate of the quality value. """ statistics = self.ensure_statistics(subgroup, target, data, statistics) sg_stats = statistics.subgroup_stats @@ -524,26 +739,41 @@ def max_based_optimistic_estimate(self, subgroup, target, data, statistics=None) ) def max_based_read_p(self, agg_tuple): + """Read the p-value from the aggregate tuple using the maximum-based strategy. + + Parameters: + agg_tuple: The aggregate statistics tuple. + + Returns: + float: The ratio of positives in the aggregate statistics. + """ return agg_tuple.positives_count / agg_tuple.size_sg def difference_based_optimistic_estimate(self, subgroup, target, data, statistics): + """Compute the optimistic estimate using the difference-based strategy. + + Parameters: + subgroup: The subgroup for which to compute the estimate. + target (BinaryTarget): The target definition. + data (pandas DataFrame): The dataset. + statistics (any): Current statistics. + + Returns: + float: The optimistic estimate of the quality value. 
+ """ sg_stats, agg_stats = self.ensure_statistics(subgroup, target, data, statistics) if np.isposinf(agg_stats.min_delta_negatives): return np.inf delta_n = agg_stats.min_delta_negatives size_dataset = self.qf.dataset_statistics.size_sg - tau_diff = 0 if self.qf.a == 0: pos = 1 - # return delta_n /(1 + delta_n) elif self.qf.a == 1.0: pos = sg_stats.positives_count - # return pos / size_dataset * delta_n /(pos + delta_n) else: a = self.qf.a p_hat = min(np.ceil(a * delta_n / (1 - a)), sg_stats.positives_count) pos = p_hat - # return (p_hat / size_dataset) ** a * delta_n /(p_hat+delta_n) tau_diff = pos / (pos + delta_n) if sg_stats.size_sg > 0: tau_sg = sg_stats.positives_count / sg_stats.size_sg @@ -553,8 +783,14 @@ def difference_based_optimistic_estimate(self, subgroup, target, data, statistic return (sg_stats.positives_count / size_dataset) ** self.a * (1 - tau_max) def difference_based_agg_function(self, stats_subgroup, list_of_pairs): - """ - list_of_pairs is a list of (stats, agg_tuple) for all the generalizations + """Aggregate statistics using the difference-based strategy. + + Parameters: + stats_subgroup: Statistics of the current subgroup. + list_of_pairs: List of (stats, agg_tuple) for all generalizations. + + Returns: + namedtuple: Aggregate statistics tuple. """ def get_negatives_count(sg_stats): @@ -595,4 +831,12 @@ def get_percentage_positives(sg_stats): ) def difference_based_read_p(self, agg_tuple): + """Read the p-value from the aggregate tuple using the difference-based strategy. + + Parameters: + agg_tuple: The aggregate statistics tuple. + + Returns: + float: The maximum percentage of positives. + """ return agg_tuple.max_p diff --git a/src/pysubgroup/constraints.py b/src/pysubgroup/constraints.py index 32cad5d..956e0bd 100644 --- a/src/pysubgroup/constraints.py +++ b/src/pysubgroup/constraints.py @@ -2,27 +2,72 @@ class MinSupportConstraint: + """ + A constraint that ensures a subgroup has at least a minimum support. + + Attributes: + min_support (int): The minimum number of instances that a subgroup must cover. + """ + def __init__(self, min_support): + """ + Initializes the MinSupportConstraint with the specified minimum support. + + Parameters: + min_support (int): The minimum support required for subgroups. + """ self.min_support = min_support @property def is_monotone(self): + """ + Indicates whether the constraint is monotone. + + Returns: + bool: True if the constraint is monotone, False otherwise. + """ return True def is_satisfied(self, subgroup, statistics=None, data=None): + """ + Checks if the subgroup satisfies the minimum support constraint. + + Parameters: + subgroup: The subgroup to be evaluated. + statistics: Precomputed statistics for the subgroup (optional). + data: The dataset being analyzed (optional). + + Returns: + bool: True if the subgroup's size is at least the minimum support, False otherwise. + """ if hasattr(statistics, "size_sg"): return statistics.size_sg >= self.min_support if isinstance(statistics, dict) and "size_sg" in statistics: return statistics["size_sg"] >= self.min_support try: return ps.get_size(subgroup, len(data), data) >= self.min_support - except AttributeError: # special case for gp_growth + except AttributeError: # Special case for gp_growth algorithm return self.get_size_sg(statistics) def gp_prepare(self, qf): + """ + Prepares the constraint for the GP-Growth algorithm by accessing the size function. + + Parameters: + qf: The quality function used in the GP-Growth algorithm. 
+ """ self.get_size_sg = ( qf.gp_size_sg ) # pylint: disable=attribute-defined-outside-init def gp_is_satisfied(self, node): + """ + Checks if a node satisfies the constraint in the GP-Growth algorithm. + + Parameters: + node: The node to be evaluated. + + Returns: + bool: True if the node's size is at least the minimum support, False otherwise. + """ return self.get_size_sg(node) >= self.min_support diff --git a/src/pysubgroup/datasets.py b/src/pysubgroup/datasets.py index 5d3454a..87bc4e0 100644 --- a/src/pysubgroup/datasets.py +++ b/src/pysubgroup/datasets.py @@ -1,3 +1,8 @@ +""" +This module provides functions to load example datasets for testing and demonstration purposes. +The datasets included are the German Credit Data and the Titanic dataset. +""" + from io import StringIO import pandas as pd @@ -6,14 +11,31 @@ def get_credit_data(): + """Load the German Credit Data dataset. + + The dataset is provided in ARFF format and includes various attributes related to creditworthiness. + + Returns: + pandas.DataFrame: A DataFrame containing the credit data. + """ s_io = StringIO( - str(pkg_resources.resource_string("pysubgroup", "data/credit-g.arff"), "utf-8") + pkg_resources.resource_string("pysubgroup", "data/credit-g.arff").decode( + "utf-8" + ) ) - return pd.DataFrame(arff.loadarff(s_io)[0]) + data = arff.loadarff(s_io)[0] + return pd.DataFrame(data) def get_titanic_data(): + """Load the Titanic dataset. + + The dataset includes information about the passengers on the Titanic, such as age, sex, class, and survival status. + + Returns: + pandas.DataFrame: A DataFrame containing the Titanic data. + """ s_io = StringIO( - str(pkg_resources.resource_string("pysubgroup", "data/titanic.csv"), "utf-8") + pkg_resources.resource_string("pysubgroup", "data/titanic.csv").decode("utf-8") ) - return pd.read_csv(s_io, sep="\t", header=[0]) + return pd.read_csv(s_io, sep="\t", header=0) diff --git a/src/pysubgroup/fi_target.py b/src/pysubgroup/fi_target.py index 5969389..f8059e5 100644 --- a/src/pysubgroup/fi_target.py +++ b/src/pysubgroup/fi_target.py @@ -2,6 +2,9 @@ Created on 29.09.2017 @author: lemmerfn + +This module defines the FITarget and related quality functions for frequent itemset mining +using the pysubgroup package. """ from collections import namedtuple from functools import total_ordering @@ -11,25 +14,54 @@ @total_ordering class FITarget(ps.BaseTarget): + """Target class for frequent itemset mining. + + Represents the target for mining frequent itemsets, + extending the BaseTarget class from pysubgroup. + """ + statistic_types = ("size_sg", "size_dataset") def __repr__(self): + """String representation of the FITarget.""" return "T: Frequent Itemsets" def __eq__(self, other): + """Check equality based on the instance dictionary.""" return self.__dict__ == other.__dict__ def __lt__(self, other): + """Define less-than comparison for sorting purposes.""" return str(self) < str(other) # pragma: no cover def get_attributes(self): + """Return an empty list as attributes are not used in FITarget.""" return [] def get_base_statistics(self, subgroup, data): + """Compute the base statistics for the subgroup. + + Parameters: + subgroup: The subgroup for which to compute statistics. + data: The dataset. + + Returns: + int: The size of the subgroup. + """ _, size = ps.get_cover_array_and_size(subgroup, len(data), data) return size def calculate_statistics(self, subgroup_description, data, cached_statistics=None): + """Calculate statistics for the subgroup. 
+ + Parameters: + subgroup_description: The description of the subgroup. + data: The dataset. + cached_statistics (dict, optional): Previously computed statistics. + + Returns: + dict: A dictionary containing 'size_sg' and 'size_dataset'. + """ if self.all_statistics_present(cached_statistics): return cached_statistics @@ -41,10 +73,16 @@ def calculate_statistics(self, subgroup_description, data, cached_statistics=Non class SimpleCountQF(ps.AbstractInterestingnessMeasure): + """Quality function that counts the number of instances in a subgroup. + + Provides basic counting functionality, useful for frequent itemset mining. + """ + tpl = namedtuple("CountQF_parameters", ("size_sg")) gp_requires_cover_arr = False def __init__(self): + """Initialize the SimpleCountQF.""" self.required_stat_attrs = ("size_sg",) self.has_constant_statistics = True self.size_dataset = None @@ -52,46 +90,147 @@ def __init__(self): def calculate_constant_statistics( self, data, target ): # pylint: disable=unused-argument + """Calculate statistics that remain constant for the dataset. + + Parameters: + data: The dataset. + target: The target definition (unused in this implementation). + """ self.size_dataset = len(data) def calculate_statistics( self, subgroup_description, target, data, statistics=None ): # pylint: disable=unused-argument + """Calculate statistics specific to the subgroup. + + Parameters: + subgroup_description: The description of the subgroup. + target: The target definition (unused in this implementation). + data: The dataset. + statistics (any, optional): Unused in this implementation. + + Returns: + namedtuple: Contains 'size_sg' for the subgroup. + """ _, size = ps.get_cover_array_and_size( subgroup_description, self.size_dataset, data ) return SimpleCountQF.tpl(size) def gp_get_stats(self, _): + """Get statistics for a single instance (used in GP-Growth algorithms). + + Returns: + dict: A dictionary with 'size_sg' set to 1. + """ return {"size_sg": 1} def gp_get_null_vector(self): + """Get a null vector for initialization in GP-Growth algorithms. + + Returns: + dict: A dictionary with 'size_sg' set to 0. + """ return {"size_sg": 0} def gp_merge(self, left, right): + """Merge two statistics dictionaries by summing 'size_sg'. + + Parameters: + left (dict): Left statistics dictionary. + right (dict): Right statistics dictionary. + """ left["size_sg"] += right["size_sg"] def gp_get_params(self, _cover_arr, v): + """Extract parameters from the statistics dictionary. + + Parameters: + _cover_arr: Unused parameter. + v (dict): Statistics dictionary. + + Returns: + namedtuple: Contains 'size_sg' from the statistics. + """ return SimpleCountQF.tpl(v["size_sg"]) def gp_to_str(self, stats): + """Convert statistics to a string representation. + + Parameters: + stats (dict): Statistics dictionary. + + Returns: + str: String representation of 'size_sg'. + """ return str(stats["size_sg"]) def gp_size_sg(self, stats): + """Get the size of the subgroup from the statistics. + + Parameters: + stats (dict): Statistics dictionary. + + Returns: + int: Size of the subgroup. + """ return stats["size_sg"] class CountQF(SimpleCountQF, ps.BoundedInterestingnessMeasure): + """Quality function that evaluates subgroups based on their size. + + Extends SimpleCountQF and BoundedInterestingnessMeasure. + """ + def evaluate(self, subgroup, target, data, statistics=None): + """Evaluate the quality of the subgroup. + + Parameters: + subgroup: The subgroup to evaluate. + target: The target definition. + data: The dataset. 
+ statistics (any, optional): Previously computed statistics. + + Returns: + int: The size of the subgroup. + """ statistics = self.ensure_statistics(subgroup, target, data, statistics) return statistics.size_sg def optimistic_estimate(self, subgroup, target, data, statistics=None): + """Compute the optimistic estimate of the quality function. + + Parameters: + subgroup: The subgroup for which to compute the optimistic estimate. + target: The target definition. + data: The dataset. + statistics (any, optional): Previously computed statistics. + + Returns: + int: The size of the subgroup. + """ statistics = self.ensure_statistics(subgroup, target, data, statistics) return statistics.size_sg class AreaQF(SimpleCountQF): + """Quality function that evaluates subgroups based on their area. + + The area is computed as the size of the subgroup multiplied by the number of selectors it contains (its depth). + """ + def evaluate(self, subgroup, target, data, statistics=None): + """Evaluate the quality of the subgroup. + + Parameters: + subgroup: The subgroup to evaluate. + target: The target definition. + data: The dataset. + statistics (any, optional): Previously computed statistics. + + Returns: + int: The area of the subgroup (size_sg * depth). + """ statistics = self.ensure_statistics(subgroup, target, data, statistics) return statistics.size_sg * subgroup.depth diff --git a/src/pysubgroup/gp_growth.py b/src/pysubgroup/gp_growth.py index 446dce4..08ec154 100644 --- a/src/pysubgroup/gp_growth.py +++ b/src/pysubgroup/gp_growth.py @@ -9,31 +9,81 @@ def identity(x, *args, **kwargs): # pylint: disable=unused-argument + """ + Identity function used as a placeholder for tqdm when progress bars are not needed. + + Parameters: + x: The input value to return. + *args: Variable length argument list. + **kwargs: Arbitrary keyword arguments. + + Returns: + The input value x. + """ return x class GpGrowth: + """ + Implementation of the GP-Growth algorithm. + + GP-Growth is a generalization of FP-Growth and SD-Map capable of working with different + Exceptional Model Mining targets as well as Frequent Itemset Mining and Subgroup Discovery. + + This class provides methods to perform pattern mining using GP-Growth, supporting both + bottom-up ('b_u') and top-down ('t_d') modes. + + Attributes: + GP_node (namedtuple): Structure representing a node in the GP-tree. + minSupp (int): Minimum support threshold (currently unused). + tqdm (function): Function for progress bars (default is identity function). + depth (int): Maximum depth of the search. + mode (str): Mode of the algorithm ('b_u' for bottom-up, 't_d' for top-down). + constraints_monotone (list): List of monotonic constraints. + results (list): List to store the resulting subgroups. + task (SubgroupDiscoveryTask): The subgroup discovery task to execute. + """ + def __init__(self, mode="b_u"): + """ + Initializes the GpGrowth algorithm with the specified mode. + + Parameters: + mode (str): The mode of the algorithm ('b_u' for bottom-up, 't_d' for top-down).
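+
+        Example (illustrative sketch; a frequent-itemset style task using
+        FITarget and CountQF from this package):
+            >>> import pysubgroup as ps
+            >>> data = ps.get_titanic_data()
+            >>> search_space = ps.create_selectors(data)
+            >>> task = ps.SubgroupDiscoveryTask(
+            ...     data, ps.FITarget(), search_space,
+            ...     result_set_size=10, depth=3, qf=ps.CountQF())
+            >>> result = ps.GpGrowth(mode='b_u').execute(task)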
+ """ self.GP_node = namedtuple( "GP_node", ["cls", "id", "parent", "children", "stats"] ) self.minSupp = 10 - self.tqdm = identity + self.tqdm = identity # Placeholder for progress bar function self.depth = 0 - self.mode = mode # specify eihther b_u (bottom up) or t_d (top down) + self.mode = mode # Specify either 'b_u' (bottom-up) or 't_d' (top-down) self.constraints_monotone = [] self.results = [] self.task = [] - # Future: There also is the option of a stable mode + # Future: There is also the option of a stable mode # which never creates the prefix trees def prepare_selectors(self, search_space, data): + """ + Prepares the selectors by computing their coverage arrays and filtering based on constraints. + + Parameters: + search_space (list): The list of selectors to consider. + data (DataFrame): The dataset to be analyzed. + + Returns: + tuple: A tuple containing: + - selectors_sorted (list): The sorted list of selectors after filtering. + - arrs (ndarray): A 2D NumPy array where each column corresponds to the coverage array of a selector. + """ selectors = [] - assert len(search_space) > 0, "Provided searchspace was empty" + assert len(search_space) > 0, "Provided search space was empty" for selector in search_space: cov_arr = selector.covers(data) selectors.append((np.count_nonzero(cov_arr), selector, cov_arr)) + # Filter selectors based on monotonic constraints selectors = [ (size, selector, arr) for size, selector, arr in selectors @@ -42,26 +92,36 @@ def prepare_selectors(self, search_space, data): for constraint in self.constraints_monotone ) ] + # Sort selectors in decreasing order of support (size) sorted_selectors = sorted(selectors, reverse=True) + + # Remove selectors with low optimistic estimate if necessary self.remove_selectors_with_low_optimistic_estimate( sorted_selectors, len(search_space) ) + # Extract the sorted selectors and their coverage arrays selectors_sorted = [selector for size, selector, arr in sorted_selectors] if len(selectors_sorted) == 0: arrs = np.empty((0, 0), dtype=np.bool_) else: arrs = np.vstack([arr for size, selector, arr in sorted_selectors]).T - # print(selectors_sorted) return selectors_sorted, arrs def remove_selectors_with_low_optimistic_estimate(self, s, search_space_size): + """ + Removes selectors from the list that have an optimistic estimate below the minimum required quality. + + Parameters: + s (list): List of selectors with their size and coverage arrays. + search_space_size (int): The size of the initial search space. 
+ """ if not hasattr(self.task.qf, "optimistic_estimate"): return if search_space_size > self.task.result_set_size: - # remove selectors which have to lo of an optimistic estimate - # selectors_map = {selector : i for i,(_, selector, _) in enumerate(s)} + # Remove selectors which have too low of an optimistic estimate stats = [] + # Evaluate each selector and update the result set for _, _, cov_arr in s: statistics = self.task.qf.calculate_statistics( cov_arr, self.task.target, self.task.data @@ -75,7 +135,9 @@ def remove_selectors_with_low_optimistic_estimate(self, s, search_space_size): ) del statistics to_pop = [] + # Determine the minimum required quality based on current results min_quality = ps.minimum_required_quality(self.results, self.task) + # Identify selectors to remove for i, ((_, _, cov_arr), statistics) in enumerate(zip(s, stats)): if ( not self.task.qf.optimistic_estimate( @@ -84,20 +146,37 @@ def remove_selectors_with_low_optimistic_estimate(self, s, search_space_size): > min_quality ): to_pop.append(i) + # Update the minimum quality for the task self.task.min_quality = np.nextafter( float(min_quality), self.task.min_quality ) + # Remove the selectors with low optimistic estimate for i in reversed(to_pop): s.pop(i) self.results.clear() def nodes_to_cls_nodes(self, nodes): + """ + Groups nodes by their class labels. + + Parameters: + nodes (list): List of nodes to group. + + Returns: + defaultdict: A dictionary mapping class labels to lists of nodes. + """ cls_nodes = defaultdict(list) for node in nodes: cls_nodes[node.cls].append(node) return cls_nodes def setup_from_quality_function(self, qf): + """ + Sets up function pointers from the quality function. + + Parameters: + qf: The quality function used in the task. + """ # pylint: disable=attribute-defined-outside-init self.get_stats = qf.gp_get_stats self.get_null_vector = qf.gp_get_null_vector @@ -106,37 +185,71 @@ def setup_from_quality_function(self, qf): # pylint: enable=attribute-defined-outside-init def setup_constraints(self, constraints, qf): + """ + Prepares constraints for use in the algorithm. + + Parameters: + constraints (list): List of constraints to apply. + qf: The quality function used in the task. + """ self.constraints_monotone = constraints for constraint in self.constraints_monotone: constraint.gp_prepare(qf) if self.mode == "t_d" and len(self.constraints_monotone) == 0: warnings.warn( - """Poor runtime expected: Top down method does not use + """Poor runtime expected: Top-down method does not use optimistic estimates and no constraints were provided""", UserWarning, ) if len(constraints) == 1: + # Optimize constraint checking if only one constraint is present self.check_constraints = constraints[0].gp_is_satisfied def check_constraints(self, node): # pylint: disable=method-hidden + """ + Checks if a node satisfies all monotonic constraints. + + Parameters: + node: The node to check. + + Returns: + bool: True if the node satisfies all constraints, False otherwise. + """ return all( constraint.gp_is_satisfied(node) for constraint in self.constraints_monotone ) def setup(self, task): + """ + Prepares the algorithm by setting up the task, depth, constraints, and quality function. + + Parameters: + task (SubgroupDiscoveryTask): The task to execute. 
+ """ self.task = task task.qf.calculate_constant_statistics(task.data, task.target) self.depth = task.depth self.setup_constraints(task.constraints_monotone, task.qf) - self.setup_from_quality_function(task.qf) def create_initial_tree(self, arrs): - # Create tree + """ + Creates the initial FP-tree from the coverage arrays. + + Parameters: + arrs (ndarray): A 2D NumPy array where each column corresponds to the coverage array of a selector. + + Returns: + tuple: A tuple containing: + - root (GP_node): The root node of the tree. + - nodes (list): A list of all nodes in the tree. + """ + # Create root node root = self.GP_node(-1, -1, None, {}, self.get_null_vector()) nodes = [] + # Build the tree by inserting transactions for row_index, row in self.tqdm( enumerate(arrs), "creating tree", total=len(arrs) ): @@ -147,13 +260,22 @@ def create_initial_tree(self, arrs): return root, nodes def execute(self, task): + """ + Executes the GP-Growth algorithm on the given task. + + Parameters: + task (SubgroupDiscoveryTask): The subgroup discovery task to execute. + + Returns: + SubgroupDiscoveryResult: The result of the subgroup discovery. + """ assert self.mode in ("b_u", "t_d"), "mode needs to be either b_u or t_d" self.setup(task) selectors_sorted, arrs = self.prepare_selectors(task.search_space, task.data) root, nodes = self.create_initial_tree(arrs) - # mine tree + # Mine the tree cls_nodes = self.nodes_to_cls_nodes(nodes) if self.mode == "b_u": self.recurse(cls_nodes, tuple()) @@ -164,12 +286,23 @@ def execute(self, task): ps.add_if_required( self.results, sg, quality, self.task, statistics=stats ) + # Convert the results to subgroups self.results = self.convert_results_to_subgroups(self.results, selectors_sorted) self.results = ps.prepare_subgroup_discovery_result(self.results, task) return ps.SubgroupDiscoveryResult(self.results, task) def convert_results_to_subgroups(self, results, selectors_sorted): + """ + Converts patterns (indices) to actual subgroups. + + Parameters: + results (list): List of results containing qualities, indices, and statistics. + selectors_sorted (list): The list of sorted selectors. + + Returns: + list: A list of tuples containing quality, subgroup, and statistics. + """ new_result = [] for quality, indices, stats in results: selectors = [selectors_sorted[i] for i in indices] @@ -178,12 +311,24 @@ def convert_results_to_subgroups(self, results, selectors_sorted): return new_result def calculate_quality_function_for_patterns(self, task, results, arrs): + """ + Calculates the quality function for the given patterns. + + Parameters: + task (SubgroupDiscoveryTask): The task containing the quality function. + results (list): List of patterns with their aggregated parameters. + arrs (ndarray): The coverage arrays of the selectors. + + Returns: + list: A list of tuples containing quality, indices, and statistics. 
+ """ out = [] for indices, gp_params in self.tqdm( results, "computing quality function", ): if self.requires_cover_arr: + # Reconstruct the cover array for the pattern if len(indices) == 1: cover_arr = arrs[:, indices[0]] else: @@ -193,26 +338,48 @@ def calculate_quality_function_for_patterns(self, task, results, arrs): else: statistics = task.qf.gp_get_params(None, gp_params) sg = None - # qual1 = task.qf.evaluate(sg, task.qf.calculate_statistics(sg, task.data)) + # Evaluate the quality of the subgroup qual2 = task.qf.evaluate(sg, task.target, task.data, statistics) out.append((qual2, indices, statistics)) return out def normal_insert(self, root, nodes, new_stats, classes): + """ + Inserts a transaction into the FP-tree. + + Parameters: + root (GP_node): The root node of the tree. + nodes (list): List of all nodes in the tree. + new_stats: The statistics associated with the transaction. + classes (array-like): The class labels (selectors) present in the transaction. + + Returns: + GP_node: The leaf node where the transaction ends. + """ node = root for cls in classes: if cls not in node.children: + # Create a new child node if necessary new_child = self.GP_node( cls, len(nodes), node, {}, self.get_null_vector() ) nodes.append(new_child) node.children[cls] = new_child + # Merge the statistics self.merge(node.stats, new_stats) node = node.children[cls] + # Merge statistics at the leaf node self.merge(node.stats, new_stats) return node def add_if_required(self, prefix, gp_stats): + """ + Adds a pattern to the result set if it meets the quality threshold. + + Parameters: + prefix (tuple): The current pattern (tuple of class indices). + gp_stats: The aggregated statistics for the pattern. + """ statistics = self.task.qf.gp_get_params(None, gp_stats) quality = self.task.qf.evaluate(None, None, None, statistics) ps.add_if_required( @@ -220,14 +387,24 @@ def add_if_required(self, prefix, gp_stats): ) def recurse(self, cls_nodes, prefix, is_single_path=False): + """ + Recursively mines patterns in bottom-up mode. + + Parameters: + cls_nodes (defaultdict): Dictionary mapping class labels to nodes. + prefix (tuple): The current pattern prefix. + is_single_path (bool): Flag indicating if the current path is a single path. 
+ """ if len(cls_nodes) == 0: raise RuntimeError # pragma: no cover + # Add current pattern to results self.add_if_required(prefix, cls_nodes[-1][0].stats) if len(prefix) >= self.depth: return # pragma: no cover stats_dict = self.get_stats_for_class(cls_nodes) if not self.requires_cover_arr: + # Prune using optimistic estimate if possible statistics = self.task.qf.gp_get_params(None, cls_nodes[-1][0].stats) optimistic_estimate = self.task.qf.optimistic_estimate( None, self.task.target, self.task.data, statistics @@ -237,20 +414,19 @@ def recurse(self, cls_nodes, prefix, is_single_path=False): ): return if is_single_path: + # Handle single-path optimization if len(cls_nodes) == 1 and -1 in cls_nodes: return - del stats_dict[-1] # remove root node + del stats_dict[-1] # Remove root node all_combinations = ps.powerset( stats_dict.keys(), max_length=self.depth - len(prefix) + 1 ) for comb in all_combinations: - # it might still be, that stats_dict[comb[-1]] is wrong - # if that is the case then - # stats_dict[comb[0]] is correct if len(comb) > 0: self.add_if_required(prefix + comb, stats_dict[comb[-1]]) else: + # Recursively mine each child node for cls, nodes in cls_nodes.items(): if cls >= 0: if self.check_constraints(stats_dict[cls]): @@ -263,10 +439,17 @@ def recurse(self, cls_nodes, prefix, is_single_path=False): self.recurse(new_tree, (*prefix, cls), is_single_path_now) def recurse_top_down(self, cls_nodes, root, depth_in=0): - # print(f"{depth_in}"+"\t"*depth_in+str(root.cls)) - # print("init root", root.cls) - # print(depth_in) - # self.check_tree_is_ordered(root) + """ + Recursively mines patterns in top-down mode. + + Parameters: + cls_nodes (defaultdict): Dictionary mapping class labels to nodes. + root (GP_node): The current root node. + depth_in (int): The current depth in the recursion. + + Returns: + list: A list of patterns with their aggregated statistics. 
+ """ results = [] curr_depth = depth_in @@ -275,19 +458,15 @@ def recurse_top_down(self, cls_nodes, root, depth_in=0): key: self.check_constraints(gp_stats) for key, gp_stats in stats_dict.items() } - # init_class = root.cls - # direct_child = None init_root = root alpha = [] + # Traverse down single paths while True: if root.cls == -1: pass else: alpha.append(root.cls) if len(root.children) == 1 and curr_depth <= self.depth: - # print(f"Path optmization {len(root.children)}") - # curr_depth += 1 - potential_root = next(iter(root.children.values())) if is_valid_class[potential_root.cls]: root = potential_root @@ -295,14 +474,9 @@ def recurse_top_down(self, cls_nodes, root, depth_in=0): break else: break - # self.get_prefixes_top_down(alpha, max_length=self.depth - depth_in + 1) # - # assert len(alpha) > 0 + # Generate prefixes from alpha prefixes = list(ps.powerset(alpha, max_length=self.depth - depth_in + 1))[1:] - # prefixes = list(map(lambda x: sum(x, tuple()), prefixes)) - # print(root.cls, list(root.children), prefixes) - # print("AAA", list(cls_nodes.keys())) - if init_root.cls == -1: prefixes.append(tuple()) for prefix in prefixes: @@ -314,11 +488,9 @@ def recurse_top_down(self, cls_nodes, root, depth_in=0): assert is_valid_class[cls] results.append((prefix, stats_dict[cls])) - # suffixes = [((), root.stats)] - suffixes = [] if curr_depth == (self.depth - 1): - # print(f"{depth_in}"+"\t"*depth_in+"B") + # Handle leaf nodes for cls, stats in stats_dict.items(): if cls < 0 or cls in alpha: continue @@ -327,40 +499,20 @@ def recurse_top_down(self, cls_nodes, root, depth_in=0): ), f"{cls} {max(alpha)}, {alpha}, {list(stats_dict.keys())}" suffixes.append(((cls,), stats)) else: - # print(f"{depth_in}"+"\t"*depth_in+"A") + # Recursively mine child nodes for cls in stats_dict: if cls < 0 or cls in alpha: continue if is_valid_class[cls]: - # Future: There is also the possibility - # to compute the stats_dict of the prefix tree - # without creating the prefix tree first - # This might be useful if curr_depth == self.depth - 2 - # as we need not recreate the tree new_root, nodes = self.get_top_down_tree_for_class( cls_nodes, cls, is_valid_class ) - # self.check_tree_is_ordered(new_root) - # self.check_tree_is_ordered(init_root) assert len(nodes) > 0 new_cls_nodes = self.nodes_to_cls_nodes(nodes) - # new_dict = self.get_stats_for_class(new_cls_nodes) - # for key, value in new_dict.items(): - # if isinstance(stats_dict[key], dict): - # continue - # assert stats_dict[key][0]>=value[0], \ - # f"{stats_dict[key][0]} {value[0]}" - # assert stats_dict[key][1]>=value[1], \ - # f"{stats_dict[key][1]} {value[1]}" - # print(" " * curr_depth, cls, curr_depth, len(new_cls_nodes)) suffixes.extend( self.recurse_top_down(new_cls_nodes, new_root, curr_depth + 1) ) - # if prefixes == [(12,), (13,)]: - # print(f"{depth_in}"+"\t"*depth_in+ "pre, suf", prefixes) - - # the combination below can be optimized to avoid the if - # by first grouping them by length + # Combine prefixes and suffixes to form new patterns results.extend( [ ((*pre, *suf), gp_stats) @@ -369,13 +521,19 @@ def recurse_top_down(self, cls_nodes, root, depth_in=0): and (len(pre) == 0 or pre[-1] < suf[0]) ] ) - # if prefixes == [(12,), (13,)]: - # print(f"{depth_in}"+"\t"*depth_in+ "results", results) - # print() return results def check_tree_is_ordered(self, root, prefix=None): # pragma: no cover - """Verify that the nodes of a tree are sorted in ascending order""" + """ + Verifies that the nodes of a tree are sorted in ascending order. 
+ + Parameters: + root (GP_node): The root node of the tree. + prefix (list): The current path prefix. + + Returns: + set: A set of class labels in the tree. + """ if prefix is None: prefix = [] s = {root.cls} @@ -386,11 +544,22 @@ def check_tree_is_ordered(self, root, prefix=None): # pragma: no cover return s def get_top_down_tree_for_class(self, cls_nodes, cls, is_valid_class): - # Future: Can eventually also remove infrequent nodes already - # during tree creation + """ + Creates a subtree for a specific class in top-down mode. + + Parameters: + cls_nodes (defaultdict): Dictionary mapping class labels to nodes. + cls (int): The class label to create the subtree for. + is_valid_class (dict): Dictionary indicating valid classes. + + Returns: + tuple: A tuple containing: + - base_root (GP_node): The root of the new subtree. + - nodes (list): A list of nodes in the new subtree. + """ base_root = None nodes = [] - if len(cls_nodes[cls]) > 0 and is_valid_class[cls]: # pragma: no branch okay + if len(cls_nodes[cls]) > 0 and is_valid_class[cls]: # pragma: no branch base_root = self.create_copy_of_tree_top_down( cls_nodes[cls][0], nodes, is_valid_class=is_valid_class ) @@ -401,10 +570,20 @@ def get_top_down_tree_for_class(self, cls_nodes, cls, is_valid_class): def create_copy_of_tree_top_down( self, from_root, nodes=None, parent=None, is_valid_class=None ): + """ + Creates a copy of the tree starting from a specific root in top-down mode. + + Parameters: + from_root (GP_node): The root node to copy from. + nodes (list): List to store the new nodes. + parent (GP_node): The parent of the new root node. + is_valid_class (dict): Dictionary indicating valid classes. + + Returns: + GP_node: The new root node of the copied subtree. + """ if nodes is None: nodes = [] # pragma: no cover - # if len(nodes) == 0: - # root_cls = -1 children = {} new_root = self.GP_node( from_root.cls, len(nodes), parent, children, from_root.stats.copy() @@ -413,7 +592,7 @@ def create_copy_of_tree_top_down( for child_cls, child in from_root.children.items(): if ( is_valid_class is None or child_cls in is_valid_class - ): # pragma: no branch okay + ): # pragma: no branch new_child = self.create_copy_of_tree_top_down( child, nodes, new_root, is_valid_class=is_valid_class ) @@ -421,9 +600,19 @@ def create_copy_of_tree_top_down( return new_root def merge_trees_top_down(self, nodes, mutable_root, from_root, is_valid_class): + """ + Merges two trees in top-down mode. + + Parameters: + nodes (list): List of nodes in the mutable tree. + mutable_root (GP_node): The root of the mutable tree to merge into. + from_root (GP_node): The root of the tree to merge from. + is_valid_class (dict): Dictionary indicating valid classes. + """ self.merge(mutable_root.stats, from_root.stats) for cls in from_root.children: if cls not in mutable_root.children: + # Add new child to mutable root new_child = self.create_copy_of_tree_top_down( from_root.children[cls], nodes, @@ -432,6 +621,7 @@ def merge_trees_top_down(self, nodes, mutable_root, from_root, is_valid_class): ) mutable_root.children[cls] = new_child else: + # Merge existing child nodes self.merge_trees_top_down( nodes, mutable_root.children[cls], @@ -440,6 +630,15 @@ def merge_trees_top_down(self, nodes, mutable_root, from_root, is_valid_class): ) def get_stats_for_class(self, cls_nodes): + """ + Aggregates statistics for each class label. + + Parameters: + cls_nodes (defaultdict): Dictionary mapping class labels to nodes. 
+ + Returns: + dict: A dictionary mapping class labels to aggregated statistics. + """ out = {} for key, nodes in cls_nodes.items(): s = self.get_null_vector() @@ -449,18 +648,34 @@ def get_stats_for_class(self, cls_nodes): return out def create_new_tree_from_nodes(self, nodes): + """ + Creates a new tree from a list of nodes for recursive mining. + + Parameters: + nodes (list): List of nodes to build the new tree from. + + Returns: + defaultdict: A dictionary mapping class labels to nodes in the new tree. + """ new_nodes = {} for node in nodes: nodes_upwards = self.get_nodes_upwards(node) self.create_copy_of_path(nodes_upwards[1:], new_nodes, node.stats) - # self.remove_infrequent_nodes(new_nodes) cls_nodes = defaultdict(list) for new_node in new_nodes.values(): cls_nodes[new_node.cls].append(new_node) return cls_nodes def create_copy_of_path(self, nodes, new_nodes, stats): + """ + Creates a copy of a path in the tree, updating statistics. + + Parameters: + nodes (list): The list of nodes in the path. + new_nodes (dict): Dictionary to store new nodes. + stats: The statistics to merge into the nodes. + """ parent = None for node in reversed(nodes): if node.id not in new_nodes: @@ -474,6 +689,15 @@ def create_copy_of_path(self, nodes, new_nodes, stats): parent = new_node def get_nodes_upwards(self, node): + """ + Retrieves all nodes from a given node up to the root. + + Parameters: + node (GP_node): The starting node. + + Returns: + list: A list of nodes from the given node up to the root. + """ ref = node path = [] while True: @@ -484,6 +708,13 @@ def get_nodes_upwards(self, node): return path def to_file(self, task, path): + """ + Writes the tree to a file in a specific format. + + Parameters: + task (SubgroupDiscoveryTask): The task containing the quality function. + path (str or Path): The file path to write to. + """ self.setup(task) _, arrs = self.prepare_selectors(task.search_space, task.data) diff --git a/src/pysubgroup/measures.py b/src/pysubgroup/measures.py index 872e02a..0ef73f0 100644 --- a/src/pysubgroup/measures.py +++ b/src/pysubgroup/measures.py @@ -170,6 +170,7 @@ def overlaps_list(sg, list_of_sgs, data, similarity_level=0.9): return False +# Wrapper for other measures class CountCallsInterestingMeasure(BoundedInterestingnessMeasure): def __init__(self, qf): self.qf = qf diff --git a/src/pysubgroup/model_target.py b/src/pysubgroup/model_target.py index 2dcf780..9df9e27 100644 --- a/src/pysubgroup/model_target.py +++ b/src/pysubgroup/model_target.py @@ -4,22 +4,42 @@ import pysubgroup as ps +# Define a named tuple to store regression parameters and subgroup size beta_tuple = namedtuple("beta_tuple", ["beta", "size_sg"]) class EMM_Likelihood(ps.AbstractInterestingnessMeasure): + """Exceptional Model Mining likelihood-based interestingness measure. + + This class computes the difference in likelihoods between a subgroup model + and the inverse (complement) model, providing a measure of how exceptional + the subgroup is with respect to the entire dataset. + """ + + # Define a named tuple to store model parameters and likelihoods tpl = namedtuple( "EMM_Likelihood", ["model_params", "subgroup_likelihood", "inverse_likelihood", "size"], ) def __init__(self, model): + """Initialize the EMM_Likelihood measure with a given model. + + Parameters: + model: An instance of a model class that provides fit and likelihood methods. 
+ """ self.model = model self.has_constant_statistics = False self.required_stat_attrs = EMM_Likelihood.tpl._fields self.data_size = None def calculate_constant_statistics(self, data, target): + """Calculate statistics that remain constant over all subgroups. + + Parameters: + data: The dataset as a pandas DataFrame. + target: The target variable (unused in this context). + """ self.model.calculate_constant_statistics(data, target) self.data_size = len(data) self.has_constant_statistics = True @@ -27,47 +47,121 @@ def calculate_constant_statistics(self, data, target): def calculate_statistics( self, subgroup, target, data, statistics=None ): # pylint: disable=unused-argument + """Calculate statistics specific to a subgroup. + + Parameters: + subgroup: The subgroup description. + target: The target variable (unused in this context). + data: The dataset as a pandas DataFrame. + statistics: Previously calculated statistics (optional). + + Returns: + An EMM_Likelihood.tpl namedtuple containing model parameters, + subgroup likelihood, inverse likelihood, and subgroup size. + """ cover_arr, sg_size = ps.get_cover_array_and_size(subgroup, self.data_size, data) params = self.model.fit(cover_arr, data) return self.get_tuple(sg_size, params, cover_arr) def get_tuple(self, sg_size, params, cover_arr): - # numeric stability? + """Compute the likelihoods for the subgroup and its complement. + + Parameters: + sg_size: Size of the subgroup. + params: Model parameters obtained from fitting the subgroup. + cover_arr: Boolean array indicating the instances in the subgroup. + + Returns: + An EMM_Likelihood.tpl namedtuple with the computed statistics. + """ + # Compute likelihoods for all data instances all_likelihood = self.model.likelihood( params, np.ones(self.data_size, dtype=bool) ) + # Sum of likelihoods for subgroup instances sg_likelihood_sum = np.sum(all_likelihood[cover_arr]) + # Sum of likelihoods for all instances total_likelihood_sum = np.sum(all_likelihood) + # Compute average likelihood for the complement (inverse) subgroup dataset_average = np.nan if (self.data_size - sg_size) > 0: dataset_average = (total_likelihood_sum - sg_likelihood_sum) / ( self.data_size - sg_size ) + # Compute average likelihood for the subgroup sg_average = np.nan if sg_size > 0: sg_average = sg_likelihood_sum / sg_size return EMM_Likelihood.tpl(params, sg_average, dataset_average, sg_size) def evaluate(self, subgroup, target, data, statistics=None): + """Evaluate the interestingness of a subgroup. + + Parameters: + subgroup: The subgroup description. + target: The target variable (unused in this context). + data: The dataset as a pandas DataFrame. + statistics: Previously calculated statistics (optional). + + Returns: + The difference between subgroup likelihood and inverse likelihood. + """ statistics = self.ensure_statistics(subgroup, target, data, statistics) - # numeric stability? return statistics.subgroup_likelihood - statistics.inverse_likelihood def gp_get_params(self, cover_arr, v): + """Get parameters for GP-Growth algorithm. + + Parameters: + cover_arr: Boolean array indicating the instances in the subgroup. + v: Statistics vector from GP-Growth. + + Returns: + An EMM_Likelihood.tpl namedtuple with the computed statistics. + """ params = self.model.gp_get_params(v) sg_size = params.size_sg return self.get_tuple(sg_size, params, cover_arr) @property def gp_requires_cover_arr(self): + """Indicate whether the GP-Growth algorithm requires a cover array. + + Returns: + True, since the cover array is required. 
+ """ return True def __getattr__(self, name): + """Delegate attribute access to the underlying model. + + Parameters: + name: Name of the attribute. + + Returns: + The attribute from the model if it exists. + """ return getattr(self.model, name) class PolyRegression_ModelClass: + """Polynomial Regression Model Class for Exceptional Model Mining. + + Provides methods to fit a polynomial regression model to a subgroup and + compute likelihoods for Exceptional Model Mining. + """ + def __init__(self, x_name="x", y_name="y", degree=1): + """Initialize the Polynomial Regression Model. + + Parameters: + x_name (str): Name of the independent variable in the data. + y_name (str): Name of the dependent variable in the data. + degree (int): Degree of the polynomial (currently only degree=1 is supported). + + Raises: + ValueError: If degree is not equal to 1. + """ self.x_name = x_name self.y_name = y_name if degree != 1: @@ -81,12 +175,24 @@ def __init__(self, x_name="x", y_name="y", degree=1): def calculate_constant_statistics( self, data, target ): # pylint: disable=unused-argument + """Calculate statistics that remain constant over all subgroups. + + Parameters: + data: The dataset as a pandas DataFrame. + target: The target variable (unused in this context). + """ self.x = data[self.x_name].to_numpy() self.y = data[self.y_name].to_numpy() self.has_constant_statistics = True @staticmethod def gp_merge(u, v): + """Merge two statistics vectors for the GP-Growth algorithm. + + Parameters: + u (numpy.ndarray): Left statistics vector. + v (numpy.ndarray): Right statistics vector. + """ v0 = v[0] u0 = u[0] if v0 == 0 or u0 == 0: @@ -97,47 +203,122 @@ def gp_merge(u, v): u[3] += d def gp_get_null_vector(self): + """Get a null vector for initialization in the GP-Growth algorithm. + + Returns: + numpy.ndarray: Zero-initialized array of size 5. + """ return np.zeros(5) def gp_get_stats(self, row_index): + """Get statistics for a single row (used in GP-Growth algorithm). + + Parameters: + row_index (int): Index of the row in the dataset. + + Returns: + numpy.ndarray: Statistics vector for the given row. + """ x = self.x[row_index] return np.array([1, x, self.y[row_index], 0, x * x]) def gp_get_params(self, v): + """Extract model parameters from the statistics vector. + + Parameters: + v (numpy.ndarray): Statistics vector. + + Returns: + beta_tuple: Contains regression coefficients and subgroup size. + """ size = v[0] if size < self.degree: return beta_tuple(np.full(self.degree + 1, np.nan), size) v1 = v[1] + # Compute slope and intercept for linear regression slope = v[0] * v[3] / (v[0] * v[4] - v1 * v1) - intersept = v[2] / v[0] - slope * v[1] / v[0] - return beta_tuple(np.array([slope, intersept]), v[0]) + intercept = v[2] / v[0] - slope * v[1] / v[0] + return beta_tuple(np.array([slope, intercept]), v[0]) def gp_to_str(self, stats): + """Convert statistics to a string representation. + + Parameters: + stats (numpy.ndarray): Statistics vector. + + Returns: + str: String representation of the statistics. + """ return " ".join(map(str, stats)) def gp_size_sg(self, stats): + """Get the size of the subgroup from the statistics. + + Parameters: + stats (numpy.ndarray): Statistics vector. + + Returns: + float: Size of the subgroup. + """ return stats[0] @property def gp_requires_cover_arr(self): + """Indicate whether the GP-Growth algorithm requires a cover array. + + Returns: + False, since the cover array is not required. 
+ """ return False def fit(self, subgroup, data=None): + """Fit the polynomial regression model to the subgroup data. + + Parameters: + subgroup: The subgroup description. + data: The dataset as a pandas DataFrame (optional). + + Returns: + beta_tuple: Contains regression coefficients and subgroup size. + """ cover_arr, size = ps.get_cover_array_and_size(subgroup, len(self.x), data) if size <= self.degree + 1: return beta_tuple(np.full(self.degree + 1, np.nan), size) + # Fit polynomial regression model to subgroup data return beta_tuple( np.polyfit(self.x[cover_arr], self.y[cover_arr], deg=self.degree), size ) def likelihood(self, stats, sg): + """Compute the likelihoods for the subgroup instances. + + Parameters: + stats (beta_tuple): Regression parameters and subgroup size. + sg (numpy.ndarray): Boolean array indicating subgroup instances. + + Returns: + numpy.ndarray: Likelihood values for the subgroup instances. + """ from scipy.stats import norm # pylint: disable=import-outside-toplevel if any(np.isnan(stats.beta)): return np.full(self.x[sg].shape, np.nan) - return norm.pdf(np.polyval(stats.beta, self.x[sg]) - self.y[sg]) + # Compute the residuals and evaluate the normal probability density function + residuals = np.polyval(stats.beta, self.x[sg]) - self.y[sg] + return norm.pdf(residuals) def loglikelihood(self, stats, sg): + """Compute the log-likelihoods for the subgroup instances. + + Parameters: + stats (beta_tuple): Regression parameters and subgroup size. + sg (numpy.ndarray): Boolean array indicating subgroup instances. + + Returns: + numpy.ndarray: Log-likelihood values for the subgroup instances. + """ from scipy.stats import norm # pylint: disable=import-outside-toplevel - return norm.logpdf(np.polyval(stats.beta, self.x[sg]) - self.y[sg]) + # Compute the residuals and evaluate the normal log-probability density function + residuals = np.polyval(stats.beta, self.x[sg]) - self.y[sg] + return norm.logpdf(residuals) diff --git a/src/pysubgroup/numeric_target.py b/src/pysubgroup/numeric_target.py index b1ce680..c128d40 100644 --- a/src/pysubgroup/numeric_target.py +++ b/src/pysubgroup/numeric_target.py @@ -1,8 +1,8 @@ """ -Created on 29.09.2017 - -@author: lemmerfn +This module defines the NumericTarget and associated quality functions for subgroup discovery +when the target variable is numeric. """ + import numbers from collections import namedtuple from functools import total_ordering @@ -14,6 +14,12 @@ @total_ordering class NumericTarget: + """Target class for numeric variables in subgroup discovery. + + Represents a target where the variable of interest is numeric, + and computes statistics such as mean, median, standard deviation within subgroups. + """ + statistic_types = ( "size_sg", "size_dataset", @@ -32,21 +38,43 @@ class NumericTarget: ) def __init__(self, target_variable): + """Initialize the NumericTarget with the specified target variable. + + Parameters: + target_variable (str): The name of the numeric target variable. + """ self.target_variable = target_variable def __repr__(self): + """String representation of the NumericTarget.""" return "T: " + str(self.target_variable) def __eq__(self, other): + """Check equality based on the instance dictionary.""" return self.__dict__ == other.__dict__ # pragma: no cover def __lt__(self, other): + """Define less-than comparison for sorting purposes.""" return str(self) < str(other) # pragma: no cover def get_attributes(self): + """Get a list of attribute names used by the target. 
+ + Returns: + list: A list containing the target variable name. + """ return [self.target_variable] def get_base_statistics(self, subgroup, data): + """Compute basic statistics for the subgroup and dataset. + + Parameters: + subgroup: The subgroup for which to compute statistics. + data (pandas.DataFrame): The dataset. + + Returns: + tuple: (instances_dataset, mean_dataset, instances_subgroup, mean_sg) + """ cover_arr, size_sg = ps.get_cover_array_and_size(subgroup, len(data), data) all_target_values = data[self.target_variable] sg_target_values = all_target_values[cover_arr] @@ -57,6 +85,16 @@ def get_base_statistics(self, subgroup, data): return (instances_dataset, mean_dataset, instances_subgroup, mean_sg) def calculate_statistics(self, subgroup, data, cached_statistics=None): + """Calculate various statistics for the subgroup and dataset. + + Parameters: + subgroup: The subgroup for which to calculate statistics. + data (pandas.DataFrame): The dataset. + cached_statistics (dict, optional): Previously computed statistics. + + Returns: + dict: A dictionary containing statistical measures. + """ if cached_statistics is None or not isinstance(cached_statistics, dict): statistics = {} elif all(k in cached_statistics for k in NumericTarget.statistic_types): @@ -88,14 +126,38 @@ def calculate_statistics(self, subgroup, data, cached_statistics=None): def read_median(tpl): + """Extract the median value from a namedtuple. + + Parameters: + tpl (namedtuple): A namedtuple containing a 'median' field. + + Returns: + float: The median value. + """ return tpl.median def read_mean(tpl): + """Extract the mean value from a namedtuple. + + Parameters: + tpl (namedtuple): A namedtuple containing a 'mean' field. + + Returns: + float: The mean value. + """ return tpl.mean def calc_sorted_median(arr): + """Calculate the median of a sorted array. + + Parameters: + arr (numpy.ndarray): A sorted array. + + Returns: + float: The median value. + """ half = (len(arr) - 1) // 2 if len(arr) % 2 == 0: return (arr[half] + arr[half + 1]) / 2 @@ -104,6 +166,19 @@ def calc_sorted_median(arr): class StandardQFNumeric(ps.BoundedInterestingnessMeasure): + """Standard Quality Function for numeric targets. + + This quality function computes interestingness of subgroups based on + the difference between subgroup mean (or median) and dataset mean (or median), + weighted by the size of the subgroup raised to the power of 'a'. + + Attributes: + a (float): Exponent to trade off between subgroup size and difference in means. + invert (bool): Whether to invert the quality function (not used currently). + estimator (str): Strategy for optimistic estimation ('sum', 'max', 'order'). + centroid (str): Central tendency measure ('mean', 'median', 'sorted_median'). + """ + tpl = namedtuple("StandardQFNumeric_parameters", ("size_sg", "mean", "estimate")) mean_tpl = tpl median_tpl = namedtuple( @@ -112,9 +187,34 @@ class StandardQFNumeric(ps.BoundedInterestingnessMeasure): @staticmethod def standard_qf_numeric(a, _, mean_dataset, instances_subgroup, mean_sg): + """Compute the standard quality function for numeric targets. + + Parameters: + a (float): Exponent for weighting the subgroup size. + _ : Unused parameter (size of dataset). + mean_dataset (float): Mean of the target variable in the dataset. + instances_subgroup (int): Number of instances in the subgroup. + mean_sg (float): Mean of the target variable in the subgroup. + + Returns: + float: The computed quality value. 
+ """ return instances_subgroup**a * (mean_sg - mean_dataset) def __init__(self, a, invert=False, estimator="default", centroid="mean"): + """Initialize the StandardQFNumeric. + + Parameters: + a (float): Exponent for weighting the subgroup size. + invert (bool): Whether to invert the quality function (not used currently). + estimator (str): Strategy for optimistic estimation ('sum', 'max', 'order'). + centroid (str): Central tendency measure to use ('mean', 'median', 'sorted_median'). + + Raises: + ValueError: If 'a' is not a number. + ValueError: If 'centroid' is not one of 'mean', 'median', 'sorted_median'. + ValueError: If 'estimator' is invalid. + """ if not isinstance(a, numbers.Number): raise ValueError(f"a is not a number. Received a={a}") self.a = a @@ -178,6 +278,12 @@ def __init__(self, a, invert=False, estimator="default", centroid="mean"): ) def calculate_constant_statistics(self, data, target): + """Calculate statistics that remain constant for the dataset. + + Parameters: + data (pandas.DataFrame): The dataset. + target (NumericTarget): The target definition. + """ data = self.estimator.get_data(data, target) self.all_target_values = data[target.target_variable].to_numpy() target_centroid = self.agg(self.all_target_values) @@ -187,6 +293,17 @@ def calculate_constant_statistics(self, data, target): self.has_constant_statistics = True def evaluate(self, subgroup, target, data, statistics=None): + """Evaluate the quality of the subgroup using the standard quality function. + + Parameters: + subgroup: The subgroup to evaluate. + target (NumericTarget): The target definition. + data (pandas.DataFrame): The dataset. + statistics (any, optional): Previously computed statistics. + + Returns: + float: The computed quality value. + """ statistics = self.ensure_statistics(subgroup, target, data, statistics) dataset = self.dataset_statistics return StandardQFNumeric.standard_qf_numeric( @@ -200,6 +317,17 @@ def evaluate(self, subgroup, target, data, statistics=None): def calculate_statistics( self, subgroup, target, data, statistics=None ): # pylint: disable=unused-argument + """Calculate statistics specific to the subgroup. + + Parameters: + subgroup: The subgroup for which to calculate statistics. + target (NumericTarget): The target definition. + data (pandas.DataFrame): The dataset. + statistics (any, optional): Unused in this implementation. + + Returns: + namedtuple: Contains size_sg, mean or median, and estimate. + """ cover_arr, sg_size = ps.get_cover_array_and_size( subgroup, len(self.all_target_values), data ) @@ -216,42 +344,85 @@ def calculate_statistics( return self.tpl(sg_size, sg_centroid, estimate) def optimistic_estimate(self, subgroup, target, data, statistics=None): + """Compute the optimistic estimate of the quality function. + + Parameters: + subgroup: The subgroup for which to compute the optimistic estimate. + target (NumericTarget): The target definition. + data (pandas.DataFrame): The dataset. + statistics (any, optional): Previously computed statistics. + + Returns: + float: The optimistic estimate of the quality value. + """ statistics = self.ensure_statistics(subgroup, target, data, statistics) return statistics.estimate class Summation_Estimator: - r"""\ - This estimator calculates the optimistic estimate as a hyppothetical subgroup\ - which contains only instances with value greater than the dataset mean and\ - is of maximal size. + r"""Estimator for optimistic estimate using summation strategy. 
+
+    This estimator calculates the optimistic estimate as a hypothetical subgroup
+    which contains only instances with value greater than the dataset mean and
+    is of maximal size.
+
+    From Florian Lemmerich's Dissertation [section 4.2.2.1, Theorem 2 (page 81)]:
+    .. math:: oe(sg) = \sum_{x \in sg,\, T(x) > \mu_0} (T(x) - \mu_0)
-
-    From Florian Lemmerich's Dissertation [section 4.2.2.1, Theorem 2 (page 81)]
    """

    def __init__(self, qf):
+        """Initialize the Summation_Estimator.
+
+        Parameters:
+            qf (StandardQFNumeric): Reference to the quality function instance.
+        """
        self.qf = qf
        self.indices_greater_centroid = None
        self.target_values_greater_centroid = None

    def get_data(self, data, target):  # pylint: disable=unused-argument
+        """Prepare data for estimation (no changes for this estimator).
+
+        Parameters:
+            data (pandas.DataFrame): The dataset.
+            target (NumericTarget): The target definition.
+
+        Returns:
+            pandas.DataFrame: The unmodified dataset.
+        """
        return data

    def calculate_constant_statistics(
        self, data, target
    ):  # pylint: disable=unused-argument
+        """Calculate constant statistics needed for estimation.
+
+        Parameters:
+            data (pandas.DataFrame): The dataset.
+            target (NumericTarget): The target definition.
+        """
        self.indices_greater_centroid = (
            self.qf.all_target_values
            > self.qf.read_centroid(self.qf.dataset_statistics)
        )
-        self.target_values_greater_centroid = (
-            self.qf.all_target_values
-        )  # [self.indices_greater_mean]
+        self.target_values_greater_centroid = self.qf.all_target_values

    def get_estimate(
        self, subgroup, sg_size, sg_centroid, cover_arr, _
    ):  # pylint: disable=unused-argument
+        """Compute the optimistic estimate for the subgroup.
+
+        Parameters:
+            subgroup: The subgroup description.
+            sg_size (int): Size of the subgroup.
+            sg_centroid (float): Mean or median of the subgroup.
+            cover_arr (numpy.ndarray): Boolean array indicating subgroup instances.
+            _ : Unused parameter.
+
+        Returns:
+            float: The optimistic estimate.
+        """
        larger_than_centroid = self.target_values_greater_centroid[cover_arr][
            self.indices_greater_centroid[cover_arr]
        ]
@@ -263,24 +434,48 @@ def get_estimate(
        )


class Max_Estimator:
-    r"""
-    This estimator calculates the optimistic estimate
+    r"""Estimator for optimistic estimate using maximum value strategy.
+
+    This estimator calculates the optimistic estimate based on the maximum value
+    greater than the dataset centroid.
+
+    From Florian Lemmerich's Dissertation [section 4.2.2.1, Theorem 4 (page 82)]:
+    .. math:: oe(sg) = n_{>\mu_0}^a (T^{\max}(sg) - \mu_0)
-
-    From Florian Lemmerich's Dissertation [section 4.2.2.1, Theorem 4 (page 82)]
    """

    def __init__(self, qf):
+        """Initialize the Max_Estimator.
+
+        Parameters:
+            qf (StandardQFNumeric): Reference to the quality function instance.
+        """
        self.qf = qf
        self.indices_greater_centroid = None
        self.target_values_greater_centroid = None

    def get_data(self, data, target):  # pylint: disable=unused-argument
+        """Prepare data for estimation (no changes for this estimator).
+
+        Parameters:
+            data (pandas.DataFrame): The dataset.
+            target (NumericTarget): The target definition.
+
+        Returns:
+            pandas.DataFrame: The unmodified dataset.
+        """
        return data

    def calculate_constant_statistics(
        self, data, target
    ):  # pylint: disable=unused-argument
+        """Calculate constant statistics needed for estimation.
+
+        Parameters:
+            data (pandas.DataFrame): The dataset.
+            target (NumericTarget): The target definition.
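Editor's note: for a = 1 the summation bound reduces to the sum of positive deviations from the dataset centroid among covered instances, matching the code above:

import numpy as np

target = np.array([1.0, 1.0, 2.0, 6.0, 8.0])
mu0 = target.mean()                             # 3.6
cover_arr = np.array([True, False, True, True, True])

covered = target[cover_arr]
positive_part = covered[covered > mu0] - mu0    # only instances above the centroid help
oe = positive_part.sum()                        # (6 - 3.6) + (8 - 3.6) = 6.8
print(oe)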
+ """ self.indices_greater_centroid = ( self.qf.all_target_values > self.qf.read_centroid(self.qf.dataset_statistics) @@ -290,6 +485,18 @@ def calculate_constant_statistics( def get_estimate( self, subgroup, sg_size, sg_centroid, cover_arr, _ ): # pylint: disable=unused-argument + """Compute the optimistic estimate for the subgroup. + + Parameters: + subgroup: The subgroup description. + sg_size (int): Size of the subgroup. + sg_centroid (float): Mean or median of the subgroup. + cover_arr (numpy.ndarray): Boolean array indicating subgroup instances. + _ : Unused parameter. + + Returns: + float: The optimistic estimate. + """ larger_than_centroid = self.target_values_greater_centroid[cover_arr][ self.indices_greater_centroid[cover_arr] ] @@ -303,7 +510,18 @@ def get_estimate( ) class MeanOrdering_Estimator: + """Estimator for optimistic estimate using mean ordering strategy. + + This estimator sorts the target values and computes the optimal subgroup by + considering prefixes of the sorted list. + """ + def __init__(self, qf): + """Initialize the MeanOrdering_Estimator. + + Parameters: + qf (StandardQFNumeric): Reference to the quality function instance. + """ self.qf = qf self.indices_greater_centroid = None self._get_estimate = self.get_estimate_numpy @@ -311,18 +529,33 @@ def __init__(self, qf): self.numba_in_place = False def get_data(self, data, target): + """Prepare data by sorting according to the target variable. + + Parameters: + data (pandas.DataFrame): The dataset. + target (NumericTarget): The target definition. + + Returns: + pandas.DataFrame: The sorted dataset. + """ data.sort_values(target.get_attributes()[0], ascending=False, inplace=True) return data def calculate_constant_statistics( self, data, target ): # pylint: disable=unused-argument + """Set up the estimation function, possibly using Numba for speed. + + Parameters: + data (pandas.DataFrame): The dataset. + target (NumericTarget): The target definition. + """ if not self.use_numba or self.numba_in_place: return try: from numba import njit # pylint: disable=import-outside-toplevel - # print('StandardQf_Numeric: Using numba for speedup') + # Use Numba for speedup except ImportError: # pragma: no cover return @@ -346,6 +579,18 @@ def estimate_numba(values_sg, a, mean_dataset): # pragma: no cover def get_estimate( self, subgroup, sg_size, sg_mean, cover_arr, target_values_sg ): # pylint: disable=unused-argument + """Compute the optimistic estimate for the subgroup. + + Parameters: + subgroup: The subgroup description. + sg_size (int): Size of the subgroup. + sg_mean (float): Mean of the subgroup. + cover_arr (numpy.ndarray): Boolean array indicating subgroup instances. + target_values_sg (numpy.ndarray): Target values in the subgroup. + + Returns: + float: The optimistic estimate. + """ if self.numba_in_place: return self._get_estimate( target_values_sg, self.qf.a, self.qf.dataset_statistics.mean @@ -356,6 +601,16 @@ def get_estimate( ) def get_estimate_numpy(self, values_sg, _, mean_dataset): + """Compute the optimistic estimate using NumPy. + + Parameters: + values_sg (numpy.ndarray): Sorted target values in the subgroup. + _ : Unused parameter. + mean_dataset (float): Mean of the dataset. + + Returns: + float: The optimistic estimate. 
+ """ target_values_cs = np.cumsum(values_sg) sizes = np.arange(1, len(target_values_cs) + 1) mean_values = target_values_cs / sizes @@ -366,6 +621,12 @@ def get_estimate_numpy(self, values_sg, _, mean_dataset): class StandardQFNumericMedian(ps.BoundedInterestingnessMeasure): + """Quality function for numeric targets using median (deprecated). + + Note: + This class is no longer supported. Use StandardQFNumeric with centroid='median' instead. + """ + tpl = namedtuple( "StandardQFNumericMedian_parameters", ( @@ -378,6 +639,7 @@ class StandardQFNumericMedian(ps.BoundedInterestingnessMeasure): def __init__( self, ): + """Initialize the StandardQFNumericMedian (raises NotImplementedError).""" raise NotImplementedError( "StandardQFNumericMedian is no longer supported use " "StandardQFNumeric(centroid='median' instead)" @@ -385,18 +647,36 @@ def __init__( class StandardQFNumericTscore(ps.BoundedInterestingnessMeasure): + """Quality function for numeric targets using T-score.""" + tpl = namedtuple( "StandardQFNumericTscore_parameters", ("size_sg", "mean", "std", "estimate") ) @staticmethod def t_score(mean_dataset, instances_subgroup, mean_sg, std_sg): + """Compute the T-score for the subgroup. + + Parameters: + mean_dataset (float): Mean of the dataset. + instances_subgroup (int): Number of instances in the subgroup. + mean_sg (float): Mean of the subgroup. + std_sg (float): Standard deviation of the subgroup. + + Returns: + float: The computed T-score. + """ if std_sg == 0: return 0 else: return (instances_subgroup**0.5 * (mean_sg - mean_dataset)) / std_sg def __init__(self, invert=False): + """Initialize the StandardQFNumericTscore. + + Parameters: + invert (bool): Whether to invert the quality function (not used currently). + """ self.invert = invert self.required_stat_attrs = ("size_sg", "mean", "std") self.dataset_statistics = None @@ -404,6 +684,12 @@ def __init__(self, invert=False): self.has_constant_statistics = False def calculate_constant_statistics(self, data, target): + """Calculate statistics that remain constant for the dataset. + + Parameters: + data (pandas.DataFrame): The dataset. + target (NumericTarget): The target definition. + """ self.all_target_values = data[target.target_variable].to_numpy() target_mean = np.mean(self.all_target_values) target_std = np.std(self.all_target_values) @@ -414,6 +700,17 @@ def calculate_constant_statistics(self, data, target): self.has_constant_statistics = True def evaluate(self, subgroup, target, data, statistics=None): + """Evaluate the quality of the subgroup using the T-score. + + Parameters: + subgroup: The subgroup to evaluate. + target (NumericTarget): The target definition. + data (pandas.DataFrame): The dataset. + statistics (any, optional): Previously computed statistics. + + Returns: + float: The computed T-score. + """ statistics = self.ensure_statistics(subgroup, target, data, statistics) dataset = self.dataset_statistics return StandardQFNumericTscore.t_score( @@ -426,6 +723,17 @@ def evaluate(self, subgroup, target, data, statistics=None): def calculate_statistics( self, subgroup, target, data, statistics=None ): # pylint: disable=unused-argument + """Calculate statistics specific to the subgroup. + + Parameters: + subgroup: The subgroup for which to calculate statistics. + target (NumericTarget): The target definition. + data (pandas.DataFrame): The dataset. + statistics (any, optional): Unused in this implementation. + + Returns: + namedtuple: Contains size_sg, mean, std, and estimate. 
+ """ cover_arr, sg_size = ps.get_cover_array_and_size( subgroup, len(self.all_target_values), data ) @@ -442,17 +750,53 @@ def calculate_statistics( return StandardQFNumericTscore.tpl(sg_size, sg_mean, sg_std, estimate) def optimistic_estimate(self, subgroup, target, data, statistics=None): + """Compute the optimistic estimate of the quality function. + + Parameters: + subgroup: The subgroup for which to compute the optimistic estimate. + target: The target definition. + data: The dataset. + statistics (any, optional): Previously computed statistics. + + Returns: + float: The optimistic estimate of the quality value. + """ statistics = self.ensure_statistics(subgroup, target, data, statistics) return statistics.estimate class GeneralizationAware_StandardQFNumeric(ps.GeneralizationAwareQF_stats): + """Generalization-Aware Standard Quality Function for Numeric Targets. + + Extends StandardQFNumeric to consider generalizations during subgroup discovery, + providing methods for optimistic estimates and aggregate statistics. + """ + def __init__(self, a, invert=False, estimator="default", centroid="mean"): + """Initialize the GeneralizationAware_StandardQFNumeric. + + Parameters: + a (float): Exponent for weighting the subgroup size. + invert (bool): Whether to invert the quality function (not used currently). + estimator (str): Strategy for optimistic estimation. + centroid (str): Central tendency measure ('mean', 'median', 'sorted_median'). + """ super().__init__( StandardQFNumeric(a, invert=invert, estimator=estimator, centroid=centroid) ) def evaluate(self, subgroup, target, data, statistics=None): + """Evaluate the quality of the subgroup considering generalizations. + + Parameters: + subgroup: The subgroup to evaluate. + target (NumericTarget): The target definition. + data (pandas.DataFrame): The dataset. + statistics (any, optional): Previously computed statistics. + + Returns: + float: The computed quality value. + """ statistics = self.ensure_statistics(subgroup, target, data, statistics) sg_stats = statistics.subgroup_stats general_stats = statistics.generalisation_stats @@ -464,6 +808,15 @@ def evaluate(self, subgroup, target, data, statistics=None): ) def aggregate_statistics(self, stats_subgroup, list_of_pairs): + """Aggregate statistics from generalizations. + + Parameters: + stats_subgroup: Statistics of the current subgroup. + list_of_pairs: List of (stats, agg_stats) tuples from generalizations. + + Returns: + The aggregated statistics. + """ read_centroid = self.qf.read_centroid if len(list_of_pairs) == 0: return stats_subgroup diff --git a/src/pysubgroup/refinement_operator.py b/src/pysubgroup/refinement_operator.py index 6627e42..a9a6c54 100644 --- a/src/pysubgroup/refinement_operator.py +++ b/src/pysubgroup/refinement_operator.py @@ -3,39 +3,86 @@ class RefinementOperator: + """Base class for refinement operators.""" + pass class StaticSpecializationOperator: + """Refinement operator for static specialization. + + This operator specializes subgroups by adding selectors in a predefined order, + ensuring that each attribute is used only once in a subgroup description. + """ + def __init__(self, selectors): + """Initialize the StaticSpecializationOperator. + + Parameters: + selectors: List of selectors to define the search space. 
+ """ search_space_dict = defaultdict(list) for selector in selectors: + # Group selectors by their attribute name search_space_dict[selector.attribute_name].append(selector) self.search_space = list(search_space_dict.values()) + # Map attribute names to their index in the search space self.search_space_index = { key: i for i, key in enumerate(search_space_dict.keys()) } def refinements(self, subgroup): + """Generate refinements of the given subgroup. + + Parameters: + subgroup: The subgroup to refine. + + Returns: + A generator of refined subgroups. + """ if subgroup.depth > 0: + # Get the index of the attribute of the last selector in the subgroup index_of_last = self.search_space_index[ subgroup._selectors[-1].attribute_name ] + # Generate selectors for attributes that come after the last one used new_selectors = chain.from_iterable(self.search_space[index_of_last + 1 :]) else: + # If subgroup is empty, use all selectors new_selectors = chain.from_iterable(self.search_space) return (subgroup & sel for sel in new_selectors) class StaticGeneralizationOperator: + """Refinement operator for static generalization. + + This operator generalizes subgroups by adding selectors from a predefined list, + ensuring that each selector is used in a specific order. + """ + def __init__(self, selectors): + """Initialize the StaticGeneralizationOperator. + + Parameters: + selectors: List of selectors to define the search space. + """ self.search_space = selectors def refinements(self, sG): + """Generate refinements of the given subgroup. + + Parameters: + sG: The subgroup to refine. + + Returns: + A generator of refined subgroups. + """ + # Find the index of the last selector used in the subgroup index_of_last_selector = min( self.search_space.index(sG._selectors[-1]), len(self.search_space) - 1 ) + # Select the selectors that come after the last used selector new_selectors = self.search_space[index_of_last_selector + 1 :] return (sG | sel for sel in new_selectors) diff --git a/src/pysubgroup/representations.py b/src/pysubgroup/representations.py index 876e7c4..a9e819e 100644 --- a/src/pysubgroup/representations.py +++ b/src/pysubgroup/representations.py @@ -4,41 +4,79 @@ class RepresentationBase: + """Base class for different representation strategies. + + Provides methods to patch selectors and manage class-level patches. + Can be used as a context manager to ensure patches are applied and removed properly. + """ + def __init__(self, new_conjunction, selectors_to_patch): + """Initialize the RepresentationBase. + + Parameters: + new_conjunction: The new Conjunction class to use. + selectors_to_patch: List of selectors to patch. + """ self._new_conjunction = new_conjunction self.previous_conjunction = None self.selectors_to_patch = selectors_to_patch def patch_all_selectors(self): + """Patch all selectors in the selectors_to_patch list.""" for sel in self.selectors_to_patch: self.patch_selector(sel) def patch_selector(self, sel): # pragma: no cover + """Patch a single selector. + + This method should be implemented by subclasses. + """ raise NotImplementedError() # pragma: no cover def patch_classes(self): + """Patch the required classes. + + Can be overridden by subclasses to patch class-level attributes or methods. + """ pass def undo_patch_classes(self): + """Undo patches applied to classes. + + Can be overridden by subclasses to remove class-level patches. 
+ """ pass def __enter__(self): + """Enter the runtime context and apply patches.""" self.patch_classes() self.patch_all_selectors() return self def __exit__(self, *args): + """Exit the runtime context and undo patches.""" self.undo_patch_classes() class BitSet_Conjunction(Conjunction): + """Conjunction subclass that uses bitsets for representation. + + Provides efficient computation of the conjunction using numpy boolean arrays. + """ + n_instances = 0 def __init__(self, *args, **kwargs): + """Initialize the BitSet_Conjunction and compute its representation.""" super().__init__(*args, **kwargs) self.representation = self.compute_representation() def compute_representation(self): + """Compute the bitset representation of the conjunction. + + Returns: + Numpy boolean array representing the instances covered by the conjunction. + """ # empty description ==> return a list of all '1's if not self._selectors: return np.full(BitSet_Conjunction.n_instances, True, dtype=bool) @@ -47,9 +85,15 @@ def compute_representation(self): @property def size_sg(self): + """Size of the subgroup represented by the conjunction.""" return np.count_nonzero(self.representation) def append_and(self, to_append): + """Append a selector using logical AND and update the representation. + + Parameters: + to_append: Selector to append. + """ super().append_and(to_append) self.representation = np.logical_and( self.representation, to_append.representation @@ -57,16 +101,28 @@ def append_and(self, to_append): @property def __array_interface__(self): + """Provide the array interface of the representation for compatibility.""" return self.representation.__array_interface__ class BitSet_Disjunction(Disjunction): + """Disjunction subclass that uses bitsets for representation. + + Provides efficient computation of the disjunction using numpy boolean arrays. + """ + def __init__(self, *args, **kwargs): + """Initialize the BitSet_Disjunction and compute its representation.""" super().__init__(*args, **kwargs) self.representation = self.compute_representation() def compute_representation(self): - # empty description ==> return a list of all '1's + """Compute the bitset representation of the disjunction. + + Returns: + Numpy boolean array representing the instances covered by the disjunction. + """ + # empty description ==> return a list of all '0's if not self._selectors: return np.full(BitSet_Conjunction.n_instances, False, dtype=bool) # non-empty description @@ -74,9 +130,15 @@ def compute_representation(self): @property def size_sg(self): + """Size of the subgroup represented by the disjunction.""" return np.count_nonzero(self.representation) def append_or(self, to_append): + """Append a selector using logical OR and update the representation. + + Parameters: + to_append: Selector to append. + """ super().append_or(to_append) self.representation = np.logical_or( self.representation, to_append.representation @@ -84,36 +146,59 @@ def append_or(self, to_append): @property def __array_interface__(self): + """Provide the array interface of the representation for compatibility.""" return self.representation.__array_interface__ class BitSetRepresentation(RepresentationBase): + """Representation class that uses bitsets for selectors and conjunctions.""" + Conjunction = BitSet_Conjunction Disjunction = BitSet_Disjunction def __init__(self, df, selectors_to_patch): + """Initialize the BitSetRepresentation. + + Parameters: + df: pandas DataFrame containing the data. + selectors_to_patch: List of selectors to patch. 
+ """ self.df = df super().__init__(BitSet_Conjunction, selectors_to_patch) def patch_selector(self, sel): + """Patch a selector by computing its bitset representation. + + Parameters: + sel: Selector to patch. + """ sel.representation = sel.covers(self.df) sel.size_sg = np.count_nonzero(sel.representation) def patch_classes(self): + """Patch class-level attributes before entering the context.""" BitSet_Conjunction.n_instances = len(self.df) super().patch_classes() class Set_Conjunction(Conjunction): + """Conjunction subclass that uses sets for representation.""" + all_set = set() def __init__(self, *args, **kwargs): + """Initialize the Set_Conjunction and compute its representation.""" super().__init__(*args, **kwargs) self.representation = self.compute_representation() self.arr_for_interface = np.array(list(self.representation), dtype=int) def compute_representation(self): - # empty description ==> return a list of all '1's + """Compute the set representation of the conjunction. + + Returns: + Set of indices representing the instances covered by the conjunction. + """ + # empty description ==> return the set of all indices if not self._selectors: return Set_Conjunction.all_set # non-empty description @@ -121,43 +206,72 @@ def compute_representation(self): @property def size_sg(self): + """Size of the subgroup represented by the conjunction.""" return len(self.representation) def append_and(self, to_append): + """Append a selector using logical AND and update the representation. + + Parameters: + to_append: Selector to append. + """ super().append_and(to_append) self.representation = self.representation.intersection(to_append.representation) self.arr_for_interface = np.array(list(self.representation), dtype=int) @property def __array_interface__(self): + """Provide the array interface of the representation for compatibility.""" return self.arr_for_interface.__array_interface__ # pylint: disable=no-member class SetRepresentation(RepresentationBase): + """Representation class that uses sets for selectors and conjunctions.""" + Conjunction = Set_Conjunction def __init__(self, df, selectors_to_patch): + """Initialize the SetRepresentation. + + Parameters: + df: pandas DataFrame containing the data. + selectors_to_patch: List of selectors to patch. + """ self.df = df super().__init__(Set_Conjunction, selectors_to_patch) def patch_selector(self, sel): + """Patch a selector by computing its set representation. + + Parameters: + sel: Selector to patch. + """ sel.representation = set(*np.nonzero(sel.covers(self.df))) sel.size_sg = len(sel.representation) def patch_classes(self): + """Patch class-level attributes before entering the context.""" Set_Conjunction.all_set = set(self.df.index) super().patch_classes() class NumpySet_Conjunction(Conjunction): + """Conjunction subclass that uses numpy arrays for set representation.""" + all_set = None def __init__(self, *args, **kwargs): + """Initialize the NumpySet_Conjunction and compute its representation.""" super().__init__(*args, **kwargs) self.representation = self.compute_representation() def compute_representation(self): - # empty description ==> return a list of all '1's + """Compute the numpy array representation of the conjunction. + + Returns: + Numpy array of indices representing the instances covered by the conjunction. 
+ """ + # empty description ==> return an array of all indices if not self._selectors: return NumpySet_Conjunction.all_set start = self._selectors[0].representation @@ -167,31 +281,51 @@ def compute_representation(self): @property def size_sg(self): + """Size of the subgroup represented by the conjunction.""" return len(self.representation) def append_and(self, to_append): + """Append a selector using logical AND and update the representation. + + Parameters: + to_append: Selector to append. + """ super().append_and(to_append) - # self._selectors.append(to_append) self.representation = np.intersect1d( self.representation, to_append.representation, True ) @property def __array_interface__(self): + """Provide the array interface of the representation for compatibility.""" return self.representation.__array_interface__ class NumpySetRepresentation(RepresentationBase): + """Representation class that uses numpy arrays for selectors and conjunctions.""" + Conjunction = NumpySet_Conjunction def __init__(self, df, selectors_to_patch): + """Initialize the NumpySetRepresentation. + + Parameters: + df: pandas DataFrame containing the data. + selectors_to_patch: List of selectors to patch. + """ self.df = df super().__init__(NumpySet_Conjunction, selectors_to_patch) def patch_selector(self, sel): + """Patch a selector by computing its numpy array representation. + + Parameters: + sel: Selector to patch. + """ sel.representation = np.nonzero(sel.covers(self.df))[0] sel.size_sg = len(sel.representation) def patch_classes(self): + """Patch class-level attributes before entering the context.""" NumpySet_Conjunction.all_set = np.arange(len(self.df)) super().patch_classes() diff --git a/src/pysubgroup/subgroup_description.py b/src/pysubgroup/subgroup_description.py index c5e61b3..fcace2d 100644 --- a/src/pysubgroup/subgroup_description.py +++ b/src/pysubgroup/subgroup_description.py @@ -16,12 +16,16 @@ @total_ordering class SelectorBase(ABC): + """Base class for selectors, ensuring each selector instance is unique.""" + # selector cache __refs__ = weakref.WeakSet() def __new__(cls, *args, **kwargs): - """Ensures that each selector only exists once.""" + """Create a new SelectorBase instance, ensuring uniqueness. + Ensures that each selector only exists once by caching instances. + """ # create temporary selector tmp = super().__new__(cls) tmp.set_descriptions(*args, **kwargs) @@ -45,33 +49,49 @@ def __new__(cls, *args, **kwargs): return tmp # pragma: no cover def __getnewargs_ex__(self): # pylint: disable=invalid-getnewargs-ex-returned + """Return arguments necessary to recreate the object during unpickling.""" tmp_args = self.__new_args__ del self.__new_args__ return tmp_args def __init__(self): + """Initialize the SelectorBase and add it to the cache.""" # add selector to cache # TODO: why not do this in `__new__`, # then it would be all together in one function? 
SelectorBase.__refs__.add(self) def __eq__(self, other): + """Check equality based on the string representation.""" if other is None: # pragma: no cover return False return repr(self) == repr(other) def __lt__(self, other): + """Define less-than comparison based on the string representation.""" return repr(self) < repr(other) def __hash__(self): + """Return the hash value.""" return self._hash # pylint: disable=no-member @abstractmethod def set_descriptions(self, *args, **kwargs): + """Set the descriptions for the selector.""" pass # pragma: no cover def get_cover_array_and_size(subgroup, data_len=None, data=None): + """Compute the cover array and its size for a given subgroup. + + Parameters: + subgroup: The subgroup for which to compute the cover array and size. + data_len: Optional length of the data. + data: Optional data. + + Returns: + Tuple of (cover array, size). + """ if hasattr(subgroup, "representation"): cover_arr = subgroup size = subgroup.size_sg @@ -105,6 +125,16 @@ def get_cover_array_and_size(subgroup, data_len=None, data=None): def get_size(subgroup, data_len=None, data=None): + """Compute the size of the cover array for a given subgroup. + + Parameters: + subgroup: The subgroup for which to compute the size. + data_len: Optional length of the data. + data: Optional data. + + Returns: + Size of the cover array. + """ if hasattr(subgroup, "representation"): size = subgroup.size_sg elif isinstance(subgroup, slice): @@ -134,6 +164,15 @@ def get_size(subgroup, data_len=None, data=None): def pandas_sparse_eq(col, value): + """Compare a pandas sparse column to a value. + + Parameters: + col: pandas Series with SparseArray data. + value: The value to compare with. + + Returns: + A pandas SparseArray of booleans indicating where col equals value. + """ import pandas as pd # pylint: disable=import-outside-toplevel from pandas._libs.sparse import ( IntIndex, # pylint: disable=import-outside-toplevel, no-name-in-module @@ -152,6 +191,8 @@ def pandas_sparse_eq(col, value): class EqualitySelector(SelectorBase): + """Selector that checks for equality with a specific value.""" + def __init__(self, attribute_name, attribute_value, selector_name=None): if attribute_name is None: raise TypeError() @@ -170,21 +211,25 @@ def __init__(self, attribute_name, attribute_value, selector_name=None): @property def attribute_name(self): + """Name of the attribute.""" return self._attribute_name @property def attribute_value(self): + """Value of the attribute to compare for equality.""" return self._attribute_value def set_descriptions( self, attribute_name, attribute_value, selector_name=None ): # pylint: disable=arguments-differ + """Set the descriptions (query, string, hash) for the selector.""" self._hash, self._query, self._string = EqualitySelector.compute_descriptions( attribute_name, attribute_value, selector_name=selector_name ) @classmethod def compute_descriptions(cls, attribute_name, attribute_value, selector_name): + """Compute the descriptions (hash, query, string) for the selector.""" if isinstance(attribute_value, (str, bytes)): query = str(attribute_name) + "==" + "'" + str(attribute_value) + "'" elif attribute_value is None: @@ -201,9 +246,18 @@ def compute_descriptions(cls, attribute_name, attribute_value, selector_name): return (hash_value, query, string_) def __repr__(self): + """Representation of the selector as a query string.""" return self._query def covers(self, data): + """Determine which instances in data are covered by this selector. 
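+
+        Sparse pandas columns (SparseDtype) are compared via
+        pandas_sparse_eq (see above).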
+ + Parameters: + data: pandas DataFrame containing the data. + + Returns: + A boolean array indicating which instances are covered. + """ import pandas as pd # pylint: disable=import-outside-toplevel column = data[self.attribute_name] @@ -218,14 +272,24 @@ def covers(self, data): return row == self.attribute_value def __str__(self, open_brackets="", closing_brackets=""): + """String representation of the selector, optionally with brackets.""" return open_brackets + self._string + closing_brackets @property def selectors(self): + """Return the selector itself as a tuple (for compatibility).""" return (self,) @staticmethod def from_str(s): + """Create an EqualitySelector from a string representation. + + Parameters: + s: String representation of the selector. + + Returns: + An EqualitySelector instance. + """ s = s.strip() attribute_name, attribute_value = s.split("==") if attribute_value[0] == "'" and attribute_value[-1] == "'": @@ -247,6 +311,8 @@ def from_str(s): class NegatedSelector(SelectorBase): + """Selector that negates another selector.""" + def __init__(self, selector): # TODO: this is redundant due to `__new__` and `set_descriptions` self._selector = selector @@ -255,29 +321,44 @@ def __init__(self, selector): super().__init__() def covers(self, data_instance): + """Determine which instances are not covered by the underlying selector. + + Parameters: + data_instance: pandas DataFrame containing the data. + + Returns: + A boolean array indicating which instances are not covered. + """ return np.logical_not(self._selector.covers(data_instance)) def __repr__(self): + """Representation of the negated selector as a query string.""" return self._query def __str__(self, open_brackets="", closing_brackets=""): + """String representation of the negated selector.""" return "NOT " + self._selector.__str__(open_brackets, closing_brackets) def set_descriptions(self, selector): # pylint: disable=arguments-differ + """Set the descriptions (query, hash) for the negated selector.""" self._query = "(not " + repr(selector) + ")" self._hash = hash(repr(self)) @property def attribute_name(self): + """Name of the attribute.""" return self._selector.attribute_name @property def selectors(self): + """Return the selector itself as a tuple (for compatibility).""" return (self,) # Including the lower bound, excluding the upper_bound class IntervalSelector(SelectorBase): + """Selector that checks if a value is within an interval.""" + def __init__(self, attribute_name, lower_bound, upper_bound, selector_name=None): assert lower_bound < upper_bound # TODO: this is redundant due to `__new__` and `set_descriptions` @@ -291,33 +372,47 @@ def __init__(self, attribute_name, lower_bound, upper_bound, selector_name=None) @property def attribute_name(self): + """Name of the attribute.""" return self._attribute_name @property def lower_bound(self): + """Lower bound of the interval (inclusive).""" return self._lower_bound @property def upper_bound(self): + """Upper bound of the interval (exclusive).""" return self._upper_bound def covers(self, data_instance): + """Determine which instances are covered by this interval selector. + + Parameters: + data_instance: pandas DataFrame containing the data. + + Returns: + A boolean array indicating which instances are within the interval. 
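+
+        Example (illustrative): with lower_bound=18 and upper_bound=65, an
+        instance is covered iff 18 <= value < 65 (lower bound inclusive,
+        upper bound exclusive).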
+ """ val = data_instance[self.attribute_name].to_numpy() return np.logical_and((val >= self.lower_bound), (val < self.upper_bound)) def __repr__(self): + """Representation of the interval selector as a query string.""" return self._query def __hash__(self): return self._hash def __str__(self): + """String representation of the interval selector.""" return self._string @classmethod def compute_descriptions( cls, attribute_name, lower_bound, upper_bound, selector_name=None ): + """Compute the descriptions (hash, query, string) for the interval selector.""" if selector_name is None: _string = cls.compute_string( attribute_name, lower_bound, upper_bound, rounding_digits=2 @@ -333,12 +428,14 @@ def compute_descriptions( def set_descriptions( self, attribute_name, lower_bound, upper_bound, selector_name=None ): # pylint: disable=arguments-differ + """Set the descriptions (hash, query, string) for the interval selector.""" self._hash, self._query, self._string = IntervalSelector.compute_descriptions( attribute_name, lower_bound, upper_bound, selector_name=selector_name ) @classmethod def compute_string(cls, attribute_name, lower_bound, upper_bound, rounding_digits): + """Compute the string representation of the interval selector.""" if rounding_digits is None: formatter = "{}" else: @@ -362,6 +459,14 @@ def compute_string(cls, attribute_name, lower_bound, upper_bound, rounding_digit @staticmethod def from_str(s): + """Create an IntervalSelector from a string representation. + + Parameters: + s: String representation of the interval selector. + + Returns: + An IntervalSelector instance. + """ s = s.strip() if s.endswith(" = anything"): return IntervalSelector( @@ -394,10 +499,22 @@ def from_str(s): @property def selectors(self): + """Return the selector itself as a tuple (for compatibility).""" return (self,) def create_selectors(data, nbins=5, intervals_only=True, ignore=None): + """Create a list of selectors for all attributes in the data. + + Parameters: + data: pandas DataFrame containing the data. + nbins: Number of bins to use for numeric attributes. + intervals_only: If True, only create interval selectors for numeric attributes. + ignore: List of attribute names to ignore. + + Returns: + List of selectors. + """ if ignore is None: ignore = [] sels = create_nominal_selectors(data, ignore) @@ -406,6 +523,15 @@ def create_selectors(data, nbins=5, intervals_only=True, ignore=None): def create_nominal_selectors(data, ignore=None): + """Create equality selectors for nominal attributes. + + Parameters: + data: pandas DataFrame containing the data. + ignore: List of attribute names to ignore. + + Returns: + List of EqualitySelector instances. + """ if ignore is None: ignore = [] nominal_selectors = [] @@ -425,6 +551,16 @@ def create_nominal_selectors(data, ignore=None): def create_nominal_selectors_for_attribute(data, attribute_name, dtypes=None): + """Create equality selectors for a nominal attribute. + + Parameters: + data: pandas DataFrame containing the data. + attribute_name: Name of the attribute. + dtypes: Data types of the data columns. + + Returns: + List of EqualitySelector instances for the attribute. + """ import pandas as pd # pylint: disable=import-outside-toplevel nominal_selectors = [] @@ -442,6 +578,18 @@ def create_nominal_selectors_for_attribute(data, attribute_name, dtypes=None): def create_numeric_selectors( data, nbins=5, intervals_only=True, weighting_attribute=None, ignore=None ): + """Create selectors for numeric attributes. 
+ + Parameters: + data: pandas DataFrame containing the data. + nbins: Number of bins to use for discretization. + intervals_only: If True, only create interval selectors. + weighting_attribute: Optional attribute for weighting. + ignore: List of attribute names to ignore. + + Returns: + List of numeric selectors. + """ if ignore is None: ignore = [] # pragma: no cover numeric_selectors = [] @@ -461,6 +609,18 @@ def create_numeric_selectors( def create_numeric_selectors_for_attribute( data, attr_name, nbins=5, intervals_only=True, weighting_attribute=None ): + """Create selectors for a numeric attribute. + + Parameters: + data: pandas DataFrame containing the data. + attr_name: Name of the attribute. + nbins: Number of bins to use for discretization. + intervals_only: If True, only create interval selectors. + weighting_attribute: Optional attribute for weighting. + + Returns: + List of numeric selectors for the attribute. + """ import pandas as pd # pylint: disable=import-outside-toplevel numeric_selectors = [] @@ -504,6 +664,15 @@ def create_numeric_selectors_for_attribute( def remove_target_attributes(selectors, target): + """Remove selectors that are based on target attributes. + + Parameters: + selectors: List of selectors. + target: The target object with get_attributes method. + + Returns: + List of selectors not based on target attributes. + """ return [ sel for sel in selectors if sel.attribute_name not in target.get_attributes() ] @@ -513,31 +682,40 @@ def remove_target_attributes(selectors, target): # Boolean expressions ############## class BooleanExpressionBase(ABC): + """Base class for boolean expressions (conjunctions and disjunctions).""" + def __or__(self, other): + """Override the '|' operator to create a new expression with logical OR.""" tmp = copy.copy(self) tmp.append_or(other) return tmp def __and__(self, other): + """Override the '&' operator to create a new expression with logical AND.""" tmp = copy.copy(self) tmp.append_and(other) return tmp @abstractmethod def append_and(self, to_append): + """Append a selector or expression using logical AND.""" pass @abstractmethod def append_or(self, to_append): + """Append a selector or expression using logical OR.""" pass @abstractmethod def __copy__(self): + """Create a copy of the boolean expression.""" pass @total_ordering class Conjunction(BooleanExpressionBase): + """Conjunction of selectors (logical AND).""" + def __init__(self, selectors): self._repr = None self._hash = None @@ -548,6 +726,14 @@ def __init__(self, selectors): self._selectors = [selectors] def covers(self, instance): + """Determine which instances are covered by the conjunction. + + Parameters: + instance: pandas DataFrame containing the data. + + Returns: + A boolean array indicating which instances are covered. 
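+
+        Example (illustrative; sel_a and sel_b stand for any two selectors):
+            conj = Conjunction([sel_a, sel_b])
+            mask = conj.covers(df)  # element-wise AND of the selectors' covers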
+ """ # empty description ==> return a list of all '1's if not self._selectors: return np.full(len(instance), True, dtype=bool) @@ -555,15 +741,18 @@ def covers(self, instance): return np.all([sel.covers(instance) for sel in self._selectors], axis=0) def __len__(self): + """Return the number of selectors in the conjunction.""" return len(self._selectors) def __str__(self, open_brackets="", closing_brackets="", and_term=" AND "): + """String representation of the conjunction.""" if not self._selectors: return "Dataset" attrs = sorted(str(sel) for sel in self._selectors) return "".join((open_brackets, and_term.join(attrs), closing_brackets)) def __repr__(self): + """Representation of the conjunction.""" if self._repr is not None: return self._repr else: @@ -571,12 +760,15 @@ def __repr__(self): return self._repr def __eq__(self, other): + """Check equality based on the string representation.""" return repr(self) == repr(other) def __lt__(self, other): + """Define less-than comparison based on the string representation.""" return repr(self) < repr(other) def __hash__(self): + """Return the hash value.""" if self._hash is not None: return self._hash else: @@ -584,19 +776,23 @@ def __hash__(self): return self._hash def _compute_repr(self): + """Compute the representation of the conjunction.""" if not self._selectors: return "True" reprs = sorted(repr(sel) for sel in self._selectors) return "(" + " and ".join(reprs) + ")" def _compute_hash(self): + """Compute the hash of the conjunction.""" return hash(repr(self)) def _invalidate_representations(self): + """Invalidate cached representations.""" self._repr = None self._hash = None def append_and(self, to_append): + """Append a selector or conjunction using logical AND.""" if isinstance(to_append, SelectorBase): self._selectors.append(to_append) elif isinstance(to_append, Conjunction): @@ -606,19 +802,23 @@ def append_and(self, to_append): self._invalidate_representations() def append_or(self, to_append): + """Append a selector or expression using logical OR (not supported).""" raise RuntimeError( "Or operations are not supported by a pure Conjunction. Consider using DNF." ) def pop_and(self): + """Remove and return the last selector added using AND.""" return self._selectors.pop() def pop_or(self): + """Pop operation for OR is not supported in Conjunction.""" raise RuntimeError( "Or operations are not supported by a pure Conjunction. Consider using DNF." ) def __copy__(self): + """Create a copy of the conjunction.""" cls = self.__class__ result = cls.__new__(cls) result.__dict__.update(self.__dict__) @@ -627,14 +827,24 @@ def __copy__(self): @property def depth(self): + """Return the number of selectors in the conjunction.""" return len(self._selectors) @property def selectors(self): + """Return the selectors in the conjunction as a tuple.""" return tuple(chain.from_iterable(sel.selectors for sel in self._selectors)) @staticmethod def from_str(s): + """Create a Conjunction from a string representation. + + Parameters: + s: String representation of the conjunction. + + Returns: + A Conjunction instance. 
+ """ if s.strip() == "Dataset": return Conjunction([]) selector_strings = s.split(" AND ") @@ -650,6 +860,8 @@ def from_str(s): @total_ordering class Disjunction(BooleanExpressionBase): + """Disjunction of selectors (logical OR).""" + def __init__(self, selectors=None): if isinstance(selectors, (list, tuple)): self._selectors = selectors @@ -659,43 +871,59 @@ def __init__(self, selectors=None): self._selectors = [selectors] def covers(self, instance): - # empty description ==> return a list of all '1's + """Determine which instances are covered by the disjunction. + + Parameters: + instance: pandas DataFrame containing the data. + + Returns: + A boolean array indicating which instances are covered. + """ + # empty description ==> return a list of all '0's if not self._selectors: return np.full(len(instance), False, dtype=bool) # non-empty description return np.any([sel.covers(instance) for sel in self._selectors], axis=0) def __len__(self): + """Return the number of selectors in the disjunction.""" return len(self._selectors) def __str__(self, open_brackets="", closing_brackets="", or_term=" OR "): + """String representation of the disjunction.""" if not self._selectors: return "Empty" # pragma: no cover attrs = sorted(str(sel) for sel in self._selectors) return "".join((open_brackets, or_term.join(attrs), closing_brackets)) def __repr__(self): + """Representation of the disjunction.""" if not self._selectors: return "True" reprs = sorted(repr(sel) for sel in self._selectors) return "".join(("(", " or ".join(reprs), ")")) def __eq__(self, other): + """Check equality based on the string representation.""" return repr(self) == repr(other) def __lt__(self, other): + """Define less-than comparison based on the string representation.""" return repr(self) < repr(other) def __hash__(self): + """Return the hash value.""" return hash(repr(self)) def append_and(self, to_append): + """Append a selector or expression using logical AND (not supported).""" raise RuntimeError( "And operations are not supported by a pure Conjunction. " "Consider using DNF." 
) def append_or(self, to_append): + """Append a selector or disjunction using logical OR.""" if isinstance(to_append, Disjunction): self._selectors.extend(to_append.selectors) return @@ -705,6 +933,7 @@ def append_or(self, to_append): self._selectors.append(to_append) def __copy__(self): + """Create a copy of the disjunction.""" cls = self.__class__ result = cls.__new__(cls) result.__dict__.update(self.__dict__) @@ -713,10 +942,13 @@ def __copy__(self): @property def selectors(self): + """Return the selectors in the disjunction as a tuple.""" return tuple(chain.from_iterable(sel.selectors for sel in self._selectors)) class DNF(Disjunction): + """Disjunctive Normal Form expression.""" + def __init__(self, selectors=None): if selectors is None: selectors = [] @@ -725,6 +957,7 @@ def __init__(self, selectors=None): @staticmethod def _ensure_pure_conjunction(to_append): + """Ensure that the appended expression is a pure conjunction.""" if isinstance(to_append, Conjunction): return to_append elif isinstance(to_append, SelectorBase): @@ -739,6 +972,7 @@ def _ensure_pure_conjunction(to_append): ) # pragma: no cover def append_or(self, to_append): + """Append a selector or conjunction using logical OR.""" if isinstance(to_append, ps.Disjunction): to_append = to_append.selectors try: @@ -749,6 +983,7 @@ def append_or(self, to_append): super().append_or(conjunctions) def append_and(self, to_append): + """Append a selector using logical AND to all conjunctions.""" if isinstance(to_append, Disjunction): raise NotImplementedError( "Appeding a disjunction to DNF is not implemented" @@ -761,6 +996,7 @@ def append_and(self, to_append): self._selectors.append(conj) def pop_and(self): + """Remove and return the last selector added using AND from all conjunctions.""" out_list = [s.pop_and() for s in self._selectors] return_val = out_list[0] if all(x == return_val for x in out_list): diff --git a/src/pysubgroup/utils.py b/src/pysubgroup/utils.py index 4be5fc9..6560158 100644 --- a/src/pysubgroup/utils.py +++ b/src/pysubgroup/utils.py @@ -16,6 +16,18 @@ def str_to_bool(s): + """ + Converts a string representation of a boolean value to a boolean type. + + Parameters: + s (str): The string to convert (e.g., 'true', 'False', '1', '0'). + + Returns: + bool: The boolean value represented by the string. + + Raises: + ValueError: If the string does not represent a valid boolean value. + """ s = s.lower() if s in ["y", "yes", "t", "true", "on", "1"]: return True @@ -26,6 +38,16 @@ def str_to_bool(s): def minimum_required_quality(result, task): + """ + Determines the minimum quality required for a subgroup to be considered for inclusion in the result set. + + Parameters: + result (list): The current list of subgroups (heap). + task (SubgroupDiscoveryTask): The task containing parameters like result_set_size and min_quality. + + Returns: + float: The minimum required quality for a subgroup to be added to the result set. + """ if len(result) < task.result_set_size: return task.min_quality else: @@ -33,16 +55,37 @@ def minimum_required_quality(result, task): def prepare_subgroup_discovery_result(result, task): + """ + Filters and sorts the result set of subgroups according to the task parameters. + + Parameters: + result (list): The list of subgroups (heap). + task (SubgroupDiscoveryTask): The task containing parameters like result_set_size and min_quality. + + Returns: + list: The filtered and sorted list of subgroups. 
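+
+    Entries are (quality, subgroup, statistics) tuples: they are filtered by
+    task.min_quality, sorted by descending quality, and truncated to
+    task.result_set_size.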
+ """ result_filtered = [tpl for tpl in result if tpl[0] > task.min_quality] result_filtered.sort(reverse=True) result_filtered = result_filtered[: task.result_set_size] return result_filtered -# Returns the cutpoints for discretization def equal_frequency_discretization( data, attribute_name, nbins=5, weighting_attribute=None ): + """ + Discretizes a numerical attribute into bins with approximately equal frequency. + + Parameters: + data (DataFrame): The dataset containing the attribute to discretize. + attribute_name (str): The name of the attribute to discretize. + nbins (int): The number of bins to create. + weighting_attribute (str, optional): An optional attribute to weight the instances. + + Returns: + list: A list of cutpoints defining the bins. + """ import pandas as pd # pylint: disable=import-outside-toplevel cutpoints = [] @@ -63,7 +106,6 @@ def equal_frequency_discretization( if val not in cutpoints: break position += 1 - # print (sorted_data [position]) if val not in cutpoints: cutpoints.append(val) else: @@ -88,10 +130,29 @@ def equal_frequency_discretization( def conditional_invert(val, invert): + """ + Conditionally inverts a value based on a boolean flag. + + Parameters: + val (float): The value to potentially invert. + invert (bool): If True, the value is inverted. + + Returns: + float: The (possibly inverted) value. + """ return -2 * (invert - 0.5) * val def results_df_autoround(df): + """ + Automatically rounds numerical columns in a DataFrame for better readability. + + Parameters: + df (DataFrame): The DataFrame containing the results. + + Returns: + DataFrame: The DataFrame with rounded numerical values. + """ return df.round( { "quality": 3, @@ -126,31 +187,98 @@ def results_df_autoround(df): def perc_formatter(x): + """ + Formats a float as a percentage string with one decimal place. + + Parameters: + x (float): The value to format. + + Returns: + str: The formatted percentage string. + """ return "{0:.1f}%".format(x * 100) def float_formatter(x, digits=2): + """ + Formats a float to a specified number of decimal places. + + Parameters: + x (float): The value to format. + digits (int): The number of decimal places. + + Returns: + str: The formatted string. + """ return ("{0:." + str(digits) + "f}").format(x) def is_categorical_attribute(data, attribute_name): + """ + Determines if an attribute in the dataset is categorical. + + Parameters: + data (DataFrame): The dataset. + attribute_name (str): The name of the attribute. + + Returns: + bool: True if the attribute is categorical, False otherwise. + """ return attribute_name in data.select_dtypes(exclude=["number"]).columns.values def is_numerical_attribute(data, attribute_name): + """ + Determines if an attribute in the dataset is numerical. + + Parameters: + data (DataFrame): The dataset. + attribute_name (str): The name of the attribute. + + Returns: + bool: True if the attribute is numerical, False otherwise. + """ return attribute_name in data.select_dtypes(include=["number"]).columns.values def remove_selectors_with_attributes(selector_list, attribute_list): + """ + Removes selectors that are based on specified attributes. + + Parameters: + selector_list (list): The list of selectors to filter. + attribute_list (list): The list of attribute names to remove selectors for. + + Returns: + list: The filtered list of selectors. + """ return [x for x in selector_list if x.attributeName not in attribute_list] def derive_effective_sample_size(weights): + """ + Calculates the effective sample size for weighted data. 
+ + Parameters: + weights (array-like): The weights assigned to the samples. + + Returns: + float: The effective sample size. + """ return sum(weights) ** 2 / sum(weights**2) -# from https://docs.python.org/3/library/itertools.html#recipes def powerset(iterable, max_length=None): + """ + Generates the power set (all possible combinations) of an iterable up to a maximum length. + + Parameters: + iterable (iterable): The iterable to generate combinations from. + max_length (int, optional): The maximum length of combinations. + + Returns: + iterator: An iterator over the power set of the iterable. + """ "powerset([1,2,3]) --> () (1,) (2,) (3,) (1,2) (1,3) (2,3) (1,2,3)" s = list(iterable) if max_length is None: @@ -163,6 +291,17 @@ def powerset(iterable, max_length=None): def overlap(sg, another_sg, data): + """ + Calculates the Jaccard similarity between two subgroups based on their coverage. + + Parameters: + sg: The first subgroup. + another_sg: The second subgroup. + data (DataFrame): The dataset. + + Returns: + float: The Jaccard similarity between the two subgroups. + """ cover_sg = sg.covers(data) cover_another_sg = another_sg.covers(data) union = np.logical_or(cover_sg, cover_another_sg) @@ -175,6 +314,15 @@ def overlap(sg, another_sg, data): # bitset operations ##### def to_bits(list_of_ints): + """ + Converts a list of integers to a bitset represented as an integer. + + Parameters: + list_of_ints (list): The list of integers to convert. + + Returns: + int: The bitset represented as an integer. + """ v = 0 for x in list_of_ints: v += 1 << x @@ -182,6 +330,15 @@ def to_bits(list_of_ints): def count_bits(bitset_as_int): + """ + Counts the number of set bits (1s) in a bitset represented as an integer. + + Parameters: + bitset_as_int (int): The bitset represented as an integer. + + Returns: + int: The number of set bits. + """ c = 0 while bitset_as_int > 0: c += 1 @@ -190,6 +347,15 @@ def count_bits(bitset_as_int): def find_set_bits(bitset_as_int): + """ + Finds the indices of set bits in a bitset represented as an integer. + + Parameters: + bitset_as_int (int): The bitset represented as an integer. + + Yields: + int: The index of each set bit. + """ while bitset_as_int > 0: x = bitset_as_int.bit_length() - 1 yield x @@ -200,6 +366,16 @@ def find_set_bits(bitset_as_int): # TID-list operations ##### def intersect_of_ordered_list(list_1, list_2): + """ + Computes the intersection of two ordered lists. + + Parameters: + list_1 (list): The first ordered list. + list_2 (list): The second ordered list. + + Returns: + list: The intersection of the two lists. + """ result = [] i = 0 j = 0 @@ -216,7 +392,22 @@ def intersect_of_ordered_list(list_1, list_2): class BaseTarget: + """ + Base class for defining targets in subgroup discovery. + + Provides a method to check if all required statistics are present. + """ + def all_statistics_present(self, cached_statistics): + """ + Checks if all required statistics are present in the cached statistics. + + Parameters: + cached_statistics (dict): The dictionary of cached statistics. + + Returns: + bool: True if all required statistics are present, False otherwise. + """ # pylint: disable=no-member if isinstance(cached_statistics, dict) and all( expected_value in cached_statistics @@ -228,12 +419,34 @@ def all_statistics_present(self, cached_statistics): class SubgroupDiscoveryResult: + """ + Represents the result of a subgroup discovery task. + + Contains methods to convert results to different formats. 
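+
+    Example (illustrative; assumes `task` is a configured SubgroupDiscoveryTask):
+        result = Apriori().execute(task)
+        df = result.to_dataframe()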
+ """ + def __init__(self, results, task): + """ + Initializes the SubgroupDiscoveryResult with the results and the task. + + Parameters: + results (Iterable): An iterable of (quality, subgroup, statistics) tuples. + task (SubgroupDiscoveryTask): The subgroup discovery task. + """ self.task = task self.results = results assert isinstance(results, Iterable) def to_descriptions(self, include_stats=False): + """ + Converts the results to a list of subgroup descriptions. + + Parameters: + include_stats (bool): If True, includes statistics in the output. + + Returns: + list: A list of subgroup descriptions. + """ if include_stats: return list(self.results) else: @@ -242,6 +455,17 @@ def to_descriptions(self, include_stats=False): def to_table( self, statistics_to_show=None, print_header=True, include_target=False ): + """ + Converts the results to a table format. + + Parameters: + statistics_to_show (list, optional): The statistics to include in the table. + print_header (bool): If True, includes a header row. + include_target (bool): If True, includes the target in the table. + + Returns: + list: A list of rows representing the table. + """ if statistics_to_show is None: statistics_to_show = type(self.task.target).statistic_types table = [] @@ -265,6 +489,17 @@ def to_table( def to_dataframe( self, statistics_to_show=None, autoround=False, include_target=False ): + """ + Converts the results to a pandas DataFrame. + + Parameters: + statistics_to_show (list, optional): The statistics to include in the DataFrame. + autoround (bool): If True, automatically rounds numerical columns. + include_target (bool): If True, includes the target in the DataFrame. + + Returns: + DataFrame: A pandas DataFrame representing the results. + """ import pandas as pd # pylint: disable=import-outside-toplevel if statistics_to_show is None: @@ -277,6 +512,16 @@ def to_dataframe( return df def to_latex(self, statistics_to_show=None, escape_underscore=True): + """ + Converts the results to a LaTeX-formatted table. + + Parameters: + statistics_to_show (list, optional): The statistics to include in the LaTeX table. + escape_underscore (bool): If True, escapes underscores in strings. + + Returns: + str: A string containing the LaTeX-formatted table. + """ if statistics_to_show is None: statistics_to_show = type(self.task.target).statistic_types df = self.to_dataframe(statistics_to_show) @@ -317,9 +562,23 @@ def add_if_required( explicit_result_set_size=None, ): """ + Adds a subgroup to the result set if it meets the required quality and constraints. + IMPORTANT: Only add/remove subgroups from `result` by using `heappop` and `heappush` to ensure order of subgroups by quality. + + Parameters: + result (list): The current list of subgroups (heap). + sg: The subgroup to potentially add. + quality (float): The quality of the subgroup. + task (SubgroupDiscoveryTask): The task containing parameters and constraints. + check_for_duplicates (bool): If True, checks for duplicates before adding. + statistics (optional): Precomputed statistics for the subgroup. + explicit_result_set_size (int, optional): Overrides the task's result_set_size. + + Returns: + None """ if explicit_result_set_size is None: explicit_result_set_size = task.result_set_size