Merge pull request #51 from ai-cfia/49-dspy-pydantic-schema
Issue #49: Use the pydantic model type in the dspy Signature
k-allagbe authored Oct 25, 2024
2 parents 227a297 + 88180c5 commit 5f757f7
Showing 9 changed files with 150 additions and 101 deletions.
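
The substance of the change is that the dspy Signature now declares the pydantic FertilizerInspection model as its output type, so the pipeline receives a validated object instead of a raw JSON string it has to parse. A minimal sketch of that pattern, using a hypothetical stand-in model rather than the repository's actual FertilizerInspection fields:

import dspy
from pydantic import BaseModel

class Inspection(BaseModel):
    # Illustrative stand-in for FertilizerInspection; these field names are assumptions.
    fertiliser_name: str | None = None
    registration_number: str | None = None

class ExtractInspection(dspy.Signature):
    """Classify OCR text from a fertilizer label into the structured model."""
    text: str = dspy.InputField(desc="OCR text of the label.")
    inspection: Inspection = dspy.OutputField(desc="Structured inspection results.")

# Assumes an LM is already configured, e.g. inside dspy.context(lm=...) as pipeline/gpt.py does.
predictor = dspy.TypedChainOfThought(ExtractInspection)
prediction = predictor(text="GreenGrow Fertilizer 20-20-20 ...")
# prediction.inspection is an Inspection instance, not a JSON string.

Because dspy validates the typed output field against the pydantic schema, pipeline/__init__.py below can drop its manual json.loads and FertilizerInspection(**raw_json) step.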
2 changes: 2 additions & 0 deletions .gitignore
@@ -4,7 +4,9 @@ __pycache__/
*$py.class

# tests
logs/
test_logs/
reports/

# VS Code
.vscode
2 changes: 1 addition & 1 deletion expected.json
@@ -8,7 +8,7 @@
"manufacturer_website": "https://www.agrisupply.com",
"manufacturer_phone_number": "987-654-3210",
"fertiliser_name": "GreenGrow Fertilizer 20-20-20",
"registration_number": "FG123456",
"registration_number": "2018007A",
"lot_number": "LOT20240901",
"weight": [
{
114 changes: 65 additions & 49 deletions performance_assessment.py
@@ -1,18 +1,21 @@
import os
import time
import csv
import datetime
import json
import os
import shutil
import datetime
import csv
import tempfile
import time

from dotenv import load_dotenv
from pipeline import analyze, LabelStorage, OCR, GPT

from pipeline import GPT, OCR, LabelStorage, analyze
from tests import levenshtein_similarity

ACCURACY_THRESHOLD = 80.0


def extract_leaf_fields(
data: dict | list, parent_key: str = ''
data: dict | list, parent_key: str = ""
) -> dict[str, str | int | float | bool | None]:
leaves: dict[str, str | int | float | bool | None] = {}

@@ -39,33 +42,35 @@ def find_test_cases(labels_folder: str) -> list[tuple[list[str], str]]:
label_directories = sorted(
os.path.join(labels_folder, directory)
for directory in os.listdir(labels_folder)
if os.path.isdir(os.path.join(labels_folder, directory)) and directory.startswith("label_")
if os.path.isdir(os.path.join(labels_folder, directory))
and directory.startswith("label_")
)
if len(label_directories) == 0:
print(f"No label directories found in {labels_folder}")
raise FileNotFoundError(f"No label directories found in {labels_folder}")

for label_directory in label_directories:
files = os.listdir(label_directory)
image_paths = [
files = sorted(os.listdir(label_directory))
image_paths = sorted(
os.path.join(label_directory, file)
for file in files
if file.lower().endswith((".png", ".jpg"))
]
)
expected_json_path = os.path.join(label_directory, "expected_output.json")

if not image_paths:
raise FileNotFoundError(f"No image files found in {label_directory}")
if not os.path.exists(expected_json_path):
raise FileNotFoundError(f"Expected output JSON not found in {label_directory}")
raise FileNotFoundError(
f"Expected output JSON not found in {label_directory}"
)
test_cases.append((image_paths, expected_json_path))

return test_cases


def calculate_accuracy(
expected_fields: dict[str, str],
actual_fields: dict[str, str]
expected_fields: dict[str, str], actual_fields: dict[str, str]
) -> dict[str, dict[str, str | float]]:
accuracy_results = {}
for field_name, expected_value in expected_fields.items():
@@ -76,10 +81,10 @@ def calculate_accuracy(
score = levenshtein_similarity(str(expected_value), str(actual_value))
pass_fail = "Pass" if score >= ACCURACY_THRESHOLD else "Fail"
accuracy_results[field_name] = {
'score': score,
'expected_value': expected_value,
'actual_value': actual_value,
'pass_fail': pass_fail,
"score": score,
"expected_value": expected_value,
"actual_value": actual_value,
"pass_fail": pass_fail,
}
return accuracy_results

@@ -90,7 +95,9 @@ def run_test_case(
# Copy images to temporary files to prevent deletion due to LabelStorage behavior
copied_image_paths = []
for image_path in image_paths:
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(image_path)[1])
temp_file = tempfile.NamedTemporaryFile(
delete=False, suffix=os.path.splitext(image_path)[1]
)
shutil.copy2(image_path, temp_file.name)
copied_image_paths.append(temp_file.name)

@@ -109,15 +116,17 @@ def run_test_case(
# Run performance test
print("\tRunning analysis for test case...")
start_time = time.time()
actual_output = analyze(storage, ocr, gpt) # <-- the `analyse` function deletes the images it processes so we don't need to clean up our image copies
actual_output = analyze(
storage, ocr, gpt
) # <-- the `analyse` function deletes the images it processes so we don't need to clean up our image copies
performance = time.time() - start_time
print(f"\tAnalysis completed in {performance:.2f} seconds.")

# Process actual output
actual_fields = extract_leaf_fields(json.loads(actual_output.model_dump_json()))

# Load expected output
with open(expected_json_path, 'r') as file:
with open(expected_json_path, "r") as file:
expected_fields = extract_leaf_fields(json.load(file))

# Calculate accuracy
@@ -126,9 +135,9 @@ def run_test_case(

# Return results
return {
'test_case_number': test_case_number,
'performance': performance,
'accuracy_results': accuracy_results,
"test_case_number": test_case_number,
"performance": performance,
"accuracy_results": accuracy_results,
}


@@ -137,39 +146,43 @@ def generate_csv_report(results: list[dict[str, any]]) -> None:
os.makedirs("reports", exist_ok=True)
report_path = os.path.join("reports", f"test_results_{timestamp}.csv")

with open(report_path, mode='w', newline='') as file:
with open(report_path, mode="w", newline="") as file:
writer = csv.writer(file)
writer.writerow([
"Test Case",
"Field Name",
"Pass/Fail",
"Accuracy Score",
"Pipeline Speed (seconds)",
"Expected Value",
"Actual Value",
])
writer.writerow(
[
"Test Case",
"Field Name",
"Pass/Fail",
"Accuracy Score",
"Pipeline Speed (seconds)",
"Expected Value",
"Actual Value",
]
)

for result in results:
test_case_number = result['test_case_number']
performance = result['performance']
for field_name, data in result['accuracy_results'].items():
writer.writerow([
test_case_number,
field_name,
data['pass_fail'],
f"{data['score']:.2f}",
f"{performance:.4f}",
data['expected_value'],
data['actual_value'],
])
test_case_number = result["test_case_number"]
performance = result["performance"]
for field_name, data in result["accuracy_results"].items():
writer.writerow(
[
test_case_number,
field_name,
data["pass_fail"],
f"{data['score']:.2f}",
f"{performance:.4f}",
data["expected_value"],
data["actual_value"],
]
)
print(f"CSV report generated and saved to: {report_path}")


def main() -> None:
print("Script execution started.")

load_dotenv()

# Validate required environment variables
required_vars = [
"AZURE_API_ENDPOINT",
@@ -180,7 +193,9 @@ def main() -> None:
]
missing_vars = [var for var in required_vars if not os.getenv(var)]
if missing_vars:
raise RuntimeError(f"Missing required environment variables: {', '.join(missing_vars)}")
raise RuntimeError(
f"Missing required environment variables: {', '.join(missing_vars)}"
)

test_cases = find_test_cases("test_data/labels")
print(f"Found {len(test_cases)} test case(s) to process.")
@@ -193,10 +208,11 @@ def main() -> None:
results.append(result)
except Exception as e:
print(f"Error processing test case {idx}: {e}")
continue # I'd rather continue processing the other test cases than stop the script for now
continue # I'd rather continue processing the other test cases than stop the script for now

generate_csv_report(results)
print("Script execution completed.")


if __name__ == "__main__":
main()
14 changes: 5 additions & 9 deletions pipeline/__init__.py
@@ -4,7 +4,6 @@
from .gpt import GPT # noqa: F401

import os
import json
from datetime import datetime

def save_text_to_file(text: str, output_path: str): # pragma: no cover
@@ -40,15 +39,12 @@ def analyze(label_storage: LabelStorage, ocr: OCR, gpt: GPT, log_dir_path: str =
# Generate inspection from extracted text
prediction = gpt.create_inspection(result.content)

# Logs the results from GPT
save_text_to_file(prediction.inspection, f"{log_dir_path}/{now}.json")
save_text_to_file(prediction.rationale, f"{log_dir_path}/{now}.txt")

# Load a JSON from the text
raw_json = json.loads(prediction.inspection)

# Check the coninspectionity of the JSON
inspection = FertilizerInspection(**raw_json)
inspection = prediction.inspection

# Logs the results from GPT
save_text_to_file(prediction.reasoning, f"{log_dir_path}/{now}.txt")
save_text_to_file(inspection.model_dump_json(indent=2), f"{log_dir_path}/{now}.json")

# Clear the label cache
label_storage.clear()
67 changes: 40 additions & 27 deletions pipeline/gpt.py
@@ -1,22 +1,24 @@
import dspy
from dspy import Prediction
import dspy.adapters
import dspy.utils
from dspy import Prediction
from openinference.instrumentation.dspy import DSPyInstrumentor
from phoenix.otel import register

from pipeline.inspection import FertilizerInspection

SUPPORTED_MODELS = {
"gpt-3.5-turbo": {
"max_token": 12000,
"max_tokens": 12000,
"api_version": "2024-02-01",
"response_format": { "type": "json_object" },
"response_format": {"type": "json_object"},
},
"gpt-4o": {
"max_token": None,
"max_tokens": None,
"api_version": "2024-02-15-preview",
"response_format": { "type": "json_object" },
}
}
"response_format": {"type": "json_object"},
},
}

REQUIREMENTS = """
The content of keys with the suffix _en must be in English.
@@ -26,42 +28,53 @@
The JSON must contain exclusively keys specified in "keys".
"""


class ProduceLabelForm(dspy.Signature):
"""
You are a fertilizer label inspector working for the Canadian Food Inspection Agency.
You are a fertilizer label inspector working for the Canadian Food Inspection Agency.
Your task is to classify all information present in the provided text using the specified keys.
Your response should be accurate, intelligible, information in JSON, and contain all the text from the provided text.
"""

text = dspy.InputField(desc="The text of the fertilizer label extracted using OCR.")
json_schema = dspy.InputField(desc="The JSON schema of the object to be returned.")
requirements = dspy.InputField(desc="The instructions and guidelines to follow.")
inspection = dspy.OutputField(desc="Only a complete JSON.")

text: str = dspy.InputField(
desc="The text of the fertilizer label extracted using OCR."
)
requirements: str = dspy.InputField(
desc="The instructions and guidelines to follow."
)
inspection: FertilizerInspection = dspy.OutputField(desc="The inspection results.")


class GPT:
def __init__(self, api_endpoint, api_key, deployment_id):
def __init__(self, api_endpoint, api_key, deployment_id, phoenix_endpoint=None):
if not api_endpoint or not api_key or not deployment_id:
raise ValueError("The API endpoint, key and deployment_id are required to instantiate the GPT class.")
raise ValueError(
"The API endpoint, key and deployment_id are required to instantiate the GPT class."
)

config = SUPPORTED_MODELS.get(deployment_id)
if not config:
raise ValueError(f"The deployment_id {deployment_id} is not supported.")

self.dspy_client = dspy.AzureOpenAI(
user="fertiscan",

if phoenix_endpoint is not None:
tracer_provider = register(
project_name="gpt-fertiscan", # Default is 'default'
endpoint=phoenix_endpoint, # gRPC endpoint given by Phoenix when starting the server (default is "http://localhost:4317")
)

DSPyInstrumentor().instrument(tracer_provider=tracer_provider)

self.lm = dspy.LM(
model=f"azure/{deployment_id}",
api_base=api_endpoint,
api_key=api_key,
deployment_id=deployment_id,
# model_type='text',
api_version=config.get("api_version"),
max_tokens=config.get("max_token"),
response_format=config.get("response_format"),
max_tokens=config["max_tokens"],
api_version=config["api_version"],
)

def create_inspection(self, text) -> Prediction:
with dspy.context(lm=self.dspy_client, experimental=True):
json_schema = FertilizerInspection.model_json_schema()
signature = dspy.ChainOfThought(ProduceLabelForm)
prediction = signature(text=text, json_schema=json_schema, requirements=REQUIREMENTS)
with dspy.context(lm=self.lm, experimental=True):
predictor = dspy.TypedChainOfThought(ProduceLabelForm)
prediction = predictor(text=text, requirements=REQUIREMENTS)

return prediction
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "fertiscan_pipeline"
version = "0.0.3"
version = "0.0.4"
description = "A pipeline for the FertiScan project"
authors = [
{ name = "Albert Bryan Ndjeutcha", email = "[email protected]" }
4 changes: 3 additions & 1 deletion requirements.txt
@@ -1,7 +1,9 @@
azure-ai-documentintelligence==1.0.0b3
dspy-ai
dspy-ai==2.5.16
openai>=1.0
pydantic>=2.7.1
arize-phoenix-otel
openinference-instrumentation-dspy
python-dotenv
reportlab
setuptools
