Make C1 handling UTF-8 safe

This commit is contained in:
GitHub Copilot
2026-02-01 09:45:23 +00:00
parent fadde71aa5
commit 381d844068
5 changed files with 144 additions and 49 deletions
+61 -21
View File
@@ -52,26 +52,62 @@ PM_C1 = b"\x9e"
APC_C1 = b"\x9f"
def _normalize_c1_controls(data: bytes) -> bytes:
if (
CSI_C1 not in data
and OSC_C1 not in data
and ST_C1 not in data
and DCS_C1 not in data
and SOS_C1 not in data
and PM_C1 not in data
and APC_C1 not in data
):
return data
return (
data.replace(CSI_C1, b"\x1b[")
.replace(OSC_C1, b"\x1b]")
.replace(ST_C1, b"\x1b\\")
.replace(DCS_C1, b"\x1bP")
.replace(SOS_C1, b"\x1bX")
.replace(PM_C1, b"\x1b^")
.replace(APC_C1, b"\x1b_")
)
def _normalize_c1_controls(data: bytes, utf8_buffer: bytes = b"") -> tuple[bytes, bytes]:
if not data and not utf8_buffer:
return b"", b""
data = utf8_buffer + data
out = bytearray()
pending_utf8 = bytearray()
expected_continuations = 0
c1_map = {
0x9B: b"\x1b[",
0x9D: b"\x1b]",
0x9C: b"\x1b\\",
0x90: b"\x1bP",
0x98: b"\x1bX",
0x9E: b"\x1b^",
0x9F: b"\x1b_",
}
idx = 0
while idx < len(data):
byte = data[idx]
if expected_continuations:
if 0x80 <= byte <= 0xBF:
pending_utf8.append(byte)
expected_continuations -= 1
idx += 1
if expected_continuations == 0:
out.extend(pending_utf8)
pending_utf8.clear()
continue
out.extend(pending_utf8)
pending_utf8.clear()
expected_continuations = 0
continue
if 0xC2 <= byte <= 0xDF:
pending_utf8.append(byte)
expected_continuations = 1
idx += 1
continue
if 0xE0 <= byte <= 0xEF:
pending_utf8.append(byte)
expected_continuations = 2
idx += 1
continue
if 0xF0 <= byte <= 0xF4:
pending_utf8.append(byte)
expected_continuations = 3
idx += 1
continue
replacement = c1_map.get(byte)
if replacement is not None:
out.extend(replacement)
else:
out.append(byte)
idx += 1
if pending_utf8:
return bytes(out), bytes(pending_utf8)
return bytes(out), b""
@dataclass(frozen=True)
@@ -113,6 +149,7 @@ class DockerExecSession(Session):
self._pending_output = b""
# Buffer for handling escape sequences split across socket reads
self._escape_buffer = b""
self._utf8_buffer = b""
def __repr__(self) -> str:
return (
@@ -281,7 +318,10 @@ class DockerExecSession(Session):
async def _update_screen(self, data: bytes) -> None:
async with self._screen_lock:
try:
self._stream.feed(_normalize_c1_controls(data))
normalized, self._utf8_buffer = _normalize_c1_controls(data, self._utf8_buffer)
if not normalized:
return
self._stream.feed(normalized)
if self._screen.dirty:
self._change_counter += 1
except Exception as exc:
+8 -7
View File
@@ -86,7 +86,8 @@ class DockerWatcher:
# Read status line
status_line = await reader.readline()
status_code = int(status_line.decode().split()[1])
status_line_text = status_line.decode("utf-8", errors="replace")
status_code = int(status_line_text.split()[1])
# Read headers
content_length = 0
@@ -95,7 +96,7 @@ class DockerWatcher:
line = await reader.readline()
if line == b"\r\n":
break
header = line.decode().lower()
header = line.decode("utf-8", errors="replace").lower()
if header.startswith("content-length:"):
content_length = int(header.split(":")[1].strip())
if "transfer-encoding: chunked" in header:
@@ -106,15 +107,15 @@ class DockerWatcher:
body_parts = []
while True:
size_line = await reader.readline()
size = int(size_line.decode().strip(), 16)
size = int(size_line.decode("utf-8", errors="replace").strip(), 16)
if size == 0:
break
chunk = await reader.readexactly(size)
body_parts.append(chunk)
await reader.readline() # trailing CRLF
body = b"".join(body_parts).decode()
body = b"".join(body_parts).decode("utf-8", errors="replace")
elif content_length > 0:
body = (await reader.readexactly(content_length)).decode()
body = (await reader.readexactly(content_length)).decode("utf-8", errors="replace")
else:
body = ""
@@ -252,7 +253,7 @@ class DockerWatcher:
if not size_line:
break
try:
size = int(size_line.decode().strip(), 16)
size = int(size_line.decode("utf-8", errors="replace").strip(), 16)
except ValueError:
continue
if size == 0:
@@ -262,7 +263,7 @@ class DockerWatcher:
await reader.readline() # trailing CRLF
try:
event = json.loads(chunk.decode())
event = json.loads(chunk.decode("utf-8", errors="replace"))
await self._handle_event(event)
except json.JSONDecodeError:
continue
+61 -21
View File
@@ -55,26 +55,62 @@ PM_C1 = b"\x9e"
APC_C1 = b"\x9f"
def _normalize_c1_controls(data: bytes) -> bytes:
if (
CSI_C1 not in data
and OSC_C1 not in data
and ST_C1 not in data
and DCS_C1 not in data
and SOS_C1 not in data
and PM_C1 not in data
and APC_C1 not in data
):
return data
return (
data.replace(CSI_C1, b"\x1b[")
.replace(OSC_C1, b"\x1b]")
.replace(ST_C1, b"\x1b\\")
.replace(DCS_C1, b"\x1bP")
.replace(SOS_C1, b"\x1bX")
.replace(PM_C1, b"\x1b^")
.replace(APC_C1, b"\x1b_")
)
def _normalize_c1_controls(data: bytes, utf8_buffer: bytes = b"") -> tuple[bytes, bytes]:
if not data and not utf8_buffer:
return b"", b""
data = utf8_buffer + data
out = bytearray()
pending_utf8 = bytearray()
expected_continuations = 0
c1_map = {
0x9B: b"\x1b[",
0x9D: b"\x1b]",
0x9C: b"\x1b\\",
0x90: b"\x1bP",
0x98: b"\x1bX",
0x9E: b"\x1b^",
0x9F: b"\x1b_",
}
idx = 0
while idx < len(data):
byte = data[idx]
if expected_continuations:
if 0x80 <= byte <= 0xBF:
pending_utf8.append(byte)
expected_continuations -= 1
idx += 1
if expected_continuations == 0:
out.extend(pending_utf8)
pending_utf8.clear()
continue
out.extend(pending_utf8)
pending_utf8.clear()
expected_continuations = 0
continue
if 0xC2 <= byte <= 0xDF:
pending_utf8.append(byte)
expected_continuations = 1
idx += 1
continue
if 0xE0 <= byte <= 0xEF:
pending_utf8.append(byte)
expected_continuations = 2
idx += 1
continue
if 0xF0 <= byte <= 0xF4:
pending_utf8.append(byte)
expected_continuations = 3
idx += 1
continue
replacement = c1_map.get(byte)
if replacement is not None:
out.extend(replacement)
else:
out.append(byte)
idx += 1
if pending_utf8:
return bytes(out), bytes(pending_utf8)
return bytes(out), b""
class TerminalSession(Session):
@@ -107,6 +143,7 @@ class TerminalSession(Session):
self._last_snapshot_counter = 0
# Buffer for handling escape sequences split across reads
self._escape_buffer = b""
self._utf8_buffer = b""
super().__init__()
def __repr__(self) -> str:
@@ -219,7 +256,10 @@ class TerminalSession(Session):
"""Update the pyte screen with new terminal data."""
async with self._screen_lock:
try:
self._stream.feed(_normalize_c1_controls(data))
normalized, self._utf8_buffer = _normalize_c1_controls(data, self._utf8_buffer)
if not normalized:
return
self._stream.feed(normalized)
# Increment change counter when screen is modified
if self._screen.dirty:
self._change_counter += 1
+7
View File
@@ -178,6 +178,13 @@ async def test_update_screen_logs_on_exception(docker_exec_session):
assert warn.called
@pytest.mark.asyncio
async def test_update_screen_preserves_utf8_bytes_with_c1_values(docker_exec_session):
await docker_exec_session._update_screen("✓ ok\r\n".encode())
lines = await docker_exec_session.get_screen_lines()
assert "✓ ok" in lines[0]
@pytest.mark.asyncio
async def test_add_to_replay_buffer_trims_old_data(docker_exec_session):
first_chunk = b"a" * (REPLAY_BUFFER_SIZE - 1)
+7
View File
@@ -117,6 +117,13 @@ class TestTerminalSession:
assert lines[1] == "Updated Line 2"
assert lines[2] == "Line 3"
@pytest.mark.asyncio
async def test_screen_preserves_utf8_bytes_with_c1_values(self, terminal_session):
"""Ensure UTF-8 bytes containing 0x9c aren't corrupted by C1 normalization."""
await terminal_session._update_screen("✓ ok\r\n".encode())
lines = await terminal_session.get_screen_lines()
assert "✓ ok" in lines[0]
@pytest.mark.asyncio
async def test_get_screen_state_returns_dirty_flag(self, terminal_session):
"""Test that get_screen_state returns has_changes flag based on pyte dirty tracking."""