refactor: simplify telegram markdown rendering (#20)

This commit is contained in:
banteg
2026-01-02 04:36:05 +04:00
committed by GitHub
parent bd9387f7f0
commit b8455c8691
7 changed files with 360 additions and 303 deletions
+8 -20
View File
@@ -60,13 +60,16 @@ The orchestrator module containing:
| `run()` / `main()` | Typer CLI entry points |
| `_parse_bridge_config()` | Reads config + builds `BridgeConfig` |
### `markdown.py` - Telegram markdown helpers
### `render.py` - Takopi event + Markdown helpers
| Function | Purpose |
|----------|---------|
| Function/Class | Purpose |
|----------------|---------|
| `render_markdown()` | Markdown → Telegram text + entities |
| `prepare_telegram()` | Render + truncate for Telegram limits |
| `truncate_for_telegram()` | Smart truncation preserving resume lines |
| `trim_body()` | Trim body to 3500 chars (header/footer preserved) |
| `prepare_telegram()` | Trim + render Markdown parts for Telegram |
| `ExecProgressRenderer` | Stateful renderer tracking recent actions for progress display |
| `render_event_cli()` | Format a takopi event for CLI logs |
| `format_elapsed()` | Formats seconds as `Xh Ym`, `Xm Ys`, or `Xs` |
### `telegram.py` - Telegram API wrapper
@@ -89,21 +92,6 @@ The orchestrator module containing:
- Stderr is drained into a bounded tail (debug logging only)
- Event callbacks must not raise; callback errors abort the run
### `render.py` - Takopi event rendering
Transforms takopi events into human-readable text:
| Function/Class | Purpose |
|----------------|---------|
| `ExecProgressRenderer` | Stateful renderer tracking recent actions for progress display |
| `render_event_cli()` | Format a takopi event for CLI logs |
| `format_elapsed()` | Formats seconds as `Xh Ym`, `Xm Ys`, or `Xs` |
**Supported event types:**
- `started`
- `action`
- `completed`
### `model.py` / `runner.py` - Core domain types
| File | Purpose |
+50 -59
View File
@@ -11,9 +11,14 @@ from typing import Any
import anyio
from .markdown import TELEGRAM_MARKDOWN_LIMIT, prepare_telegram
from .model import CompletedEvent, EngineId, ResumeToken, StartedEvent, TakopiEvent
from .render import ExecProgressRenderer, render_event_cli
from .render import (
ExecProgressRenderer,
MarkdownParts,
assemble_markdown_parts,
prepare_telegram,
render_event_cli,
)
from .router import AutoRouter, RunnerUnavailableError
from .runner import Runner
from .telegram import BotClient
@@ -152,18 +157,14 @@ async def _send_or_edit_markdown(
bot: BotClient,
*,
chat_id: int,
text: str,
parts: MarkdownParts,
edit_message_id: int | None = None,
reply_to_message_id: int | None = None,
disable_notification: bool = False,
limit: int = TELEGRAM_MARKDOWN_LIMIT,
is_resume_line: Callable[[str], bool] | None = None,
prepared: tuple[str, list[dict[str, Any]] | None] | None = None,
prepared: tuple[str, list[dict[str, Any]]] | None = None,
) -> tuple[dict[str, Any] | None, bool]:
if prepared is None:
rendered, entities = prepare_telegram(
text, limit=limit, is_resume_line=is_resume_line
)
rendered, entities = prepare_telegram(parts)
else:
rendered, entities = prepared
if edit_message_id is not None:
@@ -200,10 +201,8 @@ class ProgressEdits:
progress_edit_every: float,
clock: Callable[[], float],
sleep: Callable[[float], Awaitable[None]],
limit: int,
last_edit_at: float,
last_rendered: str | None,
is_resume_line: Callable[[str], bool],
) -> None:
self.bot = bot
self.chat_id = chat_id
@@ -213,10 +212,8 @@ class ProgressEdits:
self.progress_edit_every = progress_edit_every
self.clock = clock
self.sleep = sleep
self.limit = limit
self.last_edit_at = last_edit_at
self.last_rendered = last_rendered
self.is_resume_line = is_resume_line
self.event_seq = 0
self.rendered_seq = 0
self.signal_send, self.signal_recv = anyio.create_memory_object_stream(1)
@@ -240,10 +237,9 @@ class ProgressEdits:
seq_at_render = self.event_seq
now = self.clock()
md = self.renderer.render_progress(now - self.started_at)
rendered, entities = prepare_telegram(
md, limit=self.limit, is_resume_line=self.is_resume_line
)
parts = self.renderer.render_progress_parts(now - self.started_at)
md = assemble_markdown_parts(parts)
rendered, entities = prepare_telegram(parts)
if rendered != self.last_rendered:
logger.debug(
"[progress] edit message_id=%s md=%s", self.progress_id, md
@@ -297,8 +293,7 @@ async def _send_startup(cfg: BridgeConfig) -> None:
sent, _ = await _send_or_edit_markdown(
cfg.bot,
chat_id=cfg.chat_id,
text=cfg.startup_msg,
limit=TELEGRAM_MARKDOWN_LIMIT,
parts=MarkdownParts(header=cfg.startup_msg),
)
if sent is not None:
logger.info("[startup] sent startup message to chat_id=%s", cfg.chat_id)
@@ -336,18 +331,15 @@ async def send_initial_progress(
user_msg_id: int,
label: str,
renderer: ExecProgressRenderer,
is_resume_line: Callable[[str], bool],
clock: Callable[[], float],
limit: int,
) -> ProgressMessageState:
progress_id: int | None = None
last_edit_at = 0.0
last_rendered: str | None = None
initial_md = renderer.render_progress(0.0, label=label)
initial_rendered, initial_entities = prepare_telegram(
initial_md, limit=limit, is_resume_line=is_resume_line
)
initial_parts = renderer.render_progress_parts(0.0, label=label)
initial_md = assemble_markdown_parts(initial_parts)
initial_rendered, initial_entities = prepare_telegram(initial_parts)
logger.debug(
"[progress] send reply_to=%s md=%s rendered=%s entities=%s",
user_msg_id,
@@ -438,22 +430,19 @@ async def send_result_message(
chat_id: int,
user_msg_id: int,
progress_id: int | None,
markdown: str,
parts: MarkdownParts,
disable_notification: bool,
edit_message_id: int | None,
is_resume_line: Callable[[str], bool],
prepared: tuple[str, list[dict[str, Any]] | None] | None = None,
prepared: tuple[str, list[dict[str, Any]]] | None = None,
delete_tag: str = "final",
) -> None:
final_msg, edited = await _send_or_edit_markdown(
cfg.bot,
chat_id=chat_id,
text=markdown,
parts=parts,
edit_message_id=edit_message_id,
reply_to_message_id=user_msg_id,
disable_notification=disable_notification,
limit=TELEGRAM_MARKDOWN_LIMIT,
is_resume_line=is_resume_line,
prepared=prepared,
)
if final_msg is None:
@@ -492,18 +481,16 @@ async def handle_message(
runner_text = _strip_resume_lines(text, is_resume_line=resume_strip)
progress_renderer = ExecProgressRenderer(
max_actions=5, resume_formatter=runner.format_resume
max_actions=5, resume_formatter=runner.format_resume, engine=runner.engine
)
progress_state = await send_initial_progress(
cfg,
chat_id=chat_id,
user_msg_id=user_msg_id,
label=f"working ({runner.engine})",
label="starting",
renderer=progress_renderer,
is_resume_line=is_resume_line,
clock=clock,
limit=TELEGRAM_MARKDOWN_LIMIT,
)
progress_id = progress_state.message_id
@@ -516,10 +503,8 @@ async def handle_message(
progress_edit_every=progress_edit_every,
clock=clock,
sleep=sleep,
limit=TELEGRAM_MARKDOWN_LIMIT,
last_edit_at=progress_state.last_edit_at,
last_rendered=progress_state.last_rendered,
is_resume_line=is_resume_line,
)
running_task: RunningTask | None = None
@@ -575,17 +560,21 @@ async def handle_message(
if error is not None:
sync_resume_token(progress_renderer, outcome.resume)
err_body = _format_error(error)
final_md = progress_renderer.render_final(elapsed, err_body, status="error")
logger.debug("[error] markdown: %s", final_md)
final_parts = progress_renderer.render_final_parts(
elapsed, err_body, status="error"
)
logger.debug(
"[error] markdown: %s",
assemble_markdown_parts(final_parts),
)
await send_result_message(
cfg,
chat_id=chat_id,
user_msg_id=user_msg_id,
progress_id=progress_id,
markdown=final_md,
parts=final_parts,
disable_notification=True,
edit_message_id=progress_id,
is_resume_line=is_resume_line,
delete_tag="error",
)
return
@@ -597,16 +586,17 @@ async def handle_message(
resume.value if resume else None,
elapsed,
)
final_md = progress_renderer.render_progress(elapsed, label="`cancelled`")
final_parts = progress_renderer.render_progress_parts(
elapsed, label="`cancelled`"
)
await send_result_message(
cfg,
chat_id=chat_id,
user_msg_id=user_msg_id,
progress_id=progress_id,
markdown=final_md,
parts=final_parts,
disable_notification=True,
edit_message_id=progress_id,
is_resume_line=is_resume_line,
delete_tag="cancel",
)
return
@@ -629,13 +619,16 @@ async def handle_message(
"error" if run_ok is False else ("done" if final_answer.strip() else "error")
)
sync_resume_token(progress_renderer, completed.resume or outcome.resume)
final_md = progress_renderer.render_final(elapsed, final_answer, status=status)
logger.debug("[final] markdown: %s", final_md)
final_rendered, final_entities = prepare_telegram(
final_md, limit=TELEGRAM_MARKDOWN_LIMIT, is_resume_line=is_resume_line
final_parts = progress_renderer.render_final_parts(
elapsed, final_answer, status=status
)
can_edit_final = progress_id is not None and final_entities is not None
logger.debug(
"[final] markdown: %s",
assemble_markdown_parts(final_parts),
)
final_rendered, final_entities = prepare_telegram(final_parts)
can_edit_final = progress_id is not None
edit_message_id = None if cfg.final_notify or not can_edit_final else progress_id
if edit_message_id is None:
@@ -658,10 +651,9 @@ async def handle_message(
chat_id=chat_id,
user_msg_id=user_msg_id,
progress_id=progress_id,
markdown=final_md,
parts=final_parts,
disable_notification=False,
edit_message_id=edit_message_id,
is_resume_line=is_resume_line,
prepared=(final_rendered, final_entities),
delete_tag="final",
)
@@ -784,19 +776,19 @@ async def _send_runner_unavailable(
reason: str,
) -> None:
progress_renderer = ExecProgressRenderer(
max_actions=0, resume_formatter=runner.format_resume
max_actions=0, resume_formatter=runner.format_resume, engine=runner.engine
)
if resume_token is not None:
progress_renderer.resume_token = resume_token
final_md = progress_renderer.render_final(0.0, f"Error:\n{reason}", status="error")
final_parts = progress_renderer.render_final_parts(
0.0, f"error:\n{reason}", status="error"
)
await _send_or_edit_markdown(
cfg.bot,
chat_id=chat_id,
text=final_md,
parts=final_parts,
reply_to_message_id=user_msg_id,
disable_notification=False,
limit=TELEGRAM_MARKDOWN_LIMIT,
is_resume_line=runner.is_resume_line,
)
@@ -859,10 +851,9 @@ async def run_main_loop(
await _send_or_edit_markdown(
cfg.bot,
chat_id=chat_id,
text=f"Error:\n{exc}",
parts=MarkdownParts(header=f"error:\n{exc}"),
reply_to_message_id=user_msg_id,
disable_notification=False,
limit=TELEGRAM_MARKDOWN_LIMIT,
)
return
if not entry.available:
-84
View File
@@ -1,84 +0,0 @@
"""Markdown rendering and truncation helpers for Telegram constraints."""
from __future__ import annotations
import re
from collections.abc import Callable
from typing import Any
from markdown_it import MarkdownIt
from sulguk import transform_html
TELEGRAM_MARKDOWN_LIMIT = 3500
_md = MarkdownIt("commonmark", {"html": False})
def render_markdown(md: str) -> tuple[str, list[dict[str, Any]]]:
html = _md.render(md or "")
rendered = transform_html(html)
text = re.sub(r"(?m)^(\s*)•", r"\1-", rendered.text)
entities = [dict(e) for e in rendered.entities]
return text, entities
def truncate_for_telegram(
text: str, limit: int, *, is_resume_line: Callable[[str], bool]
) -> str:
"""
Truncate text to fit Telegram limits while preserving the trailing resume command
line (if present), otherwise preserving the last non-empty line.
"""
if len(text) <= limit:
return text
lines = text.splitlines()
tail_lines: list[str] | None = None
is_resume_tail = False
for i in range(len(lines) - 1, -1, -1):
line = lines[i]
if is_resume_line(line):
tail_lines = lines[i:]
is_resume_tail = True
break
if tail_lines is None:
for i in range(len(lines) - 1, -1, -1):
if lines[i].strip():
tail_lines = [lines[i]]
break
tail = "\n".join(tail_lines or []).strip("\n")
sep = "\n\n"
max_tail = limit if is_resume_tail else (limit // 4)
tail = tail[-max_tail:] if max_tail > 0 else ""
head_budget = limit - len(sep) - len(tail)
if head_budget <= 0:
return tail[-limit:] if tail else text[:limit]
head = text[:head_budget].rstrip()
return (head + sep + tail)[:limit]
def prepare_telegram(
md: str,
*,
limit: int,
is_resume_line: Callable[[str], bool] | None = None,
) -> tuple[str, list[dict[str, Any]] | None]:
rendered, entities = render_markdown(md)
if len(rendered) > limit:
if is_resume_line is None:
def _never_resume_line(_line: str) -> bool:
return False
is_resume_line = _never_resume_line
rendered = truncate_for_telegram(rendered, limit, is_resume_line=is_resume_line)
return rendered, None
return rendered, entities
+169 -96
View File
@@ -2,18 +2,21 @@
from __future__ import annotations
import re
import textwrap
from collections import deque
from collections.abc import Callable
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from markdown_it import MarkdownIt
from sulguk import transform_html
from .model import Action, ActionEvent, ResumeToken, StartedEvent, TakopiEvent
from .utils.paths import relativize_path
STATUS_RUNNING = ""
STATUS_UPDATE = ""
STATUS_DONE = ""
STATUS_FAIL = ""
STATUS = {"running": "", "update": "", "done": "", "fail": ""}
HEADER_SEP = " · "
HARD_BREAK = " \n"
@@ -21,6 +24,47 @@ MAX_PROGRESS_CMD_LEN = 300
MAX_FILE_CHANGES_INLINE = 3
@dataclass(frozen=True)
class MarkdownParts:
header: str
body: str | None = None
footer: str | None = None
def assemble_markdown_parts(parts: MarkdownParts) -> str:
return "\n\n".join(
chunk for chunk in (parts.header, parts.body, parts.footer) if chunk
)
def render_markdown(md: str) -> tuple[str, list[dict[str, Any]]]:
md_renderer = MarkdownIt("commonmark", {"html": False})
html = md_renderer.render(md or "")
rendered = transform_html(html)
text = re.sub(r"(?m)^(\s*)•", r"\1-", rendered.text)
entities = [dict(e) for e in rendered.entities]
return text, entities
def trim_body(body: str | None) -> str | None:
if not body:
return None
if len(body) > 3500:
body = body[: 3500 - 1] + ""
return body if body.strip() else None
def prepare_telegram(parts: MarkdownParts) -> tuple[str, list[dict[str, Any]]]:
trimmed = MarkdownParts(
header=parts.header or "",
body=trim_body(parts.body),
footer=parts.footer,
)
return render_markdown(assemble_markdown_parts(trimmed))
def format_changed_file_path(path: str, *, base_dir: Path | None = None) -> str:
return f"`{relativize_path(path, base_dir=base_dir)}`"
@@ -36,9 +80,12 @@ def format_elapsed(elapsed_s: float) -> str:
return f"{seconds}s"
def format_header(elapsed_s: float, item: int | None, label: str) -> str:
def format_header(
elapsed_s: float, item: int | None, *, label: str, engine: str
) -> str:
elapsed = format_elapsed(elapsed_s)
parts = [label, elapsed]
parts = [label, engine]
parts.append(elapsed)
if item is not None:
parts.append(f"step {item}")
return HEADER_SEP.join(parts)
@@ -47,24 +94,26 @@ def format_header(elapsed_s: float, item: int | None, label: str) -> str:
def shorten(text: str, width: int | None) -> str:
if width is None:
return text
if width <= 0:
return ""
if len(text) <= width:
return text
return textwrap.shorten(text, width=width, placeholder="")
def action_status_symbol(
action: Action, *, completed: bool, ok: bool | None = None
) -> str:
def action_status(action: Action, *, completed: bool, ok: bool | None = None) -> str:
if not completed:
return STATUS_RUNNING
return STATUS["running"]
if ok is not None:
return STATUS_DONE if ok else STATUS_FAIL
return STATUS["done"] if ok else STATUS["fail"]
detail = action.detail or {}
exit_code = detail.get("exit_code")
if isinstance(exit_code, int) and exit_code != 0:
return STATUS_FAIL
return STATUS_DONE
return STATUS["fail"]
return STATUS["done"]
def action_exit_suffix(action: Action) -> str:
def action_suffix(action: Action) -> str:
detail = action.detail or {}
exit_code = detail.get("exit_code")
if isinstance(exit_code, int) and exit_code != 0:
@@ -118,17 +167,25 @@ def format_action_title(action: Action, *, command_width: int | None) -> str:
return shorten(title, command_width)
def phase_status_and_suffix(event: ActionEvent) -> tuple[str, str]:
action = event.action
match event.phase:
case "completed":
status = action_status_symbol(action, completed=True, ok=event.ok)
suffix = action_exit_suffix(action)
return status, suffix
case "updated":
return STATUS_UPDATE, ""
case _:
return STATUS_RUNNING, ""
def format_action_line(
action: Action,
phase: str,
ok: bool | None,
*,
command_width: int | None,
) -> str:
if phase != "completed":
status = STATUS["update"] if phase == "updated" else STATUS["running"]
return f"{status} {format_action_title(action, command_width=command_width)}"
status = action_status(action, completed=True, ok=ok)
suffix = action_suffix(action)
return (
f"{status} {format_action_title(action, command_width=command_width)}{suffix}"
)
def is_command_log_line(line: str) -> bool:
return any(line.startswith(f"{symbol} `") for symbol in STATUS.values())
def render_event_cli(event: TakopiEvent) -> list[str]:
@@ -139,32 +196,44 @@ def render_event_cli(event: TakopiEvent) -> list[str]:
action = action_event.action
if action.kind == "turn":
return []
status, suffix = phase_status_and_suffix(action_event)
title = format_action_title(action, command_width=MAX_PROGRESS_CMD_LEN)
return [f"{status} {title}{suffix}"]
return [
format_action_line(
action_event.action,
action_event.phase,
action_event.ok,
command_width=MAX_PROGRESS_CMD_LEN,
)
]
case _:
return []
@dataclass
class RecentLine:
action_id: str
text: str
completed: bool = False
class ExecProgressRenderer:
def __init__(
self,
engine: str,
max_actions: int = 5,
command_width: int | None = MAX_PROGRESS_CMD_LEN,
resume_formatter: Callable[[ResumeToken], str] | None = None,
show_title: bool = False,
) -> None:
self.max_actions = max_actions
self.max_actions = max(0, int(max_actions))
self.command_width = command_width
self.recent_actions: deque[str] = deque(maxlen=max_actions)
self._recent_action_ids: deque[str] = deque(maxlen=max_actions)
self._recent_action_completed: deque[bool] = deque(maxlen=max_actions)
self.lines: deque[RecentLine] = deque(maxlen=self.max_actions)
self.action_count = 0
self._started_counts: dict[str, int] = {}
self.seen_action_ids: set[str] = set()
self.resume_token: ResumeToken | None = None
self.session_title: str | None = None
self._resume_formatter = resume_formatter
self.show_title = show_title
self.engine = engine
def note_event(self, event: TakopiEvent) -> bool:
match event:
@@ -179,85 +248,89 @@ class ExecProgressRenderer:
if not action_id:
return False
completed = phase == "completed"
if completed:
is_update = False
else:
started_count = self._started_counts.get(action_id, 0)
is_update = phase == "updated" or started_count > 0
if started_count == 0:
self.action_count += 1
self._started_counts[action_id] = 1
elif phase == "started":
self._started_counts[action_id] = started_count + 1
else:
self._started_counts[action_id] = started_count
has_open = self.has_open_line(action_id)
is_update = phase == "updated" or (phase == "started" and has_open)
phase_for_line = "updated" if is_update and not completed else phase
line = format_action_line(
action, phase_for_line, ok, command_width=self.command_width
)
if action_id not in self.seen_action_ids:
self.seen_action_ids.add(action_id)
self.action_count += 1
self.upsert_line(action_id, line=line, completed=completed)
return True
case _:
return False
if completed:
count = self._started_counts.get(action_id, 0)
if count <= 0:
self.action_count += 1
elif count == 1:
self._started_counts.pop(action_id, None)
else:
self._started_counts[action_id] = count - 1
status = (
STATUS_UPDATE
if (is_update and not completed)
else action_status_symbol(action, completed=completed, ok=ok)
def has_open_line(self, action_id: str) -> bool:
return any(
line.action_id == action_id and not line.completed for line in self.lines
)
title = format_action_title(action, command_width=self.command_width)
suffix = action_exit_suffix(action) if completed else ""
line = f"{status} {title}{suffix}"
self._append_action(action_id, completed=completed, line=line)
return True
def _append_action(self, action_id: str, *, completed: bool, line: str) -> None:
for i in range(len(self._recent_action_ids) - 1, -1, -1):
if (
self._recent_action_ids[i] == action_id
and not self._recent_action_completed[i]
):
self.recent_actions[i] = line
if completed:
self._recent_action_completed[i] = True
def upsert_line(self, action_id: str, *, line: str, completed: bool) -> None:
for i in range(len(self.lines) - 1, -1, -1):
existing = self.lines[i]
if existing.action_id == action_id and not existing.completed:
self.lines[i] = RecentLine(
action_id=action_id,
text=line,
completed=existing.completed or completed,
)
return
self.lines.append(
RecentLine(action_id=action_id, text=line, completed=completed)
)
if len(self.recent_actions) >= self.max_actions:
self.recent_actions.popleft()
self._recent_action_ids.popleft()
self._recent_action_completed.popleft()
self.recent_actions.append(line)
self._recent_action_ids.append(action_id)
self._recent_action_completed.append(completed)
def render_progress(self, elapsed_s: float, label: str = "working") -> str:
def render_progress_parts(
self, elapsed_s: float, label: str = "working"
) -> MarkdownParts:
step = self.action_count or None
header = format_header(elapsed_s, step, label=self._label_with_title(label))
message = self._assemble(header, list(self.recent_actions))
return self._append_resume(message)
header = format_header(
elapsed_s,
step,
label=self.label_with_title(label),
engine=self.engine,
)
body = self.assemble_body([line.text for line in self.lines])
return MarkdownParts(header=header, body=body, footer=self.render_footer())
def render_final(self, elapsed_s: float, answer: str, status: str = "done") -> str:
def render_final_parts(
self, elapsed_s: float, answer: str, status: str = "done"
) -> MarkdownParts:
step = self.action_count or None
header = format_header(elapsed_s, step, label=self._label_with_title(status))
header = format_header(
elapsed_s,
step,
label=self.label_with_title(status),
engine=self.engine,
)
lines = [line.text for line in self.lines]
if status == "done":
lines = [line for line in lines if not is_command_log_line(line)]
body = self.assemble_body(lines)
answer = (answer or "").strip()
message = header + ("\n\n" + answer if answer else "")
return self._append_resume(message)
if answer:
body = answer if not body else body + "\n\n" + answer
return MarkdownParts(header=header, body=body, footer=self.render_footer())
def _label_with_title(self, label: str) -> str:
def label_with_title(self, label: str) -> str:
if self.show_title and self.session_title:
return f"{label} ({self.session_title})"
return label
def _append_resume(self, message: str) -> str:
def render_footer(self) -> str | None:
if not self.resume_token or self._resume_formatter is None:
return message
return message + "\n\n" + self._resume_formatter(self.resume_token)
return None
return self._resume_formatter(self.resume_token)
@property
def recent_actions(self) -> list[str]:
return [line.text for line in self.lines]
@staticmethod
def _assemble(header: str, lines: list[str]) -> str:
return header if not lines else header + "\n\n" + HARD_BREAK.join(lines)
def assemble_body(lines: list[str]) -> str | None:
if not lines:
return None
return HARD_BREAK.join(lines)
+27 -27
View File
@@ -4,8 +4,8 @@ import anyio
import pytest
from takopi.bridge import _build_bot_commands, _strip_engine_command
from takopi.markdown import prepare_telegram, truncate_for_telegram
from takopi.model import EngineId, ResumeToken, TakopiEvent
from takopi.render import MarkdownParts, prepare_telegram
from takopi.router import AutoRouter, RunnerEntry
from takopi.runners.codex import CodexRunner
from takopi.runners.mock import Advance, Emit, Raise, Return, ScriptRunner, Sleep, Wait
@@ -101,25 +101,33 @@ def test_codex_extract_resume_accepts_uuid7() -> None:
assert runner.extract_resume(text) == ResumeToken(engine=CODEX_ENGINE, value=token)
def test_truncate_for_telegram_preserves_resume_line() -> None:
uuid = "019b66fc-64c2-7a71-81cd-081c504cfeb2"
md = ("x" * 10_000) + f"\n`codex resume {uuid}`"
def test_prepare_telegram_trims_body_preserves_footer() -> None:
body_limit = 3500
parts = MarkdownParts(
header="header",
body="x" * (body_limit + 100),
footer="footer",
)
runner = CodexRunner(codex_cmd="codex", extra_args=[])
out = truncate_for_telegram(md, 400, is_resume_line=runner.is_resume_line)
rendered, _ = prepare_telegram(parts)
assert len(out) <= 400
assert f"codex resume {uuid}" in out
assert out.rstrip().endswith(f"`codex resume {uuid}`")
chunks = [chunk for chunk in rendered.split("\n\n") if chunk]
assert chunks[0] == "header"
assert chunks[-1].rstrip() == "footer"
assert len(chunks[1]) == body_limit
assert chunks[1].endswith("")
def test_truncate_for_telegram_keeps_last_non_empty_line() -> None:
md = "intro\n\n" + ("x" * 500) + "\nlast line"
def test_prepare_telegram_preserves_entities_on_truncate() -> None:
body_limit = 3500
parts = MarkdownParts(
header="h",
body="**bold** " + ("x" * (body_limit + 100)),
)
out = truncate_for_telegram(md, 120, is_resume_line=lambda _line: False)
_, entities = prepare_telegram(parts)
assert len(out) <= 120
assert out.rstrip().endswith("last line")
assert any(e.get("type") == "bold" for e in entities)
def test_strip_engine_command_inline() -> None:
@@ -171,15 +179,6 @@ def test_build_bot_commands_includes_cancel_and_engine() -> None:
assert any(cmd["command"] == "codex" for cmd in commands)
def test_prepare_telegram_drops_entities_on_truncate() -> None:
md = ("**bold** " * 200).strip()
rendered, entities = prepare_telegram(md, limit=40)
assert len(rendered) <= 40
assert entities is None
class _FakeBot:
def __init__(self) -> None:
self._next_id = 1
@@ -364,7 +363,7 @@ async def test_handle_message_strips_resume_line_from_prompt() -> None:
@pytest.mark.anyio
async def test_new_final_message_forces_notification_when_too_long_to_edit() -> None:
async def test_long_final_message_edits_progress_message() -> None:
from takopi.bridge import BridgeConfig, handle_message
bot = _FakeBot()
@@ -386,9 +385,9 @@ async def test_new_final_message_forces_notification_when_too_long_to_edit() ->
resume_token=None,
)
assert len(bot.send_calls) == 2
assert len(bot.send_calls) == 1
assert bot.send_calls[0]["disable_notification"] is True
assert bot.send_calls[1]["disable_notification"] is False
assert len(bot.edit_calls) == 1
@pytest.mark.anyio
@@ -553,7 +552,8 @@ async def test_bridge_flow_sends_progress_edits_and_final_resume() -> None:
)
assert bot.send_calls[0]["reply_to_message_id"] == 42
assert "working" in bot.send_calls[0]["text"]
assert "starting" in bot.send_calls[0]["text"]
assert "codex" in bot.send_calls[0]["text"]
assert len(bot.edit_calls) >= 1
assert session_id in bot.send_calls[-1]["text"]
assert "codex resume" in bot.send_calls[-1]["text"].lower()
+105 -16
View File
@@ -2,9 +2,18 @@ from typing import cast
from types import SimpleNamespace
from pathlib import Path
from takopi.markdown import render_markdown
from takopi.model import TakopiEvent
from takopi.render import ExecProgressRenderer, render_event_cli
from takopi.model import Action, ActionEvent, ResumeToken, StartedEvent, TakopiEvent
from takopi.render import (
ExecProgressRenderer,
STATUS,
action_status,
assemble_markdown_parts,
format_elapsed,
format_file_change_title,
render_event_cli,
render_markdown,
shorten,
)
from tests.factories import (
action_completed,
action_started,
@@ -109,17 +118,21 @@ def test_file_change_renders_relative_paths_inside_cwd() -> None:
def test_progress_renderer_renders_progress_and_final() -> None:
r = ExecProgressRenderer(max_actions=5, resume_formatter=_format_resume)
r = ExecProgressRenderer(
max_actions=5, resume_formatter=_format_resume, engine="codex"
)
for evt in SAMPLE_EVENTS:
r.note_event(evt)
progress = r.render_progress(3.0)
assert progress.startswith("working · 3s · step 2")
progress_parts = r.render_progress_parts(3.0)
progress = assemble_markdown_parts(progress_parts)
assert progress.startswith("working · codex · 3s · step 2")
assert "✓ `bash -lc ls`" in progress
assert "`codex resume 0199a213-81c0-7800-8aa1-bbab2a035a53`" in progress
final = r.render_final(3.0, "answer", status="done")
assert final.startswith("done · 3s · step 2")
final_parts = r.render_final_parts(3.0, "answer", status="done")
final = assemble_markdown_parts(final_parts)
assert final.startswith("done · codex · 3s · step 2")
assert "answer" in final
assert final.rstrip().endswith(
"`codex resume 0199a213-81c0-7800-8aa1-bbab2a035a53`"
@@ -127,7 +140,7 @@ def test_progress_renderer_renders_progress_and_final() -> None:
def test_progress_renderer_clamps_actions_and_ignores_unknown() -> None:
r = ExecProgressRenderer(max_actions=3, command_width=20)
r = ExecProgressRenderer(max_actions=3, command_width=20, engine="codex")
events = [
action_completed(
f"item_{i}",
@@ -150,7 +163,7 @@ def test_progress_renderer_clamps_actions_and_ignores_unknown() -> None:
def test_progress_renderer_renders_commands_in_markdown() -> None:
r = ExecProgressRenderer(max_actions=5, command_width=None)
r = ExecProgressRenderer(max_actions=5, command_width=None, engine="codex")
for i in (30, 31, 32):
r.note_event(
action_completed(
@@ -162,7 +175,7 @@ def test_progress_renderer_renders_commands_in_markdown() -> None:
)
)
md = r.render_progress(0.0)
md = assemble_markdown_parts(r.render_progress_parts(0.0))
text, _ = render_markdown(md)
assert "✓ echo 30" in text
assert "✓ echo 31" in text
@@ -170,7 +183,7 @@ def test_progress_renderer_renders_commands_in_markdown() -> None:
def test_progress_renderer_handles_duplicate_action_ids() -> None:
r = ExecProgressRenderer(max_actions=5)
r = ExecProgressRenderer(max_actions=5, engine="codex")
events = [
action_started("dup", "command", "echo first"),
action_completed(
@@ -201,7 +214,7 @@ def test_progress_renderer_handles_duplicate_action_ids() -> None:
def test_progress_renderer_collapses_action_updates() -> None:
r = ExecProgressRenderer(max_actions=5)
r = ExecProgressRenderer(max_actions=5, engine="codex")
events = [
action_started("a-1", "command", "echo one"),
action_started("a-1", "command", "echo two"),
@@ -234,11 +247,87 @@ def test_progress_renderer_deterministic_output() -> None:
detail={"exit_code": 0},
),
]
r1 = ExecProgressRenderer(max_actions=5)
r2 = ExecProgressRenderer(max_actions=5)
r1 = ExecProgressRenderer(max_actions=5, engine="codex")
r2 = ExecProgressRenderer(max_actions=5, engine="codex")
for evt in events:
r1.note_event(evt)
r2.note_event(evt)
assert r1.render_progress(1.0) == r2.render_progress(1.0)
assert assemble_markdown_parts(
r1.render_progress_parts(1.0)
) == assemble_markdown_parts(r2.render_progress_parts(1.0))
def test_format_elapsed_branches() -> None:
assert format_elapsed(3661) == "1h 01m"
assert format_elapsed(61) == "1m 01s"
assert format_elapsed(1.4) == "1s"
def test_shorten_and_action_status_branches() -> None:
assert shorten("hello", None) == "hello"
assert shorten("hello", 0) == ""
shortened = shorten("hello world", 6)
assert shortened.endswith("")
assert len(shortened) <= 6
action_ok = Action(id="ok", kind="command", title="x", detail={"exit_code": 0})
action_fail = Action(id="fail", kind="command", title="x", detail={"exit_code": 2})
assert action_status(action_ok, completed=False, ok=None) == STATUS["running"]
assert action_status(action_ok, completed=True, ok=None) == STATUS["done"]
assert action_status(action_fail, completed=True, ok=None) == STATUS["fail"]
def test_format_file_change_title_handles_overflow_and_invalid() -> None:
action = Action(
id="f",
kind="file_change",
title="files",
detail={
"changes": [
"bad",
{"path": ""},
{"path": "a", "kind": "add"},
{"path": "b"},
{"path": "c"},
{"path": "d"},
]
},
)
title = format_file_change_title(action, command_width=200)
assert title.startswith("files: ")
assert "…(" in title
fallback = format_file_change_title(
Action(id="empty", kind="file_change", title="all files"), command_width=50
)
assert fallback == "files: all files"
def test_render_event_cli_ignores_turn_actions() -> None:
event = ActionEvent(
engine="codex",
action=Action(id="turn", kind="turn", title="turn"),
phase="started",
ok=None,
)
assert render_event_cli(event) == []
def test_progress_renderer_ignores_missing_action_id_and_titles() -> None:
renderer = ExecProgressRenderer(engine="codex", show_title=True)
resume = ResumeToken(engine="codex", value="abc")
renderer.note_event(StartedEvent(engine="codex", resume=resume, title="Session"))
event = ActionEvent(
engine="codex",
action=Action(id="", kind="command", title="echo"),
phase="started",
ok=None,
)
assert renderer.note_event(event) is False
header = assemble_markdown_parts(renderer.render_progress_parts(0.0))
assert header.startswith("working (Session) · codex · 0s")
+1 -1
View File
@@ -1,4 +1,4 @@
from takopi.markdown import render_markdown
from takopi.render import render_markdown
def test_render_markdown_basic_entities() -> None: