chore: move telegram projects to top-level

This commit is contained in:
banteg
2025-12-28 21:31:48 +04:00
parent 5f4f97f529
commit 39c0bcec45
11 changed files with 2 additions and 2 deletions
+27
View File
@@ -0,0 +1,27 @@
[project]
name = "codex-telegram-bridge"
version = "0.1.0"
description = "Telegram bridge tools for Codex."
readme = "readme.md"
requires-python = ">=3.10"
dependencies = [
"markdown-it-py",
"sulguk",
"tomli; python_version < '3.11'",
]
[project.scripts]
codex-telegram-exec-bridge = "codex_telegram_bridge.exec_bridge:main"
codex-telegram-mcp-bridge = "codex_telegram_bridge.mcp_bridge:main"
codex-telegram-tmux-notify = "codex_telegram_bridge.tmux_notify:main"
codex-telegram-tmux-reply = "codex_telegram_bridge.tmux_reply_bot:main"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src/codex_telegram_bridge"]
[tool.hatch.build.targets.sdist]
include = ["src/codex_telegram_bridge", "readme.md"]
+71
View File
@@ -0,0 +1,71 @@
# Telegram Codex Bridge (Codex)
Route Telegram replies back into Codex sessions. Includes three options:
1. Non-interactive `codex exec` + `codex exec resume`.
2. `codex mcp-server` with MCP stdio JSON-RPC.
3. tmux injection for interactive Codex sessions.
All options store a mapping from `(chat_id, bot_message_id)` to a route so replies can be routed correctly.
## Install
1. Ensure `uv` is installed.
2. From this folder, run the entrypoints with `uv run` (uses `pyproject.toml` deps).
3. Put your Telegram credentials in `~/.codex/telegram.toml`.
Example `~/.codex/telegram.toml`:
```toml
bot_token = "123:abc"
chat_id = 123456789
```
For Python < 3.11, install `tomli` to read TOML. `chat_id` is used both for allowed messages
and startup notifications.
Optional keys (by mode):
- common: `bridge_db`, `allowed_chat_ids`, `startup_chat_ids`
- exec/resume: `startup_message`, `codex_cmd`, `codex_workspace`, `codex_exec_args`, `max_workers`
- MCP server: `codex_mcp_cmd`, `codex_workspace`, `codex_sandbox`, `codex_approval_policy`
## Option 1: exec/resume
Run:
```bash
uv run codex-telegram-exec-bridge
```
## Option 2: MCP server
Run:
```bash
uv run codex-telegram-mcp-bridge
```
## Option 3: tmux
Reply injector:
```bash
uv run codex-telegram-tmux-reply
```
Notifier (call from your existing hook):
```bash
uv run codex-telegram-tmux-notify --tmux-target "codex1:0.0" --text "$TURN_TEXT"
```
Add `--chat-id` if `chat_id` is not set in `~/.codex/telegram.toml`.
## Files
- `src/codex_telegram_bridge/bridge_common.py`: shared Telegram client, chunking, and routing store
- `src/codex_telegram_bridge/exec_bridge.py`: codex exec + resume bridge
- `src/codex_telegram_bridge/mcp_bridge.py`: MCP stdio JSON-RPC bridge
- `src/codex_telegram_bridge/tmux_notify.py`: tmux notifier helper
- `src/codex_telegram_bridge/tmux_reply_bot.py`: tmux reply injector
@@ -0,0 +1,2 @@
"""Telegram Codex bridge package."""
@@ -0,0 +1,442 @@
from __future__ import annotations
import json
import os
import re
import sqlite3
import time
import urllib.error
import urllib.request
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from markdown_it import MarkdownIt
from sulguk import transform_html
TELEGRAM_HARD_LIMIT = 4096
DEFAULT_CHUNK_LEN = 3500 # leave room for formatting / safety
TELEGRAM_CONFIG_PATH = Path.home() / ".codex" / "telegram.toml"
def _now_unix() -> int:
return int(time.time())
def _load_toml(path: Path) -> Dict[str, Any]:
if not path.exists():
return {}
try:
import tomllib # type: ignore[attr-defined]
except ModuleNotFoundError:
try:
import tomli as tomllib # type: ignore[import-not-found]
except ModuleNotFoundError as e:
raise RuntimeError(
f"TOML config found at {path} but tomllib/tomli is unavailable. "
"Use Python 3.11+ or install tomli."
) from e
return tomllib.loads(path.read_text(encoding="utf-8"))
def load_telegram_config(path: Optional[str] = None) -> Dict[str, Any]:
cfg_path = Path(path) if path else TELEGRAM_CONFIG_PATH
return _load_toml(cfg_path)
def config_get(config: Dict[str, Any], key: str) -> Any:
if key in config:
return config[key]
nested = config.get("telegram")
if isinstance(nested, dict) and key in nested:
return nested[key]
return None
def render_markdown(md: str) -> Tuple[str, List[Dict[str, Any]]]:
html = MarkdownIt("commonmark", {"html": False}).render(md or "")
rendered = transform_html(html)
text = re.sub("(?m)^(\\s*)\u2022", r"\1-", rendered.text)
# FIX: Telegram requires MessageEntity.language (if present) to be a String.
entities: List[Dict[str, Any]] = []
for e in rendered.entities:
d = dict(e)
if "language" in d and not isinstance(d["language"], str):
d.pop("language", None)
entities.append(d)
return text, entities
def chunk_text(text: str, limit: int = DEFAULT_CHUNK_LEN) -> List[str]:
"""
Telegram hard limit is 4096 chars. Chunk at newlines when possible.
"""
text = text or ""
if len(text) <= limit:
return [text]
out: List[str] = []
buf: List[str] = []
size = 0
for line in text.splitlines(keepends=True):
if len(line) > limit:
# flush current buffer
if buf:
out.append("".join(buf))
buf, size = [], 0
# hard-split this long line
for i in range(0, len(line), limit):
out.append(line[i : i + limit])
continue
if size + len(line) > limit:
out.append("".join(buf))
buf, size = [line], len(line)
else:
buf.append(line)
size += len(line)
if buf:
out.append("".join(buf))
return out
def _chunk_text_with_indices(text: str, limit: int) -> List[Tuple[str, int, int]]:
text = text or ""
if len(text) <= limit:
return [(text, 0, len(text))]
out: List[Tuple[str, int, int]] = []
buf: List[str] = []
size = 0
buf_start = 0
pos = 0
for line in text.splitlines(keepends=True):
line_len = len(line)
line_start = pos
line_end = pos + line_len
if line_len > limit:
if buf:
out.append(("".join(buf), buf_start, line_start))
buf, size = [], 0
for i in range(0, line_len, limit):
part = line[i : i + limit]
out.append((part, line_start + i, line_start + i + len(part)))
pos = line_end
buf_start = pos
continue
if size + line_len > limit:
out.append(("".join(buf), buf_start, line_start))
buf = [line]
size = line_len
buf_start = line_start
else:
if not buf:
buf_start = line_start
buf.append(line)
size += line_len
pos = line_end
if buf:
out.append(("".join(buf), buf_start, pos))
return out
def _slice_entities(entities: List[Dict[str, Any]], start: int, end: int) -> List[Dict[str, Any]]:
out: List[Dict[str, Any]] = []
for ent in entities:
try:
ent_start = int(ent.get("offset", 0))
ent_len = int(ent.get("length", 0))
except (TypeError, ValueError):
continue
if ent_len <= 0:
continue
ent_end = ent_start + ent_len
if ent_end <= start or ent_start >= end:
continue
new_start = max(ent_start, start)
new_end = min(ent_end, end)
new_len = new_end - new_start
if new_len <= 0:
continue
new_ent = dict(ent)
new_ent["offset"] = new_start - start
new_ent["length"] = new_len
out.append(new_ent)
return out
class TelegramClient:
"""
Minimal Telegram Bot API client using standard library (no requests dependency).
"""
def __init__(self, token: str, timeout_s: int = 120) -> None:
if not token:
raise ValueError("Telegram token is empty")
self._base = f"https://api.telegram.org/bot{token}"
self._timeout_s = timeout_s
def _call(self, method: str, params: Dict[str, Any]) -> Any:
url = f"{self._base}/{method}"
data = json.dumps(params).encode("utf-8")
req = urllib.request.Request(
url,
data=data,
headers={"Content-Type": "application/json"},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=self._timeout_s) as resp:
payload = json.loads(resp.read().decode("utf-8"))
except urllib.error.HTTPError as e:
body = e.read().decode("utf-8", errors="replace")
raise RuntimeError(f"Telegram HTTPError {e.code}: {body}") from e
except urllib.error.URLError as e:
raise RuntimeError(f"Telegram URLError: {e}") from e
if not payload.get("ok"):
raise RuntimeError(f"Telegram API error: {payload}")
return payload["result"]
def get_updates(
self,
offset: Optional[int],
timeout_s: int = 50,
allowed_updates: Optional[List[str]] = None,
) -> List[Dict[str, Any]]:
params: Dict[str, Any] = {"timeout": timeout_s}
if offset is not None:
params["offset"] = offset
if allowed_updates is not None:
params["allowed_updates"] = allowed_updates
return self._call("getUpdates", params)
def send_message(
self,
chat_id: int,
text: str,
reply_to_message_id: Optional[int] = None,
disable_notification: Optional[bool] = False,
entities: Optional[List[Dict[str, Any]]] = None,
) -> Dict[str, Any]:
if len(text) > TELEGRAM_HARD_LIMIT:
raise ValueError("send_message received too-long text; chunk it first")
params: Dict[str, Any] = {
"chat_id": chat_id,
"text": text,
}
if disable_notification is not None:
params["disable_notification"] = disable_notification
if reply_to_message_id is not None:
params["reply_to_message_id"] = reply_to_message_id
if entities is not None:
params["entities"] = entities
return self._call("sendMessage", params)
def edit_message_text(
self,
chat_id: int,
message_id: int,
text: str,
entities: Optional[List[Dict[str, Any]]] = None,
) -> Dict[str, Any]:
if len(text) > TELEGRAM_HARD_LIMIT:
raise ValueError("edit_message_text received too-long text")
params: Dict[str, Any] = {
"chat_id": chat_id,
"message_id": message_id,
"text": text,
}
if entities is not None:
params["entities"] = entities
return self._call("editMessageText", params)
def delete_message(self, chat_id: int, message_id: int) -> bool:
params: Dict[str, Any] = {
"chat_id": chat_id,
"message_id": message_id,
}
res = self._call("deleteMessage", params)
return bool(res)
def send_message_chunked(
self,
chat_id: int,
text: str,
reply_to_message_id: Optional[int] = None,
disable_notification: bool = False,
chunk_len: int = DEFAULT_CHUNK_LEN,
) -> List[Dict[str, Any]]:
sent: List[Dict[str, Any]] = []
chunks = chunk_text(text, limit=chunk_len)
for i, c in enumerate(chunks):
msg = self.send_message(
chat_id=chat_id,
text=c,
reply_to_message_id=(reply_to_message_id if i == 0 else None),
disable_notification=disable_notification,
)
sent.append(msg)
return sent
def send_message_markdown_chunked(
self,
chat_id: int,
text: str,
reply_to_message_id: Optional[int] = None,
disable_notification: bool = False,
chunk_len: int = DEFAULT_CHUNK_LEN,
) -> List[Dict[str, Any]]:
sent: List[Dict[str, Any]] = []
rendered_text, entities = render_markdown(text)
chunks = _chunk_text_with_indices(rendered_text, limit=chunk_len)
for i, (chunk, start, end) in enumerate(chunks):
chunk_entities = _slice_entities(entities, start, end) if entities else None
msg = self.send_message(
chat_id=chat_id,
text=chunk,
reply_to_message_id=(reply_to_message_id if i == 0 else None),
disable_notification=disable_notification,
entities=chunk_entities,
)
sent.append(msg)
return sent
def send_chat_action(self, chat_id: int, action: str = "typing") -> Dict[str, Any]:
params: Dict[str, Any] = {
"chat_id": chat_id,
"action": action,
}
return self._call("sendChatAction", params)
@dataclass(frozen=True)
class Route:
route_type: str # "exec" | "mcp" | "tmux"
route_id: str # session_id / conversationId / tmux target
meta: Dict[str, Any]
class RouteStore:
"""
Stores mapping: (chat_id, bot_message_id) -> route
so Telegram replies can be routed.
"""
def __init__(self, path: str) -> None:
os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
self._conn = sqlite3.connect(path, check_same_thread=False)
self._conn.execute("PRAGMA journal_mode=WAL;")
self._conn.execute(
"""
CREATE TABLE IF NOT EXISTS routes (
chat_id INTEGER NOT NULL,
bot_message_id INTEGER NOT NULL,
route_type TEXT NOT NULL,
route_id TEXT NOT NULL,
meta_json TEXT,
created_at INTEGER NOT NULL,
PRIMARY KEY (chat_id, bot_message_id)
);
"""
)
self._conn.execute(
"CREATE INDEX IF NOT EXISTS idx_routes_route_id ON routes(route_id);"
)
self._conn.commit()
def link(
self,
chat_id: int,
bot_message_id: int,
route_type: str,
route_id: str,
meta: Optional[Dict[str, Any]] = None,
) -> None:
meta_json = json.dumps(meta or {}, ensure_ascii=False)
self._conn.execute(
"""
INSERT OR REPLACE INTO routes(chat_id, bot_message_id, route_type, route_id, meta_json, created_at)
VALUES(?, ?, ?, ?, ?, ?)
""",
(chat_id, bot_message_id, route_type, route_id, meta_json, _now_unix()),
)
self._conn.commit()
def resolve(self, chat_id: int, bot_message_id: int) -> Optional[Route]:
cur = self._conn.execute(
"""
SELECT route_type, route_id, meta_json
FROM routes
WHERE chat_id = ? AND bot_message_id = ?
""",
(chat_id, bot_message_id),
)
row = cur.fetchone()
if not row:
return None
route_type, route_id, meta_json = row
try:
meta = json.loads(meta_json) if meta_json else {}
except json.JSONDecodeError:
meta = {}
return Route(route_type=route_type, route_id=route_id, meta=meta)
def close(self) -> None:
self._conn.close()
def parse_allowed_chat_ids(value: str) -> Optional[set[int]]:
"""
Parse a comma-separated chat id string like "123,456".
"""
v = (value or "").strip()
if not v:
return None
out: set[int] = set()
for part in v.split(","):
part = part.strip()
if not part:
continue
out.add(int(part))
return out
def parse_chat_id_list(value: Any) -> Optional[set[int]]:
if value is None:
return None
if isinstance(value, str):
return parse_allowed_chat_ids(value)
if isinstance(value, int):
return {value}
if isinstance(value, (list, tuple, set)):
out: set[int] = set()
for item in value:
if item is None:
continue
if isinstance(item, str):
if not item.strip():
continue
out.add(int(item))
else:
out.add(int(item))
return out or None
return None
def resolve_chat_ids(config: Dict[str, Any]) -> Optional[set[int]]:
chat_ids = parse_chat_id_list(config_get(config, "chat_id"))
if chat_ids is None:
chat_ids = parse_chat_id_list(config_get(config, "allowed_chat_ids"))
if chat_ids is None:
chat_ids = parse_chat_id_list(config_get(config, "startup_chat_ids"))
return chat_ids
@@ -0,0 +1,531 @@
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.10"
# dependencies = ["markdown-it-py", "sulguk", "tomli; python_version < '3.11'"]
# ///
from __future__ import annotations
import json
import os
import shlex
import subprocess
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, Optional, Tuple
from .bridge_common import (
TelegramClient,
RouteStore,
TELEGRAM_HARD_LIMIT,
config_get,
load_telegram_config,
render_markdown,
resolve_chat_ids,
)
# -------------------- Codex runner --------------------
def log(msg: str) -> None:
ts = time.strftime("%Y-%m-%d %H:%M:%S")
print(f"[{ts}] {msg}", flush=True)
def _one_line(text: Optional[str]) -> str:
if text is None:
return "None"
return text.replace("\r", "\\r").replace("\n", "\\n")
TELEGRAM_TEXT_LIMIT = TELEGRAM_HARD_LIMIT
def _clamp_tg_text(text: str, limit: int = TELEGRAM_TEXT_LIMIT) -> str:
if len(text) <= limit:
return text
return text[: limit - 20] + "\n...(truncated)"
def _summarize_item(item: Dict[str, Any]) -> str:
item_type = item.get("type")
if isinstance(item_type, str):
if item_type == "agent_message" and isinstance(item.get("text"), str):
snippet = item["text"].strip().replace("\n", " ")
if len(snippet) > 140:
snippet = snippet[:140] + "..."
return f"agent_message: {snippet}"
name = item.get("name") or item.get("tool_name") or item.get("id")
if isinstance(name, str):
return f"{item_type}: {name}"
return item_type
return "item.completed"
class ProgressEditor:
def __init__(
self,
bot: TelegramClient,
chat_id: int,
message_id: int,
edit_every_s: float,
) -> None:
self.bot = bot
self.chat_id = chat_id
self.message_id = message_id
self.edit_every_s = edit_every_s
self._lock = threading.Lock()
self._pending: Optional[str] = None
self._last_sent: Optional[str] = None
self._last_edit_at = 0.0
self._stop = threading.Event()
self._thread = threading.Thread(target=self._run, daemon=True)
self._thread.start()
def set(self, text: str) -> None:
text = _clamp_tg_text(text)
with self._lock:
self._pending = text
def stop(self) -> None:
self._stop.set()
self._thread.join(timeout=1.0)
def _edit(self, text: str) -> None:
try:
self.bot.edit_message_text(
chat_id=self.chat_id,
message_id=self.message_id,
text=text,
)
except Exception as e:
log(
"[progress] edit failed "
f"chat_id={self.chat_id} message_id={self.message_id}: {e}"
)
def _run(self) -> None:
while not self._stop.is_set():
to_send: Optional[str] = None
now = time.monotonic()
with self._lock:
if self._pending is not None and (now - self._last_edit_at) >= self.edit_every_s:
if self._pending != self._last_sent:
to_send = self._pending
self._last_sent = self._pending
self._last_edit_at = now
self._pending = None
if to_send is not None:
self._edit(to_send)
self._stop.wait(0.2)
def _typing_loop(bot: TelegramClient, chat_id: int, stop_evt: threading.Event) -> None:
while not stop_evt.is_set():
try:
bot.send_chat_action(chat_id=chat_id, action="typing")
except Exception as e:
log(f"[typing] send_chat_action failed chat_id={chat_id}: {e}")
stop_evt.wait(4.0)
class CodexExecRunner:
"""
Runs Codex in non-interactive mode:
- new: codex exec --json ... -
- resume: codex exec --json ... resume <SESSION_ID> -
"""
def __init__(self, codex_cmd: str, workspace: Optional[str], extra_args: list[str]) -> None:
self.codex_cmd = codex_cmd
self.workspace = workspace
self.extra_args = extra_args
# per-session locks to prevent concurrent resumes to same session_id
self._locks: dict[str, threading.Lock] = {}
self._locks_guard = threading.Lock()
def _lock_for(self, session_id: str) -> threading.Lock:
with self._locks_guard:
if session_id not in self._locks:
self._locks[session_id] = threading.Lock()
return self._locks[session_id]
def run(
self,
prompt: str,
session_id: Optional[str],
on_event: Optional[Callable[[Dict[str, Any]], None]] = None,
) -> Tuple[str, str]:
"""
Returns (session_id, final_agent_message_text)
"""
log(f"[codex] start run session_id={session_id!r} workspace={self.workspace!r}")
args = [self.codex_cmd, "exec", "--json"]
args.extend(self.extra_args)
if self.workspace:
args.extend(["--cd", self.workspace])
# Always pipe prompt via stdin ("-") to avoid quoting issues.
if session_id:
args.extend(["resume", session_id, "-"])
else:
args.append("-")
# read both stdout+stderr without deadlock
proc = subprocess.Popen(
args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
)
assert proc.stdin and proc.stdout and proc.stderr
# send prompt then close stdin
proc.stdin.write(prompt)
proc.stdin.close()
stderr_lines: list[str] = []
def _drain_stderr() -> None:
for line in proc.stderr:
log(f"[codex][stderr] {line.rstrip()}")
stderr_lines.append(line)
t = threading.Thread(target=_drain_stderr, daemon=True)
t.start()
found_session: Optional[str] = session_id
last_agent_text: Optional[str] = None
for line in proc.stdout:
line = line.strip()
if not line:
continue
log(f"[codex][event] {line}")
try:
evt = json.loads(line)
except json.JSONDecodeError:
continue
if on_event is not None:
try:
on_event(evt)
except Exception as e:
log(f"[codex][on_event] callback error: {e}")
# From Codex JSONL event stream
if evt.get("type") == "thread.started":
found_session = evt.get("thread_id") or found_session
if evt.get("type") == "item.completed":
item = evt.get("item") or {}
if item.get("type") == "agent_message" and isinstance(item.get("text"), str):
last_agent_text = item["text"]
rc = proc.wait()
t.join(timeout=2.0)
if rc != 0:
tail = "".join(stderr_lines[-200:])
raise RuntimeError(f"codex exec failed (rc={rc}). stderr tail:\n{tail}")
if not found_session:
raise RuntimeError("codex exec finished but no session_id/thread_id was captured")
log(f"[codex] done run session_id={found_session!r}")
return found_session, (last_agent_text or "(No agent_message captured from JSON stream.)")
def run_serialized(
self,
prompt: str,
session_id: Optional[str],
on_event: Optional[Callable[[Dict[str, Any]], None]] = None,
) -> Tuple[str, str]:
"""
If resuming, serialize per-session.
"""
if not session_id:
return self.run(prompt, session_id=None, on_event=on_event)
lock = self._lock_for(session_id)
with lock:
return self.run(prompt, session_id=session_id, on_event=on_event)
# -------------------- Telegram loop --------------------
def main() -> None:
config = load_telegram_config()
token = config_get(config, "bot_token") or ""
db_path = config_get(config, "bridge_db") or "./bridge_routes.sqlite3"
chat_ids = resolve_chat_ids(config)
allowed = chat_ids
startup_ids = chat_ids
startup_msg = config_get(config, "startup_message") or "✅ exec_bridge started (codex exec)."
startup_pwd = os.getcwd()
startup_msg = f"{startup_msg}\nPWD: {startup_pwd}"
codex_cmd = config_get(config, "codex_cmd") or "codex"
workspace = config_get(config, "codex_workspace")
raw_exec_args = config_get(config, "codex_exec_args") or ""
if isinstance(raw_exec_args, list):
extra_args = [str(v) for v in raw_exec_args]
else:
extra_args = shlex.split(str(raw_exec_args)) # e.g. "--full-auto --search"
def _has_notify_override(args: list[str]) -> bool:
for i, arg in enumerate(args):
if arg in ("-c", "--config"):
if i + 1 >= len(args):
continue
key = args[i + 1].split("=", 1)[0].strip()
if key == "notify" or key.endswith(".notify"):
return True
elif arg.startswith(("--config=", "-c=")):
key = arg.split("=", 1)[1].split("=", 1)[0].strip()
if key == "notify" or key.endswith(".notify"):
return True
return False
# Default: disable notify hook for exec-bridge runs to avoid duplicate messages.
if not _has_notify_override(extra_args):
extra_args.extend(["-c", "notify=[]"])
bot = TelegramClient(token)
store = RouteStore(db_path)
runner = CodexExecRunner(codex_cmd=codex_cmd, workspace=workspace, extra_args=extra_args)
max_workers = config_get(config, "max_workers")
if isinstance(max_workers, str):
max_workers = int(max_workers) if max_workers.strip() else None
elif not isinstance(max_workers, int):
max_workers = None
pool = ThreadPoolExecutor(max_workers=max_workers or 4)
offset: Optional[int] = None
log(f"[startup] pwd={startup_pwd}")
log("Option1 bridge running (codex exec). Long-polling Telegram...")
if startup_ids:
for chat_id in startup_ids:
try:
bot.send_message(chat_id=chat_id, text=startup_msg)
log(f"[startup] sent startup message to chat_id={chat_id}")
except Exception as e:
log(f"[startup] failed to send startup message to chat_id={chat_id}: {e}")
else:
log("[startup] no chat_id configured; skipping startup message")
def handle(chat_id: int, user_msg_id: int, text: str, resume_session: Optional[str]) -> None:
log(
"[handle] start "
f"chat_id={chat_id} user_msg_id={user_msg_id} resume_session={resume_session!r}"
)
try:
edit_every_s = float(os.environ.get("TG_PROGRESS_EDIT_EVERY_S", "2.5"))
except ValueError:
edit_every_s = 2.5
silent_progress = os.environ.get("TG_PROGRESS_SILENT", "1") == "1"
loud_final = os.environ.get("TG_FINAL_NOTIFY", "1") == "1"
typing_stop = threading.Event()
typing_thread = threading.Thread(
target=_typing_loop,
args=(bot, chat_id, typing_stop),
daemon=True,
)
typing_thread.start()
progress_id: Optional[int] = None
progress: Optional[ProgressEditor] = None
try:
progress_msg = bot.send_message(
chat_id=chat_id,
text="Working...",
reply_to_message_id=user_msg_id,
disable_notification=silent_progress,
)
progress_id = int(progress_msg["message_id"])
except Exception as e:
log(f"[handle] failed to send progress message chat_id={chat_id}: {e}")
if progress_id is not None:
progress = ProgressEditor(bot, chat_id, progress_id, edit_every_s)
started_at = time.monotonic()
session_box: dict[str, Optional[str]] = {"id": resume_session}
def on_event(evt: Dict[str, Any]) -> None:
event_type = evt.get("type")
if event_type == "thread.started":
thread_id = evt.get("thread_id")
if isinstance(thread_id, str) and thread_id:
session_box["id"] = thread_id
if progress_id is not None:
store.link(
chat_id,
progress_id,
"exec",
thread_id,
meta={"workspace": workspace},
)
elif event_type == "item.completed":
item = evt.get("item") or {}
elapsed = int(time.monotonic() - started_at)
line = _summarize_item(item) if isinstance(item, dict) else "item.completed"
session_id = session_box["id"]
header = f"Working... ({elapsed}s)"
if session_id:
header += f"\nSession: {session_id}"
msg = f"{header}\n\n{line}"
if progress is not None:
progress.set(msg)
def _stop_background() -> None:
typing_stop.set()
typing_thread.join(timeout=1.0)
if progress is not None:
progress.stop()
try:
session_id, answer = runner.run_serialized(text, resume_session, on_event=on_event)
except Exception as e:
_stop_background()
err = _clamp_tg_text(f"Error:\n{e}")
route_id = session_box["id"] or resume_session or "unknown"
if progress_id is not None and len(err) <= TELEGRAM_TEXT_LIMIT:
try:
bot.edit_message_text(chat_id=chat_id, message_id=progress_id, text=err)
store.link(chat_id, progress_id, "exec", route_id, meta={"error": True})
log(
"[handle] error "
f"chat_id={chat_id} user_msg_id={user_msg_id} "
f"resume_session={resume_session!r} err={e}"
)
return
except Exception as ee:
log(f"[handle] failed to edit progress into error: {ee}")
sent_msgs = bot.send_message_markdown_chunked(
chat_id=chat_id,
text=err,
reply_to_message_id=user_msg_id,
)
for m in sent_msgs:
store.link(chat_id, m["message_id"], "exec", route_id, meta={"error": True})
log(
"[handle] error "
f"chat_id={chat_id} user_msg_id={user_msg_id} resume_session={resume_session!r} err={e}"
)
return
_stop_background()
answer = answer or "(No agent_message captured from JSON stream.)"
final_text, final_entities = render_markdown(answer)
can_edit_final = progress_id is not None and len(final_text) <= TELEGRAM_TEXT_LIMIT
if loud_final or not can_edit_final:
if progress_id is not None:
try:
bot.delete_message(chat_id=chat_id, message_id=progress_id)
except Exception as e:
log(f"[handle] delete progress failed chat_id={chat_id} message_id={progress_id}: {e}")
sent_msgs = bot.send_message_markdown_chunked(
chat_id=chat_id,
text=answer,
reply_to_message_id=user_msg_id,
)
for m in sent_msgs:
store.link(chat_id, m["message_id"], "exec", session_id, meta={"workspace": workspace})
else:
bot.edit_message_text(
chat_id=chat_id,
message_id=progress_id,
text=final_text,
entities=final_entities or None,
)
store.link(chat_id, progress_id, "exec", session_id, meta={"workspace": workspace})
log(
"[handle] done "
f"chat_id={chat_id} user_msg_id={user_msg_id} session_id={session_id!r}"
)
while True:
try:
updates = bot.get_updates(offset=offset, timeout_s=50, allowed_updates=["message"])
except Exception as e:
log(f"[telegram] get_updates error: {e}")
time.sleep(2.0)
continue
for upd in updates:
offset = upd["update_id"] + 1
msg = upd.get("message") or {}
chat_id = msg.get("chat", {}).get("id")
from_bot = msg.get("from", {}).get("is_bot")
msg_text = msg.get("text")
reply_to = (msg.get("reply_to_message") or {}).get("message_id")
log(
"[telegram] received "
f"update_id={upd.get('update_id')} chat_id={chat_id} "
f"from_bot={from_bot} has_text={msg_text is not None} "
f"reply_to={reply_to} text={_one_line(msg_text)}"
)
if "text" not in msg:
log(
"[telegram] ignoring non-text message "
f"chat_id={chat_id} update_id={upd.get('update_id')}"
)
continue
if allowed is not None and int(chat_id) not in allowed:
log(
"[telegram] rejected by ACL "
f"chat_id={chat_id} allowed={sorted(allowed)}"
)
continue
if msg.get("from", {}).get("is_bot"):
log(
"[telegram] ignoring bot message "
f"chat_id={chat_id} update_id={upd.get('update_id')}"
)
continue
text = msg["text"]
user_msg_id = msg["message_id"]
log(
"[telegram] accepted message "
f"chat_id={chat_id} user_msg_id={user_msg_id} text={_one_line(text)}"
)
# If user replied to a bot message, route to that session
resume_session: Optional[str] = None
r = msg.get("reply_to_message")
if r and "message_id" in r:
route = store.resolve(chat_id, r["message_id"])
if route and route.route_type == "exec":
resume_session = route.route_id
log(
"[telegram] resolved reply route "
f"chat_id={chat_id} bot_message_id={r['message_id']} session_id={resume_session!r}"
)
else:
log(
"[telegram] reply has no exec route "
f"chat_id={chat_id} bot_message_id={r['message_id']}"
)
pool.submit(handle, chat_id, user_msg_id, text, resume_session)
if __name__ == "__main__":
main()
@@ -0,0 +1,373 @@
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.10"
# dependencies = ["markdown-it-py", "sulguk", "tomli; python_version < '3.11'"]
# ///
from __future__ import annotations
import json
import shlex
import subprocess
import threading
import time
from queue import Queue, Empty
from typing import Any, Dict, List, Optional, Tuple
from .bridge_common import (
TelegramClient,
RouteStore,
config_get,
load_telegram_config,
resolve_chat_ids,
)
MCP_PROTOCOL_VERSION = "2025-06-18"
def _deep_find_agent_text(obj: Any) -> Optional[str]:
"""
Heuristic: search nested dict/list for something like:
{"type":"agent_message","text":"..."}
"""
if isinstance(obj, dict):
if obj.get("type") == "agent_message" and isinstance(obj.get("text"), str):
return obj["text"]
for v in obj.values():
t = _deep_find_agent_text(v)
if t is not None:
return t
elif isinstance(obj, list):
for it in obj:
t = _deep_find_agent_text(it)
if t is not None:
return t
return None
def _extract_text_from_tool_result(result: Any) -> str:
"""
MCP tool results often look like: {"content":[{"type":"text","text":"..."}]}
"""
if not isinstance(result, dict):
return ""
content = result.get("content")
if not isinstance(content, list):
return ""
parts: List[str] = []
for c in content:
if isinstance(c, dict) and c.get("type") == "text" and isinstance(c.get("text"), str):
parts.append(c["text"])
return "\n".join(parts).strip()
class MCPStdioClient:
"""
Minimal MCP stdio JSON-RPC client:
- spawns subprocess (codex mcp-server)
- performs initialize + notifications/initialized
- supports tools/list + tools/call
"""
def __init__(self, cmd: List[str]) -> None:
self._proc = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
)
assert self._proc.stdin and self._proc.stdout and self._proc.stderr
self._inbox: "Queue[Dict[str, Any]]" = Queue()
self._next_id = 1
self._lock = threading.Lock()
self._stderr_tail: List[str] = []
self._reader = threading.Thread(target=self._read_stdout, daemon=True)
self._reader.start()
self._stderr_reader = threading.Thread(target=self._read_stderr, daemon=True)
self._stderr_reader.start()
self._initialize()
def _read_stdout(self) -> None:
for line in self._proc.stdout: # type: ignore[union-attr]
line = line.strip()
if not line:
continue
try:
msg = json.loads(line)
except json.JSONDecodeError:
continue
if isinstance(msg, dict):
self._inbox.put(msg)
def _read_stderr(self) -> None:
for line in self._proc.stderr: # type: ignore[union-attr]
self._stderr_tail.append(line)
# keep last ~300 lines
if len(self._stderr_tail) > 300:
self._stderr_tail = self._stderr_tail[-300:]
def _send(self, msg: Dict[str, Any]) -> None:
raw = json.dumps(msg, ensure_ascii=False)
self._proc.stdin.write(raw + "\n") # type: ignore[union-attr]
self._proc.stdin.flush() # type: ignore[union-attr]
def _request(self, method: str, params: Optional[Dict[str, Any]], timeout_s: int = 600) -> Any:
"""
Send one request and wait for response.
Sequential-only (guarded by self._lock).
"""
with self._lock:
req_id = self._next_id
self._next_id += 1
msg: Dict[str, Any] = {"jsonrpc": "2.0", "id": req_id, "method": method}
if params is not None:
msg["params"] = params
self._send(msg)
deadline = time.monotonic() + timeout_s
while True:
remaining = deadline - time.monotonic()
if remaining <= 0:
raise TimeoutError(f"MCP request timed out: {method}")
try:
incoming = self._inbox.get(timeout=min(1.0, remaining))
except Empty:
# also check process
if self._proc.poll() is not None:
tail = "".join(self._stderr_tail)
raise RuntimeError(f"MCP server exited unexpectedly. stderr tail:\n{tail}")
continue
if incoming.get("id") == req_id:
if "error" in incoming:
raise RuntimeError(f"MCP error response: {incoming['error']}")
return incoming.get("result")
# ignore other messages here (we run sequential requests)
# (notifications are handled by call_tool_collecting_events)
def _notify(self, method: str, params: Optional[Dict[str, Any]] = None) -> None:
msg: Dict[str, Any] = {"jsonrpc": "2.0", "method": method}
if params is not None:
msg["params"] = params
self._send(msg)
def _initialize(self) -> None:
# MCP lifecycle: initialize then notifications/initialized.
init_params = {
"protocolVersion": MCP_PROTOCOL_VERSION,
"capabilities": {},
"clientInfo": {"name": "telegram-codex-bridge", "version": "0.1.0"},
}
self._request("initialize", init_params, timeout_s=30)
# send initialized notification
self._notify("notifications/initialized")
return
def tools_list(self) -> Any:
return self._request("tools/list", {}, timeout_s=30)
def call_tool_collecting_events(
self,
tool_name: str,
arguments: Dict[str, Any],
timeout_s: int = 600,
) -> Tuple[Optional[str], str]:
"""
Calls tools/call and collects notifications during the call.
Returns (session_id, best_text_answer)
"""
with self._lock:
req_id = self._next_id
self._next_id += 1
msg = {
"jsonrpc": "2.0",
"id": req_id,
"method": "tools/call",
"params": {"name": tool_name, "arguments": arguments},
}
self._send(msg)
deadline = time.monotonic() + timeout_s
session_id: Optional[str] = None
last_agent_text: Optional[str] = None
final_result: Optional[Any] = None
while True:
remaining = deadline - time.monotonic()
if remaining <= 0:
raise TimeoutError(f"tools/call timed out: {tool_name}")
try:
incoming = self._inbox.get(timeout=min(1.0, remaining))
except Empty:
if self._proc.poll() is not None:
tail = "".join(self._stderr_tail)
raise RuntimeError(f"MCP server exited unexpectedly. stderr tail:\n{tail}")
continue
# Notifications (no id) can stream progress/events
if "method" in incoming and "id" not in incoming:
if incoming.get("method") == "codex/event":
params = incoming.get("params") or {}
if isinstance(params, dict):
# Workaround: session_id can arrive in notification
sid = params.get("session_id")
if isinstance(sid, str) and sid:
session_id = sid
# Try to extract agent message text from event payload heuristically
t = _deep_find_agent_text(params)
if t:
last_agent_text = t
continue
# Response for our request
if incoming.get("id") == req_id:
if "error" in incoming:
raise RuntimeError(f"MCP tools/call error: {incoming['error']}")
final_result = incoming.get("result")
break
# Ignore other responses (we do sequential calls)
# Prefer last streamed agent message, else result.content text
if last_agent_text:
return session_id, last_agent_text
text_from_result = _extract_text_from_tool_result(final_result)
return session_id, (text_from_result or "(No agent_message found; tool result had no text.)")
def close(self) -> None:
try:
self._proc.terminate()
except Exception:
pass
def main() -> None:
config = load_telegram_config()
token = config_get(config, "bot_token") or ""
db_path = config_get(config, "bridge_db") or "./bridge_routes.sqlite3"
allowed = resolve_chat_ids(config)
# How to start Codex MCP server:
# default: "codex mcp-server" (can also be "npx -y codex mcp-server")
raw_mcp_cmd = config_get(config, "codex_mcp_cmd") or "codex mcp-server"
if isinstance(raw_mcp_cmd, list):
mcp_cmd = [str(v) for v in raw_mcp_cmd]
else:
mcp_cmd = shlex.split(str(raw_mcp_cmd))
# Optional defaults for tool args (you can override as you like)
default_cwd = config_get(config, "codex_workspace")
default_sandbox = config_get(config, "codex_sandbox") or "workspace-write"
default_approval = config_get(config, "codex_approval_policy") or "never"
bot = TelegramClient(token)
store = RouteStore(db_path)
print(f"Starting MCP server: {mcp_cmd}")
mcp = MCPStdioClient(mcp_cmd)
# Optional: verify tools exist
try:
mcp.tools_list()
# Not strictly required; but helpful for debugging
print("tools/list ok")
except Exception as e:
print(f"tools/list failed: {e}")
offset: Optional[int] = None
print("Option2 bridge running (codex mcp-server). Long-polling Telegram...")
# Single worker queue so we never overlap tools/call
work_q: "Queue[Tuple[int, int, str, Optional[str]]]" = Queue()
def worker() -> None:
while True:
chat_id, user_msg_id, prompt, conversation_id = work_q.get()
try:
if conversation_id:
args = {"conversationId": conversation_id, "prompt": prompt}
sid, answer = mcp.call_tool_collecting_events("codex-reply", args, timeout_s=600)
# sid may be None on replies; keep conversation_id
sid = sid or conversation_id
else:
args = {
"prompt": prompt,
"cwd": default_cwd,
"sandbox": default_sandbox,
"approval-policy": default_approval,
}
sid, answer = mcp.call_tool_collecting_events("codex", args, timeout_s=600)
if not sid:
# Worst-case fallback (still let user see output)
sid = "unknown-session"
sent_msgs = bot.send_message_markdown_chunked(
chat_id=chat_id,
text=answer,
reply_to_message_id=user_msg_id,
)
for m in sent_msgs:
store.link(chat_id, m["message_id"], "mcp", sid, meta={"cwd": default_cwd})
except Exception as e:
err = f"❌ Error:\n{e}"
sent_msgs = bot.send_message_markdown_chunked(
chat_id=chat_id,
text=err,
reply_to_message_id=user_msg_id,
)
for m in sent_msgs:
store.link(chat_id, m["message_id"], "mcp", conversation_id or "unknown", meta={"error": True})
finally:
work_q.task_done()
threading.Thread(target=worker, daemon=True).start()
while True:
try:
updates = bot.get_updates(offset=offset, timeout_s=50, allowed_updates=["message"])
except Exception as e:
print(f"[telegram] get_updates error: {e}")
time.sleep(2.0)
continue
for upd in updates:
offset = upd["update_id"] + 1
msg = upd.get("message") or {}
if "text" not in msg:
continue
chat_id = msg["chat"]["id"]
if allowed is not None and int(chat_id) not in allowed:
continue
if msg.get("from", {}).get("is_bot"):
continue
prompt = msg["text"]
user_msg_id = msg["message_id"]
conversation_id: Optional[str] = None
r = msg.get("reply_to_message")
if r and "message_id" in r:
route = store.resolve(chat_id, r["message_id"])
if route and route.route_type == "mcp":
conversation_id = route.route_id
work_q.put((chat_id, user_msg_id, prompt, conversation_id))
if __name__ == "__main__":
main()
@@ -0,0 +1,55 @@
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.10"
# dependencies = ["markdown-it-py", "sulguk", "tomli; python_version < '3.11'"]
# ///
from __future__ import annotations
import argparse
import sys
from typing import Optional
from .bridge_common import TelegramClient, RouteStore, config_get, load_telegram_config
def main() -> None:
config = load_telegram_config()
default_chat_id = config_get(config, "chat_id")
if isinstance(default_chat_id, str):
default_chat_id = int(default_chat_id) if default_chat_id.strip() else None
elif not isinstance(default_chat_id, int):
default_chat_id = None
ap = argparse.ArgumentParser()
ap.add_argument("--chat-id", type=int, default=default_chat_id, required=default_chat_id is None)
ap.add_argument("--tmux-target", type=str, required=True, help='tmux target, e.g. "codex1:0.0" or "codex1"')
ap.add_argument(
"--db",
type=str,
default=config_get(config, "bridge_db") or "./bridge_routes.sqlite3",
)
ap.add_argument("--reply-to", type=int, default=None, help="Optional Telegram message_id to reply to")
ap.add_argument("--text", type=str, default=None, help="Message text. If omitted, read stdin.")
args = ap.parse_args()
token = config_get(config, "bot_token") or ""
bot = TelegramClient(token)
store = RouteStore(args.db)
text = args.text
if text is None:
text = sys.stdin.read()
sent = bot.send_message_markdown_chunked(
chat_id=args.chat_id,
text=text,
reply_to_message_id=args.reply_to,
)
# Store mapping for every chunk so user can reply to any chunk.
for m in sent:
store.link(args.chat_id, m["message_id"], "tmux", args.tmux_target, meta={})
if __name__ == "__main__":
main()
@@ -0,0 +1,104 @@
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.10"
# dependencies = ["markdown-it-py", "sulguk", "tomli; python_version < '3.11'"]
# ///
from __future__ import annotations
import subprocess
import time
from typing import Optional
from .bridge_common import (
TelegramClient,
RouteStore,
config_get,
load_telegram_config,
resolve_chat_ids,
)
def tmux_send_text(target: str, text: str, press_enter: bool = True) -> None:
"""
Send text to tmux target pane/session.
If your Telegram messages include newlines, we replace them with literal '\n'
to avoid accidentally submitting early.
"""
safe = (text or "").replace("\r\n", "\n").replace("\r", "\n").replace("\n", "\\n")
subprocess.check_call(["tmux", "send-keys", "-t", target, "-l", safe])
if press_enter:
subprocess.check_call(["tmux", "send-keys", "-t", target, "Enter"])
def main() -> None:
config = load_telegram_config()
token = config_get(config, "bot_token") or ""
db_path = config_get(config, "bridge_db") or "./bridge_routes.sqlite3"
allowed = resolve_chat_ids(config)
bot = TelegramClient(token)
store = RouteStore(db_path)
offset: Optional[int] = None
print("Option3 reply bot running (tmux injector). Long-polling Telegram...")
while True:
try:
updates = bot.get_updates(offset=offset, timeout_s=50, allowed_updates=["message"])
except Exception as e:
print(f"[telegram] get_updates error: {e}")
time.sleep(2.0)
continue
for upd in updates:
offset = upd["update_id"] + 1
msg = upd.get("message") or {}
if "text" not in msg:
continue
chat_id = msg["chat"]["id"]
if allowed is not None and int(chat_id) not in allowed:
continue
if msg.get("from", {}).get("is_bot"):
continue
text = msg["text"]
user_msg_id = msg["message_id"]
r = msg.get("reply_to_message")
if not (r and "message_id" in r):
# In tmux mode we only route replies (no reply => ignore or treat as new session)
bot.send_message(
chat_id=chat_id,
text="Reply to a Codex message (from the bot) so I know which tmux session to send this to.",
reply_to_message_id=user_msg_id,
)
continue
route = store.resolve(chat_id, r["message_id"])
if not route or route.route_type != "tmux":
bot.send_message(
chat_id=chat_id,
text="I don't know which tmux session this reply belongs to (no routing record found).",
reply_to_message_id=user_msg_id,
)
continue
tmux_target = route.route_id
try:
tmux_send_text(tmux_target, text, press_enter=True)
bot.send_message(
chat_id=chat_id,
text=f"✅ Sent to tmux target: {tmux_target}",
reply_to_message_id=user_msg_id,
)
except Exception as e:
bot.send_message(
chat_id=chat_id,
text=f"❌ Failed to send to tmux ({tmux_target}): {e}",
reply_to_message_id=user_msg_id,
)
if __name__ == "__main__":
main()