"""Docker container CPU stats via Unix socket.
Reads container stats from Docker socket using only asyncio and stdlib.
"""
from __future__ import annotations
import asyncio
import contextlib
import json
import logging
import os
import socket
from collections import deque
from pathlib import Path
log = logging.getLogger("textual-webterm")
DEFAULT_DOCKER_SOCKET = "/var/run/docker.sock"
def get_docker_socket_path() -> str:
"""Get Docker socket path from DOCKER_HOST env var or default.
Supports unix:// scheme or plain path in DOCKER_HOST.
"""
docker_host = os.environ.get("DOCKER_HOST", "")
if docker_host:
if docker_host.startswith("unix://"):
return docker_host[7:] # Strip unix:// prefix
if docker_host.startswith("/"):
return docker_host
return DEFAULT_DOCKER_SOCKET
STATS_HISTORY_SIZE = 180 # Number of CPU readings to keep (30 min at 10s interval)
POLL_INTERVAL = 10.0 # Seconds between polls
class DockerStatsCollector:
"""Collects CPU stats from Docker containers via the Docker socket."""
def __init__(
self, socket_path: str | None = None, compose_project: str | None = None
) -> None:
self._socket_path = socket_path or get_docker_socket_path()
self._compose_project = compose_project
# container_name -> deque of CPU % values (0-100)
self._cpu_history: dict[str, deque[float]] = {}
self._running = False
self._task: asyncio.Task | None = None
# Track previous CPU values for delta calculation
self._prev_cpu: dict[str, tuple[int, int]] = {}
@property
def available(self) -> bool:
"""Check if Docker socket is available and accessible."""
path = Path(self._socket_path)
if not path.exists():
return False
# Also check we can actually connect
try:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.settimeout(2.0)
sock.connect(self._socket_path)
sock.close()
return True
except (OSError, TimeoutError) as e:
log.warning("Docker socket exists but not accessible: %s", e)
return False
def get_cpu_history(self, container_name: str) -> list[float]:
"""Get CPU history for a container."""
if container_name not in self._cpu_history:
return []
return list(self._cpu_history[container_name])
async def _make_request(self, path: str) -> dict | list | None:
"""Make HTTP request to Docker socket."""
loop = asyncio.get_event_loop()
def _sync_request() -> bytes | None:
try:
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.settimeout(10.0) # Increased timeout
sock.connect(self._socket_path)
# Use HTTP/1.0 to avoid chunked encoding
request = f"GET {path} HTTP/1.0\r\nHost: localhost\r\n\r\n"
sock.sendall(request.encode())
# Read response
chunks = []
while True:
chunk = sock.recv(8192)
if not chunk:
break
chunks.append(chunk)
sock.close()
return b"".join(chunks)
except (OSError, TimeoutError) as e:
log.debug("Socket error for %s: %s", path, e)
return None
response = await loop.run_in_executor(None, _sync_request)
if response is None:
log.debug("No response from Docker socket for %s", path)
return None
return self._parse_docker_response(path, response)
def _parse_docker_response(self, path: str, response: bytes) -> dict | list | None:
"""Parse HTTP response from Docker socket."""
try:
response_str = response.decode("utf-8", errors="replace")
# Split headers and body
if "\r\n\r\n" not in response_str:
return None
headers, body = response_str.split("\r\n\r\n", 1)
# Check for error status
first_line = headers.split("\r\n")[0]
if "200" not in first_line and "OK" not in first_line:
return None
body = body.strip()
# With HTTP/1.0, body should be plain JSON
if body.startswith("{") or body.startswith("["):
return json.loads(body)
# Fallback: try to find JSON in body
for line in body.split("\r\n"):
stripped = line.strip()
if stripped.startswith("{") or stripped.startswith("["):
return json.loads(stripped)
return None
except (json.JSONDecodeError, Exception):
return None
async def _discover_containers(self, service_names: list[str]) -> dict[str, str]:
"""Map service names to container IDs by querying Docker.
Returns:
Dict mapping service_name -> container_id
"""
# List all containers
containers = await self._make_request("/containers/json")
if not isinstance(containers, list):
return {}
mapping: dict[str, str] = {}
for container in containers:
if not isinstance(container, dict):
continue
container_id = container.get("Id", "")[:12] # Short ID
names = container.get("Names", [])
labels = container.get("Labels", {})
# Filter by compose project if specified
if self._compose_project:
project = labels.get("com.docker.compose.project", "")
if project != self._compose_project:
continue
# Check compose service label
service = labels.get("com.docker.compose.service", "")
if service in service_names:
mapping[service] = container_id
continue
# Fall back to container name matching
for name in names:
# Docker names start with /
clean_name = name.lstrip("/")
# Check if service name is part of container name
for svc in service_names:
if svc in clean_name or clean_name == svc:
mapping[svc] = container_id
break
if mapping:
log.debug("Discovered %d containers for stats (project=%s)", len(mapping), self._compose_project)
return mapping
def _calculate_cpu_percent(
self, container: str, cpu_stats: dict, precpu_stats: dict
) -> float | None:
"""Calculate CPU percentage from stats.
Formula: (cpu_delta / system_delta) * num_cpus * 100
"""
try:
cpu_usage = cpu_stats.get("cpu_usage", {})
precpu_usage = precpu_stats.get("cpu_usage", {})
cpu_total = cpu_usage.get("total_usage", 0)
precpu_total = precpu_usage.get("total_usage", 0)
system_cpu = cpu_stats.get("system_cpu_usage", 0)
presystem_cpu = precpu_stats.get("system_cpu_usage", 0)
# Use previous values if precpu_stats is empty (first read)
if precpu_total == 0 and container in self._prev_cpu:
precpu_total, presystem_cpu = self._prev_cpu[container]
# Store current values for next calculation
self._prev_cpu[container] = (cpu_total, system_cpu)
cpu_delta = cpu_total - precpu_total
system_delta = system_cpu - presystem_cpu
if system_delta <= 0 or cpu_delta < 0:
return None
# Get number of CPUs
online_cpus = cpu_stats.get("online_cpus")
if online_cpus is None:
percpu = cpu_usage.get("percpu_usage", [])
online_cpus = len(percpu) if percpu else 1
cpu_percent = (cpu_delta / system_delta) * online_cpus * 100.0
return min(cpu_percent, 100.0 * online_cpus) # Cap at max possible
except (KeyError, TypeError, ZeroDivisionError):
return None
async def _poll_container(self, service_name: str, container_id: str) -> None:
"""Poll stats for a single container."""
path = f"/containers/{container_id}/stats?stream=false"
stats = await self._make_request(path)
if not isinstance(stats, dict):
return
cpu_stats = stats.get("cpu_stats", {})
precpu_stats = stats.get("precpu_stats", {})
cpu_percent = self._calculate_cpu_percent(service_name, cpu_stats, precpu_stats)
if cpu_percent is not None:
if service_name not in self._cpu_history:
self._cpu_history[service_name] = deque(maxlen=STATS_HISTORY_SIZE)
self._cpu_history[service_name].append(cpu_percent)
async def _poll_loop(self, service_names: list[str]) -> None:
"""Background polling loop."""
# Discover container IDs on first run and periodically refresh
service_to_container: dict[str, str] = {}
refresh_counter = 0
warned_no_containers = False
while self._running:
# Refresh container mapping every 30 iterations (~5 minutes at 10s interval)
if refresh_counter % 30 == 0:
service_to_container = await self._discover_containers(service_names)
if not service_to_container and not warned_no_containers:
log.warning(
"No Docker containers found for CPU stats. "
"Ensure Docker socket is mounted (-v /var/run/docker.sock:/var/run/docker.sock)"
)
warned_no_containers = True
refresh_counter += 1
for service_name in service_names:
if not self._running:
break
container_id = service_to_container.get(service_name)
if not container_id:
continue
try:
await self._poll_container(service_name, container_id)
except Exception as e:
log.debug("Error polling stats for %s: %s", service_name, e)
await asyncio.sleep(POLL_INTERVAL)
def start(self, service_names: list[str]) -> None:
"""Start collecting stats for given service names."""
if not self.available:
log.debug("Docker socket not available at %s", self._socket_path)
return
if self._running:
return
self._running = True
self._task = asyncio.create_task(self._poll_loop(service_names))
log.info("Started Docker stats collection for %d services", len(service_names))
async def stop(self) -> None:
"""Stop collecting stats."""
self._running = False
if self._task:
self._task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self._task
self._task = None
def render_sparkline_svg(
values: list[float],
width: int = 100,
height: int = 20,
stroke_color: str = "#4ade80",
fill_color: str = "rgba(74, 222, 128, 0.2)",
) -> str:
"""Render a list of values as an SVG sparkline.
Args:
values: List of values to plot (0-100 range expected for CPU %)
width: SVG width in pixels
height: SVG height in pixels
stroke_color: Line color
fill_color: Fill color under the line
Returns:
SVG string
"""
if not values:
# Empty placeholder
return f''
# Prepend zero to start sparkline from baseline
values = [0.0, *values]
# Normalize values to 0-1 range
max_val = max(values) if max(values) > 0 else 1
normalized = [v / max_val for v in values]
# Calculate points
points = []
x_step = width / max(len(values) - 1, 1)
for i, v in enumerate(normalized):
x = i * x_step
y = height - (v * (height - 2)) - 1 # Leave 1px margin
points.append(f"{x:.1f},{y:.1f}")
path_line = " ".join(points)
# Create filled area path (line + close to bottom)
fill_points = [*points, f"{width},{height}", f"0,{height}"]
path_fill = " ".join(fill_points)
svg = f''''''
return svg