diff --git a/src/webterm/alt_screen.py b/src/webterm/alt_screen.py new file mode 100644 index 0000000..659294a --- /dev/null +++ b/src/webterm/alt_screen.py @@ -0,0 +1,113 @@ +"""Custom pyte Screen with alternate screen buffer support. + +pyte's standard Screen class doesn't implement DECSET/DECRST 1049 (alternate screen buffer) +which causes issues when programs like tmux, vim, less, etc. switch between main and alternate +screens. Without this, screen clearing in tmux panes shows overlapping old and new content +in screenshots. + +This module provides AltScreen, a Screen subclass that properly saves and restores +the screen buffer when switching between main and alternate screen modes. +""" + +from __future__ import annotations + +import copy +from typing import TYPE_CHECKING, Any + +import pyte + +if TYPE_CHECKING: + from pyte.screens import Char + +# Private mode 1049 (alternate screen buffer) - shifted by 5 as per pyte's convention +DECALTBUF = 1049 << 5 + + +class AltScreen(pyte.Screen): + """A pyte Screen with proper alternate screen buffer support. + + Implements DECSET/DECRST 1049 to save and restore the main screen buffer + when programs switch to alternate screen mode. + + """ + + def __init__(self, columns: int, lines: int, *args: Any, **kwargs: Any) -> None: + super().__init__(columns, lines, *args, **kwargs) + # Storage for main screen state when in alternate mode + self._saved_buffer: dict[int, dict[int, Char]] | None = None + self._saved_cursor: pyte.screens.Cursor | None = None + + def _save_main_screen(self) -> None: + """Save the current screen buffer and cursor for later restoration.""" + # Deep copy the buffer to avoid aliasing + # Save all rows within current screen bounds with all their column data + self._saved_buffer = {} + for row_idx in range(self.lines): + self._saved_buffer[row_idx] = { + col: self.buffer[row_idx][col] for col in range(self.columns) + } + # Save cursor state + self._saved_cursor = copy.copy(self.cursor) + + def _restore_main_screen(self) -> None: + """Restore the previously saved screen buffer and cursor.""" + if self._saved_buffer is not None: + # Restore buffer - copy characters into existing line structures + for row_idx in range(self.lines): + if row_idx in self._saved_buffer: + saved_row = self._saved_buffer[row_idx] + for col in range(self.columns): + if col in saved_row: + self.buffer[row_idx][col] = saved_row[col] + else: + self.buffer[row_idx][col] = self.default_char + else: + # Clear rows that weren't in saved buffer + for col in range(self.columns): + self.buffer[row_idx][col] = self.default_char + self._saved_buffer = None + + if self._saved_cursor is not None: + self.cursor = self._saved_cursor + self._saved_cursor = None + + # Mark all lines as dirty for re-render + self.dirty.update(range(self.lines)) + + def set_mode(self, *modes: int, **kwargs: Any) -> None: + """Set (enable) modes, with special handling for alternate screen buffer.""" + # Check if we're entering alternate screen mode (private mode 1049) + if kwargs.get("private") and 1049 in modes and DECALTBUF not in self.mode: + # Save main screen before switching + self._save_main_screen() + # Clear screen for alternate buffer + self.erase_in_display(2) + self.cursor_position() + + # Call parent implementation + super().set_mode(*modes, **kwargs) + + def reset_mode(self, *modes: int, **kwargs: Any) -> None: + """Reset (disable) modes, with special handling for alternate screen buffer.""" + # Check if we're leaving alternate screen mode (private mode 1049) + if kwargs.get("private") and 1049 in modes and DECALTBUF in self.mode: + # Will be removed by parent, restore main screen after + super().reset_mode(*modes, **kwargs) + self._restore_main_screen() + return + + # Call parent implementation + super().reset_mode(*modes, **kwargs) + + def resize(self, lines: int | None = None, columns: int | None = None) -> None: + """Resize screen, clearing saved alternate buffer if size changes.""" + # If we're in alternate mode and resizing, the saved buffer may be invalid + if self._saved_buffer is not None and ( + (lines is not None and lines != self.lines) + or (columns is not None and columns != self.columns) + ): + # Invalidate saved buffer on resize - it won't match the new dimensions + self._saved_buffer = None + self._saved_cursor = None + + super().resize(lines, columns) diff --git a/src/webterm/docker_exec_session.py b/src/webterm/docker_exec_session.py index 19549b6..47733db 100644 --- a/src/webterm/docker_exec_session.py +++ b/src/webterm/docker_exec_session.py @@ -14,6 +14,7 @@ from typing import TYPE_CHECKING import pyte +from .alt_screen import AltScreen from .docker_stats import get_docker_socket_path from .session import Session, SessionConnector @@ -70,7 +71,7 @@ class DockerExecSession(Session): self._replay_buffer: deque[bytes] = deque() self._replay_buffer_size = 0 self._replay_lock = asyncio.Lock() - self._screen = pyte.Screen(DEFAULT_SCREEN_WIDTH, DEFAULT_SCREEN_HEIGHT) + self._screen = AltScreen(DEFAULT_SCREEN_WIDTH, DEFAULT_SCREEN_HEIGHT) self._stream = pyte.Stream(self._screen) self._screen_lock = asyncio.Lock() self._last_width = DEFAULT_SCREEN_WIDTH @@ -218,7 +219,7 @@ class DockerExecSession(Session): self._last_width = width self._last_height = height async with self._screen_lock: - self._screen = pyte.Screen(width, height) + self._screen = AltScreen(width, height) self._stream = pyte.Stream(self._screen) exec_id = await asyncio.to_thread(self._create_exec) self._exec_id = exec_id diff --git a/src/webterm/docker_watcher.py b/src/webterm/docker_watcher.py index 2d94dfe..8c44359 100644 --- a/src/webterm/docker_watcher.py +++ b/src/webterm/docker_watcher.py @@ -279,8 +279,6 @@ class DockerWatcher: action = event.get("Action", "") actor = event.get("Actor", {}) container_id = actor.get("ID", "") - attributes = actor.get("Attributes", {}) - if action == "start": # Get full container info status, body = await self._docker_request("GET", f"/containers/{container_id}/json") diff --git a/src/webterm/terminal_session.py b/src/webterm/terminal_session.py index 2429b76..3d5a087 100644 --- a/src/webterm/terminal_session.py +++ b/src/webterm/terminal_session.py @@ -17,6 +17,7 @@ from typing import TYPE_CHECKING import pyte from importlib_metadata import PackageNotFoundError, version +from .alt_screen import AltScreen from .session import Session, SessionConnector if TYPE_CHECKING: @@ -63,8 +64,8 @@ class TerminalSession(Session): self._replay_buffer: deque[bytes] = deque() self._replay_buffer_size = 0 self._replay_lock = asyncio.Lock() - # pyte screen for accurate terminal state tracking - self._screen = pyte.Screen(DEFAULT_SCREEN_WIDTH, DEFAULT_SCREEN_HEIGHT) + # pyte screen for accurate terminal state tracking (AltScreen for alternate buffer support) + self._screen = AltScreen(DEFAULT_SCREEN_WIDTH, DEFAULT_SCREEN_HEIGHT) self._stream = pyte.Stream(self._screen) self._screen_lock = asyncio.Lock() # Track last known terminal size for reconnection @@ -94,7 +95,7 @@ class TerminalSession(Session): self._last_height = height # Initialize pyte screen with the requested size (under lock to prevent races) async with self._screen_lock: - self._screen = pyte.Screen(width, height) + self._screen = AltScreen(width, height) self._stream = pyte.Stream(self._screen) pid, master_fd = pty.fork() diff --git a/tests/test_alt_screen.py b/tests/test_alt_screen.py new file mode 100644 index 0000000..7089bc8 --- /dev/null +++ b/tests/test_alt_screen.py @@ -0,0 +1,104 @@ +"""Tests for the AltScreen class with alternate screen buffer support.""" + +import pyte + +from webterm.alt_screen import DECALTBUF, AltScreen + + +class TestAltScreen: + """Tests for AltScreen alternate buffer support.""" + + def test_basic_screen_operations(self): + """Test that basic screen operations still work.""" + screen = AltScreen(40, 10) + stream = pyte.Stream(screen) + + stream.feed("Hello World\r\n") + stream.feed("Line 2") + + assert "Hello World" in screen.display[0] + assert "Line 2" in screen.display[1] + + def test_alternate_screen_save_restore(self): + """Test DECSET/DECRST 1049 saves and restores main screen.""" + screen = AltScreen(40, 10) + stream = pyte.Stream(screen) + + # Write to main screen + stream.feed("MAIN SCREEN LINE 1\r\n") + stream.feed("MAIN SCREEN LINE 2\r\n") + assert "MAIN SCREEN LINE 1" in screen.display[0] + assert "MAIN SCREEN LINE 2" in screen.display[1] + + # Enter alternate screen (DECSET 1049) + stream.feed("\x1b[?1049h") + # Screen should be cleared + assert screen.display[0].strip() == "" + assert screen.display[1].strip() == "" + + # Write to alternate screen + stream.feed("ALT SCREEN CONTENT\r\n") + assert "ALT SCREEN CONTENT" in screen.display[0] + + # Exit alternate screen (DECRST 1049) + stream.feed("\x1b[?1049l") + # Main screen should be restored + assert "MAIN SCREEN LINE 1" in screen.display[0] + assert "MAIN SCREEN LINE 2" in screen.display[1] + + def test_alternate_screen_mode_flag(self): + """Test that DECALTBUF mode flag is set correctly.""" + screen = AltScreen(40, 10) + stream = pyte.Stream(screen) + + assert DECALTBUF not in screen.mode + + stream.feed("\x1b[?1049h") + assert DECALTBUF in screen.mode + + stream.feed("\x1b[?1049l") + assert DECALTBUF not in screen.mode + + def test_multiple_alt_screen_switches(self): + """Test multiple switches between main and alternate screen.""" + screen = AltScreen(40, 10) + stream = pyte.Stream(screen) + + # Main content + stream.feed("MAIN 1\r\n") + stream.feed("\x1b[?1049h") # Enter alt + stream.feed("ALT 1\r\n") + stream.feed("\x1b[?1049l") # Exit alt + assert "MAIN 1" in screen.display[0] + + # More main content + stream.feed("MAIN 2\r\n") + stream.feed("\x1b[?1049h") # Enter alt again + assert screen.display[0].strip() == "" # Alt screen is clear + stream.feed("\x1b[?1049l") # Exit alt + assert "MAIN 1" in screen.display[0] + assert "MAIN 2" in screen.display[1] + + def test_resize_invalidates_saved_buffer(self): + """Test that resizing clears the saved alternate screen buffer.""" + screen = AltScreen(40, 10) + stream = pyte.Stream(screen) + + stream.feed("MAIN CONTENT\r\n") + stream.feed("\x1b[?1049h") # Enter alt + assert screen._saved_buffer is not None + + # Resize while in alt mode + screen.resize(20, 80) + assert screen._saved_buffer is None + + def test_ed_clear_still_works(self): + """Test that explicit ED (erase display) still works.""" + screen = AltScreen(40, 10) + stream = pyte.Stream(screen) + + stream.feed("Line 1\r\n") + stream.feed("Line 2\r\n") + stream.feed("\x1b[2J") # ED 2 - erase entire display + + assert all(line.strip() == "" for line in screen.display)