feat: msgspec schemas for jsonl decoding (#37)

This commit is contained in:
banteg
2026-01-04 03:41:07 +04:00
committed by GitHub
parent 30fe5cef43
commit a0c16c325e
33 changed files with 2235 additions and 1148 deletions
+82 -13
View File
@@ -6,7 +6,8 @@ A *runner* is the adapter between an engine-specific CLI (Codex, Claude Code,
**normalized event model** (`StartedEvent`, `ActionEvent`, `CompletedEvent`).
Takopi is designed so that adding a runner usually means **adding one new module** under
`src/takopi/runners/`—no changes to the bridge, renderer, or CLI.
`src/takopi/runners/` plus a small **msgspec schema** module under `src/takopi/schemas/`
no changes to the bridge, renderer, or CLI.
The walkthrough below uses an **imaginary engine** named **Pi** (`pi`) and intentionally mirrors
the patterns used in `runners/claude.py`.
@@ -97,16 +98,20 @@ Why this shape?
---
### Step 2 — Create `src/takopi/runners/pi.py`
### Step 2 — Create `src/takopi/schemas/pi.py` + `src/takopi/runners/pi.py`
Create a new module next to the existing runners:
Create a new schema module and a runner module:
```
src/takopi/schemas/
codex.py
pi.py # ← new
src/takopi/runners/
codex.py
claude.py
mock.py
pi.py # ← new
pi.py # ← new
```
Takopi discovers engines by importing modules in `takopi.runners` and looking for a
@@ -121,9 +126,9 @@ Most CLIs we integrate are JSONL-streaming processes.
Takopi provides `JsonlSubprocessRunner`, which:
- spawns the CLI
- drains stderr into a bounded tail
- reads stdout line-by-line as JSONL
- calls your `translate(...)` method to convert each JSON object into Takopi events
- drains stderr and logs it
- reads stdout line-by-line as JSONL bytes
- calls your `decode_jsonl(...)` and then `translate(...)` to convert each event into Takopi events
- guarantees “exactly one CompletedEvent” behavior
- provides safe fallbacks for rc != 0 or stream ending without a completion event
@@ -147,6 +152,55 @@ class PiStreamState:
note_seq: int = 0
```
#### Define a msgspec schema (recommended path)
Codex now decodes JSONL with **msgspec**, and new runners should follow that pattern.
Create a small schema module under `src/takopi/schemas/` and expose a `decode_event(...)`
function. Only include the event shapes your CLI actually emits.
Minimal example:
```py
from __future__ import annotations
from typing import Any, Literal, TypeAlias
import msgspec
class SessionStart(msgspec.Struct, tag="session.start", kw_only=True):
session_id: str
model: str | None = None
class ToolUse(msgspec.Struct, tag="tool.use", kw_only=True):
id: str
name: str
input: dict[str, Any] | None = None
class ToolResult(msgspec.Struct, tag="tool.result", kw_only=True):
tool_use_id: str
content: Any
is_error: bool | None = None
class Final(msgspec.Struct, tag="final", kw_only=True):
session_id: str
ok: bool
answer: str | None = None
error: str | None = None
PiEvent: TypeAlias = SessionStart | ToolUse | ToolResult | Final
_DECODER = msgspec.json.Decoder(PiEvent)
def decode_event(data: bytes | str) -> PiEvent:
return _DECODER.decode(data)
```
#### Decide what Pi emits
For this guide, assume Pi outputs events like:
@@ -323,7 +377,7 @@ import logging
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from typing import Any, cast
from ..backends import EngineBackend, EngineConfig
from ..model import (
@@ -333,13 +387,14 @@ from ..model import (
StartedEvent,
TakopiEvent,
)
import msgspec
from ..runner import JsonlSubprocessRunner, ResumeTokenMixin, Runner
from ..schemas import pi as pi_schema
logger = logging.getLogger(__name__)
ENGINE: EngineId = EngineId("pi")
STDERR_TAIL_LINES = 200
_RESUME_RE = re.compile(
r"(?im)^\s*`?pi\s+--resume\s+(?P<token>[^`\s]+)`?\s*$"
)
@@ -354,7 +409,6 @@ class PiRunner(ResumeTokenMixin, JsonlSubprocessRunner):
model: str | None = None
allowed_tools: list[str] | None = None
session_title: str = "pi"
stderr_tail_lines = STDERR_TAIL_LINES
logger = logger
def format_resume(self, token: ResumeToken) -> str:
@@ -398,6 +452,17 @@ class PiRunner(ResumeTokenMixin, JsonlSubprocessRunner):
_ = prompt, resume
return PiStreamState()
def decode_jsonl(
self,
*,
raw: bytes,
line: bytes,
state: PiStreamState,
) -> dict[str, Any] | None:
_ = raw, state
event = pi_schema.decode_event(line)
return cast(dict[str, Any], msgspec.to_builtins(event))
def translate(
self,
data: dict[str, Any],
@@ -423,7 +488,8 @@ Depending on how robust you want the integration, consider adding:
- `env(...)`: to strip or inject environment variables (Claude strips `ANTHROPIC_API_KEY`
unless configured to use API billing).
- `invalid_json_events(...)`: emit a helpful warning `ActionEvent` on malformed JSONL.
- `process_error_events(...)`: customize rc != 0 behavior (include stderr tail in detail).
- `decode_error_events(...)`: log + drop `msgspec.DecodeError` if the engine emits garbage.
- `process_error_events(...)`: customize rc != 0 behavior.
- `stream_end_events(...)`: handle “process exited cleanly but never emitted a final event”.
Claude uses these to produce better failures instead of silent hangs.
@@ -503,6 +569,10 @@ Then assert:
- the last event is a `CompletedEvent`
- completed.resume matches started.resume
If you use msgspec, also add a tiny schema sanity test (pattern from
`tests/test_codex_schema.py`) that decodes your fixture with
`takopi.schemas.<engine>.decode_event`.
#### 3) Lock/serialization tests (optional, but great)
Claude has async tests proving that:
@@ -554,4 +624,3 @@ Before you call the runner “done”:
- [ ] rc != 0 produces a failure `CompletedEvent` (via `process_error_events`).
- [ ] “no final event” produces a failure `CompletedEvent` (via `stream_end_events`).
- [ ] Tests cover resume parsing + at least one translation fixture.