From 77504231d42e8e013a7dd6f69a8d7d62936b210e Mon Sep 17 00:00:00 2001 From: banteg <4562643+banteg@users.noreply.github.com> Date: Sun, 11 Jan 2026 01:40:49 +0400 Subject: [PATCH] feat(telegram): topics scope + thread-aware replies (#81) --- docs/transports/telegram.md | 17 ++- docs/user-guide.md | 21 +-- readme.md | 2 - src/takopi/config_migrations.py | 50 +++++++ src/takopi/logging.py | 16 ++- src/takopi/runner_bridge.py | 12 +- src/takopi/settings.py | 12 +- src/takopi/telegram/backend.py | 2 +- src/takopi/telegram/bridge.py | 219 ++++++++++++++++++++--------- src/takopi/telegram/topic_state.py | 2 +- tests/test_telegram_bridge.py | 53 ++++++- 11 files changed, 300 insertions(+), 106 deletions(-) diff --git a/docs/transports/telegram.md b/docs/transports/telegram.md index 2048cbf..c44cc6b 100644 --- a/docs/transports/telegram.md +++ b/docs/transports/telegram.md @@ -44,21 +44,24 @@ Configuration (under `[transports.telegram]`): ```toml [transports.telegram.topics] enabled = true -mode = "multi_project_chat" # or "per_project_chat" +scope = "auto" # auto | main | projects | all ``` Requirements: -- `multi_project_chat`: `chat_id` must be a forum-enabled supergroup (topics enabled). -- `per_project_chat`: each `projects..chat_id` must point to a forum-enabled +- `main`: `chat_id` must be a forum-enabled supergroup (topics enabled). +- `projects`: each `projects..chat_id` must point to a forum-enabled supergroup for that project. +- `all`: both the main chat and each project chat must be forum-enabled. +- `auto`: if any project chats are configured, uses `projects`; otherwise `main`. - The bot needs the **Manage Topics** permission in the relevant chat(s). Commands: -- `multi_project_chat`: `/topic @branch` creates a topic in the main chat - and binds it. -- `per_project_chat`: `/topic @branch` creates a topic in the project chat and binds it. +- `main`: `/topic @branch` creates a topic in the main chat and binds it. +- `projects`: `/topic @branch` creates a topic in the project chat and binds it. +- `all`: use `/topic @branch` in the main chat, or `/topic @branch` in + project chats. - `/ctx` inside a topic shows the bound context and stored session engines. `/ctx set ...` and `/ctx clear` update the binding. - `/new` inside a topic clears stored resume tokens for that topic. @@ -66,7 +69,7 @@ Commands: State is stored in `telegram_topics_state.json` alongside the config file. Delete it to reset all topic bindings and stored sessions. -Note: `multi_project_chat` does not assume a default project; topics must be bound +Note: main chat topics do not assume a default project; topics must be bound before running without directives. ## Outbox model diff --git a/docs/user-guide.md b/docs/user-guide.md index e66c23d..1382657 100644 --- a/docs/user-guide.md +++ b/docs/user-guide.md @@ -238,14 +238,14 @@ Topics bind Telegram forum threads to specific project/branch contexts. They als ```toml [transports.telegram.topics] enabled = true -mode = "multi_project_chat" # or "per_project_chat" ``` Your bot needs **Manage Topics** permission in the group. -### Topic modes explained +If any `projects..chat_id` are configured, topics are managed in those +project chats; otherwise topics are managed in the main chat. -**`multi_project_chat`** — One forum-enabled supergroup for everything. Create topics per project/branch combination. +### Topic behavior ``` ┌────────────────────────────┐ @@ -258,7 +258,10 @@ Your bot needs **Manage Topics** permission in the group. └────────────────────────────┘ ``` -**`per_project_chat`** — Each project has its own forum-enabled supergroup. Topics still include the project name for consistency, but the project is inferred from the chat. Regular messages in that chat also infer the project, so `/project` is usually optional. +Each project can have its own forum-enabled supergroup. Topics still +include the project name for consistency, but the project is inferred from the +chat. Regular messages in that chat also infer the project, so `/project` is +usually optional. ``` ┌────────────────────────────────┐ ┌───────────────────────────────────┐ @@ -282,11 +285,11 @@ Run these inside a topic thread: | `/ctx clear` | Remove the binding | | `/new` | Clear resume tokens for this topic | -In `per_project_chat` mode, omit the project: `/topic @branch` or `/ctx set @branch`. +In project chats, omit the project: `/topic @branch` or `/ctx set @branch`. ### Configuration examples -**Multi-project chat:** +**Main chat only:** ```toml [transports.telegram] @@ -294,10 +297,9 @@ chat_id = -1001234567890 [transports.telegram.topics] enabled = true -mode = "multi_project_chat" ``` -**Per-project chat:** +**Project chats:** ```toml [transports.telegram] @@ -305,7 +307,6 @@ chat_id = 123456789 # main chat (private, for non-project messages) [transports.telegram.topics] enabled = true -mode = "per_project_chat" [projects.takopi] path = "~/dev/takopi" @@ -350,7 +351,7 @@ voice_transcription = true [transports.telegram.topics] enabled = true -mode = "multi_project_chat" +scope = "auto" # Project definitions [projects.takopi] diff --git a/readme.md b/readme.md index ead8a99..041ba9e 100644 --- a/readme.md +++ b/readme.md @@ -73,8 +73,6 @@ voice_transcription = true [transports.telegram.topics] enabled = true -mode = "multi_project_chat" # or "per_project_chat" -# per_project_chat uses projects..chat_id to infer the project [codex] # optional: profile from ~/.codex/config.toml diff --git a/src/takopi/config_migrations.py b/src/takopi/config_migrations.py index 77a5a33..6de024e 100644 --- a/src/takopi/config_migrations.py +++ b/src/takopi/config_migrations.py @@ -54,10 +54,60 @@ def _migrate_legacy_telegram(config: dict[str, Any], *, config_path: Path) -> bo return True +def _migrate_topics_scope(config: dict[str, Any], *, config_path: Path) -> bool: + transports = config.get("transports") + if transports is None: + return False + if not isinstance(transports, dict): + raise ConfigError(f"Invalid `transports` in {config_path}; expected a table.") + + telegram = transports.get("telegram") + if telegram is None: + return False + if not isinstance(telegram, dict): + raise ConfigError( + f"Invalid `transports.telegram` in {config_path}; expected a table." + ) + + topics = telegram.get("topics") + if topics is None: + return False + if not isinstance(topics, dict): + raise ConfigError( + f"Invalid `transports.telegram.topics` in {config_path}; expected a table." + ) + if "mode" not in topics: + return False + + if "scope" not in topics: + mode = topics.get("mode") + if not isinstance(mode, str): + raise ConfigError( + f"Invalid `transports.telegram.topics.mode` in {config_path}; " + "expected a string." + ) + cleaned = mode.strip() + mapping = { + "multi_project_chat": "main", + "per_project_chat": "projects", + } + if cleaned not in mapping: + raise ConfigError( + f"Invalid `transports.telegram.topics.mode` in {config_path}; " + "expected 'multi_project_chat' or 'per_project_chat'." + ) + topics["scope"] = mapping[cleaned] + + topics.pop("mode", None) + return True + + def migrate_config(config: dict[str, Any], *, config_path: Path) -> list[str]: applied: list[str] = [] if _migrate_legacy_telegram(config, config_path=config_path): applied.append("legacy-telegram") + if _migrate_topics_scope(config, config_path=config_path): + applied.append("topics-scope") return applied diff --git a/src/takopi/logging.py b/src/takopi/logging.py index ec2fc28..8d6a4bc 100644 --- a/src/takopi/logging.py +++ b/src/takopi/logging.py @@ -253,12 +253,20 @@ def setup_logging( structlog.processors.TimeStamper(fmt="iso", utc=True), structlog.processors.add_log_level, _add_logger_name, - structlog.processors.format_exc_info, - _redact_event_dict, - _file_sink, - cast(Processor, renderer), ], ) + if format_value == "json": + processors.append(structlog.processors.format_exc_info) + processors.extend( + cast( + list[Processor], + [ + _redact_event_dict, + _file_sink, + cast(Processor, renderer), + ], + ) + ) structlog.configure( processors=processors, diff --git a/src/takopi/runner_bridge.py b/src/takopi/runner_bridge.py index 27f4cd8..587d038 100644 --- a/src/takopi/runner_bridge.py +++ b/src/takopi/runner_bridge.py @@ -79,6 +79,7 @@ class IncomingMessage: message_id: MessageId text: str reply_to: MessageRef | None = None + thread_id: int | None = None @dataclass(frozen=True) @@ -109,6 +110,7 @@ async def _send_or_edit_message( reply_to: MessageRef | None = None, notify: bool = True, replace_ref: MessageRef | None = None, + thread_id: int | None = None, ) -> tuple[MessageRef | None, bool]: msg = message if edit_ref is not None: @@ -135,6 +137,7 @@ async def _send_or_edit_message( reply_to=reply_to, notify=notify, replace=replace_ref, + thread_id=thread_id, ), ) return sent, False @@ -236,6 +239,7 @@ async def send_initial_progress( tracker: ProgressTracker, resume_formatter: Callable[[ResumeToken], str] | None = None, context_line: str | None = None, + thread_id: int | None = None, ) -> ProgressMessageState: progress_ref: MessageRef | None = None last_rendered: RenderedMessage | None = None @@ -258,7 +262,7 @@ async def send_initial_progress( progress_ref = await cfg.transport.send( channel_id=channel_id, message=initial_rendered, - options=SendOptions(reply_to=reply_to, notify=False), + options=SendOptions(reply_to=reply_to, notify=False, thread_id=thread_id), ) if progress_ref is not None: last_rendered = initial_rendered @@ -345,6 +349,7 @@ async def send_result_message( edit_ref: MessageRef | None, replace_ref: MessageRef | None = None, delete_tag: str = "final", + thread_id: int | None = None, ) -> None: final_msg, edited = await _send_or_edit_message( cfg.transport, @@ -354,6 +359,7 @@ async def send_result_message( reply_to=reply_to, notify=notify, replace_ref=replace_ref, + thread_id=thread_id, ) if final_msg is None: return @@ -411,6 +417,7 @@ async def handle_message( tracker=progress_tracker, resume_formatter=runner.format_resume, context_line=context_line, + thread_id=incoming.thread_id, ) progress_ref = progress_state.ref @@ -506,6 +513,7 @@ async def handle_message( edit_ref=progress_ref, replace_ref=progress_ref, delete_tag="error", + thread_id=incoming.thread_id, ) return @@ -535,6 +543,7 @@ async def handle_message( edit_ref=progress_ref, replace_ref=progress_ref, delete_tag="cancel", + thread_id=incoming.thread_id, ) return @@ -598,4 +607,5 @@ async def handle_message( edit_ref=edit_ref, replace_ref=progress_ref, delete_tag="final", + thread_id=incoming.thread_id, ) diff --git a/src/takopi/settings.py b/src/takopi/settings.py index 6f4042b..60a3553 100644 --- a/src/takopi/settings.py +++ b/src/takopi/settings.py @@ -31,17 +31,17 @@ class TelegramTopicsSettings(BaseModel): model_config = ConfigDict(extra="forbid") enabled: bool = False - mode: str = "multi_project_chat" + scope: str = "auto" - @field_validator("mode", mode="before") + @field_validator("scope", mode="before") @classmethod - def _validate_mode(cls, value: Any) -> str: + def _validate_scope(cls, value: Any) -> str: if not isinstance(value, str): - raise ValueError("topics.mode must be a string") + raise ValueError("topics.scope must be a string") cleaned = value.strip() - if cleaned not in {"per_project_chat", "multi_project_chat"}: + if cleaned not in {"auto", "main", "projects", "all"}: raise ValueError( - "topics.mode must be 'per_project_chat' or 'multi_project_chat'" + "topics.scope must be 'auto', 'main', 'projects', or 'all'" ) return cleaned diff --git a/src/takopi/telegram/backend.py b/src/takopi/telegram/backend.py index c720ce1..cd7fd37 100644 --- a/src/takopi/telegram/backend.py +++ b/src/takopi/telegram/backend.py @@ -75,7 +75,7 @@ def _build_topics_config( raise ConfigError(f"Invalid topics config in {config_path}: {exc}") from exc return TelegramTopicsConfig( enabled=settings.enabled, - mode=settings.mode, + scope=settings.scope, ) diff --git a/src/takopi/telegram/bridge.py b/src/takopi/telegram/bridge.py index 4cbc5d2..7f35613 100644 --- a/src/takopi/telegram/bridge.py +++ b/src/takopi/telegram/bridge.py @@ -95,24 +95,58 @@ def _parse_slash_command(text: str) -> tuple[str | None, str]: _TOPICS_COMMANDS = {"ctx", "new", "topic"} +def _resolve_topics_scope(cfg: TelegramBridgeConfig) -> tuple[str, frozenset[int]]: + scope = cfg.topics.scope + project_ids = set(cfg.runtime.project_chat_ids()) + if scope == "auto": + scope = "projects" if project_ids else "main" + if scope == "main": + return scope, frozenset({cfg.chat_id}) + if scope == "projects": + return scope, frozenset(project_ids) + if scope == "all": + return scope, frozenset({cfg.chat_id, *project_ids}) + raise ValueError(f"Invalid topics.scope: {cfg.topics.scope!r}") + + +def _topics_scope_label(cfg: TelegramBridgeConfig) -> str: + resolved, _ = _resolve_topics_scope(cfg) + if cfg.topics.scope == "auto": + return f"auto ({resolved})" + return resolved + + def _topics_chat_project(cfg: TelegramBridgeConfig, chat_id: int) -> str | None: context = cfg.runtime.default_context_for_chat(chat_id) return context.project if context is not None else None def _topics_chat_allowed(cfg: TelegramBridgeConfig, chat_id: int) -> bool: - if cfg.topics.mode == "per_project_chat": - return _topics_chat_project(cfg, chat_id) is not None - return chat_id == cfg.chat_id + if not cfg.topics.enabled: + return False + _, scope_chat_ids = _resolve_topics_scope(cfg) + return chat_id in scope_chat_ids def _topics_command_error(cfg: TelegramBridgeConfig, chat_id: int) -> str | None: - if cfg.topics.mode == "per_project_chat": - if _topics_chat_project(cfg, chat_id) is None: - return "topics commands are only available in project chats." - elif chat_id != cfg.chat_id: + if _topics_chat_allowed(cfg, chat_id): + return None + resolved, _ = _resolve_topics_scope(cfg) + if resolved == "main": + if cfg.topics.scope == "auto": + return ( + "topics commands are only available in the main chat (auto scope). " + 'to use topics in project chats, set `topics.scope = "projects"`.' + ) return "topics commands are only available in the main chat." - return None + if resolved == "projects": + if cfg.topics.scope == "auto": + return ( + "topics commands are only available in project chats (auto scope). " + 'to use topics in the main chat, set `topics.scope = "main"`.' + ) + return "topics commands are only available in project chats." + return "topics commands are only available in the main or project chats." def _merge_topic_context( @@ -148,14 +182,14 @@ def _format_context(runtime: TransportRuntime, context: RunContext | None) -> st return project -def _usage_ctx_set(cfg: TelegramBridgeConfig) -> str: - if cfg.topics.mode == "per_project_chat": +def _usage_ctx_set(*, chat_project: str | None) -> str: + if chat_project is not None: return "usage: /ctx set [@branch]" return "usage: /ctx set [@branch]" -def _usage_topic(cfg: TelegramBridgeConfig) -> str: - if cfg.topics.mode == "per_project_chat": +def _usage_topic(*, chat_project: str | None) -> str: + if chat_project is not None: return "usage: /topic @branch" return "usage: /topic @branch" @@ -170,7 +204,12 @@ def _parse_project_branch_args( ) -> tuple[RunContext | None, str | None]: tokens = _split_command_args(args_text) if not tokens: - return None, _usage_topic(cfg) if require_branch else _usage_ctx_set(cfg) + return ( + None, + _usage_topic(chat_project=chat_project) + if require_branch + else _usage_ctx_set(chat_project=chat_project), + ) if len(tokens) > 2: return None, "too many arguments" project_token: str | None = None @@ -187,9 +226,7 @@ def _parse_project_branch_args( branch = second[1:] or None project_key: str | None = None - if cfg.topics.mode == "per_project_chat": - if chat_project is None: - return None, "topics are only available in project chats" + if chat_project is not None: if project_token is None: project_key = chat_project else: @@ -202,7 +239,7 @@ def _parse_project_branch_args( project_key = normalized else: if project_token is None: - return None, "project is required in multi_project_chat mode" + return None, "project is required" project_key = runtime.normalize_project_key(project_token) if project_key is None: return None, f"unknown project {project_token!r}" @@ -221,15 +258,20 @@ def _format_ctx_status( resolved: RunContext | None, context_source: str, snapshot: TopicThreadSnapshot | None, + chat_project: str | None, ) -> str: lines = [ - f"topics: enabled ({cfg.topics.mode})", + f"topics: enabled (scope={_topics_scope_label(cfg)})", f"bound ctx: {_format_context(runtime, bound)}", f"resolved ctx: {_format_context(runtime, resolved)} (source: {context_source})", ] - if cfg.topics.mode == "multi_project_chat" and bound is None: - topic_usage = _usage_topic(cfg).removeprefix("usage: ").strip() - ctx_usage = _usage_ctx_set(cfg).removeprefix("usage: ").strip() + if chat_project is None and bound is None: + topic_usage = ( + _usage_topic(chat_project=chat_project).removeprefix("usage: ").strip() + ) + ctx_usage = ( + _usage_ctx_set(chat_project=chat_project).removeprefix("usage: ").strip() + ) lines.append( f"note: unbound topic — bind with `{topic_usage}` or `{ctx_usage}`" ) @@ -415,7 +457,7 @@ class TelegramVoiceTranscriptionConfig: @dataclass(frozen=True) class TelegramTopicsConfig: enabled: bool = False - mode: str = "multi_project_chat" + scope: str = "auto" def _as_int(value: int | str, *, label: str) -> int: @@ -541,12 +583,13 @@ async def _send_plain( user_msg_id: int, text: str, notify: bool = True, + thread_id: int | None = None, ) -> None: reply_to = MessageRef(channel_id=chat_id, message_id=user_msg_id) await transport.send( channel_id=chat_id, message=RenderedMessage(text=text), - options=SendOptions(reply_to=reply_to, notify=notify), + options=SendOptions(reply_to=reply_to, notify=notify, thread_id=thread_id), ) @@ -569,53 +612,50 @@ async def _validate_topics_setup(cfg: TelegramBridgeConfig) -> None: me = await cfg.bot.get_me() bot_id = me.get("id") if isinstance(me, dict) else None if not isinstance(bot_id, int): - raise ConfigError("Failed to fetch bot id for topics validation.") - if cfg.topics.mode == "per_project_chat": - chat_ids = cfg.runtime.project_chat_ids() - if not chat_ids: - raise ConfigError( - "Topics enabled but no project chats are configured; " - "set projects..chat_id for forum chats." - ) - else: - chat_ids = (cfg.chat_id,) + raise ConfigError("failed to fetch bot id for topics validation.") + scope, chat_ids = _resolve_topics_scope(cfg) + if scope == "projects" and not chat_ids: + raise ConfigError( + "topics enabled but no project chats are configured; " + 'set projects..chat_id for forum chats or use scope="main".' + ) for chat_id in chat_ids: chat = await cfg.bot.get_chat(chat_id) if not isinstance(chat, dict): raise ConfigError( - f"Failed to fetch chat info for topics validation ({chat_id})." + f"failed to fetch chat info for topics validation ({chat_id})." ) chat_type = chat.get("type") is_forum = chat.get("is_forum") if chat_type != "supergroup": raise ConfigError( - "Topics enabled but chat is not a supergroup; convert the group " - "and enable Topics." + "topics enabled but chat is not a supergroup " + f"(chat_id={chat_id}); convert the group and enable topics." ) if is_forum is not True: raise ConfigError( - "Topics enabled but chat does not have Topics enabled; " - "turn on Topics in group settings." + "topics enabled but chat does not have topics enabled " + f"(chat_id={chat_id}); turn on topics in group settings." ) member = await cfg.bot.get_chat_member(chat_id, bot_id) if not isinstance(member, dict): raise ConfigError( - "Failed to fetch bot permissions; promote the bot to admin with " - "Manage Topics." + "failed to fetch bot permissions " + f"(chat_id={chat_id}); promote the bot to admin with manage topics." ) status = member.get("status") if status == "creator": continue if status != "administrator": raise ConfigError( - "Topics enabled but bot is not an admin; promote it and grant " - "Manage Topics." + "topics enabled but bot is not an admin " + f"(chat_id={chat_id}); promote it and grant manage topics." ) if member.get("can_manage_topics") is not True: raise ConfigError( - "Topics enabled but bot lacks Manage Topics permission; " - "grant can_manage_topics." + "topics enabled but bot lacks manage topics permission " + f"(chat_id={chat_id}); grant can_manage_topics." ) @@ -690,6 +730,7 @@ async def _transcribe_voice( chat_id=msg.chat_id, user_msg_id=msg.message_id, text="voice transcription is disabled.", + thread_id=msg.thread_id, ) return None api_key = _resolve_openai_api_key(settings) @@ -699,6 +740,7 @@ async def _transcribe_voice( chat_id=msg.chat_id, user_msg_id=msg.message_id, text="voice transcription requires OPENAI_API_KEY.", + thread_id=msg.thread_id, ) return None if voice.file_size is not None and voice.file_size > _OPENAI_AUDIO_MAX_BYTES: @@ -707,6 +749,7 @@ async def _transcribe_voice( chat_id=msg.chat_id, user_msg_id=msg.message_id, text="voice message is too large to transcribe.", + thread_id=msg.thread_id, ) return None file_info = await cfg.bot.get_file(voice.file_id) @@ -716,6 +759,7 @@ async def _transcribe_voice( chat_id=msg.chat_id, user_msg_id=msg.message_id, text="failed to fetch voice file.", + thread_id=msg.thread_id, ) return None file_path = file_info.get("file_path") @@ -725,6 +769,7 @@ async def _transcribe_voice( chat_id=msg.chat_id, user_msg_id=msg.message_id, text="failed to fetch voice file.", + thread_id=msg.thread_id, ) return None audio_bytes = await cfg.bot.download_file(file_path) @@ -734,6 +779,7 @@ async def _transcribe_voice( chat_id=msg.chat_id, user_msg_id=msg.message_id, text="failed to download voice message.", + thread_id=msg.thread_id, ) return None if len(audio_bytes) > _OPENAI_AUDIO_MAX_BYTES: @@ -742,6 +788,7 @@ async def _transcribe_voice( chat_id=msg.chat_id, user_msg_id=msg.message_id, text="voice message is too large to transcribe.", + thread_id=msg.thread_id, ) return None filename = _normalize_voice_filename(file_path, voice.mime_type) @@ -759,6 +806,7 @@ async def _transcribe_voice( chat_id=msg.chat_id, user_msg_id=msg.message_id, text="voice transcription failed.", + thread_id=msg.thread_id, ) return None transcript = transcript.strip() @@ -768,6 +816,7 @@ async def _transcribe_voice( chat_id=msg.chat_id, user_msg_id=msg.message_id, text="voice transcription returned empty text.", + thread_id=msg.thread_id, ) return None return transcript @@ -831,13 +880,10 @@ async def _handle_ctx_command( chat_id=msg.chat_id, user_msg_id=msg.message_id, text=error, + thread_id=msg.thread_id, ) return - chat_project = ( - _topics_chat_project(cfg, msg.chat_id) - if cfg.topics.mode == "per_project_chat" - else None - ) + chat_project = _topics_chat_project(cfg, msg.chat_id) tkey = _topic_key(msg, cfg) if tkey is None: await _send_plain( @@ -845,6 +891,7 @@ async def _handle_ctx_command( chat_id=msg.chat_id, user_msg_id=msg.message_id, text="this command only works inside a topic.", + thread_id=msg.thread_id, ) return tokens = _split_command_args(args_text) @@ -866,12 +913,14 @@ async def _handle_ctx_command( resolved=resolved.context, context_source=resolved.context_source, snapshot=snapshot, + chat_project=chat_project, ) await _send_plain( cfg.exec_cfg.transport, chat_id=msg.chat_id, user_msg_id=msg.message_id, text=text, + thread_id=msg.thread_id, ) return if action == "set": @@ -888,7 +937,8 @@ async def _handle_ctx_command( cfg.exec_cfg.transport, chat_id=msg.chat_id, user_msg_id=msg.message_id, - text=f"error:\n{error}\n{_usage_ctx_set(cfg)}", + text=f"error:\n{error}\n{_usage_ctx_set(chat_project=chat_project)}", + thread_id=msg.thread_id, ) return if context is None: @@ -896,7 +946,8 @@ async def _handle_ctx_command( cfg.exec_cfg.transport, chat_id=msg.chat_id, user_msg_id=msg.message_id, - text=f"error:\n{_usage_ctx_set(cfg)}", + text=f"error:\n{_usage_ctx_set(chat_project=chat_project)}", + thread_id=msg.thread_id, ) return await store.set_context(*tkey, context) @@ -911,7 +962,8 @@ async def _handle_ctx_command( cfg.exec_cfg.transport, chat_id=msg.chat_id, user_msg_id=msg.message_id, - text=f"topic bound to {_format_context(cfg.runtime, context)}", + text=f"topic bound to `{_format_context(cfg.runtime, context)}`", + thread_id=msg.thread_id, ) return if action == "clear": @@ -921,6 +973,7 @@ async def _handle_ctx_command( chat_id=msg.chat_id, user_msg_id=msg.message_id, text="topic binding cleared.", + thread_id=msg.thread_id, ) return await _send_plain( @@ -928,6 +981,7 @@ async def _handle_ctx_command( chat_id=msg.chat_id, user_msg_id=msg.message_id, text="unknown /ctx command. use /ctx, /ctx set, or /ctx clear.", + thread_id=msg.thread_id, ) @@ -943,6 +997,7 @@ async def _handle_new_command( chat_id=msg.chat_id, user_msg_id=msg.message_id, text=error, + thread_id=msg.thread_id, ) return tkey = _topic_key(msg, cfg) @@ -952,6 +1007,7 @@ async def _handle_new_command( chat_id=msg.chat_id, user_msg_id=msg.message_id, text="this command only works inside a topic.", + thread_id=msg.thread_id, ) return await store.clear_sessions(*tkey) @@ -960,6 +1016,7 @@ async def _handle_new_command( chat_id=msg.chat_id, user_msg_id=msg.message_id, text="cleared stored sessions for this topic.", + thread_id=msg.thread_id, ) @@ -976,13 +1033,10 @@ async def _handle_topic_command( chat_id=msg.chat_id, user_msg_id=msg.message_id, text=error, + thread_id=msg.thread_id, ) return - chat_project = ( - _topics_chat_project(cfg, msg.chat_id) - if cfg.topics.mode == "per_project_chat" - else None - ) + chat_project = _topics_chat_project(cfg, msg.chat_id) context, error = _parse_project_branch_args( args_text, runtime=cfg.runtime, @@ -991,18 +1045,17 @@ async def _handle_topic_command( chat_project=chat_project, ) if error is not None or context is None: - usage = _usage_topic(cfg) + usage = _usage_topic(chat_project=chat_project) text = f"error:\n{error}\n{usage}" if error else usage await _send_plain( cfg.exec_cfg.transport, chat_id=msg.chat_id, user_msg_id=msg.message_id, text=text, + thread_id=msg.thread_id, ) return - target_chat_id = ( - msg.chat_id if cfg.topics.mode == "per_project_chat" else cfg.chat_id - ) + target_chat_id = msg.chat_id existing = await store.find_thread_for_context(target_chat_id, context) if existing is not None: await _send_plain( @@ -1011,6 +1064,7 @@ async def _handle_topic_command( user_msg_id=msg.message_id, text=f"topic already exists for {_format_context(cfg.runtime, context)} " "in this chat.", + thread_id=msg.thread_id, ) return title = _topic_title(cfg=cfg, runtime=cfg.runtime, context=context) @@ -1022,6 +1076,7 @@ async def _handle_topic_command( chat_id=msg.chat_id, user_msg_id=msg.message_id, text="failed to create topic.", + thread_id=msg.thread_id, ) return await store.set_context( @@ -1036,11 +1091,12 @@ async def _handle_topic_command( chat_id=msg.chat_id, user_msg_id=msg.message_id, text=f"created topic {title!r}.", + thread_id=msg.thread_id, ) await cfg.exec_cfg.transport.send( channel_id=target_chat_id, message=RenderedMessage( - text=f"topic bound to {_format_context(cfg.runtime, context)}" + text=f"topic bound to `{_format_context(cfg.runtime, context)}`" ), options=SendOptions(thread_id=thread_id), ) @@ -1062,6 +1118,7 @@ async def _handle_cancel( chat_id=chat_id, user_msg_id=user_msg_id, text="nothing is currently running for that message.", + thread_id=msg.thread_id, ) return await _send_plain( @@ -1069,6 +1126,7 @@ async def _handle_cancel( chat_id=chat_id, user_msg_id=user_msg_id, text="reply to the progress message to cancel.", + thread_id=msg.thread_id, ) return @@ -1080,6 +1138,7 @@ async def _handle_cancel( chat_id=chat_id, user_msg_id=user_msg_id, text="nothing is currently running for that message.", + thread_id=msg.thread_id, ) return @@ -1158,6 +1217,7 @@ async def _send_with_resume( user_msg_id=user_msg_id, text="resume token not ready yet; try replying to the final message.", notify=False, + thread_id=thread_id, ) return await enqueue( @@ -1178,6 +1238,7 @@ async def _send_runner_unavailable( resume_token: ResumeToken | None, runner: Runner, reason: str, + thread_id: int | None = None, ) -> None: tracker = ProgressTracker(engine=runner.engine) tracker.set_resume(resume_token) @@ -1192,7 +1253,7 @@ async def _send_runner_unavailable( await exec_cfg.transport.send( channel_id=chat_id, message=message, - options=SendOptions(reply_to=reply_to, notify=True), + options=SendOptions(reply_to=reply_to, notify=True, thread_id=thread_id), ) @@ -1210,6 +1271,7 @@ async def _run_engine( on_thread_known: Callable[[ResumeToken, anyio.Event], Awaitable[None]] | None = None, engine_override: EngineId | None = None, + thread_id: int | None = None, ) -> None: try: try: @@ -1223,6 +1285,7 @@ async def _run_engine( chat_id=chat_id, user_msg_id=user_msg_id, text=f"error:\n{exc}", + thread_id=thread_id, ) return if not entry.available: @@ -1234,6 +1297,7 @@ async def _run_engine( resume_token=resume_token, runner=entry.runner, reason=reason, + thread_id=thread_id, ) return try: @@ -1244,6 +1308,7 @@ async def _run_engine( chat_id=chat_id, user_msg_id=user_msg_id, text=f"error:\n{exc}", + thread_id=thread_id, ) return run_base_token = set_run_base_dir(cwd) @@ -1266,6 +1331,7 @@ async def _run_engine( message_id=user_msg_id, text=text, reply_to=reply_ref, + thread_id=thread_id, ) await handle_message( exec_cfg, @@ -1342,6 +1408,7 @@ class _TelegramCommandExecutor(CommandExecutor): scheduler: ThreadScheduler, chat_id: int, user_msg_id: int, + thread_id: int | None, ) -> None: self._exec_cfg = exec_cfg self._runtime = runtime @@ -1349,6 +1416,7 @@ class _TelegramCommandExecutor(CommandExecutor): self._scheduler = scheduler self._chat_id = chat_id self._user_msg_id = user_msg_id + self._thread_id = thread_id self._reply_ref = MessageRef(channel_id=chat_id, message_id=user_msg_id) def _apply_default_context(self, request: RunRequest) -> RunRequest: @@ -1379,7 +1447,11 @@ class _TelegramCommandExecutor(CommandExecutor): return await self._exec_cfg.transport.send( channel_id=self._chat_id, message=rendered, - options=SendOptions(reply_to=reply_ref, notify=notify), + options=SendOptions( + reply_to=reply_ref, + notify=notify, + thread_id=self._thread_id, + ), ) async def run_one( @@ -1409,6 +1481,7 @@ class _TelegramCommandExecutor(CommandExecutor): reply_ref=self._reply_ref, on_thread_known=None, engine_override=engine, + thread_id=self._thread_id, ) return RunResult(engine=engine, message=capture.last_message) await _run_engine( @@ -1423,6 +1496,7 @@ class _TelegramCommandExecutor(CommandExecutor): reply_ref=self._reply_ref, on_thread_known=self._scheduler.note_thread_known, engine_override=engine, + thread_id=self._thread_id, ) return RunResult(engine=engine, message=None) @@ -1472,6 +1546,7 @@ async def _dispatch_command( scheduler=scheduler, chat_id=chat_id, user_msg_id=user_msg_id, + thread_id=msg.thread_id, ) message_ref = MessageRef(channel_id=chat_id, message_id=user_msg_id) try: @@ -1539,13 +1614,15 @@ async def run_main_loop( config_path = cfg.runtime.config_path if config_path is None: raise ConfigError( - "Topics enabled but config path is not set; cannot locate state file." + "topics enabled but config path is not set; cannot locate state file." ) topic_store = TopicStateStore(resolve_state_path(config_path)) await _validate_topics_setup(cfg) + resolved_scope, _ = _resolve_topics_scope(cfg) logger.info( "topics.enabled", - mode=cfg.topics.mode, + scope=cfg.topics.scope, + resolved_scope=resolved_scope, state_path=str(resolve_state_path(config_path)), ) await _set_command_menu(cfg) @@ -1640,6 +1717,7 @@ async def run_main_loop( reply_ref=reply_ref, on_thread_known=wrap_on_thread_known(on_thread_known, topic_key), engine_override=engine_override, + thread_id=thread_id, ) async def run_thread_job(job: ThreadJob) -> None: @@ -1681,9 +1759,7 @@ async def run_main_loop( ) topic_key = _topic_key(msg, cfg) if topic_store is not None else None chat_project = ( - _topics_chat_project(cfg, chat_id) - if cfg.topics.enabled and cfg.topics.mode == "per_project_chat" - else None + _topics_chat_project(cfg, chat_id) if cfg.topics.enabled else None ) bound_context = ( await topic_store.get_context(*topic_key) @@ -1748,6 +1824,7 @@ async def run_main_loop( chat_id=chat_id, user_msg_id=user_msg_id, text=f"error:\n{exc}", + thread_id=msg.thread_id, ) continue @@ -1782,8 +1859,10 @@ async def run_main_loop( user_msg_id=user_msg_id, text=( "this topic isn't bound to a project yet.\n" - f"{_usage_ctx_set(cfg)} or {_usage_topic(cfg)}" + f"{_usage_ctx_set(chat_project=chat_project)} or " + f"{_usage_topic(chat_project=chat_project)}" ), + thread_id=msg.thread_id, ) continue if resume_token is None and reply_id is not None: diff --git a/src/takopi/telegram/topic_state.py b/src/takopi/telegram/topic_state.py index 3ed140b..f925438 100644 --- a/src/takopi/telegram/topic_state.py +++ b/src/takopi/telegram/topic_state.py @@ -248,7 +248,7 @@ class TopicStateStore: self._data = {"version": STATE_VERSION, "threads": {}} return try: - payload = json.loads(self._path.read_text()) + payload = json.loads(self._path.read_text(encoding="utf-8")) except Exception as exc: logger.warning( "telegram.topic_state.load_failed", diff --git a/tests/test_telegram_bridge.py b/tests/test_telegram_bridge.py index 17671c7..dd631e3 100644 --- a/tests/test_telegram_bridge.py +++ b/tests/test_telegram_bridge.py @@ -821,13 +821,13 @@ def test_topic_title_matches_command_syntax() -> None: assert title == "@main" -def test_topic_title_per_project_chat_includes_project() -> None: +def test_topic_title_projects_scope_includes_project() -> None: transport = _FakeTransport() cfg = replace( _make_cfg(transport), topics=bridge.TelegramTopicsConfig( enabled=True, - mode="per_project_chat", + scope="projects", ), ) @@ -1056,7 +1056,7 @@ async def test_run_main_loop_routes_reply_to_running_resume() -> None: @pytest.mark.anyio -async def test_run_main_loop_persists_topic_sessions_in_per_project_chat( +async def test_run_main_loop_persists_topic_sessions_in_project_scope( tmp_path: Path, ) -> None: project_chat_id = -100 @@ -1099,7 +1099,7 @@ async def test_run_main_loop_persists_topic_sessions_in_per_project_chat( exec_cfg=exec_cfg, topics=bridge.TelegramTopicsConfig( enabled=True, - mode="per_project_chat", + scope="projects", ), ) @@ -1124,6 +1124,51 @@ async def test_run_main_loop_persists_topic_sessions_in_per_project_chat( assert stored == ResumeToken(engine=CODEX_ENGINE, value=resume_value) +@pytest.mark.anyio +async def test_run_main_loop_replies_in_same_thread() -> None: + transport = _FakeTransport() + bot = _FakeBot() + runner = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE) + exec_cfg = ExecBridgeConfig( + transport=transport, + presenter=MarkdownPresenter(), + final_notify=True, + ) + runtime = TransportRuntime( + router=_make_router(runner), + projects=empty_projects_config(), + ) + cfg = TelegramBridgeConfig( + bot=bot, + runtime=runtime, + chat_id=123, + startup_msg="", + exec_cfg=exec_cfg, + ) + + async def poller(_cfg: TelegramBridgeConfig): + yield TelegramIncomingMessage( + transport="telegram", + chat_id=123, + message_id=1, + text="hello", + reply_to_message_id=None, + reply_to_text=None, + sender_id=123, + thread_id=77, + ) + + await run_main_loop(cfg, poller) + + reply_calls = [ + call + for call in transport.send_calls + if call["options"] is not None and call["options"].reply_to is not None + ] + assert reply_calls + assert all(call["options"].thread_id == 77 for call in reply_calls) + + @pytest.mark.anyio async def test_run_main_loop_handles_command_plugins(monkeypatch) -> None: class _Command: