feat(telegram): add chat session mode (#102)

This commit is contained in:
banteg
2026-01-12 19:05:39 +04:00
committed by GitHub
parent 7825dd73a9
commit 637a9fc3e2
18 changed files with 622 additions and 30 deletions
+1 -1
View File
@@ -115,7 +115,7 @@
- register repos with `takopi init <alias>` and target them via `/project` directives - register repos with `takopi init <alias>` and target them via `/project` directives
- route runs to git worktrees with `@branch` — takopi resolves or creates worktrees automatically - route runs to git worktrees with `@branch` — takopi resolves or creates worktrees automatically
- replies preserve context via `ctx: project @ branch` footers, no need to repeat directives - replies preserve context via `ctx: project @branch` footers, no need to repeat directives
- set `default_project` to skip the `/project` prefix entirely - set `default_project` to skip the `/project` prefix entirely
- per-project `default_engine` and `worktree_base` configuration - per-project `default_engine` and `worktree_base` configuration
+1 -1
View File
@@ -238,7 +238,7 @@ sequenceDiagram
Runner->>CLI: claude "fix the bug" Runner->>CLI: claude "fix the bug"
CLI-->>Runner: StartedEvent(resume=abc123) CLI-->>Runner: StartedEvent(resume=abc123)
Runner-->>Bridge: Stream events Runner-->>Bridge: Stream events
Bridge->>User: Final message with:<br/>claude --resume abc123<br/>ctx: project @ branch Bridge->>User: Final message with:<br/>claude --resume abc123<br/>ctx: project @branch
Note over User,CLI: Resume Conversation Note over User,CLI: Resume Conversation
User->>Bridge: Reply: "now add tests" User->>Bridge: Reply: "now add tests"
+2 -2
View File
@@ -92,7 +92,7 @@ Rules:
When a run has project context, Takopi appends a footer line rendered as inline When a run has project context, Takopi appends a footer line rendered as inline
code (backticked): code (backticked):
- With branch: `` `ctx: <project> @ <branch>` `` - With branch: `` `ctx: <project> @<branch>` ``
- Without branch: `` `ctx: <project>` `` - Without branch: `` `ctx: <project>` ``
The `ctx:` line is parsed from replies and takes precedence over new directives. The `ctx:` line is parsed from replies and takes precedence over new directives.
@@ -153,5 +153,5 @@ Start a new thread in a worktree:
Reply to a progress message to continue in the same context: Reply to a progress message to continue in the same context:
``` ```
ctx: z80 @ feat/streaming ctx: z80 @feat/streaming
``` ```
+19
View File
@@ -40,6 +40,25 @@ example, `http://localhost:8000/v1`) and a dummy `OPENAI_API_KEY` if your server
ignores it. If your server requires a specific model name, set ignores it. If your server requires a specific model name, set
`voice_transcription_model` (for example, `whisper-1`). `voice_transcription_model` (for example, `whisper-1`).
## Chat sessions (optional)
Takopi is stateless by default unless you reply to a bot message containing a resume
line. If you want auto-resume without replies, enable chat sessions.
Configuration (under `[transports.telegram]`):
```toml
session_mode = "chat" # or "stateless"
```
Behavior:
- Stores one resume token per chat (per sender in group chats).
- Auto-resumes when no explicit resume token is present.
- Reset with `/new`.
State is stored in `telegram_chat_sessions_state.json` alongside the config file.
## Message overflow ## Message overflow
By default, takopi trims long final responses to ~3500 characters to stay under By default, takopi trims long final responses to ~3500 characters to stay under
+27 -9
View File
@@ -20,13 +20,30 @@ A few terms you'll see throughout:
| Term | Meaning | | Term | Meaning |
|------|---------| |------|---------|
| **Engine** | A coding agent backend (Codex, Claude, opencode, pi) | | **Engine** | A coding agent backend (Codex, Claude, opencode, pi) |
| **Project** | A registered git repository with an alias | | **Project** | A repo/workdir alias used for routing (can set default engine, worktrees, chat ID) |
| **Worktree** | A git feature that lets you check out multiple branches simultaneously in separate directories | | **Worktree** | A git feature that lets you check out multiple branches simultaneously in separate directories |
| **Topic** | A Telegram forum thread bound to a specific project/branch context | | **Topic** | A Telegram forum thread bound to a project/branch; stores per-topic resume tokens |
| **Resume token** | State that allows an engine to continue from where it left off | | **Resume token** | State that allows an engine to continue from where it left off |
--- ---
## How conversations work
Takopi is **stateless by default**. Each message starts a new engine session unless a resume token is present.
To continue a session:
- **Reply** to a bot message. Takopi reads the resume token from the footer and resumes that session.
- **Forum topics** (optional) store resume tokens per topic and auto-resume for new messages in that topic.
Reset with `/new`.
- **Chat sessions** (optional) store one resume token per chat (per sender in groups) so new messages
auto-resume without replying. Enable with `session_mode = "chat"` and reset with `/new`.
State is stored in `telegram_chat_sessions_state.json`.
Reply-to-continue always works, even if chat sessions or topics are enabled.
---
## 1. Installation and setup ## 1. Installation and setup
Install takopi with: Install takopi with:
@@ -85,7 +102,7 @@ Takopi streams progress in the chat and sends a final response when the agent fi
### Basic controls ### Basic controls
- **Reply** to a bot message with more instructions to continue the conversation - **Reply** to a bot message to continue the session (takopi reads the resume token in the footer)
- **Cancel** a run by clicking the cancel button or replying to the progress message with `/cancel` - **Cancel** a run by clicking the cancel button or replying to the progress message with `/cancel`
--- ---
@@ -336,10 +353,10 @@ voice_transcription_model = "gpt-4o-mini-transcribe" # optional
Set `OPENAI_API_KEY` in your environment (uses OpenAI's transcription API with the Set `OPENAI_API_KEY` in your environment (uses OpenAI's transcription API with the
`gpt-4o-mini-transcribe` model by default). To use a local OpenAI-compatible `gpt-4o-mini-transcribe` model by default). To use a local OpenAI-compatible
Whisper server, also set `OPENAI_BASE_URL` (for example, Whisper server, also set `OPENAI_BASE_URL` (for example, `http://localhost:8000/v1`)
`http://localhost:8000/v1`) and a dummy `OPENAI_API_KEY` if your server ignores it. and a dummy `OPENAI_API_KEY` if your server ignores it. If your server requires a
If your server requires a specific model name, set `voice_transcription_model` specific model name, set `voice_transcription_model` accordingly (for example,
accordingly (for example, `whisper-1`). `whisper-1`).
When you send a voice note, takopi transcribes it and runs the result as a normal text message. If transcription fails, you'll get an error message and the run is skipped. When you send a voice note, takopi transcribes it and runs the result as a normal text message. If transcription fails, you'll get an error message and the run is skipped.
@@ -424,7 +441,8 @@ watch_config = true # hot-reload on config changes (except transport)
bot_token = "123456789:ABCdefGHIjklMNOpqrsTUVwxyz" bot_token = "123456789:ABCdefGHIjklMNOpqrsTUVwxyz"
chat_id = 123456789 chat_id = 123456789
voice_transcription = true voice_transcription = true
# voice_transcription_model = "gpt-4o-mini-transcribe" voice_transcription_model = "gpt-4o-mini-transcribe"
session_mode = "stateless" # or "chat" for auto-resume per chat
[transports.telegram.files] [transports.telegram.files]
enabled = true enabled = true
@@ -477,7 +495,7 @@ worktree_base = "develop"
| `/ctx` | Show current context | | `/ctx` | Show current context |
| `/ctx set <project> @branch` | Update context binding | | `/ctx set <project> @branch` | Update context binding |
| `/ctx clear` | Remove context binding | | `/ctx clear` | Remove context binding |
| `/new` | Clear resume tokens | | `/new` | Clear stored resume tokens (topic or chat session) |
### CLI commands ### CLI commands
+1 -1
View File
@@ -142,5 +142,5 @@ def format_context_line(
project_cfg = projects.projects.get(context.project) project_cfg = projects.projects.get(context.project)
alias = project_cfg.alias if project_cfg is not None else context.project alias = project_cfg.alias if project_cfg is not None else context.project
if context.branch: if context.branch:
return f"`ctx: {alias} @ {context.branch}`" return f"`ctx: {alias} @{context.branch}`"
return f"`ctx: {alias}`" return f"`ctx: {alias}`"
+3
View File
@@ -19,6 +19,7 @@ class ThreadJob:
resume_token: ResumeToken resume_token: ResumeToken
context: RunContext | None = None context: RunContext | None = None
thread_id: ThreadId | None = None thread_id: ThreadId | None = None
session_key: tuple[int, int | None] | None = None
RunJob = Callable[[ThreadJob], Awaitable[None]] RunJob = Callable[[ThreadJob], Awaitable[None]]
@@ -72,6 +73,7 @@ class ThreadScheduler:
resume_token: ResumeToken, resume_token: ResumeToken,
context: RunContext | None = None, context: RunContext | None = None,
thread_id: ThreadId | None = None, thread_id: ThreadId | None = None,
session_key: tuple[int, int | None] | None = None,
) -> None: ) -> None:
await self.enqueue( await self.enqueue(
ThreadJob( ThreadJob(
@@ -81,6 +83,7 @@ class ThreadScheduler:
resume_token=resume_token, resume_token=resume_token,
context=context, context=context,
thread_id=thread_id, thread_id=thread_id,
session_key=session_key,
) )
) )
+1
View File
@@ -102,6 +102,7 @@ class TelegramTransportSettings(BaseModel):
voice_transcription: bool = False voice_transcription: bool = False
voice_max_bytes: StrictInt = 10 * 1024 * 1024 voice_max_bytes: StrictInt = 10 * 1024 * 1024
voice_transcription_model: NonEmptyStr = "gpt-4o-mini-transcribe" voice_transcription_model: NonEmptyStr = "gpt-4o-mini-transcribe"
session_mode: Literal["stateless", "chat"] = "stateless"
topics: TelegramTopicsSettings = Field(default_factory=TelegramTopicsSettings) topics: TelegramTopicsSettings = Field(default_factory=TelegramTopicsSettings)
files: TelegramFilesSettings = Field(default_factory=TelegramFilesSettings) files: TelegramFilesSettings = Field(default_factory=TelegramFilesSettings)
+1
View File
@@ -113,6 +113,7 @@ class TelegramBackend(TransportBackend):
chat_id=chat_id, chat_id=chat_id,
startup_msg=startup_msg, startup_msg=startup_msg,
exec_cfg=exec_cfg, exec_cfg=exec_cfg,
session_mode=settings.session_mode,
voice_transcription=settings.voice_transcription, voice_transcription=settings.voice_transcription,
voice_max_bytes=int(settings.voice_max_bytes), voice_max_bytes=int(settings.voice_max_bytes),
voice_transcription_model=settings.voice_transcription_model, voice_transcription_model=settings.voice_transcription_model,
+14 -2
View File
@@ -2,7 +2,7 @@ from __future__ import annotations
from collections.abc import Awaitable, Callable from collections.abc import Awaitable, Callable
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import cast from typing import Literal, cast
from ..logging import get_logger from ..logging import get_logger
from ..markdown import MarkdownFormatter, MarkdownParts from ..markdown import MarkdownFormatter, MarkdownParts
@@ -118,6 +118,7 @@ class TelegramBridgeConfig:
chat_id: int chat_id: int
startup_msg: str startup_msg: str
exec_cfg: ExecBridgeConfig exec_cfg: ExecBridgeConfig
session_mode: Literal["stateless", "chat"] = "stateless"
voice_transcription: bool = False voice_transcription: bool = False
voice_max_bytes: int = 10 * 1024 * 1024 voice_max_bytes: int = 10 * 1024 * 1024
voice_transcription_model: str = "gpt-4o-mini-transcribe" voice_transcription_model: str = "gpt-4o-mini-transcribe"
@@ -343,12 +344,22 @@ async def handle_callback_cancel(
async def send_with_resume( async def send_with_resume(
cfg: TelegramBridgeConfig, cfg: TelegramBridgeConfig,
enqueue: Callable[ enqueue: Callable[
[int, int, str, ResumeToken, RunContext | None, int | None], Awaitable[None] [
int,
int,
str,
ResumeToken,
RunContext | None,
int | None,
tuple[int, int | None] | None,
],
Awaitable[None],
], ],
running_task: RunningTask, running_task: RunningTask,
chat_id: int, chat_id: int,
user_msg_id: int, user_msg_id: int,
thread_id: int | None, thread_id: int | None,
session_key: tuple[int, int | None] | None,
text: str, text: str,
) -> None: ) -> None:
from .loop import send_with_resume as _send_with_resume from .loop import send_with_resume as _send_with_resume
@@ -360,6 +371,7 @@ async def send_with_resume(
chat_id, chat_id,
user_msg_id, user_msg_id,
thread_id, thread_id,
session_key,
text, text,
) )
+142
View File
@@ -0,0 +1,142 @@
from __future__ import annotations
import json
import os
from pathlib import Path
import anyio
import msgspec
from ..logging import get_logger
from ..model import ResumeToken
logger = get_logger(__name__)
STATE_VERSION = 1
STATE_FILENAME = "telegram_chat_sessions_state.json"
class _SessionState(msgspec.Struct, forbid_unknown_fields=False):
resume: str
class _ChatState(msgspec.Struct, forbid_unknown_fields=False):
sessions: dict[str, _SessionState] = msgspec.field(default_factory=dict)
class _ChatSessionsState(msgspec.Struct, forbid_unknown_fields=False):
version: int
chats: dict[str, _ChatState] = msgspec.field(default_factory=dict)
def resolve_sessions_path(config_path: Path) -> Path:
return config_path.with_name(STATE_FILENAME)
def _chat_key(chat_id: int, owner_id: int | None) -> str:
owner = "chat" if owner_id is None else str(owner_id)
return f"{chat_id}:{owner}"
class ChatSessionStore:
def __init__(self, path: Path) -> None:
self._path = path
self._lock = anyio.Lock()
self._loaded = False
self._mtime_ns: int | None = None
self._state = _ChatSessionsState(version=STATE_VERSION, chats={})
async def get_session_resume(
self, chat_id: int, owner_id: int | None, engine: str
) -> ResumeToken | None:
async with self._lock:
self._reload_locked_if_needed()
chat = self._get_chat_locked(chat_id, owner_id)
if chat is None:
return None
entry = chat.sessions.get(engine)
if entry is None or not entry.resume:
return None
return ResumeToken(engine=engine, value=entry.resume)
async def set_session_resume(
self, chat_id: int, owner_id: int | None, token: ResumeToken
) -> None:
async with self._lock:
self._reload_locked_if_needed()
chat = self._ensure_chat_locked(chat_id, owner_id)
chat.sessions[token.engine] = _SessionState(resume=token.value)
self._save_locked()
async def clear_sessions(self, chat_id: int, owner_id: int | None) -> None:
async with self._lock:
self._reload_locked_if_needed()
chat = self._get_chat_locked(chat_id, owner_id)
if chat is None:
return
chat.sessions = {}
self._save_locked()
def _stat_mtime_ns(self) -> int | None:
try:
return self._path.stat().st_mtime_ns
except FileNotFoundError:
return None
def _reload_locked_if_needed(self) -> None:
current = self._stat_mtime_ns()
if self._loaded and current == self._mtime_ns:
return
self._load_locked()
def _load_locked(self) -> None:
self._loaded = True
self._mtime_ns = self._stat_mtime_ns()
if self._mtime_ns is None:
self._state = _ChatSessionsState(version=STATE_VERSION, chats={})
return
try:
payload = msgspec.json.decode(
self._path.read_bytes(), type=_ChatSessionsState
)
except Exception as exc:
logger.warning(
"telegram.chat_sessions.load_failed",
path=str(self._path),
error=str(exc),
error_type=exc.__class__.__name__,
)
self._state = _ChatSessionsState(version=STATE_VERSION, chats={})
return
if payload.version != STATE_VERSION:
logger.warning(
"telegram.chat_sessions.version_mismatch",
path=str(self._path),
version=payload.version,
expected=STATE_VERSION,
)
self._state = _ChatSessionsState(version=STATE_VERSION, chats={})
return
self._state = payload
def _save_locked(self) -> None:
self._path.parent.mkdir(parents=True, exist_ok=True)
payload = msgspec.to_builtins(self._state)
tmp_path = self._path.with_suffix(f"{self._path.suffix}.tmp")
with open(tmp_path, "w", encoding="utf-8") as handle:
json.dump(payload, handle, indent=2, sort_keys=True)
handle.write("\n")
os.replace(tmp_path, self._path)
self._mtime_ns = self._stat_mtime_ns()
def _get_chat_locked(self, chat_id: int, owner_id: int | None) -> _ChatState | None:
return self._state.chats.get(_chat_key(chat_id, owner_id))
def _ensure_chat_locked(self, chat_id: int, owner_id: int | None) -> _ChatState:
key = _chat_key(chat_id, owner_id)
entry = self._state.chats.get(key)
if entry is not None:
return entry
entry = _ChatState()
self._state.chats[key] = entry
return entry
+26
View File
@@ -37,6 +37,7 @@ from ..transport import MessageRef, RenderedMessage, SendOptions
from ..transport_runtime import ResolvedMessage, TransportRuntime from ..transport_runtime import ResolvedMessage, TransportRuntime
from ..utils.paths import reset_run_base_dir, set_run_base_dir from ..utils.paths import reset_run_base_dir, set_run_base_dir
from .bridge import send_plain from .bridge import send_plain
from .chat_sessions import ChatSessionStore
from .context import ( from .context import (
_format_context, _format_context,
_format_ctx_status, _format_ctx_status,
@@ -77,6 +78,7 @@ __all__ = [
"FILE_GET_USAGE", "FILE_GET_USAGE",
"FILE_PUT_USAGE", "FILE_PUT_USAGE",
"_dispatch_command", "_dispatch_command",
"_handle_chat_new_command",
"_handle_file_command", "_handle_file_command",
"_handle_file_get", "_handle_file_get",
"_handle_file_put", "_handle_file_put",
@@ -1044,6 +1046,30 @@ async def _handle_new_command(
await reply(text="cleared stored sessions for this topic.") await reply(text="cleared stored sessions for this topic.")
async def _handle_chat_new_command(
cfg,
msg: TelegramIncomingMessage,
store: ChatSessionStore,
session_key: tuple[int, int | None] | None,
) -> None:
reply = partial(
send_plain,
cfg.exec_cfg.transport,
chat_id=msg.chat_id,
user_msg_id=msg.message_id,
thread_id=msg.thread_id,
)
if session_key is None:
await reply(text="no stored sessions to clear for this chat.")
return
await store.clear_sessions(session_key[0], session_key[1])
if msg.chat_type == "private":
text = "cleared stored sessions for this chat."
else:
text = "cleared stored sessions for you in this chat."
await reply(text=text)
async def _handle_topic_command( async def _handle_topic_command(
cfg, cfg,
msg: TelegramIncomingMessage, msg: TelegramIncomingMessage,
+98 -4
View File
@@ -22,6 +22,7 @@ from .bridge import CANCEL_CALLBACK_DATA, TelegramBridgeConfig, send_plain
from .commands import ( from .commands import (
FILE_PUT_USAGE, FILE_PUT_USAGE,
_dispatch_command, _dispatch_command,
_handle_chat_new_command,
_handle_ctx_command, _handle_ctx_command,
_handle_file_command, _handle_file_command,
_handle_file_put_default, _handle_file_put_default,
@@ -47,6 +48,7 @@ from .topics import (
_validate_topics_setup, _validate_topics_setup,
) )
from .client import poll_incoming from .client import poll_incoming
from .chat_sessions import ChatSessionStore, resolve_sessions_path
from .topic_state import TopicStateStore, resolve_state_path from .topic_state import TopicStateStore, resolve_state_path
from .types import ( from .types import (
TelegramCallbackQuery, TelegramCallbackQuery,
@@ -62,6 +64,18 @@ __all__ = ["poll_updates", "run_main_loop", "send_with_resume"]
_MEDIA_GROUP_DEBOUNCE_S = 1.0 _MEDIA_GROUP_DEBOUNCE_S = 1.0
def _chat_session_key(
msg: TelegramIncomingMessage, *, store: ChatSessionStore | None
) -> tuple[int, int | None] | None:
if store is None or msg.thread_id is not None:
return None
if msg.chat_type == "private":
return (msg.chat_id, None)
if msg.sender_id is None:
return None
return (msg.chat_id, msg.sender_id)
def _allowed_chat_ids(cfg: TelegramBridgeConfig) -> set[int]: def _allowed_chat_ids(cfg: TelegramBridgeConfig) -> set[int]:
allowed = set(cfg.chat_ids or ()) allowed = set(cfg.chat_ids or ())
allowed.add(cfg.chat_id) allowed.add(cfg.chat_id)
@@ -228,12 +242,22 @@ async def _wait_for_resume(running_task) -> ResumeToken | None:
async def send_with_resume( async def send_with_resume(
cfg: TelegramBridgeConfig, cfg: TelegramBridgeConfig,
enqueue: Callable[ enqueue: Callable[
[int, int, str, ResumeToken, RunContext | None, int | None], Awaitable[None] [
int,
int,
str,
ResumeToken,
RunContext | None,
int | None,
tuple[int, int | None] | None,
],
Awaitable[None],
], ],
running_task, running_task,
chat_id: int, chat_id: int,
user_msg_id: int, user_msg_id: int,
thread_id: int | None, thread_id: int | None,
session_key: tuple[int, int | None] | None,
text: str, text: str,
) -> None: ) -> None:
reply = partial( reply = partial(
@@ -257,6 +281,7 @@ async def send_with_resume(
resume, resume,
running_task.context, running_task.context,
thread_id, thread_id,
session_key,
) )
@@ -283,6 +308,7 @@ async def run_main_loop(
transport_config.model_dump() if transport_config is not None else None transport_config.model_dump() if transport_config is not None else None
) )
topic_store: TopicStateStore | None = None topic_store: TopicStateStore | None = None
chat_session_store: ChatSessionStore | None = None
media_groups: dict[tuple[int, str], _MediaGroupState] = {} media_groups: dict[tuple[int, str], _MediaGroupState] = {}
resolved_topics_scope: str | None = None resolved_topics_scope: str | None = None
topics_chat_ids: frozenset[int] = frozenset() topics_chat_ids: frozenset[int] = frozenset()
@@ -304,8 +330,18 @@ async def run_main_loop(
reserved_commands = _reserved_commands(cfg.runtime) reserved_commands = _reserved_commands(cfg.runtime)
try: try:
config_path = cfg.runtime.config_path
if cfg.session_mode == "chat":
if config_path is None:
raise ConfigError(
"session_mode=chat but config path is not set; cannot locate state file."
)
chat_session_store = ChatSessionStore(resolve_sessions_path(config_path))
logger.info(
"chat_sessions.enabled",
state_path=str(resolve_sessions_path(config_path)),
)
if cfg.topics.enabled: if cfg.topics.enabled:
config_path = cfg.runtime.config_path
if config_path is None: if config_path is None:
raise ConfigError( raise ConfigError(
"topics enabled but config path is not set; cannot locate state file." "topics enabled but config path is not set; cannot locate state file."
@@ -367,8 +403,9 @@ async def run_main_loop(
def wrap_on_thread_known( def wrap_on_thread_known(
base_cb: Callable[[ResumeToken, anyio.Event], Awaitable[None]] | None, base_cb: Callable[[ResumeToken, anyio.Event], Awaitable[None]] | None,
topic_key: tuple[int, int] | None, topic_key: tuple[int, int] | None,
chat_session_key: tuple[int, int | None] | None,
) -> Callable[[ResumeToken, anyio.Event], Awaitable[None]] | None: ) -> Callable[[ResumeToken, anyio.Event], Awaitable[None]] | None:
if base_cb is None and topic_key is None: if base_cb is None and topic_key is None and chat_session_key is None:
return None return None
async def _wrapped(token: ResumeToken, done: anyio.Event) -> None: async def _wrapped(token: ResumeToken, done: anyio.Event) -> None:
@@ -378,6 +415,10 @@ async def run_main_loop(
await topic_store.set_session_resume( await topic_store.set_session_resume(
topic_key[0], topic_key[1], token topic_key[0], topic_key[1], token
) )
if chat_session_store is not None and chat_session_key is not None:
await chat_session_store.set_session_resume(
chat_session_key[0], chat_session_key[1], token
)
return _wrapped return _wrapped
@@ -388,6 +429,7 @@ async def run_main_loop(
resume_token: ResumeToken | None, resume_token: ResumeToken | None,
context: RunContext | None, context: RunContext | None,
thread_id: int | None = None, thread_id: int | None = None,
chat_session_key: tuple[int, int | None] | None = None,
reply_ref: MessageRef | None = None, reply_ref: MessageRef | None = None,
on_thread_known: Callable[[ResumeToken, anyio.Event], Awaitable[None]] on_thread_known: Callable[[ResumeToken, anyio.Event], Awaitable[None]]
| None = None, | None = None,
@@ -412,7 +454,9 @@ async def run_main_loop(
resume_token=resume_token, resume_token=resume_token,
context=context, context=context,
reply_ref=reply_ref, reply_ref=reply_ref,
on_thread_known=wrap_on_thread_known(on_thread_known, topic_key), on_thread_known=wrap_on_thread_known(
on_thread_known, topic_key, chat_session_key
),
engine_override=engine_override, engine_override=engine_override,
thread_id=thread_id, thread_id=thread_id,
) )
@@ -425,6 +469,7 @@ async def run_main_loop(
job.resume_token, job.resume_token,
job.context, job.context,
cast(int | None, job.thread_id), cast(int | None, job.thread_id),
job.session_key,
None, None,
scheduler.note_thread_known, scheduler.note_thread_known,
) )
@@ -451,6 +496,7 @@ async def run_main_loop(
None, None,
context, context,
msg.thread_id, msg.thread_id,
None,
reply_ref, reply_ref,
scheduler.note_thread_known, scheduler.note_thread_known,
) )
@@ -539,6 +585,7 @@ async def run_main_loop(
if topic_store is not None if topic_store is not None
else None else None
) )
chat_session_key = _chat_session_key(msg, store=chat_session_store)
chat_project = ( chat_project = (
_topics_chat_project(cfg, chat_id) if cfg.topics.enabled else None _topics_chat_project(cfg, chat_id) if cfg.topics.enabled else None
) )
@@ -571,6 +618,36 @@ async def run_main_loop(
continue continue
command_id, args_text = _parse_slash_command(text) command_id, args_text = _parse_slash_command(text)
if command_id == "new":
if topic_store is not None and topic_key is not None:
tg.start_soon(
_handle_new_command,
cfg,
msg,
topic_store,
resolved_topics_scope,
topics_chat_ids,
)
continue
if chat_session_store is not None:
tg.start_soon(
_handle_chat_new_command,
cfg,
msg,
chat_session_store,
chat_session_key,
)
continue
if topic_store is not None:
tg.start_soon(
_handle_new_command,
cfg,
msg,
topic_store,
resolved_topics_scope,
topics_chat_ids,
)
continue
if command_id is not None and _dispatch_builtin_command( if command_id is not None and _dispatch_builtin_command(
cfg=cfg, cfg=cfg,
msg=msg, msg=msg,
@@ -684,6 +761,7 @@ async def run_main_loop(
chat_id, chat_id,
user_msg_id, user_msg_id,
msg.thread_id, msg.thread_id,
chat_session_key,
text, text,
) )
continue continue
@@ -701,6 +779,20 @@ async def run_main_loop(
) )
if stored is not None: if stored is not None:
resume_token = stored resume_token = stored
if (
resume_token is None
and chat_session_store is not None
and chat_session_key is not None
):
engine_for_session = cfg.runtime.resolve_engine(
engine_override=engine_override,
context=context,
)
stored = await chat_session_store.get_session_resume(
chat_session_key[0], chat_session_key[1], engine_for_session
)
if stored is not None:
resume_token = stored
if resume_token is None: if resume_token is None:
tg.start_soon( tg.start_soon(
@@ -711,6 +803,7 @@ async def run_main_loop(
None, None,
context, context,
msg.thread_id, msg.thread_id,
chat_session_key,
reply_ref, reply_ref,
scheduler.note_thread_known, scheduler.note_thread_known,
engine_override, engine_override,
@@ -723,6 +816,7 @@ async def run_main_loop(
resume_token, resume_token,
context, context,
msg.thread_id, msg.thread_id,
chat_session_key,
) )
finally: finally:
await cfg.exec_cfg.transport.close() await cfg.exec_cfg.transport.close()
+2 -2
View File
@@ -355,13 +355,13 @@ async def test_final_message_includes_ctx_line() -> None:
runner=runner, runner=runner,
incoming=IncomingMessage(channel_id=123, message_id=42, text="do it"), incoming=IncomingMessage(channel_id=123, message_id=42, text="do it"),
resume_token=None, resume_token=None,
context_line="`ctx: takopi @ feat/api`", context_line="`ctx: takopi @feat/api`",
clock=clock, clock=clock,
) )
assert transport.send_calls assert transport.send_calls
final_text = transport.send_calls[-1]["message"].text final_text = transport.send_calls[-1]["message"].text
assert "`ctx: takopi @ feat/api`" in final_text assert "`ctx: takopi @feat/api`" in final_text
assert "codex resume" in final_text.lower() assert "codex resume" in final_text.lower()
+2 -2
View File
@@ -187,12 +187,12 @@ def test_progress_renderer_footer_includes_ctx_before_resume() -> None:
state = tracker.snapshot( state = tracker.snapshot(
resume_formatter=_format_resume, resume_formatter=_format_resume,
context_line="`ctx: z80 @ feat/name`", context_line="`ctx: z80 @feat/name`",
) )
formatter = MarkdownFormatter(max_actions=5) formatter = MarkdownFormatter(max_actions=5)
parts = formatter.render_progress_parts(state, elapsed_s=0.0) parts = formatter.render_progress_parts(state, elapsed_s=0.0)
assert parts.footer == ( assert parts.footer == (
"`ctx: z80 @ feat/name`" "`ctx: z80 @feat/name`"
f"{HARD_BREAK}`codex resume 0199a213-81c0-7800-8aa1-bbab2a035a53`" f"{HARD_BREAK}`codex resume 0199a213-81c0-7800-8aa1-bbab2a035a53`"
) )
+243 -5
View File
@@ -34,6 +34,7 @@ from takopi.telegram.bridge import (
from takopi.telegram.client import BotClient from takopi.telegram.client import BotClient
from takopi.telegram.render import MAX_BODY_CHARS from takopi.telegram.render import MAX_BODY_CHARS
from takopi.telegram.topic_state import TopicStateStore, resolve_state_path from takopi.telegram.topic_state import TopicStateStore, resolve_state_path
from takopi.telegram.chat_sessions import ChatSessionStore, resolve_sessions_path
from takopi.context import RunContext from takopi.context import RunContext
from takopi.config import ProjectConfig, ProjectsConfig from takopi.config import ProjectConfig, ProjectsConfig
from takopi.runner_bridge import ExecBridgeConfig, RunningTask from takopi.runner_bridge import ExecBridgeConfig, RunningTask
@@ -1040,7 +1041,7 @@ def test_resolve_message_accepts_backticked_ctx_line() -> None:
) )
resolved = runtime.resolve_message( resolved = runtime.resolve_message(
text="do it", text="do it",
reply_text="`ctx: takopi @ feat/api`", reply_text="`ctx: takopi @feat/api`",
) )
assert resolved.prompt == "do it" assert resolved.prompt == "do it"
@@ -1153,7 +1154,17 @@ async def test_maybe_rename_topic_skips_when_title_matches(tmp_path: Path) -> No
async def test_send_with_resume_waits_for_token() -> None: async def test_send_with_resume_waits_for_token() -> None:
transport = _FakeTransport() transport = _FakeTransport()
cfg = _make_cfg(transport) cfg = _make_cfg(transport)
sent: list[tuple[int, int, str, ResumeToken, RunContext | None, int | None]] = [] sent: list[
tuple[
int,
int,
str,
ResumeToken,
RunContext | None,
int | None,
tuple[int, int | None] | None,
]
] = []
async def enqueue( async def enqueue(
chat_id: int, chat_id: int,
@@ -1162,8 +1173,11 @@ async def test_send_with_resume_waits_for_token() -> None:
resume: ResumeToken, resume: ResumeToken,
context: RunContext | None, context: RunContext | None,
thread_id: int | None, thread_id: int | None,
session_key: tuple[int, int | None] | None,
) -> None: ) -> None:
sent.append((chat_id, user_msg_id, text, resume, context, thread_id)) sent.append(
(chat_id, user_msg_id, text, resume, context, thread_id, session_key)
)
running_task = RunningTask() running_task = RunningTask()
@@ -1181,6 +1195,7 @@ async def test_send_with_resume_waits_for_token() -> None:
123, 123,
10, 10,
None, None,
None,
"hello", "hello",
) )
@@ -1192,6 +1207,7 @@ async def test_send_with_resume_waits_for_token() -> None:
ResumeToken(engine=CODEX_ENGINE, value="abc123"), ResumeToken(engine=CODEX_ENGINE, value="abc123"),
None, None,
None, None,
None,
) )
] ]
assert transport.send_calls == [] assert transport.send_calls == []
@@ -1201,7 +1217,17 @@ async def test_send_with_resume_waits_for_token() -> None:
async def test_send_with_resume_reports_when_missing() -> None: async def test_send_with_resume_reports_when_missing() -> None:
transport = _FakeTransport() transport = _FakeTransport()
cfg = _make_cfg(transport) cfg = _make_cfg(transport)
sent: list[tuple[int, int, str, ResumeToken, RunContext | None, int | None]] = [] sent: list[
tuple[
int,
int,
str,
ResumeToken,
RunContext | None,
int | None,
tuple[int, int | None] | None,
]
] = []
async def enqueue( async def enqueue(
chat_id: int, chat_id: int,
@@ -1210,8 +1236,11 @@ async def test_send_with_resume_reports_when_missing() -> None:
resume: ResumeToken, resume: ResumeToken,
context: RunContext | None, context: RunContext | None,
thread_id: int | None, thread_id: int | None,
session_key: tuple[int, int | None] | None,
) -> None: ) -> None:
sent.append((chat_id, user_msg_id, text, resume, context, thread_id)) sent.append(
(chat_id, user_msg_id, text, resume, context, thread_id, session_key)
)
running_task = RunningTask() running_task = RunningTask()
running_task.done.set() running_task.done.set()
@@ -1223,6 +1252,7 @@ async def test_send_with_resume_reports_when_missing() -> None:
123, 123,
10, 10,
None, None,
None,
"hello", "hello",
) )
@@ -1377,6 +1407,214 @@ async def test_run_main_loop_persists_topic_sessions_in_project_scope(
assert stored == ResumeToken(engine=CODEX_ENGINE, value=resume_value) assert stored == ResumeToken(engine=CODEX_ENGINE, value=resume_value)
@pytest.mark.anyio
async def test_run_main_loop_auto_resumes_chat_sessions(tmp_path: Path) -> None:
resume_value = "resume-123"
state_path = tmp_path / "takopi.toml"
transport = _FakeTransport()
bot = _FakeBot()
runner = ScriptRunner(
[Return(answer="ok")],
engine=CODEX_ENGINE,
resume_value=resume_value,
)
exec_cfg = ExecBridgeConfig(
transport=transport,
presenter=MarkdownPresenter(),
final_notify=True,
)
runtime = TransportRuntime(
router=_make_router(runner),
projects=_empty_projects(),
config_path=state_path,
)
cfg = TelegramBridgeConfig(
bot=bot,
runtime=runtime,
chat_id=123,
startup_msg="",
exec_cfg=exec_cfg,
session_mode="chat",
)
async def poller(_cfg: TelegramBridgeConfig):
yield TelegramIncomingMessage(
transport="telegram",
chat_id=123,
message_id=1,
text="hello",
reply_to_message_id=None,
reply_to_text=None,
sender_id=123,
chat_type="private",
)
await run_main_loop(cfg, poller)
store = ChatSessionStore(resolve_sessions_path(state_path))
stored = await store.get_session_resume(123, None, CODEX_ENGINE)
assert stored == ResumeToken(engine=CODEX_ENGINE, value=resume_value)
runner2 = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE)
runtime2 = TransportRuntime(
router=_make_router(runner2),
projects=_empty_projects(),
config_path=state_path,
)
cfg2 = TelegramBridgeConfig(
bot=bot,
runtime=runtime2,
chat_id=123,
startup_msg="",
exec_cfg=exec_cfg,
session_mode="chat",
)
async def poller2(_cfg: TelegramBridgeConfig):
yield TelegramIncomingMessage(
transport="telegram",
chat_id=123,
message_id=2,
text="followup",
reply_to_message_id=None,
reply_to_text=None,
sender_id=123,
chat_type="private",
)
await run_main_loop(cfg2, poller2)
assert runner2.calls[0][1] == ResumeToken(engine=CODEX_ENGINE, value=resume_value)
@pytest.mark.anyio
async def test_run_main_loop_chat_sessions_isolate_group_senders(
tmp_path: Path,
) -> None:
resume_value = "resume-group"
state_path = tmp_path / "takopi.toml"
transport = _FakeTransport()
bot = _FakeBot()
runner = ScriptRunner(
[Return(answer="ok")],
engine=CODEX_ENGINE,
resume_value=resume_value,
)
exec_cfg = ExecBridgeConfig(
transport=transport,
presenter=MarkdownPresenter(),
final_notify=True,
)
runtime = TransportRuntime(
router=_make_router(runner),
projects=_empty_projects(),
config_path=state_path,
)
cfg = TelegramBridgeConfig(
bot=bot,
runtime=runtime,
chat_id=123,
startup_msg="",
exec_cfg=exec_cfg,
session_mode="chat",
)
async def poller(_cfg: TelegramBridgeConfig):
yield TelegramIncomingMessage(
transport="telegram",
chat_id=-100,
message_id=1,
text="hello",
reply_to_message_id=None,
reply_to_text=None,
sender_id=111,
chat_type="supergroup",
)
await run_main_loop(cfg, poller)
runner2 = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE)
runtime2 = TransportRuntime(
router=_make_router(runner2),
projects=_empty_projects(),
config_path=state_path,
)
cfg2 = TelegramBridgeConfig(
bot=bot,
runtime=runtime2,
chat_id=123,
startup_msg="",
exec_cfg=exec_cfg,
session_mode="chat",
)
async def poller2(_cfg: TelegramBridgeConfig):
yield TelegramIncomingMessage(
transport="telegram",
chat_id=-100,
message_id=2,
text="followup",
reply_to_message_id=None,
reply_to_text=None,
sender_id=222,
chat_type="supergroup",
)
await run_main_loop(cfg2, poller2)
assert runner2.calls[0][1] is None
@pytest.mark.anyio
async def test_run_main_loop_new_clears_chat_sessions(tmp_path: Path) -> None:
state_path = tmp_path / "takopi.toml"
store = ChatSessionStore(resolve_sessions_path(state_path))
await store.set_session_resume(
123, None, ResumeToken(engine=CODEX_ENGINE, value="resume-1")
)
transport = _FakeTransport()
bot = _FakeBot()
runner = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE)
exec_cfg = ExecBridgeConfig(
transport=transport,
presenter=MarkdownPresenter(),
final_notify=True,
)
runtime = TransportRuntime(
router=_make_router(runner),
projects=_empty_projects(),
config_path=state_path,
)
cfg = TelegramBridgeConfig(
bot=bot,
runtime=runtime,
chat_id=123,
startup_msg="",
exec_cfg=exec_cfg,
session_mode="chat",
)
async def poller(_cfg: TelegramBridgeConfig):
yield TelegramIncomingMessage(
transport="telegram",
chat_id=123,
message_id=1,
text="/new",
reply_to_message_id=None,
reply_to_text=None,
sender_id=123,
chat_type="private",
)
await run_main_loop(cfg, poller)
store2 = ChatSessionStore(resolve_sessions_path(state_path))
assert await store2.get_session_resume(123, None, CODEX_ENGINE) is None
@pytest.mark.anyio @pytest.mark.anyio
async def test_run_main_loop_replies_in_same_thread() -> None: async def test_run_main_loop_replies_in_same_thread() -> None:
transport = _FakeTransport() transport = _FakeTransport()
+38
View File
@@ -0,0 +1,38 @@
import pytest
from takopi.model import ResumeToken
from takopi.telegram.chat_sessions import ChatSessionStore
@pytest.mark.anyio
async def test_chat_sessions_store_roundtrip(tmp_path) -> None:
path = tmp_path / "telegram_chat_sessions_state.json"
store = ChatSessionStore(path)
await store.set_session_resume(1, None, ResumeToken(engine="codex", value="abc123"))
await store.set_session_resume(1, 42, ResumeToken(engine="claude", value="res-1"))
stored_private = await store.get_session_resume(1, None, "codex")
stored_group = await store.get_session_resume(1, 42, "claude")
assert stored_private == ResumeToken(engine="codex", value="abc123")
assert stored_group == ResumeToken(engine="claude", value="res-1")
store2 = ChatSessionStore(path)
stored_private_2 = await store2.get_session_resume(1, None, "codex")
stored_group_2 = await store2.get_session_resume(1, 42, "claude")
assert stored_private_2 == ResumeToken(engine="codex", value="abc123")
assert stored_group_2 == ResumeToken(engine="claude", value="res-1")
@pytest.mark.anyio
async def test_chat_sessions_store_clear(tmp_path) -> None:
path = tmp_path / "telegram_chat_sessions_state.json"
store = ChatSessionStore(path)
await store.set_session_resume(2, None, ResumeToken(engine="codex", value="one"))
await store.set_session_resume(2, 77, ResumeToken(engine="codex", value="two"))
await store.clear_sessions(2, None)
assert await store.get_session_resume(2, None, "codex") is None
assert await store.get_session_resume(2, 77, "codex") == ResumeToken(
engine="codex",
value="two",
)
+1 -1
View File
@@ -93,7 +93,7 @@ def test_resolve_message_reply_ctx_overrides_ambient() -> None:
resolved = runtime.resolve_message( resolved = runtime.resolve_message(
text="hello", text="hello",
reply_text="`ctx: proj @ reply`", reply_text="`ctx: proj @reply`",
ambient_context=ambient, ambient_context=ambient,
) )