Showing 19 changed files with 1,156 additions and 0 deletions.
@@ -0,0 +1,40 @@
# This workflow will install Python dependencies, run tests and lint with a variety of Python versions
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python

name: Python package

on:
  push:
    branches: [ "main" ]
  pull_request:
    branches: [ "main" ]

jobs:
  build:

    runs-on: ubuntu-latest
    strategy:
      fail-fast: false
      matrix:
        python-version: ["3.9", "3.10", "3.11"]

    steps:
    - uses: actions/checkout@v3
    - name: Set up Python ${{ matrix.python-version }}
      uses: actions/setup-python@v3
      with:
        python-version: ${{ matrix.python-version }}
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        python -m pip install flake8 pytest
        if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
    - name: Lint with flake8
      run: |
        # stop the build if there are Python syntax errors or undefined names
        flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
        # exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
        flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
    - name: Test with pytest
      run: |
        pytest
Empty file.
@@ -0,0 +1,19 @@
PD_CDM_SRC = "resources/cdm_pd.xlsx"
AD_CDM_SRC = "resources/cdm_ad.csv"

PPMI_DICT_SRC = "resources/dictionaries/pd/ppmi.csv"
PPMI_EMBEDDINGS_SRC = "resources/embeddings/ppmi.csv"

LUXPARK_DICT_SRC = "resources/dictionaries/pd/luxpark.xlsx"
LUXPARK_EMBEDDINGS_SRC = "resources/embeddings/luxpark.csv"

BIOFIND_DICT_SRC = "resources/dictionaries/pd/biofind.csv"
BIOFIND_EMBEDDINGS_SRC = "resources/embeddings/biofind.csv"

COLORS_AD = {'adni': '#d62728', 'aibl': '#ff7f0e', 'emif': '#8c564b', 'jadni': '#7f7f7f',
             'a4': '#aec7e8', 'dod-adni': '#ffbb78', 'prevent-ad': '#98df8a', 'arwibo': '#ff9896',
             'i-adni': '#c5b0d5', 'edsd': '#c49c94', 'pharmacog': '#c7c7c7',
             'vita': '#bcbd22', 'abvib': '#e0d9e2', 'ad-mapper': '#800000'}

COLORS_PD = {'opdc': '#1f77b4', 'tpd': '#e377c2', 'biofind': '#9edae5', 'lrrk2': '#f7b6d2', 'luxpark': '#2ca02c',
             'ppmi': '#9467bd', 'passionate': '#00ff00'}
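The two dictionaries above pin each cohort name to a fixed hex color, presumably so cohorts keep consistent colors across plots. A minimal sketch of how such a mapping could be consumed; matplotlib and the module name `constants` are assumptions, not part of this commit:

# Sketch only: assumes matplotlib is installed; `constants` is a placeholder module name,
# since the actual file name is not visible in this diff view.
import matplotlib.pyplot as plt

from constants import COLORS_PD

accuracy_by_cohort = {"ppmi": 0.62, "luxpark": 0.55, "biofind": 0.48}  # made-up illustrative values
plt.bar(list(accuracy_by_cohort.keys()), list(accuracy_by_cohort.values()),
        color=[COLORS_PD[cohort] for cohort in accuracy_by_cohort])
plt.ylabel("accuracy")
plt.show()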
@@ -0,0 +1,46 @@
import logging
from abc import ABC
import numpy as np
import openai


class EmbeddingModel(ABC):

    def get_embedding(self, text: str) -> list[float]:
        pass

    def get_embeddings(self, messages: list[str]) -> list[list[float]]:
        pass


class GPT4Adapter(EmbeddingModel):

    def __init__(self, api_key: str):
        self.api_key = api_key
        openai.api_key = api_key
        logging.getLogger().setLevel(logging.INFO)

    def get_embedding(self, text: str, model="text-embedding-ada-002"):
        logging.info(f"Getting embedding for {text}")
        try:
            if text is None or text == "" or text is np.nan:
                logging.warning("Empty text passed to get_embedding")
                return None
            if isinstance(text, str):
                text = text.replace("\n", " ")
            return openai.Embedding.create(input=[text], model=model)['data'][0]['embedding']
        except Exception as e:
            logging.error(f"Error getting embedding for {text}: {e}")
            return None

    def get_embeddings(self, messages: list[str], model="text-embedding-ada-002"):
        # TODO: store the indices of NaN entries and handle them before calling the API
        response = openai.Embedding.create(input=messages, model=model)
        return [item['embedding'] for item in response['data']]


class TextEmbedding:

    def __init__(self, text: str, embedding: list[float]):
        self.text = text
        self.embedding = embedding
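A minimal usage sketch for the adapter above, assuming the legacy openai<1.0 SDK (which exposes openai.Embedding) and a valid API key; the import path, variable names, and example descriptions are illustrative, not part of this commit:

# Sketch only: `gpt` is a placeholder module name; the actual file name is not visible in this diff view.
from gpt import GPT4Adapter

adapter = GPT4Adapter(api_key="sk-...")  # placeholder key

# Single description -> one embedding vector (list of floats), or None on failure.
vector = adapter.get_embedding("Hoehn and Yahr stage")

# Batch of descriptions -> one embedding per input, in the same order as the inputs.
vectors = adapter.get_embeddings(["age at diagnosis", "MoCA total score"])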
@@ -0,0 +1,179 @@
from enum import Enum
from thefuzz import fuzz
from thefuzz import process
import pandas as pd
import numpy as np

from index.mapping import MappingTable


class MatchingMethod(Enum):
    EUCLIDEAN_EMBEDDING_DISTANCE = 1
    FUZZY_STRING_MATCHING = 2


def enrichment_analysis(source_table: MappingTable, target_table: MappingTable, max_cumulative_match_rank: int = 10,
                        matching_method=MatchingMethod.EUCLIDEAN_EMBEDDING_DISTANCE) -> np.ndarray:
    """
    Calculate the cumulative accuracy of the n closest matches between two mapping tables
    :param source_table: the table containing the source descriptions which should be matched
    :param target_table: the table containing the target descriptions to which the source descriptions should be matched
    :param matching_method: How the matching should be performed - either based on vector embeddings or fuzzy string
    matching
    :param max_cumulative_match_rank: the number n of closest matches that should be taken into consideration
    :return: an array where index n holds the accuracy achieved within the n+1 closest matches
    """
    # index n corresponds to a correct match within the n+1 closest variables
    correct_matches = np.zeros(max_cumulative_match_rank)
    # not every variable can be matched
    max_matches = 0
    # clean up source and target table (missing embeddings, descriptions etc.)
    source_table.joined_mapping_table.drop_duplicates(subset=['variable'], keep='first', inplace=True)
    source_table.joined_mapping_table.dropna(subset=["description"], inplace=True)
    target_table.joined_mapping_table.dropna(subset=["description"], inplace=True)
    if matching_method == MatchingMethod.EUCLIDEAN_EMBEDDING_DISTANCE:
        source_table.joined_mapping_table.dropna(subset=["embedding"], inplace=True)
        target_table.joined_mapping_table.dropna(subset=["embedding"], inplace=True)
    # re-index to account for dropped rows
    target_table.joined_mapping_table = target_table.joined_mapping_table.reset_index(drop=True)
    for idx, source_table_row in source_table.joined_mapping_table.iterrows():
        correct_target_index = target_table.joined_mapping_table[
            target_table.joined_mapping_table["identifier"] == source_table_row["identifier"]].index
        if len(correct_target_index) == 0:
            # cannot be matched -> skip
            continue
        # match is possible
        max_matches += 1
        # compute distances to all possible matches
        distances = []
        for idy, target_table_row in target_table.joined_mapping_table.iterrows():
            if matching_method == MatchingMethod.EUCLIDEAN_EMBEDDING_DISTANCE:
                source_table_embedding = source_table_row["embedding"]
                target_table_embedding = target_table_row["embedding"]
                distances.append(np.linalg.norm(np.array(source_table_embedding) - np.array(target_table_embedding)))
            elif matching_method == MatchingMethod.FUZZY_STRING_MATCHING:
                source_table_description = source_table_row["description"]
                target_table_description = target_table_row["description"]
                distances.append(100 - fuzz.ratio(source_table_description, target_table_description))
            else:
                raise NotImplementedError("Specified matching method is not implemented!")
        min_distance_indices = np.argsort(np.array(distances))[:max_cumulative_match_rank]
        for n in range(max_cumulative_match_rank):
            # (due to upper level concepts) there may be more than one correct mapping
            if any(element in min_distance_indices[:n + 1] for element in correct_target_index):
                correct_matches[n] += 1
    return (correct_matches / max_matches).round(2)
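A hedged usage sketch for enrichment_analysis: it assumes two MappingTable instances whose joined_mapping_table already carries 'identifier', 'variable', 'description' and, for the embedding method, 'embedding' columns; how those tables are constructed is not shown in this hunk, and the variable names below are illustrative.

# Illustrative only: ppmi_table and cdm_table stand in for MappingTable objects built elsewhere.
accuracies = enrichment_analysis(ppmi_table, cdm_table, max_cumulative_match_rank=10,
                                 matching_method=MatchingMethod.FUZZY_STRING_MATCHING)
# accuracies[0]: fraction of source variables matched correctly by the single closest description;
# accuracies[9]: fraction matched correctly within the 10 closest descriptions.
print(accuracies)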
def match_closest_descriptions(source_table: MappingTable, target_table: MappingTable,
                               matching_method=MatchingMethod.EUCLIDEAN_EMBEDDING_DISTANCE) -> pd.DataFrame:
    """
    Match descriptions from the source table to the target table based on the highest similarity
    :param source_table: the table containing the source descriptions which should be matched
    :param target_table: the table containing the target descriptions to which the source descriptions should be matched
    :param matching_method: How the matching should be performed - either based on vector embeddings or fuzzy string
    matching
    :return: a dataframe containing the matches
    """
    # sometimes the same concept gets mapped against multiple concepts in the CDM, resulting in artifacts in the
    # results -> drop duplicates, only keep the first
    source_table.joined_mapping_table.drop_duplicates(subset=['variable'], keep='first', inplace=True)
    # remove rows from source and target that do not contain a description (in general) or an embedding (for gpt)
    source_table.joined_mapping_table.dropna(subset=["description"], inplace=True)
    target_table.joined_mapping_table.dropna(subset=["description"], inplace=True)
    if matching_method == MatchingMethod.EUCLIDEAN_EMBEDDING_DISTANCE:
        source_table.joined_mapping_table.dropna(subset=["embedding"], inplace=True)
        target_table.joined_mapping_table.dropna(subset=["embedding"], inplace=True)
    # re-index to account for dropped rows
    target_table.joined_mapping_table = target_table.joined_mapping_table.reset_index(drop=True)
    # METHOD: Euclidean distance based on embeddings
    if matching_method == MatchingMethod.EUCLIDEAN_EMBEDDING_DISTANCE:
        if "embedding" not in source_table.joined_mapping_table.columns \
                or "embedding" not in target_table.joined_mapping_table.columns:
            raise ValueError("Mapping tables must contain an 'embedding' column")
        source_embeddings = source_table.get_embeddings_numpy()
        target_embeddings = target_table.get_embeddings_numpy()
        distance_matrix = np.linalg.norm(source_embeddings[:, np.newaxis] - target_embeddings, axis=-1)
        closest_indices = np.argmin(distance_matrix, axis=1)
        distances = np.min(distance_matrix, axis=1)
        matched_target_descriptions = target_table.joined_mapping_table.loc[closest_indices, 'description'].tolist()
    # METHOD: fuzzy string matching based on the Levenshtein distance
    elif matching_method == MatchingMethod.FUZZY_STRING_MATCHING:
        if "description" not in source_table.joined_mapping_table.columns \
                or "description" not in target_table.joined_mapping_table.columns:
            raise ValueError("Mapping tables must contain a 'description' column")
        source_descriptions = source_table.joined_mapping_table["description"].to_numpy()
        target_descriptions = target_table.joined_mapping_table["description"].to_numpy()
        target_descriptions_dict = {idx: el for idx, el in enumerate(target_descriptions)}
        closest_indices = []
        distances = []
        matched_target_descriptions = []
        for source_description in source_descriptions:
            matched_target_description, score, target_idx = process.extractOne(source_description,
                                                                               target_descriptions_dict)
            closest_indices.append(target_idx)
            matched_target_descriptions.append(matched_target_description)
            # extractOne returns a similarity score in [0, 100], not a distance -> take the inverse
            # (+1 to avoid division by zero)
            distances.append(1 / (101 - score))
    # NOT IMPLEMENTED -> raise error
    else:
        raise ValueError("Specified matching method is not implemented!")
    source_concept_label = source_table.joined_mapping_table["identifier"]
    target_concept_label = target_table.joined_mapping_table.loc[closest_indices, 'identifier'].tolist()
    source_variable = source_table.joined_mapping_table["variable"]
    target_variable = target_table.joined_mapping_table.loc[closest_indices, 'variable'].tolist()
    correct = source_concept_label == target_concept_label
    ground_truth_target_descriptions = get_ground_truth_target_descriptions(source_table.joined_mapping_table,
                                                                            target_table.joined_mapping_table)
    source_descriptions = source_table.joined_mapping_table["description"]
    result = pd.DataFrame({"correct": correct,
                           "source_variable": source_variable,
                           "target_variable": target_variable,
                           "source_concept_label": source_concept_label,
                           "target_concept_label": target_concept_label,
                           "source_description": source_descriptions,
                           "matched_target_description": matched_target_descriptions,
                           "ground_truth_target_description": ground_truth_target_descriptions,
                           "distance": distances})
    return result
def get_ground_truth_target_descriptions(source_table: pd.DataFrame, target_table: pd.DataFrame) -> np.ndarray:
    """
    Get the ground truth target descriptions based on the matched identifiers
    :param source_table: the source table containing the identifiers
    :param target_table: the target table containing the identifiers and descriptions
    :return: an ordered numpy array containing the ground truth target descriptions
    """
    # TODO: This is a very slow implementation, but it works for now
    descriptions = []
    for source_id in source_table["identifier"]:
        try:
            target_description = target_table.loc[target_table["identifier"] == source_id, "description"].iloc[0]
            descriptions.append(target_description)
        except IndexError:
            descriptions.append(None)
    return np.array(descriptions)
def score_mappings(matches: pd.DataFrame) -> float:
    """
    Evaluate the matches based on the accuracy
    :param matches: the matches to be evaluated
    :return: the accuracy
    """
    # ignore matches where there is no possible match for the source description
    matches = matches[matches["ground_truth_target_description"].notnull()]
    # TODO: investigate this
    matches = matches[matches["target_concept_label"].notnull()]
    accuracy = matches["correct"].sum() / len(matches)
    return accuracy
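A hedged end-to-end sketch combining match_closest_descriptions and score_mappings, under the same assumption that the MappingTable inputs are prepared elsewhere in the repository:

# Illustrative only: ppmi_table and cdm_table are assumed MappingTable instances.
matches = match_closest_descriptions(ppmi_table, cdm_table,
                                     matching_method=MatchingMethod.EUCLIDEAN_EMBEDDING_DISTANCE)
# One row per source variable: the matched target variable and description, the ground-truth
# description (if any), and the distance of the chosen match.
print(matches[["source_variable", "target_variable", "correct", "distance"]].head())
# Accuracy over all source variables that have a possible target match.
print(score_mappings(matches))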