Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add docdb utils #57

Merged
merged 3 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 214 additions & 0 deletions src/aind_data_access_api/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
"""Package for common methods used for interfacing with DocDB."""

from typing import Dict, Iterator, List, Optional
from urllib.parse import urlparse

from pymongo import MongoClient


def get_s3_bucket_and_prefix(s3_location: str) -> Dict[str, str]:
    """
    Split a location url such as s3://bucket/prefix into its bucket and
    prefix. The url scheme is not validated. Leading and trailing forward
    slashes are removed from the prefix.

    Parameters
    ----------
    s3_location : str
      For example, 's3://some_bucket/some_prefix'

    Returns
    -------
    Dict[str, str]
      For example, {'bucket': 'some_bucket', 'prefix': 'some_prefix'}

    """
    # Fragments are disabled so that a '#' in the url stays in the parsed
    # components instead of being split off as a fragment.
    parsed_url = urlparse(s3_location, allow_fragments=False)
    return dict(bucket=parsed_url.netloc, prefix=parsed_url.path.strip("/"))


def get_s3_location(bucket: str, prefix: str) -> str:
    """
    Build a location url of the form s3://{bucket}/{prefix}. Leading and
    trailing forward slashes are removed from the prefix first.

    Parameters
    ----------
    bucket : str
    prefix : str

    Returns
    -------
    str
      For example, 's3://some_bucket/some_prefix'

    """
    return f"s3://{bucket}/{prefix.strip('/')}"


def is_dict_corrupt(input_dict: dict) -> bool:
    """
    Checks that all the keys, including nested keys, don't contain '$' or '.'
    Nested dictionaries are searched recursively, including dictionaries
    that are stored inside lists (at any depth).

    Parameters
    ----------
    input_dict : dict

    Returns
    -------
    bool
      True if input_dict is not a dict, or if nested dictionary keys contain
      forbidden characters. False otherwise.

    """

    def _has_corrupt_keys(node) -> bool:
        """Return True if any dict reachable from node has a forbidden key."""
        if isinstance(node, dict):
            for key, value in node.items():
                if "$" in key or "." in key:
                    return True
                if _has_corrupt_keys(value):
                    return True
        elif isinstance(node, list):
            # Dicts nested inside lists carry keys too, so they must be
            # walked as well; scalars inside the list are ignored.
            return any(_has_corrupt_keys(item) for item in node)
        return False

    # Only the top-level input is required to be a dict. Nested non-dict
    # values are simply not inspected.
    if not isinstance(input_dict, dict):
        return True
    return _has_corrupt_keys(input_dict)


def does_metadata_record_exist_in_docdb(
    docdb_client: MongoClient,
    db_name: str,
    collection_name: str,
    bucket: str,
    prefix: str,
) -> bool:
    """
    Check whether DocDb already holds a record whose location matches the
    given bucket and prefix.

    Parameters
    ----------
    docdb_client : MongoClient
    db_name : str
    collection_name : str
    bucket : str
    prefix : str

    Returns
    -------
    True if there is a record in DocDb. Otherwise, False.

    """
    target_location = get_s3_location(bucket=bucket, prefix=prefix)
    collection = docdb_client[db_name][collection_name]
    # Only existence matters, so request just the _id of at most one match.
    cursor = collection.find(
        filter={"location": target_location}, projection={"_id": 1}, limit=1
    )
    return len(list(cursor)) > 0


def get_record_from_docdb(
    docdb_client: MongoClient,
    db_name: str,
    collection_name: str,
    record_id: str,
) -> Optional[dict]:
    """
    Fetch a single record from docdb by its _id.

    Parameters
    ----------
    docdb_client : MongoClient
    db_name : str
    collection_name : str
    record_id : str

    Returns
    -------
    Optional[dict]
      The record as a dict if one with the given _id exists. Otherwise,
      None.

    """
    collection = docdb_client[db_name][collection_name]
    matches = list(collection.find(filter={"_id": record_id}, limit=1))
    return matches[0] if len(matches) > 0 else None


def paginate_docdb(
    db_name: str,
    collection_name: str,
    docdb_client: MongoClient,
    page_size: int = 1000,
    filter_query: Optional[dict] = None,
    projection: Optional[dict] = None,
) -> Iterator[List[dict]]:
    """
    Paginate through records in DocDb. This is a generator: the query is
    issued, and arguments are validated, when iteration starts.

    Parameters
    ----------
    db_name : str
    collection_name : str
    docdb_client : MongoClient
    page_size : int
      Maximum number of records per yielded page. Must be at least 1.
      Default is 1000
    filter_query : Optional[dict]
      Defaults to an empty filter ({}) when None.
    projection : Optional[dict]
      Defaults to an empty projection ({}) when None.

    Returns
    -------
    Iterator[List[dict]]
      Yields non-empty lists of records. Every page holds page_size
      records except possibly the last one.

    Raises
    ------
    ValueError
      If page_size is less than 1.

    """
    # A non-positive page size could never make progress; it would yield
    # empty pages forever instead of consuming the cursor.
    if page_size < 1:
        raise ValueError(f"page_size must be at least 1, got {page_size}")
    if filter_query is None:
        filter_query = {}
    if projection is None:
        projection = {}
    db = docdb_client[db_name]
    collection = db[collection_name]
    cursor = collection.find(filter=filter_query, projection=projection)
    # None is the end-of-cursor sentinel. Compare against None explicitly so
    # that an empty document ({}) is still treated as a record rather than
    # ending the pagination early.
    obj = next(cursor, None)
    while obj is not None:
        page = []
        while len(page) < page_size and obj is not None:
            page.append(obj)
            obj = next(cursor, None)
        yield page


