From 8481463d002ae112a726fb9970bb5b586fc77d2b Mon Sep 17 00:00:00 2001 From: prajjwalkumarpanzade Date: Fri, 20 Dec 2024 23:30:45 +0530 Subject: [PATCH 1/2] setup server for stream using sockets --- service/.gitignore | 168 +++++++++++++++++++++++++++++++++++++++++++++ service/app.py | 35 ++++++++++ service/chunks.py | 86 +++++++++++++++++++++++ service/client.py | 34 +++++++++ service/server.py | 83 ++++++++++++++++++++++ 5 files changed, 406 insertions(+) create mode 100644 service/.gitignore create mode 100644 service/app.py create mode 100644 service/chunks.py create mode 100644 service/client.py create mode 100644 service/server.py diff --git a/service/.gitignore b/service/.gitignore new file mode 100644 index 0000000..da17d7c --- /dev/null +++ b/service/.gitignore @@ -0,0 +1,168 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +# lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.nox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py +db.sqlite3 +db.sqlite3-journal + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +.pybuilder/ +target/ + +# Jupyter Notebook +.ipynb_checkpoints + +# IPython +profile_default/ +ipython_config.py + +# pyenv +# For a library or package, you might want to ignore these files since the code is +# intended to run in multiple environments; otherwise, check them in: +# .python-version + +# pipenv +# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. +# However, in case of collaboration, if having platform-specific dependencies or dependencies +# having no cross-platform support, pipenv may install dependencies that don't work, or not +# install all needed dependencies. +#Pipfile.lock + +# poetry +# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control. +# This is especially recommended for binary packages to ensure reproducibility, and is more +# commonly ignored for libraries. +# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control +#poetry.lock + +# pdm +# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control. +#pdm.lock +# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it +# in version control. +# https://pdm.fming.dev/latest/usage/project/#working-with-version-control +.pdm.toml +.pdm-python +.pdm-build/ + +# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm +__pypackages__/ + +# Celery stuff +celerybeat-schedule +celerybeat.pid + +# SageMath parsed files +*.sage.py + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ +whisper_model/ + +# Spyder project settings +.spyderproject +.spyproject + +# Rope project settings +.ropeproject + +# mkdocs documentation +/site + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json + +# Pyre type checker +.pyre/ + +# pytype static type analyzer +.pytype/ + +# Cython debug symbols +cython_debug/ + +# PyCharm +# JetBrains specific template is maintained in a separate JetBrains.gitignore that can +# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore +# and can be added to the global gitignore or merged into this file. For a more nuclear +# option (not recommended) you can uncomment the following to ignore the entire idea folder. +#.idea/ + +/service/service.json +/service/user_account.json +/service/models +service.json \ No newline at end of file diff --git a/service/app.py b/service/app.py new file mode 100644 index 0000000..c04fee4 --- /dev/null +++ b/service/app.py @@ -0,0 +1,35 @@ +from fastapi import FastAPI, UploadFile, File, HTTPException +from fastapi.responses import JSONResponse +import os + +app = FastAPI() + +# Directory where audio files will be saved +UPLOAD_DIRECTORY = "uploaded_audio_files" + +# Ensure the directory exists +if not os.path.exists(UPLOAD_DIRECTORY): + os.makedirs(UPLOAD_DIRECTORY) + +@app.post("/upload-audio") +async def upload_audio(file: UploadFile = File(...)): + try: + # Get the file type based on file extension + if not file.filename.endswith(('.m4a', '.mp4', '.mp3', '.webm', '.mpga', '.wav', '.mpeg', '.ogg')): + raise HTTPException(status_code=400, detail="Invalid file type") + + # Define the path to save the uploaded file + file_path = os.path.join(UPLOAD_DIRECTORY, file.filename) + + # Save the uploaded file locally without using shutil + with open(file_path, "wb") as f: + # Read the file in chunks and write directly to the file + while chunk := await file.read(1024): # Read 1024 bytes at a time + f.write(chunk) + + # Return a response with the file location + return JSONResponse(content={"message": "File uploaded successfully!", "file_path": file_path}, status_code=200) + + except Exception as e: + # Handle any unexpected errors + return JSONResponse(content={"message": str(e)}, status_code=500) diff --git a/service/chunks.py b/service/chunks.py new file mode 100644 index 0000000..55c5a76 --- /dev/null +++ b/service/chunks.py @@ -0,0 +1,86 @@ +import pyaudio +import wave +import sys +import subprocess +import socket +import threading +import time + +class AudioStreamClient: + def __init__(self, host=socket.gethostname(), port=5000): + self.host = host + self.port = port + self.chunk_size = 1024 + self.running = False + + def connect(self): + self.client_socket = socket.socket() + try: + self.client_socket.connect((self.host, self.port)) + return True + except Exception as e: + print(f"Connection error: {e}") + return False + + def stream_audio(self, filename): + if not self.connect(): + return + + try: + # Start FFmpeg process to convert input file to raw PCM + ffmpeg_process = subprocess.Popen([ + "ffmpeg", + "-i", filename, + "-loglevel", "panic", + "-vn", # Disable video + "-f", "s16le", # Output format + "-acodec", "pcm_s16le", # Audio codec + "-ar", "44100", # Sample rate + "-ac", "2", # Channels + "pipe:1" + ], stdout=subprocess.PIPE) + + print("Starting audio stream...") + self.running = True + + # Read and stream data + while self.running: + data = ffmpeg_process.stdout.read(self.chunk_size) + if len(data) == 0: + break + + # Send to socket + self.client_socket.send(data) + + print("Streaming finished") + + except Exception as e: + print(f"Streaming error: {e}") + + finally: + # Cleanup + if 'ffmpeg_process' in locals(): + ffmpeg_process.kill() + self.client_socket.close() + + def stop(self): + self.running = False + +def main(): + if len(sys.argv) < 2: + print(f"Usage: {sys.argv[0]} filename") + sys.exit(-1) + + client = AudioStreamClient() + + # Start streaming in a separate thread + stream_thread = threading.Thread(target=client.stream_audio, args=(sys.argv[1],)) + stream_thread.start() + + # Wait for user input to stop + input("Press Enter to stop streaming...") + client.stop() + stream_thread.join() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/service/client.py b/service/client.py new file mode 100644 index 0000000..7460055 --- /dev/null +++ b/service/client.py @@ -0,0 +1,34 @@ +import socket +import time + +class MP3StreamClient: + def __init__(self, host=socket.gethostname(), port=5000): + self.host = host + self.port = port + self.chunk_size = 8192 # Size of chunks to read from file + + def stream_file(self, filename): + client_socket = socket.socket() + try: + client_socket.connect((self.host, self.port)) + + with open(filename, 'rb') as file: + while True: + data = file.read(self.chunk_size) + if not data: + break + client_socket.send(data) + # Small delay to simulate real-time streaming + time.sleep(0.1) + + print("Finished streaming file") + + except Exception as e: + print(f"Error: {e}") + finally: + client_socket.close() + +if __name__ == '__main__': + client = MP3StreamClient() + # Replace with your MP3 file path + client.stream_file('uploaded_audio_files/recording.mp3') \ No newline at end of file diff --git a/service/server.py b/service/server.py new file mode 100644 index 0000000..7f79a89 --- /dev/null +++ b/service/server.py @@ -0,0 +1,83 @@ +import socket +import os +from datetime import datetime +import threading +import time + +class AudioStreamServer: + def __init__(self, host=socket.gethostname(), port=5000, chunk_duration=5): + self.host = host + self.port = port + self.chunk_duration = chunk_duration + self.buffer = bytearray() + self.buffer_lock = threading.Lock() + + # Create directory for saved chunks + self.save_dir = "ts_chunks" + if not os.path.exists(self.save_dir): + os.makedirs(self.save_dir) + + # Size of chunks to read from socket + self.socket_chunk_size = 8192 + + # Approximate size for 5 seconds of audio data (bitrate * duration) + # Assuming 128kbps audio + self.bytes_per_chunk = 128 * 1024 * chunk_duration // 8 + + def start(self): + server_socket = socket.socket() + server_socket.bind((self.host, self.port)) + server_socket.listen(1) + print(f"Server listening on port {self.port}...") + + processing_thread = threading.Thread(target=self.process_chunks) + processing_thread.daemon = True + processing_thread.start() + + while True: + conn, address = server_socket.accept() + print(f"Connection from: {address}") + self.handle_client(conn) + + def handle_client(self, conn): + try: + while True: + data = conn.recv(self.socket_chunk_size) + if not data: + break + + with self.buffer_lock: + self.buffer.extend(data) + except Exception as e: + print(f"Error handling client: {e}") + finally: + conn.close() + + def process_chunks(self): + while True: + with self.buffer_lock: + if len(self.buffer) >= self.bytes_per_chunk: + # Extract chunk + chunk_data = bytes(self.buffer[:self.bytes_per_chunk]) + self.buffer = self.buffer[self.bytes_per_chunk:] + self.save_chunk(chunk_data) + time.sleep(0.1) + + def save_chunk(self, chunk_data): + try: + # Generate filename with timestamp + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + filename = f"chunk_{timestamp}.ts" + filepath = os.path.join(self.save_dir, filename) + + # Save the audio chunk as a TS file + with open(filepath, 'wb') as f: + f.write(chunk_data) + print(f"Saved chunk as TS: {filename}") + + except Exception as e: + print(f"Error saving chunk: {e}") + +if __name__ == '__main__': + server = AudioStreamServer() + server.start() From 4133fd786a6ec50849e79705c592c8947ed0f020 Mon Sep 17 00:00:00 2001 From: prajjwalkumarpanzade Date: Sat, 21 Dec 2024 04:11:23 +0530 Subject: [PATCH 2/2] fast api server setup and webhook for dialogueflow --- service/app.py | 26 ++++++++++++++++++- service/chunks.py | 63 ++++++++++++++++++++--------------------------- service/server.py | 55 ++++++++++++++++++----------------------- 3 files changed, 76 insertions(+), 68 deletions(-) diff --git a/service/app.py b/service/app.py index c04fee4..461addf 100644 --- a/service/app.py +++ b/service/app.py @@ -1,4 +1,4 @@ -from fastapi import FastAPI, UploadFile, File, HTTPException +from fastapi import FastAPI, UploadFile, File, HTTPException, Request from fastapi.responses import JSONResponse import os @@ -33,3 +33,27 @@ async def upload_audio(file: UploadFile = File(...)): except Exception as e: # Handle any unexpected errors return JSONResponse(content={"message": str(e)}, status_code=500) + +@app.get("/") +def index(): + return {"message": "Hello World!"} + +# Function for webhook responses +async def results(request: Request): + try: + # Parse JSON request body + req = await request.json() + + # Extract action from the request + action = req.get('queryResult', {}).get('action', 'unknown') + + # Return a fulfillment response + return {"fulfillmentText": f"This is a response from webhook. Action: {action}"} + except Exception as e: + raise HTTPException(status_code=500, detail=f"Error processing request: {e}") + +# Route for webhook +@app.post("/webhook") +async def webhook(request: Request): + response = await results(request) + return JSONResponse(content=response) \ No newline at end of file diff --git a/service/chunks.py b/service/chunks.py index 55c5a76..2ea72c0 100644 --- a/service/chunks.py +++ b/service/chunks.py @@ -1,86 +1,77 @@ -import pyaudio -import wave -import sys -import subprocess import socket +import subprocess import threading -import time +import sys class AudioStreamClient: def __init__(self, host=socket.gethostname(), port=5000): self.host = host self.port = port - self.chunk_size = 1024 + self.chunk_size = 8192 self.running = False - + def connect(self): - self.client_socket = socket.socket() + self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: self.client_socket.connect((self.host, self.port)) return True except Exception as e: print(f"Connection error: {e}") return False - + def stream_audio(self, filename): if not self.connect(): return - + try: - # Start FFmpeg process to convert input file to raw PCM + # Convert MP3 to raw PCM using FFmpeg ffmpeg_process = subprocess.Popen([ "ffmpeg", - "-i", filename, + "-i", filename, # Input file "-loglevel", "panic", - "-vn", # Disable video - "-f", "s16le", # Output format - "-acodec", "pcm_s16le", # Audio codec + "-vn", # No video + "-f", "s16le", # Raw PCM output + "-acodec", "pcm_s16le", "-ar", "44100", # Sample rate - "-ac", "2", # Channels + "-ac", "2", # Stereo channels "pipe:1" - ], stdout=subprocess.PIPE) + ], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) print("Starting audio stream...") self.running = True - - # Read and stream data + while self.running: data = ffmpeg_process.stdout.read(self.chunk_size) - if len(data) == 0: + if not data: break - - # Send to socket - self.client_socket.send(data) - + self.client_socket.sendall(data) + print("Streaming finished") - + except Exception as e: print(f"Streaming error: {e}") - + finally: - # Cleanup - if 'ffmpeg_process' in locals(): - ffmpeg_process.kill() + ffmpeg_process.kill() self.client_socket.close() - + def stop(self): self.running = False + def main(): if len(sys.argv) < 2: print(f"Usage: {sys.argv[0]} filename") sys.exit(-1) - + client = AudioStreamClient() - - # Start streaming in a separate thread stream_thread = threading.Thread(target=client.stream_audio, args=(sys.argv[1],)) stream_thread.start() - - # Wait for user input to stop + input("Press Enter to stop streaming...") client.stop() stream_thread.join() + if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/service/server.py b/service/server.py index 7f79a89..d6494a4 100644 --- a/service/server.py +++ b/service/server.py @@ -5,40 +5,32 @@ import time class AudioStreamServer: - def __init__(self, host=socket.gethostname(), port=5000, chunk_duration=5): + def __init__(self, host=socket.gethostname(), port=5000, chunk_duration=10): self.host = host self.port = port self.chunk_duration = chunk_duration self.buffer = bytearray() self.buffer_lock = threading.Lock() - # Create directory for saved chunks - self.save_dir = "ts_chunks" - if not os.path.exists(self.save_dir): - os.makedirs(self.save_dir) - - # Size of chunks to read from socket + self.save_dir = "raw_chunks" + os.makedirs(self.save_dir, exist_ok=True) + self.socket_chunk_size = 8192 - - # Approximate size for 5 seconds of audio data (bitrate * duration) - # Assuming 128kbps audio - self.bytes_per_chunk = 128 * 1024 * chunk_duration // 8 + self.bytes_per_chunk = 44100 * 2 * 2 * self.chunk_duration # 44.1kHz, 16-bit, stereo def start(self): - server_socket = socket.socket() + server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server_socket.bind((self.host, self.port)) server_socket.listen(1) print(f"Server listening on port {self.port}...") - - processing_thread = threading.Thread(target=self.process_chunks) - processing_thread.daemon = True - processing_thread.start() - + + threading.Thread(target=self.process_chunks, daemon=True).start() + while True: conn, address = server_socket.accept() print(f"Connection from: {address}") - self.handle_client(conn) - + threading.Thread(target=self.handle_client, args=(conn,), daemon=True).start() + def handle_client(self, conn): try: while True: @@ -48,35 +40,36 @@ def handle_client(self, conn): with self.buffer_lock: self.buffer.extend(data) + print(f"Received {len(data)} bytes") + except Exception as e: - print(f"Error handling client: {e}") + print(f"Client handling error: {e}") finally: conn.close() - + def process_chunks(self): while True: with self.buffer_lock: if len(self.buffer) >= self.bytes_per_chunk: - # Extract chunk chunk_data = bytes(self.buffer[:self.bytes_per_chunk]) self.buffer = self.buffer[self.bytes_per_chunk:] - self.save_chunk(chunk_data) + self.save_chunk_as_raw(chunk_data) time.sleep(0.1) - - def save_chunk(self, chunk_data): + + def save_chunk_as_raw(self, chunk_data): try: - # Generate filename with timestamp timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - filename = f"chunk_{timestamp}.ts" + filename = f"chunk_{timestamp}.raw" filepath = os.path.join(self.save_dir, filename) - - # Save the audio chunk as a TS file + with open(filepath, 'wb') as f: f.write(chunk_data) - print(f"Saved chunk as TS: {filename}") + print(f"Saved chunk as RAW: {filename}") + except Exception as e: - print(f"Error saving chunk: {e}") + print(f"Error saving RAW file: {e}") + if __name__ == '__main__': server = AudioStreamServer()