diff --git a/docs/public-api.md b/docs/public-api.md index b743076..05a9694 100644 --- a/docs/public-api.md +++ b/docs/public-api.md @@ -183,6 +183,15 @@ Command handlers receive a `CommandContext` with: Use `ctx.executor.run_one(...)` or `ctx.executor.run_many(...)` to reuse Takopi's engine pipeline. Use `mode="capture"` to collect results and build a custom reply. +`ctx.message` and `ctx.reply_to` are `MessageRef` objects with: + +- `channel_id` (`int | str`, chat/channel id) +- `message_id` (`int | str`, message id) +- `thread_id` (`int | str | None`; set when the transport supports threads, like Telegram topics) +- `raw` (transport-specific payload, may be `None`) + +Example: key per-thread state by `(ctx.message.channel_id, ctx.message.thread_id)`. + --- ## TransportRuntime helpers @@ -228,6 +237,7 @@ async def on_message(...): message_id=..., text=..., reply_to=..., + thread_id=..., ) await handle_message( exec_cfg, diff --git a/src/takopi/runner_bridge.py b/src/takopi/runner_bridge.py index c3b7d59..6b95a24 100644 --- a/src/takopi/runner_bridge.py +++ b/src/takopi/runner_bridge.py @@ -19,6 +19,7 @@ from .transport import ( MessageRef, RenderedMessage, SendOptions, + ThreadId, Transport, ) @@ -79,7 +80,7 @@ class IncomingMessage: message_id: MessageId text: str reply_to: MessageRef | None = None - thread_id: int | None = None + thread_id: ThreadId | None = None @dataclass(frozen=True) @@ -110,7 +111,7 @@ async def _send_or_edit_message( reply_to: MessageRef | None = None, notify: bool = True, replace_ref: MessageRef | None = None, - thread_id: int | None = None, + thread_id: ThreadId | None = None, ) -> tuple[MessageRef | None, bool]: msg = message followups = message.extra.get("followups") @@ -248,7 +249,7 @@ async def send_initial_progress( tracker: ProgressTracker, resume_formatter: Callable[[ResumeToken], str] | None = None, context_line: str | None = None, - thread_id: int | None = None, + thread_id: ThreadId | None = None, ) -> ProgressMessageState: progress_ref: MessageRef | None = None last_rendered: RenderedMessage | None = None @@ -358,7 +359,7 @@ async def send_result_message( edit_ref: MessageRef | None, replace_ref: MessageRef | None = None, delete_tag: str = "final", - thread_id: int | None = None, + thread_id: ThreadId | None = None, ) -> None: final_msg, edited = await _send_or_edit_message( cfg.transport, diff --git a/src/takopi/scheduler.py b/src/takopi/scheduler.py index 13b495b..9a8ea2b 100644 --- a/src/takopi/scheduler.py +++ b/src/takopi/scheduler.py @@ -8,16 +8,17 @@ import anyio from .context import RunContext from .model import ResumeToken +from .transport import ChannelId, MessageId, ThreadId @dataclass(frozen=True, slots=True) class ThreadJob: - chat_id: int - user_msg_id: int + chat_id: ChannelId + user_msg_id: MessageId text: str resume_token: ResumeToken context: RunContext | None = None - thread_id: int | None = None + thread_id: ThreadId | None = None RunJob = Callable[[ThreadJob], Awaitable[None]] @@ -65,12 +66,12 @@ class ThreadScheduler: async def enqueue_resume( self, - chat_id: int, - user_msg_id: int, + chat_id: ChannelId, + user_msg_id: MessageId, text: str, resume_token: ResumeToken, context: RunContext | None = None, - thread_id: int | None = None, + thread_id: ThreadId | None = None, ) -> None: await self.enqueue( ThreadJob( diff --git a/src/takopi/telegram/bridge.py b/src/takopi/telegram/bridge.py index 5486f5c..553b580 100644 --- a/src/takopi/telegram/bridge.py +++ b/src/takopi/telegram/bridge.py @@ -185,7 +185,11 @@ class TelegramTransport: else None ) notify = options.notify - message_thread_id = options.thread_id + message_thread_id = ( + cast(int | None, options.thread_id) + if options.thread_id is not None + else None + ) else: reply_to_message_id = cast( int | None, @@ -219,10 +223,16 @@ class TelegramTransport: notify=notify, ) message_id = sent.message_id + thread_id = ( + sent.message_thread_id + if sent.message_thread_id is not None + else message_thread_id + ) return MessageRef( channel_id=chat_id, message_id=message_id, raw=sent, + thread_id=thread_id, ) async def edit( @@ -261,10 +271,16 @@ class TelegramTransport: notify=notify, ) message_id = edited.message_id + thread_id = ( + edited.message_thread_id + if edited.message_thread_id is not None + else ref.thread_id + ) return MessageRef( channel_id=chat_id, message_id=message_id, raw=edited, + thread_id=thread_id, ) async def delete(self, *, ref: MessageRef) -> bool: diff --git a/src/takopi/telegram/commands.py b/src/takopi/telegram/commands.py index 2943dd5..ca1a3e4 100644 --- a/src/takopi/telegram/commands.py +++ b/src/takopi/telegram/commands.py @@ -1178,11 +1178,15 @@ class _CaptureTransport: message: RenderedMessage, options: SendOptions | None = None, ) -> MessageRef: - _ = options + thread_id = options.thread_id if options is not None else None ref = MessageRef(channel_id=channel_id, message_id=self._next_id) self._next_id += 1 self.last_message = message - return ref + return MessageRef( + channel_id=ref.channel_id, + message_id=ref.message_id, + thread_id=thread_id, + ) async def edit( self, *, ref: MessageRef, message: RenderedMessage, wait: bool = True @@ -1218,7 +1222,11 @@ class _TelegramCommandExecutor(CommandExecutor): self._chat_id = chat_id self._user_msg_id = user_msg_id self._thread_id = thread_id - self._reply_ref = MessageRef(channel_id=chat_id, message_id=user_msg_id) + self._reply_ref = MessageRef( + channel_id=chat_id, + message_id=user_msg_id, + thread_id=thread_id, + ) def _apply_default_context(self, request: RunRequest) -> RunRequest: if request.context is not None: @@ -1336,7 +1344,11 @@ async def _dispatch_command( chat_id = msg.chat_id user_msg_id = msg.message_id reply_ref = ( - MessageRef(channel_id=chat_id, message_id=msg.reply_to_message_id) + MessageRef( + channel_id=chat_id, + message_id=msg.reply_to_message_id, + thread_id=msg.thread_id, + ) if msg.reply_to_message_id is not None else None ) @@ -1349,7 +1361,9 @@ async def _dispatch_command( user_msg_id=user_msg_id, thread_id=msg.thread_id, ) - message_ref = MessageRef(channel_id=chat_id, message_id=user_msg_id) + message_ref = MessageRef( + channel_id=chat_id, message_id=user_msg_id, thread_id=msg.thread_id + ) try: backend = get_command(command_id, allowlist=allowlist, required=False) except ConfigError as exc: diff --git a/src/takopi/telegram/loop.py b/src/takopi/telegram/loop.py index 67bbd61..8a591f8 100644 --- a/src/takopi/telegram/loop.py +++ b/src/takopi/telegram/loop.py @@ -3,6 +3,7 @@ from __future__ import annotations from collections.abc import AsyncIterator, Awaitable, Callable from dataclasses import dataclass from functools import partial +from typing import cast import anyio from anyio.abc import TaskGroup @@ -417,12 +418,12 @@ async def run_main_loop( async def run_thread_job(job: ThreadJob) -> None: await run_job( - job.chat_id, - job.user_msg_id, + cast(int, job.chat_id), + cast(int, job.user_msg_id), job.text, job.resume_token, job.context, - job.thread_id, + cast(int | None, job.thread_id), None, scheduler.note_thread_known, ) diff --git a/src/takopi/transport.py b/src/takopi/transport.py index a17d74c..19c71a2 100644 --- a/src/takopi/transport.py +++ b/src/takopi/transport.py @@ -5,6 +5,7 @@ from typing import Any, Protocol, TypeAlias ChannelId: TypeAlias = int | str MessageId: TypeAlias = int | str +ThreadId: TypeAlias = int | str @dataclass(frozen=True, slots=True) @@ -12,6 +13,7 @@ class MessageRef: channel_id: ChannelId message_id: MessageId raw: Any | None = field(default=None, compare=False, hash=False) + thread_id: ThreadId | None = field(default=None, compare=False, hash=False) @dataclass(frozen=True, slots=True) @@ -25,7 +27,7 @@ class SendOptions: reply_to: MessageRef | None = None notify: bool = True replace: MessageRef | None = None - thread_id: int | None = None + thread_id: ThreadId | None = None class Transport(Protocol):