feat(telegram): topics scope + thread-aware replies (#81)

This commit is contained in:
banteg
2026-01-11 01:40:49 +04:00
committed by GitHub
parent 94fda5e7e6
commit 77504231d4
11 changed files with 300 additions and 106 deletions
+10 -7
View File
@@ -44,21 +44,24 @@ Configuration (under `[transports.telegram]`):
```toml
[transports.telegram.topics]
enabled = true
mode = "multi_project_chat" # or "per_project_chat"
scope = "auto" # auto | main | projects | all
```
Requirements:
- `multi_project_chat`: `chat_id` must be a forum-enabled supergroup (topics enabled).
- `per_project_chat`: each `projects.<alias>.chat_id` must point to a forum-enabled
- `main`: `chat_id` must be a forum-enabled supergroup (topics enabled).
- `projects`: each `projects.<alias>.chat_id` must point to a forum-enabled
supergroup for that project.
- `all`: both the main chat and each project chat must be forum-enabled.
- `auto`: if any project chats are configured, uses `projects`; otherwise `main`.
- The bot needs the **Manage Topics** permission in the relevant chat(s).
Commands:
- `multi_project_chat`: `/topic <project> @branch` creates a topic in the main chat
and binds it.
- `per_project_chat`: `/topic @branch` creates a topic in the project chat and binds it.
- `main`: `/topic <project> @branch` creates a topic in the main chat and binds it.
- `projects`: `/topic @branch` creates a topic in the project chat and binds it.
- `all`: use `/topic <project> @branch` in the main chat, or `/topic @branch` in
project chats.
- `/ctx` inside a topic shows the bound context and stored session engines.
`/ctx set ...` and `/ctx clear` update the binding.
- `/new` inside a topic clears stored resume tokens for that topic.
@@ -66,7 +69,7 @@ Commands:
State is stored in `telegram_topics_state.json` alongside the config file.
Delete it to reset all topic bindings and stored sessions.
Note: `multi_project_chat` does not assume a default project; topics must be bound
Note: main chat topics do not assume a default project; topics must be bound
before running without directives.
## Outbox model
+11 -10
View File
@@ -238,14 +238,14 @@ Topics bind Telegram forum threads to specific project/branch contexts. They als
```toml
[transports.telegram.topics]
enabled = true
mode = "multi_project_chat" # or "per_project_chat"
```
Your bot needs **Manage Topics** permission in the group.
### Topic modes explained
If any `projects.<alias>.chat_id` are configured, topics are managed in those
project chats; otherwise topics are managed in the main chat.
**`multi_project_chat`** — One forum-enabled supergroup for everything. Create topics per project/branch combination.
### Topic behavior
```
┌────────────────────────────┐
@@ -258,7 +258,10 @@ Your bot needs **Manage Topics** permission in the group.
└────────────────────────────┘
```
**`per_project_chat`** — Each project has its own forum-enabled supergroup. Topics still include the project name for consistency, but the project is inferred from the chat. Regular messages in that chat also infer the project, so `/project` is usually optional.
Each project can have its own forum-enabled supergroup. Topics still
include the project name for consistency, but the project is inferred from the
chat. Regular messages in that chat also infer the project, so `/project` is
usually optional.
```
┌────────────────────────────────┐ ┌───────────────────────────────────┐
@@ -282,11 +285,11 @@ Run these inside a topic thread:
| `/ctx clear` | Remove the binding |
| `/new` | Clear resume tokens for this topic |
In `per_project_chat` mode, omit the project: `/topic @branch` or `/ctx set @branch`.
In project chats, omit the project: `/topic @branch` or `/ctx set @branch`.
### Configuration examples
**Multi-project chat:**
**Main chat only:**
```toml
[transports.telegram]
@@ -294,10 +297,9 @@ chat_id = -1001234567890
[transports.telegram.topics]
enabled = true
mode = "multi_project_chat"
```
**Per-project chat:**
**Project chats:**
```toml
[transports.telegram]
@@ -305,7 +307,6 @@ chat_id = 123456789 # main chat (private, for non-project messages)
[transports.telegram.topics]
enabled = true
mode = "per_project_chat"
[projects.takopi]
path = "~/dev/takopi"
@@ -350,7 +351,7 @@ voice_transcription = true
[transports.telegram.topics]
enabled = true
mode = "multi_project_chat"
scope = "auto"
# Project definitions
[projects.takopi]
-2
View File
@@ -73,8 +73,6 @@ voice_transcription = true
[transports.telegram.topics]
enabled = true
mode = "multi_project_chat" # or "per_project_chat"
# per_project_chat uses projects.<alias>.chat_id to infer the project
[codex]
# optional: profile from ~/.codex/config.toml
+50
View File
@@ -54,10 +54,60 @@ def _migrate_legacy_telegram(config: dict[str, Any], *, config_path: Path) -> bo
return True
def _migrate_topics_scope(config: dict[str, Any], *, config_path: Path) -> bool:
transports = config.get("transports")
if transports is None:
return False
if not isinstance(transports, dict):
raise ConfigError(f"Invalid `transports` in {config_path}; expected a table.")
telegram = transports.get("telegram")
if telegram is None:
return False
if not isinstance(telegram, dict):
raise ConfigError(
f"Invalid `transports.telegram` in {config_path}; expected a table."
)
topics = telegram.get("topics")
if topics is None:
return False
if not isinstance(topics, dict):
raise ConfigError(
f"Invalid `transports.telegram.topics` in {config_path}; expected a table."
)
if "mode" not in topics:
return False
if "scope" not in topics:
mode = topics.get("mode")
if not isinstance(mode, str):
raise ConfigError(
f"Invalid `transports.telegram.topics.mode` in {config_path}; "
"expected a string."
)
cleaned = mode.strip()
mapping = {
"multi_project_chat": "main",
"per_project_chat": "projects",
}
if cleaned not in mapping:
raise ConfigError(
f"Invalid `transports.telegram.topics.mode` in {config_path}; "
"expected 'multi_project_chat' or 'per_project_chat'."
)
topics["scope"] = mapping[cleaned]
topics.pop("mode", None)
return True
def migrate_config(config: dict[str, Any], *, config_path: Path) -> list[str]:
applied: list[str] = []
if _migrate_legacy_telegram(config, config_path=config_path):
applied.append("legacy-telegram")
if _migrate_topics_scope(config, config_path=config_path):
applied.append("topics-scope")
return applied
+12 -4
View File
@@ -253,12 +253,20 @@ def setup_logging(
structlog.processors.TimeStamper(fmt="iso", utc=True),
structlog.processors.add_log_level,
_add_logger_name,
structlog.processors.format_exc_info,
_redact_event_dict,
_file_sink,
cast(Processor, renderer),
],
)
if format_value == "json":
processors.append(structlog.processors.format_exc_info)
processors.extend(
cast(
list[Processor],
[
_redact_event_dict,
_file_sink,
cast(Processor, renderer),
],
)
)
structlog.configure(
processors=processors,
+11 -1
View File
@@ -79,6 +79,7 @@ class IncomingMessage:
message_id: MessageId
text: str
reply_to: MessageRef | None = None
thread_id: int | None = None
@dataclass(frozen=True)
@@ -109,6 +110,7 @@ async def _send_or_edit_message(
reply_to: MessageRef | None = None,
notify: bool = True,
replace_ref: MessageRef | None = None,
thread_id: int | None = None,
) -> tuple[MessageRef | None, bool]:
msg = message
if edit_ref is not None:
@@ -135,6 +137,7 @@ async def _send_or_edit_message(
reply_to=reply_to,
notify=notify,
replace=replace_ref,
thread_id=thread_id,
),
)
return sent, False
@@ -236,6 +239,7 @@ async def send_initial_progress(
tracker: ProgressTracker,
resume_formatter: Callable[[ResumeToken], str] | None = None,
context_line: str | None = None,
thread_id: int | None = None,
) -> ProgressMessageState:
progress_ref: MessageRef | None = None
last_rendered: RenderedMessage | None = None
@@ -258,7 +262,7 @@ async def send_initial_progress(
progress_ref = await cfg.transport.send(
channel_id=channel_id,
message=initial_rendered,
options=SendOptions(reply_to=reply_to, notify=False),
options=SendOptions(reply_to=reply_to, notify=False, thread_id=thread_id),
)
if progress_ref is not None:
last_rendered = initial_rendered
@@ -345,6 +349,7 @@ async def send_result_message(
edit_ref: MessageRef | None,
replace_ref: MessageRef | None = None,
delete_tag: str = "final",
thread_id: int | None = None,
) -> None:
final_msg, edited = await _send_or_edit_message(
cfg.transport,
@@ -354,6 +359,7 @@ async def send_result_message(
reply_to=reply_to,
notify=notify,
replace_ref=replace_ref,
thread_id=thread_id,
)
if final_msg is None:
return
@@ -411,6 +417,7 @@ async def handle_message(
tracker=progress_tracker,
resume_formatter=runner.format_resume,
context_line=context_line,
thread_id=incoming.thread_id,
)
progress_ref = progress_state.ref
@@ -506,6 +513,7 @@ async def handle_message(
edit_ref=progress_ref,
replace_ref=progress_ref,
delete_tag="error",
thread_id=incoming.thread_id,
)
return
@@ -535,6 +543,7 @@ async def handle_message(
edit_ref=progress_ref,
replace_ref=progress_ref,
delete_tag="cancel",
thread_id=incoming.thread_id,
)
return
@@ -598,4 +607,5 @@ async def handle_message(
edit_ref=edit_ref,
replace_ref=progress_ref,
delete_tag="final",
thread_id=incoming.thread_id,
)
+6 -6
View File
@@ -31,17 +31,17 @@ class TelegramTopicsSettings(BaseModel):
model_config = ConfigDict(extra="forbid")
enabled: bool = False
mode: str = "multi_project_chat"
scope: str = "auto"
@field_validator("mode", mode="before")
@field_validator("scope", mode="before")
@classmethod
def _validate_mode(cls, value: Any) -> str:
def _validate_scope(cls, value: Any) -> str:
if not isinstance(value, str):
raise ValueError("topics.mode must be a string")
raise ValueError("topics.scope must be a string")
cleaned = value.strip()
if cleaned not in {"per_project_chat", "multi_project_chat"}:
if cleaned not in {"auto", "main", "projects", "all"}:
raise ValueError(
"topics.mode must be 'per_project_chat' or 'multi_project_chat'"
"topics.scope must be 'auto', 'main', 'projects', or 'all'"
)
return cleaned
+1 -1
View File
@@ -75,7 +75,7 @@ def _build_topics_config(
raise ConfigError(f"Invalid topics config in {config_path}: {exc}") from exc
return TelegramTopicsConfig(
enabled=settings.enabled,
mode=settings.mode,
scope=settings.scope,
)
+149 -70
View File
@@ -95,24 +95,58 @@ def _parse_slash_command(text: str) -> tuple[str | None, str]:
_TOPICS_COMMANDS = {"ctx", "new", "topic"}
def _resolve_topics_scope(cfg: TelegramBridgeConfig) -> tuple[str, frozenset[int]]:
scope = cfg.topics.scope
project_ids = set(cfg.runtime.project_chat_ids())
if scope == "auto":
scope = "projects" if project_ids else "main"
if scope == "main":
return scope, frozenset({cfg.chat_id})
if scope == "projects":
return scope, frozenset(project_ids)
if scope == "all":
return scope, frozenset({cfg.chat_id, *project_ids})
raise ValueError(f"Invalid topics.scope: {cfg.topics.scope!r}")
def _topics_scope_label(cfg: TelegramBridgeConfig) -> str:
resolved, _ = _resolve_topics_scope(cfg)
if cfg.topics.scope == "auto":
return f"auto ({resolved})"
return resolved
def _topics_chat_project(cfg: TelegramBridgeConfig, chat_id: int) -> str | None:
context = cfg.runtime.default_context_for_chat(chat_id)
return context.project if context is not None else None
def _topics_chat_allowed(cfg: TelegramBridgeConfig, chat_id: int) -> bool:
if cfg.topics.mode == "per_project_chat":
return _topics_chat_project(cfg, chat_id) is not None
return chat_id == cfg.chat_id
if not cfg.topics.enabled:
return False
_, scope_chat_ids = _resolve_topics_scope(cfg)
return chat_id in scope_chat_ids
def _topics_command_error(cfg: TelegramBridgeConfig, chat_id: int) -> str | None:
if cfg.topics.mode == "per_project_chat":
if _topics_chat_project(cfg, chat_id) is None:
return "topics commands are only available in project chats."
elif chat_id != cfg.chat_id:
if _topics_chat_allowed(cfg, chat_id):
return None
resolved, _ = _resolve_topics_scope(cfg)
if resolved == "main":
if cfg.topics.scope == "auto":
return (
"topics commands are only available in the main chat (auto scope). "
'to use topics in project chats, set `topics.scope = "projects"`.'
)
return "topics commands are only available in the main chat."
return None
if resolved == "projects":
if cfg.topics.scope == "auto":
return (
"topics commands are only available in project chats (auto scope). "
'to use topics in the main chat, set `topics.scope = "main"`.'
)
return "topics commands are only available in project chats."
return "topics commands are only available in the main or project chats."
def _merge_topic_context(
@@ -148,14 +182,14 @@ def _format_context(runtime: TransportRuntime, context: RunContext | None) -> st
return project
def _usage_ctx_set(cfg: TelegramBridgeConfig) -> str:
if cfg.topics.mode == "per_project_chat":
def _usage_ctx_set(*, chat_project: str | None) -> str:
if chat_project is not None:
return "usage: /ctx set [@branch]"
return "usage: /ctx set <project> [@branch]"
def _usage_topic(cfg: TelegramBridgeConfig) -> str:
if cfg.topics.mode == "per_project_chat":
def _usage_topic(*, chat_project: str | None) -> str:
if chat_project is not None:
return "usage: /topic @branch"
return "usage: /topic <project> @branch"
@@ -170,7 +204,12 @@ def _parse_project_branch_args(
) -> tuple[RunContext | None, str | None]:
tokens = _split_command_args(args_text)
if not tokens:
return None, _usage_topic(cfg) if require_branch else _usage_ctx_set(cfg)
return (
None,
_usage_topic(chat_project=chat_project)
if require_branch
else _usage_ctx_set(chat_project=chat_project),
)
if len(tokens) > 2:
return None, "too many arguments"
project_token: str | None = None
@@ -187,9 +226,7 @@ def _parse_project_branch_args(
branch = second[1:] or None
project_key: str | None = None
if cfg.topics.mode == "per_project_chat":
if chat_project is None:
return None, "topics are only available in project chats"
if chat_project is not None:
if project_token is None:
project_key = chat_project
else:
@@ -202,7 +239,7 @@ def _parse_project_branch_args(
project_key = normalized
else:
if project_token is None:
return None, "project is required in multi_project_chat mode"
return None, "project is required"
project_key = runtime.normalize_project_key(project_token)
if project_key is None:
return None, f"unknown project {project_token!r}"
@@ -221,15 +258,20 @@ def _format_ctx_status(
resolved: RunContext | None,
context_source: str,
snapshot: TopicThreadSnapshot | None,
chat_project: str | None,
) -> str:
lines = [
f"topics: enabled ({cfg.topics.mode})",
f"topics: enabled (scope={_topics_scope_label(cfg)})",
f"bound ctx: {_format_context(runtime, bound)}",
f"resolved ctx: {_format_context(runtime, resolved)} (source: {context_source})",
]
if cfg.topics.mode == "multi_project_chat" and bound is None:
topic_usage = _usage_topic(cfg).removeprefix("usage: ").strip()
ctx_usage = _usage_ctx_set(cfg).removeprefix("usage: ").strip()
if chat_project is None and bound is None:
topic_usage = (
_usage_topic(chat_project=chat_project).removeprefix("usage: ").strip()
)
ctx_usage = (
_usage_ctx_set(chat_project=chat_project).removeprefix("usage: ").strip()
)
lines.append(
f"note: unbound topic — bind with `{topic_usage}` or `{ctx_usage}`"
)
@@ -415,7 +457,7 @@ class TelegramVoiceTranscriptionConfig:
@dataclass(frozen=True)
class TelegramTopicsConfig:
enabled: bool = False
mode: str = "multi_project_chat"
scope: str = "auto"
def _as_int(value: int | str, *, label: str) -> int:
@@ -541,12 +583,13 @@ async def _send_plain(
user_msg_id: int,
text: str,
notify: bool = True,
thread_id: int | None = None,
) -> None:
reply_to = MessageRef(channel_id=chat_id, message_id=user_msg_id)
await transport.send(
channel_id=chat_id,
message=RenderedMessage(text=text),
options=SendOptions(reply_to=reply_to, notify=notify),
options=SendOptions(reply_to=reply_to, notify=notify, thread_id=thread_id),
)
@@ -569,53 +612,50 @@ async def _validate_topics_setup(cfg: TelegramBridgeConfig) -> None:
me = await cfg.bot.get_me()
bot_id = me.get("id") if isinstance(me, dict) else None
if not isinstance(bot_id, int):
raise ConfigError("Failed to fetch bot id for topics validation.")
if cfg.topics.mode == "per_project_chat":
chat_ids = cfg.runtime.project_chat_ids()
if not chat_ids:
raise ConfigError(
"Topics enabled but no project chats are configured; "
"set projects.<alias>.chat_id for forum chats."
)
else:
chat_ids = (cfg.chat_id,)
raise ConfigError("failed to fetch bot id for topics validation.")
scope, chat_ids = _resolve_topics_scope(cfg)
if scope == "projects" and not chat_ids:
raise ConfigError(
"topics enabled but no project chats are configured; "
'set projects.<alias>.chat_id for forum chats or use scope="main".'
)
for chat_id in chat_ids:
chat = await cfg.bot.get_chat(chat_id)
if not isinstance(chat, dict):
raise ConfigError(
f"Failed to fetch chat info for topics validation ({chat_id})."
f"failed to fetch chat info for topics validation ({chat_id})."
)
chat_type = chat.get("type")
is_forum = chat.get("is_forum")
if chat_type != "supergroup":
raise ConfigError(
"Topics enabled but chat is not a supergroup; convert the group "
"and enable Topics."
"topics enabled but chat is not a supergroup "
f"(chat_id={chat_id}); convert the group and enable topics."
)
if is_forum is not True:
raise ConfigError(
"Topics enabled but chat does not have Topics enabled; "
"turn on Topics in group settings."
"topics enabled but chat does not have topics enabled "
f"(chat_id={chat_id}); turn on topics in group settings."
)
member = await cfg.bot.get_chat_member(chat_id, bot_id)
if not isinstance(member, dict):
raise ConfigError(
"Failed to fetch bot permissions; promote the bot to admin with "
"Manage Topics."
"failed to fetch bot permissions "
f"(chat_id={chat_id}); promote the bot to admin with manage topics."
)
status = member.get("status")
if status == "creator":
continue
if status != "administrator":
raise ConfigError(
"Topics enabled but bot is not an admin; promote it and grant "
"Manage Topics."
"topics enabled but bot is not an admin "
f"(chat_id={chat_id}); promote it and grant manage topics."
)
if member.get("can_manage_topics") is not True:
raise ConfigError(
"Topics enabled but bot lacks Manage Topics permission; "
"grant can_manage_topics."
"topics enabled but bot lacks manage topics permission "
f"(chat_id={chat_id}); grant can_manage_topics."
)
@@ -690,6 +730,7 @@ async def _transcribe_voice(
chat_id=msg.chat_id,
user_msg_id=msg.message_id,
text="voice transcription is disabled.",
thread_id=msg.thread_id,
)
return None
api_key = _resolve_openai_api_key(settings)
@@ -699,6 +740,7 @@ async def _transcribe_voice(
chat_id=msg.chat_id,
user_msg_id=msg.message_id,
text="voice transcription requires OPENAI_API_KEY.",
thread_id=msg.thread_id,
)
return None
if voice.file_size is not None and voice.file_size > _OPENAI_AUDIO_MAX_BYTES:
@@ -707,6 +749,7 @@ async def _transcribe_voice(
chat_id=msg.chat_id,
user_msg_id=msg.message_id,
text="voice message is too large to transcribe.",
thread_id=msg.thread_id,
)
return None
file_info = await cfg.bot.get_file(voice.file_id)
@@ -716,6 +759,7 @@ async def _transcribe_voice(
chat_id=msg.chat_id,
user_msg_id=msg.message_id,
text="failed to fetch voice file.",
thread_id=msg.thread_id,
)
return None
file_path = file_info.get("file_path")
@@ -725,6 +769,7 @@ async def _transcribe_voice(
chat_id=msg.chat_id,
user_msg_id=msg.message_id,
text="failed to fetch voice file.",
thread_id=msg.thread_id,
)
return None
audio_bytes = await cfg.bot.download_file(file_path)
@@ -734,6 +779,7 @@ async def _transcribe_voice(
chat_id=msg.chat_id,
user_msg_id=msg.message_id,
text="failed to download voice message.",
thread_id=msg.thread_id,
)
return None
if len(audio_bytes) > _OPENAI_AUDIO_MAX_BYTES:
@@ -742,6 +788,7 @@ async def _transcribe_voice(
chat_id=msg.chat_id,
user_msg_id=msg.message_id,
text="voice message is too large to transcribe.",
thread_id=msg.thread_id,
)
return None
filename = _normalize_voice_filename(file_path, voice.mime_type)
@@ -759,6 +806,7 @@ async def _transcribe_voice(
chat_id=msg.chat_id,
user_msg_id=msg.message_id,
text="voice transcription failed.",
thread_id=msg.thread_id,
)
return None
transcript = transcript.strip()
@@ -768,6 +816,7 @@ async def _transcribe_voice(
chat_id=msg.chat_id,
user_msg_id=msg.message_id,
text="voice transcription returned empty text.",
thread_id=msg.thread_id,
)
return None
return transcript
@@ -831,13 +880,10 @@ async def _handle_ctx_command(
chat_id=msg.chat_id,
user_msg_id=msg.message_id,
text=error,
thread_id=msg.thread_id,
)
return
chat_project = (
_topics_chat_project(cfg, msg.chat_id)
if cfg.topics.mode == "per_project_chat"
else None
)
chat_project = _topics_chat_project(cfg, msg.chat_id)
tkey = _topic_key(msg, cfg)
if tkey is None:
await _send_plain(
@@ -845,6 +891,7 @@ async def _handle_ctx_command(
chat_id=msg.chat_id,
user_msg_id=msg.message_id,
text="this command only works inside a topic.",
thread_id=msg.thread_id,
)
return
tokens = _split_command_args(args_text)
@@ -866,12 +913,14 @@ async def _handle_ctx_command(
resolved=resolved.context,
context_source=resolved.context_source,
snapshot=snapshot,
chat_project=chat_project,
)
await _send_plain(
cfg.exec_cfg.transport,
chat_id=msg.chat_id,
user_msg_id=msg.message_id,
text=text,
thread_id=msg.thread_id,
)
return
if action == "set":
@@ -888,7 +937,8 @@ async def _handle_ctx_command(
cfg.exec_cfg.transport,
chat_id=msg.chat_id,
user_msg_id=msg.message_id,
text=f"error:\n{error}\n{_usage_ctx_set(cfg)}",
text=f"error:\n{error}\n{_usage_ctx_set(chat_project=chat_project)}",
thread_id=msg.thread_id,
)
return
if context is None:
@@ -896,7 +946,8 @@ async def _handle_ctx_command(
cfg.exec_cfg.transport,
chat_id=msg.chat_id,
user_msg_id=msg.message_id,
text=f"error:\n{_usage_ctx_set(cfg)}",
text=f"error:\n{_usage_ctx_set(chat_project=chat_project)}",
thread_id=msg.thread_id,
)
return
await store.set_context(*tkey, context)
@@ -911,7 +962,8 @@ async def _handle_ctx_command(
cfg.exec_cfg.transport,
chat_id=msg.chat_id,
user_msg_id=msg.message_id,
text=f"topic bound to {_format_context(cfg.runtime, context)}",
text=f"topic bound to `{_format_context(cfg.runtime, context)}`",
thread_id=msg.thread_id,
)
return
if action == "clear":
@@ -921,6 +973,7 @@ async def _handle_ctx_command(
chat_id=msg.chat_id,
user_msg_id=msg.message_id,
text="topic binding cleared.",
thread_id=msg.thread_id,
)
return
await _send_plain(
@@ -928,6 +981,7 @@ async def _handle_ctx_command(
chat_id=msg.chat_id,
user_msg_id=msg.message_id,
text="unknown /ctx command. use /ctx, /ctx set, or /ctx clear.",
thread_id=msg.thread_id,
)
@@ -943,6 +997,7 @@ async def _handle_new_command(
chat_id=msg.chat_id,
user_msg_id=msg.message_id,
text=error,
thread_id=msg.thread_id,
)
return
tkey = _topic_key(msg, cfg)
@@ -952,6 +1007,7 @@ async def _handle_new_command(
chat_id=msg.chat_id,
user_msg_id=msg.message_id,
text="this command only works inside a topic.",
thread_id=msg.thread_id,
)
return
await store.clear_sessions(*tkey)
@@ -960,6 +1016,7 @@ async def _handle_new_command(
chat_id=msg.chat_id,
user_msg_id=msg.message_id,
text="cleared stored sessions for this topic.",
thread_id=msg.thread_id,
)
@@ -976,13 +1033,10 @@ async def _handle_topic_command(
chat_id=msg.chat_id,
user_msg_id=msg.message_id,
text=error,
thread_id=msg.thread_id,
)
return
chat_project = (
_topics_chat_project(cfg, msg.chat_id)
if cfg.topics.mode == "per_project_chat"
else None
)
chat_project = _topics_chat_project(cfg, msg.chat_id)
context, error = _parse_project_branch_args(
args_text,
runtime=cfg.runtime,
@@ -991,18 +1045,17 @@ async def _handle_topic_command(
chat_project=chat_project,
)
if error is not None or context is None:
usage = _usage_topic(cfg)
usage = _usage_topic(chat_project=chat_project)
text = f"error:\n{error}\n{usage}" if error else usage
await _send_plain(
cfg.exec_cfg.transport,
chat_id=msg.chat_id,
user_msg_id=msg.message_id,
text=text,
thread_id=msg.thread_id,
)
return
target_chat_id = (
msg.chat_id if cfg.topics.mode == "per_project_chat" else cfg.chat_id
)
target_chat_id = msg.chat_id
existing = await store.find_thread_for_context(target_chat_id, context)
if existing is not None:
await _send_plain(
@@ -1011,6 +1064,7 @@ async def _handle_topic_command(
user_msg_id=msg.message_id,
text=f"topic already exists for {_format_context(cfg.runtime, context)} "
"in this chat.",
thread_id=msg.thread_id,
)
return
title = _topic_title(cfg=cfg, runtime=cfg.runtime, context=context)
@@ -1022,6 +1076,7 @@ async def _handle_topic_command(
chat_id=msg.chat_id,
user_msg_id=msg.message_id,
text="failed to create topic.",
thread_id=msg.thread_id,
)
return
await store.set_context(
@@ -1036,11 +1091,12 @@ async def _handle_topic_command(
chat_id=msg.chat_id,
user_msg_id=msg.message_id,
text=f"created topic {title!r}.",
thread_id=msg.thread_id,
)
await cfg.exec_cfg.transport.send(
channel_id=target_chat_id,
message=RenderedMessage(
text=f"topic bound to {_format_context(cfg.runtime, context)}"
text=f"topic bound to `{_format_context(cfg.runtime, context)}`"
),
options=SendOptions(thread_id=thread_id),
)
@@ -1062,6 +1118,7 @@ async def _handle_cancel(
chat_id=chat_id,
user_msg_id=user_msg_id,
text="nothing is currently running for that message.",
thread_id=msg.thread_id,
)
return
await _send_plain(
@@ -1069,6 +1126,7 @@ async def _handle_cancel(
chat_id=chat_id,
user_msg_id=user_msg_id,
text="reply to the progress message to cancel.",
thread_id=msg.thread_id,
)
return
@@ -1080,6 +1138,7 @@ async def _handle_cancel(
chat_id=chat_id,
user_msg_id=user_msg_id,
text="nothing is currently running for that message.",
thread_id=msg.thread_id,
)
return
@@ -1158,6 +1217,7 @@ async def _send_with_resume(
user_msg_id=user_msg_id,
text="resume token not ready yet; try replying to the final message.",
notify=False,
thread_id=thread_id,
)
return
await enqueue(
@@ -1178,6 +1238,7 @@ async def _send_runner_unavailable(
resume_token: ResumeToken | None,
runner: Runner,
reason: str,
thread_id: int | None = None,
) -> None:
tracker = ProgressTracker(engine=runner.engine)
tracker.set_resume(resume_token)
@@ -1192,7 +1253,7 @@ async def _send_runner_unavailable(
await exec_cfg.transport.send(
channel_id=chat_id,
message=message,
options=SendOptions(reply_to=reply_to, notify=True),
options=SendOptions(reply_to=reply_to, notify=True, thread_id=thread_id),
)
@@ -1210,6 +1271,7 @@ async def _run_engine(
on_thread_known: Callable[[ResumeToken, anyio.Event], Awaitable[None]]
| None = None,
engine_override: EngineId | None = None,
thread_id: int | None = None,
) -> None:
try:
try:
@@ -1223,6 +1285,7 @@ async def _run_engine(
chat_id=chat_id,
user_msg_id=user_msg_id,
text=f"error:\n{exc}",
thread_id=thread_id,
)
return
if not entry.available:
@@ -1234,6 +1297,7 @@ async def _run_engine(
resume_token=resume_token,
runner=entry.runner,
reason=reason,
thread_id=thread_id,
)
return
try:
@@ -1244,6 +1308,7 @@ async def _run_engine(
chat_id=chat_id,
user_msg_id=user_msg_id,
text=f"error:\n{exc}",
thread_id=thread_id,
)
return
run_base_token = set_run_base_dir(cwd)
@@ -1266,6 +1331,7 @@ async def _run_engine(
message_id=user_msg_id,
text=text,
reply_to=reply_ref,
thread_id=thread_id,
)
await handle_message(
exec_cfg,
@@ -1342,6 +1408,7 @@ class _TelegramCommandExecutor(CommandExecutor):
scheduler: ThreadScheduler,
chat_id: int,
user_msg_id: int,
thread_id: int | None,
) -> None:
self._exec_cfg = exec_cfg
self._runtime = runtime
@@ -1349,6 +1416,7 @@ class _TelegramCommandExecutor(CommandExecutor):
self._scheduler = scheduler
self._chat_id = chat_id
self._user_msg_id = user_msg_id
self._thread_id = thread_id
self._reply_ref = MessageRef(channel_id=chat_id, message_id=user_msg_id)
def _apply_default_context(self, request: RunRequest) -> RunRequest:
@@ -1379,7 +1447,11 @@ class _TelegramCommandExecutor(CommandExecutor):
return await self._exec_cfg.transport.send(
channel_id=self._chat_id,
message=rendered,
options=SendOptions(reply_to=reply_ref, notify=notify),
options=SendOptions(
reply_to=reply_ref,
notify=notify,
thread_id=self._thread_id,
),
)
async def run_one(
@@ -1409,6 +1481,7 @@ class _TelegramCommandExecutor(CommandExecutor):
reply_ref=self._reply_ref,
on_thread_known=None,
engine_override=engine,
thread_id=self._thread_id,
)
return RunResult(engine=engine, message=capture.last_message)
await _run_engine(
@@ -1423,6 +1496,7 @@ class _TelegramCommandExecutor(CommandExecutor):
reply_ref=self._reply_ref,
on_thread_known=self._scheduler.note_thread_known,
engine_override=engine,
thread_id=self._thread_id,
)
return RunResult(engine=engine, message=None)
@@ -1472,6 +1546,7 @@ async def _dispatch_command(
scheduler=scheduler,
chat_id=chat_id,
user_msg_id=user_msg_id,
thread_id=msg.thread_id,
)
message_ref = MessageRef(channel_id=chat_id, message_id=user_msg_id)
try:
@@ -1539,13 +1614,15 @@ async def run_main_loop(
config_path = cfg.runtime.config_path
if config_path is None:
raise ConfigError(
"Topics enabled but config path is not set; cannot locate state file."
"topics enabled but config path is not set; cannot locate state file."
)
topic_store = TopicStateStore(resolve_state_path(config_path))
await _validate_topics_setup(cfg)
resolved_scope, _ = _resolve_topics_scope(cfg)
logger.info(
"topics.enabled",
mode=cfg.topics.mode,
scope=cfg.topics.scope,
resolved_scope=resolved_scope,
state_path=str(resolve_state_path(config_path)),
)
await _set_command_menu(cfg)
@@ -1640,6 +1717,7 @@ async def run_main_loop(
reply_ref=reply_ref,
on_thread_known=wrap_on_thread_known(on_thread_known, topic_key),
engine_override=engine_override,
thread_id=thread_id,
)
async def run_thread_job(job: ThreadJob) -> None:
@@ -1681,9 +1759,7 @@ async def run_main_loop(
)
topic_key = _topic_key(msg, cfg) if topic_store is not None else None
chat_project = (
_topics_chat_project(cfg, chat_id)
if cfg.topics.enabled and cfg.topics.mode == "per_project_chat"
else None
_topics_chat_project(cfg, chat_id) if cfg.topics.enabled else None
)
bound_context = (
await topic_store.get_context(*topic_key)
@@ -1748,6 +1824,7 @@ async def run_main_loop(
chat_id=chat_id,
user_msg_id=user_msg_id,
text=f"error:\n{exc}",
thread_id=msg.thread_id,
)
continue
@@ -1782,8 +1859,10 @@ async def run_main_loop(
user_msg_id=user_msg_id,
text=(
"this topic isn't bound to a project yet.\n"
f"{_usage_ctx_set(cfg)} or {_usage_topic(cfg)}"
f"{_usage_ctx_set(chat_project=chat_project)} or "
f"{_usage_topic(chat_project=chat_project)}"
),
thread_id=msg.thread_id,
)
continue
if resume_token is None and reply_id is not None:
+1 -1
View File
@@ -248,7 +248,7 @@ class TopicStateStore:
self._data = {"version": STATE_VERSION, "threads": {}}
return
try:
payload = json.loads(self._path.read_text())
payload = json.loads(self._path.read_text(encoding="utf-8"))
except Exception as exc:
logger.warning(
"telegram.topic_state.load_failed",
+49 -4
View File
@@ -821,13 +821,13 @@ def test_topic_title_matches_command_syntax() -> None:
assert title == "@main"
def test_topic_title_per_project_chat_includes_project() -> None:
def test_topic_title_projects_scope_includes_project() -> None:
transport = _FakeTransport()
cfg = replace(
_make_cfg(transport),
topics=bridge.TelegramTopicsConfig(
enabled=True,
mode="per_project_chat",
scope="projects",
),
)
@@ -1056,7 +1056,7 @@ async def test_run_main_loop_routes_reply_to_running_resume() -> None:
@pytest.mark.anyio
async def test_run_main_loop_persists_topic_sessions_in_per_project_chat(
async def test_run_main_loop_persists_topic_sessions_in_project_scope(
tmp_path: Path,
) -> None:
project_chat_id = -100
@@ -1099,7 +1099,7 @@ async def test_run_main_loop_persists_topic_sessions_in_per_project_chat(
exec_cfg=exec_cfg,
topics=bridge.TelegramTopicsConfig(
enabled=True,
mode="per_project_chat",
scope="projects",
),
)
@@ -1124,6 +1124,51 @@ async def test_run_main_loop_persists_topic_sessions_in_per_project_chat(
assert stored == ResumeToken(engine=CODEX_ENGINE, value=resume_value)
@pytest.mark.anyio
async def test_run_main_loop_replies_in_same_thread() -> None:
transport = _FakeTransport()
bot = _FakeBot()
runner = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE)
exec_cfg = ExecBridgeConfig(
transport=transport,
presenter=MarkdownPresenter(),
final_notify=True,
)
runtime = TransportRuntime(
router=_make_router(runner),
projects=empty_projects_config(),
)
cfg = TelegramBridgeConfig(
bot=bot,
runtime=runtime,
chat_id=123,
startup_msg="",
exec_cfg=exec_cfg,
)
async def poller(_cfg: TelegramBridgeConfig):
yield TelegramIncomingMessage(
transport="telegram",
chat_id=123,
message_id=1,
text="hello",
reply_to_message_id=None,
reply_to_text=None,
sender_id=123,
thread_id=77,
)
await run_main_loop(cfg, poller)
reply_calls = [
call
for call in transport.send_calls
if call["options"] is not None and call["options"].reply_to is not None
]
assert reply_calls
assert all(call["options"].thread_id == 77 for call in reply_calls)
@pytest.mark.anyio
async def test_run_main_loop_handles_command_plugins(monkeypatch) -> None:
class _Command: