Files
webterm/tests/test_docker_exec_session.py
T
2026-02-01 09:45:23 +00:00

275 lines
8.8 KiB
Python

"""Tests for docker_exec_session module."""
from __future__ import annotations
import asyncio
from collections import deque
from unittest.mock import AsyncMock, MagicMock, call, patch
import pytest
from webterm.docker_exec_session import (
REPLAY_BUFFER_SIZE,
DockerExecSession,
DockerExecSpec,
)
class FakeSocket:
    """In-memory socket double: replays canned chunks and records what was done to it."""

    def __init__(self, recv_chunks: list[bytes] | None = None) -> None:
        # Pending inbound data, consumed front-to-back by recv().
        self._recv_chunks = deque(recv_chunks or [])
        # Everything passed to sendall(), concatenated in call order.
        self.sent = b""
        self.connected_path: str | None = None
        self.timeout: float | None = None
        self.closed = False

    def settimeout(self, timeout: float | None) -> None:
        """Record the most recently requested timeout."""
        self.timeout = timeout

    def connect(self, path: str) -> None:
        """Record the path the caller connected to."""
        self.connected_path = path

    def sendall(self, data: bytes) -> None:
        """Append ``data`` to the captured outbound bytes."""
        self.sent = self.sent + data

    def recv(self, size: int) -> bytes:
        """Return at most ``size`` bytes of the next chunk, or ``b""`` when drained."""
        if len(self._recv_chunks) == 0:
            return b""
        head = self._recv_chunks.popleft()
        if len(head) <= size:
            return head
        # Oversized chunk: hand back the prefix and requeue the remainder at the front.
        self._recv_chunks.appendleft(head[size:])
        return head[:size]

    def close(self) -> None:
        """Mark the socket as closed."""
        self.closed = True
class FakePoller:
    """Poller double that serves one pre-built queue and remembers fd (un)registration."""

    def __init__(self, queue: asyncio.Queue[bytes | None]) -> None:
        # Last descriptor seen by add_file() / remove_file(); None until called.
        self.added: int | None = None
        self.removed: int | None = None
        self.queue = queue

    def add_file(self, file_descriptor: int) -> asyncio.Queue[bytes | None]:
        """Record the registered descriptor and return the queue given at construction."""
        self.added = file_descriptor
        return self.queue

    def remove_file(self, file_descriptor: int) -> None:
        """Record the descriptor that was unregistered."""
        self.removed = file_descriptor
def build_response(status: int, body: bytes, headers: dict[str, str] | None = None) -> bytes:
    """Assemble raw HTTP/1.1 response bytes for feeding to a fake socket.

    A ``Content-Length`` header matching ``body`` is always emitted first;
    entries in ``headers`` are appended after it and override it on key clash.
    The reason phrase is the fixed placeholder ``Status``.
    """
    merged = {"Content-Length": str(len(body)), **(headers or {})}
    head_lines = [f"HTTP/1.1 {status} Status"]
    head_lines.extend(f"{name}: {value}" for name, value in merged.items())
    head_text = "\r\n".join(head_lines) + "\r\n\r\n"
    return head_text.encode("utf-8") + body
@pytest.fixture
def docker_exec_session(mock_poller) -> DockerExecSession:
    """Provide a DockerExecSession built on the ``mock_poller`` fixture.

    NOTE(review): ``mock_poller`` is not defined in this module; presumably it
    comes from a shared conftest -- confirm.
    """
    spec = DockerExecSpec(container="container", command=["/bin/sh"])
    return DockerExecSession(mock_poller, "sid", spec, socket_path="/tmp/docker.sock")
def test_read_http_response_reads_full_body(docker_exec_session):
    """A body delivered across two recv() chunks is returned whole, with lower-cased header keys."""
    first_chunk = b"HTTP/1.1 200 OK\r\nContent-Length: 11\r\nX-Test: value\r\n\r\nhello "
    sock = FakeSocket([first_chunk, b"world"])

    status, headers, body = docker_exec_session._read_http_response(sock)

    assert status == 200
    assert body == b"hello world"
    assert headers["content-length"] == "11"
    assert headers["x-test"] == "value"
