From 85ecbc2b9607583dceb5031c13b49a89af5beef0 Mon Sep 17 00:00:00 2001 From: banteg <4562643+banteg@users.noreply.github.com> Date: Mon, 12 Jan 2026 23:12:24 +0400 Subject: [PATCH] fix(telegram): track sessions for plugin runs (#107) --- src/takopi/telegram/commands.py | 13 +++- src/takopi/telegram/loop.py | 5 ++ tests/test_telegram_bridge.py | 113 ++++++++++++++++++++++++++++++++ 3 files changed, 129 insertions(+), 2 deletions(-) diff --git a/src/takopi/telegram/commands.py b/src/takopi/telegram/commands.py index 602cbd3..4d5c271 100644 --- a/src/takopi/telegram/commands.py +++ b/src/takopi/telegram/commands.py @@ -1433,6 +1433,7 @@ class _TelegramCommandExecutor(CommandExecutor): runtime: TransportRuntime, running_tasks: RunningTasks, scheduler: ThreadScheduler, + on_thread_known: Callable[[ResumeToken, anyio.Event], Awaitable[None]] | None, chat_id: int, user_msg_id: int, thread_id: int | None, @@ -1443,6 +1444,7 @@ class _TelegramCommandExecutor(CommandExecutor): self._runtime = runtime self._running_tasks = running_tasks self._scheduler = scheduler + self._on_thread_known = on_thread_known self._chat_id = chat_id self._user_msg_id = user_msg_id self._thread_id = thread_id @@ -1502,6 +1504,11 @@ class _TelegramCommandExecutor(CommandExecutor): engine_override=request.engine, context=request.context, ) + on_thread_known = ( + self._scheduler.note_thread_known + if self._on_thread_known is None + else self._on_thread_known + ) if mode == "capture": capture = _CaptureTransport() exec_cfg = ExecBridgeConfig( @@ -1519,7 +1526,7 @@ class _TelegramCommandExecutor(CommandExecutor): resume_token=None, context=request.context, reply_ref=self._reply_ref, - on_thread_known=None, + on_thread_known=on_thread_known, engine_override=engine, thread_id=self._thread_id, show_resume_line=effective_show_resume_line, @@ -1535,7 +1542,7 @@ class _TelegramCommandExecutor(CommandExecutor): resume_token=None, context=request.context, reply_ref=self._reply_ref, - on_thread_known=self._scheduler.note_thread_known, + on_thread_known=on_thread_known, engine_override=engine, thread_id=self._thread_id, show_resume_line=effective_show_resume_line, @@ -1572,6 +1579,7 @@ async def _dispatch_command( args_text: str, running_tasks: RunningTasks, scheduler: ThreadScheduler, + on_thread_known: Callable[[ResumeToken, anyio.Event], Awaitable[None]] | None, stateful_mode: bool, ) -> None: allowlist = cfg.runtime.allowlist @@ -1591,6 +1599,7 @@ async def _dispatch_command( runtime=cfg.runtime, running_tasks=running_tasks, scheduler=scheduler, + on_thread_known=on_thread_known, chat_id=chat_id, user_msg_id=user_msg_id, thread_id=msg.thread_id, diff --git a/src/takopi/telegram/loop.py b/src/takopi/telegram/loop.py index 2ed780f..cc3b817 100644 --- a/src/takopi/telegram/loop.py +++ b/src/takopi/telegram/loop.py @@ -858,6 +858,11 @@ async def run_main_loop( args_text, running_tasks, scheduler, + wrap_on_thread_known( + scheduler.note_thread_known, + topic_key, + chat_session_key, + ), stateful_mode, ) continue diff --git a/tests/test_telegram_bridge.py b/tests/test_telegram_bridge.py index 0e96758..e2468a1 100644 --- a/tests/test_telegram_bridge.py +++ b/tests/test_telegram_bridge.py @@ -1761,6 +1761,119 @@ async def test_run_main_loop_prompt_upload_auto_resumes_chat_sessions( ) +@pytest.mark.anyio +async def test_run_main_loop_command_updates_chat_session_resume( + tmp_path: Path, + monkeypatch, +) -> None: + class _Command: + id = "run_cmd" + description = "run command" + + async def handle(self, ctx): + await ctx.executor.run_one(commands.RunRequest(prompt="hello")) + return commands.CommandResult(text="done") + + entrypoints = [ + FakeEntryPoint( + "run_cmd", + "takopi.commands.run_cmd:BACKEND", + plugins.COMMAND_GROUP, + loader=_Command, + ) + ] + install_entrypoints(monkeypatch, entrypoints) + + resume_value = "resume-123" + state_path = tmp_path / "takopi.toml" + + transport = _FakeTransport() + bot = _FakeBot() + runner = ScriptRunner( + [Return(answer="ok")], + engine=CODEX_ENGINE, + resume_value=resume_value, + ) + exec_cfg = ExecBridgeConfig( + transport=transport, + presenter=MarkdownPresenter(), + final_notify=True, + ) + runtime = TransportRuntime( + router=_make_router(runner), + projects=_empty_projects(), + config_path=state_path, + ) + cfg = TelegramBridgeConfig( + bot=bot, + runtime=runtime, + chat_id=123, + startup_msg="", + exec_cfg=exec_cfg, + session_mode="chat", + show_resume_line=False, + ) + + async def poller(_cfg: TelegramBridgeConfig): + yield TelegramIncomingMessage( + transport="telegram", + chat_id=123, + message_id=1, + text="/run_cmd", + reply_to_message_id=None, + reply_to_text=None, + sender_id=123, + chat_type="private", + ) + + await run_main_loop(cfg, poller) + + store = ChatSessionStore(resolve_sessions_path(state_path)) + stored = await store.get_session_resume(123, None, CODEX_ENGINE) + assert stored == ResumeToken(engine=CODEX_ENGINE, value=resume_value) + + transport2 = _FakeTransport() + runner2 = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE) + exec_cfg2 = ExecBridgeConfig( + transport=transport2, + presenter=MarkdownPresenter(), + final_notify=True, + ) + runtime2 = TransportRuntime( + router=_make_router(runner2), + projects=_empty_projects(), + config_path=state_path, + ) + cfg2 = TelegramBridgeConfig( + bot=bot, + runtime=runtime2, + chat_id=123, + startup_msg="", + exec_cfg=exec_cfg2, + session_mode="chat", + show_resume_line=False, + ) + + async def poller2(_cfg: TelegramBridgeConfig): + yield TelegramIncomingMessage( + transport="telegram", + chat_id=123, + message_id=2, + text="followup", + reply_to_message_id=None, + reply_to_text=None, + sender_id=123, + chat_type="private", + ) + + await run_main_loop(cfg2, poller2) + + assert runner2.calls[0][1] == ResumeToken( + engine=CODEX_ENGINE, + value=resume_value, + ) + + @pytest.mark.anyio async def test_run_main_loop_hides_resume_line_when_disabled( tmp_path: Path,