Data Pipelines

This guide presents an example of using the Lamini LLM pipeline to implement a performant and reliable LLM processing workflow that generates meaningful questions and answers from the earnings call transcripts of publicly traded companies. You can think of the pipeline as "chat with earnings calls, in batches".

The source code is in generate_data.py, and we'll walk through it in the rest of this guide. Reading the source code and comments in generate_data.py is a good way to understand how GenerationPipeline works.

We use Llama 3 in this guide. Llama 3 can read English text and reason over it; we insert processing steps before and after the calls to the Llama 3 inference RPCs.

Run the following script to have Llama 3 read through the earnings calls, act as a financial analyst, ask relevant questions, and answer them using the source text.

cd 05_data_pipeline
python3 generate_data.py

Because the transcript file is massive, this example only generates QA pairs for its first line. Below is a sample of the data pipeline's output.

{
  "company": "WPP",
  "question": "What is the percentage growth rate of WPP's business in Germany in Q1, according to Mark Read?",
  "answer": "16%"
}
{
  "company": "GDOT",
  "question": "What is the size of the asset size that GDOT aims to maintain to protect its revenue",
  "answer": "According to the transcript, GDOT aims to maintain an asset size of $10 billion or less to protect its revenue"
}

Pipelines for performance and fault-tolerance

The Lamini LLM pipeline automatically distributes your LLM calls over the entire cluster, so you don't have to think about thread pools and batching.

LLMs are extremely computationally intensive. Even a modest amount of data (e.g. a few GBs) may require hundreds of GPUs to process quickly, so we recommend using this interface for any data processing involving more than ~100 LLM calls.

The pipeline also retries failed calls automatically, so transient failures in the Llama 3 inference RPCs do not bring down the whole run.

Building a Lamini pipeline

Overview

A Lamini LLM pipeline is a series of stages. Each stage is implemented as a subclass of GenerationNode; it accepts an AsyncGenerator and produces another AsyncGenerator.
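
As a rough sketch of what a single stage looks like (assuming the lamini SDK import paths used by generate_data.py and the same constructor arguments as the AnswerGenerator shown later; adapt as needed):

from lamini.generation.generation_node import GenerationNode
from lamini.generation.base_prompt_object import PromptObject


class MyStage(GenerationNode):
    def __init__(self):
        super().__init__(
            model_name="meta-llama/Meta-Llama-3-8B-Instruct", max_new_tokens=150
        )

    def preprocess(self, obj: PromptObject):
        # Build the prompt the LLM will see for this item.
        obj.prompt = "Summarize: " + obj.data["transcript"]

    def postprocess(self, obj: PromptObject):
        # Optionally transform obj.response before it flows to the next stage;
        # yield one or more PromptObjects to pass results along or fan them out.
        yield obj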

In this guide, the pipeline is defined in QuestionAnswerPipeline. It has two stages: QuestionGenerator and AnswerGenerator, as shown in the forward() function below.

class QuestionAnswerPipeline(GenerationPipeline):
    def __init__(self):
        super(QuestionAnswerPipeline, self).__init__()
        self.question_generator = QuestionGenerator()
        self.answer_generator = AnswerGenerator()

    def forward(self, x):
        x = self.question_generator(
            x,
            output_type={
                "question_1": "str",
                "question_2": "str",
                "question_3": "str",
            },
        )
        x = self.answer_generator(x)
        return x

We need to provide input to the pipeline and save its results. This is shown in run_pipeline() below, where the input is provided by load_earnings_calls() and the results are saved by save_answers():

async def run_pipeline():
    earnings_calls = load_earnings_calls()

    answers = QuestionAnswerPipeline().call(earnings_calls)

    await save_answers(answers)
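
run_pipeline() is a coroutine, so something has to drive the event loop. A typical entry point (a sketch; generate_data.py may wire this up differently) looks like:

import asyncio

if __name__ == "__main__":
    asyncio.run(run_pipeline())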

Data Loading

The input to the pipeline is provided by load_earnings_calls(), which is an AsyncGenerator, because GenerationNode subclasses require their input as an AsyncGenerator.

async def load_earnings_calls():
    path = "/app/lamini-earnings-sdk/data/test_set_transcripts.jsonl"

    with jsonlines.open(path) as reader:
        for line in itertools.islice(reader, 1):
            logger.info(f"Loaded earnings call for {line['ticker']}")
            yield PromptObject(prompt="", data=line)
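
The itertools.islice(reader, 1) call is what limits this example to the first transcript. A variant that streams, say, the first 100 transcripts instead (purely illustrative, assuming the same imports as generate_data.py) would look like:

async def load_earnings_calls(limit=100):
    path = "/app/lamini-earnings-sdk/data/test_set_transcripts.jsonl"

    with jsonlines.open(path) as reader:
        # Stream up to `limit` transcripts into the pipeline.
        for line in itertools.islice(reader, limit):
            logger.info(f"Loaded earnings call for {line['ticker']}")
            yield PromptObject(prompt="", data=line)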

QuestionGenerator

The first stage reads a passage from an earnings call and asks the LLM to generate three questions about it. This is achieved by the prompt built in make_prompt() and the output_type passed in forward(), which force the model to produce exactly three questions and let the pipeline parse them automatically. postprocess() then yields one PromptObject per question, so each passage fans out into three prompts for the next stage:

def postprocess(self, obj: PromptObject):
    response = obj.response
    questions = [
        response["question_1"],
        response["question_2"],
        response["question_3"],
    ]
    for question in questions:
        ans = PromptObject(prompt=question, data=obj.data.copy())
        yield ans

def make_prompt(self, obj):
    prompt = (
        "<s>[INSTR]You are a financial analyst with extensive experience at Goldman Sachs."
    )
    prompt += "You are reading the earnings call transcript for the following company:\n\n"
    prompt += "====================\n\n"
    prompt += get_company_info(obj) + "\n"
    prompt += "====================\n\n"
    prompt += (
        "You are reading the following section of the earnings call transcript:\n\n"
    )
    prompt += "====================\n\n"
    prompt += obj.data["transcript"]
    prompt += "====================\n\n"
    prompt += "Consider the numbers in the transcript. "
    prompt += "Ask three questions about the numbers in the transcript that require precise answers. "
    prompt += "Only ask questions that can be answered using the transcript."
    prompt += "[/INSTR]"
    return prompt
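
Because output_type requests three string fields, the response that postprocess() receives for each passage is already parsed into a dictionary shaped like the following (the question text here is purely illustrative):

{
    "question_1": "What was revenue growth in Q1?",
    "question_2": "How large is the share buyback program?",
    "question_3": "What is the full-year margin guidance?"
}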

preprocess & postprocess

You can define your own preprocess() to transform a GenerationNode's PromptObject before it is passed to the remote LLM inference API, and your own postprocess() to transform the result returned from the LLM inference API.

In this example, QuestionGenerator has its own preprocess() & postprocess():

def preprocess(self, obj: PromptObject):
    obj.prompt = self.make_prompt(obj)
    logger.info(f"Generating question for {obj.data['ticker']}, {obj.data['q']}")

def postprocess(self, obj: PromptObject):
    response = obj.response
    questions = [
        response["question_1"],
        response["question_2"],
        response["question_3"],
    ]
    for question in questions:
        ans = PromptObject(prompt=question, data=obj.data.copy())
        yield ans

AnswerGenerator

The answer generator is similar, just with a different prompt. You can control it by editing the prompt.

class AnswerGenerator(GenerationNode):
    def __init__(self):
        super(AnswerGenerator, self).__init__(
            model_name="meta-llama/Meta-Llama-3-8B-Instruct", max_new_tokens=150
        )

    def postprocess(self, obj: PromptObject):
        logger.info(f"Generated answer for {obj}")

    def preprocess(self, obj: PromptObject):
        obj.data["question"] = obj.prompt
        obj.prompt = self.make_prompt(obj)

    def make_prompt(self, obj: PromptObject):
        prompt = (
            "<s>[INSTR] You are a financial analyst with extensive experience at Goldman Sachs."
        )
        prompt += "You are reading the earnings call transcript for the following company:\n\n"
        prompt += "====================\n\n"
        prompt += get_company_info(obj)
        prompt += "====================\n\n"
        prompt += (
            "You are reading the following section of the earnings call transcript:\n\n"
        )
        prompt += "====================\n\n"
        prompt += obj.data["transcript"] + "\n"
        prompt += "====================\n\n"
        prompt += "Consider the numbers in the transcript. "
        prompt += "If the answer to the question cannot be found in the transcript, reply that you do not know. "
        prompt += "Answer the following questions about the numbers in the transcript. "
        prompt += obj.prompt
        prompt += "[/INSTR]"
        return prompt

Saving results

The output of the final GenerationNode is an AsyncGenerator that should be saved somewhere. This is done in save_answers(), which uses async for to iterate through the results and writes them to an output file. Since no output_type was requested from AnswerGenerator, the generated answer text is read from answer.response["output"].

async def save_answers(answers):
    path = "/app/lamini-earnings-sdk/data/results/generated_q_a.jsonl"

    with jsonlines.open(path, "w") as writer:
        pbar = tqdm(desc="Saving answers", unit=" answers")
        async for answer in answers:
            answer = {
                "ticker": answer.data["ticker"],
                "q": answer.data["q"],
                "date": answer.data["date"],
                "transcript": answer.data["transcript"],
                "prompt": answer.prompt,
                "question": answer.data["question"],
                "answer": answer.response["output"],
            }
            writer.write(answer)
            pbar.update()
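
Once the run finishes, the saved question-answer pairs are ordinary JSONL and can be loaded back for downstream use, for example:

import jsonlines

path = "/app/lamini-earnings-sdk/data/results/generated_q_a.jsonl"

with jsonlines.open(path) as reader:
    for record in reader:
        # Each record carries the original transcript metadata plus the QA pair.
        print(record["question"], "->", record["answer"])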