From 978430cb12626e0cf3bfc2d294d18362775b7fbd Mon Sep 17 00:00:00 2001 From: banteg <4562643+banteg@users.noreply.github.com> Date: Sun, 28 Dec 2025 22:34:24 +0400 Subject: [PATCH] feat: format progress updates per exec spec --- .../src/codex_telegram_bridge/exec_bridge.py | 51 +- .../src/codex_telegram_bridge/exec_render.py | 440 ++++++++++-------- 2 files changed, 276 insertions(+), 215 deletions(-) diff --git a/codex_telegram_bridge/src/codex_telegram_bridge/exec_bridge.py b/codex_telegram_bridge/src/codex_telegram_bridge/exec_bridge.py index 43676d8..11a56d9 100644 --- a/codex_telegram_bridge/src/codex_telegram_bridge/exec_bridge.py +++ b/codex_telegram_bridge/src/codex_telegram_bridge/exec_bridge.py @@ -60,29 +60,34 @@ class ProgressEditor: self.edit_every_s = edit_every_s self._lock = threading.Lock() - self._pending: Optional[str] = None - self._last_sent: Optional[str] = None + self._pending: Optional[tuple[str, Optional[list[dict[str, Any]]]]] = None + self._last_sent: Optional[tuple[str, Optional[list[dict[str, Any]]]]] = None self._last_edit_at = 0.0 self._stop = threading.Event() self._thread = threading.Thread(target=self._run, daemon=True) self._thread.start() - def set(self, text: str) -> None: + def set(self, text: str, entities: Optional[list[dict[str, Any]]] = None) -> None: text = _clamp_tg_text(text) with self._lock: - self._pending = text + self._pending = (text, entities) + + def set_markdown(self, text: str) -> None: + rendered_text, entities = render_markdown(text) + self.set(rendered_text, entities or None) def stop(self) -> None: self._stop.set() self._thread.join(timeout=1.0) - def _edit(self, text: str) -> None: + def _edit(self, text: str, entities: Optional[list[dict[str, Any]]]) -> None: try: self.bot.edit_message_text( chat_id=self.chat_id, message_id=self.message_id, text=text, + entities=entities, ) except Exception as e: log( @@ -92,7 +97,7 @@ class ProgressEditor: def _run(self) -> None: while not self._stop.is_set(): - to_send: Optional[str] = None + to_send: Optional[tuple[str, Optional[list[dict[str, Any]]]]] = None now = time.monotonic() with self._lock: if self._pending is not None and (now - self._last_edit_at) >= self.edit_every_s: @@ -103,7 +108,7 @@ class ProgressEditor: self._pending = None if to_send is not None: - self._edit(to_send) + self._edit(to_send[0], to_send[1]) self._stop.wait(0.2) @@ -358,12 +363,19 @@ def run( ) typing_thread.start() + started_at = time.monotonic() + session_box: dict[str, Optional[str]] = {"id": resume_session} + progress_renderer = ExecProgressRenderer(max_actions=5) + progress_id: Optional[int] = None progress: Optional[ProgressEditor] = None try: + initial_text = progress_renderer.render_progress(0.0) + initial_rendered, initial_entities = render_markdown(initial_text) progress_msg = bot.send_message( chat_id=chat_id, - text="Working...", + text=initial_rendered, + entities=initial_entities or None, reply_to_message_id=user_msg_id, disable_notification=silent_progress, ) @@ -374,10 +386,6 @@ def run( if progress_id is not None: progress = ProgressEditor(bot, chat_id, progress_id, edit_every_s) - started_at = time.monotonic() - session_box: dict[str, Optional[str]] = {"id": resume_session} - progress_renderer = ExecProgressRenderer(max_lines=4) - def on_event(evt: Dict[str, Any]) -> None: event_type = evt.get("type") if event_type == "thread.started": @@ -392,15 +400,10 @@ def run( thread_id, meta={"workspace": workspace}, ) - line = progress_renderer.note_event(evt) - if line and progress is not None: - elapsed = int(time.monotonic() - started_at) - session_id = session_box["id"] - header = f"Working... ({elapsed}s)" - if session_id: - header += f"\nSession: {session_id}" - msg = progress_renderer.render(header) - progress.set(msg) + if progress_renderer.note_event(evt) and progress is not None: + elapsed = time.monotonic() - started_at + msg = progress_renderer.render_progress(elapsed) + progress.set_markdown(msg) def _stop_background() -> None: typing_stop.set() @@ -443,13 +446,15 @@ def run( _stop_background() answer = answer or "(No agent_message captured from JSON stream.)" - final_text, final_entities = render_markdown(answer) + elapsed = time.monotonic() - started_at + final_md = progress_renderer.render_final(elapsed, answer) + final_text, final_entities = render_markdown(final_md) can_edit_final = progress_id is not None and len(final_text) <= TELEGRAM_TEXT_LIMIT if loud_final or not can_edit_final: sent_msgs = bot.send_message_markdown_chunked( chat_id=chat_id, - text=answer, + text=final_md, reply_to_message_id=user_msg_id, ) for m in sent_msgs: diff --git a/codex_telegram_bridge/src/codex_telegram_bridge/exec_render.py b/codex_telegram_bridge/src/codex_telegram_bridge/exec_render.py index 08b92b2..572fe2c 100644 --- a/codex_telegram_bridge/src/codex_telegram_bridge/exec_render.py +++ b/codex_telegram_bridge/src/codex_telegram_bridge/exec_render.py @@ -5,7 +5,93 @@ import re from collections import deque from dataclasses import dataclass, field from textwrap import indent -from typing import Any, Dict, List, Optional +from typing import Any, Optional + +ELLIPSIS = "…" +STATUS_RUNNING = "▸" +STATUS_DONE = "✓" +HEADER_SEP = " · " + +MAX_CMD_LEN = 40 +MAX_REASON_LEN = 80 +MAX_QUERY_LEN = 60 +MAX_PATH_LEN = 40 +MAX_PROGRESS_CHARS = 300 + + +def _one_line(text: str) -> str: + return " ".join((text or "").split()) + + +def _truncate(text: str, max_len: int) -> str: + text = _one_line(text) + if len(text) <= max_len: + return text + if max_len <= len(ELLIPSIS): + return text[:max_len] + return text[: max_len - len(ELLIPSIS)] + ELLIPSIS + + +def _inline_code(text: str) -> str: + return f"`{text}`" + + +def _format_elapsed(elapsed_s: float) -> str: + total = max(0, int(elapsed_s)) + minutes, seconds = divmod(total, 60) + hours, minutes = divmod(minutes, 60) + if hours: + return f"{hours}h {minutes:02d}m" + if minutes: + return f"{minutes}m {seconds:02d}s" + return f"{seconds}s" + + +def _format_header(elapsed_s: float, turn: Optional[int], done: bool) -> str: + label = "Done" if done else "codex" + elapsed = _format_elapsed(elapsed_s) + if turn is not None: + return f"{label}{HEADER_SEP}{elapsed}{HEADER_SEP}turn {turn}" + return f"{label}{HEADER_SEP}{elapsed}" + + +def _format_reasoning(text: str) -> str: + if not text: + return "" + return f"_{_truncate(text, MAX_REASON_LEN)}_" + + +def _format_command(command: str) -> str: + command = _truncate(command, MAX_CMD_LEN) + if not command: + command = "(empty)" + return _inline_code(command) + + +def _format_query(query: str) -> str: + return _truncate(query, MAX_QUERY_LEN) + + +def _format_paths(paths: list[str]) -> str: + rendered = [] + for path in paths: + rendered.append(_inline_code(_truncate(path, MAX_PATH_LEN))) + return ", ".join(rendered) + + +def _format_file_change(changes: list[dict[str, Any]]) -> str: + paths = [change.get("path") for change in changes if change.get("path")] + if not paths: + total = len(changes) + return "updated files" if total == 0 else f"updated {total} files" + if len(paths) <= 3: + return f"updated {_format_paths(paths)}" + return f"updated {len(paths)} files" + + +def _format_tool_call(server: str, tool: str) -> str: + name = ".".join(part for part in (server, tool) if part) + return name or "tool" def _truncate_output(text: str, max_lines: int = 20, max_chars: int = 4000) -> str: @@ -19,23 +105,6 @@ def _truncate_output(text: str, max_lines: int = 20, max_chars: int = 4000) -> s return "\n".join(lines) -def _format_todo(items: list[dict[str, Any]]) -> str: - rendered: list[str] = [] - for item in items: - status = "done" if item.get("completed") else "todo" - rendered.append(f"- [{status}] {item.get('text', '')}") - return "\n".join(rendered) - - -def _format_todo_summary(items: list[dict[str, Any]]) -> str: - total = len(items) - done = sum(1 for item in items if item.get("completed")) - next_text = next((item.get("text", "") for item in items if not item.get("completed")), "") - if next_text: - return f"plan: {done}/{total} done, next: {_shorten(next_text, 80)}" - return f"plan: {done}/{total} done" - - def _maybe_parse_json(text: str) -> Optional[Any]: try: return json.loads(text) @@ -43,16 +112,14 @@ def _maybe_parse_json(text: str) -> Optional[Any]: return None -def _shorten(text: str, max_len: int = 140) -> str: - if len(text) <= max_len: - return text - return text[: max_len - 3] + "..." - - @dataclass class ExecRenderState: items: dict[str, dict[str, Any]] = field(default_factory=dict) - recent: deque[str] = field(default_factory=lambda: deque(maxlen=4)) + recent_actions: deque[str] = field(default_factory=lambda: deque(maxlen=5)) + current_action: Optional[str] = None + current_action_id: Optional[str] = None + pending_reasoning: Optional[str] = None + current_reasoning: Optional[str] = None last_turn: Optional[int] = None @@ -65,6 +132,31 @@ def _record_item(state: ExecRenderState, item: dict[str, Any]) -> None: state.last_turn = int(match.group(1)) +def _set_current_action(state: ExecRenderState, item_id: Optional[str], line: str) -> bool: + changed = False + if state.current_action != line or state.current_action_id != item_id: + state.current_action = line + state.current_action_id = item_id + if state.pending_reasoning: + state.current_reasoning = state.pending_reasoning + state.pending_reasoning = None + changed = True + return changed + + +def _complete_action(state: ExecRenderState, item_id: Optional[str], line: str) -> bool: + changed = False + if line: + state.recent_actions.append(line) + changed = True + if item_id and state.current_action_id == item_id: + state.current_action = None + state.current_action_id = None + state.current_reasoning = None + changed = True + return changed + + def render_event_cli( event: dict[str, Any], state: ExecRenderState, @@ -75,26 +167,13 @@ def render_event_cli( lines: list[str] = [] if etype == "thread.started": - thread_id = event.get("thread_id", "") - if thread_id: - lines.append(f"thread started: {thread_id}") - lines.append(f"resume with: codex exec resume {thread_id}") - else: - lines.append("thread started") - return lines + return ["thread started"] if etype == "turn.started": return ["turn started"] if etype == "turn.completed": - usage = event.get("usage", {}) - lines.append( - "turn completed " - f"(in={usage.get('input_tokens', 0)} " - f"cached={usage.get('cached_input_tokens', 0)} " - f"out={usage.get('output_tokens', 0)})" - ) - return lines + return ["turn completed"] if etype == "turn.failed": error = event.get("error", {}).get("message", "") @@ -108,9 +187,8 @@ def render_event_cli( _record_item(state, item) itype = item.get("type") - status = item.get("status") - if itype == "agent_message": + if itype == "agent_message" and etype == "item.completed": text = item.get("text", "") parsed = _maybe_parse_json(text) if parsed is not None: @@ -121,177 +199,155 @@ def render_event_cli( lines.extend(indent(text, " ").splitlines() if text else [" (empty)"]) elif itype == "reasoning" and show_reasoning: - lines.append(f"reasoning: {item.get('text', '')}") + reasoning = _format_reasoning(item.get("text", "")) + if reasoning: + lines.append(reasoning) elif itype == "command_execution": - command = item.get("command", "") + command = _format_command(item.get("command", "")) if etype == "item.started": - lines.append(f"run: {command}") - else: + lines.append(f"{STATUS_RUNNING} running: {command}") + elif etype == "item.completed": exit_code = item.get("exit_code") - outcome = "ok" if status == "completed" else status or "unknown" - lines.append(f"command {outcome} (exit={exit_code}): {command}") + exit_part = f" (exit {exit_code})" if exit_code is not None else "" + lines.append(f"{STATUS_DONE} ran: {command}{exit_part}") output = _truncate_output(item.get("aggregated_output", "")) if output: lines.extend(indent(output, " ").splitlines()) - elif itype == "file_change": - changes = item.get("changes", []) - counts = {"add": 0, "update": 0, "delete": 0} - for change in changes: - kind = change.get("kind") - if kind in counts: - counts[kind] += 1 - lines.append( - "file changes " - f"(status={status}) add={counts['add']} " - f"update={counts['update']} delete={counts['delete']}" - ) - for change in changes: - lines.append(f" {change.get('kind')} {change.get('path')}") + elif itype == "file_change" and etype == "item.completed": + line = _format_file_change(item.get("changes", [])) + lines.append(f"{STATUS_DONE} {line}") elif itype == "mcp_tool_call": - server = item.get("server", "") - tool = item.get("tool", "") + name = _format_tool_call(item.get("server", ""), item.get("tool", "")) if etype == "item.started": - lines.append(f"tool call: {server}.{tool}") - else: - outcome = "ok" if status == "completed" else status or "unknown" - lines.append(f"tool {outcome}: {server}.{tool}") - if item.get("error"): - lines.append(f" error: {item['error'].get('message', '')}") - result = item.get("result") or {} - if result.get("structured_content") is not None: - lines.append(" result:") - lines.extend( - indent(json.dumps(result["structured_content"], indent=2), " ").splitlines() - ) + lines.append(f"{STATUS_RUNNING} tool: {name}") + elif etype == "item.completed": + lines.append(f"{STATUS_DONE} tool: {name}") - elif itype == "web_search": - lines.append(f"web search: {item.get('query', '')}") + elif itype == "web_search" and etype == "item.completed": + query = _format_query(item.get("query", "")) + lines.append(f"{STATUS_DONE} searched: {query}") - elif itype == "todo_list": - todo = _format_todo(item.get("items", [])) - lines.append(f"plan ({etype}):") - lines.extend(indent(todo, " ").splitlines()) - - elif itype == "error": - lines.append(f"warning: {item.get('message', '')}") - - else: - lines.append(f"{etype}: {item}") - - else: - if etype: - lines.append(f"event: {etype}") - else: - lines.append(f"event: {event}") + elif itype == "error" and etype == "item.completed": + warning = _truncate(item.get("message", ""), 120) + lines.append(f"{STATUS_DONE} warning: {warning}") return lines -def render_event_progress(event: dict[str, Any], state: ExecRenderState) -> Optional[str]: - etype = event.get("type") - - if etype == "thread.started": - thread_id = event.get("thread_id", "") - return f"thread started: {thread_id}" if thread_id else "thread started" - - if etype == "turn.started": - return "turn started" - - if etype == "turn.completed": - usage = event.get("usage", {}) - return ( - "turn completed " - f"(in={usage.get('input_tokens', 0)} " - f"cached={usage.get('cached_input_tokens', 0)} " - f"out={usage.get('output_tokens', 0)})" - ) - - if etype == "turn.failed": - error = event.get("error", {}).get("message", "") - return f"turn failed: {error}" - - if etype == "error": - return f"stream error: {event.get('message', '')}" - - if etype in {"item.started", "item.updated", "item.completed"}: - item = event.get("item", {}) or {} - _record_item(state, item) - - itype = item.get("type") - status = item.get("status") - - if itype == "agent_message" and etype == "item.completed": - text = item.get("text", "") - snippet = text.splitlines()[0] if text else "" - if snippet: - return f"assistant: {_shorten(snippet, 120)}" - return "assistant response ready" - - if itype == "command_execution": - command = item.get("command", "") - if etype == "item.started": - return f"run: {_shorten(command, 160)}" - exit_code = item.get("exit_code") - outcome = "ok" if status == "completed" else status or "unknown" - return f"command {outcome} (exit={exit_code}): {_shorten(command, 120)}" - - if itype == "file_change": - changes = item.get("changes", []) - counts = {"add": 0, "update": 0, "delete": 0} - for change in changes: - kind = change.get("kind") - if kind in counts: - counts[kind] += 1 - return ( - "file changes " - f"+{counts['add']} ~{counts['update']} -{counts['delete']}" - ) - - if itype == "mcp_tool_call": - server = item.get("server", "") - tool = item.get("tool", "") - if etype == "item.started": - return f"tool call: {server}.{tool}" - outcome = "ok" if status == "completed" else status or "unknown" - return f"tool {outcome}: {server}.{tool}" - - if itype == "web_search": - return f"web search: {_shorten(item.get('query', ''), 120)}" - - if itype == "todo_list": - return _format_todo_summary(item.get("items", [])) - - if itype == "error": - return f"warning: {_shorten(item.get('message', ''), 120)}" - - return None - - class ExecProgressRenderer: - def __init__(self, max_lines: int = 4) -> None: - self.state = ExecRenderState(recent=deque(maxlen=max_lines)) + def __init__(self, max_actions: int = 5, max_chars: int = MAX_PROGRESS_CHARS) -> None: + self.state = ExecRenderState(recent_actions=deque(maxlen=max_actions)) + self.max_chars = max_chars - def note_event(self, event: dict[str, Any]) -> Optional[str]: - line = render_event_progress(event, self.state) - if not line: - return None - line = line.strip() - if not line: - return None - if self.state.recent and self.state.recent[-1] == line: - return line - self.state.recent.append(line) - return line + def note_event(self, event: dict[str, Any]) -> bool: + etype = event.get("type") + changed = False + + if etype in {"thread.started", "turn.started"}: + return True + + if etype in {"item.started", "item.updated", "item.completed"}: + item = event.get("item", {}) or {} + _record_item(self.state, item) + itype = item.get("type") + item_id = item.get("id") if isinstance(item.get("id"), str) else None + + if itype == "reasoning": + reasoning = _format_reasoning(item.get("text", "")) + if reasoning: + if self.state.current_action and not self.state.current_reasoning: + self.state.current_reasoning = reasoning + changed = True + else: + self.state.pending_reasoning = reasoning + return changed + + if itype == "command_execution": + command = _format_command(item.get("command", "")) + if etype == "item.started": + line = f"{STATUS_RUNNING} running: {command}" + changed = _set_current_action(self.state, item_id, line) or changed + elif etype == "item.completed": + exit_code = item.get("exit_code") + exit_part = f" (exit {exit_code})" if exit_code is not None else "" + line = f"{STATUS_DONE} ran: {command}{exit_part}" + changed = _complete_action(self.state, item_id, line) or changed + return changed + + if itype == "mcp_tool_call": + name = _format_tool_call(item.get("server", ""), item.get("tool", "")) + if etype == "item.started": + line = f"{STATUS_RUNNING} tool: {name}" + changed = _set_current_action(self.state, item_id, line) or changed + elif etype == "item.completed": + line = f"{STATUS_DONE} tool: {name}" + changed = _complete_action(self.state, item_id, line) or changed + return changed + + if itype == "web_search" and etype == "item.completed": + query = _format_query(item.get("query", "")) + line = f"{STATUS_DONE} searched: {query}" + return _complete_action(self.state, item_id, line) or changed + + if itype == "file_change" and etype == "item.completed": + line = f"{STATUS_DONE} {_format_file_change(item.get('changes', []))}" + return _complete_action(self.state, item_id, line) or changed + + if itype == "error" and etype == "item.completed": + warning = _truncate(item.get("message", ""), 120) + line = f"{STATUS_DONE} warning: {warning}" + return _complete_action(self.state, item_id, line) or changed + + return changed + + def render_progress(self, elapsed_s: float) -> str: + header = _format_header(elapsed_s, self.state.last_turn, done=False) + actions = list(self.state.recent_actions) + current_reasoning = self.state.current_reasoning + current_action = self.state.current_action - def render(self, header: str) -> str: lines: list[str] = [] - if self.state.last_turn is not None: - lines.append(f"Turn: {self.state.last_turn}") - if self.state.recent: - lines.extend(self.state.recent) + if actions: + lines.extend(actions) + if current_reasoning and current_action: + lines.append(current_reasoning) + if current_action: + lines.append(current_action) + + message = self._assemble(header, lines) + if len(message) <= self.max_chars: + return message + + trimmed_actions = list(actions) + while len(message) > self.max_chars and trimmed_actions: + trimmed_actions.pop(0) + lines = [] + if trimmed_actions: + lines.extend(trimmed_actions) + if current_reasoning and current_action: + lines.append(current_reasoning) + if current_action: + lines.append(current_action) + message = self._assemble(header, lines) + + return message + + def render_final(self, elapsed_s: float, answer: str) -> str: + header = _format_header(elapsed_s, self.state.last_turn, done=True) + lines: list[str] = [] + if self.state.recent_actions: + lines.extend(self.state.recent_actions) + body = self._assemble(header, lines) + answer = (answer or "").strip() + if answer: + body = body + "\n\n" + answer + return body + + @staticmethod + def _assemble(header: str, lines: list[str]) -> str: if not lines: return header return header + "\n\n" + "\n".join(lines)