diff --git a/dot-env.template b/dot-env.template
index 597c85260..d5f3cebf6 100644
--- a/dot-env.template
+++ b/dot-env.template
@@ -60,3 +60,8 @@ SIMLI_FACE_ID=...
 
 # Krisp
 KRISP_MODEL_PATH=...
+
+# Langfuse
+LANGFUSE_HOST=...
+LANGFUSE_PUBLIC_KEY=...
+LANGFUSE_SECRET_KEY=...
diff --git a/examples/foundational/28-langfuse-llm-tracing.py b/examples/foundational/28-langfuse-llm-tracing.py
new file mode 100644
index 000000000..53aa96546
--- /dev/null
+++ b/examples/foundational/28-langfuse-llm-tracing.py
@@ -0,0 +1,77 @@
+#
+# Copyright (c) 2024, Daily
+#
+# SPDX-License-Identifier: BSD 2-Clause License
+#
+
+import asyncio
+import aiohttp
+import os
+import sys
+
+from langfuse.decorators import observe, langfuse_context
+
+from pipecat.frames.frames import EndFrame, LLMMessagesFrame
+from pipecat.pipeline.pipeline import Pipeline
+from pipecat.pipeline.runner import PipelineRunner
+from pipecat.pipeline.task import PipelineTask
+from pipecat.services.cartesia import CartesiaTTSService
+from pipecat.services.openai import OpenAILLMService
+from pipecat.transports.services.daily import DailyParams, DailyTransport
+
+from runner import configure
+
+from loguru import logger
+
+from dotenv import load_dotenv
+
+load_dotenv(override=True)
+
+logger.remove(0)
+logger.add(sys.stderr, level="DEBUG")
+
+@observe()
+async def main():
+
+    async with aiohttp.ClientSession() as session:
+        (room_url, _) = await configure(session)
+
+        langfuse_context.update_current_trace(
+            # session_id=,  # fill in with session_id
+            # user_id=user_id,  # fill in with user_id/participant
+            metadata={
+                "room_url": room_url
+            }, tags=["langfuse", "llm", "daily"]
+        )
+
+        transport = DailyTransport(
+            room_url, None, "Say One Thing From an LLM", DailyParams(audio_out_enabled=True)
+        )
+
+        tts = CartesiaTTSService(
+            api_key=os.getenv("CARTESIA_API_KEY"),
+            voice_id="79a125e8-cd45-4c13-8a67-188112f4dd22",  # British Lady
+        )
+
+        llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o")
+
+        messages = [
+            {
+                "role": "system",
+                "content": "You are an LLM in a WebRTC session, and this is a 'hello world' demo. Say hello to the world.",
+            }
+        ]
+
+        runner = PipelineRunner()
+
+        task = PipelineTask(Pipeline([llm, tts, transport.output()]))
+
+        @transport.event_handler("on_first_participant_joined")
+        async def on_first_participant_joined(transport, participant):
+            await task.queue_frames([LLMMessagesFrame(messages), EndFrame()])
+
+        await runner.run(task)
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/pyproject.toml b/pyproject.toml
index f3f7d6050..5e311319d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -55,6 +55,7 @@ gstreamer = [ "pygobject~=3.48.2" ]
 fireworks = [ "openai~=1.57.2" ]
 krisp = [ "pipecat-ai-krisp~=0.3.0" ]
 langchain = [ "langchain~=0.2.14", "langchain-community~=0.2.12", "langchain-openai~=0.1.20" ]
+langfuse = [ "langfuse==2.54.1" ]
 livekit = [ "livekit~=0.17.5", "livekit-api~=0.7.1", "tenacity~=8.5.0" ]
 lmnt = [ "lmnt~=1.1.4" ]
 local = [ "pyaudio~=0.2.14" ]
diff --git a/src/pipecat/services/openai.py b/src/pipecat/services/openai.py
index 43ad16536..00789818d 100644
--- a/src/pipecat/services/openai.py
+++ b/src/pipecat/services/openai.py
@@ -64,6 +64,29 @@
     raise Exception(f"Missing module: {e}")
 
+try:
+    from langfuse.openai import AsyncOpenAI
+    from langfuse.decorators import observe
+except ModuleNotFoundError as e:
+    logger.warning(f"Langfuse is not installed. Exception: {e}")
+    logger.warning(
+        "Langfuse integration is optional. "
+        "To enable it, install the Langfuse package with "
+        "`pip install pipecat-ai[langfuse]` and set the `LANGFUSE_HOST`, "
+        "`LANGFUSE_PUBLIC_KEY`, and `LANGFUSE_SECRET_KEY` environment variables."
+    )
+
+    # Dummy observe decorator
+    def observe(*args, **kwargs):
+        def decorator(func):
+            async def wrapper(*func_args, **func_kwargs):
+                # Note at debug level that the no-op fallback decorator is in use.
+                logger.debug("Using dummy observe decorator.")
+                return await func(*func_args, **func_kwargs)
+
+            return wrapper
+
+        return decorator
+
 
 ValidVoice = Literal["alloy", "echo", "fable", "onyx", "nova", "shimmer"]
 
 VALID_VOICES: Dict[str, ValidVoice] = {
@@ -144,6 +167,7 @@ def create_client(self, api_key=None, base_url=None, **kwargs):
     def can_generate_metrics(self) -> bool:
         return True
 
+    @observe()
     async def get_chat_completions(
         self, context: OpenAILLMContext, messages: List[ChatCompletionMessageParam]
     ) -> AsyncStream[ChatCompletionChunk]: