From 7825dd73a90bfc98c10700b4f0a80e3565c70659 Mon Sep 17 00:00:00 2001 From: banteg <4562643+banteg@users.noreply.github.com> Date: Mon, 12 Jan 2026 18:46:32 +0400 Subject: [PATCH] feat(files): add prompt auto-put mode for telegram uploads (#97) --- docs/user-guide.md | 5 + src/takopi/settings.py | 1 + src/takopi/telegram/commands.py | 292 +++++++++++++++++++++++--------- src/takopi/telegram/loop.py | 82 ++++++++- tests/test_telegram_backend.py | 1 + 5 files changed, 289 insertions(+), 92 deletions(-) diff --git a/docs/user-guide.md b/docs/user-guide.md index 5f19dae..51733f2 100644 --- a/docs/user-guide.md +++ b/docs/user-guide.md @@ -372,6 +372,9 @@ If you send a file **without a caption**, takopi saves it to: incoming/ ``` +If you set `auto_put_mode = "prompt"`, a caption/question will start a run +immediately after upload, with the prompt annotated with the uploaded path. + Use `--force` to overwrite an existing file: ``` @@ -394,6 +397,7 @@ Directories are zipped automatically. [transports.telegram.files] enabled = true auto_put = true +auto_put_mode = "upload" uploads_dir = "incoming" allowed_user_ids = [123456789] deny_globs = [".git/**", ".env", ".envrc", "**/*.pem", "**/.ssh/**"] @@ -425,6 +429,7 @@ voice_transcription = true [transports.telegram.files] enabled = true auto_put = true +auto_put_mode = "upload" uploads_dir = "incoming" allowed_user_ids = [123456789] deny_globs = [".git/**", ".env", ".envrc", "**/*.pem", "**/.ssh/**"] diff --git a/src/takopi/settings.py b/src/takopi/settings.py index 984e5fa..75953de 100644 --- a/src/takopi/settings.py +++ b/src/takopi/settings.py @@ -64,6 +64,7 @@ class TelegramFilesSettings(BaseModel): enabled: bool = False auto_put: bool = True + auto_put_mode: Literal["upload", "prompt"] = "upload" uploads_dir: NonEmptyStr = "incoming" allowed_user_ids: list[StrictInt] = Field(default_factory=list) deny_globs: list[NonEmptyStr] = Field( diff --git a/src/takopi/telegram/commands.py b/src/takopi/telegram/commands.py index ca1a3e4..de16f4e 100644 --- a/src/takopi/telegram/commands.py +++ b/src/takopi/telegram/commands.py @@ -238,6 +238,21 @@ class _FilePutResult: error: str | None +@dataclass(slots=True) +class _SavedFilePut: + context: RunContext | None + rel_path: Path + size: int + + +@dataclass(slots=True) +class _SavedFilePutGroup: + context: RunContext | None + base_dir: Path | None + saved: list[_FilePutResult] + failed: list[_FilePutResult] + + def resolve_file_put_paths( plan: _FilePutPlan, *, @@ -355,6 +370,17 @@ async def _prepare_file_put_plan( ) +def _format_file_put_failures(failed: Sequence[_FilePutResult]) -> str | None: + if not failed: + return None + errors = ", ".join( + f"`{item.name}` ({item.error})" for item in failed if item.error is not None + ) + if not errors: + return None + return f"failed: {errors}" + + async def _save_document_payload( cfg, *, @@ -489,6 +515,62 @@ async def _handle_file_put_default( await _handle_file_put(cfg, msg, "", ambient_context, topic_store) +async def _save_file_put( + cfg, + msg: TelegramIncomingMessage, + args_text: str, + ambient_context: RunContext | None, + topic_store: TopicStateStore | None, +) -> _SavedFilePut | None: + reply = partial( + send_plain, + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + thread_id=msg.thread_id, + ) + document = msg.document + if document is None: + await reply(text=FILE_PUT_USAGE) + return None + plan = await _prepare_file_put_plan( + cfg, + msg, + args_text, + ambient_context, + topic_store, + ) + if plan is None: + return None + base_dir, rel_path, error = resolve_file_put_paths( + plan, + cfg=cfg, + require_dir=False, + ) + if error is not None: + await reply(text=error) + return None + result = await _save_document_payload( + cfg, + document=document, + run_root=plan.run_root, + rel_path=rel_path, + base_dir=base_dir, + force=plan.force, + ) + if result.error is not None: + await reply(text=result.error) + return None + if result.rel_path is None or result.size is None: + await reply(text="failed to save file.") + return None + return _SavedFilePut( + context=plan.resolved.context, + rel_path=result.rel_path, + size=result.size, + ) + + async def _handle_file_put( cfg, msg: TelegramIncomingMessage, @@ -503,46 +585,20 @@ async def _handle_file_put( user_msg_id=msg.message_id, thread_id=msg.thread_id, ) - document = msg.document - if document is None: - await reply(text=FILE_PUT_USAGE) - return - plan = await _prepare_file_put_plan( + saved = await _save_file_put( cfg, msg, args_text, ambient_context, topic_store, ) - if plan is None: + if saved is None: return - base_dir, rel_path, error = resolve_file_put_paths( - plan, - cfg=cfg, - require_dir=False, - ) - if error is not None: - await reply(text=error) - return - result = await _save_document_payload( - cfg, - document=document, - run_root=plan.run_root, - rel_path=rel_path, - base_dir=base_dir, - force=plan.force, - ) - if result.error is not None: - await reply(text=result.error) - return - if result.rel_path is None or result.size is None: - await reply(text="failed to save file.") - return - context_label = _format_context(cfg.runtime, plan.resolved.context) + context_label = _format_context(cfg.runtime, saved.context) await reply( text=( - f"saved `{result.rel_path.as_posix()}` " - f"in `{context_label}` ({format_bytes(result.size)})" + f"saved `{saved.rel_path.as_posix()}` " + f"in `{context_label}` ({format_bytes(saved.size)})" ), ) @@ -562,51 +618,25 @@ async def _handle_file_put_group( user_msg_id=msg.message_id, thread_id=msg.thread_id, ) - documents = [item.document for item in messages if item.document is not None] - if not documents: - await reply(text=FILE_PUT_USAGE) - return - plan = await _prepare_file_put_plan( + saved_group = await _save_file_put_group( cfg, msg, args_text, + messages, ambient_context, topic_store, ) - if plan is None: + if saved_group is None: return - base_dir, _, error = resolve_file_put_paths( - plan, - cfg=cfg, - require_dir=True, - ) - if error is not None: - await reply(text=error) - return - saved: list[_FilePutResult] = [] - failed: list[_FilePutResult] = [] - for document in documents: - result = await _save_document_payload( - cfg, - document=document, - run_root=plan.run_root, - rel_path=None, - base_dir=base_dir, - force=plan.force, - ) - if result.error is None: - saved.append(result) - else: - failed.append(result) - context_label = _format_context(cfg.runtime, plan.resolved.context) - total_bytes = sum(item.size or 0 for item in saved) - dir_label: Path | None = base_dir - if dir_label is None and saved: - first_path = saved[0].rel_path + context_label = _format_context(cfg.runtime, saved_group.context) + total_bytes = sum(item.size or 0 for item in saved_group.saved) + dir_label: Path | None = saved_group.base_dir + if dir_label is None and saved_group.saved: + first_path = saved_group.saved[0].rel_path if first_path is not None: dir_label = first_path.parent - if saved: - saved_names = ", ".join(f"`{item.name}`" for item in saved) + if saved_group.saved: + saved_names = ", ".join(f"`{item.name}`" for item in saved_group.saved) if dir_label is not None: dir_text = dir_label.as_posix() if not dir_text.endswith("/"): @@ -622,19 +652,79 @@ async def _handle_file_put_group( ) else: text = "failed to upload files." - if failed: - errors = ", ".join( - f"`{item.name}` ({item.error})" for item in failed if item.error is not None - ) - if errors: - text = f"{text}\n\nfailed: {errors}" + failure_text = _format_file_put_failures(saved_group.failed) + if failure_text is not None: + text = f"{text}\n\n{failure_text}" await reply(text=text) +async def _save_file_put_group( + cfg, + msg: TelegramIncomingMessage, + args_text: str, + messages: Sequence[TelegramIncomingMessage], + ambient_context: RunContext | None, + topic_store: TopicStateStore | None, +) -> _SavedFilePutGroup | None: + reply = partial( + send_plain, + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + thread_id=msg.thread_id, + ) + documents = [item.document for item in messages if item.document is not None] + if not documents: + await reply(text=FILE_PUT_USAGE) + return None + plan = await _prepare_file_put_plan( + cfg, + msg, + args_text, + ambient_context, + topic_store, + ) + if plan is None: + return None + base_dir, _, error = resolve_file_put_paths( + plan, + cfg=cfg, + require_dir=True, + ) + if error is not None: + await reply(text=error) + return None + saved: list[_FilePutResult] = [] + failed: list[_FilePutResult] = [] + for document in documents: + result = await _save_document_payload( + cfg, + document=document, + run_root=plan.run_root, + rel_path=None, + base_dir=base_dir, + force=plan.force, + ) + if result.error is None: + saved.append(result) + else: + failed.append(result) + return _SavedFilePutGroup( + context=plan.resolved.context, + base_dir=base_dir, + saved=saved, + failed=failed, + ) + + async def _handle_media_group( cfg, messages: Sequence[TelegramIncomingMessage], topic_store: TopicStateStore | None, + run_prompt: Callable[ + [TelegramIncomingMessage, str, RunContext | None], Awaitable[None] + ] + | None = None, ) -> None: if not messages: return @@ -676,16 +766,52 @@ async def _handle_media_group( topic_store, ) return - if cfg.files.enabled and cfg.files.auto_put and not command_msg.text.strip(): - await _handle_file_put_group( - cfg, - command_msg, - "", - ordered, - ambient_context, - topic_store, - ) - return + if cfg.files.enabled and cfg.files.auto_put: + caption_text = command_msg.text.strip() + if cfg.files.auto_put_mode == "prompt" and caption_text: + saved_group = await _save_file_put_group( + cfg, + command_msg, + "", + ordered, + ambient_context, + topic_store, + ) + if saved_group is None: + return + if not saved_group.saved: + failure_text = _format_file_put_failures(saved_group.failed) + text = "failed to upload files." + if failure_text is not None: + text = f"{text}\n\n{failure_text}" + await reply(text=text) + return + if saved_group.failed: + failure_text = _format_file_put_failures(saved_group.failed) + if failure_text is not None: + await reply(text=f"some files failed to upload.\n\n{failure_text}") + if run_prompt is None: + await reply(text=FILE_PUT_USAGE) + return + paths = [ + item.rel_path.as_posix() + for item in saved_group.saved + if item.rel_path is not None + ] + files_text = "\n".join(f"- {path}" for path in paths) + prompt = f"{caption_text}\n\n[uploaded files]\n{files_text}" + await run_prompt(command_msg, prompt, saved_group.context) + return + if not caption_text: + await _handle_file_put_group( + cfg, + command_msg, + "", + ordered, + ambient_context, + topic_store, + ) + return await reply(text=FILE_PUT_USAGE) diff --git a/src/takopi/telegram/loop.py b/src/takopi/telegram/loop.py index 8a591f8..1ad3a3e 100644 --- a/src/takopi/telegram/loop.py +++ b/src/takopi/telegram/loop.py @@ -30,6 +30,7 @@ from .commands import ( _handle_topic_command, _parse_slash_command, _reserved_commands, + _save_file_put, _run_engine, _set_command_menu, handle_callback_cancel, @@ -430,6 +431,50 @@ async def run_main_loop( scheduler = ThreadScheduler(task_group=tg, run_job=run_thread_job) + async def run_prompt_from_upload( + msg: TelegramIncomingMessage, + prompt_text: str, + context: RunContext | None, + ) -> None: + reply_ref = ( + MessageRef( + channel_id=msg.chat_id, + message_id=msg.reply_to_message_id, + ) + if msg.reply_to_message_id is not None + else None + ) + await run_job( + msg.chat_id, + msg.message_id, + prompt_text, + None, + context, + msg.thread_id, + reply_ref, + scheduler.note_thread_known, + ) + + async def handle_prompt_upload( + msg: TelegramIncomingMessage, + caption_text: str, + ambient_context: RunContext | None, + topic_store: TopicStateStore | None, + ) -> None: + saved = await _save_file_put( + cfg, + msg, + "", + ambient_context, + topic_store, + ) + if saved is None: + return + prompt = ( + f"{caption_text}\n\n[uploaded file: {saved.rel_path.as_posix()}]" + ) + await run_prompt_from_upload(msg, prompt, saved.context) + async def flush_media_group(key: tuple[int, str]) -> None: while True: state = media_groups.get(key) @@ -444,7 +489,12 @@ async def run_main_loop( continue messages = list(state.messages) del media_groups[key] - await _handle_media_group(cfg, messages, topic_store) + await _handle_media_group( + cfg, + messages, + topic_store, + run_prompt_from_upload, + ) return async for msg in poller(cfg): @@ -535,14 +585,28 @@ async def run_main_loop( ): continue if msg.document is not None: - if cfg.files.enabled and cfg.files.auto_put and not text.strip(): - tg.start_soon( - _handle_file_put_default, - cfg, - msg, - ambient_context, - topic_store, - ) + if cfg.files.enabled and cfg.files.auto_put: + caption_text = text.strip() + if cfg.files.auto_put_mode == "prompt" and caption_text: + tg.start_soon( + handle_prompt_upload, + msg, + caption_text, + ambient_context, + topic_store, + ) + elif not caption_text: + tg.start_soon( + _handle_file_put_default, + cfg, + msg, + ambient_context, + topic_store, + ) + else: + tg.start_soon( + partial(reply, text=FILE_PUT_USAGE), + ) elif cfg.files.enabled: tg.start_soon( partial(reply, text=FILE_PUT_USAGE), diff --git a/tests/test_telegram_backend.py b/tests/test_telegram_backend.py index e3cb602..a67da12 100644 --- a/tests/test_telegram_backend.py +++ b/tests/test_telegram_backend.py @@ -165,5 +165,6 @@ def test_telegram_files_settings_defaults() -> None: assert cfg.enabled is False assert cfg.auto_put is True + assert cfg.auto_put_mode == "upload" assert cfg.uploads_dir == "incoming" assert cfg.allowed_user_ids == []