def test_read_http_response_incomplete_headers_returns_empty(docker_exec_session):
    """A stream that ends before the header terminator yields status 0, no headers, empty body."""
    truncated = FakeSocket([b"HTTP/1.1 200 OK\r\n"])

    status, headers, body = docker_exec_session._read_http_response(truncated)

    assert status == 0
    assert headers == {}
    assert body == b""
def test_request_json_success_sends_payload(docker_exec_session, monkeypatch):
    """A 200 reply is decoded as JSON and the request carries the serialized payload."""
    fake_socket = FakeSocket([build_response(200, b'{"ok": true}')])
    # Every socket.socket(...) call made by the session gets our fake.
    monkeypatch.setattr("socket.socket", lambda *_args, **_kwargs: fake_socket)

    result = docker_exec_session._request_json("POST", "/test", {"alpha": "beta"})

    assert result == {"ok": True}
    assert fake_socket.connected_path == "/tmp/docker.sock"

    request = fake_socket.sent.decode("utf-8")
    payload = '{"alpha": "beta"}'
    assert "POST /test HTTP/1.1" in request
    assert "Content-Type: application/json" in request
    assert f"Content-Length: {len(payload)}" in request
    assert request.endswith(payload)
    assert fake_socket.closed is True
def test_request_json_error_raises(docker_exec_session, monkeypatch):
    """A 404 reply raises RuntimeError and the socket is still closed afterwards."""
    fake_socket = FakeSocket([build_response(404, b"nope")])
    monkeypatch.setattr("socket.socket", lambda *_args, **_kwargs: fake_socket)

    with pytest.raises(RuntimeError, match=r"Docker API request failed \(404\)"):
        docker_exec_session._request_json("GET", "/missing", None)

    # Checked outside the raises-block so the assertion is actually reached.
    assert fake_socket.closed is True
def test_start_exec_socket_error_status_closes_socket(docker_exec_session, monkeypatch):
    """A 500 reply to exec-start raises RuntimeError and the socket is closed."""
    fake_socket = FakeSocket([build_response(500, b"boom")])
    monkeypatch.setattr("socket.socket", lambda *_args, **_kwargs: fake_socket)

    with pytest.raises(RuntimeError, match=r"Docker API exec start failed"):
        docker_exec_session._start_exec_socket("exec-id")

    # Checked outside the raises-block so the assertion is actually reached.
    assert fake_socket.closed is True
def test_resize_exec_calls_request_json(docker_exec_session):
    """_resize_exec(100, 40) issues one POST to the resize endpoint with h=40 and w=100."""
    request_mock = MagicMock()
    docker_exec_session._exec_id = "exec-id"
    docker_exec_session._request_json = request_mock  # type: ignore[method-assign]

    docker_exec_session._resize_exec(100, 40)

    request_mock.assert_called_once_with("POST", "/exec/exec-id/resize?h=40&w=100", None)
@pytest.mark.asyncio
async def test_update_screen_increments_change_counter(docker_exec_session):
    """Feeding output to _update_screen raises the session's change counter."""
    before = docker_exec_session._change_counter

    await docker_exec_session._update_screen(b"Hello\r\n")

    after = docker_exec_session._change_counter
    assert after > before
@pytest.mark.asyncio
async def test_update_screen_logs_on_exception(docker_exec_session):
    """When the stream's feed() raises, _update_screen logs a warning rather than propagating."""
    failing_feed = patch.object(
        docker_exec_session._stream, "feed", side_effect=RuntimeError("boom")
    )
    with failing_feed:
        with patch("webterm.docker_exec_session.log.warning") as warn:
            await docker_exec_session._update_screen(b"\xff")

    assert warn.called
@pytest.mark.asyncio
async def test_update_screen_preserves_utf8_bytes_with_c1_values(docker_exec_session):
    """A multi-byte UTF-8 character survives the screen update and shows up on the first line."""
    encoded = "✓ ok\r\n".encode()

    await docker_exec_session._update_screen(encoded)

    first_line = (await docker_exec_session.get_screen_lines())[0]
    assert "✓ ok" in first_line
