docs: improve documentation coverage (#52)

This commit is contained in:
banteg
2026-01-04 19:00:27 +04:00
committed by GitHub
parent 465dda20a2
commit c64913ed6d
11 changed files with 197 additions and 219 deletions
+62 -62
View File
@@ -9,7 +9,7 @@ Takopi is designed so that adding a runner usually means **adding one new module
`src/takopi/runners/` plus a small **msgspec schema** module under `src/takopi/schemas/`
no changes to the bridge, renderer, or CLI.
The walkthrough below uses an **imaginary engine** named **Pi** (`pi`) and intentionally mirrors
The walkthrough below uses an **imaginary engine** named **Acme** (`acme`) and intentionally mirrors
the patterns used in `runners/claude.py`.
---
@@ -18,8 +18,8 @@ the patterns used in `runners/claude.py`.
After you add a runner, you should be able to:
- Run `takopi pi` (CLI subcommand is auto-registered).
- Start a new session and get a resume line like `` `pi --resume <token>` ``.
- Run `takopi acme` (CLI subcommand is auto-registered).
- Start a new session and get a resume line like `` `acme --resume <token>` ``.
- Reply to any bot message containing that resume line and continue the same session.
- See progress updates (optional) and always get a final completion event.
@@ -64,20 +64,20 @@ This matters because Takopis Telegram truncation logic preserves resume lines
---
## Step-by-step: add the imaginary `pi` runner
## Step-by-step: add the imaginary `acme` runner
### Step 1 — Pick an engine id + resume command
Choose a stable engine id string. This string becomes:
- The config table name (`[pi]` in `takopi.toml`)
- The CLI subcommand (`takopi pi`)
- The config table name (`[acme]` in `takopi.toml`)
- The CLI subcommand (`takopi acme`)
- The `ResumeToken.engine`
For Pi well use:
For Acme well use:
- Engine id: `"pi"`
- Canonical resume command embedded in chat: `` `pi --resume <token>` ``
- Engine id: `"acme"`
- Canonical resume command embedded in chat: `` `acme --resume <token>` ``
#### Write a resume regex
@@ -86,7 +86,7 @@ match full line, and capture a group named `token`.
```py
_RESUME_RE = re.compile(
r"(?im)^\s*`?pi\s+--resume\s+(?P<token>[^`\s]+)`?\s*$"
r"(?im)^\s*`?acme\s+--resume\s+(?P<token>[^`\s]+)`?\s*$"
)
```
@@ -98,20 +98,20 @@ Why this shape?
---
### Step 2 — Create `src/takopi/schemas/pi.py` + `src/takopi/runners/pi.py`
### Step 2 — Create `src/takopi/schemas/acme.py` + `src/takopi/runners/acme.py`
Create a new schema module and a runner module:
```
src/takopi/schemas/
codex.py
pi.py # ← new
acme.py # ← new
src/takopi/runners/
codex.py
claude.py
mock.py
pi.py # ← new
acme.py # ← new
```
Takopi discovers engines by importing modules in `takopi.runners` and looking for a
@@ -119,7 +119,7 @@ module-level `BACKEND: EngineBackend` (see `takopi.engines`).
---
### Step 3 — Translate Pi JSONL into Takopi events
### Step 3 — Translate Acme JSONL into Takopi events
Most CLIs we integrate are JSONL-streaming processes.
@@ -149,7 +149,7 @@ from dataclasses import dataclass, field
from ..events import EventFactory
@dataclass
class PiStreamState:
class AcmeStreamState:
factory: EventFactory = field(default_factory=lambda: EventFactory(ENGINE))
pending_actions: dict[str, Action] = field(default_factory=dict)
last_assistant_text: str | None = None
@@ -196,31 +196,31 @@ class Final(msgspec.Struct, tag="final", kw_only=True):
error: str | None = None
PiEvent: TypeAlias = SessionStart | ToolUse | ToolResult | Final
AcmeEvent: TypeAlias = SessionStart | ToolUse | ToolResult | Final
_DECODER = msgspec.json.Decoder(PiEvent)
_DECODER = msgspec.json.Decoder(AcmeEvent)
def decode_event(data: bytes | str) -> PiEvent:
def decode_event(data: bytes | str) -> AcmeEvent:
return _DECODER.decode(data)
```
#### Decide what Pi emits
#### Decide what Acme emits
For this guide, assume Pi outputs events like:
For this guide, assume Acme outputs events like:
```json
{"type":"session.start","session_id":"pi_01","model":"pi-large"}
{"type":"session.start","session_id":"acme_01","model":"acme-large"}
{"type":"tool.use","id":"toolu_1","name":"Bash","input":{"command":"ls"}}
{"type":"tool.result","tool_use_id":"toolu_1","content":"ok","is_error":false}
{"type":"final","session_id":"pi_01","ok":true,"answer":"Done."}
{"type":"final","session_id":"acme_01","ok":true,"answer":"Done."}
```
#### Map them to Takopi events
Use this mapping (mirrors Claudes approach):
- `session.start``StartedEvent(engine="pi", resume=ResumeToken("pi", session_id))`
- `session.start``StartedEvent(engine="acme", resume=ResumeToken("acme", session_id))`
- `tool.use``ActionEvent(phase="started")` and stash action in `pending_actions`
- `tool.result``ActionEvent(phase="completed", ok=...)` and pop from `pending_actions`
- `final``CompletedEvent(ok, answer, resume)`
@@ -232,26 +232,26 @@ Use this mapping (mirrors Claudes approach):
Claude keeps translation logic in a standalone function (`translate_claude_event(...)`).
This makes it easy to unit test without spawning a subprocess.
Do the same for Pi. Use pattern matching against msgspec shapes, and rely on the
Do the same for Acme. Use pattern matching against msgspec shapes, and rely on the
`EventFactory` (as in Codex/Claude) to standardize event creation:
```py
def translate_pi_event(
event: pi_schema.PiEvent,
def translate_acme_event(
event: acme_schema.AcmeEvent,
*,
title: str,
state: PiStreamState,
state: AcmeStreamState,
factory: EventFactory,
) -> list[TakopiEvent]:
match event:
case pi_schema.SessionStart(session_id=session_id, model=model):
case acme_schema.SessionStart(session_id=session_id, model=model):
if not session_id:
return []
event_title = str(model) if model else title
token = ResumeToken(engine=ENGINE, value=session_id)
return [factory.started(token, title=event_title)]
case pi_schema.ToolUse(id=tool_id, name=name, input=tool_input):
case acme_schema.ToolUse(id=tool_id, name=name, input=tool_input):
if not tool_id:
return []
tool_input = tool_input or {}
@@ -281,7 +281,7 @@ def translate_pi_event(
)
]
case pi_schema.ToolResult(
case acme_schema.ToolResult(
tool_use_id=tool_use_id, content=content, is_error=is_error
):
if not tool_use_id:
@@ -315,7 +315,7 @@ def translate_pi_event(
)
]
case pi_schema.Final(session_id=session_id, ok=ok, answer=answer, error=error):
case acme_schema.Final(session_id=session_id, ok=ok, answer=answer, error=error):
answer = answer or ""
if ok and not answer and state.last_assistant_text:
answer = state.last_assistant_text
@@ -327,7 +327,7 @@ def translate_pi_event(
if ok:
return [factory.completed_ok(answer=answer, resume=resume)]
error_text = str(error) if error else "pi run failed"
error_text = str(error) if error else "acme run failed"
return [
factory.completed_error(
error=error_text,
@@ -349,7 +349,7 @@ This is intentionally close to Claudes structure:
---
### Step 4 — Implement the `PiRunner` class
### Step 4 — Implement the `AcmeRunner` class
Most engines can implement a runner by combining:
@@ -383,35 +383,35 @@ from ..model import (
)
from ..runner import JsonlSubprocessRunner, ResumeTokenMixin, Runner
from ..schemas import pi as pi_schema
from ..schemas import acme as acme_schema
logger = logging.getLogger(__name__)
ENGINE: EngineId = EngineId("pi")
ENGINE: EngineId = EngineId("acme")
_RESUME_RE = re.compile(
r"(?im)^\s*`?pi\s+--resume\s+(?P<token>[^`\s]+)`?\s*$"
r"(?im)^\s*`?acme\s+--resume\s+(?P<token>[^`\s]+)`?\s*$"
)
@dataclass
class PiRunner(ResumeTokenMixin, JsonlSubprocessRunner):
class AcmeRunner(ResumeTokenMixin, JsonlSubprocessRunner):
engine: EngineId = ENGINE
resume_re: re.Pattern[str] = _RESUME_RE
pi_cmd: str = "pi"
acme_cmd: str = "acme"
model: str | None = None
allowed_tools: list[str] | None = None
session_title: str = "pi"
session_title: str = "acme"
logger = logger
def format_resume(self, token: ResumeToken) -> str:
# Override because our canonical resume command is "pi --resume ...".
# Override because our canonical resume command is "acme --resume ...".
if token.engine != ENGINE:
raise RuntimeError(f"resume token is for engine {token.engine!r}")
return f"`pi --resume {token.value}`"
return f"`acme --resume {token.value}`"
def command(self) -> str:
return self.pi_cmd
return self.acme_cmd
def build_args(
self,
@@ -438,33 +438,33 @@ class PiRunner(ResumeTokenMixin, JsonlSubprocessRunner):
state: Any,
) -> bytes | None:
_ = resume, state
# Pi reads the prompt from stdin.
# Acme reads the prompt from stdin.
return prompt.encode()
def new_state(self, prompt: str, resume: ResumeToken | None) -> PiStreamState:
def new_state(self, prompt: str, resume: ResumeToken | None) -> AcmeStreamState:
_ = prompt, resume
return PiStreamState()
return AcmeStreamState()
def decode_jsonl(
self,
*,
raw: bytes,
line: bytes,
state: PiStreamState,
) -> pi_schema.PiEvent | None:
state: AcmeStreamState,
) -> acme_schema.AcmeEvent | None:
_ = raw, state
return pi_schema.decode_event(line)
return acme_schema.decode_event(line)
def translate(
self,
data: pi_schema.PiEvent,
data: acme_schema.AcmeEvent,
*,
state: PiStreamState,
state: AcmeStreamState,
resume: ResumeToken | None,
found_session: ResumeToken | None,
) -> list[TakopiEvent]:
_ = resume, found_session
return translate_pi_event(
return translate_acme_event(
data,
title=self.session_title,
state=state,
@@ -501,15 +501,15 @@ Follow the pattern in `runners/claude.py`:
```py
def build_runner(config: EngineConfig, _config_path: Path) -> Runner:
pi_cmd = "pi"
acme_cmd = "acme"
model = config.get("model")
allowed_tools = config.get("allowed_tools")
title = str(model) if model is not None else "pi"
title = str(model) if model is not None else "acme"
return PiRunner(
pi_cmd=pi_cmd,
return AcmeRunner(
acme_cmd=acme_cmd,
model=model,
allowed_tools=allowed_tools,
session_title=title,
@@ -517,9 +517,9 @@ def build_runner(config: EngineConfig, _config_path: Path) -> Runner:
BACKEND = EngineBackend(
id="pi",
id="acme",
build_runner=build_runner,
install_cmd="npm install -g @acme/pi-cli",
install_cmd="npm install -g @acme/acme-cli",
)
```
@@ -530,7 +530,7 @@ to register the runner elsewhere.
If the binary name differs from the engine id, set:
- `EngineBackend(cli_cmd="pi-cli")`
- `EngineBackend(cli_cmd="acme-cli")`
so onboarding can find it on PATH.
@@ -544,7 +544,7 @@ A good runner PR usually contains 3 types of tests.
Copy `tests/test_claude_runner.py::test_claude_resume_format_and_extract`.
For Pi, assert:
For Acme, assert:
- `format_resume(...)` outputs the canonical resume line.
- `extract_resume(...)` can parse it back out.
@@ -556,8 +556,8 @@ Claudes translation tests load JSONL fixtures and feed them into the pure tra
Do the same:
- `tests/fixtures/pi_stream_success.jsonl`
- `tests/fixtures/pi_stream_error.jsonl`
- `tests/fixtures/acme_stream_success.jsonl`
- `tests/fixtures/acme_stream_error.jsonl`
Then assert:
@@ -614,7 +614,7 @@ one targeted test catches regressions.
Before you call the runner “done”:
- [ ] `takopi pi` appears automatically (module exports `BACKEND`).
- [ ] `takopi acme` appears automatically (module exports `BACKEND`).
- [ ] `format_resume()` matches `extract_resume()` + `is_resume_line()`.
- [ ] Translation emits exactly one `StartedEvent` and one `CompletedEvent`.
- [ ] `CompletedEvent.resume` matches `StartedEvent.resume`.
+63 -1
View File
@@ -121,9 +121,67 @@ Auto-discovers runner modules in `takopi.runners` that export `BACKEND`.
|------|---------|
| `codex.py` | Codex runner (JSONL → takopi events) + per-resume locks |
| `claude.py` | Claude runner (JSONL → takopi events) + per-resume locks |
| `opencode.py` | OpenCode runner (JSONL → takopi events) + per-resume locks |
| `pi.py` | Pi runner (JSONL → takopi events) + per-resume locks |
| `mock.py` | Mock runner for tests/demos |
### `schemas/` - JSONL decoding schemas
Self-documenting msgspec schemas for decoding engine JSONL streams.
| File | Purpose |
|------|---------|
| `codex.py` | `codex exec --json` event schemas |
| `claude.py` | `claude -p --output-format stream-json --verbose` event schemas |
| `opencode.py` | `opencode run --format json` event schemas |
| `pi.py` | `pi --print --mode json` event schemas |
### `utils/` - Utility modules
| File | Purpose |
|------|---------|
| `paths.py` | `relativize_path()`, `relativize_command()` helpers |
| `streams.py` | `iter_bytes_lines()`, `drain_stderr()` for async stream handling |
| `subprocess.py` | `manage_subprocess()`, `terminate_process()`, `kill_process()` |
### `router.py` - Auto-router
| Component | Purpose |
|-----------|---------|
| `AutoRouter` | Resolves resume tokens by polling all runners, routes to matching engine |
| `RunnerEntry` | Dataclass holding runner + backend metadata |
| `RunnerUnavailableError` | Raised when requested engine is not available |
### `scheduler.py` - Thread scheduling
| Component | Purpose |
|-----------|---------|
| `ThreadScheduler` | Per-thread FIFO job queuing with serialization |
| `ThreadJob` | Dataclass representing a queued job |
| `note_thread_known()` | Registers a thread as busy when token discovered mid-run |
### `events.py` - Event factory
| Component | Purpose |
|-----------|---------|
| `EventFactory` | Helper class for creating takopi events with consistent engine/resume |
| Builder methods | `started()`, `action()`, `action_started()`, `action_updated()`, `action_completed()`, `completed()`, `completed_ok()`, `completed_error()` |
### `lockfile.py` - Single-instance enforcement
| Component | Purpose |
|-----------|---------|
| `acquire_lock()` | Acquire lock for bot token, returns `LockHandle` context manager |
| `LockHandle` | Context manager for automatic lock release |
| `LockInfo` | Dataclass with `pid` and `token_fingerprint` |
| `token_fingerprint()` | SHA256 hash of bot token, truncated to 10 chars |
### `backends_helpers.py` - Backend utilities
| Function | Purpose |
|----------|---------|
| `install_issue()` | Creates `SetupIssue` with install instructions for missing CLI |
### `config.py` - Configuration loading
```python
@@ -145,6 +203,10 @@ Environment flags:
- `TAKOPI_LOG_COLOR` (`1/true/yes/on` to force color, `0/false/no/off` to disable)
- `TAKOPI_LOG_FILE` (append JSON lines to a file)
- `TAKOPI_TRACE_PIPELINE` (log pipeline events at info instead of debug)
- `TAKOPI_NO_INTERACTIVE` (disable interactive prompts for CI/non-TTY environments)
- `PI_CODING_AGENT_DIR` (override Pi agent session directory base path)
CLI flag: `--debug` enables debug logging (overrides `TAKOPI_LOG_LEVEL`).
### `onboarding.py` - Setup validation
@@ -173,7 +235,7 @@ run_main_loop() spawns tasks in TaskGroup
router.resolve_resume(text, reply_text) → ResumeToken | None
router.runner_for(resume_token) → selects runner (default engine if None)
router.entry_for(resume_token) or router.entry_for_engine(default) → RunnerEntry
handle_message() spawned as task with selected runner
+3 -9
View File
@@ -1,16 +1,10 @@
# Claude Code -> Takopi event mapping (spec)
This document specifies how the Claude Code runner (implemented in Takopi v0.3.0)
translates Claude CLI `--output-format stream-json` JSONL events into Takopi events.
It is based on the reverse-engineered schema in `humanlayer/claudecode-go`:
This document describes how the Claude Code runner translates Claude CLI JSONL events into Takopi events.
- `claudecode-go/types.go` (StreamEvent, Message, Content, Result)
- `claudecode-go/client.go` (CLI flags, stream parsing)
- `claudecode-go/client_test.go` (schema validation + permission_denials)
> **Authoritative source:** The schema definitions are in `src/takopi/schemas/claude.py` and the translation logic is in `src/takopi/runners/claude.py`. When in doubt, refer to the code.
The goal is to make a Claude runner feel identical to the Codex runner from the
bridge/renderer point of view while preserving Takopi invariants (stable action
ids, per-session serialization, single completed event).
The goal is to make a Claude runner feel identical to the Codex runner from the bridge/renderer point of view while preserving Takopi invariants (stable action ids, per-session serialization, single completed event).
---
+7 -3
View File
@@ -1,8 +1,12 @@
Heres a clean way to make “Takopi events” just **3 shapes** while still covering **every `codex exec --json` line type** and preserving the invariants you care about (stable IDs, resume/thread ownership, final answer delivery).
# Codex -> Takopi event mapping
This document describes how Codex exec --json events are translated to Takopi's normalized event model.
> **Authoritative source:** The schema definitions are in `src/takopi/schemas/codex.py` and the translation logic is in `src/takopi/runners/codex.py`. When in doubt, refer to the code.
## The 3-event Takopi schema
Id model it like this (JSON-ish). The important trick is: **your single `action` event needs a `phase`**, otherwise you cant represent started/updated/completed lifecycles.
The Takopi event model uses 3 event types. The `action` event includes a `phase` field to represent started/updated/completed lifecycles.
### 1) `started`
@@ -28,7 +32,7 @@ Emitted for **everything that is progress / updates / warnings / per-item lifecy
"engine": "codex",
"action": {
"id": "item_5",
"kind": "tool", // command | tool | file_change | web_search | note | turn | warning | telemetry
"kind": "tool", // command | tool | file_change | web_search | subagent | note | turn | warning | telemetry
"title": "docs.search", // short label for renderer
"detail": { ... } // structured payload (freeform)
},
@@ -1,7 +1,8 @@
# OpenCode to Takopi Event Mapping
This document describes how OpenCode JSON events are translated to Takopi's normalized event model.
The OpenCode runner shipped in Takopi v0.5.0.
> **Authoritative source:** The schema definitions are in `src/takopi/schemas/opencode.py` and the translation logic is in `src/takopi/runners/opencode.py`. When in doubt, refer to the code.
## Event Translation
+3 -6
View File
@@ -54,7 +54,7 @@ Takopi config lives at `~/.takopi/takopi.toml`.
Add a new optional `[pi]` section.
Recommended v1 schema:
Recommended schema:
```toml
# ~/.takopi/takopi.toml
@@ -62,18 +62,15 @@ Recommended v1 schema:
default_engine = "pi"
[pi]
cmd = "pi" # optional; defaults to "pi"
extra_args = [] # optional list of strings, appended verbatim
model = "..." # optional; passed as --model
provider = "..." # optional; passed as --provider
session_dir = "..." # optional; directory for session files
session_title = "pi" # optional; defaults to model or "pi"
extra_args = [] # optional list of strings, appended verbatim
```
Notes:
* `extra_args` lets you pass new Pi flags without changing Takopi.
* If `session_dir` is omitted, Takopi uses Pi's default session dir:
* Session files are stored under Pi's default session dir:
`~/.pi/agent/sessions/--<cwd>--` (with path separators replaced by `-`).
---
+5 -8
View File
@@ -1,12 +1,10 @@
# Pi -> Takopi event mapping (spec)
This document specifies how the Pi runner shipped in Takopi v0.5.0 translates
Pi CLI `--mode json` JSONL events into Takopi events. The Pi JSONL stream is
`AgentSessionEvent` from `@mariozechner/pi-agent-core`.
This document describes how the Pi runner translates Pi CLI `--mode json` JSONL events into Takopi events.
The goal is to make Pi feel identical to the Codex/Claude runners from the
bridge/renderer point of view while preserving Takopi invariants (stable action
ids, per-session serialization, single completed event).
> **Authoritative source:** The schema definitions are in `src/takopi/schemas/pi.py` and the translation logic is in `src/takopi/runners/pi.py`. When in doubt, refer to the code.
The goal is to make Pi feel identical to the Codex/Claude runners from the bridge/renderer point of view while preserving Takopi invariants (stable action ids, per-session serialization, single completed event).
---
@@ -145,10 +143,9 @@ A minimal TOML config for Pi:
```toml
[pi]
cmd = "pi"
model = "..."
provider = "..."
extra_args = []
```
Use `extra_args` for any newer Pi CLI flags not explicitly mapped.
Use `extra_args` for any Pi CLI flags not explicitly mapped.
+44 -5
View File
@@ -1,10 +1,10 @@
# Takopi Specification v0.5.0 [2026-01-02]
# Takopi Specification v0.8.0 [2026-01-04]
This document is **normative**. The words **MUST**, **SHOULD**, and **MAY** express requirements.
## 1. Scope
Takopi v0.5.0 specifies:
Takopi v0.8.0 specifies:
- A **Telegram** bot bridge that runs an agent **Runner** and posts:
- a throttled, edited **progress message**
@@ -15,7 +15,7 @@ Takopi v0.5.0 specifies:
- **Automatic runner selection** among multiple engines based on ResumeLine (with a configurable default for new threads)
- A Takopi-owned **normalized event model** produced by runners and consumed by renderers/bridge
Out of scope for v0.5.0:
Out of scope for v0.8.0:
- Non-Telegram clients (Slack/Discord/etc.)
- Token-by-token streaming of the assistants final answer
@@ -173,7 +173,7 @@ Stability requirements:
Action kinds SHOULD come from an extensible stable set, e.g.:
* `command`, `tool`, `file_change`, `web_search`, `turn`, `warning`, `telemetry`, `note`
* `command`, `tool`, `file_change`, `web_search`, `subagent`, `turn`, `warning`, `telemetry`, `note`
Unknown kinds MAY be rendered as `note`.
@@ -403,7 +403,46 @@ Tests MUST cover:
Test tooling SHOULD include event factories, deterministic/fake time, and a script/mock runner.
## 10. Changelog
## 10. Lockfile (single-instance enforcement)
Takopi MUST prevent multiple instances from racing `getUpdates` offsets for the same bot token.
### 10.1 Lock file location
The lock file MUST be stored at `<config_path>.lock`. For the default config path, this resolves to `~/.takopi/takopi.lock`.
### 10.2 Lock file format
The lock file MUST contain JSON with:
* `pid: int` — the process ID holding the lock
* `token_fingerprint: str` — SHA256 hash of the bot token, truncated to 10 characters
### 10.3 Lock acquisition rules
* If the lock file does not exist, acquire and write the lock.
* If the lock file exists and the PID is dead (not running), replace the lock.
* If the lock file exists and the token fingerprint differs (different bot), replace the lock.
* If the lock file exists, the PID is alive, and the fingerprint matches, fail with an error instructing the user to stop the other instance.
### 10.4 Lock release
The lock file SHOULD be removed on clean shutdown. Stale locks from crashed processes are handled by the acquisition rules above.
## 11. Changelog
### v0.8.0 (2026-01-04)
- Add `subagent` action kind for agent/task delegation tools.
- Add lockfile specification for single-instance enforcement (§10).
### v0.7.0 (2026-01-04)
- No normative changes; implementation migrated to structlog and msgspec schemas.
### v0.6.0 (2026-01-03)
- No normative changes; added interactive onboarding and lockfile implementation.
### v0.5.0 (2026-01-02)
+5
View File
@@ -66,9 +66,14 @@ dangerously_skip_permissions = false
# uses subscription by default, override to use api billing
use_api_billing = false
[opencode]
model = "claude-sonnet-4-20250514"
[pi]
model = "gpt-4.1"
provider = "openai"
# optional: additional CLI arguments
extra_args = ["--no-color"]
```
## usage
+3 -26
View File
@@ -245,24 +245,19 @@ def translate_pi_event(
class PiRunner(ResumeTokenMixin, JsonlSubprocessRunner):
engine: EngineId = ENGINE
resume_re: re.Pattern[str] = _RESUME_RE
session_title: str = "pi"
logger = logger
def __init__(
self,
*,
pi_cmd: str,
extra_args: list[str],
model: str | None,
provider: str | None,
session_title: str,
session_dir: Path | None,
) -> None:
self.pi_cmd = pi_cmd
self.extra_args = extra_args
self.model = model
self.provider = provider
self.session_title = session_title
self.session_dir = session_dir
def format_resume(self, token: ResumeToken) -> str:
if token.engine != ENGINE:
@@ -286,7 +281,7 @@ class PiRunner(ResumeTokenMixin, JsonlSubprocessRunner):
return ResumeToken(engine=self.engine, value=found)
def command(self) -> str:
return self.pi_cmd
return "pi"
def build_args(
self,
@@ -426,7 +421,7 @@ class PiRunner(ResumeTokenMixin, JsonlSubprocessRunner):
]
def _new_session_path(self) -> str:
session_dir = self.session_dir or _default_session_dir(Path.cwd())
session_dir = _default_session_dir(Path.cwd())
session_dir.mkdir(parents=True, exist_ok=True)
timestamp = datetime.now(timezone.utc).isoformat()
safe_timestamp = timestamp.replace(":", "-").replace(".", "-")
@@ -457,10 +452,6 @@ def _default_session_dir(cwd: Path) -> Path:
def build_runner(config: EngineConfig, config_path: Path) -> Runner:
cmd = config.get("cmd") or "pi"
if not isinstance(cmd, str):
raise ConfigError(f"Invalid `pi.cmd` in {config_path}; expected a string.")
extra_args_value = config.get("extra_args")
if extra_args_value is None:
extra_args = []
@@ -481,24 +472,10 @@ def build_runner(config: EngineConfig, config_path: Path) -> Runner:
if provider is not None and not isinstance(provider, str):
raise ConfigError(f"Invalid `pi.provider` in {config_path}; expected a string.")
session_dir_value = config.get("session_dir")
session_dir: Path | None = None
if session_dir_value is not None:
if not isinstance(session_dir_value, str):
raise ConfigError(
f"Invalid `pi.session_dir` in {config_path}; expected a string."
)
session_dir = Path(session_dir_value).expanduser()
title = str(config.get("session_title") or (model if model else "pi"))
return PiRunner(
pi_cmd=cmd,
extra_args=extra_args,
model=model,
provider=provider,
session_title=title,
session_dir=session_dir,
)
-98
View File
@@ -24,12 +24,9 @@ def _load_fixture(name: str) -> list[pi_schema.PiEvent]:
def test_pi_resume_format_and_extract() -> None:
runner = PiRunner(
pi_cmd="pi",
extra_args=[],
model=None,
provider=None,
session_title="pi",
session_dir=None,
)
token = ResumeToken(engine=ENGINE, value="/tmp/pi/session.jsonl")
@@ -95,12 +92,9 @@ def test_translate_error_fixture() -> None:
@pytest.mark.anyio
async def test_run_serializes_same_session() -> None:
runner = PiRunner(
pi_cmd="pi",
extra_args=[],
model=None,
provider=None,
session_title="pi",
session_dir=None,
)
gate = anyio.Event()
in_flight = 0
@@ -134,95 +128,3 @@ async def test_run_serializes_same_session() -> None:
await anyio.sleep(0)
gate.set()
assert max_in_flight == 1
@pytest.mark.anyio
async def test_run_serializes_new_session_after_session_is_known(
tmp_path, monkeypatch
) -> None:
gate_path = tmp_path / "gate"
resume_marker = tmp_path / "resume_started"
pi_path = tmp_path / "pi"
pi_path.write_text(
"#!/usr/bin/env python3\n"
"import json\n"
"import os\n"
"import sys\n"
"import time\n"
"\n"
"gate = os.environ['PI_TEST_GATE']\n"
"resume_marker = os.environ['PI_TEST_RESUME_MARKER']\n"
"resume_value = os.environ.get('PI_TEST_RESUME_VALUE')\n"
"\n"
"args = sys.argv[1:]\n"
"session_path = None\n"
"if '--session' in args:\n"
" idx = args.index('--session')\n"
" if idx + 1 < len(args):\n"
" session_path = args[idx + 1]\n"
"\n"
"print(json.dumps({'type': 'agent_start'}), flush=True)\n"
"\n"
"if resume_value and session_path == resume_value:\n"
" with open(resume_marker, 'w', encoding='utf-8') as f:\n"
" f.write('started')\n"
" f.flush()\n"
" print(json.dumps({'type': 'agent_end', 'messages': []}), flush=True)\n"
" sys.exit(0)\n"
"\n"
"while not os.path.exists(gate):\n"
" time.sleep(0.001)\n"
"print(json.dumps({'type': 'agent_end', 'messages': []}), flush=True)\n"
"sys.exit(0)\n",
encoding="utf-8",
)
pi_path.chmod(0o755)
monkeypatch.setenv("PI_TEST_GATE", str(gate_path))
monkeypatch.setenv("PI_TEST_RESUME_MARKER", str(resume_marker))
runner = PiRunner(
pi_cmd=str(pi_path),
extra_args=[],
model=None,
provider=None,
session_title="pi",
session_dir=tmp_path / "sessions",
)
session_started = anyio.Event()
resume_value: str | None = None
new_done = anyio.Event()
async def run_new() -> None:
nonlocal resume_value
async for event in runner.run("hello", None):
if isinstance(event, StartedEvent):
resume_value = event.resume.value
session_started.set()
new_done.set()
async def run_resume() -> None:
assert resume_value is not None
monkeypatch.setenv("PI_TEST_RESUME_VALUE", resume_value)
async for _event in runner.run(
"resume", ResumeToken(engine=ENGINE, value=resume_value)
):
pass
async with anyio.create_task_group() as tg:
tg.start_soon(run_new)
await session_started.wait()
tg.start_soon(run_resume)
await anyio.sleep(0.01)
assert not resume_marker.exists()
gate_path.write_text("go", encoding="utf-8")
await new_done.wait()
with anyio.fail_after(2):
while not resume_marker.exists():
await anyio.sleep(0.001)