diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 7a56dc2..8d75854 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -7,14 +7,14 @@ on: branches: [ "main" ] workflow_dispatch: schedule: - - cron: '0 9 * * *' + - cron: '0 0 * * 0' env: OPENAI_API_TOKEN: ${{ secrets.OPENAI_API_TOKEN }} COHERE_API_TOKEN: ${{ secrets.COHERE_API_TOKEN }} ANTHROPIC_API_TOKEN: ${{ secrets.ANTHROPIC_API_TOKEN }} - DOKU_URL: ${{ secrets.DOKU_URL }} - DOKU_TOKEN: ${{ secrets.DOKU_TOKEN }} + MISTRAL_API_TOKEN: ${{ secrets.MISTRAL_API_TOKEN }} + DOKU_URL: http://127.0.0.1:9044 jobs: build: @@ -27,6 +27,24 @@ jobs: steps: - uses: actions/checkout@v4 + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@0d103c3126aa41d772a8362f6aa67afac040f80c # v3.1.0 + + - name: Setup Doku Stack + run: docker-compose up -d + + - name: Sleep for 30 seconds + run: sleep 30 + + - name: Make API Request and Set DOKU_TOKEN + run: | + RESPONSE=$(curl -X POST $DOKU_URL/api/keys \ + -H 'Authorization: ""' \ + -H 'Content-Type: application/json' \ + -d '{"Name": "GITHUBACTION"}') + MESSAGE=$(echo $RESPONSE | jq -r '.message') + echo "DOKU_TOKEN=${MESSAGE}" >> $GITHUB_ENV + - name: Set up Python ${{ matrix.python-version }} uses: actions/setup-python@v3 with: diff --git a/README.md b/README.md index 3078630..f9b0aff 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,7 @@ - ✅ OpenAI - ✅ Anthropic - ✅ Cohere +- ✅ Mistral Deployed as the backbone for all your LLM monitoring needs, `dokumetry` channels crucial usage data directly to Doku, streamlining the tracking process. Unlock efficient and effective observability for your LLM applications with DokuMetry. diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..ebc8adf --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,52 @@ +version: '3.8' + +services: + clickhouse: + image: clickhouse/clickhouse-server:24.1.5 + container_name: clickhouse + environment: + CLICKHOUSE_PASSWORD: ${DOKU_DB_PASSWORD:-DOKU} + CLICKHOUSE_USER: ${DOKU_DB_USER:-default} + volumes: + - clickhouse-data:/var/lib/clickhouse + ports: + - "9000:9000" + - "8123:8123" + restart: always + + doku-ingester: + image: ghcr.io/dokulabs/doku-ingester:latest + container_name: doku-ingester + environment: + DOKU_DB_HOST: clickhouse + DOKU_DB_PORT: 9000 + DOKU_DB_NAME: ${DOKU_DB_NAME:-default} + DOKU_DB_USER: ${DOKU_DB_USER:-default} + DOKU_DB_PASSWORD: ${DOKU_DB_PASSWORD:-DOKU} + ports: + - "9044:9044" + depends_on: + - clickhouse + restart: always + + doku-client: + image: ghcr.io/dokulabs/doku-client:latest + container_name: doku-client + environment: + INIT_DB_HOST: clickhouse + INIT_DB_PORT: 8123 + INIT_DB_DATABASE: ${DOKU_DB_NAME:-default} + INIT_DB_USERNAME: ${DOKU_DB_USER:-default} + INIT_DB_PASSWORD: ${DOKU_DB_PASSWORD:-DOKU} + SQLITE_DATABASE_URL: file:/app/client/data/data.db + ports: + - "3000:3000" + depends_on: + - clickhouse + volumes: + - doku-client-data:/app/client/data + restart: always + +volumes: + clickhouse-data: + doku-client-data: \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 8096fdd..4ebcc25 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "dokumetry" -version = "0.1.0" +version = "0.1.1" description = "A Python library for tracking LLM and GenAI usage and sending the usage data to Doku" authors = ["Doku Labs"] repository = "https://github.com/dokulabs/dokumetry-python" @@ -11,8 +11,9 @@ keywords = ["openai", "anthropic", "claude", "cohere", "llm 
monitoring", "observ [tool.poetry.dependencies] python = "^3.7.1" requests = "^2.26.0" -openai = "^1.13.0" +openai = "^1.1.0" anthropic = "^0.19.0" +mistralai = "^0.1.5" [build-system] requires = ["poetry-core>=1.1.0"] diff --git a/src/dokumetry/__init__.py b/src/dokumetry/__init__.py index 4071d20..29f0f3a 100644 --- a/src/dokumetry/__init__.py +++ b/src/dokumetry/__init__.py @@ -2,14 +2,19 @@ __init__ module for dokumetry package. """ from anthropic import AsyncAnthropic, Anthropic - -from openai import AsyncOpenAI, OpenAI +from openai import AsyncOpenAI, OpenAI, AzureOpenAI, AsyncAzureOpenAI +from mistralai.async_client import MistralAsyncClient +from mistralai.client import MistralClient from .openai import init as init_openai from .async_openai import init as init_async_openai +from .azure_openai import init as init_azure_openai +from .async_azure_openai import init as init_async_azure_openai from .anthropic import init as init_anthropic from .async_anthropic import init as init_async_anthropic from .cohere import init as init_cohere +from .mistral import init as init_mistral +from .async_mistral import init as init_async_mistral # pylint: disable=too-few-public-methods class DokuConfig: @@ -24,7 +29,7 @@ class DokuConfig: application_name = None skip_resp = None -# pylint: disable=too-many-arguments, line-too-long +# pylint: disable=too-many-arguments, line-too-long, too-many-return-statements def init(llm, doku_url, api_key, environment="default", application_name="default", skip_resp=False): """ Initialize Doku configuration based on the provided function. @@ -52,14 +57,25 @@ def init(llm, doku_url, api_key, environment="default", application_name="defaul elif isinstance(llm, AsyncOpenAI): init_async_openai(llm, doku_url, api_key, environment, application_name, skip_resp) return - # pylint: disable=no-else-return + # pylint: disable=no-else-return, line-too-long + if hasattr(llm, 'moderations') and callable(llm.chat.completions.create) and ('.openai.azure.com/' in str(llm.base_url)): + if isinstance(llm, AzureOpenAI): + init_azure_openai(llm, doku_url, api_key, environment, application_name, skip_resp) + elif isinstance(llm, AsyncAzureOpenAI): + init_async_azure_openai(llm, doku_url, api_key, environment, application_name, skip_resp) + return + if isinstance(llm, MistralClient): + init_mistral(llm, doku_url, api_key, environment, application_name, skip_resp) + return + elif isinstance(llm, MistralAsyncClient): + init_async_mistral(llm, doku_url, api_key, environment, application_name, skip_resp) + return + elif isinstance(llm, AsyncAnthropic): + init_async_anthropic(llm, doku_url, api_key, environment, application_name, skip_resp) + return + elif isinstance(llm, Anthropic): + init_anthropic(llm, doku_url, api_key, environment, application_name, skip_resp) + return elif hasattr(llm, 'generate') and callable(llm.generate): init_cohere(llm, doku_url, api_key, environment, application_name, skip_resp) return - elif hasattr(llm, 'messages') and callable(llm.messages.create): - if isinstance(llm, AsyncAnthropic): - init_async_anthropic(llm, doku_url, api_key, environment, application_name, skip_resp) - elif isinstance(llm, Anthropic): - init_anthropic(llm, doku_url, api_key, environment, application_name, skip_resp) - - return diff --git a/src/dokumetry/async_azure_openai.py b/src/dokumetry/async_azure_openai.py new file mode 100644 index 0000000..9de8553 --- /dev/null +++ b/src/dokumetry/async_azure_openai.py @@ -0,0 +1,335 @@ +# pylint: disable=duplicate-code +""" +Module for 
monitoring OpenAI API calls. +""" + +import time +from .__helpers import send_data + +# pylint: disable=too-many-locals +# pylint: disable=too-many-arguments +# pylint: disable=too-many-statements +def init(llm, doku_url, api_key, environment, application_name, skip_resp): + """ + Initialize OpenAI monitoring for Doku. + + Args: + llm: The OpenAI function to be patched. + doku_url (str): Doku URL. + api_key (str): Doku Authentication api_key. + environment (str): Doku environment. + application_name (str): Doku application name. + skip_resp (bool): Skip response processing. + """ + + original_chat_create = llm.chat.completions.create + original_completions_create = llm.completions.create + original_embeddings_create = llm.embeddings.create + original_images_create = llm.images.generate + + async def llm_chat_completions(*args, **kwargs): + """ + Patched version of OpenAI's chat completions create method. + + Args: + *args: Variable positional arguments. + **kwargs: Variable keyword arguments. + + Returns: + OpenAIResponse: The response from OpenAI's chat completions create method. + """ + is_streaming = kwargs.get('stream', False) + start_time = time.time() + #pylint: disable=no-else-return + if is_streaming: + async def stream_generator(): + accumulated_content = "" + async for chunk in await original_chat_create(*args, **kwargs): + #pylint: disable=line-too-long + if len(chunk.choices) > 0: + if hasattr(chunk.choices[0], 'delta') and hasattr(chunk.choices[0].delta, 'content'): + content = chunk.choices[0].delta.content + if content: + accumulated_content += content + yield chunk + response_id = chunk.id + model = chunk.model + end_time = time.time() + duration = end_time - start_time + message_prompt = kwargs.get('messages', "No prompt provided") + formatted_messages = [] + for message in message_prompt: + role = message["role"] + content = message["content"] + + if isinstance(content, list): + content_str = ", ".join( + #pylint: disable=line-too-long + f"{item['type']}: {item['text'] if 'text' in item else item['image_url']}" + if 'type' in item else f"text: {item['text']}" + for item in content + ) + formatted_messages.append(f"{role}: {content_str}") + else: + formatted_messages.append(f"{role}: {content}") + + prompt = "\n".join(formatted_messages) + data = { + "environment": environment, + "llmReqId": response_id, + "applicationName": application_name, + "sourceLanguage": "python", + "endpoint": "azure.chat.completions", + "skipResp": skip_resp, + "requestDuration": duration, + "model": "azure_" + model, + "prompt": prompt, + "response": accumulated_content, + } + + send_data(data, doku_url, api_key) + + return stream_generator() + else: + start_time = time.time() + response = await original_chat_create(*args, **kwargs) + end_time = time.time() + duration = end_time - start_time + model = "azure_" + response.model + message_prompt = kwargs.get('messages', "No prompt provided") + formatted_messages = [] + + for message in message_prompt: + role = message["role"] + content = message["content"] + + if isinstance(content, list): + content_str = ", ".join( + f"{item['type']}: {item['text'] if 'text' in item else item['image_url']}" + if 'type' in item else f"text: {item['text']}" + for item in content + ) + formatted_messages.append(f"{role}: {content_str}") + else: + formatted_messages.append(f"{role}: {content}") + + prompt = "\n".join(formatted_messages) + + data = { + "llmReqId": response.id, + "endpoint": "azure.chat.completions", + "environment": environment, + "applicationName": 
application_name, + "sourceLanguage": "python", + "skipResp": skip_resp, + "requestDuration": duration, + "model": model, + "prompt": prompt, + } + + if "tools" not in kwargs: + data["completionTokens"] = response.usage.completion_tokens + data["promptTokens"] = response.usage.prompt_tokens + data["totalTokens"] = response.usage.total_tokens + data["finishReason"] = response.choices[0].finish_reason + + if "n" not in kwargs or kwargs["n"] == 1: + data["response"] = response.choices[0].message.content + else: + i = 0 + while i < kwargs["n"]: + data["response"] = response.choices[i].message.content + i += 1 + send_data(data, doku_url, api_key) + return response + elif "tools" in kwargs: + data["response"] = "Function called with tools" + data["completionTokens"] = response.usage.completion_tokens + data["promptTokens"] = response.usage.prompt_tokens + data["totalTokens"] = response.usage.total_tokens + + send_data(data, doku_url, api_key) + + return response + + async def llm_completions(*args, **kwargs): + """ + Patched version of OpenAI's completions create method. + + Args: + *args: Variable positional arguments. + **kwargs: Variable keyword arguments. + + Returns: + OpenAIResponse: The response from OpenAI's completions create method. + """ + start_time = time.time() + streaming = kwargs.get('stream', False) + #pylint: disable=no-else-return + if streaming: + async def stream_generator(): + accumulated_content = "" + async for chunk in await original_completions_create(*args, **kwargs): + if len(chunk.choices) > 0: + if hasattr(chunk.choices[0], 'text'): + content = chunk.choices[0].text + if content: + accumulated_content += content + yield chunk + response_id = chunk.id + model = chunk.model + end_time = time.time() + duration = end_time - start_time + prompt = kwargs.get('prompt', "No prompt provided") + data = { + "endpoint": "azure.completions", + "llmReqId": response_id, + "environment": environment, + "applicationName": application_name, + "sourceLanguage": "python", + "skipResp": skip_resp, + "requestDuration": duration, + "model": "azure_" + model, + "prompt": prompt, + "response": accumulated_content, + } + + send_data(data, doku_url, api_key) + + return stream_generator() + else: + start_time = time.time() + response = await original_completions_create(*args, **kwargs) + end_time = time.time() + duration = end_time - start_time + model = "azure_" + response.model + prompt = kwargs.get('prompt', "No prompt provided") + + data = { + "environment": environment, + "applicationName": application_name, + "llmReqId": response.id, + "sourceLanguage": "python", + "endpoint": "azure.completions", + "skipResp": skip_resp, + "requestDuration": duration, + "model": model, + "prompt": prompt, + } + + if "tools" not in kwargs: + data["completionTokens"] = response.usage.completion_tokens + data["promptTokens"] = response.usage.prompt_tokens + data["totalTokens"] = response.usage.total_tokens + data["finishReason"] = response.choices[0].finish_reason + + if "n" not in kwargs or kwargs["n"] == 1: + data["response"] = response.choices[0].text + else: + i = 0 + while i < kwargs["n"]: + data["response"] = response.choices[i].text + i += 1 + send_data(data, doku_url, api_key) + return response + elif "tools" in kwargs: + data["response"] = "Function called with tools" + data["completionTokens"] = response.usage.completion_tokens + data["promptTokens"] = response.usage.prompt_tokens + data["totalTokens"] = response.usage.total_tokens + + send_data(data, doku_url, api_key) + + return response + + 
async def patched_embeddings_create(*args, **kwargs): + """ + Patched version of OpenAI's embeddings create method. + + Args: + *args: Variable positional arguments. + **kwargs: Variable keyword arguments. + + Returns: + OpenAIResponse: The response from OpenAI's embeddings create method. + """ + + start_time = time.time() + response = await original_embeddings_create(*args, **kwargs) + end_time = time.time() + duration = end_time - start_time + model = "azure_" + response.model + prompt = ', '.join(kwargs.get('input', [])) + + data = { + "environment": environment, + "applicationName": application_name, + "sourceLanguage": "python", + "endpoint": "azure.embeddings", + "skipResp": skip_resp, + "requestDuration": duration, + "model": model, + "prompt": prompt, + "promptTokens": response.usage.prompt_tokens, + "totalTokens": response.usage.total_tokens + } + + send_data(data, doku_url, api_key) + + return response + + async def patched_image_create(*args, **kwargs): + """ + Patched version of OpenAI's images generate method. + + Args: + *args: Variable positional arguments. + **kwargs: Variable keyword arguments. + + Returns: + OpenAIResponse: The response from OpenAI's images generate method. + """ + + start_time = time.time() + response = await original_images_create(*args, **kwargs) + end_time = time.time() + duration = end_time - start_time + model = "azure_dall-e-3" + prompt = kwargs.get('prompt', "No prompt provided") + size = kwargs.get('size', '1024x1024') + + if 'response_format' in kwargs and kwargs['response_format'] == 'b64_json': + image = "b64_json" + else: + image = "url" + + if 'quality' not in kwargs: + quality = "standard" + else: + quality = kwargs['quality'] + + for items in response.data: + data = { + "llmReqId": response.created, + "environment": environment, + "applicationName": application_name, + "sourceLanguage": "python", + "endpoint": "azure.images.create", + "skipResp": skip_resp, + "requestDuration": duration, + "model": model, + "prompt": prompt, + "imageSize": size, + "imageQuality": quality, + "revisedPrompt": items.revised_prompt, + "image": getattr(items, image) + } + + send_data(data, doku_url, api_key) + + return response + + llm.chat.completions.create = llm_chat_completions + llm.completions.create = llm_completions + llm.embeddings.create = patched_embeddings_create + llm.images.generate = patched_image_create diff --git a/src/dokumetry/async_mistral.py b/src/dokumetry/async_mistral.py new file mode 100644 index 0000000..eb341fc --- /dev/null +++ b/src/dokumetry/async_mistral.py @@ -0,0 +1,194 @@ +# pylint: disable=duplicate-code +""" +Module for monitoring Mistral API calls. +""" + +import time +from .__helpers import send_data + +# pylint: disable=too-many-arguments, too-many-statements +def init(llm, doku_url, api_key, environment, application_name, skip_resp): + """ + Initialize Mistral integration with Doku. + + Args: + llm: The Mistral function to be patched. + doku_url (str): Doku URL. + api_key (str): Authentication api_key. + environment (str): Doku environment. + application_name (str): Doku application name. + skip_resp (bool): Skip response processing. + """ + + original_mistral_chat = llm.chat + original_mistral_chat_stream = llm.chat_stream + original_mistral_embeddings = llm.embeddings + + #pylint: disable=too-many-locals + async def patched_chat(*args, **kwargs): + """ + Patched version of Mistral's chat method. + + Args: + *args: Variable positional arguments. + **kwargs: Variable keyword arguments. 
+ + Returns: + MistalResponse: The response from Mistral's chat. + """ + start_time = time.time() + response = await original_mistral_chat(*args, **kwargs) + end_time = time.time() + duration = end_time - start_time + message_prompt = kwargs.get('messages', "No prompt provided") + formatted_messages = [] + + for message in message_prompt: + role = message.role + content = message.content + + if isinstance(content, list): + content_str = ", ".join( + f"{item['type']}: {item['text'] if 'text' in item else item['image_url']}" + if 'type' in item else f"text: {item['text']}" + for item in content + ) + formatted_messages.append(f"{role}: {content_str}") + else: + formatted_messages.append(f"{role}: {content}") + + prompt = " ".join(formatted_messages) + model = kwargs.get('model') + + prompt_tokens = response.usage.prompt_tokens + completion_tokens = response.usage.completion_tokens + total_tokens = response.usage.total_tokens + + data = { + "llmReqId": response.id, + "environment": environment, + "applicationName": application_name, + "sourceLanguage": "python", + "endpoint": "mistral.chat", + "skipResp": skip_resp, + "completionTokens": completion_tokens, + "promptTokens": prompt_tokens, + "totalTokens": total_tokens, + "requestDuration": duration, + "model": model, + "prompt": prompt, + "finishReason": response.choices[0].finish_reason, + "response": response.choices[0].message.content + } + + send_data(data, doku_url, api_key) + + return response + + #pylint: disable=too-many-locals + async def patched_chat_stream(*args, **kwargs): + """ + Patched version of Mistral's chat_stream method. + + Args: + *args: Variable positional arguments. + **kwargs: Variable keyword arguments. + + Returns: + MistalResponse: The response from Mistral's chat_stream. + """ + start_time = time.time() + async def stream_generator(): + accumulated_content = "" + async for event in original_mistral_chat_stream(*args, **kwargs): + response_id = event.id + accumulated_content += event.choices[0].delta.content + if event.usage is not None: + prompt_tokens = event.usage.prompt_tokens + completion_tokens = event.usage.completion_tokens + total_tokens = event.usage.total_tokens + finish_reason = event.choices[0].finish_reason + yield event + end_time = time.time() + duration = end_time - start_time + message_prompt = kwargs.get('messages', "No prompt provided") + formatted_messages = [] + + for message in message_prompt: + role = message.role + content = message.content + + if isinstance(content, list): + content_str = ", ".join( + f"{item['type']}: {item['text'] if 'text' in item else item['image_url']}" + if 'type' in item else f"text: {item['text']}" + for item in content + ) + formatted_messages.append(f"{role}: {content_str}") + else: + formatted_messages.append(f"{role}: {content}") + + prompt = " ".join(formatted_messages) + + data = { + "llmReqId": response_id, + "environment": environment, + "applicationName": application_name, + "sourceLanguage": "python", + "endpoint": "mistral.chat", + "skipResp": skip_resp, + "requestDuration": duration, + "model": kwargs.get('model', "command"), + "prompt": prompt, + "response": accumulated_content, + "promptTokens": prompt_tokens, + "completionTokens": completion_tokens, + "totalTokens": total_tokens, + "finishReason": finish_reason + } + + send_data(data, doku_url, api_key) + + return stream_generator() + + async def patched_embeddings(*args, **kwargs): + """ + Patched version of Cohere's embeddings generate method. + + Args: + *args: Variable positional arguments. 
+ **kwargs: Variable keyword arguments. + + Returns: + CohereResponse: The response from Cohere's embeddings generate method. + """ + + start_time = time.time() + response = await original_mistral_embeddings(*args, **kwargs) + end_time = time.time() + duration = end_time - start_time + model = kwargs.get('model', "mistral-embed") + prompt = ', '.join(kwargs.get('input', [])) + + data = { + "llmReqId": response.id, + "environment": environment, + "applicationName": application_name, + "sourceLanguage": "python", + "endpoint": "mistral.embeddings", + "skipResp": skip_resp, + "requestDuration": duration, + "model": model, + "prompt": prompt, + "promptTokens": response.usage.prompt_tokens, + "completionTokens": response.usage.completion_tokens, + "totalTokens": response.usage.total_tokens, + } + + send_data(data, doku_url, api_key) + + return response + + llm.chat = patched_chat + llm.chat_stream = patched_chat_stream + llm.embeddings = patched_embeddings diff --git a/src/dokumetry/async_openai.py b/src/dokumetry/async_openai.py index d7af70f..23bdac2 100644 --- a/src/dokumetry/async_openai.py +++ b/src/dokumetry/async_openai.py @@ -48,11 +48,12 @@ async def llm_chat_completions(*args, **kwargs): async def stream_generator(): accumulated_content = "" async for chunk in await original_chat_create(*args, **kwargs): - #pylint: disable=line-too-long - if hasattr(chunk.choices[0], 'delta') and hasattr(chunk.choices[0].delta, 'content'): - content = chunk.choices[0].delta.content - if content: - accumulated_content += content + if len(chunk.choices) > 0: + #pylint: disable=line-too-long + if hasattr(chunk.choices[0], 'delta') and hasattr(chunk.choices[0].delta, 'content'): + content = chunk.choices[0].delta.content + if content: + accumulated_content += content yield chunk response_id = chunk.id end_time = time.time() @@ -170,11 +171,12 @@ async def llm_completions(*args, **kwargs): if streaming: async def stream_generator(): accumulated_content = "" - async for chunk in await original_chat_create(*args, **kwargs): - if hasattr(chunk.choices[0].text, 'content'): - content = chunk.choices[0].text - if content: - accumulated_content += content + async for chunk in await original_completions_create(*args, **kwargs): + if len(chunk.choices) > 0: + if hasattr(chunk.choices[0], 'text'): + content = chunk.choices[0].text + if content: + accumulated_content += content yield chunk response_id = chunk.id end_time = time.time() @@ -258,7 +260,7 @@ async def patched_embeddings_create(*args, **kwargs): end_time = time.time() duration = end_time - start_time model = kwargs.get('model', "No Model provided") - prompt = kwargs.get('input', "No prompt provided") + prompt = ', '.join(kwargs.get('input', [])) data = { "environment": environment, diff --git a/src/dokumetry/azure_openai.py b/src/dokumetry/azure_openai.py new file mode 100644 index 0000000..b418006 --- /dev/null +++ b/src/dokumetry/azure_openai.py @@ -0,0 +1,335 @@ +# pylint: disable=duplicate-code +""" +Module for monitoring Azure OpenAI API calls. +""" + +import time +from .__helpers import send_data + +# pylint: disable=too-many-locals +# pylint: disable=too-many-arguments +# pylint: disable=too-many-statements +def init(llm, doku_url, api_key, environment, application_name, skip_resp): + """ + Initialize Azure OpenAI monitoring for Doku. + + Args: + llm: The Azure OpenAI function to be patched. + doku_url (str): Doku URL. + api_key (str): Doku Authentication api_key. + environment (str): Doku environment. 
+ application_name (str): Doku application name. + skip_resp (bool): Skip response processing. + """ + + original_chat_create = llm.chat.completions.create + original_completions_create = llm.completions.create + original_embeddings_create = llm.embeddings.create + original_images_create = llm.images.generate + + def llm_chat_completions(*args, **kwargs): + """ + Patched version of Azure OpenAI's chat completions create method. + + Args: + *args: Variable positional arguments. + **kwargs: Variable keyword arguments. + + Returns: + OpenAIResponse: The response from Azure OpenAI's chat completions create method. + """ + is_streaming = kwargs.get('stream', False) + start_time = time.time() + #pylint: disable=no-else-return + if is_streaming: + def stream_generator(): + accumulated_content = "" + for chunk in original_chat_create(*args, **kwargs): + #pylint: disable=line-too-long + if len(chunk.choices) > 0: + if hasattr(chunk.choices[0], 'delta') and hasattr(chunk.choices[0].delta, 'content'): + content = chunk.choices[0].delta.content + if content: + accumulated_content += content + yield chunk + response_id = chunk.id + model = chunk.model + end_time = time.time() + duration = end_time - start_time + message_prompt = kwargs.get('messages', "No prompt provided") + formatted_messages = [] + for message in message_prompt: + role = message["role"] + content = message["content"] + + if isinstance(content, list): + content_str = ", ".join( + #pylint: disable=line-too-long + f"{item['type']}: {item['text'] if 'text' in item else item['image_url']}" + if 'type' in item else f"text: {item['text']}" + for item in content + ) + formatted_messages.append(f"{role}: {content_str}") + else: + formatted_messages.append(f"{role}: {content}") + + prompt = "\n".join(formatted_messages) + data = { + "environment": environment, + "llmReqId": response_id, + "applicationName": application_name, + "sourceLanguage": "python", + "endpoint": "azure.chat.completions", + "skipResp": skip_resp, + "requestDuration": duration, + "model": "azure_" + model, + "prompt": prompt, + "response": accumulated_content, + } + + send_data(data, doku_url, api_key) + + return stream_generator() + else: + start_time = time.time() + response = original_chat_create(*args, **kwargs) + end_time = time.time() + duration = end_time - start_time + model = "azure_" + response.model + message_prompt = kwargs.get('messages', "No prompt provided") + formatted_messages = [] + + for message in message_prompt: + role = message["role"] + content = message["content"] + + if isinstance(content, list): + content_str = ", ".join( + f"{item['type']}: {item['text'] if 'text' in item else item['image_url']}" + if 'type' in item else f"text: {item['text']}" + for item in content + ) + formatted_messages.append(f"{role}: {content_str}") + else: + formatted_messages.append(f"{role}: {content}") + + prompt = "\n".join(formatted_messages) + + data = { + "llmReqId": response.id, + "endpoint": "azure.chat.completions", + "environment": environment, + "applicationName": application_name, + "sourceLanguage": "python", + "skipResp": skip_resp, + "requestDuration": duration, + "model": model, + "prompt": prompt, + } + + if "tools" not in kwargs: + data["completionTokens"] = response.usage.completion_tokens + data["promptTokens"] = response.usage.prompt_tokens + data["totalTokens"] = response.usage.total_tokens + data["finishReason"] = response.choices[0].finish_reason + + if "n" not in kwargs or kwargs["n"] == 1: + data["response"] = response.choices[0].message.content 
+ else: + i = 0 + while i < kwargs["n"]: + data["response"] = response.choices[i].message.content + i += 1 + send_data(data, doku_url, api_key) + return response + elif "tools" in kwargs: + data["response"] = "Function called with tools" + data["completionTokens"] = response.usage.completion_tokens + data["promptTokens"] = response.usage.prompt_tokens + data["totalTokens"] = response.usage.total_tokens + + send_data(data, doku_url, api_key) + + return response + + def llm_completions(*args, **kwargs): + """ + Patched version of Azure OpenAI's completions create method. + + Args: + *args: Variable positional arguments. + **kwargs: Variable keyword arguments. + + Returns: + OpenAIResponse: The response from Azure OpenAI's completions create method. + """ + start_time = time.time() + streaming = kwargs.get('stream', False) + #pylint: disable=no-else-return + if streaming: + def stream_generator(): + accumulated_content = "" + for chunk in original_completions_create(*args, **kwargs): + if len(chunk.choices) > 0: + if hasattr(chunk.choices[0], 'text'): + content = chunk.choices[0].text + if content: + accumulated_content += content + yield chunk + response_id = chunk.id + model = chunk.model + end_time = time.time() + duration = end_time - start_time + prompt = kwargs.get('prompt', "No prompt provided") + data = { + "endpoint": "azure.completions", + "llmReqId": response_id, + "environment": environment, + "applicationName": application_name, + "sourceLanguage": "python", + "skipResp": skip_resp, + "requestDuration": duration, + "model": "azure_" + model, + "prompt": prompt, + "response": accumulated_content, + } + + send_data(data, doku_url, api_key) + + return stream_generator() + else: + start_time = time.time() + response = original_completions_create(*args, **kwargs) + end_time = time.time() + duration = end_time - start_time + model = "azure_" + response.model + prompt = kwargs.get('prompt', "No prompt provided") + + data = { + "environment": environment, + "applicationName": application_name, + "llmReqId": response.id, + "sourceLanguage": "python", + "endpoint": "azure.completions", + "skipResp": skip_resp, + "requestDuration": duration, + "model": model, + "prompt": prompt, + } + + if "tools" not in kwargs: + data["completionTokens"] = response.usage.completion_tokens + data["promptTokens"] = response.usage.prompt_tokens + data["totalTokens"] = response.usage.total_tokens + data["finishReason"] = response.choices[0].finish_reason + + if "n" not in kwargs or kwargs["n"] == 1: + data["response"] = response.choices[0].text + else: + i = 0 + while i < kwargs["n"]: + data["response"] = response.choices[i].text + i += 1 + send_data(data, doku_url, api_key) + return response + elif "tools" in kwargs: + data["response"] = "Function called with tools" + data["completionTokens"] = response.usage.completion_tokens + data["promptTokens"] = response.usage.prompt_tokens + data["totalTokens"] = response.usage.total_tokens + + send_data(data, doku_url, api_key) + + return response + + def patched_embeddings_create(*args, **kwargs): + """ + Patched version of Azure OpenAI's embeddings create method. + + Args: + *args: Variable positional arguments. + **kwargs: Variable keyword arguments. + + Returns: + OpenAIResponse: The response from Azure OpenAI's embeddings create method. 
+ """ + + start_time = time.time() + response = original_embeddings_create(*args, **kwargs) + end_time = time.time() + duration = end_time - start_time + model = "azure_" + response.model + prompt = ', '.join(kwargs.get('input', [])) + + data = { + "environment": environment, + "applicationName": application_name, + "sourceLanguage": "python", + "endpoint": "azure.embeddings", + "skipResp": skip_resp, + "requestDuration": duration, + "model": model, + "prompt": prompt, + "promptTokens": response.usage.prompt_tokens, + "totalTokens": response.usage.total_tokens + } + + send_data(data, doku_url, api_key) + + return response + + def patched_image_create(*args, **kwargs): + """ + Patched version of Azure OpenAI's images generate method. + + Args: + *args: Variable positional arguments. + **kwargs: Variable keyword arguments. + + Returns: + OpenAIResponse: The response from Azure OpenAI's images generate method. + """ + + start_time = time.time() + response = original_images_create(*args, **kwargs) + end_time = time.time() + duration = end_time - start_time + model = "azure_dall-e-3" + prompt = kwargs.get('prompt', "No prompt provided") + size = kwargs.get('size', '1024x1024') + + if 'response_format' in kwargs and kwargs['response_format'] == 'b64_json': + image = "b64_json" + else: + image = "url" + + if 'quality' not in kwargs: + quality = "standard" + else: + quality = kwargs['quality'] + + for items in response.data: + data = { + "llmReqId": response.created, + "environment": environment, + "applicationName": application_name, + "sourceLanguage": "python", + "endpoint": "azure.images.create", + "skipResp": skip_resp, + "requestDuration": duration, + "model": model, + "prompt": prompt, + "imageSize": size, + "imageQuality": quality, + "revisedPrompt": items.revised_prompt, + "image": getattr(items, image) + } + + send_data(data, doku_url, api_key) + + return response + + llm.chat.completions.create = llm_chat_completions + llm.completions.create = llm_completions + llm.embeddings.create = patched_embeddings_create + llm.images.generate = patched_image_create diff --git a/src/dokumetry/mistral.py b/src/dokumetry/mistral.py new file mode 100644 index 0000000..71ac5ca --- /dev/null +++ b/src/dokumetry/mistral.py @@ -0,0 +1,194 @@ +# pylint: disable=duplicate-code +""" +Module for monitoring Mistral API calls. +""" + +import time +from .__helpers import send_data + +# pylint: disable=too-many-arguments, too-many-statements +def init(llm, doku_url, api_key, environment, application_name, skip_resp): + """ + Initialize Mistral integration with Doku. + + Args: + llm: The Mistral function to be patched. + doku_url (str): Doku URL. + api_key (str): Authentication api_key. + environment (str): Doku environment. + application_name (str): Doku application name. + skip_resp (bool): Skip response processing. + """ + + original_mistral_chat = llm.chat + original_mistral_chat_stream = llm.chat_stream + original_mistral_embeddings = llm.embeddings + + #pylint: disable=too-many-locals + def patched_chat(*args, **kwargs): + """ + Patched version of Mistral's chat method. + + Args: + *args: Variable positional arguments. + **kwargs: Variable keyword arguments. + + Returns: + MistalResponse: The response from Mistral's chat. 
+ """ + start_time = time.time() + response = original_mistral_chat(*args, **kwargs) + end_time = time.time() + duration = end_time - start_time + message_prompt = kwargs.get('messages', "No prompt provided") + formatted_messages = [] + + for message in message_prompt: + role = message.role + content = message.content + + if isinstance(content, list): + content_str = ", ".join( + f"{item['type']}: {item['text'] if 'text' in item else item['image_url']}" + if 'type' in item else f"text: {item['text']}" + for item in content + ) + formatted_messages.append(f"{role}: {content_str}") + else: + formatted_messages.append(f"{role}: {content}") + + prompt = " ".join(formatted_messages) + model = kwargs.get('model') + + prompt_tokens = response.usage.prompt_tokens + completion_tokens = response.usage.completion_tokens + total_tokens = response.usage.total_tokens + + data = { + "llmReqId": response.id, + "environment": environment, + "applicationName": application_name, + "sourceLanguage": "python", + "endpoint": "mistral.chat", + "skipResp": skip_resp, + "completionTokens": completion_tokens, + "promptTokens": prompt_tokens, + "totalTokens": total_tokens, + "requestDuration": duration, + "model": model, + "prompt": prompt, + "finishReason": response.choices[0].finish_reason, + "response": response.choices[0].message.content + } + + send_data(data, doku_url, api_key) + + return response + + #pylint: disable=too-many-locals + def patched_chat_stream(*args, **kwargs): + """ + Patched version of Mistral's chat_stream method. + + Args: + *args: Variable positional arguments. + **kwargs: Variable keyword arguments. + + Returns: + MistalResponse: The response from Mistral's chat_stream. + """ + start_time = time.time() + def stream_generator(): + accumulated_content = "" + for event in original_mistral_chat_stream(*args, **kwargs): + response_id = event.id + accumulated_content += event.choices[0].delta.content + if event.usage is not None: + prompt_tokens = event.usage.prompt_tokens + completion_tokens = event.usage.completion_tokens + total_tokens = event.usage.total_tokens + finish_reason = event.choices[0].finish_reason + yield event + end_time = time.time() + duration = end_time - start_time + message_prompt = kwargs.get('messages', "No prompt provided") + formatted_messages = [] + + for message in message_prompt: + role = message.role + content = message.content + + if isinstance(content, list): + content_str = ", ".join( + f"{item['type']}: {item['text'] if 'text' in item else item['image_url']}" + if 'type' in item else f"text: {item['text']}" + for item in content + ) + formatted_messages.append(f"{role}: {content_str}") + else: + formatted_messages.append(f"{role}: {content}") + + prompt = " ".join(formatted_messages) + + data = { + "llmReqId": response_id, + "environment": environment, + "applicationName": application_name, + "sourceLanguage": "python", + "endpoint": "mistral.chat", + "skipResp": skip_resp, + "requestDuration": duration, + "model": kwargs.get('model', "command"), + "prompt": prompt, + "response": accumulated_content, + "promptTokens": prompt_tokens, + "completionTokens": completion_tokens, + "totalTokens": total_tokens, + "finishReason": finish_reason + } + + send_data(data, doku_url, api_key) + + return stream_generator() + + def patched_embeddings(*args, **kwargs): + """ + Patched version of Cohere's embeddings generate method. + + Args: + *args: Variable positional arguments. + **kwargs: Variable keyword arguments. 
+ + Returns: + CohereResponse: The response from Cohere's embeddings generate method. + """ + + start_time = time.time() + response = original_mistral_embeddings(*args, **kwargs) + end_time = time.time() + duration = end_time - start_time + model = kwargs.get('model', "mistral-embed") + prompt = ', '.join(kwargs.get('input', [])) + + data = { + "llmReqId": response.id, + "environment": environment, + "applicationName": application_name, + "sourceLanguage": "python", + "endpoint": "mistral.embeddings", + "skipResp": skip_resp, + "requestDuration": duration, + "model": model, + "prompt": prompt, + "promptTokens": response.usage.prompt_tokens, + "completionTokens": response.usage.completion_tokens, + "totalTokens": response.usage.total_tokens, + } + + send_data(data, doku_url, api_key) + + return response + + llm.chat = patched_chat + llm.chat_stream = patched_chat_stream + llm.embeddings = patched_embeddings diff --git a/src/dokumetry/openai.py b/src/dokumetry/openai.py index 0a7835f..dbdaae6 100644 --- a/src/dokumetry/openai.py +++ b/src/dokumetry/openai.py @@ -48,11 +48,12 @@ def llm_chat_completions(*args, **kwargs): def stream_generator(): accumulated_content = "" for chunk in original_chat_create(*args, **kwargs): - #pylint: disable=line-too-long - if hasattr(chunk.choices[0], 'delta') and hasattr(chunk.choices[0].delta, 'content'): - content = chunk.choices[0].delta.content - if content: - accumulated_content += content + if len(chunk.choices) > 0: + #pylint: disable=line-too-long + if hasattr(chunk.choices[0], 'delta') and hasattr(chunk.choices[0].delta, 'content'): + content = chunk.choices[0].delta.content + if content: + accumulated_content += content yield chunk response_id = chunk.id end_time = time.time() @@ -170,11 +171,12 @@ def llm_completions(*args, **kwargs): if streaming: def stream_generator(): accumulated_content = "" - for chunk in original_chat_create(*args, **kwargs): - if hasattr(chunk.choices[0].text, 'content'): - content = chunk.choices[0].text - if content: - accumulated_content += content + for chunk in original_completions_create(*args, **kwargs): + if len(chunk.choices) > 0: + if hasattr(chunk.choices[0], 'text'): + content = chunk.choices[0].text + if content: + accumulated_content += content yield chunk response_id = chunk.id end_time = time.time() @@ -258,7 +260,7 @@ def patched_embeddings_create(*args, **kwargs): end_time = time.time() duration = end_time - start_time model = kwargs.get('model', "No Model provided") - prompt = kwargs.get('input', "No prompt provided") + prompt = ', '.join(kwargs.get('input', [])) data = { "environment": environment, diff --git a/tests/requirements.txt b/tests/requirements.txt index 088393e..22f742b 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -4,4 +4,5 @@ openai>=1.1.1 anthropic>=0.19.0 pytest>=7.4.0 requests>=2.31.0 -pylint>=3.0.2 \ No newline at end of file +pylint>=3.0.2 +mistralai>=0.1.5 \ No newline at end of file diff --git a/tests/test_anthropic.py b/tests/test_anthropic.py.hold similarity index 100% rename from tests/test_anthropic.py rename to tests/test_anthropic.py.hold diff --git a/tests/test_azure.py.hold b/tests/test_azure.py.hold new file mode 100644 index 0000000..bd2e21c --- /dev/null +++ b/tests/test_azure.py.hold @@ -0,0 +1,99 @@ +""" +Azure OpenAI Test Suite + +This module contains a suite of tests for OpenAI functionality using the OpenAI Python library. 
+It includes tests for various OpenAI API endpoints such as completions, chat completions, +embeddings creation, fine-tuning job creation, image generation, image variation creation, +and audio speech generation. + +The tests are designed to cover different aspects of OpenAI's capabilities and serve as a +validation mechanism for the integration with the Doku monitoring system. + +Global client and initialization are set up for the OpenAI client and Doku monitoring. + +Environment Variables: + - AZURE_OPENAI_API_TOKEN: OpenAI API key for authentication. + - DOKU_URL: Doku URL for monitoring data submission. + - DOKU_TOKEN: Doku authentication api_key. + +Note: Ensure the environment variables are properly set before running the tests. +""" + +import os +from openai import AzureOpenAI +import dokumetry + +# Global client +client = AzureOpenAI( + api_key=os.getenv("AZURE_OPENAI_API_TOKEN"), + api_version = "2024-02-01", + azure_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT") +) + +azure_chat_model = os.getenv("AZURE_OPENAI_CHAT_MODEL") +azure_embedding_model = os.getenv("AZURE_OPENAI_EMBEDDING_MODEL") +azure_image_model = os.getenv("AZURE_OPENAI_IMAGE_MODEL") + +# Global initialization +# pylint: disable=line-too-long +dokumetry.init(llm=client, doku_url=os.getenv("DOKU_URL"), api_key=os.getenv("DOKU_TOKEN"), environment="dokumetry-testing", application_name="dokumetry-python-test", skip_resp=False) + +def test_completion(): + """ + Test the completions. + + Raises: + AssertionError: If the completion response object is not as expected. + """ + + completions_resp = client.completions.create( + model=azure_chat_model, + prompt="Hello world", + max_tokens=100 + ) + assert completions_resp.object == 'text_completion' + +def test_chat_completion(): + """ + Test chat completions. + + Raises: + AssertionError: If the chat completion response object is not as expected. + """ + + chat_completions_resp = client.chat.completions.create( + model=azure_chat_model, + max_tokens=100, + messages=[{"role": "user", "content": "What is Grafana?"}] + ) + assert chat_completions_resp.object == 'chat.completion' + +def test_embedding_creation(): + """ + Test embedding creation. + + Raises: + AssertionError: If the embedding response object is not as expected. + """ + + embeddings_resp = client.embeddings.create( + model=azure_embedding_model, + input="The quick brown fox jumped over the lazy dog", + encoding_format="float" + ) + assert embeddings_resp.data[0].object == 'embedding' + +def test_image_generation(): + """ + Test image generation. + + Raises: + AssertionError: If the image generation response created timestamp is not present. + """ + + image_generation_resp = client.images.generate( + model=azure_chat_model, + prompt='Generate an image for LLM Observability Dashboard', + n=1 + ) + assert image_generation_resp.created is not None diff --git a/tests/test_mistral.py.hold b/tests/test_mistral.py.hold new file mode 100644 index 0000000..56bf5a9 --- /dev/null +++ b/tests/test_mistral.py.hold @@ -0,0 +1,62 @@ +""" +Mistral Test Suite + +This module contains a suite of tests for Mistral functionality +using the Mistral Python library. It includes tests for various +Mistral API endpoints such as text summarization, text generation +with a prompt,text embeddings creation, and chat-based +language understanding. + +The tests are designed to cover different aspects of Mistral's +capabilities and serve as a validation mechanism for the integration +with the Doku monitoring system. 
+ +Global Mistral client and initialization are set up for the +Mistral client and Doku monitoring. + +Environment Variables: + - Mistral_API_TOKEN: Mistral API api_key for authentication. + - DOKU_URL: Doku URL for monitoring data submission. + - DOKU_TOKEN: Doku authentication api_key. + +Note: Ensure the environment variables are properly set before running the tests. +""" + +import os +from mistralai.client import MistralClient +from mistralai.models.chat_completion import ChatMessage +import dokumetry + +# Global Mistral client +client = MistralClient( + api_key=os.getenv("MISTRAL_API_TOKEN") +) + +# Global Mistral initialization +# pylint: disable=line-too-long +dokumetry.init(llm=client, doku_url=os.getenv("DOKU_URL"), api_key=os.getenv("DOKU_TOKEN"), environment="dokumetry-testing", application_name="dokumetry-python-test", skip_resp=False) + +def test_chat(): + """ + Test the 'chat' function of the Mistral client. + """ + messages = [ + ChatMessage(role="user", content="What is the best French cheese?") + ] + + # No streaming + message = client.chat( + model="mistral-large-latest", + messages=messages, + ) + assert message.object == 'chat.completion' + +def test_embeddings(): + """ + Test the 'embeddings' function of the Mistral client. + """ + response = client.embeddings( + model="mistral-embed", + input=["Embed this sentence.", "As well as this one."], + ) + assert response.object == 'list' diff --git a/tests/test_openai.py b/tests/test_openai.py index bf753e5..c00d5ec 100644 --- a/tests/test_openai.py +++ b/tests/test_openai.py @@ -86,7 +86,7 @@ def test_fine_tuning_job_creation(): """ try: fine_tuning_job_resp = client.fine_tuning.jobs.create( - training_file="file-m36cc45komO83VJKAY1qVgeP", + training_file="file-BTkFuN0HKX3bAaOawvDtXgEe", model="gpt-3.5-turbo-1106" ) assert fine_tuning_job_resp.object == 'fine_tuning.job'
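
Usage sketch (not part of the patch above): with this change, a Mistral client is monitored the same way as the existing OpenAI, Anthropic, and Cohere integrations — `dokumetry.init()` detects the client type and patches `chat`, `chat_stream`, and `embeddings`. This is a minimal illustration based on the `__init__.py` signature and the test suite in this PR; the environment and application names are placeholders, not values from the diff.

```python
import os

from mistralai.client import MistralClient
from mistralai.models.chat_completion import ChatMessage

import dokumetry

# Create a Mistral client (pre-1.0 mistralai API, as pinned in pyproject.toml).
# MISTRAL_API_TOKEN mirrors the env var used by the tests in this PR.
client = MistralClient(api_key=os.getenv("MISTRAL_API_TOKEN"))

# Patch the client so calls are reported to Doku.
dokumetry.init(
    llm=client,
    doku_url=os.getenv("DOKU_URL"),
    api_key=os.getenv("DOKU_TOKEN"),
    environment="production",        # illustrative
    application_name="example-app",  # illustrative
    skip_resp=False,
)

# This call is recorded under the "mistral.chat" endpoint in Doku.
response = client.chat(
    model="mistral-large-latest",
    messages=[ChatMessage(role="user", content="What is the best French cheese?")],
)
print(response.choices[0].message.content)
```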