Skip to content

Commit

Permalink
Minor rewrite of Vt100Terminal for clarity.
Browse files Browse the repository at this point in the history
  • Loading branch information
salt-die committed Jul 22, 2024
1 parent 43438ec commit e378cab
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 75 deletions.
6 changes: 3 additions & 3 deletions src/batgrl/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,8 @@ def determine_nclicks(mouse_event: MouseEvent) -> None:
last_mouse_nclicks = mouse_event.nclicks
last_mouse_time = current_time

def dispatch_events(events: list[Event]) -> None:
"""Dispatch input events."""
def event_handler(events: list[Event]) -> None:
"""Handle input events."""
for event in events:
if isinstance(event, KeyEvent):
if (
Expand Down Expand Up @@ -357,7 +357,7 @@ async def auto_render():
render_root(root, terminal)
await asyncio.sleep(self.render_interval)

with app_mode(terminal, dispatch_events):
with app_mode(terminal, event_handler):
terminal.request_cursor_position_report()
if self.title:
terminal.set_title(self.title)
Expand Down
8 changes: 4 additions & 4 deletions src/batgrl/terminal/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def get_platform_terminal() -> Vt100Terminal:
@contextmanager
def app_mode(
terminal: Vt100Terminal,
dispatch_events: Callable[[list[Event]], None],
event_handler: Callable[[list[Event]], None],
) -> ContextManager[None]:
"""
Put terminal into app mode and dispatch input events.
Expand All @@ -52,12 +52,12 @@ def app_mode(
----------
terminal : Vt100Terminal
Terminal to put in app mode.
dispatch_events : Callable[[list[Event]], None]
A callable that dispatches terminal input events.
event_handler : Callable[[list[Event]], None]
A callable that handles terminal input events.
"""
try:
terminal.raw_mode()
terminal.attach(dispatch_events)
terminal.attach(event_handler)
terminal.enable_mouse_support()
terminal.enable_bracketed_paste()
terminal.enable_reporting_focus()
Expand Down
29 changes: 15 additions & 14 deletions src/batgrl/terminal/linux_terminal.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ class LinuxTerminal(Vt100Terminal):
Last reported cursor position.
"""

def events(self) -> list[Event]:
"""Return events from VT100 input stream."""
def process_stdin(self) -> None:
"""Read from stdin and feed data into input parser to generate events."""
reads = []
while select.select([STDIN], [], [], 0)[0]:
try:
Expand All @@ -42,10 +42,6 @@ def events(self) -> list[Event]:
data = b"".join(reads).decode(errors="surrogateescape")
self.feed(data)

events = self._events
self._events = []
return events

def raw_mode(self) -> None:
"""Set terminal to raw mode."""
self._original_mode = termios.tcgetattr(STDIN)
Expand All @@ -64,26 +60,31 @@ def restore_console(self) -> None:
termios.tcsetattr(STDIN, termios.TCSANOW, self._original_mode)
del self._original_mode

def attach(self, dispatch_events: Callable[[list[Event]], None]) -> None:
"""Dispatch events through ``dispatch_events`` whenever stdin has data."""
self._event_dispatcher = dispatch_events
self._events.clear()
def attach(self, event_handler: Callable[[list[Event]], None]) -> None:
"""
Start generating events from stdin.
``event_handler`` will be called with generated events.
"""
self._event_buffer.clear()
self._event_handler = event_handler

def process():
dispatch_events(self.events())
self.process_stdin()
event_handler(self.events())

loop = asyncio.get_running_loop()
loop.add_reader(STDIN, process)

def on_resize(*_):
self._events.append(ResizeEvent(self.get_size()))
self._event_buffer.append(ResizeEvent(self.get_size()))
loop.call_soon_threadsafe(process)

signal.signal(signal.SIGWINCH, on_resize)

def unattach(self) -> None:
"""Stop dispatching input events."""
self._event_dispatcher = None
"""Stop generating events from stdin."""
self._event_handler = None
loop = asyncio.get_running_loop()
loop.remove_reader(STDIN)
signal.signal(signal.SIGWINCH, signal.SIG_DFL)
69 changes: 39 additions & 30 deletions src/batgrl/terminal/vt100_terminal.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,21 @@ def __init__(self):
"""Whether the alternate screen buffer is enabled."""
self.last_cursor_position_response: Point = Point(0, 0)
"""Last reported cursor position."""
self._state: ParserState = ParserState.GROUND
"""State of VT100 input parser."""

self._escape_buffer: StringIO | None = None
"""Escape sequence buffer."""
self._paste_buffer: StringIO | None = None
"""Paste buffer."""
self._event_buffer: list[Event] = []
"""Events generated during input parsing."""
self._out_buffer: list[str] = []
"""
Output buffer.
Escapes for stdout are collected here before ``flush()`` is called.
"""
self._state: ParserState = ParserState.GROUND
"""State of VT100 input parser."""
self._reset_timer_handle: asyncio.TimerHandle | None = None
"""Timeout handle for executing escape buffer."""
self._expect_device_status_report: bool = False
Expand All @@ -89,21 +98,13 @@ def __init__(self):
"""Last mouse y-coordinate."""
self._last_x: int = 0
"""Laste mouse x-coordinate."""
self._events: list[Event] = []
"""Events generated during input parsing."""
self._last_drs_request_time: float = monotonic()
"""When the last device status report was requested."""
self._out_buffer: list[str] = []
"""
Output buffer.
Escapes for stdout are collected here before ``flush()`` is called.
"""
self._event_dispatcher: Callable[[list[Event]], None] | None = None
self._event_handler: Callable[[list[Event]], None] | None = None

@abstractmethod
def events(self) -> list[Event]:
"""Return events from VT100 input stream."""
def process_stdin(self) -> None:
"""Read from stdin and feed data into input parser to generate events."""

@abstractmethod
def raw_mode(self) -> None:
Expand All @@ -114,12 +115,22 @@ def restore_console(self) -> None:
"""Restore console to its original mode."""

@abstractmethod
def attach(self, dispatch_events: Callable[[list[Event]], None]) -> None:
"""Dispatch events through ``dispatch_events`` whenever stdin has data."""
def attach(self, event_handler: Callable[[list[Event]], None]) -> None:
"""
Start generating events from stdin.
``event_handler`` will be called with generated events.
"""

@abstractmethod
def unattach(self) -> None:
"""Stop dispatching input events."""
"""Stop generating events from stdin."""

def events(self) -> list[Event]:
"""Return a list of input events and reset the event buffer."""
events = self._event_buffer
self._event_buffer = []
return events

def get_size(self) -> Size:
"""Get terminal size."""
Expand Down Expand Up @@ -157,7 +168,7 @@ def _feed1(self, char: str) -> None:
if char == "~":
paste = self._paste_buffer.getvalue()
if paste.endswith(BRACKETED_PASTE_END):
self._events.append(PasteEvent(paste[:-6]))
self._event_buffer.append(PasteEvent(paste[:-6]))
self._paste_buffer = None
self._state = ParserState.GROUND
elif self._state is ParserState.GROUND:
Expand All @@ -170,7 +181,7 @@ def _feed1(self, char: str) -> None:
self._escape_buffer.write(char)
self._execute()
else:
self._events.append(KeyEvent(char))
self._event_buffer.append(KeyEvent(char))
elif self._state is ParserState.ESCAPE:
if char == "\x1b":
self._execute()
Expand Down Expand Up @@ -216,7 +227,7 @@ def _execute(self) -> None:
self._expect_device_status_report = False
y, x = cpr_match.groups()
self.last_cursor_position_response = Point(int(y) - 1, int(x) - 1)
self._events.append(
self._event_buffer.append(
CursorPositionResponseEvent(self.last_cursor_position_response)
)
return
Expand All @@ -225,9 +236,9 @@ def _execute(self) -> None:
self._state = ParserState.PASTE
self._paste_buffer = StringIO(newline=None)
elif escape == FOCUS_IN:
self._events.append(FocusEvent("in"))
self._event_buffer.append(FocusEvent("in"))
elif escape == FOCUS_OUT:
self._events.append(FocusEvent("out"))
self._event_buffer.append(FocusEvent("out"))
elif sgr_match := MOUSE_SGR_RE.match(escape):
info = int(sgr_match[1])
y = int(sgr_match[3]) - 1
Expand All @@ -254,15 +265,15 @@ def _execute(self) -> None:
alt = bool(info & 8)
ctrl = bool(info & 16)

self._events.append(
self._event_buffer.append(
MouseEvent(Point(y, x), button, event_type, alt, ctrl, shift, dy, dx)
)
elif escape in ANSI_ESCAPES:
self._events.append(KeyEvent(*ANSI_ESCAPES[escape]))
self._event_buffer.append(KeyEvent(*ANSI_ESCAPES[escape]))
elif len(escape) == 2 and 32 <= ord(escape[1]) <= 126:
self._events.append(KeyEvent(escape[1], alt=True))
self._event_buffer.append(KeyEvent(escape[1], alt=True))
else:
self._events.append(UnknownEscapeSequence(escape))
self._event_buffer.append(UnknownEscapeSequence(escape))

def _reset_escape(self):
"""Execute escape buffer after a timeout period."""
Expand All @@ -278,15 +289,13 @@ def _reset_escape(self):
ending = paste[partial_escape_index:]
if BRACKETED_PASTE_END[: len(ending)] == ending:
paste = paste[:partial_escape_index]
self._events.append(PasteEvent(paste))
self._event_buffer.append(PasteEvent(paste))
self._paste_buffer = None
else:
self._execute()

if self._event_dispatcher is not None:
events = self._events
self._events = []
self._event_dispatcher(events)
if self._event_handler is not None:
self._event_handler(self.events())

def flush(self):
"""Write buffer to output stream and flush."""
Expand Down
51 changes: 27 additions & 24 deletions src/batgrl/terminal/windows_terminal.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,8 @@ class WindowsTerminal(Vt100Terminal):
Last reported cursor position.
"""

def _purge(self, reads: list[str]):
data = (
"".join(reads).encode("utf-16", "surrogatepass").decode("utf-16")
) # Merge surrogate pairs.
reads.clear()
self.feed(data)

def events(self) -> list[Event]:
"""Return events from VT100 input stream."""
def process_stdin(self) -> None:
"""Read from stdin and feed data into input parser to generate events."""
nevents = DWORD()
windll.kernel32.GetNumberOfConsoleInputEvents(STDIN, byref(nevents))
InputRecordArray = INPUT_RECORD * nevents.value
Expand All @@ -181,18 +174,23 @@ def events(self) -> list[Event]:
for input_record in input_records:
if input_record.EventType == KEY_EVENT:
key_event = input_record.Event.KeyEvent
if key_event.KeyDown:
if key_event.ControlKeyState != 0 and key_event.VirtualKeyCode == 0:
continue
chars.append(key_event.uChar.UnicodeChar)
if not key_event.KeyDown:
continue
if key_event.ControlKeyState and not key_event.VirtualKeyCode:
continue
chars.append(key_event.uChar.UnicodeChar)
elif input_record.EventType == WINDOW_BUFFER_SIZE_EVENT:
self._purge(chars)
size = input_record.Event.WindowBufferSizeEvent.Size
self._events.append(ResizeEvent(Size(size.Y, size.X)))
self._event_buffer.append(ResizeEvent(Size(size.Y, size.X)))
self._purge(chars)
events = self._events
self._events = []
return events

def _purge(self, chars: list[str]):
data = (
"".join(chars).encode("utf-16", "surrogatepass").decode("utf-16")
) # Merge surrogate pairs.
chars.clear()
self.feed(data)

def raw_mode(self) -> None:
"""Set terminal to raw mode."""
Expand All @@ -212,10 +210,14 @@ def restore_console(self) -> None:
windll.kernel32.SetConsoleMode(STDOUT, self._original_output_mode)
del self._original_input_mode, self._original_output_mode

def attach(self, dispatch_events: Callable[[list[Event]], None]) -> None:
"""Dispatch events through ``dispatch_events`` whenever stdin has data."""
self._event_dispatcher = dispatch_events
self._events.clear()
def attach(self, event_handler: Callable[[list[Event]], None]) -> None:
"""
Start generating events from stdin.
``event_handler`` will be called with generated events.
"""
self._event_buffer.clear()
self._event_handler = event_handler
loop = asyncio.get_running_loop()
wait_for = windll.kernel32.WaitForMultipleObjects
self._remove_event = HANDLE(
Expand All @@ -227,7 +229,8 @@ def attach(self, dispatch_events: Callable[[list[Event]], None]) -> None:

def ready():
try:
dispatch_events(self.events())
self.process_stdin()
event_handler(self.events())
finally:
loop.run_in_executor(None, wait)

Expand All @@ -241,8 +244,8 @@ def wait():
loop.run_in_executor(None, wait)

def unattach(self) -> None:
"""Stop dispatching input events."""
self._event_dispatcher = None
"""Stop generating events from stdin."""
self._event_handler = None
windll.kernel32.SetEvent(self._remove_event)


Expand Down

0 comments on commit e378cab

Please sign in to comment.