feat: format progress updates per exec spec

This commit is contained in:
banteg
2025-12-28 22:34:24 +04:00
parent 864b00d988
commit 978430cb12
2 changed files with 276 additions and 215 deletions
@@ -60,29 +60,34 @@ class ProgressEditor:
self.edit_every_s = edit_every_s self.edit_every_s = edit_every_s
self._lock = threading.Lock() self._lock = threading.Lock()
self._pending: Optional[str] = None self._pending: Optional[tuple[str, Optional[list[dict[str, Any]]]]] = None
self._last_sent: Optional[str] = None self._last_sent: Optional[tuple[str, Optional[list[dict[str, Any]]]]] = None
self._last_edit_at = 0.0 self._last_edit_at = 0.0
self._stop = threading.Event() self._stop = threading.Event()
self._thread = threading.Thread(target=self._run, daemon=True) self._thread = threading.Thread(target=self._run, daemon=True)
self._thread.start() self._thread.start()
def set(self, text: str) -> None: def set(self, text: str, entities: Optional[list[dict[str, Any]]] = None) -> None:
text = _clamp_tg_text(text) text = _clamp_tg_text(text)
with self._lock: with self._lock:
self._pending = text self._pending = (text, entities)
def set_markdown(self, text: str) -> None:
rendered_text, entities = render_markdown(text)
self.set(rendered_text, entities or None)
def stop(self) -> None: def stop(self) -> None:
self._stop.set() self._stop.set()
self._thread.join(timeout=1.0) self._thread.join(timeout=1.0)
def _edit(self, text: str) -> None: def _edit(self, text: str, entities: Optional[list[dict[str, Any]]]) -> None:
try: try:
self.bot.edit_message_text( self.bot.edit_message_text(
chat_id=self.chat_id, chat_id=self.chat_id,
message_id=self.message_id, message_id=self.message_id,
text=text, text=text,
entities=entities,
) )
except Exception as e: except Exception as e:
log( log(
@@ -92,7 +97,7 @@ class ProgressEditor:
def _run(self) -> None: def _run(self) -> None:
while not self._stop.is_set(): while not self._stop.is_set():
to_send: Optional[str] = None to_send: Optional[tuple[str, Optional[list[dict[str, Any]]]]] = None
now = time.monotonic() now = time.monotonic()
with self._lock: with self._lock:
if self._pending is not None and (now - self._last_edit_at) >= self.edit_every_s: if self._pending is not None and (now - self._last_edit_at) >= self.edit_every_s:
@@ -103,7 +108,7 @@ class ProgressEditor:
self._pending = None self._pending = None
if to_send is not None: if to_send is not None:
self._edit(to_send) self._edit(to_send[0], to_send[1])
self._stop.wait(0.2) self._stop.wait(0.2)
@@ -358,12 +363,19 @@ def run(
) )
typing_thread.start() typing_thread.start()
started_at = time.monotonic()
session_box: dict[str, Optional[str]] = {"id": resume_session}
progress_renderer = ExecProgressRenderer(max_actions=5)
progress_id: Optional[int] = None progress_id: Optional[int] = None
progress: Optional[ProgressEditor] = None progress: Optional[ProgressEditor] = None
try: try:
initial_text = progress_renderer.render_progress(0.0)
initial_rendered, initial_entities = render_markdown(initial_text)
progress_msg = bot.send_message( progress_msg = bot.send_message(
chat_id=chat_id, chat_id=chat_id,
text="Working...", text=initial_rendered,
entities=initial_entities or None,
reply_to_message_id=user_msg_id, reply_to_message_id=user_msg_id,
disable_notification=silent_progress, disable_notification=silent_progress,
) )
@@ -374,10 +386,6 @@ def run(
if progress_id is not None: if progress_id is not None:
progress = ProgressEditor(bot, chat_id, progress_id, edit_every_s) progress = ProgressEditor(bot, chat_id, progress_id, edit_every_s)
started_at = time.monotonic()
session_box: dict[str, Optional[str]] = {"id": resume_session}
progress_renderer = ExecProgressRenderer(max_lines=4)
def on_event(evt: Dict[str, Any]) -> None: def on_event(evt: Dict[str, Any]) -> None:
event_type = evt.get("type") event_type = evt.get("type")
if event_type == "thread.started": if event_type == "thread.started":
@@ -392,15 +400,10 @@ def run(
thread_id, thread_id,
meta={"workspace": workspace}, meta={"workspace": workspace},
) )
line = progress_renderer.note_event(evt) if progress_renderer.note_event(evt) and progress is not None:
if line and progress is not None: elapsed = time.monotonic() - started_at
elapsed = int(time.monotonic() - started_at) msg = progress_renderer.render_progress(elapsed)
session_id = session_box["id"] progress.set_markdown(msg)
header = f"Working... ({elapsed}s)"
if session_id:
header += f"\nSession: {session_id}"
msg = progress_renderer.render(header)
progress.set(msg)
def _stop_background() -> None: def _stop_background() -> None:
typing_stop.set() typing_stop.set()
@@ -443,13 +446,15 @@ def run(
_stop_background() _stop_background()
answer = answer or "(No agent_message captured from JSON stream.)" answer = answer or "(No agent_message captured from JSON stream.)"
final_text, final_entities = render_markdown(answer) elapsed = time.monotonic() - started_at
final_md = progress_renderer.render_final(elapsed, answer)
final_text, final_entities = render_markdown(final_md)
can_edit_final = progress_id is not None and len(final_text) <= TELEGRAM_TEXT_LIMIT can_edit_final = progress_id is not None and len(final_text) <= TELEGRAM_TEXT_LIMIT
if loud_final or not can_edit_final: if loud_final or not can_edit_final:
sent_msgs = bot.send_message_markdown_chunked( sent_msgs = bot.send_message_markdown_chunked(
chat_id=chat_id, chat_id=chat_id,
text=answer, text=final_md,
reply_to_message_id=user_msg_id, reply_to_message_id=user_msg_id,
) )
for m in sent_msgs: for m in sent_msgs:
@@ -5,7 +5,93 @@ import re
from collections import deque from collections import deque
from dataclasses import dataclass, field from dataclasses import dataclass, field
from textwrap import indent from textwrap import indent
from typing import Any, Dict, List, Optional from typing import Any, Optional
ELLIPSIS = ""
STATUS_RUNNING = ""
STATUS_DONE = ""
HEADER_SEP = " · "
MAX_CMD_LEN = 40
MAX_REASON_LEN = 80
MAX_QUERY_LEN = 60
MAX_PATH_LEN = 40
MAX_PROGRESS_CHARS = 300
def _one_line(text: str) -> str:
return " ".join((text or "").split())
def _truncate(text: str, max_len: int) -> str:
text = _one_line(text)
if len(text) <= max_len:
return text
if max_len <= len(ELLIPSIS):
return text[:max_len]
return text[: max_len - len(ELLIPSIS)] + ELLIPSIS
def _inline_code(text: str) -> str:
return f"`{text}`"
def _format_elapsed(elapsed_s: float) -> str:
total = max(0, int(elapsed_s))
minutes, seconds = divmod(total, 60)
hours, minutes = divmod(minutes, 60)
if hours:
return f"{hours}h {minutes:02d}m"
if minutes:
return f"{minutes}m {seconds:02d}s"
return f"{seconds}s"
def _format_header(elapsed_s: float, turn: Optional[int], done: bool) -> str:
label = "Done" if done else "codex"
elapsed = _format_elapsed(elapsed_s)
if turn is not None:
return f"{label}{HEADER_SEP}{elapsed}{HEADER_SEP}turn {turn}"
return f"{label}{HEADER_SEP}{elapsed}"
def _format_reasoning(text: str) -> str:
if not text:
return ""
return f"_{_truncate(text, MAX_REASON_LEN)}_"
def _format_command(command: str) -> str:
command = _truncate(command, MAX_CMD_LEN)
if not command:
command = "(empty)"
return _inline_code(command)
def _format_query(query: str) -> str:
return _truncate(query, MAX_QUERY_LEN)
def _format_paths(paths: list[str]) -> str:
rendered = []
for path in paths:
rendered.append(_inline_code(_truncate(path, MAX_PATH_LEN)))
return ", ".join(rendered)
def _format_file_change(changes: list[dict[str, Any]]) -> str:
paths = [change.get("path") for change in changes if change.get("path")]
if not paths:
total = len(changes)
return "updated files" if total == 0 else f"updated {total} files"
if len(paths) <= 3:
return f"updated {_format_paths(paths)}"
return f"updated {len(paths)} files"
def _format_tool_call(server: str, tool: str) -> str:
name = ".".join(part for part in (server, tool) if part)
return name or "tool"
def _truncate_output(text: str, max_lines: int = 20, max_chars: int = 4000) -> str: def _truncate_output(text: str, max_lines: int = 20, max_chars: int = 4000) -> str:
@@ -19,23 +105,6 @@ def _truncate_output(text: str, max_lines: int = 20, max_chars: int = 4000) -> s
return "\n".join(lines) return "\n".join(lines)
def _format_todo(items: list[dict[str, Any]]) -> str:
rendered: list[str] = []
for item in items:
status = "done" if item.get("completed") else "todo"
rendered.append(f"- [{status}] {item.get('text', '')}")
return "\n".join(rendered)
def _format_todo_summary(items: list[dict[str, Any]]) -> str:
total = len(items)
done = sum(1 for item in items if item.get("completed"))
next_text = next((item.get("text", "") for item in items if not item.get("completed")), "")
if next_text:
return f"plan: {done}/{total} done, next: {_shorten(next_text, 80)}"
return f"plan: {done}/{total} done"
def _maybe_parse_json(text: str) -> Optional[Any]: def _maybe_parse_json(text: str) -> Optional[Any]:
try: try:
return json.loads(text) return json.loads(text)
@@ -43,16 +112,14 @@ def _maybe_parse_json(text: str) -> Optional[Any]:
return None return None
def _shorten(text: str, max_len: int = 140) -> str:
if len(text) <= max_len:
return text
return text[: max_len - 3] + "..."
@dataclass @dataclass
class ExecRenderState: class ExecRenderState:
items: dict[str, dict[str, Any]] = field(default_factory=dict) items: dict[str, dict[str, Any]] = field(default_factory=dict)
recent: deque[str] = field(default_factory=lambda: deque(maxlen=4)) recent_actions: deque[str] = field(default_factory=lambda: deque(maxlen=5))
current_action: Optional[str] = None
current_action_id: Optional[str] = None
pending_reasoning: Optional[str] = None
current_reasoning: Optional[str] = None
last_turn: Optional[int] = None last_turn: Optional[int] = None
@@ -65,6 +132,31 @@ def _record_item(state: ExecRenderState, item: dict[str, Any]) -> None:
state.last_turn = int(match.group(1)) state.last_turn = int(match.group(1))
def _set_current_action(state: ExecRenderState, item_id: Optional[str], line: str) -> bool:
changed = False
if state.current_action != line or state.current_action_id != item_id:
state.current_action = line
state.current_action_id = item_id
if state.pending_reasoning:
state.current_reasoning = state.pending_reasoning
state.pending_reasoning = None
changed = True
return changed
def _complete_action(state: ExecRenderState, item_id: Optional[str], line: str) -> bool:
changed = False
if line:
state.recent_actions.append(line)
changed = True
if item_id and state.current_action_id == item_id:
state.current_action = None
state.current_action_id = None
state.current_reasoning = None
changed = True
return changed
def render_event_cli( def render_event_cli(
event: dict[str, Any], event: dict[str, Any],
state: ExecRenderState, state: ExecRenderState,
@@ -75,26 +167,13 @@ def render_event_cli(
lines: list[str] = [] lines: list[str] = []
if etype == "thread.started": if etype == "thread.started":
thread_id = event.get("thread_id", "") return ["thread started"]
if thread_id:
lines.append(f"thread started: {thread_id}")
lines.append(f"resume with: codex exec resume {thread_id}")
else:
lines.append("thread started")
return lines
if etype == "turn.started": if etype == "turn.started":
return ["turn started"] return ["turn started"]
if etype == "turn.completed": if etype == "turn.completed":
usage = event.get("usage", {}) return ["turn completed"]
lines.append(
"turn completed "
f"(in={usage.get('input_tokens', 0)} "
f"cached={usage.get('cached_input_tokens', 0)} "
f"out={usage.get('output_tokens', 0)})"
)
return lines
if etype == "turn.failed": if etype == "turn.failed":
error = event.get("error", {}).get("message", "") error = event.get("error", {}).get("message", "")
@@ -108,9 +187,8 @@ def render_event_cli(
_record_item(state, item) _record_item(state, item)
itype = item.get("type") itype = item.get("type")
status = item.get("status")
if itype == "agent_message": if itype == "agent_message" and etype == "item.completed":
text = item.get("text", "") text = item.get("text", "")
parsed = _maybe_parse_json(text) parsed = _maybe_parse_json(text)
if parsed is not None: if parsed is not None:
@@ -121,177 +199,155 @@ def render_event_cli(
lines.extend(indent(text, " ").splitlines() if text else [" (empty)"]) lines.extend(indent(text, " ").splitlines() if text else [" (empty)"])
elif itype == "reasoning" and show_reasoning: elif itype == "reasoning" and show_reasoning:
lines.append(f"reasoning: {item.get('text', '')}") reasoning = _format_reasoning(item.get("text", ""))
if reasoning:
lines.append(reasoning)
elif itype == "command_execution": elif itype == "command_execution":
command = item.get("command", "") command = _format_command(item.get("command", ""))
if etype == "item.started": if etype == "item.started":
lines.append(f"run: {command}") lines.append(f"{STATUS_RUNNING} running: {command}")
else: elif etype == "item.completed":
exit_code = item.get("exit_code") exit_code = item.get("exit_code")
outcome = "ok" if status == "completed" else status or "unknown" exit_part = f" (exit {exit_code})" if exit_code is not None else ""
lines.append(f"command {outcome} (exit={exit_code}): {command}") lines.append(f"{STATUS_DONE} ran: {command}{exit_part}")
output = _truncate_output(item.get("aggregated_output", "")) output = _truncate_output(item.get("aggregated_output", ""))
if output: if output:
lines.extend(indent(output, " ").splitlines()) lines.extend(indent(output, " ").splitlines())
elif itype == "file_change": elif itype == "file_change" and etype == "item.completed":
changes = item.get("changes", []) line = _format_file_change(item.get("changes", []))
counts = {"add": 0, "update": 0, "delete": 0} lines.append(f"{STATUS_DONE} {line}")
for change in changes:
kind = change.get("kind")
if kind in counts:
counts[kind] += 1
lines.append(
"file changes "
f"(status={status}) add={counts['add']} "
f"update={counts['update']} delete={counts['delete']}"
)
for change in changes:
lines.append(f" {change.get('kind')} {change.get('path')}")
elif itype == "mcp_tool_call": elif itype == "mcp_tool_call":
server = item.get("server", "") name = _format_tool_call(item.get("server", ""), item.get("tool", ""))
tool = item.get("tool", "")
if etype == "item.started": if etype == "item.started":
lines.append(f"tool call: {server}.{tool}") lines.append(f"{STATUS_RUNNING} tool: {name}")
else: elif etype == "item.completed":
outcome = "ok" if status == "completed" else status or "unknown" lines.append(f"{STATUS_DONE} tool: {name}")
lines.append(f"tool {outcome}: {server}.{tool}")
if item.get("error"):
lines.append(f" error: {item['error'].get('message', '')}")
result = item.get("result") or {}
if result.get("structured_content") is not None:
lines.append(" result:")
lines.extend(
indent(json.dumps(result["structured_content"], indent=2), " ").splitlines()
)
elif itype == "web_search": elif itype == "web_search" and etype == "item.completed":
lines.append(f"web search: {item.get('query', '')}") query = _format_query(item.get("query", ""))
lines.append(f"{STATUS_DONE} searched: {query}")
elif itype == "todo_list": elif itype == "error" and etype == "item.completed":
todo = _format_todo(item.get("items", [])) warning = _truncate(item.get("message", ""), 120)
lines.append(f"plan ({etype}):") lines.append(f"{STATUS_DONE} warning: {warning}")
lines.extend(indent(todo, " ").splitlines())
elif itype == "error":
lines.append(f"warning: {item.get('message', '')}")
else:
lines.append(f"{etype}: {item}")
else:
if etype:
lines.append(f"event: {etype}")
else:
lines.append(f"event: {event}")
return lines return lines
def render_event_progress(event: dict[str, Any], state: ExecRenderState) -> Optional[str]: class ExecProgressRenderer:
def __init__(self, max_actions: int = 5, max_chars: int = MAX_PROGRESS_CHARS) -> None:
self.state = ExecRenderState(recent_actions=deque(maxlen=max_actions))
self.max_chars = max_chars
def note_event(self, event: dict[str, Any]) -> bool:
etype = event.get("type") etype = event.get("type")
changed = False
if etype == "thread.started": if etype in {"thread.started", "turn.started"}:
thread_id = event.get("thread_id", "") return True
return f"thread started: {thread_id}" if thread_id else "thread started"
if etype == "turn.started":
return "turn started"
if etype == "turn.completed":
usage = event.get("usage", {})
return (
"turn completed "
f"(in={usage.get('input_tokens', 0)} "
f"cached={usage.get('cached_input_tokens', 0)} "
f"out={usage.get('output_tokens', 0)})"
)
if etype == "turn.failed":
error = event.get("error", {}).get("message", "")
return f"turn failed: {error}"
if etype == "error":
return f"stream error: {event.get('message', '')}"
if etype in {"item.started", "item.updated", "item.completed"}: if etype in {"item.started", "item.updated", "item.completed"}:
item = event.get("item", {}) or {} item = event.get("item", {}) or {}
_record_item(state, item) _record_item(self.state, item)
itype = item.get("type") itype = item.get("type")
status = item.get("status") item_id = item.get("id") if isinstance(item.get("id"), str) else None
if itype == "agent_message" and etype == "item.completed": if itype == "reasoning":
text = item.get("text", "") reasoning = _format_reasoning(item.get("text", ""))
snippet = text.splitlines()[0] if text else "" if reasoning:
if snippet: if self.state.current_action and not self.state.current_reasoning:
return f"assistant: {_shorten(snippet, 120)}" self.state.current_reasoning = reasoning
return "assistant response ready" changed = True
else:
self.state.pending_reasoning = reasoning
return changed
if itype == "command_execution": if itype == "command_execution":
command = item.get("command", "") command = _format_command(item.get("command", ""))
if etype == "item.started": if etype == "item.started":
return f"run: {_shorten(command, 160)}" line = f"{STATUS_RUNNING} running: {command}"
changed = _set_current_action(self.state, item_id, line) or changed
elif etype == "item.completed":
exit_code = item.get("exit_code") exit_code = item.get("exit_code")
outcome = "ok" if status == "completed" else status or "unknown" exit_part = f" (exit {exit_code})" if exit_code is not None else ""
return f"command {outcome} (exit={exit_code}): {_shorten(command, 120)}" line = f"{STATUS_DONE} ran: {command}{exit_part}"
changed = _complete_action(self.state, item_id, line) or changed
if itype == "file_change": return changed
changes = item.get("changes", [])
counts = {"add": 0, "update": 0, "delete": 0}
for change in changes:
kind = change.get("kind")
if kind in counts:
counts[kind] += 1
return (
"file changes "
f"+{counts['add']} ~{counts['update']} -{counts['delete']}"
)
if itype == "mcp_tool_call": if itype == "mcp_tool_call":
server = item.get("server", "") name = _format_tool_call(item.get("server", ""), item.get("tool", ""))
tool = item.get("tool", "")
if etype == "item.started": if etype == "item.started":
return f"tool call: {server}.{tool}" line = f"{STATUS_RUNNING} tool: {name}"
outcome = "ok" if status == "completed" else status or "unknown" changed = _set_current_action(self.state, item_id, line) or changed
return f"tool {outcome}: {server}.{tool}" elif etype == "item.completed":
line = f"{STATUS_DONE} tool: {name}"
changed = _complete_action(self.state, item_id, line) or changed
return changed
if itype == "web_search": if itype == "web_search" and etype == "item.completed":
return f"web search: {_shorten(item.get('query', ''), 120)}" query = _format_query(item.get("query", ""))
line = f"{STATUS_DONE} searched: {query}"
return _complete_action(self.state, item_id, line) or changed
if itype == "todo_list": if itype == "file_change" and etype == "item.completed":
return _format_todo_summary(item.get("items", [])) line = f"{STATUS_DONE} {_format_file_change(item.get('changes', []))}"
return _complete_action(self.state, item_id, line) or changed
if itype == "error": if itype == "error" and etype == "item.completed":
return f"warning: {_shorten(item.get('message', ''), 120)}" warning = _truncate(item.get("message", ""), 120)
line = f"{STATUS_DONE} warning: {warning}"
return _complete_action(self.state, item_id, line) or changed
return None return changed
def render_progress(self, elapsed_s: float) -> str:
header = _format_header(elapsed_s, self.state.last_turn, done=False)
actions = list(self.state.recent_actions)
current_reasoning = self.state.current_reasoning
current_action = self.state.current_action
class ExecProgressRenderer:
def __init__(self, max_lines: int = 4) -> None:
self.state = ExecRenderState(recent=deque(maxlen=max_lines))
def note_event(self, event: dict[str, Any]) -> Optional[str]:
line = render_event_progress(event, self.state)
if not line:
return None
line = line.strip()
if not line:
return None
if self.state.recent and self.state.recent[-1] == line:
return line
self.state.recent.append(line)
return line
def render(self, header: str) -> str:
lines: list[str] = [] lines: list[str] = []
if self.state.last_turn is not None: if actions:
lines.append(f"Turn: {self.state.last_turn}") lines.extend(actions)
if self.state.recent: if current_reasoning and current_action:
lines.extend(self.state.recent) lines.append(current_reasoning)
if current_action:
lines.append(current_action)
message = self._assemble(header, lines)
if len(message) <= self.max_chars:
return message
trimmed_actions = list(actions)
while len(message) > self.max_chars and trimmed_actions:
trimmed_actions.pop(0)
lines = []
if trimmed_actions:
lines.extend(trimmed_actions)
if current_reasoning and current_action:
lines.append(current_reasoning)
if current_action:
lines.append(current_action)
message = self._assemble(header, lines)
return message
def render_final(self, elapsed_s: float, answer: str) -> str:
header = _format_header(elapsed_s, self.state.last_turn, done=True)
lines: list[str] = []
if self.state.recent_actions:
lines.extend(self.state.recent_actions)
body = self._assemble(header, lines)
answer = (answer or "").strip()
if answer:
body = body + "\n\n" + answer
return body
@staticmethod
def _assemble(header: str, lines: list[str]) -> str:
if not lines: if not lines:
return header return header
return header + "\n\n" + "\n".join(lines) return header + "\n\n" + "\n".join(lines)