fix(pi): use stdout session header (#126)

This commit is contained in:
banteg
2026-01-13 22:51:29 +04:00
committed by GitHub
parent e671da0a0f
commit 43fd594061
6 changed files with 73 additions and 111 deletions
+8 -6
View File
@@ -10,7 +10,7 @@ Provide the **`pi`** engine backend so Takopi can:
* Run Pi non-interactively via the **pi CLI** (`pi --print`).
* Stream progress by parsing **`--mode json`** (newline-delimited JSON). Each line is a JSON object.
* Support resumable sessions via **`--session <path>`** (Takopi emits a canonical resume line the user can reply with).
* Support resumable sessions via **`--session <token>`** (Takopi emits a canonical resume line the user can reply with).
### Non-goals (v1)
@@ -36,10 +36,10 @@ Takopi appends a **single backticked** resume line at the end of the message, li
Notes:
* `pi --resume/-r` opens an interactive session picker, so Takopi uses `--session <path>` instead.
* The resume token is the **session id** (short prefix), derived from the first JSON
object in the session file. If the id cannot be read, Takopi falls back to the
session file path.
* `pi --resume/-r` opens an interactive session picker, so Takopi uses `--session <token>` instead.
* The resume token is the **session id** (short prefix), derived from the session
header line (`{"type":"session", ...}`) emitted to stdout in `--mode json`.
This requires **pi-coding-agent >= 0.45.1**.
* If the path contains spaces, the runner will quote it.
### Non-interactive runs
@@ -91,7 +91,7 @@ The runner should launch Pi in headless JSON mode:
pi --print --mode json --session <session.jsonl> <prompt>
```
When resuming, `<session.jsonl>` is the resume token extracted from the chat.
When resuming, `<session.jsonl>` is replaced by the resume token extracted from the chat.
#### Event translation
@@ -116,6 +116,8 @@ Install the CLI globally:
npm install -g @mariozechner/pi-coding-agent
```
Minimum supported pi version: **0.45.1**.
Auth is stored under `~/.pi/agent/auth.json`. Run `pi` once interactively to
set up credentials before using Takopi.
@@ -6,6 +6,12 @@ required `type` field. These are `AgentSessionEvent` objects from
## Top-level event lines
### `session` (header, pi >= 0.45.1)
```json
{"type":"session","id":"ccd569e0-4e1b-4c7d-a981-637ed4107310","version":3,"timestamp":"2026-01-13T00:33:34.702Z","cwd":"/repo"}
```
### `agent_start`
```json
+8 -6
View File
@@ -33,12 +33,13 @@ Notes:
`pi --session <id>`
```
The token is the **short session id**, derived from the first JSON object in the
session file. If the id cannot be read, Takopi falls back to the session file path.
The token is the **short session id**, derived from the session header line
(`{"type":"session", ...}`) emitted on stdout when running in `--mode json`.
This requires **pi-coding-agent >= 0.45.1**.
Why not `--resume`?
- `--resume/-r` opens an interactive session picker; it does not accept a
session token. Takopi must use `--session <path>` instead.
session token. Takopi must use `--session <token>` instead.
---
@@ -47,8 +48,9 @@ Why not `--resume`?
Takopi requires **serialization per session token**:
- For new runs (`resume=None`), do **not** acquire a lock until a `started`
event is emitted (Takopi emits this as soon as the first JSON event arrives).
- Once the session is known, acquire a lock for `pi:<session_path>` and hold it
event is emitted (Takopi emits this as soon as the session header or first
JSON event arrives).
- Once the session is known, acquire a lock for `pi:<session_token>` and hold it
until the run completes.
- For resumed runs, acquire the lock immediately on entry.
@@ -103,7 +105,7 @@ Mapping:
- `ok = true` unless the last assistant message has `stopReason` `error` or `aborted`.
- `answer = last assistant text` (from `message_end` or `agent_end.messages`).
- `error = errorMessage` if present.
- `resume = ResumeToken(engine="pi", value=session_path)`.
- `resume = ResumeToken(engine="pi", value=session_token)`.
- `usage = last assistant usage`.
### 4.5 Other events
+24 -61
View File
@@ -1,6 +1,5 @@
from __future__ import annotations
import json
import os
import re
from collections.abc import AsyncIterator
@@ -44,7 +43,6 @@ _SESSION_ID_PREFIX_LEN = 8
@dataclass(slots=True)
class PiStreamState:
resume: ResumeToken
session_path: str | None = None
allow_id_promotion: bool = False
pending_actions: dict[str, Action] = field(default_factory=dict)
last_assistant_text: str | None = None
@@ -74,47 +72,17 @@ def _short_session_id(session_id: str) -> str:
return session_id
def _session_id_from_line(line: str) -> str | None:
try:
data = json.loads(line)
except json.JSONDecodeError:
return None
if not isinstance(data, dict):
return None
event_type = data.get("type")
if event_type is not None and event_type != "session":
return None
session_id = data.get("id")
if isinstance(session_id, str) and session_id:
return _short_session_id(session_id)
return None
def _session_id_from_path(path: Path) -> str | None:
path = path.expanduser()
try:
with path.open("r", encoding="utf-8") as handle:
for raw_line in handle:
line = raw_line.strip()
if not line:
continue
return _session_id_from_line(line)
except OSError:
return None
return None
def _maybe_promote_session_id(state: PiStreamState) -> None:
def _maybe_promote_session_id(state: PiStreamState, session_id: str | None) -> None:
if not session_id:
return
if state.started:
return
if not state.allow_id_promotion:
return
session_path = state.session_path
if not session_path:
if not _looks_like_session_path(state.resume.value):
return
if state.resume.value != session_path:
return
session_id = _session_id_from_path(Path(session_path))
if session_id:
state.resume = ResumeToken(engine=ENGINE, value=session_id)
state.resume = ResumeToken(engine=ENGINE, value=_short_session_id(session_id))
state.allow_id_promotion = False
def _action_event(
@@ -186,7 +154,20 @@ def translate_pi_event(
state: PiStreamState,
) -> list[TakopiEvent]:
out: list[TakopiEvent] = []
_maybe_promote_session_id(state)
if isinstance(event, pi_schema.SessionHeader):
_maybe_promote_session_id(state, event.id)
if not state.started:
out.append(
StartedEvent(
engine=ENGINE,
resume=state.resume,
title=title,
meta=meta or None,
)
)
state.started = True
return out
if not state.started:
out.append(
StartedEvent(
@@ -313,7 +294,7 @@ class PiRunner(ResumeTokenMixin, JsonlSubprocessRunner):
def run(
self, prompt: str, resume: ResumeToken | None
) -> AsyncIterator[TakopiEvent]:
return super().run(prompt, self._normalize_resume_token(resume))
return super().run(prompt, resume)
def extract_resume(self, text: str | None) -> ResumeToken | None:
if not text:
@@ -329,24 +310,8 @@ class PiRunner(ResumeTokenMixin, JsonlSubprocessRunner):
found = token
if not found:
return None
if _looks_like_session_path(found):
session_id = _session_id_from_path(Path(found))
if session_id:
found = session_id
return ResumeToken(engine=self.engine, value=found)
def _normalize_resume_token(self, resume: ResumeToken | None) -> ResumeToken | None:
if resume is None:
return None
if resume.engine != ENGINE:
return resume
if not _looks_like_session_path(resume.value):
return resume
session_id = _session_id_from_path(Path(resume.value))
if session_id:
return ResumeToken(engine=ENGINE, value=session_id)
return resume
def command(self) -> str:
return "pi"
@@ -387,11 +352,9 @@ class PiRunner(ResumeTokenMixin, JsonlSubprocessRunner):
token = ResumeToken(engine=ENGINE, value=session_path)
return PiStreamState(
resume=token,
session_path=session_path,
allow_id_promotion=True,
)
session_path = resume.value if _looks_like_session_path(resume.value) else None
return PiStreamState(resume=resume, session_path=session_path)
return PiStreamState(resume=resume)
def translate(
self,
+10 -1
View File
@@ -11,6 +11,14 @@ class _Event(msgspec.Struct, tag_field="type", forbid_unknown_fields=False):
pass
class SessionHeader(_Event, tag="session"):
id: str | None = None
version: int | None = None
timestamp: str | None = None
cwd: str | None = None
parentSession: str | None = None
class AgentStart(_Event, tag="agent_start"):
pass
@@ -85,7 +93,8 @@ class AutoRetryEnd(_Event, tag="auto_retry_end"):
type PiEvent = (
AgentStart
SessionHeader
| AgentStart
| AgentEnd
| MessageStart
| MessageUpdate
+17 -37
View File
@@ -99,41 +99,28 @@ def test_translate_error_fixture() -> None:
assert completed.answer == "Request failed."
def test_session_id_promotion_from_file(tmp_path: Path) -> None:
session_path = tmp_path / "session.jsonl"
session_path.write_text(
'{"type":"session","version":3,'
'"id":"ccd569e0-4e1b-4c7d-a981-637ed4107310",'
'"timestamp":"2026-01-13T00:33:34.702Z",'
'"cwd":"/tmp"}\n',
encoding="utf-8",
def test_session_id_promotion_from_stdout() -> None:
state = PiStreamState(
resume=ResumeToken(engine=ENGINE, value="session.jsonl"),
allow_id_promotion=True,
)
runner = PiRunner(
extra_args=[],
model=None,
provider=None,
)
with patch(
"takopi.runners.pi.PiRunner._new_session_path",
return_value=str(session_path),
):
state = runner.new_state("prompt", None)
events = translate_pi_event(
pi_schema.AgentStart(), title="pi", meta=None, state=state
pi_schema.SessionHeader(
id="ccd569e0-4e1b-4c7d-a981-637ed4107310",
version=3,
timestamp="2026-01-13T00:33:34.702Z",
cwd="/tmp",
),
title="pi",
meta=None,
state=state,
)
started = next(evt for evt in events if isinstance(evt, StartedEvent))
assert started.resume.value == "ccd569e0"
def test_extract_resume_prefers_session_id(tmp_path: Path) -> None:
def test_extract_resume_keeps_session_path(tmp_path: Path) -> None:
session_path = tmp_path / "session.jsonl"
session_path.write_text(
'{"type":"session","version":3,'
'"id":"ccd569e0-4e1b-4c7d-a981-637ed4107310",'
'"timestamp":"2026-01-13T00:33:34.702Z",'
'"cwd":"/tmp"}\n',
encoding="utf-8",
)
runner = PiRunner(
extra_args=[],
model=None,
@@ -141,19 +128,12 @@ def test_extract_resume_prefers_session_id(tmp_path: Path) -> None:
)
token = runner.extract_resume(f"pi --session {session_path}")
assert token is not None
assert token.value == "ccd569e0"
assert token.value == str(session_path)
@pytest.mark.anyio
async def test_run_normalizes_resume_path(tmp_path: Path) -> None:
async def test_run_keeps_resume_path(tmp_path: Path) -> None:
session_path = tmp_path / "session.jsonl"
session_path.write_text(
'{"type":"session","version":3,'
'"id":"ccd569e0-4e1b-4c7d-a981-637ed4107310",'
'"timestamp":"2026-01-13T00:33:34.702Z",'
'"cwd":"/tmp"}\n',
encoding="utf-8",
)
runner = PiRunner(
extra_args=[],
model=None,
@@ -176,7 +156,7 @@ async def test_run_normalizes_resume_path(tmp_path: Path) -> None:
async for _event in runner.run("test", resume):
pass
assert seen_resume is not None
assert seen_resume.value == "ccd569e0"
assert seen_resume.value == str(session_path)
@pytest.mark.anyio