From c2c87a0d06d3b304b683733db515990ba0b71f8e Mon Sep 17 00:00:00 2001 From: banteg <4562643+banteg@users.noreply.github.com> Date: Mon, 29 Dec 2025 03:26:09 +0400 Subject: [PATCH] refactor: share event formatting --- .../src/codex_telegram_bridge/exec_render.py | 93 ++++++++++--------- 1 file changed, 50 insertions(+), 43 deletions(-) diff --git a/codex_telegram_bridge/src/codex_telegram_bridge/exec_render.py b/codex_telegram_bridge/src/codex_telegram_bridge/exec_render.py index 89d9457..dda7526 100644 --- a/codex_telegram_bridge/src/codex_telegram_bridge/exec_render.py +++ b/codex_telegram_bridge/src/codex_telegram_bridge/exec_render.py @@ -58,20 +58,28 @@ def _shorten_path(path: str, width: int) -> str: return _shorten(path.replace("/", " /"), width).replace(" /", "/") -def render_event_cli(event: dict[str, Any], last_turn: int | None = None) -> tuple[int | None, list[str]]: +def format_event( + event: dict[str, Any], + last_turn: int | None, +) -> tuple[int | None, list[str], str | None, str | None]: + """ + Returns (new_last_turn, cli_lines, progress_line, progress_prefix). + progress_prefix is only set when progress_line is set, and is used for + replacing a preceding "running" line on completion. + """ lines: list[str] = [] match event["type"]: case "thread.started": - return last_turn, ["thread started"] + return last_turn, ["thread started"], None, None case "turn.started": - return last_turn, ["turn started"] + return last_turn, ["turn started"], None, None case "turn.completed": - return last_turn, ["turn completed"] + return last_turn, ["turn completed"], None, None case "turn.failed": - return last_turn, [f"turn failed: {event['error']['message']}"] + return last_turn, [f"turn failed: {event['error']['message']}"], None, None case "error": - return last_turn, [f"stream error: {event['message']}"] + return last_turn, [f"stream error: {event['message']}"], None, None case "item.started" | "item.updated" | "item.completed" as etype: item = event["item"] item_num = extract_numeric_id(item["id"], last_turn) @@ -82,25 +90,32 @@ def render_event_cli(event: dict[str, Any], last_turn: int | None = None) -> tup case ("agent_message", "item.completed"): lines.append("assistant:") lines.extend(indent(item["text"], " ").splitlines()) + return last_turn, lines, None, None case ("reasoning", "item.completed"): - lines.append(prefix + item["text"]) + line = prefix + item["text"] + return last_turn, [line], line, prefix case ("command_execution", "item.started"): command = f"`{_shorten(item['command'], MAX_CMD_LEN)}`" - lines.append(prefix + f"{STATUS_RUNNING} running: {command}") + line = prefix + f"{STATUS_RUNNING} running: {command}" + return last_turn, [line], line, prefix case ("command_execution", "item.completed"): command = f"`{_shorten(item['command'], MAX_CMD_LEN)}`" exit_code = item["exit_code"] exit_part = f" (exit {exit_code})" if exit_code is not None else "" - lines.append(prefix + f"{STATUS_DONE} ran: {command}{exit_part}") + line = prefix + f"{STATUS_DONE} ran: {command}{exit_part}" + return last_turn, [line], line, prefix case ("mcp_tool_call", "item.started"): name = ".".join(part for part in (item["server"], item["tool"]) if part) or "tool" - lines.append(prefix + f"{STATUS_RUNNING} tool: {name}") + line = prefix + f"{STATUS_RUNNING} tool: {name}" + return last_turn, [line], line, prefix case ("mcp_tool_call", "item.completed"): name = ".".join(part for part in (item["server"], item["tool"]) if part) or "tool" - lines.append(prefix + f"{STATUS_DONE} tool: {name}") + line = prefix + f"{STATUS_DONE} tool: {name}" + return last_turn, [line], line, prefix case ("web_search", "item.completed"): query = _shorten(item["query"], MAX_QUERY_LEN) - lines.append(prefix + f"{STATUS_DONE} searched: {query}") + line = prefix + f"{STATUS_DONE} searched: {query}" + return last_turn, [line], line, prefix case ("file_change", "item.completed"): paths = [change["path"] for change in item["changes"] if change.get("path")] if not paths: @@ -110,15 +125,21 @@ def render_event_cli(event: dict[str, Any], last_turn: int | None = None) -> tup desc = "updated " + ", ".join(f"`{_shorten_path(p, MAX_PATH_LEN)}`" for p in paths) else: desc = f"updated {len(paths)} files" - lines.append(prefix + f"{STATUS_DONE} {desc}") + line = prefix + f"{STATUS_DONE} {desc}" + return last_turn, [line], line, prefix case ("error", "item.completed"): warning = _shorten(item["message"], 120) - lines.append(prefix + f"{STATUS_DONE} warning: {warning}") + line = prefix + f"{STATUS_DONE} warning: {warning}" + return last_turn, [line], line, prefix case _: - pass - return last_turn, lines + return last_turn, [], None, None case _: - return last_turn, lines + return last_turn, [], None, None + + +def render_event_cli(event: dict[str, Any], last_turn: int | None = None) -> tuple[int | None, list[str]]: + last_turn, cli_lines, _, _ = format_event(event, last_turn) + return last_turn, cli_lines class ExecProgressRenderer: @@ -129,34 +150,21 @@ class ExecProgressRenderer: self.last_turn: int | None = None def note_event(self, event: dict[str, Any]) -> bool: - match event["type"]: - case "thread.started" | "turn.started": - return True - case "item.started" | "item.updated" | "item.completed" as etype: - item = event["item"] - item_id = extract_numeric_id(item["id"], self.last_turn) - self.last_turn = item_id if item_id is not None else self.last_turn - prefix = f"[{item_id if item_id is not None else '?'}] " + if event["type"] in {"thread.started", "turn.started"}: + return True - match item["type"]: - case "agent_message": - return False - case _: - _, lines = render_event_cli(event, self.last_turn) - if not lines: - return False - line = lines[0] + self.last_turn, _, progress_line, progress_prefix = format_event(event, self.last_turn) + if progress_line is None: + return False - # Replace the preceding "running" line for the same item on completion. - if etype == "item.completed" and self.recent_actions: - last = self.recent_actions[-1] - if last.startswith(prefix + f"{STATUS_RUNNING} "): - self.recent_actions.pop() + # Replace the preceding "running" line for the same item on completion. + if event["type"] == "item.completed" and progress_prefix and self.recent_actions: + last = self.recent_actions[-1] + if last.startswith(progress_prefix + f"{STATUS_RUNNING} "): + self.recent_actions.pop() - self.recent_actions.append(line) - return True - case _: - return False + self.recent_actions.append(progress_line) + return True def render_progress(self, elapsed_s: float) -> str: header = format_header(elapsed_s, self.last_turn, label="working") @@ -175,4 +183,3 @@ class ExecProgressRenderer: @staticmethod def _assemble(header: str, lines: list[str]) -> str: return header if not lines else header + "\n\n" + HARD_BREAK.join(lines) -