From 1b695000add1acde7a35815ed9a12fc4b8b1b97b Mon Sep 17 00:00:00 2001 From: nik Date: Fri, 10 Nov 2023 00:36:26 +0000 Subject: [PATCH] Add guidance,langchain,openai runtimes; multi-I/O skills --- README.md | 29 ++- adala/agents/base.py | 116 +++++------ adala/environments/__init__.py | 4 +- adala/environments/base.py | 222 +++++++++------------- adala/environments/console.py | 7 +- adala/environments/servers/discord_bot.py | 10 +- adala/environments/web.py | 47 +++-- adala/runtimes/__init__.py | 2 +- adala/runtimes/_guidance.py | 113 +++++++++++ adala/runtimes/_langchain.py | 68 +++++++ adala/runtimes/_openai.py | 205 ++++++++++++++++++++ adala/runtimes/base.py | 100 ++++++++++ adala/runtimes/openai.py | 74 -------- adala/skills/__init__.py | 1 + adala/skills/_base.py | 168 ++++++++++++++++ adala/skills/collection/__init__.py | 0 adala/skills/collection/analyze_errors.py | 65 +++++++ adala/skills/collection/classification.py | 9 + adala/skills/collection/improve_llm.py | 24 +++ adala/skills/skillset.py | 103 +++++----- adala/utils/internal_data.py | 3 +- adala/utils/parse.py | 73 +++++++ tests/test_classification.py | 10 +- tests/test_openai_runtime.py | 2 +- tests/utils.py | 7 + 25 files changed, 1095 insertions(+), 367 deletions(-) create mode 100644 adala/runtimes/_guidance.py create mode 100644 adala/runtimes/_langchain.py create mode 100644 adala/runtimes/_openai.py delete mode 100644 adala/runtimes/openai.py create mode 100644 adala/skills/_base.py create mode 100644 adala/skills/collection/__init__.py create mode 100644 adala/skills/collection/analyze_errors.py create mode 100644 adala/skills/collection/classification.py create mode 100644 adala/skills/collection/improve_llm.py create mode 100644 adala/utils/parse.py diff --git a/README.md b/README.md index 3c773d4..129c608 100644 --- a/README.md +++ b/README.md @@ -104,52 +104,45 @@ Click [here](./examples/quickstart.ipynb) to see an extended quickstart example. import pandas as pd from adala.agents import Agent -from adala.datasets import DataFrameDataset -from adala.environments import BasicEnvironment +from adala.environments import StaticEnvironment from adala.skills import ClassificationSkill -from adala.runtimes import OpenAIRuntime +from adala.runtimes import OpenAIChatRuntime from rich import print # Train dataset -ground_truth_df = pd.DataFrame([ +train_df = pd.DataFrame([ ["It was the negative first impressions, and then it started working.", "Positive"], ["Not loud enough and doesn't turn on like it should.", "Negative"], ["I don't know what to say.", "Neutral"], ["Manager was rude, but the most important that mic shows very flat frequency response.", "Positive"], ["The phone doesn't seem to accept anything except CBR mp3s.", "Negative"], ["I tried it before, I bought this device for my son.", "Neutral"], -], columns=["text", "ground_truth"]) +], columns=["text", "sentiment"]) # Test dataset -predict_df = pd.DataFrame([ +test_df = pd.DataFrame([ "All three broke within two months of use.", "The device worked for a long time, can't say anything bad.", "Just a random line of text." 
], columns=["text"]) -ground_truth_dataset = DataFrameDataset(df=ground_truth_df) -predict_dataset = DataFrameDataset(df=predict_df) - agent = Agent( # connect to a dataset - environment=BasicEnvironment( - ground_truth_dataset=ground_truth_dataset, - ground_truth_columns={"sentiment_classification": "ground_truth"} - ), + environment=StaticEnvironment(df=train_df), # define a skill skills=ClassificationSkill( - name='sentiment_classification', + name='sentiment', instructions="Label text as positive, negative or neutral.", labels=["Positive", "Negative", "Neutral"], - input_data_field='text' + input_template="Text: {text}", + output_template="Sentiment: {sentiment}" ), # define all the different runtimes your skills may use runtimes = { # You can specify your OPENAI API KEY here via `OpenAIRuntime(..., api_key='your-api-key')` - 'openai': OpenAIRuntime(model='gpt-3.5-turbo-instruct'), - 'openai-gpt3': OpenAIRuntime(model='gpt-3.5-turbo') + 'openai': OpenAIChatRuntime(model='gpt-3.5-turbo'), }, default_runtime='openai', @@ -166,7 +159,7 @@ print(agent.skills) agent.learn(learning_iterations=3, accuracy_threshold=0.95) print('\n=> Run tests ...') -predictions = agent.run(predict_dataset) +predictions = agent.run(test_df) print('\n => Test results:') print(predictions) ``` diff --git a/adala/agents/base.py b/adala/agents/base.py index 03e96a7..cc8f414 100644 --- a/adala/agents/base.py +++ b/adala/agents/base.py @@ -3,15 +3,19 @@ from typing import Any, Optional, List, Dict, Union, Tuple from rich import print -from adala.environments.base import Environment, BasicEnvironment, GroundTruthSignal +from adala.environments.base import Environment, StaticEnvironment, GroundTruthSignal from adala.datasets import Dataset, DataFrameDataset from adala.runtimes.base import Runtime, LLMRuntime, LLMRuntimeType, LLMRuntimeModelType -from adala.runtimes.openai import OpenAIRuntime +# from adala.runtimes.openai import OpenAIRuntime +from adala.runtimes._openai import OpenAIChatRuntime +from adala.skills._base import Skill from adala.memories.base import Memory from adala.skills.base import BaseSkill from adala.skills.skillset import SkillSet, LinearSkillSet from adala.utils.logs import print_dataframe, print_text, print_error from adala.utils.internal_data import InternalDataFrame, InternalDataFrameConcat +from adala.skills.collection.analyze_errors import AnalyzeLLMPromptErrorsExactMatch +from adala.skills.collection.improve_llm import ImproveLLMInstructions class Agent(BaseModel, ABC): @@ -35,7 +39,7 @@ class Agent(BaseModel, ABC): memory: Memory = Field(default=None) runtimes: Optional[Dict[str, Runtime]] = Field( default_factory=lambda: { - 'openai': OpenAIRuntime(model='gpt-3.5-turbo-instruct'), + 'openai': OpenAIChatRuntime(model='gpt-3.5-turbo'), # 'llama2': LLMRuntime( # llm_runtime_type=LLMRuntimeModelType.Transformers, # llm_params={ @@ -47,7 +51,7 @@ class Agent(BaseModel, ABC): ) teacher_runtimes: Optional[Dict[str, Runtime]] = Field( default_factory=lambda: { - 'openai-gpt3': OpenAIRuntime(model='gpt-3.5-turbo'), + 'openai-gpt3': OpenAIChatRuntime(model='gpt-3.5-turbo'), # 'openai-gpt4': OpenAIRuntime(model='gpt-4') } ) @@ -89,9 +93,7 @@ def environment_validator(cls, v) -> Environment: Environment: The validated environment. 
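+
+        Example (illustrative sketch; shows how a raw dataframe is coerced):
+
+            import pandas as pd
+
+            train_df = pd.DataFrame(
+                [["Great product!", "Positive"]], columns=["text", "sentiment"]
+            )
+            # Passing `environment=train_df` to Agent(...) stores StaticEnvironment(df=train_df)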
""" if isinstance(v, InternalDataFrame): - v = DataFrameDataset(df=v) - if isinstance(v, Dataset): - v = BasicEnvironment(dataset=v) + v = StaticEnvironment(df=v) return v @field_validator('skills', mode='before') @@ -105,13 +107,12 @@ def skills_validator(cls, v) -> SkillSet: Returns: SkillSet: The validated set of skills. """ - if isinstance(v, SkillSet): return v - elif isinstance(v, BaseSkill): - return LinearSkillSet(skills={v.name: v}) + elif isinstance(v, Skill): + return LinearSkillSet(skills=[v]) else: - return LinearSkillSet(skills=v) + raise ValueError(f"skills must be of type SkillSet or Skill, not {type(v)}") @model_validator(mode='after') def verify_input_parameters(self): @@ -172,23 +173,23 @@ def get_teacher_runtime(self, runtime: Optional[str] = None) -> Runtime: def run( self, - dataset: Optional[Union[Dataset, InternalDataFrame]] = None, + input: InternalDataFrame = None, runtime: Optional[str] = None ) -> InternalDataFrame: """ Runs the agent on the specified dataset. Args: - dataset (Union[Dataset, InternalDataFrame]): The dataset to run the agent on. + input (InternalDataFrame): The dataset to run the agent on. runtime (str, optional): The name of the runtime to use. Defaults to None, use the default runtime. Returns: InternalDataFrame: The dataset with the agent's predictions. """ - if dataset is None: - dataset = self.environment.as_dataset() + if input is None: + input = self.environment.get_data_batch() runtime = self.get_runtime(runtime=runtime) - predictions = self.skills.apply(dataset, runtime=runtime) + predictions = self.skills.apply(input, runtime=runtime) return predictions def learn( @@ -197,8 +198,8 @@ def learn( accuracy_threshold: float = 0.9, update_memory: bool = True, request_environment_feedback: bool = True, - wait_for_environment_feedback: Optional[float] = None, - num_predictions_feedback: Optional[int] = None, + wait_for_feedback: Optional[float] = True, + num_feedbacks: Optional[int] = None, runtime: Optional[str] = None, teacher_runtime: Optional[str] = None, ) -> GroundTruthSignal: @@ -210,8 +211,8 @@ def learn( accuracy_threshold (float, optional): The desired accuracy threshold to reach. Defaults to 0.9. update_memory (bool, optional): Flag to determine if memory should be updated after learning. Defaults to True. request_environment_feedback (bool, optional): Flag to determine if feedback should be requested from the environment. Defaults to True. - wait_for_environment_feedback (float, optional): The timeout in seconds to wait for environment feedback. Defaults to None. - num_predictions_feedback (int, optional): The number of predictions to request feedback for. Defaults to None. + wait_for_feedback (float, optional): The timeout in seconds to wait for environment feedback. Defaults to None. + num_feedbacks (int, optional): The number of predictions to request feedback for. Defaults to None. runtime (str, optional): The runtime to be used for the learning process. Defaults to None. teacher_runtime (str, optional): The teacher runtime to be used for the learning process. Defaults to None. 
Returns: @@ -221,10 +222,10 @@ def learn( runtime = self.get_runtime(runtime=runtime) teacher_runtime = self.get_teacher_runtime(runtime=teacher_runtime) - dataset = self.environment.as_dataset() + data_batch = self.environment.get_data_batch() # Apply agent skills to dataset and get experience with predictions - predictions = self.skills.apply(dataset, runtime=runtime) + predictions = self.skills.apply(data_batch, runtime=runtime) ground_truth_signal = None @@ -233,54 +234,57 @@ def learn( # Request feedback from environment is necessary if request_environment_feedback: - if num_predictions_feedback is not None: - # predictions_for_feedback = predictions.sample(num_predictions_feedback) - predictions_for_feedback = predictions.head(num_predictions_feedback) - else: - predictions_for_feedback = predictions - self.environment.request_feedback(self.skills, predictions_for_feedback) + self.environment.request_feedback(self.skills, predictions, num_feedbacks, wait_for_feedback) # Compare predictions to ground truth -> get ground truth signal - ground_truth_signal = self.environment.compare_to_ground_truth( - self.skills, - predictions, - wait=wait_for_environment_feedback - ) + ground_truth_signal = self.environment.compare_to_ground_truth(self.skills, predictions) print_text(f'Comparing predictions to ground truth data ...') print_dataframe(InternalDataFrameConcat([predictions, ground_truth_signal.match], axis=1)) # Use ground truth signal to find the skill to improve accuracy = ground_truth_signal.get_accuracy() - train_skill = self.skills.select_skill_to_improve(accuracy, accuracy_threshold) - if not train_skill: + train_skill_name, train_skill_output = '', '' + for skill_output, skill_name in self.skills.get_skill_outputs().items(): + if accuracy[skill_output] < accuracy_threshold: + train_skill_name, train_skill_output = skill_name, skill_output + break + + if not train_skill_name: print_text(f'No skill to improve found. Stopping learning process.') break + + train_skill = self.skills[train_skill_name] # select the worst performing skill - print_text(f'Accuracy = {accuracy[train_skill.name] * 100:0.2f}%', style='bold red') + print_text(f'Output to improve: "{train_skill_output}" (Skill="{train_skill_name}")\n' + f'Accuracy = {accuracy[train_skill_output] * 100:0.2f}%', style='bold red') - skill_errors = ground_truth_signal.get_errors(train_skill.name) + skill_errors = ground_truth_signal.get_errors(train_skill_output).rename('_ground_truth') + skill_errors = InternalDataFrameConcat((skill_errors, predictions), axis=1, join='inner') + print(f'Errors for skill "{train_skill_name}":') + print_dataframe(skill_errors) # 2. ANALYSIS PHASE: Analyze evaluation experience, optionally use long term memory - print_text(f'Analyze evaluation experience ...') - error_analysis = train_skill.analyze( - predictions=predictions, - errors=skill_errors, - student_runtime=runtime, - teacher_runtime=teacher_runtime, - memory=self.memory - ) - print_text(f'Error analysis for skill "{train_skill.name}":\n') - print_text(error_analysis, style='green') - if self.memory and update_memory: - self.memory.remember(error_analysis, self.skills) - - # 3. 
IMPROVEMENT PHASE: Improve skills based on analysis - print_text(f"Improve \"{train_skill.name}\" skill based on analysis ...") - train_skill.improve( - error_analysis=error_analysis, - runtime=teacher_runtime, - ) + + teacher = LinearSkillSet(skills=[ + AnalyzeLLMPromptErrorsExactMatch( + name='analyze', + input_template=train_skill.input_template, + output_template='{error_report}', + initial_llm_instructions=train_skill.instructions, + prediction_column=train_skill_output, + ground_truth_column='_ground_truth', + field_schema=train_skill.field_schema, + ), + ImproveLLMInstructions( + name='improve', + old_instructions=train_skill.instructions, + field_schema=train_skill.field_schema, + ) + ]) + + result = teacher.apply(skill_errors, runtime=teacher_runtime) + train_skill.instructions = result['new_instructions'] print_text(f'Updated instructions for skill "{train_skill.name}":\n') print_text(train_skill.instructions, style='bold green') diff --git a/adala/environments/__init__.py b/adala/environments/__init__.py index 6009d4a..be2b7f8 100644 --- a/adala/environments/__init__.py +++ b/adala/environments/__init__.py @@ -1,3 +1,3 @@ -from .base import Environment, BasicEnvironment +from .base import Environment, StaticEnvironment from .console import ConsoleEnvironment -from .web import WebEnvironment \ No newline at end of file +from .web import WebStaticEnvironment \ No newline at end of file diff --git a/adala/environments/base.py b/adala/environments/base.py index cd3029d..7163240 100644 --- a/adala/environments/base.py +++ b/adala/environments/base.py @@ -1,6 +1,7 @@ from pydantic import BaseModel, Field, field_validator from abc import ABC, abstractmethod from typing import Any, Optional, Dict, Union, Callable, Dict +from collections import defaultdict from adala.utils.internal_data import InternalDataFrame, InternalSeries, InternalDataFrameConcat from adala.utils.matching import fuzzy_match @@ -43,7 +44,7 @@ class GroundTruthSignal(BaseModel): """ match: InternalDataFrame = Field(default_factory=InternalDataFrame) - errors: Optional[Dict[str, InternalDataFrame]] = None + errors: Optional[Dict[str, InternalSeries]] = None def get_accuracy(self) -> InternalSeries: """ @@ -56,10 +57,9 @@ def get_accuracy(self) -> InternalSeries: """ - return self.match.mean() - def get_errors(self, skill_name: str) -> InternalDataFrame: + def get_errors(self, skill_output: str) -> InternalSeries: """ Retrieve the errors associated with a particular skill. @@ -67,16 +67,13 @@ def get_errors(self, skill_name: str) -> InternalDataFrame: skill_name (str): The name of the skill to retrieve errors for. Returns: - InternalDataFrame: A DataFrame with two columns ["predictions", "ground_truth name"] - representing the errors. - + InternalSeries: A series representing the errors of predictions for the given skill. + Index is prediction index, value is ground truth. Raises: AssertionError: If the error DataFrame does not have exactly two columns. """ - errors = self.errors[skill_name] - assert len(errors.columns) == 2 # ["predictions", "ground_truth name"] - return errors + return self.errors[skill_output] def __rich__(self): text = '[bold blue]Ground Truth Signal:[/bold blue]\n\n' @@ -98,64 +95,78 @@ class Environment(BaseModel, ABC): Subclasses should implement methods to handle feedback requests, comparison to ground truth, dataset conversion, and state persistence. 
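+
+    Example (minimal sketch of a concrete subclass backed by an in-memory dataframe;
+    StaticEnvironment below follows the same pattern):
+
+        import pandas as pd
+
+        class InMemoryEnvironment(Environment):
+            df: pd.DataFrame = None
+
+            def get_data_batch(self):
+                return self.df
+
+            def request_feedback(self, skills, predictions, num_feedbacks=None, wait_for_feedback=False):
+                pass  # feedback is assumed to be collected out of band
+
+            def get_ground_truth(self, predictions):
+                return self.df
+
+            def save(self):
+                raise NotImplementedError
+
+            def restore(self):
+                raise NotImplementedError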
""" - data_stream: Optional[Dataset] = None ground_truth_columns: Optional[Dict[str, str]] = None matching_function: str = 'exact' matching_threshold: float = 0.8 - @field_validator('data_stream', mode='before') - def _validate_data_stream(cls, v): + @abstractmethod + def get_data_batch(self) -> InternalDataFrame: """ - Validate the data stream field to ensure it is converted to DataFrameDataset if needed. - - Args: - v: The value to validate. + Get a batch of data from data stream to be processed by the skill set. Returns: - The validated value, possibly converted to DataFrameDataset. + InternalDataFrame: The data batch. + """ - Raises: - ValidationError: If the validation fails. + @abstractmethod + def request_feedback( + self, + skills: SkillSet, + predictions: InternalDataFrame, + num_feedbacks: Optional[int] = None, + wait_for_feedback: Optional[bool] = False + ): """ + Abstract method to request user feedback on the predictions made by the model. - if isinstance(v, InternalDataFrame): - return DataFrameDataset(df=v) - return v + Args: + skills (SkillSet): The set of skills/models whose predictions are being evaluated. + predictions (InternalDataFrame): The predictions made by the skills. + num_feedbacks (Optional[int], optional): The number of feedbacks to request. Defaults to all predictions + wait_for_feedback (Optional[bool], optional): Whether to wait for feedback to be available. Defaults to False. + """ - def get_ground_truth_dataset(self, wait: Optional[float] = None) -> InternalDataFrame: + @abstractmethod + def get_ground_truth(self, predictions: InternalDataFrame) -> InternalDataFrame: """ - Get the ground truth dataset. + Get ground truth data for the predictions. Args: - wait (Optional[float], optional): The timeout to wait for ground truth data to be available. Defaults to None. + predictions (InternalDataFrame): The predictions to compare with the ground truth. Returns: - InternalDataFrame: The ground truth dataset. + InternalDataFrame: The ground truth data for the predictions. """ - + + @abstractmethod + def save(self): + """ + Save the current state of the BasicEnvironment. + + Raises: + NotImplementedError: This method is not implemented for BasicEnvironment. + """ + @abstractmethod - def request_feedback(self, skill_set: SkillSet, predictions: InternalDataFrame): + def restore(self): """ - Abstract method to request user feedback on the predictions made by the model. + Restore the state of the BasicEnvironment. - Args: - skill_set (SkillSet): The set of skills/models whose predictions are being evaluated. - predictions (InternalDataFrame): The predictions made by the skills/models. + Raises: + NotImplementedError: This method is not implemented for BasicEnvironment. """ def compare_to_ground_truth( self, - skill_set: SkillSet, + skills: SkillSet, predictions: InternalDataFrame, - wait: Optional[float] = None, - ) -> Optional[GroundTruthSignal]: + ) -> GroundTruthSignal: """ Compare the predictions with the ground truth using the specified matching function. Args: - skill_set (SkillSet): The skill set being evaluated. + skills (SkillSet): The skill set being evaluated. predictions (InternalDataFrame): The predictions to compare with the ground truth. - wait (Optional[float], optional): The timeout to wait for ground truth data to be available. Defaults to None. Returns: GroundTruthSignal: The resulting ground truth signal, with matches and errors detailed. @@ -164,19 +175,21 @@ def compare_to_ground_truth( NotImplementedError: If the matching_function is unknown. 
""" - ground_truth_match = InternalDataFrame() errors = {} - ground_truth_dataset = self.get_ground_truth_dataset(wait=wait) + ground_truth_dataset = self.get_ground_truth(predictions=predictions) if ground_truth_dataset.empty: - return + raise ValueError('Ground truth dataset is empty. Run `request_feedback()` first.') + + pred_columns = list(skills.get_skill_outputs()) - for skill_id, skill in skill_set.skills.items(): - if not self.ground_truth_columns or skill.name not in self.ground_truth_columns: - gt_column = skill.name + skill_match = {} + for pred_column in pred_columns: + if not self.ground_truth_columns: + gt_column = pred_column else: - gt_column = self.ground_truth_columns[skill.name] + gt_column = self.ground_truth_columns[pred_column] gt = ground_truth_dataset[gt_column] - pred = predictions[skill.name] + pred = predictions[pred_column] # from ground truth dataset, select only the rows that are in the predictions gt, pred = gt.align(pred) nonnull_index = gt.notnull() & pred.notnull() @@ -196,113 +209,64 @@ def compare_to_ground_truth( error_index = gt_pred_match[~gt_pred_match].index # concatenate errors - dataframe with two columns: predictions and ground truth - errors[skill.name] = InternalDataFrameConcat([pred[error_index], gt[error_index]], axis=1) - errors[skill.name].columns = ["predictions", gt_column] + errors[pred_column] = gt[error_index] # concatenate matching columns - ground_truth_match = InternalDataFrameConcat([ - # previous skills' ground truth matches - ground_truth_match, - # current skill's ground truth match - gt_pred_match.rename(skill.name), - ], axis=1) - - return GroundTruthSignal( - match=ground_truth_match.reindex(predictions.index), - errors=errors - ) - - def as_dataset(self) -> Dataset: - """ - Abstract method to convert the environment's state into a dataset. - - Returns: - Dataset: A dataset representing the environment's state. - """ - return self.data_stream + skill_match[pred_column] = gt_pred_match + match = InternalDataFrame(skill_match).reindex(predictions.index) - def save(self): - """ - Save the current state of the BasicEnvironment. - - Raises: - NotImplementedError: This method is not implemented for BasicEnvironment. - """ - - raise NotImplementedError - - def restore(self): - """ - Restore the state of the BasicEnvironment. - - Raises: - NotImplementedError: This method is not implemented for BasicEnvironment. - """ - - raise NotImplementedError + return GroundTruthSignal(match=match, errors=errors) class Config: arbitrary_types_allowed = True -class BasicEnvironment(Environment): +class StaticEnvironment(Environment): """ - A concrete implementation of the Environment abstract base class, - assuming the ground truth is provided and comparison is based on exact or fuzzy matching. - - Attributes: - ground_truth_dataset (Union[InternalDataFrame, DataFrameDataset]): Dataset containing - the ground truth data, defaulting to an empty DataFrameDataset. - ground_truth_columns (Dict[str, str]): A dictionary mapping skill names to their corresponding - ground truth columns in the dataset. - matching_function (str): The name of the matching function to use, defaults to 'exact'. - matching_threshold (float): The threshold for fuzzy matching, defaults to 0.8. + Static environment that initializes everything from the dataframe + and doesn't not require requesting feedback to create the ground truth. 
""" - ground_truth_dataset: DataFrameDataset = None + df: InternalDataFrame = None - @field_validator('ground_truth_dataset', mode='before') - def _validate_ground_truth_dataset(cls, v): + def request_feedback( + self, skills: SkillSet, + predictions: InternalDataFrame, + num_feedbacks: Optional[int] = None, + wait_for_feedback: Optional[float] = False + ): """ - Validate the ground_truth_dataset field to ensure it is converted to DataFrameDataset if needed. + In the StaticEnvironment, this method is a placeholder as ground truth is already provided with the input data. Args: - v: The value to validate. - - Returns: - The validated value, possibly converted to DataFrameDataset. - - Raises: - ValidationError: If the validation fails. + skills (SkillSet): The set of skills/models whose predictions are being evaluated. + predictions (InternalDataFrame): The predictions made by the skills. + num_feedbacks (Optional[int], optional): The number of feedbacks to request. Defaults to all predictions. + wait_for_feedback (Optional[float], optional): If True, wait for feedback to be available. Defaults to False. """ - - if isinstance(v, InternalDataFrame): - return DataFrameDataset(df=v) - return v + pass - def request_feedback(self, skills: SkillSet, predictions: InternalDataFrame): + def get_ground_truth(self, predictions: InternalDataFrame) -> InternalDataFrame: """ - In the BasicEnvironment, this method is a placeholder as ground truth is already provided with the input data. - - Args: - skill (BaseSkill): The skill being evaluated. - predictions (InternalDataFrame): The predictions to be reviewed. + Get the ground truth dataset. """ + return self.df - def get_ground_truth_dataset(self, wait: Optional[float] = None) -> InternalDataFrame: + def get_data_batch(self) -> InternalDataFrame: """ - Get the ground truth dataset. + Return the dataset containing the ground truth data. Returns: - InternalDataFrame: The ground truth dataset. + Dataset: The ground truth dataset as a DataFrameDataset. """ - return self.ground_truth_dataset.df + return self.df - def as_dataset(self) -> Dataset: + def save(self): """ - Return the dataset containing the ground truth data. + Save the current state of the StaticEnvironment. + """ + raise NotImplementedError('StaticEnvironment does not support save/restore.') - Returns: - Dataset: The ground truth dataset as a DataFrameDataset. + def restore(self): + """ + Restore the state of the StaticEnvironment. 
""" - if self.ground_truth_dataset is not None: - return self.ground_truth_dataset - return super(BasicEnvironment, self).as_dataset() + raise NotImplementedError('StaticEnvironment does not support save/restore.') diff --git a/adala/environments/console.py b/adala/environments/console.py index a7dd55a..ddfa282 100644 --- a/adala/environments/console.py +++ b/adala/environments/console.py @@ -1,13 +1,13 @@ from rich import print from rich.prompt import Prompt -from .base import BasicEnvironment +from .base import StaticEnvironment from adala.skills import SkillSet from adala.utils.internal_data import InternalDataFrame from adala.utils.logs import print_series from adala.datasets import Dataset, DataFrameDataset -class ConsoleEnvironment(BasicEnvironment): +class ConsoleEnvironment(StaticEnvironment): def request_feedback(self, skill_set: SkillSet, predictions: InternalDataFrame): @@ -24,5 +24,4 @@ def request_feedback(self, skill_set: SkillSet, predictions: InternalDataFrame): pred_row[skill.name] = ground_truth ground_truth_dataset.append(pred_row) - self.ground_truth_dataset = DataFrameDataset( - df=InternalDataFrame(ground_truth_dataset)) + self.df = InternalDataFrame(ground_truth_dataset) diff --git a/adala/environments/servers/discord_bot.py b/adala/environments/servers/discord_bot.py index 09660e0..5756c25 100644 --- a/adala/environments/servers/discord_bot.py +++ b/adala/environments/servers/discord_bot.py @@ -141,13 +141,13 @@ async def request_feedback( if not channel: raise Exception(f'Channel with id {CHANNEL_ID} not found') ground_truths = [] - skill_names = [skill['name'] for skill in skills] - for skill in skill_names: + skill_outputs = sum([skill['outputs'] for skill in skills], []) + for skill_output in skill_outputs: for prediction in predictions: text = '========================\n' - text += '\n'.join(f'**{k}**: {v}' for k, v in prediction.items() if k not in skill_names + ['index']) - text += f'\n\n__**{skill}**__: {prediction[skill]}' - ground_truth = GroundTruth(prediction_id=prediction['index'], skill_name=skill) + text += '\n'.join(f'**{k}**: {v}' for k, v in prediction.items() if k not in skill_outputs + ['index']) + text += f'\n\n__**{skill_output}**__: {prediction[skill_output]}' + ground_truth = GroundTruth(prediction_id=prediction['index'], skill_name=skill_output) message = await channel.send( text, view=AcceptRejectView( diff --git a/adala/environments/web.py b/adala/environments/web.py index db2295b..904f156 100644 --- a/adala/environments/web.py +++ b/adala/environments/web.py @@ -1,7 +1,7 @@ import requests import time from typing import Optional -from .base import Environment +from .base import StaticEnvironment from .servers.base import GroundTruth from adala.skills import SkillSet from adala.utils.internal_data import InternalDataFrame, InternalSeries @@ -9,7 +9,7 @@ from rich.progress import Progress, SpinnerColumn, TimeElapsedColumn -class WebEnvironment(Environment): +class WebStaticEnvironment(StaticEnvironment): """ Web environment interacts with server API to request feedback and retrieve ground truth. 
Following endpoints are expected: @@ -18,11 +18,36 @@ class WebEnvironment(Environment): """ url: str - def request_feedback(self, skill_set: SkillSet, predictions: InternalDataFrame): - requests.post(f'{self.url}/feedback', json={ - 'skills': [dict(skill) for skill in skill_set.skills.values()], + def request_feedback( + self, + skills: SkillSet, + predictions: InternalDataFrame, + num_feedbacks: Optional[int] = None, + wait_for_feedback: Optional[bool] = True + ): + predictions = predictions.sample(n=num_feedbacks) + skills_payload = [] + for skill in skills.skills.values(): + skill_payload = dict(skill) + skill_payload['outputs'] = skill.get_output_fields() + skills_payload.append(skill_payload) + + payload = { + 'skills': skills_payload, 'predictions': predictions.reset_index().to_dict(orient='records') - }, timeout=3) + } + + requests.post(f'{self.url}/feedback', json=payload, timeout=3) + + if wait_for_feedback: + total_timeout = 3600 + with Progress() as progress: + task = progress.add_task(f"Waiting for feedback...", total=total_timeout) + gt_records = [] + while len(gt_records) < num_feedbacks: + progress.advance(task, 10) + time.sleep(10) + gt_records = self.get_gt_records() def get_gt_records(self): gt_records = requests.get(f'{self.url}/ground-truth', timeout=3).json() @@ -30,14 +55,7 @@ def get_gt_records(self): gt_records = [r for r in gt_records if r.gt_data or r.gt_match] return gt_records - def get_ground_truth_dataset(self, wait: Optional[float] = None) -> InternalDataFrame: - gt_records = [] - if wait: - with Progress() as progress: - task = progress.add_task(f"Waiting for ground truth {wait} seconds...", total=wait) - while not progress.finished: - progress.advance(task, wait / 100) - time.sleep(wait / 100) + def get_ground_truth(self, predictions: InternalDataFrame) -> InternalDataFrame: gt_records = self.get_gt_records() @@ -51,4 +69,3 @@ def get_ground_truth_dataset(self, wait: Optional[float] = None) -> InternalData df = InternalDataFrame({skill: InternalSeries(g) for skill, g in gt.items()}) return df - diff --git a/adala/runtimes/__init__.py b/adala/runtimes/__init__.py index 807e817..0594c4f 100644 --- a/adala/runtimes/__init__.py +++ b/adala/runtimes/__init__.py @@ -1,2 +1,2 @@ from .base import Runtime, LLMRuntime, LLMRuntimeModelType -from .openai import OpenAIRuntime +from ._openai import OpenAIChatRuntime, OpenAIVisionRuntime diff --git a/adala/runtimes/_guidance.py b/adala/runtimes/_guidance.py new file mode 100644 index 0000000..8c15ef5 --- /dev/null +++ b/adala/runtimes/_guidance.py @@ -0,0 +1,113 @@ +import guidance +import enum +import re +from rich import print +from typing import Dict, Optional, Any +from .base import Runtime +from adala.utils.parse import parse_template, partial_str_format + + +class GuidanceModelType(enum.Enum): + """Enumeration for LLM runtime model types.""" + OpenAI = 'OpenAI' + Transformers = 'Transformers' + + +class GuidanceRuntime(Runtime): + llm_runtime_model_type: GuidanceModelType = GuidanceModelType.OpenAI + llm_params: Dict[str, str] = { + 'model': 'gpt-3.5-turbo-instruct', + # 'max_tokens': 10, + # 'temperature': 0, + } + + _llm = None + _program = None + # do not override this template + _llm_template: str = '''\ +{{>instructions_program}} + +{{>input_program}} +{{>output_program}}''' + + def init_runtime(self) -> Runtime: + # create an LLM instance + if self.llm_runtime_model_type.value == GuidanceModelType.OpenAI.value: + self._llm = guidance.llms.OpenAI(**self.llm_params) + elif self.llm_runtime_model_type.value == 
GuidanceModelType.Transformers.value: + self._llm = guidance.llms.Transformers(**self.llm_params) + else: + raise NotImplementedError(f'LLM runtime type {self.llm_runtime_model_type} is not implemented.') + self._program = guidance(self._llm_template, llm=self._llm, silent=not self.verbose) + return self + + def _double_brackets(self, text): + # This regex replaces occurrences of {word} with {{word}}, + # but ignores occurrences of {{word}}. + # Negative lookbehind (? Dict[str, str]: + + extra_fields = extra_fields or {} + field_types = field_schema or {} + + if not isinstance(record, dict): + record = record.to_dict() + else: + record = record.copy() + program_input = record + + output_fields = parse_template(partial_str_format(output_template, **record), include_texts=False) + for output_field in output_fields: + field_name = output_field['text'] + if field_name in field_schema and field_schema[field_name]['type'] == 'array': + # when runtime is called with a categorical field: + # runtime.record_to_record( + # ..., + # output_template='Predictions: {labels}', + # field_schema={'labels': {'type': 'array', 'items': {'type': 'string', 'enum': ['a', 'b', 'c']}}} + # ) + # replace {field_name} with {select 'field_name' options=field_name_options} + # and add "field_name_options" to program input + output_template = output_template.replace( + f'{{{field_name}}}', + f'{{{{select \'{field_name}\' options={field_name}}}}}' + ) + program_input[field_name] = field_types[field_name]['items']['enum'] + + # exclude guidance parameter from input + if 'text' in program_input: + program_input['text_'] = program_input['text'] + del program_input['text'] + # TODO: this check is brittle, will likely to fail in various cases + if '{text}' in input_template: + input_template = input_template.replace('{text}', '{text_}') + program_input['input_program'] = guidance(self._double_brackets(input_template), llm=self._llm, silent=not self.verbose) + program_input['output_program'] = guidance(self._double_brackets(output_template), llm=self._llm) + program_input['instructions_program'] = guidance(self._double_brackets(instructions_template), llm=self._llm) + program_input.update(extra_fields) + + if self.verbose: + print(program_input) + + result = self._program( + silent=not self.verbose, + **program_input + ) + + output = {} + for output_field in output_fields: + if output_field['text'] in extra_fields: + continue + output[output_field['text']] = result[output_field['text']] + return output diff --git a/adala/runtimes/_langchain.py b/adala/runtimes/_langchain.py new file mode 100644 index 0000000..e226a28 --- /dev/null +++ b/adala/runtimes/_langchain.py @@ -0,0 +1,68 @@ +from typing import Any, Dict, Optional +from .base import Runtime +from pydantic import Field +from langchain.prompts import ChatPromptTemplate +from langchain.chat_models import ChatOpenAI +from langchain.output_parsers import StructuredOutputParser, ResponseSchema +from adala.utils.parse import parse_template, partial_str_format +from adala.utils.internal_data import InternalDataFrame + + +class LangChainRuntime(Runtime): + """A runtime for the LangChain API.""" + lc_model_name: str = Field(alias='model') + + def _prepare_chain( + self, + record: Dict[str, str], + input_template: str, + instructions_template: str, + output_template: str, + extra_fields: Optional[Dict[str, Any]] = None, + field_schema: Optional[Dict] = None + ): + + field_schema = field_schema or {} + extra_fields = extra_fields or {} + output_fields = 
parse_template(partial_str_format(output_template, **record, **extra_fields), + include_texts=False) + response_schemas = [] + for output_field in output_fields: + name = output_field['text'] + if name in field_schema and 'description' in field_schema[name]: + description = field_schema[name]['description'] + else: + description = name + response_schemas.append(ResponseSchema(name=name, description=description)) + + output_parser = StructuredOutputParser.from_response_schemas(response_schemas) + format_instructions = output_parser.get_format_instructions() + + model = ChatOpenAI(model_name=self.lc_model_name, verbose=self.verbose) + + prompt = ChatPromptTemplate.from_template( + '{instructions_template}\n{format_instructions}\n{input_template}', + partial_variables={ + "format_instructions": format_instructions, + "instructions_template": instructions_template.format(**record, **extra_fields), + "input_template": input_template.format(**record, **extra_fields), + }) + + if self.verbose: + print(f'**Prompt content**:\n{prompt}') + + chain = prompt | model | output_parser + return chain + + def record_to_record( + self, + record: Dict[str, str], + input_template: str, + instructions_template: str, + output_template: str, + extra_fields: Optional[Dict[str, Any]] = None, + field_schema: Optional[Dict] = None, + ) -> Dict[str, str]: + + chain = self._prepare_chain(record, input_template, instructions_template, output_template, extra_fields, field_schema) + return chain.invoke(record) diff --git a/adala/runtimes/_openai.py b/adala/runtimes/_openai.py new file mode 100644 index 0000000..aa6b70b --- /dev/null +++ b/adala/runtimes/_openai.py @@ -0,0 +1,205 @@ +import os +from rich import print +from openai import OpenAI, NotFoundError +from pydantic import model_validator, field_validator, ValidationInfo, Field +from typing import Optional, Dict, Any, List +from .base import ( + LLMRuntime, LLMRuntimeType, LLMRuntimeModelType, Runtime +) +from adala.utils.logs import print_error +from adala.utils.internal_data import InternalDataFrame +from adala.utils.parse import parse_template, partial_str_format + + +class OpenAIRuntime(LLMRuntime): + """Runtime class specifically designed for OpenAI models. + + This class is tailored to use OpenAI models, particularly GPT models. + It inherits from the `LLMRuntime` class and thus can utilize its functionalities but specializes + for the OpenAI ecosystem. + + Attributes: + api_key (str): The API key required to access OpenAI's API. + gpt_model_name (str): Name of the GPT model. Defaults to 'gpt-3.5-turbo-instruct'. + temperature (float): Sampling temperature for the GPT model's output. + A higher value makes output more random, while a lower value makes it more deterministic. + Defaults to 0.0. + """ + + api_key: Optional[str] = None + gpt_model_name: Optional[str] = Field(default='gpt-3.5-turbo-instruct', alias='model') + temperature: Optional[float] = 0.0 + + def _check_api_key(self): + if self.api_key: + return + self.api_key = os.getenv('OPENAI_API_KEY') + if not self.api_key: + print_error( + 'OpenAI API key is not provided. 
Please set the OPENAI_API_KEY environment variable:\n\n' + 'export OPENAI_API_KEY=your-openai-api-key\n\n' + 'or set the `api_key` attribute of the `OpenAIRuntime` python class:\n\n' + f'{self.__class__.__name__}(..., api_key="your-openai-api-key")\n\n' + f'Read more about OpenAI API keys at https://platform.openai.com/docs/quickstart/step-2-setup-your-api-key') + raise ValueError('OpenAI API key is not provided.') + + def _check_model_availability(self): + models = openai.Model.list(api_key=self.api_key) + models = set(model['id'] for model in models['data']) + # models = openai.models.list() + # models = set(model.id for model in models.data) + if self.gpt_model_name not in models: + print_error( + f'Requested model "{self.gpt_model_name}" is not available in your OpenAI account. ' + f'Available models are: {models}\n\n' + f'Try to change the runtime settings for {self.__class__.__name__}, for example:\n\n' + f'{self.__class__.__name__}(..., model="gpt-3.5-turbo")\n\n' + ) + raise ValueError(f'Requested model {self.gpt_model_name} is not available in your OpenAI account.') + + def init_runtime(self): + self._check_api_key() + self._check_model_availability() + + student_models = {'gpt-3.5-turbo-instruct', 'text-davinci-003'} + teacher_models = {'gpt-4', 'gpt-3.5-turbo', 'gpt-3.5-turbo-16k', 'gpt-4-1106-preview', 'gpt-4-vision-preview'} + + if self.gpt_model_name in student_models: + self.llm_runtime_type = LLMRuntimeType.STUDENT + elif self.gpt_model_name in teacher_models: + self.llm_runtime_type = LLMRuntimeType.TEACHER + else: + raise NotImplementedError(f'Not supported model: {self.gpt_model_name}.') + + self.llm_runtime_model_type = LLMRuntimeModelType.OpenAI + self.llm_params = { + 'model': self.gpt_model_name, + 'temperature': self.temperature, + 'api_key': self.api_key + } + self._create_program() + return self + + +class OpenAIChatRuntime(Runtime): + openai_model: str = Field(alias='model') + openai_api_key: Optional[str] = Field(default=os.getenv('OPENAI_API_KEY'), alias='api_key') + max_tokens: Optional[int] = 1000 + + _client: OpenAI = None + + def init_runtime(self) -> 'Runtime': + if self._client is None: + self._client = OpenAI(api_key=self.openai_api_key) + + # check model availability + try: + self._client.models.retrieve(self.openai_model) + except NotFoundError: + raise ValueError(f'Requested model "{self.openai_model}" is not available in your OpenAI account.') + return self + + def record_to_record( + self, + record: Dict[str, str], + input_template: str, + instructions_template: str, + output_template: str, + extra_fields: Optional[Dict[str, str]] = None, + field_schema: Optional[Dict] = None, + ) -> Dict[str, str]: + + extra_fields = extra_fields or {} + + output_fields = parse_template(partial_str_format(output_template, **extra_fields), include_texts=False) + if len(output_fields) > 1: + raise NotImplementedError(f'{self.__class__.__name__} does not support multiple output fields. 
' + f'Found: {output_fields}') + output_field = output_fields[0] + output_field_name = output_field['text'] + system_prompt = instructions_template.format(**record, **extra_fields) + user_prompt = input_template.format(**record, **extra_fields) + # TODO: this truncates the suffix of the output template + # for example, output template "Output: {answer} is correct" results in output_prefix "Output: " + output_prefix = output_template[:output_field['start']] + user_prompt += f'\n\n{output_prefix}' + + messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt} + ] + if self.verbose: + print(f'OpenAI request: {messages}') + + completion = self._client.chat.completions.create( + model=self.openai_model, + messages=messages + ) + completion_text = completion.choices[0].message.content + return {output_field_name: completion_text} + + +class OpenAIVisionRuntime(OpenAIChatRuntime): + + def record_to_record( + self, + record: Dict[str, str], + input_template: str, + instructions_template: str, + output_template: str, + extra_fields: Optional[Dict[str, str]] = None, + field_schema: Optional[Dict] = None, + ) -> Dict[str, str]: + + extra_fields = extra_fields or {} + field_schema = field_schema or {} + + output_fields = parse_template(partial_str_format(output_template, **extra_fields), include_texts=False) + + if len(output_fields) > 1: + raise NotImplementedError(f'{self.__class__.__name__} does not support multiple output fields. ' + f'Found: {output_fields}') + output_field = output_fields[0] + output_field_name = output_field['text'] + + input_fields = parse_template(input_template) + + # split input template into text and image parts + input_text = '' + content = [{ + 'type': 'text', + 'text': instructions_template.format(**dict(**record, **extra_fields)) + }] + for field in input_fields: + if field['type'] == 'text': + input_text += field['text'] + elif field['type'] == 'var': + if field['text'] not in field_schema: + input_text += record[field['text']] + elif field_schema[field['text']]['type'] == 'string': + if field_schema[field['text']].get('format') == 'uri': + if input_text: + content.append({'type': 'text', 'text': input_text}) + input_text = '' + content.append({'type': 'image_url', 'image_url': record[field['text']]}) + else: + input_text += record[field['text']] + else: + raise ValueError(f'Unsupported field type: {field_schema[field["text"]]["type"]}') + if input_text: + content.append({'type': 'text', 'text': input_text}) + + if self.verbose: + print(f'**Prompt content**:\n{content}') + + completion = self._client.chat.completions.create( + model=self.openai_model, + messages=[{ + "role": "user", + "content": content + }], + max_tokens=self.max_tokens + ) + + completion_text = completion.choices[0].message.content + return {output_field_name: completion_text} diff --git a/adala/runtimes/base.py b/adala/runtimes/base.py index a3e95ff..cfe73d3 100644 --- a/adala/runtimes/base.py +++ b/adala/runtimes/base.py @@ -5,6 +5,7 @@ from tqdm import tqdm from abc import ABC, abstractmethod from pydantic import BaseModel, model_validator +from pydantic.dataclasses import dataclass from typing import List, Dict, Optional, Tuple, Any, Callable from adala.datasets.base import InternalDataFrame from adala.utils.logs import print_text @@ -32,6 +33,105 @@ def init_runtime(self) -> 'Runtime': """ return self + @abstractmethod + def record_to_record( + self, + record: Dict[str, str], + input_template: str, + instructions_template: str, + output_template: 
str, + extra_fields: Optional[Dict[str, Any]] = None, + field_schema: Optional[Dict] = None, + ) -> Dict[str, str]: + """ + Processes a record. + + Args: + record (Dict[str, str]): The record to process. + input_template (str): The input template. + instructions_template (str): The instructions template. + output_template (str): The output template. + extra_fields (Optional[Dict[str, str]]): Extra fields to use in the templates. Defaults to None. + field_schema (Optional[Dict]): Field JSON schema to use in the templates. Defaults to all fields are strings, + i.e. analogous to {"field_n": {"type": "string"}}. + + Returns: + Dict[str, str]: The processed record. + """ + + def batch_to_batch( + self, + batch: InternalDataFrame, + input_template: str, + instructions_template: str, + output_template: str, + extra_fields: Optional[Dict[str, str]] = None, + field_schema: Optional[Dict] = None, + ) -> InternalDataFrame: + """ + Processes a record. + + Args: + batch (InternalDataFrame): The batch to process. + input_template (str): The input template. + instructions_template (str): The instructions template. + output_template (str): The output template. + extra_fields (Optional[Dict[str, str]]): Extra fields to use in the templates. Defaults to None. + field_schema (Optional[Dict]): Field JSON schema to use in the templates. Defaults to all fields are strings, + i.e. analogous to {"field_n": {"type": "string"}}. + + Returns: + InternalDataFrame: The processed batch. + """ + output = batch.progress_apply( + self.record_to_record, + axis=1, + result_type='expand', + input_template=input_template, + instructions_template=instructions_template, + output_template=output_template, + extra_fields=extra_fields, + field_schema=field_schema + ) + return output + + def record_to_batch( + self, + record: Dict[str, str], + input_template: str, + instructions_template: str, + output_template: str, + output_batch_size: int = 1, + extra_fields: Optional[Dict[str, str]] = None, + field_schema: Optional[Dict] = None, + ) -> InternalDataFrame: + + """ + Processes a record and return a batch. + + Args: + record (Dict[str, str]): The record to process. + input_template (str): The input template. + instructions_template (str): The instructions template. + output_template (str): The output template. + output_batch_size (int): The batch size for the output. Defaults to 1. + extra_fields (Optional[Dict[str, str]]): Extra fields to use in the templates. Defaults to None. + field_schema (Optional[Dict]): Field JSON schema to use in the templates. Defaults to all fields are strings, + i.e. analogous to {"field_n": {"type": "string"}}. + + Returns: + InternalDataFrame: The processed batch. 
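+
+        Example (illustrative sketch; the `topic`/`review` field names are assumptions, not part of the API):
+
+            batch = runtime.record_to_batch(
+                {'topic': 'wireless headphones'},
+                input_template='Write a review about {topic}.',
+                instructions_template='Generate short, realistic product reviews.',
+                output_template='Review: {review}',
+                output_batch_size=3,
+            )
+            # `batch` is an InternalDataFrame with 3 rows and a "review" column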
+ """ + batch = InternalDataFrame([record] * output_batch_size) + return self.batch_to_batch( + batch=batch, + input_template=input_template, + instructions_template=instructions_template, + output_template=output_template, + extra_fields=extra_fields, + field_schema=field_schema + ) + class LLMRuntimeType(enum.Enum): STUDENT = 'student' diff --git a/adala/runtimes/openai.py b/adala/runtimes/openai.py deleted file mode 100644 index 7e0ea60..0000000 --- a/adala/runtimes/openai.py +++ /dev/null @@ -1,74 +0,0 @@ -import os -import openai -from pydantic import model_validator, field_validator, ValidationInfo, Field -from typing import Optional, Dict -from .base import LLMRuntime, LLMRuntimeType, LLMRuntimeModelType -from adala.utils.logs import print_error - - -class OpenAIRuntime(LLMRuntime): - """Runtime class specifically designed for OpenAI models. - - This class is tailored to use OpenAI models, particularly GPT models. - It inherits from the `LLMRuntime` class and thus can utilize its functionalities but specializes - for the OpenAI ecosystem. - - Attributes: - api_key (str): The API key required to access OpenAI's API. - gpt_model_name (str): Name of the GPT model. Defaults to 'gpt-3.5-turbo-instruct'. - temperature (float): Sampling temperature for the GPT model's output. - A higher value makes output more random, while a lower value makes it more deterministic. - Defaults to 0.0. - """ - - api_key: Optional[str] = None - gpt_model_name: Optional[str] = Field(default='gpt-3.5-turbo-instruct', alias='model') - temperature: Optional[float] = 0.0 - - def _check_api_key(self): - if self.api_key: - return - self.api_key = os.getenv('OPENAI_API_KEY') - if not self.api_key: - print_error( - 'OpenAI API key is not provided. Please set the OPENAI_API_KEY environment variable:\n\n' - 'export OPENAI_API_KEY=your-openai-api-key\n\n' - 'or set the `api_key` attribute of the `OpenAIRuntime` python class:\n\n' - f'{self.__class__.__name__}(..., api_key="your-openai-api-key")\n\n' - f'Read more about OpenAI API keys at https://platform.openai.com/docs/quickstart/step-2-setup-your-api-key') - raise ValueError('OpenAI API key is not provided.') - - def _check_model_availability(self): - models = openai.Model.list(api_key=self.api_key) - models = set(model['id'] for model in models['data']) - if self.gpt_model_name not in models: - print_error( - f'Requested model "{self.gpt_model_name}" is not available in your OpenAI account. 
' - f'Available models are: {models}\n\n' - f'Try to change the runtime settings for {self.__class__.__name__}, for example:\n\n' - f'{self.__class__.__name__}(..., model="gpt-3.5-turbo")\n\n' - ) - raise ValueError(f'Requested model {self.gpt_model_name} is not available in your OpenAI account.') - - def init_runtime(self): - self._check_api_key() - self._check_model_availability() - - student_models = {'gpt-3.5-turbo-instruct', 'text-davinci-003'} - teacher_models = {'gpt-4', 'gpt-3.5-turbo', 'gpt-3.5-turbo-16k', 'gpt-4-1106-preview', 'gpt-4-vision-preview'} - - if self.gpt_model_name in student_models: - self.llm_runtime_type = LLMRuntimeType.STUDENT - elif self.gpt_model_name in teacher_models: - self.llm_runtime_type = LLMRuntimeType.TEACHER - else: - raise NotImplementedError(f'Not supported model: {self.gpt_model_name}.') - - self.llm_runtime_model_type = LLMRuntimeModelType.OpenAI - self.llm_params = { - 'model': self.gpt_model_name, - 'temperature': self.temperature, - 'api_key': self.api_key - } - self._create_program() - return self diff --git a/adala/skills/__init__.py b/adala/skills/__init__.py index 9cb3417..fdcd692 100644 --- a/adala/skills/__init__.py +++ b/adala/skills/__init__.py @@ -3,4 +3,5 @@ from .generation.base import TextGenerationSkill from .generation.qa import QuestionAnsweringSkill from .generation.summarization import SummarizationSkill +from .collection.classification import ClassificationSkill diff --git a/adala/skills/_base.py b/adala/skills/_base.py new file mode 100644 index 0000000..ff8f7e3 --- /dev/null +++ b/adala/skills/_base.py @@ -0,0 +1,168 @@ +from pydantic import BaseModel, Field, field_validator +from typing import List, Optional, Any, Dict, Tuple, Union +from abc import ABC, abstractmethod +from adala.utils.internal_data import InternalDataFrame +from adala.utils.parse import parse_template, partial_str_format +from adala.runtimes.base import Runtime + + +Record = Dict[str, str] + + +class Skill(BaseModel, ABC): + name: str = Field( + title='Skill name', + description='Unique name of the skill', + examples=['labeling', 'classification', 'text-generation'] + ) + instructions: str = Field( + title='Skill instructions', + description='Instructs agent what to do with the input data. ' + 'Can use templating to refer to input fields.', + examples=['Label the input text with the following labels: {labels}'] + ) + input_template: str = Field( + title='Input template', + description='Template for the input data. ' + 'Can use templating to refer to input parameters and perform data transformations.', + examples=['Input: {input}', 'Input: {input}\nLabels: {labels}\nOutput: '] + ) + output_template: str = Field( + title='Output template', + description='Template for the output data. ' + 'Can use templating to refer to input parameters and perform data transformations', + examples=["Output: {output}", "{predictions}"] + ) + description: Optional[str] = Field( + default='', + title='Skill description', + description='Description of the skill. 
Can be used to retrieve skill from the library.', + examples=['The skill to perform sentiment analysis on the input text.'] + ) + field_schema: Optional[Dict[str, Any]] = Field( + default=None, + title='Field schema', + description='JSON schema for the fields of the input and output data.', + examples=[{ + "input": {"type": "string"}, + "output": {"type": "string"}, + "labels": { + "type": "array", + "items": { + "type": "string", + "enum": ["positive", "negative", "neutral"] + } + } + }]) + + def _get_extra_fields(self): + """ + Retrieves fields that are not categorized as system fields. + + Returns: + dict: A dictionary containing fields that are not system fields. + """ + + # TODO: more robust way to exclude system fields + system_fields = { + 'name', 'description', 'input_template', 'output_template', 'instructions', + 'field_schema'} + extra_fields = self.model_dump(exclude=system_fields) + return extra_fields + + def get_output_fields(self): + """ + Retrieves output fields. + + Returns: + List[str]: A list of output fields. + """ + extra_fields = self._get_extra_fields() + # TODO: input fields are not considered - shall we disallow input fields in output template? + output_fields = parse_template(partial_str_format(self.output_template, **extra_fields), include_texts=False) + return [f['text'] for f in output_fields] + + +class TransformSkill(Skill): + + def apply( + self, + input: Union[InternalDataFrame, Record], + runtime: Runtime, + ) -> InternalDataFrame: + """ + Applies the skill to a dataframe and returns another dataframe. + + Args: + input (InternalDataFrame): The input data to be processed. + runtime (Runtime): The runtime instance to be used for processing. + + Returns: + InternalDataFrame: The processed data. + """ + + if isinstance(input, dict): + input = InternalDataFrame([input]) + return runtime.batch_to_batch( + input, + input_template=self.input_template, + output_template=self.output_template, + instructions_template=self.instructions, + field_schema=self.field_schema, + extra_fields=self._get_extra_fields(), + ) + + +class SynthesisSkill(Skill): + + def apply( + self, + input: Record, + runtime: Runtime, + ) -> InternalDataFrame: + """ + Applies the skill to a record and returns a dataframe. + + Args: + input (Dict[str, str]): The input data to be processed. + runtime (Runtime): The runtime instance to be used for processing. + """ + return runtime.record_to_batch( + input, + input_template=self.input_template, + output_template=self.output_template, + instructions_template=self.instructions, + field_schema=self.field_schema, + extra_fields=self._get_extra_fields(), + ) + + +class AnalysisSkill(Skill): + + def apply( + self, + input: Union[InternalDataFrame, Record], + runtime: Runtime, + ) -> Record: + """ + Applies the skill to a dataframe and returns a record. + + Args: + input (InternalDataFrame): The input data to be processed. + runtime (Runtime): The runtime instance to be used for processing. 
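+
+        Example (illustrative sketch; the concrete skill instance and field names are assumptions):
+
+            summary = my_analysis_skill.apply(
+                {'comment': 'Battery died after a week.'},   # a single record (dict) is also accepted
+                runtime=runtime,
+            )
+            # `summary` is a single dict keyed by the skill's output fields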
+ """ + if isinstance(input, dict): + input = InternalDataFrame([input]) + elif isinstance(input, InternalDataFrame): + if len(input) > 1: + raise ValueError('Input dataframe must contain only one record.') + + output = runtime.batch_to_batch( + input, + input_template=self.input_template, + output_template=self.output_template, + instructions_template=self.instructions, + field_schema=self.field_schema, + extra_fields=self._get_extra_fields(), + ) + return output.to_dict(orient='records')[0] diff --git a/adala/skills/collection/__init__.py b/adala/skills/collection/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/adala/skills/collection/analyze_errors.py b/adala/skills/collection/analyze_errors.py new file mode 100644 index 0000000..4743ef6 --- /dev/null +++ b/adala/skills/collection/analyze_errors.py @@ -0,0 +1,65 @@ +from adala.skills._base import AnalysisSkill +from adala.utils.internal_data import InternalDataFrame, InternalDataFrameConcat +from adala.utils.parse import parse_template, partial_str_format +from adala.runtimes.base import Runtime +from typing import Dict + + +class AnalyzeLLMPromptErrorsExactMatch(AnalysisSkill): + """ + Analyzes errors in a text. + """ + name: str = 'analyze_llm_prompt_errors_exact_match' + initial_llm_instructions: str + instructions: str = "LLM prompt was created by concatenating instructions with text input:\n\n" \ + "Prediction = LLM(Input, Instructions)\n\n" \ + "We expect the prediction to be equal to the ground truth.\n" \ + "Your task is to provide a reason for the error due to the original instruction.\n" \ + "Be concise and specific. The reason for the error should fit within a single line.\n\n" \ + "Instructions:\n{initial_llm_instructions}\n\n" + prediction_column: str + ground_truth_column: str + + def apply( + self, + input: InternalDataFrame, + runtime: Runtime, + ) -> Dict[str, str]: + """ + Applies the skill to a record and returns a dataframe. + + Args: + input (InternalDataFrame): The input data to be processed. + runtime (Runtime): The runtime instance to be used for processing. 
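+
+        Example (illustrative sketch mirroring how the agent wires this skill; concrete values are assumptions,
+        and `errors_df` is assumed to hold the inputs plus 'sentiment' and '_ground_truth' columns):
+
+            analyze = AnalyzeLLMPromptErrorsExactMatch(
+                name='analyze',
+                input_template='Text: {text}',
+                output_template='{error_report}',
+                initial_llm_instructions='Label the text sentiment.',
+                prediction_column='sentiment',
+                ground_truth_column='_ground_truth',
+            )
+            report = analyze.apply(errors_df, runtime=teacher_runtime)
+            # report == {'error_report': <aggregated error descriptions>}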
+ """ + + output_fields = parse_template(self.output_template, include_texts=False) + if len(output_fields) > 1: + raise ValueError(f'Output template should contain only one field, got {output_fields}') + output_field_name = output_fields[0]['text'] + + MAX_ERRORS = 3 + errors = input.sample(n=min(MAX_ERRORS, input.shape[0])) + + input_template = f'{self.input_template}\n' \ + f'Prediction: {{{self.prediction_column}}}\n' \ + f'Ground truth: {{{self.ground_truth_column}}}\n' + output_template = 'Error reason: {reason}\n' + + errors_with_reason = runtime.batch_to_batch( + batch=errors, + input_template=input_template, + output_template=output_template, + instructions_template=self.instructions, + extra_fields={'initial_llm_instructions': self.initial_llm_instructions}, + field_schema=self.field_schema, + ) + + errors_with_reason = InternalDataFrameConcat([errors, errors_with_reason], axis=1) + + agg_template = f'{input_template}{output_template}' + aggregated_errors = errors_with_reason.apply( + lambda row: agg_template.format(**row), axis=1 + ).str.cat(sep='\n') + + return {output_field_name: aggregated_errors} diff --git a/adala/skills/collection/classification.py b/adala/skills/collection/classification.py new file mode 100644 index 0000000..d7a00fc --- /dev/null +++ b/adala/skills/collection/classification.py @@ -0,0 +1,9 @@ +from adala.skills._base import TransformSkill +from typing import List + + +class ClassificationSkill(TransformSkill): + """ + Classifies into one of the given labels. + """ + labels: List[str] diff --git a/adala/skills/collection/improve_llm.py b/adala/skills/collection/improve_llm.py new file mode 100644 index 0000000..a73a741 --- /dev/null +++ b/adala/skills/collection/improve_llm.py @@ -0,0 +1,24 @@ +from adala.skills._base import AnalysisSkill + + +class ImproveLLMInstructions(AnalysisSkill): + """ + Improves LLM instructions given the error analysis report of the previous instruction + """ + old_instructions: str + name: str = 'improve_llm_instructions' + instructions: str = "LLM prompt was created by concatenating instructions with text input:\n\n" \ + "Prediction = LLM(Input, Instructions)\n\n" \ + "We expect the prediction to be equal to the ground truth.\n" \ + "Your task is to analyze errors made by old instructions " \ + "and craft new instructions for the LLM.\n" \ + "Follow best practices for LLM prompt engineering.\n" \ + "Include 2-3 examples at the end of your response " \ + "to demonstrate how the new instruction would be applied.\n" \ + "Use the following format for your examples:\n" \ + "Input: ...\n" \ + "Output: ...\n\n" + + input_template: str = "Old instructions:\n{old_instructions}\n\n" \ + "Errors:\n{error_report}\n\n" + output_template: str = "New instructions:\n{new_instructions}\n\n" diff --git a/adala/skills/skillset.py b/adala/skills/skillset.py index ec759ac..30fd880 100644 --- a/adala/skills/skillset.py +++ b/adala/skills/skillset.py @@ -5,8 +5,9 @@ from adala.datasets.base import Dataset from adala.runtimes.base import Runtime from adala.utils.logs import print_text -from adala.utils.internal_data import InternalDataFrame, InternalSeries, InternalDataFrameConcat +from adala.utils.internal_data import InternalDataFrame, InternalSeries, InternalDataFrameConcat, Record from .base import BaseSkill, LLMSkill +from ._base import Skill class SkillSet(BaseModel, ABC): @@ -19,15 +20,15 @@ class SkillSet(BaseModel, ABC): cases, task decomposition can involve a graph-based approach. 
Attributes: - skills (Dict[str, BaseSkill]): A dictionary of skills in the skill set. + skills (Dict[str, Skill]): A dictionary of skills in the skill set. """ - skills: Dict[str, BaseSkill] + skills: Dict[str, Skill] @abstractmethod def apply( self, - dataset: Union[Dataset, InternalDataFrame], + input: Union[Record, InternalDataFrame], runtime: Runtime, improved_skill: Optional[str] = None ) -> InternalDataFrame: @@ -35,26 +36,14 @@ def apply( Apply the skill set to a dataset using a specified runtime. Args: - dataset (Union[Dataset, InternalDataFrame]): The dataset to apply the skill set to. + input (Union[Record, InternalDataFrame]): Input data to apply the skill set to. runtime (Runtime): The runtime environment in which to apply the skills. improved_skill (Optional[str], optional): Name of the skill to start from (to optimize calculations). Defaults to None. Returns: InternalDataFrame: Skill predictions. """ - @abstractmethod - def select_skill_to_improve(self, accuracy: Mapping, accuracy_threshold: Optional[float] = 1.0) -> Optional[BaseSkill]: - """ - Select skill to improve based on accuracy. - - Args: - accuracy (Mapping): Skills accuracies. - accuracy_threshold (Optional[float], optional): Accuracy threshold. Defaults to 1.0. - Returns: - Optional[BaseSkill]: Skill to improve. None if no skill to improve. - """ - - def __getitem__(self, skill_name) -> BaseSkill: + def __getitem__(self, skill_name) -> Skill: """ Select skill by name. @@ -66,7 +55,7 @@ def __getitem__(self, skill_name) -> BaseSkill: """ return self.skills[skill_name] - def __setitem__(self, skill_name, skill: BaseSkill): + def __setitem__(self, skill_name, skill: Skill): """ Set skill by name. @@ -85,6 +74,15 @@ def get_skill_names(self) -> List[str]: """ return list(self.skills.keys()) + def get_skill_outputs(self) -> Dict[str, str]: + """ + Get dictionary of skill outputs. + + Returns: + Dict[str, str]: Dictionary of skill outputs. Keys are output names and values are skill names + """ + return {field: skill.name for skill in self.skills.values() for field in skill.get_output_fields()} + class LinearSkillSet(SkillSet): """ @@ -115,10 +113,9 @@ class LinearSkillSet(SkillSet): """ skill_sequence: List[str] = None - input_data_field: Optional[str] = None @field_validator('skills', mode='before') - def skills_validator(cls, v: Union[List[str], List[BaseSkill], Dict[str, BaseSkill]]) -> Dict[str, BaseSkill]: + def skills_validator(cls, v: Union[List[Skill], Dict[str, Skill]]) -> Dict[str, Skill]: """ Validates and converts the skills attribute to a dictionary of skill names to BaseSkill instances. 
@@ -132,29 +129,7 @@ def skills_validator(cls, v: Union[List[str], List[BaseSkill], Dict[str, BaseSki if not v: return skills - input_data_field = None - if isinstance(v, list) and isinstance(v[0], str): - # if list of strings presented, they are interpreted as skill instructions - for i, instructions in enumerate(v): - skill_name = f"skill_{i}" - skills[skill_name] = LLMSkill( - name=skill_name, - instructions=instructions, - input_data_field=input_data_field - ) - # Linear skillset creates skills pipeline - update input_data_field for next skill - input_data_field = skill_name - elif isinstance(v, dict) and isinstance(v[list(v.keys())[0]], str): - # if dictionary of strings presented, they are interpreted as skill instructions - for skill_name, instructions in v.items(): - skills[skill_name] = LLMSkill( - name=skill_name, - instructions=instructions, - input_data_field=input_data_field - ) - # Linear skillset creates skills pipeline - update input_data_field for next skill - input_data_field = skill_name - elif isinstance(v, list) and isinstance(v[0], BaseSkill): + elif isinstance(v, list) and isinstance(v[0], Skill): # convert list of skill names to dictionary for skill in v: skills[skill.name] = skill @@ -183,7 +158,7 @@ def skill_sequence_validator(self) -> 'LinearSkillSet': def apply( self, - dataset: Union[Dataset, InternalDataFrame], + input: Union[Record, InternalDataFrame], runtime: Runtime, improved_skill: Optional[str] = None, ) -> InternalDataFrame: @@ -191,33 +166,50 @@ def apply( Sequentially applies each skill on the dataset, enhancing the agent's experience. Args: - dataset (Dataset): The dataset to apply the skills on. + input (InternalDataFrame): Input dataset. runtime (Runtime): The runtime environment in which to apply the skills. improved_skill (Optional[str], optional): Name of the skill to improve. Defaults to None. Returns: InternalDataFrame: Skill predictions. 
""" - - predictions = None if improved_skill: # start from the specified skill, assuming previous skills have already been applied skill_sequence = self.skill_sequence[self.skill_sequence.index(improved_skill):] else: skill_sequence = self.skill_sequence + skill_input = input for i, skill_name in enumerate(skill_sequence): skill = self.skills[skill_name] # use input dataset for the first node in the pipeline - input_dataset = dataset if i == 0 else predictions print_text(f"Applying skill: {skill_name}") - predictions = skill.apply(input_dataset, runtime) - - return predictions + skill_output = skill.apply(skill_input, runtime) + if isinstance(skill_output, InternalDataFrame) and isinstance(skill_input, InternalDataFrame): + # Columns to drop from skill_input because they are also in skill_output + cols_to_drop = set(skill_output.columns) & set(skill_input.columns) + skill_input_reduced = skill_input.drop(columns=cols_to_drop) + + skill_input = skill_output.merge( + skill_input_reduced, + left_index=True, + right_index=True, + how='inner' + ) + elif isinstance(skill_output, InternalDataFrame) and isinstance(skill_input, dict): + skill_input = skill_output + elif isinstance(skill_output, dict) and isinstance(skill_input, InternalDataFrame): + skill_input = skill_output + elif isinstance(skill_output, dict) and isinstance(skill_input, dict): + skill_input = dict(skill_output, **skill_input) + else: + raise ValueError(f"Unsupported input type: {type(skill_input)} and output type: {type(skill_output)}") + + return skill_input def select_skill_to_improve( self, accuracy: Mapping, accuracy_threshold: Optional[float] = 1.0 - ) -> Optional[BaseSkill]: + ) -> Optional[Skill]: """ Selects the skill with the lowest accuracy to improve. @@ -227,8 +219,9 @@ def select_skill_to_improve( Returns: Optional[BaseSkill]: Skill to improve. None if no skill to improve. """ - for skill_name in self.skill_sequence: - if accuracy[skill_name] < accuracy_threshold: + + for skill_output, skill_name in self.get_skill_outputs(): + if accuracy[skill_output] < accuracy_threshold: return self.skills[skill_name] def __rich__(self): diff --git a/adala/utils/internal_data.py b/adala/utils/internal_data.py index bec57e4..35b9930 100644 --- a/adala/utils/internal_data.py +++ b/adala/utils/internal_data.py @@ -1,8 +1,7 @@ import pandas as pd from typing import List, Dict, Any, Union, Iterable -RawRecord = Dict[str, Any] -RawRecords = List[RawRecord] +Record = Dict[str, str] # Internal data tables representation. Replace this with Dask or Polars in the future. InternalDataFrame = pd.DataFrame diff --git a/adala/utils/parse.py b/adala/utils/parse.py new file mode 100644 index 0000000..d01b70d --- /dev/null +++ b/adala/utils/parse.py @@ -0,0 +1,73 @@ +import re +import string +from string import Formatter +from typing import List, TypedDict + + +class PartialStringFormatter(string.Formatter): + def get_value(self, key, args, kwds): + if isinstance(key, str): + try: + return kwds[key] + except KeyError: + return '{' + key + '}' + else: + Formatter.get_value(key, args, kwds) + + +PartialStringFormat = PartialStringFormatter() + + +def partial_str_format(string, **kwargs): + return PartialStringFormat.format(string, **kwargs) + + +class TemplateChunks(TypedDict): + text: str + start: int + end: int + type: str + + +def parse_template(string, include_texts=True) -> List[TemplateChunks]: + """ + Parses a template string to extract output fields and the text between them. + + Args: + string (str): The template string to parse. 
+        include_texts (bool): Whether to include the text between the fields in the output.
+
+    Returns:
+        List[TemplateChunks]: A list of dictionaries with the keys 'text', 'start', 'end', and 'type'.
+
+    Example:
+        >>> parse_template("some text {field1} some more text {field2}")
+        [{"text": "some text ", "start": 0, "end": 10, "type": "text"},
+         {"text": "field1", "start": 10, "end": 18, "type": "var"},
+         {"text": " some more text ", "start": 18, "end": 34, "type": "text"},
+         {"text": "field2", "start": 34, "end": 42, "type": "var"}]
+    """
+
+    chunks: List[TemplateChunks] = []
+    last_index = 0
+
+    for match in re.finditer(r'\{(.*?)\}', string):
+        # Text before field
+        if last_index < match.start() and include_texts:
+            text = string[last_index:match.start()]
+            chunks.append({"text": text, "start": last_index, "end": match.start(), "type": "text"})
+
+        # Field itself
+        field = match.group(1)
+        start = match.start()
+        end = match.end()
+        chunks.append({"text": field, "start": start, "end": end, "type": "var"})
+
+        last_index = match.end()
+
+    # Text after the last field
+    if last_index < len(string) and include_texts:
+        text = string[last_index:]
+        chunks.append({"text": text, "start": last_index, "end": len(string), "type": "text"})
+
+    return chunks
diff --git a/tests/test_classification.py b/tests/test_classification.py
index e685dd9..c165c2f 100644
--- a/tests/test_classification.py
+++ b/tests/test_classification.py
@@ -1,10 +1,10 @@
 import pandas as pd
 from unittest.mock import MagicMock, patch
 
-from adala.runtimes.openai import OpenAIRuntime
+from adala.runtimes import OpenAIChatRuntime
 from adala.agents import Agent
 from adala.datasets import DataFrameDataset
-from adala.environments import BasicEnvironment
+from adala.environments import StaticEnvironment
 from adala.skills import ClassificationSkill
 from adala.utils.logs import print_dataframe
 
@@ -34,9 +34,9 @@ def process_record_generator(*args, **kwargs):
             yield {'sentiment': 'Neutral'}
 
 
-@patch.object(OpenAIRuntime, '_check_api_key', return_value=None)
-@patch.object(OpenAIRuntime, '_check_model_availability', return_value=None)
-@patch.object(OpenAIRuntime, '_process_record', side_effect=process_record_generator())
+@patch.object(OpenAIChatRuntime, '_check_api_key', return_value=None)
+@patch.object(OpenAIChatRuntime, '_check_model_availability', return_value=None)
+@patch.object(OpenAIChatRuntime, '_process_record', side_effect=process_record_generator())
 def test_classification_skill(
     mock_check_api_key,
     mock_check_model_availability,
diff --git a/tests/test_openai_runtime.py b/tests/test_openai_runtime.py
index 39c2d13..6f8de73 100644
--- a/tests/test_openai_runtime.py
+++ b/tests/test_openai_runtime.py
@@ -1,4 +1,4 @@
-from utils import patching, PatchedCalls
+from utils import patching, PatchedCalls, mdict
 
 
 @patching(
diff --git a/tests/utils.py b/tests/utils.py
index df2e55d..b8d3c50 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -4,6 +4,7 @@
 
 class PatchedCalls(enum.Enum):
     GUIDANCE = 'guidance._program.Program.__call__'
+    # OPENAI_MODEL_LIST = 'openai.models.list'
     OPENAI_MODEL_LIST = 'openai.api_resources.model.Model.list'
 
 
@@ -71,3 +72,9 @@ def side_effect(*args, **kwargs):
     return wrapper
 
     return decorator
+
+
+class mdict(dict):
+
+    def __getattr__(self, item):
+        return self[item]
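
For reference, a minimal usage sketch of the template helpers added in adala/utils/parse.py above. The function names and import path come from this patch; the sample template and values are illustrative only, with expected results following the implementation shown here:

    from adala.utils.parse import parse_template, partial_str_format

    template = "Text: {text}\nSentiment: {sentiment}"

    # Fill only the fields we know; unknown fields survive as literal placeholders,
    # which is what lets skills chain partially-filled templates.
    partial_str_format(template, text="Not loud enough.")
    # -> 'Text: Not loud enough.\nSentiment: {sentiment}'

    # Keep only the variable chunks (include_texts=False), mirroring how
    # Skill.get_output_fields() derives output columns from output_template.
    [chunk["text"] for chunk in parse_template(template, include_texts=False)]
    # -> ['text', 'sentiment']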