refactor: use requests for telegram client

This commit is contained in:
banteg
2025-12-29 03:26:16 +04:00
parent c2c87a0d06
commit d804110369
3 changed files with 99 additions and 71 deletions
+1
View File
@@ -6,6 +6,7 @@ readme = "readme.md"
requires-python = ">=3.12" requires-python = ">=3.12"
dependencies = [ dependencies = [
"markdown-it-py", "markdown-it-py",
"requests",
"sulguk", "sulguk",
"typer", "typer",
] ]
@@ -1,8 +1,3 @@
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.12"
# dependencies = ["markdown-it-py", "sulguk", "typer"]
# ///
from __future__ import annotations from __future__ import annotations
import json import json
@@ -31,6 +26,7 @@ from .telegram_client import TelegramClient
logger = logging.getLogger("exec_bridge") logger = logging.getLogger("exec_bridge")
def setup_logging(log_file: str | None) -> None: def setup_logging(log_file: str | None) -> None:
logger.setLevel(logging.DEBUG) logger.setLevel(logging.DEBUG)
logger.handlers.clear() logger.handlers.clear()
@@ -56,12 +52,6 @@ def setup_logging(log_file: str | None) -> None:
logger.debug("[debug] file logger initialized path=%r", log_file) logger.debug("[debug] file logger initialized path=%r", log_file)
def _one_line(text: str | None) -> str:
if text is None:
return "None"
return text.replace("\r", "\\r").replace("\n", "\\n")
TELEGRAM_TEXT_LIMIT = TELEGRAM_HARD_LIMIT TELEGRAM_TEXT_LIMIT = TELEGRAM_HARD_LIMIT
TELEGRAM_MARKDOWN_LIMIT = 3500 TELEGRAM_MARKDOWN_LIMIT = 3500
ELLIPSIS = "" ELLIPSIS = ""
@@ -72,6 +62,7 @@ def _clamp_tg_text(text: str, limit: int = TELEGRAM_TEXT_LIMIT) -> str:
return text return text
return text[: limit - 20] + "\n...(truncated)" return text[: limit - 20] + "\n...(truncated)"
def _send_markdown( def _send_markdown(
bot: TelegramClient, bot: TelegramClient,
*, *,
@@ -129,7 +120,9 @@ class ProgressEditor:
text = _clamp_tg_text(text) text = _clamp_tg_text(text)
with self._lock: with self._lock:
self._pending = (text, entities) self._pending = (text, entities)
logger.debug("[progress] set pending len=%s entities=%s", len(text), bool(entities)) logger.debug(
"[progress] set pending len=%s entities=%s", len(text), bool(entities)
)
def set_markdown(self, text: str) -> None: def set_markdown(self, text: str) -> None:
rendered_text, entities = render_markdown(text) rendered_text, entities = render_markdown(text)
@@ -166,7 +159,10 @@ class ProgressEditor:
to_send: tuple[str, list[dict[str, Any]] | None] | None = None to_send: tuple[str, list[dict[str, Any]] | None] | None = None
now = time.monotonic() now = time.monotonic()
with self._lock: with self._lock:
if self._pending is not None and (now - self._last_edit_at) >= self.edit_every_s: if (
self._pending is not None
and (now - self._last_edit_at) >= self.edit_every_s
):
if self._pending != self._last_sent: if self._pending != self._last_sent:
to_send = self._pending to_send = self._pending
self._last_sent = self._pending self._last_sent = self._pending
@@ -186,7 +182,9 @@ class CodexExecRunner:
- resume: codex exec --json ... resume <SESSION_ID> - - resume: codex exec --json ... resume <SESSION_ID> -
""" """
def __init__(self, codex_cmd: str, workspace: str | None, extra_args: list[str]) -> None: def __init__(
self, codex_cmd: str, workspace: str | None, extra_args: list[str]
) -> None:
self.codex_cmd = codex_cmd self.codex_cmd = codex_cmd
self.workspace = workspace self.workspace = workspace
self.extra_args = extra_args self.extra_args = extra_args
@@ -210,7 +208,9 @@ class CodexExecRunner:
""" """
Returns (session_id, final_agent_message_text) Returns (session_id, final_agent_message_text)
""" """
logger.info("[codex] start run session_id=%r workspace=%r", session_id, self.workspace) logger.info(
"[codex] start run session_id=%r workspace=%r", session_id, self.workspace
)
args = [self.codex_cmd, "exec", "--json"] args = [self.codex_cmd, "exec", "--json"]
args.extend(self.extra_args) args.extend(self.extra_args)
if self.workspace: if self.workspace:
@@ -277,7 +277,9 @@ class CodexExecRunner:
if evt.get("type") == "item.completed": if evt.get("type") == "item.completed":
item = evt.get("item") or {} item = evt.get("item") or {}
if item.get("type") == "agent_message" and isinstance(item.get("text"), str): if item.get("type") == "agent_message" and isinstance(
item.get("text"), str
):
last_agent_text = item["text"] last_agent_text = item["text"]
saw_agent_message = True saw_agent_message = True
@@ -290,10 +292,16 @@ class CodexExecRunner:
raise RuntimeError(f"codex exec failed (rc={rc}). stderr tail:\n{tail}") raise RuntimeError(f"codex exec failed (rc={rc}). stderr tail:\n{tail}")
if not found_session: if not found_session:
raise RuntimeError("codex exec finished but no session_id/thread_id was captured") raise RuntimeError(
"codex exec finished but no session_id/thread_id was captured"
)
logger.info("[codex] done run session_id=%r", found_session) logger.info("[codex] done run session_id=%r", found_session)
return found_session, (last_agent_text or "(No agent_message captured from JSON stream.)"), saw_agent_message return (
found_session,
(last_agent_text or "(No agent_message captured from JSON stream.)"),
saw_agent_message,
)
def run_serialized( def run_serialized(
self, self,
@@ -400,7 +408,9 @@ def run(
extra_args.extend(["-c", "notify=[]"]) extra_args.extend(["-c", "notify=[]"])
bot = TelegramClient(token) bot = TelegramClient(token)
runner = CodexExecRunner(codex_cmd=codex_cmd, workspace=workspace, extra_args=extra_args) runner = CodexExecRunner(
codex_cmd=codex_cmd, workspace=workspace, extra_args=extra_args
)
max_workers = config.get("max_workers") max_workers = config.get("max_workers")
pool = ThreadPoolExecutor(max_workers=max_workers or 4) pool = ThreadPoolExecutor(max_workers=max_workers or 4)
@@ -409,7 +419,9 @@ def run(
if ignore_backlog: if ignore_backlog:
try: try:
updates = bot.get_updates(offset=offset, timeout_s=0, allowed_updates=["message"]) updates = bot.get_updates(
offset=offset, timeout_s=0, allowed_updates=["message"]
)
except Exception as e: except Exception as e:
logger.info("[startup] backlog drain failed: %s", e) logger.info("[startup] backlog drain failed: %s", e)
updates = [] updates = []
@@ -425,11 +437,17 @@ def run(
bot.send_message(chat_id=chat_id, text=startup_msg) bot.send_message(chat_id=chat_id, text=startup_msg)
logger.info("[startup] sent startup message to chat_id=%s", chat_id) logger.info("[startup] sent startup message to chat_id=%s", chat_id)
except Exception as e: except Exception as e:
logger.info("[startup] failed to send startup message to chat_id=%s: %s", chat_id, e) logger.info(
"[startup] failed to send startup message to chat_id=%s: %s",
chat_id,
e,
)
else: else:
logger.info("[startup] no chat_id configured; skipping startup message") logger.info("[startup] no chat_id configured; skipping startup message")
def handle(chat_id: int, user_msg_id: int, text: str, resume_session: str | None) -> None: def handle(
chat_id: int, user_msg_id: int, text: str, resume_session: str | None
) -> None:
logger.info( logger.info(
"[handle] start chat_id=%s user_msg_id=%s resume_session=%r", "[handle] start chat_id=%s user_msg_id=%s resume_session=%r",
chat_id, chat_id,
@@ -462,9 +480,13 @@ def run(
disable_notification=silent_progress, disable_notification=silent_progress,
) )
progress_id = int(progress_msg["message_id"]) progress_id = int(progress_msg["message_id"])
logger.debug("[progress] sent chat_id=%s message_id=%s", chat_id, progress_id) logger.debug(
"[progress] sent chat_id=%s message_id=%s", chat_id, progress_id
)
except Exception as e: except Exception as e:
logger.info("[handle] failed to send progress message chat_id=%s: %s", chat_id, e) logger.info(
"[handle] failed to send progress message chat_id=%s: %s", chat_id, e
)
if progress_id is not None: if progress_id is not None:
progress = ProgressEditor( progress = ProgressEditor(
@@ -511,7 +533,9 @@ def run(
err = _clamp_tg_text(f"Error:\n{e}") err = _clamp_tg_text(f"Error:\n{e}")
if progress_id is not None and len(err) <= TELEGRAM_TEXT_LIMIT: if progress_id is not None and len(err) <= TELEGRAM_TEXT_LIMIT:
try: try:
bot.edit_message_text(chat_id=chat_id, message_id=progress_id, text=err) bot.edit_message_text(
chat_id=chat_id, message_id=progress_id, text=err
)
logger.info( logger.info(
"[handle] error chat_id=%s user_msg_id=%s resume_session=%r err=%s", "[handle] error chat_id=%s user_msg_id=%s resume_session=%r err=%s",
chat_id, chat_id,
@@ -523,7 +547,9 @@ def run(
except Exception as ee: except Exception as ee:
logger.info("[handle] failed to edit progress into error: %s", ee) logger.info("[handle] failed to edit progress into error: %s", ee)
_send_markdown(bot, chat_id=chat_id, text=err, reply_to_message_id=user_msg_id) _send_markdown(
bot, chat_id=chat_id, text=err, reply_to_message_id=user_msg_id
)
logger.info( logger.info(
"[handle] error chat_id=%s user_msg_id=%s resume_session=%r err=%s", "[handle] error chat_id=%s user_msg_id=%s resume_session=%r err=%s",
chat_id, chat_id,
@@ -541,10 +567,14 @@ def run(
final_md = progress_renderer.render_final(elapsed, answer, status=status) final_md = progress_renderer.render_final(elapsed, answer, status=status)
final_md = final_md + f"\n\nresume: `{session_id}`" final_md = final_md + f"\n\nresume: `{session_id}`"
final_text, final_entities = render_markdown(final_md) final_text, final_entities = render_markdown(final_md)
can_edit_final = progress_id is not None and len(final_text) <= TELEGRAM_TEXT_LIMIT can_edit_final = (
progress_id is not None and len(final_text) <= TELEGRAM_TEXT_LIMIT
)
if loud_final or not can_edit_final: if loud_final or not can_edit_final:
_send_markdown(bot, chat_id=chat_id, text=final_md, reply_to_message_id=user_msg_id) _send_markdown(
bot, chat_id=chat_id, text=final_md, reply_to_message_id=user_msg_id
)
if progress_id is not None: if progress_id is not None:
try: try:
bot.delete_message(chat_id=chat_id, message_id=progress_id) bot.delete_message(chat_id=chat_id, message_id=progress_id)
@@ -572,7 +602,9 @@ def run(
while True: while True:
try: try:
updates = bot.get_updates(offset=offset, timeout_s=50, allowed_updates=["message"]) updates = bot.get_updates(
offset=offset, timeout_s=50, allowed_updates=["message"]
)
except Exception as e: except Exception as e:
logger.info("[telegram] get_updates error: %s", e) logger.info("[telegram] get_updates error: %s", e)
time.sleep(2.0) time.sleep(2.0)
@@ -592,7 +624,7 @@ def run(
from_bot, from_bot,
msg_text is not None, msg_text is not None,
reply_to, reply_to,
_one_line(msg_text), repr(msg_text),
) )
if "text" not in msg: if "text" not in msg:
logger.info( logger.info(
@@ -603,7 +635,11 @@ def run(
continue continue
if allowed is not None and int(chat_id) not in allowed: if allowed is not None and int(chat_id) not in allowed:
logger.info("[telegram] rejected by ACL chat_id=%s allowed=%s", chat_id, sorted(allowed)) logger.info(
"[telegram] rejected by ACL chat_id=%s allowed=%s",
chat_id,
sorted(allowed),
)
continue continue
if msg.get("from", {}).get("is_bot"): if msg.get("from", {}).get("is_bot"):
@@ -620,7 +656,7 @@ def run(
"[telegram] accepted message chat_id=%s user_msg_id=%s text=%s", "[telegram] accepted message chat_id=%s user_msg_id=%s text=%s",
chat_id, chat_id,
user_msg_id, user_msg_id,
_one_line(text), repr(text),
) )
uuid_re = re.compile( uuid_re = re.compile(
@@ -1,39 +1,27 @@
from __future__ import annotations from __future__ import annotations
import json import requests
import urllib.error
import urllib.request
from typing import Any
class TelegramClient: class TelegramClient:
""" """
Minimal Telegram Bot API client using standard library (no requests dependency). Minimal Telegram Bot API client.
""" """
def __init__(self, token: str, timeout_s: int = 120) -> None: def __init__(self, token: str, timeout_s: float = 120) -> None:
if not token: if not token:
raise ValueError("Telegram token is empty") raise ValueError("Telegram token is empty")
self._base = f"https://api.telegram.org/bot{token}" self._base = f"https://api.telegram.org/bot{token}"
self._timeout_s = timeout_s self._timeout_s = timeout_s
def _call(self, method: str, params: dict[str, Any]) -> Any: def _call(self, method: str, params: dict) -> object:
url = f"{self._base}/{method}" resp = requests.post(
data = json.dumps(params).encode("utf-8") f"{self._base}/{method}",
req = urllib.request.Request( json=params,
url, timeout=self._timeout_s,
data=data,
headers={"Content-Type": "application/json"},
method="POST",
) )
try: resp.raise_for_status()
with urllib.request.urlopen(req, timeout=self._timeout_s) as resp: payload = resp.json()
payload = json.loads(resp.read().decode("utf-8"))
except urllib.error.HTTPError as e:
body = e.read().decode("utf-8", errors="replace")
raise RuntimeError(f"Telegram HTTPError {e.code}: {body}") from e
except urllib.error.URLError as e:
raise RuntimeError(f"Telegram URLError: {e}") from e
if not payload.get("ok"): if not payload.get("ok"):
raise RuntimeError(f"Telegram API error: {payload}") raise RuntimeError(f"Telegram API error: {payload}")
return payload["result"] return payload["result"]
@@ -43,13 +31,13 @@ class TelegramClient:
offset: int | None, offset: int | None,
timeout_s: int = 50, timeout_s: int = 50,
allowed_updates: list[str] | None = None, allowed_updates: list[str] | None = None,
) -> list[dict[str, Any]]: ) -> list[dict]:
params: dict[str, Any] = {"timeout": timeout_s} params: dict = {"timeout": timeout_s}
if offset is not None: if offset is not None:
params["offset"] = offset params["offset"] = offset
if allowed_updates is not None: if allowed_updates is not None:
params["allowed_updates"] = allowed_updates params["allowed_updates"] = allowed_updates
return self._call("getUpdates", params) return self._call("getUpdates", params) # type: ignore[return-value]
def send_message( def send_message(
self, self,
@@ -57,9 +45,9 @@ class TelegramClient:
text: str, text: str,
reply_to_message_id: int | None = None, reply_to_message_id: int | None = None,
disable_notification: bool | None = False, disable_notification: bool | None = False,
entities: list[dict[str, Any]] | None = None, entities: list[dict] | None = None,
) -> dict[str, Any]: ) -> dict:
params: dict[str, Any] = { params: dict = {
"chat_id": chat_id, "chat_id": chat_id,
"text": text, "text": text,
} }
@@ -69,28 +57,31 @@ class TelegramClient:
params["reply_to_message_id"] = reply_to_message_id params["reply_to_message_id"] = reply_to_message_id
if entities is not None: if entities is not None:
params["entities"] = entities params["entities"] = entities
return self._call("sendMessage", params) return self._call("sendMessage", params) # type: ignore[return-value]
def edit_message_text( def edit_message_text(
self, self,
chat_id: int, chat_id: int,
message_id: int, message_id: int,
text: str, text: str,
entities: list[dict[str, Any]] | None = None, entities: list[dict] | None = None,
) -> dict[str, Any]: ) -> dict:
params: dict[str, Any] = { params: dict = {
"chat_id": chat_id, "chat_id": chat_id,
"message_id": message_id, "message_id": message_id,
"text": text, "text": text,
} }
if entities is not None: if entities is not None:
params["entities"] = entities params["entities"] = entities
return self._call("editMessageText", params) return self._call("editMessageText", params) # type: ignore[return-value]
def delete_message(self, chat_id: int, message_id: int) -> bool: def delete_message(self, chat_id: int, message_id: int) -> bool:
params: dict[str, Any] = { res = self._call(
"chat_id": chat_id, "deleteMessage",
"message_id": message_id, {
} "chat_id": chat_id,
res = self._call("deleteMessage", params) "message_id": message_id,
},
)
return bool(res) return bool(res)