d5343117d3
The replay buffer can contain DA1/DA2 terminal attribute responses (e.g., \x1b[?1;10;0c) that were captured before filtering was added to the session classes. These responses appear as visible text like '1;10;0c' when sent to the client on reconnect. This adds an additional filter pass when sending the replay buffer, ensuring no DA1 responses reach the client regardless of when they were captured.
530 lines
20 KiB
Python
530 lines
20 KiB
Python
"""Tests for terminal_session module."""
|
|
|
|
import asyncio
|
|
import os
|
|
import platform
|
|
import pty
|
|
import shlex
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from webterm.terminal_session import (
|
|
REPLAY_BUFFER_SIZE,
|
|
TerminalSession,
|
|
)
|
|
|
|
# Skip tests on Windows
|
|
pytestmark = pytest.mark.skipif(
|
|
platform.system() == "Windows",
|
|
reason="Terminal sessions not supported on Windows",
|
|
)
|
|
|
|
|
|
@pytest.fixture
|
|
def terminal_session(mock_poller):
|
|
"""Create a TerminalSession for testing."""
|
|
return TerminalSession(mock_poller, "test-session", "bash")
|
|
|
|
|
|
class TestTerminalSession:
|
|
"""Tests for TerminalSession class."""
|
|
|
|
def test_import(self):
|
|
"""Test that module can be imported."""
|
|
assert TerminalSession is not None
|
|
|
|
def test_replay_buffer_size(self):
|
|
"""Test replay buffer size constant."""
|
|
assert REPLAY_BUFFER_SIZE == 256 * 1024
|
|
|
|
def test_init(self, terminal_session):
|
|
"""Test TerminalSession initialization."""
|
|
assert terminal_session.session_id == "test-session"
|
|
assert terminal_session.command == "bash"
|
|
assert terminal_session.master_fd is None
|
|
assert terminal_session.pid is None
|
|
assert terminal_session._task is None
|
|
|
|
def test_init_default_shell(self, mock_poller):
|
|
"""Test that default shell is used when command is empty."""
|
|
with patch.dict(os.environ, {"SHELL": "/bin/zsh"}):
|
|
session = TerminalSession(mock_poller, "test-session", "")
|
|
assert session.command == "/bin/zsh"
|
|
|
|
def test_package_version_fallback(self):
|
|
with (
|
|
patch("webterm.terminal_session.version", side_effect=RuntimeError()),
|
|
patch("webterm.terminal_session.PackageNotFoundError", RuntimeError),
|
|
):
|
|
assert TerminalSession._package_version() == "0.0.0"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_replay_buffer_add(self, terminal_session):
|
|
"""Test adding data to replay buffer."""
|
|
await terminal_session._add_to_replay_buffer(b"test data")
|
|
assert terminal_session._replay_buffer_size == 9
|
|
assert await terminal_session.get_replay_buffer() == b"test data"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_replay_buffer_multiple_adds(self, terminal_session):
|
|
"""Test adding multiple chunks to replay buffer."""
|
|
await terminal_session._add_to_replay_buffer(b"chunk1")
|
|
await terminal_session._add_to_replay_buffer(b"chunk2")
|
|
assert await terminal_session.get_replay_buffer() == b"chunk1chunk2"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_replay_buffer_overflow(self, terminal_session):
|
|
"""Test that replay buffer trims old data when exceeding limit."""
|
|
# Add more data than buffer size
|
|
chunk_size = 1024
|
|
for _i in range(100): # 100KB total
|
|
await terminal_session._add_to_replay_buffer(b"x" * chunk_size)
|
|
|
|
# Buffer should be trimmed
|
|
assert terminal_session._replay_buffer_size <= REPLAY_BUFFER_SIZE + chunk_size
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_screen_state_updates_with_data(self, terminal_session):
|
|
"""Test that pyte screen updates when data is received."""
|
|
await terminal_session._update_screen(b"Hello World\r\n")
|
|
lines = await terminal_session.get_screen_lines()
|
|
assert "Hello World" in lines[0]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_screen_handles_cursor_positioning(self, terminal_session):
|
|
"""Test that pyte screen correctly handles cursor positioning (tmux-style)."""
|
|
await terminal_session._update_screen(b"Line 1\r\nLine 2\r\nLine 3\r\n")
|
|
# Move cursor to line 2, column 1 and clear line, then write new content
|
|
await terminal_session._update_screen(b"\x1b[2;1H\x1b[KUpdated Line 2")
|
|
|
|
lines = await terminal_session.get_screen_lines()
|
|
|
|
assert lines[0] == "Line 1"
|
|
assert lines[1] == "Updated Line 2"
|
|
assert lines[2] == "Line 3"
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_screen_state_returns_dirty_flag(self, terminal_session):
|
|
"""Test that get_screen_state returns has_changes flag based on pyte dirty tracking."""
|
|
# After creation, all rows are dirty (initialized)
|
|
_w, _h, _buf, has_changes = await terminal_session.get_screen_state()
|
|
assert has_changes is True
|
|
|
|
# After getting state, dirty set is cleared
|
|
_, _, _, has_changes = await terminal_session.get_screen_state()
|
|
assert has_changes is False
|
|
|
|
# Feed new data
|
|
await terminal_session._update_screen(b"New content\r\n")
|
|
_, _, _, has_changes = await terminal_session.get_screen_state()
|
|
assert has_changes is True
|
|
|
|
# Check again without new data
|
|
_, _, _, has_changes = await terminal_session.get_screen_state()
|
|
assert has_changes is False
|
|
|
|
def test_update_connector(self, terminal_session):
|
|
"""Test updating connector."""
|
|
mock_connector = MagicMock()
|
|
terminal_session.update_connector(mock_connector)
|
|
assert terminal_session._connector == mock_connector
|
|
|
|
def test_is_running_not_started(self, terminal_session):
|
|
"""Test is_running when session not started."""
|
|
assert terminal_session.is_running() is False
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_bytes_no_fd(self, terminal_session):
|
|
"""Test send_bytes returns False when no master_fd."""
|
|
result = await terminal_session.send_bytes(b"test")
|
|
assert result is False
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_meta(self, terminal_session):
|
|
"""Test send_meta returns True."""
|
|
result = await terminal_session.send_meta({})
|
|
assert result is True
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_close_no_pid(self, terminal_session):
|
|
"""Test close when no pid."""
|
|
await terminal_session.close() # Should not raise
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_wait_no_task(self, terminal_session):
|
|
"""Test wait when no task."""
|
|
await terminal_session.wait() # Should not raise
|
|
|
|
def test_repr(self, terminal_session):
|
|
"""Test repr output."""
|
|
repr_str = repr(terminal_session)
|
|
assert "test-session" in repr_str
|
|
assert "bash" in repr_str
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_open_uses_shlex_split_and_execvp_with_args(self, mock_poller):
|
|
command = 'echo "hello world"'
|
|
session = TerminalSession(mock_poller, "test-session", command)
|
|
|
|
with (
|
|
patch("webterm.terminal_session.pty.fork", return_value=(pty.CHILD, 123)) as mock_fork,
|
|
patch("webterm.terminal_session.version", return_value="0.0.0"),
|
|
patch("webterm.terminal_session.shlex.split", wraps=shlex.split) as mock_split,
|
|
patch("webterm.terminal_session.os.execvp", side_effect=OSError()) as mock_execvp,
|
|
patch("webterm.terminal_session.os._exit", side_effect=SystemExit(1)) as mock_exit,
|
|
pytest.raises(SystemExit),
|
|
):
|
|
await session.open()
|
|
|
|
mock_fork.assert_called_once()
|
|
mock_split.assert_called_once_with(command)
|
|
mock_execvp.assert_called_once_with("echo", ["echo", "hello world"])
|
|
mock_exit.assert_called_once_with(1)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_open_parent_branch_sets_fd_and_pid(self, mock_poller):
|
|
session = TerminalSession(mock_poller, "sid", "bash")
|
|
|
|
with (
|
|
patch("webterm.terminal_session.pty.fork", return_value=(1234, 99)),
|
|
patch.object(session, "_set_terminal_size") as set_size,
|
|
):
|
|
await session.open(width=80, height=24)
|
|
|
|
assert session.pid == 1234
|
|
assert session.master_fd == 99
|
|
set_size.assert_called_once_with(80, 24)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_open_bad_command_exits(self, mock_poller):
|
|
session = TerminalSession(mock_poller, "sid", "bad")
|
|
|
|
with (
|
|
patch("webterm.terminal_session.pty.fork", return_value=(pty.CHILD, 123)),
|
|
patch("webterm.terminal_session.shlex.split", side_effect=ValueError("bad")),
|
|
patch("webterm.terminal_session.os._exit", side_effect=SystemExit(1)) as mock_exit,
|
|
pytest.raises(SystemExit),
|
|
):
|
|
await session.open()
|
|
|
|
mock_exit.assert_called_once_with(1)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_screen_lines_strips(self, terminal_session, dummy_lock):
|
|
terminal_session._screen = MagicMock()
|
|
terminal_session._screen.display = ["line ", "next"]
|
|
terminal_session._screen_lock = dummy_lock
|
|
|
|
lines = await terminal_session.get_screen_lines()
|
|
assert lines == ["line", "next"]
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_screen_state_no_changes(
|
|
self, terminal_session, dummy_lock, mock_screen_char
|
|
):
|
|
terminal_session._screen = MagicMock()
|
|
terminal_session._screen.columns = 1
|
|
terminal_session._screen.lines = 1
|
|
terminal_session._screen.dirty = set()
|
|
terminal_session._screen.buffer = [[mock_screen_char()]]
|
|
terminal_session._sync_pyte_to_pty = AsyncMock()
|
|
terminal_session._screen_lock = dummy_lock
|
|
|
|
width, height, _buffer, changed = await terminal_session.get_screen_state()
|
|
assert width == 1
|
|
assert height == 1
|
|
assert changed is False
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_screen_state_clears_dirty(
|
|
self, terminal_session, dummy_lock, mock_screen_char
|
|
):
|
|
terminal_session._screen = MagicMock()
|
|
terminal_session._screen.columns = 2
|
|
terminal_session._screen.lines = 1
|
|
terminal_session._screen.dirty = {1}
|
|
terminal_session._screen.buffer = [[mock_screen_char("x"), mock_screen_char("y")]]
|
|
terminal_session._sync_pyte_to_pty = AsyncMock()
|
|
terminal_session._screen_lock = dummy_lock
|
|
|
|
width, height, _buffer, changed = await terminal_session.get_screen_state()
|
|
assert width == 2
|
|
assert height == 1
|
|
assert changed is True
|
|
assert terminal_session._screen.dirty == set()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_screen_snapshot_does_not_mutate_state(
|
|
self, terminal_session, dummy_lock, mock_screen_char
|
|
):
|
|
"""Test that get_screen_snapshot doesn't call _sync_pyte_to_pty or clear dirty."""
|
|
terminal_session._screen = MagicMock()
|
|
terminal_session._screen.columns = 2
|
|
terminal_session._screen.lines = 1
|
|
terminal_session._screen.dirty = {0}
|
|
terminal_session._screen.buffer = [[mock_screen_char("a"), mock_screen_char("b")]]
|
|
terminal_session._sync_pyte_to_pty = AsyncMock()
|
|
terminal_session._screen_lock = dummy_lock
|
|
terminal_session._change_counter = 1
|
|
terminal_session._last_snapshot_counter = 0
|
|
|
|
width, height, buffer, has_changes = await terminal_session.get_screen_snapshot()
|
|
|
|
# Verify dimensions and data returned correctly
|
|
assert width == 2
|
|
assert height == 1
|
|
assert has_changes is True
|
|
assert buffer[0][0]["data"] == "a"
|
|
assert buffer[0][1]["data"] == "b"
|
|
|
|
# Verify no mutation: _sync_pyte_to_pty not called, dirty not cleared
|
|
terminal_session._sync_pyte_to_pty.assert_not_awaited()
|
|
assert terminal_session._screen.dirty == {0} # NOT cleared
|
|
|
|
# Snapshot counter should be updated for change tracking
|
|
assert terminal_session._last_snapshot_counter == 1
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_screen_snapshot_tracks_changes_correctly(self, terminal_session, dummy_lock):
|
|
"""Test that repeated snapshots correctly track changes."""
|
|
terminal_session._screen_lock = dummy_lock
|
|
terminal_session._change_counter = 5
|
|
terminal_session._last_snapshot_counter = 5
|
|
|
|
# No changes since last snapshot
|
|
_, _, _, has_changes = await terminal_session.get_screen_snapshot()
|
|
assert has_changes is False
|
|
|
|
# Simulate new screen data
|
|
terminal_session._change_counter = 6
|
|
_, _, _, has_changes = await terminal_session.get_screen_snapshot()
|
|
assert has_changes is True
|
|
assert terminal_session._last_snapshot_counter == 6
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_update_screen_increments_change_counter(self, terminal_session):
|
|
"""Test that _update_screen increments change counter when screen changes."""
|
|
initial_counter = terminal_session._change_counter
|
|
|
|
# Feed data that will mark screen as dirty
|
|
await terminal_session._update_screen(b"Hello\r\n")
|
|
|
|
assert terminal_session._change_counter > initial_counter
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_set_terminal_size_increments_change_counter(self, terminal_session, mock_poller):
|
|
"""Test that set_terminal_size increments change counter."""
|
|
terminal_session.master_fd = 10
|
|
initial_counter = terminal_session._change_counter
|
|
|
|
loop = asyncio.get_running_loop()
|
|
with patch.object(loop, "run_in_executor", new=AsyncMock()):
|
|
await terminal_session.set_terminal_size(100, 50)
|
|
|
|
assert terminal_session._change_counter == initial_counter + 1
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_screen_has_changes_uses_change_counter(self, terminal_session, dummy_lock):
|
|
"""Test that get_screen_has_changes uses the change counter."""
|
|
terminal_session._screen_lock = dummy_lock
|
|
|
|
# Initially no changes
|
|
terminal_session._change_counter = 0
|
|
terminal_session._last_snapshot_counter = 0
|
|
assert await terminal_session.get_screen_has_changes() is False
|
|
|
|
# After screen update increments counter
|
|
terminal_session._change_counter = 1
|
|
assert await terminal_session.get_screen_has_changes() is True
|
|
|
|
# After snapshot resets detection
|
|
terminal_session._last_snapshot_counter = 1
|
|
assert await terminal_session.get_screen_has_changes() is False
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_bytes_handles_closed_fd(self, mock_poller):
|
|
mock_poller.write = AsyncMock(side_effect=KeyError)
|
|
session = TerminalSession(mock_poller, "sid", "bash")
|
|
session.master_fd = 10
|
|
|
|
ok = await session.send_bytes(b"test")
|
|
assert ok is False
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_run_reads_from_poller_and_closes(self, mock_poller):
|
|
queue: asyncio.Queue[bytes | None] = asyncio.Queue()
|
|
await queue.put(b"hello")
|
|
await queue.put(None)
|
|
|
|
mock_poller.add_file = MagicMock(return_value=queue)
|
|
mock_poller.remove_file = MagicMock()
|
|
|
|
connector = MagicMock()
|
|
connector.on_data = AsyncMock()
|
|
connector.on_close = AsyncMock()
|
|
|
|
session = TerminalSession(mock_poller, "sid", "bash")
|
|
session.master_fd = 10
|
|
session._connector = connector
|
|
|
|
with patch("webterm.terminal_session.os.close") as mock_close:
|
|
await session.run()
|
|
|
|
connector.on_data.assert_awaited_once_with(b"hello")
|
|
connector.on_close.assert_awaited_once()
|
|
mock_poller.remove_file.assert_called_once_with(10)
|
|
mock_close.assert_called_once_with(10)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_start_updates_connector_when_already_running(self, mock_poller):
|
|
session = TerminalSession(mock_poller, "sid", "bash")
|
|
session.master_fd = 10
|
|
|
|
existing = asyncio.create_task(asyncio.sleep(0))
|
|
session._task = existing
|
|
|
|
connector = MagicMock()
|
|
task = await session.start(connector)
|
|
assert task is existing
|
|
assert session._connector is connector
|
|
|
|
await existing
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_send_bytes_writes_via_poller(self, mock_poller):
|
|
mock_poller.write = AsyncMock()
|
|
|
|
session = TerminalSession(mock_poller, "sid", "bash")
|
|
session.master_fd = 10
|
|
|
|
assert await session.send_bytes(b"x") is True
|
|
mock_poller.write.assert_awaited_once_with(10, b"x")
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_open_set_terminal_size_oserror_closes_fd_and_clears_master_fd(self, mock_poller):
|
|
session = TerminalSession(mock_poller, "sid", "bash")
|
|
|
|
with (
|
|
patch("webterm.terminal_session.pty.fork", return_value=(1234, 99)),
|
|
patch.object(session, "_set_terminal_size", side_effect=OSError("bad")),
|
|
patch("webterm.terminal_session.os.close") as mock_close,
|
|
pytest.raises(OSError),
|
|
):
|
|
await session.open(width=80, height=24)
|
|
|
|
mock_close.assert_called_once_with(99)
|
|
assert session.master_fd is None
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_set_terminal_size_uses_executor(self, mock_poller):
|
|
session = TerminalSession(mock_poller, "sid", "bash")
|
|
session.master_fd = 10
|
|
|
|
loop = asyncio.get_running_loop()
|
|
with patch.object(loop, "run_in_executor", new=AsyncMock()) as run_in_executor:
|
|
await session.set_terminal_size(80, 24)
|
|
|
|
run_in_executor.assert_awaited_once_with(None, session._set_terminal_size, 80, 24)
|
|
|
|
def test__set_terminal_size_calls_ioctl(self, mock_poller):
|
|
session = TerminalSession(mock_poller, "sid", "bash")
|
|
session.master_fd = 10
|
|
|
|
with patch("webterm.terminal_session.fcntl.ioctl") as mock_ioctl:
|
|
session._set_terminal_size(80, 24)
|
|
|
|
assert mock_ioctl.called
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_start_creates_task_when_not_running(self, mock_poller):
|
|
session = TerminalSession(mock_poller, "sid", "bash")
|
|
session.master_fd = 10
|
|
|
|
session.run = AsyncMock() # type: ignore[method-assign]
|
|
|
|
connector = MagicMock()
|
|
task = await session.start(connector)
|
|
assert task is session._task
|
|
assert session._connector is connector
|
|
|
|
await task
|
|
session.run.assert_awaited_once()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_run_without_connector_still_closes(self, mock_poller):
|
|
queue: asyncio.Queue[bytes | None] = asyncio.Queue()
|
|
await queue.put(b"hello")
|
|
await queue.put(None)
|
|
|
|
mock_poller.add_file = MagicMock(return_value=queue)
|
|
mock_poller.remove_file = MagicMock()
|
|
|
|
session = TerminalSession(mock_poller, "sid", "bash")
|
|
session.master_fd = 10
|
|
session._connector = None
|
|
|
|
with patch("webterm.terminal_session.os.close") as mock_close:
|
|
await session.run()
|
|
|
|
mock_poller.remove_file.assert_called_once_with(10)
|
|
mock_close.assert_called_once_with(10)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_run_oserror_still_closes(self, mock_poller):
|
|
queue = MagicMock()
|
|
queue.get = AsyncMock(side_effect=OSError("boom"))
|
|
|
|
mock_poller.add_file = MagicMock(return_value=queue)
|
|
mock_poller.remove_file = MagicMock()
|
|
|
|
session = TerminalSession(mock_poller, "sid", "bash")
|
|
session.master_fd = 10
|
|
session._connector = None
|
|
|
|
with patch("webterm.terminal_session.os.close") as mock_close:
|
|
await session.run()
|
|
|
|
mock_poller.remove_file.assert_called_once_with(10)
|
|
mock_close.assert_called_once_with(10)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_close_process_lookup_error_is_ignored(self, mock_poller):
|
|
session = TerminalSession(mock_poller, "sid", "bash")
|
|
session.pid = 123
|
|
|
|
with patch("webterm.terminal_session.os.kill", side_effect=ProcessLookupError()):
|
|
await session.close()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_close_logs_warning_on_unexpected_exception(self, mock_poller):
|
|
session = TerminalSession(mock_poller, "sid", "bash")
|
|
session.pid = 123
|
|
|
|
with (
|
|
patch("webterm.terminal_session.os.kill", side_effect=RuntimeError("x")),
|
|
patch("webterm.terminal_session.log.warning") as warn,
|
|
):
|
|
await session.close()
|
|
|
|
assert warn.called
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_wait_suppresses_cancelled_error(self, mock_poller):
|
|
session = TerminalSession(mock_poller, "sid", "bash")
|
|
|
|
task = asyncio.create_task(asyncio.sleep(10))
|
|
task.cancel()
|
|
session._task = task
|
|
|
|
await session.wait()
|
|
|
|
def test_is_running_false_when_kill_fails(self, mock_poller):
|
|
session = TerminalSession(mock_poller, "sid", "bash")
|
|
session.master_fd = 10
|
|
session._task = MagicMock()
|
|
session.pid = 123
|
|
|
|
with patch("webterm.terminal_session.os.kill", side_effect=OSError()):
|
|
assert session.is_running() is False
|