From a15f952f24729e49c27c7f3755347ddf6c0b8e19 Mon Sep 17 00:00:00 2001 From: Josh Allmann Date: Thu, 31 Oct 2024 07:22:29 +0000 Subject: [PATCH] live/infer: improve out-ffmpeg handling --- runner/app/live/trickle/media.py | 18 ++++++++--------- runner/app/live/trickle/segmenter.py | 30 +++++++++------------------- 2 files changed, 18 insertions(+), 30 deletions(-) diff --git a/runner/app/live/trickle/media.py b/runner/app/live/trickle/media.py index 6422732b..df8b4ad3 100644 --- a/runner/app/live/trickle/media.py +++ b/runner/app/live/trickle/media.py @@ -25,6 +25,7 @@ async def run_subscribe(subscribe_url: str, image_callback): subscribe_task = asyncio.create_task(subscribe(subscribe_url, ffmpeg.stdin)) jpeg_task = asyncio.create_task(parse_jpegs(ffmpeg.stdout, image_callback)) await asyncio.gather(ffmpeg.wait(), logging_task, subscribe_task, jpeg_task) + logging.info("run_subscribe complete") except Exception as e: logging.error(f"preprocess got error {e}", e) raise e @@ -120,14 +121,13 @@ async def parse_jpegs(in_pipe, image_callback): parser.feed(chunk) def feed_ffmpeg(ffmpeg_fd, image_generator): - with os.fdopen(ffmpeg_fd, 'wb', buffering=0) as ffmpeg: - while True: - image = image_generator.get() - if image is None: - logging.info("Image generator empty, leaving feed_ffmpeg") - break - ffmpeg.write(image) - ffmpeg.flush() + while True: + image = image_generator.get() + if image is None: + logging.info("Image generator empty, leaving feed_ffmpeg") + break + os.write(ffmpeg_fd, image) + os.close(ffmpeg_fd) async def run_publish(publish_url: str, image_generator): try: @@ -178,7 +178,7 @@ def joins(): segment_thread.join() ffmpeg_feeder.join() await asyncio.to_thread(joins) - logging.info("postprocess complete") + logging.info("run_publish complete") except Exception as e: logging.error(f"postprocess got error {e}", e) diff --git a/runner/app/live/trickle/segmenter.py b/runner/app/live/trickle/segmenter.py index 37b44875..bf6be8cb 100644 --- a/runner/app/live/trickle/segmenter.py +++ b/runner/app/live/trickle/segmenter.py @@ -11,7 +11,7 @@ from datetime import datetime # Constants and initial values -READ_TIMEOUT = 2 +READ_TIMEOUT = 20 SLEEP_INTERVAL = 0.05 # TODO make this better configurable @@ -35,7 +35,7 @@ def remove_named_pipe(pipe_name): if e.errno != errno.ENOENT: raise -def ffmpeg_cmd(in_pipe_fd, out_pattern): +def ffmpeg_cmd(out_pattern): if GPU: cmd = [ @@ -43,7 +43,7 @@ def ffmpeg_cmd(in_pipe_fd, out_pattern): '-loglevel', 'warning', '-f', 'image2pipe', '-framerate', f"{FRAMERATE}", - '-i', f'pipe:{in_pipe_fd}', + '-i', 'pipe:0', # stdin '-c:v', 'h264_nvenc', '-bf', '0', # disable bframes for webrtc '-g', f'{GOP_SECS*FRAMERATE}', @@ -55,11 +55,10 @@ def ffmpeg_cmd(in_pipe_fd, out_pattern): else: cmd = [ 'ffmpeg', - '-loglevel', 'info', + '-loglevel', 'warning', '-f', 'image2pipe', '-framerate', f"{FRAMERATE}", - '-i', f'pipe:{in_pipe_fd}', - #'-i', f'-', # stdin + '-i', 'pipe:0', # stdin '-c:v', 'libx264', '-bf', '0', # disable bframes for webrtc '-g', f'{GOP_SECS*FRAMERATE}', @@ -115,7 +114,6 @@ def read_from_pipe(pipe_name, callback, ffmpeg_proc): return True def segment_reading_process(in_fd, callback): - logging.info("JOSH - in segment reading process") pipe_index = 0 out_pattern = generate_random_string() + "-%d.ts" @@ -125,19 +123,15 @@ def segment_reading_process(in_fd, callback): # Launch FFmpeg process with stdin, stdout, and stderr as pipes proc = subprocess.Popen( - ffmpeg_cmd(in_fd, out_pattern), - stdin=subprocess.PIPE, - #stdin=in_fd, + ffmpeg_cmd(out_pattern), + stdin=in_fd, stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - pass_fds=(in_fd,), + stderr=subprocess.STDOUT, ) # Create a thread to handle stderr redirection - thread = threading.Thread(target=print_proc, args=(proc.stderr,)) + thread = threading.Thread(target=print_proc, args=(proc.stdout,)) thread.start() - thread2 = threading.Thread(target=print_proc, args=(proc.stdout,)) - thread2.start() try: while True: @@ -158,17 +152,11 @@ def segment_reading_process(in_fd, callback): finally: os.close(in_fd) - #proc.stdin.close() logging.info("awaitng ffmpeg (output)") proc.wait() logging.info("proc complete ffmpeg (output)") thread.join() - thread2.join() logging.info("ffmpeg (output) complete") - #(stdout, stderr) = proc.communicate() - #logging.info("FFmpeg (output)") - #logging.info(stderr.decode()) - #logging.info(stdout.decode()) # Cleanup remaining pipes remove_named_pipe(current_pipe)