Skip to content

Commit

Permalink
fast api server setup and webhook for dialogueflow
Browse files Browse the repository at this point in the history
  • Loading branch information
prajjwalkumarpanzade committed Dec 20, 2024
1 parent 8481463 commit 4133fd7
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 68 deletions.
26 changes: 25 additions & 1 deletion service/app.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi import FastAPI, UploadFile, File, HTTPException, Request
from fastapi.responses import JSONResponse
import os

Expand Down Expand Up @@ -33,3 +33,27 @@ async def upload_audio(file: UploadFile = File(...)):
except Exception as e:
# Handle any unexpected errors
return JSONResponse(content={"message": str(e)}, status_code=500)

@app.get("/")
def index():
return {"message": "Hello World!"}

# Function for webhook responses
async def results(request: Request):
try:
# Parse JSON request body
req = await request.json()

# Extract action from the request
action = req.get('queryResult', {}).get('action', 'unknown')

# Return a fulfillment response
return {"fulfillmentText": f"This is a response from webhook. Action: {action}"}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error processing request: {e}")

# Route for webhook
@app.post("/webhook")
async def webhook(request: Request):
response = await results(request)
return JSONResponse(content=response)
63 changes: 27 additions & 36 deletions service/chunks.py
Original file line number Diff line number Diff line change
@@ -1,86 +1,77 @@
import pyaudio
import wave
import sys
import subprocess
import socket
import subprocess
import threading
import time
import sys

class AudioStreamClient:
def __init__(self, host=socket.gethostname(), port=5000):
self.host = host
self.port = port
self.chunk_size = 1024
self.chunk_size = 8192
self.running = False

def connect(self):
self.client_socket = socket.socket()
self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
self.client_socket.connect((self.host, self.port))
return True
except Exception as e:
print(f"Connection error: {e}")
return False

def stream_audio(self, filename):
if not self.connect():
return

try:
# Start FFmpeg process to convert input file to raw PCM
# Convert MP3 to raw PCM using FFmpeg
ffmpeg_process = subprocess.Popen([
"ffmpeg",
"-i", filename,
"-i", filename, # Input file
"-loglevel", "panic",
"-vn", # Disable video
"-f", "s16le", # Output format
"-acodec", "pcm_s16le", # Audio codec
"-vn", # No video
"-f", "s16le", # Raw PCM output
"-acodec", "pcm_s16le",
"-ar", "44100", # Sample rate
"-ac", "2", # Channels
"-ac", "2", # Stereo channels
"pipe:1"
], stdout=subprocess.PIPE)
], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)

print("Starting audio stream...")
self.running = True

# Read and stream data

while self.running:
data = ffmpeg_process.stdout.read(self.chunk_size)
if len(data) == 0:
if not data:
break

# Send to socket
self.client_socket.send(data)

self.client_socket.sendall(data)

print("Streaming finished")

except Exception as e:
print(f"Streaming error: {e}")

finally:
# Cleanup
if 'ffmpeg_process' in locals():
ffmpeg_process.kill()
ffmpeg_process.kill()
self.client_socket.close()

def stop(self):
self.running = False


def main():
if len(sys.argv) < 2:
print(f"Usage: {sys.argv[0]} filename")
sys.exit(-1)

client = AudioStreamClient()

# Start streaming in a separate thread
stream_thread = threading.Thread(target=client.stream_audio, args=(sys.argv[1],))
stream_thread.start()

# Wait for user input to stop

input("Press Enter to stop streaming...")
client.stop()
stream_thread.join()


if __name__ == "__main__":
main()
main()
55 changes: 24 additions & 31 deletions service/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,40 +5,32 @@
import time

class AudioStreamServer:
def __init__(self, host=socket.gethostname(), port=5000, chunk_duration=5):
def __init__(self, host=socket.gethostname(), port=5000, chunk_duration=10):
self.host = host
self.port = port
self.chunk_duration = chunk_duration
self.buffer = bytearray()
self.buffer_lock = threading.Lock()

# Create directory for saved chunks
self.save_dir = "ts_chunks"
if not os.path.exists(self.save_dir):
os.makedirs(self.save_dir)

# Size of chunks to read from socket
self.save_dir = "raw_chunks"
os.makedirs(self.save_dir, exist_ok=True)

self.socket_chunk_size = 8192

# Approximate size for 5 seconds of audio data (bitrate * duration)
# Assuming 128kbps audio
self.bytes_per_chunk = 128 * 1024 * chunk_duration // 8
self.bytes_per_chunk = 44100 * 2 * 2 * self.chunk_duration # 44.1kHz, 16-bit, stereo

def start(self):
server_socket = socket.socket()
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind((self.host, self.port))
server_socket.listen(1)
print(f"Server listening on port {self.port}...")

processing_thread = threading.Thread(target=self.process_chunks)
processing_thread.daemon = True
processing_thread.start()


threading.Thread(target=self.process_chunks, daemon=True).start()

while True:
conn, address = server_socket.accept()
print(f"Connection from: {address}")
self.handle_client(conn)
threading.Thread(target=self.handle_client, args=(conn,), daemon=True).start()

def handle_client(self, conn):
try:
while True:
Expand All @@ -48,35 +40,36 @@ def handle_client(self, conn):

with self.buffer_lock:
self.buffer.extend(data)
print(f"Received {len(data)} bytes")

except Exception as e:
print(f"Error handling client: {e}")
print(f"Client handling error: {e}")
finally:
conn.close()

def process_chunks(self):
while True:
with self.buffer_lock:
if len(self.buffer) >= self.bytes_per_chunk:
# Extract chunk
chunk_data = bytes(self.buffer[:self.bytes_per_chunk])
self.buffer = self.buffer[self.bytes_per_chunk:]
self.save_chunk(chunk_data)
self.save_chunk_as_raw(chunk_data)
time.sleep(0.1)
def save_chunk(self, chunk_data):

def save_chunk_as_raw(self, chunk_data):
try:
# Generate filename with timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"chunk_{timestamp}.ts"
filename = f"chunk_{timestamp}.raw"
filepath = os.path.join(self.save_dir, filename)

# Save the audio chunk as a TS file

with open(filepath, 'wb') as f:
f.write(chunk_data)
print(f"Saved chunk as TS: {filename}")

print(f"Saved chunk as RAW: {filename}")

except Exception as e:
print(f"Error saving chunk: {e}")
print(f"Error saving RAW file: {e}")


if __name__ == '__main__':
server = AudioStreamServer()
Expand Down

0 comments on commit 4133fd7

Please sign in to comment.