Filter DA1 responses from replay buffer on WebSocket connect

The replay buffer can contain DA1/DA2 terminal attribute responses
(e.g., \x1b[?1;10;0c) that were captured before filtering was added
to the session classes. These responses appear as visible text like
'1;10;0c' when sent to the client on reconnect.

This adds an additional filter pass when sending the replay buffer,
ensuring no DA1 responses reach the client regardless of when they
were captured.
This commit is contained in:
GitHub Copilot
2026-01-29 19:13:40 +00:00
parent 3c4e62b572
commit d5343117d3
9 changed files with 48 additions and 33 deletions
+8 -6
View File
@@ -31,12 +31,12 @@ DEFAULT_SCREEN_HEIGHT = 45
# Pattern to filter out terminal device attribute responses that cause display issues
# These are responses to queries that shouldn't be displayed as text.
# Matches complete DA1/DA2 responses like \x1b[?1;10;0c or \x1b[?64;1;2;...c
DA_RESPONSE_PATTERN = re.compile(rb'\x1b\[\?[\d;]+c')
DA_RESPONSE_PATTERN = re.compile(rb"\x1b\[\?[\d;]+c")
# Pattern to detect partial DA responses at end of data (incomplete escape sequence)
# Matches: \x1b, \x1b[, \x1b[?, \x1b[?1, \x1b[?1;, \x1b[?1;10, etc.
# These need to be held back until more data arrives to see if they complete
DA_PARTIAL_PATTERN = re.compile(rb'\x1b(?:\[(?:\?[\d;]*)?)?$')
DA_PARTIAL_PATTERN = re.compile(rb"\x1b(?:\[(?:\?[\d;]*)?)?$")
@dataclass(frozen=True)
@@ -160,7 +160,9 @@ class DockerExecSession(Session):
"Tty": True,
"Cmd": self.exec_spec.command,
}
response = self._request_json("POST", f"/containers/{self.exec_spec.container}/exec", payload)
response = self._request_json(
"POST", f"/containers/{self.exec_spec.container}/exec", payload
)
exec_id = response.get("Id")
if not isinstance(exec_id, str) or not exec_id:
raise RuntimeError("Docker API did not return exec ID")
@@ -316,7 +318,7 @@ class DockerExecSession(Session):
self._escape_buffer = b""
# Filter out complete DA1/DA2 responses (e.g., \x1b[?1;10;0c)
data = DA_RESPONSE_PATTERN.sub(b'', data)
data = DA_RESPONSE_PATTERN.sub(b"", data)
if not data:
continue
@@ -324,8 +326,8 @@ class DockerExecSession(Session):
# Hold it back until we get more data to see if it completes
match = DA_PARTIAL_PATTERN.search(data)
if match:
self._escape_buffer = data[match.start():]
data = data[:match.start()]
self._escape_buffer = data[match.start() :]
data = data[: match.start()]
if not data:
continue
+22 -2
View File
@@ -7,6 +7,7 @@ import contextlib
import hashlib
import json
import logging
import re
import signal
import time
from pathlib import Path
@@ -26,6 +27,11 @@ from .session_manager import SessionManager
from .svg_exporter import render_terminal_svg
from .types import Meta, RouteKey, SessionID
# Pattern to filter terminal device attribute responses (DA1/DA2) from replay buffer.
# These responses can appear as visible text like "1;10;0c" if split across reads.
# See docker_exec_session.py and terminal_session.py for main filtering.
DA_RESPONSE_PATTERN = re.compile(rb"\x1b\[\?[\d;]+c")
if TYPE_CHECKING:
from .config import Config
@@ -324,10 +330,20 @@ class LocalServer:
self._docker_stats = DockerStatsCollector(compose_project=self._compose_project)
if self._docker_stats.available:
# Pass service names (not slugs) for Docker matching
service_names = [app.name for app in (self._landing_apps if self._compose_mode else self.session_manager.apps)]
service_names = [
app.name
for app in (
self._landing_apps if self._compose_mode else self.session_manager.apps
)
]
self._docker_stats.start(service_names)
# Create slug->name mapping for lookups
self._slug_to_service = {app.slug: app.name for app in (self._landing_apps if self._compose_mode else self.session_manager.apps)}
self._slug_to_service = {
app.slug: app.name
for app in (
self._landing_apps if self._compose_mode else self.session_manager.apps
)
}
log.info("Slug to service mapping: %s", self._slug_to_service)
stack.push_async_callback(self._docker_stats.stop)
@@ -457,6 +473,10 @@ class LocalServer:
if session_created and session is not None and hasattr(session, "get_replay_buffer"):
replay = await session.get_replay_buffer()
if replay:
# Filter out any DA1/DA2 responses that may have been captured
# in the replay buffer before filtering was added to session classes
replay = DA_RESPONSE_PATTERN.sub(b"", replay)
if replay:
await ws.send_bytes(replay)
+5 -5
View File
@@ -35,11 +35,11 @@ DEFAULT_SCREEN_HEIGHT = 45
# Pattern to filter out terminal device attribute responses that cause display issues
# These are responses to DA1/DA2 queries that shouldn't be displayed as text
# Matches complete responses like \x1b[?1;10;0c or \x1b[?64;1;2;...c
DA_RESPONSE_PATTERN = re.compile(rb'\x1b\[\?[\d;]+c')
DA_RESPONSE_PATTERN = re.compile(rb"\x1b\[\?[\d;]+c")
# Pattern to detect partial DA responses at end of data (incomplete escape sequence)
# Matches: \x1b, \x1b[, \x1b[?, \x1b[?1, \x1b[?1;, etc.
DA_PARTIAL_PATTERN = re.compile(rb'\x1b(?:\[(?:\?[\d;]*)?)?$')
DA_PARTIAL_PATTERN = re.compile(rb"\x1b(?:\[(?:\?[\d;]*)?)?$")
class TerminalSession(Session):
@@ -331,7 +331,7 @@ class TerminalSession(Session):
self._escape_buffer = b""
# Filter out complete DA1/DA2 responses (e.g., \x1b[?1;10;0c)
data = DA_RESPONSE_PATTERN.sub(b'', data)
data = DA_RESPONSE_PATTERN.sub(b"", data)
if not data:
continue
@@ -339,8 +339,8 @@ class TerminalSession(Session):
# Hold it back until we get more data to see if it completes
match = DA_PARTIAL_PATTERN.search(data)
if match:
self._escape_buffer = data[match.start():]
data = data[:match.start()]
self._escape_buffer = data[match.start() :]
data = data[: match.start()]
if not data:
continue
+1
View File
@@ -40,6 +40,7 @@ class TestCLI:
def test_cli_runs_default_shell(self, monkeypatch):
import os
calls: dict[str, object] = {}
class FakeServer:
-2
View File
@@ -7,7 +7,6 @@ from webterm import cli
def test_cli_landing_manifest_runs(monkeypatch, tmp_path: Path):
manifest = tmp_path / "landing.yaml"
manifest.write_text(
"""
@@ -40,7 +39,6 @@ def test_cli_landing_manifest_runs(monkeypatch, tmp_path: Path):
def test_cli_compose_manifest_runs(monkeypatch, tmp_path: Path):
manifest = tmp_path / "compose.yaml"
manifest.write_text(
"""
@@ -16,6 +16,7 @@ from webterm.types import RouteKey, SessionID
if TYPE_CHECKING:
from collections.abc import AsyncIterator
async def _make_client(server: LocalServer) -> TestClient:
app = web.Application()
app.add_routes(server._build_routes())
+1
View File
@@ -215,6 +215,7 @@ class TestSessionManager:
assert result is not None
assert isinstance(result, DockerExecSession)
class TestSessionManagerRoutes:
"""Tests for SessionManager route handling."""
+4 -12
View File
@@ -168,17 +168,11 @@ class TestTerminalSession:
session = TerminalSession(mock_poller, "test-session", command)
with (
patch(
"webterm.terminal_session.pty.fork", return_value=(pty.CHILD, 123)
) as mock_fork,
patch("webterm.terminal_session.pty.fork", return_value=(pty.CHILD, 123)) as mock_fork,
patch("webterm.terminal_session.version", return_value="0.0.0"),
patch("webterm.terminal_session.shlex.split", wraps=shlex.split) as mock_split,
patch(
"webterm.terminal_session.os.execvp", side_effect=OSError()
) as mock_execvp,
patch(
"webterm.terminal_session.os._exit", side_effect=SystemExit(1)
) as mock_exit,
patch("webterm.terminal_session.os.execvp", side_effect=OSError()) as mock_execvp,
patch("webterm.terminal_session.os._exit", side_effect=SystemExit(1)) as mock_exit,
pytest.raises(SystemExit),
):
await session.open()
@@ -209,9 +203,7 @@ class TestTerminalSession:
with (
patch("webterm.terminal_session.pty.fork", return_value=(pty.CHILD, 123)),
patch("webterm.terminal_session.shlex.split", side_effect=ValueError("bad")),
patch(
"webterm.terminal_session.os._exit", side_effect=SystemExit(1)
) as mock_exit,
patch("webterm.terminal_session.os._exit", side_effect=SystemExit(1)) as mock_exit,
pytest.raises(SystemExit),
):
await session.open()