refactor: split bridge common into modules

This commit is contained in:
banteg
2025-12-28 21:51:48 +04:00
parent 353ee8d05a
commit cec35fef4b
11 changed files with 500 additions and 456 deletions
+6 -1
View File
@@ -78,7 +78,12 @@ Add `--chat-id` if `chat_id` is not set in `~/.codex/telegram.toml`.
## Files
- `src/codex_telegram_bridge/bridge_common.py`: shared Telegram client, chunking, and routing store
- `src/codex_telegram_bridge/bridge_common.py`: compatibility re-exports
- `src/codex_telegram_bridge/constants.py`: limits and config path constants
- `src/codex_telegram_bridge/config.py`: config loading and chat-id parsing helpers
- `src/codex_telegram_bridge/rendering.py`: markdown rendering + chunking
- `src/codex_telegram_bridge/routes.py`: sqlite routing store
- `src/codex_telegram_bridge/telegram_client.py`: Telegram Bot API client
- `src/codex_telegram_bridge/exec_bridge.py`: codex exec + resume bridge
- `src/codex_telegram_bridge/mcp_bridge.py`: MCP stdio JSON-RPC bridge
- `src/codex_telegram_bridge/tmux_notify.py`: tmux notifier helper
@@ -1,434 +1,29 @@
from __future__ import annotations
import json
import os
import re
import sqlite3
import time
import urllib.error
import urllib.request
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from markdown_it import MarkdownIt
from sulguk import transform_html
TELEGRAM_HARD_LIMIT = 4096
DEFAULT_CHUNK_LEN = 3500 # leave room for formatting / safety
TELEGRAM_CONFIG_PATH = Path.home() / ".codex" / "telegram.toml"
def _now_unix() -> int:
return int(time.time())
def _load_toml(path: Path) -> Dict[str, Any]:
if not path.exists():
return {}
import tomllib
return tomllib.loads(path.read_text(encoding="utf-8"))
def load_telegram_config(path: Optional[str] = None) -> Dict[str, Any]:
cfg_path = Path(path) if path else TELEGRAM_CONFIG_PATH
return _load_toml(cfg_path)
def config_get(config: Dict[str, Any], key: str) -> Any:
if key in config:
return config[key]
nested = config.get("telegram")
if isinstance(nested, dict) and key in nested:
return nested[key]
return None
def render_markdown(md: str) -> Tuple[str, List[Dict[str, Any]]]:
html = MarkdownIt("commonmark", {"html": False}).render(md or "")
rendered = transform_html(html)
text = re.sub("(?m)^(\\s*)\u2022", r"\1-", rendered.text)
# FIX: Telegram requires MessageEntity.language (if present) to be a String.
entities: List[Dict[str, Any]] = []
for e in rendered.entities:
d = dict(e)
if "language" in d and not isinstance(d["language"], str):
d.pop("language", None)
entities.append(d)
return text, entities
def chunk_text(text: str, limit: int = DEFAULT_CHUNK_LEN) -> List[str]:
"""
Telegram hard limit is 4096 chars. Chunk at newlines when possible.
"""
text = text or ""
if len(text) <= limit:
return [text]
out: List[str] = []
buf: List[str] = []
size = 0
for line in text.splitlines(keepends=True):
if len(line) > limit:
# flush current buffer
if buf:
out.append("".join(buf))
buf, size = [], 0
# hard-split this long line
for i in range(0, len(line), limit):
out.append(line[i : i + limit])
continue
if size + len(line) > limit:
out.append("".join(buf))
buf, size = [line], len(line)
else:
buf.append(line)
size += len(line)
if buf:
out.append("".join(buf))
return out
def _chunk_text_with_indices(text: str, limit: int) -> List[Tuple[str, int, int]]:
text = text or ""
if len(text) <= limit:
return [(text, 0, len(text))]
out: List[Tuple[str, int, int]] = []
buf: List[str] = []
size = 0
buf_start = 0
pos = 0
for line in text.splitlines(keepends=True):
line_len = len(line)
line_start = pos
line_end = pos + line_len
if line_len > limit:
if buf:
out.append(("".join(buf), buf_start, line_start))
buf, size = [], 0
for i in range(0, line_len, limit):
part = line[i : i + limit]
out.append((part, line_start + i, line_start + i + len(part)))
pos = line_end
buf_start = pos
continue
if size + line_len > limit:
out.append(("".join(buf), buf_start, line_start))
buf = [line]
size = line_len
buf_start = line_start
else:
if not buf:
buf_start = line_start
buf.append(line)
size += line_len
pos = line_end
if buf:
out.append(("".join(buf), buf_start, pos))
return out
def _slice_entities(entities: List[Dict[str, Any]], start: int, end: int) -> List[Dict[str, Any]]:
out: List[Dict[str, Any]] = []
for ent in entities:
try:
ent_start = int(ent.get("offset", 0))
ent_len = int(ent.get("length", 0))
except (TypeError, ValueError):
continue
if ent_len <= 0:
continue
ent_end = ent_start + ent_len
if ent_end <= start or ent_start >= end:
continue
new_start = max(ent_start, start)
new_end = min(ent_end, end)
new_len = new_end - new_start
if new_len <= 0:
continue
new_ent = dict(ent)
new_ent["offset"] = new_start - start
new_ent["length"] = new_len
out.append(new_ent)
return out
class TelegramClient:
"""
Minimal Telegram Bot API client using standard library (no requests dependency).
"""
def __init__(self, token: str, timeout_s: int = 120) -> None:
if not token:
raise ValueError("Telegram token is empty")
self._base = f"https://api.telegram.org/bot{token}"
self._timeout_s = timeout_s
def _call(self, method: str, params: Dict[str, Any]) -> Any:
url = f"{self._base}/{method}"
data = json.dumps(params).encode("utf-8")
req = urllib.request.Request(
url,
data=data,
headers={"Content-Type": "application/json"},
method="POST",
from .config import (
config_get,
load_telegram_config,
parse_allowed_chat_ids,
parse_chat_id_list,
resolve_chat_ids,
)
try:
with urllib.request.urlopen(req, timeout=self._timeout_s) as resp:
payload = json.loads(resp.read().decode("utf-8"))
except urllib.error.HTTPError as e:
body = e.read().decode("utf-8", errors="replace")
raise RuntimeError(f"Telegram HTTPError {e.code}: {body}") from e
except urllib.error.URLError as e:
raise RuntimeError(f"Telegram URLError: {e}") from e
from .constants import DEFAULT_CHUNK_LEN, TELEGRAM_CONFIG_PATH, TELEGRAM_HARD_LIMIT
from .rendering import chunk_text, render_markdown
from .routes import Route, RouteStore
from .telegram_client import TelegramClient
if not payload.get("ok"):
raise RuntimeError(f"Telegram API error: {payload}")
return payload["result"]
def get_updates(
self,
offset: Optional[int],
timeout_s: int = 50,
allowed_updates: Optional[List[str]] = None,
) -> List[Dict[str, Any]]:
params: Dict[str, Any] = {"timeout": timeout_s}
if offset is not None:
params["offset"] = offset
if allowed_updates is not None:
params["allowed_updates"] = allowed_updates
return self._call("getUpdates", params)
def send_message(
self,
chat_id: int,
text: str,
reply_to_message_id: Optional[int] = None,
disable_notification: Optional[bool] = False,
entities: Optional[List[Dict[str, Any]]] = None,
) -> Dict[str, Any]:
if len(text) > TELEGRAM_HARD_LIMIT:
raise ValueError("send_message received too-long text; chunk it first")
params: Dict[str, Any] = {
"chat_id": chat_id,
"text": text,
}
if disable_notification is not None:
params["disable_notification"] = disable_notification
if reply_to_message_id is not None:
params["reply_to_message_id"] = reply_to_message_id
if entities is not None:
params["entities"] = entities
return self._call("sendMessage", params)
def edit_message_text(
self,
chat_id: int,
message_id: int,
text: str,
entities: Optional[List[Dict[str, Any]]] = None,
) -> Dict[str, Any]:
if len(text) > TELEGRAM_HARD_LIMIT:
raise ValueError("edit_message_text received too-long text")
params: Dict[str, Any] = {
"chat_id": chat_id,
"message_id": message_id,
"text": text,
}
if entities is not None:
params["entities"] = entities
return self._call("editMessageText", params)
def delete_message(self, chat_id: int, message_id: int) -> bool:
params: Dict[str, Any] = {
"chat_id": chat_id,
"message_id": message_id,
}
res = self._call("deleteMessage", params)
return bool(res)
def send_message_chunked(
self,
chat_id: int,
text: str,
reply_to_message_id: Optional[int] = None,
disable_notification: bool = False,
chunk_len: int = DEFAULT_CHUNK_LEN,
) -> List[Dict[str, Any]]:
sent: List[Dict[str, Any]] = []
chunks = chunk_text(text, limit=chunk_len)
for i, c in enumerate(chunks):
msg = self.send_message(
chat_id=chat_id,
text=c,
reply_to_message_id=(reply_to_message_id if i == 0 else None),
disable_notification=disable_notification,
)
sent.append(msg)
return sent
def send_message_markdown_chunked(
self,
chat_id: int,
text: str,
reply_to_message_id: Optional[int] = None,
disable_notification: bool = False,
chunk_len: int = DEFAULT_CHUNK_LEN,
) -> List[Dict[str, Any]]:
sent: List[Dict[str, Any]] = []
rendered_text, entities = render_markdown(text)
chunks = _chunk_text_with_indices(rendered_text, limit=chunk_len)
for i, (chunk, start, end) in enumerate(chunks):
chunk_entities = _slice_entities(entities, start, end) if entities else None
msg = self.send_message(
chat_id=chat_id,
text=chunk,
reply_to_message_id=(reply_to_message_id if i == 0 else None),
disable_notification=disable_notification,
entities=chunk_entities,
)
sent.append(msg)
return sent
def send_chat_action(self, chat_id: int, action: str = "typing") -> Dict[str, Any]:
params: Dict[str, Any] = {
"chat_id": chat_id,
"action": action,
}
return self._call("sendChatAction", params)
@dataclass(frozen=True)
class Route:
route_type: str # "exec" | "mcp" | "tmux"
route_id: str # session_id / conversationId / tmux target
meta: Dict[str, Any]
class RouteStore:
"""
Stores mapping: (chat_id, bot_message_id) -> route
so Telegram replies can be routed.
"""
def __init__(self, path: str) -> None:
os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
self._conn = sqlite3.connect(path, check_same_thread=False)
self._conn.execute("PRAGMA journal_mode=WAL;")
self._conn.execute(
"""
CREATE TABLE IF NOT EXISTS routes (
chat_id INTEGER NOT NULL,
bot_message_id INTEGER NOT NULL,
route_type TEXT NOT NULL,
route_id TEXT NOT NULL,
meta_json TEXT,
created_at INTEGER NOT NULL,
PRIMARY KEY (chat_id, bot_message_id)
);
"""
)
self._conn.execute(
"CREATE INDEX IF NOT EXISTS idx_routes_route_id ON routes(route_id);"
)
self._conn.commit()
def link(
self,
chat_id: int,
bot_message_id: int,
route_type: str,
route_id: str,
meta: Optional[Dict[str, Any]] = None,
) -> None:
meta_json = json.dumps(meta or {}, ensure_ascii=False)
self._conn.execute(
"""
INSERT OR REPLACE INTO routes(chat_id, bot_message_id, route_type, route_id, meta_json, created_at)
VALUES(?, ?, ?, ?, ?, ?)
""",
(chat_id, bot_message_id, route_type, route_id, meta_json, _now_unix()),
)
self._conn.commit()
def resolve(self, chat_id: int, bot_message_id: int) -> Optional[Route]:
cur = self._conn.execute(
"""
SELECT route_type, route_id, meta_json
FROM routes
WHERE chat_id = ? AND bot_message_id = ?
""",
(chat_id, bot_message_id),
)
row = cur.fetchone()
if not row:
return None
route_type, route_id, meta_json = row
try:
meta = json.loads(meta_json) if meta_json else {}
except json.JSONDecodeError:
meta = {}
return Route(route_type=route_type, route_id=route_id, meta=meta)
def close(self) -> None:
self._conn.close()
def parse_allowed_chat_ids(value: str) -> Optional[set[int]]:
"""
Parse a comma-separated chat id string like "123,456".
"""
v = (value or "").strip()
if not v:
return None
out: set[int] = set()
for part in v.split(","):
part = part.strip()
if not part:
continue
out.add(int(part))
return out
def parse_chat_id_list(value: Any) -> Optional[set[int]]:
if value is None:
return None
if isinstance(value, str):
return parse_allowed_chat_ids(value)
if isinstance(value, int):
return {value}
if isinstance(value, (list, tuple, set)):
out: set[int] = set()
for item in value:
if item is None:
continue
if isinstance(item, str):
if not item.strip():
continue
out.add(int(item))
else:
out.add(int(item))
return out or None
return None
def resolve_chat_ids(config: Dict[str, Any]) -> Optional[set[int]]:
chat_ids = parse_chat_id_list(config_get(config, "chat_id"))
if chat_ids is None:
chat_ids = parse_chat_id_list(config_get(config, "allowed_chat_ids"))
if chat_ids is None:
chat_ids = parse_chat_id_list(config_get(config, "startup_chat_ids"))
return chat_ids
__all__ = [
"DEFAULT_CHUNK_LEN",
"TELEGRAM_CONFIG_PATH",
"TELEGRAM_HARD_LIMIT",
"TelegramClient",
"Route",
"RouteStore",
"chunk_text",
"config_get",
"load_telegram_config",
"parse_allowed_chat_ids",
"parse_chat_id_list",
"render_markdown",
"resolve_chat_ids",
]
@@ -0,0 +1,75 @@
from __future__ import annotations
from pathlib import Path
from typing import Any, Dict, Optional
from .constants import TELEGRAM_CONFIG_PATH
def _load_toml(path: Path) -> Dict[str, Any]:
if not path.exists():
return {}
import tomllib
return tomllib.loads(path.read_text(encoding="utf-8"))
def load_telegram_config(path: Optional[str] = None) -> Dict[str, Any]:
cfg_path = Path(path) if path else TELEGRAM_CONFIG_PATH
return _load_toml(cfg_path)
def config_get(config: Dict[str, Any], key: str) -> Any:
if key in config:
return config[key]
nested = config.get("telegram")
if isinstance(nested, dict) and key in nested:
return nested[key]
return None
def parse_allowed_chat_ids(value: str) -> Optional[set[int]]:
"""
Parse a comma-separated chat id string like "123,456".
"""
v = (value or "").strip()
if not v:
return None
out: set[int] = set()
for part in v.split(","):
part = part.strip()
if not part:
continue
out.add(int(part))
return out
def parse_chat_id_list(value: Any) -> Optional[set[int]]:
if value is None:
return None
if isinstance(value, str):
return parse_allowed_chat_ids(value)
if isinstance(value, int):
return {value}
if isinstance(value, (list, tuple, set)):
out: set[int] = set()
for item in value:
if item is None:
continue
if isinstance(item, str):
if not item.strip():
continue
out.add(int(item))
else:
out.add(int(item))
return out or None
return None
def resolve_chat_ids(config: Dict[str, Any]) -> Optional[set[int]]:
chat_ids = parse_chat_id_list(config_get(config, "chat_id"))
if chat_ids is None:
chat_ids = parse_chat_id_list(config_get(config, "allowed_chat_ids"))
if chat_ids is None:
chat_ids = parse_chat_id_list(config_get(config, "startup_chat_ids"))
return chat_ids
@@ -0,0 +1,7 @@
from __future__ import annotations
from pathlib import Path
TELEGRAM_HARD_LIMIT = 4096
DEFAULT_CHUNK_LEN = 3500 # leave room for formatting / safety
TELEGRAM_CONFIG_PATH = Path.home() / ".codex" / "telegram.toml"
@@ -16,15 +16,11 @@ from typing import Any, Callable, Dict, Optional, Tuple
import typer
from .bridge_common import (
TelegramClient,
RouteStore,
TELEGRAM_HARD_LIMIT,
config_get,
load_telegram_config,
render_markdown,
resolve_chat_ids,
)
from .config import config_get, load_telegram_config, resolve_chat_ids
from .constants import TELEGRAM_HARD_LIMIT
from .rendering import render_markdown
from .routes import RouteStore
from .telegram_client import TelegramClient
# -------------------- Codex runner --------------------
@@ -15,13 +15,9 @@ from typing import Any, Dict, List, Optional, Tuple
import typer
from .bridge_common import (
TelegramClient,
RouteStore,
config_get,
load_telegram_config,
resolve_chat_ids,
)
from .config import config_get, load_telegram_config, resolve_chat_ids
from .routes import RouteStore
from .telegram_client import TelegramClient
MCP_PROTOCOL_VERSION = "2025-06-18"
@@ -0,0 +1,130 @@
from __future__ import annotations
import re
from typing import Any, Dict, List, Tuple
from markdown_it import MarkdownIt
from sulguk import transform_html
from .constants import DEFAULT_CHUNK_LEN
def render_markdown(md: str) -> Tuple[str, List[Dict[str, Any]]]:
html = MarkdownIt("commonmark", {"html": False}).render(md or "")
rendered = transform_html(html)
text = re.sub("(?m)^(\\s*)\u2022", r"\1-", rendered.text)
# FIX: Telegram requires MessageEntity.language (if present) to be a String.
entities: List[Dict[str, Any]] = []
for e in rendered.entities:
d = dict(e)
if "language" in d and not isinstance(d["language"], str):
d.pop("language", None)
entities.append(d)
return text, entities
def chunk_text(text: str, limit: int = DEFAULT_CHUNK_LEN) -> List[str]:
"""
Telegram hard limit is 4096 chars. Chunk at newlines when possible.
"""
text = text or ""
if len(text) <= limit:
return [text]
out: List[str] = []
buf: List[str] = []
size = 0
for line in text.splitlines(keepends=True):
if len(line) > limit:
# flush current buffer
if buf:
out.append("".join(buf))
buf, size = [], 0
# hard-split this long line
for i in range(0, len(line), limit):
out.append(line[i : i + limit])
continue
if size + len(line) > limit:
out.append("".join(buf))
buf, size = [line], len(line)
else:
buf.append(line)
size += len(line)
if buf:
out.append("".join(buf))
return out
def _chunk_text_with_indices(text: str, limit: int) -> List[Tuple[str, int, int]]:
text = text or ""
if len(text) <= limit:
return [(text, 0, len(text))]
out: List[Tuple[str, int, int]] = []
buf: List[str] = []
size = 0
buf_start = 0
pos = 0
for line in text.splitlines(keepends=True):
line_len = len(line)
line_start = pos
line_end = pos + line_len
if line_len > limit:
if buf:
out.append(("".join(buf), buf_start, line_start))
buf, size = [], 0
for i in range(0, line_len, limit):
part = line[i : i + limit]
out.append((part, line_start + i, line_start + i + len(part)))
pos = line_end
buf_start = pos
continue
if size + line_len > limit:
out.append(("".join(buf), buf_start, line_start))
buf = [line]
size = line_len
buf_start = line_start
else:
if not buf:
buf_start = line_start
buf.append(line)
size += line_len
pos = line_end
if buf:
out.append(("".join(buf), buf_start, pos))
return out
def _slice_entities(entities: List[Dict[str, Any]], start: int, end: int) -> List[Dict[str, Any]]:
out: List[Dict[str, Any]] = []
for ent in entities:
try:
ent_start = int(ent.get("offset", 0))
ent_len = int(ent.get("length", 0))
except (TypeError, ValueError):
continue
if ent_len <= 0:
continue
ent_end = ent_start + ent_len
if ent_end <= start or ent_start >= end:
continue
new_start = max(ent_start, start)
new_end = min(ent_end, end)
new_len = new_end - new_start
if new_len <= 0:
continue
new_ent = dict(ent)
new_ent["offset"] = new_start - start
new_ent["length"] = new_len
out.append(new_ent)
return out
@@ -0,0 +1,88 @@
from __future__ import annotations
import json
import os
import sqlite3
import time
from dataclasses import dataclass
from typing import Any, Dict, Optional
def _now_unix() -> int:
return int(time.time())
@dataclass(frozen=True)
class Route:
route_type: str # "exec" | "mcp" | "tmux"
route_id: str # session_id / conversationId / tmux target
meta: Dict[str, Any]
class RouteStore:
"""
Stores mapping: (chat_id, bot_message_id) -> route
so Telegram replies can be routed.
"""
def __init__(self, path: str) -> None:
os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
self._conn = sqlite3.connect(path, check_same_thread=False)
self._conn.execute("PRAGMA journal_mode=WAL;")
self._conn.execute(
"""
CREATE TABLE IF NOT EXISTS routes (
chat_id INTEGER NOT NULL,
bot_message_id INTEGER NOT NULL,
route_type TEXT NOT NULL,
route_id TEXT NOT NULL,
meta_json TEXT,
created_at INTEGER NOT NULL,
PRIMARY KEY (chat_id, bot_message_id)
);
"""
)
self._conn.execute(
"CREATE INDEX IF NOT EXISTS idx_routes_route_id ON routes(route_id);"
)
self._conn.commit()
def link(
self,
chat_id: int,
bot_message_id: int,
route_type: str,
route_id: str,
meta: Optional[Dict[str, Any]] = None,
) -> None:
meta_json = json.dumps(meta or {}, ensure_ascii=False)
self._conn.execute(
"""
INSERT OR REPLACE INTO routes(chat_id, bot_message_id, route_type, route_id, meta_json, created_at)
VALUES(?, ?, ?, ?, ?, ?)
""",
(chat_id, bot_message_id, route_type, route_id, meta_json, _now_unix()),
)
self._conn.commit()
def resolve(self, chat_id: int, bot_message_id: int) -> Optional[Route]:
cur = self._conn.execute(
"""
SELECT route_type, route_id, meta_json
FROM routes
WHERE chat_id = ? AND bot_message_id = ?
""",
(chat_id, bot_message_id),
)
row = cur.fetchone()
if not row:
return None
route_type, route_id, meta_json = row
try:
meta = json.loads(meta_json) if meta_json else {}
except json.JSONDecodeError:
meta = {}
return Route(route_type=route_type, route_id=route_id, meta=meta)
def close(self) -> None:
self._conn.close()
@@ -0,0 +1,154 @@
from __future__ import annotations
import json
import urllib.error
import urllib.request
from typing import Any, Dict, List, Optional
from .constants import DEFAULT_CHUNK_LEN, TELEGRAM_HARD_LIMIT
from .rendering import chunk_text, render_markdown, _chunk_text_with_indices, _slice_entities
class TelegramClient:
"""
Minimal Telegram Bot API client using standard library (no requests dependency).
"""
def __init__(self, token: str, timeout_s: int = 120) -> None:
if not token:
raise ValueError("Telegram token is empty")
self._base = f"https://api.telegram.org/bot{token}"
self._timeout_s = timeout_s
def _call(self, method: str, params: Dict[str, Any]) -> Any:
url = f"{self._base}/{method}"
data = json.dumps(params).encode("utf-8")
req = urllib.request.Request(
url,
data=data,
headers={"Content-Type": "application/json"},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=self._timeout_s) as resp:
payload = json.loads(resp.read().decode("utf-8"))
except urllib.error.HTTPError as e:
body = e.read().decode("utf-8", errors="replace")
raise RuntimeError(f"Telegram HTTPError {e.code}: {body}") from e
except urllib.error.URLError as e:
raise RuntimeError(f"Telegram URLError: {e}") from e
if not payload.get("ok"):
raise RuntimeError(f"Telegram API error: {payload}")
return payload["result"]
def get_updates(
self,
offset: Optional[int],
timeout_s: int = 50,
allowed_updates: Optional[List[str]] = None,
) -> List[Dict[str, Any]]:
params: Dict[str, Any] = {"timeout": timeout_s}
if offset is not None:
params["offset"] = offset
if allowed_updates is not None:
params["allowed_updates"] = allowed_updates
return self._call("getUpdates", params)
def send_message(
self,
chat_id: int,
text: str,
reply_to_message_id: Optional[int] = None,
disable_notification: Optional[bool] = False,
entities: Optional[List[Dict[str, Any]]] = None,
) -> Dict[str, Any]:
if len(text) > TELEGRAM_HARD_LIMIT:
raise ValueError("send_message received too-long text; chunk it first")
params: Dict[str, Any] = {
"chat_id": chat_id,
"text": text,
}
if disable_notification is not None:
params["disable_notification"] = disable_notification
if reply_to_message_id is not None:
params["reply_to_message_id"] = reply_to_message_id
if entities is not None:
params["entities"] = entities
return self._call("sendMessage", params)
def edit_message_text(
self,
chat_id: int,
message_id: int,
text: str,
entities: Optional[List[Dict[str, Any]]] = None,
) -> Dict[str, Any]:
if len(text) > TELEGRAM_HARD_LIMIT:
raise ValueError("edit_message_text received too-long text")
params: Dict[str, Any] = {
"chat_id": chat_id,
"message_id": message_id,
"text": text,
}
if entities is not None:
params["entities"] = entities
return self._call("editMessageText", params)
def delete_message(self, chat_id: int, message_id: int) -> bool:
params: Dict[str, Any] = {
"chat_id": chat_id,
"message_id": message_id,
}
res = self._call("deleteMessage", params)
return bool(res)
def send_message_chunked(
self,
chat_id: int,
text: str,
reply_to_message_id: Optional[int] = None,
disable_notification: bool = False,
chunk_len: int = DEFAULT_CHUNK_LEN,
) -> List[Dict[str, Any]]:
sent: List[Dict[str, Any]] = []
chunks = chunk_text(text, limit=chunk_len)
for i, c in enumerate(chunks):
msg = self.send_message(
chat_id=chat_id,
text=c,
reply_to_message_id=(reply_to_message_id if i == 0 else None),
disable_notification=disable_notification,
)
sent.append(msg)
return sent
def send_message_markdown_chunked(
self,
chat_id: int,
text: str,
reply_to_message_id: Optional[int] = None,
disable_notification: bool = False,
chunk_len: int = DEFAULT_CHUNK_LEN,
) -> List[Dict[str, Any]]:
sent: List[Dict[str, Any]] = []
rendered_text, entities = render_markdown(text)
chunks = _chunk_text_with_indices(rendered_text, limit=chunk_len)
for i, (chunk, start, end) in enumerate(chunks):
chunk_entities = _slice_entities(entities, start, end) if entities else None
msg = self.send_message(
chat_id=chat_id,
text=chunk,
reply_to_message_id=(reply_to_message_id if i == 0 else None),
disable_notification=disable_notification,
entities=chunk_entities,
)
sent.append(msg)
return sent
def send_chat_action(self, chat_id: int, action: str = "typing") -> Dict[str, Any]:
params: Dict[str, Any] = {
"chat_id": chat_id,
"action": action,
}
return self._call("sendChatAction", params)
@@ -10,7 +10,9 @@ from typing import Optional
import typer
from .bridge_common import TelegramClient, RouteStore, config_get, load_telegram_config
from .config import config_get, load_telegram_config
from .routes import RouteStore
from .telegram_client import TelegramClient
def run(
@@ -11,13 +11,9 @@ from typing import Optional
import typer
from .bridge_common import (
TelegramClient,
RouteStore,
config_get,
load_telegram_config,
resolve_chat_ids,
)
from .config import config_get, load_telegram_config, resolve_chat_ids
from .routes import RouteStore
from .telegram_client import TelegramClient
def tmux_send_text(target: str, text: str, press_enter: bool = True) -> None: