From d5343117d36124cb5150d202c8c8f912f66bde59 Mon Sep 17 00:00:00 2001 From: GitHub Copilot Date: Thu, 29 Jan 2026 19:13:40 +0000 Subject: [PATCH] Filter DA1 responses from replay buffer on WebSocket connect The replay buffer can contain DA1/DA2 terminal attribute responses (e.g., \x1b[?1;10;0c) that were captured before filtering was added to the session classes. These responses appear as visible text like '1;10;0c' when sent to the client on reconnect. This adds an additional filter pass when sending the replay buffer, ensuring no DA1 responses reach the client regardless of when they were captured. --- src/webterm/docker_exec_session.py | 14 +++++----- src/webterm/local_server.py | 26 ++++++++++++++++--- src/webterm/terminal_session.py | 10 +++---- tests/test_cli.py | 1 + tests/test_cli_landing.py | 2 -- tests/test_docker_stats.py | 10 +++---- ...test_local_server_websocket_integration.py | 1 + tests/test_session_manager.py | 1 + tests/test_terminal_session.py | 16 +++--------- 9 files changed, 48 insertions(+), 33 deletions(-) diff --git a/src/webterm/docker_exec_session.py b/src/webterm/docker_exec_session.py index e2b66d7..a8866e4 100644 --- a/src/webterm/docker_exec_session.py +++ b/src/webterm/docker_exec_session.py @@ -31,12 +31,12 @@ DEFAULT_SCREEN_HEIGHT = 45 # Pattern to filter out terminal device attribute responses that cause display issues # These are responses to queries that shouldn't be displayed as text. # Matches complete DA1/DA2 responses like \x1b[?1;10;0c or \x1b[?64;1;2;...c -DA_RESPONSE_PATTERN = re.compile(rb'\x1b\[\?[\d;]+c') +DA_RESPONSE_PATTERN = re.compile(rb"\x1b\[\?[\d;]+c") # Pattern to detect partial DA responses at end of data (incomplete escape sequence) # Matches: \x1b, \x1b[, \x1b[?, \x1b[?1, \x1b[?1;, \x1b[?1;10, etc. # These need to be held back until more data arrives to see if they complete -DA_PARTIAL_PATTERN = re.compile(rb'\x1b(?:\[(?:\?[\d;]*)?)?$') +DA_PARTIAL_PATTERN = re.compile(rb"\x1b(?:\[(?:\?[\d;]*)?)?$") @dataclass(frozen=True) @@ -160,7 +160,9 @@ class DockerExecSession(Session): "Tty": True, "Cmd": self.exec_spec.command, } - response = self._request_json("POST", f"/containers/{self.exec_spec.container}/exec", payload) + response = self._request_json( + "POST", f"/containers/{self.exec_spec.container}/exec", payload + ) exec_id = response.get("Id") if not isinstance(exec_id, str) or not exec_id: raise RuntimeError("Docker API did not return exec ID") @@ -316,7 +318,7 @@ class DockerExecSession(Session): self._escape_buffer = b"" # Filter out complete DA1/DA2 responses (e.g., \x1b[?1;10;0c) - data = DA_RESPONSE_PATTERN.sub(b'', data) + data = DA_RESPONSE_PATTERN.sub(b"", data) if not data: continue @@ -324,8 +326,8 @@ class DockerExecSession(Session): # Hold it back until we get more data to see if it completes match = DA_PARTIAL_PATTERN.search(data) if match: - self._escape_buffer = data[match.start():] - data = data[:match.start()] + self._escape_buffer = data[match.start() :] + data = data[: match.start()] if not data: continue diff --git a/src/webterm/local_server.py b/src/webterm/local_server.py index 1ca6fc5..6605019 100644 --- a/src/webterm/local_server.py +++ b/src/webterm/local_server.py @@ -7,6 +7,7 @@ import contextlib import hashlib import json import logging +import re import signal import time from pathlib import Path @@ -26,6 +27,11 @@ from .session_manager import SessionManager from .svg_exporter import render_terminal_svg from .types import Meta, RouteKey, SessionID +# Pattern to filter terminal device attribute responses (DA1/DA2) from replay buffer. +# These responses can appear as visible text like "1;10;0c" if split across reads. +# See docker_exec_session.py and terminal_session.py for main filtering. +DA_RESPONSE_PATTERN = re.compile(rb"\x1b\[\?[\d;]+c") + if TYPE_CHECKING: from .config import Config @@ -324,10 +330,20 @@ class LocalServer: self._docker_stats = DockerStatsCollector(compose_project=self._compose_project) if self._docker_stats.available: # Pass service names (not slugs) for Docker matching - service_names = [app.name for app in (self._landing_apps if self._compose_mode else self.session_manager.apps)] + service_names = [ + app.name + for app in ( + self._landing_apps if self._compose_mode else self.session_manager.apps + ) + ] self._docker_stats.start(service_names) # Create slug->name mapping for lookups - self._slug_to_service = {app.slug: app.name for app in (self._landing_apps if self._compose_mode else self.session_manager.apps)} + self._slug_to_service = { + app.slug: app.name + for app in ( + self._landing_apps if self._compose_mode else self.session_manager.apps + ) + } log.info("Slug to service mapping: %s", self._slug_to_service) stack.push_async_callback(self._docker_stats.stop) @@ -458,7 +474,11 @@ class LocalServer: if session_created and session is not None and hasattr(session, "get_replay_buffer"): replay = await session.get_replay_buffer() if replay: - await ws.send_bytes(replay) + # Filter out any DA1/DA2 responses that may have been captured + # in the replay buffer before filtering was added to session classes + replay = DA_RESPONSE_PATTERN.sub(b"", replay) + if replay: + await ws.send_bytes(replay) try: async for msg in ws: diff --git a/src/webterm/terminal_session.py b/src/webterm/terminal_session.py index f88ae0a..3414dc1 100644 --- a/src/webterm/terminal_session.py +++ b/src/webterm/terminal_session.py @@ -35,11 +35,11 @@ DEFAULT_SCREEN_HEIGHT = 45 # Pattern to filter out terminal device attribute responses that cause display issues # These are responses to DA1/DA2 queries that shouldn't be displayed as text # Matches complete responses like \x1b[?1;10;0c or \x1b[?64;1;2;...c -DA_RESPONSE_PATTERN = re.compile(rb'\x1b\[\?[\d;]+c') +DA_RESPONSE_PATTERN = re.compile(rb"\x1b\[\?[\d;]+c") # Pattern to detect partial DA responses at end of data (incomplete escape sequence) # Matches: \x1b, \x1b[, \x1b[?, \x1b[?1, \x1b[?1;, etc. -DA_PARTIAL_PATTERN = re.compile(rb'\x1b(?:\[(?:\?[\d;]*)?)?$') +DA_PARTIAL_PATTERN = re.compile(rb"\x1b(?:\[(?:\?[\d;]*)?)?$") class TerminalSession(Session): @@ -331,7 +331,7 @@ class TerminalSession(Session): self._escape_buffer = b"" # Filter out complete DA1/DA2 responses (e.g., \x1b[?1;10;0c) - data = DA_RESPONSE_PATTERN.sub(b'', data) + data = DA_RESPONSE_PATTERN.sub(b"", data) if not data: continue @@ -339,8 +339,8 @@ class TerminalSession(Session): # Hold it back until we get more data to see if it completes match = DA_PARTIAL_PATTERN.search(data) if match: - self._escape_buffer = data[match.start():] - data = data[:match.start()] + self._escape_buffer = data[match.start() :] + data = data[: match.start()] if not data: continue diff --git a/tests/test_cli.py b/tests/test_cli.py index 8862633..f2d17a7 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -40,6 +40,7 @@ class TestCLI: def test_cli_runs_default_shell(self, monkeypatch): import os + calls: dict[str, object] = {} class FakeServer: diff --git a/tests/test_cli_landing.py b/tests/test_cli_landing.py index f9377ac..f9fd824 100644 --- a/tests/test_cli_landing.py +++ b/tests/test_cli_landing.py @@ -7,7 +7,6 @@ from webterm import cli def test_cli_landing_manifest_runs(monkeypatch, tmp_path: Path): - manifest = tmp_path / "landing.yaml" manifest.write_text( """ @@ -40,7 +39,6 @@ def test_cli_landing_manifest_runs(monkeypatch, tmp_path: Path): def test_cli_compose_manifest_runs(monkeypatch, tmp_path: Path): - manifest = tmp_path / "compose.yaml" manifest.write_text( """ diff --git a/tests/test_docker_stats.py b/tests/test_docker_stats.py index 801049c..5f02283 100644 --- a/tests/test_docker_stats.py +++ b/tests/test_docker_stats.py @@ -172,10 +172,10 @@ class TestDockerStatsCollector: """Services can be added dynamically after start.""" collector = DockerStatsCollector("/nonexistent") collector._service_names = ["svc1"] - + collector.add_service("svc2") assert "svc2" in collector._service_names - + # Adding same service again is a no-op collector.add_service("svc2") assert collector._service_names.count("svc2") == 1 @@ -183,17 +183,17 @@ class TestDockerStatsCollector: def test_remove_service_dynamic(self): """Services can be removed dynamically.""" from collections import deque - + collector = DockerStatsCollector("/nonexistent") collector._service_names = ["svc1", "svc2"] collector._cpu_history["svc1"] = deque([10.0, 20.0]) collector._prev_cpu["svc1"] = (100, 200) - + collector.remove_service("svc1") assert "svc1" not in collector._service_names assert "svc1" not in collector._cpu_history assert "svc1" not in collector._prev_cpu - + # Removing non-existent service is safe collector.remove_service("nonexistent") # Should not raise diff --git a/tests/test_local_server_websocket_integration.py b/tests/test_local_server_websocket_integration.py index d9e1a4a..39ccca6 100644 --- a/tests/test_local_server_websocket_integration.py +++ b/tests/test_local_server_websocket_integration.py @@ -16,6 +16,7 @@ from webterm.types import RouteKey, SessionID if TYPE_CHECKING: from collections.abc import AsyncIterator + async def _make_client(server: LocalServer) -> TestClient: app = web.Application() app.add_routes(server._build_routes()) diff --git a/tests/test_session_manager.py b/tests/test_session_manager.py index 363358d..c95027b 100644 --- a/tests/test_session_manager.py +++ b/tests/test_session_manager.py @@ -215,6 +215,7 @@ class TestSessionManager: assert result is not None assert isinstance(result, DockerExecSession) + class TestSessionManagerRoutes: """Tests for SessionManager route handling.""" diff --git a/tests/test_terminal_session.py b/tests/test_terminal_session.py index 94af94c..25871d5 100644 --- a/tests/test_terminal_session.py +++ b/tests/test_terminal_session.py @@ -168,17 +168,11 @@ class TestTerminalSession: session = TerminalSession(mock_poller, "test-session", command) with ( - patch( - "webterm.terminal_session.pty.fork", return_value=(pty.CHILD, 123) - ) as mock_fork, + patch("webterm.terminal_session.pty.fork", return_value=(pty.CHILD, 123)) as mock_fork, patch("webterm.terminal_session.version", return_value="0.0.0"), patch("webterm.terminal_session.shlex.split", wraps=shlex.split) as mock_split, - patch( - "webterm.terminal_session.os.execvp", side_effect=OSError() - ) as mock_execvp, - patch( - "webterm.terminal_session.os._exit", side_effect=SystemExit(1) - ) as mock_exit, + patch("webterm.terminal_session.os.execvp", side_effect=OSError()) as mock_execvp, + patch("webterm.terminal_session.os._exit", side_effect=SystemExit(1)) as mock_exit, pytest.raises(SystemExit), ): await session.open() @@ -209,9 +203,7 @@ class TestTerminalSession: with ( patch("webterm.terminal_session.pty.fork", return_value=(pty.CHILD, 123)), patch("webterm.terminal_session.shlex.split", side_effect=ValueError("bad")), - patch( - "webterm.terminal_session.os._exit", side_effect=SystemExit(1) - ) as mock_exit, + patch("webterm.terminal_session.os._exit", side_effect=SystemExit(1)) as mock_exit, pytest.raises(SystemExit), ): await session.open()