Skip to content

Commit

Permalink
Interleave edit finding and bam reconfiguration
Browse files Browse the repository at this point in the history
  • Loading branch information
Eric Kofman committed Jul 26, 2024
1 parent 0c69e40 commit 5900db5
Show file tree
Hide file tree
Showing 4 changed files with 211 additions and 169 deletions.
12 changes: 9 additions & 3 deletions marine.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,13 @@ def bam_processing(overall_label_to_list_of_contents, output_folder, barcode_tag


# BAM Generation
cores = 1
total_bam_generation_time, total_seconds_for_bams = run_bam_reconfiguration(split_bams_folder, bam_filepath, overall_label_to_list_of_contents, contigs_to_generate_bams_for, barcode_tag=barcode_tag, cores=cores,
number_of_expected_bams=number_of_expected_bams,
verbose=verbose)

print("Deleting data for contig {} from memory...".format(contig))
del overall_label_to_list_of_contents[contig]

total_seconds_for_bams_df = pd.DataFrame.from_dict(total_seconds_for_bams, orient='index')
total_seconds_for_bams_df.columns = ['seconds']
Expand Down Expand Up @@ -259,7 +263,7 @@ def collate_edit_info_shards(output_folder):
def monitor_event(event, contig, bam_reconfig_launcher):
event.wait()
#print(f"Event {contig} triggered, launching bam_reconfig_launcher(contig)")
bam_reconfig_launcher(contig)
bam_reconfig_launcher(contig, event)

def run(bam_filepath, annotation_bedfile_path, output_folder, contigs=[], num_intervals_per_contig=16, strandedness=True, barcode_tag="CB", paired_end=False, barcode_whitelist_file=None, verbose=False, coverage_only=False, filtering_only=False, annotation_only=False, bedgraphs_list=[], sailor=False, min_base_quality = 15, min_read_quality = 0, min_dist_from_end = 10, max_edits_per_read = None, cores = 64, number_of_expected_bams=4,
keep_intermediate_files=False,
Expand Down Expand Up @@ -318,7 +322,7 @@ def run(bam_filepath, annotation_bedfile_path, output_folder, contigs=[], num_in
# Define and start a separate thread to monitor each event
threads = []

def bam_reconfig_launcher(contig):
def bam_reconfig_launcher(contig, event):
if barcode_tag:
# Make a subfolder into which the split bams will be placed
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand All @@ -329,7 +333,9 @@ def bam_reconfig_launcher(contig):
total_bam_generation_time, total_seconds_for_bams_df = bam_processing(overall_label_to_list_of_contents, output_folder, barcode_tag=barcode_tag, cores=cores, number_of_expected_bams=number_of_expected_bams, verbose=verbose, contig=contig)
#total_seconds_for_bams_df.to_csv("{}/bam_reconfiguration_timing.tsv".format(logging_folder), sep='\t')
#pretty_print("Total time to concat and write bams: {} minutes".format(round(total_bam_generation_time/60, 3)))


event.clear()

if barcode_tag:
for c in contigs:
thread = threading.Thread(target=monitor_event, args=(events[c], c, bam_reconfig_launcher))
Expand Down
10 changes: 9 additions & 1 deletion src/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,15 @@ def run_edit_identifier(bampath, output_folder, strandedness, barcode_tag="CB",
if len(overall_label_to_list_of_contents[contig]) == num_intervals_per_contig:
#print('{} has {} chunks done'.format(contig, len(overall_label_to_list_of_contents[contig])))
events[contig].set()


configs_processed = len(overall_label_to_list_of_contents)
if configs_processed > 3:
print(overall_label_to_list_of_contents.keys())
for key, e in events.items():
if e.is_set():
while e.is_set():
time.sleep(0.1)

total_reads = _[3]
counts_summary_df = _[4]
all_counts_summary_dfs.append(counts_summary_df)
Expand Down
Loading

0 comments on commit 5900db5

Please sign in to comment.