forked from ckrushton/LGenIC
-
Notifications
You must be signed in to change notification settings - Fork 0
/
generate_input.py
executable file
·1314 lines (1097 loc) · 61.9 KB
/
generate_input.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python
# Created by Christopher Rushton 2020/04/20
# This converts a MAF file and copy number matrix (optional) into the input files used be the LymphGen lymphoma
# classifier. See https://llmpp.nih.gov/lymphgen/index.php for more information
import argparse
import math
import os
import bisect
import logging
from collections import Counter
from sortedcontainers import SortedSet
class Gene:
"""
A simple class storing the chromosome, start, end, and name of a gene
"""
__slots__ = ["name", "chrom", "start", "end"]
def __init__(self, chrom, start, end, name):
self.name = name
self.chrom = chrom
self.start = start
self.end = end
class Chromosome:
"""
A simple class storying the genomic range of each chromosome, as well as the coordiates of the p and q arm
"""
def __init__(self, chrom):
self.chrom = chrom
self.p_start = None
self.p_end = None
self.q_start = None
self.q_end = None
self.q_length = 0.0
self.p_length = 0.0
def add(self, start, end, arm):
if arm == "p":
# Check to see if we already have genomic coordinates for this arm
if self.p_start is not None or self.p_end is not None:
raise AttributeError("Coordinates already assigned for %s arm of chromosome \'%s\'" % (arm, self.chrom))
self.p_start = start
self.p_end = end
self.p_length = end - start
# Sanity check this does not overlap the q arm
if self.q_start is not None and self.p_end > self.q_start:
raise AttributeError("For chromosome \'%s\'. The q arm starts before the end of the p arm" % self.chrom)
elif arm == "q":
# Check to see if we already have genomic coordinates for this arm
if self.q_start is not None or self.q_end is not None:
raise AttributeError("Coordinates already assigned for %s arm of chromosome \'%s\'" % (arm, self.chrom))
self.q_start = start
self.q_end = end
self.q_length = end - start
# Sanity check this does not overlap the p arm
if self.p_end is not None and self.q_start < self.p_end:
raise AttributeError("For chromosome \'%s\'. The q arm starts before the end of the p arm" % self.chrom)
else:
# Not the p or q arm. Invalid
raise AttributeError("Unknown arm specified for chromosome \'%s\':\'%s\'" % (self.chrom, arm))
class SampleCNVs:
"""
Stores all the copy number events within a given class
I was going to write this in a function, but having a class is a lot cleaner
"""
def __init__(self):
self.starts = {}
self.ends = {}
self.cn_states = {}
self.len_seg = {}
self.is_chr_prefixed = None
self.ploidy = None
def merge_overlapping_cnvs(self, existStarts: iter, existEnds: iter, existCNs: iter, newStart: int, newEnd: int, newCN:int):
"""
Handle overlapping copy number segments, merging and editing existing segments as necessary
In essence, if this segment overlaps an existing segment with the same copy number state, those segments are merged
together. If the copy number state is different, the largest increase is kept,
truncating or even breaking apart the copy-neutral segment in the process. In the worse case senario,
this will lead to three different segments being created
:param existStarts: The start position of the existing segment
:param existEnds: The end position of the existing segment
:param existCNs: The copy number state of the existing segment
:param newStart: The start position of the new segment
:param newEnd: The end position of the new segment
:param newCN: The copy number state of the new segment
:return: A tuple containing the new and old events. Ex {[newEvent1, newEvent2], [oldEvent1, oldEvent2]}
"""
def reduce_new_event(existStarts, existEnds, existCNs, newStart, newEnd, newCN, oldStart, oldEnd):
# The existing event has higher priority than the old event
# Truncate the newer event
# Since this could break the newer event apart into two pieces, lets do that now, and see
# if the output segments are valid
outStart1 = newStart
outEnd1 = oldStart
outStart2 = oldEnd
outEnd2 = newEnd
if outStart1 < outEnd1: # Valid segment, process
existStarts, existEnds, existCNs = self.merge_overlapping_cnvs(existStarts, existEnds, existCNs,
outStart1, outEnd1, newCN)
if outStart2 < outEnd2: # Valid segment
existStarts, existEnds, existCNs = self.merge_overlapping_cnvs(existStarts, existEnds, existCNs,
outStart2, outEnd2, newCN)
return existStarts, existEnds, existCNs
def reduce_old_event(existStarts, existEnds, existCNs, newStart, newEnd, newCN, oldStart, oldEnd, oldCN):
# The newer event is a "higher"-level deletion
# Split/Truncate the older event
outStart1 = oldStart
outEnd1 = newStart
outStart2 = newEnd
outEnd2 = oldEnd
outCN = oldCN
# Delete existing event
existStarts.pop(start_bisect)
existEnds.pop(start_bisect)
existCNs.pop(start_bisect)
if outStart2 < outEnd2: # Valid segment, do the last one first to maintain sorted order
existStarts.insert(start_bisect, outStart2)
existEnds.insert(start_bisect, outEnd2)
existCNs.insert(start_bisect, outCN)
if outStart1 < outEnd1: # Also valid
existStarts.insert(start_bisect, outStart1)
existEnds.insert(start_bisect, outEnd1)
existCNs.insert(start_bisect, outCN)
# Check for any more overlaps
return self.merge_overlapping_cnvs(existStarts, existEnds, existCNs, newStart, newEnd, newCN)
# First, does this new segment actually overlap anything?
start_bisect = bisect.bisect_right(existEnds, newStart)
end_bisect = bisect.bisect_left(existStarts, newEnd)
if start_bisect == end_bisect:
# There are no overlaps. Simply add this new event in, and return it
existStarts.insert(start_bisect, newStart)
existEnds.insert(start_bisect, newEnd)
existCNs.insert(start_bisect, newCN)
return existStarts, existEnds, existCNs
# Grab the first overlap
oldStart = existStarts[start_bisect]
oldEnd = existEnds[start_bisect]
oldCN = existCNs[start_bisect]
# Simplest case first. If both these events have the same CN state, lets merge them together
if oldCN == newCN:
outStart = oldStart if oldStart < newStart else newStart
outEnd = oldEnd if oldEnd > newEnd else newEnd
# Delete the existing event
existStarts.pop(start_bisect)
existEnds.pop(start_bisect)
existCNs.pop(start_bisect)
# Check for any more overlaps
return self.merge_overlapping_cnvs(existStarts, existEnds, existCNs, outStart, outEnd, newCN)
else:
# These segments overlap and have different CN states.
# Lets keep the highest-level event
if newCN <= 2 and oldCN <= 2: # Deletion
if oldCN < newCN:
# The older event is a "higher"-level deletion
return reduce_new_event(existStarts, existEnds, existCNs, newStart, newEnd, newCN, oldStart, oldEnd)
else:
return reduce_old_event(existStarts, existEnds, existCNs, newStart, newEnd, newCN, oldStart, oldEnd, oldCN)
if newCN >= 2 and oldCN >= 2: # Gain/Amp
if oldCN > newCN:
# The older event is a higher level gain. Split/truncate the new event
return reduce_new_event(existStarts, existEnds, existCNs, newStart, newEnd, newCN, oldStart,
oldEnd)
else:
return reduce_old_event(existStarts, existEnds, existCNs, newStart, newEnd, newCN, oldStart, oldEnd,
oldCN)
else:
# One event must be a gain/amp, while the other is a deletion. In this case, keep both, and subset the
# larger event by the smaller one
if oldEnd - oldStart < newEnd - newStart:
# The older event is smaller. Split the newer event
return reduce_new_event(existStarts, existEnds, existCNs, newStart, newEnd, newCN, oldStart,
oldEnd)
else:
# The newer event is smaller. We should split the older event
return reduce_old_event(existStarts, existEnds, existCNs, newStart, newEnd, newCN, oldStart, oldEnd,
oldCN)
def add(self, chrom: str, start: int, end: int, cn: int):
# For LymphGen compatability: Strip "chr" prefix
chrom = chrom.replace("chr", "")
if end <= start:
if start == end:
# This segment has a length of 0?!. Skip this I guesss
return None
else:
# This segment has a negative length??
raise TypeError("Invalid CNV segment: \'%s:%s-%s\'" % (chrom, start, end))
# Have we seen any events for this chromosome before?
if chrom not in self.len_seg:
# If not, just store this segment. Simple!
self.starts[chrom] = [start]
self.ends[chrom] = [end]
self.cn_states[chrom] = [cn]
self.len_seg[chrom] = 1
# Are we using chr-prefixed contig names?
if self.is_chr_prefixed is None:
self.is_chr_prefixed = True if chrom.startswith("chr") else False
else:
# If we have seen events on this chromosome before, we need to compare this new segment with existing segments
# In theory, events should be non-overlapping, but I don't want to assume that because then things will break
# horribly later
self.starts[chrom], self.ends[chrom], self.cn_states[chrom] = \
self.merge_overlapping_cnvs(self.starts[chrom], self.ends[chrom], self.cn_states[chrom], start, end, cn)
self.len_seg[chrom] = len(self.cn_states)
def overlap_chrom(self, chromosome: Chromosome, threshold: float = 0.8):
"""
Calculates the overlap between the segments stored here and a given chromosome
:param chromosome: A Chromosome() object defining the start and end of the p and q arms, respectively
:return: A dictionary containing
"""
# Do the CNVs within this chromosome encompass a given chromosome more than the specified threshold?
chrom_name = chromosome.chrom
# Handle "chr" prefix nonsense
if self.is_chr_prefixed and not chrom_name.startswith("chr"):
chrom_name = "chr" + chrom_name
elif not self.is_chr_prefixed and chrom_name.startswith("chr"):
chrom_name = chrom_name.replace("chr", "")
try:
chrom_starts = self.starts[chrom_name]
except KeyError:
# No CN events were provided for this chromosome, hence there can be no overlap
return {}
chrom_ends = self.ends[chrom_name]
chrom_cn = self.cn_states[chrom_name]
homdel_sum = {"p": 0, "q": 0, "Chrom": 0}
del_sum = {"p": 0, "q": 0, "Chrom": 0}
gain_sum = {"p": 0, "q": 0, "Chrom": 0}
amp_sum = {"p": 0, "q": 0, "Chrom": 0}
# Check for overlapping segments for each arm
for (start, end, cn) in zip(chrom_starts, chrom_ends, chrom_cn):
# Ignore copy-neutral segments
if cn == 2:
continue
if end < chromosome.p_start or start > chromosome.q_end:
# Segment falls out of range
continue
if start < chromosome.p_end:
olap_start = start if start > chromosome.p_start else chromosome.p_start
olap_end = end if end < chromosome.p_end else chromosome.p_end
if cn < 2:
del_sum["p"] += olap_end - olap_start
del_sum["Chrom"] += olap_end - olap_start
if cn < 1:
homdel_sum["p"] += olap_end - olap_start
homdel_sum["Chrom"] += olap_end - olap_start
elif cn > 2:
gain_sum["p"] += olap_end - olap_start
gain_sum["Chrom"] += olap_end - olap_start
if cn > 3:
amp_sum["p"] += olap_end - olap_start
amp_sum["Chrom"] += olap_end - olap_start
if end > chromosome.q_start: # We use an if, not elif, in case a segment overlaps both the p and q arm
olap_start = start if start > chromosome.q_start else chromosome.q_start
olap_end = end if end < chromosome.q_end else chromosome.q_end
if cn < 2:
del_sum["q"] += olap_end - olap_start
del_sum["Chrom"] += olap_end - olap_start
if cn < 1:
homdel_sum["q"] += olap_end - olap_start
homdel_sum["Chrom"] += olap_end - olap_start
elif cn > 2:
gain_sum["q"] += olap_end - olap_start
gain_sum["Chrom"] += olap_end - olap_start
if cn > 3:
amp_sum["q"] += olap_end - olap_start
amp_sum["Chrom"] += olap_end - olap_start
events = {}
# Now, calculate the fraction of each overlap
# Start from the highest level and biggest event, and work our way down/smaller
# Amplifications/gains
if amp_sum["Chrom"] / (chromosome.p_length + chromosome.q_length) > threshold: # Whole chromosome Amp
events[chrom_name + "Chrom"] = "AMP"
# Now check for arm level events
else:
if amp_sum["p"] / chromosome.p_length > threshold: # p Amp
events[chrom_name + "p"] = "AMP"
elif amp_sum["q"] / chromosome.q_length > threshold: # q Amp
events[chrom_name + "q"] = "AMP"
if chrom_name + "Chrom" not in events and gain_sum["Chrom"] / (chromosome.p_length + chromosome.q_length) > threshold: # Whole chromosome gain
events[chrom_name + "Chrom"] = "GAIN"
else:
if chrom_name + "p" not in events and gain_sum["p"] / chromosome.p_length > threshold: # p Gain
events[chrom_name + "p"] = "GAIN"
if chrom_name + "q" not in events and gain_sum["q"] / chromosome.q_length > threshold: # q Gain
events[chrom_name + "q"] = "GAIN"
# Homozygous and heterozygous deletions
if homdel_sum["Chrom"] / (chromosome.p_length + chromosome.q_length) > threshold: # Whole chromosome Homozygous deletion
events[chrom_name + "Chrom"] = "HOMDEL"
# Now check for arm level events
else:
if homdel_sum["p"] / chromosome.p_length > threshold: # p HOMDEL
events[chrom_name + "p"] = "HOMDEL"
elif homdel_sum["q"] / chromosome.q_length > threshold: # q HOMDEL
events[chrom_name + "q"] = "HOMDEL"
if chrom_name + "Chrom" not in events and del_sum["Chrom"] / (chromosome.p_length + chromosome.q_length) > threshold: # Whole chromosome deletions
events[chrom_name + "Chrom"] = "HETLOSS"
else:
if chrom_name + "p" not in events and del_sum["p"] / chromosome.p_length > threshold: # p deletion
events[chrom_name + "p"] = "HETLOSS"
if chrom_name + "q" not in events and del_sum["q"] / chromosome.q_length > threshold: # q deletions
events[chrom_name + "q"] = "HETLOSS"
return events
def adjust_ploidy(self, arm_coords, sample_name, redo=False):
"""
Adjust a sample's CN states based upon ploidy
Calculate the average copy number state across the entire genome. If a sample is not diploid, normalize all the CN
changes
:param arm_coords: A dictionary containing {"chromosome_name": Chromosome()} objects which list chromosomal coordinates
:param sample_name: A string specifying the sample name. Used for debugging/error purposes
:param redo: Should we override any existing ploidy state for this sample?
:return: None
"""
if self.ploidy is not None and not redo:
# Ploidy for this sample has already been calculated. Don't do anything
return None
# Store the length affected for each ploidy state
ploidy_cov = {}
genome_size = 0
for chromosome in arm_coords.values():
# Find overlapping CNVs for this arm
# if the p or q arm coordinates are not set, use a placeholder
if chromosome.p_start is None:
chromosome.p_start = -100000
chromosome.p_end = -100000
chromosome.p_length = 1
if chromosome.q_start is None:
chromosome.q_start = -100000
chromosome.q_end = -100000
chromosome.q_length = 1
# Save the size of this chromosome
genome_size += chromosome.p_length
genome_size += chromosome.q_length
chrom_name = chromosome.chrom
# Handle "chr" prefix nonsense
if self.is_chr_prefixed and not chrom_name.startswith("chr"):
chrom_name = "chr" + chrom_name
elif not self.is_chr_prefixed and chrom_name.startswith("chr"):
chrom_name = chrom_name.replace("chr", "")
try:
chrom_starts = self.starts[chrom_name]
except KeyError:
# No CN events were provided for this chromosome, hence there can be no overlap
continue
chrom_ends = self.ends[chrom_name]
chrom_cn = self.cn_states[chrom_name]
# Check for overlapping segments for each arm
for (start, end, cn) in zip(chrom_starts, chrom_ends, chrom_cn):
# Check p-arm overlap
if end < chromosome.p_start or start > chromosome.q_end:
# Segment falls out of range
continue
if start < chromosome.p_end:
olap_start = start if start > chromosome.p_start else chromosome.p_start
olap_end = end if end < chromosome.p_end else chromosome.p_end
# Store the length of this overlap and associated CN
if cn not in ploidy_cov:
ploidy_cov[cn] = 0
ploidy_cov[cn] += olap_end - olap_start
if end > chromosome.q_start: # We use an if, not elif, in case a segment overlaps both the p and q arm
olap_start = start if start > chromosome.q_start else chromosome.q_start
olap_end = end if end < chromosome.q_end else chromosome.q_end
# Store the length of this overlap and associated CN
if cn not in ploidy_cov:
ploidy_cov[cn] = 0
ploidy_cov[cn] += olap_end - olap_start
# Now that we have calculated the number of bases affected by each CN state, calculate the ploidy
x = 0
for cn, bases in ploidy_cov.items():
x += cn * bases
ploidy = x / genome_size
av_ploidy = round(ploidy)
# Sanity check
if av_ploidy < 1:
raise TypeError("Ploidy of sample \'%s\' was calculated to be below 1!" % sample_name)
# If this tumour is not diploid, adjust the ploidy
if av_ploidy != 2:
logging.debug("\'%s\' was calculated to have a ploidy of %s" % (sample_name, av_ploidy))
ploidy_dif = av_ploidy - 2
new_cns = {}
for chrom, cn in self.cn_states.items():
x = list(y - ploidy_dif for y in cn)
new_cns[chrom] = x
self.cn_states = new_cns
self.ploidy = av_ploidy
def get_args():
def is_valid_dir(path, parser):
"""
Checks to ensure the directory path exists
:param path: A string containing a filepath to a directory
:param parser: An argparse.ArgumentParser() object
:return: path, if the string is a valid directory
:raises: parser.error() if the directory does not exist
"""
if os.path.exists(path) and os.path.isdir(path):
return path
else:
raise parser.error("Unable to set \'%s\' as the output directory: Not a valid directory" % path)
def is_valid_file(path, parser):
"""
Checks to ensure the specified file exists
:param path: A string containing a filepath
:param parser: An argparse.ArgumentParser() object
:return: path, if the string is a valid directory
:raises: parser.error() if the file does not exist
"""
if os.path.exists(path) and os.path.isfile(path):
return path
else:
raise parser.error("Unable to locate \'%s\': No such file or directory" % path)
epilog = os.linesep.join(["The --arms, --entrez_ids, and --lymphgen_genes files can be found in the \'resources\' folder. ",
"Note that genome and exome sequencing types are handled exactly the same. ",
"The --entrez-ids file must have the Hugo_Symbol under the column \"Approved Symbol\" and the Entrez ID under the column \"NCBI Gene ID(supplied by NCBI)\". ",
"The --cnvs file should contain the following colummns: Tumor_Sample_Barcode, chromosome, start, end, CN. ",
"The --arms file should contain the following columns: chromosome, start, end, arm. "
])
parser = argparse.ArgumentParser(description="Generates input files for the LymphGen classifier\nVersion 1.0.0", epilog=epilog, formatter_class=argparse.RawDescriptionHelpFormatter)
snvs_args = parser.add_argument_group("Input mutation files")
snvs_args.add_argument("-m", "--maf", metavar="MAF", required=True, type=lambda x: is_valid_file(x, parser), help="Input MAF file listing somatic mutations")
snvs_args.add_argument("-e", "--entrez_ids", metavar="TSV", required=True, type=lambda x: is_valid_file(x, parser), help="A tab-delimited file containing gene names (Hugo Symbol) and the corresponding Entrez gene ID")
cnv_args = parser.add_argument_group("(Optional) Input CNV files")
cnv_args.add_argument("-c", "--cnvs", metavar="TSV", default=None, type=lambda x: is_valid_file(x, parser), help="Input tab-delimited file summarizing copy number events")
cnv_args.add_argument("--log2", action="store_true", help="Does the input CNV file use log2 ratios? (i.e. log2(absoluteCN) - 1)")
cnv_args.add_argument("-g", "--genes", metavar="BED", default=None, type=lambda x: is_valid_file(x, parser), help="Input BED4+ file listing start and end positions of genes/exons")
cnv_args.add_argument("-a", "--arms", metavar="TSV", default=None, type=lambda x: is_valid_file(x, parser), help="Input tab-delimited file listing the positions of chromosome arms")
parser.add_argument("-l", "--lymphgen_genes", metavar="TXT", default=None, type=lambda x: is_valid_file(x, parser), required=False, help="An optional file listing all Entrez IDs considered by the LymphGen classifier. Output will be subset to these genes")
parser.add_argument("-s", "--sequencing_type", metavar="SEQ", choices=["targeted", "exome", "genome"], required=True, help="Sequencing type used to obtain somatic mutations")
parser.add_argument("-o", "--outdir", metavar="PATH", required=True, type=lambda x: is_valid_dir(x, parser), help="Output directory for LymphGen input files")
parser.add_argument("--outprefix", metavar="STRING", default=None, help="Output files will be prefixed using this string [Default: Use the base name of the MAF file]")
parser.add_argument("-v", "--verbose", choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], default="INFO", help="Set logging verbosity")
args = parser.parse_args()
# If no outprefix was set, use the basename of the maf file as the output prefix
if args.outprefix is None:
args.outprefix = os.path.basename(args.maf).split(".")[0]
# If --cnvs is specified, the annotation files are also required
if args.cnvs:
if not args.genes or not args.arms:
raise parser.error("--genes and --arms are required if --cnvs is specified")
logging.basicConfig(level=getattr(logging, args.verbose), format='%(levelname)s:%(message)s')
return args
def load_entrez_ids(entrez_file: str, hugo_column: str="Approved symbol", entrez_column: str="NCBI Gene ID(supplied by NCBI)",
status_column: str="Status", prev_name_column: str="Previous symbols"):
"""
Creates a dictionary which contains the gene name (Hugo Symbol) and corresponding entrez ID (NCBI ID) from the
specified text file
Note the input file must have a header. An example file can be obtained from https://www.genenames.org/download/custom/
Ensure the "Approved symbol" and "NCBI Gene ID" columns are selected. The default column names reflect those obtained
from this source.
The "Approval Status" column is optional, but will be used to remove gene names which have since been withdrawn.
If the "Previous symbols" solumn is included, these gene names will also be assigned to that Entrez ID
The subset_file should list one Entrez ID per line
All extra columns are ignored
:param entrez_file: A string containing a filepath to a tsv file containing the gene names and gene IDs
:param hugo_column: A string specifying the name of the column which contains
:param entrez_column: A string specifying the name of the column which contains the Entrez gene ID
:param status_column: A string specifying the name of the column which contains the Approval Status. Optional.
:param prev_name_column: A string specifting the name of the column which specifies previously used names for each gene. Optional
:return: Two dictionaries containing {"Gene Name": "Entrez_ID"}
"""
hugo_name_to_id = {}
alt_hugo_name_to_id = {} # If Previous symbols are included
skipped_genes = []
hugo_col_num = None
entrez_col_num = None
status_col_num = None # Optional
prev_name_col_num = None # Optional
i = 0
with open(entrez_file) as f:
for line in f:
i += 1
line = line.rstrip("\n").rstrip("\r") # Remove line endings
cols = line.split("\t")
if len(cols) < 2: # Sanity check this file has multiple columns
raise TypeError("Input file %s does not appear to be a tab-delimited file" % entrez_file)
# If we have not assigned column IDs yet, then we have not processed the file header
# Load the file header
if hugo_col_num is None or entrez_col_num is None:
j = 0
for col in cols:
if col == hugo_column:
hugo_col_num = j
if col == entrez_column:
entrez_col_num = j
if status_column is not None and col == status_column:
status_col_num = j
if prev_name_column is not None and col == prev_name_column:
prev_name_col_num = j
j += 1
# Sanity check that we have found all the columns we need
if hugo_col_num is None:
raise AttributeError("Unable to locate a column corresponding to \'%s\' in the input file \'%s\'" % (hugo_column, entrez_file))
elif entrez_col_num is None:
raise AttributeError("Unable to locate a column corresponding to \'%s\' in the input file \'%s\'" % (entrez_column, entrez_file))
# If we have made it this far, then the header was valid
continue
# Check the Approval Status column to see if the gene name has been withdrawn
if status_col_num is not None:
if cols[status_col_num] != "Approved":
continue
# Parse the gene name and ID from the input file
try:
hugo_name = cols[hugo_col_num]
entrez_id = cols[entrez_col_num]
# Sanity check: The entrez ID is ALWAYS a number
if not entrez_id.isdigit():
# If there is no entrez ID, skip this gene
if entrez_id == "":
skipped_genes.append(hugo_name)
continue
else:
raise TypeError("When parsing line %s of \'%s\', the Entrez gene ID \'%s\' does not appear to be a valid Entrez Gene ID" % (i, entrez_file, entrez_id))
except IndexError as e:
# If we end up in here, then there were too few columns in the current line to parse the IDs
raise AttributeError("Unable to parse line %s of \'%s\': Line is truncated?" % (i, entrez_file)) from e
# Sanity check: Make sure we haven't seen this gene name before
if hugo_name in hugo_name_to_id:
raise AttributeError("Gene \'%s\' appears duplicated in \'%s\'" % (hugo_name, entrez_file))
hugo_name_to_id[hugo_name] = entrez_id
# If the Previous symbols column was found, annotate these gene names with the same ID
previous_names = cols[prev_name_col_num].split(",")
if previous_names[0] == "":
continue # There are no previous names for this gene
previous_names = [x.replace(" ", "") for x in previous_names] # Remove extra spaces
for hugo_name in previous_names:
# If we have seen this gene name before, then don't overwrite it. Just assume that the first instance
# encountered is correct. This likely isn't the case, but these cases should be so rare that they are
# not worth worrying about
if hugo_name in alt_hugo_name_to_id:
continue
alt_hugo_name_to_id[hugo_name] = entrez_id
# Final sanity check: As Hugo Symbols and Entrez IDs have a 1-1 mapping, make sure there are no duplicate Entrez IDs
if len(hugo_name_to_id) != len(set(hugo_name_to_id.values())):
num_ids = Counter(hugo_name_to_id.values())
# For simplicity, just print out one Entrez ID which is duplicated
bad_id = num_ids.most_common(1)[0][0]
bad_count = num_ids.most_common(1)[0][1]
raise AttributeError("Entrez gene ID \'\%s\' appears %s times in the input file" % (bad_id, bad_count))
return hugo_name_to_id, alt_hugo_name_to_id
def load_subset_ids(id_file):
"""
Loads a set of Entrez IDs from a specified file. One ID/line
:param id_file:
"""
subset_ids = []
with open(id_file) as f:
for line in f:
line = line.rstrip("\n").rstrip("\r")
if not line.isdigit():
raise TypeError("Invalid Entrez ID when processing the --lyphmgen_genes file: %s" % line)
subset_ids.append(line)
subset_ids = set(subset_ids)
return subset_ids
def generate_mut_flat(in_maf: str, seq_type: str, gene_ids: dict, out_mut_flat: str, out_gene_list: str, alt_gene_ids: dict = None, subset_ids: iter = None):
"""
Converts a MAF file into the mutation flat file and gene list used by LymphGen
WARNING: GRCh37/hg19 is currently only supported!
An example of the output mutation FLAT file:
Sample ENTREZ.ID Type Location
Sample1 604 MUTATION 187451395
Sample1 604 MUTATION 187451408
Sample1 613 MUTATION 23523596
Sample1 970 TRUNC 6590925
Sample1 1840 Synon 113495932
An example of the output gene list:
"ENTREZ.ID"
60
355
567
596
604
605
613
639
673
If targeted sequencing is specified, only the genes with at least one non-synonmymous mutation in the MAF file are used for
the gene list. Otherwise, ALL genes are used.
The following MAF columns are required: Hugo_Symbol, Variant_Classification, Tumor_Sample_Barcode. If NCBI_Build and
Start_Position are found, the output mutation flat file will contain an additional column specifying "Location".
If Tumor_Seq_Allele2 is provided, the MYD88 hotspot mutations will be annotated as "L265P"
:param in_maf: A string containing a filepath to a MAF file containing the variants of interest
:param seq_type: A string specifying the sequencing type used to identify mutations. Only "targeted", "exome", and "genome" are supported
:param gene_ids: A dictionary containing the mapping between Hugo_Symbol: Entrez_Gene_ID.
:param out_mut_flat: A string specifying the output path to the mutation flat file
:param out_gene_list: A string specifying the output path to the gene list
:param alt_gene_ids: An additional dictionary specifying alternate Hugo_Symbol: Entrez_Gene_ID mappings
:param subset_ids: An interable specifying a list of Entrez IDs. Only mutations within these Entrez IDs will be output
:return: A list specifying all samples analyzed for mutations
"""
logging.info("Processing MAF file")
# Non-synonmymous mutation types:
non_syn = {"In_Frame_Del", "In_Frame_Ins", "Missense_Mutation"}
trunc_mut = {"Frame_Shift_Del", "Frame_Shift_Ins", "Nonsense_Mutation", "Nonstop_Mutation", "Splice_Site", "Translation_Start_Site"}
synon_mut = {"Silent", "5'UTR", "5'Flank", "Intron", "3'UTR"} # Use for the "Synon" mutation type
# Which samples are we analyzing?
sample_list = SortedSet() # Used to ensure the output files are in somewhat sorted order
required_cols = ["Hugo_Symbol", "Variant_Classification", "Tumor_Sample_Barcode"]
optional_cols = ["NCBI_Build", "Start_Position", "Tumor_Seq_Allele2"]
header_cols = {}
out_header_written = False
out_header_cols = ["Sample", "ENTREZ.ID", "Type"]
if alt_gene_ids is None:
alt_gene_ids = {}
# For targeted sequencing, which genes have we sequenced and seen mutations in?
genes_seen = {}
skipped_mut = [] # Store mutations in genes with no Entrez ID
i = 0
with open(in_maf) as f, open(out_mut_flat, "w") as o:
for line in f:
i += 1
# Ignore comment lines
if line[0] == "#":
continue
line = line.rstrip("\n").rstrip("\r")
cols = line.split("\t")
# Skip empty lines
if not line:
continue
# If we haven't parsed the header yet, assume we are parsing the first line of the file, and that is the header
if not header_cols:
j = 0
for col in cols:
if col in required_cols:
header_cols[col] = j
elif col in optional_cols:
header_cols[col] = j
j += 1
# Check that we have all required columns
for col in required_cols:
if col not in header_cols:
raise AttributeError("Unable to locate a column specifying \'%s\' in the MAF file header" % col)
# See which optional columns we have
if "Start_Position" in header_cols:
out_header_cols.append("Location")
logging.info("\'Start_Position\' column found in MAF file. Output mutation flat file will be annotated with \'Position\'")
if "Tumor_Seq_Allele2" in header_cols:
logging.info(
"\'Tumor_Seq_Allele2\' column found in MAF file. MYD88 hotspot mutations will be annotated as \'L265P\'")
continue
# Process mutations
mut_attributes = {x: cols[y] for x, y in header_cols.items()}
# Check genome build, as only GRCh37 is supported
if "NCBI_Build" in mut_attributes and "Start_Position" in mut_attributes and mut_attributes["NCBI_Build"] != "GRCh37":
raise NotImplementedError("Only GRCh37 is currently supported as a reference genome")
# Lets get the easiest stuff out of the way first. Specify the output sample name and gene ID
sample_name = mut_attributes["Tumor_Sample_Barcode"]
if sample_name == "" or sample_name == ".":
raise AttributeError("No tumor_sample_barcode was found when processing line %s of the MAF file" % i)
hugo_name = mut_attributes["Hugo_Symbol"]
try:
# Skip intergenic mutations
if hugo_name == "Unknown":
continue
entrez_id = gene_ids[hugo_name]
except KeyError:
# If this gene does not have a Entrez ID, are we prehaps using an older Hugo_Symbol?
if hugo_name in alt_gene_ids:
entrez_id = alt_gene_ids[hugo_name]
else:
# Skip this mutation if there is no Entrez ID
skipped_mut.append(line)
continue
# Obtain position (if Start_Position is specified)
if "Start_Position" in mut_attributes:
position = mut_attributes["Start_Position"]
else:
position = None
# Finally, specify the variant type
if mut_attributes["Variant_Classification"] in trunc_mut:
genes_seen[entrez_id] = hugo_name # This gene has a non-synonmymous mutation, so lets assume we have sequenced it
type = "TRUNC"
elif mut_attributes["Variant_Classification"] in non_syn:
genes_seen[entrez_id] = hugo_name # This gene has a non-synonmymous mutation, so lets assume we have sequenced it
type = "MUTATION"
elif mut_attributes["Variant_Classification"] in synon_mut:
genes_seen[entrez_id] = hugo_name
type = "Synon"
else:
# Ignore intronic mutations, 3'UTRs etc
continue
# If this mutation is in MYD88, does it affect the hotspot?
if hugo_name == "MYD88" and "Tumor_Seq_Allele2" in mut_attributes:
if mut_attributes["Start_Position"] == "38182641" and mut_attributes["Tumor_Seq_Allele2"] == "C":
type = "L265P"
if sample_name not in sample_list:
sample_list.add(sample_name)
# If this gene isn't in the subset list provide it, skip it
if subset_ids is not None and entrez_id not in subset_ids:
continue
# Finally, write this variant
# Write a header line, if we have not done so already
if not out_header_written:
out_header_written = True
o.write("\t".join(out_header_cols))
o.write(os.linesep)
out_line = [sample_name, entrez_id, type, position + os.linesep]
o.write("\t".join(out_line))
# Check to see that we actually converted mutations
if len(genes_seen) == 0:
raise AttributeError("No mutations were successfully converted. Check that the --entrez_ids file has valid Hugo Symbols matching the --maf file")
elif skipped_mut:
logging.warning("%s mutations in the MAF file were not converted. These either don't have a valid Entrez ID, or they were not in the --lymphgen_gene file" % len(skipped_mut))
logging.info("Finished processing MAF file")
logging.info("Generating gene list")
# Generate the gene list file
# This file a single column with the Entrez ID, as that is all LymphGen currently supports
with open(out_gene_list, "w") as o:
# Write header
o.write("\"ENTREZ.ID\"" + os.linesep)
# Write out all genes we have seen a mutation in
for entrez_id in genes_seen.keys():
if subset_ids is not None and entrez_id not in subset_ids:
continue # Skip these gene, as it wasn't in the --lymphgen_gene list
o.write(entrez_id)
o.write(os.linesep)
if seq_type == "exome" or seq_type == "genome":
# We have sequenced (effectively) all genes in the human genome. Write out all remaining genes
for entrez_id in gene_ids.values():
if entrez_id in genes_seen: # We have already written out this gene. Skip it
continue
if subset_ids is not None and entrez_id not in subset_ids:
continue # Skip these gene, as it wasn't in the --lymphgen_gene list
o.write(entrez_id)
o.write(os.linesep)
return list(sample_list)
def load_gene_coords_bed(bed_file, gene_ids, alt_gene_ids=None):
"""
Load in the genomic coordinates of genes in the human genome from the specified BED4+ file.
The following columns are required (no header)
chrom start end gene
A gene can be split across multiple BED entries (ex. exonic coordinates). In this case, the start and end of the first and last exon,
respectively, will be used as the gene start/end coordinates
:param bed_file: A string containing a filepath to a BED4+ file listing the positions of genes
:return: A dictionary storing {gene_name: Gene()}
"""
if alt_gene_ids is None:
alt_gene_ids = {}
gene_coords = {}
skipped_genes = []
i = 0
with open(bed_file) as f:
for line in f:
i += 1
line = line.rstrip("\n").rstrip("\r") # Remove line endings
cols = line.split("\t")
# Skip empty lines
if not line:
continue
# Since this BED file could contain more than 4 columns (ex. a BED6 or BED12 file), manually unpack and inspect the first
# four columns, since those are the columns we care about
try:
chrom = cols[0]
start = cols[1]
end = cols[2]
gene = cols[3]
except IndexError:
# This BED entry was trucated
raise AttributeError("Unable to parse line %s of %s as it contains less than four columns (chrom, start, end, gene)" % (i, bed_file))
# Check that the start and end are actually genomic coordinates
if not start.isdigit():
raise TypeError("When parsing line %s of %s, the start position \'%s\' is not a valid genomic coordinate" % (i, bed_file, start))
if not end.isdigit():
raise TypeError("When parsing line %s of %s, the end position \'%s\' is not a valid genomic coordinate" % (i, bed_file, end))
start = int(start)
end = int(end)
chrom = chrom.replace("chr", "")
# Is the gene name the Hugo Symbol or the Entrez gene ID?
# If its an integer, lets assume its the Entrez ID
# If its not, assume it is the Hugo Symbol and convert it to the Entrez ID
if not gene.isdigit():
try:
entrez_id = gene_ids[gene]
except KeyError:
# If we can't find this Hugo Symbol in the default mappings, check the alt gene IDs
if gene in alt_gene_ids:
entrez_id = alt_gene_ids[gene]
else:
# This gene doesn't have an Entrez ID. Skip it
skipped_genes.append(gene)
continue
else:
entrez_id = gene
# Have we seen this chromosome before?
if chrom not in gene_coords:
gene_coords[chrom] = {}
# Have we seen this gene before?
if entrez_id in gene_coords[chrom]:
# If so, then we need to update the start/end of this gene based on this new BED entry
existing_gene_entry = gene_coords[chrom][entrez_id]
# Sanity check
if chrom != existing_gene_entry.chrom:
raise AttributeError("We found two entries for gene \'%s\'. One is found on chromosome \'%s\', while the other is found on \'%s\'"
% (gene, chrom, existing_gene_entry.chrom))
if start < existing_gene_entry.start:
existing_gene_entry.start = start
if end > existing_gene_entry.end:
existing_gene_entry.end = end
else:
# If we haven't seen this gene before, create a new entry for it
gene_coords[chrom][entrez_id] = Gene(chrom, start, end, entrez_id)
# Now that we have processed all genes, make sure we actually found Entrez IDs for a handful of genes
# If we haven't, it means the input file likely didn't contain Hugo Symbols
if len(gene_coords) == 0:
raise AttributeError("No Hugo Symbols from the --genes file (column 4) were found in the --entrez_ids file. An example gene is %s" % (skipped_genes[0]))
return gene_coords
def load_chrom_arm(arm_file):
"""
Loads in the genomic coordinates corresponding to the arm of each chromosome
The input should be a tab-delimited file with the following columns
chromosome start end arm
:param arm_file: A string containing a filepath to a tab-delimited file containing genomic coordinates for chromosome arms
:return: A dictionary storing the genomic range of each chromosome, and each individual arm
"""
arm_chrom = {}
required_cols = ["chromosome", "start", "end", "arm"]
header_cols = {}
i = 0
with open(arm_file) as f:
for line in f:
i += 1
line = line.rstrip("\n").rstrip("\r") # Remove line endings
cols = line.split("\t")
# Skip empty lines
if not line:
continue
# If we haven't parsed the header yet, assume this is the first line of the file (aka the header)
if not header_cols:
j = 0
for col in cols:
if col in required_cols:
header_cols[col] = j
j += 1
# Check to make sure all required columns are found
for col in required_cols:
if col not in header_cols:
raise AttributeError("Unable to locate column %s in the chromosome arm positions file \'%s\'" % (col, arm_file))
# If we get this far, the header is valid
continue
try:
arm_attributes = {x: cols[y] for x, y in header_cols.items()}
except IndexError:
# We were unable to find a required column. This line is likely truncated
raise AttributeError("Unable to parse line %s of the chromosome arm positions file %s: The line appears truncated" % (i, arm_file))
# Have we seen this chromosome before? If not, lets create an object to store its genomic features
chrom = arm_attributes["chromosome"]
# Strip chr prefix
chrom = chrom.replace("chr", "")