diff --git a/codex_telegram_bridge/pyproject.toml b/codex_telegram_bridge/pyproject.toml index c6895a5..0e38f53 100644 --- a/codex_telegram_bridge/pyproject.toml +++ b/codex_telegram_bridge/pyproject.toml @@ -6,6 +6,7 @@ readme = "readme.md" requires-python = ">=3.12" dependencies = [ "markdown-it-py", + "requests", "sulguk", "typer", ] diff --git a/codex_telegram_bridge/src/codex_telegram_bridge/exec_bridge.py b/codex_telegram_bridge/src/codex_telegram_bridge/exec_bridge.py index 5c15c01..0a4e630 100644 --- a/codex_telegram_bridge/src/codex_telegram_bridge/exec_bridge.py +++ b/codex_telegram_bridge/src/codex_telegram_bridge/exec_bridge.py @@ -1,8 +1,3 @@ -#!/usr/bin/env python3 -# /// script -# requires-python = ">=3.12" -# dependencies = ["markdown-it-py", "sulguk", "typer"] -# /// from __future__ import annotations import json @@ -31,6 +26,7 @@ from .telegram_client import TelegramClient logger = logging.getLogger("exec_bridge") + def setup_logging(log_file: str | None) -> None: logger.setLevel(logging.DEBUG) logger.handlers.clear() @@ -56,12 +52,6 @@ def setup_logging(log_file: str | None) -> None: logger.debug("[debug] file logger initialized path=%r", log_file) -def _one_line(text: str | None) -> str: - if text is None: - return "None" - return text.replace("\r", "\\r").replace("\n", "\\n") - - TELEGRAM_TEXT_LIMIT = TELEGRAM_HARD_LIMIT TELEGRAM_MARKDOWN_LIMIT = 3500 ELLIPSIS = "…" @@ -72,6 +62,7 @@ def _clamp_tg_text(text: str, limit: int = TELEGRAM_TEXT_LIMIT) -> str: return text return text[: limit - 20] + "\n...(truncated)" + def _send_markdown( bot: TelegramClient, *, @@ -129,7 +120,9 @@ class ProgressEditor: text = _clamp_tg_text(text) with self._lock: self._pending = (text, entities) - logger.debug("[progress] set pending len=%s entities=%s", len(text), bool(entities)) + logger.debug( + "[progress] set pending len=%s entities=%s", len(text), bool(entities) + ) def set_markdown(self, text: str) -> None: rendered_text, entities = render_markdown(text) @@ -166,7 +159,10 @@ class ProgressEditor: to_send: tuple[str, list[dict[str, Any]] | None] | None = None now = time.monotonic() with self._lock: - if self._pending is not None and (now - self._last_edit_at) >= self.edit_every_s: + if ( + self._pending is not None + and (now - self._last_edit_at) >= self.edit_every_s + ): if self._pending != self._last_sent: to_send = self._pending self._last_sent = self._pending @@ -186,7 +182,9 @@ class CodexExecRunner: - resume: codex exec --json ... resume - """ - def __init__(self, codex_cmd: str, workspace: str | None, extra_args: list[str]) -> None: + def __init__( + self, codex_cmd: str, workspace: str | None, extra_args: list[str] + ) -> None: self.codex_cmd = codex_cmd self.workspace = workspace self.extra_args = extra_args @@ -210,7 +208,9 @@ class CodexExecRunner: """ Returns (session_id, final_agent_message_text) """ - logger.info("[codex] start run session_id=%r workspace=%r", session_id, self.workspace) + logger.info( + "[codex] start run session_id=%r workspace=%r", session_id, self.workspace + ) args = [self.codex_cmd, "exec", "--json"] args.extend(self.extra_args) if self.workspace: @@ -277,7 +277,9 @@ class CodexExecRunner: if evt.get("type") == "item.completed": item = evt.get("item") or {} - if item.get("type") == "agent_message" and isinstance(item.get("text"), str): + if item.get("type") == "agent_message" and isinstance( + item.get("text"), str + ): last_agent_text = item["text"] saw_agent_message = True @@ -290,10 +292,16 @@ class CodexExecRunner: raise RuntimeError(f"codex exec failed (rc={rc}). stderr tail:\n{tail}") if not found_session: - raise RuntimeError("codex exec finished but no session_id/thread_id was captured") + raise RuntimeError( + "codex exec finished but no session_id/thread_id was captured" + ) logger.info("[codex] done run session_id=%r", found_session) - return found_session, (last_agent_text or "(No agent_message captured from JSON stream.)"), saw_agent_message + return ( + found_session, + (last_agent_text or "(No agent_message captured from JSON stream.)"), + saw_agent_message, + ) def run_serialized( self, @@ -400,7 +408,9 @@ def run( extra_args.extend(["-c", "notify=[]"]) bot = TelegramClient(token) - runner = CodexExecRunner(codex_cmd=codex_cmd, workspace=workspace, extra_args=extra_args) + runner = CodexExecRunner( + codex_cmd=codex_cmd, workspace=workspace, extra_args=extra_args + ) max_workers = config.get("max_workers") pool = ThreadPoolExecutor(max_workers=max_workers or 4) @@ -409,7 +419,9 @@ def run( if ignore_backlog: try: - updates = bot.get_updates(offset=offset, timeout_s=0, allowed_updates=["message"]) + updates = bot.get_updates( + offset=offset, timeout_s=0, allowed_updates=["message"] + ) except Exception as e: logger.info("[startup] backlog drain failed: %s", e) updates = [] @@ -425,11 +437,17 @@ def run( bot.send_message(chat_id=chat_id, text=startup_msg) logger.info("[startup] sent startup message to chat_id=%s", chat_id) except Exception as e: - logger.info("[startup] failed to send startup message to chat_id=%s: %s", chat_id, e) + logger.info( + "[startup] failed to send startup message to chat_id=%s: %s", + chat_id, + e, + ) else: logger.info("[startup] no chat_id configured; skipping startup message") - def handle(chat_id: int, user_msg_id: int, text: str, resume_session: str | None) -> None: + def handle( + chat_id: int, user_msg_id: int, text: str, resume_session: str | None + ) -> None: logger.info( "[handle] start chat_id=%s user_msg_id=%s resume_session=%r", chat_id, @@ -462,9 +480,13 @@ def run( disable_notification=silent_progress, ) progress_id = int(progress_msg["message_id"]) - logger.debug("[progress] sent chat_id=%s message_id=%s", chat_id, progress_id) + logger.debug( + "[progress] sent chat_id=%s message_id=%s", chat_id, progress_id + ) except Exception as e: - logger.info("[handle] failed to send progress message chat_id=%s: %s", chat_id, e) + logger.info( + "[handle] failed to send progress message chat_id=%s: %s", chat_id, e + ) if progress_id is not None: progress = ProgressEditor( @@ -511,7 +533,9 @@ def run( err = _clamp_tg_text(f"Error:\n{e}") if progress_id is not None and len(err) <= TELEGRAM_TEXT_LIMIT: try: - bot.edit_message_text(chat_id=chat_id, message_id=progress_id, text=err) + bot.edit_message_text( + chat_id=chat_id, message_id=progress_id, text=err + ) logger.info( "[handle] error chat_id=%s user_msg_id=%s resume_session=%r err=%s", chat_id, @@ -523,7 +547,9 @@ def run( except Exception as ee: logger.info("[handle] failed to edit progress into error: %s", ee) - _send_markdown(bot, chat_id=chat_id, text=err, reply_to_message_id=user_msg_id) + _send_markdown( + bot, chat_id=chat_id, text=err, reply_to_message_id=user_msg_id + ) logger.info( "[handle] error chat_id=%s user_msg_id=%s resume_session=%r err=%s", chat_id, @@ -541,10 +567,14 @@ def run( final_md = progress_renderer.render_final(elapsed, answer, status=status) final_md = final_md + f"\n\nresume: `{session_id}`" final_text, final_entities = render_markdown(final_md) - can_edit_final = progress_id is not None and len(final_text) <= TELEGRAM_TEXT_LIMIT + can_edit_final = ( + progress_id is not None and len(final_text) <= TELEGRAM_TEXT_LIMIT + ) if loud_final or not can_edit_final: - _send_markdown(bot, chat_id=chat_id, text=final_md, reply_to_message_id=user_msg_id) + _send_markdown( + bot, chat_id=chat_id, text=final_md, reply_to_message_id=user_msg_id + ) if progress_id is not None: try: bot.delete_message(chat_id=chat_id, message_id=progress_id) @@ -572,7 +602,9 @@ def run( while True: try: - updates = bot.get_updates(offset=offset, timeout_s=50, allowed_updates=["message"]) + updates = bot.get_updates( + offset=offset, timeout_s=50, allowed_updates=["message"] + ) except Exception as e: logger.info("[telegram] get_updates error: %s", e) time.sleep(2.0) @@ -592,7 +624,7 @@ def run( from_bot, msg_text is not None, reply_to, - _one_line(msg_text), + repr(msg_text), ) if "text" not in msg: logger.info( @@ -603,7 +635,11 @@ def run( continue if allowed is not None and int(chat_id) not in allowed: - logger.info("[telegram] rejected by ACL chat_id=%s allowed=%s", chat_id, sorted(allowed)) + logger.info( + "[telegram] rejected by ACL chat_id=%s allowed=%s", + chat_id, + sorted(allowed), + ) continue if msg.get("from", {}).get("is_bot"): @@ -620,7 +656,7 @@ def run( "[telegram] accepted message chat_id=%s user_msg_id=%s text=%s", chat_id, user_msg_id, - _one_line(text), + repr(text), ) uuid_re = re.compile( diff --git a/codex_telegram_bridge/src/codex_telegram_bridge/telegram_client.py b/codex_telegram_bridge/src/codex_telegram_bridge/telegram_client.py index 2b4c1c2..1e7e9c1 100644 --- a/codex_telegram_bridge/src/codex_telegram_bridge/telegram_client.py +++ b/codex_telegram_bridge/src/codex_telegram_bridge/telegram_client.py @@ -1,39 +1,27 @@ from __future__ import annotations -import json -import urllib.error -import urllib.request -from typing import Any +import requests + class TelegramClient: """ - Minimal Telegram Bot API client using standard library (no requests dependency). + Minimal Telegram Bot API client. """ - def __init__(self, token: str, timeout_s: int = 120) -> None: + def __init__(self, token: str, timeout_s: float = 120) -> None: if not token: raise ValueError("Telegram token is empty") self._base = f"https://api.telegram.org/bot{token}" self._timeout_s = timeout_s - def _call(self, method: str, params: dict[str, Any]) -> Any: - url = f"{self._base}/{method}" - data = json.dumps(params).encode("utf-8") - req = urllib.request.Request( - url, - data=data, - headers={"Content-Type": "application/json"}, - method="POST", + def _call(self, method: str, params: dict) -> object: + resp = requests.post( + f"{self._base}/{method}", + json=params, + timeout=self._timeout_s, ) - try: - with urllib.request.urlopen(req, timeout=self._timeout_s) as resp: - payload = json.loads(resp.read().decode("utf-8")) - except urllib.error.HTTPError as e: - body = e.read().decode("utf-8", errors="replace") - raise RuntimeError(f"Telegram HTTPError {e.code}: {body}") from e - except urllib.error.URLError as e: - raise RuntimeError(f"Telegram URLError: {e}") from e - + resp.raise_for_status() + payload = resp.json() if not payload.get("ok"): raise RuntimeError(f"Telegram API error: {payload}") return payload["result"] @@ -43,13 +31,13 @@ class TelegramClient: offset: int | None, timeout_s: int = 50, allowed_updates: list[str] | None = None, - ) -> list[dict[str, Any]]: - params: dict[str, Any] = {"timeout": timeout_s} + ) -> list[dict]: + params: dict = {"timeout": timeout_s} if offset is not None: params["offset"] = offset if allowed_updates is not None: params["allowed_updates"] = allowed_updates - return self._call("getUpdates", params) + return self._call("getUpdates", params) # type: ignore[return-value] def send_message( self, @@ -57,9 +45,9 @@ class TelegramClient: text: str, reply_to_message_id: int | None = None, disable_notification: bool | None = False, - entities: list[dict[str, Any]] | None = None, - ) -> dict[str, Any]: - params: dict[str, Any] = { + entities: list[dict] | None = None, + ) -> dict: + params: dict = { "chat_id": chat_id, "text": text, } @@ -69,28 +57,31 @@ class TelegramClient: params["reply_to_message_id"] = reply_to_message_id if entities is not None: params["entities"] = entities - return self._call("sendMessage", params) + return self._call("sendMessage", params) # type: ignore[return-value] def edit_message_text( self, chat_id: int, message_id: int, text: str, - entities: list[dict[str, Any]] | None = None, - ) -> dict[str, Any]: - params: dict[str, Any] = { + entities: list[dict] | None = None, + ) -> dict: + params: dict = { "chat_id": chat_id, "message_id": message_id, "text": text, } if entities is not None: params["entities"] = entities - return self._call("editMessageText", params) + return self._call("editMessageText", params) # type: ignore[return-value] def delete_message(self, chat_id: int, message_id: int) -> bool: - params: dict[str, Any] = { - "chat_id": chat_id, - "message_id": message_id, - } - res = self._call("deleteMessage", params) + res = self._call( + "deleteMessage", + { + "chat_id": chat_id, + "message_id": message_id, + }, + ) return bool(res) +