This repository has been archived by the owner on Apr 18, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
2 changed files
with
145 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,138 @@ | ||
""" | ||
Module for monitoring Anthropic API calls. | ||
""" | ||
|
||
import time | ||
from .__helpers import send_data | ||
|
||
# pylint: disable=too-many-arguments | ||
def init(llm, doku_url, api_key, environment, application_name, skip_resp):
    """
    Initialize Anthropic integration with Doku.

    Monkey-patches ``llm.messages.create`` so that every call — streaming
    or not — is timed and its prompt, response text and token usage are
    forwarded to Doku via ``send_data``. The original call semantics are
    preserved for the caller.

    Args:
        llm: The Anthropic client whose ``messages.create`` is patched.
        doku_url (str): Doku URL.
        api_key (str): Authentication api_key.
        environment (str): Doku environment.
        application_name (str): Doku application name.
        skip_resp (bool): Skip response processing.
    """

    original_messages_create = llm.messages.create

    def format_prompt(message_prompt):
        """Flatten the 'messages' kwarg into one 'role: content' string per line."""
        if not isinstance(message_prompt, list):
            # 'messages' missing or malformed — report it verbatim instead of
            # crashing while iterating a non-list (original code would raise).
            return str(message_prompt)
        formatted_messages = []
        for message in message_prompt:
            role = message["role"]
            content = message["content"]
            if isinstance(content, list):
                # Multi-part content: each item is a typed block (text / image_url).
                content_str = ", ".join(
                    # pylint: disable=line-too-long
                    f"{item['type']}: {item['text'] if 'text' in item else item['image_url']}"
                    if 'type' in item else f"text: {item['text']}"
                    for item in content
                )
                formatted_messages.append(f"{role}: {content_str}")
            else:
                formatted_messages.append(f"{role}: {content}")
        return "\n".join(formatted_messages)

    async def patched_messages_create(*args, **kwargs):
        """
        Patched version of Anthropic's messages.create method.

        Args:
            *args: Variable positional arguments.
            **kwargs: Variable keyword arguments.

        Returns:
            The response from Anthropic's messages.create, or an async
            generator wrapping the event stream when ``stream=True``.
        """
        streaming = kwargs.get('stream', False)
        start_time = time.time()

        if streaming:
            async def stream_generator():
                accumulated_content = ""
                # Defaults guard against streams that finish before the
                # corresponding events arrive; the original code raised
                # NameError in that case.
                response_id = ""
                prompt_tokens = 0
                completion_tokens = 0
                async for event in await original_messages_create(*args, **kwargs):
                    # Event types are mutually exclusive, so dispatch with elif.
                    if event.type == "message_start":
                        response_id = event.message.id
                        prompt_tokens = event.message.usage.input_tokens
                    elif event.type == "content_block_delta":
                        accumulated_content += event.delta.text
                    elif event.type == "message_delta":
                        completion_tokens = event.usage.output_tokens
                    yield event
                duration = time.time() - start_time
                prompt = format_prompt(kwargs.get('messages', "No prompt provided"))
                data = {
                    "llmReqId": response_id,
                    "environment": environment,
                    "applicationName": application_name,
                    "sourceLanguage": "python",
                    "endpoint": "anthropic.messages",
                    "skipResp": skip_resp,
                    "requestDuration": duration,
                    # No default model: unified with the non-streaming branch
                    # (the old "command" fallback was a Cohere model name).
                    "model": kwargs.get('model'),
                    "prompt": prompt,
                    "response": accumulated_content,
                    "promptTokens": prompt_tokens,
                    "completionTokens": completion_tokens,
                    "totalTokens": prompt_tokens + completion_tokens,
                }

                send_data(data, doku_url, api_key)

            return stream_generator()

        response = await original_messages_create(*args, **kwargs)
        duration = time.time() - start_time
        prompt = format_prompt(kwargs.get('messages', "No prompt provided"))

        data = {
            "llmReqId": response.id,
            "environment": environment,
            "applicationName": application_name,
            "sourceLanguage": "python",
            "endpoint": "anthropic.messages",
            "skipResp": skip_resp,
            "completionTokens": response.usage.output_tokens,
            "promptTokens": response.usage.input_tokens,
            "requestDuration": duration,
            "model": kwargs.get('model'),
            "prompt": prompt,
            "finishReason": response.stop_reason,
            "response": response.content[0].text
        }

        send_data(data, doku_url, api_key)

        return response

    llm.messages.create = patched_messages_create