feat(exec-bridge): add startup notifications and logging
This commit is contained in:
@@ -14,6 +14,17 @@ from bridge_common import TelegramClient, RouteStore, parse_allowed_chat_ids
|
||||
# -------------------- Codex runner --------------------
|
||||
|
||||
|
||||
def log(msg: str) -> None:
|
||||
ts = time.strftime("%Y-%m-%d %H:%M:%S")
|
||||
print(f"[{ts}] {msg}", flush=True)
|
||||
|
||||
|
||||
def _one_line(text: Optional[str]) -> str:
|
||||
if text is None:
|
||||
return "None"
|
||||
return text.replace("\r", "\\r").replace("\n", "\\n")
|
||||
|
||||
|
||||
class CodexExecRunner:
|
||||
"""
|
||||
Runs Codex in non-interactive mode:
|
||||
@@ -40,6 +51,7 @@ class CodexExecRunner:
|
||||
"""
|
||||
Returns (session_id, final_agent_message_text)
|
||||
"""
|
||||
log(f"[codex] start run session_id={session_id!r} workspace={self.workspace!r}")
|
||||
args = [self.codex_cmd, "exec", "--json"]
|
||||
args.extend(self.extra_args)
|
||||
if self.workspace:
|
||||
@@ -70,6 +82,7 @@ class CodexExecRunner:
|
||||
|
||||
def _drain_stderr() -> None:
|
||||
for line in proc.stderr:
|
||||
log(f"[codex][stderr] {line.rstrip()}")
|
||||
stderr_lines.append(line)
|
||||
|
||||
t = threading.Thread(target=_drain_stderr, daemon=True)
|
||||
@@ -82,6 +95,7 @@ class CodexExecRunner:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
log(f"[codex][event] {line}")
|
||||
try:
|
||||
evt = json.loads(line)
|
||||
except json.JSONDecodeError:
|
||||
@@ -106,6 +120,7 @@ class CodexExecRunner:
|
||||
if not found_session:
|
||||
raise RuntimeError("codex exec finished but no session_id/thread_id was captured")
|
||||
|
||||
log(f"[codex] done run session_id={found_session!r}")
|
||||
return found_session, (last_agent_text or "(No agent_message captured from JSON stream.)")
|
||||
|
||||
def run_serialized(self, prompt: str, session_id: Optional[str]) -> Tuple[str, str]:
|
||||
@@ -126,11 +141,33 @@ def main() -> None:
|
||||
token = os.environ.get("TELEGRAM_BOT_TOKEN", "")
|
||||
db_path = os.environ.get("BRIDGE_DB", "./bridge_routes.sqlite3")
|
||||
allowed = parse_allowed_chat_ids(os.environ.get("ALLOWED_CHAT_IDS", ""))
|
||||
startup_ids = parse_allowed_chat_ids(os.environ.get("STARTUP_CHAT_IDS", "")) or allowed
|
||||
startup_msg = os.environ.get("STARTUP_MESSAGE", "✅ exec_bridge started (codex exec).")
|
||||
startup_pwd = os.getcwd()
|
||||
startup_msg = f"{startup_msg}\nPWD: {startup_pwd}"
|
||||
|
||||
codex_cmd = os.environ.get("CODEX_CMD", "codex")
|
||||
workspace = os.environ.get("CODEX_WORKSPACE") # optional
|
||||
extra_args = shlex.split(os.environ.get("CODEX_EXEC_ARGS", "")) # e.g. "--full-auto --search"
|
||||
|
||||
def _has_notify_override(args: list[str]) -> bool:
|
||||
for i, arg in enumerate(args):
|
||||
if arg in ("-c", "--config"):
|
||||
if i + 1 >= len(args):
|
||||
continue
|
||||
key = args[i + 1].split("=", 1)[0].strip()
|
||||
if key == "notify" or key.endswith(".notify"):
|
||||
return True
|
||||
elif arg.startswith(("--config=", "-c=")):
|
||||
key = arg.split("=", 1)[1].split("=", 1)[0].strip()
|
||||
if key == "notify" or key.endswith(".notify"):
|
||||
return True
|
||||
return False
|
||||
|
||||
# Default: disable notify hook for exec-bridge runs to avoid duplicate messages.
|
||||
if not _has_notify_override(extra_args):
|
||||
extra_args.extend(["-c", "notify=[]"])
|
||||
|
||||
bot = TelegramClient(token)
|
||||
store = RouteStore(db_path)
|
||||
runner = CodexExecRunner(codex_cmd=codex_cmd, workspace=workspace, extra_args=extra_args)
|
||||
@@ -138,9 +175,23 @@ def main() -> None:
|
||||
pool = ThreadPoolExecutor(max_workers=int(os.environ.get("MAX_WORKERS", "4")))
|
||||
offset: Optional[int] = None
|
||||
|
||||
print("Option1 bridge running (codex exec). Long-polling Telegram...")
|
||||
log(f"[startup] pwd={startup_pwd}")
|
||||
log("Option1 bridge running (codex exec). Long-polling Telegram...")
|
||||
if startup_ids:
|
||||
for chat_id in startup_ids:
|
||||
try:
|
||||
bot.send_message(chat_id=chat_id, text=startup_msg)
|
||||
log(f"[startup] sent startup message to chat_id={chat_id}")
|
||||
except Exception as e:
|
||||
log(f"[startup] failed to send startup message to chat_id={chat_id}: {e}")
|
||||
else:
|
||||
log("[startup] no STARTUP_CHAT_IDS or ALLOWED_CHAT_IDS set; skipping startup message")
|
||||
|
||||
def handle(chat_id: int, user_msg_id: int, text: str, resume_session: Optional[str]) -> None:
|
||||
log(
|
||||
"[handle] start "
|
||||
f"chat_id={chat_id} user_msg_id={user_msg_id} resume_session={resume_session!r}"
|
||||
)
|
||||
try:
|
||||
session_id, answer = runner.run_serialized(text, resume_session)
|
||||
sent_msgs = bot.send_message_chunked(
|
||||
@@ -150,35 +201,68 @@ def main() -> None:
|
||||
)
|
||||
for m in sent_msgs:
|
||||
store.link(chat_id, m["message_id"], "exec", session_id, meta={"workspace": workspace})
|
||||
log(
|
||||
"[handle] done "
|
||||
f"chat_id={chat_id} user_msg_id={user_msg_id} session_id={session_id!r}"
|
||||
)
|
||||
except Exception as e:
|
||||
err = f"❌ Error:\n{e}"
|
||||
sent_msgs = bot.send_message_chunked(chat_id=chat_id, text=err, reply_to_message_id=user_msg_id)
|
||||
for m in sent_msgs:
|
||||
store.link(chat_id, m["message_id"], "exec", resume_session or "unknown", meta={"error": True})
|
||||
log(
|
||||
"[handle] error "
|
||||
f"chat_id={chat_id} user_msg_id={user_msg_id} resume_session={resume_session!r} err={e}"
|
||||
)
|
||||
|
||||
while True:
|
||||
try:
|
||||
updates = bot.get_updates(offset=offset, timeout_s=50, allowed_updates=["message"])
|
||||
except Exception as e:
|
||||
print(f"[telegram] get_updates error: {e}")
|
||||
log(f"[telegram] get_updates error: {e}")
|
||||
time.sleep(2.0)
|
||||
continue
|
||||
|
||||
for upd in updates:
|
||||
offset = upd["update_id"] + 1
|
||||
msg = upd.get("message") or {}
|
||||
chat_id = msg.get("chat", {}).get("id")
|
||||
from_bot = msg.get("from", {}).get("is_bot")
|
||||
msg_text = msg.get("text")
|
||||
reply_to = (msg.get("reply_to_message") or {}).get("message_id")
|
||||
log(
|
||||
"[telegram] received "
|
||||
f"update_id={upd.get('update_id')} chat_id={chat_id} "
|
||||
f"from_bot={from_bot} has_text={msg_text is not None} "
|
||||
f"reply_to={reply_to} text={_one_line(msg_text)}"
|
||||
)
|
||||
if "text" not in msg:
|
||||
log(
|
||||
"[telegram] ignoring non-text message "
|
||||
f"chat_id={chat_id} update_id={upd.get('update_id')}"
|
||||
)
|
||||
continue
|
||||
|
||||
chat_id = msg["chat"]["id"]
|
||||
if allowed is not None and int(chat_id) not in allowed:
|
||||
log(
|
||||
"[telegram] rejected by ACL "
|
||||
f"chat_id={chat_id} allowed={sorted(allowed)}"
|
||||
)
|
||||
continue
|
||||
|
||||
if msg.get("from", {}).get("is_bot"):
|
||||
log(
|
||||
"[telegram] ignoring bot message "
|
||||
f"chat_id={chat_id} update_id={upd.get('update_id')}"
|
||||
)
|
||||
continue
|
||||
|
||||
text = msg["text"]
|
||||
user_msg_id = msg["message_id"]
|
||||
log(
|
||||
"[telegram] accepted message "
|
||||
f"chat_id={chat_id} user_msg_id={user_msg_id} text={_one_line(text)}"
|
||||
)
|
||||
|
||||
# If user replied to a bot message, route to that session
|
||||
resume_session: Optional[str] = None
|
||||
@@ -187,6 +271,15 @@ def main() -> None:
|
||||
route = store.resolve(chat_id, r["message_id"])
|
||||
if route and route.route_type == "exec":
|
||||
resume_session = route.route_id
|
||||
log(
|
||||
"[telegram] resolved reply route "
|
||||
f"chat_id={chat_id} bot_message_id={r['message_id']} session_id={resume_session!r}"
|
||||
)
|
||||
else:
|
||||
log(
|
||||
"[telegram] reply has no exec route "
|
||||
f"chat_id={chat_id} bot_message_id={r['message_id']}"
|
||||
)
|
||||
|
||||
pool.submit(handle, chat_id, user_msg_id, text, resume_session)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user