feat: add exec event renderers

This commit is contained in:
banteg
2025-12-28 22:07:32 +04:00
parent 2930731bea
commit ae13b25391
3 changed files with 303 additions and 28 deletions
+1
View File
@@ -79,6 +79,7 @@ Add `--chat-id` if `chat_id` is not set in `~/.codex/telegram.toml`.
## Files
- `src/codex_telegram_bridge/constants.py`: limits and config path constants
- `src/codex_telegram_bridge/config.py`: config loading and chat-id parsing helpers
- `src/codex_telegram_bridge/exec_render.py`: renderers for codex exec JSONL events
- `src/codex_telegram_bridge/rendering.py`: markdown rendering + chunking
- `src/codex_telegram_bridge/routes.py`: sqlite routing store
- `src/codex_telegram_bridge/telegram_client.py`: Telegram Bot API client
@@ -18,6 +18,7 @@ import typer
from .config import config_get, load_telegram_config, resolve_chat_ids
from .constants import TELEGRAM_HARD_LIMIT
from .exec_render import ExecProgressRenderer, ExecRenderState, render_event_cli
from .rendering import render_markdown
from .routes import RouteStore
from .telegram_client import TelegramClient
@@ -45,21 +46,6 @@ def _clamp_tg_text(text: str, limit: int = TELEGRAM_TEXT_LIMIT) -> str:
return text[: limit - 20] + "\n...(truncated)"
def _summarize_item(item: Dict[str, Any]) -> str:
item_type = item.get("type")
if isinstance(item_type, str):
if item_type == "agent_message" and isinstance(item.get("text"), str):
snippet = item["text"].strip().replace("\n", " ")
if len(snippet) > 140:
snippet = snippet[:140] + "..."
return f"agent_message: {snippet}"
name = item.get("name") or item.get("tool_name") or item.get("id")
if isinstance(name, str):
return f"{item_type}: {name}"
return item_type
return "item.completed"
class ProgressEditor:
def __init__(
self,
@@ -202,15 +188,18 @@ class CodexExecRunner:
found_session: Optional[str] = session_id
last_agent_text: Optional[str] = None
cli_state = ExecRenderState()
for line in proc.stdout:
line = line.strip()
if not line:
continue
log(f"[codex][event] {line}")
try:
evt = json.loads(line)
except json.JSONDecodeError:
continue
for out in render_event_cli(evt, cli_state):
log(f"[codex] {out}")
if on_event is not None:
try:
on_event(evt)
@@ -387,6 +376,7 @@ def run(
started_at = time.monotonic()
session_box: dict[str, Optional[str]] = {"id": resume_session}
progress_renderer = ExecProgressRenderer(max_lines=4)
def on_event(evt: Dict[str, Any]) -> None:
event_type = evt.get("type")
@@ -402,16 +392,14 @@ def run(
thread_id,
meta={"workspace": workspace},
)
elif event_type == "item.completed":
item = evt.get("item") or {}
line = progress_renderer.note_event(evt)
if line and progress is not None:
elapsed = int(time.monotonic() - started_at)
line = _summarize_item(item) if isinstance(item, dict) else "item.completed"
session_id = session_box["id"]
header = f"Working... ({elapsed}s)"
if session_id:
header += f"\nSession: {session_id}"
msg = f"{header}\n\n{line}"
if progress is not None:
msg = progress_renderer.render(header)
progress.set(msg)
def _stop_background() -> None:
@@ -459,12 +447,6 @@ def run(
can_edit_final = progress_id is not None and len(final_text) <= TELEGRAM_TEXT_LIMIT
if loud_final or not can_edit_final:
if progress_id is not None:
try:
bot.delete_message(chat_id=chat_id, message_id=progress_id)
except Exception as e:
log(f"[handle] delete progress failed chat_id={chat_id} message_id={progress_id}: {e}")
sent_msgs = bot.send_message_markdown_chunked(
chat_id=chat_id,
text=answer,
@@ -472,6 +454,11 @@ def run(
)
for m in sent_msgs:
store.link(chat_id, m["message_id"], "exec", session_id, meta={"workspace": workspace})
if progress_id is not None:
try:
bot.delete_message(chat_id=chat_id, message_id=progress_id)
except Exception as e:
log(f"[handle] delete progress failed chat_id={chat_id} message_id={progress_id}: {e}")
else:
bot.edit_message_text(
chat_id=chat_id,
@@ -0,0 +1,287 @@
from __future__ import annotations
import json
from collections import deque
from dataclasses import dataclass, field
from textwrap import indent
from typing import Any, Dict, List, Optional
def _truncate_output(text: str, max_lines: int = 20, max_chars: int = 4000) -> str:
if not text:
return ""
if len(text) > max_chars:
text = text[-max_chars:]
lines = text.splitlines()
if len(lines) > max_lines:
lines = ["..."] + lines[-max_lines:]
return "\n".join(lines)
def _format_todo(items: list[dict[str, Any]]) -> str:
rendered: list[str] = []
for item in items:
status = "done" if item.get("completed") else "todo"
rendered.append(f"- [{status}] {item.get('text', '')}")
return "\n".join(rendered)
def _format_todo_summary(items: list[dict[str, Any]]) -> str:
total = len(items)
done = sum(1 for item in items if item.get("completed"))
next_text = next((item.get("text", "") for item in items if not item.get("completed")), "")
if next_text:
return f"plan: {done}/{total} done, next: {_shorten(next_text, 80)}"
return f"plan: {done}/{total} done"
def _maybe_parse_json(text: str) -> Optional[Any]:
try:
return json.loads(text)
except json.JSONDecodeError:
return None
def _shorten(text: str, max_len: int = 140) -> str:
if len(text) <= max_len:
return text
return text[: max_len - 3] + "..."
@dataclass
class ExecRenderState:
items: dict[str, dict[str, Any]] = field(default_factory=dict)
recent: deque[str] = field(default_factory=lambda: deque(maxlen=4))
def _record_item(state: ExecRenderState, item: dict[str, Any]) -> None:
item_id = item.get("id")
if isinstance(item_id, str) and item_id:
state.items[item_id] = item
def render_event_cli(
event: dict[str, Any],
state: ExecRenderState,
*,
show_reasoning: bool = False,
) -> list[str]:
etype = event.get("type")
lines: list[str] = []
if etype == "thread.started":
thread_id = event.get("thread_id", "")
if thread_id:
lines.append(f"thread started: {thread_id}")
lines.append(f"resume with: codex exec resume {thread_id}")
else:
lines.append("thread started")
return lines
if etype == "turn.started":
return ["turn started"]
if etype == "turn.completed":
usage = event.get("usage", {})
lines.append(
"turn completed "
f"(in={usage.get('input_tokens', 0)} "
f"cached={usage.get('cached_input_tokens', 0)} "
f"out={usage.get('output_tokens', 0)})"
)
return lines
if etype == "turn.failed":
error = event.get("error", {}).get("message", "")
return [f"turn failed: {error}"]
if etype == "error":
return [f"stream error: {event.get('message', '')}"]
if etype in {"item.started", "item.updated", "item.completed"}:
item = event.get("item", {}) or {}
_record_item(state, item)
itype = item.get("type")
status = item.get("status")
if itype == "agent_message":
text = item.get("text", "")
parsed = _maybe_parse_json(text)
if parsed is not None:
lines.append("assistant (json):")
lines.extend(indent(json.dumps(parsed, indent=2), " ").splitlines())
else:
lines.append("assistant:")
lines.extend(indent(text, " ").splitlines() if text else [" (empty)"])
elif itype == "reasoning" and show_reasoning:
lines.append(f"reasoning: {item.get('text', '')}")
elif itype == "command_execution":
command = item.get("command", "")
if etype == "item.started":
lines.append(f"run: {command}")
else:
exit_code = item.get("exit_code")
outcome = "ok" if status == "completed" else status or "unknown"
lines.append(f"command {outcome} (exit={exit_code}): {command}")
output = _truncate_output(item.get("aggregated_output", ""))
if output:
lines.extend(indent(output, " ").splitlines())
elif itype == "file_change":
changes = item.get("changes", [])
counts = {"add": 0, "update": 0, "delete": 0}
for change in changes:
kind = change.get("kind")
if kind in counts:
counts[kind] += 1
lines.append(
"file changes "
f"(status={status}) add={counts['add']} "
f"update={counts['update']} delete={counts['delete']}"
)
for change in changes:
lines.append(f" {change.get('kind')} {change.get('path')}")
elif itype == "mcp_tool_call":
server = item.get("server", "")
tool = item.get("tool", "")
if etype == "item.started":
lines.append(f"tool call: {server}.{tool}")
else:
outcome = "ok" if status == "completed" else status or "unknown"
lines.append(f"tool {outcome}: {server}.{tool}")
if item.get("error"):
lines.append(f" error: {item['error'].get('message', '')}")
result = item.get("result") or {}
if result.get("structured_content") is not None:
lines.append(" result:")
lines.extend(
indent(json.dumps(result["structured_content"], indent=2), " ").splitlines()
)
elif itype == "web_search":
lines.append(f"web search: {item.get('query', '')}")
elif itype == "todo_list":
todo = _format_todo(item.get("items", []))
lines.append(f"plan ({etype}):")
lines.extend(indent(todo, " ").splitlines())
elif itype == "error":
lines.append(f"warning: {item.get('message', '')}")
else:
lines.append(f"{etype}: {item}")
else:
if etype:
lines.append(f"event: {etype}")
else:
lines.append(f"event: {event}")
return lines
def render_event_progress(event: dict[str, Any], state: ExecRenderState) -> Optional[str]:
etype = event.get("type")
if etype == "thread.started":
thread_id = event.get("thread_id", "")
return f"thread started: {thread_id}" if thread_id else "thread started"
if etype == "turn.started":
return "turn started"
if etype == "turn.completed":
usage = event.get("usage", {})
return (
"turn completed "
f"(in={usage.get('input_tokens', 0)} "
f"cached={usage.get('cached_input_tokens', 0)} "
f"out={usage.get('output_tokens', 0)})"
)
if etype == "turn.failed":
error = event.get("error", {}).get("message", "")
return f"turn failed: {error}"
if etype == "error":
return f"stream error: {event.get('message', '')}"
if etype in {"item.started", "item.updated", "item.completed"}:
item = event.get("item", {}) or {}
_record_item(state, item)
itype = item.get("type")
status = item.get("status")
if itype == "agent_message" and etype == "item.completed":
text = item.get("text", "")
snippet = text.splitlines()[0] if text else ""
if snippet:
return f"assistant: {_shorten(snippet, 120)}"
return "assistant response ready"
if itype == "command_execution":
command = item.get("command", "")
if etype == "item.started":
return f"run: {_shorten(command, 160)}"
exit_code = item.get("exit_code")
outcome = "ok" if status == "completed" else status or "unknown"
return f"command {outcome} (exit={exit_code}): {_shorten(command, 120)}"
if itype == "file_change":
changes = item.get("changes", [])
counts = {"add": 0, "update": 0, "delete": 0}
for change in changes:
kind = change.get("kind")
if kind in counts:
counts[kind] += 1
return (
"file changes "
f"+{counts['add']} ~{counts['update']} -{counts['delete']}"
)
if itype == "mcp_tool_call":
server = item.get("server", "")
tool = item.get("tool", "")
if etype == "item.started":
return f"tool call: {server}.{tool}"
outcome = "ok" if status == "completed" else status or "unknown"
return f"tool {outcome}: {server}.{tool}"
if itype == "web_search":
return f"web search: {_shorten(item.get('query', ''), 120)}"
if itype == "todo_list":
return _format_todo_summary(item.get("items", []))
if itype == "error":
return f"warning: {_shorten(item.get('message', ''), 120)}"
return None
class ExecProgressRenderer:
def __init__(self, max_lines: int = 4) -> None:
self.state = ExecRenderState(recent=deque(maxlen=max_lines))
def note_event(self, event: dict[str, Any]) -> Optional[str]:
line = render_event_progress(event, self.state)
if not line:
return None
line = line.strip()
if not line:
return None
if self.state.recent and self.state.recent[-1] == line:
return line
self.state.recent.append(line)
return line
def render(self, header: str) -> str:
if not self.state.recent:
return header
return header + "\n\n" + "\n".join(f"- {line}" for line in self.state.recent)