feat: add lockfile to prevent concurrent instances (#30)

This commit is contained in:
banteg
2026-01-03 04:20:51 +04:00
committed by GitHub
parent 5e855e977f
commit 0a56d4002f
4 changed files with 300 additions and 33 deletions
+62 -23
View File
@@ -14,6 +14,7 @@ from .backends import EngineBackend
from .bridge import BridgeConfig, run_main_loop
from .config import ConfigError, load_telegram_config
from .engines import get_backend, get_engine_config, list_backends
from .lockfile import LockError, LockHandle, acquire_lock, token_fingerprint
from .logging import setup_logging
from .onboarding import check_setup, render_setup_guide
from .router import AutoRouter, RunnerEntry
@@ -32,6 +33,46 @@ def _version_callback(value: bool) -> None:
_print_version_and_exit()
def load_and_validate_config(
path: str | Path | None = None,
) -> tuple[dict, Path, str, int]:
config, config_path = load_telegram_config(path)
try:
token = config["bot_token"]
except KeyError:
raise ConfigError(f"Missing key `bot_token` in {config_path}.") from None
if not isinstance(token, str) or not token.strip():
raise ConfigError(
f"Invalid `bot_token` in {config_path}; expected a non-empty string."
) from None
try:
chat_id_value = config["chat_id"]
except KeyError:
raise ConfigError(f"Missing key `chat_id` in {config_path}.") from None
if isinstance(chat_id_value, bool) or not isinstance(chat_id_value, int):
raise ConfigError(
f"Invalid `chat_id` in {config_path}; expected an integer."
) from None
return config, config_path, token.strip(), chat_id_value
def acquire_config_lock(config_path: Path, token: str) -> LockHandle:
try:
return acquire_lock(
config_path=config_path,
token_fingerprint=token_fingerprint(token),
)
except LockError as exc:
lines = str(exc).splitlines()
if lines:
typer.echo(lines[0], err=True)
if len(lines) > 1:
typer.echo("\n".join(lines[1:]), err=True)
else:
typer.echo("error: unknown error", err=True)
raise typer.Exit(code=1) from exc
def _default_engine_for_setup(override: str | None) -> str:
if override:
return override
@@ -139,28 +180,13 @@ def _parse_bridge_config(
*,
final_notify: bool,
default_engine_override: str | None,
config: dict,
config_path: Path,
token: str,
chat_id: int,
) -> BridgeConfig:
startup_pwd = os.getcwd()
config, config_path = load_telegram_config()
try:
token = config["bot_token"]
except KeyError:
raise ConfigError(f"Missing key `bot_token` in {config_path}.") from None
if not isinstance(token, str) or not token.strip():
raise ConfigError(
f"Invalid `bot_token` in {config_path}; expected a non-empty string."
) from None
try:
chat_id_value = config["chat_id"]
except KeyError:
raise ConfigError(f"Missing key `chat_id` in {config_path}.") from None
if isinstance(chat_id_value, bool) or not isinstance(chat_id_value, int):
raise ConfigError(
f"Invalid `chat_id` in {config_path}; expected an integer."
) from None
chat_id = chat_id_value
backends = list_backends()
default_engine = _resolve_default_engine(
override=default_engine_override,
@@ -201,25 +227,38 @@ def _run_auto_router(
*, default_engine_override: str | None, final_notify: bool, debug: bool
) -> None:
setup_logging(debug=debug)
lock_handle: LockHandle | None = None
try:
default_engine = _default_engine_for_setup(default_engine_override)
backend = get_backend(default_engine)
except ConfigError as e:
typer.echo(str(e), err=True)
typer.echo(f"error: {e}", err=True)
raise typer.Exit(code=1)
setup = check_setup(backend)
if not setup.ok:
render_setup_guide(setup)
raise typer.Exit(code=1)
try:
config, config_path, token, chat_id = load_and_validate_config()
lock_handle = acquire_config_lock(config_path, token)
cfg = _parse_bridge_config(
final_notify=final_notify,
default_engine_override=default_engine_override,
config=config,
config_path=config_path,
token=token,
chat_id=chat_id,
)
except ConfigError as e:
typer.echo(str(e), err=True)
raise typer.Exit(code=1)
anyio.run(run_main_loop, cfg)
except ConfigError as e:
typer.echo(f"error: {e}", err=True)
raise typer.Exit(code=1)
except KeyboardInterrupt:
logger.info("[shutdown] interrupted")
raise typer.Exit(code=130)
finally:
if lock_handle is not None:
lock_handle.release()
app = typer.Typer(
+152
View File
@@ -0,0 +1,152 @@
from __future__ import annotations
import hashlib
import json
import logging
import os
from dataclasses import dataclass
from pathlib import Path
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class LockInfo:
pid: int | None
token_fingerprint: str | None
class LockError(RuntimeError):
def __init__(
self,
*,
path: Path,
state: str,
) -> None:
self.path = path
self.state = state
super().__init__(_format_lock_message(path, state))
@dataclass
class LockHandle:
path: Path
def release(self) -> None:
try:
self.path.unlink(missing_ok=True)
except OSError as exc:
logger.warning("[lock] failed to remove lock file %s: %s", self.path, exc)
def __enter__(self) -> "LockHandle":
return self
def __exit__(self, exc_type, exc, tb) -> None:
self.release()
def token_fingerprint(token: str) -> str:
digest = hashlib.sha256(token.encode("utf-8")).hexdigest()
return digest[:10]
def lock_path_for_config(config_path: Path) -> Path:
return config_path.with_suffix(".lock")
def acquire_lock(
*, config_path: Path, token_fingerprint: str | None = None
) -> LockHandle:
cfg_path = config_path.expanduser().resolve()
lock_path = lock_path_for_config(cfg_path)
try:
lock_path.parent.mkdir(parents=True, exist_ok=True)
existing = _read_lock_info(lock_path)
if existing:
if (
token_fingerprint
and existing.token_fingerprint
and existing.token_fingerprint != token_fingerprint
):
_write_lock_info(
lock_path,
pid=os.getpid(),
token_fingerprint=token_fingerprint,
)
return LockHandle(path=lock_path)
if _pid_running(existing.pid):
raise LockError(path=lock_path, state="running") from None
_write_lock_info(
lock_path,
pid=os.getpid(),
token_fingerprint=token_fingerprint,
)
except OSError as exc:
raise LockError(path=lock_path, state=str(exc)) from exc
return LockHandle(path=lock_path)
def _read_lock_info(path: Path) -> LockInfo | None:
try:
raw = path.read_text(encoding="utf-8")
except FileNotFoundError:
return None
except OSError:
return None
try:
data = json.loads(raw)
except json.JSONDecodeError:
return None
if not isinstance(data, dict):
return None
pid = data.get("pid")
if isinstance(pid, bool) or not isinstance(pid, int):
pid = None
token_hint = data.get("token_fingerprint")
if not isinstance(token_hint, str):
token_hint = None
return LockInfo(
pid=pid,
token_fingerprint=token_hint,
)
def _write_lock_info(path: Path, *, pid: int, token_fingerprint: str | None) -> None:
payload = {"pid": pid, "token_fingerprint": token_fingerprint}
path.write_text(
json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8"
)
def _pid_running(pid: int | None) -> bool:
if pid is None or pid <= 0:
return False
try:
os.kill(pid, 0)
except ProcessLookupError:
return False
except PermissionError:
return True
except OSError:
return False
return True
def _format_lock_message(path: Path, state: str) -> str:
if state != "running":
return f"error: lock failed: {state}"
header = "error: already running"
display_path = _display_lock_path(path)
lines = [header, f"remove {display_path} if stale"]
return "\n".join(lines)
def _display_lock_path(path: Path) -> str:
home = Path.home()
try:
resolved = path.expanduser().resolve()
rel = resolved.relative_to(home)
return f"~/{rel}"
except (ValueError, OSError):
return str(path)
+5 -11
View File
@@ -29,32 +29,26 @@ def _patch_config(monkeypatch, config):
monkeypatch.setattr(
cli,
"load_telegram_config",
lambda: (config, Path("takopi.toml")),
lambda *args, **kwargs: (config, Path("takopi.toml")),
)
def test_parse_bridge_config_rejects_empty_token(monkeypatch) -> None:
def test_load_and_validate_config_rejects_empty_token(monkeypatch) -> None:
from takopi import cli
_patch_config(monkeypatch, {"bot_token": " ", "chat_id": 123})
with pytest.raises(cli.ConfigError, match="bot_token"):
cli._parse_bridge_config(
final_notify=True,
default_engine_override=None,
)
cli.load_and_validate_config()
def test_parse_bridge_config_rejects_string_chat_id(monkeypatch) -> None:
def test_load_and_validate_config_rejects_string_chat_id(monkeypatch) -> None:
from takopi import cli
_patch_config(monkeypatch, {"bot_token": "token", "chat_id": "123"})
with pytest.raises(cli.ConfigError, match="chat_id"):
cli._parse_bridge_config(
final_notify=True,
default_engine_override=None,
)
cli.load_and_validate_config()
def test_codex_extract_resume_finds_command() -> None:
+82
View File
@@ -0,0 +1,82 @@
import json
import os
import pytest
import takopi.lockfile as lockfile
def test_lockfile_creates_and_cleans_up(tmp_path) -> None:
config_path = tmp_path / "takopi.toml"
config_path.write_text("ok", encoding="utf-8")
handle = lockfile.acquire_lock(
config_path=config_path,
token_fingerprint="deadbeef",
)
try:
assert lockfile.lock_path_for_config(config_path).exists()
finally:
handle.release()
assert not lockfile.lock_path_for_config(config_path).exists()
def test_lockfile_refuses_running_pid(tmp_path) -> None:
config_path = tmp_path / "takopi.toml"
config_path.write_text("ok", encoding="utf-8")
handle = lockfile.acquire_lock(
config_path=config_path,
token_fingerprint="deadbeef",
)
try:
with pytest.raises(lockfile.LockError) as exc:
lockfile.acquire_lock(
config_path=config_path,
token_fingerprint="deadbeef",
)
message = str(exc.value).lower()
assert "already running" in message
assert str(lockfile.lock_path_for_config(config_path)) in str(exc.value)
finally:
handle.release()
def test_lockfile_replaces_dead_pid(tmp_path, monkeypatch) -> None:
config_path = tmp_path / "takopi.toml"
config_path.write_text("ok", encoding="utf-8")
lock_path = lockfile.lock_path_for_config(config_path)
payload = {"pid": 424242, "token_fingerprint": "deadbeef"}
lock_path.write_text(json.dumps(payload), encoding="utf-8")
monkeypatch.setattr(lockfile, "_pid_running", lambda pid: False)
handle = lockfile.acquire_lock(
config_path=config_path,
token_fingerprint="deadbeef",
)
try:
updated = json.loads(lock_path.read_text(encoding="utf-8"))
assert updated["pid"] == os.getpid()
assert updated["token_fingerprint"] == "deadbeef"
finally:
handle.release()
def test_lockfile_replaces_token_mismatch(tmp_path) -> None:
config_path = tmp_path / "takopi.toml"
config_path.write_text("ok", encoding="utf-8")
lock_path = lockfile.lock_path_for_config(config_path)
payload = {"pid": os.getpid(), "token_fingerprint": "other"}
lock_path.write_text(json.dumps(payload), encoding="utf-8")
handle = lockfile.acquire_lock(
config_path=config_path,
token_fingerprint="deadbeef",
)
try:
updated = json.loads(lock_path.read_text(encoding="utf-8"))
assert updated["token_fingerprint"] == "deadbeef"
finally:
handle.release()