From 7c30674e5353db4bcff8e1af038d3f2b392fda0b Mon Sep 17 00:00:00 2001 From: "Sebastian N. Fernandez" Date: Fri, 2 Jan 2026 09:57:19 -0300 Subject: [PATCH] feat: opencode runner (#22) Co-authored-by: banteg <4562643+banteg@users.noreply.github.com> --- docs/runner/opencode/opencode-runner.md | 46 ++ .../opencode-stream-json-cheatsheet.md | 145 +++++ .../runner/opencode/opencode-takopi-events.md | 80 +++ docs/specification.md | 2 + src/takopi/runners/opencode.py | 564 ++++++++++++++++++ tests/fixtures/opencode_stream_error.jsonl | 2 + tests/fixtures/opencode_stream_success.jsonl | 6 + .../opencode_stream_success_no_reason.jsonl | 3 + tests/test_opencode_runner.py | 341 +++++++++++ 9 files changed, 1189 insertions(+) create mode 100644 docs/runner/opencode/opencode-runner.md create mode 100644 docs/runner/opencode/opencode-stream-json-cheatsheet.md create mode 100644 docs/runner/opencode/opencode-takopi-events.md create mode 100644 src/takopi/runners/opencode.py create mode 100644 tests/fixtures/opencode_stream_error.jsonl create mode 100644 tests/fixtures/opencode_stream_success.jsonl create mode 100644 tests/fixtures/opencode_stream_success_no_reason.jsonl create mode 100644 tests/test_opencode_runner.py diff --git a/docs/runner/opencode/opencode-runner.md b/docs/runner/opencode/opencode-runner.md new file mode 100644 index 0000000..21cb575 --- /dev/null +++ b/docs/runner/opencode/opencode-runner.md @@ -0,0 +1,46 @@ +# OpenCode Runner + +This runner integrates with the [OpenCode CLI](https://github.com/sst/opencode). + +## Installation + +```bash +npm i -g opencode-ai@latest +``` + +## Configuration + +Add to your `takopi.toml`: + +```toml +[opencode] +model = "claude-sonnet" # optional +``` + +## Usage + +```bash +takopi opencode +``` + +## Resume Format + +Resume line format: `` `opencode --session ses_XXX` `` + +The runner recognizes both `--session` and `-s` flags (with or without `run`). + +Note: The resume line is meant to reopen the interactive TUI session. `opencode run` is headless and requires a message or command, so it is not the canonical resume command. + +## JSON Event Format + +OpenCode outputs JSON events with the following types: + +| Event Type | Description | +|------------|-------------| +| `step_start` | Beginning of a processing step | +| `tool_use` | Tool invocation with input/output | +| `text` | Text output from the model | +| `step_finish` | End of a step (reason: "stop" or "tool-calls" when present) | +| `error` | Error event | + +See [opencode-stream-json-cheatsheet.md](./opencode-stream-json-cheatsheet.md) for detailed event format documentation. diff --git a/docs/runner/opencode/opencode-stream-json-cheatsheet.md b/docs/runner/opencode/opencode-stream-json-cheatsheet.md new file mode 100644 index 0000000..1070dee --- /dev/null +++ b/docs/runner/opencode/opencode-stream-json-cheatsheet.md @@ -0,0 +1,145 @@ +# OpenCode `run --format json` Event Cheatsheet + +`opencode run --format json` writes one JSON object per line (JSONL) to stdout. +Each line has a `type` field indicating the event type. + +## Event Types + +### `step_start` + +Marks the beginning of a processing step. + +Fields: +- `type`: `"step_start"` +- `timestamp`: Unix timestamp in milliseconds +- `sessionID`: Session identifier (format: `ses_XXX`) +- `part.id`: Part identifier +- `part.sessionID`: Session ID (duplicated) +- `part.messageID`: Message ID +- `part.type`: `"step-start"` +- `part.snapshot`: Git snapshot hash + +Example: +```json +{"type":"step_start","timestamp":1767036059338,"sessionID":"ses_494719016ffe85dkDMj0FPRbHK","part":{"id":"prt_b6b8e7ec7001qAZUB7eTENxPpI","sessionID":"ses_494719016ffe85dkDMj0FPRbHK","messageID":"msg_b6b8e702b0012XuEC4bGe0XhKa","type":"step-start","snapshot":"71db24a798b347669c0ebadb2dfad238f991753d"}} +``` + +### `tool_use` + +Tool invocation event. Emitted when a tool finishes (`status == "completed"`). + +Fields: +- `type`: `"tool_use"` +- `timestamp`: Unix timestamp in milliseconds +- `sessionID`: Session identifier +- `part.id`: Part identifier +- `part.callID`: Unique call ID for this tool invocation +- `part.tool`: Tool name (e.g., "bash", "read", "write", "grep") +- `part.state.status`: `"completed"` (the CLI JSON output does not emit pending/running tool states) +- `part.state.input`: Tool input parameters +- `part.state.output`: Tool output (when completed) +- `part.state.title`: Human-readable description +- `part.state.metadata`: Additional metadata (exit codes, etc.) +- `part.state.time.start`: Start timestamp +- `part.state.time.end`: End timestamp + +Example: +```json +{"type":"tool_use","timestamp":1767036061199,"sessionID":"ses_494719016ffe85dkDMj0FPRbHK","part":{"id":"prt_b6b8e85bb001CzBoN2dDlEZJnP","sessionID":"ses_494719016ffe85dkDMj0FPRbHK","messageID":"msg_b6b8e702b0012XuEC4bGe0XhKa","type":"tool","callID":"r9bQWsNLvOrJGIOz","tool":"bash","state":{"status":"completed","input":{"command":"echo hello","description":"Print hello to stdout"},"output":"hello\n","title":"Print hello to stdout","metadata":{"output":"hello\n","exit":0,"description":"Print hello to stdout"},"time":{"start":1767036061123,"end":1767036061173}}}} +``` + +### `text` + +Text output from the model. + +Fields: +- `type`: `"text"` +- `timestamp`: Unix timestamp in milliseconds +- `sessionID`: Session identifier +- `part.id`: Part identifier +- `part.type`: `"text"` +- `part.text`: The actual text content +- `part.time.start`: Start timestamp +- `part.time.end`: End timestamp + +Example: +```json +{"type":"text","timestamp":1767036064268,"sessionID":"ses_494719016ffe85dkDMj0FPRbHK","part":{"id":"prt_b6b8e8ff2002mxSx9LtvAlf8Ng","sessionID":"ses_494719016ffe85dkDMj0FPRbHK","messageID":"msg_b6b8e8627001yM4qKJCXdC7W1L","type":"text","text":"```\nhello\n```","time":{"start":1767036064265,"end":1767036064265}}} +``` + +### `step_finish` + +Marks the end of a processing step. + +Fields: +- `type`: `"step_finish"` +- `timestamp`: Unix timestamp in milliseconds +- `sessionID`: Session identifier +- `part.id`: Part identifier +- `part.type`: `"step-finish"` +- `part.reason`: Optional. `"stop"` (final) or `"tool-calls"` (continuing) when present. +- `part.snapshot`: Git snapshot hash +- `part.cost`: Cost in USD +- `part.tokens.input`: Input token count +- `part.tokens.output`: Output token count +- `part.tokens.reasoning`: Reasoning token count +- `part.tokens.cache.read`: Cache read tokens +- `part.tokens.cache.write`: Cache write tokens + +Example (final step): +```json +{"type":"step_finish","timestamp":1767036064273,"sessionID":"ses_494719016ffe85dkDMj0FPRbHK","part":{"id":"prt_b6b8e9209001ojZ4ECN1geZISm","sessionID":"ses_494719016ffe85dkDMj0FPRbHK","messageID":"msg_b6b8e8627001yM4qKJCXdC7W1L","type":"step-finish","reason":"stop","snapshot":"09dd05d11a4ac013136c1df10932efc0ad9116e8","cost":0.001,"tokens":{"input":671,"output":8,"reasoning":0,"cache":{"read":21415,"write":0}}}} +``` + +Example (tool-calls step): +```json +{"type":"step_finish","timestamp":1767036061205,"sessionID":"ses_494719016ffe85dkDMj0FPRbHK","part":{"id":"prt_b6b8e85fb001L4I3WHMqH6EQNI","sessionID":"ses_494719016ffe85dkDMj0FPRbHK","messageID":"msg_b6b8e702b0012XuEC4bGe0XhKa","type":"step-finish","reason":"tool-calls","snapshot":"ee3406d50c7d9048674bbb1a3e325d82513b74ed","cost":0,"tokens":{"input":21772,"output":110,"reasoning":0,"cache":{"read":0,"write":0}}}} +``` + +### `error` + +Session error event. + +Fields: +- `type`: `"error"` +- `timestamp`: Unix timestamp in milliseconds +- `sessionID`: Session identifier +- `error.name`: Error type +- `error.data.message`: Human-readable error (when available) + +Example: +```json +{"type":"error","timestamp":1767036065000,"sessionID":"ses_494719016ffe85dkDMj0FPRbHK","error":{"name":"APIError","data":{"message":"Rate limit exceeded","statusCode":429,"isRetryable":true}}} +``` + +## Mapping to Takopi Events + +| OpenCode Event | Takopi Event | Condition | +|----------------|--------------|-----------| +| `step_start` | `StartedEvent` | First occurrence | +| `tool_use` | `ActionEvent(phase="completed")` | `status == "completed"` | +| `text` | (accumulate text) | - | +| `step_finish` | `CompletedEvent` | `reason == "stop"` | +| `step_finish` | (ignored) | `reason == "tool-calls"` | +| `error` | `CompletedEvent(ok=False)` | - | + +If `step_finish` omits `reason`, Takopi treats a clean process exit as successful completion and emits `CompletedEvent(ok=True)` with accumulated usage. + +## Session ID Format + +OpenCode uses session IDs in the format: `ses_XXXXXXXXXXXXXXXXXXXX` + +Example: `ses_494719016ffe85dkDMj0FPRbHK` + +## Tool Types + +Common tool names in OpenCode: +- `bash`: Shell command execution +- `read`: Read file contents +- `write`: Write file contents +- `edit`: Edit file contents +- `glob`: File pattern matching +- `grep`: Content search +- `webfetch`: Fetch web content +- `websearch`: Web search +- `task`: Spawn sub-agent tasks diff --git a/docs/runner/opencode/opencode-takopi-events.md b/docs/runner/opencode/opencode-takopi-events.md new file mode 100644 index 0000000..ace6924 --- /dev/null +++ b/docs/runner/opencode/opencode-takopi-events.md @@ -0,0 +1,80 @@ +# OpenCode to Takopi Event Mapping + +This document describes how OpenCode JSON events are translated to Takopi's normalized event model. + +## Event Translation + +### StartedEvent + +Emitted on the first `step_start` event that contains a `sessionID`. + +``` +OpenCode: {"type":"step_start","sessionID":"ses_XXX",...} +Takopi: StartedEvent(engine="opencode", resume=ResumeToken(engine="opencode", value="ses_XXX")) +``` + +### ActionEvent + +Tool usage is translated to action events. Note: `opencode run --format json` currently only emits `tool_use` events when the tool finishes (`status == "completed"`). Pending/running tool states exist in the schema but are not emitted by the CLI JSON stream. + +**Started phase** (when tool is pending/running, if emitted by the JSON stream): +``` +OpenCode: {"type":"tool_use","part":{"tool":"bash","state":{"status":"pending",...}}} +Takopi: ActionEvent(engine="opencode", action=Action(kind="command"), phase="started") +``` + +**Completed phase** (when tool finishes): +``` +OpenCode: {"type":"tool_use","part":{"tool":"bash","state":{"status":"completed","metadata":{"exit":0}}}} +Takopi: ActionEvent(engine="opencode", action=Action(kind="command"), phase="completed", ok=True) +``` + +### CompletedEvent + +Emitted on `step_finish` with `reason="stop"` or on `error` events. + +**Success**: +``` +OpenCode: {"type":"step_finish","part":{"reason":"stop","tokens":{...},"cost":0.001}} +Takopi: CompletedEvent(engine="opencode", ok=True, answer="", usage={...}) +``` + +If `step_finish` omits `reason`, Takopi treats a clean process exit as successful completion and emits `CompletedEvent(ok=True)` with the accumulated usage. + +**Error**: +``` +OpenCode: {"type":"error","error":{"name":"APIError","data":{"message":"API rate limit exceeded"}}} +Takopi: CompletedEvent(engine="opencode", ok=False, error="API rate limit exceeded") +``` + +## Tool Kind Mapping + +| OpenCode Tool | Takopi ActionKind | +|---------------|-------------------| +| `bash`, `shell` | `command` | +| `edit`, `write`, `multiedit` | `file_change` | +| `read` | `tool` | +| `glob` | `tool` | +| `grep` | `tool` | +| `websearch`, `web_search` | `web_search` | +| `webfetch`, `web_fetch` | `web_search` | +| `todowrite`, `todoread` | `note` | +| `task` | `tool` | +| (other) | `tool` | + +## Usage Accumulation + +Token usage is accumulated across all `step_finish` events and reported in the final `CompletedEvent.usage`: + +```json +{ + "total_cost_usd": 0.001, + "tokens": { + "input": 22443, + "output": 118, + "reasoning": 0, + "cache_read": 21415, + "cache_write": 0 + } +} +``` diff --git a/docs/specification.md b/docs/specification.md index 27533ff..4fa6a6c 100644 --- a/docs/specification.md +++ b/docs/specification.md @@ -43,6 +43,8 @@ The canonical ResumeLine embedded in chat MUST be the engine’s CLI resume comm - `claude --resume ` - `pi --session ` +ResumeLine MUST resume the interactive session when the engine offers both interactive and headless modes. It MUST NOT point to a headless/batch command that requires a new prompt (e.g., a `run` subcommand that errors without a message). + Takopi MUST treat the runner as authoritative for: - formatting a ResumeToken into a ResumeLine diff --git a/src/takopi/runners/opencode.py b/src/takopi/runners/opencode.py new file mode 100644 index 0000000..7fdd2d1 --- /dev/null +++ b/src/takopi/runners/opencode.py @@ -0,0 +1,564 @@ +"""OpenCode CLI runner. + +This runner integrates with the OpenCode CLI (https://github.com/sst/opencode). + +OpenCode outputs JSON events in a streaming format with types: +- step_start: Marks the beginning of a processing step +- tool_use: Tool invocation with input/output +- text: Text output from the model +- step_finish: Marks the end of a step (with reason: "stop" or "tool-calls") + +Session IDs use the format: ses_XXXX (e.g., ses_494719016ffe85dkDMj0FPRbHK) +""" + +from __future__ import annotations + +import logging +import re +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Literal + +from ..backends import EngineBackend, EngineConfig +from ..config import ConfigError +from ..model import ( + Action, + ActionEvent, + ActionKind, + CompletedEvent, + EngineId, + ResumeToken, + StartedEvent, + TakopiEvent, +) +from ..runner import JsonlSubprocessRunner, ResumeTokenMixin, Runner +from ..utils.paths import relativize_command, relativize_path + +logger = logging.getLogger(__name__) + +ENGINE: EngineId = EngineId("opencode") +STDERR_TAIL_LINES = 200 + +_RESUME_RE = re.compile( + r"(?im)^\s*`?opencode(?:\s+run)?\s+(?:--session|-s)\s+(?Pses_[A-Za-z0-9]+)`?\s*$" +) + + +@dataclass(slots=True) +class OpenCodeStreamState: + """State tracked during OpenCode JSONL streaming.""" + + pending_actions: dict[str, Action] = field(default_factory=dict) + last_text: str | None = None + note_seq: int = 0 + session_id: str | None = None + emitted_started: bool = False + saw_step_finish: bool = False + total_cost: float = 0.0 + total_tokens: dict[str, int] = field(default_factory=dict) + + +def _action_event( + *, + phase: Literal["started", "updated", "completed"], + action: Action, + ok: bool | None = None, + message: str | None = None, + level: Literal["debug", "info", "warning", "error"] | None = None, +) -> ActionEvent: + return ActionEvent( + engine=ENGINE, + action=action, + phase=phase, + ok=ok, + message=message, + level=level, + ) + + +def _tool_kind_and_title( + tool_name: str, tool_input: dict[str, Any] +) -> tuple[ActionKind, str]: + """Map OpenCode tool names to Takopi action kinds and titles.""" + name_lower = tool_name.lower() + + if name_lower in {"bash", "shell"}: + command = tool_input.get("command") + display = relativize_command(str(command or tool_name)) + return "command", display + + if name_lower in {"edit", "write", "multiedit"}: + path = tool_input.get("file_path") or tool_input.get("filePath") + if path: + return "file_change", relativize_path(str(path)) + return "file_change", str(tool_name) + + if name_lower == "read": + path = tool_input.get("file_path") or tool_input.get("filePath") + if path: + return "tool", f"read: `{relativize_path(str(path))}`" + return "tool", "read" + + if name_lower == "glob": + pattern = tool_input.get("pattern") + if pattern: + return "tool", f"glob: `{pattern}`" + return "tool", "glob" + + if name_lower == "grep": + pattern = tool_input.get("pattern") + if pattern: + return "tool", f"grep: {pattern}" + return "tool", "grep" + + if name_lower in {"websearch", "web_search"}: + query = tool_input.get("query") + return "web_search", str(query or "search") + + if name_lower in {"webfetch", "web_fetch"}: + url = tool_input.get("url") + return "web_search", str(url or "fetch") + + if name_lower in {"todowrite", "todoread"}: + return "note", "update todos" if "write" in name_lower else "read todos" + + if name_lower == "task": + desc = tool_input.get("description") or tool_input.get("prompt") + return "tool", str(desc or tool_name) + + return "tool", tool_name + + +def _normalize_tool_title( + title: str, + *, + tool_input: dict[str, Any], +) -> str: + if "`" in title: + return title + + path = tool_input.get("file_path") or tool_input.get("filePath") + if isinstance(path, str) and path: + rel_path = relativize_path(path) + if title == path or title == rel_path: + return f"`{rel_path}`" + + return title + + +def _extract_tool_action(event: dict[str, Any]) -> Action | None: + """Extract an Action from an OpenCode tool_use event.""" + part = event.get("part") or {} + state = part.get("state") or {} + + call_id = part.get("callID") + if not isinstance(call_id, str) or not call_id: + call_id = part.get("id") + if not isinstance(call_id, str) or not call_id: + return None + + tool_name = part.get("tool") or "tool" + tool_input = state.get("input") or {} + if not isinstance(tool_input, dict): + tool_input = {} + + kind, title = _tool_kind_and_title(tool_name, tool_input) + + state_title = state.get("title") + if isinstance(state_title, str) and state_title: + title = _normalize_tool_title(state_title, tool_input=tool_input) + + detail: dict[str, Any] = { + "name": tool_name, + "input": tool_input, + "callID": call_id, + } + + if kind == "file_change": + path = tool_input.get("file_path") or tool_input.get("filePath") + if path: + detail["changes"] = [{"path": path, "kind": "update"}] + + return Action(id=call_id, kind=kind, title=title, detail=detail) + + +def _usage_from_tokens(tokens: dict[str, int], cost: float) -> dict[str, Any]: + """Build usage payload from accumulated token counts.""" + usage: dict[str, Any] = {} + if cost > 0: + usage["total_cost_usd"] = cost + if tokens: + usage["tokens"] = tokens + return usage + + +def translate_opencode_event( + event: dict[str, Any], + *, + title: str, + state: OpenCodeStreamState, +) -> list[TakopiEvent]: + """Translate an OpenCode JSON event into Takopi events.""" + etype = event.get("type") + session_id = event.get("sessionID") + + if isinstance(session_id, str) and session_id: + if state.session_id is None: + state.session_id = session_id + + if etype == "step_start": + if not state.emitted_started and state.session_id: + state.emitted_started = True + return [ + StartedEvent( + engine=ENGINE, + resume=ResumeToken(engine=ENGINE, value=state.session_id), + title=title, + ) + ] + return [] + + if etype == "tool_use": + part = event.get("part") or {} + tool_state = part.get("state") or {} + status = tool_state.get("status") + + action = _extract_tool_action(event) + if action is None: + return [] + + if status == "completed": + output = tool_state.get("output") + metadata = tool_state.get("metadata") or {} + exit_code = metadata.get("exit") + + is_error = False + if isinstance(exit_code, int) and exit_code != 0: + is_error = True + + detail = dict(action.detail) + if output is not None: + detail["output_preview"] = ( + str(output)[:500] if len(str(output)) > 500 else str(output) + ) + detail["exit_code"] = exit_code + + state.pending_actions.pop(action.id, None) + + return [ + _action_event( + phase="completed", + action=Action( + id=action.id, + kind=action.kind, + title=action.title, + detail=detail, + ), + ok=not is_error, + ) + ] + if status == "error": + error = tool_state.get("error") + metadata = tool_state.get("metadata") or {} + exit_code = metadata.get("exit") + + detail = dict(action.detail) + if error is not None: + detail["error"] = error + detail["exit_code"] = exit_code + + state.pending_actions.pop(action.id, None) + + return [ + _action_event( + phase="completed", + action=Action( + id=action.id, + kind=action.kind, + title=action.title, + detail=detail, + ), + ok=False, + message=str(error) if error is not None else None, + ) + ] + else: + state.pending_actions[action.id] = action + return [_action_event(phase="started", action=action)] + + if etype == "text": + part = event.get("part") or {} + text = part.get("text") + if isinstance(text, str) and text: + if state.last_text is None: + state.last_text = text + else: + state.last_text += text + return [] + + if etype == "step_finish": + part = event.get("part") or {} + reason = part.get("reason") + state.saw_step_finish = True + + tokens = part.get("tokens") or {} + if isinstance(tokens, dict): + for key in ("input", "output", "reasoning"): + value = tokens.get(key) + if isinstance(value, int): + state.total_tokens[key] = state.total_tokens.get(key, 0) + value + cache = tokens.get("cache") or {} + if isinstance(cache, dict): + for key in ("read", "write"): + value = cache.get(key) + if not isinstance(value, int): + continue + cache_key = f"cache_{key}" + state.total_tokens[cache_key] = ( + state.total_tokens.get(cache_key, 0) + value + ) + + cost = part.get("cost") + if isinstance(cost, (int, float)): + state.total_cost += cost + + if reason == "stop": + resume = None + if state.session_id: + resume = ResumeToken(engine=ENGINE, value=state.session_id) + + usage = _usage_from_tokens(state.total_tokens, state.total_cost) + + return [ + CompletedEvent( + engine=ENGINE, + ok=True, + answer=state.last_text or "", + resume=resume, + usage=usage or None, + ) + ] + return [] + + if etype == "error": + raw_message = event.get("message") + if raw_message is None: + raw_message = event.get("error") + + message = raw_message + if isinstance(message, dict): + data = message.get("data") + if isinstance(data, dict) and data.get("message"): + message = data.get("message") + else: + message = ( + message.get("message") or message.get("name") or "opencode error" + ) + elif message is None: + message = "opencode error" + + resume = None + if state.session_id: + resume = ResumeToken(engine=ENGINE, value=state.session_id) + + return [ + CompletedEvent( + engine=ENGINE, + ok=False, + answer=state.last_text or "", + resume=resume, + error=str(message), + ) + ] + + return [] + + +@dataclass +class OpenCodeRunner(ResumeTokenMixin, JsonlSubprocessRunner): + """Runner for OpenCode CLI.""" + + engine: EngineId = ENGINE + resume_re: re.Pattern[str] = _RESUME_RE + + opencode_cmd: str = "opencode" + model: str | None = None + session_title: str = "opencode" + stderr_tail_lines: int = STDERR_TAIL_LINES + logger: logging.Logger = logger + + def format_resume(self, token: ResumeToken) -> str: + if token.engine != ENGINE: + raise RuntimeError(f"resume token is for engine {token.engine!r}") + return f"`opencode --session {token.value}`" + + def command(self) -> str: + return self.opencode_cmd + + def build_args( + self, + prompt: str, + resume: ResumeToken | None, + *, + state: Any, + ) -> list[str]: + _ = state + args = ["run", "--format", "json"] + if resume is not None: + args.extend(["--session", resume.value]) + if self.model is not None: + args.extend(["--model", str(self.model)]) + args.extend(["--", prompt]) + return args + + def stdin_payload( + self, + prompt: str, + resume: ResumeToken | None, + *, + state: Any, + ) -> bytes | None: + _ = prompt, resume, state + return None + + def new_state(self, prompt: str, resume: ResumeToken | None) -> OpenCodeStreamState: + _ = prompt, resume + return OpenCodeStreamState() + + def start_run( + self, + prompt: str, + resume: ResumeToken | None, + *, + state: OpenCodeStreamState, + ) -> None: + _ = state + logger.info( + "[opencode] start run resume=%r", + resume.value if resume else None, + ) + logger.debug("[opencode] prompt: %s", prompt) + + def invalid_json_events( + self, + *, + raw: str, + line: str, + state: OpenCodeStreamState, + ) -> list[TakopiEvent]: + _ = line + message = "invalid JSON from opencode; ignoring line" + return [self.note_event(message, state=state, detail={"line": raw})] + + def translate( + self, + data: dict[str, Any], + *, + state: OpenCodeStreamState, + resume: ResumeToken | None, + found_session: ResumeToken | None, + ) -> list[TakopiEvent]: + _ = resume, found_session + return translate_opencode_event( + data, + title=self.session_title, + state=state, + ) + + def process_error_events( + self, + rc: int, + *, + resume: ResumeToken | None, + found_session: ResumeToken | None, + stderr_tail: str, + state: OpenCodeStreamState, + ) -> list[TakopiEvent]: + message = f"opencode failed (rc={rc})." + resume_for_completed = found_session or resume + return [ + self.note_event( + message, + state=state, + ok=False, + detail={"stderr_tail": stderr_tail}, + ), + CompletedEvent( + engine=ENGINE, + ok=False, + answer=state.last_text or "", + resume=resume_for_completed, + error=message, + ), + ] + + def stream_end_events( + self, + *, + resume: ResumeToken | None, + found_session: ResumeToken | None, + stderr_tail: str, + state: OpenCodeStreamState, + ) -> list[TakopiEvent]: + _ = stderr_tail + if not found_session: + message = "opencode finished but no session_id was captured" + resume_for_completed = resume + return [ + CompletedEvent( + engine=ENGINE, + ok=False, + answer=state.last_text or "", + resume=resume_for_completed, + error=message, + ) + ] + + if state.saw_step_finish: + usage = _usage_from_tokens(state.total_tokens, state.total_cost) + return [ + CompletedEvent( + engine=ENGINE, + ok=True, + answer=state.last_text or "", + resume=found_session, + usage=usage or None, + ) + ] + + message = "opencode finished without a result event" + return [ + CompletedEvent( + engine=ENGINE, + ok=False, + answer=state.last_text or "", + resume=found_session, + error=message, + ) + ] + + +def build_runner(config: EngineConfig, config_path: Path) -> Runner: + """Build an OpenCodeRunner from configuration.""" + opencode_cmd = "opencode" + + model = config.get("model") + if model is not None and not isinstance(model, str): + raise ConfigError( + f"Invalid `opencode.model` in {config_path}; expected a string." + ) + + title = str(model) if model is not None else "opencode" + + return OpenCodeRunner( + opencode_cmd=opencode_cmd, + model=model, + session_title=title, + ) + + +BACKEND = EngineBackend( + id="opencode", + build_runner=build_runner, + install_cmd="npm i -g opencode-ai@latest", +) diff --git a/tests/fixtures/opencode_stream_error.jsonl b/tests/fixtures/opencode_stream_error.jsonl new file mode 100644 index 0000000..f8ed992 --- /dev/null +++ b/tests/fixtures/opencode_stream_error.jsonl @@ -0,0 +1,2 @@ +{"type":"step_start","timestamp":1767037000000,"sessionID":"ses_error123","part":{"id":"prt_error1","sessionID":"ses_error123","messageID":"msg_error1","type":"step-start"}} +{"type":"error","timestamp":1767037001000,"sessionID":"ses_error123","error":{"name":"APIError","data":{"message":"Rate limit exceeded","statusCode":429,"isRetryable":true}}} diff --git a/tests/fixtures/opencode_stream_success.jsonl b/tests/fixtures/opencode_stream_success.jsonl new file mode 100644 index 0000000..36ccf44 --- /dev/null +++ b/tests/fixtures/opencode_stream_success.jsonl @@ -0,0 +1,6 @@ +{"type":"step_start","timestamp":1767036059338,"sessionID":"ses_494719016ffe85dkDMj0FPRbHK","part":{"id":"prt_b6b8e7ec7001qAZUB7eTENxPpI","sessionID":"ses_494719016ffe85dkDMj0FPRbHK","messageID":"msg_b6b8e702b0012XuEC4bGe0XhKa","type":"step-start","snapshot":"71db24a798b347669c0ebadb2dfad238f991753d"}} +{"type":"tool_use","timestamp":1767036061199,"sessionID":"ses_494719016ffe85dkDMj0FPRbHK","part":{"id":"prt_b6b8e85bb001CzBoN2dDlEZJnP","sessionID":"ses_494719016ffe85dkDMj0FPRbHK","messageID":"msg_b6b8e702b0012XuEC4bGe0XhKa","type":"tool","callID":"r9bQWsNLvOrJGIOz","tool":"bash","state":{"status":"completed","input":{"command":"echo hello","description":"Print hello to stdout"},"output":"hello\n","title":"Print hello to stdout","metadata":{"output":"hello\n","exit":0,"description":"Print hello to stdout"},"time":{"start":1767036061123,"end":1767036061173}}}} +{"type":"step_finish","timestamp":1767036061205,"sessionID":"ses_494719016ffe85dkDMj0FPRbHK","part":{"id":"prt_b6b8e85fb001L4I3WHMqH6EQNI","sessionID":"ses_494719016ffe85dkDMj0FPRbHK","messageID":"msg_b6b8e702b0012XuEC4bGe0XhKa","type":"step-finish","reason":"tool-calls","snapshot":"ee3406d50c7d9048674bbb1a3e325d82513b74ed","cost":0,"tokens":{"input":21772,"output":110,"reasoning":0,"cache":{"read":0,"write":0}}}} +{"type":"step_start","timestamp":1767036063732,"sessionID":"ses_494719016ffe85dkDMj0FPRbHK","part":{"id":"prt_b6b8e8ff2001hIElz1HRSMdHJY","sessionID":"ses_494719016ffe85dkDMj0FPRbHK","messageID":"msg_b6b8e8627001yM4qKJCXdC7W1L","type":"step-start","snapshot":"9017313c64af88e12921b4c81d57fd4806192416"}} +{"type":"text","timestamp":1767036064268,"sessionID":"ses_494719016ffe85dkDMj0FPRbHK","part":{"id":"prt_b6b8e8ff2002mxSx9LtvAlf8Ng","sessionID":"ses_494719016ffe85dkDMj0FPRbHK","messageID":"msg_b6b8e8627001yM4qKJCXdC7W1L","type":"text","text":"```\nhello\n```","time":{"start":1767036064265,"end":1767036064265}}} +{"type":"step_finish","timestamp":1767036064273,"sessionID":"ses_494719016ffe85dkDMj0FPRbHK","part":{"id":"prt_b6b8e9209001ojZ4ECN1geZISm","sessionID":"ses_494719016ffe85dkDMj0FPRbHK","messageID":"msg_b6b8e8627001yM4qKJCXdC7W1L","type":"step-finish","reason":"stop","snapshot":"09dd05d11a4ac013136c1df10932efc0ad9116e8","cost":0.001,"tokens":{"input":671,"output":8,"reasoning":0,"cache":{"read":21415,"write":0}}}} diff --git a/tests/fixtures/opencode_stream_success_no_reason.jsonl b/tests/fixtures/opencode_stream_success_no_reason.jsonl new file mode 100644 index 0000000..f385786 --- /dev/null +++ b/tests/fixtures/opencode_stream_success_no_reason.jsonl @@ -0,0 +1,3 @@ +{"type":"step_start","timestamp":1767038000000,"sessionID":"ses_no_reason","part":{"id":"prt_nr_start","sessionID":"ses_no_reason","messageID":"msg_nr_1","type":"step-start"}} +{"type":"text","timestamp":1767038000500,"sessionID":"ses_no_reason","part":{"id":"prt_nr_text","sessionID":"ses_no_reason","messageID":"msg_nr_1","type":"text","text":"All done.","time":{"start":1767038000500,"end":1767038000500}}} +{"type":"step_finish","timestamp":1767038001000,"sessionID":"ses_no_reason","part":{"id":"prt_nr_finish","sessionID":"ses_no_reason","messageID":"msg_nr_1","type":"step-finish","cost":0.002,"tokens":{"input":12,"output":3,"reasoning":0,"cache":{"read":0,"write":0}}}} diff --git a/tests/test_opencode_runner.py b/tests/test_opencode_runner.py new file mode 100644 index 0000000..7cdbdd4 --- /dev/null +++ b/tests/test_opencode_runner.py @@ -0,0 +1,341 @@ +import json +from pathlib import Path + +import anyio +import pytest + +from takopi.model import ActionEvent, CompletedEvent, ResumeToken, StartedEvent +from takopi.runners.opencode import ( + OpenCodeRunner, + OpenCodeStreamState, + ENGINE, + translate_opencode_event, +) + + +def _load_fixture(name: str) -> list[dict]: + path = Path(__file__).parent / "fixtures" / name + return [json.loads(line) for line in path.read_text().splitlines() if line.strip()] + + +def test_opencode_resume_format_and_extract() -> None: + runner = OpenCodeRunner(opencode_cmd="opencode") + token = ResumeToken(engine=ENGINE, value="ses_abc123") + + assert runner.format_resume(token) == "`opencode --session ses_abc123`" + assert runner.extract_resume("`opencode --session ses_abc123`") == token + assert runner.extract_resume("opencode run -s ses_other") == ResumeToken( + engine=ENGINE, value="ses_other" + ) + assert runner.extract_resume("opencode -s ses_other") == ResumeToken( + engine=ENGINE, value="ses_other" + ) + assert runner.extract_resume("`claude --resume sid`") is None + assert runner.extract_resume("`codex resume sid`") is None + + +def test_translate_success_fixture() -> None: + state = OpenCodeStreamState() + events: list = [] + for event in _load_fixture("opencode_stream_success.jsonl"): + events.extend(translate_opencode_event(event, title="opencode", state=state)) + + assert isinstance(events[0], StartedEvent) + started = next(evt for evt in events if isinstance(evt, StartedEvent)) + assert started.resume.value == "ses_494719016ffe85dkDMj0FPRbHK" + assert started.resume.engine == ENGINE + + action_events = [evt for evt in events if isinstance(evt, ActionEvent)] + assert len(action_events) == 1 + + completed_actions = [evt for evt in action_events if evt.phase == "completed"] + assert len(completed_actions) == 1 + assert completed_actions[0].action.kind == "command" + assert completed_actions[0].ok is True + + completed = next(evt for evt in events if isinstance(evt, CompletedEvent)) + assert events[-1] == completed + assert completed.ok is True + assert completed.resume == started.resume + assert completed.answer == "```\nhello\n```" + + assert completed.usage is not None + assert "tokens" in completed.usage + + +def test_translate_missing_reason_success() -> None: + state = OpenCodeStreamState() + events: list = [] + for event in _load_fixture("opencode_stream_success_no_reason.jsonl"): + events.extend(translate_opencode_event(event, title="opencode", state=state)) + + started = next(evt for evt in events if isinstance(evt, StartedEvent)) + runner = OpenCodeRunner(opencode_cmd="opencode") + fallback = runner.stream_end_events( + resume=None, + found_session=started.resume, + stderr_tail="", + state=state, + ) + + completed = next(evt for evt in fallback if isinstance(evt, CompletedEvent)) + assert completed.ok is True + assert completed.resume == started.resume + assert completed.answer == "All done." + assert completed.usage is not None + + +def test_translate_accumulates_text() -> None: + state = OpenCodeStreamState() + + events = translate_opencode_event( + {"type": "step_start", "sessionID": "ses_test123", "part": {}}, + title="opencode", + state=state, + ) + assert len(events) == 1 + assert isinstance(events[0], StartedEvent) + + translate_opencode_event( + { + "type": "text", + "sessionID": "ses_test123", + "part": {"type": "text", "text": "Hello "}, + }, + title="opencode", + state=state, + ) + translate_opencode_event( + { + "type": "text", + "sessionID": "ses_test123", + "part": {"type": "text", "text": "World"}, + }, + title="opencode", + state=state, + ) + + assert state.last_text == "Hello World" + + events = translate_opencode_event( + { + "type": "step_finish", + "sessionID": "ses_test123", + "part": {"reason": "stop", "tokens": {"input": 100, "output": 10}}, + }, + title="opencode", + state=state, + ) + + assert len(events) == 1 + completed = events[0] + assert isinstance(completed, CompletedEvent) + assert completed.answer == "Hello World" + assert completed.ok is True + + +def test_translate_tool_use_completed() -> None: + state = OpenCodeStreamState() + state.session_id = "ses_test123" + state.emitted_started = True + + events = translate_opencode_event( + { + "type": "tool_use", + "sessionID": "ses_test123", + "part": { + "id": "prt_123", + "callID": "call_abc", + "tool": "bash", + "state": { + "status": "completed", + "input": {"command": "ls -la"}, + "output": "file1.txt\nfile2.txt", + "title": "List files", + "metadata": {"exit": 0}, + }, + }, + }, + title="opencode", + state=state, + ) + + assert len(events) == 1 + action_event = events[0] + assert isinstance(action_event, ActionEvent) + assert action_event.phase == "completed" + assert action_event.action.kind == "command" + assert action_event.action.title == "List files" + assert action_event.ok is True + + +def test_translate_tool_use_with_error() -> None: + state = OpenCodeStreamState() + state.session_id = "ses_test123" + state.emitted_started = True + + events = translate_opencode_event( + { + "type": "tool_use", + "sessionID": "ses_test123", + "part": { + "id": "prt_123", + "callID": "call_abc", + "tool": "bash", + "state": { + "status": "completed", + "input": {"command": "exit 1"}, + "output": "error", + "title": "Run failing command", + "metadata": {"exit": 1}, + }, + }, + }, + title="opencode", + state=state, + ) + + assert len(events) == 1 + action_event = events[0] + assert isinstance(action_event, ActionEvent) + assert action_event.phase == "completed" + assert action_event.ok is False + + +def test_translate_tool_use_read_title_wraps_path() -> None: + state = OpenCodeStreamState() + state.session_id = "ses_test123" + state.emitted_started = True + path = Path.cwd() / "src" / "takopi" / "runners" / "opencode.py" + + events = translate_opencode_event( + { + "type": "tool_use", + "sessionID": "ses_test123", + "part": { + "id": "prt_123", + "callID": "call_abc", + "tool": "read", + "state": { + "status": "completed", + "input": {"filePath": str(path)}, + "output": "file contents", + "title": "src/takopi/runners/opencode.py", + }, + }, + }, + title="opencode", + state=state, + ) + + assert len(events) == 1 + action_event = events[0] + assert isinstance(action_event, ActionEvent) + assert action_event.action.kind == "tool" + assert action_event.action.title == "`src/takopi/runners/opencode.py`" + + +def test_translate_error_fixture() -> None: + state = OpenCodeStreamState() + events: list = [] + for event in _load_fixture("opencode_stream_error.jsonl"): + events.extend(translate_opencode_event(event, title="opencode", state=state)) + + started = next(evt for evt in events if isinstance(evt, StartedEvent)) + completed = next(evt for evt in events if isinstance(evt, CompletedEvent)) + + assert completed.ok is False + assert completed.error == "Rate limit exceeded" + assert completed.resume == started.resume + + +def test_step_finish_tool_calls_does_not_complete() -> None: + state = OpenCodeStreamState() + state.session_id = "ses_test123" + state.emitted_started = True + + events = translate_opencode_event( + { + "type": "step_finish", + "sessionID": "ses_test123", + "part": {"reason": "tool-calls", "tokens": {"input": 100, "output": 10}}, + }, + title="opencode", + state=state, + ) + + assert len(events) == 0 + + +def test_build_args_new_session() -> None: + runner = OpenCodeRunner(opencode_cmd="opencode", model="claude-sonnet") + args = runner.build_args("hello world", None, state=OpenCodeStreamState()) + + assert args == [ + "run", + "--format", + "json", + "--model", + "claude-sonnet", + "--", + "hello world", + ] + + +def test_build_args_with_resume() -> None: + runner = OpenCodeRunner(opencode_cmd="opencode") + token = ResumeToken(engine=ENGINE, value="ses_abc123") + args = runner.build_args("continue", token, state=OpenCodeStreamState()) + + assert args == [ + "run", + "--format", + "json", + "--session", + "ses_abc123", + "--", + "continue", + ] + + +def test_stdin_payload_returns_none() -> None: + runner = OpenCodeRunner(opencode_cmd="opencode") + payload = runner.stdin_payload("prompt", None, state=OpenCodeStreamState()) + assert payload is None + + +@pytest.mark.anyio +async def test_run_serializes_same_session() -> None: + runner = OpenCodeRunner(opencode_cmd="opencode") + gate = anyio.Event() + in_flight = 0 + max_in_flight = 0 + + async def run_stub(*_args, **_kwargs): + nonlocal in_flight, max_in_flight + in_flight += 1 + max_in_flight = max(max_in_flight, in_flight) + try: + await gate.wait() + yield CompletedEvent( + engine=ENGINE, + resume=ResumeToken(engine=ENGINE, value="ses_test"), + ok=True, + answer="ok", + ) + finally: + in_flight -= 1 + + runner.run_impl = run_stub # type: ignore[assignment] + + async def drain(prompt: str, resume: ResumeToken | None) -> None: + async for _event in runner.run(prompt, resume): + pass + + token = ResumeToken(engine=ENGINE, value="ses_test") + async with anyio.create_task_group() as tg: + tg.start_soon(drain, "a", token) + tg.start_soon(drain, "b", token) + await anyio.sleep(0) + gate.set() + assert max_in_flight == 1