From 381d84406874fdb6d6771638fd1c629762d8f641 Mon Sep 17 00:00:00 2001 From: GitHub Copilot Date: Sun, 1 Feb 2026 09:45:23 +0000 Subject: [PATCH] Make C1 handling UTF-8 safe --- src/webterm/docker_exec_session.py | 82 ++++++++++++++++++++++-------- src/webterm/docker_watcher.py | 15 +++--- src/webterm/terminal_session.py | 82 ++++++++++++++++++++++-------- tests/test_docker_exec_session.py | 7 +++ tests/test_terminal_session.py | 7 +++ 5 files changed, 144 insertions(+), 49 deletions(-) diff --git a/src/webterm/docker_exec_session.py b/src/webterm/docker_exec_session.py index 3d3615c..0d578c1 100644 --- a/src/webterm/docker_exec_session.py +++ b/src/webterm/docker_exec_session.py @@ -52,26 +52,62 @@ PM_C1 = b"\x9e" APC_C1 = b"\x9f" -def _normalize_c1_controls(data: bytes) -> bytes: - if ( - CSI_C1 not in data - and OSC_C1 not in data - and ST_C1 not in data - and DCS_C1 not in data - and SOS_C1 not in data - and PM_C1 not in data - and APC_C1 not in data - ): - return data - return ( - data.replace(CSI_C1, b"\x1b[") - .replace(OSC_C1, b"\x1b]") - .replace(ST_C1, b"\x1b\\") - .replace(DCS_C1, b"\x1bP") - .replace(SOS_C1, b"\x1bX") - .replace(PM_C1, b"\x1b^") - .replace(APC_C1, b"\x1b_") - ) +def _normalize_c1_controls(data: bytes, utf8_buffer: bytes = b"") -> tuple[bytes, bytes]: + if not data and not utf8_buffer: + return b"", b"" + data = utf8_buffer + data + out = bytearray() + pending_utf8 = bytearray() + expected_continuations = 0 + c1_map = { + 0x9B: b"\x1b[", + 0x9D: b"\x1b]", + 0x9C: b"\x1b\\", + 0x90: b"\x1bP", + 0x98: b"\x1bX", + 0x9E: b"\x1b^", + 0x9F: b"\x1b_", + } + idx = 0 + while idx < len(data): + byte = data[idx] + if expected_continuations: + if 0x80 <= byte <= 0xBF: + pending_utf8.append(byte) + expected_continuations -= 1 + idx += 1 + if expected_continuations == 0: + out.extend(pending_utf8) + pending_utf8.clear() + continue + out.extend(pending_utf8) + pending_utf8.clear() + expected_continuations = 0 + continue + if 0xC2 <= byte <= 0xDF: + pending_utf8.append(byte) + expected_continuations = 1 + idx += 1 + continue + if 0xE0 <= byte <= 0xEF: + pending_utf8.append(byte) + expected_continuations = 2 + idx += 1 + continue + if 0xF0 <= byte <= 0xF4: + pending_utf8.append(byte) + expected_continuations = 3 + idx += 1 + continue + replacement = c1_map.get(byte) + if replacement is not None: + out.extend(replacement) + else: + out.append(byte) + idx += 1 + if pending_utf8: + return bytes(out), bytes(pending_utf8) + return bytes(out), b"" @dataclass(frozen=True) @@ -113,6 +149,7 @@ class DockerExecSession(Session): self._pending_output = b"" # Buffer for handling escape sequences split across socket reads self._escape_buffer = b"" + self._utf8_buffer = b"" def __repr__(self) -> str: return ( @@ -281,7 +318,10 @@ class DockerExecSession(Session): async def _update_screen(self, data: bytes) -> None: async with self._screen_lock: try: - self._stream.feed(_normalize_c1_controls(data)) + normalized, self._utf8_buffer = _normalize_c1_controls(data, self._utf8_buffer) + if not normalized: + return + self._stream.feed(normalized) if self._screen.dirty: self._change_counter += 1 except Exception as exc: diff --git a/src/webterm/docker_watcher.py b/src/webterm/docker_watcher.py index 8c44359..03be044 100644 --- a/src/webterm/docker_watcher.py +++ b/src/webterm/docker_watcher.py @@ -86,7 +86,8 @@ class DockerWatcher: # Read status line status_line = await reader.readline() - status_code = int(status_line.decode().split()[1]) + status_line_text = status_line.decode("utf-8", errors="replace") + status_code = int(status_line_text.split()[1]) # Read headers content_length = 0 @@ -95,7 +96,7 @@ class DockerWatcher: line = await reader.readline() if line == b"\r\n": break - header = line.decode().lower() + header = line.decode("utf-8", errors="replace").lower() if header.startswith("content-length:"): content_length = int(header.split(":")[1].strip()) if "transfer-encoding: chunked" in header: @@ -106,15 +107,15 @@ class DockerWatcher: body_parts = [] while True: size_line = await reader.readline() - size = int(size_line.decode().strip(), 16) + size = int(size_line.decode("utf-8", errors="replace").strip(), 16) if size == 0: break chunk = await reader.readexactly(size) body_parts.append(chunk) await reader.readline() # trailing CRLF - body = b"".join(body_parts).decode() + body = b"".join(body_parts).decode("utf-8", errors="replace") elif content_length > 0: - body = (await reader.readexactly(content_length)).decode() + body = (await reader.readexactly(content_length)).decode("utf-8", errors="replace") else: body = "" @@ -252,7 +253,7 @@ class DockerWatcher: if not size_line: break try: - size = int(size_line.decode().strip(), 16) + size = int(size_line.decode("utf-8", errors="replace").strip(), 16) except ValueError: continue if size == 0: @@ -262,7 +263,7 @@ class DockerWatcher: await reader.readline() # trailing CRLF try: - event = json.loads(chunk.decode()) + event = json.loads(chunk.decode("utf-8", errors="replace")) await self._handle_event(event) except json.JSONDecodeError: continue diff --git a/src/webterm/terminal_session.py b/src/webterm/terminal_session.py index b9259ae..09373d5 100644 --- a/src/webterm/terminal_session.py +++ b/src/webterm/terminal_session.py @@ -55,26 +55,62 @@ PM_C1 = b"\x9e" APC_C1 = b"\x9f" -def _normalize_c1_controls(data: bytes) -> bytes: - if ( - CSI_C1 not in data - and OSC_C1 not in data - and ST_C1 not in data - and DCS_C1 not in data - and SOS_C1 not in data - and PM_C1 not in data - and APC_C1 not in data - ): - return data - return ( - data.replace(CSI_C1, b"\x1b[") - .replace(OSC_C1, b"\x1b]") - .replace(ST_C1, b"\x1b\\") - .replace(DCS_C1, b"\x1bP") - .replace(SOS_C1, b"\x1bX") - .replace(PM_C1, b"\x1b^") - .replace(APC_C1, b"\x1b_") - ) +def _normalize_c1_controls(data: bytes, utf8_buffer: bytes = b"") -> tuple[bytes, bytes]: + if not data and not utf8_buffer: + return b"", b"" + data = utf8_buffer + data + out = bytearray() + pending_utf8 = bytearray() + expected_continuations = 0 + c1_map = { + 0x9B: b"\x1b[", + 0x9D: b"\x1b]", + 0x9C: b"\x1b\\", + 0x90: b"\x1bP", + 0x98: b"\x1bX", + 0x9E: b"\x1b^", + 0x9F: b"\x1b_", + } + idx = 0 + while idx < len(data): + byte = data[idx] + if expected_continuations: + if 0x80 <= byte <= 0xBF: + pending_utf8.append(byte) + expected_continuations -= 1 + idx += 1 + if expected_continuations == 0: + out.extend(pending_utf8) + pending_utf8.clear() + continue + out.extend(pending_utf8) + pending_utf8.clear() + expected_continuations = 0 + continue + if 0xC2 <= byte <= 0xDF: + pending_utf8.append(byte) + expected_continuations = 1 + idx += 1 + continue + if 0xE0 <= byte <= 0xEF: + pending_utf8.append(byte) + expected_continuations = 2 + idx += 1 + continue + if 0xF0 <= byte <= 0xF4: + pending_utf8.append(byte) + expected_continuations = 3 + idx += 1 + continue + replacement = c1_map.get(byte) + if replacement is not None: + out.extend(replacement) + else: + out.append(byte) + idx += 1 + if pending_utf8: + return bytes(out), bytes(pending_utf8) + return bytes(out), b"" class TerminalSession(Session): @@ -107,6 +143,7 @@ class TerminalSession(Session): self._last_snapshot_counter = 0 # Buffer for handling escape sequences split across reads self._escape_buffer = b"" + self._utf8_buffer = b"" super().__init__() def __repr__(self) -> str: @@ -219,7 +256,10 @@ class TerminalSession(Session): """Update the pyte screen with new terminal data.""" async with self._screen_lock: try: - self._stream.feed(_normalize_c1_controls(data)) + normalized, self._utf8_buffer = _normalize_c1_controls(data, self._utf8_buffer) + if not normalized: + return + self._stream.feed(normalized) # Increment change counter when screen is modified if self._screen.dirty: self._change_counter += 1 diff --git a/tests/test_docker_exec_session.py b/tests/test_docker_exec_session.py index 370b515..214facc 100644 --- a/tests/test_docker_exec_session.py +++ b/tests/test_docker_exec_session.py @@ -178,6 +178,13 @@ async def test_update_screen_logs_on_exception(docker_exec_session): assert warn.called +@pytest.mark.asyncio +async def test_update_screen_preserves_utf8_bytes_with_c1_values(docker_exec_session): + await docker_exec_session._update_screen("✓ ok\r\n".encode()) + lines = await docker_exec_session.get_screen_lines() + assert "✓ ok" in lines[0] + + @pytest.mark.asyncio async def test_add_to_replay_buffer_trims_old_data(docker_exec_session): first_chunk = b"a" * (REPLAY_BUFFER_SIZE - 1) diff --git a/tests/test_terminal_session.py b/tests/test_terminal_session.py index 399e6e9..33ef181 100644 --- a/tests/test_terminal_session.py +++ b/tests/test_terminal_session.py @@ -117,6 +117,13 @@ class TestTerminalSession: assert lines[1] == "Updated Line 2" assert lines[2] == "Line 3" + @pytest.mark.asyncio + async def test_screen_preserves_utf8_bytes_with_c1_values(self, terminal_session): + """Ensure UTF-8 bytes containing 0x9c aren't corrupted by C1 normalization.""" + await terminal_session._update_screen("✓ ok\r\n".encode()) + lines = await terminal_session.get_screen_lines() + assert "✓ ok" in lines[0] + @pytest.mark.asyncio async def test_get_screen_state_returns_dirty_flag(self, terminal_session): """Test that get_screen_state returns has_changes flag based on pyte dirty tracking."""