feat: auto router (#15)

This commit is contained in:
banteg
2026-01-02 03:13:29 +04:00
committed by GitHub
parent 73ba4836c1
commit bd9387f7f0
12 changed files with 694 additions and 73 deletions
+3
View File
@@ -2,6 +2,9 @@
## unreleased
- add auto-router runner selection with configurable default engine
- add `/cancel` + `/{engine}` command menu sync on startup
## v0.3.0 (2026-01-01)
### changes
+18 -9
View File
@@ -26,6 +26,9 @@ uv run ty check .
make check
```
Takopi runs in **auto-router** mode by default. `default_engine` in `takopi.toml` selects
the engine for new threads; engine subcommands override that default for the process.
## Module Responsibilities
### `bridge.py` - Telegram bridge loop
@@ -44,9 +47,11 @@ The orchestrator module containing:
**Key patterns:**
- Bridge schedules runs FIFO per thread to avoid concurrent progress messages; runner locks enforce per-thread serialization
- `/cancel` routes by reply-to progress message id (accepts extra text)
- Progress edits are throttled to ~1s intervals and only run when new events arrive
- `/{engine}` on the first line selects the engine for new threads
- Progress edits are throttled to 2s intervals and only run when new events arrive
- Resume tokens are runner-formatted command lines (e.g., `` `codex resume <token>` ``)
- Resume parsing is delegated to the active runner (no cross-engine fallback)
- Resume parsing polls all runners via `AutoRouter.resolve_resume()` and routes to the first match
- Bot command menu is synced on startup (`cancel` + engine commands)
### `cli.py` - CLI entry point
@@ -164,12 +169,16 @@ poll_updates() drains backlog, long-polls, filters chat_id == from_id == cfg.cha
run_main_loop() spawns tasks in TaskGroup
handle_message() spawned as task
router.resolve_resume(text, reply_text) → ResumeToken | None
router.runner_for(resume_token) → selects runner (default engine if None)
handle_message() spawned as task with selected runner
Send initial progress message (silent)
CodexRunner.run()
├── Spawns: codex exec --json ... -
runner.run(prompt, resume_token)
├── Spawns engine subprocess (e.g., codex exec --json)
├── Streams JSONL from stdout
├── Normalizes JSONL -> takopi events
├── Yields Takopi events (async iterator)
@@ -186,10 +195,10 @@ Send/edit final message
### Resume Flow
Same as above, but:
- Runners parse resume lines (e.g. `` `codex resume <token>` ``)
- Command becomes: `codex exec --json resume <token> -`
- Per-token lock serializes concurrent resumes
Same as above; auto-router polls all runners to extract resume tokens:
- Router returns first matching token (e.g. `` `claude --resume <id>` `` routes to Claude)
- Selected runner spawns with resume (e.g. `codex exec --json resume <token> -`)
- Per-token lock serializes concurrent resumes on the same thread
## Error Handling
+51 -13
View File
@@ -1,10 +1,10 @@
# Takopi Specification v0.2.0 (minimal) [2025-12-31]
# Takopi Specification v0.4.0 [2026-01-01]
This document is **normative**. The words **MUST**, **SHOULD**, and **MAY** express requirements.
## 1. Scope
Takopi v0.2.0 specifies:
Takopi v0.4.0 specifies:
- A **Telegram** bot bridge that runs an agent **Runner** and posts:
- a throttled, edited **progress message**
@@ -12,12 +12,12 @@ Takopi v0.2.0 specifies:
- **Thread continuation** via a **resume command** embedded in chat messages
- **Parallel runs across different threads**
- **Serialization within a thread** (no concurrent runs on the same thread)
- **Automatic runner selection** among multiple engines based on ResumeLine (with a configurable default for new threads)
- A Takopi-owned **normalized event model** produced by runners and consumed by renderers/bridge
Out of scope for v0.2.0:
Out of scope for v0.4.0:
- Non-Telegram clients (Slack/Discord/etc.)
- Auto-selecting among multiple runners
- Token-by-token streaming of the assistants final answer
- Engines/runners that cannot provide **stable action IDs** within a run
@@ -71,11 +71,13 @@ Constraints:
### 3.4 Bridge resume resolution (MUST)
Given `text` (user message) and optional `reply_text` (the message being replied to):
Given `text` (user message), optional `reply_text` (the message being replied to), and an ordered list of available runners `runners`:
1. The bridge MUST attempt `runner.extract_resume(text)`.
2. If not found, it MUST attempt `runner.extract_resume(reply_text)` if present.
3. If still not found, the run MUST start with `resume=None` (new thread).
1. The bridge MUST attempt to extract a resume token by polling all runners in order:
1. for each `r` in `runners`, attempt `r.extract_resume(text)`
2. choose the **first** runner that returns a non-`None` token and stop
2. If not found, it MUST repeat step (1) for `reply_text` if present.
3. If still not found, the run MUST start with `resume=None` (new thread) on the default runner (per §8, including chat-level overrides).
## 4. Normalized event model
@@ -335,12 +337,30 @@ Action update collapsing:
## 8. Configuration and engine selection
Decision (v0.2.0):
Decision (v0.4.0):
* Exactly one runner is selected at startup via a CLI subcommand (no default).
* If no engine subcommand is provided, Takopi prints an engine chooser panel and exits.
* Resume extraction uses only the selected runner.
* If a user provides a resume line for a different engine, extraction fails and the bridge treats the message as a new thread (`resume=None`).
* Takopi MUST support configuring a **default engine** used to start new threads (`resume=None`).
* If not configured, the default engine is implementation-defined (non-normative: the reference implementation defaults to `codex`).
* If no engine subcommand is provided, Takopi MUST run in **auto-router** mode:
* new threads use the configured default engine
* resumed threads are routed based on ResumeLine extraction (per §3.4)
* If an engine subcommand is provided, Takopi MUST still use the auto-router, but it overrides the configured default engine for new threads.
* Resume extraction MUST poll **all** available runners (per §3.4) and route to the first matching runner.
* New thread engine override (chat-level):
* Users MAY prefix the first non-empty line with `/{engine}` (e.g. `/claude` or `/codex`) to select the engine for a **new** thread.
* The bridge MUST strip that directive from the prompt before invoking the runner.
* If a ResumeToken is resolved from the message or reply, it MUST take precedence and the `/{engine}` directive MUST be ignored.
### 8.1 Command menu (Telegram)
Takopi SHOULD keep the bots slash-command menu in sync at startup by calling
`setMyCommands` with the canonical list of supported commands.
* The command list MUST include:
* `cancel` — cancel the current run
* one entry per configured engine
* The command list MUST NOT include commands the bot does not support.
* Command descriptions SHOULD be terse and lowercase.
## 9. Testing requirements (MUST)
@@ -373,5 +393,23 @@ Tests MUST cover:
* completed-only actions render correctly
* repeated events for same Action.id collapse as intended
7. **Auto-router engine selection**
* resume lines for non-default engines are detected and routed correctly (poll all runners)
* new threads use the configured default engine, with CLI subcommand overriding it
Test tooling SHOULD include event factories, deterministic/fake time, and a script/mock runner.
## 10. Changelog
### v0.4.0 (2026-01-01)
- Add auto-router engine selection by polling all runners to decode resume lines; add configurable default engine for new threads (subcommand overrides default).
### v0.3.0 (2026-01-01)
- Require runners to implement explicit resume formatting/extraction/detection and treat runners as authoritative for resume tokens/lines.
### v0.2.0 (2025-12-31)
- Initial minimal Takopi specification (Telegram bridge + runner protocol + normalized events + resume support).
+9 -3
View File
@@ -2,7 +2,7 @@
🐙 *he just wants to help-pi*
telegram bridge for codex and claude code. runs the agent cli, streams progress, and supports resumable sessions.
telegram bridge for codex, claude code, and [other agents](docs/adding-a-runner.md). runs the agent cli, streams progress, and supports resumable sessions.
## features
@@ -40,6 +40,8 @@ parallel runs across threads, per thread queue support.
global config `~/.takopi/takopi.toml`, repo-level config `.takopi/takopi.toml`
```toml
default_engine = "codex"
bot_token = "123456789:ABCdefGHIjklMNOpqrsTUVwxyz"
chat_id = 123456789
@@ -61,13 +63,17 @@ start takopi in the repo you want to work on:
```sh
cd ~/dev/your-repo
takopi codex
# or
takopi
# or override the default engine for new threads:
takopi claude
```
resume lines always route to the matching engine; subcommands only override the default for new threads.
send a message to the bot.
start a new thread with a specific engine by prefixing your message with `/codex` or `/claude`.
to continue a thread, reply to a bot message containing a resume line.
you can also copy it to resume an interactive session in your terminal.
+168 -13
View File
@@ -1,4 +1,4 @@
"""Telegram bridge orchestration for running a single runner and streaming progress."""
"""Telegram bridge orchestration for running runners and streaming progress."""
from __future__ import annotations
@@ -12,8 +12,9 @@ from typing import Any
import anyio
from .markdown import TELEGRAM_MARKDOWN_LIMIT, prepare_telegram
from .model import CompletedEvent, ResumeToken, StartedEvent, TakopiEvent
from .model import CompletedEvent, EngineId, ResumeToken, StartedEvent, TakopiEvent
from .render import ExecProgressRenderer, render_event_cli
from .router import AutoRouter, RunnerUnavailableError
from .runner import Runner
from .telegram import BotClient
@@ -21,12 +22,6 @@ from .telegram import BotClient
logger = logging.getLogger(__name__)
def _resolve_resume(
runner: Runner, text: str | None, reply_text: str | None
) -> ResumeToken | None:
return runner.extract_resume(text) or runner.extract_resume(reply_text)
def _log_runner_event(evt: TakopiEvent) -> None:
for line in render_event_cli(evt):
logger.info("[runner] %s", line)
@@ -45,6 +40,73 @@ def _is_cancel_command(text: str) -> bool:
return command == "/cancel" or command.startswith("/cancel@")
def _strip_engine_command(
text: str, *, engine_ids: tuple[EngineId, ...]
) -> tuple[str, EngineId | None]:
if not text:
return text, None
if not engine_ids:
return text, None
engine_map = {engine.lower(): engine for engine in engine_ids}
lines = text.splitlines()
idx = next((i for i, line in enumerate(lines) if line.strip()), None)
if idx is None:
return text, None
line = lines[idx].lstrip()
if not line.startswith("/"):
return text, None
parts = line.split(maxsplit=1)
command = parts[0][1:]
if "@" in command:
command = command.split("@", 1)[0]
engine = engine_map.get(command.lower())
if engine is None:
return text, None
remainder = parts[1] if len(parts) > 1 else ""
if remainder:
lines[idx] = remainder
else:
lines.pop(idx)
return "\n".join(lines).strip(), engine
def _build_bot_commands(router: AutoRouter) -> list[dict[str, str]]:
commands: list[dict[str, str]] = [
{"command": "cancel", "description": "cancel run"}
]
seen = {"cancel"}
for engine in router.engine_ids:
cmd = engine.lower()
if cmd in seen:
continue
commands.append({"command": cmd, "description": f"start {cmd}"})
seen.add(cmd)
return commands
async def _set_command_menu(cfg: BridgeConfig) -> None:
commands = _build_bot_commands(cfg.router)
if not commands:
return
try:
ok = await cfg.bot.set_my_commands(commands)
except Exception as exc:
logger.info("[startup] command menu update failed: %s", exc)
return
if not ok:
logger.info("[startup] command menu update rejected")
return
logger.info(
"[startup] command menu updated commands=%s",
", ".join(cmd["command"] for cmd in commands),
)
def _strip_resume_lines(text: str, *, is_resume_line: Callable[[str], bool]) -> str:
stripped_lines: list[str] = []
for line in text.splitlines():
@@ -55,6 +117,34 @@ def _strip_resume_lines(text: str, *, is_resume_line: Callable[[str], bool]) ->
return prompt or "continue"
def _flatten_exception_group(error: BaseException) -> list[BaseException]:
if isinstance(error, BaseExceptionGroup):
flattened: list[BaseException] = []
for exc in error.exceptions:
flattened.extend(_flatten_exception_group(exc))
return flattened
return [error]
def _format_error(error: Exception) -> str:
cancel_exc = anyio.get_cancelled_exc_class()
flattened = [
exc
for exc in _flatten_exception_group(error)
if not isinstance(exc, cancel_exc)
]
if len(flattened) == 1:
return str(flattened[0]) or flattened[0].__class__.__name__
if not flattened:
return str(error) or error.__class__.__name__
messages = [str(exc) for exc in flattened if str(exc)]
if not messages:
return str(error) or error.__class__.__name__
if len(messages) == 1:
return messages[0]
return "\n".join(messages)
PROGRESS_EDIT_EVERY_S = 2.0
@@ -187,7 +277,7 @@ class ProgressEdits:
@dataclass(frozen=True)
class BridgeConfig:
bot: BotClient
runner: Runner
router: AutoRouter
chat_id: int
final_notify: bool
startup_msg: str
@@ -376,10 +466,12 @@ async def send_result_message(
async def handle_message(
cfg: BridgeConfig,
*,
runner: Runner,
chat_id: int,
user_msg_id: int,
text: str,
resume_token: ResumeToken | None,
strip_resume_line: Callable[[str], bool] | None = None,
running_tasks: dict[int, RunningTask] | None = None,
on_thread_known: Callable[[ResumeToken, anyio.Event], Awaitable[None]]
| None = None,
@@ -395,9 +487,9 @@ async def handle_message(
text,
)
started_at = clock()
runner = cfg.runner
is_resume_line = runner.is_resume_line
runner_text = _strip_resume_lines(text, is_resume_line=is_resume_line)
resume_strip = strip_resume_line or is_resume_line
runner_text = _strip_resume_lines(text, is_resume_line=resume_strip)
progress_renderer = ExecProgressRenderer(
max_actions=5, resume_formatter=runner.format_resume
@@ -464,6 +556,7 @@ async def handle_message(
)
except Exception as e:
error = e
logger.exception("[handle] runner failed")
finally:
if (
running_task is not None
@@ -481,7 +574,7 @@ async def handle_message(
if error is not None:
sync_resume_token(progress_renderer, outcome.resume)
err_body = str(error)
err_body = _format_error(error)
final_md = progress_renderer.render_final(elapsed, err_body, status="error")
logger.debug("[error] markdown: %s", final_md)
await send_result_message(
@@ -681,6 +774,32 @@ async def _send_with_resume(
await enqueue(chat_id, user_msg_id, text, resume)
async def _send_runner_unavailable(
cfg: BridgeConfig,
*,
chat_id: int,
user_msg_id: int,
resume_token: ResumeToken | None,
runner: Runner,
reason: str,
) -> None:
progress_renderer = ExecProgressRenderer(
max_actions=0, resume_formatter=runner.format_resume
)
if resume_token is not None:
progress_renderer.resume_token = resume_token
final_md = progress_renderer.render_final(0.0, f"Error:\n{reason}", status="error")
await _send_or_edit_markdown(
cfg.bot,
chat_id=chat_id,
text=final_md,
reply_to_message_id=user_msg_id,
disable_notification=False,
limit=TELEGRAM_MARKDOWN_LIMIT,
is_resume_line=runner.is_resume_line,
)
async def run_main_loop(
cfg: BridgeConfig,
poller: Callable[[BridgeConfig], AsyncIterator[dict[str, Any]]] = poll_updates,
@@ -688,6 +807,7 @@ async def run_main_loop(
running_tasks: dict[int, RunningTask] = {}
try:
await _set_command_menu(cfg)
async with anyio.create_task_group() as tg:
scheduler_lock = anyio.Lock()
@@ -726,14 +846,44 @@ async def run_main_loop(
resume_token: ResumeToken | None,
on_thread_known: Callable[[ResumeToken, anyio.Event], Awaitable[None]]
| None = None,
engine_override: EngineId | None = None,
) -> None:
try:
try:
entry = (
cfg.router.entry_for_engine(engine_override)
if resume_token is None
else cfg.router.entry_for(resume_token)
)
except RunnerUnavailableError as exc:
await _send_or_edit_markdown(
cfg.bot,
chat_id=chat_id,
text=f"Error:\n{exc}",
reply_to_message_id=user_msg_id,
disable_notification=False,
limit=TELEGRAM_MARKDOWN_LIMIT,
)
return
if not entry.available:
reason = entry.issue or "engine unavailable"
await _send_runner_unavailable(
cfg,
chat_id=chat_id,
user_msg_id=user_msg_id,
resume_token=resume_token,
runner=entry.runner,
reason=reason,
)
return
await handle_message(
cfg,
runner=entry.runner,
chat_id=chat_id,
user_msg_id=user_msg_id,
text=text,
resume_token=resume_token,
strip_resume_line=cfg.router.is_resume_line,
running_tasks=running_tasks,
on_thread_known=on_thread_known,
progress_edit_every=cfg.progress_edit_every,
@@ -799,8 +949,12 @@ async def run_main_loop(
tg.start_soon(_handle_cancel, cfg, msg, running_tasks)
continue
text, engine_override = _strip_engine_command(
text, engine_ids=cfg.router.engine_ids
)
r = msg.get("reply_to_message") or {}
resume_token = _resolve_resume(cfg.runner, text, r.get("text"))
resume_token = cfg.router.resolve_resume(text, r.get("text"))
reply_id = r.get("message_id")
if resume_token is None and reply_id is not None:
running_task = running_tasks.get(int(reply_id))
@@ -824,6 +978,7 @@ async def run_main_loop(
text,
None,
note_thread_known,
engine_override,
)
else:
await enqueue(msg["chat"]["id"], user_msg_id, text, resume_token)
+157 -13
View File
@@ -1,7 +1,10 @@
from __future__ import annotations
import logging
import os
import shutil
from collections.abc import Callable
from pathlib import Path
import anyio
import typer
@@ -12,9 +15,12 @@ from .bridge import BridgeConfig, run_main_loop
from .config import ConfigError, load_telegram_config
from .engines import get_backend, get_engine_config, list_backends
from .logging import setup_logging
from .onboarding import check_setup, render_engine_choice, render_setup_guide
from .onboarding import check_setup, render_setup_guide
from .router import AutoRouter, RunnerEntry
from .telegram import TelegramClient
logger = logging.getLogger(__name__)
def _print_version_and_exit() -> None:
typer.echo(__version__)
@@ -26,10 +32,113 @@ def _version_callback(value: bool) -> None:
_print_version_and_exit()
def _default_engine_for_setup(override: str | None) -> str:
if override:
return override
try:
config, config_path = load_telegram_config()
except ConfigError:
return "codex"
value = config.get("default_engine")
if value is None:
return "codex"
if not isinstance(value, str) or not value.strip():
raise ConfigError(
f"Invalid `default_engine` in {config_path}; expected a non-empty string."
)
return value.strip()
def _resolve_default_engine(
*,
override: str | None,
config: dict,
config_path: Path,
backends: list[EngineBackend],
) -> str:
default_engine = override or config.get("default_engine") or "codex"
if not isinstance(default_engine, str) or not default_engine.strip():
raise ConfigError(
f"Invalid `default_engine` in {config_path}; expected a non-empty string."
)
default_engine = default_engine.strip()
backend_ids = {backend.id for backend in backends}
if default_engine not in backend_ids:
available = ", ".join(sorted(backend_ids))
raise ConfigError(
f"Unknown default engine {default_engine!r}. Available: {available}."
)
return default_engine
def _build_router(
*,
config: dict,
config_path: Path,
backends: list[EngineBackend],
default_engine: str,
) -> AutoRouter:
entries: list[RunnerEntry] = []
warnings: list[str] = []
for backend in backends:
engine_id = backend.id
issue: str | None = None
engine_cfg: dict
try:
engine_cfg = get_engine_config(config, engine_id, config_path)
except ConfigError as exc:
if engine_id == default_engine:
raise
issue = str(exc)
engine_cfg = {}
try:
runner = backend.build_runner(engine_cfg, config_path)
except Exception as exc:
if engine_id == default_engine:
raise
issue = issue or str(exc)
if engine_cfg:
try:
runner = backend.build_runner({}, config_path)
except Exception as fallback_exc:
warnings.append(f"{engine_id}: {issue or str(fallback_exc)}")
continue
else:
warnings.append(f"{engine_id}: {issue}")
continue
cmd = backend.cli_cmd or backend.id
if shutil.which(cmd) is None:
issue = issue or f"{cmd} not found on PATH"
if issue and engine_id == default_engine:
raise ConfigError(f"Default engine {engine_id!r} unavailable: {issue}")
available = issue is None
if issue and engine_id != default_engine:
warnings.append(f"{engine_id}: {issue}")
entries.append(
RunnerEntry(
engine=engine_id,
runner=runner,
available=available,
issue=issue,
)
)
for warning in warnings:
logger.warning("[setup] %s", warning)
return AutoRouter(entries=entries, default_engine=default_engine)
def _parse_bridge_config(
*,
final_notify: bool,
backend: EngineBackend,
default_engine_override: str | None,
) -> BridgeConfig:
startup_pwd = os.getcwd()
@@ -52,29 +161,46 @@ def _parse_bridge_config(
) from None
chat_id = chat_id_value
engine_cfg = get_engine_config(config, backend.id, config_path)
backends = list_backends()
default_engine = _resolve_default_engine(
override=default_engine_override,
config=config,
config_path=config_path,
backends=backends,
)
router = _build_router(
config=config,
config_path=config_path,
backends=backends,
default_engine=default_engine,
)
engine_list = ", ".join(router.engine_ids)
startup_msg = (
f"\N{OCTOPUS} **takopi is ready**\n\n"
f"agent: `{backend.id}` \n"
f"mode: `auto-router` \n"
f"default: `{router.default_engine}` \n"
f"engines: `{engine_list}` \n"
f"working in: `{startup_pwd}`"
)
bot = TelegramClient(token)
runner = backend.build_runner(engine_cfg, config_path)
return BridgeConfig(
bot=bot,
runner=runner,
router=router,
chat_id=chat_id,
final_notify=final_notify,
startup_msg=startup_msg,
)
def _run_engine(*, engine: str, final_notify: bool, debug: bool) -> None:
def _run_auto_router(
*, default_engine_override: str | None, final_notify: bool, debug: bool
) -> None:
setup_logging(debug=debug)
try:
backend = get_backend(engine)
default_engine = _default_engine_for_setup(default_engine_override)
backend = get_backend(default_engine)
except ConfigError as e:
typer.echo(str(e), err=True)
raise typer.Exit(code=1)
@@ -85,7 +211,7 @@ def _run_engine(*, engine: str, final_notify: bool, debug: bool) -> None:
try:
cfg = _parse_bridge_config(
final_notify=final_notify,
backend=backend,
default_engine_override=default_engine_override,
)
except ConfigError as e:
typer.echo(str(e), err=True)
@@ -96,7 +222,7 @@ def _run_engine(*, engine: str, final_notify: bool, debug: bool) -> None:
app = typer.Typer(
add_completion=False,
invoke_without_command=True,
help="Run takopi with an explicit engine subcommand.",
help="Run takopi with auto-router (subcommands override the default engine).",
)
@@ -110,11 +236,25 @@ def app_main(
callback=_version_callback,
is_eager=True,
),
final_notify: bool = typer.Option(
True,
"--final-notify/--no-final-notify",
help="Send the final response as a new message (not an edit).",
),
debug: bool = typer.Option(
False,
"--debug/--no-debug",
help="Log engine JSONL, Telegram requests, and rendered messages.",
),
) -> None:
"""Takopi CLI."""
if ctx.invoked_subcommand is None:
render_engine_choice(list_backends())
raise typer.Exit(code=1)
_run_auto_router(
default_engine_override=None,
final_notify=final_notify,
debug=debug,
)
raise typer.Exit()
def make_engine_cmd(engine_id: str) -> Callable[..., None]:
@@ -130,7 +270,11 @@ def make_engine_cmd(engine_id: str) -> Callable[..., None]:
help="Log engine JSONL, Telegram requests, and rendered messages.",
),
) -> None:
_run_engine(engine=engine_id, final_notify=final_notify, debug=debug)
_run_auto_router(
default_engine_override=engine_id,
final_notify=final_notify,
debug=debug,
)
_cmd.__name__ = f"run_{engine_id}"
return _cmd
+99
View File
@@ -0,0 +1,99 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Iterable
from .model import EngineId, ResumeToken
from .runner import Runner
class RunnerUnavailableError(RuntimeError):
def __init__(self, engine: EngineId, issue: str | None = None) -> None:
message = f"engine {engine!r} is unavailable"
if issue:
message = f"{message}: {issue}"
super().__init__(message)
self.engine = engine
self.issue = issue
@dataclass(frozen=True, slots=True)
class RunnerEntry:
engine: EngineId
runner: Runner
available: bool = True
issue: str | None = None
class AutoRouter:
def __init__(
self, entries: Iterable[RunnerEntry], default_engine: EngineId
) -> None:
self._entries = tuple(entries)
if not self._entries:
raise ValueError("AutoRouter requires at least one runner.")
by_engine: dict[EngineId, RunnerEntry] = {}
for entry in self._entries:
if entry.engine in by_engine:
raise ValueError(f"duplicate runner engine: {entry.engine}")
by_engine[entry.engine] = entry
if default_engine not in by_engine:
raise ValueError(f"default engine {default_engine!r} is not configured")
self._by_engine = by_engine
self.default_engine = default_engine
@property
def entries(self) -> tuple[RunnerEntry, ...]:
return self._entries
@property
def engine_ids(self) -> tuple[EngineId, ...]:
return tuple(entry.engine for entry in self._entries)
@property
def default_entry(self) -> RunnerEntry:
return self._by_engine[self.default_engine]
def entry_for_engine(self, engine: EngineId | None) -> RunnerEntry:
engine = self.default_engine if engine is None else engine
entry = self._by_engine.get(engine)
if entry is None:
raise RunnerUnavailableError(engine, "engine not configured")
return entry
def entry_for(self, resume: ResumeToken | None) -> RunnerEntry:
if resume is None:
return self.entry_for_engine(None)
return self.entry_for_engine(resume.engine)
def runner_for(self, resume: ResumeToken | None) -> Runner:
entry = self.entry_for(resume)
if not entry.available:
raise RunnerUnavailableError(entry.engine, entry.issue)
return entry.runner
def format_resume(self, token: ResumeToken) -> str:
entry = self.entry_for(token)
return entry.runner.format_resume(token)
def extract_resume(self, text: str | None) -> ResumeToken | None:
if not text:
return None
for entry in self._entries:
token = entry.runner.extract_resume(text)
if token is not None:
return token
return None
def resolve_resume(
self, text: str | None, reply_text: str | None
) -> ResumeToken | None:
token = self.extract_resume(text)
if token is not None:
return token
if reply_text is None:
return None
return self.extract_resume(reply_text)
def is_resume_line(self, line: str) -> bool:
return any(entry.runner.is_resume_line(line) for entry in self._entries)
+11 -3
View File
@@ -283,14 +283,22 @@ class JsonlSubprocessRunner(BaseRunner):
found_session: ResumeToken | None,
) -> tuple[ResumeToken | None, bool]:
if event.engine != self.engine:
raise RuntimeError(f"{self.tag()} emitted session token for wrong engine")
raise RuntimeError(
f"{self.tag()} emitted session token for engine {event.engine!r}"
)
if expected_session is not None and event.resume != expected_session:
message = f"{self.tag()} emitted a different session id than expected"
message = (
f"{self.tag()} emitted session id {event.resume.value} "
f"but expected {expected_session.value}"
)
raise RuntimeError(message)
if found_session is None:
return event.resume, True
if event.resume != found_session:
message = f"{self.tag()} emitted a different session id than expected"
message = (
f"{self.tag()} emitted session id {event.resume.value} "
f"but expected {found_session.value}"
)
raise RuntimeError(message)
return found_session, False
+8 -2
View File
@@ -472,12 +472,18 @@ class CodexRunner(ResumeTokenMixin, JsonlSubprocessRunner):
f"codex emitted session token for engine {event.engine!r}"
)
if expected_session is not None and event.resume != expected_session:
message = "codex emitted a different session id than expected"
message = (
f"codex emitted session id {event.resume.value} "
f"but expected {expected_session.value}"
)
raise RuntimeError(message)
if found_session is None:
return event.resume, True
if event.resume != found_session:
message = "codex emitted a different session id than expected"
message = (
f"codex emitted session id {event.resume.value} "
f"but expected {found_session.value}"
)
raise RuntimeError(message)
return found_session, False
+23
View File
@@ -42,6 +42,14 @@ class BotClient(Protocol):
async def delete_message(self, chat_id: int, message_id: int) -> bool: ...
async def set_my_commands(
self,
commands: list[dict[str, Any]],
*,
scope: dict[str, Any] | None = None,
language_code: str | None = None,
) -> bool: ...
class TelegramClient:
def __init__(
@@ -184,3 +192,18 @@ class TelegramClient:
},
)
return bool(res)
async def set_my_commands(
self,
commands: list[dict[str, Any]],
*,
scope: dict[str, Any] | None = None,
language_code: str | None = None,
) -> bool:
params: dict[str, Any] = {"commands": commands}
if scope is not None:
params["scope"] = scope
if language_code is not None:
params["language_code"] = language_code
res = await self._post("setMyCommands", params)
return bool(res)
+48
View File
@@ -0,0 +1,48 @@
from takopi.model import ResumeToken
from takopi.router import AutoRouter, RunnerEntry
from takopi.runners.claude import ClaudeRunner
from takopi.runners.codex import CodexRunner
def _router() -> tuple[AutoRouter, ClaudeRunner, CodexRunner]:
codex = CodexRunner(codex_cmd="codex", extra_args=[])
claude = ClaudeRunner(claude_cmd="claude")
router = AutoRouter(
entries=[
RunnerEntry(engine=claude.engine, runner=claude),
RunnerEntry(engine=codex.engine, runner=codex),
],
default_engine=codex.engine,
)
return router, claude, codex
def test_router_resolves_text_before_reply() -> None:
router, _claude, _codex = _router()
token = router.resolve_resume("`codex resume abc`", "`claude --resume def`")
assert token == ResumeToken(engine="codex", value="abc")
def test_router_poll_order_selects_first_matching_runner() -> None:
router, _claude, _codex = _router()
text = "`codex resume abc`\n`claude --resume def`"
token = router.resolve_resume(text, None)
assert token == ResumeToken(engine="claude", value="def")
def test_router_resolves_reply_text_when_text_missing() -> None:
router, _claude, _codex = _router()
token = router.resolve_resume(None, "`codex resume xyz`")
assert token == ResumeToken(engine="codex", value="xyz")
def test_router_is_resume_line_union() -> None:
router, _claude, _codex = _router()
assert router.is_resume_line("`codex resume abc`")
assert router.is_resume_line("claude --resume def")
+99 -17
View File
@@ -3,9 +3,10 @@ import uuid
import anyio
import pytest
from takopi import engines
from takopi.bridge import _build_bot_commands, _strip_engine_command
from takopi.markdown import prepare_telegram, truncate_for_telegram
from takopi.model import EngineId, ResumeToken, TakopiEvent
from takopi.router import AutoRouter, RunnerEntry
from takopi.runners.codex import CodexRunner
from takopi.runners.mock import Advance, Emit, Raise, Return, ScriptRunner, Sleep, Wait
from tests.factories import action_completed, action_started
@@ -13,6 +14,13 @@ from tests.factories import action_completed, action_started
CODEX_ENGINE = EngineId("codex")
def _make_router(runner) -> AutoRouter:
return AutoRouter(
entries=[RunnerEntry(engine=runner.engine, runner=runner)],
default_engine=runner.engine,
)
def _patch_config(monkeypatch, config):
from pathlib import Path
@@ -33,7 +41,7 @@ def test_parse_bridge_config_rejects_empty_token(monkeypatch) -> None:
with pytest.raises(cli.ConfigError, match="bot_token"):
cli._parse_bridge_config(
final_notify=True,
backend=engines.get_backend("codex"),
default_engine_override=None,
)
@@ -45,7 +53,7 @@ def test_parse_bridge_config_rejects_string_chat_id(monkeypatch) -> None:
with pytest.raises(cli.ConfigError, match="chat_id"):
cli._parse_bridge_config(
final_notify=True,
backend=engines.get_backend("codex"),
default_engine_override=None,
)
@@ -114,6 +122,55 @@ def test_truncate_for_telegram_keeps_last_non_empty_line() -> None:
assert out.rstrip().endswith("last line")
def test_strip_engine_command_inline() -> None:
text, engine = _strip_engine_command(
"/claude do it", engine_ids=("codex", "claude")
)
assert engine == "claude"
assert text == "do it"
def test_strip_engine_command_newline() -> None:
text, engine = _strip_engine_command(
"/codex\nhello", engine_ids=("codex", "claude")
)
assert engine == "codex"
assert text == "hello"
def test_strip_engine_command_ignores_unknown() -> None:
text, engine = _strip_engine_command("/unknown hi", engine_ids=("codex", "claude"))
assert engine is None
assert text == "/unknown hi"
def test_strip_engine_command_bot_suffix() -> None:
text, engine = _strip_engine_command(
"/claude@bunny_agent_bot hi", engine_ids=("claude",)
)
assert engine == "claude"
assert text == "hi"
def test_strip_engine_command_only_first_non_empty_line() -> None:
text, engine = _strip_engine_command(
"hello\n/claude hi", engine_ids=("codex", "claude")
)
assert engine is None
assert text == "hello\n/claude hi"
def test_build_bot_commands_includes_cancel_and_engine() -> None:
runner = ScriptRunner(
[Return(answer="ok")], engine=CODEX_ENGINE, resume_value="sid"
)
router = _make_router(runner)
commands = _build_bot_commands(router)
assert {"command": "cancel", "description": "cancel run"} in commands
assert any(cmd["command"] == "codex" for cmd in commands)
def test_prepare_telegram_drops_entities_on_truncate() -> None:
md = ("**bold** " * 200).strip()
@@ -126,6 +183,7 @@ def test_prepare_telegram_drops_entities_on_truncate() -> None:
class _FakeBot:
def __init__(self) -> None:
self._next_id = 1
self.command_calls: list[dict] = []
self.send_calls: list[dict] = []
self.edit_calls: list[dict] = []
self.delete_calls: list[dict] = []
@@ -176,6 +234,22 @@ class _FakeBot:
self.delete_calls.append({"chat_id": chat_id, "message_id": message_id})
return True
async def set_my_commands(
self,
commands: list[dict],
*,
scope: dict | None = None,
language_code: str | None = None,
) -> bool:
self.command_calls.append(
{
"commands": commands,
"scope": scope,
"language_code": language_code,
}
)
return True
async def get_updates(
self,
offset: int | None,
@@ -238,7 +312,7 @@ async def test_final_notify_sends_loud_final_message() -> None:
runner = _return_runner(answer="ok")
cfg = BridgeConfig(
bot=bot,
runner=runner,
router=_make_router(runner),
chat_id=123,
final_notify=True,
startup_msg="",
@@ -246,6 +320,7 @@ async def test_final_notify_sends_loud_final_message() -> None:
await handle_message(
cfg,
runner=runner,
chat_id=123,
user_msg_id=10,
text="hi",
@@ -265,7 +340,7 @@ async def test_handle_message_strips_resume_line_from_prompt() -> None:
runner = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE)
cfg = BridgeConfig(
bot=bot,
runner=runner,
router=_make_router(runner),
chat_id=123,
final_notify=True,
startup_msg="",
@@ -275,6 +350,7 @@ async def test_handle_message_strips_resume_line_from_prompt() -> None:
await handle_message(
cfg,
runner=runner,
chat_id=123,
user_msg_id=10,
text=text,
@@ -295,7 +371,7 @@ async def test_new_final_message_forces_notification_when_too_long_to_edit() ->
runner = _return_runner(answer="x" * 10_000)
cfg = BridgeConfig(
bot=bot,
runner=runner,
router=_make_router(runner),
chat_id=123,
final_notify=False,
startup_msg="",
@@ -303,6 +379,7 @@ async def test_new_final_message_forces_notification_when_too_long_to_edit() ->
await handle_message(
cfg,
runner=runner,
chat_id=123,
user_msg_id=10,
text="hi",
@@ -336,7 +413,7 @@ async def test_progress_edits_are_rate_limited() -> None:
)
cfg = BridgeConfig(
bot=bot,
runner=runner,
router=_make_router(runner),
chat_id=123,
final_notify=True,
startup_msg="",
@@ -344,6 +421,7 @@ async def test_progress_edits_are_rate_limited() -> None:
await handle_message(
cfg,
runner=runner,
chat_id=123,
user_msg_id=10,
text="hi",
@@ -380,7 +458,7 @@ async def test_progress_edits_do_not_sleep_again_without_new_events() -> None:
)
cfg = BridgeConfig(
bot=bot,
runner=runner,
router=_make_router(runner),
chat_id=123,
final_notify=True,
startup_msg="",
@@ -389,6 +467,7 @@ async def test_progress_edits_do_not_sleep_again_without_new_events() -> None:
async def run_handle_message() -> None:
await handle_message(
cfg,
runner=runner,
chat_id=123,
user_msg_id=10,
text="hi",
@@ -455,7 +534,7 @@ async def test_bridge_flow_sends_progress_edits_and_final_resume() -> None:
)
cfg = BridgeConfig(
bot=bot,
runner=runner,
router=_make_router(runner),
chat_id=123,
final_notify=True,
startup_msg="",
@@ -463,6 +542,7 @@ async def test_bridge_flow_sends_progress_edits_and_final_resume() -> None:
await handle_message(
cfg,
runner=runner,
chat_id=123,
user_msg_id=42,
text="do it",
@@ -488,7 +568,7 @@ async def test_handle_cancel_without_reply_prompts_user() -> None:
runner = _return_runner(answer="ok")
cfg = BridgeConfig(
bot=bot,
runner=runner,
router=_make_router(runner),
chat_id=123,
final_notify=True,
startup_msg="",
@@ -510,7 +590,7 @@ async def test_handle_cancel_with_no_progress_message_says_nothing_running() ->
runner = _return_runner(answer="ok")
cfg = BridgeConfig(
bot=bot,
runner=runner,
router=_make_router(runner),
chat_id=123,
final_notify=True,
startup_msg="",
@@ -536,7 +616,7 @@ async def test_handle_cancel_with_finished_task_says_nothing_running() -> None:
runner = _return_runner(answer="ok")
cfg = BridgeConfig(
bot=bot,
runner=runner,
router=_make_router(runner),
chat_id=123,
final_notify=True,
startup_msg="",
@@ -563,7 +643,7 @@ async def test_handle_cancel_cancels_running_task() -> None:
runner = _return_runner(answer="ok")
cfg = BridgeConfig(
bot=bot,
runner=runner,
router=_make_router(runner),
chat_id=123,
final_notify=True,
startup_msg="",
@@ -593,7 +673,7 @@ async def test_handle_cancel_only_cancels_matching_progress_message() -> None:
runner = _return_runner(answer="ok")
cfg = BridgeConfig(
bot=bot,
runner=runner,
router=_make_router(runner),
chat_id=123,
final_notify=True,
startup_msg="",
@@ -638,7 +718,7 @@ async def test_handle_message_cancelled_renders_cancelled_state() -> None:
)
cfg = BridgeConfig(
bot=bot,
runner=runner,
router=_make_router(runner),
chat_id=123,
final_notify=True,
startup_msg="",
@@ -648,6 +728,7 @@ async def test_handle_message_cancelled_renders_cancelled_state() -> None:
async def run_handle_message() -> None:
await handle_message(
cfg,
runner=runner,
chat_id=123,
user_msg_id=10,
text="do something",
@@ -687,7 +768,7 @@ async def test_handle_message_error_preserves_resume_token() -> None:
)
cfg = BridgeConfig(
bot=bot,
runner=runner,
router=_make_router(runner),
chat_id=123,
final_notify=True,
startup_msg="",
@@ -695,6 +776,7 @@ async def test_handle_message_error_preserves_resume_token() -> None:
await handle_message(
cfg,
runner=runner,
chat_id=123,
user_msg_id=10,
text="do something",
@@ -817,7 +899,7 @@ async def test_run_main_loop_routes_reply_to_running_resume() -> None:
)
cfg = BridgeConfig(
bot=bot,
runner=runner,
router=_make_router(runner),
chat_id=123,
final_notify=True,
startup_msg="",