feat(telegram-bridge): share markdown render
This commit is contained in:
@@ -2,13 +2,17 @@ from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sqlite3
|
||||
import time
|
||||
import urllib.error
|
||||
import urllib.request
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Iterable, List, Optional, Tuple
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
from markdown_it import MarkdownIt
|
||||
from sulguk import transform_html
|
||||
|
||||
TELEGRAM_HARD_LIMIT = 4096
|
||||
DEFAULT_CHUNK_LEN = 3500 # leave room for formatting / safety
|
||||
@@ -49,6 +53,22 @@ def config_get(config: Dict[str, Any], key: str) -> Any:
|
||||
return None
|
||||
|
||||
|
||||
def render_markdown(md: str) -> Tuple[str, List[Dict[str, Any]]]:
|
||||
html = MarkdownIt("commonmark", {"html": False}).render(md or "")
|
||||
rendered = transform_html(html)
|
||||
|
||||
text = re.sub("(?m)^(\\s*)\u2022", r"\1-", rendered.text)
|
||||
|
||||
# FIX: Telegram requires MessageEntity.language (if present) to be a String.
|
||||
entities: List[Dict[str, Any]] = []
|
||||
for e in rendered.entities:
|
||||
d = dict(e)
|
||||
if "language" in d and not isinstance(d["language"], str):
|
||||
d.pop("language", None)
|
||||
entities.append(d)
|
||||
return text, entities
|
||||
|
||||
|
||||
def chunk_text(text: str, limit: int = DEFAULT_CHUNK_LEN) -> List[str]:
|
||||
"""
|
||||
Telegram hard limit is 4096 chars. Chunk at newlines when possible.
|
||||
@@ -84,6 +104,76 @@ def chunk_text(text: str, limit: int = DEFAULT_CHUNK_LEN) -> List[str]:
|
||||
return out
|
||||
|
||||
|
||||
def _chunk_text_with_indices(text: str, limit: int) -> List[Tuple[str, int, int]]:
|
||||
text = text or ""
|
||||
if len(text) <= limit:
|
||||
return [(text, 0, len(text))]
|
||||
|
||||
out: List[Tuple[str, int, int]] = []
|
||||
buf: List[str] = []
|
||||
size = 0
|
||||
buf_start = 0
|
||||
pos = 0
|
||||
|
||||
for line in text.splitlines(keepends=True):
|
||||
line_len = len(line)
|
||||
line_start = pos
|
||||
line_end = pos + line_len
|
||||
|
||||
if line_len > limit:
|
||||
if buf:
|
||||
out.append(("".join(buf), buf_start, line_start))
|
||||
buf, size = [], 0
|
||||
for i in range(0, line_len, limit):
|
||||
part = line[i : i + limit]
|
||||
out.append((part, line_start + i, line_start + i + len(part)))
|
||||
pos = line_end
|
||||
buf_start = pos
|
||||
continue
|
||||
|
||||
if size + line_len > limit:
|
||||
out.append(("".join(buf), buf_start, line_start))
|
||||
buf = [line]
|
||||
size = line_len
|
||||
buf_start = line_start
|
||||
else:
|
||||
if not buf:
|
||||
buf_start = line_start
|
||||
buf.append(line)
|
||||
size += line_len
|
||||
|
||||
pos = line_end
|
||||
|
||||
if buf:
|
||||
out.append(("".join(buf), buf_start, pos))
|
||||
return out
|
||||
|
||||
|
||||
def _slice_entities(entities: List[Dict[str, Any]], start: int, end: int) -> List[Dict[str, Any]]:
|
||||
out: List[Dict[str, Any]] = []
|
||||
for ent in entities:
|
||||
try:
|
||||
ent_start = int(ent.get("offset", 0))
|
||||
ent_len = int(ent.get("length", 0))
|
||||
except (TypeError, ValueError):
|
||||
continue
|
||||
if ent_len <= 0:
|
||||
continue
|
||||
ent_end = ent_start + ent_len
|
||||
if ent_end <= start or ent_start >= end:
|
||||
continue
|
||||
new_start = max(ent_start, start)
|
||||
new_end = min(ent_end, end)
|
||||
new_len = new_end - new_start
|
||||
if new_len <= 0:
|
||||
continue
|
||||
new_ent = dict(ent)
|
||||
new_ent["offset"] = new_start - start
|
||||
new_ent["length"] = new_len
|
||||
out.append(new_ent)
|
||||
return out
|
||||
|
||||
|
||||
class TelegramClient:
|
||||
"""
|
||||
Minimal Telegram Bot API client using standard library (no requests dependency).
|
||||
@@ -136,6 +226,7 @@ class TelegramClient:
|
||||
text: str,
|
||||
reply_to_message_id: Optional[int] = None,
|
||||
disable_notification: bool = False,
|
||||
entities: Optional[List[Dict[str, Any]]] = None,
|
||||
) -> Dict[str, Any]:
|
||||
if len(text) > TELEGRAM_HARD_LIMIT:
|
||||
raise ValueError("send_message received too-long text; chunk it first")
|
||||
@@ -146,6 +237,8 @@ class TelegramClient:
|
||||
}
|
||||
if reply_to_message_id is not None:
|
||||
params["reply_to_message_id"] = reply_to_message_id
|
||||
if entities is not None:
|
||||
params["entities"] = entities
|
||||
return self._call("sendMessage", params)
|
||||
|
||||
def send_message_chunked(
|
||||
@@ -168,6 +261,29 @@ class TelegramClient:
|
||||
sent.append(msg)
|
||||
return sent
|
||||
|
||||
def send_message_markdown_chunked(
|
||||
self,
|
||||
chat_id: int,
|
||||
text: str,
|
||||
reply_to_message_id: Optional[int] = None,
|
||||
disable_notification: bool = False,
|
||||
chunk_len: int = DEFAULT_CHUNK_LEN,
|
||||
) -> List[Dict[str, Any]]:
|
||||
sent: List[Dict[str, Any]] = []
|
||||
rendered_text, entities = render_markdown(text)
|
||||
chunks = _chunk_text_with_indices(rendered_text, limit=chunk_len)
|
||||
for i, (chunk, start, end) in enumerate(chunks):
|
||||
chunk_entities = _slice_entities(entities, start, end) if entities else None
|
||||
msg = self.send_message(
|
||||
chat_id=chat_id,
|
||||
text=chunk,
|
||||
reply_to_message_id=(reply_to_message_id if i == 0 else None),
|
||||
disable_notification=disable_notification,
|
||||
entities=chunk_entities,
|
||||
)
|
||||
sent.append(msg)
|
||||
return sent
|
||||
|
||||
def send_chat_action(self, chat_id: int, action: str = "typing") -> Dict[str, Any]:
|
||||
params: Dict[str, Any] = {
|
||||
"chat_id": chat_id,
|
||||
|
||||
Reference in New Issue
Block a user