refactor(telegram): boundary types (#90)

This commit is contained in:
banteg
2026-01-11 21:36:07 +04:00
committed by GitHub
parent c6c34ac17f
commit e8c478d786
23 changed files with 1116 additions and 581 deletions
+10 -7
View File
@@ -1,8 +1,9 @@
from __future__ import annotations
from takopi.backends import EngineBackend
from takopi.config import dump_toml
from takopi.telegram import onboarding
from takopi.backends import EngineBackend
from takopi.telegram.api_models import User
def test_mask_token_short() -> None:
@@ -91,7 +92,7 @@ def test_interactive_setup_writes_config(monkeypatch, tmp_path) -> None:
def _fake_run(func, *args, **kwargs):
if func is onboarding.get_bot_info:
return {"username": "my_bot"}
return User(id=1, username="my_bot")
if func is onboarding.wait_for_chat:
return onboarding.ChatInfo(
chat_id=123,
@@ -136,7 +137,7 @@ def test_interactive_setup_preserves_projects(monkeypatch, tmp_path) -> None:
def _fake_run(func, *args, **kwargs):
if func is onboarding.get_bot_info:
return {"username": "my_bot"}
return User(id=1, username="my_bot")
if func is onboarding.wait_for_chat:
return onboarding.ChatInfo(
chat_id=123,
@@ -173,7 +174,7 @@ def test_interactive_setup_no_agents_aborts(monkeypatch, tmp_path) -> None:
def _fake_run(func, *args, **kwargs):
if func is onboarding.get_bot_info:
return {"username": "my_bot"}
return User(id=1, username="my_bot")
if func is onboarding.wait_for_chat:
return onboarding.ChatInfo(
chat_id=123,
@@ -211,7 +212,7 @@ def test_interactive_setup_recovers_from_malformed_toml(monkeypatch, tmp_path) -
def _fake_run(func, *args, **kwargs):
if func is onboarding.get_bot_info:
return {"username": "my_bot"}
return User(id=1, username="my_bot")
if func is onboarding.wait_for_chat:
return onboarding.ChatInfo(
chat_id=123,
@@ -239,7 +240,7 @@ def test_interactive_setup_recovers_from_malformed_toml(monkeypatch, tmp_path) -
def test_capture_chat_id_with_token(monkeypatch) -> None:
def _fake_run(func, *args, **kwargs):
if func is onboarding.get_bot_info:
return {"username": "my_bot"}
return User(id=1, username="my_bot")
if func is onboarding.wait_for_chat:
return onboarding.ChatInfo(
chat_id=456,
@@ -261,7 +262,9 @@ def test_capture_chat_id_with_token(monkeypatch) -> None:
def test_capture_chat_id_prompts_for_token(monkeypatch) -> None:
monkeypatch.setattr(
onboarding, "_prompt_token", lambda _console: ("token", {"username": "bot"})
onboarding,
"_prompt_token",
lambda _console: ("token", User(id=1, username="bot")),
)
def _fake_run(func, *args, **kwargs):
+64 -14
View File
@@ -9,6 +9,11 @@ from takopi.config import ProjectsConfig
from takopi.model import EngineId
from takopi.router import AutoRouter, RunnerEntry
from takopi.runners.mock import Return, ScriptRunner
from takopi.settings import (
TelegramFilesSettings,
TelegramTopicsSettings,
TelegramTransportSettings,
)
from takopi.telegram import backend as telegram_backend
from takopi.transport_runtime import TransportRuntime
@@ -20,8 +25,13 @@ def test_build_startup_message_includes_missing_engines(tmp_path: Path) -> None:
missing = ScriptRunner([Return(answer="ok")], engine=pi)
router = AutoRouter(
entries=[
RunnerEntry(engine=codex, runner=runner, available=True),
RunnerEntry(engine=pi, runner=missing, available=False, issue="missing"),
RunnerEntry(engine=codex, runner=runner),
RunnerEntry(
engine=pi,
runner=missing,
status="missing_cli",
issue="missing",
),
],
default_engine=codex,
)
@@ -40,6 +50,44 @@ def test_build_startup_message_includes_missing_engines(tmp_path: Path) -> None:
assert "projects: `none`" in message
def test_build_startup_message_surfaces_unavailable_engine_reasons(
tmp_path: Path,
) -> None:
codex = EngineId("codex")
pi = EngineId("pi")
claude = EngineId("claude")
runner = ScriptRunner([Return(answer="ok")], engine=codex)
bad_cfg = ScriptRunner([Return(answer="ok")], engine=pi)
load_err = ScriptRunner([Return(answer="ok")], engine=claude)
router = AutoRouter(
entries=[
RunnerEntry(engine=codex, runner=runner),
RunnerEntry(engine=pi, runner=bad_cfg, status="bad_config", issue="bad"),
RunnerEntry(
engine=claude,
runner=load_err,
status="load_error",
issue="failed",
),
],
default_engine=codex,
)
runtime = TransportRuntime(
router=router,
projects=ProjectsConfig(projects={}, default_project=None),
watch_config=True,
)
message = telegram_backend._build_startup_message(
runtime, startup_pwd=str(tmp_path)
)
assert "agents: `codex" in message
assert "misconfigured: pi" in message
assert "failed to load: claude" in message
def test_telegram_backend_build_and_run_wires_config(
tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
@@ -55,7 +103,7 @@ def test_telegram_backend_build_and_run_wires_config(
codex = EngineId("codex")
runner = ScriptRunner([Return(answer="ok")], engine=codex)
router = AutoRouter(
entries=[RunnerEntry(engine=codex, runner=runner, available=True)],
entries=[RunnerEntry(engine=codex, runner=runner)],
default_engine=codex,
)
runtime = TransportRuntime(
@@ -80,13 +128,14 @@ def test_telegram_backend_build_and_run_wires_config(
monkeypatch.setattr(telegram_backend, "run_main_loop", fake_run_main_loop)
monkeypatch.setattr(telegram_backend, "TelegramClient", _FakeClient)
transport_config = {
"bot_token": "token",
"chat_id": 321,
"voice_transcription": True,
"files": {"enabled": True, "allowed_user_ids": [1, 2]},
"topics": {"enabled": True, "scope": "main"},
}
transport_config = TelegramTransportSettings(
bot_token="token",
chat_id=321,
voice_transcription=True,
voice_max_bytes=1234,
files=TelegramFilesSettings(enabled=True, allowed_user_ids=[1, 2]),
topics=TelegramTopicsSettings(enabled=True, scope="main"),
)
telegram_backend.TelegramBackend().build_and_run(
transport_config=transport_config,
@@ -100,18 +149,19 @@ def test_telegram_backend_build_and_run_wires_config(
kwargs = captured["kwargs"]
assert cfg.chat_id == 321
assert cfg.voice_transcription is True
assert cfg.voice_max_bytes == 1234
assert cfg.files.enabled is True
assert cfg.files.allowed_user_ids == frozenset({1, 2})
assert cfg.files.allowed_user_ids == [1, 2]
assert cfg.topics.enabled is True
assert cfg.bot.token == "token"
assert kwargs["watch_config"] is True
assert kwargs["transport_id"] == "telegram"
def test_build_files_config_defaults() -> None:
cfg = telegram_backend._build_files_config({})
def test_telegram_files_settings_defaults() -> None:
cfg = TelegramFilesSettings()
assert cfg.enabled is False
assert cfg.auto_put is True
assert cfg.uploads_dir == "incoming"
assert cfg.allowed_user_ids == frozenset()
assert cfg.allowed_user_ids == []
+44 -40
View File
@@ -6,22 +6,30 @@ import anyio
import pytest
from takopi import commands, plugins
import takopi.telegram.bridge as bridge
import takopi.telegram.loop as telegram_loop
import takopi.telegram.commands as telegram_commands
import takopi.telegram.topics as telegram_topics
from takopi.directives import parse_directives
from takopi.telegram.api_models import (
Chat,
ChatMember,
File,
ForumTopic,
Message,
Update,
User,
)
from takopi.settings import TelegramFilesSettings, TelegramTopicsSettings
from takopi.telegram.bridge import (
TelegramBridgeConfig,
TelegramFilesConfig,
TelegramPresenter,
TelegramTransport,
build_bot_commands,
handle_callback_cancel,
handle_cancel,
is_cancel_command,
send_with_resume,
run_main_loop,
send_with_resume,
)
from takopi.telegram.client import BotClient
from takopi.telegram.topic_state import TopicStateStore, resolve_state_path
@@ -122,13 +130,13 @@ class _FakeBot(BotClient):
offset: int | None,
timeout_s: int = 50,
allowed_updates: list[str] | None = None,
) -> list[dict[str, Any]] | None:
) -> list[Update] | None:
_ = offset
_ = timeout_s
_ = allowed_updates
return []
async def get_file(self, file_id: str) -> dict[str, Any] | None:
async def get_file(self, file_id: str) -> File | None:
_ = file_id
return None
@@ -148,7 +156,7 @@ class _FakeBot(BotClient):
reply_markup: dict | None = None,
*,
replace_message_id: int | None = None,
) -> dict[str, Any]:
) -> Message:
self.send_calls.append(
{
"chat_id": chat_id,
@@ -162,7 +170,7 @@ class _FakeBot(BotClient):
"replace_message_id": replace_message_id,
}
)
return {"message_id": 1}
return Message(message_id=1)
async def send_document(
self,
@@ -173,7 +181,7 @@ class _FakeBot(BotClient):
message_thread_id: int | None = None,
disable_notification: bool | None = False,
caption: str | None = None,
) -> dict[str, Any]:
) -> Message:
self.document_calls.append(
{
"chat_id": chat_id,
@@ -185,7 +193,7 @@ class _FakeBot(BotClient):
"caption": caption,
}
)
return {"message_id": 2}
return Message(message_id=2)
async def edit_message_text(
self,
@@ -197,7 +205,7 @@ class _FakeBot(BotClient):
reply_markup: dict | None = None,
*,
wait: bool = True,
) -> dict[str, Any]:
) -> Message:
self.edit_calls.append(
{
"chat_id": chat_id,
@@ -209,7 +217,7 @@ class _FakeBot(BotClient):
"wait": wait,
}
)
return {"message_id": message_id}
return Message(message_id=message_id)
async def delete_message(self, chat_id: int, message_id: int) -> bool:
self.delete_calls.append({"chat_id": chat_id, "message_id": message_id})
@@ -231,26 +239,22 @@ class _FakeBot(BotClient):
)
return True
async def get_me(self) -> dict[str, Any] | None:
return {"id": 1}
async def get_me(self) -> User | None:
return User(id=1, username="bot")
async def get_chat(self, chat_id: int) -> dict[str, Any] | None:
async def get_chat(self, chat_id: int) -> Chat | None:
_ = chat_id
return {"id": chat_id, "type": "supergroup", "is_forum": True}
return Chat(id=chat_id, type="supergroup", is_forum=True)
async def get_chat_member(
self, chat_id: int, user_id: int
) -> dict[str, Any] | None:
async def get_chat_member(self, chat_id: int, user_id: int) -> ChatMember | None:
_ = chat_id
_ = user_id
return {"status": "administrator", "can_manage_topics": True}
return ChatMember(status="administrator", can_manage_topics=True)
async def create_forum_topic(
self, chat_id: int, name: str
) -> dict[str, Any] | None:
async def create_forum_topic(self, chat_id: int, name: str) -> ForumTopic | None:
_ = chat_id
_ = name
return {"message_thread_id": 1}
return ForumTopic(message_thread_id=1)
async def edit_forum_topic(
self, chat_id: int, message_thread_id: int, name: str
@@ -539,10 +543,10 @@ async def test_telegram_transport_edit_wait_false_returns_ref() -> None:
offset: int | None,
timeout_s: int = 50,
allowed_updates: list[str] | None = None,
) -> list[dict[str, Any]] | None:
) -> list[Update] | None:
return None
async def get_file(self, file_id: str) -> dict[str, Any] | None:
async def get_file(self, file_id: str) -> File | None:
_ = file_id
return None
@@ -562,7 +566,7 @@ async def test_telegram_transport_edit_wait_false_returns_ref() -> None:
reply_markup: dict | None = None,
*,
replace_message_id: int | None = None,
) -> dict | None:
) -> Message | None:
_ = reply_markup
return None
@@ -575,7 +579,7 @@ async def test_telegram_transport_edit_wait_false_returns_ref() -> None:
message_thread_id: int | None = None,
disable_notification: bool | None = False,
caption: str | None = None,
) -> dict | None:
) -> Message | None:
_ = (
chat_id,
filename,
@@ -597,7 +601,7 @@ async def test_telegram_transport_edit_wait_false_returns_ref() -> None:
reply_markup: dict | None = None,
*,
wait: bool = True,
) -> dict | None:
) -> Message | None:
self.edit_calls.append(
{
"chat_id": chat_id,
@@ -611,7 +615,7 @@ async def test_telegram_transport_edit_wait_false_returns_ref() -> None:
)
if not wait:
return None
return {"message_id": message_id}
return Message(message_id=message_id)
async def delete_message(
self,
@@ -629,7 +633,7 @@ async def test_telegram_transport_edit_wait_false_returns_ref() -> None:
) -> bool:
return False
async def get_me(self) -> dict[str, Any] | None:
async def get_me(self) -> User | None:
return None
async def close(self) -> None:
@@ -778,9 +782,9 @@ async def test_handle_file_put_writes_file(tmp_path: Path) -> None:
payload = b"hello"
class _FileBot(_FakeBot):
async def get_file(self, file_id: str) -> dict[str, Any] | None:
async def get_file(self, file_id: str) -> File | None:
_ = file_id
return {"file_path": "files/hello.txt"}
return File(file_path="files/hello.txt")
async def download_file(self, file_path: str) -> bytes | None:
_ = file_path
@@ -811,7 +815,7 @@ async def test_handle_file_put_writes_file(tmp_path: Path) -> None:
chat_id=123,
startup_msg="",
exec_cfg=exec_cfg,
files=TelegramFilesConfig(enabled=True),
files=TelegramFilesSettings(enabled=True),
)
msg = TelegramIncomingMessage(
transport="telegram",
@@ -876,9 +880,9 @@ async def test_handle_file_get_sends_document_for_allowed_user(
chat_id=123,
startup_msg="",
exec_cfg=exec_cfg,
files=TelegramFilesConfig(
files=TelegramFilesSettings(
enabled=True,
allowed_user_ids=frozenset({42}),
allowed_user_ids=[42],
),
)
msg = TelegramIncomingMessage(
@@ -1006,7 +1010,7 @@ def test_topic_title_projects_scope_includes_project() -> None:
transport = _FakeTransport()
cfg = replace(
_make_cfg(transport),
topics=bridge.TelegramTopicsConfig(
topics=TelegramTopicsSettings(
enabled=True,
scope="projects",
),
@@ -1277,7 +1281,7 @@ async def test_run_main_loop_persists_topic_sessions_in_project_scope(
chat_id=123,
startup_msg="",
exec_cfg=exec_cfg,
topics=bridge.TelegramTopicsConfig(
topics=TelegramTopicsSettings(
enabled=True,
scope="projects",
),
@@ -1363,11 +1367,11 @@ async def test_run_main_loop_batches_media_group_upload(
}
class _MediaBot(_FakeBot):
async def get_file(self, file_id: str) -> dict[str, Any] | None:
async def get_file(self, file_id: str) -> File | None:
file_path = file_map.get(file_id)
if file_path is None:
return None
return {"file_path": file_path}
return File(file_path=file_path)
async def download_file(self, file_path: str) -> bytes | None:
return payloads.get(file_path)
@@ -1397,7 +1401,7 @@ async def test_run_main_loop_batches_media_group_upload(
chat_id=123,
startup_msg="",
exec_cfg=exec_cfg,
files=TelegramFilesConfig(enabled=True, auto_put=True),
files=TelegramFilesSettings(enabled=True, auto_put=True),
)
msg1 = TelegramIncomingMessage(
transport="telegram",
+172
View File
@@ -57,3 +57,175 @@ async def test_no_token_in_logs_on_http_error(
out = capsys.readouterr().out
assert token not in out
assert "bot[REDACTED]" in out
@pytest.mark.anyio
async def test_telegram_429_no_retry_post_form() -> None:
calls: list[int] = []
def handler(request: httpx.Request) -> httpx.Response:
calls.append(1)
return httpx.Response(
429,
json={
"ok": False,
"description": "retry",
"parameters": {"retry_after": 2},
},
request=request,
)
transport = httpx.MockTransport(handler)
client = httpx.AsyncClient(transport=transport)
try:
tg = TelegramClient("123:abcDEF_ghij", http_client=client)
with pytest.raises(TelegramRetryAfter) as exc:
await tg._post_form(
"sendDocument",
{"chat_id": 1},
files={"document": ("note.txt", b"hi")},
)
finally:
await client.aclose()
assert exc.value.retry_after == 2
assert len(calls) == 1
@pytest.mark.anyio
async def test_telegram_429_defaults_retry_after_on_bad_body() -> None:
def handler(request: httpx.Request) -> httpx.Response:
return httpx.Response(429, text="nope", request=request)
transport = httpx.MockTransport(handler)
client = httpx.AsyncClient(transport=transport)
try:
tg = TelegramClient("123:abcDEF_ghij", http_client=client)
with pytest.raises(TelegramRetryAfter) as exc:
await tg._post("sendMessage", {"chat_id": 1, "text": "hi"})
finally:
await client.aclose()
assert exc.value.retry_after == 5.0
@pytest.mark.anyio
async def test_telegram_ok_false_returns_none() -> None:
def handler(request: httpx.Request) -> httpx.Response:
return httpx.Response(
200,
json={"ok": False, "error_code": 400, "description": "bad"},
request=request,
)
transport = httpx.MockTransport(handler)
client = httpx.AsyncClient(transport=transport)
try:
tg = TelegramClient("123:abcDEF_ghij", http_client=client)
result = await tg._post("getUpdates", {"timeout": 1})
finally:
await client.aclose()
assert result is None
@pytest.mark.anyio
async def test_telegram_invalid_payload_returns_none() -> None:
def handler(request: httpx.Request) -> httpx.Response:
return httpx.Response(200, json=["not", "a", "dict"], request=request)
transport = httpx.MockTransport(handler)
client = httpx.AsyncClient(transport=transport)
try:
tg = TelegramClient("123:abcDEF_ghij", http_client=client)
result = await tg._post("getUpdates", {"timeout": 1})
finally:
await client.aclose()
assert result is None
@pytest.mark.anyio
async def test_telegram_decode_failure_returns_none() -> None:
def handler(request: httpx.Request) -> httpx.Response:
return httpx.Response(
200,
json={"ok": True, "result": {"username": "bot-only"}},
request=request,
)
transport = httpx.MockTransport(handler)
client = httpx.AsyncClient(transport=transport)
try:
tg = TelegramClient("123:abcDEF_ghij", http_client=client)
result = await tg.get_me()
finally:
await client.aclose()
assert result is None
@pytest.mark.anyio
async def test_telegram_download_file_retries_on_429() -> None:
calls: list[int] = []
sleeps: list[float] = []
async def sleep(delay: float) -> None:
sleeps.append(delay)
def handler(request: httpx.Request) -> httpx.Response:
calls.append(1)
if len(calls) == 1:
return httpx.Response(
429,
json={"ok": False, "parameters": {"retry_after": 3}},
request=request,
)
return httpx.Response(200, content=b"ok", request=request)
transport = httpx.MockTransport(handler)
client = httpx.AsyncClient(transport=transport)
try:
tg = TelegramClient("123:abcDEF_ghij", http_client=client, sleep=sleep)
payload = await tg.download_file("path/to/file")
finally:
await client.aclose()
assert payload == b"ok"
assert sleeps == [3.0]
assert len(calls) == 2
@pytest.mark.anyio
async def test_telegram_download_file_429_defaults_retry_after_on_bad_body() -> None:
sleeps: list[float] = []
async def sleep(delay: float) -> None:
sleeps.append(delay)
calls: list[int] = []
def handler(request: httpx.Request) -> httpx.Response:
calls.append(1)
if len(calls) == 1:
return httpx.Response(429, text="nope", request=request)
return httpx.Response(200, content=b"ok", request=request)
transport = httpx.MockTransport(handler)
client = httpx.AsyncClient(transport=transport)
try:
tg = TelegramClient("123:abcDEF_ghij", http_client=client, sleep=sleep)
payload = await tg.download_file("path")
finally:
await client.aclose()
assert payload == b"ok"
assert sleeps == [5.0]
assert len(calls) == 2
+14 -12
View File
@@ -3,6 +3,7 @@ from typing import Any
import anyio
import pytest
from takopi.telegram.api_models import File, Message, Update, User
from takopi.telegram.client import BotClient, TelegramClient, TelegramRetryAfter
@@ -29,7 +30,7 @@ class _FakeBot(BotClient):
reply_markup: dict | None = None,
*,
replace_message_id: int | None = None,
) -> dict[str, Any]:
) -> Message | None:
_ = reply_to_message_id
_ = disable_notification
_ = message_thread_id
@@ -38,7 +39,7 @@ class _FakeBot(BotClient):
_ = reply_markup
_ = replace_message_id
self.calls.append("send_message")
return {"message_id": 1}
return Message(message_id=1)
async def send_document(
self,
@@ -49,7 +50,7 @@ class _FakeBot(BotClient):
message_thread_id: int | None = None,
disable_notification: bool | None = False,
caption: str | None = None,
) -> dict[str, Any]:
) -> Message | None:
_ = (
chat_id,
filename,
@@ -60,7 +61,7 @@ class _FakeBot(BotClient):
caption,
)
self.calls.append("send_document")
return {"message_id": 1}
return Message(message_id=1)
async def edit_message_text(
self,
@@ -72,7 +73,7 @@ class _FakeBot(BotClient):
reply_markup: dict | None = None,
*,
wait: bool = True,
) -> dict[str, Any]:
) -> Message | None:
_ = chat_id
_ = message_id
_ = entities
@@ -85,7 +86,7 @@ class _FakeBot(BotClient):
self._edit_attempts += 1
raise TelegramRetryAfter(self.retry_after)
self._edit_attempts += 1
return {"message_id": message_id}
return Message(message_id=message_id)
async def delete_message(
self,
@@ -113,7 +114,7 @@ class _FakeBot(BotClient):
offset: int | None,
timeout_s: int = 50,
allowed_updates: list[str] | None = None,
) -> list[dict[str, Any]] | None:
) -> list[Update] | None:
_ = offset
_ = timeout_s
_ = allowed_updates
@@ -123,7 +124,7 @@ class _FakeBot(BotClient):
self._updates_attempts += 1
return []
async def get_file(self, file_id: str) -> dict[str, Any] | None:
async def get_file(self, file_id: str) -> File | None:
_ = file_id
return None
@@ -134,8 +135,8 @@ class _FakeBot(BotClient):
async def close(self) -> None:
return None
async def get_me(self) -> dict[str, Any] | None:
return {"id": 1}
async def get_me(self) -> User | None:
return User(id=1)
async def answer_callback_query(
self,
@@ -187,7 +188,7 @@ async def test_edits_coalesce_latest() -> None:
reply_markup: dict | None = None,
*,
wait: bool = True,
) -> dict:
) -> Message | None:
if self._block_first:
self._block_first = False
self.edit_started.set()
@@ -305,7 +306,8 @@ async def test_retry_after_retries_once() -> None:
text="retry",
)
assert result == {"message_id": 1}
assert result is not None
assert result.message_id == 1
assert bot._edit_attempts == 2
+243
View File
@@ -0,0 +1,243 @@
from __future__ import annotations
import pytest
from takopi.telegram.api_models import (
Chat,
ChatMember,
File,
ForumTopic,
Message,
Update,
User,
)
from takopi.telegram.client import BotClient
from takopi.telegram.types import TelegramIncomingMessage, TelegramVoice
from takopi.telegram.voice import transcribe_voice
class _Bot(BotClient):
def __init__(self, *, file_info: File | None, audio: bytes | None) -> None:
self._file_info = file_info
self._audio = audio
async def close(self) -> None:
return None
async def get_updates(
self,
offset: int | None,
timeout_s: int = 50,
allowed_updates: list[str] | None = None,
) -> list[Update] | None:
_ = offset, timeout_s, allowed_updates
return []
async def get_file(self, file_id: str) -> File | None:
_ = file_id
return self._file_info
async def download_file(self, file_path: str) -> bytes | None:
_ = file_path
return self._audio
async def send_message(
self,
chat_id: int,
text: str,
reply_to_message_id: int | None = None,
disable_notification: bool | None = False,
message_thread_id: int | None = None,
entities: list[dict] | None = None,
parse_mode: str | None = None,
reply_markup: dict | None = None,
*,
replace_message_id: int | None = None,
) -> Message | None:
_ = (
chat_id,
text,
reply_to_message_id,
disable_notification,
message_thread_id,
entities,
parse_mode,
reply_markup,
replace_message_id,
)
raise AssertionError("send_message should not be called")
async def send_document(
self,
chat_id: int,
filename: str,
content: bytes,
reply_to_message_id: int | None = None,
message_thread_id: int | None = None,
disable_notification: bool | None = False,
caption: str | None = None,
) -> Message | None:
_ = (
chat_id,
filename,
content,
reply_to_message_id,
message_thread_id,
disable_notification,
caption,
)
raise AssertionError("send_document should not be called")
async def edit_message_text(
self,
chat_id: int,
message_id: int,
text: str,
entities: list[dict] | None = None,
parse_mode: str | None = None,
reply_markup: dict | None = None,
*,
wait: bool = True,
) -> Message | None:
_ = (
chat_id,
message_id,
text,
entities,
parse_mode,
reply_markup,
wait,
)
raise AssertionError("edit_message_text should not be called")
async def delete_message(self, chat_id: int, message_id: int) -> bool:
_ = chat_id, message_id
raise AssertionError("delete_message should not be called")
async def set_my_commands(
self,
commands: list[dict],
*,
scope: dict | None = None,
language_code: str | None = None,
) -> bool:
_ = commands, scope, language_code
raise AssertionError("set_my_commands should not be called")
async def get_me(self) -> User | None:
raise AssertionError("get_me should not be called")
async def answer_callback_query(
self,
callback_query_id: str,
text: str | None = None,
show_alert: bool | None = None,
) -> bool:
_ = callback_query_id, text, show_alert
raise AssertionError("answer_callback_query should not be called")
async def get_chat(self, chat_id: int) -> Chat | None:
_ = chat_id
raise AssertionError("get_chat should not be called")
async def get_chat_member(self, chat_id: int, user_id: int) -> ChatMember | None:
_ = chat_id, user_id
raise AssertionError("get_chat_member should not be called")
async def create_forum_topic(self, chat_id: int, name: str) -> ForumTopic | None:
_ = chat_id, name
raise AssertionError("create_forum_topic should not be called")
async def edit_forum_topic(
self, chat_id: int, message_thread_id: int, name: str
) -> bool:
_ = chat_id, message_thread_id, name
raise AssertionError("edit_forum_topic should not be called")
def _voice_message(*, file_size: int = 123) -> TelegramIncomingMessage:
voice = TelegramVoice(
file_id="voice-id",
mime_type="audio/ogg",
file_size=file_size,
duration=1,
raw={},
)
return TelegramIncomingMessage(
transport="telegram",
chat_id=1,
message_id=1,
text="",
reply_to_message_id=None,
reply_to_text=None,
sender_id=1,
voice=voice,
raw={},
)
@pytest.mark.anyio
async def test_transcribe_voice_handles_missing_file() -> None:
replies: list[str] = []
async def reply(**kwargs) -> None:
replies.append(kwargs["text"])
bot = _Bot(file_info=None, audio=None)
result = await transcribe_voice(
bot=bot,
msg=_voice_message(),
enabled=True,
reply=reply,
)
assert result is None
assert replies[-1] == "failed to fetch voice file."
@pytest.mark.anyio
async def test_transcribe_voice_handles_missing_download() -> None:
replies: list[str] = []
async def reply(**kwargs) -> None:
replies.append(kwargs["text"])
bot = _Bot(file_info=File(file_path="voice.ogg"), audio=None)
result = await transcribe_voice(
bot=bot,
msg=_voice_message(),
enabled=True,
reply=reply,
)
assert result is None
assert replies[-1] == "failed to download voice file."
@pytest.mark.anyio
async def test_transcribe_voice_rejects_large_voice_without_downloading() -> None:
replies: list[str] = []
async def reply(**kwargs) -> None:
replies.append(kwargs["text"])
class _NoFetchBot(_Bot):
async def get_file(self, file_id: str) -> File | None: # type: ignore[override]
_ = file_id
raise AssertionError("get_file should not be called")
async def download_file(self, file_path: str) -> bytes | None: # type: ignore[override]
_ = file_path
raise AssertionError("download_file should not be called")
bot = _NoFetchBot(file_info=None, audio=None)
result = await transcribe_voice(
bot=bot,
msg=_voice_message(file_size=10_000),
enabled=True,
max_bytes=100,
reply=reply,
)
assert result is None
assert replies[-1] == "voice message is too large to transcribe."
+2 -2
View File
@@ -15,14 +15,14 @@ class DummyTransport:
def interactive_setup(self, *, force: bool) -> bool:
raise NotImplementedError
def lock_token(self, *, transport_config: dict[str, object], config_path):
def lock_token(self, *, transport_config: object, config_path):
_ = transport_config, config_path
raise NotImplementedError
def build_and_run(
self,
*,
transport_config: dict[str, object],
transport_config: object,
config_path,
runtime,
final_notify: bool,