refactor: remove sqlite routing

This commit is contained in:
banteg
2025-12-29 02:38:46 +04:00
parent 171f8b5c21
commit 36f0c7ce42
4 changed files with 28 additions and 138 deletions
+5 -2
View File
@@ -23,7 +23,7 @@ chat_id = 123456789
Optional keys:
- common: `bridge_db`, `allowed_chat_ids`, `startup_chat_ids`
- common: `allowed_chat_ids`, `startup_chat_ids`
- exec/resume: `startup_message`, `codex_cmd`, `codex_workspace`, `codex_exec_args`, `max_workers`, `codex_io_mode`, `codex_command_timeout_s`, `codex_no_child_timeout_s`
## Option 1: exec/resume
@@ -46,11 +46,14 @@ Optional flags:
- `--workdir PATH` (override `codex_workspace`)
- `--model NAME` (pass through to `codex exec`)
To resume an existing thread without a database, reply with (or include) the session id shown at the end of the bot response:
`resume: \`019b66fc-64c2-7a71-81cd-081c504cfeb2\``
## Files
- `src/codex_telegram_bridge/constants.py`: limits and config path constants
- `src/codex_telegram_bridge/config.py`: config loading and chat-id parsing helpers
- `src/codex_telegram_bridge/exec_render.py`: renderers for codex exec JSONL events
- `src/codex_telegram_bridge/rendering.py`: markdown rendering
- `src/codex_telegram_bridge/routes.py`: sqlite routing store
- `src/codex_telegram_bridge/telegram_client.py`: Telegram Bot API client
- `src/codex_telegram_bridge/exec_bridge.py`: codex exec + resume bridge
@@ -7,6 +7,7 @@ from __future__ import annotations
import json
import os
import re
import shlex
import subprocess
import threading
@@ -21,7 +22,6 @@ from .config import load_telegram_config
from .constants import TELEGRAM_HARD_LIMIT
from .exec_render import ExecProgressRenderer, ExecRenderState, render_event_cli
from .rendering import render_markdown
from .routes import RouteStore
from .telegram_client import TelegramClient
# -------------------- Codex runner --------------------
@@ -343,7 +343,6 @@ def run(
setup_file_logger(log_file if log_file else None)
config = load_telegram_config()
token = config["bot_token"]
db_path = config.get("bridge_db", "./bridge_routes.sqlite3")
def _as_int_set(value: Any) -> set[int]:
if isinstance(value, int):
@@ -389,7 +388,6 @@ def run(
extra_args.extend(["-c", "notify=[]"])
bot = TelegramClient(token)
store = RouteStore(db_path)
runner = CodexExecRunner(codex_cmd=codex_cmd, workspace=workspace, extra_args=extra_args)
max_workers = config.get("max_workers")
@@ -484,14 +482,6 @@ def run(
thread_id = evt.get("thread_id")
if isinstance(thread_id, str) and thread_id:
session_box["id"] = thread_id
if progress_id is not None:
store.link(
chat_id,
progress_id,
"exec",
thread_id,
meta={"workspace": workspace},
)
if progress_renderer.note_event(evt) and progress is not None:
elapsed = time.monotonic() - started_at
msg = progress_renderer.render_progress(elapsed)
@@ -515,11 +505,9 @@ def run(
except Exception as e:
_stop_background()
err = _clamp_tg_text(f"Error:\n{e}")
route_id = session_box["id"] or resume_session or "unknown"
if progress_id is not None and len(err) <= TELEGRAM_TEXT_LIMIT:
try:
bot.edit_message_text(chat_id=chat_id, message_id=progress_id, text=err)
store.link(chat_id, progress_id, "exec", route_id, meta={"error": True})
log(
"[handle] error "
f"chat_id={chat_id} user_msg_id={user_msg_id} "
@@ -534,8 +522,6 @@ def run(
text=err,
reply_to_message_id=user_msg_id,
)
for m in sent_msgs:
store.link(chat_id, m["message_id"], "exec", route_id, meta={"error": True})
log(
"[handle] error "
f"chat_id={chat_id} user_msg_id={user_msg_id} resume_session={resume_session!r} err={e}"
@@ -549,6 +535,7 @@ def run(
elapsed = time.monotonic() - started_at
status = "done" if saw_agent_message else "error"
final_md = progress_renderer.render_final(elapsed, answer, status=status)
final_md = final_md + f"\n\nresume: `{session_id}`"
final_text, final_entities = render_markdown(final_md)
can_edit_final = progress_id is not None and len(final_text) <= TELEGRAM_TEXT_LIMIT
@@ -558,8 +545,6 @@ def run(
text=final_md,
reply_to_message_id=user_msg_id,
)
for m in sent_msgs:
store.link(chat_id, m["message_id"], "exec", session_id, meta={"workspace": workspace})
if progress_id is not None:
try:
bot.delete_message(chat_id=chat_id, message_id=progress_id)
@@ -575,7 +560,6 @@ def run(
text=final_text,
entities=final_entities or None,
)
store.link(chat_id, progress_id, "exec", session_id, meta={"workspace": workspace})
log(
"[handle] done "
@@ -632,22 +616,19 @@ def run(
f"chat_id={chat_id} user_msg_id={user_msg_id} text={_one_line(text)}"
)
# If user replied to a bot message, route to that session
resume_session: Optional[str] = None
r = msg.get("reply_to_message")
if r and "message_id" in r:
route = store.resolve(chat_id, r["message_id"])
if route and route.route_type == "exec":
resume_session = route.route_id
log(
"[telegram] resolved reply route "
f"chat_id={chat_id} bot_message_id={r['message_id']} session_id={resume_session!r}"
)
else:
log(
"[telegram] reply has no exec route "
f"chat_id={chat_id} bot_message_id={r['message_id']}"
)
uuid_re = re.compile(
r"(?i)\\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\\b"
)
def _extract_session_id(value: Optional[str]) -> Optional[str]:
if not value:
return None
m = uuid_re.search(value)
return m.group(0) if m else None
resume_session = _extract_session_id(text)
r = msg.get("reply_to_message") or {}
resume_session = resume_session or _extract_session_id(r.get("text"))
pool.submit(handle, chat_id, user_msg_id, text, resume_session)
@@ -1,85 +0,0 @@
from __future__ import annotations
import json
import os
import sqlite3
import time
from dataclasses import dataclass
from typing import Any, Dict, Optional
def _now_unix() -> int:
return int(time.time())
@dataclass(frozen=True)
class Route:
route_type: str # "exec"
route_id: str # session_id
meta: Dict[str, Any]
class RouteStore:
"""
Stores mapping: (chat_id, bot_message_id) -> route
so Telegram replies can be routed.
"""
def __init__(self, path: str) -> None:
os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
self._conn = sqlite3.connect(path, check_same_thread=False)
self._conn.execute("PRAGMA journal_mode=WAL;")
self._conn.execute(
"""
CREATE TABLE IF NOT EXISTS routes (
chat_id INTEGER NOT NULL,
bot_message_id INTEGER NOT NULL,
route_type TEXT NOT NULL,
route_id TEXT NOT NULL,
meta_json TEXT,
created_at INTEGER NOT NULL,
PRIMARY KEY (chat_id, bot_message_id)
);
"""
)
self._conn.execute(
"CREATE INDEX IF NOT EXISTS idx_routes_route_id ON routes(route_id);"
)
self._conn.commit()
def link(
self,
chat_id: int,
bot_message_id: int,
route_type: str,
route_id: str,
meta: Optional[Dict[str, Any]] = None,
) -> None:
meta_json = json.dumps(meta or {}, ensure_ascii=False)
self._conn.execute(
"""
INSERT OR REPLACE INTO routes(chat_id, bot_message_id, route_type, route_id, meta_json, created_at)
VALUES(?, ?, ?, ?, ?, ?)
""",
(chat_id, bot_message_id, route_type, route_id, meta_json, _now_unix()),
)
self._conn.commit()
def resolve(self, chat_id: int, bot_message_id: int) -> Optional[Route]:
cur = self._conn.execute(
"""
SELECT route_type, route_id, meta_json
FROM routes
WHERE chat_id = ? AND bot_message_id = ?
""",
(chat_id, bot_message_id),
)
row = cur.fetchone()
if not row:
return None
route_type, route_id, meta_json = row
try:
meta = json.loads(meta_json) if meta_json else {}
except json.JSONDecodeError:
meta = {}
return Route(route_type=route_type, route_id=route_id, meta=meta)
@@ -116,23 +116,14 @@ class TelegramClient:
rendered_text, entities = render_markdown(text)
limit = min(chunk_len, TELEGRAM_HARD_LIMIT)
if len(rendered_text) > limit:
suffix = "\n" + ELLIPSIS
keep = max(0, limit - len(suffix))
rendered_text = rendered_text[:keep] + suffix
if entities:
trimmed: List[Dict[str, Any]] = []
for ent in entities:
start = int(ent["offset"])
length = int(ent["length"])
if start >= keep:
continue
end = min(start + length, keep)
if end <= start:
continue
d = dict(ent)
d["length"] = end - start
trimmed.append(d)
entities = trimmed
# Keep a tail section to preserve the "resume: `...`" line at the end.
# If we truncate, drop entities to avoid offset gymnastics.
sep = "\n" + ELLIPSIS + "\n"
tail_len = min(400, max(120, limit // 3))
tail = rendered_text[-tail_len:]
head_len = max(0, limit - len(sep) - len(tail))
rendered_text = rendered_text[:head_len] + sep + tail
entities = None
msg = self.send_message(
chat_id=chat_id,