From 0219b3f90eb9e3978c5c94122880a1d4e4add579 Mon Sep 17 00:00:00 2001 From: weilycoder Date: Tue, 10 Dec 2024 22:47:49 +0800 Subject: [PATCH 1/6] Add SwitchGraph --- cyaron/graph.py | 89 +++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 87 insertions(+), 2 deletions(-) diff --git a/cyaron/graph.py b/cyaron/graph.py index 3775b47..3f8f349 100644 --- a/cyaron/graph.py +++ b/cyaron/graph.py @@ -1,9 +1,10 @@ from .utils import * from .vector import Vector import random -from typing import TypeVar, Callable +import itertools +from typing import Dict, Iterable, List, Tuple, TypeVar, Callable, Union -__all__ = ["Edge", "Graph"] +__all__ = ["Edge", "Graph", "SwitchGraph"] class Edge: @@ -34,6 +35,90 @@ def unweighted_edge(edge): return '%d %d' % (edge.start, edge.end) +class SwitchGraph: + """A graph which can switch edges quickly + """ + directed: bool + __edges: Dict[Tuple[int, int], int] + + def get_edges(self): + ret: List[Tuple[int, int]] = [] + for k in self.__edges: + ret.extend(itertools.repeat(k, self.__edges[k])) + return ret + + def __insert(self, u: int, v: int): + if (u, v) not in self.__edges: + self.__edges[(u, v)] = 0 + self.__edges[(u, v)] += 1 + + def __remove(self, u: int, v: int): + self.__edges[(u, v)] -= 1 + if self.__edges[(u, v)] == 0: + self.__edges.pop((u, v)) + + def insert(self, u: int, v: int): + """Add edge (u, v) + """ + self.__insert(u, v) + if not self.directed and u != v: + self.__insert(v, u) + + def remove(self, u: int, v: int): + """Remove edge (u, v) + """ + self.__remove(u, v) + if not self.directed and u != v: + self.__remove(v, u) + + def __init__(self, + E: Iterable[Union[Edge, Tuple[int, int]]], + directed: bool = True): + self.directed = directed + self.__edges = {} + for e in E: + if isinstance(e, Edge): + self.__insert(e.start, e.end) + else: + self.__insert(e[0], e[1]) + + def switch(self, *, self_loop: bool = False, repeated_edges: bool = False): + """Mutates the current directed graph by swapping pairs of edges, without impacting the degree sequence. + + A switch is a general term for a small change in the structure of a graph, achieved by swapping small numbers + of edges. + + Returns: + If a switch was performed, then return True. If the switch was rejected, then return False. + """ + first, second = random.choices(list(self.__edges.keys()), + list(self.__edges.values()), + k=2) + x1, y1 = first + x2, y2 = second + + if self_loop: + if x1 == x2 or y1 == y2: + return False + else: + if {x1, y1} & {x2, y2} != set(): + return False + + if repeated_edges: + if (x1, y2) in self.__edges or (x2, y1) in self.__edges: + return False + + self.remove(x1, y1) + self.insert(x1, y2) + self.remove(x2, y2) + self.insert(x2, y1) + + return True + + def __iter__(self): + return iter(self.__edges) + + class Graph: """Class Graph: A class of the graph """ From 4192f07fd179e69cc85430d37e8c7de1d604ce9c Mon Sep 17 00:00:00 2001 From: weilycoder Date: Sat, 14 Dec 2024 21:34:07 +0800 Subject: [PATCH 2/6] Support to generate a directed graph greedily based on the degree sequence --- cyaron/graph.py | 49 ++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/cyaron/graph.py b/cyaron/graph.py index 3f8f349..0db95d8 100644 --- a/cyaron/graph.py +++ b/cyaron/graph.py @@ -2,7 +2,7 @@ from .vector import Vector import random import itertools -from typing import Dict, Iterable, List, Tuple, TypeVar, Callable, Union +from typing import Callable, Dict, Iterable, List, Sequence, Tuple, TypeVar, Union, cast __all__ = ["Edge", "Graph", "SwitchGraph"] @@ -115,6 +115,53 @@ def switch(self, *, self_loop: bool = False, repeated_edges: bool = False): return True + @staticmethod + def from_directed_degree_sequence( + degree_sequence: Sequence[Tuple[int, int]], + start_id: int = 1, + *, + self_loop: bool = False, + repeated_edges: bool = False) -> "SwitchGraph": + if any(x < 0 or y < 0 for (x, y) in degree_sequence): + raise ValueError("Degree sequence is not graphical.") + + x, y = zip(*degree_sequence) + if sum(x) != sum(y): + raise ValueError("Degree sequence is not graphical.") + + ret = SwitchGraph((), True) + + if len(degree_sequence) == 0: + return ret + + degseq = [[sout, sin, vn] + for vn, (sin, sout) in enumerate(degree_sequence, start_id)] + degseq.sort(reverse=True) + + try: + while max(s[1] for s in degseq) > 0: + kk = [i for i in range(len(degseq)) if degseq[i][1] > 0] + _, in_deg, vto = degseq[kk[0]] + degseq[kk[0]][1] = 0 + j = 0 + while in_deg: + _, _, vfrom = degseq[j] + if vto == vfrom and not self_loop: + j += 1 + _, _, vfrom = degseq[j] + while in_deg and degseq[j][0]: + in_deg -= 1 + degseq[j][0] -= 1 + ret.insert(vfrom, vto) + if not repeated_edges: + break + j += 1 + degseq.sort(reverse=True) + except IndexError: + raise ValueError("Degree sequence is not graphical.") + + return ret + def __iter__(self): return iter(self.__edges) From 2ad15a780ec8d7d350b311935964358008f66c98 Mon Sep 17 00:00:00 2001 From: weilycoder Date: Sun, 15 Dec 2024 02:00:37 +0800 Subject: [PATCH 3/6] Support undirected graph --- cyaron/graph.py | 58 ++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 52 insertions(+), 6 deletions(-) diff --git a/cyaron/graph.py b/cyaron/graph.py index 0db95d8..793da59 100644 --- a/cyaron/graph.py +++ b/cyaron/graph.py @@ -44,8 +44,9 @@ class SwitchGraph: def get_edges(self): ret: List[Tuple[int, int]] = [] for k in self.__edges: - ret.extend(itertools.repeat(k, self.__edges[k])) - return ret + if self.directed or k[0] <= k[1]: + ret.extend(itertools.repeat(k, self.__edges[k])) + return sorted(ret) def __insert(self, u: int, v: int): if (u, v) not in self.__edges: @@ -78,9 +79,9 @@ def __init__(self, self.__edges = {} for e in E: if isinstance(e, Edge): - self.__insert(e.start, e.end) + self.insert(e.start, e.end) else: - self.__insert(e[0], e[1]) + self.insert(e[0], e[1]) def switch(self, *, self_loop: bool = False, repeated_edges: bool = False): """Mutates the current directed graph by swapping pairs of edges, without impacting the degree sequence. @@ -94,8 +95,8 @@ def switch(self, *, self_loop: bool = False, repeated_edges: bool = False): first, second = random.choices(list(self.__edges.keys()), list(self.__edges.values()), k=2) - x1, y1 = first - x2, y2 = second + x1, y1 = first if self.directed else sorted(first) + x2, y2 = second if self.directed else sorted(second) if self_loop: if x1 == x2 or y1 == y2: @@ -162,6 +163,51 @@ def from_directed_degree_sequence( return ret + @staticmethod + def from_undirected_degree_sequence( + degree_sequence: Sequence[int], + start_id: int = 1, + *, + self_loop: bool = False, + repeated_edges: bool = False) -> "SwitchGraph": + if any(x < 0 for x in degree_sequence): + raise ValueError("Degree sequence is not graphical.") + + if sum(degree_sequence) % 2 != 0: + raise ValueError("Degree sequence is not graphical.") + + if len(degree_sequence) == 0: + return SwitchGraph((), False) + + degseq = [[deg, i] for i, deg in enumerate(degree_sequence, start_id)] + degseq.sort(reverse=True) + + edges: List[Tuple[int, int]] = [] + try: + while len(edges) * 2 < sum(degree_sequence): + deg, x = degseq[0] + degseq[0][0] = 0 + if self_loop: + while deg > 1: + deg -= 2 + edges.append((x, x)) + if not repeated_edges: + break + y = 1 + while deg: + while deg and degseq[y][0]: + deg -= 1 + degseq[y][0] -= 1 + edges.append((x, degseq[y][1])) + if not repeated_edges: + break + y += 1 + degseq.sort(reverse=True) + except IndexError: + raise ValueError("Degree sequence is not graphical.") + + return SwitchGraph(edges, False) + def __iter__(self): return iter(self.__edges) From cc764bce76ead69790858ca086e07739b7385d63 Mon Sep 17 00:00:00 2001 From: weilycoder Date: Sun, 15 Dec 2024 19:21:10 +0800 Subject: [PATCH 4/6] Support for generating graphs randomly based on a degree sequence --- cyaron/graph.py | 140 +++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 114 insertions(+), 26 deletions(-) diff --git a/cyaron/graph.py b/cyaron/graph.py index 793da59..30ac63b 100644 --- a/cyaron/graph.py +++ b/cyaron/graph.py @@ -1,8 +1,9 @@ from .utils import * from .vector import Vector +import math import random import itertools -from typing import Callable, Dict, Iterable, List, Sequence, Tuple, TypeVar, Union, cast +from typing import * # type: ignore __all__ = ["Edge", "Graph", "SwitchGraph"] @@ -36,8 +37,8 @@ def unweighted_edge(edge): class SwitchGraph: - """A graph which can switch edges quickly - """ + """A graph which can switch edges quickly""" + directed: bool __edges: Dict[Tuple[int, int], int] @@ -48,6 +49,13 @@ def get_edges(self): ret.extend(itertools.repeat(k, self.__edges[k])) return sorted(ret) + def edge_count(self): + val = 0 + for k in self.__edges: + if k[0] <= k[1]: + val += self.__edges[k] + return val + def __insert(self, u: int, v: int): if (u, v) not in self.__edges: self.__edges[(u, v)] = 0 @@ -59,22 +67,20 @@ def __remove(self, u: int, v: int): self.__edges.pop((u, v)) def insert(self, u: int, v: int): - """Add edge (u, v) - """ + """Add edge (u, v)""" self.__insert(u, v) if not self.directed and u != v: self.__insert(v, u) def remove(self, u: int, v: int): - """Remove edge (u, v) - """ + """Remove edge (u, v)""" self.__remove(u, v) if not self.directed and u != v: self.__remove(v, u) - def __init__(self, - E: Iterable[Union[Edge, Tuple[int, int]]], - directed: bool = True): + def __init__( + self, E: Iterable[Union[Edge, Tuple[int, int]]], directed: bool = True + ): self.directed = directed self.__edges = {} for e in E: @@ -92,9 +98,9 @@ def switch(self, *, self_loop: bool = False, repeated_edges: bool = False): Returns: If a switch was performed, then return True. If the switch was rejected, then return False. """ - first, second = random.choices(list(self.__edges.keys()), - list(self.__edges.values()), - k=2) + first, second = random.choices( + list(self.__edges.keys()), list(self.__edges.values()), k=2 + ) x1, y1 = first if self.directed else sorted(first) x2, y2 = second if self.directed else sorted(second) @@ -105,7 +111,7 @@ def switch(self, *, self_loop: bool = False, repeated_edges: bool = False): if {x1, y1} & {x2, y2} != set(): return False - if repeated_edges: + if not repeated_edges: if (x1, y2) in self.__edges or (x2, y1) in self.__edges: return False @@ -118,11 +124,13 @@ def switch(self, *, self_loop: bool = False, repeated_edges: bool = False): @staticmethod def from_directed_degree_sequence( - degree_sequence: Sequence[Tuple[int, int]], - start_id: int = 1, - *, - self_loop: bool = False, - repeated_edges: bool = False) -> "SwitchGraph": + degree_sequence: Sequence[Tuple[int, int]], + start_id: int = 1, + *, + self_loop: bool = False, + repeated_edges: bool = False + ): + """Generate a directed graph greedily based on the degree sequence.""" if any(x < 0 or y < 0 for (x, y) in degree_sequence): raise ValueError("Degree sequence is not graphical.") @@ -135,8 +143,9 @@ def from_directed_degree_sequence( if len(degree_sequence) == 0: return ret - degseq = [[sout, sin, vn] - for vn, (sin, sout) in enumerate(degree_sequence, start_id)] + degseq = [ + [sout, sin, vn] for vn, (sin, sout) in enumerate(degree_sequence, start_id) + ] degseq.sort(reverse=True) try: @@ -165,11 +174,13 @@ def from_directed_degree_sequence( @staticmethod def from_undirected_degree_sequence( - degree_sequence: Sequence[int], - start_id: int = 1, - *, - self_loop: bool = False, - repeated_edges: bool = False) -> "SwitchGraph": + degree_sequence: Sequence[int], + start_id: int = 1, + *, + self_loop: bool = False, + repeated_edges: bool = False + ): + """Generate an undirected graph greedily based on the degree sequence.""" if any(x < 0 for x in degree_sequence): raise ValueError("Degree sequence is not graphical.") @@ -504,6 +515,58 @@ def graph(point_count, edge_count, **kwargs): i += 1 return graph + @staticmethod + def from_degree_sequence( + degree_sequence: Union[Sequence[Tuple[int, int]], Sequence[int]], + n_iter: Optional[int] = None, + *, + self_loop: bool = False, + repeated_edges: bool = False, + weight_limit: Union[int, Tuple[int, int]] = (1, 1), + weight_gen: Optional[Callable[[], int]] = None, + iter_limit: int = int(1e6) + ): + if len(degree_sequence) == 0: + return Graph(0) + if isinstance(weight_limit, int): + weight_limit = (1, weight_limit) + if weight_gen is None: + weight_gen = lambda: random.randint(*weight_limit) + if isinstance(degree_sequence[0], int): + directed = False + sg = SwitchGraph.from_undirected_degree_sequence( + cast(Sequence[int], degree_sequence), + self_loop=self_loop, + repeated_edges=repeated_edges, + ) + else: + directed = True + sg = SwitchGraph.from_directed_degree_sequence( + cast(Sequence[Tuple[int, int]], degree_sequence), + self_loop=self_loop, + repeated_edges=repeated_edges, + ) + point_cnt = len(degree_sequence) + edge_cnt = sg.edge_count() + if n_iter is None: + n_iter = int( + Graph._estimate_upperbound( + point_cnt, + edge_cnt, + directed, + self_loop, + repeated_edges, + ) + / math.log(edge_cnt) + ) + n_iter = min(n_iter, iter_limit) + for _ in range(n_iter): + sg.switch(self_loop=self_loop, repeated_edges=repeated_edges) + g = Graph(len(degree_sequence), directed) + for edge in sg.get_edges(): + g.add_edge(*edge, weight=weight_gen()) + return g + @staticmethod def DAG(point_count, edge_count, **kwargs): """DAG(point_count, edge_count, **kwargs) -> Graph @@ -713,6 +776,31 @@ def _calc_max_edge(point_count, directed, self_loop): max_edge += point_count return max_edge + @staticmethod + def _estimate_comb(n: int, k: int): + try: + return float(sum(math.log(n - i) - math.log(i + 1) for i in range(k))) + except ValueError: + return 0.0 + + @staticmethod + def _estimate_upperbound( + point_count: int, + edge_count: int, + directed: bool, + self_loop: bool, + repeated_edges: bool, + ): + tot_edge = point_count * (point_count - 1) + if not directed: + tot_edge //= 2 + if self_loop: + tot_edge += point_count + if repeated_edges: + return Graph._estimate_comb(edge_count + tot_edge - 1, edge_count) + else: + return Graph._estimate_comb(tot_edge, edge_count) + @staticmethod def forest(point_count, tree_count, **kwargs): """ From faf7b3f7cb3945e3413be1bf8d438782f7557baf Mon Sep 17 00:00:00 2001 From: weilycoder Date: Sun, 15 Dec 2024 22:05:10 +0800 Subject: [PATCH 5/6] Add test for undirected graph --- cyaron/graph.py | 5 +---- cyaron/tests/graph_test.py | 15 +++++++++++++++ 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/cyaron/graph.py b/cyaron/graph.py index 30ac63b..b3de5c7 100644 --- a/cyaron/graph.py +++ b/cyaron/graph.py @@ -240,10 +240,7 @@ def edge_count(self): """edge_count(self) -> int Return the count of the edges in the graph. """ - cnt = sum(len(node) for node in self.edges) - if not self.directed: - cnt //= 2 - return cnt + return len(list(self.iterate_edges())) def to_matrix(self, **kwargs): """to_matrix(self, **kwargs) -> GraphMatrix diff --git a/cyaron/tests/graph_test.py b/cyaron/tests/graph_test.py index 55a63a5..e3e0bbe 100644 --- a/cyaron/tests/graph_test.py +++ b/cyaron/tests/graph_test.py @@ -225,3 +225,18 @@ def test_forest(self): if dsu.get_father(i) == i: count += 1 self.assertEqual(count, part_count) + + def test_from_undirected_degree_sequence(self): + get_deg_seq = lambda g: tuple(map(len, g.edges[1:])) + g1 = Graph.from_degree_sequence((2, 2, 1, 1, 1, 1)) + self.assertEqual(get_deg_seq(g1), (2, 2, 1, 1, 1, 1)) + for _ in range(8): + g0 = Graph.graph( + 100, 400, directed=False, self_loop=False, repeated_edges=False + ) + dsq = get_deg_seq(g0) + g1 = Graph.from_degree_sequence(dsq) + with self.assertRaises(ValueError): + Graph.from_degree_sequence((6, 6, 6)) + with self.assertRaises(ValueError): + Graph.from_degree_sequence((1, 1, 1)) From 2d89614f954f92c22210b9aebbe1f775a189df84 Mon Sep 17 00:00:00 2001 From: weilycoder Date: Mon, 16 Dec 2024 01:11:59 +0800 Subject: [PATCH 6/6] Add test for directed graph --- cyaron/tests/graph_test.py | 508 +++++++++++++++++++------------------ 1 file changed, 266 insertions(+), 242 deletions(-) diff --git a/cyaron/tests/graph_test.py b/cyaron/tests/graph_test.py index e3e0bbe..61ef470 100644 --- a/cyaron/tests/graph_test.py +++ b/cyaron/tests/graph_test.py @@ -1,242 +1,266 @@ -import unittest -from cyaron import Graph -from random import randint - - -class UnionFindSet: - - def __init__(self, size): - self.father = [0] + [i + 1 for i in range(size)] - - def get_father(self, node): - if self.father[node] == node: - return node - else: - self.father[node] = self.get_father(self.father[node]) - return self.father[node] - - def merge(self, l, r): - l = self.get_father(l) - r = self.get_father(r) - self.father[l] = r - - def test_same(self, l, r): - return self.get_father(l) == self.get_father(r) - - -def tarjan(graph, n): - - def new_array(len, val=0): - return [val for _ in range(len + 1)] - - instack = new_array(n, False) - low = new_array(n) - dfn = new_array(n, 0) - stap = new_array(n) - belong = new_array(n) - var = [0, 0, 0] # cnt, bc, stop - - # cnt = bc = stop = 0 - - def dfs(cur): - var[0] += 1 - dfn[cur] = low[cur] = var[0] - instack[cur] = True - stap[var[2]] = cur - var[2] += 1 - - for v in graph.edges[cur]: - if dfn[v.end] == 0: - dfs(v.end) - low[cur] = min(low[cur], low[v.end]) - elif instack[v.end]: - low[cur] = min(low[cur], dfn[v.end]) - - if dfn[cur] == low[cur]: - v = cur + 1 # set v != cur - var[1] += 1 - while v != cur: - var[2] -= 1 - v = stap[var[2]] - instack[v] = False - belong[v] = var[1] - - for i in range(n): - if dfn[i + 1] == 0: - dfs(i + 1) - - return belong - - -class TestGraph(unittest.TestCase): - - def test_self_loop(self): - graph_size = 20 - for _ in range(20): - graph = Graph.graph(graph_size, - int(graph_size * 2), - self_loop=True) - has_self_loop = max( - [e.start == e.end for e in graph.iterate_edges()]) - if has_self_loop: - break - self.assertTrue(has_self_loop) - - for _ in range(10): - graph = Graph.graph(graph_size, - int(graph_size * 2), - self_loop=False) - self.assertFalse( - max([e.start == e.end for e in graph.iterate_edges()])) - - def test_repeated_edges(self): - graph_size = 20 - for _ in range(20): - graph = Graph.graph(graph_size, - int(graph_size * 2), - repeated_edges=True) - edges = [(e.start, e.end) for e in graph.iterate_edges()] - has_repeated_edges = len(edges) > len(set(edges)) - if has_repeated_edges: - break - self.assertTrue(has_repeated_edges) - - for _ in range(10): - graph = Graph.graph(graph_size, - int(graph_size * 2), - repeated_edges=False) - edges = [(e.start, e.end) for e in graph.iterate_edges()] - self.assertEqual(len(edges), len(set(edges))) - - def test_tree_connected(self): - graph_size = 20 - for _ in range(20): - ufs = UnionFindSet(graph_size) - tree = Graph.tree(graph_size) - for edge in tree.iterate_edges(): - ufs.merge(edge.start, edge.end) - for i in range(graph_size - 1): - self.assertTrue(ufs.test_same(i + 1, i + 2)) - - def test_DAG(self): - graph_size = 20 - for _ in range(10): # test 10 times - ufs = UnionFindSet(graph_size) - graph = Graph.DAG(graph_size, - int(graph_size * 1.6), - repeated_edges=False, - self_loop=False, - loop=True) - - self.assertEqual(len(list(graph.iterate_edges())), - int(graph_size * 1.6)) - - for edge in graph.iterate_edges(): - ufs.merge(edge.start, edge.end) - for i in range(graph_size - 1): - self.assertTrue(ufs.test_same(i + 1, i + 2)) - - def test_DAG_without_loop(self): - graph_size = 20 - for _ in range(10): # test 10 times - ufs = UnionFindSet(graph_size) - graph = Graph.DAG(graph_size, - int(graph_size * 1.6), - repeated_edges=False, - self_loop=False, - loop=False) - - self.assertEqual(len(list(graph.iterate_edges())), - int(graph_size * 1.6)) - - for edge in graph.iterate_edges(): - ufs.merge(edge.start, edge.end) - for i in range(graph_size - 1): - self.assertTrue(ufs.test_same(i + 1, i + 2)) - - belong = tarjan(graph, graph_size) - self.assertEqual(max(belong), graph_size) - - def test_undirected_graph(self): - graph_size = 20 - for _ in range(10): # test 10 times - ufs = UnionFindSet(graph_size) - graph = Graph.UDAG(graph_size, - int(graph_size * 1.6), - repeated_edges=False, - self_loop=False) - - self.assertEqual(len(list(graph.iterate_edges())), - int(graph_size * 1.6)) - - for edge in graph.iterate_edges(): - ufs.merge(edge.start, edge.end) - for i in range(graph_size - 1): - self.assertTrue(ufs.test_same(i + 1, i + 2)) - - def test_DAG_boundary(self): - with self.assertRaises( - Exception, - msg= - "the number of edges of connected graph must more than the number of nodes - 1" - ): - Graph.DAG(8, 6) - Graph.DAG(8, 7) - - def test_GraphMatrix(self): - g = Graph(3, True) - edge_set = [(2, 3, 3), (3, 3, 1), (2, 3, 7), (2, 3, 4), (3, 2, 1), - (1, 3, 3)] - for u, v, w in edge_set: - g.add_edge(u, v, weight=w) - self.assertEqual(str(g.to_matrix()), "-1 -1 3\n-1 -1 4\n-1 1 1") - self.assertEqual(str(g.to_matrix(default=0)), "0 0 3\n0 0 4\n0 1 1") - # lambda val, edge: edge.weight - gcd = lambda a, b: (gcd(b, a % b) if b else a) - lcm = lambda a, b: a * b // gcd(a, b) - merge1 = lambda v, e: v if v != -1 else e.weight - merge2 = lambda val, edge: max(edge.weight, val) - merge3 = lambda val, edge: min(edge.weight, val) - merge4 = lambda val, edge: gcd(val, edge.weight) - merge5 = lambda val, edge: lcm(val, edge.weight - ) if val else edge.weight - self.assertEqual(str(g.to_matrix(merge=merge1)), - "-1 -1 3\n-1 -1 3\n-1 1 1") - self.assertEqual(str(g.to_matrix(merge=merge2)), - "-1 -1 3\n-1 -1 7\n-1 1 1") - self.assertEqual(str(g.to_matrix(default=9, merge=merge3)), - "9 9 3\n9 9 3\n9 1 1") - self.assertEqual(str(g.to_matrix(default=0, merge=merge4)), - "0 0 3\n0 0 1\n0 1 1") - self.assertEqual(str(g.to_matrix(default=0, merge=merge5)), - "0 0 3\n0 0 84\n0 1 1") - - def test_forest(self): - for i in range(10): - size = randint(1, 100) - part_count = randint(1, size) - forest = Graph.forest(size, part_count) - dsu = UnionFindSet(size) - for edge in forest.iterate_edges(): - self.assertFalse(dsu.test_same(edge.start, edge.end)) - dsu.merge(edge.start, edge.end) - count = 0 - for i in range(1, size + 1): - if dsu.get_father(i) == i: - count += 1 - self.assertEqual(count, part_count) - - def test_from_undirected_degree_sequence(self): - get_deg_seq = lambda g: tuple(map(len, g.edges[1:])) - g1 = Graph.from_degree_sequence((2, 2, 1, 1, 1, 1)) - self.assertEqual(get_deg_seq(g1), (2, 2, 1, 1, 1, 1)) - for _ in range(8): - g0 = Graph.graph( - 100, 400, directed=False, self_loop=False, repeated_edges=False - ) - dsq = get_deg_seq(g0) - g1 = Graph.from_degree_sequence(dsq) - with self.assertRaises(ValueError): - Graph.from_degree_sequence((6, 6, 6)) - with self.assertRaises(ValueError): - Graph.from_degree_sequence((1, 1, 1)) +import unittest +from cyaron import Graph +from random import randint + + +class UnionFindSet: + + def __init__(self, size): + self.father = [0] + [i + 1 for i in range(size)] + + def get_father(self, node): + if self.father[node] == node: + return node + else: + self.father[node] = self.get_father(self.father[node]) + return self.father[node] + + def merge(self, l, r): + l = self.get_father(l) + r = self.get_father(r) + self.father[l] = r + + def test_same(self, l, r): + return self.get_father(l) == self.get_father(r) + + +def tarjan(graph, n): + + def new_array(len, val=0): + return [val for _ in range(len + 1)] + + instack = new_array(n, False) + low = new_array(n) + dfn = new_array(n, 0) + stap = new_array(n) + belong = new_array(n) + var = [0, 0, 0] # cnt, bc, stop + + # cnt = bc = stop = 0 + + def dfs(cur): + var[0] += 1 + dfn[cur] = low[cur] = var[0] + instack[cur] = True + stap[var[2]] = cur + var[2] += 1 + + for v in graph.edges[cur]: + if dfn[v.end] == 0: + dfs(v.end) + low[cur] = min(low[cur], low[v.end]) + elif instack[v.end]: + low[cur] = min(low[cur], dfn[v.end]) + + if dfn[cur] == low[cur]: + v = cur + 1 # set v != cur + var[1] += 1 + while v != cur: + var[2] -= 1 + v = stap[var[2]] + instack[v] = False + belong[v] = var[1] + + for i in range(n): + if dfn[i + 1] == 0: + dfs(i + 1) + + return belong + + +class TestGraph(unittest.TestCase): + + def test_self_loop(self): + graph_size = 20 + for _ in range(20): + graph = Graph.graph(graph_size, + int(graph_size * 2), + self_loop=True) + has_self_loop = max( + [e.start == e.end for e in graph.iterate_edges()]) + if has_self_loop: + break + self.assertTrue(has_self_loop) + + for _ in range(10): + graph = Graph.graph(graph_size, + int(graph_size * 2), + self_loop=False) + self.assertFalse( + max([e.start == e.end for e in graph.iterate_edges()])) + + def test_repeated_edges(self): + graph_size = 20 + for _ in range(20): + graph = Graph.graph(graph_size, + int(graph_size * 2), + repeated_edges=True) + edges = [(e.start, e.end) for e in graph.iterate_edges()] + has_repeated_edges = len(edges) > len(set(edges)) + if has_repeated_edges: + break + self.assertTrue(has_repeated_edges) + + for _ in range(10): + graph = Graph.graph(graph_size, + int(graph_size * 2), + repeated_edges=False) + edges = [(e.start, e.end) for e in graph.iterate_edges()] + self.assertEqual(len(edges), len(set(edges))) + + def test_tree_connected(self): + graph_size = 20 + for _ in range(20): + ufs = UnionFindSet(graph_size) + tree = Graph.tree(graph_size) + for edge in tree.iterate_edges(): + ufs.merge(edge.start, edge.end) + for i in range(graph_size - 1): + self.assertTrue(ufs.test_same(i + 1, i + 2)) + + def test_DAG(self): + graph_size = 20 + for _ in range(10): # test 10 times + ufs = UnionFindSet(graph_size) + graph = Graph.DAG(graph_size, + int(graph_size * 1.6), + repeated_edges=False, + self_loop=False, + loop=True) + + self.assertEqual(len(list(graph.iterate_edges())), + int(graph_size * 1.6)) + + for edge in graph.iterate_edges(): + ufs.merge(edge.start, edge.end) + for i in range(graph_size - 1): + self.assertTrue(ufs.test_same(i + 1, i + 2)) + + def test_DAG_without_loop(self): + graph_size = 20 + for _ in range(10): # test 10 times + ufs = UnionFindSet(graph_size) + graph = Graph.DAG(graph_size, + int(graph_size * 1.6), + repeated_edges=False, + self_loop=False, + loop=False) + + self.assertEqual(len(list(graph.iterate_edges())), + int(graph_size * 1.6)) + + for edge in graph.iterate_edges(): + ufs.merge(edge.start, edge.end) + for i in range(graph_size - 1): + self.assertTrue(ufs.test_same(i + 1, i + 2)) + + belong = tarjan(graph, graph_size) + self.assertEqual(max(belong), graph_size) + + def test_undirected_graph(self): + graph_size = 20 + for _ in range(10): # test 10 times + ufs = UnionFindSet(graph_size) + graph = Graph.UDAG(graph_size, + int(graph_size * 1.6), + repeated_edges=False, + self_loop=False) + + self.assertEqual(len(list(graph.iterate_edges())), + int(graph_size * 1.6)) + + for edge in graph.iterate_edges(): + ufs.merge(edge.start, edge.end) + for i in range(graph_size - 1): + self.assertTrue(ufs.test_same(i + 1, i + 2)) + + def test_DAG_boundary(self): + with self.assertRaises( + Exception, + msg= + "the number of edges of connected graph must more than the number of nodes - 1" + ): + Graph.DAG(8, 6) + Graph.DAG(8, 7) + + def test_GraphMatrix(self): + g = Graph(3, True) + edge_set = [(2, 3, 3), (3, 3, 1), (2, 3, 7), (2, 3, 4), (3, 2, 1), + (1, 3, 3)] + for u, v, w in edge_set: + g.add_edge(u, v, weight=w) + self.assertEqual(str(g.to_matrix()), "-1 -1 3\n-1 -1 4\n-1 1 1") + self.assertEqual(str(g.to_matrix(default=0)), "0 0 3\n0 0 4\n0 1 1") + # lambda val, edge: edge.weight + gcd = lambda a, b: (gcd(b, a % b) if b else a) + lcm = lambda a, b: a * b // gcd(a, b) + merge1 = lambda v, e: v if v != -1 else e.weight + merge2 = lambda val, edge: max(edge.weight, val) + merge3 = lambda val, edge: min(edge.weight, val) + merge4 = lambda val, edge: gcd(val, edge.weight) + merge5 = lambda val, edge: lcm(val, edge.weight + ) if val else edge.weight + self.assertEqual(str(g.to_matrix(merge=merge1)), + "-1 -1 3\n-1 -1 3\n-1 1 1") + self.assertEqual(str(g.to_matrix(merge=merge2)), + "-1 -1 3\n-1 -1 7\n-1 1 1") + self.assertEqual(str(g.to_matrix(default=9, merge=merge3)), + "9 9 3\n9 9 3\n9 1 1") + self.assertEqual(str(g.to_matrix(default=0, merge=merge4)), + "0 0 3\n0 0 1\n0 1 1") + self.assertEqual(str(g.to_matrix(default=0, merge=merge5)), + "0 0 3\n0 0 84\n0 1 1") + + def test_forest(self): + for i in range(10): + size = randint(1, 100) + part_count = randint(1, size) + forest = Graph.forest(size, part_count) + dsu = UnionFindSet(size) + for edge in forest.iterate_edges(): + self.assertFalse(dsu.test_same(edge.start, edge.end)) + dsu.merge(edge.start, edge.end) + count = 0 + for i in range(1, size + 1): + if dsu.get_father(i) == i: + count += 1 + self.assertEqual(count, part_count) + + def test_from_undirected_degree_sequence(self): + get_deg_seq = lambda g: tuple(map(len, g.edges[1:])) + g1 = Graph.from_degree_sequence((2, 2, 1, 1, 1, 1)) + self.assertEqual(get_deg_seq(g1), (2, 2, 1, 1, 1, 1)) + for _ in range(8): + g0 = Graph.graph( + 100, 400, directed=False, self_loop=False, repeated_edges=False + ) + dsq = get_deg_seq(g0) + g1 = Graph.from_degree_sequence(dsq) + self.assertEqual(get_deg_seq(g1), dsq) + with self.assertRaises(ValueError): + Graph.from_degree_sequence((6, 6, 6)) + with self.assertRaises(ValueError): + Graph.from_degree_sequence((1, 1, 1)) + + def test_from_directed_degree_sequence(self): + def get_deg_seq(g: Graph): + cnt = len(g.edges) - 1 + indeg, outdeg = [0] * cnt, [0] * cnt + for edge in g.iterate_edges(): + indeg[edge.end - 1] += 1 + outdeg[edge.start - 1] += 1 + return tuple(zip(indeg, outdeg)) + + g1 = Graph.from_degree_sequence(((1, 2), (1, 1), (1, 0))) + self.assertEqual(get_deg_seq(g1), ((1, 2), (1, 1), (1, 0))) + + for _ in range(8): + g0 = Graph.graph( + 100, 400, directed=True, self_loop=False, repeated_edges=False + ) + dsq = get_deg_seq(g0) + g1 = Graph.from_degree_sequence(dsq) + self.assertEqual(get_deg_seq(g1), dsq) + + with self.assertRaises(ValueError): + Graph.from_degree_sequence(((2, 1), (0, 1)))