Add alt screen buffer support
This commit is contained in:
@@ -0,0 +1,113 @@
|
||||
"""Custom pyte Screen with alternate screen buffer support.
|
||||
|
||||
pyte's standard Screen class doesn't implement DECSET/DECRST 1049 (alternate screen buffer)
|
||||
which causes issues when programs like tmux, vim, less, etc. switch between main and alternate
|
||||
screens. Without this, screen clearing in tmux panes shows overlapping old and new content
|
||||
in screenshots.
|
||||
|
||||
This module provides AltScreen, a Screen subclass that properly saves and restores
|
||||
the screen buffer when switching between main and alternate screen modes.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import copy
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
import pyte
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from pyte.screens import Char
|
||||
|
||||
# Private mode 1049 (alternate screen buffer) - shifted by 5 as per pyte's convention
|
||||
DECALTBUF = 1049 << 5
|
||||
|
||||
|
||||
class AltScreen(pyte.Screen):
|
||||
"""A pyte Screen with proper alternate screen buffer support.
|
||||
|
||||
Implements DECSET/DECRST 1049 to save and restore the main screen buffer
|
||||
when programs switch to alternate screen mode.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, columns: int, lines: int, *args: Any, **kwargs: Any) -> None:
|
||||
super().__init__(columns, lines, *args, **kwargs)
|
||||
# Storage for main screen state when in alternate mode
|
||||
self._saved_buffer: dict[int, dict[int, Char]] | None = None
|
||||
self._saved_cursor: pyte.screens.Cursor | None = None
|
||||
|
||||
def _save_main_screen(self) -> None:
|
||||
"""Save the current screen buffer and cursor for later restoration."""
|
||||
# Deep copy the buffer to avoid aliasing
|
||||
# Save all rows within current screen bounds with all their column data
|
||||
self._saved_buffer = {}
|
||||
for row_idx in range(self.lines):
|
||||
self._saved_buffer[row_idx] = {
|
||||
col: self.buffer[row_idx][col] for col in range(self.columns)
|
||||
}
|
||||
# Save cursor state
|
||||
self._saved_cursor = copy.copy(self.cursor)
|
||||
|
||||
def _restore_main_screen(self) -> None:
|
||||
"""Restore the previously saved screen buffer and cursor."""
|
||||
if self._saved_buffer is not None:
|
||||
# Restore buffer - copy characters into existing line structures
|
||||
for row_idx in range(self.lines):
|
||||
if row_idx in self._saved_buffer:
|
||||
saved_row = self._saved_buffer[row_idx]
|
||||
for col in range(self.columns):
|
||||
if col in saved_row:
|
||||
self.buffer[row_idx][col] = saved_row[col]
|
||||
else:
|
||||
self.buffer[row_idx][col] = self.default_char
|
||||
else:
|
||||
# Clear rows that weren't in saved buffer
|
||||
for col in range(self.columns):
|
||||
self.buffer[row_idx][col] = self.default_char
|
||||
self._saved_buffer = None
|
||||
|
||||
if self._saved_cursor is not None:
|
||||
self.cursor = self._saved_cursor
|
||||
self._saved_cursor = None
|
||||
|
||||
# Mark all lines as dirty for re-render
|
||||
self.dirty.update(range(self.lines))
|
||||
|
||||
def set_mode(self, *modes: int, **kwargs: Any) -> None:
|
||||
"""Set (enable) modes, with special handling for alternate screen buffer."""
|
||||
# Check if we're entering alternate screen mode (private mode 1049)
|
||||
if kwargs.get("private") and 1049 in modes and DECALTBUF not in self.mode:
|
||||
# Save main screen before switching
|
||||
self._save_main_screen()
|
||||
# Clear screen for alternate buffer
|
||||
self.erase_in_display(2)
|
||||
self.cursor_position()
|
||||
|
||||
# Call parent implementation
|
||||
super().set_mode(*modes, **kwargs)
|
||||
|
||||
def reset_mode(self, *modes: int, **kwargs: Any) -> None:
|
||||
"""Reset (disable) modes, with special handling for alternate screen buffer."""
|
||||
# Check if we're leaving alternate screen mode (private mode 1049)
|
||||
if kwargs.get("private") and 1049 in modes and DECALTBUF in self.mode:
|
||||
# Will be removed by parent, restore main screen after
|
||||
super().reset_mode(*modes, **kwargs)
|
||||
self._restore_main_screen()
|
||||
return
|
||||
|
||||
# Call parent implementation
|
||||
super().reset_mode(*modes, **kwargs)
|
||||
|
||||
def resize(self, lines: int | None = None, columns: int | None = None) -> None:
|
||||
"""Resize screen, clearing saved alternate buffer if size changes."""
|
||||
# If we're in alternate mode and resizing, the saved buffer may be invalid
|
||||
if self._saved_buffer is not None and (
|
||||
(lines is not None and lines != self.lines)
|
||||
or (columns is not None and columns != self.columns)
|
||||
):
|
||||
# Invalidate saved buffer on resize - it won't match the new dimensions
|
||||
self._saved_buffer = None
|
||||
self._saved_cursor = None
|
||||
|
||||
super().resize(lines, columns)
|
||||
@@ -14,6 +14,7 @@ from typing import TYPE_CHECKING
|
||||
|
||||
import pyte
|
||||
|
||||
from .alt_screen import AltScreen
|
||||
from .docker_stats import get_docker_socket_path
|
||||
from .session import Session, SessionConnector
|
||||
|
||||
@@ -70,7 +71,7 @@ class DockerExecSession(Session):
|
||||
self._replay_buffer: deque[bytes] = deque()
|
||||
self._replay_buffer_size = 0
|
||||
self._replay_lock = asyncio.Lock()
|
||||
self._screen = pyte.Screen(DEFAULT_SCREEN_WIDTH, DEFAULT_SCREEN_HEIGHT)
|
||||
self._screen = AltScreen(DEFAULT_SCREEN_WIDTH, DEFAULT_SCREEN_HEIGHT)
|
||||
self._stream = pyte.Stream(self._screen)
|
||||
self._screen_lock = asyncio.Lock()
|
||||
self._last_width = DEFAULT_SCREEN_WIDTH
|
||||
@@ -218,7 +219,7 @@ class DockerExecSession(Session):
|
||||
self._last_width = width
|
||||
self._last_height = height
|
||||
async with self._screen_lock:
|
||||
self._screen = pyte.Screen(width, height)
|
||||
self._screen = AltScreen(width, height)
|
||||
self._stream = pyte.Stream(self._screen)
|
||||
exec_id = await asyncio.to_thread(self._create_exec)
|
||||
self._exec_id = exec_id
|
||||
|
||||
@@ -279,8 +279,6 @@ class DockerWatcher:
|
||||
action = event.get("Action", "")
|
||||
actor = event.get("Actor", {})
|
||||
container_id = actor.get("ID", "")
|
||||
attributes = actor.get("Attributes", {})
|
||||
|
||||
if action == "start":
|
||||
# Get full container info
|
||||
status, body = await self._docker_request("GET", f"/containers/{container_id}/json")
|
||||
|
||||
@@ -17,6 +17,7 @@ from typing import TYPE_CHECKING
|
||||
import pyte
|
||||
from importlib_metadata import PackageNotFoundError, version
|
||||
|
||||
from .alt_screen import AltScreen
|
||||
from .session import Session, SessionConnector
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -63,8 +64,8 @@ class TerminalSession(Session):
|
||||
self._replay_buffer: deque[bytes] = deque()
|
||||
self._replay_buffer_size = 0
|
||||
self._replay_lock = asyncio.Lock()
|
||||
# pyte screen for accurate terminal state tracking
|
||||
self._screen = pyte.Screen(DEFAULT_SCREEN_WIDTH, DEFAULT_SCREEN_HEIGHT)
|
||||
# pyte screen for accurate terminal state tracking (AltScreen for alternate buffer support)
|
||||
self._screen = AltScreen(DEFAULT_SCREEN_WIDTH, DEFAULT_SCREEN_HEIGHT)
|
||||
self._stream = pyte.Stream(self._screen)
|
||||
self._screen_lock = asyncio.Lock()
|
||||
# Track last known terminal size for reconnection
|
||||
@@ -94,7 +95,7 @@ class TerminalSession(Session):
|
||||
self._last_height = height
|
||||
# Initialize pyte screen with the requested size (under lock to prevent races)
|
||||
async with self._screen_lock:
|
||||
self._screen = pyte.Screen(width, height)
|
||||
self._screen = AltScreen(width, height)
|
||||
self._stream = pyte.Stream(self._screen)
|
||||
|
||||
pid, master_fd = pty.fork()
|
||||
|
||||
@@ -0,0 +1,104 @@
|
||||
"""Tests for the AltScreen class with alternate screen buffer support."""
|
||||
|
||||
import pyte
|
||||
|
||||
from webterm.alt_screen import DECALTBUF, AltScreen
|
||||
|
||||
|
||||
class TestAltScreen:
|
||||
"""Tests for AltScreen alternate buffer support."""
|
||||
|
||||
def test_basic_screen_operations(self):
|
||||
"""Test that basic screen operations still work."""
|
||||
screen = AltScreen(40, 10)
|
||||
stream = pyte.Stream(screen)
|
||||
|
||||
stream.feed("Hello World\r\n")
|
||||
stream.feed("Line 2")
|
||||
|
||||
assert "Hello World" in screen.display[0]
|
||||
assert "Line 2" in screen.display[1]
|
||||
|
||||
def test_alternate_screen_save_restore(self):
|
||||
"""Test DECSET/DECRST 1049 saves and restores main screen."""
|
||||
screen = AltScreen(40, 10)
|
||||
stream = pyte.Stream(screen)
|
||||
|
||||
# Write to main screen
|
||||
stream.feed("MAIN SCREEN LINE 1\r\n")
|
||||
stream.feed("MAIN SCREEN LINE 2\r\n")
|
||||
assert "MAIN SCREEN LINE 1" in screen.display[0]
|
||||
assert "MAIN SCREEN LINE 2" in screen.display[1]
|
||||
|
||||
# Enter alternate screen (DECSET 1049)
|
||||
stream.feed("\x1b[?1049h")
|
||||
# Screen should be cleared
|
||||
assert screen.display[0].strip() == ""
|
||||
assert screen.display[1].strip() == ""
|
||||
|
||||
# Write to alternate screen
|
||||
stream.feed("ALT SCREEN CONTENT\r\n")
|
||||
assert "ALT SCREEN CONTENT" in screen.display[0]
|
||||
|
||||
# Exit alternate screen (DECRST 1049)
|
||||
stream.feed("\x1b[?1049l")
|
||||
# Main screen should be restored
|
||||
assert "MAIN SCREEN LINE 1" in screen.display[0]
|
||||
assert "MAIN SCREEN LINE 2" in screen.display[1]
|
||||
|
||||
def test_alternate_screen_mode_flag(self):
|
||||
"""Test that DECALTBUF mode flag is set correctly."""
|
||||
screen = AltScreen(40, 10)
|
||||
stream = pyte.Stream(screen)
|
||||
|
||||
assert DECALTBUF not in screen.mode
|
||||
|
||||
stream.feed("\x1b[?1049h")
|
||||
assert DECALTBUF in screen.mode
|
||||
|
||||
stream.feed("\x1b[?1049l")
|
||||
assert DECALTBUF not in screen.mode
|
||||
|
||||
def test_multiple_alt_screen_switches(self):
|
||||
"""Test multiple switches between main and alternate screen."""
|
||||
screen = AltScreen(40, 10)
|
||||
stream = pyte.Stream(screen)
|
||||
|
||||
# Main content
|
||||
stream.feed("MAIN 1\r\n")
|
||||
stream.feed("\x1b[?1049h") # Enter alt
|
||||
stream.feed("ALT 1\r\n")
|
||||
stream.feed("\x1b[?1049l") # Exit alt
|
||||
assert "MAIN 1" in screen.display[0]
|
||||
|
||||
# More main content
|
||||
stream.feed("MAIN 2\r\n")
|
||||
stream.feed("\x1b[?1049h") # Enter alt again
|
||||
assert screen.display[0].strip() == "" # Alt screen is clear
|
||||
stream.feed("\x1b[?1049l") # Exit alt
|
||||
assert "MAIN 1" in screen.display[0]
|
||||
assert "MAIN 2" in screen.display[1]
|
||||
|
||||
def test_resize_invalidates_saved_buffer(self):
|
||||
"""Test that resizing clears the saved alternate screen buffer."""
|
||||
screen = AltScreen(40, 10)
|
||||
stream = pyte.Stream(screen)
|
||||
|
||||
stream.feed("MAIN CONTENT\r\n")
|
||||
stream.feed("\x1b[?1049h") # Enter alt
|
||||
assert screen._saved_buffer is not None
|
||||
|
||||
# Resize while in alt mode
|
||||
screen.resize(20, 80)
|
||||
assert screen._saved_buffer is None
|
||||
|
||||
def test_ed_clear_still_works(self):
|
||||
"""Test that explicit ED (erase display) still works."""
|
||||
screen = AltScreen(40, 10)
|
||||
stream = pyte.Stream(screen)
|
||||
|
||||
stream.feed("Line 1\r\n")
|
||||
stream.feed("Line 2\r\n")
|
||||
stream.feed("\x1b[2J") # ED 2 - erase entire display
|
||||
|
||||
assert all(line.strip() == "" for line in screen.display)
|
||||
Reference in New Issue
Block a user