diff --git a/nerd-dictation b/nerd-dictation index 2b8c7fc..1418fba 100755 --- a/nerd-dictation +++ b/nerd-dictation @@ -33,6 +33,7 @@ import subprocess import sys import tempfile import time +from functools import partial # Types. from typing import ( @@ -94,12 +95,9 @@ def file_remove_if_exists(filepath: str) -> bool: return False -def file_handle_make_non_blocking(file_handle: IO[bytes]) -> None: - import fcntl - - # Get current `file_handle` flags. - flags = fcntl.fcntl(file_handle.fileno(), fcntl.F_GETFL) - fcntl.fcntl(file_handle, fcntl.F_SETFL, flags | os.O_NONBLOCK) +def enqueue_output(vosk_out, queue): + for block in iter(partial(vosk_out.read, 1024), b""): + queue.put(block) def execfile(filepath: str, mod: Optional[ModuleType] = None) -> Optional[ModuleType]: @@ -538,22 +536,38 @@ def text_from_vosk_pipe( ) sys.exit(1) - cmd = ( - "parec", - "--record", - "--rate=%d" % sample_rate, - "--channels=1", - *(("--device=%s" % pulse_device_name,) if pulse_device_name else ()), - "--format=s16ne", - "--latency=10", - ) + if os.name == "posix": + cmd = ( + "parec", + "--record", + "--rate=%d" % sample_rate, + "--channels=1", + *(("--device=%s" % pulse_device_name,) if pulse_device_name else ()), + "--format=s16ne", + "--latency=10", + ) + else: + # https://stsaz.github.io/fmedia/recording/#stdout + cmd = ( + "fmedia", + "--record", + "--out=@stdout.wav", + "--rate=%d" % sample_rate, + "--channels=mono", + "--format=int16", + "--notui", + ) ps = subprocess.Popen(cmd, stdout=subprocess.PIPE) - stdout = ps.stdout assert stdout is not None - # Needed so whatever is available can be read (without waiting). - file_handle_make_non_blocking(stdout) + from threading import Thread + from queue import Queue, Empty + + vosk_queue:Queue = Queue() + t = Thread(target=enqueue_output, args=(stdout, vosk_queue)) + t.daemon = True + t.start() # `mypy` doesn't know about VOSK. import vosk # type: ignore @@ -635,20 +649,19 @@ def text_from_vosk_pipe( # Skip idling in the event dictation can't keep up with the recording. idle_time_curr = time.time() idle_time_test = idle_time - (idle_time_curr - idle_time_prev) - if idle_time_test > 0.0: + if vosk_queue.empty() and idle_time_test > 0.0: # Prevents excessive processor load. time.sleep(idle_time_test) idle_time_prev = time.time() else: idle_time_prev = idle_time_curr - # Mostly the data read is quite small (under 1k). - # Only the 1st entry in the loop reads a lot of data due to the time it takes to initialize the VOSK module. - data = stdout.read(block_size) - - if data: + try: + data = vosk_queue.get_nowait() + except Empty: + pass + else: ok = rec.AcceptWaveform(data) - if ok: json_text = rec.Result() json_text_partial_prev = "" @@ -719,6 +732,7 @@ def main_begin( delay_exit: float = 0.0, punctuate_from_previous_timeout: float = 0.0, output: str = "TYPE", + input_method: str = "auto", ) -> None: """ Initialize audio recording, then full text to speech conversion can take place. @@ -732,13 +746,21 @@ def main_begin( if not vosk_model_dir: vosk_model_dir = calc_user_config_path("model") # If this still doesn't exist the error is handled later. - # # Initialize the recording state and perform some sanity checks. # if not path_to_cookie: path_to_cookie = os.path.join(tempfile.gettempdir(), TEMP_COOKIE_NAME) + if input_method == "pynput": + try: + from pynput.keyboard import Key, Controller # type: ignore + + keyboard = Controller() + except ImportError: + sys.stderr.write("Module 'pynput' is not installed. Defaulting input method to xdotool.") + input_method = "xdotool" + is_run_on = False if punctuate_from_previous_timeout > 0.0: age_in_seconds: Optional[float] = None @@ -840,27 +862,35 @@ def main_begin( if not text_block: pass elif text_block.startswith("\x08"): - cmd = ( - "xdotool", - "key", - "--clearmodifiers", - "--delay", - "10", - *(("BackSpace",) * len(text_block)), - ) - subprocess.check_output(cmd).decode("utf-8") + if input_method == "pynput": + keyboard.press(Key.backspace) + time.sleep(0.01) + keyboard.release(Key.backspace) + else: + cmd = ( + "xdotool", + "key", + "--clearmodifiers", + "--delay", + "10", + *(("BackSpace",) * len(text_block)), + ) + subprocess.check_output(cmd).decode("utf-8") else: - cmd = ( - "xdotool", - "type", - "--clearmodifiers", - # Use a value higher than twelve so the characters don't get skipped (tsk!). - "--delay", - "10", - "--", - text_block, - ) - subprocess.check_output(cmd).decode("utf-8") + if input_method == "pynput": + keyboard.type(text_block) + else: + cmd = ( + "xdotool", + "type", + "--clearmodifiers", + # Use a value higher than twelve so the characters don't get skipped (tsk!). + "--delay", + "10", + "--", + text_block, + ) + subprocess.check_output(cmd).decode("utf-8") elif output == "STDOUT": @@ -1110,6 +1140,19 @@ This creates the directory used to store internal data, so other commands such a ), ) + subparse.add_argument( + "--simulate-input-method", + dest="input_method", + default="auto", + choices=("auto", "pynput"), + help=( + "Choose the tool to simulate keyboard inputs. Valid choices are:\n" + "- ``auto`` uses xdotool with Xorg (default)\n" + "- ``pynput``\n" + ), + required=False, + ) + subparse.set_defaults( func=lambda args: main_begin( path_to_cookie=args.path_to_cookie, @@ -1126,6 +1169,7 @@ This creates the directory used to store internal data, so other commands such a delay_exit=args.delay_exit, punctuate_from_previous_timeout=args.punctuate_from_previous_timeout, output=args.output, + input_method=args.input_method, ), )