diff --git a/pyproject.toml b/pyproject.toml index da91eed..039dbe1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "textual-webterm" -version = "0.1.10" +version = "0.1.11" description = "Serve terminal sessions over the web" authors = ["Will McGugan "] license = "MIT" diff --git a/src/textual_webterm/local_server.py b/src/textual_webterm/local_server.py index ca0550b..26d588d 100644 --- a/src/textual_webterm/local_server.py +++ b/src/textual_webterm/local_server.py @@ -8,6 +8,7 @@ import hashlib import io import json import logging +import re import signal from pathlib import Path from typing import TYPE_CHECKING @@ -37,6 +38,12 @@ SCREENSHOT_MAX_BYTES = 65536 SCREENSHOT_CACHE_SECONDS = 1.0 SCREENSHOT_MAX_CACHE_SECONDS = 60.0 +SVG_MONO_FONT_STACK = ( + 'ui-monospace, "SFMono-Regular", "FiraCode Nerd Font", "FiraMono Nerd Font", ' + '"Fira Code", "Roboto Mono", Menlo, Monaco, Consolas, "Liberation Mono", ' + '"DejaVu Sans Mono", "Courier New", monospace' +) + WEBTERM_STATIC_PATH = Path(__file__).parent / "static" @@ -84,6 +91,22 @@ class LocalClientConnector(SessionConnector): await self.server.handle_session_close(self.session_id, self.route_key) +def _rewrite_svg_fonts(svg: str) -> str: + """Make Rich SVG output self-contained and aligned with our monospace styling.""" + + # Rich export_svg embeds @font-face rules that reference external CDNs. + svg = re.sub(r"@font-face\s*\{.*?\}\s*", "", svg, flags=re.DOTALL) + + # Force our local monospace stack even if Rich sets font-family to Fira Code. + override = f"\ntext {{ font-family: {SVG_MONO_FONT_STACK} !important; }}\n" + if "" in svg: + svg = svg.replace("", override + "", 1) + else: + svg = svg.replace(" ", 1) + + return svg + + def _apply_carriage_returns(text: str) -> list[str]: """Interpret \r as 'return to start of line' (overwrite), not a newline. @@ -110,6 +133,22 @@ class LocalServer: def mark_route_activity(self, route_key: str) -> None: self._route_last_activity[route_key] = asyncio.get_event_loop().time() + def _get_cached_screenshot_response( + self, request: web.Request, route_key: str + ) -> web.Response | None: + cached = self._screenshot_cache.get(route_key) + if cached is None: + return None + + etag = self._screenshot_cache_etag.get(route_key) + if etag and request.headers.get("If-None-Match") == etag: + raise web.HTTPNotModified(headers={"ETag": etag, "Cache-Control": "no-cache"}) + + headers = {"Cache-Control": "no-cache"} + if etag: + headers["ETag"] = etag + return web.Response(text=cached[1], content_type="image/svg+xml", headers=headers) + def _get_screenshot_cache_ttl(self, route_key: str, now: float) -> float: last_activity = self._route_last_activity.get(route_key, 0.0) idle_for = max(0.0, now - last_activity) @@ -157,6 +196,7 @@ class LocalServer: self._screenshot_cache_etag: dict[str, str] = {} self._screenshot_locks: dict[str, asyncio.Lock] = {} self._route_last_activity: dict[str, float] = {} + self._screenshot_last_rendered_activity: dict[str, float] = {} @property def app_count(self) -> int: @@ -288,12 +328,14 @@ class LocalServer: msg_type = envelope[0] if msg_type == "stdin": + self.mark_route_activity(route_key) data = envelope[1] if len(envelope) > 1 else "" session_process = self.session_manager.get_session_by_route_key(RouteKey(route_key)) if session_process: await session_process.send_bytes(data.encode("utf-8")) elif msg_type == "resize": + self.mark_route_activity(route_key) size_data = envelope[1] if len(envelope) > 1 else {} width = max(1, min(500, int(size_data.get("width", 80)))) height = max(1, min(500, int(size_data.get("height", 24)))) @@ -422,6 +464,15 @@ class LocalServer: if session_process is None or not hasattr(session_process, "get_replay_buffer"): raise web.HTTPNotFound(text="Session not found") + # If nothing has changed since the last render, serve cached screenshot without + # touching the session replay buffer. + last_activity = self._route_last_activity.get(route_key, 0.0) + last_rendered_activity = self._screenshot_last_rendered_activity.get(route_key, -1.0) + if last_activity <= last_rendered_activity: + cached_response = self._get_cached_screenshot_response(request, route_key) + if cached_response is not None: + return cached_response + replay_data = await session_process.get_replay_buffer() # type: ignore[func-returns-value] if len(replay_data) > SCREENSHOT_MAX_BYTES: replay_data = replay_data[-SCREENSHOT_MAX_BYTES:] @@ -447,16 +498,17 @@ class LocalServer: ttl = self._get_screenshot_cache_ttl(route_key, now) cached = self._screenshot_cache.get(route_key) - if cached is not None: - etag = self._screenshot_cache_etag.get(route_key) - if etag and request.headers.get("If-None-Match") == etag: - raise web.HTTPNotModified(headers={"ETag": etag, "Cache-Control": "no-cache"}) + # If we have a cached screenshot and the session is idle, keep serving it until + # new activity occurs (no periodic re-render). + if cached is not None and self._route_last_activity.get(route_key, 0.0) == 0.0: + cached_response = self._get_cached_screenshot_response(request, route_key) + if cached_response is not None: + return cached_response - if (now - cached[0]) < ttl: - headers = {"Cache-Control": "no-cache"} - if etag: - headers["ETag"] = etag - return web.Response(text=cached[1], content_type="image/svg+xml", headers=headers) + if cached is not None and (now - cached[0]) < ttl: + cached_response = self._get_cached_screenshot_response(request, route_key) + if cached_response is not None: + return cached_response lock = self._screenshot_locks.get(route_key) if lock is None: @@ -467,15 +519,10 @@ class LocalServer: # Another request may have refreshed the cache while we waited. ttl = self._get_screenshot_cache_ttl(route_key, now) cached = self._screenshot_cache.get(route_key) - etag = self._screenshot_cache_etag.get(route_key) - if cached is not None: - if etag and request.headers.get("If-None-Match") == etag: - raise web.HTTPNotModified(headers={"ETag": etag, "Cache-Control": "no-cache"}) - if (now - cached[0]) < ttl: - headers = {"Cache-Control": "no-cache"} - if etag: - headers["ETag"] = etag - return web.Response(text=cached[1], content_type="image/svg+xml", headers=headers) + if cached is not None and (now - cached[0]) < ttl: + cached_response = self._get_cached_screenshot_response(request, route_key) + if cached_response is not None: + return cached_response def _render_svg() -> str: console = Console(record=True, width=width, height=height, file=io.StringIO()) @@ -505,9 +552,13 @@ class LocalServer: ) svg = await asyncio.to_thread(_render_svg) + svg = _rewrite_svg_fonts(svg) etag = hashlib.sha1(svg.encode("utf-8"), usedforsecurity=False).hexdigest() self._screenshot_cache[route_key] = (asyncio.get_event_loop().time(), svg) self._screenshot_cache_etag[route_key] = etag + self._screenshot_last_rendered_activity[route_key] = self._route_last_activity.get( + route_key, 0.0 + ) headers = {"Cache-Control": "no-cache", "ETag": etag} return web.Response(text=svg, content_type="image/svg+xml", headers=headers) diff --git a/tests/test_local_server_unit.py b/tests/test_local_server_unit.py index a04157a..55804a3 100644 --- a/tests/test_local_server_unit.py +++ b/tests/test_local_server_unit.py @@ -622,3 +622,149 @@ class TestLocalServerMoreCoverage: await connector.on_close() server.handle_session_close.assert_awaited_once_with("sid", "rk") + + @pytest.mark.asyncio + async def test_run_stops_exit_poller_and_exits_poller(self, server_with_no_apps, monkeypatch): + async def boom(): + raise RuntimeError("boom") + + monkeypatch.setattr(server_with_no_apps, "_run", boom) + server_with_no_apps._exit_poller.stop = MagicMock() + server_with_no_apps._poller.exit = MagicMock() + + with pytest.raises(RuntimeError): + await server_with_no_apps.run() + + server_with_no_apps._exit_poller.stop.assert_called_once() + server_with_no_apps._poller.exit.assert_called_once() + + def test_on_keyboard_interrupt_sets_event_when_already_shutting_down(self, server_with_no_apps): + server_with_no_apps._shutdown_started = True + assert not server_with_no_apps.exit_event.is_set() + server_with_no_apps.on_keyboard_interrupt() + assert server_with_no_apps.exit_event.is_set() + + @pytest.mark.asyncio + async def test_on_keyboard_interrupt_schedules_shutdown_in_running_loop(self, server_with_no_apps): + called = {"shutdown": False} + + async def shutdown(): + called["shutdown"] = True + server_with_no_apps.exit_event.set() + + server_with_no_apps._shutdown = shutdown # type: ignore[method-assign] + server_with_no_apps.on_keyboard_interrupt() + + assert server_with_no_apps._shutdown_task is not None + await server_with_no_apps._shutdown_task + assert called["shutdown"] is True + + def test_on_keyboard_interrupt_uses_call_soon_threadsafe_when_loop_running( + self, server_with_no_apps, monkeypatch + ): + async def shutdown(): + return None + + server_with_no_apps._shutdown = shutdown # type: ignore[method-assign] + + fake_loop = MagicMock() + fake_loop.is_running = MagicMock(return_value=True) + server_with_no_apps._loop = fake_loop + + created = {"called": False} + + def fake_create_task(coro): + created["called"] = True + coro.close() + return MagicMock() + + monkeypatch.setattr("textual_webterm.local_server.asyncio.create_task", fake_create_task) + + server_with_no_apps.on_keyboard_interrupt() + assert fake_loop.call_soon_threadsafe.called + + schedule = fake_loop.call_soon_threadsafe.call_args.args[0] + schedule() + assert created["called"] is True + + def test_build_routes_logs_error_when_static_path_missing(self, server_with_no_apps, monkeypatch): + from pathlib import Path + + from textual_webterm import local_server + + class FakePath(Path): + _flavour = type(Path())._flavour + + def exists(self) -> bool: # type: ignore[override] + return False + + monkeypatch.setattr(local_server, "STATIC_PATH", FakePath("/definitely-missing")) + monkeypatch.setattr(local_server.log, "error", MagicMock()) + + server_with_no_apps._build_routes() + local_server.log.error.assert_called() + + @pytest.mark.asyncio + async def test_dispatch_ws_message_stdin_without_payload_sends_empty(self, server_with_no_apps, monkeypatch): + session = MagicMock() + session.send_bytes = AsyncMock() + monkeypatch.setattr(server_with_no_apps.session_manager, "get_session_by_route_key", lambda _rk: session) + + ws = MagicMock() + created = await server_with_no_apps._dispatch_ws_message(["stdin"], "rk", ws, False) + assert created is False + session.send_bytes.assert_awaited_once_with(b"") + + @pytest.mark.asyncio + async def test_dispatch_ws_message_resize_updates_existing_session(self, server_with_no_apps, monkeypatch): + session = MagicMock() + session.set_terminal_size = AsyncMock() + monkeypatch.setattr(server_with_no_apps.session_manager, "get_session_by_route_key", lambda _rk: session) + + ws = MagicMock() + created = await server_with_no_apps._dispatch_ws_message( + ["resize", {"width": 100, "height": 50}], "rk", ws, True + ) + assert created is True + session.set_terminal_size.assert_awaited_once_with(100, 50) + + @pytest.mark.asyncio + async def test_dispatch_ws_message_resize_no_session_noop(self, server_with_no_apps, monkeypatch): + monkeypatch.setattr(server_with_no_apps.session_manager, "get_session_by_route_key", lambda _rk: None) + + ws = MagicMock() + created = await server_with_no_apps._dispatch_ws_message( + ["resize", {"width": 100, "height": 50}], "rk", ws, True + ) + assert created is True + + @pytest.mark.asyncio + async def test_handle_screenshot_truncates_replay_buffer_before_decode(self, server_with_no_apps, monkeypatch): + from textual_webterm.local_server import SCREENSHOT_MAX_BYTES + + request = MagicMock() + request.query = {"route_key": "rk"} + request.headers = {} + + session = MagicMock() + session.get_replay_buffer = AsyncMock(return_value=b"x" * (SCREENSHOT_MAX_BYTES + 10)) + monkeypatch.setattr(server_with_no_apps.session_manager, "get_session_by_route_key", lambda _rk: session) + + server_with_no_apps._route_last_activity["rk"] = 1.0 + + captured = {"len": None} + + def apply_cr(text: str): + captured["len"] = len(text) + return ["x"] + + async def fake_to_thread(_fn): + return "" + + monkeypatch.setattr("textual_webterm.local_server._apply_carriage_returns", apply_cr) + monkeypatch.setattr("textual_webterm.local_server.asyncio.to_thread", AsyncMock(side_effect=fake_to_thread)) + monkeypatch.setattr("textual_webterm.local_server._rewrite_svg_fonts", lambda s: s) + + resp = await server_with_no_apps._handle_screenshot(request) + assert resp.content_type == "image/svg+xml" + assert captured["len"] == SCREENSHOT_MAX_BYTES diff --git a/tests/test_terminal_session.py b/tests/test_terminal_session.py index f4af607..31f49bc 100644 --- a/tests/test_terminal_session.py +++ b/tests/test_terminal_session.py @@ -286,3 +286,160 @@ class TestTerminalSession: assert await session.send_bytes(b"x") is True poller.write.assert_awaited_once_with(10, b"x") + + @pytest.mark.asyncio + async def test_open_set_terminal_size_oserror_closes_fd_and_clears_master_fd(self): + from textual_webterm.terminal_session import TerminalSession + + poller = MagicMock() + session = TerminalSession(poller, "sid", "bash") + + with ( + patch("textual_webterm.terminal_session.pty.fork", return_value=(1234, 99)), + patch.object(session, "_set_terminal_size", side_effect=OSError("bad")), + patch("textual_webterm.terminal_session.os.close") as mock_close, + pytest.raises(OSError), + ): + await session.open(width=80, height=24) + + mock_close.assert_called_once_with(99) + assert session.master_fd is None + + @pytest.mark.asyncio + async def test_set_terminal_size_uses_executor(self): + from textual_webterm.terminal_session import TerminalSession + + poller = MagicMock() + session = TerminalSession(poller, "sid", "bash") + session.master_fd = 10 + + loop = asyncio.get_running_loop() + with patch.object(loop, "run_in_executor", new=AsyncMock()) as run_in_executor: + await session.set_terminal_size(80, 24) + + run_in_executor.assert_awaited_once_with(None, session._set_terminal_size, 80, 24) + + def test__set_terminal_size_calls_ioctl(self): + from textual_webterm.terminal_session import TerminalSession + + poller = MagicMock() + session = TerminalSession(poller, "sid", "bash") + session.master_fd = 10 + + with patch("textual_webterm.terminal_session.fcntl.ioctl") as mock_ioctl: + session._set_terminal_size(80, 24) + + assert mock_ioctl.called + + @pytest.mark.asyncio + async def test_start_creates_task_when_not_running(self): + from textual_webterm.terminal_session import TerminalSession + + poller = MagicMock() + session = TerminalSession(poller, "sid", "bash") + session.master_fd = 10 + + session.run = AsyncMock() # type: ignore[method-assign] + + connector = MagicMock() + task = await session.start(connector) + assert task is session._task + assert session._connector is connector + + await task + session.run.assert_awaited_once() + + @pytest.mark.asyncio + async def test_run_without_connector_still_closes(self): + from textual_webterm.terminal_session import TerminalSession + + queue: asyncio.Queue[bytes | None] = asyncio.Queue() + await queue.put(b"hello") + await queue.put(None) + + poller = MagicMock() + poller.add_file = MagicMock(return_value=queue) + poller.remove_file = MagicMock() + + session = TerminalSession(poller, "sid", "bash") + session.master_fd = 10 + session._connector = None + + with patch("textual_webterm.terminal_session.os.close") as mock_close: + await session.run() + + poller.remove_file.assert_called_once_with(10) + mock_close.assert_called_once_with(10) + + @pytest.mark.asyncio + async def test_run_oserror_still_closes(self): + from textual_webterm.terminal_session import TerminalSession + + queue = MagicMock() + queue.get = AsyncMock(side_effect=OSError("boom")) + + poller = MagicMock() + poller.add_file = MagicMock(return_value=queue) + poller.remove_file = MagicMock() + + session = TerminalSession(poller, "sid", "bash") + session.master_fd = 10 + session._connector = None + + with patch("textual_webterm.terminal_session.os.close") as mock_close: + await session.run() + + poller.remove_file.assert_called_once_with(10) + mock_close.assert_called_once_with(10) + + @pytest.mark.asyncio + async def test_close_process_lookup_error_is_ignored(self): + from textual_webterm.terminal_session import TerminalSession + + poller = MagicMock() + session = TerminalSession(poller, "sid", "bash") + session.pid = 123 + + with patch("textual_webterm.terminal_session.os.kill", side_effect=ProcessLookupError()): + await session.close() + + @pytest.mark.asyncio + async def test_close_logs_warning_on_unexpected_exception(self): + from textual_webterm.terminal_session import TerminalSession + + poller = MagicMock() + session = TerminalSession(poller, "sid", "bash") + session.pid = 123 + + with ( + patch("textual_webterm.terminal_session.os.kill", side_effect=RuntimeError("x")), + patch("textual_webterm.terminal_session.log.warning") as warn, + ): + await session.close() + + assert warn.called + + @pytest.mark.asyncio + async def test_wait_suppresses_cancelled_error(self): + from textual_webterm.terminal_session import TerminalSession + + poller = MagicMock() + session = TerminalSession(poller, "sid", "bash") + + task = asyncio.create_task(asyncio.sleep(10)) + task.cancel() + session._task = task + + await session.wait() + + def test_is_running_false_when_kill_fails(self): + from textual_webterm.terminal_session import TerminalSession + + poller = MagicMock() + session = TerminalSession(poller, "sid", "bash") + session.master_fd = 10 + session._task = MagicMock() + session.pid = 123 + + with patch("textual_webterm.terminal_session.os.kill", side_effect=OSError()): + assert session.is_running() is False