diff --git a/src/takopi/cli.py b/src/takopi/cli.py index 7cdd26c..fa89945 100644 --- a/src/takopi/cli.py +++ b/src/takopi/cli.py @@ -14,6 +14,7 @@ from .backends import EngineBackend from .bridge import BridgeConfig, run_main_loop from .config import ConfigError, load_telegram_config from .engines import get_backend, get_engine_config, list_backends +from .lockfile import LockError, LockHandle, acquire_lock, token_fingerprint from .logging import setup_logging from .onboarding import check_setup, render_setup_guide from .router import AutoRouter, RunnerEntry @@ -32,6 +33,46 @@ def _version_callback(value: bool) -> None: _print_version_and_exit() +def load_and_validate_config( + path: str | Path | None = None, +) -> tuple[dict, Path, str, int]: + config, config_path = load_telegram_config(path) + try: + token = config["bot_token"] + except KeyError: + raise ConfigError(f"Missing key `bot_token` in {config_path}.") from None + if not isinstance(token, str) or not token.strip(): + raise ConfigError( + f"Invalid `bot_token` in {config_path}; expected a non-empty string." + ) from None + try: + chat_id_value = config["chat_id"] + except KeyError: + raise ConfigError(f"Missing key `chat_id` in {config_path}.") from None + if isinstance(chat_id_value, bool) or not isinstance(chat_id_value, int): + raise ConfigError( + f"Invalid `chat_id` in {config_path}; expected an integer." + ) from None + return config, config_path, token.strip(), chat_id_value + + +def acquire_config_lock(config_path: Path, token: str) -> LockHandle: + try: + return acquire_lock( + config_path=config_path, + token_fingerprint=token_fingerprint(token), + ) + except LockError as exc: + lines = str(exc).splitlines() + if lines: + typer.echo(lines[0], err=True) + if len(lines) > 1: + typer.echo("\n".join(lines[1:]), err=True) + else: + typer.echo("error: unknown error", err=True) + raise typer.Exit(code=1) from exc + + def _default_engine_for_setup(override: str | None) -> str: if override: return override @@ -139,28 +180,13 @@ def _parse_bridge_config( *, final_notify: bool, default_engine_override: str | None, + config: dict, + config_path: Path, + token: str, + chat_id: int, ) -> BridgeConfig: startup_pwd = os.getcwd() - config, config_path = load_telegram_config() - try: - token = config["bot_token"] - except KeyError: - raise ConfigError(f"Missing key `bot_token` in {config_path}.") from None - if not isinstance(token, str) or not token.strip(): - raise ConfigError( - f"Invalid `bot_token` in {config_path}; expected a non-empty string." - ) from None - try: - chat_id_value = config["chat_id"] - except KeyError: - raise ConfigError(f"Missing key `chat_id` in {config_path}.") from None - if isinstance(chat_id_value, bool) or not isinstance(chat_id_value, int): - raise ConfigError( - f"Invalid `chat_id` in {config_path}; expected an integer." - ) from None - chat_id = chat_id_value - backends = list_backends() default_engine = _resolve_default_engine( override=default_engine_override, @@ -201,25 +227,38 @@ def _run_auto_router( *, default_engine_override: str | None, final_notify: bool, debug: bool ) -> None: setup_logging(debug=debug) + lock_handle: LockHandle | None = None try: default_engine = _default_engine_for_setup(default_engine_override) backend = get_backend(default_engine) except ConfigError as e: - typer.echo(str(e), err=True) + typer.echo(f"error: {e}", err=True) raise typer.Exit(code=1) setup = check_setup(backend) if not setup.ok: render_setup_guide(setup) raise typer.Exit(code=1) try: + config, config_path, token, chat_id = load_and_validate_config() + lock_handle = acquire_config_lock(config_path, token) cfg = _parse_bridge_config( final_notify=final_notify, default_engine_override=default_engine_override, + config=config, + config_path=config_path, + token=token, + chat_id=chat_id, ) + anyio.run(run_main_loop, cfg) except ConfigError as e: - typer.echo(str(e), err=True) + typer.echo(f"error: {e}", err=True) raise typer.Exit(code=1) - anyio.run(run_main_loop, cfg) + except KeyboardInterrupt: + logger.info("[shutdown] interrupted") + raise typer.Exit(code=130) + finally: + if lock_handle is not None: + lock_handle.release() app = typer.Typer( diff --git a/src/takopi/lockfile.py b/src/takopi/lockfile.py new file mode 100644 index 0000000..e801787 --- /dev/null +++ b/src/takopi/lockfile.py @@ -0,0 +1,152 @@ +from __future__ import annotations + +import hashlib +import json +import logging +import os +from dataclasses import dataclass +from pathlib import Path + +logger = logging.getLogger(__name__) + + +@dataclass(frozen=True) +class LockInfo: + pid: int | None + token_fingerprint: str | None + + +class LockError(RuntimeError): + def __init__( + self, + *, + path: Path, + state: str, + ) -> None: + self.path = path + self.state = state + super().__init__(_format_lock_message(path, state)) + + +@dataclass +class LockHandle: + path: Path + + def release(self) -> None: + try: + self.path.unlink(missing_ok=True) + except OSError as exc: + logger.warning("[lock] failed to remove lock file %s: %s", self.path, exc) + + def __enter__(self) -> "LockHandle": + return self + + def __exit__(self, exc_type, exc, tb) -> None: + self.release() + + +def token_fingerprint(token: str) -> str: + digest = hashlib.sha256(token.encode("utf-8")).hexdigest() + return digest[:10] + + +def lock_path_for_config(config_path: Path) -> Path: + return config_path.with_suffix(".lock") + + +def acquire_lock( + *, config_path: Path, token_fingerprint: str | None = None +) -> LockHandle: + cfg_path = config_path.expanduser().resolve() + lock_path = lock_path_for_config(cfg_path) + try: + lock_path.parent.mkdir(parents=True, exist_ok=True) + existing = _read_lock_info(lock_path) + if existing: + if ( + token_fingerprint + and existing.token_fingerprint + and existing.token_fingerprint != token_fingerprint + ): + _write_lock_info( + lock_path, + pid=os.getpid(), + token_fingerprint=token_fingerprint, + ) + return LockHandle(path=lock_path) + if _pid_running(existing.pid): + raise LockError(path=lock_path, state="running") from None + _write_lock_info( + lock_path, + pid=os.getpid(), + token_fingerprint=token_fingerprint, + ) + except OSError as exc: + raise LockError(path=lock_path, state=str(exc)) from exc + + return LockHandle(path=lock_path) + + +def _read_lock_info(path: Path) -> LockInfo | None: + try: + raw = path.read_text(encoding="utf-8") + except FileNotFoundError: + return None + except OSError: + return None + try: + data = json.loads(raw) + except json.JSONDecodeError: + return None + if not isinstance(data, dict): + return None + pid = data.get("pid") + if isinstance(pid, bool) or not isinstance(pid, int): + pid = None + token_hint = data.get("token_fingerprint") + if not isinstance(token_hint, str): + token_hint = None + return LockInfo( + pid=pid, + token_fingerprint=token_hint, + ) + + +def _write_lock_info(path: Path, *, pid: int, token_fingerprint: str | None) -> None: + payload = {"pid": pid, "token_fingerprint": token_fingerprint} + path.write_text( + json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8" + ) + + +def _pid_running(pid: int | None) -> bool: + if pid is None or pid <= 0: + return False + try: + os.kill(pid, 0) + except ProcessLookupError: + return False + except PermissionError: + return True + except OSError: + return False + return True + + +def _format_lock_message(path: Path, state: str) -> str: + if state != "running": + return f"error: lock failed: {state}" + header = "error: already running" + display_path = _display_lock_path(path) + lines = [header, f"remove {display_path} if stale"] + return "\n".join(lines) + + +def _display_lock_path(path: Path) -> str: + home = Path.home() + try: + resolved = path.expanduser().resolve() + rel = resolved.relative_to(home) + return f"~/{rel}" + except (ValueError, OSError): + return str(path) diff --git a/tests/test_exec_bridge.py b/tests/test_exec_bridge.py index 384ce9d..de9a821 100644 --- a/tests/test_exec_bridge.py +++ b/tests/test_exec_bridge.py @@ -29,32 +29,26 @@ def _patch_config(monkeypatch, config): monkeypatch.setattr( cli, "load_telegram_config", - lambda: (config, Path("takopi.toml")), + lambda *args, **kwargs: (config, Path("takopi.toml")), ) -def test_parse_bridge_config_rejects_empty_token(monkeypatch) -> None: +def test_load_and_validate_config_rejects_empty_token(monkeypatch) -> None: from takopi import cli _patch_config(monkeypatch, {"bot_token": " ", "chat_id": 123}) with pytest.raises(cli.ConfigError, match="bot_token"): - cli._parse_bridge_config( - final_notify=True, - default_engine_override=None, - ) + cli.load_and_validate_config() -def test_parse_bridge_config_rejects_string_chat_id(monkeypatch) -> None: +def test_load_and_validate_config_rejects_string_chat_id(monkeypatch) -> None: from takopi import cli _patch_config(monkeypatch, {"bot_token": "token", "chat_id": "123"}) with pytest.raises(cli.ConfigError, match="chat_id"): - cli._parse_bridge_config( - final_notify=True, - default_engine_override=None, - ) + cli.load_and_validate_config() def test_codex_extract_resume_finds_command() -> None: diff --git a/tests/test_lockfile.py b/tests/test_lockfile.py new file mode 100644 index 0000000..0c298e4 --- /dev/null +++ b/tests/test_lockfile.py @@ -0,0 +1,82 @@ +import json +import os + +import pytest + +import takopi.lockfile as lockfile + + +def test_lockfile_creates_and_cleans_up(tmp_path) -> None: + config_path = tmp_path / "takopi.toml" + config_path.write_text("ok", encoding="utf-8") + + handle = lockfile.acquire_lock( + config_path=config_path, + token_fingerprint="deadbeef", + ) + try: + assert lockfile.lock_path_for_config(config_path).exists() + finally: + handle.release() + + assert not lockfile.lock_path_for_config(config_path).exists() + + +def test_lockfile_refuses_running_pid(tmp_path) -> None: + config_path = tmp_path / "takopi.toml" + config_path.write_text("ok", encoding="utf-8") + + handle = lockfile.acquire_lock( + config_path=config_path, + token_fingerprint="deadbeef", + ) + try: + with pytest.raises(lockfile.LockError) as exc: + lockfile.acquire_lock( + config_path=config_path, + token_fingerprint="deadbeef", + ) + message = str(exc.value).lower() + assert "already running" in message + assert str(lockfile.lock_path_for_config(config_path)) in str(exc.value) + finally: + handle.release() + + +def test_lockfile_replaces_dead_pid(tmp_path, monkeypatch) -> None: + config_path = tmp_path / "takopi.toml" + config_path.write_text("ok", encoding="utf-8") + lock_path = lockfile.lock_path_for_config(config_path) + payload = {"pid": 424242, "token_fingerprint": "deadbeef"} + lock_path.write_text(json.dumps(payload), encoding="utf-8") + + monkeypatch.setattr(lockfile, "_pid_running", lambda pid: False) + + handle = lockfile.acquire_lock( + config_path=config_path, + token_fingerprint="deadbeef", + ) + try: + updated = json.loads(lock_path.read_text(encoding="utf-8")) + assert updated["pid"] == os.getpid() + assert updated["token_fingerprint"] == "deadbeef" + finally: + handle.release() + + +def test_lockfile_replaces_token_mismatch(tmp_path) -> None: + config_path = tmp_path / "takopi.toml" + config_path.write_text("ok", encoding="utf-8") + lock_path = lockfile.lock_path_for_config(config_path) + payload = {"pid": os.getpid(), "token_fingerprint": "other"} + lock_path.write_text(json.dumps(payload), encoding="utf-8") + + handle = lockfile.acquire_lock( + config_path=config_path, + token_fingerprint="deadbeef", + ) + try: + updated = json.loads(lock_path.read_text(encoding="utf-8")) + assert updated["token_fingerprint"] == "deadbeef" + finally: + handle.release()