fix: use session's actual screen state for screenshots

The screenshot was creating a new pyte screen with arbitrary dimensions
from query params, but the replay buffer contains ANSI sequences meant
for the session's actual terminal size. This mismatch caused wrapping.

Now we use get_screen_state() which returns the actual screen buffer
from the terminal session's pyte screen, with the correct dimensions.
This ensures the screenshot matches exactly what the terminal rendered.
This commit is contained in:
GitHub Copilot
2026-01-24 11:14:12 +00:00
parent 3e433f5af5
commit e85213315e
3 changed files with 81 additions and 61 deletions
+20 -44
View File
@@ -14,7 +14,6 @@ from pathlib import Path
from typing import TYPE_CHECKING
import aiohttp
import pyte
from aiohttp import WSMsgType, web
from rich.console import Console
from rich.style import Style
@@ -471,7 +470,7 @@ class LocalServer:
)
session_process = self.session_manager.get_session_by_route_key(RouteKey(route_key))
if session_process is None or not hasattr(session_process, "get_screen_lines"):
if session_process is None or not hasattr(session_process, "get_screen_state"):
raise web.HTTPNotFound(text="Session not found")
# If nothing has changed since the last render, serve cached screenshot without
@@ -483,29 +482,9 @@ class LocalServer:
if cached_response is not None:
return cached_response
try:
width = int(request.query.get("width", "120"))
except ValueError:
width = 120
width = max(10, min(400, width))
try:
height = int(request.query.get("height", str(DISCONNECT_RESIZE[1])))
except ValueError:
height = DISCONNECT_RESIZE[1]
height = max(5, min(200, height))
# Hybrid approach for colored screenshots:
# 1. Get screen dimensions from pyte (accurate viewport)
# 2. Get raw ANSI from replay buffer for colors
# 3. Use pyte to interpret the ANSI and get proper screen state
# 4. Render through Rich for SVG with colors
replay_data = await session_process.get_replay_buffer() # type: ignore[union-attr]
# Limit replay data to prevent excessive processing
max_replay = 128 * 1024 # 128KB should be plenty for a screen
if len(replay_data) > max_replay:
replay_data = replay_data[-max_replay:]
ansi_text = replay_data.decode("utf-8", errors="replace")
# Get the actual screen state from the terminal session's pyte screen
# This uses the correct dimensions that match what the terminal is rendering
screen_width, screen_height, screen_buffer = await session_process.get_screen_state() # type: ignore[union-attr]
now = asyncio.get_event_loop().time()
ttl = self._get_screenshot_cache_ttl(route_key, now)
@@ -538,34 +517,31 @@ class LocalServer:
return cached_response
def _render_svg() -> str:
# Use pyte to interpret ANSI sequences and get accurate screen state
screen = pyte.Screen(width, height)
stream = pyte.Stream(screen)
stream.feed(ansi_text)
# Use the session's screen buffer directly - this has the correct
# dimensions matching the actual terminal, preventing wrapping issues
console = Console(
record=True, width=screen_width, height=screen_height, file=io.StringIO()
)
# Convert pyte screen buffer to Rich Text with colors
console = Console(record=True, width=width, height=height, file=io.StringIO())
for row in range(height):
for row_data in screen_buffer:
line = Text()
for col in range(width):
char = screen.buffer[row][col]
char_data = char.data if char.data else " "
for char in row_data:
char_data = char["data"]
# Build Rich style from pyte character attributes
# Map pyte color names to Rich-compatible names
style_kwargs = {}
if char.fg != "default":
style_kwargs["color"] = PYTE_TO_RICH_COLOR.get(char.fg, char.fg)
if char.bg != "default":
style_kwargs["bgcolor"] = PYTE_TO_RICH_COLOR.get(char.bg, char.bg)
if char.bold:
if char["fg"] != "default":
style_kwargs["color"] = PYTE_TO_RICH_COLOR.get(char["fg"], char["fg"])
if char["bg"] != "default":
style_kwargs["bgcolor"] = PYTE_TO_RICH_COLOR.get(char["bg"], char["bg"])
if char["bold"]:
style_kwargs["bold"] = True
if char.italics:
if char["italics"]:
style_kwargs["italic"] = True
if char.underscore:
if char["underscore"]:
style_kwargs["underline"] = True
if char.reverse:
if char["reverse"]:
style_kwargs["reverse"] = True
if style_kwargs:
+28
View File
@@ -138,6 +138,34 @@ class TerminalSession(Session):
async with self._screen_lock:
return [line.rstrip() for line in self._screen.display]
async def get_screen_state(self) -> tuple[int, int, list]:
"""Get the current screen state including dimensions and character buffer.
Returns:
Tuple of (width, height, buffer) where buffer is a list of rows,
each row containing character data with styling attributes.
"""
async with self._screen_lock:
width = self._screen.columns
height = self._screen.lines
# Copy the buffer data to avoid holding the lock
buffer = []
for row in range(height):
row_data = []
for col in range(width):
char = self._screen.buffer[row][col]
row_data.append({
"data": char.data if char.data else " ",
"fg": char.fg,
"bg": char.bg,
"bold": char.bold,
"italics": char.italics,
"underscore": char.underscore,
"reverse": char.reverse,
})
buffer.append(row_data)
return (width, height, buffer)
def update_connector(self, connector: SessionConnector) -> None:
"""Update the connector for reconnection without restarting the session."""
self._connector = connector
+33 -17
View File
@@ -191,11 +191,15 @@ class TestLocalServerHelpers:
@pytest.mark.asyncio
async def test_screenshot_svg_handler_returns_svg(self, server, monkeypatch, capsys):
request = MagicMock()
request.query = {"route_key": "rk", "width": "80"}
request.query = {"route_key": "rk"}
# Mock screen state: width=80, height=2, buffer with "hello" on first line
screen_buffer = [
[{"data": c, "fg": "default", "bg": "default", "bold": False, "italics": False, "underscore": False, "reverse": False} for c in "hello" + " " * 75],
[{"data": " ", "fg": "default", "bg": "default", "bold": False, "italics": False, "underscore": False, "reverse": False}] * 80,
]
session = MagicMock()
session.get_screen_lines = AsyncMock(return_value=["hello", ""])
session.get_replay_buffer = AsyncMock(return_value=b"hello\r\n")
session.get_screen_state = AsyncMock(return_value=(80, 2, screen_buffer))
monkeypatch.setattr(server.session_manager, "get_session_by_route_key", lambda _rk: session)
@@ -210,11 +214,15 @@ class TestLocalServerHelpers:
@pytest.mark.asyncio
async def test_screenshot_creates_session_for_known_slug(self, server, monkeypatch):
request = MagicMock()
request.query = {"route_key": "known", "width": "90"}
request.query = {"route_key": "known"}
# Mock screen state
screen_buffer = [
[{"data": c, "fg": "default", "bg": "default", "bold": False, "italics": False, "underscore": False, "reverse": False} for c in "world" + " " * 75],
[{"data": " ", "fg": "default", "bg": "default", "bold": False, "italics": False, "underscore": False, "reverse": False}] * 80,
]
session = MagicMock()
session.get_screen_lines = AsyncMock(return_value=["world", ""])
session.get_replay_buffer = AsyncMock(return_value=b"world\r\n")
session.get_screen_state = AsyncMock(return_value=(80, 2, screen_buffer))
# Pretend app exists for slug "known"
server.session_manager.apps_by_slug["known"] = App(
@@ -546,7 +554,7 @@ class TestLocalServerMoreCoverage:
request.headers = {}
session = MagicMock()
session.get_screen_lines = AsyncMock(return_value=["SHOULD_NOT_BE_READ"])
session.get_screen_state = AsyncMock(return_value=(80, 2, []))
monkeypatch.setattr(server_with_no_apps.session_manager, "get_session_by_route_key", lambda _rk: session)
server_with_no_apps._screenshot_cache["rk"] = (0.0, "<svg>cached</svg>")
@@ -556,17 +564,21 @@ class TestLocalServerMoreCoverage:
resp = await server_with_no_apps._handle_screenshot(request)
assert "cached" in resp.text
session.get_screen_lines.assert_not_awaited()
session.get_screen_state.assert_not_awaited()
@pytest.mark.asyncio
async def test_handle_screenshot_invalid_width_height_defaults(self, server_with_no_apps, monkeypatch):
async def test_handle_screenshot_renders_screen_state(self, server_with_no_apps, monkeypatch):
request = MagicMock()
request.query = {"route_key": "rk", "width": "nope", "height": "nope"}
request.query = {"route_key": "rk"}
request.headers = {}
# Mock screen state with some content
screen_buffer = [
[{"data": c, "fg": "default", "bg": "default", "bold": False, "italics": False, "underscore": False, "reverse": False} for c in "hello" + " " * 75],
[{"data": " ", "fg": "default", "bg": "default", "bold": False, "italics": False, "underscore": False, "reverse": False}] * 80,
]
session = MagicMock()
session.get_screen_lines = AsyncMock(return_value=["hello", ""])
session.get_replay_buffer = AsyncMock(return_value=b"hello\n")
session.get_screen_state = AsyncMock(return_value=(80, 2, screen_buffer))
monkeypatch.setattr(server_with_no_apps.session_manager, "get_session_by_route_key", lambda _rk: session)
resp = await server_with_no_apps._handle_screenshot(request)
@@ -736,15 +748,19 @@ class TestLocalServerMoreCoverage:
assert created is True
@pytest.mark.asyncio
async def test_handle_screenshot_uses_replay_buffer_with_pyte(self, server_with_no_apps, monkeypatch):
"""Test that screenshot uses replay buffer with pyte for colored rendering."""
async def test_handle_screenshot_uses_screen_state(self, server_with_no_apps, monkeypatch):
"""Test that screenshot uses get_screen_state for rendering."""
request = MagicMock()
request.query = {"route_key": "rk"}
request.headers = {}
# Mock screen state
screen_buffer = [
[{"data": c, "fg": "default", "bg": "default", "bold": False, "italics": False, "underscore": False, "reverse": False} for c in "line1" + " " * 75],
[{"data": c, "fg": "default", "bg": "default", "bold": False, "italics": False, "underscore": False, "reverse": False} for c in "line2" + " " * 75],
]
session = MagicMock()
session.get_screen_lines = AsyncMock(return_value=["line1", "line2", ""])
session.get_replay_buffer = AsyncMock(return_value=b"line1\r\nline2\r\n")
session.get_screen_state = AsyncMock(return_value=(80, 2, screen_buffer))
monkeypatch.setattr(server_with_no_apps.session_manager, "get_session_by_route_key", lambda _rk: session)
server_with_no_apps._route_last_activity["rk"] = 1.0
@@ -752,4 +768,4 @@ class TestLocalServerMoreCoverage:
resp = await server_with_no_apps._handle_screenshot(request)
assert resp.content_type == "image/svg+xml"
assert "<svg" in resp.text
session.get_replay_buffer.assert_awaited_once()
session.get_screen_state.assert_awaited_once()