From e85213315ea9be27b90e9238ddd6505bbedc8c8c Mon Sep 17 00:00:00 2001 From: GitHub Copilot Date: Sat, 24 Jan 2026 11:14:12 +0000 Subject: [PATCH] fix: use session's actual screen state for screenshots The screenshot was creating a new pyte screen with arbitrary dimensions from query params, but the replay buffer contains ANSI sequences meant for the session's actual terminal size. This mismatch caused wrapping. Now we use get_screen_state() which returns the actual screen buffer from the terminal session's pyte screen, with the correct dimensions. This ensures the screenshot matches exactly what the terminal rendered. --- src/textual_webterm/local_server.py | 64 ++++++++----------------- src/textual_webterm/terminal_session.py | 28 +++++++++++ tests/test_local_server_unit.py | 50 ++++++++++++------- 3 files changed, 81 insertions(+), 61 deletions(-) diff --git a/src/textual_webterm/local_server.py b/src/textual_webterm/local_server.py index 4418e53..a0c7f19 100644 --- a/src/textual_webterm/local_server.py +++ b/src/textual_webterm/local_server.py @@ -14,7 +14,6 @@ from pathlib import Path from typing import TYPE_CHECKING import aiohttp -import pyte from aiohttp import WSMsgType, web from rich.console import Console from rich.style import Style @@ -471,7 +470,7 @@ class LocalServer: ) session_process = self.session_manager.get_session_by_route_key(RouteKey(route_key)) - if session_process is None or not hasattr(session_process, "get_screen_lines"): + if session_process is None or not hasattr(session_process, "get_screen_state"): raise web.HTTPNotFound(text="Session not found") # If nothing has changed since the last render, serve cached screenshot without @@ -483,29 +482,9 @@ class LocalServer: if cached_response is not None: return cached_response - try: - width = int(request.query.get("width", "120")) - except ValueError: - width = 120 - width = max(10, min(400, width)) - - try: - height = int(request.query.get("height", str(DISCONNECT_RESIZE[1]))) - except ValueError: - height = DISCONNECT_RESIZE[1] - height = max(5, min(200, height)) - - # Hybrid approach for colored screenshots: - # 1. Get screen dimensions from pyte (accurate viewport) - # 2. Get raw ANSI from replay buffer for colors - # 3. Use pyte to interpret the ANSI and get proper screen state - # 4. Render through Rich for SVG with colors - replay_data = await session_process.get_replay_buffer() # type: ignore[union-attr] - # Limit replay data to prevent excessive processing - max_replay = 128 * 1024 # 128KB should be plenty for a screen - if len(replay_data) > max_replay: - replay_data = replay_data[-max_replay:] - ansi_text = replay_data.decode("utf-8", errors="replace") + # Get the actual screen state from the terminal session's pyte screen + # This uses the correct dimensions that match what the terminal is rendering + screen_width, screen_height, screen_buffer = await session_process.get_screen_state() # type: ignore[union-attr] now = asyncio.get_event_loop().time() ttl = self._get_screenshot_cache_ttl(route_key, now) @@ -538,34 +517,31 @@ class LocalServer: return cached_response def _render_svg() -> str: - # Use pyte to interpret ANSI sequences and get accurate screen state - screen = pyte.Screen(width, height) - stream = pyte.Stream(screen) - stream.feed(ansi_text) + # Use the session's screen buffer directly - this has the correct + # dimensions matching the actual terminal, preventing wrapping issues + console = Console( + record=True, width=screen_width, height=screen_height, file=io.StringIO() + ) - # Convert pyte screen buffer to Rich Text with colors - console = Console(record=True, width=width, height=height, file=io.StringIO()) - - for row in range(height): + for row_data in screen_buffer: line = Text() - for col in range(width): - char = screen.buffer[row][col] - char_data = char.data if char.data else " " + for char in row_data: + char_data = char["data"] # Build Rich style from pyte character attributes # Map pyte color names to Rich-compatible names style_kwargs = {} - if char.fg != "default": - style_kwargs["color"] = PYTE_TO_RICH_COLOR.get(char.fg, char.fg) - if char.bg != "default": - style_kwargs["bgcolor"] = PYTE_TO_RICH_COLOR.get(char.bg, char.bg) - if char.bold: + if char["fg"] != "default": + style_kwargs["color"] = PYTE_TO_RICH_COLOR.get(char["fg"], char["fg"]) + if char["bg"] != "default": + style_kwargs["bgcolor"] = PYTE_TO_RICH_COLOR.get(char["bg"], char["bg"]) + if char["bold"]: style_kwargs["bold"] = True - if char.italics: + if char["italics"]: style_kwargs["italic"] = True - if char.underscore: + if char["underscore"]: style_kwargs["underline"] = True - if char.reverse: + if char["reverse"]: style_kwargs["reverse"] = True if style_kwargs: diff --git a/src/textual_webterm/terminal_session.py b/src/textual_webterm/terminal_session.py index 86ad039..8795670 100644 --- a/src/textual_webterm/terminal_session.py +++ b/src/textual_webterm/terminal_session.py @@ -138,6 +138,34 @@ class TerminalSession(Session): async with self._screen_lock: return [line.rstrip() for line in self._screen.display] + async def get_screen_state(self) -> tuple[int, int, list]: + """Get the current screen state including dimensions and character buffer. + + Returns: + Tuple of (width, height, buffer) where buffer is a list of rows, + each row containing character data with styling attributes. + """ + async with self._screen_lock: + width = self._screen.columns + height = self._screen.lines + # Copy the buffer data to avoid holding the lock + buffer = [] + for row in range(height): + row_data = [] + for col in range(width): + char = self._screen.buffer[row][col] + row_data.append({ + "data": char.data if char.data else " ", + "fg": char.fg, + "bg": char.bg, + "bold": char.bold, + "italics": char.italics, + "underscore": char.underscore, + "reverse": char.reverse, + }) + buffer.append(row_data) + return (width, height, buffer) + def update_connector(self, connector: SessionConnector) -> None: """Update the connector for reconnection without restarting the session.""" self._connector = connector diff --git a/tests/test_local_server_unit.py b/tests/test_local_server_unit.py index ea1378e..0ba48ca 100644 --- a/tests/test_local_server_unit.py +++ b/tests/test_local_server_unit.py @@ -191,11 +191,15 @@ class TestLocalServerHelpers: @pytest.mark.asyncio async def test_screenshot_svg_handler_returns_svg(self, server, monkeypatch, capsys): request = MagicMock() - request.query = {"route_key": "rk", "width": "80"} + request.query = {"route_key": "rk"} + # Mock screen state: width=80, height=2, buffer with "hello" on first line + screen_buffer = [ + [{"data": c, "fg": "default", "bg": "default", "bold": False, "italics": False, "underscore": False, "reverse": False} for c in "hello" + " " * 75], + [{"data": " ", "fg": "default", "bg": "default", "bold": False, "italics": False, "underscore": False, "reverse": False}] * 80, + ] session = MagicMock() - session.get_screen_lines = AsyncMock(return_value=["hello", ""]) - session.get_replay_buffer = AsyncMock(return_value=b"hello\r\n") + session.get_screen_state = AsyncMock(return_value=(80, 2, screen_buffer)) monkeypatch.setattr(server.session_manager, "get_session_by_route_key", lambda _rk: session) @@ -210,11 +214,15 @@ class TestLocalServerHelpers: @pytest.mark.asyncio async def test_screenshot_creates_session_for_known_slug(self, server, monkeypatch): request = MagicMock() - request.query = {"route_key": "known", "width": "90"} + request.query = {"route_key": "known"} + # Mock screen state + screen_buffer = [ + [{"data": c, "fg": "default", "bg": "default", "bold": False, "italics": False, "underscore": False, "reverse": False} for c in "world" + " " * 75], + [{"data": " ", "fg": "default", "bg": "default", "bold": False, "italics": False, "underscore": False, "reverse": False}] * 80, + ] session = MagicMock() - session.get_screen_lines = AsyncMock(return_value=["world", ""]) - session.get_replay_buffer = AsyncMock(return_value=b"world\r\n") + session.get_screen_state = AsyncMock(return_value=(80, 2, screen_buffer)) # Pretend app exists for slug "known" server.session_manager.apps_by_slug["known"] = App( @@ -546,7 +554,7 @@ class TestLocalServerMoreCoverage: request.headers = {} session = MagicMock() - session.get_screen_lines = AsyncMock(return_value=["SHOULD_NOT_BE_READ"]) + session.get_screen_state = AsyncMock(return_value=(80, 2, [])) monkeypatch.setattr(server_with_no_apps.session_manager, "get_session_by_route_key", lambda _rk: session) server_with_no_apps._screenshot_cache["rk"] = (0.0, "cached") @@ -556,17 +564,21 @@ class TestLocalServerMoreCoverage: resp = await server_with_no_apps._handle_screenshot(request) assert "cached" in resp.text - session.get_screen_lines.assert_not_awaited() + session.get_screen_state.assert_not_awaited() @pytest.mark.asyncio - async def test_handle_screenshot_invalid_width_height_defaults(self, server_with_no_apps, monkeypatch): + async def test_handle_screenshot_renders_screen_state(self, server_with_no_apps, monkeypatch): request = MagicMock() - request.query = {"route_key": "rk", "width": "nope", "height": "nope"} + request.query = {"route_key": "rk"} request.headers = {} + # Mock screen state with some content + screen_buffer = [ + [{"data": c, "fg": "default", "bg": "default", "bold": False, "italics": False, "underscore": False, "reverse": False} for c in "hello" + " " * 75], + [{"data": " ", "fg": "default", "bg": "default", "bold": False, "italics": False, "underscore": False, "reverse": False}] * 80, + ] session = MagicMock() - session.get_screen_lines = AsyncMock(return_value=["hello", ""]) - session.get_replay_buffer = AsyncMock(return_value=b"hello\n") + session.get_screen_state = AsyncMock(return_value=(80, 2, screen_buffer)) monkeypatch.setattr(server_with_no_apps.session_manager, "get_session_by_route_key", lambda _rk: session) resp = await server_with_no_apps._handle_screenshot(request) @@ -736,15 +748,19 @@ class TestLocalServerMoreCoverage: assert created is True @pytest.mark.asyncio - async def test_handle_screenshot_uses_replay_buffer_with_pyte(self, server_with_no_apps, monkeypatch): - """Test that screenshot uses replay buffer with pyte for colored rendering.""" + async def test_handle_screenshot_uses_screen_state(self, server_with_no_apps, monkeypatch): + """Test that screenshot uses get_screen_state for rendering.""" request = MagicMock() request.query = {"route_key": "rk"} request.headers = {} + # Mock screen state + screen_buffer = [ + [{"data": c, "fg": "default", "bg": "default", "bold": False, "italics": False, "underscore": False, "reverse": False} for c in "line1" + " " * 75], + [{"data": c, "fg": "default", "bg": "default", "bold": False, "italics": False, "underscore": False, "reverse": False} for c in "line2" + " " * 75], + ] session = MagicMock() - session.get_screen_lines = AsyncMock(return_value=["line1", "line2", ""]) - session.get_replay_buffer = AsyncMock(return_value=b"line1\r\nline2\r\n") + session.get_screen_state = AsyncMock(return_value=(80, 2, screen_buffer)) monkeypatch.setattr(server_with_no_apps.session_manager, "get_session_by_route_key", lambda _rk: session) server_with_no_apps._route_last_activity["rk"] = 1.0 @@ -752,4 +768,4 @@ class TestLocalServerMoreCoverage: resp = await server_with_no_apps._handle_screenshot(request) assert resp.content_type == "image/svg+xml" assert "