From 2f059a948dcceae76489aede3e25d01fad06a78a Mon Sep 17 00:00:00 2001 From: MarcusRostSAP <146723913+MarcusRostSAP@users.noreply.github.com> Date: Mon, 25 Mar 2024 10:20:55 +0100 Subject: [PATCH] Explainability (#7) * Initial commit explainer module * Some experimentation * Tried a new heurstic * Some code clean up * Slight change in logic * added activation feature * Made some changes to the activation and logic * Updated notebook * some small logic changes * Notebook compile * added shapley_values * Progress on shapely for traces * Added comments * Linting issues * Linting * Linting * more linting * More linting * Linting * Linting * linter * Linter * Linting --- .gitignore | 4 + bpmnconstraints/script.py | 1 + bpmnconstraints/utils/plot.py | 1 + explainer/README.md | 47 ++ explainer/explainer.py | 546 ++++++++++++++++++++ explainer/tutorial/explainer_tutorial.ipynb | 308 +++++++++++ setup.py | 1 + tests/explainer/explainer_test.py | 261 ++++++++++ tutorial/tutorial.ipynb | 4 +- 9 files changed, 1171 insertions(+), 2 deletions(-) create mode 100644 explainer/README.md create mode 100644 explainer/explainer.py create mode 100644 explainer/tutorial/explainer_tutorial.ipynb create mode 100644 tests/explainer/explainer_test.py diff --git a/.gitignore b/.gitignore index 3dc4d04..d34c252 100644 --- a/.gitignore +++ b/.gitignore @@ -135,3 +135,7 @@ dmypy.json .idea/ .vscode/ + +# explainer Stuff +explainer/test.py +explainer/old_code.py \ No newline at end of file diff --git a/bpmnconstraints/script.py b/bpmnconstraints/script.py index 6f7eb1e..060ca00 100644 --- a/bpmnconstraints/script.py +++ b/bpmnconstraints/script.py @@ -1,4 +1,5 @@ """Entry point for bpmnsignal command. 
Verifies argument and runs parser.""" + # pylint: disable=import-error import argparse import logging diff --git a/bpmnconstraints/utils/plot.py b/bpmnconstraints/utils/plot.py index 7a5990c..434b782 100644 --- a/bpmnconstraints/utils/plot.py +++ b/bpmnconstraints/utils/plot.py @@ -1,6 +1,7 @@ """ Module for plotting functions. """ + # pylint: disable=import-error import matplotlib.pyplot as plt diff --git a/explainer/README.md b/explainer/README.md new file mode 100644 index 0000000..e4052a2 --- /dev/null +++ b/explainer/README.md @@ -0,0 +1,47 @@ +# Symbolic Explanations of Process Conformance Violations +## Introduction + + +# Regex usage for the first iteration of the software + +## 1. Sequence Constraint +Pattern: `'A.*B.*C'` + +Explanation: This regex specifies that for a trace to be conformant, it must contain the nodes 'A', 'B', and 'C' in that order, though not necessarily consecutively. The .* allows for any number of intervening nodes between the specified nodes. + +> Example: A trace ['A', 'X', 'B', 'Y', 'C'] would be conformant, while ['A', 'C', 'B'] would not. + +## 2. Immediate Succession +Pattern: `'AB'` + +Explanation: This regex specifies that node 'A' must be immediately followed by node 'B' with no intervening nodes. + +> Example: A trace ['A', 'B', 'C'] would be conformant, while ['A', 'X', 'B'] would not. + +## 3. Optional Node +Pattern: `'A(B)?C'` + +Explanation: This regex specifies that the node 'B' is optional between 'A' and 'C'. The node 'C' must follow 'A', but 'B' can either be present or absent. + +> Example: Both traces ['A', 'B', 'C'] and ['A', 'C'] would be conformant. + +## 4. Excluding Specific Nodes +Pattern: `'A[^D]*B'` + +Explanation: This regex specifies that 'A' must be followed by 'B' without the occurrence of 'D' in between them. The [^D] part matches any character except 'D'. + +> Example: A trace ['A', 'C', 'B'] would be conformant, while ['A', 'D', 'B'] would not. + +## 5. 
Repetition of Nodes +Pattern: `'A(B{2,3})C'` + +Explanation: This regex specifies that 'A' must be followed by 'B' repeated 2 to 3 times and then followed by 'C'. + +> Example: Traces ['A', 'B', 'B', 'C'] and ['A', 'B', 'B', 'B', 'C'] would be conformant, while ['A', 'B', 'C'] or ['A', 'B', 'B', 'B', 'B', 'C'] would not. + +## 6. Alternative Paths +Pattern: `'A(B|D)C'` + +Explanation: This regex specifies that after 'A', there must be either a 'B' or a 'D', followed by a 'C'. + +> Example: Both traces ['A', 'B', 'C'] and ['A', 'D', 'C'] would be conformant. diff --git a/explainer/explainer.py b/explainer/explainer.py new file mode 100644 index 0000000..ffb2811 --- /dev/null +++ b/explainer/explainer.py @@ -0,0 +1,546 @@ +import math +import re +from itertools import combinations, product, chain + + +class Trace: + def __init__(self, nodes): + """ + Initializes a Trace instance. + + :param nodes: A list of nodes where each node is represented as a string label. + """ + self.nodes = nodes + + def __len__(self): + """ + Returns the number of nodes in the trace. + """ + return len(self.nodes) + + def __iter__(self): + """ + Initializes the iteration over the nodes in the trace. + """ + self.index = 0 + return self + + def __next__(self): + """ + Returns the next node in the trace during iteration. + """ + if self.index < len(self.nodes): + result = self.nodes[self.index] + self.index += 1 + return result + else: + raise StopIteration + + def __split__(self): + """ + Splits the nodes of the trace into a list. + + :return: A list containing the nodes of the trace. + """ + spl = [] + for node in self.nodes: + spl.append(node) + return spl + + +class EventLog: + def __init__(self, trace=None): + """ + Initializes an EventLog instance. + + :param traces: A list of Trace instances. + """ + self.log = {} + if trace: + self.add_trace(trace) + + def add_trace(self, trace, count=1): + """ + Adds a trace to the log or increments its count if it already exists. 
+ + :param trace: A Trace instance to add. + """ + trace_tuple = tuple(trace.nodes) + if trace_tuple in self.log: + self.log[trace_tuple] += count + else: + self.log[trace_tuple] = count + + def remove_trace(self, trace, count=1): + """ + Removes a trace from the log or decrements its count if the count is greater than 1. + + :param trace: A Trace instance to remove. + """ + trace_tuple = tuple(trace.nodes) + if trace_tuple in self.log: + if self.log[trace_tuple] > count: + self.log[trace_tuple] -= count + else: + del self.log[trace_tuple] + + def __str__(self): + """ + Returns a string representation of the event log. + """ + return str(self.log) + + def __len__(self): + """ + Returns the total number of trace occurrences in the log. + """ + return sum(self.log.values()) + + def __iter__(self): + """ + Allows iteration over each trace occurrence in the log. + """ + for trace_tuple, count in self.log.items(): + for _ in range(count): + yield Trace(list(trace_tuple)) + + +class Explainer: + def __init__(self): + """ + Initializes an Explainer instance. + """ + self.constraints = [] # List to store constraints (regex patterns) + self.adherent_trace = None + + def add_constraint(self, regex): + """ + Adds a new constraint and updates the nodes list. + + :param regex: A regular expression representing the constraint. + """ + self.constraints.append(regex) + if self.contradiction(): + self.constraints.remove(regex) + print(f"Constraint {regex} contradicts the other constraints.") + + def remove_constraint(self, idx): + """ + Removes a constraint by index and updates the nodes list if necessary. + + :param idx: Index of the constraint to be removed. 
+ """ + if 0 <= idx < len(self.constraints): + removed_regex = self.constraints.pop(idx) + removed_nodes = set(filter(str.isalpha, removed_regex)) + + # Re-evaluate nodes to keep based on remaining constraints + remaining_nodes = set(filter(str.isalpha, "".join(self.constraints))) + self.nodes = remaining_nodes + + # Optionally, remove nodes that are no longer in any constraint + for node in removed_nodes: + if node not in remaining_nodes: + self.nodes.discard(node) + + def activation(self, trace, constraints=None): + """ + Checks if any of the nodes in the trace activates any constraint. + + :param trace: A Trace instance. + :return: Boolean indicating if any constraint is activated. + """ + if not constraints: + constraints = self.constraints + con_activation = [0] * len(constraints) + activated = False + for idx, con in enumerate(constraints): + if activated: + activated = False + continue + target = self.identify_existance_constraints(con) + if target: + con_activation[idx] = 1 + continue + for event in trace: + if event in con: + con_activation[idx] = 1 + activated = True + break + return con_activation + + def identify_existance_constraints(self, pattern): + """ + Identifies existance constraints within a pattern. + + :param pattern: The constraint pattern as a string. + :return: A tuple indicating the type of existance constraint and the node involved. + """ + # Check for AtLeastOne constraint + for match in re.finditer(r"(? 100: + return f"{explanation}\n Maximum depth of {depth -1} reached" + score = self.evaluate_similarity(working_trace) + return self.operate_on_trace(working_trace, score, explanation, depth) + + def operate_on_trace(self, trace, score, explanation_path, depth=0): + """ + Finds and applies modifications to the trace to make it conformant. + + :param trace: The trace to be modified. + :param score: The similarity score of the trace. + :param explanation_path: The current explanation path. + :param depth: The current recursion depth. 
+ :return: A string explaining why the best subtrace is non-conformant or a message indicating the maximum depth has been reached. + """ + explanation = None + counter_factuals = self.modify_subtrace(trace) + best_subtrace = None + best_score = -float("inf") + for subtrace in counter_factuals: + current_score = self.evaluate_similarity(subtrace[0]) + if current_score > best_score and current_score > score: + best_score = current_score + best_subtrace = subtrace[0] + explanation = subtrace[1] + if best_subtrace == None: + for subtrace in counter_factuals: + self.operate_on_trace(subtrace[0], score, explanation_path, depth + 1) + explanation_string = explanation_path + "\n" + str(explanation) + return self.counter_factual_helper(best_subtrace, explanation_string, depth + 1) + + def get_nodes_from_constraint(self, constraint=None): + """ + Extracts unique nodes from a constraint pattern. + + :param constraint: The constraint pattern as a string. + :return: A list of unique nodes found within the constraint. + """ + if constraint is None: + all_nodes = set() + for con in self.constraints: + all_nodes.update(re.findall(r"[A-Za-z]", con)) + return list(set(all_nodes)) + else: + return list(set(re.findall(r"[A-Za-z]", constraint))) + + def modify_subtrace(self, trace): + """ + Modifies the given trace to meet constraints by adding nodes where the pattern fails. + + Parameters: + - trace: A list of node identifiers + + Returns: + - A list of potential subtraces each modified to meet constraints. 
+ """ + potential_subtraces = [] + possible_additions = self.get_nodes_from_constraint() + for i, s_trace in enumerate(get_iterative_subtrace(trace)): + for con in self.constraints: + new_trace_str = "".join(s_trace) + match = re.match(new_trace_str, con) + if not match: + for add in possible_additions: + potential_subtraces.append( + [ + Trace(s_trace + [add] + trace.nodes[i + 1 :]), + f"Addition (Added {add} at position {i+1}): " + + "->".join(s_trace + [add] + trace.nodes[i + 1 :]), + ] + ) + potential_subtraces.append( + [ + Trace(s_trace[:-1] + [add] + trace.nodes[i:]), + f"Addition (Added {add} at position {i}): " + + "->".join(s_trace[:-1] + [add] + trace.nodes[i:]), + ] + ) + + potential_subtraces.append( + [ + Trace(s_trace[:-1] + trace.nodes[i + 1 :]), + f"Subtraction (Removed {s_trace[i]} from position {i}): " + + "->".join(s_trace[:-1] + trace.nodes[i + 1 :]), + ] + ) + return potential_subtraces + + def determine_shapley_value(self, log, constraints, index): + """Determines the Shapley value-based contribution of a constraint to a the + overall conformance rate. 
+ Args: + log (dictionary): The event log, where keys are strings and values are + ints + constraints (list): A list of constraints (regexp strings) + index (int): The + Returns: + float: The contribution of the constraint to the overall conformance + rate + """ + if len(constraints) < index: + raise Exception("Constraint not in constraint list.") + contributor = constraints[index] + sub_ctrbs = [] + reduced_constraints = [c for c in constraints if not c == contributor] + subsets = determine_powerset(reduced_constraints) + for subset in subsets: + lsubset = list(subset) + constraints_without = [c for c in constraints if c in lsubset] + constraints_with = [c for c in constraints if c in lsubset + [contributor]] + weight = ( + math.factorial(len(lsubset)) + * math.factorial(len(constraints) - 1 - len(lsubset)) + ) / math.factorial(len(constraints)) + sub_ctrb = weight * ( + self.determine_conformance_rate(log, constraints_without) + - self.determine_conformance_rate(log, constraints_with) + ) + sub_ctrbs.append(sub_ctrb) + return sum(sub_ctrbs) + + def evaluate_similarity(self, trace): + """ + Calculates the similarity between the adherent trace and the given trace using the Levenshtein distance. + + :param trace: The trace to compare with the adherent trace. + :return: A normalized score indicating the similarity between the adherent trace and the given trace. + """ + length = len(self.adherent_trace) + trace_len = len("".join(trace)) + lev_distance = levenshtein_distance(self.adherent_trace, "".join(trace)) + max_distance = max(length, trace_len) + normalized_score = 1 - lev_distance / max_distance + return normalized_score + + def determine_conformance_rate(self, event_log, constraints=None): + """ + Determines the conformance rate of the event log based on the given constraints. + + :param event_log: The event log to analyze. + :param constraints: The constraints to check against the event log. 
+ :return: The conformance rate as a float between 0 and 1, or a message if no constraints are provided. + """ + if not self.constraints and not constraints: + return "The explainer have no constraints" + len_log = len(event_log) + if len_log == 0: + return 1 + non_conformant = 0 + if constraints == None: + constraints = self.constraints + for trace, count in event_log.log.items(): + for con in constraints: + if not re.search(con, "".join(trace)): + non_conformant += count + break + return (len_log - non_conformant) / len_log + + def trace_contribution_to_conformance_loss( + self, event_log, trace, constraints=None + ): + """ + Calculates the contribution of a specific trace to the conformance loss of the event log. + + :param event_log: The event log to analyze. + :param trace: The trace to calculate its contribution. + :param constraints: The constraints to check against the event log. + :return: The contribution of the trace to the conformance loss as a float between 0 and 1. + """ + if not constraints: + constraints = self.constraints + total_traces = len(event_log) + contribution_of_trace = 0 + for t, count in event_log.log.items(): + if not self.conformant(t, constraints): + if trace.nodes == list(t): + contribution_of_trace = count + + return contribution_of_trace / total_traces + + +def determine_powerset(elements): + """Determines the powerset of a list of elements + Args: + elements (set): Set of elements + Returns: + list: Powerset of elements + """ + lset = list(elements) + ps_elements = chain.from_iterable( + combinations(lset, option) for option in range(len(lset) + 1) + ) + return [set(ps_element) for ps_element in ps_elements] + + +def get_sublists(lst): + """ + Generates all possible non-empty sublists of a list. + + :param lst: The input list. + :return: A list of all non-empty sublists. 
+ """ + sublists = [] + for r in range(2, len(lst) + 1): # Generate combinations of length 2 to n + sublists.extend(combinations(lst, r)) + return sublists + + +def get_iterative_subtrace(trace): + """ + Generates all possible non-empty contiguous sublists of a list, maintaining order. + + :param lst: The input list. + n: the minmum length of sublists + :return: A list of all non-empty contiguous sublists. + """ + sublists = [] + for i in range(0, len(trace)): + sublists.append(trace.nodes[0 : i + 1]) + + return sublists + + +def levenshtein_distance(seq1, seq2): + """ + Calculates the Levenshtein distance between two sequences. + + Args: + seq1 (str): The first sequence. + seq2 (str): The second sequence. + + Returns: + int: The Levenshtein distance between the two sequences. + """ + size_x = len(seq1) + 1 + size_y = len(seq2) + 1 + matrix = [[0] * size_y for _ in range(size_x)] + for x in range(size_x): + matrix[x][0] = x + for y in range(size_y): + matrix[0][y] = y + + for x in range(1, size_x): + for y in range(1, size_y): + if seq1[x - 1] == seq2[y - 1]: + matrix[x][y] = matrix[x - 1][y - 1] + else: + matrix[x][y] = min( + matrix[x - 1][y] + 1, matrix[x][y - 1] + 1, matrix[x - 1][y - 1] + 1 + ) + return matrix[size_x - 1][size_y - 1] diff --git a/explainer/tutorial/explainer_tutorial.ipynb b/explainer/tutorial/explainer_tutorial.ipynb new file mode 100644 index 0000000..c8c0504 --- /dev/null +++ b/explainer/tutorial/explainer_tutorial.ipynb @@ -0,0 +1,308 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Explainer utility in BPMN2CONSTRAINTS\n", + "\n", + "In this notebook, we explore the `Explainer` class, designed to analyze and explain the conformance of traces against predefined constraints. 
Trace analysis is crucial in domains such as process mining, where understanding the behavior of system executions against expected models can uncover inefficiencies, deviations, or compliance issues.\n", + "\n", + "The constraints currently consist of basic regular expressions, because of their similarity to the declarative constraints used in BPMN2CONSTRAINTS.\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 1: Setup" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "import sys\n", + "sys.path.append('../')\n", + "from explainer import Explainer, Trace" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 2: Basic Usage\n", + "Let's start by creating an instance of the `Explainer` and adding a simple constraint that a valid trace should contain the sequence \"A\" followed by \"B\" and then \"C\".\n" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "explainer = Explainer()\n", + "explainer.add_constraint('A.*B.*C')" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 3: Analyzing Trace Conformance\n", + "\n", + "Now, we'll create a trace and check if it conforms to the constraints we've defined." + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Is the trace conformant? True\n" + ] + } + ], + "source": [ + "trace = Trace(['A', 'X', 'B', 'Y', 'C'])\n", + "is_conformant = explainer.conformant(trace)\n", + "print(f\"Is the trace conformant? 
{is_conformant}\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 4: Explaining Non-conformance\n", + "\n", + "If a trace is not conformant, we can use the `minimal_expl` and `counterfactual_expl` methods to understand why and how to adjust the trace.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Constraint: A.*B.*C\n", + "Trace:['A', 'C']\n", + "\n", + "Addition (Added B at position 1): A->B->C\n", + "Non-conformance due to: Constraint (A.*B.*C) is violated by subtrace: ('A', 'C')\n", + "-----------\n", + "Constraint: A.*B.*C\n", + "Trace:['C', 'B', 'A']\n", + "\n", + "Addition (Added A at position 1): C->A->B->A\n", + "Subtraction (Removed C from position 0): A->B->A\n", + "Addition (Added C at position 2): A->B->C->A\n", + "Non-conformance due to: Constraint (A.*B.*C) is violated by subtrace: ('C', 'B')\n", + "-----------\n", + "Constraint: A.*B.*C\n", + "Trace:['A', 'A', 'C']\n", + "\n", + "Addition (Added B at position 2): A->A->B->C\n", + "Non-conformance due to: Constraint (A.*B.*C) is violated by subtrace: ('A', 'A')\n", + "-----------\n", + "Constraint: A.*B.*C\n", + "Trace:['A', 'A', 'C', 'A', 'TEST', 'A', 'C', 'X', 'Y']\n", + "-----------\n", + "Constraint: AC\n", + "Trace:['A', 'X', 'C']\n", + "\n", + "Subtraction (Removed X from position 1): A->C\n", + "Non-conformance due to: Constraint (AC) is violated by subtrace: ('A', 'X')\n", + "-----------\n", + "constraint: AC\n", + "constraint: B.*A.*B.*C\n", + "constraint: A.*B.*C.*\n", + "constraint: A.*D.*B*\n", + "constraint: A[^D]*B\n", + "constraint: B.*[^X].*\n", + "Trace:['A', 'X', 'C']\n", + "\n", + "Subtraction (Removed X from position 1): A->C\n", + "Non-conformance due to: Constraint (AC) is violated by subtrace: ('A', 'X')\n" + ] + } + ], + "source": [ + "non_conformant_trace = Trace(['A', 'C'])\n", + "print('Constraint: A.*B.*C')\n", + 
"print('Trace:' + str(non_conformant_trace.nodes))\n", + "print(explainer.counterfactual_expl(non_conformant_trace))\n", + "print(explainer.minimal_expl(non_conformant_trace))\n", + "\n", + "non_conformant_trace = Trace(['C', 'B', 'A'])\n", + "print('-----------')\n", + "print('Constraint: A.*B.*C')\n", + "print('Trace:' + str(non_conformant_trace.nodes))\n", + "print(explainer.counterfactual_expl(non_conformant_trace))\n", + "print(explainer.minimal_expl(non_conformant_trace))\n", + "\n", + "non_conformant_trace = Trace(['A','A','C'])\n", + "print('-----------')\n", + "print('Constraint: A.*B.*C')\n", + "print('Trace:' + str(non_conformant_trace.nodes))\n", + "print(explainer.counterfactual_expl(non_conformant_trace))\n", + "print(explainer.minimal_expl(non_conformant_trace))\n", + "\n", + "\n", + "non_conformant_trace = Trace(['A','A','C','A','TEST','A','C', 'X', 'Y']) \n", + "print('-----------')\n", + "print('Constraint: A.*B.*C')\n", + "print('Trace:' + str(non_conformant_trace.nodes))\n", + "#print(explainer.counterfactual_expl(non_conformant_trace))\n", + "#print(explainer.minimal_expl(non_conformant_trace))\n", + "\n", + "\n", + "explainer.remove_constraint(0)\n", + "explainer.add_constraint('AC')\n", + "non_conformant_trace = Trace(['A', 'X', 'C']) #Substraction\n", + "print('-----------')\n", + "print('Constraint: AC')\n", + "print('Trace:' + str(non_conformant_trace.nodes))\n", + "print(explainer.counterfactual_expl(non_conformant_trace))\n", + "print(explainer.minimal_expl(non_conformant_trace))\n", + "print('-----------')\n", + "\n", + "explainer.add_constraint('B.*A.*B.*C')\n", + "explainer.add_constraint('A.*B.*C.*')\n", + "explainer.add_constraint('A.*D.*B*')\n", + "explainer.add_constraint('A[^D]*B')\n", + "explainer.add_constraint('B.*[^X].*')\n", + "non_conformant_trace = Trace(['A', 'X', 'C']) #Substraction\n", + "for con in explainer.constraints:\n", + " print(f'constraint: {con}')\n", + "print('Trace:' + str(non_conformant_trace.nodes))\n", + 
"print(explainer.counterfactual_expl(non_conformant_trace))\n", + "print(explainer.minimal_expl(non_conformant_trace))\n", + "\n", + "\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Step 5: Event Logs and Shapely values\n", + "\n", + "The event logs in this context is built with traces, here's how you set them up." + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Conformance rate: 0.2\n", + "Contribution ^A: 0.5\n", + "Contribution C$: 0.30000000000000004\n" + ] + } + ], + "source": [ + "from explainer import EventLog\n", + "\n", + "event_log = EventLog()\n", + "trace1 = Trace(['A', 'B', 'C'])\n", + "trace2 = Trace(['B', 'C'])\n", + "trace3 = Trace(['A', 'B'])\n", + "trace4 = Trace(['B'])\n", + "\n", + "event_log.add_trace(trace1, 5) # The second is how many traces you'd like to add, leave blank for 1\n", + "event_log.add_trace(trace2, 10)\n", + "event_log.add_trace(trace3, 5)\n", + "event_log.add_trace(trace4, 5)\n", + "\n", + "\n", + "exp = Explainer()\n", + "exp.add_constraint(\"^A\")\n", + "exp.add_constraint(\"C$\")\n", + "print(\"Conformance rate: \"+ str(exp.determine_conformance_rate(event_log)))\n", + "print('Contribution ^A:', exp.determine_shapley_value(event_log, exp.constraints, 0))\n", + "print('Contribution C$:', exp.determine_shapley_value(event_log, exp.constraints, 1))\n" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "conformant AC :True\n", + "Conformance rate: 0.14\n", + "Contribution C$: 0.21\n", + "Contribution ^A: 0.36\n", + "Contribution B+: 0.29\n" + ] + } + ], + "source": [ + "exp = Explainer()\n", + "event_log = EventLog()\n", + "trace1 = Trace(['A', 'B', 'C'])\n", + "trace2 = Trace(['B', 'C'])\n", + "trace3 = Trace(['A', 'B'])\n", + "trace4 = Trace(['B'])\n", + "trace5 = Trace(['A', 
'C'])\n", + "\n", + "\n", + "event_log.add_trace(trace1, 5) # The second is how many traces you'd like to add, leave blank for 1\n", + "event_log.add_trace(trace2, 10)\n", + "event_log.add_trace(trace3, 5)\n", + "event_log.add_trace(trace4, 5)\n", + "event_log.add_trace(trace5, 10)\n", + "\n", + "\n", + "exp = Explainer()\n", + "exp.add_constraint(\"C$\")\n", + "exp.add_constraint(\"^A\")\n", + "exp.add_constraint(\"B+\")\n", + "print(\"conformant AC :\" + str(exp.conformant(trace5)))\n", + "print(\"Conformance rate: \"+ str(round(exp.determine_conformance_rate(event_log), 2)))\n", + "print('Contribution C$:', round(exp.determine_shapley_value(event_log, exp.constraints, 0), 2))\n", + "print('Contribution ^A:', round(exp.determine_shapley_value(event_log, exp.constraints, 1), 2))\n", + "print('Contribution B+:', round(exp.determine_shapley_value(event_log, exp.constraints, 2), 2))\n", + "\n" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.12" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/setup.py b/setup.py index dffd6fe..2882653 100644 --- a/setup.py +++ b/setup.py @@ -1,4 +1,5 @@ """Setup for running the bpmnconstraints script.""" + import setuptools with open("README.md", encoding="utf-8") as file: diff --git a/tests/explainer/explainer_test.py b/tests/explainer/explainer_test.py new file mode 100644 index 0000000..e22fb32 --- /dev/null +++ b/tests/explainer/explainer_test.py @@ -0,0 +1,261 @@ +from explainer.explainer import * + + +# Test 1: Adding and checking constraints +def test_add_constraint(): + explainer = Explainer() + explainer.add_constraint("A.*B.*C") + assert "A.*B.*C" in explainer.constraints, "Constraint 
'A.*B.*C' should be added." + + +# Test 2: Removing constraints +def test_remove_constraint(): + explainer = Explainer() + explainer.add_constraint("A.*B.*C") + explainer.add_constraint("B.*C") + explainer.remove_constraint(0) + assert ( + "A.*B.*C" not in explainer.constraints, + ), "Constraint 'A.*B.*C' should be removed." + + +# Test 3: Activation of constraints +def test_activation(): + trace = Trace(["A", "B", "C"]) + explainer = Explainer() + explainer.add_constraint("A.*B.*C") + assert explainer.activation(trace), "The trace should activate the constraint." + + +# Test 4: Checking conformance of traces +def test_conformance(): + trace = Trace(["A", "B", "C"]) + explainer = Explainer() + explainer.add_constraint("A.*B.*C") + assert explainer.conformant(trace), "The trace should be conformant." + + +# Test 5: Non-conformance explanation +def test_non_conformance_explanation(): + trace = Trace(["C", "A", "B"]) + explainer = Explainer() + explainer.add_constraint("A.*B.*C") + explanation = explainer.minimal_expl(trace) + assert "violated" in explanation, "The explanation should indicate a violation." + + +# Test 6: Overlapping constraints +def test_overlapping_constraints(): + trace = Trace(["A", "B", "A", "C"]) + explainer = Explainer() + explainer.add_constraint("A.*B.*C") + explainer.add_constraint("A.*A.*C") + assert explainer.conformant( + trace + ), "The trace should be conformant with overlapping constraints." + + +# Test 7: Partially meeting constraints +def test_partial_conformance(): + trace = Trace(["A", "C", "B"]) + explainer = Explainer() + explainer.add_constraint("A.*B.*C") + assert not explainer.conformant(trace), "The trace should not be fully conformant." 
+ + +# Test 8: Constraints with repeated nodes +def test_constraints_with_repeated_nodes(): + trace = Trace(["A", "A", "B", "A"]) + explainer = Explainer() + explainer.add_constraint("A.*A.*B.*A") + assert explainer.conformant( + trace + ), "The trace should conform to the constraint with repeated nodes." + + +# Test 9: Removing constraints and checking nodes list +def test_remove_constraint_and_check_nodes(): + explainer = Explainer() + explainer.add_constraint("A.*B") + explainer.add_constraint("B.*C") + explainer.remove_constraint(0) + assert ( + "A" not in explainer.nodes and "B" in explainer.nodes and "C" in explainer.nodes + ), "Node 'A' should be removed, while 'B' and 'C' remain." + + +# Test 10: Complex regex constraint +def test_complex_regex_constraint(): + trace = Trace(["A", "X", "B", "Y", "C"]) + explainer = Explainer() + explainer.add_constraint( + "A.*X.*B.*Y.*C" + ) # Specifically expects certain nodes in order + assert explainer.conformant( + trace + ), "The trace should conform to the complex regex constraint." + + +# Test 11: Constraint not covered by any trace node +def test_constraint_not_covered(): + trace = Trace(["A", "B", "C"]) + explainer = Explainer() + explainer.add_constraint("D*") # This node "D" does not exist in the trace + assert explainer.activation(trace) == [ + 0 + ], "The constraint should not be activated by the trace." + + +# Test 12: Empty trace and constraints +def test_empty_trace_and_constraints(): + trace = Trace([]) + explainer = Explainer() + explainer.add_constraint("") # Adding an empty constraint + assert explainer.conformant( + trace + ), "An empty trace should be conformant with an empty constraint." 
+ + +# Test 13: Removing non-existent constraint index +def test_remove_nonexistent_constraint(): + explainer = Explainer() + explainer.add_constraint("A.*B") + explainer.remove_constraint(10) # Non-existent index + assert ( + len(explainer.constraints) == 1 + ), "Removing a non-existent constraint should not change the constraints list." + + +# Test 14: Activation with no constraints +def test_activation_with_no_constraints(): + trace = Trace(["A", "B", "C"]) + explainer = Explainer() + assert not explainer.activation(trace), "No constraints should mean no activation." + + +# Test 15: Trace conformance against multiple constraints +def test_trace_conformance_against_multiple_constraints(): + trace1 = Trace( + ["A", "B", "D"] + ) # This trace should not be fully conformant as it only matches one constraint + trace2 = Trace( + ["A", "B", "C", "D"] + ) # This trace should be conformant as it matches both constraints + + explainer = Explainer() + explainer.add_constraint("A.*B.*C") # Both traces attempt to conform to this + explainer.add_constraint("B.*D") # And to this + + # Checking conformance + assert not explainer.conformant( + trace1 + ), "Trace1 should not be conformant as it does not satisfy all constraints." + assert explainer.conformant( + trace2 + ), "Trace2 should be conformant as it satisfies all constraints." + + +# Test 16: Conformant trace does not generate minimal explaination +def test_conformant_trace_handled_correctly(): + trace = Trace(["A", "B"]) + explainer = Explainer() + explainer.add_constraint("AB") + + assert ( + explainer.minimal_expl(trace) + == "The trace is already conformant, no changes needed." + ) + + +# Test 17: Conformant trace +def test_explainer_methods(): + trace = Trace(["A", "B", "C"]) + explainer = Explainer() + explainer.add_constraint("A.*B.*C") + explainer.add_constraint("B.*C") + + assert ( + explainer.conformant(trace) == True + ), "Test 1 Failed: Trace should be conformant." 
+ assert ( + explainer.minimal_expl(trace) + == "The trace is already conformant, no changes needed." + ), "Test 1 Failed: Incorrect minimal explanation for a conformant trace." + assert ( + explainer.counterfactual_expl(trace) + == "The trace is already conformant, no changes needed." + ), "Test 1 Failed: Incorrect counterfactual explanation for a conformant trace." + + +# Test 18: Some explaination test +def test_explaination(): + explainer = Explainer() + + conformant_trace = Trace(["A", "B", "C"]) + non_conformant_trace = Trace(["A", "C"]) + + explainer.add_constraint("A.*B.*C") + + assert explainer.conformant(non_conformant_trace) == False + assert explainer.conformant(conformant_trace) == True + assert ( + explainer.minimal_expl(non_conformant_trace) + == "Non-conformance due to: Constraint (A.*B.*C) is violated by subtrace: ('A', 'C')" + ) + assert ( + explainer.counterfactual_expl(non_conformant_trace) + == "\nAddition (Added B at position 1): A->B->C" + ) + + +# Test 19: Complex explaination test. 
+""" +This part is not very complex as of now and is very much up for change, the complexity of counterfactuals +proved to be slightly larger than expected +""" + + +def test_complex_counterfactual_explanation(): + explainer = Explainer() + + explainer.add_constraint("ABB*C") + + non_conformant_trace = Trace(["A", "C", "E", "D"]) + + counterfactual_explanation = explainer.counterfactual_expl(non_conformant_trace) + + assert ( + counterfactual_explanation + == "\nAddition (Added B at position 1): A->B->C->E->D" + ) + + +# Test 20: Event logs +def test_event_log(): + event_log = EventLog() + assert event_log != None + trace = Trace(["A", "B", "C"]) + event_log.add_trace(trace) + assert event_log.log == { + ("A", "B", "C"): 1 + } # There should be one instance of the trace in the log + event_log.add_trace(trace, 5) + assert event_log.log == { + ("A", "B", "C"): 6 + } # There should be 6 instances of the trace in the log + event_log.remove_trace(trace) + assert event_log.log == { + ("A", "B", "C"): 5 + } # There should be 5 instances of the trace + event_log.remove_trace(trace, 5) + assert event_log.log == {} # The log should be emptied + event_log.add_trace(trace, 5) + event_log.remove_trace(trace, 10) + assert event_log.log == {} # The log should be emptied + trace2 = Trace(["X", "Y", "Z"]) + event_log.add_trace(trace, 5) + event_log.add_trace(trace2, 7) + assert event_log.log == { + ("A", "B", "C"): 5, + ("X", "Y", "Z"): 7, + } # There should be several traces in the log diff --git a/tutorial/tutorial.ipynb b/tutorial/tutorial.ipynb index 3755a8d..7e7d8a1 100644 --- a/tutorial/tutorial.ipynb +++ b/tutorial/tutorial.ipynb @@ -106,7 +106,7 @@ "outputs": [ { "data": { - "image/png": "\n", + "image/png": "", "text/plain": [ "" ] @@ -488,7 +488,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.10.2" + "version": "3.12.1" } }, "nbformat": 4,