Commit
process edit filtering within worker on shard
Eric Kofman committed Jul 24, 2024
1 parent d19da17 commit 63c2ce4
Showing 3 changed files with 115 additions and 94 deletions.
99 changes: 31 additions & 68 deletions marine.py
@@ -117,7 +117,7 @@ def bam_processing(overall_label_to_list_of_contents, output_folder, barcode_tag
 import subprocess

 def coverage_processing(output_folder, barcode_tag='CB', paired_end=False, verbose=False, cores=1, number_of_expected_bams=4,
-                        min_read_quality=0, bam_filepath=''):
+                        min_read_quality=0, bam_filepath='', filters=None):

     # Single-cell or long read version:
     edit_info_grouped_per_contig_combined = gather_edit_information_across_subcontigs(output_folder,
@@ -133,10 +133,11 @@ def coverage_processing(output_folder, barcode_tag='CB', paired_end=False, verbo
                                                         barcode_tag=barcode_tag,
                                                         paired_end=paired_end,
                                                         verbose=verbose,
-                                                        processes=cores
+                                                        processes=cores,
+                                                        filters=filters
                                                         )

-    concat_command = 'for f in {}/coverage/*.tsv; do cat $f; done > {}/all_edit_info.tsv'.format(output_folder, output_folder)
+    concat_command = 'for f in {}/coverage/*_filtered.tsv; do cat $f; done > {}/final_edit_info.tsv'.format(output_folder, output_folder)

     command_bash = '{}/concat_command.sh'.format(output_folder)
     with open(command_bash, 'w') as f:
@@ -146,6 +147,7 @@ def coverage_processing(output_folder, barcode_tag='CB', paired_end=False, verbo
     subprocess.run(['bash', command_bash])
     print("Done concatenating.")

+
     total_seconds_for_contig_df = pd.DataFrame.from_dict(total_seconds_for_contig, orient='index')
     total_seconds_for_contig_df.columns = ['seconds']
     total_seconds_for_contig_df['contig sections'] = total_seconds_for_contig_df.index
@@ -335,6 +337,12 @@ def run(bam_filepath, annotation_bedfile_path, output_folder, contigs=[], num_in

     coverage_subfolder = '{}/coverage'.format(output_folder)
     make_folder(coverage_subfolder)
+
+    filters = {
+        'dist_from_end': min_dist_from_end,
+        'base_quality': min_base_quality,
+        'max_edits_per_read': max_edits_per_read
+    }

     results, total_time, total_seconds_for_contig_df = coverage_processing(output_folder,
                                                                            barcode_tag=barcode_tag,
@@ -343,54 +351,21 @@ def run(bam_filepath, annotation_bedfile_path, output_folder, contigs=[], num_in
                                                                            cores=cores,
                                                                            number_of_expected_bams=number_of_expected_bams,
                                                                            min_read_quality=min_read_quality,
-                                                                           bam_filepath=bam_filepath
+                                                                           bam_filepath=bam_filepath,
+                                                                           filters=filters
                                                                            )
     total_seconds_for_contig_df.to_csv("{}/coverage_calculation_timing.tsv".format(logging_folder), sep='\t')

+    all_filter_stats = pd.DataFrame([r[1] for r in results]).sum(axis=0)
+    print(all_filter_stats)
+
+    with open('{}/manifest.txt'.format(logging_folder), 'a+') as f:
+        f.write(f'original_edits\t{float(all_filter_stats.loc["original"])}\n')
+        f.write(f'filtered_edits\t{float(all_filter_stats.loc["filtered"])}\n')
+
     pretty_print("Total time to calculate coverage: {} minutes".format(round(total_time/60, 3)))


     # Filtering and site-level information
     # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
     pretty_print("Filtering and calculating site-level statistics", style="~")
     # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

-    print("Loading edit information...")
-    all_edit_info_pd = pd.read_csv('{}/all_edit_info.tsv'.format(output_folder),
-                                   sep='\t',
-                                   names=['barcode', 'contig', 'position', 'ref', 'alt', 'read_id', 'strand',
-                                          'dist_from_end', 'base_quality', 'mapping_quality', 'barcode_position',
-                                          'coverage', 'source', 'position_barcode'], dtype={'base_quality': int, 'dist_from_end': int, 'contig': str})

-    coverage_per_unique_position_df = pd.DataFrame(all_edit_info_pd.groupby(
-        [
-            "position_barcode"
-        ]).coverage.max())

-    distinguishing_columns = [
-        "barcode",
-        "contig",
-        "position",
-        "ref",
-        "alt",
-        "read_id",
-        "strand",
-        "dist_from_end",
-        "base_quality",
-        "mapping_quality"
-    ]
-    all_edit_info_unique_position_df = all_edit_info_pd.drop_duplicates(distinguishing_columns)[distinguishing_columns]

-    all_edit_info_unique_position_df.index = all_edit_info_unique_position_df['position'].astype(str)\
-        + '_' + all_edit_info_unique_position_df['barcode']

-    all_edit_info_unique_position_with_coverage_df = all_edit_info_unique_position_df.join(coverage_per_unique_position_df)

-    all_edit_info_unique_position_with_coverage_df.to_csv('{}/final_edit_info.tsv'.format(output_folder), sep='\t')

     if skip_coverage:
         # If we skip coverage steps, we have to just collate edit info results from the edit finding steps into one giant
         # final_edit_info.tsv file (where coverage is None)
@@ -410,18 +385,13 @@ def run(bam_filepath, annotation_bedfile_path, output_folder, contigs=[], num_in
     # Filtering steps
     if not final_path_already_exists:
         print("Filtering..")
-        if filtering_only:
-            all_edit_info_unique_position_with_coverage_df = pd.read_csv('{}/final_edit_info.tsv'.format(output_folder), sep='\t', dtype={"contig": str})

-        pretty_print("Filtering edited sites", style='~')
-        pretty_print("Minimum distance from end = {}, Minimum base-calling quality = {}".format(min_dist_from_end, min_base_quality))

-        #all_edit_info_unique_position_with_coverage_df['min_base_quality'] = all_edit_info_unique_position_with_coverage_df['min_base_quality'].astype(int)

-        all_edit_info_filtered = all_edit_info_unique_position_with_coverage_df[
-            (all_edit_info_unique_position_with_coverage_df["base_quality"] >= min_base_quality) &
-            (all_edit_info_unique_position_with_coverage_df["dist_from_end"] >= min_dist_from_end)]

+        all_edit_info_unique_position_with_coverage_df = pd.read_csv('{}/final_edit_info.tsv'.format(output_folder), sep='\t',
+                                                                     index_col=0,
+                                                                     names=[
+                                                                         'barcode', 'contig', 'position', 'ref', 'alt', 'read_id',
+                                                                         'strand', 'dist_from_end', 'base_quality', 'mapping_quality',
+                                                                         'coverage'],
+                                                                     dtype={'base_quality': int, 'dist_from_end': int, 'contig': str})

         print("Deduplicating....")
         distinguishing_columns = [
@@ -437,22 +407,14 @@ def run(bam_filepath, annotation_bedfile_path, output_folder, contigs=[], num_in
         if not skip_coverage:
             distinguishing_columns.append("coverage")

-        all_edit_info_filtered_deduped = all_edit_info_filtered.drop_duplicates(
+        all_edit_info_filtered_deduped = all_edit_info_unique_position_with_coverage_df.drop_duplicates(
             distinguishing_columns)[distinguishing_columns]

-        pretty_print("\tNumber of edits before filtering:\n\t{}".format(len(all_edit_info_unique_position_with_coverage_df)))
-        pretty_print("\tNumber of edits after filtering:\n\t{}".format(len(all_edit_info_filtered)))
-        pretty_print("\tNumber of edits after deduplicating:\n\t{}".format(len(all_edit_info_filtered_deduped)))

-        if max_edits_per_read:
-            pretty_print("\t\tFiltering out reads with more than {} edits...".format(max_edits_per_read))
-            read_edits = all_edit_info_filtered_deduped.groupby('read_id').count().sort_values('barcode')
-            all_edit_info_filtered_deduped = all_edit_info_filtered_deduped[all_edit_info_filtered_deduped.read_id.isin(read_edits[read_edits['barcode'] <= max_edits_per_read].index)]

+        pretty_print("\tNumber of edits after filtering:\n\t{}".format(len(all_edit_info_unique_position_with_coverage_df)))
+        pretty_print("\tNumber of edits after deduplicating:\n\t{}".format(len(all_edit_info_filtered_deduped)))
         all_edit_info_filtered_deduped.to_csv('{}/final_filtered_edit_info.tsv'.format(output_folder),
                                               sep='\t', index=False)

         all_edit_info_unique_position_with_coverage_deduped_df = pd.read_csv('{}/final_filtered_edit_info.tsv'.format(output_folder), sep='\t', dtype={"contig": str})
@@ -543,6 +505,7 @@ def run(bam_filepath, annotation_bedfile_path, output_folder, contigs=[], num_in
     current, peak = tracemalloc.get_traced_memory()

     with open('{}/manifest.txt'.format(logging_folder), 'a+') as f:
+        f.write(f'sites\t{len(final_site_level_information_df)}\n')
         f.write(f'peak_memory_mb\t{peak/1e6}\n')
         f.write(f'time_elapsed_seconds\t{time.time()-start_time:.2f}s\n')
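Note: a minimal sketch (not part of the commit) of how the new filters dict threads from run() through coverage_processing() into the workers, and how the per-shard stats returned by each worker are aggregated. The threshold values and shard filenames below are illustrative only:

    # Each worker now returns (filtered_output_filename, filter_stats),
    # so `results` is a list of 2-tuples whose stats dicts can be summed.
    import pandas as pd

    filters = {
        'dist_from_end': 5,          # stands in for min_dist_from_end
        'base_quality': 15,          # stands in for min_base_quality
        'max_edits_per_read': None,  # disabled unless set
    }

    # Hypothetical results from two shards:
    results = [
        ('coverage/chr1_0_filtered.tsv', {'original': 1000, 'filtered': 800}),
        ('coverage/chr2_0_filtered.tsv', {'original': 500, 'filtered': 450}),
    ]

    # Same aggregation the commit adds to run(): stack the per-shard
    # stats dicts into rows and sum each column across shards.
    all_filter_stats = pd.DataFrame([r[1] for r in results]).sum(axis=0)
    print(all_filter_stats)  # original 1500, filtered 1250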
11 changes: 7 additions & 4 deletions src/core.py
@@ -298,17 +298,20 @@ def find_edits_and_split_bams_wrapper(parameters):


-def run_coverage_calculator(edit_info_grouped_per_contig_combined, output_folder,
+def run_coverage_calculator(edit_info_grouped_per_contig_combined,
+                            output_folder,
                             barcode_tag='CB',
                             paired_end=False,
                             verbose=False,
-                            processes=16
+                            processes=16,
+                            filters=None,
                             ):
     coverage_counting_job_params = get_job_params_for_coverage_for_edits_in_contig(
         edit_info_grouped_per_contig_combined,
         output_folder,
         barcode_tag=barcode_tag,
         paired_end=paired_end,
+        filters=filters,
         verbose=verbose
     )
@@ -337,12 +340,12 @@ def run_coverage_calculator(edit_info_grouped_per_contig_combined, output_folder


 def get_job_params_for_coverage_for_edits_in_contig(edit_info_grouped_per_contig_combined, output_folder,
-                                                    barcode_tag='CB', paired_end=False, verbose=False):
+                                                    barcode_tag='CB', paired_end=False, filters=None, verbose=False):
     job_params = []

     for contig, edit_info in edit_info_grouped_per_contig_combined.items():

-        job_params.append([edit_info, contig, output_folder, barcode_tag, paired_end, verbose])
+        job_params.append([edit_info, contig, output_folder, barcode_tag, paired_end, filters, verbose])

     return job_params
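Note: the worker parameters are packed positionally, so the list built in get_job_params_for_coverage_for_edits_in_contig must stay in lockstep with the unpacking at the top of get_coverage_wrapper in src/utils.py. A minimal sketch of that contract (pack_job_params and worker are hypothetical names, not the commit's code):

    # Packing order mirrors the unpacking in the worker below; inserting
    # `filters` between `paired_end` and `verbose` on only one side would
    # silently shift every later argument.
    def pack_job_params(edit_info, contig, output_folder,
                        barcode_tag='CB', paired_end=False,
                        filters=None, verbose=False):
        return [edit_info, contig, output_folder, barcode_tag,
                paired_end, filters, verbose]

    def worker(parameters):
        # Same order as pack_job_params; this is the contract the commit updates.
        edit_info, contig, output_folder, barcode_tag, paired_end, filters, verbose = parameters
        return contig, filters

    params = pack_job_params({'pos': [1, 2]}, 'chr1', 'out/',
                             filters={'base_quality': 15})
    print(worker(params))  # ('chr1', {'base_quality': 15})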
99 changes: 77 additions & 22 deletions src/utils.py
@@ -391,35 +391,90 @@ def get_coverage_df(edit_info, contig, output_folder, barcode_tag='CB', paired_e

     return coverage_df


+def filter_output_df(output_df, filters, output_filename):
+    filter_stats = {}
+    filter_stats['original'] = len(output_df)
+
+    filtered_output_df = output_df[
+        (output_df.dist_from_end >= filters.get('dist_from_end')) &
+        (output_df.base_quality >= filters.get('base_quality'))]
+
+    coverage_per_unique_position_df = pd.DataFrame(filtered_output_df.groupby(
+        [
+            "position_barcode"
+        ]).coverage.max())
+
+    distinguishing_columns = [
+        "barcode",
+        "contig",
+        "position",
+        "ref",
+        "alt",
+        "read_id",
+        "strand",
+        "dist_from_end",
+        "base_quality",
+        "mapping_quality"
+    ]
+
+    all_edit_info_unique_position_df = filtered_output_df.drop_duplicates(distinguishing_columns)[distinguishing_columns]
+
+    all_edit_info_unique_position_df.index = all_edit_info_unique_position_df['position'].astype(str)\
+        + '_' + all_edit_info_unique_position_df['barcode']
+
+    all_edit_info_unique_position_with_coverage_df = all_edit_info_unique_position_df.join(coverage_per_unique_position_df)
+
+    if filters.get('max_edits_per_read'):
+        # Filter out reads carrying more than max_edits_per_read edits
+        read_edits = all_edit_info_unique_position_with_coverage_df.groupby('read_id').count().sort_values('barcode')
+        all_edit_info_unique_position_with_coverage_df = all_edit_info_unique_position_with_coverage_df[
+            all_edit_info_unique_position_with_coverage_df.read_id.isin(
+                read_edits[read_edits['barcode'] <= filters.get('max_edits_per_read')].index)]
+
+    filter_stats['filtered'] = len(all_edit_info_unique_position_with_coverage_df)
+
+    all_edit_info_unique_position_with_coverage_df.to_csv(output_filename, sep='\t', header=False)
+
+    return filter_stats
+
+
 def get_coverage_wrapper(parameters):
-    edit_info, contig, output_folder, barcode_tag, paired_end, verbose = parameters
+    edit_info, contig, output_folder, barcode_tag, paired_end, filters, verbose = parameters

     output_filename = '{}/coverage/{}.tsv'.format(output_folder, contig)
+    filtered_output_filename = '{}/coverage/{}_filtered.tsv'.format(output_folder, contig)

     if os.path.exists(output_filename):
-        return output_filename

-    edit_info = edit_info.with_columns(
-        pl.concat_str(
-            [
-                pl.col("barcode"),
-                pl.col("position")
-            ],
-            separator=":",
-        ).alias("barcode_position"))

-    edit_info_df = edit_info.to_pandas()
-    edit_info_df.index = edit_info_df['barcode_position']

-    coverage_df = get_coverage_df(edit_info, contig, output_folder, barcode_tag=barcode_tag,
-                                  paired_end=paired_end, verbose=verbose)

-    # Combine edit information with coverage information
-    edit_info_and_coverage_joined = edit_info_df.join(coverage_df, how='inner')

-    edit_info_and_coverage_joined['position_barcode'] = edit_info_and_coverage_joined['position'].astype(str) + '_' + edit_info_and_coverage_joined['barcode'].astype(str)
-    edit_info_and_coverage_joined.to_csv(output_filename, sep='\t', header=False)

-    return output_filename
+        # Coverage table for this shard already exists; load it for filtering
+        edit_info_and_coverage_joined = pd.read_csv(output_filename, sep='\t', names=[
+            'barcode', 'contig', 'position', 'ref', 'alt', 'read_id', 'strand',
+            'dist_from_end', 'base_quality', 'mapping_quality', 'barcode_position',
+            'coverage', 'source', 'position_barcode'], dtype={'base_quality': int, 'dist_from_end': int, 'contig': str})
+    else:
+        edit_info = edit_info.with_columns(
+            pl.concat_str(
+                [
+                    pl.col("barcode"),
+                    pl.col("position")
+                ],
+                separator=":",
+            ).alias("barcode_position"))
+
+        edit_info_df = edit_info.to_pandas()
+        edit_info_df.index = edit_info_df['barcode_position']
+
+        coverage_df = get_coverage_df(edit_info, contig, output_folder, barcode_tag=barcode_tag,
+                                      paired_end=paired_end, verbose=verbose)
+
+        # Combine edit information with coverage information
+        edit_info_and_coverage_joined = edit_info_df.join(coverage_df, how='inner')
+
+        edit_info_and_coverage_joined['position_barcode'] = edit_info_and_coverage_joined['position'].astype(str) + '_' + edit_info_and_coverage_joined['barcode'].astype(str)
+        edit_info_and_coverage_joined.to_csv(output_filename, sep='\t', header=False)
+
+    filter_stats = filter_output_df(edit_info_and_coverage_joined, filters, filtered_output_filename)
+    assert os.path.exists(output_filename)
+    assert os.path.exists(filtered_output_filename)
+    return filtered_output_filename, filter_stats
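Note: a small usage sketch (not part of the commit) of the new filter_output_df, on a toy edit table with the columns the function expects. The import path, file name, and threshold values are assumptions:

    import pandas as pd
    from src.utils import filter_output_df  # import path assumed

    # Two edits on separate reads; the second fails dist_from_end >= 5.
    toy = pd.DataFrame({
        'barcode': ['AAA', 'AAA'],
        'contig': ['chr1', 'chr1'],
        'position': [100, 200],
        'ref': ['A', 'A'],
        'alt': ['G', 'G'],
        'read_id': ['r1', 'r2'],
        'strand': ['+', '+'],
        'dist_from_end': [10, 2],
        'base_quality': [30, 30],
        'mapping_quality': [60, 60],
        'coverage': [12, 8],
        'position_barcode': ['100_AAA', '200_AAA'],
    })

    filters = {'dist_from_end': 5, 'base_quality': 15, 'max_edits_per_read': None}
    stats = filter_output_df(toy, filters, 'toy_filtered.tsv')
    print(stats)  # {'original': 2, 'filtered': 1}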