diff --git a/.dockerignore b/.dockerignore index 42e292c2..3bd00184 100644 --- a/.dockerignore +++ b/.dockerignore @@ -3,4 +3,6 @@ ./docker/ .dockerignore .git -./venv \ No newline at end of file +./venv +./dist +./cpdb-venv/ diff --git a/cellphonedb/cellphonedb_cli.py b/cellphonedb/cellphonedb_cli.py index b5e52b39..afc5b2f9 100755 --- a/cellphonedb/cellphonedb_cli.py +++ b/cellphonedb/cellphonedb_cli.py @@ -3,11 +3,9 @@ import click -import cellphonedb.src.api_endpoints.terminal_api.database_terminal_api_endpoints.database_terminal_commands -from cellphonedb.src.api_endpoints.terminal_api import database_terminal_api_endpoints +from cellphonedb.src.api_endpoints.terminal_api.database_terminal_api_endpoints import database_terminal_commands from cellphonedb.src.api_endpoints.terminal_api.method_terminal_api_endpoints import method_terminal_commands from cellphonedb.src.api_endpoints.terminal_api.plot_terminal_api_endpoints import plot_terminal_commands - from cellphonedb.src.api_endpoints.terminal_api.query_terminal_api_endpoints import query_terminal_commands from cellphonedb.src.api_endpoints.terminal_api.tools_terminal_api_endpoints import tools_terminal_commands @@ -42,10 +40,10 @@ def plot(): query.add_command(query_terminal_commands.find_interactions_by_element) query.add_command(query_terminal_commands.get_interaction_gene) -database.add_command(database_terminal_api_endpoints.database_terminal_commands.download) -database.add_command(database_terminal_api_endpoints.database_terminal_commands.list_remote) -database.add_command(database_terminal_api_endpoints.database_terminal_commands.list_local) -database.add_command(database_terminal_api_endpoints.database_terminal_commands.generate) +database.add_command(database_terminal_commands.download) +database.add_command(database_terminal_commands.list_remote) +database.add_command(database_terminal_commands.list_local) +database.add_command(database_terminal_commands.generate) plot.add_command(plot_terminal_commands.dot_plot) plot.add_command(plot_terminal_commands.heatmap_plot) @@ -58,7 +56,8 @@ def tools(): pass - database.add_command(database_terminal_api_endpoints.database_terminal_commands.collect) + query.add_command(query_terminal_commands.autocomplete) + database.add_command(database_terminal_commands.collect) tools.add_command(tools_terminal_commands.generate_genes) tools.add_command(tools_terminal_commands.generate_proteins) tools.add_command(tools_terminal_commands.generate_complex) diff --git a/cellphonedb/src/api_endpoints/terminal_api/method_terminal_api_endpoints/method_terminal_commands.py b/cellphonedb/src/api_endpoints/terminal_api/method_terminal_api_endpoints/method_terminal_commands.py index 2926f549..f9fb1508 100644 --- a/cellphonedb/src/api_endpoints/terminal_api/method_terminal_api_endpoints/method_terminal_commands.py +++ b/cellphonedb/src/api_endpoints/terminal_api/method_terminal_api_endpoints/method_terminal_commands.py @@ -5,13 +5,13 @@ import click from click import Context, Argument +from cellphonedb.src.api_endpoints.terminal_api.util.choose_database import choose_database from cellphonedb.src.app import cpdb_app from cellphonedb.src.app.app_logger import app_logger from cellphonedb.src.core.exceptions.AllCountsFilteredException import AllCountsFilteredException from cellphonedb.src.core.exceptions.EmptyResultException import EmptyResultException from cellphonedb.src.core.exceptions.ThresholdValueException import ThresholdValueException from cellphonedb.src.core.utils.subsampler import Subsampler -from cellphonedb.src.database.manager import DatabaseVersionManager from cellphonedb.src.exceptions.ParseCountsException import ParseCountsException from cellphonedb.src.exceptions.ParseMetaException import ParseMetaException from cellphonedb.src.exceptions.ReadFileException import ReadFileException @@ -59,10 +59,6 @@ def subsampling_options(f: Callable) -> Callable: return f -def choose_database(ctx: Context, argument: Argument, value: str) -> Optional[str]: - return DatabaseVersionManager.find_database_for(value) - - def common_options(f: Callable) -> Callable: options = [ click.argument('meta-filename'), diff --git a/cellphonedb/src/api_endpoints/terminal_api/query_terminal_api_endpoints/query_terminal_commands.py b/cellphonedb/src/api_endpoints/terminal_api/query_terminal_api_endpoints/query_terminal_commands.py index f30a130d..760e61d8 100644 --- a/cellphonedb/src/api_endpoints/terminal_api/query_terminal_api_endpoints/query_terminal_commands.py +++ b/cellphonedb/src/api_endpoints/terminal_api/query_terminal_api_endpoints/query_terminal_commands.py @@ -1,22 +1,40 @@ -from click._unicodefun import click +from typing import Callable +import click + +from cellphonedb.src.api_endpoints.terminal_api.util.choose_database import choose_database +from cellphonedb.src.app import cpdb_app from cellphonedb.src.local_launchers.local_query_launcher import LocalQueryLauncher -from cellphonedb.src.app.cellphonedb_app import cellphonedb_app + + +def common_options(f: Callable) -> Callable: + options = [ + click.option('--verbose/--quiet', default=True, help='Print or hide cellphonedb logs [verbose]'), + click.option('--database', default='latest', callback=choose_database), + ] + + for option in reversed(options): + f = option(f) + + return f @click.command() @click.argument('element') -def find_interactions_by_element(element: str): - LocalQueryLauncher(cellphonedb_app).find_interactions_by_element(element) +@common_options +def find_interactions_by_element(element: str, verbose: bool, database: str): + LocalQueryLauncher(cpdb_app.create_app(verbose, database)).find_interactions_by_element(element) @click.command() @click.option('--columns', default=None, help='Columns to set in the result') -def get_interaction_gene(columns: str): - LocalQueryLauncher(cellphonedb_app).get_interaction_gene(columns) +@common_options +def get_interaction_gene(columns: str, verbose: bool, database: str): + LocalQueryLauncher(cpdb_app.create_app(verbose, database)).get_interaction_gene(columns) @click.command() @click.argument('partial_element') -def autocomplete(partial_element: str) -> None: - LocalQueryLauncher(cellphonedb_app).autocomplete_element(partial_element) +@common_options +def autocomplete(partial_element: str, verbose: bool, database: str) -> None: + LocalQueryLauncher(cpdb_app.create_app(verbose, database)).autocomplete_element(partial_element) diff --git a/cellphonedb/src/api_endpoints/terminal_api/util/__init__.py b/cellphonedb/src/api_endpoints/terminal_api/util/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/cellphonedb/src/api_endpoints/terminal_api/util/choose_database.py b/cellphonedb/src/api_endpoints/terminal_api/util/choose_database.py new file mode 100644 index 00000000..98caa299 --- /dev/null +++ b/cellphonedb/src/api_endpoints/terminal_api/util/choose_database.py @@ -0,0 +1,9 @@ +from typing import Optional + +from click import Context, Argument + +from cellphonedb.src.database.manager import DatabaseVersionManager + + +def choose_database(ctx: Context, argument: Argument, value: str) -> Optional[str]: + return DatabaseVersionManager.find_database_for(value) \ No newline at end of file diff --git a/cellphonedb/src/core/queries/autocomplete_queries.py b/cellphonedb/src/core/queries/autocomplete_queries.py index 6e2857ee..3665a59b 100644 --- a/cellphonedb/src/core/queries/autocomplete_queries.py +++ b/cellphonedb/src/core/queries/autocomplete_queries.py @@ -1,21 +1,29 @@ -import pandas as pd import re +import pandas as pd + def autocomplete_query(genes: pd.DataFrame, multidatas: pd.DataFrame, partial_element: pd.DataFrame) -> pd.DataFrame: - values = genes[genes['ensembl'].str.contains(partial_element, flags=re.IGNORECASE)]['ensembl'] - values = values.append( - genes[genes['protein_name'].str.contains(partial_element, flags=re.IGNORECASE)]['protein_name'], - ignore_index=True) - values = values.append( - genes[genes['gene_name'].str.contains(partial_element, flags=re.IGNORECASE)]['gene_name'], - ignore_index=True) - values = values.append( - genes[genes['hgnc_symbol'].str.contains(partial_element, flags=re.IGNORECASE)]['hgnc_symbol'], - ignore_index=True) - values = values.append( - multidatas[multidatas['name'].str.contains(partial_element, flags=re.IGNORECASE)]['name'], - ignore_index=True) - result = pd.DataFrame(data=values, columns=['value']) + values = _partial_filter(genes, 'ensembl', partial_element) + + by_protein_name = _partial_filter(genes, 'protein_name', partial_element) + by_gene_name = _partial_filter(genes, 'gene_name', partial_element) + + with_hgnc_symbol = genes.dropna(subset=['hgnc_symbol']) + by_hgnc_symbol = _partial_filter(with_hgnc_symbol, 'hgnc_symbol', partial_element) + + by_name = _partial_filter(multidatas, 'name', partial_element) + + values = values.append(by_protein_name, ignore_index=True) + values = values.append(by_gene_name, ignore_index=True) + values = values.append(by_hgnc_symbol, ignore_index=True) + values = values.append(by_name, ignore_index=True) + + result = pd.DataFrame(data=values, columns=['value']).drop_duplicates() return result + + +def _partial_filter(input_data, name, partial_element): + matching = input_data[input_data[name].str.contains(partial_element, flags=re.IGNORECASE)][name] + return matching diff --git a/cellphonedb/src/core/tests/queries/__init__.py b/cellphonedb/src/core/tests/queries/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/cellphonedb/src/core/tests/queries/test_autocomplete_queries.py b/cellphonedb/src/core/tests/queries/test_autocomplete_queries.py new file mode 100644 index 00000000..1487bf3e --- /dev/null +++ b/cellphonedb/src/core/tests/queries/test_autocomplete_queries.py @@ -0,0 +1,56 @@ +import random +from unittest import TestCase + +from cellphonedb.src.app.app_config import AppConfig +from cellphonedb.src.core.CellphonedbSqlalchemy import CellphonedbSqlalchemy + + +class TestAutocompleteQueries(TestCase): + + def setUp(self) -> None: + self.cellphone = CellphonedbSqlalchemy(AppConfig().get_cellphone_core_config()) + gene_repository = self.cellphone.database_manager.get_repository('gene') + self.all_genes = gene_repository.get_all_expanded().to_dict(orient='records') + + def test_find_elements_by_gene_name(self): + self._test_find_elements_by_('gene_name') + + def test_find_elements_by_protein_name(self): + self._test_find_elements_by_('protein_name') + + def test_find_elements_by_hgnc_symbol(self): + self._test_find_elements_by_('hgnc_symbol') + + def test_find_elements_by_ensembl(self): + self._test_find_elements_by_('ensembl') + + def test_find_elements_by_name(self): + self._test_find_elements_by_('name') + + def _test_find_elements_by_(self, field): + random_gene = random.choice(self.all_genes) + whole_input = random_gene[field] + + whole_query_result = self.cellphone.query.autocomplete_launcher(whole_input) + + whole_results = whole_query_result['value'].tolist() + self.assertIn(whole_input, whole_results) + + partial_input = self._random_substring(whole_input) + + partial_query_result = self.cellphone.query.autocomplete_launcher(partial_input) + + partial_results = partial_query_result['value'].tolist() + self.assertIn(whole_input, partial_results) + + self.assertGreaterEqual(len(partial_results), len(whole_results)) + + def _random_substring(self, whole_input): + start_index = self._random_position_to_half(whole_input) + end_index = self._random_position_to_half(whole_input) + + return whole_input[start_index:-end_index if end_index else None] + + @staticmethod + def _random_position_to_half(string): + return random.randint(0, int((len(string)) / 2)) diff --git a/cellphonedb/src/local_launchers/local_query_launcher.py b/cellphonedb/src/local_launchers/local_query_launcher.py index e3c02b94..d50e9081 100644 --- a/cellphonedb/src/local_launchers/local_query_launcher.py +++ b/cellphonedb/src/local_launchers/local_query_launcher.py @@ -14,13 +14,13 @@ def __init__(self, cellphonedb_app): self.cellphonedb_app = cellphonedb_app def find_interactions_by_element(self, element: str) -> None: - print(self.cellphonedb_app.cellphonedb.query.find_interactions_by_element(element).to_csv(index=False)) + print(self.cellphonedb_app.query.find_interactions_by_element(element).to_csv(index=False)) def get_interaction_gene(self, columns: str) -> None: if columns: columns = columns.split(',') - print(self.cellphonedb_app.cellphonedb.query.get_all_genes(columns).to_csv(index=False)) + print(self.cellphonedb_app.query.get_all_genes(columns).to_csv(index=False)) def autocomplete_element(self, partial_element: str) -> None: - print(self.cellphonedb_app.cellphonedb.query.autocomplete_launcher(partial_element).to_csv(index=False)) + print(self.cellphonedb_app.query.autocomplete_launcher(partial_element).to_csv(index=False)) diff --git a/cellphonedb/src/plotters/R/plot_dot_by_column_name.R b/cellphonedb/src/plotters/R/plot_dot_by_column_name.R index 9ec628ab..1a634842 100644 --- a/cellphonedb/src/plotters/R/plot_dot_by_column_name.R +++ b/cellphonedb/src/plotters/R/plot_dot_by_column_name.R @@ -14,23 +14,22 @@ dot_plot = function(selected_rows = NULL, all_pval = read.table(pvalues_path, header=T, stringsAsFactors = F, sep=means_separator, comment.char = '', check.names=F) all_means = read.table(means_path, header=T, stringsAsFactors = F, sep=pvalues_separator, comment.char = '', check.names=F) - rownames(all_pval) = all_pval$interacting_pair - rownames(all_means) = all_means$interacting_pair + intr_pairs = all_pval$interacting_pair all_pval = all_pval[,-c(1:11)] all_means = all_means[,-c(1:11)] if(is.null(selected_rows)){ - selected_rows = rownames(all_pval) + selected_rows = intr_pairs } if(is.null(selected_columns)){ selected_columns = colnames(all_pval) } - sel_pval = all_pval[selected_rows, selected_columns] - sel_means = all_means[selected_rows, selected_columns] + sel_pval = all_pval[match(selected_rows, intr_pairs), selected_columns] + sel_means = all_means[match(selected_rows, intr_pairs), selected_columns] - df_names = expand.grid(rownames(sel_pval), colnames(sel_pval)) + df_names = expand.grid(selected_rows, selected_columns) pval = unlist(sel_pval) pval[pval==0] = 0.0009 plot.data = cbind(df_names,pval) diff --git a/cellphonedb/tools/generate_data/getters/get_imex.py b/cellphonedb/tools/generate_data/getters/get_imex.py index 44517f80..bb5f180f 100644 --- a/cellphonedb/tools/generate_data/getters/get_imex.py +++ b/cellphonedb/tools/generate_data/getters/get_imex.py @@ -155,7 +155,7 @@ def _get_chunked_api_results(carry, columns_to_save, proteins, source): def _get_single_api_results(carry, columns_to_save, source): url = source['base_url'] - print('Fetching {}'.format(source['name'])) + tqdm.tqdm.write('Fetching {}'.format(source['name'])) try: response = requests.get(url) if response.text: diff --git a/rabbit_logger.py b/rabbit_logger.py new file mode 100644 index 00000000..60f52f1a --- /dev/null +++ b/rabbit_logger.py @@ -0,0 +1,24 @@ +import logging +import sys + + +class RabbitLogger: + def __init__(self): + self._logger = logging.getLogger(__name__) + formatter = logging.Formatter('[ ][QUEUE][%(asctime)s][%(levelname)s] %(message)s', "%d/%m/%y-%H:%M:%S") + handler = logging.StreamHandler(sys.stdout) + handler.setFormatter(formatter) + self._logger.addHandler(handler) + self._logger.setLevel(logging.INFO) + + def __getattr__(self, item): + return getattr(self._logger, item) + + +class RabbitAdapter(logging.LoggerAdapter): + def process(self, msg, kwargs): + return '[{}] {}'.format(self.extra['job_id'], msg), kwargs + + @classmethod + def logger_for(cls, logger, job_id): + return cls(logger, {'job_id': job_id}) diff --git a/run_cellphonedb_rabbitmq.py b/run_cellphonedb_rabbitmq.py index 25f00ac3..951664c4 100755 --- a/run_cellphonedb_rabbitmq.py +++ b/run_cellphonedb_rabbitmq.py @@ -7,13 +7,16 @@ import tempfile import time import traceback +from distutils.util import strtobool +from functools import wraps +from logging import INFO +from typing import Callable import boto3 import pandas as pd import pika from cellphonedb.src.app import cpdb_app -from cellphonedb.src.app.app_logger import app_logger from cellphonedb.src.core.exceptions.AllCountsFilteredException import AllCountsFilteredException from cellphonedb.src.core.exceptions.EmptyResultException import EmptyResultException from cellphonedb.src.core.exceptions.ThresholdValueException import ThresholdValueException @@ -25,6 +28,9 @@ from cellphonedb.src.exceptions.ReadFileException import ReadFileException from cellphonedb.src.plotters.r_plotter import dot_plot, heatmaps_plot from cellphonedb.utils import utils +from rabbit_logger import RabbitAdapter, RabbitLogger + +rabbit_logger = RabbitLogger() try: s3_access_key = os.environ['S3_ACCESS_KEY'] @@ -41,9 +47,31 @@ except KeyError as e: - app_logger.error('ENVIRONMENT VARIABLE {} not defined. Please set it'.format(e)) + rabbit_logger.error('ENVIRONMENT VARIABLE {} not defined. Please set it'.format(e)) exit(1) +verbose = bool(strtobool(os.getenv('VERBOSE', 'true'))) + +if verbose: + rabbit_logger.setLevel(INFO) + + +def logger_for_job(job_id): + return RabbitAdapter.logger_for(rabbit_logger, job_id) + + +def _track_success(f) -> Callable: + @wraps(f) + def wrapper(*args, **kwargs): + logger = kwargs.get('logger', rabbit_logger) + + logger.info('calling {} method'.format(f.__name__)) + result = f(*args, **kwargs) + logger.info('successfully called {} method'.format(f.__name__)) + return result + + return wrapper + def create_rabbit_connection(): return pika.BlockingConnection(pika.ConnectionParameters( @@ -92,6 +120,7 @@ def write_image_to_s3(path: str, filename: str): s3_client.put_object(Body=_io, Bucket=s3_bucket_name, Key=filename) +@_track_success def dot_plot_results(means: str, pvalues: str, rows: str, columns: str, job_id: str): with tempfile.TemporaryDirectory() as output_path: with tempfile.NamedTemporaryFile(suffix=os.path.splitext(means)[-1]) as means_file: @@ -130,6 +159,7 @@ def dot_plot_results(means: str, pvalues: str, rows: str, columns: str, job_id: return response +@_track_success def heatmaps_plot_results(meta: str, pvalues: str, pvalue: float, job_id: str): with tempfile.TemporaryDirectory() as output_path: with tempfile.NamedTemporaryFile(suffix=os.path.splitext(pvalues)[-1]) as pvalues_file: @@ -176,10 +206,11 @@ def _from_s3_to_temp(key, file): return file -def process_plot(method, properties, body) -> dict: +@_track_success +def process_plot(method, properties, body, logger) -> dict: metadata = json.loads(body.decode('utf-8')) job_id = metadata['job_id'] - app_logger.info('New Plot Queued: {}'.format(job_id)) + logger.info('New Plot Queued') plot_type = metadata.get('type', None) @@ -208,10 +239,11 @@ def process_plot(method, properties, body) -> dict: } -def process_method(method, properties, body) -> dict: +@_track_success +def process_method(method, properties, body, logger) -> dict: metadata = json.loads(body.decode('utf-8')) job_id = metadata['job_id'] - app_logger.info('New Job Queued: {}'.format(job_id)) + logger.info('New Job Queued') meta = read_data_from_s3(metadata['file_meta'], s3_bucket_name, index_column_first=False) counts = read_data_from_s3(metadata['file_counts'], s3_bucket_name, index_column_first=True) @@ -225,7 +257,7 @@ def process_method(method, properties, body) -> dict: if database_version not in list_local_versions() + ['latest']: database_version = 'latest' - app = cpdb_app.create_app(verbose=False, database_file=find_database_for(database_version)) + app = cpdb_app.create_app(verbose=verbose, database_file=find_database_for(database_version)) if metadata['iterations']: response = statistical_analysis(app, meta, counts, job_id, metadata, subsampler) @@ -235,6 +267,7 @@ def process_method(method, properties, body) -> dict: return response +@_track_success def statistical_analysis(app, meta, counts, job_id, metadata, subsampler): pvalues, means, significant_means, deconvoluted = \ app.method.cpdb_statistical_analysis_launcher(meta, @@ -265,6 +298,7 @@ def statistical_analysis(app, meta, counts, job_id, metadata, subsampler): return response +@_track_success def non_statistical_analysis(app, meta, counts, job_id, metadata, subsampler): means, significant_means, deconvoluted = \ app.method.cpdb_method_analysis_launcher(meta, @@ -302,11 +336,13 @@ def non_statistical_analysis(app, meta, counts, job_id, metadata, subsampler): job = channel.basic_get(queue=jobs_queue_name, no_ack=True) if all(job): + job_id = json.loads(job[2].decode('utf-8'))['job_id'] + job_logger = logger_for_job(job_id) try: if queue_type == 'plot': - job_response = process_plot(*job) + job_response = process_plot(*job, logger=job_logger) elif queue_type == 'method': - job_response = process_method(*job) + job_response = process_method(*job, logger=job_logger) else: raise Exception('Unknown queue type') @@ -316,11 +352,11 @@ def non_statistical_analysis(app, meta, counts, job_id, metadata, subsampler): channel.basic_qos(prefetch_count=1) channel.basic_publish(exchange='', routing_key=result_queue_name, body=json.dumps(job_response)) - app_logger.info('JOB %s PROCESSED' % job_response['job_id']) + job_logger.info('JOB PROCESSED') except (ReadFileException, ParseMetaException, ParseCountsException, ThresholdValueException, AllCountsFilteredException, EmptyResultException, PlotException) as e: error_response = { - 'job_id': json.loads(job[2].decode('utf-8'))['job_id'], + 'job_id': job_id, 'success': False, 'error': { 'id': str(e), @@ -330,16 +366,16 @@ def non_statistical_analysis(app, meta, counts, job_id, metadata, subsampler): } } print(traceback.print_exc(file=sys.stdout)) - app_logger.error('[-] ERROR DURING PROCESSING JOB %s' % error_response['job_id']) + job_logger.error('[-] ERROR DURING PROCESSING JOB') if connection.is_closed: connection = create_rabbit_connection() channel = connection.channel() channel.basic_qos(prefetch_count=1) channel.basic_publish(exchange='', routing_key=result_queue_name, body=json.dumps(error_response)) - app_logger.error(e) + job_logger.error(e) except Exception as e: error_response = { - 'job_id': json.loads(job[2].decode('utf-8'))['job_id'], + 'job_id': job_id, 'success': False, 'error': { 'id': 'unknown_error', @@ -347,17 +383,17 @@ def non_statistical_analysis(app, meta, counts, job_id, metadata, subsampler): } } print(traceback.print_exc(file=sys.stdout)) - app_logger.error('[-] ERROR DURING PROCESSING JOB %s' % error_response['job_id']) + job_logger.error('[-] ERROR DURING PROCESSING JOB') if connection.is_closed: connection = create_rabbit_connection() channel = connection.channel() channel.basic_qos(prefetch_count=1) channel.basic_publish(exchange='', routing_key=result_queue_name, body=json.dumps(error_response)) - app_logger.error(e) + job_logger.error(e) jobs_runned += 1 else: - app_logger.debug('Empty queue') + rabbit_logger.debug('Empty queue') time.sleep(1) diff --git a/setup.py b/setup.py index 6ed56597..16140fa6 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ name='CellPhoneDB', author='TeichLab', author_email='contact@cellphonedb.org', - version='2.0.4', + version='2.0.5', long_description=__doc__, packages=find_packages(), include_package_data=True,