Add Docker exec PTY sessions
This commit is contained in:
@@ -21,6 +21,8 @@ build-all: clean-all node_modules build install-dev check ## Full reproducible b
|
||||
# =============================================================================
|
||||
# Python targets
|
||||
# =============================================================================
|
||||
# NOTE: Install dev tools (ruff/pytest/etc.) using `pip install --user --break-system-packages`
|
||||
# from the dev dependency list (pyproject.toml or requirements-dev.txt).
|
||||
|
||||
install: ## Install package in editable mode
|
||||
$(PIP) install -e .
|
||||
|
||||
@@ -101,7 +101,7 @@ webterm --docker-watch
|
||||
|
||||
When a container starts with the label, it automatically appears in the dashboard. When it stops, it's removed. Label values:
|
||||
|
||||
- `webterm-command: auto` - Runs `docker exec -it <container> /bin/bash` (override with `WEBTERM_DOCKER_AUTO_COMMAND`)
|
||||
- `webterm-command: auto` (or empty) - Opens a PTY via Docker exec API (override with `WEBTERM_DOCKER_AUTO_COMMAND`)
|
||||
- `webterm-command: <command>` - Runs the specified command
|
||||
- `webterm-theme: <theme>` - Sets the terminal theme for that container (xterm, monokai, dark, light, dracula, catppuccin, nord, gruvbox, solarized, tokyo)
|
||||
|
||||
|
||||
@@ -0,0 +1,343 @@
|
||||
"""Docker exec-based terminal session using Docker API and socket."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import contextlib
|
||||
import json
|
||||
import logging
|
||||
import socket
|
||||
from collections import deque
|
||||
from dataclasses import dataclass
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import pyte
|
||||
|
||||
from .docker_stats import get_docker_socket_path
|
||||
from .session import Session, SessionConnector
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .poller import Poller
|
||||
from .types import Meta, SessionID
|
||||
|
||||
|
||||
log = logging.getLogger("webterm")
|
||||
|
||||
REPLAY_BUFFER_SIZE = 256 * 1024 # 256KB
|
||||
DEFAULT_SCREEN_WIDTH = 132
|
||||
DEFAULT_SCREEN_HEIGHT = 45
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class DockerExecSpec:
|
||||
container: str
|
||||
command: list[str]
|
||||
|
||||
|
||||
class DockerExecSession(Session):
|
||||
"""Terminal session backed by Docker exec API."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
poller: Poller,
|
||||
session_id: SessionID,
|
||||
exec_spec: DockerExecSpec,
|
||||
socket_path: str | None = None,
|
||||
) -> None:
|
||||
self.poller = poller
|
||||
self.session_id = session_id
|
||||
self.exec_spec = exec_spec
|
||||
self._socket_path = socket_path or get_docker_socket_path()
|
||||
self.master_fd: int | None = None
|
||||
self._sock: socket.socket | None = None
|
||||
self._task: asyncio.Task | None = None
|
||||
self._connector = SessionConnector()
|
||||
self._replay_buffer: deque[bytes] = deque()
|
||||
self._replay_buffer_size = 0
|
||||
self._replay_lock = asyncio.Lock()
|
||||
self._screen = pyte.Screen(DEFAULT_SCREEN_WIDTH, DEFAULT_SCREEN_HEIGHT)
|
||||
self._stream = pyte.Stream(self._screen)
|
||||
self._screen_lock = asyncio.Lock()
|
||||
self._last_width = DEFAULT_SCREEN_WIDTH
|
||||
self._last_height = DEFAULT_SCREEN_HEIGHT
|
||||
self._change_counter = 0
|
||||
self._last_snapshot_counter = 0
|
||||
self._exec_id: str | None = None
|
||||
self._pending_output = b""
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return (
|
||||
"DockerExecSession(session_id="
|
||||
f"{self.session_id!r}, container={self.exec_spec.container!r})"
|
||||
)
|
||||
|
||||
def _read_http_response(self, sock: socket.socket) -> tuple[int, dict, bytes]:
|
||||
sock.settimeout(10.0)
|
||||
data = b""
|
||||
while b"\r\n\r\n" not in data:
|
||||
chunk = sock.recv(4096)
|
||||
if not chunk:
|
||||
break
|
||||
data += chunk
|
||||
if b"\r\n\r\n" not in data:
|
||||
return 0, {}, b""
|
||||
header_bytes, body = data.split(b"\r\n\r\n", 1)
|
||||
headers = header_bytes.decode("utf-8", errors="replace").split("\r\n")
|
||||
status_line = headers[0] if headers else ""
|
||||
status = 0
|
||||
if status_line:
|
||||
parts = status_line.split()
|
||||
if len(parts) >= 2:
|
||||
try:
|
||||
status = int(parts[1])
|
||||
except ValueError:
|
||||
status = 0
|
||||
header_map: dict[str, str] = {}
|
||||
for header in headers[1:]:
|
||||
if ":" not in header:
|
||||
continue
|
||||
key, value = header.split(":", 1)
|
||||
header_map[key.strip().lower()] = value.strip()
|
||||
if "content-length" in header_map:
|
||||
try:
|
||||
length = int(header_map["content-length"])
|
||||
except ValueError:
|
||||
length = 0
|
||||
remaining = length - len(body)
|
||||
while remaining > 0:
|
||||
chunk = sock.recv(min(4096, remaining))
|
||||
if not chunk:
|
||||
break
|
||||
body += chunk
|
||||
remaining -= len(chunk)
|
||||
return status, header_map, body
|
||||
|
||||
def _request_json(self, method: str, path: str, payload: dict | None = None) -> dict:
|
||||
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
sock.connect(self._socket_path)
|
||||
body = json.dumps(payload or {}).encode("utf-8") if payload is not None else b""
|
||||
headers = [
|
||||
f"{method} {path} HTTP/1.1",
|
||||
"Host: localhost",
|
||||
]
|
||||
if payload is not None:
|
||||
headers.append("Content-Type: application/json")
|
||||
headers.append(f"Content-Length: {len(body)}")
|
||||
headers.append("")
|
||||
headers.append("")
|
||||
request = "\r\n".join(headers).encode("utf-8") + body
|
||||
sock.sendall(request)
|
||||
status, _headers, body_bytes = self._read_http_response(sock)
|
||||
sock.close()
|
||||
if status < 200 or status >= 300:
|
||||
detail = body_bytes.decode("utf-8", errors="replace")
|
||||
raise RuntimeError(f"Docker API request failed ({status}): {detail}")
|
||||
if not body_bytes:
|
||||
return {}
|
||||
try:
|
||||
return json.loads(body_bytes.decode("utf-8", errors="replace"))
|
||||
except json.JSONDecodeError as exc:
|
||||
raise RuntimeError("Docker API returned invalid JSON") from exc
|
||||
|
||||
def _create_exec(self) -> str:
|
||||
payload = {
|
||||
"AttachStdin": True,
|
||||
"AttachStdout": True,
|
||||
"AttachStderr": True,
|
||||
"Tty": True,
|
||||
"Cmd": self.exec_spec.command,
|
||||
}
|
||||
response = self._request_json("POST", f"/containers/{self.exec_spec.container}/exec", payload)
|
||||
exec_id = response.get("Id")
|
||||
if not isinstance(exec_id, str) or not exec_id:
|
||||
raise RuntimeError("Docker API did not return exec ID")
|
||||
return exec_id
|
||||
|
||||
def _start_exec_socket(self, exec_id: str) -> socket.socket:
|
||||
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
sock.connect(self._socket_path)
|
||||
payload = json.dumps({"Detach": False, "Tty": True}).encode("utf-8")
|
||||
headers = [
|
||||
f"POST /exec/{exec_id}/start HTTP/1.1",
|
||||
"Host: localhost",
|
||||
"Content-Type: application/json",
|
||||
f"Content-Length: {len(payload)}",
|
||||
"Connection: Upgrade",
|
||||
"Upgrade: tcp",
|
||||
"",
|
||||
"",
|
||||
]
|
||||
sock.sendall("\r\n".join(headers).encode("utf-8") + payload)
|
||||
status, _headers, body = self._read_http_response(sock)
|
||||
if status not in (101,) and (status < 200 or status >= 300):
|
||||
sock.close()
|
||||
detail = body.decode("utf-8", errors="replace")
|
||||
raise RuntimeError(f"Docker API exec start failed ({status}): {detail}")
|
||||
if body:
|
||||
self._pending_output += body
|
||||
sock.settimeout(None)
|
||||
return sock
|
||||
|
||||
def _resize_exec(self, width: int, height: int) -> None:
|
||||
assert self._exec_id is not None
|
||||
path = f"/exec/{self._exec_id}/resize?h={height}&w={width}"
|
||||
self._request_json("POST", path, None)
|
||||
|
||||
async def open(self, width: int = 80, height: int = 24) -> None:
|
||||
log.info(
|
||||
"Opening Docker exec session %s for %s",
|
||||
self.session_id,
|
||||
self.exec_spec.container,
|
||||
)
|
||||
self._last_width = width
|
||||
self._last_height = height
|
||||
async with self._screen_lock:
|
||||
self._screen = pyte.Screen(width, height)
|
||||
self._stream = pyte.Stream(self._screen)
|
||||
exec_id = await asyncio.to_thread(self._create_exec)
|
||||
self._exec_id = exec_id
|
||||
self._sock = await asyncio.to_thread(self._start_exec_socket, exec_id)
|
||||
self.master_fd = self._sock.fileno()
|
||||
await asyncio.to_thread(self._resize_exec, width, height)
|
||||
|
||||
async def set_terminal_size(self, width: int, height: int) -> None:
|
||||
self._last_width = width
|
||||
self._last_height = height
|
||||
async with self._screen_lock:
|
||||
self._screen.resize(height, width)
|
||||
self._change_counter += 1
|
||||
if self._exec_id:
|
||||
await asyncio.to_thread(self._resize_exec, width, height)
|
||||
|
||||
async def force_redraw(self) -> None:
|
||||
await self.set_terminal_size(self._last_width, self._last_height)
|
||||
|
||||
async def _add_to_replay_buffer(self, data: bytes) -> None:
|
||||
async with self._replay_lock:
|
||||
self._replay_buffer.append(data)
|
||||
self._replay_buffer_size += len(data)
|
||||
while self._replay_buffer_size > REPLAY_BUFFER_SIZE and self._replay_buffer:
|
||||
old = self._replay_buffer.popleft()
|
||||
self._replay_buffer_size -= len(old)
|
||||
|
||||
async def _update_screen(self, data: bytes) -> None:
|
||||
async with self._screen_lock:
|
||||
try:
|
||||
text = data.decode("utf-8", errors="replace")
|
||||
self._stream.feed(text)
|
||||
if self._screen.dirty:
|
||||
self._change_counter += 1
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
async def _drain_pending_output(self) -> None:
|
||||
if not self._pending_output:
|
||||
return
|
||||
data = self._pending_output
|
||||
self._pending_output = b""
|
||||
await self._add_to_replay_buffer(data)
|
||||
await self._update_screen(data)
|
||||
if self._connector:
|
||||
await self._connector.on_data(data)
|
||||
|
||||
async def get_replay_buffer(self) -> bytes:
|
||||
async with self._replay_lock:
|
||||
return b"".join(self._replay_buffer)
|
||||
|
||||
async def get_screen_lines(self) -> list[str]:
|
||||
async with self._screen_lock:
|
||||
return [line.rstrip() for line in self._screen.display]
|
||||
|
||||
async def get_screen_snapshot(self) -> tuple[int, int, list, bool]:
|
||||
async with self._screen_lock:
|
||||
width = self._screen.columns
|
||||
height = self._screen.lines
|
||||
has_changes = self._change_counter > self._last_snapshot_counter
|
||||
self._last_snapshot_counter = self._change_counter
|
||||
snapshot = [
|
||||
[self._screen.buffer[row][col] for col in range(width)] for row in range(height)
|
||||
]
|
||||
|
||||
buffer = []
|
||||
for row_data in snapshot:
|
||||
row_chars = []
|
||||
for char in row_data:
|
||||
row_chars.append(
|
||||
{
|
||||
"data": char.data if char.data else " ",
|
||||
"fg": char.fg,
|
||||
"bg": char.bg,
|
||||
"bold": char.bold,
|
||||
"italics": char.italics,
|
||||
"underscore": char.underscore,
|
||||
"reverse": char.reverse,
|
||||
}
|
||||
)
|
||||
buffer.append(row_chars)
|
||||
return (width, height, buffer, has_changes)
|
||||
|
||||
def update_connector(self, connector: SessionConnector) -> None:
|
||||
self._connector = connector
|
||||
|
||||
async def start(self, connector: SessionConnector) -> asyncio.Task:
|
||||
self._connector = connector
|
||||
if self.master_fd is None:
|
||||
raise RuntimeError("Docker exec session not opened")
|
||||
if self._task is not None:
|
||||
return self._task
|
||||
self._task = asyncio.create_task(self.run())
|
||||
return self._task
|
||||
|
||||
async def run(self) -> None:
|
||||
assert self.master_fd is not None
|
||||
queue = self.poller.add_file(self.master_fd)
|
||||
try:
|
||||
await self._drain_pending_output()
|
||||
while True:
|
||||
data = await queue.get()
|
||||
if not data:
|
||||
break
|
||||
await self._add_to_replay_buffer(data)
|
||||
await self._update_screen(data)
|
||||
if self._connector:
|
||||
await self._connector.on_data(data)
|
||||
except OSError:
|
||||
log.exception("error in docker exec session run")
|
||||
finally:
|
||||
if self._connector:
|
||||
await self._connector.on_close()
|
||||
if self.master_fd is not None:
|
||||
fd = self.master_fd
|
||||
self.master_fd = None
|
||||
self.poller.remove_file(fd)
|
||||
if self._sock is not None:
|
||||
self._sock.close()
|
||||
self._sock = None
|
||||
|
||||
async def send_bytes(self, data: bytes) -> bool:
|
||||
fd = self.master_fd
|
||||
if fd is None:
|
||||
return False
|
||||
try:
|
||||
await self.poller.write(fd, data)
|
||||
except (KeyError, OSError):
|
||||
return False
|
||||
return True
|
||||
|
||||
async def send_meta(self, data: Meta) -> bool:
|
||||
return True
|
||||
|
||||
async def close(self) -> None:
|
||||
if self._task is not None and not self._task.done():
|
||||
self._task.cancel()
|
||||
if self._sock is not None:
|
||||
self._sock.close()
|
||||
self._sock = None
|
||||
|
||||
async def wait(self, timeout: float = 2.0) -> None:
|
||||
if self._task is not None:
|
||||
with contextlib.suppress(asyncio.CancelledError, TimeoutError):
|
||||
await asyncio.wait_for(asyncio.shield(self._task), timeout=timeout)
|
||||
|
||||
def is_running(self) -> bool:
|
||||
return not (self.master_fd is None or self._task is None)
|
||||
@@ -24,12 +24,20 @@ LABEL_NAME = "webterm-command"
|
||||
THEME_LABEL = "webterm-theme"
|
||||
AUTO_COMMAND_ENV = "WEBTERM_DOCKER_AUTO_COMMAND"
|
||||
DEFAULT_COMMAND = "/bin/bash"
|
||||
AUTO_COMMAND_SENTINEL = "__docker_exec__"
|
||||
|
||||
|
||||
def _get_auto_command() -> str:
|
||||
return os.environ.get(AUTO_COMMAND_ENV, DEFAULT_COMMAND)
|
||||
|
||||
|
||||
def _is_auto_label(value: str | None) -> bool:
|
||||
if value is None:
|
||||
return True
|
||||
stripped = value.strip()
|
||||
return stripped == "" or stripped.lower() == "auto"
|
||||
|
||||
|
||||
class DockerWatcher:
|
||||
"""Watch Docker events and manage terminal sessions dynamically."""
|
||||
|
||||
@@ -123,11 +131,10 @@ class DockerWatcher:
|
||||
If label is 'auto', returns default exec command.
|
||||
"""
|
||||
labels = container.get("Labels", {})
|
||||
label_value = labels.get(LABEL_NAME, "auto")
|
||||
label_value = labels.get(LABEL_NAME)
|
||||
|
||||
if label_value.lower() == "auto":
|
||||
container_name = self._get_container_name(container)
|
||||
return f"docker exec -it {container_name} {_get_auto_command()}"
|
||||
if _is_auto_label(label_value):
|
||||
return AUTO_COMMAND_SENTINEL
|
||||
return label_value
|
||||
|
||||
def _get_container_theme(self, container: dict) -> str | None:
|
||||
|
||||
@@ -2,11 +2,14 @@ from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import shlex
|
||||
import sys
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from . import config, constants
|
||||
from ._two_way_dict import TwoWayDict
|
||||
from .docker_exec_session import DockerExecSession, DockerExecSpec
|
||||
from .docker_watcher import AUTO_COMMAND_SENTINEL, _get_auto_command
|
||||
from .identity import generate
|
||||
|
||||
if TYPE_CHECKING:
|
||||
@@ -133,12 +136,18 @@ class SessionManager:
|
||||
if constants.WINDOWS:
|
||||
log.warning("Sorry, webterm does not currently support terminals on Windows")
|
||||
return None
|
||||
|
||||
session_process = TerminalSession(
|
||||
self.poller,
|
||||
session_id,
|
||||
app.command,
|
||||
)
|
||||
if app.command == AUTO_COMMAND_SENTINEL:
|
||||
exec_spec = DockerExecSpec(
|
||||
container=app.name,
|
||||
command=shlex.split(_get_auto_command()),
|
||||
)
|
||||
session_process = DockerExecSession(self.poller, session_id, exec_spec)
|
||||
else:
|
||||
session_process = TerminalSession(
|
||||
self.poller,
|
||||
session_id,
|
||||
app.command,
|
||||
)
|
||||
log.info("Created terminal session %s", session_id)
|
||||
|
||||
# Open the session BEFORE registering it, so it's fully initialized
|
||||
|
||||
@@ -6,7 +6,12 @@ from unittest.mock import AsyncMock, MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from webterm.docker_watcher import LABEL_NAME, THEME_LABEL, DockerWatcher, _get_auto_command
|
||||
from webterm.docker_watcher import (
|
||||
AUTO_COMMAND_SENTINEL,
|
||||
LABEL_NAME,
|
||||
THEME_LABEL,
|
||||
DockerWatcher,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@@ -56,8 +61,7 @@ class TestDockerWatcher:
|
||||
def test_get_container_command_auto(self, docker_watcher):
|
||||
"""Test command generation when label is 'auto'."""
|
||||
container = {"Names": ["/my-container"], "Labels": {LABEL_NAME: "auto"}}
|
||||
expected = f"docker exec -it my-container {_get_auto_command()}"
|
||||
assert docker_watcher._get_container_command(container) == expected
|
||||
assert docker_watcher._get_container_command(container) == AUTO_COMMAND_SENTINEL
|
||||
|
||||
def test_get_container_command_custom(self, docker_watcher):
|
||||
"""Test command when label has custom value."""
|
||||
@@ -93,7 +97,7 @@ class TestDockerWatcher:
|
||||
session_manager.add_app.assert_called_once()
|
||||
call_args = session_manager.add_app.call_args
|
||||
assert call_args[0][0] == "test-container" # name
|
||||
assert "docker exec -it test-container" in call_args[0][1] # command
|
||||
assert call_args[0][1] == AUTO_COMMAND_SENTINEL # command
|
||||
assert call_args[0][2] == "test-container" # slug
|
||||
assert call_args[1]["terminal"] is True
|
||||
assert call_args[1]["theme"] == "monokai"
|
||||
@@ -235,8 +239,9 @@ class TestDockerWatcherIntegration:
|
||||
("labels", "expected"),
|
||||
[
|
||||
({"webterm-command": "echo hi"}, "echo hi"),
|
||||
({"webterm-command": "auto"}, f"docker exec -it my-container {_get_auto_command()}"),
|
||||
({"other": "value"}, f"docker exec -it my-container {_get_auto_command()}"),
|
||||
({"webterm-command": "auto"}, AUTO_COMMAND_SENTINEL),
|
||||
({"webterm-command": ""}, AUTO_COMMAND_SENTINEL),
|
||||
({"other": "value"}, AUTO_COMMAND_SENTINEL),
|
||||
],
|
||||
)
|
||||
def test_get_container_command_variants(docker_watcher, labels, expected):
|
||||
@@ -247,7 +252,7 @@ def test_get_container_command_variants(docker_watcher, labels, expected):
|
||||
def test_auto_command_env_override(monkeypatch, docker_watcher):
|
||||
monkeypatch.setenv("WEBTERM_DOCKER_AUTO_COMMAND", "/bin/sh")
|
||||
container = {"Names": ["/my-container"], "Labels": {LABEL_NAME: "auto"}}
|
||||
assert docker_watcher._get_container_command(container) == "docker exec -it my-container /bin/sh"
|
||||
assert docker_watcher._get_container_command(container) == AUTO_COMMAND_SENTINEL
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
||||
@@ -6,6 +6,7 @@ from unittest.mock import AsyncMock, MagicMock, patch
|
||||
import pytest
|
||||
|
||||
from webterm.config import App
|
||||
from webterm.docker_watcher import AUTO_COMMAND_SENTINEL
|
||||
from webterm.session_manager import SessionManager
|
||||
from webterm.types import RouteKey, SessionID
|
||||
|
||||
@@ -190,6 +191,30 @@ class TestSessionManager:
|
||||
assert SessionID("test-session") in manager.sessions
|
||||
assert RouteKey("test-route") in manager.routes
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.skipif(platform.system() == "Windows", reason="Terminal not supported on Windows")
|
||||
async def test_new_docker_exec_session(self, mock_poller, mock_path):
|
||||
from webterm.docker_exec_session import DockerExecSession
|
||||
|
||||
app = App(
|
||||
name="my-container",
|
||||
slug="my-container",
|
||||
path="./",
|
||||
command=AUTO_COMMAND_SENTINEL,
|
||||
terminal=True,
|
||||
)
|
||||
manager = SessionManager(mock_poller, mock_path, [app])
|
||||
|
||||
with patch.object(DockerExecSession, "open", new_callable=AsyncMock):
|
||||
result = await manager.new_session(
|
||||
"my-container",
|
||||
SessionID("test-session"),
|
||||
RouteKey("test-route"),
|
||||
)
|
||||
|
||||
assert result is not None
|
||||
assert isinstance(result, DockerExecSession)
|
||||
|
||||
class TestSessionManagerRoutes:
|
||||
"""Tests for SessionManager route handling."""
|
||||
|
||||
|
||||
Reference in New Issue
Block a user