-
Notifications
You must be signed in to change notification settings - Fork 1
/
app.py
92 lines (74 loc) · 2.71 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import os
import time
import uvicorn
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import RedirectResponse
from src.base.leads_loader.airtable import AirtableLeadLoader
from src.vapi_automation import VapiAutomation
from dotenv import load_dotenv
# Load .env file
load_dotenv()
app = FastAPI()
# Set all CORS enabled origins
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
expose_headers=["*"],
)
# Initialize leads loader (Airtable, Sheets, Hubspot or add your own custom CRM)
lead_loader = AirtableLeadLoader(
access_token=os.getenv("AIRTABLE_ACCESS_TOKEN"),
base_id=os.getenv("AIRTABLE_BASE_ID"),
table_name=os.getenv("AIRTABLE_TABLE_NAME"),
)
# Get Vapi automation instance
automation = VapiAutomation(lead_loader)
@app.get("/")
async def redirect_root_to_docs():
return RedirectResponse("/docs")
@app.post("/execute")
async def execute(payload: dict):
"""
Trigger the lead processing workflow. Payload should contain list of lead IDs.
"""
try:
# Fetch leads based on provided IDs
lead_ids = payload.get("lead_ids", [])
print("Fetching lead data...")
leads = automation.load_leads(lead_ids=lead_ids)
if not leads:
return {"message": "No leads found."}
for lead in leads:
# Augment the lead data (web research, linkedIn profile,...)
automation.pre_call_processing(lead)
# Structure the required call parameters in Vapi format
call_params = automation.get_call_input_params(lead)
print("Call Inputs:\n", call_params)
# Initiate the call
print(f"Calling Lead {lead.id}...")
output = await automation.make_call(call_params)
# await 1s
time.sleep(1)
return {"message": "Calls initiated successfully for all leads."}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
print(e)
raise HTTPException(status_code=500, detail="An error occurred while executing the workflow")
@app.post("/webhook")
async def handle_webhook(request: Request):
"""
Handle incoming webhook requests from Vapi.
"""
try:
response = await automation.handle_webhook_call(request)
return response
except Exception as e:
print("Error processing webhook:", str(e))
raise HTTPException(status_code=400, detail="Invalid webhook payload")
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)