feat: plugins and public api (#71)

This commit is contained in:
banteg
2026-01-09 03:23:57 +04:00
committed by GitHub
parent 16c0069aa0
commit f856338b94
32 changed files with 3135 additions and 622 deletions
+7
View File
@@ -9,3 +9,10 @@ sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src"))
@pytest.fixture
def anyio_backend() -> str:
return "asyncio"
@pytest.fixture(autouse=True)
def reset_plugins_state() -> None:
import takopi.plugins as plugins
plugins.reset_plugin_state()
+47
View File
@@ -0,0 +1,47 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Callable, Iterable
@dataclass(frozen=True, slots=True)
class FakeDist:
name: str
class FakeEntryPoint:
def __init__(
self,
name: str,
value: str,
group: str,
*,
loader: Callable[[], Any] | None = None,
dist_name: str | None = "takopi",
) -> None:
self.name = name
self.value = value
self.group = group
self._loader = loader or (lambda: None)
self.dist = FakeDist(dist_name) if dist_name else None
def load(self) -> Any:
return self._loader()
class FakeEntryPoints(list):
def select(self, *, group: str) -> list[FakeEntryPoint]:
return [ep for ep in self if ep.group == group]
def get(self, group: str, default: Iterable[Any] | None = None) -> list[Any]:
_ = default
return [ep for ep in self if ep.group == group]
def install_entrypoints(monkeypatch, entrypoints: Iterable[FakeEntryPoint]) -> None:
from takopi import plugins
def _entry_points() -> FakeEntryPoints:
return FakeEntryPoints(entrypoints)
monkeypatch.setattr(plugins, "entry_points", _entry_points)
+47
View File
@@ -0,0 +1,47 @@
import pytest
from takopi import commands, plugins
from takopi.config import ConfigError
from tests.plugin_fixtures import FakeEntryPoint, install_entrypoints
class DummyCommand:
id = "hello"
description = "Hello command"
async def handle(self, ctx):
_ = ctx
return None
@pytest.fixture
def command_entrypoints(monkeypatch):
entrypoints = [
FakeEntryPoint(
"hello",
"takopi.commands.hello:BACKEND",
plugins.COMMAND_GROUP,
loader=DummyCommand,
)
]
install_entrypoints(monkeypatch, entrypoints)
return entrypoints
def test_command_registry_lists_ids(command_entrypoints) -> None:
ids = commands.list_command_ids()
assert "hello" in ids
def test_command_registry_gets_command(command_entrypoints) -> None:
backend = commands.get_command("hello")
assert backend.id == "hello"
def test_command_registry_unknown(command_entrypoints) -> None:
with pytest.raises(ConfigError, match="Unknown command"):
commands.get_command("nope")
def test_command_registry_optional_missing(command_entrypoints) -> None:
assert commands.get_command("nope", required=False) is None
+38 -9
View File
@@ -1,28 +1,57 @@
from typing import cast
import pytest
import click
import typer
from takopi import cli, engines
from takopi import cli, engines, plugins
from tests.plugin_fixtures import FakeEntryPoint, install_entrypoints
def test_engine_discovery_skips_non_backend() -> None:
@pytest.fixture
def engine_entrypoints(monkeypatch):
entrypoints = [
FakeEntryPoint(
"codex",
"takopi.runners.codex:BACKEND",
plugins.ENGINE_GROUP,
),
FakeEntryPoint(
"claude",
"takopi.runners.claude:BACKEND",
plugins.ENGINE_GROUP,
),
FakeEntryPoint(
"bad-id",
"takopi.runners.bad:BACKEND",
plugins.ENGINE_GROUP,
),
]
install_entrypoints(monkeypatch, entrypoints)
monkeypatch.setattr(cli, "_load_settings_optional", lambda: (None, None))
return entrypoints
def test_engine_discovery_filters_invalid_ids(engine_entrypoints) -> None:
ids = engines.list_backend_ids()
assert "codex" in ids
assert "claude" in ids
assert "mock" not in ids
assert ids == ["claude", "codex"]
def test_cli_registers_engine_commands_sorted() -> None:
command_names = [cmd.name for cmd in cli.app.registered_commands]
def test_cli_registers_engine_commands_sorted(engine_entrypoints) -> None:
app = cli.create_app()
command_names = [cmd.name for cmd in app.registered_commands]
engine_ids = engines.list_backend_ids()
assert set(engine_ids) <= set(command_names)
engine_commands = [name for name in command_names if name in engine_ids]
assert engine_commands == engine_ids
def test_engine_commands_do_not_expose_engine_id_option() -> None:
group = cast(click.Group, typer.main.get_command(cli.app))
def test_engine_commands_do_not_expose_engine_id_option(
engine_entrypoints,
) -> None:
app = cli.create_app()
group = cast(click.Group, typer.main.get_command(app))
engine_ids = engines.list_backend_ids()
ctx = group.make_context("takopi", [])
+184
View File
@@ -0,0 +1,184 @@
import pytest
from takopi import plugins
from tests.plugin_fixtures import FakeEntryPoint, install_entrypoints
def test_list_ids_does_not_load_entrypoints(monkeypatch) -> None:
calls = {"count": 0}
def loader():
calls["count"] += 1
return object()
entrypoints = [
FakeEntryPoint(
"codex",
"takopi.runners.codex:BACKEND",
plugins.ENGINE_GROUP,
loader=loader,
)
]
install_entrypoints(monkeypatch, entrypoints)
ids = plugins.list_ids(plugins.ENGINE_GROUP)
assert ids == ["codex"]
assert calls["count"] == 0
def test_load_entrypoint_records_errors(monkeypatch) -> None:
def loader():
raise RuntimeError("boom")
entrypoints = [
FakeEntryPoint(
"broken",
"takopi.runners.broken:BACKEND",
plugins.ENGINE_GROUP,
loader=loader,
)
]
install_entrypoints(monkeypatch, entrypoints)
with pytest.raises(plugins.PluginLoadFailed):
plugins.load_entrypoint(plugins.ENGINE_GROUP, "broken")
errors = plugins.get_load_errors()
assert errors
assert errors[0].name == "broken"
assert "boom" in errors[0].error
def test_duplicate_entrypoints_are_rejected(monkeypatch) -> None:
entrypoints = [
FakeEntryPoint(
"dup",
"takopi.runners.one:BACKEND",
plugins.ENGINE_GROUP,
dist_name="one",
),
FakeEntryPoint(
"dup",
"takopi.runners.two:BACKEND",
plugins.ENGINE_GROUP,
dist_name="two",
),
]
install_entrypoints(monkeypatch, entrypoints)
ids = plugins.list_ids(plugins.ENGINE_GROUP)
assert ids == []
with pytest.raises(plugins.PluginLoadFailed):
plugins.load_entrypoint(plugins.ENGINE_GROUP, "dup")
errors = plugins.get_load_errors()
assert any("duplicate plugin id" in err.error for err in errors)
def test_allowlist_filters_by_distribution(monkeypatch) -> None:
entrypoints = [
FakeEntryPoint(
"codex",
"takopi.runners.codex:BACKEND",
plugins.ENGINE_GROUP,
dist_name="takopi",
),
FakeEntryPoint(
"thirdparty",
"takopi_thirdparty.backend:BACKEND",
plugins.ENGINE_GROUP,
dist_name="takopi-thirdparty",
),
]
install_entrypoints(monkeypatch, entrypoints)
ids = plugins.list_ids(plugins.ENGINE_GROUP, allowlist=["takopi"])
assert ids == ["codex"]
def test_validator_errors_are_captured(monkeypatch) -> None:
entrypoints = [
FakeEntryPoint(
"bad",
"takopi.runners.bad:BACKEND",
plugins.ENGINE_GROUP,
)
]
install_entrypoints(monkeypatch, entrypoints)
def validator(obj, ep):
raise TypeError("not valid")
with pytest.raises(plugins.PluginLoadFailed):
plugins.load_entrypoint(plugins.ENGINE_GROUP, "bad", validator=validator)
errors = plugins.get_load_errors()
assert any("not valid" in err.error for err in errors)
def test_reset_plugin_state_clears_cache(monkeypatch) -> None:
calls = {"count": 0}
def loader():
calls["count"] += 1
return object()
entrypoints = [
FakeEntryPoint(
"codex",
"takopi.runners.codex:BACKEND",
plugins.ENGINE_GROUP,
loader=loader,
)
]
install_entrypoints(monkeypatch, entrypoints)
plugins.load_entrypoint(plugins.ENGINE_GROUP, "codex")
plugins.load_entrypoint(plugins.ENGINE_GROUP, "codex")
assert calls["count"] == 1
plugins.reset_plugin_state()
plugins.load_entrypoint(plugins.ENGINE_GROUP, "codex")
assert calls["count"] == 2
def test_clear_load_errors_filters(monkeypatch) -> None:
def loader():
raise RuntimeError("boom")
entrypoints = [
FakeEntryPoint(
"broken_engine",
"takopi.runners.broken:BACKEND",
plugins.ENGINE_GROUP,
loader=loader,
dist_name="engine-dist",
),
FakeEntryPoint(
"broken_transport",
"takopi.transports.broken:BACKEND",
plugins.TRANSPORT_GROUP,
loader=loader,
dist_name="transport-dist",
),
]
install_entrypoints(monkeypatch, entrypoints)
with pytest.raises(plugins.PluginLoadFailed):
plugins.load_entrypoint(plugins.ENGINE_GROUP, "broken_engine")
with pytest.raises(plugins.PluginLoadFailed):
plugins.load_entrypoint(plugins.TRANSPORT_GROUP, "broken_transport")
errors = plugins.get_load_errors()
assert {err.group for err in errors} == {
plugins.ENGINE_GROUP,
plugins.TRANSPORT_GROUP,
}
plugins.clear_load_errors(group=plugins.ENGINE_GROUP)
errors = plugins.get_load_errors()
assert {err.group for err in errors} == {plugins.TRANSPORT_GROUP}
plugins.clear_load_errors(name="broken_transport")
assert plugins.get_load_errors() == ()
+4 -2
View File
@@ -35,13 +35,14 @@ def test_init_writes_project(monkeypatch, tmp_path) -> None:
config_path = tmp_path / "takopi.toml"
monkeypatch.setattr("takopi.config.HOME_CONFIG_PATH", config_path)
monkeypatch.setattr(cli, "resolve_default_base", lambda _: "main")
monkeypatch.setattr(cli, "_load_settings_optional", lambda: (None, None))
repo_path = tmp_path / "repo"
repo_path.mkdir()
monkeypatch.chdir(repo_path)
runner = CliRunner()
result = runner.invoke(cli.app, ["init", "z80"])
result = runner.invoke(cli.create_app(), ["init", "z80"])
assert result.exit_code == 0
saved = config_path.read_text(encoding="utf-8")
@@ -56,13 +57,14 @@ def test_init_migrates_legacy_config(monkeypatch, tmp_path) -> None:
config_path.write_text('bot_token = "token"\nchat_id = 123\n', encoding="utf-8")
monkeypatch.setattr("takopi.config.HOME_CONFIG_PATH", config_path)
monkeypatch.setattr(cli, "resolve_default_base", lambda _: "main")
monkeypatch.setattr(cli, "_load_settings_optional", lambda: (None, None))
repo_path = tmp_path / "repo"
repo_path.mkdir()
monkeypatch.chdir(repo_path)
runner = CliRunner()
result = runner.invoke(cli.app, ["init", "z80"])
result = runner.invoke(cli.create_app(), ["init", "z80"])
assert result.exit_code == 0
raw = read_raw_toml(config_path)
+322 -56
View File
@@ -3,15 +3,16 @@ from pathlib import Path
import anyio
import pytest
from takopi import commands, plugins
import takopi.telegram.bridge as bridge
from takopi.directives import parse_directives
from takopi.telegram.bridge import (
TelegramBridgeConfig,
TelegramTransport,
_build_bot_commands,
_handle_cancel,
_is_cancel_command,
_resolve_message,
_send_with_resume,
_strip_engine_command,
run_main_loop,
)
from takopi.context import RunContext
@@ -20,8 +21,11 @@ from takopi.runner_bridge import ExecBridgeConfig, RunningTask
from takopi.markdown import MarkdownPresenter
from takopi.model import EngineId, ResumeToken
from takopi.router import AutoRouter, RunnerEntry
from takopi.transport_runtime import TransportRuntime
from takopi.runners.mock import Return, ScriptRunner, Sleep, Wait
from takopi.transport import IncomingMessage, MessageRef, RenderedMessage, SendOptions
from takopi.telegram.types import TelegramIncomingMessage
from takopi.transport import MessageRef, RenderedMessage, SendOptions
from tests.plugin_fixtures import FakeEntryPoint, install_entrypoints
CODEX_ENGINE = EngineId("codex")
@@ -185,59 +189,78 @@ def _make_cfg(
presenter=MarkdownPresenter(),
final_notify=True,
)
runtime = TransportRuntime(
router=_make_router(runner),
projects=empty_projects_config(),
)
return TelegramBridgeConfig(
bot=_FakeBot(),
router=_make_router(runner),
runtime=runtime,
chat_id=123,
startup_msg="",
exec_cfg=exec_cfg,
)
def test_strip_engine_command_inline() -> None:
text, engine = _strip_engine_command(
"/claude do it", engine_ids=("codex", "claude")
def test_parse_directives_inline_engine() -> None:
directives = parse_directives(
"/claude do it",
engine_ids=("codex", "claude"),
projects=empty_projects_config(),
)
assert engine == "claude"
assert text == "do it"
assert directives.engine == "claude"
assert directives.prompt == "do it"
def test_strip_engine_command_newline() -> None:
text, engine = _strip_engine_command(
"/codex\nhello", engine_ids=("codex", "claude")
def test_parse_directives_newline() -> None:
directives = parse_directives(
"/codex\nhello",
engine_ids=("codex", "claude"),
projects=empty_projects_config(),
)
assert engine == "codex"
assert text == "hello"
assert directives.engine == "codex"
assert directives.prompt == "hello"
def test_strip_engine_command_ignores_unknown() -> None:
text, engine = _strip_engine_command("/unknown hi", engine_ids=("codex", "claude"))
assert engine is None
assert text == "/unknown hi"
def test_strip_engine_command_bot_suffix() -> None:
text, engine = _strip_engine_command(
"/claude@bunny_agent_bot hi", engine_ids=("claude",)
def test_parse_directives_ignores_unknown() -> None:
directives = parse_directives(
"/unknown hi",
engine_ids=("codex", "claude"),
projects=empty_projects_config(),
)
assert engine == "claude"
assert text == "hi"
assert directives.engine is None
assert directives.prompt == "/unknown hi"
def test_strip_engine_command_only_first_non_empty_line() -> None:
text, engine = _strip_engine_command(
"hello\n/claude hi", engine_ids=("codex", "claude")
def test_parse_directives_bot_suffix() -> None:
directives = parse_directives(
"/claude@bunny_agent_bot hi",
engine_ids=("claude",),
projects=empty_projects_config(),
)
assert engine is None
assert text == "hello\n/claude hi"
assert directives.engine == "claude"
assert directives.prompt == "hi"
def test_parse_directives_only_first_non_empty_line() -> None:
directives = parse_directives(
"hello\n/claude hi",
engine_ids=("codex", "claude"),
projects=empty_projects_config(),
)
assert directives.engine is None
assert directives.prompt == "hello\n/claude hi"
def test_build_bot_commands_includes_cancel_and_engine() -> None:
runner = ScriptRunner(
[Return(answer="ok")], engine=CODEX_ENGINE, resume_value="sid"
)
router = _make_router(runner)
commands = _build_bot_commands(router, empty_projects_config())
runtime = TransportRuntime(
router=_make_router(runner),
projects=empty_projects_config(),
)
commands = _build_bot_commands(runtime)
assert {"command": "cancel", "description": "cancel run"} in commands
assert any(cmd["command"] == "codex" for cmd in commands)
@@ -264,12 +287,42 @@ def test_build_bot_commands_includes_projects() -> None:
default_project=None,
)
commands = _build_bot_commands(router, projects)
runtime = TransportRuntime(router=router, projects=projects)
commands = _build_bot_commands(runtime)
assert any(cmd["command"] == "good" for cmd in commands)
assert not any(cmd["command"] == "bad-name" for cmd in commands)
def test_build_bot_commands_includes_command_plugins(monkeypatch) -> None:
class _Command:
id = "pingcmd"
description = "ping command"
async def handle(self, ctx):
_ = ctx
return None
entrypoints = [
FakeEntryPoint(
"pingcmd",
"takopi.commands.ping:BACKEND",
plugins.COMMAND_GROUP,
loader=_Command,
)
]
install_entrypoints(monkeypatch, entrypoints)
runner = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE)
runtime = TransportRuntime(
router=_make_router(runner),
projects=empty_projects_config(),
)
commands_list = _build_bot_commands(runtime)
assert {"command": "pingcmd", "description": "ping command"} in commands_list
def test_build_bot_commands_caps_total() -> None:
runner = ScriptRunner(
[Return(answer="ok")], engine=CODEX_ENGINE, resume_value="sid"
@@ -287,7 +340,8 @@ def test_build_bot_commands_caps_total() -> None:
default_project=None,
)
commands = _build_bot_commands(router, projects)
runtime = TransportRuntime(router=router, projects=projects)
commands = _build_bot_commands(runtime)
assert len(commands) == 100
assert any(cmd["command"] == "codex" for cmd in commands)
@@ -410,7 +464,7 @@ async def test_telegram_transport_edit_wait_false_returns_ref() -> None:
async def test_handle_cancel_without_reply_prompts_user() -> None:
transport = _FakeTransport()
cfg = _make_cfg(transport)
msg = IncomingMessage(
msg = TelegramIncomingMessage(
transport="telegram",
chat_id=123,
message_id=10,
@@ -431,7 +485,7 @@ async def test_handle_cancel_without_reply_prompts_user() -> None:
async def test_handle_cancel_with_no_progress_message_says_nothing_running() -> None:
transport = _FakeTransport()
cfg = _make_cfg(transport)
msg = IncomingMessage(
msg = TelegramIncomingMessage(
transport="telegram",
chat_id=123,
message_id=10,
@@ -453,7 +507,7 @@ async def test_handle_cancel_with_finished_task_says_nothing_running() -> None:
transport = _FakeTransport()
cfg = _make_cfg(transport)
progress_id = 99
msg = IncomingMessage(
msg = TelegramIncomingMessage(
transport="telegram",
chat_id=123,
message_id=10,
@@ -475,7 +529,7 @@ async def test_handle_cancel_cancels_running_task() -> None:
transport = _FakeTransport()
cfg = _make_cfg(transport)
progress_id = 42
msg = IncomingMessage(
msg = TelegramIncomingMessage(
transport="telegram",
chat_id=123,
message_id=10,
@@ -499,7 +553,7 @@ async def test_handle_cancel_only_cancels_matching_progress_message() -> None:
cfg = _make_cfg(transport)
task_first = RunningTask()
task_second = RunningTask()
msg = IncomingMessage(
msg = TelegramIncomingMessage(
transport="telegram",
chat_id=123,
message_id=10,
@@ -527,23 +581,22 @@ def test_cancel_command_accepts_extra_text() -> None:
def test_resolve_message_accepts_backticked_ctx_line() -> None:
router = _make_router(ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE))
projects = ProjectsConfig(
projects={
"takopi": ProjectConfig(
alias="takopi",
path=Path("."),
worktrees_dir=Path(".worktrees"),
)
},
default_project=None,
runtime = TransportRuntime(
router=_make_router(ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE)),
projects=ProjectsConfig(
projects={
"takopi": ProjectConfig(
alias="takopi",
path=Path("."),
worktrees_dir=Path(".worktrees"),
)
},
default_project=None,
),
)
resolved = _resolve_message(
resolved = runtime.resolve_message(
text="do it",
reply_text="`ctx: takopi @ feat/api`",
router=router,
projects=projects,
)
assert resolved.prompt == "do it"
@@ -643,16 +696,20 @@ async def test_run_main_loop_routes_reply_to_running_resume() -> None:
presenter=MarkdownPresenter(),
final_notify=True,
)
runtime = TransportRuntime(
router=_make_router(runner),
projects=empty_projects_config(),
)
cfg = TelegramBridgeConfig(
bot=bot,
router=_make_router(runner),
runtime=runtime,
chat_id=123,
startup_msg="",
exec_cfg=exec_cfg,
)
async def poller(_cfg: TelegramBridgeConfig):
yield IncomingMessage(
yield TelegramIncomingMessage(
transport="telegram",
chat_id=123,
message_id=1,
@@ -666,7 +723,7 @@ async def test_run_main_loop_routes_reply_to_running_resume() -> None:
assert isinstance(transport.progress_ref.message_id, int)
reply_id = transport.progress_ref.message_id
reply_ready.set()
yield IncomingMessage(
yield TelegramIncomingMessage(
transport="telegram",
chat_id=123,
message_id=2,
@@ -694,3 +751,212 @@ async def test_run_main_loop_routes_reply_to_running_resume() -> None:
hold.set()
stop_polling.set()
tg.cancel_scope.cancel()
@pytest.mark.anyio
async def test_run_main_loop_handles_command_plugins(monkeypatch) -> None:
class _Command:
id = "echo_cmd"
description = "echo"
async def handle(self, ctx):
return commands.CommandResult(text=f"echo:{ctx.args_text}")
entrypoints = [
FakeEntryPoint(
"echo_cmd",
"takopi.commands.echo:BACKEND",
plugins.COMMAND_GROUP,
loader=_Command,
)
]
install_entrypoints(monkeypatch, entrypoints)
transport = _FakeTransport()
bot = _FakeBot()
runner = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE)
exec_cfg = ExecBridgeConfig(
transport=transport,
presenter=MarkdownPresenter(),
final_notify=True,
)
runtime = TransportRuntime(
router=_make_router(runner),
projects=empty_projects_config(),
)
cfg = TelegramBridgeConfig(
bot=bot,
runtime=runtime,
chat_id=123,
startup_msg="",
exec_cfg=exec_cfg,
)
async def poller(_cfg: TelegramBridgeConfig):
yield TelegramIncomingMessage(
transport="telegram",
chat_id=123,
message_id=1,
text="/echo_cmd hello",
reply_to_message_id=None,
reply_to_text=None,
sender_id=123,
)
await run_main_loop(cfg, poller)
assert runner.calls == []
assert transport.send_calls
assert transport.send_calls[-1]["message"].text == "echo:hello"
@pytest.mark.anyio
async def test_run_main_loop_command_uses_project_default_engine(
monkeypatch,
) -> None:
class _Command:
id = "use_project"
description = "use project default"
async def handle(self, ctx):
result = await ctx.executor.run_one(
commands.RunRequest(
prompt="hello",
context=RunContext(project="proj"),
),
mode="capture",
)
return commands.CommandResult(text=f"ran:{result.engine}")
entrypoints = [
FakeEntryPoint(
"use_project",
"takopi.commands.use_project:BACKEND",
plugins.COMMAND_GROUP,
loader=_Command,
)
]
install_entrypoints(monkeypatch, entrypoints)
transport = _FakeTransport()
bot = _FakeBot()
codex_runner = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE)
pi_runner = ScriptRunner([Return(answer="ok")], engine=EngineId("pi"))
router = AutoRouter(
entries=[
RunnerEntry(engine=codex_runner.engine, runner=codex_runner),
RunnerEntry(engine=pi_runner.engine, runner=pi_runner),
],
default_engine=codex_runner.engine,
)
projects = ProjectsConfig(
projects={
"proj": ProjectConfig(
alias="proj",
path=Path("."),
worktrees_dir=Path(".worktrees"),
default_engine=pi_runner.engine,
)
},
default_project=None,
)
runtime = TransportRuntime(
router=router,
projects=projects,
)
exec_cfg = ExecBridgeConfig(
transport=transport,
presenter=MarkdownPresenter(),
final_notify=True,
)
cfg = TelegramBridgeConfig(
bot=bot,
runtime=runtime,
chat_id=123,
startup_msg="",
exec_cfg=exec_cfg,
)
async def poller(_cfg: TelegramBridgeConfig):
yield TelegramIncomingMessage(
transport="telegram",
chat_id=123,
message_id=1,
text="/use_project",
reply_to_message_id=None,
reply_to_text=None,
sender_id=123,
)
await run_main_loop(cfg, poller)
assert codex_runner.calls == []
assert len(pi_runner.calls) == 1
assert transport.send_calls[-1]["message"].text == "ran:pi"
@pytest.mark.anyio
async def test_run_main_loop_refreshes_command_ids(monkeypatch) -> None:
class _Command:
id = "late_cmd"
description = "late command"
async def handle(self, ctx):
return commands.CommandResult(text="late")
entrypoints = [
FakeEntryPoint(
"late_cmd",
"takopi.commands.late:BACKEND",
plugins.COMMAND_GROUP,
loader=_Command,
)
]
install_entrypoints(monkeypatch, entrypoints)
calls = {"count": 0}
def _list_command_ids(*, allowlist=None):
_ = allowlist
calls["count"] += 1
if calls["count"] == 1:
return []
return ["late_cmd"]
monkeypatch.setattr(bridge, "list_command_ids", _list_command_ids)
transport = _FakeTransport()
bot = _FakeBot()
runner = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE)
exec_cfg = ExecBridgeConfig(
transport=transport,
presenter=MarkdownPresenter(),
final_notify=True,
)
runtime = TransportRuntime(
router=_make_router(runner),
projects=empty_projects_config(),
)
cfg = TelegramBridgeConfig(
bot=bot,
runtime=runtime,
chat_id=123,
startup_msg="",
exec_cfg=exec_cfg,
)
async def poller(_cfg: TelegramBridgeConfig):
yield TelegramIncomingMessage(
transport="telegram",
chat_id=123,
message_id=1,
text="/late_cmd hello",
reply_to_message_id=None,
reply_to_text=None,
sender_id=123,
)
await run_main_loop(cfg, poller)
assert calls["count"] >= 2
assert transport.send_calls[-1]["message"].text == "late"
+52 -4
View File
@@ -1,19 +1,67 @@
import pytest
from takopi import transports
from takopi import plugins, transports
from takopi.config import ConfigError
from tests.plugin_fixtures import FakeEntryPoint, install_entrypoints
def test_transport_registry_lists_telegram() -> None:
class DummyTransport:
id = "telegram"
description = "Telegram"
def check_setup(self, *args, **kwargs):
raise NotImplementedError
def interactive_setup(self, *, force: bool) -> bool:
raise NotImplementedError
def lock_token(self, *, transport_config: dict[str, object], config_path):
_ = transport_config, config_path
raise NotImplementedError
def build_and_run(
self,
*,
transport_config: dict[str, object],
config_path,
runtime,
final_notify: bool,
default_engine_override: str | None,
) -> None:
_ = (
transport_config,
config_path,
runtime,
final_notify,
default_engine_override,
)
raise NotImplementedError
@pytest.fixture
def transport_entrypoints(monkeypatch):
entrypoints = [
FakeEntryPoint(
"telegram",
"takopi.telegram.backend:telegram_backend",
plugins.TRANSPORT_GROUP,
loader=DummyTransport,
)
]
install_entrypoints(monkeypatch, entrypoints)
return entrypoints
def test_transport_registry_lists_telegram(transport_entrypoints) -> None:
ids = transports.list_transports()
assert "telegram" in ids
def test_transport_registry_gets_telegram() -> None:
def test_transport_registry_gets_telegram(transport_entrypoints) -> None:
backend = transports.get_transport("telegram")
assert backend.id == "telegram"
def test_transport_registry_unknown() -> None:
def test_transport_registry_unknown(transport_entrypoints) -> None:
with pytest.raises(ConfigError, match="Unknown transport"):
transports.get_transport("nope")
+45
View File
@@ -0,0 +1,45 @@
from pathlib import Path
from takopi.config import ProjectConfig, ProjectsConfig
from takopi.context import RunContext
from takopi.router import AutoRouter, RunnerEntry
from takopi.runners.mock import Return, ScriptRunner
from takopi.transport_runtime import TransportRuntime
def _make_runtime(*, project_default_engine: str | None = None) -> TransportRuntime:
codex = ScriptRunner([Return(answer="ok")], engine="codex")
pi = ScriptRunner([Return(answer="ok")], engine="pi")
router = AutoRouter(
entries=[
RunnerEntry(engine=codex.engine, runner=codex),
RunnerEntry(engine=pi.engine, runner=pi),
],
default_engine=codex.engine,
)
project = ProjectConfig(
alias="proj",
path=Path("."),
worktrees_dir=Path(".worktrees"),
default_engine=project_default_engine,
)
projects = ProjectsConfig(projects={"proj": project}, default_project=None)
return TransportRuntime(router=router, projects=projects)
def test_resolve_engine_uses_project_default() -> None:
runtime = _make_runtime(project_default_engine="pi")
engine = runtime.resolve_engine(
engine_override=None,
context=RunContext(project="proj"),
)
assert engine == "pi"
def test_resolve_engine_prefers_override() -> None:
runtime = _make_runtime(project_default_engine="pi")
engine = runtime.resolve_engine(
engine_override="codex",
context=RunContext(project="proj"),
)
assert engine == "codex"