Skip to content

Commit

Permalink
Functions for correlation matrix, copula indicator and plotting (#103)
Browse files Browse the repository at this point in the history
* initial commit for correlation

* initial commit for correlation

* initial commit for correlation

* fixed correlation plot

* added filtering based on copula indicator

* Filtering of correlated reactions based on copula

* fix copula indicator

* added unittest and fixed heatmap

* fixed plots of corr matrix

* new method for additional reaction removal

* documentation correction

* documentation correction

* corr matrix pearson fix

* correction to correlation labels output

* correction to correlation labels output symbol

* added verbose boolean variable
  • Loading branch information
SotirisTouliopoulos authored Aug 15, 2024
1 parent 0c222c4 commit 4a961e7
Show file tree
Hide file tree
Showing 5 changed files with 323 additions and 58 deletions.
47 changes: 47 additions & 0 deletions dingo/illustrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import matplotlib.pyplot as plt
import plotly.graph_objects as go
import plotly.io as pio
import plotly.express as px
from dingo.utils import compute_copula

def plot_copula(data_flux1, data_flux2, n = 5, width = 900 , height = 600, export_format = "svg"):
Expand Down Expand Up @@ -82,3 +83,49 @@ def plot_histogram(reaction_fluxes, reaction, n_bins=40):
plt.axis([np.amin(reaction_fluxes), np.amax(reaction_fluxes), 0, np.amax(n) * 1.2])

plt.show()



def plot_corr_matrix(corr_matrix, reactions, removed_reactions=[], format="svg"):
"""A Python function to plot the heatmap of a model's pearson correlation matrix.
Keyword arguments:
corr_matrix -- A matrix produced from the "correlated_reactions" function
reactions -- A list with the model's reactions
removed_reactions -- A list with the removed reactions in case of a preprocess.
If provided removed reactions are not plotted.
"""

sns_colormap = [[0.0, '#3f7f93'],
[0.1, '#6397a7'],
[0.2, '#88b1bd'],
[0.3, '#acc9d2'],
[0.4, '#d1e2e7'],
[0.5, '#f2f2f2'],
[0.6, '#f6cdd0'],
[0.7, '#efa8ad'],
[0.8, '#e8848b'],
[0.9, '#e15e68'],
[1.0, '#da3b46']]

if removed_reactions != 0:
for reaction in reactions:
index = reactions.index(reaction)
if reaction in removed_reactions:
reactions[index] = None

fig = px.imshow(corr_matrix,
color_continuous_scale = sns_colormap,
x = reactions, y = reactions, origin="upper")

fig.update_layout(
xaxis=dict(tickfont=dict(size=5)),
yaxis=dict(tickfont=dict(size=5)),
width=900, height=900, plot_bgcolor="rgba(0,0,0,0)")

fig.update_traces(xgap=1, ygap=1, hoverongaps=False)

fig.show()

fig_name = "CorrelationMatrix." + format
pio.write_image(fig, fig_name, scale=2)
133 changes: 79 additions & 54 deletions dingo/preprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,39 @@
import cobra
import cobra.manipulation
from collections import Counter
from dingo import MetabolicNetwork
from dingo import MetabolicNetwork, PolytopeSampler
from dingo.utils import correlated_reactions
import numpy as np


class PreProcess:

def __init__(self, model, tol=1e-6, open_exchanges=False):
def __init__(self, model, tol = 1e-6, open_exchanges = False, verbose = False):

"""
model parameter gets a cobra model as input
model -- parameter gets a cobra model as input
tol parameter gets a cutoff value used to classify
zero-flux and mle reactions and compare FBA solutions
before and after reactions removal
tol -- parameter gets a cutoff value used to classify
zero-flux and mle reactions and compare FBA solutions
before and after reactions removal
open_exchanges parameter is used in the function that identifies blocked reactions
It controls whether or not to open all exchange reactions to very high flux ranges
open_exchanges -- parameter is used in the function that identifies blocked reactions
It controls whether or not to open all exchange reactions
to very high flux ranges.
verbose -- A boolean type variable that if True
additional information for preprocess is printed.
"""

self._model = model
self._tol = tol

if self._tol > 1e-6:
print("Tolerance value set to",self._tol,"while default value is 1e-6. A looser check will be performed")


self._open_exchanges = open_exchanges
self._verbose = verbose

if self._tol > 1e-6 and verbose == True:
print("Tolerance value set to",self._tol,"while default value is 1e-6. A looser check will be performed")

self._objective = self._objective_function()
self._initial_reactions = self._initial()
self._reaction_bounds_dict = self._reaction_bounds_dictionary()
Expand Down Expand Up @@ -170,10 +177,11 @@ def reduce(self, extend=False):
When the "extend" parameter is set to True, the function performes
an additional check to remove further reactions. These reactions
are the ones that if knocked-down, they do not affect the value
of the objective function. These reactions are removed simultaneously
from the model. If this simultaneous removal produces an infesible
solution (or a solution of 0) to the objective function,
these reactions are restored with their initial bounds.
of the objective function. Reactions are removed in an ordered way.
The ones with the least overall correlation in a correlation matrix
are removed first. These reactions are removed one by one from the model.
If this removal produces an infeasible solution (or a solution of 0)
to the objective function, these reactions are restored to their initial bounds.
A dingo-type tuple is then created from the cobra model
using the "cobra_dingo_tuple" function.
Expand Down Expand Up @@ -201,58 +209,75 @@ def reduce(self, extend=False):

elif extend == False:

print(len(self._removed_reactions), "of the", len(self._initial_reactions), \
"reactions were removed from the model with extend set to", extend)
if self._verbose == True:
print(len(self._removed_reactions), "of the", len(self._initial_reactions), \
"reactions were removed from the model with extend set to", extend)

# call this functon to convert cobra to dingo model
self._dingo_model = MetabolicNetwork.from_cobra_model(self._model)
return self._removed_reactions, self._dingo_model

elif extend == True:

# find additional reactions with a possibility of removal
additional_removed_reactions_list = []
additional_removed_reactions_count = 0
for reaction in remained_reactions:

fba_solution_before = self._model.optimize().objective_value
reduced_dingo_model = MetabolicNetwork.from_cobra_model(self._model)
reactions = reduced_dingo_model.reactions
sampler = PolytopeSampler(reduced_dingo_model)
steady_states = sampler.generate_steady_states()

# calculate correlation matrix with additional filtering from copula indicator
corr_matrix = correlated_reactions(
steady_states,
pearson_cutoff = 0,
indicator_cutoff = 0,
cells = 10,
cop_coeff = 0.3,
lower_triangle = False)

# perform a knock-out and check the output
self._model.reactions.get_by_id(reaction).lower_bound = 0
self._model.reactions.get_by_id(reaction).upper_bound = 0

fba_solution_after = self._model.optimize().objective_value
# convert pearson values to absolute values
abs_array = abs(corr_matrix)
# sum absolute pearson values per row
sum_array = np.sum((abs_array), axis=1)
# get indices of ordered sum values
order_sum_indices = np.argsort(sum_array)

if fba_solution_after != None:
if (abs(fba_solution_after - fba_solution_before) < tol):
self._removed_reactions.append(reaction)
additional_removed_reactions_list.append(reaction)
additional_removed_reactions_count += 1

self._model.reactions.get_by_id(reaction).upper_bound = self._reaction_bounds_dict[reaction][1]
self._model.reactions.get_by_id(reaction).lower_bound = self._reaction_bounds_dict[reaction][0]
fba_solution_before = self._model.optimize().objective_value

# count additional reactions with a possibility of removal
additional_removed_reactions_count = 0

# compare FBA solution before and after the removal of additional reactions
fba_solution_initial = self._model.optimize().objective_value
self._remove_model_reactions()
fba_solution_final = self._model.optimize().objective_value


# if FBA solution after removal is infeasible or altered
# restore the initial reactions bounds
if (fba_solution_final == None) | (abs(fba_solution_final - fba_solution_initial) > tol):
for reaction in additional_removed_reactions_list:
self._model.reactions.get_by_id(reaction).bounds = self._reaction_bounds_dict[reaction]
self._removed_reactions.remove(reaction)
print(len(self._removed_reactions), "of the", len(self._initial_reactions), \
"reactions were removed from the model with extend set to", extend)
# find additional reactions with a possibility of removal
for index in order_sum_indices:
if reactions[index] in remained_reactions:
reaction = reactions[index]

# perform a knock-out and check the output
self._model.reactions.get_by_id(reaction).lower_bound = 0
self._model.reactions.get_by_id(reaction).upper_bound = 0

try:
fba_solution_after = self._model.optimize().objective_value
if (abs(fba_solution_after - fba_solution_before) > tol):
# restore bounds
self._model.reactions.get_by_id(reaction).bounds = self._reaction_bounds_dict[reaction]

# if system has no solution
except:
self._model.reactions.get_by_id(reaction).bounds = self._reaction_bounds_dict[reaction]

finally:
if (fba_solution_after != None) and (abs(fba_solution_after - fba_solution_before) < tol):
self._removed_reactions.append(reaction)
additional_removed_reactions_count += 1
else:
# restore bounds
self._model.reactions.get_by_id(reaction).bounds = self._reaction_bounds_dict[reaction]

else:

if self._verbose == True:
print(len(self._removed_reactions), "of the", len(self._initial_reactions), \
"reactions were removed from the model with extend set to", extend)


print(additional_removed_reactions_count, "additional reaction(s) removed")
# call this functon to convert cobra to dingo model
self._dingo_model = MetabolicNetwork.from_cobra_model(self._model)
return self._removed_reactions, self._dingo_model
140 changes: 140 additions & 0 deletions dingo/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,3 +201,143 @@ def get_matrices_of_full_dim_polytope(A, b, Aeq, beq):
print("gmscale failed to compute a good scaling.")

return A, b, N, N_shift



def correlated_reactions(steady_states, reactions=[], pearson_cutoff = 0.90, indicator_cutoff = 10,
cells = 10, cop_coeff = 0.3, lower_triangle = True, verbose = False):
"""A Python function to calculate the pearson correlation matrix of a model
and filter values based on the copula's indicator
Keyword arguments:
steady_states -- A numpy array of the generated steady states fluxes
reactions -- A list with the model's reactions
pearson_cutoff -- A cutoff to filter reactions based on pearson coefficient
indicator_cutoff -- A cutoff to filter reactions based on indicator value
cells -- Number of cells to compute the copula
cop_coeff -- A value that narrows or widens the width of the copula's diagonal
lower_triangle -- A boolean variable that if True plots only the lower triangular matrix
verbose -- A boolean variable that if True additional information is printed as an output.
"""

if cop_coeff > 0.4 or cop_coeff < 0.2:
raise Exception("Input value to cop_coeff parameter must be between 0.2 and 0.4")

# calculate coefficients to access red and blue copula mass
cop_coeff_1 = cop_coeff
cop_coeff_2 = 1 - cop_coeff
cop_coeff_3 = 1 + cop_coeff

# compute correlation matrix
corr_matrix = np.corrcoef(steady_states, rowvar=True)

# replace not assigned values with 0
corr_matrix[np.isnan(corr_matrix)] = 0

# create a copy of correlation matrix to replace/filter values
filtered_corr_matrix = corr_matrix.copy()

# find indices of correlation matrix where correlation does not occur
no_corr_indices = np.argwhere((filtered_corr_matrix < pearson_cutoff) & (filtered_corr_matrix > -pearson_cutoff))

# replace values from the correlation matrix that do not overcome
# the pearson cutoff with 0
for i in range(0, no_corr_indices.shape[0]):
index1 = no_corr_indices[i][0]
index2 = no_corr_indices[i][1]

filtered_corr_matrix[index1, index2] = 0

# if user does not provide an indicator cutoff then do not proceed
# with the filtering of the correlation matrix
if indicator_cutoff == 0:
if lower_triangle == True:
filtered_corr_matrix[np.triu_indices(filtered_corr_matrix.shape[0], 1)] = np.nan
np.fill_diagonal(filtered_corr_matrix, 1)
return filtered_corr_matrix
else:
np.fill_diagonal(filtered_corr_matrix, 1)
return filtered_corr_matrix
else:
# a dictionary that will store for each filtered reaction combination,
# the pearson correlation value, the copula's indicator value
# and the correlation classification
indicator_dict = {}

# keep only the lower triangle
corr_matrix = np.tril(corr_matrix)
# replace diagonal values with 0
np.fill_diagonal(corr_matrix, 0)

# find indices of correlation matrix where correlation occurs
corr_indices = np.argwhere((corr_matrix > pearson_cutoff) | (corr_matrix < -pearson_cutoff))

# compute copula for each set of correlated reactions
for i in range(0, corr_indices.shape[0]):

index1 = corr_indices[i][0]
index2 = corr_indices[i][1]

reaction1 = reactions[index1]
reaction2 = reactions[index2]

flux1 = steady_states[index1]
flux2 = steady_states[index2]

copula = compute_copula(flux1, flux2, cells)
rows, cols = copula.shape

red_mass = 0
blue_mass = 0
indicator = 0

for row in range(rows):
for col in range(cols):
# values in the diagonal
if ((row-col >= -cop_coeff_1*rows) & (row-col <= cop_coeff_1*rows)):
# values near the top left and bottom right corner
if ((row+col < cop_coeff_2*rows) | (row+col > cop_coeff_3*rows)):
red_mass = red_mass + copula[row][col]
else:
# values near the top right and bottom left corner
if ((row+col >= cop_coeff_2*rows-1) & (row+col <= cop_coeff_3*rows-1)):
blue_mass = blue_mass + copula[row][col]

indicator = (red_mass+1e-9) / (blue_mass+1e-9)

# classify specific pair of reactions as positive or negative
# correlated based on indicator cutoff
if indicator > indicator_cutoff:
pearson = filtered_corr_matrix[index1, index2]
indicator_dict[reaction1 + "~" + reaction2] = {'pearson': pearson,
'indicator': indicator,
'classification': "positive"}

elif indicator < 1/indicator_cutoff:
pearson = filtered_corr_matrix[index1, index2]
indicator_dict[reaction1 + "~" + reaction2] = {'pearson': pearson,
'indicator': indicator,
'classification': "negative"}

# if they do not overcome the cutoff replace their corresponding
# value in the correlation matrix with 0
else:
filtered_corr_matrix[index1, index2] = 0
filtered_corr_matrix[index2, index1] = 0
pearson = filtered_corr_matrix[index1, index2]
indicator_dict[reaction1 + "~" + reaction2] = {'pearson': pearson,
'indicator': indicator,
'classification': "no correlation"}

if verbose == True:
print("Completed process of",i+1,"from",corr_indices.shape[0],"copulas")

if lower_triangle == True:
filtered_corr_matrix[np.triu_indices(filtered_corr_matrix.shape[0], 1)] = np.nan
np.fill_diagonal(filtered_corr_matrix, 1)
return filtered_corr_matrix, indicator_dict

else:
np.fill_diagonal(filtered_corr_matrix, 1)
return filtered_corr_matrix, indicator_dict

Loading

0 comments on commit 4a961e7

Please sign in to comment.