diff --git a/lamini/__init__.py b/lamini/__init__.py index b0b701f..b99df08 100644 --- a/lamini/__init__.py +++ b/lamini/__init__.py @@ -29,3 +29,5 @@ batch_size = int(os.environ.get("LAMINI_BATCH_SIZE", 5)) static_batching = bool(os.environ.get("LAMINI_STATIC_BATCHING", False)) bypass_reservation = bool(os.environ.get("LAMINI_BYPASS_RESERVATION", False)) + +__version__ = "3.1.0" diff --git a/lamini/api/classifier.py b/lamini/api/classifier.py index ddf522e..7cd1617 100644 --- a/lamini/api/classifier.py +++ b/lamini/api/classifier.py @@ -1,5 +1,5 @@ import time -from typing import List, Union +from typing import List, Union, Optional import lamini import requests @@ -8,7 +8,31 @@ class Classifier: - def __init__(self, model_id: int = None, api_key: str = None, api_url: str = None): + """Handler for classification functions of an already trained LLM for classification tasks + on the Lamini Platform + + Parameters + ---------- + model_id: int = None + Tuned Model designation on the Lamini platform + + api_key: Optional[str] + Lamini platform API key, if not provided the key stored + within ~.lamini/configure.yaml will be used. If either + don't exist then an error is raised. + + api_url: Optional[str] + Lamini platform api url, only needed if a different url is needed outside of the default. + default = "https://app.lamini.ai" + + """ + + def __init__( + self, + model_id: int = None, + api_key: Optional[str] = None, + api_url: Optional[str] = None, + ): self.model_id = model_id self.config = get_config() self.api_key = api_key or lamini.api_key or get_configured_key(self.config) @@ -21,7 +45,36 @@ def classify( top_n: int = None, threshold: float = None, metadata: bool = None, - ): + ) -> str: + """Send a classification request for self.model_id with the provided prompt. + + Parameters + ---------- + prompt: Union[str, List[str]] + Text prompt for the LLM classifier + + top_n: int = None + Top N responses from the LLM Classifier, n indicates the limit + + threshold: float = None + Classifier threshold to indicate a prediction is 'confident' enough + for a predicted class + + metadata: bool = None + Boolean flag to request for metadata return from the request + + Raises + ------ + Exception + Raised if self.model_id was not set on instantiation. If no model_id + was provided then no model can be requested for a prediction. + + Returns + ------- + resp["classification"]: str + Returned predicted class as a string + """ + if self.model_id is None: raise Exception( "model_id must be set in order to classify. Upload a model or set an existing model_id" @@ -41,7 +94,26 @@ def classify( ) return resp["classification"] - def predict(self, prompt: Union[str, List[str]]): + def predict(self, prompt: Union[str, List[str]]) -> str: + """Send a prediction request for self.model_id with the provided prompt. + + Parameters + ---------- + prompt: Union[str, List[str]] + Text prompt for the LLM classifier + + Raises + ------ + Exception + Raised if self.model_id was not set on instantiation. If no model_id + was provided then no model can be requested for a prediction. + + Returns + ------- + resp["prediction"]: str + Returned predicted class as a string + """ + if self.model_id is None: raise Exception( "model_id must be set in order to classify. 
Upload a model or set an existing model_id" @@ -55,7 +127,20 @@ def predict(self, prompt: Union[str, List[str]]): ) return resp["prediction"] - def upload(self, file_path: str): + def upload(self, file_path: str) -> None: + """Upload file to Lamini platform + + + Parameters + ---------- + file_path: str + Path to file to upload + + Returns + ------- + None + """ + files = {"file": open(file_path, "rb")} headers = { "Authorization": "Bearer " + self.api_key, diff --git a/lamini/api/embedding.py b/lamini/api/embedding.py index 5c17cd2..17cf89e 100644 --- a/lamini/api/embedding.py +++ b/lamini/api/embedding.py @@ -7,6 +7,27 @@ class Embedding: + """Handler for embedding requests to the Lamini Platform + + + Parameters + ---------- + model_name: str = None + LLM hugging face ID, e.g. "meta-llama/Meta-Llama-3.1-8B-Instruct" + + api_key: Optional[str] + Lamini platform API key, if not provided the key stored + within ~.lamini/configure.yaml will be used. If either + don't exist then an error is raised. + + api_url: Optional[str] + Lamini platform api url, only needed if a different url is needed outside of the + defined ones here: https://github.com/lamini-ai/lamini-platform/blob/main/sdk/lamini/api/lamini_config.py#L68 + i.e. localhost, staging.lamini.ai, or api.lamini.ai + Additionally, LLAMA_ENVIRONMENT can be set as an environment variable + that will be grabbed for the url before any of the above defaults + """ + def __init__( self, model_name: str = None, @@ -19,7 +40,22 @@ def __init__( self.api_prefix = self.api_url + "/v1/" self.model_name = model_name - def generate(self, prompt: Union[str, List[str]]): + def generate(self, prompt: Union[str, List[str]]) -> List[np.ndarray]: + """Request to Lamini platform for an embedding encoding of the provided + prompt + + + Parameters + ---------- + prompt: Union[str, List[str]] + Prompt to encoding into an embedding + + Returns + ------- + List[np.ndarray] + Formatted returned embedding from the Lamini platform + """ + params = {"prompt": prompt, "model_name": self.model_name} resp = make_web_request( self.api_key, self.api_prefix + "embedding", "post", params diff --git a/lamini/api/lamini.py b/lamini/api/lamini.py index 2b02a93..d307d81 100644 --- a/lamini/api/lamini.py +++ b/lamini/api/lamini.py @@ -1,21 +1,48 @@ import json +import jsonlines import logging import os +import pandas as pd import time -from typing import Dict, Iterable, List, Optional, Union -import jsonlines -import pandas as pd from lamini.api.lamini_config import get_config from lamini.api.rest_requests import get_version from lamini.api.train import Train from lamini.api.utils.completion import Completion from lamini.api.utils.upload_client import upload_to_blob +from lamini.error.error import ( + DownloadingModelError, +) +from typing import Dict, Iterable, List, Optional, Union, Any, Generator logger = logging.getLogger(__name__) class Lamini: + """Main interface for Lamini platform functionality. Key features are: + 1. Generation calls + 2. Data Upload/Downloading + 3. Training orchestration + 4. Evaluation + + Parameters + ---------- + model_name: str = None + LLM hugging face ID + + api_key: Optional[str] + Lamini platform API key, if not provided the key stored + within ~.lamini/configure.yaml will be used. If either + don't exist then an error is raised. 
+ + api_url: Optional[str] + Lamini platform api url, only needed if a different url is needed outside of the + defined ones here: https://github.com/lamini-ai/lamini-platform/blob/main/sdk/lamini/api/lamini_config.py#L68 + i.e. localhost, staging.lamini.ai, or api.lamini.ai + Additionally, LLAMA_ENVIRONMENT can be set as an environment variable + that will be grabbed for the url before any of the above defaults + """ + def __init__( self, model_name: str, @@ -31,7 +58,19 @@ def __init__( self.upload_file_path = None self.upload_base_path = None - def version(self): + def version(self) -> str: + """Get the version of the Lamini platform + + Parameters + ---------- + None + + Returns + ------- + str + Returned version fo the platform + """ + return get_version(self.api_key, self.api_url, self.config) def generate( @@ -41,14 +80,54 @@ def generate( output_type: Optional[dict] = None, max_tokens: Optional[int] = None, max_new_tokens: Optional[int] = None, - ): - result = self.completion.generate( - prompt=prompt, - model_name=model_name or self.model_name, - output_type=output_type, - max_tokens=max_tokens, - max_new_tokens=max_new_tokens, - ) + ) -> Union[str, Dict[str, Any]]: + """Generation request to the LLM with the provided prompt. + Model name will specify which LLM from hugging face to use. + Output type is used to handle structured output of the response. + max_tokens and max_new_tokens are related to the total amount of tokens + the model can use and generate. max_new_tokens is recommended to be used + over max_tokens to adjust model output. + + Parameters + ---------- + prompt: Union[str, List[str]] + Prompt to send to LLM + + model_name: Optional[str] = None + Which model to use from hugging face + + output_type: Optional[dict] = None + Structured output format + + max_tokens: Optional[int] = None + Max number of tokens for the model's generation + + max_new_tokens: Optional[int] = None + Max number of new tokens from the model's generation + + Raises + ------ + DownloadingModelError + Raised when an issue occurs with the model_name provided has failed to download + + Returns + ------- + result: Union[str, Dict[str, Any]] + Generated response from the LLM, strings are returned when output_type is not + specified, otherwise a dictionary matching the output_type is returned. + """ + + result = None + try: + result = self.completion.generate( + prompt=prompt, + model_name=model_name or self.model_name, + output_type=output_type, + max_tokens=max_tokens, + max_new_tokens=max_new_tokens, + ) + except DownloadingModelError as e: + return e if output_type is None: if isinstance(prompt, list): result = [single_result["output"] for single_result in result] @@ -64,6 +143,42 @@ async def async_generate( max_tokens: Optional[int] = None, max_new_tokens: Optional[int] = None, ): + """Asynchronous call for a generation request to the LLM with the provided prompt. + Model name will specify which LLM from hugging face to use. + Output type is used to handle structured output of the response. + max_tokens and max_new_tokens are related to the total amount of tokens + the model can use and generate. max_new_tokens is recommended to be used + over max_tokens to adjust model output. 
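As a usage sketch for the generation call documented above (illustrative only; the model name, prompt, and output_type values below are assumptions, not part of the patch):

```python
from lamini import Lamini

# Instantiate against a Hugging Face model ID; the API key is read from
# ~/.lamini/configure.yaml when not passed explicitly.
llm = Lamini(model_name="meta-llama/Meta-Llama-3.1-8B-Instruct")

# Without output_type, generate() returns the raw string output.
answer = llm.generate("What is Lamini?", max_new_tokens=64)

# With output_type, the result is a dict matching the requested schema.
facts = llm.generate(
    "Name a planet and its distance from the sun in AU.",
    output_type={"planet": "str", "distance_au": "float"},
)
```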
+ + Parameters + ---------- + prompt: Union[str, List[str]] + Prompt to send to LLM + + model_name: Optional[str] = None + Which model to use from hugging face + + output_type: Optional[dict] = None + Structured output format + + max_tokens: Optional[int] = None + Max number of tokens for the model's generation + + max_new_tokens: Optional[int] = None + Max number of new tokens from the model's generation + + Raises + ------ + DownloadingModelError + Raised when an issue occurs with the model_name provided has failed to download + + Returns + ------- + result: Union[str, Dict[str, Any]] + Generated response from the LLM, strings are returned when output_type is not + specified, otherwise a dictionary matching the output_type is returned. + """ + req_data = self.completion.make_llm_req_map( prompt=prompt, model_name=model_name or self.model_name, @@ -83,7 +198,32 @@ def upload_data( self, data: Iterable[Dict[str, Union[int, float, str, bool, Dict, List]]], is_public: Optional[bool] = None, - ): + ) -> str: + """Upload the provide data to the Lamini Platform + + Parameters + ---------- + data: Iterable[Dict[str, Union[int, float, str, bool, Dict, List]]] + Data to upload + + is_public: Optional[bool] = None + Flag to indicate if the platform should allow the dataset to be + publically shared. + + Raises + ------ + ValueError + Raised in data is None + + Exception + Raised if there was a failure during upload + + Returns + ------- + str + Dataset designation within the platform + """ + num_datapoints = 0 def get_data_str(d): @@ -117,7 +257,6 @@ def get_data_str(d): self.upload_file_path = response["dataset_location"] print("Data pairs uploaded to local.") - print(response) print( f"\nYour dataset id is: {response['dataset_id']} . Consider using this in the future to train using the same data. 
\nEg: " f"llm.train(data_or_dataset_id='{response['dataset_id']}')" @@ -131,7 +270,30 @@ def get_data_str(d): def upload_file( self, file_path: str, input_key: str = "input", output_key: str = "output" - ): + ) -> None: + """Upload a provided file to the Lamini Platform + + Parameters + ---------- + file_path: str + File path location to upload + + input_key: str = "input" + Key of the json dictionary to use as the input + + output_key: str = "output" + Key of the json dictionary to use as the output + + Raises + ------ + Exception + Raised if there is an issue with upload + + Returns + ------- + None + """ + items = self._upload_file_impl(file_path, input_key, output_key) try: dataset_id = self.upload_data(items) @@ -142,7 +304,37 @@ def upload_file( def _upload_file_impl( self, file_path: str, input_key: str = "input", output_key: str = "output" - ): + ) -> Generator[Dict[str, Any], None, None]: + """Private function to handle file types and loading for upload_file + + Parameters + ---------- + file_path: str + File path location to upload + + input_key: str = "input" + Key of the json dictionary to use as the input + + output_key: str = "output" + Key of the json dictionary to use as the output + + Raises + ------ + ValueError + Raised if input_key is not within the file contents provided + + KeyError + Raises if input_key or output_key is not within the file contents provided + + Exception + If a file type outside of csv or jsonlines is provided + + Yields + ------- + items: Dict[str, Any] + Contents of the file provided + """ + if os.path.getsize(file_path) > 1e10: raise Exception("File size is too large, please upload file less than 10GB") @@ -186,8 +378,38 @@ def train( finetune_args: Optional[dict] = None, gpu_config: Optional[dict] = None, is_public: Optional[bool] = None, - **kwargs, - ): + ) -> str: + """Handler for training jobs through the Trainer object. This submits a training + job request to the platform using the provided data. + + Parameters + ---------- + data_or_dataset_id: Union[ + str, Iterable[Dict[str, Union[int, float, str, bool, Dict, List]]] + ] + Data or Id to use for the training job + + finetune_args: Optional[dict] = None + Arguments that are passed into the Trainer.train function + + gpu_config: Optional[dict] = None + Configuration for the GPUs on the platform + + is_public: Optional[bool] = None + Allow public access to the model and dataset + + Raises + ------ + AssertionError + Raises if dataset_id is None, a dataset_id is generated when data is provided + to this function instead of an id + + Returns + ------- + job: str + Job id for the train job on the platform + """ + if isinstance(data_or_dataset_id, str): dataset_id = data_or_dataset_id else: @@ -224,7 +446,43 @@ def train_and_wait( gpu_config: Optional[dict] = None, is_public: Optional[bool] = None, **kwargs, - ): + ) -> str: + """Handler for training jobs through the Trainer object. This submits a training + job request to the platform using the provided data. This differs from the train + function in that this function will continuously poll until the job is completed. 
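A minimal sketch of how the two training entry points documented here differ (the dataset rows and keyword values are placeholders, not part of the patch):

```python
from lamini import Lamini

llm = Lamini(model_name="meta-llama/Meta-Llama-3.1-8B-Instruct")
data = [
    {"input": "What does Lamini do?", "output": "It tunes and serves LLMs."},
    {"input": "Is the SDK on GitHub?", "output": "Yes."},
]

# train() uploads the data, submits the job, and returns immediately.
job = llm.train(data_or_dataset_id=data)

# train_and_wait() submits the same way, then polls until the job reaches a
# terminal status; verbose=True prints progress while polling.
status = llm.train_and_wait(data_or_dataset_id=data, verbose=True)
```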
+ + Parameters + ---------- + data_or_dataset_id: Union[ + str, Iterable[Dict[str, Union[int, float, str, bool, Dict, List]]] + ] + Data or Id to use for the training job + + finetune_args: Optional[dict] = None + Arguments that are passed into the Trainer.train function + + gpu_config: Optional[dict] = None + Configuration for the GPUs on the platform + + is_public: Optional[bool] = None + Allow public access to the model and dataset + + kwargs: Dict[str, Any] + Key word arguments + verbose + output text indicating the job is still runing + + Raises + ------ + KeyboardInterrupt + Raised when keyboard interrupt is called + + Returns + ------- + status: str + Job status on the platform + """ + job = self.train( data_or_dataset_id, finetune_args=finetune_args, @@ -266,22 +524,98 @@ def train_and_wait( # Add alias for tune tune_and_wait = train_and_wait - def cancel_job(self, job_id=None): + def cancel_job(self, job_id: str = None) -> str: + """Cancel to job specified by the id + + Parameters + ---------- + job_id: str=None + job id to cancel + + Returns + ------- + str + Output from platform of the confirming cancelling of the job + """ + return self.trainer.cancel_job(job_id) def cancel_all_jobs( self, - ): + ) -> str: + """Cancel all jobs from this user on the platform + + Parameters + ---------- + None + + Returns + ------- + str + Output from platform of the confirming cancelling of the job + """ + return self.trainer.cancel_all_jobs() - def resume_job(self, job_id=None): + def resume_job(self, job_id: str = None) -> str: + """Resume the specific job on the Lamini platform + + Parameters + ---------- + job_id: str=None + Job to be resumed + + Returns + ------- + str: + Returned status of the platform for the job + """ + return self.trainer.resume_job(job_id) - def check_job_status(self, job_id=None): + def check_job_status(self, job_id: str = None) -> str: + """Check the specified job on the Lamini platform + + Parameters + ---------- + job_id: str=None + Job to check status + + Returns + ------- + str + Returned status of the platform job + """ + return self.trainer.check_job_status(job_id) - def get_jobs(self): + def get_jobs(self) -> List[str]: + """Get all jobs for this user on the Lamini Platform + + Parameters + ---------- + None + + Returns + ------- + List[str]: + Returned list of all jobs + """ + return self.trainer.get_jobs() - def evaluate(self, job_id=None): + def evaluate(self, job_id: str = None) -> str: + """Run an evaluation job on the specified training job + + Parameters + ---------- + job_id: str=None + Job to evaluate + + Returns + ------- + str: + Status of the job on the platform + """ + return self.trainer.evaluate(job_id) diff --git a/lamini/api/lamini_config.py b/lamini/api/lamini_config.py index a269632..e97d76e 100644 --- a/lamini/api/lamini_config.py +++ b/lamini/api/lamini_config.py @@ -1,25 +1,75 @@ import os +from typing import Dict, Any import config global_config = None -def get_global_config(overrides={}): +def get_global_config(overrides: Dict[str, Any] = {}) -> config.Configuration: + """Getter for the global lamini config, update config if overrides are provided + + Parameters + ---------- + overrides: Dict[str, Any]={} + Dictionary contents to override within the global_config + + Raises + ------ + Assertion Exception + Thrown if global_config has not been set + + Returns + ------- + overriden_config: config.Configuration + Copy of the global config with provided overrides + """ + global global_config assert ( global_config is not None ), 
"global_config must be set before calling get_config" - return global_config + overriden_config = global_config.copy() + overriden_config.update(overrides) + + return overriden_config + + +def setup_config(dictionary: Dict[str, Any] = {}) -> config.Configuration: + """Initialize the global config with the provided dictionary + + Parameters + ---------- + dictionary: Dict[str, Any] + Key/values to wrap into a config + + Returns + ------- + global_config: config.Configuration + Newly initalized config + """ -def setup_config(dictionary={}): global global_config global_config = get_config(dictionary) return global_config -def get_config(dictionary={}): +def get_config(dictionary: Dict[str, Any] = {}) -> config.Configuration: + """Construct a Configuration from the provided dictionary, along with + the environment, lamini, and powerml configurations. + + Parameters + ---------- + dictionary: Dict[str, Any] + Key/values to wrap into a config + + Returns + ------- + new_config: config.Configuration + Newly construction config + """ + new_config = config.ConfigurationSet( config.config_from_dict(dictionary), config.config_from_env(prefix="LAMINI", separator="__", lowercase_keys=True), @@ -30,12 +80,36 @@ def get_config(dictionary={}): return new_config -def reset_config(): +def reset_config() -> None: + """Reset the global config to None + + Parameters + ---------- + None + + Returns + ------- + None + """ + global global_config global_config = None -def edit_config(dictionary={}): +def edit_config(dictionary: Dict[str, Any] = {}) -> config.Configuration: + """Update the global_config with the provided dictionary + + Parameters + ---------- + dictionary: Dict[str, Any] + Key/values to update into global_config + + Returns + ------- + global_config: config.Configuration + Updated global_config + """ + global global_config if global_config is None: global_config = setup_config(dictionary) @@ -44,7 +118,19 @@ def edit_config(dictionary={}): return global_config -def home_lamini_config(): +def home_lamini_config() -> config.Configuration: + """Gather the local lamini configuration and wrap into a config + + Parameters + ---------- + None + + Returns + ------- + yaml_config: config.Configuration + Home config key/values inside a config + """ + home = os.path.expanduser("~") home_config_path = os.path.join(home, ".lamini/configure.yaml") if os.path.exists(home_config_path): @@ -54,7 +140,19 @@ def home_lamini_config(): return yaml_config -def home_powerml_config(): +def home_powerml_config() -> config.Configuration: + """Gather the local powerml configuration and wrap into a config + + Parameters + ---------- + None + + Returns + ------- + yaml_config: config.Configuration + Home config key/values inside a config + """ + home = os.path.expanduser("~") home_config_path = os.path.join(home, ".powerml/configure_llama.yaml") if os.path.exists(home_config_path): @@ -64,7 +162,20 @@ def home_powerml_config(): return yaml_config -def get_configured_url(config): +def get_configured_url(config: config.Configuration) -> str: + """Extract the Lamini platform url from the config + + Parameters + ---------- + config: config.Configuration + Config storing the url + + Returns + ------- + url: str + Extracted platform url + """ + environment = os.environ.get("LLAMA_ENVIRONMENT") if environment == "LOCAL": url = config.get("local.url", "http://localhost:5001") @@ -75,7 +186,20 @@ def get_configured_url(config): return url -def get_configured_key(config): +def get_configured_key(config: config.Configuration) -> str: + 
"""Extract the Lamini platform key from the config + + Parameters + ---------- + config: config.Configuration + Config storing the key + + Returns + ------- + key: str + Extracted platform key + """ + environment = os.environ.get("LLAMA_ENVIRONMENT") if environment == "LOCAL": key = config.get("local.key", None) diff --git a/lamini/api/rest_requests.py b/lamini/api/rest_requests.py index 680e762..2e3c715 100644 --- a/lamini/api/rest_requests.py +++ b/lamini/api/rest_requests.py @@ -1,16 +1,20 @@ +from typing import Optional, Dict, Any + import asyncio import importlib.metadata import logging import aiohttp import requests -from lamini.api.lamini_config import get_config, get_configured_key, get_configured_url +from lamini.api.lamini_config import get_configured_key, get_configured_url from lamini.error.error import ( APIError, APIUnprocessableContentError, AuthenticationError, + DownloadingModelError, ModelNotFound, RateLimitError, + RequestTimeoutError, UnavailableResourceError, UserError, ) @@ -20,13 +24,53 @@ warn_once = False -def get_version(key, url, config): +def get_version( + key: Optional[str], url: Optional[str], config: Optional[Dict[str, Any]] +) -> str: + """Getter for the Lamini Platform version + + Parameters + ---------- + key: Optional[str] + Lamini platform API key, if not provided the key stored + within ~.lamini/configure.yaml will be used. If either + don't exist then an error is raised. + + url: Optional[str] + Lamini platform api url, only needed if a different url is needed outside of the + defined ones here: https://github.com/lamini-ai/lamini-platform/blob/main/sdk/lamini/api/lamini_config.py#L68 + i.e. localhost, staging.lamini.ai, or api.lamini.ai + Additionally, LLAMA_ENVIRONMENT can be set as an environment variable + that will be grabbed for the url before any of the above defaults + + config: Dict[str, Any] + Configuration storing the key and url + + Returns + ------- + str + Version of the Lamini Platform + """ + api_key = key or get_configured_key(config) api_url = url or get_configured_url(config) return make_web_request(api_key, api_url + "/v1/version", "get", None) -def check_version(resp): +def check_version(resp: Dict[str, Any]) -> None: + """If the flag of warn_once is not set then print the X-warning + from the post request response and set the flag to true. + + Parameters + ---------- + resp: Dict[str, Any] + Request response dictionary + + Returns + ------- + None + """ + global warn_once if not warn_once: if resp.headers is not None and "X-Warning" in resp.headers: @@ -34,7 +78,55 @@ def check_version(resp): print(resp.headers["X-Warning"]) -async def make_async_web_request(client, key, url, http_method, json=None): +async def make_async_web_request( + client: requests.Session, + key: str, + url: str, + http_method: str, + json: Optional[Dict[str, Any]] = None, +) -> Dict[str, Any]: + """Send asycn request to the Lamini Platform + + Parameters + ---------- + client: requests.Session + Session handler for web requests + + key: Optional[str] + Lamini platform API key, if not provided the key stored + within ~.lamini/configure.yaml will be used. If either + don't exist then an error is raised. + + url: Optional[str] + Lamini platform api url, only needed if a different url is needed outside of the + defined ones here: https://github.com/lamini-ai/lamini-platform/blob/main/sdk/lamini/api/lamini_config.py#L68 + i.e. 
localhost, staging.lamini.ai, or api.lamini.ai + Additionally, LLAMA_ENVIRONMENT can be set as an environment variable + that will be grabbed for the url before any of the above defaults + + http_method: str + Request type + + json: Optional[Dict[str, Any]]=None + Data to send with request + + Raises + ------ + AuthenticationError + Raised if key is not valid or missing + + AssertionError + http_method is not post or get + + asyncio.TimeoutError + Timeout from server + + Returns + ------- + json_response: Dict[str, Any] + Response from the web request + """ + try: headers = { "Content-Type": "application/json", @@ -77,7 +169,44 @@ async def make_async_web_request(client, key, url, http_method, json=None): return json_response -async def handle_error(resp: aiohttp.ClientResponse): +async def handle_error(resp: aiohttp.ClientResponse) -> None: + """Given the response from a requests.Session, provide the proper + readable output for the user. + + Parameters + ---------- + resp: aiohttp.ClientResponse + Response from the web request + + Raises + ------ + ModelNotFound + Raises from 594 + + RateLimitError + Raises from 429 + + AuthenticationError + Raises from 401 + + UserError + Raises from 400 + + APIUnprocessableContentError + Raises from 422 + + UnavailableResourceError + Raises from 503 + + APIError + Raises from 200 + + Returns + ------- + None + + """ + if resp.status == 594: try: json_response = await resp.json() @@ -129,7 +258,66 @@ async def handle_error(resp: aiohttp.ClientResponse): raise APIError(f"API error {description}") -def make_web_request(key, url, http_method, json=None): +def make_web_request( + key: str, url: str, http_method: str, json: Optional[Dict[str, Any]] = None +) -> Dict[str, Any]: + """Execute a web request + + Parameters + ---------- + key: Optional[str] + Lamini platform API key, if not provided the key stored + within ~.lamini/configure.yaml will be used. If either + don't exist then an error is raised. + + url: Optional[str] + Lamini platform api url, only needed if a different url is needed outside of the + defined ones here: https://github.com/lamini-ai/lamini-platform/blob/main/sdk/lamini/api/lamini_config.py#L68 + i.e. 
localhost, staging.lamini.ai, or api.lamini.ai + Additionally, LLAMA_ENVIRONMENT can be set as an environment variable + that will be grabbed for the url before any of the above defaults + + http_method: str + Request type + + json: Optional[Dict[str, Any]]=None + Data to send with request + + Raises + ------ + AuthenticationError + Raised from invalid or missing api key + + Exception + http_method requested is not post or get + + HTTPError + Raised from many possible reasons: + if resp.status_code == 594: + ModelNotFound + if resp.status_code == 429: + RateLimitError + if resp.status_code == 401: + AuthenticationError + if resp.status_code == 400: + UserError + if resp.status_code == 422: + UserError + if resp.status_code == 503: + UnavailableResourceError + if resp.status_code == 513: + DownloadingModelError + if resp.status_code == 524: + RequestTimeoutError + if resp.status_code != 200: + APIError + + Returns + ------- + Dic[str, Any] + Response from the request + """ + try: headers = { "Content-Type": "application/json", @@ -152,7 +340,6 @@ def make_web_request(key, url, http_method, json=None): check_version(resp) resp.raise_for_status() except requests.exceptions.HTTPError as e: - print("status code:", resp.status_code, url) if resp.status_code == 594: try: json_response = resp.json() @@ -193,6 +380,23 @@ def make_web_request(key, url, http_method, json=None): raise UnavailableResourceError( json_response.get("detail", "UnavailableResourceError") ) + if resp.status_code == 513: + message = "" + try: + json_response = resp.json() + message = json_response.get("detail") + message = message.split("Downloading", 1)[1].join(["Downloding", ""]) + except Exception: + json_response = {} + raise DownloadingModelError(message) + if resp.status_code == 524: + try: + json_response = resp.json() + except Exception: + json_response = {} + raise RequestTimeoutError( + json_response.get("detail", "RequestTimeoutError") + ) if resp.status_code != 200: try: description = resp.json() diff --git a/lamini/api/streaming_completion.py b/lamini/api/streaming_completion.py index 2d0f5a2..6d1428a 100644 --- a/lamini/api/streaming_completion.py +++ b/lamini/api/streaming_completion.py @@ -1,6 +1,6 @@ import asyncio import time -from typing import List, Optional, Union +from typing import List, Optional, Union, Dict, Any import aiohttp import lamini @@ -9,8 +9,39 @@ class StreamingCompletionObject: + """Handler for streaming API endpoint on the Lamini Platform + + Parameters + ---------- + request_params: Dict[str, Any] + Parameters to pass into the request + + api_key: Optional[str] + Lamini platform API key, if not provided the key stored + within ~.lamini/configure.yaml will be used. If either + don't exist then an error is raised. + + api_url: Optional[str] + Lamini platform api url, only needed if a different url is needed outside of the + defined ones here: https://github.com/lamini-ai/lamini-platform/blob/main/sdk/lamini/api/lamini_config.py#L68 + i.e. 
localhost, staging.lamini.ai, or api.lamini.ai + Additionally, LLAMA_ENVIRONMENT can be set as an environment variable + that will be grabbed for the url before any of the above defaults + + polling_interval: int + Interval to wait before polling again + + max_errors: int = 0 + Number of errors before raising an exception + """ + def __init__( - self, request_params, api_url, api_key, polling_interval, max_errors=0 + self, + request_params: Dict[str, Any], + api_url: str, + api_key: str, + polling_interval: int, + max_errors: int = 0, ): self.request_params = request_params self.api_url = api_url @@ -22,13 +53,48 @@ def __init__( self.error_count = 0 self.max_errors = max_errors - def __iter__(self): + def __iter__(self) -> object: + """Iteration definition + + Parameters + ---------- + None + + Returns + ------- + Reference to self + """ + return self - def __next__(self): + def __next__(self) -> str: + """Iterator next step definition + + Parameters + ---------- + None + + Returns + ------- + str + Streamed next result + """ + return self.next() - def next(self): + def next(self) -> str: + """Retrieve the next iteration of the response stream + + Parameters + ---------- + None + + Returns + ------- + self.current_result: str + Streamed result from the web request + """ + if self.done_streaming: raise StopIteration() time.sleep(self.polling_interval) @@ -54,8 +120,39 @@ def next(self): class AsyncStreamingCompletionObject: + """Handler for asynchronous streaming API endpoint on the Lamini Platform + + Parameters + ---------- + request_params: Dict[str, Any] + Parameters to pass into the request + + api_key: Optional[str] + Lamini platform API key, if not provided the key stored + within ~.lamini/configure.yaml will be used. If either + don't exist then an error is raised. + + api_url: Optional[str] + Lamini platform api url, only needed if a different url is needed outside of the + defined ones here: https://github.com/lamini-ai/lamini-platform/blob/main/sdk/lamini/api/lamini_config.py#L68 + i.e. 
localhost, staging.lamini.ai, or api.lamini.ai + Additionally, LLAMA_ENVIRONMENT can be set as an environment variable + that will be grabbed for the url before any of the above defaults + + polling_interval: int + Interval to wait before polling again + + max_errors: int = 5 + Number of errors before raising an exception + """ + def __init__( - self, request_params, api_url, api_key, polling_interval, max_errors=5 + self, + request_params: Dict[str, Any], + api_url: str, + api_key: str, + polling_interval: int, + max_errors: int = 5, ): self.request_params = request_params self.api_url = api_url @@ -67,13 +164,48 @@ def __init__( self.error_count = 0 self.max_errors = max_errors - def __aiter__(self): + def __aiter__(self) -> object: + """Asychronous iteration definition + + Parameters + ---------- + None + + Returns + ------- + Reference to this instance of AsyncStreamingCompletionObject + """ + return self async def __anext__(self): + """Asynchronous next definition + + Parameters + ---------- + None + + Returns + ------- + str + Current streaming result from the web request + """ + return await self.next() async def next(self): + """Retrieve the next iteration of the response stream + + Parameters + ---------- + None + + Returns + ------- + self.current_result: str + Streamed result from the web request + """ + if self.done_streaming: raise StopAsyncIteration() await asyncio.sleep(self.polling_interval) @@ -100,6 +232,23 @@ async def next(self): class StreamingCompletion: + """Handler for streaming completions API endpoint on the Lamini Platform + + Parameters + ---------- + api_key: Optional[str] + Lamini platform API key, if not provided the key stored + within ~.lamini/configure.yaml will be used. If either + don't exist then an error is raised. + + api_url: Optional[str] + Lamini platform api url, only needed if a different url is needed outside of the + defined ones here: https://github.com/lamini-ai/lamini-platform/blob/main/sdk/lamini/api/lamini_config.py#L68 + i.e. localhost, staging.lamini.ai, or api.lamini.ai + Additionally, LLAMA_ENVIRONMENT can be set as an environment variable + that will be grabbed for the url before any of the above defaults + """ + def __init__( self, api_key: str = None, @@ -118,7 +267,37 @@ def submit( output_type: Optional[dict] = None, max_tokens: Optional[int] = None, max_new_tokens: Optional[int] = None, - ): + ) -> Dict[str, Any]: + """Conduct a web request to the streaming completions api endpoint with the + provided prompt to the model_name if provided. Output_type handles the formatting + of the output into a structure from this provided output_type. + max_tokens and max_new_tokens are related to the total amount of tokens + the model can use and generate. max_new_tokens is recommended to be used + over max_tokens to adjust model output. 
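For the streaming interface wrapped by this class, a usage sketch (illustrative; the prompt and model name are assumptions):

```python
from lamini.api.streaming_completion import StreamingCompletion

# create() returns a StreamingCompletionObject; each iteration waits
# polling_interval seconds, polls the endpoint, and yields the latest result.
stream = StreamingCompletion().create(
    prompt="Write one sentence about llamas.",
    model_name="meta-llama/Meta-Llama-3.1-8B-Instruct",
    polling_interval=1,
)

for partial in stream:
    print(partial)
```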
+ + Parameters + ---------- + prompt: Union[str, List[str]] + Prompt to send to LLM + + model_name: Optional[str] = None + Which model to use from hugging face + + output_type: Optional[dict] = None + Structured output format + + max_tokens: Optional[int] = None + Max number of tokens for the model's generation + + max_new_tokens: Optional[int] = None + Max number of new tokens from the model's generation + + Returns + ------- + Dict[str, Any] + Returned response from the web request + """ + req_data = self.make_llm_req_map( prompt=prompt, model_name=model_name, @@ -142,7 +321,37 @@ async def async_submit( output_type: Optional[dict] = None, max_tokens: Optional[int] = None, max_new_tokens: Optional[int] = None, - ): + ) -> Dict[str, Any]: + """Asynchronously send a web request to the streaming completions api endpoint with the + provided prompt to the model_name if provided. Output_type handles the formatting + of the output into a structure from this provided output_type. + max_tokens and max_new_tokens are related to the total amount of tokens + the model can use and generate. max_new_tokens is recommended to be used + over max_tokens to adjust model output. + + Parameters + ---------- + prompt: Union[str, List[str]] + Prompt to send to LLM + + model_name: Optional[str] = None + Which model to use from hugging face + + output_type: Optional[dict] = None + Structured output format + + max_tokens: Optional[int] = None + Max number of tokens for the model's generation + + max_new_tokens: Optional[int] = None + Max number of new tokens from the model's generation + + Returns + ------- + Dict[str, Any] + Returned response from the web request + """ + req_data = self.make_llm_req_map( prompt=prompt, model_name=model_name, @@ -168,7 +377,35 @@ def create( max_tokens: Optional[int] = None, max_new_tokens: Optional[int] = None, polling_interval: Optional[float] = 1, - ): + ) -> object: + """Instantiate a new StreamingCompletionObject + + Parameters + ---------- + prompt: Union[str, List[str]] + Prompt to send to LLM + + model_name: Optional[str] = None + Which model to use from hugging face + + output_type: Optional[dict] = None + Structured output format + + max_tokens: Optional[int] = None + Max number of tokens for the model's generation + + max_new_tokens: Optional[int] = None + Max number of new tokens from the model's generation + + polling_interval: Optional[float] = 1 + Interval to wait before polling again + + Returns + ------- + StreamingCompletionObject + Newly instantiated object + """ + self.done_streaming = False self.server = None self.prompt = prompt @@ -199,7 +436,35 @@ def async_create( max_tokens: Optional[int] = None, max_new_tokens: Optional[int] = None, polling_interval: Optional[float] = 1, - ): + ) -> object: + """Instantiate a new AsyncStreamingCompletionObject + + Parameters + ---------- + prompt: Union[str, List[str]] + Prompt to send to LLM + + model_name: Optional[str] = None + Which model to use from hugging face + + output_type: Optional[dict] = None + Structured output format + + max_tokens: Optional[int] = None + Max number of tokens for the model's generation + + max_new_tokens: Optional[int] = None + Max number of new tokens from the model's generation + + polling_interval: Optional[float] = 1 + Interval to wait before polling again + + Returns + ------- + AsyncStreamingCompletionObject + Newly instantiated object + """ + self.done_streaming = False self.server = None self.prompt = prompt @@ -223,8 +488,42 @@ def async_create( ) def make_llm_req_map( - self, 
model_name, prompt, output_type, max_tokens, max_new_tokens, server - ): + self, + model_name: Optional[str], + prompt: Union[str, List[str]], + output_type: Optional[dict], + max_tokens: Optional[int], + max_new_tokens: Optional[int], + server: Optional[str], + ) -> Dict[str, Any]: + """Make a web request to the Lamini Platform + + Parameters + ---------- + model_name: Optional[str] + Which model to use from hugging face + + prompt: Union[str, List[str]] + Prompt to send to LLM + + output_type: Optional[dict] = None + Structured output format + + max_tokens: Optional[int] = None + Max number of tokens for the model's generation + + max_new_tokens: Optional[int] = None + Max number of new tokens from the model's generation + + server: Optional[str] + Which Lamini Platform to make the request out to + + Returns + ------- + req_data: Dict[str, Any] + Response from the web request + """ + req_data = {} req_data["model_name"] = model_name req_data["prompt"] = prompt diff --git a/lamini/api/synchronize.py b/lamini/api/synchronize.py index 5fb9393..b136945 100644 --- a/lamini/api/synchronize.py +++ b/lamini/api/synchronize.py @@ -1,12 +1,27 @@ +from typing import Any import asyncio from threading import Thread, current_thread -def sync(awaitable): - """ - Get result of calling function on the given args. If it is awaitable, will +def sync(awaitable) -> Any: + """Get result of calling function on the given args. If it is awaitable, will block until it is finished. Runs in a new thread in such cases. Credit Piotr: https://github.com/truera/trulens/pull/793/files#diff-23a219ce07a4edb8892fe8ecf21aba06d5ebe012c80c3386f9a9e1fe80d23254 + + Parameters + ---------- + awaitable: Callable + Function to run if it is an async function + + Raises + ------ + Exception + Raised from asyncio loop failure + + Returns + ------- + Any + Result of the awaitable """ # Check if there is a running loop. @@ -24,7 +39,23 @@ def sync(awaitable): # Otherwise we cannot create a new one in this thread so we create a # new thread to run the awaitable until completion. - def run_in_new_loop(): + def run_in_new_loop() -> None: + """Create a new asycn loop for the Callable function + + Parameters + ---------- + None + + Raises + ------ + Exception + Thrown if an issue occurs within the new event loop + + Returns + ------- + None + """ + th = current_thread() # Attach return value and possibly exception to thread object so we # can retrieve from the starter of the thread. diff --git a/lamini/api/train.py b/lamini/api/train.py index 80bafe1..e3ab143 100644 --- a/lamini/api/train.py +++ b/lamini/api/train.py @@ -1,5 +1,5 @@ import logging -from typing import Optional +from typing import Any, Dict, Optional import lamini from lamini.api.lamini_config import get_config, get_configured_key, get_configured_url @@ -10,6 +10,23 @@ class Train: + """Handler for the training jobs on the Lamini Platform + + Parameters + ---------- + api_key: Optional[str] + Lamini platform API key, if not provided the key stored + within ~.lamini/configure.yaml will be used. If either + don't exist then an error is raised. + + api_url: Optional[str] + Lamini platform api url, only needed if a different url is needed outside of the + defined ones here: https://github.com/lamini-ai/lamini-platform/blob/main/sdk/lamini/api/lamini_config.py#L68 + i.e. 
localhost, staging.lamini.ai, or api.lamini.ai + Additionally, LLAMA_ENVIRONMENT can be set as an environment variable + that will be grabbed for the url before any of the above defaults + """ + def __init__( self, api_key: Optional[str] = None, @@ -19,7 +36,6 @@ def __init__( self.api_key = api_key or lamini.api_key or get_configured_key(self.config) self.api_url = api_url or lamini.api_url or get_configured_url(self.config) self.api_prefix = self.api_url + "/v1/" - self.ui_url = "https://app.lamini.ai" def train( self, @@ -29,7 +45,34 @@ def train( finetune_args: Optional[dict] = None, gpu_config: Optional[dict] = None, is_public: Optional[bool] = None, - ): + ) -> str: + """Make a web request to start a training job using the dataset ID provided + + Parameters + ---------- + model_name: str + Which model to use from hugging face + + dataset_id: str + Dataset ID to use for the training job + + upload_file_path: Optional[str] = None + + finetune_args: Optional[dict] = None + Arguments that are passed into the Trainer.train function + + gpu_config: Optional[dict] = None + Configuration for the GPUs on the platform + + is_public: Optional[bool] = None + Allow public access to the model and dataset + + Returns + ------- + job: str + Job ID on the Platform + """ + req_data = {"model_name": model_name} req_data["dataset_id"] = dataset_id if upload_file_path is not None: @@ -45,7 +88,7 @@ def train( job = make_web_request(self.api_key, url, "post", req_data) self.job_id = job["job_id"] print( - f"Tuning job submitted! Check status of job {self.job_id} here: {self.ui_url}/train/{self.job_id}" + f"Tuning job submitted! Check status of job {self.job_id} here: {self.api_url}/train/{self.job_id}" ) return job @@ -53,7 +96,20 @@ def train( # Add alias for tune tune = train - def cancel_job(self, job_id=None): + def cancel_job(self, job_id: str = None) -> Dict[str, Any]: + """Cancel the job ID provided on the Lamini Platform + + Parameters + ---------- + job_id: str=None + Job to be cancelled + + Returns + ------- + Dict[str, Any] + Result from the web request + """ + if job_id is None: job_id = self.job_id url = self.api_prefix + "train/jobs/" + str(job_id) + "/cancel" @@ -62,38 +118,119 @@ def cancel_job(self, job_id=None): def cancel_all_jobs( self, - ): + ) -> Dict[str, Any]: + """Cancel all jobs for this user on the Lamini Platform + + Parameters + ---------- + None + + Returns + ------- + Dict[str, Any] + Result from the web request + """ + url = self.api_prefix + "train/jobs/cancel" return make_web_request(self.api_key, url, "post", {}) - def resume_job(self, job_id=None): + def resume_job(self, job_id: str = None) -> Dict[str, Any]: + """Resume the job ID on the Lamini Platform + + Parameters + ---------- + job_id: str=None + Job to be resumed + + Returns + ------- + Dict[str, Any] + Result from the web request + """ + if job_id is None: job_id = self.job_id url = self.api_prefix + "train/jobs/" + str(job_id) + "/resume" return make_web_request(self.api_key, url, "post", {}) - def check_job_status(self, job_id=None): + def check_job_status(self, job_id: str = None) -> str: + """Check the specified job on the Lamini platform + + Parameters + ---------- + job_id: str=None + Job to check status + + Returns + ------- + str + Returned status of the platform job + """ + if job_id is None: job_id = self.job_id url = self.api_prefix + "train/jobs/" + str(job_id) return make_web_request(self.api_key, url, "get") - def get_jobs(self): + def get_jobs(self) -> Dict[str, Any]: + """Get all jobs for this 
user on the Lamini Platform + + Parameters + ---------- + None + + Returns + ------- + Dict[str, Any]: + Returned information from the request + """ + url = self.api_prefix + "train/jobs" return make_web_request(self.api_key, url, "get") - def evaluate(self, job_id=None): + def evaluate(self, job_id: str = None) -> Dict[str, Any]: + """Run an evaluation job on the specified training job + + Parameters + ---------- + job_id: str=None + Job to evaluate + + Returns + ------- + Dict[str, Any]: + Returned information from the request + """ + if job_id is None: job_id = self.job_id url = self.api_prefix + "train/jobs/" + str(job_id) + "/eval" return make_web_request(self.api_key, url, "get") - def create_blob_dataset_location(self, upload_base_path, is_public): + def create_blob_dataset_location( + self, upload_base_path: str, is_public: bool + ) -> Dict[str, Any]: + """Create a blob dataset on the Lamini Platform + + Parameters + ---------- + upload_base_path: str + Path for dataset base location + + is_public: bool + Flag to mark this dataset blog as public + + Returns + ------- + Dict[str, Any]: + Returned information from the request + """ + url = self.api_prefix + "data" req_data = { "upload_base_path": upload_base_path, @@ -109,7 +246,25 @@ def create_blob_dataset_location(self, upload_base_path, is_public): req_data, ) - def update_blob_dataset_num_datapoints(self, dataset_id, num_datapoints): + def update_blob_dataset_num_datapoints( + self, dataset_id: str, num_datapoints: int + ) -> Dict[str, Any]: + """Update an existing blob dataset and datapoints on the Lamini Platform + + Parameters + ---------- + dataset_id: str + Dataset to update + + num_datapoints: int + Number of datapoints to update + + Returns + ------- + Dict[str, Any]: + Returned information from the request + """ + url = self.api_prefix + "data/num-datapoints" req_data = { "num_datapoints": num_datapoints, @@ -123,11 +278,44 @@ def update_blob_dataset_num_datapoints(self, dataset_id, num_datapoints): req_data, ) - def get_upload_base_path(self): + def get_upload_base_path(self) -> Dict[str, Any]: + """Get the base path for uploads to the Lamini Platform + + Parameters + ---------- + None + + Returns + ------- + Dict[str, Any]: + Returned information from the request + """ + url = self.api_prefix + "get-upload-base-path" return make_web_request(self.api_key, url, "get") - def upload_dataset_locally(self, upload_base_path, is_public, data): + def upload_dataset_locally( + self, upload_base_path: str, is_public: bool, data: Any + ) -> Dict[str, Any]: + """Upload a local dataset to the Lamini Platform + + Parameters + ---------- + upload_base_path: str + Base path on Lamini Platform + + is_public: bool + Flag to make this data public + + data: Any + Serializable data set to send in web request + + Returns + ------- + Dict[str, Any]: + Returned information from the request + """ + url = self.api_prefix + "local-data" req_data = {} req_data["upload_base_path"] = upload_base_path @@ -141,10 +329,29 @@ def upload_dataset_locally(self, upload_base_path, is_public, data): req_data, ) - def get_existing_dataset(self, dataset_id, upload_base_path): + def get_existing_dataset( + self, dataset_id: str, upload_base_path: str + ) -> Dict[str, Any]: + """Retrieve the existing dataset on the Lamini Platform + + Parameters + ---------- + dataset_id: str + Dataset for which to retrieve + + upload_base_path: str + Base path on Lamini Platform + + Returns + ------- + Dict[str, Any]: + Returned information from the request + """ + url = 
self.api_prefix + "existing-data" req_data = {"dataset_id": dataset_id} req_data["upload_base_path"] = upload_base_path + print(f"EXISTING DATA {self.api_key}") return make_web_request( self.api_key, url, diff --git a/lamini/api/utils/batch.py b/lamini/api/utils/batch.py new file mode 100644 index 0000000..efc2b42 --- /dev/null +++ b/lamini/api/utils/batch.py @@ -0,0 +1,165 @@ +from typing import Any, Dict, List, Optional, Union + +import lamini +from lamini.api.lamini_config import get_config, get_configured_key, get_configured_url +from lamini.api.rest_requests import make_web_request + + +class Batch: + """Handler for formatting and POST request for the batch submission API + + Parameters + ---------- + api_key: Optional[str] + Lamini platform API key, if not provided the key stored + within ~.lamini/configure.yaml will be used. If either + don't exist then an error is raised. + + api_url: Optional[str] + Lamini platform api url, only needed if a different url is needed outside of the + defined ones here: https://github.com/lamini-ai/lamini-platform/blob/main/sdk/lamini/api/lamini_config.py#L68 + i.e. localhost, staging.lamini.ai, or api.lamini.ai + Additionally, LLAMA_ENVIRONMENT can be set as an environment variable + that will be grabbed for the url before any of the above defaults + + """ + + def __init__( + self, + api_key: Optional[str] = None, + api_url: Optional[str] = None, + ) -> None: + """ + Configuration dictionary for platform metadata provided by the following function: + https://github.com/lamini-ai/lamini-platform/blob/main/sdk/lamini/api/lamini_config.py + Configurations currently hold the following keys and data as a yaml format: + local: + url: + staging: + url: + production: + url: + + local: + key: + staging: + key: + production: + key: + + """ + self.config = get_config() + + self.api_key = api_key or lamini.api_key or get_configured_key(self.config) + self.api_url = api_url or lamini.api_url or get_configured_url(self.config) + self.api_prefix = self.api_url + "/v1/" + + def submit( + self, + prompt: Union[str, List[str]], + model_name: str, + output_type: Optional[dict] = None, + max_tokens: Optional[int] = None, + max_new_tokens: Optional[int] = None, + ) -> Dict[str, Any]: + """Handles construction of the POST request headers and body, then + a web request is made with the response returned. 
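A sketch of how the new batch endpoint might be driven end to end (illustrative; the prompts, model name, and the response key used for polling are assumptions, not confirmed by the patch):

```python
from lamini.api.utils.batch import Batch

batch = Batch()

# Submit a list of prompts as one batch_completions request.
submission = batch.submit(
    prompt=["What is 2 + 2?", "Name one ocean."],
    model_name="meta-llama/Meta-Llama-3.1-8B-Instruct",
)

# Poll for results using the identifier returned by the platform
# (the exact response key is assumed here).
results = batch.check_result(submission["id"])
```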
+ + Parameters + ---------- + prompt: Union[str, List[str]]: + Input prompt for the LLM + + model_name: str + LLM model name from HuggingFace + + output_type: Optional[dict] = None + Json format for the LLM output + + max_tokens: Optional[int] = None + Upper limit in total tokens + + max_new_tokens: Optional[int] = None + Upper limit for newly generated tokens + + Returns + ------- + resp: Dict[str, Any] + Json data returned from POST request + """ + + req_data = self.make_llm_req_map( + prompt=prompt, + model_name=model_name, + output_type=output_type, + max_tokens=max_tokens, + max_new_tokens=max_new_tokens, + ) + resp = make_web_request( + self.api_key, self.api_prefix + "batch_completions", "post", req_data + ) + return resp + + def check_result( + self, + id: str, + ) -> Dict[str, Any]: + """Check for the result of a batch request with the appropriate batch id.""" + + resp = make_web_request( + self.api_key, self.api_prefix + f"batch_completions/{id}/result", "get" + ) + return resp + + def make_llm_req_map( + self, + model_name: str, + prompt: Union[str, List[str]], + output_type: Optional[dict] = None, + max_tokens: Optional[int] = None, + max_new_tokens: Optional[int] = None, + ) -> Dict[str, Any]: + """Returns a dict of parameters for calling the remote LLM inference API. + + NOTE: Copied from lamini.py. + + TODO: Create a helper function that accepts all values and returns a dict. And replace callers + of self.make_llm_req_map() with the calling of the free function. + + Parameters + ---------- + model_name: str + LLM model name from HuggingFace + + prompt: Union[str, List[str]]: + Input prompt for the LLM + + output_type: Optional[dict] = None + Json format for the LLM output + + max_tokens: Optional[int] = None + Upper limit in total tokens + + max_new_tokens: Optional[int] = None + Upper limit for newly generated tokens + + Returns + ------- + req_data: Dict[str, Any] + Constructed dictionary with parameters provided into the correctly + specified keys for a REST request. + """ + + req_data = {} + req_data["model_name"] = model_name + # TODO: prompt should be named prompt to signal it's a batch. + if isinstance(prompt, list) and len(prompt) > 20: + print( + "For large inference batches, we strongly recommend using a Generation Pipeline to streamline your process: https://github.com/lamini-ai/lamini-examples/blob/main/05_data_pipeline/" + ) + req_data["prompt"] = prompt + req_data["output_type"] = output_type + req_data["max_tokens"] = max_tokens + if max_new_tokens is not None: + req_data["max_new_tokens"] = max_new_tokens + return req_data diff --git a/lamini/api/utils/completion.py b/lamini/api/utils/completion.py index 629a1b2..a90e3cf 100644 --- a/lamini/api/utils/completion.py +++ b/lamini/api/utils/completion.py @@ -1,4 +1,4 @@ -from typing import List, Optional, Union, Dict, Any +from typing import Any, Dict, List, Optional, Union import aiohttp import lamini @@ -7,13 +7,13 @@ class Completion: - """ Hanlder for formatting and POST request for the completions + """Handler for formatting and POST request for the completions and streaming_completions API endpoints. - + Parameters - ---------- - api_key: Optinal[str] + ---------- + api_key: Optional[str] Lamini platform API key, if not provided the key stored within ~.lamini/configure.yaml will be used. If either don't exist then an error is raised. 
@@ -28,7 +28,6 @@ class Completion: """ def __init__(self, api_key, api_url) -> None: - """ Configuration dictionary for platform metadata provided by the following function: https://github.com/lamini-ai/lamini-platform/blob/main/sdk/lamini/api/lamini_config.py @@ -62,7 +61,7 @@ def generate( max_tokens: Optional[int] = None, max_new_tokens: Optional[int] = None, ) -> Dict[str, Any]: - """Handles construction of the POST request headers and body, then + """Handles construction of the POST request headers and body, then a web request is made with the response returned. Parameters @@ -81,7 +80,7 @@ def generate( max_new_tokens: Optional[int] = None Upper limit for newly generated tokens - + Returns ------- resp: Dict[str, Any] @@ -100,14 +99,16 @@ def generate( ) return resp - async def async_generate(self, params: Dict[str, Any], client: aiohttp.ClientSession = None) -> Dict[str, Any]: + async def async_generate( + self, params: Dict[str, Any], client: aiohttp.ClientSession = None + ) -> Dict[str, Any]: """ Parameters ---------- params: Dict[str, Any] POST Request input parameters - + client: aiohttp.ClientSession = None ClientSession handler @@ -165,7 +166,7 @@ def make_llm_req_map( max_new_tokens: Optional[int] = None Upper limit for newly generated tokens - + Returns ------- req_data: Dict[str, Any] diff --git a/lamini/api/utils/iterators.py b/lamini/api/utils/iterators.py index 2ba917b..664337f 100644 --- a/lamini/api/utils/iterators.py +++ b/lamini/api/utils/iterators.py @@ -8,7 +8,7 @@ async def async_iter(normal_iter: Iterator) -> AsyncGenerator[Any, None]: ---------- normal_iter: Iterator Iterator to wrap with a yield generator - + Yields ------- item: Any diff --git a/lamini/api/utils/reservations.py b/lamini/api/utils/reservations.py index 8c23f63..0c0fd7d 100644 --- a/lamini/api/utils/reservations.py +++ b/lamini/api/utils/reservations.py @@ -13,11 +13,11 @@ class Reservations: - """ Hanlder for API reservations endpoint. + """Handler for API reservations endpoint. + - Parameters - ---------- + ---------- api_key: Optinal[str] = None Lamini platform API key, if not provided the key stored within ~.lamini/configure.yaml will be used. If either @@ -65,16 +65,16 @@ def initialize_reservation( ---------- capacity: int Reservation capactiy - + model_name: str Model to use for the reserved request - + batch_size: int Batch size for the inference call - + max_tokens: Optional[int] Max tokens for the inference call - + Returns ------- None @@ -82,9 +82,17 @@ def initialize_reservation( Raises ------ Exception - General exception for reservation issues. The exception is logged + General exception for reservation issues. The exception is logged but execution is continued. 
""" + if lamini.bypass_reservation: + self.current_reservation = None + self.capacity_remaining = 0 + self.dynamic_max_batch_size = 0 + self.capacity_needed = 0 + self.model_name = model_name + self.max_tokens = None + return try: logger.info( @@ -125,12 +133,12 @@ def initialize_reservation( self.max_tokens = None def pause_for_reservation_start(self) -> None: - """ Barrier until specified start time for the reservation + """Barrier until specified start time for the reservation Parameters ---------- None - + Returns ------- None @@ -146,7 +154,9 @@ def pause_for_reservation_start(self) -> None: if sleep_time.total_seconds() > 0: time.sleep(sleep_time.total_seconds()) - async def wait_and_poll_for_reservation(self, client: aiohttp.ClientSession) -> None: + async def wait_and_poll_for_reservation( + self, client: aiohttp.ClientSession + ) -> None: """Wait for current reservation to finish and then make a new reservation. If this reservation is working (indicated by the self.is_working flag), then set the kickoff and timer based polling jobs. @@ -155,7 +165,7 @@ async def wait_and_poll_for_reservation(self, client: aiohttp.ClientSession) -> ---------- client: aiohttp.ClientSession Http Client Handler - + Returns ------- None @@ -185,28 +195,9 @@ async def wait_and_poll_for_reservation(self, client: aiohttp.ClientSession) -> async with self.condition: self.condition.notify(len(self.condition._waiters)) self.is_polling = False - if self.is_working: - self.polling_task = asyncio.create_task( - self.kickoff_reservation_polling(client) - ) - logger.info("Made reservation " + str(reservation)) - if "dynamic_max_batch_size" not in reservation: - reservation["dynamic_max_batch_size"] = lamini.batch_size - self.current_reservation = reservation - self.capacity_remaining = reservation["capacity_remaining"] - self.dynamic_max_batch_size = reservation["dynamic_max_batch_size"] - if self.variable_capacity: - self.capacity_needed = self.dynamic_max_batch_size * lamini.max_workers - async with self.condition: - self.condition.notify(len(self.condition._waiters)) - self.is_polling = False - if self.is_working: - self.polling_task = asyncio.create_task( - self.kickoff_reservation_polling(client) - ) - _ = asyncio.create_task( - self.timer_based_polling(reservation["end_time"]) - ) + self.polling_task = asyncio.create_task( + self.kickoff_reservation_polling(client) + ) async def timer_based_polling(self, wakeup_time: int) -> None: """Wait for the provided wakeup_time to run the polling for the @@ -216,7 +207,7 @@ async def timer_based_polling(self, wakeup_time: int) -> None: ---------- wakeup_time: int ISO format datetime - + Returns ------- None @@ -242,7 +233,7 @@ async def kickoff_reservation_polling(self, client: aiohttp.ClientSession) -> No ---------- client: aiohttp.ClientSession Http Session handler - + Returns ------- None @@ -264,7 +255,7 @@ async def async_pause_for_reservation_start(self) -> None: Parameters ---------- None - + Returns ------- None @@ -287,7 +278,7 @@ def update_capacity_use(self, queries: int) -> None: ---------- queries: int Quantity of queries to decrease from self.capacity_remaining - + Returns ------- None @@ -304,7 +295,7 @@ def update_capacity_needed(self, queries: int) -> None: ---------- queries: int Quantity of queries to decrease from self.capacity_needed - + Returns ------- None diff --git a/lamini/api/utils/shutdown.py b/lamini/api/utils/shutdown.py index 56bd333..ecb99e8 100644 --- a/lamini/api/utils/shutdown.py +++ b/lamini/api/utils/shutdown.py @@ -3,7 +3,7 @@ 
async def shutdown(signal, loop) -> None: - """ Cleanup tasks tied to the service's shutdown. + """Cleanup tasks tied to the service's shutdown. Parameters ---------- diff --git a/lamini/api/utils/upload_client.py b/lamini/api/utils/upload_client.py index bc2f879..1909b97 100644 --- a/lamini/api/utils/upload_client.py +++ b/lamini/api/utils/upload_client.py @@ -13,10 +13,10 @@ def upload_to_blob(data: Iterable[str], sas_url: str) -> None: ---------- data: Iterable[str] Data to upload - + sas_url: str Location to upload to - + Returns ------- None @@ -40,7 +40,7 @@ def upload_to_local(data: Iterable[str], dataset_location: str) -> None: ---------- data: Iterable[str] Data to upload - + dataset_location: str Local location to store data diff --git a/lamini/classify/lamini_classifier.py b/lamini/classify/lamini_classifier.py index 994fd29..1c4a1d8 100644 --- a/lamini/classify/lamini_classifier.py +++ b/lamini/classify/lamini_classifier.py @@ -6,6 +6,8 @@ from itertools import chain from typing import List +import warnings + import jsonlines from lamini import Lamini from lamini.api.embedding import Embedding @@ -32,6 +34,11 @@ def __init__( example_modifier=None, example_expander=None, ): + warnings.warn( + "LaminiClassifer will be removed in a future version and will be replaced with the API endpoint /v2/classifier", + DeprecationWarning, + stacklevel=2, + ) self.model_name = model_name self.augmented_example_count = augmented_example_count self.batch_size = batch_size @@ -64,20 +71,19 @@ def prompt_train(self, prompts: dict): """ try: for class_name, prompt in prompts.items(): - logger.info( - f"Generating examples for class '{class_name}' from prompt {prompt}" - ) - self.add_class(class_name) - - result = self.generate_examples_from_prompt( - class_name, prompt, self.examples.get(class_name, []) - ) + logger.info( + f"Generating examples for class '{class_name}' from prompt {prompt}" + ) + self.add_class(class_name) - self.examples[class_name] = result + result = self.generate_examples_from_prompt( + class_name, prompt, self.examples.get(class_name, []) + ) - # Save partial progress - self.save_examples() + self.examples[class_name] = result + # Save partial progress + self.save_examples() self.train() except Exception as e: @@ -86,6 +92,8 @@ def prompt_train(self, prompts: dict): logger.error( "Consider rerunning the generation task if the error is transient, e.g. 500" ) + return False + return True def train(self): # Form the embeddings diff --git a/lamini/error/error.py b/lamini/error/error.py index 16d44ac..ac8ca5b 100644 --- a/lamini/error/error.py +++ b/lamini/error/error.py @@ -36,3 +36,11 @@ class UnavailableResourceError(LlamaError): class ServerTimeoutError(LlamaError): """Model is still downloading""" + + +class DownloadingModelError(LlamaError): + """Downloading model""" + + +class RequestTimeoutError(LlamaError): + """Request Timeout. Please try again.""" diff --git a/lamini/evaluators/helm/mmlu_evaluator.py b/lamini/evaluators/helm/mmlu_evaluator.py index 2176699..a33f2ba 100644 --- a/lamini/evaluators/helm/mmlu_evaluator.py +++ b/lamini/evaluators/helm/mmlu_evaluator.py @@ -3,7 +3,7 @@ class MMLUEvaluator: def get_prompt(self, question: str) -> str: prompt = "[INST] You'll be presented with a task or question.\n" prompt += "Provide brief thoughts in 1-2 sentences, no longer than 100 words each. 
Then, respond with a single letter or number representing the multiple-choice option.\n" - prompt += "Output your answer as a JSON object in the format {\"explanation\" : str, \"answer\" : str}\n" + prompt += 'Output your answer as a JSON object in the format {"explanation" : str, "answer" : str}\n' prompt += "Use single quotes within your explanation. End your explanation with a double quote.\n" prompt += f"========== question =========\n{question}\n\n" prompt += "=" * 20 + "\n\n" diff --git a/lamini/generation/base_prompt_object.py b/lamini/generation/base_prompt_object.py index ff75126..649dd22 100644 --- a/lamini/generation/base_prompt_object.py +++ b/lamini/generation/base_prompt_object.py @@ -11,6 +11,7 @@ def __init__(self, prompt: str, response: str = None, data: dict = {}) -> None: self.data = data # Records the input prompt to the first node of the pipeline. self.orig_prompt: PromptObject = None + self.finish_reason = None def get_prompt(self) -> str: prompt = self.prompt diff --git a/lamini/generation/classifier_node.py b/lamini/generation/classifier_node.py index a4e9824..398138f 100644 --- a/lamini/generation/classifier_node.py +++ b/lamini/generation/classifier_node.py @@ -16,9 +16,7 @@ def __init__( model_name: Optional[str] = None, max_tokens: Optional[int] = None, ): - super(ClassifierNode, self).__init__( - model_name=model_name - ) + super(ClassifierNode, self).__init__(model_name=model_name) self.max_tokens = max_tokens self.classifier = classifier diff --git a/lamini/generation/embedding_node.py b/lamini/generation/embedding_node.py index cdc1cc2..0c49e2e 100644 --- a/lamini/generation/embedding_node.py +++ b/lamini/generation/embedding_node.py @@ -33,9 +33,7 @@ def __init__( self, model_name: Optional[str] = None, ): - super(EmbeddingNode, self).__init__( - model_name=model_name - ) + super(EmbeddingNode, self).__init__(model_name=model_name) def generate( self, diff --git a/lamini/generation/index_node.py b/lamini/generation/index_node.py index e497045..47b5d64 100644 --- a/lamini/generation/index_node.py +++ b/lamini/generation/index_node.py @@ -16,9 +16,7 @@ def __init__( model_name: Optional[str] = None, max_tokens: Optional[int] = None, ): - super(IndexNode, self).__init__( - model_name=model_name - ) + super(IndexNode, self).__init__(model_name=model_name) self.max_tokens = max_tokens self.index = index self.index_top_k = index_top_k diff --git a/lamini/generation/process_generation_batch.py b/lamini/generation/process_generation_batch.py index 8f44a2c..284d0b5 100644 --- a/lamini/generation/process_generation_batch.py +++ b/lamini/generation/process_generation_batch.py @@ -1,6 +1,8 @@ +import asyncio import logging from lamini.api.rest_requests import make_async_web_request +from lamini.api.utils.batch import Batch logger = logging.getLogger(__name__) @@ -56,8 +58,13 @@ def can_submit_query(): f"capacity remaining after query: {reservation_api.capacity_remaining}" ) reservation_api.poll_for_reservation.set() - for i, prompt_obj in enumerate(batch["prompt"]): - prompt_obj.response = result[i] + if batch["type"] == "embedding": + for i, prompt_obj in enumerate(batch["prompt"]): + prompt_obj.response = result[i] + else: + for i, prompt_obj in enumerate(batch["prompt"]): + prompt_obj.response = result["outputs"][i] + prompt_obj.finish_reason = result["finish_reason"][i] async def query_api(client, key, url, json, type): @@ -66,7 +73,18 @@ async def query_api(client, key, url, json, type): result = await make_async_web_request(client, key, url, "post", json) result 
= result["embedding"] else: - result = await make_async_web_request(client, key, url, "post", json) + batch_api = Batch() + submit_response = batch_api.submit( + prompt=json["prompt"], + model_name=json["model_name"], + output_type=json["output_type"], + max_new_tokens=json["max_new_tokens"], + ) # TODO: Don't resubmit work if an error is thrown in the while loop + while True: + await asyncio.sleep(5) + result = batch_api.check_result(submit_response["id"]) + if result: + break return result diff --git a/pyproject.toml b/pyproject.toml index a47ddf1..b7d868f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,9 +4,9 @@ build-backend = "setuptools.build_meta" [project] name = "lamini" -version = "3.0.5" +version = "3.1.0" authors = [ - { name="PowerML", email="info@powerml.co" }, + { name="Lamini", email="info@lamini.ai" }, ] description = "Build on large language models faster" readme = "README.md"
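The pieces above fit together as follows: Batch.submit builds the request body via make_llm_req_map and POSTs it to the batch_completions endpoint, check_result polls batch_completions/{id}/result, and query_api in process_generation_batch.py now loops on that poll instead of making a single synchronous request. Below is a minimal usage sketch of that flow, assuming Batch is importable from lamini.api.utils.batch as in the import added above; the prompts, model name, and output_type shape are illustrative assumptions, not values taken from this patch.

import time

from lamini.api.utils.batch import Batch

# submit() builds the request body with make_llm_req_map() -- model_name,
# prompt, output_type, max_tokens, and (if given) max_new_tokens -- and
# POSTs it to the batch_completions endpoint.
batch_api = Batch()
submit_response = batch_api.submit(
    prompt=[
        "What is three plus four?",
        "Name a prime number greater than ten.",
    ],
    model_name="meta-llama/Meta-Llama-3.1-8B-Instruct",
    output_type={"answer": "str"},
    max_new_tokens=64,
)

# check_result() hits batch_completions/{id}/result; an empty response means
# the batch is still running, mirroring the polling loop added to query_api()
# (which uses await asyncio.sleep(5) in its async context).
while True:
    result = batch_api.check_result(submit_response["id"])
    if result:
        break
    time.sleep(5)

# A finished batch is unpacked the same way process_generation_batch.py does
# it: one entry in "outputs" and one in "finish_reason" per prompt.
for finish_reason, output in zip(result["finish_reason"], result["outputs"]):
    print(finish_reason, output)

The empty poll response doubles as the "still running" signal here, which is also why the TODO in query_api about not resubmitting work on an error inside the loop matters: a retry wrapper around check_result should not call submit again.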
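On the reservations side, initialize_reservation now short-circuits when lamini.bypass_reservation is set, zeroing the tracked capacity instead of contacting the reservations endpoint. A small sketch of that behavior follows; it assumes a Reservations instance can be constructed with its defaults (the constructor arguments are not part of this hunk), and the capacity and batch size values are arbitrary.

import lamini
from lamini.api.utils.reservations import Reservations

lamini.bypass_reservation = True

reservations = Reservations()
reservations.initialize_reservation(
    capacity=32,
    model_name="meta-llama/Meta-Llama-3.1-8B-Instruct",
    batch_size=8,
    max_tokens=None,
)

# With the bypass flag set, the new early return leaves no active reservation
# and zero tracked capacity, and no reservation web request is made.
assert reservations.current_reservation is None
assert reservations.capacity_remaining == 0
assert reservations.dynamic_max_batch_size == 0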
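Finally, two behavioral notes on LaminiClassifier: construction now emits a DeprecationWarning pointing at the planned /v2/classifier endpoint, and prompt_train reports success as a boolean instead of returning None. A short sketch is below; the prompt dictionary contents and the reliance on constructor defaults are illustrative assumptions.

from lamini.classify.lamini_classifier import LaminiClassifier

# Constructing the classifier now emits a DeprecationWarning; it still works.
classifier = LaminiClassifier()

# prompt_train() returns False if example generation or training raised (the
# exception is logged and swallowed), and True otherwise.
prompts = {
    "positive": "Customer reviews that praise the product",
    "negative": "Customer reviews that complain about the product",
}
if not classifier.prompt_train(prompts):
    print("Generation or training failed; rerun if the error looked transient (e.g. a 500).")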