Use SSE for real-time screenshot updates
- New /events SSE endpoint pushes activity notifications to browsers - Dashboard subscribes to SSE stream instead of polling - Screenshots refresh instantly when terminal activity occurs - Sparklines still poll every 30s (appropriate for 30min history) - SSE includes keepalive every 30s and auto-reconnect on error - Removes inefficient 5s polling; updates only on actual changes
This commit is contained in:
@@ -148,6 +148,14 @@ def _rewrite_svg_fonts(svg: str) -> str:
|
|||||||
class LocalServer:
|
class LocalServer:
|
||||||
def mark_route_activity(self, route_key: str) -> None:
|
def mark_route_activity(self, route_key: str) -> None:
|
||||||
self._route_last_activity[route_key] = asyncio.get_event_loop().time()
|
self._route_last_activity[route_key] = asyncio.get_event_loop().time()
|
||||||
|
# Notify SSE subscribers of activity
|
||||||
|
self._notify_activity(route_key)
|
||||||
|
|
||||||
|
def _notify_activity(self, route_key: str) -> None:
|
||||||
|
"""Notify SSE subscribers that a route has activity."""
|
||||||
|
for queue in self._sse_subscribers:
|
||||||
|
with contextlib.suppress(asyncio.QueueFull):
|
||||||
|
queue.put_nowait(route_key)
|
||||||
|
|
||||||
def _get_cached_screenshot_response(
|
def _get_cached_screenshot_response(
|
||||||
self, request: web.Request, route_key: str
|
self, request: web.Request, route_key: str
|
||||||
@@ -215,6 +223,9 @@ class LocalServer:
|
|||||||
self._screenshot_locks: dict[str, asyncio.Lock] = {}
|
self._screenshot_locks: dict[str, asyncio.Lock] = {}
|
||||||
self._route_last_activity: dict[str, float] = {}
|
self._route_last_activity: dict[str, float] = {}
|
||||||
|
|
||||||
|
# SSE subscribers for activity notifications
|
||||||
|
self._sse_subscribers: list[asyncio.Queue[str]] = []
|
||||||
|
|
||||||
# Docker stats collector (only used in compose mode)
|
# Docker stats collector (only used in compose mode)
|
||||||
self._docker_stats: DockerStatsCollector | None = None
|
self._docker_stats: DockerStatsCollector | None = None
|
||||||
|
|
||||||
@@ -298,6 +309,7 @@ class LocalServer:
|
|||||||
web.get("/ws/{route_key}", self._handle_websocket),
|
web.get("/ws/{route_key}", self._handle_websocket),
|
||||||
web.get("/screenshot.svg", self._handle_screenshot),
|
web.get("/screenshot.svg", self._handle_screenshot),
|
||||||
web.get("/cpu-sparkline.svg", self._handle_cpu_sparkline),
|
web.get("/cpu-sparkline.svg", self._handle_cpu_sparkline),
|
||||||
|
web.get("/events", self._handle_sse),
|
||||||
web.get("/health", self._handle_health_check),
|
web.get("/health", self._handle_health_check),
|
||||||
web.get("/", self._handle_root),
|
web.get("/", self._handle_root),
|
||||||
]
|
]
|
||||||
@@ -656,6 +668,40 @@ class LocalServer:
|
|||||||
headers = {"Cache-Control": "no-cache, max-age=0"}
|
headers = {"Cache-Control": "no-cache, max-age=0"}
|
||||||
return web.Response(text=svg, content_type="image/svg+xml", headers=headers)
|
return web.Response(text=svg, content_type="image/svg+xml", headers=headers)
|
||||||
|
|
||||||
|
async def _handle_sse(self, request: web.Request) -> web.StreamResponse:
|
||||||
|
"""Server-Sent Events endpoint for activity notifications."""
|
||||||
|
response = web.StreamResponse(
|
||||||
|
status=200,
|
||||||
|
reason="OK",
|
||||||
|
headers={
|
||||||
|
"Content-Type": "text/event-stream",
|
||||||
|
"Cache-Control": "no-cache",
|
||||||
|
"Connection": "keep-alive",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
await response.prepare(request)
|
||||||
|
|
||||||
|
# Create queue for this subscriber
|
||||||
|
queue: asyncio.Queue[str] = asyncio.Queue(maxsize=100)
|
||||||
|
self._sse_subscribers.append(queue)
|
||||||
|
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
# Wait for activity with timeout for keepalive
|
||||||
|
route_key = await asyncio.wait_for(queue.get(), timeout=30.0)
|
||||||
|
# Send activity event
|
||||||
|
await response.write(f"event: activity\ndata: {route_key}\n\n".encode())
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
# Send keepalive comment
|
||||||
|
await response.write(b": keepalive\n\n")
|
||||||
|
except (ConnectionResetError, ConnectionAbortedError):
|
||||||
|
break
|
||||||
|
finally:
|
||||||
|
self._sse_subscribers.remove(queue)
|
||||||
|
|
||||||
|
return response
|
||||||
|
|
||||||
async def _handle_health_check(self, _request: web.Request) -> web.Response:
|
async def _handle_health_check(self, _request: web.Request) -> web.Response:
|
||||||
return web.Response(text="Local server is running")
|
return web.Response(text="Local server is running")
|
||||||
|
|
||||||
@@ -772,36 +818,80 @@ class LocalServer:
|
|||||||
}}
|
}}
|
||||||
const grid = document.getElementById('grid');
|
const grid = document.getElementById('grid');
|
||||||
const cards = tiles.map(makeTile);
|
const cards = tiles.map(makeTile);
|
||||||
cards.forEach(c => grid.appendChild(c));
|
const cardsBySlug = {{}};
|
||||||
async function refresh() {{
|
cards.forEach((c, i) => {{
|
||||||
for (const card of cards) {{
|
grid.appendChild(c);
|
||||||
const tile = tiles[cards.indexOf(card)];
|
cardsBySlug[tiles[i].slug] = c;
|
||||||
const url = `/screenshot.svg?route_key=${{encodeURIComponent(tile.slug)}}`;
|
}});
|
||||||
card.img.src = url;
|
|
||||||
if (composeMode && card.sparkline) {{
|
// Refresh a single tile's screenshot
|
||||||
|
function refreshTile(slug) {{
|
||||||
|
const card = cardsBySlug[slug];
|
||||||
|
if (!card) return;
|
||||||
|
card.img.src = `/screenshot.svg?route_key=${{encodeURIComponent(slug)}}&_t=${{Date.now()}}`;
|
||||||
|
}}
|
||||||
|
|
||||||
|
// Refresh all screenshots (initial load)
|
||||||
|
function refreshAll() {{
|
||||||
|
for (const tile of tiles) {{
|
||||||
|
const card = cardsBySlug[tile.slug];
|
||||||
|
card.img.src = `/screenshot.svg?route_key=${{encodeURIComponent(tile.slug)}}`;
|
||||||
|
}}
|
||||||
|
}}
|
||||||
|
|
||||||
|
// Refresh sparklines periodically (CPU stats don't need SSE)
|
||||||
|
function refreshSparklines() {{
|
||||||
|
if (!composeMode) return;
|
||||||
|
for (const tile of tiles) {{
|
||||||
|
const card = cardsBySlug[tile.slug];
|
||||||
|
if (card.sparkline) {{
|
||||||
card.sparkline.src = `/cpu-sparkline.svg?container=${{encodeURIComponent(tile.slug)}}&width=80&height=16&_t=${{Date.now()}}`;
|
card.sparkline.src = `/cpu-sparkline.svg?container=${{encodeURIComponent(tile.slug)}}&width=80&height=16&_t=${{Date.now()}}`;
|
||||||
}}
|
}}
|
||||||
}}
|
}}
|
||||||
}}
|
}}
|
||||||
|
|
||||||
let refreshTimer = null;
|
// SSE connection for real-time screenshot updates
|
||||||
function startRefresh() {{
|
let eventSource = null;
|
||||||
if (refreshTimer !== null) return;
|
let sparklineTimer = null;
|
||||||
refresh();
|
|
||||||
refreshTimer = setInterval(refresh, 5000);
|
function startSSE() {{
|
||||||
|
if (eventSource) return;
|
||||||
|
eventSource = new EventSource('/events');
|
||||||
|
eventSource.addEventListener('activity', (e) => {{
|
||||||
|
refreshTile(e.data);
|
||||||
|
}});
|
||||||
|
eventSource.onerror = () => {{
|
||||||
|
// Reconnect on error
|
||||||
|
eventSource.close();
|
||||||
|
eventSource = null;
|
||||||
|
setTimeout(startSSE, 2000);
|
||||||
|
}};
|
||||||
|
// Initial load of all screenshots
|
||||||
|
refreshAll();
|
||||||
|
// Start sparkline polling (every 30s since it's 30min history)
|
||||||
|
if (composeMode && !sparklineTimer) {{
|
||||||
|
refreshSparklines();
|
||||||
|
sparklineTimer = setInterval(refreshSparklines, 30000);
|
||||||
|
}}
|
||||||
}}
|
}}
|
||||||
function stopRefresh() {{
|
|
||||||
if (refreshTimer === null) return;
|
function stopSSE() {{
|
||||||
clearInterval(refreshTimer);
|
if (eventSource) {{
|
||||||
refreshTimer = null;
|
eventSource.close();
|
||||||
|
eventSource = null;
|
||||||
|
}}
|
||||||
|
if (sparklineTimer) {{
|
||||||
|
clearInterval(sparklineTimer);
|
||||||
|
sparklineTimer = null;
|
||||||
|
}}
|
||||||
}}
|
}}
|
||||||
|
|
||||||
document.addEventListener('visibilitychange', () => {{
|
document.addEventListener('visibilitychange', () => {{
|
||||||
if (document.hidden) stopRefresh();
|
if (document.hidden) stopSSE();
|
||||||
else startRefresh();
|
else startSSE();
|
||||||
}});
|
}});
|
||||||
|
|
||||||
if (!document.hidden) startRefresh();
|
if (!document.hidden) startSSE();
|
||||||
</script>
|
</script>
|
||||||
</body>
|
</body>
|
||||||
</html>"""
|
</html>"""
|
||||||
|
|||||||
@@ -769,3 +769,42 @@ class TestLocalServerMoreCoverage:
|
|||||||
assert resp.content_type == "image/svg+xml"
|
assert resp.content_type == "image/svg+xml"
|
||||||
assert "<svg" in resp.text
|
assert "<svg" in resp.text
|
||||||
session.get_screen_state.assert_awaited_once()
|
session.get_screen_state.assert_awaited_once()
|
||||||
|
|
||||||
|
def test_notify_activity_pushes_to_subscribers(self, server_with_no_apps):
|
||||||
|
"""Test that activity notifications are pushed to SSE subscribers."""
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
queue: asyncio.Queue[str] = asyncio.Queue(maxsize=10)
|
||||||
|
server_with_no_apps._sse_subscribers.append(queue)
|
||||||
|
|
||||||
|
server_with_no_apps._notify_activity("test-route")
|
||||||
|
|
||||||
|
assert not queue.empty()
|
||||||
|
assert queue.get_nowait() == "test-route"
|
||||||
|
|
||||||
|
def test_notify_activity_handles_full_queue(self, server_with_no_apps):
|
||||||
|
"""Test that full queues don't cause errors."""
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
queue: asyncio.Queue[str] = asyncio.Queue(maxsize=1)
|
||||||
|
queue.put_nowait("existing")
|
||||||
|
server_with_no_apps._sse_subscribers.append(queue)
|
||||||
|
|
||||||
|
# Should not raise even though queue is full
|
||||||
|
server_with_no_apps._notify_activity("test-route")
|
||||||
|
|
||||||
|
# Only the original item should be there
|
||||||
|
assert queue.get_nowait() == "existing"
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_mark_route_activity_triggers_notification(self, server_with_no_apps):
|
||||||
|
"""Test that mark_route_activity triggers SSE notification."""
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
queue: asyncio.Queue[str] = asyncio.Queue(maxsize=10)
|
||||||
|
server_with_no_apps._sse_subscribers.append(queue)
|
||||||
|
|
||||||
|
server_with_no_apps.mark_route_activity("my-route")
|
||||||
|
|
||||||
|
assert not queue.empty()
|
||||||
|
assert queue.get_nowait() == "my-route"
|
||||||
|
|||||||
Reference in New Issue
Block a user