def build_docdb_location_to_id_map(
    db_name: str,
    collection_name: str,
    docdb_client: MongoClient,
    bucket: str,
    prefixes: List[str],
) -> Dict[str, str]:
    """
    Map s3 locations to DocDb record ids for a bucket and list of prefixes.
    The result looks like {'s3://bucket/prefix': 'abc-1234'} where the value
    is the id of the record in DocDb. Locations without a record in DocDb
    have no key in the dictionary.

    Parameters
    ----------
    db_name : str
    collection_name : str
    docdb_client : MongoClient
    bucket : str
    prefixes : List[str]

    Returns
    -------
    Dict[str, str]

    """
    locations = [get_s3_location(bucket=bucket, prefix=p) for p in prefixes]
    collection = docdb_client[db_name][collection_name]
    # One query for all locations; only the two fields needed for the map
    # are requested.
    matching_records = collection.find(
        filter={"location": {"$in": locations}},
        projection={"_id": 1, "location": 1},
    )
    location_to_id_map = {}
    for record in matching_records:
        location_to_id_map[record["location"]] = record["_id"]
    return location_to_id_map
116 changes: 116 additions & 0 deletions tests/resources/utils/example_metadata.nd.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
{
"_id": "488bbe42-832b-4c37-8572-25eb87cc50e2",
"acquisition": null,
"created": "2024-05-13T22:01:56.035469",
"data_description": null,
"describedBy": "https://raw.githubusercontent.com/AllenNeuralDynamics/aind-data-schema/main/src/aind_data_schema/core/metadata.py",
"external_links": [],
"instrument": null,
"last_modified": "2024-05-13T22:01:56.035475",
"location": "s3://aind-ephys-data-dev-u5u0i5/ecephys_642478_2023-01-17_13-56-29",
"metadata_status": "Unknown",
"name": "ecephys_642478_2023-01-17_13-56-29",
"procedures": null,
"processing": {
"data_processes": [
{
"code_url": "https://github.com/AllenNeuralDynamics/aind-data-transfer",
"end_date_time": "2023-01-20T19:13:36.434644+00:00",
"input_location": "\\\\allen\\programs\\aind\\workgroups\\ephys\\data\\ephys\\642478_2023-01-17_13-56-29",
"name": "Ephys preprocessing",
"notes": null,
"output_location": "s3://aind-ephys-data/ecephys_642478_2023-01-17_13-56-29",
"parameters": {
"aws_secret_names": {
"code_ocean_api_token_name": "codeocean-api-token",
"region": "us-west-2",
"video_encryption_password": "video_encryption_password"
},
"clip_data_job": {
"clip_kwargs": {}
},
"compress_data_job": {
"compressor": {
"compressor_name": "wavpack",
"kwargs": {
"level": 3
}
},
"format_kwargs": {},
"scale_params": {},
"write_kwargs": {
"chunk_duration": "1s",
"n_jobs": 24,
"progress_bar": true
}
},
"data": {
"name": "openephys"
},
"endpoints": {
"code_repo_location": "https://github.com/AllenNeuralDynamics/aind-data-transfer",
"codeocean_domain": "https://codeocean.allenneuraldynamics.org",
"dest_data_dir": "ecephys_642478_2023-01-17_13-56-29",
"gcp_prefix": "ecephys_642478_2023-01-17_13-56-29",
"metadata_service_url": "http://aind-metadata-service",
"raw_data_dir": "\\\\allen\\programs\\aind\\workgroups\\ephys\\data\\ephys\\642478_2023-01-17_13-56-29",
"s3_bucket": "aind-ephys-data",
"s3_prefix": "ecephys_642478_2023-01-17_13-56-29"
},
"jobs": {
"attach_metadata": true,
"clip": true,
"compress": true,
"trigger_codeocean_job": true,
"upload_to_gcp": false,
"upload_to_s3": true
},
"logging": {
"level": "INFO"
},
"trigger_codeocean_job": {
"bucket": "aind-ephys-data",
"capsule_id": "648473aa-791e-4372-bd25-205cc587ec56",
"job_type": "openephys",
"prefix": "ecephys_642478_2023-01-17_13-56-29"
},
"upload_data_job": {
"dryrun": false
}
},
"start_date_time": "2023-01-20T19:06:02.945386+00:00",
"version": "0.2.9"
}
],
"describedBy": "https://raw.githubusercontent.com/AllenNeuralDynamics/aind-data-schema/main/src/aind_data_schema/processing.py",
"pipeline_url": null,
"pipeline_version": null,
"schema_version": "0.1.0"
},
"rig": null,
"schema_version": "0.2.7",
"session": null,
"subject": {
"background_strain": null,
"breeding_group": "Chat-IRES-Cre-neo",
"date_of_birth": "2022-07-16",
"describedBy": "https://raw.githubusercontent.com/AllenNeuralDynamics/aind-data-schema/main/site-packages/aind_data_schema/subject.py",
"genotype": "Chat-IRES-Cre-neo/Chat-IRES-Cre-neo",
"home_cage_enrichment": null,
"light_cycle": null,
"maternal_genotype": "Chat-IRES-Cre-neo/Chat-IRES-Cre-neo",
"maternal_id": "624133",
"mgi_allele_ids": null,
"notes": null,
"paternal_genotype": "Chat-IRES-Cre-neo/Chat-IRES-Cre-neo",
"paternal_id": "624115",
"restrictions": null,
"rrid": null,
"schema_version": "0.2.2",
"sex": "Male",
"source": null,
"species": "Mus musculus",
"subject_id": "642478",
"wellness_reports": null
}
}
Loading
Loading