test: improve coverage and raise threshold to 80% (#154)

This commit is contained in:
banteg
2026-01-16 13:19:41 +04:00
committed by GitHub
parent 92b33c5181
commit da881fcee5
26 changed files with 4282 additions and 412 deletions
+1 -1
View File
@@ -66,7 +66,7 @@ docs = [
]
[tool.pytest.ini_options]
addopts = ["--cov=takopi", "--cov-report=term-missing", "--cov-fail-under=75"]
addopts = ["--cov=takopi", "--cov-branch", "--cov-report=term-missing", "--cov-fail-under=80"]
testpaths = ["tests"]
[tool.ruff.lint]
+38 -18
View File
@@ -2,6 +2,7 @@ from __future__ import annotations
import io
from collections.abc import Awaitable, Callable
from typing import Protocol
from ..logging import get_logger
from openai import AsyncOpenAI, OpenAIError
@@ -22,6 +23,22 @@ VOICE_TRANSCRIPTION_DISABLED_HINT = (
)
class VoiceTranscriber(Protocol):
async def transcribe(self, *, model: str, audio_bytes: bytes) -> str: ...
class OpenAIVoiceTranscriber:
async def transcribe(self, *, model: str, audio_bytes: bytes) -> str:
audio_file = io.BytesIO(audio_bytes)
audio_file.name = "voice.ogg"
async with AsyncOpenAI(timeout=120) as client:
response = await client.audio.transcriptions.create(
model=model,
file=audio_file,
)
return response.text
async def transcribe_voice(
*,
bot: BotClient,
@@ -30,6 +47,7 @@ async def transcribe_voice(
model: str,
max_bytes: int | None = None,
reply: Callable[..., Awaitable[None]],
transcriber: VoiceTranscriber | None = None,
) -> str | None:
voice = msg.voice
if voice is None:
@@ -55,21 +73,23 @@ async def transcribe_voice(
if max_bytes is not None and len(audio_bytes) > max_bytes:
await reply(text="voice message is too large to transcribe.")
return None
audio_file = io.BytesIO(audio_bytes)
audio_file.name = "voice.ogg"
async with AsyncOpenAI(timeout=120) as client:
try:
response = await client.audio.transcriptions.create(
model=model,
file=audio_file,
)
except OpenAIError as exc:
logger.error(
"openai.transcribe.error",
error=str(exc),
error_type=exc.__class__.__name__,
)
await reply(text=str(exc).strip() or "voice transcription failed")
return None
return response.text
if transcriber is None:
transcriber = OpenAIVoiceTranscriber()
try:
return await transcriber.transcribe(model=model, audio_bytes=audio_bytes)
except OpenAIError as exc:
logger.error(
"openai.transcribe.error",
error=str(exc),
error_type=exc.__class__.__name__,
)
await reply(text=str(exc).strip() or "voice transcription failed")
return None
except (RuntimeError, OSError, ValueError) as exc:
logger.error(
"voice.transcribe.error",
error=str(exc),
error_type=exc.__class__.__name__,
)
await reply(text=str(exc).strip() or "voice transcription failed")
return None
+18 -9
View File
@@ -1,18 +1,27 @@
import sys
from pathlib import Path
from collections.abc import Callable
import pytest
sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src"))
from takopi.telegram.bridge import TelegramBridgeConfig
from takopi.runners.mock import ScriptRunner
from tests.telegram_fakes import FakeBot, FakeTransport, make_cfg as build_cfg
@pytest.fixture
def anyio_backend() -> str:
return "asyncio"
def fake_transport() -> FakeTransport:
return FakeTransport()
@pytest.fixture(autouse=True)
def reset_plugins_state() -> None:
import takopi.plugins as plugins
@pytest.fixture
def fake_bot() -> FakeBot:
return FakeBot()
plugins.reset_plugin_state()
@pytest.fixture
def make_cfg() -> Callable[..., TelegramBridgeConfig]:
def _factory(
transport: FakeTransport, runner: ScriptRunner | None = None
) -> TelegramBridgeConfig:
return build_cfg(transport, runner)
return _factory
+288
View File
@@ -0,0 +1,288 @@
from typing import Any
import anyio
from takopi.config import ProjectsConfig
from takopi.markdown import MarkdownPresenter
from takopi.router import AutoRouter, RunnerEntry
from takopi.runner_bridge import ExecBridgeConfig
from takopi.runners.mock import Return, ScriptRunner
from takopi.telegram.api_models import (
Chat,
ChatMember,
File,
ForumTopic,
Message,
Update,
User,
)
from takopi.telegram.bridge import TelegramBridgeConfig
from takopi.telegram.client import BotClient
from takopi.transport import MessageRef, RenderedMessage, SendOptions
from takopi.transport_runtime import TransportRuntime
DEFAULT_ENGINE_ID = "codex"
def _empty_projects() -> ProjectsConfig:
return ProjectsConfig(projects={}, default_project=None)
def _make_router(runner: Any) -> AutoRouter:
return AutoRouter(
entries=[RunnerEntry(engine=runner.engine, runner=runner)],
default_engine=runner.engine,
)
class FakeTransport:
def __init__(self, progress_ready: anyio.Event | None = None) -> None:
self._next_id = 1
self.send_calls: list[dict] = []
self.edit_calls: list[dict] = []
self.delete_calls: list[MessageRef] = []
self.progress_ready = progress_ready
self.progress_ref: MessageRef | None = None
async def send(
self,
*,
channel_id: int | str,
message: RenderedMessage,
options: SendOptions | None = None,
) -> MessageRef:
ref = MessageRef(channel_id=channel_id, message_id=self._next_id)
self._next_id += 1
self.send_calls.append(
{
"ref": ref,
"channel_id": channel_id,
"message": message,
"options": options,
}
)
if (
self.progress_ref is None
and options is not None
and options.reply_to is not None
and options.notify is False
):
self.progress_ref = ref
if self.progress_ready is not None:
self.progress_ready.set()
return ref
async def edit(
self, *, ref: MessageRef, message: RenderedMessage, wait: bool = True
) -> MessageRef:
self.edit_calls.append({"ref": ref, "message": message, "wait": wait})
return ref
async def delete(self, *, ref: MessageRef) -> bool:
self.delete_calls.append(ref)
return True
async def close(self) -> None:
return None
class FakeBot(BotClient):
def __init__(self) -> None:
self.command_calls: list[dict] = []
self.callback_calls: list[dict] = []
self.send_calls: list[dict] = []
self.document_calls: list[dict] = []
self.edit_calls: list[dict] = []
self.edit_topic_calls: list[dict[str, Any]] = []
self.delete_calls: list[dict] = []
async def get_updates(
self,
offset: int | None,
timeout_s: int = 50,
allowed_updates: list[str] | None = None,
) -> list[Update] | None:
_ = offset
_ = timeout_s
_ = allowed_updates
return []
async def get_file(self, file_id: str) -> File | None:
_ = file_id
return None
async def download_file(self, file_path: str) -> bytes | None:
_ = file_path
return None
async def send_message(
self,
chat_id: int,
text: str,
reply_to_message_id: int | None = None,
disable_notification: bool | None = False,
message_thread_id: int | None = None,
entities: list[dict[str, Any]] | None = None,
parse_mode: str | None = None,
reply_markup: dict | None = None,
*,
replace_message_id: int | None = None,
) -> Message:
self.send_calls.append(
{
"chat_id": chat_id,
"text": text,
"reply_to_message_id": reply_to_message_id,
"disable_notification": disable_notification,
"message_thread_id": message_thread_id,
"entities": entities,
"parse_mode": parse_mode,
"reply_markup": reply_markup,
"replace_message_id": replace_message_id,
}
)
return Message(message_id=1)
async def send_document(
self,
chat_id: int,
filename: str,
content: bytes,
reply_to_message_id: int | None = None,
message_thread_id: int | None = None,
disable_notification: bool | None = False,
caption: str | None = None,
) -> Message:
self.document_calls.append(
{
"chat_id": chat_id,
"filename": filename,
"content": content,
"reply_to_message_id": reply_to_message_id,
"message_thread_id": message_thread_id,
"disable_notification": disable_notification,
"caption": caption,
}
)
return Message(message_id=2)
async def edit_message_text(
self,
chat_id: int,
message_id: int,
text: str,
entities: list[dict[str, Any]] | None = None,
parse_mode: str | None = None,
reply_markup: dict | None = None,
*,
wait: bool = True,
) -> Message:
self.edit_calls.append(
{
"chat_id": chat_id,
"message_id": message_id,
"text": text,
"entities": entities,
"parse_mode": parse_mode,
"reply_markup": reply_markup,
"wait": wait,
}
)
return Message(message_id=message_id)
async def delete_message(self, chat_id: int, message_id: int) -> bool:
self.delete_calls.append({"chat_id": chat_id, "message_id": message_id})
return True
async def set_my_commands(
self,
commands: list[dict[str, Any]],
*,
scope: dict[str, Any] | None = None,
language_code: str | None = None,
) -> bool:
self.command_calls.append(
{
"commands": commands,
"scope": scope,
"language_code": language_code,
}
)
return True
async def get_me(self) -> User | None:
return User(id=1, username="bot")
async def get_chat(self, chat_id: int) -> Chat | None:
_ = chat_id
return Chat(id=chat_id, type="supergroup", is_forum=True)
async def get_chat_member(self, chat_id: int, user_id: int) -> ChatMember | None:
_ = chat_id
_ = user_id
return ChatMember(status="administrator", can_manage_topics=True)
async def create_forum_topic(self, chat_id: int, name: str) -> ForumTopic | None:
_ = chat_id
_ = name
return ForumTopic(message_thread_id=1)
async def edit_forum_topic(
self, chat_id: int, message_thread_id: int, name: str
) -> bool:
self.edit_topic_calls.append(
{
"chat_id": chat_id,
"message_thread_id": message_thread_id,
"name": name,
}
)
return True
async def close(self) -> None:
return None
async def answer_callback_query(
self,
callback_query_id: str,
text: str | None = None,
show_alert: bool | None = None,
) -> bool:
self.callback_calls.append(
{
"callback_query_id": callback_query_id,
"text": text,
"show_alert": show_alert,
}
)
return True
def make_cfg(
transport: FakeTransport,
runner: ScriptRunner | None = None,
*,
engine_id: str = DEFAULT_ENGINE_ID,
forward_coalesce_s: float = 0.0,
media_group_debounce_s: float = 0.0,
) -> TelegramBridgeConfig:
if runner is None:
runner = ScriptRunner([Return(answer="ok")], engine=engine_id)
exec_cfg = ExecBridgeConfig(
transport=transport,
presenter=MarkdownPresenter(),
final_notify=True,
)
runtime = TransportRuntime(
router=_make_router(runner),
projects=_empty_projects(),
)
return TelegramBridgeConfig(
bot=FakeBot(),
runtime=runtime,
chat_id=123,
startup_msg="",
exec_cfg=exec_cfg,
forward_coalesce_s=forward_coalesce_s,
media_group_debounce_s=media_group_debounce_s,
)
+7
View File
@@ -0,0 +1,7 @@
from takopi import api
def test_api_exports() -> None:
assert api.TAKOPI_PLUGIN_API_VERSION == 1
assert "TransportRuntime" in api.__all__
assert api.TransportRuntime is not None
+178
View File
@@ -0,0 +1,178 @@
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
import pytest
from takopi import cli
from takopi.backends import EngineBackend, SetupIssue
from takopi.settings import TakopiSettings
from takopi.transports import SetupResult
@dataclass
class _DummyLock:
released: bool = False
def release(self) -> None:
self.released = True
class _FakeTransport:
id = "fake"
description = "fake transport"
def __init__(self, setup: SetupResult) -> None:
self._setup = setup
self.check_calls: list[tuple[object, str | None]] = []
self.lock_calls: list[tuple[object, Path]] = []
self.build_calls: list[dict[str, object]] = []
def check_setup(self, engine_backend, *, transport_override=None) -> SetupResult:
self.check_calls.append((engine_backend, transport_override))
return self._setup
def interactive_setup(self, *, force: bool) -> bool:
_ = force
return True
def lock_token(self, *, transport_config: object, _config_path: Path) -> str | None:
self.lock_calls.append((transport_config, _config_path))
return "lock"
def build_and_run(
self,
*,
transport_config: object,
config_path: Path,
runtime,
final_notify: bool,
default_engine_override: str | None,
) -> None:
self.build_calls.append(
{
"transport_config": transport_config,
"config_path": config_path,
"runtime": runtime,
"final_notify": final_notify,
"default_engine_override": default_engine_override,
}
)
def _engine_backend() -> EngineBackend:
return EngineBackend(id="codex", build_runner=lambda _cfg, _path: None)
def _settings() -> TakopiSettings:
return TakopiSettings.model_validate(
{
"transport": "telegram",
"transports": {"telegram": {"bot_token": "token", "chat_id": 123}},
}
)
def test_run_auto_router_success_releases_lock(monkeypatch, tmp_path: Path) -> None:
setup = SetupResult(issues=[], config_path=tmp_path / "takopi.toml")
transport = _FakeTransport(setup)
engine_backend = _engine_backend()
config_path = tmp_path / "takopi.toml"
monkeypatch.setattr(
cli,
"_resolve_setup_engine",
lambda _override: (None, None, None, "codex", engine_backend),
)
monkeypatch.setattr(cli, "_resolve_transport_id", lambda _override: "wire")
monkeypatch.setattr(cli, "get_transport", lambda _id, allowlist=None: transport)
monkeypatch.setattr(cli, "load_settings", lambda: (_settings(), config_path))
monkeypatch.setattr(cli, "setup_logging", lambda **_kwargs: None)
spec_calls: dict[str, object] = {}
class _Spec:
def to_runtime(self, *, config_path: Path):
spec_calls["runtime_config_path"] = config_path
return "runtime"
def _build_runtime_spec(**kwargs):
spec_calls.update(kwargs)
return _Spec()
monkeypatch.setattr(cli, "build_runtime_spec", _build_runtime_spec)
lock = _DummyLock()
monkeypatch.setattr(cli, "acquire_config_lock", lambda _path, _token: lock)
cli._run_auto_router(
default_engine_override=None,
transport_override="wire",
final_notify=True,
debug=False,
onboard=False,
)
assert transport.build_calls
assert lock.released is True
assert spec_calls["reserved"] == cli.RESERVED_CHAT_COMMANDS
assert transport.lock_calls[0][0] == {}
def test_run_auto_router_requires_tty_for_onboard(monkeypatch, tmp_path: Path) -> None:
setup = SetupResult(issues=[], config_path=tmp_path / "takopi.toml")
transport = _FakeTransport(setup)
monkeypatch.setattr(
cli,
"_resolve_setup_engine",
lambda _override: (None, None, None, "codex", _engine_backend()),
)
monkeypatch.setattr(cli, "_resolve_transport_id", lambda _override: "fake")
monkeypatch.setattr(cli, "get_transport", lambda _id, allowlist=None: transport)
monkeypatch.setattr(cli, "_should_run_interactive", lambda: False)
monkeypatch.setattr(cli, "setup_logging", lambda **_kwargs: None)
with pytest.raises(cli.typer.Exit) as exc:
cli._run_auto_router(
default_engine_override=None,
transport_override=None,
final_notify=True,
debug=False,
onboard=True,
)
assert exc.value.exit_code == 1
assert not transport.build_calls
def test_run_auto_router_missing_config_noninteractive(
monkeypatch, tmp_path: Path
) -> None:
setup = SetupResult(
issues=[SetupIssue(title="create a config", lines=())],
config_path=tmp_path / "missing.toml",
)
transport = _FakeTransport(setup)
monkeypatch.setattr(
cli,
"_resolve_setup_engine",
lambda _override: (None, None, None, "codex", _engine_backend()),
)
monkeypatch.setattr(cli, "_resolve_transport_id", lambda _override: "fake")
monkeypatch.setattr(cli, "get_transport", lambda _id, allowlist=None: transport)
monkeypatch.setattr(cli, "_should_run_interactive", lambda: False)
monkeypatch.setattr(cli, "setup_logging", lambda **_kwargs: None)
with pytest.raises(cli.typer.Exit) as exc:
cli._run_auto_router(
default_engine_override=None,
transport_override=None,
final_notify=True,
debug=False,
onboard=False,
)
assert exc.value.exit_code == 1
assert not transport.build_calls
+240
View File
@@ -0,0 +1,240 @@
from __future__ import annotations
from pathlib import Path
import tomllib
from typer.testing import CliRunner
from takopi import cli
from takopi.config import ConfigError
from takopi.plugins import (
COMMAND_GROUP,
ENGINE_GROUP,
TRANSPORT_GROUP,
PluginLoadError,
)
from takopi.settings import TakopiSettings
from tests.plugin_fixtures import FakeEntryPoint
def _min_config() -> dict:
return {
"transport": "telegram",
"transports": {"telegram": {"bot_token": "token", "chat_id": 123}},
}
def test_init_registers_project(monkeypatch, tmp_path: Path) -> None:
config = _min_config()
config_path = tmp_path / "takopi.toml"
repo_path = tmp_path / "repo"
repo_path.mkdir()
monkeypatch.chdir(repo_path)
monkeypatch.setattr(cli, "load_or_init_config", lambda: (config, config_path))
monkeypatch.setattr(cli, "resolve_main_worktree_root", lambda _path: None)
monkeypatch.setattr(cli, "resolve_default_base", lambda _path: "main")
monkeypatch.setattr(cli, "list_backend_ids", lambda allowlist=None: ["codex"])
monkeypatch.setattr(cli, "resolve_plugins_allowlist", lambda _settings: None)
monkeypatch.setattr(cli.typer, "prompt", lambda *args, **kwargs: "demo")
runner = CliRunner()
result = runner.invoke(cli.create_app(), ["init", "--default"])
assert result.exit_code == 0
assert "saved project" in result.output
data = tomllib.loads(config_path.read_text(encoding="utf-8"))
project = data["projects"]["demo"]
assert project["path"] == str(repo_path)
assert project["default_engine"] == "codex"
assert project["worktrees_dir"] == ".worktrees"
assert project["worktree_base"] == "main"
assert data["default_project"] == "demo"
def test_init_declines_overwrite(monkeypatch, tmp_path: Path) -> None:
config = _min_config()
config["projects"] = {"demo": {"path": "/tmp/repo"}}
config_path = tmp_path / "takopi.toml"
repo_path = tmp_path / "repo"
repo_path.mkdir()
monkeypatch.chdir(repo_path)
monkeypatch.setattr(cli, "load_or_init_config", lambda: (config, config_path))
monkeypatch.setattr(cli, "resolve_main_worktree_root", lambda _path: None)
monkeypatch.setattr(cli, "resolve_default_base", lambda _path: None)
monkeypatch.setattr(cli, "list_backend_ids", lambda allowlist=None: ["codex"])
monkeypatch.setattr(cli, "resolve_plugins_allowlist", lambda _settings: None)
monkeypatch.setattr(cli.typer, "confirm", lambda *args, **kwargs: False)
runner = CliRunner()
result = runner.invoke(cli.create_app(), ["init", "demo"])
assert result.exit_code == 1
def test_plugins_cmd_loads_and_reports_errors(monkeypatch) -> None:
entrypoints = {
ENGINE_GROUP: [
FakeEntryPoint(
"codex",
"takopi.runners.codex:BACKEND",
ENGINE_GROUP,
dist_name="takopi",
),
FakeEntryPoint(
"broken",
"takopi.runners.broken:BACKEND",
ENGINE_GROUP,
dist_name="takopi",
),
],
TRANSPORT_GROUP: [
FakeEntryPoint(
"telegram",
"takopi.transports.telegram:BACKEND",
TRANSPORT_GROUP,
dist_name="takopi",
)
],
COMMAND_GROUP: [
FakeEntryPoint(
"hello",
"takopi.commands.hello:BACKEND",
COMMAND_GROUP,
dist_name="thirdparty",
)
],
}
def _list_entrypoints(group: str, reserved_ids=None):
_ = reserved_ids
return entrypoints[group]
calls: list[tuple[str, str]] = []
def _get_backend(name: str, allowlist=None):
_ = allowlist
calls.append(("engine", name))
if name == "broken":
raise ConfigError("boom")
return object()
def _get_transport(name: str, allowlist=None):
_ = allowlist
calls.append(("transport", name))
return object()
def _get_command(name: str, allowlist=None):
_ = allowlist
calls.append(("command", name))
return object()
monkeypatch.setattr(cli, "_load_settings_optional", lambda: (None, None))
monkeypatch.setattr(cli, "resolve_plugins_allowlist", lambda _settings: ["takopi"])
monkeypatch.setattr(cli, "list_entrypoints", _list_entrypoints)
monkeypatch.setattr(cli, "get_backend", _get_backend)
monkeypatch.setattr(cli, "get_transport", _get_transport)
monkeypatch.setattr(cli, "get_command", _get_command)
monkeypatch.setattr(
cli,
"get_load_errors",
lambda: [
PluginLoadError(
ENGINE_GROUP,
"broken",
"takopi.runners.broken:BACKEND",
"takopi",
"boom",
),
PluginLoadError(
TRANSPORT_GROUP,
"wire",
"takopi.transports.wire:BACKEND",
"takopi",
"missing",
),
PluginLoadError(
COMMAND_GROUP,
"hello",
"takopi.commands.hello:BACKEND",
"thirdparty",
"oops",
),
],
)
runner = CliRunner()
result = runner.invoke(cli.create_app(), ["plugins", "--load"])
assert result.exit_code == 0
assert "engine backends:" in result.output
assert "transport backends:" in result.output
assert "command backends:" in result.output
assert "codex (takopi) enabled" in result.output
assert "hello (thirdparty) disabled" in result.output
assert "errors:" in result.output
assert "engine broken (takopi): boom" in result.output
assert "transport wire (takopi): missing" in result.output
assert "command hello (thirdparty): oops" in result.output
assert ("engine", "codex") in calls
assert ("engine", "broken") in calls
assert ("transport", "telegram") in calls
assert ("command", "hello") not in calls
def test_onboarding_paths_calls_debug(monkeypatch) -> None:
called = {"count": 0}
def _debug() -> None:
called["count"] += 1
monkeypatch.setattr(cli.onboarding, "debug_onboarding_paths", _debug)
runner = CliRunner()
result = runner.invoke(cli.create_app(), ["onboarding-paths"])
assert result.exit_code == 0
assert called["count"] == 1
def test_config_path_cmd_outputs_override(tmp_path: Path) -> None:
config_path = tmp_path / "takopi.toml"
runner = CliRunner()
result = runner.invoke(
cli.create_app(),
["config", "path", "--config-path", str(config_path)],
)
assert result.exit_code == 0
assert result.output.strip() == str(config_path)
def test_config_path_cmd_defaults_to_home(monkeypatch, tmp_path: Path) -> None:
monkeypatch.setenv("HOME", str(tmp_path))
config_path = tmp_path / ".takopi" / "takopi.toml"
monkeypatch.setattr(cli, "HOME_CONFIG_PATH", config_path)
runner = CliRunner()
result = runner.invoke(cli.create_app(), ["config", "path"])
assert result.exit_code == 0
assert result.output.strip() == "~/.takopi/takopi.toml"
def test_doctor_rejects_non_telegram_transport(monkeypatch) -> None:
settings = TakopiSettings.model_validate(
{
"transport": "local",
"transports": {"telegram": {"bot_token": "token", "chat_id": 123}},
}
)
monkeypatch.setattr(cli, "load_settings", lambda: (settings, Path("x")))
runner = CliRunner()
result = runner.invoke(cli.create_app(), ["doctor"])
assert result.exit_code == 1
assert "telegram transport only" in result.output
+73
View File
@@ -1,9 +1,13 @@
from pathlib import Path
import pytest
from typer.testing import CliRunner
from takopi import cli
from takopi.config import ConfigError
from takopi.settings import TakopiSettings
from takopi.settings import TelegramTopicsSettings
from takopi.telegram.api_models import Chat, User
def _settings() -> TakopiSettings:
@@ -50,3 +54,72 @@ def test_doctor_errors_exit_nonzero(monkeypatch) -> None:
assert result.exit_code == 1
assert "telegram token: error" in result.output
class _FakeBot:
def __init__(self, me: User | None, chat: Chat | None) -> None:
self._me = me
self._chat = chat
self.closed = False
async def get_me(self) -> User | None:
return self._me
async def get_chat(self, chat_id: int) -> Chat | None:
_ = chat_id
return self._chat
async def close(self) -> None:
self.closed = True
@pytest.mark.anyio
async def test_doctor_telegram_checks_invalid_token(monkeypatch) -> None:
bot = _FakeBot(me=None, chat=None)
monkeypatch.setattr(cli, "TelegramClient", lambda _token: bot)
topics = TelegramTopicsSettings(enabled=True)
checks = await cli._doctor_telegram_checks(
"token",
123,
topics,
(),
)
assert [check.label for check in checks] == [
"telegram token",
"chat_id",
"topics",
]
assert checks[0].status == "error"
assert checks[1].detail == "skipped (token invalid)"
assert checks[2].detail == "skipped (token invalid)"
assert bot.closed is True
@pytest.mark.anyio
async def test_doctor_telegram_checks_chat_and_topics_error(monkeypatch) -> None:
bot = _FakeBot(
me=User(id=1, username="bot", first_name=None, last_name=None),
chat=None,
)
monkeypatch.setattr(cli, "TelegramClient", lambda _token: bot)
async def _raise_topics(*_args, **_kwargs) -> None:
raise ConfigError("bad topics")
monkeypatch.setattr(cli, "_validate_topics_setup_for", _raise_topics)
topics = TelegramTopicsSettings(enabled=True)
checks = await cli._doctor_telegram_checks(
"token",
321,
topics,
(),
)
assert checks[0].detail == "@bot"
assert checks[1].status == "error"
assert "unreachable" in (checks[1].detail or "")
assert checks[2].detail == "bad topics"
assert bot.closed is True
+177
View File
@@ -0,0 +1,177 @@
from __future__ import annotations
import pytest
from takopi import cli
from takopi.config import ConfigError
from takopi.lockfile import LockError
from takopi.settings import TakopiSettings
def _settings(overrides: dict | None = None) -> TakopiSettings:
payload = {
"transport": "telegram",
"transports": {"telegram": {"bot_token": "token", "chat_id": 123}},
}
if overrides:
payload.update(overrides)
return TakopiSettings.model_validate(payload)
def test_parse_key_path_valid() -> None:
assert cli._parse_key_path("transports.telegram.chat_id") == [
"transports",
"telegram",
"chat_id",
]
def test_parse_key_path_invalid_segment() -> None:
with pytest.raises(ConfigError):
cli._parse_key_path("transports..chat_id")
def test_parse_value_toml_and_fallback() -> None:
assert cli._parse_value("true") is True
assert cli._parse_value("123") == 123
assert cli._parse_value("not-toml") == "not-toml"
def test_toml_literal_and_error() -> None:
assert cli._toml_literal("hello") == '"hello"'
with pytest.raises(ConfigError):
cli._toml_literal({"a": 1})
def test_flatten_config() -> None:
flattened = cli._flatten_config(
{"transports": {"telegram": {"chat_id": 123}}, "watch_config": True}
)
assert ("transports.telegram.chat_id", 123) in flattened
assert ("watch_config", True) in flattened
def test_normalized_value_from_settings() -> None:
settings = _settings()
assert cli._normalized_value_from_settings(settings, ["transport"]) == "telegram"
assert (
cli._normalized_value_from_settings(
settings, ["transports", "telegram", "chat_id"]
)
== 123
)
def test_should_run_interactive(monkeypatch) -> None:
class _Tty:
def isatty(self) -> bool:
return True
class _NotTty:
def isatty(self) -> bool:
return False
monkeypatch.setenv("TAKOPI_NO_INTERACTIVE", "1")
assert cli._should_run_interactive() is False
monkeypatch.delenv("TAKOPI_NO_INTERACTIVE")
monkeypatch.setattr(cli.sys, "stdin", _Tty())
monkeypatch.setattr(cli.sys, "stdout", _Tty())
assert cli._should_run_interactive() is True
monkeypatch.setattr(cli.sys, "stdin", _NotTty())
monkeypatch.setattr(cli.sys, "stdout", _Tty())
assert cli._should_run_interactive() is False
def test_resolve_transport_id_override(monkeypatch) -> None:
assert cli._resolve_transport_id(" telegram ") == "telegram"
with pytest.raises(ConfigError):
cli._resolve_transport_id(" ")
def _raise() -> None:
raise ConfigError("boom")
monkeypatch.setattr(cli, "load_or_init_config", _raise)
assert cli._resolve_transport_id(None) == "telegram"
def test_doctor_file_checks() -> None:
settings = _settings()
checks = cli._doctor_file_checks(settings)
assert checks[0].detail == "disabled"
settings = _settings(
{
"transports": {
"telegram": {
"bot_token": "token",
"chat_id": 1,
"files": {"enabled": True},
}
}
}
)
checks = cli._doctor_file_checks(settings)
assert checks[0].status == "warning"
def test_doctor_voice_checks(monkeypatch) -> None:
settings = _settings()
checks = cli._doctor_voice_checks(settings)
assert checks[0].detail == "disabled"
settings = _settings(
{
"transports": {
"telegram": {
"bot_token": "token",
"chat_id": 1,
"voice_transcription": True,
}
}
}
)
monkeypatch.delenv("OPENAI_API_KEY", raising=False)
checks = cli._doctor_voice_checks(settings)
assert checks[0].status == "error"
monkeypatch.setenv("OPENAI_API_KEY", "key")
checks = cli._doctor_voice_checks(settings)
assert checks[0].status == "ok"
def test_load_settings_optional(monkeypatch, tmp_path) -> None:
def _raise() -> None:
raise ConfigError("boom")
monkeypatch.setattr(cli, "load_settings_if_exists", _raise)
assert cli._load_settings_optional() == (None, None)
monkeypatch.setattr(cli, "load_settings_if_exists", lambda: None)
assert cli._load_settings_optional() == (None, None)
settings = _settings()
config_path = tmp_path / "takopi.toml"
monkeypatch.setattr(cli, "load_settings_if_exists", lambda: (settings, config_path))
assert cli._load_settings_optional() == (settings, config_path)
def test_acquire_config_lock_reports_error(monkeypatch, tmp_path) -> None:
config_path = tmp_path / "takopi.toml"
error = LockError(path=config_path, state="running")
def _raise(*_args, **_kwargs):
raise error
messages: list[tuple[str, bool]] = []
monkeypatch.setattr(cli, "acquire_lock", _raise)
monkeypatch.setattr(
cli.typer, "echo", lambda msg, err=False: messages.append((msg, err))
)
with pytest.raises(cli.typer.Exit) as exc:
cli.acquire_config_lock(config_path, "token")
assert exc.value.exit_code == 1
assert any("already running" in msg for msg, _ in messages)
+204
View File
@@ -0,0 +1,204 @@
from __future__ import annotations
from pathlib import Path
import pytest
from takopi.backends import EngineConfig
from takopi.config import ConfigError
from takopi.events import EventFactory
from takopi.model import ActionEvent, CompletedEvent, StartedEvent
from takopi.runners.codex import (
CodexRunner,
_format_change_summary,
_normalize_change_list,
_parse_reconnect_message,
_short_tool_name,
_summarize_todo_list,
_summarize_tool_result,
_todo_title,
build_runner,
find_exec_only_flag,
translate_codex_event,
)
from takopi.schemas import codex as codex_schema
def test_codex_helper_functions() -> None:
assert find_exec_only_flag(["--json"]) == "--json"
assert find_exec_only_flag(["--output-schema=foo"]) == "--output-schema=foo"
assert find_exec_only_flag(["--model", "gpt-4"]) is None
assert _parse_reconnect_message("Reconnecting... 2/5") == (2, 5)
assert _parse_reconnect_message("Reconnecting... x/y") is None
assert _parse_reconnect_message("nope") is None
assert _short_tool_name("docs", "search") == "docs.search"
assert _short_tool_name(None, "search") == "search"
assert _short_tool_name(None, None) == "tool"
summary = _summarize_tool_result({"content": ["hi"], "structured": {"ok": True}})
assert summary == {"content_blocks": 1, "has_structured": True}
summary = _summarize_tool_result({"content": "hello", "structured_content": None})
assert summary == {"content_blocks": 1, "has_structured": False}
assert _summarize_tool_result({"other": 1}) is None
changes = [
codex_schema.FileUpdateChange(path="a.txt", kind="update"),
{"path": "b.txt", "kind": "delete"},
{"path": ""},
]
assert _normalize_change_list(changes) == [
{"path": "a.txt", "kind": "update"},
{"path": "b.txt", "kind": "delete"},
]
assert _format_change_summary(changes) == "a.txt, b.txt"
assert _format_change_summary([{"path": ""}]) == "1 files"
def test_summarize_todo_list_and_title() -> None:
items = [
codex_schema.TodoItem(text="first", completed=True),
codex_schema.TodoItem(text="next", completed=False),
{"text": "later", "completed": False},
]
summary = _summarize_todo_list(items)
assert summary.done == 1
assert summary.total == 3
assert summary.next_text == "next"
assert _todo_title(summary) == "todo 1/3: next"
done_summary = _summarize_todo_list([{"text": "done", "completed": True}])
assert _todo_title(done_summary) == "todo 1/1: done"
assert _todo_title(_summarize_todo_list("nope")) == "todo"
def test_translate_codex_events_for_items() -> None:
factory = EventFactory("codex")
event = codex_schema.ItemStarted(
item=codex_schema.WebSearchItem(id="w1", query="query")
)
out = translate_codex_event(event, title="Codex", factory=factory)
assert len(out) == 1
assert isinstance(out[0], ActionEvent)
assert out[0].action.kind == "web_search"
assert out[0].phase == "started"
event = codex_schema.ItemCompleted(
item=codex_schema.WebSearchItem(id="w1", query="query")
)
out = translate_codex_event(event, title="Codex", factory=factory)
assert isinstance(out[0], ActionEvent)
assert out[0].phase == "completed"
assert out[0].ok is True
event = codex_schema.ItemStarted(
item=codex_schema.ReasoningItem(id="r1", text="thinking")
)
out = translate_codex_event(event, title="Codex", factory=factory)
assert isinstance(out[0], ActionEvent)
assert out[0].action.kind == "note"
assert out[0].action.title == "thinking"
event = codex_schema.ItemUpdated(
item=codex_schema.TodoListItem(
id="t1",
items=[
codex_schema.TodoItem(text="todo one", completed=False),
codex_schema.TodoItem(text="todo two", completed=True),
],
)
)
out = translate_codex_event(event, title="Codex", factory=factory)
assert isinstance(out[0], ActionEvent)
assert out[0].action.detail["done"] == 1
assert out[0].action.detail["total"] == 2
assert "todo 1/2" in out[0].action.title
started = codex_schema.ItemStarted(
item=codex_schema.ErrorItem(id="e1", message="boom")
)
assert translate_codex_event(started, title="Codex", factory=factory) == []
completed = codex_schema.ItemCompleted(
item=codex_schema.ErrorItem(id="e1", message="boom")
)
out = translate_codex_event(completed, title="Codex", factory=factory)
assert isinstance(out[0], ActionEvent)
assert out[0].action.kind == "warning"
assert out[0].ok is False
def test_translate_codex_thread_started() -> None:
factory = EventFactory("codex")
event = codex_schema.ThreadStarted(thread_id="sess-1")
out = translate_codex_event(event, title="Codex", factory=factory)
assert len(out) == 1
assert isinstance(out[0], StartedEvent)
assert out[0].resume.value == "sess-1"
def test_codex_runner_translate_reconnect_message() -> None:
runner = CodexRunner(codex_cmd="codex", extra_args=[])
state = runner.new_state("hi", None)
event = codex_schema.StreamError(message="Reconnecting... 2/3")
out = runner.translate(event, state=state, resume=None, found_session=None)
assert len(out) == 1
assert isinstance(out[0], ActionEvent)
assert out[0].phase == "updated"
assert out[0].action.detail["attempt"] == 2
assert out[0].action.detail["max"] == 3
def test_codex_runner_process_and_stream_end_events() -> None:
runner = CodexRunner(codex_cmd="codex", extra_args=[])
state = runner.new_state("hi", None)
out = runner.process_error_events(2, resume=None, found_session=None, state=state)
assert len(out) == 2
completed = out[-1]
assert isinstance(completed, CompletedEvent)
assert completed.ok is False
end = runner.stream_end_events(resume=None, found_session=None, state=state)
assert len(end) == 1
end_event = end[0]
assert isinstance(end_event, CompletedEvent)
assert end_event.ok is False
started = translate_codex_event(
codex_schema.ThreadStarted(thread_id="sess-2"),
title="Codex",
factory=EventFactory("codex"),
)[0]
assert isinstance(started, StartedEvent)
end = runner.stream_end_events(
resume=None,
found_session=started.resume,
state=state,
)
end_event = end[0]
assert isinstance(end_event, CompletedEvent)
assert end_event.ok is True
def test_codex_build_runner_configs(tmp_path: Path) -> None:
cfg: EngineConfig = {}
runner = build_runner(cfg, tmp_path)
assert isinstance(runner, CodexRunner)
assert runner.extra_args == ["-c", "notify=[]"]
cfg = {"extra_args": ["--foo"], "profile": "Demo"}
runner = build_runner(cfg, tmp_path)
assert isinstance(runner, CodexRunner)
assert runner.extra_args[-2:] == ["--profile", "Demo"]
assert runner.session_title == "Demo"
with pytest.raises(ConfigError):
build_runner({"extra_args": ["--json"]}, tmp_path)
with pytest.raises(ConfigError):
build_runner({"extra_args": ["--foo", 1]}, tmp_path)
with pytest.raises(ConfigError):
build_runner({"profile": 123}, tmp_path)
+9 -9
View File
@@ -16,7 +16,7 @@ from tests.factories import action_completed, action_started
CODEX_ENGINE = "codex"
class _FakeTransport:
class FakeTransport:
def __init__(self) -> None:
self._next_id = 1
self.send_calls: list[dict] = []
@@ -181,7 +181,7 @@ def test_prepare_telegram_preserves_entities_on_truncate() -> None:
@pytest.mark.anyio
async def test_final_notify_sends_loud_final_message() -> None:
transport = _FakeTransport()
transport = FakeTransport()
runner = _return_runner(answer="ok")
cfg = ExecBridgeConfig(
transport=transport,
@@ -203,7 +203,7 @@ async def test_final_notify_sends_loud_final_message() -> None:
@pytest.mark.anyio
async def test_handle_message_strips_resume_line_from_prompt() -> None:
transport = _FakeTransport()
transport = FakeTransport()
runner = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE)
cfg = ExecBridgeConfig(
transport=transport,
@@ -228,7 +228,7 @@ async def test_handle_message_strips_resume_line_from_prompt() -> None:
@pytest.mark.anyio
async def test_long_final_message_edits_progress_message() -> None:
transport = _FakeTransport()
transport = FakeTransport()
runner = _return_runner(answer="x" * 10_000)
cfg = ExecBridgeConfig(
transport=transport,
@@ -251,7 +251,7 @@ async def test_long_final_message_edits_progress_message() -> None:
@pytest.mark.anyio
async def test_progress_edits_are_best_effort() -> None:
transport = _FakeTransport()
transport = FakeTransport()
clock = _FakeClock()
events: list[TakopiEvent] = [
action_started("item_0", "command", "echo 1"),
@@ -288,7 +288,7 @@ async def test_progress_edits_are_best_effort() -> None:
@pytest.mark.anyio
async def test_bridge_flow_sends_progress_edits_and_final_resume() -> None:
transport = _FakeTransport()
transport = FakeTransport()
clock = _FakeClock()
events: list[TakopiEvent] = [
action_started("item_0", "command", "echo ok"),
@@ -336,7 +336,7 @@ async def test_bridge_flow_sends_progress_edits_and_final_resume() -> None:
@pytest.mark.anyio
async def test_final_message_includes_ctx_line() -> None:
transport = _FakeTransport()
transport = FakeTransport()
clock = _FakeClock()
session_id = "123e4567-e89b-12d3-a456-426614174000"
runner = ScriptRunner(
@@ -367,7 +367,7 @@ async def test_final_message_includes_ctx_line() -> None:
@pytest.mark.anyio
async def test_handle_message_cancelled_renders_cancelled_state() -> None:
transport = _FakeTransport()
transport = FakeTransport()
session_id = "019b66fc-64c2-7a71-81cd-081c504cfeb2"
hold = anyio.Event()
runner = ScriptRunner(
@@ -414,7 +414,7 @@ async def test_handle_message_cancelled_renders_cancelled_state() -> None:
@pytest.mark.anyio
async def test_handle_message_error_preserves_resume_token() -> None:
transport = _FakeTransport()
transport = FakeTransport()
session_id = "019b66fc-64c2-7a71-81cd-081c504cfeb2"
runner = ScriptRunner(
[Raise(RuntimeError("boom"))],
+76
View File
@@ -1,5 +1,7 @@
from pathlib import Path
import subprocess
from takopi.utils.git import git_is_worktree, git_ok, git_run, git_stdout
from takopi.utils.git import resolve_default_base, resolve_main_worktree_root
@@ -53,3 +55,77 @@ def test_resolve_default_base_prefers_master_over_main(monkeypatch) -> None:
monkeypatch.setattr("takopi.utils.git.git_stdout", _fake_stdout)
monkeypatch.setattr("takopi.utils.git.git_ok", _fake_ok)
assert resolve_default_base(Path("/repo")) == "master"
def test_resolve_default_base_uses_origin_head(monkeypatch) -> None:
def _fake_stdout(args, **kwargs):
if args[:2] == ["symbolic-ref", "-q"]:
return "refs/remotes/origin/main"
return None
monkeypatch.setattr("takopi.utils.git.git_stdout", _fake_stdout)
assert resolve_default_base(Path("/repo")) == "main"
def test_resolve_default_base_uses_current_branch(monkeypatch) -> None:
def _fake_stdout(args, **kwargs):
if args[:2] == ["symbolic-ref", "-q"]:
return None
if args == ["branch", "--show-current"]:
return "feature"
return None
monkeypatch.setattr("takopi.utils.git.git_stdout", _fake_stdout)
assert resolve_default_base(Path("/repo")) == "feature"
def test_git_run_handles_missing_git(monkeypatch) -> None:
def _raise(*_args, **_kwargs):
raise FileNotFoundError
monkeypatch.setattr("takopi.utils.git.subprocess.run", _raise)
assert git_run(["status"], cwd=Path("/repo")) is None
def test_git_stdout_returns_none_on_error(monkeypatch) -> None:
def _fake_run(*_args, **_kwargs):
return subprocess.CompletedProcess(
args=["git"],
returncode=1,
stdout="oops",
stderr="",
)
monkeypatch.setattr("takopi.utils.git._run_git", _fake_run)
assert git_stdout(["status"], cwd=Path("/repo")) is None
def test_git_ok_false_when_run_missing(monkeypatch) -> None:
monkeypatch.setattr("takopi.utils.git._run_git", lambda *_args, **_kwargs: None)
assert git_ok(["status"], cwd=Path("/repo")) is False
def test_git_stdout_returns_trimmed_output(monkeypatch) -> None:
def _fake_run(*_args, **_kwargs):
return subprocess.CompletedProcess(
args=["git"],
returncode=0,
stdout=" ok \n",
stderr="",
)
monkeypatch.setattr("takopi.utils.git._run_git", _fake_run)
assert git_stdout(["status"], cwd=Path("/repo")) == "ok"
def test_git_is_worktree_false_when_no_top(monkeypatch) -> None:
monkeypatch.setattr("takopi.utils.git.git_stdout", lambda *_a, **_k: None)
assert git_is_worktree(Path("/repo")) is False
def test_git_is_worktree_matches_path(monkeypatch) -> None:
monkeypatch.setattr(
"takopi.utils.git.git_stdout",
lambda *_a, **_k: "/repo",
)
assert git_is_worktree(Path("/repo")) is True
+481
View File
@@ -0,0 +1,481 @@
from __future__ import annotations
from pathlib import Path
from typing import Any, cast
import pytest
from rich.table import Table
from rich.text import Text
from takopi.config import ConfigError
from takopi.telegram import onboarding
from takopi.telegram.api_models import Update, User
from takopi.telegram.client import TelegramRetryAfter
class DummyUI:
def __init__(
self,
*,
confirms: list[bool | None] | None = None,
selects: list[Any] | None = None,
passwords: list[str | None] | None = None,
) -> None:
self.confirms = iter(confirms or [])
self.selects = iter(selects or [])
self.passwords = iter(passwords or [])
self.printed: list[object] = []
self.steps: list[tuple[str, int]] = []
def panel(
self, title: str | None, body: str, *, border_style: str = "yellow"
) -> None:
self.printed.append(("panel", title, body, border_style))
def step(self, title: str, *, number: int) -> None:
self.steps.append((title, number))
def print(self, text: object = "", *, markup: bool | None = None) -> None:
_ = markup
self.printed.append(text)
async def confirm(self, _prompt: str, default: bool = True) -> bool | None:
_ = default
return next(self.confirms)
async def select(self, _prompt: str, choices: list[tuple[str, Any]]) -> Any | None:
_ = choices
return next(self.selects)
async def password(self, _prompt: str) -> str | None:
return next(self.passwords)
class DummyServices:
def __init__(
self,
*,
bot_info: list[User | None] | None = None,
chat: onboarding.ChatInfo | None = None,
topics_issue: ConfigError | None = None,
engines: list[tuple[str, bool, str | None]] | None = None,
config: dict[str, Any] | None = None,
) -> None:
self.bot_info = iter(bot_info or [])
self.chat = chat
self.topics_issue = topics_issue
self.engines = engines or []
self.config = config or {}
self.writes: list[tuple[Path, dict[str, Any]]] = []
async def get_bot_info(self, _token: str) -> User | None:
return next(self.bot_info)
async def wait_for_chat(self, _token: str) -> onboarding.ChatInfo:
assert self.chat is not None
return self.chat
async def validate_topics(
self, _token: str, _chat_id: int, _scope: onboarding.TopicScope
) -> ConfigError | None:
return self.topics_issue
def list_engines(self) -> list[tuple[str, bool, str | None]]:
return list(self.engines)
def read_config(self, _path: Path) -> dict[str, Any]:
return dict(self.config)
def write_config(self, path: Path, data: dict[str, Any]) -> None:
self.writes.append((path, data))
def test_chat_info_display_and_kind() -> None:
chat = onboarding.ChatInfo(
chat_id=1,
username="alice",
title=None,
first_name=None,
last_name=None,
chat_type="private",
)
assert chat.display == "@alice"
assert chat.kind == "private chat"
group = onboarding.ChatInfo(
chat_id=2,
username=None,
title="Team",
first_name=None,
last_name=None,
chat_type="supergroup",
)
assert group.display == 'group "Team"'
assert group.kind == 'supergroup "Team"'
channel = onboarding.ChatInfo(
chat_id=3,
username=None,
title=None,
first_name=None,
last_name=None,
chat_type="channel",
)
assert channel.display == "channel"
assert channel.kind == "channel"
unnamed = onboarding.ChatInfo(
chat_id=4,
username=None,
title=None,
first_name="Ada",
last_name="Lovelace",
chat_type=None,
)
assert unnamed.display == "Ada Lovelace"
assert unnamed.kind == "private chat"
def test_onboarding_state_helpers(tmp_path: Path) -> None:
state = onboarding.OnboardingState(config_path=tmp_path / "cfg", force=False)
assert state.is_stateful is False
assert state.bot_ref == "your bot"
state.session_mode = "chat"
assert state.is_stateful is True
state.session_mode = "stateless"
state.topics_enabled = True
assert state.is_stateful is True
state.bot_name = "Takopi"
assert state.bot_ref == "Takopi"
state.bot_username = "takopi_bot"
assert state.bot_ref == "@takopi_bot"
def test_display_path(tmp_path: Path) -> None:
home_path = Path.home() / "takopi" / "cfg.toml"
assert onboarding.display_path(home_path).startswith("~/")
assert onboarding.display_path(tmp_path / "cfg.toml") == str(tmp_path / "cfg.toml")
def test_build_transport_patch_requires_fields(tmp_path: Path) -> None:
state = onboarding.OnboardingState(config_path=tmp_path / "cfg", force=False)
with pytest.raises(RuntimeError, match="missing chat"):
onboarding.build_transport_patch(state, bot_token="x")
state.chat = onboarding.ChatInfo(
chat_id=1,
username=None,
title=None,
first_name=None,
last_name=None,
chat_type="private",
)
with pytest.raises(RuntimeError, match="missing session mode"):
onboarding.build_transport_patch(state, bot_token="x")
state.session_mode = "chat"
with pytest.raises(RuntimeError, match="missing resume choice"):
onboarding.build_transport_patch(state, bot_token="x")
def test_build_config_patch_and_merge(tmp_path: Path) -> None:
state = onboarding.OnboardingState(config_path=tmp_path / "cfg", force=False)
state.chat = onboarding.ChatInfo(
chat_id=10,
username=None,
title=None,
first_name=None,
last_name=None,
chat_type="private",
)
state.session_mode = "chat"
state.show_resume_line = False
state.default_engine = "codex"
state.topics_enabled = True
state.topics_scope = "all"
patch = onboarding.build_config_patch(state, bot_token="token")
assert patch["default_engine"] == "codex"
merged = onboarding.merge_config(
{"bot_token": "old", "chat_id": 1, "transports": {}},
patch,
config_path=tmp_path / "cfg.toml",
)
assert merged["transport"] == "telegram"
assert merged["transports"]["telegram"]["bot_token"] == "token"
assert merged["transports"]["telegram"]["topics"]["enabled"] is True
assert "bot_token" not in merged
assert "chat_id" not in merged
def test_render_helpers() -> None:
text = onboarding.render_botfather_instructions()
assert "BotFather" in text.plain
assert "send /newbot" in text.plain
topics = onboarding.render_topics_group_instructions("@bot")
assert "topics" in topics.plain
generic = onboarding.render_generic_capture_prompt("@bot")
assert "send /start" in generic.plain
warning = onboarding.render_topics_validation_warning(ConfigError("boom"))
assert "warning" in warning.plain
config_warning = onboarding.render_config_malformed_warning(ConfigError("bad"))
assert "config is malformed" in config_warning.plain
backup_warning = onboarding.render_backup_failed_warning(OSError("nope"))
assert "failed to back up" in backup_warning.plain
tabs = onboarding.render_persona_tabs()
assert isinstance(tabs, Table)
preview = onboarding.render_workspace_preview()
assert "memory-box" in preview.plain
assistant = onboarding.render_assistant_preview()
assert "make happy wings fit" in assistant.plain
handoff = onboarding.render_handoff_preview()
assert "resume" in handoff.plain
convo = Text()
onboarding.append_dialogue(convo, "bot", "hello", speaker_style="cyan")
assert "hello" in convo.plain
def test_render_engine_table_prints() -> None:
ui = DummyUI()
onboarding.render_engine_table(
cast(onboarding.UI, ui),
[("codex", True, None), ("other", False, "brew install other")],
)
assert ui.printed
@pytest.mark.anyio
async def test_get_bot_info_retries(monkeypatch) -> None:
class _Bot:
def __init__(self, _token: str) -> None:
self.calls = 0
self.closed = False
async def get_me(self) -> User | None:
self.calls += 1
if self.calls < 3:
raise TelegramRetryAfter(0)
return User(id=1, username="bot")
async def close(self) -> None:
self.closed = True
async def _sleep(_seconds: float) -> None:
return None
monkeypatch.setattr(onboarding, "TelegramClient", _Bot)
monkeypatch.setattr(onboarding.anyio, "sleep", _sleep)
info = await onboarding.get_bot_info("token")
assert info is not None
assert info.username == "bot"
@pytest.mark.anyio
async def test_get_bot_info_gives_up(monkeypatch) -> None:
class _Bot:
def __init__(self, _token: str) -> None:
self.calls = 0
async def get_me(self) -> User | None:
self.calls += 1
raise TelegramRetryAfter(0)
async def close(self) -> None:
return None
async def _sleep(_seconds: float) -> None:
return None
monkeypatch.setattr(onboarding, "TelegramClient", _Bot)
monkeypatch.setattr(onboarding.anyio, "sleep", _sleep)
info = await onboarding.get_bot_info("token")
assert info is None
@pytest.mark.anyio
async def test_wait_for_chat_filters_updates(monkeypatch) -> None:
updates = [
[Update(update_id=1, message={"from": {"is_bot": True}, "chat": {"id": 1}})],
None,
[],
[Update(update_id=2, message=None)],
[
Update(
update_id=3,
message={"from": {"is_bot": True}, "chat": {"id": 2}},
)
],
[
Update(
update_id=4,
message={"from": {"is_bot": False}, "chat": "nope"},
)
],
[
Update(
update_id=5,
message={"from": {"is_bot": False}, "chat": {"id": "bad"}},
)
],
[
Update(
update_id=6,
message={
"from": {"is_bot": False},
"chat": {"id": 7, "username": "bob", "type": "private"},
},
)
],
]
class _Bot:
def __init__(self, _token: str) -> None:
self.calls = 0
async def get_updates(self, *args, **kwargs):
_ = args, kwargs
idx = self.calls
self.calls += 1
return updates[idx]
async def close(self) -> None:
return None
async def _sleep(_seconds: float) -> None:
return None
monkeypatch.setattr(onboarding, "TelegramClient", _Bot)
monkeypatch.setattr(onboarding.anyio, "sleep", _sleep)
chat = await onboarding.wait_for_chat("token")
assert chat.chat_id == 7
assert chat.username == "bob"
@pytest.mark.anyio
async def test_validate_topics_onboarding_errors(monkeypatch) -> None:
class _Bot:
def __init__(self, _token: str) -> None:
return None
async def close(self) -> None:
return None
async def _raise_config(*_args, **_kwargs):
raise ConfigError("bad")
async def _raise_other(*_args, **_kwargs):
raise RuntimeError("boom")
monkeypatch.setattr(onboarding, "TelegramClient", _Bot)
monkeypatch.setattr(onboarding, "_validate_topics_setup_for", _raise_config)
err = await onboarding.validate_topics_onboarding("token", 1, "auto", ())
assert isinstance(err, ConfigError)
monkeypatch.setattr(onboarding, "_validate_topics_setup_for", _raise_other)
err = await onboarding.validate_topics_onboarding("token", 1, "auto", ())
assert isinstance(err, ConfigError)
assert "topics validation failed" in str(err)
@pytest.mark.anyio
async def test_prompt_token_success_and_retry() -> None:
ui = DummyUI(confirms=[True], passwords=["", "token", "token"]) # empty then retry
svc = DummyServices(
bot_info=[None, User(id=1, username="bot")],
)
token, info = await onboarding.prompt_token(
cast(onboarding.UI, ui), cast(onboarding.Services, svc)
)
assert token == "token"
assert info.username == "bot"
@pytest.mark.anyio
async def test_prompt_token_cancel_on_failure() -> None:
ui = DummyUI(confirms=[False], passwords=["token"])
svc = DummyServices(bot_info=[None])
with pytest.raises(onboarding.OnboardingCancelled):
await onboarding.prompt_token(
cast(onboarding.UI, ui), cast(onboarding.Services, svc)
)
@pytest.mark.anyio
async def test_capture_chat_sets_state() -> None:
chat = onboarding.ChatInfo(
chat_id=5,
username=None,
title="Team",
first_name=None,
last_name=None,
chat_type="group",
)
ui = DummyUI()
svc = DummyServices(chat=chat)
state = onboarding.OnboardingState(config_path=Path("/tmp/cfg"), force=False)
state.token = "token"
await onboarding.capture_chat(
cast(onboarding.UI, ui), cast(onboarding.Services, svc), state
)
assert state.chat == chat
@pytest.mark.anyio
async def test_step_capture_chat_workspace_switches_to_assistant(
tmp_path: Path,
) -> None:
chat = onboarding.ChatInfo(
chat_id=6,
username=None,
title=None,
first_name="Ada",
last_name=None,
chat_type="private",
)
ui = DummyUI(selects=["assistant"])
svc = DummyServices(chat=chat, topics_issue=ConfigError("nope"))
state = onboarding.OnboardingState(config_path=tmp_path / "cfg", force=False)
state.token = "token"
state.bot_name = "Takopi"
state.persona = "workspace"
state.topics_scope = "auto"
state.topics_enabled = True
await onboarding.step_capture_chat(
cast(onboarding.UI, ui), cast(onboarding.Services, svc), state
)
assert state.persona == "assistant"
assert state.topics_enabled is False
@pytest.mark.anyio
async def test_step_default_engine_no_installed(tmp_path: Path) -> None:
ui = DummyUI(confirms=[False])
svc = DummyServices(engines=[("codex", False, None)])
state = onboarding.OnboardingState(config_path=tmp_path / "cfg", force=False)
with pytest.raises(onboarding.OnboardingCancelled):
await onboarding.step_default_engine(
cast(onboarding.UI, ui), cast(onboarding.Services, svc), state
)
+9
View File
@@ -1,9 +1,18 @@
from collections.abc import Iterator
import pytest
from takopi import plugins
from tests.plugin_fixtures import FakeEntryPoint, install_entrypoints
@pytest.fixture(autouse=True)
def _reset_plugin_state() -> Iterator[None]:
plugins.reset_plugin_state()
yield
plugins.reset_plugin_state()
def test_list_ids_does_not_load_entrypoints(monkeypatch) -> None:
calls = {"count": 0}
@@ -0,0 +1,225 @@
from dataclasses import replace
from pathlib import Path
import pytest
from takopi.telegram.api_models import ChatMember
from takopi.telegram.commands.agent import _handle_agent_command
from takopi.telegram.commands.trigger import _handle_trigger_command
from takopi.telegram.chat_prefs import ChatPrefsStore
from takopi.telegram.topic_state import TopicStateStore
from takopi.telegram.types import TelegramIncomingMessage
from takopi.settings import TelegramTopicsSettings
from tests.telegram_fakes import FakeBot, FakeTransport, make_cfg
def _msg(
text: str,
*,
chat_id: int = 123,
message_id: int = 10,
sender_id: int | None = 42,
chat_type: str | None = "private",
thread_id: int | None = None,
) -> TelegramIncomingMessage:
return TelegramIncomingMessage(
transport="telegram",
chat_id=chat_id,
message_id=message_id,
text=text,
reply_to_message_id=None,
reply_to_text=None,
sender_id=sender_id,
chat_type=chat_type,
thread_id=thread_id,
)
def _last_text(transport: FakeTransport) -> str:
assert transport.send_calls
return transport.send_calls[-1]["message"].text
class _MemberBot(FakeBot):
async def get_chat_member(self, chat_id: int, user_id: int) -> ChatMember | None:
_ = chat_id, user_id
return ChatMember(status="member", can_manage_topics=False)
@pytest.mark.anyio
async def test_agent_show_private_defaults() -> None:
transport = FakeTransport()
cfg = make_cfg(transport)
msg = _msg("/agent", chat_type="private")
await _handle_agent_command(
cfg,
msg,
args_text="",
ambient_context=None,
topic_store=None,
chat_prefs=None,
)
text = _last_text(transport)
assert "agent: codex" in text
assert "available: codex" in text
@pytest.mark.anyio
async def test_agent_set_clear_group_admin(tmp_path: Path) -> None:
transport = FakeTransport()
cfg = make_cfg(transport)
prefs = ChatPrefsStore(tmp_path / "prefs.json")
msg = _msg("/agent set codex", chat_type="supergroup")
await _handle_agent_command(
cfg,
msg,
args_text="set codex",
ambient_context=None,
topic_store=None,
chat_prefs=prefs,
)
assert await prefs.get_default_engine(msg.chat_id) == "codex"
assert "chat default agent set" in _last_text(transport)
await _handle_agent_command(
cfg,
msg,
args_text="clear",
ambient_context=None,
topic_store=None,
chat_prefs=prefs,
)
assert await prefs.get_default_engine(msg.chat_id) is None
assert "chat default agent cleared" in _last_text(transport)
@pytest.mark.anyio
async def test_agent_set_denied_for_non_admin(tmp_path: Path) -> None:
transport = FakeTransport()
cfg = replace(make_cfg(transport), bot=_MemberBot())
prefs = ChatPrefsStore(tmp_path / "prefs.json")
msg = _msg("/agent set codex", chat_type="supergroup")
await _handle_agent_command(
cfg,
msg,
args_text="set codex",
ambient_context=None,
topic_store=None,
chat_prefs=prefs,
)
assert await prefs.get_default_engine(msg.chat_id) is None
assert "restricted to group admins" in _last_text(transport)
@pytest.mark.anyio
async def test_agent_set_invalid_engine(tmp_path: Path) -> None:
transport = FakeTransport()
cfg = make_cfg(transport)
msg = _msg("/agent set nope", chat_type="private")
await _handle_agent_command(
cfg,
msg,
args_text="set nope",
ambient_context=None,
topic_store=None,
chat_prefs=ChatPrefsStore(tmp_path / "prefs.json"),
)
text = _last_text(transport)
assert "unknown engine" in text
assert "available agents" in text
@pytest.mark.anyio
@pytest.mark.parametrize(
("topic_mode", "chat_mode", "expected_source", "expected_trigger"),
[
("mentions", None, "topic override", "mentions"),
(None, "mentions", "chat default", "mentions"),
(None, None, "default", "all"),
],
)
async def test_trigger_show_sources(
tmp_path: Path,
topic_mode: str | None,
chat_mode: str | None,
expected_source: str,
expected_trigger: str,
) -> None:
transport = FakeTransport()
cfg = replace(
make_cfg(transport),
topics=TelegramTopicsSettings(enabled=True, scope="all"),
)
topic_store = TopicStateStore(tmp_path / "topics.json")
chat_prefs = ChatPrefsStore(tmp_path / "prefs.json")
msg = _msg("/trigger", chat_type="supergroup", thread_id=7)
if topic_mode is not None:
await topic_store.set_trigger_mode(msg.chat_id, msg.thread_id or 0, topic_mode)
if chat_mode is not None:
await chat_prefs.set_trigger_mode(msg.chat_id, chat_mode)
await _handle_trigger_command(
cfg,
msg,
args_text="",
_ambient_context=None,
topic_store=topic_store,
chat_prefs=chat_prefs,
)
text = _last_text(transport)
assert f"trigger: {expected_trigger} ({expected_source})" in text
assert "available: all, mentions" in text
@pytest.mark.anyio
async def test_trigger_set_clear_permissions(tmp_path: Path) -> None:
transport = FakeTransport()
prefs = ChatPrefsStore(tmp_path / "prefs.json")
msg = _msg("/trigger mentions", chat_type="supergroup")
denied_cfg = replace(make_cfg(transport), bot=_MemberBot())
await _handle_trigger_command(
denied_cfg,
msg,
args_text="mentions",
_ambient_context=None,
topic_store=None,
chat_prefs=prefs,
)
assert await prefs.get_trigger_mode(msg.chat_id) is None
assert "restricted to group admins" in _last_text(transport)
transport = FakeTransport()
allow_cfg = make_cfg(transport)
await _handle_trigger_command(
allow_cfg,
msg,
args_text="mentions",
_ambient_context=None,
topic_store=None,
chat_prefs=prefs,
)
assert await prefs.get_trigger_mode(msg.chat_id) == "mentions"
assert "chat trigger mode set" in _last_text(transport)
await _handle_trigger_command(
allow_cfg,
msg,
args_text="clear",
_ambient_context=None,
topic_store=None,
chat_prefs=prefs,
)
assert await prefs.get_trigger_mode(msg.chat_id) is None
assert "chat trigger mode reset" in _last_text(transport)
+107 -367
View File
@@ -14,15 +14,7 @@ from takopi.telegram.commands.topics import _handle_topic_command
import takopi.telegram.loop as telegram_loop
import takopi.telegram.topics as telegram_topics
from takopi.directives import parse_directives
from takopi.telegram.api_models import (
Chat,
ChatMember,
File,
ForumTopic,
Message,
Update,
User,
)
from takopi.telegram.api_models import File, ForumTopic, Message, Update, User
from takopi.settings import TelegramFilesSettings, TelegramTopicsSettings
from takopi.telegram.bridge import (
TelegramBridgeConfig,
@@ -59,6 +51,13 @@ from takopi.telegram.types import (
)
from takopi.transport import MessageRef, RenderedMessage, SendOptions
from tests.plugin_fixtures import FakeEntryPoint, install_entrypoints
from tests.telegram_fakes import (
FakeBot,
FakeTransport,
_empty_projects,
make_cfg,
_make_router,
)
CODEX_ENGINE = "codex"
FAST_FORWARD_COALESCE_S = 0.0
@@ -67,271 +66,12 @@ BATCH_MEDIA_GROUP_DEBOUNCE_S = 0.05
DEBOUNCE_FORWARD_COALESCE_S = 0.05
def _empty_projects() -> ProjectsConfig:
return ProjectsConfig(projects={}, default_project=None)
def _make_router(runner) -> AutoRouter:
return AutoRouter(
entries=[RunnerEntry(engine=runner.engine, runner=runner)],
default_engine=runner.engine,
)
class _NoopTaskGroup:
def start_soon(self, func, *args: Any) -> None:
_ = func, args
return None
class _FakeTransport:
def __init__(self, progress_ready: anyio.Event | None = None) -> None:
self._next_id = 1
self.send_calls: list[dict] = []
self.edit_calls: list[dict] = []
self.delete_calls: list[MessageRef] = []
self.progress_ready = progress_ready
self.progress_ref: MessageRef | None = None
async def send(
self,
*,
channel_id: int | str,
message: RenderedMessage,
options: SendOptions | None = None,
) -> MessageRef:
ref = MessageRef(channel_id=channel_id, message_id=self._next_id)
self._next_id += 1
self.send_calls.append(
{
"ref": ref,
"channel_id": channel_id,
"message": message,
"options": options,
}
)
if (
self.progress_ref is None
and options is not None
and options.reply_to is not None
and options.notify is False
):
self.progress_ref = ref
if self.progress_ready is not None:
self.progress_ready.set()
return ref
async def edit(
self, *, ref: MessageRef, message: RenderedMessage, wait: bool = True
) -> MessageRef:
self.edit_calls.append({"ref": ref, "message": message, "wait": wait})
return ref
async def delete(self, *, ref: MessageRef) -> bool:
self.delete_calls.append(ref)
return True
async def close(self) -> None:
return None
class _FakeBot(BotClient):
def __init__(self) -> None:
self.command_calls: list[dict] = []
self.callback_calls: list[dict] = []
self.send_calls: list[dict] = []
self.document_calls: list[dict] = []
self.edit_calls: list[dict] = []
self.edit_topic_calls: list[dict[str, Any]] = []
self.delete_calls: list[dict] = []
async def get_updates(
self,
offset: int | None,
timeout_s: int = 50,
allowed_updates: list[str] | None = None,
) -> list[Update] | None:
_ = offset
_ = timeout_s
_ = allowed_updates
return []
async def get_file(self, file_id: str) -> File | None:
_ = file_id
return None
async def download_file(self, file_path: str) -> bytes | None:
_ = file_path
return None
async def send_message(
self,
chat_id: int,
text: str,
reply_to_message_id: int | None = None,
disable_notification: bool | None = False,
message_thread_id: int | None = None,
entities: list[dict[str, Any]] | None = None,
parse_mode: str | None = None,
reply_markup: dict | None = None,
*,
replace_message_id: int | None = None,
) -> Message:
self.send_calls.append(
{
"chat_id": chat_id,
"text": text,
"reply_to_message_id": reply_to_message_id,
"disable_notification": disable_notification,
"message_thread_id": message_thread_id,
"entities": entities,
"parse_mode": parse_mode,
"reply_markup": reply_markup,
"replace_message_id": replace_message_id,
}
)
return Message(message_id=1)
async def send_document(
self,
chat_id: int,
filename: str,
content: bytes,
reply_to_message_id: int | None = None,
message_thread_id: int | None = None,
disable_notification: bool | None = False,
caption: str | None = None,
) -> Message:
self.document_calls.append(
{
"chat_id": chat_id,
"filename": filename,
"content": content,
"reply_to_message_id": reply_to_message_id,
"message_thread_id": message_thread_id,
"disable_notification": disable_notification,
"caption": caption,
}
)
return Message(message_id=2)
async def edit_message_text(
self,
chat_id: int,
message_id: int,
text: str,
entities: list[dict[str, Any]] | None = None,
parse_mode: str | None = None,
reply_markup: dict | None = None,
*,
wait: bool = True,
) -> Message:
self.edit_calls.append(
{
"chat_id": chat_id,
"message_id": message_id,
"text": text,
"entities": entities,
"parse_mode": parse_mode,
"reply_markup": reply_markup,
"wait": wait,
}
)
return Message(message_id=message_id)
async def delete_message(self, chat_id: int, message_id: int) -> bool:
self.delete_calls.append({"chat_id": chat_id, "message_id": message_id})
return True
async def set_my_commands(
self,
commands: list[dict[str, Any]],
*,
scope: dict[str, Any] | None = None,
language_code: str | None = None,
) -> bool:
self.command_calls.append(
{
"commands": commands,
"scope": scope,
"language_code": language_code,
}
)
return True
async def get_me(self) -> User | None:
return User(id=1, username="bot")
async def get_chat(self, chat_id: int) -> Chat | None:
_ = chat_id
return Chat(id=chat_id, type="supergroup", is_forum=True)
async def get_chat_member(self, chat_id: int, user_id: int) -> ChatMember | None:
_ = chat_id
_ = user_id
return ChatMember(status="administrator", can_manage_topics=True)
async def create_forum_topic(self, chat_id: int, name: str) -> ForumTopic | None:
_ = chat_id
_ = name
return ForumTopic(message_thread_id=1)
async def edit_forum_topic(
self, chat_id: int, message_thread_id: int, name: str
) -> bool:
self.edit_topic_calls.append(
{
"chat_id": chat_id,
"message_thread_id": message_thread_id,
"name": name,
}
)
return True
async def close(self) -> None:
return None
async def answer_callback_query(
self,
callback_query_id: str,
text: str | None = None,
show_alert: bool | None = None,
) -> bool:
self.callback_calls.append(
{
"callback_query_id": callback_query_id,
"text": text,
"show_alert": show_alert,
}
)
return True
def _make_cfg(
transport: _FakeTransport, runner: ScriptRunner | None = None
) -> TelegramBridgeConfig:
if runner is None:
runner = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE)
exec_cfg = ExecBridgeConfig(
transport=transport,
presenter=MarkdownPresenter(),
final_notify=True,
)
runtime = TransportRuntime(
router=_make_router(runner),
projects=_empty_projects(),
)
return TelegramBridgeConfig(
bot=_FakeBot(),
runtime=runtime,
chat_id=123,
startup_msg="",
exec_cfg=exec_cfg,
forward_coalesce_s=FAST_FORWARD_COALESCE_S,
media_group_debounce_s=FAST_MEDIA_GROUP_DEBOUNCE_S,
)
def test_parse_directives_inline_engine() -> None:
directives = parse_directives(
"/claude do it",
@@ -547,7 +287,7 @@ def test_telegram_presenter_split_overflow_adds_followups() -> None:
@pytest.mark.anyio
async def test_telegram_transport_passes_replace_and_wait() -> None:
bot = _FakeBot()
bot = FakeBot()
transport = TelegramTransport(bot)
reply = MessageRef(channel_id=123, message_id=10)
replace = MessageRef(channel_id=123, message_id=11)
@@ -571,7 +311,7 @@ async def test_telegram_transport_passes_replace_and_wait() -> None:
@pytest.mark.anyio
async def test_telegram_transport_passes_reply_markup() -> None:
bot = _FakeBot()
bot = FakeBot()
transport = TelegramTransport(bot)
markup = {"inline_keyboard": []}
@@ -593,7 +333,7 @@ async def test_telegram_transport_passes_reply_markup() -> None:
@pytest.mark.anyio
async def test_telegram_transport_sends_followups() -> None:
bot = _FakeBot()
bot = FakeBot()
transport = TelegramTransport(bot)
reply = MessageRef(channel_id=123, message_id=10)
followup = RenderedMessage(text="part 2")
@@ -614,7 +354,7 @@ async def test_telegram_transport_sends_followups() -> None:
@pytest.mark.anyio
async def test_telegram_transport_edits_and_sends_followups() -> None:
bot = _FakeBot()
bot = FakeBot()
transport = TelegramTransport(bot)
followup = RenderedMessage(text="part 2")
@@ -772,8 +512,8 @@ async def test_telegram_transport_edit_wait_false_returns_ref() -> None:
@pytest.mark.anyio
async def test_handle_cancel_without_reply_prompts_user() -> None:
transport = _FakeTransport()
cfg = _make_cfg(transport)
transport = FakeTransport()
cfg = make_cfg(transport)
msg = TelegramIncomingMessage(
transport="telegram",
chat_id=123,
@@ -793,8 +533,8 @@ async def test_handle_cancel_without_reply_prompts_user() -> None:
@pytest.mark.anyio
async def test_handle_cancel_with_no_progress_message_says_nothing_running() -> None:
transport = _FakeTransport()
cfg = _make_cfg(transport)
transport = FakeTransport()
cfg = make_cfg(transport)
msg = TelegramIncomingMessage(
transport="telegram",
chat_id=123,
@@ -814,8 +554,8 @@ async def test_handle_cancel_with_no_progress_message_says_nothing_running() ->
@pytest.mark.anyio
async def test_handle_cancel_with_finished_task_says_nothing_running() -> None:
transport = _FakeTransport()
cfg = _make_cfg(transport)
transport = FakeTransport()
cfg = make_cfg(transport)
progress_id = 99
msg = TelegramIncomingMessage(
transport="telegram",
@@ -836,8 +576,8 @@ async def test_handle_cancel_with_finished_task_says_nothing_running() -> None:
@pytest.mark.anyio
async def test_handle_cancel_cancels_running_task() -> None:
transport = _FakeTransport()
cfg = _make_cfg(transport)
transport = FakeTransport()
cfg = make_cfg(transport)
progress_id = 42
msg = TelegramIncomingMessage(
transport="telegram",
@@ -859,8 +599,8 @@ async def test_handle_cancel_cancels_running_task() -> None:
@pytest.mark.anyio
async def test_handle_cancel_only_cancels_matching_progress_message() -> None:
transport = _FakeTransport()
cfg = _make_cfg(transport)
transport = FakeTransport()
cfg = make_cfg(transport)
task_first = RunningTask()
task_second = RunningTask()
msg = TelegramIncomingMessage(
@@ -886,8 +626,8 @@ async def test_handle_cancel_only_cancels_matching_progress_message() -> None:
@pytest.mark.anyio
async def test_handle_cancel_cancels_queued_job() -> None:
transport = _FakeTransport()
cfg = _make_cfg(transport)
transport = FakeTransport()
cfg = make_cfg(transport)
async def _noop_run_job(_) -> None:
return None
@@ -924,7 +664,7 @@ async def test_handle_cancel_cancels_queued_job() -> None:
async def test_handle_file_put_writes_file(tmp_path: Path) -> None:
payload = b"hello"
class _FileBot(_FakeBot):
class _FileBot(FakeBot):
async def get_file(self, file_id: str) -> File | None:
_ = file_id
return File(file_path="files/hello.txt")
@@ -933,7 +673,7 @@ async def test_handle_file_put_writes_file(tmp_path: Path) -> None:
_ = file_path
return payload
transport = _FakeTransport()
transport = FakeTransport()
bot = _FileBot()
runner = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE)
projects = ProjectsConfig(
@@ -998,8 +738,8 @@ async def test_handle_file_get_sends_document_for_allowed_user(
target = tmp_path / "hello.txt"
target.write_bytes(payload)
transport = _FakeTransport()
bot = _FakeBot()
transport = FakeTransport()
bot = FakeBot()
runner = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE)
projects = ProjectsConfig(
projects={
@@ -1050,8 +790,8 @@ async def test_handle_file_get_sends_document_for_allowed_user(
@pytest.mark.anyio
async def test_handle_callback_cancel_cancels_running_task() -> None:
transport = _FakeTransport()
cfg = _make_cfg(transport)
transport = FakeTransport()
cfg = make_cfg(transport)
progress_id = 42
running_task = RunningTask()
running_tasks = {MessageRef(channel_id=123, message_id=progress_id): running_task}
@@ -1068,15 +808,15 @@ async def test_handle_callback_cancel_cancels_running_task() -> None:
assert running_task.cancel_requested.is_set() is True
assert len(transport.send_calls) == 0
bot = cast(_FakeBot, cfg.bot)
bot = cast(FakeBot, cfg.bot)
assert bot.callback_calls
assert bot.callback_calls[-1]["text"] == "cancelling..."
@pytest.mark.anyio
async def test_handle_callback_cancel_cancels_queued_job() -> None:
transport = _FakeTransport()
cfg = _make_cfg(transport)
transport = FakeTransport()
cfg = make_cfg(transport)
async def _noop_run_job(_) -> None:
return None
@@ -1105,15 +845,15 @@ async def test_handle_callback_cancel_cancels_queued_job() -> None:
assert transport.edit_calls
assert "cancelled" in transport.edit_calls[0]["message"].text.lower()
bot = cast(_FakeBot, cfg.bot)
bot = cast(FakeBot, cfg.bot)
assert bot.callback_calls
assert bot.callback_calls[-1]["text"] == "dropped from queue."
@pytest.mark.anyio
async def test_handle_callback_cancel_without_task_acknowledges() -> None:
transport = _FakeTransport()
cfg = _make_cfg(transport)
transport = FakeTransport()
cfg = make_cfg(transport)
query = TelegramCallbackQuery(
transport="telegram",
chat_id=123,
@@ -1126,7 +866,7 @@ async def test_handle_callback_cancel_without_task_acknowledges() -> None:
await handle_callback_cancel(cfg, query, {})
assert len(transport.send_calls) == 0
bot = cast(_FakeBot, cfg.bot)
bot = cast(FakeBot, cfg.bot)
assert bot.callback_calls
assert "nothing is currently running" in bot.callback_calls[-1]["text"].lower()
@@ -1176,8 +916,8 @@ def test_is_forwarded_detects_forward_fields() -> None:
def test_topic_title_matches_command_syntax() -> None:
transport = _FakeTransport()
cfg = _make_cfg(transport)
transport = FakeTransport()
cfg = make_cfg(transport)
title = telegram_topics._topic_title(
runtime=cfg.runtime,
@@ -1202,9 +942,9 @@ def test_topic_title_matches_command_syntax() -> None:
def test_topic_title_projects_scope_includes_project() -> None:
transport = _FakeTransport()
transport = FakeTransport()
cfg = replace(
_make_cfg(transport),
make_cfg(transport),
topics=TelegramTopicsSettings(
enabled=True,
scope="projects",
@@ -1221,8 +961,8 @@ def test_topic_title_projects_scope_includes_project() -> None:
@pytest.mark.anyio
async def test_maybe_rename_topic_updates_title(tmp_path: Path) -> None:
transport = _FakeTransport()
cfg = _make_cfg(transport)
transport = FakeTransport()
cfg = make_cfg(transport)
store = TopicStateStore(tmp_path / "telegram_topics_state.json")
await store.set_context(
@@ -1240,7 +980,7 @@ async def test_maybe_rename_topic_updates_title(tmp_path: Path) -> None:
context=RunContext(project="takopi", branch="new"),
)
bot = cast(_FakeBot, cfg.bot)
bot = cast(FakeBot, cfg.bot)
assert bot.edit_topic_calls
assert bot.edit_topic_calls[-1]["name"] == "takopi @new"
snapshot = await store.get_thread(123, 77)
@@ -1250,8 +990,8 @@ async def test_maybe_rename_topic_updates_title(tmp_path: Path) -> None:
@pytest.mark.anyio
async def test_maybe_rename_topic_skips_when_title_matches(tmp_path: Path) -> None:
transport = _FakeTransport()
cfg = _make_cfg(transport)
transport = FakeTransport()
cfg = make_cfg(transport)
store = TopicStateStore(tmp_path / "telegram_topics_state.json")
await store.set_context(
@@ -1271,13 +1011,13 @@ async def test_maybe_rename_topic_skips_when_title_matches(tmp_path: Path) -> No
snapshot=snapshot,
)
bot = cast(_FakeBot, cfg.bot)
bot = cast(FakeBot, cfg.bot)
assert bot.edit_topic_calls == []
@pytest.mark.anyio
async def test_topic_command_recreates_stale_topic(tmp_path: Path) -> None:
class _StaleTopicBot(_FakeBot):
class _StaleTopicBot(FakeBot):
def __init__(self) -> None:
super().__init__()
self.create_topic_calls: list[dict[str, Any]] = []
@@ -1300,7 +1040,7 @@ async def test_topic_command_recreates_stale_topic(tmp_path: Path) -> None:
)
return False
transport = _FakeTransport()
transport = FakeTransport()
bot = _StaleTopicBot()
runner = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE)
projects = ProjectsConfig(
@@ -1365,8 +1105,8 @@ async def test_topic_command_recreates_stale_topic(tmp_path: Path) -> None:
@pytest.mark.anyio
async def test_model_command_show_reports_overrides(tmp_path: Path) -> None:
transport = _FakeTransport()
cfg = _make_cfg(transport)
transport = FakeTransport()
cfg = make_cfg(transport)
cfg = replace(cfg, topics=TelegramTopicsSettings(enabled=True, scope="main"))
chat_prefs = ChatPrefsStore(tmp_path / "telegram_chat_prefs_state.json")
topic_store = TopicStateStore(tmp_path / "telegram_topics_state.json")
@@ -1412,8 +1152,8 @@ async def test_model_command_show_reports_overrides(tmp_path: Path) -> None:
@pytest.mark.anyio
async def test_model_command_set_and_clear_chat_override(tmp_path: Path) -> None:
transport = _FakeTransport()
cfg = _make_cfg(transport)
transport = FakeTransport()
cfg = make_cfg(transport)
chat_prefs = ChatPrefsStore(tmp_path / "telegram_chat_prefs_state.json")
await chat_prefs.set_engine_override(
123,
@@ -1472,8 +1212,8 @@ async def test_model_command_set_and_clear_chat_override(tmp_path: Path) -> None
@pytest.mark.anyio
async def test_reasoning_command_set_and_clear_topic_override(tmp_path: Path) -> None:
transport = _FakeTransport()
cfg = _make_cfg(transport)
transport = FakeTransport()
cfg = make_cfg(transport)
cfg = replace(cfg, topics=TelegramTopicsSettings(enabled=True, scope="main"))
topic_store = TopicStateStore(tmp_path / "telegram_topics_state.json")
await topic_store.set_engine_override(
@@ -1542,8 +1282,8 @@ async def test_reasoning_command_set_and_clear_topic_override(tmp_path: Path) ->
@pytest.mark.anyio
async def test_reasoning_command_show_reports_overrides(tmp_path: Path) -> None:
transport = _FakeTransport()
cfg = _make_cfg(transport)
transport = FakeTransport()
cfg = make_cfg(transport)
cfg = replace(cfg, topics=TelegramTopicsSettings(enabled=True, scope="main"))
chat_prefs = ChatPrefsStore(tmp_path / "telegram_chat_prefs_state.json")
topic_store = TopicStateStore(tmp_path / "telegram_topics_state.json")
@@ -1589,8 +1329,8 @@ async def test_reasoning_command_show_reports_overrides(tmp_path: Path) -> None:
@pytest.mark.anyio
async def test_send_with_resume_waits_for_token() -> None:
transport = _FakeTransport()
cfg = _make_cfg(transport)
transport = FakeTransport()
cfg = make_cfg(transport)
sent: list[
tuple[
int,
@@ -1664,8 +1404,8 @@ async def test_send_with_resume_waits_for_token() -> None:
@pytest.mark.anyio
async def test_send_with_resume_reports_when_missing() -> None:
transport = _FakeTransport()
cfg = _make_cfg(transport)
transport = FakeTransport()
cfg = make_cfg(transport)
sent: list[
tuple[
int,
@@ -1766,8 +1506,8 @@ async def test_run_main_loop_routes_reply_to_running_resume() -> None:
reply_ready = anyio.Event()
hold = anyio.Event()
transport = _FakeTransport(progress_ready=progress_ready)
bot = _FakeBot()
transport = FakeTransport(progress_ready=progress_ready)
bot = FakeBot()
resume_value = "abc123"
runner = ScriptRunner(
[Wait(hold), Sleep(0.05), Return(answer="ok")],
@@ -1845,8 +1585,8 @@ async def test_run_main_loop_persists_topic_sessions_in_project_scope(
project_chat_id = -100
resume_value = "resume-123"
transport = _FakeTransport()
bot = _FakeBot()
transport = FakeTransport()
bot = FakeBot()
runner = ScriptRunner(
[Return(answer="ok")],
engine=CODEX_ENGINE,
@@ -1924,8 +1664,8 @@ async def test_run_main_loop_auto_resumes_topic_default_engine(
)
await store.set_default_engine(123, 77, "claude")
transport = _FakeTransport()
bot = _FakeBot()
transport = FakeTransport()
bot = FakeBot()
codex_runner = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE)
claude_runner = ScriptRunner([Return(answer="ok")], engine="claude")
router = AutoRouter(
@@ -1996,8 +1736,8 @@ async def test_run_main_loop_auto_resumes_chat_sessions(tmp_path: Path) -> None:
resume_value = "resume-123"
state_path = tmp_path / "takopi.toml"
transport = _FakeTransport()
bot = _FakeBot()
transport = FakeTransport()
bot = FakeBot()
runner = ScriptRunner(
[Return(answer="ok")],
engine=CODEX_ENGINE,
@@ -2096,7 +1836,7 @@ async def test_run_main_loop_prompt_upload_uses_caption_directives(
proj_dir.mkdir()
other_dir.mkdir()
class _UploadBot(_FakeBot):
class _UploadBot(FakeBot):
async def get_file(self, file_id: str) -> File | None:
_ = file_id
return File(file_path="files/hello.txt")
@@ -2105,7 +1845,7 @@ async def test_run_main_loop_prompt_upload_uses_caption_directives(
_ = file_path
return payload
transport = _FakeTransport()
transport = FakeTransport()
bot = _UploadBot()
runner = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE)
exec_cfg = ExecBridgeConfig(
@@ -2188,14 +1928,14 @@ async def test_run_main_loop_voice_transcript_preserves_directive(
default_engine=claude_runner.engine,
)
runtime = TransportRuntime(router=router, projects=_empty_projects())
transport = _FakeTransport()
transport = FakeTransport()
exec_cfg = ExecBridgeConfig(
transport=transport,
presenter=MarkdownPresenter(),
final_notify=True,
)
cfg = TelegramBridgeConfig(
bot=_FakeBot(),
bot=FakeBot(),
runtime=runtime,
chat_id=123,
startup_msg="",
@@ -2259,14 +1999,14 @@ async def test_run_main_loop_debounces_forwarded_messages_preserves_directives()
default_engine=claude_runner.engine,
)
runtime = TransportRuntime(router=router, projects=_empty_projects())
transport = _FakeTransport()
transport = FakeTransport()
exec_cfg = ExecBridgeConfig(
transport=transport,
presenter=MarkdownPresenter(),
final_notify=True,
)
cfg = TelegramBridgeConfig(
bot=_FakeBot(),
bot=FakeBot(),
runtime=runtime,
chat_id=123,
startup_msg="",
@@ -2329,14 +2069,14 @@ async def test_run_main_loop_debounces_forwarded_messages_preserves_directives()
async def test_run_main_loop_ignores_forwarded_without_prompt() -> None:
runner = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE)
runtime = TransportRuntime(router=_make_router(runner), projects=_empty_projects())
transport = _FakeTransport()
transport = FakeTransport()
exec_cfg = ExecBridgeConfig(
transport=transport,
presenter=MarkdownPresenter(),
final_notify=True,
)
cfg = TelegramBridgeConfig(
bot=_FakeBot(),
bot=FakeBot(),
runtime=runtime,
chat_id=123,
startup_msg="",
@@ -2378,7 +2118,7 @@ async def test_run_main_loop_forwarded_document_still_uploads(
) -> None:
payload = b"hello"
class _UploadBot(_FakeBot):
class _UploadBot(FakeBot):
async def get_file(self, file_id: str) -> File | None:
_ = file_id
return File(file_path="files/hello.txt")
@@ -2399,7 +2139,7 @@ async def test_run_main_loop_forwarded_document_still_uploads(
default_project="proj",
)
runtime = TransportRuntime(router=_make_router(runner), projects=projects)
transport = _FakeTransport()
transport = FakeTransport()
exec_cfg = ExecBridgeConfig(
transport=transport,
presenter=MarkdownPresenter(),
@@ -2460,7 +2200,7 @@ async def test_run_main_loop_prompt_upload_auto_resumes_chat_sessions(
project_dir = tmp_path / "proj"
project_dir.mkdir()
class _UploadBot(_FakeBot):
class _UploadBot(FakeBot):
async def get_file(self, file_id: str) -> File | None:
_ = file_id
return File(file_path="files/hello.txt")
@@ -2481,7 +2221,7 @@ async def test_run_main_loop_prompt_upload_auto_resumes_chat_sessions(
)
bot = _UploadBot()
transport = _FakeTransport()
transport = FakeTransport()
runner = ScriptRunner(
[Return(answer="ok")],
engine=CODEX_ENGINE,
@@ -2538,7 +2278,7 @@ async def test_run_main_loop_prompt_upload_auto_resumes_chat_sessions(
stored = await store.get_session_resume(123, None, CODEX_ENGINE)
assert stored == ResumeToken(engine=CODEX_ENGINE, value=resume_value)
transport2 = _FakeTransport()
transport2 = FakeTransport()
runner2 = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE)
exec_cfg2 = ExecBridgeConfig(
transport=transport2,
@@ -2619,8 +2359,8 @@ async def test_run_main_loop_command_updates_chat_session_resume(
resume_value = "resume-123"
state_path = tmp_path / "takopi.toml"
transport = _FakeTransport()
bot = _FakeBot()
transport = FakeTransport()
bot = FakeBot()
runner = ScriptRunner(
[Return(answer="ok")],
engine=CODEX_ENGINE,
@@ -2666,7 +2406,7 @@ async def test_run_main_loop_command_updates_chat_session_resume(
stored = await store.get_session_resume(123, None, CODEX_ENGINE)
assert stored == ResumeToken(engine=CODEX_ENGINE, value=resume_value)
transport2 = _FakeTransport()
transport2 = FakeTransport()
runner2 = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE)
exec_cfg2 = ExecBridgeConfig(
transport=transport2,
@@ -2717,8 +2457,8 @@ async def test_run_main_loop_hides_resume_line_when_disabled(
resume_value = "resume-123"
state_path = tmp_path / "takopi.toml"
transport = _FakeTransport()
bot = _FakeBot()
transport = FakeTransport()
bot = FakeBot()
runner = ScriptRunner(
[Return(answer="ok")],
engine=CODEX_ENGINE,
@@ -2782,8 +2522,8 @@ async def test_run_main_loop_chat_sessions_isolate_group_senders(
resume_value = "resume-group"
state_path = tmp_path / "takopi.toml"
transport = _FakeTransport()
bot = _FakeBot()
transport = FakeTransport()
bot = FakeBot()
runner = ScriptRunner(
[Return(answer="ok")],
engine=CODEX_ENGINE,
@@ -2866,8 +2606,8 @@ async def test_run_main_loop_new_clears_chat_sessions(tmp_path: Path) -> None:
123, None, ResumeToken(engine=CODEX_ENGINE, value="resume-1")
)
transport = _FakeTransport()
bot = _FakeBot()
transport = FakeTransport()
bot = FakeBot()
runner = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE)
exec_cfg = ExecBridgeConfig(
transport=transport,
@@ -2916,8 +2656,8 @@ async def test_run_main_loop_new_clears_topic_sessions(tmp_path: Path) -> None:
123, 77, ResumeToken(engine=CODEX_ENGINE, value="resume-1")
)
transport = _FakeTransport()
bot = _FakeBot()
transport = FakeTransport()
bot = FakeBot()
runner = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE)
exec_cfg = ExecBridgeConfig(
transport=transport,
@@ -2962,8 +2702,8 @@ async def test_run_main_loop_new_clears_topic_sessions(tmp_path: Path) -> None:
@pytest.mark.anyio
async def test_run_main_loop_replies_in_same_thread() -> None:
transport = _FakeTransport()
bot = _FakeBot()
transport = FakeTransport()
bot = FakeBot()
runner = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE)
exec_cfg = ExecBridgeConfig(
transport=transport,
@@ -3020,7 +2760,7 @@ async def test_run_main_loop_batches_media_group_upload(
"doc-2": "photos/file_2.jpg",
}
class _MediaBot(_FakeBot):
class _MediaBot(FakeBot):
async def get_file(self, file_id: str) -> File | None:
file_path = file_map.get(file_id)
if file_path is None:
@@ -3030,7 +2770,7 @@ async def test_run_main_loop_batches_media_group_upload(
async def download_file(self, file_path: str) -> bytes | None:
return payloads.get(file_path)
transport = _FakeTransport()
transport = FakeTransport()
bot = _MediaBot()
runner = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE)
projects = ProjectsConfig(
@@ -3144,8 +2884,8 @@ async def test_run_main_loop_handles_command_plugins(monkeypatch) -> None:
]
install_entrypoints(monkeypatch, entrypoints)
transport = _FakeTransport()
bot = _FakeBot()
transport = FakeTransport()
bot = FakeBot()
runner = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE)
exec_cfg = ExecBridgeConfig(
transport=transport,
@@ -3212,8 +2952,8 @@ async def test_run_main_loop_command_uses_project_default_engine(
]
install_entrypoints(monkeypatch, entrypoints)
transport = _FakeTransport()
bot = _FakeBot()
transport = FakeTransport()
bot = FakeBot()
codex_runner = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE)
pi_runner = ScriptRunner([Return(answer="ok")], engine="pi")
router = AutoRouter(
@@ -3296,8 +3036,8 @@ async def test_run_main_loop_command_defaults_to_chat_project(
]
install_entrypoints(monkeypatch, entrypoints)
transport = _FakeTransport()
bot = _FakeBot()
transport = FakeTransport()
bot = FakeBot()
codex_runner = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE)
pi_runner = ScriptRunner([Return(answer="ok")], engine="pi")
router = AutoRouter(
@@ -3387,8 +3127,8 @@ async def test_run_main_loop_refreshes_command_ids(monkeypatch) -> None:
monkeypatch.setattr(telegram_loop, "list_command_ids", _list_command_ids)
transport = _FakeTransport()
bot = _FakeBot()
transport = FakeTransport()
bot = FakeBot()
runner = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE)
exec_cfg = ExecBridgeConfig(
transport=transport,
@@ -3447,8 +3187,8 @@ async def test_run_main_loop_mentions_only_skips_voice_and_files(
telegram_loop, "_handle_file_put_default", fake_handle_file_put_default
)
transport = _FakeTransport()
bot = _FakeBot()
transport = FakeTransport()
bot = FakeBot()
runner = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE)
exec_cfg = ExecBridgeConfig(
transport=transport,
+174
View File
@@ -0,0 +1,174 @@
import httpx
import pytest
from takopi.telegram.client_api import (
HttpBotClient,
TelegramRetryAfter,
retry_after_from_payload,
)
from takopi.telegram.api_models import User
def _response() -> httpx.Response:
request = httpx.Request("POST", "https://example.com")
return httpx.Response(200, request=request)
def test_retry_after_from_payload() -> None:
assert retry_after_from_payload({}) is None
assert retry_after_from_payload({"parameters": {"retry_after": 2}}) == 2.0
def test_parse_envelope_invalid_payload() -> None:
client = HttpBotClient("token", http_client=httpx.AsyncClient())
assert (
client._parse_telegram_envelope(
method="sendMessage",
resp=_response(),
payload="nope",
)
is None
)
def test_parse_envelope_rate_limited() -> None:
client = HttpBotClient("token", http_client=httpx.AsyncClient())
payload = {"ok": False, "error_code": 429, "parameters": {"retry_after": 1}}
with pytest.raises(TelegramRetryAfter) as exc:
client._parse_telegram_envelope(
method="sendMessage",
resp=_response(),
payload=payload,
)
assert exc.value.retry_after == 1.0
def test_parse_envelope_api_error() -> None:
client = HttpBotClient("token", http_client=httpx.AsyncClient())
payload = {"ok": False, "error_code": 400, "description": "boom"}
assert (
client._parse_telegram_envelope(
method="sendMessage",
resp=_response(),
payload=payload,
)
is None
)
def test_parse_envelope_ok() -> None:
client = HttpBotClient("token", http_client=httpx.AsyncClient())
payload = {"ok": True, "result": {"message_id": 1}}
assert client._parse_telegram_envelope(
method="sendMessage",
resp=_response(),
payload=payload,
) == {"message_id": 1}
@pytest.mark.anyio
async def test_client_methods_build_params_and_decode() -> None:
payloads = {
"getUpdates": [{"update_id": 1}],
"getFile": {"file_path": "path"},
"sendMessage": {"message_id": 1},
"sendDocument": {"message_id": 2},
"editMessageText": {"message_id": 3},
"deleteMessage": True,
"setMyCommands": True,
"getMe": {"id": 7},
"answerCallbackQuery": True,
"getChat": {"id": 5, "type": "private"},
"getChatMember": {"status": "member"},
"createForumTopic": {"message_thread_id": 11},
"editForumTopic": True,
}
class _StubClient(HttpBotClient):
def __init__(self) -> None:
super().__init__("token", http_client=httpx.AsyncClient())
self.calls: list[tuple[str, dict | None, dict | None, dict | None]] = []
async def _request(
self,
method: str,
*,
json: dict | None = None,
data: dict | None = None,
files: dict | None = None,
) -> object | None:
self.calls.append((method, json, data, files))
return payloads.get(method)
client = _StubClient()
updates = await client.get_updates(offset=10, allowed_updates=["message"])
assert updates and updates[0].update_id == 1
assert await client.get_file("file") is not None
msg = await client.send_message(
1,
"hi",
reply_to_message_id=2,
disable_notification=True,
message_thread_id=3,
entities=[{"type": "bold", "offset": 0, "length": 2}],
parse_mode="Markdown",
reply_markup={"inline_keyboard": []},
)
assert msg and msg.message_id == 1
doc = await client.send_document(
1,
"file.txt",
b"data",
reply_to_message_id=2,
message_thread_id=3,
disable_notification=True,
caption="doc",
)
assert doc and doc.message_id == 2
edit = await client.edit_message_text(
1,
2,
"edit",
entities=[{"type": "italic", "offset": 0, "length": 4}],
parse_mode="Markdown",
reply_markup={"inline_keyboard": []},
)
assert edit and edit.message_id == 3
assert await client.delete_message(1, 2) is True
assert await client.set_my_commands(
[{"command": "ping", "description": "pong"}],
scope={"type": "chat"},
language_code="en",
)
assert await client.answer_callback_query("cb", text="ok", show_alert=True) is True
assert await client.get_chat(1) is not None
assert await client.get_chat_member(1, 2) is not None
assert await client.create_forum_topic(1, "topic") is not None
assert await client.edit_forum_topic(1, 2, "topic") is True
await client.close()
send_call = next(call for call in client.calls if call[0] == "sendMessage")
assert send_call[1]["disable_notification"] is True
assert send_call[1]["reply_to_message_id"] == 2
assert send_call[1]["message_thread_id"] == 3
assert send_call[1]["entities"]
assert send_call[1]["parse_mode"] == "Markdown"
assert send_call[1]["reply_markup"]
doc_call = next(call for call in client.calls if call[0] == "sendDocument")
assert doc_call[2]["caption"] == "doc"
assert doc_call[3]["document"][0] == "file.txt"
@pytest.mark.anyio
async def test_decode_result_invalid_payload_returns_none() -> None:
client = HttpBotClient("token", http_client=httpx.AsyncClient())
assert client._decode_result(method="getMe", payload=["bad"], model=User) is None
await client.close()
+167
View File
@@ -0,0 +1,167 @@
from dataclasses import replace
from pathlib import Path
from takopi.config import ProjectConfig, ProjectsConfig
from takopi.context import RunContext
from takopi.router import AutoRouter, RunnerEntry
from takopi.runners.mock import Return, ScriptRunner
from takopi.telegram import context as tg_context
from takopi.telegram.topic_state import TopicThreadSnapshot
from takopi.transport_runtime import TransportRuntime
from tests.telegram_fakes import DEFAULT_ENGINE_ID, FakeTransport, make_cfg
def _runtime(tmp_path: Path) -> TransportRuntime:
runner = ScriptRunner([Return(answer="ok")], engine=DEFAULT_ENGINE_ID)
router = AutoRouter(
entries=[RunnerEntry(engine=runner.engine, runner=runner)],
default_engine=runner.engine,
)
projects = ProjectsConfig(
projects={
"alpha": ProjectConfig(
alias="Alpha",
path=tmp_path,
worktrees_dir=Path(".worktrees"),
),
"beta": ProjectConfig(
alias="Beta",
path=tmp_path / "beta",
worktrees_dir=Path(".worktrees"),
),
},
default_project="alpha",
chat_map={123: "alpha"},
)
return TransportRuntime(router=router, projects=projects)
def _cfg(tmp_path: Path):
transport = FakeTransport()
return replace(make_cfg(transport), runtime=_runtime(tmp_path))
def test_format_context_variants(tmp_path: Path) -> None:
runtime = _runtime(tmp_path)
assert tg_context._format_context(runtime, None) == "none"
assert tg_context._format_context(runtime, RunContext(project="alpha")) == "Alpha"
assert (
tg_context._format_context(runtime, RunContext(project="alpha", branch="dev"))
== "Alpha @dev"
)
def test_usage_helpers() -> None:
assert (
tg_context._usage_ctx_set(chat_project=None)
== "usage: `/ctx set <project> [@branch]`"
)
assert (
tg_context._usage_ctx_set(chat_project="alpha") == "usage: `/ctx set [@branch]`"
)
assert (
tg_context._usage_topic(chat_project=None)
== "usage: `/topic <project> @branch`"
)
assert tg_context._usage_topic(chat_project="alpha") == "usage: `/topic @branch`"
def test_parse_project_branch_args_missing_project(tmp_path: Path) -> None:
runtime = _runtime(tmp_path)
context, error = tg_context._parse_project_branch_args(
"",
runtime=runtime,
require_branch=False,
chat_project=None,
)
assert context is None
assert error == "usage: `/ctx set <project> [@branch]`"
def test_parse_project_branch_args_requires_branch(tmp_path: Path) -> None:
runtime = _runtime(tmp_path)
context, error = tg_context._parse_project_branch_args(
"alpha",
runtime=runtime,
require_branch=True,
chat_project=None,
)
assert context is None
assert error == "branch is required"
def test_parse_project_branch_args_chat_project_mismatch(tmp_path: Path) -> None:
runtime = _runtime(tmp_path)
context, error = tg_context._parse_project_branch_args(
"beta @dev",
runtime=runtime,
require_branch=True,
chat_project="alpha",
)
assert context is None
assert error is not None
assert "project mismatch" in error
assert "Alpha" in error
def test_parse_project_branch_args_missing_at_prefix(tmp_path: Path) -> None:
runtime = _runtime(tmp_path)
context, error = tg_context._parse_project_branch_args(
"alpha dev",
runtime=runtime,
require_branch=False,
chat_project=None,
)
assert context is None
assert error == "branch must be prefixed with @"
def test_parse_project_branch_args_chat_project_branch_only(tmp_path: Path) -> None:
runtime = _runtime(tmp_path)
context, error = tg_context._parse_project_branch_args(
"@feature",
runtime=runtime,
require_branch=True,
chat_project="alpha",
)
assert error is None
assert context == RunContext(project="alpha", branch="feature")
def test_format_ctx_status_includes_sessions(tmp_path: Path) -> None:
cfg = _cfg(tmp_path)
runtime = cfg.runtime
snapshot = TopicThreadSnapshot(
chat_id=cfg.chat_id,
thread_id=1,
context=None,
sessions={"b": "token", "a": "token2"},
topic_title=None,
default_engine=None,
)
text = tg_context._format_ctx_status(
cfg=cfg,
runtime=runtime,
bound=None,
resolved=RunContext(project="alpha", branch="main"),
context_source="directives",
snapshot=snapshot,
chat_project=None,
)
assert "topics: enabled" in text
assert "bound ctx: none" in text
assert "resolved ctx: Alpha @main" in text
assert "note: unbound topic" in text
assert "sessions: a, b" in text
def test_merge_topic_context() -> None:
assert tg_context._merge_topic_context(chat_project=None, bound=None) is None
assert tg_context._merge_topic_context(
chat_project="alpha",
bound=None,
) == RunContext(project="alpha", branch=None)
assert tg_context._merge_topic_context(
chat_project="alpha",
bound=RunContext(project=None, branch="dev"),
) == RunContext(project="alpha", branch="dev")
File diff suppressed because it is too large Load Diff
+68
View File
@@ -6,6 +6,7 @@ from pathlib import Path
import pytest
from takopi.telegram import files as tg_files
from takopi.telegram.files import ZipTooLargeError, zip_directory
@@ -41,3 +42,70 @@ def test_zip_directory_limits_size(tmp_path: Path) -> None:
with pytest.raises(ZipTooLargeError):
zip_directory(root, Path("dir"), deny_globs=(), max_bytes=10)
def test_split_command_args_falls_back_on_bad_quotes() -> None:
assert tg_files.split_command_args('bad "quote') == ("bad", '"quote')
def test_parse_file_command_unknown_command() -> None:
command, rest, error = tg_files.parse_file_command("nope arg")
assert command is None
assert rest == "arg"
assert error == tg_files.file_usage()
def test_parse_file_prompt_errors() -> None:
path, force, error = tg_files.parse_file_prompt("--wat", allow_empty=False)
assert path is None
assert force is False
assert error == "unknown flag: --wat"
path, force, error = tg_files.parse_file_prompt("", allow_empty=False)
assert path is None
assert force is False
assert error == "missing path"
def test_parse_file_prompt_force_flag() -> None:
path, force, error = tg_files.parse_file_prompt(
"--force note.txt", allow_empty=False
)
assert path == "note.txt"
assert force is True
assert error is None
def test_normalize_relative_path_rejects_invalid() -> None:
for value in ("", " ", "~/.ssh", "/etc/passwd", "../secret", ".git/config"):
assert tg_files.normalize_relative_path(value) is None
def test_normalize_relative_path_rejects_dot_only() -> None:
assert tg_files.normalize_relative_path("./") is None
def test_normalize_relative_path_strips_dots() -> None:
assert tg_files.normalize_relative_path("docs/./guide.txt") == Path(
"docs/guide.txt"
)
def test_resolve_path_within_root_rejects_escape(tmp_path: Path) -> None:
assert tg_files.resolve_path_within_root(tmp_path, Path("../escape")) is None
def test_deny_reason_matches_patterns() -> None:
assert tg_files.deny_reason(Path(".git/config"), ["**/*.pem"]) == ".git/**"
assert tg_files.deny_reason(Path("secrets/key.pem"), ["**/*.pem"]) == "**/*.pem"
def test_format_bytes_various_units() -> None:
assert tg_files.format_bytes(0) == "0 b"
assert tg_files.format_bytes(1536) == "1.5 kb"
assert tg_files.format_bytes(20480) == "20 kb"
def test_default_upload_name_fallbacks() -> None:
assert tg_files.default_upload_name("", "files/report.txt") == "report.txt"
assert tg_files.default_upload_name(None, None) == "upload.bin"
+212
View File
@@ -0,0 +1,212 @@
from dataclasses import replace
from pathlib import Path
import pytest
from takopi.context import RunContext
from takopi.settings import TelegramFilesSettings
from takopi.telegram.commands import media as media_commands
from takopi.telegram.commands.file_transfer import _FilePutResult, _SavedFilePutGroup
from takopi.telegram.types import TelegramDocument, TelegramIncomingMessage
from takopi.transport_runtime import ResolvedMessage
from tests.telegram_fakes import FakeTransport, make_cfg
def _msg(
text: str,
*,
message_id: int = 1,
chat_id: int = 123,
document: TelegramDocument | None = None,
) -> TelegramIncomingMessage:
return TelegramIncomingMessage(
transport="telegram",
chat_id=chat_id,
message_id=message_id,
text=text,
reply_to_message_id=None,
reply_to_text=None,
sender_id=1,
document=document,
)
@pytest.mark.anyio
async def test_media_group_empty_is_noop() -> None:
transport = FakeTransport()
cfg = make_cfg(transport)
await media_commands._handle_media_group(cfg, [], topic_store=None)
assert transport.send_calls == []
@pytest.mark.anyio
async def test_media_group_file_command_reports_usage() -> None:
transport = FakeTransport()
cfg = make_cfg(transport)
msg = _msg("/file")
await media_commands._handle_media_group(cfg, [msg], topic_store=None)
assert transport.send_calls
text = transport.send_calls[-1]["message"].text
assert "usage: /file put <path>" in text
assert "or /file get <path>" in text
@pytest.mark.anyio
async def test_media_group_file_put_delegates(monkeypatch) -> None:
transport = FakeTransport()
cfg = make_cfg(transport)
msg = _msg("/file put uploads/")
calls: dict[str, int] = {"count": 0}
async def _fake_handle(*_args, **_kwargs) -> None:
calls["count"] += 1
monkeypatch.setattr(media_commands, "_handle_file_put_group", _fake_handle)
await media_commands._handle_media_group(cfg, [msg], topic_store=None)
assert calls["count"] == 1
@pytest.mark.anyio
async def test_media_group_auto_put_without_caption_delegates(monkeypatch) -> None:
transport = FakeTransport()
cfg = replace(make_cfg(transport), files=TelegramFilesSettings(enabled=True))
msg = _msg("")
calls: dict[str, int] = {"count": 0}
async def _fake_handle(*_args, **_kwargs) -> None:
calls["count"] += 1
monkeypatch.setattr(media_commands, "_handle_file_put_group", _fake_handle)
await media_commands._handle_media_group(cfg, [msg], topic_store=None)
assert calls["count"] == 1
@pytest.mark.anyio
async def test_media_group_auto_put_prompt_resolve_none(monkeypatch) -> None:
transport = FakeTransport()
files = TelegramFilesSettings(enabled=True, auto_put=True, auto_put_mode="prompt")
cfg = replace(make_cfg(transport), files=files)
msg = _msg("caption")
async def _resolve_prompt(*_args, **_kwargs):
return None
monkeypatch.setattr(media_commands, "_save_file_put_group", lambda *_a, **_k: None)
await media_commands._handle_media_group(
cfg,
[msg],
topic_store=None,
resolve_prompt=_resolve_prompt,
)
assert transport.send_calls == []
@pytest.mark.anyio
async def test_media_group_auto_put_prompt_runs_prompt(monkeypatch) -> None:
transport = FakeTransport()
files = TelegramFilesSettings(enabled=True, auto_put=True, auto_put_mode="prompt")
cfg = replace(make_cfg(transport), files=files)
msg = _msg("caption")
resolved = ResolvedMessage(
prompt="do the thing",
resume_token=None,
engine_override=None,
context=RunContext(project="proj"),
context_source="directives",
)
saved_group = _SavedFilePutGroup(
context=resolved.context,
base_dir=None,
saved=[
_FilePutResult(
name="a.txt",
rel_path=Path("incoming/a.txt"),
size=1,
error=None,
)
],
failed=[],
)
prompt_calls: list[str] = []
async def _resolve_prompt(*_args, **_kwargs):
return resolved
async def _save_group(*_args, **_kwargs):
return saved_group
async def _run_prompt(_msg, prompt: str, _resolved: ResolvedMessage) -> None:
prompt_calls.append(prompt)
monkeypatch.setattr(media_commands, "_save_file_put_group", _save_group)
await media_commands._handle_media_group(
cfg,
[msg],
topic_store=None,
resolve_prompt=_resolve_prompt,
run_prompt=_run_prompt,
)
assert prompt_calls
assert "[uploaded files]" in prompt_calls[0]
assert "incoming/a.txt" in prompt_calls[0]
@pytest.mark.anyio
async def test_media_group_auto_put_prompt_saved_failure(monkeypatch) -> None:
transport = FakeTransport()
files = TelegramFilesSettings(enabled=True, auto_put=True, auto_put_mode="prompt")
cfg = replace(make_cfg(transport), files=files)
msg = _msg("caption")
resolved = ResolvedMessage(
prompt="do the thing",
resume_token=None,
engine_override=None,
context=RunContext(project="proj"),
context_source="directives",
)
saved_group = _SavedFilePutGroup(
context=resolved.context,
base_dir=None,
saved=[],
failed=[
_FilePutResult(
name="a.txt",
rel_path=None,
size=None,
error="boom",
)
],
)
async def _resolve_prompt(*_args, **_kwargs):
return resolved
async def _save_group(*_args, **_kwargs):
return saved_group
monkeypatch.setattr(media_commands, "_save_file_put_group", _save_group)
await media_commands._handle_media_group(
cfg,
[msg],
topic_store=None,
resolve_prompt=_resolve_prompt,
)
assert transport.send_calls
text = transport.send_calls[-1]["message"].text
assert "failed to upload files" in text
assert "failed:" in text
assert "boom" in text
+7 -7
View File
@@ -7,7 +7,7 @@ from takopi.telegram.api_models import File, Message, Update, User
from takopi.telegram.client import BotClient, TelegramClient, TelegramRetryAfter
class _FakeBot(BotClient):
class FakeBot(BotClient):
def __init__(self) -> None:
self.calls: list[str] = []
self.edit_calls: list[str] = []
@@ -157,7 +157,7 @@ class _FakeBot(BotClient):
@pytest.mark.anyio
async def test_edit_forum_topic_uses_outbox() -> None:
bot = _FakeBot()
bot = FakeBot()
client = TelegramClient(client=bot, private_chat_rps=0.0, group_chat_rps=0.0)
result = await client.edit_forum_topic(
@@ -171,7 +171,7 @@ async def test_edit_forum_topic_uses_outbox() -> None:
@pytest.mark.anyio
async def test_edits_coalesce_latest() -> None:
class _BlockingBot(_FakeBot):
class _BlockingBot(FakeBot):
def __init__(self) -> None:
super().__init__()
self.edit_started = anyio.Event()
@@ -240,7 +240,7 @@ async def test_edits_coalesce_latest() -> None:
@pytest.mark.anyio
async def test_send_preempts_pending_edit() -> None:
bot = _FakeBot()
bot = FakeBot()
client = TelegramClient(client=bot, private_chat_rps=0.0, group_chat_rps=0.0)
await client.edit_message_text(
@@ -269,7 +269,7 @@ async def test_send_preempts_pending_edit() -> None:
@pytest.mark.anyio
async def test_delete_drops_pending_edits() -> None:
bot = _FakeBot()
bot = FakeBot()
client = TelegramClient(client=bot, private_chat_rps=0.0, group_chat_rps=0.0)
await client.edit_message_text(
@@ -300,7 +300,7 @@ async def test_delete_drops_pending_edits() -> None:
@pytest.mark.anyio
async def test_retry_after_retries_once() -> None:
bot = _FakeBot()
bot = FakeBot()
bot.retry_after = 0.0
client = TelegramClient(client=bot, private_chat_rps=0.0, group_chat_rps=0.0)
@@ -317,7 +317,7 @@ async def test_retry_after_retries_once() -> None:
@pytest.mark.anyio
async def test_get_updates_retries_on_retry_after() -> None:
bot = _FakeBot()
bot = FakeBot()
bot.updates_retry_after = 0.0
client = TelegramClient(client=bot, private_chat_rps=0.0, group_chat_rps=0.0)
+131
View File
@@ -0,0 +1,131 @@
from dataclasses import replace
from pathlib import Path
import pytest
from takopi.settings import TelegramTopicsSettings
from takopi.telegram.chat_sessions import ChatSessionStore
from takopi.telegram.commands.topics import (
_handle_chat_new_command,
_handle_ctx_command,
_handle_new_command,
_handle_topic_command,
)
from takopi.telegram.topic_state import TopicStateStore
from takopi.telegram.types import TelegramIncomingMessage
from tests.telegram_fakes import FakeTransport, make_cfg
def _msg(
text: str,
*,
chat_id: int = 123,
message_id: int = 1,
thread_id: int | None = None,
chat_type: str | None = "private",
) -> TelegramIncomingMessage:
return TelegramIncomingMessage(
transport="telegram",
chat_id=chat_id,
message_id=message_id,
text=text,
reply_to_message_id=None,
reply_to_text=None,
sender_id=1,
thread_id=thread_id,
chat_type=chat_type,
)
@pytest.mark.anyio
async def test_ctx_command_requires_topic(tmp_path: Path) -> None:
transport = FakeTransport()
cfg = replace(
make_cfg(transport),
topics=TelegramTopicsSettings(enabled=True, scope="all"),
)
store = TopicStateStore(tmp_path / "topics.json")
msg = _msg("/ctx")
await _handle_ctx_command(
cfg,
msg,
args_text="",
store=store,
resolved_scope="all",
scope_chat_ids=frozenset({msg.chat_id}),
)
text = transport.send_calls[-1]["message"].text
assert "only works inside a topic" in text
@pytest.mark.anyio
async def test_new_command_requires_topic(tmp_path: Path) -> None:
transport = FakeTransport()
cfg = replace(
make_cfg(transport),
topics=TelegramTopicsSettings(enabled=True, scope="all"),
)
store = TopicStateStore(tmp_path / "topics.json")
msg = _msg("/new")
await _handle_new_command(
cfg,
msg,
store=store,
resolved_scope="all",
scope_chat_ids=frozenset({msg.chat_id}),
)
text = transport.send_calls[-1]["message"].text
assert "only works inside a topic" in text
@pytest.mark.anyio
async def test_chat_new_command_no_sessions(tmp_path: Path) -> None:
transport = FakeTransport()
cfg = make_cfg(transport)
store = ChatSessionStore(tmp_path / "sessions.json")
msg = _msg("/new", chat_type="private")
await _handle_chat_new_command(cfg, msg, store, session_key=None)
text = transport.send_calls[-1]["message"].text
assert "no stored sessions" in text
@pytest.mark.anyio
async def test_chat_new_command_group_clears(tmp_path: Path) -> None:
transport = FakeTransport()
cfg = make_cfg(transport)
store = ChatSessionStore(tmp_path / "sessions.json")
msg = _msg("/new", chat_type="supergroup")
await _handle_chat_new_command(cfg, msg, store, session_key=(msg.chat_id, 1))
text = transport.send_calls[-1]["message"].text
assert "cleared stored sessions for you in this chat" in text
@pytest.mark.anyio
async def test_topic_command_requires_args(tmp_path: Path) -> None:
transport = FakeTransport()
cfg = replace(
make_cfg(transport),
topics=TelegramTopicsSettings(enabled=True, scope="all"),
)
store = TopicStateStore(tmp_path / "topics.json")
msg = _msg("/topic")
await _handle_topic_command(
cfg,
msg,
args_text="",
store=store,
resolved_scope="all",
scope_chat_ids=frozenset({msg.chat_id}),
)
text = transport.send_calls[-1]["message"].text
assert "usage: /topic" in text
+34
View File
@@ -0,0 +1,34 @@
from dataclasses import replace
from takopi.settings import TelegramTopicsSettings
from takopi.telegram.topics import _resolve_topics_scope_raw, _topics_command_error
from tests.telegram_fakes import FakeTransport, make_cfg
def test_resolve_topics_scope_raw() -> None:
resolved, chat_ids = _resolve_topics_scope_raw("auto", 1, ())
assert resolved == "main"
assert chat_ids == frozenset({1})
resolved, chat_ids = _resolve_topics_scope_raw("projects", 1, (2, 3))
assert resolved == "projects"
assert chat_ids == frozenset({2, 3})
resolved, chat_ids = _resolve_topics_scope_raw("all", 1, (2,))
assert resolved == "all"
assert chat_ids == frozenset({1, 2})
def test_topics_command_error_for_wrong_chat() -> None:
transport = FakeTransport()
cfg = replace(
make_cfg(transport),
topics=TelegramTopicsSettings(enabled=True, scope="main"),
)
error = _topics_command_error(
cfg,
chat_id=999,
resolved_scope="main",
scope_chat_ids=frozenset({cfg.chat_id}),
)
assert error == "topics commands are only available in the main chat."
+107 -1
View File
@@ -13,7 +13,7 @@ from takopi.telegram.api_models import (
)
from takopi.telegram.client import BotClient
from takopi.telegram.types import TelegramIncomingMessage, TelegramVoice
from takopi.telegram.voice import transcribe_voice
from takopi.telegram.voice import VOICE_TRANSCRIPTION_DISABLED_HINT, transcribe_voice
class _Bot(BotClient):
@@ -176,6 +176,42 @@ def _voice_message(*, file_size: int = 123) -> TelegramIncomingMessage:
)
class _Transcriber:
def __init__(self, *, result: str | None = None, error: Exception | None = None):
self.calls: list[tuple[str, bytes]] = []
self._result = result
self._error = error
async def transcribe(self, *, model: str, audio_bytes: bytes) -> str:
self.calls.append((model, audio_bytes))
if self._error is not None:
raise self._error
assert self._result is not None
return self._result
@pytest.mark.anyio
async def test_transcribe_voice_disabled_replies_with_hint() -> None:
replies: list[str] = []
async def reply(**kwargs) -> None:
replies.append(kwargs["text"])
transcriber = _Transcriber(result="should-not-run")
result = await transcribe_voice(
bot=_Bot(file_info=None, audio=None),
msg=_voice_message(),
enabled=False,
model="whisper-1",
reply=reply,
transcriber=transcriber,
)
assert result is None
assert replies[-1] == VOICE_TRANSCRIPTION_DISABLED_HINT
assert transcriber.calls == []
@pytest.mark.anyio
async def test_transcribe_voice_handles_missing_file() -> None:
replies: list[str] = []
@@ -244,3 +280,73 @@ async def test_transcribe_voice_rejects_large_voice_without_downloading() -> Non
assert result is None
assert replies[-1] == "voice message is too large to transcribe."
@pytest.mark.anyio
async def test_transcribe_voice_rejects_large_download() -> None:
replies: list[str] = []
async def reply(**kwargs) -> None:
replies.append(kwargs["text"])
transcriber = _Transcriber(result="should-not-run")
bot = _Bot(file_info=File(file_path="voice.ogg"), audio=b"x" * 200)
result = await transcribe_voice(
bot=bot,
msg=_voice_message(file_size=10),
enabled=True,
model="whisper-1",
max_bytes=100,
reply=reply,
transcriber=transcriber,
)
assert result is None
assert replies[-1] == "voice message is too large to transcribe."
assert transcriber.calls == []
@pytest.mark.anyio
async def test_transcribe_voice_handles_transcriber_error() -> None:
replies: list[str] = []
async def reply(**kwargs) -> None:
replies.append(kwargs["text"])
transcriber = _Transcriber(error=RuntimeError("boom"))
bot = _Bot(file_info=File(file_path="voice.ogg"), audio=b"ok")
result = await transcribe_voice(
bot=bot,
msg=_voice_message(file_size=2),
enabled=True,
model="whisper-1",
reply=reply,
transcriber=transcriber,
)
assert result is None
assert replies[-1] == "boom"
assert transcriber.calls
@pytest.mark.anyio
async def test_transcribe_voice_success() -> None:
replies: list[str] = []
async def reply(**kwargs) -> None:
replies.append(kwargs["text"])
transcriber = _Transcriber(result="transcribed")
bot = _Bot(file_info=File(file_path="voice.ogg"), audio=b"ok")
result = await transcribe_voice(
bot=bot,
msg=_voice_message(file_size=2),
enabled=True,
model="whisper-1",
reply=reply,
transcriber=transcriber,
)
assert result == "transcribed"
assert replies == []
assert transcriber.calls
+78
View File
@@ -0,0 +1,78 @@
from __future__ import annotations
import pytest
from takopi.runners import tool_actions
from takopi.utils.paths import reset_run_base_dir, set_run_base_dir
def test_tool_input_path_picks_first_match() -> None:
tool_input = {"path": "src/main.py", "file": "ignored.txt"}
assert (
tool_actions.tool_input_path(tool_input, path_keys=("file", "path"))
== "ignored.txt"
)
assert (
tool_actions.tool_input_path(tool_input, path_keys=("path", "file"))
== "src/main.py"
)
assert tool_actions.tool_input_path(tool_input, path_keys=("missing",)) is None
@pytest.mark.parametrize(
("tool_name", "tool_input", "expected_kind", "expected_title"),
[
("bash", {"command": "echo hi"}, "command", "echo hi"),
("shell", {"command": "pwd"}, "command", "pwd"),
("edit", {"path": "src/app.py"}, "file_change", "src/app.py"),
("write", {"path": "notes.txt"}, "file_change", "notes.txt"),
("read", {"path": "README.md"}, "tool", "read: `README.md`"),
("glob", {"pattern": "*.py"}, "tool", "glob: `*.py`"),
("grep", {"pattern": "TODO"}, "tool", "grep: TODO"),
("find", {"pattern": "*.toml"}, "tool", "find: *.toml"),
("ls", {"path": "src"}, "tool", "ls: `src`"),
("websearch", {"query": "takopi"}, "web_search", "takopi"),
(
"webfetch",
{"url": "https://example.com"},
"web_search",
"https://example.com",
),
("todowrite", {}, "note", "update todos"),
("todoread", {}, "note", "read todos"),
("askuserquestion", {}, "note", "ask user"),
("task", {"description": "do work"}, "subagent", "do work"),
("agent", {"prompt": "assist"}, "subagent", "assist"),
("unknown", {}, "tool", "unknown"),
],
)
def test_tool_kind_and_title_cases(
tool_name: str,
tool_input: dict[str, object],
expected_kind: str,
expected_title: str,
) -> None:
token = set_run_base_dir(None)
try:
kind, title = tool_actions.tool_kind_and_title(
tool_name,
tool_input,
path_keys=("path", "file"),
)
finally:
reset_run_base_dir(token)
assert kind == expected_kind
assert title == expected_title
def test_tool_kind_and_title_task_kind_override() -> None:
kind, title = tool_actions.tool_kind_and_title(
"agent",
{"description": "spawn worker"},
path_keys=(),
task_kind="warning",
)
assert kind == "warning"
assert title == "spawn worker"