From d4aad8e0680b2453f6e5441caf31a5ca28d32b3e Mon Sep 17 00:00:00 2001 From: banteg <4562643+banteg@users.noreply.github.com> Date: Tue, 13 Jan 2026 05:42:17 +0400 Subject: [PATCH] feat(pi): add session resume shorthand (#113) --- docs/developing.md | 4 +- docs/runner/pi/pi-runner.md | 6 +- docs/runner/pi/pi-takopi-events.md | 5 +- docs/specification.md | 2 +- src/takopi/runners/pi.py | 101 ++++++++++++++++++++++++++++- tests/test_pi_runner.py | 98 +++++++++++++++++++++++++--- 6 files changed, 198 insertions(+), 18 deletions(-) diff --git a/docs/developing.md b/docs/developing.md index 984bcf8..acbfc06 100644 --- a/docs/developing.md +++ b/docs/developing.md @@ -45,7 +45,7 @@ The core handler module containing: **Key patterns:** - Progress edits are best-effort and only run when new events arrive (Telegram outbox handles rate limiting/coalescing) -- Resume tokens are runner-formatted command lines (e.g., `` `codex resume ` ``, `` `claude --resume ` ``, `` `pi --session ` ``) +- Resume tokens are runner-formatted command lines (e.g., `` `codex resume ` ``, `` `claude --resume ` ``, `` `pi --session ` ``) - Resume lines are stripped from the prompt before invoking the runner - Errors/cancellation render final status while preserving resume tokens when known @@ -356,7 +356,7 @@ transport.send()/edit() final message, delete progress if needed ### Resume Flow Same as above; auto-router polls all runners to extract resume tokens: -- Router returns first matching token (e.g. `` `claude --resume ` `` routes to Claude, `` `pi --session ` `` routes to Pi) +- Router returns first matching token (e.g. `` `claude --resume ` `` routes to Claude, `` `pi --session ` `` routes to Pi) - Selected runner spawns with resume (e.g. `codex exec --json resume -`, `pi --print --mode json --session `) - Per-token lock serializes concurrent resumes on the same thread diff --git a/docs/runner/pi/pi-runner.md b/docs/runner/pi/pi-runner.md index 6bde1e7..4d340bb 100644 --- a/docs/runner/pi/pi-runner.md +++ b/docs/runner/pi/pi-runner.md @@ -31,13 +31,15 @@ Provide the **`pi`** engine backend so Takopi can: Takopi appends a **single backticked** resume line at the end of the message, like: ```text -`pi --session /home/user/.pi/agent/sessions/--repo--/2026-01-02T12-34-56-789Z_abcd.jsonl` +`pi --session ccd569e0` ``` Notes: * `pi --resume/-r` opens an interactive session picker, so Takopi uses `--session ` instead. -* The resume token is the **session file path** (JSONL), treated as an opaque string. +* The resume token is the **session id** (short prefix), derived from the first JSON + object in the session file. If the id cannot be read, Takopi falls back to the + session file path. * If the path contains spaces, the runner will quote it. ### Non-interactive runs diff --git a/docs/runner/pi/pi-takopi-events.md b/docs/runner/pi/pi-takopi-events.md index 1734665..eeefb6b 100644 --- a/docs/runner/pi/pi-takopi-events.md +++ b/docs/runner/pi/pi-takopi-events.md @@ -30,10 +30,11 @@ Notes: - Canonical resume line (embedded in chat): ``` -`pi --session ` +`pi --session ` ``` -The token is the **session JSONL file path**. +The token is the **short session id**, derived from the first JSON object in the +session file. If the id cannot be read, Takopi falls back to the session file path. Why not `--resume`? - `--resume/-r` opens an interactive session picker; it does not accept a diff --git a/docs/specification.md b/docs/specification.md index 646e5e5..c07b581 100644 --- a/docs/specification.md +++ b/docs/specification.md @@ -41,7 +41,7 @@ The canonical ResumeLine embedded in chat MUST be the engine’s CLI resume comm - `codex resume ` - `claude --resume ` -- `pi --session ` +- `pi --session ` ResumeLine MUST resume the interactive session when the engine offers both interactive and headless modes. It MUST NOT point to a headless/batch command that requires a new prompt (e.g., a `run` subcommand that errors without a message). diff --git a/src/takopi/runners/pi.py b/src/takopi/runners/pi.py index a5ae786..3bb35c2 100644 --- a/src/takopi/runners/pi.py +++ b/src/takopi/runners/pi.py @@ -1,7 +1,9 @@ from __future__ import annotations +import json import os import re +from collections.abc import AsyncIterator from dataclasses import dataclass, field from datetime import datetime, UTC from pathlib import Path, PurePath @@ -36,10 +38,14 @@ ENGINE: EngineId = "pi" _RESUME_RE = re.compile(r"(?im)^\s*`?pi\s+--session\s+(?P.+?)`?\s*$") +_SESSION_ID_PREFIX_LEN = 8 + @dataclass(slots=True) class PiStreamState: resume: ResumeToken + session_path: str | None = None + allow_id_promotion: bool = False pending_actions: dict[str, Action] = field(default_factory=dict) last_assistant_text: str | None = None last_assistant_error: str | None = None @@ -48,6 +54,69 @@ class PiStreamState: note_seq: int = 0 +def _looks_like_session_path(token: str) -> bool: + if not token: + return False + if token.endswith(".jsonl"): + return True + if "/" in token or "\\" in token: + return True + return token.startswith("~") + + +def _short_session_id(session_id: str) -> str: + if not session_id: + return session_id + if "-" in session_id: + return session_id.split("-", 1)[0] + if len(session_id) > _SESSION_ID_PREFIX_LEN: + return session_id[:_SESSION_ID_PREFIX_LEN] + return session_id + + +def _session_id_from_line(line: str) -> str | None: + try: + data = json.loads(line) + except json.JSONDecodeError: + return None + if not isinstance(data, dict): + return None + event_type = data.get("type") + if event_type is not None and event_type != "session": + return None + session_id = data.get("id") + if isinstance(session_id, str) and session_id: + return _short_session_id(session_id) + return None + + +def _session_id_from_path(path: Path) -> str | None: + path = path.expanduser() + try: + with path.open("r", encoding="utf-8") as handle: + for raw_line in handle: + line = raw_line.strip() + if not line: + continue + return _session_id_from_line(line) + except OSError: + return None + return None + + +def _maybe_promote_session_id(state: PiStreamState) -> None: + if not state.allow_id_promotion: + return + session_path = state.session_path + if not session_path: + return + if state.resume.value != session_path: + return + session_id = _session_id_from_path(Path(session_path)) + if session_id: + state.resume = ResumeToken(engine=ENGINE, value=session_id) + + def _action_event( *, phase: ActionPhase, @@ -117,6 +186,7 @@ def translate_pi_event( state: PiStreamState, ) -> list[TakopiEvent]: out: list[TakopiEvent] = [] + _maybe_promote_session_id(state) if not state.started: out.append( StartedEvent( @@ -240,6 +310,11 @@ class PiRunner(ResumeTokenMixin, JsonlSubprocessRunner): raise RuntimeError(f"resume token is for engine {token.engine!r}") return f"`pi --session {self._quote_token(token.value)}`" + def run( + self, prompt: str, resume: ResumeToken | None + ) -> AsyncIterator[TakopiEvent]: + return super().run(prompt, self._normalize_resume_token(resume)) + def extract_resume(self, text: str | None) -> ResumeToken | None: if not text: return None @@ -254,8 +329,24 @@ class PiRunner(ResumeTokenMixin, JsonlSubprocessRunner): found = token if not found: return None + if _looks_like_session_path(found): + session_id = _session_id_from_path(Path(found)) + if session_id: + found = session_id return ResumeToken(engine=self.engine, value=found) + def _normalize_resume_token(self, resume: ResumeToken | None) -> ResumeToken | None: + if resume is None: + return None + if resume.engine != ENGINE: + return resume + if not _looks_like_session_path(resume.value): + return resume + session_id = _session_id_from_path(Path(resume.value)) + if session_id: + return ResumeToken(engine=ENGINE, value=session_id) + return resume + def command(self) -> str: return "pi" @@ -294,9 +385,13 @@ class PiRunner(ResumeTokenMixin, JsonlSubprocessRunner): if resume is None: session_path = self._new_session_path() token = ResumeToken(engine=ENGINE, value=session_path) - else: - token = resume - return PiStreamState(resume=token) + return PiStreamState( + resume=token, + session_path=session_path, + allow_id_promotion=True, + ) + session_path = resume.value if _looks_like_session_path(resume.value) else None + return PiStreamState(resume=resume, session_path=session_path) def translate( self, diff --git a/tests/test_pi_runner.py b/tests/test_pi_runner.py index 25ce5d2..b3b0244 100644 --- a/tests/test_pi_runner.py +++ b/tests/test_pi_runner.py @@ -29,22 +29,24 @@ def _load_fixture(name: str) -> list[pi_schema.PiEvent]: return events -def test_pi_resume_format_and_extract() -> None: +def test_pi_resume_format_and_extract(tmp_path: Path) -> None: runner = PiRunner( extra_args=[], model=None, provider=None, ) - token = ResumeToken(engine=ENGINE, value="/tmp/pi/session.jsonl") + session_path = tmp_path / "session.jsonl" + token = ResumeToken(engine=ENGINE, value=str(session_path)) - assert runner.format_resume(token) == "`pi --session /tmp/pi/session.jsonl`" - assert runner.extract_resume("`pi --session /tmp/pi/session.jsonl`") == token - assert runner.extract_resume('pi --session "/tmp/pi/session.jsonl"') == token + assert runner.format_resume(token) == f"`pi --session {session_path}`" + assert runner.extract_resume(f"`pi --session {session_path}`") == token + assert runner.extract_resume(f'pi --session "{session_path}"') == token assert runner.extract_resume("`codex resume sid`") is None - spaced = ResumeToken(engine=ENGINE, value="/tmp/pi session.jsonl") - assert runner.format_resume(spaced) == '`pi --session "/tmp/pi session.jsonl"`' - assert runner.extract_resume('`pi --session "/tmp/pi session.jsonl"`') == spaced + spaced_path = tmp_path / "pi session.jsonl" + spaced = ResumeToken(engine=ENGINE, value=str(spaced_path)) + assert runner.format_resume(spaced) == f'`pi --session "{spaced_path}"`' + assert runner.extract_resume(f'`pi --session "{spaced_path}"`') == spaced def test_translate_success_fixture() -> None: @@ -97,6 +99,86 @@ def test_translate_error_fixture() -> None: assert completed.answer == "Request failed." +def test_session_id_promotion_from_file(tmp_path: Path) -> None: + session_path = tmp_path / "session.jsonl" + session_path.write_text( + '{"type":"session","version":3,' + '"id":"ccd569e0-4e1b-4c7d-a981-637ed4107310",' + '"timestamp":"2026-01-13T00:33:34.702Z",' + '"cwd":"/tmp"}\n', + encoding="utf-8", + ) + runner = PiRunner( + extra_args=[], + model=None, + provider=None, + ) + with patch( + "takopi.runners.pi.PiRunner._new_session_path", + return_value=str(session_path), + ): + state = runner.new_state("prompt", None) + events = translate_pi_event( + pi_schema.AgentStart(), title="pi", meta=None, state=state + ) + started = next(evt for evt in events if isinstance(evt, StartedEvent)) + assert started.resume.value == "ccd569e0" + + +def test_extract_resume_prefers_session_id(tmp_path: Path) -> None: + session_path = tmp_path / "session.jsonl" + session_path.write_text( + '{"type":"session","version":3,' + '"id":"ccd569e0-4e1b-4c7d-a981-637ed4107310",' + '"timestamp":"2026-01-13T00:33:34.702Z",' + '"cwd":"/tmp"}\n', + encoding="utf-8", + ) + runner = PiRunner( + extra_args=[], + model=None, + provider=None, + ) + token = runner.extract_resume(f"pi --session {session_path}") + assert token is not None + assert token.value == "ccd569e0" + + +@pytest.mark.anyio +async def test_run_normalizes_resume_path(tmp_path: Path) -> None: + session_path = tmp_path / "session.jsonl" + session_path.write_text( + '{"type":"session","version":3,' + '"id":"ccd569e0-4e1b-4c7d-a981-637ed4107310",' + '"timestamp":"2026-01-13T00:33:34.702Z",' + '"cwd":"/tmp"}\n', + encoding="utf-8", + ) + runner = PiRunner( + extra_args=[], + model=None, + provider=None, + ) + seen_resume: ResumeToken | None = None + + async def run_stub(_prompt: str, resume: ResumeToken | None): + nonlocal seen_resume + seen_resume = resume + yield CompletedEvent( + engine=ENGINE, + resume=resume, + ok=True, + answer="ok", + ) + + runner.run_impl = run_stub # type: ignore[assignment] + resume = ResumeToken(engine=ENGINE, value=str(session_path)) + async for _event in runner.run("test", resume): + pass + assert seen_resume is not None + assert seen_resume.value == "ccd569e0" + + @pytest.mark.anyio async def test_run_serializes_same_session() -> None: runner = PiRunner(