From 245d22eb82fb9b50e90c36866001967a69acd7c3 Mon Sep 17 00:00:00 2001 From: banteg <4562643+banteg@users.noreply.github.com> Date: Sun, 28 Dec 2025 20:30:33 +0400 Subject: [PATCH] feat(exec-bridge): add startup notifications and logging --- codex/codex_telegram_bridge/exec_bridge.py | 99 +++++++++++++++++++++- codex/codex_telegram_bridge/readme.md | 2 + 2 files changed, 98 insertions(+), 3 deletions(-) diff --git a/codex/codex_telegram_bridge/exec_bridge.py b/codex/codex_telegram_bridge/exec_bridge.py index 2afc6f3..2bfd359 100644 --- a/codex/codex_telegram_bridge/exec_bridge.py +++ b/codex/codex_telegram_bridge/exec_bridge.py @@ -14,6 +14,17 @@ from bridge_common import TelegramClient, RouteStore, parse_allowed_chat_ids # -------------------- Codex runner -------------------- +def log(msg: str) -> None: + ts = time.strftime("%Y-%m-%d %H:%M:%S") + print(f"[{ts}] {msg}", flush=True) + + +def _one_line(text: Optional[str]) -> str: + if text is None: + return "None" + return text.replace("\r", "\\r").replace("\n", "\\n") + + class CodexExecRunner: """ Runs Codex in non-interactive mode: @@ -40,6 +51,7 @@ class CodexExecRunner: """ Returns (session_id, final_agent_message_text) """ + log(f"[codex] start run session_id={session_id!r} workspace={self.workspace!r}") args = [self.codex_cmd, "exec", "--json"] args.extend(self.extra_args) if self.workspace: @@ -70,6 +82,7 @@ class CodexExecRunner: def _drain_stderr() -> None: for line in proc.stderr: + log(f"[codex][stderr] {line.rstrip()}") stderr_lines.append(line) t = threading.Thread(target=_drain_stderr, daemon=True) @@ -82,6 +95,7 @@ class CodexExecRunner: line = line.strip() if not line: continue + log(f"[codex][event] {line}") try: evt = json.loads(line) except json.JSONDecodeError: @@ -106,6 +120,7 @@ class CodexExecRunner: if not found_session: raise RuntimeError("codex exec finished but no session_id/thread_id was captured") + log(f"[codex] done run session_id={found_session!r}") return found_session, (last_agent_text or "(No agent_message captured from JSON stream.)") def run_serialized(self, prompt: str, session_id: Optional[str]) -> Tuple[str, str]: @@ -126,11 +141,33 @@ def main() -> None: token = os.environ.get("TELEGRAM_BOT_TOKEN", "") db_path = os.environ.get("BRIDGE_DB", "./bridge_routes.sqlite3") allowed = parse_allowed_chat_ids(os.environ.get("ALLOWED_CHAT_IDS", "")) + startup_ids = parse_allowed_chat_ids(os.environ.get("STARTUP_CHAT_IDS", "")) or allowed + startup_msg = os.environ.get("STARTUP_MESSAGE", "✅ exec_bridge started (codex exec).") + startup_pwd = os.getcwd() + startup_msg = f"{startup_msg}\nPWD: {startup_pwd}" codex_cmd = os.environ.get("CODEX_CMD", "codex") workspace = os.environ.get("CODEX_WORKSPACE") # optional extra_args = shlex.split(os.environ.get("CODEX_EXEC_ARGS", "")) # e.g. "--full-auto --search" + def _has_notify_override(args: list[str]) -> bool: + for i, arg in enumerate(args): + if arg in ("-c", "--config"): + if i + 1 >= len(args): + continue + key = args[i + 1].split("=", 1)[0].strip() + if key == "notify" or key.endswith(".notify"): + return True + elif arg.startswith(("--config=", "-c=")): + key = arg.split("=", 1)[1].split("=", 1)[0].strip() + if key == "notify" or key.endswith(".notify"): + return True + return False + + # Default: disable notify hook for exec-bridge runs to avoid duplicate messages. + if not _has_notify_override(extra_args): + extra_args.extend(["-c", "notify=[]"]) + bot = TelegramClient(token) store = RouteStore(db_path) runner = CodexExecRunner(codex_cmd=codex_cmd, workspace=workspace, extra_args=extra_args) @@ -138,9 +175,23 @@ def main() -> None: pool = ThreadPoolExecutor(max_workers=int(os.environ.get("MAX_WORKERS", "4"))) offset: Optional[int] = None - print("Option1 bridge running (codex exec). Long-polling Telegram...") + log(f"[startup] pwd={startup_pwd}") + log("Option1 bridge running (codex exec). Long-polling Telegram...") + if startup_ids: + for chat_id in startup_ids: + try: + bot.send_message(chat_id=chat_id, text=startup_msg) + log(f"[startup] sent startup message to chat_id={chat_id}") + except Exception as e: + log(f"[startup] failed to send startup message to chat_id={chat_id}: {e}") + else: + log("[startup] no STARTUP_CHAT_IDS or ALLOWED_CHAT_IDS set; skipping startup message") def handle(chat_id: int, user_msg_id: int, text: str, resume_session: Optional[str]) -> None: + log( + "[handle] start " + f"chat_id={chat_id} user_msg_id={user_msg_id} resume_session={resume_session!r}" + ) try: session_id, answer = runner.run_serialized(text, resume_session) sent_msgs = bot.send_message_chunked( @@ -150,35 +201,68 @@ def main() -> None: ) for m in sent_msgs: store.link(chat_id, m["message_id"], "exec", session_id, meta={"workspace": workspace}) + log( + "[handle] done " + f"chat_id={chat_id} user_msg_id={user_msg_id} session_id={session_id!r}" + ) except Exception as e: err = f"❌ Error:\n{e}" sent_msgs = bot.send_message_chunked(chat_id=chat_id, text=err, reply_to_message_id=user_msg_id) for m in sent_msgs: store.link(chat_id, m["message_id"], "exec", resume_session or "unknown", meta={"error": True}) + log( + "[handle] error " + f"chat_id={chat_id} user_msg_id={user_msg_id} resume_session={resume_session!r} err={e}" + ) while True: try: updates = bot.get_updates(offset=offset, timeout_s=50, allowed_updates=["message"]) except Exception as e: - print(f"[telegram] get_updates error: {e}") + log(f"[telegram] get_updates error: {e}") time.sleep(2.0) continue for upd in updates: offset = upd["update_id"] + 1 msg = upd.get("message") or {} + chat_id = msg.get("chat", {}).get("id") + from_bot = msg.get("from", {}).get("is_bot") + msg_text = msg.get("text") + reply_to = (msg.get("reply_to_message") or {}).get("message_id") + log( + "[telegram] received " + f"update_id={upd.get('update_id')} chat_id={chat_id} " + f"from_bot={from_bot} has_text={msg_text is not None} " + f"reply_to={reply_to} text={_one_line(msg_text)}" + ) if "text" not in msg: + log( + "[telegram] ignoring non-text message " + f"chat_id={chat_id} update_id={upd.get('update_id')}" + ) continue - chat_id = msg["chat"]["id"] if allowed is not None and int(chat_id) not in allowed: + log( + "[telegram] rejected by ACL " + f"chat_id={chat_id} allowed={sorted(allowed)}" + ) continue if msg.get("from", {}).get("is_bot"): + log( + "[telegram] ignoring bot message " + f"chat_id={chat_id} update_id={upd.get('update_id')}" + ) continue text = msg["text"] user_msg_id = msg["message_id"] + log( + "[telegram] accepted message " + f"chat_id={chat_id} user_msg_id={user_msg_id} text={_one_line(text)}" + ) # If user replied to a bot message, route to that session resume_session: Optional[str] = None @@ -187,6 +271,15 @@ def main() -> None: route = store.resolve(chat_id, r["message_id"]) if route and route.route_type == "exec": resume_session = route.route_id + log( + "[telegram] resolved reply route " + f"chat_id={chat_id} bot_message_id={r['message_id']} session_id={resume_session!r}" + ) + else: + log( + "[telegram] reply has no exec route " + f"chat_id={chat_id} bot_message_id={r['message_id']}" + ) pool.submit(handle, chat_id, user_msg_id, text, resume_session) diff --git a/codex/codex_telegram_bridge/readme.md b/codex/codex_telegram_bridge/readme.md index 212aa37..11e79b2 100644 --- a/codex/codex_telegram_bridge/readme.md +++ b/codex/codex_telegram_bridge/readme.md @@ -24,6 +24,8 @@ export BRIDGE_DB="./bridge_routes.sqlite3" export CODEX_CMD="codex" export CODEX_WORKSPACE="/path/to/repo" export CODEX_EXEC_ARGS="--full-auto" +export STARTUP_CHAT_IDS="123456789" # optional; defaults to ALLOWED_CHAT_IDS if set +export STARTUP_MESSAGE="✅ exec_bridge started (codex exec)." # optional; PWD is appended uv run exec_bridge.py ```