# Adding a Runner

This guide explains how to add a **new engine runner** to Takopi.

A *runner* is the adapter between an engine-specific CLI (Codex, Claude Code, …) and Takopi’s **normalized event model** (`StartedEvent`, `ActionEvent`, `CompletedEvent`).

Takopi is designed so that adding a runner usually means **adding one new module** under `src/takopi/runners/`, with no changes to the bridge, renderer, or CLI.

The walkthrough below uses an **imaginary engine** named **Pi** (`pi`) and intentionally mirrors the patterns used in `runners/claude.py`.

---

## What “done” looks like

After you add a runner, you should be able to:

- Run `takopi pi` (CLI subcommand is auto-registered).
- Start a new session and get a resume line like `` `pi --resume <token>` ``.
- Reply to any bot message containing that resume line and continue the same session.
- See progress updates (optional) and always get a final completion event.

---

## Mental model

### 1) Takopi owns the domain model

Takopi’s core types live in `takopi.model`:

- `ResumeToken(engine, value)`
- `StartedEvent(engine, resume, title?, meta?)`
- `ActionEvent(engine, action, phase, ok?, message?, level?)`
- `CompletedEvent(engine, ok, answer, resume?, error?, usage?)`

Runners **must not** invent new event types. They translate engine output into these.

### 2) The runner contract (invariants)

A run must produce events with these invariants (see `tests/test_runner_contract.py`):

- Exactly **one** `StartedEvent`.
- Exactly **one** `CompletedEvent`.
- `CompletedEvent` is the **last** event.
- `CompletedEvent.resume == StartedEvent.resume` (same token).

Action events are optional (minimal runner mode):

- Minimum viable runner: `StartedEvent` → `CompletedEvent`.
- You may add `ActionEvent`s later (recommended for better progress UX).

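
The invariants above can be sketched as a small checker. This is only an illustration: the dataclasses are simplified stand-ins for the real `takopi.model` types, and `contract_violations` is a hypothetical helper, not something Takopi or `tests/test_runner_contract.py` is known to provide.

```python
from __future__ import annotations

from dataclasses import dataclass


# Simplified stand-ins for the takopi.model types (assumption: the real
# classes carry more fields; only what the invariants need is modeled here).
@dataclass(frozen=True)
class ResumeToken:
    engine: str
    value: str


@dataclass(frozen=True)
class StartedEvent:
    engine: str
    resume: ResumeToken


@dataclass(frozen=True)
class ActionEvent:
    engine: str
    phase: str


@dataclass(frozen=True)
class CompletedEvent:
    engine: str
    ok: bool
    answer: str
    resume: ResumeToken | None = None


def contract_violations(events: list[object]) -> list[str]:
    """Return a description of each broken invariant (empty list = OK)."""
    problems: list[str] = []
    started = [e for e in events if isinstance(e, StartedEvent)]
    completed = [e for e in events if isinstance(e, CompletedEvent)]

    if len(started) != 1:
        problems.append(f"expected 1 StartedEvent, got {len(started)}")
    if len(completed) != 1:
        problems.append(f"expected 1 CompletedEvent, got {len(completed)}")
    if events and not isinstance(events[-1], CompletedEvent):
        problems.append("CompletedEvent must be the last event")
    if len(started) == 1 and len(completed) == 1:
        if completed[0].resume != started[0].resume:
            problems.append("CompletedEvent.resume != StartedEvent.resume")
    return problems
```

Running a translated fixture through a checker like this is a quick way to spot contract breaks before wiring up a subprocess.
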
### 3) Resume lines are runner-owned

Takopi deliberately treats the runner as the authority for:

- How a resume line looks in chat (`format_resume()`)
- How to parse a resume token out of text (`extract_resume()`)
- How to detect a resume line reliably (`is_resume_line()`)

This matters because Takopi’s Telegram truncation logic preserves resume lines.

---

## Step-by-step: add the imaginary `pi` runner

### Step 1: Pick an engine id + resume command

Choose a stable engine id string. This string becomes:

- The config table name (`[pi]` in `takopi.toml`)
- The CLI subcommand (`takopi pi`)
- The `ResumeToken.engine`

For Pi we’ll use:

- Engine id: `"pi"`
- Canonical resume command embedded in chat: `` `pi --resume <token>` ``

#### Write a resume regex

Follow the pattern used by Claude/Codex: accept optional backticks, be case-insensitive, match the full line, and capture a group named `token`.

```py
_RESUME_RE = re.compile(
    r"(?im)^\s*`?pi\s+--resume\s+(?P<token>[^`\s]+)`?\s*$"
)
```

Why this shape?

- `(?i)` makes the match case-insensitive, and `(?m)` lets `^`/`$` match per-line inside multi-line messages.
- Optional backticks (`` `? ``) let you match Telegram inline-code formatting.
- Capturing the **last** token in a message lets users paste multiple resume lines.

---

### Step 2: Create `src/takopi/runners/pi.py`

Create a new module next to the existing runners:

```
src/takopi/runners/
  codex.py
  claude.py
  mock.py
  pi.py        # ← new
```

Takopi discovers engines by importing modules in `takopi.runners` and looking for a module-level `BACKEND: EngineBackend` (see `takopi.engines`).

---

### Step 3: Translate Pi JSONL into Takopi events

Most CLIs we integrate are JSONL-streaming processes.
Takopi provides `JsonlSubprocessRunner`, which:

- spawns the CLI
- drains stderr into a bounded tail
- reads stdout line-by-line as JSONL
- calls your `translate(...)` method to convert each JSON object into Takopi events
- guarantees “exactly one CompletedEvent” behavior
- provides safe fallbacks for rc != 0 or stream ending without a completion event

#### Define a state object

Copy the Claude pattern: create a small dataclass to hold streaming state.

Common things to track:

- `pending_actions`: map tool_use_id → `Action` so tool results can complete them
- `last_assistant_text`: fallback for final answer if the engine omits it
- `note_seq`: counter used by `JsonlSubprocessRunner.note_event(...)`

```py
from dataclasses import dataclass, field


@dataclass
class PiStreamState:
    pending_actions: dict[str, Action] = field(default_factory=dict)
    last_assistant_text: str | None = None
    note_seq: int = 0
```

#### Decide what Pi emits

For this guide, assume Pi outputs events like:

```json
{"type":"session.start","session_id":"pi_01","model":"pi-large"}
{"type":"tool.use","id":"toolu_1","name":"Bash","input":{"command":"ls"}}
{"type":"tool.result","tool_use_id":"toolu_1","content":"ok","is_error":false}
{"type":"final","session_id":"pi_01","ok":true,"answer":"Done."}
```

#### Map them to Takopi events

Use this mapping (mirrors Claude’s approach):

- `session.start` → `StartedEvent(engine="pi", resume=ResumeToken("pi", session_id))`
- `tool.use` → `ActionEvent(phase="started")` and stash action in `pending_actions`
- `tool.result` → `ActionEvent(phase="completed", ok=...)` and pop from `pending_actions`
- `final` → `CompletedEvent(ok, answer, resume)`

**Important:** emit exactly one `CompletedEvent`.

#### Make the translator a pure function

Claude keeps translation logic in a standalone function (`translate_claude_event(...)`). This makes it easy to unit test without spawning a subprocess.
Do the same for Pi:

```py
def translate_pi_event(
    event: dict[str, Any],
    *,
    title: str,
    state: PiStreamState,
) -> list[TakopiEvent]:
    etype = event.get("type")

    if etype == "session.start":
        session_id = event.get("session_id")
        if not session_id:
            return []
        model = event.get("model")
        event_title = str(model) if model else title
        return [
            StartedEvent(
                engine=ENGINE,
                resume=ResumeToken(engine=ENGINE, value=str(session_id)),
                title=event_title,
            )
        ]

    if etype == "tool.use":
        tool_id = event.get("id")
        if not isinstance(tool_id, str) or not tool_id:
            return []
        name = str(event.get("name") or "tool")
        tool_input = event.get("input")
        if not isinstance(tool_input, dict):
            tool_input = {}

        # Keep titles short and friendly.
        # (Claude uses takopi.utils.paths.relativize_command / relativize_path)
        kind: ActionKind = "tool"
        title = name
        if name in {"Bash", "Shell"}:
            kind = "command"
            title = relativize_command(str(tool_input.get("command") or name))

        action = Action(
            id=tool_id,
            kind=kind,
            title=title,
            detail={"name": name, "input": tool_input},
        )
        # Remember the action so the matching tool.result can complete it.
        state.pending_actions[action.id] = action
        return [ActionEvent(engine=ENGINE, action=action, phase="started")]

    if etype == "tool.result":
        tool_use_id = event.get("tool_use_id")
        if not isinstance(tool_use_id, str) or not tool_use_id:
            return []
        action = state.pending_actions.pop(tool_use_id, None)
        if action is None:
            # Result without a matching tool.use: synthesize a placeholder.
            action = Action(
                id=tool_use_id,
                kind="tool",
                title="tool result",
                detail={},
            )

        is_error = event.get("is_error") is True
        content = event.get("content")
        if content is None:
            result_text = ""
        elif isinstance(content, str):
            result_text = content
        else:
            result_text = str(content)

        detail = dict(action.detail)
        detail.update({"result_preview": result_text, "is_error": is_error})
        return [
            ActionEvent(
                engine=ENGINE,
                action=Action(
                    id=action.id,
                    kind=action.kind,
                    title=action.title,
                    detail=detail,
                ),
                phase="completed",
                ok=not is_error,
            )
        ]

    if etype == "final":
        ok = event.get("ok") is True
        answer = event.get("answer")
        if not isinstance(answer, str):
            answer = ""
        # Fall back to the last assistant text if the engine omitted the answer.
        if ok and not answer and state.last_assistant_text:
            answer = state.last_assistant_text

        session_id = event.get("session_id")
        resume = (
            ResumeToken(engine=ENGINE, value=str(session_id))
            if session_id
            else None
        )

        error = None
        if not ok:
            err = event.get("error")
            error = str(err) if err else "pi run failed"

        return [
            CompletedEvent(
                engine=ENGINE,
                ok=ok,
                answer=answer,
                resume=resume,
                error=error,
            )
        ]

    # Unknown event types are ignored.
    return []
```

This is intentionally close to Claude’s structure:

- Parse `type`
- Handle “init/session start” first
- Emit action-start and action-complete events
- Emit a final `CompletedEvent`

---

### Step 4: Implement the `PiRunner` class

Most engines can implement a runner by combining:

- `ResumeTokenMixin` (resume parsing + resume-line detection)
- `JsonlSubprocessRunner` (process + JSONL streaming + completion semantics)

#### Why this combo?

It matches Claude/Codex:

- Runner owns resume format/regex.
- Base class owns locking and subprocess lifecycle.
- Translation stays in a pure function and is easily testable.

#### Minimal skeleton

```py
from __future__ import annotations

import logging
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any

from ..backends import EngineBackend, EngineConfig

# Action, ActionEvent, and ActionKind are needed by translate_pi_event and
# PiStreamState (assumed to live in takopi.model alongside the event types).
from ..model import (
    Action,
    ActionEvent,
    ActionKind,
    CompletedEvent,
    EngineId,
    ResumeToken,
    StartedEvent,
    TakopiEvent,
)
from ..runner import JsonlSubprocessRunner, ResumeTokenMixin, Runner
from ..utils.paths import relativize_command

logger = logging.getLogger(__name__)

ENGINE: EngineId = EngineId("pi")
STDERR_TAIL_LINES = 200

_RESUME_RE = re.compile(
    r"(?im)^\s*`?pi\s+--resume\s+(?P<token>[^`\s]+)`?\s*$"
)


@dataclass
class PiRunner(ResumeTokenMixin, JsonlSubprocessRunner):
    engine: EngineId = ENGINE
    resume_re: re.Pattern[str] = _RESUME_RE

    pi_cmd: str = "pi"
    model: str | None = None
    allowed_tools: list[str] | None = None
    session_title: str = "pi"

    stderr_tail_lines = STDERR_TAIL_LINES
    logger = logger

    def format_resume(self, token: ResumeToken) -> str:
        # Override because our canonical resume command is "pi --resume ...".
        if token.engine != ENGINE:
            raise RuntimeError(f"resume token is for engine {token.engine!r}")
        return f"`pi --resume {token.value}`"

    def command(self) -> str:
        return self.pi_cmd

    def build_args(
        self,
        prompt: str,
        resume: ResumeToken | None,
        *,
        state: Any,
    ) -> list[str]:
        _ = prompt, state
        args = ["--output-format", "stream-json", "--verbose"]
        if resume is not None:
            args.extend(["--resume", resume.value])
        if self.model is not None:
            args.extend(["--model", str(self.model)])
        if self.allowed_tools:
            args.extend(["--allowed-tools", ",".join(self.allowed_tools)])
        return args

    def stdin_payload(
        self,
        prompt: str,
        resume: ResumeToken | None,
        *,
        state: Any,
    ) -> bytes | None:
        _ = resume, state
        # Pi reads the prompt from stdin.
        return prompt.encode()

    def new_state(self, prompt: str, resume: ResumeToken | None) -> PiStreamState:
        _ = prompt, resume
        return PiStreamState()

    def translate(
        self,
        data: dict[str, Any],
        *,
        state: PiStreamState,
        resume: ResumeToken | None,
        found_session: ResumeToken | None,
    ) -> list[TakopiEvent]:
        _ = resume, found_session
        return translate_pi_event(data, title=self.session_title, state=state)
```

Notes:

- `JsonlSubprocessRunner` already enforces the “exactly one completed event” rule.
- When `resume=None`, Takopi will acquire a per-session lock after it sees the first `StartedEvent`. This is why emitting `StartedEvent` early is important.

#### Optional but recommended overrides (Claude-inspired)

Depending on how robust you want the integration, consider adding:

- `env(...)`: to strip or inject environment variables (Claude strips `ANTHROPIC_API_KEY` unless configured to use API billing).
- `invalid_json_events(...)`: emit a helpful warning `ActionEvent` on malformed JSONL.
- `process_error_events(...)`: customize rc != 0 behavior (include stderr tail in detail).
- `stream_end_events(...)`: handle “process exited cleanly but never emitted a final event”.

Claude uses these to produce better failures instead of silent hangs.

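
As an illustration of the first override, the environment-stripping idea might look like the sketch below. The exact signature of the `env(...)` hook is not shown in this guide, so this is a plain standalone helper; `scrubbed_env`, the `use_api_billing` flag, and the `PI_API_KEY` variable are all hypothetical names chosen for the example.

```python
from __future__ import annotations

from collections.abc import Mapping


def scrubbed_env(
    base: Mapping[str, str],
    drop: set[str],
    *,
    use_api_billing: bool = False,
) -> dict[str, str]:
    """Return a copy of `base` without the variables named in `drop`.

    When `use_api_billing` is true the environment is passed through
    unchanged, mirroring the "strip unless configured" behavior described
    for Claude above.
    """
    if use_api_billing:
        return dict(base)
    return {key: value for key, value in base.items() if key not in drop}


# Example: hide a (hypothetical) PI_API_KEY from the child process.
child_env = scrubbed_env(
    {"PATH": "/usr/bin", "PI_API_KEY": "secret"},
    drop={"PI_API_KEY"},
)
```

An override would then return something like `scrubbed_env(os.environ, ...)` in whatever form the base class expects.
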

---

### Step 5: Add `build_runner(...)` and `BACKEND`

Takopi needs a way to build your runner from config. Follow the pattern in `runners/claude.py`:

```py
def build_runner(config: EngineConfig, _config_path: Path) -> Runner:
    pi_cmd = "pi"

    model = config.get("model")
    allowed_tools = config.get("allowed_tools")

    title = str(model) if model is not None else "pi"

    return PiRunner(
        pi_cmd=pi_cmd,
        model=model,
        allowed_tools=allowed_tools,
        session_title=title,
    )


BACKEND = EngineBackend(
    id="pi",
    build_runner=build_runner,
    install_cmd="npm install -g @acme/pi-cli",
)
```

That’s it for wiring. Because engine backends are auto-discovered (`takopi.engines`), you do **not** need to register the runner elsewhere.

If the binary name differs from the engine id, set:

- `EngineBackend(cli_cmd="pi-cli")` so onboarding can find it on PATH.

---

### Step 6: Add tests (copy Claude’s testing strategy)

A good runner PR usually contains three types of tests.

#### 1) Resume parsing tests

Copy `tests/test_claude_runner.py::test_claude_resume_format_and_extract`.

For Pi, assert:

- `format_resume(...)` outputs the canonical resume line.
- `extract_resume(...)` can parse it back out.
- It ignores other engines’ resume lines.

#### 2) Translation unit tests (fixtures)

Claude’s translation tests load JSONL fixtures and feed them into the pure translator.

Do the same:

- `tests/fixtures/pi_stream_success.jsonl`
- `tests/fixtures/pi_stream_error.jsonl`

Then assert:

- first event is `StartedEvent`
- action events are correct (ids, kinds, titles)
- the last event is a `CompletedEvent`
- `completed.resume` matches `started.resume`

#### 3) Lock/serialization tests (optional, but great)

Claude has async tests proving that:

- two runs with the same resume token serialize (`max_in_flight == 1`)
- a new session run locks correctly after it emits `StartedEvent`

If your runner uses `JsonlSubprocessRunner`, you get most of this for free, but having one targeted test catches regressions.

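
The resume round trip from the first test category can be tried in isolation with nothing but `re`. The sketch below reuses the Pi regex from Step 1; `format_resume_line` and `extract_last_token` are hypothetical stand-ins for `PiRunner.format_resume` and `extract_resume`, and taking the last match is an assumption based on the "last token" note in Step 1.

```python
from __future__ import annotations

import re

# Same pattern as the guide's _RESUME_RE for the imaginary Pi engine.
RESUME_RE = re.compile(
    r"(?im)^\s*`?pi\s+--resume\s+(?P<token>[^`\s]+)`?\s*$"
)


def format_resume_line(token: str) -> str:
    """Render the canonical resume line (stand-in for format_resume)."""
    return f"`pi --resume {token}`"


def extract_last_token(text: str) -> str | None:
    """Return the token of the last resume line in `text`, if any."""
    token = None
    for match in RESUME_RE.finditer(text):
        token = match.group("token")
    return token


def test_pi_resume_round_trip() -> None:
    line = format_resume_line("pi_01")
    assert extract_last_token(line) == "pi_01"
    # Another engine's resume line must not match.
    assert extract_last_token("`claude --resume xyz`") is None
    # With several resume lines, the last one wins.
    message = "note\n`pi --resume a1`\n`pi --resume b2`"
    assert extract_last_token(message) == "b2"
```

A real test would call the runner's own methods, but the assertions would keep the same shape.
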

---

## Common pitfalls (and how Claude avoided them)

- **StartedEvent arrives too late**
  - If you wait until the end to emit `StartedEvent`, Takopi can’t acquire the per-session lock early and another task might resume the same session concurrently.
  - Emit `StartedEvent` immediately when you learn the session id.
- **Multiple completion events**
  - Some CLIs emit multiple “final-ish” events. Decide which one becomes Takopi’s `CompletedEvent`.
  - `JsonlSubprocessRunner` will stop reading after the first `CompletedEvent` it sees.
- **Missing completion event**
  - Claude handles “stream ended without a result event” by emitting a synthetic `CompletedEvent` in `stream_end_events(...)`.
- **Unhelpful error reporting**
  - Include stderr tail in a warning action (Claude includes `stderr_tail` in `detail`).
- **Resume line gets truncated**
  - Ensure `is_resume_line()` matches your `format_resume()` output. Takopi tries to preserve resume lines during truncation.
- **Leaking secrets**
  - If your engine can run in “subscription mode” without env keys, strip env vars like Claude does with `ANTHROPIC_API_KEY`.

---

## Final checklist

Before you call the runner “done”:

- [ ] `takopi pi` appears automatically (module exports `BACKEND`).
- [ ] `format_resume()` matches `extract_resume()` + `is_resume_line()`.
- [ ] Translation emits exactly one `StartedEvent` and one `CompletedEvent`.
- [ ] `CompletedEvent.resume` matches `StartedEvent.resume`.
- [ ] rc != 0 produces a failure `CompletedEvent` (via `process_error_events`).
- [ ] “no final event” produces a failure `CompletedEvent` (via `stream_end_events`).
- [ ] Tests cover resume parsing + at least one translation fixture.