diff --git a/codex_telegram_bridge/src/codex_telegram_bridge/exec_bridge.py b/codex_telegram_bridge/src/codex_telegram_bridge/exec_bridge.py index 5bc1b4e..b4ba487 100644 --- a/codex_telegram_bridge/src/codex_telegram_bridge/exec_bridge.py +++ b/codex_telegram_bridge/src/codex_telegram_bridge/exec_bridge.py @@ -10,6 +10,7 @@ import threading import time import logging import sys +from dataclasses import dataclass from weakref import WeakValueDictionary from logging.handlers import RotatingFileHandler from concurrent.futures import ThreadPoolExecutor @@ -336,6 +337,220 @@ class CodexExecRunner: # -------------------- Telegram loop -------------------- +@dataclass(frozen=True) +class BridgeConfig: + bot: TelegramClient + runner: CodexExecRunner + chat_id: int + pool: ThreadPoolExecutor + ignore_backlog: bool + progress_edit_every_s: float + progress_silent: bool + final_notify: bool + startup_msg: str + + +def _parse_bridge_config( + *, + progress_edit_every_s: float, + progress_silent: bool, + final_notify: bool, + ignore_backlog: bool, + cd: str | None, + model: str | None, +) -> BridgeConfig: + config = load_telegram_config() + token = config["bot_token"] + chat_id = int(config["chat_id"]) + + startup_pwd = os.getcwd() + startup_msg = f"codex exec bridge has started\npwd: {startup_pwd}" + + codex_cmd = shutil.which("codex") + if not codex_cmd: + raise RuntimeError("codex not found on PATH") + + workspace = cd if cd is not None else config.get("cd") + raw_exec_args = config.get("codex_exec_args", "") + if isinstance(raw_exec_args, list): + extra_args = [str(v) for v in raw_exec_args] + else: + extra_args = shlex.split(str(raw_exec_args)) # e.g. "--full-auto --search" + + if model: + extra_args.extend(["--model", model]) + + def _has_notify_override(args: list[str]) -> bool: + for i, arg in enumerate(args): + if arg in ("-c", "--config"): + key = args[i + 1].split("=", 1)[0].strip() + if key == "notify" or key.endswith(".notify"): + return True + elif arg.startswith(("--config=", "-c=")): + key = arg.split("=", 1)[1].split("=", 1)[0].strip() + if key == "notify" or key.endswith(".notify"): + return True + return False + + if not _has_notify_override(extra_args): + extra_args.extend(["-c", "notify=[]"]) + + bot = TelegramClient(token) + runner = CodexExecRunner(codex_cmd=codex_cmd, workspace=workspace, extra_args=extra_args) + pool = ThreadPoolExecutor(max_workers=16) + + return BridgeConfig( + bot=bot, + runner=runner, + chat_id=chat_id, + pool=pool, + ignore_backlog=bool(ignore_backlog), + progress_edit_every_s=progress_edit_every_s, + progress_silent=progress_silent, + final_notify=final_notify, + startup_msg=startup_msg, + ) + + +def _send_startup(cfg: BridgeConfig) -> None: + try: + cfg.bot.send_message(chat_id=cfg.chat_id, text=cfg.startup_msg) + logger.info("[startup] sent startup message to chat_id=%s", cfg.chat_id) + except Exception as e: + logger.info("[startup] failed to send startup message to chat_id=%s: %s", cfg.chat_id, e) + + +def _drain_backlog(cfg: BridgeConfig, offset: int | None) -> int | None: + if not cfg.ignore_backlog: + return offset + try: + updates = cfg.bot.get_updates(offset=offset, timeout_s=0, allowed_updates=["message"]) + except Exception as e: + logger.info("[startup] backlog drain failed: %s", e) + return offset + if updates: + offset = updates[-1]["update_id"] + 1 + logger.info("[startup] drained %s pending update(s)", len(updates)) + return offset + + +def _handle_message( + cfg: BridgeConfig, + *, + chat_id: int, + user_msg_id: int, + text: str, + resume_session: str | None, +) -> None: + started_at = time.monotonic() + progress_renderer = ExecProgressRenderer(max_actions=5) + + progress_id: int | None = None + progress: ProgressEditor | None = None + try: + initial_text = progress_renderer.render_progress(0.0) + initial_rendered, initial_entities = render_markdown(initial_text) + progress_msg = cfg.bot.send_message( + chat_id=chat_id, + text=initial_rendered, + entities=initial_entities or None, + reply_to_message_id=user_msg_id, + disable_notification=cfg.progress_silent, + ) + progress_id = int(progress_msg["message_id"]) + logger.debug("[progress] sent chat_id=%s message_id=%s", chat_id, progress_id) + except Exception as e: + logger.info("[handle] failed to send progress message chat_id=%s: %s", chat_id, e) + + if progress_id is not None: + progress = ProgressEditor( + cfg.bot, + chat_id, + progress_id, + cfg.progress_edit_every_s, + initial_text=initial_rendered, + initial_entities=initial_entities or None, + ) + + def on_event(evt: dict[str, Any]) -> None: + if progress_renderer.note_event(evt) and progress is not None: + elapsed = time.monotonic() - started_at + progress.set_markdown(progress_renderer.render_progress(elapsed)) + + def _stop_background() -> None: + if progress is not None: + progress.stop() + + try: + session_id, answer, saw_agent_message = cfg.runner.run_serialized( + text, + resume_session, + on_event=on_event, + ) + except Exception as e: + _stop_background() + err = _clamp_tg_text(f"Error:\n{e}") + if progress_id is not None and len(err) <= TELEGRAM_TEXT_LIMIT: + cfg.bot.edit_message_text(chat_id=chat_id, message_id=progress_id, text=err) + return + _send_markdown(cfg.bot, chat_id=chat_id, text=err, reply_to_message_id=user_msg_id) + return + + _stop_background() + + answer = answer or "(No agent_message captured from JSON stream.)" + elapsed = time.monotonic() - started_at + status = "done" if saw_agent_message else "error" + final_md = progress_renderer.render_final(elapsed, answer, status=status) + f"\n\nresume: `{session_id}`" + final_text, final_entities = render_markdown(final_md) + can_edit_final = progress_id is not None and len(final_text) <= TELEGRAM_TEXT_LIMIT + + if cfg.final_notify or not can_edit_final: + _send_markdown(cfg.bot, chat_id=chat_id, text=final_md, reply_to_message_id=user_msg_id) + if progress_id is not None: + cfg.bot.delete_message(chat_id=chat_id, message_id=progress_id) + else: + cfg.bot.edit_message_text( + chat_id=chat_id, + message_id=progress_id, + text=final_text, + entities=final_entities or None, + ) + + +def _run_main_loop(cfg: BridgeConfig) -> None: + offset: int | None = None + offset = _drain_backlog(cfg, offset) + _send_startup(cfg) + + while True: + updates = cfg.bot.get_updates(offset=offset, timeout_s=50, allowed_updates=["message"]) + for upd in updates: + offset = upd["update_id"] + 1 + msg = upd.get("message") or {} + msg_chat_id = msg.get("chat", {}).get("id") + if "text" not in msg: + continue + if int(msg_chat_id) != cfg.chat_id: + continue + if msg.get("from", {}).get("is_bot"): + continue + + text = msg["text"] + user_msg_id = msg["message_id"] + resume_session = extract_session_id(text) + r = msg.get("reply_to_message") or {} + resume_session = resume_session or extract_session_id(r.get("text")) + + cfg.pool.submit( + _handle_message, + cfg, + chat_id=msg_chat_id, + user_msg_id=user_msg_id, + text=text, + resume_session=resume_session, + ) + def run( progress_edit_every_s: float = typer.Option( @@ -376,292 +591,15 @@ def run( ), ) -> None: setup_logging(log_file if log_file else None) - config = load_telegram_config() - token = config["bot_token"] - config_chat_id = int(config["chat_id"]) - - startup_pwd = os.getcwd() - startup_msg = f"codex exec bridge has started\npwd: {startup_pwd}" - - codex_cmd = shutil.which("codex") - if not codex_cmd: - raise RuntimeError("codex not found on PATH") - workspace = cd if cd is not None else config.get("cd") - raw_exec_args = config.get("codex_exec_args", "") - if isinstance(raw_exec_args, list): - extra_args = [str(v) for v in raw_exec_args] - else: - extra_args = shlex.split(str(raw_exec_args)) # e.g. "--full-auto --search" - - if model: - extra_args.extend(["--model", model]) - - def _has_notify_override(args: list[str]) -> bool: - for i, arg in enumerate(args): - if arg in ("-c", "--config"): - if i + 1 >= len(args): - continue - key = args[i + 1].split("=", 1)[0].strip() - if key == "notify" or key.endswith(".notify"): - return True - elif arg.startswith(("--config=", "-c=")): - key = arg.split("=", 1)[1].split("=", 1)[0].strip() - if key == "notify" or key.endswith(".notify"): - return True - return False - - # Default: disable notify hook for exec-bridge runs to avoid duplicate messages. - if not _has_notify_override(extra_args): - extra_args.extend(["-c", "notify=[]"]) - - bot = TelegramClient(token) - runner = CodexExecRunner( - codex_cmd=codex_cmd, workspace=workspace, extra_args=extra_args + cfg = _parse_bridge_config( + progress_edit_every_s=progress_edit_every_s, + progress_silent=progress_silent, + final_notify=final_notify, + ignore_backlog=ignore_backlog, + cd=cd, + model=model, ) - - pool = ThreadPoolExecutor(max_workers=16) - offset: int | None = None - ignore_backlog = bool(ignore_backlog) - - if ignore_backlog: - try: - updates = bot.get_updates( - offset=offset, timeout_s=0, allowed_updates=["message"] - ) - except Exception as e: - logger.info("[startup] backlog drain failed: %s", e) - updates = [] - if updates: - offset = updates[-1]["update_id"] + 1 - logger.info("[startup] drained %s pending update(s)", len(updates)) - - logger.info("[startup] pwd=%s", startup_pwd) - logger.info("Option1 bridge running (codex exec). Long-polling Telegram...") - try: - bot.send_message(chat_id=config_chat_id, text=startup_msg) - logger.info("[startup] sent startup message to chat_id=%s", config_chat_id) - except Exception as e: - logger.info("[startup] failed to send startup message to chat_id=%s: %s", config_chat_id, e) - - def handle( - chat_id: int, user_msg_id: int, text: str, resume_session: str | None - ) -> None: - logger.info( - "[handle] start chat_id=%s user_msg_id=%s resume_session=%r", - chat_id, - user_msg_id, - resume_session, - ) - logger.debug( - "[handle] thread name=%s ident=%s", - threading.current_thread().name, - threading.get_ident(), - ) - edit_every_s = progress_edit_every_s - silent_progress = progress_silent - loud_final = final_notify - - started_at = time.monotonic() - session_box: dict[str, str | None] = {"id": resume_session} - progress_renderer = ExecProgressRenderer(max_actions=5) - - progress_id: int | None = None - progress: ProgressEditor | None = None - try: - initial_text = progress_renderer.render_progress(0.0) - initial_rendered, initial_entities = render_markdown(initial_text) - progress_msg = bot.send_message( - chat_id=chat_id, - text=initial_rendered, - entities=initial_entities or None, - reply_to_message_id=user_msg_id, - disable_notification=silent_progress, - ) - progress_id = int(progress_msg["message_id"]) - logger.debug( - "[progress] sent chat_id=%s message_id=%s", chat_id, progress_id - ) - except Exception as e: - logger.info( - "[handle] failed to send progress message chat_id=%s: %s", chat_id, e - ) - - if progress_id is not None: - progress = ProgressEditor( - bot, - chat_id, - progress_id, - edit_every_s, - initial_text=initial_rendered, - initial_entities=initial_entities or None, - ) - - def on_event(evt: dict[str, Any]) -> None: - event_type = evt.get("type") - item = evt.get("item") or {} - logger.debug( - "[codex] event type=%s item_id=%s item_type=%s status=%s", - event_type, - item.get("id"), - item.get("type"), - item.get("status"), - ) - if event_type == "thread.started": - thread_id = evt.get("thread_id") - if isinstance(thread_id, str) and thread_id: - session_box["id"] = thread_id - if progress_renderer.note_event(evt) and progress is not None: - elapsed = time.monotonic() - started_at - msg = progress_renderer.render_progress(elapsed) - progress.set_markdown(msg) - - def _stop_background() -> None: - if progress is not None: - progress.stop() - logger.debug("[progress] thread stopped") - - try: - session_id, answer, saw_agent_message = runner.run_serialized( - text, - resume_session, - on_event=on_event, - ) - except Exception as e: - _stop_background() - err = _clamp_tg_text(f"Error:\n{e}") - if progress_id is not None and len(err) <= TELEGRAM_TEXT_LIMIT: - try: - bot.edit_message_text( - chat_id=chat_id, message_id=progress_id, text=err - ) - logger.info( - "[handle] error chat_id=%s user_msg_id=%s resume_session=%r err=%s", - chat_id, - user_msg_id, - resume_session, - e, - ) - return - except Exception as ee: - logger.info("[handle] failed to edit progress into error: %s", ee) - - _send_markdown( - bot, chat_id=chat_id, text=err, reply_to_message_id=user_msg_id - ) - logger.info( - "[handle] error chat_id=%s user_msg_id=%s resume_session=%r err=%s", - chat_id, - user_msg_id, - resume_session, - e, - ) - return - - _stop_background() - - answer = answer or "(No agent_message captured from JSON stream.)" - elapsed = time.monotonic() - started_at - status = "done" if saw_agent_message else "error" - final_md = progress_renderer.render_final(elapsed, answer, status=status) - final_md = final_md + f"\n\nresume: `{session_id}`" - final_text, final_entities = render_markdown(final_md) - can_edit_final = ( - progress_id is not None and len(final_text) <= TELEGRAM_TEXT_LIMIT - ) - - if loud_final or not can_edit_final: - _send_markdown( - bot, chat_id=chat_id, text=final_md, reply_to_message_id=user_msg_id - ) - if progress_id is not None: - try: - bot.delete_message(chat_id=chat_id, message_id=progress_id) - except Exception as e: - logger.info( - "[handle] delete progress failed chat_id=%s message_id=%s: %s", - chat_id, - progress_id, - e, - ) - else: - bot.edit_message_text( - chat_id=chat_id, - message_id=progress_id, - text=final_text, - entities=final_entities or None, - ) - - logger.info( - "[handle] done chat_id=%s user_msg_id=%s session_id=%r", - chat_id, - user_msg_id, - session_id, - ) - - while True: - try: - updates = bot.get_updates( - offset=offset, timeout_s=50, allowed_updates=["message"] - ) - except Exception as e: - logger.info("[telegram] get_updates error: %s", e) - time.sleep(2.0) - continue - - for upd in updates: - offset = upd["update_id"] + 1 - msg = upd.get("message") or {} - chat_id = msg.get("chat", {}).get("id") - from_bot = msg.get("from", {}).get("is_bot") - msg_text = msg.get("text") - reply_to = (msg.get("reply_to_message") or {}).get("message_id") - logger.info( - "[telegram] received update_id=%s chat_id=%s from_bot=%s has_text=%s reply_to=%s text=%s", - upd.get("update_id"), - chat_id, - from_bot, - msg_text is not None, - reply_to, - repr(msg_text), - ) - if "text" not in msg: - logger.info( - "[telegram] ignoring non-text message chat_id=%s update_id=%s", - chat_id, - upd.get("update_id"), - ) - continue - - if int(chat_id) != config_chat_id: - logger.info( - "[telegram] rejected by ACL chat_id=%s allowed=%s", - chat_id, - config_chat_id, - ) - continue - - if msg.get("from", {}).get("is_bot"): - logger.info( - "[telegram] ignoring bot message chat_id=%s update_id=%s", - chat_id, - upd.get("update_id"), - ) - continue - - text = msg["text"] - user_msg_id = msg["message_id"] - logger.info( - "[telegram] accepted message chat_id=%s user_msg_id=%s text=%s", - chat_id, - user_msg_id, - repr(text), - ) - - resume_session = extract_session_id(text) - r = msg.get("reply_to_message") or {} - resume_session = resume_session or extract_session_id(r.get("text")) - - pool.submit(handle, chat_id, user_msg_id, text, resume_session) + _run_main_loop(cfg) def main() -> None: