diff --git a/codex_telegram_bridge/src/codex_telegram_bridge/exec_bridge.py b/codex_telegram_bridge/src/codex_telegram_bridge/exec_bridge.py index c5fef34..b77ec57 100644 --- a/codex_telegram_bridge/src/codex_telegram_bridge/exec_bridge.py +++ b/codex_telegram_bridge/src/codex_telegram_bridge/exec_bridge.py @@ -36,11 +36,20 @@ RESUME_LINE = re.compile( def extract_session_id(text: str | None) -> str | None: if not text: return None - if m := RESUME_LINE.search(text): - return m.group("id") + found = None + for match in RESUME_LINE.finditer(text): + found = match.group("id") + if found: + return found return None +def resolve_resume_session( + text: str | None, reply_text: str | None +) -> str | None: + return extract_session_id(text) or extract_session_id(reply_text) + + async def _drain_stderr(stderr: asyncio.StreamReader | None, tail: deque[str]) -> None: if stderr is None: return @@ -72,6 +81,7 @@ async def manage_subprocess(*args, **kwargs): TELEGRAM_MARKDOWN_LIMIT = 3500 +PROGRESS_EDIT_EVERY_S = 2.0 def _clamp_tg_text(text: str, limit: int = TELEGRAM_MARKDOWN_LIMIT) -> str: @@ -436,6 +446,8 @@ async def _handle_message( user_msg_id: int, text: str, resume_session: str | None, + clock: Callable[[], float] = time.monotonic, + progress_edit_every: float = PROGRESS_EDIT_EVERY_S, ) -> None: logger.debug( "[handle] incoming chat_id=%s message_id=%s resume=%r text=%s", @@ -444,7 +456,7 @@ async def _handle_message( resume_session, text, ) - started_at = time.monotonic() + started_at = clock() progress_renderer = ExecProgressRenderer(max_actions=5) progress_id: int | None = None @@ -498,7 +510,7 @@ async def _handle_message( disable_notification=True, ) progress_id = int(progress_msg["message_id"]) - last_edit_at = time.monotonic() + last_edit_at = clock() logger.debug("[progress] sent chat_id=%s message_id=%s", chat_id, progress_id) except Exception as e: logger.info( @@ -511,8 +523,8 @@ async def _handle_message( return if not progress_renderer.note_event(evt): return - now = time.monotonic() - if (now - last_edit_at) < 2.0: + now = clock() + if (now - last_edit_at) < progress_edit_every: return if edit_task is not None and not edit_task.done(): return @@ -547,7 +559,7 @@ async def _handle_message( await asyncio.gather(edit_task, return_exceptions=True) answer = answer or "(No agent_message captured from JSON stream.)" - elapsed = time.monotonic() - started_at + elapsed = clock() - started_at status = "done" if saw_agent_message else "error" final_md = ( progress_renderer.render_final(elapsed, answer, status=status) @@ -647,9 +659,8 @@ async def _run_main_loop(cfg: BridgeConfig) -> None: async for msg in poll_updates(cfg): text = msg["text"] user_msg_id = msg["message_id"] - resume_session = extract_session_id(text) r = msg.get("reply_to_message") or {} - resume_session = resume_session or extract_session_id(r.get("text")) + resume_session = resolve_resume_session(text, r.get("text")) await queue.put( (msg["chat"]["id"], user_msg_id, text, resume_session) diff --git a/codex_telegram_bridge/src/codex_telegram_bridge/logging.py b/codex_telegram_bridge/src/codex_telegram_bridge/logging.py index 1b4dad3..12b23c9 100644 --- a/codex_telegram_bridge/src/codex_telegram_bridge/logging.py +++ b/codex_telegram_bridge/src/codex_telegram_bridge/logging.py @@ -37,4 +37,5 @@ def setup_logging(*, debug: bool = False) -> None: console.setLevel(logging.DEBUG if debug else logging.INFO) console.setFormatter(fmt) console.addFilter(redactor) + root_logger.addFilter(redactor) root_logger.addHandler(console) diff --git a/codex_telegram_bridge/src/codex_telegram_bridge/telegram_client.py b/codex_telegram_bridge/src/codex_telegram_bridge/telegram_client.py index 6eec012..30c24b9 100644 --- a/codex_telegram_bridge/src/codex_telegram_bridge/telegram_client.py +++ b/codex_telegram_bridge/src/codex_telegram_bridge/telegram_client.py @@ -2,11 +2,15 @@ from __future__ import annotations import asyncio import logging +from collections.abc import Awaitable, Callable from typing import Any import httpx +from .logging import RedactTokenFilter + logger = logging.getLogger(__name__) +logger.addFilter(RedactTokenFilter()) class TelegramAPIError(RuntimeError): @@ -24,14 +28,23 @@ class TelegramClient: Minimal Telegram Bot API client. """ - def __init__(self, token: str, timeout_s: float = 120) -> None: + def __init__( + self, + token: str, + timeout_s: float = 120, + client: httpx.AsyncClient | None = None, + sleep: Callable[[float], Awaitable[None]] = asyncio.sleep, + ) -> None: if not token: raise ValueError("Telegram token is empty") self._base = f"https://api.telegram.org/bot{token}" - self._client = httpx.AsyncClient(timeout=timeout_s) + self._client = client or httpx.AsyncClient(timeout=timeout_s) + self._owns_client = client is None + self._sleep = sleep async def close(self) -> None: - await self._client.aclose() + if self._owns_client: + await self._client.aclose() async def _post(self, method: str, json_data: dict[str, Any]) -> Any: try: @@ -50,7 +63,7 @@ class TelegramClient: logger.warning( "[telegram] 429 retry_after=%s method=%s", retry_after, method ) - await asyncio.sleep(retry_after) + await self._sleep(retry_after) return await self._post(method, json_data) raise TelegramAPIError(method, payload, resp.status_code) logger.debug("[telegram] response %s: %s", method, payload)