diff --git a/src/webterm/docker_exec_session.py b/src/webterm/docker_exec_session.py index 47733db..3d3615c 100644 --- a/src/webterm/docker_exec_session.py +++ b/src/webterm/docker_exec_session.py @@ -42,6 +42,37 @@ DA_RESPONSE_PATTERN = re.compile(rb"\x1b\[[?>=][\d;]*c") # These need to be held back until more data arrives to see if they complete DA_PARTIAL_PATTERN = re.compile(rb"\x1b(?:\[(?:[?>=][\d;]*)?)?$") +# Map C1 control sequences to 7-bit ESC equivalents for pyte compatibility +CSI_C1 = b"\x9b" +OSC_C1 = b"\x9d" +ST_C1 = b"\x9c" +DCS_C1 = b"\x90" +SOS_C1 = b"\x98" +PM_C1 = b"\x9e" +APC_C1 = b"\x9f" + + +def _normalize_c1_controls(data: bytes) -> bytes: + if ( + CSI_C1 not in data + and OSC_C1 not in data + and ST_C1 not in data + and DCS_C1 not in data + and SOS_C1 not in data + and PM_C1 not in data + and APC_C1 not in data + ): + return data + return ( + data.replace(CSI_C1, b"\x1b[") + .replace(OSC_C1, b"\x1b]") + .replace(ST_C1, b"\x1b\\") + .replace(DCS_C1, b"\x1bP") + .replace(SOS_C1, b"\x1bX") + .replace(PM_C1, b"\x1b^") + .replace(APC_C1, b"\x1b_") + ) + @dataclass(frozen=True) class DockerExecSpec: @@ -72,7 +103,7 @@ class DockerExecSession(Session): self._replay_buffer_size = 0 self._replay_lock = asyncio.Lock() self._screen = AltScreen(DEFAULT_SCREEN_WIDTH, DEFAULT_SCREEN_HEIGHT) - self._stream = pyte.Stream(self._screen) + self._stream = pyte.ByteStream(self._screen) self._screen_lock = asyncio.Lock() self._last_width = DEFAULT_SCREEN_WIDTH self._last_height = DEFAULT_SCREEN_HEIGHT @@ -220,7 +251,7 @@ class DockerExecSession(Session): self._last_height = height async with self._screen_lock: self._screen = AltScreen(width, height) - self._stream = pyte.Stream(self._screen) + self._stream = pyte.ByteStream(self._screen) exec_id = await asyncio.to_thread(self._create_exec) self._exec_id = exec_id self._sock = await asyncio.to_thread(self._start_exec_socket, exec_id) @@ -250,8 +281,7 @@ class DockerExecSession(Session): async def _update_screen(self, data: bytes) -> None: async with self._screen_lock: try: - text = data.decode("utf-8", errors="replace") - self._stream.feed(text) + self._stream.feed(_normalize_c1_controls(data)) if self._screen.dirty: self._change_counter += 1 except Exception as exc: diff --git a/src/webterm/terminal_session.py b/src/webterm/terminal_session.py index 3d5a087..b9259ae 100644 --- a/src/webterm/terminal_session.py +++ b/src/webterm/terminal_session.py @@ -45,6 +45,37 @@ DA_RESPONSE_PATTERN = re.compile(rb"\x1b\[[?>=][\d;]*c") # Matches: \x1b, \x1b[, \x1b[?, \x1b[>, \x1b[=, \x1b[?1, \x1b[>1;10, etc. DA_PARTIAL_PATTERN = re.compile(rb"\x1b(?:\[(?:[?>=][\d;]*)?)?$") +# Map C1 control sequences to 7-bit ESC equivalents for pyte compatibility +CSI_C1 = b"\x9b" +OSC_C1 = b"\x9d" +ST_C1 = b"\x9c" +DCS_C1 = b"\x90" +SOS_C1 = b"\x98" +PM_C1 = b"\x9e" +APC_C1 = b"\x9f" + + +def _normalize_c1_controls(data: bytes) -> bytes: + if ( + CSI_C1 not in data + and OSC_C1 not in data + and ST_C1 not in data + and DCS_C1 not in data + and SOS_C1 not in data + and PM_C1 not in data + and APC_C1 not in data + ): + return data + return ( + data.replace(CSI_C1, b"\x1b[") + .replace(OSC_C1, b"\x1b]") + .replace(ST_C1, b"\x1b\\") + .replace(DCS_C1, b"\x1bP") + .replace(SOS_C1, b"\x1bX") + .replace(PM_C1, b"\x1b^") + .replace(APC_C1, b"\x1b_") + ) + class TerminalSession(Session): """A session that manages a terminal.""" @@ -66,7 +97,7 @@ class TerminalSession(Session): self._replay_lock = asyncio.Lock() # pyte screen for accurate terminal state tracking (AltScreen for alternate buffer support) self._screen = AltScreen(DEFAULT_SCREEN_WIDTH, DEFAULT_SCREEN_HEIGHT) - self._stream = pyte.Stream(self._screen) + self._stream = pyte.ByteStream(self._screen) self._screen_lock = asyncio.Lock() # Track last known terminal size for reconnection self._last_width = DEFAULT_SCREEN_WIDTH @@ -96,7 +127,7 @@ class TerminalSession(Session): # Initialize pyte screen with the requested size (under lock to prevent races) async with self._screen_lock: self._screen = AltScreen(width, height) - self._stream = pyte.Stream(self._screen) + self._stream = pyte.ByteStream(self._screen) pid, master_fd = pty.fork() self.pid = pid @@ -188,8 +219,7 @@ class TerminalSession(Session): """Update the pyte screen with new terminal data.""" async with self._screen_lock: try: - text = data.decode("utf-8", errors="replace") - self._stream.feed(text) + self._stream.feed(_normalize_c1_controls(data)) # Increment change counter when screen is modified if self._screen.dirty: self._change_counter += 1 diff --git a/tests/test_terminal_session.py b/tests/test_terminal_session.py index 25871d5..399e6e9 100644 --- a/tests/test_terminal_session.py +++ b/tests/test_terminal_session.py @@ -104,6 +104,19 @@ class TestTerminalSession: assert lines[1] == "Updated Line 2" assert lines[2] == "Line 3" + @pytest.mark.asyncio + async def test_screen_handles_c1_csi_sequences(self, terminal_session): + """Ensure C1 CSI (0x9b) sequences are parsed for clearing lines.""" + await terminal_session._update_screen(b"Line 1\r\nLine 2\r\nLine 3\r\n") + # C1 CSI equivalent of ESC[2;1H ESC[K + await terminal_session._update_screen(b"\x9b2;1H\x9bKUpdated Line 2") + + lines = await terminal_session.get_screen_lines() + + assert lines[0] == "Line 1" + assert lines[1] == "Updated Line 2" + assert lines[2] == "Line 3" + @pytest.mark.asyncio async def test_get_screen_state_returns_dirty_flag(self, terminal_session): """Test that get_screen_state returns has_changes flag based on pyte dirty tracking."""