From d9c53b9e3a32cb2c962849a17db796add7bb8a40 Mon Sep 17 00:00:00 2001 From: banteg <4562643+banteg@users.noreply.github.com> Date: Fri, 2 Jan 2026 16:13:55 +0400 Subject: [PATCH] feat: add pi runner (#24) --- changelog.md | 3 +- docs/developing.md | 18 +- docs/runner/pi/pi-runner.md | 137 ++++++ docs/runner/pi/pi-stream-json-cheatsheet.md | 67 +++ docs/runner/pi/pi-takopi-events.md | 154 +++++++ docs/specification.md | 5 +- readme.md | 8 +- src/takopi/runners/pi.py | 484 ++++++++++++++++++++ tests/fixtures/pi_stream_error.jsonl | 3 + tests/fixtures/pi_stream_success.jsonl | 8 + tests/test_pi_runner.py | 219 +++++++++ 11 files changed, 1098 insertions(+), 8 deletions(-) create mode 100644 docs/runner/pi/pi-runner.md create mode 100644 docs/runner/pi/pi-stream-json-cheatsheet.md create mode 100644 docs/runner/pi/pi-takopi-events.md create mode 100644 src/takopi/runners/pi.py create mode 100644 tests/fixtures/pi_stream_error.jsonl create mode 100644 tests/fixtures/pi_stream_success.jsonl create mode 100644 tests/test_pi_runner.py diff --git a/changelog.md b/changelog.md index 8c3f36b..0dfe925 100644 --- a/changelog.md +++ b/changelog.md @@ -4,7 +4,8 @@ ### changes -- TBD +- add a pi runner via the `pi` CLI with jsonl streaming and resume support +- document the pi runner, event mapping, and jsonl stream capture tips ### fixes diff --git a/docs/developing.md b/docs/developing.md index 9f314b9..9ee5926 100644 --- a/docs/developing.md +++ b/docs/developing.md @@ -49,7 +49,7 @@ The orchestrator module containing: - `/cancel` routes by reply-to progress message id (accepts extra text) - `/{engine}` on the first line selects the engine for new threads - Progress edits are throttled to 2s intervals and only run when new events arrive -- Resume tokens are runner-formatted command lines (e.g., `` `codex resume ` ``) +- Resume tokens are runner-formatted command lines (e.g., `` `codex resume ` ``, `` `claude --resume ` ``, `` `pi --session ` ``) - Resume parsing polls all 
runners via `AutoRouter.resolve_resume()` and routes to the first match - Bot command menu is synced on startup (`cancel` + engine commands) @@ -92,6 +92,13 @@ The orchestrator module containing: - Stderr is drained into a bounded tail (debug logging only) - Translation errors abort the run; keep event normalization defensive +### `runners/pi.py` - Pi runner + +| Component | Purpose | +|-----------|---------| +| `PiRunner` | Spawns `pi --print --mode json`, streams JSONL, emits takopi events | +| `translate_pi_event()` | Normalizes Pi JSONL into the takopi event schema | + ### `model.py` / `runner.py` - Core domain types | File | Purpose | @@ -113,6 +120,8 @@ Auto-discovers runner modules in `takopi.runners` that export `BACKEND`. | File | Purpose | |------|---------| | `codex.py` | Codex runner (JSONL → takopi events) + per-resume locks | +| `claude.py` | Claude runner (JSONL → takopi events) + per-resume locks | +| `pi.py` | Pi runner (JSONL → takopi events) + per-resume locks | | `mock.py` | Mock runner for tests/demos | ### `config.py` - Configuration loading @@ -166,7 +175,7 @@ handle_message() spawned as task with selected runner Send initial progress message (silent) ↓ runner.run(prompt, resume_token) - ├── Spawns engine subprocess (e.g., codex exec --json) + ├── Spawns engine subprocess (e.g., codex exec --json, pi --print --mode json) ├── Streams JSONL from stdout ├── Normalizes JSONL -> takopi events ├── Yields Takopi events (async iterator) @@ -184,8 +193,8 @@ Send/edit final message ### Resume Flow Same as above; auto-router polls all runners to extract resume tokens: -- Router returns first matching token (e.g. `` `claude --resume ` `` routes to Claude) -- Selected runner spawns with resume (e.g. `codex exec --json resume -`) +- Router returns first matching token (e.g. `` `claude --resume ` `` routes to Claude, `` `pi --session ` `` routes to Pi) +- Selected runner spawns with resume (e.g. 
`codex exec --json resume -`, `pi --print --mode json --session `) - Per-token lock serializes concurrent resumes on the same thread ## Error Handling @@ -193,6 +202,7 @@ Same as above; auto-router polls all runners to extract resume tokens: | Scenario | Behavior | |----------|----------| | `codex exec` fails (rc != 0) | Emits a warning `action` plus `completed(ok=false, error=...)` | +| `pi` fails (rc != 0) | Emits a warning `action` plus `completed(ok=false, error=...)` | | Telegram API error | Logged, edit skipped (progress continues) | | Cancellation | Cancel scope terminates the process group (POSIX) and renders `cancelled` | | Errors in handler | Final render uses `status=error` and preserves resume tokens when known | diff --git a/docs/runner/pi/pi-runner.md b/docs/runner/pi/pi-runner.md new file mode 100644 index 0000000..3ea8052 --- /dev/null +++ b/docs/runner/pi/pi-runner.md @@ -0,0 +1,137 @@ +Below is a concrete implementation spec for adding **Pi (pi-coding-agent CLI)** as a first-class engine in Takopi (v0.4.0). + +--- + +## Scope + +### Goal + +Add a new engine backend **`pi`** so Takopi can: + +* Run Pi non-interactively via the **pi CLI** (`pi --print`). +* Stream progress by parsing **`--mode json`** (newline-delimited JSON). Each line is a JSON object. +* Support resumable sessions via **`--session `** (Takopi emits a canonical resume line the user can reply with). + +### Non-goals (v1) + +* Interactive TUI flows (session picker, prompts, etc.) +* RPC mode (requires a long-running process and JSON commands) + +--- + +## UX and behavior + +### Engine selection + +* Existing: `takopi codex` +* New: `takopi pi` + +### Resume UX (canonical line) + +Takopi appends a **single backticked** resume line at the end of the message, like: + +```text +`pi --session /home/user/.pi/agent/sessions/--repo--/2026-01-02T12-34-56-789Z_abcd.jsonl` +``` + +Notes: + +* `pi --resume/-r` opens an interactive session picker, so Takopi uses `--session ` instead. 
+* The resume token is the **session file path** (JSONL), treated as an opaque string. +* If the path contains spaces, the runner will quote it. + +### Non-interactive runs + +Use `--print` and `--mode json` for headless JSONL output. + +Pi does not accept `-- <prompt>` to protect prompts starting with `-`. Takopi prefixes a leading space if the prompt begins with `-` so it is not parsed as a flag. + +--- + +## Config additions + +Takopi config lives at either: + +* `.takopi/takopi.toml` (project-local), or +* `~/.takopi/takopi.toml` (home). + +Add a new optional `[pi]` section. + +Recommended v1 schema: + +```toml +# .takopi/takopi.toml + +default_engine = "pi" + +[pi] +cmd = "pi" # optional; defaults to "pi" +extra_args = [] # optional list of strings, appended verbatim +model = "..." # optional; passed as --model +provider = "..." # optional; passed as --provider +session_dir = "..." # optional; directory for session files +session_title = "pi" # optional; defaults to model or "pi" +``` + +Notes: + +* `extra_args` lets you pass new Pi flags without changing Takopi. +* If `session_dir` is omitted, Takopi uses Pi's default session dir: + `~/.pi/agent/sessions/--<cwd>--` (with path separators replaced by `-`). + +--- + +## Code changes (by file) + +### 1) New file: `src/takopi/runners/pi.py` + +Expose a module-level `BACKEND = EngineBackend(...)`. + +#### Runner invocation + +The runner should launch Pi in headless JSON mode: + +```text +pi --print --mode json --session <path> <prompt> +``` + +When resuming, `<path>` is the resume token extracted from the chat. + +#### Event translation + +Pi JSONL output is `AgentSessionEvent` (from `@mariozechner/pi-agent-core`). +The runner should translate: + +* `tool_execution_start` -> `action` (phase: started) +* `tool_execution_end` -> `action` (phase: completed) +* `agent_end` -> `completed` + +For the final answer, use the most recent assistant message text (from +`message_end` events).
For errors, if the assistant stopReason is `error` or +`aborted`, emit `completed(ok=false, error=...)`. + +--- + +## Installation and auth + +Install the CLI globally: + +```text +npm install -g @mariozechner/pi-coding-agent +``` + +Auth is stored under `~/.pi/agent/auth.json`. Run `pi` once interactively to +set up credentials before using Takopi. + +--- + +## Known pitfalls + +* `--resume` is interactive; Takopi uses `--session ` instead. +* Prompts that start with `-` are interpreted as flags by the CLI. Takopi + prefixes a space to make them safe. + +--- + +If you want, I can also add a sample `takopi.toml` snippet to the README or +include a small quickstart section for Pi in the onboarding panel. diff --git a/docs/runner/pi/pi-stream-json-cheatsheet.md b/docs/runner/pi/pi-stream-json-cheatsheet.md new file mode 100644 index 0000000..582655c --- /dev/null +++ b/docs/runner/pi/pi-stream-json-cheatsheet.md @@ -0,0 +1,67 @@ +# Pi `--mode json` event cheatsheet + +`pi --print --mode json` writes **one JSON object per line** (JSONL) with a +required `type` field. These are `AgentSessionEvent` objects from +`@mariozechner/pi-agent-core`. 
+ +## Top-level event lines + +### `agent_start` + +```json +{"type":"agent_start"} +``` + +### `agent_end` + +```json +{"type":"agent_end","messages":[{"role":"assistant","content":[{"type":"text","text":"Done."}],"stopReason":"stop","timestamp":123}]} +``` + +### `turn_start` / `turn_end` + +```json +{"type":"turn_start"} +``` + +```json +{"type":"turn_end","message":{...},"toolResults":[...]} +``` + +### `message_start` / `message_update` / `message_end` + +```json +{"type":"message_start","message":{"role":"assistant","content":[{"type":"text","text":"Working..."}]}} +``` + +```json +{"type":"message_update","message":{...},"assistantMessageEvent":{"type":"text_delta","delta":"...","contentIndex":0}} +``` + +```json +{"type":"message_end","message":{"role":"assistant","content":[{"type":"text","text":"Done."}],"stopReason":"stop"}} +``` + +### `tool_execution_start` + +```json +{"type":"tool_execution_start","toolCallId":"tool_1","toolName":"bash","args":{"command":"ls"}} +``` + +### `tool_execution_update` + +```json +{"type":"tool_execution_update","toolCallId":"tool_1","toolName":"bash","args":{"command":"ls"},"partialResult":{"content":[{"type":"text","text":"..."}]}} +``` + +### `tool_execution_end` + +```json +{"type":"tool_execution_end","toolCallId":"tool_1","toolName":"bash","result":{"content":[{"type":"text","text":"ok"}],"details":{}},"isError":false} +``` + +## Notes + +* `message_end` with `role = "assistant"` contains the final assistant text. +* `assistantMessageEvent` in `message_update` provides streaming deltas. +* `tool_execution_*` events map cleanly to Takopi `action` events. diff --git a/docs/runner/pi/pi-takopi-events.md b/docs/runner/pi/pi-takopi-events.md new file mode 100644 index 0000000..fe995ef --- /dev/null +++ b/docs/runner/pi/pi-takopi-events.md @@ -0,0 +1,154 @@ +# Pi -> Takopi event mapping (spec) + +This document specifies how to add a Pi runner to Takopi by translating +Pi CLI `--mode json` JSONL events into Takopi events. 
The Pi JSONL stream is +`AgentSessionEvent` from `@mariozechner/pi-agent-core`. + +The goal is to make Pi feel identical to the Codex/Claude runners from the +bridge/renderer point of view while preserving Takopi invariants (stable action +ids, per-session serialization, single completed event). + +--- + +## 1. Input stream contract (Pi CLI) + +Pi CLI emits **one JSON object per line** (JSONL) when invoked with: + +``` +pi --print --mode json +``` + +Notes: +- `--print` is required for non-interactive runs. +- `--mode json` outputs all agent events (no TUI banners). +- Pi does not support `-- <prompt>`; prompts starting with `-` must be + prefixed (Takopi does this automatically). + +--- + +## 2. Resume tokens and resume lines + +- Engine id: `pi` +- Canonical resume line (embedded in chat): + +``` +`pi --session <path>` +``` + +The token is the **session JSONL file path**. + +Why not `--resume`? +- `--resume/-r` opens an interactive session picker; it does not accept a + session token. Takopi must use `--session <path>` instead. + +--- + +## 3. Session lifecycle + serialization + +Takopi requires **serialization per session token**: + +- For new runs (`resume=None`), do **not** acquire a lock until a `started` + event is emitted (Takopi emits this as soon as the first JSON event arrives). +- Once the session is known, acquire a lock for `pi:<session_path>` and hold it + until the run completes. +- For resumed runs, acquire the lock immediately on entry. + +--- + +## 4. Event translation (Pi JSONL -> Takopi) + +Pi emits `AgentSessionEvent` objects. Only a subset is required for Takopi. + +### 4.1 `tool_execution_start` + +Example: +```json +{"type":"tool_execution_start","toolCallId":"tool_1","toolName":"bash","args":{"command":"ls"}} +``` + +Mapping: +- Emit `action` with `phase="started"`. +- `action.id = toolCallId`. +- `action.kind` from tool name (see section 5). +- `action.title` derived from tool + args.
+ +### 4.2 `tool_execution_end` + +Example: +```json +{"type":"tool_execution_end","toolCallId":"tool_1","toolName":"bash","result":{...},"isError":false} +``` + +Mapping: +- Emit `action` with `phase="completed"`. +- `ok = !isError`. +- Carry `result` and `isError` in `detail` for debugging. + +### 4.3 `message_end` (assistant) + +Pi emits message lifecycle events. For `message_end` where `message.role == "assistant"`: + +- Store the latest assistant text as the **final answer fallback**. +- If `stopReason` is `error` or `aborted`, store `errorMessage`. +- Capture `usage` for `completed.usage`. + +### 4.4 `agent_end` + +Example: +```json +{"type":"agent_end","messages":[...]} +``` + +Mapping: +- Emit a single `completed` event: + - `ok = true` unless the last assistant message has `stopReason` `error` or `aborted`. + - `answer = last assistant text` (from `message_end` or `agent_end.messages`). + - `error = errorMessage` if present. + - `resume = ResumeToken(engine="pi", value=session_path)`. + - `usage = last assistant usage`. + +### 4.5 Other events + +Ignore unknown events. If a JSONL line is malformed, emit a warning action and +continue (default `JsonlSubprocessRunner` behavior). + +--- + +## 5. Tool name -> ActionKind mapping heuristics + +Pi tool names are lower-case by default. Suggested mapping: + +| Tool name | ActionKind | Title logic | +| --- | --- | --- | +| `bash` | `command` | `args.command` | +| `edit`, `write` | `file_change` | `args.path` | +| `read` | `tool` | `read: <path>` | +| `grep` | `tool` | `grep: <pattern>` | +| `find` | `tool` | `find: <pattern>` | +| `ls` | `tool` | `ls: <path>` | +| (default) | `tool` | tool name | + +For `file_change`, include `detail.changes = [{"path": <path>, "kind": "update"}]`. + +--- + +## 6. Usage mapping + +Takopi `completed.usage` should mirror Pi's assistant `usage` object without +transformation. + +--- + +## 7. Suggested Takopi config keys + +A minimal TOML config for Pi: + +```toml +[pi] +cmd = "pi" +model = "..." +provider = "..."
+extra_args = [] +``` + +Use `extra_args` for any newer Pi CLI flags not explicitly mapped. diff --git a/docs/specification.md b/docs/specification.md index d7de39b..27533ff 100644 --- a/docs/specification.md +++ b/docs/specification.md @@ -23,7 +23,7 @@ Out of scope for v0.4.0: ## 2. Terminology -- **EngineId**: string identifier of an engine (e.g., `"codex"`). +- **EngineId**: string identifier of an engine (e.g., `"codex"`, `"claude"`, `"pi"`). - **Runner**: Takopi adapter that executes an engine process and yields **Takopi events**. - **Thread**: a single engine-side conversation, identified in Takopi by a **ResumeToken**. - **ResumeToken**: Takopi-owned thread identifier `{ engine: EngineId, value: str }`. @@ -41,6 +41,7 @@ The canonical ResumeLine embedded in chat MUST be the engine’s CLI resume comm - `codex resume ` - `claude --resume ` +- `pi --session ` Takopi MUST treat the runner as authoritative for: @@ -347,7 +348,7 @@ Decision (v0.4.0): * If an engine subcommand is provided, Takopi MUST still use the auto-router, but it overrides the configured default engine for new threads. * Resume extraction MUST poll **all** available runners (per §3.4) and route to the first matching runner. * New thread engine override (chat-level): - * Users MAY prefix the first non-empty line with `/{engine}` (e.g. `/claude` or `/codex`) to select the engine for a **new** thread. +* Users MAY prefix the first non-empty line with `/{engine}` (e.g. `/claude`, `/codex`, or `/pi`) to select the engine for a **new** thread. * The bridge MUST strip that directive from the prompt before invoking the runner. * If a ResumeToken is resolved from the message or reply, it MUST take precedence and the `/{engine}` directive MUST be ignored. diff --git a/readme.md b/readme.md index 302ffc7..7cd4e76 100644 --- a/readme.md +++ b/readme.md @@ -2,7 +2,7 @@ 🐙 *he just wants to help-pi* -telegram bridge for codex, claude code, and [other agents](docs/adding-a-runner.md). 
runs the agent cli, streams progress, and supports resumable sessions. +telegram bridge for codex, claude code, pi, and [other agents](docs/adding-a-runner.md). runs the agent cli, streams progress, and supports resumable sessions. ## features @@ -22,6 +22,7 @@ parallel runs across threads, per thread queue support. - at least one engine installed: - `codex` on PATH (`npm install -g @openai/codex` or `brew install codex`) - `claude` on PATH (`npm install -g @anthropic-ai/claude-code`) + - `pi` on PATH (`npm install -g @mariozechner/pi-coding-agent`) ## install @@ -55,6 +56,10 @@ allowed_tools = ["Bash", "Read", "Write", "WebSearch"] dangerously_skip_permissions = false # uses subscription by default, override to use api billing use_api_billing = false + +[pi] +model = "gpt-4.1" +provider = "openai" ``` ## usage @@ -66,6 +71,7 @@ cd ~/dev/your-repo takopi # or override the default engine for new threads: takopi claude +takopi pi ``` resume lines always route to the matching engine; subcommands only override the default for new threads. 
diff --git a/src/takopi/runners/pi.py b/src/takopi/runners/pi.py
new file mode 100644
index 0000000..b235ca9
--- /dev/null
+++ b/src/takopi/runners/pi.py
@@ -0,0 +1,516 @@
+from __future__ import annotations
+
+import logging
+import os
+import re
+from dataclasses import dataclass, field
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Any
+from uuid import uuid4
+
+from ..backends import EngineBackend, EngineConfig
+from ..config import ConfigError
+from ..model import (
+    Action,
+    ActionEvent,
+    ActionKind,
+    ActionLevel,
+    ActionPhase,
+    CompletedEvent,
+    EngineId,
+    ResumeToken,
+    StartedEvent,
+    TakopiEvent,
+)
+from ..runner import JsonlSubprocessRunner, ResumeTokenMixin, Runner
+from ..utils.paths import relativize_command, relativize_path
+
+logger = logging.getLogger(__name__)
+
+ENGINE: EngineId = EngineId("pi")
+STDERR_TAIL_LINES = 200
+
+# Matches a resume line (`pi --session <path>`, backticks optional); the path is
+# captured in the named group "token", which extract_resume() reads.
+_RESUME_RE = re.compile(r"(?im)^\s*`?pi\s+--session\s+(?P<token>.+?)`?\s*$")
+
+
+@dataclass(slots=True)
+class PiStreamState:
+    """Mutable per-run state that translate_pi_event() reads and updates."""
+    resume: ResumeToken
+    pending_actions: dict[str, Action] = field(default_factory=dict)
+    last_assistant_text: str | None = None
+    last_assistant_error: str | None = None
+    last_usage: dict[str, Any] | None = None
+    started: bool = False
+    note_seq: int = 0
+
+
+def _action_event(
+    *,
+    phase: ActionPhase,
+    action: Action,
+    ok: bool | None = None,
+    message: str | None = None,
+    level: ActionLevel | None = None,
+) -> ActionEvent:
+    """Build an ActionEvent for the Pi engine from the given action and phase."""
+    return ActionEvent(
+        engine=ENGINE,
+        action=action,
+        phase=phase,
+        ok=ok,
+        message=message,
+        level=level,
+    )
+
+
+def _extract_text_blocks(content: Any) -> str | None:
+    """Join the non-empty `text` items of a content list; None if nothing remains."""
+    if not isinstance(content, list):
+        return None
+    parts: list[str] = []
+    for item in content:
+        if not isinstance(item, dict):
+            continue
+        if item.get("type") != "text":
+            continue
+        text = item.get("text")
+        if isinstance(text, str) and text:
+            parts.append(text)
+    if not parts:
+        return None
+    return "".join(parts).strip() or None
+
+
+def _assistant_error(message: dict[str, Any]) -> str | None:
+    """Return an error string when stopReason is `error` or `aborted`, else None."""
+    stop_reason = message.get("stopReason")
+    if stop_reason in {"error", "aborted"}:
+        error = message.get("errorMessage")
+        if isinstance(error, str) and error:
+            return error
+        return f"pi run {stop_reason}"
+    return None
+
+
+def _tool_kind_and_title(
+    name: str,
+    args: dict[str, Any],
+) -> tuple[ActionKind, str]:
+    """Map a Pi tool name and its args to a Takopi action kind and display title."""
+    tool = name.lower()
+    if tool == "bash":
+        command = args.get("command")
+        return "command", relativize_command(str(command or "bash"))
+    if tool in {"edit", "write"}:
+        path = args.get("path")
+        if path:
+            return "file_change", relativize_path(str(path))
+        return "file_change", tool
+    if tool == "read":
+        path = args.get("path")
+        if path:
+            return "tool", f"read: `{relativize_path(str(path))}`"
+        return "tool", "read"
+    if tool == "grep":
+        pattern = args.get("pattern")
+        return "tool", f"grep: {pattern}" if pattern else "grep"
+    if tool == "find":
+        pattern = args.get("pattern")
+        return "tool", f"find: {pattern}" if pattern else "find"
+    if tool == "ls":
+        path = args.get("path")
+        if path:
+            return "tool", f"ls: `{relativize_path(str(path))}`"
+        return "tool", "ls"
+    return "tool", name
+
+
+def _last_assistant_message(messages: Any) -> dict[str, Any] | None:
+    """Return the last dict in `messages` whose role is "assistant", if any."""
+    if not isinstance(messages, list):
+        return None
+    for item in reversed(messages):
+        if isinstance(item, dict) and item.get("role") == "assistant":
+            return item
+    return None
+
+
+def translate_pi_event(
+    event: dict[str, Any],
+    *,
+    title: str,
+    meta: dict[str, Any] | None,
+    state: PiStreamState,
+) -> list[TakopiEvent]:
+    """Translate one Pi JSONL event into zero or more Takopi events.
+
+    The first call for a given `state` also emits a StartedEvent. Tool execution
+    events become action events, assistant `message_end` events only update
+    `state`, and `agent_end` emits the CompletedEvent. Other types add nothing.
+    """
+    out: list[TakopiEvent] = []
+    if not state.started:
+        out.append(
+            StartedEvent(
+                engine=ENGINE,
+                resume=state.resume,
+                title=title,
+                meta=meta or None,
+            )
+        )
+        state.started = True
+
+    etype = event.get("type")
+
+    if etype == "tool_execution_start":
+        tool_id = event.get("toolCallId")
+        tool_name = event.get("toolName")
+        args = event.get("args") or {}
+        if not isinstance(args, dict):
+            args = {}
+        if isinstance(tool_id, str) and tool_id:
+            name = str(tool_name or "tool")
+            kind, title_str = _tool_kind_and_title(name, args)
+            detail: dict[str, Any] = {"tool_name": name, "args": args}
+            if kind == "file_change":
+                path = args.get("path")
+                if path:
+                    detail["changes"] = [{"path": str(path), "kind": "update"}]
+            action = Action(id=tool_id, kind=kind, title=title_str, detail=detail)
+            state.pending_actions[action.id] = action
+            out.append(_action_event(phase="started", action=action))
+        return out
+
+    if etype == "tool_execution_end":
+        tool_id = event.get("toolCallId")
+        tool_name = event.get("toolName")
+        if isinstance(tool_id, str) and tool_id:
+            action = state.pending_actions.pop(tool_id, None)
+            name = str(tool_name or "tool")
+            if action is None:
+                action = Action(id=tool_id, kind="tool", title=name, detail={})
+            detail = dict(action.detail)
+            detail["result"] = event.get("result")
+            detail["is_error"] = event.get("isError")
+            is_error = event.get("isError") is True
+            out.append(
+                _action_event(
+                    phase="completed",
+                    action=Action(
+                        id=action.id,
+                        kind=action.kind,
+                        title=action.title,
+                        detail=detail,
+                    ),
+                    ok=not is_error,
+                )
+            )
+        return out
+
+    if etype == "message_end":
+        message = event.get("message")
+        if isinstance(message, dict) and message.get("role") == "assistant":
+            text = _extract_text_blocks(message.get("content"))
+            if text:
+                state.last_assistant_text = text
+            usage = message.get("usage")
+            if isinstance(usage, dict):
+                state.last_usage = usage
+            error = _assistant_error(message)
+            if error:
+                state.last_assistant_error = error
+        return out
+
+    if etype == "agent_end":
+        assistant = _last_assistant_message(event.get("messages"))
+        if assistant:
+            text = _extract_text_blocks(assistant.get("content"))
+            if text:
+                state.last_assistant_text = text
+            usage = assistant.get("usage")
+            if isinstance(usage, dict):
+                state.last_usage = usage
+            error = _assistant_error(assistant)
+            if error:
+                state.last_assistant_error = error
+
+        ok = state.last_assistant_error is None
+        error = state.last_assistant_error
+        answer = state.last_assistant_text or ""
+
+        out.append(
+            CompletedEvent(
+                engine=ENGINE,
+                ok=ok,
+                answer=answer,
+                resume=state.resume,
+                error=error,
+                usage=state.last_usage,
+            )
+        )
+        return out
+
+    return out
+
+
+class PiRunner(ResumeTokenMixin, JsonlSubprocessRunner):
+    """Runner for the `pi` CLI: builds its arguments and translates its JSONL events."""
+    engine: EngineId = ENGINE
+    resume_re: re.Pattern[str] = _RESUME_RE
+    stderr_tail_lines = STDERR_TAIL_LINES
+    logger = logger
+
+    def __init__(
+        self,
+        *,
+        pi_cmd: str,
+        extra_args: list[str],
+        model: str | None,
+        provider: str | None,
+        session_title: str,
+        session_dir: Path | None,
+    ) -> None:
+        self.pi_cmd = pi_cmd
+        self.extra_args = extra_args
+        self.model = model
+        self.provider = provider
+        self.session_title = session_title
+        self.session_dir = session_dir
+
+    def format_resume(self, token: ResumeToken) -> str:
+        """Render `token` as a backticked `pi --session ...` resume line."""
+        if token.engine != ENGINE:
+            raise RuntimeError(f"resume token is for engine {token.engine!r}")
+        return f"`pi --session {self._quote_token(token.value)}`"
+
+    def extract_resume(self, text: str | None) -> ResumeToken | None:
+        """Return the token from the last `pi --session` line in `text`, unquoted."""
+        if not text:
+            return None
+        found: str | None = None
+        for match in self.resume_re.finditer(text):
+            token = match.group("token")
+            if not token:
+                continue
+            token = token.strip()
+            if len(token) >= 2 and token[0] == token[-1] and token[0] in {'"', "'"}:
+                token = token[1:-1]
+            found = token
+        if not found:
+            return None
+        return ResumeToken(engine=self.engine, value=found)
+
+    def command(self) -> str:
+        """Return the configured executable name."""
+        return self.pi_cmd
+
+    def build_args(
+        self,
+        prompt: str,
+        resume: ResumeToken | None,
+        *,
+        state: PiStreamState,
+    ) -> list[str]:
+        """Build the CLI arguments; the session path always comes from `state`."""
+        _ = resume
+        args: list[str] = [*self.extra_args, "--print", "--mode", "json"]
+        if self.provider:
+            args.extend(["--provider", self.provider])
+        if self.model:
+            args.extend(["--model", self.model])
+        args.extend(["--session", state.resume.value])
+        args.append(self._sanitize_prompt(prompt))
+        return args
+
+    def stdin_payload(
+        self,
+        prompt: str,
+        resume: ResumeToken | None,
+        *,
+        state: PiStreamState,
+    ) -> bytes | None:
+        """Return None: the prompt is passed as an argument, not on stdin."""
+        _ = prompt, resume, state
+        return None
+
+    def env(self, *, state: PiStreamState) -> dict[str, str] | None:
+        """Copy the process environment, defaulting NO_COLOR and CI to "1"."""
+        _ = state
+        env = dict(os.environ)
+        env.setdefault("NO_COLOR", "1")
+        env.setdefault("CI", "1")
+        return env
+
+    def new_state(self, prompt: str, resume: ResumeToken | None) -> PiStreamState:
+        """Create run state, generating a fresh session path when not resuming."""
+        _ = prompt
+        if resume is None:
+            session_path = self._new_session_path()
+            token = ResumeToken(engine=ENGINE, value=session_path)
+        else:
+            token = resume
+        return PiStreamState(resume=token)
+
+    def translate(
+        self,
+        data: dict[str, Any],
+        *,
+        state: PiStreamState,
+        resume: ResumeToken | None,
+        found_session: ResumeToken | None,
+    ) -> list[TakopiEvent]:
+        """Translate one decoded JSONL object, attaching cwd/model/provider as meta."""
+        _ = resume, found_session
+        meta: dict[str, Any] = {"cwd": os.getcwd()}
+        if self.model:
+            meta["model"] = self.model
+        if self.provider:
+            meta["provider"] = self.provider
+        return translate_pi_event(
+            data,
+            title=self.session_title,
+            meta=meta or None,
+            state=state,
+        )
+
+    def process_error_events(
+        self,
+        rc: int,
+        *,
+        resume: ResumeToken | None,
+        found_session: ResumeToken | None,
+        stderr_tail: str,
+        state: PiStreamState,
+    ) -> list[TakopiEvent]:
+        """Emit a note and a failed CompletedEvent reporting the exit code `rc`."""
+        message = f"pi failed (rc={rc})."
+        resume_for_completed = found_session or resume or state.resume
+        return [
+            self.note_event(message, state=state, detail={"stderr_tail": stderr_tail}),
+            CompletedEvent(
+                engine=ENGINE,
+                ok=False,
+                answer=state.last_assistant_text or "",
+                resume=resume_for_completed,
+                error=message,
+                usage=state.last_usage,
+            ),
+        ]
+
+    def stream_end_events(
+        self,
+        *,
+        resume: ResumeToken | None,
+        found_session: ResumeToken | None,
+        stderr_tail: str,
+        state: PiStreamState,
+    ) -> list[TakopiEvent]:
+        """Emit a failed CompletedEvent saying no `agent_end` event was seen."""
+        _ = stderr_tail
+        resume_for_completed = found_session or resume or state.resume
+        message = "pi finished without an agent_end event"
+        return [
+            CompletedEvent(
+                engine=ENGINE,
+                ok=False,
+                answer=state.last_assistant_text or "",
+                resume=resume_for_completed,
+                error=message,
+                usage=state.last_usage,
+            )
+        ]
+
+    def _new_session_path(self) -> str:
+        """Create the session directory if needed and return a new .jsonl path in it."""
+        session_dir = self.session_dir or _default_session_dir(Path.cwd())
+        session_dir.mkdir(parents=True, exist_ok=True)
+        timestamp = datetime.now(timezone.utc).isoformat()
+        safe_timestamp = timestamp.replace(":", "-").replace(".", "-")
+        token = uuid4().hex
+        filename = f"{safe_timestamp}_{token}.jsonl"
+        return str(session_dir / filename)
+
+    def _sanitize_prompt(self, prompt: str) -> str:
+        """Prefix a space to prompts starting with `-` so they are not read as flags."""
+        if prompt.startswith("-"):
+            return f" {prompt}"
+        return prompt
+
+    def _quote_token(self, token: str) -> str:
+        """Double-quote `token` if it contains whitespace or a double quote."""
+        if not token:
+            return token
+        needs_quotes = any(ch.isspace() for ch in token)
+        if not needs_quotes and '"' not in token:
+            return token
+        escaped = token.replace('"', '\\"')
+        return f'"{escaped}"'
+
+
+def _default_session_dir(cwd: Path) -> Path:
+    """Return `<agent dir>/sessions/--<sanitized cwd>--`, honoring PI_CODING_AGENT_DIR."""
+    agent_dir = os.environ.get("PI_CODING_AGENT_DIR")
+    base = Path(agent_dir).expanduser() if agent_dir else Path.home() / ".pi" / "agent"
+    # Strip leading separators, then map every `/`, `\` and `:` to `-`. Done outside
+    # an f-string so single backslashes are handled and Python < 3.12 can parse it.
+    safe_path = "--" + re.sub(r"[/\\:]", "-", str(cwd).lstrip("/\\")) + "--"
+    return base / "sessions" / safe_path
+
+
+def build_runner(config: EngineConfig, config_path: Path) -> Runner:
+    """Validate the `pi.*` config values and build a PiRunner from them."""
+    cmd = config.get("cmd") or "pi"
+    if not isinstance(cmd, str):
+        raise ConfigError(f"Invalid `pi.cmd` in {config_path}; expected a string.")
+
+    extra_args_value = config.get("extra_args")
+    if extra_args_value is None:
+        extra_args = []
+    elif isinstance(extra_args_value, list) and all(
+        isinstance(x, str) for x in extra_args_value
+    ):
+        extra_args = list(extra_args_value)
+    else:
+        raise ConfigError(
+            f"Invalid `pi.extra_args` in {config_path}; expected a list of strings."
+        )
+
+    model = config.get("model")
+    if model is not None and not isinstance(model, str):
+        raise ConfigError(f"Invalid `pi.model` in {config_path}; expected a string.")
+
+    provider = config.get("provider")
+    if provider is not None and not isinstance(provider, str):
+        raise ConfigError(f"Invalid `pi.provider` in {config_path}; expected a string.")
+
+    session_dir_value = config.get("session_dir")
+    session_dir: Path | None = None
+    if session_dir_value is not None:
+        if not isinstance(session_dir_value, str):
+            raise ConfigError(
+                f"Invalid `pi.session_dir` in {config_path}; expected a string."
+            )
+        session_dir = Path(session_dir_value).expanduser()
+
+    title = str(config.get("session_title") or (model if model else "pi"))
+
+    return PiRunner(
+        pi_cmd=cmd,
+        extra_args=extra_args,
+        model=model,
+        provider=provider,
+        session_title=title,
+        session_dir=session_dir,
+    )
+
+
+BACKEND = EngineBackend(
+    id="pi",
+    build_runner=build_runner,
+    cli_cmd="pi",
+    install_cmd="npm install -g @mariozechner/pi-coding-agent",
+)
diff --git a/tests/fixtures/pi_stream_error.jsonl b/tests/fixtures/pi_stream_error.jsonl
new file mode 100644
index 0000000..9e96d87
--- /dev/null
+++ b/tests/fixtures/pi_stream_error.jsonl
@@ -0,0 +1,3 @@
+{"type":"agent_start"}
+{"type":"message_end","message":{"role":"assistant","content":[{"type":"text","text":"Request failed."}],"api":"openai-responses","provider":"openai","model":"gpt-4o-mini","usage":{"input":5,"output":1,"cacheRead":0,"cacheWrite":0,"totalTokens":6,"cost":{"input":0.0,"output":0.0,"cacheRead":0.0,"cacheWrite":0.0,"total":0.0}},"stopReason":"error","errorMessage":"Upstream error","timestamp":1}}
+{"type":"agent_end","messages":[{"role":"assistant","content":[{"type":"text","text":"Request failed."}],"api":"openai-responses","provider":"openai","model":"gpt-4o-mini","usage":{"input":5,"output":1,"cacheRead":0,"cacheWrite":0,"totalTokens":6,"cost":{"input":0.0,"output":0.0,"cacheRead":0.0,"cacheWrite":0.0,"total":0.0}},"stopReason":"error","errorMessage":"Upstream error","timestamp":1}]}
diff --git a/tests/fixtures/pi_stream_success.jsonl b/tests/fixtures/pi_stream_success.jsonl
new file mode 100644
index 0000000..3da2a53
--- /dev/null
+++ b/tests/fixtures/pi_stream_success.jsonl
@@ -0,0 +1,8 @@
+{"type":"agent_start"}
+{"type":"message_end","message":{"role":"assistant","content":[{"type":"toolCall","id":"tool_1","name":"bash","arguments":{"command":"ls
-la"}}],"api":"openai-responses","provider":"openai","model":"gpt-4o-mini","usage":{"input":10,"output":4,"cacheRead":0,"cacheWrite":0,"totalTokens":14,"cost":{"input":0.0,"output":0.0,"cacheRead":0.0,"cacheWrite":0.0,"total":0.0}},"stopReason":"toolUse","timestamp":1}} +{"type":"tool_execution_start","toolCallId":"tool_1","toolName":"bash","args":{"command":"ls -la"}} +{"type":"tool_execution_end","toolCallId":"tool_1","toolName":"bash","result":{"content":[{"type":"text","text":"ok"}],"details":{"exit_code":0}},"isError":false} +{"type":"tool_execution_start","toolCallId":"tool_2","toolName":"write","args":{"path":"notes.md","content":"hello"}} +{"type":"tool_execution_end","toolCallId":"tool_2","toolName":"write","result":{"content":[{"type":"text","text":"done"}],"details":{}},"isError":false} +{"type":"message_end","message":{"role":"assistant","content":[{"type":"text","text":"Done. Added notes.md."}],"api":"openai-responses","provider":"openai","model":"gpt-4o-mini","usage":{"input":12,"output":6,"cacheRead":0,"cacheWrite":0,"totalTokens":18,"cost":{"input":0.0,"output":0.0,"cacheRead":0.0,"cacheWrite":0.0,"total":0.0}},"stopReason":"stop","timestamp":2}} +{"type":"agent_end","messages":[{"role":"assistant","content":[{"type":"text","text":"Done. 
Added notes.md."}],"api":"openai-responses","provider":"openai","model":"gpt-4o-mini","usage":{"input":12,"output":6,"cacheRead":0,"cacheWrite":0,"totalTokens":18,"cost":{"input":0.0,"output":0.0,"cacheRead":0.0,"cacheWrite":0.0,"total":0.0}},"stopReason":"stop","timestamp":2}]} diff --git a/tests/test_pi_runner.py b/tests/test_pi_runner.py new file mode 100644 index 0000000..9280934 --- /dev/null +++ b/tests/test_pi_runner.py @@ -0,0 +1,219 @@ +import json +from pathlib import Path + +import anyio +import pytest + +from takopi.model import ActionEvent, CompletedEvent, ResumeToken, StartedEvent +from takopi.runners.pi import ENGINE, PiRunner, PiStreamState, translate_pi_event + + +def _load_fixture(name: str) -> list[dict]: + path = Path(__file__).parent / "fixtures" / name + return [json.loads(line) for line in path.read_text().splitlines() if line.strip()] + + +def test_pi_resume_format_and_extract() -> None: + runner = PiRunner( + pi_cmd="pi", + extra_args=[], + model=None, + provider=None, + session_title="pi", + session_dir=None, + ) + token = ResumeToken(engine=ENGINE, value="/tmp/pi/session.jsonl") + + assert runner.format_resume(token) == "`pi --session /tmp/pi/session.jsonl`" + assert runner.extract_resume("`pi --session /tmp/pi/session.jsonl`") == token + assert runner.extract_resume('pi --session "/tmp/pi/session.jsonl"') == token + assert runner.extract_resume("`codex resume sid`") is None + + spaced = ResumeToken(engine=ENGINE, value="/tmp/pi session.jsonl") + assert runner.format_resume(spaced) == '`pi --session "/tmp/pi session.jsonl"`' + assert runner.extract_resume('`pi --session "/tmp/pi session.jsonl"`') == spaced + + +def test_translate_success_fixture() -> None: + state = PiStreamState(resume=ResumeToken(engine=ENGINE, value="session.jsonl")) + events: list = [] + for event in _load_fixture("pi_stream_success.jsonl"): + events.extend(translate_pi_event(event, title="pi", meta=None, state=state)) + + assert isinstance(events[0], StartedEvent) + 
started = next(evt for evt in events if isinstance(evt, StartedEvent)) + + action_events = [evt for evt in events if isinstance(evt, ActionEvent)] + assert len(action_events) == 4 + + started_actions = { + (evt.action.id, evt.phase): evt + for evt in action_events + if evt.phase == "started" + } + assert started_actions[("tool_1", "started")].action.kind == "command" + write_action = started_actions[("tool_2", "started")].action + assert write_action.kind == "file_change" + assert write_action.detail["changes"][0]["path"] == "notes.md" + + completed_actions = { + (evt.action.id, evt.phase): evt + for evt in action_events + if evt.phase == "completed" + } + assert completed_actions[("tool_1", "completed")].ok is True + assert completed_actions[("tool_2", "completed")].ok is True + + completed = next(evt for evt in events if isinstance(evt, CompletedEvent)) + assert events[-1] == completed + assert completed.ok is True + assert completed.resume == started.resume + assert completed.answer == "Done. Added notes.md." + + +def test_translate_error_fixture() -> None: + state = PiStreamState(resume=ResumeToken(engine=ENGINE, value="session.jsonl")) + events: list = [] + for event in _load_fixture("pi_stream_error.jsonl"): + events.extend(translate_pi_event(event, title="pi", meta=None, state=state)) + + completed = next(evt for evt in events if isinstance(evt, CompletedEvent)) + assert completed.ok is False + assert completed.error == "Upstream error" + assert completed.answer == "Request failed." 
@pytest.mark.anyio
async def test_run_serializes_same_session() -> None:
    """Check that two concurrent runs resuming one session never overlap.

    ``runner.run_impl`` is swapped for a stub that counts how many invocations
    are inside it at the same time and then blocks on ``gate``. Both runs are
    started with the same resume token; once the gate is released and both
    have drained, the recorded peak concurrency must be exactly one.
    """
    runner = PiRunner(
        pi_cmd="pi",
        extra_args=[],
        model=None,
        provider=None,
        session_title="pi",
        session_dir=None,
    )
    gate = anyio.Event()
    in_flight = 0
    max_in_flight = 0

    async def run_stub(*_args, **_kwargs):
        # Track the peak number of simultaneous entries into the stub.
        nonlocal in_flight, max_in_flight
        in_flight += 1
        max_in_flight = max(max_in_flight, in_flight)
        try:
            # Hold the "run" open until the test releases the gate.
            await gate.wait()
            yield CompletedEvent(
                engine=ENGINE,
                resume=ResumeToken(engine=ENGINE, value="session.jsonl"),
                ok=True,
                answer="ok",
            )
        finally:
            in_flight -= 1

    runner.run_impl = run_stub  # type: ignore[assignment]

    async def drain(prompt: str, resume: ResumeToken | None) -> None:
        # Consume the event stream to completion; the events are not inspected.
        async for _event in runner.run(prompt, resume):
            pass

    token = ResumeToken(engine=ENGINE, value="session.jsonl")
    async with anyio.create_task_group() as tg:
        tg.start_soon(drain, "a", token)
        tg.start_soon(drain, "b", token)
        # Yield once so the scheduled tasks get a chance to start.
        await anyio.sleep(0)
        gate.set()
    # Checked after the task group exits, i.e. after both drains finished.
    assert max_in_flight == 1


@pytest.mark.anyio
async def test_run_serializes_new_session_after_session_is_known(
    tmp_path, monkeypatch
) -> None:
    """Check that a resume of a just-started session waits for the first run.

    A fake ``pi`` executable is written to ``tmp_path``. It always prints an
    ``agent_start`` record, then:

    * if it was invoked with ``--session`` equal to ``PI_TEST_RESUME_VALUE``,
      it writes the resume-marker file, prints ``agent_end`` and exits;
    * otherwise it polls until the gate file exists, then prints ``agent_end``.

    The test starts a new run, captures the resume value from its
    ``StartedEvent``, schedules a resume run for that value, and asserts the
    marker file does not exist while the first run is still blocked on the
    gate. After the gate file is created and the first run finishes, the
    marker must appear within two seconds.
    """
    gate_path = tmp_path / "gate"
    resume_marker = tmp_path / "resume_started"

    pi_path = tmp_path / "pi"
    # The stub is a standalone Python script, so the block indentation inside
    # these literals is significant and must be valid Python.
    pi_path.write_text(
        "#!/usr/bin/env python3\n"
        "import json\n"
        "import os\n"
        "import sys\n"
        "import time\n"
        "\n"
        "gate = os.environ['PI_TEST_GATE']\n"
        "resume_marker = os.environ['PI_TEST_RESUME_MARKER']\n"
        "resume_value = os.environ.get('PI_TEST_RESUME_VALUE')\n"
        "\n"
        "args = sys.argv[1:]\n"
        "session_path = None\n"
        "if '--session' in args:\n"
        "    idx = args.index('--session')\n"
        "    if idx + 1 < len(args):\n"
        "        session_path = args[idx + 1]\n"
        "\n"
        "print(json.dumps({'type': 'agent_start'}), flush=True)\n"
        "\n"
        "if resume_value and session_path == resume_value:\n"
        "    with open(resume_marker, 'w', encoding='utf-8') as f:\n"
        "        f.write('started')\n"
        "        f.flush()\n"
        "    print(json.dumps({'type': 'agent_end', 'messages': []}), flush=True)\n"
        "    sys.exit(0)\n"
        "\n"
        "while not os.path.exists(gate):\n"
        "    time.sleep(0.001)\n"
        "print(json.dumps({'type': 'agent_end', 'messages': []}), flush=True)\n"
        "sys.exit(0)\n",
        encoding="utf-8",
    )
    pi_path.chmod(0o755)

    monkeypatch.setenv("PI_TEST_GATE", str(gate_path))
    monkeypatch.setenv("PI_TEST_RESUME_MARKER", str(resume_marker))

    runner = PiRunner(
        pi_cmd=str(pi_path),
        extra_args=[],
        model=None,
        provider=None,
        session_title="pi",
        session_dir=tmp_path / "sessions",
    )

    session_started = anyio.Event()
    resume_value: str | None = None
    new_done = anyio.Event()

    async def run_new() -> None:
        nonlocal resume_value
        async for event in runner.run("hello", None):
            if isinstance(event, StartedEvent):
                # Publish the session identity as soon as the runner reports it.
                resume_value = event.resume.value
                session_started.set()
        new_done.set()

    async def run_resume() -> None:
        assert resume_value is not None
        # Set only now, so the already-running first stub process (which read
        # its environment at spawn time) keeps waiting on the gate.
        monkeypatch.setenv("PI_TEST_RESUME_VALUE", resume_value)
        async for _event in runner.run(
            "resume", ResumeToken(engine=ENGINE, value=resume_value)
        ):
            pass

    async with anyio.create_task_group() as tg:
        tg.start_soon(run_new)
        await session_started.wait()

        tg.start_soon(run_resume)
        # Give the resume run time to (incorrectly) start if it is not blocked.
        await anyio.sleep(0.01)

        assert not resume_marker.exists()

        # Release the first run; only afterwards may the resume run proceed.
        gate_path.write_text("go", encoding="utf-8")
        await new_done.wait()

        with anyio.fail_after(2):
            while not resume_marker.exists():
                await anyio.sleep(0.001)