feat(files): add prompt auto-put mode for telegram uploads (#97)

This commit is contained in:
banteg
2026-01-12 18:46:32 +04:00
committed by GitHub
parent 9d3fd1a515
commit 7825dd73a9
5 changed files with 289 additions and 92 deletions
+5
View File
@@ -372,6 +372,9 @@ If you send a file **without a caption**, takopi saves it to:
incoming/<original_filename> incoming/<original_filename>
``` ```
If you set `auto_put_mode = "prompt"`, a caption/question will start a run
immediately after upload, with the prompt annotated with the uploaded path.
Use `--force` to overwrite an existing file: Use `--force` to overwrite an existing file:
``` ```
@@ -394,6 +397,7 @@ Directories are zipped automatically.
[transports.telegram.files] [transports.telegram.files]
enabled = true enabled = true
auto_put = true auto_put = true
auto_put_mode = "upload"
uploads_dir = "incoming" uploads_dir = "incoming"
allowed_user_ids = [123456789] allowed_user_ids = [123456789]
deny_globs = [".git/**", ".env", ".envrc", "**/*.pem", "**/.ssh/**"] deny_globs = [".git/**", ".env", ".envrc", "**/*.pem", "**/.ssh/**"]
@@ -425,6 +429,7 @@ voice_transcription = true
[transports.telegram.files] [transports.telegram.files]
enabled = true enabled = true
auto_put = true auto_put = true
auto_put_mode = "upload"
uploads_dir = "incoming" uploads_dir = "incoming"
allowed_user_ids = [123456789] allowed_user_ids = [123456789]
deny_globs = [".git/**", ".env", ".envrc", "**/*.pem", "**/.ssh/**"] deny_globs = [".git/**", ".env", ".envrc", "**/*.pem", "**/.ssh/**"]
+1
View File
@@ -64,6 +64,7 @@ class TelegramFilesSettings(BaseModel):
enabled: bool = False enabled: bool = False
auto_put: bool = True auto_put: bool = True
auto_put_mode: Literal["upload", "prompt"] = "upload"
uploads_dir: NonEmptyStr = "incoming" uploads_dir: NonEmptyStr = "incoming"
allowed_user_ids: list[StrictInt] = Field(default_factory=list) allowed_user_ids: list[StrictInt] = Field(default_factory=list)
deny_globs: list[NonEmptyStr] = Field( deny_globs: list[NonEmptyStr] = Field(
+209 -83
View File
@@ -238,6 +238,21 @@ class _FilePutResult:
error: str | None error: str | None
@dataclass(slots=True)
class _SavedFilePut:
context: RunContext | None
rel_path: Path
size: int
@dataclass(slots=True)
class _SavedFilePutGroup:
context: RunContext | None
base_dir: Path | None
saved: list[_FilePutResult]
failed: list[_FilePutResult]
def resolve_file_put_paths( def resolve_file_put_paths(
plan: _FilePutPlan, plan: _FilePutPlan,
*, *,
@@ -355,6 +370,17 @@ async def _prepare_file_put_plan(
) )
def _format_file_put_failures(failed: Sequence[_FilePutResult]) -> str | None:
if not failed:
return None
errors = ", ".join(
f"`{item.name}` ({item.error})" for item in failed if item.error is not None
)
if not errors:
return None
return f"failed: {errors}"
async def _save_document_payload( async def _save_document_payload(
cfg, cfg,
*, *,
@@ -489,6 +515,62 @@ async def _handle_file_put_default(
await _handle_file_put(cfg, msg, "", ambient_context, topic_store) await _handle_file_put(cfg, msg, "", ambient_context, topic_store)
async def _save_file_put(
cfg,
msg: TelegramIncomingMessage,
args_text: str,
ambient_context: RunContext | None,
topic_store: TopicStateStore | None,
) -> _SavedFilePut | None:
reply = partial(
send_plain,
cfg.exec_cfg.transport,
chat_id=msg.chat_id,
user_msg_id=msg.message_id,
thread_id=msg.thread_id,
)
document = msg.document
if document is None:
await reply(text=FILE_PUT_USAGE)
return None
plan = await _prepare_file_put_plan(
cfg,
msg,
args_text,
ambient_context,
topic_store,
)
if plan is None:
return None
base_dir, rel_path, error = resolve_file_put_paths(
plan,
cfg=cfg,
require_dir=False,
)
if error is not None:
await reply(text=error)
return None
result = await _save_document_payload(
cfg,
document=document,
run_root=plan.run_root,
rel_path=rel_path,
base_dir=base_dir,
force=plan.force,
)
if result.error is not None:
await reply(text=result.error)
return None
if result.rel_path is None or result.size is None:
await reply(text="failed to save file.")
return None
return _SavedFilePut(
context=plan.resolved.context,
rel_path=result.rel_path,
size=result.size,
)
async def _handle_file_put( async def _handle_file_put(
cfg, cfg,
msg: TelegramIncomingMessage, msg: TelegramIncomingMessage,
@@ -503,46 +585,20 @@ async def _handle_file_put(
user_msg_id=msg.message_id, user_msg_id=msg.message_id,
thread_id=msg.thread_id, thread_id=msg.thread_id,
) )
document = msg.document saved = await _save_file_put(
if document is None:
await reply(text=FILE_PUT_USAGE)
return
plan = await _prepare_file_put_plan(
cfg, cfg,
msg, msg,
args_text, args_text,
ambient_context, ambient_context,
topic_store, topic_store,
) )
if plan is None: if saved is None:
return return
base_dir, rel_path, error = resolve_file_put_paths( context_label = _format_context(cfg.runtime, saved.context)
plan,
cfg=cfg,
require_dir=False,
)
if error is not None:
await reply(text=error)
return
result = await _save_document_payload(
cfg,
document=document,
run_root=plan.run_root,
rel_path=rel_path,
base_dir=base_dir,
force=plan.force,
)
if result.error is not None:
await reply(text=result.error)
return
if result.rel_path is None or result.size is None:
await reply(text="failed to save file.")
return
context_label = _format_context(cfg.runtime, plan.resolved.context)
await reply( await reply(
text=( text=(
f"saved `{result.rel_path.as_posix()}` " f"saved `{saved.rel_path.as_posix()}` "
f"in `{context_label}` ({format_bytes(result.size)})" f"in `{context_label}` ({format_bytes(saved.size)})"
), ),
) )
@@ -562,51 +618,25 @@ async def _handle_file_put_group(
user_msg_id=msg.message_id, user_msg_id=msg.message_id,
thread_id=msg.thread_id, thread_id=msg.thread_id,
) )
documents = [item.document for item in messages if item.document is not None] saved_group = await _save_file_put_group(
if not documents:
await reply(text=FILE_PUT_USAGE)
return
plan = await _prepare_file_put_plan(
cfg, cfg,
msg, msg,
args_text, args_text,
messages,
ambient_context, ambient_context,
topic_store, topic_store,
) )
if plan is None: if saved_group is None:
return return
base_dir, _, error = resolve_file_put_paths( context_label = _format_context(cfg.runtime, saved_group.context)
plan, total_bytes = sum(item.size or 0 for item in saved_group.saved)
cfg=cfg, dir_label: Path | None = saved_group.base_dir
require_dir=True, if dir_label is None and saved_group.saved:
) first_path = saved_group.saved[0].rel_path
if error is not None:
await reply(text=error)
return
saved: list[_FilePutResult] = []
failed: list[_FilePutResult] = []
for document in documents:
result = await _save_document_payload(
cfg,
document=document,
run_root=plan.run_root,
rel_path=None,
base_dir=base_dir,
force=plan.force,
)
if result.error is None:
saved.append(result)
else:
failed.append(result)
context_label = _format_context(cfg.runtime, plan.resolved.context)
total_bytes = sum(item.size or 0 for item in saved)
dir_label: Path | None = base_dir
if dir_label is None and saved:
first_path = saved[0].rel_path
if first_path is not None: if first_path is not None:
dir_label = first_path.parent dir_label = first_path.parent
if saved: if saved_group.saved:
saved_names = ", ".join(f"`{item.name}`" for item in saved) saved_names = ", ".join(f"`{item.name}`" for item in saved_group.saved)
if dir_label is not None: if dir_label is not None:
dir_text = dir_label.as_posix() dir_text = dir_label.as_posix()
if not dir_text.endswith("/"): if not dir_text.endswith("/"):
@@ -622,19 +652,79 @@ async def _handle_file_put_group(
) )
else: else:
text = "failed to upload files." text = "failed to upload files."
if failed: failure_text = _format_file_put_failures(saved_group.failed)
errors = ", ".join( if failure_text is not None:
f"`{item.name}` ({item.error})" for item in failed if item.error is not None text = f"{text}\n\n{failure_text}"
)
if errors:
text = f"{text}\n\nfailed: {errors}"
await reply(text=text) await reply(text=text)
async def _save_file_put_group(
cfg,
msg: TelegramIncomingMessage,
args_text: str,
messages: Sequence[TelegramIncomingMessage],
ambient_context: RunContext | None,
topic_store: TopicStateStore | None,
) -> _SavedFilePutGroup | None:
reply = partial(
send_plain,
cfg.exec_cfg.transport,
chat_id=msg.chat_id,
user_msg_id=msg.message_id,
thread_id=msg.thread_id,
)
documents = [item.document for item in messages if item.document is not None]
if not documents:
await reply(text=FILE_PUT_USAGE)
return None
plan = await _prepare_file_put_plan(
cfg,
msg,
args_text,
ambient_context,
topic_store,
)
if plan is None:
return None
base_dir, _, error = resolve_file_put_paths(
plan,
cfg=cfg,
require_dir=True,
)
if error is not None:
await reply(text=error)
return None
saved: list[_FilePutResult] = []
failed: list[_FilePutResult] = []
for document in documents:
result = await _save_document_payload(
cfg,
document=document,
run_root=plan.run_root,
rel_path=None,
base_dir=base_dir,
force=plan.force,
)
if result.error is None:
saved.append(result)
else:
failed.append(result)
return _SavedFilePutGroup(
context=plan.resolved.context,
base_dir=base_dir,
saved=saved,
failed=failed,
)
async def _handle_media_group( async def _handle_media_group(
cfg, cfg,
messages: Sequence[TelegramIncomingMessage], messages: Sequence[TelegramIncomingMessage],
topic_store: TopicStateStore | None, topic_store: TopicStateStore | None,
run_prompt: Callable[
[TelegramIncomingMessage, str, RunContext | None], Awaitable[None]
]
| None = None,
) -> None: ) -> None:
if not messages: if not messages:
return return
@@ -676,16 +766,52 @@ async def _handle_media_group(
topic_store, topic_store,
) )
return return
if cfg.files.enabled and cfg.files.auto_put and not command_msg.text.strip(): if cfg.files.enabled and cfg.files.auto_put:
await _handle_file_put_group( caption_text = command_msg.text.strip()
cfg, if cfg.files.auto_put_mode == "prompt" and caption_text:
command_msg, saved_group = await _save_file_put_group(
"", cfg,
ordered, command_msg,
ambient_context, "",
topic_store, ordered,
) ambient_context,
return topic_store,
)
if saved_group is None:
return
if not saved_group.saved:
failure_text = _format_file_put_failures(saved_group.failed)
text = "failed to upload files."
if failure_text is not None:
text = f"{text}\n\n{failure_text}"
await reply(text=text)
return
if saved_group.failed:
failure_text = _format_file_put_failures(saved_group.failed)
if failure_text is not None:
await reply(text=f"some files failed to upload.\n\n{failure_text}")
if run_prompt is None:
await reply(text=FILE_PUT_USAGE)
return
paths = [
item.rel_path.as_posix()
for item in saved_group.saved
if item.rel_path is not None
]
files_text = "\n".join(f"- {path}" for path in paths)
prompt = f"{caption_text}\n\n[uploaded files]\n{files_text}"
await run_prompt(command_msg, prompt, saved_group.context)
return
if not caption_text:
await _handle_file_put_group(
cfg,
command_msg,
"",
ordered,
ambient_context,
topic_store,
)
return
await reply(text=FILE_PUT_USAGE) await reply(text=FILE_PUT_USAGE)
+73 -9
View File
@@ -30,6 +30,7 @@ from .commands import (
_handle_topic_command, _handle_topic_command,
_parse_slash_command, _parse_slash_command,
_reserved_commands, _reserved_commands,
_save_file_put,
_run_engine, _run_engine,
_set_command_menu, _set_command_menu,
handle_callback_cancel, handle_callback_cancel,
@@ -430,6 +431,50 @@ async def run_main_loop(
scheduler = ThreadScheduler(task_group=tg, run_job=run_thread_job) scheduler = ThreadScheduler(task_group=tg, run_job=run_thread_job)
async def run_prompt_from_upload(
msg: TelegramIncomingMessage,
prompt_text: str,
context: RunContext | None,
) -> None:
reply_ref = (
MessageRef(
channel_id=msg.chat_id,
message_id=msg.reply_to_message_id,
)
if msg.reply_to_message_id is not None
else None
)
await run_job(
msg.chat_id,
msg.message_id,
prompt_text,
None,
context,
msg.thread_id,
reply_ref,
scheduler.note_thread_known,
)
async def handle_prompt_upload(
msg: TelegramIncomingMessage,
caption_text: str,
ambient_context: RunContext | None,
topic_store: TopicStateStore | None,
) -> None:
saved = await _save_file_put(
cfg,
msg,
"",
ambient_context,
topic_store,
)
if saved is None:
return
prompt = (
f"{caption_text}\n\n[uploaded file: {saved.rel_path.as_posix()}]"
)
await run_prompt_from_upload(msg, prompt, saved.context)
async def flush_media_group(key: tuple[int, str]) -> None: async def flush_media_group(key: tuple[int, str]) -> None:
while True: while True:
state = media_groups.get(key) state = media_groups.get(key)
@@ -444,7 +489,12 @@ async def run_main_loop(
continue continue
messages = list(state.messages) messages = list(state.messages)
del media_groups[key] del media_groups[key]
await _handle_media_group(cfg, messages, topic_store) await _handle_media_group(
cfg,
messages,
topic_store,
run_prompt_from_upload,
)
return return
async for msg in poller(cfg): async for msg in poller(cfg):
@@ -535,14 +585,28 @@ async def run_main_loop(
): ):
continue continue
if msg.document is not None: if msg.document is not None:
if cfg.files.enabled and cfg.files.auto_put and not text.strip(): if cfg.files.enabled and cfg.files.auto_put:
tg.start_soon( caption_text = text.strip()
_handle_file_put_default, if cfg.files.auto_put_mode == "prompt" and caption_text:
cfg, tg.start_soon(
msg, handle_prompt_upload,
ambient_context, msg,
topic_store, caption_text,
) ambient_context,
topic_store,
)
elif not caption_text:
tg.start_soon(
_handle_file_put_default,
cfg,
msg,
ambient_context,
topic_store,
)
else:
tg.start_soon(
partial(reply, text=FILE_PUT_USAGE),
)
elif cfg.files.enabled: elif cfg.files.enabled:
tg.start_soon( tg.start_soon(
partial(reply, text=FILE_PUT_USAGE), partial(reply, text=FILE_PUT_USAGE),
+1
View File
@@ -165,5 +165,6 @@ def test_telegram_files_settings_defaults() -> None:
assert cfg.enabled is False assert cfg.enabled is False
assert cfg.auto_put is True assert cfg.auto_put is True
assert cfg.auto_put_mode == "upload"
assert cfg.uploads_dir == "incoming" assert cfg.uploads_dir == "incoming"
assert cfg.allowed_user_ids == [] assert cfg.allowed_user_ids == []