From 12dfaded267e5705c426432009df48b40260269a Mon Sep 17 00:00:00 2001 From: banteg <4562643+banteg@users.noreply.github.com> Date: Thu, 1 Jan 2026 23:14:17 +0400 Subject: [PATCH] fix: code review (#16) --- src/takopi/backends.py | 3 ++- src/takopi/bridge.py | 17 ++++------------- src/takopi/cli.py | 2 +- src/takopi/config.py | 22 +++++++++++++--------- src/takopi/engines.py | 29 +++++++++++++---------------- src/takopi/markdown.py | 3 ++- src/takopi/render.py | 2 +- src/takopi/runner.py | 8 ++++---- src/takopi/runners/claude.py | 2 +- src/takopi/runners/codex.py | 2 +- src/takopi/telegram.py | 22 +++++++++++++++++++--- src/takopi/utils/subprocess.py | 8 ++++++-- tests/test_exec_bridge.py | 8 ++++++-- tests/test_subprocess.py | 8 +++++--- 14 files changed, 78 insertions(+), 58 deletions(-) diff --git a/src/takopi/backends.py b/src/takopi/backends.py index ed6b62c..8aeb1be 100644 --- a/src/takopi/backends.py +++ b/src/takopi/backends.py @@ -1,8 +1,9 @@ from __future__ import annotations +from collections.abc import Callable from dataclasses import dataclass from pathlib import Path -from typing import TYPE_CHECKING, Any, Callable +from typing import TYPE_CHECKING, Any if TYPE_CHECKING: from .runner import Runner diff --git a/src/takopi/bridge.py b/src/takopi/bridge.py index 689be24..2f69b0b 100644 --- a/src/takopi/bridge.py +++ b/src/takopi/bridge.py @@ -4,7 +4,6 @@ from __future__ import annotations import logging import time -import inspect from collections import deque from collections.abc import AsyncIterator, Awaitable, Callable from dataclasses import dataclass, field @@ -28,12 +27,6 @@ def _resolve_resume( return runner.extract_resume(text) or runner.extract_resume(reply_text) -def _summarize_error(error: str | None) -> str: - if not error: - return "error" - return error - - def _log_runner_event(evt: TakopiEvent) -> None: for line in render_event_cli(evt): logger.info("[runner] %s", line) @@ -41,7 +34,7 @@ def _log_runner_event(evt: TakopiEvent) -> None: if evt.ok: logger.info("[runner] done") else: - logger.info("[runner] error: %s", _summarize_error(evt.error)) + logger.info("[runner] error: %s", evt.error or "error") def _is_cancel_command(text: str) -> bool: @@ -516,7 +509,7 @@ async def handle_message( await cfg.bot.delete_message(chat_id=chat_id, message_id=progress_id) -async def poll_updates(cfg: BridgeConfig): +async def poll_updates(cfg: BridgeConfig) -> AsyncIterator[dict[str, Any]]: offset: int | None = None offset = await _drain_backlog(cfg, offset) await _send_startup(cfg) @@ -605,7 +598,7 @@ async def _wait_for_resume(running_task: RunningTask) -> ResumeToken | None: async def _send_with_resume( bot: BotClient, - enqueue: Callable[[int, int, str, ResumeToken], Awaitable[None] | None], + enqueue: Callable[[int, int, str, ResumeToken], Awaitable[None]], running_task: RunningTask, chat_id: int, user_msg_id: int, @@ -620,9 +613,7 @@ async def _send_with_resume( disable_notification=True, ) return - result = enqueue(chat_id, user_msg_id, text, resume) - if inspect.isawaitable(result): - await result + await enqueue(chat_id, user_msg_id, text, resume) async def _run_main_loop( diff --git a/src/takopi/cli.py b/src/takopi/cli.py index 2c8422e..3a63f53 100644 --- a/src/takopi/cli.py +++ b/src/takopi/cli.py @@ -1,7 +1,7 @@ from __future__ import annotations import os -from typing import Callable +from collections.abc import Callable import anyio import typer diff --git a/src/takopi/config.py b/src/takopi/config.py index 71ad471..ab4c6b1 100644 --- a/src/takopi/config.py +++ b/src/takopi/config.py @@ -1,5 +1,6 @@ from __future__ import annotations +import shutil import tomllib from pathlib import Path @@ -38,8 +39,7 @@ def _maybe_migrate_legacy(legacy_path: Path, target_path: Path) -> None: return try: target_path.parent.mkdir(parents=True, exist_ok=True) - raw = legacy_path.read_text(encoding="utf-8") - target_path.write_text(raw, encoding="utf-8") + shutil.move(legacy_path, target_path) except OSError as e: raise ConfigError( f"Failed to migrate legacy config {legacy_path} to {target_path}: {e}" @@ -64,19 +64,23 @@ def load_telegram_config(path: str | Path | None = None) -> tuple[dict, Path]: cfg_path = Path(path).expanduser() return _read_config(cfg_path), cfg_path - for legacy, target in zip(_legacy_candidates(), _config_candidates(), strict=True): + config_candidates = _config_candidates() + legacy_candidates = _legacy_candidates() + for legacy, target in zip(legacy_candidates, config_candidates, strict=True): _maybe_migrate_legacy(legacy, target) - candidates = _config_candidates() - for candidate in candidates: + for candidate in config_candidates: if candidate.is_file(): return _read_config(candidate), candidate - legacy_candidates = _legacy_candidates() for candidate in legacy_candidates: if candidate.is_file(): return _read_config(candidate), candidate - if len(candidates) == 1: - raise ConfigError("Missing takopi config.") - raise ConfigError("Missing takopi config.") + checked: list[Path] = [] + for candidate in [*config_candidates, *legacy_candidates]: + if candidate in checked: + continue + checked.append(candidate) + checked_display = ", ".join(str(candidate) for candidate in checked) + raise ConfigError(f"Missing takopi config. Checked: {checked_display}") diff --git a/src/takopi/engines.py b/src/takopi/engines.py index 9a6b0c4..e7e449b 100644 --- a/src/takopi/engines.py +++ b/src/takopi/engines.py @@ -2,14 +2,15 @@ from __future__ import annotations import importlib import pkgutil +from collections.abc import Mapping +from functools import cache from pathlib import Path +from types import MappingProxyType from typing import Any from .backends import EngineBackend, EngineConfig from .config import ConfigError -_BACKENDS: dict[str, EngineBackend] | None = None - def _discover_backends() -> dict[str, EngineBackend]: import takopi.runners as runners_pkg @@ -33,34 +34,30 @@ def _discover_backends() -> dict[str, EngineBackend]: return backends -def _ensure_loaded() -> None: - global _BACKENDS - if _BACKENDS is None: - _BACKENDS = _discover_backends() +@cache +def _backends() -> Mapping[str, EngineBackend]: + backends = _discover_backends() + return MappingProxyType(backends) def get_backend(engine_id: str) -> EngineBackend: - _ensure_loaded() - assert _BACKENDS is not None + backends = _backends() try: - return _BACKENDS[engine_id] + return backends[engine_id] except KeyError as exc: - available = ", ".join(sorted(_BACKENDS)) + available = ", ".join(sorted(backends)) raise ConfigError( f"Unknown engine {engine_id!r}. Available: {available}." ) from exc def list_backends() -> list[EngineBackend]: - _ensure_loaded() - assert _BACKENDS is not None - return [_BACKENDS[key] for key in sorted(_BACKENDS)] + backends = _backends() + return [backends[key] for key in sorted(backends)] def list_backend_ids() -> list[str]: - _ensure_loaded() - assert _BACKENDS is not None - return sorted(_BACKENDS) + return sorted(_backends()) def get_engine_config( diff --git a/src/takopi/markdown.py b/src/takopi/markdown.py index 3a87318..1a9d01d 100644 --- a/src/takopi/markdown.py +++ b/src/takopi/markdown.py @@ -3,7 +3,8 @@ from __future__ import annotations import re -from typing import Any, Callable +from collections.abc import Callable +from typing import Any from markdown_it import MarkdownIt from sulguk import transform_html diff --git a/src/takopi/render.py b/src/takopi/render.py index 4b3a00a..4c89ddb 100644 --- a/src/takopi/render.py +++ b/src/takopi/render.py @@ -4,8 +4,8 @@ from __future__ import annotations import textwrap from collections import deque +from collections.abc import Callable from pathlib import Path -from typing import Callable from .model import Action, ActionEvent, ResumeToken, StartedEvent, TakopiEvent from .utils.paths import relativize_path diff --git a/src/takopi/runner.py b/src/takopi/runner.py index fc8991b..0691e02 100644 --- a/src/takopi/runner.py +++ b/src/takopi/runner.py @@ -126,7 +126,7 @@ class BaseRunner(SessionLockMixin): raise NotImplementedError -@dataclass +@dataclass(slots=True) class JsonlRunState: note_seq: int = 0 @@ -302,12 +302,12 @@ class JsonlSubprocessRunner(BaseRunner): tag = self.tag() logger = self.get_logger() - args = [self.command(), *self.build_args(prompt, resume, state=state)] + cmd = [self.command(), *self.build_args(prompt, resume, state=state)] payload = self.stdin_payload(prompt, resume, state=state) env = self.env(state=state) async with manage_subprocess( - *args, + cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, @@ -318,7 +318,7 @@ class JsonlSubprocessRunner(BaseRunner): if payload is not None and proc.stdin is None: raise RuntimeError(self.pipes_error_message()) - logger.debug("[%s] spawn pid=%s args=%r", tag, proc.pid, args) + logger.debug("[%s] spawn pid=%s args=%r", tag, proc.pid, cmd) if payload is not None: assert proc.stdin is not None diff --git a/src/takopi/runners/claude.py b/src/takopi/runners/claude.py index 5c57ca2..2307278 100644 --- a/src/takopi/runners/claude.py +++ b/src/takopi/runners/claude.py @@ -31,7 +31,7 @@ _RESUME_RE = re.compile( ) -@dataclass +@dataclass(slots=True) class ClaudeStreamState: pending_actions: dict[str, Action] = field(default_factory=dict) last_assistant_text: str | None = None diff --git a/src/takopi/runners/codex.py b/src/takopi/runners/codex.py index 4527ab7..6e354bd 100644 --- a/src/takopi/runners/codex.py +++ b/src/takopi/runners/codex.py @@ -381,7 +381,7 @@ def translate_codex_event(event: dict[str, Any], *, title: str) -> list[TakopiEv return [] -@dataclass +@dataclass(slots=True) class CodexRunState: note_seq: int = 0 final_answer: str | None = None diff --git a/src/takopi/telegram.py b/src/takopi/telegram.py index a37528e..45c274e 100644 --- a/src/takopi/telegram.py +++ b/src/takopi/telegram.py @@ -72,14 +72,30 @@ class TelegramClient: return None try: - payload = resp.json() - except Exception as e: + resp.raise_for_status() + except httpx.HTTPStatusError as e: + body = resp.text logger.error( - "[telegram] bad response method=%s status=%s url=%s: %s", + "[telegram] http error method=%s status=%s url=%s: %s body=%r", method, resp.status_code, resp.request.url, e, + body, + ) + return None + + try: + payload = resp.json() + except Exception as e: + body = resp.text + logger.error( + "[telegram] bad response method=%s status=%s url=%s: %s body=%r", + method, + resp.status_code, + resp.request.url, + e, + body, ) return None diff --git a/src/takopi/utils/subprocess.py b/src/takopi/utils/subprocess.py index 2a9f12f..929331d 100644 --- a/src/takopi/utils/subprocess.py +++ b/src/takopi/utils/subprocess.py @@ -3,7 +3,9 @@ from __future__ import annotations import logging import os import signal +from collections.abc import AsyncIterator, Sequence from contextlib import asynccontextmanager +from typing import Any import anyio from anyio.abc import Process @@ -52,11 +54,13 @@ def kill_process(proc: Process) -> None: @asynccontextmanager -async def manage_subprocess(*args, **kwargs): +async def manage_subprocess( + cmd: Sequence[str], **kwargs: Any +) -> AsyncIterator[Process]: """Ensure subprocesses receive SIGTERM, then SIGKILL after a 2s timeout.""" if os.name == "posix": kwargs.setdefault("start_new_session", True) - proc = await anyio.open_process(args, **kwargs) + proc = await anyio.open_process(cmd, **kwargs) try: yield proc finally: diff --git a/tests/test_exec_bridge.py b/tests/test_exec_bridge.py index 10a85a1..b72af83 100644 --- a/tests/test_exec_bridge.py +++ b/tests/test_exec_bridge.py @@ -715,7 +715,9 @@ async def test_send_with_resume_waits_for_token() -> None: bot = _FakeBot() sent: list[tuple[int, int, str, ResumeToken | None]] = [] - def enqueue(chat_id: int, user_msg_id: int, text: str, resume: ResumeToken) -> None: + async def enqueue( + chat_id: int, user_msg_id: int, text: str, resume: ResumeToken + ) -> None: sent.append((chat_id, user_msg_id, text, resume)) running_task = RunningTask() @@ -748,7 +750,9 @@ async def test_send_with_resume_reports_when_missing() -> None: bot = _FakeBot() sent: list[tuple[int, int, str, ResumeToken | None]] = [] - def enqueue(chat_id: int, user_msg_id: int, text: str, resume: ResumeToken) -> None: + async def enqueue( + chat_id: int, user_msg_id: int, text: str, resume: ResumeToken + ) -> None: sent.append((chat_id, user_msg_id, text, resume)) running_task = RunningTask() diff --git a/tests/test_subprocess.py b/tests/test_subprocess.py index 39faca9..5446c53 100644 --- a/tests/test_subprocess.py +++ b/tests/test_subprocess.py @@ -16,9 +16,11 @@ async def test_manage_subprocess_kills_when_terminate_times_out( monkeypatch.setattr(subprocess_utils, "wait_for_process", fake_wait_for_process) async with subprocess_utils.manage_subprocess( - sys.executable, - "-c", - "import signal, time; signal.signal(signal.SIGTERM, signal.SIG_IGN); time.sleep(10)", + [ + sys.executable, + "-c", + "import signal, time; signal.signal(signal.SIGTERM, signal.SIG_IGN); time.sleep(10)", + ] ) as proc: assert proc.returncode is None