diff --git a/changelog.md b/changelog.md index bf51cc9..4a06f0f 100644 --- a/changelog.md +++ b/changelog.md @@ -2,6 +2,9 @@ ## unreleased +- add auto-router runner selection with configurable default engine +- add `/cancel` + `/{engine}` command menu sync on startup + ## v0.3.0 (2026-01-01) ### changes diff --git a/docs/developing.md b/docs/developing.md index fd92456..c7ba50f 100644 --- a/docs/developing.md +++ b/docs/developing.md @@ -26,6 +26,9 @@ uv run ty check . make check ``` +Takopi runs in **auto-router** mode by default. `default_engine` in `takopi.toml` selects +the engine for new threads; engine subcommands override that default for the process. + ## Module Responsibilities ### `bridge.py` - Telegram bridge loop @@ -44,9 +47,11 @@ The orchestrator module containing: **Key patterns:** - Bridge schedules runs FIFO per thread to avoid concurrent progress messages; runner locks enforce per-thread serialization - `/cancel` routes by reply-to progress message id (accepts extra text) -- Progress edits are throttled to ~1s intervals and only run when new events arrive +- `/{engine}` on the first line selects the engine for new threads +- Progress edits are throttled to 2s intervals and only run when new events arrive - Resume tokens are runner-formatted command lines (e.g., `` `codex resume ` ``) -- Resume parsing is delegated to the active runner (no cross-engine fallback) +- Resume parsing polls all runners via `AutoRouter.resolve_resume()` and routes to the first match +- Bot command menu is synced on startup (`cancel` + engine commands) ### `cli.py` - CLI entry point @@ -164,12 +169,16 @@ poll_updates() drains backlog, long-polls, filters chat_id == from_id == cfg.cha ↓ run_main_loop() spawns tasks in TaskGroup ↓ -handle_message() spawned as task +router.resolve_resume(text, reply_text) → ResumeToken | None + ↓ +router.runner_for(resume_token) → selects runner (default engine if None) + ↓ +handle_message() spawned as task with selected runner ↓ Send initial progress message (silent) ↓ -CodexRunner.run() - ├── Spawns: codex exec --json ... - +runner.run(prompt, resume_token) + ├── Spawns engine subprocess (e.g., codex exec --json) ├── Streams JSONL from stdout ├── Normalizes JSONL -> takopi events ├── Yields Takopi events (async iterator) @@ -186,10 +195,10 @@ Send/edit final message ### Resume Flow -Same as above, but: -- Runners parse resume lines (e.g. `` `codex resume ` ``) -- Command becomes: `codex exec --json resume -` -- Per-token lock serializes concurrent resumes +Same as above; auto-router polls all runners to extract resume tokens: +- Router returns first matching token (e.g. `` `claude --resume ` `` routes to Claude) +- Selected runner spawns with resume (e.g. `codex exec --json resume -`) +- Per-token lock serializes concurrent resumes on the same thread ## Error Handling diff --git a/docs/specification.md b/docs/specification.md index 373687a..d7de39b 100644 --- a/docs/specification.md +++ b/docs/specification.md @@ -1,10 +1,10 @@ -# Takopi Specification v0.2.0 (minimal) [2025-12-31] +# Takopi Specification v0.4.0 [2026-01-01] This document is **normative**. The words **MUST**, **SHOULD**, and **MAY** express requirements. ## 1. Scope -Takopi v0.2.0 specifies: +Takopi v0.4.0 specifies: - A **Telegram** bot bridge that runs an agent **Runner** and posts: - a throttled, edited **progress message** @@ -12,12 +12,12 @@ Takopi v0.2.0 specifies: - **Thread continuation** via a **resume command** embedded in chat messages - **Parallel runs across different threads** - **Serialization within a thread** (no concurrent runs on the same thread) +- **Automatic runner selection** among multiple engines based on ResumeLine (with a configurable default for new threads) - A Takopi-owned **normalized event model** produced by runners and consumed by renderers/bridge -Out of scope for v0.2.0: +Out of scope for v0.4.0: - Non-Telegram clients (Slack/Discord/etc.) -- Auto-selecting among multiple runners - Token-by-token streaming of the assistant’s final answer - Engines/runners that cannot provide **stable action IDs** within a run @@ -71,11 +71,13 @@ Constraints: ### 3.4 Bridge resume resolution (MUST) -Given `text` (user message) and optional `reply_text` (the message being replied to): +Given `text` (user message), optional `reply_text` (the message being replied to), and an ordered list of available runners `runners`: -1. The bridge MUST attempt `runner.extract_resume(text)`. -2. If not found, it MUST attempt `runner.extract_resume(reply_text)` if present. -3. If still not found, the run MUST start with `resume=None` (new thread). +1. The bridge MUST attempt to extract a resume token by polling all runners in order: + 1. for each `r` in `runners`, attempt `r.extract_resume(text)` + 2. choose the **first** runner that returns a non-`None` token and stop +2. If not found, it MUST repeat step (1) for `reply_text` if present. +3. If still not found, the run MUST start with `resume=None` (new thread) on the default runner (per §8, including chat-level overrides). ## 4. Normalized event model @@ -335,12 +337,30 @@ Action update collapsing: ## 8. Configuration and engine selection -Decision (v0.2.0): +Decision (v0.4.0): -* Exactly one runner is selected at startup via a CLI subcommand (no default). -* If no engine subcommand is provided, Takopi prints an engine chooser panel and exits. -* Resume extraction uses only the selected runner. -* If a user provides a resume line for a different engine, extraction fails and the bridge treats the message as a new thread (`resume=None`). +* Takopi MUST support configuring a **default engine** used to start new threads (`resume=None`). + * If not configured, the default engine is implementation-defined (non-normative: the reference implementation defaults to `codex`). +* If no engine subcommand is provided, Takopi MUST run in **auto-router** mode: + * new threads use the configured default engine + * resumed threads are routed based on ResumeLine extraction (per §3.4) +* If an engine subcommand is provided, Takopi MUST still use the auto-router, but it overrides the configured default engine for new threads. +* Resume extraction MUST poll **all** available runners (per §3.4) and route to the first matching runner. +* New thread engine override (chat-level): + * Users MAY prefix the first non-empty line with `/{engine}` (e.g. `/claude` or `/codex`) to select the engine for a **new** thread. + * The bridge MUST strip that directive from the prompt before invoking the runner. + * If a ResumeToken is resolved from the message or reply, it MUST take precedence and the `/{engine}` directive MUST be ignored. + +### 8.1 Command menu (Telegram) + +Takopi SHOULD keep the bot’s slash-command menu in sync at startup by calling +`setMyCommands` with the canonical list of supported commands. + +* The command list MUST include: + * `cancel` — cancel the current run + * one entry per configured engine +* The command list MUST NOT include commands the bot does not support. +* Command descriptions SHOULD be terse and lowercase. ## 9. Testing requirements (MUST) @@ -373,5 +393,23 @@ Tests MUST cover: * completed-only actions render correctly * repeated events for same Action.id collapse as intended +7. **Auto-router engine selection** + + * resume lines for non-default engines are detected and routed correctly (poll all runners) + * new threads use the configured default engine, with CLI subcommand overriding it Test tooling SHOULD include event factories, deterministic/fake time, and a script/mock runner. + +## 10. Changelog + +### v0.4.0 (2026-01-01) + +- Add auto-router engine selection by polling all runners to decode resume lines; add configurable default engine for new threads (subcommand overrides default). + +### v0.3.0 (2026-01-01) + +- Require runners to implement explicit resume formatting/extraction/detection and treat runners as authoritative for resume tokens/lines. + +### v0.2.0 (2025-12-31) + +- Initial minimal Takopi specification (Telegram bridge + runner protocol + normalized events + resume support). diff --git a/readme.md b/readme.md index a29cb14..302ffc7 100644 --- a/readme.md +++ b/readme.md @@ -2,7 +2,7 @@ 🐙 *he just wants to help-pi* -telegram bridge for codex and claude code. runs the agent cli, streams progress, and supports resumable sessions. +telegram bridge for codex, claude code, and [other agents](docs/adding-a-runner.md). runs the agent cli, streams progress, and supports resumable sessions. ## features @@ -40,6 +40,8 @@ parallel runs across threads, per thread queue support. global config `~/.takopi/takopi.toml`, repo-level config `.takopi/takopi.toml` ```toml +default_engine = "codex" + bot_token = "123456789:ABCdefGHIjklMNOpqrsTUVwxyz" chat_id = 123456789 @@ -61,13 +63,17 @@ start takopi in the repo you want to work on: ```sh cd ~/dev/your-repo -takopi codex -# or +takopi +# or override the default engine for new threads: takopi claude ``` +resume lines always route to the matching engine; subcommands only override the default for new threads. + send a message to the bot. +start a new thread with a specific engine by prefixing your message with `/codex` or `/claude`. + to continue a thread, reply to a bot message containing a resume line. you can also copy it to resume an interactive session in your terminal. diff --git a/src/takopi/bridge.py b/src/takopi/bridge.py index f5d3d9f..fc4d541 100644 --- a/src/takopi/bridge.py +++ b/src/takopi/bridge.py @@ -1,4 +1,4 @@ -"""Telegram bridge orchestration for running a single runner and streaming progress.""" +"""Telegram bridge orchestration for running runners and streaming progress.""" from __future__ import annotations @@ -12,8 +12,9 @@ from typing import Any import anyio from .markdown import TELEGRAM_MARKDOWN_LIMIT, prepare_telegram -from .model import CompletedEvent, ResumeToken, StartedEvent, TakopiEvent +from .model import CompletedEvent, EngineId, ResumeToken, StartedEvent, TakopiEvent from .render import ExecProgressRenderer, render_event_cli +from .router import AutoRouter, RunnerUnavailableError from .runner import Runner from .telegram import BotClient @@ -21,12 +22,6 @@ from .telegram import BotClient logger = logging.getLogger(__name__) -def _resolve_resume( - runner: Runner, text: str | None, reply_text: str | None -) -> ResumeToken | None: - return runner.extract_resume(text) or runner.extract_resume(reply_text) - - def _log_runner_event(evt: TakopiEvent) -> None: for line in render_event_cli(evt): logger.info("[runner] %s", line) @@ -45,6 +40,73 @@ def _is_cancel_command(text: str) -> bool: return command == "/cancel" or command.startswith("/cancel@") +def _strip_engine_command( + text: str, *, engine_ids: tuple[EngineId, ...] +) -> tuple[str, EngineId | None]: + if not text: + return text, None + + if not engine_ids: + return text, None + + engine_map = {engine.lower(): engine for engine in engine_ids} + lines = text.splitlines() + idx = next((i for i, line in enumerate(lines) if line.strip()), None) + if idx is None: + return text, None + + line = lines[idx].lstrip() + if not line.startswith("/"): + return text, None + + parts = line.split(maxsplit=1) + command = parts[0][1:] + if "@" in command: + command = command.split("@", 1)[0] + engine = engine_map.get(command.lower()) + if engine is None: + return text, None + + remainder = parts[1] if len(parts) > 1 else "" + if remainder: + lines[idx] = remainder + else: + lines.pop(idx) + return "\n".join(lines).strip(), engine + + +def _build_bot_commands(router: AutoRouter) -> list[dict[str, str]]: + commands: list[dict[str, str]] = [ + {"command": "cancel", "description": "cancel run"} + ] + seen = {"cancel"} + for engine in router.engine_ids: + cmd = engine.lower() + if cmd in seen: + continue + commands.append({"command": cmd, "description": f"start {cmd}"}) + seen.add(cmd) + return commands + + +async def _set_command_menu(cfg: BridgeConfig) -> None: + commands = _build_bot_commands(cfg.router) + if not commands: + return + try: + ok = await cfg.bot.set_my_commands(commands) + except Exception as exc: + logger.info("[startup] command menu update failed: %s", exc) + return + if not ok: + logger.info("[startup] command menu update rejected") + return + logger.info( + "[startup] command menu updated commands=%s", + ", ".join(cmd["command"] for cmd in commands), + ) + + def _strip_resume_lines(text: str, *, is_resume_line: Callable[[str], bool]) -> str: stripped_lines: list[str] = [] for line in text.splitlines(): @@ -55,6 +117,34 @@ def _strip_resume_lines(text: str, *, is_resume_line: Callable[[str], bool]) -> return prompt or "continue" +def _flatten_exception_group(error: BaseException) -> list[BaseException]: + if isinstance(error, BaseExceptionGroup): + flattened: list[BaseException] = [] + for exc in error.exceptions: + flattened.extend(_flatten_exception_group(exc)) + return flattened + return [error] + + +def _format_error(error: Exception) -> str: + cancel_exc = anyio.get_cancelled_exc_class() + flattened = [ + exc + for exc in _flatten_exception_group(error) + if not isinstance(exc, cancel_exc) + ] + if len(flattened) == 1: + return str(flattened[0]) or flattened[0].__class__.__name__ + if not flattened: + return str(error) or error.__class__.__name__ + messages = [str(exc) for exc in flattened if str(exc)] + if not messages: + return str(error) or error.__class__.__name__ + if len(messages) == 1: + return messages[0] + return "\n".join(messages) + + PROGRESS_EDIT_EVERY_S = 2.0 @@ -187,7 +277,7 @@ class ProgressEdits: @dataclass(frozen=True) class BridgeConfig: bot: BotClient - runner: Runner + router: AutoRouter chat_id: int final_notify: bool startup_msg: str @@ -376,10 +466,12 @@ async def send_result_message( async def handle_message( cfg: BridgeConfig, *, + runner: Runner, chat_id: int, user_msg_id: int, text: str, resume_token: ResumeToken | None, + strip_resume_line: Callable[[str], bool] | None = None, running_tasks: dict[int, RunningTask] | None = None, on_thread_known: Callable[[ResumeToken, anyio.Event], Awaitable[None]] | None = None, @@ -395,9 +487,9 @@ async def handle_message( text, ) started_at = clock() - runner = cfg.runner is_resume_line = runner.is_resume_line - runner_text = _strip_resume_lines(text, is_resume_line=is_resume_line) + resume_strip = strip_resume_line or is_resume_line + runner_text = _strip_resume_lines(text, is_resume_line=resume_strip) progress_renderer = ExecProgressRenderer( max_actions=5, resume_formatter=runner.format_resume @@ -464,6 +556,7 @@ async def handle_message( ) except Exception as e: error = e + logger.exception("[handle] runner failed") finally: if ( running_task is not None @@ -481,7 +574,7 @@ async def handle_message( if error is not None: sync_resume_token(progress_renderer, outcome.resume) - err_body = str(error) + err_body = _format_error(error) final_md = progress_renderer.render_final(elapsed, err_body, status="error") logger.debug("[error] markdown: %s", final_md) await send_result_message( @@ -681,6 +774,32 @@ async def _send_with_resume( await enqueue(chat_id, user_msg_id, text, resume) +async def _send_runner_unavailable( + cfg: BridgeConfig, + *, + chat_id: int, + user_msg_id: int, + resume_token: ResumeToken | None, + runner: Runner, + reason: str, +) -> None: + progress_renderer = ExecProgressRenderer( + max_actions=0, resume_formatter=runner.format_resume + ) + if resume_token is not None: + progress_renderer.resume_token = resume_token + final_md = progress_renderer.render_final(0.0, f"Error:\n{reason}", status="error") + await _send_or_edit_markdown( + cfg.bot, + chat_id=chat_id, + text=final_md, + reply_to_message_id=user_msg_id, + disable_notification=False, + limit=TELEGRAM_MARKDOWN_LIMIT, + is_resume_line=runner.is_resume_line, + ) + + async def run_main_loop( cfg: BridgeConfig, poller: Callable[[BridgeConfig], AsyncIterator[dict[str, Any]]] = poll_updates, @@ -688,6 +807,7 @@ async def run_main_loop( running_tasks: dict[int, RunningTask] = {} try: + await _set_command_menu(cfg) async with anyio.create_task_group() as tg: scheduler_lock = anyio.Lock() @@ -726,14 +846,44 @@ async def run_main_loop( resume_token: ResumeToken | None, on_thread_known: Callable[[ResumeToken, anyio.Event], Awaitable[None]] | None = None, + engine_override: EngineId | None = None, ) -> None: try: + try: + entry = ( + cfg.router.entry_for_engine(engine_override) + if resume_token is None + else cfg.router.entry_for(resume_token) + ) + except RunnerUnavailableError as exc: + await _send_or_edit_markdown( + cfg.bot, + chat_id=chat_id, + text=f"Error:\n{exc}", + reply_to_message_id=user_msg_id, + disable_notification=False, + limit=TELEGRAM_MARKDOWN_LIMIT, + ) + return + if not entry.available: + reason = entry.issue or "engine unavailable" + await _send_runner_unavailable( + cfg, + chat_id=chat_id, + user_msg_id=user_msg_id, + resume_token=resume_token, + runner=entry.runner, + reason=reason, + ) + return await handle_message( cfg, + runner=entry.runner, chat_id=chat_id, user_msg_id=user_msg_id, text=text, resume_token=resume_token, + strip_resume_line=cfg.router.is_resume_line, running_tasks=running_tasks, on_thread_known=on_thread_known, progress_edit_every=cfg.progress_edit_every, @@ -799,8 +949,12 @@ async def run_main_loop( tg.start_soon(_handle_cancel, cfg, msg, running_tasks) continue + text, engine_override = _strip_engine_command( + text, engine_ids=cfg.router.engine_ids + ) + r = msg.get("reply_to_message") or {} - resume_token = _resolve_resume(cfg.runner, text, r.get("text")) + resume_token = cfg.router.resolve_resume(text, r.get("text")) reply_id = r.get("message_id") if resume_token is None and reply_id is not None: running_task = running_tasks.get(int(reply_id)) @@ -824,6 +978,7 @@ async def run_main_loop( text, None, note_thread_known, + engine_override, ) else: await enqueue(msg["chat"]["id"], user_msg_id, text, resume_token) diff --git a/src/takopi/cli.py b/src/takopi/cli.py index 48511b7..6bc8f1b 100644 --- a/src/takopi/cli.py +++ b/src/takopi/cli.py @@ -1,7 +1,10 @@ from __future__ import annotations +import logging import os +import shutil from collections.abc import Callable +from pathlib import Path import anyio import typer @@ -12,9 +15,12 @@ from .bridge import BridgeConfig, run_main_loop from .config import ConfigError, load_telegram_config from .engines import get_backend, get_engine_config, list_backends from .logging import setup_logging -from .onboarding import check_setup, render_engine_choice, render_setup_guide +from .onboarding import check_setup, render_setup_guide +from .router import AutoRouter, RunnerEntry from .telegram import TelegramClient +logger = logging.getLogger(__name__) + def _print_version_and_exit() -> None: typer.echo(__version__) @@ -26,10 +32,113 @@ def _version_callback(value: bool) -> None: _print_version_and_exit() +def _default_engine_for_setup(override: str | None) -> str: + if override: + return override + try: + config, config_path = load_telegram_config() + except ConfigError: + return "codex" + value = config.get("default_engine") + if value is None: + return "codex" + if not isinstance(value, str) or not value.strip(): + raise ConfigError( + f"Invalid `default_engine` in {config_path}; expected a non-empty string." + ) + return value.strip() + + +def _resolve_default_engine( + *, + override: str | None, + config: dict, + config_path: Path, + backends: list[EngineBackend], +) -> str: + default_engine = override or config.get("default_engine") or "codex" + if not isinstance(default_engine, str) or not default_engine.strip(): + raise ConfigError( + f"Invalid `default_engine` in {config_path}; expected a non-empty string." + ) + default_engine = default_engine.strip() + backend_ids = {backend.id for backend in backends} + if default_engine not in backend_ids: + available = ", ".join(sorted(backend_ids)) + raise ConfigError( + f"Unknown default engine {default_engine!r}. Available: {available}." + ) + return default_engine + + +def _build_router( + *, + config: dict, + config_path: Path, + backends: list[EngineBackend], + default_engine: str, +) -> AutoRouter: + entries: list[RunnerEntry] = [] + warnings: list[str] = [] + + for backend in backends: + engine_id = backend.id + issue: str | None = None + engine_cfg: dict + try: + engine_cfg = get_engine_config(config, engine_id, config_path) + except ConfigError as exc: + if engine_id == default_engine: + raise + issue = str(exc) + engine_cfg = {} + + try: + runner = backend.build_runner(engine_cfg, config_path) + except Exception as exc: + if engine_id == default_engine: + raise + issue = issue or str(exc) + if engine_cfg: + try: + runner = backend.build_runner({}, config_path) + except Exception as fallback_exc: + warnings.append(f"{engine_id}: {issue or str(fallback_exc)}") + continue + else: + warnings.append(f"{engine_id}: {issue}") + continue + + cmd = backend.cli_cmd or backend.id + if shutil.which(cmd) is None: + issue = issue or f"{cmd} not found on PATH" + + if issue and engine_id == default_engine: + raise ConfigError(f"Default engine {engine_id!r} unavailable: {issue}") + + available = issue is None + if issue and engine_id != default_engine: + warnings.append(f"{engine_id}: {issue}") + + entries.append( + RunnerEntry( + engine=engine_id, + runner=runner, + available=available, + issue=issue, + ) + ) + + for warning in warnings: + logger.warning("[setup] %s", warning) + + return AutoRouter(entries=entries, default_engine=default_engine) + + def _parse_bridge_config( *, final_notify: bool, - backend: EngineBackend, + default_engine_override: str | None, ) -> BridgeConfig: startup_pwd = os.getcwd() @@ -52,29 +161,46 @@ def _parse_bridge_config( ) from None chat_id = chat_id_value - engine_cfg = get_engine_config(config, backend.id, config_path) + backends = list_backends() + default_engine = _resolve_default_engine( + override=default_engine_override, + config=config, + config_path=config_path, + backends=backends, + ) + router = _build_router( + config=config, + config_path=config_path, + backends=backends, + default_engine=default_engine, + ) + engine_list = ", ".join(router.engine_ids) startup_msg = ( f"\N{OCTOPUS} **takopi is ready**\n\n" - f"agent: `{backend.id}` \n" + f"mode: `auto-router` \n" + f"default: `{router.default_engine}` \n" + f"engines: `{engine_list}` \n" f"working in: `{startup_pwd}`" ) bot = TelegramClient(token) - runner = backend.build_runner(engine_cfg, config_path) return BridgeConfig( bot=bot, - runner=runner, + router=router, chat_id=chat_id, final_notify=final_notify, startup_msg=startup_msg, ) -def _run_engine(*, engine: str, final_notify: bool, debug: bool) -> None: +def _run_auto_router( + *, default_engine_override: str | None, final_notify: bool, debug: bool +) -> None: setup_logging(debug=debug) try: - backend = get_backend(engine) + default_engine = _default_engine_for_setup(default_engine_override) + backend = get_backend(default_engine) except ConfigError as e: typer.echo(str(e), err=True) raise typer.Exit(code=1) @@ -85,7 +211,7 @@ def _run_engine(*, engine: str, final_notify: bool, debug: bool) -> None: try: cfg = _parse_bridge_config( final_notify=final_notify, - backend=backend, + default_engine_override=default_engine_override, ) except ConfigError as e: typer.echo(str(e), err=True) @@ -96,7 +222,7 @@ def _run_engine(*, engine: str, final_notify: bool, debug: bool) -> None: app = typer.Typer( add_completion=False, invoke_without_command=True, - help="Run takopi with an explicit engine subcommand.", + help="Run takopi with auto-router (subcommands override the default engine).", ) @@ -110,11 +236,25 @@ def app_main( callback=_version_callback, is_eager=True, ), + final_notify: bool = typer.Option( + True, + "--final-notify/--no-final-notify", + help="Send the final response as a new message (not an edit).", + ), + debug: bool = typer.Option( + False, + "--debug/--no-debug", + help="Log engine JSONL, Telegram requests, and rendered messages.", + ), ) -> None: """Takopi CLI.""" if ctx.invoked_subcommand is None: - render_engine_choice(list_backends()) - raise typer.Exit(code=1) + _run_auto_router( + default_engine_override=None, + final_notify=final_notify, + debug=debug, + ) + raise typer.Exit() def make_engine_cmd(engine_id: str) -> Callable[..., None]: @@ -130,7 +270,11 @@ def make_engine_cmd(engine_id: str) -> Callable[..., None]: help="Log engine JSONL, Telegram requests, and rendered messages.", ), ) -> None: - _run_engine(engine=engine_id, final_notify=final_notify, debug=debug) + _run_auto_router( + default_engine_override=engine_id, + final_notify=final_notify, + debug=debug, + ) _cmd.__name__ = f"run_{engine_id}" return _cmd diff --git a/src/takopi/router.py b/src/takopi/router.py new file mode 100644 index 0000000..dc8a67b --- /dev/null +++ b/src/takopi/router.py @@ -0,0 +1,99 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Iterable + +from .model import EngineId, ResumeToken +from .runner import Runner + + +class RunnerUnavailableError(RuntimeError): + def __init__(self, engine: EngineId, issue: str | None = None) -> None: + message = f"engine {engine!r} is unavailable" + if issue: + message = f"{message}: {issue}" + super().__init__(message) + self.engine = engine + self.issue = issue + + +@dataclass(frozen=True, slots=True) +class RunnerEntry: + engine: EngineId + runner: Runner + available: bool = True + issue: str | None = None + + +class AutoRouter: + def __init__( + self, entries: Iterable[RunnerEntry], default_engine: EngineId + ) -> None: + self._entries = tuple(entries) + if not self._entries: + raise ValueError("AutoRouter requires at least one runner.") + by_engine: dict[EngineId, RunnerEntry] = {} + for entry in self._entries: + if entry.engine in by_engine: + raise ValueError(f"duplicate runner engine: {entry.engine}") + by_engine[entry.engine] = entry + if default_engine not in by_engine: + raise ValueError(f"default engine {default_engine!r} is not configured") + self._by_engine = by_engine + self.default_engine = default_engine + + @property + def entries(self) -> tuple[RunnerEntry, ...]: + return self._entries + + @property + def engine_ids(self) -> tuple[EngineId, ...]: + return tuple(entry.engine for entry in self._entries) + + @property + def default_entry(self) -> RunnerEntry: + return self._by_engine[self.default_engine] + + def entry_for_engine(self, engine: EngineId | None) -> RunnerEntry: + engine = self.default_engine if engine is None else engine + entry = self._by_engine.get(engine) + if entry is None: + raise RunnerUnavailableError(engine, "engine not configured") + return entry + + def entry_for(self, resume: ResumeToken | None) -> RunnerEntry: + if resume is None: + return self.entry_for_engine(None) + return self.entry_for_engine(resume.engine) + + def runner_for(self, resume: ResumeToken | None) -> Runner: + entry = self.entry_for(resume) + if not entry.available: + raise RunnerUnavailableError(entry.engine, entry.issue) + return entry.runner + + def format_resume(self, token: ResumeToken) -> str: + entry = self.entry_for(token) + return entry.runner.format_resume(token) + + def extract_resume(self, text: str | None) -> ResumeToken | None: + if not text: + return None + for entry in self._entries: + token = entry.runner.extract_resume(text) + if token is not None: + return token + return None + + def resolve_resume( + self, text: str | None, reply_text: str | None + ) -> ResumeToken | None: + token = self.extract_resume(text) + if token is not None: + return token + if reply_text is None: + return None + return self.extract_resume(reply_text) + + def is_resume_line(self, line: str) -> bool: + return any(entry.runner.is_resume_line(line) for entry in self._entries) diff --git a/src/takopi/runner.py b/src/takopi/runner.py index 0691e02..ed8a8cd 100644 --- a/src/takopi/runner.py +++ b/src/takopi/runner.py @@ -283,14 +283,22 @@ class JsonlSubprocessRunner(BaseRunner): found_session: ResumeToken | None, ) -> tuple[ResumeToken | None, bool]: if event.engine != self.engine: - raise RuntimeError(f"{self.tag()} emitted session token for wrong engine") + raise RuntimeError( + f"{self.tag()} emitted session token for engine {event.engine!r}" + ) if expected_session is not None and event.resume != expected_session: - message = f"{self.tag()} emitted a different session id than expected" + message = ( + f"{self.tag()} emitted session id {event.resume.value} " + f"but expected {expected_session.value}" + ) raise RuntimeError(message) if found_session is None: return event.resume, True if event.resume != found_session: - message = f"{self.tag()} emitted a different session id than expected" + message = ( + f"{self.tag()} emitted session id {event.resume.value} " + f"but expected {found_session.value}" + ) raise RuntimeError(message) return found_session, False diff --git a/src/takopi/runners/codex.py b/src/takopi/runners/codex.py index 073aafd..bb37719 100644 --- a/src/takopi/runners/codex.py +++ b/src/takopi/runners/codex.py @@ -472,12 +472,18 @@ class CodexRunner(ResumeTokenMixin, JsonlSubprocessRunner): f"codex emitted session token for engine {event.engine!r}" ) if expected_session is not None and event.resume != expected_session: - message = "codex emitted a different session id than expected" + message = ( + f"codex emitted session id {event.resume.value} " + f"but expected {expected_session.value}" + ) raise RuntimeError(message) if found_session is None: return event.resume, True if event.resume != found_session: - message = "codex emitted a different session id than expected" + message = ( + f"codex emitted session id {event.resume.value} " + f"but expected {found_session.value}" + ) raise RuntimeError(message) return found_session, False diff --git a/src/takopi/telegram.py b/src/takopi/telegram.py index 45c274e..c399275 100644 --- a/src/takopi/telegram.py +++ b/src/takopi/telegram.py @@ -42,6 +42,14 @@ class BotClient(Protocol): async def delete_message(self, chat_id: int, message_id: int) -> bool: ... + async def set_my_commands( + self, + commands: list[dict[str, Any]], + *, + scope: dict[str, Any] | None = None, + language_code: str | None = None, + ) -> bool: ... + class TelegramClient: def __init__( @@ -184,3 +192,18 @@ class TelegramClient: }, ) return bool(res) + + async def set_my_commands( + self, + commands: list[dict[str, Any]], + *, + scope: dict[str, Any] | None = None, + language_code: str | None = None, + ) -> bool: + params: dict[str, Any] = {"commands": commands} + if scope is not None: + params["scope"] = scope + if language_code is not None: + params["language_code"] = language_code + res = await self._post("setMyCommands", params) + return bool(res) diff --git a/tests/test_auto_router.py b/tests/test_auto_router.py new file mode 100644 index 0000000..0828ebb --- /dev/null +++ b/tests/test_auto_router.py @@ -0,0 +1,48 @@ +from takopi.model import ResumeToken +from takopi.router import AutoRouter, RunnerEntry +from takopi.runners.claude import ClaudeRunner +from takopi.runners.codex import CodexRunner + + +def _router() -> tuple[AutoRouter, ClaudeRunner, CodexRunner]: + codex = CodexRunner(codex_cmd="codex", extra_args=[]) + claude = ClaudeRunner(claude_cmd="claude") + router = AutoRouter( + entries=[ + RunnerEntry(engine=claude.engine, runner=claude), + RunnerEntry(engine=codex.engine, runner=codex), + ], + default_engine=codex.engine, + ) + return router, claude, codex + + +def test_router_resolves_text_before_reply() -> None: + router, _claude, _codex = _router() + token = router.resolve_resume("`codex resume abc`", "`claude --resume def`") + + assert token == ResumeToken(engine="codex", value="abc") + + +def test_router_poll_order_selects_first_matching_runner() -> None: + router, _claude, _codex = _router() + text = "`codex resume abc`\n`claude --resume def`" + + token = router.resolve_resume(text, None) + + assert token == ResumeToken(engine="claude", value="def") + + +def test_router_resolves_reply_text_when_text_missing() -> None: + router, _claude, _codex = _router() + + token = router.resolve_resume(None, "`codex resume xyz`") + + assert token == ResumeToken(engine="codex", value="xyz") + + +def test_router_is_resume_line_union() -> None: + router, _claude, _codex = _router() + + assert router.is_resume_line("`codex resume abc`") + assert router.is_resume_line("claude --resume def") diff --git a/tests/test_exec_bridge.py b/tests/test_exec_bridge.py index 4546690..381e7af 100644 --- a/tests/test_exec_bridge.py +++ b/tests/test_exec_bridge.py @@ -3,9 +3,10 @@ import uuid import anyio import pytest -from takopi import engines +from takopi.bridge import _build_bot_commands, _strip_engine_command from takopi.markdown import prepare_telegram, truncate_for_telegram from takopi.model import EngineId, ResumeToken, TakopiEvent +from takopi.router import AutoRouter, RunnerEntry from takopi.runners.codex import CodexRunner from takopi.runners.mock import Advance, Emit, Raise, Return, ScriptRunner, Sleep, Wait from tests.factories import action_completed, action_started @@ -13,6 +14,13 @@ from tests.factories import action_completed, action_started CODEX_ENGINE = EngineId("codex") +def _make_router(runner) -> AutoRouter: + return AutoRouter( + entries=[RunnerEntry(engine=runner.engine, runner=runner)], + default_engine=runner.engine, + ) + + def _patch_config(monkeypatch, config): from pathlib import Path @@ -33,7 +41,7 @@ def test_parse_bridge_config_rejects_empty_token(monkeypatch) -> None: with pytest.raises(cli.ConfigError, match="bot_token"): cli._parse_bridge_config( final_notify=True, - backend=engines.get_backend("codex"), + default_engine_override=None, ) @@ -45,7 +53,7 @@ def test_parse_bridge_config_rejects_string_chat_id(monkeypatch) -> None: with pytest.raises(cli.ConfigError, match="chat_id"): cli._parse_bridge_config( final_notify=True, - backend=engines.get_backend("codex"), + default_engine_override=None, ) @@ -114,6 +122,55 @@ def test_truncate_for_telegram_keeps_last_non_empty_line() -> None: assert out.rstrip().endswith("last line") +def test_strip_engine_command_inline() -> None: + text, engine = _strip_engine_command( + "/claude do it", engine_ids=("codex", "claude") + ) + assert engine == "claude" + assert text == "do it" + + +def test_strip_engine_command_newline() -> None: + text, engine = _strip_engine_command( + "/codex\nhello", engine_ids=("codex", "claude") + ) + assert engine == "codex" + assert text == "hello" + + +def test_strip_engine_command_ignores_unknown() -> None: + text, engine = _strip_engine_command("/unknown hi", engine_ids=("codex", "claude")) + assert engine is None + assert text == "/unknown hi" + + +def test_strip_engine_command_bot_suffix() -> None: + text, engine = _strip_engine_command( + "/claude@bunny_agent_bot hi", engine_ids=("claude",) + ) + assert engine == "claude" + assert text == "hi" + + +def test_strip_engine_command_only_first_non_empty_line() -> None: + text, engine = _strip_engine_command( + "hello\n/claude hi", engine_ids=("codex", "claude") + ) + assert engine is None + assert text == "hello\n/claude hi" + + +def test_build_bot_commands_includes_cancel_and_engine() -> None: + runner = ScriptRunner( + [Return(answer="ok")], engine=CODEX_ENGINE, resume_value="sid" + ) + router = _make_router(runner) + commands = _build_bot_commands(router) + + assert {"command": "cancel", "description": "cancel run"} in commands + assert any(cmd["command"] == "codex" for cmd in commands) + + def test_prepare_telegram_drops_entities_on_truncate() -> None: md = ("**bold** " * 200).strip() @@ -126,6 +183,7 @@ def test_prepare_telegram_drops_entities_on_truncate() -> None: class _FakeBot: def __init__(self) -> None: self._next_id = 1 + self.command_calls: list[dict] = [] self.send_calls: list[dict] = [] self.edit_calls: list[dict] = [] self.delete_calls: list[dict] = [] @@ -176,6 +234,22 @@ class _FakeBot: self.delete_calls.append({"chat_id": chat_id, "message_id": message_id}) return True + async def set_my_commands( + self, + commands: list[dict], + *, + scope: dict | None = None, + language_code: str | None = None, + ) -> bool: + self.command_calls.append( + { + "commands": commands, + "scope": scope, + "language_code": language_code, + } + ) + return True + async def get_updates( self, offset: int | None, @@ -238,7 +312,7 @@ async def test_final_notify_sends_loud_final_message() -> None: runner = _return_runner(answer="ok") cfg = BridgeConfig( bot=bot, - runner=runner, + router=_make_router(runner), chat_id=123, final_notify=True, startup_msg="", @@ -246,6 +320,7 @@ async def test_final_notify_sends_loud_final_message() -> None: await handle_message( cfg, + runner=runner, chat_id=123, user_msg_id=10, text="hi", @@ -265,7 +340,7 @@ async def test_handle_message_strips_resume_line_from_prompt() -> None: runner = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE) cfg = BridgeConfig( bot=bot, - runner=runner, + router=_make_router(runner), chat_id=123, final_notify=True, startup_msg="", @@ -275,6 +350,7 @@ async def test_handle_message_strips_resume_line_from_prompt() -> None: await handle_message( cfg, + runner=runner, chat_id=123, user_msg_id=10, text=text, @@ -295,7 +371,7 @@ async def test_new_final_message_forces_notification_when_too_long_to_edit() -> runner = _return_runner(answer="x" * 10_000) cfg = BridgeConfig( bot=bot, - runner=runner, + router=_make_router(runner), chat_id=123, final_notify=False, startup_msg="", @@ -303,6 +379,7 @@ async def test_new_final_message_forces_notification_when_too_long_to_edit() -> await handle_message( cfg, + runner=runner, chat_id=123, user_msg_id=10, text="hi", @@ -336,7 +413,7 @@ async def test_progress_edits_are_rate_limited() -> None: ) cfg = BridgeConfig( bot=bot, - runner=runner, + router=_make_router(runner), chat_id=123, final_notify=True, startup_msg="", @@ -344,6 +421,7 @@ async def test_progress_edits_are_rate_limited() -> None: await handle_message( cfg, + runner=runner, chat_id=123, user_msg_id=10, text="hi", @@ -380,7 +458,7 @@ async def test_progress_edits_do_not_sleep_again_without_new_events() -> None: ) cfg = BridgeConfig( bot=bot, - runner=runner, + router=_make_router(runner), chat_id=123, final_notify=True, startup_msg="", @@ -389,6 +467,7 @@ async def test_progress_edits_do_not_sleep_again_without_new_events() -> None: async def run_handle_message() -> None: await handle_message( cfg, + runner=runner, chat_id=123, user_msg_id=10, text="hi", @@ -455,7 +534,7 @@ async def test_bridge_flow_sends_progress_edits_and_final_resume() -> None: ) cfg = BridgeConfig( bot=bot, - runner=runner, + router=_make_router(runner), chat_id=123, final_notify=True, startup_msg="", @@ -463,6 +542,7 @@ async def test_bridge_flow_sends_progress_edits_and_final_resume() -> None: await handle_message( cfg, + runner=runner, chat_id=123, user_msg_id=42, text="do it", @@ -488,7 +568,7 @@ async def test_handle_cancel_without_reply_prompts_user() -> None: runner = _return_runner(answer="ok") cfg = BridgeConfig( bot=bot, - runner=runner, + router=_make_router(runner), chat_id=123, final_notify=True, startup_msg="", @@ -510,7 +590,7 @@ async def test_handle_cancel_with_no_progress_message_says_nothing_running() -> runner = _return_runner(answer="ok") cfg = BridgeConfig( bot=bot, - runner=runner, + router=_make_router(runner), chat_id=123, final_notify=True, startup_msg="", @@ -536,7 +616,7 @@ async def test_handle_cancel_with_finished_task_says_nothing_running() -> None: runner = _return_runner(answer="ok") cfg = BridgeConfig( bot=bot, - runner=runner, + router=_make_router(runner), chat_id=123, final_notify=True, startup_msg="", @@ -563,7 +643,7 @@ async def test_handle_cancel_cancels_running_task() -> None: runner = _return_runner(answer="ok") cfg = BridgeConfig( bot=bot, - runner=runner, + router=_make_router(runner), chat_id=123, final_notify=True, startup_msg="", @@ -593,7 +673,7 @@ async def test_handle_cancel_only_cancels_matching_progress_message() -> None: runner = _return_runner(answer="ok") cfg = BridgeConfig( bot=bot, - runner=runner, + router=_make_router(runner), chat_id=123, final_notify=True, startup_msg="", @@ -638,7 +718,7 @@ async def test_handle_message_cancelled_renders_cancelled_state() -> None: ) cfg = BridgeConfig( bot=bot, - runner=runner, + router=_make_router(runner), chat_id=123, final_notify=True, startup_msg="", @@ -648,6 +728,7 @@ async def test_handle_message_cancelled_renders_cancelled_state() -> None: async def run_handle_message() -> None: await handle_message( cfg, + runner=runner, chat_id=123, user_msg_id=10, text="do something", @@ -687,7 +768,7 @@ async def test_handle_message_error_preserves_resume_token() -> None: ) cfg = BridgeConfig( bot=bot, - runner=runner, + router=_make_router(runner), chat_id=123, final_notify=True, startup_msg="", @@ -695,6 +776,7 @@ async def test_handle_message_error_preserves_resume_token() -> None: await handle_message( cfg, + runner=runner, chat_id=123, user_msg_id=10, text="do something", @@ -817,7 +899,7 @@ async def test_run_main_loop_routes_reply_to_running_resume() -> None: ) cfg = BridgeConfig( bot=bot, - runner=runner, + router=_make_router(runner), chat_id=123, final_notify=True, startup_msg="",