feat: auto-discover runners (#12)

This commit is contained in:
banteg
2026-01-01 20:31:11 +04:00
committed by GitHub
parent 936ea5109b
commit d35752fc55
21 changed files with 1069 additions and 698 deletions
+26
View File
@@ -1,5 +1,31 @@
# changelog
## unreleased
### changes
- add a claude code runner via the `claude` CLI with stream-json parsing and resume support
- auto-discover engine backends and generate CLI subcommands from the registry
- add `BaseRunner` session locking plus a `JsonlSubprocessRunner` helper for jsonl subprocess engines
- add jsonl stream parsing and subprocess helpers for runners
- lazily allocate per-session locks and streamline backend setup/install metadata
- improve startup message formatting and markdown rendering
- add a debug onboarding helper for setup troubleshooting
### breaking
- runner implementations must define explicit resume parsing/formatting (no implicit standard resume pattern)
### fixes
- stop leaking a hidden `engine-id` CLI option on engine subcommands
### docs
- add a runner guide plus Claude Code docs (runner, events, stream-json cheatsheet)
- clarify the Claude runner file layout and add guidance for JSONL-based runners
- document “minimal” runner mode: Started+Completed only, completed-only actions allowed
## v0.2.0 (2025-12-31)
### changes
+89 -33
View File
@@ -5,13 +5,13 @@ domain model. Use the existing runners (Codex/Claude) as references.
## Quick checklist
1. Implement `Runner` in `src/takopi/runners/<engine>.py`.
1. Implement `Runner` in `src/takopi/runners/<engine>.py` (usually via
`JsonlSubprocessRunner`).
2. Emit Takopi events from `takopi.model` and implement resume helpers
(`format_resume`, `extract_resume`, `is_resume_line`).
3. Register an `EngineBackend` in `src/takopi/engines.py` with setup checks
and runner construction.
4. Add CLI subcommand in `src/takopi/cli.py`.
5. Extend tests (runner contract + engine-specific translation tests).
3. Define `BACKEND = EngineBackend(...)` in the runner module (auto-discovered),
including `install_cmd` (and `cli_cmd` only if the binary name differs).
4. Extend tests (runner contract + engine-specific translation tests).
---
@@ -25,20 +25,41 @@ make it easy to drop in another engine without changing the Takopi domain model.
- Engine id: `"pi"` (used in config, resume tokens, and CLI subcommand).
- Canonical resume line: the engines own CLI resume command, e.g.
`` `pi --resume <session_id>` ``.
- If your engine uses the standard `"<engine> resume <token>"` format, you can
reuse `compile_resume_pattern()`. Otherwise, define a custom regex in the
runner (like Claude does).
- Pick the resume line format you want to support and define a regex for it in
the runner (Claude is a good example). If you choose the
`"<engine> resume <token>"` shape, you can use that exact regex.
### 2) Implement `src/takopi/runners/pi.py`
Skeleton outline:
Recommended: `JsonlSubprocessRunner`
For JSONL CLIs, this base class centralizes subprocess + JSONL plumbing,
lock timing, and completion semantics. Your runner usually only needs:
- `command()` (binary name)
- `build_args(...)`
- `translate(...)` (map one JSON object to a list of Takopi events)
Optional hooks for common variants:
- `stdin_payload(...)`: return `None` if the prompt is passed via argv
- `env(...)`: add or redact environment variables
- `invalid_json_events(...)`: customize the warning event
- `process_error_events(...)`: customize `rc != 0` handling
- `stream_end_events(...)`: customize stream-end fallback (no `CompletedEvent`)
- `handle_started_event(...)`: customize session-id validation
If you call `note_event(...)`, your state object must include `note_seq` or
override `next_note_id(...)`.
Skeleton outline (JSONL CLI):
```py
ENGINE: EngineId = "pi"
_RESUME_RE = re.compile(r"(?im)^\s*`?pi\s+--resume\s+(?P<token>[^`\\s]+)`?\\s*$")
@dataclass
class PiRunner(SessionLockMixin, ResumeTokenMixin, Runner):
class PiRunner(ResumeTokenMixin, JsonlSubprocessRunner):
engine: EngineId = ENGINE
resume_re: re.Pattern[str] = _RESUME_RE
@@ -46,30 +67,55 @@ class PiRunner(SessionLockMixin, ResumeTokenMixin, Runner):
model: str | None = None
allowed_tools: list[str] | None = None
def _build_args(self, prompt: str, resume: ResumeToken | None) -> list[str]:
args = ["--jsonl"]
def command(self) -> str:
return self.pi_cmd
def build_args(
self, prompt: str, resume: ResumeToken | None, *, state: Any
) -> list[str]:
_ = prompt, state
args = ["--jsonl", "--verbose"]
if resume is not None:
args.extend(["--resume", resume.value])
if self.model is not None:
args.extend(["--model", self.model])
if self.allowed_tools:
args.extend(["--allowed-tools", ",".join(self.allowed_tools)])
args.append("--")
args.append(prompt)
return args
async def run(
self, prompt: str, resume: ResumeToken | None
) -> AsyncIterator[TakopiEvent]:
async for evt in self._run_with_resume_lock(prompt, resume, self._run):
yield evt
def stdin_payload(
self, prompt: str, resume: ResumeToken | None, *, state: Any
) -> bytes | None:
_ = resume, state
return prompt.encode()
def translate(
self,
data: dict[str, Any],
*,
state: Any,
resume: ResumeToken | None,
found_session: ResumeToken | None,
) -> list[TakopiEvent]:
_ = state, resume, found_session
...
```
Key implementation notes:
- Use `SessionLockMixin` to enforce per-session serialization.
- Use `ResumeTokenMixin` for `format_resume` / `extract_resume` / `is_resume_line`.
- Use `iter_jsonl(...)` + `drain_stderr(...)` from `takopi.utils.streams`.
- Use `BaseRunner` (or `JsonlSubprocessRunner`) for per-session serialization.
- Mix in `ResumeTokenMixin` (with a `resume_re`) or override
`format_resume` / `extract_resume` / `is_resume_line` so the runner owns
resume encoding/decoding.
- For JSONL CLIs, prefer `JsonlSubprocessRunner` and implement `command`,
`build_args`, and `translate` (override `stdin_payload` if the prompt should
be passed via argv instead of stdin).
- If you dont use `JsonlSubprocessRunner`, use `iter_jsonl(...)` +
`drain_stderr(...)` from `takopi.utils.streams`.
- **Minimal mode is supported:** start with exactly one `StartedEvent` and one
`CompletedEvent`. `ActionEvent`s are optional and can be added later. If you
do emit actions, you can emit only `phase="completed"` notes without tracking
pending state.
- **Do not truncate** tool outputs in the runner; pass full strings into events.
Truncation belongs in renderers.
@@ -94,13 +140,28 @@ Mapping guidance:
If Pi emits warnings/errors before the final event, surface them as completed
`ActionEvent`s (e.g., `kind="warning"`).
### 4) Register engine in `src/takopi/engines.py`
### 4) Expose the backend (auto-discovered)
Add:
Takopi discovers runners by importing modules in `takopi.runners` and looking
for a module-level `BACKEND: EngineBackend` (from `takopi.backends`).
- `_pi_check_setup()` that verifies `pi` exists on PATH
- `_pi_build_runner()` that reads `[pi]` config and returns `PiRunner`
- A new `EngineBackend(id="pi", display_name="Pi", ...)` entry
At the bottom of `src/takopi/runners/pi.py`, define:
```py
BACKEND = EngineBackend(
id="pi",
build_runner=build_runner,
install_cmd="npm install -g @acme/pi-cli",
)
```
No changes to `engines.py` or `cli.py` are required.
Only modules that define `BACKEND` are treated as engines. Internal/testing
modules (like `mock.py`) should omit it.
If the CLI binary name differs from the engine id, set `cli_cmd="pi-cli"` on
the backend.
Example config (minimal):
@@ -110,12 +171,7 @@ model = "pi-large"
allowed_tools = ["Bash", "Read"]
```
### 5) Add CLI subcommand
Expose `takopi pi` alongside `takopi codex` / `takopi claude` by adding a new
`@app.command()` in `src/takopi/cli.py`.
### 6) Tests + fixtures
### 5) Tests + fixtures
- Add `tests/test_pi_runner.py` for translation behavior.
- Reuse `tests/test_runner_contract.py` to ensure lock/resume invariants.
+7 -2
View File
@@ -106,9 +106,14 @@ Transforms takopi events into human-readable text:
| `model.py` | Domain types: resume tokens, actions, events, run result |
| `runner.py` | Runner protocol + event queue utilities |
### `engines.py` - Engine backend registry
### `backends.py` - Engine backend contracts
Registers available engines and provides setup checks + runner construction.
Defines `EngineBackend`, `SetupIssue`, and the `EngineConfig` type used by
runner modules.
### `engines.py` - Engine backend discovery
Auto-discovers runner modules in `takopi.runners` that export `BACKEND`.
### `runners/` - Runner implementations
+12 -12
View File
@@ -94,15 +94,19 @@ Notes:
## Code changes (by file)
### 1) `src/takopi/engines.py`
### 1) New file: `src/takopi/runners/claude.py`
Add a new backend:
#### Backend export
* Engine ID: `EngineId("claude")`
Expose a module-level `BACKEND = EngineBackend(...)` (from `takopi.backends`).
Takopi auto-discovers runners by importing `takopi.runners.*` and looking for
`BACKEND`.
* `check_setup()` should:
`BACKEND` should provide:
* `shutil.which("claude")` must exist.
* Engine id: `"claude"`
* `install_cmd`:
* Install command for `claude` (used by onboarding when missing on PATH).
* Error message should include official install options and “run `claude` once to authenticate”.
* Install methods include install scripts, Homebrew, and npm. ([Claude Code][4])
@@ -110,11 +114,7 @@ Add a new backend:
* `build_runner()` should parse `[claude]` config and instantiate `ClaudeRunner`.
* `startup_message()` e.g.:
* `takopi (claude) is ready\npwd: ...`
### 2) New file: `src/takopi/runners/claude.py`
#### Runner implementation
Implement a new `Runner`:
@@ -319,7 +319,7 @@ Mirror the existing `CodexRunner` tests patterns.
1. **Contract & locking**
* `test_run_serializes_same_session` (stub `_run` like Codex tests)
* `test_run_serializes_same_session` (stub `run_impl` like Codex tests)
* `test_run_allows_parallel_new_sessions`
* `test_run_serializes_new_session_after_session_is_known`:
@@ -367,7 +367,7 @@ Mirror the existing `CodexRunner` tests patterns.
## Implementation checklist
* [ ] Add `ClaudeBackend` in `src/takopi/engines.py` and register in `ENGINES`.
* [ ] Export `BACKEND = EngineBackend(...)` from `src/takopi/runners/claude.py`.
* [ ] Add `src/takopi/runners/claude.py` implementing the `Runner` protocol.
* [ ] Add tests + stub executable fixtures.
* [ ] Update README and developing docs.
+4 -6
View File
@@ -44,8 +44,8 @@ Notes:
`claude --resume <session_id>`
```
Runner must implement its own regex (cannot use `compile_resume_pattern` because
that only matches `<engine> resume <token>`). Suggested regex:
Runner must implement its own regex because the resume format is
`claude --resume <session_id>`. Suggested regex:
```
(?im)^\s*`?claude\s+(?:--resume|-r)\s+(?P<token>[^`\s]+)`?\s*$
@@ -202,11 +202,9 @@ Add a Claude runner without changing the Takopi domain model:
1. Create `takopi/runners/claude.py` implementing `Runner` and (custom)
resume parsing.
2. Update `takopi/engines.py`:
- add `claude` backend id
- `check_setup`: locate `claude` binary (PATH + common locations)
2. Define `BACKEND` in `takopi/runners/claude.py`:
- `install_cmd`: install command for the `claude` binary
- `build_runner`: read `[claude]` config + construct runner
- `startup_message`: `"claude is ready\npwd: <cwd>"`
3. Add new docs (this file + `claude-stream-json-cheatsheet.md`).
4. Add fixtures in `tests/fixtures/` (see below).
5. Add unit tests mirroring `tests/test_codex_*` but for Claude translation
+19 -1
View File
@@ -169,6 +169,17 @@ Takopi MUST support the following event types:
2. `action`
3. `completed`
**Minimal runner mode (supported):**
Runners MAY emit only:
- exactly one `started`
- exactly one `completed`
`action` events are optional. If emitted, a runner MAY emit only
`phase="completed"` action events (no requirement to emit `started` / `updated`
phases or track pending action state).
### 5.3 Required fields by event type
#### 5.3.1 `started`
@@ -199,6 +210,10 @@ Optional:
- `message: str` (freeform status/warning text)
- `level: "debug" | "info" | "warning" | "error"`
Notes:
- `phase="completed"` alone is valid; `started` / `updated` are optional.
#### 5.3.3 `completed`
Required:
@@ -424,6 +439,9 @@ The progress renderer SHOULD maintain:
- completed actions and status
- resume token if known
The progress renderer MUST tolerate “completed-only” actions (no prior
`started` / `updated`) and treat them as standalone steps.
If the runner emits multiple `action` events for the same `Action.id` while it is still running (e.g., repeated `phase="started"` or `phase="updated"`), the progress renderer SHOULD treat these as updates and collapse them into a single line (replacing the prior running line rather than appending a new one).
### 8.3 Final rendering
@@ -462,7 +480,7 @@ The architecture SHOULD keep this future change localized to a `RunnerRegistry`
1. **Runner contract tests**
- Emits exactly one `started`
- All actions have required fields and stable IDs
- All actions (if any) have required fields and stable IDs
- `completed.resume` matches started token (when present)
- Event ordering is preserved
- `ok` semantics match intended behavior
+24
View File
@@ -0,0 +1,24 @@
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable
if TYPE_CHECKING:
from .runner import Runner
EngineConfig = dict[str, Any]
@dataclass(frozen=True, slots=True)
class SetupIssue:
title: str
lines: tuple[str, ...]
@dataclass(frozen=True, slots=True)
class EngineBackend:
id: str
build_runner: Callable[[EngineConfig, Path], Runner]
cli_cmd: str | None = None
install_cmd: str | None = None
+14
View File
@@ -0,0 +1,14 @@
from __future__ import annotations
from .backends import SetupIssue
def install_issue(cmd: str, install_cmd: str | None) -> SetupIssue:
if install_cmd:
lines = (f" [dim]$[/] {install_cmd}",)
else:
lines = (" [dim]See engine setup docs for install instructions.[/]",)
return SetupIssue(
f"install {cmd}",
lines,
)
+6 -1
View File
@@ -206,7 +206,12 @@ class RunningTask:
async def _send_startup(cfg: BridgeConfig) -> None:
logger.debug("[startup] message: %s", cfg.startup_msg)
sent = await cfg.bot.send_message(chat_id=cfg.chat_id, text=cfg.startup_msg)
sent, _ = await _send_or_edit_markdown(
cfg.bot,
chat_id=cfg.chat_id,
text=cfg.startup_msg,
limit=TELEGRAM_MARKDOWN_LIMIT,
)
if sent is not None:
logger.info("[startup] sent startup message to chat_id=%s", cfg.chat_id)
+21 -19
View File
@@ -1,14 +1,16 @@
from __future__ import annotations
import os
from typing import Callable
import anyio
import typer
from . import __version__
from .backends import EngineBackend
from .bridge import BridgeConfig, _run_main_loop
from .config import ConfigError, load_telegram_config
from .engines import EngineBackend, get_backend, get_engine_config, list_backends
from .engines import get_backend, get_engine_config, list_backends
from .logging import setup_logging
from .onboarding import check_setup, render_engine_choice, render_setup_guide
from .telegram import TelegramClient
@@ -51,7 +53,11 @@ def _parse_bridge_config(
chat_id = chat_id_value
engine_cfg = get_engine_config(config, backend.id, config_path)
startup_msg = backend.startup_message(startup_pwd)
startup_msg = (
f"\N{OCTOPUS} **takopi is ready**\n\n"
f"agent: `{backend.id}` \n"
f"working in: `{startup_pwd}`"
)
bot = TelegramClient(token)
runner = backend.build_runner(engine_cfg, config_path)
@@ -111,8 +117,8 @@ def app_main(
raise typer.Exit(code=1)
@app.command(help="Run with the Codex engine.")
def codex(
def make_engine_cmd(engine_id: str) -> Callable[..., None]:
def _cmd(
final_notify: bool = typer.Option(
True,
"--final-notify/--no-final-notify",
@@ -124,23 +130,19 @@ def codex(
help="Log engine JSONL, Telegram requests, and rendered messages.",
),
) -> None:
_run_engine(engine="codex", final_notify=final_notify, debug=debug)
_run_engine(engine=engine_id, final_notify=final_notify, debug=debug)
_cmd.__name__ = f"run_{engine_id}"
return _cmd
@app.command(help="Run with the Claude engine.")
def claude(
final_notify: bool = typer.Option(
True,
"--final-notify/--no-final-notify",
help="Send the final response as a new message (not an edit).",
),
debug: bool = typer.Option(
False,
"--debug/--no-debug",
help="Log engine JSONL, Telegram requests, and rendered messages.",
),
) -> None:
_run_engine(engine="claude", final_notify=final_notify, debug=debug)
def register_engine_commands() -> None:
for backend in list_backends():
help_text = f"Run with the {backend.id} engine."
app.command(name=backend.id, help=help_text)(make_engine_cmd(backend.id))
register_engine_commands()
def main() -> None:
+3 -5
View File
@@ -2,8 +2,9 @@ from __future__ import annotations
import typer
from .backends import SetupIssue
from .config import ConfigError
from .engines import SetupIssue, get_backend, get_install_issue, list_backend_ids
from .engines import get_backend, list_backend_ids
from .onboarding import SetupResult, check_setup, config_issue, render_setup_guide
@@ -37,10 +38,7 @@ def run(
raise typer.Exit(code=1)
setup = check_setup(backend)
if force:
forced_issues = [
get_install_issue(backend.id),
config_issue(setup.config_path),
]
forced_issues = [config_issue(setup.config_path)]
setup = SetupResult(
issues=_dedupe_issues([*setup.issues, *forced_issues]),
config_path=setup.config_path,
+39 -138
View File
@@ -1,165 +1,66 @@
from __future__ import annotations
import shutil
from dataclasses import dataclass
import importlib
import pkgutil
from pathlib import Path
from typing import Any, Callable
from typing import Any
from .backends import EngineBackend, EngineConfig
from .config import ConfigError
from .runner import Runner
from .runners.codex import CodexRunner
from .runners.claude import ClaudeRunner
EngineConfig = dict[str, Any]
_BACKENDS: dict[str, EngineBackend] | None = None
@dataclass(frozen=True, slots=True)
class SetupIssue:
title: str
lines: tuple[str, ...]
def _discover_backends() -> dict[str, EngineBackend]:
import takopi.runners as runners_pkg
backends: dict[str, EngineBackend] = {}
prefix = runners_pkg.__name__ + "."
for module_info in pkgutil.iter_modules(runners_pkg.__path__, prefix):
module_name = module_info.name
mod = importlib.import_module(module_name)
backend = getattr(mod, "BACKEND", None)
if backend is None:
continue
if not isinstance(backend, EngineBackend):
raise RuntimeError(f"{module_name}.BACKEND is not an EngineBackend")
if backend.id in backends:
raise RuntimeError(f"Duplicate backend id: {backend.id}")
backends[backend.id] = backend
return backends
@dataclass(frozen=True, slots=True)
class EngineBackend:
id: str
display_name: str
check_setup: Callable[[EngineConfig, Path], list[SetupIssue]]
build_runner: Callable[[EngineConfig, Path], Runner]
startup_message: Callable[[str], str]
def _codex_check_setup(_config: EngineConfig, _config_path: Path) -> list[SetupIssue]:
if shutil.which("codex") is None:
return [_codex_install_issue()]
return []
def _codex_install_issue() -> SetupIssue:
return SetupIssue(
"Install the Codex CLI",
(" [dim]$[/] npm install -g @openai/codex",),
)
def _codex_build_runner(config: EngineConfig, config_path: Path) -> Runner:
codex_cmd = shutil.which("codex")
if not codex_cmd:
raise ConfigError(
"codex not found on PATH. Install the Codex CLI with:\n"
" npm install -g @openai/codex\n"
" # or on macOS\n"
" brew install codex"
)
extra_args_value = config.get("extra_args")
if extra_args_value is None:
extra_args = ["-c", "notify=[]"]
elif isinstance(extra_args_value, list) and all(
isinstance(item, str) for item in extra_args_value
):
extra_args = list(extra_args_value)
else:
raise ConfigError(
f"Invalid `codex.extra_args` in {config_path}; expected a list of strings."
)
title = "Codex"
profile_value = config.get("profile")
if profile_value:
if not isinstance(profile_value, str):
raise ConfigError(
f"Invalid `codex.profile` in {config_path}; expected a string."
)
extra_args.extend(["--profile", profile_value])
title = profile_value
return CodexRunner(codex_cmd=codex_cmd, extra_args=extra_args, title=title)
def _codex_startup_message(cwd: str) -> str:
return f"codex is ready\npwd: {cwd}"
def _claude_check_setup(_config: EngineConfig, _config_path: Path) -> list[SetupIssue]:
claude_cmd = "claude"
if shutil.which(claude_cmd) is None:
return [_claude_install_issue()]
return []
def _claude_install_issue() -> SetupIssue:
return SetupIssue(
"Install the Claude Code CLI",
(" [dim]$[/] npm install -g @anthropic-ai/claude-code",),
)
def _claude_build_runner(config: EngineConfig, _config_path: Path) -> Runner:
claude_cmd = "claude"
model = config.get("model")
allowed_tools = config.get("allowed_tools")
dangerously_skip_permissions = config.get("dangerously_skip_permissions") is True
use_api_billing = config.get("use_api_billing") is True
title = str(model) if model is not None else "claude"
return ClaudeRunner(
claude_cmd=claude_cmd,
model=model,
allowed_tools=allowed_tools,
dangerously_skip_permissions=dangerously_skip_permissions,
use_api_billing=use_api_billing,
session_title=title,
)
def _claude_startup_message(cwd: str) -> str:
return f"claude is ready\npwd: {cwd}"
_ENGINE_BACKENDS: dict[str, EngineBackend] = {
"codex": EngineBackend(
id="codex",
display_name="Codex",
check_setup=_codex_check_setup,
build_runner=_codex_build_runner,
startup_message=_codex_startup_message,
),
"claude": EngineBackend(
id="claude",
display_name="Claude",
check_setup=_claude_check_setup,
build_runner=_claude_build_runner,
startup_message=_claude_startup_message,
),
}
def _ensure_loaded() -> None:
global _BACKENDS
if _BACKENDS is None:
_BACKENDS = _discover_backends()
def get_backend(engine_id: str) -> EngineBackend:
_ensure_loaded()
assert _BACKENDS is not None
try:
return _ENGINE_BACKENDS[engine_id]
return _BACKENDS[engine_id]
except KeyError as exc:
available = ", ".join(sorted(_ENGINE_BACKENDS))
available = ", ".join(sorted(_BACKENDS))
raise ConfigError(
f"Unknown engine {engine_id!r}. Available: {available}."
) from exc
def get_install_issue(engine_id: str) -> SetupIssue:
if engine_id == "codex":
return _codex_install_issue()
if engine_id == "claude":
return _claude_install_issue()
available = ", ".join(sorted(_ENGINE_BACKENDS))
raise ConfigError(f"Unknown engine {engine_id!r}. Available: {available}.")
def list_backends() -> list[EngineBackend]:
return list(_ENGINE_BACKENDS.values())
_ensure_loaded()
assert _BACKENDS is not None
return [_BACKENDS[key] for key in sorted(_BACKENDS)]
def list_backend_ids() -> list[str]:
return sorted(_ENGINE_BACKENDS)
_ensure_loaded()
assert _BACKENDS is not None
return sorted(_BACKENDS)
def get_engine_config(
+12 -10
View File
@@ -1,5 +1,6 @@
from __future__ import annotations
import shutil
from dataclasses import dataclass
from pathlib import Path
from typing import Sequence
@@ -7,8 +8,9 @@ from typing import Sequence
from rich.console import Console
from rich.panel import Panel
from .backends import EngineBackend, SetupIssue
from .backends_helpers import install_issue
from .config import ConfigError, HOME_CONFIG_PATH, load_telegram_config
from .engines import EngineBackend, SetupIssue
_OCTOPUS = "\N{OCTOPUS}"
@@ -26,7 +28,7 @@ class SetupResult:
def config_issue(path: Path) -> SetupIssue:
config_display = _config_path_display(path)
return SetupIssue(
"Create a config",
"create a config",
(
f" [dim]{config_display}[/]",
"",
@@ -35,7 +37,7 @@ def config_issue(path: Path) -> SetupIssue:
"",
"[dim]" + ("-" * 56) + "[/]",
"",
"[bold]Getting your Telegram credentials:[/]",
"[bold]getting your telegram credentials:[/]",
"",
" [cyan]bot_token[/] create a bot with [link=https://t.me/BotFather]@BotFather[/]",
" [cyan]chat_id[/] message [link=https://t.me/myidbot]@myidbot[/] to get your id",
@@ -47,11 +49,15 @@ def check_setup(backend: EngineBackend) -> SetupResult:
issues: list[SetupIssue] = []
config_path = HOME_CONFIG_PATH
config: dict = {}
cmd = backend.cli_cmd or backend.id
backend_issues: list[SetupIssue] = []
if shutil.which(cmd) is None:
backend_issues.append(install_issue(cmd, backend.install_cmd))
try:
config, config_path = load_telegram_config()
except ConfigError:
issues.extend(backend.check_setup({}, config_path))
issues.extend(backend_issues)
issues.append(config_issue(config_path))
return SetupResult(issues=issues, config_path=config_path)
@@ -61,7 +67,7 @@ def check_setup(backend: EngineBackend) -> SetupResult:
missing_or_invalid_config = not (isinstance(token, str) and token.strip())
missing_or_invalid_config |= type(chat_id) is not int
issues.extend(backend.check_setup(config, config_path))
issues.extend(backend_issues)
if missing_or_invalid_config:
issues.append(config_issue(config_path))
@@ -113,11 +119,7 @@ def render_engine_choice(backends: Sequence[EngineBackend]) -> None:
parts.append("")
for idx, backend in enumerate(backends, start=1):
parts.append(f"[bold yellow]{idx}.[/] [dim]$[/] takopi {backend.id}")
if backend.id == "claude":
description = "use claude code"
else:
description = f"use {backend.display_name.lower()}"
parts.append(f" [dim]{description}[/]")
parts.append(f" [dim]use {backend.id}[/]")
parts.append("")
panel = Panel(
+339 -13
View File
@@ -2,19 +2,28 @@
from __future__ import annotations
import logging
import re
import subprocess
from collections import deque
from collections.abc import AsyncIterator, Callable
from typing import Protocol
from dataclasses import dataclass
from typing import Any, Protocol
from weakref import WeakValueDictionary
import anyio
from .model import EngineId, ResumeToken, TakopiEvent
def compile_resume_pattern(engine: EngineId) -> re.Pattern[str]:
name = re.escape(str(engine))
return re.compile(rf"(?im)^\s*`?{name}\s+resume\s+(?P<token>[^`\s]+)`?\s*$")
from .model import (
Action,
ActionEvent,
CompletedEvent,
EngineId,
ResumeToken,
StartedEvent,
TakopiEvent,
)
from .utils.streams import drain_stderr, iter_jsonl
from .utils.subprocess import manage_subprocess
class ResumeTokenMixin:
@@ -44,17 +53,21 @@ class ResumeTokenMixin:
class SessionLockMixin:
engine: EngineId
_session_locks: WeakValueDictionary[str, anyio.Lock]
session_locks: WeakValueDictionary[str, anyio.Lock] | None = None
def _lock_for(self, token: ResumeToken) -> anyio.Lock:
def lock_for(self, token: ResumeToken) -> anyio.Lock:
locks = self.session_locks
if locks is None:
locks = WeakValueDictionary()
self.session_locks = locks
key = f"{token.engine}:{token.value}"
lock = self._session_locks.get(key)
lock = locks.get(key)
if lock is None:
lock = anyio.Lock()
self._session_locks[key] = lock
locks[key] = lock
return lock
async def _run_with_resume_lock(
async def run_with_resume_lock(
self,
prompt: str,
resume: ResumeToken | None,
@@ -69,12 +82,325 @@ class SessionLockMixin:
async for evt in run_fn(prompt, resume_token):
yield evt
return
lock = self._lock_for(resume_token)
lock = self.lock_for(resume_token)
async with lock:
async for evt in run_fn(prompt, resume_token):
yield evt
class BaseRunner(SessionLockMixin):
engine: EngineId
async def run(
self, prompt: str, resume: ResumeToken | None
) -> AsyncIterator[TakopiEvent]:
async for evt in self.run_locked(prompt, resume):
yield evt
async def run_locked(
self, prompt: str, resume: ResumeToken | None
) -> AsyncIterator[TakopiEvent]:
if resume is not None:
async for evt in self.run_with_resume_lock(prompt, resume, self.run_impl):
yield evt
return
lock: anyio.Lock | None = None
acquired = False
try:
async for evt in self.run_impl(prompt, None):
if lock is None and isinstance(evt, StartedEvent):
lock = self.lock_for(evt.resume)
await lock.acquire()
acquired = True
yield evt
finally:
if acquired and lock is not None:
lock.release()
async def run_impl(
self, prompt: str, resume: ResumeToken | None
) -> AsyncIterator[TakopiEvent]:
if False:
yield # pragma: no cover
raise NotImplementedError
@dataclass
class JsonlRunState:
note_seq: int = 0
class JsonlSubprocessRunner(BaseRunner):
stderr_tail_lines: int = 200
def get_logger(self) -> logging.Logger:
return getattr(self, "logger", logging.getLogger(__name__))
def command(self) -> str:
raise NotImplementedError
def tag(self) -> str:
return str(self.engine)
def build_args(
self,
prompt: str,
resume: ResumeToken | None,
*,
state: Any,
) -> list[str]:
raise NotImplementedError
def stdin_payload(
self,
prompt: str,
resume: ResumeToken | None,
*,
state: Any,
) -> bytes | None:
return prompt.encode()
def env(self, *, state: Any) -> dict[str, str] | None:
return None
def new_state(self, prompt: str, resume: ResumeToken | None) -> Any:
return JsonlRunState()
def start_run(
self,
prompt: str,
resume: ResumeToken | None,
*,
state: Any,
) -> None:
return None
def pipes_error_message(self) -> str:
return f"{self.tag()} failed to open subprocess pipes"
def next_note_id(self, state: Any) -> str:
try:
note_seq = state.note_seq
except AttributeError as exc:
raise RuntimeError(
"state must define note_seq or override next_note_id"
) from exc
state.note_seq = note_seq + 1
return f"{self.tag()}.note.{state.note_seq}"
def note_event(
self,
message: str,
*,
state: Any,
ok: bool = False,
detail: dict[str, Any] | None = None,
) -> TakopiEvent:
note_id = self.next_note_id(state)
action = Action(
id=note_id,
kind="warning",
title=message,
detail=detail or {},
)
return ActionEvent(
engine=self.engine,
action=action,
phase="completed",
ok=ok,
message=message,
level="info" if ok else "warning",
)
def invalid_json_events(
self,
*,
raw: str,
line: str,
state: Any,
) -> list[TakopiEvent]:
message = f"invalid JSON from {self.tag()}; ignoring line"
return [self.note_event(message, state=state, detail={"line": line})]
def process_error_events(
self,
rc: int,
*,
resume: ResumeToken | None,
found_session: ResumeToken | None,
stderr_tail: str,
state: Any,
) -> list[TakopiEvent]:
message = f"{self.tag()} failed (rc={rc})."
resume_for_completed = found_session or resume
return [
self.note_event(message, state=state, detail={"stderr_tail": stderr_tail}),
CompletedEvent(
engine=self.engine,
ok=False,
answer="",
resume=resume_for_completed,
error=message,
),
]
def stream_end_events(
self,
*,
resume: ResumeToken | None,
found_session: ResumeToken | None,
stderr_tail: str,
state: Any,
) -> list[TakopiEvent]:
message = f"{self.tag()} finished without a result event"
resume_for_completed = found_session or resume
return [
CompletedEvent(
engine=self.engine,
ok=False,
answer="",
resume=resume_for_completed,
error=message,
)
]
def translate(
self,
data: dict[str, Any],
*,
state: Any,
resume: ResumeToken | None,
found_session: ResumeToken | None,
) -> list[TakopiEvent]:
raise NotImplementedError
def handle_started_event(
self,
event: StartedEvent,
*,
expected_session: ResumeToken | None,
found_session: ResumeToken | None,
) -> tuple[ResumeToken | None, bool]:
if event.engine != self.engine:
raise RuntimeError(f"{self.tag()} emitted session token for wrong engine")
if expected_session is not None and event.resume != expected_session:
message = f"{self.tag()} emitted a different session id than expected"
raise RuntimeError(message)
if found_session is None:
return event.resume, True
if event.resume != found_session:
message = f"{self.tag()} emitted a different session id than expected"
raise RuntimeError(message)
return found_session, False
async def run_impl(
self, prompt: str, resume: ResumeToken | None
) -> AsyncIterator[TakopiEvent]:
state = self.new_state(prompt, resume)
self.start_run(prompt, resume, state=state)
tag = self.tag()
logger = self.get_logger()
args = [self.command(), *self.build_args(prompt, resume, state=state)]
payload = self.stdin_payload(prompt, resume, state=state)
env = self.env(state=state)
async with manage_subprocess(
*args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
) as proc:
if proc.stdout is None or proc.stderr is None:
raise RuntimeError(self.pipes_error_message())
if payload is not None and proc.stdin is None:
raise RuntimeError(self.pipes_error_message())
logger.debug("[%s] spawn pid=%s args=%r", tag, proc.pid, args)
if payload is not None:
assert proc.stdin is not None
await proc.stdin.send(payload)
await proc.stdin.aclose()
elif proc.stdin is not None:
await proc.stdin.aclose()
stderr_chunks: deque[str] = deque(maxlen=self.stderr_tail_lines)
rc: int | None = None
expected_session: ResumeToken | None = resume
found_session: ResumeToken | None = None
did_emit_completed = False
async with anyio.create_task_group() as tg:
tg.start_soon(
drain_stderr,
proc.stderr,
stderr_chunks,
logger,
tag,
)
async for json_line in iter_jsonl(proc.stdout, logger=logger, tag=tag):
if did_emit_completed:
continue
if json_line.data is None:
events = self.invalid_json_events(
raw=json_line.raw,
line=json_line.line,
state=state,
)
else:
events = self.translate(
json_line.data,
state=state,
resume=resume,
found_session=found_session,
)
for evt in events:
if isinstance(evt, StartedEvent):
found_session, emit = self.handle_started_event(
evt,
expected_session=expected_session,
found_session=found_session,
)
if not emit:
continue
if isinstance(evt, CompletedEvent):
did_emit_completed = True
yield evt
break
yield evt
rc = await proc.wait()
logger.debug("[%s] process exit pid=%s rc=%s", tag, proc.pid, rc)
if did_emit_completed:
return
stderr_tail = "".join(stderr_chunks)
if rc is not None and rc != 0:
events = self.process_error_events(
rc,
resume=resume,
found_session=found_session,
stderr_tail=stderr_tail,
state=state,
)
for evt in events:
yield evt
return
events = self.stream_end_events(
resume=resume,
found_session=found_session,
stderr_tail=stderr_tail,
state=state,
)
for evt in events:
yield evt
class Runner(Protocol):
engine: str
+126 -147
View File
@@ -3,15 +3,11 @@ from __future__ import annotations
import logging
import os
import re
import subprocess
from collections import deque
from collections.abc import AsyncIterator
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Literal
from weakref import WeakValueDictionary
import anyio
from ..backends import EngineBackend, EngineConfig
from ..model import (
Action,
ActionEvent,
@@ -22,10 +18,8 @@ from ..model import (
StartedEvent,
TakopiEvent,
)
from ..runner import ResumeTokenMixin, Runner, SessionLockMixin
from ..runner import JsonlSubprocessRunner, ResumeTokenMixin, Runner
from ..utils.paths import relativize_command, relativize_path
from ..utils.streams import drain_stderr, iter_jsonl
from ..utils.subprocess import manage_subprocess
logger = logging.getLogger(__name__)
@@ -41,6 +35,7 @@ _RESUME_RE = re.compile(
class ClaudeStreamState:
pending_actions: dict[str, Action] = field(default_factory=dict)
last_assistant_text: str | None = None
note_seq: int = 0
def _action_event(
@@ -61,27 +56,6 @@ def _action_event(
)
def _note_completed(
action_id: str,
message: str,
*,
ok: bool = False,
detail: dict[str, Any] | None = None,
) -> ActionEvent:
return _action_event(
phase="completed",
action=Action(
id=action_id,
kind="warning",
title=message,
detail=detail or {},
),
ok=ok,
message=message,
level="warning" if not ok else "info",
)
def _normalize_tool_result(content: Any) -> str:
if isinstance(content, list):
parts: list[str] = []
@@ -423,7 +397,7 @@ def translate_claude_event(
@dataclass
class ClaudeRunner(SessionLockMixin, ResumeTokenMixin, Runner):
class ClaudeRunner(ResumeTokenMixin, JsonlSubprocessRunner):
engine: EngineId = ENGINE
resume_re: re.Pattern[str] = _RESUME_RE
@@ -433,9 +407,8 @@ class ClaudeRunner(SessionLockMixin, ResumeTokenMixin, Runner):
dangerously_skip_permissions: bool = False
use_api_billing: bool = False
session_title: str = "claude"
_session_locks: WeakValueDictionary[str, anyio.Lock] = field(
default_factory=WeakValueDictionary, init=False, repr=False
)
stderr_tail_lines = STDERR_TAIL_LINES
logger = logger
def format_resume(self, token: ResumeToken) -> str:
if token.engine != ENGINE:
@@ -457,157 +430,163 @@ class ClaudeRunner(SessionLockMixin, ResumeTokenMixin, Runner):
args.append(prompt)
return args
async def run(
self, prompt: str, resume: ResumeToken | None
) -> AsyncIterator[TakopiEvent]:
async for evt in self._run_with_resume_lock(prompt, resume, self._run):
yield evt
def command(self) -> str:
return self.claude_cmd
async def _run( # noqa: C901
def build_args(
self,
prompt: str,
resume_token: ResumeToken | None,
) -> AsyncIterator[TakopiEvent]:
logger.info(
"[claude] start run resume=%r",
resume_token.value if resume_token else None,
)
logger.debug("[claude] prompt: %s", prompt)
args = [self.claude_cmd]
args.extend(self._build_args(prompt, resume_token))
resume: ResumeToken | None,
*,
state: Any,
) -> list[str]:
_ = state
return self._build_args(prompt, resume)
session_lock: anyio.Lock | None = None
session_lock_acquired = False
did_emit_completed = False
note_seq = 0
state = ClaudeStreamState()
expected_session = resume_token
found_session: ResumeToken | None = None
def stdin_payload(
self,
prompt: str,
resume: ResumeToken | None,
*,
state: Any,
) -> bytes | None:
_ = prompt, resume, state
return None
def next_note_id() -> str:
nonlocal note_seq
note_seq += 1
return f"claude.note.{note_seq}"
try:
env: dict[str, str] | None = None
def env(self, *, state: Any) -> dict[str, str] | None:
_ = state
if self.use_api_billing is not True:
env = dict(os.environ)
env.pop("ANTHROPIC_API_KEY", None)
async with manage_subprocess(
*args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
) as proc:
if proc.stdout is None or proc.stderr is None:
raise RuntimeError("claude failed to open subprocess pipes")
proc_stdout = proc.stdout
proc_stderr = proc.stderr
if proc.stdin is not None:
await proc.stdin.aclose()
return env
return None
stderr_chunks: deque[str] = deque(maxlen=STDERR_TAIL_LINES)
rc: int | None = None
def new_state(self, prompt: str, resume: ResumeToken | None) -> ClaudeStreamState:
_ = prompt, resume
return ClaudeStreamState()
async with anyio.create_task_group() as tg:
tg.start_soon(
drain_stderr,
proc_stderr,
stderr_chunks,
logger,
"claude",
def start_run(
self,
prompt: str,
resume: ResumeToken | None,
*,
state: ClaudeStreamState,
) -> None:
_ = state
logger.info(
"[claude] start run resume=%r",
resume.value if resume else None,
)
async for json_line in iter_jsonl(
proc_stdout, logger=logger, tag="claude"
):
if did_emit_completed:
continue
if json_line.data is None:
yield _note_completed(
next_note_id(),
"invalid JSON from claude; ignoring line",
ok=False,
detail={"line": json_line.raw},
)
continue
evt = json_line.data
logger.debug("[claude] prompt: %s", prompt)
for out_evt in translate_claude_event(
evt,
def invalid_json_events(
self,
*,
raw: str,
line: str,
state: ClaudeStreamState,
) -> list[TakopiEvent]:
_ = line
message = "invalid JSON from claude; ignoring line"
return [self.note_event(message, state=state, detail={"line": raw})]
def translate(
self,
data: dict[str, Any],
*,
state: ClaudeStreamState,
resume: ResumeToken | None,
found_session: ResumeToken | None,
) -> list[TakopiEvent]:
_ = resume, found_session
return translate_claude_event(
data,
title=self.session_title,
state=state,
):
if isinstance(out_evt, StartedEvent):
session = out_evt.resume
if session.engine != ENGINE:
raise RuntimeError(
"claude emitted session token for wrong engine"
)
if (
expected_session is not None
and session != expected_session
):
raise RuntimeError(
"claude emitted a different session id than expected"
)
if expected_session is None:
session_lock = self._lock_for(session)
await session_lock.acquire()
session_lock_acquired = True
found_session = session
yield out_evt
continue
yield out_evt
if isinstance(out_evt, CompletedEvent):
did_emit_completed = True
break
rc = await proc.wait()
logger.debug("[claude] process exit pid=%s rc=%s", proc.pid, rc)
if did_emit_completed:
return
if rc != 0:
stderr_text = "".join(stderr_chunks)
def process_error_events(
self,
rc: int,
*,
resume: ResumeToken | None,
found_session: ResumeToken | None,
stderr_tail: str,
state: ClaudeStreamState,
) -> list[TakopiEvent]:
message = f"claude failed (rc={rc})."
yield _note_completed(
next_note_id(),
resume_for_completed = found_session or resume
return [
self.note_event(
message,
state=state,
ok=False,
detail={"stderr_tail": stderr_text},
)
resume_for_completed = found_session or resume_token
yield CompletedEvent(
detail={"stderr_tail": stderr_tail},
),
CompletedEvent(
engine=ENGINE,
ok=False,
answer="",
resume=resume_for_completed,
error=message,
)
return
),
]
def stream_end_events(
self,
*,
resume: ResumeToken | None,
found_session: ResumeToken | None,
stderr_tail: str,
state: ClaudeStreamState,
) -> list[TakopiEvent]:
_ = stderr_tail
if not found_session:
message = "claude finished but no session_id was captured"
resume_for_completed = resume_token
yield CompletedEvent(
resume_for_completed = resume
return [
CompletedEvent(
engine=ENGINE,
ok=False,
answer="",
resume=resume_for_completed,
error=message,
)
return
]
message = "claude finished without a result event"
yield CompletedEvent(
return [
CompletedEvent(
engine=ENGINE,
ok=False,
answer=state.last_assistant_text or "",
resume=found_session,
error=message,
)
finally:
if session_lock is not None and session_lock_acquired:
session_lock.release()
]
def build_runner(config: EngineConfig, _config_path: Path) -> Runner:
claude_cmd = "claude"
model = config.get("model")
allowed_tools = config.get("allowed_tools")
dangerously_skip_permissions = config.get("dangerously_skip_permissions") is True
use_api_billing = config.get("use_api_billing") is True
title = str(model) if model is not None else "claude"
return ClaudeRunner(
claude_cmd=claude_cmd,
model=model,
allowed_tools=allowed_tools,
dangerously_skip_permissions=dangerously_skip_permissions,
use_api_billing=use_api_billing,
session_title=title,
)
BACKEND = EngineBackend(
id="claude",
build_runner=build_runner,
install_cmd="npm install -g @anthropic-ai/claude-code",
)
+182 -204
View File
@@ -1,14 +1,13 @@
from __future__ import annotations
import logging
import subprocess
from collections import deque
from collections.abc import AsyncIterator
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Any, cast
from weakref import WeakValueDictionary
import anyio
from ..backends import EngineBackend, EngineConfig
from ..config import ConfigError
from ..model import (
Action,
ActionEvent,
@@ -21,15 +20,8 @@ from ..model import (
StartedEvent,
TakopiEvent,
)
from ..runner import (
ResumeTokenMixin,
Runner,
SessionLockMixin,
compile_resume_pattern,
)
from ..runner import JsonlSubprocessRunner, ResumeTokenMixin, Runner
from ..utils.paths import relativize_command
from ..utils.streams import drain_stderr, iter_jsonl
from ..utils.subprocess import manage_subprocess
logger = logging.getLogger(__name__)
@@ -46,7 +38,7 @@ _ACTION_KIND_MAP: dict[str, ActionKind] = {
"todo_list": "note",
}
_RESUME_RE = compile_resume_pattern(ENGINE)
_RESUME_RE = re.compile(r"(?im)^\s*`?codex\s+resume\s+(?P<token>[^`\s]+)`?\s*$")
def _started_event(token: ResumeToken, *, title: str) -> StartedEvent:
@@ -98,25 +90,6 @@ def _action_event(
)
def _note_completed(
action_id: str,
message: str,
*,
ok: bool = False,
detail: dict[str, Any] | None = None,
) -> TakopiEvent:
return _action_event(
phase="completed",
action_id=action_id,
kind="warning",
title=message,
detail=detail,
ok=ok,
message=message,
level="warning" if not ok else "info",
)
def _short_tool_name(item: dict[str, Any]) -> str:
name = ".".join(part for part in (item.get("server"), item.get("tool")) if part)
return name or "tool"
@@ -408,9 +381,18 @@ def translate_codex_event(event: dict[str, Any], *, title: str) -> list[TakopiEv
return []
class CodexRunner(SessionLockMixin, ResumeTokenMixin, Runner):
@dataclass
class CodexRunState:
note_seq: int = 0
final_answer: str | None = None
turn_index: int = 0
class CodexRunner(ResumeTokenMixin, JsonlSubprocessRunner):
engine: EngineId = ENGINE
resume_re = _RESUME_RE
stderr_tail_lines = STDERR_TAIL_LINES
logger = logger
def __init__(
self,
@@ -422,241 +404,237 @@ class CodexRunner(SessionLockMixin, ResumeTokenMixin, Runner):
self.codex_cmd = codex_cmd
self.extra_args = extra_args
self.session_title = title
self._session_locks: WeakValueDictionary[str, anyio.Lock] = (
WeakValueDictionary()
)
async def run(
self, prompt: str, resume: ResumeToken | None
) -> AsyncIterator[TakopiEvent]:
async for evt in self._run_with_resume_lock(prompt, resume, self._run):
yield evt
def command(self) -> str:
return self.codex_cmd
async def _run( # noqa: C901
def build_args(
self,
prompt: str,
resume_token: ResumeToken | None,
) -> AsyncIterator[TakopiEvent]:
logger.info(
"[codex] start run resume=%r", resume_token.value if resume_token else None
)
logger.debug("[codex] prompt: %s", prompt)
args = [self.codex_cmd]
args.extend(self.extra_args)
args.extend(["exec", "--json"])
if resume_token:
args.extend(["resume", resume_token.value, "-"])
resume: ResumeToken | None,
*,
state: Any,
) -> list[str]:
_ = prompt, state
args = [*self.extra_args, "exec", "--json"]
if resume:
args.extend(["resume", resume.value, "-"])
else:
args.append("-")
session_lock: anyio.Lock | None = None
session_lock_acquired = False
return args
try:
async with manage_subprocess(
*args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
) as proc:
if proc.stdin is None or proc.stdout is None or proc.stderr is None:
raise RuntimeError("codex exec failed to open subprocess pipes")
proc_stdin = proc.stdin
proc_stdout = proc.stdout
proc_stderr = proc.stderr
logger.debug("[codex] spawn pid=%s args=%r", proc.pid, args)
def new_state(self, prompt: str, resume: ResumeToken | None) -> CodexRunState:
_ = prompt, resume
return CodexRunState()
stderr_chunks: deque[str] = deque(maxlen=STDERR_TAIL_LINES)
rc: int | None = None
def start_run(
self,
prompt: str,
resume: ResumeToken | None,
*,
state: CodexRunState,
) -> None:
_ = state
logger.info("[codex] start run resume=%r", resume.value if resume else None)
logger.debug("[codex] prompt: %s", prompt)
expected_session: ResumeToken | None = resume_token
found_session: ResumeToken | None = None
final_answer: str | None = None
note_seq = 0
did_emit_completed = False
turn_index = 0
def pipes_error_message(self) -> str:
return "codex exec failed to open subprocess pipes"
def next_note_id() -> str:
nonlocal note_seq
note_seq += 1
return f"codex.note.{note_seq}"
async with anyio.create_task_group() as tg:
tg.start_soon(
drain_stderr,
proc_stderr,
stderr_chunks,
logger,
"codex",
def handle_started_event(
self,
event: StartedEvent,
*,
expected_session: ResumeToken | None,
found_session: ResumeToken | None,
) -> tuple[ResumeToken | None, bool]:
if event.engine != ENGINE:
raise RuntimeError(
f"codex emitted session token for engine {event.engine!r}"
)
await proc_stdin.send(prompt.encode())
await proc_stdin.aclose()
if expected_session is not None and event.resume != expected_session:
message = "codex emitted a different session id than expected"
raise RuntimeError(message)
if found_session is None:
return event.resume, True
if event.resume != found_session:
message = "codex emitted a different session id than expected"
raise RuntimeError(message)
return found_session, False
async for json_line in iter_jsonl(
proc_stdout, logger=logger, tag="codex"
):
if did_emit_completed:
continue
if json_line.data is None:
note = _note_completed(
next_note_id(),
"invalid JSON from codex; ignoring line",
ok=False,
detail={"line": json_line.line},
)
yield note
continue
evt = json_line.data
etype = evt.get("type")
def translate(
self,
data: dict[str, Any],
*,
state: CodexRunState,
resume: ResumeToken | None,
found_session: ResumeToken | None,
) -> list[TakopiEvent]:
etype = data.get("type")
if etype == "error":
message = str(evt.get("message") or "codex error")
fatal_flag = evt.get("fatal")
message = str(data.get("message") or "codex error")
fatal_flag = data.get("fatal")
fatal = fatal_flag is True or fatal_flag is None
if fatal:
resume_for_completed = found_session or resume_token
yield _completed_event(
resume_for_completed = found_session or resume
return [
_completed_event(
resume=resume_for_completed,
ok=False,
answer=final_answer or "",
answer=state.final_answer or "",
error=message,
)
did_emit_completed = True
continue
note = _note_completed(
next_note_id(),
]
return [
self.note_event(
message,
state=state,
ok=False,
detail={
"code": evt.get("code"),
"fatal": evt.get("fatal"),
},
detail={"code": data.get("code"), "fatal": data.get("fatal")},
)
yield note
continue
]
if etype == "turn.failed":
error = evt.get("error") or {}
error = data.get("error") or {}
message = str(error.get("message") or "codex turn failed")
resume_for_completed = found_session or resume_token
yield _completed_event(
resume_for_completed = found_session or resume
return [
_completed_event(
resume=resume_for_completed,
ok=False,
answer=final_answer or "",
answer=state.final_answer or "",
error=message,
)
did_emit_completed = True
continue
]
if etype == "turn.rate_limited":
retry_ms = evt.get("retry_after_ms")
retry_ms = data.get("retry_after_ms")
message = "rate limited"
if isinstance(retry_ms, int):
message = f"rate limited (retry after {retry_ms}ms)"
note = _note_completed(next_note_id(), message, ok=False)
yield note
continue
return [self.note_event(message, state=state, ok=False)]
if etype == "turn.started":
action_id = f"turn_{turn_index}"
turn_index += 1
yield _action_event(
action_id = f"turn_{state.turn_index}"
state.turn_index += 1
return [
_action_event(
phase="started",
action_id=action_id,
kind="turn",
title="turn started",
)
continue
]
if etype == "turn.completed":
resume_for_completed = found_session or resume_token
yield _completed_event(
resume_for_completed = found_session or resume
return [
_completed_event(
resume=resume_for_completed,
ok=True,
answer=final_answer or "",
usage=evt.get("usage"),
answer=state.final_answer or "",
usage=data.get("usage"),
)
did_emit_completed = True
continue
]
if evt.get("type") == "item.completed":
item = evt.get("item") or {}
if data.get("type") == "item.completed":
item = data.get("item") or {}
item_type = item.get("type") or item.get("item_type")
if item_type == "assistant_message":
item_type = "agent_message"
if item_type == "agent_message" and isinstance(
item.get("text"), str
):
if final_answer is None:
final_answer = item["text"]
if item_type == "agent_message" and isinstance(item.get("text"), str):
if state.final_answer is None:
state.final_answer = item["text"]
else:
logger.debug(
"[codex] emitted multiple agent messages; using the last one"
)
final_answer = item["text"]
state.final_answer = item["text"]
for out_evt in translate_codex_event(
evt, title=self.session_title
):
if isinstance(out_evt, StartedEvent):
session = out_evt.resume
if found_session is None:
if session.engine != ENGINE:
raise RuntimeError(
f"codex emitted session token for engine {session.engine!r}"
)
if (
expected_session is not None
and session != expected_session
):
message = "codex emitted a different session id than expected"
raise RuntimeError(message)
if expected_session is None:
session_lock = self._lock_for(session)
await session_lock.acquire()
session_lock_acquired = True
found_session = session
yield out_evt
continue
yield out_evt
rc = await proc.wait()
return translate_codex_event(data, title=self.session_title)
logger.debug("[codex] process exit pid=%s rc=%s", proc.pid, rc)
if did_emit_completed:
return
if rc != 0:
stderr_text = "".join(stderr_chunks)
def process_error_events(
self,
rc: int,
*,
resume: ResumeToken | None,
found_session: ResumeToken | None,
stderr_tail: str,
state: CodexRunState,
) -> list[TakopiEvent]:
message = f"codex exec failed (rc={rc})."
yield _note_completed(
next_note_id(),
resume_for_completed = found_session or resume
return [
self.note_event(
message,
state=state,
ok=False,
detail={"stderr_tail": stderr_text},
)
resume_for_completed = found_session or resume_token
yield _completed_event(
detail={"stderr_tail": stderr_tail},
),
_completed_event(
resume=resume_for_completed,
ok=False,
answer=final_answer or "",
answer=state.final_answer or "",
error=message,
)
return
),
]
def stream_end_events(
self,
*,
resume: ResumeToken | None,
found_session: ResumeToken | None,
stderr_tail: str,
state: CodexRunState,
) -> list[TakopiEvent]:
_ = stderr_tail
if not found_session:
message = (
"codex exec finished but no session_id/thread_id was captured"
)
resume_for_completed = resume_token
yield _completed_event(
message = "codex exec finished but no session_id/thread_id was captured"
resume_for_completed = resume
return [
_completed_event(
resume=resume_for_completed,
ok=False,
answer=final_answer or "",
answer=state.final_answer or "",
error=message,
)
return
]
logger.info("[codex] done run session=%s", found_session.value)
yield _completed_event(
return [
_completed_event(
resume=found_session,
ok=True,
answer=final_answer or "",
answer=state.final_answer or "",
)
]
def build_runner(config: EngineConfig, config_path: Path) -> Runner:
codex_cmd = "codex"
extra_args_value = config.get("extra_args")
if extra_args_value is None:
extra_args = ["-c", "notify=[]"]
elif isinstance(extra_args_value, list) and all(
isinstance(item, str) for item in extra_args_value
):
extra_args = list(extra_args_value)
else:
raise ConfigError(
f"Invalid `codex.extra_args` in {config_path}; expected a list of strings."
)
title = "Codex"
profile_value = config.get("profile")
if profile_value:
if not isinstance(profile_value, str):
raise ConfigError(
f"Invalid `codex.profile` in {config_path}; expected a string."
)
extra_args.extend(["--profile", profile_value])
title = profile_value
return CodexRunner(codex_cmd=codex_cmd, extra_args=extra_args, title=title)
BACKEND = EngineBackend(
id="codex",
build_runner=build_runner,
install_cmd="npm install -g @openai/codex",
)
finally:
if session_lock is not None and session_lock_acquired:
session_lock.release()
+7 -7
View File
@@ -1,10 +1,10 @@
from __future__ import annotations
import re
import uuid
from collections.abc import AsyncIterator, Awaitable, Callable, Iterable
from dataclasses import dataclass, replace
from typing import TypeAlias
from weakref import WeakValueDictionary
import anyio
@@ -16,7 +16,7 @@ from ..model import (
StartedEvent,
TakopiEvent,
)
from ..runner import ResumeTokenMixin, Runner, SessionLockMixin, compile_resume_pattern
from ..runner import ResumeTokenMixin, Runner, SessionLockMixin
ENGINE: EngineId = EngineId("mock")
@@ -76,10 +76,10 @@ class MockRunner(SessionLockMixin, ResumeTokenMixin, Runner):
self._answer = answer
self._resume_value = resume_value
self.title = title or str(engine).title()
self._session_locks: WeakValueDictionary[str, anyio.Lock] = (
WeakValueDictionary()
engine_name = re.escape(str(engine))
self.resume_re = re.compile(
rf"(?im)^\s*`?{engine_name}\s+resume\s+(?P<token>[^`\s]+)`?\s*$"
)
self.resume_re = compile_resume_pattern(engine)
async def run(
self, prompt: str, resume: ResumeToken | None
@@ -100,7 +100,7 @@ class MockRunner(SessionLockMixin, ResumeTokenMixin, Runner):
resume=token,
title=self.title,
)
lock = self._lock_for(token)
lock = self.lock_for(token)
async with lock:
yield session_evt
@@ -174,7 +174,7 @@ class ScriptRunner(MockRunner):
resume=token,
title=self.title,
)
lock = self._lock_for(token)
lock = self.lock_for(token)
async with lock:
if self._emit_session_start:
+1 -1
View File
@@ -149,7 +149,7 @@ async def test_run_serializes_same_session() -> None:
finally:
in_flight -= 1
runner._run = run_stub # type: ignore[assignment]
runner.run_impl = run_stub # type: ignore[assignment]
async def drain(prompt: str, resume: ResumeToken | None) -> None:
async for _event in runner.run(prompt, resume):
+39
View File
@@ -0,0 +1,39 @@
from typing import cast
import click
import typer
from takopi import cli, engines
def test_engine_discovery_skips_non_backend() -> None:
ids = engines.list_backend_ids()
assert "codex" in ids
assert "claude" in ids
assert "mock" not in ids
def test_cli_registers_engine_commands_sorted() -> None:
command_names = [cmd.name for cmd in cli.app.registered_commands]
engine_ids = engines.list_backend_ids()
assert set(engine_ids) <= set(command_names)
engine_commands = [name for name in command_names if name in engine_ids]
assert engine_commands == engine_ids
def test_engine_commands_do_not_expose_engine_id_option() -> None:
group = cast(click.Group, typer.main.get_command(cli.app))
engine_ids = engines.list_backend_ids()
ctx = group.make_context("takopi", [])
for engine_id in engine_ids:
command = group.get_command(ctx, engine_id)
assert command is not None
options: set[str] = set()
for param in command.params:
options.update(getattr(param, "opts", []))
options.update(getattr(param, "secondary_opts", []))
assert "--final-notify" in options
assert "--debug" in options
assert not any(opt.lstrip("-") == "engine-id" for opt in options)
+3 -3
View File
@@ -39,7 +39,7 @@ async def test_run_serializes_same_session() -> None:
finally:
in_flight -= 1
runner._run = run_stub # type: ignore[assignment]
runner.run_impl = run_stub # type: ignore[assignment]
async def drain(prompt: str, resume: ResumeToken | None) -> None:
async for _event in runner.run(prompt, resume):
@@ -76,7 +76,7 @@ async def test_run_allows_parallel_new_sessions() -> None:
finally:
in_flight -= 1
runner._run = run_stub # type: ignore[assignment]
runner.run_impl = run_stub # type: ignore[assignment]
async def drain(prompt: str, resume: ResumeToken | None) -> None:
async for _event in runner.run(prompt, resume):
@@ -112,7 +112,7 @@ async def test_run_allows_parallel_different_sessions() -> None:
finally:
in_flight -= 1
runner._run = run_stub # type: ignore[assignment]
runner.run_impl = run_stub # type: ignore[assignment]
async def drain(prompt: str, resume: ResumeToken | None) -> None:
async for _event in runner.run(prompt, resume):
+7 -7
View File
@@ -7,7 +7,7 @@ from takopi import engines, onboarding
def test_check_setup_marks_missing_codex(monkeypatch, tmp_path: Path) -> None:
backend = engines.get_backend("codex")
monkeypatch.setattr(engines.shutil, "which", lambda _name: None)
monkeypatch.setattr(onboarding.shutil, "which", lambda _name: None)
monkeypatch.setattr(
onboarding,
"load_telegram_config",
@@ -17,14 +17,14 @@ def test_check_setup_marks_missing_codex(monkeypatch, tmp_path: Path) -> None:
result = onboarding.check_setup(backend)
titles = {issue.title for issue in result.issues}
assert "Install the Codex CLI" in titles
assert "Create a config" not in titles
assert "install codex" in titles
assert "create a config" not in titles
assert result.ok is False
def test_check_setup_marks_missing_config(monkeypatch) -> None:
backend = engines.get_backend("codex")
monkeypatch.setattr(engines.shutil, "which", lambda _name: "/usr/bin/codex")
monkeypatch.setattr(onboarding.shutil, "which", lambda _name: "/usr/bin/codex")
def _raise() -> None:
raise onboarding.ConfigError("Missing config file")
@@ -34,13 +34,13 @@ def test_check_setup_marks_missing_config(monkeypatch) -> None:
result = onboarding.check_setup(backend)
titles = {issue.title for issue in result.issues}
assert "Create a config" in titles
assert "create a config" in titles
assert result.config_path == onboarding.HOME_CONFIG_PATH
def test_check_setup_marks_invalid_chat_id(monkeypatch, tmp_path: Path) -> None:
backend = engines.get_backend("codex")
monkeypatch.setattr(engines.shutil, "which", lambda _name: "/usr/bin/codex")
monkeypatch.setattr(onboarding.shutil, "which", lambda _name: "/usr/bin/codex")
monkeypatch.setattr(
onboarding,
"load_telegram_config",
@@ -50,4 +50,4 @@ def test_check_setup_marks_invalid_chat_id(monkeypatch, tmp_path: Path) -> None:
result = onboarding.check_setup(backend)
titles = {issue.title for issue in result.issues}
assert "Create a config" in titles
assert "create a config" in titles