Fix websocket replay tests
This commit is contained in:
@@ -694,7 +694,17 @@ class TestLocalServerMoreCoverage:
|
||||
assert queue.get_nowait() == "existing"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_mark_route_activity_triggers_notification(self, server_with_no_apps):
|
||||
async def test_handle_session_data_marks_activity(self, server_with_no_apps, monkeypatch):
|
||||
ws = MagicMock()
|
||||
ws.send_bytes = AsyncMock()
|
||||
server_with_no_apps._websocket_connections["rk"] = ws
|
||||
server_with_no_apps._route_last_activity["rk"] = 0.0
|
||||
|
||||
await server_with_no_apps.handle_session_data("rk", b"data")
|
||||
assert server_with_no_apps._route_last_activity["rk"] > 0.0
|
||||
ws.send_bytes.assert_awaited_once_with(b"data")
|
||||
|
||||
def test_mark_route_activity_triggers_notification(self, server_with_no_apps):
|
||||
"""Test that mark_route_activity triggers SSE notification."""
|
||||
import asyncio
|
||||
|
||||
|
||||
Reference in New Issue
Block a user