diff --git a/Justfile b/Justfile new file mode 100644 index 0000000..cc7399a --- /dev/null +++ b/Justfile @@ -0,0 +1,5 @@ +check: + uv run ruff format --check + uv run ruff check . + uv run ty check . + uv run pytest diff --git a/Makefile b/Makefile deleted file mode 100644 index 33778f8..0000000 --- a/Makefile +++ /dev/null @@ -1,7 +0,0 @@ -.PHONY: check - -check: - uv run ruff format --check - uv run ruff check . - uv run ty check . - uv run pytest diff --git a/changelog.md b/changelog.md index e4a743c..ad421dc 100644 --- a/changelog.md +++ b/changelog.md @@ -1,5 +1,16 @@ # changelog +## v0.9.0 (unreleased) + +### breaking + +- remove `takopi.bridge`; Telegram bridge now lives in `takopi.bridges.telegram` + +### changes + +- add transport/presenter protocols plus transport-agnostic `exec_bridge` +- move Telegram polling + wiring into `takopi.bridges.telegram` with transport/presenter adapters + ## v0.8.0 (2026-01-05) ### changes diff --git a/docs/developing.md b/docs/developing.md index bd952f6..84a7c14 100644 --- a/docs/developing.md +++ b/docs/developing.md @@ -23,7 +23,7 @@ uv run ruff check src tests uv run ty check . # Or all at once -make check +just check ``` Takopi runs in **auto-router** mode by default. `default_engine` in `takopi.toml` selects @@ -31,47 +31,86 @@ the engine for new threads; engine subcommands override that default for the pro ## Module Responsibilities -### `bridge.py` - Telegram bridge loop +### `runner_bridge.py` - Transport-agnostic orchestration -The orchestrator module containing: +The core handler module containing: | Component | Purpose | |-----------|---------| -| `BridgeConfig` | Frozen dataclass holding runtime config | -| `poll_updates()` | Async generator that drains backlog, long-polls updates, filters messages | -| `run_main_loop()` | TaskGroup-based main loop that spawns per-message handlers | +| `ExecBridgeConfig` | Frozen dataclass holding transport + presenter config | +| `IncomingMessage` | Normalized incoming message shape | | `handle_message()` | Per-message handler with progress updates and final render | | `ProgressEdits` | Throttled progress edit worker | +| `RunningTask` | Cancellation + resume coordination for in-flight runs | + +**Key patterns:** +- Progress edits are best-effort and only run when new events arrive (Telegram outbox handles rate limiting/coalescing) +- Resume tokens are runner-formatted command lines (e.g., `` `codex resume ` ``, `` `claude --resume ` ``, `` `pi --session ` ``) +- Resume lines are stripped from the prompt before invoking the runner +- Errors/cancellation render final status while preserving resume tokens when known + +### `telegram/bridge.py` - Telegram bridge loop + +The Telegram adapter module containing: + +| Component | Purpose | +|-----------|---------| +| `TelegramBridgeConfig` | Frozen dataclass holding bot + router + exec config | +| `TelegramTransport` | `BotClient` → `Transport` adapter | +| `TelegramPresenter` | `ProgressState` → `RenderedMessage` adapter | +| `poll_updates()` | Async generator that drains backlog, long-polls updates, filters messages | +| `run_main_loop()` | TaskGroup-based main loop that spawns per-message handlers | | `_handle_cancel()` | `/cancel` routing | **Key patterns:** - Bridge schedules runs FIFO per thread to avoid concurrent progress messages; runner locks enforce per-thread serialization - `/cancel` routes by reply-to progress message id (accepts extra text) - `/{engine}` on the first line selects the engine for new threads -- Progress edits are throttled to 2s intervals and only run when new events arrive -- Resume tokens are runner-formatted command lines (e.g., `` `codex resume ` ``, `` `claude --resume ` ``, `` `pi --session ` ``) - Resume parsing polls all runners via `AutoRouter.resolve_resume()` and routes to the first match - Bot command menu is synced on startup (`cancel` + engine commands) +### `transport.py` - Transport protocol + +Defines `Transport`, `MessageRef`, `RenderedMessage`, and `SendOptions`. + +### `presenter.py` - Presenter protocol + +Defines a renderer that converts `ProgressState` into `RenderedMessage` outputs. + ### `cli.py` - CLI entry point | Component | Purpose | |-----------|---------| | `run()` / `main()` | Typer CLI entry points | -| `_parse_bridge_config()` | Reads config + builds `BridgeConfig` | +| `_parse_bridge_config()` | Reads config + builds `TelegramBridgeConfig` + `ExecBridgeConfig` | -### `render.py` - Takopi event + Markdown helpers +### `progress.py` - Progress tracking + +| Function/Class | Purpose | +|----------------|---------| +| `ProgressTracker` | Stateful reducer of takopi events into progress snapshots | +| `ProgressState` | Snapshot of actions, resume token, and engine metadata | + +### `markdown.py` - Markdown formatting + +| Function/Class | Purpose | +|----------------|---------| +| `MarkdownFormatter` | Converts `ProgressState` into MarkdownParts | +| `MarkdownPresenter` | `ProgressState` → `RenderedMessage` (markdown text) | +| `MarkdownParts` | Header/body/footer building blocks for markdown output | +| `assemble_markdown_parts()` | Join MarkdownParts into a single markdown string | +| `render_event_cli()` | Format a takopi event for CLI logs | +| `format_elapsed()` | Formats seconds as `Xh Ym`, `Xm Ys`, or `Xs` | + +### `telegram/render.py` - Telegram markdown rendering | Function/Class | Purpose | |----------------|---------| | `render_markdown()` | Markdown → Telegram text + entities | | `trim_body()` | Trim body to 3500 chars (header/footer preserved) | | `prepare_telegram()` | Trim + render Markdown parts for Telegram | -| `ExecProgressRenderer` | Stateful renderer tracking recent actions for progress display | -| `render_event_cli()` | Format a takopi event for CLI logs | -| `format_elapsed()` | Formats seconds as `Xh Ym`, `Xm Ys`, or `Xs` | -### `telegram.py` - Telegram API wrapper +### `telegram/client.py` - Telegram API wrapper | Component | Purpose | |-----------|---------| @@ -184,7 +223,13 @@ Self-documenting msgspec schemas for decoding engine JSONL streams. |----------|---------| | `install_issue()` | Creates `SetupIssue` with install instructions for missing CLI | -### `config.py` - Configuration loading +### `config.py` - Shared configuration errors + +```python +class ConfigError(RuntimeError): ... +``` + +### `telegram/config.py` - Configuration loading ```python def load_telegram_config() -> tuple[dict, Path]: @@ -210,7 +255,7 @@ Environment flags: CLI flag: `--debug` enables debug logging (overrides `TAKOPI_LOG_LEVEL`). -### `onboarding.py` - Setup validation +### `telegram/onboarding.py` - Setup validation ```python def check_setup(backend: EngineBackend) -> SetupResult: @@ -231,15 +276,15 @@ See `docs/adding-a-runner.md` for the full guide and a worked example. ``` Telegram Update ↓ -poll_updates() drains backlog, long-polls, filters chat_id == cfg.chat_id +telegram/bridge.poll_updates() drains backlog, long-polls, filters chat_id == cfg.chat_id ↓ -run_main_loop() spawns tasks in TaskGroup +telegram/bridge.run_main_loop() spawns tasks in TaskGroup ↓ router.resolve_resume(text, reply_text) → ResumeToken | None ↓ -router.entry_for(resume_token) or router.entry_for_engine(default) → RunnerEntry +router.entry_for(resume_token) or router.entry_for_engine(override/default) → RunnerEntry ↓ -handle_message() spawned as task with selected runner +runner_bridge.handle_message() spawned as task with selected runner ↓ Send initial progress message (silent) ↓ @@ -249,14 +294,14 @@ runner.run(prompt, resume_token) ├── Normalizes JSONL -> takopi events ├── Yields Takopi events (async iterator) │ ↓ - │ ExecProgressRenderer.note_event() + │ ProgressTracker.note_event() │ ↓ - │ ProgressEdits throttled edit_message_text() + │ ProgressEdits best-effort transport.edit(wait=False) └── Ends with completed(resume, ok, answer) ↓ render_final() with resume line (runner-formatted) ↓ -Send/edit final message +transport.send()/edit() final message, delete progress if needed ``` ### Resume Flow diff --git a/src/takopi/bridge.py b/src/takopi/bridge.py deleted file mode 100644 index bd7008c..0000000 --- a/src/takopi/bridge.py +++ /dev/null @@ -1,922 +0,0 @@ -"""Telegram bridge orchestration for running runners and streaming progress.""" - -from __future__ import annotations - -import time -from collections.abc import AsyncIterator, Awaitable, Callable -from dataclasses import dataclass, field -from typing import Any - -import anyio - -from .model import CompletedEvent, EngineId, ResumeToken, StartedEvent, TakopiEvent -from .logging import bind_run_context, clear_context, get_logger -from .render import ( - ExecProgressRenderer, - MarkdownParts, - assemble_markdown_parts, - prepare_telegram, - render_event_cli, -) -from .router import AutoRouter, RunnerUnavailableError -from .runner import Runner -from .scheduler import ThreadJob, ThreadScheduler -from .telegram import BotClient - - -logger = get_logger(__name__) - - -def _log_runner_event(evt: TakopiEvent) -> None: - for line in render_event_cli(evt): - logger.debug( - "runner.event.cli", - line=line, - event_type=getattr(evt, "type", None), - engine=getattr(evt, "engine", None), - ) - - -def _is_cancel_command(text: str) -> bool: - stripped = text.strip() - if not stripped: - return False - command = stripped.split(maxsplit=1)[0] - return command == "/cancel" or command.startswith("/cancel@") - - -def _strip_engine_command( - text: str, *, engine_ids: tuple[EngineId, ...] -) -> tuple[str, EngineId | None]: - if not text: - return text, None - - if not engine_ids: - return text, None - - engine_map = {engine.lower(): engine for engine in engine_ids} - lines = text.splitlines() - idx = next((i for i, line in enumerate(lines) if line.strip()), None) - if idx is None: - return text, None - - line = lines[idx].lstrip() - if not line.startswith("/"): - return text, None - - parts = line.split(maxsplit=1) - command = parts[0][1:] - if "@" in command: - command = command.split("@", 1)[0] - engine = engine_map.get(command.lower()) - if engine is None: - return text, None - - remainder = parts[1] if len(parts) > 1 else "" - if remainder: - lines[idx] = remainder - else: - lines.pop(idx) - return "\n".join(lines).strip(), engine - - -def _build_bot_commands(router: AutoRouter) -> list[dict[str, str]]: - commands: list[dict[str, str]] = [] - seen: set[str] = set() - for entry in router.available_entries: - cmd = entry.engine.lower() - if cmd in seen: - continue - commands.append({"command": cmd, "description": f"start {cmd}"}) - seen.add(cmd) - if "cancel" not in seen: - commands.append({"command": "cancel", "description": "cancel run"}) - return commands - - -async def _set_command_menu(cfg: BridgeConfig) -> None: - commands = _build_bot_commands(cfg.router) - if not commands: - return - try: - ok = await cfg.bot.set_my_commands(commands) - except Exception as exc: - logger.info( - "startup.command_menu.failed", - error=str(exc), - error_type=exc.__class__.__name__, - ) - return - if not ok: - logger.info("startup.command_menu.rejected") - return - logger.info( - "startup.command_menu.updated", - commands=[cmd["command"] for cmd in commands], - ) - - -def _strip_resume_lines(text: str, *, is_resume_line: Callable[[str], bool]) -> str: - stripped_lines: list[str] = [] - for line in text.splitlines(): - if is_resume_line(line): - continue - stripped_lines.append(line) - prompt = "\n".join(stripped_lines).strip() - return prompt or "continue" - - -def _flatten_exception_group(error: BaseException) -> list[BaseException]: - if isinstance(error, BaseExceptionGroup): - flattened: list[BaseException] = [] - for exc in error.exceptions: - flattened.extend(_flatten_exception_group(exc)) - return flattened - return [error] - - -def _format_error(error: Exception) -> str: - cancel_exc = anyio.get_cancelled_exc_class() - flattened = [ - exc - for exc in _flatten_exception_group(error) - if not isinstance(exc, cancel_exc) - ] - if len(flattened) == 1: - return str(flattened[0]) or flattened[0].__class__.__name__ - if not flattened: - return str(error) or error.__class__.__name__ - messages = [str(exc) for exc in flattened if str(exc)] - if not messages: - return str(error) or error.__class__.__name__ - if len(messages) == 1: - return messages[0] - return "\n".join(messages) - - -async def _send_or_edit_markdown( - bot: BotClient, - *, - chat_id: int, - parts: MarkdownParts, - edit_message_id: int | None = None, - reply_to_message_id: int | None = None, - replace_message_id: int | None = None, - disable_notification: bool = False, - prepared: tuple[str, list[dict[str, Any]]] | None = None, -) -> tuple[dict[str, Any] | None, bool]: - if prepared is None: - rendered, entities = prepare_telegram(parts) - else: - rendered, entities = prepared - if edit_message_id is not None: - logger.debug( - "telegram.edit_message", - chat_id=chat_id, - message_id=edit_message_id, - rendered=rendered, - ) - edited = await bot.edit_message_text( - chat_id=chat_id, - message_id=edit_message_id, - text=rendered, - entities=entities, - ) - if edited is not None: - return (edited, True) - - logger.debug( - "telegram.send_message", - chat_id=chat_id, - reply_to_message_id=reply_to_message_id, - rendered=rendered, - ) - return ( - await bot.send_message( - chat_id=chat_id, - text=rendered, - entities=entities, - reply_to_message_id=reply_to_message_id, - disable_notification=disable_notification, - replace_message_id=replace_message_id, - ), - False, - ) - - -class ProgressEdits: - def __init__( - self, - *, - bot: BotClient, - chat_id: int, - progress_id: int | None, - renderer: ExecProgressRenderer, - started_at: float, - clock: Callable[[], float], - last_rendered: str | None, - ) -> None: - self.bot = bot - self.chat_id = chat_id - self.progress_id = progress_id - self.renderer = renderer - self.started_at = started_at - self.clock = clock - self.last_rendered = last_rendered - self.event_seq = 0 - self.rendered_seq = 0 - self.signal_send, self.signal_recv = anyio.create_memory_object_stream(1) - - async def run(self) -> None: - if self.progress_id is None: - return - while True: - while self.rendered_seq == self.event_seq: - try: - await self.signal_recv.receive() - except anyio.EndOfStream: - return - - seq_at_render = self.event_seq - now = self.clock() - parts = self.renderer.render_progress_parts(now - self.started_at) - rendered, entities = prepare_telegram(parts) - if rendered != self.last_rendered: - logger.debug( - "telegram.edit_message", - chat_id=self.chat_id, - message_id=self.progress_id, - rendered=rendered, - ) - await self.bot.edit_message_text( - chat_id=self.chat_id, - message_id=self.progress_id, - text=rendered, - entities=entities, - wait=False, - ) - self.last_rendered = rendered - - self.rendered_seq = seq_at_render - - async def on_event(self, evt: TakopiEvent) -> None: - if not self.renderer.note_event(evt): - return - if self.progress_id is None: - return - self.event_seq += 1 - try: - self.signal_send.send_nowait(None) - except anyio.WouldBlock: - pass - except (anyio.BrokenResourceError, anyio.ClosedResourceError): - pass - - -@dataclass(frozen=True) -class BridgeConfig: - bot: BotClient - router: AutoRouter - chat_id: int - final_notify: bool - startup_msg: str - - -@dataclass -class RunningTask: - resume: ResumeToken | None = None - resume_ready: anyio.Event = field(default_factory=anyio.Event) - cancel_requested: anyio.Event = field(default_factory=anyio.Event) - done: anyio.Event = field(default_factory=anyio.Event) - - -async def _send_startup(cfg: BridgeConfig) -> None: - logger.debug("startup.message", text=cfg.startup_msg) - sent, _ = await _send_or_edit_markdown( - cfg.bot, - chat_id=cfg.chat_id, - parts=MarkdownParts(header=cfg.startup_msg), - ) - if sent is not None: - logger.info("startup.sent", chat_id=cfg.chat_id) - - -async def _drain_backlog(cfg: BridgeConfig, offset: int | None) -> int | None: - drained = 0 - while True: - updates = await cfg.bot.get_updates( - offset=offset, timeout_s=0, allowed_updates=["message"] - ) - if updates is None: - logger.info("startup.backlog.failed") - return offset - logger.debug("startup.backlog.updates", updates=updates) - if not updates: - if drained: - logger.info("startup.backlog.drained", count=drained) - return offset - offset = updates[-1]["update_id"] + 1 - drained += len(updates) - - -@dataclass(frozen=True, slots=True) -class ProgressMessageState: - message_id: int | None - last_rendered: str | None - - -async def send_initial_progress( - cfg: BridgeConfig, - *, - chat_id: int, - user_msg_id: int, - label: str, - renderer: ExecProgressRenderer, - clock: Callable[[], float], -) -> ProgressMessageState: - progress_id: int | None = None - last_rendered: str | None = None - - initial_parts = renderer.render_progress_parts(0.0, label=label) - initial_rendered, initial_entities = prepare_telegram(initial_parts) - logger.debug( - "telegram.send_message", - chat_id=chat_id, - reply_to_message_id=user_msg_id, - rendered=initial_rendered, - ) - progress_msg = await cfg.bot.send_message( - chat_id=chat_id, - text=initial_rendered, - entities=initial_entities, - reply_to_message_id=user_msg_id, - disable_notification=True, - ) - if progress_msg is not None: - progress_id = int(progress_msg["message_id"]) - last_rendered = initial_rendered - logger.debug( - "progress.sent", - chat_id=chat_id, - message_id=progress_id, - ) - - return ProgressMessageState( - message_id=progress_id, - last_rendered=last_rendered, - ) - - -@dataclass(slots=True) -class RunOutcome: - cancelled: bool = False - completed: CompletedEvent | None = None - resume: ResumeToken | None = None - - -async def run_runner_with_cancel( - runner: Runner, - *, - prompt: str, - resume_token: ResumeToken | None, - edits: ProgressEdits, - running_task: RunningTask | None, - on_thread_known: Callable[[ResumeToken, anyio.Event], Awaitable[None]] | None, -) -> RunOutcome: - outcome = RunOutcome() - async with anyio.create_task_group() as tg: - - async def run_runner() -> None: - try: - async for evt in runner.run(prompt, resume_token): - _log_runner_event(evt) - if isinstance(evt, StartedEvent): - outcome.resume = evt.resume - bind_run_context(resume=evt.resume.value) - if running_task is not None and running_task.resume is None: - running_task.resume = evt.resume - running_task.resume_ready.set() - if on_thread_known is not None: - await on_thread_known(evt.resume, running_task.done) - elif isinstance(evt, CompletedEvent): - outcome.resume = evt.resume or outcome.resume - outcome.completed = evt - await edits.on_event(evt) - finally: - tg.cancel_scope.cancel() - - async def wait_cancel(task: RunningTask) -> None: - await task.cancel_requested.wait() - outcome.cancelled = True - tg.cancel_scope.cancel() - - tg.start_soon(run_runner) - if running_task is not None: - tg.start_soon(wait_cancel, running_task) - - return outcome - - -def sync_resume_token( - renderer: ExecProgressRenderer, resume: ResumeToken | None -) -> ResumeToken | None: - resume = resume or renderer.resume_token - renderer.resume_token = resume - return resume - - -async def send_result_message( - cfg: BridgeConfig, - *, - chat_id: int, - user_msg_id: int, - progress_id: int | None, - parts: MarkdownParts, - disable_notification: bool, - edit_message_id: int | None, - prepared: tuple[str, list[dict[str, Any]]] | None = None, -) -> None: - final_msg, edited = await _send_or_edit_markdown( - cfg.bot, - chat_id=chat_id, - parts=parts, - edit_message_id=edit_message_id, - reply_to_message_id=user_msg_id, - replace_message_id=progress_id, - disable_notification=disable_notification, - prepared=prepared, - ) - if final_msg is None: - return - - -async def handle_message( - cfg: BridgeConfig, - *, - runner: Runner, - chat_id: int, - user_msg_id: int, - text: str, - resume_token: ResumeToken | None, - strip_resume_line: Callable[[str], bool] | None = None, - running_tasks: dict[int, RunningTask] | None = None, - on_thread_known: Callable[[ResumeToken, anyio.Event], Awaitable[None]] - | None = None, - clock: Callable[[], float] = time.monotonic, -) -> None: - logger.info( - "handle.incoming", - chat_id=chat_id, - user_msg_id=user_msg_id, - resume=resume_token.value if resume_token else None, - text=text, - ) - started_at = clock() - is_resume_line = runner.is_resume_line - resume_strip = strip_resume_line or is_resume_line - runner_text = _strip_resume_lines(text, is_resume_line=resume_strip) - - progress_renderer = ExecProgressRenderer( - max_actions=5, resume_formatter=runner.format_resume, engine=runner.engine - ) - - progress_state = await send_initial_progress( - cfg, - chat_id=chat_id, - user_msg_id=user_msg_id, - label="starting", - renderer=progress_renderer, - clock=clock, - ) - progress_id = progress_state.message_id - - edits = ProgressEdits( - bot=cfg.bot, - chat_id=chat_id, - progress_id=progress_id, - renderer=progress_renderer, - started_at=started_at, - clock=clock, - last_rendered=progress_state.last_rendered, - ) - - running_task: RunningTask | None = None - if running_tasks is not None and progress_id is not None: - running_task = RunningTask() - running_tasks[progress_id] = running_task - - cancel_exc_type = anyio.get_cancelled_exc_class() - edits_scope = anyio.CancelScope() - - async def run_edits() -> None: - try: - with edits_scope: - await edits.run() - except cancel_exc_type: - # Edits are best-effort; cancellation should not bubble into the task group. - return - - outcome = RunOutcome() - error: Exception | None = None - - async with anyio.create_task_group() as tg: - if progress_id is not None: - tg.start_soon(run_edits) - - try: - outcome = await run_runner_with_cancel( - runner, - prompt=runner_text, - resume_token=resume_token, - edits=edits, - running_task=running_task, - on_thread_known=on_thread_known, - ) - except Exception as exc: - error = exc - logger.exception( - "handle.runner_failed", - error=str(exc), - error_type=exc.__class__.__name__, - ) - finally: - if ( - running_task is not None - and running_tasks is not None - and progress_id is not None - ): - running_task.done.set() - running_tasks.pop(progress_id, None) - if not outcome.cancelled and error is None: - # Give pending progress edits a chance to flush if they're ready. - await anyio.sleep(0) - edits_scope.cancel() - - elapsed = clock() - started_at - - if error is not None: - sync_resume_token(progress_renderer, outcome.resume) - err_body = _format_error(error) - final_parts = progress_renderer.render_final_parts( - elapsed, err_body, status="error" - ) - logger.debug( - "handle.error.markdown", - error=err_body, - markdown=assemble_markdown_parts(final_parts), - ) - await send_result_message( - cfg, - chat_id=chat_id, - user_msg_id=user_msg_id, - progress_id=progress_id, - parts=final_parts, - disable_notification=True, - edit_message_id=progress_id, - ) - return - - if outcome.cancelled: - resume = sync_resume_token(progress_renderer, outcome.resume) - logger.info( - "handle.cancelled", - resume=resume.value if resume else None, - elapsed_s=elapsed, - ) - final_parts = progress_renderer.render_progress_parts( - elapsed, label="`cancelled`" - ) - await send_result_message( - cfg, - chat_id=chat_id, - user_msg_id=user_msg_id, - progress_id=progress_id, - parts=final_parts, - disable_notification=True, - edit_message_id=progress_id, - ) - return - - if outcome.completed is None: - raise RuntimeError("runner finished without a completed event") - - completed = outcome.completed - run_ok = completed.ok - run_error = completed.error - - final_answer = completed.answer - if run_ok is False and run_error: - if final_answer.strip(): - final_answer = f"{final_answer}\n\n{run_error}" - else: - final_answer = str(run_error) - - status = ( - "error" if run_ok is False else ("done" if final_answer.strip() else "error") - ) - resume_value = None - resume_token = completed.resume or outcome.resume - if resume_token is not None: - resume_value = resume_token.value - logger.info( - "runner.completed", - ok=run_ok, - error=run_error, - answer_len=len(final_answer or ""), - elapsed_s=round(elapsed, 2), - action_count=progress_renderer.action_count, - resume=resume_value, - ) - sync_resume_token(progress_renderer, completed.resume or outcome.resume) - final_parts = progress_renderer.render_final_parts( - elapsed, final_answer, status=status - ) - logger.debug( - "handle.final.markdown", - markdown=assemble_markdown_parts(final_parts), - status=status, - ) - - final_rendered, final_entities = prepare_telegram(final_parts) - can_edit_final = progress_id is not None - edit_message_id = None if cfg.final_notify or not can_edit_final else progress_id - - await send_result_message( - cfg, - chat_id=chat_id, - user_msg_id=user_msg_id, - progress_id=progress_id, - parts=final_parts, - disable_notification=False, - edit_message_id=edit_message_id, - prepared=(final_rendered, final_entities), - ) - - -async def poll_updates(cfg: BridgeConfig) -> AsyncIterator[dict[str, Any]]: - offset: int | None = None - offset = await _drain_backlog(cfg, offset) - await _send_startup(cfg) - - while True: - updates = await cfg.bot.get_updates( - offset=offset, timeout_s=50, allowed_updates=["message"] - ) - if updates is None: - logger.info("loop.get_updates.failed") - await anyio.sleep(2) - continue - logger.debug("loop.updates", updates=updates) - - for upd in updates: - offset = upd["update_id"] + 1 - msg = upd["message"] - if "text" not in msg: - continue - if msg["chat"]["id"] != cfg.chat_id: - continue - yield msg - - -async def _handle_cancel( - cfg: BridgeConfig, - msg: dict[str, Any], - running_tasks: dict[int, RunningTask], -) -> None: - chat_id = msg["chat"]["id"] - user_msg_id = msg["message_id"] - reply = msg.get("reply_to_message") - - if not reply: - await cfg.bot.send_message( - chat_id=chat_id, - text="reply to the progress message to cancel.", - reply_to_message_id=user_msg_id, - ) - return - - progress_id = reply.get("message_id") - if progress_id is None: - await cfg.bot.send_message( - chat_id=chat_id, - text="nothing is currently running for that message.", - reply_to_message_id=user_msg_id, - ) - return - - running_task = running_tasks.get(int(progress_id)) - if running_task is None: - await cfg.bot.send_message( - chat_id=chat_id, - text="nothing is currently running for that message.", - reply_to_message_id=user_msg_id, - ) - return - - logger.info( - "cancel.requested", - chat_id=chat_id, - progress_message_id=progress_id, - ) - running_task.cancel_requested.set() - - -async def _wait_for_resume(running_task: RunningTask) -> ResumeToken | None: - if running_task.resume is not None: - return running_task.resume - resume: ResumeToken | None = None - - async with anyio.create_task_group() as tg: - - async def wait_resume() -> None: - nonlocal resume - await running_task.resume_ready.wait() - resume = running_task.resume - tg.cancel_scope.cancel() - - async def wait_done() -> None: - await running_task.done.wait() - tg.cancel_scope.cancel() - - tg.start_soon(wait_resume) - tg.start_soon(wait_done) - - return resume - - -async def _send_with_resume( - bot: BotClient, - enqueue: Callable[[int, int, str, ResumeToken], Awaitable[None]], - running_task: RunningTask, - chat_id: int, - user_msg_id: int, - text: str, -) -> None: - resume = await _wait_for_resume(running_task) - if resume is None: - await bot.send_message( - chat_id=chat_id, - text="resume token not ready yet; try replying to the final message.", - reply_to_message_id=user_msg_id, - disable_notification=True, - ) - return - await enqueue(chat_id, user_msg_id, text, resume) - - -async def _send_runner_unavailable( - cfg: BridgeConfig, - *, - chat_id: int, - user_msg_id: int, - resume_token: ResumeToken | None, - runner: Runner, - reason: str, -) -> None: - progress_renderer = ExecProgressRenderer( - max_actions=0, resume_formatter=runner.format_resume, engine=runner.engine - ) - if resume_token is not None: - progress_renderer.resume_token = resume_token - final_parts = progress_renderer.render_final_parts( - 0.0, f"error:\n{reason}", status="error" - ) - await _send_or_edit_markdown( - cfg.bot, - chat_id=chat_id, - parts=final_parts, - reply_to_message_id=user_msg_id, - disable_notification=False, - ) - - -async def run_main_loop( - cfg: BridgeConfig, - poller: Callable[[BridgeConfig], AsyncIterator[dict[str, Any]]] = poll_updates, -) -> None: - running_tasks: dict[int, RunningTask] = {} - - try: - await _set_command_menu(cfg) - async with anyio.create_task_group() as tg: - - async def run_job( - chat_id: int, - user_msg_id: int, - text: str, - resume_token: ResumeToken | None, - on_thread_known: Callable[[ResumeToken, anyio.Event], Awaitable[None]] - | None = None, - engine_override: EngineId | None = None, - ) -> None: - try: - try: - entry = ( - cfg.router.entry_for_engine(engine_override) - if resume_token is None - else cfg.router.entry_for(resume_token) - ) - except RunnerUnavailableError as exc: - await _send_or_edit_markdown( - cfg.bot, - chat_id=chat_id, - parts=MarkdownParts(header=f"error:\n{exc}"), - reply_to_message_id=user_msg_id, - disable_notification=False, - ) - return - if not entry.available: - reason = entry.issue or "engine unavailable" - await _send_runner_unavailable( - cfg, - chat_id=chat_id, - user_msg_id=user_msg_id, - resume_token=resume_token, - runner=entry.runner, - reason=reason, - ) - return - bind_run_context( - chat_id=chat_id, - user_msg_id=user_msg_id, - engine=entry.runner.engine, - resume=resume_token.value if resume_token else None, - ) - await handle_message( - cfg, - runner=entry.runner, - chat_id=chat_id, - user_msg_id=user_msg_id, - text=text, - resume_token=resume_token, - strip_resume_line=cfg.router.is_resume_line, - running_tasks=running_tasks, - on_thread_known=on_thread_known, - ) - except Exception as exc: - logger.exception( - "handle.worker_failed", - error=str(exc), - error_type=exc.__class__.__name__, - ) - finally: - clear_context() - - async def run_thread_job(job: ThreadJob) -> None: - await run_job( - job.chat_id, - job.user_msg_id, - job.text, - job.resume_token, - ) - - scheduler = ThreadScheduler(task_group=tg, run_job=run_thread_job) - - async for msg in poller(cfg): - text = msg["text"] - user_msg_id = msg["message_id"] - - if _is_cancel_command(text): - tg.start_soon(_handle_cancel, cfg, msg, running_tasks) - continue - - text, engine_override = _strip_engine_command( - text, engine_ids=cfg.router.engine_ids - ) - - r = msg.get("reply_to_message") or {} - resume_token = cfg.router.resolve_resume(text, r.get("text")) - reply_id = r.get("message_id") - if resume_token is None and reply_id is not None: - running_task = running_tasks.get(int(reply_id)) - if running_task is None: - await anyio.sleep(0) - running_task = running_tasks.get(int(reply_id)) - if running_task is not None: - tg.start_soon( - _send_with_resume, - cfg.bot, - scheduler.enqueue_resume, - running_task, - msg["chat"]["id"], - user_msg_id, - text, - ) - continue - - if resume_token is None: - tg.start_soon( - run_job, - msg["chat"]["id"], - user_msg_id, - text, - None, - scheduler.note_thread_known, - engine_override, - ) - else: - await scheduler.enqueue_resume( - msg["chat"]["id"], user_msg_id, text, resume_token - ) - finally: - await cfg.bot.close() diff --git a/src/takopi/cli.py b/src/takopi/cli.py index b8fe00e..cdff1f7 100644 --- a/src/takopi/cli.py +++ b/src/takopi/cli.py @@ -11,14 +11,21 @@ import typer from . import __version__ from .backends import EngineBackend -from .bridge import BridgeConfig, run_main_loop -from .config import ConfigError, load_telegram_config +from .config import ConfigError from .engines import get_backend, get_engine_config, list_backends from .lockfile import LockError, LockHandle, acquire_lock, token_fingerprint from .logging import get_logger, setup_logging -from .onboarding import SetupResult, check_setup, interactive_setup +from .telegram.bridge import ( + TelegramBridgeConfig, + TelegramPresenter, + TelegramTransport, + run_main_loop, +) +from .telegram.client import TelegramClient +from .telegram.config import load_telegram_config +from .telegram.onboarding import SetupResult, check_setup, interactive_setup from .router import AutoRouter, RunnerEntry -from .telegram import TelegramClient +from .runner_bridge import ExecBridgeConfig logger = get_logger(__name__) @@ -184,7 +191,7 @@ def _parse_bridge_config( config_path: Path, token: str, chat_id: int, -) -> BridgeConfig: +) -> TelegramBridgeConfig: startup_pwd = os.getcwd() backends = list_backends() @@ -213,13 +220,20 @@ def _parse_bridge_config( ) bot = TelegramClient(token) + transport = TelegramTransport(bot) + presenter = TelegramPresenter() + exec_cfg = ExecBridgeConfig( + transport=transport, + presenter=presenter, + final_notify=final_notify, + ) - return BridgeConfig( + return TelegramBridgeConfig( bot=bot, router=router, chat_id=chat_id, - final_notify=final_notify, startup_msg=startup_msg, + exec_cfg=exec_cfg, ) diff --git a/src/takopi/config.py b/src/takopi/config.py index afff341..dc00535 100644 --- a/src/takopi/config.py +++ b/src/takopi/config.py @@ -1,33 +1,5 @@ from __future__ import annotations -import tomllib -from pathlib import Path - -HOME_CONFIG_PATH = Path.home() / ".takopi" / "takopi.toml" - class ConfigError(RuntimeError): pass - - -def _read_config(cfg_path: Path) -> dict: - try: - raw = cfg_path.read_text(encoding="utf-8") - except FileNotFoundError: - raise ConfigError(f"Missing config file {cfg_path}.") from None - except OSError as e: - raise ConfigError(f"Failed to read config file {cfg_path}: {e}") from e - try: - return tomllib.loads(raw) - except tomllib.TOMLDecodeError as e: - raise ConfigError(f"Malformed TOML in {cfg_path}: {e}") from None - - -def load_telegram_config(path: str | Path | None = None) -> tuple[dict, Path]: - if path: - cfg_path = Path(path).expanduser() - return _read_config(cfg_path), cfg_path - cfg_path = HOME_CONFIG_PATH - if cfg_path.exists() and not cfg_path.is_file(): - raise ConfigError(f"Config path {cfg_path} exists but is not a file.") from None - return _read_config(cfg_path), cfg_path diff --git a/src/takopi/render.py b/src/takopi/markdown.py similarity index 57% rename from src/takopi/render.py rename to src/takopi/markdown.py index 8d66fe7..a184e0d 100644 --- a/src/takopi/render.py +++ b/src/takopi/markdown.py @@ -1,19 +1,12 @@ -"""Pure renderers for Takopi events (no engine-native event handling).""" - from __future__ import annotations -import re import textwrap -from collections import deque -from collections.abc import Callable from dataclasses import dataclass from pathlib import Path -from typing import Any -from markdown_it import MarkdownIt -from sulguk import transform_html - -from .model import Action, ActionEvent, ResumeToken, StartedEvent, TakopiEvent +from .model import Action, ActionEvent, StartedEvent, TakopiEvent +from .progress import ProgressState +from .transport import RenderedMessage from .utils.paths import relativize_path STATUS = {"running": "▸", "update": "↻", "done": "✓", "fail": "✗"} @@ -23,11 +16,8 @@ HARD_BREAK = " \n" MAX_PROGRESS_CMD_LEN = 300 MAX_FILE_CHANGES_INLINE = 3 -_MD_RENDERER = MarkdownIt("commonmark", {"html": False}) -_BULLET_RE = re.compile(r"(?m)^(\s*)•") - -@dataclass(frozen=True) +@dataclass(frozen=True, slots=True) class MarkdownParts: header: str body: str | None = None @@ -40,33 +30,6 @@ def assemble_markdown_parts(parts: MarkdownParts) -> str: ) -def render_markdown(md: str) -> tuple[str, list[dict[str, Any]]]: - html = _MD_RENDERER.render(md or "") - rendered = transform_html(html) - - text = _BULLET_RE.sub(r"\1-", rendered.text) - - entities = [dict(e) for e in rendered.entities] - return text, entities - - -def trim_body(body: str | None) -> str | None: - if not body: - return None - if len(body) > 3500: - body = body[: 3500 - 1] + "…" - return body if body.strip() else None - - -def prepare_telegram(parts: MarkdownParts) -> tuple[str, list[dict[str, Any]]]: - trimmed = MarkdownParts( - header=parts.header or "", - body=trim_body(parts.body), - footer=parts.footer, - ) - return render_markdown(assemble_markdown_parts(trimmed)) - - def format_changed_file_path(path: str, *, base_dir: Path | None = None) -> str: return f"`{relativize_path(path, base_dir=base_dir)}`" @@ -209,115 +172,100 @@ def render_event_cli(event: TakopiEvent) -> list[str]: return [] -@dataclass -class RecentLine: - action_id: str - text: str - completed: bool = False - - -class ExecProgressRenderer: +class MarkdownFormatter: def __init__( self, - engine: str, + *, max_actions: int = 5, command_width: int | None = MAX_PROGRESS_CMD_LEN, - resume_formatter: Callable[[ResumeToken], str] | None = None, ) -> None: self.max_actions = max(0, int(max_actions)) self.command_width = command_width - self.lines: deque[RecentLine] = deque(maxlen=self.max_actions) - self.action_count = 0 - self.seen_action_ids: set[str] = set() - self.resume_token: ResumeToken | None = None - self._resume_formatter = resume_formatter - self.engine = engine - - def note_event(self, event: TakopiEvent) -> bool: - match event: - case StartedEvent(resume=resume): - self.resume_token = resume - return True - case ActionEvent(action=action, phase=phase, ok=ok): - if action.kind == "turn": - return False - action_id = str(action.id or "") - if not action_id: - return False - completed = phase == "completed" - has_open = self.has_open_line(action_id) - is_update = phase == "updated" or (phase == "started" and has_open) - phase_for_line = "updated" if is_update and not completed else phase - line = format_action_line( - action, phase_for_line, ok, command_width=self.command_width - ) - - if action_id not in self.seen_action_ids: - self.seen_action_ids.add(action_id) - self.action_count += 1 - - self.upsert_line(action_id, line=line, completed=completed) - return True - case _: - return False - - def has_open_line(self, action_id: str) -> bool: - return any( - line.action_id == action_id and not line.completed for line in self.lines - ) - - def upsert_line(self, action_id: str, *, line: str, completed: bool) -> None: - for i in range(len(self.lines) - 1, -1, -1): - existing = self.lines[i] - if existing.action_id == action_id and not existing.completed: - self.lines[i] = RecentLine( - action_id=action_id, - text=line, - completed=existing.completed or completed, - ) - return - self.lines.append( - RecentLine(action_id=action_id, text=line, completed=completed) - ) def render_progress_parts( - self, elapsed_s: float, label: str = "working" + self, + state: ProgressState, + *, + elapsed_s: float, + label: str = "working", ) -> MarkdownParts: - step = self.action_count or None + step = state.action_count or None header = format_header( elapsed_s, step, label=label, - engine=self.engine, + engine=state.engine, ) - body = self.assemble_body([line.text for line in self.lines]) - return MarkdownParts(header=header, body=body, footer=self.render_footer()) + body = self._assemble_body(self._format_actions(state)) + return MarkdownParts(header=header, body=body, footer=state.resume_line) def render_final_parts( - self, elapsed_s: float, answer: str, status: str = "done" + self, + state: ProgressState, + *, + elapsed_s: float, + status: str, + answer: str, ) -> MarkdownParts: - step = self.action_count or None + step = state.action_count or None header = format_header( elapsed_s, step, label=status, - engine=self.engine, + engine=state.engine, ) answer = (answer or "").strip() body = answer if answer else None - return MarkdownParts(header=header, body=body, footer=self.render_footer()) + return MarkdownParts(header=header, body=body, footer=state.resume_line) - def render_footer(self) -> str | None: - if not self.resume_token or self._resume_formatter is None: - return None - return self._resume_formatter(self.resume_token) - - @property - def recent_actions(self) -> list[str]: - return [line.text for line in self.lines] + def _format_actions(self, state: ProgressState) -> list[str]: + actions = list(state.actions) + if self.max_actions == 0: + actions = [] + else: + actions = actions[-self.max_actions :] + return [ + format_action_line( + action_state.action, + action_state.display_phase, + action_state.ok, + command_width=self.command_width, + ) + for action_state in actions + ] @staticmethod - def assemble_body(lines: list[str]) -> str | None: + def _assemble_body(lines: list[str]) -> str | None: if not lines: return None return HARD_BREAK.join(lines) + + +class MarkdownPresenter: + def __init__(self, *, formatter: MarkdownFormatter | None = None) -> None: + self._formatter = formatter or MarkdownFormatter() + + def render_progress( + self, + state: ProgressState, + *, + elapsed_s: float, + label: str = "working", + ) -> RenderedMessage: + parts = self._formatter.render_progress_parts( + state, elapsed_s=elapsed_s, label=label + ) + return RenderedMessage(text=assemble_markdown_parts(parts)) + + def render_final( + self, + state: ProgressState, + *, + elapsed_s: float, + status: str, + answer: str, + ) -> RenderedMessage: + parts = self._formatter.render_final_parts( + state, elapsed_s=elapsed_s, status=status, answer=answer + ) + return RenderedMessage(text=assemble_markdown_parts(parts)) diff --git a/src/takopi/presenter.py b/src/takopi/presenter.py new file mode 100644 index 0000000..cc8c29e --- /dev/null +++ b/src/takopi/presenter.py @@ -0,0 +1,25 @@ +from __future__ import annotations + +from typing import Protocol + +from .progress import ProgressState +from .transport import RenderedMessage + + +class Presenter(Protocol): + def render_progress( + self, + state: ProgressState, + *, + elapsed_s: float, + label: str = "working", + ) -> RenderedMessage: ... + + def render_final( + self, + state: ProgressState, + *, + elapsed_s: float, + status: str, + answer: str, + ) -> RenderedMessage: ... diff --git a/src/takopi/progress.py b/src/takopi/progress.py new file mode 100644 index 0000000..d286d8c --- /dev/null +++ b/src/takopi/progress.py @@ -0,0 +1,96 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Callable + +from .model import Action, ActionEvent, ResumeToken, StartedEvent, TakopiEvent + + +@dataclass(frozen=True, slots=True) +class ActionState: + action: Action + phase: str + ok: bool | None + display_phase: str + completed: bool + first_seen: int + last_update: int + + +@dataclass(frozen=True, slots=True) +class ProgressState: + engine: str + action_count: int + actions: tuple[ActionState, ...] + resume: ResumeToken | None + resume_line: str | None + + +class ProgressTracker: + def __init__(self, *, engine: str) -> None: + self.engine = engine + self.resume: ResumeToken | None = None + self.action_count = 0 + self._actions: dict[str, ActionState] = {} + self._seq = 0 + + def note_event(self, event: TakopiEvent) -> bool: + match event: + case StartedEvent(resume=resume): + self.resume = resume + return True + case ActionEvent(action=action, phase=phase, ok=ok): + if action.kind == "turn": + return False + action_id = str(action.id or "") + if not action_id: + return False + completed = phase == "completed" + existing = self._actions.get(action_id) + has_open = existing is not None and not existing.completed + is_update = phase == "updated" or (phase == "started" and has_open) + display_phase = "updated" if is_update and not completed else phase + + self._seq += 1 + seq = self._seq + + if existing is None: + self.action_count += 1 + first_seen = seq + else: + first_seen = existing.first_seen + self._actions[action_id] = ActionState( + action=action, + phase=phase, + ok=ok, + display_phase=display_phase, + completed=completed, + first_seen=first_seen, + last_update=seq, + ) + return True + case _: + return False + + def set_resume(self, resume: ResumeToken | None) -> None: + if resume is not None: + self.resume = resume + + def snapshot( + self, + *, + resume_formatter: Callable[[ResumeToken], str] | None = None, + ) -> ProgressState: + resume_line: str | None = None + if self.resume is not None and resume_formatter is not None: + resume_line = resume_formatter(self.resume) + actions = tuple( + sorted(self._actions.values(), key=lambda item: item.first_seen) + ) + return ProgressState( + engine=self.engine, + action_count=self.action_count, + actions=actions, + resume=self.resume, + resume_line=resume_line, + ) diff --git a/src/takopi/runner_bridge.py b/src/takopi/runner_bridge.py new file mode 100644 index 0000000..9ed6ae5 --- /dev/null +++ b/src/takopi/runner_bridge.py @@ -0,0 +1,575 @@ +from __future__ import annotations + +import time +from collections.abc import Awaitable, Callable +from dataclasses import dataclass, field + +import anyio + +from .logging import bind_run_context, get_logger +from .model import CompletedEvent, ResumeToken, StartedEvent, TakopiEvent +from .presenter import Presenter +from .markdown import render_event_cli +from .runner import Runner +from .progress import ProgressTracker +from .transport import ( + ChannelId, + MessageId, + MessageRef, + RenderedMessage, + SendOptions, + Transport, +) + +logger = get_logger(__name__) + + +def _log_runner_event(evt: TakopiEvent) -> None: + for line in render_event_cli(evt): + logger.debug( + "runner.event.cli", + line=line, + event_type=getattr(evt, "type", None), + engine=getattr(evt, "engine", None), + ) + + +def _strip_resume_lines(text: str, *, is_resume_line: Callable[[str], bool]) -> str: + stripped_lines: list[str] = [] + for line in text.splitlines(): + if is_resume_line(line): + continue + stripped_lines.append(line) + prompt = "\n".join(stripped_lines).strip() + return prompt or "continue" + + +def _flatten_exception_group(error: BaseException) -> list[BaseException]: + if isinstance(error, BaseExceptionGroup): + flattened: list[BaseException] = [] + for exc in error.exceptions: + flattened.extend(_flatten_exception_group(exc)) + return flattened + return [error] + + +def _format_error(error: Exception) -> str: + cancel_exc = anyio.get_cancelled_exc_class() + flattened = [ + exc + for exc in _flatten_exception_group(error) + if not isinstance(exc, cancel_exc) + ] + if len(flattened) == 1: + return str(flattened[0]) or flattened[0].__class__.__name__ + if not flattened: + return str(error) or error.__class__.__name__ + messages = [str(exc) for exc in flattened if str(exc)] + if not messages: + return str(error) or error.__class__.__name__ + if len(messages) == 1: + return messages[0] + return "\n".join(messages) + + +@dataclass(frozen=True, slots=True) +class IncomingMessage: + channel_id: ChannelId + message_id: MessageId + text: str + reply_to: MessageRef | None = None + + +@dataclass(frozen=True) +class ExecBridgeConfig: + transport: Transport + presenter: Presenter + final_notify: bool + + +@dataclass +class RunningTask: + resume: ResumeToken | None = None + resume_ready: anyio.Event = field(default_factory=anyio.Event) + cancel_requested: anyio.Event = field(default_factory=anyio.Event) + done: anyio.Event = field(default_factory=anyio.Event) + + +RunningTasks = dict[MessageRef, RunningTask] + + +async def _send_or_edit_message( + transport: Transport, + *, + channel_id: ChannelId, + message: RenderedMessage, + edit_ref: MessageRef | None = None, + reply_to: MessageRef | None = None, + notify: bool = True, + replace_ref: MessageRef | None = None, +) -> tuple[MessageRef | None, bool]: + msg = message + if edit_ref is not None: + logger.debug( + "transport.edit_message", + channel_id=edit_ref.channel_id, + message_id=edit_ref.message_id, + rendered=msg.text, + ) + edited = await transport.edit(ref=edit_ref, message=msg) + if edited is not None: + return edited, True + + logger.debug( + "transport.send_message", + channel_id=channel_id, + reply_to_message_id=reply_to.message_id if reply_to else None, + rendered=msg.text, + ) + sent = await transport.send( + channel_id=channel_id, + message=msg, + options=SendOptions( + reply_to=reply_to, + notify=notify, + replace=replace_ref, + ), + ) + return sent, False + + +class ProgressEdits: + def __init__( + self, + *, + transport: Transport, + presenter: Presenter, + channel_id: ChannelId, + progress_ref: MessageRef | None, + tracker: ProgressTracker, + started_at: float, + clock: Callable[[], float], + last_rendered: RenderedMessage | None, + resume_formatter: Callable[[ResumeToken], str] | None = None, + label: str = "working", + ) -> None: + self.transport = transport + self.presenter = presenter + self.channel_id = channel_id + self.progress_ref = progress_ref + self.tracker = tracker + self.started_at = started_at + self.clock = clock + self.last_rendered = last_rendered + self.resume_formatter = resume_formatter + self.label = label + self.event_seq = 0 + self.rendered_seq = 0 + self.signal_send, self.signal_recv = anyio.create_memory_object_stream(1) + + async def run(self) -> None: + if self.progress_ref is None: + return + while True: + while self.rendered_seq == self.event_seq: + try: + await self.signal_recv.receive() + except anyio.EndOfStream: + return + + seq_at_render = self.event_seq + now = self.clock() + state = self.tracker.snapshot(resume_formatter=self.resume_formatter) + rendered = self.presenter.render_progress( + state, elapsed_s=now - self.started_at, label=self.label + ) + if rendered != self.last_rendered: + logger.debug( + "transport.edit_message", + channel_id=self.channel_id, + message_id=self.progress_ref.message_id, + rendered=rendered.text, + ) + edited = await self.transport.edit( + ref=self.progress_ref, + message=rendered, + wait=False, + ) + if edited is not None: + self.last_rendered = rendered + + self.rendered_seq = seq_at_render + + async def on_event(self, evt: TakopiEvent) -> None: + if not self.tracker.note_event(evt): + return + if self.progress_ref is None: + return + self.event_seq += 1 + try: + self.signal_send.send_nowait(None) + except anyio.WouldBlock: + pass + except (anyio.BrokenResourceError, anyio.ClosedResourceError): + pass + + +@dataclass(frozen=True, slots=True) +class ProgressMessageState: + ref: MessageRef | None + last_rendered: RenderedMessage | None + + +async def send_initial_progress( + cfg: ExecBridgeConfig, + *, + channel_id: ChannelId, + reply_to: MessageRef, + label: str, + tracker: ProgressTracker, + resume_formatter: Callable[[ResumeToken], str] | None = None, +) -> ProgressMessageState: + progress_ref: MessageRef | None = None + last_rendered: RenderedMessage | None = None + + state = tracker.snapshot(resume_formatter=resume_formatter) + initial_rendered = cfg.presenter.render_progress( + state, + elapsed_s=0.0, + label=label, + ) + logger.debug( + "transport.send_message", + channel_id=channel_id, + reply_to_message_id=reply_to.message_id, + rendered=initial_rendered.text, + ) + progress_ref = await cfg.transport.send( + channel_id=channel_id, + message=initial_rendered, + options=SendOptions(reply_to=reply_to, notify=False), + ) + if progress_ref is not None: + last_rendered = initial_rendered + logger.debug( + "progress.sent", + channel_id=progress_ref.channel_id, + message_id=progress_ref.message_id, + ) + + return ProgressMessageState( + ref=progress_ref, + last_rendered=last_rendered, + ) + + +@dataclass(slots=True) +class RunOutcome: + cancelled: bool = False + completed: CompletedEvent | None = None + resume: ResumeToken | None = None + + +async def run_runner_with_cancel( + runner: Runner, + *, + prompt: str, + resume_token: ResumeToken | None, + edits: ProgressEdits, + running_task: RunningTask | None, + on_thread_known: Callable[[ResumeToken, anyio.Event], Awaitable[None]] | None, +) -> RunOutcome: + outcome = RunOutcome() + async with anyio.create_task_group() as tg: + + async def run_runner() -> None: + try: + async for evt in runner.run(prompt, resume_token): + _log_runner_event(evt) + if isinstance(evt, StartedEvent): + outcome.resume = evt.resume + bind_run_context(resume=evt.resume.value) + if running_task is not None and running_task.resume is None: + running_task.resume = evt.resume + running_task.resume_ready.set() + if on_thread_known is not None: + await on_thread_known(evt.resume, running_task.done) + elif isinstance(evt, CompletedEvent): + outcome.resume = evt.resume or outcome.resume + outcome.completed = evt + await edits.on_event(evt) + finally: + tg.cancel_scope.cancel() + + async def wait_cancel(task: RunningTask) -> None: + await task.cancel_requested.wait() + outcome.cancelled = True + tg.cancel_scope.cancel() + + tg.start_soon(run_runner) + if running_task is not None: + tg.start_soon(wait_cancel, running_task) + + return outcome + + +def sync_resume_token( + tracker: ProgressTracker, resume: ResumeToken | None +) -> ResumeToken | None: + resume = resume or tracker.resume + tracker.set_resume(resume) + return resume + + +async def send_result_message( + cfg: ExecBridgeConfig, + *, + channel_id: ChannelId, + reply_to: MessageRef, + progress_ref: MessageRef | None, + message: RenderedMessage, + notify: bool, + edit_ref: MessageRef | None, + replace_ref: MessageRef | None = None, + delete_tag: str = "final", +) -> None: + final_msg, edited = await _send_or_edit_message( + cfg.transport, + channel_id=channel_id, + message=message, + edit_ref=edit_ref, + reply_to=reply_to, + notify=notify, + replace_ref=replace_ref, + ) + if final_msg is None: + return + if ( + progress_ref is not None + and (edit_ref is None or not edited) + and replace_ref is None + ): + logger.debug( + "transport.delete_message", + channel_id=progress_ref.channel_id, + message_id=progress_ref.message_id, + tag=delete_tag, + ) + await cfg.transport.delete(ref=progress_ref) + + +async def handle_message( + cfg: ExecBridgeConfig, + *, + runner: Runner, + incoming: IncomingMessage, + resume_token: ResumeToken | None, + strip_resume_line: Callable[[str], bool] | None = None, + running_tasks: RunningTasks | None = None, + on_thread_known: Callable[[ResumeToken, anyio.Event], Awaitable[None]] + | None = None, + clock: Callable[[], float] = time.monotonic, +) -> None: + logger.info( + "handle.incoming", + channel_id=incoming.channel_id, + user_msg_id=incoming.message_id, + resume=resume_token.value if resume_token else None, + text=incoming.text, + ) + started_at = clock() + is_resume_line = runner.is_resume_line + resume_strip = strip_resume_line or is_resume_line + runner_text = _strip_resume_lines(incoming.text, is_resume_line=resume_strip) + + progress_tracker = ProgressTracker(engine=runner.engine) + + user_ref = MessageRef( + channel_id=incoming.channel_id, + message_id=incoming.message_id, + ) + progress_state = await send_initial_progress( + cfg, + channel_id=incoming.channel_id, + reply_to=user_ref, + label="starting", + tracker=progress_tracker, + resume_formatter=runner.format_resume, + ) + progress_ref = progress_state.ref + + edits = ProgressEdits( + transport=cfg.transport, + presenter=cfg.presenter, + channel_id=incoming.channel_id, + progress_ref=progress_ref, + tracker=progress_tracker, + started_at=started_at, + clock=clock, + last_rendered=progress_state.last_rendered, + resume_formatter=runner.format_resume, + ) + + running_task: RunningTask | None = None + if running_tasks is not None and progress_ref is not None: + running_task = RunningTask() + running_tasks[progress_ref] = running_task + + cancel_exc_type = anyio.get_cancelled_exc_class() + edits_scope = anyio.CancelScope() + + async def run_edits() -> None: + try: + with edits_scope: + await edits.run() + except cancel_exc_type: + # Edits are best-effort; cancellation should not bubble into the task group. + return + + outcome = RunOutcome() + error: Exception | None = None + + async with anyio.create_task_group() as tg: + if progress_ref is not None: + tg.start_soon(run_edits) + + try: + outcome = await run_runner_with_cancel( + runner, + prompt=runner_text, + resume_token=resume_token, + edits=edits, + running_task=running_task, + on_thread_known=on_thread_known, + ) + except Exception as exc: + error = exc + logger.exception( + "handle.runner_failed", + error=str(exc), + error_type=exc.__class__.__name__, + ) + finally: + if running_task is not None and running_tasks is not None: + running_task.done.set() + if progress_ref is not None: + running_tasks.pop(progress_ref, None) + if not outcome.cancelled and error is None: + # Give pending progress edits a chance to flush if they're ready. + await anyio.sleep(0) + edits_scope.cancel() + + elapsed = clock() - started_at + + if error is not None: + sync_resume_token(progress_tracker, outcome.resume) + err_body = _format_error(error) + state = progress_tracker.snapshot(resume_formatter=runner.format_resume) + final_rendered = cfg.presenter.render_final( + state, + elapsed_s=elapsed, + status="error", + answer=err_body, + ) + logger.debug( + "handle.error.rendered", + error=err_body, + rendered=final_rendered.text, + ) + await send_result_message( + cfg, + channel_id=incoming.channel_id, + reply_to=user_ref, + progress_ref=progress_ref, + message=final_rendered, + notify=False, + edit_ref=progress_ref, + replace_ref=progress_ref, + delete_tag="error", + ) + return + + if outcome.cancelled: + resume = sync_resume_token(progress_tracker, outcome.resume) + logger.info( + "handle.cancelled", + resume=resume.value if resume else None, + elapsed_s=elapsed, + ) + state = progress_tracker.snapshot(resume_formatter=runner.format_resume) + final_rendered = cfg.presenter.render_progress( + state, + elapsed_s=elapsed, + label="`cancelled`", + ) + await send_result_message( + cfg, + channel_id=incoming.channel_id, + reply_to=user_ref, + progress_ref=progress_ref, + message=final_rendered, + notify=False, + edit_ref=progress_ref, + replace_ref=progress_ref, + delete_tag="cancel", + ) + return + + if outcome.completed is None: + raise RuntimeError("runner finished without a completed event") + + completed = outcome.completed + run_ok = completed.ok + run_error = completed.error + + final_answer = completed.answer + if run_ok is False and run_error: + if final_answer.strip(): + final_answer = f"{final_answer}\n\n{run_error}" + else: + final_answer = str(run_error) + + status = ( + "error" if run_ok is False else ("done" if final_answer.strip() else "error") + ) + resume_value = None + resume_token = completed.resume or outcome.resume + if resume_token is not None: + resume_value = resume_token.value + logger.info( + "runner.completed", + ok=run_ok, + error=run_error, + answer_len=len(final_answer or ""), + elapsed_s=round(elapsed, 2), + action_count=progress_tracker.action_count, + resume=resume_value, + ) + sync_resume_token(progress_tracker, completed.resume or outcome.resume) + state = progress_tracker.snapshot(resume_formatter=runner.format_resume) + final_rendered = cfg.presenter.render_final( + state, + elapsed_s=elapsed, + status=status, + answer=final_answer, + ) + logger.debug( + "handle.final.rendered", + rendered=final_rendered.text, + status=status, + ) + + can_edit_final = progress_ref is not None + edit_ref = None if cfg.final_notify or not can_edit_final else progress_ref + + await send_result_message( + cfg, + channel_id=incoming.channel_id, + reply_to=user_ref, + progress_ref=progress_ref, + message=final_rendered, + notify=cfg.final_notify, + edit_ref=edit_ref, + replace_ref=progress_ref, + delete_tag="final", + ) diff --git a/src/takopi/telegram/__init__.py b/src/takopi/telegram/__init__.py new file mode 100644 index 0000000..f620aaa --- /dev/null +++ b/src/takopi/telegram/__init__.py @@ -0,0 +1 @@ +"""Telegram-specific clients and adapters.""" diff --git a/src/takopi/telegram/bridge.py b/src/takopi/telegram/bridge.py new file mode 100644 index 0000000..27d5fcf --- /dev/null +++ b/src/takopi/telegram/bridge.py @@ -0,0 +1,569 @@ +from __future__ import annotations + +from collections.abc import AsyncIterator, Awaitable, Callable +from dataclasses import dataclass +from typing import Any + +import anyio + +from ..runner_bridge import ( + ExecBridgeConfig, + IncomingMessage, + RunningTask, + RunningTasks, + handle_message, +) +from ..logging import bind_run_context, clear_context, get_logger +from ..markdown import MarkdownFormatter, MarkdownParts +from ..model import EngineId, ResumeToken +from ..progress import ProgressState, ProgressTracker +from ..router import AutoRouter, RunnerUnavailableError +from ..runner import Runner +from ..scheduler import ThreadJob, ThreadScheduler +from ..transport import MessageRef, RenderedMessage, SendOptions, Transport +from .client import BotClient +from .render import prepare_telegram + +logger = get_logger(__name__) + + +def _is_cancel_command(text: str) -> bool: + stripped = text.strip() + if not stripped: + return False + command = stripped.split(maxsplit=1)[0] + return command == "/cancel" or command.startswith("/cancel@") + + +def _strip_engine_command( + text: str, *, engine_ids: tuple[EngineId, ...] +) -> tuple[str, EngineId | None]: + if not text: + return text, None + + if not engine_ids: + return text, None + + engine_map = {engine.lower(): engine for engine in engine_ids} + lines = text.splitlines() + idx = next((i for i, line in enumerate(lines) if line.strip()), None) + if idx is None: + return text, None + + line = lines[idx].lstrip() + if not line.startswith("/"): + return text, None + + parts = line.split(maxsplit=1) + command = parts[0][1:] + if "@" in command: + command = command.split("@", 1)[0] + engine = engine_map.get(command.lower()) + if engine is None: + return text, None + + remainder = parts[1] if len(parts) > 1 else "" + if remainder: + lines[idx] = remainder + else: + lines.pop(idx) + return "\n".join(lines).strip(), engine + + +def _build_bot_commands(router: AutoRouter) -> list[dict[str, str]]: + commands: list[dict[str, str]] = [] + seen: set[str] = set() + for entry in router.available_entries: + cmd = entry.engine.lower() + if cmd in seen: + continue + commands.append({"command": cmd, "description": f"start {cmd}"}) + seen.add(cmd) + if "cancel" not in seen: + commands.append({"command": "cancel", "description": "cancel run"}) + return commands + + +async def _set_command_menu(cfg: TelegramBridgeConfig) -> None: + commands = _build_bot_commands(cfg.router) + if not commands: + return + try: + ok = await cfg.bot.set_my_commands(commands) + except Exception as exc: + logger.info( + "startup.command_menu.failed", + error=str(exc), + error_type=exc.__class__.__name__, + ) + return + if not ok: + logger.info("startup.command_menu.rejected") + return + logger.info( + "startup.command_menu.updated", + commands=[cmd["command"] for cmd in commands], + ) + + +class TelegramPresenter: + def __init__(self, *, formatter: MarkdownFormatter | None = None) -> None: + self._formatter = formatter or MarkdownFormatter() + + def render_progress( + self, + state: ProgressState, + *, + elapsed_s: float, + label: str = "working", + ) -> RenderedMessage: + parts = self._formatter.render_progress_parts( + state, elapsed_s=elapsed_s, label=label + ) + text, entities = prepare_telegram(parts) + return RenderedMessage(text=text, extra={"entities": entities}) + + def render_final( + self, + state: ProgressState, + *, + elapsed_s: float, + status: str, + answer: str, + ) -> RenderedMessage: + parts = self._formatter.render_final_parts( + state, elapsed_s=elapsed_s, status=status, answer=answer + ) + text, entities = prepare_telegram(parts) + return RenderedMessage(text=text, extra={"entities": entities}) + + +def _as_int(value: int | str, *, label: str) -> int: + if isinstance(value, bool) or not isinstance(value, int): + raise TypeError(f"Telegram {label} must be int") + return value + + +class TelegramTransport: + def __init__(self, bot: BotClient) -> None: + self._bot = bot + + async def close(self) -> None: + await self._bot.close() + + async def send( + self, + *, + channel_id: int | str, + message: RenderedMessage, + options: SendOptions | None = None, + ) -> MessageRef | None: + chat_id = _as_int(channel_id, label="chat_id") + reply_to_message_id: int | None = None + replace_message_id: int | None = None + disable_notification = None + if options is not None: + disable_notification = not options.notify + if options.reply_to is not None: + reply_to_message_id = _as_int( + options.reply_to.message_id, label="reply_to_message_id" + ) + if options.replace is not None: + replace_message_id = _as_int( + options.replace.message_id, label="replace_message_id" + ) + entities = message.extra.get("entities") + parse_mode = message.extra.get("parse_mode") + sent = await self._bot.send_message( + chat_id=chat_id, + text=message.text, + reply_to_message_id=reply_to_message_id, + disable_notification=disable_notification, + entities=entities, + parse_mode=parse_mode, + replace_message_id=replace_message_id, + ) + if sent is None: + return None + message_id = sent.get("message_id") + if message_id is None: + return None + return MessageRef( + channel_id=chat_id, + message_id=_as_int(message_id, label="message_id"), + raw=sent, + ) + + async def edit( + self, *, ref: MessageRef, message: RenderedMessage, wait: bool = True + ) -> MessageRef | None: + chat_id = _as_int(ref.channel_id, label="chat_id") + message_id = _as_int(ref.message_id, label="message_id") + entities = message.extra.get("entities") + parse_mode = message.extra.get("parse_mode") + edited = await self._bot.edit_message_text( + chat_id=chat_id, + message_id=message_id, + text=message.text, + entities=entities, + parse_mode=parse_mode, + wait=wait, + ) + if edited is None: + return ref if not wait else None + message_id = edited.get("message_id", message_id) + return MessageRef( + channel_id=chat_id, + message_id=_as_int(message_id, label="message_id"), + raw=edited, + ) + + async def delete(self, *, ref: MessageRef) -> bool: + return await self._bot.delete_message( + chat_id=_as_int(ref.channel_id, label="chat_id"), + message_id=_as_int(ref.message_id, label="message_id"), + ) + + +@dataclass(frozen=True) +class TelegramBridgeConfig: + bot: BotClient + router: AutoRouter + chat_id: int + startup_msg: str + exec_cfg: ExecBridgeConfig + + +async def _send_plain( + transport: Transport, + *, + chat_id: int, + user_msg_id: int, + text: str, + notify: bool = True, +) -> None: + reply_to = MessageRef(channel_id=chat_id, message_id=user_msg_id) + await transport.send( + channel_id=chat_id, + message=RenderedMessage(text=text), + options=SendOptions(reply_to=reply_to, notify=notify), + ) + + +async def _send_startup(cfg: TelegramBridgeConfig) -> None: + logger.debug("startup.message", text=cfg.startup_msg) + parts = MarkdownParts(header=cfg.startup_msg) + text, entities = prepare_telegram(parts) + message = RenderedMessage(text=text, extra={"entities": entities}) + sent = await cfg.exec_cfg.transport.send( + channel_id=cfg.chat_id, + message=message, + ) + if sent is not None: + logger.info("startup.sent", chat_id=cfg.chat_id) + + +async def _drain_backlog(cfg: TelegramBridgeConfig, offset: int | None) -> int | None: + drained = 0 + while True: + updates = await cfg.bot.get_updates( + offset=offset, timeout_s=0, allowed_updates=["message"] + ) + if updates is None: + logger.info("startup.backlog.failed") + return offset + logger.debug("startup.backlog.updates", updates=updates) + if not updates: + if drained: + logger.info("startup.backlog.drained", count=drained) + return offset + offset = updates[-1]["update_id"] + 1 + drained += len(updates) + + +async def poll_updates(cfg: TelegramBridgeConfig) -> AsyncIterator[dict[str, Any]]: + offset: int | None = None + offset = await _drain_backlog(cfg, offset) + await _send_startup(cfg) + + while True: + updates = await cfg.bot.get_updates( + offset=offset, timeout_s=50, allowed_updates=["message"] + ) + if updates is None: + logger.info("loop.get_updates.failed") + await anyio.sleep(2) + continue + logger.debug("loop.updates", updates=updates) + + for upd in updates: + offset = upd["update_id"] + 1 + msg = upd["message"] + if "text" not in msg: + continue + if msg["chat"]["id"] != cfg.chat_id: + continue + yield msg + + +async def _handle_cancel( + cfg: TelegramBridgeConfig, + msg: dict[str, Any], + running_tasks: RunningTasks, +) -> None: + chat_id = msg["chat"]["id"] + user_msg_id = msg["message_id"] + reply = msg.get("reply_to_message") + + if not reply: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=chat_id, + user_msg_id=user_msg_id, + text="reply to the progress message to cancel.", + ) + return + + progress_id = reply.get("message_id") + if progress_id is None: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=chat_id, + user_msg_id=user_msg_id, + text="nothing is currently running for that message.", + ) + return + + progress_ref = MessageRef(channel_id=chat_id, message_id=progress_id) + running_task = running_tasks.get(progress_ref) + if running_task is None: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=chat_id, + user_msg_id=user_msg_id, + text="nothing is currently running for that message.", + ) + return + + logger.info( + "cancel.requested", + chat_id=chat_id, + progress_message_id=progress_id, + ) + running_task.cancel_requested.set() + + +async def _wait_for_resume(running_task: RunningTask) -> ResumeToken | None: + if running_task.resume is not None: + return running_task.resume + resume: ResumeToken | None = None + + async with anyio.create_task_group() as tg: + + async def wait_resume() -> None: + nonlocal resume + await running_task.resume_ready.wait() + resume = running_task.resume + tg.cancel_scope.cancel() + + async def wait_done() -> None: + await running_task.done.wait() + tg.cancel_scope.cancel() + + tg.start_soon(wait_resume) + tg.start_soon(wait_done) + + return resume + + +async def _send_with_resume( + cfg: TelegramBridgeConfig, + enqueue: Callable[[int, int, str, ResumeToken], Awaitable[None]], + running_task: RunningTask, + chat_id: int, + user_msg_id: int, + text: str, +) -> None: + resume = await _wait_for_resume(running_task) + if resume is None: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=chat_id, + user_msg_id=user_msg_id, + text="resume token not ready yet; try replying to the final message.", + notify=False, + ) + return + await enqueue(chat_id, user_msg_id, text, resume) + + +async def _send_runner_unavailable( + cfg: TelegramBridgeConfig, + *, + chat_id: int, + user_msg_id: int, + resume_token: ResumeToken | None, + runner: Runner, + reason: str, +) -> None: + tracker = ProgressTracker(engine=runner.engine) + tracker.set_resume(resume_token) + state = tracker.snapshot(resume_formatter=runner.format_resume) + message = cfg.exec_cfg.presenter.render_final( + state, + elapsed_s=0.0, + status="error", + answer=f"error:\n{reason}", + ) + reply_to = MessageRef(channel_id=chat_id, message_id=user_msg_id) + await cfg.exec_cfg.transport.send( + channel_id=chat_id, + message=message, + options=SendOptions(reply_to=reply_to, notify=True), + ) + + +async def run_main_loop( + cfg: TelegramBridgeConfig, + poller: Callable[ + [TelegramBridgeConfig], AsyncIterator[dict[str, Any]] + ] = poll_updates, +) -> None: + running_tasks: RunningTasks = {} + + try: + await _set_command_menu(cfg) + async with anyio.create_task_group() as tg: + + async def run_job( + chat_id: int, + user_msg_id: int, + text: str, + resume_token: ResumeToken | None, + reply_ref: MessageRef | None = None, + on_thread_known: Callable[[ResumeToken, anyio.Event], Awaitable[None]] + | None = None, + engine_override: EngineId | None = None, + ) -> None: + try: + try: + entry = ( + cfg.router.entry_for_engine(engine_override) + if resume_token is None + else cfg.router.entry_for(resume_token) + ) + except RunnerUnavailableError as exc: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=chat_id, + user_msg_id=user_msg_id, + text=f"error:\n{exc}", + ) + return + if not entry.available: + reason = entry.issue or "engine unavailable" + await _send_runner_unavailable( + cfg, + chat_id=chat_id, + user_msg_id=user_msg_id, + resume_token=resume_token, + runner=entry.runner, + reason=reason, + ) + return + bind_run_context( + chat_id=chat_id, + user_msg_id=user_msg_id, + engine=entry.runner.engine, + resume=resume_token.value if resume_token else None, + ) + incoming = IncomingMessage( + channel_id=chat_id, + message_id=user_msg_id, + text=text, + reply_to=reply_ref, + ) + await handle_message( + cfg.exec_cfg, + runner=entry.runner, + incoming=incoming, + resume_token=resume_token, + strip_resume_line=cfg.router.is_resume_line, + running_tasks=running_tasks, + on_thread_known=on_thread_known, + ) + except Exception as exc: + logger.exception( + "handle.worker_failed", + error=str(exc), + error_type=exc.__class__.__name__, + ) + finally: + clear_context() + + async def run_thread_job(job: ThreadJob) -> None: + await run_job( + job.chat_id, + job.user_msg_id, + job.text, + job.resume_token, + None, + ) + + scheduler = ThreadScheduler(task_group=tg, run_job=run_thread_job) + + async for msg in poller(cfg): + text = msg["text"] + user_msg_id = msg["message_id"] + chat_id = msg["chat"]["id"] + reply_ref = None + reply_msg = msg.get("reply_to_message") + if reply_msg: + reply_id = reply_msg.get("message_id") + if reply_id is not None: + reply_ref = MessageRef(channel_id=chat_id, message_id=reply_id) + + if _is_cancel_command(text): + tg.start_soon(_handle_cancel, cfg, msg, running_tasks) + continue + + text, engine_override = _strip_engine_command( + text, engine_ids=cfg.router.engine_ids + ) + + r = msg.get("reply_to_message") or {} + resume_token = cfg.router.resolve_resume(text, r.get("text")) + reply_id = r.get("message_id") + if resume_token is None and reply_id is not None: + running_task = running_tasks.get( + MessageRef(channel_id=chat_id, message_id=reply_id) + ) + if running_task is not None: + tg.start_soon( + _send_with_resume, + cfg, + scheduler.enqueue_resume, + running_task, + chat_id, + user_msg_id, + text, + ) + continue + + if resume_token is None: + tg.start_soon( + run_job, + chat_id, + user_msg_id, + text, + None, + reply_ref, + scheduler.note_thread_known, + engine_override, + ) + else: + await scheduler.enqueue_resume( + chat_id, user_msg_id, text, resume_token + ) + finally: + await cfg.exec_cfg.transport.close() diff --git a/src/takopi/telegram.py b/src/takopi/telegram/client.py similarity index 99% rename from src/takopi/telegram.py rename to src/takopi/telegram/client.py index a25394f..52e3c69 100644 --- a/src/takopi/telegram.py +++ b/src/takopi/telegram/client.py @@ -9,7 +9,7 @@ import httpx import anyio -from .logging import get_logger +from ..logging import get_logger logger = get_logger(__name__) diff --git a/src/takopi/telegram/config.py b/src/takopi/telegram/config.py new file mode 100644 index 0000000..b395e0e --- /dev/null +++ b/src/takopi/telegram/config.py @@ -0,0 +1,31 @@ +from __future__ import annotations + +import tomllib +from pathlib import Path + +from ..config import ConfigError + +HOME_CONFIG_PATH = Path.home() / ".takopi" / "takopi.toml" + + +def _read_config(cfg_path: Path) -> dict: + try: + raw = cfg_path.read_text(encoding="utf-8") + except FileNotFoundError: + raise ConfigError(f"Missing config file {cfg_path}.") from None + except OSError as e: + raise ConfigError(f"Failed to read config file {cfg_path}: {e}") from e + try: + return tomllib.loads(raw) + except tomllib.TOMLDecodeError as e: + raise ConfigError(f"Malformed TOML in {cfg_path}: {e}") from None + + +def load_telegram_config(path: str | Path | None = None) -> tuple[dict, Path]: + if path: + cfg_path = Path(path).expanduser() + return _read_config(cfg_path), cfg_path + cfg_path = HOME_CONFIG_PATH + if cfg_path.exists() and not cfg_path.is_file(): + raise ConfigError(f"Config path {cfg_path} exists but is not a file.") from None + return _read_config(cfg_path), cfg_path diff --git a/src/takopi/onboarding.py b/src/takopi/telegram/onboarding.py similarity index 97% rename from src/takopi/onboarding.py rename to src/takopi/telegram/onboarding.py index 7d7790c..8de6ad8 100644 --- a/src/takopi/onboarding.py +++ b/src/takopi/telegram/onboarding.py @@ -20,12 +20,13 @@ from rich.console import Console from rich.panel import Panel from rich.table import Table -from .backends import EngineBackend, SetupIssue -from .backends_helpers import install_issue -from .config import ConfigError, HOME_CONFIG_PATH, load_telegram_config -from .engines import list_backends -from .logging import suppress_logs -from .telegram import TelegramClient, TelegramRetryAfter +from ..backends import EngineBackend, SetupIssue +from ..backends_helpers import install_issue +from ..config import ConfigError +from ..engines import list_backends +from ..logging import suppress_logs +from .client import TelegramClient, TelegramRetryAfter +from .config import HOME_CONFIG_PATH, load_telegram_config @dataclass(slots=True) diff --git a/src/takopi/telegram/render.py b/src/takopi/telegram/render.py new file mode 100644 index 0000000..cc5283f --- /dev/null +++ b/src/takopi/telegram/render.py @@ -0,0 +1,39 @@ +from __future__ import annotations + +import re +from typing import Any + +from markdown_it import MarkdownIt +from sulguk import transform_html + +from ..markdown import MarkdownParts, assemble_markdown_parts + +_MD_RENDERER = MarkdownIt("commonmark", {"html": False}) +_BULLET_RE = re.compile(r"(?m)^(\s*)•") + + +def render_markdown(md: str) -> tuple[str, list[dict[str, Any]]]: + html = _MD_RENDERER.render(md or "") + rendered = transform_html(html) + + text = _BULLET_RE.sub(r"\1-", rendered.text) + + entities = [dict(e) for e in rendered.entities] + return text, entities + + +def trim_body(body: str | None) -> str | None: + if not body: + return None + if len(body) > 3500: + body = body[: 3500 - 1] + "…" + return body if body.strip() else None + + +def prepare_telegram(parts: MarkdownParts) -> tuple[str, list[dict[str, Any]]]: + trimmed = MarkdownParts( + header=parts.header or "", + body=trim_body(parts.body), + footer=parts.footer, + ) + return render_markdown(assemble_markdown_parts(trimmed)) diff --git a/src/takopi/transport.py b/src/takopi/transport.py new file mode 100644 index 0000000..b0a8789 --- /dev/null +++ b/src/takopi/transport.py @@ -0,0 +1,49 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any, Protocol, TypeAlias + +ChannelId: TypeAlias = int | str +MessageId: TypeAlias = int | str + + +@dataclass(frozen=True, slots=True) +class MessageRef: + channel_id: ChannelId + message_id: MessageId + raw: Any | None = field(default=None, compare=False, hash=False) + + +@dataclass(frozen=True, slots=True) +class RenderedMessage: + text: str + extra: dict[str, Any] = field(default_factory=dict) + + +@dataclass(frozen=True, slots=True) +class SendOptions: + reply_to: MessageRef | None = None + notify: bool = True + replace: MessageRef | None = None + + +class Transport(Protocol): + async def close(self) -> None: ... + + async def send( + self, + *, + channel_id: ChannelId, + message: RenderedMessage, + options: SendOptions | None = None, + ) -> MessageRef | None: ... + + async def edit( + self, + *, + ref: MessageRef, + message: RenderedMessage, + wait: bool = True, + ) -> MessageRef | None: ... + + async def delete(self, *, ref: MessageRef) -> bool: ... diff --git a/tests/test_exec_bridge.py b/tests/test_exec_bridge.py index 99f16f5..5636990 100644 --- a/tests/test_exec_bridge.py +++ b/tests/test_exec_bridge.py @@ -3,25 +3,18 @@ import uuid import anyio import pytest -from takopi.bridge import _build_bot_commands, _strip_engine_command +from takopi.runner_bridge import ExecBridgeConfig, IncomingMessage, handle_message +from takopi.markdown import MarkdownParts, MarkdownPresenter from takopi.model import EngineId, ResumeToken, TakopiEvent -from takopi.render import MarkdownParts, prepare_telegram -from takopi.router import AutoRouter, RunnerEntry +from takopi.telegram.render import prepare_telegram from takopi.runners.codex import CodexRunner -from takopi.telegram import TelegramClient -from takopi.runners.mock import Advance, Emit, Raise, Return, ScriptRunner, Sleep, Wait +from takopi.runners.mock import Advance, Emit, Raise, Return, ScriptRunner, Wait +from takopi.transport import MessageRef, RenderedMessage, SendOptions from tests.factories import action_completed, action_started CODEX_ENGINE = EngineId("codex") -def _make_router(runner) -> AutoRouter: - return AutoRouter( - entries=[RunnerEntry(engine=runner.engine, runner=runner)], - default_engine=runner.engine, - ) - - def _patch_config(monkeypatch, config): from pathlib import Path @@ -34,6 +27,67 @@ def _patch_config(monkeypatch, config): ) +class _FakeTransport: + def __init__(self) -> None: + self._next_id = 1 + self.send_calls: list[dict] = [] + self.edit_calls: list[dict] = [] + self.delete_calls: list[MessageRef] = [] + + async def send( + self, + *, + channel_id: int | str, + message: RenderedMessage, + options: SendOptions | None = None, + ) -> MessageRef: + ref = MessageRef(channel_id=channel_id, message_id=self._next_id) + self._next_id += 1 + self.send_calls.append( + { + "ref": ref, + "channel_id": channel_id, + "message": message, + "options": options, + } + ) + return ref + + async def edit( + self, *, ref: MessageRef, message: RenderedMessage, wait: bool = True + ) -> MessageRef: + self.edit_calls.append({"ref": ref, "message": message, "wait": wait}) + return ref + + async def delete(self, *, ref: MessageRef) -> bool: + self.delete_calls.append(ref) + return True + + async def close(self) -> None: + return None + + +class _FakeClock: + def __init__(self, start: float = 0.0) -> None: + self._now = start + + def __call__(self) -> float: + return self._now + + def set(self, value: float) -> None: + self._now = value + + +def _return_runner( + *, answer: str = "ok", resume_value: str | None = None +) -> ScriptRunner: + return ScriptRunner( + [Return(answer=answer)], + engine=CODEX_ENGINE, + resume_value=resume_value, + ) + + def test_load_and_validate_config_rejects_empty_token(monkeypatch) -> None: from takopi import cli @@ -125,250 +179,36 @@ def test_prepare_telegram_preserves_entities_on_truncate() -> None: assert any(e.get("type") == "bold" for e in entities) -def test_strip_engine_command_inline() -> None: - text, engine = _strip_engine_command( - "/claude do it", engine_ids=("codex", "claude") - ) - assert engine == "claude" - assert text == "do it" - - -def test_strip_engine_command_newline() -> None: - text, engine = _strip_engine_command( - "/codex\nhello", engine_ids=("codex", "claude") - ) - assert engine == "codex" - assert text == "hello" - - -def test_strip_engine_command_ignores_unknown() -> None: - text, engine = _strip_engine_command("/unknown hi", engine_ids=("codex", "claude")) - assert engine is None - assert text == "/unknown hi" - - -def test_strip_engine_command_bot_suffix() -> None: - text, engine = _strip_engine_command( - "/claude@bunny_agent_bot hi", engine_ids=("claude",) - ) - assert engine == "claude" - assert text == "hi" - - -def test_strip_engine_command_only_first_non_empty_line() -> None: - text, engine = _strip_engine_command( - "hello\n/claude hi", engine_ids=("codex", "claude") - ) - assert engine is None - assert text == "hello\n/claude hi" - - -def test_build_bot_commands_includes_cancel_and_engine() -> None: - runner = ScriptRunner( - [Return(answer="ok")], engine=CODEX_ENGINE, resume_value="sid" - ) - router = _make_router(runner) - commands = _build_bot_commands(router) - - assert {"command": "cancel", "description": "cancel run"} in commands - assert any(cmd["command"] == "codex" for cmd in commands) - - -class _FakeBot: - def __init__(self) -> None: - self._next_id = 1 - self.command_calls: list[dict] = [] - self.send_calls: list[dict] = [] - self.edit_calls: list[dict] = [] - self.delete_calls: list[dict] = [] - - async def send_message( - self, - chat_id: int, - text: str, - reply_to_message_id: int | None = None, - disable_notification: bool | None = False, - entities: list[dict] | None = None, - parse_mode: str | None = None, - *, - replace_message_id: int | None = None, - ) -> dict: - _ = replace_message_id - self.send_calls.append( - { - "chat_id": chat_id, - "text": text, - "reply_to_message_id": reply_to_message_id, - "disable_notification": disable_notification, - "entities": entities, - "parse_mode": parse_mode, - } - ) - msg_id = self._next_id - self._next_id += 1 - return {"message_id": msg_id} - - async def edit_message_text( - self, - chat_id: int, - message_id: int, - text: str, - entities: list[dict] | None = None, - parse_mode: str | None = None, - *, - wait: bool = True, - ) -> dict: - _ = wait - self.edit_calls.append( - { - "chat_id": chat_id, - "message_id": message_id, - "text": text, - "entities": entities, - "parse_mode": parse_mode, - } - ) - return {"message_id": message_id} - - async def delete_message( - self, - chat_id: int, - message_id: int, - ) -> bool: - self.delete_calls.append({"chat_id": chat_id, "message_id": message_id}) - return True - - async def set_my_commands( - self, - commands: list[dict], - *, - scope: dict | None = None, - language_code: str | None = None, - ) -> bool: - self.command_calls.append( - { - "commands": commands, - "scope": scope, - "language_code": language_code, - } - ) - return True - - async def get_updates( - self, - offset: int | None, - timeout_s: int = 50, - allowed_updates: list[str] | None = None, - ) -> list[dict] | None: - _ = offset - _ = timeout_s - _ = allowed_updates - return [] - - async def close(self) -> None: - return None - - async def get_me(self) -> dict | None: - return {"id": 1} - - -class _FakeClock: - def __init__(self, start: float = 0.0) -> None: - self._now = start - self._sleep_until: float | None = None - self._sleep_event: anyio.Event | None = None - self.sleep_calls = 0 - - def __call__(self) -> float: - return self._now - - def set(self, value: float) -> None: - self._now = value - if self._sleep_until is None or self._sleep_event is None: - return - if self._sleep_until <= self._now: - self._sleep_event.set() - self._sleep_until = None - self._sleep_event = None - - async def sleep(self, delay: float) -> None: - if delay <= 0: - await anyio.sleep(0) - return - self.sleep_calls += 1 - self._sleep_until = self._now + delay - self._sleep_event = anyio.Event() - await self._sleep_event.wait() - - -def _queued_bot( - bot: "_FakeBot", *, clock: "_FakeClock | None" = None -) -> TelegramClient: - if clock is None: - return TelegramClient( - client=bot, - private_chat_rps=0.0, - group_chat_rps=0.0, - ) - return TelegramClient( - client=bot, - clock=clock, - sleep=clock.sleep, - private_chat_rps=0.0, - group_chat_rps=0.0, - ) - - -def _return_runner( - *, answer: str = "ok", resume_value: str | None = None -) -> ScriptRunner: - return ScriptRunner( - [Return(answer=answer)], - engine=CODEX_ENGINE, - resume_value=resume_value, - ) - - @pytest.mark.anyio async def test_final_notify_sends_loud_final_message() -> None: - from takopi.bridge import BridgeConfig, handle_message - - bot = _FakeBot() + transport = _FakeTransport() runner = _return_runner(answer="ok") - cfg = BridgeConfig( - bot=_queued_bot(bot), - router=_make_router(runner), - chat_id=123, + cfg = ExecBridgeConfig( + transport=transport, + presenter=MarkdownPresenter(), final_notify=True, - startup_msg="", ) await handle_message( cfg, runner=runner, - chat_id=123, - user_msg_id=10, - text="hi", + incoming=IncomingMessage(channel_id=123, message_id=10, text="hi"), resume_token=None, ) - assert len(bot.send_calls) == 2 - assert bot.send_calls[0]["disable_notification"] is True - assert bot.send_calls[1]["disable_notification"] is False + assert len(transport.send_calls) == 2 + assert transport.send_calls[0]["options"].notify is False + assert transport.send_calls[1]["options"].notify is True @pytest.mark.anyio async def test_handle_message_strips_resume_line_from_prompt() -> None: - from takopi.bridge import BridgeConfig, handle_message - - bot = _FakeBot() + transport = _FakeTransport() runner = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE) - cfg = BridgeConfig( - bot=_queued_bot(bot), - router=_make_router(runner), - chat_id=123, + cfg = ExecBridgeConfig( + transport=transport, + presenter=MarkdownPresenter(), final_notify=True, - startup_msg="", ) resume = ResumeToken(engine=CODEX_ENGINE, value="sid") text = "do this\n`codex resume sid`\nand that" @@ -376,9 +216,7 @@ async def test_handle_message_strips_resume_line_from_prompt() -> None: await handle_message( cfg, runner=runner, - chat_id=123, - user_msg_id=10, - text=text, + incoming=IncomingMessage(channel_id=123, message_id=10, text=text), resume_token=resume, ) @@ -390,38 +228,30 @@ async def test_handle_message_strips_resume_line_from_prompt() -> None: @pytest.mark.anyio async def test_long_final_message_edits_progress_message() -> None: - from takopi.bridge import BridgeConfig, handle_message - - bot = _FakeBot() + transport = _FakeTransport() runner = _return_runner(answer="x" * 10_000) - cfg = BridgeConfig( - bot=_queued_bot(bot), - router=_make_router(runner), - chat_id=123, + cfg = ExecBridgeConfig( + transport=transport, + presenter=MarkdownPresenter(), final_notify=False, - startup_msg="", ) await handle_message( cfg, runner=runner, - chat_id=123, - user_msg_id=10, - text="hi", + incoming=IncomingMessage(channel_id=123, message_id=10, text="hi"), resume_token=None, ) - assert len(bot.send_calls) == 1 - assert bot.send_calls[0]["disable_notification"] is True - assert bot.edit_calls - assert "done" in bot.edit_calls[-1]["text"].lower() + assert len(transport.send_calls) == 1 + assert transport.send_calls[0]["options"].notify is False + assert transport.edit_calls + assert "done" in transport.edit_calls[-1]["message"].text.lower() @pytest.mark.anyio -async def test_progress_edits_are_rate_limited() -> None: - from takopi.bridge import BridgeConfig, handle_message - - bot = _FakeBot() +async def test_progress_edits_are_best_effort() -> None: + transport = _FakeTransport() clock = _FakeClock() events: list[TakopiEvent] = [ action_started("item_0", "command", "echo 1"), @@ -437,88 +267,28 @@ async def test_progress_edits_are_rate_limited() -> None: engine=CODEX_ENGINE, advance=clock.set, ) - cfg = BridgeConfig( - bot=_queued_bot(bot, clock=clock), - router=_make_router(runner), - chat_id=123, + cfg = ExecBridgeConfig( + transport=transport, + presenter=MarkdownPresenter(), final_notify=True, - startup_msg="", ) await handle_message( cfg, runner=runner, - chat_id=123, - user_msg_id=10, - text="hi", + incoming=IncomingMessage(channel_id=123, message_id=10, text="hi"), resume_token=None, clock=clock, ) - assert bot.edit_calls - assert "working" in bot.edit_calls[-1]["text"].lower() - - -@pytest.mark.anyio -async def test_progress_edits_do_not_sleep_again_without_new_events() -> None: - from takopi.bridge import BridgeConfig, handle_message - - bot = _FakeBot() - clock = _FakeClock() - hold = anyio.Event() - events: list[TakopiEvent] = [ - action_started("item_0", "command", "echo 1"), - action_started("item_1", "command", "echo 2"), - ] - runner = ScriptRunner( - [ - Emit(events[0], at=0.2), - Emit(events[1], at=0.4), - Wait(hold), - Return(answer="ok"), - ], - engine=CODEX_ENGINE, - advance=clock.set, - ) - cfg = BridgeConfig( - bot=_queued_bot(bot, clock=clock), - router=_make_router(runner), - chat_id=123, - final_notify=True, - startup_msg="", - ) - - async def run_handle_message() -> None: - await handle_message( - cfg, - runner=runner, - chat_id=123, - user_msg_id=10, - text="hi", - resume_token=None, - clock=clock, - ) - - async with anyio.create_task_group() as tg: - tg.start_soon(run_handle_message) - - for _ in range(100): - if bot.edit_calls: - break - await anyio.sleep(0) - - assert bot.edit_calls - assert clock.sleep_calls == 0 - assert clock._sleep_until is None - - hold.set() + assert transport.edit_calls + assert all(call["wait"] is False for call in transport.edit_calls) + assert "working" in transport.edit_calls[-1]["message"].text.lower() @pytest.mark.anyio async def test_bridge_flow_sends_progress_edits_and_final_resume() -> None: - from takopi.bridge import BridgeConfig, handle_message - - bot = _FakeBot() + transport = _FakeTransport() clock = _FakeClock() events: list[TakopiEvent] = [ action_started("item_0", "command", "echo ok"), @@ -541,182 +311,32 @@ async def test_bridge_flow_sends_progress_edits_and_final_resume() -> None: advance=clock.set, resume_value=session_id, ) - cfg = BridgeConfig( - bot=_queued_bot(bot, clock=clock), - router=_make_router(runner), - chat_id=123, + cfg = ExecBridgeConfig( + transport=transport, + presenter=MarkdownPresenter(), final_notify=True, - startup_msg="", ) await handle_message( cfg, runner=runner, - chat_id=123, - user_msg_id=42, - text="do it", + incoming=IncomingMessage(channel_id=123, message_id=42, text="do it"), resume_token=None, clock=clock, ) - assert bot.send_calls[0]["reply_to_message_id"] == 42 - assert "starting" in bot.send_calls[0]["text"] - assert "codex" in bot.send_calls[0]["text"] - assert len(bot.edit_calls) >= 1 - assert session_id in bot.send_calls[-1]["text"] - assert "codex resume" in bot.send_calls[-1]["text"].lower() - assert len(bot.delete_calls) == 1 - - -@pytest.mark.anyio -async def test_handle_cancel_without_reply_prompts_user() -> None: - from takopi.bridge import BridgeConfig, _handle_cancel - - bot = _FakeBot() - runner = _return_runner(answer="ok") - cfg = BridgeConfig( - bot=_queued_bot(bot), - router=_make_router(runner), - chat_id=123, - final_notify=True, - startup_msg="", - ) - msg = {"chat": {"id": 123}, "message_id": 10} - running_tasks: dict = {} - - await _handle_cancel(cfg, msg, running_tasks) - - assert len(bot.send_calls) == 1 - assert "reply to the progress message" in bot.send_calls[0]["text"] - - -@pytest.mark.anyio -async def test_handle_cancel_with_no_progress_message_says_nothing_running() -> None: - from takopi.bridge import BridgeConfig, _handle_cancel - - bot = _FakeBot() - runner = _return_runner(answer="ok") - cfg = BridgeConfig( - bot=_queued_bot(bot), - router=_make_router(runner), - chat_id=123, - final_notify=True, - startup_msg="", - ) - msg = { - "chat": {"id": 123}, - "message_id": 10, - "reply_to_message": {"text": "no message id"}, - } - running_tasks: dict = {} - - await _handle_cancel(cfg, msg, running_tasks) - - assert len(bot.send_calls) == 1 - assert "nothing is currently running" in bot.send_calls[0]["text"] - - -@pytest.mark.anyio -async def test_handle_cancel_with_finished_task_says_nothing_running() -> None: - from takopi.bridge import BridgeConfig, _handle_cancel - - bot = _FakeBot() - runner = _return_runner(answer="ok") - cfg = BridgeConfig( - bot=_queued_bot(bot), - router=_make_router(runner), - chat_id=123, - final_notify=True, - startup_msg="", - ) - progress_id = 99 - msg = { - "chat": {"id": 123}, - "message_id": 10, - "reply_to_message": {"message_id": progress_id}, - } - running_tasks: dict = {} # Progress message not in running_tasks - - await _handle_cancel(cfg, msg, running_tasks) - - assert len(bot.send_calls) == 1 - assert "nothing is currently running" in bot.send_calls[0]["text"] - - -@pytest.mark.anyio -async def test_handle_cancel_cancels_running_task() -> None: - from takopi.bridge import BridgeConfig, _handle_cancel - - bot = _FakeBot() - runner = _return_runner(answer="ok") - cfg = BridgeConfig( - bot=_queued_bot(bot), - router=_make_router(runner), - chat_id=123, - final_notify=True, - startup_msg="", - ) - progress_id = 42 - msg = { - "chat": {"id": 123}, - "message_id": 10, - "reply_to_message": {"message_id": progress_id}, - } - - from takopi.bridge import RunningTask - - running_task = RunningTask() - running_tasks = {progress_id: running_task} - await _handle_cancel(cfg, msg, running_tasks) - - assert running_task.cancel_requested.is_set() is True - assert len(bot.send_calls) == 0 # No error message sent - - -@pytest.mark.anyio -async def test_handle_cancel_only_cancels_matching_progress_message() -> None: - from takopi.bridge import BridgeConfig, _handle_cancel - - bot = _FakeBot() - runner = _return_runner(answer="ok") - cfg = BridgeConfig( - bot=_queued_bot(bot), - router=_make_router(runner), - chat_id=123, - final_notify=True, - startup_msg="", - ) - from takopi.bridge import RunningTask - - task_first = RunningTask() - task_second = RunningTask() - msg = { - "chat": {"id": 123}, - "message_id": 10, - "reply_to_message": {"message_id": 1}, - } - running_tasks = {1: task_first, 2: task_second} - - await _handle_cancel(cfg, msg, running_tasks) - - assert task_first.cancel_requested.is_set() is True - assert task_second.cancel_requested.is_set() is False - assert len(bot.send_calls) == 0 - - -def test_cancel_command_accepts_extra_text() -> None: - from takopi.bridge import _is_cancel_command - - assert _is_cancel_command("/cancel now") is True - assert _is_cancel_command("/cancel@takopi please") is True - assert _is_cancel_command("/cancelled") is False + assert transport.send_calls[0]["options"].reply_to.message_id == 42 + assert "starting" in transport.send_calls[0]["message"].text + assert "codex" in transport.send_calls[0]["message"].text + assert len(transport.edit_calls) >= 1 + assert session_id in transport.send_calls[-1]["message"].text + assert "codex resume" in transport.send_calls[-1]["message"].text.lower() + assert transport.send_calls[-1]["options"].replace == transport.send_calls[0]["ref"] @pytest.mark.anyio async def test_handle_message_cancelled_renders_cancelled_state() -> None: - from takopi.bridge import BridgeConfig, handle_message - - bot = _FakeBot() + transport = _FakeTransport() session_id = "019b66fc-64c2-7a71-81cd-081c504cfeb2" hold = anyio.Event() runner = ScriptRunner( @@ -724,12 +344,10 @@ async def test_handle_message_cancelled_renders_cancelled_state() -> None: engine=CODEX_ENGINE, resume_value=session_id, ) - cfg = BridgeConfig( - bot=_queued_bot(bot), - router=_make_router(runner), - chat_id=123, + cfg = ExecBridgeConfig( + transport=transport, + presenter=MarkdownPresenter(), final_notify=True, - startup_msg="", ) running_tasks: dict = {} @@ -737,9 +355,9 @@ async def test_handle_message_cancelled_renders_cancelled_state() -> None: await handle_message( cfg, runner=runner, - chat_id=123, - user_msg_id=10, - text="do something", + incoming=IncomingMessage( + channel_id=123, message_id=10, text="do something" + ), resume_token=None, running_tasks=running_tasks, ) @@ -756,199 +374,37 @@ async def test_handle_message_cancelled_renders_cancelled_state() -> None: await running_task.resume_ready.wait() running_task.cancel_requested.set() - assert len(bot.send_calls) == 1 # Progress message - assert len(bot.edit_calls) >= 1 - last_edit = bot.edit_calls[-1]["text"] + assert len(transport.send_calls) == 1 # Progress message + assert len(transport.edit_calls) >= 1 + last_edit = transport.edit_calls[-1]["message"].text assert "cancelled" in last_edit.lower() assert session_id in last_edit @pytest.mark.anyio async def test_handle_message_error_preserves_resume_token() -> None: - from takopi.bridge import BridgeConfig, handle_message - - bot = _FakeBot() + transport = _FakeTransport() session_id = "019b66fc-64c2-7a71-81cd-081c504cfeb2" runner = ScriptRunner( [Raise(RuntimeError("boom"))], engine=CODEX_ENGINE, resume_value=session_id, ) - cfg = BridgeConfig( - bot=_queued_bot(bot), - router=_make_router(runner), - chat_id=123, + cfg = ExecBridgeConfig( + transport=transport, + presenter=MarkdownPresenter(), final_notify=True, - startup_msg="", ) await handle_message( cfg, runner=runner, - chat_id=123, - user_msg_id=10, - text="do something", + incoming=IncomingMessage(channel_id=123, message_id=10, text="do something"), resume_token=None, ) - assert bot.edit_calls - last_edit = bot.edit_calls[-1]["text"] + assert transport.edit_calls + last_edit = transport.edit_calls[-1]["message"].text assert "error" in last_edit.lower() assert session_id in last_edit assert "codex resume" in last_edit.lower() - - -@pytest.mark.anyio -async def test_send_with_resume_waits_for_token() -> None: - from takopi.bridge import RunningTask, _send_with_resume - - bot = _FakeBot() - sent: list[tuple[int, int, str, ResumeToken | None]] = [] - - async def enqueue( - chat_id: int, user_msg_id: int, text: str, resume: ResumeToken - ) -> None: - sent.append((chat_id, user_msg_id, text, resume)) - - running_task = RunningTask() - - async def trigger_resume() -> None: - await anyio.sleep(0) - running_task.resume = ResumeToken(engine=CODEX_ENGINE, value="abc123") - running_task.resume_ready.set() - - async with anyio.create_task_group() as tg: - tg.start_soon(trigger_resume) - await _send_with_resume( - bot, - enqueue, - running_task, - 123, - 10, - "hello", - ) - - assert sent == [ - (123, 10, "hello", ResumeToken(engine=CODEX_ENGINE, value="abc123")) - ] - - -@pytest.mark.anyio -async def test_send_with_resume_reports_when_missing() -> None: - from takopi.bridge import RunningTask, _send_with_resume - - bot = _FakeBot() - sent: list[tuple[int, int, str, ResumeToken | None]] = [] - - async def enqueue( - chat_id: int, user_msg_id: int, text: str, resume: ResumeToken - ) -> None: - sent.append((chat_id, user_msg_id, text, resume)) - - running_task = RunningTask() - running_task.done.set() - - await _send_with_resume( - bot, - enqueue, - running_task, - 123, - 10, - "hello", - ) - - assert sent == [] - assert bot.send_calls - assert "resume token" in bot.send_calls[-1]["text"].lower() - - -@pytest.mark.anyio -async def test_run_main_loop_routes_reply_to_running_resume() -> None: - from takopi.bridge import BridgeConfig, run_main_loop - - progress_ready = anyio.Event() - stop_polling = anyio.Event() - reply_ready = anyio.Event() - hold = anyio.Event() - - class _BotWithProgress(_FakeBot): - def __init__(self) -> None: - super().__init__() - self.progress_id: int | None = None - - async def send_message( - self, - chat_id: int, - text: str, - reply_to_message_id: int | None = None, - disable_notification: bool | None = False, - entities: list[dict] | None = None, - parse_mode: str | None = None, - *, - replace_message_id: int | None = None, - ) -> dict: - msg = await super().send_message( - chat_id=chat_id, - text=text, - reply_to_message_id=reply_to_message_id, - disable_notification=disable_notification, - entities=entities, - parse_mode=parse_mode, - replace_message_id=replace_message_id, - ) - if self.progress_id is None and reply_to_message_id is not None: - self.progress_id = int(msg["message_id"]) - progress_ready.set() - return msg - - bot = _BotWithProgress() - resume_value = "abc123" - runner = ScriptRunner( - [Wait(hold), Sleep(0.05), Return(answer="ok")], - engine=CODEX_ENGINE, - resume_value=resume_value, - ) - cfg = BridgeConfig( - bot=_queued_bot(bot), - router=_make_router(runner), - chat_id=123, - final_notify=True, - startup_msg="", - ) - - async def poller(_cfg: BridgeConfig): - yield { - "message_id": 1, - "text": "first", - "chat": {"id": 123}, - "from": {"id": 123}, - } - await progress_ready.wait() - assert bot.progress_id is not None - reply_ready.set() - yield { - "message_id": 2, - "text": "followup", - "chat": {"id": 123}, - "from": {"id": 123}, - "reply_to_message": {"message_id": bot.progress_id}, - } - await stop_polling.wait() - - async with anyio.create_task_group() as tg: - tg.start_soon(run_main_loop, cfg, poller) - try: - with anyio.fail_after(2): - await reply_ready.wait() - await anyio.sleep(0) - hold.set() - with anyio.fail_after(2): - while len(runner.calls) < 2: - await anyio.sleep(0) - assert runner.calls[1][1] == ResumeToken( - engine=CODEX_ENGINE, value=resume_value - ) - finally: - hold.set() - stop_polling.set() - tg.cancel_scope.cancel() diff --git a/tests/test_exec_render.py b/tests/test_exec_render.py index 136f97f..c1b42b1 100644 --- a/tests/test_exec_render.py +++ b/tests/test_exec_render.py @@ -2,18 +2,20 @@ from typing import cast from types import SimpleNamespace from pathlib import Path -from takopi.model import Action, ActionEvent, ResumeToken, StartedEvent, TakopiEvent -from takopi.render import ( - ExecProgressRenderer, +from takopi.markdown import ( + HARD_BREAK, + MarkdownFormatter, STATUS, action_status, assemble_markdown_parts, format_elapsed, format_file_change_title, render_event_cli, - render_markdown, shorten, ) +from takopi.model import Action, ActionEvent, ResumeToken, StartedEvent, TakopiEvent +from takopi.progress import ProgressTracker +from takopi.telegram.render import render_markdown from tests.factories import ( action_completed, action_started, @@ -118,19 +120,21 @@ def test_file_change_renders_relative_paths_inside_cwd() -> None: def test_progress_renderer_renders_progress_and_final() -> None: - r = ExecProgressRenderer( - max_actions=5, resume_formatter=_format_resume, engine="codex" - ) + tracker = ProgressTracker(engine="codex") for evt in SAMPLE_EVENTS: - r.note_event(evt) + tracker.note_event(evt) - progress_parts = r.render_progress_parts(3.0) + state = tracker.snapshot(resume_formatter=_format_resume) + formatter = MarkdownFormatter(max_actions=5) + progress_parts = formatter.render_progress_parts(state, elapsed_s=3.0) progress = assemble_markdown_parts(progress_parts) assert progress.startswith("working · codex · 3s · step 2") assert "✓ `bash -lc ls`" in progress assert "`codex resume 0199a213-81c0-7800-8aa1-bbab2a035a53`" in progress - final_parts = r.render_final_parts(3.0, "answer", status="done") + final_parts = formatter.render_final_parts( + state, elapsed_s=3.0, status="done", answer="answer" + ) final = assemble_markdown_parts(final_parts) assert final.startswith("done · codex · 3s · step 2") assert "✓ `bash -lc ls`" not in final @@ -142,7 +146,7 @@ def test_progress_renderer_renders_progress_and_final() -> None: def test_progress_renderer_clamps_actions_and_ignores_unknown() -> None: - r = ExecProgressRenderer(max_actions=3, command_width=20, engine="codex") + tracker = ProgressTracker(engine="codex") events = [ action_completed( f"item_{i}", @@ -155,19 +159,23 @@ def test_progress_renderer_clamps_actions_and_ignores_unknown() -> None: ] for evt in events: - assert r.note_event(evt) is True + assert tracker.note_event(evt) is True - assert len(r.recent_actions) == 3 - assert "echo 3" in r.recent_actions[0] - assert "echo 5" in r.recent_actions[-1] + state = tracker.snapshot() + formatter = MarkdownFormatter(max_actions=3, command_width=20) + parts = formatter.render_progress_parts(state, elapsed_s=0.0) + lines = parts.body.split(HARD_BREAK) if parts.body else [] + assert len(lines) == 3 + assert "echo 3" in lines[0] + assert "echo 5" in lines[-1] mystery = SimpleNamespace(type="mystery") - assert r.note_event(cast(TakopiEvent, mystery)) is False + assert tracker.note_event(cast(TakopiEvent, mystery)) is False def test_progress_renderer_renders_commands_in_markdown() -> None: - r = ExecProgressRenderer(max_actions=5, command_width=None, engine="codex") + tracker = ProgressTracker(engine="codex") for i in (30, 31, 32): - r.note_event( + tracker.note_event( action_completed( f"item_{i}", "command", @@ -177,7 +185,9 @@ def test_progress_renderer_renders_commands_in_markdown() -> None: ) ) - md = assemble_markdown_parts(r.render_progress_parts(0.0)) + state = tracker.snapshot() + formatter = MarkdownFormatter(max_actions=5, command_width=None) + md = assemble_markdown_parts(formatter.render_progress_parts(state, elapsed_s=0.0)) text, _ = render_markdown(md) assert "✓ echo 30" in text assert "✓ echo 31" in text @@ -185,7 +195,7 @@ def test_progress_renderer_renders_commands_in_markdown() -> None: def test_progress_renderer_handles_duplicate_action_ids() -> None: - r = ExecProgressRenderer(max_actions=5, engine="codex") + tracker = ProgressTracker(engine="codex") events = [ action_started("dup", "command", "echo first"), action_completed( @@ -206,17 +216,19 @@ def test_progress_renderer_handles_duplicate_action_ids() -> None: ] for evt in events: - assert r.note_event(evt) is True + assert tracker.note_event(evt) is True - assert len(r.recent_actions) == 2 - assert r.recent_actions[0].startswith("✓ ") - assert "echo first" in r.recent_actions[0] - assert r.recent_actions[1].startswith("✓ ") - assert "echo second" in r.recent_actions[1] + state = tracker.snapshot() + formatter = MarkdownFormatter(max_actions=5) + parts = formatter.render_progress_parts(state, elapsed_s=0.0) + lines = parts.body.split(HARD_BREAK) if parts.body else [] + assert len(lines) == 1 + assert lines[0].startswith("✓ ") + assert "echo second" in lines[0] def test_progress_renderer_collapses_action_updates() -> None: - r = ExecProgressRenderer(max_actions=5, engine="codex") + tracker = ProgressTracker(engine="codex") events = [ action_started("a-1", "command", "echo one"), action_started("a-1", "command", "echo two"), @@ -230,12 +242,16 @@ def test_progress_renderer_collapses_action_updates() -> None: ] for evt in events: - assert r.note_event(evt) is True + assert tracker.note_event(evt) is True - assert r.action_count == 1 - assert len(r.recent_actions) == 1 - assert r.recent_actions[0].startswith("✓ ") - assert "echo two" in r.recent_actions[0] + assert tracker.action_count == 1 + state = tracker.snapshot() + formatter = MarkdownFormatter(max_actions=5) + parts = formatter.render_progress_parts(state, elapsed_s=0.0) + lines = parts.body.split(HARD_BREAK) if parts.body else [] + assert len(lines) == 1 + assert lines[0].startswith("✓ ") + assert "echo two" in lines[0] def test_progress_renderer_deterministic_output() -> None: @@ -249,16 +265,18 @@ def test_progress_renderer_deterministic_output() -> None: detail={"exit_code": 0}, ), ] - r1 = ExecProgressRenderer(max_actions=5, engine="codex") - r2 = ExecProgressRenderer(max_actions=5, engine="codex") + t1 = ProgressTracker(engine="codex") + t2 = ProgressTracker(engine="codex") for evt in events: - r1.note_event(evt) - r2.note_event(evt) + t1.note_event(evt) + t2.note_event(evt) + f1 = MarkdownFormatter(max_actions=5) + f2 = MarkdownFormatter(max_actions=5) assert assemble_markdown_parts( - r1.render_progress_parts(1.0) - ) == assemble_markdown_parts(r2.render_progress_parts(1.0)) + f1.render_progress_parts(t1.snapshot(), elapsed_s=1.0) + ) == assemble_markdown_parts(f2.render_progress_parts(t2.snapshot(), elapsed_s=1.0)) def test_format_elapsed_branches() -> None: @@ -319,9 +337,9 @@ def test_render_event_cli_ignores_turn_actions() -> None: def test_progress_renderer_ignores_missing_action_id() -> None: - renderer = ExecProgressRenderer(engine="codex") + tracker = ProgressTracker(engine="codex") resume = ResumeToken(engine="codex", value="abc") - renderer.note_event(StartedEvent(engine="codex", resume=resume, title="Session")) + tracker.note_event(StartedEvent(engine="codex", resume=resume, title="Session")) event = ActionEvent( engine="codex", @@ -329,7 +347,10 @@ def test_progress_renderer_ignores_missing_action_id() -> None: phase="started", ok=None, ) - assert renderer.note_event(event) is False + assert tracker.note_event(event) is False - header = assemble_markdown_parts(renderer.render_progress_parts(0.0)) + formatter = MarkdownFormatter() + header = assemble_markdown_parts( + formatter.render_progress_parts(tracker.snapshot(), elapsed_s=0.0) + ) assert header.startswith("working · codex · 0s") diff --git a/tests/test_onboarding.py b/tests/test_onboarding.py index 442917e..0458ad9 100644 --- a/tests/test_onboarding.py +++ b/tests/test_onboarding.py @@ -2,7 +2,8 @@ from __future__ import annotations from pathlib import Path -from takopi import engines, onboarding +from takopi import engines +from takopi.telegram import onboarding def test_check_setup_marks_missing_codex(monkeypatch, tmp_path: Path) -> None: diff --git a/tests/test_onboarding_interactive.py b/tests/test_onboarding_interactive.py index 1b5701b..7788500 100644 --- a/tests/test_onboarding_interactive.py +++ b/tests/test_onboarding_interactive.py @@ -1,6 +1,6 @@ from __future__ import annotations -from takopi import onboarding +from takopi.telegram import onboarding from takopi.backends import EngineBackend diff --git a/tests/test_rendering.py b/tests/test_rendering.py index 741309d..25342ba 100644 --- a/tests/test_rendering.py +++ b/tests/test_rendering.py @@ -1,4 +1,4 @@ -from takopi.render import render_markdown +from takopi.telegram.render import render_markdown def test_render_markdown_basic_entities() -> None: diff --git a/tests/test_telegram_bridge.py b/tests/test_telegram_bridge.py new file mode 100644 index 0000000..6e34300 --- /dev/null +++ b/tests/test_telegram_bridge.py @@ -0,0 +1,575 @@ +import anyio +import pytest + +from takopi.telegram.bridge import ( + TelegramBridgeConfig, + TelegramTransport, + _build_bot_commands, + _handle_cancel, + _is_cancel_command, + _send_with_resume, + _strip_engine_command, + run_main_loop, +) +from takopi.runner_bridge import ExecBridgeConfig, RunningTask +from takopi.markdown import MarkdownPresenter +from takopi.model import EngineId, ResumeToken +from takopi.router import AutoRouter, RunnerEntry +from takopi.runners.mock import Return, ScriptRunner, Sleep, Wait +from takopi.transport import MessageRef, RenderedMessage, SendOptions + +CODEX_ENGINE = EngineId("codex") + + +def _make_router(runner) -> AutoRouter: + return AutoRouter( + entries=[RunnerEntry(engine=runner.engine, runner=runner)], + default_engine=runner.engine, + ) + + +class _FakeTransport: + def __init__(self, progress_ready: anyio.Event | None = None) -> None: + self._next_id = 1 + self.send_calls: list[dict] = [] + self.edit_calls: list[dict] = [] + self.delete_calls: list[MessageRef] = [] + self.progress_ready = progress_ready + self.progress_ref: MessageRef | None = None + + async def send( + self, + *, + channel_id: int | str, + message: RenderedMessage, + options: SendOptions | None = None, + ) -> MessageRef: + ref = MessageRef(channel_id=channel_id, message_id=self._next_id) + self._next_id += 1 + self.send_calls.append( + { + "ref": ref, + "channel_id": channel_id, + "message": message, + "options": options, + } + ) + if ( + self.progress_ref is None + and options is not None + and options.reply_to is not None + and options.notify is False + ): + self.progress_ref = ref + if self.progress_ready is not None: + self.progress_ready.set() + return ref + + async def edit( + self, *, ref: MessageRef, message: RenderedMessage, wait: bool = True + ) -> MessageRef: + self.edit_calls.append({"ref": ref, "message": message, "wait": wait}) + return ref + + async def delete(self, *, ref: MessageRef) -> bool: + self.delete_calls.append(ref) + return True + + async def close(self) -> None: + return None + + +class _FakeBot: + def __init__(self) -> None: + self.command_calls: list[dict] = [] + self.send_calls: list[dict] = [] + self.edit_calls: list[dict] = [] + self.delete_calls: list[dict] = [] + + async def get_updates( + self, + offset: int | None, + timeout_s: int = 50, + allowed_updates: list[str] | None = None, + ) -> list[dict] | None: + _ = offset + _ = timeout_s + _ = allowed_updates + return [] + + async def send_message( + self, + chat_id: int, + text: str, + reply_to_message_id: int | None = None, + disable_notification: bool | None = False, + entities: list[dict] | None = None, + parse_mode: str | None = None, + *, + replace_message_id: int | None = None, + ) -> dict: + self.send_calls.append( + { + "chat_id": chat_id, + "text": text, + "reply_to_message_id": reply_to_message_id, + "disable_notification": disable_notification, + "entities": entities, + "parse_mode": parse_mode, + "replace_message_id": replace_message_id, + } + ) + return {"message_id": 1} + + async def edit_message_text( + self, + chat_id: int, + message_id: int, + text: str, + entities: list[dict] | None = None, + parse_mode: str | None = None, + *, + wait: bool = True, + ) -> dict: + self.edit_calls.append( + { + "chat_id": chat_id, + "message_id": message_id, + "text": text, + "entities": entities, + "parse_mode": parse_mode, + "wait": wait, + } + ) + return {"message_id": message_id} + + async def delete_message(self, chat_id: int, message_id: int) -> bool: + self.delete_calls.append({"chat_id": chat_id, "message_id": message_id}) + return True + + async def set_my_commands( + self, + commands: list[dict], + *, + scope: dict | None = None, + language_code: str | None = None, + ) -> bool: + self.command_calls.append( + { + "commands": commands, + "scope": scope, + "language_code": language_code, + } + ) + return True + + async def get_me(self) -> dict | None: + return {"id": 1} + + async def close(self) -> None: + return None + + +def _make_cfg( + transport: _FakeTransport, runner: ScriptRunner | None = None +) -> TelegramBridgeConfig: + if runner is None: + runner = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE) + exec_cfg = ExecBridgeConfig( + transport=transport, + presenter=MarkdownPresenter(), + final_notify=True, + ) + return TelegramBridgeConfig( + bot=_FakeBot(), + router=_make_router(runner), + chat_id=123, + startup_msg="", + exec_cfg=exec_cfg, + ) + + +def test_strip_engine_command_inline() -> None: + text, engine = _strip_engine_command( + "/claude do it", engine_ids=("codex", "claude") + ) + assert engine == "claude" + assert text == "do it" + + +def test_strip_engine_command_newline() -> None: + text, engine = _strip_engine_command( + "/codex\nhello", engine_ids=("codex", "claude") + ) + assert engine == "codex" + assert text == "hello" + + +def test_strip_engine_command_ignores_unknown() -> None: + text, engine = _strip_engine_command("/unknown hi", engine_ids=("codex", "claude")) + assert engine is None + assert text == "/unknown hi" + + +def test_strip_engine_command_bot_suffix() -> None: + text, engine = _strip_engine_command( + "/claude@bunny_agent_bot hi", engine_ids=("claude",) + ) + assert engine == "claude" + assert text == "hi" + + +def test_strip_engine_command_only_first_non_empty_line() -> None: + text, engine = _strip_engine_command( + "hello\n/claude hi", engine_ids=("codex", "claude") + ) + assert engine is None + assert text == "hello\n/claude hi" + + +def test_build_bot_commands_includes_cancel_and_engine() -> None: + runner = ScriptRunner( + [Return(answer="ok")], engine=CODEX_ENGINE, resume_value="sid" + ) + router = _make_router(runner) + commands = _build_bot_commands(router) + + assert {"command": "cancel", "description": "cancel run"} in commands + assert any(cmd["command"] == "codex" for cmd in commands) + + +@pytest.mark.anyio +async def test_telegram_transport_passes_replace_and_wait() -> None: + bot = _FakeBot() + transport = TelegramTransport(bot) + reply = MessageRef(channel_id=123, message_id=10) + replace = MessageRef(channel_id=123, message_id=11) + + await transport.send( + channel_id=123, + message=RenderedMessage(text="hello"), + options=SendOptions(reply_to=reply, notify=True, replace=replace), + ) + assert bot.send_calls + assert bot.send_calls[0]["replace_message_id"] == 11 + + await transport.edit( + ref=replace, + message=RenderedMessage(text="edit"), + wait=False, + ) + assert bot.edit_calls + assert bot.edit_calls[0]["wait"] is False + + +@pytest.mark.anyio +async def test_telegram_transport_edit_wait_false_returns_ref() -> None: + class _OutboxBot: + def __init__(self) -> None: + self.edit_calls: list[dict[str, object]] = [] + + async def get_updates( + self, + offset: int | None, + timeout_s: int = 50, + allowed_updates: list[str] | None = None, + ) -> list[dict] | None: + return None + + async def send_message( + self, + chat_id: int, + text: str, + reply_to_message_id: int | None = None, + disable_notification: bool | None = False, + entities: list[dict] | None = None, + parse_mode: str | None = None, + *, + replace_message_id: int | None = None, + ) -> dict | None: + return None + + async def edit_message_text( + self, + chat_id: int, + message_id: int, + text: str, + entities: list[dict] | None = None, + parse_mode: str | None = None, + *, + wait: bool = True, + ) -> dict | None: + self.edit_calls.append( + { + "chat_id": chat_id, + "message_id": message_id, + "text": text, + "entities": entities, + "parse_mode": parse_mode, + "wait": wait, + } + ) + if not wait: + return None + return {"message_id": message_id} + + async def delete_message( + self, + chat_id: int, + message_id: int, + ) -> bool: + return False + + async def set_my_commands( + self, + commands: list[dict[str, object]], + *, + scope: dict[str, object] | None = None, + language_code: str | None = None, + ) -> bool: + return False + + async def get_me(self) -> dict | None: + return None + + async def close(self) -> None: + return None + + bot = _OutboxBot() + transport = TelegramTransport(bot) + ref = MessageRef(channel_id=123, message_id=1) + + result = await transport.edit( + ref=ref, + message=RenderedMessage(text="edit"), + wait=False, + ) + + assert result == ref + assert bot.edit_calls + assert bot.edit_calls[0]["wait"] is False + + +@pytest.mark.anyio +async def test_handle_cancel_without_reply_prompts_user() -> None: + transport = _FakeTransport() + cfg = _make_cfg(transport) + msg = {"chat": {"id": 123}, "message_id": 10} + running_tasks: dict = {} + + await _handle_cancel(cfg, msg, running_tasks) + + assert len(transport.send_calls) == 1 + assert "reply to the progress message" in transport.send_calls[0]["message"].text + + +@pytest.mark.anyio +async def test_handle_cancel_with_no_progress_message_says_nothing_running() -> None: + transport = _FakeTransport() + cfg = _make_cfg(transport) + msg = { + "chat": {"id": 123}, + "message_id": 10, + "reply_to_message": {"text": "no message id"}, + } + running_tasks: dict = {} + + await _handle_cancel(cfg, msg, running_tasks) + + assert len(transport.send_calls) == 1 + assert "nothing is currently running" in transport.send_calls[0]["message"].text + + +@pytest.mark.anyio +async def test_handle_cancel_with_finished_task_says_nothing_running() -> None: + transport = _FakeTransport() + cfg = _make_cfg(transport) + progress_id = 99 + msg = { + "chat": {"id": 123}, + "message_id": 10, + "reply_to_message": {"message_id": progress_id}, + } + running_tasks: dict = {} + + await _handle_cancel(cfg, msg, running_tasks) + + assert len(transport.send_calls) == 1 + assert "nothing is currently running" in transport.send_calls[0]["message"].text + + +@pytest.mark.anyio +async def test_handle_cancel_cancels_running_task() -> None: + transport = _FakeTransport() + cfg = _make_cfg(transport) + progress_id = 42 + msg = { + "chat": {"id": 123}, + "message_id": 10, + "reply_to_message": {"message_id": progress_id}, + } + + running_task = RunningTask() + running_tasks = {MessageRef(channel_id=123, message_id=progress_id): running_task} + await _handle_cancel(cfg, msg, running_tasks) + + assert running_task.cancel_requested.is_set() is True + assert len(transport.send_calls) == 0 # No error message sent + + +@pytest.mark.anyio +async def test_handle_cancel_only_cancels_matching_progress_message() -> None: + transport = _FakeTransport() + cfg = _make_cfg(transport) + task_first = RunningTask() + task_second = RunningTask() + msg = { + "chat": {"id": 123}, + "message_id": 10, + "reply_to_message": {"message_id": 1}, + } + running_tasks = { + MessageRef(channel_id=123, message_id=1): task_first, + MessageRef(channel_id=123, message_id=2): task_second, + } + + await _handle_cancel(cfg, msg, running_tasks) + + assert task_first.cancel_requested.is_set() is True + assert task_second.cancel_requested.is_set() is False + assert len(transport.send_calls) == 0 + + +def test_cancel_command_accepts_extra_text() -> None: + assert _is_cancel_command("/cancel now") is True + assert _is_cancel_command("/cancel@takopi please") is True + assert _is_cancel_command("/cancelled") is False + + +@pytest.mark.anyio +async def test_send_with_resume_waits_for_token() -> None: + transport = _FakeTransport() + cfg = _make_cfg(transport) + sent: list[tuple[int, int, str, ResumeToken | None]] = [] + + async def enqueue( + chat_id: int, user_msg_id: int, text: str, resume: ResumeToken + ) -> None: + sent.append((chat_id, user_msg_id, text, resume)) + + running_task = RunningTask() + + async def trigger_resume() -> None: + await anyio.sleep(0) + running_task.resume = ResumeToken(engine=CODEX_ENGINE, value="abc123") + running_task.resume_ready.set() + + async with anyio.create_task_group() as tg: + tg.start_soon(trigger_resume) + await _send_with_resume( + cfg, + enqueue, + running_task, + 123, + 10, + "hello", + ) + + assert sent == [ + (123, 10, "hello", ResumeToken(engine=CODEX_ENGINE, value="abc123")) + ] + assert transport.send_calls == [] + + +@pytest.mark.anyio +async def test_send_with_resume_reports_when_missing() -> None: + transport = _FakeTransport() + cfg = _make_cfg(transport) + sent: list[tuple[int, int, str, ResumeToken | None]] = [] + + async def enqueue( + chat_id: int, user_msg_id: int, text: str, resume: ResumeToken + ) -> None: + sent.append((chat_id, user_msg_id, text, resume)) + + running_task = RunningTask() + running_task.done.set() + + await _send_with_resume( + cfg, + enqueue, + running_task, + 123, + 10, + "hello", + ) + + assert sent == [] + assert transport.send_calls + assert "resume token" in transport.send_calls[-1]["message"].text.lower() + + +@pytest.mark.anyio +async def test_run_main_loop_routes_reply_to_running_resume() -> None: + progress_ready = anyio.Event() + stop_polling = anyio.Event() + reply_ready = anyio.Event() + hold = anyio.Event() + + transport = _FakeTransport(progress_ready=progress_ready) + bot = _FakeBot() + resume_value = "abc123" + runner = ScriptRunner( + [Wait(hold), Sleep(0.05), Return(answer="ok")], + engine=CODEX_ENGINE, + resume_value=resume_value, + ) + exec_cfg = ExecBridgeConfig( + transport=transport, + presenter=MarkdownPresenter(), + final_notify=True, + ) + cfg = TelegramBridgeConfig( + bot=bot, + router=_make_router(runner), + chat_id=123, + startup_msg="", + exec_cfg=exec_cfg, + ) + + async def poller(_cfg: TelegramBridgeConfig): + yield { + "message_id": 1, + "text": "first", + "chat": {"id": 123}, + "from": {"id": 123}, + } + await progress_ready.wait() + assert transport.progress_ref is not None + reply_ready.set() + yield { + "message_id": 2, + "text": "followup", + "chat": {"id": 123}, + "from": {"id": 123}, + "reply_to_message": {"message_id": transport.progress_ref.message_id}, + } + await stop_polling.wait() + + async with anyio.create_task_group() as tg: + tg.start_soon(run_main_loop, cfg, poller) + try: + with anyio.fail_after(2): + await reply_ready.wait() + await anyio.sleep(0) + hold.set() + with anyio.fail_after(2): + while len(runner.calls) < 2: + await anyio.sleep(0) + assert runner.calls[1][1] == ResumeToken( + engine=CODEX_ENGINE, value=resume_value + ) + finally: + hold.set() + stop_polling.set() + tg.cancel_scope.cancel() diff --git a/tests/test_telegram_client.py b/tests/test_telegram_client.py index f7e2e88..9b2da4e 100644 --- a/tests/test_telegram_client.py +++ b/tests/test_telegram_client.py @@ -2,7 +2,7 @@ import httpx import pytest from takopi.logging import setup_logging -from takopi.telegram import TelegramClient, TelegramRetryAfter +from takopi.telegram.client import TelegramClient, TelegramRetryAfter @pytest.mark.anyio diff --git a/tests/test_telegram_queue.py b/tests/test_telegram_queue.py index 928419d..67fab50 100644 --- a/tests/test_telegram_queue.py +++ b/tests/test_telegram_queue.py @@ -1,7 +1,7 @@ import anyio import pytest -from takopi.telegram import TelegramClient, TelegramRetryAfter +from takopi.telegram.client import TelegramClient, TelegramRetryAfter class _FakeBot: