utils.py
from itertools import combinations
import pandas as pd
import networkx as nx
import random
import scipy.special as sp
import numpy as np
import os
from scipy.stats import bernoulli
import warnings
def powerset(items):
"""computes power set of iterable object items
Args:
items: iterables object, e.g. list, array
Returns:
power set of items
"""
combo = []
for r in range(len(items) + 1):
# use a list to coerce an actual list from the combinations generator
combo.append(list(combinations(items, r)))
return combo
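# Usage sketch (illustrative, not part of the original module): powerset groups the
# subsets by size, so flattening is needed to obtain a plain power set.
#
#   subsets = powerset(["a", "b"])
#   # -> [[()], [('a',), ('b',)], [('a', 'b')]]
#   flat = [s for size_group in subsets for s in size_group]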
def d_separation(g, y, g2=None, mc=None, random_state=None):
"""Test d-separation of each single node and y given every possible conditioning set in graph g
Args:
g : nx.DiGraph
y : target node with respect to which d-separation is tested
g2: potential second graph to be tested, that must contain the same nodes as g (typically an estimate of g)
mc : if int given, mc sampling a subset of d-separations to be tested, recommended for large graphs
random_state : seed for random and np.random when mc is not None
Returns:
if mc is None:
pandas.DataFrame of Bools for d-separation for every node except y
if mc is not None:
Array of Bools for d-separation; it cannot be traced back which d-separations were tested explicitly
"""
# list of nodes (strings)
predictors = list(g.nodes)
# sort list to get consistent results across different graphs learned on same features (when using mc)
predictors.sort()
# remove the target from list of predictors
predictors.remove(y)
n = len(predictors)
# number of possible d-separations between one feature and target, i.e. number of potential conditioning sets
if mc is None:
# number of potential d-separations per node
no_d_seps = (2 ** (n-1))
# warn user if number of d-separation tests is large
if no_d_seps > 1000000:
warnings.warn("Warning: No. of d-separation tests per node > 1M, can lead to large runtime")
# initiate dataframe for all d-separations
d_separations = pd.DataFrame(index=predictors, columns=range(no_d_seps))
if mc is not None:
if random_state is not None:
random.seed(random_state)
np.random.seed(random_state)
rng = np.random.default_rng(seed=random_state)
else:
rng = np.random.default_rng()
# initiate vector to store True/False for d-separations (cannot track nodes and conditioning sets)
d_seps_bool = []
if g2 is not None:
d_seps_bool_2 = []
# get a vector of probabilities to draw the size of the conditioning set; note that for n-1 potential
# deconfounders there are n different sizes of conditioning sets because of the empty set
probs = []
for jj in range(n):
probs.append((sp.comb(n-1, jj)) / (2**(n-1)))
k = 0
while k < mc:
# draw index for feature of interest
ind = random.randint(0, n-1)
# retrieve feature of interest
node = predictors[ind]
# list of all features but feature of interest
deconfounders = list(g.nodes)
deconfounders.remove(y)
deconfounders.remove(node)
# sample a conditioning set from deconfounders
# draw a cardinality
card = np.random.choice(np.arange(n), p=probs)
if card == 0:
# test d-separation with empty set as conditioning set
cond_set = set()
d_seps_bool.append(nx.d_separated(g, {node}, {y}, cond_set))
if g2 is not None:
d_seps_bool_2.append(nx.d_separated(g2, {node}, {y}, cond_set))
else:
                # draw 'card' distinct indices from range(n-1) for the conditioning set
indices = rng.choice(n-1, size=card, replace=False)
cond_set = set()
for ii in range(len(indices)):
                    # index of the deconfounder to add to the conditioning set
index = indices[ii]
cond_set.add(deconfounders[index])
d_seps_bool.append(nx.d_separated(g, {node}, {y}, cond_set))
if g2 is not None:
d_seps_bool_2.append(nx.d_separated(g2, {node}, {y}, cond_set))
k += 1
if g2 is not None:
return d_seps_bool, d_seps_bool_2
else:
return d_seps_bool # vector of booleans
else:
if g2 is not None:
print("Note: g2 only for Monte Carlo inference, will not be regarded here")
for i in range(n):
# test d-separation w.r.t. target using all conditional sets possible
# for current predictor at i-th position in predictors
node = predictors[i]
deconfounders = list(g.nodes)
deconfounders.remove(y)
deconfounders.remove(node)
power_set = powerset(deconfounders)
j = 0
while j < no_d_seps:
for k in range(len(power_set)):
# k == 0 refers to the empty set in power_set
if k == 0:
cond_set = set()
d_separations.iloc[i, j] = nx.d_separated(g, {node}, {y}, cond_set)
j += 1
else:
for jj in range(len(power_set[k])):
cond_set = {power_set[k][jj][0]}
for m in range(len(power_set[k][jj]) - 1):
cond_set.add(power_set[k][jj][m + 1])
d_separations.iloc[i, j] = nx.d_separated(
g, {node}, {y}, cond_set
)
j += 1
return d_separations # df with Boolean indicators of d-separation for every predictor
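# Usage sketch (hedged; the chain graph below is a made-up example, not part of the module):
#
#   g_example = nx.DiGraph([("1", "2"), ("2", "3")])
#   # exhaustive mode: DataFrame indexed by the predictors "1" and "2"
#   df_dseps = d_separation(g_example, y="3")
#   # Monte Carlo mode: list of 10 sampled d-separation indicators
#   sampled = d_separation(g_example, y="3", mc=10, random_state=42)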
# compute the number of d-separation statements from n
def dsep_mb(n, mb):
"""computes a lower bound for the number of d-separation statements between a target node
and every other node in the graph given the size of the target node's Markov blanket
Args:
n: number of nodes in graph
mb: size of Markov blanket of target
Returns:
Lower bound for number of d-separations
"""
# number of nodes other than y and Markov blanket: n-1-mb
# number of d-separations for such nodes 2**(n-2-mb)
return (n - 1 - mb) * 2 ** (n - 2 - mb)
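# Worked example (illustrative): with n = 10 nodes and a Markov blanket of size mb = 3,
# the bound is (10 - 1 - 3) * 2 ** (10 - 2 - 3) = 6 * 32 = 192.
#
#   assert dsep_mb(10, 3) == 192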
def dsep_degree(n, max_degree, sink=False):
"""computes a lower bound for the number of d-separation statements between a target node
and every other node in the graph given the max degree of a node
Args:
n : number of nodes in graph
        max_degree : maximal degree of a node in the graph (max. number of edges incident to a node)
sink : Bool, whether target is sink node or not
Returns:
Lower bound for number of d-separations
"""
if sink is False:
# maximal size of Markov blanket
max_mb = max_degree + max_degree ** 2
return dsep_mb(n, max_mb)
else:
max_mb = max_degree
return dsep_mb(n, max_mb)
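# Worked example (illustrative): for n = 10 and max_degree = 2, the Markov blanket of a
# non-sink target has size at most 2 + 2 ** 2 = 6, giving dsep_mb(10, 6) = 3 * 4 = 12.
#
#   assert dsep_degree(10, 2) == 12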
def potential_dseps(n):
"""For a graph of size n, return the maximal number of d-separations between each node and a potentially
dedicated target node
Args:
n: number of nodes in graph
Return:
Number of potential d-separation statements (float)
"""
return (n - 1) * (2 ** (n - 2))
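# Worked example (illustrative): for n = 10, (10 - 1) * 2 ** (10 - 2) = 9 * 256 = 2304.
#
#   assert potential_dseps(10) == 2304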
def create_folder(directory):
"""Creates directory as specified in function argument if not already existing
Args:
directory: string specifying the directory
"""
try:
if not os.path.exists(directory):
os.makedirs(directory)
except OSError:
print("Error: Creating directory: " + directory)
def create_amat(n, p):
"""Create a random adjacency matrix of size n x n
Args:
n: number of nodes
p: probability of existence of an edge for each pair of nodes
Returns:
Adjacency matrix as pd.DataFrame
"""
# create col_names
variables = []
for i in range(n):
variables.append(str(i+1))
# create df for amat
amat = pd.DataFrame(columns=variables, index=variables)
    for j in range(n):
        for k in range(n):
            # draw each potential edge from a Bernoulli distribution with success probability p
            amat.iloc[j, k] = bernoulli.rvs(p)
    return amat
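# Usage sketch (illustrative): draw a random 0-1 adjacency matrix; note that the result is
# not necessarily acyclic, so post-processing may be needed if a DAG is required.
#
#   amat_example = create_amat(4, 0.3)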
def exact(g_true, g_est, y):
    """Exhaustively compare all d-separations w.r.t. y between the true graph and an estimated graph
    Args:
        g_true: true graph (nx.DiGraph)
        g_est: estimated graph (nx.DiGraph) containing the same nodes as g_true
        y: target node
    Returns:
        tp, tn, fp, fn and the total numbers of d-separated and d-connected statements in the true graph
    """
    # the two dataframes of d-separations
true_dseps = d_separation(g_true, y)
est_dseps = d_separation(g_est, y)
# now compare every entry
tp = 0
tn = 0
fp = 0
fn = 0
for i in range(true_dseps.shape[0]):
for j in range(true_dseps.shape[1]):
if true_dseps.iloc[i, j] == est_dseps.iloc[i, j]:
if true_dseps.iloc[i, j] is True:
tp += 1
else:
tn += 1
else:
if true_dseps.iloc[i, j] is True:
fn += 1
else:
fp += 1
# total number of d-separations in true graph
d_separated_total = tp + fn
d_connected_total = tn + fp
return tp, tn, fp, fn, d_separated_total, d_connected_total
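# Usage sketch (hedged; the two small graphs are made-up examples, not part of the module):
#
#   g_true_ex = nx.DiGraph([("1", "2"), ("2", "3")])
#   g_est_ex = nx.DiGraph([("1", "3"), ("2", "3")])
#   tp, tn, fp, fn, n_sep, n_con = exact(g_true_ex, g_est_ex, y="3")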
def approx(g_true, g_est, y, mc=None, rand_state=None):
    """Compare a Monte Carlo sample of d-separations w.r.t. y between the true graph and an estimated graph
    Args:
        g_true: true graph (nx.DiGraph)
        g_est: estimated graph (nx.DiGraph) containing the same nodes as g_true
        y: target node
        mc: number of d-separations to sample (passed to d_separation)
        rand_state: seed passed to d_separation as random_state
    Returns:
        tp, tn, fp, fn and the total numbers of d-separated and d-connected statements among the tested ones
    """
    true_dseps, est_dseps = d_separation(g_true, y, g2=g_est, mc=mc, random_state=rand_state)
# now compare every entry
tp = 0
tn = 0
fp = 0
fn = 0
for i in range(len(true_dseps)):
if true_dseps[i] == est_dseps[i]:
if true_dseps[i] is True:
tp += 1
else:
tn += 1
else:
if true_dseps[i] is True:
fn += 1
else:
fp += 1
    # total number of d-separations among the tested statements (note: only a sample when approximated via mc)
d_separated_total = tp + fn
d_connected_total = tn + fp
return tp, tn, fp, fn, d_separated_total, d_connected_total
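# Usage sketch (hedged; the graphs are made-up examples, not part of the module):
#
#   g_true_ex = nx.DiGraph([("1", "2"), ("2", "3")])
#   g_est_ex = nx.DiGraph([("1", "3"), ("2", "3")])
#   tp, tn, fp, fn, n_sep, n_con = approx(g_true_ex, g_est_ex, y="3", mc=100, rand_state=42)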
def convert_amat(df, col_names=False):
"""Convert adjacency matrix of Bools to 0-1 for use in networkx package
Args:
df: adjacency matrix
col_names: toggle overriding of column names with strings starting from "1"
"""
if col_names:
col_names_str = []
for k in range(len(df.columns)):
col_names_str.append(str(k+1))
df.columns = col_names_str
    mapping_rf = {False: 0, True: 1}
    for j in df.columns:
        # map the Boolean entries of each column to 0/1
        df[j] = df[j].replace(mapping_rf)
    # align the row index with the column names for use in the networkx package
    df = df.set_axis(df.columns, axis=0)
    # return the modified adjacency matrix
    return df
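# Usage sketch (illustrative): convert a Boolean adjacency matrix and build a graph from it.
#
#   amat_bool = pd.DataFrame([[False, True], [False, False]], columns=["1", "2"], index=["1", "2"])
#   amat_01 = convert_amat(amat_bool)
#   g_from_amat = nx.from_pandas_adjacency(amat_01, create_using=nx.DiGraph)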