diff --git a/changelog.md b/changelog.md index 6ae6f04..a8a01f4 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,13 @@ # changelog +## v0.5.2 (2026-01-02) + +### fixes + +- treat codex reconnect notices as non-fatal progress updates instead of errors +- filter unavailable agents from startup display and slash command menu +- avoid crashes when codex tool/file-change events omit error fields + ## v0.5.1 (2026-01-02) ### changes diff --git a/docs/runner/codex/codex-takopi-events.md b/docs/runner/codex/codex-takopi-events.md index c4035d0..92c2a26 100644 --- a/docs/runner/codex/codex-takopi-events.md +++ b/docs/runner/codex/codex-takopi-events.md @@ -171,7 +171,10 @@ Codex: {"type":"error","message":"stream error: broken pipe"} ``` -Cheatsheet meaning: this is a **fatal stream failure** (not just a tool failure). +Cheatsheet meaning: this is a **fatal stream failure** (not just a tool failure). +However, Codex may also emit transient reconnect notices as `type="error"` with +messages like `"Reconnecting... 1/5"` while it retries a dropped stream. Treat +those as non-fatal progress updates (do **not** end the run). → Takopi: @@ -237,7 +240,8 @@ This is usually safe to show as a short “what it’s doing” line (or ignore ### 3) `command_execution` (`item.started` and `item.completed`) -Codex fields include `command`, `exit_code`, `status`, `aggregated_output` (often noisy). +Codex fields include `command`, `status`, `aggregated_output` (often noisy), and +`exit_code` (null or omitted until completion). → Takopi `action`: @@ -246,7 +250,7 @@ Codex fields include `command`, `exit_code`, `status`, `aggregated_output` (ofte * `detail={ command, exit_code, status }` (optionally include output tail) * `phase="started"` on `item.started` * `phase="completed"` on `item.completed` -* `ok = (item.status == "completed")` (or `exit_code == 0`) +* `ok = (item.status == "completed")` (and `exit_code == 0` when present) Note: “failed” command becomes `ok=false` but it’s still just an `action` completion — the overall run might still succeed later, depending on agent behavior. @@ -270,7 +274,8 @@ This is a great progress line for your UI (“updated docs/…, added …”). ### 5) `mcp_tool_call` (`item.started` and `item.completed`) -Codex contains server/tool/arguments/result/error/status. Result can be large; may include base64 in content blocks. +Codex contains server/tool/arguments/status and may include result/error on +completion. Result can be large; may include base64 in content blocks. → Takopi `action`: diff --git a/docs/runner/codex/exec-json-cheatsheet.md b/docs/runner/codex/exec-json-cheatsheet.md index c97922d..12e2fc6 100644 --- a/docs/runner/codex/exec-json-cheatsheet.md +++ b/docs/runner/codex/exec-json-cheatsheet.md @@ -3,8 +3,10 @@ `codex exec --json` writes **one JSON object per line** (JSONL) to stdout. Each line is a top-level **thread event** with a `type` field. -Below: **all fields** for every line type plus a **full-line example** for each -shape that can be emitted. +Below: **required + commonly emitted fields** for every line type plus a +**full-line example** for each shape that can be emitted. Fields noted as +optional may be omitted (or `null`) depending on Codex version and lifecycle. +Unknown fields may appear; ignore what you don't use. ## Top-level event lines (non-item) @@ -64,6 +66,10 @@ Example: {"type":"error","message":"stream error: broken pipe"} ``` +Note: Codex may emit transient reconnect notices as `type="error"` with messages +like `"Reconnecting... 1/5"` while it retries a dropped stream. Treat those as +non-fatal progress updates (the turn continues). + ## Item event lines (`item.*`) Every item line includes: @@ -99,7 +105,7 @@ Example: Fields: - `item.command` - `item.aggregated_output` -- `item.exit_code` (null until completion) +- `item.exit_code` (null or omitted until completion) - `item.status` (`in_progress`, `completed`, `failed`) Example (started): @@ -138,10 +144,10 @@ Fields: - `item.server` - `item.tool` - `item.arguments` (JSON value; defaults to `null` if absent) -- `item.result` (object or `null`) +- `item.result` (object or `null`; may be omitted) - `item.result.content` (array of MCP content blocks) - `item.result.structured_content` (JSON value or `null`) -- `item.error` (object or `null`) +- `item.error` (object or `null`; may be omitted) - `item.error.message` (if `error` is present) - `item.status` (`in_progress`, `completed`, `failed`) @@ -317,7 +323,8 @@ machine-only metadata. - `file_change.status`: `completed` = patch applied, `failed` = patch failed. - `mcp_tool_call.status`: `completed` = tool succeeded, `failed` = tool failed. - **Fatal stream errors:** `type = "error"` means the JSONL stream itself hit an - unrecoverable error. + unrecoverable error (except transient `"Reconnecting... X/Y"` notices, which + are non-fatal). ### Suggested minimal rendering @@ -327,3 +334,12 @@ If you want a compact UI, the following is usually enough: - Final answer: `item.completed` with `item.type = "agent_message"` - Optional progress: `item.started` / `item.completed` for `command_execution` and `file_change` + +### Optional/conditional emission notes + +- `turn.failed` only appears on failure; otherwise `turn.completed` is emitted. +- `reasoning` items only appear when reasoning summaries are enabled. +- `todo_list` items only appear when the plan tool is active; they are the + primary source of `item.updated`. +- `file_change` and `web_search` items are emitted only as `item.completed` + in the current `codex exec --json` stream. diff --git a/pyproject.toml b/pyproject.toml index c925f9c..050e307 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ [project] name = "takopi" authors = [{name = "banteg"}] -version = "0.5.1" +version = "0.5.2" description = "Telegram bridge for Codex, Claude Code, and other agent CLIs." readme = "readme.md" license = { file = "LICENSE" } diff --git a/src/takopi/__init__.py b/src/takopi/__init__.py index dd9b22c..7225152 100644 --- a/src/takopi/__init__.py +++ b/src/takopi/__init__.py @@ -1 +1 @@ -__version__ = "0.5.1" +__version__ = "0.5.2" diff --git a/src/takopi/bridge.py b/src/takopi/bridge.py index 6c86e10..d96aace 100644 --- a/src/takopi/bridge.py +++ b/src/takopi/bridge.py @@ -83,8 +83,8 @@ def _strip_engine_command( def _build_bot_commands(router: AutoRouter) -> list[dict[str, str]]: commands: list[dict[str, str]] = [] seen: set[str] = set() - for engine in router.engine_ids: - cmd = engine.lower() + for entry in router.available_entries: + cmd = entry.engine.lower() if cmd in seen: continue commands.append({"command": cmd, "description": f"start {cmd}"}) diff --git a/src/takopi/cli.py b/src/takopi/cli.py index 7d09981..7cdd26c 100644 --- a/src/takopi/cli.py +++ b/src/takopi/cli.py @@ -174,7 +174,11 @@ def _parse_bridge_config( backends=backends, default_engine=default_engine, ) - engine_list = ", ".join(router.engine_ids) + available_engines = [entry.engine for entry in router.available_entries] + missing_engines = [entry.engine for entry in router.entries if not entry.available] + engine_list = ", ".join(available_engines) if available_engines else "none" + if missing_engines: + engine_list = f"{engine_list} (not installed: {', '.join(missing_engines)})" startup_msg = ( f"\N{OCTOPUS} **takopi is ready**\n\n" f"default: `{router.default_engine}` \n" diff --git a/src/takopi/router.py b/src/takopi/router.py index dc8a67b..f08804b 100644 --- a/src/takopi/router.py +++ b/src/takopi/router.py @@ -46,6 +46,10 @@ class AutoRouter: def entries(self) -> tuple[RunnerEntry, ...]: return self._entries + @property + def available_entries(self) -> tuple[RunnerEntry, ...]: + return tuple(entry for entry in self._entries if entry.available) + @property def engine_ids(self) -> tuple[EngineId, ...]: return tuple(entry.engine for entry in self._entries) diff --git a/src/takopi/runners/codex.py b/src/takopi/runners/codex.py index acaba57..93dcf23 100644 --- a/src/takopi/runners/codex.py +++ b/src/takopi/runners/codex.py @@ -41,6 +41,10 @@ _ACTION_KIND_MAP: dict[str, ActionKind] = { _RESUME_RE = re.compile(r"(?im)^\s*`?codex\s+resume\s+(?P[^`\s]+)`?\s*$") _ANSI_ESCAPE_RE = re.compile(r"\x1b\[[0-9;]*[A-Za-z]") _TRUSTED_DIR_RE = re.compile(r"not inside a trusted directory", re.IGNORECASE) +_RECONNECTING_RE = re.compile( + r"^Reconnecting\.{3}\s*(?P\d+)/(?P\d+)\s*$", + re.IGNORECASE, +) def _strip_ansi(text: str) -> str: @@ -60,6 +64,18 @@ def _extract_stderr_reason(stderr_tail: str) -> str | None: return lines[-1] +def _parse_reconnect_message(message: str) -> tuple[int, int] | None: + match = _RECONNECTING_RE.match(message) + if not match: + return None + try: + attempt = int(match.group("attempt")) + max_attempts = int(match.group("max")) + except (TypeError, ValueError): + return None + return (attempt, max_attempts) + + def _started_event(token: ResumeToken, *, title: str) -> StartedEvent: return StartedEvent(engine=token.engine, resume=token, title=title) @@ -229,13 +245,14 @@ def _translate_item_event(etype: str, item: dict[str, Any]) -> list[TakopiEvent] ) ] if phase == "completed": - exit_code = item["exit_code"] - ok = item["status"] != "failed" - if exit_code is not None: + status = item["status"] + exit_code = item.get("exit_code") + ok = status == "completed" + if isinstance(exit_code, int): ok = ok and exit_code == 0 detail = { "exit_code": exit_code, - "status": item["status"], + "status": status, } return [ _action_event( @@ -254,7 +271,7 @@ def _translate_item_event(etype: str, item: dict[str, Any]) -> list[TakopiEvent] title = str(name) if name else "tool" detail = { "name": name, - "status": item["status"], + "status": item.get("status"), "arguments": item.get("arguments"), } else: @@ -263,7 +280,7 @@ def _translate_item_event(etype: str, item: dict[str, Any]) -> list[TakopiEvent] detail = { "server": item["server"], "tool": item["tool"], - "status": item["status"], + "status": item.get("status"), "arguments": item.get("arguments"), } @@ -278,10 +295,14 @@ def _translate_item_event(etype: str, item: dict[str, Any]) -> list[TakopiEvent] ) ] if phase == "completed": - ok = item["status"] != "failed" and not item["error"] - error = item["error"] + status = item.get("status") + error = item.get("error") + ok = status == "completed" and not error if error: - detail["error_message"] = str(error.get("message") or error) + if isinstance(error, dict): + detail["error_message"] = str(error.get("message") or error) + else: + detail["error_message"] = str(error) result_summary = _summarize_tool_result(item.get("result")) if result_summary is not None: detail["result_summary"] = result_summary @@ -326,11 +347,11 @@ def _translate_item_event(etype: str, item: dict[str, Any]) -> list[TakopiEvent] return [] title = _format_change_summary(item) detail = { - "changes": item["changes"], - "status": item["status"], - "error": item["error"], + "changes": item.get("changes", []), + "status": item.get("status"), + "error": item.get("error"), } - ok = item["status"] != "failed" + ok = item.get("status") == "completed" return [ _action_event( phase="completed", @@ -459,7 +480,22 @@ class CodexRunner(ResumeTokenMixin, JsonlSubprocessRunner): etype = data["type"] match etype: case "error": - message = str(data["message"]) + message = str(data.get("message") or "") + reconnect = _parse_reconnect_message(message) + if reconnect is not None: + attempt, max_attempts = reconnect + phase: ActionPhase = "started" if attempt <= 1 else "updated" + return [ + _action_event( + phase=phase, + action_id="codex.reconnect", + kind="note", + title=message, + detail={"attempt": attempt, "max": max_attempts}, + level="info", + ) + ] + fatal_flag = data.get("fatal") fatal = fatal_flag is True or fatal_flag is None if fatal: diff --git a/tests/test_codex_tool_result_summary.py b/tests/test_codex_tool_result_summary.py index cdc9fb1..23d3abd 100644 --- a/tests/test_codex_tool_result_summary.py +++ b/tests/test_codex_tool_result_summary.py @@ -66,3 +66,41 @@ def test_translate_mcp_tool_call_summarizes_legacy_structured_key() -> None: assert len(out) == 1 assert isinstance(out[0], ActionEvent) assert out[0].action.detail["result_summary"]["has_structured"] is True + + +def test_translate_mcp_tool_call_missing_error_is_ok() -> None: + evt = { + "type": "item.completed", + "item": { + "id": "item_4", + "type": "mcp_tool_call", + "server": "docs", + "tool": "search", + "status": "completed", + "result": {"content": []}, + }, + } + + out = translate_codex_event(evt, title="Codex") + assert len(out) == 1 + assert isinstance(out[0], ActionEvent) + assert out[0].ok is True + + +def test_translate_command_execution_allows_missing_exit_code() -> None: + evt = { + "type": "item.completed", + "item": { + "id": "item_5", + "type": "command_execution", + "command": "ls -la", + "aggregated_output": "", + "status": "completed", + }, + } + + out = translate_codex_event(evt, title="Codex") + assert len(out) == 1 + assert isinstance(out[0], ActionEvent) + assert out[0].ok is True + assert out[0].action.detail["exit_code"] is None diff --git a/tests/test_exec_runner.py b/tests/test_exec_runner.py index e673876..f26851a 100644 --- a/tests/test_exec_runner.py +++ b/tests/test_exec_runner.py @@ -243,6 +243,77 @@ async def test_codex_runner_preserves_warning_order(tmp_path) -> None: assert seen[2].answer == "ok" +@pytest.mark.anyio +async def test_codex_runner_reconnect_notice_is_non_fatal(tmp_path) -> None: + thread_id = "019b73c4-0c3f-7701-a0bb-aac6b4d8a3bc" + + codex_path = tmp_path / "codex" + codex_path.write_text( + "#!/usr/bin/env python3\n" + "import json\n" + "import sys\n" + "\n" + "sys.stdin.read()\n" + "print(json.dumps({'type': 'error', 'message': 'Reconnecting... 1/5'}), flush=True)\n" + f"print(json.dumps({{'type': 'thread.started', 'thread_id': '{thread_id}'}}), flush=True)\n" + "print(json.dumps({'type': 'item.completed', 'item': {'id': 'item_0', 'type': 'agent_message', 'text': 'ok'}}), flush=True)\n", + encoding="utf-8", + ) + codex_path.chmod(0o755) + + runner = CodexRunner(codex_cmd=str(codex_path), extra_args=[]) + seen = [evt async for evt in runner.run("hi", None)] + + assert len(seen) == 3 + assert isinstance(seen[0], ActionEvent) + assert seen[0].phase == "started" + assert seen[0].ok is None + assert seen[0].action.kind == "note" + assert seen[0].action.title == "Reconnecting... 1/5" + + assert isinstance(seen[1], StartedEvent) + assert seen[1].resume.value == thread_id + + assert isinstance(seen[2], CompletedEvent) + assert seen[2].resume == seen[1].resume + assert seen[2].answer == "ok" + + +@pytest.mark.anyio +async def test_codex_runner_reconnect_notice_updates_phase(tmp_path) -> None: + thread_id = "019b73c4-0c3f-7701-a0bb-aac6b4d8a3bc" + + codex_path = tmp_path / "codex" + codex_path.write_text( + "#!/usr/bin/env python3\n" + "import json\n" + "import sys\n" + "\n" + "sys.stdin.read()\n" + "print(json.dumps({'type': 'error', 'message': 'Reconnecting... 1/5'}), flush=True)\n" + "print(json.dumps({'type': 'error', 'message': 'Reconnecting... 2/5'}), flush=True)\n" + f"print(json.dumps({{'type': 'thread.started', 'thread_id': '{thread_id}'}}), flush=True)\n" + "print(json.dumps({'type': 'item.completed', 'item': {'id': 'item_0', 'type': 'agent_message', 'text': 'ok'}}), flush=True)\n" + "print(json.dumps({'type': 'turn.completed', 'usage': {'input_tokens': 1, 'cached_input_tokens': 0, 'output_tokens': 1}}), flush=True)\n", + encoding="utf-8", + ) + codex_path.chmod(0o755) + + runner = CodexRunner(codex_cmd=str(codex_path), extra_args=[]) + seen = [evt async for evt in runner.run("hi", None)] + + assert len(seen) == 4 + first = seen[0] + second = seen[1] + assert isinstance(first, ActionEvent) + assert isinstance(second, ActionEvent) + assert first.phase == "started" + assert second.phase == "updated" + assert first.action.id == second.action.id == "codex.reconnect" + assert isinstance(seen[2], StartedEvent) + assert isinstance(seen[3], CompletedEvent) + + @pytest.mark.anyio async def test_codex_runner_includes_stderr_reason(tmp_path) -> None: codex_path = tmp_path / "codex" diff --git a/uv.lock b/uv.lock index 224bc7b..5a0e5d4 100644 --- a/uv.lock +++ b/uv.lock @@ -354,7 +354,7 @@ wheels = [ [[package]] name = "takopi" -version = "0.5.1" +version = "0.5.2" source = { editable = "." } dependencies = [ { name = "anyio" },