feat: add pi runner (#24)

This commit is contained in:
banteg
2026-01-02 16:13:55 +04:00
committed by GitHub
parent 7e5d6e3d40
commit d9c53b9e3a
11 changed files with 1098 additions and 8 deletions
+2 -1
View File
@@ -4,7 +4,8 @@
### changes
- TBD
- add a pi runner via the `pi` CLI with jsonl streaming and resume support
- document the pi runner, event mapping, and jsonl stream capture tips
### fixes
+14 -4
View File
@@ -49,7 +49,7 @@ The orchestrator module containing:
- `/cancel` routes by reply-to progress message id (accepts extra text)
- `/{engine}` on the first line selects the engine for new threads
- Progress edits are throttled to 2s intervals and only run when new events arrive
- Resume tokens are runner-formatted command lines (e.g., `` `codex resume <token>` ``)
- Resume tokens are runner-formatted command lines (e.g., `` `codex resume <token>` ``, `` `claude --resume <token>` ``, `` `pi --session <path>` ``)
- Resume parsing polls all runners via `AutoRouter.resolve_resume()` and routes to the first match
- Bot command menu is synced on startup (`cancel` + engine commands)
@@ -92,6 +92,13 @@ The orchestrator module containing:
- Stderr is drained into a bounded tail (debug logging only)
- Translation errors abort the run; keep event normalization defensive
### `runners/pi.py` - Pi runner
| Component | Purpose |
|-----------|---------|
| `PiRunner` | Spawns `pi --print --mode json`, streams JSONL, emits takopi events |
| `translate_pi_event()` | Normalizes Pi JSONL into the takopi event schema |
### `model.py` / `runner.py` - Core domain types
| File | Purpose |
@@ -113,6 +120,8 @@ Auto-discovers runner modules in `takopi.runners` that export `BACKEND`.
| File | Purpose |
|------|---------|
| `codex.py` | Codex runner (JSONL → takopi events) + per-resume locks |
| `claude.py` | Claude runner (JSONL → takopi events) + per-resume locks |
| `pi.py` | Pi runner (JSONL → takopi events) + per-resume locks |
| `mock.py` | Mock runner for tests/demos |
### `config.py` - Configuration loading
@@ -166,7 +175,7 @@ handle_message() spawned as task with selected runner
Send initial progress message (silent)
runner.run(prompt, resume_token)
├── Spawns engine subprocess (e.g., codex exec --json)
├── Spawns engine subprocess (e.g., codex exec --json, pi --print --mode json)
├── Streams JSONL from stdout
├── Normalizes JSONL -> takopi events
├── Yields Takopi events (async iterator)
@@ -184,8 +193,8 @@ Send/edit final message
### Resume Flow
Same as above; auto-router polls all runners to extract resume tokens:
- Router returns first matching token (e.g. `` `claude --resume <id>` `` routes to Claude)
- Selected runner spawns with resume (e.g. `codex exec --json resume <token> -`)
- Router returns first matching token (e.g. `` `claude --resume <id>` `` routes to Claude, `` `pi --session <path>` `` routes to Pi)
- Selected runner spawns with resume (e.g. `codex exec --json resume <token> -`, `pi --print --mode json --session <path> <prompt>`)
- Per-token lock serializes concurrent resumes on the same thread
## Error Handling
@@ -193,6 +202,7 @@ Same as above; auto-router polls all runners to extract resume tokens:
| Scenario | Behavior |
|----------|----------|
| `codex exec` fails (rc != 0) | Emits a warning `action` plus `completed(ok=false, error=...)` |
| `pi` fails (rc != 0) | Emits a warning `action` plus `completed(ok=false, error=...)` |
| Telegram API error | Logged, edit skipped (progress continues) |
| Cancellation | Cancel scope terminates the process group (POSIX) and renders `cancelled` |
| Errors in handler | Final render uses `status=error` and preserves resume tokens when known |
+137
View File
@@ -0,0 +1,137 @@
Below is a concrete implementation spec for adding **Pi (pi-coding-agent CLI)** as a first-class engine in Takopi (v0.4.0).
---
## Scope
### Goal
Add a new engine backend **`pi`** so Takopi can:
* Run Pi non-interactively via the **pi CLI** (`pi --print`).
* Stream progress by parsing **`--mode json`** (newline-delimited JSON). Each line is a JSON object.
* Support resumable sessions via **`--session <path>`** (Takopi emits a canonical resume line the user can reply with).
### Non-goals (v1)
* Interactive TUI flows (session picker, prompts, etc.)
* RPC mode (requires a long-running process and JSON commands)
---
## UX and behavior
### Engine selection
* Existing: `takopi codex`
* New: `takopi pi`
### Resume UX (canonical line)
Takopi appends a **single backticked** resume line at the end of the message, like:
```text
`pi --session /home/user/.pi/agent/sessions/--repo--/2026-01-02T12-34-56-789Z_abcd.jsonl`
```
Notes:
* `pi --resume/-r` opens an interactive session picker, so Takopi uses `--session <path>` instead.
* The resume token is the **session file path** (JSONL), treated as an opaque string.
* If the path contains spaces, the runner will quote it.
### Non-interactive runs
Use `--print` and `--mode json` for headless JSONL output.
Pi does not accept `-- <prompt>` to protect prompts starting with `-`. Takopi prefixes a leading space if the prompt begins with `-` so it is not parsed as a flag.
---
## Config additions
Takopi config lives at either:
* `.takopi/takopi.toml` (project-local), or
* `~/.takopi/takopi.toml` (home).
Add a new optional `[pi]` section.
Recommended v1 schema:
```toml
# .takopi/takopi.toml
default_engine = "pi"
[pi]
cmd = "pi" # optional; defaults to "pi"
extra_args = [] # optional list of strings, appended verbatim
model = "..." # optional; passed as --model
provider = "..." # optional; passed as --provider
session_dir = "..." # optional; directory for session files
session_title = "pi" # optional; defaults to model or "pi"
```
Notes:
* `extra_args` lets you pass new Pi flags without changing Takopi.
* If `session_dir` is omitted, Takopi uses Pi's default session dir:
`~/.pi/agent/sessions/--<cwd>--` (with path separators replaced by `-`).
---
## Code changes (by file)
### 1) New file: `src/takopi/runners/pi.py`
Expose a module-level `BACKEND = EngineBackend(...)`.
#### Runner invocation
The runner should launch Pi in headless JSON mode:
```text
pi --print --mode json --session <session.jsonl> <prompt>
```
When resuming, `<session.jsonl>` is the resume token extracted from the chat.
#### Event translation
Pi JSONL output is `AgentSessionEvent` (from `@mariozechner/pi-agent-core`).
The runner should translate:
* `tool_execution_start` -> `action` (phase: started)
* `tool_execution_end` -> `action` (phase: completed)
* `agent_end` -> `completed`
For the final answer, use the most recent assistant message text (from
`message_end` events). For errors, if the assistant stopReason is `error` or
`aborted`, emit `completed(ok=false, error=...)`.
---
## Installation and auth
Install the CLI globally:
```text
npm install -g @mariozechner/pi-coding-agent
```
Auth is stored under `~/.pi/agent/auth.json`. Run `pi` once interactively to
set up credentials before using Takopi.
---
## Known pitfalls
* `--resume` is interactive; Takopi uses `--session <path>` instead.
* Prompts that start with `-` are interpreted as flags by the CLI. Takopi
prefixes a space to make them safe.
---
If you want, I can also add a sample `takopi.toml` snippet to the README or
include a small quickstart section for Pi in the onboarding panel.
@@ -0,0 +1,67 @@
# Pi `--mode json` event cheatsheet
`pi --print --mode json` writes **one JSON object per line** (JSONL) with a
required `type` field. These are `AgentSessionEvent` objects from
`@mariozechner/pi-agent-core`.
## Top-level event lines
### `agent_start`
```json
{"type":"agent_start"}
```
### `agent_end`
```json
{"type":"agent_end","messages":[{"role":"assistant","content":[{"type":"text","text":"Done."}],"stopReason":"stop","timestamp":123}]}
```
### `turn_start` / `turn_end`
```json
{"type":"turn_start"}
```
```json
{"type":"turn_end","message":{...},"toolResults":[...]}
```
### `message_start` / `message_update` / `message_end`
```json
{"type":"message_start","message":{"role":"assistant","content":[{"type":"text","text":"Working..."}]}}
```
```json
{"type":"message_update","message":{...},"assistantMessageEvent":{"type":"text_delta","delta":"...","contentIndex":0}}
```
```json
{"type":"message_end","message":{"role":"assistant","content":[{"type":"text","text":"Done."}],"stopReason":"stop"}}
```
### `tool_execution_start`
```json
{"type":"tool_execution_start","toolCallId":"tool_1","toolName":"bash","args":{"command":"ls"}}
```
### `tool_execution_update`
```json
{"type":"tool_execution_update","toolCallId":"tool_1","toolName":"bash","args":{"command":"ls"},"partialResult":{"content":[{"type":"text","text":"..."}]}}
```
### `tool_execution_end`
```json
{"type":"tool_execution_end","toolCallId":"tool_1","toolName":"bash","result":{"content":[{"type":"text","text":"ok"}],"details":{}},"isError":false}
```
## Notes
* `message_end` with `role = "assistant"` contains the final assistant text.
* `assistantMessageEvent` in `message_update` provides streaming deltas.
* `tool_execution_*` events map cleanly to Takopi `action` events.
+154
View File
@@ -0,0 +1,154 @@
# Pi -> Takopi event mapping (spec)
This document specifies how to add a Pi runner to Takopi by translating
Pi CLI `--mode json` JSONL events into Takopi events. The Pi JSONL stream is
`AgentSessionEvent` from `@mariozechner/pi-agent-core`.
The goal is to make Pi feel identical to the Codex/Claude runners from the
bridge/renderer point of view while preserving Takopi invariants (stable action
ids, per-session serialization, single completed event).
---
## 1. Input stream contract (Pi CLI)
Pi CLI emits **one JSON object per line** (JSONL) when invoked with:
```
pi --print --mode json <prompt>
```
Notes:
- `--print` is required for non-interactive runs.
- `--mode json` outputs all agent events (no TUI banners).
- Pi does not support `-- <prompt>`; prompts starting with `-` must be
prefixed (Takopi does this automatically).
---
## 2. Resume tokens and resume lines
- Engine id: `pi`
- Canonical resume line (embedded in chat):
```
`pi --session <path>`
```
The token is the **session JSONL file path**.
Why not `--resume`?
- `--resume/-r` opens an interactive session picker; it does not accept a
session token. Takopi must use `--session <path>` instead.
---
## 3. Session lifecycle + serialization
Takopi requires **serialization per session token**:
- For new runs (`resume=None`), do **not** acquire a lock until a `started`
event is emitted (Takopi emits this as soon as the first JSON event arrives).
- Once the session is known, acquire a lock for `pi:<session_path>` and hold it
until the run completes.
- For resumed runs, acquire the lock immediately on entry.
---
## 4. Event translation (Pi JSONL -> Takopi)
Pi emits `AgentSessionEvent` objects. Only a subset is required for Takopi.
### 4.1 `tool_execution_start`
Example:
```json
{"type":"tool_execution_start","toolCallId":"tool_1","toolName":"bash","args":{"command":"ls"}}
```
Mapping:
- Emit `action` with `phase="started"`.
- `action.id = toolCallId`.
- `action.kind` from tool name (see section 5).
- `action.title` derived from tool + args.
### 4.2 `tool_execution_end`
Example:
```json
{"type":"tool_execution_end","toolCallId":"tool_1","toolName":"bash","result":{...},"isError":false}
```
Mapping:
- Emit `action` with `phase="completed"`.
- `ok = !isError`.
- Carry `result` and `isError` in `detail` for debugging.
### 4.3 `message_end` (assistant)
Pi emits message lifecycle events. For `message_end` where `message.role == "assistant"`:
- Store the latest assistant text as the **final answer fallback**.
- If `stopReason` is `error` or `aborted`, store `errorMessage`.
- Capture `usage` for `completed.usage`.
### 4.4 `agent_end`
Example:
```json
{"type":"agent_end","messages":[...]}
```
Mapping:
- Emit a single `completed` event:
- `ok = true` unless the last assistant message has `stopReason` `error` or `aborted`.
- `answer = last assistant text` (from `message_end` or `agent_end.messages`).
- `error = errorMessage` if present.
- `resume = ResumeToken(engine="pi", value=session_path)`.
- `usage = last assistant usage`.
### 4.5 Other events
Ignore unknown events. If a JSONL line is malformed, emit a warning action and
continue (default `JsonlSubprocessRunner` behavior).
---
## 5. Tool name -> ActionKind mapping heuristics
Pi tool names are lower-case by default. Suggested mapping:
| Tool name | ActionKind | Title logic |
| --- | --- | --- |
| `bash` | `command` | `args.command` |
| `edit`, `write` | `file_change` | `args.path` |
| `read` | `tool` | `read: <path>` |
| `grep` | `tool` | `grep: <pattern>` |
| `find` | `tool` | `find: <pattern>` |
| `ls` | `tool` | `ls: <path>` |
| (default) | `tool` | tool name |
For `file_change`, include `detail.changes = [{"path": <path>, "kind": "update"}]`.
---
## 6. Usage mapping
Takopi `completed.usage` should mirror Pi's assistant `usage` object without
transformation.
---
## 7. Suggested Takopi config keys
A minimal TOML config for Pi:
```toml
[pi]
cmd = "pi"
model = "..."
provider = "..."
extra_args = []
```
Use `extra_args` for any newer Pi CLI flags not explicitly mapped.
+3 -2
View File
@@ -23,7 +23,7 @@ Out of scope for v0.4.0:
## 2. Terminology
- **EngineId**: string identifier of an engine (e.g., `"codex"`).
- **EngineId**: string identifier of an engine (e.g., `"codex"`, `"claude"`, `"pi"`).
- **Runner**: Takopi adapter that executes an engine process and yields **Takopi events**.
- **Thread**: a single engine-side conversation, identified in Takopi by a **ResumeToken**.
- **ResumeToken**: Takopi-owned thread identifier `{ engine: EngineId, value: str }`.
@@ -41,6 +41,7 @@ The canonical ResumeLine embedded in chat MUST be the engines CLI resume comm
- `codex resume <id>`
- `claude --resume <id>`
- `pi --session <path>`
Takopi MUST treat the runner as authoritative for:
@@ -347,7 +348,7 @@ Decision (v0.4.0):
* If an engine subcommand is provided, Takopi MUST still use the auto-router, but it overrides the configured default engine for new threads.
* Resume extraction MUST poll **all** available runners (per §3.4) and route to the first matching runner.
* New thread engine override (chat-level):
* Users MAY prefix the first non-empty line with `/{engine}` (e.g. `/claude` or `/codex`) to select the engine for a **new** thread.
* Users MAY prefix the first non-empty line with `/{engine}` (e.g. `/claude`, `/codex`, or `/pi`) to select the engine for a **new** thread.
* The bridge MUST strip that directive from the prompt before invoking the runner.
* If a ResumeToken is resolved from the message or reply, it MUST take precedence and the `/{engine}` directive MUST be ignored.
+7 -1
View File
@@ -2,7 +2,7 @@
🐙 *he just wants to help-pi*
telegram bridge for codex, claude code, and [other agents](docs/adding-a-runner.md). runs the agent cli, streams progress, and supports resumable sessions.
telegram bridge for codex, claude code, pi, and [other agents](docs/adding-a-runner.md). runs the agent cli, streams progress, and supports resumable sessions.
## features
@@ -22,6 +22,7 @@ parallel runs across threads, per thread queue support.
- at least one engine installed:
- `codex` on PATH (`npm install -g @openai/codex` or `brew install codex`)
- `claude` on PATH (`npm install -g @anthropic-ai/claude-code`)
- `pi` on PATH (`npm install -g @mariozechner/pi-coding-agent`)
## install
@@ -55,6 +56,10 @@ allowed_tools = ["Bash", "Read", "Write", "WebSearch"]
dangerously_skip_permissions = false
# uses subscription by default, override to use api billing
use_api_billing = false
[pi]
model = "gpt-4.1"
provider = "openai"
```
## usage
@@ -66,6 +71,7 @@ cd ~/dev/your-repo
takopi
# or override the default engine for new threads:
takopi claude
takopi pi
```
resume lines always route to the matching engine; subcommands only override the default for new threads.
+484
View File
@@ -0,0 +1,484 @@
from __future__ import annotations
import logging
import os
import re
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from uuid import uuid4
from ..backends import EngineBackend, EngineConfig
from ..config import ConfigError
from ..model import (
Action,
ActionEvent,
ActionKind,
ActionLevel,
ActionPhase,
CompletedEvent,
EngineId,
ResumeToken,
StartedEvent,
TakopiEvent,
)
from ..runner import JsonlSubprocessRunner, ResumeTokenMixin, Runner
from ..utils.paths import relativize_command, relativize_path
logger = logging.getLogger(__name__)
ENGINE: EngineId = EngineId("pi")
STDERR_TAIL_LINES = 200
_RESUME_RE = re.compile(r"(?im)^\s*`?pi\s+--session\s+(?P<token>.+?)`?\s*$")
@dataclass(slots=True)
class PiStreamState:
resume: ResumeToken
pending_actions: dict[str, Action] = field(default_factory=dict)
last_assistant_text: str | None = None
last_assistant_error: str | None = None
last_usage: dict[str, Any] | None = None
started: bool = False
note_seq: int = 0
def _action_event(
*,
phase: ActionPhase,
action: Action,
ok: bool | None = None,
message: str | None = None,
level: ActionLevel | None = None,
) -> ActionEvent:
return ActionEvent(
engine=ENGINE,
action=action,
phase=phase,
ok=ok,
message=message,
level=level,
)
def _extract_text_blocks(content: Any) -> str | None:
if not isinstance(content, list):
return None
parts: list[str] = []
for item in content:
if not isinstance(item, dict):
continue
if item.get("type") != "text":
continue
text = item.get("text")
if isinstance(text, str) and text:
parts.append(text)
if not parts:
return None
return "".join(parts).strip() or None
def _assistant_error(message: dict[str, Any]) -> str | None:
stop_reason = message.get("stopReason")
if stop_reason in {"error", "aborted"}:
error = message.get("errorMessage")
if isinstance(error, str) and error:
return error
return f"pi run {stop_reason}"
return None
def _tool_kind_and_title(
name: str,
args: dict[str, Any],
) -> tuple[ActionKind, str]:
tool = name.lower()
if tool == "bash":
command = args.get("command")
return "command", relativize_command(str(command or "bash"))
if tool in {"edit", "write"}:
path = args.get("path")
if path:
return "file_change", relativize_path(str(path))
return "file_change", tool
if tool == "read":
path = args.get("path")
if path:
return "tool", f"read: `{relativize_path(str(path))}`"
return "tool", "read"
if tool == "grep":
pattern = args.get("pattern")
return "tool", f"grep: {pattern}" if pattern else "grep"
if tool == "find":
pattern = args.get("pattern")
return "tool", f"find: {pattern}" if pattern else "find"
if tool == "ls":
path = args.get("path")
if path:
return "tool", f"ls: `{relativize_path(str(path))}`"
return "tool", "ls"
return "tool", name
def _last_assistant_message(messages: Any) -> dict[str, Any] | None:
if not isinstance(messages, list):
return None
for item in reversed(messages):
if isinstance(item, dict) and item.get("role") == "assistant":
return item
return None
def translate_pi_event(
event: dict[str, Any],
*,
title: str,
meta: dict[str, Any] | None,
state: PiStreamState,
) -> list[TakopiEvent]:
out: list[TakopiEvent] = []
if not state.started:
out.append(
StartedEvent(
engine=ENGINE,
resume=state.resume,
title=title,
meta=meta or None,
)
)
state.started = True
etype = event.get("type")
if etype == "tool_execution_start":
tool_id = event.get("toolCallId")
tool_name = event.get("toolName")
args = event.get("args") or {}
if not isinstance(args, dict):
args = {}
if isinstance(tool_id, str) and tool_id:
name = str(tool_name or "tool")
kind, title_str = _tool_kind_and_title(name, args)
detail: dict[str, Any] = {"tool_name": name, "args": args}
if kind == "file_change":
path = args.get("path")
if path:
detail["changes"] = [{"path": str(path), "kind": "update"}]
action = Action(id=tool_id, kind=kind, title=title_str, detail=detail)
state.pending_actions[action.id] = action
out.append(_action_event(phase="started", action=action))
return out
if etype == "tool_execution_end":
tool_id = event.get("toolCallId")
tool_name = event.get("toolName")
if isinstance(tool_id, str) and tool_id:
action = state.pending_actions.pop(tool_id, None)
name = str(tool_name or "tool")
if action is None:
action = Action(id=tool_id, kind="tool", title=name, detail={})
detail = dict(action.detail)
detail["result"] = event.get("result")
detail["is_error"] = event.get("isError")
is_error = event.get("isError") is True
out.append(
_action_event(
phase="completed",
action=Action(
id=action.id,
kind=action.kind,
title=action.title,
detail=detail,
),
ok=not is_error,
)
)
return out
if etype == "message_end":
message = event.get("message")
if isinstance(message, dict) and message.get("role") == "assistant":
text = _extract_text_blocks(message.get("content"))
if text:
state.last_assistant_text = text
usage = message.get("usage")
if isinstance(usage, dict):
state.last_usage = usage
error = _assistant_error(message)
if error:
state.last_assistant_error = error
return out
if etype == "agent_end":
assistant = _last_assistant_message(event.get("messages"))
if assistant:
text = _extract_text_blocks(assistant.get("content"))
if text:
state.last_assistant_text = text
usage = assistant.get("usage")
if isinstance(usage, dict):
state.last_usage = usage
error = _assistant_error(assistant)
if error:
state.last_assistant_error = error
ok = state.last_assistant_error is None
error = state.last_assistant_error
answer = state.last_assistant_text or ""
out.append(
CompletedEvent(
engine=ENGINE,
ok=ok,
answer=answer,
resume=state.resume,
error=error,
usage=state.last_usage,
)
)
return out
return out
class PiRunner(ResumeTokenMixin, JsonlSubprocessRunner):
engine: EngineId = ENGINE
resume_re: re.Pattern[str] = _RESUME_RE
stderr_tail_lines = STDERR_TAIL_LINES
logger = logger
def __init__(
self,
*,
pi_cmd: str,
extra_args: list[str],
model: str | None,
provider: str | None,
session_title: str,
session_dir: Path | None,
) -> None:
self.pi_cmd = pi_cmd
self.extra_args = extra_args
self.model = model
self.provider = provider
self.session_title = session_title
self.session_dir = session_dir
def format_resume(self, token: ResumeToken) -> str:
if token.engine != ENGINE:
raise RuntimeError(f"resume token is for engine {token.engine!r}")
return f"`pi --session {self._quote_token(token.value)}`"
def extract_resume(self, text: str | None) -> ResumeToken | None:
if not text:
return None
found: str | None = None
for match in self.resume_re.finditer(text):
token = match.group("token")
if not token:
continue
token = token.strip()
if len(token) >= 2 and token[0] == token[-1] and token[0] in {'"', "'"}:
token = token[1:-1]
found = token
if not found:
return None
return ResumeToken(engine=self.engine, value=found)
def command(self) -> str:
return self.pi_cmd
def build_args(
self,
prompt: str,
resume: ResumeToken | None,
*,
state: PiStreamState,
) -> list[str]:
_ = resume
args: list[str] = [*self.extra_args, "--print", "--mode", "json"]
if self.provider:
args.extend(["--provider", self.provider])
if self.model:
args.extend(["--model", self.model])
args.extend(["--session", state.resume.value])
args.append(self._sanitize_prompt(prompt))
return args
def stdin_payload(
self,
prompt: str,
resume: ResumeToken | None,
*,
state: PiStreamState,
) -> bytes | None:
_ = prompt, resume, state
return None
def env(self, *, state: PiStreamState) -> dict[str, str] | None:
_ = state
env = dict(os.environ)
env.setdefault("NO_COLOR", "1")
env.setdefault("CI", "1")
return env
def new_state(self, prompt: str, resume: ResumeToken | None) -> PiStreamState:
_ = prompt
if resume is None:
session_path = self._new_session_path()
token = ResumeToken(engine=ENGINE, value=session_path)
else:
token = resume
return PiStreamState(resume=token)
def translate(
self,
data: dict[str, Any],
*,
state: PiStreamState,
resume: ResumeToken | None,
found_session: ResumeToken | None,
) -> list[TakopiEvent]:
_ = resume, found_session
meta: dict[str, Any] = {"cwd": os.getcwd()}
if self.model:
meta["model"] = self.model
if self.provider:
meta["provider"] = self.provider
return translate_pi_event(
data,
title=self.session_title,
meta=meta or None,
state=state,
)
def process_error_events(
self,
rc: int,
*,
resume: ResumeToken | None,
found_session: ResumeToken | None,
stderr_tail: str,
state: PiStreamState,
) -> list[TakopiEvent]:
message = f"pi failed (rc={rc})."
resume_for_completed = found_session or resume or state.resume
return [
self.note_event(message, state=state, detail={"stderr_tail": stderr_tail}),
CompletedEvent(
engine=ENGINE,
ok=False,
answer=state.last_assistant_text or "",
resume=resume_for_completed,
error=message,
usage=state.last_usage,
),
]
def stream_end_events(
self,
*,
resume: ResumeToken | None,
found_session: ResumeToken | None,
stderr_tail: str,
state: PiStreamState,
) -> list[TakopiEvent]:
_ = stderr_tail
resume_for_completed = found_session or resume or state.resume
message = "pi finished without an agent_end event"
return [
CompletedEvent(
engine=ENGINE,
ok=False,
answer=state.last_assistant_text or "",
resume=resume_for_completed,
error=message,
usage=state.last_usage,
)
]
def _new_session_path(self) -> str:
session_dir = self.session_dir or _default_session_dir(Path.cwd())
session_dir.mkdir(parents=True, exist_ok=True)
timestamp = datetime.now(timezone.utc).isoformat()
safe_timestamp = timestamp.replace(":", "-").replace(".", "-")
token = uuid4().hex
filename = f"{safe_timestamp}_{token}.jsonl"
return str(session_dir / filename)
def _sanitize_prompt(self, prompt: str) -> str:
if prompt.startswith("-"):
return f" {prompt}"
return prompt
def _quote_token(self, token: str) -> str:
if not token:
return token
needs_quotes = any(ch.isspace() for ch in token)
if not needs_quotes and '"' not in token:
return token
escaped = token.replace('"', '\\"')
return f'"{escaped}"'
def _default_session_dir(cwd: Path) -> Path:
agent_dir = os.environ.get("PI_CODING_AGENT_DIR")
base = Path(agent_dir).expanduser() if agent_dir else Path.home() / ".pi" / "agent"
safe_path = f"--{str(cwd).lstrip('/\\\\').replace('/', '-').replace('\\\\', '-').replace(':', '-')}--"
return base / "sessions" / safe_path
def build_runner(config: EngineConfig, config_path: Path) -> Runner:
cmd = config.get("cmd") or "pi"
if not isinstance(cmd, str):
raise ConfigError(f"Invalid `pi.cmd` in {config_path}; expected a string.")
extra_args_value = config.get("extra_args")
if extra_args_value is None:
extra_args = []
elif isinstance(extra_args_value, list) and all(
isinstance(x, str) for x in extra_args_value
):
extra_args = list(extra_args_value)
else:
raise ConfigError(
f"Invalid `pi.extra_args` in {config_path}; expected a list of strings."
)
model = config.get("model")
if model is not None and not isinstance(model, str):
raise ConfigError(f"Invalid `pi.model` in {config_path}; expected a string.")
provider = config.get("provider")
if provider is not None and not isinstance(provider, str):
raise ConfigError(f"Invalid `pi.provider` in {config_path}; expected a string.")
session_dir_value = config.get("session_dir")
session_dir: Path | None = None
if session_dir_value is not None:
if not isinstance(session_dir_value, str):
raise ConfigError(
f"Invalid `pi.session_dir` in {config_path}; expected a string."
)
session_dir = Path(session_dir_value).expanduser()
title = str(config.get("session_title") or (model if model else "pi"))
return PiRunner(
pi_cmd=cmd,
extra_args=extra_args,
model=model,
provider=provider,
session_title=title,
session_dir=session_dir,
)
BACKEND = EngineBackend(
id="pi",
build_runner=build_runner,
cli_cmd="pi",
install_cmd="npm install -g @mariozechner/pi-coding-agent",
)
+3
View File
@@ -0,0 +1,3 @@
{"type":"agent_start"}
{"type":"message_end","message":{"role":"assistant","content":[{"type":"text","text":"Request failed."}],"api":"openai-responses","provider":"openai","model":"gpt-4o-mini","usage":{"input":5,"output":1,"cacheRead":0,"cacheWrite":0,"totalTokens":6,"cost":{"input":0.0,"output":0.0,"cacheRead":0.0,"cacheWrite":0.0,"total":0.0}},"stopReason":"error","errorMessage":"Upstream error","timestamp":1}}
{"type":"agent_end","messages":[{"role":"assistant","content":[{"type":"text","text":"Request failed."}],"api":"openai-responses","provider":"openai","model":"gpt-4o-mini","usage":{"input":5,"output":1,"cacheRead":0,"cacheWrite":0,"totalTokens":6,"cost":{"input":0.0,"output":0.0,"cacheRead":0.0,"cacheWrite":0.0,"total":0.0}},"stopReason":"error","errorMessage":"Upstream error","timestamp":1}]}
+8
View File
@@ -0,0 +1,8 @@
{"type":"agent_start"}
{"type":"message_end","message":{"role":"assistant","content":[{"type":"toolCall","id":"tool_1","name":"bash","arguments":{"command":"ls -la"}}],"api":"openai-responses","provider":"openai","model":"gpt-4o-mini","usage":{"input":10,"output":4,"cacheRead":0,"cacheWrite":0,"totalTokens":14,"cost":{"input":0.0,"output":0.0,"cacheRead":0.0,"cacheWrite":0.0,"total":0.0}},"stopReason":"toolUse","timestamp":1}}
{"type":"tool_execution_start","toolCallId":"tool_1","toolName":"bash","args":{"command":"ls -la"}}
{"type":"tool_execution_end","toolCallId":"tool_1","toolName":"bash","result":{"content":[{"type":"text","text":"ok"}],"details":{"exit_code":0}},"isError":false}
{"type":"tool_execution_start","toolCallId":"tool_2","toolName":"write","args":{"path":"notes.md","content":"hello"}}
{"type":"tool_execution_end","toolCallId":"tool_2","toolName":"write","result":{"content":[{"type":"text","text":"done"}],"details":{}},"isError":false}
{"type":"message_end","message":{"role":"assistant","content":[{"type":"text","text":"Done. Added notes.md."}],"api":"openai-responses","provider":"openai","model":"gpt-4o-mini","usage":{"input":12,"output":6,"cacheRead":0,"cacheWrite":0,"totalTokens":18,"cost":{"input":0.0,"output":0.0,"cacheRead":0.0,"cacheWrite":0.0,"total":0.0}},"stopReason":"stop","timestamp":2}}
{"type":"agent_end","messages":[{"role":"assistant","content":[{"type":"text","text":"Done. Added notes.md."}],"api":"openai-responses","provider":"openai","model":"gpt-4o-mini","usage":{"input":12,"output":6,"cacheRead":0,"cacheWrite":0,"totalTokens":18,"cost":{"input":0.0,"output":0.0,"cacheRead":0.0,"cacheWrite":0.0,"total":0.0}},"stopReason":"stop","timestamp":2}]}
+219
View File
@@ -0,0 +1,219 @@
import json
from pathlib import Path
import anyio
import pytest
from takopi.model import ActionEvent, CompletedEvent, ResumeToken, StartedEvent
from takopi.runners.pi import ENGINE, PiRunner, PiStreamState, translate_pi_event
def _load_fixture(name: str) -> list[dict]:
path = Path(__file__).parent / "fixtures" / name
return [json.loads(line) for line in path.read_text().splitlines() if line.strip()]
def test_pi_resume_format_and_extract() -> None:
runner = PiRunner(
pi_cmd="pi",
extra_args=[],
model=None,
provider=None,
session_title="pi",
session_dir=None,
)
token = ResumeToken(engine=ENGINE, value="/tmp/pi/session.jsonl")
assert runner.format_resume(token) == "`pi --session /tmp/pi/session.jsonl`"
assert runner.extract_resume("`pi --session /tmp/pi/session.jsonl`") == token
assert runner.extract_resume('pi --session "/tmp/pi/session.jsonl"') == token
assert runner.extract_resume("`codex resume sid`") is None
spaced = ResumeToken(engine=ENGINE, value="/tmp/pi session.jsonl")
assert runner.format_resume(spaced) == '`pi --session "/tmp/pi session.jsonl"`'
assert runner.extract_resume('`pi --session "/tmp/pi session.jsonl"`') == spaced
def test_translate_success_fixture() -> None:
state = PiStreamState(resume=ResumeToken(engine=ENGINE, value="session.jsonl"))
events: list = []
for event in _load_fixture("pi_stream_success.jsonl"):
events.extend(translate_pi_event(event, title="pi", meta=None, state=state))
assert isinstance(events[0], StartedEvent)
started = next(evt for evt in events if isinstance(evt, StartedEvent))
action_events = [evt for evt in events if isinstance(evt, ActionEvent)]
assert len(action_events) == 4
started_actions = {
(evt.action.id, evt.phase): evt
for evt in action_events
if evt.phase == "started"
}
assert started_actions[("tool_1", "started")].action.kind == "command"
write_action = started_actions[("tool_2", "started")].action
assert write_action.kind == "file_change"
assert write_action.detail["changes"][0]["path"] == "notes.md"
completed_actions = {
(evt.action.id, evt.phase): evt
for evt in action_events
if evt.phase == "completed"
}
assert completed_actions[("tool_1", "completed")].ok is True
assert completed_actions[("tool_2", "completed")].ok is True
completed = next(evt for evt in events if isinstance(evt, CompletedEvent))
assert events[-1] == completed
assert completed.ok is True
assert completed.resume == started.resume
assert completed.answer == "Done. Added notes.md."
def test_translate_error_fixture() -> None:
state = PiStreamState(resume=ResumeToken(engine=ENGINE, value="session.jsonl"))
events: list = []
for event in _load_fixture("pi_stream_error.jsonl"):
events.extend(translate_pi_event(event, title="pi", meta=None, state=state))
completed = next(evt for evt in events if isinstance(evt, CompletedEvent))
assert completed.ok is False
assert completed.error == "Upstream error"
assert completed.answer == "Request failed."
@pytest.mark.anyio
async def test_run_serializes_same_session() -> None:
runner = PiRunner(
pi_cmd="pi",
extra_args=[],
model=None,
provider=None,
session_title="pi",
session_dir=None,
)
gate = anyio.Event()
in_flight = 0
max_in_flight = 0
async def run_stub(*_args, **_kwargs):
nonlocal in_flight, max_in_flight
in_flight += 1
max_in_flight = max(max_in_flight, in_flight)
try:
await gate.wait()
yield CompletedEvent(
engine=ENGINE,
resume=ResumeToken(engine=ENGINE, value="session.jsonl"),
ok=True,
answer="ok",
)
finally:
in_flight -= 1
runner.run_impl = run_stub # type: ignore[assignment]
async def drain(prompt: str, resume: ResumeToken | None) -> None:
async for _event in runner.run(prompt, resume):
pass
token = ResumeToken(engine=ENGINE, value="session.jsonl")
async with anyio.create_task_group() as tg:
tg.start_soon(drain, "a", token)
tg.start_soon(drain, "b", token)
await anyio.sleep(0)
gate.set()
assert max_in_flight == 1
@pytest.mark.anyio
async def test_run_serializes_new_session_after_session_is_known(
tmp_path, monkeypatch
) -> None:
gate_path = tmp_path / "gate"
resume_marker = tmp_path / "resume_started"
pi_path = tmp_path / "pi"
pi_path.write_text(
"#!/usr/bin/env python3\n"
"import json\n"
"import os\n"
"import sys\n"
"import time\n"
"\n"
"gate = os.environ['PI_TEST_GATE']\n"
"resume_marker = os.environ['PI_TEST_RESUME_MARKER']\n"
"resume_value = os.environ.get('PI_TEST_RESUME_VALUE')\n"
"\n"
"args = sys.argv[1:]\n"
"session_path = None\n"
"if '--session' in args:\n"
" idx = args.index('--session')\n"
" if idx + 1 < len(args):\n"
" session_path = args[idx + 1]\n"
"\n"
"print(json.dumps({'type': 'agent_start'}), flush=True)\n"
"\n"
"if resume_value and session_path == resume_value:\n"
" with open(resume_marker, 'w', encoding='utf-8') as f:\n"
" f.write('started')\n"
" f.flush()\n"
" print(json.dumps({'type': 'agent_end', 'messages': []}), flush=True)\n"
" sys.exit(0)\n"
"\n"
"while not os.path.exists(gate):\n"
" time.sleep(0.001)\n"
"print(json.dumps({'type': 'agent_end', 'messages': []}), flush=True)\n"
"sys.exit(0)\n",
encoding="utf-8",
)
pi_path.chmod(0o755)
monkeypatch.setenv("PI_TEST_GATE", str(gate_path))
monkeypatch.setenv("PI_TEST_RESUME_MARKER", str(resume_marker))
runner = PiRunner(
pi_cmd=str(pi_path),
extra_args=[],
model=None,
provider=None,
session_title="pi",
session_dir=tmp_path / "sessions",
)
session_started = anyio.Event()
resume_value: str | None = None
new_done = anyio.Event()
async def run_new() -> None:
nonlocal resume_value
async for event in runner.run("hello", None):
if isinstance(event, StartedEvent):
resume_value = event.resume.value
session_started.set()
new_done.set()
async def run_resume() -> None:
assert resume_value is not None
monkeypatch.setenv("PI_TEST_RESUME_VALUE", resume_value)
async for _event in runner.run(
"resume", ResumeToken(engine=ENGINE, value=resume_value)
):
pass
async with anyio.create_task_group() as tg:
tg.start_soon(run_new)
await session_started.wait()
tg.start_soon(run_resume)
await anyio.sleep(0.01)
assert not resume_marker.exists()
gate_path.write_text("go", encoding="utf-8")
await new_done.wait()
with anyio.fail_after(2):
while not resume_marker.exists():
await anyio.sleep(0.001)