From 36f0c7ce427b3a0c087c82a3173270e28026bfcf Mon Sep 17 00:00:00 2001 From: banteg <4562643+banteg@users.noreply.github.com> Date: Mon, 29 Dec 2025 02:38:46 +0400 Subject: [PATCH] refactor: remove sqlite routing --- codex_telegram_bridge/readme.md | 7 +- .../src/codex_telegram_bridge/exec_bridge.py | 49 ++++------- .../src/codex_telegram_bridge/routes.py | 85 ------------------- .../codex_telegram_bridge/telegram_client.py | 25 ++---- 4 files changed, 28 insertions(+), 138 deletions(-) delete mode 100644 codex_telegram_bridge/src/codex_telegram_bridge/routes.py diff --git a/codex_telegram_bridge/readme.md b/codex_telegram_bridge/readme.md index a40892a..393ab5f 100644 --- a/codex_telegram_bridge/readme.md +++ b/codex_telegram_bridge/readme.md @@ -23,7 +23,7 @@ chat_id = 123456789 Optional keys: -- common: `bridge_db`, `allowed_chat_ids`, `startup_chat_ids` +- common: `allowed_chat_ids`, `startup_chat_ids` - exec/resume: `startup_message`, `codex_cmd`, `codex_workspace`, `codex_exec_args`, `max_workers`, `codex_io_mode`, `codex_command_timeout_s`, `codex_no_child_timeout_s` ## Option 1: exec/resume @@ -46,11 +46,14 @@ Optional flags: - `--workdir PATH` (override `codex_workspace`) - `--model NAME` (pass through to `codex exec`) +To resume an existing thread without a database, reply with (or include) the session id shown at the end of the bot response: + +`resume: \`019b66fc-64c2-7a71-81cd-081c504cfeb2\`` + ## Files - `src/codex_telegram_bridge/constants.py`: limits and config path constants - `src/codex_telegram_bridge/config.py`: config loading and chat-id parsing helpers - `src/codex_telegram_bridge/exec_render.py`: renderers for codex exec JSONL events - `src/codex_telegram_bridge/rendering.py`: markdown rendering -- `src/codex_telegram_bridge/routes.py`: sqlite routing store - `src/codex_telegram_bridge/telegram_client.py`: Telegram Bot API client - `src/codex_telegram_bridge/exec_bridge.py`: codex exec + resume bridge diff --git a/codex_telegram_bridge/src/codex_telegram_bridge/exec_bridge.py b/codex_telegram_bridge/src/codex_telegram_bridge/exec_bridge.py index eb8784a..315f4b7 100644 --- a/codex_telegram_bridge/src/codex_telegram_bridge/exec_bridge.py +++ b/codex_telegram_bridge/src/codex_telegram_bridge/exec_bridge.py @@ -7,6 +7,7 @@ from __future__ import annotations import json import os +import re import shlex import subprocess import threading @@ -21,7 +22,6 @@ from .config import load_telegram_config from .constants import TELEGRAM_HARD_LIMIT from .exec_render import ExecProgressRenderer, ExecRenderState, render_event_cli from .rendering import render_markdown -from .routes import RouteStore from .telegram_client import TelegramClient # -------------------- Codex runner -------------------- @@ -343,7 +343,6 @@ def run( setup_file_logger(log_file if log_file else None) config = load_telegram_config() token = config["bot_token"] - db_path = config.get("bridge_db", "./bridge_routes.sqlite3") def _as_int_set(value: Any) -> set[int]: if isinstance(value, int): @@ -389,7 +388,6 @@ def run( extra_args.extend(["-c", "notify=[]"]) bot = TelegramClient(token) - store = RouteStore(db_path) runner = CodexExecRunner(codex_cmd=codex_cmd, workspace=workspace, extra_args=extra_args) max_workers = config.get("max_workers") @@ -484,14 +482,6 @@ def run( thread_id = evt.get("thread_id") if isinstance(thread_id, str) and thread_id: session_box["id"] = thread_id - if progress_id is not None: - store.link( - chat_id, - progress_id, - "exec", - thread_id, - meta={"workspace": workspace}, - ) if progress_renderer.note_event(evt) and progress is not None: elapsed = time.monotonic() - started_at msg = progress_renderer.render_progress(elapsed) @@ -515,11 +505,9 @@ def run( except Exception as e: _stop_background() err = _clamp_tg_text(f"Error:\n{e}") - route_id = session_box["id"] or resume_session or "unknown" if progress_id is not None and len(err) <= TELEGRAM_TEXT_LIMIT: try: bot.edit_message_text(chat_id=chat_id, message_id=progress_id, text=err) - store.link(chat_id, progress_id, "exec", route_id, meta={"error": True}) log( "[handle] error " f"chat_id={chat_id} user_msg_id={user_msg_id} " @@ -534,8 +522,6 @@ def run( text=err, reply_to_message_id=user_msg_id, ) - for m in sent_msgs: - store.link(chat_id, m["message_id"], "exec", route_id, meta={"error": True}) log( "[handle] error " f"chat_id={chat_id} user_msg_id={user_msg_id} resume_session={resume_session!r} err={e}" @@ -549,6 +535,7 @@ def run( elapsed = time.monotonic() - started_at status = "done" if saw_agent_message else "error" final_md = progress_renderer.render_final(elapsed, answer, status=status) + final_md = final_md + f"\n\nresume: `{session_id}`" final_text, final_entities = render_markdown(final_md) can_edit_final = progress_id is not None and len(final_text) <= TELEGRAM_TEXT_LIMIT @@ -558,8 +545,6 @@ def run( text=final_md, reply_to_message_id=user_msg_id, ) - for m in sent_msgs: - store.link(chat_id, m["message_id"], "exec", session_id, meta={"workspace": workspace}) if progress_id is not None: try: bot.delete_message(chat_id=chat_id, message_id=progress_id) @@ -575,7 +560,6 @@ def run( text=final_text, entities=final_entities or None, ) - store.link(chat_id, progress_id, "exec", session_id, meta={"workspace": workspace}) log( "[handle] done " @@ -632,22 +616,19 @@ def run( f"chat_id={chat_id} user_msg_id={user_msg_id} text={_one_line(text)}" ) - # If user replied to a bot message, route to that session - resume_session: Optional[str] = None - r = msg.get("reply_to_message") - if r and "message_id" in r: - route = store.resolve(chat_id, r["message_id"]) - if route and route.route_type == "exec": - resume_session = route.route_id - log( - "[telegram] resolved reply route " - f"chat_id={chat_id} bot_message_id={r['message_id']} session_id={resume_session!r}" - ) - else: - log( - "[telegram] reply has no exec route " - f"chat_id={chat_id} bot_message_id={r['message_id']}" - ) + uuid_re = re.compile( + r"(?i)\\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\\b" + ) + + def _extract_session_id(value: Optional[str]) -> Optional[str]: + if not value: + return None + m = uuid_re.search(value) + return m.group(0) if m else None + + resume_session = _extract_session_id(text) + r = msg.get("reply_to_message") or {} + resume_session = resume_session or _extract_session_id(r.get("text")) pool.submit(handle, chat_id, user_msg_id, text, resume_session) diff --git a/codex_telegram_bridge/src/codex_telegram_bridge/routes.py b/codex_telegram_bridge/src/codex_telegram_bridge/routes.py deleted file mode 100644 index 515b1ab..0000000 --- a/codex_telegram_bridge/src/codex_telegram_bridge/routes.py +++ /dev/null @@ -1,85 +0,0 @@ -from __future__ import annotations - -import json -import os -import sqlite3 -import time -from dataclasses import dataclass -from typing import Any, Dict, Optional - - -def _now_unix() -> int: - return int(time.time()) - - -@dataclass(frozen=True) -class Route: - route_type: str # "exec" - route_id: str # session_id - meta: Dict[str, Any] - - -class RouteStore: - """ - Stores mapping: (chat_id, bot_message_id) -> route - so Telegram replies can be routed. - """ - - def __init__(self, path: str) -> None: - os.makedirs(os.path.dirname(path) or ".", exist_ok=True) - self._conn = sqlite3.connect(path, check_same_thread=False) - self._conn.execute("PRAGMA journal_mode=WAL;") - self._conn.execute( - """ - CREATE TABLE IF NOT EXISTS routes ( - chat_id INTEGER NOT NULL, - bot_message_id INTEGER NOT NULL, - route_type TEXT NOT NULL, - route_id TEXT NOT NULL, - meta_json TEXT, - created_at INTEGER NOT NULL, - PRIMARY KEY (chat_id, bot_message_id) - ); - """ - ) - self._conn.execute( - "CREATE INDEX IF NOT EXISTS idx_routes_route_id ON routes(route_id);" - ) - self._conn.commit() - - def link( - self, - chat_id: int, - bot_message_id: int, - route_type: str, - route_id: str, - meta: Optional[Dict[str, Any]] = None, - ) -> None: - meta_json = json.dumps(meta or {}, ensure_ascii=False) - self._conn.execute( - """ - INSERT OR REPLACE INTO routes(chat_id, bot_message_id, route_type, route_id, meta_json, created_at) - VALUES(?, ?, ?, ?, ?, ?) - """, - (chat_id, bot_message_id, route_type, route_id, meta_json, _now_unix()), - ) - self._conn.commit() - - def resolve(self, chat_id: int, bot_message_id: int) -> Optional[Route]: - cur = self._conn.execute( - """ - SELECT route_type, route_id, meta_json - FROM routes - WHERE chat_id = ? AND bot_message_id = ? - """, - (chat_id, bot_message_id), - ) - row = cur.fetchone() - if not row: - return None - route_type, route_id, meta_json = row - try: - meta = json.loads(meta_json) if meta_json else {} - except json.JSONDecodeError: - meta = {} - return Route(route_type=route_type, route_id=route_id, meta=meta) diff --git a/codex_telegram_bridge/src/codex_telegram_bridge/telegram_client.py b/codex_telegram_bridge/src/codex_telegram_bridge/telegram_client.py index 0303c43..2cbf805 100644 --- a/codex_telegram_bridge/src/codex_telegram_bridge/telegram_client.py +++ b/codex_telegram_bridge/src/codex_telegram_bridge/telegram_client.py @@ -116,23 +116,14 @@ class TelegramClient: rendered_text, entities = render_markdown(text) limit = min(chunk_len, TELEGRAM_HARD_LIMIT) if len(rendered_text) > limit: - suffix = "\n" + ELLIPSIS - keep = max(0, limit - len(suffix)) - rendered_text = rendered_text[:keep] + suffix - if entities: - trimmed: List[Dict[str, Any]] = [] - for ent in entities: - start = int(ent["offset"]) - length = int(ent["length"]) - if start >= keep: - continue - end = min(start + length, keep) - if end <= start: - continue - d = dict(ent) - d["length"] = end - start - trimmed.append(d) - entities = trimmed + # Keep a tail section to preserve the "resume: `...`" line at the end. + # If we truncate, drop entities to avoid offset gymnastics. + sep = "\n" + ELLIPSIS + "\n" + tail_len = min(400, max(120, limit // 3)) + tail = rendered_text[-tail_len:] + head_len = max(0, limit - len(sep) - len(tail)) + rendered_text = rendered_text[:head_len] + sep + tail + entities = None msg = self.send_message( chat_id=chat_id,