refactor!: split telegram bridge into transport/presenter (#55)

This commit is contained in:
banteg
2026-01-06 02:14:36 +04:00
committed by GitHub
parent a8eb1290cc
commit 1178b738df
26 changed files with 2338 additions and 1833 deletions
+5
View File
@@ -0,0 +1,5 @@
check:
uv run ruff format --check
uv run ruff check .
uv run ty check .
uv run pytest
-7
View File
@@ -1,7 +0,0 @@
.PHONY: check
check:
uv run ruff format --check
uv run ruff check .
uv run ty check .
uv run pytest
+11
View File
@@ -1,5 +1,16 @@
# changelog # changelog
## v0.9.0 (unreleased)
### breaking
- remove `takopi.bridge`; Telegram bridge now lives in `takopi.bridges.telegram`
### changes
- add transport/presenter protocols plus transport-agnostic `exec_bridge`
- move Telegram polling + wiring into `takopi.bridges.telegram` with transport/presenter adapters
## v0.8.0 (2026-01-05) ## v0.8.0 (2026-01-05)
### changes ### changes
+68 -23
View File
@@ -23,7 +23,7 @@ uv run ruff check src tests
uv run ty check . uv run ty check .
# Or all at once # Or all at once
make check just check
``` ```
Takopi runs in **auto-router** mode by default. `default_engine` in `takopi.toml` selects Takopi runs in **auto-router** mode by default. `default_engine` in `takopi.toml` selects
@@ -31,47 +31,86 @@ the engine for new threads; engine subcommands override that default for the pro
## Module Responsibilities ## Module Responsibilities
### `bridge.py` - Telegram bridge loop ### `runner_bridge.py` - Transport-agnostic orchestration
The orchestrator module containing: The core handler module containing:
| Component | Purpose | | Component | Purpose |
|-----------|---------| |-----------|---------|
| `BridgeConfig` | Frozen dataclass holding runtime config | | `ExecBridgeConfig` | Frozen dataclass holding transport + presenter config |
| `poll_updates()` | Async generator that drains backlog, long-polls updates, filters messages | | `IncomingMessage` | Normalized incoming message shape |
| `run_main_loop()` | TaskGroup-based main loop that spawns per-message handlers |
| `handle_message()` | Per-message handler with progress updates and final render | | `handle_message()` | Per-message handler with progress updates and final render |
| `ProgressEdits` | Throttled progress edit worker | | `ProgressEdits` | Throttled progress edit worker |
| `RunningTask` | Cancellation + resume coordination for in-flight runs |
**Key patterns:**
- Progress edits are best-effort and only run when new events arrive (Telegram outbox handles rate limiting/coalescing)
- Resume tokens are runner-formatted command lines (e.g., `` `codex resume <token>` ``, `` `claude --resume <token>` ``, `` `pi --session <path>` ``)
- Resume lines are stripped from the prompt before invoking the runner
- Errors/cancellation render final status while preserving resume tokens when known
### `telegram/bridge.py` - Telegram bridge loop
The Telegram adapter module containing:
| Component | Purpose |
|-----------|---------|
| `TelegramBridgeConfig` | Frozen dataclass holding bot + router + exec config |
| `TelegramTransport` | `BotClient``Transport` adapter |
| `TelegramPresenter` | `ProgressState``RenderedMessage` adapter |
| `poll_updates()` | Async generator that drains backlog, long-polls updates, filters messages |
| `run_main_loop()` | TaskGroup-based main loop that spawns per-message handlers |
| `_handle_cancel()` | `/cancel` routing | | `_handle_cancel()` | `/cancel` routing |
**Key patterns:** **Key patterns:**
- Bridge schedules runs FIFO per thread to avoid concurrent progress messages; runner locks enforce per-thread serialization - Bridge schedules runs FIFO per thread to avoid concurrent progress messages; runner locks enforce per-thread serialization
- `/cancel` routes by reply-to progress message id (accepts extra text) - `/cancel` routes by reply-to progress message id (accepts extra text)
- `/{engine}` on the first line selects the engine for new threads - `/{engine}` on the first line selects the engine for new threads
- Progress edits are throttled to 2s intervals and only run when new events arrive
- Resume tokens are runner-formatted command lines (e.g., `` `codex resume <token>` ``, `` `claude --resume <token>` ``, `` `pi --session <path>` ``)
- Resume parsing polls all runners via `AutoRouter.resolve_resume()` and routes to the first match - Resume parsing polls all runners via `AutoRouter.resolve_resume()` and routes to the first match
- Bot command menu is synced on startup (`cancel` + engine commands) - Bot command menu is synced on startup (`cancel` + engine commands)
### `transport.py` - Transport protocol
Defines `Transport`, `MessageRef`, `RenderedMessage`, and `SendOptions`.
### `presenter.py` - Presenter protocol
Defines a renderer that converts `ProgressState` into `RenderedMessage` outputs.
### `cli.py` - CLI entry point ### `cli.py` - CLI entry point
| Component | Purpose | | Component | Purpose |
|-----------|---------| |-----------|---------|
| `run()` / `main()` | Typer CLI entry points | | `run()` / `main()` | Typer CLI entry points |
| `_parse_bridge_config()` | Reads config + builds `BridgeConfig` | | `_parse_bridge_config()` | Reads config + builds `TelegramBridgeConfig` + `ExecBridgeConfig` |
### `render.py` - Takopi event + Markdown helpers ### `progress.py` - Progress tracking
| Function/Class | Purpose |
|----------------|---------|
| `ProgressTracker` | Stateful reducer of takopi events into progress snapshots |
| `ProgressState` | Snapshot of actions, resume token, and engine metadata |
### `markdown.py` - Markdown formatting
| Function/Class | Purpose |
|----------------|---------|
| `MarkdownFormatter` | Converts `ProgressState` into MarkdownParts |
| `MarkdownPresenter` | `ProgressState``RenderedMessage` (markdown text) |
| `MarkdownParts` | Header/body/footer building blocks for markdown output |
| `assemble_markdown_parts()` | Join MarkdownParts into a single markdown string |
| `render_event_cli()` | Format a takopi event for CLI logs |
| `format_elapsed()` | Formats seconds as `Xh Ym`, `Xm Ys`, or `Xs` |
### `telegram/render.py` - Telegram markdown rendering
| Function/Class | Purpose | | Function/Class | Purpose |
|----------------|---------| |----------------|---------|
| `render_markdown()` | Markdown → Telegram text + entities | | `render_markdown()` | Markdown → Telegram text + entities |
| `trim_body()` | Trim body to 3500 chars (header/footer preserved) | | `trim_body()` | Trim body to 3500 chars (header/footer preserved) |
| `prepare_telegram()` | Trim + render Markdown parts for Telegram | | `prepare_telegram()` | Trim + render Markdown parts for Telegram |
| `ExecProgressRenderer` | Stateful renderer tracking recent actions for progress display |
| `render_event_cli()` | Format a takopi event for CLI logs |
| `format_elapsed()` | Formats seconds as `Xh Ym`, `Xm Ys`, or `Xs` |
### `telegram.py` - Telegram API wrapper ### `telegram/client.py` - Telegram API wrapper
| Component | Purpose | | Component | Purpose |
|-----------|---------| |-----------|---------|
@@ -184,7 +223,13 @@ Self-documenting msgspec schemas for decoding engine JSONL streams.
|----------|---------| |----------|---------|
| `install_issue()` | Creates `SetupIssue` with install instructions for missing CLI | | `install_issue()` | Creates `SetupIssue` with install instructions for missing CLI |
### `config.py` - Configuration loading ### `config.py` - Shared configuration errors
```python
class ConfigError(RuntimeError): ...
```
### `telegram/config.py` - Configuration loading
```python ```python
def load_telegram_config() -> tuple[dict, Path]: def load_telegram_config() -> tuple[dict, Path]:
@@ -210,7 +255,7 @@ Environment flags:
CLI flag: `--debug` enables debug logging (overrides `TAKOPI_LOG_LEVEL`). CLI flag: `--debug` enables debug logging (overrides `TAKOPI_LOG_LEVEL`).
### `onboarding.py` - Setup validation ### `telegram/onboarding.py` - Setup validation
```python ```python
def check_setup(backend: EngineBackend) -> SetupResult: def check_setup(backend: EngineBackend) -> SetupResult:
@@ -231,15 +276,15 @@ See `docs/adding-a-runner.md` for the full guide and a worked example.
``` ```
Telegram Update Telegram Update
poll_updates() drains backlog, long-polls, filters chat_id == cfg.chat_id telegram/bridge.poll_updates() drains backlog, long-polls, filters chat_id == cfg.chat_id
run_main_loop() spawns tasks in TaskGroup telegram/bridge.run_main_loop() spawns tasks in TaskGroup
router.resolve_resume(text, reply_text) → ResumeToken | None router.resolve_resume(text, reply_text) → ResumeToken | None
router.entry_for(resume_token) or router.entry_for_engine(default) → RunnerEntry router.entry_for(resume_token) or router.entry_for_engine(override/default) → RunnerEntry
handle_message() spawned as task with selected runner runner_bridge.handle_message() spawned as task with selected runner
Send initial progress message (silent) Send initial progress message (silent)
@@ -249,14 +294,14 @@ runner.run(prompt, resume_token)
├── Normalizes JSONL -> takopi events ├── Normalizes JSONL -> takopi events
├── Yields Takopi events (async iterator) ├── Yields Takopi events (async iterator)
│ ↓ │ ↓
ExecProgressRenderer.note_event() │ ProgressTracker.note_event()
│ ↓ │ ↓
│ ProgressEdits throttled edit_message_text() │ ProgressEdits best-effort transport.edit(wait=False)
└── Ends with completed(resume, ok, answer) └── Ends with completed(resume, ok, answer)
render_final() with resume line (runner-formatted) render_final() with resume line (runner-formatted)
Send/edit final message transport.send()/edit() final message, delete progress if needed
``` ```
### Resume Flow ### Resume Flow
-922
View File
@@ -1,922 +0,0 @@
"""Telegram bridge orchestration for running runners and streaming progress."""
from __future__ import annotations
import time
from collections.abc import AsyncIterator, Awaitable, Callable
from dataclasses import dataclass, field
from typing import Any
import anyio
from .model import CompletedEvent, EngineId, ResumeToken, StartedEvent, TakopiEvent
from .logging import bind_run_context, clear_context, get_logger
from .render import (
ExecProgressRenderer,
MarkdownParts,
assemble_markdown_parts,
prepare_telegram,
render_event_cli,
)
from .router import AutoRouter, RunnerUnavailableError
from .runner import Runner
from .scheduler import ThreadJob, ThreadScheduler
from .telegram import BotClient
logger = get_logger(__name__)
def _log_runner_event(evt: TakopiEvent) -> None:
for line in render_event_cli(evt):
logger.debug(
"runner.event.cli",
line=line,
event_type=getattr(evt, "type", None),
engine=getattr(evt, "engine", None),
)
def _is_cancel_command(text: str) -> bool:
stripped = text.strip()
if not stripped:
return False
command = stripped.split(maxsplit=1)[0]
return command == "/cancel" or command.startswith("/cancel@")
def _strip_engine_command(
text: str, *, engine_ids: tuple[EngineId, ...]
) -> tuple[str, EngineId | None]:
if not text:
return text, None
if not engine_ids:
return text, None
engine_map = {engine.lower(): engine for engine in engine_ids}
lines = text.splitlines()
idx = next((i for i, line in enumerate(lines) if line.strip()), None)
if idx is None:
return text, None
line = lines[idx].lstrip()
if not line.startswith("/"):
return text, None
parts = line.split(maxsplit=1)
command = parts[0][1:]
if "@" in command:
command = command.split("@", 1)[0]
engine = engine_map.get(command.lower())
if engine is None:
return text, None
remainder = parts[1] if len(parts) > 1 else ""
if remainder:
lines[idx] = remainder
else:
lines.pop(idx)
return "\n".join(lines).strip(), engine
def _build_bot_commands(router: AutoRouter) -> list[dict[str, str]]:
commands: list[dict[str, str]] = []
seen: set[str] = set()
for entry in router.available_entries:
cmd = entry.engine.lower()
if cmd in seen:
continue
commands.append({"command": cmd, "description": f"start {cmd}"})
seen.add(cmd)
if "cancel" not in seen:
commands.append({"command": "cancel", "description": "cancel run"})
return commands
async def _set_command_menu(cfg: BridgeConfig) -> None:
commands = _build_bot_commands(cfg.router)
if not commands:
return
try:
ok = await cfg.bot.set_my_commands(commands)
except Exception as exc:
logger.info(
"startup.command_menu.failed",
error=str(exc),
error_type=exc.__class__.__name__,
)
return
if not ok:
logger.info("startup.command_menu.rejected")
return
logger.info(
"startup.command_menu.updated",
commands=[cmd["command"] for cmd in commands],
)
def _strip_resume_lines(text: str, *, is_resume_line: Callable[[str], bool]) -> str:
stripped_lines: list[str] = []
for line in text.splitlines():
if is_resume_line(line):
continue
stripped_lines.append(line)
prompt = "\n".join(stripped_lines).strip()
return prompt or "continue"
def _flatten_exception_group(error: BaseException) -> list[BaseException]:
if isinstance(error, BaseExceptionGroup):
flattened: list[BaseException] = []
for exc in error.exceptions:
flattened.extend(_flatten_exception_group(exc))
return flattened
return [error]
def _format_error(error: Exception) -> str:
cancel_exc = anyio.get_cancelled_exc_class()
flattened = [
exc
for exc in _flatten_exception_group(error)
if not isinstance(exc, cancel_exc)
]
if len(flattened) == 1:
return str(flattened[0]) or flattened[0].__class__.__name__
if not flattened:
return str(error) or error.__class__.__name__
messages = [str(exc) for exc in flattened if str(exc)]
if not messages:
return str(error) or error.__class__.__name__
if len(messages) == 1:
return messages[0]
return "\n".join(messages)
async def _send_or_edit_markdown(
bot: BotClient,
*,
chat_id: int,
parts: MarkdownParts,
edit_message_id: int | None = None,
reply_to_message_id: int | None = None,
replace_message_id: int | None = None,
disable_notification: bool = False,
prepared: tuple[str, list[dict[str, Any]]] | None = None,
) -> tuple[dict[str, Any] | None, bool]:
if prepared is None:
rendered, entities = prepare_telegram(parts)
else:
rendered, entities = prepared
if edit_message_id is not None:
logger.debug(
"telegram.edit_message",
chat_id=chat_id,
message_id=edit_message_id,
rendered=rendered,
)
edited = await bot.edit_message_text(
chat_id=chat_id,
message_id=edit_message_id,
text=rendered,
entities=entities,
)
if edited is not None:
return (edited, True)
logger.debug(
"telegram.send_message",
chat_id=chat_id,
reply_to_message_id=reply_to_message_id,
rendered=rendered,
)
return (
await bot.send_message(
chat_id=chat_id,
text=rendered,
entities=entities,
reply_to_message_id=reply_to_message_id,
disable_notification=disable_notification,
replace_message_id=replace_message_id,
),
False,
)
class ProgressEdits:
def __init__(
self,
*,
bot: BotClient,
chat_id: int,
progress_id: int | None,
renderer: ExecProgressRenderer,
started_at: float,
clock: Callable[[], float],
last_rendered: str | None,
) -> None:
self.bot = bot
self.chat_id = chat_id
self.progress_id = progress_id
self.renderer = renderer
self.started_at = started_at
self.clock = clock
self.last_rendered = last_rendered
self.event_seq = 0
self.rendered_seq = 0
self.signal_send, self.signal_recv = anyio.create_memory_object_stream(1)
async def run(self) -> None:
if self.progress_id is None:
return
while True:
while self.rendered_seq == self.event_seq:
try:
await self.signal_recv.receive()
except anyio.EndOfStream:
return
seq_at_render = self.event_seq
now = self.clock()
parts = self.renderer.render_progress_parts(now - self.started_at)
rendered, entities = prepare_telegram(parts)
if rendered != self.last_rendered:
logger.debug(
"telegram.edit_message",
chat_id=self.chat_id,
message_id=self.progress_id,
rendered=rendered,
)
await self.bot.edit_message_text(
chat_id=self.chat_id,
message_id=self.progress_id,
text=rendered,
entities=entities,
wait=False,
)
self.last_rendered = rendered
self.rendered_seq = seq_at_render
async def on_event(self, evt: TakopiEvent) -> None:
if not self.renderer.note_event(evt):
return
if self.progress_id is None:
return
self.event_seq += 1
try:
self.signal_send.send_nowait(None)
except anyio.WouldBlock:
pass
except (anyio.BrokenResourceError, anyio.ClosedResourceError):
pass
@dataclass(frozen=True)
class BridgeConfig:
bot: BotClient
router: AutoRouter
chat_id: int
final_notify: bool
startup_msg: str
@dataclass
class RunningTask:
resume: ResumeToken | None = None
resume_ready: anyio.Event = field(default_factory=anyio.Event)
cancel_requested: anyio.Event = field(default_factory=anyio.Event)
done: anyio.Event = field(default_factory=anyio.Event)
async def _send_startup(cfg: BridgeConfig) -> None:
logger.debug("startup.message", text=cfg.startup_msg)
sent, _ = await _send_or_edit_markdown(
cfg.bot,
chat_id=cfg.chat_id,
parts=MarkdownParts(header=cfg.startup_msg),
)
if sent is not None:
logger.info("startup.sent", chat_id=cfg.chat_id)
async def _drain_backlog(cfg: BridgeConfig, offset: int | None) -> int | None:
drained = 0
while True:
updates = await cfg.bot.get_updates(
offset=offset, timeout_s=0, allowed_updates=["message"]
)
if updates is None:
logger.info("startup.backlog.failed")
return offset
logger.debug("startup.backlog.updates", updates=updates)
if not updates:
if drained:
logger.info("startup.backlog.drained", count=drained)
return offset
offset = updates[-1]["update_id"] + 1
drained += len(updates)
@dataclass(frozen=True, slots=True)
class ProgressMessageState:
message_id: int | None
last_rendered: str | None
async def send_initial_progress(
cfg: BridgeConfig,
*,
chat_id: int,
user_msg_id: int,
label: str,
renderer: ExecProgressRenderer,
clock: Callable[[], float],
) -> ProgressMessageState:
progress_id: int | None = None
last_rendered: str | None = None
initial_parts = renderer.render_progress_parts(0.0, label=label)
initial_rendered, initial_entities = prepare_telegram(initial_parts)
logger.debug(
"telegram.send_message",
chat_id=chat_id,
reply_to_message_id=user_msg_id,
rendered=initial_rendered,
)
progress_msg = await cfg.bot.send_message(
chat_id=chat_id,
text=initial_rendered,
entities=initial_entities,
reply_to_message_id=user_msg_id,
disable_notification=True,
)
if progress_msg is not None:
progress_id = int(progress_msg["message_id"])
last_rendered = initial_rendered
logger.debug(
"progress.sent",
chat_id=chat_id,
message_id=progress_id,
)
return ProgressMessageState(
message_id=progress_id,
last_rendered=last_rendered,
)
@dataclass(slots=True)
class RunOutcome:
cancelled: bool = False
completed: CompletedEvent | None = None
resume: ResumeToken | None = None
async def run_runner_with_cancel(
runner: Runner,
*,
prompt: str,
resume_token: ResumeToken | None,
edits: ProgressEdits,
running_task: RunningTask | None,
on_thread_known: Callable[[ResumeToken, anyio.Event], Awaitable[None]] | None,
) -> RunOutcome:
outcome = RunOutcome()
async with anyio.create_task_group() as tg:
async def run_runner() -> None:
try:
async for evt in runner.run(prompt, resume_token):
_log_runner_event(evt)
if isinstance(evt, StartedEvent):
outcome.resume = evt.resume
bind_run_context(resume=evt.resume.value)
if running_task is not None and running_task.resume is None:
running_task.resume = evt.resume
running_task.resume_ready.set()
if on_thread_known is not None:
await on_thread_known(evt.resume, running_task.done)
elif isinstance(evt, CompletedEvent):
outcome.resume = evt.resume or outcome.resume
outcome.completed = evt
await edits.on_event(evt)
finally:
tg.cancel_scope.cancel()
async def wait_cancel(task: RunningTask) -> None:
await task.cancel_requested.wait()
outcome.cancelled = True
tg.cancel_scope.cancel()
tg.start_soon(run_runner)
if running_task is not None:
tg.start_soon(wait_cancel, running_task)
return outcome
def sync_resume_token(
renderer: ExecProgressRenderer, resume: ResumeToken | None
) -> ResumeToken | None:
resume = resume or renderer.resume_token
renderer.resume_token = resume
return resume
async def send_result_message(
cfg: BridgeConfig,
*,
chat_id: int,
user_msg_id: int,
progress_id: int | None,
parts: MarkdownParts,
disable_notification: bool,
edit_message_id: int | None,
prepared: tuple[str, list[dict[str, Any]]] | None = None,
) -> None:
final_msg, edited = await _send_or_edit_markdown(
cfg.bot,
chat_id=chat_id,
parts=parts,
edit_message_id=edit_message_id,
reply_to_message_id=user_msg_id,
replace_message_id=progress_id,
disable_notification=disable_notification,
prepared=prepared,
)
if final_msg is None:
return
async def handle_message(
cfg: BridgeConfig,
*,
runner: Runner,
chat_id: int,
user_msg_id: int,
text: str,
resume_token: ResumeToken | None,
strip_resume_line: Callable[[str], bool] | None = None,
running_tasks: dict[int, RunningTask] | None = None,
on_thread_known: Callable[[ResumeToken, anyio.Event], Awaitable[None]]
| None = None,
clock: Callable[[], float] = time.monotonic,
) -> None:
logger.info(
"handle.incoming",
chat_id=chat_id,
user_msg_id=user_msg_id,
resume=resume_token.value if resume_token else None,
text=text,
)
started_at = clock()
is_resume_line = runner.is_resume_line
resume_strip = strip_resume_line or is_resume_line
runner_text = _strip_resume_lines(text, is_resume_line=resume_strip)
progress_renderer = ExecProgressRenderer(
max_actions=5, resume_formatter=runner.format_resume, engine=runner.engine
)
progress_state = await send_initial_progress(
cfg,
chat_id=chat_id,
user_msg_id=user_msg_id,
label="starting",
renderer=progress_renderer,
clock=clock,
)
progress_id = progress_state.message_id
edits = ProgressEdits(
bot=cfg.bot,
chat_id=chat_id,
progress_id=progress_id,
renderer=progress_renderer,
started_at=started_at,
clock=clock,
last_rendered=progress_state.last_rendered,
)
running_task: RunningTask | None = None
if running_tasks is not None and progress_id is not None:
running_task = RunningTask()
running_tasks[progress_id] = running_task
cancel_exc_type = anyio.get_cancelled_exc_class()
edits_scope = anyio.CancelScope()
async def run_edits() -> None:
try:
with edits_scope:
await edits.run()
except cancel_exc_type:
# Edits are best-effort; cancellation should not bubble into the task group.
return
outcome = RunOutcome()
error: Exception | None = None
async with anyio.create_task_group() as tg:
if progress_id is not None:
tg.start_soon(run_edits)
try:
outcome = await run_runner_with_cancel(
runner,
prompt=runner_text,
resume_token=resume_token,
edits=edits,
running_task=running_task,
on_thread_known=on_thread_known,
)
except Exception as exc:
error = exc
logger.exception(
"handle.runner_failed",
error=str(exc),
error_type=exc.__class__.__name__,
)
finally:
if (
running_task is not None
and running_tasks is not None
and progress_id is not None
):
running_task.done.set()
running_tasks.pop(progress_id, None)
if not outcome.cancelled and error is None:
# Give pending progress edits a chance to flush if they're ready.
await anyio.sleep(0)
edits_scope.cancel()
elapsed = clock() - started_at
if error is not None:
sync_resume_token(progress_renderer, outcome.resume)
err_body = _format_error(error)
final_parts = progress_renderer.render_final_parts(
elapsed, err_body, status="error"
)
logger.debug(
"handle.error.markdown",
error=err_body,
markdown=assemble_markdown_parts(final_parts),
)
await send_result_message(
cfg,
chat_id=chat_id,
user_msg_id=user_msg_id,
progress_id=progress_id,
parts=final_parts,
disable_notification=True,
edit_message_id=progress_id,
)
return
if outcome.cancelled:
resume = sync_resume_token(progress_renderer, outcome.resume)
logger.info(
"handle.cancelled",
resume=resume.value if resume else None,
elapsed_s=elapsed,
)
final_parts = progress_renderer.render_progress_parts(
elapsed, label="`cancelled`"
)
await send_result_message(
cfg,
chat_id=chat_id,
user_msg_id=user_msg_id,
progress_id=progress_id,
parts=final_parts,
disable_notification=True,
edit_message_id=progress_id,
)
return
if outcome.completed is None:
raise RuntimeError("runner finished without a completed event")
completed = outcome.completed
run_ok = completed.ok
run_error = completed.error
final_answer = completed.answer
if run_ok is False and run_error:
if final_answer.strip():
final_answer = f"{final_answer}\n\n{run_error}"
else:
final_answer = str(run_error)
status = (
"error" if run_ok is False else ("done" if final_answer.strip() else "error")
)
resume_value = None
resume_token = completed.resume or outcome.resume
if resume_token is not None:
resume_value = resume_token.value
logger.info(
"runner.completed",
ok=run_ok,
error=run_error,
answer_len=len(final_answer or ""),
elapsed_s=round(elapsed, 2),
action_count=progress_renderer.action_count,
resume=resume_value,
)
sync_resume_token(progress_renderer, completed.resume or outcome.resume)
final_parts = progress_renderer.render_final_parts(
elapsed, final_answer, status=status
)
logger.debug(
"handle.final.markdown",
markdown=assemble_markdown_parts(final_parts),
status=status,
)
final_rendered, final_entities = prepare_telegram(final_parts)
can_edit_final = progress_id is not None
edit_message_id = None if cfg.final_notify or not can_edit_final else progress_id
await send_result_message(
cfg,
chat_id=chat_id,
user_msg_id=user_msg_id,
progress_id=progress_id,
parts=final_parts,
disable_notification=False,
edit_message_id=edit_message_id,
prepared=(final_rendered, final_entities),
)
async def poll_updates(cfg: BridgeConfig) -> AsyncIterator[dict[str, Any]]:
offset: int | None = None
offset = await _drain_backlog(cfg, offset)
await _send_startup(cfg)
while True:
updates = await cfg.bot.get_updates(
offset=offset, timeout_s=50, allowed_updates=["message"]
)
if updates is None:
logger.info("loop.get_updates.failed")
await anyio.sleep(2)
continue
logger.debug("loop.updates", updates=updates)
for upd in updates:
offset = upd["update_id"] + 1
msg = upd["message"]
if "text" not in msg:
continue
if msg["chat"]["id"] != cfg.chat_id:
continue
yield msg
async def _handle_cancel(
cfg: BridgeConfig,
msg: dict[str, Any],
running_tasks: dict[int, RunningTask],
) -> None:
chat_id = msg["chat"]["id"]
user_msg_id = msg["message_id"]
reply = msg.get("reply_to_message")
if not reply:
await cfg.bot.send_message(
chat_id=chat_id,
text="reply to the progress message to cancel.",
reply_to_message_id=user_msg_id,
)
return
progress_id = reply.get("message_id")
if progress_id is None:
await cfg.bot.send_message(
chat_id=chat_id,
text="nothing is currently running for that message.",
reply_to_message_id=user_msg_id,
)
return
running_task = running_tasks.get(int(progress_id))
if running_task is None:
await cfg.bot.send_message(
chat_id=chat_id,
text="nothing is currently running for that message.",
reply_to_message_id=user_msg_id,
)
return
logger.info(
"cancel.requested",
chat_id=chat_id,
progress_message_id=progress_id,
)
running_task.cancel_requested.set()
async def _wait_for_resume(running_task: RunningTask) -> ResumeToken | None:
if running_task.resume is not None:
return running_task.resume
resume: ResumeToken | None = None
async with anyio.create_task_group() as tg:
async def wait_resume() -> None:
nonlocal resume
await running_task.resume_ready.wait()
resume = running_task.resume
tg.cancel_scope.cancel()
async def wait_done() -> None:
await running_task.done.wait()
tg.cancel_scope.cancel()
tg.start_soon(wait_resume)
tg.start_soon(wait_done)
return resume
async def _send_with_resume(
bot: BotClient,
enqueue: Callable[[int, int, str, ResumeToken], Awaitable[None]],
running_task: RunningTask,
chat_id: int,
user_msg_id: int,
text: str,
) -> None:
resume = await _wait_for_resume(running_task)
if resume is None:
await bot.send_message(
chat_id=chat_id,
text="resume token not ready yet; try replying to the final message.",
reply_to_message_id=user_msg_id,
disable_notification=True,
)
return
await enqueue(chat_id, user_msg_id, text, resume)
async def _send_runner_unavailable(
cfg: BridgeConfig,
*,
chat_id: int,
user_msg_id: int,
resume_token: ResumeToken | None,
runner: Runner,
reason: str,
) -> None:
progress_renderer = ExecProgressRenderer(
max_actions=0, resume_formatter=runner.format_resume, engine=runner.engine
)
if resume_token is not None:
progress_renderer.resume_token = resume_token
final_parts = progress_renderer.render_final_parts(
0.0, f"error:\n{reason}", status="error"
)
await _send_or_edit_markdown(
cfg.bot,
chat_id=chat_id,
parts=final_parts,
reply_to_message_id=user_msg_id,
disable_notification=False,
)
async def run_main_loop(
cfg: BridgeConfig,
poller: Callable[[BridgeConfig], AsyncIterator[dict[str, Any]]] = poll_updates,
) -> None:
running_tasks: dict[int, RunningTask] = {}
try:
await _set_command_menu(cfg)
async with anyio.create_task_group() as tg:
async def run_job(
chat_id: int,
user_msg_id: int,
text: str,
resume_token: ResumeToken | None,
on_thread_known: Callable[[ResumeToken, anyio.Event], Awaitable[None]]
| None = None,
engine_override: EngineId | None = None,
) -> None:
try:
try:
entry = (
cfg.router.entry_for_engine(engine_override)
if resume_token is None
else cfg.router.entry_for(resume_token)
)
except RunnerUnavailableError as exc:
await _send_or_edit_markdown(
cfg.bot,
chat_id=chat_id,
parts=MarkdownParts(header=f"error:\n{exc}"),
reply_to_message_id=user_msg_id,
disable_notification=False,
)
return
if not entry.available:
reason = entry.issue or "engine unavailable"
await _send_runner_unavailable(
cfg,
chat_id=chat_id,
user_msg_id=user_msg_id,
resume_token=resume_token,
runner=entry.runner,
reason=reason,
)
return
bind_run_context(
chat_id=chat_id,
user_msg_id=user_msg_id,
engine=entry.runner.engine,
resume=resume_token.value if resume_token else None,
)
await handle_message(
cfg,
runner=entry.runner,
chat_id=chat_id,
user_msg_id=user_msg_id,
text=text,
resume_token=resume_token,
strip_resume_line=cfg.router.is_resume_line,
running_tasks=running_tasks,
on_thread_known=on_thread_known,
)
except Exception as exc:
logger.exception(
"handle.worker_failed",
error=str(exc),
error_type=exc.__class__.__name__,
)
finally:
clear_context()
async def run_thread_job(job: ThreadJob) -> None:
await run_job(
job.chat_id,
job.user_msg_id,
job.text,
job.resume_token,
)
scheduler = ThreadScheduler(task_group=tg, run_job=run_thread_job)
async for msg in poller(cfg):
text = msg["text"]
user_msg_id = msg["message_id"]
if _is_cancel_command(text):
tg.start_soon(_handle_cancel, cfg, msg, running_tasks)
continue
text, engine_override = _strip_engine_command(
text, engine_ids=cfg.router.engine_ids
)
r = msg.get("reply_to_message") or {}
resume_token = cfg.router.resolve_resume(text, r.get("text"))
reply_id = r.get("message_id")
if resume_token is None and reply_id is not None:
running_task = running_tasks.get(int(reply_id))
if running_task is None:
await anyio.sleep(0)
running_task = running_tasks.get(int(reply_id))
if running_task is not None:
tg.start_soon(
_send_with_resume,
cfg.bot,
scheduler.enqueue_resume,
running_task,
msg["chat"]["id"],
user_msg_id,
text,
)
continue
if resume_token is None:
tg.start_soon(
run_job,
msg["chat"]["id"],
user_msg_id,
text,
None,
scheduler.note_thread_known,
engine_override,
)
else:
await scheduler.enqueue_resume(
msg["chat"]["id"], user_msg_id, text, resume_token
)
finally:
await cfg.bot.close()
+21 -7
View File
@@ -11,14 +11,21 @@ import typer
from . import __version__ from . import __version__
from .backends import EngineBackend from .backends import EngineBackend
from .bridge import BridgeConfig, run_main_loop from .config import ConfigError
from .config import ConfigError, load_telegram_config
from .engines import get_backend, get_engine_config, list_backends from .engines import get_backend, get_engine_config, list_backends
from .lockfile import LockError, LockHandle, acquire_lock, token_fingerprint from .lockfile import LockError, LockHandle, acquire_lock, token_fingerprint
from .logging import get_logger, setup_logging from .logging import get_logger, setup_logging
from .onboarding import SetupResult, check_setup, interactive_setup from .telegram.bridge import (
TelegramBridgeConfig,
TelegramPresenter,
TelegramTransport,
run_main_loop,
)
from .telegram.client import TelegramClient
from .telegram.config import load_telegram_config
from .telegram.onboarding import SetupResult, check_setup, interactive_setup
from .router import AutoRouter, RunnerEntry from .router import AutoRouter, RunnerEntry
from .telegram import TelegramClient from .runner_bridge import ExecBridgeConfig
logger = get_logger(__name__) logger = get_logger(__name__)
@@ -184,7 +191,7 @@ def _parse_bridge_config(
config_path: Path, config_path: Path,
token: str, token: str,
chat_id: int, chat_id: int,
) -> BridgeConfig: ) -> TelegramBridgeConfig:
startup_pwd = os.getcwd() startup_pwd = os.getcwd()
backends = list_backends() backends = list_backends()
@@ -213,13 +220,20 @@ def _parse_bridge_config(
) )
bot = TelegramClient(token) bot = TelegramClient(token)
transport = TelegramTransport(bot)
presenter = TelegramPresenter()
exec_cfg = ExecBridgeConfig(
transport=transport,
presenter=presenter,
final_notify=final_notify,
)
return BridgeConfig( return TelegramBridgeConfig(
bot=bot, bot=bot,
router=router, router=router,
chat_id=chat_id, chat_id=chat_id,
final_notify=final_notify,
startup_msg=startup_msg, startup_msg=startup_msg,
exec_cfg=exec_cfg,
) )
-28
View File
@@ -1,33 +1,5 @@
from __future__ import annotations from __future__ import annotations
import tomllib
from pathlib import Path
HOME_CONFIG_PATH = Path.home() / ".takopi" / "takopi.toml"
class ConfigError(RuntimeError): class ConfigError(RuntimeError):
pass pass
def _read_config(cfg_path: Path) -> dict:
try:
raw = cfg_path.read_text(encoding="utf-8")
except FileNotFoundError:
raise ConfigError(f"Missing config file {cfg_path}.") from None
except OSError as e:
raise ConfigError(f"Failed to read config file {cfg_path}: {e}") from e
try:
return tomllib.loads(raw)
except tomllib.TOMLDecodeError as e:
raise ConfigError(f"Malformed TOML in {cfg_path}: {e}") from None
def load_telegram_config(path: str | Path | None = None) -> tuple[dict, Path]:
if path:
cfg_path = Path(path).expanduser()
return _read_config(cfg_path), cfg_path
cfg_path = HOME_CONFIG_PATH
if cfg_path.exists() and not cfg_path.is_file():
raise ConfigError(f"Config path {cfg_path} exists but is not a file.") from None
return _read_config(cfg_path), cfg_path
+70 -122
View File
@@ -1,19 +1,12 @@
"""Pure renderers for Takopi events (no engine-native event handling)."""
from __future__ import annotations from __future__ import annotations
import re
import textwrap import textwrap
from collections import deque
from collections.abc import Callable
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from typing import Any
from markdown_it import MarkdownIt from .model import Action, ActionEvent, StartedEvent, TakopiEvent
from sulguk import transform_html from .progress import ProgressState
from .transport import RenderedMessage
from .model import Action, ActionEvent, ResumeToken, StartedEvent, TakopiEvent
from .utils.paths import relativize_path from .utils.paths import relativize_path
STATUS = {"running": "", "update": "", "done": "", "fail": ""} STATUS = {"running": "", "update": "", "done": "", "fail": ""}
@@ -23,11 +16,8 @@ HARD_BREAK = " \n"
MAX_PROGRESS_CMD_LEN = 300 MAX_PROGRESS_CMD_LEN = 300
MAX_FILE_CHANGES_INLINE = 3 MAX_FILE_CHANGES_INLINE = 3
_MD_RENDERER = MarkdownIt("commonmark", {"html": False})
_BULLET_RE = re.compile(r"(?m)^(\s*)•")
@dataclass(frozen=True, slots=True)
@dataclass(frozen=True)
class MarkdownParts: class MarkdownParts:
header: str header: str
body: str | None = None body: str | None = None
@@ -40,33 +30,6 @@ def assemble_markdown_parts(parts: MarkdownParts) -> str:
) )
def render_markdown(md: str) -> tuple[str, list[dict[str, Any]]]:
html = _MD_RENDERER.render(md or "")
rendered = transform_html(html)
text = _BULLET_RE.sub(r"\1-", rendered.text)
entities = [dict(e) for e in rendered.entities]
return text, entities
def trim_body(body: str | None) -> str | None:
if not body:
return None
if len(body) > 3500:
body = body[: 3500 - 1] + ""
return body if body.strip() else None
def prepare_telegram(parts: MarkdownParts) -> tuple[str, list[dict[str, Any]]]:
trimmed = MarkdownParts(
header=parts.header or "",
body=trim_body(parts.body),
footer=parts.footer,
)
return render_markdown(assemble_markdown_parts(trimmed))
def format_changed_file_path(path: str, *, base_dir: Path | None = None) -> str: def format_changed_file_path(path: str, *, base_dir: Path | None = None) -> str:
return f"`{relativize_path(path, base_dir=base_dir)}`" return f"`{relativize_path(path, base_dir=base_dir)}`"
@@ -209,115 +172,100 @@ def render_event_cli(event: TakopiEvent) -> list[str]:
return [] return []
@dataclass class MarkdownFormatter:
class RecentLine:
action_id: str
text: str
completed: bool = False
class ExecProgressRenderer:
def __init__( def __init__(
self, self,
engine: str, *,
max_actions: int = 5, max_actions: int = 5,
command_width: int | None = MAX_PROGRESS_CMD_LEN, command_width: int | None = MAX_PROGRESS_CMD_LEN,
resume_formatter: Callable[[ResumeToken], str] | None = None,
) -> None: ) -> None:
self.max_actions = max(0, int(max_actions)) self.max_actions = max(0, int(max_actions))
self.command_width = command_width self.command_width = command_width
self.lines: deque[RecentLine] = deque(maxlen=self.max_actions)
self.action_count = 0
self.seen_action_ids: set[str] = set()
self.resume_token: ResumeToken | None = None
self._resume_formatter = resume_formatter
self.engine = engine
def note_event(self, event: TakopiEvent) -> bool:
match event:
case StartedEvent(resume=resume):
self.resume_token = resume
return True
case ActionEvent(action=action, phase=phase, ok=ok):
if action.kind == "turn":
return False
action_id = str(action.id or "")
if not action_id:
return False
completed = phase == "completed"
has_open = self.has_open_line(action_id)
is_update = phase == "updated" or (phase == "started" and has_open)
phase_for_line = "updated" if is_update and not completed else phase
line = format_action_line(
action, phase_for_line, ok, command_width=self.command_width
)
if action_id not in self.seen_action_ids:
self.seen_action_ids.add(action_id)
self.action_count += 1
self.upsert_line(action_id, line=line, completed=completed)
return True
case _:
return False
def has_open_line(self, action_id: str) -> bool:
return any(
line.action_id == action_id and not line.completed for line in self.lines
)
def upsert_line(self, action_id: str, *, line: str, completed: bool) -> None:
for i in range(len(self.lines) - 1, -1, -1):
existing = self.lines[i]
if existing.action_id == action_id and not existing.completed:
self.lines[i] = RecentLine(
action_id=action_id,
text=line,
completed=existing.completed or completed,
)
return
self.lines.append(
RecentLine(action_id=action_id, text=line, completed=completed)
)
def render_progress_parts( def render_progress_parts(
self, elapsed_s: float, label: str = "working" self,
state: ProgressState,
*,
elapsed_s: float,
label: str = "working",
) -> MarkdownParts: ) -> MarkdownParts:
step = self.action_count or None step = state.action_count or None
header = format_header( header = format_header(
elapsed_s, elapsed_s,
step, step,
label=label, label=label,
engine=self.engine, engine=state.engine,
) )
body = self.assemble_body([line.text for line in self.lines]) body = self._assemble_body(self._format_actions(state))
return MarkdownParts(header=header, body=body, footer=self.render_footer()) return MarkdownParts(header=header, body=body, footer=state.resume_line)
def render_final_parts( def render_final_parts(
self, elapsed_s: float, answer: str, status: str = "done" self,
state: ProgressState,
*,
elapsed_s: float,
status: str,
answer: str,
) -> MarkdownParts: ) -> MarkdownParts:
step = self.action_count or None step = state.action_count or None
header = format_header( header = format_header(
elapsed_s, elapsed_s,
step, step,
label=status, label=status,
engine=self.engine, engine=state.engine,
) )
answer = (answer or "").strip() answer = (answer or "").strip()
body = answer if answer else None body = answer if answer else None
return MarkdownParts(header=header, body=body, footer=self.render_footer()) return MarkdownParts(header=header, body=body, footer=state.resume_line)
def render_footer(self) -> str | None: def _format_actions(self, state: ProgressState) -> list[str]:
if not self.resume_token or self._resume_formatter is None: actions = list(state.actions)
return None if self.max_actions == 0:
return self._resume_formatter(self.resume_token) actions = []
else:
@property actions = actions[-self.max_actions :]
def recent_actions(self) -> list[str]: return [
return [line.text for line in self.lines] format_action_line(
action_state.action,
action_state.display_phase,
action_state.ok,
command_width=self.command_width,
)
for action_state in actions
]
@staticmethod @staticmethod
def assemble_body(lines: list[str]) -> str | None: def _assemble_body(lines: list[str]) -> str | None:
if not lines: if not lines:
return None return None
return HARD_BREAK.join(lines) return HARD_BREAK.join(lines)
class MarkdownPresenter:
def __init__(self, *, formatter: MarkdownFormatter | None = None) -> None:
self._formatter = formatter or MarkdownFormatter()
def render_progress(
self,
state: ProgressState,
*,
elapsed_s: float,
label: str = "working",
) -> RenderedMessage:
parts = self._formatter.render_progress_parts(
state, elapsed_s=elapsed_s, label=label
)
return RenderedMessage(text=assemble_markdown_parts(parts))
def render_final(
self,
state: ProgressState,
*,
elapsed_s: float,
status: str,
answer: str,
) -> RenderedMessage:
parts = self._formatter.render_final_parts(
state, elapsed_s=elapsed_s, status=status, answer=answer
)
return RenderedMessage(text=assemble_markdown_parts(parts))
+25
View File
@@ -0,0 +1,25 @@
from __future__ import annotations
from typing import Protocol
from .progress import ProgressState
from .transport import RenderedMessage
class Presenter(Protocol):
def render_progress(
self,
state: ProgressState,
*,
elapsed_s: float,
label: str = "working",
) -> RenderedMessage: ...
def render_final(
self,
state: ProgressState,
*,
elapsed_s: float,
status: str,
answer: str,
) -> RenderedMessage: ...
+96
View File
@@ -0,0 +1,96 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Callable
from .model import Action, ActionEvent, ResumeToken, StartedEvent, TakopiEvent
@dataclass(frozen=True, slots=True)
class ActionState:
action: Action
phase: str
ok: bool | None
display_phase: str
completed: bool
first_seen: int
last_update: int
@dataclass(frozen=True, slots=True)
class ProgressState:
engine: str
action_count: int
actions: tuple[ActionState, ...]
resume: ResumeToken | None
resume_line: str | None
class ProgressTracker:
def __init__(self, *, engine: str) -> None:
self.engine = engine
self.resume: ResumeToken | None = None
self.action_count = 0
self._actions: dict[str, ActionState] = {}
self._seq = 0
def note_event(self, event: TakopiEvent) -> bool:
match event:
case StartedEvent(resume=resume):
self.resume = resume
return True
case ActionEvent(action=action, phase=phase, ok=ok):
if action.kind == "turn":
return False
action_id = str(action.id or "")
if not action_id:
return False
completed = phase == "completed"
existing = self._actions.get(action_id)
has_open = existing is not None and not existing.completed
is_update = phase == "updated" or (phase == "started" and has_open)
display_phase = "updated" if is_update and not completed else phase
self._seq += 1
seq = self._seq
if existing is None:
self.action_count += 1
first_seen = seq
else:
first_seen = existing.first_seen
self._actions[action_id] = ActionState(
action=action,
phase=phase,
ok=ok,
display_phase=display_phase,
completed=completed,
first_seen=first_seen,
last_update=seq,
)
return True
case _:
return False
def set_resume(self, resume: ResumeToken | None) -> None:
if resume is not None:
self.resume = resume
def snapshot(
self,
*,
resume_formatter: Callable[[ResumeToken], str] | None = None,
) -> ProgressState:
resume_line: str | None = None
if self.resume is not None and resume_formatter is not None:
resume_line = resume_formatter(self.resume)
actions = tuple(
sorted(self._actions.values(), key=lambda item: item.first_seen)
)
return ProgressState(
engine=self.engine,
action_count=self.action_count,
actions=actions,
resume=self.resume,
resume_line=resume_line,
)
+575
View File
@@ -0,0 +1,575 @@
from __future__ import annotations
import time
from collections.abc import Awaitable, Callable
from dataclasses import dataclass, field
import anyio
from .logging import bind_run_context, get_logger
from .model import CompletedEvent, ResumeToken, StartedEvent, TakopiEvent
from .presenter import Presenter
from .markdown import render_event_cli
from .runner import Runner
from .progress import ProgressTracker
from .transport import (
ChannelId,
MessageId,
MessageRef,
RenderedMessage,
SendOptions,
Transport,
)
logger = get_logger(__name__)
def _log_runner_event(evt: TakopiEvent) -> None:
for line in render_event_cli(evt):
logger.debug(
"runner.event.cli",
line=line,
event_type=getattr(evt, "type", None),
engine=getattr(evt, "engine", None),
)
def _strip_resume_lines(text: str, *, is_resume_line: Callable[[str], bool]) -> str:
stripped_lines: list[str] = []
for line in text.splitlines():
if is_resume_line(line):
continue
stripped_lines.append(line)
prompt = "\n".join(stripped_lines).strip()
return prompt or "continue"
def _flatten_exception_group(error: BaseException) -> list[BaseException]:
if isinstance(error, BaseExceptionGroup):
flattened: list[BaseException] = []
for exc in error.exceptions:
flattened.extend(_flatten_exception_group(exc))
return flattened
return [error]
def _format_error(error: Exception) -> str:
cancel_exc = anyio.get_cancelled_exc_class()
flattened = [
exc
for exc in _flatten_exception_group(error)
if not isinstance(exc, cancel_exc)
]
if len(flattened) == 1:
return str(flattened[0]) or flattened[0].__class__.__name__
if not flattened:
return str(error) or error.__class__.__name__
messages = [str(exc) for exc in flattened if str(exc)]
if not messages:
return str(error) or error.__class__.__name__
if len(messages) == 1:
return messages[0]
return "\n".join(messages)
@dataclass(frozen=True, slots=True)
class IncomingMessage:
channel_id: ChannelId
message_id: MessageId
text: str
reply_to: MessageRef | None = None
@dataclass(frozen=True)
class ExecBridgeConfig:
transport: Transport
presenter: Presenter
final_notify: bool
@dataclass
class RunningTask:
resume: ResumeToken | None = None
resume_ready: anyio.Event = field(default_factory=anyio.Event)
cancel_requested: anyio.Event = field(default_factory=anyio.Event)
done: anyio.Event = field(default_factory=anyio.Event)
RunningTasks = dict[MessageRef, RunningTask]
async def _send_or_edit_message(
transport: Transport,
*,
channel_id: ChannelId,
message: RenderedMessage,
edit_ref: MessageRef | None = None,
reply_to: MessageRef | None = None,
notify: bool = True,
replace_ref: MessageRef | None = None,
) -> tuple[MessageRef | None, bool]:
msg = message
if edit_ref is not None:
logger.debug(
"transport.edit_message",
channel_id=edit_ref.channel_id,
message_id=edit_ref.message_id,
rendered=msg.text,
)
edited = await transport.edit(ref=edit_ref, message=msg)
if edited is not None:
return edited, True
logger.debug(
"transport.send_message",
channel_id=channel_id,
reply_to_message_id=reply_to.message_id if reply_to else None,
rendered=msg.text,
)
sent = await transport.send(
channel_id=channel_id,
message=msg,
options=SendOptions(
reply_to=reply_to,
notify=notify,
replace=replace_ref,
),
)
return sent, False
class ProgressEdits:
def __init__(
self,
*,
transport: Transport,
presenter: Presenter,
channel_id: ChannelId,
progress_ref: MessageRef | None,
tracker: ProgressTracker,
started_at: float,
clock: Callable[[], float],
last_rendered: RenderedMessage | None,
resume_formatter: Callable[[ResumeToken], str] | None = None,
label: str = "working",
) -> None:
self.transport = transport
self.presenter = presenter
self.channel_id = channel_id
self.progress_ref = progress_ref
self.tracker = tracker
self.started_at = started_at
self.clock = clock
self.last_rendered = last_rendered
self.resume_formatter = resume_formatter
self.label = label
self.event_seq = 0
self.rendered_seq = 0
self.signal_send, self.signal_recv = anyio.create_memory_object_stream(1)
async def run(self) -> None:
if self.progress_ref is None:
return
while True:
while self.rendered_seq == self.event_seq:
try:
await self.signal_recv.receive()
except anyio.EndOfStream:
return
seq_at_render = self.event_seq
now = self.clock()
state = self.tracker.snapshot(resume_formatter=self.resume_formatter)
rendered = self.presenter.render_progress(
state, elapsed_s=now - self.started_at, label=self.label
)
if rendered != self.last_rendered:
logger.debug(
"transport.edit_message",
channel_id=self.channel_id,
message_id=self.progress_ref.message_id,
rendered=rendered.text,
)
edited = await self.transport.edit(
ref=self.progress_ref,
message=rendered,
wait=False,
)
if edited is not None:
self.last_rendered = rendered
self.rendered_seq = seq_at_render
async def on_event(self, evt: TakopiEvent) -> None:
if not self.tracker.note_event(evt):
return
if self.progress_ref is None:
return
self.event_seq += 1
try:
self.signal_send.send_nowait(None)
except anyio.WouldBlock:
pass
except (anyio.BrokenResourceError, anyio.ClosedResourceError):
pass
@dataclass(frozen=True, slots=True)
class ProgressMessageState:
ref: MessageRef | None
last_rendered: RenderedMessage | None
async def send_initial_progress(
cfg: ExecBridgeConfig,
*,
channel_id: ChannelId,
reply_to: MessageRef,
label: str,
tracker: ProgressTracker,
resume_formatter: Callable[[ResumeToken], str] | None = None,
) -> ProgressMessageState:
progress_ref: MessageRef | None = None
last_rendered: RenderedMessage | None = None
state = tracker.snapshot(resume_formatter=resume_formatter)
initial_rendered = cfg.presenter.render_progress(
state,
elapsed_s=0.0,
label=label,
)
logger.debug(
"transport.send_message",
channel_id=channel_id,
reply_to_message_id=reply_to.message_id,
rendered=initial_rendered.text,
)
progress_ref = await cfg.transport.send(
channel_id=channel_id,
message=initial_rendered,
options=SendOptions(reply_to=reply_to, notify=False),
)
if progress_ref is not None:
last_rendered = initial_rendered
logger.debug(
"progress.sent",
channel_id=progress_ref.channel_id,
message_id=progress_ref.message_id,
)
return ProgressMessageState(
ref=progress_ref,
last_rendered=last_rendered,
)
@dataclass(slots=True)
class RunOutcome:
cancelled: bool = False
completed: CompletedEvent | None = None
resume: ResumeToken | None = None
async def run_runner_with_cancel(
runner: Runner,
*,
prompt: str,
resume_token: ResumeToken | None,
edits: ProgressEdits,
running_task: RunningTask | None,
on_thread_known: Callable[[ResumeToken, anyio.Event], Awaitable[None]] | None,
) -> RunOutcome:
outcome = RunOutcome()
async with anyio.create_task_group() as tg:
async def run_runner() -> None:
try:
async for evt in runner.run(prompt, resume_token):
_log_runner_event(evt)
if isinstance(evt, StartedEvent):
outcome.resume = evt.resume
bind_run_context(resume=evt.resume.value)
if running_task is not None and running_task.resume is None:
running_task.resume = evt.resume
running_task.resume_ready.set()
if on_thread_known is not None:
await on_thread_known(evt.resume, running_task.done)
elif isinstance(evt, CompletedEvent):
outcome.resume = evt.resume or outcome.resume
outcome.completed = evt
await edits.on_event(evt)
finally:
tg.cancel_scope.cancel()
async def wait_cancel(task: RunningTask) -> None:
await task.cancel_requested.wait()
outcome.cancelled = True
tg.cancel_scope.cancel()
tg.start_soon(run_runner)
if running_task is not None:
tg.start_soon(wait_cancel, running_task)
return outcome
def sync_resume_token(
tracker: ProgressTracker, resume: ResumeToken | None
) -> ResumeToken | None:
resume = resume or tracker.resume
tracker.set_resume(resume)
return resume
async def send_result_message(
cfg: ExecBridgeConfig,
*,
channel_id: ChannelId,
reply_to: MessageRef,
progress_ref: MessageRef | None,
message: RenderedMessage,
notify: bool,
edit_ref: MessageRef | None,
replace_ref: MessageRef | None = None,
delete_tag: str = "final",
) -> None:
final_msg, edited = await _send_or_edit_message(
cfg.transport,
channel_id=channel_id,
message=message,
edit_ref=edit_ref,
reply_to=reply_to,
notify=notify,
replace_ref=replace_ref,
)
if final_msg is None:
return
if (
progress_ref is not None
and (edit_ref is None or not edited)
and replace_ref is None
):
logger.debug(
"transport.delete_message",
channel_id=progress_ref.channel_id,
message_id=progress_ref.message_id,
tag=delete_tag,
)
await cfg.transport.delete(ref=progress_ref)
async def handle_message(
cfg: ExecBridgeConfig,
*,
runner: Runner,
incoming: IncomingMessage,
resume_token: ResumeToken | None,
strip_resume_line: Callable[[str], bool] | None = None,
running_tasks: RunningTasks | None = None,
on_thread_known: Callable[[ResumeToken, anyio.Event], Awaitable[None]]
| None = None,
clock: Callable[[], float] = time.monotonic,
) -> None:
logger.info(
"handle.incoming",
channel_id=incoming.channel_id,
user_msg_id=incoming.message_id,
resume=resume_token.value if resume_token else None,
text=incoming.text,
)
started_at = clock()
is_resume_line = runner.is_resume_line
resume_strip = strip_resume_line or is_resume_line
runner_text = _strip_resume_lines(incoming.text, is_resume_line=resume_strip)
progress_tracker = ProgressTracker(engine=runner.engine)
user_ref = MessageRef(
channel_id=incoming.channel_id,
message_id=incoming.message_id,
)
progress_state = await send_initial_progress(
cfg,
channel_id=incoming.channel_id,
reply_to=user_ref,
label="starting",
tracker=progress_tracker,
resume_formatter=runner.format_resume,
)
progress_ref = progress_state.ref
edits = ProgressEdits(
transport=cfg.transport,
presenter=cfg.presenter,
channel_id=incoming.channel_id,
progress_ref=progress_ref,
tracker=progress_tracker,
started_at=started_at,
clock=clock,
last_rendered=progress_state.last_rendered,
resume_formatter=runner.format_resume,
)
running_task: RunningTask | None = None
if running_tasks is not None and progress_ref is not None:
running_task = RunningTask()
running_tasks[progress_ref] = running_task
cancel_exc_type = anyio.get_cancelled_exc_class()
edits_scope = anyio.CancelScope()
async def run_edits() -> None:
try:
with edits_scope:
await edits.run()
except cancel_exc_type:
# Edits are best-effort; cancellation should not bubble into the task group.
return
outcome = RunOutcome()
error: Exception | None = None
async with anyio.create_task_group() as tg:
if progress_ref is not None:
tg.start_soon(run_edits)
try:
outcome = await run_runner_with_cancel(
runner,
prompt=runner_text,
resume_token=resume_token,
edits=edits,
running_task=running_task,
on_thread_known=on_thread_known,
)
except Exception as exc:
error = exc
logger.exception(
"handle.runner_failed",
error=str(exc),
error_type=exc.__class__.__name__,
)
finally:
if running_task is not None and running_tasks is not None:
running_task.done.set()
if progress_ref is not None:
running_tasks.pop(progress_ref, None)
if not outcome.cancelled and error is None:
# Give pending progress edits a chance to flush if they're ready.
await anyio.sleep(0)
edits_scope.cancel()
elapsed = clock() - started_at
if error is not None:
sync_resume_token(progress_tracker, outcome.resume)
err_body = _format_error(error)
state = progress_tracker.snapshot(resume_formatter=runner.format_resume)
final_rendered = cfg.presenter.render_final(
state,
elapsed_s=elapsed,
status="error",
answer=err_body,
)
logger.debug(
"handle.error.rendered",
error=err_body,
rendered=final_rendered.text,
)
await send_result_message(
cfg,
channel_id=incoming.channel_id,
reply_to=user_ref,
progress_ref=progress_ref,
message=final_rendered,
notify=False,
edit_ref=progress_ref,
replace_ref=progress_ref,
delete_tag="error",
)
return
if outcome.cancelled:
resume = sync_resume_token(progress_tracker, outcome.resume)
logger.info(
"handle.cancelled",
resume=resume.value if resume else None,
elapsed_s=elapsed,
)
state = progress_tracker.snapshot(resume_formatter=runner.format_resume)
final_rendered = cfg.presenter.render_progress(
state,
elapsed_s=elapsed,
label="`cancelled`",
)
await send_result_message(
cfg,
channel_id=incoming.channel_id,
reply_to=user_ref,
progress_ref=progress_ref,
message=final_rendered,
notify=False,
edit_ref=progress_ref,
replace_ref=progress_ref,
delete_tag="cancel",
)
return
if outcome.completed is None:
raise RuntimeError("runner finished without a completed event")
completed = outcome.completed
run_ok = completed.ok
run_error = completed.error
final_answer = completed.answer
if run_ok is False and run_error:
if final_answer.strip():
final_answer = f"{final_answer}\n\n{run_error}"
else:
final_answer = str(run_error)
status = (
"error" if run_ok is False else ("done" if final_answer.strip() else "error")
)
resume_value = None
resume_token = completed.resume or outcome.resume
if resume_token is not None:
resume_value = resume_token.value
logger.info(
"runner.completed",
ok=run_ok,
error=run_error,
answer_len=len(final_answer or ""),
elapsed_s=round(elapsed, 2),
action_count=progress_tracker.action_count,
resume=resume_value,
)
sync_resume_token(progress_tracker, completed.resume or outcome.resume)
state = progress_tracker.snapshot(resume_formatter=runner.format_resume)
final_rendered = cfg.presenter.render_final(
state,
elapsed_s=elapsed,
status=status,
answer=final_answer,
)
logger.debug(
"handle.final.rendered",
rendered=final_rendered.text,
status=status,
)
can_edit_final = progress_ref is not None
edit_ref = None if cfg.final_notify or not can_edit_final else progress_ref
await send_result_message(
cfg,
channel_id=incoming.channel_id,
reply_to=user_ref,
progress_ref=progress_ref,
message=final_rendered,
notify=cfg.final_notify,
edit_ref=edit_ref,
replace_ref=progress_ref,
delete_tag="final",
)
+1
View File
@@ -0,0 +1 @@
"""Telegram-specific clients and adapters."""
+569
View File
@@ -0,0 +1,569 @@
from __future__ import annotations
from collections.abc import AsyncIterator, Awaitable, Callable
from dataclasses import dataclass
from typing import Any
import anyio
from ..runner_bridge import (
ExecBridgeConfig,
IncomingMessage,
RunningTask,
RunningTasks,
handle_message,
)
from ..logging import bind_run_context, clear_context, get_logger
from ..markdown import MarkdownFormatter, MarkdownParts
from ..model import EngineId, ResumeToken
from ..progress import ProgressState, ProgressTracker
from ..router import AutoRouter, RunnerUnavailableError
from ..runner import Runner
from ..scheduler import ThreadJob, ThreadScheduler
from ..transport import MessageRef, RenderedMessage, SendOptions, Transport
from .client import BotClient
from .render import prepare_telegram
logger = get_logger(__name__)
def _is_cancel_command(text: str) -> bool:
stripped = text.strip()
if not stripped:
return False
command = stripped.split(maxsplit=1)[0]
return command == "/cancel" or command.startswith("/cancel@")
def _strip_engine_command(
text: str, *, engine_ids: tuple[EngineId, ...]
) -> tuple[str, EngineId | None]:
if not text:
return text, None
if not engine_ids:
return text, None
engine_map = {engine.lower(): engine for engine in engine_ids}
lines = text.splitlines()
idx = next((i for i, line in enumerate(lines) if line.strip()), None)
if idx is None:
return text, None
line = lines[idx].lstrip()
if not line.startswith("/"):
return text, None
parts = line.split(maxsplit=1)
command = parts[0][1:]
if "@" in command:
command = command.split("@", 1)[0]
engine = engine_map.get(command.lower())
if engine is None:
return text, None
remainder = parts[1] if len(parts) > 1 else ""
if remainder:
lines[idx] = remainder
else:
lines.pop(idx)
return "\n".join(lines).strip(), engine
def _build_bot_commands(router: AutoRouter) -> list[dict[str, str]]:
commands: list[dict[str, str]] = []
seen: set[str] = set()
for entry in router.available_entries:
cmd = entry.engine.lower()
if cmd in seen:
continue
commands.append({"command": cmd, "description": f"start {cmd}"})
seen.add(cmd)
if "cancel" not in seen:
commands.append({"command": "cancel", "description": "cancel run"})
return commands
async def _set_command_menu(cfg: TelegramBridgeConfig) -> None:
commands = _build_bot_commands(cfg.router)
if not commands:
return
try:
ok = await cfg.bot.set_my_commands(commands)
except Exception as exc:
logger.info(
"startup.command_menu.failed",
error=str(exc),
error_type=exc.__class__.__name__,
)
return
if not ok:
logger.info("startup.command_menu.rejected")
return
logger.info(
"startup.command_menu.updated",
commands=[cmd["command"] for cmd in commands],
)
class TelegramPresenter:
def __init__(self, *, formatter: MarkdownFormatter | None = None) -> None:
self._formatter = formatter or MarkdownFormatter()
def render_progress(
self,
state: ProgressState,
*,
elapsed_s: float,
label: str = "working",
) -> RenderedMessage:
parts = self._formatter.render_progress_parts(
state, elapsed_s=elapsed_s, label=label
)
text, entities = prepare_telegram(parts)
return RenderedMessage(text=text, extra={"entities": entities})
def render_final(
self,
state: ProgressState,
*,
elapsed_s: float,
status: str,
answer: str,
) -> RenderedMessage:
parts = self._formatter.render_final_parts(
state, elapsed_s=elapsed_s, status=status, answer=answer
)
text, entities = prepare_telegram(parts)
return RenderedMessage(text=text, extra={"entities": entities})
def _as_int(value: int | str, *, label: str) -> int:
if isinstance(value, bool) or not isinstance(value, int):
raise TypeError(f"Telegram {label} must be int")
return value
class TelegramTransport:
def __init__(self, bot: BotClient) -> None:
self._bot = bot
async def close(self) -> None:
await self._bot.close()
async def send(
self,
*,
channel_id: int | str,
message: RenderedMessage,
options: SendOptions | None = None,
) -> MessageRef | None:
chat_id = _as_int(channel_id, label="chat_id")
reply_to_message_id: int | None = None
replace_message_id: int | None = None
disable_notification = None
if options is not None:
disable_notification = not options.notify
if options.reply_to is not None:
reply_to_message_id = _as_int(
options.reply_to.message_id, label="reply_to_message_id"
)
if options.replace is not None:
replace_message_id = _as_int(
options.replace.message_id, label="replace_message_id"
)
entities = message.extra.get("entities")
parse_mode = message.extra.get("parse_mode")
sent = await self._bot.send_message(
chat_id=chat_id,
text=message.text,
reply_to_message_id=reply_to_message_id,
disable_notification=disable_notification,
entities=entities,
parse_mode=parse_mode,
replace_message_id=replace_message_id,
)
if sent is None:
return None
message_id = sent.get("message_id")
if message_id is None:
return None
return MessageRef(
channel_id=chat_id,
message_id=_as_int(message_id, label="message_id"),
raw=sent,
)
async def edit(
self, *, ref: MessageRef, message: RenderedMessage, wait: bool = True
) -> MessageRef | None:
chat_id = _as_int(ref.channel_id, label="chat_id")
message_id = _as_int(ref.message_id, label="message_id")
entities = message.extra.get("entities")
parse_mode = message.extra.get("parse_mode")
edited = await self._bot.edit_message_text(
chat_id=chat_id,
message_id=message_id,
text=message.text,
entities=entities,
parse_mode=parse_mode,
wait=wait,
)
if edited is None:
return ref if not wait else None
message_id = edited.get("message_id", message_id)
return MessageRef(
channel_id=chat_id,
message_id=_as_int(message_id, label="message_id"),
raw=edited,
)
async def delete(self, *, ref: MessageRef) -> bool:
return await self._bot.delete_message(
chat_id=_as_int(ref.channel_id, label="chat_id"),
message_id=_as_int(ref.message_id, label="message_id"),
)
@dataclass(frozen=True)
class TelegramBridgeConfig:
bot: BotClient
router: AutoRouter
chat_id: int
startup_msg: str
exec_cfg: ExecBridgeConfig
async def _send_plain(
transport: Transport,
*,
chat_id: int,
user_msg_id: int,
text: str,
notify: bool = True,
) -> None:
reply_to = MessageRef(channel_id=chat_id, message_id=user_msg_id)
await transport.send(
channel_id=chat_id,
message=RenderedMessage(text=text),
options=SendOptions(reply_to=reply_to, notify=notify),
)
async def _send_startup(cfg: TelegramBridgeConfig) -> None:
logger.debug("startup.message", text=cfg.startup_msg)
parts = MarkdownParts(header=cfg.startup_msg)
text, entities = prepare_telegram(parts)
message = RenderedMessage(text=text, extra={"entities": entities})
sent = await cfg.exec_cfg.transport.send(
channel_id=cfg.chat_id,
message=message,
)
if sent is not None:
logger.info("startup.sent", chat_id=cfg.chat_id)
async def _drain_backlog(cfg: TelegramBridgeConfig, offset: int | None) -> int | None:
drained = 0
while True:
updates = await cfg.bot.get_updates(
offset=offset, timeout_s=0, allowed_updates=["message"]
)
if updates is None:
logger.info("startup.backlog.failed")
return offset
logger.debug("startup.backlog.updates", updates=updates)
if not updates:
if drained:
logger.info("startup.backlog.drained", count=drained)
return offset
offset = updates[-1]["update_id"] + 1
drained += len(updates)
async def poll_updates(cfg: TelegramBridgeConfig) -> AsyncIterator[dict[str, Any]]:
offset: int | None = None
offset = await _drain_backlog(cfg, offset)
await _send_startup(cfg)
while True:
updates = await cfg.bot.get_updates(
offset=offset, timeout_s=50, allowed_updates=["message"]
)
if updates is None:
logger.info("loop.get_updates.failed")
await anyio.sleep(2)
continue
logger.debug("loop.updates", updates=updates)
for upd in updates:
offset = upd["update_id"] + 1
msg = upd["message"]
if "text" not in msg:
continue
if msg["chat"]["id"] != cfg.chat_id:
continue
yield msg
async def _handle_cancel(
cfg: TelegramBridgeConfig,
msg: dict[str, Any],
running_tasks: RunningTasks,
) -> None:
chat_id = msg["chat"]["id"]
user_msg_id = msg["message_id"]
reply = msg.get("reply_to_message")
if not reply:
await _send_plain(
cfg.exec_cfg.transport,
chat_id=chat_id,
user_msg_id=user_msg_id,
text="reply to the progress message to cancel.",
)
return
progress_id = reply.get("message_id")
if progress_id is None:
await _send_plain(
cfg.exec_cfg.transport,
chat_id=chat_id,
user_msg_id=user_msg_id,
text="nothing is currently running for that message.",
)
return
progress_ref = MessageRef(channel_id=chat_id, message_id=progress_id)
running_task = running_tasks.get(progress_ref)
if running_task is None:
await _send_plain(
cfg.exec_cfg.transport,
chat_id=chat_id,
user_msg_id=user_msg_id,
text="nothing is currently running for that message.",
)
return
logger.info(
"cancel.requested",
chat_id=chat_id,
progress_message_id=progress_id,
)
running_task.cancel_requested.set()
async def _wait_for_resume(running_task: RunningTask) -> ResumeToken | None:
if running_task.resume is not None:
return running_task.resume
resume: ResumeToken | None = None
async with anyio.create_task_group() as tg:
async def wait_resume() -> None:
nonlocal resume
await running_task.resume_ready.wait()
resume = running_task.resume
tg.cancel_scope.cancel()
async def wait_done() -> None:
await running_task.done.wait()
tg.cancel_scope.cancel()
tg.start_soon(wait_resume)
tg.start_soon(wait_done)
return resume
async def _send_with_resume(
cfg: TelegramBridgeConfig,
enqueue: Callable[[int, int, str, ResumeToken], Awaitable[None]],
running_task: RunningTask,
chat_id: int,
user_msg_id: int,
text: str,
) -> None:
resume = await _wait_for_resume(running_task)
if resume is None:
await _send_plain(
cfg.exec_cfg.transport,
chat_id=chat_id,
user_msg_id=user_msg_id,
text="resume token not ready yet; try replying to the final message.",
notify=False,
)
return
await enqueue(chat_id, user_msg_id, text, resume)
async def _send_runner_unavailable(
cfg: TelegramBridgeConfig,
*,
chat_id: int,
user_msg_id: int,
resume_token: ResumeToken | None,
runner: Runner,
reason: str,
) -> None:
tracker = ProgressTracker(engine=runner.engine)
tracker.set_resume(resume_token)
state = tracker.snapshot(resume_formatter=runner.format_resume)
message = cfg.exec_cfg.presenter.render_final(
state,
elapsed_s=0.0,
status="error",
answer=f"error:\n{reason}",
)
reply_to = MessageRef(channel_id=chat_id, message_id=user_msg_id)
await cfg.exec_cfg.transport.send(
channel_id=chat_id,
message=message,
options=SendOptions(reply_to=reply_to, notify=True),
)
async def run_main_loop(
cfg: TelegramBridgeConfig,
poller: Callable[
[TelegramBridgeConfig], AsyncIterator[dict[str, Any]]
] = poll_updates,
) -> None:
running_tasks: RunningTasks = {}
try:
await _set_command_menu(cfg)
async with anyio.create_task_group() as tg:
async def run_job(
chat_id: int,
user_msg_id: int,
text: str,
resume_token: ResumeToken | None,
reply_ref: MessageRef | None = None,
on_thread_known: Callable[[ResumeToken, anyio.Event], Awaitable[None]]
| None = None,
engine_override: EngineId | None = None,
) -> None:
try:
try:
entry = (
cfg.router.entry_for_engine(engine_override)
if resume_token is None
else cfg.router.entry_for(resume_token)
)
except RunnerUnavailableError as exc:
await _send_plain(
cfg.exec_cfg.transport,
chat_id=chat_id,
user_msg_id=user_msg_id,
text=f"error:\n{exc}",
)
return
if not entry.available:
reason = entry.issue or "engine unavailable"
await _send_runner_unavailable(
cfg,
chat_id=chat_id,
user_msg_id=user_msg_id,
resume_token=resume_token,
runner=entry.runner,
reason=reason,
)
return
bind_run_context(
chat_id=chat_id,
user_msg_id=user_msg_id,
engine=entry.runner.engine,
resume=resume_token.value if resume_token else None,
)
incoming = IncomingMessage(
channel_id=chat_id,
message_id=user_msg_id,
text=text,
reply_to=reply_ref,
)
await handle_message(
cfg.exec_cfg,
runner=entry.runner,
incoming=incoming,
resume_token=resume_token,
strip_resume_line=cfg.router.is_resume_line,
running_tasks=running_tasks,
on_thread_known=on_thread_known,
)
except Exception as exc:
logger.exception(
"handle.worker_failed",
error=str(exc),
error_type=exc.__class__.__name__,
)
finally:
clear_context()
async def run_thread_job(job: ThreadJob) -> None:
await run_job(
job.chat_id,
job.user_msg_id,
job.text,
job.resume_token,
None,
)
scheduler = ThreadScheduler(task_group=tg, run_job=run_thread_job)
async for msg in poller(cfg):
text = msg["text"]
user_msg_id = msg["message_id"]
chat_id = msg["chat"]["id"]
reply_ref = None
reply_msg = msg.get("reply_to_message")
if reply_msg:
reply_id = reply_msg.get("message_id")
if reply_id is not None:
reply_ref = MessageRef(channel_id=chat_id, message_id=reply_id)
if _is_cancel_command(text):
tg.start_soon(_handle_cancel, cfg, msg, running_tasks)
continue
text, engine_override = _strip_engine_command(
text, engine_ids=cfg.router.engine_ids
)
r = msg.get("reply_to_message") or {}
resume_token = cfg.router.resolve_resume(text, r.get("text"))
reply_id = r.get("message_id")
if resume_token is None and reply_id is not None:
running_task = running_tasks.get(
MessageRef(channel_id=chat_id, message_id=reply_id)
)
if running_task is not None:
tg.start_soon(
_send_with_resume,
cfg,
scheduler.enqueue_resume,
running_task,
chat_id,
user_msg_id,
text,
)
continue
if resume_token is None:
tg.start_soon(
run_job,
chat_id,
user_msg_id,
text,
None,
reply_ref,
scheduler.note_thread_known,
engine_override,
)
else:
await scheduler.enqueue_resume(
chat_id, user_msg_id, text, resume_token
)
finally:
await cfg.exec_cfg.transport.close()
@@ -9,7 +9,7 @@ import httpx
import anyio import anyio
from .logging import get_logger from ..logging import get_logger
logger = get_logger(__name__) logger = get_logger(__name__)
+31
View File
@@ -0,0 +1,31 @@
from __future__ import annotations
import tomllib
from pathlib import Path
from ..config import ConfigError
HOME_CONFIG_PATH = Path.home() / ".takopi" / "takopi.toml"
def _read_config(cfg_path: Path) -> dict:
try:
raw = cfg_path.read_text(encoding="utf-8")
except FileNotFoundError:
raise ConfigError(f"Missing config file {cfg_path}.") from None
except OSError as e:
raise ConfigError(f"Failed to read config file {cfg_path}: {e}") from e
try:
return tomllib.loads(raw)
except tomllib.TOMLDecodeError as e:
raise ConfigError(f"Malformed TOML in {cfg_path}: {e}") from None
def load_telegram_config(path: str | Path | None = None) -> tuple[dict, Path]:
if path:
cfg_path = Path(path).expanduser()
return _read_config(cfg_path), cfg_path
cfg_path = HOME_CONFIG_PATH
if cfg_path.exists() and not cfg_path.is_file():
raise ConfigError(f"Config path {cfg_path} exists but is not a file.") from None
return _read_config(cfg_path), cfg_path
@@ -20,12 +20,13 @@ from rich.console import Console
from rich.panel import Panel from rich.panel import Panel
from rich.table import Table from rich.table import Table
from .backends import EngineBackend, SetupIssue from ..backends import EngineBackend, SetupIssue
from .backends_helpers import install_issue from ..backends_helpers import install_issue
from .config import ConfigError, HOME_CONFIG_PATH, load_telegram_config from ..config import ConfigError
from .engines import list_backends from ..engines import list_backends
from .logging import suppress_logs from ..logging import suppress_logs
from .telegram import TelegramClient, TelegramRetryAfter from .client import TelegramClient, TelegramRetryAfter
from .config import HOME_CONFIG_PATH, load_telegram_config
@dataclass(slots=True) @dataclass(slots=True)
+39
View File
@@ -0,0 +1,39 @@
from __future__ import annotations
import re
from typing import Any
from markdown_it import MarkdownIt
from sulguk import transform_html
from ..markdown import MarkdownParts, assemble_markdown_parts
_MD_RENDERER = MarkdownIt("commonmark", {"html": False})
_BULLET_RE = re.compile(r"(?m)^(\s*)•")
def render_markdown(md: str) -> tuple[str, list[dict[str, Any]]]:
html = _MD_RENDERER.render(md or "")
rendered = transform_html(html)
text = _BULLET_RE.sub(r"\1-", rendered.text)
entities = [dict(e) for e in rendered.entities]
return text, entities
def trim_body(body: str | None) -> str | None:
if not body:
return None
if len(body) > 3500:
body = body[: 3500 - 1] + ""
return body if body.strip() else None
def prepare_telegram(parts: MarkdownParts) -> tuple[str, list[dict[str, Any]]]:
trimmed = MarkdownParts(
header=parts.header or "",
body=trim_body(parts.body),
footer=parts.footer,
)
return render_markdown(assemble_markdown_parts(trimmed))
+49
View File
@@ -0,0 +1,49 @@
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Protocol, TypeAlias
ChannelId: TypeAlias = int | str
MessageId: TypeAlias = int | str
@dataclass(frozen=True, slots=True)
class MessageRef:
channel_id: ChannelId
message_id: MessageId
raw: Any | None = field(default=None, compare=False, hash=False)
@dataclass(frozen=True, slots=True)
class RenderedMessage:
text: str
extra: dict[str, Any] = field(default_factory=dict)
@dataclass(frozen=True, slots=True)
class SendOptions:
reply_to: MessageRef | None = None
notify: bool = True
replace: MessageRef | None = None
class Transport(Protocol):
async def close(self) -> None: ...
async def send(
self,
*,
channel_id: ChannelId,
message: RenderedMessage,
options: SendOptions | None = None,
) -> MessageRef | None: ...
async def edit(
self,
*,
ref: MessageRef,
message: RenderedMessage,
wait: bool = True,
) -> MessageRef | None: ...
async def delete(self, *, ref: MessageRef) -> bool: ...
+126 -670
View File
@@ -3,25 +3,18 @@ import uuid
import anyio import anyio
import pytest import pytest
from takopi.bridge import _build_bot_commands, _strip_engine_command from takopi.runner_bridge import ExecBridgeConfig, IncomingMessage, handle_message
from takopi.markdown import MarkdownParts, MarkdownPresenter
from takopi.model import EngineId, ResumeToken, TakopiEvent from takopi.model import EngineId, ResumeToken, TakopiEvent
from takopi.render import MarkdownParts, prepare_telegram from takopi.telegram.render import prepare_telegram
from takopi.router import AutoRouter, RunnerEntry
from takopi.runners.codex import CodexRunner from takopi.runners.codex import CodexRunner
from takopi.telegram import TelegramClient from takopi.runners.mock import Advance, Emit, Raise, Return, ScriptRunner, Wait
from takopi.runners.mock import Advance, Emit, Raise, Return, ScriptRunner, Sleep, Wait from takopi.transport import MessageRef, RenderedMessage, SendOptions
from tests.factories import action_completed, action_started from tests.factories import action_completed, action_started
CODEX_ENGINE = EngineId("codex") CODEX_ENGINE = EngineId("codex")
def _make_router(runner) -> AutoRouter:
return AutoRouter(
entries=[RunnerEntry(engine=runner.engine, runner=runner)],
default_engine=runner.engine,
)
def _patch_config(monkeypatch, config): def _patch_config(monkeypatch, config):
from pathlib import Path from pathlib import Path
@@ -34,6 +27,67 @@ def _patch_config(monkeypatch, config):
) )
class _FakeTransport:
def __init__(self) -> None:
self._next_id = 1
self.send_calls: list[dict] = []
self.edit_calls: list[dict] = []
self.delete_calls: list[MessageRef] = []
async def send(
self,
*,
channel_id: int | str,
message: RenderedMessage,
options: SendOptions | None = None,
) -> MessageRef:
ref = MessageRef(channel_id=channel_id, message_id=self._next_id)
self._next_id += 1
self.send_calls.append(
{
"ref": ref,
"channel_id": channel_id,
"message": message,
"options": options,
}
)
return ref
async def edit(
self, *, ref: MessageRef, message: RenderedMessage, wait: bool = True
) -> MessageRef:
self.edit_calls.append({"ref": ref, "message": message, "wait": wait})
return ref
async def delete(self, *, ref: MessageRef) -> bool:
self.delete_calls.append(ref)
return True
async def close(self) -> None:
return None
class _FakeClock:
def __init__(self, start: float = 0.0) -> None:
self._now = start
def __call__(self) -> float:
return self._now
def set(self, value: float) -> None:
self._now = value
def _return_runner(
*, answer: str = "ok", resume_value: str | None = None
) -> ScriptRunner:
return ScriptRunner(
[Return(answer=answer)],
engine=CODEX_ENGINE,
resume_value=resume_value,
)
def test_load_and_validate_config_rejects_empty_token(monkeypatch) -> None: def test_load_and_validate_config_rejects_empty_token(monkeypatch) -> None:
from takopi import cli from takopi import cli
@@ -125,250 +179,36 @@ def test_prepare_telegram_preserves_entities_on_truncate() -> None:
assert any(e.get("type") == "bold" for e in entities) assert any(e.get("type") == "bold" for e in entities)
def test_strip_engine_command_inline() -> None:
text, engine = _strip_engine_command(
"/claude do it", engine_ids=("codex", "claude")
)
assert engine == "claude"
assert text == "do it"
def test_strip_engine_command_newline() -> None:
text, engine = _strip_engine_command(
"/codex\nhello", engine_ids=("codex", "claude")
)
assert engine == "codex"
assert text == "hello"
def test_strip_engine_command_ignores_unknown() -> None:
text, engine = _strip_engine_command("/unknown hi", engine_ids=("codex", "claude"))
assert engine is None
assert text == "/unknown hi"
def test_strip_engine_command_bot_suffix() -> None:
text, engine = _strip_engine_command(
"/claude@bunny_agent_bot hi", engine_ids=("claude",)
)
assert engine == "claude"
assert text == "hi"
def test_strip_engine_command_only_first_non_empty_line() -> None:
text, engine = _strip_engine_command(
"hello\n/claude hi", engine_ids=("codex", "claude")
)
assert engine is None
assert text == "hello\n/claude hi"
def test_build_bot_commands_includes_cancel_and_engine() -> None:
runner = ScriptRunner(
[Return(answer="ok")], engine=CODEX_ENGINE, resume_value="sid"
)
router = _make_router(runner)
commands = _build_bot_commands(router)
assert {"command": "cancel", "description": "cancel run"} in commands
assert any(cmd["command"] == "codex" for cmd in commands)
class _FakeBot:
def __init__(self) -> None:
self._next_id = 1
self.command_calls: list[dict] = []
self.send_calls: list[dict] = []
self.edit_calls: list[dict] = []
self.delete_calls: list[dict] = []
async def send_message(
self,
chat_id: int,
text: str,
reply_to_message_id: int | None = None,
disable_notification: bool | None = False,
entities: list[dict] | None = None,
parse_mode: str | None = None,
*,
replace_message_id: int | None = None,
) -> dict:
_ = replace_message_id
self.send_calls.append(
{
"chat_id": chat_id,
"text": text,
"reply_to_message_id": reply_to_message_id,
"disable_notification": disable_notification,
"entities": entities,
"parse_mode": parse_mode,
}
)
msg_id = self._next_id
self._next_id += 1
return {"message_id": msg_id}
async def edit_message_text(
self,
chat_id: int,
message_id: int,
text: str,
entities: list[dict] | None = None,
parse_mode: str | None = None,
*,
wait: bool = True,
) -> dict:
_ = wait
self.edit_calls.append(
{
"chat_id": chat_id,
"message_id": message_id,
"text": text,
"entities": entities,
"parse_mode": parse_mode,
}
)
return {"message_id": message_id}
async def delete_message(
self,
chat_id: int,
message_id: int,
) -> bool:
self.delete_calls.append({"chat_id": chat_id, "message_id": message_id})
return True
async def set_my_commands(
self,
commands: list[dict],
*,
scope: dict | None = None,
language_code: str | None = None,
) -> bool:
self.command_calls.append(
{
"commands": commands,
"scope": scope,
"language_code": language_code,
}
)
return True
async def get_updates(
self,
offset: int | None,
timeout_s: int = 50,
allowed_updates: list[str] | None = None,
) -> list[dict] | None:
_ = offset
_ = timeout_s
_ = allowed_updates
return []
async def close(self) -> None:
return None
async def get_me(self) -> dict | None:
return {"id": 1}
class _FakeClock:
def __init__(self, start: float = 0.0) -> None:
self._now = start
self._sleep_until: float | None = None
self._sleep_event: anyio.Event | None = None
self.sleep_calls = 0
def __call__(self) -> float:
return self._now
def set(self, value: float) -> None:
self._now = value
if self._sleep_until is None or self._sleep_event is None:
return
if self._sleep_until <= self._now:
self._sleep_event.set()
self._sleep_until = None
self._sleep_event = None
async def sleep(self, delay: float) -> None:
if delay <= 0:
await anyio.sleep(0)
return
self.sleep_calls += 1
self._sleep_until = self._now + delay
self._sleep_event = anyio.Event()
await self._sleep_event.wait()
def _queued_bot(
bot: "_FakeBot", *, clock: "_FakeClock | None" = None
) -> TelegramClient:
if clock is None:
return TelegramClient(
client=bot,
private_chat_rps=0.0,
group_chat_rps=0.0,
)
return TelegramClient(
client=bot,
clock=clock,
sleep=clock.sleep,
private_chat_rps=0.0,
group_chat_rps=0.0,
)
def _return_runner(
*, answer: str = "ok", resume_value: str | None = None
) -> ScriptRunner:
return ScriptRunner(
[Return(answer=answer)],
engine=CODEX_ENGINE,
resume_value=resume_value,
)
@pytest.mark.anyio @pytest.mark.anyio
async def test_final_notify_sends_loud_final_message() -> None: async def test_final_notify_sends_loud_final_message() -> None:
from takopi.bridge import BridgeConfig, handle_message transport = _FakeTransport()
bot = _FakeBot()
runner = _return_runner(answer="ok") runner = _return_runner(answer="ok")
cfg = BridgeConfig( cfg = ExecBridgeConfig(
bot=_queued_bot(bot), transport=transport,
router=_make_router(runner), presenter=MarkdownPresenter(),
chat_id=123,
final_notify=True, final_notify=True,
startup_msg="",
) )
await handle_message( await handle_message(
cfg, cfg,
runner=runner, runner=runner,
chat_id=123, incoming=IncomingMessage(channel_id=123, message_id=10, text="hi"),
user_msg_id=10,
text="hi",
resume_token=None, resume_token=None,
) )
assert len(bot.send_calls) == 2 assert len(transport.send_calls) == 2
assert bot.send_calls[0]["disable_notification"] is True assert transport.send_calls[0]["options"].notify is False
assert bot.send_calls[1]["disable_notification"] is False assert transport.send_calls[1]["options"].notify is True
@pytest.mark.anyio @pytest.mark.anyio
async def test_handle_message_strips_resume_line_from_prompt() -> None: async def test_handle_message_strips_resume_line_from_prompt() -> None:
from takopi.bridge import BridgeConfig, handle_message transport = _FakeTransport()
bot = _FakeBot()
runner = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE) runner = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE)
cfg = BridgeConfig( cfg = ExecBridgeConfig(
bot=_queued_bot(bot), transport=transport,
router=_make_router(runner), presenter=MarkdownPresenter(),
chat_id=123,
final_notify=True, final_notify=True,
startup_msg="",
) )
resume = ResumeToken(engine=CODEX_ENGINE, value="sid") resume = ResumeToken(engine=CODEX_ENGINE, value="sid")
text = "do this\n`codex resume sid`\nand that" text = "do this\n`codex resume sid`\nand that"
@@ -376,9 +216,7 @@ async def test_handle_message_strips_resume_line_from_prompt() -> None:
await handle_message( await handle_message(
cfg, cfg,
runner=runner, runner=runner,
chat_id=123, incoming=IncomingMessage(channel_id=123, message_id=10, text=text),
user_msg_id=10,
text=text,
resume_token=resume, resume_token=resume,
) )
@@ -390,38 +228,30 @@ async def test_handle_message_strips_resume_line_from_prompt() -> None:
@pytest.mark.anyio @pytest.mark.anyio
async def test_long_final_message_edits_progress_message() -> None: async def test_long_final_message_edits_progress_message() -> None:
from takopi.bridge import BridgeConfig, handle_message transport = _FakeTransport()
bot = _FakeBot()
runner = _return_runner(answer="x" * 10_000) runner = _return_runner(answer="x" * 10_000)
cfg = BridgeConfig( cfg = ExecBridgeConfig(
bot=_queued_bot(bot), transport=transport,
router=_make_router(runner), presenter=MarkdownPresenter(),
chat_id=123,
final_notify=False, final_notify=False,
startup_msg="",
) )
await handle_message( await handle_message(
cfg, cfg,
runner=runner, runner=runner,
chat_id=123, incoming=IncomingMessage(channel_id=123, message_id=10, text="hi"),
user_msg_id=10,
text="hi",
resume_token=None, resume_token=None,
) )
assert len(bot.send_calls) == 1 assert len(transport.send_calls) == 1
assert bot.send_calls[0]["disable_notification"] is True assert transport.send_calls[0]["options"].notify is False
assert bot.edit_calls assert transport.edit_calls
assert "done" in bot.edit_calls[-1]["text"].lower() assert "done" in transport.edit_calls[-1]["message"].text.lower()
@pytest.mark.anyio @pytest.mark.anyio
async def test_progress_edits_are_rate_limited() -> None: async def test_progress_edits_are_best_effort() -> None:
from takopi.bridge import BridgeConfig, handle_message transport = _FakeTransport()
bot = _FakeBot()
clock = _FakeClock() clock = _FakeClock()
events: list[TakopiEvent] = [ events: list[TakopiEvent] = [
action_started("item_0", "command", "echo 1"), action_started("item_0", "command", "echo 1"),
@@ -437,88 +267,28 @@ async def test_progress_edits_are_rate_limited() -> None:
engine=CODEX_ENGINE, engine=CODEX_ENGINE,
advance=clock.set, advance=clock.set,
) )
cfg = BridgeConfig( cfg = ExecBridgeConfig(
bot=_queued_bot(bot, clock=clock), transport=transport,
router=_make_router(runner), presenter=MarkdownPresenter(),
chat_id=123,
final_notify=True, final_notify=True,
startup_msg="",
) )
await handle_message( await handle_message(
cfg, cfg,
runner=runner, runner=runner,
chat_id=123, incoming=IncomingMessage(channel_id=123, message_id=10, text="hi"),
user_msg_id=10,
text="hi",
resume_token=None, resume_token=None,
clock=clock, clock=clock,
) )
assert bot.edit_calls assert transport.edit_calls
assert "working" in bot.edit_calls[-1]["text"].lower() assert all(call["wait"] is False for call in transport.edit_calls)
assert "working" in transport.edit_calls[-1]["message"].text.lower()
@pytest.mark.anyio
async def test_progress_edits_do_not_sleep_again_without_new_events() -> None:
from takopi.bridge import BridgeConfig, handle_message
bot = _FakeBot()
clock = _FakeClock()
hold = anyio.Event()
events: list[TakopiEvent] = [
action_started("item_0", "command", "echo 1"),
action_started("item_1", "command", "echo 2"),
]
runner = ScriptRunner(
[
Emit(events[0], at=0.2),
Emit(events[1], at=0.4),
Wait(hold),
Return(answer="ok"),
],
engine=CODEX_ENGINE,
advance=clock.set,
)
cfg = BridgeConfig(
bot=_queued_bot(bot, clock=clock),
router=_make_router(runner),
chat_id=123,
final_notify=True,
startup_msg="",
)
async def run_handle_message() -> None:
await handle_message(
cfg,
runner=runner,
chat_id=123,
user_msg_id=10,
text="hi",
resume_token=None,
clock=clock,
)
async with anyio.create_task_group() as tg:
tg.start_soon(run_handle_message)
for _ in range(100):
if bot.edit_calls:
break
await anyio.sleep(0)
assert bot.edit_calls
assert clock.sleep_calls == 0
assert clock._sleep_until is None
hold.set()
@pytest.mark.anyio @pytest.mark.anyio
async def test_bridge_flow_sends_progress_edits_and_final_resume() -> None: async def test_bridge_flow_sends_progress_edits_and_final_resume() -> None:
from takopi.bridge import BridgeConfig, handle_message transport = _FakeTransport()
bot = _FakeBot()
clock = _FakeClock() clock = _FakeClock()
events: list[TakopiEvent] = [ events: list[TakopiEvent] = [
action_started("item_0", "command", "echo ok"), action_started("item_0", "command", "echo ok"),
@@ -541,182 +311,32 @@ async def test_bridge_flow_sends_progress_edits_and_final_resume() -> None:
advance=clock.set, advance=clock.set,
resume_value=session_id, resume_value=session_id,
) )
cfg = BridgeConfig( cfg = ExecBridgeConfig(
bot=_queued_bot(bot, clock=clock), transport=transport,
router=_make_router(runner), presenter=MarkdownPresenter(),
chat_id=123,
final_notify=True, final_notify=True,
startup_msg="",
) )
await handle_message( await handle_message(
cfg, cfg,
runner=runner, runner=runner,
chat_id=123, incoming=IncomingMessage(channel_id=123, message_id=42, text="do it"),
user_msg_id=42,
text="do it",
resume_token=None, resume_token=None,
clock=clock, clock=clock,
) )
assert bot.send_calls[0]["reply_to_message_id"] == 42 assert transport.send_calls[0]["options"].reply_to.message_id == 42
assert "starting" in bot.send_calls[0]["text"] assert "starting" in transport.send_calls[0]["message"].text
assert "codex" in bot.send_calls[0]["text"] assert "codex" in transport.send_calls[0]["message"].text
assert len(bot.edit_calls) >= 1 assert len(transport.edit_calls) >= 1
assert session_id in bot.send_calls[-1]["text"] assert session_id in transport.send_calls[-1]["message"].text
assert "codex resume" in bot.send_calls[-1]["text"].lower() assert "codex resume" in transport.send_calls[-1]["message"].text.lower()
assert len(bot.delete_calls) == 1 assert transport.send_calls[-1]["options"].replace == transport.send_calls[0]["ref"]
@pytest.mark.anyio
async def test_handle_cancel_without_reply_prompts_user() -> None:
from takopi.bridge import BridgeConfig, _handle_cancel
bot = _FakeBot()
runner = _return_runner(answer="ok")
cfg = BridgeConfig(
bot=_queued_bot(bot),
router=_make_router(runner),
chat_id=123,
final_notify=True,
startup_msg="",
)
msg = {"chat": {"id": 123}, "message_id": 10}
running_tasks: dict = {}
await _handle_cancel(cfg, msg, running_tasks)
assert len(bot.send_calls) == 1
assert "reply to the progress message" in bot.send_calls[0]["text"]
@pytest.mark.anyio
async def test_handle_cancel_with_no_progress_message_says_nothing_running() -> None:
from takopi.bridge import BridgeConfig, _handle_cancel
bot = _FakeBot()
runner = _return_runner(answer="ok")
cfg = BridgeConfig(
bot=_queued_bot(bot),
router=_make_router(runner),
chat_id=123,
final_notify=True,
startup_msg="",
)
msg = {
"chat": {"id": 123},
"message_id": 10,
"reply_to_message": {"text": "no message id"},
}
running_tasks: dict = {}
await _handle_cancel(cfg, msg, running_tasks)
assert len(bot.send_calls) == 1
assert "nothing is currently running" in bot.send_calls[0]["text"]
@pytest.mark.anyio
async def test_handle_cancel_with_finished_task_says_nothing_running() -> None:
from takopi.bridge import BridgeConfig, _handle_cancel
bot = _FakeBot()
runner = _return_runner(answer="ok")
cfg = BridgeConfig(
bot=_queued_bot(bot),
router=_make_router(runner),
chat_id=123,
final_notify=True,
startup_msg="",
)
progress_id = 99
msg = {
"chat": {"id": 123},
"message_id": 10,
"reply_to_message": {"message_id": progress_id},
}
running_tasks: dict = {} # Progress message not in running_tasks
await _handle_cancel(cfg, msg, running_tasks)
assert len(bot.send_calls) == 1
assert "nothing is currently running" in bot.send_calls[0]["text"]
@pytest.mark.anyio
async def test_handle_cancel_cancels_running_task() -> None:
from takopi.bridge import BridgeConfig, _handle_cancel
bot = _FakeBot()
runner = _return_runner(answer="ok")
cfg = BridgeConfig(
bot=_queued_bot(bot),
router=_make_router(runner),
chat_id=123,
final_notify=True,
startup_msg="",
)
progress_id = 42
msg = {
"chat": {"id": 123},
"message_id": 10,
"reply_to_message": {"message_id": progress_id},
}
from takopi.bridge import RunningTask
running_task = RunningTask()
running_tasks = {progress_id: running_task}
await _handle_cancel(cfg, msg, running_tasks)
assert running_task.cancel_requested.is_set() is True
assert len(bot.send_calls) == 0 # No error message sent
@pytest.mark.anyio
async def test_handle_cancel_only_cancels_matching_progress_message() -> None:
from takopi.bridge import BridgeConfig, _handle_cancel
bot = _FakeBot()
runner = _return_runner(answer="ok")
cfg = BridgeConfig(
bot=_queued_bot(bot),
router=_make_router(runner),
chat_id=123,
final_notify=True,
startup_msg="",
)
from takopi.bridge import RunningTask
task_first = RunningTask()
task_second = RunningTask()
msg = {
"chat": {"id": 123},
"message_id": 10,
"reply_to_message": {"message_id": 1},
}
running_tasks = {1: task_first, 2: task_second}
await _handle_cancel(cfg, msg, running_tasks)
assert task_first.cancel_requested.is_set() is True
assert task_second.cancel_requested.is_set() is False
assert len(bot.send_calls) == 0
def test_cancel_command_accepts_extra_text() -> None:
from takopi.bridge import _is_cancel_command
assert _is_cancel_command("/cancel now") is True
assert _is_cancel_command("/cancel@takopi please") is True
assert _is_cancel_command("/cancelled") is False
@pytest.mark.anyio @pytest.mark.anyio
async def test_handle_message_cancelled_renders_cancelled_state() -> None: async def test_handle_message_cancelled_renders_cancelled_state() -> None:
from takopi.bridge import BridgeConfig, handle_message transport = _FakeTransport()
bot = _FakeBot()
session_id = "019b66fc-64c2-7a71-81cd-081c504cfeb2" session_id = "019b66fc-64c2-7a71-81cd-081c504cfeb2"
hold = anyio.Event() hold = anyio.Event()
runner = ScriptRunner( runner = ScriptRunner(
@@ -724,12 +344,10 @@ async def test_handle_message_cancelled_renders_cancelled_state() -> None:
engine=CODEX_ENGINE, engine=CODEX_ENGINE,
resume_value=session_id, resume_value=session_id,
) )
cfg = BridgeConfig( cfg = ExecBridgeConfig(
bot=_queued_bot(bot), transport=transport,
router=_make_router(runner), presenter=MarkdownPresenter(),
chat_id=123,
final_notify=True, final_notify=True,
startup_msg="",
) )
running_tasks: dict = {} running_tasks: dict = {}
@@ -737,9 +355,9 @@ async def test_handle_message_cancelled_renders_cancelled_state() -> None:
await handle_message( await handle_message(
cfg, cfg,
runner=runner, runner=runner,
chat_id=123, incoming=IncomingMessage(
user_msg_id=10, channel_id=123, message_id=10, text="do something"
text="do something", ),
resume_token=None, resume_token=None,
running_tasks=running_tasks, running_tasks=running_tasks,
) )
@@ -756,199 +374,37 @@ async def test_handle_message_cancelled_renders_cancelled_state() -> None:
await running_task.resume_ready.wait() await running_task.resume_ready.wait()
running_task.cancel_requested.set() running_task.cancel_requested.set()
assert len(bot.send_calls) == 1 # Progress message assert len(transport.send_calls) == 1 # Progress message
assert len(bot.edit_calls) >= 1 assert len(transport.edit_calls) >= 1
last_edit = bot.edit_calls[-1]["text"] last_edit = transport.edit_calls[-1]["message"].text
assert "cancelled" in last_edit.lower() assert "cancelled" in last_edit.lower()
assert session_id in last_edit assert session_id in last_edit
@pytest.mark.anyio @pytest.mark.anyio
async def test_handle_message_error_preserves_resume_token() -> None: async def test_handle_message_error_preserves_resume_token() -> None:
from takopi.bridge import BridgeConfig, handle_message transport = _FakeTransport()
bot = _FakeBot()
session_id = "019b66fc-64c2-7a71-81cd-081c504cfeb2" session_id = "019b66fc-64c2-7a71-81cd-081c504cfeb2"
runner = ScriptRunner( runner = ScriptRunner(
[Raise(RuntimeError("boom"))], [Raise(RuntimeError("boom"))],
engine=CODEX_ENGINE, engine=CODEX_ENGINE,
resume_value=session_id, resume_value=session_id,
) )
cfg = BridgeConfig( cfg = ExecBridgeConfig(
bot=_queued_bot(bot), transport=transport,
router=_make_router(runner), presenter=MarkdownPresenter(),
chat_id=123,
final_notify=True, final_notify=True,
startup_msg="",
) )
await handle_message( await handle_message(
cfg, cfg,
runner=runner, runner=runner,
chat_id=123, incoming=IncomingMessage(channel_id=123, message_id=10, text="do something"),
user_msg_id=10,
text="do something",
resume_token=None, resume_token=None,
) )
assert bot.edit_calls assert transport.edit_calls
last_edit = bot.edit_calls[-1]["text"] last_edit = transport.edit_calls[-1]["message"].text
assert "error" in last_edit.lower() assert "error" in last_edit.lower()
assert session_id in last_edit assert session_id in last_edit
assert "codex resume" in last_edit.lower() assert "codex resume" in last_edit.lower()
@pytest.mark.anyio
async def test_send_with_resume_waits_for_token() -> None:
from takopi.bridge import RunningTask, _send_with_resume
bot = _FakeBot()
sent: list[tuple[int, int, str, ResumeToken | None]] = []
async def enqueue(
chat_id: int, user_msg_id: int, text: str, resume: ResumeToken
) -> None:
sent.append((chat_id, user_msg_id, text, resume))
running_task = RunningTask()
async def trigger_resume() -> None:
await anyio.sleep(0)
running_task.resume = ResumeToken(engine=CODEX_ENGINE, value="abc123")
running_task.resume_ready.set()
async with anyio.create_task_group() as tg:
tg.start_soon(trigger_resume)
await _send_with_resume(
bot,
enqueue,
running_task,
123,
10,
"hello",
)
assert sent == [
(123, 10, "hello", ResumeToken(engine=CODEX_ENGINE, value="abc123"))
]
@pytest.mark.anyio
async def test_send_with_resume_reports_when_missing() -> None:
from takopi.bridge import RunningTask, _send_with_resume
bot = _FakeBot()
sent: list[tuple[int, int, str, ResumeToken | None]] = []
async def enqueue(
chat_id: int, user_msg_id: int, text: str, resume: ResumeToken
) -> None:
sent.append((chat_id, user_msg_id, text, resume))
running_task = RunningTask()
running_task.done.set()
await _send_with_resume(
bot,
enqueue,
running_task,
123,
10,
"hello",
)
assert sent == []
assert bot.send_calls
assert "resume token" in bot.send_calls[-1]["text"].lower()
@pytest.mark.anyio
async def test_run_main_loop_routes_reply_to_running_resume() -> None:
from takopi.bridge import BridgeConfig, run_main_loop
progress_ready = anyio.Event()
stop_polling = anyio.Event()
reply_ready = anyio.Event()
hold = anyio.Event()
class _BotWithProgress(_FakeBot):
def __init__(self) -> None:
super().__init__()
self.progress_id: int | None = None
async def send_message(
self,
chat_id: int,
text: str,
reply_to_message_id: int | None = None,
disable_notification: bool | None = False,
entities: list[dict] | None = None,
parse_mode: str | None = None,
*,
replace_message_id: int | None = None,
) -> dict:
msg = await super().send_message(
chat_id=chat_id,
text=text,
reply_to_message_id=reply_to_message_id,
disable_notification=disable_notification,
entities=entities,
parse_mode=parse_mode,
replace_message_id=replace_message_id,
)
if self.progress_id is None and reply_to_message_id is not None:
self.progress_id = int(msg["message_id"])
progress_ready.set()
return msg
bot = _BotWithProgress()
resume_value = "abc123"
runner = ScriptRunner(
[Wait(hold), Sleep(0.05), Return(answer="ok")],
engine=CODEX_ENGINE,
resume_value=resume_value,
)
cfg = BridgeConfig(
bot=_queued_bot(bot),
router=_make_router(runner),
chat_id=123,
final_notify=True,
startup_msg="",
)
async def poller(_cfg: BridgeConfig):
yield {
"message_id": 1,
"text": "first",
"chat": {"id": 123},
"from": {"id": 123},
}
await progress_ready.wait()
assert bot.progress_id is not None
reply_ready.set()
yield {
"message_id": 2,
"text": "followup",
"chat": {"id": 123},
"from": {"id": 123},
"reply_to_message": {"message_id": bot.progress_id},
}
await stop_polling.wait()
async with anyio.create_task_group() as tg:
tg.start_soon(run_main_loop, cfg, poller)
try:
with anyio.fail_after(2):
await reply_ready.wait()
await anyio.sleep(0)
hold.set()
with anyio.fail_after(2):
while len(runner.calls) < 2:
await anyio.sleep(0)
assert runner.calls[1][1] == ResumeToken(
engine=CODEX_ENGINE, value=resume_value
)
finally:
hold.set()
stop_polling.set()
tg.cancel_scope.cancel()
+63 -42
View File
@@ -2,18 +2,20 @@ from typing import cast
from types import SimpleNamespace from types import SimpleNamespace
from pathlib import Path from pathlib import Path
from takopi.model import Action, ActionEvent, ResumeToken, StartedEvent, TakopiEvent from takopi.markdown import (
from takopi.render import ( HARD_BREAK,
ExecProgressRenderer, MarkdownFormatter,
STATUS, STATUS,
action_status, action_status,
assemble_markdown_parts, assemble_markdown_parts,
format_elapsed, format_elapsed,
format_file_change_title, format_file_change_title,
render_event_cli, render_event_cli,
render_markdown,
shorten, shorten,
) )
from takopi.model import Action, ActionEvent, ResumeToken, StartedEvent, TakopiEvent
from takopi.progress import ProgressTracker
from takopi.telegram.render import render_markdown
from tests.factories import ( from tests.factories import (
action_completed, action_completed,
action_started, action_started,
@@ -118,19 +120,21 @@ def test_file_change_renders_relative_paths_inside_cwd() -> None:
def test_progress_renderer_renders_progress_and_final() -> None: def test_progress_renderer_renders_progress_and_final() -> None:
r = ExecProgressRenderer( tracker = ProgressTracker(engine="codex")
max_actions=5, resume_formatter=_format_resume, engine="codex"
)
for evt in SAMPLE_EVENTS: for evt in SAMPLE_EVENTS:
r.note_event(evt) tracker.note_event(evt)
progress_parts = r.render_progress_parts(3.0) state = tracker.snapshot(resume_formatter=_format_resume)
formatter = MarkdownFormatter(max_actions=5)
progress_parts = formatter.render_progress_parts(state, elapsed_s=3.0)
progress = assemble_markdown_parts(progress_parts) progress = assemble_markdown_parts(progress_parts)
assert progress.startswith("working · codex · 3s · step 2") assert progress.startswith("working · codex · 3s · step 2")
assert "✓ `bash -lc ls`" in progress assert "✓ `bash -lc ls`" in progress
assert "`codex resume 0199a213-81c0-7800-8aa1-bbab2a035a53`" in progress assert "`codex resume 0199a213-81c0-7800-8aa1-bbab2a035a53`" in progress
final_parts = r.render_final_parts(3.0, "answer", status="done") final_parts = formatter.render_final_parts(
state, elapsed_s=3.0, status="done", answer="answer"
)
final = assemble_markdown_parts(final_parts) final = assemble_markdown_parts(final_parts)
assert final.startswith("done · codex · 3s · step 2") assert final.startswith("done · codex · 3s · step 2")
assert "✓ `bash -lc ls`" not in final assert "✓ `bash -lc ls`" not in final
@@ -142,7 +146,7 @@ def test_progress_renderer_renders_progress_and_final() -> None:
def test_progress_renderer_clamps_actions_and_ignores_unknown() -> None: def test_progress_renderer_clamps_actions_and_ignores_unknown() -> None:
r = ExecProgressRenderer(max_actions=3, command_width=20, engine="codex") tracker = ProgressTracker(engine="codex")
events = [ events = [
action_completed( action_completed(
f"item_{i}", f"item_{i}",
@@ -155,19 +159,23 @@ def test_progress_renderer_clamps_actions_and_ignores_unknown() -> None:
] ]
for evt in events: for evt in events:
assert r.note_event(evt) is True assert tracker.note_event(evt) is True
assert len(r.recent_actions) == 3 state = tracker.snapshot()
assert "echo 3" in r.recent_actions[0] formatter = MarkdownFormatter(max_actions=3, command_width=20)
assert "echo 5" in r.recent_actions[-1] parts = formatter.render_progress_parts(state, elapsed_s=0.0)
lines = parts.body.split(HARD_BREAK) if parts.body else []
assert len(lines) == 3
assert "echo 3" in lines[0]
assert "echo 5" in lines[-1]
mystery = SimpleNamespace(type="mystery") mystery = SimpleNamespace(type="mystery")
assert r.note_event(cast(TakopiEvent, mystery)) is False assert tracker.note_event(cast(TakopiEvent, mystery)) is False
def test_progress_renderer_renders_commands_in_markdown() -> None: def test_progress_renderer_renders_commands_in_markdown() -> None:
r = ExecProgressRenderer(max_actions=5, command_width=None, engine="codex") tracker = ProgressTracker(engine="codex")
for i in (30, 31, 32): for i in (30, 31, 32):
r.note_event( tracker.note_event(
action_completed( action_completed(
f"item_{i}", f"item_{i}",
"command", "command",
@@ -177,7 +185,9 @@ def test_progress_renderer_renders_commands_in_markdown() -> None:
) )
) )
md = assemble_markdown_parts(r.render_progress_parts(0.0)) state = tracker.snapshot()
formatter = MarkdownFormatter(max_actions=5, command_width=None)
md = assemble_markdown_parts(formatter.render_progress_parts(state, elapsed_s=0.0))
text, _ = render_markdown(md) text, _ = render_markdown(md)
assert "✓ echo 30" in text assert "✓ echo 30" in text
assert "✓ echo 31" in text assert "✓ echo 31" in text
@@ -185,7 +195,7 @@ def test_progress_renderer_renders_commands_in_markdown() -> None:
def test_progress_renderer_handles_duplicate_action_ids() -> None: def test_progress_renderer_handles_duplicate_action_ids() -> None:
r = ExecProgressRenderer(max_actions=5, engine="codex") tracker = ProgressTracker(engine="codex")
events = [ events = [
action_started("dup", "command", "echo first"), action_started("dup", "command", "echo first"),
action_completed( action_completed(
@@ -206,17 +216,19 @@ def test_progress_renderer_handles_duplicate_action_ids() -> None:
] ]
for evt in events: for evt in events:
assert r.note_event(evt) is True assert tracker.note_event(evt) is True
assert len(r.recent_actions) == 2 state = tracker.snapshot()
assert r.recent_actions[0].startswith("") formatter = MarkdownFormatter(max_actions=5)
assert "echo first" in r.recent_actions[0] parts = formatter.render_progress_parts(state, elapsed_s=0.0)
assert r.recent_actions[1].startswith("") lines = parts.body.split(HARD_BREAK) if parts.body else []
assert "echo second" in r.recent_actions[1] assert len(lines) == 1
assert lines[0].startswith("")
assert "echo second" in lines[0]
def test_progress_renderer_collapses_action_updates() -> None: def test_progress_renderer_collapses_action_updates() -> None:
r = ExecProgressRenderer(max_actions=5, engine="codex") tracker = ProgressTracker(engine="codex")
events = [ events = [
action_started("a-1", "command", "echo one"), action_started("a-1", "command", "echo one"),
action_started("a-1", "command", "echo two"), action_started("a-1", "command", "echo two"),
@@ -230,12 +242,16 @@ def test_progress_renderer_collapses_action_updates() -> None:
] ]
for evt in events: for evt in events:
assert r.note_event(evt) is True assert tracker.note_event(evt) is True
assert r.action_count == 1 assert tracker.action_count == 1
assert len(r.recent_actions) == 1 state = tracker.snapshot()
assert r.recent_actions[0].startswith("") formatter = MarkdownFormatter(max_actions=5)
assert "echo two" in r.recent_actions[0] parts = formatter.render_progress_parts(state, elapsed_s=0.0)
lines = parts.body.split(HARD_BREAK) if parts.body else []
assert len(lines) == 1
assert lines[0].startswith("")
assert "echo two" in lines[0]
def test_progress_renderer_deterministic_output() -> None: def test_progress_renderer_deterministic_output() -> None:
@@ -249,16 +265,18 @@ def test_progress_renderer_deterministic_output() -> None:
detail={"exit_code": 0}, detail={"exit_code": 0},
), ),
] ]
r1 = ExecProgressRenderer(max_actions=5, engine="codex") t1 = ProgressTracker(engine="codex")
r2 = ExecProgressRenderer(max_actions=5, engine="codex") t2 = ProgressTracker(engine="codex")
for evt in events: for evt in events:
r1.note_event(evt) t1.note_event(evt)
r2.note_event(evt) t2.note_event(evt)
f1 = MarkdownFormatter(max_actions=5)
f2 = MarkdownFormatter(max_actions=5)
assert assemble_markdown_parts( assert assemble_markdown_parts(
r1.render_progress_parts(1.0) f1.render_progress_parts(t1.snapshot(), elapsed_s=1.0)
) == assemble_markdown_parts(r2.render_progress_parts(1.0)) ) == assemble_markdown_parts(f2.render_progress_parts(t2.snapshot(), elapsed_s=1.0))
def test_format_elapsed_branches() -> None: def test_format_elapsed_branches() -> None:
@@ -319,9 +337,9 @@ def test_render_event_cli_ignores_turn_actions() -> None:
def test_progress_renderer_ignores_missing_action_id() -> None: def test_progress_renderer_ignores_missing_action_id() -> None:
renderer = ExecProgressRenderer(engine="codex") tracker = ProgressTracker(engine="codex")
resume = ResumeToken(engine="codex", value="abc") resume = ResumeToken(engine="codex", value="abc")
renderer.note_event(StartedEvent(engine="codex", resume=resume, title="Session")) tracker.note_event(StartedEvent(engine="codex", resume=resume, title="Session"))
event = ActionEvent( event = ActionEvent(
engine="codex", engine="codex",
@@ -329,7 +347,10 @@ def test_progress_renderer_ignores_missing_action_id() -> None:
phase="started", phase="started",
ok=None, ok=None,
) )
assert renderer.note_event(event) is False assert tracker.note_event(event) is False
header = assemble_markdown_parts(renderer.render_progress_parts(0.0)) formatter = MarkdownFormatter()
header = assemble_markdown_parts(
formatter.render_progress_parts(tracker.snapshot(), elapsed_s=0.0)
)
assert header.startswith("working · codex · 0s") assert header.startswith("working · codex · 0s")
+2 -1
View File
@@ -2,7 +2,8 @@ from __future__ import annotations
from pathlib import Path from pathlib import Path
from takopi import engines, onboarding from takopi import engines
from takopi.telegram import onboarding
def test_check_setup_marks_missing_codex(monkeypatch, tmp_path: Path) -> None: def test_check_setup_marks_missing_codex(monkeypatch, tmp_path: Path) -> None:
+1 -1
View File
@@ -1,6 +1,6 @@
from __future__ import annotations from __future__ import annotations
from takopi import onboarding from takopi.telegram import onboarding
from takopi.backends import EngineBackend from takopi.backends import EngineBackend
+1 -1
View File
@@ -1,4 +1,4 @@
from takopi.render import render_markdown from takopi.telegram.render import render_markdown
def test_render_markdown_basic_entities() -> None: def test_render_markdown_basic_entities() -> None:
+575
View File
@@ -0,0 +1,575 @@
import anyio
import pytest
from takopi.telegram.bridge import (
TelegramBridgeConfig,
TelegramTransport,
_build_bot_commands,
_handle_cancel,
_is_cancel_command,
_send_with_resume,
_strip_engine_command,
run_main_loop,
)
from takopi.runner_bridge import ExecBridgeConfig, RunningTask
from takopi.markdown import MarkdownPresenter
from takopi.model import EngineId, ResumeToken
from takopi.router import AutoRouter, RunnerEntry
from takopi.runners.mock import Return, ScriptRunner, Sleep, Wait
from takopi.transport import MessageRef, RenderedMessage, SendOptions
CODEX_ENGINE = EngineId("codex")
def _make_router(runner) -> AutoRouter:
return AutoRouter(
entries=[RunnerEntry(engine=runner.engine, runner=runner)],
default_engine=runner.engine,
)
class _FakeTransport:
def __init__(self, progress_ready: anyio.Event | None = None) -> None:
self._next_id = 1
self.send_calls: list[dict] = []
self.edit_calls: list[dict] = []
self.delete_calls: list[MessageRef] = []
self.progress_ready = progress_ready
self.progress_ref: MessageRef | None = None
async def send(
self,
*,
channel_id: int | str,
message: RenderedMessage,
options: SendOptions | None = None,
) -> MessageRef:
ref = MessageRef(channel_id=channel_id, message_id=self._next_id)
self._next_id += 1
self.send_calls.append(
{
"ref": ref,
"channel_id": channel_id,
"message": message,
"options": options,
}
)
if (
self.progress_ref is None
and options is not None
and options.reply_to is not None
and options.notify is False
):
self.progress_ref = ref
if self.progress_ready is not None:
self.progress_ready.set()
return ref
async def edit(
self, *, ref: MessageRef, message: RenderedMessage, wait: bool = True
) -> MessageRef:
self.edit_calls.append({"ref": ref, "message": message, "wait": wait})
return ref
async def delete(self, *, ref: MessageRef) -> bool:
self.delete_calls.append(ref)
return True
async def close(self) -> None:
return None
class _FakeBot:
def __init__(self) -> None:
self.command_calls: list[dict] = []
self.send_calls: list[dict] = []
self.edit_calls: list[dict] = []
self.delete_calls: list[dict] = []
async def get_updates(
self,
offset: int | None,
timeout_s: int = 50,
allowed_updates: list[str] | None = None,
) -> list[dict] | None:
_ = offset
_ = timeout_s
_ = allowed_updates
return []
async def send_message(
self,
chat_id: int,
text: str,
reply_to_message_id: int | None = None,
disable_notification: bool | None = False,
entities: list[dict] | None = None,
parse_mode: str | None = None,
*,
replace_message_id: int | None = None,
) -> dict:
self.send_calls.append(
{
"chat_id": chat_id,
"text": text,
"reply_to_message_id": reply_to_message_id,
"disable_notification": disable_notification,
"entities": entities,
"parse_mode": parse_mode,
"replace_message_id": replace_message_id,
}
)
return {"message_id": 1}
async def edit_message_text(
self,
chat_id: int,
message_id: int,
text: str,
entities: list[dict] | None = None,
parse_mode: str | None = None,
*,
wait: bool = True,
) -> dict:
self.edit_calls.append(
{
"chat_id": chat_id,
"message_id": message_id,
"text": text,
"entities": entities,
"parse_mode": parse_mode,
"wait": wait,
}
)
return {"message_id": message_id}
async def delete_message(self, chat_id: int, message_id: int) -> bool:
self.delete_calls.append({"chat_id": chat_id, "message_id": message_id})
return True
async def set_my_commands(
self,
commands: list[dict],
*,
scope: dict | None = None,
language_code: str | None = None,
) -> bool:
self.command_calls.append(
{
"commands": commands,
"scope": scope,
"language_code": language_code,
}
)
return True
async def get_me(self) -> dict | None:
return {"id": 1}
async def close(self) -> None:
return None
def _make_cfg(
transport: _FakeTransport, runner: ScriptRunner | None = None
) -> TelegramBridgeConfig:
if runner is None:
runner = ScriptRunner([Return(answer="ok")], engine=CODEX_ENGINE)
exec_cfg = ExecBridgeConfig(
transport=transport,
presenter=MarkdownPresenter(),
final_notify=True,
)
return TelegramBridgeConfig(
bot=_FakeBot(),
router=_make_router(runner),
chat_id=123,
startup_msg="",
exec_cfg=exec_cfg,
)
def test_strip_engine_command_inline() -> None:
text, engine = _strip_engine_command(
"/claude do it", engine_ids=("codex", "claude")
)
assert engine == "claude"
assert text == "do it"
def test_strip_engine_command_newline() -> None:
text, engine = _strip_engine_command(
"/codex\nhello", engine_ids=("codex", "claude")
)
assert engine == "codex"
assert text == "hello"
def test_strip_engine_command_ignores_unknown() -> None:
text, engine = _strip_engine_command("/unknown hi", engine_ids=("codex", "claude"))
assert engine is None
assert text == "/unknown hi"
def test_strip_engine_command_bot_suffix() -> None:
text, engine = _strip_engine_command(
"/claude@bunny_agent_bot hi", engine_ids=("claude",)
)
assert engine == "claude"
assert text == "hi"
def test_strip_engine_command_only_first_non_empty_line() -> None:
text, engine = _strip_engine_command(
"hello\n/claude hi", engine_ids=("codex", "claude")
)
assert engine is None
assert text == "hello\n/claude hi"
def test_build_bot_commands_includes_cancel_and_engine() -> None:
runner = ScriptRunner(
[Return(answer="ok")], engine=CODEX_ENGINE, resume_value="sid"
)
router = _make_router(runner)
commands = _build_bot_commands(router)
assert {"command": "cancel", "description": "cancel run"} in commands
assert any(cmd["command"] == "codex" for cmd in commands)
@pytest.mark.anyio
async def test_telegram_transport_passes_replace_and_wait() -> None:
bot = _FakeBot()
transport = TelegramTransport(bot)
reply = MessageRef(channel_id=123, message_id=10)
replace = MessageRef(channel_id=123, message_id=11)
await transport.send(
channel_id=123,
message=RenderedMessage(text="hello"),
options=SendOptions(reply_to=reply, notify=True, replace=replace),
)
assert bot.send_calls
assert bot.send_calls[0]["replace_message_id"] == 11
await transport.edit(
ref=replace,
message=RenderedMessage(text="edit"),
wait=False,
)
assert bot.edit_calls
assert bot.edit_calls[0]["wait"] is False
@pytest.mark.anyio
async def test_telegram_transport_edit_wait_false_returns_ref() -> None:
class _OutboxBot:
def __init__(self) -> None:
self.edit_calls: list[dict[str, object]] = []
async def get_updates(
self,
offset: int | None,
timeout_s: int = 50,
allowed_updates: list[str] | None = None,
) -> list[dict] | None:
return None
async def send_message(
self,
chat_id: int,
text: str,
reply_to_message_id: int | None = None,
disable_notification: bool | None = False,
entities: list[dict] | None = None,
parse_mode: str | None = None,
*,
replace_message_id: int | None = None,
) -> dict | None:
return None
async def edit_message_text(
self,
chat_id: int,
message_id: int,
text: str,
entities: list[dict] | None = None,
parse_mode: str | None = None,
*,
wait: bool = True,
) -> dict | None:
self.edit_calls.append(
{
"chat_id": chat_id,
"message_id": message_id,
"text": text,
"entities": entities,
"parse_mode": parse_mode,
"wait": wait,
}
)
if not wait:
return None
return {"message_id": message_id}
async def delete_message(
self,
chat_id: int,
message_id: int,
) -> bool:
return False
async def set_my_commands(
self,
commands: list[dict[str, object]],
*,
scope: dict[str, object] | None = None,
language_code: str | None = None,
) -> bool:
return False
async def get_me(self) -> dict | None:
return None
async def close(self) -> None:
return None
bot = _OutboxBot()
transport = TelegramTransport(bot)
ref = MessageRef(channel_id=123, message_id=1)
result = await transport.edit(
ref=ref,
message=RenderedMessage(text="edit"),
wait=False,
)
assert result == ref
assert bot.edit_calls
assert bot.edit_calls[0]["wait"] is False
@pytest.mark.anyio
async def test_handle_cancel_without_reply_prompts_user() -> None:
transport = _FakeTransport()
cfg = _make_cfg(transport)
msg = {"chat": {"id": 123}, "message_id": 10}
running_tasks: dict = {}
await _handle_cancel(cfg, msg, running_tasks)
assert len(transport.send_calls) == 1
assert "reply to the progress message" in transport.send_calls[0]["message"].text
@pytest.mark.anyio
async def test_handle_cancel_with_no_progress_message_says_nothing_running() -> None:
transport = _FakeTransport()
cfg = _make_cfg(transport)
msg = {
"chat": {"id": 123},
"message_id": 10,
"reply_to_message": {"text": "no message id"},
}
running_tasks: dict = {}
await _handle_cancel(cfg, msg, running_tasks)
assert len(transport.send_calls) == 1
assert "nothing is currently running" in transport.send_calls[0]["message"].text
@pytest.mark.anyio
async def test_handle_cancel_with_finished_task_says_nothing_running() -> None:
transport = _FakeTransport()
cfg = _make_cfg(transport)
progress_id = 99
msg = {
"chat": {"id": 123},
"message_id": 10,
"reply_to_message": {"message_id": progress_id},
}
running_tasks: dict = {}
await _handle_cancel(cfg, msg, running_tasks)
assert len(transport.send_calls) == 1
assert "nothing is currently running" in transport.send_calls[0]["message"].text
@pytest.mark.anyio
async def test_handle_cancel_cancels_running_task() -> None:
transport = _FakeTransport()
cfg = _make_cfg(transport)
progress_id = 42
msg = {
"chat": {"id": 123},
"message_id": 10,
"reply_to_message": {"message_id": progress_id},
}
running_task = RunningTask()
running_tasks = {MessageRef(channel_id=123, message_id=progress_id): running_task}
await _handle_cancel(cfg, msg, running_tasks)
assert running_task.cancel_requested.is_set() is True
assert len(transport.send_calls) == 0 # No error message sent
@pytest.mark.anyio
async def test_handle_cancel_only_cancels_matching_progress_message() -> None:
transport = _FakeTransport()
cfg = _make_cfg(transport)
task_first = RunningTask()
task_second = RunningTask()
msg = {
"chat": {"id": 123},
"message_id": 10,
"reply_to_message": {"message_id": 1},
}
running_tasks = {
MessageRef(channel_id=123, message_id=1): task_first,
MessageRef(channel_id=123, message_id=2): task_second,
}
await _handle_cancel(cfg, msg, running_tasks)
assert task_first.cancel_requested.is_set() is True
assert task_second.cancel_requested.is_set() is False
assert len(transport.send_calls) == 0
def test_cancel_command_accepts_extra_text() -> None:
assert _is_cancel_command("/cancel now") is True
assert _is_cancel_command("/cancel@takopi please") is True
assert _is_cancel_command("/cancelled") is False
@pytest.mark.anyio
async def test_send_with_resume_waits_for_token() -> None:
transport = _FakeTransport()
cfg = _make_cfg(transport)
sent: list[tuple[int, int, str, ResumeToken | None]] = []
async def enqueue(
chat_id: int, user_msg_id: int, text: str, resume: ResumeToken
) -> None:
sent.append((chat_id, user_msg_id, text, resume))
running_task = RunningTask()
async def trigger_resume() -> None:
await anyio.sleep(0)
running_task.resume = ResumeToken(engine=CODEX_ENGINE, value="abc123")
running_task.resume_ready.set()
async with anyio.create_task_group() as tg:
tg.start_soon(trigger_resume)
await _send_with_resume(
cfg,
enqueue,
running_task,
123,
10,
"hello",
)
assert sent == [
(123, 10, "hello", ResumeToken(engine=CODEX_ENGINE, value="abc123"))
]
assert transport.send_calls == []
@pytest.mark.anyio
async def test_send_with_resume_reports_when_missing() -> None:
transport = _FakeTransport()
cfg = _make_cfg(transport)
sent: list[tuple[int, int, str, ResumeToken | None]] = []
async def enqueue(
chat_id: int, user_msg_id: int, text: str, resume: ResumeToken
) -> None:
sent.append((chat_id, user_msg_id, text, resume))
running_task = RunningTask()
running_task.done.set()
await _send_with_resume(
cfg,
enqueue,
running_task,
123,
10,
"hello",
)
assert sent == []
assert transport.send_calls
assert "resume token" in transport.send_calls[-1]["message"].text.lower()
@pytest.mark.anyio
async def test_run_main_loop_routes_reply_to_running_resume() -> None:
progress_ready = anyio.Event()
stop_polling = anyio.Event()
reply_ready = anyio.Event()
hold = anyio.Event()
transport = _FakeTransport(progress_ready=progress_ready)
bot = _FakeBot()
resume_value = "abc123"
runner = ScriptRunner(
[Wait(hold), Sleep(0.05), Return(answer="ok")],
engine=CODEX_ENGINE,
resume_value=resume_value,
)
exec_cfg = ExecBridgeConfig(
transport=transport,
presenter=MarkdownPresenter(),
final_notify=True,
)
cfg = TelegramBridgeConfig(
bot=bot,
router=_make_router(runner),
chat_id=123,
startup_msg="",
exec_cfg=exec_cfg,
)
async def poller(_cfg: TelegramBridgeConfig):
yield {
"message_id": 1,
"text": "first",
"chat": {"id": 123},
"from": {"id": 123},
}
await progress_ready.wait()
assert transport.progress_ref is not None
reply_ready.set()
yield {
"message_id": 2,
"text": "followup",
"chat": {"id": 123},
"from": {"id": 123},
"reply_to_message": {"message_id": transport.progress_ref.message_id},
}
await stop_polling.wait()
async with anyio.create_task_group() as tg:
tg.start_soon(run_main_loop, cfg, poller)
try:
with anyio.fail_after(2):
await reply_ready.wait()
await anyio.sleep(0)
hold.set()
with anyio.fail_after(2):
while len(runner.calls) < 2:
await anyio.sleep(0)
assert runner.calls[1][1] == ResumeToken(
engine=CODEX_ENGINE, value=resume_value
)
finally:
hold.set()
stop_polling.set()
tg.cancel_scope.cancel()
+1 -1
View File
@@ -2,7 +2,7 @@ import httpx
import pytest import pytest
from takopi.logging import setup_logging from takopi.logging import setup_logging
from takopi.telegram import TelegramClient, TelegramRetryAfter from takopi.telegram.client import TelegramClient, TelegramRetryAfter
@pytest.mark.anyio @pytest.mark.anyio
+1 -1
View File
@@ -1,7 +1,7 @@
import anyio import anyio
import pytest import pytest
from takopi.telegram import TelegramClient, TelegramRetryAfter from takopi.telegram.client import TelegramClient, TelegramRetryAfter
class _FakeBot: class _FakeBot: