diff --git a/marine.py b/marine.py index 059e21f..a605b51 100755 --- a/marine.py +++ b/marine.py @@ -9,8 +9,8 @@ import polars as pl import psutil import pysam -from scipy.special import betainc import shutil +import subprocess import sys from sys import getsizeof import time @@ -32,71 +32,15 @@ from utils import get_intervals, index_bam, write_rows_to_info_file, write_header_to_edit_info, \ write_read_to_bam_file, remove_file_if_exists, make_folder, concat_and_write_bams_wrapper, \ -pretty_print, read_barcode_whitelist_file, get_contigs_that_need_bams_written +pretty_print, read_barcode_whitelist_file, get_contigs_that_need_bams_written, \ +make_depth_command_script_single_cell, generate_and_run_bash_merge, get_sailor_sites, \ +concatenate_files, run_command, get_edits_with_coverage_df, zero_edit_found, delete_intermediate_files from core import run_edit_identifier, run_bam_reconfiguration, \ gather_edit_information_across_subcontigs, run_coverage_calculator, generate_site_level_information from annotate import annotate_sites, get_strand_specific_conversion -def zero_edit_found(final_site_level_information_df, output_folder, sailor_list, bedgraphs_list, keep_intermediate_files, start_time, logging_folder): - print("No edits found.") - sites_columns = ['site_id','barcode','contig','position','ref','alt','strand','count','coverage','conversion','strand_conversion'] - sites_empty_df = pd.DataFrame(columns=sites_columns) - sites_empty_df.to_csv('{}/final_filtered_site_info.tsv'.format(output_folder), sep='\t', index=False) - - Annotated_sites_columns = ['site_id','barcode','contig','position','ref','alt','strand','count','coverage','conversion','strand_conversion','feature_name','feature_type','feature_strand'] - annotated_sites_empty_df = pd.DataFrame(columns=Annotated_sites_columns) - annotated_sites_empty_df.to_csv('{}/final_filtered_site_info_annotated.tsv'.format(output_folder), sep='\t', index=False) - - empty_df = pd.DataFrame() - if len(sailor_list) > 0: - for conversion in sailor_list: - conversion_search = conversion[0] + '>' + conversion[1] - empty_df.to_csv('{}/sailor_style_sites_{}.bed'.format( - output_folder, - conversion_search.replace(">", "-")), - header=False, index=False, sep='\t') - - if len(bedgraphs_list) > 0: - bedgraph_folder = '{}/bedgraphs'.format(output_folder) - make_folder(bedgraph_folder) - for conversion in bedgraphs_list: - conversion_search = conversion[0] + '>' + conversion[1] - empty_df.to_csv('{}/{}_{}.bedgraph'.format(bedgraph_folder, output_folder.split('/')[-1], conversion), sep='\t', index=False, header=False) - - current, peak = tracemalloc.get_traced_memory() - - with open('{}/manifest.txt'.format(logging_folder), 'a+') as f: - f.write(f'sites\t{len(final_site_level_information_df)}\n') - f.write(f'peak_memory_mb\t{peak/1e6}\n') - f.write(f'time_elapsed_seconds\t{time.time()-start_time:.2f}s\n') - - print(f"Current memory usage {current/1e6}MB; Peak: {peak/1e6}MB") - print(f'Time elapsed: {time.time()-start_time:.2f}s') - - if not keep_intermediate_files: - pretty_print("Deleting intermediate files...", style="-") - delete_intermediate_files(output_folder) - - pretty_print("Done!", style="+") - return 'Done' - - -def delete_intermediate_files(output_folder): - to_delete = ['coverage', 'edit_info', 'split_bams', 'all_edit_info.tsv', - 'concat_command.sh', 'depth_command.sh', 'combined.bed', 'merge_command.sh', - 'final_edit_info_no_coverage.tsv', 'final_edit_info_no_coverage_sorted.tsv', - 'depth_modified.tsv', 'final_edit_info.tsv', 
'final_filtered_edit_info.tsv'] - for object in to_delete: - object_path = '{}/{}'.format(output_folder, object) - - if os.path.exists(object_path): - if os.path.isfile(object_path): - os.remove(object_path) - else: - shutil.rmtree(object_path) - def edit_finder(bam_filepath, output_folder, strandedness, barcode_tag="CB", barcode_whitelist=None, contigs=[], verbose=False, cores=64, min_read_quality=0, min_base_quality=0, dist_from_end=0, interval_length=2000000): @@ -140,6 +84,7 @@ def edit_finder(bam_filepath, output_folder, strandedness, barcode_tag="CB", bar return overall_label_to_list_of_contents, results, total_seconds_for_reads_df, overall_total_reads, counts_summary_df + def bam_processing(overall_label_to_list_of_contents, output_folder, barcode_tag='CB', cores=1, number_of_expected_bams=4, verbose=False): # Only used for single-cell and/or long read reconfiguration of bams to optimize coverage calculation @@ -148,7 +93,7 @@ def bam_processing(overall_label_to_list_of_contents, output_folder, barcode_tag contigs_to_generate_bams_for = get_contigs_that_need_bams_written(list(overall_label_to_list_of_contents.keys()), split_bams_folder, barcode_tag=barcode_tag, - number_of_expected_bams=number_of_expected_bams + number_of_expected_bams=number_of_expected_bams ) if verbose: pretty_print("Will split and reconfigure the following contigs: {}".format(",".join(contigs_to_generate_bams_for))) @@ -167,56 +112,6 @@ def bam_processing(overall_label_to_list_of_contents, output_folder, barcode_tag return total_bam_generation_time, total_seconds_for_bams_df -import subprocess - - -def concatenate_files(source_folder, file_pattern, output_filepath): - # Create the concatenation command with numeric sorting and header skipping - concat_command = ( - f"for f in $(ls -v {source_folder}/{file_pattern}); do " - "tail -n +2 \"$f\"; " # Skip the header row for each file - "done > {}".format(output_filepath) - ) - - # Write the command to a shell script - concat_bash = f"{source_folder}/concat_command.sh" - with open(concat_bash, 'w') as f: - f.write(concat_command) - - print("Concatenating files in numerical order without headers...") - subprocess.run(['bash', concat_bash]) - print("Done concatenating.") - - -def coverage_processing(output_folder, barcode_tag='CB', paired_end=False, verbose=False, cores=1, number_of_expected_bams=4, - min_read_quality=0, bam_filepath=''): - - # Single-cell or long read version: - edit_info_grouped_per_contig_combined = gather_edit_information_across_subcontigs(output_folder, - barcode_tag=barcode_tag, - number_of_expected_bams=number_of_expected_bams - ) - - #if verbose: - # print('edit_info_grouped_per_contig_combined', edit_info_grouped_per_contig_combined.keys()) - - - results, total_time, total_seconds_for_contig = run_coverage_calculator(edit_info_grouped_per_contig_combined, output_folder, - barcode_tag=barcode_tag, - paired_end=paired_end, - verbose=verbose, - processes=cores - ) - concatenate_files(output_folder, "coverage/*.tsv", "{}/final_edit_info.tsv".format(output_folder)) - - total_seconds_for_contig_df = pd.DataFrame.from_dict(total_seconds_for_contig, orient='index') - total_seconds_for_contig_df.columns = ['seconds'] - total_seconds_for_contig_df['contig sections'] = total_seconds_for_contig_df.index - total_seconds_for_contig_df.index = range(len(total_seconds_for_contig_df)) - - return results, total_time, total_seconds_for_contig_df - - def print_marine_logo(): logo_lines = [ ":::: :::: ::: ::::::::: ::::::::::: :::: ::: :::::::::: ", @@ -232,85 
+127,6 @@ def print_marine_logo(): pretty_print("Multi-core Algorithm for Rapid Identification of Nucleotide Edits", style="=") -def calculate_sailor_score(sailor_row): - edits = sailor_row['count'] - num_reads = sailor_row['coverage'] - original_base_counts = num_reads - edits - - cov_margin = 0.01 - alpha, beta = 0, 0 - - num_failures = 0 - failure_messages = [] - try: - edit_frac = float(edits) / float(num_reads) - except Exception as e: - num_failures += 1 - print("Bad Row: {}".format(sailor_row)) - return None - - # calc smoothed counts and confidence - destination_base_smoothed = edits + alpha - origin_base_smoothed = original_base_counts + beta - - ######## MOST IMPORTANT LINE ######## - # calculates the confidence of theta as - # P( theta < cov_margin | A, G) ~ Beta_theta(G, A) - confidence = 1 - betainc(destination_base_smoothed, origin_base_smoothed, cov_margin) - return confidence - - -def get_sailor_sites(final_site_level_information_df, conversion="C>T", skip_coverage=False): - final_site_level_information_df = final_site_level_information_df[final_site_level_information_df['strand_conversion'] == conversion] - - if skip_coverage: - # Case where we want to skip coverage counting, we will just set it to -1 in the sailor-style output - coverage_col = "-1" - weird_sites = pd.DataFrame() - else: - # Normally, assume that there is a coverage column - coverage_col = final_site_level_information_df['coverage'].astype(str) - - weird_sites = final_site_level_information_df[ - (final_site_level_information_df.coverage == 0) |\ - (final_site_level_information_df.coverage < final_site_level_information_df['count'])] - - print("{} rows had coverage of 0 or more edits than coverage... filtering these out, but look into them...".format( - len(weird_sites))) - - final_site_level_information_df = final_site_level_information_df[ - (final_site_level_information_df.coverage > 0) & \ - (final_site_level_information_df.coverage >= final_site_level_information_df['count']) - ] - - - final_site_level_information_df['combo'] = final_site_level_information_df['count'].astype(str) + ',' + coverage_col - - if skip_coverage: - final_site_level_information_df['score'] = -1 - else: - final_site_level_information_df['score'] = final_site_level_information_df.apply(calculate_sailor_score, axis=1) - - final_site_level_information_df['start'] = final_site_level_information_df['position'] - final_site_level_information_df['end'] = final_site_level_information_df['position'] + 1 - - final_site_level_information_df = final_site_level_information_df[['contig', 'start', 'end', 'score', 'combo', 'strand']] - return final_site_level_information_df, weird_sites - - -def collate_edit_info_shards(output_folder): - edit_info_folder = "{}/edit_info".format(output_folder) - edit_info_files = glob("{}/*".format(edit_info_folder)) - print("\tFound {} files in {}...".format(len(edit_info_files), edit_info_folder)) - all_dfs = [] - for f in edit_info_files: - edit_info_df = pd.read_csv(f, sep='\t') - all_dfs.append(edit_info_df) - - collated_df = pd.concat(all_dfs) - print("\tShape of collated edit info df: {}".format(collated_df.shape)) - print("\tColumns of collated edit info df: {}".format(collated_df.columns)) - return collated_df def get_broken_up_contigs(contigs, num_per_sublist): broken_up_contigs = [] @@ -329,43 +145,56 @@ def get_broken_up_contigs(contigs, num_per_sublist): broken_up_contigs.append(contig_sublist) return broken_up_contigs -import subprocess - - -import subprocess - -def 
generate_and_run_bash_merge(output_folder, file1_path, file2_path, output_file_path, header_columns): - # Convert header list into a tab-separated string - header = "\t".join(header_columns) +def split_bed_file(input_bed_file, output_folder, bam_filepaths, output_suffix=''): + """ + Split a BED file into multiple files based on suffixes in the first column. + Each line is assigned to the appropriate file based on the suffix. - # Generate the Bash command for processing - bash_command = f"""#!/bin/bash - # Step 1: Adjust the depths file by adding a new column that incorporates both contig and position - # for join purposes, and sort by this column. Output in tab-separated format. - awk -v OFS='\\t' '{{print $1, $2-1, $3, $1":"($2)}}' "{file2_path}" | sort -k4,4n | uniq > {output_folder}/depth_modified.tsv + e.g.: - # Step 2: Sort the first file numerically by the join column (the column incuding both contig and position information) - sort -k3,3n "{file1_path}" | uniq > {output_folder}/final_edit_info_no_coverage_sorted.tsv - - # Step 3: Join the files on the specified columns, output all columns, and select necessary columns with tab separation -join -1 3 -2 4 -t $'\\t' {output_folder}/final_edit_info_no_coverage_sorted.tsv {output_folder}/depth_modified.tsv | awk -v OFS='\\t' '{{print $2, $3, $4, $5, $6, $7, $8, $11}}' > "{output_file_path}" + 10_AAACGAAAGTCACACT-1 6143263 6143264 + 10_AAACGAAAGTCACACT-1 11912575 11912576 + 10_AAACGAAAGTCACACT-1 12209751 12209752 + 10_AAACGAAAGTCACACT-1 13320235 13320236 + 10_AAACGAAAGTCACACT-1 27036085 27036086 - # Step 4: Add header to the output file - echo -e "{header}" | cat - "{output_file_path}" > temp && mv temp "{output_file_path}" """ + if not os.path.exists(output_folder): + os.makedirs(output_folder) - # Write the command to a shell script - bash_script_path = "{}/merge_command.sh".format(output_folder) - with open(bash_script_path, "w") as file: - file.write(bash_command) - - # Run the generated bash script - print("Running Bash merge script...") - subprocess.run(["bash", bash_script_path], check=True) - print(f"Merged file saved to {output_file_path}") + single_cell_approach = len(bam_filepaths) > 0 + + suffix_pairs = [ + (os.path.basename(bam).split("_")[0], + os.path.basename(bam).split("_")[1].split(".")[0]) for bam in bam_filepaths + ] + + # Open file handles for each suffix + file_handles = {} + for prefix, suffix in suffix_pairs: + output_file = os.path.join(output_folder, f"combined_{output_suffix}_{prefix}_{suffix}.bed") + file_handles[prefix + suffix] = open(output_file, 'w') + try: + with open(input_bed_file, 'r') as infile: + for line in infile: + # Parse the first column to determine the suffix + columns = line.split() + + chrom = columns[0] # Assuming the first column is the chromosome + for prefix, suffix in suffix_pairs: + if chrom.startswith(f"{prefix}_") and chrom.endswith(suffix): + file_handles[prefix + suffix].write(line) + break + + finally: + # Close all file handles + for handle in file_handles.values(): + handle.close() + -def generate_depths_with_samtools(output_folder, bam_filepaths): +def generate_depths(output_folder, bam_filepaths, paired_end=False, barcode_tag=None): + coverage_start_time = time.perf_counter() all_depth_commands = [] @@ -374,82 +203,221 @@ def generate_depths_with_samtools(output_folder, bam_filepaths): "echo 'concatenating bed file...';" "for file in {}/edit_info/*edit_info.tsv; do " "awk 'NR > 1 {{print $2, $4-1, $4}}' OFS='\t' \"$file\"; " - "done | sort -k1,1 -k2,2n -u > {}/combined.bed;" + 
"done | sort -k1,1 -k2,2n -u > {}/combined_source_cells.bed;" ).format(output_folder, output_folder) + + if not os.path.exists(f'{output_folder}/combined_source_cells.bed'): + run_command(combine_edit_sites_command) all_depth_commands.append(combine_edit_sites_command) + + output_suffix = 'source_cells' - for i, bam_filepath in enumerate(bam_filepaths): - depth_command = ( - "echo 'running samtools depth on {}...';" - "samtools depth -a -b {}/combined.bed {} >> {}/coverage/depths.txt" - ).format(bam_filepath, output_folder, bam_filepath, output_folder) - all_depth_commands.append(depth_command) + if barcode_tag: + coverage_subfolder = '{}/coverage'.format(output_folder) + make_folder(coverage_subfolder) + + # Single cell mode + split_bed_file( + f"{output_folder}/combined_{output_suffix}.bed", + f"{output_folder}/combined_{output_suffix}_split_by_suffix", + bam_filepaths, + output_suffix=output_suffix + ) - depth_bash = '{}/depth_command.sh'.format(output_folder) - with open(depth_bash, 'w') as f: - for depth_command in all_depth_commands: - f.write('{}\n\n'.format(depth_command)) + make_depth_command_script_single_cell(paired_end, bam_filepaths, output_folder, + all_depth_commands=all_depth_commands, output_suffix='source_cells', run=True, processes=cores, barcode_tag=barcode_tag) + + else: + if paired_end: + paired_end_flag = '-s ' + else: + paired_end_flag = '' + + # Bulk mode, we will not split the bed and simply use samtools depth on the combined.bed + samtools_depth_command = f"samtools depth {paired_end_flag}-a -b {output_folder}/combined_source_cells.bed {bam_filepath} > {output_folder}/depths_source_cells.txt" + run_command(samtools_depth_command) - print("Calculating depths...") - subprocess.run(['bash', depth_bash]) - print("Done calculating depths.") print("Concatenating edit info files...") - concatenate_files(output_folder, "edit_info/*edit_info.tsv", "{}/final_edit_info_no_coverage.tsv".format(output_folder)) + concatenate_files(output_folder, "edit_info/*edit_info.tsv", + "{}/final_edit_info_no_coverage.tsv".format(output_folder), + run=True) print("Append the depth columns to the concatenated final_edit_info file...") header_columns = ['barcode', 'contig', 'position', 'ref', 'alt', 'read_id', 'strand', 'coverage'] + generate_and_run_bash_merge(output_folder, '{}/final_edit_info_no_coverage.tsv'.format(output_folder), - '{}/coverage/depths.txt'.format(output_folder), - '{}/final_edit_info.tsv'.format(output_folder), header_columns) + '{}/depths_source_cells.txt'.format(output_folder), + '{}/final_edit_info.tsv'.format(output_folder), + header_columns, barcode_tag=barcode_tag) coverage_total_time = time.perf_counter() - coverage_start_time total_seconds_for_contig_df = pd.DataFrame({'coverage_total_time': [coverage_total_time]}) return coverage_total_time, total_seconds_for_contig_df + +def convert_sites_to_sailor(final_site_level_information_df, sailor_list, output_folder, skip_coverage): + # Output SAILOR-formatted file for use in FLARE downstream + # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + # 1 629275 629276 0.966040688 2,30 + + # 1 629309 629310 2.8306e-05 1,1043 + + + for conversion in sailor_list: + conversion_search = conversion[0] + '>' + conversion[1] + + print("Generating SAILOR-style bed outputs for conversion {}...".format(conversion)) + + sailor_sites,weird_sites = get_sailor_sites(final_site_level_information_df, conversion_search, skip_coverage=skip_coverage) + sailor_sites = sailor_sites.drop_duplicates() + + print("{} final deduplicated {} 
SAILOR-formatted sites".format(len(sailor_sites), conversion_search)) + sailor_sites.to_csv('{}/sailor_style_sites_{}.bed'.format( + output_folder, + conversion_search.replace(">", "-")), + header=False, + index=False, + sep='\t') -def get_edits_with_coverage_df(output_folder, - barcode_tag=None, - paired_end=False): - - # For paired end, which was calculated using the pysam coverage functions. - if paired_end: - all_edit_info_unique_position_with_coverage_df = pd.read_csv('{}/final_edit_info.tsv'.format(output_folder), sep='\t', - index_col=0, - names=[ - 'barcode_position_index', 'barcode', 'contig', 'contig_position', 'position', 'ref', 'alt', 'read_id', - 'strand', 'barcode_position', - 'coverage', 'source', 'position_barcode'], dtype={'coverage': int, 'position': int, - 'contig': str}) - else: - # For bulk single-end or for single-cell, for which coverages were calculated using samtools depth. - all_edit_info_unique_position_with_coverage_df = pd.read_csv('{}/final_edit_info.tsv'.format(output_folder), sep='\t', - dtype={'coverage': int, 'position': int, - 'contig': str}) +def generate_bedgraphs(final_site_level_information_df, conversion_search, output_folder): + bedgraph_folder = '{}/bedgraphs'.format(output_folder) + make_folder(bedgraph_folder) - # If there was a barcode specified, then the contigs will have been set to a combination of contig and barcode ID. - # For example, we will find a contig to be 9_GATCCCTCAGTAACGG-1, and we will want to simplify it back to simply 9, - # as the barcode information is separately already present as it's own column in this dataframe. To ensure code continuity, - # this will still be true even with no barcode specified, in which case the contig will be _no_barcode + pretty_print("Making bedgraphs for {} conversions...\n".format(bedgraphs_list)) + for conversion in bedgraphs_list: + conversion_search = conversion[0] + '>' + conversion[1] + sites_for_conversion = final_site_level_information_df[final_site_level_information_df.conversion == conversion_search] + sites_for_conversion['edit_fraction'] = sites_for_conversion['count']/sites_for_conversion['coverage'] + sites_for_conversion['start'] = sites_for_conversion['position'] - 1 + sites_for_conversion_bedgraph_cols = sites_for_conversion[['contig', 'start', 'position', 'edit_fraction']] - # Therefore: replace the contig with the part before the barcode - all_edit_info_unique_position_with_coverage_df['contig'] = all_edit_info_unique_position_with_coverage_df.apply(lambda row: row['contig'].replace('_{}'.format(row['barcode']), ''), axis=1) + sites_for_conversion_bedgraph_cols.to_csv('{}/{}_{}.bedgraph'.format(bedgraph_folder, output_folder.split('/')[-1], conversion), sep='\t', index=False, header=False) - return all_edit_info_unique_position_with_coverage_df - - + +def prepare_combinations_for_split(df, bam_filepaths, output_folder, output_suffix): + """ + Prepares the chromosome-suffix combinations for multiprocessing. + For each edited position in a given barcode, we want to look at the coverage at that + position for that chromosome across all the other barcodes. + + Args: + df (pd.DataFrame): Filtered DataFrame containing edit data. + bam_filepaths (list): List of BAM filepaths to extract suffix pairs. + output_folder (str): Path to the output folder for split BED files. + output_suffix (str): Suffix for output files. + + Returns: + list: List of tuples for processing. 
+ """ + # Extract prefix and suffix from BAM filenames + suffix_pairs = [ + (os.path.basename(bam).split("_")[0], os.path.basename(bam).split("_")[1].split(".")[0]) + for bam in bam_filepaths + ] + print(f"suffix_pairs is {suffix_pairs}") + + # Unique chromosomes in the dataset + unique_chromosomes = df['contig'].unique() + + # Prepare combinations of chromosomes and suffix pairs + combinations = [] + for chrom in unique_chromosomes: + print(f"\tChecking {chrom}...") + + chrom = str(chrom) + df['contig'] = df['contig'].astype(str) + df_for_chrom = df[df['contig'] == chrom] + unique_positions = df_for_chrom.position.unique() + for prefix, suffix in suffix_pairs: + + if prefix == chrom: + print(f"\t\tGenerating for ({prefix},{suffix})") + df_for_prefix_suffix = df_for_chrom[df_for_chrom['barcode'].str.endswith(suffix)] + unique_barcodes = df_for_prefix_suffix.barcode.unique() + + combinations.append((chrom, prefix, suffix, unique_positions, unique_barcodes, output_folder, output_suffix)) + + print(f"\t\t\t{prefix}_{suffix}: Unique positions: {len(unique_positions)}, Unique barcodes: {len(unique_barcodes)}") + + return combinations + +def process_combination_for_split(args): + """ + Processes a single combination of chromosome, prefix, suffix, positions, and barcodes + to write split BED files. + + Args: + args (tuple): Contains chromosome, prefix, suffix, positions, barcodes, + output folder, and output suffix. + """ + chrom, prefix, suffix, unique_positions, unique_barcodes, output_folder, output_suffix = args + + # Output file path + output_file = os.path.join(output_folder, f"combined_{output_suffix}_{prefix}_{suffix}.bed") + + # Write combinations directly to the file + with open(output_file, "w") as f: + for position in unique_positions: + for barcode in unique_barcodes: + contig = f"{chrom}_{barcode}" # Construct contig using chromosome and barcode + f.write(f"{contig}\t{position-1}\t{position}\n") + + print(f"\t\t\t>>> Processed {chrom}, {prefix}_{suffix} -> {output_file}") + + +def generate_and_split_bed_files_for_all_edits(output_folder, bam_filepaths, strand_conversion, processes=4, output_suffix="all_cells"): + """ + Generates combined BED files for all edit sites and splits them into suffix-specific files. + + Args: + output_folder (str): Path to the output folder. + bam_filepaths (list): List of BAM filepaths for suffix extraction. + strand_conversion (str): Strand conversion type (e.g., 'A>G'). + processes (int): Number of multiprocessing workers. + output_suffix (str): Suffix for output files. + """ + input_file = f"{output_folder}/final_filtered_site_info.tsv" + df = pd.read_csv(input_file, sep="\t") + + # Filter by strand conversion + df = df[df['strand_conversion'] == strand_conversion] + print(f"Running all positions for {len(df)} {strand_conversion} edits") + + # Prepare combinations for multiprocessing + split_bed_folder = f"{output_folder}/combined_{output_suffix}_split_by_suffix" + os.makedirs(split_bed_folder, exist_ok=True) + + # Cleanup existing .bed files in the output folder + existing_bed_files = glob(os.path.join(split_bed_folder, "*.bed")) + if existing_bed_files: + print(f"Found {len(existing_bed_files)} existing .bed files. Removing...") + for file in existing_bed_files: + os.remove(file) + print("Existing .bed files removed. 
Starting fresh.") + + combinations = prepare_combinations_for_split(df, bam_filepaths, f"{output_folder}/combined_{output_suffix}_split_by_suffix", output_suffix) + + # Run the processing with multiprocessing + with Pool(processes=processes) as pool: + pool.map(process_combination_for_split, combinations) + + print(f"All split BED files generated in {output_folder}/combined_{output_suffix}_split_by_suffix") + + + def run(bam_filepath, annotation_bedfile_path, output_folder, contigs=[], strandedness=True, barcode_tag="CB", paired_end=False, barcode_whitelist_file=None, verbose=False, coverage_only=False, filtering_only=False, annotation_only=False, bedgraphs_list=[], sailor_list=[], min_base_quality = 15, min_read_quality = 0, min_dist_from_end = 10, max_edits_per_read = None, cores = 64, number_of_expected_bams=4, keep_intermediate_files=False, num_per_sublist=6, - skip_coverage=False, interval_length=2000000): + skip_coverage=False, interval_length=2000000, + all_cells_coverage=False + ): # Check to make sure the folder is empty, otherwise prompt for overwriting if any(os.scandir(output_folder)): @@ -474,8 +442,20 @@ def run(bam_filepath, annotation_bedfile_path, output_folder, contigs=[], strand f.write('min_read_quality\t{}\n'.format(min_read_quality)) f.write('min_dist_from_end\t{}\n'.format(min_dist_from_end)) f.write('skip_coverage\t{}\n'.format(skip_coverage)) - - if not (coverage_only or filtering_only): + + + # Check if filtering step finished + final_filtered_sites_path = '{}/final_filtered_site_info.tsv'.format(output_folder) + final_path_already_exists = False + final_annotated_path_already_exists = False + + if os.path.exists(final_filtered_sites_path): + print("{} exists... skipping edit finding.".format(final_filtered_sites_path)) + final_path_already_exists = True + + # Edit finding + if not (coverage_only or filtering_only) and not final_path_already_exists: + overall_total_reads_processed = 0 if barcode_whitelist_file: barcode_whitelist = read_barcode_whitelist_file(barcode_whitelist_file) else: @@ -532,7 +512,8 @@ def run(bam_filepath, annotation_bedfile_path, output_folder, contigs=[], strand pretty_print("Contigs processed\n\n\t{}".format(sorted(list(overall_label_to_list_of_contents.keys())))) pretty_print("Splitting and reconfiguring BAMs to optimize coverage calculations", style="~") # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - + + total_bam_generation_time, total_seconds_for_bams_df = bam_processing(overall_label_to_list_of_contents, output_folder, barcode_tag=barcode_tag, cores=cores, number_of_expected_bams=number_of_expected_bams, verbose=verbose) #total_seconds_for_bams_df.to_csv("{}/bam_reconfiguration_timing.tsv".format(logging_folder), sep='\t') pretty_print("Total time to concat and write bams: {} minutes".format(round(total_bam_generation_time/60, 3))) @@ -540,71 +521,31 @@ def run(bam_filepath, annotation_bedfile_path, output_folder, contigs=[], strand print("Deleting overall_label_to_list_of_contents...") del overall_label_to_list_of_contents - - with open('{}/manifest.txt'.format(logging_folder), 'a+') as f: - f.write(f'total_reads_processed\t{overall_total_reads_processed}\n') - for k, v in overall_counts_summary_df.items(): - f.write(f'{k}\t{v}\n') + + with open('{}/manifest.txt'.format(logging_folder), 'a+') as f: + f.write(f'total_reads_processed\t{overall_total_reads_processed}\n') + for k, v in overall_counts_summary_df.items(): + f.write(f'{k}\t{v}\n') - f.write(f'edits per read 
(EPR)\t{overall_counts_summary_df.get("total_edits")/overall_total_reads_processed}\n') + f.write(f'edits per read (EPR)\t{overall_counts_summary_df.get("total_edits")/overall_total_reads_processed}\n') - if not filtering_only and not skip_coverage: + reconfigured_bam_filepaths = glob('{}/split_bams/*/*.bam'.format(output_folder)) + + if not final_path_already_exists and not skip_coverage: # Coverage calculation # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ pretty_print("Calculating coverage at edited sites, minimum read quality is {}...".format(min_read_quality), style='~') - coverage_subfolder = '{}/coverage'.format(output_folder) - make_folder(coverage_subfolder) - - if paired_end: - results, total_time, total_seconds_for_contig_df = coverage_processing(output_folder, - barcode_tag=barcode_tag, - paired_end=paired_end, - verbose=verbose, - cores=cores, - number_of_expected_bams=number_of_expected_bams, - min_read_quality=min_read_quality, - bam_filepath=bam_filepath - ) - else: - # for bulk single-end or single-cell, we will just concatenate all the edits files into one big bed file, and then run samtools depth - if not barcode_tag: - total_time, total_seconds_for_contig_df = generate_depths_with_samtools(output_folder, [bam_filepath]) - elif barcode_tag: - # for single-end, we want to run the samtools depth command for everyone of the reconfigured bam files - reconfigured_bam_filepaths = glob('{}/split_bams/*/*.bam'.format(output_folder)) - print("Running samtools depth on {} reconfigured bam paths...".format(len(reconfigured_bam_filepaths))) - total_time, total_seconds_for_contig_df = generate_depths_with_samtools(output_folder, reconfigured_bam_filepaths) - - + # We want to run the samtools depth command for each of the reconfigured bam files + print("Running samtools depth on {} subset bam paths...".format(len(reconfigured_bam_filepaths))) + total_time, total_seconds_for_contig_df = generate_depths(output_folder, reconfigured_bam_filepaths, paired_end=paired_end, barcode_tag=barcode_tag) + total_seconds_for_contig_df.to_csv("{}/coverage_calculation_timing.tsv".format(logging_folder), sep='\t') pretty_print("Total time to calculate coverage: {} minutes".format(round(total_time/60, 3))) - - if skip_coverage: - # If we skip coverage steps, we have to just collate edit info results from the edit finding steps into one giant - # final_edit_info.tsv file (where coverage is None) - pretty_print("Skipped coverage, so combining edit info alone...") - all_edit_info_unique_position_with_coverage_df = collate_edit_info_shards(output_folder) - - - # Check if filtering step finished - final_filtered_sites_path = '{}/final_filtered_site_info.tsv'.format(output_folder) - final_path_already_exists = False - final_annotated_path_already_exists = False - - if os.path.exists(final_filtered_sites_path): - print("{} exists...".format(final_filtered_sites_path)) - final_path_already_exists = True - - # Filtering steps - if not final_path_already_exists: - print("Filtering..") - all_edit_info_unique_position_with_coverage_df = get_edits_with_coverage_df(output_folder, - barcode_tag=barcode_tag, - paired_end=paired_end) + barcode_tag=barcode_tag) pretty_print("\tNumber of edits after filtering:\n\t{}".format(len(all_edit_info_unique_position_with_coverage_df))) @@ -634,43 +575,11 @@ def run(bam_filepath, annotation_bedfile_path, output_folder, contigs=[], strand if len(sailor_list) > 0: print("{} sites being converted to SAILOR 
format...".format(len(final_site_level_information_df))) - - # Output SAILOR-formatted file for use in FLARE downstream - # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - # 1 629275 629276 0.966040688 2,30 + - # 1 629309 629310 2.8306e-05 1,1043 + - - for conversion in sailor_list: - conversion_search = conversion[0] + '>' + conversion[1] - - print("Generating SAILOR-style bed outputs for conversion {}...".format(conversion)) - - sailor_sites,weird_sites = get_sailor_sites(final_site_level_information_df, conversion_search, skip_coverage=skip_coverage) - sailor_sites = sailor_sites.drop_duplicates() - - print("{} final deduplicated {} SAILOR-formatted sites".format(len(sailor_sites), conversion_search)) - sailor_sites.to_csv('{}/sailor_style_sites_{}.bed'.format( - output_folder, - conversion_search.replace(">", "-")), - header=False, - index=False, - sep='\t') - + convert_sites_to_sailor(final_site_level_information_df, sailor_list, output_folder, skip_coverage) + if len(bedgraphs_list) > 0: # Make plot of edit distributions - bedgraph_folder = '{}/bedgraphs'.format(output_folder) - make_folder(bedgraph_folder) - - pretty_print("Making bedgraphs for {} conversions...\n".format(bedgraphs_list)) - for conversion in bedgraphs_list: - conversion_search = conversion[0] + '>' + conversion[1] - sites_for_conversion = final_site_level_information_df[final_site_level_information_df.conversion == conversion_search] - sites_for_conversion['edit_fraction'] = sites_for_conversion['count']/sites_for_conversion['coverage'] - sites_for_conversion['start'] = sites_for_conversion['position'] - 1 - sites_for_conversion_bedgraph_cols = sites_for_conversion[['contig', 'start', 'position', 'edit_fraction']] - - sites_for_conversion_bedgraph_cols.to_csv('{}/{}_{}.bedgraph'.format(bedgraph_folder, output_folder.split('/')[-1], conversion), sep='\t', index=False, - header=False) + generate_bedgraphs(final_site_level_information_df, bedgraphs_list, output_folder) if not annotation_bedfile_path: print("annotation_bedfile_path argument not provided ...\ @@ -683,7 +592,8 @@ def run(bam_filepath, annotation_bedfile_path, output_folder, contigs=[], strand if len(final_site_level_information_df) == 0: output_zero_edit_files = zero_edit_found(final_site_level_information_df, output_folder, sailor_list, bedgraphs_list, keep_intermediate_files, start_time, logging_folder) return 'Done!' 
- + + # Annotation option if final_path_already_exists and annotation_bedfile_path: final_site_level_information_df = pd.read_csv('{}/final_filtered_site_info.tsv'.format(output_folder), sep='\t') @@ -693,12 +603,11 @@ def run(bam_filepath, annotation_bedfile_path, output_folder, contigs=[], strand sep='\t', index=False) final_annotated_path_already_exists = True - + # Make plot of edit distributions if final_path_already_exists: final_site_level_information_df = pd.read_csv('{}/final_filtered_site_info.tsv'.format(output_folder), sep='\t') - # Make plot of edit distributions plot_folder = '{}/plots'.format(output_folder) make_folder(plot_folder) @@ -719,6 +628,34 @@ def run(bam_filepath, annotation_bedfile_path, output_folder, contigs=[], strand print(f"Current memory usage {current/1e6}MB; Peak: {peak/1e6}MB") print(f'Time elapsed: {time.time()-start_time:.2f}s') + if final_path_already_exists and all_cells_coverage: + output_suffix = "all_cells" + print("Calculating coverage at all edit sites in all cells...") + + # Define the strand conversion type (e.g., 'A>G') + strand_conversion = "A>G" + + # Get the list of BAM file paths + bam_filepaths = glob(f"{output_folder}/split_bams/*/*.bam") + + # Generate and split BED files using multiprocessing + generate_and_split_bed_files_for_all_edits(output_folder, + bam_filepaths, + strand_conversion="A>G", + processes=cores, + output_suffix=output_suffix) + + make_depth_command_script_single_cell( + paired_end, + reconfigured_bam_filepaths, + output_folder, + output_suffix=output_suffix, + run=True, + pivot=True, + processes=cores, + barcode_tag=barcode_tag + ) + if not keep_intermediate_files: pretty_print("Deleting intermediate files...", style="-") delete_intermediate_files(output_folder) @@ -772,6 +709,7 @@ def check_samtools(): parser.add_argument('--num_per_sublist', dest='num_per_sublist', type=int, default=6) parser.add_argument('--paired_end', dest='paired_end', action='store_true', help='Assess coverage taking without double-counting paired end overlapping regions... slower but more accurate. 
Edits by default are only counted once for an entire pair, whether they show up on both ends or not.') parser.add_argument('--skip_coverage', dest='skip_coverage', action='store_true') + parser.add_argument('--all_cells_coverage', dest='all_cells_coverage', action='store_true') parser.add_argument('--max_edits_per_read', type=int, default=None) parser.add_argument('--num_intervals_per_contig', type=int, default=200, help='Deprecated') parser.add_argument('--interval_length', type=int, default=2000000, help='Length of intervals split analysis into...') @@ -795,8 +733,9 @@ def check_samtools(): verbose = args.verbose keep_intermediate_files = args.keep_intermediate_files paired_end = args.paired_end + all_cells_coverage = args.all_cells_coverage skip_coverage = args.skip_coverage - + barcode_tag = args.barcode_tag min_base_quality = args.min_base_quality min_read_quality = args.min_read_quality @@ -808,6 +747,19 @@ def check_samtools(): num_per_sublist = args.num_per_sublist + # all_cells_coverage only applies for single cell case + if not barcode_tag: + if all_cells_coverage == True: + all_cells_coverage = False + + """ + # Convert all_cells_coverage into list of conversions for which to output all cell info + if not all_cells_coverage is None: + all_cells_coverage_list = all_cells_coverage.upper().replace('I', 'G').split(',') + for c in all_cells_coverage_list: + assert(c in ['AC', 'AG', 'AT', 'CA', 'CG', 'CT', 'GA', 'GC', 'GT', 'TA', 'TC', 'TG']) + """ + # Convert bedgraphs argument into list of conversions if not bedgraphs is None: if barcode_tag in ['CB', 'IB']: @@ -882,7 +834,8 @@ def check_samtools(): "\tVerbose:\t{}".format(verbose), "\tKeep intermediate files:\t{}".format(keep_intermediate_files), "\tSkip coverage?:\t{}".format(skip_coverage), - "\tFor single-cell: \t{} contigs at at time\n".format(num_per_sublist) + "\tFor single-cell: \t{} contigs at at time\n".format(num_per_sublist), + "\tCalculate coverage in all barcodes?: \t{}\n".format(all_cells_coverage) ]) if not paired_end: @@ -920,7 +873,8 @@ def check_samtools(): skip_coverage=skip_coverage, keep_intermediate_files=keep_intermediate_files, num_per_sublist=num_per_sublist, - interval_length=interval_length + interval_length=interval_length, + all_cells_coverage=all_cells_coverage ) - \ No newline at end of file + diff --git a/src/core.py b/src/core.py index b96123c..8b9a1ef 100644 --- a/src/core.py +++ b/src/core.py @@ -149,7 +149,7 @@ def find_edits(bampath, contig, split_index, start, end, output_folder, barcode_ output_file = '{}/{}_{}_{}_{}_edit_info.tsv'.format(edit_info_subfolder, contig, split_index, start, end) output_bedfile = '{}/{}_{}_{}_{}_edit_positions.bed'.format(edit_info_subfolder, contig, split_index, start, end) - + remove_file_if_exists(output_file) with open(output_file, 'w') as f: diff --git a/src/utils.py b/src/utils.py index ff04d72..d005bde 100644 --- a/src/utils.py +++ b/src/utils.py @@ -6,10 +6,16 @@ import pandas as pd import numpy as np import sys +import subprocess from collections import OrderedDict, defaultdict from itertools import product +from scipy.special import betainc +import shutil +from multiprocessing import Pool +import multiprocessing +import time -cb_n = 2 +CB_N = 1 def generate_permutations_list_for_CB(n): """ @@ -30,10 +36,9 @@ def generate_permutations_list_for_CB(n): result = [f"{combo}-1" for combo in combinations] return result - - + suffixes = { - 'CB': generate_permutations_list_for_CB(cb_n), + 'CB': generate_permutations_list_for_CB(CB_N), 'IS': [ 
"00","01","02","03","04","05","06","07","08","09", "10","11","12","13","14","15","16","17","18","19", @@ -99,7 +104,7 @@ def get_contigs_that_need_bams_written(expected_contigs, split_bams_folder, barc subsets_per_contig[contig_label] += 1 if barcode_tag == 'CB': - number_of_expected_bams = 4 + number_of_expected_bams = 4**CB_N else: number_of_expected_bams = number_of_expected_bams @@ -577,6 +582,18 @@ def concat_and_write_bams(contig, df_dict, header_string, split_bams_folder, bar print("\t{} suffixes".format(len(suffix_options))) for suffix in suffix_options: + # Make a sub-subfolder to put the bams for this specific contig + contig_folder = '{}/{}_{}/'.format(split_bams_folder, contig, suffix) + if not os.path.exists(contig_folder): + os.mkdir(contig_folder) + + + bam_file_name = '{}/{}_{}.bam'.format(contig_folder, contig, suffix) + + if os.path.exists(f"{bam_file_name}.bai"): + print(f"{bam_file_name}.bai, skipping...") + return + if barcode_tag: all_contents_for_suffix = all_contents_df.filter(pl.col('barcode').str.ends_with(suffix)) else: @@ -614,15 +631,6 @@ def concat_and_write_bams(contig, df_dict, header_string, split_bams_folder, bar reads_count_for_suffix )) sys.exit(1) - - - # Make a sub-subfolder to put the bams for this specific contig - contig_folder = '{}/{}_{}/'.format(split_bams_folder, contig, suffix) - if not os.path.exists(contig_folder): - os.mkdir(contig_folder) - - - bam_file_name = '{}/{}_{}.bam'.format(contig_folder, contig, suffix) # Write, sort and index bam immediately write_reads_to_file(reads_deduped, bam_file_name, header_string) @@ -634,6 +642,7 @@ def concat_and_write_bams(contig, df_dict, header_string, split_bams_folder, bar rm_bam(bam_file_name) except Exception as e: print("Failed at indexing {}".format(bam_file_name)) + raise e def concat_and_write_bams_wrapper(params): @@ -641,3 +650,475 @@ def concat_and_write_bams_wrapper(params): # print("df_dict keys: {}".format(df_dict.keys())) concat_and_write_bams(contig, df_dict, header_string, split_bams_folder, barcode_tag=barcode_tag, number_of_expected_bams=number_of_expected_bams, verbose=verbose) + + +def generate_and_run_bash_merge(output_folder, file1_path, file2_path, output_file_path, header_columns, barcode_tag=None): + # Convert header list into a tab-separated string + header = "\t".join(header_columns) + + position_adjustment = '1' # for samtools view + if not barcode_tag: + position_adjustment = '0' # for samtools depth + print(f"position adjustment is {position_adjustment} (barcode_tag is {barcode_tag})") + + # Generate the Bash command for processing + bash_command = f"""#!/bin/bash + # Step 1: Adjust the depths file by adding a new column that incorporates both contig and position + # for join purposes, and sort by this column. Output in tab-separated format. 
+ awk -v OFS='\\t' '{{print $1, $2+{position_adjustment}, $3, $1":"($2+{position_adjustment})}}' "{file2_path}" | sort -k4,4V | uniq > {output_folder}/depth_modified.tsv + + # Step 2: Sort the first file by the join column (the column including both contig and position information) + sort -k3,3V "{file1_path}" | uniq > {output_folder}/final_edit_info_no_coverage_sorted.tsv + + # Step 3: Join the files on the specified columns, output all columns, and select necessary columns with tab separation +join -1 3 -2 4 -t $'\\t' {output_folder}/final_edit_info_no_coverage_sorted.tsv {output_folder}/depth_modified.tsv | awk -v OFS='\\t' '{{print $2, $3, $4, $5, $6, $7, $8, $11}}' > "{output_file_path}" + + # Step 4: Add header to the output file + echo -e "{header}" | cat - "{output_file_path}" > temp && mv temp "{output_file_path}" + """ + + # Write the command to a shell script + bash_script_path = "{}/merge_command.sh".format(output_folder) + with open(bash_script_path, "w") as file: + file.write(bash_command) + + # Run the generated bash script + print("Running Bash merge script...") + subprocess.run(["bash", bash_script_path], check=True) + print(f"Merged file saved to {output_file_path}") + + +def generate_empty_matrix_file(matrix_output_filepath): + pass + + +def pivot_depths_output(depths_input_filepath, matrix_output_filepath): + """ + Transform depths data into a pivoted matrix with reformatted row and column labels. + + The rows will look like this: + + 9_GATCCCTCAGTAACGG-1 3000448 1 + 9_GATCCCTCAGTAACGG-1 3000469 1 + 9_GATCCCTCAGTAACGG-1 3000508 3 + + We want them to look like this: + + GATCCCTCAGTAACGG-1 9:3000448 1 + GATCCCTCAGTAACGG-1 9:3000469 1 + GATCCCTCAGTAACGG-1 9:3000508 3 + + # And after the pivot (positions as rows, barcodes as columns): + GATCCCTCAGTAACGG-1 + 9:3000448 1 + 9:3000469 1 + 9:3000508 3 + + """ + # Read the universal coverage data + df = pd.read_csv(depths_input_filepath, sep="\t", header=None, names=["Contig", "Position", "Coverage"]) + + + # Extract sample ID from the Contig and create a combined column for Position + df["Barcode"] = df["Contig"].str.split("_", n=1).str[1] # Extract everything after the first underscore + df["CombinedPosition"] = df["Contig"].str.split("_", n=1).str[0] + ":" + df["Position"].astype(str) + + # Drop the original Contig and Position columns (optional, for clarity) + df = df[["Barcode", "CombinedPosition", "Coverage"]] + + # Pivot the data to make a matrix of Sample x CombinedPosition with values being Coverage + pivot = df.pivot(index="CombinedPosition", columns="Barcode", values="Coverage").fillna(0) + + # Write to output + pivot.to_csv(matrix_output_filepath, sep="\t") + + + +def run_command(command): + # Run a shell command (samtools depth, cat, paste, etc.) + subprocess.run(command, shell=True, check=True) + + +def merge_files_by_chromosome(args): + """ + Helper function to merge files for a single chromosome. + """ + chromosome, files, output_folder = args + first_file = files[0] + other_files = files[1:] + merged_file = os.path.join(output_folder, f"{chromosome}_comprehensive_coverage_matrix.tsv") + + # Prepare the paste command + strip_headers_command = " ".join( + [f"<(cut -f2- {file})" for file in other_files] + ) + paste_command = f"paste {first_file} {strip_headers_command} > {merged_file}" + + # Use bash to execute the paste command + run_command(f"bash -c '{paste_command}'") + print(f"Columnar merge complete for {chromosome}. 
Output saved to {merged_file}.") + + +def prepare_matrix_files_multiprocess(output_matrix_folder, + output_folder, + processes=4): + """ + Merges matrix files column-wise, grouping by chromosome, using multiprocessing. + """ + print("Merging matrices column-wise by chromosome...") + + # Group files by chromosome + matrix_files = [ + os.path.join(output_matrix_folder, file) + for file in os.listdir(output_matrix_folder) + if file.endswith("matrix.tsv") and os.path.getsize(os.path.join(output_matrix_folder, file)) > 20 # Skip empty files + ] + + if not matrix_files: + print("No non-empty matrix files to merge.") + return + + # Group files by chromosome + file_groups = {} + for file in matrix_files: + chromosome = os.path.basename(file).split("all_cells_")[-1].split("_")[0] # Adjust this split logic as needed + file_groups.setdefault(chromosome, []).append(file) + + # Prepare arguments for multiprocessing + task_args = [(chromosome, files, output_folder) for chromosome, files in file_groups.items()] + + # Use multiprocessing to run the merge for each chromosome + with Pool(processes=processes) as pool: + pool.map(merge_files_by_chromosome, task_args) + + print("All columnar merges complete.") + + + +def merge_depth_files(output_folder, output_suffix=''): + """ + Merges depth files into a single file. + """ + print("Merging depths...") + + depth_files = f"{output_folder}/coverage/depths_{output_suffix}*.txt" + merged_depth_file = f"{output_folder}/depths_{output_suffix}.txt" + run_command(f"cat {depth_files} > {merged_depth_file}") + print(f"Depths merged into {merged_depth_file}.") + + + +def calculate_coverage(bam_filepath, bed_filepath, output_filepath, output_matrix_folder): + """ + Mimic samtools depth -a by including positions with zero coverage and applying equivalent filters. + """ + + depths_file = output_filepath + + print(f"Running samtools view on {bed_filepath} for {bam_filepath}, outputting to {output_filepath}") + + regions = [] + with open(bed_filepath, "r") as bed: + for line in bed: + fields = line.strip().split() + chrom, start, end = fields[0], int(fields[1]), int(fields[2]) + regions.append((chrom, start, end)) + + if len(regions) > 0: + print(f"\t{len(regions)} regions") + + with pysam.AlignmentFile(bam_filepath, "rb") as bam, open(depths_file, "w") as out: + try: + for chrom, start, end in regions: + # Get coverage, applying base quality filter + coverage = bam.count_coverage( + chrom, start, end, quality_threshold=0 # Mimics -Q 13 in samtools depth + ) + + # Calculate total coverage for each position and include zero-coverage positions + for pos in range(start, end): + total_coverage = sum(base[pos - start] for base in coverage) + out.write(f"{chrom}\t{pos}\t{total_coverage}\n") + except Exception as e: + # Return the exception and the arguments for debugging + raise RuntimeError( + f"Error processing bam {bam_filepath} using bed {bed_filepath} at {chrom}:{start}-{end}, " + f"outputting to {depths_file}: {e}" + ) + + if output_matrix_folder: + matrix_output_file = os.path.join(output_matrix_folder, f"{os.path.basename(depths_file).split('.')[0]}_matrix.tsv") + + if not os.path.exists(matrix_output_file) or os.path.getsize(matrix_output_file) == 0: + # Pivot the depths output file into a matrix + pivot_depths_output(depths_file, matrix_output_file) + + +def run_pysam_count_coverage(args_list, processes): + """ + Runs pysam.count_coverage in parallel using multiprocessing. 
+ """ + try: + with Pool(processes=processes) as pool: + pool.starmap(calculate_coverage, args_list) + + except Exception as e: + print(f"Aborting: One or more coverage calculations failed with an error: {e}", file=sys.stderr) + sys.exit(1) # Exit with an error code + + +def prepare_pysam_coverage_args(bam_filepaths, output_folder, output_suffix='', pivot=False, barcode_tag=None): + """ + Prepare arguments for pysam coverage calculation for each BAM file and its BED file. + """ + # Create a folder for matrix outputs if pivoting is enabled + if pivot: + output_matrix_folder = f"{output_folder}/matrix_outputs" + os.makedirs(output_matrix_folder, exist_ok=True) + else: + output_matrix_folder = None + + args_list = [] + + # Format output suffix + if len(output_suffix) > 0: + output_suffix = f"_{output_suffix}" + + print("prepare_pysam_coverage_args, barcode_tag is {}".format(barcode_tag)) + + for bam_filepath in bam_filepaths: + # Extract suffix from BAM filename + bam_filename = os.path.basename(bam_filepath) + bam_prefix, bam_suffix = bam_filename.split("_")[0], bam_filename.split("_")[1].split(".")[0] + + if barcode_tag: + # Path to the corresponding split BED file + bed_filepath = f"{output_folder}/combined{output_suffix}_split_by_suffix/combined{output_suffix}_{bam_prefix}_{bam_suffix}.bed" + output_filepath = f"{output_folder}/coverage/depths{output_suffix}_{bam_prefix}_{bam_suffix}.txt" + + else: + # bulk case + bed_filepath = f"{output_folder}/combined{output_suffix}.bed" + output_filepath = f"{output_folder}/coverage/depths{output_suffix}_{bam_filename.split('_')[1]}.txt" + + + if os.path.exists(bed_filepath): + args_list.append((bam_filepath, bed_filepath, output_filepath, output_matrix_folder)) + else: + print(f"Did not find {bed_filepath}") + + + return args_list + + + +def make_depth_command_script_single_cell(paired_end, bam_filepaths, output_folder, all_depth_commands=[], + output_suffix='', run=False, pivot=False, processes=4, barcode_tag=None): + """ + Main function to generate and execute samtools depth commands, and optionally pivot and merge matrices. 
+ """ + samtools_depth_start_time = time.perf_counter() + + # Prepare pysam coverage arguments + pysam_coverage_args = prepare_pysam_coverage_args(bam_filepaths, output_folder, output_suffix=output_suffix, pivot=pivot, barcode_tag=barcode_tag) + + print(f"\tPrepared coverage arguments for {len(pysam_coverage_args)} BAM files.") + + #all_depth_commands += samtools_depth_commands + #print(f"\tsamtools_depth_commands: {len(samtools_depth_commands)}") + + with open('{}/depth_commands_{}.txt'.format(output_folder, output_suffix), 'w') as f: + for d in all_depth_commands: + f.write('{}\n\n'.format(d)) + + if run: + print("Calculating depths using multiprocessing with pysam...") + run_pysam_count_coverage(pysam_coverage_args, processes) + + merge_depth_files(output_folder, output_suffix) + + if pivot: + output_matrix_folder = f"{output_folder}/matrix_outputs" + final_combined_matrices_folder = f"{output_folder}/final_matrix_outputs" + os.makedirs(final_combined_matrices_folder, exist_ok=True) + + print("\nRunning pivot...\n") + prepare_matrix_files_multiprocess( + output_matrix_folder, + final_combined_matrices_folder, + processes=processes + ) + + samtools_depth_total_time = time.perf_counter() - samtools_depth_start_time + + + if pivot: + print(f"Total seconds for samtools depth and matrix generation commands to run: {samtools_depth_total_time}") + else: + print(f"Total seconds for samtools depth commands to run: {samtools_depth_total_time}") + + + +def calculate_sailor_score(sailor_row): + edits = sailor_row['count'] + num_reads = sailor_row['coverage'] + original_base_counts = num_reads - edits + + cov_margin = 0.01 + alpha, beta = 0, 0 + + num_failures = 0 + failure_messages = [] + try: + edit_frac = float(edits) / float(num_reads) + except Exception as e: + num_failures += 1 + print("Bad Row: {}".format(sailor_row)) + return None + + # calc smoothed counts and confidence + destination_base_smoothed = edits + alpha + origin_base_smoothed = original_base_counts + beta + + ######## MOST IMPORTANT LINE ######## + # calculates the confidence of theta as + # P( theta < cov_margin | A, G) ~ Beta_theta(G, A) + confidence = 1 - betainc(destination_base_smoothed, origin_base_smoothed, cov_margin) + return confidence + + +def get_sailor_sites(final_site_level_information_df, conversion="C>T", skip_coverage=False): + final_site_level_information_df = final_site_level_information_df[final_site_level_information_df['strand_conversion'] == conversion] + + if skip_coverage: + # Case where we want to skip coverage counting, we will just set it to -1 in the sailor-style output + coverage_col = "-1" + weird_sites = pd.DataFrame() + else: + # Normally, assume that there is a coverage column + coverage_col = final_site_level_information_df['coverage'].astype(str) + + weird_sites = final_site_level_information_df[ + (final_site_level_information_df.coverage == 0) |\ + (final_site_level_information_df.coverage < final_site_level_information_df['count'])] + + print("{} rows had coverage of 0 or more edits than coverage... 
filtering these out, but look into them...".format( + len(weird_sites))) + + final_site_level_information_df = final_site_level_information_df[ + (final_site_level_information_df.coverage > 0) & \ + (final_site_level_information_df.coverage >= final_site_level_information_df['count']) + ] + + + final_site_level_information_df['combo'] = final_site_level_information_df['count'].astype(str) + ',' + coverage_col + + if skip_coverage: + final_site_level_information_df['score'] = -1 + else: + final_site_level_information_df['score'] = final_site_level_information_df.apply(calculate_sailor_score, axis=1) + + final_site_level_information_df['start'] = final_site_level_information_df['position'] + final_site_level_information_df['end'] = final_site_level_information_df['position'] + 1 + + final_site_level_information_df = final_site_level_information_df[['contig', 'start', 'end', 'score', 'combo', 'strand']] + return final_site_level_information_df, weird_sites + + +def concatenate_files(source_folder, file_pattern, output_filepath, run=False): + # Create the concatenation command with numeric sorting and header skipping + concat_command = ( + f"for f in $(ls -v {source_folder}/{file_pattern}); do " + "tail -n +2 \"$f\"; " # Skip the header row for each file + "done > {}".format(output_filepath) + ) + + # Write the command to a shell script + concat_bash = f"{source_folder}/concat_command.sh" + with open(concat_bash, 'w') as f: + f.write(concat_command) + + if run: + print("Concatenating files in numerical order without headers...") + subprocess.run(['bash', concat_bash]) + print("Done concatenating.") + + +def get_edits_with_coverage_df(output_folder, + barcode_tag=None): + + all_edit_info_unique_position_with_coverage_df = pd.read_csv('{}/final_edit_info.tsv'.format(output_folder), sep='\t', + dtype={'coverage': int, 'position': int, + 'contig': str}) + # If there was a barcode specified, then the contigs will have been set to a combination of contig and barcode ID. + # For example, we will find a contig to be 9_GATCCCTCAGTAACGG-1, and we will want to simplify it back to simply 9, + # as the barcode information is separately already present as its own column in this dataframe. 
To ensure code continuity, + # this will still be true even with no barcode specified, in which case the contig will be _no_barcode + + # Therefore: replace the contig with the part before the barcode + all_edit_info_unique_position_with_coverage_df['contig'] = all_edit_info_unique_position_with_coverage_df.apply(lambda row: row['contig'].replace('_{}'.format(row['barcode']), ''), axis=1) + + return all_edit_info_unique_position_with_coverage_df + + + +def zero_edit_found(final_site_level_information_df, output_folder, sailor_list, bedgraphs_list, keep_intermediate_files, start_time, logging_folder): + print("No edits found.") + sites_columns = ['site_id','barcode','contig','position','ref','alt','strand','count','coverage','conversion','strand_conversion'] + sites_empty_df = pd.DataFrame(columns=sites_columns) + sites_empty_df.to_csv('{}/final_filtered_site_info.tsv'.format(output_folder), sep='\t', index=False) + + Annotated_sites_columns = ['site_id','barcode','contig','position','ref','alt','strand','count','coverage','conversion','strand_conversion','feature_name','feature_type','feature_strand'] + annotated_sites_empty_df = pd.DataFrame(columns=Annotated_sites_columns) + annotated_sites_empty_df.to_csv('{}/final_filtered_site_info_annotated.tsv'.format(output_folder), sep='\t', index=False) + + empty_df = pd.DataFrame() + if len(sailor_list) > 0: + for conversion in sailor_list: + conversion_search = conversion[0] + '>' + conversion[1] + empty_df.to_csv('{}/sailor_style_sites_{}.bed'.format( + output_folder, + conversion_search.replace(">", "-")), + header=False, index=False, sep='\t') + + if len(bedgraphs_list) > 0: + bedgraph_folder = '{}/bedgraphs'.format(output_folder) + make_folder(bedgraph_folder) + for conversion in bedgraphs_list: + conversion_search = conversion[0] + '>' + conversion[1] + empty_df.to_csv('{}/{}_{}.bedgraph'.format(bedgraph_folder, output_folder.split('/')[-1], conversion), sep='\t', index=False, header=False) + + current, peak = tracemalloc.get_traced_memory() + + with open('{}/manifest.txt'.format(logging_folder), 'a+') as f: + f.write(f'sites\t{len(final_site_level_information_df)}\n') + f.write(f'peak_memory_mb\t{peak/1e6}\n') + f.write(f'time_elapsed_seconds\t{time.time()-start_time:.2f}s\n') + + print(f"Current memory usage {current/1e6}MB; Peak: {peak/1e6}MB") + print(f'Time elapsed: {time.time()-start_time:.2f}s') + + if not keep_intermediate_files: + pretty_print("Deleting intermediate files...", style="-") + delete_intermediate_files(output_folder) + + pretty_print("Done!", style="+") + return 'Done' + + +def delete_intermediate_files(output_folder): + to_delete = ['coverage', 'edit_info', 'split_bams', 'combined_all_cells_split_by_suffix', + 'combined_source_cells_split_by_suffix', + 'matrix_outputs', + 'all_edit_info.tsv', + 'concat_command.sh', 'depth_command_source_cells.sh', 'combined.bed', 'merge_command.sh', + 'final_edit_info_no_coverage.tsv', 'final_edit_info_no_coverage_sorted.tsv', + 'depths_source_cells.txt', 'depth_modified.tsv', 'final_edit_info.tsv', 'final_filtered_edit_info.tsv', + 'combined_all_cells.bed', 'depth_command_all_cells.sh', 'depths_all_cells.txt', 'combined_source_cells.bed' + ] + for object in to_delete: + object_path = '{}/{}'.format(output_folder, object) + + if os.path.exists(object_path): + if os.path.isfile(object_path): + os.remove(object_path) + else: + shutil.rmtree(object_path) diff --git a/tests/integration_tests.ipynb b/tests/integration_tests.ipynb index 25464a8..6c9d709 100644 --- 
diff --git a/tests/integration_tests.ipynb b/tests/integration_tests.ipynb
index 25464a8..6c9d709 100644
--- a/tests/integration_tests.ipynb
+++ b/tests/integration_tests.ipynb
@@ -2889,7 +2889,7 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 16,
+   "execution_count": 7,
    "id": "f5c3ed3e-13dd-4399-924e-3d1ac17ce387",
    "metadata": {},
    "outputs": [
@@ -3012,6 +3012,7 @@
     "\t >>> long_read_sc_test passed! <<<\n",
     "\n",
     "Checking that analyzing a single-cell dataset in 'bulk' mode (i.e. not specifying the 'CB' barcode) yields the exact same positions and base changes, but with counts and coverages aggregated rather than at a single-cell resolution\n",
+    "grouped_sc_rows: 62, bulk_rows: 62\n",
     "\n",
     "\t >>> single-cell and bulk on same dataset comparison passed! <<<\n",
     "\n",
@@ -3341,6 +3342,7 @@
     "bulk_folder = 'only_5_cells_bulk_mode_test'\n",
     "sc_5_cells = pd.read_csv('singlecell_tests/{}/final_filtered_site_info.tsv'.format(sc_folder), sep='\\t').sort_values(['position', 'strand_conversion'])\n",
     "bulk_5_cells = pd.read_csv('singlecell_tests/{}/final_filtered_site_info.tsv'.format(bulk_folder), sep='\\t').sort_values(['position', 'strand_conversion'])\n",
+    "\n",
     "print(\"Checking that analyzing a single-cell dataset in 'bulk' mode (i.e. not specifying the 'CB' barcode) yields the exact same positions and base changes, but with counts and coverages aggregated rather than at a single-cell resolution\")\n",
     "grouped_sc = pd.DataFrame(sc_5_cells.groupby(['contig', 'position', 'strand_conversion']).agg({'count': sum, 'strand_conversion': 'unique'}))\n",
     "grouped_sc.index.names = ['contig', 'position', 'c']\n",
@@ -3357,6 +3359,7 @@
     "    bulk_rows.append((r['contig'], r['position'], r['strand_conversion']))\n",
     "\n",
     "try:\n",
+    "    print(\"grouped_sc_rows: {}, bulk_rows: {}\".format(len(grouped_sc_rows), len(bulk_rows)))\n",
     "    assert(grouped_sc_rows == bulk_rows)\n",
     "    for bulk_item, grouped_sc_item in zip(bulk_rows, grouped_sc_rows):\n",
     "        assert(bulk_item == grouped_sc_item)\n",
diff --git a/tests/integration_tests_run.sh b/tests/integration_tests_run.sh
index f7ab6be..3d10906 100755
--- a/tests/integration_tests_run.sh
+++ b/tests/integration_tests_run.sh
@@ -34,7 +34,7 @@ echo "SC tests scripts"
 ls -lh $MARINE/tests/$tests_folder/scripts/
 
-for t in "only_5_cells_test" "only_5_cells_bulk_mode_test" "long_read_sc_test" "edge_case_test" "edge_case_dist_filter_test"
+for t in "only_5_cells_test" "only_5_cells_bulk_mode_test" "only_5_cells_all_cells_coverage_test" "long_read_sc_test" "edge_case_test" "edge_case_dist_filter_test"
 do
     echo $t
diff --git a/tests/singlecell_tests/scripts/only_5_cells_bulk_mode_test.sh b/tests/singlecell_tests/scripts/only_5_cells_bulk_mode_test.sh
index 2a60470..88796ec 100644
--- a/tests/singlecell_tests/scripts/only_5_cells_bulk_mode_test.sh
+++ b/tests/singlecell_tests/scripts/only_5_cells_bulk_mode_test.sh
@@ -14,4 +14,4 @@ $MARINE/tests/singlecell_tests/only_5_cells_bulk_mode_test \
 --cores \
 4 \
 --strandedness 2 --verbose \
---contigs "9" --interval_length 32000000
\ No newline at end of file
+--contigs "9" --interval_length 32000000 --keep_intermediate_files
diff --git a/tests/singlecell_tests/scripts/only_5_cells_test.sh b/tests/singlecell_tests/scripts/only_5_cells_test.sh
index cfd6ccc..5413f0c 100644
--- a/tests/singlecell_tests/scripts/only_5_cells_test.sh
+++ b/tests/singlecell_tests/scripts/only_5_cells_test.sh
@@ -15,4 +15,4 @@ $MARINE/tests/singlecell_tests/only_5_cells_test \
 4 \
 --barcode_tag "CB" \
 --strandedness 2 \
---contigs "9" --interval_length 32000000
\ No newline at end of file
+--keep_intermediate_files --contigs "9" --interval_length 32000000
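The notebook hunks above verify that grouping single-cell sites by contig, position and strand_conversion and summing counts reproduces the bulk-mode rows; a simplified toy version of that aggregation on synthetic data (all values invented for illustration):

    import pandas as pd

    # Two cells reporting the same site and conversion; bulk mode should see one row with the summed count.
    sc = pd.DataFrame({
        'contig': ['9', '9'],
        'position': [3000525, 3000525],
        'strand_conversion': ['C>T', 'C>T'],
        'count': [2, 3],
    })

    grouped_sc = sc.groupby(['contig', 'position', 'strand_conversion'], as_index=False)['count'].sum()
    print(grouped_sc)  # a single row with count == 5, matching the bulk-mode aggregate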
diff --git a/tests/strandedness_tests/scripts/.ipynb_checkpoints/F1R2_pair_test-checkpoint.sh b/tests/strandedness_tests/scripts/.ipynb_checkpoints/F1R2_pair_test-checkpoint.sh
index eeffa87..887a8f9 100644
--- a/tests/strandedness_tests/scripts/.ipynb_checkpoints/F1R2_pair_test-checkpoint.sh
+++ b/tests/strandedness_tests/scripts/.ipynb_checkpoints/F1R2_pair_test-checkpoint.sh
@@ -17,4 +17,4 @@ $MARINE/tests/strandedness_tests/F1R2_pair_test \
 --strandedness 2 \
 --contigs "chr17" \
 --sailor \
---num_intervals_per_contig 16
\ No newline at end of file
+--keep_intermediate_files
\ No newline at end of file
diff --git a/tests/strandedness_tests/scripts/F1R2_pair_test.sh b/tests/strandedness_tests/scripts/F1R2_pair_test.sh
index eeffa87..887a8f9 100644
--- a/tests/strandedness_tests/scripts/F1R2_pair_test.sh
+++ b/tests/strandedness_tests/scripts/F1R2_pair_test.sh
@@ -17,4 +17,4 @@ $MARINE/tests/strandedness_tests/F1R2_pair_test \
 --strandedness 2 \
 --contigs "chr17" \
 --sailor \
---num_intervals_per_contig 16
\ No newline at end of file
+--keep_intermediate_files
\ No newline at end of file