diff --git a/docs/reference/runners/pi/runner.md b/docs/reference/runners/pi/runner.md index 4d340bb..8b186bd 100644 --- a/docs/reference/runners/pi/runner.md +++ b/docs/reference/runners/pi/runner.md @@ -10,7 +10,7 @@ Provide the **`pi`** engine backend so Takopi can: * Run Pi non-interactively via the **pi CLI** (`pi --print`). * Stream progress by parsing **`--mode json`** (newline-delimited JSON). Each line is a JSON object. -* Support resumable sessions via **`--session `** (Takopi emits a canonical resume line the user can reply with). +* Support resumable sessions via **`--session `** (Takopi emits a canonical resume line the user can reply with). ### Non-goals (v1) @@ -36,10 +36,10 @@ Takopi appends a **single backticked** resume line at the end of the message, li Notes: -* `pi --resume/-r` opens an interactive session picker, so Takopi uses `--session ` instead. -* The resume token is the **session id** (short prefix), derived from the first JSON - object in the session file. If the id cannot be read, Takopi falls back to the - session file path. +* `pi --resume/-r` opens an interactive session picker, so Takopi uses `--session ` instead. +* The resume token is the **session id** (short prefix), derived from the session + header line (`{"type":"session", ...}`) emitted to stdout in `--mode json`. + This requires **pi-coding-agent >= 0.45.1**. * If the path contains spaces, the runner will quote it. ### Non-interactive runs @@ -91,7 +91,7 @@ The runner should launch Pi in headless JSON mode: pi --print --mode json --session ``` -When resuming, `` is the resume token extracted from the chat. +When resuming, `` is replaced by the resume token extracted from the chat. #### Event translation @@ -116,6 +116,8 @@ Install the CLI globally: npm install -g @mariozechner/pi-coding-agent ``` +Minimum supported pi version: **0.45.1**. + Auth is stored under `~/.pi/agent/auth.json`. Run `pi` once interactively to set up credentials before using Takopi. diff --git a/docs/reference/runners/pi/stream-json-cheatsheet.md b/docs/reference/runners/pi/stream-json-cheatsheet.md index 582655c..18e069b 100644 --- a/docs/reference/runners/pi/stream-json-cheatsheet.md +++ b/docs/reference/runners/pi/stream-json-cheatsheet.md @@ -6,6 +6,12 @@ required `type` field. These are `AgentSessionEvent` objects from ## Top-level event lines +### `session` (header, pi >= 0.45.1) + +```json +{"type":"session","id":"ccd569e0-4e1b-4c7d-a981-637ed4107310","version":3,"timestamp":"2026-01-13T00:33:34.702Z","cwd":"/repo"} +``` + ### `agent_start` ```json diff --git a/docs/reference/runners/pi/takopi-events.md b/docs/reference/runners/pi/takopi-events.md index eeefb6b..d8325f8 100644 --- a/docs/reference/runners/pi/takopi-events.md +++ b/docs/reference/runners/pi/takopi-events.md @@ -33,12 +33,13 @@ Notes: `pi --session ` ``` -The token is the **short session id**, derived from the first JSON object in the -session file. If the id cannot be read, Takopi falls back to the session file path. +The token is the **short session id**, derived from the session header line +(`{"type":"session", ...}`) emitted on stdout when running in `--mode json`. +This requires **pi-coding-agent >= 0.45.1**. Why not `--resume`? - `--resume/-r` opens an interactive session picker; it does not accept a - session token. Takopi must use `--session ` instead. + session token. Takopi must use `--session ` instead. --- @@ -47,8 +48,9 @@ Why not `--resume`? Takopi requires **serialization per session token**: - For new runs (`resume=None`), do **not** acquire a lock until a `started` - event is emitted (Takopi emits this as soon as the first JSON event arrives). -- Once the session is known, acquire a lock for `pi:` and hold it + event is emitted (Takopi emits this as soon as the session header or first + JSON event arrives). +- Once the session is known, acquire a lock for `pi:` and hold it until the run completes. - For resumed runs, acquire the lock immediately on entry. @@ -103,7 +105,7 @@ Mapping: - `ok = true` unless the last assistant message has `stopReason` `error` or `aborted`. - `answer = last assistant text` (from `message_end` or `agent_end.messages`). - `error = errorMessage` if present. - - `resume = ResumeToken(engine="pi", value=session_path)`. + - `resume = ResumeToken(engine="pi", value=session_token)`. - `usage = last assistant usage`. ### 4.5 Other events diff --git a/src/takopi/runners/pi.py b/src/takopi/runners/pi.py index 3bb35c2..4790b27 100644 --- a/src/takopi/runners/pi.py +++ b/src/takopi/runners/pi.py @@ -1,6 +1,5 @@ from __future__ import annotations -import json import os import re from collections.abc import AsyncIterator @@ -44,7 +43,6 @@ _SESSION_ID_PREFIX_LEN = 8 @dataclass(slots=True) class PiStreamState: resume: ResumeToken - session_path: str | None = None allow_id_promotion: bool = False pending_actions: dict[str, Action] = field(default_factory=dict) last_assistant_text: str | None = None @@ -74,47 +72,17 @@ def _short_session_id(session_id: str) -> str: return session_id -def _session_id_from_line(line: str) -> str | None: - try: - data = json.loads(line) - except json.JSONDecodeError: - return None - if not isinstance(data, dict): - return None - event_type = data.get("type") - if event_type is not None and event_type != "session": - return None - session_id = data.get("id") - if isinstance(session_id, str) and session_id: - return _short_session_id(session_id) - return None - - -def _session_id_from_path(path: Path) -> str | None: - path = path.expanduser() - try: - with path.open("r", encoding="utf-8") as handle: - for raw_line in handle: - line = raw_line.strip() - if not line: - continue - return _session_id_from_line(line) - except OSError: - return None - return None - - -def _maybe_promote_session_id(state: PiStreamState) -> None: +def _maybe_promote_session_id(state: PiStreamState, session_id: str | None) -> None: + if not session_id: + return + if state.started: + return if not state.allow_id_promotion: return - session_path = state.session_path - if not session_path: + if not _looks_like_session_path(state.resume.value): return - if state.resume.value != session_path: - return - session_id = _session_id_from_path(Path(session_path)) - if session_id: - state.resume = ResumeToken(engine=ENGINE, value=session_id) + state.resume = ResumeToken(engine=ENGINE, value=_short_session_id(session_id)) + state.allow_id_promotion = False def _action_event( @@ -186,7 +154,20 @@ def translate_pi_event( state: PiStreamState, ) -> list[TakopiEvent]: out: list[TakopiEvent] = [] - _maybe_promote_session_id(state) + if isinstance(event, pi_schema.SessionHeader): + _maybe_promote_session_id(state, event.id) + if not state.started: + out.append( + StartedEvent( + engine=ENGINE, + resume=state.resume, + title=title, + meta=meta or None, + ) + ) + state.started = True + return out + if not state.started: out.append( StartedEvent( @@ -313,7 +294,7 @@ class PiRunner(ResumeTokenMixin, JsonlSubprocessRunner): def run( self, prompt: str, resume: ResumeToken | None ) -> AsyncIterator[TakopiEvent]: - return super().run(prompt, self._normalize_resume_token(resume)) + return super().run(prompt, resume) def extract_resume(self, text: str | None) -> ResumeToken | None: if not text: @@ -329,24 +310,8 @@ class PiRunner(ResumeTokenMixin, JsonlSubprocessRunner): found = token if not found: return None - if _looks_like_session_path(found): - session_id = _session_id_from_path(Path(found)) - if session_id: - found = session_id return ResumeToken(engine=self.engine, value=found) - def _normalize_resume_token(self, resume: ResumeToken | None) -> ResumeToken | None: - if resume is None: - return None - if resume.engine != ENGINE: - return resume - if not _looks_like_session_path(resume.value): - return resume - session_id = _session_id_from_path(Path(resume.value)) - if session_id: - return ResumeToken(engine=ENGINE, value=session_id) - return resume - def command(self) -> str: return "pi" @@ -387,11 +352,9 @@ class PiRunner(ResumeTokenMixin, JsonlSubprocessRunner): token = ResumeToken(engine=ENGINE, value=session_path) return PiStreamState( resume=token, - session_path=session_path, allow_id_promotion=True, ) - session_path = resume.value if _looks_like_session_path(resume.value) else None - return PiStreamState(resume=resume, session_path=session_path) + return PiStreamState(resume=resume) def translate( self, diff --git a/src/takopi/schemas/pi.py b/src/takopi/schemas/pi.py index 4018a33..538a87b 100644 --- a/src/takopi/schemas/pi.py +++ b/src/takopi/schemas/pi.py @@ -11,6 +11,14 @@ class _Event(msgspec.Struct, tag_field="type", forbid_unknown_fields=False): pass +class SessionHeader(_Event, tag="session"): + id: str | None = None + version: int | None = None + timestamp: str | None = None + cwd: str | None = None + parentSession: str | None = None + + class AgentStart(_Event, tag="agent_start"): pass @@ -85,7 +93,8 @@ class AutoRetryEnd(_Event, tag="auto_retry_end"): type PiEvent = ( - AgentStart + SessionHeader + | AgentStart | AgentEnd | MessageStart | MessageUpdate diff --git a/tests/test_pi_runner.py b/tests/test_pi_runner.py index b3b0244..edd75f4 100644 --- a/tests/test_pi_runner.py +++ b/tests/test_pi_runner.py @@ -99,41 +99,28 @@ def test_translate_error_fixture() -> None: assert completed.answer == "Request failed." -def test_session_id_promotion_from_file(tmp_path: Path) -> None: - session_path = tmp_path / "session.jsonl" - session_path.write_text( - '{"type":"session","version":3,' - '"id":"ccd569e0-4e1b-4c7d-a981-637ed4107310",' - '"timestamp":"2026-01-13T00:33:34.702Z",' - '"cwd":"/tmp"}\n', - encoding="utf-8", +def test_session_id_promotion_from_stdout() -> None: + state = PiStreamState( + resume=ResumeToken(engine=ENGINE, value="session.jsonl"), + allow_id_promotion=True, ) - runner = PiRunner( - extra_args=[], - model=None, - provider=None, - ) - with patch( - "takopi.runners.pi.PiRunner._new_session_path", - return_value=str(session_path), - ): - state = runner.new_state("prompt", None) events = translate_pi_event( - pi_schema.AgentStart(), title="pi", meta=None, state=state + pi_schema.SessionHeader( + id="ccd569e0-4e1b-4c7d-a981-637ed4107310", + version=3, + timestamp="2026-01-13T00:33:34.702Z", + cwd="/tmp", + ), + title="pi", + meta=None, + state=state, ) started = next(evt for evt in events if isinstance(evt, StartedEvent)) assert started.resume.value == "ccd569e0" -def test_extract_resume_prefers_session_id(tmp_path: Path) -> None: +def test_extract_resume_keeps_session_path(tmp_path: Path) -> None: session_path = tmp_path / "session.jsonl" - session_path.write_text( - '{"type":"session","version":3,' - '"id":"ccd569e0-4e1b-4c7d-a981-637ed4107310",' - '"timestamp":"2026-01-13T00:33:34.702Z",' - '"cwd":"/tmp"}\n', - encoding="utf-8", - ) runner = PiRunner( extra_args=[], model=None, @@ -141,19 +128,12 @@ def test_extract_resume_prefers_session_id(tmp_path: Path) -> None: ) token = runner.extract_resume(f"pi --session {session_path}") assert token is not None - assert token.value == "ccd569e0" + assert token.value == str(session_path) @pytest.mark.anyio -async def test_run_normalizes_resume_path(tmp_path: Path) -> None: +async def test_run_keeps_resume_path(tmp_path: Path) -> None: session_path = tmp_path / "session.jsonl" - session_path.write_text( - '{"type":"session","version":3,' - '"id":"ccd569e0-4e1b-4c7d-a981-637ed4107310",' - '"timestamp":"2026-01-13T00:33:34.702Z",' - '"cwd":"/tmp"}\n', - encoding="utf-8", - ) runner = PiRunner( extra_args=[], model=None, @@ -176,7 +156,7 @@ async def test_run_normalizes_resume_path(tmp_path: Path) -> None: async for _event in runner.run("test", resume): pass assert seen_resume is not None - assert seen_resume.value == "ccd569e0" + assert seen_resume.value == str(session_path) @pytest.mark.anyio