-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathvis_kmer_distributions.py
executable file
·158 lines (124 loc) · 5.05 KB
/
vis_kmer_distributions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
#!/usr/bin/env python
"""A collection of functions for visualizing kmer distributions
"""
import os
import sys
sys.path.append("../")
#from signalAlignLib import TemplateModel, ComplementModel
import string
import numpy as np
from sklearn.neighbors import KernelDensity
from scipy.stats import gaussian_kde
from scipy.stats import norm
def nucleotideToIndex(base):
if base == 'A':
return 0
if base == 'C':
return 1
if base == 'E':
return 2
if base == 'G':
return 3
if base == 'O':
return 4
if base == 'T':
return 5
if base == 'N':
return 6
def getKmerIndex(kmer):
"""This is the algorithm for finding the rank (index) of a kmer)
"""
alphabet = "ACEGOT"
axisLength = len(alphabet)**len(kmer)
l = axisLength/len(alphabet)
i = 0
index = 0
while l > 1:
index += l*nucleotideToIndex(kmer[i])
i += 1
l = l/len(alphabet)
index += nucleotideToIndex(kmer[-1])
return int(index)
class KmerDistribution(object):
def __init__(self, data_directory):
self.data_directory = data_directory
class KmerAlignmentHistogram(KmerDistribution):
def __init__(self, data_directory, kmer):
super(KmerAlignmentHistogram, self).__init__(data_directory=data_directory)
self.kmer = kmer
self.data_file = data_directory + "{}_hist.txt".format(self.kmer)
assert (os.path.isfile(self.data_file)), "No data for kmer {kmer}".format(kmer=self.kmer)
self.histogram = None
self.x_vals = None
self.kde_pdf = None
self.parse_histogram()
def parse_histogram(self):
self.histogram = np.loadtxt(self.data_file, dtype=np.float64)
self.histogram = [x for x in self.histogram if 0 < x < 100]
def parse_xvals(self, x_vals_file):
self.x_vals = np.loadtxt(x_vals_file, dtype=np.float64)
def make_kde(self, x_vals):
kernel = gaussian_kde(self.histogram)
KDE_PDF = kernel.evaluate(x_vals)
return KDE_PDF
class KmerHdpDistribution(KmerDistribution):
def __init__(self, data_directory, kmer):
super(KmerHdpDistribution, self).__init__(data_directory=data_directory)
self.kmer = kmer
self.data_file = data_directory + "{}_distr.txt".format(self.kmer)
assert (os.path.isfile(self.data_file)), "Didn't find distribution file at {}".format(self.data_file)
self.density = None
self.parse_density_file()
def parse_density_file(self):
self.density = np.loadtxt(self.data_file, dtype=np.float64)
def get_kmer_variants(kmer):
mC_trans = string.maketrans("C", "E")
hmC_trans = string.maketrans("C", "O")
return kmer.translate(mC_trans), kmer.translate(hmC_trans)
def get_kmer_densities(path, kmer):
#mC_trans = string.maketrans("C", "E")
#hmC_trans = string.maketrans("C", "O")
mc_kmer, hmc_kmer = get_kmer_variants(kmer)
c_density = KmerHdpDistribution(path, kmer)
mc_density = KmerHdpDistribution(path, mc_kmer)
hmc_density = KmerHdpDistribution(path, hmc_kmer)
return c_density, mc_density, hmc_density
def plot_ont_distribution(kmer, fast5, x_vals):
def get_model_table(model):
return model.get_model_dict()
template_model = get_model_table(TemplateModel(fast5File=fast5))
complement_model = get_model_table(ComplementModel(fast5File=fast5))
return norm.pdf(x_vals, template_model[kmer][0], template_model[kmer][1]), \
norm.pdf(x_vals, complement_model[kmer][0], complement_model[kmer][1])
def hmm_distribution(kmer, hmm, x_vals):
def get_model_from_hmm(model_file):
fH = open(hmm, 'r')
# line 0, type, stateNumber, etc
line = map(float, fH.readline().split())
assert len(line) == 4, "Bad header line"
# line 1, transitions
line = map(float, fH.readline().split())
assert len(line) == 10, "Bad transitions line"
# line 2, model
line = map(float, fH.readline().split())
assert len(line) == 6**6 * 2 # number of kmers * normal distribution parameters
return line
model = get_model_from_hmm(hmm)
kmer_index = getKmerIndex(kmer)
table_index = kmer_index * 2
#print model[table_index], model[table_index + 1]
return norm.pdf(x_vals, model[table_index], model[table_index + 1])
def get_hmm_pdfs(kmer, path_to_hmm, x_vals):
#mC_trans = string.maketrans("C", "E")
#hmC_trans = string.maketrans("C", "O")
mc_kmer, hmc_kmer = get_kmer_variants(kmer)
c_pdf = hmm_distribution(kmer, path_to_hmm, x_vals)
mc_pdf = hmm_distribution(mc_kmer, path_to_hmm, x_vals)
hmc_pdf = hmm_distribution(hmc_kmer, path_to_hmm, x_vals)
return c_pdf, mc_pdf, hmc_pdf
def get_kde_densities(kmer, path_to_histograms, x_vals):
mc_kmer, hmc_kmer = get_kmer_variants(kmer)
c_kde = KmerAlignmentHistogram(path_to_histograms, kmer)
mc_kde = KmerAlignmentHistogram(path_to_histograms, mc_kmer)
hmc_kde = KmerAlignmentHistogram(path_to_histograms, hmc_kmer)
return c_kde.make_kde(x_vals), mc_kde.make_kde(x_vals), hmc_kde.make_kde(x_vals)