refactor: extract bridge config and loop

This commit is contained in:
banteg
2025-12-29 03:43:39 +04:00
parent 5ed3cb191a
commit ee69010587
@@ -10,6 +10,7 @@ import threading
import time
import logging
import sys
from dataclasses import dataclass
from weakref import WeakValueDictionary
from logging.handlers import RotatingFileHandler
from concurrent.futures import ThreadPoolExecutor
@@ -336,6 +337,220 @@ class CodexExecRunner:
# -------------------- Telegram loop --------------------
@dataclass(frozen=True)
class BridgeConfig:
bot: TelegramClient
runner: CodexExecRunner
chat_id: int
pool: ThreadPoolExecutor
ignore_backlog: bool
progress_edit_every_s: float
progress_silent: bool
final_notify: bool
startup_msg: str
def _parse_bridge_config(
*,
progress_edit_every_s: float,
progress_silent: bool,
final_notify: bool,
ignore_backlog: bool,
cd: str | None,
model: str | None,
) -> BridgeConfig:
config = load_telegram_config()
token = config["bot_token"]
chat_id = int(config["chat_id"])
startup_pwd = os.getcwd()
startup_msg = f"codex exec bridge has started\npwd: {startup_pwd}"
codex_cmd = shutil.which("codex")
if not codex_cmd:
raise RuntimeError("codex not found on PATH")
workspace = cd if cd is not None else config.get("cd")
raw_exec_args = config.get("codex_exec_args", "")
if isinstance(raw_exec_args, list):
extra_args = [str(v) for v in raw_exec_args]
else:
extra_args = shlex.split(str(raw_exec_args)) # e.g. "--full-auto --search"
if model:
extra_args.extend(["--model", model])
def _has_notify_override(args: list[str]) -> bool:
for i, arg in enumerate(args):
if arg in ("-c", "--config"):
key = args[i + 1].split("=", 1)[0].strip()
if key == "notify" or key.endswith(".notify"):
return True
elif arg.startswith(("--config=", "-c=")):
key = arg.split("=", 1)[1].split("=", 1)[0].strip()
if key == "notify" or key.endswith(".notify"):
return True
return False
if not _has_notify_override(extra_args):
extra_args.extend(["-c", "notify=[]"])
bot = TelegramClient(token)
runner = CodexExecRunner(codex_cmd=codex_cmd, workspace=workspace, extra_args=extra_args)
pool = ThreadPoolExecutor(max_workers=16)
return BridgeConfig(
bot=bot,
runner=runner,
chat_id=chat_id,
pool=pool,
ignore_backlog=bool(ignore_backlog),
progress_edit_every_s=progress_edit_every_s,
progress_silent=progress_silent,
final_notify=final_notify,
startup_msg=startup_msg,
)
def _send_startup(cfg: BridgeConfig) -> None:
try:
cfg.bot.send_message(chat_id=cfg.chat_id, text=cfg.startup_msg)
logger.info("[startup] sent startup message to chat_id=%s", cfg.chat_id)
except Exception as e:
logger.info("[startup] failed to send startup message to chat_id=%s: %s", cfg.chat_id, e)
def _drain_backlog(cfg: BridgeConfig, offset: int | None) -> int | None:
if not cfg.ignore_backlog:
return offset
try:
updates = cfg.bot.get_updates(offset=offset, timeout_s=0, allowed_updates=["message"])
except Exception as e:
logger.info("[startup] backlog drain failed: %s", e)
return offset
if updates:
offset = updates[-1]["update_id"] + 1
logger.info("[startup] drained %s pending update(s)", len(updates))
return offset
def _handle_message(
cfg: BridgeConfig,
*,
chat_id: int,
user_msg_id: int,
text: str,
resume_session: str | None,
) -> None:
started_at = time.monotonic()
progress_renderer = ExecProgressRenderer(max_actions=5)
progress_id: int | None = None
progress: ProgressEditor | None = None
try:
initial_text = progress_renderer.render_progress(0.0)
initial_rendered, initial_entities = render_markdown(initial_text)
progress_msg = cfg.bot.send_message(
chat_id=chat_id,
text=initial_rendered,
entities=initial_entities or None,
reply_to_message_id=user_msg_id,
disable_notification=cfg.progress_silent,
)
progress_id = int(progress_msg["message_id"])
logger.debug("[progress] sent chat_id=%s message_id=%s", chat_id, progress_id)
except Exception as e:
logger.info("[handle] failed to send progress message chat_id=%s: %s", chat_id, e)
if progress_id is not None:
progress = ProgressEditor(
cfg.bot,
chat_id,
progress_id,
cfg.progress_edit_every_s,
initial_text=initial_rendered,
initial_entities=initial_entities or None,
)
def on_event(evt: dict[str, Any]) -> None:
if progress_renderer.note_event(evt) and progress is not None:
elapsed = time.monotonic() - started_at
progress.set_markdown(progress_renderer.render_progress(elapsed))
def _stop_background() -> None:
if progress is not None:
progress.stop()
try:
session_id, answer, saw_agent_message = cfg.runner.run_serialized(
text,
resume_session,
on_event=on_event,
)
except Exception as e:
_stop_background()
err = _clamp_tg_text(f"Error:\n{e}")
if progress_id is not None and len(err) <= TELEGRAM_TEXT_LIMIT:
cfg.bot.edit_message_text(chat_id=chat_id, message_id=progress_id, text=err)
return
_send_markdown(cfg.bot, chat_id=chat_id, text=err, reply_to_message_id=user_msg_id)
return
_stop_background()
answer = answer or "(No agent_message captured from JSON stream.)"
elapsed = time.monotonic() - started_at
status = "done" if saw_agent_message else "error"
final_md = progress_renderer.render_final(elapsed, answer, status=status) + f"\n\nresume: `{session_id}`"
final_text, final_entities = render_markdown(final_md)
can_edit_final = progress_id is not None and len(final_text) <= TELEGRAM_TEXT_LIMIT
if cfg.final_notify or not can_edit_final:
_send_markdown(cfg.bot, chat_id=chat_id, text=final_md, reply_to_message_id=user_msg_id)
if progress_id is not None:
cfg.bot.delete_message(chat_id=chat_id, message_id=progress_id)
else:
cfg.bot.edit_message_text(
chat_id=chat_id,
message_id=progress_id,
text=final_text,
entities=final_entities or None,
)
def _run_main_loop(cfg: BridgeConfig) -> None:
offset: int | None = None
offset = _drain_backlog(cfg, offset)
_send_startup(cfg)
while True:
updates = cfg.bot.get_updates(offset=offset, timeout_s=50, allowed_updates=["message"])
for upd in updates:
offset = upd["update_id"] + 1
msg = upd.get("message") or {}
msg_chat_id = msg.get("chat", {}).get("id")
if "text" not in msg:
continue
if int(msg_chat_id) != cfg.chat_id:
continue
if msg.get("from", {}).get("is_bot"):
continue
text = msg["text"]
user_msg_id = msg["message_id"]
resume_session = extract_session_id(text)
r = msg.get("reply_to_message") or {}
resume_session = resume_session or extract_session_id(r.get("text"))
cfg.pool.submit(
_handle_message,
cfg,
chat_id=msg_chat_id,
user_msg_id=user_msg_id,
text=text,
resume_session=resume_session,
)
def run(
progress_edit_every_s: float = typer.Option(
@@ -376,292 +591,15 @@ def run(
),
) -> None:
setup_logging(log_file if log_file else None)
config = load_telegram_config()
token = config["bot_token"]
config_chat_id = int(config["chat_id"])
startup_pwd = os.getcwd()
startup_msg = f"codex exec bridge has started\npwd: {startup_pwd}"
codex_cmd = shutil.which("codex")
if not codex_cmd:
raise RuntimeError("codex not found on PATH")
workspace = cd if cd is not None else config.get("cd")
raw_exec_args = config.get("codex_exec_args", "")
if isinstance(raw_exec_args, list):
extra_args = [str(v) for v in raw_exec_args]
else:
extra_args = shlex.split(str(raw_exec_args)) # e.g. "--full-auto --search"
if model:
extra_args.extend(["--model", model])
def _has_notify_override(args: list[str]) -> bool:
for i, arg in enumerate(args):
if arg in ("-c", "--config"):
if i + 1 >= len(args):
continue
key = args[i + 1].split("=", 1)[0].strip()
if key == "notify" or key.endswith(".notify"):
return True
elif arg.startswith(("--config=", "-c=")):
key = arg.split("=", 1)[1].split("=", 1)[0].strip()
if key == "notify" or key.endswith(".notify"):
return True
return False
# Default: disable notify hook for exec-bridge runs to avoid duplicate messages.
if not _has_notify_override(extra_args):
extra_args.extend(["-c", "notify=[]"])
bot = TelegramClient(token)
runner = CodexExecRunner(
codex_cmd=codex_cmd, workspace=workspace, extra_args=extra_args
cfg = _parse_bridge_config(
progress_edit_every_s=progress_edit_every_s,
progress_silent=progress_silent,
final_notify=final_notify,
ignore_backlog=ignore_backlog,
cd=cd,
model=model,
)
pool = ThreadPoolExecutor(max_workers=16)
offset: int | None = None
ignore_backlog = bool(ignore_backlog)
if ignore_backlog:
try:
updates = bot.get_updates(
offset=offset, timeout_s=0, allowed_updates=["message"]
)
except Exception as e:
logger.info("[startup] backlog drain failed: %s", e)
updates = []
if updates:
offset = updates[-1]["update_id"] + 1
logger.info("[startup] drained %s pending update(s)", len(updates))
logger.info("[startup] pwd=%s", startup_pwd)
logger.info("Option1 bridge running (codex exec). Long-polling Telegram...")
try:
bot.send_message(chat_id=config_chat_id, text=startup_msg)
logger.info("[startup] sent startup message to chat_id=%s", config_chat_id)
except Exception as e:
logger.info("[startup] failed to send startup message to chat_id=%s: %s", config_chat_id, e)
def handle(
chat_id: int, user_msg_id: int, text: str, resume_session: str | None
) -> None:
logger.info(
"[handle] start chat_id=%s user_msg_id=%s resume_session=%r",
chat_id,
user_msg_id,
resume_session,
)
logger.debug(
"[handle] thread name=%s ident=%s",
threading.current_thread().name,
threading.get_ident(),
)
edit_every_s = progress_edit_every_s
silent_progress = progress_silent
loud_final = final_notify
started_at = time.monotonic()
session_box: dict[str, str | None] = {"id": resume_session}
progress_renderer = ExecProgressRenderer(max_actions=5)
progress_id: int | None = None
progress: ProgressEditor | None = None
try:
initial_text = progress_renderer.render_progress(0.0)
initial_rendered, initial_entities = render_markdown(initial_text)
progress_msg = bot.send_message(
chat_id=chat_id,
text=initial_rendered,
entities=initial_entities or None,
reply_to_message_id=user_msg_id,
disable_notification=silent_progress,
)
progress_id = int(progress_msg["message_id"])
logger.debug(
"[progress] sent chat_id=%s message_id=%s", chat_id, progress_id
)
except Exception as e:
logger.info(
"[handle] failed to send progress message chat_id=%s: %s", chat_id, e
)
if progress_id is not None:
progress = ProgressEditor(
bot,
chat_id,
progress_id,
edit_every_s,
initial_text=initial_rendered,
initial_entities=initial_entities or None,
)
def on_event(evt: dict[str, Any]) -> None:
event_type = evt.get("type")
item = evt.get("item") or {}
logger.debug(
"[codex] event type=%s item_id=%s item_type=%s status=%s",
event_type,
item.get("id"),
item.get("type"),
item.get("status"),
)
if event_type == "thread.started":
thread_id = evt.get("thread_id")
if isinstance(thread_id, str) and thread_id:
session_box["id"] = thread_id
if progress_renderer.note_event(evt) and progress is not None:
elapsed = time.monotonic() - started_at
msg = progress_renderer.render_progress(elapsed)
progress.set_markdown(msg)
def _stop_background() -> None:
if progress is not None:
progress.stop()
logger.debug("[progress] thread stopped")
try:
session_id, answer, saw_agent_message = runner.run_serialized(
text,
resume_session,
on_event=on_event,
)
except Exception as e:
_stop_background()
err = _clamp_tg_text(f"Error:\n{e}")
if progress_id is not None and len(err) <= TELEGRAM_TEXT_LIMIT:
try:
bot.edit_message_text(
chat_id=chat_id, message_id=progress_id, text=err
)
logger.info(
"[handle] error chat_id=%s user_msg_id=%s resume_session=%r err=%s",
chat_id,
user_msg_id,
resume_session,
e,
)
return
except Exception as ee:
logger.info("[handle] failed to edit progress into error: %s", ee)
_send_markdown(
bot, chat_id=chat_id, text=err, reply_to_message_id=user_msg_id
)
logger.info(
"[handle] error chat_id=%s user_msg_id=%s resume_session=%r err=%s",
chat_id,
user_msg_id,
resume_session,
e,
)
return
_stop_background()
answer = answer or "(No agent_message captured from JSON stream.)"
elapsed = time.monotonic() - started_at
status = "done" if saw_agent_message else "error"
final_md = progress_renderer.render_final(elapsed, answer, status=status)
final_md = final_md + f"\n\nresume: `{session_id}`"
final_text, final_entities = render_markdown(final_md)
can_edit_final = (
progress_id is not None and len(final_text) <= TELEGRAM_TEXT_LIMIT
)
if loud_final or not can_edit_final:
_send_markdown(
bot, chat_id=chat_id, text=final_md, reply_to_message_id=user_msg_id
)
if progress_id is not None:
try:
bot.delete_message(chat_id=chat_id, message_id=progress_id)
except Exception as e:
logger.info(
"[handle] delete progress failed chat_id=%s message_id=%s: %s",
chat_id,
progress_id,
e,
)
else:
bot.edit_message_text(
chat_id=chat_id,
message_id=progress_id,
text=final_text,
entities=final_entities or None,
)
logger.info(
"[handle] done chat_id=%s user_msg_id=%s session_id=%r",
chat_id,
user_msg_id,
session_id,
)
while True:
try:
updates = bot.get_updates(
offset=offset, timeout_s=50, allowed_updates=["message"]
)
except Exception as e:
logger.info("[telegram] get_updates error: %s", e)
time.sleep(2.0)
continue
for upd in updates:
offset = upd["update_id"] + 1
msg = upd.get("message") or {}
chat_id = msg.get("chat", {}).get("id")
from_bot = msg.get("from", {}).get("is_bot")
msg_text = msg.get("text")
reply_to = (msg.get("reply_to_message") or {}).get("message_id")
logger.info(
"[telegram] received update_id=%s chat_id=%s from_bot=%s has_text=%s reply_to=%s text=%s",
upd.get("update_id"),
chat_id,
from_bot,
msg_text is not None,
reply_to,
repr(msg_text),
)
if "text" not in msg:
logger.info(
"[telegram] ignoring non-text message chat_id=%s update_id=%s",
chat_id,
upd.get("update_id"),
)
continue
if int(chat_id) != config_chat_id:
logger.info(
"[telegram] rejected by ACL chat_id=%s allowed=%s",
chat_id,
config_chat_id,
)
continue
if msg.get("from", {}).get("is_bot"):
logger.info(
"[telegram] ignoring bot message chat_id=%s update_id=%s",
chat_id,
upd.get("update_id"),
)
continue
text = msg["text"]
user_msg_id = msg["message_id"]
logger.info(
"[telegram] accepted message chat_id=%s user_msg_id=%s text=%s",
chat_id,
user_msg_id,
repr(text),
)
resume_session = extract_session_id(text)
r = msg.get("reply_to_message") or {}
resume_session = resume_session or extract_session_id(r.get("text"))
pool.submit(handle, chat_id, user_msg_id, text, resume_session)
_run_main_loop(cfg)
def main() -> None: