diff --git a/codex/codex_telegram_bridge/bridge_common.py b/codex/codex_telegram_bridge/bridge_common.py index 275ba1b..6662654 100644 --- a/codex/codex_telegram_bridge/bridge_common.py +++ b/codex/codex_telegram_bridge/bridge_common.py @@ -225,7 +225,7 @@ class TelegramClient: chat_id: int, text: str, reply_to_message_id: Optional[int] = None, - disable_notification: bool = False, + disable_notification: Optional[bool] = False, entities: Optional[List[Dict[str, Any]]] = None, ) -> Dict[str, Any]: if len(text) > TELEGRAM_HARD_LIMIT: @@ -233,14 +233,41 @@ class TelegramClient: params: Dict[str, Any] = { "chat_id": chat_id, "text": text, - "disable_notification": disable_notification, } + if disable_notification is not None: + params["disable_notification"] = disable_notification if reply_to_message_id is not None: params["reply_to_message_id"] = reply_to_message_id if entities is not None: params["entities"] = entities return self._call("sendMessage", params) + def edit_message_text( + self, + chat_id: int, + message_id: int, + text: str, + entities: Optional[List[Dict[str, Any]]] = None, + ) -> Dict[str, Any]: + if len(text) > TELEGRAM_HARD_LIMIT: + raise ValueError("edit_message_text received too-long text") + params: Dict[str, Any] = { + "chat_id": chat_id, + "message_id": message_id, + "text": text, + } + if entities is not None: + params["entities"] = entities + return self._call("editMessageText", params) + + def delete_message(self, chat_id: int, message_id: int) -> bool: + params: Dict[str, Any] = { + "chat_id": chat_id, + "message_id": message_id, + } + res = self._call("deleteMessage", params) + return bool(res) + def send_message_chunked( self, chat_id: int, diff --git a/codex/codex_telegram_bridge/exec_bridge.py b/codex/codex_telegram_bridge/exec_bridge.py index b19c0cc..5ed8c30 100644 --- a/codex/codex_telegram_bridge/exec_bridge.py +++ b/codex/codex_telegram_bridge/exec_bridge.py @@ -12,13 +12,15 @@ import subprocess import threading import time from concurrent.futures import ThreadPoolExecutor -from typing import Any, Dict, Optional, Tuple +from typing import Any, Callable, Dict, Optional, Tuple from bridge_common import ( TelegramClient, RouteStore, + TELEGRAM_HARD_LIMIT, config_get, load_telegram_config, + render_markdown, resolve_chat_ids, ) @@ -36,6 +38,101 @@ def _one_line(text: Optional[str]) -> str: return text.replace("\r", "\\r").replace("\n", "\\n") +TELEGRAM_TEXT_LIMIT = TELEGRAM_HARD_LIMIT + + +def _clamp_tg_text(text: str, limit: int = TELEGRAM_TEXT_LIMIT) -> str: + if len(text) <= limit: + return text + return text[: limit - 20] + "\n...(truncated)" + + +def _summarize_item(item: Dict[str, Any]) -> str: + item_type = item.get("type") + if isinstance(item_type, str): + if item_type == "agent_message" and isinstance(item.get("text"), str): + snippet = item["text"].strip().replace("\n", " ") + if len(snippet) > 140: + snippet = snippet[:140] + "..." + return f"agent_message: {snippet}" + name = item.get("name") or item.get("tool_name") or item.get("id") + if isinstance(name, str): + return f"{item_type}: {name}" + return item_type + return "item.completed" + + +class ProgressEditor: + def __init__( + self, + bot: TelegramClient, + chat_id: int, + message_id: int, + edit_every_s: float, + ) -> None: + self.bot = bot + self.chat_id = chat_id + self.message_id = message_id + self.edit_every_s = edit_every_s + + self._lock = threading.Lock() + self._pending: Optional[str] = None + self._last_sent: Optional[str] = None + self._last_edit_at = 0.0 + + self._stop = threading.Event() + self._thread = threading.Thread(target=self._run, daemon=True) + self._thread.start() + + def set(self, text: str) -> None: + text = _clamp_tg_text(text) + with self._lock: + self._pending = text + + def stop(self) -> None: + self._stop.set() + self._thread.join(timeout=1.0) + + def _edit(self, text: str) -> None: + try: + self.bot.edit_message_text( + chat_id=self.chat_id, + message_id=self.message_id, + text=text, + ) + except Exception as e: + log( + "[progress] edit failed " + f"chat_id={self.chat_id} message_id={self.message_id}: {e}" + ) + + def _run(self) -> None: + while not self._stop.is_set(): + to_send: Optional[str] = None + now = time.monotonic() + with self._lock: + if self._pending is not None and (now - self._last_edit_at) >= self.edit_every_s: + if self._pending != self._last_sent: + to_send = self._pending + self._last_sent = self._pending + self._last_edit_at = now + self._pending = None + + if to_send is not None: + self._edit(to_send) + + self._stop.wait(0.2) + + +def _typing_loop(bot: TelegramClient, chat_id: int, stop_evt: threading.Event) -> None: + while not stop_evt.is_set(): + try: + bot.send_chat_action(chat_id=chat_id, action="typing") + except Exception as e: + log(f"[typing] send_chat_action failed chat_id={chat_id}: {e}") + stop_evt.wait(4.0) + + class CodexExecRunner: """ Runs Codex in non-interactive mode: @@ -58,7 +155,12 @@ class CodexExecRunner: self._locks[session_id] = threading.Lock() return self._locks[session_id] - def run(self, prompt: str, session_id: Optional[str]) -> Tuple[str, str]: + def run( + self, + prompt: str, + session_id: Optional[str], + on_event: Optional[Callable[[Dict[str, Any]], None]] = None, + ) -> Tuple[str, str]: """ Returns (session_id, final_agent_message_text) """ @@ -111,6 +213,11 @@ class CodexExecRunner: evt = json.loads(line) except json.JSONDecodeError: continue + if on_event is not None: + try: + on_event(evt) + except Exception as e: + log(f"[codex][on_event] callback error: {e}") # From Codex JSONL event stream if evt.get("type") == "thread.started": @@ -134,15 +241,20 @@ class CodexExecRunner: log(f"[codex] done run session_id={found_session!r}") return found_session, (last_agent_text or "(No agent_message captured from JSON stream.)") - def run_serialized(self, prompt: str, session_id: Optional[str]) -> Tuple[str, str]: + def run_serialized( + self, + prompt: str, + session_id: Optional[str], + on_event: Optional[Callable[[Dict[str, Any]], None]] = None, + ) -> Tuple[str, str]: """ If resuming, serialize per-session. """ if not session_id: - return self.run(prompt, session_id=None) + return self.run(prompt, session_id=None, on_event=on_event) lock = self._lock_for(session_id) with lock: - return self.run(prompt, session_id=session_id) + return self.run(prompt, session_id=session_id, on_event=on_event) # -------------------- Telegram loop -------------------- @@ -215,12 +327,116 @@ def main() -> None: f"chat_id={chat_id} user_msg_id={user_msg_id} resume_session={resume_session!r}" ) try: - try: - bot.send_chat_action(chat_id=chat_id, action="typing") - log(f"[handle] sent typing indicator chat_id={chat_id}") - except Exception as e: - log(f"[handle] failed typing indicator chat_id={chat_id}: {e}") - session_id, answer = runner.run_serialized(text, resume_session) + edit_every_s = float(os.environ.get("TG_PROGRESS_EDIT_EVERY_S", "2.5")) + except ValueError: + edit_every_s = 2.5 + silent_progress = os.environ.get("TG_PROGRESS_SILENT", "1") == "1" + loud_final = os.environ.get("TG_FINAL_NOTIFY", "1") == "1" + + typing_stop = threading.Event() + typing_thread = threading.Thread( + target=_typing_loop, + args=(bot, chat_id, typing_stop), + daemon=True, + ) + typing_thread.start() + + progress_id: Optional[int] = None + progress: Optional[ProgressEditor] = None + try: + progress_msg = bot.send_message( + chat_id=chat_id, + text="Working...", + reply_to_message_id=user_msg_id, + disable_notification=silent_progress, + ) + progress_id = int(progress_msg["message_id"]) + except Exception as e: + log(f"[handle] failed to send progress message chat_id={chat_id}: {e}") + + if progress_id is not None: + progress = ProgressEditor(bot, chat_id, progress_id, edit_every_s) + + started_at = time.monotonic() + session_box: dict[str, Optional[str]] = {"id": resume_session} + + def on_event(evt: Dict[str, Any]) -> None: + event_type = evt.get("type") + if event_type == "thread.started": + thread_id = evt.get("thread_id") + if isinstance(thread_id, str) and thread_id: + session_box["id"] = thread_id + if progress_id is not None: + store.link( + chat_id, + progress_id, + "exec", + thread_id, + meta={"workspace": workspace}, + ) + elif event_type == "item.completed": + item = evt.get("item") or {} + elapsed = int(time.monotonic() - started_at) + line = _summarize_item(item) if isinstance(item, dict) else "item.completed" + session_id = session_box["id"] + header = f"Working... ({elapsed}s)" + if session_id: + header += f"\nSession: {session_id}" + msg = f"{header}\n\n{line}" + if progress is not None: + progress.set(msg) + + def _stop_background() -> None: + typing_stop.set() + typing_thread.join(timeout=1.0) + if progress is not None: + progress.stop() + + try: + session_id, answer = runner.run_serialized(text, resume_session, on_event=on_event) + except Exception as e: + _stop_background() + err = _clamp_tg_text(f"Error:\n{e}") + route_id = session_box["id"] or resume_session or "unknown" + if progress_id is not None and len(err) <= TELEGRAM_TEXT_LIMIT: + try: + bot.edit_message_text(chat_id=chat_id, message_id=progress_id, text=err) + store.link(chat_id, progress_id, "exec", route_id, meta={"error": True}) + log( + "[handle] error " + f"chat_id={chat_id} user_msg_id={user_msg_id} " + f"resume_session={resume_session!r} err={e}" + ) + return + except Exception as ee: + log(f"[handle] failed to edit progress into error: {ee}") + + sent_msgs = bot.send_message_markdown_chunked( + chat_id=chat_id, + text=err, + reply_to_message_id=user_msg_id, + ) + for m in sent_msgs: + store.link(chat_id, m["message_id"], "exec", route_id, meta={"error": True}) + log( + "[handle] error " + f"chat_id={chat_id} user_msg_id={user_msg_id} resume_session={resume_session!r} err={e}" + ) + return + + _stop_background() + + answer = answer or "(No agent_message captured from JSON stream.)" + final_text, final_entities = render_markdown(answer) + can_edit_final = progress_id is not None and len(final_text) <= TELEGRAM_TEXT_LIMIT + + if loud_final or not can_edit_final: + if progress_id is not None: + try: + bot.delete_message(chat_id=chat_id, message_id=progress_id) + except Exception as e: + log(f"[handle] delete progress failed chat_id={chat_id} message_id={progress_id}: {e}") + sent_msgs = bot.send_message_markdown_chunked( chat_id=chat_id, text=answer, @@ -228,23 +444,19 @@ def main() -> None: ) for m in sent_msgs: store.link(chat_id, m["message_id"], "exec", session_id, meta={"workspace": workspace}) - log( - "[handle] done " - f"chat_id={chat_id} user_msg_id={user_msg_id} session_id={session_id!r}" - ) - except Exception as e: - err = f"❌ Error:\n{e}" - sent_msgs = bot.send_message_markdown_chunked( + else: + bot.edit_message_text( chat_id=chat_id, - text=err, - reply_to_message_id=user_msg_id, - ) - for m in sent_msgs: - store.link(chat_id, m["message_id"], "exec", resume_session or "unknown", meta={"error": True}) - log( - "[handle] error " - f"chat_id={chat_id} user_msg_id={user_msg_id} resume_session={resume_session!r} err={e}" + message_id=progress_id, + text=final_text, + entities=final_entities or None, ) + store.link(chat_id, progress_id, "exec", session_id, meta={"workspace": workspace}) + + log( + "[handle] done " + f"chat_id={chat_id} user_msg_id={user_msg_id} session_id={session_id!r}" + ) while True: try: