From d35752fc5568f068a1a1231bfce4d042c5c6fc20 Mon Sep 17 00:00:00 2001 From: banteg <4562643+banteg@users.noreply.github.com> Date: Thu, 1 Jan 2026 20:31:11 +0400 Subject: [PATCH] feat: auto-discover runners (#12) --- changelog.md | 26 ++ docs/adding-a-runner.md | 122 +++-- docs/developing.md | 9 +- docs/runner/claude/claude-runner.md | 24 +- docs/runner/claude/claude-takopi-events.md | 10 +- docs/specification.md | 20 +- src/takopi/backends.py | 24 + src/takopi/backends_helpers.py | 14 + src/takopi/bridge.py | 7 +- src/takopi/cli.py | 62 +-- src/takopi/debug_onboarding.py | 8 +- src/takopi/engines.py | 177 ++------ src/takopi/onboarding.py | 22 +- src/takopi/runner.py | 352 ++++++++++++++- src/takopi/runners/claude.py | 325 +++++++------- src/takopi/runners/codex.py | 490 ++++++++++----------- src/takopi/runners/mock.py | 14 +- tests/test_claude_runner.py | 2 +- tests/test_engine_discovery.py | 39 ++ tests/test_exec_runner.py | 6 +- tests/test_onboarding.py | 14 +- 21 files changed, 1069 insertions(+), 698 deletions(-) create mode 100644 src/takopi/backends.py create mode 100644 src/takopi/backends_helpers.py create mode 100644 tests/test_engine_discovery.py diff --git a/changelog.md b/changelog.md index 1b906f1..3cf3a18 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,31 @@ # changelog +## unreleased + +### changes + +- add a claude code runner via the `claude` CLI with stream-json parsing and resume support +- auto-discover engine backends and generate CLI subcommands from the registry +- add `BaseRunner` session locking plus a `JsonlSubprocessRunner` helper for jsonl subprocess engines +- add jsonl stream parsing and subprocess helpers for runners +- lazily allocate per-session locks and streamline backend setup/install metadata +- improve startup message formatting and markdown rendering +- add a debug onboarding helper for setup troubleshooting + +### breaking + +- runner implementations must define explicit resume parsing/formatting (no implicit standard resume pattern) + +### fixes + +- stop leaking a hidden `engine-id` CLI option on engine subcommands + +### docs + +- add a runner guide plus Claude Code docs (runner, events, stream-json cheatsheet) +- clarify the Claude runner file layout and add guidance for JSONL-based runners +- document “minimal” runner mode: Started+Completed only, completed-only actions allowed + ## v0.2.0 (2025-12-31) ### changes diff --git a/docs/adding-a-runner.md b/docs/adding-a-runner.md index 7fc5912..6ff87c5 100644 --- a/docs/adding-a-runner.md +++ b/docs/adding-a-runner.md @@ -5,13 +5,13 @@ domain model. Use the existing runners (Codex/Claude) as references. ## Quick checklist -1. Implement `Runner` in `src/takopi/runners/.py`. +1. Implement `Runner` in `src/takopi/runners/.py` (usually via + `JsonlSubprocessRunner`). 2. Emit Takopi events from `takopi.model` and implement resume helpers (`format_resume`, `extract_resume`, `is_resume_line`). -3. Register an `EngineBackend` in `src/takopi/engines.py` with setup checks - and runner construction. -4. Add CLI subcommand in `src/takopi/cli.py`. -5. Extend tests (runner contract + engine-specific translation tests). +3. Define `BACKEND = EngineBackend(...)` in the runner module (auto-discovered), + including `install_cmd` (and `cli_cmd` only if the binary name differs). +4. Extend tests (runner contract + engine-specific translation tests). --- @@ -25,20 +25,41 @@ make it easy to drop in another engine without changing the Takopi domain model. - Engine id: `"pi"` (used in config, resume tokens, and CLI subcommand). - Canonical resume line: the engine’s own CLI resume command, e.g. `` `pi --resume ` ``. -- If your engine uses the standard `" resume "` format, you can - reuse `compile_resume_pattern()`. Otherwise, define a custom regex in the - runner (like Claude does). +- Pick the resume line format you want to support and define a regex for it in + the runner (Claude is a good example). If you choose the + `" resume "` shape, you can use that exact regex. ### 2) Implement `src/takopi/runners/pi.py` -Skeleton outline: +Recommended: `JsonlSubprocessRunner` + +For JSONL CLIs, this base class centralizes subprocess + JSONL plumbing, +lock timing, and completion semantics. Your runner usually only needs: + +- `command()` (binary name) +- `build_args(...)` +- `translate(...)` (map one JSON object to a list of Takopi events) + +Optional hooks for common variants: + +- `stdin_payload(...)`: return `None` if the prompt is passed via argv +- `env(...)`: add or redact environment variables +- `invalid_json_events(...)`: customize the warning event +- `process_error_events(...)`: customize `rc != 0` handling +- `stream_end_events(...)`: customize stream-end fallback (no `CompletedEvent`) +- `handle_started_event(...)`: customize session-id validation + +If you call `note_event(...)`, your state object must include `note_seq` or +override `next_note_id(...)`. + +Skeleton outline (JSONL CLI): ```py ENGINE: EngineId = "pi" _RESUME_RE = re.compile(r"(?im)^\s*`?pi\s+--resume\s+(?P[^`\\s]+)`?\\s*$") @dataclass -class PiRunner(SessionLockMixin, ResumeTokenMixin, Runner): +class PiRunner(ResumeTokenMixin, JsonlSubprocessRunner): engine: EngineId = ENGINE resume_re: re.Pattern[str] = _RESUME_RE @@ -46,30 +67,55 @@ class PiRunner(SessionLockMixin, ResumeTokenMixin, Runner): model: str | None = None allowed_tools: list[str] | None = None - def _build_args(self, prompt: str, resume: ResumeToken | None) -> list[str]: - args = ["--jsonl"] + def command(self) -> str: + return self.pi_cmd + + def build_args( + self, prompt: str, resume: ResumeToken | None, *, state: Any + ) -> list[str]: + _ = prompt, state + args = ["--jsonl", "--verbose"] if resume is not None: args.extend(["--resume", resume.value]) if self.model is not None: args.extend(["--model", self.model]) if self.allowed_tools: args.extend(["--allowed-tools", ",".join(self.allowed_tools)]) - args.append("--") - args.append(prompt) return args - async def run( - self, prompt: str, resume: ResumeToken | None - ) -> AsyncIterator[TakopiEvent]: - async for evt in self._run_with_resume_lock(prompt, resume, self._run): - yield evt + def stdin_payload( + self, prompt: str, resume: ResumeToken | None, *, state: Any + ) -> bytes | None: + _ = resume, state + return prompt.encode() + + def translate( + self, + data: dict[str, Any], + *, + state: Any, + resume: ResumeToken | None, + found_session: ResumeToken | None, + ) -> list[TakopiEvent]: + _ = state, resume, found_session + ... ``` Key implementation notes: -- Use `SessionLockMixin` to enforce per-session serialization. -- Use `ResumeTokenMixin` for `format_resume` / `extract_resume` / `is_resume_line`. -- Use `iter_jsonl(...)` + `drain_stderr(...)` from `takopi.utils.streams`. +- Use `BaseRunner` (or `JsonlSubprocessRunner`) for per-session serialization. +- Mix in `ResumeTokenMixin` (with a `resume_re`) or override + `format_resume` / `extract_resume` / `is_resume_line` so the runner owns + resume encoding/decoding. +- For JSONL CLIs, prefer `JsonlSubprocessRunner` and implement `command`, + `build_args`, and `translate` (override `stdin_payload` if the prompt should + be passed via argv instead of stdin). +- If you don’t use `JsonlSubprocessRunner`, use `iter_jsonl(...)` + + `drain_stderr(...)` from `takopi.utils.streams`. +- **Minimal mode is supported:** start with exactly one `StartedEvent` and one + `CompletedEvent`. `ActionEvent`s are optional and can be added later. If you + do emit actions, you can emit only `phase="completed"` notes without tracking + pending state. - **Do not truncate** tool outputs in the runner; pass full strings into events. Truncation belongs in renderers. @@ -94,13 +140,28 @@ Mapping guidance: If Pi emits warnings/errors before the final event, surface them as completed `ActionEvent`s (e.g., `kind="warning"`). -### 4) Register engine in `src/takopi/engines.py` +### 4) Expose the backend (auto-discovered) -Add: +Takopi discovers runners by importing modules in `takopi.runners` and looking +for a module-level `BACKEND: EngineBackend` (from `takopi.backends`). -- `_pi_check_setup()` that verifies `pi` exists on PATH -- `_pi_build_runner()` that reads `[pi]` config and returns `PiRunner` -- A new `EngineBackend(id="pi", display_name="Pi", ...)` entry +At the bottom of `src/takopi/runners/pi.py`, define: + +```py +BACKEND = EngineBackend( + id="pi", + build_runner=build_runner, + install_cmd="npm install -g @acme/pi-cli", +) +``` + +No changes to `engines.py` or `cli.py` are required. + +Only modules that define `BACKEND` are treated as engines. Internal/testing +modules (like `mock.py`) should omit it. + +If the CLI binary name differs from the engine id, set `cli_cmd="pi-cli"` on +the backend. Example config (minimal): @@ -110,12 +171,7 @@ model = "pi-large" allowed_tools = ["Bash", "Read"] ``` -### 5) Add CLI subcommand - -Expose `takopi pi` alongside `takopi codex` / `takopi claude` by adding a new -`@app.command()` in `src/takopi/cli.py`. - -### 6) Tests + fixtures +### 5) Tests + fixtures - Add `tests/test_pi_runner.py` for translation behavior. - Reuse `tests/test_runner_contract.py` to ensure lock/resume invariants. diff --git a/docs/developing.md b/docs/developing.md index a9a5f0f..0455631 100644 --- a/docs/developing.md +++ b/docs/developing.md @@ -106,9 +106,14 @@ Transforms takopi events into human-readable text: | `model.py` | Domain types: resume tokens, actions, events, run result | | `runner.py` | Runner protocol + event queue utilities | -### `engines.py` - Engine backend registry +### `backends.py` - Engine backend contracts -Registers available engines and provides setup checks + runner construction. +Defines `EngineBackend`, `SetupIssue`, and the `EngineConfig` type used by +runner modules. + +### `engines.py` - Engine backend discovery + +Auto-discovers runner modules in `takopi.runners` that export `BACKEND`. ### `runners/` - Runner implementations diff --git a/docs/runner/claude/claude-runner.md b/docs/runner/claude/claude-runner.md index 25e7214..b931b34 100644 --- a/docs/runner/claude/claude-runner.md +++ b/docs/runner/claude/claude-runner.md @@ -94,15 +94,19 @@ Notes: ## Code changes (by file) -### 1) `src/takopi/engines.py` +### 1) New file: `src/takopi/runners/claude.py` -Add a new backend: +#### Backend export -* Engine ID: `EngineId("claude")` +Expose a module-level `BACKEND = EngineBackend(...)` (from `takopi.backends`). +Takopi auto-discovers runners by importing `takopi.runners.*` and looking for +`BACKEND`. -* `check_setup()` should: +`BACKEND` should provide: - * `shutil.which("claude")` must exist. +* Engine id: `"claude"` +* `install_cmd`: + * Install command for `claude` (used by onboarding when missing on PATH). * Error message should include official install options and “run `claude` once to authenticate”. * Install methods include install scripts, Homebrew, and npm. ([Claude Code][4]) @@ -110,11 +114,7 @@ Add a new backend: * `build_runner()` should parse `[claude]` config and instantiate `ClaudeRunner`. -* `startup_message()` e.g.: - - * `takopi (claude) is ready\npwd: ...` - -### 2) New file: `src/takopi/runners/claude.py` +#### Runner implementation Implement a new `Runner`: @@ -319,7 +319,7 @@ Mirror the existing `CodexRunner` tests patterns. 1. **Contract & locking** -* `test_run_serializes_same_session` (stub `_run` like Codex tests) +* `test_run_serializes_same_session` (stub `run_impl` like Codex tests) * `test_run_allows_parallel_new_sessions` * `test_run_serializes_new_session_after_session_is_known`: @@ -367,7 +367,7 @@ Mirror the existing `CodexRunner` tests patterns. ## Implementation checklist -* [ ] Add `ClaudeBackend` in `src/takopi/engines.py` and register in `ENGINES`. +* [ ] Export `BACKEND = EngineBackend(...)` from `src/takopi/runners/claude.py`. * [ ] Add `src/takopi/runners/claude.py` implementing the `Runner` protocol. * [ ] Add tests + stub executable fixtures. * [ ] Update README and developing docs. diff --git a/docs/runner/claude/claude-takopi-events.md b/docs/runner/claude/claude-takopi-events.md index 1d9b93d..11bf7e3 100644 --- a/docs/runner/claude/claude-takopi-events.md +++ b/docs/runner/claude/claude-takopi-events.md @@ -44,8 +44,8 @@ Notes: `claude --resume ` ``` -Runner must implement its own regex (cannot use `compile_resume_pattern` because -that only matches ` resume `). Suggested regex: +Runner must implement its own regex because the resume format is +`claude --resume `. Suggested regex: ``` (?im)^\s*`?claude\s+(?:--resume|-r)\s+(?P[^`\s]+)`?\s*$ @@ -202,11 +202,9 @@ Add a Claude runner without changing the Takopi domain model: 1. Create `takopi/runners/claude.py` implementing `Runner` and (custom) resume parsing. -2. Update `takopi/engines.py`: - - add `claude` backend id - - `check_setup`: locate `claude` binary (PATH + common locations) +2. Define `BACKEND` in `takopi/runners/claude.py`: + - `install_cmd`: install command for the `claude` binary - `build_runner`: read `[claude]` config + construct runner - - `startup_message`: `"claude is ready\npwd: "` 3. Add new docs (this file + `claude-stream-json-cheatsheet.md`). 4. Add fixtures in `tests/fixtures/` (see below). 5. Add unit tests mirroring `tests/test_codex_*` but for Claude translation diff --git a/docs/specification.md b/docs/specification.md index 0b44c20..c9e48f8 100644 --- a/docs/specification.md +++ b/docs/specification.md @@ -169,6 +169,17 @@ Takopi MUST support the following event types: 2. `action` 3. `completed` +**Minimal runner mode (supported):** + +Runners MAY emit only: + +- exactly one `started` +- exactly one `completed` + +`action` events are optional. If emitted, a runner MAY emit only +`phase="completed"` action events (no requirement to emit `started` / `updated` +phases or track pending action state). + ### 5.3 Required fields by event type #### 5.3.1 `started` @@ -199,6 +210,10 @@ Optional: - `message: str` (freeform status/warning text) - `level: "debug" | "info" | "warning" | "error"` +Notes: + +- `phase="completed"` alone is valid; `started` / `updated` are optional. + #### 5.3.3 `completed` Required: @@ -424,6 +439,9 @@ The progress renderer SHOULD maintain: - completed actions and status - resume token if known +The progress renderer MUST tolerate “completed-only” actions (no prior +`started` / `updated`) and treat them as standalone steps. + If the runner emits multiple `action` events for the same `Action.id` while it is still running (e.g., repeated `phase="started"` or `phase="updated"`), the progress renderer SHOULD treat these as updates and collapse them into a single line (replacing the prior running line rather than appending a new one). ### 8.3 Final rendering @@ -462,7 +480,7 @@ The architecture SHOULD keep this future change localized to a `RunnerRegistry` 1. **Runner contract tests** - Emits exactly one `started` - - All actions have required fields and stable IDs + - All actions (if any) have required fields and stable IDs - `completed.resume` matches started token (when present) - Event ordering is preserved - `ok` semantics match intended behavior diff --git a/src/takopi/backends.py b/src/takopi/backends.py new file mode 100644 index 0000000..ed6b62c --- /dev/null +++ b/src/takopi/backends.py @@ -0,0 +1,24 @@ +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path +from typing import TYPE_CHECKING, Any, Callable + +if TYPE_CHECKING: + from .runner import Runner + +EngineConfig = dict[str, Any] + + +@dataclass(frozen=True, slots=True) +class SetupIssue: + title: str + lines: tuple[str, ...] + + +@dataclass(frozen=True, slots=True) +class EngineBackend: + id: str + build_runner: Callable[[EngineConfig, Path], Runner] + cli_cmd: str | None = None + install_cmd: str | None = None diff --git a/src/takopi/backends_helpers.py b/src/takopi/backends_helpers.py new file mode 100644 index 0000000..14ef80e --- /dev/null +++ b/src/takopi/backends_helpers.py @@ -0,0 +1,14 @@ +from __future__ import annotations + +from .backends import SetupIssue + + +def install_issue(cmd: str, install_cmd: str | None) -> SetupIssue: + if install_cmd: + lines = (f" [dim]$[/] {install_cmd}",) + else: + lines = (" [dim]See engine setup docs for install instructions.[/]",) + return SetupIssue( + f"install {cmd}", + lines, + ) diff --git a/src/takopi/bridge.py b/src/takopi/bridge.py index fd02f59..689be24 100644 --- a/src/takopi/bridge.py +++ b/src/takopi/bridge.py @@ -206,7 +206,12 @@ class RunningTask: async def _send_startup(cfg: BridgeConfig) -> None: logger.debug("[startup] message: %s", cfg.startup_msg) - sent = await cfg.bot.send_message(chat_id=cfg.chat_id, text=cfg.startup_msg) + sent, _ = await _send_or_edit_markdown( + cfg.bot, + chat_id=cfg.chat_id, + text=cfg.startup_msg, + limit=TELEGRAM_MARKDOWN_LIMIT, + ) if sent is not None: logger.info("[startup] sent startup message to chat_id=%s", cfg.chat_id) diff --git a/src/takopi/cli.py b/src/takopi/cli.py index 5738fa3..2c8422e 100644 --- a/src/takopi/cli.py +++ b/src/takopi/cli.py @@ -1,14 +1,16 @@ from __future__ import annotations import os +from typing import Callable import anyio import typer from . import __version__ +from .backends import EngineBackend from .bridge import BridgeConfig, _run_main_loop from .config import ConfigError, load_telegram_config -from .engines import EngineBackend, get_backend, get_engine_config, list_backends +from .engines import get_backend, get_engine_config, list_backends from .logging import setup_logging from .onboarding import check_setup, render_engine_choice, render_setup_guide from .telegram import TelegramClient @@ -51,7 +53,11 @@ def _parse_bridge_config( chat_id = chat_id_value engine_cfg = get_engine_config(config, backend.id, config_path) - startup_msg = backend.startup_message(startup_pwd) + startup_msg = ( + f"\N{OCTOPUS} **takopi is ready**\n\n" + f"agent: `{backend.id}` \n" + f"working in: `{startup_pwd}`" + ) bot = TelegramClient(token) runner = backend.build_runner(engine_cfg, config_path) @@ -111,36 +117,32 @@ def app_main( raise typer.Exit(code=1) -@app.command(help="Run with the Codex engine.") -def codex( - final_notify: bool = typer.Option( - True, - "--final-notify/--no-final-notify", - help="Send the final response as a new message (not an edit).", - ), - debug: bool = typer.Option( - False, - "--debug/--no-debug", - help="Log engine JSONL, Telegram requests, and rendered messages.", - ), -) -> None: - _run_engine(engine="codex", final_notify=final_notify, debug=debug) +def make_engine_cmd(engine_id: str) -> Callable[..., None]: + def _cmd( + final_notify: bool = typer.Option( + True, + "--final-notify/--no-final-notify", + help="Send the final response as a new message (not an edit).", + ), + debug: bool = typer.Option( + False, + "--debug/--no-debug", + help="Log engine JSONL, Telegram requests, and rendered messages.", + ), + ) -> None: + _run_engine(engine=engine_id, final_notify=final_notify, debug=debug) + + _cmd.__name__ = f"run_{engine_id}" + return _cmd -@app.command(help="Run with the Claude engine.") -def claude( - final_notify: bool = typer.Option( - True, - "--final-notify/--no-final-notify", - help="Send the final response as a new message (not an edit).", - ), - debug: bool = typer.Option( - False, - "--debug/--no-debug", - help="Log engine JSONL, Telegram requests, and rendered messages.", - ), -) -> None: - _run_engine(engine="claude", final_notify=final_notify, debug=debug) +def register_engine_commands() -> None: + for backend in list_backends(): + help_text = f"Run with the {backend.id} engine." + app.command(name=backend.id, help=help_text)(make_engine_cmd(backend.id)) + + +register_engine_commands() def main() -> None: diff --git a/src/takopi/debug_onboarding.py b/src/takopi/debug_onboarding.py index d2d15ad..bf33d51 100644 --- a/src/takopi/debug_onboarding.py +++ b/src/takopi/debug_onboarding.py @@ -2,8 +2,9 @@ from __future__ import annotations import typer +from .backends import SetupIssue from .config import ConfigError -from .engines import SetupIssue, get_backend, get_install_issue, list_backend_ids +from .engines import get_backend, list_backend_ids from .onboarding import SetupResult, check_setup, config_issue, render_setup_guide @@ -37,10 +38,7 @@ def run( raise typer.Exit(code=1) setup = check_setup(backend) if force: - forced_issues = [ - get_install_issue(backend.id), - config_issue(setup.config_path), - ] + forced_issues = [config_issue(setup.config_path)] setup = SetupResult( issues=_dedupe_issues([*setup.issues, *forced_issues]), config_path=setup.config_path, diff --git a/src/takopi/engines.py b/src/takopi/engines.py index 3bffaee..9a6b0c4 100644 --- a/src/takopi/engines.py +++ b/src/takopi/engines.py @@ -1,165 +1,66 @@ from __future__ import annotations -import shutil -from dataclasses import dataclass +import importlib +import pkgutil from pathlib import Path -from typing import Any, Callable +from typing import Any +from .backends import EngineBackend, EngineConfig from .config import ConfigError -from .runner import Runner -from .runners.codex import CodexRunner -from .runners.claude import ClaudeRunner -EngineConfig = dict[str, Any] +_BACKENDS: dict[str, EngineBackend] | None = None -@dataclass(frozen=True, slots=True) -class SetupIssue: - title: str - lines: tuple[str, ...] +def _discover_backends() -> dict[str, EngineBackend]: + import takopi.runners as runners_pkg + + backends: dict[str, EngineBackend] = {} + prefix = runners_pkg.__name__ + "." + + for module_info in pkgutil.iter_modules(runners_pkg.__path__, prefix): + module_name = module_info.name + mod = importlib.import_module(module_name) + + backend = getattr(mod, "BACKEND", None) + if backend is None: + continue + if not isinstance(backend, EngineBackend): + raise RuntimeError(f"{module_name}.BACKEND is not an EngineBackend") + if backend.id in backends: + raise RuntimeError(f"Duplicate backend id: {backend.id}") + backends[backend.id] = backend + + return backends -@dataclass(frozen=True, slots=True) -class EngineBackend: - id: str - display_name: str - check_setup: Callable[[EngineConfig, Path], list[SetupIssue]] - build_runner: Callable[[EngineConfig, Path], Runner] - startup_message: Callable[[str], str] - - -def _codex_check_setup(_config: EngineConfig, _config_path: Path) -> list[SetupIssue]: - if shutil.which("codex") is None: - return [_codex_install_issue()] - return [] - - -def _codex_install_issue() -> SetupIssue: - return SetupIssue( - "Install the Codex CLI", - (" [dim]$[/] npm install -g @openai/codex",), - ) - - -def _codex_build_runner(config: EngineConfig, config_path: Path) -> Runner: - codex_cmd = shutil.which("codex") - if not codex_cmd: - raise ConfigError( - "codex not found on PATH. Install the Codex CLI with:\n" - " npm install -g @openai/codex\n" - " # or on macOS\n" - " brew install codex" - ) - - extra_args_value = config.get("extra_args") - if extra_args_value is None: - extra_args = ["-c", "notify=[]"] - elif isinstance(extra_args_value, list) and all( - isinstance(item, str) for item in extra_args_value - ): - extra_args = list(extra_args_value) - else: - raise ConfigError( - f"Invalid `codex.extra_args` in {config_path}; expected a list of strings." - ) - - title = "Codex" - profile_value = config.get("profile") - if profile_value: - if not isinstance(profile_value, str): - raise ConfigError( - f"Invalid `codex.profile` in {config_path}; expected a string." - ) - extra_args.extend(["--profile", profile_value]) - title = profile_value - - return CodexRunner(codex_cmd=codex_cmd, extra_args=extra_args, title=title) - - -def _codex_startup_message(cwd: str) -> str: - return f"codex is ready\npwd: {cwd}" - - -def _claude_check_setup(_config: EngineConfig, _config_path: Path) -> list[SetupIssue]: - claude_cmd = "claude" - if shutil.which(claude_cmd) is None: - return [_claude_install_issue()] - return [] - - -def _claude_install_issue() -> SetupIssue: - return SetupIssue( - "Install the Claude Code CLI", - (" [dim]$[/] npm install -g @anthropic-ai/claude-code",), - ) - - -def _claude_build_runner(config: EngineConfig, _config_path: Path) -> Runner: - claude_cmd = "claude" - - model = config.get("model") - allowed_tools = config.get("allowed_tools") - dangerously_skip_permissions = config.get("dangerously_skip_permissions") is True - use_api_billing = config.get("use_api_billing") is True - title = str(model) if model is not None else "claude" - - return ClaudeRunner( - claude_cmd=claude_cmd, - model=model, - allowed_tools=allowed_tools, - dangerously_skip_permissions=dangerously_skip_permissions, - use_api_billing=use_api_billing, - session_title=title, - ) - - -def _claude_startup_message(cwd: str) -> str: - return f"claude is ready\npwd: {cwd}" - - -_ENGINE_BACKENDS: dict[str, EngineBackend] = { - "codex": EngineBackend( - id="codex", - display_name="Codex", - check_setup=_codex_check_setup, - build_runner=_codex_build_runner, - startup_message=_codex_startup_message, - ), - "claude": EngineBackend( - id="claude", - display_name="Claude", - check_setup=_claude_check_setup, - build_runner=_claude_build_runner, - startup_message=_claude_startup_message, - ), -} +def _ensure_loaded() -> None: + global _BACKENDS + if _BACKENDS is None: + _BACKENDS = _discover_backends() def get_backend(engine_id: str) -> EngineBackend: + _ensure_loaded() + assert _BACKENDS is not None try: - return _ENGINE_BACKENDS[engine_id] + return _BACKENDS[engine_id] except KeyError as exc: - available = ", ".join(sorted(_ENGINE_BACKENDS)) + available = ", ".join(sorted(_BACKENDS)) raise ConfigError( f"Unknown engine {engine_id!r}. Available: {available}." ) from exc -def get_install_issue(engine_id: str) -> SetupIssue: - if engine_id == "codex": - return _codex_install_issue() - if engine_id == "claude": - return _claude_install_issue() - available = ", ".join(sorted(_ENGINE_BACKENDS)) - raise ConfigError(f"Unknown engine {engine_id!r}. Available: {available}.") - - def list_backends() -> list[EngineBackend]: - return list(_ENGINE_BACKENDS.values()) + _ensure_loaded() + assert _BACKENDS is not None + return [_BACKENDS[key] for key in sorted(_BACKENDS)] def list_backend_ids() -> list[str]: - return sorted(_ENGINE_BACKENDS) + _ensure_loaded() + assert _BACKENDS is not None + return sorted(_BACKENDS) def get_engine_config( diff --git a/src/takopi/onboarding.py b/src/takopi/onboarding.py index c357220..cb7720c 100644 --- a/src/takopi/onboarding.py +++ b/src/takopi/onboarding.py @@ -1,5 +1,6 @@ from __future__ import annotations +import shutil from dataclasses import dataclass from pathlib import Path from typing import Sequence @@ -7,8 +8,9 @@ from typing import Sequence from rich.console import Console from rich.panel import Panel +from .backends import EngineBackend, SetupIssue +from .backends_helpers import install_issue from .config import ConfigError, HOME_CONFIG_PATH, load_telegram_config -from .engines import EngineBackend, SetupIssue _OCTOPUS = "\N{OCTOPUS}" @@ -26,7 +28,7 @@ class SetupResult: def config_issue(path: Path) -> SetupIssue: config_display = _config_path_display(path) return SetupIssue( - "Create a config", + "create a config", ( f" [dim]{config_display}[/]", "", @@ -35,7 +37,7 @@ def config_issue(path: Path) -> SetupIssue: "", "[dim]" + ("-" * 56) + "[/]", "", - "[bold]Getting your Telegram credentials:[/]", + "[bold]getting your telegram credentials:[/]", "", " [cyan]bot_token[/] create a bot with [link=https://t.me/BotFather]@BotFather[/]", " [cyan]chat_id[/] message [link=https://t.me/myidbot]@myidbot[/] to get your id", @@ -47,11 +49,15 @@ def check_setup(backend: EngineBackend) -> SetupResult: issues: list[SetupIssue] = [] config_path = HOME_CONFIG_PATH config: dict = {} + cmd = backend.cli_cmd or backend.id + backend_issues: list[SetupIssue] = [] + if shutil.which(cmd) is None: + backend_issues.append(install_issue(cmd, backend.install_cmd)) try: config, config_path = load_telegram_config() except ConfigError: - issues.extend(backend.check_setup({}, config_path)) + issues.extend(backend_issues) issues.append(config_issue(config_path)) return SetupResult(issues=issues, config_path=config_path) @@ -61,7 +67,7 @@ def check_setup(backend: EngineBackend) -> SetupResult: missing_or_invalid_config = not (isinstance(token, str) and token.strip()) missing_or_invalid_config |= type(chat_id) is not int - issues.extend(backend.check_setup(config, config_path)) + issues.extend(backend_issues) if missing_or_invalid_config: issues.append(config_issue(config_path)) @@ -113,11 +119,7 @@ def render_engine_choice(backends: Sequence[EngineBackend]) -> None: parts.append("") for idx, backend in enumerate(backends, start=1): parts.append(f"[bold yellow]{idx}.[/] [dim]$[/] takopi {backend.id}") - if backend.id == "claude": - description = "use claude code" - else: - description = f"use {backend.display_name.lower()}" - parts.append(f" [dim]{description}[/]") + parts.append(f" [dim]use {backend.id}[/]") parts.append("") panel = Panel( diff --git a/src/takopi/runner.py b/src/takopi/runner.py index 8334ae3..fc8991b 100644 --- a/src/takopi/runner.py +++ b/src/takopi/runner.py @@ -2,19 +2,28 @@ from __future__ import annotations +import logging import re +import subprocess +from collections import deque from collections.abc import AsyncIterator, Callable -from typing import Protocol +from dataclasses import dataclass +from typing import Any, Protocol from weakref import WeakValueDictionary import anyio -from .model import EngineId, ResumeToken, TakopiEvent - - -def compile_resume_pattern(engine: EngineId) -> re.Pattern[str]: - name = re.escape(str(engine)) - return re.compile(rf"(?im)^\s*`?{name}\s+resume\s+(?P[^`\s]+)`?\s*$") +from .model import ( + Action, + ActionEvent, + CompletedEvent, + EngineId, + ResumeToken, + StartedEvent, + TakopiEvent, +) +from .utils.streams import drain_stderr, iter_jsonl +from .utils.subprocess import manage_subprocess class ResumeTokenMixin: @@ -44,17 +53,21 @@ class ResumeTokenMixin: class SessionLockMixin: engine: EngineId - _session_locks: WeakValueDictionary[str, anyio.Lock] + session_locks: WeakValueDictionary[str, anyio.Lock] | None = None - def _lock_for(self, token: ResumeToken) -> anyio.Lock: + def lock_for(self, token: ResumeToken) -> anyio.Lock: + locks = self.session_locks + if locks is None: + locks = WeakValueDictionary() + self.session_locks = locks key = f"{token.engine}:{token.value}" - lock = self._session_locks.get(key) + lock = locks.get(key) if lock is None: lock = anyio.Lock() - self._session_locks[key] = lock + locks[key] = lock return lock - async def _run_with_resume_lock( + async def run_with_resume_lock( self, prompt: str, resume: ResumeToken | None, @@ -69,12 +82,325 @@ class SessionLockMixin: async for evt in run_fn(prompt, resume_token): yield evt return - lock = self._lock_for(resume_token) + lock = self.lock_for(resume_token) async with lock: async for evt in run_fn(prompt, resume_token): yield evt +class BaseRunner(SessionLockMixin): + engine: EngineId + + async def run( + self, prompt: str, resume: ResumeToken | None + ) -> AsyncIterator[TakopiEvent]: + async for evt in self.run_locked(prompt, resume): + yield evt + + async def run_locked( + self, prompt: str, resume: ResumeToken | None + ) -> AsyncIterator[TakopiEvent]: + if resume is not None: + async for evt in self.run_with_resume_lock(prompt, resume, self.run_impl): + yield evt + return + + lock: anyio.Lock | None = None + acquired = False + try: + async for evt in self.run_impl(prompt, None): + if lock is None and isinstance(evt, StartedEvent): + lock = self.lock_for(evt.resume) + await lock.acquire() + acquired = True + yield evt + finally: + if acquired and lock is not None: + lock.release() + + async def run_impl( + self, prompt: str, resume: ResumeToken | None + ) -> AsyncIterator[TakopiEvent]: + if False: + yield # pragma: no cover + raise NotImplementedError + + +@dataclass +class JsonlRunState: + note_seq: int = 0 + + +class JsonlSubprocessRunner(BaseRunner): + stderr_tail_lines: int = 200 + + def get_logger(self) -> logging.Logger: + return getattr(self, "logger", logging.getLogger(__name__)) + + def command(self) -> str: + raise NotImplementedError + + def tag(self) -> str: + return str(self.engine) + + def build_args( + self, + prompt: str, + resume: ResumeToken | None, + *, + state: Any, + ) -> list[str]: + raise NotImplementedError + + def stdin_payload( + self, + prompt: str, + resume: ResumeToken | None, + *, + state: Any, + ) -> bytes | None: + return prompt.encode() + + def env(self, *, state: Any) -> dict[str, str] | None: + return None + + def new_state(self, prompt: str, resume: ResumeToken | None) -> Any: + return JsonlRunState() + + def start_run( + self, + prompt: str, + resume: ResumeToken | None, + *, + state: Any, + ) -> None: + return None + + def pipes_error_message(self) -> str: + return f"{self.tag()} failed to open subprocess pipes" + + def next_note_id(self, state: Any) -> str: + try: + note_seq = state.note_seq + except AttributeError as exc: + raise RuntimeError( + "state must define note_seq or override next_note_id" + ) from exc + state.note_seq = note_seq + 1 + return f"{self.tag()}.note.{state.note_seq}" + + def note_event( + self, + message: str, + *, + state: Any, + ok: bool = False, + detail: dict[str, Any] | None = None, + ) -> TakopiEvent: + note_id = self.next_note_id(state) + action = Action( + id=note_id, + kind="warning", + title=message, + detail=detail or {}, + ) + return ActionEvent( + engine=self.engine, + action=action, + phase="completed", + ok=ok, + message=message, + level="info" if ok else "warning", + ) + + def invalid_json_events( + self, + *, + raw: str, + line: str, + state: Any, + ) -> list[TakopiEvent]: + message = f"invalid JSON from {self.tag()}; ignoring line" + return [self.note_event(message, state=state, detail={"line": line})] + + def process_error_events( + self, + rc: int, + *, + resume: ResumeToken | None, + found_session: ResumeToken | None, + stderr_tail: str, + state: Any, + ) -> list[TakopiEvent]: + message = f"{self.tag()} failed (rc={rc})." + resume_for_completed = found_session or resume + return [ + self.note_event(message, state=state, detail={"stderr_tail": stderr_tail}), + CompletedEvent( + engine=self.engine, + ok=False, + answer="", + resume=resume_for_completed, + error=message, + ), + ] + + def stream_end_events( + self, + *, + resume: ResumeToken | None, + found_session: ResumeToken | None, + stderr_tail: str, + state: Any, + ) -> list[TakopiEvent]: + message = f"{self.tag()} finished without a result event" + resume_for_completed = found_session or resume + return [ + CompletedEvent( + engine=self.engine, + ok=False, + answer="", + resume=resume_for_completed, + error=message, + ) + ] + + def translate( + self, + data: dict[str, Any], + *, + state: Any, + resume: ResumeToken | None, + found_session: ResumeToken | None, + ) -> list[TakopiEvent]: + raise NotImplementedError + + def handle_started_event( + self, + event: StartedEvent, + *, + expected_session: ResumeToken | None, + found_session: ResumeToken | None, + ) -> tuple[ResumeToken | None, bool]: + if event.engine != self.engine: + raise RuntimeError(f"{self.tag()} emitted session token for wrong engine") + if expected_session is not None and event.resume != expected_session: + message = f"{self.tag()} emitted a different session id than expected" + raise RuntimeError(message) + if found_session is None: + return event.resume, True + if event.resume != found_session: + message = f"{self.tag()} emitted a different session id than expected" + raise RuntimeError(message) + return found_session, False + + async def run_impl( + self, prompt: str, resume: ResumeToken | None + ) -> AsyncIterator[TakopiEvent]: + state = self.new_state(prompt, resume) + self.start_run(prompt, resume, state=state) + + tag = self.tag() + logger = self.get_logger() + args = [self.command(), *self.build_args(prompt, resume, state=state)] + payload = self.stdin_payload(prompt, resume, state=state) + env = self.env(state=state) + + async with manage_subprocess( + *args, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env=env, + ) as proc: + if proc.stdout is None or proc.stderr is None: + raise RuntimeError(self.pipes_error_message()) + if payload is not None and proc.stdin is None: + raise RuntimeError(self.pipes_error_message()) + + logger.debug("[%s] spawn pid=%s args=%r", tag, proc.pid, args) + + if payload is not None: + assert proc.stdin is not None + await proc.stdin.send(payload) + await proc.stdin.aclose() + elif proc.stdin is not None: + await proc.stdin.aclose() + + stderr_chunks: deque[str] = deque(maxlen=self.stderr_tail_lines) + rc: int | None = None + expected_session: ResumeToken | None = resume + found_session: ResumeToken | None = None + did_emit_completed = False + + async with anyio.create_task_group() as tg: + tg.start_soon( + drain_stderr, + proc.stderr, + stderr_chunks, + logger, + tag, + ) + async for json_line in iter_jsonl(proc.stdout, logger=logger, tag=tag): + if did_emit_completed: + continue + if json_line.data is None: + events = self.invalid_json_events( + raw=json_line.raw, + line=json_line.line, + state=state, + ) + else: + events = self.translate( + json_line.data, + state=state, + resume=resume, + found_session=found_session, + ) + + for evt in events: + if isinstance(evt, StartedEvent): + found_session, emit = self.handle_started_event( + evt, + expected_session=expected_session, + found_session=found_session, + ) + if not emit: + continue + if isinstance(evt, CompletedEvent): + did_emit_completed = True + yield evt + break + yield evt + + rc = await proc.wait() + + logger.debug("[%s] process exit pid=%s rc=%s", tag, proc.pid, rc) + if did_emit_completed: + return + stderr_tail = "".join(stderr_chunks) + if rc is not None and rc != 0: + events = self.process_error_events( + rc, + resume=resume, + found_session=found_session, + stderr_tail=stderr_tail, + state=state, + ) + for evt in events: + yield evt + return + + events = self.stream_end_events( + resume=resume, + found_session=found_session, + stderr_tail=stderr_tail, + state=state, + ) + for evt in events: + yield evt + + class Runner(Protocol): engine: str diff --git a/src/takopi/runners/claude.py b/src/takopi/runners/claude.py index db924f1..5c57ca2 100644 --- a/src/takopi/runners/claude.py +++ b/src/takopi/runners/claude.py @@ -3,15 +3,11 @@ from __future__ import annotations import logging import os import re -import subprocess -from collections import deque -from collections.abc import AsyncIterator from dataclasses import dataclass, field +from pathlib import Path from typing import Any, Literal -from weakref import WeakValueDictionary - -import anyio +from ..backends import EngineBackend, EngineConfig from ..model import ( Action, ActionEvent, @@ -22,10 +18,8 @@ from ..model import ( StartedEvent, TakopiEvent, ) -from ..runner import ResumeTokenMixin, Runner, SessionLockMixin +from ..runner import JsonlSubprocessRunner, ResumeTokenMixin, Runner from ..utils.paths import relativize_command, relativize_path -from ..utils.streams import drain_stderr, iter_jsonl -from ..utils.subprocess import manage_subprocess logger = logging.getLogger(__name__) @@ -41,6 +35,7 @@ _RESUME_RE = re.compile( class ClaudeStreamState: pending_actions: dict[str, Action] = field(default_factory=dict) last_assistant_text: str | None = None + note_seq: int = 0 def _action_event( @@ -61,27 +56,6 @@ def _action_event( ) -def _note_completed( - action_id: str, - message: str, - *, - ok: bool = False, - detail: dict[str, Any] | None = None, -) -> ActionEvent: - return _action_event( - phase="completed", - action=Action( - id=action_id, - kind="warning", - title=message, - detail=detail or {}, - ), - ok=ok, - message=message, - level="warning" if not ok else "info", - ) - - def _normalize_tool_result(content: Any) -> str: if isinstance(content, list): parts: list[str] = [] @@ -423,7 +397,7 @@ def translate_claude_event( @dataclass -class ClaudeRunner(SessionLockMixin, ResumeTokenMixin, Runner): +class ClaudeRunner(ResumeTokenMixin, JsonlSubprocessRunner): engine: EngineId = ENGINE resume_re: re.Pattern[str] = _RESUME_RE @@ -433,9 +407,8 @@ class ClaudeRunner(SessionLockMixin, ResumeTokenMixin, Runner): dangerously_skip_permissions: bool = False use_api_billing: bool = False session_title: str = "claude" - _session_locks: WeakValueDictionary[str, anyio.Lock] = field( - default_factory=WeakValueDictionary, init=False, repr=False - ) + stderr_tail_lines = STDERR_TAIL_LINES + logger = logger def format_resume(self, token: ResumeToken) -> str: if token.engine != ENGINE: @@ -457,157 +430,163 @@ class ClaudeRunner(SessionLockMixin, ResumeTokenMixin, Runner): args.append(prompt) return args - async def run( - self, prompt: str, resume: ResumeToken | None - ) -> AsyncIterator[TakopiEvent]: - async for evt in self._run_with_resume_lock(prompt, resume, self._run): - yield evt + def command(self) -> str: + return self.claude_cmd - async def _run( # noqa: C901 + def build_args( self, prompt: str, - resume_token: ResumeToken | None, - ) -> AsyncIterator[TakopiEvent]: + resume: ResumeToken | None, + *, + state: Any, + ) -> list[str]: + _ = state + return self._build_args(prompt, resume) + + def stdin_payload( + self, + prompt: str, + resume: ResumeToken | None, + *, + state: Any, + ) -> bytes | None: + _ = prompt, resume, state + return None + + def env(self, *, state: Any) -> dict[str, str] | None: + _ = state + if self.use_api_billing is not True: + env = dict(os.environ) + env.pop("ANTHROPIC_API_KEY", None) + return env + return None + + def new_state(self, prompt: str, resume: ResumeToken | None) -> ClaudeStreamState: + _ = prompt, resume + return ClaudeStreamState() + + def start_run( + self, + prompt: str, + resume: ResumeToken | None, + *, + state: ClaudeStreamState, + ) -> None: + _ = state logger.info( "[claude] start run resume=%r", - resume_token.value if resume_token else None, + resume.value if resume else None, ) logger.debug("[claude] prompt: %s", prompt) - args = [self.claude_cmd] - args.extend(self._build_args(prompt, resume_token)) - session_lock: anyio.Lock | None = None - session_lock_acquired = False - did_emit_completed = False - note_seq = 0 - state = ClaudeStreamState() - expected_session = resume_token - found_session: ResumeToken | None = None + def invalid_json_events( + self, + *, + raw: str, + line: str, + state: ClaudeStreamState, + ) -> list[TakopiEvent]: + _ = line + message = "invalid JSON from claude; ignoring line" + return [self.note_event(message, state=state, detail={"line": raw})] - def next_note_id() -> str: - nonlocal note_seq - note_seq += 1 - return f"claude.note.{note_seq}" + def translate( + self, + data: dict[str, Any], + *, + state: ClaudeStreamState, + resume: ResumeToken | None, + found_session: ResumeToken | None, + ) -> list[TakopiEvent]: + _ = resume, found_session + return translate_claude_event( + data, + title=self.session_title, + state=state, + ) - try: - env: dict[str, str] | None = None - if self.use_api_billing is not True: - env = dict(os.environ) - env.pop("ANTHROPIC_API_KEY", None) - async with manage_subprocess( - *args, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - env=env, - ) as proc: - if proc.stdout is None or proc.stderr is None: - raise RuntimeError("claude failed to open subprocess pipes") - proc_stdout = proc.stdout - proc_stderr = proc.stderr - if proc.stdin is not None: - await proc.stdin.aclose() + def process_error_events( + self, + rc: int, + *, + resume: ResumeToken | None, + found_session: ResumeToken | None, + stderr_tail: str, + state: ClaudeStreamState, + ) -> list[TakopiEvent]: + message = f"claude failed (rc={rc})." + resume_for_completed = found_session or resume + return [ + self.note_event( + message, + state=state, + ok=False, + detail={"stderr_tail": stderr_tail}, + ), + CompletedEvent( + engine=ENGINE, + ok=False, + answer="", + resume=resume_for_completed, + error=message, + ), + ] - stderr_chunks: deque[str] = deque(maxlen=STDERR_TAIL_LINES) - rc: int | None = None - - async with anyio.create_task_group() as tg: - tg.start_soon( - drain_stderr, - proc_stderr, - stderr_chunks, - logger, - "claude", - ) - async for json_line in iter_jsonl( - proc_stdout, logger=logger, tag="claude" - ): - if did_emit_completed: - continue - if json_line.data is None: - yield _note_completed( - next_note_id(), - "invalid JSON from claude; ignoring line", - ok=False, - detail={"line": json_line.raw}, - ) - continue - evt = json_line.data - - for out_evt in translate_claude_event( - evt, - title=self.session_title, - state=state, - ): - if isinstance(out_evt, StartedEvent): - session = out_evt.resume - if session.engine != ENGINE: - raise RuntimeError( - "claude emitted session token for wrong engine" - ) - if ( - expected_session is not None - and session != expected_session - ): - raise RuntimeError( - "claude emitted a different session id than expected" - ) - if expected_session is None: - session_lock = self._lock_for(session) - await session_lock.acquire() - session_lock_acquired = True - found_session = session - yield out_evt - continue - yield out_evt - if isinstance(out_evt, CompletedEvent): - did_emit_completed = True - break - rc = await proc.wait() - - logger.debug("[claude] process exit pid=%s rc=%s", proc.pid, rc) - if did_emit_completed: - return - - if rc != 0: - stderr_text = "".join(stderr_chunks) - message = f"claude failed (rc={rc})." - yield _note_completed( - next_note_id(), - message, - ok=False, - detail={"stderr_tail": stderr_text}, - ) - resume_for_completed = found_session or resume_token - yield CompletedEvent( - engine=ENGINE, - ok=False, - answer="", - resume=resume_for_completed, - error=message, - ) - return - - if not found_session: - message = "claude finished but no session_id was captured" - resume_for_completed = resume_token - yield CompletedEvent( - engine=ENGINE, - ok=False, - answer="", - resume=resume_for_completed, - error=message, - ) - return - - message = "claude finished without a result event" - yield CompletedEvent( + def stream_end_events( + self, + *, + resume: ResumeToken | None, + found_session: ResumeToken | None, + stderr_tail: str, + state: ClaudeStreamState, + ) -> list[TakopiEvent]: + _ = stderr_tail + if not found_session: + message = "claude finished but no session_id was captured" + resume_for_completed = resume + return [ + CompletedEvent( engine=ENGINE, ok=False, - answer=state.last_assistant_text or "", - resume=found_session, + answer="", + resume=resume_for_completed, error=message, ) - finally: - if session_lock is not None and session_lock_acquired: - session_lock.release() + ] + + message = "claude finished without a result event" + return [ + CompletedEvent( + engine=ENGINE, + ok=False, + answer=state.last_assistant_text or "", + resume=found_session, + error=message, + ) + ] + + +def build_runner(config: EngineConfig, _config_path: Path) -> Runner: + claude_cmd = "claude" + + model = config.get("model") + allowed_tools = config.get("allowed_tools") + dangerously_skip_permissions = config.get("dangerously_skip_permissions") is True + use_api_billing = config.get("use_api_billing") is True + title = str(model) if model is not None else "claude" + + return ClaudeRunner( + claude_cmd=claude_cmd, + model=model, + allowed_tools=allowed_tools, + dangerously_skip_permissions=dangerously_skip_permissions, + use_api_billing=use_api_billing, + session_title=title, + ) + + +BACKEND = EngineBackend( + id="claude", + build_runner=build_runner, + install_cmd="npm install -g @anthropic-ai/claude-code", +) diff --git a/src/takopi/runners/codex.py b/src/takopi/runners/codex.py index 00d637c..4527ab7 100644 --- a/src/takopi/runners/codex.py +++ b/src/takopi/runners/codex.py @@ -1,14 +1,13 @@ from __future__ import annotations import logging -import subprocess -from collections import deque -from collections.abc import AsyncIterator +import re from dataclasses import dataclass +from pathlib import Path from typing import Any, cast -from weakref import WeakValueDictionary -import anyio +from ..backends import EngineBackend, EngineConfig +from ..config import ConfigError from ..model import ( Action, ActionEvent, @@ -21,15 +20,8 @@ from ..model import ( StartedEvent, TakopiEvent, ) -from ..runner import ( - ResumeTokenMixin, - Runner, - SessionLockMixin, - compile_resume_pattern, -) +from ..runner import JsonlSubprocessRunner, ResumeTokenMixin, Runner from ..utils.paths import relativize_command -from ..utils.streams import drain_stderr, iter_jsonl -from ..utils.subprocess import manage_subprocess logger = logging.getLogger(__name__) @@ -46,7 +38,7 @@ _ACTION_KIND_MAP: dict[str, ActionKind] = { "todo_list": "note", } -_RESUME_RE = compile_resume_pattern(ENGINE) +_RESUME_RE = re.compile(r"(?im)^\s*`?codex\s+resume\s+(?P[^`\s]+)`?\s*$") def _started_event(token: ResumeToken, *, title: str) -> StartedEvent: @@ -98,25 +90,6 @@ def _action_event( ) -def _note_completed( - action_id: str, - message: str, - *, - ok: bool = False, - detail: dict[str, Any] | None = None, -) -> TakopiEvent: - return _action_event( - phase="completed", - action_id=action_id, - kind="warning", - title=message, - detail=detail, - ok=ok, - message=message, - level="warning" if not ok else "info", - ) - - def _short_tool_name(item: dict[str, Any]) -> str: name = ".".join(part for part in (item.get("server"), item.get("tool")) if part) return name or "tool" @@ -408,9 +381,18 @@ def translate_codex_event(event: dict[str, Any], *, title: str) -> list[TakopiEv return [] -class CodexRunner(SessionLockMixin, ResumeTokenMixin, Runner): +@dataclass +class CodexRunState: + note_seq: int = 0 + final_answer: str | None = None + turn_index: int = 0 + + +class CodexRunner(ResumeTokenMixin, JsonlSubprocessRunner): engine: EngineId = ENGINE resume_re = _RESUME_RE + stderr_tail_lines = STDERR_TAIL_LINES + logger = logger def __init__( self, @@ -422,241 +404,237 @@ class CodexRunner(SessionLockMixin, ResumeTokenMixin, Runner): self.codex_cmd = codex_cmd self.extra_args = extra_args self.session_title = title - self._session_locks: WeakValueDictionary[str, anyio.Lock] = ( - WeakValueDictionary() - ) - async def run( - self, prompt: str, resume: ResumeToken | None - ) -> AsyncIterator[TakopiEvent]: - async for evt in self._run_with_resume_lock(prompt, resume, self._run): - yield evt + def command(self) -> str: + return self.codex_cmd - async def _run( # noqa: C901 + def build_args( self, prompt: str, - resume_token: ResumeToken | None, - ) -> AsyncIterator[TakopiEvent]: - logger.info( - "[codex] start run resume=%r", resume_token.value if resume_token else None - ) - logger.debug("[codex] prompt: %s", prompt) - args = [self.codex_cmd] - args.extend(self.extra_args) - args.extend(["exec", "--json"]) - - if resume_token: - args.extend(["resume", resume_token.value, "-"]) + resume: ResumeToken | None, + *, + state: Any, + ) -> list[str]: + _ = prompt, state + args = [*self.extra_args, "exec", "--json"] + if resume: + args.extend(["resume", resume.value, "-"]) else: args.append("-") - session_lock: anyio.Lock | None = None - session_lock_acquired = False + return args - try: - async with manage_subprocess( - *args, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) as proc: - if proc.stdin is None or proc.stdout is None or proc.stderr is None: - raise RuntimeError("codex exec failed to open subprocess pipes") - proc_stdin = proc.stdin - proc_stdout = proc.stdout - proc_stderr = proc.stderr - logger.debug("[codex] spawn pid=%s args=%r", proc.pid, args) + def new_state(self, prompt: str, resume: ResumeToken | None) -> CodexRunState: + _ = prompt, resume + return CodexRunState() - stderr_chunks: deque[str] = deque(maxlen=STDERR_TAIL_LINES) - rc: int | None = None + def start_run( + self, + prompt: str, + resume: ResumeToken | None, + *, + state: CodexRunState, + ) -> None: + _ = state + logger.info("[codex] start run resume=%r", resume.value if resume else None) + logger.debug("[codex] prompt: %s", prompt) - expected_session: ResumeToken | None = resume_token - found_session: ResumeToken | None = None - final_answer: str | None = None - note_seq = 0 - did_emit_completed = False - turn_index = 0 + def pipes_error_message(self) -> str: + return "codex exec failed to open subprocess pipes" - def next_note_id() -> str: - nonlocal note_seq - note_seq += 1 - return f"codex.note.{note_seq}" + def handle_started_event( + self, + event: StartedEvent, + *, + expected_session: ResumeToken | None, + found_session: ResumeToken | None, + ) -> tuple[ResumeToken | None, bool]: + if event.engine != ENGINE: + raise RuntimeError( + f"codex emitted session token for engine {event.engine!r}" + ) + if expected_session is not None and event.resume != expected_session: + message = "codex emitted a different session id than expected" + raise RuntimeError(message) + if found_session is None: + return event.resume, True + if event.resume != found_session: + message = "codex emitted a different session id than expected" + raise RuntimeError(message) + return found_session, False - async with anyio.create_task_group() as tg: - tg.start_soon( - drain_stderr, - proc_stderr, - stderr_chunks, - logger, - "codex", - ) - await proc_stdin.send(prompt.encode()) - await proc_stdin.aclose() - - async for json_line in iter_jsonl( - proc_stdout, logger=logger, tag="codex" - ): - if did_emit_completed: - continue - if json_line.data is None: - note = _note_completed( - next_note_id(), - "invalid JSON from codex; ignoring line", - ok=False, - detail={"line": json_line.line}, - ) - yield note - continue - evt = json_line.data - - etype = evt.get("type") - if etype == "error": - message = str(evt.get("message") or "codex error") - fatal_flag = evt.get("fatal") - fatal = fatal_flag is True or fatal_flag is None - if fatal: - resume_for_completed = found_session or resume_token - yield _completed_event( - resume=resume_for_completed, - ok=False, - answer=final_answer or "", - error=message, - ) - did_emit_completed = True - continue - note = _note_completed( - next_note_id(), - message, - ok=False, - detail={ - "code": evt.get("code"), - "fatal": evt.get("fatal"), - }, - ) - yield note - continue - if etype == "turn.failed": - error = evt.get("error") or {} - message = str(error.get("message") or "codex turn failed") - resume_for_completed = found_session or resume_token - yield _completed_event( - resume=resume_for_completed, - ok=False, - answer=final_answer or "", - error=message, - ) - did_emit_completed = True - continue - if etype == "turn.rate_limited": - retry_ms = evt.get("retry_after_ms") - message = "rate limited" - if isinstance(retry_ms, int): - message = f"rate limited (retry after {retry_ms}ms)" - note = _note_completed(next_note_id(), message, ok=False) - yield note - continue - if etype == "turn.started": - action_id = f"turn_{turn_index}" - turn_index += 1 - yield _action_event( - phase="started", - action_id=action_id, - kind="turn", - title="turn started", - ) - continue - if etype == "turn.completed": - resume_for_completed = found_session or resume_token - yield _completed_event( - resume=resume_for_completed, - ok=True, - answer=final_answer or "", - usage=evt.get("usage"), - ) - did_emit_completed = True - continue - - if evt.get("type") == "item.completed": - item = evt.get("item") or {} - item_type = item.get("type") or item.get("item_type") - if item_type == "assistant_message": - item_type = "agent_message" - if item_type == "agent_message" and isinstance( - item.get("text"), str - ): - if final_answer is None: - final_answer = item["text"] - else: - logger.debug( - "[codex] emitted multiple agent messages; using the last one" - ) - final_answer = item["text"] - - for out_evt in translate_codex_event( - evt, title=self.session_title - ): - if isinstance(out_evt, StartedEvent): - session = out_evt.resume - if found_session is None: - if session.engine != ENGINE: - raise RuntimeError( - f"codex emitted session token for engine {session.engine!r}" - ) - if ( - expected_session is not None - and session != expected_session - ): - message = "codex emitted a different session id than expected" - raise RuntimeError(message) - if expected_session is None: - session_lock = self._lock_for(session) - await session_lock.acquire() - session_lock_acquired = True - found_session = session - yield out_evt - continue - yield out_evt - rc = await proc.wait() - - logger.debug("[codex] process exit pid=%s rc=%s", proc.pid, rc) - if did_emit_completed: - return - if rc != 0: - stderr_text = "".join(stderr_chunks) - message = f"codex exec failed (rc={rc})." - yield _note_completed( - next_note_id(), - message, - ok=False, - detail={"stderr_tail": stderr_text}, - ) - resume_for_completed = found_session or resume_token - yield _completed_event( + def translate( + self, + data: dict[str, Any], + *, + state: CodexRunState, + resume: ResumeToken | None, + found_session: ResumeToken | None, + ) -> list[TakopiEvent]: + etype = data.get("type") + if etype == "error": + message = str(data.get("message") or "codex error") + fatal_flag = data.get("fatal") + fatal = fatal_flag is True or fatal_flag is None + if fatal: + resume_for_completed = found_session or resume + return [ + _completed_event( resume=resume_for_completed, ok=False, - answer=final_answer or "", + answer=state.final_answer or "", error=message, ) - return - - if not found_session: - message = ( - "codex exec finished but no session_id/thread_id was captured" - ) - resume_for_completed = resume_token - yield _completed_event( - resume=resume_for_completed, - ok=False, - answer=final_answer or "", - error=message, - ) - return - - logger.info("[codex] done run session=%s", found_session.value) - yield _completed_event( - resume=found_session, - ok=True, - answer=final_answer or "", + ] + return [ + self.note_event( + message, + state=state, + ok=False, + detail={"code": data.get("code"), "fatal": data.get("fatal")}, ) - finally: - if session_lock is not None and session_lock_acquired: - session_lock.release() + ] + if etype == "turn.failed": + error = data.get("error") or {} + message = str(error.get("message") or "codex turn failed") + resume_for_completed = found_session or resume + return [ + _completed_event( + resume=resume_for_completed, + ok=False, + answer=state.final_answer or "", + error=message, + ) + ] + if etype == "turn.rate_limited": + retry_ms = data.get("retry_after_ms") + message = "rate limited" + if isinstance(retry_ms, int): + message = f"rate limited (retry after {retry_ms}ms)" + return [self.note_event(message, state=state, ok=False)] + if etype == "turn.started": + action_id = f"turn_{state.turn_index}" + state.turn_index += 1 + return [ + _action_event( + phase="started", + action_id=action_id, + kind="turn", + title="turn started", + ) + ] + if etype == "turn.completed": + resume_for_completed = found_session or resume + return [ + _completed_event( + resume=resume_for_completed, + ok=True, + answer=state.final_answer or "", + usage=data.get("usage"), + ) + ] + + if data.get("type") == "item.completed": + item = data.get("item") or {} + item_type = item.get("type") or item.get("item_type") + if item_type == "assistant_message": + item_type = "agent_message" + if item_type == "agent_message" and isinstance(item.get("text"), str): + if state.final_answer is None: + state.final_answer = item["text"] + else: + logger.debug( + "[codex] emitted multiple agent messages; using the last one" + ) + state.final_answer = item["text"] + + return translate_codex_event(data, title=self.session_title) + + def process_error_events( + self, + rc: int, + *, + resume: ResumeToken | None, + found_session: ResumeToken | None, + stderr_tail: str, + state: CodexRunState, + ) -> list[TakopiEvent]: + message = f"codex exec failed (rc={rc})." + resume_for_completed = found_session or resume + return [ + self.note_event( + message, + state=state, + ok=False, + detail={"stderr_tail": stderr_tail}, + ), + _completed_event( + resume=resume_for_completed, + ok=False, + answer=state.final_answer or "", + error=message, + ), + ] + + def stream_end_events( + self, + *, + resume: ResumeToken | None, + found_session: ResumeToken | None, + stderr_tail: str, + state: CodexRunState, + ) -> list[TakopiEvent]: + _ = stderr_tail + if not found_session: + message = "codex exec finished but no session_id/thread_id was captured" + resume_for_completed = resume + return [ + _completed_event( + resume=resume_for_completed, + ok=False, + answer=state.final_answer or "", + error=message, + ) + ] + logger.info("[codex] done run session=%s", found_session.value) + return [ + _completed_event( + resume=found_session, + ok=True, + answer=state.final_answer or "", + ) + ] + + +def build_runner(config: EngineConfig, config_path: Path) -> Runner: + codex_cmd = "codex" + + extra_args_value = config.get("extra_args") + if extra_args_value is None: + extra_args = ["-c", "notify=[]"] + elif isinstance(extra_args_value, list) and all( + isinstance(item, str) for item in extra_args_value + ): + extra_args = list(extra_args_value) + else: + raise ConfigError( + f"Invalid `codex.extra_args` in {config_path}; expected a list of strings." + ) + + title = "Codex" + profile_value = config.get("profile") + if profile_value: + if not isinstance(profile_value, str): + raise ConfigError( + f"Invalid `codex.profile` in {config_path}; expected a string." + ) + extra_args.extend(["--profile", profile_value]) + title = profile_value + + return CodexRunner(codex_cmd=codex_cmd, extra_args=extra_args, title=title) + + +BACKEND = EngineBackend( + id="codex", + build_runner=build_runner, + install_cmd="npm install -g @openai/codex", +) diff --git a/src/takopi/runners/mock.py b/src/takopi/runners/mock.py index 7c85946..b4dcbfd 100644 --- a/src/takopi/runners/mock.py +++ b/src/takopi/runners/mock.py @@ -1,10 +1,10 @@ from __future__ import annotations +import re import uuid from collections.abc import AsyncIterator, Awaitable, Callable, Iterable from dataclasses import dataclass, replace from typing import TypeAlias -from weakref import WeakValueDictionary import anyio @@ -16,7 +16,7 @@ from ..model import ( StartedEvent, TakopiEvent, ) -from ..runner import ResumeTokenMixin, Runner, SessionLockMixin, compile_resume_pattern +from ..runner import ResumeTokenMixin, Runner, SessionLockMixin ENGINE: EngineId = EngineId("mock") @@ -76,10 +76,10 @@ class MockRunner(SessionLockMixin, ResumeTokenMixin, Runner): self._answer = answer self._resume_value = resume_value self.title = title or str(engine).title() - self._session_locks: WeakValueDictionary[str, anyio.Lock] = ( - WeakValueDictionary() + engine_name = re.escape(str(engine)) + self.resume_re = re.compile( + rf"(?im)^\s*`?{engine_name}\s+resume\s+(?P[^`\s]+)`?\s*$" ) - self.resume_re = compile_resume_pattern(engine) async def run( self, prompt: str, resume: ResumeToken | None @@ -100,7 +100,7 @@ class MockRunner(SessionLockMixin, ResumeTokenMixin, Runner): resume=token, title=self.title, ) - lock = self._lock_for(token) + lock = self.lock_for(token) async with lock: yield session_evt @@ -174,7 +174,7 @@ class ScriptRunner(MockRunner): resume=token, title=self.title, ) - lock = self._lock_for(token) + lock = self.lock_for(token) async with lock: if self._emit_session_start: diff --git a/tests/test_claude_runner.py b/tests/test_claude_runner.py index 1adf56c..ee50504 100644 --- a/tests/test_claude_runner.py +++ b/tests/test_claude_runner.py @@ -149,7 +149,7 @@ async def test_run_serializes_same_session() -> None: finally: in_flight -= 1 - runner._run = run_stub # type: ignore[assignment] + runner.run_impl = run_stub # type: ignore[assignment] async def drain(prompt: str, resume: ResumeToken | None) -> None: async for _event in runner.run(prompt, resume): diff --git a/tests/test_engine_discovery.py b/tests/test_engine_discovery.py new file mode 100644 index 0000000..2aae74c --- /dev/null +++ b/tests/test_engine_discovery.py @@ -0,0 +1,39 @@ +from typing import cast + +import click +import typer + +from takopi import cli, engines + + +def test_engine_discovery_skips_non_backend() -> None: + ids = engines.list_backend_ids() + assert "codex" in ids + assert "claude" in ids + assert "mock" not in ids + + +def test_cli_registers_engine_commands_sorted() -> None: + command_names = [cmd.name for cmd in cli.app.registered_commands] + engine_ids = engines.list_backend_ids() + assert set(engine_ids) <= set(command_names) + engine_commands = [name for name in command_names if name in engine_ids] + assert engine_commands == engine_ids + + +def test_engine_commands_do_not_expose_engine_id_option() -> None: + group = cast(click.Group, typer.main.get_command(cli.app)) + engine_ids = engines.list_backend_ids() + + ctx = group.make_context("takopi", []) + + for engine_id in engine_ids: + command = group.get_command(ctx, engine_id) + assert command is not None + options: set[str] = set() + for param in command.params: + options.update(getattr(param, "opts", [])) + options.update(getattr(param, "secondary_opts", [])) + assert "--final-notify" in options + assert "--debug" in options + assert not any(opt.lstrip("-") == "engine-id" for opt in options) diff --git a/tests/test_exec_runner.py b/tests/test_exec_runner.py index 0cc6075..5d7cd9c 100644 --- a/tests/test_exec_runner.py +++ b/tests/test_exec_runner.py @@ -39,7 +39,7 @@ async def test_run_serializes_same_session() -> None: finally: in_flight -= 1 - runner._run = run_stub # type: ignore[assignment] + runner.run_impl = run_stub # type: ignore[assignment] async def drain(prompt: str, resume: ResumeToken | None) -> None: async for _event in runner.run(prompt, resume): @@ -76,7 +76,7 @@ async def test_run_allows_parallel_new_sessions() -> None: finally: in_flight -= 1 - runner._run = run_stub # type: ignore[assignment] + runner.run_impl = run_stub # type: ignore[assignment] async def drain(prompt: str, resume: ResumeToken | None) -> None: async for _event in runner.run(prompt, resume): @@ -112,7 +112,7 @@ async def test_run_allows_parallel_different_sessions() -> None: finally: in_flight -= 1 - runner._run = run_stub # type: ignore[assignment] + runner.run_impl = run_stub # type: ignore[assignment] async def drain(prompt: str, resume: ResumeToken | None) -> None: async for _event in runner.run(prompt, resume): diff --git a/tests/test_onboarding.py b/tests/test_onboarding.py index d2839ea..442917e 100644 --- a/tests/test_onboarding.py +++ b/tests/test_onboarding.py @@ -7,7 +7,7 @@ from takopi import engines, onboarding def test_check_setup_marks_missing_codex(monkeypatch, tmp_path: Path) -> None: backend = engines.get_backend("codex") - monkeypatch.setattr(engines.shutil, "which", lambda _name: None) + monkeypatch.setattr(onboarding.shutil, "which", lambda _name: None) monkeypatch.setattr( onboarding, "load_telegram_config", @@ -17,14 +17,14 @@ def test_check_setup_marks_missing_codex(monkeypatch, tmp_path: Path) -> None: result = onboarding.check_setup(backend) titles = {issue.title for issue in result.issues} - assert "Install the Codex CLI" in titles - assert "Create a config" not in titles + assert "install codex" in titles + assert "create a config" not in titles assert result.ok is False def test_check_setup_marks_missing_config(monkeypatch) -> None: backend = engines.get_backend("codex") - monkeypatch.setattr(engines.shutil, "which", lambda _name: "/usr/bin/codex") + monkeypatch.setattr(onboarding.shutil, "which", lambda _name: "/usr/bin/codex") def _raise() -> None: raise onboarding.ConfigError("Missing config file") @@ -34,13 +34,13 @@ def test_check_setup_marks_missing_config(monkeypatch) -> None: result = onboarding.check_setup(backend) titles = {issue.title for issue in result.issues} - assert "Create a config" in titles + assert "create a config" in titles assert result.config_path == onboarding.HOME_CONFIG_PATH def test_check_setup_marks_invalid_chat_id(monkeypatch, tmp_path: Path) -> None: backend = engines.get_backend("codex") - monkeypatch.setattr(engines.shutil, "which", lambda _name: "/usr/bin/codex") + monkeypatch.setattr(onboarding.shutil, "which", lambda _name: "/usr/bin/codex") monkeypatch.setattr( onboarding, "load_telegram_config", @@ -50,4 +50,4 @@ def test_check_setup_marks_invalid_chat_id(monkeypatch, tmp_path: Path) -> None: result = onboarding.check_setup(backend) titles = {issue.title for issue in result.issues} - assert "Create a config" in titles + assert "create a config" in titles