Adjust sparkline and screenshot timing

Sparklines:
- Poll interval: 2s -> 10s
- History size: 30 -> 180 readings
- Now shows 30 minutes of CPU history

Screenshots:
- Dashboard refresh interval: 15s -> 5s
- Combined with dirty tracking, updates on activity with 5s cap
This commit is contained in:
GitHub Copilot
2026-01-24 11:41:01 +00:00
parent 34aee378d9
commit 1ba4ce2a34
3 changed files with 87 additions and 28 deletions
+78 -22
View File
@@ -16,8 +16,8 @@ from pathlib import Path
log = logging.getLogger("textual-webterm") log = logging.getLogger("textual-webterm")
DOCKER_SOCKET = "/var/run/docker.sock" DOCKER_SOCKET = "/var/run/docker.sock"
STATS_HISTORY_SIZE = 30 # Number of CPU readings to keep STATS_HISTORY_SIZE = 180 # Number of CPU readings to keep (30 min at 10s interval)
POLL_INTERVAL = 2.0 # Seconds between polls POLL_INTERVAL = 10.0 # Seconds between polls
class DockerStatsCollector: class DockerStatsCollector:
@@ -43,7 +43,7 @@ class DockerStatsCollector:
return [] return []
return list(self._cpu_history[container_name]) return list(self._cpu_history[container_name])
async def _make_request(self, path: str) -> dict | None: async def _make_request(self, path: str) -> dict | list | None:
"""Make HTTP request to Docker socket.""" """Make HTTP request to Docker socket."""
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
@@ -81,14 +81,14 @@ class DockerStatsCollector:
else: else:
body = response_str body = response_str
# Handle chunked encoding - find the JSON object # Handle chunked encoding - find the JSON object/array
if body.startswith("{"): if body.startswith("{") or body.startswith("["):
json_str = body json_str = body
else: else:
# Skip chunk size line in chunked encoding # Skip chunk size line in chunked encoding
lines = body.split("\r\n") lines = body.split("\r\n")
for line in lines: for line in lines:
if line.startswith("{"): if line.startswith("{") or line.startswith("["):
json_str = line json_str = line
break break
else: else:
@@ -98,6 +98,44 @@ class DockerStatsCollector:
except (json.JSONDecodeError, ValueError): except (json.JSONDecodeError, ValueError):
return None return None
async def _discover_containers(self, service_names: list[str]) -> dict[str, str]:
"""Map service names to container IDs by querying Docker.
Returns:
Dict mapping service_name -> container_id
"""
# List all containers
containers = await self._make_request("/containers/json")
if not isinstance(containers, list):
return {}
mapping: dict[str, str] = {}
for container in containers:
if not isinstance(container, dict):
continue
container_id = container.get("Id", "")[:12] # Short ID
names = container.get("Names", [])
labels = container.get("Labels", {})
# Check compose service label
service = labels.get("com.docker.compose.service", "")
if service in service_names:
mapping[service] = container_id
continue
# Fall back to container name matching
for name in names:
# Docker names start with /
clean_name = name.lstrip("/")
# Check if service name is part of container name
for svc in service_names:
if svc in clean_name or clean_name == svc:
mapping[svc] = container_id
break
return mapping
def _calculate_cpu_percent( def _calculate_cpu_percent(
self, container: str, cpu_stats: dict, precpu_stats: dict self, container: str, cpu_stats: dict, precpu_stats: dict
) -> float | None: ) -> float | None:
@@ -139,38 +177,56 @@ class DockerStatsCollector:
except (KeyError, TypeError, ZeroDivisionError): except (KeyError, TypeError, ZeroDivisionError):
return None return None
async def _poll_container(self, container_name: str) -> None: async def _poll_container(self, service_name: str, container_id: str) -> None:
"""Poll stats for a single container.""" """Poll stats for a single container."""
# Docker API uses container name without leading slash path = f"/containers/{container_id}/stats?stream=false"
path = f"/containers/{container_name}/stats?stream=false"
stats = await self._make_request(path) stats = await self._make_request(path)
if stats is None: if not isinstance(stats, dict):
return return
cpu_stats = stats.get("cpu_stats", {}) cpu_stats = stats.get("cpu_stats", {})
precpu_stats = stats.get("precpu_stats", {}) precpu_stats = stats.get("precpu_stats", {})
cpu_percent = self._calculate_cpu_percent(container_name, cpu_stats, precpu_stats) cpu_percent = self._calculate_cpu_percent(service_name, cpu_stats, precpu_stats)
if cpu_percent is not None: if cpu_percent is not None:
if container_name not in self._cpu_history: if service_name not in self._cpu_history:
self._cpu_history[container_name] = deque(maxlen=STATS_HISTORY_SIZE) self._cpu_history[service_name] = deque(maxlen=STATS_HISTORY_SIZE)
self._cpu_history[container_name].append(cpu_percent) self._cpu_history[service_name].append(cpu_percent)
async def _poll_loop(self, containers: list[str]) -> None: async def _poll_loop(self, service_names: list[str]) -> None:
"""Background polling loop.""" """Background polling loop."""
# Discover container IDs on first run and periodically refresh
service_to_container: dict[str, str] = {}
refresh_counter = 0
while self._running: while self._running:
for container in containers: # Refresh container mapping every 30 iterations (~60 seconds)
if refresh_counter % 30 == 0:
service_to_container = await self._discover_containers(service_names)
if service_to_container:
log.debug(
"Discovered containers: %s",
", ".join(f"{k}={v}" for k, v in service_to_container.items()),
)
refresh_counter += 1
for service_name in service_names:
if not self._running: if not self._running:
break break
container_id = service_to_container.get(service_name)
if not container_id:
continue
try: try:
await self._poll_container(container) await self._poll_container(service_name, container_id)
except Exception: except Exception:
log.debug("Error polling stats for %s", container) log.debug("Error polling stats for %s", service_name)
await asyncio.sleep(POLL_INTERVAL) await asyncio.sleep(POLL_INTERVAL)
def start(self, containers: list[str]) -> None: def start(self, service_names: list[str]) -> None:
"""Start collecting stats for given containers.""" """Start collecting stats for given service names."""
if not self.available: if not self.available:
log.debug("Docker socket not available at %s", self._socket_path) log.debug("Docker socket not available at %s", self._socket_path)
return return
@@ -179,8 +235,8 @@ class DockerStatsCollector:
return return
self._running = True self._running = True
self._task = asyncio.create_task(self._poll_loop(containers)) self._task = asyncio.create_task(self._poll_loop(service_names))
log.info("Started Docker stats collection for %d containers", len(containers)) log.info("Started Docker stats collection for %d services", len(service_names))
async def stop(self) -> None: async def stop(self) -> None:
"""Stop collecting stats.""" """Stop collecting stats."""
+1 -1
View File
@@ -788,7 +788,7 @@ class LocalServer:
function startRefresh() {{ function startRefresh() {{
if (refreshTimer !== null) return; if (refreshTimer !== null) return;
refresh(); refresh();
refreshTimer = setInterval(refresh, 15000); refreshTimer = setInterval(refresh, 5000);
}} }}
function stopRefresh() {{ function stopRefresh() {{
if (refreshTimer === null) return; if (refreshTimer === null) return;
+8 -5
View File
@@ -1,11 +1,13 @@
"""Tests for docker_stats module.""" """Tests for docker_stats module."""
from unittest.mock import MagicMock
import pytest import pytest
from unittest.mock import MagicMock, patch, AsyncMock
from textual_webterm.docker_stats import ( from textual_webterm.docker_stats import (
STATS_HISTORY_SIZE,
DockerStatsCollector, DockerStatsCollector,
render_sparkline_svg, render_sparkline_svg,
STATS_HISTORY_SIZE,
) )
@@ -165,8 +167,9 @@ class TestLocalServerSparklineEndpoint:
async def test_sparkline_endpoint_missing_container(self): async def test_sparkline_endpoint_missing_container(self):
"""Missing container param returns 400.""" """Missing container param returns 400."""
from aiohttp.web import HTTPBadRequest from aiohttp.web import HTTPBadRequest
from textual_webterm.local_server import LocalServer
from textual_webterm.config import Config from textual_webterm.config import Config
from textual_webterm.local_server import LocalServer
server = LocalServer("./", Config(), compose_mode=True) server = LocalServer("./", Config(), compose_mode=True)
@@ -179,8 +182,8 @@ class TestLocalServerSparklineEndpoint:
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_sparkline_endpoint_returns_svg(self): async def test_sparkline_endpoint_returns_svg(self):
"""Sparkline endpoint returns SVG.""" """Sparkline endpoint returns SVG."""
from textual_webterm.local_server import LocalServer
from textual_webterm.config import Config from textual_webterm.config import Config
from textual_webterm.local_server import LocalServer
server = LocalServer("./", Config(), compose_mode=True) server = LocalServer("./", Config(), compose_mode=True)
@@ -194,8 +197,8 @@ class TestLocalServerSparklineEndpoint:
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_sparkline_with_stats_collector(self): async def test_sparkline_with_stats_collector(self):
"""Sparkline uses stats collector data when available.""" """Sparkline uses stats collector data when available."""
from textual_webterm.local_server import LocalServer
from textual_webterm.config import Config from textual_webterm.config import Config
from textual_webterm.local_server import LocalServer
server = LocalServer("./", Config(), compose_mode=True) server = LocalServer("./", Config(), compose_mode=True)
server._docker_stats = MagicMock() server._docker_stats = MagicMock()