From ab1ecc277d41e7be8d017e61b230d1e0e313bba6 Mon Sep 17 00:00:00 2001 From: banteg <4562643+banteg@users.noreply.github.com> Date: Sun, 11 Jan 2026 05:32:31 +0400 Subject: [PATCH] feat(telegram): add file transfer support (#83) --- docs/user-guide.md | 73 ++- readme.md | 7 + src/takopi/cli.py | 1 - src/takopi/ids.py | 2 +- src/takopi/settings.py | 62 +++ src/takopi/telegram/__init__.py | 2 + src/takopi/telegram/backend.py | 33 +- src/takopi/telegram/bridge.py | 913 +++++++++++++++++++++++++++++++- src/takopi/telegram/client.py | 262 ++++++++- src/takopi/telegram/files.py | 153 ++++++ src/takopi/telegram/types.py | 11 + tests/test_telegram_backend.py | 112 ++++ tests/test_telegram_bridge.py | 293 +++++++++- tests/test_telegram_incoming.py | 149 +++++- tests/test_telegram_queue.py | 22 + 15 files changed, 2047 insertions(+), 48 deletions(-) create mode 100644 src/takopi/telegram/files.py create mode 100644 tests/test_telegram_backend.py diff --git a/docs/user-guide.md b/docs/user-guide.md index 1382657..320816e 100644 --- a/docs/user-guide.md +++ b/docs/user-guide.md @@ -333,7 +333,67 @@ When you send a voice note, takopi transcribes it and runs the result as a norma --- -## 9. Configuration reference +--- + +## 9. File transfer + +Upload files into the active repo/worktree or fetch files back into Telegram. + +### Upload a file + +Send a document with a caption: + +``` +/file put +``` + +Examples: + +``` +/file put docs/spec.pdf +/file put /happy-gadgets @feat/camera assets/logo.png +``` + +If you send a file **without a caption**, takopi saves it to: + +``` +incoming/ +``` + +Use `--force` to overwrite an existing file: + +``` +/file put --force docs/spec.pdf +``` + +### Fetch a file + +Send: + +``` +/file get +``` + +Directories are zipped automatically. 
+ +### File transfer config + +```toml +[transports.telegram.files] +enabled = true +auto_put = true +uploads_dir = "incoming" +allowed_user_ids = [123456789] +deny_globs = [".git/**", ".env", ".envrc", "**/*.pem", "**/.ssh/**"] +``` + +Notes: +- File transfer is **disabled by default**. +- If `allowed_user_ids` is empty, private chats are allowed and group usage requires admin privileges. + +--- + +## 10. Configuration reference Full example with all options: @@ -349,6 +409,13 @@ bot_token = "123456789:ABCdefGHIjklMNOpqrsTUVwxyz" chat_id = 123456789 voice_transcription = true +[transports.telegram.files] +enabled = true +auto_put = true +uploads_dir = "incoming" +allowed_user_ids = [123456789] +deny_globs = [".git/**", ".env", ".envrc", "**/*.pem", "**/.ssh/**"] + [transports.telegram.topics] enabled = true scope = "auto" @@ -370,7 +437,7 @@ worktree_base = "develop" --- -## 10. Command cheatsheet +## 11. Command cheatsheet ### Message directives @@ -386,6 +453,8 @@ worktree_base = "develop" | Command | Description | |---------|-------------| | `/cancel` | Reply to the progress message to stop the current run | +| `/file put <path>` | Upload a document into the repo/worktree | +| `/file get <path>` | Fetch a file (directories are zipped) | | `/topic @branch` | Create/bind a topic | | `/ctx` | Show current context | | `/ctx set @branch` | Update context binding | diff --git a/readme.md b/readme.md index 041ba9e..2e89414 100644 --- a/readme.md +++ b/readme.md @@ -20,6 +20,8 @@ parallel runs across threads, per thread queue support. optional voice note transcription for telegram (routes transcript like typed text). +telegram file transfer: upload documents into repos (`/file put`) and fetch files back (`/file get`). + telegram forum topics: bind a topic to a project/branch and keep per-topic session resumes. per-project chat routing: assign different telegram chats to different projects. 
@@ -71,6 +73,11 @@ bot_token = "123456789:ABCdefGHIjklMNOpqrsTUVwxyz" chat_id = 123456789 voice_transcription = true +[transports.telegram.files] +enabled = true +auto_put = true +allowed_user_ids = [123456789] + [transports.telegram.topics] enabled = true diff --git a/src/takopi/cli.py b/src/takopi/cli.py index bedd617..458f0ff 100644 --- a/src/takopi/cli.py +++ b/src/takopi/cli.py @@ -274,7 +274,6 @@ def _run_auto_router( ) -> None: if debug: os.environ.setdefault("TAKOPI_LOG_FILE", "debug.log") - os.environ.setdefault("TAKOPI_LOG_FORMAT", "json") setup_logging(debug=debug) lock_handle: LockHandle | None = None try: diff --git a/src/takopi/ids.py b/src/takopi/ids.py index cf19a37..eab937b 100644 --- a/src/takopi/ids.py +++ b/src/takopi/ids.py @@ -6,7 +6,7 @@ ID_PATTERN = r"^[a-z0-9_]{1,32}$" _ID_RE = re.compile(ID_PATTERN) RESERVED_CLI_COMMANDS = frozenset({"init", "plugins"}) -RESERVED_CHAT_COMMANDS = frozenset({"cancel"}) +RESERVED_CHAT_COMMANDS = frozenset({"cancel", "file"}) RESERVED_ENGINE_IDS = RESERVED_CLI_COMMANDS | RESERVED_CHAT_COMMANDS RESERVED_COMMAND_IDS = RESERVED_CLI_COMMANDS | RESERVED_CHAT_COMMANDS diff --git a/src/takopi/settings.py b/src/takopi/settings.py index 60a3553..9ed1105 100644 --- a/src/takopi/settings.py +++ b/src/takopi/settings.py @@ -46,6 +46,67 @@ class TelegramTopicsSettings(BaseModel): return cleaned +class TelegramFilesSettings(BaseModel): + model_config = ConfigDict(extra="forbid") + + enabled: bool = False + auto_put: bool = True + uploads_dir: str = "incoming" + allowed_user_ids: list[int] = Field(default_factory=list) + deny_globs: list[str] = Field( + default_factory=lambda: [ + ".git/**", + ".env", + ".envrc", + "**/*.pem", + "**/.ssh/**", + ] + ) + + @field_validator("uploads_dir", mode="before") + @classmethod + def _validate_uploads_dir(cls, value: Any) -> Any: + if value is None: + raise ValueError("files.uploads_dir must be a string") + if not isinstance(value, str): + raise ValueError("files.uploads_dir must be a 
string") + cleaned = value.strip() + if not cleaned: + raise ValueError("files.uploads_dir must be a non-empty string") + if Path(cleaned).is_absolute(): + raise ValueError("files.uploads_dir must be a relative path") + return cleaned + + @field_validator("allowed_user_ids", mode="before") + @classmethod + def _validate_allowed_users(cls, value: Any) -> Any: + if value is None: + return [] + if not isinstance(value, list): + raise ValueError("files.allowed_user_ids must be a list of integers") + for item in value: + if isinstance(item, bool) or not isinstance(item, int): + raise ValueError("files.allowed_user_ids must be a list of integers") + return value + + @field_validator("deny_globs", mode="before") + @classmethod + def _validate_deny_globs(cls, value: Any) -> Any: + if value is None: + return [] + if not isinstance(value, list): + raise ValueError("files.deny_globs must be a list of strings") + cleaned: list[str] = [] + for item in value: + if not isinstance(item, str): + raise ValueError("files.deny_globs must be a list of strings") + stripped = item.strip() + if not stripped: + raise ValueError("files.deny_globs entries must be non-empty strings") + cleaned.append(stripped) + return cleaned + + class TelegramTransportSettings(BaseModel): model_config = ConfigDict(extra="forbid") @@ -53,6 +114,7 @@ class TelegramTransportSettings(BaseModel): chat_id: int | None = None voice_transcription: bool = False topics: TelegramTopicsSettings = Field(default_factory=TelegramTopicsSettings) + files: TelegramFilesSettings = Field(default_factory=TelegramFilesSettings) @field_validator("bot_token", mode="before") @classmethod diff --git a/src/takopi/telegram/__init__.py b/src/takopi/telegram/__init__.py index c633939..e17862a 100644 --- a/src/takopi/telegram/__init__.py +++ b/src/takopi/telegram/__init__.py @@ -3,6 +3,7 @@ from .client import parse_incoming_update, poll_incoming from .types import ( TelegramCallbackQuery, + TelegramDocument, TelegramIncomingMessage, 
TelegramIncomingUpdate, TelegramVoice, @@ -10,6 +11,7 @@ from .types import ( __all__ = [ "TelegramCallbackQuery", + "TelegramDocument", "TelegramIncomingMessage", "TelegramIncomingUpdate", "TelegramVoice", diff --git a/src/takopi/telegram/backend.py b/src/takopi/telegram/backend.py index cd7fd37..10e2f68 100644 --- a/src/takopi/telegram/backend.py +++ b/src/takopi/telegram/backend.py @@ -11,13 +11,19 @@ from ..config import ConfigError from ..logging import get_logger from pydantic import ValidationError -from ..settings import TelegramTopicsSettings, load_settings, require_telegram_config +from ..settings import ( + TelegramFilesSettings, + TelegramTopicsSettings, + load_settings, + require_telegram_config, +) from ..transports import SetupResult, TransportBackend from ..transport_runtime import TransportRuntime from .bridge import ( TelegramBridgeConfig, TelegramPresenter, TelegramTransport, + TelegramFilesConfig, TelegramTopicsConfig, TelegramVoiceTranscriptionConfig, run_main_loop, @@ -79,6 +85,29 @@ def _build_topics_config( ) +def _build_files_config( + transport_config: dict[str, object], + *, + config_path: Path, +) -> TelegramFilesConfig: + raw = transport_config.get("files") or {} + if not isinstance(raw, dict): + raise ConfigError( + f"Invalid `transports.telegram.files` in {config_path}; expected a table." 
+ ) + try: + settings = TelegramFilesSettings.model_validate(raw) + except ValidationError as exc: + raise ConfigError(f"Invalid files config in {config_path}: {exc}") from exc + return TelegramFilesConfig( + enabled=settings.enabled, + auto_put=settings.auto_put, + uploads_dir=settings.uploads_dir, + allowed_user_ids=frozenset(settings.allowed_user_ids), + deny_globs=tuple(settings.deny_globs), + ) + + class TelegramBackend(TransportBackend): id = "telegram" description = "Telegram bot" @@ -135,6 +164,7 @@ class TelegramBackend(TransportBackend): ) voice_transcription = _build_voice_transcription_config(transport_config) topics = _build_topics_config(transport_config, config_path=config_path) + files = _build_files_config(transport_config, config_path=config_path) cfg = TelegramBridgeConfig( bot=bot, runtime=runtime, @@ -143,6 +173,7 @@ class TelegramBackend(TransportBackend): exec_cfg=exec_cfg, voice_transcription=voice_transcription, topics=topics, + files=files, ) async def run_loop() -> None: diff --git a/src/takopi/telegram/bridge.py b/src/takopi/telegram/bridge.py index 7f35613..9cc52e5 100644 --- a/src/takopi/telegram/bridge.py +++ b/src/takopi/telegram/bridge.py @@ -1,9 +1,9 @@ from __future__ import annotations import os -import shlex from collections.abc import AsyncIterator, Awaitable, Callable, Sequence from dataclasses import dataclass +from functools import partial from pathlib import Path import anyio @@ -39,10 +39,26 @@ from ..scheduler import ThreadJob, ThreadScheduler from ..transport import MessageRef, RenderedMessage, SendOptions, Transport from ..plugins import COMMAND_GROUP, list_entrypoints from ..utils.paths import reset_run_base_dir, set_run_base_dir -from ..transport_runtime import TransportRuntime +from ..transport_runtime import ResolvedMessage, TransportRuntime from .client import BotClient, poll_incoming +from .files import ( + default_upload_name, + default_upload_path, + deny_reason, + file_get_usage, + file_put_usage, + 
format_bytes, + normalize_relative_path, + parse_file_command, + parse_file_prompt, + resolve_path_within_root, + split_command_args, + write_bytes_atomic, + zip_directory, +) from .types import ( TelegramCallbackQuery, + TelegramDocument, TelegramIncomingMessage, TelegramIncomingUpdate, ) @@ -56,6 +72,7 @@ _MAX_BOT_COMMANDS = 100 _OPENAI_AUDIO_MAX_BYTES = 25 * 1024 * 1024 _OPENAI_TRANSCRIPTION_MODEL = "gpt-4o-mini-transcribe" _OPENAI_TRANSCRIPTION_CHUNKING = "auto" +_MEDIA_GROUP_DEBOUNCE_S = 1.0 CANCEL_CALLBACK_DATA = "takopi:cancel" CANCEL_MARKUP = { "inline_keyboard": [[{"text": "cancel", "callback_data": CANCEL_CALLBACK_DATA}]] @@ -184,14 +201,14 @@ def _format_context(runtime: TransportRuntime, context: RunContext | None) -> st def _usage_ctx_set(*, chat_project: str | None) -> str: if chat_project is not None: - return "usage: /ctx set [@branch]" - return "usage: /ctx set [@branch]" + return "usage: `/ctx set [@branch]`" + return "usage: `/ctx set [@branch]`" def _usage_topic(*, chat_project: str | None) -> str: if chat_project is not None: - return "usage: /topic @branch" - return "usage: /topic @branch" + return "usage: `/topic @branch`" + return "usage: `/topic @branch`" def _parse_project_branch_args( @@ -202,7 +219,7 @@ def _parse_project_branch_args( require_branch: bool, chat_project: str | None, ) -> tuple[RunContext | None, str | None]: - tokens = _split_command_args(args_text) + tokens = split_command_args(args_text) if not tokens: return ( None, @@ -272,9 +289,7 @@ def _format_ctx_status( ctx_usage = ( _usage_ctx_set(chat_project=chat_project).removeprefix("usage: ").strip() ) - lines.append( - f"note: unbound topic — bind with `{topic_usage}` or `{ctx_usage}`" - ) + lines.append(f"note: unbound topic — bind with {topic_usage} or {ctx_usage}") sessions = None if snapshot is not None and snapshot.sessions: sessions = ", ".join(sorted(snapshot.sessions)) @@ -330,6 +345,9 @@ def _build_bot_commands(runtime: TransportRuntime) -> list[dict[str, str]]: 
description = backend.description or f"command: {cmd}" commands.append({"command": cmd, "description": description}) seen.add(cmd) + if "file" not in seen: + commands.append({"command": "file", "description": "upload or fetch files"}) + seen.add("file") if "cancel" not in seen: commands.append({"command": "cancel", "description": "cancel run"}) if len(commands) > _MAX_BOT_COMMANDS: @@ -454,6 +472,23 @@ class TelegramVoiceTranscriptionConfig: enabled: bool = False +@dataclass(frozen=True) +class TelegramFilesConfig: + enabled: bool = False + auto_put: bool = True + uploads_dir: str = "incoming" + max_upload_bytes: int = 20 * 1024 * 1024 + max_download_bytes: int = 50 * 1024 * 1024 + allowed_user_ids: frozenset[int] = frozenset() + deny_globs: tuple[str, ...] = ( + ".git/**", + ".env", + ".envrc", + "**/*.pem", + "**/.ssh/**", + ) + + @dataclass(frozen=True) class TelegramTopicsConfig: enabled: bool = False @@ -565,6 +600,7 @@ class TelegramBridgeConfig: startup_msg: str exec_cfg: ExecBridgeConfig voice_transcription: TelegramVoiceTranscriptionConfig | None = None + files: TelegramFilesConfig = TelegramFilesConfig() chat_ids: tuple[int, ...] 
| None = None topics: TelegramTopicsConfig = TelegramTopicsConfig() @@ -586,9 +622,10 @@ async def _send_plain( thread_id: int | None = None, ) -> None: reply_to = MessageRef(channel_id=chat_id, message_id=user_msg_id) + rendered_text, entities = prepare_telegram(MarkdownParts(header=text)) await transport.send( channel_id=chat_id, - message=RenderedMessage(text=text), + message=RenderedMessage(text=rendered_text, extra={"entities": entities}), options=SendOptions(reply_to=reply_to, notify=notify, thread_id=thread_id), ) @@ -822,6 +859,764 @@ async def _transcribe_voice( return transcript +@dataclass(slots=True) +class _FilePutPlan: + resolved: ResolvedMessage + run_root: Path + path_value: str | None + force: bool + + +@dataclass(slots=True) +class _FilePutResult: + name: str + rel_path: Path | None + size: int | None + error: str | None + + +@dataclass(slots=True) +class _MediaGroupState: + messages: list[TelegramIncomingMessage] + token: int = 0 + + +async def _check_file_permissions( + cfg: TelegramBridgeConfig, msg: TelegramIncomingMessage +) -> bool: + sender_id = msg.sender_id + if sender_id is None: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text="cannot verify sender for file transfer.", + thread_id=msg.thread_id, + ) + return False + if cfg.files.allowed_user_ids: + if sender_id not in cfg.files.allowed_user_ids: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text="file transfer is not allowed for this user.", + thread_id=msg.thread_id, + ) + return False + return True + is_private = msg.chat_type == "private" + if msg.chat_type is None: + is_private = msg.chat_id > 0 + if is_private: + return True + member = await cfg.bot.get_chat_member(msg.chat_id, sender_id) + if not isinstance(member, dict): + await _send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text="failed to verify file transfer 
permissions.", + thread_id=msg.thread_id, + ) + return False + status = member.get("status") + if status in {"creator", "administrator"}: + return True + await _send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text="file transfer is restricted to group admins.", + thread_id=msg.thread_id, + ) + return False + + +async def _prepare_file_put_plan( + cfg: TelegramBridgeConfig, + msg: TelegramIncomingMessage, + args_text: str, + ambient_context: RunContext | None, + topic_store: TopicStateStore | None, +) -> _FilePutPlan | None: + if not await _check_file_permissions(cfg, msg): + return None + try: + resolved = cfg.runtime.resolve_message( + text=args_text, + reply_text=msg.reply_to_text, + ambient_context=ambient_context, + chat_id=msg.chat_id, + ) + except DirectiveError as exc: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text=f"error:\n{exc}", + thread_id=msg.thread_id, + ) + return None + topic_key = _topic_key(msg, cfg) if topic_store is not None else None + await _maybe_update_topic_context( + cfg=cfg, + topic_store=topic_store, + topic_key=topic_key, + context=resolved.context, + context_source=resolved.context_source, + ) + if resolved.context is None or resolved.context.project is None: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text="no project context available for file upload.", + thread_id=msg.thread_id, + ) + return None + try: + run_root = cfg.runtime.resolve_run_cwd(resolved.context) + except ConfigError as exc: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text=f"error:\n{exc}", + thread_id=msg.thread_id, + ) + return None + if run_root is None: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text="no project context available for file upload.", + thread_id=msg.thread_id, + ) + return 
None + path_value, force, error = parse_file_prompt(resolved.prompt, allow_empty=True) + if error is not None: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text=error, + thread_id=msg.thread_id, + ) + return None + return _FilePutPlan( + resolved=resolved, + run_root=run_root, + path_value=path_value, + force=force, + ) + + +async def _save_document_payload( + cfg: TelegramBridgeConfig, + *, + document: TelegramDocument, + run_root: Path, + rel_path: Path | None, + base_dir: Path | None, + force: bool, +) -> _FilePutResult: + name = default_upload_name(document.file_name, None) + if ( + document.file_size is not None + and document.file_size > cfg.files.max_upload_bytes + ): + return _FilePutResult( + name=name, + rel_path=None, + size=None, + error="file is too large to upload.", + ) + file_info = await cfg.bot.get_file(document.file_id) + if not isinstance(file_info, dict): + return _FilePutResult( + name=name, + rel_path=None, + size=None, + error="failed to fetch file metadata.", + ) + file_path = file_info.get("file_path") + if not isinstance(file_path, str) or not file_path: + return _FilePutResult( + name=name, + rel_path=None, + size=None, + error="failed to fetch file metadata.", + ) + name = default_upload_name(document.file_name, file_path) + resolved_path = rel_path + if resolved_path is None: + if base_dir is None: + resolved_path = default_upload_path( + cfg.files.uploads_dir, document.file_name, file_path + ) + else: + resolved_path = base_dir / name + deny_rule = deny_reason(resolved_path, cfg.files.deny_globs) + if deny_rule is not None: + return _FilePutResult( + name=name, + rel_path=None, + size=None, + error=f"path denied by rule: {deny_rule}", + ) + target = resolve_path_within_root(run_root, resolved_path) + if target is None: + return _FilePutResult( + name=name, + rel_path=None, + size=None, + error="upload path escapes the repo root.", + ) + if target.exists(): + if target.is_dir(): 
+ return _FilePutResult( + name=name, + rel_path=None, + size=None, + error="upload target is a directory.", + ) + if not force: + return _FilePutResult( + name=name, + rel_path=None, + size=None, + error="file already exists; use --force to overwrite.", + ) + payload = await cfg.bot.download_file(file_path) + if payload is None: + return _FilePutResult( + name=name, + rel_path=None, + size=None, + error="failed to download file.", + ) + if len(payload) > cfg.files.max_upload_bytes: + return _FilePutResult( + name=name, + rel_path=None, + size=None, + error="file is too large to upload.", + ) + try: + write_bytes_atomic(target, payload) + except OSError as exc: + return _FilePutResult( + name=name, + rel_path=None, + size=None, + error=f"failed to write file: {exc}", + ) + return _FilePutResult( + name=name, + rel_path=resolved_path, + size=len(payload), + error=None, + ) + + +async def _maybe_update_topic_context( + *, + cfg: TelegramBridgeConfig, + topic_store: TopicStateStore | None, + topic_key: tuple[int, int] | None, + context: RunContext | None, + context_source: str, +) -> None: + if ( + topic_store is None + or topic_key is None + or context is None + or context_source != "directives" + ): + return + await topic_store.set_context(topic_key[0], topic_key[1], context) + await _maybe_rename_topic( + cfg, + topic_store, + chat_id=topic_key[0], + thread_id=topic_key[1], + context=context, + ) + + +async def _handle_file_command( + cfg: TelegramBridgeConfig, + msg: TelegramIncomingMessage, + args_text: str, + ambient_context: RunContext | None, + topic_store: TopicStateStore | None, +) -> None: + command, rest, error = parse_file_command(args_text) + if error is not None: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text=error, + thread_id=msg.thread_id, + ) + return + if command == "put": + await _handle_file_put(cfg, msg, rest, ambient_context, topic_store) + else: + await _handle_file_get(cfg, msg, 
rest, ambient_context, topic_store) + + +async def _handle_file_put_default( + cfg: TelegramBridgeConfig, + msg: TelegramIncomingMessage, + ambient_context: RunContext | None, + topic_store: TopicStateStore | None, +) -> None: + await _handle_file_put(cfg, msg, "", ambient_context, topic_store) + + +async def _handle_file_put( + cfg: TelegramBridgeConfig, + msg: TelegramIncomingMessage, + args_text: str, + ambient_context: RunContext | None, + topic_store: TopicStateStore | None, +) -> None: + document = msg.document + if document is None: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text=file_put_usage(), + thread_id=msg.thread_id, + ) + return + plan = await _prepare_file_put_plan( + cfg, + msg, + args_text, + ambient_context, + topic_store, + ) + if plan is None: + return + rel_path: Path | None = None + if plan.path_value: + rel_path = normalize_relative_path(plan.path_value) + if rel_path is None: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text="invalid upload path.", + thread_id=msg.thread_id, + ) + return + result = await _save_document_payload( + cfg, + document=document, + run_root=plan.run_root, + rel_path=rel_path, + base_dir=None, + force=plan.force, + ) + if result.error is not None: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text=result.error, + thread_id=msg.thread_id, + ) + return + if result.rel_path is None or result.size is None: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text="failed to save file.", + thread_id=msg.thread_id, + ) + return + context_label = _format_context(cfg.runtime, plan.resolved.context) + await _send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text=( + f"saved `{result.rel_path.as_posix()}` " + f"in `{context_label}` ({format_bytes(result.size)})" + ), 
+ thread_id=msg.thread_id, + ) + + +async def _handle_file_put_group( + cfg: TelegramBridgeConfig, + msg: TelegramIncomingMessage, + args_text: str, + messages: Sequence[TelegramIncomingMessage], + ambient_context: RunContext | None, + topic_store: TopicStateStore | None, +) -> None: + documents = [item.document for item in messages if item.document is not None] + if not documents: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text=file_put_usage(), + thread_id=msg.thread_id, + ) + return + plan = await _prepare_file_put_plan( + cfg, + msg, + args_text, + ambient_context, + topic_store, + ) + if plan is None: + return + base_dir: Path | None = None + if plan.path_value: + base_dir = normalize_relative_path(plan.path_value) + if base_dir is None: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text="invalid upload path.", + thread_id=msg.thread_id, + ) + return + deny_rule = deny_reason(base_dir, cfg.files.deny_globs) + if deny_rule is not None: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text=f"path denied by rule: {deny_rule}", + thread_id=msg.thread_id, + ) + return + base_target = resolve_path_within_root(plan.run_root, base_dir) + if base_target is None: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text="upload path escapes the repo root.", + thread_id=msg.thread_id, + ) + return + if base_target.exists() and not base_target.is_dir(): + await _send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text="upload path is a file.", + thread_id=msg.thread_id, + ) + return + saved: list[_FilePutResult] = [] + failed: list[_FilePutResult] = [] + for document in documents: + result = await _save_document_payload( + cfg, + document=document, + run_root=plan.run_root, + rel_path=None, + base_dir=base_dir, + 
force=plan.force, + ) + if result.error is None: + saved.append(result) + else: + failed.append(result) + context_label = _format_context(cfg.runtime, plan.resolved.context) + total_bytes = sum(item.size or 0 for item in saved) + dir_label: Path | None = base_dir + if dir_label is None and saved: + first_path = saved[0].rel_path + if first_path is not None: + dir_label = first_path.parent + if saved: + saved_names = ", ".join(f"`{item.name}`" for item in saved) + if dir_label is not None: + dir_text = dir_label.as_posix() + if not dir_text.endswith("/"): + dir_text = f"{dir_text}/" + text = ( + f"saved {saved_names} to `{dir_text}` " + f"in `{context_label}` ({format_bytes(total_bytes)})" + ) + else: + text = ( + f"saved {saved_names} in `{context_label}` " + f"({format_bytes(total_bytes)})" + ) + else: + text = "failed to upload files." + if failed: + errors = ", ".join( + f"`{item.name}` ({item.error})" for item in failed if item.error is not None + ) + if errors: + text = f"{text}\n\nfailed: {errors}" + await _send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text=text, + thread_id=msg.thread_id, + ) + + +async def _handle_media_group( + cfg: TelegramBridgeConfig, + messages: Sequence[TelegramIncomingMessage], + topic_store: TopicStateStore | None, +) -> None: + if not messages: + return + ordered = sorted(messages, key=lambda item: item.message_id) + command_msg = next( + (item for item in ordered if item.text.strip()), + ordered[0], + ) + topic_key = _topic_key(command_msg, cfg) if topic_store is not None else None + chat_project = ( + _topics_chat_project(cfg, command_msg.chat_id) if cfg.topics.enabled else None + ) + bound_context = ( + await topic_store.get_context(*topic_key) + if topic_store is not None and topic_key is not None + else None + ) + ambient_context = _merge_topic_context( + chat_project=chat_project, + bound=bound_context, + ) + command_id, args_text = _parse_slash_command(command_msg.text) + if 
command_id == "file": + if not cfg.files.enabled: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=command_msg.chat_id, + user_msg_id=command_msg.message_id, + text=("file transfer disabled; enable `[transports.telegram.files]`."), + thread_id=command_msg.thread_id, + ) + return + command, rest, error = parse_file_command(args_text) + if error is not None: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=command_msg.chat_id, + user_msg_id=command_msg.message_id, + text=error, + thread_id=command_msg.thread_id, + ) + return + if command == "put": + await _handle_file_put_group( + cfg, + command_msg, + rest, + ordered, + ambient_context, + topic_store, + ) + else: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=command_msg.chat_id, + user_msg_id=command_msg.message_id, + text=file_put_usage(), + thread_id=command_msg.thread_id, + ) + return + if cfg.files.enabled and cfg.files.auto_put and not command_msg.text.strip(): + await _handle_file_put_group( + cfg, + command_msg, + "", + ordered, + ambient_context, + topic_store, + ) + return + if cfg.files.enabled: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=command_msg.chat_id, + user_msg_id=command_msg.message_id, + text=file_put_usage(), + thread_id=command_msg.thread_id, + ) + + +async def _handle_file_get( + cfg: TelegramBridgeConfig, + msg: TelegramIncomingMessage, + args_text: str, + ambient_context: RunContext | None, + topic_store: TopicStateStore | None, +) -> None: + if not await _check_file_permissions(cfg, msg): + return + try: + resolved = cfg.runtime.resolve_message( + text=args_text, + reply_text=msg.reply_to_text, + ambient_context=ambient_context, + chat_id=msg.chat_id, + ) + except DirectiveError as exc: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text=f"error:\n{exc}", + thread_id=msg.thread_id, + ) + return + topic_key = _topic_key(msg, cfg) if topic_store is not None else None + await 
_maybe_update_topic_context( + cfg=cfg, + topic_store=topic_store, + topic_key=topic_key, + context=resolved.context, + context_source=resolved.context_source, + ) + if resolved.context is None or resolved.context.project is None: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text="no project context available for file fetch.", + thread_id=msg.thread_id, + ) + return + try: + run_root = cfg.runtime.resolve_run_cwd(resolved.context) + except ConfigError as exc: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text=f"error:\n{exc}", + thread_id=msg.thread_id, + ) + return + if run_root is None: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text="no project context available for file fetch.", + thread_id=msg.thread_id, + ) + return + path_value, _, error = parse_file_prompt(resolved.prompt, allow_empty=False) + if error is not None: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text=file_get_usage(), + thread_id=msg.thread_id, + ) + return + rel_path = normalize_relative_path(path_value or "") + if rel_path is None: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text="invalid file path.", + thread_id=msg.thread_id, + ) + return + deny_rule = deny_reason(rel_path, cfg.files.deny_globs) + if deny_rule is not None: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text=f"path denied by rule: {deny_rule}", + thread_id=msg.thread_id, + ) + return + target = resolve_path_within_root(run_root, rel_path) + if target is None: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text="requested path escapes the repo root.", + thread_id=msg.thread_id, + ) + return + if not target.exists(): + await 
_send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text="file not found.", + thread_id=msg.thread_id, + ) + return + payload: bytes + filename: str + if target.is_dir(): + payload = zip_directory(run_root, rel_path, cfg.files.deny_globs) + filename = f"{rel_path.name or 'archive'}.zip" + else: + size = target.stat().st_size + if size > cfg.files.max_download_bytes: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text="file is too large to send.", + thread_id=msg.thread_id, + ) + return + payload = target.read_bytes() + filename = target.name + if len(payload) > cfg.files.max_download_bytes: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text="file is too large to send.", + thread_id=msg.thread_id, + ) + return + sent = await cfg.bot.send_document( + chat_id=msg.chat_id, + filename=filename, + content=payload, + reply_to_message_id=msg.message_id, + message_thread_id=msg.thread_id, + ) + if sent is None: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text="failed to send file.", + thread_id=msg.thread_id, + ) + return + + def _topic_title( *, cfg: TelegramBridgeConfig, runtime: TransportRuntime, context: RunContext ) -> str: @@ -894,7 +1689,7 @@ async def _handle_ctx_command( thread_id=msg.thread_id, ) return - tokens = _split_command_args(args_text) + tokens = split_command_args(args_text) action = tokens[0].lower() if tokens else "show" if action in {"show", ""}: snapshot = await store.get_thread(*tkey) @@ -980,7 +1775,7 @@ async def _handle_ctx_command( cfg.exec_cfg.transport, chat_id=msg.chat_id, user_msg_id=msg.message_id, - text="unknown /ctx command. use /ctx, /ctx set, or /ctx clear.", + text="unknown `/ctx` command. 
use `/ctx`, `/ctx set`, or `/ctx clear`.", thread_id=msg.thread_id, ) @@ -1090,7 +1885,7 @@ async def _handle_topic_command( cfg.exec_cfg.transport, chat_id=msg.chat_id, user_msg_id=msg.message_id, - text=f"created topic {title!r}.", + text=f"created topic `{title}`.", thread_id=msg.thread_id, ) await cfg.exec_cfg.transport.send( @@ -1356,15 +2151,6 @@ async def _run_engine( clear_context() -def _split_command_args(text: str) -> tuple[str, ...]: - if not text.strip(): - return () - try: - return tuple(shlex.split(text)) - except ValueError: - return tuple(text.split()) - - class _CaptureTransport: def __init__(self) -> None: self._next_id = 1 @@ -1565,7 +2351,7 @@ async def _dispatch_command( command=command_id, text=text, args_text=args_text, - args=_split_command_args(args_text), + args=split_command_args(args_text), message=message_ref, reply_to=reply_ref, reply_text=msg.reply_to_text, @@ -1608,6 +2394,7 @@ async def run_main_loop( dict(transport_config) if transport_config is not None else None ) topic_store: TopicStateStore | None = None + media_groups: dict[tuple[int, str], _MediaGroupState] = {} try: if cfg.topics.enabled: @@ -1734,6 +2521,23 @@ async def run_main_loop( scheduler = ThreadScheduler(task_group=tg, run_job=run_thread_job) + async def flush_media_group(key: tuple[int, str]) -> None: + while True: + state = media_groups.get(key) + if state is None: + return + token = state.token + await anyio.sleep(_MEDIA_GROUP_DEBOUNCE_S) + state = media_groups.get(key) + if state is None: + return + if state.token != token: + continue + messages = list(state.messages) + del media_groups[key] + await _handle_media_group(cfg, messages, topic_store) + return + async for msg in poller(cfg): if isinstance(msg, TelegramCallbackQuery): if msg.data == CANCEL_CALLBACK_DATA: @@ -1770,11 +2574,72 @@ async def run_main_loop( chat_project=chat_project, bound=bound_context ) + if ( + cfg.files.enabled + and msg.document is not None + and msg.media_group_id is not None + ): + 
key = (chat_id, msg.media_group_id) + state = media_groups.get(key) + if state is None: + state = _MediaGroupState(messages=[]) + media_groups[key] = state + tg.start_soon(flush_media_group, key) + state.messages.append(msg) + state.token += 1 + continue + if _is_cancel_command(text): tg.start_soon(_handle_cancel, cfg, msg, running_tasks) continue command_id, args_text = _parse_slash_command(text) + if command_id == "file": + if not cfg.files.enabled: + tg.start_soon( + partial( + _send_plain, + cfg.exec_cfg.transport, + chat_id=chat_id, + user_msg_id=user_msg_id, + text=( + "file transfer disabled; enable " + "`[transports.telegram.files]`." + ), + thread_id=msg.thread_id, + ) + ) + else: + tg.start_soon( + _handle_file_command, + cfg, + msg, + args_text, + ambient_context, + topic_store, + ) + continue + if msg.document is not None: + if cfg.files.enabled and cfg.files.auto_put and not text.strip(): + tg.start_soon( + _handle_file_put_default, + cfg, + msg, + ambient_context, + topic_store, + ) + elif cfg.files.enabled: + tg.start_soon( + partial( + _send_plain, + cfg.exec_cfg.transport, + chat_id=chat_id, + user_msg_id=user_msg_id, + text=file_put_usage(), + thread_id=msg.thread_id, + ) + ) + continue if ( cfg.topics.enabled and topic_store is not None diff --git a/src/takopi/telegram/client.py b/src/takopi/telegram/client.py index 39dfc21..a71084c 100644 --- a/src/takopi/telegram/client.py +++ b/src/takopi/telegram/client.py @@ -21,6 +21,7 @@ import anyio from ..logging import get_logger from .types import ( TelegramCallbackQuery, + TelegramDocument, TelegramIncomingMessage, TelegramIncomingUpdate, TelegramVoice, @@ -74,31 +75,97 @@ def _parse_incoming_message( chat_id: int | None = None, chat_ids: set[int] | None = None, ) -> TelegramIncomingMessage | None: - text = msg.get("text") - voice_payload: TelegramVoice | None = None - if not isinstance(text, str): - voice = msg.get("voice") - if not isinstance(voice, dict): - return None - file_id = 
voice.get("file_id") + def _parse_document_payload(payload: dict[str, Any]) -> TelegramDocument | None: + file_id = payload.get("file_id") if not isinstance(file_id, str) or not file_id: return None - voice_payload = TelegramVoice( + return TelegramDocument( file_id=file_id, - mime_type=voice.get("mime_type") - if isinstance(voice.get("mime_type"), str) + file_name=payload.get("file_name") + if isinstance(payload.get("file_name"), str) else None, - file_size=voice.get("file_size") - if isinstance(voice.get("file_size"), int) - and not isinstance(voice.get("file_size"), bool) + mime_type=payload.get("mime_type") + if isinstance(payload.get("mime_type"), str) else None, - duration=voice.get("duration") - if isinstance(voice.get("duration"), int) - and not isinstance(voice.get("duration"), bool) + file_size=payload.get("file_size") + if isinstance(payload.get("file_size"), int) + and not isinstance(payload.get("file_size"), bool) else None, - raw=voice, + raw=payload, ) + + raw_text = msg.get("text") + text = raw_text if isinstance(raw_text, str) else None + caption = msg.get("caption") + if text is None and isinstance(caption, str): + text = caption + if text is None: text = "" + voice_payload: TelegramVoice | None = None + voice = msg.get("voice") + if isinstance(voice, dict): + file_id = voice.get("file_id") + if not isinstance(file_id, str) or not file_id: + file_id = None + if file_id is not None: + voice_payload = TelegramVoice( + file_id=file_id, + mime_type=voice.get("mime_type") + if isinstance(voice.get("mime_type"), str) + else None, + file_size=voice.get("file_size") + if isinstance(voice.get("file_size"), int) + and not isinstance(voice.get("file_size"), bool) + else None, + duration=voice.get("duration") + if isinstance(voice.get("duration"), int) + and not isinstance(voice.get("duration"), bool) + else None, + raw=voice, + ) + if not isinstance(raw_text, str) and not isinstance(caption, str): + text = "" + document_payload: TelegramDocument | None = 
None + document = msg.get("document") + if isinstance(document, dict): + document_payload = _parse_document_payload(document) + if document_payload is None: + video = msg.get("video") + if isinstance(video, dict): + document_payload = _parse_document_payload(video) + if document_payload is None: + photo = msg.get("photo") + if isinstance(photo, list): + best: dict[str, Any] | None = None + best_score = -1 + for item in photo: + if not isinstance(item, dict): + continue + file_id = item.get("file_id") + if not isinstance(file_id, str) or not file_id: + continue + size = item.get("file_size") + if isinstance(size, int) and not isinstance(size, bool): + score = size + else: + width = item.get("width") + height = item.get("height") + if isinstance(width, int) and isinstance(height, int): + score = width * height + else: + score = 0 + if score > best_score: + best_score = score + best = item + if best is not None: + document_payload = _parse_document_payload(best) + if document_payload is None: + sticker = msg.get("sticker") + if isinstance(sticker, dict): + document_payload = _parse_document_payload(sticker) + has_text = isinstance(raw_text, str) or isinstance(caption, str) + if not has_text and voice_payload is None and document_payload is None: + return None chat = msg.get("chat") if not isinstance(chat, dict): return None @@ -135,6 +202,9 @@ def _parse_incoming_message( if isinstance(sender, dict) and isinstance(sender.get("id"), int) else None ) + media_group_id = msg.get("media_group_id") + if not isinstance(media_group_id, str): + media_group_id = None thread_id = msg.get("message_thread_id") if isinstance(thread_id, bool) or not isinstance(thread_id, int): thread_id = None @@ -149,11 +219,13 @@ def _parse_incoming_message( reply_to_message_id=reply_to_message_id, reply_to_text=reply_to_text, sender_id=sender_id, + media_group_id=media_group_id, thread_id=thread_id, is_topic_message=is_topic_message, chat_type=chat_type, is_forum=is_forum, voice=voice_payload, + 
document=document_payload, raw=msg, ) @@ -259,6 +331,17 @@ class BotClient(Protocol): replace_message_id: int | None = None, ) -> dict | None: ... + async def send_document( + self, + chat_id: int, + filename: str, + content: bytes, + reply_to_message_id: int | None = None, + message_thread_id: int | None = None, + disable_notification: bool | None = False, + caption: str | None = None, + ) -> dict | None: ... + async def edit_message_text( self, chat_id: int, @@ -683,6 +766,106 @@ class TelegramClient: logger.debug("telegram.response", method=method, payload=payload) return payload.get("result") + async def _post_form( + self, + method: str, + data: dict[str, Any], + files: dict[str, Any], + ) -> Any | None: + if self._http_client is None or self._base is None: + raise RuntimeError("TelegramClient is configured without an HTTP client.") + logger.debug("telegram.request", method=method, payload=data) + try: + resp = await self._http_client.post( + f"{self._base}/{method}", data=data, files=files + ) + except httpx.HTTPError as e: + url = getattr(e.request, "url", None) + logger.error( + "telegram.network_error", + method=method, + url=str(url) if url is not None else None, + error=str(e), + error_type=e.__class__.__name__, + ) + return None + + try: + resp.raise_for_status() + except httpx.HTTPStatusError as e: + if resp.status_code == 429: + retry_after: float | None = None + try: + payload = resp.json() + except Exception: + payload = None + if isinstance(payload, dict): + retry_after = retry_after_from_payload(payload) + retry_after = 5.0 if retry_after is None else retry_after + logger.warning( + "telegram.rate_limited", + method=method, + status=resp.status_code, + url=str(resp.request.url), + retry_after=retry_after, + ) + raise TelegramRetryAfter(retry_after) from e + body = resp.text + logger.error( + "telegram.http_error", + method=method, + status=resp.status_code, + url=str(resp.request.url), + error=str(e), + body=body, + ) + return None + + try: + 
payload = resp.json() + except Exception as e: + body = resp.text + logger.error( + "telegram.bad_response", + method=method, + status=resp.status_code, + error=str(e), + error_type=e.__class__.__name__, + body=body, + ) + return None + + if not isinstance(payload, dict): + logger.error( + "telegram.invalid_payload", + method=method, + url=str(resp.request.url), + payload=payload, + ) + return None + + if not payload.get("ok"): + if payload.get("error_code") == 429: + retry_after = retry_after_from_payload(payload) + retry_after = 5.0 if retry_after is None else retry_after + logger.warning( + "telegram.rate_limited", + method=method, + url=str(resp.request.url), + retry_after=retry_after, + ) + raise TelegramRetryAfter(retry_after) + logger.error( + "telegram.api_error", + method=method, + url=str(resp.request.url), + payload=payload, + ) + return None + + logger.debug("telegram.response", method=method, payload=payload) + return payload.get("result") + async def get_updates( self, offset: int | None, @@ -806,6 +989,51 @@ class TelegramClient: await self.delete_message(chat_id=chat_id, message_id=replace_message_id) return result + async def send_document( + self, + chat_id: int, + filename: str, + content: bytes, + reply_to_message_id: int | None = None, + message_thread_id: int | None = None, + disable_notification: bool | None = False, + caption: str | None = None, + ) -> dict | None: + async def execute() -> dict | None: + if self._client_override is not None: + return await self._client_override.send_document( + chat_id=chat_id, + filename=filename, + content=content, + reply_to_message_id=reply_to_message_id, + message_thread_id=message_thread_id, + disable_notification=disable_notification, + caption=caption, + ) + params: dict[str, Any] = {"chat_id": chat_id} + if disable_notification is not None: + params["disable_notification"] = disable_notification + if reply_to_message_id is not None: + params["reply_to_message_id"] = reply_to_message_id + if 
message_thread_id is not None: + params["message_thread_id"] = message_thread_id + if caption is not None: + params["caption"] = caption + result = await self._post_form( + "sendDocument", + params, + files={"document": (filename, content)}, + ) + return result if isinstance(result, dict) else None + + return await self.enqueue_op( + key=self.unique_key("send_document"), + label="send_document", + execute=execute, + priority=SEND_PRIORITY, + chat_id=chat_id, + ) + async def edit_message_text( self, chat_id: int, diff --git a/src/takopi/telegram/files.py b/src/takopi/telegram/files.py new file mode 100644 index 0000000..7bc866a --- /dev/null +++ b/src/takopi/telegram/files.py @@ -0,0 +1,153 @@ +from __future__ import annotations + +import io +import shlex +import tempfile +import zipfile +from collections.abc import Sequence +from pathlib import Path, PurePosixPath + + +def split_command_args(text: str) -> tuple[str, ...]: + if not text.strip(): + return () + try: + return tuple(shlex.split(text)) + except ValueError: + return tuple(text.split()) + + +def file_usage() -> str: + return "usage: `/file put ` or `/file get `" + + +def file_put_usage() -> str: + return "usage: `/file put `" + + +def file_get_usage() -> str: + return "usage: `/file get `" + + +def parse_file_command(args_text: str) -> tuple[str | None, str, str | None]: + tokens = split_command_args(args_text) + if not tokens: + return None, "", file_usage() + command = tokens[0].lower() + rest = " ".join(tokens[1:]).strip() + if command not in {"put", "get"}: + return None, rest, file_usage() + return command, rest, None + + +def parse_file_prompt( + prompt: str, *, allow_empty: bool +) -> tuple[str | None, bool, str | None]: + tokens = split_command_args(prompt) + force = False + parts: list[str] = [] + for token in tokens: + if token == "--force": + force = True + continue + if token.startswith("--"): + return None, force, f"unknown flag: {token}" + parts.append(token) + path = " ".join(parts).strip() 
+ if not path and not allow_empty: + return None, force, "missing path" + return (path or None), force, None + + +def normalize_relative_path(value: str) -> Path | None: + cleaned = value.strip() + if not cleaned: + return None + if cleaned.startswith("~"): + return None + path = Path(cleaned) + if path.is_absolute(): + return None + parts = [part for part in path.parts if part not in {"", "."}] + if not parts: + return None + if ".." in parts: + return None + if ".git" in parts: + return None + return Path(*parts) + + +def resolve_path_within_root(root: Path, rel_path: Path) -> Path | None: + root_resolved = root.resolve(strict=False) + target = (root / rel_path).resolve(strict=False) + if not target.is_relative_to(root_resolved): + return None + return target + + +def deny_reason(rel_path: Path, deny_globs: Sequence[str]) -> str | None: + if ".git" in rel_path.parts: + return ".git/**" + posix = PurePosixPath(rel_path.as_posix()) + for pattern in deny_globs: + if posix.match(pattern): + return pattern + return None + + +def format_bytes(value: int) -> str: + size = max(0.0, float(value)) + units = ("b", "kb", "mb", "gb", "tb") + for unit in units: + if size < 1024 or unit == units[-1]: + if unit == "b": + return f"{int(size)} b" + if size < 10: + return f"{size:.1f} {unit}" + return f"{size:.0f} {unit}" + size /= 1024 + return f"{int(size)} B" + + +def default_upload_name(filename: str | None, file_path: str | None) -> str: + name = Path(filename or "").name + if not name and file_path: + name = Path(file_path).name + if not name: + name = "upload.bin" + return name + + +def default_upload_path( + uploads_dir: str, filename: str | None, file_path: str | None +) -> Path: + return Path(uploads_dir) / default_upload_name(filename, file_path) + + +def write_bytes_atomic(path: Path, payload: bytes) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + with tempfile.NamedTemporaryFile( + mode="wb", delete=False, dir=path.parent, prefix=".takopi-upload-" + ) as 
handle: + handle.write(payload) + temp_name = handle.name + Path(temp_name).replace(path) + + +def zip_directory( + root: Path, + rel_path: Path, + deny_globs: Sequence[str], +) -> bytes: + target = root / rel_path + buffer = io.BytesIO() + with zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_DEFLATED) as archive: + for item in sorted(target.rglob("*")): + if item.is_dir(): + continue + rel_item = rel_path / item.relative_to(target) + if deny_reason(rel_item, deny_globs) is not None: + continue + archive.write(item, arcname=rel_item.as_posix()) + return buffer.getvalue() diff --git a/src/takopi/telegram/types.py b/src/takopi/telegram/types.py index 7ddc6f5..733ea12 100644 --- a/src/takopi/telegram/types.py +++ b/src/takopi/telegram/types.py @@ -13,6 +13,15 @@ class TelegramVoice: raw: dict[str, Any] +@dataclass(frozen=True, slots=True) +class TelegramDocument: + file_id: str + file_name: str | None + mime_type: str | None + file_size: int | None + raw: dict[str, Any] + + @dataclass(frozen=True, slots=True) class TelegramIncomingMessage: transport: str @@ -22,11 +31,13 @@ class TelegramIncomingMessage: reply_to_message_id: int | None reply_to_text: str | None sender_id: int | None + media_group_id: str | None = None thread_id: int | None = None is_topic_message: bool | None = None chat_type: str | None = None is_forum: bool | None = None voice: TelegramVoice | None = None + document: TelegramDocument | None = None raw: dict[str, Any] | None = None diff --git a/tests/test_telegram_backend.py b/tests/test_telegram_backend.py new file mode 100644 index 0000000..54ecb16 --- /dev/null +++ b/tests/test_telegram_backend.py @@ -0,0 +1,112 @@ +from __future__ import annotations + +from pathlib import Path +from typing import Any + +import pytest + +from takopi.config import ConfigError, empty_projects_config +from takopi.model import EngineId +from takopi.router import AutoRouter, RunnerEntry +from takopi.runners.mock import Return, ScriptRunner +from takopi.telegram 
import backend as telegram_backend +from takopi.transport_runtime import TransportRuntime + + +def test_build_startup_message_includes_missing_engines(tmp_path: Path) -> None: + codex = EngineId("codex") + pi = EngineId("pi") + runner = ScriptRunner([Return(answer="ok")], engine=codex) + missing = ScriptRunner([Return(answer="ok")], engine=pi) + router = AutoRouter( + entries=[ + RunnerEntry(engine=codex, runner=runner, available=True), + RunnerEntry(engine=pi, runner=missing, available=False, issue="missing"), + ], + default_engine=codex, + ) + runtime = TransportRuntime(router=router, projects=empty_projects_config()) + + message = telegram_backend._build_startup_message( + runtime, startup_pwd=str(tmp_path) + ) + + assert "takopi is ready" in message + assert "agents: `codex (not installed: pi)`" in message + assert "projects: `none`" in message + + +def test_telegram_backend_build_and_run_wires_config( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + config_path = tmp_path / "takopi.toml" + config_path.write_text( + 'watch_config = true\ntransport = "telegram"\n\n' + "[transports.telegram]\n" + 'bot_token = "token"\n' + "chat_id = 321\n", + encoding="utf-8", + ) + + codex = EngineId("codex") + runner = ScriptRunner([Return(answer="ok")], engine=codex) + router = AutoRouter( + entries=[RunnerEntry(engine=codex, runner=runner, available=True)], + default_engine=codex, + ) + runtime = TransportRuntime(router=router, projects=empty_projects_config()) + + captured: dict[str, Any] = {} + + async def fake_run_main_loop(cfg, **kwargs) -> None: + captured["cfg"] = cfg + captured["kwargs"] = kwargs + + class _FakeClient: + def __init__(self, token: str) -> None: + self.token = token + + async def close(self) -> None: + return None + + monkeypatch.setattr(telegram_backend, "run_main_loop", fake_run_main_loop) + monkeypatch.setattr(telegram_backend, "TelegramClient", _FakeClient) + + transport_config = { + "bot_token": "token", + "chat_id": 321, + 
"voice_transcription": True, + "files": {"enabled": True, "allowed_user_ids": [1, 2]}, + "topics": {"enabled": True, "scope": "main"}, + } + + telegram_backend.TelegramBackend().build_and_run( + transport_config=transport_config, + config_path=config_path, + runtime=runtime, + final_notify=False, + default_engine_override=None, + ) + + cfg = captured["cfg"] + kwargs = captured["kwargs"] + assert cfg.chat_id == 321 + assert cfg.voice_transcription is not None + assert cfg.voice_transcription.enabled is True + assert cfg.files.enabled is True + assert cfg.files.allowed_user_ids == frozenset({1, 2}) + assert cfg.topics.enabled is True + assert cfg.bot.token == "token" + assert kwargs["watch_config"] is True + assert kwargs["transport_id"] == "telegram" + + +def test_build_files_config_rejects_non_dict(tmp_path: Path) -> None: + config_path = tmp_path / "takopi.toml" + transport_config: dict[str, object] = {"files": ["nope"]} + + with pytest.raises(ConfigError, match="transports.telegram.files"): + telegram_backend._build_files_config( + transport_config, + config_path=config_path, + ) diff --git a/tests/test_telegram_bridge.py b/tests/test_telegram_bridge.py index dd631e3..2da0846 100644 --- a/tests/test_telegram_bridge.py +++ b/tests/test_telegram_bridge.py @@ -10,6 +10,7 @@ import takopi.telegram.bridge as bridge from takopi.directives import parse_directives from takopi.telegram.bridge import ( TelegramBridgeConfig, + TelegramFilesConfig, TelegramPresenter, TelegramTransport, _build_bot_commands, @@ -30,7 +31,11 @@ from takopi.progress import ProgressTracker from takopi.router import AutoRouter, RunnerEntry from takopi.transport_runtime import TransportRuntime from takopi.runners.mock import Return, ScriptRunner, Sleep, Wait -from takopi.telegram.types import TelegramCallbackQuery, TelegramIncomingMessage +from takopi.telegram.types import ( + TelegramCallbackQuery, + TelegramDocument, + TelegramIncomingMessage, +) from takopi.transport import MessageRef, 
RenderedMessage, SendOptions from tests.plugin_fixtures import FakeEntryPoint, install_entrypoints @@ -100,6 +105,7 @@ class _FakeBot(BotClient): self.command_calls: list[dict] = [] self.callback_calls: list[dict] = [] self.send_calls: list[dict] = [] + self.document_calls: list[dict] = [] self.edit_calls: list[dict] = [] self.edit_topic_calls: list[dict[str, Any]] = [] self.delete_calls: list[dict] = [] @@ -151,6 +157,29 @@ class _FakeBot(BotClient): ) return {"message_id": 1} + async def send_document( + self, + chat_id: int, + filename: str, + content: bytes, + reply_to_message_id: int | None = None, + message_thread_id: int | None = None, + disable_notification: bool | None = False, + caption: str | None = None, + ) -> dict[str, Any]: + self.document_calls.append( + { + "chat_id": chat_id, + "filename": filename, + "content": content, + "reply_to_message_id": reply_to_message_id, + "message_thread_id": message_thread_id, + "disable_notification": disable_notification, + "caption": caption, + } + ) + return {"message_id": 2} + async def edit_message_text( self, chat_id: int, @@ -331,6 +360,7 @@ def test_build_bot_commands_includes_cancel_and_engine() -> None: commands = _build_bot_commands(runtime) assert {"command": "cancel", "description": "cancel run"} in commands + assert {"command": "file", "description": "upload or fetch files"} in commands assert any(cmd["command"] == "codex" for cmd in commands) @@ -529,6 +559,27 @@ async def test_telegram_transport_edit_wait_false_returns_ref() -> None: _ = reply_markup return None + async def send_document( + self, + chat_id: int, + filename: str, + content: bytes, + reply_to_message_id: int | None = None, + message_thread_id: int | None = None, + disable_notification: bool | None = False, + caption: str | None = None, + ) -> dict | None: + _ = ( + chat_id, + filename, + content, + reply_to_message_id, + message_thread_id, + disable_notification, + caption, + ) + return None + async def edit_message_text( self, 
chat_id: int, @@ -715,6 +766,130 @@ async def test_handle_cancel_only_cancels_matching_progress_message() -> None: assert len(transport.send_calls) == 0 +@pytest.mark.anyio +async def test_handle_file_put_writes_file(tmp_path: Path) -> None: + payload = b"hello" + + class _FileBot(_FakeBot): + async def get_file(self, file_id: str) -> dict[str, Any] | None: + _ = file_id + return {"file_path": "files/hello.txt"} + + async def download_file(self, file_path: str) -> bytes | None: + _ = file_path + return payload + + transport = _FakeTransport() + bot = _FileBot() + runner = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE) + projects = ProjectsConfig( + projects={ + "proj": ProjectConfig( + alias="proj", + path=tmp_path, + worktrees_dir=Path(".worktrees"), + ) + }, + default_project=None, + ) + runtime = TransportRuntime(router=_make_router(runner), projects=projects) + exec_cfg = ExecBridgeConfig( + transport=transport, + presenter=MarkdownPresenter(), + final_notify=True, + ) + cfg = TelegramBridgeConfig( + bot=bot, + runtime=runtime, + chat_id=123, + startup_msg="", + exec_cfg=exec_cfg, + files=TelegramFilesConfig(enabled=True), + ) + msg = TelegramIncomingMessage( + transport="telegram", + chat_id=123, + message_id=10, + text="", + reply_to_message_id=None, + reply_to_text=None, + sender_id=321, + chat_type="private", + document=TelegramDocument( + file_id="doc-id", + file_name="hello.txt", + mime_type="text/plain", + file_size=len(payload), + raw={"file_id": "doc-id"}, + ), + ) + + await bridge._handle_file_put(cfg, msg, "/proj uploads/hello.txt", None, None) + + target = tmp_path / "uploads" / "hello.txt" + assert target.read_bytes() == payload + assert transport.send_calls + text = transport.send_calls[-1]["message"].text + assert "saved uploads/hello.txt" in text + assert "(5 b)" in text + + +@pytest.mark.anyio +async def test_handle_file_get_sends_document_for_allowed_user( + tmp_path: Path, +) -> None: + payload = b"fetch" + target = tmp_path / 
"hello.txt" + target.write_bytes(payload) + + transport = _FakeTransport() + bot = _FakeBot() + runner = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE) + projects = ProjectsConfig( + projects={ + "proj": ProjectConfig( + alias="proj", + path=tmp_path, + worktrees_dir=Path(".worktrees"), + ) + }, + default_project=None, + ) + runtime = TransportRuntime(router=_make_router(runner), projects=projects) + exec_cfg = ExecBridgeConfig( + transport=transport, + presenter=MarkdownPresenter(), + final_notify=True, + ) + cfg = TelegramBridgeConfig( + bot=bot, + runtime=runtime, + chat_id=123, + startup_msg="", + exec_cfg=exec_cfg, + files=TelegramFilesConfig( + enabled=True, + allowed_user_ids=frozenset({42}), + ), + ) + msg = TelegramIncomingMessage( + transport="telegram", + chat_id=-100, + message_id=10, + text="", + reply_to_message_id=None, + reply_to_text=None, + sender_id=42, + chat_type="supergroup", + ) + + await bridge._handle_file_get(cfg, msg, "/proj hello.txt", None, None) + + assert bot.document_calls + assert bot.document_calls[0]["filename"] == "hello.txt" + assert bot.document_calls[0]["content"] == payload + + @pytest.mark.anyio async def test_handle_callback_cancel_cancels_running_task() -> None: transport = _FakeTransport() @@ -1169,6 +1344,122 @@ async def test_run_main_loop_replies_in_same_thread() -> None: assert all(call["options"].thread_id == 77 for call in reply_calls) +@pytest.mark.anyio +async def test_run_main_loop_batches_media_group_upload( + tmp_path: Path, +) -> None: + payloads = { + "photos/file_1.jpg": b"one", + "photos/file_2.jpg": b"two", + } + file_map = { + "doc-1": "photos/file_1.jpg", + "doc-2": "photos/file_2.jpg", + } + + class _MediaBot(_FakeBot): + async def get_file(self, file_id: str) -> dict[str, Any] | None: + file_path = file_map.get(file_id) + if file_path is None: + return None + return {"file_path": file_path} + + async def download_file(self, file_path: str) -> bytes | None: + return payloads.get(file_path) + + 
transport = _FakeTransport() + bot = _MediaBot() + runner = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE) + projects = ProjectsConfig( + projects={ + "proj": ProjectConfig( + alias="proj", + path=tmp_path, + worktrees_dir=Path(".worktrees"), + ) + }, + default_project=None, + ) + runtime = TransportRuntime(router=_make_router(runner), projects=projects) + exec_cfg = ExecBridgeConfig( + transport=transport, + presenter=MarkdownPresenter(), + final_notify=True, + ) + cfg = TelegramBridgeConfig( + bot=bot, + runtime=runtime, + chat_id=123, + startup_msg="", + exec_cfg=exec_cfg, + files=TelegramFilesConfig(enabled=True, auto_put=True), + ) + msg1 = TelegramIncomingMessage( + transport="telegram", + chat_id=123, + message_id=1, + text="/file put /proj incoming/test1", + reply_to_message_id=None, + reply_to_text=None, + sender_id=321, + chat_type="private", + media_group_id="grp-1", + document=TelegramDocument( + file_id="doc-1", + file_name=None, + mime_type="image/jpeg", + file_size=len(payloads["photos/file_1.jpg"]), + raw={"file_id": "doc-1"}, + ), + ) + msg2 = TelegramIncomingMessage( + transport="telegram", + chat_id=123, + message_id=2, + text="", + reply_to_message_id=None, + reply_to_text=None, + sender_id=321, + chat_type="private", + media_group_id="grp-1", + document=TelegramDocument( + file_id="doc-2", + file_name=None, + mime_type="image/jpeg", + file_size=len(payloads["photos/file_2.jpg"]), + raw={"file_id": "doc-2"}, + ), + ) + + stop_polling = anyio.Event() + + async def poller(_cfg: TelegramBridgeConfig): + yield msg1 + yield msg2 + await stop_polling.wait() + + async with anyio.create_task_group() as tg: + tg.start_soon(run_main_loop, cfg, poller) + try: + with anyio.fail_after(3): + while len(transport.send_calls) < 1: + await anyio.sleep(0.05) + assert len(transport.send_calls) == 1 + text = transport.send_calls[0]["message"].text + assert "saved file_1.jpg, file_2.jpg" in text + assert "to incoming/test1/" in text + target_dir = tmp_path 
/ "incoming" / "test1" + assert (target_dir / "file_1.jpg").read_bytes() == payloads[ + "photos/file_1.jpg" + ] + assert (target_dir / "file_2.jpg").read_bytes() == payloads[ + "photos/file_2.jpg" + ] + finally: + stop_polling.set() + tg.cancel_scope.cancel() + + @pytest.mark.anyio async def test_run_main_loop_handles_command_plugins(monkeypatch) -> None: class _Command: diff --git a/tests/test_telegram_incoming.py b/tests/test_telegram_incoming.py index 6b08a10..ee53ce3 100644 --- a/tests/test_telegram_incoming.py +++ b/tests/test_telegram_incoming.py @@ -32,6 +32,7 @@ def test_parse_incoming_update_maps_fields() -> None: assert msg.chat_type == "supergroup" assert msg.is_forum is True assert msg.voice is None + assert msg.document is None assert msg.raw == update["message"] @@ -51,7 +52,11 @@ def test_parse_incoming_update_filters_non_matching_chat() -> None: def test_parse_incoming_update_filters_non_text_and_non_voice() -> None: update = { "update_id": 1, - "message": {"message_id": 10, "chat": {"id": 123}, "photo": []}, + "message": { + "message_id": 10, + "chat": {"id": 123}, + "location": {"latitude": 1.0, "longitude": 2.0}, + }, } assert parse_incoming_update(update, chat_id=123) is None @@ -84,6 +89,148 @@ def test_parse_incoming_update_voice_message() -> None: assert msg.voice.duration == 3 +def test_parse_incoming_update_document_message() -> None: + update = { + "update_id": 1, + "message": { + "message_id": 10, + "caption": "/file put incoming/doc.txt", + "chat": {"id": 123}, + "document": { + "file_id": "doc-id", + "file_unique_id": "uniq", + "file_name": "doc.txt", + "mime_type": "text/plain", + "file_size": 4321, + }, + }, + } + + msg = parse_incoming_update(update, chat_id=123) + assert msg is not None + assert isinstance(msg, TelegramIncomingMessage) + assert msg.text == "/file put incoming/doc.txt" + assert msg.document is not None + assert msg.document.file_id == "doc-id" + assert msg.document.file_name == "doc.txt" + assert 
msg.document.mime_type == "text/plain" + assert msg.document.file_size == 4321 + + +def test_parse_incoming_update_photo_message() -> None: + update = { + "update_id": 1, + "message": { + "message_id": 10, + "caption": "/file put incoming/photo.jpg", + "chat": {"id": 123}, + "photo": [ + { + "file_id": "small", + "file_unique_id": "uniq-small", + "file_size": 100, + "width": 90, + "height": 90, + }, + { + "file_id": "large", + "file_unique_id": "uniq-large", + "file_size": 1000, + "width": 800, + "height": 600, + }, + ], + }, + } + + msg = parse_incoming_update(update, chat_id=123) + assert msg is not None + assert isinstance(msg, TelegramIncomingMessage) + assert msg.text == "/file put incoming/photo.jpg" + assert msg.document is not None + assert msg.document.file_id == "large" + assert msg.document.file_name is None + assert msg.document.file_size == 1000 + + +def test_parse_incoming_update_media_group_id() -> None: + update = { + "update_id": 1, + "message": { + "message_id": 10, + "chat": {"id": 123}, + "media_group_id": "group-1", + "photo": [ + { + "file_id": "large", + "file_unique_id": "uniq-large", + "file_size": 1000, + "width": 800, + "height": 600, + } + ], + }, + } + + msg = parse_incoming_update(update, chat_id=123) + assert msg is not None + assert isinstance(msg, TelegramIncomingMessage) + assert msg.media_group_id == "group-1" + + +def test_parse_incoming_update_video_message() -> None: + update = { + "update_id": 1, + "message": { + "message_id": 10, + "caption": "/file put incoming/video.mp4", + "chat": {"id": 123}, + "video": { + "file_id": "video-id", + "file_unique_id": "uniq", + "file_name": "video.mp4", + "mime_type": "video/mp4", + "file_size": 4242, + }, + }, + } + + msg = parse_incoming_update(update, chat_id=123) + assert msg is not None + assert isinstance(msg, TelegramIncomingMessage) + assert msg.text == "/file put incoming/video.mp4" + assert msg.document is not None + assert msg.document.file_id == "video-id" + assert 
msg.document.file_name == "video.mp4" + assert msg.document.mime_type == "video/mp4" + assert msg.document.file_size == 4242 + + +def test_parse_incoming_update_sticker_message() -> None: + update = { + "update_id": 1, + "message": { + "message_id": 10, + "chat": {"id": 123}, + "sticker": { + "file_id": "sticker-id", + "file_unique_id": "uniq", + "file_size": 2468, + }, + }, + } + + msg = parse_incoming_update(update, chat_id=123) + assert msg is not None + assert isinstance(msg, TelegramIncomingMessage) + assert msg.text == "" + assert msg.document is not None + assert msg.document.file_id == "sticker-id" + assert msg.document.file_name is None + assert msg.document.mime_type is None + assert msg.document.file_size == 2468 + + def test_parse_incoming_update_callback_query() -> None: update = { "update_id": 1, diff --git a/tests/test_telegram_queue.py b/tests/test_telegram_queue.py index d401712..55279c1 100644 --- a/tests/test_telegram_queue.py +++ b/tests/test_telegram_queue.py @@ -40,6 +40,28 @@ class _FakeBot(BotClient): self.calls.append("send_message") return {"message_id": 1} + async def send_document( + self, + chat_id: int, + filename: str, + content: bytes, + reply_to_message_id: int | None = None, + message_thread_id: int | None = None, + disable_notification: bool | None = False, + caption: str | None = None, + ) -> dict[str, Any]: + _ = ( + chat_id, + filename, + content, + reply_to_message_id, + message_thread_id, + disable_notification, + caption, + ) + self.calls.append("send_document") + return {"message_id": 1} + async def edit_message_text( self, chat_id: int,