From 637a9fc3e2bc20d28d9381aa0c917d68bde47e36 Mon Sep 17 00:00:00 2001 From: banteg <4562643+banteg@users.noreply.github.com> Date: Mon, 12 Jan 2026 19:05:39 +0400 Subject: [PATCH] feat(telegram): add chat session mode (#102) --- changelog.md | 2 +- docs/architecture.md | 2 +- docs/projects.md | 4 +- docs/transports/telegram.md | 19 ++ docs/user-guide.md | 36 +++- src/takopi/directives.py | 2 +- src/takopi/scheduler.py | 3 + src/takopi/settings.py | 1 + src/takopi/telegram/backend.py | 1 + src/takopi/telegram/bridge.py | 16 +- src/takopi/telegram/chat_sessions.py | 142 +++++++++++++++ src/takopi/telegram/commands.py | 26 +++ src/takopi/telegram/loop.py | 102 ++++++++++- tests/test_exec_bridge.py | 4 +- tests/test_exec_render.py | 4 +- tests/test_telegram_bridge.py | 248 ++++++++++++++++++++++++++- tests/test_telegram_chat_sessions.py | 38 ++++ tests/test_transport_runtime.py | 2 +- 18 files changed, 622 insertions(+), 30 deletions(-) create mode 100644 src/takopi/telegram/chat_sessions.py create mode 100644 tests/test_telegram_chat_sessions.py diff --git a/changelog.md b/changelog.md index 2c193ac..959a559 100644 --- a/changelog.md +++ b/changelog.md @@ -115,7 +115,7 @@ - register repos with `takopi init ` and target them via `/project` directives - route runs to git worktrees with `@branch` — takopi resolves or creates worktrees automatically -- replies preserve context via `ctx: project @ branch` footers, no need to repeat directives +- replies preserve context via `ctx: project @branch` footers, no need to repeat directives - set `default_project` to skip the `/project` prefix entirely - per-project `default_engine` and `worktree_base` configuration diff --git a/docs/architecture.md b/docs/architecture.md index a845f75..60d3b7b 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -238,7 +238,7 @@ sequenceDiagram Runner->>CLI: claude "fix the bug" CLI-->>Runner: StartedEvent(resume=abc123) Runner-->>Bridge: Stream events - Bridge->>User: Final message with:
claude --resume abc123
ctx: project @ branch + Bridge->>User: Final message with:
claude --resume abc123
ctx: project @branch Note over User,CLI: Resume Conversation User->>Bridge: Reply: "now add tests" diff --git a/docs/projects.md b/docs/projects.md index 5a8d521..7ace1c3 100644 --- a/docs/projects.md +++ b/docs/projects.md @@ -92,7 +92,7 @@ Rules: When a run has project context, Takopi appends a footer line rendered as inline code (backticked): -- With branch: `` `ctx: @ ` `` +- With branch: `` `ctx: @` `` - Without branch: `` `ctx: ` `` The `ctx:` line is parsed from replies and takes precedence over new directives. @@ -153,5 +153,5 @@ Start a new thread in a worktree: Reply to a progress message to continue in the same context: ``` -ctx: z80 @ feat/streaming +ctx: z80 @feat/streaming ``` diff --git a/docs/transports/telegram.md b/docs/transports/telegram.md index 3af66b5..cc9b02c 100644 --- a/docs/transports/telegram.md +++ b/docs/transports/telegram.md @@ -40,6 +40,25 @@ example, `http://localhost:8000/v1`) and a dummy `OPENAI_API_KEY` if your server ignores it. If your server requires a specific model name, set `voice_transcription_model` (for example, `whisper-1`). +## Chat sessions (optional) + +Takopi is stateless by default unless you reply to a bot message containing a resume +line. If you want auto-resume without replies, enable chat sessions. + +Configuration (under `[transports.telegram]`): + +```toml +session_mode = "chat" # or "stateless" +``` + +Behavior: + +- Stores one resume token per chat (per sender in group chats). +- Auto-resumes when no explicit resume token is present. +- Reset with `/new`. + +State is stored in `telegram_chat_sessions_state.json` alongside the config file. + ## Message overflow By default, takopi trims long final responses to ~3500 characters to stay under diff --git a/docs/user-guide.md b/docs/user-guide.md index 51733f2..0678126 100644 --- a/docs/user-guide.md +++ b/docs/user-guide.md @@ -20,13 +20,30 @@ A few terms you'll see throughout: | Term | Meaning | |------|---------| | **Engine** | A coding agent backend (Codex, Claude, opencode, pi) | -| **Project** | A registered git repository with an alias | +| **Project** | A repo/workdir alias used for routing (can set default engine, worktrees, chat ID) | | **Worktree** | A git feature that lets you check out multiple branches simultaneously in separate directories | -| **Topic** | A Telegram forum thread bound to a specific project/branch context | +| **Topic** | A Telegram forum thread bound to a project/branch; stores per-topic resume tokens | | **Resume token** | State that allows an engine to continue from where it left off | --- +## How conversations work + +Takopi is **stateless by default**. Each message starts a new engine session unless a resume token is present. + +To continue a session: + +- **Reply** to a bot message. Takopi reads the resume token from the footer and resumes that session. +- **Forum topics** (optional) store resume tokens per topic and auto-resume for new messages in that topic. + Reset with `/new`. +- **Chat sessions** (optional) store one resume token per chat (per sender in groups) so new messages + auto-resume without replying. Enable with `session_mode = "chat"` and reset with `/new`. + State is stored in `telegram_chat_sessions_state.json`. + +Reply-to-continue always works, even if chat sessions or topics are enabled. + +--- + ## 1. Installation and setup Install takopi with: @@ -85,7 +102,7 @@ Takopi streams progress in the chat and sends a final response when the agent fi ### Basic controls -- **Reply** to a bot message with more instructions to continue the conversation +- **Reply** to a bot message to continue the session (takopi reads the resume token in the footer) - **Cancel** a run by clicking the cancel button or replying to the progress message with `/cancel` --- @@ -336,10 +353,10 @@ voice_transcription_model = "gpt-4o-mini-transcribe" # optional Set `OPENAI_API_KEY` in your environment (uses OpenAI's transcription API with the `gpt-4o-mini-transcribe` model by default). To use a local OpenAI-compatible -Whisper server, also set `OPENAI_BASE_URL` (for example, -`http://localhost:8000/v1`) and a dummy `OPENAI_API_KEY` if your server ignores it. -If your server requires a specific model name, set `voice_transcription_model` -accordingly (for example, `whisper-1`). +Whisper server, also set `OPENAI_BASE_URL` (for example, `http://localhost:8000/v1`) +and a dummy `OPENAI_API_KEY` if your server ignores it. If your server requires a +specific model name, set `voice_transcription_model` accordingly (for example, +`whisper-1`). When you send a voice note, takopi transcribes it and runs the result as a normal text message. If transcription fails, you'll get an error message and the run is skipped. @@ -424,7 +441,8 @@ watch_config = true # hot-reload on config changes (except transport) bot_token = "123456789:ABCdefGHIjklMNOpqrsTUVwxyz" chat_id = 123456789 voice_transcription = true -# voice_transcription_model = "gpt-4o-mini-transcribe" +voice_transcription_model = "gpt-4o-mini-transcribe" +session_mode = "stateless" # or "chat" for auto-resume per chat [transports.telegram.files] enabled = true @@ -477,7 +495,7 @@ worktree_base = "develop" | `/ctx` | Show current context | | `/ctx set @branch` | Update context binding | | `/ctx clear` | Remove context binding | -| `/new` | Clear resume tokens | +| `/new` | Clear stored resume tokens (topic or chat session) | ### CLI commands diff --git a/src/takopi/directives.py b/src/takopi/directives.py index 15d50e3..91bf2d2 100644 --- a/src/takopi/directives.py +++ b/src/takopi/directives.py @@ -142,5 +142,5 @@ def format_context_line( project_cfg = projects.projects.get(context.project) alias = project_cfg.alias if project_cfg is not None else context.project if context.branch: - return f"`ctx: {alias} @ {context.branch}`" + return f"`ctx: {alias} @{context.branch}`" return f"`ctx: {alias}`" diff --git a/src/takopi/scheduler.py b/src/takopi/scheduler.py index 9a8ea2b..bf45c74 100644 --- a/src/takopi/scheduler.py +++ b/src/takopi/scheduler.py @@ -19,6 +19,7 @@ class ThreadJob: resume_token: ResumeToken context: RunContext | None = None thread_id: ThreadId | None = None + session_key: tuple[int, int | None] | None = None RunJob = Callable[[ThreadJob], Awaitable[None]] @@ -72,6 +73,7 @@ class ThreadScheduler: resume_token: ResumeToken, context: RunContext | None = None, thread_id: ThreadId | None = None, + session_key: tuple[int, int | None] | None = None, ) -> None: await self.enqueue( ThreadJob( @@ -81,6 +83,7 @@ class ThreadScheduler: resume_token=resume_token, context=context, thread_id=thread_id, + session_key=session_key, ) ) diff --git a/src/takopi/settings.py b/src/takopi/settings.py index 75953de..d77a6ad 100644 --- a/src/takopi/settings.py +++ b/src/takopi/settings.py @@ -102,6 +102,7 @@ class TelegramTransportSettings(BaseModel): voice_transcription: bool = False voice_max_bytes: StrictInt = 10 * 1024 * 1024 voice_transcription_model: NonEmptyStr = "gpt-4o-mini-transcribe" + session_mode: Literal["stateless", "chat"] = "stateless" topics: TelegramTopicsSettings = Field(default_factory=TelegramTopicsSettings) files: TelegramFilesSettings = Field(default_factory=TelegramFilesSettings) diff --git a/src/takopi/telegram/backend.py b/src/takopi/telegram/backend.py index 730af1e..e56fb91 100644 --- a/src/takopi/telegram/backend.py +++ b/src/takopi/telegram/backend.py @@ -113,6 +113,7 @@ class TelegramBackend(TransportBackend): chat_id=chat_id, startup_msg=startup_msg, exec_cfg=exec_cfg, + session_mode=settings.session_mode, voice_transcription=settings.voice_transcription, voice_max_bytes=int(settings.voice_max_bytes), voice_transcription_model=settings.voice_transcription_model, diff --git a/src/takopi/telegram/bridge.py b/src/takopi/telegram/bridge.py index 553b580..738357d 100644 --- a/src/takopi/telegram/bridge.py +++ b/src/takopi/telegram/bridge.py @@ -2,7 +2,7 @@ from __future__ import annotations from collections.abc import Awaitable, Callable from dataclasses import dataclass, field -from typing import cast +from typing import Literal, cast from ..logging import get_logger from ..markdown import MarkdownFormatter, MarkdownParts @@ -118,6 +118,7 @@ class TelegramBridgeConfig: chat_id: int startup_msg: str exec_cfg: ExecBridgeConfig + session_mode: Literal["stateless", "chat"] = "stateless" voice_transcription: bool = False voice_max_bytes: int = 10 * 1024 * 1024 voice_transcription_model: str = "gpt-4o-mini-transcribe" @@ -343,12 +344,22 @@ async def handle_callback_cancel( async def send_with_resume( cfg: TelegramBridgeConfig, enqueue: Callable[ - [int, int, str, ResumeToken, RunContext | None, int | None], Awaitable[None] + [ + int, + int, + str, + ResumeToken, + RunContext | None, + int | None, + tuple[int, int | None] | None, + ], + Awaitable[None], ], running_task: RunningTask, chat_id: int, user_msg_id: int, thread_id: int | None, + session_key: tuple[int, int | None] | None, text: str, ) -> None: from .loop import send_with_resume as _send_with_resume @@ -360,6 +371,7 @@ async def send_with_resume( chat_id, user_msg_id, thread_id, + session_key, text, ) diff --git a/src/takopi/telegram/chat_sessions.py b/src/takopi/telegram/chat_sessions.py new file mode 100644 index 0000000..1e61987 --- /dev/null +++ b/src/takopi/telegram/chat_sessions.py @@ -0,0 +1,142 @@ +from __future__ import annotations + +import json +import os +from pathlib import Path + +import anyio +import msgspec + +from ..logging import get_logger +from ..model import ResumeToken + +logger = get_logger(__name__) + +STATE_VERSION = 1 +STATE_FILENAME = "telegram_chat_sessions_state.json" + + +class _SessionState(msgspec.Struct, forbid_unknown_fields=False): + resume: str + + +class _ChatState(msgspec.Struct, forbid_unknown_fields=False): + sessions: dict[str, _SessionState] = msgspec.field(default_factory=dict) + + +class _ChatSessionsState(msgspec.Struct, forbid_unknown_fields=False): + version: int + chats: dict[str, _ChatState] = msgspec.field(default_factory=dict) + + +def resolve_sessions_path(config_path: Path) -> Path: + return config_path.with_name(STATE_FILENAME) + + +def _chat_key(chat_id: int, owner_id: int | None) -> str: + owner = "chat" if owner_id is None else str(owner_id) + return f"{chat_id}:{owner}" + + +class ChatSessionStore: + def __init__(self, path: Path) -> None: + self._path = path + self._lock = anyio.Lock() + self._loaded = False + self._mtime_ns: int | None = None + self._state = _ChatSessionsState(version=STATE_VERSION, chats={}) + + async def get_session_resume( + self, chat_id: int, owner_id: int | None, engine: str + ) -> ResumeToken | None: + async with self._lock: + self._reload_locked_if_needed() + chat = self._get_chat_locked(chat_id, owner_id) + if chat is None: + return None + entry = chat.sessions.get(engine) + if entry is None or not entry.resume: + return None + return ResumeToken(engine=engine, value=entry.resume) + + async def set_session_resume( + self, chat_id: int, owner_id: int | None, token: ResumeToken + ) -> None: + async with self._lock: + self._reload_locked_if_needed() + chat = self._ensure_chat_locked(chat_id, owner_id) + chat.sessions[token.engine] = _SessionState(resume=token.value) + self._save_locked() + + async def clear_sessions(self, chat_id: int, owner_id: int | None) -> None: + async with self._lock: + self._reload_locked_if_needed() + chat = self._get_chat_locked(chat_id, owner_id) + if chat is None: + return + chat.sessions = {} + self._save_locked() + + def _stat_mtime_ns(self) -> int | None: + try: + return self._path.stat().st_mtime_ns + except FileNotFoundError: + return None + + def _reload_locked_if_needed(self) -> None: + current = self._stat_mtime_ns() + if self._loaded and current == self._mtime_ns: + return + self._load_locked() + + def _load_locked(self) -> None: + self._loaded = True + self._mtime_ns = self._stat_mtime_ns() + if self._mtime_ns is None: + self._state = _ChatSessionsState(version=STATE_VERSION, chats={}) + return + try: + payload = msgspec.json.decode( + self._path.read_bytes(), type=_ChatSessionsState + ) + except Exception as exc: + logger.warning( + "telegram.chat_sessions.load_failed", + path=str(self._path), + error=str(exc), + error_type=exc.__class__.__name__, + ) + self._state = _ChatSessionsState(version=STATE_VERSION, chats={}) + return + if payload.version != STATE_VERSION: + logger.warning( + "telegram.chat_sessions.version_mismatch", + path=str(self._path), + version=payload.version, + expected=STATE_VERSION, + ) + self._state = _ChatSessionsState(version=STATE_VERSION, chats={}) + return + self._state = payload + + def _save_locked(self) -> None: + self._path.parent.mkdir(parents=True, exist_ok=True) + payload = msgspec.to_builtins(self._state) + tmp_path = self._path.with_suffix(f"{self._path.suffix}.tmp") + with open(tmp_path, "w", encoding="utf-8") as handle: + json.dump(payload, handle, indent=2, sort_keys=True) + handle.write("\n") + os.replace(tmp_path, self._path) + self._mtime_ns = self._stat_mtime_ns() + + def _get_chat_locked(self, chat_id: int, owner_id: int | None) -> _ChatState | None: + return self._state.chats.get(_chat_key(chat_id, owner_id)) + + def _ensure_chat_locked(self, chat_id: int, owner_id: int | None) -> _ChatState: + key = _chat_key(chat_id, owner_id) + entry = self._state.chats.get(key) + if entry is not None: + return entry + entry = _ChatState() + self._state.chats[key] = entry + return entry diff --git a/src/takopi/telegram/commands.py b/src/takopi/telegram/commands.py index de16f4e..634f900 100644 --- a/src/takopi/telegram/commands.py +++ b/src/takopi/telegram/commands.py @@ -37,6 +37,7 @@ from ..transport import MessageRef, RenderedMessage, SendOptions from ..transport_runtime import ResolvedMessage, TransportRuntime from ..utils.paths import reset_run_base_dir, set_run_base_dir from .bridge import send_plain +from .chat_sessions import ChatSessionStore from .context import ( _format_context, _format_ctx_status, @@ -77,6 +78,7 @@ __all__ = [ "FILE_GET_USAGE", "FILE_PUT_USAGE", "_dispatch_command", + "_handle_chat_new_command", "_handle_file_command", "_handle_file_get", "_handle_file_put", @@ -1044,6 +1046,30 @@ async def _handle_new_command( await reply(text="cleared stored sessions for this topic.") +async def _handle_chat_new_command( + cfg, + msg: TelegramIncomingMessage, + store: ChatSessionStore, + session_key: tuple[int, int | None] | None, +) -> None: + reply = partial( + send_plain, + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + thread_id=msg.thread_id, + ) + if session_key is None: + await reply(text="no stored sessions to clear for this chat.") + return + await store.clear_sessions(session_key[0], session_key[1]) + if msg.chat_type == "private": + text = "cleared stored sessions for this chat." + else: + text = "cleared stored sessions for you in this chat." + await reply(text=text) + + async def _handle_topic_command( cfg, msg: TelegramIncomingMessage, diff --git a/src/takopi/telegram/loop.py b/src/takopi/telegram/loop.py index 1ad3a3e..df700c9 100644 --- a/src/takopi/telegram/loop.py +++ b/src/takopi/telegram/loop.py @@ -22,6 +22,7 @@ from .bridge import CANCEL_CALLBACK_DATA, TelegramBridgeConfig, send_plain from .commands import ( FILE_PUT_USAGE, _dispatch_command, + _handle_chat_new_command, _handle_ctx_command, _handle_file_command, _handle_file_put_default, @@ -47,6 +48,7 @@ from .topics import ( _validate_topics_setup, ) from .client import poll_incoming +from .chat_sessions import ChatSessionStore, resolve_sessions_path from .topic_state import TopicStateStore, resolve_state_path from .types import ( TelegramCallbackQuery, @@ -62,6 +64,18 @@ __all__ = ["poll_updates", "run_main_loop", "send_with_resume"] _MEDIA_GROUP_DEBOUNCE_S = 1.0 +def _chat_session_key( + msg: TelegramIncomingMessage, *, store: ChatSessionStore | None +) -> tuple[int, int | None] | None: + if store is None or msg.thread_id is not None: + return None + if msg.chat_type == "private": + return (msg.chat_id, None) + if msg.sender_id is None: + return None + return (msg.chat_id, msg.sender_id) + + def _allowed_chat_ids(cfg: TelegramBridgeConfig) -> set[int]: allowed = set(cfg.chat_ids or ()) allowed.add(cfg.chat_id) @@ -228,12 +242,22 @@ async def _wait_for_resume(running_task) -> ResumeToken | None: async def send_with_resume( cfg: TelegramBridgeConfig, enqueue: Callable[ - [int, int, str, ResumeToken, RunContext | None, int | None], Awaitable[None] + [ + int, + int, + str, + ResumeToken, + RunContext | None, + int | None, + tuple[int, int | None] | None, + ], + Awaitable[None], ], running_task, chat_id: int, user_msg_id: int, thread_id: int | None, + session_key: tuple[int, int | None] | None, text: str, ) -> None: reply = partial( @@ -257,6 +281,7 @@ async def send_with_resume( resume, running_task.context, thread_id, + session_key, ) @@ -283,6 +308,7 @@ async def run_main_loop( transport_config.model_dump() if transport_config is not None else None ) topic_store: TopicStateStore | None = None + chat_session_store: ChatSessionStore | None = None media_groups: dict[tuple[int, str], _MediaGroupState] = {} resolved_topics_scope: str | None = None topics_chat_ids: frozenset[int] = frozenset() @@ -304,8 +330,18 @@ async def run_main_loop( reserved_commands = _reserved_commands(cfg.runtime) try: + config_path = cfg.runtime.config_path + if cfg.session_mode == "chat": + if config_path is None: + raise ConfigError( + "session_mode=chat but config path is not set; cannot locate state file." + ) + chat_session_store = ChatSessionStore(resolve_sessions_path(config_path)) + logger.info( + "chat_sessions.enabled", + state_path=str(resolve_sessions_path(config_path)), + ) if cfg.topics.enabled: - config_path = cfg.runtime.config_path if config_path is None: raise ConfigError( "topics enabled but config path is not set; cannot locate state file." @@ -367,8 +403,9 @@ async def run_main_loop( def wrap_on_thread_known( base_cb: Callable[[ResumeToken, anyio.Event], Awaitable[None]] | None, topic_key: tuple[int, int] | None, + chat_session_key: tuple[int, int | None] | None, ) -> Callable[[ResumeToken, anyio.Event], Awaitable[None]] | None: - if base_cb is None and topic_key is None: + if base_cb is None and topic_key is None and chat_session_key is None: return None async def _wrapped(token: ResumeToken, done: anyio.Event) -> None: @@ -378,6 +415,10 @@ async def run_main_loop( await topic_store.set_session_resume( topic_key[0], topic_key[1], token ) + if chat_session_store is not None and chat_session_key is not None: + await chat_session_store.set_session_resume( + chat_session_key[0], chat_session_key[1], token + ) return _wrapped @@ -388,6 +429,7 @@ async def run_main_loop( resume_token: ResumeToken | None, context: RunContext | None, thread_id: int | None = None, + chat_session_key: tuple[int, int | None] | None = None, reply_ref: MessageRef | None = None, on_thread_known: Callable[[ResumeToken, anyio.Event], Awaitable[None]] | None = None, @@ -412,7 +454,9 @@ async def run_main_loop( resume_token=resume_token, context=context, reply_ref=reply_ref, - on_thread_known=wrap_on_thread_known(on_thread_known, topic_key), + on_thread_known=wrap_on_thread_known( + on_thread_known, topic_key, chat_session_key + ), engine_override=engine_override, thread_id=thread_id, ) @@ -425,6 +469,7 @@ async def run_main_loop( job.resume_token, job.context, cast(int | None, job.thread_id), + job.session_key, None, scheduler.note_thread_known, ) @@ -451,6 +496,7 @@ async def run_main_loop( None, context, msg.thread_id, + None, reply_ref, scheduler.note_thread_known, ) @@ -539,6 +585,7 @@ async def run_main_loop( if topic_store is not None else None ) + chat_session_key = _chat_session_key(msg, store=chat_session_store) chat_project = ( _topics_chat_project(cfg, chat_id) if cfg.topics.enabled else None ) @@ -571,6 +618,36 @@ async def run_main_loop( continue command_id, args_text = _parse_slash_command(text) + if command_id == "new": + if topic_store is not None and topic_key is not None: + tg.start_soon( + _handle_new_command, + cfg, + msg, + topic_store, + resolved_topics_scope, + topics_chat_ids, + ) + continue + if chat_session_store is not None: + tg.start_soon( + _handle_chat_new_command, + cfg, + msg, + chat_session_store, + chat_session_key, + ) + continue + if topic_store is not None: + tg.start_soon( + _handle_new_command, + cfg, + msg, + topic_store, + resolved_topics_scope, + topics_chat_ids, + ) + continue if command_id is not None and _dispatch_builtin_command( cfg=cfg, msg=msg, @@ -684,6 +761,7 @@ async def run_main_loop( chat_id, user_msg_id, msg.thread_id, + chat_session_key, text, ) continue @@ -701,6 +779,20 @@ async def run_main_loop( ) if stored is not None: resume_token = stored + if ( + resume_token is None + and chat_session_store is not None + and chat_session_key is not None + ): + engine_for_session = cfg.runtime.resolve_engine( + engine_override=engine_override, + context=context, + ) + stored = await chat_session_store.get_session_resume( + chat_session_key[0], chat_session_key[1], engine_for_session + ) + if stored is not None: + resume_token = stored if resume_token is None: tg.start_soon( @@ -711,6 +803,7 @@ async def run_main_loop( None, context, msg.thread_id, + chat_session_key, reply_ref, scheduler.note_thread_known, engine_override, @@ -723,6 +816,7 @@ async def run_main_loop( resume_token, context, msg.thread_id, + chat_session_key, ) finally: await cfg.exec_cfg.transport.close() diff --git a/tests/test_exec_bridge.py b/tests/test_exec_bridge.py index 9e96a32..1a3ed5a 100644 --- a/tests/test_exec_bridge.py +++ b/tests/test_exec_bridge.py @@ -355,13 +355,13 @@ async def test_final_message_includes_ctx_line() -> None: runner=runner, incoming=IncomingMessage(channel_id=123, message_id=42, text="do it"), resume_token=None, - context_line="`ctx: takopi @ feat/api`", + context_line="`ctx: takopi @feat/api`", clock=clock, ) assert transport.send_calls final_text = transport.send_calls[-1]["message"].text - assert "`ctx: takopi @ feat/api`" in final_text + assert "`ctx: takopi @feat/api`" in final_text assert "codex resume" in final_text.lower() diff --git a/tests/test_exec_render.py b/tests/test_exec_render.py index a72cacd..20b09a9 100644 --- a/tests/test_exec_render.py +++ b/tests/test_exec_render.py @@ -187,12 +187,12 @@ def test_progress_renderer_footer_includes_ctx_before_resume() -> None: state = tracker.snapshot( resume_formatter=_format_resume, - context_line="`ctx: z80 @ feat/name`", + context_line="`ctx: z80 @feat/name`", ) formatter = MarkdownFormatter(max_actions=5) parts = formatter.render_progress_parts(state, elapsed_s=0.0) assert parts.footer == ( - "`ctx: z80 @ feat/name`" + "`ctx: z80 @feat/name`" f"{HARD_BREAK}`codex resume 0199a213-81c0-7800-8aa1-bbab2a035a53`" ) diff --git a/tests/test_telegram_bridge.py b/tests/test_telegram_bridge.py index 96a9020..31c15a8 100644 --- a/tests/test_telegram_bridge.py +++ b/tests/test_telegram_bridge.py @@ -34,6 +34,7 @@ from takopi.telegram.bridge import ( from takopi.telegram.client import BotClient from takopi.telegram.render import MAX_BODY_CHARS from takopi.telegram.topic_state import TopicStateStore, resolve_state_path +from takopi.telegram.chat_sessions import ChatSessionStore, resolve_sessions_path from takopi.context import RunContext from takopi.config import ProjectConfig, ProjectsConfig from takopi.runner_bridge import ExecBridgeConfig, RunningTask @@ -1040,7 +1041,7 @@ def test_resolve_message_accepts_backticked_ctx_line() -> None: ) resolved = runtime.resolve_message( text="do it", - reply_text="`ctx: takopi @ feat/api`", + reply_text="`ctx: takopi @feat/api`", ) assert resolved.prompt == "do it" @@ -1153,7 +1154,17 @@ async def test_maybe_rename_topic_skips_when_title_matches(tmp_path: Path) -> No async def test_send_with_resume_waits_for_token() -> None: transport = _FakeTransport() cfg = _make_cfg(transport) - sent: list[tuple[int, int, str, ResumeToken, RunContext | None, int | None]] = [] + sent: list[ + tuple[ + int, + int, + str, + ResumeToken, + RunContext | None, + int | None, + tuple[int, int | None] | None, + ] + ] = [] async def enqueue( chat_id: int, @@ -1162,8 +1173,11 @@ async def test_send_with_resume_waits_for_token() -> None: resume: ResumeToken, context: RunContext | None, thread_id: int | None, + session_key: tuple[int, int | None] | None, ) -> None: - sent.append((chat_id, user_msg_id, text, resume, context, thread_id)) + sent.append( + (chat_id, user_msg_id, text, resume, context, thread_id, session_key) + ) running_task = RunningTask() @@ -1181,6 +1195,7 @@ async def test_send_with_resume_waits_for_token() -> None: 123, 10, None, + None, "hello", ) @@ -1192,6 +1207,7 @@ async def test_send_with_resume_waits_for_token() -> None: ResumeToken(engine=CODEX_ENGINE, value="abc123"), None, None, + None, ) ] assert transport.send_calls == [] @@ -1201,7 +1217,17 @@ async def test_send_with_resume_waits_for_token() -> None: async def test_send_with_resume_reports_when_missing() -> None: transport = _FakeTransport() cfg = _make_cfg(transport) - sent: list[tuple[int, int, str, ResumeToken, RunContext | None, int | None]] = [] + sent: list[ + tuple[ + int, + int, + str, + ResumeToken, + RunContext | None, + int | None, + tuple[int, int | None] | None, + ] + ] = [] async def enqueue( chat_id: int, @@ -1210,8 +1236,11 @@ async def test_send_with_resume_reports_when_missing() -> None: resume: ResumeToken, context: RunContext | None, thread_id: int | None, + session_key: tuple[int, int | None] | None, ) -> None: - sent.append((chat_id, user_msg_id, text, resume, context, thread_id)) + sent.append( + (chat_id, user_msg_id, text, resume, context, thread_id, session_key) + ) running_task = RunningTask() running_task.done.set() @@ -1223,6 +1252,7 @@ async def test_send_with_resume_reports_when_missing() -> None: 123, 10, None, + None, "hello", ) @@ -1377,6 +1407,214 @@ async def test_run_main_loop_persists_topic_sessions_in_project_scope( assert stored == ResumeToken(engine=CODEX_ENGINE, value=resume_value) +@pytest.mark.anyio +async def test_run_main_loop_auto_resumes_chat_sessions(tmp_path: Path) -> None: + resume_value = "resume-123" + state_path = tmp_path / "takopi.toml" + + transport = _FakeTransport() + bot = _FakeBot() + runner = ScriptRunner( + [Return(answer="ok")], + engine=CODEX_ENGINE, + resume_value=resume_value, + ) + exec_cfg = ExecBridgeConfig( + transport=transport, + presenter=MarkdownPresenter(), + final_notify=True, + ) + runtime = TransportRuntime( + router=_make_router(runner), + projects=_empty_projects(), + config_path=state_path, + ) + cfg = TelegramBridgeConfig( + bot=bot, + runtime=runtime, + chat_id=123, + startup_msg="", + exec_cfg=exec_cfg, + session_mode="chat", + ) + + async def poller(_cfg: TelegramBridgeConfig): + yield TelegramIncomingMessage( + transport="telegram", + chat_id=123, + message_id=1, + text="hello", + reply_to_message_id=None, + reply_to_text=None, + sender_id=123, + chat_type="private", + ) + + await run_main_loop(cfg, poller) + + store = ChatSessionStore(resolve_sessions_path(state_path)) + stored = await store.get_session_resume(123, None, CODEX_ENGINE) + assert stored == ResumeToken(engine=CODEX_ENGINE, value=resume_value) + + runner2 = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE) + runtime2 = TransportRuntime( + router=_make_router(runner2), + projects=_empty_projects(), + config_path=state_path, + ) + cfg2 = TelegramBridgeConfig( + bot=bot, + runtime=runtime2, + chat_id=123, + startup_msg="", + exec_cfg=exec_cfg, + session_mode="chat", + ) + + async def poller2(_cfg: TelegramBridgeConfig): + yield TelegramIncomingMessage( + transport="telegram", + chat_id=123, + message_id=2, + text="followup", + reply_to_message_id=None, + reply_to_text=None, + sender_id=123, + chat_type="private", + ) + + await run_main_loop(cfg2, poller2) + + assert runner2.calls[0][1] == ResumeToken(engine=CODEX_ENGINE, value=resume_value) + + +@pytest.mark.anyio +async def test_run_main_loop_chat_sessions_isolate_group_senders( + tmp_path: Path, +) -> None: + resume_value = "resume-group" + state_path = tmp_path / "takopi.toml" + + transport = _FakeTransport() + bot = _FakeBot() + runner = ScriptRunner( + [Return(answer="ok")], + engine=CODEX_ENGINE, + resume_value=resume_value, + ) + exec_cfg = ExecBridgeConfig( + transport=transport, + presenter=MarkdownPresenter(), + final_notify=True, + ) + runtime = TransportRuntime( + router=_make_router(runner), + projects=_empty_projects(), + config_path=state_path, + ) + cfg = TelegramBridgeConfig( + bot=bot, + runtime=runtime, + chat_id=123, + startup_msg="", + exec_cfg=exec_cfg, + session_mode="chat", + ) + + async def poller(_cfg: TelegramBridgeConfig): + yield TelegramIncomingMessage( + transport="telegram", + chat_id=-100, + message_id=1, + text="hello", + reply_to_message_id=None, + reply_to_text=None, + sender_id=111, + chat_type="supergroup", + ) + + await run_main_loop(cfg, poller) + + runner2 = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE) + runtime2 = TransportRuntime( + router=_make_router(runner2), + projects=_empty_projects(), + config_path=state_path, + ) + cfg2 = TelegramBridgeConfig( + bot=bot, + runtime=runtime2, + chat_id=123, + startup_msg="", + exec_cfg=exec_cfg, + session_mode="chat", + ) + + async def poller2(_cfg: TelegramBridgeConfig): + yield TelegramIncomingMessage( + transport="telegram", + chat_id=-100, + message_id=2, + text="followup", + reply_to_message_id=None, + reply_to_text=None, + sender_id=222, + chat_type="supergroup", + ) + + await run_main_loop(cfg2, poller2) + + assert runner2.calls[0][1] is None + + +@pytest.mark.anyio +async def test_run_main_loop_new_clears_chat_sessions(tmp_path: Path) -> None: + state_path = tmp_path / "takopi.toml" + store = ChatSessionStore(resolve_sessions_path(state_path)) + await store.set_session_resume( + 123, None, ResumeToken(engine=CODEX_ENGINE, value="resume-1") + ) + + transport = _FakeTransport() + bot = _FakeBot() + runner = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE) + exec_cfg = ExecBridgeConfig( + transport=transport, + presenter=MarkdownPresenter(), + final_notify=True, + ) + runtime = TransportRuntime( + router=_make_router(runner), + projects=_empty_projects(), + config_path=state_path, + ) + cfg = TelegramBridgeConfig( + bot=bot, + runtime=runtime, + chat_id=123, + startup_msg="", + exec_cfg=exec_cfg, + session_mode="chat", + ) + + async def poller(_cfg: TelegramBridgeConfig): + yield TelegramIncomingMessage( + transport="telegram", + chat_id=123, + message_id=1, + text="/new", + reply_to_message_id=None, + reply_to_text=None, + sender_id=123, + chat_type="private", + ) + + await run_main_loop(cfg, poller) + + store2 = ChatSessionStore(resolve_sessions_path(state_path)) + assert await store2.get_session_resume(123, None, CODEX_ENGINE) is None + + @pytest.mark.anyio async def test_run_main_loop_replies_in_same_thread() -> None: transport = _FakeTransport() diff --git a/tests/test_telegram_chat_sessions.py b/tests/test_telegram_chat_sessions.py new file mode 100644 index 0000000..5385222 --- /dev/null +++ b/tests/test_telegram_chat_sessions.py @@ -0,0 +1,38 @@ +import pytest + +from takopi.model import ResumeToken +from takopi.telegram.chat_sessions import ChatSessionStore + + +@pytest.mark.anyio +async def test_chat_sessions_store_roundtrip(tmp_path) -> None: + path = tmp_path / "telegram_chat_sessions_state.json" + store = ChatSessionStore(path) + await store.set_session_resume(1, None, ResumeToken(engine="codex", value="abc123")) + await store.set_session_resume(1, 42, ResumeToken(engine="claude", value="res-1")) + + stored_private = await store.get_session_resume(1, None, "codex") + stored_group = await store.get_session_resume(1, 42, "claude") + assert stored_private == ResumeToken(engine="codex", value="abc123") + assert stored_group == ResumeToken(engine="claude", value="res-1") + + store2 = ChatSessionStore(path) + stored_private_2 = await store2.get_session_resume(1, None, "codex") + stored_group_2 = await store2.get_session_resume(1, 42, "claude") + assert stored_private_2 == ResumeToken(engine="codex", value="abc123") + assert stored_group_2 == ResumeToken(engine="claude", value="res-1") + + +@pytest.mark.anyio +async def test_chat_sessions_store_clear(tmp_path) -> None: + path = tmp_path / "telegram_chat_sessions_state.json" + store = ChatSessionStore(path) + await store.set_session_resume(2, None, ResumeToken(engine="codex", value="one")) + await store.set_session_resume(2, 77, ResumeToken(engine="codex", value="two")) + + await store.clear_sessions(2, None) + assert await store.get_session_resume(2, None, "codex") is None + assert await store.get_session_resume(2, 77, "codex") == ResumeToken( + engine="codex", + value="two", + ) diff --git a/tests/test_transport_runtime.py b/tests/test_transport_runtime.py index 0bad688..03c343d 100644 --- a/tests/test_transport_runtime.py +++ b/tests/test_transport_runtime.py @@ -93,7 +93,7 @@ def test_resolve_message_reply_ctx_overrides_ambient() -> None: resolved = runtime.resolve_message( text="hello", - reply_text="`ctx: proj @ reply`", + reply_text="`ctx: proj @reply`", ambient_context=ambient, )