docs: rewrite adding a runner

This commit is contained in:
banteg
2026-01-01 20:36:44 +04:00
parent d35752fc55
commit 6bb847c2cd
+476 -97
View File
@@ -1,62 +1,349 @@
# Adding a Runner
This guide walks through adding a new engine to Takopi without changing the
domain model. Use the existing runners (Codex/Claude) as references.
This guide explains how to add a **new engine runner** to Takopi.
## Quick checklist
A *runner* is the adapter between an engine-specific CLI (Codex, Claude Code, …) and Takopis
**normalized event model** (`StartedEvent`, `ActionEvent`, `CompletedEvent`).
1. Implement `Runner` in `src/takopi/runners/<engine>.py` (usually via
`JsonlSubprocessRunner`).
2. Emit Takopi events from `takopi.model` and implement resume helpers
(`format_resume`, `extract_resume`, `is_resume_line`).
3. Define `BACKEND = EngineBackend(...)` in the runner module (auto-discovered),
including `install_cmd` (and `cli_cmd` only if the binary name differs).
4. Extend tests (runner contract + engine-specific translation tests).
Takopi is designed so that adding a runner usually means **adding one new module** under
`src/takopi/runners/`—no changes to the bridge, renderer, or CLI.
The walkthrough below uses an **imaginary engine** named **Pi** (`pi`) and intentionally mirrors
the patterns used in `runners/claude.py`.
---
## Example: adding a `pi` engine
## What “done” looks like
This is a concrete walkthrough for an imaginary CLI called `pi`. The goal is to
make it easy to drop in another engine without changing the Takopi domain model.
After you add a runner, you should be able to:
### 1) Decide engine identity + resume format
- Run `takopi pi` (CLI subcommand is auto-registered).
- Start a new session and get a resume line like `` `pi --resume <token>` ``.
- Reply to any bot message containing that resume line and continue the same session.
- See progress updates (optional) and always get a final completion event.
- Engine id: `"pi"` (used in config, resume tokens, and CLI subcommand).
- Canonical resume line: the engines own CLI resume command, e.g.
`` `pi --resume <session_id>` ``.
- Pick the resume line format you want to support and define a regex for it in
the runner (Claude is a good example). If you choose the
`"<engine> resume <token>"` shape, you can use that exact regex.
---
### 2) Implement `src/takopi/runners/pi.py`
## Mental model
Recommended: `JsonlSubprocessRunner`
### 1) Takopi owns the domain model
For JSONL CLIs, this base class centralizes subprocess + JSONL plumbing,
lock timing, and completion semantics. Your runner usually only needs:
Takopis core types live in `takopi.model`:
- `command()` (binary name)
- `build_args(...)`
- `translate(...)` (map one JSON object to a list of Takopi events)
- `ResumeToken(engine, value)`
- `StartedEvent(engine, resume, title?, meta?)`
- `ActionEvent(engine, action, phase, ok?, message?, level?)`
- `CompletedEvent(engine, ok, answer, resume?, error?, usage?)`
Optional hooks for common variants:
Runners **must not** invent new event types. They translate engine output into these.
- `stdin_payload(...)`: return `None` if the prompt is passed via argv
- `env(...)`: add or redact environment variables
- `invalid_json_events(...)`: customize the warning event
- `process_error_events(...)`: customize `rc != 0` handling
- `stream_end_events(...)`: customize stream-end fallback (no `CompletedEvent`)
- `handle_started_event(...)`: customize session-id validation
### 2) The runner contract (invariants)
If you call `note_event(...)`, your state object must include `note_seq` or
override `next_note_id(...)`.
A run must produce events with these invariants (see `tests/test_runner_contract.py`):
Skeleton outline (JSONL CLI):
- Exactly **one** `StartedEvent`.
- Exactly **one** `CompletedEvent`.
- `CompletedEvent` is the **last** event.
- `CompletedEvent.resume == StartedEvent.resume` (same token).
Action events are optional (minimal runner mode):
- Minimum viable runner: `StartedEvent``CompletedEvent`.
- You may add `ActionEvent`s later (recommended for better progress UX).
### 3) Resume lines are runner-owned
Takopi deliberately treats the runner as the authority for:
- How a resume line looks in chat (`format_resume()`)
- How to parse a resume token out of text (`extract_resume()`)
- How to detect a resume line reliably (`is_resume_line()`)
This matters because Takopis Telegram truncation logic preserves resume lines.
---
## Step-by-step: add the imaginary `pi` runner
### Step 1 — Pick an engine id + resume command
Choose a stable engine id string. This string becomes:
- The config table name (`[pi]` in `takopi.toml`)
- The CLI subcommand (`takopi pi`)
- The `ResumeToken.engine`
For Pi well use:
- Engine id: `"pi"`
- Canonical resume command embedded in chat: `` `pi --resume <token>` ``
#### Write a resume regex
Follow the pattern used by Claude/Codex: accept optional backticks, be case-insensitive,
match full line, and capture a group named `token`.
```py
ENGINE: EngineId = "pi"
_RESUME_RE = re.compile(r"(?im)^\s*`?pi\s+--resume\s+(?P<token>[^`\\s]+)`?\\s*$")
_RESUME_RE = re.compile(
r"(?im)^\s*`?pi\s+--resume\s+(?P<token>[^`\s]+)`?\s*$"
)
```
Why this shape?
- `(?m)` lets `^`/`$` match per-line inside multi-line messages.
- Optional backticks (`\`?`) lets you match Telegram inline-code formatting.
- Capturing the **last** token in a message lets users paste multiple resume lines.
---
### Step 2 — Create `src/takopi/runners/pi.py`
Create a new module next to the existing runners:
```
src/takopi/runners/
codex.py
claude.py
mock.py
pi.py # ← new
```
Takopi discovers engines by importing modules in `takopi.runners` and looking for a
module-level `BACKEND: EngineBackend` (see `takopi.engines`).
---
### Step 3 — Translate Pi JSONL into Takopi events
Most CLIs we integrate are JSONL-streaming processes.
Takopi provides `JsonlSubprocessRunner`, which:
- spawns the CLI
- drains stderr into a bounded tail
- reads stdout line-by-line as JSONL
- calls your `translate(...)` method to convert each JSON object into Takopi events
- guarantees “exactly one CompletedEvent” behavior
- provides safe fallbacks for rc != 0 or stream ending without a completion event
#### Define a state object
Copy the Claude pattern: create a small dataclass to hold streaming state.
Common things to track:
- `pending_actions`: map tool_use_id → `Action` so tool results can complete them
- `last_assistant_text`: fallback for final answer if the engine omits it
- `note_seq`: counter used by `JsonlSubprocessRunner.note_event(...)`
```py
from dataclasses import dataclass, field
@dataclass
class PiStreamState:
pending_actions: dict[str, Action] = field(default_factory=dict)
last_assistant_text: str | None = None
note_seq: int = 0
```
#### Decide what Pi emits
For this guide, assume Pi outputs events like:
```json
{"type":"session.start","session_id":"pi_01","model":"pi-large"}
{"type":"tool.use","id":"toolu_1","name":"Bash","input":{"command":"ls"}}
{"type":"tool.result","tool_use_id":"toolu_1","content":"ok","is_error":false}
{"type":"final","session_id":"pi_01","ok":true,"answer":"Done."}
```
#### Map them to Takopi events
Use this mapping (mirrors Claudes approach):
- `session.start``StartedEvent(engine="pi", resume=ResumeToken("pi", session_id))`
- `tool.use``ActionEvent(phase="started")` and stash action in `pending_actions`
- `tool.result``ActionEvent(phase="completed", ok=...)` and pop from `pending_actions`
- `final``CompletedEvent(ok, answer, resume)`
**Important:** emit exactly one `CompletedEvent`.
#### Make the translator a pure function
Claude keeps translation logic in a standalone function (`translate_claude_event(...)`).
This makes it easy to unit test without spawning a subprocess.
Do the same for Pi:
```py
def translate_pi_event(
event: dict[str, Any],
*,
title: str,
state: PiStreamState,
) -> list[TakopiEvent]:
etype = event.get("type")
if etype == "session.start":
session_id = event.get("session_id")
if not session_id:
return []
model = event.get("model")
event_title = str(model) if model else title
return [
StartedEvent(
engine=ENGINE,
resume=ResumeToken(engine=ENGINE, value=str(session_id)),
title=event_title,
)
]
if etype == "tool.use":
tool_id = event.get("id")
if not isinstance(tool_id, str) or not tool_id:
return []
name = str(event.get("name") or "tool")
tool_input = event.get("input")
if not isinstance(tool_input, dict):
tool_input = {}
# Keep titles short and friendly.
# (Claude uses takopi.utils.paths.relativize_command / relativize_path)
kind: ActionKind = "tool"
title = name
if name in {"Bash", "Shell"}:
kind = "command"
title = relativize_command(str(tool_input.get("command") or name))
action = Action(
id=tool_id,
kind=kind,
title=title,
detail={"name": name, "input": tool_input},
)
state.pending_actions[action.id] = action
return [ActionEvent(engine=ENGINE, action=action, phase="started")]
if etype == "tool.result":
tool_use_id = event.get("tool_use_id")
if not isinstance(tool_use_id, str) or not tool_use_id:
return []
action = state.pending_actions.pop(tool_use_id, None)
if action is None:
action = Action(
id=tool_use_id,
kind="tool",
title="tool result",
detail={},
)
is_error = event.get("is_error") is True
content = event.get("content")
result_text = "" if content is None else (content if isinstance(content, str) else str(content))
detail = dict(action.detail)
detail.update({"result_preview": result_text, "is_error": is_error})
return [
ActionEvent(
engine=ENGINE,
action=Action(
id=action.id,
kind=action.kind,
title=action.title,
detail=detail,
),
phase="completed",
ok=not is_error,
)
]
if etype == "final":
ok = event.get("ok") is True
answer = event.get("answer")
if not isinstance(answer, str):
answer = ""
if ok and not answer and state.last_assistant_text:
answer = state.last_assistant_text
session_id = event.get("session_id")
resume = (
ResumeToken(engine=ENGINE, value=str(session_id)) if session_id else None
)
error = None
if not ok:
err = event.get("error")
error = str(err) if err else "pi run failed"
return [
CompletedEvent(
engine=ENGINE,
ok=ok,
answer=answer,
resume=resume,
error=error,
)
]
return []
```
This is intentionally close to Claudes structure:
- Parse `type`
- Handle “init/session start” first
- Emit action-start and action-complete events
- Emit a final `CompletedEvent`
---
### Step 4 — Implement the `PiRunner` class
Most engines can implement a runner by combining:
- `ResumeTokenMixin` (resume parsing + resume-line detection)
- `JsonlSubprocessRunner` (process + JSONL streaming + completion semantics)
#### Why this combo?
It matches Claude/Codex:
- Runner owns resume format/regex.
- Base class owns locking and subprocess lifecycle.
- Translation stays in a pure function and is easily testable.
#### Minimal skeleton
```py
from __future__ import annotations
import logging
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from ..backends import EngineBackend, EngineConfig
from ..model import (
CompletedEvent,
EngineId,
ResumeToken,
StartedEvent,
TakopiEvent,
)
from ..runner import JsonlSubprocessRunner, ResumeTokenMixin, Runner
logger = logging.getLogger(__name__)
ENGINE: EngineId = EngineId("pi")
STDERR_TAIL_LINES = 200
_RESUME_RE = re.compile(
r"(?im)^\s*`?pi\s+--resume\s+(?P<token>[^`\s]+)`?\s*$"
)
@dataclass
class PiRunner(ResumeTokenMixin, JsonlSubprocessRunner):
@@ -66,88 +353,106 @@ class PiRunner(ResumeTokenMixin, JsonlSubprocessRunner):
pi_cmd: str = "pi"
model: str | None = None
allowed_tools: list[str] | None = None
session_title: str = "pi"
stderr_tail_lines = STDERR_TAIL_LINES
logger = logger
def format_resume(self, token: ResumeToken) -> str:
# Override because our canonical resume command is "pi --resume ...".
if token.engine != ENGINE:
raise RuntimeError(f"resume token is for engine {token.engine!r}")
return f"`pi --resume {token.value}`"
def command(self) -> str:
return self.pi_cmd
def build_args(
self, prompt: str, resume: ResumeToken | None, *, state: Any
self,
prompt: str,
resume: ResumeToken | None,
*,
state: Any,
) -> list[str]:
_ = prompt, state
args = ["--jsonl", "--verbose"]
args = ["--output-format", "stream-json", "--verbose"]
if resume is not None:
args.extend(["--resume", resume.value])
if self.model is not None:
args.extend(["--model", self.model])
args.extend(["--model", str(self.model)])
if self.allowed_tools:
args.extend(["--allowed-tools", ",".join(self.allowed_tools)])
return args
def stdin_payload(
self, prompt: str, resume: ResumeToken | None, *, state: Any
self,
prompt: str,
resume: ResumeToken | None,
*,
state: Any,
) -> bytes | None:
_ = resume, state
# Pi reads the prompt from stdin.
return prompt.encode()
def new_state(self, prompt: str, resume: ResumeToken | None) -> PiStreamState:
_ = prompt, resume
return PiStreamState()
def translate(
self,
data: dict[str, Any],
*,
state: Any,
state: PiStreamState,
resume: ResumeToken | None,
found_session: ResumeToken | None,
) -> list[TakopiEvent]:
_ = state, resume, found_session
...
_ = resume, found_session
return translate_pi_event(data, title=self.session_title, state=state)
```
Key implementation notes:
Notes:
- Use `BaseRunner` (or `JsonlSubprocessRunner`) for per-session serialization.
- Mix in `ResumeTokenMixin` (with a `resume_re`) or override
`format_resume` / `extract_resume` / `is_resume_line` so the runner owns
resume encoding/decoding.
- For JSONL CLIs, prefer `JsonlSubprocessRunner` and implement `command`,
`build_args`, and `translate` (override `stdin_payload` if the prompt should
be passed via argv instead of stdin).
- If you dont use `JsonlSubprocessRunner`, use `iter_jsonl(...)` +
`drain_stderr(...)` from `takopi.utils.streams`.
- **Minimal mode is supported:** start with exactly one `StartedEvent` and one
`CompletedEvent`. `ActionEvent`s are optional and can be added later. If you
do emit actions, you can emit only `phase="completed"` notes without tracking
pending state.
- **Do not truncate** tool outputs in the runner; pass full strings into events.
Truncation belongs in renderers.
- `JsonlSubprocessRunner` already enforces the “exactly one completed event” rule.
- When `resume=None`, Takopi will acquire a per-session lock after it sees the first
`StartedEvent`. This is why emitting `StartedEvent` early is important.
### 3) Map Pi JSONL → Takopi events
#### Optional but recommended overrides (Claude-inspired)
Example Pi lines (imaginary):
Depending on how robust you want the integration, consider adding:
```json
{"type":"session.start","session_id":"pi_01","model":"pi-large"}
{"type":"tool.use","id":"toolu_1","name":"Bash","input":{"command":"ls"}}
{"type":"tool.result","tool_use_id":"toolu_1","content":"ok","is_error":false}
{"type":"final","session_id":"pi_01","ok":true,"answer":"Done."}
```
- `env(...)`: to strip or inject environment variables (Claude strips `ANTHROPIC_API_KEY`
unless configured to use API billing).
- `invalid_json_events(...)`: emit a helpful warning `ActionEvent` on malformed JSONL.
- `process_error_events(...)`: customize rc != 0 behavior (include stderr tail in detail).
- `stream_end_events(...)`: handle “process exited cleanly but never emitted a final event”.
Mapping guidance:
Claude uses these to produce better failures instead of silent hangs.
- `session.start``StartedEvent(engine="pi", resume=<session_id>, title=<model>)`
- `tool.use``ActionEvent(phase="started")`
- `tool.result``ActionEvent(phase="completed")` and **pop** pending actions
- `final``CompletedEvent(ok, answer, resume)` (emit **exactly one**)
---
If Pi emits warnings/errors before the final event, surface them as completed
`ActionEvent`s (e.g., `kind="warning"`).
### Step 5 — Add `build_runner(...)` and `BACKEND`
### 4) Expose the backend (auto-discovered)
Takopi needs a way to build your runner from config.
Takopi discovers runners by importing modules in `takopi.runners` and looking
for a module-level `BACKEND: EngineBackend` (from `takopi.backends`).
At the bottom of `src/takopi/runners/pi.py`, define:
Follow the pattern in `runners/claude.py`:
```py
def build_runner(config: EngineConfig, _config_path: Path) -> Runner:
pi_cmd = "pi"
model = config.get("model")
allowed_tools = config.get("allowed_tools")
title = str(model) if model is not None else "pi"
return PiRunner(
pi_cmd=pi_cmd,
model=model,
allowed_tools=allowed_tools,
session_title=title,
)
BACKEND = EngineBackend(
id="pi",
build_runner=build_runner,
@@ -155,24 +460,98 @@ BACKEND = EngineBackend(
)
```
No changes to `engines.py` or `cli.py` are required.
Thats it for wiring.
Only modules that define `BACKEND` are treated as engines. Internal/testing
modules (like `mock.py`) should omit it.
Because engine backends are auto-discovered (`takopi.engines`), you do **not** need
to register the runner elsewhere.
If the CLI binary name differs from the engine id, set `cli_cmd="pi-cli"` on
the backend.
If the binary name differs from the engine id, set:
Example config (minimal):
- `EngineBackend(cli_cmd="pi-cli")`
```toml
[pi]
model = "pi-large"
allowed_tools = ["Bash", "Read"]
```
so onboarding can find it on PATH.
### 5) Tests + fixtures
---
### Step 6 — Add tests (copy Claudes testing strategy)
A good runner PR usually contains 3 types of tests.
#### 1) Resume parsing tests
Copy `tests/test_claude_runner.py::test_claude_resume_format_and_extract`.
For Pi, assert:
- `format_resume(...)` outputs the canonical resume line.
- `extract_resume(...)` can parse it back out.
- It ignores other engines resume lines.
#### 2) Translation unit tests (fixtures)
Claudes translation tests load JSONL fixtures and feed them into the pure translator.
Do the same:
- `tests/fixtures/pi_stream_success.jsonl`
- `tests/fixtures/pi_stream_error.jsonl`
Then assert:
- first event is `StartedEvent`
- action events are correct (ids, kinds, titles)
- the last event is a `CompletedEvent`
- completed.resume matches started.resume
#### 3) Lock/serialization tests (optional, but great)
Claude has async tests proving that:
- two runs with the same resume token serialize (`max_in_flight == 1`)
- a new session run locks correctly after it emits `StartedEvent`
If your runner uses `JsonlSubprocessRunner`, you get most of this for free, but having
one targeted test catches regressions.
---
## Common pitfalls (and how Claude avoided them)
- **StartedEvent arrives too late**
- If you wait until the end to emit `StartedEvent`, Takopi cant acquire the per-session lock
early and another task might resume the same session concurrently.
- Emit `StartedEvent` immediately when you learn the session id.
- **Multiple completion events**
- Some CLIs emit multiple “final-ish” events. Decide which one becomes Takopis `CompletedEvent`.
- `JsonlSubprocessRunner` will stop reading after the first `CompletedEvent` it sees.
- **Missing completion event**
- Claude handles “stream ended without a result event” by emitting a synthetic `CompletedEvent`
in `stream_end_events(...)`.
- **Unhelpful error reporting**
- Include stderr tail in a warning action (Claude includes `stderr_tail` in `detail`).
- **Resume line gets truncated**
- Ensure `is_resume_line()` matches your `format_resume()` output. Takopi tries to preserve
resume lines during truncation.
- **Leaking secrets**
- If your engine can run in “subscription mode” without env keys, strip env vars like Claude
does with `ANTHROPIC_API_KEY`.
---
## Final checklist
Before you call the runner “done”:
- [ ] `takopi pi` appears automatically (module exports `BACKEND`).
- [ ] `format_resume()` matches `extract_resume()` + `is_resume_line()`.
- [ ] Translation emits exactly one `StartedEvent` and one `CompletedEvent`.
- [ ] `CompletedEvent.resume` matches `StartedEvent.resume`.
- [ ] rc != 0 produces a failure `CompletedEvent` (via `process_error_events`).
- [ ] “no final event” produces a failure `CompletedEvent` (via `stream_end_events`).
- [ ] Tests cover resume parsing + at least one translation fixture.
- Add `tests/test_pi_runner.py` for translation behavior.
- Reuse `tests/test_runner_contract.py` to ensure lock/resume invariants.
- Add JSONL fixtures under `tests/fixtures/` for the Pi stream.