@pytest.mark.asyncio
async def test_add_to_replay_buffer_trims_old_data(docker_exec_session):
    """Once the size limit is exceeded, only the newest chunk remains in the replay buffer."""
    # One byte short of the limit, so the 10-byte follow-up pushes the total over it.
    nearly_full = b"a" * (REPLAY_BUFFER_SIZE - 1)
    overflow = b"b" * 10

    for chunk in (nearly_full, overflow):
        await docker_exec_session._add_to_replay_buffer(chunk)

    assert docker_exec_session._replay_buffer_size == len(overflow)
    assert await docker_exec_session.get_replay_buffer() == overflow
@pytest.mark.asyncio
async def test_run_filters_da_responses():
    """run() strips a complete ``ESC [ ? ... c`` reply from the data before forwarding it."""
    queue: asyncio.Queue[bytes | None] = asyncio.Queue()
    # The trailing None ends the stream and lets run() return.
    for item in (b"hello\x1b[?1;10;0cworld", b"done", None):
        await queue.put(item)

    poller = FakePoller(queue)
    spec = DockerExecSpec(container="container", command=["/bin/sh"])
    session = DockerExecSession(poller, "sid", spec, socket_path="/tmp/docker.sock")
    session.master_fd = 10
    fake_socket = FakeSocket([])
    session._sock = fake_socket

    connector = MagicMock()
    connector.on_data = AsyncMock()
    connector.on_close = AsyncMock()
    session._connector = connector

    with patch.object(session, "_add_to_replay_buffer", new=AsyncMock()) as add_buffer:
        with patch.object(session, "_update_screen", new=AsyncMock()) as update_screen:
            await session.run()

    # Each consumer must see the same filtered chunks, in order.
    forwarded = [call(b"helloworld"), call(b"done")]
    connector.on_data.assert_has_awaits(forwarded)
    add_buffer.assert_has_awaits(forwarded)
    update_screen.assert_has_awaits(forwarded)
    connector.on_close.assert_awaited_once()

    # Teardown: fd unregistered, socket closed, session state reset.
    assert poller.removed == 10
    assert fake_socket.closed is True
    assert session.master_fd is None
    assert session._escape_buffer == b""
@pytest.mark.asyncio
async def test_run_handles_partial_da_sequences():
    """run() filters an ``ESC [ ? ... c`` reply even when it is split across two reads.

    The first chunk ends mid-sequence and the second completes it; only the
    text around the sequence may reach the connector, replay buffer and screen.
    """
    queue: asyncio.Queue[bytes | None] = asyncio.Queue()
    await queue.put(b"hi\x1b[?1")
    await queue.put(b"0;0cbye")
    # The trailing None ends the stream and lets run() return.
    await queue.put(None)

    poller = FakePoller(queue)
    session = DockerExecSession(
        poller,
        "sid",
        DockerExecSpec(container="container", command=["/bin/sh"]),
        socket_path="/tmp/docker.sock",
    )
    session.master_fd = 10
    fake_socket = FakeSocket([])
    session._sock = fake_socket

    connector = MagicMock()
    connector.on_data = AsyncMock()
    connector.on_close = AsyncMock()
    session._connector = connector

    with (
        patch.object(session, "_add_to_replay_buffer", new=AsyncMock()) as add_buffer,
        patch.object(session, "_update_screen", new=AsyncMock()) as update_screen,
    ):
        await session.run()

    expected = [call(b"hi"), call(b"bye")]
    connector.on_data.assert_has_awaits(expected)
    add_buffer.assert_has_awaits(expected)
    update_screen.assert_has_awaits(expected)
    connector.on_close.assert_awaited_once()

    # Teardown checks, kept in step with test_run_filters_da_responses.
    assert poller.removed == 10
    assert fake_socket.closed is True
    # Added: the sibling test asserts this after the same shutdown path.
    assert session.master_fd is None
    # Nothing of the split sequence may be left buffered.
    assert session._escape_buffer == b""