diff --git a/src/textual_webterm/local_server.py b/src/textual_webterm/local_server.py index 4418e53..a0c7f19 100644 --- a/src/textual_webterm/local_server.py +++ b/src/textual_webterm/local_server.py @@ -14,7 +14,6 @@ from pathlib import Path from typing import TYPE_CHECKING import aiohttp -import pyte from aiohttp import WSMsgType, web from rich.console import Console from rich.style import Style @@ -471,7 +470,7 @@ class LocalServer: ) session_process = self.session_manager.get_session_by_route_key(RouteKey(route_key)) - if session_process is None or not hasattr(session_process, "get_screen_lines"): + if session_process is None or not hasattr(session_process, "get_screen_state"): raise web.HTTPNotFound(text="Session not found") # If nothing has changed since the last render, serve cached screenshot without @@ -483,29 +482,9 @@ class LocalServer: if cached_response is not None: return cached_response - try: - width = int(request.query.get("width", "120")) - except ValueError: - width = 120 - width = max(10, min(400, width)) - - try: - height = int(request.query.get("height", str(DISCONNECT_RESIZE[1]))) - except ValueError: - height = DISCONNECT_RESIZE[1] - height = max(5, min(200, height)) - - # Hybrid approach for colored screenshots: - # 1. Get screen dimensions from pyte (accurate viewport) - # 2. Get raw ANSI from replay buffer for colors - # 3. Use pyte to interpret the ANSI and get proper screen state - # 4. Render through Rich for SVG with colors - replay_data = await session_process.get_replay_buffer() # type: ignore[union-attr] - # Limit replay data to prevent excessive processing - max_replay = 128 * 1024 # 128KB should be plenty for a screen - if len(replay_data) > max_replay: - replay_data = replay_data[-max_replay:] - ansi_text = replay_data.decode("utf-8", errors="replace") + # Get the actual screen state from the terminal session's pyte screen + # This uses the correct dimensions that match what the terminal is rendering + screen_width, screen_height, screen_buffer = await session_process.get_screen_state() # type: ignore[union-attr] now = asyncio.get_event_loop().time() ttl = self._get_screenshot_cache_ttl(route_key, now) @@ -538,34 +517,31 @@ class LocalServer: return cached_response def _render_svg() -> str: - # Use pyte to interpret ANSI sequences and get accurate screen state - screen = pyte.Screen(width, height) - stream = pyte.Stream(screen) - stream.feed(ansi_text) + # Use the session's screen buffer directly - this has the correct + # dimensions matching the actual terminal, preventing wrapping issues + console = Console( + record=True, width=screen_width, height=screen_height, file=io.StringIO() + ) - # Convert pyte screen buffer to Rich Text with colors - console = Console(record=True, width=width, height=height, file=io.StringIO()) - - for row in range(height): + for row_data in screen_buffer: line = Text() - for col in range(width): - char = screen.buffer[row][col] - char_data = char.data if char.data else " " + for char in row_data: + char_data = char["data"] # Build Rich style from pyte character attributes # Map pyte color names to Rich-compatible names style_kwargs = {} - if char.fg != "default": - style_kwargs["color"] = PYTE_TO_RICH_COLOR.get(char.fg, char.fg) - if char.bg != "default": - style_kwargs["bgcolor"] = PYTE_TO_RICH_COLOR.get(char.bg, char.bg) - if char.bold: + if char["fg"] != "default": + style_kwargs["color"] = PYTE_TO_RICH_COLOR.get(char["fg"], char["fg"]) + if char["bg"] != "default": + style_kwargs["bgcolor"] = PYTE_TO_RICH_COLOR.get(char["bg"], char["bg"]) + if char["bold"]: style_kwargs["bold"] = True - if char.italics: + if char["italics"]: style_kwargs["italic"] = True - if char.underscore: + if char["underscore"]: style_kwargs["underline"] = True - if char.reverse: + if char["reverse"]: style_kwargs["reverse"] = True if style_kwargs: diff --git a/src/textual_webterm/terminal_session.py b/src/textual_webterm/terminal_session.py index 86ad039..8795670 100644 --- a/src/textual_webterm/terminal_session.py +++ b/src/textual_webterm/terminal_session.py @@ -138,6 +138,34 @@ class TerminalSession(Session): async with self._screen_lock: return [line.rstrip() for line in self._screen.display] + async def get_screen_state(self) -> tuple[int, int, list]: + """Get the current screen state including dimensions and character buffer. + + Returns: + Tuple of (width, height, buffer) where buffer is a list of rows, + each row containing character data with styling attributes. + """ + async with self._screen_lock: + width = self._screen.columns + height = self._screen.lines + # Copy the buffer data to avoid holding the lock + buffer = [] + for row in range(height): + row_data = [] + for col in range(width): + char = self._screen.buffer[row][col] + row_data.append({ + "data": char.data if char.data else " ", + "fg": char.fg, + "bg": char.bg, + "bold": char.bold, + "italics": char.italics, + "underscore": char.underscore, + "reverse": char.reverse, + }) + buffer.append(row_data) + return (width, height, buffer) + def update_connector(self, connector: SessionConnector) -> None: """Update the connector for reconnection without restarting the session.""" self._connector = connector diff --git a/tests/test_local_server_unit.py b/tests/test_local_server_unit.py index ea1378e..0ba48ca 100644 --- a/tests/test_local_server_unit.py +++ b/tests/test_local_server_unit.py @@ -191,11 +191,15 @@ class TestLocalServerHelpers: @pytest.mark.asyncio async def test_screenshot_svg_handler_returns_svg(self, server, monkeypatch, capsys): request = MagicMock() - request.query = {"route_key": "rk", "width": "80"} + request.query = {"route_key": "rk"} + # Mock screen state: width=80, height=2, buffer with "hello" on first line + screen_buffer = [ + [{"data": c, "fg": "default", "bg": "default", "bold": False, "italics": False, "underscore": False, "reverse": False} for c in "hello" + " " * 75], + [{"data": " ", "fg": "default", "bg": "default", "bold": False, "italics": False, "underscore": False, "reverse": False}] * 80, + ] session = MagicMock() - session.get_screen_lines = AsyncMock(return_value=["hello", ""]) - session.get_replay_buffer = AsyncMock(return_value=b"hello\r\n") + session.get_screen_state = AsyncMock(return_value=(80, 2, screen_buffer)) monkeypatch.setattr(server.session_manager, "get_session_by_route_key", lambda _rk: session) @@ -210,11 +214,15 @@ class TestLocalServerHelpers: @pytest.mark.asyncio async def test_screenshot_creates_session_for_known_slug(self, server, monkeypatch): request = MagicMock() - request.query = {"route_key": "known", "width": "90"} + request.query = {"route_key": "known"} + # Mock screen state + screen_buffer = [ + [{"data": c, "fg": "default", "bg": "default", "bold": False, "italics": False, "underscore": False, "reverse": False} for c in "world" + " " * 75], + [{"data": " ", "fg": "default", "bg": "default", "bold": False, "italics": False, "underscore": False, "reverse": False}] * 80, + ] session = MagicMock() - session.get_screen_lines = AsyncMock(return_value=["world", ""]) - session.get_replay_buffer = AsyncMock(return_value=b"world\r\n") + session.get_screen_state = AsyncMock(return_value=(80, 2, screen_buffer)) # Pretend app exists for slug "known" server.session_manager.apps_by_slug["known"] = App( @@ -546,7 +554,7 @@ class TestLocalServerMoreCoverage: request.headers = {} session = MagicMock() - session.get_screen_lines = AsyncMock(return_value=["SHOULD_NOT_BE_READ"]) + session.get_screen_state = AsyncMock(return_value=(80, 2, [])) monkeypatch.setattr(server_with_no_apps.session_manager, "get_session_by_route_key", lambda _rk: session) server_with_no_apps._screenshot_cache["rk"] = (0.0, "cached") @@ -556,17 +564,21 @@ class TestLocalServerMoreCoverage: resp = await server_with_no_apps._handle_screenshot(request) assert "cached" in resp.text - session.get_screen_lines.assert_not_awaited() + session.get_screen_state.assert_not_awaited() @pytest.mark.asyncio - async def test_handle_screenshot_invalid_width_height_defaults(self, server_with_no_apps, monkeypatch): + async def test_handle_screenshot_renders_screen_state(self, server_with_no_apps, monkeypatch): request = MagicMock() - request.query = {"route_key": "rk", "width": "nope", "height": "nope"} + request.query = {"route_key": "rk"} request.headers = {} + # Mock screen state with some content + screen_buffer = [ + [{"data": c, "fg": "default", "bg": "default", "bold": False, "italics": False, "underscore": False, "reverse": False} for c in "hello" + " " * 75], + [{"data": " ", "fg": "default", "bg": "default", "bold": False, "italics": False, "underscore": False, "reverse": False}] * 80, + ] session = MagicMock() - session.get_screen_lines = AsyncMock(return_value=["hello", ""]) - session.get_replay_buffer = AsyncMock(return_value=b"hello\n") + session.get_screen_state = AsyncMock(return_value=(80, 2, screen_buffer)) monkeypatch.setattr(server_with_no_apps.session_manager, "get_session_by_route_key", lambda _rk: session) resp = await server_with_no_apps._handle_screenshot(request) @@ -736,15 +748,19 @@ class TestLocalServerMoreCoverage: assert created is True @pytest.mark.asyncio - async def test_handle_screenshot_uses_replay_buffer_with_pyte(self, server_with_no_apps, monkeypatch): - """Test that screenshot uses replay buffer with pyte for colored rendering.""" + async def test_handle_screenshot_uses_screen_state(self, server_with_no_apps, monkeypatch): + """Test that screenshot uses get_screen_state for rendering.""" request = MagicMock() request.query = {"route_key": "rk"} request.headers = {} + # Mock screen state + screen_buffer = [ + [{"data": c, "fg": "default", "bg": "default", "bold": False, "italics": False, "underscore": False, "reverse": False} for c in "line1" + " " * 75], + [{"data": c, "fg": "default", "bg": "default", "bold": False, "italics": False, "underscore": False, "reverse": False} for c in "line2" + " " * 75], + ] session = MagicMock() - session.get_screen_lines = AsyncMock(return_value=["line1", "line2", ""]) - session.get_replay_buffer = AsyncMock(return_value=b"line1\r\nline2\r\n") + session.get_screen_state = AsyncMock(return_value=(80, 2, screen_buffer)) monkeypatch.setattr(server_with_no_apps.session_manager, "get_session_by_route_key", lambda _rk: session) server_with_no_apps._route_last_activity["rk"] = 1.0 @@ -752,4 +768,4 @@ class TestLocalServerMoreCoverage: resp = await server_with_no_apps._handle_screenshot(request) assert resp.content_type == "image/svg+xml" assert "