fix(exec-bridge): restore sulguk rendering pipeline

This commit is contained in:
banteg
2025-12-29 11:57:14 +04:00
parent 489a50aec6
commit f464627864
4 changed files with 215 additions and 81 deletions
+1
View File
@@ -7,6 +7,7 @@ requires-python = ">=3.12"
dependencies = [
"httpx>=0.28.1",
"markdown-it-py",
"sulguk>=0.11.0",
"typer",
]
@@ -13,7 +13,6 @@ import time
from collections import deque
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from html import unescape
from logging.handlers import RotatingFileHandler
from typing import Any
@@ -22,7 +21,7 @@ import typer
from .config import load_telegram_config
from .constants import TELEGRAM_HARD_LIMIT
from .exec_render import ExecProgressRenderer, render_event_cli
from .rendering import render_to_html, strip_tags
from .rendering import render_markdown
from .telegram_client import TelegramClient
logger = logging.getLogger("exec_bridge")
@@ -89,6 +88,45 @@ def _clamp_tg_text(text: str, limit: int = TELEGRAM_TEXT_LIMIT) -> str:
return text[: limit - 20] + "\n...(truncated)"
def truncate_for_telegram(text: str, limit: int) -> str:
"""
Truncate text to fit Telegram limits while preserving the trailing `resume: ...`
line (if present), otherwise preserving the last non-empty line.
"""
if len(text) <= limit:
return text
lines = text.splitlines()
tail_lines: list[str] | None = None
is_resume_tail = False
for i in range(len(lines) - 1, -1, -1):
line = lines[i]
if "resume" in line and UUID_PATTERN.search(line):
tail_lines = lines[i:]
is_resume_tail = True
break
if tail_lines is None:
for i in range(len(lines) - 1, -1, -1):
if lines[i].strip():
tail_lines = [lines[i]]
break
tail = "\n".join(tail_lines or []).strip("\n")
sep = "\n\n"
max_tail = limit if is_resume_tail else (limit // 4)
tail = tail[-max_tail:] if max_tail > 0 else ""
head_budget = limit - len(sep) - len(tail)
if head_budget <= 0:
return tail[-limit:] if tail else text[:limit]
head = text[:head_budget].rstrip()
return (head + sep + tail)[:limit]
async def _send_markdown(
bot: TelegramClient,
*,
@@ -97,24 +135,15 @@ async def _send_markdown(
reply_to_message_id: int | None = None,
disable_notification: bool = False,
) -> dict[str, Any]:
md = text
if len(md) > TELEGRAM_MARKDOWN_LIMIT:
md = md[: TELEGRAM_MARKDOWN_LIMIT - 20] + "\n…(truncated)"
rendered = render_to_html(md)
if len(rendered) > TELEGRAM_TEXT_LIMIT:
plain = _clamp_tg_text(unescape(strip_tags(rendered)))
return await bot.send_message(
chat_id=chat_id,
text=plain,
reply_to_message_id=reply_to_message_id,
disable_notification=disable_notification,
)
rendered, entities = render_markdown(text)
if len(rendered) > TELEGRAM_MARKDOWN_LIMIT:
rendered = truncate_for_telegram(rendered, TELEGRAM_MARKDOWN_LIMIT)
entities = []
return await bot.send_message(
chat_id=chat_id,
text=rendered,
parse_mode="HTML",
entities=entities or None,
reply_to_message_id=reply_to_message_id,
disable_notification=disable_notification,
)
@@ -382,17 +411,16 @@ async def _handle_message(
async def _edit_progress(md: str) -> None:
if progress_id is None:
return
parse_mode: str | None = "HTML"
rendered = render_to_html(md)
rendered, entities = render_markdown(md)
if len(rendered) > TELEGRAM_TEXT_LIMIT:
rendered = _clamp_tg_text(unescape(strip_tags(rendered)))
parse_mode = None
rendered = truncate_for_telegram(rendered, TELEGRAM_TEXT_LIMIT)
entities = []
try:
await cfg.bot.edit_message_text(
chat_id=chat_id,
message_id=progress_id,
text=rendered,
parse_mode=parse_mode,
entities=entities or None,
)
except Exception as e:
logger.info(
@@ -404,11 +432,14 @@ async def _handle_message(
try:
initial_md = progress_renderer.render_progress(0.0)
initial_rendered = render_to_html(initial_md)
initial_rendered, initial_entities = render_markdown(initial_md)
if len(initial_rendered) > TELEGRAM_TEXT_LIMIT:
initial_rendered = truncate_for_telegram(initial_rendered, TELEGRAM_TEXT_LIMIT)
initial_entities = []
progress_msg = await cfg.bot.send_message(
chat_id=chat_id,
text=initial_rendered,
parse_mode="HTML",
entities=initial_entities or None,
reply_to_message_id=user_msg_id,
disable_notification=cfg.progress_silent,
)
@@ -474,7 +505,7 @@ async def _handle_message(
progress_renderer.render_final(elapsed, answer, status=status)
+ f"\n\nresume: `{session_id}`"
)
final_rendered = render_to_html(final_md)
final_rendered, final_entities = render_markdown(final_md)
can_edit_final = progress_id is not None and len(final_rendered) <= TELEGRAM_TEXT_LIMIT
if cfg.final_notify or not can_edit_final:
@@ -483,7 +514,7 @@ async def _handle_message(
chat_id=chat_id,
text=final_md,
reply_to_message_id=user_msg_id,
disable_notification=cfg.progress_silent,
disable_notification=False,
)
if progress_id is not None:
try:
@@ -495,7 +526,7 @@ async def _handle_message(
chat_id=chat_id,
message_id=progress_id,
text=final_rendered,
parse_mode="HTML",
entities=final_entities or None,
)
@@ -1,65 +1,26 @@
from __future__ import annotations
import re
from html import escape
from typing import Any
from markdown_it import MarkdownIt
from sulguk import transform_html
_md = MarkdownIt("commonmark", {"html": False, "breaks": True})
_CODE_CLASS_RE = re.compile(r'<code class="[^"]+">')
_IMG_ALT_RE = re.compile(r'<img[^>]*alt="([^"]*)"[^>]*/?>')
_IMG_RE = re.compile(r"<img[^>]*>")
_OL_OPEN_RE = re.compile(r'<ol(?: start="\d+")?>\s*')
_TAG_RE = re.compile(r"<[^>]+>")
_md = MarkdownIt("commonmark", {"html": False})
def strip_tags(html: str) -> str:
return _TAG_RE.sub("", html)
def render_markdown(md: str) -> tuple[str, list[dict[str, Any]]]:
html = _md.render(md or "")
rendered = transform_html(html)
text = re.sub(r"(?m)^(\s*)•", r"\1-", rendered.text)
def render_to_html(text: str) -> str:
"""
Render Markdown to Telegram-compatible HTML.
# FIX: Telegram requires MessageEntity.language (if present) to be a String.
entities: list[dict[str, Any]] = []
for e in rendered.entities:
d = dict(e)
if "language" in d and not isinstance(d["language"], str):
d.pop("language", None)
entities.append(d)
return text, entities
Telegram supports only a subset of HTML tags, so we post-process the
MarkdownIt output to flatten unsupported block tags (p/ul/li/etc) into
plain text with newlines and simple bullets.
"""
html = _md.render(text or "")
# Paragraphs and line breaks.
html = html.replace("<p>", "")
html = html.replace("<br />\n", "\n").replace("<br>\n", "\n")
html = html.replace("<br />", "\n").replace("<br>", "\n")
html = html.replace("</p>\n", "\n\n").replace("</p>", "\n\n")
# Lists -> "- " lines.
html = html.replace("<ul>\n", "").replace("</ul>\n", "")
html = _OL_OPEN_RE.sub("", html).replace("</ol>\n", "")
html = html.replace("<li>", "- ")
html = html.replace("</li>\n", "\n").replace("</li>", "\n")
# Headings -> bold line.
for level in range(1, 7):
html = html.replace(f"<h{level}>", "<b>")
html = html.replace(f"</h{level}>\n", "</b>\n\n").replace(
f"</h{level}>", "</b>\n\n"
)
# Code fences may include language class; Telegram doesn't need it.
html = _CODE_CLASS_RE.sub("<code>", html)
# Images are not supported: keep alt text if present.
html = _IMG_ALT_RE.sub(lambda m: escape(m.group(1) or ""), html)
html = _IMG_RE.sub("", html)
# <hr> isn't supported; render a separator line.
html = html.replace("<hr />", "\n----\n\n").replace("<hr>", "\n----\n\n")
# Flatten blockquotes.
html = html.replace("<blockquote>\n", "")
html = html.replace("</blockquote>\n", "\n\n").replace("</blockquote>", "\n\n")
html = re.sub(r"\n{3,}", "\n\n", html)
return html.strip()
+142 -1
View File
@@ -1,4 +1,6 @@
from codex_telegram_bridge.exec_bridge import extract_session_id
import asyncio
from codex_telegram_bridge.exec_bridge import extract_session_id, truncate_for_telegram
def test_extract_session_id_finds_uuid_v7() -> None:
@@ -7,3 +9,142 @@ def test_extract_session_id_finds_uuid_v7() -> None:
assert extract_session_id(text) == uuid
def test_truncate_for_telegram_preserves_resume_line() -> None:
uuid = "019b66fc-64c2-7a71-81cd-081c504cfeb2"
md = ("x" * 10_000) + f"\nresume: `{uuid}`"
out = truncate_for_telegram(md, 400)
assert len(out) <= 400
assert uuid in out
assert out.rstrip().endswith(f"resume: `{uuid}`")
class _FakeBot:
def __init__(self) -> None:
self._next_id = 1
self.send_calls: list[dict] = []
self.edit_calls: list[dict] = []
self.delete_calls: list[dict] = []
async def send_message(
self,
chat_id: int,
text: str,
reply_to_message_id: int | None = None,
disable_notification: bool | None = False,
entities: list[dict] | None = None,
parse_mode: str | None = None,
) -> dict:
self.send_calls.append(
{
"chat_id": chat_id,
"text": text,
"reply_to_message_id": reply_to_message_id,
"disable_notification": disable_notification,
"entities": entities,
"parse_mode": parse_mode,
}
)
msg_id = self._next_id
self._next_id += 1
return {"message_id": msg_id}
async def edit_message_text(
self,
chat_id: int,
message_id: int,
text: str,
entities: list[dict] | None = None,
parse_mode: str | None = None,
) -> dict:
self.edit_calls.append(
{
"chat_id": chat_id,
"message_id": message_id,
"text": text,
"entities": entities,
"parse_mode": parse_mode,
}
)
return {"message_id": message_id}
async def delete_message(self, chat_id: int, message_id: int) -> bool:
self.delete_calls.append({"chat_id": chat_id, "message_id": message_id})
return True
class _FakeRunner:
def __init__(self, *, answer: str, saw_agent_message: bool = True) -> None:
self._answer = answer
self._saw_agent_message = saw_agent_message
async def run_serialized(self, *_args, **_kwargs) -> tuple[str, str, bool]:
return ("019b66fc-64c2-7a71-81cd-081c504cfeb2", self._answer, self._saw_agent_message)
def test_final_notify_sends_loud_final_message() -> None:
from codex_telegram_bridge.exec_bridge import BridgeConfig, _handle_message
bot = _FakeBot()
runner = _FakeRunner(answer="ok")
cfg = BridgeConfig(
bot=bot, # type: ignore[arg-type]
runner=runner, # type: ignore[arg-type]
chat_id=123,
ignore_backlog=True,
progress_edit_every_s=999.0,
progress_silent=True,
final_notify=True,
startup_msg="",
max_concurrency=1,
)
asyncio.run(
_handle_message(
cfg,
semaphore=asyncio.Semaphore(1),
chat_id=123,
user_msg_id=10,
text="hi",
resume_session=None,
)
)
assert len(bot.send_calls) == 2
assert bot.send_calls[0]["disable_notification"] is True
assert bot.send_calls[1]["disable_notification"] is False
def test_new_final_message_forces_notification_when_too_long_to_edit() -> None:
from codex_telegram_bridge.exec_bridge import BridgeConfig, _handle_message
bot = _FakeBot()
runner = _FakeRunner(answer="x" * 10_000)
cfg = BridgeConfig(
bot=bot, # type: ignore[arg-type]
runner=runner, # type: ignore[arg-type]
chat_id=123,
ignore_backlog=True,
progress_edit_every_s=999.0,
progress_silent=True,
final_notify=False,
startup_msg="",
max_concurrency=1,
)
asyncio.run(
_handle_message(
cfg,
semaphore=asyncio.Semaphore(1),
chat_id=123,
user_msg_id=10,
text="hi",
resume_session=None,
)
)
assert len(bot.send_calls) == 2
assert bot.send_calls[0]["disable_notification"] is True
assert bot.send_calls[1]["disable_notification"] is False