diff --git a/docs/adding-a-runner.md b/docs/adding-a-runner.md index 5d6ac0d..ac04991 100644 --- a/docs/adding-a-runner.md +++ b/docs/adding-a-runner.md @@ -138,6 +138,7 @@ Copy the Claude pattern: create a small dataclass to hold streaming state. Common things to track: +- `factory`: `EventFactory` instance for creating Takopi events and tracking resume - `pending_actions`: map tool_use_id → `Action` so tool results can complete them - `last_assistant_text`: fallback for final answer if the engine omits it - `note_seq`: counter used by `JsonlSubprocessRunner.note_event(...)` @@ -145,8 +146,11 @@ Common things to track: ```py from dataclasses import dataclass, field +from ..events import EventFactory + @dataclass class PiStreamState: + factory: EventFactory = field(default_factory=lambda: EventFactory(ENGINE)) pending_actions: dict[str, Action] = field(default_factory=dict) last_assistant_text: str | None = None note_seq: int = 0 @@ -228,125 +232,117 @@ Use this mapping (mirrors Claude’s approach): Claude keeps translation logic in a standalone function (`translate_claude_event(...)`). This makes it easy to unit test without spawning a subprocess. -Do the same for Pi: +Do the same for Pi. Use pattern matching against msgspec shapes, and rely on the +`EventFactory` (as in Codex/Claude) to standardize event creation: ```py def translate_pi_event( - event: dict[str, Any], + event: pi_schema.PiEvent, *, title: str, state: PiStreamState, + factory: EventFactory, ) -> list[TakopiEvent]: - etype = event.get("type") + match event: + case pi_schema.SessionStart(session_id=session_id, model=model): + if not session_id: + return [] + event_title = str(model) if model else title + token = ResumeToken(engine=ENGINE, value=session_id) + return [factory.started(token, title=event_title)] - if etype == "session.start": - session_id = event.get("session_id") - if not session_id: - return [] - model = event.get("model") - event_title = str(model) if model else title - return [ - StartedEvent( - engine=ENGINE, - resume=ResumeToken(engine=ENGINE, value=str(session_id)), - title=event_title, - ) - ] + case pi_schema.ToolUse(id=tool_id, name=name, input=tool_input): + if not tool_id: + return [] + tool_input = tool_input or {} + name = str(name or "tool") - if etype == "tool.use": - tool_id = event.get("id") - if not isinstance(tool_id, str) or not tool_id: - return [] - name = str(event.get("name") or "tool") - tool_input = event.get("input") - if not isinstance(tool_input, dict): - tool_input = {} + # Keep titles short and friendly. + # (Claude uses takopi.utils.paths.relativize_command / relativize_path) + kind: ActionKind = "tool" + title = name + if name in {"Bash", "Shell"}: + kind = "command" + title = relativize_command(str(tool_input.get("command") or name)) - # Keep titles short and friendly. - # (Claude uses takopi.utils.paths.relativize_command / relativize_path) - kind: ActionKind = "tool" - title = name - if name in {"Bash", "Shell"}: - kind = "command" - title = relativize_command(str(tool_input.get("command") or name)) - - action = Action( - id=tool_id, - kind=kind, - title=title, - detail={"name": name, "input": tool_input}, - ) - state.pending_actions[action.id] = action - return [ActionEvent(engine=ENGINE, action=action, phase="started")] - - if etype == "tool.result": - tool_use_id = event.get("tool_use_id") - if not isinstance(tool_use_id, str) or not tool_use_id: - return [] - action = state.pending_actions.pop(tool_use_id, None) - if action is None: action = Action( - id=tool_use_id, - kind="tool", - title="tool result", - detail={}, + id=tool_id, + kind=kind, + title=title, + detail={"name": name, "input": tool_input}, ) - - is_error = event.get("is_error") is True - content = event.get("content") - result_text = "" if content is None else (content if isinstance(content, str) else str(content)) - - detail = dict(action.detail) - detail.update({"result_preview": result_text, "is_error": is_error}) - - return [ - ActionEvent( - engine=ENGINE, - action=Action( - id=action.id, + state.pending_actions[action.id] = action + return [ + factory.action_started( + action_id=action.id, kind=action.kind, title=action.title, + detail=action.detail, + ) + ] + + case pi_schema.ToolResult( + tool_use_id=tool_use_id, content=content, is_error=is_error + ): + if not tool_use_id: + return [] + action = state.pending_actions.pop(tool_use_id, None) + if action is None: + action = Action( + id=tool_use_id, + kind="tool", + title="tool result", + detail={}, + ) + + result_text = ( + "" + if content is None + else (content if isinstance(content, str) else str(content)) + ) + detail = dict(action.detail) + detail.update( + {"result_preview": result_text, "is_error": bool(is_error)} + ) + + return [ + factory.action_completed( + action_id=action.id, + kind=action.kind, + title=action.title, + ok=not bool(is_error), detail=detail, - ), - phase="completed", - ok=not is_error, + ) + ] + + case pi_schema.Final(session_id=session_id, ok=ok, answer=answer, error=error): + answer = answer or "" + if ok and not answer and state.last_assistant_text: + answer = state.last_assistant_text + + resume = ( + ResumeToken(engine=ENGINE, value=session_id) if session_id else None ) - ] - if etype == "final": - ok = event.get("ok") is True - answer = event.get("answer") - if not isinstance(answer, str): - answer = "" - if ok and not answer and state.last_assistant_text: - answer = state.last_assistant_text + if ok: + return [factory.completed_ok(answer=answer, resume=resume)] - session_id = event.get("session_id") - resume = ( - ResumeToken(engine=ENGINE, value=str(session_id)) if session_id else None - ) + error_text = str(error) if error else "pi run failed" + return [ + factory.completed_error( + error=error_text, + answer=answer, + resume=resume, + ) + ] - error = None - if not ok: - err = event.get("error") - error = str(err) if err else "pi run failed" - - return [ - CompletedEvent( - engine=ENGINE, - ok=ok, - answer=answer, - resume=resume, - error=error, - ) - ] - - return [] + case _: + return [] ``` This is intentionally close to Claude’s structure: -- Parse `type` +- Match on the msgspec event type - Handle “init/session start” first - Emit action-start and action-complete events - Emit a final `CompletedEvent` @@ -377,17 +373,14 @@ import logging import re from dataclasses import dataclass from pathlib import Path -from typing import Any, cast +from typing import Any from ..backends import EngineBackend, EngineConfig from ..model import ( - CompletedEvent, EngineId, ResumeToken, - StartedEvent, TakopiEvent, ) -import msgspec from ..runner import JsonlSubprocessRunner, ResumeTokenMixin, Runner from ..schemas import pi as pi_schema @@ -458,21 +451,25 @@ class PiRunner(ResumeTokenMixin, JsonlSubprocessRunner): raw: bytes, line: bytes, state: PiStreamState, - ) -> dict[str, Any] | None: + ) -> pi_schema.PiEvent | None: _ = raw, state - event = pi_schema.decode_event(line) - return cast(dict[str, Any], msgspec.to_builtins(event)) + return pi_schema.decode_event(line) def translate( self, - data: dict[str, Any], + data: pi_schema.PiEvent, *, state: PiStreamState, resume: ResumeToken | None, found_session: ResumeToken | None, ) -> list[TakopiEvent]: _ = resume, found_session - return translate_pi_event(data, title=self.session_title, state=state) + return translate_pi_event( + data, + title=self.session_title, + state=state, + factory=state.factory, + ) ``` Notes: