diff --git a/bbconf/bbagent.py b/bbconf/bbagent.py
index c41f350..1ea6d26 100644
--- a/bbconf/bbagent.py
+++ b/bbconf/bbagent.py
@@ -23,18 +23,20 @@ class BedBaseAgent(object):
     def __init__(
         self,
         config: Union[Path, str],
+        init_ml: bool = True,
     ):
        """
         Initialize connection to the pep_db database. You can use the basic
         connection parameters or libpq connection string.
+
+        :param config: path to the configuration file
+        :param init_ml: initialize ML models for search (default: True)
         """
-        _LOGGER.info(f"Initializing BedBaseConfig object")
-        self.config = BedBaseConfig(config)
-        _LOGGER.info(f"Initializing BedBaseAgent object")
+
+        self.config = BedBaseConfig(config, init_ml)
+
         self._bed = BedAgentBedFile(self.config, self)
-        _LOGGER.info(f"Initializing BedAgentBedSet object")
         self._bedset = BedAgentBedSet(self.config)
-        _LOGGER.info(f"Initializing BBObjects object")
         self._objects = BBObjects(self.config)
 
     @property
diff --git a/bbconf/config_parser/bedbaseconfig.py b/bbconf/config_parser/bedbaseconfig.py
index c1ec715..204af1a 100644
--- a/bbconf/config_parser/bedbaseconfig.py
+++ b/bbconf/config_parser/bedbaseconfig.py
@@ -46,28 +46,34 @@ class BedBaseConfig(object):
     Class to handle BEDbase configuration file and create objects for
     different modules.
     """
 
-    def __init__(self, config: Union[Path, str], init_search_interfaces: bool = True):
-        _LOGGER.info(f"Loading configuration file: {config}")
+    def __init__(self, config: Union[Path, str], init_ml: bool = True):
+        """
+        Initialize BedBaseConfig object
+
+        :param config: path to the configuration file
+        :param init_ml: initialize machine learning models used for search
+        """
+
         self.cfg_path = get_bedbase_cfg(config)
         self._config = self._read_config_file(self.cfg_path)
-
-        _LOGGER.info(f"Initializing database engine...")
         self._db_engine = self._init_db_engine()
-        _LOGGER.info(f"Initializing qdrant engine...")
-        self._qdrant_engine = self._init_qdrant_backend()
-        _LOGGER.info(f"Initializing qdrant text engine...")
+
+        self._qdrant_engine = self._init_qdrant_backend()
         self._qdrant_text_engine = self._init_qdrant_text_backend()
 
-        if init_search_interfaces:
-            _LOGGER.info(f"Initializing search interfaces...")
+        if init_ml:
             self._b2bsi = self._init_b2bsi_object()
-            _LOGGER.info(f"Initializing R2V object...")
             self._r2v = self._init_r2v_object()
-            _LOGGER.info(f"Initializing Bivec object...")
             self._bivec = self._init_bivec_object()
+        else:
+            _LOGGER.info(
+                "Skipping initialization of ML models, init_ml parameter set to False."
+            )
+
+            self._b2bsi = None
+            self._r2v = None
+            self._bivec = None
 
-        _LOGGER.info(f"Initializing PEPHub client...")
         self._phc = self._init_pephubclient()
         self._boto3_client = self._init_boto3_client()
 
@@ -197,6 +203,11 @@ def zarr_root(self) -> Union[Z_GROUP, None]:
         return zarr.group(store=cache, overwrite=False)
 
     def _init_db_engine(self) -> BaseEngine:
+        """
+        Create database engine object using credentials provided in config file
+        """
+
+        _LOGGER.info("Initializing database engine...")
         return BaseEngine(
             host=self._config.database.host,
             port=self._config.database.port,
@@ -212,6 +223,8 @@ def _init_qdrant_backend(self) -> QdrantBackend:
 
         :return: QdrantClient
         """
+
+        _LOGGER.info("Initializing qdrant engine...")
         try:
             return QdrantBackend(
                 collection=self._config.qdrant.file_collection,
@@ -225,12 +238,14 @@ def _init_qdrant_backend(self) -> QdrantBackend:
                 f"error in Connection to qdrant! skipping... \nError: {err}", UserWarning
             )
 
-    def _init_qdrant_text_backend(self) -> QdrantBackend:
+    def _init_qdrant_text_backend(self) -> Union[QdrantBackend, None]:
         """
         Create qdrant client text embedding object using credentials provided in config file
 
         :return: QdrantClient
         """
+
+        _LOGGER.info("Initializing qdrant text engine...")
         try:
             return QdrantBackend(
                 dim=TEXT_EMBEDDING_DIMENSION,
@@ -239,7 +254,11 @@ def _init_qdrant_text_backend(self) -> QdrantBackend:
                 qdrant_api_key=self.config.qdrant.api_key,
             )
         except Exception as e:
-            _LOGGER.error(f"Error while connecting to qdrant text engine: {e}")
+            _LOGGER.error(f"Error in connection to qdrant text! Skipping... Error: {e}")
+            warnings.warn(
+                "Error in connection to qdrant text! Skipping...", UserWarning
+            )
+            return None
 
     def _init_bivec_object(self) -> Union[BiVectorSearchInterface, None]:
         """
@@ -266,6 +285,7 @@ def _init_b2bsi_object(self) -> Union[BED2BEDSearchInterface, None]:
         :return: Bed2BEDSearchInterface object
         """
         try:
+            _LOGGER.info("Initializing search interfaces...")
             return BED2BEDSearchInterface(
                 backend=self.qdrant_engine,
                 query2vec=BED2Vec(model=self._config.path.region2vec),
@@ -286,6 +306,7 @@ def _init_pephubclient() -> Union[PEPHubClient, None]:
         :return: PephubClient
         """
         try:
+            _LOGGER.info("Initializing PEPHub client...")
             return PEPHubClient()
         except Exception as e:
             _LOGGER.error(f"Error in creating PephubClient object: {e}")
@@ -317,6 +338,7 @@ def _init_r2v_object(self) -> Union[Region2VecExModel, None]:
         Create Region2VecExModel object using credentials provided in config file
         """
         try:
+            _LOGGER.info("Initializing R2V object...")
             return Region2VecExModel(self.config.path.region2vec)
         except Exception as e:
             _LOGGER.error(f"Error in creating Region2VecExModel object: {e}")
diff --git a/bbconf/db_utils.py b/bbconf/db_utils.py
index d929ec8..aff568d 100644
--- a/bbconf/db_utils.py
+++ b/bbconf/db_utils.py
@@ -130,6 +130,9 @@ class Bed(Base):
     ref_classifier: Mapped["GenomeRefStats"] = relationship(
         "GenomeRefStats", back_populates="bed", cascade="all, delete-orphan"
     )
+    processed: Mapped[bool] = mapped_column(
+        default=False, comment="Whether the bed file was processed"
+    )
 
 
 class BedMetadata(Base):
@@ -255,6 +258,11 @@ class Files(Base):
     bedfile: Mapped["Bed"] = relationship("Bed", back_populates="files")
     bedset: Mapped["BedSets"] = relationship("BedSets", back_populates="files")
 
+    __table_args__ = (
+        UniqueConstraint("name", "bedfile_id"),
+        UniqueConstraint("name", "bedset_id"),
+    )
+
 
 class BedFileBedSetRelation(Base):
     __tablename__ = "bedfile_bedset_relation"
@@ -303,6 +311,10 @@ class BedSets(Base):
     author: Mapped[str] = mapped_column(nullable=True, comment="Author of the bedset")
     source: Mapped[str] = mapped_column(nullable=True, comment="Source of the bedset")
 
+    processed: Mapped[bool] = mapped_column(
+        default=False, comment="Whether the bedset was processed"
+    )
+
 
 class Universes(Base):
     __tablename__ = "universes"
@@ -339,7 +351,7 @@ class TokenizedBed(Base):
         nullable=False,
     )
     universe_id: Mapped[str] = mapped_column(
-        ForeignKey("universes.id", ondelete="CASCADE", passive_deletes=True),
+        ForeignKey("universes.id", ondelete="CASCADE"),
         primary_key=True,
         index=True,
         nullable=False,
@@ -352,7 +364,7 @@ class TokenizedBed(Base):
     universe: Mapped["Universes"] = relationship(
         "Universes",
         back_populates="tokenized",
-        passive_deletes=True,
+        passive_deletes="all",
     )
diff --git a/bbconf/helpers.py b/bbconf/helpers.py
index 47fe67c..4cb3792 100644
--- a/bbconf/helpers.py
+++ b/bbconf/helpers.py
@@ -22,6 +22,8 @@ def get_bedbase_cfg(cfg: str = None) -> str:
         Optional, the $BEDBASE config env var will be used if not provided
     :return str: absolute configuration file path
     """
+
+    _LOGGER.info(f"Loading configuration file: {cfg}")
     selected_cfg = select_config(config_filepath=cfg, config_env_vars=CFG_ENV_VARS)
     if not selected_cfg:
         raise BedBaseConnectionError(
diff --git a/bbconf/models/bed_models.py b/bbconf/models/bed_models.py
index 4ba7118..745a5ca 100644
--- a/bbconf/models/bed_models.py
+++ b/bbconf/models/bed_models.py
@@ -75,7 +75,7 @@ class BedStatsModel(BaseModel):
 
 
 class BedPEPHub(BaseModel):
-    sample_name: str
+    sample_name: str = ""
     genome: str = ""
     organism: str = ""
     species_id: str = ""
@@ -233,3 +233,9 @@ class RefGenValidModel(BaseModel):
     tier_ranking: int
 
     model_config = ConfigDict(extra="forbid")
+
+
+class RefGenValidReturnModel(BaseModel):
+    id: str
+    provided_genome: Union[str, None] = None
+    compared_genome: List[RefGenValidModel]
diff --git a/bbconf/modules/bedfiles.py b/bbconf/modules/bedfiles.py
index 3716178..c04a547 100644
--- a/bbconf/modules/bedfiles.py
+++ b/bbconf/modules/bedfiles.py
@@ -1,3 +1,4 @@
+import datetime
 import os
 from logging import getLogger
 from typing import Dict, List, Union
@@ -10,10 +11,12 @@
 from pydantic import BaseModel
 from qdrant_client.models import Distance, PointIdsList, VectorParams
 from sqlalchemy import and_, delete, func, or_, select
+from sqlalchemy.exc import IntegrityError
 from sqlalchemy.orm import Session, aliased
 from tqdm import tqdm
 
 from bbconf.config_parser.bedbaseconfig import BedBaseConfig
+from geniml.search.backends import QdrantBackend
 from bbconf.const import DEFAULT_LICENSE, PKG_NAME, ZARR_TOKENIZED_FOLDER
 from bbconf.db_utils import (
     Bed,
@@ -49,6 +52,7 @@
     FileModel,
     QdrantSearchResult,
     RefGenValidModel,
+    RefGenValidReturnModel,
     StandardMeta,
     TokenizedBedResponse,
     TokenizedPathResponse,
@@ -438,6 +442,45 @@ def get_ids_list(
             results=result_list,
         )
 
+    def get_reference_validation(self, identifier: str) -> RefGenValidReturnModel:
+        """
+        Get results of reference genome validation for the bed file.
+
+        :param identifier: bed file identifier
+        :return: reference genome validation results
+        """
+
+        if not self.exists(identifier):
+            raise BEDFileNotFoundError(f"Bed file with id: {identifier} not found.")
+
+        with Session(self._sa_engine) as session:
+            statement = select(GenomeRefStats).where(
+                GenomeRefStats.bed_id == identifier
+            )
+
+            results = session.scalars(statement)
+
+            result_list = []
+
+            for result in results:
+                result_list.append(
+                    RefGenValidModel(
+                        provided_genome=result.provided_genome,
+                        compared_genome=result.compared_genome,
+                        xs=result.xs,
+                        oobr=result.oobr,
+                        sequence_fit=result.sequence_fit,
+                        assigned_points=result.assigned_points,
+                        tier_ranking=result.tier_ranking,
+                    )
+                )
+
+        return RefGenValidReturnModel(
+            id=identifier,
+            provided_genome=result_list[0].provided_genome if result_list else None,
+            compared_genome=result_list,
+        )
+
     def add(
         self,
         identifier: str,
@@ -454,6 +497,7 @@
         local_path: str = None,
         overwrite: bool = False,
         nofail: bool = False,
+        processed: bool = True,
     ) -> None:
         """
         Add bed file to the database.
@@ -473,6 +517,7 @@
         :param local_path: local path to the output files
         :param overwrite: overwrite bed file if it already exists
         :param nofail: do not raise an error for error in pephub/s3/qdrant or record exsist and not overwrite
+        :param processed: true if the bed file was processed and statistics and plots were calculated
        :return: None
         """
         _LOGGER.info(f"Adding bed file to database. \nbed_id: {identifier}")
@@ -514,6 +559,7 @@
                 _LOGGER.warning(
                     f"Could not upload to pephub. Error: {e}. nofail: {nofail}"
                 )
+                upload_pephub = False
                 if not nofail:
                     raise e
         else:
@@ -554,6 +600,7 @@
                 license_id=license_id,
                 indexed=upload_qdrant,
                 pephub=upload_pephub,
+                processed=processed,
             )
             session.add(new_bed)
             if upload_s3:
@@ -608,18 +655,21 @@
 
     def update(
         self,
         identifier: str,
-        stats: dict,
-        metadata: dict = None,
-        plots: dict = None,
-        files: dict = None,
-        classification: dict = None,
-        add_to_qdrant: bool = False,
-        upload_pephub: bool = False,
-        upload_s3: bool = False,
+        stats: Union[dict, None] = None,
+        metadata: Union[dict, None] = None,
+        plots: Union[dict, None] = None,
+        files: Union[dict, None] = None,
+        classification: Union[dict, None] = None,
+        ref_validation: Union[Dict[str, BaseModel], None] = None,
+        license_id: str = DEFAULT_LICENSE,
+        upload_qdrant: bool = True,
+        upload_pephub: bool = True,
+        upload_s3: bool = True,
         local_path: str = None,
         overwrite: bool = False,
         nofail: bool = False,
-    ):
+        processed: bool = True,
+    ) -> None:
         """
         Update bed file to the database.
@@ -631,23 +681,34 @@
         :param plots: bed file plots
         :param files: bed file files
         :param classification: bed file classification
-        :param add_to_qdrant: add bed file to qdrant indexs
+        :param ref_validation: reference validation data. RefGenValidModel
+        :param license_id: bed file license id (default: 'DUO:0000042').
+        :param upload_qdrant: add bed file to qdrant indexes
         :param upload_pephub: add bed file to pephub
         :param upload_s3: upload files to s3
         :param local_path: local path to the output files
         :param overwrite: overwrite bed file if it already exists
         :param nofail: do not raise an error for error in pephub/s3/qdrant or record exsist and not overwrite
+        :param processed: true if the bed file was processed and statistics and plots were calculated
         :return: None
         """
         if not self.exists(identifier):
             raise BEDFileNotFoundError(
                 f"Bed file with id: {identifier} not found. Cannot update."
             )
+        _LOGGER.info(f"Updating bed file: '{identifier}'")
 
-        stats = BedStatsModel(**stats)
-        plots = BedPlots(**plots)
-        files = BedFiles(**files)
-        classification = BedClassification(**classification)
+        if not license_id or license_id not in self.bb_agent.list_of_licenses:
+            raise BedBaseConfError(
+                f"License: {license_id} is not in the list of licenses. Please provide a valid license. "
+                f"List of licenses: {self.bb_agent.list_of_licenses}"
+            )
+
+        stats = BedStatsModel(**(stats if stats else {}))
+        plots = BedPlots(**(plots if plots else {}))
+        files = BedFiles(**(files if files else {}))
+        bed_metadata = StandardMeta(**(metadata if metadata else {}))
+        classification = BedClassification(**(classification if classification else {}))
 
         if upload_pephub:
             metadata = BedPEPHub(**metadata)
@@ -662,56 +723,261 @@
         else:
             _LOGGER.info("upload_pephub set to false. \nSkipping pephub..")
 
-        if add_to_qdrant:
+        if upload_qdrant:
             self.upload_file_qdrant(
                 identifier, files.bed_file.path, payload=metadata.model_dump()
             )
 
-        statement = select(Bed).where(and_(Bed.id == identifier))
-
-        if upload_s3:
-            _LOGGER.warning("S3 upload is not implemented yet")
-            # if files:
-            #     files = self._config.upload_files_s3(
-            #         identifier, files=files, base_path=local_path, type="files"
-            #     )
-            #
-            # if plots:
-            #     plots = self._config.upload_files_s3(
-            #         identifier, files=plots, base_path=local_path, type="plots"
-            #     )
-
         with Session(self._sa_engine) as session:
-            bed_object = session.scalar(statement)
+            bed_statement = select(Bed).where(and_(Bed.id == identifier))
+            bed_object = session.scalar(bed_statement)
 
-            setattr(bed_object, **stats.model_dump())
-            setattr(bed_object, **classification.model_dump())
+            self._update_classification(
+                sa_session=session, bed_object=bed_object, classification=classification
+            )
 
-            bed_object.indexed = add_to_qdrant
-            bed_object.pephub = upload_pephub
+            self._update_metadata(
+                sa_session=session,
+                bed_object=bed_object,
+                bed_metadata=bed_metadata,
+            )
+            self._update_stats(sa_session=session, bed_object=bed_object, stats=stats)
 
             if upload_s3:
-                _LOGGER.warning("S3 upload is not implemented yet")
-                # for k, v in files:
-                #     if v:
-                #         new_file = Files(
-                #             **v.model_dump(exclude_none=True, exclude_unset=True),
-                #             bedfile_id=identifier,
-                #             type="file",
-                #         )
-                #         session.add(new_file)
-                # for k, v in plots:
-                #     if v:
-                #         new_plot = Files(
-                #             **v.model_dump(exclude_none=True, exclude_unset=True),
-                #             bedfile_id=identifier,
-                #             type="plot",
-                #         )
-                #         session.add(new_plot)
+                self._update_plots(
+                    sa_session=session,
+                    bed_object=bed_object,
+                    plots=plots,
+                    local_path=local_path,
+                )
+                self._update_files(
+                    sa_session=session,
+                    bed_object=bed_object,
+                    files=files,
+                    local_path=local_path,
+                )
+
+            self._update_ref_validation(
+                sa_session=session, bed_object=bed_object, ref_validation=ref_validation
+            )
+
+            bed_object.processed = processed
+            bed_object.indexed = upload_qdrant
+            bed_object.last_update_date = datetime.datetime.now(datetime.timezone.utc)
 
             session.commit()
 
-        raise NotImplementedError
+        return None
+
+    @staticmethod
+    def _update_classification(
+        sa_session: Session, bed_object: Bed, classification: BedClassification
+    ) -> None:
+        """
+        Update bed file classification
+
+        :param sa_session: sqlalchemy session
+        :param bed_object: bed sqlalchemy object
+        :param classification: bed file classification as BedClassification object
+
+        :return: None
+        """
+        classification_dict = classification.model_dump(
+            exclude_defaults=True, exclude_none=True, exclude_unset=True
+        )
+        for k, v in classification_dict.items():
+            setattr(bed_object, k, v)
+
+        sa_session.commit()
+
+    @staticmethod
+    def _update_stats(
+        sa_session: Session, bed_object: Bed, stats: BedStatsModel
+    ) -> None:
+        """
+        Update bed file statistics
+
+        :param sa_session: sqlalchemy session
+        :param bed_object: bed sqlalchemy object
+        :param stats: bed file statistics as BedStatsModel object
+        :return: None
+        """
+
+        stats_dict = stats.model_dump(
+            exclude_defaults=True, exclude_none=True, exclude_unset=True
+        )
+        if not bed_object.stats:
+            new_bedstat = BedStats(**stats.model_dump(), id=bed_object.id)
+            sa_session.add(new_bedstat)
+        else:
+            for k, v in stats_dict.items():
+                setattr(bed_object.stats, k, v)
+
+        sa_session.commit()
+
+    @staticmethod
+    def _update_metadata(
+        sa_session: Session, bed_object: Bed, bed_metadata: StandardMeta
+    ) -> None:
+        """
+        Update bed file metadata
+
+        :param sa_session: sqlalchemy session
+        :param bed_object: bed sqlalchemy object
+        :param bed_metadata: bed file metadata as StandardMeta object
+
+        :return: None
+        """
+
+        metadata_dict = bed_metadata.model_dump(
+            exclude_defaults=True, exclude_none=True, exclude_unset=True
+        )
+        if not bed_object.annotations:
+            new_metadata = BedMetadata(
+                **bed_metadata.model_dump(exclude={"description"}), id=bed_object.id
+            )
+            sa_session.add(new_metadata)
+        else:
+            for k, v in metadata_dict.items():
+                setattr(bed_object.annotations, k, v)
+
+        sa_session.commit()
+
+    def _update_plots(
+        self,
+        sa_session: Session,
+        bed_object: Bed,
+        plots: BedPlots,
+        local_path: str = None,
+    ) -> None:
+        """
+        Update bed file plots
+
+        :param sa_session: sqlalchemy session
+        :param bed_object: bed sqlalchemy object
+        :param plots: bed file plots
+        :param local_path: local path to the output files
+        """
+
+        _LOGGER.info("Updating bed file plots..")
+        if plots:
+            plots = self._config.upload_files_s3(
+                bed_object.id, files=plots, base_path=local_path, type="plots"
+            )
+        plots_dict = plots.model_dump(
+            exclude_defaults=True, exclude_none=True, exclude_unset=True
+        )
+        if not plots_dict:
+            return None
+
+        for k, v in plots:
+            if v:
+                new_plot = Files(
+                    **v.model_dump(
+                        exclude_none=True,
+                        exclude_unset=True,
+                        exclude={"object_id", "access_methods"},
+                    ),
+                    bedfile_id=bed_object.id,
+                    type="plot",
+                )
+                try:
+                    sa_session.add(new_plot)
+                    sa_session.commit()
+                except IntegrityError:
+                    sa_session.rollback()
+                    _LOGGER.debug(
+                        f"Plot with name: {v.name} already exists. Skipping.."
+                    )
+
+        return None
+
+    def _update_files(
+        self,
+        sa_session: Session,
+        bed_object: Bed,
+        files: BedFiles,
+        local_path: str = None,
+    ) -> None:
+        """
+        Update bed files
+
+        :param sa_session: sqlalchemy session
+        :param bed_object: bed sqlalchemy object
+        :param files: bed file files
+        :param local_path: local path to the output files
+        """
+
+        _LOGGER.info("Updating bed files..")
+        if files:
+            files = self._config.upload_files_s3(
+                bed_object.id, files=files, base_path=local_path, type="files"
+            )
+
+        files_dict = files.model_dump(
+            exclude_defaults=True, exclude_none=True, exclude_unset=True
+        )
+        if not files_dict:
+            return None
+
+        for k, v in files:
+            if v:
+                new_file = Files(
+                    **v.model_dump(
+                        exclude_none=True,
+                        exclude_unset=True,
+                        exclude={"object_id", "access_methods"},
+                    ),
+                    bedfile_id=bed_object.id,
+                    type="file",
+                )
+
+                try:
+                    sa_session.add(new_file)
+                    sa_session.commit()
+                except IntegrityError:
+                    sa_session.rollback()
+                    _LOGGER.debug(
+                        f"File with name: {v.name} already exists. Skipping.."
+                    )
+
+    @staticmethod
+    def _update_ref_validation(
+        sa_session: Session, bed_object: Bed, ref_validation: Dict[str, BaseModel]
+    ) -> None:
+        """
+        Update reference validation data
+
+        ! This function won't update existing reference validation data; if a record already exists, it is skipped.
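+        Each compared genome is written in its own transaction, so one IntegrityError
+        rollback does not discard the remaining validation records.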
+
+        :param sa_session: sqlalchemy session
+        :param bed_object: bed sqlalchemy object
+        :param ref_validation: reference validation data for the bed file
+        """
+
+        if not ref_validation:
+            return None
+
+        _LOGGER.info("Updating reference validation data..")
+
+        for ref_gen_check, data in ref_validation.items():
+            new_gen_ref = GenomeRefStats(
+                **RefGenValidModel(
+                    **data.model_dump(),
+                    provided_genome=bed_object.genome_alias,
+                    compared_genome=ref_gen_check,
+                ).model_dump(),
+                bed_id=bed_object.id,
+            )
+            try:
+                sa_session.add(new_gen_ref)
+                sa_session.commit()
+            except IntegrityError:
+                sa_session.rollback()
+                _LOGGER.info(
+                    f"Reference validation exists for BED id: {bed_object.id} and genome: {ref_gen_check}."
+                )
+
+        return None
 
     def delete(self, identifier: str) -> None:
         """
@@ -754,17 +1020,22 @@ def upload_pephub(self, identifier: str, metadata: dict, overwrite: bool = False
             overwrite=overwrite,
         )
 
-    def update_pephub(self, identifier: str, metadata: dict, overwrite: bool = False):
-        if not metadata:
-            _LOGGER.warning("No metadata provided. Skipping pephub upload..")
-            return False
-        self._config.phc.sample.update(
-            namespace=self._config.config.phc.namespace,
-            name=self._config.config.phc.name,
-            tag=self._config.config.phc.tag,
-            sample_name=identifier,
-            sample_dict=metadata,
-        )
+    def update_pephub(
+        self, identifier: str, metadata: dict, overwrite: bool = False
+    ) -> None:
+        try:
+            if not metadata:
+                _LOGGER.warning("No metadata provided. Skipping pephub upload..")
+                return None
+            self._config.phc.sample.update(
+                namespace=self._config.config.phc.namespace,
+                name=self._config.config.phc.name,
+                tag=self._config.config.phc.tag,
+                sample_name=identifier,
+                sample_dict=metadata,
+            )
+        except ResponseError as e:
+            _LOGGER.warning(f"Could not update pephub. Error: {e}")
 
     def delete_pephub_sample(self, identifier: str):
         """
@@ -800,6 +1071,10 @@ def upload_file_qdrant(
         """
         _LOGGER.debug(f"Adding bed file to qdrant. bed_id: {bed_id}")
+
+        if not isinstance(self._qdrant_engine, QdrantBackend):
+            raise QdrantInstanceNotInitializedError("Could not upload file.")
+
         bed_embedding = self._embed_file(bed_file)
 
         self._qdrant_engine.load(
@@ -826,7 +1101,11 @@ def _embed_file(self, bed_file: Union[str, RegionSet]) -> np.ndarray:
         )
 
         if isinstance(bed_file, str):
-            bed_region_set = GRegionSet(bed_file)
+            # Fall back to the Python RegionSet if the file is corrupted; it can handle cases GRegionSet cannot.
+            try:
+                bed_region_set = GRegionSet(bed_file)
+            except RuntimeError:
+                bed_region_set = RegionSet(bed_file)
         elif isinstance(bed_file, RegionSet) or isinstance(bed_file, GRegionSet):
             bed_region_set = bed_file
         else:
@@ -1336,3 +1615,54 @@ def get_missing_plots(
         results = [result for result in results]
 
         return results
+
+    def get_unprocessed(self, limit: int = 1000, offset: int = 0) -> BedListResult:
+        """
+        Get bed files that are not processed.
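+        Selects records where the new Bed.processed column is False and returns
+        them as BedMetadataBasic entries wrapped in a BedListResult.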
+
+        :param limit: number of results to return
+        :param offset: offset to start from
+
+        :return: BedListResult with bed files that are not yet processed
+        """
+        with Session(self._sa_engine) as session:
+            query = (
+                select(Bed).where(Bed.processed.is_(False)).limit(limit).offset(offset)
+            )
+            count_query = select(func.count()).where(Bed.processed.is_(False))
+
+            count = session.execute(count_query).one()[0]
+
+            bed_results = session.scalars(query)
+
+            results = []
+            for bed_object in bed_results:
+                results.append(
+                    BedMetadataBasic(
+                        id=bed_object.id,
+                        name=bed_object.name,
+                        genome_alias=bed_object.genome_alias,
+                        genome_digest=bed_object.genome_digest,
+                        bed_type=bed_object.bed_type,
+                        bed_format=bed_object.bed_format,
+                        description=bed_object.description,
+                        annotation=StandardMeta(
+                            **(
+                                bed_object.annotations.__dict__
+                                if bed_object.annotations
+                                else {}
+                            )
+                        ),
+                        last_update_date=bed_object.last_update_date,
+                        submission_date=bed_object.submission_date,
+                        is_universe=bed_object.is_universe,
+                        license_id=bed_object.license_id,
+                    )
+                )
+
+        return BedListResult(
+            count=count,
+            limit=limit,
+            offset=offset,
+            results=results,
+        )
diff --git a/bbconf/modules/bedsets.py b/bbconf/modules/bedsets.py
index 3c6ceb8..7d99105 100644
--- a/bbconf/modules/bedsets.py
+++ b/bbconf/modules/bedsets.py
@@ -3,18 +3,18 @@
 
 from geniml.io.utils import compute_md5sum_bedset
 from sqlalchemy import Float, Numeric, func, or_, select
-from sqlalchemy.orm import Session, relationship
 from sqlalchemy.exc import IntegrityError
+from sqlalchemy.orm import Session, relationship
 
 from bbconf.config_parser import BedBaseConfig
 from bbconf.const import PKG_NAME
 from bbconf.db_utils import Bed, BedFileBedSetRelation, BedSets, BedStats, Files
 from bbconf.exceptions import (
     BedBaseConfError,
+    BEDFileNotFoundError,
     BedSetExistsError,
     BedSetNotFoundError,
     BedSetTrackHubLimitError,
-    BEDFileNotFoundError,
 )
 from bbconf.models.bed_models import BedStatsModel, StandardMeta
 from bbconf.models.bedset_models import (
@@ -291,6 +291,7 @@ def create(
         local_path: str = "",
         no_fail: bool = False,
         overwrite: bool = False,
+        processed: bool = True,
     ) -> None:
         """
         Create bedset in the database.
@@ -307,6 +308,7 @@
         :param local_path: local path to the output files
         :param no_fail: do not raise an error if bedset already exists
         :param overwrite: overwrite the record in the database
+        :param processed: flag to indicate that the bedset is processed. [Default: True]
         :return: None
         """
         _LOGGER.info(f"Creating bedset '{identifier}'")
@@ -347,6 +349,7 @@
             md5sum=compute_md5sum_bedset(bedid_list),
             author=annotation.get("author"),
             source=annotation.get("source"),
+            processed=processed,
         )
 
         if upload_s3:
@@ -599,6 +602,60 @@ def exists(self, identifier: str) -> bool:
             return True
         return False
 
+    def get_unprocessed(self, limit: int = 100, offset: int = 0) -> BedSetListResult:
+        """
+        Get unprocessed bedsets from the database.
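+        Selects bedsets whose new BedSets.processed column is False.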
+
+        :param limit: limit of results
+        :param offset: offset of results
+
+        :return: BedSetListResult with unprocessed bedsets
+        """
+
+        with Session(self._db_engine.engine) as session:
+
+            statement = (
+                select(BedSets)
+                .where(BedSets.processed.is_(False))
+                .limit(limit)
+                .offset(offset)
+            )
+            count_statement = select(func.count()).where(BedSets.processed.is_(False))
+
+            count = session.execute(count_statement).one()[0]
+
+            bedset_object_list = session.scalars(statement)
+
+            results = []
+
+            for bedset_obj in bedset_object_list:
+                list_of_bedfiles = [
+                    bedfile_relation.bedfile_id for bedfile_relation in bedset_obj.bedfiles
+                ]
+
+                results.append(
+                    BedSetMetadata(
+                        id=bedset_obj.id,
+                        name=bedset_obj.name,
+                        description=bedset_obj.description,
+                        md5sum=bedset_obj.md5sum,
+                        statistics=None,
+                        plots=None,
+                        bed_ids=list_of_bedfiles,
+                        submission_date=bedset_obj.submission_date,
+                        last_update_date=bedset_obj.last_update_date,
+                        author=bedset_obj.author,
+                        source=bedset_obj.source,
+                    )
+                )
+
+        return BedSetListResult(
+            count=count,
+            limit=limit,
+            offset=offset,
+            results=results,
+        )
+
     def add_bedfile(self, identifier: str, bedfile: str) -> None:
         raise NotImplementedError
diff --git a/tests/conftest.py b/tests/conftest.py
index 773aaa6..2f269aa 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -16,19 +16,6 @@
 -p 5432:5432 postgres
 """
 
-# try:
-#     subprocess.check_output(
-#         "docker inspect bedbase-test --format '{{.State.Status}}'", shell=True
-#     )
-#     SERVICE_UNAVAILABLE = False
-# except:
-#     register(
-#         print, f"Some tests require a test database. To initiate it, run:\n{DB_CMD}"
-#     )
-#     SERVICE_UNAVAILABLE = True
-SERVICE_UNAVAILABLE = False
-
-
 TESTS_DIR = os.path.dirname(os.path.abspath(__file__))
 
 CONFIG_PATH = os.path.join(
@@ -40,6 +27,14 @@
     "data",
 )
 
+# try:
+#     BedBaseAgent(config=CONFIG_PATH)
+#     SERVICE_UNAVAILABLE = False
+# except Exception as _:
+#     SERVICE_UNAVAILABLE = True
+SERVICE_UNAVAILABLE = False
+
+
 if not SERVICE_UNAVAILABLE:
     agent = BedBaseAgent(config=CONFIG_PATH)
diff --git a/tests/test_bedfile.py b/tests/test_bedfile.py
index 07489c7..9875b78 100644
--- a/tests/test_bedfile.py
+++ b/tests/test_bedfile.py
@@ -204,13 +204,44 @@ def test_bed_delete_not_found(self, bbagent_obj):
         with pytest.raises(BEDFileNotFoundError):
             bbagent_obj.bed.delete("not_found")
 
-    @pytest.mark.skip("Skipped, not fully implemented")
-    def test_bed_update(self):
-        # agent = BedBaseAgent(config=config)
-        # ff = agent.bed.update("91b2754c8ff01769bacfc80e6923c46e", {"number_of_regions": 44})
-        # print(ff)
-        # assert ff != None
-        pass
+    def test_bed_update(self, bbagent_obj):
+
+        # TODO: has to be expanded
+        with ContextManagerDBTesting(config=bbagent_obj.config, add_data=True):
+
+            bed_file = bbagent_obj.bed.get(BED_TEST_ID, full=True)
+            # assert bed_file.annotation.model_dump(exclude_defaults=True) == {}
+            assert bed_file.annotation.cell_line == ""
+
+            new_metadata = {
+                "cell_line": "K562",
+                "tissue": "blood",
+            }
+            bbagent_obj.bed.update(
+                identifier=BED_TEST_ID,
+                metadata=new_metadata,
+                upload_qdrant=False,
+                upload_s3=False,
+            )
+
+            new_bed_file = bbagent_obj.bed.get(BED_TEST_ID, full=True)
+
+            assert new_bed_file.annotation.cell_line == "K562"
+
+    def test_get_unprocessed(self, bbagent_obj):
+        with ContextManagerDBTesting(config=bbagent_obj.config, add_data=True):
+            return_result = bbagent_obj.bed.get_unprocessed(limit=100, offset=0)
+
+            assert return_result.count == 1
+            assert return_result.results[0].id == BED_TEST_ID
+
+    def test_get_missing_plots(self, bbagent_obj):
+        with ContextManagerDBTesting(config=bbagent_obj.config, add_data=True):
+            return_result = bbagent_obj.bed.get_missing_plots(
+                "tss_distance", limit=100, offset=0
+            )
+
+            assert return_result[0] == BED_TEST_ID
 
     @pytest.mark.skip("Skipped, because ML models and qdrant needed")
diff --git a/tests/test_bedset.py b/tests/test_bedset.py
index e5b03da..ecc8948 100644
--- a/tests/test_bedset.py
+++ b/tests/test_bedset.py
@@ -204,3 +204,10 @@ def test_delete_s3_error(self, bbagent_obj):
         ):
             with pytest.raises(BedbaseS3ConnectionError):
                 bbagent_obj.bedset.delete(BEDSET_TEST_ID)
+
+    def test_retrieve_unprocessed(self, bbagent_obj):
+        with ContextManagerDBTesting(
+            config=bbagent_obj.config, add_data=True, bedset=True
+        ):
+            result = bbagent_obj.bedset.get_unprocessed()
+            assert result.count == 1
diff --git a/tests/utils.py b/tests/utils.py
index 7ceb072..e1be271 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -39,6 +39,7 @@ def get_example_dict() -> dict:
         "genome_alias": "hg38",
         "genome_digest": "2230c535660fb4774114bfa966a62f823fdb6d21acf138d4",
         "name": "random_name",
+        "processed": False,
     }
     return value
 
@@ -109,6 +110,7 @@ def __enter__(self):
         self._add_bedset_data()
 
     def __exit__(self, exc_type, exc_value, exc_traceback):
+        # If we want to keep the data and schema, comment out the following line
         self.db_engine.delete_schema()
         pass
 
@@ -135,6 +137,7 @@ def _add_bedset_data(self):
             bedset_means=stats,
             bedset_standard_deviation=stats,
             md5sum="bbad0000000000000000000000000000",
+            processed=False,
        )
         new_bed_bedset = BedFileBedSetRelation(
             bedfile_id=BED_TEST_ID,
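
Usage sketch (illustrative only, not part of the patch): the snippet below exercises the new init_ml flag, the processed flag, and the get_unprocessed() helpers introduced above. It assumes the package-level BedBaseAgent export used in tests/conftest.py, a hypothetical config path, and a reachable database; all remote uploads are switched off so nothing is pushed to qdrant, PEPHub, or S3.

from bbconf import BedBaseAgent

# init_ml=False skips loading the ML search models (b2bsi, r2v, bivec),
# which suits ingest-only workers that never serve search queries.
agent = BedBaseAgent(config="bedbase_config.yaml", init_ml=False)  # hypothetical path

# Page through records whose new `processed` column is False.
unprocessed_beds = agent.bed.get_unprocessed(limit=100, offset=0)
print(unprocessed_beds.count, [bed.id for bed in unprocessed_beds.results])
print(agent.bedset.get_unprocessed(limit=100, offset=0).count)

# Once statistics and plots are computed, update() fills the per-table
# records and flips the processed flag.
for bed in unprocessed_beds.results:
    agent.bed.update(
        identifier=bed.id,
        stats={"number_of_regions": 44},  # example value only
        metadata={"cell_line": "K562"},  # example value only
        upload_qdrant=False,
        upload_pephub=False,
        upload_s3=False,
        processed=True,
    )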