Skip to content

Commit

Permalink
live/infer: improve out-ffmpeg handling
Browse files Browse the repository at this point in the history
  • Loading branch information
j0sh authored and victorges committed Nov 1, 2024
1 parent 894f817 commit a15f952
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 30 deletions.
18 changes: 9 additions & 9 deletions runner/app/live/trickle/media.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ async def run_subscribe(subscribe_url: str, image_callback):
subscribe_task = asyncio.create_task(subscribe(subscribe_url, ffmpeg.stdin))
jpeg_task = asyncio.create_task(parse_jpegs(ffmpeg.stdout, image_callback))
await asyncio.gather(ffmpeg.wait(), logging_task, subscribe_task, jpeg_task)
logging.info("run_subscribe complete")
except Exception as e:
logging.error(f"preprocess got error {e}", e)
raise e
Expand Down Expand Up @@ -120,14 +121,13 @@ async def parse_jpegs(in_pipe, image_callback):
parser.feed(chunk)

def feed_ffmpeg(ffmpeg_fd, image_generator):
with os.fdopen(ffmpeg_fd, 'wb', buffering=0) as ffmpeg:
while True:
image = image_generator.get()
if image is None:
logging.info("Image generator empty, leaving feed_ffmpeg")
break
ffmpeg.write(image)
ffmpeg.flush()
while True:
image = image_generator.get()
if image is None:
logging.info("Image generator empty, leaving feed_ffmpeg")
break
os.write(ffmpeg_fd, image)
os.close(ffmpeg_fd)

async def run_publish(publish_url: str, image_generator):
try:
Expand Down Expand Up @@ -178,7 +178,7 @@ def joins():
segment_thread.join()
ffmpeg_feeder.join()
await asyncio.to_thread(joins)
logging.info("postprocess complete")
logging.info("run_publish complete")

except Exception as e:
logging.error(f"postprocess got error {e}", e)
Expand Down
30 changes: 9 additions & 21 deletions runner/app/live/trickle/segmenter.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from datetime import datetime

# Constants and initial values
READ_TIMEOUT = 2
READ_TIMEOUT = 20
SLEEP_INTERVAL = 0.05

# TODO make this better configurable
Expand All @@ -35,15 +35,15 @@ def remove_named_pipe(pipe_name):
if e.errno != errno.ENOENT:
raise

def ffmpeg_cmd(in_pipe_fd, out_pattern):
def ffmpeg_cmd(out_pattern):

if GPU:
cmd = [
'ffmpeg',
'-loglevel', 'warning',
'-f', 'image2pipe',
'-framerate', f"{FRAMERATE}",
'-i', f'pipe:{in_pipe_fd}',
'-i', 'pipe:0', # stdin
'-c:v', 'h264_nvenc',
'-bf', '0', # disable bframes for webrtc
'-g', f'{GOP_SECS*FRAMERATE}',
Expand All @@ -55,11 +55,10 @@ def ffmpeg_cmd(in_pipe_fd, out_pattern):
else:
cmd = [
'ffmpeg',
'-loglevel', 'info',
'-loglevel', 'warning',
'-f', 'image2pipe',
'-framerate', f"{FRAMERATE}",
'-i', f'pipe:{in_pipe_fd}',
#'-i', f'-', # stdin
'-i', 'pipe:0', # stdin
'-c:v', 'libx264',
'-bf', '0', # disable bframes for webrtc
'-g', f'{GOP_SECS*FRAMERATE}',
Expand Down Expand Up @@ -115,7 +114,6 @@ def read_from_pipe(pipe_name, callback, ffmpeg_proc):
return True

def segment_reading_process(in_fd, callback):
logging.info("JOSH - in segment reading process")
pipe_index = 0
out_pattern = generate_random_string() + "-%d.ts"

Expand All @@ -125,19 +123,15 @@ def segment_reading_process(in_fd, callback):

# Launch FFmpeg process with stdin, stdout, and stderr as pipes
proc = subprocess.Popen(
ffmpeg_cmd(in_fd, out_pattern),
stdin=subprocess.PIPE,
#stdin=in_fd,
ffmpeg_cmd(out_pattern),
stdin=in_fd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
pass_fds=(in_fd,),
stderr=subprocess.STDOUT,
)

# Create a thread to handle stderr redirection
thread = threading.Thread(target=print_proc, args=(proc.stderr,))
thread = threading.Thread(target=print_proc, args=(proc.stdout,))
thread.start()
thread2 = threading.Thread(target=print_proc, args=(proc.stdout,))
thread2.start()

try:
while True:
Expand All @@ -158,17 +152,11 @@ def segment_reading_process(in_fd, callback):

finally:
os.close(in_fd)
#proc.stdin.close()
logging.info("awaitng ffmpeg (output)")
proc.wait()
logging.info("proc complete ffmpeg (output)")
thread.join()
thread2.join()
logging.info("ffmpeg (output) complete")
#(stdout, stderr) = proc.communicate()
#logging.info("FFmpeg (output)")
#logging.info(stderr.decode())
#logging.info(stdout.decode())

# Cleanup remaining pipes
remove_named_pipe(current_pipe)
Expand Down

0 comments on commit a15f952

Please sign in to comment.