From 4133fd786a6ec50849e79705c592c8947ed0f020 Mon Sep 17 00:00:00 2001 From: prajjwalkumarpanzade Date: Sat, 21 Dec 2024 04:11:23 +0530 Subject: [PATCH] fast api server setup and webhook for dialogueflow --- service/app.py | 26 ++++++++++++++++++- service/chunks.py | 63 ++++++++++++++++++++--------------------------- service/server.py | 55 ++++++++++++++++++----------------------- 3 files changed, 76 insertions(+), 68 deletions(-) diff --git a/service/app.py b/service/app.py index c04fee4..461addf 100644 --- a/service/app.py +++ b/service/app.py @@ -1,4 +1,4 @@ -from fastapi import FastAPI, UploadFile, File, HTTPException +from fastapi import FastAPI, UploadFile, File, HTTPException, Request from fastapi.responses import JSONResponse import os @@ -33,3 +33,27 @@ async def upload_audio(file: UploadFile = File(...)): except Exception as e: # Handle any unexpected errors return JSONResponse(content={"message": str(e)}, status_code=500) + +@app.get("/") +def index(): + return {"message": "Hello World!"} + +# Function for webhook responses +async def results(request: Request): + try: + # Parse JSON request body + req = await request.json() + + # Extract action from the request + action = req.get('queryResult', {}).get('action', 'unknown') + + # Return a fulfillment response + return {"fulfillmentText": f"This is a response from webhook. Action: {action}"} + except Exception as e: + raise HTTPException(status_code=500, detail=f"Error processing request: {e}") + +# Route for webhook +@app.post("/webhook") +async def webhook(request: Request): + response = await results(request) + return JSONResponse(content=response) \ No newline at end of file diff --git a/service/chunks.py b/service/chunks.py index 55c5a76..2ea72c0 100644 --- a/service/chunks.py +++ b/service/chunks.py @@ -1,86 +1,77 @@ -import pyaudio -import wave -import sys -import subprocess import socket +import subprocess import threading -import time +import sys class AudioStreamClient: def __init__(self, host=socket.gethostname(), port=5000): self.host = host self.port = port - self.chunk_size = 1024 + self.chunk_size = 8192 self.running = False - + def connect(self): - self.client_socket = socket.socket() + self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: self.client_socket.connect((self.host, self.port)) return True except Exception as e: print(f"Connection error: {e}") return False - + def stream_audio(self, filename): if not self.connect(): return - + try: - # Start FFmpeg process to convert input file to raw PCM + # Convert MP3 to raw PCM using FFmpeg ffmpeg_process = subprocess.Popen([ "ffmpeg", - "-i", filename, + "-i", filename, # Input file "-loglevel", "panic", - "-vn", # Disable video - "-f", "s16le", # Output format - "-acodec", "pcm_s16le", # Audio codec + "-vn", # No video + "-f", "s16le", # Raw PCM output + "-acodec", "pcm_s16le", "-ar", "44100", # Sample rate - "-ac", "2", # Channels + "-ac", "2", # Stereo channels "pipe:1" - ], stdout=subprocess.PIPE) + ], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) print("Starting audio stream...") self.running = True - - # Read and stream data + while self.running: data = ffmpeg_process.stdout.read(self.chunk_size) - if len(data) == 0: + if not data: break - - # Send to socket - self.client_socket.send(data) - + self.client_socket.sendall(data) + print("Streaming finished") - + except Exception as e: print(f"Streaming error: {e}") - + finally: - # Cleanup - if 'ffmpeg_process' in locals(): - ffmpeg_process.kill() + ffmpeg_process.kill() self.client_socket.close() - + def stop(self): self.running = False + def main(): if len(sys.argv) < 2: print(f"Usage: {sys.argv[0]} filename") sys.exit(-1) - + client = AudioStreamClient() - - # Start streaming in a separate thread stream_thread = threading.Thread(target=client.stream_audio, args=(sys.argv[1],)) stream_thread.start() - - # Wait for user input to stop + input("Press Enter to stop streaming...") client.stop() stream_thread.join() + if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/service/server.py b/service/server.py index 7f79a89..d6494a4 100644 --- a/service/server.py +++ b/service/server.py @@ -5,40 +5,32 @@ import time class AudioStreamServer: - def __init__(self, host=socket.gethostname(), port=5000, chunk_duration=5): + def __init__(self, host=socket.gethostname(), port=5000, chunk_duration=10): self.host = host self.port = port self.chunk_duration = chunk_duration self.buffer = bytearray() self.buffer_lock = threading.Lock() - # Create directory for saved chunks - self.save_dir = "ts_chunks" - if not os.path.exists(self.save_dir): - os.makedirs(self.save_dir) - - # Size of chunks to read from socket + self.save_dir = "raw_chunks" + os.makedirs(self.save_dir, exist_ok=True) + self.socket_chunk_size = 8192 - - # Approximate size for 5 seconds of audio data (bitrate * duration) - # Assuming 128kbps audio - self.bytes_per_chunk = 128 * 1024 * chunk_duration // 8 + self.bytes_per_chunk = 44100 * 2 * 2 * self.chunk_duration # 44.1kHz, 16-bit, stereo def start(self): - server_socket = socket.socket() + server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.bind((self.host, self.port)) server_socket.listen(1) print(f"Server listening on port {self.port}...") - - processing_thread = threading.Thread(target=self.process_chunks) - processing_thread.daemon = True - processing_thread.start() - + + threading.Thread(target=self.process_chunks, daemon=True).start() + while True: conn, address = server_socket.accept() print(f"Connection from: {address}") - self.handle_client(conn) - + threading.Thread(target=self.handle_client, args=(conn,), daemon=True).start() + def handle_client(self, conn): try: while True: @@ -48,35 +40,36 @@ def handle_client(self, conn): with self.buffer_lock: self.buffer.extend(data) + print(f"Received {len(data)} bytes") + except Exception as e: - print(f"Error handling client: {e}") + print(f"Client handling error: {e}") finally: conn.close() - + def process_chunks(self): while True: with self.buffer_lock: if len(self.buffer) >= self.bytes_per_chunk: - # Extract chunk chunk_data = bytes(self.buffer[:self.bytes_per_chunk]) self.buffer = self.buffer[self.bytes_per_chunk:] - self.save_chunk(chunk_data) + self.save_chunk_as_raw(chunk_data) time.sleep(0.1) - - def save_chunk(self, chunk_data): + + def save_chunk_as_raw(self, chunk_data): try: - # Generate filename with timestamp timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - filename = f"chunk_{timestamp}.ts" + filename = f"chunk_{timestamp}.raw" filepath = os.path.join(self.save_dir, filename) - - # Save the audio chunk as a TS file + with open(filepath, 'wb') as f: f.write(chunk_data) - print(f"Saved chunk as TS: {filename}") + print(f"Saved chunk as RAW: {filename}") + except Exception as e: - print(f"Error saving chunk: {e}") + print(f"Error saving RAW file: {e}") + if __name__ == '__main__': server = AudioStreamServer()