feat: introduce runner protocol and normalized event model (#7)

This commit is contained in:
banteg
2026-01-01 01:13:55 +04:00
committed by GitHub
parent a9f8967bf4
commit d296c0dbf1
36 changed files with 4749 additions and 1836 deletions
+1 -1
View File
@@ -1 +1 @@
__version__ = "0.1.0"
__version__ = "0.2.0"
+805
View File
@@ -0,0 +1,805 @@
"""Telegram bridge orchestration for running a single runner and streaming progress."""
from __future__ import annotations
import logging
import re
import time
import inspect
from collections import deque
from collections.abc import AsyncIterator, Awaitable, Callable
from dataclasses import dataclass, field
from typing import Any
import anyio
from .markdown import TELEGRAM_MARKDOWN_LIMIT, prepare_telegram
from .model import CompletedEvent, ResumeToken, StartedEvent, TakopiEvent
from .render import ExecProgressRenderer
from .runner import Runner
from .telegram import BotClient
logger = logging.getLogger(__name__)
def _resolve_resume(
runner: Runner, text: str | None, reply_text: str | None
) -> ResumeToken | None:
return runner.extract_resume(text) or runner.extract_resume(reply_text)
def _is_cancel_command(text: str) -> bool:
stripped = text.strip()
if not stripped:
return False
command = stripped.split(maxsplit=1)[0]
return command == "/cancel" or command.startswith("/cancel@")
_RESUME_COMMAND_RE = re.compile(
r"(?im)^\s*`?(?P<engine>[a-z0-9_-]+)\s+resume\s+(?P<token>(?=[^`\s]*\d)[^`\s]+)`?\s*$"
)
def _resume_attempt(text: str | None) -> tuple[bool, str | None]:
if not text:
return False, None
match = _RESUME_COMMAND_RE.search(text)
if match:
return True, match.group("engine").lower()
return False, None
def _resume_warning_text(engine_hint: str | None, current_engine: str) -> str:
if engine_hint and engine_hint.lower() != current_engine.lower():
return (
f"That looks like a {engine_hint} resume command, but this bot is running "
f"{current_engine}. Starting a new thread."
)
return "Couldn't parse a resume command; starting a new thread."
def _strip_resume_lines(text: str, *, is_resume_line: Callable[[str], bool]) -> str:
stripped_lines: list[str] = []
for line in text.splitlines():
if is_resume_line(line) or _RESUME_COMMAND_RE.match(line):
continue
stripped_lines.append(line)
prompt = "\n".join(stripped_lines).strip()
return prompt or "continue"
async def _send_resume_warning(
bot: BotClient,
chat_id: int,
user_msg_id: int,
engine_hint: str | None,
current_engine: str,
) -> None:
await bot.send_message(
chat_id=chat_id,
text=_resume_warning_text(engine_hint, current_engine),
reply_to_message_id=user_msg_id,
disable_notification=True,
)
PROGRESS_EDIT_EVERY_S = 1.0
async def _send_or_edit_markdown(
bot: BotClient,
*,
chat_id: int,
text: str,
edit_message_id: int | None = None,
reply_to_message_id: int | None = None,
disable_notification: bool = False,
limit: int = TELEGRAM_MARKDOWN_LIMIT,
is_resume_line: Callable[[str], bool] | None = None,
prepared: tuple[str, list[dict[str, Any]] | None] | None = None,
) -> tuple[dict[str, Any] | None, bool]:
if prepared is None:
rendered, entities = prepare_telegram(
text, limit=limit, is_resume_line=is_resume_line
)
else:
rendered, entities = prepared
if edit_message_id is not None:
edited = await bot.edit_message_text(
chat_id=chat_id,
message_id=edit_message_id,
text=rendered,
entities=entities,
)
if edited is not None:
return (edited, True)
return (
await bot.send_message(
chat_id=chat_id,
text=rendered,
entities=entities,
reply_to_message_id=reply_to_message_id,
disable_notification=disable_notification,
),
False,
)
class ProgressEdits:
def __init__(
self,
*,
bot: BotClient,
chat_id: int,
progress_id: int | None,
renderer: ExecProgressRenderer,
started_at: float,
progress_edit_every: float,
clock: Callable[[], float],
sleep: Callable[[float], Awaitable[None]],
limit: int,
last_edit_at: float,
last_rendered: str | None,
is_resume_line: Callable[[str], bool],
) -> None:
self.bot = bot
self.chat_id = chat_id
self.progress_id = progress_id
self.renderer = renderer
self.started_at = started_at
self.progress_edit_every = progress_edit_every
self.clock = clock
self.sleep = sleep
self.limit = limit
self.last_edit_at = last_edit_at
self.last_rendered = last_rendered
self.is_resume_line = is_resume_line
self._event_seq = 0
self._published_seq = 0
self.wakeup = anyio.Event()
async def _wait_for_wakeup(self) -> None:
await self.wakeup.wait()
self.wakeup = anyio.Event()
async def run(self) -> None:
if self.progress_id is None:
return
while True:
await self._wait_for_wakeup()
while self._published_seq < self._event_seq:
await self.sleep(
max(
0.0,
self.last_edit_at + self.progress_edit_every - self.clock(),
)
)
seq_at_render = self._event_seq
now = self.clock()
md = self.renderer.render_progress(now - self.started_at)
rendered, entities = prepare_telegram(
md, limit=self.limit, is_resume_line=self.is_resume_line
)
if rendered != self.last_rendered:
logger.debug(
"[progress] edit message_id=%s md=%s", self.progress_id, md
)
self.last_edit_at = now
edited = await self.bot.edit_message_text(
chat_id=self.chat_id,
message_id=self.progress_id,
text=rendered,
entities=entities,
)
if edited is not None:
self.last_rendered = rendered
self._published_seq = seq_at_render
async def on_event(self, evt: TakopiEvent) -> None:
if not self.renderer.note_event(evt):
return
if self.progress_id is None:
return
self._event_seq += 1
self.wakeup.set()
@dataclass(frozen=True)
class BridgeConfig:
bot: BotClient
runner: Runner
chat_id: int
final_notify: bool
startup_msg: str
progress_edit_every: float = PROGRESS_EDIT_EVERY_S
@dataclass
class RunningTask:
resume: ResumeToken | None = None
resume_ready: anyio.Event = field(default_factory=anyio.Event)
cancel_requested: anyio.Event = field(default_factory=anyio.Event)
done: anyio.Event = field(default_factory=anyio.Event)
async def _send_startup(cfg: BridgeConfig) -> None:
logger.debug("[startup] message: %s", cfg.startup_msg)
sent = await cfg.bot.send_message(chat_id=cfg.chat_id, text=cfg.startup_msg)
if sent is not None:
logger.info("[startup] sent startup message to chat_id=%s", cfg.chat_id)
async def _drain_backlog(cfg: BridgeConfig, offset: int | None) -> int | None:
drained = 0
while True:
updates = await cfg.bot.get_updates(
offset=offset, timeout_s=0, allowed_updates=["message"]
)
if updates is None:
logger.info("[startup] backlog drain failed")
return offset
logger.debug("[startup] backlog updates: %s", updates)
if not updates:
if drained:
logger.info("[startup] drained %s pending update(s)", drained)
return offset
offset = updates[-1]["update_id"] + 1
drained += len(updates)
async def handle_message(
cfg: BridgeConfig,
*,
chat_id: int,
user_msg_id: int,
text: str,
resume_token: ResumeToken | None,
running_tasks: dict[int, RunningTask] | None = None,
on_thread_known: Callable[[ResumeToken, anyio.Event], Awaitable[None]]
| None = None,
clock: Callable[[], float] = time.monotonic,
sleep: Callable[[float], Awaitable[None]] = anyio.sleep,
progress_edit_every: float = PROGRESS_EDIT_EVERY_S,
) -> None:
logger.debug(
"[handle] incoming chat_id=%s message_id=%s resume=%r text=%s",
chat_id,
user_msg_id,
resume_token,
text,
)
started_at = clock()
runner = cfg.runner
is_resume_line = runner.is_resume_line
runner_text = _strip_resume_lines(text, is_resume_line=is_resume_line)
progress_renderer = ExecProgressRenderer(
max_actions=5, resume_formatter=runner.format_resume
)
progress_id: int | None = None
last_edit_at = 0.0
last_rendered: str | None = None
initial_md = progress_renderer.render_progress(
0.0, label=f"working ({runner.engine})"
)
initial_rendered, initial_entities = prepare_telegram(
initial_md, limit=TELEGRAM_MARKDOWN_LIMIT, is_resume_line=is_resume_line
)
logger.debug(
"[progress] send reply_to=%s md=%s rendered=%s entities=%s",
user_msg_id,
initial_md,
initial_rendered,
initial_entities,
)
progress_msg = await cfg.bot.send_message(
chat_id=chat_id,
text=initial_rendered,
entities=initial_entities,
reply_to_message_id=user_msg_id,
disable_notification=True,
)
if progress_msg is not None:
progress_id = int(progress_msg["message_id"])
last_edit_at = clock()
last_rendered = initial_rendered
logger.debug("[progress] sent chat_id=%s message_id=%s", chat_id, progress_id)
edits = ProgressEdits(
bot=cfg.bot,
chat_id=chat_id,
progress_id=progress_id,
renderer=progress_renderer,
started_at=started_at,
progress_edit_every=progress_edit_every,
clock=clock,
sleep=sleep,
limit=TELEGRAM_MARKDOWN_LIMIT,
last_edit_at=last_edit_at,
last_rendered=last_rendered,
is_resume_line=is_resume_line,
)
cancel_exc_type = anyio.get_cancelled_exc_class()
cancelled = False
error: Exception | None = None
resume_token_value: ResumeToken | None = None
answer: str | None = None
run_ok: bool | None = None
run_error: str | None = None
running_task: RunningTask | None = None
if running_tasks is not None and progress_id is not None:
running_task = RunningTask()
running_tasks[progress_id] = running_task
edits_scope = anyio.CancelScope()
async def run_edits() -> None:
try:
with edits_scope:
await edits.run()
except cancel_exc_type:
# Edits are best-effort; cancellation should not bubble into the task group.
return
async with anyio.create_task_group() as tg:
if progress_id is not None:
tg.start_soon(run_edits)
async def run_exec() -> CompletedEvent | None:
nonlocal cancelled
cancel_flag = False
completed: CompletedEvent | None = None
async with anyio.create_task_group() as exec_tg:
async def run_runner() -> None:
nonlocal resume_token_value, completed, answer, run_ok, run_error
try:
async for evt in runner.run(runner_text, resume_token):
if isinstance(evt, StartedEvent):
resume_token_value = evt.resume
if (
running_task is not None
and running_task.resume is None
):
running_task.resume = resume_token_value
running_task.resume_ready.set()
if on_thread_known is not None:
await on_thread_known(
resume_token_value, running_task.done
)
elif isinstance(evt, CompletedEvent):
resume_token_value = evt.resume or resume_token_value
answer = evt.answer
run_ok = evt.ok
run_error = evt.error
completed = evt
await edits.on_event(evt)
finally:
exec_tg.cancel_scope.cancel()
async def wait_cancel() -> None:
nonlocal cancel_flag
if running_task is None:
return
await running_task.cancel_requested.wait()
cancel_flag = True
exec_tg.cancel_scope.cancel()
exec_tg.start_soon(run_runner)
if running_task is not None:
exec_tg.start_soon(wait_cancel)
if cancel_flag:
cancelled = True
return completed
try:
completed = await run_exec()
if completed is not None:
resume_token_value = completed.resume or resume_token_value
answer = completed.answer
run_ok = completed.ok
run_error = completed.error
except Exception as e:
error = e
finally:
if (
running_task is not None
and running_tasks is not None
and progress_id is not None
):
running_task.done.set()
running_tasks.pop(progress_id, None)
if not cancelled and error is None:
await anyio.sleep(0)
edits_scope.cancel()
if error is not None:
elapsed = clock() - started_at
if resume_token_value is None:
resume_token_value = progress_renderer.resume_token
progress_renderer.resume_token = resume_token_value
err_body = f"Error:\n{error}"
final_md = progress_renderer.render_final(elapsed, err_body, status="error")
logger.debug("[error] markdown: %s", final_md)
final_msg, edited = await _send_or_edit_markdown(
cfg.bot,
chat_id=chat_id,
text=final_md,
edit_message_id=progress_id,
reply_to_message_id=user_msg_id,
disable_notification=True,
limit=TELEGRAM_MARKDOWN_LIMIT,
is_resume_line=is_resume_line,
)
if final_msg is None:
return
if progress_id is not None and not edited:
logger.debug("[error] delete progress message_id=%s", progress_id)
await cfg.bot.delete_message(chat_id=chat_id, message_id=progress_id)
return
elapsed = clock() - started_at
if cancelled:
if resume_token_value is None:
resume_token_value = progress_renderer.resume_token
logger.info(
"[handle] cancelled resume=%s elapsed=%.1fs",
resume_token_value.value if resume_token_value else None,
elapsed,
)
progress_renderer.resume_token = resume_token_value
final_md = progress_renderer.render_progress(elapsed, label="`cancelled`")
final_msg, edited = await _send_or_edit_markdown(
cfg.bot,
chat_id=chat_id,
text=final_md,
edit_message_id=progress_id,
reply_to_message_id=user_msg_id,
disable_notification=True,
limit=TELEGRAM_MARKDOWN_LIMIT,
is_resume_line=is_resume_line,
)
if final_msg is None:
return
if progress_id is not None and not edited:
logger.debug("[cancel] delete progress message_id=%s", progress_id)
await cfg.bot.delete_message(chat_id=chat_id, message_id=progress_id)
return
if answer is None:
raise RuntimeError("runner finished without a completed event")
final_answer = answer
if run_ok is False and run_error:
if final_answer.strip():
final_answer = f"{final_answer}\n\nError:\n{run_error}"
else:
final_answer = f"Error:\n{run_error}"
status = (
"error" if run_ok is False else ("done" if final_answer.strip() else "error")
)
if resume_token_value is None:
resume_token_value = progress_renderer.resume_token
progress_renderer.resume_token = resume_token_value
final_md = progress_renderer.render_final(elapsed, final_answer, status=status)
logger.debug("[final] markdown: %s", final_md)
final_rendered, final_entities = prepare_telegram(
final_md, limit=TELEGRAM_MARKDOWN_LIMIT, is_resume_line=is_resume_line
)
can_edit_final = progress_id is not None and final_entities is not None
edit_message_id = None if cfg.final_notify or not can_edit_final else progress_id
if edit_message_id is None:
logger.debug(
"[final] send reply_to=%s rendered=%s entities=%s",
user_msg_id,
final_rendered,
final_entities,
)
else:
logger.debug(
"[final] edit message_id=%s rendered=%s entities=%s",
edit_message_id,
final_rendered,
final_entities,
)
final_msg, edited = await _send_or_edit_markdown(
cfg.bot,
chat_id=chat_id,
text=final_md,
edit_message_id=edit_message_id,
reply_to_message_id=user_msg_id,
disable_notification=False,
limit=TELEGRAM_MARKDOWN_LIMIT,
is_resume_line=is_resume_line,
prepared=(final_rendered, final_entities),
)
if final_msg is None:
return
if progress_id is not None and (edit_message_id is None or not edited):
logger.debug("[final] delete progress message_id=%s", progress_id)
await cfg.bot.delete_message(chat_id=chat_id, message_id=progress_id)
async def poll_updates(cfg: BridgeConfig):
offset: int | None = None
offset = await _drain_backlog(cfg, offset)
await _send_startup(cfg)
while True:
updates = await cfg.bot.get_updates(
offset=offset, timeout_s=50, allowed_updates=["message"]
)
if updates is None:
logger.info("[loop] getUpdates failed")
await anyio.sleep(2)
continue
logger.debug("[loop] updates: %s", updates)
for upd in updates:
offset = upd["update_id"] + 1
msg = upd["message"]
if "text" not in msg:
continue
if not (msg["chat"]["id"] == msg["from"]["id"] == cfg.chat_id):
continue
yield msg
async def _handle_cancel(
cfg: BridgeConfig,
msg: dict[str, Any],
running_tasks: dict[int, RunningTask],
) -> None:
chat_id = msg["chat"]["id"]
user_msg_id = msg["message_id"]
reply = msg.get("reply_to_message")
if not reply:
await cfg.bot.send_message(
chat_id=chat_id,
text="reply to the progress message to cancel.",
reply_to_message_id=user_msg_id,
)
return
progress_id = reply.get("message_id")
if progress_id is None:
await cfg.bot.send_message(
chat_id=chat_id,
text="nothing is currently running for that message.",
reply_to_message_id=user_msg_id,
)
return
running_task = running_tasks.get(int(progress_id))
if running_task is None:
await cfg.bot.send_message(
chat_id=chat_id,
text="nothing is currently running for that message.",
reply_to_message_id=user_msg_id,
)
return
logger.info("[cancel] cancelling progress_message_id=%s", progress_id)
running_task.cancel_requested.set()
async def _wait_for_resume(running_task: RunningTask) -> ResumeToken | None:
if running_task.resume is not None:
return running_task.resume
resume: ResumeToken | None = None
async with anyio.create_task_group() as tg:
async def wait_resume() -> None:
nonlocal resume
await running_task.resume_ready.wait()
resume = running_task.resume
tg.cancel_scope.cancel()
async def wait_done() -> None:
await running_task.done.wait()
tg.cancel_scope.cancel()
tg.start_soon(wait_resume)
tg.start_soon(wait_done)
return resume
async def _send_with_resume(
bot: BotClient,
enqueue: Callable[[int, int, str, ResumeToken], Awaitable[None] | None],
running_task: RunningTask,
chat_id: int,
user_msg_id: int,
text: str,
) -> None:
resume = await _wait_for_resume(running_task)
if resume is None:
await bot.send_message(
chat_id=chat_id,
text="resume token not ready yet; try replying to the final message.",
reply_to_message_id=user_msg_id,
disable_notification=True,
)
return
result = enqueue(chat_id, user_msg_id, text, resume)
if inspect.isawaitable(result):
await result
async def _run_main_loop(
cfg: BridgeConfig,
poller: Callable[[BridgeConfig], AsyncIterator[dict[str, Any]]] = poll_updates,
) -> None:
running_tasks: dict[int, RunningTask] = {}
try:
async with anyio.create_task_group() as tg:
scheduler_lock = anyio.Lock()
@dataclass(frozen=True, slots=True)
class ThreadJob:
chat_id: int
user_msg_id: int
text: str
resume_token: ResumeToken
pending_by_thread: dict[str, deque[ThreadJob]] = {}
active_threads: set[str] = set()
busy_until: dict[str, anyio.Event] = {}
def thread_key(token: ResumeToken) -> str:
return f"{token.engine}:{token.value}"
async def clear_busy(key: str, done: anyio.Event) -> None:
await done.wait()
async with scheduler_lock:
if busy_until.get(key) is done:
busy_until.pop(key, None)
async def note_thread_known(token: ResumeToken, done: anyio.Event) -> None:
key = thread_key(token)
async with scheduler_lock:
current = busy_until.get(key)
if current is None or current.is_set():
busy_until[key] = done
tg.start_soon(clear_busy, key, done)
async def run_job(
chat_id: int,
user_msg_id: int,
text: str,
resume_token: ResumeToken | None,
on_thread_known: Callable[[ResumeToken, anyio.Event], Awaitable[None]]
| None = None,
) -> None:
try:
await handle_message(
cfg,
chat_id=chat_id,
user_msg_id=user_msg_id,
text=text,
resume_token=resume_token,
running_tasks=running_tasks,
on_thread_known=on_thread_known,
progress_edit_every=cfg.progress_edit_every,
)
except Exception:
logger.exception("[handle] worker failed")
async def thread_worker(key: str) -> None:
try:
while True:
async with scheduler_lock:
done = busy_until.get(key)
queue = pending_by_thread.get(key)
if not queue:
pending_by_thread.pop(key, None)
active_threads.discard(key)
return
job = queue.popleft()
if done is not None and not done.is_set():
await done.wait()
await run_job(
job.chat_id,
job.user_msg_id,
job.text,
job.resume_token,
)
finally:
async with scheduler_lock:
active_threads.discard(key)
async def enqueue(
chat_id: int,
user_msg_id: int,
text: str,
resume_token: ResumeToken,
) -> None:
key = thread_key(resume_token)
async with scheduler_lock:
queue = pending_by_thread.get(key)
if queue is None:
queue = deque()
pending_by_thread[key] = queue
queue.append(
ThreadJob(
chat_id=chat_id,
user_msg_id=user_msg_id,
text=text,
resume_token=resume_token,
)
)
if key in active_threads:
return
active_threads.add(key)
tg.start_soon(thread_worker, key)
async for msg in poller(cfg):
text = msg["text"]
user_msg_id = msg["message_id"]
if _is_cancel_command(text):
tg.start_soon(_handle_cancel, cfg, msg, running_tasks)
continue
r = msg.get("reply_to_message") or {}
resume_token = _resolve_resume(cfg.runner, text, r.get("text"))
reply_id = r.get("message_id")
if resume_token is None and reply_id is not None:
running_task = running_tasks.get(int(reply_id))
if running_task is not None:
tg.start_soon(
_send_with_resume,
cfg.bot,
enqueue,
running_task,
msg["chat"]["id"],
user_msg_id,
text,
)
continue
if resume_token is None:
attempt_text, engine_text = _resume_attempt(text)
attempt_reply, engine_reply = _resume_attempt(r.get("text"))
attempt = attempt_text or attempt_reply
if attempt:
tg.start_soon(
_send_resume_warning,
cfg.bot,
msg["chat"]["id"],
user_msg_id,
engine_text or engine_reply,
str(cfg.runner.engine),
)
if resume_token is None:
tg.start_soon(
run_job,
msg["chat"]["id"],
user_msg_id,
text,
None,
note_thread_known,
)
else:
await enqueue(msg["chat"]["id"], user_msg_id, text, resume_token)
finally:
await cfg.bot.close()
+138
View File
@@ -0,0 +1,138 @@
from __future__ import annotations
import os
from typing import Any
import anyio
import typer
from . import __version__
from .bridge import BridgeConfig, _run_main_loop
from .config import ConfigError, load_telegram_config
from .engines import (
EngineBackend,
get_backend,
get_engine_config,
list_backend_ids,
parse_engine_overrides,
)
from .logging import setup_logging
from .onboarding import check_setup, render_setup_guide
from .telegram import TelegramClient
def _print_version_and_exit() -> None:
typer.echo(__version__)
raise typer.Exit()
def _version_callback(value: bool) -> None:
if value:
_print_version_and_exit()
def _parse_bridge_config(
*,
final_notify: bool,
backend: EngineBackend,
engine_overrides: dict[str, Any],
) -> BridgeConfig:
startup_pwd = os.getcwd()
config, config_path = load_telegram_config()
try:
token = config["bot_token"]
except KeyError:
raise ConfigError(f"Missing key `bot_token` in {config_path}.") from None
if not isinstance(token, str) or not token.strip():
raise ConfigError(
f"Invalid `bot_token` in {config_path}; expected a non-empty string."
) from None
try:
chat_id_value = config["chat_id"]
except KeyError:
raise ConfigError(f"Missing key `chat_id` in {config_path}.") from None
if isinstance(chat_id_value, bool) or not isinstance(chat_id_value, int):
raise ConfigError(
f"Invalid `chat_id` in {config_path}; expected an integer."
) from None
chat_id = chat_id_value
engine_cfg = get_engine_config(config, backend.id, config_path)
startup_msg = backend.startup_message(startup_pwd)
bot = TelegramClient(token)
runner = backend.build_runner(engine_cfg, engine_overrides, config_path)
return BridgeConfig(
bot=bot,
runner=runner,
chat_id=chat_id,
final_notify=final_notify,
startup_msg=startup_msg,
)
def run(
version: bool = typer.Option(
False,
"--version",
help="Show the version and exit.",
callback=_version_callback,
is_eager=True,
),
final_notify: bool = typer.Option(
True,
"--final-notify/--no-final-notify",
help="Send the final response as a new message (not an edit).",
),
engine: str = typer.Option(
"codex",
"--engine",
help=f"Engine backend id ({', '.join(list_backend_ids())}).",
),
debug: bool = typer.Option(
False,
"--debug/--no-debug",
help="Log engine JSONL, Telegram requests, and rendered messages.",
),
engine_option: list[str] = typer.Option(
[],
"--engine-option",
"-E",
help="Engine-specific override in KEY=VALUE form (repeatable).",
),
) -> None:
setup_logging(debug=debug)
try:
backend = get_backend(engine)
except ConfigError as e:
typer.echo(str(e), err=True)
raise typer.Exit(code=1)
try:
overrides = parse_engine_overrides(engine_option)
except ConfigError as e:
typer.echo(str(e), err=True)
raise typer.Exit(code=1)
setup = check_setup(backend)
if not setup.ok:
render_setup_guide(setup)
raise typer.Exit(code=1)
try:
cfg = _parse_bridge_config(
final_notify=final_notify,
backend=backend,
engine_overrides=overrides,
)
except ConfigError as e:
typer.echo(str(e), err=True)
raise typer.Exit(code=1)
anyio.run(_run_main_loop, cfg)
def main() -> None:
typer.run(run)
if __name__ == "__main__":
main()
+38 -2
View File
@@ -3,8 +3,10 @@ from __future__ import annotations
import tomllib
from pathlib import Path
LOCAL_CONFIG_NAME = Path(".codex") / "takopi.toml"
HOME_CONFIG_PATH = Path.home() / ".codex" / "takopi.toml"
LOCAL_CONFIG_NAME = Path(".takopi") / "takopi.toml"
HOME_CONFIG_PATH = Path.home() / ".takopi" / "takopi.toml"
LEGACY_LOCAL_CONFIG_NAME = Path(".codex") / "takopi.toml"
LEGACY_HOME_CONFIG_PATH = Path.home() / ".codex" / "takopi.toml"
class ConfigError(RuntimeError):
@@ -18,6 +20,32 @@ def _config_candidates() -> list[Path]:
return candidates
def _legacy_candidates() -> list[Path]:
candidates = [Path.cwd() / LEGACY_LOCAL_CONFIG_NAME, LEGACY_HOME_CONFIG_PATH]
if candidates[0] == candidates[1]:
return [candidates[0]]
return candidates
def _maybe_migrate_legacy(legacy_path: Path, target_path: Path) -> None:
if target_path.exists():
if not target_path.is_file():
raise ConfigError(
f"Config path {target_path} exists but is not a file."
) from None
return
if not legacy_path.is_file():
return
try:
target_path.parent.mkdir(parents=True, exist_ok=True)
raw = legacy_path.read_text(encoding="utf-8")
target_path.write_text(raw, encoding="utf-8")
except OSError as e:
raise ConfigError(
f"Failed to migrate legacy config {legacy_path} to {target_path}: {e}"
) from e
def _read_config(cfg_path: Path) -> dict:
try:
raw = cfg_path.read_text(encoding="utf-8")
@@ -36,11 +64,19 @@ def load_telegram_config(path: str | Path | None = None) -> tuple[dict, Path]:
cfg_path = Path(path).expanduser()
return _read_config(cfg_path), cfg_path
for legacy, target in zip(_legacy_candidates(), _config_candidates(), strict=True):
_maybe_migrate_legacy(legacy, target)
candidates = _config_candidates()
for candidate in candidates:
if candidate.is_file():
return _read_config(candidate), candidate
legacy_candidates = _legacy_candidates()
for candidate in legacy_candidates:
if candidate.is_file():
return _read_config(candidate), candidate
if len(candidates) == 1:
raise ConfigError("Missing takopi config.")
raise ConfigError("Missing takopi config.")
+137
View File
@@ -0,0 +1,137 @@
from __future__ import annotations
import shutil
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Callable
from .config import ConfigError
from .runner import Runner
from .runners.codex import CodexRunner
EngineConfig = dict[str, Any]
EngineOverrides = dict[str, Any]
@dataclass(frozen=True, slots=True)
class SetupIssue:
title: str
lines: tuple[str, ...]
@dataclass(frozen=True, slots=True)
class EngineBackend:
id: str
display_name: str
check_setup: Callable[[EngineConfig, Path], list[SetupIssue]]
build_runner: Callable[[EngineConfig, EngineOverrides, Path], Runner]
startup_message: Callable[[str], str]
def _codex_check_setup(_config: EngineConfig, _config_path: Path) -> list[SetupIssue]:
if shutil.which("codex") is None:
return [
SetupIssue(
"Install the Codex CLI",
(" [dim]$[/] npm install -g @openai/codex",),
)
]
return []
def _codex_build_runner(
config: EngineConfig, overrides: EngineOverrides, config_path: Path
) -> Runner:
codex_cmd = shutil.which("codex")
if not codex_cmd:
raise ConfigError(
"codex not found on PATH. Install the Codex CLI with:\n"
" npm install -g @openai/codex\n"
" # or on macOS\n"
" brew install codex"
)
extra_args_value = config.get("extra_args")
if extra_args_value is None:
extra_args = ["-c", "notify=[]"]
elif isinstance(extra_args_value, list) and all(
isinstance(item, str) for item in extra_args_value
):
extra_args = list(extra_args_value)
else:
raise ConfigError(
f"Invalid `codex.extra_args` in {config_path}; expected a list of strings."
)
title = "Codex"
profile_value = config.get("profile")
if profile_value:
if not isinstance(profile_value, str):
raise ConfigError(
f"Invalid `codex.profile` in {config_path}; expected a string."
)
extra_args.extend(["--profile", profile_value])
title = profile_value
if overrides:
unknown = ", ".join(sorted(overrides))
raise ConfigError(f"Unknown codex override(s): {unknown}")
return CodexRunner(codex_cmd=codex_cmd, extra_args=extra_args, title=title)
def _codex_startup_message(cwd: str) -> str:
return f"codex is ready\npwd: {cwd}"
_ENGINE_BACKENDS: dict[str, EngineBackend] = {
"codex": EngineBackend(
id="codex",
display_name="Codex",
check_setup=_codex_check_setup,
build_runner=_codex_build_runner,
startup_message=_codex_startup_message,
),
}
def get_backend(engine_id: str) -> EngineBackend:
try:
return _ENGINE_BACKENDS[engine_id]
except KeyError as exc:
available = ", ".join(sorted(_ENGINE_BACKENDS))
raise ConfigError(
f"Unknown engine {engine_id!r}. Available: {available}."
) from exc
def list_backends() -> list[EngineBackend]:
return list(_ENGINE_BACKENDS.values())
def list_backend_ids() -> list[str]:
return sorted(_ENGINE_BACKENDS)
def parse_engine_overrides(options: list[str]) -> EngineOverrides:
overrides: EngineOverrides = {}
for raw in options:
key, sep, value = raw.partition("=")
if not sep:
raise ConfigError(f"Invalid --engine-option {raw!r}; expected KEY=VALUE.")
key = key.strip()
if not key:
raise ConfigError(f"Invalid --engine-option {raw!r}; expected KEY=VALUE.")
overrides[key] = value
return overrides
def get_engine_config(
config: dict[str, Any], engine_id: str, config_path: Path
) -> EngineConfig:
engine_cfg = config.get(engine_id) or {}
if not isinstance(engine_cfg, dict):
raise ConfigError(
f"Invalid `{engine_id}` config in {config_path}; expected a table."
)
return engine_cfg
-895
View File
@@ -1,895 +0,0 @@
from __future__ import annotations
import inspect
import json
import logging
import os
import re
import shutil
import subprocess
import time
from collections import deque
from collections.abc import AsyncIterator, Awaitable, Callable
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import Any
from weakref import WeakValueDictionary
import anyio
import typer
from anyio.abc import ByteReceiveStream, Process
from anyio.streams.text import TextReceiveStream
from . import __version__
from .config import ConfigError, load_telegram_config
from .exec_render import (
ExecProgressRenderer,
render_event_cli,
render_markdown,
)
from .logging import setup_logging
from .onboarding import check_setup, render_setup_guide
from .telegram import TelegramClient
logger = logging.getLogger(__name__)
UUID_PATTERN_TEXT = r"\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b"
UUID_PATTERN = re.compile(UUID_PATTERN_TEXT, re.IGNORECASE)
RESUME_LINE = re.compile(
rf"^\s*resume\s*:\s*`?(?P<id>{UUID_PATTERN_TEXT})`?\s*$",
re.IGNORECASE | re.MULTILINE,
)
def _print_version_and_exit() -> None:
typer.echo(__version__)
raise typer.Exit()
def _version_callback(value: bool) -> None:
if value:
_print_version_and_exit()
def extract_session_id(text: str | None) -> str | None:
if not text:
return None
found: str | None = None
for match in RESUME_LINE.finditer(text):
found = match.group("id")
return found
def resolve_resume_session(text: str | None, reply_text: str | None) -> str | None:
return extract_session_id(text) or extract_session_id(reply_text)
async def _iter_text_lines(stream: ByteReceiveStream) -> AsyncIterator[str]:
text_stream = TextReceiveStream(stream, errors="replace")
buffer = ""
while True:
try:
chunk = await text_stream.receive()
except anyio.EndOfStream:
if buffer:
yield buffer
return
buffer += chunk
while True:
split_at = buffer.find("\n")
if split_at < 0:
break
line = buffer[: split_at + 1]
buffer = buffer[split_at + 1 :]
yield line
async def _drain_stderr(stderr: ByteReceiveStream, tail: deque[str]) -> None:
try:
async for line in _iter_text_lines(stderr):
logger.info("[codex][stderr] %s", line.rstrip())
tail.append(line)
except Exception as e:
logger.debug("[codex][stderr] drain error: %s", e)
async def _wait_for_process(proc: Process, timeout: float) -> bool:
with anyio.move_on_after(timeout) as scope:
await proc.wait()
return scope.cancel_called
@asynccontextmanager
async def manage_subprocess(*args, terminate_timeout: float = 2.0, **kwargs):
proc = await anyio.open_process(args, **kwargs)
try:
yield proc
finally:
if proc.returncode is None:
with anyio.CancelScope(shield=True):
try:
proc.terminate()
except ProcessLookupError:
pass
timed_out = await _wait_for_process(proc, terminate_timeout)
if timed_out:
logger.debug(
"[codex] terminate timed out pid=%s; leaving process to exit",
proc.pid,
)
TELEGRAM_MARKDOWN_LIMIT = 3500
PROGRESS_EDIT_EVERY_S = 2.0
def _clamp_tg_text(text: str, limit: int = TELEGRAM_MARKDOWN_LIMIT) -> str:
if len(text) <= limit:
return text
return text[: limit - 20] + "\n...(truncated)"
def truncate_for_telegram(text: str, limit: int) -> str:
"""
Truncate text to fit Telegram limits while preserving the trailing `resume: ...`
line (if present), otherwise preserving the last non-empty line.
"""
if len(text) <= limit:
return text
lines = text.splitlines()
tail_lines: list[str] | None = None
is_resume_tail = False
for i in range(len(lines) - 1, -1, -1):
line = lines[i]
if "resume" in line and UUID_PATTERN.search(line):
tail_lines = lines[i:]
is_resume_tail = True
break
if tail_lines is None:
for i in range(len(lines) - 1, -1, -1):
if lines[i].strip():
tail_lines = [lines[i]]
break
tail = "\n".join(tail_lines or []).strip("\n")
sep = "\n\n"
max_tail = limit if is_resume_tail else (limit // 4)
tail = tail[-max_tail:] if max_tail > 0 else ""
head_budget = limit - len(sep) - len(tail)
if head_budget <= 0:
return tail[-limit:] if tail else text[:limit]
head = text[:head_budget].rstrip()
return (head + sep + tail)[:limit]
def prepare_telegram(md: str, *, limit: int) -> tuple[str, list[dict[str, Any]] | None]:
rendered, entities = render_markdown(md)
if len(rendered) > limit:
rendered = truncate_for_telegram(rendered, limit)
return rendered, None
return rendered, entities
async def _send_or_edit_markdown(
bot: TelegramClient,
*,
chat_id: int,
text: str,
edit_message_id: int | None = None,
reply_to_message_id: int | None = None,
disable_notification: bool = False,
limit: int = TELEGRAM_MARKDOWN_LIMIT,
) -> tuple[dict[str, Any] | None, bool]:
if edit_message_id is not None:
rendered, entities = prepare_telegram(text, limit=limit)
edited = await bot.edit_message_text(
chat_id=chat_id,
message_id=edit_message_id,
text=rendered,
entities=entities,
)
if edited is not None:
return (edited, True)
rendered, entities = prepare_telegram(text, limit=limit)
return (
await bot.send_message(
chat_id=chat_id,
text=rendered,
entities=entities,
reply_to_message_id=reply_to_message_id,
disable_notification=disable_notification,
),
False,
)
EventCallback = Callable[[dict[str, Any]], Awaitable[None] | None]
class ProgressEdits:
def __init__(
self,
*,
bot: TelegramClient,
chat_id: int,
progress_id: int | None,
renderer: ExecProgressRenderer,
started_at: float,
progress_edit_every: float,
clock: Callable[[], float],
sleep: Callable[[float], Awaitable[None]],
limit: int,
last_edit_at: float,
last_rendered: str | None,
) -> None:
self.bot = bot
self.chat_id = chat_id
self.progress_id = progress_id
self.renderer = renderer
self.started_at = started_at
self.progress_edit_every = progress_edit_every
self.clock = clock
self.sleep = sleep
self.limit = limit
self.last_edit_at = last_edit_at
self.last_rendered = last_rendered
self._event_seq = 0
self._published_seq = 0
self.wakeup = anyio.Event()
async def _wait_for_wakeup(self) -> None:
await self.wakeup.wait()
self.wakeup = anyio.Event()
async def run(self) -> None:
if self.progress_id is None:
return
while True:
await self._wait_for_wakeup()
while self._published_seq < self._event_seq:
await self.sleep(
max(
0.0,
self.last_edit_at + self.progress_edit_every - self.clock(),
)
)
seq_at_render = self._event_seq
now = self.clock()
md = self.renderer.render_progress(now - self.started_at)
rendered, entities = prepare_telegram(md, limit=self.limit)
if rendered != self.last_rendered:
logger.debug(
"[progress] edit message_id=%s md=%s", self.progress_id, md
)
self.last_edit_at = now
edited = await self.bot.edit_message_text(
chat_id=self.chat_id,
message_id=self.progress_id,
text=rendered,
entities=entities,
)
if edited is not None:
self.last_rendered = rendered
self._published_seq = seq_at_render
async def on_event(self, evt: dict[str, Any]) -> None:
if not self.renderer.note_event(evt):
return
if self.progress_id is None:
return
self._event_seq += 1
self.wakeup.set()
class CodexExecRunner:
def __init__(
self,
codex_cmd: str,
extra_args: list[str],
) -> None:
self.codex_cmd = codex_cmd
self.extra_args = extra_args
# Per-session locks to prevent concurrent resumes to the same session_id.
self._session_locks: WeakValueDictionary[str, anyio.Lock] = (
WeakValueDictionary()
)
async def _lock_for(self, session_id: str) -> anyio.Lock:
lock = self._session_locks.get(session_id)
if lock is None:
lock = anyio.Lock()
self._session_locks[session_id] = lock
return lock
async def run(
self,
prompt: str,
session_id: str | None,
on_event: EventCallback | None = None,
) -> tuple[str, str, bool]:
logger.info("[codex] start run session_id=%r", session_id)
logger.debug("[codex] prompt: %s", prompt)
args = [self.codex_cmd]
args.extend(self.extra_args)
args.extend(["exec", "--json"])
# Always pipe prompt via stdin ("-") to avoid quoting issues.
if session_id:
args.extend(["resume", session_id, "-"])
else:
args.append("-")
cancelled_exc_type = anyio.get_cancelled_exc_class()
cancelled_exc: BaseException | None = None
async with manage_subprocess(
*args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
) as proc:
if proc.stdin is None or proc.stdout is None or proc.stderr is None:
raise RuntimeError("codex exec failed to open subprocess pipes")
proc_stdin = proc.stdin
proc_stdout = proc.stdout
proc_stderr = proc.stderr
logger.debug("[codex] spawn pid=%s args=%r", proc.pid, args)
stderr_tail: deque[str] = deque(maxlen=200)
rc: int | None = None
found_session: str | None = session_id
last_agent_text: str | None = None
saw_agent_message = False
cli_last_item: int | None = None
cancelled = False
async with anyio.create_task_group() as tg:
tg.start_soon(_drain_stderr, proc_stderr, stderr_tail)
try:
await proc_stdin.send(prompt.encode())
await proc_stdin.aclose()
async for raw_line in _iter_text_lines(proc_stdout):
raw = raw_line.rstrip("\n")
logger.debug("[codex][jsonl] %s", raw)
line = raw.strip()
if not line:
continue
try:
evt = json.loads(line)
except json.JSONDecodeError:
logger.debug("[codex][jsonl] invalid line: %r", line)
continue
cli_last_item, out_lines = render_event_cli(evt, cli_last_item)
for out in out_lines:
logger.info("[codex] %s", out)
if on_event is not None:
try:
res = on_event(evt)
if inspect.isawaitable(res):
await res
except Exception as e:
logger.info("[codex][on_event] callback error: %s", e)
if evt["type"] == "thread.started":
found_session = evt.get("thread_id") or found_session
if evt["type"] == "item.completed":
item = evt.get("item") or {}
if item.get("type") == "agent_message" and isinstance(
item.get("text"), str
):
last_agent_text = item["text"]
saw_agent_message = True
except cancelled_exc_type as exc:
cancelled = True
cancelled_exc = exc
tg.cancel_scope.cancel()
finally:
if not cancelled:
rc = await proc.wait()
if cancelled:
raise cancelled_exc # type: ignore[misc]
logger.debug("[codex] process exit pid=%s rc=%s", proc.pid, rc)
if rc != 0:
tail = "".join(stderr_tail)
raise RuntimeError(f"codex exec failed (rc={rc}). stderr tail:\n{tail}")
if not found_session:
raise RuntimeError(
"codex exec finished but no session_id/thread_id was captured"
)
logger.info("[codex] done run session_id=%r", found_session)
return (
found_session,
(last_agent_text or "(No agent_message captured from JSON stream.)"),
saw_agent_message,
)
async def run_serialized(
self,
prompt: str,
session_id: str | None,
on_event: EventCallback | None = None,
) -> tuple[str, str, bool]:
if session_id:
lock = await self._lock_for(session_id)
async with lock:
return await self.run(prompt, session_id=session_id, on_event=on_event)
session_lock: anyio.Lock | None = None
async def on_event_with_lock(evt: dict[str, Any]) -> None:
nonlocal session_lock
if session_lock is None and evt.get("type") == "thread.started":
thread_id = evt.get("thread_id")
if isinstance(thread_id, str) and thread_id:
session_lock = await self._lock_for(thread_id)
await session_lock.acquire()
if on_event is None:
return
res = on_event(evt)
if inspect.isawaitable(res):
await res
try:
return await self.run(prompt, session_id=None, on_event=on_event_with_lock)
finally:
if session_lock is not None:
session_lock.release()
@dataclass(frozen=True)
class BridgeConfig:
bot: TelegramClient
runner: CodexExecRunner
chat_id: int
final_notify: bool
startup_msg: str
max_concurrency: int
@dataclass
class RunningTask:
scope: anyio.CancelScope
session_id: str | None = None
def _parse_bridge_config(
*,
final_notify: bool,
profile: str | None,
) -> BridgeConfig:
startup_pwd = os.getcwd()
config, config_path = load_telegram_config()
try:
token = config["bot_token"]
except KeyError:
raise ConfigError(f"Missing key `bot_token` in {config_path}.") from None
if not isinstance(token, str) or not token.strip():
raise ConfigError(
f"Invalid `bot_token` in {config_path}; expected a non-empty string."
) from None
try:
chat_id_value = config["chat_id"]
except KeyError:
raise ConfigError(f"Missing key `chat_id` in {config_path}.") from None
if isinstance(chat_id_value, bool) or not isinstance(chat_id_value, int):
raise ConfigError(
f"Invalid `chat_id` in {config_path}; expected an integer."
) from None
chat_id = chat_id_value
codex_cmd = shutil.which("codex")
if not codex_cmd:
raise ConfigError(
"codex not found on PATH. Install the Codex CLI with:\n"
" npm install -g @openai/codex\n"
" # or on macOS\n"
" brew install codex"
)
startup_msg = f"🐙 takopi is ready to help-pi!\npwd: {startup_pwd}"
extra_args = ["-c", "notify=[]"]
if profile:
extra_args.extend(["--profile", profile])
bot = TelegramClient(token)
runner = CodexExecRunner(codex_cmd=codex_cmd, extra_args=extra_args)
return BridgeConfig(
bot=bot,
runner=runner,
chat_id=chat_id,
final_notify=final_notify,
startup_msg=startup_msg,
max_concurrency=16,
)
async def _send_startup(cfg: BridgeConfig) -> None:
logger.debug("[startup] message: %s", cfg.startup_msg)
sent = await cfg.bot.send_message(chat_id=cfg.chat_id, text=cfg.startup_msg)
if sent is not None:
logger.info("[startup] sent startup message to chat_id=%s", cfg.chat_id)
async def _drain_backlog(cfg: BridgeConfig, offset: int | None) -> int | None:
drained = 0
while True:
updates = await cfg.bot.get_updates(
offset=offset, timeout_s=0, allowed_updates=["message"]
)
if updates is None:
logger.info("[startup] backlog drain failed")
return offset
logger.debug("[startup] backlog updates: %s", updates)
if not updates:
if drained:
logger.info("[startup] drained %s pending update(s)", drained)
return offset
offset = updates[-1]["update_id"] + 1
drained += len(updates)
async def handle_message(
cfg: BridgeConfig,
*,
chat_id: int,
user_msg_id: int,
text: str,
resume_session: str | None,
running_tasks: dict[int, RunningTask] | None = None,
clock: Callable[[], float] = time.monotonic,
sleep: Callable[[float], Awaitable[None]] = anyio.sleep,
progress_edit_every: float = PROGRESS_EDIT_EVERY_S,
) -> None:
logger.debug(
"[handle] incoming chat_id=%s message_id=%s resume=%r text=%s",
chat_id,
user_msg_id,
resume_session,
text,
)
started_at = clock()
progress_renderer = ExecProgressRenderer(max_actions=5)
progress_id: int | None = None
last_edit_at = 0.0
last_rendered: str | None = None
initial_md = progress_renderer.render_progress(0.0)
initial_rendered, initial_entities = prepare_telegram(
initial_md, limit=TELEGRAM_MARKDOWN_LIMIT
)
logger.debug(
"[progress] send reply_to=%s md=%s rendered=%s entities=%s",
user_msg_id,
initial_md,
initial_rendered,
initial_entities,
)
progress_msg = await cfg.bot.send_message(
chat_id=chat_id,
text=initial_rendered,
entities=initial_entities,
reply_to_message_id=user_msg_id,
disable_notification=True,
)
if progress_msg is not None:
progress_id = int(progress_msg["message_id"])
last_edit_at = clock()
last_rendered = initial_rendered
logger.debug("[progress] sent chat_id=%s message_id=%s", chat_id, progress_id)
edits = ProgressEdits(
bot=cfg.bot,
chat_id=chat_id,
progress_id=progress_id,
renderer=progress_renderer,
started_at=started_at,
progress_edit_every=progress_edit_every,
clock=clock,
sleep=sleep,
limit=TELEGRAM_MARKDOWN_LIMIT,
last_edit_at=last_edit_at,
last_rendered=last_rendered,
)
exec_scope = anyio.CancelScope()
cancelled = False
error: Exception | None = None
session_id: str | None = None
answer: str | None = None
saw_agent_message: bool | None = None
running_task: RunningTask | None = None
if running_tasks is not None and progress_id is not None:
running_task = RunningTask(scope=exec_scope)
running_tasks[progress_id] = running_task
if resume_session is not None:
running_task.session_id = resume_session
async def on_event(evt: dict[str, Any]) -> None:
if (
running_task is not None
and running_task.session_id is None
and evt.get("type") == "thread.started"
):
thread_id = evt.get("thread_id")
if isinstance(thread_id, str) and thread_id:
running_task.session_id = thread_id
await edits.on_event(evt)
async with anyio.create_task_group() as tg:
if progress_id is not None:
tg.start_soon(edits.run)
try:
with exec_scope:
session_id, answer, saw_agent_message = await cfg.runner.run_serialized(
text, resume_session, on_event=on_event
)
except Exception as e:
error = e
finally:
if running_task is not None:
if running_tasks is not None and progress_id is not None:
running_tasks.pop(progress_id, None)
if exec_scope.cancelled_caught and not cancelled and error is None:
cancelled = True
session_id = progress_renderer.resume_session or resume_session
if not cancelled and error is None:
await anyio.sleep(0)
tg.cancel_scope.cancel()
if error is not None:
err = _clamp_tg_text(f"Error:\n{error}")
logger.debug("[error] send reply_to=%s text=%s", user_msg_id, err)
await _send_or_edit_markdown(
cfg.bot,
chat_id=chat_id,
text=err,
edit_message_id=progress_id,
reply_to_message_id=user_msg_id,
disable_notification=True,
limit=TELEGRAM_MARKDOWN_LIMIT,
)
return
elapsed = clock() - started_at
if cancelled:
if session_id is None:
session_id = progress_renderer.resume_session or resume_session
logger.info(
"[handle] cancelled session_id=%s elapsed=%.1fs", session_id, elapsed
)
progress_renderer.resume_session = session_id
final_md = progress_renderer.render_progress(elapsed, label="`cancelled`")
await _send_or_edit_markdown(
cfg.bot,
chat_id=chat_id,
text=final_md,
edit_message_id=progress_id,
reply_to_message_id=user_msg_id,
disable_notification=True,
limit=TELEGRAM_MARKDOWN_LIMIT,
)
return
if session_id is None or answer is None or saw_agent_message is None:
raise RuntimeError("codex exec finished without a result")
status = "done" if saw_agent_message else "error"
progress_renderer.resume_session = session_id
final_md = progress_renderer.render_final(elapsed, answer, status=status)
logger.debug("[final] markdown: %s", final_md)
final_rendered, final_entities = render_markdown(final_md)
can_edit_final = (
progress_id is not None and len(final_rendered) <= TELEGRAM_MARKDOWN_LIMIT
)
edit_message_id = None if cfg.final_notify or not can_edit_final else progress_id
if edit_message_id is None:
logger.debug(
"[final] send reply_to=%s rendered=%s entities=%s",
user_msg_id,
final_rendered,
final_entities,
)
else:
logger.debug(
"[final] edit message_id=%s rendered=%s entities=%s",
edit_message_id,
final_rendered,
final_entities,
)
final_msg, edited = await _send_or_edit_markdown(
cfg.bot,
chat_id=chat_id,
text=final_md,
edit_message_id=edit_message_id,
reply_to_message_id=user_msg_id,
disable_notification=False,
limit=TELEGRAM_MARKDOWN_LIMIT,
)
if final_msg is None:
return
if progress_id is not None and (edit_message_id is None or not edited):
logger.debug("[final] delete progress message_id=%s", progress_id)
await cfg.bot.delete_message(chat_id=chat_id, message_id=progress_id)
async def poll_updates(cfg: BridgeConfig):
offset: int | None = None
offset = await _drain_backlog(cfg, offset)
await _send_startup(cfg)
while True:
updates = await cfg.bot.get_updates(
offset=offset, timeout_s=50, allowed_updates=["message"]
)
if updates is None:
logger.info("[loop] getUpdates failed")
await anyio.sleep(2)
continue
logger.debug("[loop] updates: %s", updates)
for upd in updates:
offset = upd["update_id"] + 1
msg = upd["message"]
if "text" not in msg:
continue
if not (msg["chat"]["id"] == msg["from"]["id"] == cfg.chat_id):
continue
yield msg
async def _handle_cancel(
cfg: BridgeConfig,
msg: dict[str, Any],
running_tasks: dict[int, RunningTask],
) -> None:
chat_id = msg["chat"]["id"]
user_msg_id = msg["message_id"]
reply = msg.get("reply_to_message")
if not reply:
await cfg.bot.send_message(
chat_id=chat_id,
text="reply to the progress message to cancel.",
reply_to_message_id=user_msg_id,
)
return
progress_id = reply.get("message_id")
if progress_id is None:
await cfg.bot.send_message(
chat_id=chat_id,
text="nothing is currently running for that message.",
reply_to_message_id=user_msg_id,
)
return
running_task = running_tasks.get(int(progress_id))
if running_task is None:
await cfg.bot.send_message(
chat_id=chat_id,
text="nothing is currently running for that message.",
reply_to_message_id=user_msg_id,
)
return
logger.info("[cancel] cancelling progress_message_id=%s", progress_id)
running_task.scope.cancel()
async def _run_main_loop(cfg: BridgeConfig) -> None:
worker_count = max(1, min(cfg.max_concurrency, 16))
send_stream, receive_stream = anyio.create_memory_object_stream(
max_buffer_size=worker_count * 2
)
running_tasks: dict[int, RunningTask] = {}
async def worker() -> None:
while True:
chat_id, user_msg_id, text, resume_session = await receive_stream.receive()
try:
await handle_message(
cfg,
chat_id=chat_id,
user_msg_id=user_msg_id,
text=text,
resume_session=resume_session,
running_tasks=running_tasks,
)
except Exception:
logger.exception("[handle] worker failed")
try:
async with anyio.create_task_group() as tg:
for _ in range(worker_count):
tg.start_soon(worker)
async for msg in poll_updates(cfg):
text = msg["text"]
user_msg_id = msg["message_id"]
if text == "/cancel":
tg.start_soon(_handle_cancel, cfg, msg, running_tasks)
continue
r = msg.get("reply_to_message") or {}
resume_session = resolve_resume_session(text, r.get("text"))
await send_stream.send(
(msg["chat"]["id"], user_msg_id, text, resume_session)
)
finally:
await send_stream.aclose()
await receive_stream.aclose()
await cfg.bot.close()
def run(
version: bool = typer.Option(
False,
"--version",
help="Show the version and exit.",
callback=_version_callback,
is_eager=True,
),
final_notify: bool = typer.Option(
True,
"--final-notify/--no-final-notify",
help="Send the final response as a new message (not an edit).",
),
debug: bool = typer.Option(
False,
"--debug/--no-debug",
help="Log codex JSONL, Telegram requests, and rendered messages.",
),
profile: str | None = typer.Option(
None,
"--profile",
help="Codex profile name to pass to `codex --profile`.",
),
) -> None:
setup_logging(debug=debug)
setup = check_setup()
if not setup.ok:
render_setup_guide(setup)
raise typer.Exit(code=1)
try:
cfg = _parse_bridge_config(
final_notify=final_notify,
profile=profile,
)
except ConfigError as e:
typer.echo(str(e), err=True)
raise typer.Exit(code=1)
anyio.run(_run_main_loop, cfg)
def main() -> None:
typer.run(run)
if __name__ == "__main__":
main()
-261
View File
@@ -1,261 +0,0 @@
from __future__ import annotations
import re
import textwrap
from collections import deque
from pathlib import Path
from textwrap import indent
from typing import Any
from markdown_it import MarkdownIt
from sulguk import transform_html
STATUS_RUNNING = ""
STATUS_DONE = ""
STATUS_FAIL = ""
HEADER_SEP = " · "
HARD_BREAK = " \n"
MAX_PROGRESS_CMD_LEN = 300
MAX_QUERY_LEN = 60
MAX_PATH_LEN = 40
_md = MarkdownIt("commonmark", {"html": False})
def render_markdown(md: str) -> tuple[str, list[dict[str, Any]]]:
html = _md.render(md or "")
rendered = transform_html(html)
text = re.sub(r"(?m)^(\s*)•", r"\1-", rendered.text)
entities = [dict(e) for e in rendered.entities]
return text, entities
def format_elapsed(elapsed_s: float) -> str:
total = max(0, int(elapsed_s))
minutes, seconds = divmod(total, 60)
hours, minutes = divmod(minutes, 60)
if hours:
return f"{hours}h {minutes:02d}m"
if minutes:
return f"{minutes}m {seconds:02d}s"
return f"{seconds}s"
def format_header(elapsed_s: float, item: int | None, label: str) -> str:
elapsed = format_elapsed(elapsed_s)
parts = [label, elapsed]
if item is not None:
parts.append(f"step {item}")
return HEADER_SEP.join(parts)
def extract_numeric_id(item_id: object, fallback: int | None = None) -> int | None:
if isinstance(item_id, int):
return item_id
if isinstance(item_id, str):
match = re.search(r"(?:item_)?(\d+)", item_id)
if match:
return int(match.group(1))
return fallback
def _shorten(text: str, width: int) -> str:
return textwrap.shorten(text, width=width, placeholder="")
def _format_change_path(path: str) -> str:
workdir = Path.cwd()
path_obj = Path(path)
if path_obj.is_absolute() and path_obj.is_relative_to(workdir):
return str(path_obj.relative_to(workdir))
return path
def format_event(
event: dict[str, Any],
last_item: int | None,
*,
command_width: int | None = None,
escape_markdown: bool = False,
) -> tuple[int | None, list[str], str | None, str | None]:
lines: list[str] = []
match event["type"]:
case "thread.started":
return last_item, ["thread started"], None, None
case "turn.started":
return last_item, ["turn started"], None, None
case "turn.completed":
return last_item, ["turn completed"], None, None
case "turn.failed":
return last_item, [f"turn failed: {event['error']['message']}"], None, None
case "error":
return last_item, [f"stream error: {event['message']}"], None, None
case "item.started" | "item.updated" | "item.completed" as etype:
item = event["item"]
item_type = item.get("type") or item.get("item_type")
if item_type == "assistant_message":
item_type = "agent_message"
if item_type is None:
return last_item, [], None, None
item_num = extract_numeric_id(item.get("id"), last_item)
last_item = item_num if item_num is not None else last_item
prefix = f"{item_num}. "
if escape_markdown and item_num is not None:
# Avoid ordered-list parsing which renumbers items in MarkdownIt/CommonMark.
prefix = f"{item_num}\\." + " "
match (item_type, etype):
case ("agent_message", "item.completed"):
lines.append("assistant:")
lines.extend(indent(item["text"], " ").splitlines())
return last_item, lines, None, None
case ("reasoning", "item.completed"):
text = item.get("text") or ""
first_line = text.splitlines()[0] if text else ""
line = prefix + first_line
return last_item, [line], line, prefix
case ("command_execution", "item.started"):
command = item["command"]
if command_width is not None:
command = _shorten(command, command_width)
command = f"`{command}`"
line = prefix + f"{STATUS_RUNNING} {command}"
return last_item, [line], line, prefix
case ("command_execution", "item.completed"):
command = item["command"]
if command_width is not None:
command = _shorten(command, command_width)
command = f"`{command}`"
exit_code = item["exit_code"]
if exit_code == 0:
status = STATUS_DONE
exit_part = ""
else:
status = STATUS_FAIL if exit_code is not None else STATUS_DONE
exit_part = (
f" (exit {exit_code})" if exit_code is not None else ""
)
line = prefix + f"{status} {command}{exit_part}"
return last_item, [line], line, prefix
case ("mcp_tool_call", "item.started"):
name = (
".".join(
part for part in (item["server"], item["tool"]) if part
)
or "tool"
)
line = prefix + f"{STATUS_RUNNING} tool: {name}"
return last_item, [line], line, prefix
case ("mcp_tool_call", "item.completed"):
name = (
".".join(
part for part in (item["server"], item["tool"]) if part
)
or "tool"
)
line = prefix + f"{STATUS_DONE} tool: {name}"
return last_item, [line], line, prefix
case ("web_search", "item.completed"):
query = _shorten(item["query"], MAX_QUERY_LEN)
line = prefix + f"{STATUS_DONE} searched: {query}"
return last_item, [line], line, prefix
case ("file_change", "item.completed"):
paths = [
change["path"]
for change in item["changes"]
if change.get("path")
]
if not paths:
total = len(item["changes"])
desc = (
"updated files" if total == 0 else f"updated {total} files"
)
elif len(paths) <= 3:
desc = "updated " + ", ".join(
f"`{_format_change_path(p)}`" for p in paths
)
else:
desc = f"updated {len(paths)} files"
line = prefix + f"{STATUS_DONE} {desc}"
return last_item, [line], line, prefix
case ("error", "item.completed"):
warning = _shorten(item["message"], 120)
line = prefix + f"{STATUS_DONE} warning: {warning}"
return last_item, [line], line, prefix
case _:
return last_item, [], None, None
case _:
return last_item, [], None, None
def render_event_cli(
event: dict[str, Any], last_item: int | None = None
) -> tuple[int | None, list[str]]:
last_item, cli_lines, _, _ = format_event(
event, last_item, command_width=None, escape_markdown=False
)
return last_item, cli_lines
class ExecProgressRenderer:
def __init__(
self,
max_actions: int = 5,
command_width: int | None = MAX_PROGRESS_CMD_LEN,
) -> None:
self.max_actions = max_actions
self.command_width = command_width
self.recent_actions: deque[str] = deque(maxlen=max_actions)
self.last_item: int | None = None
self.resume_session: str | None = None
def note_event(self, event: dict[str, Any]) -> bool:
if event["type"] == "thread.started":
self.resume_session = event["thread_id"]
return True
self.last_item, _, progress_line, progress_prefix = format_event(
event,
self.last_item,
command_width=self.command_width,
escape_markdown=True,
)
if progress_line is None:
return False
# Replace the preceding "running" line for the same item on completion.
if (
event["type"] == "item.completed"
and progress_prefix
and self.recent_actions
):
last = self.recent_actions[-1]
if last.startswith(progress_prefix + f"{STATUS_RUNNING} "):
self.recent_actions.pop()
self.recent_actions.append(progress_line)
return True
def render_progress(self, elapsed_s: float, label: str = "working") -> str:
header = format_header(elapsed_s, self.last_item, label=label)
message = self._assemble(header, list(self.recent_actions))
return self._append_resume(message)
def render_final(self, elapsed_s: float, answer: str, status: str = "done") -> str:
header = format_header(elapsed_s, self.last_item, label=status)
answer = (answer or "").strip()
message = header + ("\n\n" + answer if answer else "")
return self._append_resume(message)
def _append_resume(self, message: str) -> str:
if not self.resume_session:
return message
return message + f"\n\nresume: `{self.resume_session}`"
@staticmethod
def _assemble(header: str, lines: list[str]) -> str:
return header if not lines else header + "\n\n" + HARD_BREAK.join(lines)
+83
View File
@@ -0,0 +1,83 @@
"""Markdown rendering and truncation helpers for Telegram constraints."""
from __future__ import annotations
import re
from typing import Any, Callable
from markdown_it import MarkdownIt
from sulguk import transform_html
TELEGRAM_MARKDOWN_LIMIT = 3500
_md = MarkdownIt("commonmark", {"html": False})
def render_markdown(md: str) -> tuple[str, list[dict[str, Any]]]:
html = _md.render(md or "")
rendered = transform_html(html)
text = re.sub(r"(?m)^(\s*)•", r"\1-", rendered.text)
entities = [dict(e) for e in rendered.entities]
return text, entities
def truncate_for_telegram(
text: str, limit: int, *, is_resume_line: Callable[[str], bool]
) -> str:
"""
Truncate text to fit Telegram limits while preserving the trailing resume command
line (if present), otherwise preserving the last non-empty line.
"""
if len(text) <= limit:
return text
lines = text.splitlines()
tail_lines: list[str] | None = None
is_resume_tail = False
for i in range(len(lines) - 1, -1, -1):
line = lines[i]
if is_resume_line(line):
tail_lines = lines[i:]
is_resume_tail = True
break
if tail_lines is None:
for i in range(len(lines) - 1, -1, -1):
if lines[i].strip():
tail_lines = [lines[i]]
break
tail = "\n".join(tail_lines or []).strip("\n")
sep = "\n\n"
max_tail = limit if is_resume_tail else (limit // 4)
tail = tail[-max_tail:] if max_tail > 0 else ""
head_budget = limit - len(sep) - len(tail)
if head_budget <= 0:
return tail[-limit:] if tail else text[:limit]
head = text[:head_budget].rstrip()
return (head + sep + tail)[:limit]
def prepare_telegram(
md: str,
*,
limit: int,
is_resume_line: Callable[[str], bool] | None = None,
) -> tuple[str, list[dict[str, Any]] | None]:
rendered, entities = render_markdown(md)
if len(rendered) > limit:
if is_resume_line is None:
def _never_resume_line(_line: str) -> bool:
return False
is_resume_line = _never_resume_line
rendered = truncate_for_telegram(rendered, limit, is_resume_line=is_resume_line)
return rendered, None
return rendered, entities
+76
View File
@@ -0,0 +1,76 @@
"""Takopi domain model types (events, actions, resume tokens)."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Literal, TypeAlias
EngineId: TypeAlias = str
ActionKind: TypeAlias = Literal[
"command",
"tool",
"file_change",
"web_search",
"note",
"turn",
"warning",
"telemetry",
]
TakopiEventType: TypeAlias = Literal[
"started",
"action",
"completed",
]
ActionPhase: TypeAlias = Literal["started", "updated", "completed"]
ActionLevel: TypeAlias = Literal["debug", "info", "warning", "error"]
@dataclass(frozen=True, slots=True)
class ResumeToken:
engine: EngineId
value: str
@dataclass(frozen=True, slots=True)
class Action:
id: str
kind: ActionKind
title: str
detail: dict[str, Any] = field(default_factory=dict)
@dataclass(frozen=True, slots=True)
class StartedEvent:
type: Literal["started"] = field(default="started", init=False)
engine: EngineId
resume: ResumeToken
title: str | None = None
meta: dict[str, Any] | None = None
@dataclass(frozen=True, slots=True)
class ActionEvent:
type: Literal["action"] = field(default="action", init=False)
engine: EngineId
action: Action
phase: ActionPhase
ok: bool | None = None
message: str | None = None
level: ActionLevel | None = None
@dataclass(frozen=True, slots=True)
class CompletedEvent:
type: Literal["completed"] = field(default="completed", init=False)
engine: EngineId
ok: bool
answer: str
resume: ResumeToken | None = None
error: str | None = None
usage: dict[str, Any] | None = None
TakopiEvent: TypeAlias = StartedEvent | ActionEvent | CompletedEvent
+37 -38
View File
@@ -1,6 +1,5 @@
from __future__ import annotations
import shutil
from dataclasses import dataclass
from pathlib import Path
@@ -8,32 +7,52 @@ from rich.console import Console
from rich.panel import Panel
from .config import ConfigError, HOME_CONFIG_PATH, load_telegram_config
from .engines import EngineBackend, SetupIssue
_OCTOPUS = "\N{OCTOPUS}"
@dataclass(slots=True)
class SetupResult:
missing_codex: bool = False
missing_or_invalid_config: bool = False
issues: list[SetupIssue]
config_path: Path = HOME_CONFIG_PATH
@property
def ok(self) -> bool:
return not (self.missing_codex or self.missing_or_invalid_config)
return not self.issues
def check_setup() -> SetupResult:
missing_codex = shutil.which("codex") is None
def _config_issue(path: Path) -> SetupIssue:
config_display = _config_path_display(path)
return SetupIssue(
"Create a config",
(
f" [dim]{config_display}[/]",
"",
' [cyan]bot_token[/] = [green]"123456789:ABCdef..."[/]',
" [cyan]chat_id[/] = [green]123456789[/]",
"",
"[dim]" + ("-" * 56) + "[/]",
"",
"[bold]Getting your Telegram credentials:[/]",
"",
" [cyan]bot_token[/] create a bot with [link=https://t.me/BotFather]@BotFather[/]",
" [cyan]chat_id[/] message [link=https://t.me/myidbot]@myidbot[/] to get your id",
),
)
def check_setup(backend: EngineBackend) -> SetupResult:
issues: list[SetupIssue] = []
config_path = HOME_CONFIG_PATH
config: dict = {}
try:
config, config_path = load_telegram_config()
except ConfigError:
return SetupResult(
missing_codex=missing_codex,
missing_or_invalid_config=True,
config_path=HOME_CONFIG_PATH,
)
issues.extend(backend.check_setup({}, config_path))
issues.append(_config_issue(config_path))
return SetupResult(issues=issues, config_path=config_path)
token = config.get("bot_token")
chat_id = config.get("chat_id")
@@ -41,11 +60,11 @@ def check_setup() -> SetupResult:
missing_or_invalid_config = not (isinstance(token, str) and token.strip())
missing_or_invalid_config |= type(chat_id) is not int
return SetupResult(
missing_codex=missing_codex,
missing_or_invalid_config=missing_or_invalid_config,
config_path=config_path,
)
issues.extend(backend.check_setup(config, config_path))
if missing_or_invalid_config:
issues.append(_config_issue(config_path))
return SetupResult(issues=issues, config_path=config_path)
def _config_path_display(path: Path) -> str:
@@ -72,28 +91,8 @@ def render_setup_guide(result: SetupResult) -> None:
parts.extend(lines)
parts.append("")
if result.missing_codex:
add_step(
"Install the Codex CLI",
" [dim]$[/] npm install -g @openai/codex",
)
if result.missing_or_invalid_config:
config_display = _config_path_display(result.config_path)
add_step(
"Create a config",
f" [dim]{config_display}[/]",
"",
' [cyan]bot_token[/] = [green]"123456789:ABCdef..."[/]',
" [cyan]chat_id[/] = [green]123456789[/]",
"",
"[dim]" + ("-" * 56) + "[/]",
"",
"[bold]Getting your Telegram credentials:[/]",
"",
" [cyan]bot_token[/] create a bot with [link=https://t.me/BotFather]@BotFather[/]",
" [cyan]chat_id[/] message [link=https://t.me/myidbot]@myidbot[/] to get your id",
)
for issue in result.issues:
add_step(issue.title, *issue.lines)
panel = Panel(
"\n".join(parts).rstrip(),
+259
View File
@@ -0,0 +1,259 @@
"""Pure renderers for Takopi events (no engine-native event handling)."""
from __future__ import annotations
import textwrap
from collections import deque
from typing import Callable
from .model import Action, ActionEvent, ResumeToken, StartedEvent, TakopiEvent
STATUS_RUNNING = ""
STATUS_UPDATE = ""
STATUS_DONE = ""
STATUS_FAIL = ""
HEADER_SEP = " · "
HARD_BREAK = " \n"
MAX_PROGRESS_CMD_LEN = 300
MAX_FILE_CHANGES_INLINE = 3
FILE_CHANGE_PREFIX = {"add": "+", "delete": "-", "update": "~"}
def format_elapsed(elapsed_s: float) -> str:
total = max(0, int(elapsed_s))
minutes, seconds = divmod(total, 60)
hours, minutes = divmod(minutes, 60)
if hours:
return f"{hours}h {minutes:02d}m"
if minutes:
return f"{minutes}m {seconds:02d}s"
return f"{seconds}s"
def format_header(elapsed_s: float, item: int | None, label: str) -> str:
elapsed = format_elapsed(elapsed_s)
parts = [label, elapsed]
if item is not None:
parts.append(f"step {item}")
return HEADER_SEP.join(parts)
def shorten(text: str, width: int | None) -> str:
if width is None:
return text
return textwrap.shorten(text, width=width, placeholder="")
def action_status_symbol(
action: Action, *, completed: bool, ok: bool | None = None
) -> str:
if not completed:
return STATUS_RUNNING
if ok is not None:
return STATUS_DONE if ok else STATUS_FAIL
detail = action.detail or {}
exit_code = detail.get("exit_code")
if isinstance(exit_code, int) and exit_code != 0:
return STATUS_FAIL
return STATUS_DONE
def action_exit_suffix(action: Action) -> str:
detail = action.detail or {}
exit_code = detail.get("exit_code")
if isinstance(exit_code, int) and exit_code != 0:
return f" (exit {exit_code})"
return ""
def format_file_change_title(action: Action, *, command_width: int | None) -> str:
title = str(action.title or "")
detail = action.detail or {}
changes = detail.get("changes")
if isinstance(changes, list) and changes:
rendered: list[str] = []
for raw in changes:
if not isinstance(raw, dict):
continue
path = raw.get("path")
if not isinstance(path, str) or not path:
continue
kind = raw.get("kind")
prefix = FILE_CHANGE_PREFIX.get(kind, "~") if isinstance(kind, str) else "~"
rendered.append(f"{prefix}{path}")
if rendered:
if len(rendered) > MAX_FILE_CHANGES_INLINE:
remaining = len(rendered) - MAX_FILE_CHANGES_INLINE
rendered = rendered[:MAX_FILE_CHANGES_INLINE] + [f"…(+{remaining})"]
inline = shorten(", ".join(rendered), command_width)
return f"files: {inline}"
return f"files: {shorten(title, command_width)}"
def format_action_title(action: Action, *, command_width: int | None) -> str:
title = str(action.title or "")
kind = action.kind
if kind == "command":
title = shorten(title, command_width)
return f"`{title}`"
if kind == "tool":
title = shorten(title, command_width)
return f"tool: {title}"
if kind == "web_search":
title = shorten(title, command_width)
return f"searched: {title}"
if kind == "file_change":
return format_file_change_title(action, command_width=command_width)
if kind in {"note", "warning"}:
return shorten(title, command_width)
return shorten(title, command_width)
def phase_status_and_suffix(event: ActionEvent) -> tuple[str, str]:
action = event.action
match event.phase:
case "completed":
status = action_status_symbol(action, completed=True, ok=event.ok)
suffix = action_exit_suffix(action)
return status, suffix
case "updated":
return STATUS_UPDATE, ""
case _:
return STATUS_RUNNING, ""
def render_event_cli(event: TakopiEvent) -> list[str]:
match event:
case StartedEvent(engine=engine):
return [str(engine)]
case ActionEvent() as action_event:
action = action_event.action
if action.kind == "turn":
return []
status, suffix = phase_status_and_suffix(action_event)
title = format_action_title(action, command_width=MAX_PROGRESS_CMD_LEN)
return [f"{status} {title}{suffix}"]
case _:
return []
class ExecProgressRenderer:
def __init__(
self,
max_actions: int = 5,
command_width: int | None = MAX_PROGRESS_CMD_LEN,
resume_formatter: Callable[[ResumeToken], str] | None = None,
show_title: bool = False,
) -> None:
self.max_actions = max_actions
self.command_width = command_width
self.recent_actions: deque[str] = deque(maxlen=max_actions)
self._recent_action_ids: deque[str] = deque(maxlen=max_actions)
self._recent_action_completed: deque[bool] = deque(maxlen=max_actions)
self.action_count = 0
self._started_counts: dict[str, int] = {}
self.resume_token: ResumeToken | None = None
self.session_title: str | None = None
self._resume_formatter = resume_formatter
self.show_title = show_title
def note_event(self, event: TakopiEvent) -> bool:
match event:
case StartedEvent(resume=resume, title=title):
self.resume_token = resume
self.session_title = title
return True
case ActionEvent(action=action, phase=phase, ok=ok):
if action.kind == "turn":
return False
action_id = str(action.id or "")
if not action_id:
return False
completed = phase == "completed"
if completed:
is_update = False
else:
started_count = self._started_counts.get(action_id, 0)
is_update = phase == "updated" or started_count > 0
if started_count == 0:
self.action_count += 1
self._started_counts[action_id] = 1
elif phase == "started":
self._started_counts[action_id] = started_count + 1
else:
self._started_counts[action_id] = started_count
case _:
return False
if completed:
count = self._started_counts.get(action_id, 0)
if count <= 0:
self.action_count += 1
elif count == 1:
self._started_counts.pop(action_id, None)
else:
self._started_counts[action_id] = count - 1
status = (
STATUS_UPDATE
if (is_update and not completed)
else action_status_symbol(action, completed=completed, ok=ok)
)
title = format_action_title(action, command_width=self.command_width)
suffix = action_exit_suffix(action) if completed else ""
line = f"{status} {title}{suffix}"
self._append_action(action_id, completed=completed, line=line)
return True
def _append_action(self, action_id: str, *, completed: bool, line: str) -> None:
for i in range(len(self._recent_action_ids) - 1, -1, -1):
if (
self._recent_action_ids[i] == action_id
and not self._recent_action_completed[i]
):
self.recent_actions[i] = line
if completed:
self._recent_action_completed[i] = True
return
if len(self.recent_actions) >= self.max_actions:
self.recent_actions.popleft()
self._recent_action_ids.popleft()
self._recent_action_completed.popleft()
self.recent_actions.append(line)
self._recent_action_ids.append(action_id)
self._recent_action_completed.append(completed)
def render_progress(self, elapsed_s: float, label: str = "working") -> str:
step = self.action_count or None
header = format_header(elapsed_s, step, label=self._label_with_title(label))
message = self._assemble(header, list(self.recent_actions))
return self._append_resume(message)
def render_final(self, elapsed_s: float, answer: str, status: str = "done") -> str:
step = self.action_count or None
header = format_header(elapsed_s, step, label=self._label_with_title(status))
answer = (answer or "").strip()
message = header + ("\n\n" + answer if answer else "")
return self._append_resume(message)
def _label_with_title(self, label: str) -> str:
if self.show_title and self.session_title:
return f"{label} ({self.session_title})"
return label
def _append_resume(self, message: str) -> str:
if not self.resume_token or self._resume_formatter is None:
return message
return message + "\n\n" + self._resume_formatter(self.resume_token)
@staticmethod
def _assemble(header: str, lines: list[str]) -> str:
return header if not lines else header + "\n\n" + HARD_BREAK.join(lines)
+55
View File
@@ -0,0 +1,55 @@
"""Runner protocol and shared runner definitions."""
from __future__ import annotations
import re
from collections.abc import AsyncIterator
from typing import Protocol
from .model import EngineId, ResumeToken, TakopiEvent
def compile_resume_pattern(engine: EngineId) -> re.Pattern[str]:
name = re.escape(str(engine))
return re.compile(rf"(?im)^\s*`?{name}\s+resume\s+(?P<token>[^`\s]+)`?\s*$")
class ResumeRunnerMixin:
engine: EngineId
resume_re: re.Pattern[str]
def format_resume(self, token: ResumeToken) -> str:
if token.engine != self.engine:
raise RuntimeError(f"resume token is for engine {token.engine!r}")
return f"`{self.engine} resume {token.value}`"
def is_resume_line(self, line: str) -> bool:
return bool(self.resume_re.match(line))
def extract_resume(self, text: str | None) -> ResumeToken | None:
if not text:
return None
found: str | None = None
for match in self.resume_re.finditer(text):
token = match.group("token")
if token:
found = token
if not found:
return None
return ResumeToken(engine=self.engine, value=found)
class Runner(Protocol):
engine: str
def is_resume_line(self, line: str) -> bool: ...
def format_resume(self, token: ResumeToken) -> str: ...
def extract_resume(self, text: str | None) -> ResumeToken | None: ...
def run(
self,
prompt: str,
resume: ResumeToken | None,
) -> AsyncIterator[TakopiEvent]: ...
+1
View File
@@ -0,0 +1 @@
"""Runner implementations."""
+758
View File
@@ -0,0 +1,758 @@
from __future__ import annotations
import json
import logging
import os
import signal
import subprocess
from collections import deque
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import Any, cast
from weakref import WeakValueDictionary
import anyio
from anyio.abc import ByteReceiveStream, Process
from anyio.streams.text import TextReceiveStream
from ..model import (
Action,
ActionEvent,
ActionKind,
ActionLevel,
ActionPhase,
CompletedEvent,
EngineId,
ResumeToken,
StartedEvent,
TakopiEvent,
)
from ..runner import ResumeRunnerMixin, Runner, compile_resume_pattern
logger = logging.getLogger(__name__)
ENGINE: EngineId = EngineId("codex")
STDERR_TAIL_LINES = 200
_ACTION_KIND_MAP: dict[str, ActionKind] = {
"command_execution": "command",
"mcp_tool_call": "tool",
"tool_call": "tool",
"web_search": "web_search",
"file_change": "file_change",
"reasoning": "note",
"todo_list": "note",
}
_RESUME_RE = compile_resume_pattern(ENGINE)
def _started_event(token: ResumeToken, *, title: str) -> StartedEvent:
return StartedEvent(engine=token.engine, resume=token, title=title)
def _completed_event(
*,
resume: ResumeToken | None,
ok: bool,
answer: str,
error: str | None = None,
usage: dict[str, Any] | None = None,
) -> TakopiEvent:
return CompletedEvent(
engine=ENGINE,
ok=ok,
answer=answer,
resume=resume,
error=error,
usage=usage,
)
def _action_event(
*,
phase: ActionPhase,
action_id: str,
kind: ActionKind,
title: str,
detail: dict[str, Any] | None = None,
ok: bool | None = None,
message: str | None = None,
level: ActionLevel | None = None,
) -> TakopiEvent:
action = Action(
id=action_id,
kind=kind,
title=title,
detail=detail or {},
)
return ActionEvent(
engine=ENGINE,
action=action,
phase=phase,
ok=ok,
message=message,
level=level,
)
def _note_completed(
action_id: str,
message: str,
*,
ok: bool = False,
detail: dict[str, Any] | None = None,
) -> TakopiEvent:
return _action_event(
phase="completed",
action_id=action_id,
kind="warning",
title=message,
detail=detail,
ok=ok,
message=message,
level="warning" if not ok else "info",
)
def _short_tool_name(item: dict[str, Any]) -> str:
name = ".".join(part for part in (item.get("server"), item.get("tool")) if part)
return name or "tool"
def _summarize_tool_result(result: Any) -> dict[str, Any] | None:
if not isinstance(result, dict):
return None
summary: dict[str, Any] = {}
content = result.get("content")
if isinstance(content, list):
summary["content_blocks"] = len(content)
elif content is not None:
summary["content_blocks"] = 1
if "structured" in result:
summary["has_structured"] = bool(result.get("structured"))
return summary or None
def _format_change_summary(item: dict[str, Any]) -> str:
changes = item.get("changes") or []
paths = [c.get("path") for c in changes if c.get("path")]
if not paths:
total = len(changes)
if total <= 0:
return "files"
return f"{total} files"
return ", ".join(str(path) for path in paths)
@dataclass(frozen=True, slots=True)
class _TodoSummary:
done: int
total: int
next_text: str | None
def _summarize_todo_list(items: Any) -> _TodoSummary:
if not isinstance(items, list):
return _TodoSummary(done=0, total=0, next_text=None)
done = 0
total = 0
next_text: str | None = None
for raw_item in items:
if not isinstance(raw_item, dict):
continue
total += 1
completed = raw_item.get("completed") is True
if completed:
done += 1
continue
if next_text is None:
text = raw_item.get("text")
next_text = str(text) if text is not None else None
return _TodoSummary(done=done, total=total, next_text=next_text)
def _todo_title(summary: _TodoSummary) -> str:
if summary.total <= 0:
return "todo"
if summary.next_text:
return f"todo {summary.done}/{summary.total}: {summary.next_text}"
return f"todo {summary.done}/{summary.total}: done"
def _translate_item_event(etype: str, item: dict[str, Any]) -> list[TakopiEvent]:
item_type = item.get("type") or item.get("item_type")
if item_type == "assistant_message":
item_type = "agent_message"
if not item_type:
return []
if item_type == "agent_message":
return []
action_id = item.get("id")
if not isinstance(action_id, str) or not action_id:
logger.debug("[codex] missing item id in codex event: %r", item)
return []
phase = cast(ActionPhase, etype.split(".")[-1])
if item_type == "error":
if phase != "completed":
return []
message = str(item.get("message") or "codex item error")
return [
_action_event(
phase="completed",
action_id=action_id,
kind="warning",
title=message,
detail={"message": message},
ok=False,
message=message,
level="warning",
)
]
kind = _ACTION_KIND_MAP.get(item_type)
if kind is None:
return []
if kind == "command":
title = str(item.get("command") or "")
if phase in {"started", "updated"}:
return [
_action_event(
phase=phase,
action_id=action_id,
kind=kind,
title=title,
)
]
if phase == "completed":
exit_code = item.get("exit_code")
ok = item.get("status") != "failed"
if isinstance(exit_code, int):
ok = ok and exit_code == 0
detail = {
"exit_code": exit_code,
"status": item.get("status"),
}
return [
_action_event(
phase="completed",
action_id=action_id,
kind=kind,
title=title,
detail=detail,
ok=ok,
)
]
if kind == "tool":
tool_name = _short_tool_name(item)
title = tool_name
detail = {
"server": item.get("server"),
"tool": item.get("tool"),
"status": item.get("status"),
}
if "arguments" in item:
detail["arguments"] = item.get("arguments")
if item_type == "tool_call":
name = item.get("name")
tool_name = str(name) if name else "tool"
title = tool_name
detail = {"name": name, "status": item.get("status")}
if "arguments" in item:
detail["arguments"] = item.get("arguments")
if phase in {"started", "updated"}:
return [
_action_event(
phase=phase,
action_id=action_id,
kind=kind,
title=title,
detail=detail,
)
]
if phase == "completed":
ok = item.get("status") != "failed" and not item.get("error")
error = item.get("error")
if error:
detail["error_message"] = str(
error.get("message") if isinstance(error, dict) else error
)
result_summary = _summarize_tool_result(item.get("result"))
if result_summary is not None:
detail["result_summary"] = result_summary
return [
_action_event(
phase="completed",
action_id=action_id,
kind=kind,
title=title,
detail=detail,
ok=ok,
)
]
if kind == "web_search":
title = str(item.get("query") or "")
detail = {"query": item.get("query")}
if phase in {"started", "updated"}:
return [
_action_event(
phase=phase,
action_id=action_id,
kind=kind,
title=title,
detail=detail,
)
]
if phase == "completed":
return [
_action_event(
phase="completed",
action_id=action_id,
kind=kind,
title=title,
detail=detail,
ok=True,
)
]
if kind == "file_change":
if phase != "completed":
return []
title = _format_change_summary(item)
detail = {
"changes": item.get("changes") or [],
"status": item.get("status"),
"error": item.get("error"),
}
ok = item.get("status") != "failed"
return [
_action_event(
phase="completed",
action_id=action_id,
kind=kind,
title=title,
detail=detail,
ok=ok,
)
]
if kind == "note":
if item_type == "todo_list":
summary = _summarize_todo_list(item.get("items"))
title = _todo_title(summary)
detail = {"done": summary.done, "total": summary.total}
else:
title = str(item.get("text") or "")
detail = None
if phase in {"started", "updated"}:
return [
_action_event(
phase=phase,
action_id=action_id,
kind=kind,
title=title,
detail=detail,
)
]
if phase == "completed":
return [
_action_event(
phase="completed",
action_id=action_id,
kind=kind,
title=title,
detail=detail,
ok=True,
)
]
return []
def translate_codex_event(event: dict[str, Any], *, title: str) -> list[TakopiEvent]:
etype = event.get("type")
if etype == "thread.started":
thread_id = event.get("thread_id")
if thread_id:
token = ResumeToken(engine=ENGINE, value=str(thread_id))
return [_started_event(token, title=title)]
logger.debug("[codex] codex thread.started missing thread_id: %r", event)
return []
if etype in {"item.started", "item.updated", "item.completed"}:
item = event.get("item") or {}
return _translate_item_event(etype, item)
return []
async def _iter_text_lines(stream: ByteReceiveStream):
text_stream = TextReceiveStream(stream, errors="replace")
buffer = ""
while True:
try:
chunk = await text_stream.receive()
except anyio.EndOfStream:
if buffer:
yield buffer
return
buffer += chunk
while True:
split_at = buffer.find("\n")
if split_at < 0:
break
line = buffer[: split_at + 1]
buffer = buffer[split_at + 1 :]
yield line
async def _drain_stderr(stderr: ByteReceiveStream, chunks: deque[str]) -> None:
try:
async for line in _iter_text_lines(stderr):
logger.debug("[codex][stderr] %s", line.rstrip())
chunks.append(line)
except Exception as e:
logger.debug("[codex][stderr] drain error: %s", e)
async def _wait_for_process(proc: Process, timeout: float) -> bool:
with anyio.move_on_after(timeout) as scope:
await proc.wait()
return scope.cancel_called
def _terminate_process(proc: Process) -> None:
if proc.returncode is not None:
return
if os.name == "posix" and proc.pid is not None:
try:
os.killpg(proc.pid, signal.SIGTERM)
return
except ProcessLookupError:
return
except Exception as e:
logger.debug("[codex] failed to terminate process group: %s", e)
try:
proc.terminate()
except ProcessLookupError:
return
def _kill_process(proc: Process) -> None:
if proc.returncode is not None:
return
if os.name == "posix" and proc.pid is not None:
try:
os.killpg(proc.pid, signal.SIGKILL)
return
except ProcessLookupError:
return
except Exception as e:
logger.debug("[codex] failed to kill process group: %s", e)
try:
proc.kill()
except ProcessLookupError:
return
@asynccontextmanager
async def manage_subprocess(*args, **kwargs):
"""Ensure subprocesses receive SIGTERM, then SIGKILL after a 2s timeout."""
if os.name == "posix":
kwargs.setdefault("start_new_session", True)
proc = await anyio.open_process(args, **kwargs)
try:
yield proc
finally:
if proc.returncode is None:
with anyio.CancelScope(shield=True):
_terminate_process(proc)
timed_out = await _wait_for_process(proc, timeout=2.0)
if timed_out:
_kill_process(proc)
await proc.wait()
class CodexRunner(ResumeRunnerMixin, Runner):
engine: EngineId = ENGINE
resume_re = _RESUME_RE
def __init__(
self,
*,
codex_cmd: str,
extra_args: list[str],
title: str = "Codex",
) -> None:
self.codex_cmd = codex_cmd
self.extra_args = extra_args
self.session_title = title
self._session_locks: WeakValueDictionary[str, anyio.Lock] = (
WeakValueDictionary()
)
def _lock_for(self, token: ResumeToken) -> anyio.Lock:
key = f"{token.engine}:{token.value}"
lock = self._session_locks.get(key)
if lock is None:
lock = anyio.Lock()
self._session_locks[key] = lock
return lock
async def run(
self, prompt: str, resume: ResumeToken | None
) -> AsyncIterator[TakopiEvent]:
resume_token = resume
if resume_token is not None and resume_token.engine != ENGINE:
raise RuntimeError(
f"resume token is for engine {resume_token.engine!r}, not {ENGINE!r}"
)
if resume_token is None:
async for evt in self._run(prompt, resume_token):
yield evt
return
lock = self._lock_for(resume_token)
async with lock:
async for evt in self._run(prompt, resume_token):
yield evt
async def _run( # noqa: C901
self,
prompt: str,
resume_token: ResumeToken | None,
) -> AsyncIterator[TakopiEvent]:
logger.info(
"[codex] start run resume=%r", resume_token.value if resume_token else None
)
logger.debug("[codex] prompt: %s", prompt)
args = [self.codex_cmd]
args.extend(self.extra_args)
args.extend(["exec", "--json"])
if resume_token:
args.extend(["resume", resume_token.value, "-"])
else:
args.append("-")
session_lock: anyio.Lock | None = None
session_lock_acquired = False
try:
async with manage_subprocess(
*args,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
) as proc:
if proc.stdin is None or proc.stdout is None or proc.stderr is None:
raise RuntimeError("codex exec failed to open subprocess pipes")
proc_stdin = proc.stdin
proc_stdout = proc.stdout
proc_stderr = proc.stderr
logger.debug("[codex] spawn pid=%s args=%r", proc.pid, args)
stderr_chunks: deque[str] = deque(maxlen=STDERR_TAIL_LINES)
rc: int | None = None
expected_session: ResumeToken | None = resume_token
found_session: ResumeToken | None = None
final_answer: str | None = None
note_seq = 0
did_emit_completed = False
turn_index = 0
def next_note_id() -> str:
nonlocal note_seq
note_seq += 1
return f"codex.note.{note_seq}"
async with anyio.create_task_group() as tg:
tg.start_soon(_drain_stderr, proc_stderr, stderr_chunks)
await proc_stdin.send(prompt.encode())
await proc_stdin.aclose()
async for raw_line in _iter_text_lines(proc_stdout):
raw = raw_line.rstrip("\n")
logger.debug("[codex][jsonl] %s", raw)
line = raw.strip()
if not line:
continue
if did_emit_completed:
continue
try:
evt = json.loads(line)
except json.JSONDecodeError:
logger.debug("[codex] invalid json line: %s", line)
note = _note_completed(
next_note_id(),
"invalid JSON from codex; ignoring line",
ok=False,
detail={"line": line},
)
yield note
continue
etype = evt.get("type")
if etype == "error":
message = str(evt.get("message") or "codex error")
fatal_flag = evt.get("fatal")
fatal = fatal_flag is True or fatal_flag is None
if fatal:
resume_for_completed = found_session or resume_token
yield _completed_event(
resume=resume_for_completed,
ok=False,
answer=final_answer or "",
error=message,
)
did_emit_completed = True
continue
note = _note_completed(
next_note_id(),
message,
ok=False,
detail={
"code": evt.get("code"),
"fatal": evt.get("fatal"),
},
)
yield note
continue
if etype == "turn.failed":
error = evt.get("error") or {}
message = str(error.get("message") or "codex turn failed")
resume_for_completed = found_session or resume_token
yield _completed_event(
resume=resume_for_completed,
ok=False,
answer=final_answer or "",
error=message,
)
did_emit_completed = True
continue
if etype == "turn.rate_limited":
retry_ms = evt.get("retry_after_ms")
message = "rate limited"
if isinstance(retry_ms, int):
message = f"rate limited (retry after {retry_ms}ms)"
note = _note_completed(next_note_id(), message, ok=False)
yield note
continue
if etype == "turn.started":
action_id = f"turn_{turn_index}"
turn_index += 1
yield _action_event(
phase="started",
action_id=action_id,
kind="turn",
title="turn started",
)
continue
if etype == "turn.completed":
resume_for_completed = found_session or resume_token
yield _completed_event(
resume=resume_for_completed,
ok=True,
answer=final_answer or "",
usage=evt.get("usage"),
)
did_emit_completed = True
continue
if evt.get("type") == "item.completed":
item = evt.get("item") or {}
item_type = item.get("type") or item.get("item_type")
if item_type == "assistant_message":
item_type = "agent_message"
if item_type == "agent_message" and isinstance(
item.get("text"), str
):
if final_answer is None:
final_answer = item["text"]
else:
logger.debug(
"[codex] emitted multiple agent messages; using the last one"
)
final_answer = item["text"]
for out_evt in translate_codex_event(
evt, title=self.session_title
):
if isinstance(out_evt, StartedEvent):
session = out_evt.resume
if found_session is None:
if session.engine != ENGINE:
raise RuntimeError(
f"codex emitted session token for engine {session.engine!r}"
)
if (
expected_session is not None
and session != expected_session
):
message = "codex emitted a different session id than expected"
raise RuntimeError(message)
if expected_session is None:
session_lock = self._lock_for(session)
await session_lock.acquire()
session_lock_acquired = True
found_session = session
yield out_evt
continue
yield out_evt
rc = await proc.wait()
logger.debug("[codex] process exit pid=%s rc=%s", proc.pid, rc)
if did_emit_completed:
return
if rc != 0:
stderr_text = "".join(stderr_chunks)
message = f"codex exec failed (rc={rc})."
yield _note_completed(
next_note_id(),
message,
ok=False,
detail={"stderr_tail": stderr_text},
)
resume_for_completed = found_session or resume_token
yield _completed_event(
resume=resume_for_completed,
ok=False,
answer=final_answer or "",
error=message,
)
return
if not found_session:
message = (
"codex exec finished but no session_id/thread_id was captured"
)
resume_for_completed = resume_token
yield _completed_event(
resume=resume_for_completed,
ok=False,
answer=final_answer or "",
error=message,
)
return
logger.info("[codex] done run session=%s", found_session.value)
yield _completed_event(
resume=found_session,
ok=True,
answer=final_answer or "",
)
finally:
if session_lock is not None and session_lock_acquired:
session_lock.release()
+232
View File
@@ -0,0 +1,232 @@
from __future__ import annotations
import uuid
from collections.abc import AsyncIterator, Awaitable, Callable, Iterable
from dataclasses import dataclass, replace
from typing import TypeAlias
from weakref import WeakValueDictionary
import anyio
from ..model import (
ActionEvent,
CompletedEvent,
EngineId,
ResumeToken,
StartedEvent,
TakopiEvent,
)
from ..runner import ResumeRunnerMixin, Runner, compile_resume_pattern
ENGINE: EngineId = EngineId("mock")
@dataclass(frozen=True, slots=True)
class Emit:
event: TakopiEvent
at: float | None = None
@dataclass(frozen=True, slots=True)
class Advance:
now: float
@dataclass(frozen=True, slots=True)
class Sleep:
seconds: float
@dataclass(frozen=True, slots=True)
class Wait:
event: anyio.Event
@dataclass(frozen=True, slots=True)
class Return:
answer: str
@dataclass(frozen=True, slots=True)
class Raise:
error: Exception
ScriptStep: TypeAlias = Emit | Advance | Sleep | Wait | Return | Raise
def _resume_token(engine: EngineId, value: str | None) -> ResumeToken:
return ResumeToken(engine=engine, value=value or uuid.uuid4().hex)
class MockRunner(ResumeRunnerMixin, Runner):
engine: EngineId
def __init__(
self,
*,
events: Iterable[TakopiEvent] | None = None,
answer: str = "",
engine: EngineId = ENGINE,
resume_value: str | None = None,
title: str | None = None,
) -> None:
self.engine = engine
self._events = list(events or [])
self._answer = answer
self._resume_value = resume_value
self.title = title or str(engine).title()
self._session_locks: WeakValueDictionary[str, anyio.Lock] = (
WeakValueDictionary()
)
self.resume_re = compile_resume_pattern(engine)
def _lock_for(self, token: ResumeToken) -> anyio.Lock:
key = f"{token.engine}:{token.value}"
lock = self._session_locks.get(key)
if lock is None:
lock = anyio.Lock()
self._session_locks[key] = lock
return lock
async def run(
self, prompt: str, resume: ResumeToken | None
) -> AsyncIterator[TakopiEvent]:
_ = prompt
token_value = None
if resume is not None:
if resume.engine != self.engine:
raise RuntimeError(
f"resume token is for engine {resume.engine!r}, not {self.engine!r}"
)
token_value = resume.value
if token_value is None:
token_value = self._resume_value
token = _resume_token(self.engine, token_value)
session_evt = StartedEvent(
engine=self.engine,
resume=token,
title=self.title,
)
lock = self._lock_for(token)
async with lock:
yield session_evt
for event in self._events:
event_out: TakopiEvent = event
if (
isinstance(event_out, ActionEvent)
and event_out.phase == "completed"
):
if event_out.ok is None:
event_out = replace(event_out, ok=True)
yield event_out
await anyio.sleep(0)
yield CompletedEvent(
engine=self.engine,
resume=token,
ok=True,
answer=self._answer,
)
class ScriptRunner(MockRunner):
def __init__(
self,
script: Iterable[ScriptStep],
*,
engine: EngineId = ENGINE,
resume_value: str | None = None,
emit_session_start: bool = True,
sleep: Callable[[float], Awaitable[None]] = anyio.sleep,
advance: Callable[[float], None] | None = None,
default_answer: str = "",
title: str | None = None,
) -> None:
super().__init__(
events=[],
answer=default_answer,
engine=engine,
resume_value=resume_value,
title=title,
)
self.calls: list[tuple[str, ResumeToken | None]] = []
self._script = list(script)
self._emit_session_start = emit_session_start
self._sleep = sleep
self._advance = advance
def _advance_to(self, now: float) -> None:
if self._advance is None:
raise RuntimeError("ScriptRunner advance callback is not configured.")
self._advance(now)
async def run(
self, prompt: str, resume: ResumeToken | None
) -> AsyncIterator[TakopiEvent]:
self.calls.append((prompt, resume))
_ = prompt
token_value = None
if resume is not None:
if resume.engine != self.engine:
raise RuntimeError(
f"resume token is for engine {resume.engine!r}, not {self.engine!r}"
)
token_value = resume.value
if token_value is None:
token_value = self._resume_value
token = _resume_token(self.engine, token_value)
session_evt = StartedEvent(
engine=self.engine,
resume=token,
title=self.title,
)
lock = self._lock_for(token)
async with lock:
if self._emit_session_start:
yield session_evt
await anyio.sleep(0)
for step in self._script:
if isinstance(step, Emit):
if step.at is not None:
self._advance_to(step.at)
event_out: TakopiEvent = step.event
if (
isinstance(event_out, ActionEvent)
and event_out.phase == "completed"
):
if event_out.ok is None:
event_out = replace(event_out, ok=True)
yield event_out
await anyio.sleep(0)
continue
if isinstance(step, Advance):
self._advance_to(step.now)
continue
if isinstance(step, Sleep):
await self._sleep(step.seconds)
continue
if isinstance(step, Wait):
await step.event.wait()
continue
if isinstance(step, Raise):
raise step.error
if isinstance(step, Return):
yield CompletedEvent(
engine=self.engine,
resume=token,
ok=True,
answer=step.answer,
)
return
raise RuntimeError(f"Unhandled script step: {step!r}")
yield CompletedEvent(
engine=self.engine,
resume=token,
ok=True,
answer=self._answer,
)
+33 -1
View File
@@ -1,7 +1,7 @@
from __future__ import annotations
import logging
from typing import Any
from typing import Any, Protocol
import httpx
@@ -11,6 +11,38 @@ logger = logging.getLogger(__name__)
logger.addFilter(RedactTokenFilter())
class BotClient(Protocol):
async def close(self) -> None: ...
async def get_updates(
self,
offset: int | None,
timeout_s: int = 50,
allowed_updates: list[str] | None = None,
) -> list[dict] | None: ...
async def send_message(
self,
chat_id: int,
text: str,
reply_to_message_id: int | None = None,
disable_notification: bool | None = False,
entities: list[dict] | None = None,
parse_mode: str | None = None,
) -> dict | None: ...
async def edit_message_text(
self,
chat_id: int,
message_id: int,
text: str,
entities: list[dict] | None = None,
parse_mode: str | None = None,
) -> dict | None: ...
async def delete_message(self, chat_id: int, message_id: int) -> bool: ...
class TelegramClient:
def __init__(
self,