-
Notifications
You must be signed in to change notification settings - Fork 0
/
LFR_gen.py
139 lines (111 loc) · 4.55 KB
/
LFR_gen.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import os
import networkx as nx
import time
import Parameters
from Parameters import *
import Display_graph
import Run_test
SEED = 2 # Root seed used for RNG. Modify if running multiple times with the same set of parameters.
MAXTRIES = 10 # Maximum tries to generate a graph using different seeds
RECREATE_FILES = False # Whether or not to overwrite existing networks with the exact same parameters
RUN_TESTS = False # Set to True to automatically run 'Run_test.py' after the graph is generated
DISPLAY_GRAPH = False # Set to True to automatically run 'Display_graph.py' after the graph is generated
# Simple xorshift for RNG
def xorshift(seed):
x = seed
x ^= x << 13
x ^= x >> 17
x ^= x << 5
return x
# Generate one network with the specified parameters
# Returns True if it ran and False if not (if RECREATE_FILES is True the file already existed)
def generate_LFR(runNum, n, name, params, seed):
# Set the path and file name
path = os.path.join("Graphs", "LFR", name)
name = "LFR_" + name + "_" + str(n) + "_" + str(runNum)
os.makedirs(path, exist_ok = True)
# Verify if results file already exists
if os.path.isfile(os.path.join(path, name + ".txt")) and not RECREATE_FILES:
return False
params["seed"] = seed
print(runNum, n, name, seed)
start = time.time()
# Generate the graph
G = nx.LFR_benchmark_graph(n, **params)
end = time.time()
# Remove self loops
#G.remove_edges_from(nx.selfloop_edges(G))
# Return total number of communities in G
communities = list({frozenset(G.nodes[v]["community"]) for v in G})
print("Number of communities:", len(communities))
# Save the communities in a list as pairs of (node, community)
community_labels = []
for i in range(len(communities)):
for v in communities[i]:
community_labels.append((v,i))
community_labels.sort() # might not be necessary but is nice for layout.
# Export paramters and time
with open(os.path.join(path, name + "_params.txt"), "w") as file:
file.write("n: " + str(n) + "\n")
for key, value in params.items():
if key != "seed":
file.write(key + ": " + str(value) + "\n")
file.write("seed: " + str(seed) + "\n")
file.write("Number of communities: " + str(len(communities)) + "\n")
file.write("Time taken: " + str(round(end - start, 2)) + "s")
# Export graph to .txt file
nx.write_edgelist(G, os.path.join(path, name + ".txt"), data = False)
# Write the community labels to a file
labelsFile = os.path.join(path, name + "_labels.txt")
with open(labelsFile, "w") as file:
for v in community_labels:
file.write(str(v[0]) + " " + str(v[1]) + "\n")
# Write the communities to each line
with open(os.path.join(path, name + "_cmty.txt"), "w") as file:
for community in communities:
for node in community:
file.write(str(node) + " ")
file.write("\n")
# Automatically display network
if DISPLAY_GRAPH:
Display_graph.display_graph(os.path.join(path, name + ".txt"))
# Automatically run tests
if RUN_TESTS:
Run_test.run_test(name, path, labelsFile)
return True
#Start timer
totalTime = time.time()
totalFails = 0
lastError = None
for i in range(NUM_SAMPLES):
genSeed = (i+1) * SEED
for size in SIZES:
for name, params in Parameters.params.items():
#sometimes it works, sometimes it does not. Seems very dependent of the seed
tries = 0
seed = genSeed
ran = False
#Start timer
start = time.time()
while tries < MAXTRIES:
try:
ran = generate_LFR(i+1, size, name, params, seed)
tries = MAXTRIES+1
except Exception as e:
seed = xorshift(seed)
tries += 1
lastError = e
continue
except KeyboardInterrupt:
seed = xorshift(seed)
tries += 1
continue
# End timer and display time
if ran:
end = time.time()
print("Time taken:" + str(round(end - start, 2)) + "s\n")
if tries == MAXTRIES:
print("Could not generate", i+1, size, name, params)
print("ERROR: '" + str(lastError) + "'")
totalFails += 1
print("Done. Total time:", str(round(time.time() - totalTime, 2)), "s - Total fails:", totalFails)