Include app/terminal/exit modules in coverage
This commit is contained in:
+1
-4
@@ -99,10 +99,7 @@ branch = true
|
|||||||
omit = [
|
omit = [
|
||||||
"*/tests/*",
|
"*/tests/*",
|
||||||
"*/__pycache__/*",
|
"*/__pycache__/*",
|
||||||
# Integration-heavy modules that require running servers/processes
|
# Thread/FD polling integration is harder to deterministically unit test
|
||||||
"*/app_session.py",
|
|
||||||
"*/terminal_session.py",
|
|
||||||
"*/exit_poller.py",
|
|
||||||
"*/poller.py",
|
"*/poller.py",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|||||||
@@ -2,6 +2,7 @@
|
|||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import contextlib
|
import contextlib
|
||||||
|
import json
|
||||||
from unittest.mock import AsyncMock, MagicMock, patch
|
from unittest.mock import AsyncMock, MagicMock, patch
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
@@ -113,6 +114,8 @@ class TestAppSessionConnector:
|
|||||||
"""Create a mock connector."""
|
"""Create a mock connector."""
|
||||||
connector = MagicMock()
|
connector = MagicMock()
|
||||||
connector.on_data = AsyncMock()
|
connector.on_data = AsyncMock()
|
||||||
|
connector.on_meta = AsyncMock()
|
||||||
|
connector.on_binary_encoded_message = AsyncMock()
|
||||||
connector.on_close = AsyncMock()
|
connector.on_close = AsyncMock()
|
||||||
return connector
|
return connector
|
||||||
|
|
||||||
@@ -131,3 +134,60 @@ class TestAppSessionConnector:
|
|||||||
task.cancel()
|
task.cancel()
|
||||||
with contextlib.suppress(asyncio.CancelledError):
|
with contextlib.suppress(asyncio.CancelledError):
|
||||||
await task
|
await task
|
||||||
|
|
||||||
|
def test_encode_packet(self, tmp_path):
|
||||||
|
session = AppSession(tmp_path, "echo test", "sid")
|
||||||
|
packet = session.encode_packet(b"D", b"abc")
|
||||||
|
assert packet[:1] == b"D"
|
||||||
|
assert packet[1:5] == (3).to_bytes(4, "big")
|
||||||
|
assert packet[5:] == b"abc"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_send_bytes_handles_broken_pipe(self, tmp_path):
|
||||||
|
session = AppSession(tmp_path, "echo test", "sid")
|
||||||
|
stdin = MagicMock()
|
||||||
|
stdin.write = MagicMock(side_effect=BrokenPipeError())
|
||||||
|
stdin.drain = AsyncMock()
|
||||||
|
session._process = MagicMock(stdin=stdin)
|
||||||
|
assert await session.send_bytes(b"x") is False
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_send_meta_encodes_json_and_writes(self, tmp_path):
|
||||||
|
session = AppSession(tmp_path, "echo test", "sid")
|
||||||
|
stdin = MagicMock()
|
||||||
|
stdin.write = MagicMock()
|
||||||
|
stdin.drain = AsyncMock()
|
||||||
|
session._process = MagicMock(stdin=stdin)
|
||||||
|
|
||||||
|
meta = {"type": "hello", "n": 1}
|
||||||
|
assert await session.send_meta(meta) is True
|
||||||
|
written = stdin.write.call_args.args[0]
|
||||||
|
assert written[:1] == b"M"
|
||||||
|
payload = written[5:]
|
||||||
|
assert json.loads(payload.decode("utf-8")) == meta
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_open_sets_env_and_cwd(self, tmp_path, monkeypatch):
|
||||||
|
session = AppSession(tmp_path, "echo test", "sid", devtools=True)
|
||||||
|
|
||||||
|
fake_proc = MagicMock()
|
||||||
|
fake_proc.stdin = MagicMock()
|
||||||
|
fake_proc.stdout = MagicMock()
|
||||||
|
fake_proc.stderr = MagicMock()
|
||||||
|
|
||||||
|
async def fake_create(command, **kwargs):
|
||||||
|
assert command == "echo test"
|
||||||
|
assert kwargs["cwd"] == str(tmp_path)
|
||||||
|
env = kwargs["env"]
|
||||||
|
assert env["COLUMNS"] == "100"
|
||||||
|
assert env["ROWS"] == "40"
|
||||||
|
assert "TEXTUAL" in env
|
||||||
|
return fake_proc
|
||||||
|
|
||||||
|
monkeypatch.setattr(asyncio, "create_subprocess_shell", fake_create)
|
||||||
|
monkeypatch.setattr(session, "set_terminal_size", AsyncMock())
|
||||||
|
|
||||||
|
await session.open(width=100, height=40)
|
||||||
|
assert session._process is fake_proc
|
||||||
|
|
||||||
|
# run() packet decoding coverage is exercised in test_app_session_run_packets.py
|
||||||
|
|||||||
@@ -0,0 +1,113 @@
|
|||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
from unittest.mock import AsyncMock, MagicMock
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from textual_webterm.app_session import AppSession
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_connector():
|
||||||
|
connector = MagicMock()
|
||||||
|
connector.on_data = AsyncMock()
|
||||||
|
connector.on_meta = AsyncMock()
|
||||||
|
connector.on_binary_encoded_message = AsyncMock()
|
||||||
|
connector.on_close = AsyncMock()
|
||||||
|
return connector
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_run_decodes_packets_and_forwards(tmp_path, mock_connector, monkeypatch):
|
||||||
|
from textual_webterm import app_session
|
||||||
|
|
||||||
|
session = AppSession(tmp_path, "echo test", "sid")
|
||||||
|
session._connector = mock_connector
|
||||||
|
session.start_time = 0.0
|
||||||
|
|
||||||
|
stdin = MagicMock()
|
||||||
|
stdin.write = MagicMock()
|
||||||
|
stdin.drain = AsyncMock()
|
||||||
|
|
||||||
|
stdout = MagicMock()
|
||||||
|
# Provide a second empty line so AppSession's readiness loop terminates cleanly.
|
||||||
|
stdout.readline = AsyncMock(side_effect=[b"__GANGLION__\n", b""])
|
||||||
|
|
||||||
|
payload_data = b"hello"
|
||||||
|
payload_meta = json.dumps({"type": "custom", "x": 1}).encode("utf-8")
|
||||||
|
payload_meta_exit = json.dumps({"type": "exit"}).encode("utf-8")
|
||||||
|
payload_bin = b"\x00\x01"
|
||||||
|
|
||||||
|
read_parts = [
|
||||||
|
b"D",
|
||||||
|
len(payload_data).to_bytes(4, "big"),
|
||||||
|
payload_data,
|
||||||
|
b"M",
|
||||||
|
len(payload_meta).to_bytes(4, "big"),
|
||||||
|
payload_meta,
|
||||||
|
b"M",
|
||||||
|
len(payload_meta_exit).to_bytes(4, "big"),
|
||||||
|
payload_meta_exit,
|
||||||
|
b"P",
|
||||||
|
len(payload_bin).to_bytes(4, "big"),
|
||||||
|
payload_bin,
|
||||||
|
]
|
||||||
|
|
||||||
|
async def readexactly(n: int) -> bytes:
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
if not read_parts:
|
||||||
|
raise asyncio.IncompleteReadError(partial=b"", expected=n)
|
||||||
|
part = read_parts.pop(0)
|
||||||
|
assert len(part) == n
|
||||||
|
return part
|
||||||
|
|
||||||
|
stdout.readexactly = AsyncMock(side_effect=readexactly)
|
||||||
|
|
||||||
|
stderr = MagicMock()
|
||||||
|
stderr.read = AsyncMock(return_value=b"")
|
||||||
|
|
||||||
|
session._process = MagicMock(stdin=stdin, stdout=stdout, stderr=stderr, returncode=0)
|
||||||
|
monkeypatch.setattr(app_session.constants, "DEBUG", False)
|
||||||
|
|
||||||
|
await session.run()
|
||||||
|
|
||||||
|
mock_connector.on_data.assert_awaited_once_with(payload_data)
|
||||||
|
mock_connector.on_meta.assert_awaited_once_with({"type": "custom", "x": 1})
|
||||||
|
mock_connector.on_binary_encoded_message.assert_awaited_once_with(payload_bin)
|
||||||
|
assert stdin.write.called
|
||||||
|
mock_connector.on_close.assert_awaited_once()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_run_payload_too_large_breaks_loop(tmp_path, mock_connector, monkeypatch):
|
||||||
|
from textual_webterm import app_session
|
||||||
|
|
||||||
|
session = AppSession(tmp_path, "echo test", "sid")
|
||||||
|
session._connector = mock_connector
|
||||||
|
session.start_time = 0.0
|
||||||
|
|
||||||
|
stdin = MagicMock()
|
||||||
|
stdin.write = MagicMock()
|
||||||
|
stdin.drain = AsyncMock()
|
||||||
|
|
||||||
|
stdout = MagicMock()
|
||||||
|
stdout.readline = AsyncMock(side_effect=[b"__GANGLION__\n", b""])
|
||||||
|
|
||||||
|
async def readexactly(n: int) -> bytes:
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
if n == 1:
|
||||||
|
return b"D"
|
||||||
|
if n == 4:
|
||||||
|
return (app_session.MAX_PAYLOAD_SIZE + 1).to_bytes(4, "big")
|
||||||
|
raise asyncio.IncompleteReadError(partial=b"", expected=n)
|
||||||
|
|
||||||
|
stdout.readexactly = AsyncMock(side_effect=readexactly)
|
||||||
|
|
||||||
|
stderr = MagicMock()
|
||||||
|
stderr.read = AsyncMock(return_value=b"")
|
||||||
|
|
||||||
|
session._process = MagicMock(stdin=stdin, stdout=stdout, stderr=stderr, returncode=0)
|
||||||
|
monkeypatch.setattr(app_session.constants, "DEBUG", False)
|
||||||
|
|
||||||
|
await session.run()
|
||||||
|
mock_connector.on_close.assert_awaited_once()
|
||||||
@@ -0,0 +1,65 @@
|
|||||||
|
import asyncio
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_exit_poller_noop_when_idle_wait_zero(monkeypatch):
|
||||||
|
from textual_webterm import exit_poller
|
||||||
|
from textual_webterm.exit_poller import ExitPoller
|
||||||
|
|
||||||
|
monkeypatch.setattr(exit_poller, "EXIT_POLL_RATE", 0.01)
|
||||||
|
|
||||||
|
class FakeServer:
|
||||||
|
def __init__(self):
|
||||||
|
class SM:
|
||||||
|
def __init__(self):
|
||||||
|
self.sessions = {}
|
||||||
|
|
||||||
|
self.session_manager = SM()
|
||||||
|
self.exited = False
|
||||||
|
|
||||||
|
def force_exit(self):
|
||||||
|
self.exited = True
|
||||||
|
|
||||||
|
server = FakeServer()
|
||||||
|
poller = ExitPoller(server, idle_wait=0)
|
||||||
|
poller.start()
|
||||||
|
await asyncio.sleep(0.05)
|
||||||
|
poller.stop()
|
||||||
|
assert server.exited is False
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_exit_poller_resets_idle_timer_when_session_appears(monkeypatch):
|
||||||
|
from textual_webterm import exit_poller
|
||||||
|
from textual_webterm.exit_poller import ExitPoller
|
||||||
|
|
||||||
|
monkeypatch.setattr(exit_poller, "EXIT_POLL_RATE", 0.01)
|
||||||
|
|
||||||
|
class FakeServer:
|
||||||
|
def __init__(self):
|
||||||
|
class SM:
|
||||||
|
def __init__(self):
|
||||||
|
self.sessions = {}
|
||||||
|
|
||||||
|
self.session_manager = SM()
|
||||||
|
self.exited = False
|
||||||
|
|
||||||
|
def force_exit(self):
|
||||||
|
self.exited = True
|
||||||
|
|
||||||
|
server = FakeServer()
|
||||||
|
poller = ExitPoller(server, idle_wait=0.05)
|
||||||
|
poller.start()
|
||||||
|
|
||||||
|
# Let it become idle briefly, then add a session to reset.
|
||||||
|
await asyncio.sleep(0.02)
|
||||||
|
server.session_manager.sessions["x"] = object()
|
||||||
|
await asyncio.sleep(0.02)
|
||||||
|
server.session_manager.sessions.clear()
|
||||||
|
|
||||||
|
# Now ensure it can still exit after being idle long enough.
|
||||||
|
await asyncio.sleep(0.1)
|
||||||
|
poller.stop()
|
||||||
|
assert server.exited is True
|
||||||
@@ -1,10 +1,11 @@
|
|||||||
"""Tests for terminal_session module."""
|
"""Tests for terminal_session module."""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
import os
|
import os
|
||||||
import platform
|
import platform
|
||||||
import pty
|
import pty
|
||||||
import shlex
|
import shlex
|
||||||
from unittest.mock import MagicMock, patch
|
from unittest.mock import AsyncMock, MagicMock, patch
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
@@ -192,3 +193,96 @@ class TestTerminalSession:
|
|||||||
mock_split.assert_called_once_with(command)
|
mock_split.assert_called_once_with(command)
|
||||||
mock_execvp.assert_called_once_with("echo", ["echo", "hello world"])
|
mock_execvp.assert_called_once_with("echo", ["echo", "hello world"])
|
||||||
mock_exit.assert_called_once_with(1)
|
mock_exit.assert_called_once_with(1)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_open_parent_branch_sets_fd_and_pid(self):
|
||||||
|
from textual_webterm.terminal_session import TerminalSession
|
||||||
|
|
||||||
|
poller = MagicMock()
|
||||||
|
session = TerminalSession(poller, "sid", "bash")
|
||||||
|
|
||||||
|
with (
|
||||||
|
patch("textual_webterm.terminal_session.pty.fork", return_value=(1234, 99)),
|
||||||
|
patch.object(session, "_set_terminal_size") as set_size,
|
||||||
|
):
|
||||||
|
await session.open(width=80, height=24)
|
||||||
|
|
||||||
|
assert session.pid == 1234
|
||||||
|
assert session.master_fd == 99
|
||||||
|
set_size.assert_called_once_with(80, 24)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_open_bad_command_exits(self):
|
||||||
|
from textual_webterm.terminal_session import TerminalSession
|
||||||
|
|
||||||
|
poller = MagicMock()
|
||||||
|
session = TerminalSession(poller, "sid", "bad")
|
||||||
|
|
||||||
|
with (
|
||||||
|
patch("textual_webterm.terminal_session.pty.fork", return_value=(pty.CHILD, 123)),
|
||||||
|
patch("textual_webterm.terminal_session.shlex.split", side_effect=ValueError("bad")),
|
||||||
|
patch("textual_webterm.terminal_session.os._exit", side_effect=SystemExit(1)) as mock_exit,
|
||||||
|
pytest.raises(SystemExit),
|
||||||
|
):
|
||||||
|
await session.open()
|
||||||
|
|
||||||
|
mock_exit.assert_called_once_with(1)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_run_reads_from_poller_and_closes(self):
|
||||||
|
from textual_webterm.terminal_session import TerminalSession
|
||||||
|
|
||||||
|
queue: asyncio.Queue[bytes | None] = asyncio.Queue()
|
||||||
|
await queue.put(b"hello")
|
||||||
|
await queue.put(None)
|
||||||
|
|
||||||
|
poller = MagicMock()
|
||||||
|
poller.add_file = MagicMock(return_value=queue)
|
||||||
|
poller.remove_file = MagicMock()
|
||||||
|
|
||||||
|
connector = MagicMock()
|
||||||
|
connector.on_data = AsyncMock()
|
||||||
|
connector.on_close = AsyncMock()
|
||||||
|
|
||||||
|
session = TerminalSession(poller, "sid", "bash")
|
||||||
|
session.master_fd = 10
|
||||||
|
session._connector = connector
|
||||||
|
|
||||||
|
with patch("textual_webterm.terminal_session.os.close") as mock_close:
|
||||||
|
await session.run()
|
||||||
|
|
||||||
|
connector.on_data.assert_awaited_once_with(b"hello")
|
||||||
|
connector.on_close.assert_awaited_once()
|
||||||
|
poller.remove_file.assert_called_once_with(10)
|
||||||
|
mock_close.assert_called_once_with(10)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_start_updates_connector_when_already_running(self):
|
||||||
|
from textual_webterm.terminal_session import TerminalSession
|
||||||
|
|
||||||
|
poller = MagicMock()
|
||||||
|
session = TerminalSession(poller, "sid", "bash")
|
||||||
|
session.master_fd = 10
|
||||||
|
|
||||||
|
existing = asyncio.create_task(asyncio.sleep(0))
|
||||||
|
session._task = existing
|
||||||
|
|
||||||
|
connector = MagicMock()
|
||||||
|
task = await session.start(connector)
|
||||||
|
assert task is existing
|
||||||
|
assert session._connector is connector
|
||||||
|
|
||||||
|
await existing
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_send_bytes_writes_via_poller(self):
|
||||||
|
from textual_webterm.terminal_session import TerminalSession
|
||||||
|
|
||||||
|
poller = MagicMock()
|
||||||
|
poller.write = AsyncMock()
|
||||||
|
|
||||||
|
session = TerminalSession(poller, "sid", "bash")
|
||||||
|
session.master_fd = 10
|
||||||
|
|
||||||
|
assert await session.send_bytes(b"x") is True
|
||||||
|
poller.write.assert_awaited_once_with(10, b"x")
|
||||||
|
|||||||
Reference in New Issue
Block a user