diff --git a/codex/codex_telegram_bridge/bridge_common.py b/codex/codex_telegram_bridge/bridge_common.py new file mode 100644 index 0000000..0fdb287 --- /dev/null +++ b/codex/codex_telegram_bridge/bridge_common.py @@ -0,0 +1,232 @@ +from __future__ import annotations + +import json +import os +import sqlite3 +import time +import urllib.error +import urllib.request +from dataclasses import dataclass +from typing import Any, Dict, Iterable, List, Optional, Tuple + +TELEGRAM_HARD_LIMIT = 4096 +DEFAULT_CHUNK_LEN = 3500 # leave room for formatting / safety + + +def _now_unix() -> int: + return int(time.time()) + + +def chunk_text(text: str, limit: int = DEFAULT_CHUNK_LEN) -> List[str]: + """ + Telegram hard limit is 4096 chars. Chunk at newlines when possible. + """ + text = text or "" + if len(text) <= limit: + return [text] + + out: List[str] = [] + buf: List[str] = [] + size = 0 + + for line in text.splitlines(keepends=True): + if len(line) > limit: + # flush current buffer + if buf: + out.append("".join(buf)) + buf, size = [], 0 + # hard-split this long line + for i in range(0, len(line), limit): + out.append(line[i : i + limit]) + continue + + if size + len(line) > limit: + out.append("".join(buf)) + buf, size = [line], len(line) + else: + buf.append(line) + size += len(line) + + if buf: + out.append("".join(buf)) + return out + + +class TelegramClient: + """ + Minimal Telegram Bot API client using standard library (no requests dependency). + + Env: + TELEGRAM_BOT_TOKEN + """ + + def __init__(self, token: str, timeout_s: int = 120) -> None: + if not token: + raise ValueError("Telegram token is empty") + self._base = f"https://api.telegram.org/bot{token}" + self._timeout_s = timeout_s + + def _call(self, method: str, params: Dict[str, Any]) -> Any: + url = f"{self._base}/{method}" + data = json.dumps(params).encode("utf-8") + req = urllib.request.Request( + url, + data=data, + headers={"Content-Type": "application/json"}, + method="POST", + ) + try: + with urllib.request.urlopen(req, timeout=self._timeout_s) as resp: + payload = json.loads(resp.read().decode("utf-8")) + except urllib.error.HTTPError as e: + body = e.read().decode("utf-8", errors="replace") + raise RuntimeError(f"Telegram HTTPError {e.code}: {body}") from e + except urllib.error.URLError as e: + raise RuntimeError(f"Telegram URLError: {e}") from e + + if not payload.get("ok"): + raise RuntimeError(f"Telegram API error: {payload}") + return payload["result"] + + def get_updates( + self, + offset: Optional[int], + timeout_s: int = 50, + allowed_updates: Optional[List[str]] = None, + ) -> List[Dict[str, Any]]: + params: Dict[str, Any] = {"timeout": timeout_s} + if offset is not None: + params["offset"] = offset + if allowed_updates is not None: + params["allowed_updates"] = allowed_updates + return self._call("getUpdates", params) + + def send_message( + self, + chat_id: int, + text: str, + reply_to_message_id: Optional[int] = None, + disable_notification: bool = False, + ) -> Dict[str, Any]: + if len(text) > TELEGRAM_HARD_LIMIT: + raise ValueError("send_message received too-long text; chunk it first") + params: Dict[str, Any] = { + "chat_id": chat_id, + "text": text, + "disable_notification": disable_notification, + } + if reply_to_message_id is not None: + params["reply_to_message_id"] = reply_to_message_id + return self._call("sendMessage", params) + + def send_message_chunked( + self, + chat_id: int, + text: str, + reply_to_message_id: Optional[int] = None, + disable_notification: bool = False, + chunk_len: int = DEFAULT_CHUNK_LEN, + ) -> List[Dict[str, Any]]: + sent: List[Dict[str, Any]] = [] + chunks = chunk_text(text, limit=chunk_len) + for i, c in enumerate(chunks): + msg = self.send_message( + chat_id=chat_id, + text=c, + reply_to_message_id=(reply_to_message_id if i == 0 else None), + disable_notification=disable_notification, + ) + sent.append(msg) + return sent + + +@dataclass(frozen=True) +class Route: + route_type: str # "exec" | "mcp" | "tmux" + route_id: str # session_id / conversationId / tmux target + meta: Dict[str, Any] + + +class RouteStore: + """ + Stores mapping: (chat_id, bot_message_id) -> route + so Telegram replies can be routed. + """ + + def __init__(self, path: str) -> None: + os.makedirs(os.path.dirname(path) or ".", exist_ok=True) + self._conn = sqlite3.connect(path, check_same_thread=False) + self._conn.execute("PRAGMA journal_mode=WAL;") + self._conn.execute( + """ + CREATE TABLE IF NOT EXISTS routes ( + chat_id INTEGER NOT NULL, + bot_message_id INTEGER NOT NULL, + route_type TEXT NOT NULL, + route_id TEXT NOT NULL, + meta_json TEXT, + created_at INTEGER NOT NULL, + PRIMARY KEY (chat_id, bot_message_id) + ); + """ + ) + self._conn.execute( + "CREATE INDEX IF NOT EXISTS idx_routes_route_id ON routes(route_id);" + ) + self._conn.commit() + + def link( + self, + chat_id: int, + bot_message_id: int, + route_type: str, + route_id: str, + meta: Optional[Dict[str, Any]] = None, + ) -> None: + meta_json = json.dumps(meta or {}, ensure_ascii=False) + self._conn.execute( + """ + INSERT OR REPLACE INTO routes(chat_id, bot_message_id, route_type, route_id, meta_json, created_at) + VALUES(?, ?, ?, ?, ?, ?) + """, + (chat_id, bot_message_id, route_type, route_id, meta_json, _now_unix()), + ) + self._conn.commit() + + def resolve(self, chat_id: int, bot_message_id: int) -> Optional[Route]: + cur = self._conn.execute( + """ + SELECT route_type, route_id, meta_json + FROM routes + WHERE chat_id = ? AND bot_message_id = ? + """, + (chat_id, bot_message_id), + ) + row = cur.fetchone() + if not row: + return None + route_type, route_id, meta_json = row + try: + meta = json.loads(meta_json) if meta_json else {} + except json.JSONDecodeError: + meta = {} + return Route(route_type=route_type, route_id=route_id, meta=meta) + + def close(self) -> None: + self._conn.close() + + +def parse_allowed_chat_ids(env_value: str) -> Optional[set[int]]: + """ + Parse ALLOWED_CHAT_IDS="123,456" + """ + v = (env_value or "").strip() + if not v: + return None + out: set[int] = set() + for part in v.split(","): + part = part.strip() + if not part: + continue + out.add(int(part)) + return out diff --git a/codex/codex_telegram_bridge/exec_bridge.py b/codex/codex_telegram_bridge/exec_bridge.py new file mode 100644 index 0000000..2afc6f3 --- /dev/null +++ b/codex/codex_telegram_bridge/exec_bridge.py @@ -0,0 +1,195 @@ +from __future__ import annotations + +import json +import os +import shlex +import subprocess +import threading +import time +from concurrent.futures import ThreadPoolExecutor +from typing import Any, Dict, Optional, Tuple + +from bridge_common import TelegramClient, RouteStore, parse_allowed_chat_ids + +# -------------------- Codex runner -------------------- + + +class CodexExecRunner: + """ + Runs Codex in non-interactive mode: + - new: codex exec --json ... - + - resume: codex exec --json ... resume - + """ + + def __init__(self, codex_cmd: str, workspace: Optional[str], extra_args: list[str]) -> None: + self.codex_cmd = codex_cmd + self.workspace = workspace + self.extra_args = extra_args + + # per-session locks to prevent concurrent resumes to same session_id + self._locks: dict[str, threading.Lock] = {} + self._locks_guard = threading.Lock() + + def _lock_for(self, session_id: str) -> threading.Lock: + with self._locks_guard: + if session_id not in self._locks: + self._locks[session_id] = threading.Lock() + return self._locks[session_id] + + def run(self, prompt: str, session_id: Optional[str]) -> Tuple[str, str]: + """ + Returns (session_id, final_agent_message_text) + """ + args = [self.codex_cmd, "exec", "--json"] + args.extend(self.extra_args) + if self.workspace: + args.extend(["--cd", self.workspace]) + + # Always pipe prompt via stdin ("-") to avoid quoting issues. + if session_id: + args.extend(["resume", session_id, "-"]) + else: + args.append("-") + + # read both stdout+stderr without deadlock + proc = subprocess.Popen( + args, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + bufsize=1, + ) + assert proc.stdin and proc.stdout and proc.stderr + + # send prompt then close stdin + proc.stdin.write(prompt) + proc.stdin.close() + + stderr_lines: list[str] = [] + + def _drain_stderr() -> None: + for line in proc.stderr: + stderr_lines.append(line) + + t = threading.Thread(target=_drain_stderr, daemon=True) + t.start() + + found_session: Optional[str] = session_id + last_agent_text: Optional[str] = None + + for line in proc.stdout: + line = line.strip() + if not line: + continue + try: + evt = json.loads(line) + except json.JSONDecodeError: + continue + + # From Codex JSONL event stream + if evt.get("type") == "thread.started": + found_session = evt.get("thread_id") or found_session + + if evt.get("type") == "item.completed": + item = evt.get("item") or {} + if item.get("type") == "agent_message" and isinstance(item.get("text"), str): + last_agent_text = item["text"] + + rc = proc.wait() + t.join(timeout=2.0) + + if rc != 0: + tail = "".join(stderr_lines[-200:]) + raise RuntimeError(f"codex exec failed (rc={rc}). stderr tail:\n{tail}") + + if not found_session: + raise RuntimeError("codex exec finished but no session_id/thread_id was captured") + + return found_session, (last_agent_text or "(No agent_message captured from JSON stream.)") + + def run_serialized(self, prompt: str, session_id: Optional[str]) -> Tuple[str, str]: + """ + If resuming, serialize per-session. + """ + if not session_id: + return self.run(prompt, session_id=None) + lock = self._lock_for(session_id) + with lock: + return self.run(prompt, session_id=session_id) + + +# -------------------- Telegram loop -------------------- + + +def main() -> None: + token = os.environ.get("TELEGRAM_BOT_TOKEN", "") + db_path = os.environ.get("BRIDGE_DB", "./bridge_routes.sqlite3") + allowed = parse_allowed_chat_ids(os.environ.get("ALLOWED_CHAT_IDS", "")) + + codex_cmd = os.environ.get("CODEX_CMD", "codex") + workspace = os.environ.get("CODEX_WORKSPACE") # optional + extra_args = shlex.split(os.environ.get("CODEX_EXEC_ARGS", "")) # e.g. "--full-auto --search" + + bot = TelegramClient(token) + store = RouteStore(db_path) + runner = CodexExecRunner(codex_cmd=codex_cmd, workspace=workspace, extra_args=extra_args) + + pool = ThreadPoolExecutor(max_workers=int(os.environ.get("MAX_WORKERS", "4"))) + offset: Optional[int] = None + + print("Option1 bridge running (codex exec). Long-polling Telegram...") + + def handle(chat_id: int, user_msg_id: int, text: str, resume_session: Optional[str]) -> None: + try: + session_id, answer = runner.run_serialized(text, resume_session) + sent_msgs = bot.send_message_chunked( + chat_id=chat_id, + text=answer, + reply_to_message_id=user_msg_id, + ) + for m in sent_msgs: + store.link(chat_id, m["message_id"], "exec", session_id, meta={"workspace": workspace}) + except Exception as e: + err = f"❌ Error:\n{e}" + sent_msgs = bot.send_message_chunked(chat_id=chat_id, text=err, reply_to_message_id=user_msg_id) + for m in sent_msgs: + store.link(chat_id, m["message_id"], "exec", resume_session or "unknown", meta={"error": True}) + + while True: + try: + updates = bot.get_updates(offset=offset, timeout_s=50, allowed_updates=["message"]) + except Exception as e: + print(f"[telegram] get_updates error: {e}") + time.sleep(2.0) + continue + + for upd in updates: + offset = upd["update_id"] + 1 + msg = upd.get("message") or {} + if "text" not in msg: + continue + + chat_id = msg["chat"]["id"] + if allowed is not None and int(chat_id) not in allowed: + continue + + if msg.get("from", {}).get("is_bot"): + continue + + text = msg["text"] + user_msg_id = msg["message_id"] + + # If user replied to a bot message, route to that session + resume_session: Optional[str] = None + r = msg.get("reply_to_message") + if r and "message_id" in r: + route = store.resolve(chat_id, r["message_id"]) + if route and route.route_type == "exec": + resume_session = route.route_id + + pool.submit(handle, chat_id, user_msg_id, text, resume_session) + + +if __name__ == "__main__": + main() diff --git a/codex/codex_telegram_bridge/mcp_bridge.py b/codex/codex_telegram_bridge/mcp_bridge.py new file mode 100644 index 0000000..2d5f5a4 --- /dev/null +++ b/codex/codex_telegram_bridge/mcp_bridge.py @@ -0,0 +1,350 @@ +from __future__ import annotations + +import json +import os +import shlex +import subprocess +import threading +import time +from queue import Queue, Empty +from typing import Any, Dict, List, Optional, Tuple + +from bridge_common import TelegramClient, RouteStore, parse_allowed_chat_ids + +MCP_PROTOCOL_VERSION = "2025-06-18" + + +def _deep_find_agent_text(obj: Any) -> Optional[str]: + """ + Heuristic: search nested dict/list for something like: + {"type":"agent_message","text":"..."} + """ + if isinstance(obj, dict): + if obj.get("type") == "agent_message" and isinstance(obj.get("text"), str): + return obj["text"] + for v in obj.values(): + t = _deep_find_agent_text(v) + if t is not None: + return t + elif isinstance(obj, list): + for it in obj: + t = _deep_find_agent_text(it) + if t is not None: + return t + return None + + +def _extract_text_from_tool_result(result: Any) -> str: + """ + MCP tool results often look like: {"content":[{"type":"text","text":"..."}]} + """ + if not isinstance(result, dict): + return "" + content = result.get("content") + if not isinstance(content, list): + return "" + parts: List[str] = [] + for c in content: + if isinstance(c, dict) and c.get("type") == "text" and isinstance(c.get("text"), str): + parts.append(c["text"]) + return "\n".join(parts).strip() + + +class MCPStdioClient: + """ + Minimal MCP stdio JSON-RPC client: + - spawns subprocess (codex mcp-server) + - performs initialize + notifications/initialized + - supports tools/list + tools/call + """ + + def __init__(self, cmd: List[str]) -> None: + self._proc = subprocess.Popen( + cmd, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + bufsize=1, + ) + assert self._proc.stdin and self._proc.stdout and self._proc.stderr + self._inbox: "Queue[Dict[str, Any]]" = Queue() + self._next_id = 1 + self._lock = threading.Lock() + self._stderr_tail: List[str] = [] + + self._reader = threading.Thread(target=self._read_stdout, daemon=True) + self._reader.start() + + self._stderr_reader = threading.Thread(target=self._read_stderr, daemon=True) + self._stderr_reader.start() + + self._initialize() + + def _read_stdout(self) -> None: + for line in self._proc.stdout: # type: ignore[union-attr] + line = line.strip() + if not line: + continue + try: + msg = json.loads(line) + except json.JSONDecodeError: + continue + if isinstance(msg, dict): + self._inbox.put(msg) + + def _read_stderr(self) -> None: + for line in self._proc.stderr: # type: ignore[union-attr] + self._stderr_tail.append(line) + # keep last ~300 lines + if len(self._stderr_tail) > 300: + self._stderr_tail = self._stderr_tail[-300:] + + def _send(self, msg: Dict[str, Any]) -> None: + raw = json.dumps(msg, ensure_ascii=False) + self._proc.stdin.write(raw + "\n") # type: ignore[union-attr] + self._proc.stdin.flush() # type: ignore[union-attr] + + def _request(self, method: str, params: Optional[Dict[str, Any]], timeout_s: int = 600) -> Any: + """ + Send one request and wait for response. + Sequential-only (guarded by self._lock). + """ + with self._lock: + req_id = self._next_id + self._next_id += 1 + + msg: Dict[str, Any] = {"jsonrpc": "2.0", "id": req_id, "method": method} + if params is not None: + msg["params"] = params + + self._send(msg) + + deadline = time.monotonic() + timeout_s + while True: + remaining = deadline - time.monotonic() + if remaining <= 0: + raise TimeoutError(f"MCP request timed out: {method}") + + try: + incoming = self._inbox.get(timeout=min(1.0, remaining)) + except Empty: + # also check process + if self._proc.poll() is not None: + tail = "".join(self._stderr_tail) + raise RuntimeError(f"MCP server exited unexpectedly. stderr tail:\n{tail}") + continue + + if incoming.get("id") == req_id: + if "error" in incoming: + raise RuntimeError(f"MCP error response: {incoming['error']}") + return incoming.get("result") + + # ignore other messages here (we run sequential requests) + # (notifications are handled by call_tool_collecting_events) + + def _notify(self, method: str, params: Optional[Dict[str, Any]] = None) -> None: + msg: Dict[str, Any] = {"jsonrpc": "2.0", "method": method} + if params is not None: + msg["params"] = params + self._send(msg) + + def _initialize(self) -> None: + # MCP lifecycle: initialize then notifications/initialized. + init_params = { + "protocolVersion": MCP_PROTOCOL_VERSION, + "capabilities": {}, + "clientInfo": {"name": "telegram-codex-bridge", "version": "0.1.0"}, + } + self._request("initialize", init_params, timeout_s=30) + # send initialized notification + self._notify("notifications/initialized") + return + + def tools_list(self) -> Any: + return self._request("tools/list", {}, timeout_s=30) + + def call_tool_collecting_events( + self, + tool_name: str, + arguments: Dict[str, Any], + timeout_s: int = 600, + ) -> Tuple[Optional[str], str]: + """ + Calls tools/call and collects notifications during the call. + Returns (session_id, best_text_answer) + """ + with self._lock: + req_id = self._next_id + self._next_id += 1 + + msg = { + "jsonrpc": "2.0", + "id": req_id, + "method": "tools/call", + "params": {"name": tool_name, "arguments": arguments}, + } + self._send(msg) + + deadline = time.monotonic() + timeout_s + session_id: Optional[str] = None + last_agent_text: Optional[str] = None + final_result: Optional[Any] = None + + while True: + remaining = deadline - time.monotonic() + if remaining <= 0: + raise TimeoutError(f"tools/call timed out: {tool_name}") + + try: + incoming = self._inbox.get(timeout=min(1.0, remaining)) + except Empty: + if self._proc.poll() is not None: + tail = "".join(self._stderr_tail) + raise RuntimeError(f"MCP server exited unexpectedly. stderr tail:\n{tail}") + continue + + # Notifications (no id) can stream progress/events + if "method" in incoming and "id" not in incoming: + if incoming.get("method") == "codex/event": + params = incoming.get("params") or {} + if isinstance(params, dict): + # Workaround: session_id can arrive in notification + sid = params.get("session_id") + if isinstance(sid, str) and sid: + session_id = sid + + # Try to extract agent message text from event payload heuristically + t = _deep_find_agent_text(params) + if t: + last_agent_text = t + continue + + # Response for our request + if incoming.get("id") == req_id: + if "error" in incoming: + raise RuntimeError(f"MCP tools/call error: {incoming['error']}") + final_result = incoming.get("result") + break + + # Ignore other responses (we do sequential calls) + + # Prefer last streamed agent message, else result.content text + if last_agent_text: + return session_id, last_agent_text + + text_from_result = _extract_text_from_tool_result(final_result) + return session_id, (text_from_result or "(No agent_message found; tool result had no text.)") + + def close(self) -> None: + try: + self._proc.terminate() + except Exception: + pass + + +def main() -> None: + token = os.environ.get("TELEGRAM_BOT_TOKEN", "") + db_path = os.environ.get("BRIDGE_DB", "./bridge_routes.sqlite3") + allowed = parse_allowed_chat_ids(os.environ.get("ALLOWED_CHAT_IDS", "")) + + # How to start Codex MCP server: + # default: "codex mcp-server" (can also be "npx -y codex mcp-server") + mcp_cmd = shlex.split(os.environ.get("CODEX_MCP_CMD", "codex mcp-server")) + + # Optional defaults for tool args (you can override as you like) + default_cwd = os.environ.get("CODEX_WORKSPACE") # used as tool 'cwd' + default_sandbox = os.environ.get("CODEX_SANDBOX", "workspace-write") + default_approval = os.environ.get("CODEX_APPROVAL_POLICY", "never") + + bot = TelegramClient(token) + store = RouteStore(db_path) + + print(f"Starting MCP server: {mcp_cmd}") + mcp = MCPStdioClient(mcp_cmd) + + # Optional: verify tools exist + try: + mcp.tools_list() + # Not strictly required; but helpful for debugging + print("tools/list ok") + except Exception as e: + print(f"tools/list failed: {e}") + + offset: Optional[int] = None + + print("Option2 bridge running (codex mcp-server). Long-polling Telegram...") + + # Single worker queue so we never overlap tools/call + work_q: "Queue[Tuple[int, int, str, Optional[str]]]" = Queue() + + def worker() -> None: + while True: + chat_id, user_msg_id, prompt, conversation_id = work_q.get() + try: + if conversation_id: + args = {"conversationId": conversation_id, "prompt": prompt} + sid, answer = mcp.call_tool_collecting_events("codex-reply", args, timeout_s=600) + # sid may be None on replies; keep conversation_id + sid = sid or conversation_id + else: + args = { + "prompt": prompt, + "cwd": default_cwd, + "sandbox": default_sandbox, + "approval-policy": default_approval, + } + sid, answer = mcp.call_tool_collecting_events("codex", args, timeout_s=600) + if not sid: + # Worst-case fallback (still let user see output) + sid = "unknown-session" + + sent_msgs = bot.send_message_chunked(chat_id=chat_id, text=answer, reply_to_message_id=user_msg_id) + for m in sent_msgs: + store.link(chat_id, m["message_id"], "mcp", sid, meta={"cwd": default_cwd}) + except Exception as e: + err = f"❌ Error:\n{e}" + sent_msgs = bot.send_message_chunked(chat_id=chat_id, text=err, reply_to_message_id=user_msg_id) + for m in sent_msgs: + store.link(chat_id, m["message_id"], "mcp", conversation_id or "unknown", meta={"error": True}) + finally: + work_q.task_done() + + threading.Thread(target=worker, daemon=True).start() + + while True: + try: + updates = bot.get_updates(offset=offset, timeout_s=50, allowed_updates=["message"]) + except Exception as e: + print(f"[telegram] get_updates error: {e}") + time.sleep(2.0) + continue + + for upd in updates: + offset = upd["update_id"] + 1 + msg = upd.get("message") or {} + if "text" not in msg: + continue + + chat_id = msg["chat"]["id"] + if allowed is not None and int(chat_id) not in allowed: + continue + + if msg.get("from", {}).get("is_bot"): + continue + + prompt = msg["text"] + user_msg_id = msg["message_id"] + + conversation_id: Optional[str] = None + r = msg.get("reply_to_message") + if r and "message_id" in r: + route = store.resolve(chat_id, r["message_id"]) + if route and route.route_type == "mcp": + conversation_id = route.route_id + + work_q.put((chat_id, user_msg_id, prompt, conversation_id)) + + +if __name__ == "__main__": + main() diff --git a/codex/codex_telegram_bridge/readme.md b/codex/codex_telegram_bridge/readme.md new file mode 100644 index 0000000..212aa37 --- /dev/null +++ b/codex/codex_telegram_bridge/readme.md @@ -0,0 +1,67 @@ +# Telegram Codex Bridge (Codex) + +Route Telegram replies back into Codex sessions. Includes three options: + +1. Non-interactive `codex exec` + `codex exec resume`. +2. `codex mcp-server` with MCP stdio JSON-RPC. +3. tmux injection for interactive Codex sessions. + +All options store a mapping from `(chat_id, bot_message_id)` to a route so replies can be routed correctly. + +## Install + +1. Ensure `uv` is installed. +2. Use the scripts in this folder as-is (no extra dependencies). +3. Set `TELEGRAM_BOT_TOKEN` and (optionally) `ALLOWED_CHAT_IDS`. + +## Option 1: exec/resume + +Run: + +```bash +export TELEGRAM_BOT_TOKEN="123:abc" +export BRIDGE_DB="./bridge_routes.sqlite3" +export CODEX_CMD="codex" +export CODEX_WORKSPACE="/path/to/repo" +export CODEX_EXEC_ARGS="--full-auto" +uv run exec_bridge.py +``` + +## Option 2: MCP server + +Run: + +```bash +export TELEGRAM_BOT_TOKEN="123:abc" +export BRIDGE_DB="./bridge_routes.sqlite3" +export CODEX_MCP_CMD="codex mcp-server" +export CODEX_WORKSPACE="/path/to/repo" +export CODEX_SANDBOX="workspace-write" +export CODEX_APPROVAL_POLICY="never" +uv run mcp_bridge.py +``` + +## Option 3: tmux + +Reply injector: + +```bash +export TELEGRAM_BOT_TOKEN="123:abc" +export BRIDGE_DB="./bridge_routes.sqlite3" +export ALLOWED_CHAT_IDS="123456789" +uv run tmux_reply_bot.py +``` + +Notifier (call from your existing hook): + +```bash +uv run tmux_notify.py --chat-id "$CHAT_ID" --tmux-target "codex1:0.0" --text "$TURN_TEXT" +``` + +## Files + +- `bridge_common.py`: shared Telegram client, chunking, and routing store +- `exec_bridge.py`: codex exec + resume bridge +- `mcp_bridge.py`: MCP stdio JSON-RPC bridge +- `tmux_notify.py`: tmux notifier helper +- `tmux_reply_bot.py`: tmux reply injector diff --git a/codex/codex_telegram_bridge/tmux_notify.py b/codex/codex_telegram_bridge/tmux_notify.py new file mode 100644 index 0000000..941a83e --- /dev/null +++ b/codex/codex_telegram_bridge/tmux_notify.py @@ -0,0 +1,40 @@ +from __future__ import annotations + +import argparse +import os +import sys +from typing import Optional + +from bridge_common import TelegramClient, RouteStore + + +def main() -> None: + ap = argparse.ArgumentParser() + ap.add_argument("--chat-id", type=int, required=True) + ap.add_argument("--tmux-target", type=str, required=True, help='tmux target, e.g. "codex1:0.0" or "codex1"') + ap.add_argument("--db", type=str, default=os.environ.get("BRIDGE_DB", "./bridge_routes.sqlite3")) + ap.add_argument("--reply-to", type=int, default=None, help="Optional Telegram message_id to reply to") + ap.add_argument("--text", type=str, default=None, help="Message text. If omitted, read stdin.") + args = ap.parse_args() + + token = os.environ.get("TELEGRAM_BOT_TOKEN", "") + bot = TelegramClient(token) + store = RouteStore(args.db) + + text = args.text + if text is None: + text = sys.stdin.read() + + sent = bot.send_message_chunked( + chat_id=args.chat_id, + text=text, + reply_to_message_id=args.reply_to, + ) + + # Store mapping for every chunk so user can reply to any chunk. + for m in sent: + store.link(args.chat_id, m["message_id"], "tmux", args.tmux_target, meta={}) + + +if __name__ == "__main__": + main() diff --git a/codex/codex_telegram_bridge/tmux_reply_bot.py b/codex/codex_telegram_bridge/tmux_reply_bot.py new file mode 100644 index 0000000..ec12254 --- /dev/null +++ b/codex/codex_telegram_bridge/tmux_reply_bot.py @@ -0,0 +1,93 @@ +from __future__ import annotations + +import os +import subprocess +import time +from typing import Optional + +from bridge_common import TelegramClient, RouteStore, parse_allowed_chat_ids + + +def tmux_send_text(target: str, text: str, press_enter: bool = True) -> None: + """ + Send text to tmux target pane/session. + If your Telegram messages include newlines, we replace them with literal '\n' + to avoid accidentally submitting early. + """ + safe = (text or "").replace("\r\n", "\n").replace("\r", "\n").replace("\n", "\\n") + subprocess.check_call(["tmux", "send-keys", "-t", target, "-l", safe]) + if press_enter: + subprocess.check_call(["tmux", "send-keys", "-t", target, "Enter"]) + + +def main() -> None: + token = os.environ.get("TELEGRAM_BOT_TOKEN", "") + db_path = os.environ.get("BRIDGE_DB", "./bridge_routes.sqlite3") + allowed = parse_allowed_chat_ids(os.environ.get("ALLOWED_CHAT_IDS", "")) + + bot = TelegramClient(token) + store = RouteStore(db_path) + + offset: Optional[int] = None + print("Option3 reply bot running (tmux injector). Long-polling Telegram...") + + while True: + try: + updates = bot.get_updates(offset=offset, timeout_s=50, allowed_updates=["message"]) + except Exception as e: + print(f"[telegram] get_updates error: {e}") + time.sleep(2.0) + continue + + for upd in updates: + offset = upd["update_id"] + 1 + msg = upd.get("message") or {} + if "text" not in msg: + continue + + chat_id = msg["chat"]["id"] + if allowed is not None and int(chat_id) not in allowed: + continue + if msg.get("from", {}).get("is_bot"): + continue + + text = msg["text"] + user_msg_id = msg["message_id"] + + r = msg.get("reply_to_message") + if not (r and "message_id" in r): + # In tmux mode we only route replies (no reply => ignore or treat as new session) + bot.send_message( + chat_id=chat_id, + text="Reply to a Codex message (from the bot) so I know which tmux session to send this to.", + reply_to_message_id=user_msg_id, + ) + continue + + route = store.resolve(chat_id, r["message_id"]) + if not route or route.route_type != "tmux": + bot.send_message( + chat_id=chat_id, + text="I don't know which tmux session this reply belongs to (no routing record found).", + reply_to_message_id=user_msg_id, + ) + continue + + tmux_target = route.route_id + try: + tmux_send_text(tmux_target, text, press_enter=True) + bot.send_message( + chat_id=chat_id, + text=f"✅ Sent to tmux target: {tmux_target}", + reply_to_message_id=user_msg_id, + ) + except Exception as e: + bot.send_message( + chat_id=chat_id, + text=f"❌ Failed to send to tmux ({tmux_target}): {e}", + reply_to_message_id=user_msg_id, + ) + + +if __name__ == "__main__": + main() diff --git a/readme.md b/readme.md index ebf98c0..5bee248 100644 --- a/readme.md +++ b/readme.md @@ -3,3 +3,4 @@ ## Codex - [Notify Telegram](codex/notify_telegram/readme.md) — Send Codex completion summaries to Telegram with safe Markdown rendering. +- [Codex Telegram Bridge](codex/codex_telegram_bridge/readme.md) — Route Telegram replies into Codex sessions (exec, MCP, or tmux).