Fix stdin stalls and test warnings

This commit is contained in:
GitHub Copilot
2026-02-11 09:21:29 +00:00
parent 3eeba0c1ec
commit 57a93a7cc6
5 changed files with 186 additions and 19 deletions
+33
View File
@@ -136,3 +136,36 @@ class TestPoller:
# Queues should have None
assert q1.get_nowait() is None
assert q2.get_nowait() is None
@pytest.mark.asyncio
async def test_write_with_timeout_returns_true_on_success(self):
"""write_with_timeout returns True when write completes."""
poller = Poller()
poller._loop = asyncio.get_event_loop()
with patch.object(poller._selector, "register"):
poller.add_file(42)
async def instant_write(fd, data):
# Simulate immediate completion
pass
with patch.object(poller, "write", side_effect=instant_write):
result = await poller.write_with_timeout(42, b"test", timeout=1.0)
assert result is True
@pytest.mark.asyncio
async def test_write_with_timeout_returns_false_on_timeout(self):
"""write_with_timeout returns False when write times out."""
poller = Poller()
poller._loop = asyncio.get_event_loop()
with patch.object(poller._selector, "register"):
poller.add_file(42)
async def slow_write(fd, data):
await asyncio.sleep(999)
with patch.object(poller, "write", side_effect=slow_write):
result = await poller.write_with_timeout(42, b"test", timeout=0.01)
assert result is False