From 31465a9a97ca6534a824f4fb82c6a3b4a90ec5b9 Mon Sep 17 00:00:00 2001 From: Martin Bernstorff Date: Sat, 23 Mar 2024 14:48:51 +0100 Subject: [PATCH] feat: move async handling to outer loop (#71) - [ ] I have considered whether this PR needs review, and requested a review if necessary. Fixes issue # # Notes for reviewers Reviewers can skip X, but should pay attention to Y. --- .ruff.toml | 1 - memorymarker/__main__.py | 18 ++++---- .../question_generator/baseline_pipeline.py | 2 +- .../highlight_to_question.py | 2 +- memorymarker/question_generator/main.py | 26 +++++++----- .../question_generator/pipeline_runner.py | 42 ++++++++++++++----- .../question_generator/reasoned_pipeline.py | 13 ++---- 7 files changed, 62 insertions(+), 42 deletions(-) diff --git a/.ruff.toml b/.ruff.toml index 386dddf..629929a 100644 --- a/.ruff.toml +++ b/.ruff.toml @@ -11,7 +11,6 @@ select = [ "COM", "D417", "E", - "ERA", "F", "I", "ICN", diff --git a/memorymarker/__main__.py b/memorymarker/__main__.py index 74e45c0..720a3a7 100644 --- a/memorymarker/__main__.py +++ b/memorymarker/__main__.py @@ -1,3 +1,4 @@ +import asyncio import datetime as dt import os import time @@ -105,15 +106,18 @@ def typer_cli( typer.echo(f"Received {highlights.count()} new highlights") typer.echo("Generating questions from highlights...") - questions = BaselinePipeline( - _name="gpt-4-basic", - openai_api_key=os.getenv( - "OPENAI_API_KEY", "No OPENAI_API_KEY environment variable set" - ), - model="gpt-4-turbo-preview", - )(highlights) + questions = asyncio.run( + BaselinePipeline( + _name="gpt-4-basic", + openai_api_key=os.getenv( + "OPENAI_API_KEY", "No OPENAI_API_KEY environment variable set" + ), + model="gpt-4-turbo-preview", + )(highlights) + ) typer.echo("Writing questions to markdown...") + for question in questions: write_qa_prompt_to_md(save_dir=output_dir, highlight=question) diff --git a/memorymarker/question_generator/baseline_pipeline.py b/memorymarker/question_generator/baseline_pipeline.py index 1e066b7..d4be731 100644 --- a/memorymarker/question_generator/baseline_pipeline.py +++ b/memorymarker/question_generator/baseline_pipeline.py @@ -72,7 +72,7 @@ async def _highlights_to_qa( response = await asyncio.gather(*questions) return response - def __call__( + async def __call__( self, highlights: "Iter[ContextualizedHighlight]" ) -> "Iter[ReasonedHighlight]": response: Sequence[QAPromptResponseModel] = asyncio.run( diff --git a/memorymarker/question_generator/highlight_to_question.py b/memorymarker/question_generator/highlight_to_question.py index 48a80c7..5a34d93 100644 --- a/memorymarker/question_generator/highlight_to_question.py +++ b/memorymarker/question_generator/highlight_to_question.py @@ -10,7 +10,7 @@ class HighlightToQuestion(Protocol): - def __call__( + async def __call__( self, highlights: "Iter[ContextualizedHighlight]" ) -> "Iter[ReasonedHighlight]": ... diff --git a/memorymarker/question_generator/main.py b/memorymarker/question_generator/main.py index 96ec35e..8dceac2 100644 --- a/memorymarker/question_generator/main.py +++ b/memorymarker/question_generator/main.py @@ -1,3 +1,4 @@ +import asyncio import os from dataclasses import dataclass from typing import TYPE_CHECKING, Sequence @@ -8,7 +9,6 @@ ContextualizedHighlight, ) from memorymarker.document_providers.omnivore import Omnivore -from memorymarker.question_generator.baseline_pipeline import BaselinePipeline from memorymarker.question_generator.example_repo_airtable import ( AirtableExampleRepo, PipelineHighlightIdentity, @@ -70,7 +70,7 @@ def _select_highlights_from_omnivore( return selected_highlights -if __name__ == "__main__": +async def main(): repository = AirtableExampleRepo() selected_highlights = _select_highlights_from_omnivore( search_terms={ @@ -99,16 +99,20 @@ def _select_highlights_from_omnivore( "OPENAI_API_KEY", "No OPENAI_API_KEY environment variable set" ), model="gpt-4-turbo-preview", - ), - BaselinePipeline( - openai_api_key=os.getenv( - "OPENAI_API_KEY", "No OPENAI_API_KEY environment variable set" - ), - model="gpt-4-turbo-preview", - _name="gpt-4-basic", - ), + ) + # BaselinePipeline( + # openai_api_key=os.getenv( + # "OPENAI_API_KEY", "No OPENAI_API_KEY environment variable set" + # ), + # model="gpt-4-turbo-preview", + # _name="gpt-4-basic", + # ), ], ).filter(lambda pair: pair.__hash__() not in old_example_hashes) - new_responses = Iter(run_pipelines(new_highlights)).flatten() + new_responses = await run_pipelines(new_highlights) update_repository(new_responses, repository=repository) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/memorymarker/question_generator/pipeline_runner.py b/memorymarker/question_generator/pipeline_runner.py index b7ca546..bf7ebf6 100644 --- a/memorymarker/question_generator/pipeline_runner.py +++ b/memorymarker/question_generator/pipeline_runner.py @@ -1,25 +1,45 @@ -from typing import TYPE_CHECKING, Sequence +from typing import TYPE_CHECKING, Mapping, Sequence from iterpy.iter import Iter if TYPE_CHECKING: + from memorymarker.document_providers.contextualized_highlight import ( + ContextualizedHighlight, + ) + from memorymarker.question_generator.highlight_to_question import ( + HighlightToQuestion, + ) from memorymarker.question_generator.main import HighlightWithPipeline from memorymarker.question_generator.reasoned_highlight import ReasonedHighlight -def run_pipelines( +async def run_pipeline( + pipeline_name: str, + pipelinename2pipeline: Mapping[str, "HighlightToQuestion"], + highlights: Sequence["ContextualizedHighlight"], +) -> Iter["ReasonedHighlight"]: + pipeline = pipelinename2pipeline[pipeline_name] + prompts = pipeline(Iter(highlights)) + return await prompts + + +async def run_pipelines( pairs: "Iter[HighlightWithPipeline]", -) -> Sequence["ReasonedHighlight"]: +) -> Iter["ReasonedHighlight"]: pipelinename2pipeline = {pair.pipeline.name: pair.pipeline for pair in pairs} pipelines_with_highlights = pairs.groupby(lambda _: _.pipeline.name) - examples: Sequence[ReasonedHighlight] = [] - for pipeline_name, pair in pipelines_with_highlights: - print(f"Creating examples for {pipeline_name}") - highlights = [pair.highlight for pair in pair] - pipeline = pipelinename2pipeline[pipeline_name] - prompts = pipeline(Iter(highlights)) - examples.extend(prompts) + examples: Sequence["ReasonedHighlight"] = [] + for pipeline_name, pairs_instance in pipelines_with_highlights: + print(f"Running pipeline {pipeline_name}") + for pair in pairs_instance: + examples.extend( + await run_pipeline( + pipeline_name=pipeline_name, + pipelinename2pipeline=pipelinename2pipeline, + highlights=[pair.highlight], + ) + ) - return examples + return Iter(examples) diff --git a/memorymarker/question_generator/reasoned_pipeline.py b/memorymarker/question_generator/reasoned_pipeline.py index a4a0386..88af2c0 100644 --- a/memorymarker/question_generator/reasoned_pipeline.py +++ b/memorymarker/question_generator/reasoned_pipeline.py @@ -126,16 +126,9 @@ async def _highlight_to_qa( ) ) - async def _highlights_to_qa( - self, highlights: Iter["ContextualizedHighlight"] - ) -> Iter[ReasonedHighlight]: + async def __call__( + self, highlights: "Iter[ContextualizedHighlight]" + ) -> "Iter[ReasonedHighlight]": questions = [self._highlight_to_qa(highlight) for highlight in highlights] response = await asyncio.gather(*questions) return Iter(response).flatten() - - def __call__( - self, highlights: "Iter[ContextualizedHighlight]" - ) -> "Iter[ReasonedHighlight]": - response = asyncio.run(self._highlights_to_qa(highlights)) - - return response