Fix mobile key handling and improve coverage

This commit is contained in:
GitHub Copilot
2026-01-30 00:43:35 +00:00
parent 417471d337
commit 40535f448d
9 changed files with 452 additions and 45 deletions
+1 -5
View File
@@ -856,12 +856,8 @@ class LocalServer:
if cached_response is not None:
return cached_response
theme_name = None
app = self.session_manager.apps_by_slug.get(route_key)
if app is not None and app.theme:
theme_name = app.theme.lower()
else:
theme_name = self.theme.lower()
theme_name = app.theme.lower() if app is not None and app.theme else self.theme.lower()
palette = THEME_PALETTES.get(theme_name)
if palette is None:
File diff suppressed because one or more lines are too long
+30 -2
View File
@@ -651,6 +651,25 @@ class WebTerminal {
// Handle special keys via beforeinput to intercept before browser modifies textarea
textarea.addEventListener("beforeinput", (e) => {
if (e.inputType === "insertText" && e.data && (this.ctrlActive || this.shiftActive)) {
let toSend = e.data;
if (this.shiftActive && toSend.length === 1) {
toSend = toSend.toUpperCase();
}
if (this.ctrlActive && toSend.length === 1) {
const code = toSend.toUpperCase().charCodeAt(0);
if (code >= 65 && code <= 90) {
toSend = String.fromCharCode(code - 64);
}
}
e.preventDefault();
e.stopPropagation();
this.send(["stdin", toSend]);
textarea.value = "";
this.deactivateModifiers();
return;
}
let seq: string | null = null;
switch (e.inputType) {
case "insertLineBreak": // Enter key
@@ -665,6 +684,7 @@ class WebTerminal {
}
if (seq) {
e.preventDefault();
e.stopPropagation();
this.send(["stdin", seq]);
// Clear modifiers after sending special keys from soft keyboard
this.deactivateModifiers();
@@ -704,6 +724,7 @@ class WebTerminal {
const code = e.key.toUpperCase().charCodeAt(0);
if (code >= 65 && code <= 90) {
e.preventDefault();
e.stopPropagation();
this.send(["stdin", String.fromCharCode(code - 64)]); // Ctrl+A=0x01, Ctrl+C=0x03, etc.
this.deactivateModifiers(); // Clear modifiers after physical Ctrl+letter
return;
@@ -742,6 +763,7 @@ class WebTerminal {
}
if (seq) {
e.preventDefault();
e.stopPropagation();
this.send(["stdin", seq]);
// Always clear modifiers after any key
this.deactivateModifiers();
@@ -749,7 +771,9 @@ class WebTerminal {
});
// Apply keybar modifiers to physical keyboard input even when the textarea isn't focused.
document.addEventListener("keydown", (event) => {
document.addEventListener(
"keydown",
(event) => {
if (!this.ctrlActive && !this.shiftActive) {
return;
}
@@ -773,6 +797,7 @@ class WebTerminal {
}
}
event.preventDefault();
event.stopPropagation();
this.send(["stdin", toSend]);
handled = true;
} else {
@@ -815,6 +840,7 @@ class WebTerminal {
if (seq) {
event.preventDefault();
event.stopPropagation();
this.send(["stdin", seq]);
handled = true;
}
@@ -823,7 +849,9 @@ class WebTerminal {
if (handled) {
this.deactivateModifiers();
}
});
},
{ capture: true }
);
// Focus textarea on touch/click to show mobile keyboard
// iOS requires focus() to be called synchronously within the gesture
+267
View File
@@ -0,0 +1,267 @@
"""Tests for docker_exec_session module."""
from __future__ import annotations
import asyncio
from collections import deque
from unittest.mock import AsyncMock, MagicMock, call, patch
import pytest
from webterm.docker_exec_session import (
REPLAY_BUFFER_SIZE,
DockerExecSession,
DockerExecSpec,
)
class FakeSocket:
"""Simple fake socket for unit tests."""
def __init__(self, recv_chunks: list[bytes] | None = None) -> None:
self._recv_chunks = deque(recv_chunks or [])
self.sent = b""
self.connected_path: str | None = None
self.timeout: float | None = None
self.closed = False
def settimeout(self, timeout: float | None) -> None:
self.timeout = timeout
def connect(self, path: str) -> None:
self.connected_path = path
def sendall(self, data: bytes) -> None:
self.sent += data
def recv(self, size: int) -> bytes:
if not self._recv_chunks:
return b""
chunk = self._recv_chunks.popleft()
if len(chunk) > size:
self._recv_chunks.appendleft(chunk[size:])
return chunk[:size]
return chunk
def close(self) -> None:
self.closed = True
class FakePoller:
"""Minimal fake poller for unit tests."""
def __init__(self, queue: asyncio.Queue[bytes | None]) -> None:
self.queue = queue
self.added: int | None = None
self.removed: int | None = None
def add_file(self, file_descriptor: int) -> asyncio.Queue[bytes | None]:
self.added = file_descriptor
return self.queue
def remove_file(self, file_descriptor: int) -> None:
self.removed = file_descriptor
def build_response(status: int, body: bytes, headers: dict[str, str] | None = None) -> bytes:
header_map = {"Content-Length": str(len(body))}
if headers:
header_map.update(headers)
lines = [f"HTTP/1.1 {status} Status"] + [f"{k}: {v}" for k, v in header_map.items()]
return ("\r\n".join(lines) + "\r\n\r\n").encode("utf-8") + body
@pytest.fixture
def docker_exec_session(mock_poller) -> DockerExecSession:
return DockerExecSession(
mock_poller,
"sid",
DockerExecSpec(container="container", command=["/bin/sh"]),
socket_path="/tmp/docker.sock",
)
def test_read_http_response_reads_full_body(docker_exec_session):
header = b"HTTP/1.1 200 OK\r\nContent-Length: 11\r\nX-Test: value\r\n\r\nhello "
fake_socket = FakeSocket([header, b"world"])
status, headers, body = docker_exec_session._read_http_response(fake_socket)
assert status == 200
assert headers["content-length"] == "11"
assert headers["x-test"] == "value"
assert body == b"hello world"
def test_read_http_response_incomplete_headers_returns_empty(docker_exec_session):
fake_socket = FakeSocket([b"HTTP/1.1 200 OK\r\n"])
status, headers, body = docker_exec_session._read_http_response(fake_socket)
assert status == 0
assert headers == {}
assert body == b""
def test_request_json_success_sends_payload(docker_exec_session, monkeypatch):
response = build_response(200, b'{"ok": true}')
fake_socket = FakeSocket([response])
def socket_factory(*_args, **_kwargs):
return fake_socket
monkeypatch.setattr("socket.socket", socket_factory)
result = docker_exec_session._request_json("POST", "/test", {"alpha": "beta"})
assert result == {"ok": True}
assert fake_socket.connected_path == "/tmp/docker.sock"
request = fake_socket.sent.decode("utf-8")
assert "POST /test HTTP/1.1" in request
assert "Content-Type: application/json" in request
payload = '{"alpha": "beta"}'
assert f"Content-Length: {len(payload)}" in request
assert request.endswith(payload)
assert fake_socket.closed is True
def test_request_json_error_raises(docker_exec_session, monkeypatch):
response = build_response(404, b"nope")
fake_socket = FakeSocket([response])
def socket_factory(*_args, **_kwargs):
return fake_socket
monkeypatch.setattr("socket.socket", socket_factory)
with pytest.raises(RuntimeError, match=r"Docker API request failed \(404\)"):
docker_exec_session._request_json("GET", "/missing", None)
assert fake_socket.closed is True
def test_start_exec_socket_error_status_closes_socket(docker_exec_session, monkeypatch):
response = build_response(500, b"boom")
fake_socket = FakeSocket([response])
def socket_factory(*_args, **_kwargs):
return fake_socket
monkeypatch.setattr("socket.socket", socket_factory)
with pytest.raises(RuntimeError, match=r"Docker API exec start failed"):
docker_exec_session._start_exec_socket("exec-id")
assert fake_socket.closed is True
def test_resize_exec_calls_request_json(docker_exec_session):
docker_exec_session._exec_id = "exec-id"
docker_exec_session._request_json = MagicMock() # type: ignore[method-assign]
docker_exec_session._resize_exec(100, 40)
docker_exec_session._request_json.assert_called_once_with(
"POST",
"/exec/exec-id/resize?h=40&w=100",
None,
)
@pytest.mark.asyncio
async def test_update_screen_increments_change_counter(docker_exec_session):
initial_counter = docker_exec_session._change_counter
await docker_exec_session._update_screen(b"Hello\r\n")
assert docker_exec_session._change_counter > initial_counter
@pytest.mark.asyncio
async def test_update_screen_logs_on_exception(docker_exec_session):
with (
patch.object(docker_exec_session._stream, "feed", side_effect=RuntimeError("boom")),
patch("webterm.docker_exec_session.log.warning") as warn,
):
await docker_exec_session._update_screen(b"\xff")
assert warn.called
@pytest.mark.asyncio
async def test_add_to_replay_buffer_trims_old_data(docker_exec_session):
first_chunk = b"a" * (REPLAY_BUFFER_SIZE - 1)
second_chunk = b"b" * 10
await docker_exec_session._add_to_replay_buffer(first_chunk)
await docker_exec_session._add_to_replay_buffer(second_chunk)
assert docker_exec_session._replay_buffer_size == len(second_chunk)
assert await docker_exec_session.get_replay_buffer() == second_chunk
@pytest.mark.asyncio
async def test_run_filters_da_responses():
queue: asyncio.Queue[bytes | None] = asyncio.Queue()
await queue.put(b"hello\x1b[?1;10;0cworld")
await queue.put(b"done")
await queue.put(None)
poller = FakePoller(queue)
session = DockerExecSession(
poller,
"sid",
DockerExecSpec(container="container", command=["/bin/sh"]),
socket_path="/tmp/docker.sock",
)
session.master_fd = 10
fake_socket = FakeSocket([])
session._sock = fake_socket
connector = MagicMock()
connector.on_data = AsyncMock()
connector.on_close = AsyncMock()
session._connector = connector
with (
patch.object(session, "_add_to_replay_buffer", new=AsyncMock()) as add_buffer,
patch.object(session, "_update_screen", new=AsyncMock()) as update_screen,
):
await session.run()
connector.on_data.assert_has_awaits([call(b"helloworld"), call(b"done")])
add_buffer.assert_has_awaits([call(b"helloworld"), call(b"done")])
update_screen.assert_has_awaits([call(b"helloworld"), call(b"done")])
connector.on_close.assert_awaited_once()
assert poller.removed == 10
assert fake_socket.closed is True
assert session.master_fd is None
assert session._escape_buffer == b""
@pytest.mark.asyncio
async def test_run_handles_partial_da_sequences():
queue: asyncio.Queue[bytes | None] = asyncio.Queue()
await queue.put(b"hi\x1b[?1")
await queue.put(b"0;0cbye")
await queue.put(None)
poller = FakePoller(queue)
session = DockerExecSession(
poller,
"sid",
DockerExecSpec(container="container", command=["/bin/sh"]),
socket_path="/tmp/docker.sock",
)
session.master_fd = 10
fake_socket = FakeSocket([])
session._sock = fake_socket
connector = MagicMock()
connector.on_data = AsyncMock()
connector.on_close = AsyncMock()
session._connector = connector
with (
patch.object(session, "_add_to_replay_buffer", new=AsyncMock()) as add_buffer,
patch.object(session, "_update_screen", new=AsyncMock()) as update_screen,
):
await session.run()
connector.on_data.assert_has_awaits([call(b"hi"), call(b"bye")])
add_buffer.assert_has_awaits([call(b"hi"), call(b"bye")])
update_screen.assert_has_awaits([call(b"hi"), call(b"bye")])
connector.on_close.assert_awaited_once()
assert poller.removed == 10
assert fake_socket.closed is True
assert session._escape_buffer == b""
+105 -1
View File
@@ -1,12 +1,14 @@
"""Tests for docker_stats module."""
from unittest.mock import MagicMock
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from webterm.docker_stats import (
DEFAULT_DOCKER_SOCKET,
STATS_HISTORY_SIZE,
DockerStatsCollector,
get_docker_socket_path,
render_sparkline_svg,
)
@@ -92,6 +94,16 @@ class TestDockerStatsCollector:
socket_path.touch()
assert collector.available is False # File exists but can't connect
def test_get_docker_socket_path_env(self, monkeypatch):
monkeypatch.setenv("DOCKER_HOST", "unix:///tmp/custom.sock")
assert get_docker_socket_path() == "/tmp/custom.sock"
monkeypatch.setenv("DOCKER_HOST", "/tmp/alt.sock")
assert get_docker_socket_path() == "/tmp/alt.sock"
monkeypatch.setenv("DOCKER_HOST", "tcp://127.0.0.1:2375")
assert get_docker_socket_path() == DEFAULT_DOCKER_SOCKET
def test_get_cpu_history_empty(self):
"""Empty history returns empty list."""
collector = DockerStatsCollector("/nonexistent")
@@ -135,6 +147,22 @@ class TestDockerStatsCollector:
result = collector._calculate_cpu_percent("test", cpu_stats, precpu_stats)
assert result is None
def test_calculate_cpu_percent_uses_previous_stats(self):
collector = DockerStatsCollector("/nonexistent")
collector._prev_cpu["svc"] = (1000, 2000)
cpu_stats = {
"cpu_usage": {"total_usage": 2000},
"system_cpu_usage": 4000,
"online_cpus": 2,
}
precpu_stats = {
"cpu_usage": {"total_usage": 0},
"system_cpu_usage": 0,
}
result = collector._calculate_cpu_percent("svc", cpu_stats, precpu_stats)
assert result == 100.0
def test_start_without_socket(self, tmp_path):
"""Start does nothing if socket not available."""
collector = DockerStatsCollector(str(tmp_path / "nonexistent.sock"))
@@ -155,6 +183,61 @@ class TestDockerStatsCollector:
result = await collector._make_request("/test")
assert result is None
def test_parse_docker_response_parses_json(self):
collector = DockerStatsCollector("/nonexistent")
response = b'HTTP/1.0 200 OK\r\n\r\n{"ok": true}'
assert collector._parse_docker_response("/stats", response) == {"ok": True}
def test_parse_docker_response_filters_non_200(self):
collector = DockerStatsCollector("/nonexistent")
response = b'HTTP/1.0 404 Not Found\r\n\r\n{"ok": true}'
assert collector._parse_docker_response("/stats", response) is None
def test_parse_docker_response_finds_json_in_body(self):
collector = DockerStatsCollector("/nonexistent")
response = b'HTTP/1.0 200 OK\r\n\r\njunk\r\n{"ok": true}\r\n'
assert collector._parse_docker_response("/stats", response) == {"ok": True}
def test_parse_docker_response_invalid_json(self):
collector = DockerStatsCollector("/nonexistent")
response = b"HTTP/1.0 200 OK\r\n\r\n{bad json"
assert collector._parse_docker_response("/stats", response) is None
@pytest.mark.asyncio
async def test_discover_containers_maps_compose_services(self):
collector = DockerStatsCollector("/nonexistent", compose_project="demo")
collector._make_request = AsyncMock( # type: ignore[method-assign]
return_value=[
{
"Id": "abcdef1234567890",
"Names": ["/demo_web_1"],
"Labels": {
"com.docker.compose.project": "demo",
"com.docker.compose.service": "web",
},
}
]
)
mapping = await collector._discover_containers(["web"])
assert mapping == {"web": "abcdef123456"}
@pytest.mark.asyncio
async def test_discover_containers_falls_back_to_name(self):
collector = DockerStatsCollector("/nonexistent")
collector._make_request = AsyncMock( # type: ignore[method-assign]
return_value=[
{
"Id": "1234567890abcdef",
"Names": ["/api"],
"Labels": {},
}
]
)
mapping = await collector._discover_containers(["api"])
assert mapping == {"api": "1234567890ab"}
def test_cpu_history_max_size(self):
"""CPU history respects max size."""
from collections import deque
@@ -168,6 +251,27 @@ class TestDockerStatsCollector:
assert len(collector._cpu_history["test"]) == STATS_HISTORY_SIZE
@pytest.mark.asyncio
async def test_poll_container_appends_history(self):
collector = DockerStatsCollector("/nonexistent")
collector._make_request = AsyncMock( # type: ignore[method-assign]
return_value={
"cpu_stats": {"system_cpu_usage": 4000, "cpu_usage": {"total_usage": 2000}},
"precpu_stats": {
"system_cpu_usage": 2000,
"cpu_usage": {"total_usage": 1000},
},
}
)
with patch.object(
collector,
"_calculate_cpu_percent",
return_value=12.5,
):
await collector._poll_container("svc", "container")
assert collector.get_cpu_history("svc") == [12.5]
def test_add_service_dynamic(self):
"""Services can be added dynamically after start."""
collector = DockerStatsCollector("/nonexistent")
+5 -2
View File
@@ -330,11 +330,14 @@ class TestHandleEventWithThemeLabel:
async def mock_request(method, path):
if "/containers/" in path and "/json" in path:
import json
return 200, json.dumps({
return 200, json.dumps(
{
"Id": "abc123",
"Name": "/themed-container",
"Config": {"Labels": {THEME_LABEL: "monokai"}},
})
}
)
return 404, ""
watcher._docker_request = mock_request
+1 -1
View File
@@ -1,8 +1,8 @@
"""Tests for local_server module - unit tests for helper functions."""
import asyncio
from unittest.mock import AsyncMock, MagicMock
import asyncio
import pytest
from aiohttp import web
+15 -6
View File
@@ -57,27 +57,36 @@ class TestColorToHex:
def test_color_to_hex_uses_palette_defaults(self) -> None:
palette = {"red": "#123456"}
assert _color_to_hex(
assert (
_color_to_hex(
"default",
is_foreground=True,
palette=palette,
default_fg="#111111",
default_bg="#222222",
) == "#111111"
assert _color_to_hex(
)
== "#111111"
)
assert (
_color_to_hex(
"default",
is_foreground=False,
palette=palette,
default_fg="#111111",
default_bg="#222222",
) == "#222222"
assert _color_to_hex(
)
== "#222222"
)
assert (
_color_to_hex(
"red",
is_foreground=True,
palette=palette,
default_fg="#111111",
default_bg="#222222",
) == "#123456"
)
== "#123456"
)
class TestEscapeXml:
+1 -1
View File
@@ -4,7 +4,7 @@ from unittest.mock import AsyncMock, MagicMock
import pytest
from webterm.config import Config
from webterm.local_server import LocalServer, WS_SEND_TIMEOUT
from webterm.local_server import WS_SEND_TIMEOUT, LocalServer
@pytest.mark.asyncio