docs: align runner guide with factory pattern

This commit is contained in:
banteg
2026-01-04 14:55:41 +04:00
parent 105466cc95
commit 465dda20a2
+102 -105
View File
@@ -138,6 +138,7 @@ Copy the Claude pattern: create a small dataclass to hold streaming state.
Common things to track:
- `factory`: `EventFactory` instance for creating Takopi events and tracking resume
- `pending_actions`: map tool_use_id → `Action` so tool results can complete them
- `last_assistant_text`: fallback for final answer if the engine omits it
- `note_seq`: counter used by `JsonlSubprocessRunner.note_event(...)`
@@ -145,8 +146,11 @@ Common things to track:
```py
from dataclasses import dataclass, field
from ..events import EventFactory
@dataclass
class PiStreamState:
factory: EventFactory = field(default_factory=lambda: EventFactory(ENGINE))
pending_actions: dict[str, Action] = field(default_factory=dict)
last_assistant_text: str | None = None
note_seq: int = 0
@@ -228,125 +232,117 @@ Use this mapping (mirrors Claudes approach):
Claude keeps translation logic in a standalone function (`translate_claude_event(...)`).
This makes it easy to unit test without spawning a subprocess.
Do the same for Pi:
Do the same for Pi. Use pattern matching against msgspec shapes, and rely on the
`EventFactory` (as in Codex/Claude) to standardize event creation:
```py
def translate_pi_event(
event: dict[str, Any],
event: pi_schema.PiEvent,
*,
title: str,
state: PiStreamState,
factory: EventFactory,
) -> list[TakopiEvent]:
etype = event.get("type")
match event:
case pi_schema.SessionStart(session_id=session_id, model=model):
if not session_id:
return []
event_title = str(model) if model else title
token = ResumeToken(engine=ENGINE, value=session_id)
return [factory.started(token, title=event_title)]
if etype == "session.start":
session_id = event.get("session_id")
if not session_id:
return []
model = event.get("model")
event_title = str(model) if model else title
return [
StartedEvent(
engine=ENGINE,
resume=ResumeToken(engine=ENGINE, value=str(session_id)),
title=event_title,
)
]
case pi_schema.ToolUse(id=tool_id, name=name, input=tool_input):
if not tool_id:
return []
tool_input = tool_input or {}
name = str(name or "tool")
if etype == "tool.use":
tool_id = event.get("id")
if not isinstance(tool_id, str) or not tool_id:
return []
name = str(event.get("name") or "tool")
tool_input = event.get("input")
if not isinstance(tool_input, dict):
tool_input = {}
# Keep titles short and friendly.
# (Claude uses takopi.utils.paths.relativize_command / relativize_path)
kind: ActionKind = "tool"
title = name
if name in {"Bash", "Shell"}:
kind = "command"
title = relativize_command(str(tool_input.get("command") or name))
# Keep titles short and friendly.
# (Claude uses takopi.utils.paths.relativize_command / relativize_path)
kind: ActionKind = "tool"
title = name
if name in {"Bash", "Shell"}:
kind = "command"
title = relativize_command(str(tool_input.get("command") or name))
action = Action(
id=tool_id,
kind=kind,
title=title,
detail={"name": name, "input": tool_input},
)
state.pending_actions[action.id] = action
return [ActionEvent(engine=ENGINE, action=action, phase="started")]
if etype == "tool.result":
tool_use_id = event.get("tool_use_id")
if not isinstance(tool_use_id, str) or not tool_use_id:
return []
action = state.pending_actions.pop(tool_use_id, None)
if action is None:
action = Action(
id=tool_use_id,
kind="tool",
title="tool result",
detail={},
id=tool_id,
kind=kind,
title=title,
detail={"name": name, "input": tool_input},
)
is_error = event.get("is_error") is True
content = event.get("content")
result_text = "" if content is None else (content if isinstance(content, str) else str(content))
detail = dict(action.detail)
detail.update({"result_preview": result_text, "is_error": is_error})
return [
ActionEvent(
engine=ENGINE,
action=Action(
id=action.id,
state.pending_actions[action.id] = action
return [
factory.action_started(
action_id=action.id,
kind=action.kind,
title=action.title,
detail=action.detail,
)
]
case pi_schema.ToolResult(
tool_use_id=tool_use_id, content=content, is_error=is_error
):
if not tool_use_id:
return []
action = state.pending_actions.pop(tool_use_id, None)
if action is None:
action = Action(
id=tool_use_id,
kind="tool",
title="tool result",
detail={},
)
result_text = (
""
if content is None
else (content if isinstance(content, str) else str(content))
)
detail = dict(action.detail)
detail.update(
{"result_preview": result_text, "is_error": bool(is_error)}
)
return [
factory.action_completed(
action_id=action.id,
kind=action.kind,
title=action.title,
ok=not bool(is_error),
detail=detail,
),
phase="completed",
ok=not is_error,
)
]
case pi_schema.Final(session_id=session_id, ok=ok, answer=answer, error=error):
answer = answer or ""
if ok and not answer and state.last_assistant_text:
answer = state.last_assistant_text
resume = (
ResumeToken(engine=ENGINE, value=session_id) if session_id else None
)
]
if etype == "final":
ok = event.get("ok") is True
answer = event.get("answer")
if not isinstance(answer, str):
answer = ""
if ok and not answer and state.last_assistant_text:
answer = state.last_assistant_text
if ok:
return [factory.completed_ok(answer=answer, resume=resume)]
session_id = event.get("session_id")
resume = (
ResumeToken(engine=ENGINE, value=str(session_id)) if session_id else None
)
error_text = str(error) if error else "pi run failed"
return [
factory.completed_error(
error=error_text,
answer=answer,
resume=resume,
)
]
error = None
if not ok:
err = event.get("error")
error = str(err) if err else "pi run failed"
return [
CompletedEvent(
engine=ENGINE,
ok=ok,
answer=answer,
resume=resume,
error=error,
)
]
return []
case _:
return []
```
This is intentionally close to Claudes structure:
- Parse `type`
- Match on the msgspec event type
- Handle “init/session start” first
- Emit action-start and action-complete events
- Emit a final `CompletedEvent`
@@ -377,17 +373,14 @@ import logging
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Any, cast
from typing import Any
from ..backends import EngineBackend, EngineConfig
from ..model import (
CompletedEvent,
EngineId,
ResumeToken,
StartedEvent,
TakopiEvent,
)
import msgspec
from ..runner import JsonlSubprocessRunner, ResumeTokenMixin, Runner
from ..schemas import pi as pi_schema
@@ -458,21 +451,25 @@ class PiRunner(ResumeTokenMixin, JsonlSubprocessRunner):
raw: bytes,
line: bytes,
state: PiStreamState,
) -> dict[str, Any] | None:
) -> pi_schema.PiEvent | None:
_ = raw, state
event = pi_schema.decode_event(line)
return cast(dict[str, Any], msgspec.to_builtins(event))
return pi_schema.decode_event(line)
def translate(
self,
data: dict[str, Any],
data: pi_schema.PiEvent,
*,
state: PiStreamState,
resume: ResumeToken | None,
found_session: ResumeToken | None,
) -> list[TakopiEvent]:
_ = resume, found_session
return translate_pi_event(data, title=self.session_title, state=state)
return translate_pi_event(
data,
title=self.session_title,
state=state,
factory=state.factory,
)
```
Notes: