From 021fbcf162714c123eb6faa07653822e9cedc000 Mon Sep 17 00:00:00 2001
From: patcher99
Date: Fri, 8 Mar 2024 15:55:09 +0530
Subject: [PATCH] async anthropic support

---
 src/dokumetry/__init__.py        |   8 +-
 src/dokumetry/async_anthropic.py | 140 +++++++++++++++++++++++++++++++
 2 files changed, 147 insertions(+), 1 deletion(-)
 create mode 100644 src/dokumetry/async_anthropic.py

diff --git a/src/dokumetry/__init__.py b/src/dokumetry/__init__.py
index f579eb0..d82ee78 100644
--- a/src/dokumetry/__init__.py
+++ b/src/dokumetry/__init__.py
@@ -4,7 +4,9 @@
 
 from .openai import init as init_openai
 from .anthropic import init as init_anthropic
+from .async_anthropic import init as init_async_anthropic
 from .cohere import init as init_cohere
+from anthropic import AsyncAnthropic, Anthropic
 
 # pylint: disable=too-few-public-methods
 class DokuConfig:
@@ -49,5 +51,9 @@ def init(llm, doku_url, api_key, environment="default", application_name="defaul
         init_cohere(llm, doku_url, api_key, environment, application_name, skip_resp)
         return
     elif hasattr(llm, 'messages') and callable(llm.messages.create):
-        init_anthropic(llm, doku_url, api_key, environment, application_name, skip_resp)
+        if isinstance(llm, AsyncAnthropic):
+            init_async_anthropic(llm, doku_url, api_key, environment, application_name, skip_resp)
+        elif isinstance(llm, Anthropic):
+            init_anthropic(llm, doku_url, api_key, environment, application_name, skip_resp)
+
         return
diff --git a/src/dokumetry/async_anthropic.py b/src/dokumetry/async_anthropic.py
new file mode 100644
index 0000000..17324a4
--- /dev/null
+++ b/src/dokumetry/async_anthropic.py
@@ -0,0 +1,140 @@
+"""
+Module for monitoring async Anthropic API calls.
+"""
+
+import time
+from .__helpers import send_data
+
+# pylint: disable=too-many-arguments
+def init(llm, doku_url, api_key, environment, application_name, skip_resp):
+    """
+    Initialize async Anthropic integration with Doku.
+
+    Args:
+        llm: The AsyncAnthropic client to be patched.
+        doku_url (str): Doku URL.
+        api_key (str): Authentication api_key.
+        environment (str): Doku environment.
+        application_name (str): Doku application name.
+        skip_resp (bool): Skip response processing.
+    """
+
+    original_messages_create = llm.messages.create
+
+    async def patched_messages_create(*args, **kwargs):
+        """
+        Patched async version of Anthropic's messages.create method.
+
+        Args:
+            *args: Variable positional arguments.
+            **kwargs: Variable keyword arguments.
+
+        Returns:
+            AnthropicResponse: The response from Anthropic's messages.create.
+ """ + streaming = kwargs.get('stream', False) + start_time = time.time() + if streaming: + async def stream_generator(): + accumulated_content = "" + async for event in await original_messages_create(*args, **kwargs): + if event.type == "message_start": + response_id = event.message.id + prompt_tokens = event.message.usage.input_tokens + if event.type == "content_block_delta": + accumulated_content += event.delta.text + if event.type == "message_delta": + completion_tokens = event.usage.output_tokens + yield event + end_time = time.time() + duration = end_time - start_time + message_prompt = kwargs.get('messages', "No prompt provided") + formatted_messages = [] + for message in message_prompt: + role = message["role"] + content = message["content"] + + if isinstance(content, list): + content_str = ", ".join( + #pylint: disable=line-too-long + f"{item['type']}: {item['text'] if 'text' in item else item['image_url']}" + if 'type' in item else f"text: {item['text']}" + for item in content + ) + formatted_messages.append(f"{role}: {content_str}") + else: + formatted_messages.append(f"{role}: {content}") + + prompt = "\n".join(formatted_messages) + data = { + "llmReqId": response_id, + "environment": environment, + "applicationName": application_name, + "sourceLanguage": "python", + "endpoint": "anthropic.messages", + "skipResp": skip_resp, + "requestDuration": duration, + "model": kwargs.get('model', "command"), + "prompt": prompt, + "response": accumulated_content, + "promptTokens": prompt_tokens, + "completionTokens": completion_tokens, + } + data["totalTokens"] = data["completionTokens"] + data["promptTokens"] + + send_data(data, doku_url, api_key) + + return stream_generator() + else: + start_time = time.time() + response = await original_messages_create(*args, **kwargs) + end_time = time.time() + duration = end_time - start_time + message_prompt = kwargs.get('messages', "No prompt provided") + formatted_messages = [] + + for message in message_prompt: + role = message["role"] + content = message["content"] + + if isinstance(content, list): + content_str = ", ".join( + f"{item['type']}: {item['text'] if 'text' in item else item['image_url']}" + if 'type' in item else f"text: {item['text']}" + for item in content + ) + formatted_messages.append(f"{role}: {content_str}") + else: + formatted_messages.append(f"{role}: {content}") + + prompt = "\n".join(formatted_messages) + + model = kwargs.get('model') + + prompt_tokens = response.usage.input_tokens + completion_tokens = response.usage.output_tokens + + data = { + "llmReqId": response.id, + "environment": environment, + "applicationName": application_name, + "sourceLanguage": "python", + "endpoint": "anthropic.messages", + "skipResp": skip_resp, + "completionTokens": completion_tokens, + "promptTokens": prompt_tokens, + "requestDuration": duration, + "model": model, + "prompt": prompt, + "finishReason": response.stop_reason, + "response": response.content[0].text + } + + send_data(data, doku_url, api_key) + + return response + + + llm.messages.create = patched_messages_create \ No newline at end of file