Files
takopi/tests/test_telegram_queue.py
T
2026-01-09 20:57:04 +04:00

260 lines
6.7 KiB
Python

import anyio
import pytest
from takopi.telegram.client import TelegramClient, TelegramRetryAfter
class _FakeBot:
def __init__(self) -> None:
self.calls: list[str] = []
self.edit_calls: list[str] = []
self.delete_calls: list[tuple[int, int]] = []
self._edit_attempts = 0
self._updates_attempts = 0
self.retry_after: float | None = None
self.updates_retry_after: float | None = None
async def send_message(
self,
chat_id: int,
text: str,
reply_to_message_id: int | None = None,
disable_notification: bool | None = False,
entities: list[dict] | None = None,
parse_mode: str | None = None,
*,
replace_message_id: int | None = None,
) -> dict:
_ = reply_to_message_id
_ = disable_notification
_ = entities
_ = parse_mode
_ = replace_message_id
self.calls.append("send_message")
return {"message_id": 1}
async def edit_message_text(
self,
chat_id: int,
message_id: int,
text: str,
entities: list[dict] | None = None,
parse_mode: str | None = None,
*,
wait: bool = True,
) -> dict:
_ = chat_id
_ = message_id
_ = entities
_ = parse_mode
_ = wait
self.calls.append("edit_message_text")
self.edit_calls.append(text)
if self.retry_after is not None and self._edit_attempts == 0:
self._edit_attempts += 1
raise TelegramRetryAfter(self.retry_after)
self._edit_attempts += 1
return {"message_id": message_id}
async def delete_message(
self,
chat_id: int,
message_id: int,
) -> bool:
self.calls.append("delete_message")
self.delete_calls.append((chat_id, message_id))
return True
async def set_my_commands(
self,
commands: list[dict],
*,
scope: dict | None = None,
language_code: str | None = None,
) -> bool:
_ = commands
_ = scope
_ = language_code
return True
async def get_updates(
self,
offset: int | None,
timeout_s: int = 50,
allowed_updates: list[str] | None = None,
) -> list[dict] | None:
_ = offset
_ = timeout_s
_ = allowed_updates
if self.updates_retry_after is not None and self._updates_attempts == 0:
self._updates_attempts += 1
raise TelegramRetryAfter(self.updates_retry_after)
self._updates_attempts += 1
return []
async def get_file(self, file_id: str) -> dict | None:
_ = file_id
return None
async def download_file(self, file_path: str) -> bytes | None:
_ = file_path
return None
async def close(self) -> None:
return None
async def get_me(self) -> dict | None:
return {"id": 1}
@pytest.mark.anyio
async def test_edits_coalesce_latest() -> None:
class _BlockingBot(_FakeBot):
def __init__(self) -> None:
super().__init__()
self.edit_started = anyio.Event()
self.release = anyio.Event()
self._block_first = True
async def edit_message_text(
self,
chat_id: int,
message_id: int,
text: str,
entities: list[dict] | None = None,
parse_mode: str | None = None,
*,
wait: bool = True,
) -> dict:
if self._block_first:
self._block_first = False
self.edit_started.set()
await self.release.wait()
return await super().edit_message_text(
chat_id=chat_id,
message_id=message_id,
text=text,
entities=entities,
parse_mode=parse_mode,
wait=wait,
)
bot = _BlockingBot()
client = TelegramClient(client=bot, private_chat_rps=0.0, group_chat_rps=0.0)
await client.edit_message_text(
chat_id=1,
message_id=1,
text="first",
wait=False,
)
with anyio.fail_after(1):
await bot.edit_started.wait()
await client.edit_message_text(
chat_id=1,
message_id=1,
text="second",
wait=False,
)
await client.edit_message_text(
chat_id=1,
message_id=1,
text="third",
wait=False,
)
bot.release.set()
with anyio.fail_after(1):
while len(bot.edit_calls) < 2:
await anyio.sleep(0)
assert bot.edit_calls == ["first", "third"]
@pytest.mark.anyio
async def test_send_preempts_pending_edit() -> None:
bot = _FakeBot()
client = TelegramClient(client=bot, private_chat_rps=10.0, group_chat_rps=10.0)
await client.edit_message_text(
chat_id=1,
message_id=1,
text="first",
)
await client.edit_message_text(
chat_id=1,
message_id=1,
text="progress",
wait=False,
)
with anyio.fail_after(1):
await client.send_message(chat_id=1, text="final")
await anyio.sleep(0.2)
assert bot.calls[0] == "edit_message_text"
assert bot.calls[1] == "send_message"
assert bot.calls[-1] == "edit_message_text"
@pytest.mark.anyio
async def test_delete_drops_pending_edits() -> None:
bot = _FakeBot()
client = TelegramClient(client=bot, private_chat_rps=10.0, group_chat_rps=10.0)
await client.edit_message_text(
chat_id=1,
message_id=1,
text="first",
)
await client.edit_message_text(
chat_id=1,
message_id=1,
text="progress",
wait=False,
)
with anyio.fail_after(1):
await client.delete_message(
chat_id=1,
message_id=1,
)
await anyio.sleep(0.2)
assert bot.delete_calls == [(1, 1)]
assert bot.edit_calls == ["first"]
@pytest.mark.anyio
async def test_retry_after_retries_once() -> None:
bot = _FakeBot()
bot.retry_after = 0.0
client = TelegramClient(client=bot, private_chat_rps=0.0, group_chat_rps=0.0)
result = await client.edit_message_text(
chat_id=1,
message_id=1,
text="retry",
)
assert result == {"message_id": 1}
assert bot._edit_attempts == 2
@pytest.mark.anyio
async def test_get_updates_retries_on_retry_after() -> None:
bot = _FakeBot()
bot.updates_retry_after = 0.0
client = TelegramClient(client=bot, private_chat_rps=0.0, group_chat_rps=0.0)
with anyio.fail_after(1):
updates = await client.get_updates(offset=None, timeout_s=0)
assert updates == []
assert bot._updates_attempts == 2