diff --git a/pyproject.toml b/pyproject.toml index 3426741..da91eed 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -99,10 +99,7 @@ branch = true omit = [ "*/tests/*", "*/__pycache__/*", - # Integration-heavy modules that require running servers/processes - "*/app_session.py", - "*/terminal_session.py", - "*/exit_poller.py", + # Thread/FD polling integration is harder to deterministically unit test "*/poller.py", ] diff --git a/tests/test_app_session.py b/tests/test_app_session.py index 71ced15..635c251 100644 --- a/tests/test_app_session.py +++ b/tests/test_app_session.py @@ -2,6 +2,7 @@ import asyncio import contextlib +import json from unittest.mock import AsyncMock, MagicMock, patch import pytest @@ -113,6 +114,8 @@ class TestAppSessionConnector: """Create a mock connector.""" connector = MagicMock() connector.on_data = AsyncMock() + connector.on_meta = AsyncMock() + connector.on_binary_encoded_message = AsyncMock() connector.on_close = AsyncMock() return connector @@ -131,3 +134,60 @@ class TestAppSessionConnector: task.cancel() with contextlib.suppress(asyncio.CancelledError): await task + + def test_encode_packet(self, tmp_path): + session = AppSession(tmp_path, "echo test", "sid") + packet = session.encode_packet(b"D", b"abc") + assert packet[:1] == b"D" + assert packet[1:5] == (3).to_bytes(4, "big") + assert packet[5:] == b"abc" + + @pytest.mark.asyncio + async def test_send_bytes_handles_broken_pipe(self, tmp_path): + session = AppSession(tmp_path, "echo test", "sid") + stdin = MagicMock() + stdin.write = MagicMock(side_effect=BrokenPipeError()) + stdin.drain = AsyncMock() + session._process = MagicMock(stdin=stdin) + assert await session.send_bytes(b"x") is False + + @pytest.mark.asyncio + async def test_send_meta_encodes_json_and_writes(self, tmp_path): + session = AppSession(tmp_path, "echo test", "sid") + stdin = MagicMock() + stdin.write = MagicMock() + stdin.drain = AsyncMock() + session._process = MagicMock(stdin=stdin) + + meta = {"type": "hello", "n": 1} + assert await session.send_meta(meta) is True + written = stdin.write.call_args.args[0] + assert written[:1] == b"M" + payload = written[5:] + assert json.loads(payload.decode("utf-8")) == meta + + @pytest.mark.asyncio + async def test_open_sets_env_and_cwd(self, tmp_path, monkeypatch): + session = AppSession(tmp_path, "echo test", "sid", devtools=True) + + fake_proc = MagicMock() + fake_proc.stdin = MagicMock() + fake_proc.stdout = MagicMock() + fake_proc.stderr = MagicMock() + + async def fake_create(command, **kwargs): + assert command == "echo test" + assert kwargs["cwd"] == str(tmp_path) + env = kwargs["env"] + assert env["COLUMNS"] == "100" + assert env["ROWS"] == "40" + assert "TEXTUAL" in env + return fake_proc + + monkeypatch.setattr(asyncio, "create_subprocess_shell", fake_create) + monkeypatch.setattr(session, "set_terminal_size", AsyncMock()) + + await session.open(width=100, height=40) + assert session._process is fake_proc + + # run() packet decoding coverage is exercised in test_app_session_run_packets.py diff --git a/tests/test_app_session_run_packets.py b/tests/test_app_session_run_packets.py new file mode 100644 index 0000000..fc022a5 --- /dev/null +++ b/tests/test_app_session_run_packets.py @@ -0,0 +1,113 @@ +import asyncio +import json +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from textual_webterm.app_session import AppSession + + +@pytest.fixture +def mock_connector(): + connector = MagicMock() + connector.on_data = AsyncMock() + connector.on_meta = AsyncMock() + connector.on_binary_encoded_message = AsyncMock() + connector.on_close = AsyncMock() + return connector + + +@pytest.mark.asyncio +async def test_run_decodes_packets_and_forwards(tmp_path, mock_connector, monkeypatch): + from textual_webterm import app_session + + session = AppSession(tmp_path, "echo test", "sid") + session._connector = mock_connector + session.start_time = 0.0 + + stdin = MagicMock() + stdin.write = MagicMock() + stdin.drain = AsyncMock() + + stdout = MagicMock() + # Provide a second empty line so AppSession's readiness loop terminates cleanly. + stdout.readline = AsyncMock(side_effect=[b"__GANGLION__\n", b""]) + + payload_data = b"hello" + payload_meta = json.dumps({"type": "custom", "x": 1}).encode("utf-8") + payload_meta_exit = json.dumps({"type": "exit"}).encode("utf-8") + payload_bin = b"\x00\x01" + + read_parts = [ + b"D", + len(payload_data).to_bytes(4, "big"), + payload_data, + b"M", + len(payload_meta).to_bytes(4, "big"), + payload_meta, + b"M", + len(payload_meta_exit).to_bytes(4, "big"), + payload_meta_exit, + b"P", + len(payload_bin).to_bytes(4, "big"), + payload_bin, + ] + + async def readexactly(n: int) -> bytes: + await asyncio.sleep(0) + if not read_parts: + raise asyncio.IncompleteReadError(partial=b"", expected=n) + part = read_parts.pop(0) + assert len(part) == n + return part + + stdout.readexactly = AsyncMock(side_effect=readexactly) + + stderr = MagicMock() + stderr.read = AsyncMock(return_value=b"") + + session._process = MagicMock(stdin=stdin, stdout=stdout, stderr=stderr, returncode=0) + monkeypatch.setattr(app_session.constants, "DEBUG", False) + + await session.run() + + mock_connector.on_data.assert_awaited_once_with(payload_data) + mock_connector.on_meta.assert_awaited_once_with({"type": "custom", "x": 1}) + mock_connector.on_binary_encoded_message.assert_awaited_once_with(payload_bin) + assert stdin.write.called + mock_connector.on_close.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_run_payload_too_large_breaks_loop(tmp_path, mock_connector, monkeypatch): + from textual_webterm import app_session + + session = AppSession(tmp_path, "echo test", "sid") + session._connector = mock_connector + session.start_time = 0.0 + + stdin = MagicMock() + stdin.write = MagicMock() + stdin.drain = AsyncMock() + + stdout = MagicMock() + stdout.readline = AsyncMock(side_effect=[b"__GANGLION__\n", b""]) + + async def readexactly(n: int) -> bytes: + await asyncio.sleep(0) + if n == 1: + return b"D" + if n == 4: + return (app_session.MAX_PAYLOAD_SIZE + 1).to_bytes(4, "big") + raise asyncio.IncompleteReadError(partial=b"", expected=n) + + stdout.readexactly = AsyncMock(side_effect=readexactly) + + stderr = MagicMock() + stderr.read = AsyncMock(return_value=b"") + + session._process = MagicMock(stdin=stdin, stdout=stdout, stderr=stderr, returncode=0) + monkeypatch.setattr(app_session.constants, "DEBUG", False) + + await session.run() + mock_connector.on_close.assert_awaited_once() diff --git a/tests/test_exit_poller.py b/tests/test_exit_poller.py new file mode 100644 index 0000000..cc4b0c8 --- /dev/null +++ b/tests/test_exit_poller.py @@ -0,0 +1,65 @@ +import asyncio + +import pytest + + +@pytest.mark.asyncio +async def test_exit_poller_noop_when_idle_wait_zero(monkeypatch): + from textual_webterm import exit_poller + from textual_webterm.exit_poller import ExitPoller + + monkeypatch.setattr(exit_poller, "EXIT_POLL_RATE", 0.01) + + class FakeServer: + def __init__(self): + class SM: + def __init__(self): + self.sessions = {} + + self.session_manager = SM() + self.exited = False + + def force_exit(self): + self.exited = True + + server = FakeServer() + poller = ExitPoller(server, idle_wait=0) + poller.start() + await asyncio.sleep(0.05) + poller.stop() + assert server.exited is False + + +@pytest.mark.asyncio +async def test_exit_poller_resets_idle_timer_when_session_appears(monkeypatch): + from textual_webterm import exit_poller + from textual_webterm.exit_poller import ExitPoller + + monkeypatch.setattr(exit_poller, "EXIT_POLL_RATE", 0.01) + + class FakeServer: + def __init__(self): + class SM: + def __init__(self): + self.sessions = {} + + self.session_manager = SM() + self.exited = False + + def force_exit(self): + self.exited = True + + server = FakeServer() + poller = ExitPoller(server, idle_wait=0.05) + poller.start() + + # Let it become idle briefly, then add a session to reset. + await asyncio.sleep(0.02) + server.session_manager.sessions["x"] = object() + await asyncio.sleep(0.02) + server.session_manager.sessions.clear() + + # Now ensure it can still exit after being idle long enough. + await asyncio.sleep(0.1) + poller.stop() + assert server.exited is True diff --git a/tests/test_terminal_session.py b/tests/test_terminal_session.py index a1aea92..f4af607 100644 --- a/tests/test_terminal_session.py +++ b/tests/test_terminal_session.py @@ -1,10 +1,11 @@ """Tests for terminal_session module.""" +import asyncio import os import platform import pty import shlex -from unittest.mock import MagicMock, patch +from unittest.mock import AsyncMock, MagicMock, patch import pytest @@ -192,3 +193,96 @@ class TestTerminalSession: mock_split.assert_called_once_with(command) mock_execvp.assert_called_once_with("echo", ["echo", "hello world"]) mock_exit.assert_called_once_with(1) + + @pytest.mark.asyncio + async def test_open_parent_branch_sets_fd_and_pid(self): + from textual_webterm.terminal_session import TerminalSession + + poller = MagicMock() + session = TerminalSession(poller, "sid", "bash") + + with ( + patch("textual_webterm.terminal_session.pty.fork", return_value=(1234, 99)), + patch.object(session, "_set_terminal_size") as set_size, + ): + await session.open(width=80, height=24) + + assert session.pid == 1234 + assert session.master_fd == 99 + set_size.assert_called_once_with(80, 24) + + @pytest.mark.asyncio + async def test_open_bad_command_exits(self): + from textual_webterm.terminal_session import TerminalSession + + poller = MagicMock() + session = TerminalSession(poller, "sid", "bad") + + with ( + patch("textual_webterm.terminal_session.pty.fork", return_value=(pty.CHILD, 123)), + patch("textual_webterm.terminal_session.shlex.split", side_effect=ValueError("bad")), + patch("textual_webterm.terminal_session.os._exit", side_effect=SystemExit(1)) as mock_exit, + pytest.raises(SystemExit), + ): + await session.open() + + mock_exit.assert_called_once_with(1) + + @pytest.mark.asyncio + async def test_run_reads_from_poller_and_closes(self): + from textual_webterm.terminal_session import TerminalSession + + queue: asyncio.Queue[bytes | None] = asyncio.Queue() + await queue.put(b"hello") + await queue.put(None) + + poller = MagicMock() + poller.add_file = MagicMock(return_value=queue) + poller.remove_file = MagicMock() + + connector = MagicMock() + connector.on_data = AsyncMock() + connector.on_close = AsyncMock() + + session = TerminalSession(poller, "sid", "bash") + session.master_fd = 10 + session._connector = connector + + with patch("textual_webterm.terminal_session.os.close") as mock_close: + await session.run() + + connector.on_data.assert_awaited_once_with(b"hello") + connector.on_close.assert_awaited_once() + poller.remove_file.assert_called_once_with(10) + mock_close.assert_called_once_with(10) + + @pytest.mark.asyncio + async def test_start_updates_connector_when_already_running(self): + from textual_webterm.terminal_session import TerminalSession + + poller = MagicMock() + session = TerminalSession(poller, "sid", "bash") + session.master_fd = 10 + + existing = asyncio.create_task(asyncio.sleep(0)) + session._task = existing + + connector = MagicMock() + task = await session.start(connector) + assert task is existing + assert session._connector is connector + + await existing + + @pytest.mark.asyncio + async def test_send_bytes_writes_via_poller(self): + from textual_webterm.terminal_session import TerminalSession + + poller = MagicMock() + poller.write = AsyncMock() + + session = TerminalSession(poller, "sid", "bash") + session.master_fd = 10 + + assert await session.send_bytes(b"x") is True + poller.write.assert_awaited_once_with(10, b"x")