From 6bb847c2cdc651159ee3ae657844f0ae210a4030 Mon Sep 17 00:00:00 2001 From: banteg <4562643+banteg@users.noreply.github.com> Date: Thu, 1 Jan 2026 20:36:44 +0400 Subject: [PATCH] docs: rewrite adding a runner --- docs/adding-a-runner.md | 573 +++++++++++++++++++++++++++++++++------- 1 file changed, 476 insertions(+), 97 deletions(-) diff --git a/docs/adding-a-runner.md b/docs/adding-a-runner.md index 6ff87c5..9f940da 100644 --- a/docs/adding-a-runner.md +++ b/docs/adding-a-runner.md @@ -1,62 +1,349 @@ # Adding a Runner -This guide walks through adding a new engine to Takopi without changing the -domain model. Use the existing runners (Codex/Claude) as references. +This guide explains how to add a **new engine runner** to Takopi. -## Quick checklist +A *runner* is the adapter between an engine-specific CLI (Codex, Claude Code, …) and Takopi’s +**normalized event model** (`StartedEvent`, `ActionEvent`, `CompletedEvent`). -1. Implement `Runner` in `src/takopi/runners/.py` (usually via - `JsonlSubprocessRunner`). -2. Emit Takopi events from `takopi.model` and implement resume helpers - (`format_resume`, `extract_resume`, `is_resume_line`). -3. Define `BACKEND = EngineBackend(...)` in the runner module (auto-discovered), - including `install_cmd` (and `cli_cmd` only if the binary name differs). -4. Extend tests (runner contract + engine-specific translation tests). +Takopi is designed so that adding a runner usually means **adding one new module** under +`src/takopi/runners/`—no changes to the bridge, renderer, or CLI. + +The walkthrough below uses an **imaginary engine** named **Pi** (`pi`) and intentionally mirrors +the patterns used in `runners/claude.py`. --- -## Example: adding a `pi` engine +## What “done” looks like -This is a concrete walkthrough for an imaginary CLI called `pi`. The goal is to -make it easy to drop in another engine without changing the Takopi domain model. 
+After you add a runner, you should be able to: -### 1) Decide engine identity + resume format +- Run `takopi pi` (CLI subcommand is auto-registered). +- Start a new session and get a resume line like `` `pi --resume ` ``. +- Reply to any bot message containing that resume line and continue the same session. +- See progress updates (optional) and always get a final completion event. -- Engine id: `"pi"` (used in config, resume tokens, and CLI subcommand). -- Canonical resume line: the engine’s own CLI resume command, e.g. - `` `pi --resume ` ``. -- Pick the resume line format you want to support and define a regex for it in - the runner (Claude is a good example). If you choose the - `" resume "` shape, you can use that exact regex. +--- -### 2) Implement `src/takopi/runners/pi.py` +## Mental model -Recommended: `JsonlSubprocessRunner` +### 1) Takopi owns the domain model -For JSONL CLIs, this base class centralizes subprocess + JSONL plumbing, -lock timing, and completion semantics. Your runner usually only needs: +Takopi’s core types live in `takopi.model`: -- `command()` (binary name) -- `build_args(...)` -- `translate(...)` (map one JSON object to a list of Takopi events) +- `ResumeToken(engine, value)` +- `StartedEvent(engine, resume, title?, meta?)` +- `ActionEvent(engine, action, phase, ok?, message?, level?)` +- `CompletedEvent(engine, ok, answer, resume?, error?, usage?)` -Optional hooks for common variants: +Runners **must not** invent new event types. They translate engine output into these. 
-- `stdin_payload(...)`: return `None` if the prompt is passed via argv -- `env(...)`: add or redact environment variables -- `invalid_json_events(...)`: customize the warning event -- `process_error_events(...)`: customize `rc != 0` handling -- `stream_end_events(...)`: customize stream-end fallback (no `CompletedEvent`) -- `handle_started_event(...)`: customize session-id validation +### 2) The runner contract (invariants) -If you call `note_event(...)`, your state object must include `note_seq` or -override `next_note_id(...)`. +A run must produce events with these invariants (see `tests/test_runner_contract.py`): -Skeleton outline (JSONL CLI): +- Exactly **one** `StartedEvent`. +- Exactly **one** `CompletedEvent`. +- `CompletedEvent` is the **last** event. +- `CompletedEvent.resume == StartedEvent.resume` (same token). + +Action events are optional (minimal runner mode): + +- Minimum viable runner: `StartedEvent` → `CompletedEvent`. +- You may add `ActionEvent`s later (recommended for better progress UX). + +### 3) Resume lines are runner-owned + +Takopi deliberately treats the runner as the authority for: + +- How a resume line looks in chat (`format_resume()`) +- How to parse a resume token out of text (`extract_resume()`) +- How to detect a resume line reliably (`is_resume_line()`) + +This matters because Takopi’s Telegram truncation logic preserves resume lines. + +--- + +## Step-by-step: add the imaginary `pi` runner + +### Step 1 — Pick an engine id + resume command + +Choose a stable engine id string. This string becomes: + +- The config table name (`[pi]` in `takopi.toml`) +- The CLI subcommand (`takopi pi`) +- The `ResumeToken.engine` + +For Pi we’ll use: + +- Engine id: `"pi"` +- Canonical resume command embedded in chat: `` `pi --resume ` `` + +#### Write a resume regex + +Follow the pattern used by Claude/Codex: accept optional backticks, be case-insensitive, +match full line, and capture a group named `token`. 
```py -ENGINE: EngineId = "pi" -_RESUME_RE = re.compile(r"(?im)^\s*`?pi\s+--resume\s+(?P<token>[^`\\s]+)`?\\s*$") +_RESUME_RE = re.compile( + r"(?im)^\s*`?pi\s+--resume\s+(?P<token>[^`\s]+)`?\s*$" +) +``` + +Why this shape? + +- `(?m)` lets `^`/`$` match per-line inside multi-line messages. +- Optional backticks (`\`?`) let you match Telegram inline-code formatting. +- Capturing the **last** token in a message lets users paste multiple resume lines. + +--- + +### Step 2 — Create `src/takopi/runners/pi.py` + +Create a new module next to the existing runners: + +``` +src/takopi/runners/ + codex.py + claude.py + mock.py + pi.py # ← new +``` + +Takopi discovers engines by importing modules in `takopi.runners` and looking for a +module-level `BACKEND: EngineBackend` (see `takopi.engines`). + +--- + +### Step 3 — Translate Pi JSONL into Takopi events + +Most CLIs we integrate are JSONL-streaming processes. + +Takopi provides `JsonlSubprocessRunner`, which: + +- spawns the CLI +- drains stderr into a bounded tail +- reads stdout line-by-line as JSONL +- calls your `translate(...)` method to convert each JSON object into Takopi events +- guarantees “exactly one CompletedEvent” behavior +- provides safe fallbacks for rc != 0 or stream ending without a completion event + +#### Define a state object + +Copy the Claude pattern: create a small dataclass to hold streaming state. 
+ +Common things to track: + +- `pending_actions`: map tool_use_id → `Action` so tool results can complete them +- `last_assistant_text`: fallback for final answer if the engine omits it +- `note_seq`: counter used by `JsonlSubprocessRunner.note_event(...)` + +```py +from dataclasses import dataclass, field + +@dataclass +class PiStreamState: + pending_actions: dict[str, Action] = field(default_factory=dict) + last_assistant_text: str | None = None + note_seq: int = 0 +``` + +#### Decide what Pi emits + +For this guide, assume Pi outputs events like: + +```json +{"type":"session.start","session_id":"pi_01","model":"pi-large"} +{"type":"tool.use","id":"toolu_1","name":"Bash","input":{"command":"ls"}} +{"type":"tool.result","tool_use_id":"toolu_1","content":"ok","is_error":false} +{"type":"final","session_id":"pi_01","ok":true,"answer":"Done."} +``` + +#### Map them to Takopi events + +Use this mapping (mirrors Claude’s approach): + +- `session.start` → `StartedEvent(engine="pi", resume=ResumeToken("pi", session_id))` +- `tool.use` → `ActionEvent(phase="started")` and stash action in `pending_actions` +- `tool.result` → `ActionEvent(phase="completed", ok=...)` and pop from `pending_actions` +- `final` → `CompletedEvent(ok, answer, resume)` + +**Important:** emit exactly one `CompletedEvent`. + +#### Make the translator a pure function + +Claude keeps translation logic in a standalone function (`translate_claude_event(...)`). +This makes it easy to unit test without spawning a subprocess. 
+ +Do the same for Pi: + +```py +def translate_pi_event( + event: dict[str, Any], + *, + title: str, + state: PiStreamState, +) -> list[TakopiEvent]: + etype = event.get("type") + + if etype == "session.start": + session_id = event.get("session_id") + if not session_id: + return [] + model = event.get("model") + event_title = str(model) if model else title + return [ + StartedEvent( + engine=ENGINE, + resume=ResumeToken(engine=ENGINE, value=str(session_id)), + title=event_title, + ) + ] + + if etype == "tool.use": + tool_id = event.get("id") + if not isinstance(tool_id, str) or not tool_id: + return [] + name = str(event.get("name") or "tool") + tool_input = event.get("input") + if not isinstance(tool_input, dict): + tool_input = {} + + # Keep titles short and friendly. + # (Claude uses takopi.utils.paths.relativize_command / relativize_path) + kind: ActionKind = "tool" + title = name + if name in {"Bash", "Shell"}: + kind = "command" + title = relativize_command(str(tool_input.get("command") or name)) + + action = Action( + id=tool_id, + kind=kind, + title=title, + detail={"name": name, "input": tool_input}, + ) + state.pending_actions[action.id] = action + return [ActionEvent(engine=ENGINE, action=action, phase="started")] + + if etype == "tool.result": + tool_use_id = event.get("tool_use_id") + if not isinstance(tool_use_id, str) or not tool_use_id: + return [] + action = state.pending_actions.pop(tool_use_id, None) + if action is None: + action = Action( + id=tool_use_id, + kind="tool", + title="tool result", + detail={}, + ) + + is_error = event.get("is_error") is True + content = event.get("content") + result_text = "" if content is None else (content if isinstance(content, str) else str(content)) + + detail = dict(action.detail) + detail.update({"result_preview": result_text, "is_error": is_error}) + + return [ + ActionEvent( + engine=ENGINE, + action=Action( + id=action.id, + kind=action.kind, + title=action.title, + detail=detail, + ), + phase="completed", 
+ ok=not is_error, + ) + ] + + if etype == "final": + ok = event.get("ok") is True + answer = event.get("answer") + if not isinstance(answer, str): + answer = "" + if ok and not answer and state.last_assistant_text: + answer = state.last_assistant_text + + session_id = event.get("session_id") + resume = ( + ResumeToken(engine=ENGINE, value=str(session_id)) if session_id else None + ) + + error = None + if not ok: + err = event.get("error") + error = str(err) if err else "pi run failed" + + return [ + CompletedEvent( + engine=ENGINE, + ok=ok, + answer=answer, + resume=resume, + error=error, + ) + ] + + return [] +``` + +This is intentionally close to Claude’s structure: + +- Parse `type` +- Handle “init/session start” first +- Emit action-start and action-complete events +- Emit a final `CompletedEvent` + +--- + +### Step 4 — Implement the `PiRunner` class + +Most engines can implement a runner by combining: + +- `ResumeTokenMixin` (resume parsing + resume-line detection) +- `JsonlSubprocessRunner` (process + JSONL streaming + completion semantics) + +#### Why this combo? + +It matches Claude/Codex: + +- Runner owns resume format/regex. +- Base class owns locking and subprocess lifecycle. +- Translation stays in a pure function and is easily testable. 
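To illustrate why a pure translator is easy to test, here is a minimal sketch that feeds the sample Pi stream from this guide through a simplified translator. It is not Takopi's real code: `SketchState`, `translate_sketch`, `run_fixture`, and the tuple-shaped events are hypothetical stand-ins so the example runs without importing `takopi`.

```python
from __future__ import annotations

import json
from dataclasses import dataclass, field


@dataclass
class SketchState:
    # Hypothetical stand-in for PiStreamState: tool id -> tool name.
    pending: dict[str, str] = field(default_factory=dict)


def translate_sketch(event: dict, state: SketchState) -> list[tuple]:
    """Map one decoded JSON object to zero or more tuple-shaped events."""
    etype = event.get("type")
    if etype == "session.start":
        return [("started", event["session_id"])]
    if etype == "tool.use":
        state.pending[event["id"]] = event["name"]
        return [("action", "started", event["id"])]
    if etype == "tool.result":
        state.pending.pop(event["tool_use_id"], None)
        ok = not event.get("is_error", False)
        return [("action", "completed", event["tool_use_id"], ok)]
    if etype == "final":
        ok = event.get("ok") is True
        return [("completed", ok, event.get("answer") or "", event.get("session_id"))]
    return []  # unknown event types are ignored


FIXTURE = """\
{"type":"session.start","session_id":"pi_01","model":"pi-large"}
{"type":"tool.use","id":"toolu_1","name":"Bash","input":{"command":"ls"}}
{"type":"tool.result","tool_use_id":"toolu_1","content":"ok","is_error":false}
{"type":"final","session_id":"pi_01","ok":true,"answer":"Done."}
"""


def run_fixture(text: str) -> list[tuple]:
    """Translate every non-empty JSONL line, sharing one state object."""
    state = SketchState()
    events: list[tuple] = []
    for line in text.splitlines():
        if line.strip():
            events.extend(translate_sketch(json.loads(line), state))
    return events


events = run_fixture(FIXTURE)
```

Because no subprocess is involved, a test only needs a fixture string and plain assertions on the returned list. A real test would presumably assert on `StartedEvent`, `ActionEvent`, and `CompletedEvent` instances instead of tuples.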
+ +#### Minimal skeleton + +```py +from __future__ import annotations + +import logging +import re +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +from ..backends import EngineBackend, EngineConfig +from ..model import ( + CompletedEvent, + EngineId, + ResumeToken, + StartedEvent, + TakopiEvent, +) +from ..runner import JsonlSubprocessRunner, ResumeTokenMixin, Runner + +logger = logging.getLogger(__name__) + +ENGINE: EngineId = EngineId("pi") +STDERR_TAIL_LINES = 200 + +_RESUME_RE = re.compile( + r"(?im)^\s*`?pi\s+--resume\s+(?P<token>[^`\s]+)`?\s*$" +) + @dataclass class PiRunner(ResumeTokenMixin, JsonlSubprocessRunner): @@ -66,88 +353,106 @@ class PiRunner(ResumeTokenMixin, JsonlSubprocessRunner): pi_cmd: str = "pi" model: str | None = None allowed_tools: list[str] | None = None + session_title: str = "pi" + stderr_tail_lines = STDERR_TAIL_LINES + logger = logger + + def format_resume(self, token: ResumeToken) -> str: + # Override because our canonical resume command is "pi --resume ...". + if token.engine != ENGINE: + raise RuntimeError(f"resume token is for engine {token.engine!r}") + return f"`pi --resume {token.value}`" def command(self) -> str: return self.pi_cmd def build_args( - self, prompt: str, resume: ResumeToken | None, *, state: Any + self, + prompt: str, + resume: ResumeToken | None, + *, + state: Any, ) -> list[str]: _ = prompt, state - args = ["--jsonl", "--verbose"] + args = ["--output-format", "stream-json", "--verbose"] if resume is not None: args.extend(["--resume", resume.value]) if self.model is not None: - args.extend(["--model", self.model]) + args.extend(["--model", str(self.model)]) if self.allowed_tools: args.extend(["--allowed-tools", ",".join(self.allowed_tools)]) return args def stdin_payload( - self, prompt: str, resume: ResumeToken | None, *, state: Any + self, + prompt: str, + resume: ResumeToken | None, + *, + state: Any, ) -> bytes | None: _ = resume, state + # Pi reads the prompt from stdin. 
return prompt.encode() + def new_state(self, prompt: str, resume: ResumeToken | None) -> PiStreamState: + _ = prompt, resume + return PiStreamState() + def translate( self, data: dict[str, Any], *, - state: Any, + state: PiStreamState, resume: ResumeToken | None, found_session: ResumeToken | None, ) -> list[TakopiEvent]: - _ = state, resume, found_session - ... + _ = resume, found_session + return translate_pi_event(data, title=self.session_title, state=state) ``` -Key implementation notes: +Notes: -- Use `BaseRunner` (or `JsonlSubprocessRunner`) for per-session serialization. -- Mix in `ResumeTokenMixin` (with a `resume_re`) or override - `format_resume` / `extract_resume` / `is_resume_line` so the runner owns - resume encoding/decoding. -- For JSONL CLIs, prefer `JsonlSubprocessRunner` and implement `command`, - `build_args`, and `translate` (override `stdin_payload` if the prompt should - be passed via argv instead of stdin). -- If you don’t use `JsonlSubprocessRunner`, use `iter_jsonl(...)` + - `drain_stderr(...)` from `takopi.utils.streams`. -- **Minimal mode is supported:** start with exactly one `StartedEvent` and one - `CompletedEvent`. `ActionEvent`s are optional and can be added later. If you - do emit actions, you can emit only `phase="completed"` notes without tracking - pending state. -- **Do not truncate** tool outputs in the runner; pass full strings into events. - Truncation belongs in renderers. +- `JsonlSubprocessRunner` already enforces the “exactly one completed event” rule. +- When `resume=None`, Takopi will acquire a per-session lock after it sees the first + `StartedEvent`. This is why emitting `StartedEvent` early is important. 
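The "exactly one completed event" rule and the other contract invariants can be written as a small checker. This is a sketch under stated assumptions, not the code in `tests/test_runner_contract.py`: `Started`, `Action`, `Completed`, and `contract_violations` are hypothetical stand-ins for Takopi's event classes, reduced to the fields the invariants need.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Started:  # stand-in for StartedEvent
    resume: str


@dataclass(frozen=True)
class Action:  # stand-in for ActionEvent
    phase: str


@dataclass(frozen=True)
class Completed:  # stand-in for CompletedEvent
    ok: bool
    resume: str | None


def contract_violations(events: list[object]) -> list[str]:
    """Return a human-readable list of broken invariants (empty if none)."""
    problems: list[str] = []
    started = [e for e in events if isinstance(e, Started)]
    completed = [e for e in events if isinstance(e, Completed)]

    if len(started) != 1:
        problems.append(f"expected exactly one started event, got {len(started)}")
    if len(completed) != 1:
        problems.append(f"expected exactly one completed event, got {len(completed)}")
    if not events or not isinstance(events[-1], Completed):
        problems.append("completed event must be the last event")
    if len(started) == 1 and len(completed) == 1:
        if started[0].resume != completed[0].resume:
            problems.append("completed.resume differs from started.resume")
    return problems


good_run = [Started("pi_01"), Action("started"), Action("completed"), Completed(True, "pi_01")]
bad_run = [Started("pi_01"), Completed(True, "pi_02"), Action("completed")]
```

Here `contract_violations(good_run)` returns an empty list, while `bad_run` trips both the ordering check and the resume-token check. The minimal `Started` then `Completed` sequence also passes, which matches the "minimal runner mode" described earlier.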
-### 3) Map Pi JSONL → Takopi events +#### Optional but recommended overrides (Claude-inspired) -Example Pi lines (imaginary): +Depending on how robust you want the integration, consider adding: -```json -{"type":"session.start","session_id":"pi_01","model":"pi-large"} -{"type":"tool.use","id":"toolu_1","name":"Bash","input":{"command":"ls"}} -{"type":"tool.result","tool_use_id":"toolu_1","content":"ok","is_error":false} -{"type":"final","session_id":"pi_01","ok":true,"answer":"Done."} -``` +- `env(...)`: to strip or inject environment variables (Claude strips `ANTHROPIC_API_KEY` + unless configured to use API billing). +- `invalid_json_events(...)`: emit a helpful warning `ActionEvent` on malformed JSONL. +- `process_error_events(...)`: customize rc != 0 behavior (include stderr tail in detail). +- `stream_end_events(...)`: handle “process exited cleanly but never emitted a final event”. -Mapping guidance: +Claude uses these to produce better failures instead of silent hangs. -- `session.start` → `StartedEvent(engine="pi", resume=, title=)` -- `tool.use` → `ActionEvent(phase="started")` -- `tool.result` → `ActionEvent(phase="completed")` and **pop** pending actions -- `final` → `CompletedEvent(ok, answer, resume)` (emit **exactly one**) +--- -If Pi emits warnings/errors before the final event, surface them as completed -`ActionEvent`s (e.g., `kind="warning"`). +### Step 5 — Add `build_runner(...)` and `BACKEND` -### 4) Expose the backend (auto-discovered) +Takopi needs a way to build your runner from config. -Takopi discovers runners by importing modules in `takopi.runners` and looking -for a module-level `BACKEND: EngineBackend` (from `takopi.backends`). 
- -At the bottom of `src/takopi/runners/pi.py`, define: +Follow the pattern in `runners/claude.py`: ```py +def build_runner(config: EngineConfig, _config_path: Path) -> Runner: + pi_cmd = "pi" + + model = config.get("model") + allowed_tools = config.get("allowed_tools") + + title = str(model) if model is not None else "pi" + + return PiRunner( + pi_cmd=pi_cmd, + model=model, + allowed_tools=allowed_tools, + session_title=title, + ) + + BACKEND = EngineBackend( id="pi", build_runner=build_runner, @@ -155,24 +460,98 @@ BACKEND = EngineBackend( ) ``` -No changes to `engines.py` or `cli.py` are required. +That’s it for wiring. -Only modules that define `BACKEND` are treated as engines. Internal/testing -modules (like `mock.py`) should omit it. +Because engine backends are auto-discovered (`takopi.engines`), you do **not** need +to register the runner elsewhere. -If the CLI binary name differs from the engine id, set `cli_cmd="pi-cli"` on -the backend. +If the binary name differs from the engine id, set: -Example config (minimal): +- `EngineBackend(cli_cmd="pi-cli")` -```toml -[pi] -model = "pi-large" -allowed_tools = ["Bash", "Read"] -``` +so onboarding can find it on PATH. -### 5) Tests + fixtures +--- + +### Step 6 — Add tests (copy Claude’s testing strategy) + +A good runner PR usually contains 3 types of tests. + +#### 1) Resume parsing tests + +Copy `tests/test_claude_runner.py::test_claude_resume_format_and_extract`. + +For Pi, assert: + +- `format_resume(...)` outputs the canonical resume line. +- `extract_resume(...)` can parse it back out. +- It ignores other engines’ resume lines. + +#### 2) Translation unit tests (fixtures) + +Claude’s translation tests load JSONL fixtures and feed them into the pure translator. 
+ +Do the same: + +- `tests/fixtures/pi_stream_success.jsonl` +- `tests/fixtures/pi_stream_error.jsonl` + +Then assert: + +- first event is `StartedEvent` +- action events are correct (ids, kinds, titles) +- the last event is a `CompletedEvent` +- completed.resume matches started.resume + +#### 3) Lock/serialization tests (optional, but great) + +Claude has async tests proving that: + +- two runs with the same resume token serialize (`max_in_flight == 1`) +- a new session run locks correctly after it emits `StartedEvent` + +If your runner uses `JsonlSubprocessRunner`, you get most of this for free, but having +one targeted test catches regressions. + +--- + +## Common pitfalls (and how Claude avoided them) + +- **StartedEvent arrives too late** + - If you wait until the end to emit `StartedEvent`, Takopi can’t acquire the per-session lock + early and another task might resume the same session concurrently. + - Emit `StartedEvent` immediately when you learn the session id. + +- **Multiple completion events** + - Some CLIs emit multiple “final-ish” events. Decide which one becomes Takopi’s `CompletedEvent`. + - `JsonlSubprocessRunner` will stop reading after the first `CompletedEvent` it sees. + +- **Missing completion event** + - Claude handles “stream ended without a result event” by emitting a synthetic `CompletedEvent` + in `stream_end_events(...)`. + +- **Unhelpful error reporting** + - Include stderr tail in a warning action (Claude includes `stderr_tail` in `detail`). + +- **Resume line gets truncated** + - Ensure `is_resume_line()` matches your `format_resume()` output. Takopi tries to preserve + resume lines during truncation. + +- **Leaking secrets** + - If your engine can run in “subscription mode” without env keys, strip env vars like Claude + does with `ANTHROPIC_API_KEY`. + +--- + +## Final checklist + +Before you call the runner “done”: + +- [ ] `takopi pi` appears automatically (module exports `BACKEND`). 
+- [ ] `format_resume()` matches `extract_resume()` + `is_resume_line()`. +- [ ] Translation emits exactly one `StartedEvent` and one `CompletedEvent`. +- [ ] `CompletedEvent.resume` matches `StartedEvent.resume`. +- [ ] rc != 0 produces a failure `CompletedEvent` (via `process_error_events`). +- [ ] “no final event” produces a failure `CompletedEvent` (via `stream_end_events`). +- [ ] Tests cover resume parsing + at least one translation fixture. -- Add `tests/test_pi_runner.py` for translation behavior. -- Reuse `tests/test_runner_contract.py` to ensure lock/resume invariants. -- Add JSONL fixtures under `tests/fixtures/` for the Pi stream.
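The checklist item about `format_resume()` agreeing with `extract_resume()` and `is_resume_line()` can be exercised with the standard `re` module alone. The sketch below uses the regex shape from Step 1 with the `token` group the text describes. The three free functions are hypothetical stand-ins for the runner methods, and taking the last match is an assumption based on the guide's note about capturing the last token.

```python
from __future__ import annotations

import re

# Case-insensitive, multi-line, optional backticks, named group `token`.
RESUME_RE = re.compile(r"(?im)^\s*`?pi\s+--resume\s+(?P<token>[^`\s]+)`?\s*$")


def format_resume(token: str) -> str:
    """Render the canonical resume line shown in chat."""
    return f"`pi --resume {token}`"


def extract_resume(text: str) -> str | None:
    """Return the last resume token found in the text, or None."""
    found = None
    for match in RESUME_RE.finditer(text):
        found = match.group("token")
    return found


def is_resume_line(line: str) -> bool:
    """True when the whole line is a Pi resume command."""
    return RESUME_RE.match(line) is not None


message = "Done.\n`pi --resume pi_01`\nsome text\nPI --resume pi_02"
last_token = extract_resume(message)
round_trip = extract_resume(format_resume("pi_01"))
```

With this input, `last_token` is `"pi_02"` (the later of the two resume lines) and `round_trip` is `"pi_01"`. A line such as `` `claude --resume abc` `` yields `None`, which is the "ignores other engines' resume lines" behaviour the tests section asks for.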