From e378cabebe6ffa065b360ddb7d8b031f1d8a2228 Mon Sep 17 00:00:00 2001 From: salt-die Date: Mon, 22 Jul 2024 05:37:03 -0500 Subject: [PATCH] Minor rewrite of `Vt100Terminal` for clarity. --- src/batgrl/app.py | 6 +-- src/batgrl/terminal/__init__.py | 8 +-- src/batgrl/terminal/linux_terminal.py | 29 ++++++----- src/batgrl/terminal/vt100_terminal.py | 69 ++++++++++++++----------- src/batgrl/terminal/windows_terminal.py | 51 +++++++++--------- 5 files changed, 88 insertions(+), 75 deletions(-) diff --git a/src/batgrl/app.py b/src/batgrl/app.py index b8bc0fee..d65788fa 100644 --- a/src/batgrl/app.py +++ b/src/batgrl/app.py @@ -301,8 +301,8 @@ def determine_nclicks(mouse_event: MouseEvent) -> None: last_mouse_nclicks = mouse_event.nclicks last_mouse_time = current_time - def dispatch_events(events: list[Event]) -> None: - """Dispatch input events.""" + def event_handler(events: list[Event]) -> None: + """Handle input events.""" for event in events: if isinstance(event, KeyEvent): if ( @@ -357,7 +357,7 @@ async def auto_render(): render_root(root, terminal) await asyncio.sleep(self.render_interval) - with app_mode(terminal, dispatch_events): + with app_mode(terminal, event_handler): terminal.request_cursor_position_report() if self.title: terminal.set_title(self.title) diff --git a/src/batgrl/terminal/__init__.py b/src/batgrl/terminal/__init__.py index 6b787ef0..852d10f5 100644 --- a/src/batgrl/terminal/__init__.py +++ b/src/batgrl/terminal/__init__.py @@ -43,7 +43,7 @@ def get_platform_terminal() -> Vt100Terminal: @contextmanager def app_mode( terminal: Vt100Terminal, - dispatch_events: Callable[[list[Event]], None], + event_handler: Callable[[list[Event]], None], ) -> ContextManager[None]: """ Put terminal into app mode and dispatch input events. @@ -52,12 +52,12 @@ def app_mode( ---------- terminal : Vt100Terminal Terminal to put in app mode. - dispatch_events : Callable[[list[Event]], None] - A callable that dispatches terminal input events. + event_handler : Callable[[list[Event]], None] + A callable that handles terminal input events. """ try: terminal.raw_mode() - terminal.attach(dispatch_events) + terminal.attach(event_handler) terminal.enable_mouse_support() terminal.enable_bracketed_paste() terminal.enable_reporting_focus() diff --git a/src/batgrl/terminal/linux_terminal.py b/src/batgrl/terminal/linux_terminal.py index 9bf112ee..7a0cf121 100644 --- a/src/batgrl/terminal/linux_terminal.py +++ b/src/batgrl/terminal/linux_terminal.py @@ -28,8 +28,8 @@ class LinuxTerminal(Vt100Terminal): Last reported cursor position. """ - def events(self) -> list[Event]: - """Return events from VT100 input stream.""" + def process_stdin(self) -> None: + """Read from stdin and feed data into input parser to generate events.""" reads = [] while select.select([STDIN], [], [], 0)[0]: try: @@ -42,10 +42,6 @@ def events(self) -> list[Event]: data = b"".join(reads).decode(errors="surrogateescape") self.feed(data) - events = self._events - self._events = [] - return events - def raw_mode(self) -> None: """Set terminal to raw mode.""" self._original_mode = termios.tcgetattr(STDIN) @@ -64,26 +60,31 @@ def restore_console(self) -> None: termios.tcsetattr(STDIN, termios.TCSANOW, self._original_mode) del self._original_mode - def attach(self, dispatch_events: Callable[[list[Event]], None]) -> None: - """Dispatch events through ``dispatch_events`` whenever stdin has data.""" - self._event_dispatcher = dispatch_events - self._events.clear() + def attach(self, event_handler: Callable[[list[Event]], None]) -> None: + """ + Start generating events from stdin. + + ``event_handler`` will be called with generated events. + """ + self._event_buffer.clear() + self._event_handler = event_handler def process(): - dispatch_events(self.events()) + self.process_stdin() + event_handler(self.events()) loop = asyncio.get_running_loop() loop.add_reader(STDIN, process) def on_resize(*_): - self._events.append(ResizeEvent(self.get_size())) + self._event_buffer.append(ResizeEvent(self.get_size())) loop.call_soon_threadsafe(process) signal.signal(signal.SIGWINCH, on_resize) def unattach(self) -> None: - """Stop dispatching input events.""" - self._event_dispatcher = None + """Stop generating events from stdin.""" + self._event_handler = None loop = asyncio.get_running_loop() loop.remove_reader(STDIN) signal.signal(signal.SIGWINCH, signal.SIG_DFL) diff --git a/src/batgrl/terminal/vt100_terminal.py b/src/batgrl/terminal/vt100_terminal.py index 754cfc5e..57d326a9 100644 --- a/src/batgrl/terminal/vt100_terminal.py +++ b/src/batgrl/terminal/vt100_terminal.py @@ -75,12 +75,21 @@ def __init__(self): """Whether the alternate screen buffer is enabled.""" self.last_cursor_position_response: Point = Point(0, 0) """Last reported cursor position.""" - self._state: ParserState = ParserState.GROUND - """State of VT100 input parser.""" + self._escape_buffer: StringIO | None = None """Escape sequence buffer.""" self._paste_buffer: StringIO | None = None """Paste buffer.""" + self._event_buffer: list[Event] = [] + """Events generated during input parsing.""" + self._out_buffer: list[str] = [] + """ + Output buffer. + + Escapes for stdout are collected here before ``flush()`` is called. + """ + self._state: ParserState = ParserState.GROUND + """State of VT100 input parser.""" self._reset_timer_handle: asyncio.TimerHandle | None = None """Timeout handle for executing escape buffer.""" self._expect_device_status_report: bool = False @@ -89,21 +98,13 @@ def __init__(self): """Last mouse y-coordinate.""" self._last_x: int = 0 """Laste mouse x-coordinate.""" - self._events: list[Event] = [] - """Events generated during input parsing.""" self._last_drs_request_time: float = monotonic() """When the last device status report was requested.""" - self._out_buffer: list[str] = [] - """ - Output buffer. - - Escapes for stdout are collected here before ``flush()`` is called. - """ - self._event_dispatcher: Callable[[list[Event]], None] | None = None + self._event_handler: Callable[[list[Event]], None] | None = None @abstractmethod - def events(self) -> list[Event]: - """Return events from VT100 input stream.""" + def process_stdin(self) -> None: + """Read from stdin and feed data into input parser to generate events.""" @abstractmethod def raw_mode(self) -> None: @@ -114,12 +115,22 @@ def restore_console(self) -> None: """Restore console to its original mode.""" @abstractmethod - def attach(self, dispatch_events: Callable[[list[Event]], None]) -> None: - """Dispatch events through ``dispatch_events`` whenever stdin has data.""" + def attach(self, event_handler: Callable[[list[Event]], None]) -> None: + """ + Start generating events from stdin. + + ``event_handler`` will be called with generated events. + """ @abstractmethod def unattach(self) -> None: - """Stop dispatching input events.""" + """Stop generating events from stdin.""" + + def events(self) -> list[Event]: + """Return a list of input events and reset the event buffer.""" + events = self._event_buffer + self._event_buffer = [] + return events def get_size(self) -> Size: """Get terminal size.""" @@ -157,7 +168,7 @@ def _feed1(self, char: str) -> None: if char == "~": paste = self._paste_buffer.getvalue() if paste.endswith(BRACKETED_PASTE_END): - self._events.append(PasteEvent(paste[:-6])) + self._event_buffer.append(PasteEvent(paste[:-6])) self._paste_buffer = None self._state = ParserState.GROUND elif self._state is ParserState.GROUND: @@ -170,7 +181,7 @@ def _feed1(self, char: str) -> None: self._escape_buffer.write(char) self._execute() else: - self._events.append(KeyEvent(char)) + self._event_buffer.append(KeyEvent(char)) elif self._state is ParserState.ESCAPE: if char == "\x1b": self._execute() @@ -216,7 +227,7 @@ def _execute(self) -> None: self._expect_device_status_report = False y, x = cpr_match.groups() self.last_cursor_position_response = Point(int(y) - 1, int(x) - 1) - self._events.append( + self._event_buffer.append( CursorPositionResponseEvent(self.last_cursor_position_response) ) return @@ -225,9 +236,9 @@ def _execute(self) -> None: self._state = ParserState.PASTE self._paste_buffer = StringIO(newline=None) elif escape == FOCUS_IN: - self._events.append(FocusEvent("in")) + self._event_buffer.append(FocusEvent("in")) elif escape == FOCUS_OUT: - self._events.append(FocusEvent("out")) + self._event_buffer.append(FocusEvent("out")) elif sgr_match := MOUSE_SGR_RE.match(escape): info = int(sgr_match[1]) y = int(sgr_match[3]) - 1 @@ -254,15 +265,15 @@ def _execute(self) -> None: alt = bool(info & 8) ctrl = bool(info & 16) - self._events.append( + self._event_buffer.append( MouseEvent(Point(y, x), button, event_type, alt, ctrl, shift, dy, dx) ) elif escape in ANSI_ESCAPES: - self._events.append(KeyEvent(*ANSI_ESCAPES[escape])) + self._event_buffer.append(KeyEvent(*ANSI_ESCAPES[escape])) elif len(escape) == 2 and 32 <= ord(escape[1]) <= 126: - self._events.append(KeyEvent(escape[1], alt=True)) + self._event_buffer.append(KeyEvent(escape[1], alt=True)) else: - self._events.append(UnknownEscapeSequence(escape)) + self._event_buffer.append(UnknownEscapeSequence(escape)) def _reset_escape(self): """Execute escape buffer after a timeout period.""" @@ -278,15 +289,13 @@ def _reset_escape(self): ending = paste[partial_escape_index:] if BRACKETED_PASTE_END[: len(ending)] == ending: paste = paste[:partial_escape_index] - self._events.append(PasteEvent(paste)) + self._event_buffer.append(PasteEvent(paste)) self._paste_buffer = None else: self._execute() - if self._event_dispatcher is not None: - events = self._events - self._events = [] - self._event_dispatcher(events) + if self._event_handler is not None: + self._event_handler(self.events()) def flush(self): """Write buffer to output stream and flush.""" diff --git a/src/batgrl/terminal/windows_terminal.py b/src/batgrl/terminal/windows_terminal.py index 78e64aa2..1a084190 100644 --- a/src/batgrl/terminal/windows_terminal.py +++ b/src/batgrl/terminal/windows_terminal.py @@ -161,15 +161,8 @@ class WindowsTerminal(Vt100Terminal): Last reported cursor position. """ - def _purge(self, reads: list[str]): - data = ( - "".join(reads).encode("utf-16", "surrogatepass").decode("utf-16") - ) # Merge surrogate pairs. - reads.clear() - self.feed(data) - - def events(self) -> list[Event]: - """Return events from VT100 input stream.""" + def process_stdin(self) -> None: + """Read from stdin and feed data into input parser to generate events.""" nevents = DWORD() windll.kernel32.GetNumberOfConsoleInputEvents(STDIN, byref(nevents)) InputRecordArray = INPUT_RECORD * nevents.value @@ -181,18 +174,23 @@ def events(self) -> list[Event]: for input_record in input_records: if input_record.EventType == KEY_EVENT: key_event = input_record.Event.KeyEvent - if key_event.KeyDown: - if key_event.ControlKeyState != 0 and key_event.VirtualKeyCode == 0: - continue - chars.append(key_event.uChar.UnicodeChar) + if not key_event.KeyDown: + continue + if key_event.ControlKeyState and not key_event.VirtualKeyCode: + continue + chars.append(key_event.uChar.UnicodeChar) elif input_record.EventType == WINDOW_BUFFER_SIZE_EVENT: self._purge(chars) size = input_record.Event.WindowBufferSizeEvent.Size - self._events.append(ResizeEvent(Size(size.Y, size.X))) + self._event_buffer.append(ResizeEvent(Size(size.Y, size.X))) self._purge(chars) - events = self._events - self._events = [] - return events + + def _purge(self, chars: list[str]): + data = ( + "".join(chars).encode("utf-16", "surrogatepass").decode("utf-16") + ) # Merge surrogate pairs. + chars.clear() + self.feed(data) def raw_mode(self) -> None: """Set terminal to raw mode.""" @@ -212,10 +210,14 @@ def restore_console(self) -> None: windll.kernel32.SetConsoleMode(STDOUT, self._original_output_mode) del self._original_input_mode, self._original_output_mode - def attach(self, dispatch_events: Callable[[list[Event]], None]) -> None: - """Dispatch events through ``dispatch_events`` whenever stdin has data.""" - self._event_dispatcher = dispatch_events - self._events.clear() + def attach(self, event_handler: Callable[[list[Event]], None]) -> None: + """ + Start generating events from stdin. + + ``event_handler`` will be called with generated events. + """ + self._event_buffer.clear() + self._event_handler = event_handler loop = asyncio.get_running_loop() wait_for = windll.kernel32.WaitForMultipleObjects self._remove_event = HANDLE( @@ -227,7 +229,8 @@ def attach(self, dispatch_events: Callable[[list[Event]], None]) -> None: def ready(): try: - dispatch_events(self.events()) + self.process_stdin() + event_handler(self.events()) finally: loop.run_in_executor(None, wait) @@ -241,8 +244,8 @@ def wait(): loop.run_in_executor(None, wait) def unattach(self) -> None: - """Stop dispatching input events.""" - self._event_dispatcher = None + """Stop generating events from stdin.""" + self._event_handler = None windll.kernel32.SetEvent(self._remove_event)